Compare commits

...

126 Commits

Author SHA1 Message Date
Richard Kuo (Danswer)
b22848be52 has delete_single working in optimized fashion 2024-10-02 16:32:36 -07:00
Richard Kuo (Danswer)
92c0923581 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/shared-httpx-2
# Conflicts:
#	backend/danswer/background/celery/celery_app.py
#	backend/danswer/background/celery/celery_utils.py
#	backend/danswer/background/celery/tasks/connector_deletion/tasks.py
#	backend/danswer/background/celery/tasks/pruning/tasks.py
#	backend/danswer/background/celery/tasks/vespa/tasks.py
#	backend/danswer/document_index/factory.py
#	backend/ee/danswer/background/celery/tasks/vespa/tasks.py
#	backend/ee/danswer/background/celery_utils.py
#	backend/supervisord.conf
2024-10-02 12:16:57 -07:00
Richard Kuo (Danswer)
a8cd42ccee timing info 2024-10-02 12:00:27 -07:00
Richard Kuo (Danswer)
5c0f6a678f httpx singleton ... allows reusing connections. should be more performant 2024-09-20 16:01:29 -07:00
Richard Kuo (Danswer)
236b1e078f Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/celery_multi 2024-09-20 13:59:30 -07:00
Richard Kuo (Danswer)
84e3fbd38d use name as primary check, some minor refactoring and type hinting too. 2024-09-20 11:24:15 -07:00
Richard Kuo (Danswer)
e0829a9211 supervisord needs the percent symbols escaped 2024-09-20 09:33:35 -07:00
Richard Kuo (Danswer)
12219b28a1 autoscale sqlalchemy pool size to celery concurrency (allow override later?) 2024-09-19 18:17:52 -07:00
Richard Kuo (Danswer)
6433d47982 merge main 2024-09-19 17:51:04 -07:00
Richard Kuo (Danswer)
6ebc6232f4 fix comments 2024-09-19 15:42:08 -07:00
Richard Kuo (Danswer)
4f7e5cb5fb add configurable sql alchemy engine settings on startup (needed for various intents like API server, different celery workers and tasks, etc) 2024-09-19 15:35:38 -07:00
Richard Kuo (Danswer)
bb4ee6b5ba name celery workers and shorten dev script prefixing 2024-09-18 18:10:12 -07:00
Richard Kuo (Danswer)
2f6b5f4f38 update supervisord with some recommended settings for celery 2024-09-18 16:52:18 -07:00
Richard Kuo (Danswer)
bffdf946be add multi workers to dev_run_background_jobs.py 2024-09-18 16:51:50 -07:00
Richard Kuo (Danswer)
3a767334d7 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/celery_multi
# Conflicts:
#	backend/danswer/background/celery/celery_app.py
#	backend/danswer/background/connector_deletion.py
#	backend/danswer/server/manage/administrative.py
#	backend/ee/danswer/background/celery_utils.py
#	backend/supervisord.conf
2024-09-18 12:14:07 -07:00
Richard Kuo (Danswer)
166b405b62 Merge branch 'feature/background_deletion' of https://github.com/danswer-ai/danswer into feature/celery_multi
# Conflicts:
#	backend/danswer/background/celery/celery_app.py
#	backend/danswer/background/celery/celery_utils.py
#	backend/danswer/server/manage/administrative.py
#	backend/supervisord.conf
2024-09-17 22:09:57 -07:00
Richard Kuo (Danswer)
6a8e4ef497 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_deletion 2024-09-17 15:25:13 -07:00
Richard Kuo (Danswer)
905f104064 move monitor_usergroup_taskset to ee, improve logging 2024-09-17 12:13:56 -07:00
Richard Kuo (Danswer)
e2f3363b08 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_deletion 2024-09-17 10:04:42 -07:00
Richard Kuo
c032861573 code review fixes 2024-09-17 01:16:32 -07:00
Richard Kuo (Danswer)
faaccc7c44 celery auto associates tasks created inside another task, which bloats the result metadata considerably. trail=False prevents this. 2024-09-16 22:47:52 -07:00
Richard Kuo (Danswer)
90cc75381d organize tasks into separate files 2024-09-16 16:26:36 -07:00
Richard Kuo (Danswer)
76b0996cba Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/celery_multi 2024-09-16 11:08:19 -07:00
Richard Kuo (Danswer)
9ac8a10c5a add some preliminary locking 2024-09-16 11:07:53 -07:00
Richard Kuo
a4d36b9e77 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_deletion 2024-09-15 12:06:58 -07:00
Richard Kuo
fc3af78bf2 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_deletion 2024-09-13 22:40:04 -07:00
Richard Kuo
aca5204768 add db refresh to connector deletion 2024-09-13 22:39:25 -07:00
Richard Kuo
b5528e3eb0 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_deletion
# Conflicts:
#	backend/danswer/background/celery/celery_app.py
2024-09-13 20:00:53 -07:00
Richard Kuo (Danswer)
9b99c14c56 update logs as well and set prefetch multipliers appropriate to the worker intent 2024-09-13 13:59:40 -07:00
Richard Kuo (Danswer)
20ba19b857 multiple celery workers 2024-09-13 13:27:23 -07:00
Richard Kuo (Danswer)
5f85632efb Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_deletion
# Conflicts:
#	backend/danswer/background/celery/celery_app.py

