Compare commits

...

2 Commits

Author SHA1 Message Date
Richard Kuo (Danswer)
bcca868421 raise beat expiry ... 60 seconds might be starving certain tasks completely 2024-12-23 00:50:51 -08:00
Richard Kuo (Danswer)
314901af4d try fixing exception in cloud 2024-12-23 00:46:16 -08:00
3 changed files with 45 additions and 12 deletions

View File

@@ -4,6 +4,8 @@ from typing import Any
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask
BEAT_EXPIRES_DEFAULT = 30 * 60 # 30 minutes
# we set expires because it isn't necessary to queue up these tasks
# it's only important that they run relatively regularly
tasks_to_schedule = [
@@ -13,7 +15,7 @@ tasks_to_schedule = [
"schedule": timedelta(seconds=20),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
@@ -22,7 +24,7 @@ tasks_to_schedule = [
"schedule": timedelta(seconds=20),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
@@ -31,7 +33,7 @@ tasks_to_schedule = [
"schedule": timedelta(seconds=15),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
@@ -40,7 +42,7 @@ tasks_to_schedule = [
"schedule": timedelta(seconds=15),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
@@ -49,7 +51,7 @@ tasks_to_schedule = [
"schedule": timedelta(seconds=3600),
"options": {
"priority": OnyxCeleryPriority.LOWEST,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
@@ -58,7 +60,7 @@ tasks_to_schedule = [
"schedule": timedelta(seconds=5),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
@@ -67,7 +69,7 @@ tasks_to_schedule = [
"schedule": timedelta(seconds=30),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
@@ -76,7 +78,7 @@ tasks_to_schedule = [
"schedule": timedelta(seconds=20),
"options": {
"priority": OnyxCeleryPriority.HIGH,
"expires": 60,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
]

View File

@@ -1,6 +1,7 @@
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from time import sleep
from uuid import uuid4
from celery import Celery
@@ -219,6 +220,33 @@ def connector_permission_sync_generator_task(
r = get_redis_client(tenant_id=tenant_id)
while True:
if not redis_connector.permissions.fenced: # The fence must exist
raise ValueError(
f"connector_permission_sync_generator_task - fence not found: "
f"fence={redis_connector.permissions.fence_key}"
)
payload = redis_connector.permissions.payload # The payload must exist
if not payload:
raise ValueError(
"connector_permission_sync_generator_task: payload invalid or not found"
)
if payload.celery_task_id is None:
logger.info(
f"connector_permission_sync_generator_task - Waiting for fence: "
f"fence={redis_connector.permissions.fence_key}"
)
sleep(1)
continue
logger.info(
f"connector_permission_sync_generator_task - Fence found, continuing...: "
f"fence={redis_connector.permissions.fence_key}"
)
break
lock: RedisLock = r.lock(
OnyxRedisLocks.CONNECTOR_DOC_PERMISSIONS_SYNC_LOCK_PREFIX
+ f"_{redis_connector.id}",
@@ -254,8 +282,11 @@ def connector_permission_sync_generator_task(
if not payload:
raise ValueError(f"No fence payload found: cc_pair={cc_pair_id}")
payload.started = datetime.now(timezone.utc)
redis_connector.permissions.set_fence(payload)
new_payload = RedisConnectorPermissionSyncPayload(
started=datetime.now(timezone.utc),
celery_task_id=payload.celery_task_id,
)
redis_connector.permissions.set_fence(new_payload)
document_external_accesses: list[DocExternalAccess] = doc_sync_func(cc_pair)

View File

@@ -97,7 +97,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
r = get_redis_client(tenant_id=tenant_id)
lock_beat = r.lock(
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_EXTERNAL_GROUP_SYNC_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)
@@ -162,7 +162,7 @@ def try_creating_external_group_sync_task(
LOCK_TIMEOUT = 30
lock = r.lock(
lock: RedisLock = r.lock(
DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_generate_external_group_sync_tasks",
timeout=LOCK_TIMEOUT,
)