Compare commits

...

9 Commits

Author SHA1 Message Date
pablodanswer
d26e9725af k 2024-11-10 13:05:50 -08:00
pablodanswer
65a9a95462 reset children processes 2024-11-10 11:47:15 -08:00
pablodanswer
8263b8ae7a ensure all pushed 2024-11-10 11:19:09 -08:00
pablodanswer
103212bbc4 temporary 2024-11-10 11:10:00 -08:00
pablodanswer
2f2aa7a962 minor update to schedule 2024-11-10 11:08:47 -08:00
pablodanswer
2117a17fbc ensure we reset engine across process 2024-11-10 11:04:49 -08:00
pablodanswer
8983a2db50 remove extraneous session creation 2024-11-10 10:59:03 -08:00
pablodanswer
9ce8cb9a1c k 2024-11-09 21:49:07 -08:00
pablodanswer
33ea51f2aa more reasonable limits 2024-11-09 14:18:18 -08:00
17 changed files with 109 additions and 41 deletions

View File

@@ -6,6 +6,7 @@ from celery import signals
from celery import Task
from celery.signals import celeryd_init
from celery.signals import worker_init
from celery.signals import worker_process_init
from celery.signals import worker_ready
from celery.signals import worker_shutdown
@@ -81,6 +82,11 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
app_base.on_worker_shutdown(sender, **kwargs)
@worker_process_init.connect
def init_worker(**kwargs: Any) -> None:
SqlEngine.reset_engine()
@signals.setup_logging.connect
def on_setup_logging(
loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any

View File

@@ -6,27 +6,30 @@ from danswer.configs.constants import DanswerCeleryPriority
tasks_to_schedule = [
{
# 15 light workers can check for 1144 in a minute,
# but previously we were creating (4 * 120 * 15 = 7,200 per minute)
# This should be more reasonably achievable.
"name": "check-for-vespa-sync",
"task": "check_for_vespa_sync_task",
"schedule": timedelta(seconds=5),
"schedule": timedelta(minutes=5),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-connector-deletion",
"task": "check_for_connector_deletion_task",
"schedule": timedelta(seconds=20),
"schedule": timedelta(seconds=60),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-indexing",
"task": "check_for_indexing",
"schedule": timedelta(seconds=10),
"schedule": timedelta(seconds=15),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-prune",
"task": "check_for_pruning",
"schedule": timedelta(seconds=10),
"schedule": timedelta(seconds=60),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
@@ -38,7 +41,7 @@ tasks_to_schedule = [
{
"name": "monitor-vespa-sync",
"task": "monitor_vespa_sync",
"schedule": timedelta(seconds=5),
"schedule": timedelta(seconds=60),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
]

View File

@@ -118,15 +118,13 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
embedding_model=embedding_model,
)
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
cc_pair_ids: list[int] = []
cc_pairs = fetch_connector_credential_pairs(db_session)
for cc_pair_entry in cc_pairs:
cc_pair_ids.append(cc_pair_entry.id)
for cc_pair_id in cc_pair_ids:
redis_connector = RedisConnector(tenant_id, cc_pair_id)
with get_session_with_tenant(tenant_id) as db_session:
for cc_pair_id in cc_pair_ids:
redis_connector = RedisConnector(tenant_id, cc_pair_id)
# Get the primary search settings
primary_search_settings = get_current_search_settings(db_session)
search_settings = [primary_search_settings]

View File

@@ -29,18 +29,26 @@ JobStatusType = (
def _initializer(
func: Callable, args: list | tuple, kwargs: dict[str, Any] | None = None
) -> Any:
"""Ensure the parent proc's database connections are not touched
in the new connection pool
"""Initialize the child process with a fresh SQLAlchemy Engine.
Based on the recommended approach in the SQLAlchemy docs found:
Based on SQLAlchemy's recommendations to handle multiprocessing:
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
"""
if kwargs is None:
kwargs = {}
logger.info("Initializing spawned worker child process.")
# Reset the engine in the child process
SqlEngine.reset_engine()
# Optionally set a custom app name for database logging purposes
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
# Initialize a new engine with desired parameters
SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
# Proceed with executing the target function
return func(*args, **kwargs)

View File

@@ -164,9 +164,18 @@ class SlackbotHandler:
def acquire_tenants(self) -> None:
tenant_ids = get_all_tenant_ids()
# This is temporary
logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres")
allowed_tenant_ids = os.environ.get("ALLOWED_TENANT_IDS")
allowed_tenant_list = (
allowed_tenant_ids.split(",") if allowed_tenant_ids else None
)
for tenant_id in tenant_ids:
if allowed_tenant_list and tenant_id not in allowed_tenant_list:
logger.debug(f"Tenant {tenant_id} not in allowed list, skipping")
continue
if tenant_id in self.tenant_ids:
logger.debug(f"Tenant {tenant_id} already in self.tenant_ids")
continue

View File

@@ -189,6 +189,13 @@ class SqlEngine:
return ""
return cls._app_name
@classmethod
def reset_engine(cls) -> None:
with cls._lock:
if cls._engine:
cls._engine.dispose()
cls._engine = None
def get_all_tenant_ids() -> list[str] | list[None]:
if not MULTI_TENANT:
@@ -204,10 +211,18 @@ def get_all_tenant_ids() -> list[str] | list[None]:
)
tenant_ids = [row[0] for row in result]
ignored_tenants = [
"tenant_i-0dbc199db55ad29a3",
"tenant_i-043470d740845ec56",
"tenant_i-06a83ec44869babc4",
"tenant_i-06064a5b18ca0bb7b",
]
valid_tenants = [
tenant
for tenant in tenant_ids
if tenant is None or tenant.startswith(TENANT_ID_PREFIX)
if tenant is None
or (tenant.startswith(TENANT_ID_PREFIX) and tenant not in ignored_tenants)
]
return valid_tenants

View File

@@ -63,6 +63,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
stmt = construct_document_select_for_connector_credential_pair_by_needs_sync(
cc_pair.connector_id, cc_pair.credential_id
)
for doc in db_session.scalars(stmt).yield_per(1):
current_time = time.monotonic()
if current_time - last_lock_time >= (

View File

@@ -9,12 +9,12 @@ spec:
scaleTargetRef:
name: celery-worker-indexing
minReplicaCount: 1
maxReplicaCount: 10
maxReplicaCount: 30
triggers:
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_indexing
@@ -25,7 +25,7 @@ spec:
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_indexing:2
@@ -36,7 +36,7 @@ spec:
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_indexing:3
@@ -44,3 +44,12 @@ spec:
databaseIndex: "15"
authenticationRef:
name: celery-worker-auth
- type: cpu
metadata:
type: Utilization
value: "70"
- type: memory
metadata:
type: Utilization
value: "70"

View File

@@ -8,12 +8,12 @@ metadata:
spec:
scaleTargetRef:
name: celery-worker-light
minReplicaCount: 1
minReplicaCount: 5
maxReplicaCount: 20
triggers:
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync
@@ -23,7 +23,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync:2
@@ -33,7 +33,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync:3
@@ -43,7 +43,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_deletion
@@ -53,7 +53,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_deletion:2

View File

@@ -10,12 +10,12 @@ spec:
name: celery-worker-primary
pollingInterval: 15 # Check every 15 seconds
cooldownPeriod: 30 # Wait 30 seconds before scaling down
minReplicaCount: 1
maxReplicaCount: 1
minReplicaCount: 0
maxReplicaCount: 0
triggers:
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: celery
@@ -26,7 +26,7 @@ spec:
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: celery:1
@@ -36,7 +36,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: celery:2
@@ -46,7 +46,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: celery:3
@@ -56,7 +56,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: periodic_tasks
@@ -66,7 +66,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: periodic_tasks:2

View File

@@ -0,0 +1,19 @@
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: indexing-model-server-scaledobject
namespace: danswer
labels:
app: indexing-model-server
spec:
scaleTargetRef:
name: indexing-model-server-deployment
pollingInterval: 15 # Check every 15 seconds
cooldownPeriod: 30 # Wait 30 seconds before scaling down
minReplicaCount: 1
maxReplicaCount: 5
triggers:
- type: cpu
metadata:
type: Utilization
value: "70"

View File

@@ -5,5 +5,5 @@ metadata:
namespace: danswer
type: Opaque
data:
host: { { base64-encoded-hostname } }
password: { { base64-encoded-password } }
host: bWFzdGVyLnJlZGlzZW5jcnlwdGVkLjRoNHhvci51c2UyLmNhY2hlLmFtYXpvbmF3cy5jb20=
password: NTBsYnNBdXRoVG9rZW5EYW5zd2Vy

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-beat
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.7
imagePullPolicy: Always
command:
[

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-heavy
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.7
imagePullPolicy: Always
command:
[

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-indexing
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.7
imagePullPolicy: Always
command:
[
@@ -47,10 +47,10 @@ spec:
resources:
requests:
cpu: "500m"
memory: "1Gi"
memory: "4Gi"
limits:
cpu: "1000m"
memory: "2Gi"
memory: "8Gi"
volumes:
- name: vespa-certificates
secret:

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-light
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.7
imagePullPolicy: Always
command:
[

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-primary
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.7
imagePullPolicy: Always
command:
[