mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-03-28 19:12:43 +00:00
Compare commits
22 Commits
cli/v0.2.0
...
extra-driv
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e1f4a52661 | ||
|
|
4a70b33d87 | ||
|
|
209dcc006e | ||
|
|
18a024b759 | ||
|
|
320c4eec46 | ||
|
|
75b1862600 | ||
|
|
aac6390aa1 | ||
|
|
1c6584f0b2 | ||
|
|
1d15ac7f34 | ||
|
|
8594b72b32 | ||
|
|
5a7c07fc44 | ||
|
|
70df0be775 | ||
|
|
0e5f90cebd | ||
|
|
5788dc011a | ||
|
|
d91728fe86 | ||
|
|
ae887d0540 | ||
|
|
fc3d6ba47f | ||
|
|
a3c8be4cb3 | ||
|
|
6164223656 | ||
|
|
c0eb19a942 | ||
|
|
f18d719ed6 | ||
|
|
b4da2130ef |
@@ -261,7 +261,7 @@ def _run_indexing(
|
||||
3. Updates Postgres to record the indexed documents + the outcome of this run
|
||||
"""
|
||||
start_time = time.monotonic() # just used for logging
|
||||
|
||||
logger.error("Starting indexing run")
|
||||
with get_session_with_current_tenant() as db_session_temp:
|
||||
index_attempt_start = get_index_attempt(db_session_temp, index_attempt_id)
|
||||
if not index_attempt_start:
|
||||
@@ -315,6 +315,7 @@ def _run_indexing(
|
||||
# don't go into "negative" time if we've never indexed before
|
||||
window_start = datetime.fromtimestamp(0, tz=timezone.utc)
|
||||
|
||||
logger.error("Getting most recent attempt")
|
||||
most_recent_attempt = next(
|
||||
iter(
|
||||
get_recent_completed_attempts_for_cc_pair(
|
||||
@@ -326,6 +327,7 @@ def _run_indexing(
|
||||
),
|
||||
None,
|
||||
)
|
||||
logger.error(f"Most recent attempt: {most_recent_attempt}")
|
||||
# if the last attempt failed, try and use the same window. This is necessary
|
||||
# to ensure correctness with checkpointing. If we don't do this, things like
|
||||
# new slack channels could be missed (since existing slack channels are
|
||||
@@ -361,6 +363,7 @@ def _run_indexing(
|
||||
httpx_client=HttpxPool.get("vespa"),
|
||||
)
|
||||
|
||||
logger.error("Building indexing pipeline")
|
||||
indexing_pipeline = build_indexing_pipeline(
|
||||
embedder=embedding_model,
|
||||
information_content_classification_model=information_content_classification_model,
|
||||
@@ -447,6 +450,9 @@ def _run_indexing(
|
||||
for document_batch, failure, next_checkpoint in connector_runner.run(
|
||||
checkpoint
|
||||
):
|
||||
logger.info(
|
||||
f"Document batch: {len(document_batch) if document_batch else 0}"
|
||||
)
|
||||
# Check if connector is disabled mid run and stop if so unless it's the secondary
|
||||
# index being built. We want to populate it even for paused connectors
|
||||
# Often paused connectors are sources that aren't updated frequently but the
|
||||
@@ -467,6 +473,7 @@ def _run_indexing(
|
||||
db_session_temp, ctx, index_attempt_id
|
||||
)
|
||||
|
||||
logger.info(f"Maybe failure: {failure}")
|
||||
# save record of any failures at the connector level
|
||||
if failure is not None:
|
||||
total_failures += 1
|
||||
@@ -482,6 +489,7 @@ def _run_indexing(
|
||||
total_failures, document_count, batch_num, failure
|
||||
)
|
||||
|
||||
logger.info(f"Next checkpoint: {next_checkpoint}")
|
||||
# save the new checkpoint (if one is provided)
|
||||
if next_checkpoint:
|
||||
checkpoint = next_checkpoint
|
||||
@@ -513,7 +521,7 @@ def _run_indexing(
|
||||
f"threshold={INDEXING_SIZE_WARNING_THRESHOLD}"
|
||||
)
|
||||
|
||||
logger.debug(f"Indexing batch of documents: {batch_description}")
|
||||
logger.info(f"Indexing batch of documents: {batch_description}")
|
||||
|
||||
index_attempt_md.request_id = make_randomized_onyx_request_id("CIX")
|
||||
index_attempt_md.structured_id = (
|
||||
@@ -532,6 +540,10 @@ def _run_indexing(
|
||||
chunk_count += index_pipeline_result.total_chunks
|
||||
document_count += index_pipeline_result.total_docs
|
||||
|
||||
logger.info(
|
||||
f"From run_indexing: indexed {index_pipeline_result.total_docs} docs"
|
||||
)
|
||||
|
||||
# resolve errors for documents that were successfully indexed
|
||||
failed_document_ids = [
|
||||
failure.failed_document.document_id
|
||||
@@ -573,6 +585,8 @@ def _run_indexing(
|
||||
index_pipeline_result.failures[-1],
|
||||
)
|
||||
|
||||
logger.info("From run_indexing: updating indexed docs")
|
||||
|
||||
# This new value is updated every batch, so UI can refresh per batch update
|
||||
with get_session_with_current_tenant() as db_session_temp:
|
||||
# NOTE: Postgres uses the start of the transactions when computing `NOW()`
|
||||
@@ -782,7 +796,7 @@ def run_indexing_entrypoint(
|
||||
callback: IndexingHeartbeatInterface | None = None,
|
||||
) -> None:
|
||||
"""Don't swallow exceptions here ... propagate them up."""
|
||||
|
||||
logger.error("Starting indexing run: run_indexing_entrypoint")
|
||||
if is_ee:
|
||||
global_version.set_ee()
|
||||
|
||||
|
||||
@@ -117,6 +117,9 @@ class ConnectorRunner(Generic[CT]):
|
||||
yield None, failure, None
|
||||
|
||||
if len(self.doc_batch) >= self.batch_size:
|
||||
logger.info(
|
||||
f"From runner: Yielding batch of {len(self.doc_batch)} documents"
|
||||
)
|
||||
yield self.doc_batch, None, None
|
||||
self.doc_batch = []
|
||||
|
||||
@@ -126,6 +129,7 @@ class ConnectorRunner(Generic[CT]):
|
||||
self.doc_batch = []
|
||||
|
||||
yield None, None, next_checkpoint
|
||||
logger.info("From runner: Yielding next checkpoint")
|
||||
|
||||
logger.debug(
|
||||
f"Connector took {time.monotonic() - start} seconds to get to the next checkpoint."
|
||||
|
||||
@@ -10,6 +10,7 @@ from typing import cast
|
||||
from typing import Protocol
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from google.auth.exceptions import RefreshError # type: ignore
|
||||
from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
|
||||
from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore
|
||||
from googleapiclient.errors import HttpError # type: ignore
|
||||
@@ -17,7 +18,6 @@ from typing_extensions import override
|
||||
|
||||
from onyx.configs.app_configs import GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD
|
||||
from onyx.configs.app_configs import INDEX_BATCH_SIZE
|
||||
from onyx.configs.app_configs import MAX_DRIVE_WORKERS
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.connectors.exceptions import ConnectorValidationError
|
||||
from onyx.connectors.exceptions import CredentialExpiredError
|
||||
@@ -66,13 +66,17 @@ from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.retry_wrapper import retry_builder
|
||||
from onyx.utils.threadpool_concurrency import parallel_yield
|
||||
from onyx.utils.threadpool_concurrency import run_functions_tuples_in_parallel
|
||||
from onyx.utils.threadpool_concurrency import run_with_timeout
|
||||
from onyx.utils.threadpool_concurrency import ThreadSafeDict
|
||||
|
||||
logger = setup_logger()
|
||||
# TODO: Improve this by using the batch utility: https://googleapis.github.io/google-api-python-client/docs/batch.html
|
||||
# All file retrievals could be batched and made at once
|
||||
|
||||
BATCHES_PER_CHECKPOINT = 10
|
||||
MAX_DRIVE_WORKERS = 4
|
||||
BATCHES_PER_CHECKPOINT = 1
|
||||
|
||||
DRIVE_BATCH_SIZE = 80
|
||||
|
||||
|
||||
def _extract_str_list_from_comma_str(string: str | None) -> list[str]:
|
||||
@@ -120,6 +124,7 @@ def add_retrieval_info(
|
||||
parent_id: str | None = None,
|
||||
) -> Iterator[RetrievedDriveFile]:
|
||||
for file in drive_files:
|
||||
logger.info(f"Adding retrieval info for file: {file.get('id')} as {user_email}")
|
||||
yield RetrievedDriveFile(
|
||||
drive_file=file,
|
||||
user_email=user_email,
|
||||
@@ -184,8 +189,6 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
"shared_folder_urls, or my_drive_emails"
|
||||
)
|
||||
|
||||
self.batch_size = batch_size
|
||||
|
||||
specific_requests_made = False
|
||||
if bool(shared_drive_urls) or bool(my_drive_emails) or bool(shared_folder_urls):
|
||||
specific_requests_made = True
|
||||
@@ -306,14 +309,14 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
return user_emails
|
||||
|
||||
def get_all_drive_ids(self) -> set[str]:
|
||||
primary_drive_service = get_drive_service(
|
||||
creds=self.creds,
|
||||
user_email=self.primary_admin_email,
|
||||
)
|
||||
return self._get_all_drives_for_user(self.primary_admin_email)
|
||||
|
||||
def _get_all_drives_for_user(self, user_email: str) -> set[str]:
|
||||
drive_service = get_drive_service(self.creds, user_email)
|
||||
is_service_account = isinstance(self.creds, ServiceAccountCredentials)
|
||||
all_drive_ids = set()
|
||||
all_drive_ids: set[str] = set()
|
||||
for drive in execute_paginated_retrieval(
|
||||
retrieval_function=primary_drive_service.drives().list,
|
||||
retrieval_function=drive_service.drives().list,
|
||||
list_key="drives",
|
||||
useDomainAdminAccess=is_service_account,
|
||||
fields="drives(id),nextPageToken",
|
||||
@@ -367,12 +370,19 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
|
||||
def record_drive_processing(drive_id: str) -> None:
|
||||
with cv:
|
||||
logger.info(
|
||||
f"Record drive processing for drive id: {drive_id}, user email: {thread_id}"
|
||||
)
|
||||
completion.processed_drive_ids.add(drive_id)
|
||||
drive_id_status[drive_id] = (
|
||||
DriveIdStatus.FINISHED
|
||||
if drive_id in self._retrieved_folder_and_drive_ids
|
||||
else DriveIdStatus.AVAILABLE
|
||||
)
|
||||
logger.info(
|
||||
f"Drive id status: {len(drive_id_status)}, user email: {thread_id},"
|
||||
f"processed drive ids: {len(completion.processed_drive_ids)}"
|
||||
)
|
||||
# wake up other threads waiting for work
|
||||
cv.notify_all()
|
||||
|
||||
@@ -423,16 +433,22 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
curr_stage = checkpoint.completion_map[user_email]
|
||||
resuming = True
|
||||
if curr_stage.stage == DriveRetrievalStage.START:
|
||||
logger.info(f"Setting stage to {DriveRetrievalStage.MY_DRIVE_FILES.value}")
|
||||
curr_stage.stage = DriveRetrievalStage.MY_DRIVE_FILES
|
||||
resuming = False
|
||||
drive_service = get_drive_service(self.creds, user_email)
|
||||
logger.info(f"Got drive service: {drive_service}")
|
||||
|
||||
# validate that the user has access to the drive APIs by performing a simple
|
||||
# request and checking for a 401
|
||||
try:
|
||||
logger.info(f"Getting root folder id for user {user_email}")
|
||||
# default is ~17mins of retries, don't do that here for cases so we don't
|
||||
# waste 17mins every time we run into a user without access to drive APIs
|
||||
retry_builder(tries=3, delay=1)(get_root_folder_id)(drive_service)
|
||||
retry_builder(tries=3, delay=1)(
|
||||
lambda: run_with_timeout(30, get_root_folder_id, drive_service)
|
||||
)()
|
||||
logger.info(f"Got root folder id for user {user_email}")
|
||||
except HttpError as e:
|
||||
if e.status_code == 401:
|
||||
# fail gracefully, let the other impersonations continue
|
||||
@@ -445,14 +461,31 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
curr_stage.stage = DriveRetrievalStage.DONE
|
||||
return
|
||||
raise
|
||||
|
||||
except TimeoutError:
|
||||
logger.warning(
|
||||
f"User '{user_email}' timed out when trying to access the drive APIs."
|
||||
)
|
||||
# mark this user as done so we don't try to retrieve anything for them
|
||||
# again
|
||||
curr_stage.stage = DriveRetrievalStage.DONE
|
||||
return
|
||||
except RefreshError as e:
|
||||
logger.warning(
|
||||
f"User '{user_email}' could not refresh their token. Error: {e}"
|
||||
)
|
||||
# mark this user as done so we don't try to retrieve anything for them
|
||||
# again
|
||||
curr_stage.stage = DriveRetrievalStage.DONE
|
||||
return
|
||||
# if we are including my drives, try to get the current user's my
|
||||
# drive if any of the following are true:
|
||||
# - include_my_drives is true
|
||||
# - the current user's email is in the requested emails
|
||||
if curr_stage.stage == DriveRetrievalStage.MY_DRIVE_FILES:
|
||||
if self.include_my_drives or user_email in self._requested_my_drive_emails:
|
||||
logger.info(f"Getting all files in my drive as '{user_email}'")
|
||||
logger.info(
|
||||
f"Getting all files in my drive as '{user_email}. Resuming: {resuming}"
|
||||
)
|
||||
|
||||
yield from add_retrieval_info(
|
||||
get_all_files_in_my_drive_and_shared(
|
||||
@@ -505,7 +538,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
|
||||
for drive_id in concurrent_drive_itr(user_email):
|
||||
logger.info(
|
||||
f"Getting files in shared drive '{drive_id}' as '{user_email}'"
|
||||
f"Getting files in shared drive '{drive_id}' as '{user_email}. Resuming: {resuming}"
|
||||
)
|
||||
curr_stage.completed_until = 0
|
||||
curr_stage.current_folder_or_drive_id = drive_id
|
||||
@@ -593,6 +626,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
sorted_drive_ids, sorted_folder_ids = self._determine_retrieval_ids(
|
||||
checkpoint, is_slim, DriveRetrievalStage.MY_DRIVE_FILES
|
||||
)
|
||||
all_drive_ids = set(sorted_drive_ids)
|
||||
|
||||
# Setup initial completion map on first connector run
|
||||
for email in all_org_emails:
|
||||
@@ -602,6 +636,8 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
checkpoint.completion_map[email] = StageCompletion(
|
||||
stage=DriveRetrievalStage.START,
|
||||
completed_until=0,
|
||||
processed_drive_ids=all_drive_ids
|
||||
- self._get_all_drives_for_user(email),
|
||||
)
|
||||
|
||||
# we've found all users and drives, now time to actually start
|
||||
@@ -627,7 +663,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
# to the drive APIs. Without this, we could loop through these emails for
|
||||
# more than 3 hours, causing a timeout and stalling progress.
|
||||
email_batch_takes_us_to_completion = True
|
||||
MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING = 50
|
||||
MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING = MAX_DRIVE_WORKERS
|
||||
if len(non_completed_org_emails) > MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING:
|
||||
non_completed_org_emails = non_completed_org_emails[
|
||||
:MAX_EMAILS_TO_PROCESS_BEFORE_CHECKPOINTING
|
||||
@@ -646,7 +682,22 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
)
|
||||
for email in non_completed_org_emails
|
||||
]
|
||||
yield from parallel_yield(user_retrieval_gens, max_workers=MAX_DRIVE_WORKERS)
|
||||
for drive_file in parallel_yield(
|
||||
user_retrieval_gens, max_workers=MAX_DRIVE_WORKERS
|
||||
):
|
||||
logger.info(f"Yielding thing: {drive_file.drive_file.get('id')}")
|
||||
yield drive_file
|
||||
# while user_retrieval_gens:
|
||||
# done_inds = []
|
||||
# for ind, gen in enumerate(user_retrieval_gens):
|
||||
# drive_file = next(gen, None)
|
||||
# if drive_file is None:
|
||||
# done_inds.append(ind)
|
||||
# else:
|
||||
# logger.info(f"Yielding thing: {drive_file.drive_file.get('id')}")
|
||||
# yield drive_file
|
||||
# for ind in done_inds[::-1]:
|
||||
# user_retrieval_gens.pop(ind)
|
||||
|
||||
# if there are more emails to process, don't mark as complete
|
||||
if not email_batch_takes_us_to_completion:
|
||||
@@ -871,6 +922,10 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
return
|
||||
|
||||
for file in drive_files:
|
||||
logger.info(
|
||||
f"Updating checkpoint for file: {file.drive_file.get('name')}. "
|
||||
f"Seen: {file.drive_file.get('id') in checkpoint.all_retrieved_file_ids}"
|
||||
)
|
||||
checkpoint.completion_map[file.user_email].update(
|
||||
stage=file.completion_stage,
|
||||
completed_until=datetime.fromisoformat(
|
||||
@@ -881,6 +936,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
if file.drive_file["id"] not in checkpoint.all_retrieved_file_ids:
|
||||
checkpoint.all_retrieved_file_ids.add(file.drive_file["id"])
|
||||
yield file
|
||||
logger.info("Done yielding from checkpointed retrieval")
|
||||
|
||||
def _manage_oauth_retrieval(
|
||||
self,
|
||||
@@ -981,6 +1037,9 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
end: SecondsSinceUnixEpoch | None = None,
|
||||
) -> Iterator[Document | ConnectorFailure]:
|
||||
try:
|
||||
import time
|
||||
|
||||
start_time = time.time()
|
||||
# Prepare a partial function with the credentials and admin email
|
||||
convert_func = partial(
|
||||
convert_drive_item_to_document,
|
||||
@@ -1008,11 +1067,15 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
)
|
||||
for file in files_batch
|
||||
]
|
||||
logger.info(f"Processing batch of {len(func_with_args)} files")
|
||||
logger.info(
|
||||
f"File names: {[file.drive_file.get('name') for file in files_batch]}"
|
||||
)
|
||||
results = cast(
|
||||
list[Document | ConnectorFailure | None],
|
||||
run_functions_tuples_in_parallel(func_with_args, max_workers=8),
|
||||
)
|
||||
|
||||
logger.info(f"Results: {len(results)}")
|
||||
docs_and_failures = [result for result in results if result is not None]
|
||||
|
||||
if docs_and_failures:
|
||||
@@ -1025,6 +1088,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
start=start,
|
||||
end=end,
|
||||
):
|
||||
logger.info(f"Retrieved file: {retrieved_file}")
|
||||
if retrieved_file.error is not None:
|
||||
failure_stage = retrieved_file.completion_stage.value
|
||||
failure_message = (
|
||||
@@ -1047,24 +1111,46 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
continue
|
||||
files_batch.append(retrieved_file)
|
||||
|
||||
if len(files_batch) < self.batch_size:
|
||||
if len(files_batch) < DRIVE_BATCH_SIZE:
|
||||
logger.info(
|
||||
f"Not Yielding batch of {len(files_batch)} files; "
|
||||
f"num seen doc ids: {len(checkpoint.all_retrieved_file_ids)}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
f"Yielding batch of {len(files_batch)} files; num seen doc ids: {len(checkpoint.all_retrieved_file_ids)}"
|
||||
f"Yielding batch of {len(files_batch)} files; "
|
||||
f"num seen doc ids: {len(checkpoint.all_retrieved_file_ids)}"
|
||||
)
|
||||
yield from _yield_batch(files_batch)
|
||||
files_batch = []
|
||||
|
||||
if batches_complete > BATCHES_PER_CHECKPOINT:
|
||||
checkpoint.retrieved_folder_and_drive_ids = (
|
||||
self._retrieved_folder_and_drive_ids
|
||||
)
|
||||
return # create a new checkpoint
|
||||
# if batches_complete > BATCHES_PER_CHECKPOINT:
|
||||
# logger.info(
|
||||
# f"Returning checkpoint after {batches_complete} batches; "
|
||||
# f"num seen doc ids: {len(checkpoint.all_retrieved_file_ids)}"
|
||||
# )
|
||||
# checkpoint.retrieved_folder_and_drive_ids = (
|
||||
# self._retrieved_folder_and_drive_ids
|
||||
# )
|
||||
# logger.info(
|
||||
# f"Time taken until checkpoint: {time.time() - start_time} for {batches_complete*DRIVE_BATCH_SIZE} files"
|
||||
# )
|
||||
# return # create a new checkpoint
|
||||
|
||||
logger.info(
|
||||
f"Processing remaining files: {[file.drive_file.get('name') for file in files_batch]}"
|
||||
)
|
||||
# Process any remaining files
|
||||
if files_batch:
|
||||
yield from _yield_batch(files_batch)
|
||||
checkpoint.retrieved_folder_and_drive_ids = (
|
||||
self._retrieved_folder_and_drive_ids
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Completed processing files. time taken: {time.time() - start_time}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(f"Error extracting documents from Google Drive: {e}")
|
||||
raise e
|
||||
@@ -1083,15 +1169,23 @@ class GoogleDriveConnector(SlimConnector, CheckpointedConnector[GoogleDriveCheck
|
||||
"Credentials missing, should not call this method before calling load_credentials"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Loading from checkpoint with completion stage: {checkpoint.completion_stage},"
|
||||
f"retrieved ids: {len(checkpoint.all_retrieved_file_ids)}"
|
||||
)
|
||||
checkpoint = copy.deepcopy(checkpoint)
|
||||
self._retrieved_folder_and_drive_ids = checkpoint.retrieved_folder_and_drive_ids
|
||||
try:
|
||||
yield from self._extract_docs_from_google_drive(checkpoint, start, end)
|
||||
logger.info("Done extracting docs from google drive for this checkpoint")
|
||||
except Exception as e:
|
||||
if MISSING_SCOPES_ERROR_STR in str(e):
|
||||
raise PermissionError(ONYX_SCOPE_INSTRUCTIONS) from e
|
||||
raise e
|
||||
checkpoint.retrieved_folder_and_drive_ids = self._retrieved_folder_and_drive_ids
|
||||
logger.info(
|
||||
f"Returning checkpoint with this many things seen: {len(checkpoint.all_retrieved_file_ids)}"
|
||||
)
|
||||
if checkpoint.completion_stage == DriveRetrievalStage.DONE:
|
||||
checkpoint.has_more = False
|
||||
return checkpoint
|
||||
|
||||
@@ -324,6 +324,9 @@ def convert_drive_item_to_document(
|
||||
if retriever_email in seen:
|
||||
continue
|
||||
seen.add(retriever_email)
|
||||
logger.info(
|
||||
f"Converting file {file.get('name')} to document as retriever: {retriever_email}"
|
||||
)
|
||||
doc_or_failure = _convert_drive_item_to_document(
|
||||
creds, allow_images, size_threshold, retriever_email, file
|
||||
)
|
||||
@@ -335,6 +338,7 @@ def convert_drive_item_to_document(
|
||||
and doc_or_failure.exception.status_code == 403
|
||||
)
|
||||
):
|
||||
logger.info(f" file {file.get('name')} -> {type(doc_or_failure)}")
|
||||
return doc_or_failure
|
||||
|
||||
if first_error is None:
|
||||
@@ -387,6 +391,7 @@ def _convert_drive_item_to_document(
|
||||
# If it's a Google Doc, we might do advanced parsing
|
||||
if file.get("mimeType") == GDriveMimeType.DOC.value:
|
||||
try:
|
||||
logger.info(f"Getting document sections for {file.get('name')}")
|
||||
# get_document_sections is the advanced approach for Google Docs
|
||||
doc_sections = get_document_sections(
|
||||
docs_service=docs_service(),
|
||||
@@ -412,6 +417,7 @@ def _convert_drive_item_to_document(
|
||||
except ValueError:
|
||||
logger.warning(f"Parsing string to int failed: size_str={size_str}")
|
||||
else:
|
||||
logger.info(f"File name: {file.get('name')}, File size: {size_int}")
|
||||
if size_int > size_threshold:
|
||||
logger.warning(
|
||||
f"{file.get('name')} exceeds size threshold of {size_threshold}. Skipping."
|
||||
@@ -420,6 +426,9 @@ def _convert_drive_item_to_document(
|
||||
|
||||
# If we don't have sections yet, use the basic extraction method
|
||||
if not sections:
|
||||
logger.info(
|
||||
f"Downloading and extracting sections (basic) for {file.get('name')}"
|
||||
)
|
||||
sections = _download_and_extract_sections_basic(
|
||||
file, drive_service(), allow_images
|
||||
)
|
||||
|
||||
@@ -236,6 +236,9 @@ def get_all_files_in_my_drive_and_shared(
|
||||
start: SecondsSinceUnixEpoch | None = None,
|
||||
end: SecondsSinceUnixEpoch | None = None,
|
||||
) -> Iterator[GoogleDriveFileType]:
|
||||
import time
|
||||
|
||||
t1 = time.time()
|
||||
kwargs = {}
|
||||
if not is_slim:
|
||||
kwargs[ORDER_BY_KEY] = GoogleFields.MODIFIED_TIME.value
|
||||
@@ -258,14 +261,16 @@ def get_all_files_in_my_drive_and_shared(
|
||||
found_folders = True
|
||||
if found_folders:
|
||||
update_traversed_ids_func(get_root_folder_id(service))
|
||||
|
||||
t2 = time.time()
|
||||
logger.info(f"Time taken to get all folders: {t2 - t1}")
|
||||
# Then get the files
|
||||
file_query = f"mimeType != '{DRIVE_FOLDER_TYPE}'"
|
||||
file_query += " and trashed = false"
|
||||
if not include_shared_with_me:
|
||||
file_query += " and 'me' in owners"
|
||||
file_query += _generate_time_range_filter(start, end)
|
||||
yield from execute_paginated_retrieval(
|
||||
logger.info("listing files as user")
|
||||
for drive_file in execute_paginated_retrieval(
|
||||
retrieval_function=service.files().list,
|
||||
list_key="files",
|
||||
continue_on_404_or_403=False,
|
||||
@@ -273,7 +278,11 @@ def get_all_files_in_my_drive_and_shared(
|
||||
fields=SLIM_FILE_FIELDS if is_slim else FILE_FIELDS,
|
||||
q=file_query,
|
||||
**kwargs,
|
||||
)
|
||||
):
|
||||
logger.info(f"Inner yielding drive file: {drive_file.get('id')}")
|
||||
yield drive_file
|
||||
t3 = time.time()
|
||||
logger.info(f"Time taken to get all files: {t3 - t2}")
|
||||
|
||||
|
||||
def get_all_files_for_oauth(
|
||||
|
||||
@@ -12,6 +12,7 @@ from googleapiclient.errors import HttpError # type: ignore
|
||||
from onyx.connectors.google_drive.models import GoogleDriveFileType
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.retry_wrapper import retry_builder
|
||||
from onyx.utils.threadpool_concurrency import run_with_timeout
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
@@ -176,11 +177,19 @@ def execute_paginated_retrieval(
|
||||
request_kwargs = kwargs.copy()
|
||||
if next_page_token:
|
||||
request_kwargs[PAGE_TOKEN_KEY] = next_page_token
|
||||
results = _execute_single_retrieval(
|
||||
retrieval_function,
|
||||
continue_on_404_or_403,
|
||||
**request_kwargs,
|
||||
)
|
||||
try:
|
||||
results = run_with_timeout(
|
||||
30,
|
||||
_execute_single_retrieval,
|
||||
retrieval_function,
|
||||
continue_on_404_or_403,
|
||||
**request_kwargs,
|
||||
)
|
||||
except TimeoutError:
|
||||
logger.warning(
|
||||
f"Paginated retrieval for {retrieval_function.__name__} timed out after 30 seconds"
|
||||
)
|
||||
break
|
||||
|
||||
next_page_token = results.get(NEXT_PAGE_TOKEN_KEY)
|
||||
if list_key:
|
||||
|
||||
@@ -327,17 +327,20 @@ def index_doc_batch_prepare(
|
||||
This precedes indexing it into the actual document index."""
|
||||
# Create a trimmed list of docs that don't have a newer updated at
|
||||
# Shortcuts the time-consuming flow on connector index retries
|
||||
logger.info(f"index_doc_batch_prepare: {len(documents)} documents")
|
||||
document_ids: list[str] = [document.id for document in documents]
|
||||
db_docs: list[DBDocument] = get_documents_by_ids(
|
||||
db_session=db_session,
|
||||
document_ids=document_ids,
|
||||
)
|
||||
logger.info(f"index_doc_batch_prepare: {len(db_docs)} db_docs")
|
||||
|
||||
updatable_docs = (
|
||||
get_doc_ids_to_update(documents=documents, db_docs=db_docs)
|
||||
if not ignore_time_skip
|
||||
else documents
|
||||
)
|
||||
logger.info(f"index_doc_batch_prepare: {len(updatable_docs)} updatable_docs")
|
||||
if len(updatable_docs) != len(documents):
|
||||
updatable_doc_ids = [doc.id for doc in updatable_docs]
|
||||
skipped_doc_ids = [
|
||||
@@ -718,6 +721,8 @@ def index_doc_batch(
|
||||
Returns a tuple where the first element is the number of new docs and the
|
||||
second element is the number of chunks."""
|
||||
|
||||
logger.error(f"index_doc_batch: {len(document_batch)} documents")
|
||||
|
||||
no_access = DocumentAccess.build(
|
||||
user_emails=[],
|
||||
user_groups=[],
|
||||
@@ -1057,6 +1062,7 @@ def build_indexing_pipeline(
|
||||
callback: IndexingHeartbeatInterface | None = None,
|
||||
) -> IndexingPipelineProtocol:
|
||||
"""Builds a pipeline which takes in a list (batch) of docs and indexes them."""
|
||||
logger.error("Building indexing pipeline")
|
||||
all_search_settings = get_active_search_settings(db_session)
|
||||
if (
|
||||
all_search_settings.secondary
|
||||
@@ -1067,10 +1073,12 @@ def build_indexing_pipeline(
|
||||
search_settings = all_search_settings.primary
|
||||
|
||||
multipass_config = get_multipass_config(search_settings)
|
||||
logger.error(f"multipass_config: {multipass_config}")
|
||||
|
||||
enable_contextual_rag = (
|
||||
search_settings.enable_contextual_rag or ENABLE_CONTEXTUAL_RAG
|
||||
)
|
||||
logger.error(f"enable_contextual_rag: {enable_contextual_rag}")
|
||||
llm = None
|
||||
if enable_contextual_rag:
|
||||
llm = get_llm_for_contextual_rag(
|
||||
|
||||
@@ -223,6 +223,7 @@ def run_functions_tuples_in_parallel(
|
||||
raise
|
||||
|
||||
results.sort(key=lambda x: x[0])
|
||||
logger.info(f"Results from run in parallel: {len(results)}")
|
||||
return [result for index, result in results]
|
||||
|
||||
|
||||
@@ -297,6 +298,9 @@ class TimeoutThread(threading.Thread, Generic[R]):
|
||||
self.exception = e
|
||||
|
||||
def end(self) -> None:
|
||||
logger.info(
|
||||
f"Function {self.func.__name__} timed out after {self.timeout} seconds"
|
||||
)
|
||||
raise TimeoutError(
|
||||
f"Function {self.func.__name__} timed out after {self.timeout} seconds"
|
||||
)
|
||||
@@ -374,7 +378,9 @@ def parallel_yield(gens: list[Iterator[R]], max_workers: int = 10) -> Iterator[R
|
||||
|
||||
next_ind = len(gens)
|
||||
while future_to_index:
|
||||
logger.info(f"Waiting for {len(future_to_index)} futures")
|
||||
done, _ = wait(future_to_index, return_when=FIRST_COMPLETED)
|
||||
logger.info(f"Done: {len(done)}")
|
||||
for future in done:
|
||||
ind, result = future.result()
|
||||
if result is not None:
|
||||
|
||||
Reference in New Issue
Block a user