Compare commits

...

22 Commits

Author SHA1 Message Date
Evan Lohn
e1f4a52661 fast? 2025-05-06 18:13:42 -07:00
Evan Lohn
4a70b33d87 time 2025-05-06 17:36:38 -07:00
Evan Lohn
209dcc006e bs 2025-05-06 17:24:25 -07:00
Evan Lohn
18a024b759 back to business 2025-05-06 16:43:03 -07:00
Evan Lohn
320c4eec46 moo 2025-05-06 14:48:03 -07:00
Evan Lohn
75b1862600 ko 2025-05-06 14:19:42 -07:00
Evan Lohn
aac6390aa1 hmm 2025-05-06 13:57:52 -07:00
Evan Lohn
1c6584f0b2 drastic measures 2025-05-06 13:31:14 -07:00
Evan Lohn
1d15ac7f34 mn 2025-05-06 10:51:01 -07:00
Evan Lohn
8594b72b32 mm 2025-05-06 10:34:24 -07:00
Evan Lohn
5a7c07fc44 minor change 2025-05-06 09:54:14 -07:00
Evan Lohn
70df0be775 moo 2025-05-06 09:54:14 -07:00
Evan Lohn
0e5f90cebd loo 2025-05-06 09:54:14 -07:00
Evan Lohn
5788dc011a l 2025-05-06 09:54:14 -07:00
Evan Lohn
d91728fe86 lo 2025-05-06 09:54:14 -07:00
Evan Lohn
ae887d0540 logzz 2025-05-06 09:54:14 -07:00
Evan Lohn
fc3d6ba47f fix 2025-05-06 09:54:14 -07:00
Evan Lohn
a3c8be4cb3 looggss 2025-05-06 09:54:14 -07:00
Evan Lohn
6164223656 even more logs 2025-05-06 09:54:14 -07:00
Evan Lohn
c0eb19a942 even more logs 2025-05-06 09:54:14 -07:00
Evan Lohn
f18d719ed6 more logs 2025-05-06 09:54:14 -07:00
Evan Lohn
b4da2130ef debug logs 2025-05-06 09:54:14 -07:00
8 changed files with 188 additions and 35 deletions

View File

