Compare commits

...

10 Commits

Author SHA1 Message Date
pablodanswer
5503a06eb7 log! 2024-11-10 14:48:46 -08:00
pablodanswer
d26e9725af k 2024-11-10 13:05:50 -08:00
pablodanswer
65a9a95462 reset children processes 2024-11-10 11:47:15 -08:00
pablodanswer
8263b8ae7a ensure all pushed 2024-11-10 11:19:09 -08:00
pablodanswer
103212bbc4 temporary 2024-11-10 11:10:00 -08:00
pablodanswer
2f2aa7a962 minor update to schedule 2024-11-10 11:08:47 -08:00
pablodanswer
2117a17fbc ensure we reset engine across process 2024-11-10 11:04:49 -08:00
pablodanswer
8983a2db50 remove extraneous session creation 2024-11-10 10:59:03 -08:00
pablodanswer
9ce8cb9a1c k 2024-11-09 21:49:07 -08:00
pablodanswer
33ea51f2aa more reasonable limits 2024-11-09 14:18:18 -08:00
19 changed files with 109 additions and 136 deletions

View File

@@ -55,6 +55,7 @@ class DynamicTenantScheduler(PersistentScheduler):
logger.info("Fetching all tenant IDs")
tenant_ids = get_all_tenant_ids()
logger.info(f"Found {len(tenant_ids)} tenants")
logger.info(f"Tenant IDs: {', '.join(str(id) for id in tenant_ids)}")
logger.info("Fetching tasks to schedule")
tasks_to_schedule = fetch_versioned_implementation(

View File

@@ -6,6 +6,7 @@ from celery import signals
from celery import Task
from celery.signals import celeryd_init
from celery.signals import worker_init
from celery.signals import worker_process_init
from celery.signals import worker_ready
from celery.signals import worker_shutdown
@@ -81,6 +82,11 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
app_base.on_worker_shutdown(sender, **kwargs)
@worker_process_init.connect
def init_worker(**kwargs: Any) -> None:
SqlEngine.reset_engine()
@signals.setup_logging.connect
def on_setup_logging(
loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any

View File

@@ -1,96 +0,0 @@
from datetime import timedelta
from typing import Any
from celery.beat import PersistentScheduler # type: ignore
from celery.utils.log import get_task_logger
from danswer.db.engine import get_all_tenant_ids
from danswer.utils.variable_functionality import fetch_versioned_implementation
logger = get_task_logger(__name__)
class DynamicTenantScheduler(PersistentScheduler):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._reload_interval = timedelta(minutes=1)
self._last_reload = self.app.now() - self._reload_interval
def setup_schedule(self) -> None:
super().setup_schedule()
def tick(self) -> float:
retval = super().tick()
now = self.app.now()
if (
self._last_reload is None
or (now - self._last_reload) > self._reload_interval
):
logger.info("Reloading schedule to check for new tenants...")
self._update_tenant_tasks()
self._last_reload = now
return retval
def _update_tenant_tasks(self) -> None:
logger.info("Checking for tenant task updates...")
try:
tenant_ids = get_all_tenant_ids()
tasks_to_schedule = fetch_versioned_implementation(
"danswer.background.celery.tasks.beat_schedule", "get_tasks_to_schedule"
)
new_beat_schedule: dict[str, dict[str, Any]] = {}
current_schedule = getattr(self, "_store", {"entries": {}}).get(
"entries", {}
)
existing_tenants = set()
for task_name in current_schedule.keys():
if "-" in task_name:
existing_tenants.add(task_name.split("-")[-1])
for tenant_id in tenant_ids:
if tenant_id not in existing_tenants:
logger.info(f"Found new tenant: {tenant_id}")
for task in tasks_to_schedule():
task_name = f"{task['name']}-{tenant_id}"
new_task = {
"task": task["task"],
"schedule": task["schedule"],
"kwargs": {"tenant_id": tenant_id},
}
if options := task.get("options"):
new_task["options"] = options
new_beat_schedule[task_name] = new_task
if self._should_update_schedule(current_schedule, new_beat_schedule):
logger.info(
"Updating schedule",
extra={
"new_tasks": len(new_beat_schedule),
"current_tasks": len(current_schedule),
},
)
if not hasattr(self, "_store"):
self._store: dict[str, dict] = {"entries": {}}
self.update_from_dict(new_beat_schedule)
logger.info(f"New schedule: {new_beat_schedule}")
logger.info("Tenant tasks updated successfully")
else:
logger.debug("No schedule updates needed")
except (AttributeError, KeyError):
logger.exception("Failed to process task configuration")
except Exception:
logger.exception("Unexpected error updating tenant tasks")
def _should_update_schedule(
self, current_schedule: dict, new_schedule: dict
) -> bool:
"""Compare schedules to determine if an update is needed."""
current_tasks = set(current_schedule.keys())
new_tasks = set(new_schedule.keys())
return current_tasks != new_tasks

View File

@@ -6,27 +6,30 @@ from danswer.configs.constants import DanswerCeleryPriority
tasks_to_schedule = [
{
# 15 light workers can check for 1144 in a minute,
# but previously we were creating (4 * 120 * 15 = 7,200 per minute)
# This should be more reasonably accomplishable.
"name": "check-for-vespa-sync",
"task": "check_for_vespa_sync_task",
"schedule": timedelta(seconds=5),
"schedule": timedelta(minutes=5),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-connector-deletion",
"task": "check_for_connector_deletion_task",
"schedule": timedelta(seconds=20),
"schedule": timedelta(seconds=60),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-indexing",
"task": "check_for_indexing",
"schedule": timedelta(seconds=10),
"schedule": timedelta(seconds=15),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-prune",
"task": "check_for_pruning",
"schedule": timedelta(seconds=10),
"schedule": timedelta(seconds=60),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
@@ -38,7 +41,7 @@ tasks_to_schedule = [
{
"name": "monitor-vespa-sync",
"task": "monitor_vespa_sync",
"schedule": timedelta(seconds=5),
"schedule": timedelta(seconds=60),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
]

View File

@@ -118,15 +118,13 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
embedding_model=embedding_model,
)
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
cc_pair_ids: list[int] = []
cc_pairs = fetch_connector_credential_pairs(db_session)
for cc_pair_entry in cc_pairs:
cc_pair_ids.append(cc_pair_entry.id)
for cc_pair_id in cc_pair_ids:
redis_connector = RedisConnector(tenant_id, cc_pair_id)
with get_session_with_tenant(tenant_id) as db_session:
for cc_pair_id in cc_pair_ids:
redis_connector = RedisConnector(tenant_id, cc_pair_id)
# Get the primary search settings
primary_search_settings = get_current_search_settings(db_session)
search_settings = [primary_search_settings]

View File

@@ -29,18 +29,26 @@ JobStatusType = (
def _initializer(
func: Callable, args: list | tuple, kwargs: dict[str, Any] | None = None
) -> Any:
"""Ensure the parent proc's database connections are not touched
in the new connection pool
"""Initialize the child process with a fresh SQLAlchemy Engine.
Based on the recommended approach in the SQLAlchemy docs found:
Based on SQLAlchemy's recommendations to handle multiprocessing:
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
"""
if kwargs is None:
kwargs = {}
logger.info("Initializing spawned worker child process.")
# Reset the engine in the child process
SqlEngine.reset_engine()
# Optionally set a custom app name for database logging purposes
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
# Initialize a new engine with desired parameters
SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
# Proceed with executing the target function
return func(*args, **kwargs)

View File

@@ -164,9 +164,18 @@ class SlackbotHandler:
def acquire_tenants(self) -> None:
tenant_ids = get_all_tenant_ids()
# This is temporary
logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres")
allowed_tenant_ids = os.environ.get("ALLOWED_TENANT_IDS")
allowed_tenant_list = (
allowed_tenant_ids.split(",") if allowed_tenant_ids else None
)
for tenant_id in tenant_ids:
if allowed_tenant_list and tenant_id not in allowed_tenant_list:
logger.debug(f"Tenant {tenant_id} not in allowed list, skipping")
continue
if tenant_id in self.tenant_ids:
logger.debug(f"Tenant {tenant_id} already in self.tenant_ids")
continue

View File

@@ -189,6 +189,13 @@ class SqlEngine:
return ""
return cls._app_name
@classmethod
def reset_engine(cls) -> None:
with cls._lock:
if cls._engine:
cls._engine.dispose()
cls._engine = None
def get_all_tenant_ids() -> list[str] | list[None]:
if not MULTI_TENANT:
@@ -204,10 +211,18 @@ def get_all_tenant_ids() -> list[str] | list[None]:
)
tenant_ids = [row[0] for row in result]
ignored_tenants = [
"tenant_i-0dbc199db55ad29a3",
"tenant_i-043470d740845ec56",
"tenant_i-06a83ec44869babc4",
"tenant_i-06064a5b18ca0bb7b",
]
valid_tenants = [
tenant
for tenant in tenant_ids
if tenant is None or tenant.startswith(TENANT_ID_PREFIX)
if tenant is None
or (tenant.startswith(TENANT_ID_PREFIX) and tenant not in ignored_tenants)
]
return valid_tenants

View File

@@ -63,6 +63,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
stmt = construct_document_select_for_connector_credential_pair_by_needs_sync(
cc_pair.connector_id, cc_pair.credential_id
)
for doc in db_session.scalars(stmt).yield_per(1):
current_time = time.monotonic()
if current_time - last_lock_time >= (

View File

@@ -9,12 +9,12 @@ spec:
scaleTargetRef:
name: celery-worker-indexing
minReplicaCount: 1
maxReplicaCount: 10
maxReplicaCount: 30
triggers:
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_indexing
@@ -25,7 +25,7 @@ spec:
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_indexing:2
@@ -36,7 +36,7 @@ spec:
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_indexing:3
@@ -44,3 +44,12 @@ spec:
databaseIndex: "15"
authenticationRef:
name: celery-worker-auth
- type: cpu
metadata:
type: Utilization
value: "70"
- type: memory
metadata:
type: Utilization
value: "70"

View File

@@ -8,12 +8,12 @@ metadata:
spec:
scaleTargetRef:
name: celery-worker-light
minReplicaCount: 1
minReplicaCount: 5
maxReplicaCount: 20
triggers:
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync
@@ -23,7 +23,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync:2
@@ -33,7 +33,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync:3
@@ -43,7 +43,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_deletion
@@ -53,7 +53,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_deletion:2

View File

@@ -11,11 +11,11 @@ spec:
pollingInterval: 15 # Check every 15 seconds
cooldownPeriod: 30 # Wait 30 seconds before scaling down
minReplicaCount: 1
maxReplicaCount: 1
maxReplicaCount: 10
triggers:
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: celery
@@ -26,7 +26,7 @@ spec:
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: celery:1
@@ -36,7 +36,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: celery:2
@@ -46,7 +46,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: celery:3
@@ -56,7 +56,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: periodic_tasks
@@ -66,7 +66,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: periodic_tasks:2

View File

@@ -0,0 +1,19 @@
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: indexing-model-server-scaledobject
namespace: danswer
labels:
app: indexing-model-server
spec:
scaleTargetRef:
name: indexing-model-server-deployment
pollingInterval: 15 # Check every 15 seconds
cooldownPeriod: 30 # Wait 30 seconds before scaling down
minReplicaCount: 1
maxReplicaCount: 12
triggers:
- type: cpu
metadata:
type: Utilization
value: "70"

View File

@@ -5,5 +5,5 @@ metadata:
namespace: danswer
type: Opaque
data:
host: { { base64-encoded-hostname } }
password: { { base64-encoded-password } }
host: bWFzdGVyLnJlZGlzZW5jcnlwdGVkLjRoNHhvci51c2UyLmNhY2hlLmFtYXpvbmF3cy5jb20=
password: NTBsYnNBdXRoVG9rZW5EYW5zd2Vy

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-beat
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8
imagePullPolicy: Always
command:
[

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-heavy
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8
imagePullPolicy: Always
command:
[

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-indexing
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8
imagePullPolicy: Always
command:
[
@@ -47,10 +47,10 @@ spec:
resources:
requests:
cpu: "500m"
memory: "1Gi"
memory: "4Gi"
limits:
cpu: "1000m"
memory: "2Gi"
memory: "8Gi"
volumes:
- name: vespa-certificates
secret:

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-light
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8
imagePullPolicy: Always
command:
[

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-primary
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8
imagePullPolicy: Always
command:
[