Compare commits

...

4 Commits

Author SHA1 Message Date
hagen-danswer
dd174ce9fa Made frontend conditional check for source (#3434) 2024-12-11 23:17:31 +00:00
Chris Weaver
d0b490126b Fix Confluence perm sync for cloud users (#3374) 2024-12-09 02:08:01 +00:00
rkuo-danswer
c216406f17 Merge pull request #3334 from danswer-ai/hotfix/v0.16-redis-thread-local
Merge hotfix/v0.16-redis-thread-local into release/v0.16
2024-12-03 23:00:18 -08:00
rkuo-danswer
55b9514fb9 disable thread local locking in callbacks (#3319) 2024-12-04 05:39:56 +00:00
5 changed files with 16 additions and 3 deletions

View File

@@ -39,7 +39,6 @@ from danswer.redis.redis_usergroup import RedisUserGroup
from danswer.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
logger = setup_logger()
celery_app = Celery(__name__)
@@ -117,9 +116,13 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
# it is planned to use this lock to enforce singleton behavior on the primary
# worker, since the primary worker does redis cleanup on startup, but this isn't
# implemented yet.
# set thread_local=False since we don't control what thread the periodic task might
# reacquire the lock with
lock: RedisLock = r.lock(
DanswerRedisLocks.PRIMARY_WORKER,
timeout=CELERY_PRIMARY_WORKER_LOCK_TIMEOUT,
thread_local=False,
)
logger.info("Primary worker lock: Acquire starting.")

View File

@@ -789,9 +789,12 @@ def connector_indexing_task(
)
break
# set thread_local=False since we don't control what thread the indexing/pruning
# might run our callback with
lock: RedisLock = r.lock(
redis_connector_index.generator_lock_key,
timeout=CELERY_INDEXING_LOCK_TIMEOUT,
thread_local=False,
)
acquired = lock.acquire(blocking=False)

View File

@@ -8,6 +8,7 @@ from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from danswer.background.celery.apps.app_base import task_logger
@@ -239,9 +240,12 @@ def connector_pruning_generator_task(
r = get_redis_client(tenant_id=tenant_id)
lock = r.lock(
# set thread_local=False since we don't control what thread the indexing/pruning
# might run our callback with
lock: RedisLock = r.lock(
DanswerRedisLocks.PRUNING_LOCK_PREFIX + f"_{redis_connector.id}",
timeout=CELERY_PRUNING_LOCK_TIMEOUT,
thread_local=False,
)
acquired = lock.acquire(blocking=False)

View File

@@ -368,4 +368,5 @@ def build_confluence_client(
backoff_and_retry=True,
max_backoff_retries=10,
max_backoff_seconds=60,
cloud=is_cloud,
)

View File

@@ -108,7 +108,9 @@ const GDriveMain = ({}: {}) => {
const googleDriveServiceAccountCredential:
| Credential<GoogleDriveServiceAccountCredentialJson>
| undefined = credentialsData.find(
(credential) => credential.credential_json?.google_service_account_key
(credential) =>
credential.credential_json?.google_service_account_key &&
credential.source === "google_drive"
);
const googleDriveConnectorIndexingStatuses: ConnectorIndexingStatus<