Compare commits

...

1 Commits

Author SHA1 Message Date
Richard Kuo (Danswer)
05cd0a5ef5 select only doc_id 2025-02-05 18:12:01 -08:00
2 changed files with 33 additions and 8 deletions

View File

@@ -105,6 +105,32 @@ def construct_document_select_for_connector_credential_pair_by_needs_sync(
return stmt
def construct_document_id_select_for_connector_credential_pair_by_needs_sync(
connector_id: int, credential_id: int
) -> Select:
initial_doc_ids_stmt = select(DocumentByConnectorCredentialPair.id).where(
and_(
DocumentByConnectorCredentialPair.connector_id == connector_id,
DocumentByConnectorCredentialPair.credential_id == credential_id,
)
)
stmt = (
select(DbDocument.id)
.where(
DbDocument.id.in_(initial_doc_ids_stmt),
or_(
DbDocument.last_modified
> DbDocument.last_synced, # last_modified is newer than last_synced
DbDocument.last_synced.is_(None), # never synced
),
)
.distinct()
)
return stmt
def get_all_documents_needing_vespa_sync_for_cc_pair(
db_session: Session, cc_pair_id: int
) -> list[DbDocument]:

View File

@@ -16,9 +16,8 @@ from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.document import (
construct_document_select_for_connector_credential_pair_by_needs_sync,
construct_document_id_select_for_connector_credential_pair_by_needs_sync,
)
from onyx.db.models import Document
from onyx.redis.redis_object_helper import RedisObjectHelper
@@ -80,14 +79,14 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
if not cc_pair:
return None
stmt = construct_document_select_for_connector_credential_pair_by_needs_sync(
stmt = construct_document_id_select_for_connector_credential_pair_by_needs_sync(
cc_pair.connector_id, cc_pair.credential_id
)
num_docs = 0
for doc in db_session.scalars(stmt).yield_per(DB_YIELD_PER_DEFAULT):
doc = cast(Document, doc)
for doc_id in db_session.scalars(stmt).yield_per(DB_YIELD_PER_DEFAULT):
doc_id = cast(str, doc_id)
current_time = time.monotonic()
if current_time - last_lock_time >= (
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
@@ -98,7 +97,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
num_docs += 1
# check if we should skip the document (typically because it's already syncing)
if doc.id in self.skip_docs:
if doc_id in self.skip_docs:
continue
# celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
@@ -116,14 +115,14 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
# Priority on sync's triggered by new indexing should be medium
result = celery_app.send_task(
OnyxCeleryTask.VESPA_METADATA_SYNC_TASK,
kwargs=dict(document_id=doc.id, tenant_id=tenant_id),
kwargs=dict(document_id=doc_id, tenant_id=tenant_id),
queue=OnyxCeleryQueues.VESPA_METADATA_SYNC,
task_id=custom_task_id,
priority=OnyxCeleryPriority.MEDIUM,
)
async_results.append(result)
self.skip_docs.add(doc.id)
self.skip_docs.add(doc_id)
if len(async_results) >= max_tasks:
break