Compare commits

...

6 Commits

Author SHA1 Message Date
pablodanswer
103212bbc4 temporary 2024-11-10 11:10:00 -08:00
pablodanswer
2f2aa7a962 minor update to schedule 2024-11-10 11:08:47 -08:00
pablodanswer
2117a17fbc ensure we reset engine across process 2024-11-10 11:04:49 -08:00
pablodanswer
8983a2db50 remove extraneous session creation 2024-11-10 10:59:03 -08:00
pablodanswer
9ce8cb9a1c k 2024-11-09 21:49:07 -08:00
pablodanswer
33ea51f2aa more reasonable limits 2024-11-09 14:18:18 -08:00
14 changed files with 44 additions and 27 deletions

View File

@@ -6,6 +6,7 @@ from celery import signals
from celery import Task
from celery.signals import celeryd_init
from celery.signals import worker_init
from celery.signals import worker_process_init
from celery.signals import worker_ready
from celery.signals import worker_shutdown
@@ -81,6 +82,11 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
app_base.on_worker_shutdown(sender, **kwargs)
@worker_process_init.connect
def init_worker(**kwargs: Any) -> None:
SqlEngine.reset_engine()
@signals.setup_logging.connect
def on_setup_logging(
loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any

View File

@@ -6,27 +6,30 @@ from danswer.configs.constants import DanswerCeleryPriority
tasks_to_schedule = [
{
# 15 light workers can check for 1144 tasks in a minute,
# but previously we were creating 4 * 120 * 15 = 7,200 per minute.
# This rate should be more reasonably achievable.
"name": "check-for-vespa-sync",
"task": "check_for_vespa_sync_task",
"schedule": timedelta(seconds=5),
"schedule": timedelta(minutes=5),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-connector-deletion",
"task": "check_for_connector_deletion_task",
"schedule": timedelta(seconds=20),
"schedule": timedelta(seconds=60),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-indexing",
"task": "check_for_indexing",
"schedule": timedelta(seconds=10),
"schedule": timedelta(seconds=15),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-prune",
"task": "check_for_pruning",
"schedule": timedelta(seconds=10),
"schedule": timedelta(seconds=60),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
@@ -38,7 +41,7 @@ tasks_to_schedule = [
{
"name": "monitor-vespa-sync",
"task": "monitor_vespa_sync",
"schedule": timedelta(seconds=5),
"schedule": timedelta(seconds=60),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
]

View File

@@ -118,15 +118,13 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
embedding_model=embedding_model,
)
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
cc_pair_ids: list[int] = []
cc_pairs = fetch_connector_credential_pairs(db_session)
for cc_pair_entry in cc_pairs:
cc_pair_ids.append(cc_pair_entry.id)
for cc_pair_id in cc_pair_ids:
redis_connector = RedisConnector(tenant_id, cc_pair_id)
with get_session_with_tenant(tenant_id) as db_session:
for cc_pair_id in cc_pair_ids:
redis_connector = RedisConnector(tenant_id, cc_pair_id)
# Get the primary search settings
primary_search_settings = get_current_search_settings(db_session)
search_settings = [primary_search_settings]

View File

@@ -164,9 +164,18 @@ class SlackbotHandler:
def acquire_tenants(self) -> None:
tenant_ids = get_all_tenant_ids()
# This is temporary
logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres")
allowed_tenant_ids = os.environ.get("ALLOWED_TENANT_IDS")
allowed_tenant_list = (
allowed_tenant_ids.split(",") if allowed_tenant_ids else None
)
for tenant_id in tenant_ids:
if allowed_tenant_list and tenant_id not in allowed_tenant_list:
logger.debug(f"Tenant {tenant_id} not in allowed list, skipping")
continue
if tenant_id in self.tenant_ids:
logger.debug(f"Tenant {tenant_id} already in self.tenant_ids")
continue

View File

@@ -63,6 +63,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
stmt = construct_document_select_for_connector_credential_pair_by_needs_sync(
cc_pair.connector_id, cc_pair.credential_id
)
for doc in db_session.scalars(stmt).yield_per(1):
current_time = time.monotonic()
if current_time - last_lock_time >= (

View File

@@ -8,8 +8,8 @@ metadata:
spec:
scaleTargetRef:
name: celery-worker-indexing
minReplicaCount: 1
maxReplicaCount: 10
minReplicaCount: 0
maxReplicaCount: 0
triggers:
- type: redis
metadata:

View File

@@ -8,12 +8,12 @@ metadata:
spec:
scaleTargetRef:
name: celery-worker-light
minReplicaCount: 1
minReplicaCount: 5
maxReplicaCount: 20
triggers:
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync
@@ -23,7 +23,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync:2
@@ -33,7 +33,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync:3
@@ -43,7 +43,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_deletion
@@ -53,7 +53,7 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
host: "master.redisencrypted.4h4xor.use2.cache.amazonaws.com"
port: "6379"
enableTLS: "true"
listName: connector_deletion:2

View File

@@ -10,8 +10,8 @@ spec:
name: celery-worker-primary
pollingInterval: 15 # Check every 15 seconds
cooldownPeriod: 30 # Wait 30 seconds before scaling down
minReplicaCount: 1
maxReplicaCount: 1
minReplicaCount: 0
maxReplicaCount: 0
triggers:
- type: redis
metadata:

View File

@@ -5,5 +5,5 @@ metadata:
namespace: danswer
type: Opaque
data:
host: { { base64-encoded-hostname } }
password: { { base64-encoded-password } }
host: bWFzdGVyLnJlZGlzZW5jcnlwdGVkLjRoNHhvci51c2UyLmNhY2hlLmFtYXpvbmF3cy5jb20=
password: NTBsYnNBdXRoVG9rZW5EYW5zd2Vy

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-beat
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.3
imagePullPolicy: Always
command:
[

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-heavy
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.3
imagePullPolicy: Always
command:
[

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-indexing
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.3
imagePullPolicy: Always
command:
[

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-light
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.3
imagePullPolicy: Always
command:
[

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-primary
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.3
imagePullPolicy: Always
command:
[