fix: remove user file indexing from launch, add init imports for all celery tasks, bump sandbox memory limits (#8443)

This commit is contained in:
Wenxi
2026-02-13 13:15:30 -08:00
committed by GitHub
parent 049e8ef0e2
commit 49a35f8aaa
6 changed files with 76 additions and 9 deletions

4
.vscode/launch.json vendored
View File

@@ -275,7 +275,7 @@
"--loglevel=INFO",
"--hostname=background@%n",
"-Q",
"vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup,index_attempt_cleanup,docprocessing,connector_doc_fetching,user_files_indexing,connector_pruning,connector_doc_permissions_sync,connector_external_group_sync,csv_generation,kg_processing,monitoring,user_file_processing,user_file_project_sync,user_file_delete,opensearch_migration"
"vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup,index_attempt_cleanup,docprocessing,connector_doc_fetching,connector_pruning,connector_doc_permissions_sync,connector_external_group_sync,csv_generation,kg_processing,monitoring,user_file_processing,user_file_project_sync,user_file_delete,opensearch_migration"
],
"presentation": {
"group": "2"
@@ -419,7 +419,7 @@
"--loglevel=INFO",
"--hostname=docfetching@%n",
"-Q",
"connector_doc_fetching,user_files_indexing"
"connector_doc_fetching"
],
"presentation": {
"group": "2"

View File

@@ -0,0 +1,10 @@
"""Celery tasks for hierarchy fetching."""
from onyx.background.celery.tasks.hierarchyfetching.tasks import ( # noqa: F401
check_for_hierarchy_fetching,
)
from onyx.background.celery.tasks.hierarchyfetching.tasks import ( # noqa: F401
connector_hierarchy_fetching_task,
)
__all__ = ["check_for_hierarchy_fetching", "connector_hierarchy_fetching_task"]

View File

@@ -146,14 +146,26 @@ def _collect_queue_metrics(redis_celery: Redis) -> list[Metric]:
"""Collect metrics about queue lengths for different Celery queues"""
metrics = []
queue_mappings = {
"celery_queue_length": "celery",
"docprocessing_queue_length": "docprocessing",
"sync_queue_length": "sync",
"deletion_queue_length": "deletion",
"pruning_queue_length": "pruning",
"celery_queue_length": OnyxCeleryQueues.PRIMARY,
"docprocessing_queue_length": OnyxCeleryQueues.DOCPROCESSING,
"docfetching_queue_length": OnyxCeleryQueues.CONNECTOR_DOC_FETCHING,
"sync_queue_length": OnyxCeleryQueues.VESPA_METADATA_SYNC,
"deletion_queue_length": OnyxCeleryQueues.CONNECTOR_DELETION,
"pruning_queue_length": OnyxCeleryQueues.CONNECTOR_PRUNING,
"permissions_sync_queue_length": OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC,
"external_group_sync_queue_length": OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC,
"permissions_upsert_queue_length": OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT,
"hierarchy_fetching_queue_length": OnyxCeleryQueues.CONNECTOR_HIERARCHY_FETCHING,
"llm_model_update_queue_length": OnyxCeleryQueues.LLM_MODEL_UPDATE,
"checkpoint_cleanup_queue_length": OnyxCeleryQueues.CHECKPOINT_CLEANUP,
"index_attempt_cleanup_queue_length": OnyxCeleryQueues.INDEX_ATTEMPT_CLEANUP,
"csv_generation_queue_length": OnyxCeleryQueues.CSV_GENERATION,
"user_file_processing_queue_length": OnyxCeleryQueues.USER_FILE_PROCESSING,
"user_file_project_sync_queue_length": OnyxCeleryQueues.USER_FILE_PROJECT_SYNC,
"user_file_delete_queue_length": OnyxCeleryQueues.USER_FILE_DELETE,
"monitoring_queue_length": OnyxCeleryQueues.MONITORING,
"sandbox_queue_length": OnyxCeleryQueues.SANDBOX,
"opensearch_migration_queue_length": OnyxCeleryQueues.OPENSEARCH_MIGRATION,
}
for name, queue in queue_mappings.items():
@@ -881,7 +893,7 @@ def monitor_celery_queues_helper(
"""A task to monitor all celery queue lengths."""
r_celery = task.app.broker_connection().channel().client # type: ignore
n_celery = celery_get_queue_length("celery", r_celery)
n_celery = celery_get_queue_length(OnyxCeleryQueues.PRIMARY, r_celery)
n_docfetching = celery_get_queue_length(
OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery
)
@@ -908,6 +920,26 @@ def monitor_celery_queues_helper(
n_permissions_upsert = celery_get_queue_length(
OnyxCeleryQueues.DOC_PERMISSIONS_UPSERT, r_celery
)
n_hierarchy_fetching = celery_get_queue_length(
OnyxCeleryQueues.CONNECTOR_HIERARCHY_FETCHING, r_celery
)
n_llm_model_update = celery_get_queue_length(
OnyxCeleryQueues.LLM_MODEL_UPDATE, r_celery
)
n_checkpoint_cleanup = celery_get_queue_length(
OnyxCeleryQueues.CHECKPOINT_CLEANUP, r_celery
)
n_index_attempt_cleanup = celery_get_queue_length(
OnyxCeleryQueues.INDEX_ATTEMPT_CLEANUP, r_celery
)
n_csv_generation = celery_get_queue_length(
OnyxCeleryQueues.CSV_GENERATION, r_celery
)
n_monitoring = celery_get_queue_length(OnyxCeleryQueues.MONITORING, r_celery)
n_sandbox = celery_get_queue_length(OnyxCeleryQueues.SANDBOX, r_celery)
n_opensearch_migration = celery_get_queue_length(
OnyxCeleryQueues.OPENSEARCH_MIGRATION, r_celery
)
n_docfetching_prefetched = celery_get_unacked_task_ids(
OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery
@@ -931,6 +963,14 @@ def monitor_celery_queues_helper(
f"permissions_sync={n_permissions_sync} "
f"external_group_sync={n_external_group_sync} "
f"permissions_upsert={n_permissions_upsert} "
f"hierarchy_fetching={n_hierarchy_fetching} "
f"llm_model_update={n_llm_model_update} "
f"checkpoint_cleanup={n_checkpoint_cleanup} "
f"index_attempt_cleanup={n_index_attempt_cleanup} "
f"csv_generation={n_csv_generation} "
f"monitoring={n_monitoring} "
f"sandbox={n_sandbox} "
f"opensearch_migration={n_opensearch_migration} "
)

View File

@@ -0,0 +1,8 @@
"""Celery tasks for connector pruning."""
from onyx.background.celery.tasks.pruning.tasks import check_for_pruning # noqa: F401
from onyx.background.celery.tasks.pruning.tasks import ( # noqa: F401
connector_pruning_generator_task,
)
__all__ = ["check_for_pruning", "connector_pruning_generator_task"]

View File

@@ -532,7 +532,7 @@ done
],
resources=client.V1ResourceRequirements(
requests={"cpu": "1000m", "memory": "2Gi"},
limits={"cpu": "4000m", "memory": "8Gi"},
limits={"cpu": "2000m", "memory": "10Gi"},
),
# TODO: Re-enable probes when sandbox container runs actual services.
# Note: Next.js ports are now per-session (dynamic), so container-level

View File

@@ -1 +1,10 @@
"""Celery tasks for sandbox management."""
from onyx.server.features.build.sandbox.tasks.tasks import (
cleanup_idle_sandboxes_task,
) # noqa: F401
from onyx.server.features.build.sandbox.tasks.tasks import (
sync_sandbox_files,
) # noqa: F401
__all__ = ["cleanup_idle_sandboxes_task", "sync_sandbox_files"]