Compare commits

...

2 Commits

Author SHA1 Message Date
Richard Kuo (Danswer)
12dcf6a09b add more logging 2025-01-27 14:29:47 -08:00
Richard Kuo (Danswer)
c1a4eea2d0 add timings for syncing 2025-01-27 11:54:17 -08:00
2 changed files with 44 additions and 19 deletions

View File

@@ -1092,30 +1092,42 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
def vespa_metadata_sync_task(
self: Task, document_id: str, tenant_id: str | None
) -> bool:
timings: dict[str, Any] = {}
start = time.monotonic()
timings["start"] = start
try:
with get_session_with_tenant(tenant_id) as db_session:
phase_start = time.monotonic()
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
)
timings["get_index"] = time.monotonic() - phase_start
phase_start = time.monotonic()
retry_index = RetryDocumentIndex(doc_index)
doc = get_document(document_id, db_session)
if not doc:
return False
timings["get_document"] = time.monotonic() - phase_start
# document set sync
phase_start = time.monotonic()
doc_sets = fetch_document_sets_for_document(document_id, db_session)
update_doc_sets: set[str] = set(doc_sets)
timings["fetch_document_sets_for_document"] = time.monotonic() - phase_start
# User group sync
phase_start = time.monotonic()
doc_access = get_access_for_document(
document_id=document_id, db_session=db_session
)
timings["get_access_for_document"] = time.monotonic() - phase_start
phase_start = time.monotonic()
fields = VespaDocumentFields(
document_sets=update_doc_sets,
access=doc_access,
@@ -1130,10 +1142,13 @@ def vespa_metadata_sync_task(
chunk_count=doc.chunk_count,
fields=fields,
)
timings["index.update_single"] = time.monotonic() - phase_start
# update db last. Worst case = we crash right before this and
# the sync might repeat again later
phase_start = time.monotonic()
mark_document_as_synced(document_id, db_session)
timings["mark_document_as_synced"] = time.monotonic() - phase_start
# this code checks for and removes a per document sync key that is
# used to block out the same doc from continually resyncing
@@ -1149,7 +1164,8 @@ def vespa_metadata_sync_task(
f"doc={document_id} "
f"action=sync "
f"chunks={chunks_affected} "
f"elapsed={elapsed:.2f}"
f"elapsed={elapsed:.2f} "
f"debug_timings={timings}"
)
except SoftTimeLimitExceeded:
task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")

View File

@@ -10,6 +10,7 @@ import zipfile
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
from typing import Any
from typing import BinaryIO
from typing import cast
from typing import List
@@ -523,6 +524,7 @@ class VespaIndex(DocumentIndex):
index_name: str,
fields: VespaDocumentFields,
doc_id: str,
http_client: httpx.Client,
) -> None:
"""
Update a single "chunk" (document) in Vespa using its chunk ID.
@@ -554,18 +556,17 @@ class VespaIndex(DocumentIndex):
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}?create=true"
with get_vespa_http_client(http2=False) as http_client:
try:
resp = http_client.put(
vespa_url,
headers={"Content-Type": "application/json"},
json=update_dict,
)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}"
logger.error(error_message)
raise
try:
resp = http_client.put(
vespa_url,
headers={"Content-Type": "application/json"},
json=update_dict,
)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}"
logger.error(error_message)
raise
def update_single(
self,
@@ -580,6 +581,8 @@ class VespaIndex(DocumentIndex):
Handle other exceptions if you wish to implement retry behavior
"""
timings: dict[str, Any] = {}
doc_chunk_count = 0
index_names = [self.index_name]
@@ -588,6 +591,7 @@ class VespaIndex(DocumentIndex):
with get_vespa_http_client(http2=False) as http_client:
for index_name in index_names:
phase_start = time.monotonic()
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
multipass_config = get_multipass_config(
db_session=db_session,
@@ -609,15 +613,20 @@ class VespaIndex(DocumentIndex):
)
doc_chunk_count += len(doc_chunk_ids)
timings["prep"] = time.monotonic() - phase_start
phase_start = time.monotonic()
chunk = 0
for doc_chunk_id in doc_chunk_ids:
self.update_single_chunk(
doc_chunk_id=doc_chunk_id,
index_name=index_name,
fields=fields,
doc_id=doc_id,
)
chunk += 1
phase_start = time.monotonic()
self.update_single_chunk(
doc_chunk_id, index_name, fields, doc_id, http_client
)
timings[f"chunk_{chunk}"] = time.monotonic() - phase_start
logger.debug(f"timings={timings}")
return doc_chunk_count
def delete_single(