also move session management into individual monitor functions
2024-09-12 17:48:35 -07:00
Richard Kuo (Danswer)
026bca49b5 merge 2024-09-10 13:35:47 -07:00
Richard Kuo (Danswer)
be613716e9 add vespa test 2024-09-10 09:12:32 -07:00
Richard Kuo (Danswer)
092e190a59 mypy fixes 2024-09-09 16:43:46 -07:00
Richard Kuo (Danswer)
30f6463d6c refactor to use update_single 2024-09-09 16:15:29 -07:00
Richard Kuo (Danswer)
8aef77ac20 fix imports 2024-09-09 13:54:46 -07:00
Richard Kuo (Danswer)
ba58e7dfab Merge branch 'feature/background_processing' of https://github.com/danswer-ai/danswer into feature/background_deletion 2024-09-09 13:53:50 -07:00
Richard Kuo (Danswer)
fe9bb067df Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_processing
rebase alembic migration
2024-09-09 13:53:11 -07:00
Richard Kuo (Danswer)
ca6858b5a9 Merge branch 'feature/background_processing' of https://github.com/danswer-ai/danswer into feature/background_deletion
# Conflicts:
#	backend/danswer/background/celery/celery_app.py
#	backend/danswer/background/celery/celery_redis.py
2024-09-09 09:07:34 -07:00
Richard Kuo (Danswer)
ab7769213e Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_processing 2024-09-09 09:01:39 -07:00
Richard Kuo
84ec5f4fe2 remove commented warning ... just not needed 2024-09-07 01:24:01 -07:00
Richard Kuo (Danswer)
138866aa5f missed a file 2024-09-07 00:18:58 -07:00
Richard Kuo (Danswer)
69983ab315 address code review 2024-09-06 23:22:42 -07:00
Richard Kuo (Danswer)
89619cef41 adding clarifying comments 2024-09-06 15:06:07 -07:00
Richard Kuo (Danswer)
f5cbb6dc61 Merge branch 'feature/background_processing' of https://github.com/danswer-ai/danswer into feature/background_deletion 2024-09-06 14:39:48 -07:00
Richard Kuo (Danswer)
29860f85a4 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_processing
# Conflicts:
#	backend/danswer/background/celery/celery_app.py
2024-09-06 14:21:13 -07:00
Richard Kuo (Danswer)
7512d68a33 Merge branch 'feature/background_processing' of https://github.com/danswer-ai/danswer into feature/background_deletion
# Conflicts:
#	backend/danswer/background/celery/celery_app.py
2024-09-06 12:10:43 -07:00
Richard Kuo (Danswer)
2af4926cbe Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer into feature/background_processing 2024-09-06 10:55:27 -07:00
Richard Kuo (Danswer)
3afa07acdf Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/redis 2024-09-06 10:19:03 -07:00
Richard Kuo (Danswer)
3323d4c715 rename helm testing workflow to disable it 2024-09-06 10:05:16 -07:00
Richard Kuo (Danswer)
f02a2316b5 add comment to helm chart testing workflow 2024-09-06 10:04:34 -07:00
Richard Kuo (Danswer)
3574c23543 fix redis password reference 2024-09-05 21:37:08 -07:00
Richard Kuo (Danswer)
97d36717e2 try amd64 runner 2024-09-05 20:38:53 -07:00
Richard Kuo (Danswer)
b4274395b1 increase timeout 2024-09-05 20:16:43 -07:00
Richard Kuo (Danswer)
ea35879482 add postgresql repo 2024-09-05 18:35:30 -07:00
Richard Kuo (Danswer)
bc5cb2d95f add vespa repo 2024-09-05 18:26:06 -07:00
Richard Kuo (Danswer)
8cf40988e2 some issues suggest using --config works 2024-09-05 18:18:28 -07:00
Richard Kuo (Danswer)
5d6fe9295d update helm testing 2024-09-05 18:03:26 -07:00
Richard Kuo (Danswer)
af34d19131 move files and lint them 2024-09-05 17:54:38 -07:00
Richard Kuo (Danswer)
f90cfa7cd5 bypass testing only on change for now 2024-09-05 16:24:39 -07:00
Richard Kuo (Danswer)
b8d9205682 try setting ct working directory 2024-09-05 16:15:01 -07:00
Richard Kuo (Danswer)
d6c3bd34c0 edit values.yaml 2024-09-05 15:39:36 -07:00
Richard Kuo (Danswer)
c20d92108c fetch-depth 0 2024-09-05 15:31:50 -07:00
Richard Kuo (Danswer)
09889478d5 fix command line option to --chart-dirs 2024-09-05 15:26:54 -07:00
Richard Kuo (Danswer)
82bd7f4046 hopefully this release version actually exists 2024-09-05 15:17:49 -07:00
Richard Kuo (Danswer)
0b293ddb83 fix indent 2024-09-05 15:12:11 -07:00
Richard Kuo (Danswer)
0bb2e57f5c try setting up pr testing for helm 2024-09-05 15:09:17 -07:00
Richard Kuo (Danswer)
bd079080bd add redis password to various deployments 2024-09-05 14:37:56 -07:00
Richard Kuo (Danswer)
dafbe136f1 renaming cache to cache_volume 2024-09-05 14:16:55 -07:00
Richard Kuo (Danswer)
daf46b342b backporting fixes from background_deletion 2024-09-05 14:09:48 -07:00
Richard Kuo (Danswer)
f363c6b8fd Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer into feature/background_processing 2024-09-05 13:44:07 -07:00
Richard Kuo (Danswer)
ab097cebcf Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/redis 2024-09-05 13:42:53 -07:00
Richard Kuo (Danswer)
98ec0b791d update contributing guide 2024-09-05 11:43:09 -07:00
Richard Kuo (Danswer)
590bd0fd38 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/redis 2024-09-05 11:41:38 -07:00
Richard Kuo (Danswer)
71334508a1 actually working connector deletion 2024-09-05 11:32:35 -07:00
Richard Kuo (Danswer)
957bd55a5e Merge branch 'feature/background_processing' of https://github.com/danswer-ai/danswer into feature/background_deletion 2024-09-04 13:40:17 -07:00
Richard Kuo (Danswer)
4c5c4a21fc add back writing to vespa on indexing 2024-09-04 12:31:00 -07:00
Richard Kuo (Danswer)
6335f5b1c0 allow no task syncs to run because we create certain objects with no entries but initially marked as out of date 2024-09-04 12:26:58 -07:00
Richard Kuo (Danswer)
d6d3d24157 harden indexing-status endpoint against db changes happening in the background. Needs further improvement but OK for now. 2024-09-04 12:25:07 -07:00
Richard Kuo (Danswer)
a56e3d7825 Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer into feature/background_processing
# Conflicts:
#	backend/danswer/indexing/indexing_pipeline.py
2024-09-04 09:13:45 -07:00
Richard Kuo (Danswer)
69d762e946 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/redis 2024-09-03 22:24:28 -07:00
Richard Kuo (Danswer)
6724a13de3 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/redis 2024-09-03 20:42:06 -07:00
Richard Kuo
1d4b8cf6c6 in flight 2024-09-03 00:54:48 -07:00
Richard Kuo
f358acfb09 update the rest of the docker files 2024-09-02 20:59:29 -07:00
Richard Kuo
aa1eec8d46 update REDIS_HOST env var in docker-compose.dev.yml 2024-09-02 20:19:33 -07:00
Richard Kuo
076ed181ca add redis_host environment override 2024-09-02 19:45:40 -07:00
Richard Kuo
e53ff2555f Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer into feature/background_processing 2024-09-02 16:13:05 -07:00
Richard Kuo
f0bdb39704 mypy 2024-09-02 16:10:55 -07:00
Richard Kuo
a9aca68aa8 Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer into feature/background_processing
# Conflicts:
#	backend/danswer/background/celery/celery_app.py
2024-09-02 15:58:51 -07:00
Richard Kuo
c388a815fc kombu cleanup - fail silently 2024-09-02 15:56:55 -07:00
Richard Kuo
e731a335bb Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/redis 2024-09-02 15:51:40 -07:00
Richard Kuo
bdcd553bea rebase alembic migration 2024-09-02 14:22:10 -07:00
Richard Kuo
015fab9dc7 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_processing 2024-09-02 14:22:01 -07:00
Richard Kuo
575db62c7c Merge branch 'feature/redis' of https://github.com/danswer-ai/danswer 2024-09-02 13:37:39 -07:00
Richard Kuo
9dbd29dab0 fix startup dependencies on redis 2024-09-02 13:30:53 -07:00
Richard Kuo (Danswer)
0652698633 switch to monotonic time 2024-09-02 10:52:00 -07:00
Richard Kuo (Danswer)
838a7fbfe7 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_processing 2024-09-02 10:27:16 -07:00
Richard Kuo (Danswer)
d17f49f6b7 mypy 2024-08-31 22:07:55 -07:00
Richard Kuo (Danswer)
c0a7c7cda4 fix fence ordering, rename to "monitor", fix fetch_versioned_implementation call 2024-08-31 00:49:37 -07:00
Richard Kuo (Danswer)
c174d00b21 update alembic migration 2024-08-30 14:03:53 -07:00
Richard Kuo (Danswer)
db252a6913 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_processing
# Conflicts:
#	backend/danswer/db/document_set.py
#	backend/ee/danswer/user_groups/sync.py
2024-08-30 13:58:02 -07:00
Richard Kuo (Danswer)
27eae3d9d6 mypy fixes 2024-08-30 12:16:25 -07:00
Richard Kuo (Danswer)
eea178fbd0 change vespa index log line to debug 2024-08-30 11:17:02 -07:00
Richard Kuo (Danswer)
21446175b4 the big one. adds queues and tasks, updates functions to use the queues with priorities, etc 2024-08-30 11:16:09 -07:00
Richard Kuo (Danswer)
1cfbfe173c use celeryconfig.py 2024-08-30 10:40:10 -07:00
Richard Kuo (Danswer)
b4b7801e71 fix task naming convention 2024-08-30 10:33:21 -07:00
Richard Kuo (Danswer)
48963ed32b add last_modified and last_synced to documents 2024-08-30 10:30:18 -07:00
Richard Kuo (Danswer)
ab9cf9a7b0 kombu warning getting spammy since celery is not self managing its queue in Postgres any more 2024-08-30 09:18:09 -07:00
Richard Kuo (Danswer)
e36fd3cc04 Add celery redis helper. used in a subsequent PR 2024-08-30 09:16:10 -07:00
Richard Kuo (Danswer)
f06efb96db add celeryconfig.py to simplify configuration. Will be used in a subsequent commit 2024-08-30 09:14:46 -07:00
Richard Kuo (Danswer)
66fb6b10e6 use task_logger in various celery tasks 2024-08-30 09:06:46 -07:00
Richard Kuo (Danswer)
5f7ca157e1 Add get_document function 2024-08-30 08:59:35 -07:00
Richard Kuo (Danswer)
186d4f146d add a global redis pool 2024-08-29 20:11:48 -07:00
Richard Kuo (Danswer)
024d4dd3dc fix type hinting 2024-08-29 20:10:03 -07:00
Richard Kuo (Danswer)
6d05e2910c fix double function declaration and typing 2024-08-29 20:08:58 -07:00
Richard Kuo (Danswer)
8d09dcf072 docstrings! 2024-08-29 20:08:04 -07:00
Richard Kuo (Danswer)
93fb8ef17d fix _get_access_for_document 2024-08-29 20:07:42 -07:00
Richard Kuo (Danswer)
d8562973a7 add constants 2024-08-29 20:04:51 -07:00
Richard Kuo (Danswer)
34a8bde7fa fix returning tuple fields 2024-08-29 20:04:15 -07:00
Richard Kuo (Danswer)
5100065f49 typo fix 2024-08-29 20:02:29 -07:00
Richard Kuo (Danswer)
d4c06b344b multiline commands for readability, add vespa_metadata_sync queue to worker 2024-08-29 20:01:52 -07:00
Richard Kuo (Danswer)
25f4b087dd ignore kombu tables in alembic migrations (used by celery) 2024-08-28 17:17:06 -07:00
Richard Kuo (Danswer)
7244baed90 Merge from feature/redis 2024-08-23 12:12:18 -07:00
Richard Kuo (Danswer)
09e95fe293 some new helper functions for the db 2024-08-23 12:06:35 -07:00
Richard Kuo (Danswer)
74429577e0 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/redis 2024-08-23 10:59:15 -07:00
Richard Kuo (Danswer)
10330fb34b first cut at redis 2024-08-23 10:33:55 -07:00
11 changed files with 372 additions and 53 deletions

