Compare commits


1 Commit

Author: Justin Tahara
SHA1: 50f14f58ed
Date: 2025-08-19 18:54:49 -07:00

feat(infra): Add WAF implementation (#5213) (#5217)

* feat(infra): Add WAF implementation
* Addressing greptile comments
* Additional removal of unnecessary code
2 changed files with 57 additions and 0 deletions

File 1 of 2:

@@ -11,11 +11,13 @@ from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from pydantic import BaseModel
from redis.lock import Lock as RedisLock
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
@@ -33,6 +35,8 @@ from onyx.background.indexing.checkpointing_utils import (
    get_index_attempts_with_old_checkpoints,
)
from onyx.configs.app_configs import MANAGED_VESPA
from onyx.configs.app_configs import MAX_ACTIVE_DOCFETCHING_ATTEMPTS
from onyx.configs.app_configs import MAX_DOCFETCHING_QUEUE_LENGTH
from onyx.configs.app_configs import VESPA_CLOUD_CERT_PATH
from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
@@ -687,6 +691,48 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
        lock_beat.reacquire()
        with get_session_with_current_tenant() as db_session:
            # Backpressure: prevent unbounded growth of docfetching tasks
            try:
                # 1) Broker queue length cap
                r_celery = self.app.broker_connection().channel().client  # type: ignore
                current_df_queue_len = celery_get_queue_length(
                    OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery
                )
                if current_df_queue_len >= MAX_DOCFETCHING_QUEUE_LENGTH:
                    task_logger.warning(
                        "check_for_indexing - Skipping enqueue due to docfetching queue cap: "
                        f"len={current_df_queue_len} cap={MAX_DOCFETCHING_QUEUE_LENGTH}"
                    )
                    return tasks_created

                # 2) Active attempts cap (NOT_STARTED/IN_PROGRESS)
                active_attempts_count = (
                    db_session.execute(
                        select(func.count(IndexAttempt.id)).where(
                            IndexAttempt.status.in_(
                                [
                                    IndexingStatus.NOT_STARTED,
                                    IndexingStatus.IN_PROGRESS,
                                ]
                            )
                        )
                    )
                    .scalars()
                    .first()
                    or 0
                )
                if active_attempts_count >= MAX_ACTIVE_DOCFETCHING_ATTEMPTS:
                    task_logger.warning(
                        "check_for_indexing - Skipping enqueue due to active attempts cap: "
                        f"active={active_attempts_count} cap={MAX_ACTIVE_DOCFETCHING_ATTEMPTS}"
                    )
                    return tasks_created
            except Exception:
                # Best-effort backpressure; do not fail task if metrics fetch fails
                task_logger.exception(
                    "check_for_indexing - backpressure check failed"
                )

            for search_settings_instance in search_settings_list:
                # skip non-live search settings that don't have background reindex enabled
                # those should just auto-change to live shortly after creation without
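Taken together, the hunk above adds a two-gate backpressure check: stop enqueueing when the broker queue is saturated, stop when too many attempts are already pending or running, and fail open if either metric cannot be read. Below is a minimal standalone sketch of that pattern, not the PR's code: the cap values and the two fetcher callables are hypothetical stand-ins for MAX_DOCFETCHING_QUEUE_LENGTH, MAX_ACTIVE_DOCFETCHING_ATTEMPTS, and the broker/DB lookups.

# Minimal sketch (not the PR's code) of the two-gate backpressure check.
# QUEUE_CAP / ATTEMPT_CAP and the fetcher callables are hypothetical stand-ins.
from typing import Callable

QUEUE_CAP = 200   # stand-in for MAX_DOCFETCHING_QUEUE_LENGTH
ATTEMPT_CAP = 64  # stand-in for MAX_ACTIVE_DOCFETCHING_ATTEMPTS


def should_enqueue(
    fetch_queue_len: Callable[[], int],
    fetch_active_attempts: Callable[[], int],
) -> bool:
    """Return False when either cap is hit; fail open if metrics are unavailable."""
    try:
        if fetch_queue_len() >= QUEUE_CAP:
            return False  # broker queue is saturated
        if fetch_active_attempts() >= ATTEMPT_CAP:
            return False  # too many NOT_STARTED/IN_PROGRESS attempts
    except Exception:
        pass  # best-effort, like the diff: metric failures must not block indexing
    return True


# Queue cap trips in the second call, so enqueueing is skipped
assert should_enqueue(lambda: 10, lambda: 3) is True
assert should_enqueue(lambda: 500, lambda: 3) is False

Note the design choice mirrored from the diff: on a cap hit the task returns tasks_created rather than raising, so the caps act as soft throttles that are simply re-evaluated on the next beat run.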

File 2 of 2:

@@ -359,6 +359,17 @@ DB_YIELD_PER_DEFAULT = 64
#####
POLL_CONNECTOR_OFFSET = 30 # Minutes overlap between poll windows
# Backpressure caps to prevent unbounded docfetching queue growth
# Maximum number of concurrently active (NOT_STARTED or IN_PROGRESS) indexing attempts across the tenant
MAX_ACTIVE_DOCFETCHING_ATTEMPTS = int(
    os.environ.get("MAX_ACTIVE_DOCFETCHING_ATTEMPTS") or 64
)
# Maximum queued docfetching tasks allowed in the broker before we stop enqueueing new ones
MAX_DOCFETCHING_QUEUE_LENGTH = int(
    os.environ.get("MAX_DOCFETCHING_QUEUE_LENGTH") or 200
)
# View the list here:
# https://github.com/onyx-dot-app/onyx/blob/main/backend/onyx/connectors/factory.py
# If this is empty, all connectors are enabled, this is an option for security heavy orgs where
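A side note on the int(os.environ.get(...) or default) idiom used in both new settings: unlike os.environ.get(key, default), the "or" form also falls back when the variable is set but empty, which would otherwise make int() raise. A quick self-contained check, run outside the application but using the real variable name:

import os

# Set but empty: "or" falls back to the default, while a plain
# .get(key, default) would leave "" in place and int("") raises ValueError.
os.environ["MAX_DOCFETCHING_QUEUE_LENGTH"] = ""
assert int(os.environ.get("MAX_DOCFETCHING_QUEUE_LENGTH") or 200) == 200

# An explicit override wins as usual.
os.environ["MAX_DOCFETCHING_QUEUE_LENGTH"] = "500"
assert int(os.environ.get("MAX_DOCFETCHING_QUEUE_LENGTH") or 200) == 500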