
fix: remove erroneous error case and add valid error (#5163)

* fix: remove erroneous error case and add valid error

* also address docfetching-docprocessing limbo
Evan Lohn
2025-08-07 11:17:00 -07:00
committed by GitHub
parent e1a305d18a
commit 1b47fa2700
12 changed files with 108 additions and 199 deletions

View File

@@ -32,7 +32,6 @@ from onyx.db.indexing_coordination import IndexingCoordination
from onyx.redis.redis_connector_delete import RedisConnectorDelete
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from onyx.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync
from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_connector_prune import RedisConnectorPrune
from onyx.redis.redis_connector_stop import RedisConnectorStop
from onyx.redis.redis_document_set import RedisDocumentSet
@@ -161,7 +160,6 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
RedisUserGroup.reset_all(r)
RedisConnectorDelete.reset_all(r)
RedisConnectorPrune.reset_all(r)
RedisConnectorIndex.reset_all(r)
RedisConnectorStop.reset_all(r)
RedisConnectorPermissionSync.reset_all(r)
RedisConnectorExternalGroupSync.reset_all(r)

View File

@@ -193,12 +193,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str) -> bool | N
task_logger.info(
"Timed out waiting for tasks blocking deletion. Resetting blocking fences."
)
search_settings_list = get_all_search_settings(db_session)
for search_settings in search_settings_list:
redis_connector_index = redis_connector.new_index(
search_settings.id
)
redis_connector_index.reset()
redis_connector.prune.reset()
redis_connector.permissions.reset()
redis_connector.external_group_sync.reset()

View File

@@ -2,7 +2,6 @@ import multiprocessing
import os
import time
import traceback
from http import HTTPStatus
from time import sleep
import sentry_sdk
@@ -22,7 +21,7 @@ from onyx.background.celery.tasks.models import SimpleJobResult
from onyx.background.indexing.job_client import SimpleJob
from onyx.background.indexing.job_client import SimpleJobClient
from onyx.background.indexing.job_client import SimpleJobException
from onyx.background.indexing.run_docfetching import run_indexing_entrypoint
from onyx.background.indexing.run_docfetching import run_docfetching_entrypoint
from onyx.configs.constants import CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT
from onyx.configs.constants import OnyxCeleryTask
from onyx.connectors.exceptions import ConnectorValidationError
@@ -34,7 +33,6 @@ from onyx.db.index_attempt import mark_attempt_canceled
from onyx.db.index_attempt import mark_attempt_failed
from onyx.db.indexing_coordination import IndexingCoordination
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import global_version
from shared_configs.configs import SENTRY_DSN
@@ -156,7 +154,6 @@ def _docfetching_task(
)
redis_connector = RedisConnector(tenant_id, cc_pair_id)
redis_connector.new_index(search_settings_id)
# TODO: remove all fences; have all signals be set in postgres instead
if redis_connector.delete.fenced:
@@ -214,7 +211,7 @@ def _docfetching_task(
)
# This is where the heavy/real work happens
run_indexing_entrypoint(
run_docfetching_entrypoint(
app,
index_attempt_id,
tenant_id,
@@ -261,7 +258,7 @@ def _docfetching_task(
def process_job_result(
job: SimpleJob,
connector_source: str | None,
redis_connector_index: RedisConnectorIndex,
index_attempt_id: int,
log_builder: ConnectorIndexingLogBuilder,
) -> SimpleJobResult:
result = SimpleJobResult()
@@ -278,13 +275,11 @@ def process_job_result(
# In EKS, there is an edge case where successful tasks return exit
# code 1 in the cloud due to the set_spawn_method not sticking.
# We've since worked around this, but the following is a safe way to
# work around this issue. Basically, we ignore the job error state
# if the completion signal is OK.
status_int = redis_connector_index.get_completion()
if status_int:
status_enum = HTTPStatus(status_int)
if status_enum == HTTPStatus.OK:
# Workaround: check that the total number of batches is set, since this only
# happens when docfetching completed successfully
with get_session_with_current_tenant() as db_session:
index_attempt = get_index_attempt(db_session, index_attempt_id)
if index_attempt and index_attempt.total_batches is not None:
ignore_exitcode = True
if ignore_exitcode:
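
The removed Redis completion signal is replaced by a Postgres check: docfetching writes total_batches only when it finishes successfully, so its presence is enough to safely ignore a spurious nonzero exit code. A minimal sketch of the new test, assuming the helper import paths match those used elsewhere in this diff:

from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.index_attempt import get_index_attempt

def docfetching_succeeded(index_attempt_id: int) -> bool:
    # total_batches is only set once docfetching completes successfully, so a
    # non-None value means the job's exit code can be ignored as a false alarm.
    with get_session_with_current_tenant() as db_session:
        index_attempt = get_index_attempt(db_session, index_attempt_id)
        return index_attempt is not None and index_attempt.total_batches is not None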
@@ -458,9 +453,6 @@ def docfetching_proxy_task(
)
)
redis_connector = RedisConnector(tenant_id, cc_pair_id)
redis_connector_index = redis_connector.new_index(search_settings_id)
# Track the last time memory info was emitted
last_memory_emit_time = 0.0
@@ -487,7 +479,7 @@ def docfetching_proxy_task(
if job.done():
try:
result = process_job_result(
job, result.connector_source, redis_connector_index, log_builder
job, result.connector_source, index_attempt_id, log_builder
)
except Exception:
task_logger.exception(

View File

@@ -4,7 +4,6 @@ from collections import defaultdict
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from http import HTTPStatus
from typing import Any
from celery import shared_task
@@ -16,6 +15,8 @@ from sqlalchemy import select
from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
from onyx.background.celery.tasks.docprocessing.heartbeat import start_heartbeat
@@ -66,6 +67,7 @@ from onyx.db.index_attempt import mark_attempt_failed
from onyx.db.index_attempt import mark_attempt_partially_succeeded
from onyx.db.index_attempt import mark_attempt_succeeded
from onyx.db.indexing_coordination import CoordinationStatus
from onyx.db.indexing_coordination import INDEXING_PROGRESS_TIMEOUT_HOURS
from onyx.db.indexing_coordination import IndexingCoordination
from onyx.db.models import IndexAttempt
from onyx.db.search_settings import get_active_search_settings_list
@@ -102,6 +104,7 @@ from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
logger = setup_logger()
USER_FILE_INDEXING_LIMIT = 100
DOCPROCESSING_STALL_TIMEOUT_MULTIPLIER = 4
def _get_fence_validation_block_expiration() -> int:
@@ -257,7 +260,7 @@ class ConnectorIndexingLogBuilder:
def monitor_indexing_attempt_progress(
attempt: IndexAttempt, tenant_id: str, db_session: Session
attempt: IndexAttempt, tenant_id: str, db_session: Session, task: Task
) -> None:
"""
TODO: rewrite this docstring
@@ -316,7 +319,9 @@ def monitor_indexing_attempt_progress(
# Check task completion using Celery
try:
check_indexing_completion(attempt.id, coordination_status, storage, tenant_id)
check_indexing_completion(
attempt.id, coordination_status, storage, tenant_id, task
)
except Exception as e:
logger.exception(
f"Failed to monitor document processing completion: "
@@ -350,6 +355,7 @@ def check_indexing_completion(
coordination_status: CoordinationStatus,
storage: DocumentBatchStorage,
tenant_id: str,
task: Task,
) -> None:
logger.info(
@@ -376,21 +382,78 @@ def check_indexing_completion(
# Update progress tracking and check for stalls
with get_session_with_current_tenant() as db_session:
# Update progress tracking
stalled_timeout_hours = INDEXING_PROGRESS_TIMEOUT_HOURS
# Index attempts that are waiting between docfetching and
# docprocessing get a generous stalling timeout
if batches_total is not None and batches_processed == 0:
stalled_timeout_hours = (
stalled_timeout_hours * DOCPROCESSING_STALL_TIMEOUT_MULTIPLIER
)
timed_out = not IndexingCoordination.update_progress_tracking(
db_session, index_attempt_id, batches_processed
db_session,
index_attempt_id,
batches_processed,
timeout_hours=stalled_timeout_hours,
)
# Check for stalls (3-6 hour timeout). Only applies to in-progress attempts.
attempt = get_index_attempt(db_session, index_attempt_id)
if timed_out and attempt and attempt.status == IndexingStatus.IN_PROGRESS:
logger.error(
f"Indexing attempt {index_attempt_id} has been indexing for 3-6 hours without progress. "
f"Marking it as failed."
)
mark_attempt_failed(
index_attempt_id, db_session, failure_reason="Stalled indexing"
)
if attempt and timed_out:
if attempt.status == IndexingStatus.IN_PROGRESS:
logger.error(
f"Indexing attempt {index_attempt_id} has been indexing for "
f"{stalled_timeout_hours//2}-{stalled_timeout_hours} hours without progress. "
f"Marking it as failed."
)
mark_attempt_failed(
index_attempt_id, db_session, failure_reason="Stalled indexing"
)
elif (
attempt.status == IndexingStatus.NOT_STARTED and attempt.celery_task_id
):
# Check if the task exists in the celery queue
# This handles the case where Redis dies after task creation but before task execution
redis_celery = task.app.broker_connection().channel().client # type: ignore
task_exists = celery_find_task(
attempt.celery_task_id,
OnyxCeleryQueues.CONNECTOR_DOC_FETCHING,
redis_celery,
)
unacked_task_ids = celery_get_unacked_task_ids(
OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, redis_celery
)
if not task_exists and attempt.celery_task_id not in unacked_task_ids:
# there is a race condition where the docfetching task has been taken off
# the queue (i.e. started) but the indexing attempt still has a status of
# NOT_STARTED because the switch to IN_PROGRESS takes ~0.1 seconds.
# sleep a bit and confirm that the attempt is still not in progress.
time.sleep(1)
attempt = get_index_attempt(db_session, index_attempt_id)
if attempt and attempt.status == IndexingStatus.NOT_STARTED:
logger.error(
f"Task {attempt.celery_task_id} attached to indexing attempt "
f"{index_attempt_id} does not exist in the queue. "
f"Marking indexing attempt as failed."
)
mark_attempt_failed(
index_attempt_id,
db_session,
failure_reason="Task not in queue",
)
else:
logger.info(
f"Indexing attempt {index_attempt_id} is {attempt.status}. 3-6 hours without heartbeat "
"but task is in the queue. Likely underprovisioned docfetching worker."
)
# Update last progress time so we won't time out again for another 3 hours
IndexingCoordination.update_progress_tracking(
db_session,
index_attempt_id,
batches_processed,
force_update_progress=True,
)
# check again on the next check_for_indexing task
# TODO: on the cloud this is currently 25 minutes at most, which
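
The new NOT_STARTED branch covers the case where Redis dies after the docfetching task is created but before a worker picks it up: if the attempt's celery task is neither waiting in the queue nor sitting unacked, it can never start, so the attempt is failed with the new "Task not in queue" reason. A hedged sketch of how such a membership test can work against Celery's Redis broker (the JSON-list layout is the transport's default; onyx's celery_find_task helper wraps the same idea):

import json
import redis

def task_waiting_in_queue(r: redis.Redis, queue_name: str, task_id: str) -> bool:
    # Celery's Redis transport stores pending messages as JSON blobs in a
    # list named after the queue; the task id is carried in the headers.
    for raw in r.lrange(queue_name, 0, -1):
        headers = json.loads(raw).get("headers") or {}
        if headers.get("id") == task_id:
            return True
    return False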
@@ -450,15 +513,6 @@ def check_indexing_completion(
db_session=db_session,
)
# TODO: make it so we don't need this (might already be true)
redis_connector = RedisConnector(
tenant_id, attempt.connector_credential_pair_id
)
redis_connector_index = redis_connector.new_index(
attempt.search_settings_id
)
redis_connector_index.set_generator_complete(HTTPStatus.OK.value)
# Clean up FileStore storage (still needed for document batches during transition)
try:
logger.info(f"Cleaning up storage after indexing completion: {storage}")
@@ -812,7 +866,9 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
for attempt in active_attempts:
try:
monitor_indexing_attempt_progress(attempt, tenant_id, db_session)
monitor_indexing_attempt_progress(
attempt, tenant_id, db_session, self
)
except Exception:
task_logger.exception(f"Error monitoring attempt {attempt.id}")
@@ -1086,12 +1142,8 @@ def _docprocessing_task(
f"Index attempt {index_attempt_id} is not running, status {index_attempt.status}"
)
redis_connector_index = redis_connector.new_index(
index_attempt.search_settings.id
)
cross_batch_db_lock: RedisLock = r.lock(
redis_connector_index.db_lock_key,
redis_connector.db_lock_key(index_attempt.search_settings.id),
timeout=CELERY_INDEXING_LOCK_TIMEOUT,
thread_local=False,
)
@@ -1231,17 +1283,6 @@ def _docprocessing_task(
f"attempt={index_attempt_id} "
)
# on failure, signal completion with an error to unblock the watchdog
with get_session_with_current_tenant() as db_session:
index_attempt = get_index_attempt(db_session, index_attempt_id)
if index_attempt and index_attempt.search_settings:
redis_connector_index = redis_connector.new_index(
index_attempt.search_settings.id
)
redis_connector_index.set_generator_complete(
HTTPStatus.INTERNAL_SERVER_ERROR.value
)
raise
finally:
if per_batch_lock and per_batch_lock.owned():

View File

@@ -47,7 +47,6 @@ from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.models import ConnectorCredentialPair
from onyx.db.search_settings import get_current_search_settings
from onyx.db.sync_record import insert_sync_record
from onyx.db.sync_record import update_sync_record_status
from onyx.db.tag import delete_orphan_tags__no_commit
@@ -519,9 +518,6 @@ def connector_pruning_generator_task(
cc_pair.credential,
)
search_settings = get_current_search_settings(db_session)
redis_connector.new_index(search_settings.id)
callback = PruneCallback(
0,
redis_connector,

View File

@@ -832,7 +832,7 @@ def _run_indexing(
)
def run_indexing_entrypoint(
def run_docfetching_entrypoint(
app: Celery,
index_attempt_id: int,
tenant_id: str,
@@ -1350,6 +1350,9 @@ def reissue_old_batches(
)
path_info = batch_storage.extract_path_info(batch_id)
if path_info is None:
logger.warning(
f"Could not extract path info from batch {batch_id}, skipping"
)
continue
if path_info.cc_pair_id != cc_pair_id:
raise RuntimeError(f"Batch {batch_id} is not for cc pair {cc_pair_id}")

View File

@@ -267,6 +267,7 @@ class IndexingCoordination:
index_attempt_id: int,
current_batches_completed: int,
timeout_hours: int = INDEXING_PROGRESS_TIMEOUT_HOURS,
force_update_progress: bool = False,
) -> bool:
"""
Update progress tracking for stall detection.
@@ -281,7 +282,8 @@ class IndexingCoordination:
current_time = get_db_current_time(db_session)
# No progress - check if this is the first time tracking
if attempt.last_progress_time is None:
# or if the caller wants to simulate guaranteed progress
if attempt.last_progress_time is None or force_update_progress:
# First time tracking - initialize
attempt.last_progress_time = current_time
attempt.last_batches_completed_count = current_batches_completed
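
force_update_progress reuses the initialization branch to push last_progress_time forward without any new batches completing; this is how the queue-check path above resets the clock instead of failing the attempt. The stall test itself reduces to a timestamp comparison; a sketch under the same semantics (field names follow the diff):

from datetime import datetime, timedelta

def is_stalled(
    last_progress_time: datetime | None, now: datetime, timeout_hours: int
) -> bool:
    # No baseline yet means tracking was just initialized, not a stall.
    if last_progress_time is None:
        return False
    return now - last_progress_time > timedelta(hours=timeout_hours)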

View File

@@ -196,6 +196,9 @@ class FileStoreDocumentBatchStorage(DocumentBatchStorage):
for batch_file_name in batch_names:
path_info = self.extract_path_info(batch_file_name)
if path_info is None:
logger.warning(
f"Could not extract path info from batch file: {batch_file_name}"
)
continue
new_batch_file_name = self._get_batch_file_name(path_info.batch_num)
self.file_store.change_file_id(batch_file_name, new_batch_file_name)

View File

@@ -3,7 +3,6 @@ import redis
from onyx.redis.redis_connector_delete import RedisConnectorDelete
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from onyx.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync
from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_connector_prune import RedisConnectorPrune
from onyx.redis.redis_connector_stop import RedisConnectorStop
from onyx.redis.redis_pool import get_redis_client
@@ -31,11 +30,6 @@ class RedisConnector:
tenant_id, cc_pair_id, self.redis
)
def new_index(self, search_settings_id: int) -> RedisConnectorIndex:
return RedisConnectorIndex(
self.tenant_id, self.cc_pair_id, search_settings_id, self.redis
)
@staticmethod
def get_id_from_fence_key(key: str) -> str | None:
"""
@@ -81,3 +75,11 @@ class RedisConnector:
object_id = parts[1]
return object_id
def db_lock_key(self, search_settings_id: int) -> str:
"""
Key for the db lock for an indexing attempt.
Prevents multiple modifications to the current indexing attempt row
from multiple docfetching/docprocessing tasks.
"""
return f"da_lock:indexing:db_{self.cc_pair_id}/{search_settings_id}"

View File

@@ -1,126 +1,10 @@
from datetime import datetime
from typing import cast
import redis
from pydantic import BaseModel
from onyx.configs.constants import CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT
class RedisConnectorIndexPayload(BaseModel):
index_attempt_id: int | None
started: datetime | None
submitted: datetime
celery_task_id: str | None
class RedisConnectorIndex:
"""Manages interactions with redis for indexing tasks. Should only be accessed
through RedisConnector."""
PREFIX = "connectorindexing"
FENCE_PREFIX = f"{PREFIX}_fence" # "connectorindexing_fence"
GENERATOR_TASK_PREFIX = PREFIX + "+generator" # "connectorindexing+generator_fence"
GENERATOR_PROGRESS_PREFIX = (
PREFIX + "_generator_progress"
) # connectorindexing_generator_progress
GENERATOR_COMPLETE_PREFIX = (
PREFIX + "_generator_complete"
) # connectorindexing_generator_complete
GENERATOR_LOCK_PREFIX = "da_lock:indexing:docfetching"
FILESTORE_LOCK_PREFIX = "da_lock:indexing:filestore"
DB_LOCK_PREFIX = "da_lock:indexing:db"
PER_WORKER_LOCK_PREFIX = "da_lock:indexing:per_worker"
TERMINATE_PREFIX = PREFIX + "_terminate" # connectorindexing_terminate
TERMINATE_TTL = 600
# used to signal the overall workflow is still active
# it's impossible to get the exact state of the system at a single point in time
# so we need a signal with a TTL to bridge gaps in our checks
ACTIVE_PREFIX = PREFIX + "_active"
ACTIVE_TTL = 3600
# used to signal that the watchdog is running
WATCHDOG_PREFIX = PREFIX + "_watchdog"
WATCHDOG_TTL = 300
# used to signal that the connector itself is still running
CONNECTOR_ACTIVE_PREFIX = PREFIX + "_connector_active"
CONNECTOR_ACTIVE_TTL = CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT
def __init__(
self,
tenant_id: str,
cc_pair_id: int,
search_settings_id: int,
redis: redis.Redis,
) -> None:
self.tenant_id: str = tenant_id
self.cc_pair_id = cc_pair_id
self.search_settings_id = search_settings_id
self.redis = redis
self.generator_complete_key = (
f"{self.GENERATOR_COMPLETE_PREFIX}_{cc_pair_id}/{search_settings_id}"
)
self.filestore_lock_key = (
f"{self.FILESTORE_LOCK_PREFIX}_{cc_pair_id}/{search_settings_id}"
)
self.generator_lock_key = (
f"{self.GENERATOR_LOCK_PREFIX}_{cc_pair_id}/{search_settings_id}"
)
self.per_worker_lock_key = (
f"{self.PER_WORKER_LOCK_PREFIX}_{cc_pair_id}/{search_settings_id}"
)
self.db_lock_key = f"{self.DB_LOCK_PREFIX}_{cc_pair_id}/{search_settings_id}"
self.terminate_key = (
f"{self.TERMINATE_PREFIX}_{cc_pair_id}/{search_settings_id}"
)
def set_generator_complete(self, payload: int | None) -> None:
if not payload:
self.redis.delete(self.generator_complete_key)
return
self.redis.set(self.generator_complete_key, payload)
def generator_clear(self) -> None:
self.redis.delete(self.generator_complete_key)
def get_completion(self) -> int | None:
bytes = self.redis.get(self.generator_complete_key)
if bytes is None:
return None
status = int(cast(int, bytes))
return status
def reset(self) -> None:
self.redis.delete(self.filestore_lock_key)
self.redis.delete(self.db_lock_key)
self.redis.delete(self.generator_lock_key)
self.redis.delete(self.generator_complete_key)
@staticmethod
def reset_all(r: redis.Redis) -> None:
"""Deletes all redis values for all connectors"""
# leaving these temporarily for backwards compat, TODO: remove
for key in r.scan_iter(RedisConnectorIndex.CONNECTOR_ACTIVE_PREFIX + "*"):
r.delete(key)
for key in r.scan_iter(RedisConnectorIndex.ACTIVE_PREFIX + "*"):
r.delete(key)
for key in r.scan_iter(RedisConnectorIndex.FILESTORE_LOCK_PREFIX + "*"):
r.delete(key)
for key in r.scan_iter(RedisConnectorIndex.GENERATOR_COMPLETE_PREFIX + "*"):
r.delete(key)
for key in r.scan_iter(RedisConnectorIndex.GENERATOR_PROGRESS_PREFIX + "*"):
r.delete(key)
for key in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
r.delete(key)

View File

@@ -1,6 +1,5 @@
from onyx.redis.redis_connector_delete import RedisConnectorDelete
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_connector_prune import RedisConnectorPrune
from onyx.redis.redis_document_set import RedisDocumentSet
from onyx.redis.redis_usergroup import RedisUserGroup
@@ -16,8 +15,6 @@ def is_fence(key_bytes: bytes) -> bool:
return True
if key_str.startswith(RedisConnectorPrune.FENCE_PREFIX):
return True
if key_str.startswith(RedisConnectorIndex.FENCE_PREFIX):
return True
if key_str.startswith(RedisConnectorPermissionSync.FENCE_PREFIX):
return True

View File

@@ -22,7 +22,6 @@ from onyx.configs.app_configs import REDIS_SSL
from onyx.db.engine.sql_engine import get_session_with_tenant
from onyx.db.users import get_user_by_email
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_pool import RedisPool
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
@@ -130,9 +129,6 @@ def onyx_redis(
logger.info(f"Purging locks associated with deleting cc_pair={cc_pair_id}.")
redis_connector = RedisConnector(tenant_id, cc_pair_id)
match_pattern = f"{tenant_id}:{RedisConnectorIndex.FENCE_PREFIX}_{cc_pair_id}/*"
purge_by_match_and_type(match_pattern, "string", batch, dry_run, r)
redis_delete_if_exists_helper(
f"{tenant_id}:{redis_connector.prune.fence_key}", dry_run, r
)