1
0
forked from github/onyx

fix: thread safe approach to docprocessing logging (#5185)

* thread safe approach to docprocessing logging

* unify approaches

* reset
This commit is contained in:
Evan Lohn
2025-08-11 19:25:47 -07:00
committed by GitHub
parent 9bc62cc803
commit da02962a67
4 changed files with 21 additions and 39 deletions

View File

@@ -92,7 +92,6 @@ from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.redis.redis_utils import is_fence
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.utils.logger import setup_logger
from onyx.utils.logger import TaskAttemptSingleton
from onyx.utils.middleware import make_randomized_onyx_request_id
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType
@@ -100,6 +99,7 @@ from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
from shared_configs.configs import INDEXING_MODEL_SERVER_PORT
from shared_configs.configs import MULTI_TENANT
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
from shared_configs.contextvars import INDEX_ATTEMPT_INFO_CONTEXTVAR
logger = setup_logger()
@@ -1072,9 +1072,12 @@ def docprocessing_task(
# Start heartbeat for this indexing attempt
heartbeat_thread, stop_event = start_heartbeat(index_attempt_id)
try:
# Cannot use the process-global TaskAttemptSingleton approach here because the worker is multithreaded
token = INDEX_ATTEMPT_INFO_CONTEXTVAR.set((cc_pair_id, index_attempt_id))
_docprocessing_task(index_attempt_id, cc_pair_id, tenant_id, batch_num)
finally:
stop_heartbeat(heartbeat_thread, stop_event) # Stop heartbeat before exiting
INDEX_ATTEMPT_INFO_CONTEXTVAR.reset(token)
def _docprocessing_task(
@@ -1085,9 +1088,6 @@ def _docprocessing_task(
) -> None:
start_time = time.monotonic()
# set the indexing attempt ID so that all log messages from this process
# will have it added as a prefix
TaskAttemptSingleton.set_cc_and_index_id(index_attempt_id, cc_pair_id)
if tenant_id:
CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)

View File

@@ -71,13 +71,13 @@ from onyx.natural_language_processing.search_nlp_models import (
InformationContentClassificationModel,
)
from onyx.utils.logger import setup_logger
from onyx.utils.logger import TaskAttemptSingleton
from onyx.utils.middleware import make_randomized_onyx_request_id
from onyx.utils.telemetry import create_milestone_and_report
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType
from onyx.utils.variable_functionality import global_version
from shared_configs.configs import MULTI_TENANT
from shared_configs.contextvars import INDEX_ATTEMPT_INFO_CONTEXTVAR
logger = setup_logger(propagate=False)
@@ -851,8 +851,8 @@ def run_docfetching_entrypoint(
# set the indexing attempt ID so that all log messages from this process
# will have it added as a prefix
TaskAttemptSingleton.set_cc_and_index_id(
index_attempt_id, connector_credential_pair_id
token = INDEX_ATTEMPT_INFO_CONTEXTVAR.set(
(connector_credential_pair_id, index_attempt_id)
)
with get_session_with_current_tenant() as db_session:
attempt = transition_attempt_to_in_progress(index_attempt_id, db_session)
@@ -890,6 +890,8 @@ def run_docfetching_entrypoint(
f"credentials='{credential_id}'"
)
INDEX_ATTEMPT_INFO_CONTEXTVAR.reset(token)
def connector_document_extraction(
app: Celery,

View File

@@ -13,6 +13,7 @@ from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
from shared_configs.configs import SLACK_CHANNEL_ID
from shared_configs.configs import TENANT_ID_PREFIX
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
from shared_configs.contextvars import INDEX_ATTEMPT_INFO_CONTEXTVAR
from shared_configs.contextvars import ONYX_REQUEST_ID_CONTEXTVAR
@@ -34,30 +35,6 @@ class LoggerContextVars:
doc_permission_sync_ctx.set(dict())
class TaskAttemptSingleton:
"""Used to tell if this process is an indexing job, and if so what is the
unique identifier for this indexing attempt. For things like the API server,
main background job (scheduler), etc. this will not be used."""
_INDEX_ATTEMPT_ID: None | int = None
_CONNECTOR_CREDENTIAL_PAIR_ID: None | int = None
@classmethod
def get_index_attempt_id(cls) -> None | int:
return cls._INDEX_ATTEMPT_ID
@classmethod
def get_connector_credential_pair_id(cls) -> None | int:
return cls._CONNECTOR_CREDENTIAL_PAIR_ID
@classmethod
def set_cc_and_index_id(
cls, index_attempt_id: int, connector_credential_pair_id: int
) -> None:
cls._INDEX_ATTEMPT_ID = index_attempt_id
cls._CONNECTOR_CREDENTIAL_PAIR_ID = connector_credential_pair_id
def get_log_level_from_str(log_level_str: str = LOG_LEVEL) -> int:
log_level_dict = {
"CRITICAL": logging.CRITICAL,
@@ -102,14 +79,12 @@ class OnyxLoggingAdapter(logging.LoggerAdapter):
msg = f"[Doc Permissions Sync: {doc_permission_sync_ctx_dict['request_id']}] {msg}"
break
index_attempt_id = TaskAttemptSingleton.get_index_attempt_id()
cc_pair_id = TaskAttemptSingleton.get_connector_credential_pair_id()
if index_attempt_id is not None:
msg = f"[Index Attempt: {index_attempt_id}] {msg}"
if cc_pair_id is not None:
msg = f"[CC Pair: {cc_pair_id}] {msg}"
index_attempt_info = INDEX_ATTEMPT_INFO_CONTEXTVAR.get()
if index_attempt_info:
cc_pair_id, index_attempt_id = index_attempt_info
msg = (
f"[Index Attempt: {index_attempt_id}] [CC Pair: {cc_pair_id}] {msg}"
)
break

View File

@@ -21,6 +21,11 @@ ONYX_REQUEST_ID_CONTEXTVAR: contextvars.ContextVar[str | None] = contextvars.Con
"onyx_request_id", default=None
)
# Used to store cc pair id and index attempt id in multithreaded environments.
# The value is a tuple ordered as (cc_pair_id, index_attempt_id); it defaults to
# None when no indexing attempt has been set in the current context.
INDEX_ATTEMPT_INFO_CONTEXTVAR: contextvars.ContextVar[tuple[int, int] | None] = (
contextvars.ContextVar("index_attempt_info", default=None)
)
"""Utils related to contextvars"""