mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-03-29 19:42:41 +00:00
Compare commits
126 Commits
cli/v0.2.0
...
bugfix/ves
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b22848be52 | ||
|
|
92c0923581 | ||
|
|
a8cd42ccee | ||
|
|
5c0f6a678f | ||
|
|
236b1e078f | ||
|
|
84e3fbd38d | ||
|
|
e0829a9211 | ||
|
|
12219b28a1 | ||
|
|
6433d47982 | ||
|
|
6ebc6232f4 | ||
|
|
4f7e5cb5fb | ||
|
|
bb4ee6b5ba | ||
|
|
2f6b5f4f38 | ||
|
|
bffdf946be | ||
|
|
3a767334d7 | ||
|
|
166b405b62 | ||
|
|
6a8e4ef497 | ||
|
|
905f104064 | ||
|
|
e2f3363b08 | ||
|
|
c032861573 | ||
|
|
faaccc7c44 | ||
|
|
90cc75381d | ||
|
|
76b0996cba | ||
|
|
9ac8a10c5a | ||
|
|
a4d36b9e77 | ||
|
|
fc3af78bf2 | ||
|
|
aca5204768 | ||
|
|
b5528e3eb0 | ||
|
|
9b99c14c56 | ||
|
|
20ba19b857 | ||
|
|
5f85632efb | ||
|
|
026bca49b5 | ||
|
|
be613716e9 | ||
|
|
092e190a59 | ||
|
|
30f6463d6c | ||
|
|
8aef77ac20 | ||
|
|
ba58e7dfab | ||
|
|
fe9bb067df | ||
|
|
ca6858b5a9 | ||
|
|
ab7769213e | ||
|
|
84ec5f4fe2 | ||
|
|
138866aa5f | ||
|
|
69983ab315 | ||
|
|
89619cef41 | ||
|
|
f5cbb6dc61 | ||
|
|
29860f85a4 | ||
|
|
7512d68a33 | ||
|
|
2af4926cbe | ||
|
|
3afa07acdf | ||
|
|
3323d4c715 | ||
|
|
f02a2316b5 | ||
|
|
3574c23543 | ||
|
|
97d36717e2 | ||
|
|
b4274395b1 | ||
|
|
ea35879482 | ||
|
|
bc5cb2d95f | ||
|
|
8cf40988e2 | ||
|
|
5d6fe9295d | ||
|
|
af34d19131 | ||
|
|
f90cfa7cd5 | ||
|
|
b8d9205682 | ||
|
|
d6c3bd34c0 | ||
|
|
c20d92108c | ||
|
|
09889478d5 | ||
|
|
82bd7f4046 | ||
|
|
0b293ddb83 | ||
|
|
0bb2e57f5c | ||
|
|
bd079080bd | ||
|
|
dafbe136f1 | ||
|
|
daf46b342b | ||
|
|
f363c6b8fd | ||
|
|
ab097cebcf | ||
|
|
98ec0b791d | ||
|
|
590bd0fd38 | ||
|
|
71334508a1 | ||
|
|
957bd55a5e | ||
|
|
4c5c4a21fc | ||
|
|
6335f5b1c0 | ||
|
|
d6d3d24157 | ||
|
|
a56e3d7825 | ||
|
|
69d762e946 | ||
|
|
6724a13de3 | ||
|
|
1d4b8cf6c6 | ||
|
|
f358acfb09 | ||
|
|
aa1eec8d46 | ||
|
|
076ed181ca | ||
|
|
e53ff2555f | ||
|
|
f0bdb39704 | ||
|
|
a9aca68aa8 | ||
|
|
c388a815fc | ||
|
|
e731a335bb | ||
|
|
bdcd553bea | ||
|
|
015fab9dc7 | ||
|
|
575db62c7c | ||
|
|
9dbd29dab0 | ||
|
|
0652698633 | ||
|
|
838a7fbfe7 | ||
|
|
d17f49f6b7 | ||
|
|
c0a7c7cda4 | ||
|
|
c174d00b21 | ||
|
|
db252a6913 | ||
|
|
27eae3d9d6 | ||
|
|
eea178fbd0 | ||
|
|
21446175b4 | ||
|
|
1cfbfe173c | ||
|
|
b4b7801e71 | ||
|
|
48963ed32b | ||
|
|
ab9cf9a7b0 | ||
|
|
e36fd3cc04 | ||
|
|
f06efb96db | ||
|
|
66fb6b10e6 | ||
|
|
5f7ca157e1 | ||
|
|
186d4f146d | ||
|
|
024d4dd3dc | ||
|
|
6d05e2910c | ||
|
|
8d09dcf072 | ||
|
|
93fb8ef17d | ||
|
|
d8562973a7 | ||
|
|
34a8bde7fa | ||
|
|
5100065f49 | ||
|
|
d4c06b344b | ||
|
|
25f4b087dd | ||
|
|
7244baed90 | ||
|
|
09e95fe293 | ||
|
|
74429577e0 | ||
|
|
10330fb34b |
@@ -3,6 +3,7 @@ import time
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import redis
|
||||
from celery import bootsteps # type: ignore
|
||||
from celery import Celery
|
||||
@@ -30,6 +31,7 @@ from danswer.configs.constants import POSTGRES_CELERY_WORKER_HEAVY_APP_NAME
|
||||
from danswer.configs.constants import POSTGRES_CELERY_WORKER_LIGHT_APP_NAME
|
||||
from danswer.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME
|
||||
from danswer.db.engine import SqlEngine
|
||||
from danswer.httpx.httpx_pool import HttpxPool
|
||||
from danswer.redis.redis_pool import get_redis_client
|
||||
from danswer.utils.logger import ColoredFormatter
|
||||
from danswer.utils.logger import PlainFormatter
|
||||
@@ -113,12 +115,16 @@ def on_beat_init(sender: Any, **kwargs: Any) -> None:
|
||||
|
||||
@worker_init.connect
|
||||
def on_worker_init(sender: Any, **kwargs: Any) -> None:
|
||||
EXTRA_CONCURRENCY = 8 # a few extra connections for side operations
|
||||
|
||||
# decide some initial startup settings based on the celery worker's hostname
|
||||
# (set at the command line)
|
||||
hostname = sender.hostname
|
||||
if hostname.startswith("light"):
|
||||
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)
|
||||
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8)
|
||||
SqlEngine.init_engine(
|
||||
pool_size=sender.concurrency, max_overflow=EXTRA_CONCURRENCY
|
||||
)
|
||||
elif hostname.startswith("heavy"):
|
||||
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
|
||||
SqlEngine.init_engine(pool_size=8, max_overflow=0)
|
||||
@@ -126,6 +132,12 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
|
||||
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME)
|
||||
SqlEngine.init_engine(pool_size=8, max_overflow=0)
|
||||
|
||||
HttpxPool.init_client(
|
||||
limits=httpx.Limits(
|
||||
max_keepalive_connections=sender.concurrency + EXTRA_CONCURRENCY
|
||||
)
|
||||
)
|
||||
|
||||
r = get_redis_client()
|
||||
|
||||
WAIT_INTERVAL = 5
|
||||
@@ -212,6 +224,86 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
|
||||
|
||||
sender.primary_worker_lock = lock
|
||||
|
||||
WAIT_INTERVAL = 5
|
||||
WAIT_LIMIT = 60
|
||||
|
||||
time_start = time.monotonic()
|
||||
logger.info("Redis: Readiness check starting.")
|
||||
while True:
|
||||
try:
|
||||
if r.ping():
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
time_elapsed = time.monotonic() - time_start
|
||||
logger.info(
|
||||
f"Redis: Ping failed. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
|
||||
)
|
||||
if time_elapsed > WAIT_LIMIT:
|
||||
msg = (
|
||||
f"Redis: Readiness check did not succeed within the timeout "
|
||||
f"({WAIT_LIMIT} seconds). Exiting..."
|
||||
)
|
||||
logger.error(msg)
|
||||
raise WorkerShutdown(msg)
|
||||
|
||||
time.sleep(WAIT_INTERVAL)
|
||||
|
||||
logger.info("Redis: Readiness check succeeded. Continuing...")
|
||||
|
||||
if not celery_is_worker_primary(sender):
|
||||
logger.info("Running as a secondary celery worker.")
|
||||
logger.info("Waiting for primary worker to be ready...")
|
||||
time_start = time.monotonic()
|
||||
while True:
|
||||
if r.exists(DanswerRedisLocks.PRIMARY_WORKER):
|
||||
break
|
||||
|
||||
time.monotonic()
|
||||
time_elapsed = time.monotonic() - time_start
|
||||
logger.info(
|
||||
f"Primary worker is not ready yet. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
|
||||
)
|
||||
if time_elapsed > WAIT_LIMIT:
|
||||
msg = (
|
||||
f"Primary worker was not ready within the timeout. "
|
||||
f"({WAIT_LIMIT} seconds). Exiting..."
|
||||
)
|
||||
logger.error(msg)
|
||||
raise WorkerShutdown(msg)
|
||||
|
||||
time.sleep(WAIT_INTERVAL)
|
||||
|
||||
logger.info("Wait for primary worker completed successfully. Continuing...")
|
||||
return
|
||||
|
||||
logger.info("Running as the primary celery worker.")
|
||||
|
||||
# This is singleton work that should be done on startup exactly once
|
||||
# by the primary worker
|
||||
r = get_redis_client()
|
||||
|
||||
# For the moment, we're assuming that we are the only primary worker
|
||||
# that should be running.
|
||||
# TODO: maybe check for or clean up another zombie primary worker if we detect it
|
||||
r.delete(DanswerRedisLocks.PRIMARY_WORKER)
|
||||
|
||||
lock = r.lock(
|
||||
DanswerRedisLocks.PRIMARY_WORKER,
|
||||
timeout=CELERY_PRIMARY_WORKER_LOCK_TIMEOUT,
|
||||
)
|
||||
|
||||
logger.info("Primary worker lock: Acquire starting.")
|
||||
acquired = lock.acquire(blocking_timeout=CELERY_PRIMARY_WORKER_LOCK_TIMEOUT / 2)
|
||||
if acquired:
|
||||
logger.info("Primary worker lock: Acquire succeeded.")
|
||||
else:
|
||||
logger.error("Primary worker lock: Acquire failed!")
|
||||
raise WorkerShutdown("Primary worker lock could not be acquired!")
|
||||
|
||||
sender.primary_worker_lock = lock
|
||||
|
||||
r.delete(DanswerRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK)
|
||||
r.delete(DanswerRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK)
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ from danswer.db.document import get_documents_for_connector_credential_pair
|
||||
from danswer.db.engine import get_sqlalchemy_engine
|
||||
from danswer.document_index.document_index_utils import get_both_index_names
|
||||
from danswer.document_index.factory import get_default_document_index
|
||||
from danswer.httpx.httpx_pool import HttpxPool
|
||||
|
||||
|
||||
# use this within celery tasks to get celery task specific logging
|
||||
@@ -95,7 +96,9 @@ def prune_documents_task(connector_id: int, credential_id: int) -> None:
|
||||
|
||||
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
|
||||
primary_index_name=curr_ind_name,
|
||||
secondary_index_name=sec_ind_name,
|
||||
httpx_client=HttpxPool.get(),
|
||||
)
|
||||
|
||||
if len(doc_ids_to_remove) == 0:
|
||||
|
||||
@@ -41,6 +41,7 @@ from danswer.db.models import UserGroup
|
||||
from danswer.document_index.document_index_utils import get_both_index_names
|
||||
from danswer.document_index.factory import get_default_document_index
|
||||
from danswer.document_index.interfaces import UpdateRequest
|
||||
from danswer.httpx.httpx_pool import HttpxPool
|
||||
from danswer.redis.redis_pool import get_redis_client
|
||||
from danswer.utils.variable_functionality import fetch_versioned_implementation
|
||||
from danswer.utils.variable_functionality import (
|
||||
@@ -484,7 +485,9 @@ def vespa_metadata_sync_task(self: Task, document_id: str) -> bool:
|
||||
with Session(get_sqlalchemy_engine()) as db_session:
|
||||
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
|
||||
primary_index_name=curr_ind_name,
|
||||
secondary_index_name=sec_ind_name,
|
||||
httpx_client=HttpxPool.get(),
|
||||
)
|
||||
|
||||
doc = get_document(document_id, db_session)
|
||||
|
||||
@@ -10,6 +10,8 @@ are multiple connector / credential pairs that have indexed it
|
||||
connector / credential pair from the access list
|
||||
(6) delete all relevant entries from postgres
|
||||
"""
|
||||
import time
|
||||
|
||||
from celery import shared_task
|
||||
from celery import Task
|
||||
from celery.exceptions import SoftTimeLimitExceeded
|
||||
@@ -137,6 +139,8 @@ def document_by_cc_pair_cleanup_task(
|
||||
) -> bool:
|
||||
task_logger.info(f"document_id={document_id}")
|
||||
|
||||
timing = {}
|
||||
timing["start"] = time.monotonic()
|
||||
try:
|
||||
with Session(get_sqlalchemy_engine()) as db_session:
|
||||
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
|
||||
@@ -148,11 +152,15 @@ def document_by_cc_pair_cleanup_task(
|
||||
if count == 1:
|
||||
# count == 1 means this is the only remaining cc_pair reference to the doc
|
||||
# delete it from vespa and the db
|
||||
timing["db_read"] = time.monotonic()
|
||||
document_index.delete(doc_ids=[document_id])
|
||||
# document_index.delete_single(doc_id=document_id)
|
||||
timing["indexed"] = time.monotonic()
|
||||
delete_documents_complete__no_commit(
|
||||
db_session=db_session,
|
||||
document_ids=[document_id],
|
||||
)
|
||||
time.monotonic()
|
||||
elif count > 1:
|
||||
# count > 1 means the document still has cc_pair references
|
||||
doc = get_document(document_id, db_session)
|
||||
@@ -176,9 +184,13 @@ def document_by_cc_pair_cleanup_task(
|
||||
hidden=doc.hidden,
|
||||
)
|
||||
|
||||
timing["db_read"] = time.monotonic()
|
||||
|
||||
# update Vespa. OK if doc doesn't exist. Raises exception otherwise.
|
||||
document_index.update_single(update_request=update_request)
|
||||
|
||||
timing["indexed"] = time.monotonic()
|
||||
|
||||
# there are still other cc_pair references to the doc, so just resync to Vespa
|
||||
delete_document_by_connector_credential_pair__no_commit(
|
||||
db_session=db_session,
|
||||
@@ -191,7 +203,8 @@ def document_by_cc_pair_cleanup_task(
|
||||
|
||||
mark_document_as_synced(document_id, db_session)
|
||||
else:
|
||||
pass
|
||||
timing["db_read"] = time.monotonic()
|
||||
timing["indexed"] = time.monotonic()
|
||||
|
||||
# update_docs_last_modified__no_commit(
|
||||
# db_session=db_session,
|
||||
@@ -208,4 +221,13 @@ def document_by_cc_pair_cleanup_task(
|
||||
countdown = 2 ** (self.request.retries + 4)
|
||||
self.retry(exc=e, countdown=countdown)
|
||||
|
||||
timing["end"] = time.monotonic()
|
||||
|
||||
db_read_s = timing["db_read"] - timing["start"]
|
||||
index_s = timing["indexed"] - timing["db_read"]
|
||||
db_write_s = timing["end"] - timing["indexed"]
|
||||
all_s = timing["end"] - timing["start"]
|
||||
task_logger.info(
|
||||
f"db_read={db_read_s:.2f} index={index_s:.2f} db_write={db_write_s:.2f} all={all_s:.2f}"
|
||||
)
|
||||
return True
|
||||
|
||||
@@ -239,7 +239,7 @@ CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD = int(
|
||||
# Attachments with more chars than this will not be indexed. This is to prevent extremely
|
||||
# large files from freezing indexing. 200,000 is ~100 google doc pages.
|
||||
CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD = int(
|
||||
os.environ.get("CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD", 200_000)
|
||||
os.environ.get("CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD", 200_000_000)
|
||||
)
|
||||
|
||||
JIRA_CONNECTOR_LABELS_TO_SKIP = [
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import httpx
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from danswer.db.search_settings import get_current_search_settings
|
||||
@@ -8,13 +9,16 @@ from danswer.document_index.vespa.index import VespaIndex
|
||||
def get_default_document_index(
|
||||
primary_index_name: str,
|
||||
secondary_index_name: str | None,
|
||||
httpx_client: httpx.Client | None = None,
|
||||
) -> DocumentIndex:
|
||||
"""Primary index is the index that is used for querying/updating etc.
|
||||
Secondary index is for when both the currently used index and the upcoming
|
||||
index both need to be updated, updates are applied to both indices"""
|
||||
# Currently only supporting Vespa
|
||||
return VespaIndex(
|
||||
index_name=primary_index_name, secondary_index_name=secondary_index_name
|
||||
index_name=primary_index_name,
|
||||
secondary_index_name=secondary_index_name,
|
||||
httpx_client=httpx_client,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -166,6 +166,16 @@ class Deletable(abc.ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete_single(self, doc_id: str) -> None:
|
||||
"""
|
||||
Given a single document id, hard delete it from the document index
|
||||
|
||||
Parameters:
|
||||
- doc_id: document id as specified by the connector
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class Updatable(abc.ABC):
|
||||
"""
|
||||
|
||||
@@ -7,6 +7,7 @@ from datetime import timezone
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
|
||||
import httpx
|
||||
import requests
|
||||
from retry import retry
|
||||
|
||||
@@ -149,6 +150,7 @@ def _get_chunks_via_visit_api(
|
||||
chunk_request: VespaChunkRequest,
|
||||
index_name: str,
|
||||
filters: IndexFilters,
|
||||
http_client: httpx.Client,
|
||||
field_names: list[str] | None = None,
|
||||
get_large_chunks: bool = False,
|
||||
) -> list[dict]:
|
||||
@@ -181,21 +183,22 @@ def _get_chunks_via_visit_api(
|
||||
selection += f" and {index_name}.large_chunk_reference_ids == null"
|
||||
|
||||
# Setting up the selection criteria in the query parameters
|
||||
params = {
|
||||
# NOTE: Document Selector Language doesn't allow `contains`, so we can't check
|
||||
# for the ACL in the selection. Instead, we have to check as a postfilter
|
||||
"selection": selection,
|
||||
"continuation": None,
|
||||
"wantedDocumentCount": 1_000,
|
||||
"fieldSet": field_set,
|
||||
}
|
||||
params = httpx.QueryParams(
|
||||
{
|
||||
# NOTE: Document Selector Language doesn't allow `contains`, so we can't check
|
||||
# for the ACL in the selection. Instead, we have to check as a postfilter
|
||||
"selection": selection,
|
||||
"wantedDocumentCount": 1_000,
|
||||
"fieldSet": field_set,
|
||||
}
|
||||
)
|
||||
|
||||
document_chunks: list[dict] = []
|
||||
while True:
|
||||
response = requests.get(url, params=params)
|
||||
response = http_client.get(url, params=params)
|
||||
try:
|
||||
response.raise_for_status()
|
||||
except requests.HTTPError as e:
|
||||
except httpx.HTTPStatusError as e:
|
||||
request_info = f"Headers: {response.request.headers}\nPayload: {params}"
|
||||
response_info = f"Status Code: {response.status_code}\nResponse Content: {response.text}"
|
||||
error_base = f"Error occurred getting chunk by Document ID {chunk_request.document_id}"
|
||||
@@ -205,7 +208,9 @@ def _get_chunks_via_visit_api(
|
||||
f"{response_info}\n"
|
||||
f"Exception: {e}"
|
||||
)
|
||||
raise requests.HTTPError(error_base) from e
|
||||
raise httpx.HTTPStatusError(
|
||||
error_base, request=e.request, response=e.response
|
||||
) from e
|
||||
|
||||
# Check if the response contains any documents
|
||||
response_data = response.json()
|
||||
@@ -221,17 +226,21 @@ def _get_chunks_via_visit_api(
|
||||
document_chunks.append(document)
|
||||
|
||||
# Check for continuation token to handle pagination
|
||||
if "continuation" in response_data and response_data["continuation"]:
|
||||
params["continuation"] = response_data["continuation"]
|
||||
else:
|
||||
if "continuation" not in response_data:
|
||||
break # Exit loop if no continuation token
|
||||
|
||||
if not response_data["continuation"]:
|
||||
break # Exit loop if continuation token is empty
|
||||
|
||||
params = params.set("continuation", response_data["continuation"])
|
||||
|
||||
return document_chunks
|
||||
|
||||
|
||||
def get_all_vespa_ids_for_document_id(
|
||||
document_id: str,
|
||||
index_name: str,
|
||||
http_client: httpx.Client,
|
||||
filters: IndexFilters | None = None,
|
||||
get_large_chunks: bool = False,
|
||||
) -> list[str]:
|
||||
@@ -239,6 +248,7 @@ def get_all_vespa_ids_for_document_id(
|
||||
chunk_request=VespaChunkRequest(document_id=document_id),
|
||||
index_name=index_name,
|
||||
filters=filters or IndexFilters(access_control_list=None),
|
||||
http_client=http_client,
|
||||
field_names=[DOCUMENT_ID],
|
||||
get_large_chunks=get_large_chunks,
|
||||
)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import concurrent.futures
|
||||
import time
|
||||
|
||||
import httpx
|
||||
from retry import retry
|
||||
@@ -20,12 +21,17 @@ CONTENT_SUMMARY = "content_summary"
|
||||
def _delete_vespa_doc_chunks(
|
||||
document_id: str, index_name: str, http_client: httpx.Client
|
||||
) -> None:
|
||||
t = {}
|
||||
t["start"] = time.monotonic()
|
||||
doc_chunk_ids = get_all_vespa_ids_for_document_id(
|
||||
document_id=document_id,
|
||||
index_name=index_name,
|
||||
http_client=http_client,
|
||||
get_large_chunks=True,
|
||||
)
|
||||
|
||||
t["chunks_fetched"] = time.monotonic()
|
||||
|
||||
for chunk_id in doc_chunk_ids:
|
||||
try:
|
||||
res = http_client.delete(
|
||||
@@ -36,6 +42,19 @@ def _delete_vespa_doc_chunks(
|
||||
logger.error(f"Failed to delete chunk, details: {e.response.text}")
|
||||
raise
|
||||
|
||||
t["end"] = time.monotonic()
|
||||
|
||||
t_chunk_fetch = t["chunks_fetched"] - t["start"]
|
||||
t_delete = t["end"] - t["chunks_fetched"]
|
||||
t_all = t["end"] - t["start"]
|
||||
logger.info(
|
||||
f"_delete_vespa_doc_chunks: "
|
||||
f"len={len(doc_chunk_ids)} "
|
||||
f"chunk_fetch={t_chunk_fetch:.2f} "
|
||||
f"delete={t_delete:.2f} "
|
||||
f"all={t_all:.2f}"
|
||||
)
|
||||
|
||||
|
||||
def delete_vespa_docs(
|
||||
document_ids: list[str],
|
||||
|
||||
@@ -13,6 +13,7 @@ from typing import cast
|
||||
import httpx
|
||||
import requests
|
||||
|
||||
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
|
||||
from danswer.configs.chat_configs import DOC_TIME_DECAY
|
||||
from danswer.configs.chat_configs import NUM_RETURNED_HITS
|
||||
from danswer.configs.chat_configs import TITLE_CONTENT_RATIO
|
||||
@@ -110,9 +111,15 @@ def add_ngrams_to_schema(schema_content: str) -> str:
|
||||
|
||||
|
||||
class VespaIndex(DocumentIndex):
|
||||
def __init__(self, index_name: str, secondary_index_name: str | None) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
index_name: str,
|
||||
secondary_index_name: str | None,
|
||||
httpx_client: httpx.Client | None = None,
|
||||
) -> None:
|
||||
self.index_name = index_name
|
||||
self.secondary_index_name = secondary_index_name
|
||||
self.httpx_client = httpx_client or httpx.Client(http2=True)
|
||||
|
||||
def ensure_indices_exist(
|
||||
self,
|
||||
@@ -204,8 +211,12 @@ class VespaIndex(DocumentIndex):
|
||||
# indexing / updates / deletes since we have to make a large volume of requests.
|
||||
with (
|
||||
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
|
||||
httpx.Client(http2=True) as http_client,
|
||||
httpx.Client(http2=True) as http_temp_client,
|
||||
):
|
||||
httpx_client = self.httpx_client
|
||||
if not httpx_client:
|
||||
httpx_client = http_temp_client
|
||||
|
||||
# Check for existing documents, existing documents need to have all of their chunks deleted
|
||||
# prior to indexing as the document size (num chunks) may have shrunk
|
||||
first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
|
||||
@@ -214,7 +225,7 @@ class VespaIndex(DocumentIndex):
|
||||
get_existing_documents_from_chunks(
|
||||
chunks=chunk_batch,
|
||||
index_name=self.index_name,
|
||||
http_client=http_client,
|
||||
http_client=httpx_client,
|
||||
executor=executor,
|
||||
)
|
||||
)
|
||||
@@ -223,7 +234,7 @@ class VespaIndex(DocumentIndex):
|
||||
delete_vespa_docs(
|
||||
document_ids=doc_id_batch,
|
||||
index_name=self.index_name,
|
||||
http_client=http_client,
|
||||
http_client=httpx_client,
|
||||
executor=executor,
|
||||
)
|
||||
|
||||
@@ -231,7 +242,7 @@ class VespaIndex(DocumentIndex):
|
||||
batch_index_vespa_chunks(
|
||||
chunks=chunk_batch,
|
||||
index_name=self.index_name,
|
||||
http_client=http_client,
|
||||
http_client=httpx_client,
|
||||
executor=executor,
|
||||
)
|
||||
|
||||
@@ -248,6 +259,7 @@ class VespaIndex(DocumentIndex):
|
||||
@staticmethod
|
||||
def _apply_updates_batched(
|
||||
updates: list[_VespaUpdateRequest],
|
||||
http_client: httpx.Client,
|
||||
batch_size: int = BATCH_SIZE,
|
||||
) -> None:
|
||||
"""Runs a batch of updates in parallel via the ThreadPoolExecutor."""
|
||||
@@ -266,10 +278,7 @@ class VespaIndex(DocumentIndex):
|
||||
|
||||
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
|
||||
# indexing / updates / deletes since we have to make a large volume of requests.
|
||||
with (
|
||||
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
|
||||
httpx.Client(http2=True) as http_client,
|
||||
):
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
|
||||
for update_batch in batch_generator(updates, batch_size):
|
||||
future_to_document_id = {
|
||||
executor.submit(
|
||||
@@ -309,12 +318,20 @@ class VespaIndex(DocumentIndex):
|
||||
index_names.append(self.secondary_index_name)
|
||||
|
||||
chunk_id_start_time = time.monotonic()
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
|
||||
with (
|
||||
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
|
||||
httpx.Client(http2=True) as http_temp_client,
|
||||
):
|
||||
httpx_client = self.httpx_client
|
||||
if not httpx_client:
|
||||
httpx_client = http_temp_client
|
||||
|
||||
future_to_doc_chunk_ids = {
|
||||
executor.submit(
|
||||
get_all_vespa_ids_for_document_id,
|
||||
document_id=document_id,
|
||||
index_name=index_name,
|
||||
http_client=httpx_client,
|
||||
filters=None,
|
||||
get_large_chunks=True,
|
||||
): (document_id, index_name)
|
||||
@@ -370,8 +387,15 @@ class VespaIndex(DocumentIndex):
|
||||
update_request=update_dict,
|
||||
)
|
||||
)
|
||||
with httpx.Client(http2=True) as http_temp_client:
|
||||
httpx_client = self.httpx_client
|
||||
if not httpx_client:
|
||||
httpx_client = http_temp_client
|
||||
|
||||
self._apply_updates_batched(
|
||||
processed_updates_requests, http_client=httpx_client
|
||||
)
|
||||
|
||||
self._apply_updates_batched(processed_updates_requests)
|
||||
logger.debug(
|
||||
"Finished updating Vespa documents in %.2f seconds",
|
||||
time.monotonic() - update_start,
|
||||
@@ -382,6 +406,9 @@ class VespaIndex(DocumentIndex):
|
||||
function will complete with no errors or exceptions.
|
||||
Handle other exceptions if you wish to implement retry behavior
|
||||
"""
|
||||
timing = {}
|
||||
timing["start"] = time.monotonic()
|
||||
|
||||
if len(update_request.document_ids) != 1:
|
||||
raise ValueError("update_request must contain a single document id")
|
||||
|
||||
@@ -399,21 +426,26 @@ class VespaIndex(DocumentIndex):
|
||||
if self.secondary_index_name:
|
||||
index_names.append(self.secondary_index_name)
|
||||
|
||||
chunk_id_start_time = time.monotonic()
|
||||
all_doc_chunk_ids: list[str] = []
|
||||
for index_name in index_names:
|
||||
for document_id in update_request.document_ids:
|
||||
# this calls vespa and can raise http exceptions
|
||||
doc_chunk_ids = get_all_vespa_ids_for_document_id(
|
||||
document_id=document_id,
|
||||
index_name=index_name,
|
||||
filters=None,
|
||||
get_large_chunks=True,
|
||||
)
|
||||
all_doc_chunk_ids.extend(doc_chunk_ids)
|
||||
logger.debug(
|
||||
f"Took {time.monotonic() - chunk_id_start_time:.2f} seconds to fetch all Vespa chunk IDs"
|
||||
)
|
||||
timing["chunk_fetch_start"] = time.monotonic()
|
||||
with httpx.Client(http2=True) as http_temp_client:
|
||||
httpx_client = self.httpx_client
|
||||
if not httpx_client:
|
||||
httpx_client = http_temp_client
|
||||
|
||||
all_doc_chunk_ids: list[str] = []
|
||||
for index_name in index_names:
|
||||
for document_id in update_request.document_ids:
|
||||
# this calls vespa and can raise http exceptions
|
||||
doc_chunk_ids = get_all_vespa_ids_for_document_id(
|
||||
document_id=document_id,
|
||||
index_name=index_name,
|
||||
http_client=httpx_client,
|
||||
filters=None,
|
||||
get_large_chunks=True,
|
||||
)
|
||||
all_doc_chunk_ids.extend(doc_chunk_ids)
|
||||
|
||||
timing["chunk_fetch_end"] = time.monotonic()
|
||||
|
||||
# Build the _VespaUpdateRequest objects
|
||||
update_dict: dict[str, dict] = {"fields": {}}
|
||||
@@ -447,38 +479,120 @@ class VespaIndex(DocumentIndex):
|
||||
)
|
||||
)
|
||||
|
||||
with httpx.Client(http2=True) as http_client:
|
||||
with httpx.Client(http2=True) as http_temp_client:
|
||||
httpx_client = self.httpx_client
|
||||
if not httpx_client:
|
||||
httpx_client = http_temp_client
|
||||
|
||||
for update in processed_update_requests:
|
||||
http_client.put(
|
||||
httpx_client.put(
|
||||
update.url,
|
||||
headers={"Content-Type": "application/json"},
|
||||
json=update.update_request,
|
||||
)
|
||||
timing["end"] = time.monotonic()
|
||||
|
||||
# logger.debug(
|
||||
# "Finished updating Vespa documents in %.2f seconds",
|
||||
# time.monotonic() - update_start,
|
||||
# )
|
||||
|
||||
t_setup = timing["chunk_fetch_start"] - timing["start"]
|
||||
t_chunk_fetch = timing["chunk_fetch_end"] - timing["chunk_fetch_start"]
|
||||
t_update = timing["chunk_fetch_end"] - timing["chunk_fetch_start"]
|
||||
t_all = timing["end"] - timing["start"]
|
||||
logger.info(
|
||||
f"VespaIndex.update_single: setup={t_setup:.2f}"
|
||||
f"chunk_fetch={t_chunk_fetch:.2f} "
|
||||
f"update={t_update:.2f} "
|
||||
f"all={t_all:.2f}"
|
||||
)
|
||||
return
|
||||
|
||||
def delete(self, doc_ids: list[str]) -> None:
|
||||
logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
|
||||
|
||||
time_start = time.monotonic()
|
||||
|
||||
doc_ids = [replace_invalid_doc_id_characters(doc_id) for doc_id in doc_ids]
|
||||
|
||||
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
|
||||
# indexing / updates / deletes since we have to make a large volume of requests.
|
||||
with httpx.Client(http2=True) as http_client:
|
||||
index_names = [self.index_name]
|
||||
if self.secondary_index_name:
|
||||
index_names.append(self.secondary_index_name)
|
||||
index_names = [self.index_name]
|
||||
if self.secondary_index_name:
|
||||
index_names.append(self.secondary_index_name)
|
||||
|
||||
with httpx.Client(http2=True) as http_temp_client:
|
||||
httpx_client = self.httpx_client
|
||||
if not httpx_client:
|
||||
httpx_client = http_temp_client
|
||||
|
||||
for index_name in index_names:
|
||||
delete_vespa_docs(
|
||||
document_ids=doc_ids, index_name=index_name, http_client=http_client
|
||||
document_ids=doc_ids,
|
||||
index_name=index_name,
|
||||
http_client=httpx_client,
|
||||
)
|
||||
|
||||
t_all = time.monotonic() - time_start
|
||||
logger.info(f"VespaIndex.delete: all={t_all:.2f}")
|
||||
|
||||
def delete_single(self, doc_id: str) -> None:
    """Hard delete all chunks of a single document from the Vespa indices.

    Issues a selection-based delete (see
    https://docs.vespa.ai/en/operations/batch-delete.html#example) against
    `self.index_name` and, when set, `self.secondary_index_name`, using
    `self.httpx_client` for the requests.

    Parameters:
        - doc_id: document id as specified by the connector

    Raises:
        httpx.HTTPStatusError: if a delete request returns an error status.
    """
    time_start = time.monotonic()

    doc_id = replace_invalid_doc_id_characters(doc_id)

    index_names = [self.index_name]
    if self.secondary_index_name:
        index_names.append(self.secondary_index_name)

    for index_name in index_names:
        params = httpx.QueryParams(
            {
                "selection": f"{index_name}.document_id=='{doc_id}'",
                "cluster": DOCUMENT_INDEX_NAME,
            }
        )

        while True:
            try:
                resp = self.httpx_client.delete(
                    f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}",
                    params=params,
                )
                resp.raise_for_status()
            except httpx.HTTPStatusError as e:
                logger.error(f"Failed to delete chunk, details: {e.response.text}")
                raise

            resp_data = resp.json()
            if "documentCount" in resp_data:
                count = resp_data["documentCount"]
                logger.info(f"VespaIndex.delete_single: chunks_deleted={count}")

            # Pagination: stop when the token is missing or empty. Otherwise
            # the token MUST be sent with the next request; resending the
            # unchanged params would repeat the same request indefinitely.
            continuation = resp_data.get("continuation")
            if not continuation:
                break

            params = params.set("continuation", continuation)

    t_all = time.monotonic() - time_start
    logger.info(f"VespaIndex.delete_single: all={t_all:.2f}")
|
||||
|
||||
def id_based_retrieval(
|
||||
self,
|
||||
chunk_requests: list[VespaChunkRequest],
|
||||
|
||||
42
backend/danswer/httpx/httpx_pool.py
Normal file
42
backend/danswer/httpx/httpx_pool.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
class HttpxPool:
    """Holds one httpx.Client at class level so callers can share it."""

    # The shared client; stays None until init_client() or get() creates it.
    _client: httpx.Client | None = None
    # Guards creation of _client.
    _lock: threading.Lock = threading.Lock()

    # Baseline keyword arguments for httpx.Client. Keyword arguments passed
    # to init_client() take precedence over these.
    DEFAULT_KWARGS = {
        "http2": True,
        "limits": httpx.Limits(),
    }

    def __init__(self) -> None:
        pass

    @classmethod
    def _init_client(cls, **kwargs: Any) -> httpx.Client:
        """Build an httpx.Client from DEFAULT_KWARGS overlaid with kwargs."""
        options = dict(cls.DEFAULT_KWARGS)
        options.update(kwargs)
        return httpx.Client(**options)

    @classmethod
    def init_client(cls, **kwargs: Any) -> None:
        """Create the shared client with extra params.

        Does nothing if a client already exists; kwargs are ignored then.
        """
        with cls._lock:
            if cls._client:
                return
            cls._client = cls._init_client(**kwargs)

    @classmethod
    def get(cls) -> httpx.Client:
        """Return the shared client, creating it with defaults if needed."""
        if cls._client:
            return cls._client

        with cls._lock:
            # Re-check under the lock before creating the client.
            if not cls._client:
                cls._client = cls._init_client()
        return cls._client
|
||||
Reference in New Issue
Block a user