View File

@@ -3,6 +3,7 @@ import time
from datetime import timedelta
from typing import Any
import httpx
import redis
from celery import bootsteps # type: ignore
from celery import Celery
@@ -30,6 +31,7 @@ from danswer.configs.constants import POSTGRES_CELERY_WORKER_HEAVY_APP_NAME
from danswer.configs.constants import POSTGRES_CELERY_WORKER_LIGHT_APP_NAME
from danswer.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME
from danswer.db.engine import SqlEngine
from danswer.httpx.httpx_pool import HttpxPool
from danswer.redis.redis_pool import get_redis_client
from danswer.utils.logger import ColoredFormatter
from danswer.utils.logger import PlainFormatter
@@ -113,12 +115,16 @@ def on_beat_init(sender: Any, **kwargs: Any) -> None:
@worker_init.connect
def on_worker_init(sender: Any, **kwargs: Any) -> None:
EXTRA_CONCURRENCY = 8 # a few extra connections for side operations
# decide some initial startup settings based on the celery worker's hostname
# (set at the command line)
hostname = sender.hostname
if hostname.startswith("light"):
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8)
SqlEngine.init_engine(
pool_size=sender.concurrency, max_overflow=EXTRA_CONCURRENCY
)
elif hostname.startswith("heavy"):
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
SqlEngine.init_engine(pool_size=8, max_overflow=0)
@@ -126,6 +132,12 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME)
SqlEngine.init_engine(pool_size=8, max_overflow=0)
HttpxPool.init_client(
limits=httpx.Limits(
max_keepalive_connections=sender.concurrency + EXTRA_CONCURRENCY
)
)
r = get_redis_client()
WAIT_INTERVAL = 5
@@ -212,6 +224,86 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
sender.primary_worker_lock = lock
WAIT_INTERVAL = 5
WAIT_LIMIT = 60
time_start = time.monotonic()
logger.info("Redis: Readiness check starting.")
while True:
try:
if r.ping():
break
except Exception:
pass
time_elapsed = time.monotonic() - time_start
logger.info(
f"Redis: Ping failed. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
)
if time_elapsed > WAIT_LIMIT:
msg = (
f"Redis: Readiness check did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)
time.sleep(WAIT_INTERVAL)
logger.info("Redis: Readiness check succeeded. Continuing...")
if not celery_is_worker_primary(sender):
logger.info("Running as a secondary celery worker.")
logger.info("Waiting for primary worker to be ready...")
time_start = time.monotonic()
while True:
if r.exists(DanswerRedisLocks.PRIMARY_WORKER):
break
time.monotonic()
time_elapsed = time.monotonic() - time_start
logger.info(
f"Primary worker is not ready yet. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
)
if time_elapsed > WAIT_LIMIT:
msg = (
f"Primary worker was not ready within the timeout. "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)
time.sleep(WAIT_INTERVAL)
logger.info("Wait for primary worker completed successfully. Continuing...")
return
logger.info("Running as the primary celery worker.")
# This is singleton work that should be done on startup exactly once
# by the primary worker
r = get_redis_client()
# For the moment, we're assuming that we are the only primary worker
# that should be running.
# TODO: maybe check for or clean up another zombie primary worker if we detect it
r.delete(DanswerRedisLocks.PRIMARY_WORKER)
lock = r.lock(
DanswerRedisLocks.PRIMARY_WORKER,
timeout=CELERY_PRIMARY_WORKER_LOCK_TIMEOUT,
)
logger.info("Primary worker lock: Acquire starting.")
acquired = lock.acquire(blocking_timeout=CELERY_PRIMARY_WORKER_LOCK_TIMEOUT / 2)
if acquired:
logger.info("Primary worker lock: Acquire succeeded.")
else:
logger.error("Primary worker lock: Acquire failed!")
raise WorkerShutdown("Primary worker lock could not be acquired!")
sender.primary_worker_lock = lock
r.delete(DanswerRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK)
r.delete(DanswerRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)

View File

@@ -17,6 +17,7 @@ from danswer.db.document import get_documents_for_connector_credential_pair
from danswer.db.engine import get_sqlalchemy_engine
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.httpx.httpx_pool import HttpxPool
# use this within celery tasks to get celery task specific logging
@@ -95,7 +96,9 @@ def prune_documents_task(connector_id: int, credential_id: int) -> None:
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
document_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
primary_index_name=curr_ind_name,
secondary_index_name=sec_ind_name,
httpx_client=HttpxPool.get(),
)
if len(doc_ids_to_remove) == 0:

View File

@@ -41,6 +41,7 @@ from danswer.db.models import UserGroup
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.document_index.interfaces import UpdateRequest
from danswer.httpx.httpx_pool import HttpxPool
from danswer.redis.redis_pool import get_redis_client
from danswer.utils.variable_functionality import fetch_versioned_implementation
from danswer.utils.variable_functionality import (
@@ -484,7 +485,9 @@ def vespa_metadata_sync_task(self: Task, document_id: str) -> bool:
with Session(get_sqlalchemy_engine()) as db_session:
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
document_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
primary_index_name=curr_ind_name,
secondary_index_name=sec_ind_name,
httpx_client=HttpxPool.get(),
)
doc = get_document(document_id, db_session)

View File

@@ -10,6 +10,8 @@ are multiple connector / credential pairs that have indexed it
connector / credential pair from the access list
(6) delete all relevant entries from postgres
"""
import time
from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
@@ -137,6 +139,8 @@ def document_by_cc_pair_cleanup_task(
) -> bool:
task_logger.info(f"document_id={document_id}")
timing = {}
timing["start"] = time.monotonic()
try:
with Session(get_sqlalchemy_engine()) as db_session:
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
@@ -148,11 +152,15 @@ def document_by_cc_pair_cleanup_task(
if count == 1:
# count == 1 means this is the only remaining cc_pair reference to the doc
# delete it from vespa and the db
timing["db_read"] = time.monotonic()
document_index.delete(doc_ids=[document_id])
# document_index.delete_single(doc_id=document_id)
timing["indexed"] = time.monotonic()
delete_documents_complete__no_commit(
db_session=db_session,
document_ids=[document_id],
)
time.monotonic()
elif count > 1:
# count > 1 means the document still has cc_pair references
doc = get_document(document_id, db_session)
@@ -176,9 +184,13 @@ def document_by_cc_pair_cleanup_task(
hidden=doc.hidden,
)
timing["db_read"] = time.monotonic()
# update Vespa. OK if doc doesn't exist. Raises exception otherwise.
document_index.update_single(update_request=update_request)
timing["indexed"] = time.monotonic()
# there are still other cc_pair references to the doc, so just resync to Vespa
delete_document_by_connector_credential_pair__no_commit(
db_session=db_session,
@@ -191,7 +203,8 @@ def document_by_cc_pair_cleanup_task(
mark_document_as_synced(document_id, db_session)
else:
pass
timing["db_read"] = time.monotonic()
timing["indexed"] = time.monotonic()
# update_docs_last_modified__no_commit(
# db_session=db_session,
@@ -208,4 +221,13 @@ def document_by_cc_pair_cleanup_task(
countdown = 2 ** (self.request.retries + 4)
self.retry(exc=e, countdown=countdown)
timing["end"] = time.monotonic()
db_read_s = timing["db_read"] - timing["start"]
index_s = timing["indexed"] - timing["db_read"]
db_write_s = timing["end"] - timing["indexed"]
all_s = timing["end"] - timing["start"]
task_logger.info(
f"db_read={db_read_s:.2f} index={index_s:.2f} db_write={db_write_s:.2f} all={all_s:.2f}"
)
return True

View File

@@ -239,7 +239,7 @@ CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD = int(
# Attachments with more chars than this will not be indexed. This is to prevent extremely
# large files from freezing indexing. 200,000 is ~100 google doc pages.
CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD = int(
os.environ.get("CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD", 200_000)
os.environ.get("CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD", 200_000_000)
)
JIRA_CONNECTOR_LABELS_TO_SKIP = [

View File

@@ -1,3 +1,4 @@
import httpx
from sqlalchemy.orm import Session
from danswer.db.search_settings import get_current_search_settings
@@ -8,13 +9,16 @@ from danswer.document_index.vespa.index import VespaIndex
def get_default_document_index(
primary_index_name: str,
secondary_index_name: str | None,
httpx_client: httpx.Client | None = None,
) -> DocumentIndex:
"""Primary index is the index that is used for querying/updating etc.
Secondary index is for when both the currently used index and the upcoming
index both need to be updated, updates are applied to both indices"""
# Currently only supporting Vespa
return VespaIndex(
index_name=primary_index_name, secondary_index_name=secondary_index_name
index_name=primary_index_name,
secondary_index_name=secondary_index_name,
httpx_client=httpx_client,
)

View File

@@ -166,6 +166,16 @@ class Deletable(abc.ABC):
"""
raise NotImplementedError
@abc.abstractmethod
def delete_single(self, doc_id: str) -> None:
"""
Given a single document ids, hard delete it from the document index
Parameters:
- doc_id: document id as specified by the connector
"""
raise NotImplementedError
class Updatable(abc.ABC):
"""

View File

@@ -7,6 +7,7 @@ from datetime import timezone
from typing import Any
from typing import cast
import httpx
import requests
from retry import retry
@@ -149,6 +150,7 @@ def _get_chunks_via_visit_api(
chunk_request: VespaChunkRequest,
index_name: str,
filters: IndexFilters,
http_client: httpx.Client,
field_names: list[str] | None = None,
get_large_chunks: bool = False,
) -> list[dict]:
@@ -181,21 +183,22 @@ def _get_chunks_via_visit_api(
selection += f" and {index_name}.large_chunk_reference_ids == null"
# Setting up the selection criteria in the query parameters
params = {
# NOTE: Document Selector Language doesn't allow `contains`, so we can't check
# for the ACL in the selection. Instead, we have to check as a postfilter
"selection": selection,
"continuation": None,
"wantedDocumentCount": 1_000,
"fieldSet": field_set,
}
params = httpx.QueryParams(
{
# NOTE: Document Selector Language doesn't allow `contains`, so we can't check
# for the ACL in the selection. Instead, we have to check as a postfilter
"selection": selection,
"wantedDocumentCount": 1_000,
"fieldSet": field_set,
}
)
document_chunks: list[dict] = []
while True:
response = requests.get(url, params=params)
response = http_client.get(url, params=params)
try:
response.raise_for_status()
except requests.HTTPError as e:
except httpx.HTTPStatusError as e:
request_info = f"Headers: {response.request.headers}\nPayload: {params}"
response_info = f"Status Code: {response.status_code}\nResponse Content: {response.text}"
error_base = f"Error occurred getting chunk by Document ID {chunk_request.document_id}"
@@ -205,7 +208,9 @@ def _get_chunks_via_visit_api(
f"{response_info}\n"
f"Exception: {e}"
)
raise requests.HTTPError(error_base) from e
raise httpx.HTTPStatusError(
error_base, request=e.request, response=e.response
) from e
# Check if the response contains any documents
response_data = response.json()
@@ -221,17 +226,21 @@ def _get_chunks_via_visit_api(
document_chunks.append(document)
# Check for continuation token to handle pagination
if "continuation" in response_data and response_data["continuation"]:
params["continuation"] = response_data["continuation"]
else:
if "continuation" not in response_data:
break # Exit loop if no continuation token
if not response_data["continuation"]:
break # Exit loop if continuation token is empty
params = params.set("continuation", response_data["continuation"])
return document_chunks
def get_all_vespa_ids_for_document_id(
document_id: str,
index_name: str,
http_client: httpx.Client,
filters: IndexFilters | None = None,
get_large_chunks: bool = False,
) -> list[str]:
@@ -239,6 +248,7 @@ def get_all_vespa_ids_for_document_id(
chunk_request=VespaChunkRequest(document_id=document_id),
index_name=index_name,
filters=filters or IndexFilters(access_control_list=None),
http_client=http_client,
field_names=[DOCUMENT_ID],
get_large_chunks=get_large_chunks,
)

View File

@@ -1,4 +1,5 @@
import concurrent.futures
import time
import httpx
from retry import retry
@@ -20,12 +21,17 @@ CONTENT_SUMMARY = "content_summary"
def _delete_vespa_doc_chunks(
document_id: str, index_name: str, http_client: httpx.Client
) -> None:
t = {}
t["start"] = time.monotonic()
doc_chunk_ids = get_all_vespa_ids_for_document_id(
document_id=document_id,
index_name=index_name,
http_client=http_client,
get_large_chunks=True,
)
t["chunks_fetched"] = time.monotonic()
for chunk_id in doc_chunk_ids:
try:
res = http_client.delete(
@@ -36,6 +42,19 @@ def _delete_vespa_doc_chunks(
logger.error(f"Failed to delete chunk, details: {e.response.text}")
raise
t["end"] = time.monotonic()
t_chunk_fetch = t["chunks_fetched"] - t["start"]
t_delete = t["end"] - t["chunks_fetched"]
t_all = t["end"] - t["start"]
logger.info(
f"_delete_vespa_doc_chunks: "
f"len={len(doc_chunk_ids)} "
f"chunk_fetch={t_chunk_fetch:.2f} "
f"delete={t_delete:.2f} "
f"all={t_all:.2f}"
)
def delete_vespa_docs(
document_ids: list[str],

View File

@@ -13,6 +13,7 @@ from typing import cast
import httpx
import requests
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.configs.chat_configs import DOC_TIME_DECAY
from danswer.configs.chat_configs import NUM_RETURNED_HITS
from danswer.configs.chat_configs import TITLE_CONTENT_RATIO
@@ -110,9 +111,15 @@ def add_ngrams_to_schema(schema_content: str) -> str:
class VespaIndex(DocumentIndex):
def __init__(self, index_name: str, secondary_index_name: str | None) -> None:
def __init__(
self,
index_name: str,
secondary_index_name: str | None,
httpx_client: httpx.Client | None = None,
) -> None:
self.index_name = index_name
self.secondary_index_name = secondary_index_name
self.httpx_client = httpx_client or httpx.Client(http2=True)
def ensure_indices_exist(
self,
@@ -204,8 +211,12 @@ class VespaIndex(DocumentIndex):
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
httpx.Client(http2=True) as http_client,
httpx.Client(http2=True) as http_temp_client,
):
httpx_client = self.httpx_client
if not httpx_client:
httpx_client = http_temp_client
# Check for existing documents, existing documents need to have all of their chunks deleted
# prior to indexing as the document size (num chunks) may have shrunk
first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
@@ -214,7 +225,7 @@ class VespaIndex(DocumentIndex):
get_existing_documents_from_chunks(
chunks=chunk_batch,
index_name=self.index_name,
http_client=http_client,
http_client=httpx_client,
executor=executor,
)
)
@@ -223,7 +234,7 @@ class VespaIndex(DocumentIndex):
delete_vespa_docs(
document_ids=doc_id_batch,
index_name=self.index_name,
http_client=http_client,
http_client=httpx_client,
executor=executor,
)
@@ -231,7 +242,7 @@ class VespaIndex(DocumentIndex):
batch_index_vespa_chunks(
chunks=chunk_batch,
index_name=self.index_name,
http_client=http_client,
http_client=httpx_client,
executor=executor,
)
@@ -248,6 +259,7 @@ class VespaIndex(DocumentIndex):
@staticmethod
def _apply_updates_batched(
updates: list[_VespaUpdateRequest],
http_client: httpx.Client,
batch_size: int = BATCH_SIZE,
) -> None:
"""Runs a batch of updates in parallel via the ThreadPoolExecutor."""
@@ -266,10 +278,7 @@ class VespaIndex(DocumentIndex):
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficient for
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
httpx.Client(http2=True) as http_client,
):
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
for update_batch in batch_generator(updates, batch_size):
future_to_document_id = {
executor.submit(
@@ -309,12 +318,20 @@ class VespaIndex(DocumentIndex):
index_names.append(self.secondary_index_name)
chunk_id_start_time = time.monotonic()
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
httpx.Client(http2=True) as http_temp_client,
):
httpx_client = self.httpx_client
if not httpx_client:
httpx_client = http_temp_client
future_to_doc_chunk_ids = {
executor.submit(
get_all_vespa_ids_for_document_id,
document_id=document_id,
index_name=index_name,
http_client=httpx_client,
filters=None,
get_large_chunks=True,
): (document_id, index_name)
@@ -370,8 +387,15 @@ class VespaIndex(DocumentIndex):
update_request=update_dict,
)
)
with httpx.Client(http2=True) as http_temp_client:
httpx_client = self.httpx_client
if not httpx_client:
httpx_client = http_temp_client
self._apply_updates_batched(
processed_updates_requests, http_client=httpx_client
)
self._apply_updates_batched(processed_updates_requests)
logger.debug(
"Finished updating Vespa documents in %.2f seconds",
time.monotonic() - update_start,
@@ -382,6 +406,9 @@ class VespaIndex(DocumentIndex):
function will complete with no errors or exceptions.
Handle other exceptions if you wish to implement retry behavior
"""
timing = {}
timing["start"] = time.monotonic()
if len(update_request.document_ids) != 1:
raise ValueError("update_request must contain a single document id")
@@ -399,21 +426,26 @@ class VespaIndex(DocumentIndex):
if self.secondary_index_name:
index_names.append(self.secondary_index_name)
chunk_id_start_time = time.monotonic()
all_doc_chunk_ids: list[str] = []
for index_name in index_names:
for document_id in update_request.document_ids:
# this calls vespa and can raise http exceptions
doc_chunk_ids = get_all_vespa_ids_for_document_id(
document_id=document_id,
index_name=index_name,
filters=None,
get_large_chunks=True,
)
all_doc_chunk_ids.extend(doc_chunk_ids)
logger.debug(
f"Took {time.monotonic() - chunk_id_start_time:.2f} seconds to fetch all Vespa chunk IDs"
)
timing["chunk_fetch_start"] = time.monotonic()
with httpx.Client(http2=True) as http_temp_client:
httpx_client = self.httpx_client
if not httpx_client:
httpx_client = http_temp_client
all_doc_chunk_ids: list[str] = []
for index_name in index_names:
for document_id in update_request.document_ids:
# this calls vespa and can raise http exceptions
doc_chunk_ids = get_all_vespa_ids_for_document_id(
document_id=document_id,
index_name=index_name,
http_client=httpx_client,
filters=None,
get_large_chunks=True,
)
all_doc_chunk_ids.extend(doc_chunk_ids)
timing["chunk_fetch_end"] = time.monotonic()
# Build the _VespaUpdateRequest objects
update_dict: dict[str, dict] = {"fields": {}}
@@ -447,38 +479,120 @@ class VespaIndex(DocumentIndex):
)
)
with httpx.Client(http2=True) as http_client:
with httpx.Client(http2=True) as http_temp_client:
httpx_client = self.httpx_client
if not httpx_client:
httpx_client = http_temp_client
for update in processed_update_requests:
http_client.put(
httpx_client.put(
update.url,
headers={"Content-Type": "application/json"},
json=update.update_request,
)
timing["end"] = time.monotonic()
# logger.debug(
# "Finished updating Vespa documents in %.2f seconds",
# time.monotonic() - update_start,
# )
t_setup = timing["chunk_fetch_start"] - timing["start"]
t_chunk_fetch = timing["chunk_fetch_end"] - timing["chunk_fetch_start"]
t_update = timing["chunk_fetch_end"] - timing["chunk_fetch_start"]
t_all = timing["end"] - timing["start"]
logger.info(
f"VespaIndex.update_single: setup={t_setup:.2f}"
f"chunk_fetch={t_chunk_fetch:.2f} "
f"update={t_update:.2f} "
f"all={t_all:.2f}"
)
return
def delete(self, doc_ids: list[str]) -> None:
    """Delete all chunks of the given documents from every active index.

    Args:
        doc_ids: Document ids to delete; each id is sanitized before use.

    Deletes from the primary index and, if configured, the secondary
    index. Raises whatever `delete_vespa_docs` raises on HTTP failure.
    """
    logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
    time_start = time.monotonic()

    doc_ids = [replace_invalid_doc_id_characters(doc_id) for doc_id in doc_ids]

    index_names = [self.index_name]
    if self.secondary_index_name:
        index_names.append(self.secondary_index_name)

    # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is
    # beneficial for indexing / updates / deletes since we have to make a
    # large volume of requests.
    if self.httpx_client:
        # Reuse the shared long-lived client (keeps HTTP/2 connections warm).
        for index_name in index_names:
            delete_vespa_docs(
                document_ids=doc_ids,
                index_name=index_name,
                http_client=self.httpx_client,
            )
    else:
        # No shared client injected: create a temporary one and close it
        # when done. (Previously a temp client was constructed even when
        # the shared client existed, wasting a connection setup.)
        with httpx.Client(http2=True) as temp_client:
            for index_name in index_names:
                delete_vespa_docs(
                    document_ids=doc_ids,
                    index_name=index_name,
                    http_client=temp_client,
                )

    t_all = time.monotonic() - time_start
    logger.info(f"VespaIndex.delete: all={t_all:.2f}")
def delete_single(self, doc_id: str) -> None:
    """Delete all chunks of a single document using Vespa's
    selection-based batch delete endpoint.

    Vespa deletion is poorly documented ... luckily we found this:
    https://docs.vespa.ai/en/operations/batch-delete.html#example

    Args:
        doc_id: The document id whose chunks should be removed; it is
            sanitized before use.

    Raises:
        httpx.HTTPStatusError: If any delete request fails.
    """
    time_start = time.monotonic()

    doc_id = replace_invalid_doc_id_characters(doc_id)

    index_names = [self.index_name]
    if self.secondary_index_name:
        index_names.append(self.secondary_index_name)

    # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is
    # beneficial for indexing / updates / deletes since we have to make a
    # large volume of requests. Prefer the shared long-lived client; fall
    # back to a temporary one so this method also works when no client was
    # injected (matches the fallback pattern used by the other methods).
    if self.httpx_client:
        for index_name in index_names:
            self._delete_single_in_index(doc_id, index_name, self.httpx_client)
    else:
        with httpx.Client(http2=True) as temp_client:
            for index_name in index_names:
                self._delete_single_in_index(doc_id, index_name, temp_client)

    t_all = time.monotonic() - time_start
    logger.info(f"VespaIndex.delete_single: all={t_all:.2f}")

@staticmethod
def _delete_single_in_index(
    doc_id: str, index_name: str, http_client: httpx.Client
) -> None:
    """Issue selection-based deletes for one document in one index,
    following Vespa's continuation token until all chunks are gone."""
    base_params: dict[str, str] = {
        "selection": f"{index_name}.document_id=='{doc_id}'",
        "cluster": DOCUMENT_INDEX_NAME,
    }

    continuation: str | None = None
    while True:
        params = dict(base_params)
        # BUGFIX: the continuation token must be echoed back on the next
        # request, otherwise the same page is requested repeatedly.
        if continuation:
            params["continuation"] = continuation

        try:
            resp = http_client.delete(
                f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}",
                params=httpx.QueryParams(params),
            )
            resp.raise_for_status()
        except httpx.HTTPStatusError as e:
            logger.error(f"Failed to delete chunk, details: {e.response.text}")
            raise

        resp_data = resp.json()

        if "documentCount" in resp_data:
            count = resp_data["documentCount"]
            logger.info(f"VespaIndex.delete_single: chunks_deleted={count}")

        # An absent or empty continuation token means pagination is done.
        continuation = resp_data.get("continuation")
        if not continuation:
            break
def id_based_retrieval(
self,
chunk_requests: list[VespaChunkRequest],

View File

@@ -0,0 +1,42 @@
import threading
from typing import Any
import httpx
class HttpxPool:
    """Process-wide holder for a single shared httpx.Client.

    Reusing one client lets httpx keep connections (including HTTP/2
    streams) alive across calls instead of paying connection setup per
    request. Access is guarded by a class-level lock so lazy creation is
    safe across threads.
    """

    _client: httpx.Client | None = None
    _lock: threading.Lock = threading.Lock()

    # Settings used when the client is created without caller overrides.
    DEFAULT_KWARGS = {
        "http2": True,
        "limits": httpx.Limits(),
    }

    def __init__(self) -> None:
        pass

    @classmethod
    def _init_client(cls, **kwargs: Any) -> httpx.Client:
        """Build a client from the defaults merged with caller overrides."""
        return httpx.Client(**{**cls.DEFAULT_KWARGS, **kwargs})

    @classmethod
    def init_client(cls, **kwargs: Any) -> None:
        """Eagerly create the shared client with custom settings.

        A no-op when the client already exists; in that case the supplied
        overrides are silently ignored.
        """
        with cls._lock:
            if not cls._client:
                cls._client = cls._init_client(**kwargs)

    @classmethod
    def get(cls) -> httpx.Client:
        """Return the shared client, lazily creating it with defaults."""
        # Double-checked locking: the unlocked read is the hot fast path;
        # the re-check under the lock prevents duplicate creation.
        if not cls._client:
            with cls._lock:
                if not cls._client:
                    cls._client = cls._init_client()
        return cls._client