mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-03-24 17:12:44 +00:00
Compare commits
10 Commits
bo/query_p
...
v0.12.0-cl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5503a06eb7 | ||
|
|
d26e9725af | ||
|
|
65a9a95462 | ||
|
|
8263b8ae7a | ||
|
|
103212bbc4 | ||
|
|
2f2aa7a962 | ||
|
|
2117a17fbc | ||
|
|
8983a2db50 | ||
|
|
9ce8cb9a1c | ||
|
|
33ea51f2aa |
@@ -55,6 +55,7 @@ class DynamicTenantScheduler(PersistentScheduler):
|
||||
logger.info("Fetching all tenant IDs")
|
||||
tenant_ids = get_all_tenant_ids()
|
||||
logger.info(f"Found {len(tenant_ids)} tenants")
|
||||
logger.info(f"Tenant IDs: {', '.join(str(id) for id in tenant_ids)}")
|
||||
|
||||
logger.info("Fetching tasks to schedule")
|
||||
tasks_to_schedule = fetch_versioned_implementation(
|
||||
|
||||
@@ -6,6 +6,7 @@ from celery import signals
|
||||
from celery import Task
|
||||
from celery.signals import celeryd_init
|
||||
from celery.signals import worker_init
|
||||
from celery.signals import worker_process_init
|
||||
from celery.signals import worker_ready
|
||||
from celery.signals import worker_shutdown
|
||||
|
||||
@@ -81,6 +82,11 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
|
||||
app_base.on_worker_shutdown(sender, **kwargs)
|
||||
|
||||
|
||||
@worker_process_init.connect
|
||||
def init_worker(**kwargs: Any) -> None:
|
||||
SqlEngine.reset_engine()
|
||||
|
||||
|
||||
@signals.setup_logging.connect
|
||||
def on_setup_logging(
|
||||
loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any
|
||||
|
||||
@@ -1,96 +0,0 @@
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
|
||||
from celery.beat import PersistentScheduler # type: ignore
|
||||
from celery.utils.log import get_task_logger
|
||||
|
||||
from danswer.db.engine import get_all_tenant_ids
|
||||
from danswer.utils.variable_functionality import fetch_versioned_implementation
|
||||
|
||||
logger = get_task_logger(__name__)
|
||||
|
||||
|
||||
class DynamicTenantScheduler(PersistentScheduler):
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
self._reload_interval = timedelta(minutes=1)
|
||||
self._last_reload = self.app.now() - self._reload_interval
|
||||
|
||||
def setup_schedule(self) -> None:
|
||||
super().setup_schedule()
|
||||
|
||||
def tick(self) -> float:
|
||||
retval = super().tick()
|
||||
now = self.app.now()
|
||||
if (
|
||||
self._last_reload is None
|
||||
or (now - self._last_reload) > self._reload_interval
|
||||
):
|
||||
logger.info("Reloading schedule to check for new tenants...")
|
||||
self._update_tenant_tasks()
|
||||
self._last_reload = now
|
||||
return retval
|
||||
|
||||
def _update_tenant_tasks(self) -> None:
|
||||
logger.info("Checking for tenant task updates...")
|
||||
try:
|
||||
tenant_ids = get_all_tenant_ids()
|
||||
tasks_to_schedule = fetch_versioned_implementation(
|
||||
"danswer.background.celery.tasks.beat_schedule", "get_tasks_to_schedule"
|
||||
)
|
||||
|
||||
new_beat_schedule: dict[str, dict[str, Any]] = {}
|
||||
|
||||
current_schedule = getattr(self, "_store", {"entries": {}}).get(
|
||||
"entries", {}
|
||||
)
|
||||
|
||||
existing_tenants = set()
|
||||
for task_name in current_schedule.keys():
|
||||
if "-" in task_name:
|
||||
existing_tenants.add(task_name.split("-")[-1])
|
||||
|
||||
for tenant_id in tenant_ids:
|
||||
if tenant_id not in existing_tenants:
|
||||
logger.info(f"Found new tenant: {tenant_id}")
|
||||
|
||||
for task in tasks_to_schedule():
|
||||
task_name = f"{task['name']}-{tenant_id}"
|
||||
new_task = {
|
||||
"task": task["task"],
|
||||
"schedule": task["schedule"],
|
||||
"kwargs": {"tenant_id": tenant_id},
|
||||
}
|
||||
if options := task.get("options"):
|
||||
new_task["options"] = options
|
||||
new_beat_schedule[task_name] = new_task
|
||||
|
||||
if self._should_update_schedule(current_schedule, new_beat_schedule):
|
||||
logger.info(
|
||||
"Updating schedule",
|
||||
extra={
|
||||
"new_tasks": len(new_beat_schedule),
|
||||
"current_tasks": len(current_schedule),
|
||||
},
|
||||
)
|
||||
if not hasattr(self, "_store"):
|
||||
self._store: dict[str, dict] = {"entries": {}}
|
||||
self.update_from_dict(new_beat_schedule)
|
||||
logger.info(f"New schedule: {new_beat_schedule}")
|
||||
|
||||
logger.info("Tenant tasks updated successfully")
|
||||
else:
|
||||
logger.debug("No schedule updates needed")
|
||||
|
||||
except (AttributeError, KeyError):
|
||||
logger.exception("Failed to process task configuration")
|
||||
except Exception:
|
||||
logger.exception("Unexpected error updating tenant tasks")
|
||||
|
||||
def _should_update_schedule(
|
||||
self, current_schedule: dict, new_schedule: dict
|
||||
) -> bool:
|
||||
"""Compare schedules to determine if an update is needed."""
|
||||
current_tasks = set(current_schedule.keys())
|
||||
new_tasks = set(new_schedule.keys())
|
||||
return current_tasks != new_tasks
|
||||
@@ -6,27 +6,30 @@ from danswer.configs.constants import DanswerCeleryPriority
|
||||
|
||||
tasks_to_schedule = [
|
||||
{
|
||||
# 15 light workers can check for 1144 in a minute,
|
||||
# but previously we were creating (4 * 120 * 15 = 7,200 per minute)
|
||||
# This should be more reasonably accomplishable.
|
||||
"name": "check-for-vespa-sync",
|
||||
"task": "check_for_vespa_sync_task",
|
||||
"schedule": timedelta(seconds=5),
|
||||
"schedule": timedelta(minutes=5),
|
||||
"options": {"priority": DanswerCeleryPriority.HIGH},
|
||||
},
|
||||
{
|
||||
"name": "check-for-connector-deletion",
|
||||
"task": "check_for_connector_deletion_task",
|
||||
"schedule": timedelta(seconds=20),
|
||||
"schedule": timedelta(seconds=60),
|
||||
"options": {"priority": DanswerCeleryPriority.HIGH},
|
||||
},
|
||||
{
|
||||
"name": "check-for-indexing",
|
||||
"task": "check_for_indexing",
|
||||
"schedule": timedelta(seconds=10),
|
||||
"schedule": timedelta(seconds=15),
|
||||
"options": {"priority": DanswerCeleryPriority.HIGH},
|
||||
},
|
||||
{
|
||||
"name": "check-for-prune",
|
||||
"task": "check_for_pruning",
|
||||
"schedule": timedelta(seconds=10),
|
||||
"schedule": timedelta(seconds=60),
|
||||
"options": {"priority": DanswerCeleryPriority.HIGH},
|
||||
},
|
||||
{
|
||||
@@ -38,7 +41,7 @@ tasks_to_schedule = [
|
||||
{
|
||||
"name": "monitor-vespa-sync",
|
||||
"task": "monitor_vespa_sync",
|
||||
"schedule": timedelta(seconds=5),
|
||||
"schedule": timedelta(seconds=60),
|
||||
"options": {"priority": DanswerCeleryPriority.HIGH},
|
||||
},
|
||||
]
|
||||
|
||||
@@ -118,15 +118,13 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
|
||||
embedding_model=embedding_model,
|
||||
)
|
||||
|
||||
cc_pair_ids: list[int] = []
|
||||
with get_session_with_tenant(tenant_id) as db_session:
|
||||
cc_pair_ids: list[int] = []
|
||||
cc_pairs = fetch_connector_credential_pairs(db_session)
|
||||
for cc_pair_entry in cc_pairs:
|
||||
cc_pair_ids.append(cc_pair_entry.id)
|
||||
|
||||
for cc_pair_id in cc_pair_ids:
|
||||
redis_connector = RedisConnector(tenant_id, cc_pair_id)
|
||||
with get_session_with_tenant(tenant_id) as db_session:
|
||||
for cc_pair_id in cc_pair_ids:
|
||||
redis_connector = RedisConnector(tenant_id, cc_pair_id)
|
||||
# Get the primary search settings
|
||||
primary_search_settings = get_current_search_settings(db_session)
|
||||
search_settings = [primary_search_settings]
|
||||
|
||||
@@ -29,18 +29,26 @@ JobStatusType = (
|
||||
def _initializer(
|
||||
func: Callable, args: list | tuple, kwargs: dict[str, Any] | None = None
|
||||
) -> Any:
|
||||
"""Ensure the parent proc's database connections are not touched
|
||||
in the new connection pool
|
||||
"""Initialize the child process with a fresh SQLAlchemy Engine.
|
||||
|
||||
Based on the recommended approach in the SQLAlchemy docs found:
|
||||
Based on SQLAlchemy's recommendations to handle multiprocessing:
|
||||
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
|
||||
"""
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
|
||||
logger.info("Initializing spawned worker child process.")
|
||||
|
||||
# Reset the engine in the child process
|
||||
SqlEngine.reset_engine()
|
||||
|
||||
# Optionally set a custom app name for database logging purposes
|
||||
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
|
||||
|
||||
# Initialize a new engine with desired parameters
|
||||
SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
|
||||
|
||||
# Proceed with executing the target function
|
||||
return func(*args, **kwargs)
|
||||
|
||||
|
||||
|
||||
@@ -164,9 +164,18 @@ class SlackbotHandler:
|
||||
|
||||
def acquire_tenants(self) -> None:
|
||||
tenant_ids = get_all_tenant_ids()
|
||||
|
||||
# This is temporary
|
||||
logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres")
|
||||
allowed_tenant_ids = os.environ.get("ALLOWED_TENANT_IDS")
|
||||
allowed_tenant_list = (
|
||||
allowed_tenant_ids.split(",") if allowed_tenant_ids else None
|
||||
)
|
||||
|
||||
for tenant_id in tenant_ids:
|
||||
if allowed_tenant_list and tenant_id not in allowed_tenant_list:
|
||||
logger.debug(f"Tenant {tenant_id} not in allowed list, skipping")
|
||||
continue
|
||||
if tenant_id in self.tenant_ids:
|
||||
logger.debug(f"Tenant {tenant_id} already in self.tenant_ids")
|
||||
continue
|
||||
|
||||
@@ -189,6 +189,13 @@ class SqlEngine:
|
||||
return ""
|
||||
return cls._app_name
|
||||
|
||||
@classmethod
|
||||
def reset_engine(cls) -> None:
|
||||
with cls._lock:
|
||||
if cls._engine:
|
||||
cls._engine.dispose()
|
||||
cls._engine = None
|
||||
|
||||
|
||||
def get_all_tenant_ids() -> list[str] | list[None]:
|
||||
if not MULTI_TENANT:
|
||||
@@ -204,10 +211,18 @@ def get_all_tenant_ids() -> list[str] | list[None]:
|
||||
)
|
||||
tenant_ids = [row[0] for row in result]
|
||||
|
||||
ignored_tenants = [
|
||||
"tenant_i-0dbc199db55ad29a3",
|
||||
"tenant_i-043470d740845ec56",
|
||||
"tenant_i-06a83ec44869babc4",
|
||||
"tenant_i-06064a5b18ca0bb7b",
|
||||
]
|
||||
|
||||
valid_tenants = [
|
||||
tenant
|
||||
for tenant in tenant_ids
|
||||
if tenant is None or tenant.startswith(TENANT_ID_PREFIX)
|
||||
if tenant is None
|
||||
or (tenant.startswith(TENANT_ID_PREFIX) and tenant not in ignored_tenants)
|
||||
]
|
||||
|
||||
return valid_tenants
|
||||
|
||||
@@ -63,6 +63,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
|
||||
stmt = construct_document_select_for_connector_credential_pair_by_needs_sync(
|
||||
cc_pair.connector_id, cc_pair.credential_id
|
||||
)
|
||||
|
||||
for doc in db_session.scalars(stmt).yield_per(1):
|
||||
current_time = time.monotonic()
|
||||
if current_time - last_lock_time >= (
|
||||
|
||||
@@ -9,12 +9,12 @@ spec:
|
||||
scaleTargetRef:
|
||||
name: celery-worker-indexing
|
||||
minReplicaCount: 1
|
||||
maxReplicaCount: 10
|
||||
maxReplicaCount: 30
|
||||
triggers:
|
||||
- type: redis
|
||||
metadata:
|
||||
sslEnabled: "true"
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: connector_indexing
|
||||
@@ -25,7 +25,7 @@ spec:
|
||||
- type: redis
|
||||
metadata:
|
||||
sslEnabled: "true"
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: connector_indexing:2
|
||||
@@ -36,7 +36,7 @@ spec:
|
||||
- type: redis
|
||||
metadata:
|
||||
sslEnabled: "true"
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: connector_indexing:3
|
||||
@@ -44,3 +44,12 @@ spec:
|
||||
databaseIndex: "15"
|
||||
authenticationRef:
|
||||
name: celery-worker-auth
|
||||
- type: cpu
|
||||
metadata:
|
||||
type: Utilization
|
||||
value: "70"
|
||||
|
||||
- type: memory
|
||||
metadata:
|
||||
type: Utilization
|
||||
value: "70"
|
||||
|
||||
@@ -8,12 +8,12 @@ metadata:
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
name: celery-worker-light
|
||||
minReplicaCount: 1
|
||||
minReplicaCount: 5
|
||||
maxReplicaCount: 20
|
||||
triggers:
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: vespa_metadata_sync
|
||||
@@ -23,7 +23,7 @@ spec:
|
||||
name: celery-worker-auth
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: vespa_metadata_sync:2
|
||||
@@ -33,7 +33,7 @@ spec:
|
||||
name: celery-worker-auth
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: vespa_metadata_sync:3
|
||||
@@ -43,7 +43,7 @@ spec:
|
||||
name: celery-worker-auth
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: connector_deletion
|
||||
@@ -53,7 +53,7 @@ spec:
|
||||
name: celery-worker-auth
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: connector_deletion:2
|
||||
|
||||
@@ -11,11 +11,11 @@ spec:
|
||||
pollingInterval: 15 # Check every 15 seconds
|
||||
cooldownPeriod: 30 # Wait 30 seconds before scaling down
|
||||
minReplicaCount: 1
|
||||
maxReplicaCount: 1
|
||||
maxReplicaCount: 10
|
||||
triggers:
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: celery
|
||||
@@ -26,7 +26,7 @@ spec:
|
||||
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: celery:1
|
||||
@@ -36,7 +36,7 @@ spec:
|
||||
name: celery-worker-auth
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: celery:2
|
||||
@@ -46,7 +46,7 @@ spec:
|
||||
name: celery-worker-auth
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: celery:3
|
||||
@@ -56,7 +56,7 @@ spec:
|
||||
name: celery-worker-auth
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: periodic_tasks
|
||||
@@ -66,7 +66,7 @@ spec:
|
||||
name: celery-worker-auth
|
||||
- type: redis
|
||||
metadata:
|
||||
host: "{host}"
|
||||
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
|
||||
port: "6379"
|
||||
enableTLS: "true"
|
||||
listName: periodic_tasks:2
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
apiVersion: keda.sh/v1alpha1
|
||||
kind: ScaledObject
|
||||
metadata:
|
||||
name: indexing-model-server-scaledobject
|
||||
namespace: danswer
|
||||
labels:
|
||||
app: indexing-model-server
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
name: indexing-model-server-deployment
|
||||
pollingInterval: 15 # Check every 15 seconds
|
||||
cooldownPeriod: 30 # Wait 30 seconds before scaling down
|
||||
minReplicaCount: 1
|
||||
maxReplicaCount: 12
|
||||
triggers:
|
||||
- type: cpu
|
||||
metadata:
|
||||
type: Utilization
|
||||
value: "70"
|
||||
@@ -5,5 +5,5 @@ metadata:
|
||||
namespace: danswer
|
||||
type: Opaque
|
||||
data:
|
||||
host: { { base64-encoded-hostname } }
|
||||
password: { { base64-encoded-password } }
|
||||
host: bWFzdGVyLnJlZGlzZW5jcnlwdGVkLjRoNHhvci51c2UyLmNhY2hlLmFtYXpvbmF3cy5jb20=
|
||||
password: NTBsYnNBdXRoVG9rZW5EYW5zd2Vy
|
||||
|
||||
@@ -14,7 +14,7 @@ spec:
|
||||
spec:
|
||||
containers:
|
||||
- name: celery-beat
|
||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
|
||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8
|
||||
imagePullPolicy: Always
|
||||
command:
|
||||
[
|
||||
|
||||
@@ -14,7 +14,7 @@ spec:
|
||||
spec:
|
||||
containers:
|
||||
- name: celery-worker-heavy
|
||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
|
||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8
|
||||
imagePullPolicy: Always
|
||||
command:
|
||||
[
|
||||
|
||||
@@ -14,7 +14,7 @@ spec:
|
||||
spec:
|
||||
containers:
|
||||
- name: celery-worker-indexing
|
||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
|
||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8
|
||||
imagePullPolicy: Always
|
||||
command:
|
||||
[
|
||||
@@ -47,10 +47,10 @@ spec:
|
||||
resources:
|
||||
requests:
|
||||
cpu: "500m"
|
||||
memory: "1Gi"
|
||||
memory: "4Gi"
|
||||
limits:
|
||||
cpu: "1000m"
|
||||
memory: "2Gi"
|
||||
memory: "8Gi"
|
||||
volumes:
|
||||
- name: vespa-certificates
|
||||
secret:
|
||||
|
||||
@@ -14,7 +14,7 @@ spec:
|
||||
spec:
|
||||
containers:
|
||||
- name: celery-worker-light
|
||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
|
||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8
|
||||
imagePullPolicy: Always
|
||||
command:
|
||||
[
|
||||
|
||||
@@ -14,7 +14,7 @@ spec:
|
||||
spec:
|
||||
containers:
|
||||
- name: celery-worker-primary
|
||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
|
||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8
|
||||
imagePullPolicy: Always
|
||||
command:
|
||||
[
|
||||
|
||||
Reference in New Issue
Block a user