@@ -261,7 +261,7 @@ def _run_indexing(
3. Updates Postgres to record the indexed documents + the outcome of this run
"""
start_time = time.monotonic() # just used for logging
logger.error("Starting indexing run")
with get_session_with_current_tenant() as db_session_temp:
index_attempt_start = get_index_attempt(db_session_temp, index_attempt_id)
if not index_attempt_start:
@@ -315,6 +315,7 @@ def _run_indexing(
# don't go into "negative" time if we've never indexed before
window_start = datetime.fromtimestamp(0, tz=timezone.utc)
logger.error("Getting most recent attempt")
most_recent_attempt = next(
iter(
get_recent_completed_attempts_for_cc_pair(
@@ -326,6 +327,7 @@ def _run_indexing(
),
None,
)
logger.error(f"Most recent attempt: {most_recent_attempt}")
# if the last attempt failed, try and use the same window. This is necessary
# to ensure correctness with checkpointing. If we don't do this, things like
# new slack channels could be missed (since existing slack channels are
@@ -361,6 +363,7 @@ def _run_indexing(
httpx_client=HttpxPool.get("vespa"),
)
logger.error("Building indexing pipeline")
indexing_pipeline = build_indexing_pipeline(
embedder=embedding_model,
information_content_classification_model=information_content_classification_model,
@@ -447,6 +450,9 @@ def _run_indexing(
for document_batch, failure, next_checkpoint in connector_runner.run(
checkpoint
):
logger.info(
f"Document batch: {len(document_batch) if document_batch else 0}"
)
# Check if connector is disabled mid run and stop if so unless it's the secondary
# index being built. We want to populate it even for paused connectors
# Often paused connectors are sources that aren't updated frequently but the
@@ -467,6 +473,7 @@ def _run_indexing(
db_session_temp, ctx, index_attempt_id
)
logger.info(f"Maybe failure: {failure}")
# save record of any failures at the connector level
if failure is not None:
total_failures += 1
@@ -482,6 +489,7 @@ def _run_indexing(
total_failures, document_count, batch_num, failure
)
logger.info(f"Next checkpoint: {next_checkpoint}")
# save the new checkpoint (if one is provided)
if next_checkpoint:
checkpoint = next_checkpoint
@@ -513,7 +521,7 @@ def _run_indexing(
f"threshold={INDEXING_SIZE_WARNING_THRESHOLD}"
)
logger.debug(f"Indexing batch of documents: {batch_description}")
logger.info(f"Indexing batch of documents: {batch_description}")
index_attempt_md.request_id = make_randomized_onyx_request_id("CIX")
index_attempt_md.structured_id = (
@@ -532,6 +540,10 @@ def _run_indexing(
chunk_count += index_pipeline_result.total_chunks
document_count += index_pipeline_result.total_docs
logger.info(
f"From run_indexing: indexed {index_pipeline_result.total_docs} docs"
)
# resolve errors for documents that were successfully indexed
failed_document_ids = [
failure.failed_document.document_id
@@ -573,6 +585,8 @@ def _run_indexing(
index_pipeline_result.failures[-1],
)
logger.info("From run_indexing: updating indexed docs")
# This new value is updated every batch, so UI can refresh per batch update
with get_session_with_current_tenant() as db_session_temp:
# NOTE: Postgres uses the start of the transactions when computing `NOW()`
@@ -782,7 +796,7 @@ def run_indexing_entrypoint(
callback: IndexingHeartbeatInterface | None = None,
) -> None:
"""Don't swallow exceptions here ... propagate them up."""
logger.error("Starting indexing run: run_indexing_entrypoint")
if is_ee:
global_version.set_ee()

View File

@@ -117,6 +117,9 @@ class ConnectorRunner(Generic[CT]):
yield None, failure, None
if len(self.doc_batch) >= self.batch_size:
logger.info(
f"From runner: Yielding batch of {len(self.doc_batch)} documents"
)
yield self.doc_batch, None, None
self.doc_batch = []
@@ -126,6 +129,7 @@ class ConnectorRunner(Generic[CT]):
self.doc_batch = []
yield None, None, next_checkpoint
logger.info("From runner: Yielding next checkpoint")
logger.debug(
f"Connector took {time.monotonic() - start} seconds to get to the next checkpoint."

View File

@@ -10,6 +10,7 @@ from typing import cast
from typing import Protocol
from urllib.parse import urlparse
from google.auth.exceptions import RefreshError # type: ignore
from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore
from googleapiclient.errors import HttpError # type: ignore
@@ -17,7 +18,6 @@ from typing_extensions import override
from onyx.configs.app_configs import GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.app_configs import MAX_DRIVE_WORKERS
from onyx.configs.constants import DocumentSource
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
@@ -66,13 +66,17 @@ from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import retry_builder
from onyx.utils.threadpool_concurrency import parallel_yield
from onyx.utils.threadpool_concurrency import run_functions_tuples_in_parallel
from onyx.utils.threadpool_concurrency import run_with_timeout
from onyx.utils.threadpool_concurrency import ThreadSafeDict
logger = setup_logger()
# TODO: Improve this by using the batch utility: https://googleapis.github.io/google-api-python-client/docs/batch.html
# All file retrievals could be batched and made at once
BATCHES_PER_CHECKPOINT = 10
MAX_DRIVE_WORKERS = 4
BATCHES_PER_CHECKPOINT = 1
DRIVE_BATCH_SIZE = 80
def _extract_str_list_from_comma_str(string: str | None) -> list[str]:
@@ -120,6 +124,7 @@ def add_retrieval_info(
parent_id: str | None = None,
) -> Iterator[RetrievedDriveFile]:
for file in drive_files:
logger.info(f"Adding retrieval info for file: {file.get('id')} as {user_email}")
yield RetrievedDriveFile(
drive_file=file,
user_email=user_email,
@@ -184,8 +189,6 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
"shared_folder_urls, or my_drive_emails"
)
self.batch_size = batch_size
specific_requests_made = False
if bool(shared_drive_urls) or bool(my_drive_emails) or bool(shared_folder_urls):
specific_requests_made = True
@@ -306,14 +309,14 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
return user_emails
def get_all_drive_ids(self) -> set[str]:
primary_drive_service = get_drive_service(
creds=self.creds,
user_email=self.primary_admin_email,
)
return self._get_all_drives_for_user(self.primary_admin_email)
def _get_all_drives_for_user(self, user_email: str) -> set[str]:
drive_service = get_drive_service(self.creds, user_email)
is_service_account = isinstance(self.creds, ServiceAccountCredentials)
all_drive_ids = set()
all_drive_ids: set[str] = set()
for drive in execute_paginated_retrieval(
retrieval_function=primary_drive_service.drives().list,
retrieval_function=drive_service.drives().list,
list_key="drives",
useDomainAdminAccess=is_service_account,
fields="drives(id),nextPageToken",
@@ -367,12 +370,19 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
def record_drive_processing(drive_id: str) -> None:
with cv:
logger.info(
f"Record drive processing for drive id: {drive_id}, user email: {thread_id}"
)
completion.processed_drive_ids.add(drive_id)
drive_id_status[drive_id] = (
DriveIdStatus.FINISHED
if drive_id in self._retrieved_folder_and_drive_ids
else DriveIdStatus.AVAILABLE
)
logger.info(
f"Drive id status: {len(drive_id_status)}, user email: {thread_id},"
f"processed drive ids: {len(completion.processed_drive_ids)}"
)
# wake up other threads waiting for work
cv.notify_all()
@@ -423,16 +433,22 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
curr_stage = checkpoint.completion_map[user_email]
resuming = True
if curr_stage.stage == DriveRetrievalStage.START:
logger.info(f"Setting stage to {DriveRetrievalStage.MY_DRIVE_FILES.value}")
curr_stage.stage = DriveRetrievalStage.MY_DRIVE_FILES
resuming = False
drive_service = get_drive_service(self.creds, user_email)
logger.info(f"Got drive service: {drive_service}")
# validate that the user has access to the drive APIs by performing a simple
# request and checking for a 401
try:
logger.info(f"Getting root folder id for user {user_email}")
# default is ~17 mins of retries; don't do that here so we don't
# waste 17 mins every time we run into a user without access to drive APIs
retry_builder(tries=3, delay=1)(get_root_folder_id)(drive_service)
retry_builder(tries=3, delay=1)(
lambda: run_with_timeout(30, get_root_folder_id, drive_service)
)()
logger.info(f"Got root folder id for user {user_email}")
except HttpError as e:
if e.status_code == 401:
# fail gracefully, let the other impersonations continue
@@ -445,14 +461,31 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
curr_stage.stage = DriveRetrievalStage.DONE
return
raise
except TimeoutError:
logger.warning(
f"User '{user_email}' timed out when trying to access the drive APIs."
)
# mark this user as done so we don't try to retrieve anything for them
# again
curr_stage.stage = DriveRetrievalStage.DONE
return
except RefreshError as e:
logger.warning(
f"User '{user_email}' could not refresh their token. Error: {e}"
)
# mark this user as done so we don't try to retrieve anything for them
# again
curr_stage.stage = DriveRetrievalStage.DONE
return
# if we are including my drives, try to get the current user's my
# drive if any of the following are true:
# - include_my_drives is true
# - the current user's email is in the requested emails
if curr_stage.stage == DriveRetrievalStage.MY_DRIVE_FILES:
if self.include_my_drives or user_email in self._requested_my_drive_emails:
logger.info(f"Getting all files in my drive as '{user_email}'")
logger.info(
f"Getting all files in my drive as '{user_email}. Resuming: {resuming}"
)
yield from add_retrieval_info(
get_all_files_in_my_drive_and_shared(
@@ -505,7 +538,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
for drive_id in concurrent_drive_itr(user_email):
logger.info(
f"Getting files in shared drive '{drive_id}' as '{user_email}'"
f"Getting files in shared drive '{drive_id}' as '{user_email}. Resuming: {resuming}"
)
curr_stage.completed_until = 0
curr_stage.current_folder_or_drive_id = drive_id
@@ -593,6 +626,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
sorted_drive_ids, sorted_folder_ids = self._determine_retrieval_ids(
checkpoint, is_slim, DriveRetrievalStage.MY_DRIVE_FILES
)
all_drive_ids = set(sorted_drive_ids)
# Setup initial completion map on first connector run
for email in all_org_emails:
@@ -602,6 +636,8 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
checkpoint.completion_map[email] = StageCompletion(
stage=DriveRetrievalStage.START,
completed_until=0,
processed_drive_ids=all_drive_ids
- self._get_all_drives_for_user(email),
)
# we've found all users and drives, now time to actually start
@@ -627,7 +663,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
# to the drive APIs. Without this, we could loop through these emails for
# more than 3 hours, causing a timeout and stalling progress.
email_batch_takes_us_to_completion = True
MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING = 50
MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING = MAX_DRIVE_WORKERS
if len(non_completed_org_emails) > MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING:
non_completed_org_emails = non_completed_org_emails[
:MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING
@@ -646,7 +682,22 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
)
for email in non_completed_org_emails
]
yield from parallel_yield(user_retrieval_gens, max_workers=MAX_DRIVE_WORKERS)
for drive_file in parallel_yield(
user_retrieval_gens, max_workers=MAX_DRIVE_WORKERS
):
logger.info(f"Yielding thing: {drive_file.drive_file.get('id')}")
yield drive_file
# while user_retrieval_gens:
# done_inds = []
# for ind, gen in enumerate(user_retrieval_gens):
# drive_file = next(gen, None)
# if drive_file is None:
# done_inds.append(ind)
# else:
# logger.info(f"Yielding thing: {drive_file.drive_file.get('id')}")
# yield drive_file
# for ind in done_inds[::-1]:
# user_retrieval_gens.pop(ind)
# if there are more emails to process, don't mark as complete
if not email_batch_takes_us_to_completion:
@@ -871,6 +922,10 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
return
for file in drive_files:
logger.info(
f"Updating checkpoint for file: {file.drive_file.get('name')}. "
f"Seen: {file.drive_file.get('id') in checkpoint.all_retrieved_file_ids}"
)
checkpoint.completion_map[file.user_email].update(
stage=file.completion_stage,
completed_until=datetime.fromisoformat(
@@ -881,6 +936,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
if file.drive_file["id"] not in checkpoint.all_retrieved_file_ids:
checkpoint.all_retrieved_file_ids.add(file.drive_file["id"])
yield file
logger.info("Done yielding from checkpointed retrieval")
def _manage_oauth_retrieval(
self,
@@ -981,6 +1037,9 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
end: SecondsSinceUnixEpoch | None = None,
) -> Iterator[Document | ConnectorFailure]:
try:
import time
start_time = time.time()
# Prepare a partial function with the credentials and admin email
convert_func = partial(
convert_drive_item_to_document,
@@ -1008,11 +1067,15 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
)
for file in files_batch
]
logger.info(f"Processing batch of {len(func_with_args)} files")
logger.info(
f"File names: {[file.drive_file.get('name') for file in files_batch]}"
)
results = cast(
list[Document | ConnectorFailure | None],
run_functions_tuples_in_parallel(func_with_args, max_workers=8),
)
logger.info(f"Results: {len(results)}")
docs_and_failures = [result for result in results if result is not None]
if docs_and_failures:
@@ -1025,6 +1088,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
start=start,
end=end,
):
logger.info(f"Retrieved file: {retrieved_file}")
if retrieved_file.error is not None:
failure_stage = retrieved_file.completion_stage.value
failure_message = (
@@ -1047,24 +1111,46 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
continue
files_batch.append(retrieved_file)
if len(files_batch) < self.batch_size:
if len(files_batch) < DRIVE_BATCH_SIZE:
logger.info(
f"Not Yielding batch of {len(files_batch)} files; "
f"num seen doc ids: {len(checkpoint.all_retrieved_file_ids)}"
)
continue
logger.info(
f"Yielding batch of {len(files_batch)} files; num seen doc ids: {len(checkpoint.all_retrieved_file_ids)}"
f"Yielding batch of {len(files_batch)} files; "
f"num seen doc ids: {len(checkpoint.all_retrieved_file_ids)}"
)
yield from _yield_batch(files_batch)
files_batch = []
if batches_complete > BATCHES_PER_CHECKPOINT:
checkpoint.retrieved_folder_and_drive_ids = (
self._retrieved_folder_and_drive_ids
)
return # create a new checkpoint
# if batches_complete > BATCHES_PER_CHECKPOINT:
# logger.info(
# f"Returning checkpoint after {batches_complete} batches; "
# f"num seen doc ids: {len(checkpoint.all_retrieved_file_ids)}"
# )
# checkpoint.retrieved_folder_and_drive_ids = (
# self._retrieved_folder_and_drive_ids
# )
# logger.info(
# f"Time taken until checkpoint: {time.time() - start_time} for {batches_complete*DRIVE_BATCH_SIZE} files"
# )
# return # create a new checkpoint
logger.info(
f"Processing remaining files: {[file.drive_file.get('name') for file in files_batch]}"
)
# Process any remaining files
if files_batch:
yield from _yield_batch(files_batch)
checkpoint.retrieved_folder_and_drive_ids = (
self._retrieved_folder_and_drive_ids
)
logger.info(
f"Completed processing files. time taken: {time.time() - start_time}"
)
except Exception as e:
logger.exception(f"Error extracting documents from Google Drive: {e}")
raise e
@@ -1083,15 +1169,23 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
"Credentials missing, should not call this method before calling load_credentials"
)
logger.info(
f"Loading from checkpoint with completion stage: {checkpoint.completion_stage},"
f"retrieved ids: {len(checkpoint.all_retrieved_file_ids)}"
)
checkpoint = copy.deepcopy(checkpoint)
self._retrieved_folder_and_drive_ids = checkpoint.retrieved_folder_and_drive_ids
try:
yield from self._extract_docs_from_google_drive(checkpoint, start, end)
logger.info("Done extracting docs from google drive for this checkpoint")
except Exception as e:
if MISSING_SCOPES_ERROR_STR in str(e):
raise PermissionError(ONYX_SCOPE_INSTRUCTIONS) from e
raise e
checkpoint.retrieved_folder_and_drive_ids = self._retrieved_folder_and_drive_ids
logger.info(
f"Returning checkpoint with this many things seen: {len(checkpoint.all_retrieved_file_ids)}"
)
if checkpoint.completion_stage == DriveRetrievalStage.DONE:
checkpoint.has_more = False
return checkpoint

View File

@@ -324,6 +324,9 @@ def convert_drive_item_to_document(
if retriever_email in seen:
continue
seen.add(retriever_email)
logger.info(
f"Converting file {file.get('name')} to document as retriever: {retriever_email}"
)
doc_or_failure = _convert_drive_item_to_document(
creds, allow_images, size_threshold, retriever_email, file
)
@@ -335,6 +338,7 @@ def convert_drive_item_to_document(
and doc_or_failure.exception.status_code == 403
)
):
logger.info(f" file {file.get('name')} -> {type(doc_or_failure)}")
return doc_or_failure
if first_error is None:
@@ -387,6 +391,7 @@ def _convert_drive_item_to_document(
# If it's a Google Doc, we might do advanced parsing
if file.get("mimeType") == GDriveMimeType.DOC.value:
try:
logger.info(f"Getting document sections for {file.get('name')}")
# get_document_sections is the advanced approach for Google Docs
doc_sections = get_document_sections(
docs_service=docs_service(),
@@ -412,6 +417,7 @@ def _convert_drive_item_to_document(
except ValueError:
logger.warning(f"Parsing string to int failed: size_str={size_str}")
else:
logger.info(f"File name: {file.get('name')}, File size: {size_int}")
if size_int > size_threshold:
logger.warning(
f"{file.get('name')} exceeds size threshold of {size_threshold}. Skipping."
@@ -420,6 +426,9 @@ def _convert_drive_item_to_document(
# If we don't have sections yet, use the basic extraction method
if not sections:
logger.info(
f"Downloading and extracting sections (basic) for {file.get('name')}"
)
sections = _download_and_extract_sections_basic(
file, drive_service(), allow_images
)

View File

@@ -236,6 +236,9 @@ def get_all_files_in_my_drive_and_shared(
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> Iterator[GoogleDriveFileType]:
import time
t1 = time.time()
kwargs = {}
if not is_slim:
kwargs[ORDER_BY_KEY] = GoogleFields.MODIFIED_TIME.value
@@ -258,14 +261,16 @@ def get_all_files_in_my_drive_and_shared(
found_folders = True
if found_folders:
update_traversed_ids_func(get_root_folder_id(service))
t2 = time.time()
logger.info(f"Time taken to get all folders: {t2 - t1}")
# Then get the files
file_query = f"mimeType != '{DRIVE_FOLDER_TYPE}'"
file_query += " and trashed = false"
if not include_shared_with_me:
file_query += " and 'me' in owners"
file_query += _generate_time_range_filter(start, end)
yield from execute_paginated_retrieval(
logger.info("listing files as user")
for drive_file in execute_paginated_retrieval(
retrieval_function=service.files().list,
list_key="files",
continue_on_404_or_403=False,
@@ -273,7 +278,11 @@ def get_all_files_in_my_drive_and_shared(
fields=SLIM_FILE_FIELDS if is_slim else FILE_FIELDS,
q=file_query,
**kwargs,
)
):
logger.info(f"Inner yielding drive file: {drive_file.get('id')}")
yield drive_file
t3 = time.time()
logger.info(f"Time taken to get all files: {t3 - t2}")
def get_all_files_for_oauth(

View File

@@ -12,6 +12,7 @@ from googleapiclient.errors import HttpError # type: ignore
from onyx.connectors.google_drive.models import GoogleDriveFileType
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import retry_builder
from onyx.utils.threadpool_concurrency import run_with_timeout
logger = setup_logger()
@@ -176,11 +177,19 @@ def execute_paginated_retrieval(
request_kwargs = kwargs.copy()
if next_page_token:
request_kwargs[PAGE_TOKEN_KEY] = next_page_token
results = _execute_single_retrieval(
retrieval_function,
continue_on_404_or_403,
**request_kwargs,
)
try:
results = run_with_timeout(
30,
_execute_single_retrieval,
retrieval_function,
continue_on_404_or_403,
**request_kwargs,
)
except TimeoutError:
logger.warning(
f"Paginated retrieval for {retrieval_function.__name__} timed out after 30 seconds"
)
break
next_page_token = results.get(NEXT_PAGE_TOKEN_KEY)
if list_key:

View File

@@ -327,17 +327,20 @@ def index_doc_batch_prepare(
This precedes indexing it into the actual document index."""
# Create a trimmed list of docs that don't have a newer updated at
# Shortcuts the time-consuming flow on connector index retries
logger.info(f"index_doc_batch_prepare: {len(documents)} documents")
document_ids: list[str] = [document.id for document in documents]
db_docs: list[DBDocument] = get_documents_by_ids(
db_session=db_session,
document_ids=document_ids,
)
logger.info(f"index_doc_batch_prepare: {len(db_docs)} db_docs")
updatable_docs = (
get_doc_ids_to_update(documents=documents, db_docs=db_docs)
if not ignore_time_skip
else documents
)
logger.info(f"index_doc_batch_prepare: {len(updatable_docs)} updatable_docs")
if len(updatable_docs) != len(documents):
updatable_doc_ids = [doc.id for doc in updatable_docs]
skipped_doc_ids = [
@@ -718,6 +721,8 @@ def index_doc_batch(
Returns a tuple where the first element is the number of new docs and the
second element is the number of chunks."""
logger.error(f"index_doc_batch: {len(document_batch)} documents")
no_access = DocumentAccess.build(
user_emails=[],
user_groups=[],
@@ -1057,6 +1062,7 @@ def build_indexing_pipeline(
callback: IndexingHeartbeatInterface | None = None,
) -> IndexingPipelineProtocol:
"""Builds a pipeline which takes in a list (batch) of docs and indexes them."""
logger.error("Building indexing pipeline")
all_search_settings = get_active_search_settings(db_session)
if (
all_search_settings.secondary
@@ -1067,10 +1073,12 @@ def build_indexing_pipeline(
search_settings = all_search_settings.primary
multipass_config = get_multipass_config(search_settings)
logger.error(f"multipass_config: {multipass_config}")
enable_contextual_rag = (
search_settings.enable_contextual_rag or ENABLE_CONTEXTUAL_RAG
)
logger.error(f"enable_contextual_rag: {enable_contextual_rag}")
llm = None
if enable_contextual_rag:
llm = get_llm_for_contextual_rag(

View File

@@ -223,6 +223,7 @@ def run_functions_tuples_in_parallel(
raise
results.sort(key=lambda x: x[0])
logger.info(f"Results from run in parallel: {len(results)}")
return [result for index, result in results]
@@ -297,6 +298,9 @@ class TimeoutThread(threading.Thread, Generic[R]):
self.exception = e
def end(self) -> None:
logger.info(
f"Function {self.func.__name__} timed out after {self.timeout} seconds"
)
raise TimeoutError(
f"Function {self.func.__name__} timed out after {self.timeout} seconds"
)
@@ -374,7 +378,9 @@ def parallel_yield(gens: list[Iterator[R]], max_workers: int = 10) -> Iterator[R
next_ind = len(gens)
while future_to_index:
logger.info(f"Waiting for {len(future_to_index)} futures")
done, _ = wait(future_to_index, return_when=FIRST_COMPLETED)
logger.info(f"Done: {len(done)}")
for future in done:
ind, result = future.result()
if result is not None: