Compare commits

...

13 Commits

Author SHA1 Message Date
Richard Kuo (Danswer)
ef07f505ae missed removed scan_iter, also remove other scan_iters and replace with sscan_iter of the lookup table 2025-02-06 15:19:28 -08:00
Richard Kuo (Danswer)
2be59a4bb8 remove scan_iter from pruning 2025-02-06 14:46:31 -08:00
Richard Kuo (Danswer)
8a0943a3b3 log the payload id 2025-02-06 14:27:32 -08:00
Richard Kuo (Danswer)
46f36aaeb9 fix pruning 2025-02-06 12:35:48 -08:00
Richard Kuo (Danswer)
d1c978069a backport fix for pruning check 2025-02-06 10:42:23 -08:00
Richard Kuo (Danswer)
d7d33695a7 Merge branch 'main' of https://github.com/onyx-dot-app/onyx into feature/more_lock_validation 2025-02-06 10:32:25 -08:00
Richard Kuo (Danswer)
d61b0f977f Merge branch 'main' of https://github.com/onyx-dot-app/onyx into feature/more_lock_validation 2025-02-06 09:21:22 -08:00
Richard Kuo (Danswer)
ac9a6da737 get external group sync validation working 2025-02-05 02:35:18 -08:00
Richard Kuo (Danswer)
0a341e2f7e Merge branch 'main' of https://github.com/onyx-dot-app/onyx into feature/more_lock_validation 2025-02-04 18:14:56 -08:00
Richard Kuo (Danswer)
06245fa742 fix missing class 2025-02-04 16:28:14 -08:00
Richard Kuo (Danswer)
bc1da21bd9 Merge branch 'main' of https://github.com/onyx-dot-app/onyx into feature/more_lock_validation 2025-02-04 16:13:07 -08:00
Richard Kuo (Danswer)
a2f514c943 Merge branch 'main' of https://github.com/onyx-dot-app/onyx into feature/more_lock_validation
# Conflicts:
#	backend/onyx/configs/constants.py
#	backend/onyx/redis/redis_connector_prune.py
2025-02-04 16:12:27 -08:00
Richard Kuo (Danswer)
e1b1bce04e add validation for pruning 2025-02-03 01:40:23 -08:00
10 changed files with 480 additions and 91 deletions

View File

@@ -659,7 +659,7 @@ def validate_permission_sync_fence(
f"tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
)
# we're only active if tasks_scanned > 0 and tasks_not_in_celery == 0
# we're active if there are still tasks to run and those tasks all exist in celery
if tasks_scanned > 0 and tasks_not_in_celery == 0:
redis_connector.permissions.set_active()
return
@@ -680,7 +680,8 @@ def validate_permission_sync_fence(
"validate_permission_sync_fence - "
"Resetting fence because no associated celery tasks were found: "
f"cc_pair={cc_pair_id} "
f"fence={fence_key}"
f"fence={fence_key} "
f"payload_id={payload.id}"
)
redis_connector.permissions.reset()

View File

@@ -2,15 +2,17 @@ import time
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any
from typing import cast
from uuid import uuid4
from celery import Celery
from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from pydantic import ValidationError
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from ee.onyx.db.connector_credential_pair import get_all_auto_sync_cc_pairs
from ee.onyx.db.connector_credential_pair import get_cc_pairs_by_source
@@ -32,7 +34,9 @@ from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.db.connector import mark_cc_pair_as_external_group_synced
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.engine import get_session_with_tenant
@@ -49,7 +53,8 @@ from onyx.redis.redis_connector_ext_group_sync import (
RedisConnectorExternalGroupSyncPayload,
)
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.server.utils import make_short_id
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -107,11 +112,11 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
bind=True,
)
def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)
# we need to use celery's redis client to access its redis data
# (which lives on a different db number)
# r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
r = get_redis_client(tenant_id=tenant_id)
r_replica = get_redis_replica_client(tenant_id=tenant_id)
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_EXTERNAL_GROUP_SYNC_BEAT_LOCK,
@@ -149,30 +154,32 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool
lock_beat.reacquire()
for cc_pair_id in cc_pair_ids_to_sync:
tasks_created = try_creating_external_group_sync_task(
payload_id = try_creating_external_group_sync_task(
self.app, cc_pair_id, r, tenant_id
)
if not tasks_created:
if not payload_id:
continue
task_logger.info(f"External group sync queued: cc_pair={cc_pair_id}")
task_logger.info(
f"External group sync queued: cc_pair={cc_pair_id} id={payload_id}"
)
# we want to run this less frequently than the overall task
# lock_beat.reacquire()
# if not r.exists(OnyxRedisSignals.VALIDATE_EXTERNAL_GROUP_SYNC_FENCES):
# # clear any indexing fences that don't have associated celery tasks in progress
# # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# # or be currently executing
# try:
# validate_external_group_sync_fences(
# tenant_id, self.app, r, r_celery, lock_beat
# )
# except Exception:
# task_logger.exception(
# "Exception while validating external group sync fences"
# )
lock_beat.reacquire()
if not r.exists(OnyxRedisSignals.BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES):
# clear fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
validate_external_group_sync_fences(
tenant_id, self.app, r, r_replica, r_celery, lock_beat
)
except Exception:
task_logger.exception(
"Exception while validating external group sync fences"
)
# r.set(OnyxRedisSignals.VALIDATE_EXTERNAL_GROUP_SYNC_FENCES, 1, ex=60)
r.set(OnyxRedisSignals.BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES, 1, ex=300)
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
@@ -191,9 +198,11 @@ def try_creating_external_group_sync_task(
cc_pair_id: int,
r: Redis,
tenant_id: str | None,
) -> int | None:
) -> str | None:
"""Returns an int if syncing is needed. The int represents the number of sync tasks generated.
Returns None if no syncing is required."""
payload_id: str | None = None
redis_connector = RedisConnector(tenant_id, cc_pair_id)
LOCK_TIMEOUT = 30
@@ -216,6 +225,7 @@ def try_creating_external_group_sync_task(
redis_connector.external_group_sync.taskset_clear()
payload = RedisConnectorExternalGroupSyncPayload(
id=make_short_id(),
submitted=datetime.now(timezone.utc),
started=None,
celery_task_id=None,
@@ -245,6 +255,8 @@ def try_creating_external_group_sync_task(
payload.celery_task_id = result.id
redis_connector.external_group_sync.set_fence(payload)
payload_id = payload.celery_task_id
except Exception:
task_logger.exception(
f"Unexpected exception while trying to create external group sync task: cc_pair={cc_pair_id}"
@@ -254,7 +266,7 @@ def try_creating_external_group_sync_task(
if lock.owned():
lock.release()
return 1
return payload_id
@shared_task(
@@ -406,27 +418,32 @@ def validate_external_group_sync_fences(
tenant_id: str | None,
celery_app: Celery,
r: Redis,
r_replica: Redis,
r_celery: Redis,
lock_beat: RedisLock,
) -> None:
reserved_sync_tasks = celery_get_unacked_task_ids(
reserved_tasks = celery_get_unacked_task_ids(
OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, r_celery
)
# validate all existing indexing jobs
for key_bytes in r.scan_iter(
RedisConnectorExternalGroupSync.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):
# validate all existing external group sync tasks
lock_beat.reacquire()
keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES))
for key in keys:
key_bytes = cast(bytes, key)
key_str = key_bytes.decode("utf-8")
if not key_str.startswith(RedisConnectorExternalGroupSync.FENCE_PREFIX):
continue
validate_external_group_sync_fence(
tenant_id,
key_bytes,
reserved_tasks,
r_celery,
)
lock_beat.reacquire()
with get_session_with_tenant(tenant_id) as db_session:
validate_external_group_sync_fence(
tenant_id,
key_bytes,
reserved_sync_tasks,
r_celery,
db_session,
)
return
@@ -435,7 +452,6 @@ def validate_external_group_sync_fence(
key_bytes: bytes,
reserved_tasks: set[str],
r_celery: Redis,
db_session: Session,
) -> None:
"""Checks for the error condition where an indexing fence is set but the associated celery tasks don't exist.
This can happen if the indexing worker hard crashes or is terminated.
@@ -478,26 +494,26 @@ def validate_external_group_sync_fence(
if not redis_connector.external_group_sync.fenced:
return
payload = redis_connector.external_group_sync.payload
if not payload:
return
# OK, there's actually something for us to validate
if payload.celery_task_id is None:
# the fence is just barely set up.
# if redis_connector_index.active():
# return
# it would be odd to get here as there isn't that much that can go wrong during
# initial fence setup, but it's still worth making sure we can recover
logger.info(
try:
payload = redis_connector.external_group_sync.payload
except ValidationError:
task_logger.exception(
"validate_external_group_sync_fence - "
f"Resetting fence in basic state without any activity: fence={fence_key}"
"Resetting fence because fence schema is out of date: "
f"cc_pair={cc_pair_id} "
f"fence={fence_key}"
)
redis_connector.external_group_sync.reset()
return
if not payload:
return
if not payload.celery_task_id:
return
# OK, there's actually something for us to validate
found = celery_find_task(
payload.celery_task_id, OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, r_celery
)
@@ -527,7 +543,8 @@ def validate_external_group_sync_fence(
"validate_external_group_sync_fence - "
"Resetting fence because no associated celery tasks were found: "
f"cc_pair={cc_pair_id} "
f"fence={fence_key}"
f"fence={fence_key} "
f"payload_id={payload.id}"
)
redis_connector.external_group_sync.reset()

View File

@@ -423,8 +423,8 @@ def connector_indexing_task(
# define a callback class
callback = IndexingCallback(
os.getppid(),
redis_connector.stop.fence_key,
redis_connector_index.generator_progress_key,
redis_connector,
redis_connector_index,
lock,
r,
)

View File

@@ -99,16 +99,16 @@ class IndexingCallback(IndexingHeartbeatInterface):
def __init__(
self,
parent_pid: int,
stop_key: str,
generator_progress_key: str,
redis_connector: RedisConnector,
redis_connector_index: RedisConnectorIndex,
redis_lock: RedisLock,
redis_client: Redis,
):
super().__init__()
self.parent_pid = parent_pid
self.redis_connector: RedisConnector = redis_connector
self.redis_connector_index: RedisConnectorIndex = redis_connector_index
self.redis_lock: RedisLock = redis_lock
self.stop_key: str = stop_key
self.generator_progress_key: str = generator_progress_key
self.redis_client = redis_client
self.started: datetime = datetime.now(timezone.utc)
self.redis_lock.reacquire()
@@ -120,7 +120,7 @@ class IndexingCallback(IndexingHeartbeatInterface):
self.last_parent_check = time.monotonic()
def should_stop(self) -> bool:
if self.redis_client.exists(self.stop_key):
if self.redis_connector.stop.fenced:
return True
return False
@@ -143,6 +143,8 @@ class IndexingCallback(IndexingHeartbeatInterface):
# self.last_parent_check = now
try:
self.redis_connector.prune.set_active()
current_time = time.monotonic()
if current_time - self.last_lock_monotonic >= (
CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4
@@ -165,7 +167,9 @@ class IndexingCallback(IndexingHeartbeatInterface):
redis_lock_dump(self.redis_lock, self.redis_client)
raise
self.redis_client.incrby(self.generator_progress_key, amount)
self.redis_client.incrby(
self.redis_connector_index.generator_progress_key, amount
)
def validate_indexing_fence(

View File

@@ -1,28 +1,39 @@
import time
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any
from typing import cast
from uuid import uuid4
from celery import Celery
from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from pydantic import ValidationError
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_queued_task_ids
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.celery_utils import extract_ids_from_runnable_connector
from onyx.background.celery.tasks.indexing.utils import IndexingCallback
from onyx.configs.app_configs import ALLOW_SIMULTANEOUS_PRUNING
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_PRUNING_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.connectors.factory import instantiate_connector
from onyx.connectors.models import InputType
from onyx.db.connector import mark_ccpair_as_pruned
@@ -35,10 +46,15 @@ from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.models import ConnectorCredentialPair
from onyx.db.search_settings import get_current_search_settings
from onyx.db.sync_record import insert_sync_record
from onyx.db.sync_record import update_sync_record_status
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_prune import RedisConnectorPrune
from onyx.redis.redis_connector_prune import RedisConnectorPrunePayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.server.utils import make_short_id
from onyx.utils.logger import LoggerContextVars
from onyx.utils.logger import pruning_ctx
from onyx.utils.logger import setup_logger
@@ -93,6 +109,8 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
)
def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
r = get_redis_client(tenant_id=tenant_id)
r_replica = get_redis_replica_client(tenant_id=tenant_id)
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK,
@@ -123,13 +141,28 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
if not _is_pruning_due(cc_pair):
continue
tasks_created = try_creating_prune_generator_task(
payload_id = try_creating_prune_generator_task(
self.app, cc_pair, db_session, r, tenant_id
)
if not tasks_created:
if not payload_id:
continue
task_logger.info(f"Pruning queued: cc_pair={cc_pair.id}")
task_logger.info(
f"Pruning queued: cc_pair={cc_pair.id} id={payload_id}"
)
# we want to run this less frequently than the overall task
lock_beat.reacquire()
if not r.exists(OnyxRedisSignals.BLOCK_VALIDATE_PRUNING_FENCES):
# clear any pruning fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
validate_pruning_fences(tenant_id, r, r_replica, r_celery, lock_beat)
except Exception:
task_logger.exception("Exception while validating pruning fences")
r.set(OnyxRedisSignals.BLOCK_VALIDATE_PRUNING_FENCES, 1, ex=300)
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
@@ -149,7 +182,7 @@ def try_creating_prune_generator_task(
db_session: Session,
r: Redis,
tenant_id: str | None,
) -> int | None:
) -> str | None:
"""Checks for any conditions that should block the pruning generator task from being
created, then creates the task.
@@ -168,7 +201,7 @@ def try_creating_prune_generator_task(
# we need to serialize starting pruning since it can be triggered either via
# celery beat or manually (API call)
lock = r.lock(
lock: RedisLock = r.lock(
DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_creating_prune_generator_task",
timeout=LOCK_TIMEOUT,
)
@@ -200,7 +233,17 @@ def try_creating_prune_generator_task(
custom_task_id = f"{redis_connector.prune.generator_task_key}_{uuid4()}"
celery_app.send_task(
# set a basic fence to start
redis_connector.prune.set_active()
payload = RedisConnectorPrunePayload(
id=make_short_id(),
submitted=datetime.now(timezone.utc),
started=None,
celery_task_id=None,
)
redis_connector.prune.set_fence(payload)
result = celery_app.send_task(
OnyxCeleryTask.CONNECTOR_PRUNING_GENERATOR_TASK,
kwargs=dict(
cc_pair_id=cc_pair.id,
@@ -221,8 +264,12 @@ def try_creating_prune_generator_task(
sync_type=SyncType.PRUNING,
)
# set this only after all tasks have been added
redis_connector.prune.set_fence(True)
# fill in the celery task id
redis_connector.prune.set_active()
payload.celery_task_id = result.id
redis_connector.prune.set_fence(payload)
payload_id = payload.id
except Exception:
task_logger.exception(f"Unexpected exception: cc_pair={cc_pair.id}")
return None
@@ -230,7 +277,7 @@ def try_creating_prune_generator_task(
if lock.owned():
lock.release()
return 1
return payload_id
@shared_task(
@@ -265,6 +312,43 @@ def connector_pruning_generator_task(
r = get_redis_client(tenant_id=tenant_id)
# this wait is needed to avoid a race condition where
# the primary worker sends the task and it is immediately executed
# before the primary worker can finalize the fence
start = time.monotonic()
while True:
if time.monotonic() - start > CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT:
raise ValueError(
f"connector_prune_generator_task - timed out waiting for fence to be ready: "
f"fence={redis_connector.prune.fence_key}"
)
if not redis_connector.prune.fenced: # The fence must exist
raise ValueError(
f"connector_prune_generator_task - fence not found: "
f"fence={redis_connector.prune.fence_key}"
)
payload = redis_connector.prune.payload # The payload must exist
if not payload:
raise ValueError(
"connector_prune_generator_task: payload invalid or not found"
)
if payload.celery_task_id is None:
logger.info(
f"connector_prune_generator_task - Waiting for fence: "
f"fence={redis_connector.prune.fence_key}"
)
time.sleep(1)
continue
logger.info(
f"connector_prune_generator_task - Fence found, continuing...: "
f"fence={redis_connector.prune.fence_key}"
)
break
# set thread_local=False since we don't control what thread the indexing/pruning
# might run our callback with
lock: RedisLock = r.lock(
@@ -294,6 +378,18 @@ def connector_pruning_generator_task(
)
return
payload = redis_connector.prune.payload
if not payload:
raise ValueError(f"No fence payload found: cc_pair={cc_pair_id}")
new_payload = RedisConnectorPrunePayload(
id=payload.id,
submitted=payload.submitted,
started=datetime.now(timezone.utc),
celery_task_id=payload.celery_task_id,
)
redis_connector.prune.set_fence(new_payload)
task_logger.info(
f"Pruning generator running connector: "
f"cc_pair={cc_pair_id} "
@@ -307,10 +403,13 @@ def connector_pruning_generator_task(
cc_pair.credential,
)
search_settings = get_current_search_settings(db_session)
redis_connector_index = redis_connector.new_index(search_settings.id)
callback = IndexingCallback(
0,
redis_connector.stop.fence_key,
redis_connector.prune.generator_progress_key,
redis_connector,
redis_connector_index,
lock,
r,
)
@@ -415,4 +514,184 @@ def monitor_ccpair_pruning_taskset(
redis_connector.prune.taskset_clear()
redis_connector.prune.generator_clear()
redis_connector.prune.set_fence(False)
redis_connector.prune.set_fence(None)
def validate_pruning_fences(
    tenant_id: str | None,
    r: Redis,
    r_replica: Redis,
    r_celery: Redis,
    lock_beat: RedisLock,
) -> None:
    """Run validate_pruning_fence on every pruning fence in the active fence set.

    The pass is skipped entirely while the connector deletion queue holds more
    than 1024 entries. The beat lock is reacquired after each fence that is
    validated.

    tenant_id: forwarded to validate_pruning_fence
    r: redis client forwarded to validate_pruning_fence
    r_replica: redis client used only to read the set of active fence keys
    r_celery: redis client used for the celery queue lookups
    lock_beat: lock that is reacquired as fences are processed
    """
    # building the lookup table can be expensive, so we won't bother
    # validating until the queue is small
    max_queue_len = 1024

    deletion_queue_len = celery_get_queue_length(
        OnyxCeleryQueues.CONNECTOR_DELETION, r_celery
    )
    if deletion_queue_len > max_queue_len:
        return

    # task ids from the queue used for the single pruning generator task
    reserved_generator_task_ids = celery_get_unacked_task_ids(
        OnyxCeleryQueues.CONNECTOR_PRUNING, r_celery
    )

    # task ids from the queue used for the (potentially large) set of
    # lightweight deletion tasks
    queued_deletion_task_ids = celery_get_queued_task_ids(
        OnyxCeleryQueues.CONNECTOR_DELETION, r_celery
    )

    # Use replica for this because the worst thing that happens
    # is that we don't run the validation on this pass
    active_fences = cast(
        set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES)
    )
    for fence in active_fences:
        fence_bytes = cast(bytes, fence)

        # the active fence set is read in full; only pruning fences are of interest
        if fence_bytes.decode("utf-8").startswith(RedisConnectorPrune.FENCE_PREFIX):
            validate_pruning_fence(
                tenant_id,
                fence_bytes,
                reserved_generator_task_ids,
                queued_deletion_task_ids,
                r,
                r_celery,
            )
            lock_beat.reacquire()
def validate_pruning_fence(
    tenant_id: str | None,
    key_bytes: bytes,
    reserved_tasks: set[str],
    queued_tasks: set[str],
    r: Redis,
    r_celery: Redis,
) -> None:
    """Reset a pruning fence when none of its celery tasks can be found.

    See validate_indexing_fence for an overall idea of validation flows.

    key_bytes: the fence key, utf-8 encoded
    reserved_tasks: task ids checked against both the fence's celery task id
        and the members of the pruning taskset
    queued_tasks: task ids checked against the members of the pruning taskset
    r: redis client used to scan the pruning taskset
    r_celery: redis client used to look up the fence's task in the celery queue
    """
    fence_key = key_bytes.decode("utf-8")

    parsed_id = RedisConnector.get_id_from_fence_key(fence_key)
    if parsed_id is None:
        task_logger.warning(
            f"validate_pruning_fence - could not parse id from {fence_key}"
        )
        return

    cc_pair_id = int(parsed_id)

    # parse out metadata and initialize the helper class with it
    redis_connector = RedisConnector(tenant_id, cc_pair_id)

    # if the fence doesn't exist, there's nothing to do
    if not redis_connector.prune.fenced:
        return

    # in the cloud, the payload format may have changed ...
    # it's a little sloppy, but just reset the fence for now if that happens
    # TODO: add intentional cleanup/abort logic
    try:
        payload = redis_connector.prune.payload
    except ValidationError:
        task_logger.exception(
            "validate_pruning_fence - "
            "Resetting fence because fence schema is out of date: "
            f"cc_pair={cc_pair_id} "
            f"fence={fence_key}"
        )
        redis_connector.prune.reset()
        return

    # without a payload or a celery task id there is nothing to validate yet
    if not payload or not payload.celery_task_id:
        return

    # OK, there's actually something for us to validate

    # either the generator task must be in flight or its subtasks must be.
    # the generator counts as in flight if it is still in the redis queue or
    # has been prefetched and is reserved within a worker
    generator_queued = celery_find_task(
        payload.celery_task_id,
        OnyxCeleryQueues.CONNECTOR_PRUNING,
        r_celery,
    )
    if generator_queued or payload.celery_task_id in reserved_tasks:
        redis_connector.prune.set_active()
        return

    # look up every task in the current taskset in the celery queue
    # every entry in the taskset should have an associated entry in the celery task queue
    # because we get the celery tasks first, the entries in our own pruning taskset
    # should be roughly a subset of the tasks in celery

    # this check isn't very exact, but should be sufficient over a period of time
    # A single successful check over some number of attempts is sufficient.

    # TODO: if the number of tasks in celery is much lower than the taskset length
    # we might be able to shortcut the lookup since by definition some of the tasks
    # must not exist in celery.

    tasks_scanned = 0
    tasks_not_in_celery = 0  # a non-zero number after completing our check is bad

    for member in r.sscan_iter(redis_connector.prune.taskset_key):
        tasks_scanned += 1

        task_id = cast(bytes, member).decode("utf-8")
        if task_id not in queued_tasks and task_id not in reserved_tasks:
            tasks_not_in_celery += 1

    task_logger.info(
        "validate_pruning_fence task check: "
        f"tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
    )

    # we're active if there are still tasks to run and those tasks all exist in celery
    if tasks_scanned > 0 and tasks_not_in_celery == 0:
        redis_connector.prune.set_active()
        return

    # we may want to enable this check if using the active task list somehow isn't good enough
    # if redis_connector_index.generator_locked():
    #     logger.info(f"{payload.celery_task_id} is currently executing.")

    # if we get here, we didn't find any direct indication that the associated celery tasks exist,
    # but they still might be there due to gaps in our ability to check states during transitions
    # Checking the active signal safeguards us against these transition periods
    # (which has a duration that allows us to bridge those gaps)
    if redis_connector.prune.active():
        return

    # celery tasks don't exist and the active signal has expired, possibly due to a crash. Clean it up.
    task_logger.warning(
        "validate_pruning_fence - "
        "Resetting fence because no associated celery tasks were found: "
        f"cc_pair={cc_pair_id} "
        f"fence={fence_key} "
        f"payload_id={payload.id}"
    )

    redis_connector.prune.reset()

View File

@@ -319,6 +319,7 @@ class OnyxRedisSignals:
BLOCK_VALIDATE_PERMISSION_SYNC_FENCES = (
"signal:block_validate_permission_sync_fences"
)
BLOCK_VALIDATE_PRUNING_FENCES = "signal:block_validate_pruning_fences"
BLOCK_BUILD_FENCE_LOOKUP_TABLE = "signal:block_build_fence_lookup_table"

View File

@@ -80,7 +80,8 @@ class RedisConnectorPermissionSync:
def get_active_task_count(self) -> int:
"""Count of active permission sync tasks"""
count = 0
for _ in self.redis.scan_iter(
for _ in self.redis.sscan_iter(
OnyxRedisConstants.ACTIVE_FENCES,
RedisConnectorPermissionSync.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):

View File

@@ -1,5 +1,4 @@
from datetime import datetime
from typing import Any
from typing import cast
import redis
@@ -8,10 +7,12 @@ from pydantic import BaseModel
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from onyx.configs.constants import OnyxRedisConstants
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
class RedisConnectorExternalGroupSyncPayload(BaseModel):
id: str
submitted: datetime
started: datetime | None
celery_task_id: str | None
@@ -37,6 +38,12 @@ class RedisConnectorExternalGroupSync:
TASKSET_PREFIX = f"{PREFIX}_taskset" # connectorexternalgroupsync_taskset
SUBTASK_PREFIX = f"{PREFIX}+sub" # connectorexternalgroupsync+sub
# used to signal the overall workflow is still active
# it's impossible to get the exact state of the system at a single point in time
# so we need a signal with a TTL to bridge gaps in our checks
ACTIVE_PREFIX = PREFIX + "_active"
ACTIVE_TTL = 3600
def __init__(self, tenant_id: str | None, id: int, redis: redis.Redis) -> None:
self.tenant_id: str | None = tenant_id
self.id = id
@@ -50,6 +57,7 @@ class RedisConnectorExternalGroupSync:
self.taskset_key = f"{self.TASKSET_PREFIX}_{id}"
self.subtask_prefix: str = f"{self.SUBTASK_PREFIX}_{id}"
self.active_key = f"{self.ACTIVE_PREFIX}_{id}"
def taskset_clear(self) -> None:
self.redis.delete(self.taskset_key)
@@ -66,7 +74,8 @@ class RedisConnectorExternalGroupSync:
def get_active_task_count(self) -> int:
"""Count of active external group syncing tasks"""
count = 0
for _ in self.redis.scan_iter(
for _ in self.redis.sscan_iter(
OnyxRedisConstants.ACTIVE_FENCES,
RedisConnectorExternalGroupSync.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):
@@ -83,10 +92,11 @@ class RedisConnectorExternalGroupSync:
@property
def payload(self) -> RedisConnectorExternalGroupSyncPayload | None:
# read related data and evaluate/print task progress
fence_bytes = cast(Any, self.redis.get(self.fence_key))
if fence_bytes is None:
fence_raw = self.redis.get(self.fence_key)
if fence_raw is None:
return None
fence_bytes = cast(bytes, fence_raw)
fence_str = fence_bytes.decode("utf-8")
payload = RedisConnectorExternalGroupSyncPayload.model_validate_json(
cast(str, fence_str)
@@ -99,10 +109,26 @@ class RedisConnectorExternalGroupSync:
payload: RedisConnectorExternalGroupSyncPayload | None,
) -> None:
if not payload:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key)
return
self.redis.set(self.fence_key, payload.model_dump_json())
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
def set_active(self) -> None:
    """Set a signal that keeps the external group sync flow from getting
    cleaned up within the expiration time.

    The slack in timing is needed because simply checking the celery queue
    and task status is subject to race conditions."""
    # the stored value is a placeholder; active() only checks that the key exists.
    # the key expires on its own after ACTIVE_TTL
    self.redis.set(self.active_key, 0, ex=self.ACTIVE_TTL)
def active(self) -> bool:
    """Return True while the active signal key exists in redis, else False."""
    # exists() reports how many of the given keys are present; coerce to bool
    return bool(self.redis.exists(self.active_key))
@property
def generator_complete(self) -> int | None:
@@ -138,6 +164,8 @@ class RedisConnectorExternalGroupSync:
pass
def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.active_key)
self.redis.delete(self.generator_progress_key)
self.redis.delete(self.generator_complete_key)
self.redis.delete(self.taskset_key)
@@ -152,6 +180,9 @@ class RedisConnectorExternalGroupSync:
@staticmethod
def reset_all(r: redis.Redis) -> None:
"""Deletes all redis values for all connectors"""
for key in r.scan_iter(RedisConnectorExternalGroupSync.ACTIVE_PREFIX + "*"):
r.delete(key)
for key in r.scan_iter(RedisConnectorExternalGroupSync.TASKSET_PREFIX + "*"):
r.delete(key)

View File

@@ -1,9 +1,11 @@
import time
from datetime import datetime
from typing import cast
from uuid import uuid4
import redis
from celery import Celery
from pydantic import BaseModel
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
@@ -16,6 +18,13 @@ from onyx.db.connector_credential_pair import get_connector_credential_pair_from
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
class RedisConnectorPrunePayload(BaseModel):
    """Fence payload for a pruning attempt; stored in redis as JSON under the fence key."""

    # short id generated when the fence is first created
    id: str
    # time at which the fence was first created, before the celery task is sent
    submitted: datetime
    # time the generator task began running the connector; None until then
    started: datetime | None
    # id of the celery generator task; None until the task has been sent
    celery_task_id: str | None
class RedisConnectorPrune:
"""Manages interactions with redis for pruning tasks. Should only be accessed
through RedisConnector."""
@@ -36,6 +45,12 @@ class RedisConnectorPrune:
TASKSET_PREFIX = f"{PREFIX}_taskset" # connectorpruning_taskset
SUBTASK_PREFIX = f"{PREFIX}+sub" # connectorpruning+sub
# used to signal the overall workflow is still active
# it's impossible to get the exact state of the system at a single point in time
# so we need a signal with a TTL to bridge gaps in our checks
ACTIVE_PREFIX = PREFIX + "_active"
ACTIVE_TTL = 3600
def __init__(self, tenant_id: str | None, id: int, redis: redis.Redis) -> None:
self.tenant_id: str | None = tenant_id
self.id = id
@@ -49,6 +64,7 @@ class RedisConnectorPrune:
self.taskset_key = f"{self.TASKSET_PREFIX}_{id}"
self.subtask_prefix: str = f"{self.SUBTASK_PREFIX}_{id}"
self.active_key = f"{self.ACTIVE_PREFIX}_{id}"
def taskset_clear(self) -> None:
self.redis.delete(self.taskset_key)
@@ -65,8 +81,10 @@ class RedisConnectorPrune:
def get_active_task_count(self) -> int:
"""Count of active pruning tasks"""
count = 0
for key in self.redis.scan_iter(
RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
for _ in self.redis.sscan_iter(
OnyxRedisConstants.ACTIVE_FENCES,
RedisConnectorPrune.FENCE_PREFIX + "*",
count=SCAN_ITER_COUNT_DEFAULT,
):
count += 1
return count
@@ -78,15 +96,44 @@ class RedisConnectorPrune:
return False
def set_fence(self, value: bool) -> None:
if not value:
@property
def payload(self) -> RedisConnectorPrunePayload | None:
    """Return the fence payload parsed from redis, or None if the fence key is unset.

    Raises:
        pydantic.ValidationError: if the stored JSON does not match
            RedisConnectorPrunePayload.
    """
    # check for a missing key before narrowing the type, so the None check
    # doesn't contradict the cast
    fence_raw = self.redis.get(self.fence_key)
    if fence_raw is None:
        return None

    fence_bytes = cast(bytes, fence_raw)
    fence_str = fence_bytes.decode("utf-8")
    return RedisConnectorPrunePayload.model_validate_json(fence_str)
def set_fence(
self,
payload: RedisConnectorPrunePayload | None,
) -> None:
if not payload:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.fence_key)
return
self.redis.set(self.fence_key, 0)
self.redis.set(self.fence_key, payload.model_dump_json())
self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
def set_active(self) -> None:
    """Set a signal that keeps the pruning flow from getting cleaned up
    within the expiration time.

    The slack in timing is needed because simply checking the celery queue
    and task status is subject to race conditions."""
    # the stored value is a placeholder; active() only checks that the key exists.
    # the key expires on its own after ACTIVE_TTL
    self.redis.set(self.active_key, 0, ex=self.ACTIVE_TTL)
def active(self) -> bool:
    """Return True while the active signal key exists in redis, else False."""
    # exists() reports how many of the given keys are present; coerce to bool
    return bool(self.redis.exists(self.active_key))
@property
def generator_complete(self) -> int | None:
"""the fence payload is an int representing the starting number of
@@ -162,6 +209,7 @@ class RedisConnectorPrune:
def reset(self) -> None:
self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key)
self.redis.delete(self.active_key)
self.redis.delete(self.generator_progress_key)
self.redis.delete(self.generator_complete_key)
self.redis.delete(self.taskset_key)
@@ -176,6 +224,9 @@ class RedisConnectorPrune:
@staticmethod
def reset_all(r: redis.Redis) -> None:
"""Deletes all redis values for all connectors"""
for key in r.scan_iter(RedisConnectorPrune.ACTIVE_PREFIX + "*"):
r.delete(key)
for key in r.scan_iter(RedisConnectorPrune.TASKSET_PREFIX + "*"):
r.delete(key)

View File

@@ -359,15 +359,17 @@ def prune_cc_pair(
f"credential={cc_pair.credential_id} "
f"{cc_pair.connector.name} connector."
)
tasks_created = try_creating_prune_generator_task(
payload_id = try_creating_prune_generator_task(
primary_app, cc_pair, db_session, r, CURRENT_TENANT_ID_CONTEXTVAR.get()
)
if not tasks_created:
if not payload_id:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail="Pruning task creation failed.",
)
logger.info(f"Pruning queued: cc_pair={cc_pair.id} id={payload_id}")
return StatusResponse(
success=True,
message="Successfully created the pruning task.",
@@ -505,15 +507,17 @@ def sync_cc_pair_groups(
f"credential_id={cc_pair.credential_id} "
f"{cc_pair.connector.name} connector."
)
tasks_created = try_creating_external_group_sync_task(
payload_id = try_creating_external_group_sync_task(
primary_app, cc_pair_id, r, CURRENT_TENANT_ID_CONTEXTVAR.get()
)
if not tasks_created:
if not payload_id:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail="External group sync task creation failed.",
)
logger.info(f"External group sync queued: cc_pair={cc_pair_id} id={payload_id}")
return StatusResponse(
success=True,
message="Successfully created the external group sync task.",