Compare commits

...

1 Commits

Author SHA1 Message Date
Weves
6426b80abb Add logging to investigate slow primary work 2025-06-28 13:42:03 -07:00
2 changed files with 37 additions and 0 deletions

View File

@@ -422,6 +422,7 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
# SPECIAL 0/3: sync lookup table for active fences
# we want to run this less frequently than the overall task
if not redis_client.exists(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE):
task_logger.info("check_for_indexing - building fence lookup table")
# build a lookup table of existing fences
# this is just a migration concern and should be unnecessary once
# lookup tables are rolled out
@@ -440,6 +441,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
ex=OnyxRuntime.get_build_fence_lookup_table_interval(),
)
task_logger.info("check_for_indexing - fence lookup table build complete")
# 1/3: KICKOFF
# check for search settings swap
@@ -461,6 +464,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
embedding_model=embedding_model,
)
task_logger.info("check_for_indexing - search settings check and swap complete")
# gather cc_pair_ids
lock_beat.reacquire()
cc_pair_ids: list[int] = []
@@ -471,6 +476,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
for cc_pair_entry in cc_pairs:
cc_pair_ids.append(cc_pair_entry.id)
task_logger.info("check_for_indexing - cc_pair_ids gather complete")
# mark CC Pairs that are repeatedly failing as in repeated error state
with get_session_with_current_tenant() as db_session:
current_search_settings = get_current_search_settings(db_session)
@@ -486,6 +493,10 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
in_repeated_error_state=True,
)
task_logger.info(
"check_for_indexing - cc_pair_ids repeated error state marking complete"
)
# kick off index attempts
for cc_pair_id in cc_pair_ids:
lock_beat.reacquire()
@@ -588,6 +599,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
f"search_settings={search_settings_instance.id}"
)
task_logger.info("check_for_indexing - indexing task kickoff complete")
lock_beat.reacquire()
# 2/3: VALIDATE
@@ -617,12 +630,17 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
attempt.id, db_session, failure_reason=failure_reason
)
task_logger.info(
"check_for_indexing - unfenced index attempts validation complete"
)
lock_beat.reacquire()
# we want to run this less frequently than the overall task
if not redis_client.exists(OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES):
# clear any indexing fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
task_logger.info("check_for_indexing - validating all indexing fences")
try:
validate_indexing_fences(
tenant_id, redis_client_replica, redis_client_celery, lock_beat
@@ -632,6 +650,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
redis_client.set(OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES, 1, ex=60)
task_logger.info("check_for_indexing - validating all indexing fences complete")
# 3/3: FINALIZE
lock_beat.reacquire()
keys = cast(
@@ -647,10 +667,15 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
key_str = key_bytes.decode("utf-8")
if key_str.startswith(RedisConnectorIndex.FENCE_PREFIX):
with get_session_with_current_tenant() as db_session:
task_logger.info(
f"check_for_indexing - monitoring indexing taskset: {key_str}"
)
monitor_ccpair_indexing_taskset(
tenant_id, key_bytes, redis_client_replica, db_session
)
task_logger.info("check_for_indexing - indexing taskset monitoring complete")
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."

View File

@@ -116,6 +116,8 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
bind=True,
)
def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
task_logger.info("check_for_pruning - Starting")
r = get_redis_client()
r_replica = get_redis_replica_client()
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
@@ -164,9 +166,12 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
)
r.set(OnyxRedisSignals.BLOCK_PRUNING, 1, ex=3600)
task_logger.info("check_for_pruning - pruning task kickoff complete")
# we want to run this less frequently than the overall task
lock_beat.reacquire()
if not r.exists(OnyxRedisSignals.BLOCK_VALIDATE_PRUNING_FENCES):
task_logger.info("check_for_pruning - validating pruning fences")
# clear any pruning fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
@@ -177,6 +182,8 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
r.set(OnyxRedisSignals.BLOCK_VALIDATE_PRUNING_FENCES, 1, ex=300)
task_logger.info("check_for_pruning - pruning fence validation complete")
# use a lookup table to find active fences. We still have to verify that each fence
# exists, since the lookup table is an optimization and not the source of truth.
lock_beat.reacquire()
@@ -190,8 +197,13 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
key_str = key_bytes.decode("utf-8")
if key_str.startswith(RedisConnectorPrune.FENCE_PREFIX):
task_logger.info(
f"check_for_pruning - monitoring pruning taskset: {key_str}"
)
with get_session_with_current_tenant() as db_session:
monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session)
task_logger.info("check_for_pruning - pruning taskset monitoring complete")
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."