Compare commits

...

12 Commits

Author | SHA1 | Message | Date
Richard Kuo (Danswer) | d8650699eb | Merge branch 'main' of https://github.com/onyx-dot-app/onyx into bugfix/log-vespa-sync-timings | 2025-01-28 15:41:46 -08:00
Richard Kuo (Danswer) | bdaa1c1b0c | Merge commit '4a318bac2690fd6dca723732ef9c23f695464b7a' into bugfix/log-vespa-sync-timings | 2025-01-28 15:05:03 -08:00
Richard Kuo (Danswer) | cba041275d | use global httpx pool for the main vespa flows in celery. Use in more places eventually. | 2025-01-28 14:46:05 -08:00
pablodanswer | 4a318bac26 | additional logs to narrow down issue | 2025-01-28 13:55:04 -08:00
pablodanswer | 52849386bd | various improvements | 2025-01-28 13:40:01 -08:00
pablodanswer | f02e6f00a2 | add logs | 2025-01-28 11:20:21 -08:00
Richard Kuo (Danswer) | 7f18afbdb3 | more debugging | 2025-01-28 00:46:55 -08:00
Richard Kuo (Danswer) | 6722e87fcf | circular imports? | 2025-01-27 21:21:48 -08:00
Richard Kuo (Danswer) | f87173f18d | refactor multipass/db check out of VespaIndex | 2025-01-27 20:37:06 -08:00
Richard Kuo (Danswer) | 060a988d13 | more debugging | 2025-01-27 18:18:10 -08:00
Richard Kuo (Danswer) | 12dcf6a09b | add more logging | 2025-01-27 14:29:47 -08:00
Richard Kuo (Danswer) | c1a4eea2d0 | add timings for syncing | 2025-01-27 11:54:17 -08:00
31 changed files with 573 additions and 168 deletions

View File

@@ -24,6 +24,7 @@ from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine import get_sqlalchemy_engine
from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout
from onyx.httpx.httpx_pool import HttpxPool
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from onyx.redis.redis_connector_delete import RedisConnectorDelete
@@ -316,6 +317,8 @@ def on_worker_ready(sender: Any, **kwargs: Any) -> None:
def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
HttpxPool.close_all()
if not celery_is_worker_primary(sender):
return

View File

@@ -10,6 +10,7 @@ from celery.signals import worker_ready
from celery.signals import worker_shutdown
import onyx.background.celery.apps.app_base as app_base
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
from onyx.configs.constants import POSTGRES_CELERY_WORKER_LIGHT_APP_NAME
from onyx.db.engine import SqlEngine
from onyx.utils.logger import setup_logger
@@ -54,12 +55,16 @@ def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None:
@worker_init.connect
def on_worker_init(sender: Worker, **kwargs: Any) -> None:
EXTRA_CONCURRENCY = 8 # small extra fudge factor for connection limits
logger.info("worker_init signal received.")
logger.info(f"Concurrency: {sender.concurrency}") # type: ignore
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) # type: ignore
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=EXTRA_CONCURRENCY) # type: ignore
httpx_init_vespa_pool(sender.concurrency + EXTRA_CONCURRENCY) # type: ignore
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
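
The sizing rule this hunk encodes: every pool should cover the worker's full concurrency plus the same small cushion, so no worker thread blocks waiting for a DB or Vespa connection. A toy sketch of the invariant (pool_sizes is a hypothetical helper, not code from this PR):

    EXTRA_CONCURRENCY = 8  # cushion so bursts do not exhaust either pool

    def pool_sizes(worker_concurrency: int) -> dict[str, int]:
        # one DB connection and one keepalive Vespa connection per worker
        # thread, plus the cushion on each side
        return {
            "sql_pool_size": worker_concurrency,
            "sql_max_overflow": EXTRA_CONCURRENCY,
            "vespa_max_keepalive": worker_concurrency + EXTRA_CONCURRENCY,
        }

    assert pool_sizes(24) == {
        "sql_pool_size": 24,
        "sql_max_overflow": 8,
        "vespa_max_keepalive": 32,
    }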

View File

@@ -1,10 +1,16 @@
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import cast
import httpx
from sqlalchemy.orm import Session
from onyx.configs.app_configs import MANAGED_VESPA
from onyx.configs.app_configs import MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE
from onyx.configs.app_configs import VESPA_CLOUD_CERT_PATH
from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH
from onyx.configs.app_configs import VESPA_REQUEST_TIMEOUT
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import (
rate_limit_builder,
)
@@ -17,6 +23,7 @@ from onyx.db.connector_credential_pair import get_connector_credential_pair
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import TaskStatus
from onyx.db.models import TaskQueueState
from onyx.httpx.httpx_pool import HttpxPool
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.redis.redis_connector import RedisConnector
from onyx.server.documents.models import DeletionAttemptSnapshot
@@ -154,3 +161,22 @@ def celery_is_worker_primary(worker: Any) -> bool:
return True
return False
def httpx_init_vespa_pool(max_keepalive_connections: int) -> None:
httpx_cert = None
httpx_verify = False
if MANAGED_VESPA:
httpx_cert = cast(
tuple[str, str], (VESPA_CLOUD_CERT_PATH, VESPA_CLOUD_KEY_PATH)
)
httpx_verify = True
HttpxPool.init_client(
name="vespa",
cert=httpx_cert,
verify=httpx_verify,
timeout=VESPA_REQUEST_TIMEOUT,
http2=False,
limits=httpx.Limits(max_keepalive_connections=max_keepalive_connections),
)
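
A hedged usage sketch of the initializer above: call it once per process, then fetch the shared client by name anywhere (HttpxPool appears in full later in this diff). The health-check URL is hypothetical:

    import httpx

    from onyx.background.celery.celery_utils import httpx_init_vespa_pool
    from onyx.httpx.httpx_pool import HttpxPool

    httpx_init_vespa_pool(max_keepalive_connections=28)  # once, at startup
    client: httpx.Client = HttpxPool.get("vespa")        # same instance everywhere
    client.get("http://localhost:8081/state/v1/health")  # hypothetical endpoint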

View File

@@ -15,6 +15,7 @@ from redis import Redis
from redis.lock import Lock as RedisLock
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
from onyx.background.celery.tasks.indexing.utils import _should_index
from onyx.background.celery.tasks.indexing.utils import get_unfenced_index_attempt_ids
from onyx.background.celery.tasks.indexing.utils import IndexingCallback
@@ -303,6 +304,8 @@ def connector_indexing_task(
attempt_found = False
n_final_progress: int | None = None
httpx_init_vespa_pool(20) # 20 is httpx's documented default for max_keepalive_connections
redis_connector = RedisConnector(tenant_id, cc_pair_id)
redis_connector_index = redis_connector.new_index(search_settings_id)

View File

@@ -27,9 +27,10 @@ from onyx.db.document import mark_document_as_synced
from onyx.db.document_set import fetch_document_sets_for_document
from onyx.db.engine import get_all_tenant_ids
from onyx.db.engine import get_session_with_tenant
from onyx.document_index.document_index_utils import get_both_index_names
from onyx.document_index.document_index_utils import get_both_index_properties
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import VespaDocumentFields
from onyx.httpx.httpx_pool import HttpxPool
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.server.documents.models import ConnectorCredentialPairIdentifier
@@ -79,9 +80,18 @@ def document_by_cc_pair_cleanup_task(
action = "skip"
chunks_affected = 0
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
(
curr_ind_name,
sec_ind_name,
large_chunks,
secondary_large_chunks,
) = get_both_index_properties(db_session)
doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
primary_index_name=curr_ind_name,
secondary_index_name=sec_ind_name,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=secondary_large_chunks,
httpx_client=HttpxPool.get("vespa"),
)
retry_index = RetryDocumentIndex(doc_index)

View File

@@ -64,9 +64,10 @@ from onyx.db.models import UserGroup
from onyx.db.sync_record import cleanup_sync_records
from onyx.db.sync_record import insert_sync_record
from onyx.db.sync_record import update_sync_record_status
from onyx.document_index.document_index_utils import get_both_index_names
from onyx.document_index.document_index_utils import get_both_index_properties
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import VespaDocumentFields
from onyx.httpx.httpx_pool import HttpxPool
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from onyx.redis.redis_connector_delete import RedisConnectorDelete
@@ -1092,30 +1093,51 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
def vespa_metadata_sync_task(
self: Task, document_id: str, tenant_id: str | None
) -> bool:
timings: dict[str, Any] = {}
start = time.monotonic()
timings["start"] = start
try:
with get_session_with_tenant(tenant_id) as db_session:
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
phase_start = time.monotonic()
(
curr_ind_name,
sec_ind_name,
large_chunks,
secondary_large_chunks,
) = get_both_index_properties(db_session)
doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
primary_index_name=curr_ind_name,
secondary_index_name=sec_ind_name,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=secondary_large_chunks,
httpx_client=HttpxPool.get("vespa"),
)
timings["get_index"] = time.monotonic() - phase_start
phase_start = time.monotonic()
retry_index = RetryDocumentIndex(doc_index)
doc = get_document(document_id, db_session)
if not doc:
return False
timings["get_document"] = time.monotonic() - phase_start
# document set sync
phase_start = time.monotonic()
doc_sets = fetch_document_sets_for_document(document_id, db_session)
update_doc_sets: set[str] = set(doc_sets)
timings["fetch_document_sets_for_document"] = time.monotonic() - phase_start
# User group sync
phase_start = time.monotonic()
doc_access = get_access_for_document(
document_id=document_id, db_session=db_session
)
timings["get_access_for_document"] = time.monotonic() - phase_start
phase_start = time.monotonic()
fields = VespaDocumentFields(
document_sets=update_doc_sets,
access=doc_access,
@@ -1130,10 +1152,13 @@ def vespa_metadata_sync_task(
chunk_count=doc.chunk_count,
fields=fields,
)
timings["index.update_single"] = time.monotonic() - phase_start
# update db last. Worst case = we crash right before this and
# the sync might repeat again later
phase_start = time.monotonic()
mark_document_as_synced(document_id, db_session)
timings["mark_document_as_synced"] = time.monotonic() - phase_start
# this code checks for and removes a per document sync key that is
# used to block out the same doc from continually resyncing
@@ -1149,7 +1174,8 @@ def vespa_metadata_sync_task(
f"doc={document_id} "
f"action=sync "
f"chunks={chunks_affected} "
f"elapsed={elapsed:.2f}"
f"elapsed={elapsed:.2f} "
f"debug_timings={timings}"
)
except SoftTimeLimitExceeded:
task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")
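
The instrumentation added here repeats one idiom: snapshot time.monotonic(), run a phase, store the delta under a key. A self-contained sketch of the same pattern (the timed context manager is a hypothetical convenience, not code from this PR):

    import time
    from collections.abc import Iterator
    from contextlib import contextmanager

    @contextmanager
    def timed(timings: dict[str, float], name: str) -> Iterator[None]:
        # record the wall time of one phase under the given key
        start = time.monotonic()
        try:
            yield
        finally:
            timings[name] = time.monotonic() - start

    timings: dict[str, float] = {}
    with timed(timings, "get_index"):
        time.sleep(0.01)  # stand-in for get_default_document_index(...)
    print(f"debug_timings={timings}")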

View File

@@ -34,7 +34,9 @@ from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import IndexAttempt
from onyx.db.models import IndexingStatus
from onyx.db.models import IndexModelStatus
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.httpx.httpx_pool import HttpxPool
from onyx.indexing.embedder import DefaultIndexingEmbedder
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.indexing.indexing_pipeline import build_indexing_pipeline
@@ -149,6 +151,7 @@ class RunIndexingContext(BaseModel):
from_beginning: bool
is_primary: bool
search_settings_status: IndexModelStatus
large_chunks_enabled: bool
def _run_indexing(
@@ -179,6 +182,8 @@ def _run_indexing(
"Search settings must be set for indexing. This should not be possible."
)
multipass_config = get_multipass_config(index_attempt_start.search_settings)
# search_settings = index_attempt_start.search_settings
db_connector = index_attempt_start.connector_credential_pair.connector
db_credential = index_attempt_start.connector_credential_pair.credential
@@ -200,6 +205,7 @@ def _run_indexing(
index_attempt_start.search_settings.status == IndexModelStatus.PRESENT
),
search_settings_status=index_attempt_start.search_settings.status,
large_chunks_enabled=multipass_config.enable_large_chunks,
)
last_successful_index_time = (
@@ -221,7 +227,11 @@ def _run_indexing(
# Indexing is only done into one index at a time
document_index = get_default_document_index(
primary_index_name=ctx.index_name, secondary_index_name=None
primary_index_name=ctx.index_name,
secondary_index_name=None,
large_chunks_enabled=ctx.large_chunks_enabled,
secondary_large_chunks_enabled=None,
httpx_client=HttpxPool.get("vespa"),
)
indexing_pipeline = build_indexing_pipeline(

View File

@@ -63,6 +63,7 @@ from onyx.db.models import ToolCall
from onyx.db.models import User
from onyx.db.persona import get_persona_by_id
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.file_store.models import ChatFileType
from onyx.file_store.models import FileDescriptor
@@ -425,8 +426,12 @@ def stream_chat_message_objects(
)
search_settings = get_current_search_settings(db_session)
mp_config = get_multipass_config(search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name, secondary_index_name=None
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
# Every chat Session begins with an empty root message

View File

@@ -29,6 +29,7 @@ from onyx.context.search.utils import inference_section_from_chunks
from onyx.context.search.utils import relevant_sections_to_indices
from onyx.db.models import User
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import VespaChunkRequest
from onyx.llm.interfaces import LLM
@@ -67,9 +68,12 @@ class SearchPipeline:
self.rerank_metrics_callback = rerank_metrics_callback
self.search_settings = get_current_search_settings(db_session)
mp_config = get_multipass_config(self.search_settings)
self.document_index = get_default_document_index(
primary_index_name=self.search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
self.prompt_config: PromptConfig | None = prompt_config

View File

@@ -452,10 +452,16 @@ def update_docs_chunk_count__no_commit(
doc_id_to_chunk_count: dict[str, int],
db_session: Session,
) -> None:
logger.debug("Updating chunk count for these documents")
logger.debug(document_ids)
logger.debug(doc_id_to_chunk_count)
documents_to_update = (
db_session.query(DbDocument).filter(DbDocument.id.in_(document_ids)).all()
)
for doc in documents_to_update:
logger.debug(
f"Updating chunk count for {doc.id} to {doc_id_to_chunk_count[doc.id]}"
)
doc.chunk_count = doc_id_to_chunk_count[doc.id]

View File

@@ -4,24 +4,76 @@ from uuid import UUID
from sqlalchemy.orm import Session
from onyx.configs.app_configs import ENABLE_MULTIPASS_INDEXING
from onyx.db.models import SearchSettings
from onyx.db.search_settings import get_current_search_settings
from onyx.db.search_settings import get_secondary_search_settings
from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
from onyx.indexing.models import DocMetadataAwareIndexChunk
from onyx.indexing.models import EmbeddingProvider
from onyx.indexing.models import MultipassConfig
from shared_configs.configs import MULTI_TENANT
DEFAULT_BATCH_SIZE = 30
DEFAULT_INDEX_NAME = "danswer_chunk"
def get_both_index_names(db_session: Session) -> tuple[str, str | None]:
def should_use_multipass(search_settings: SearchSettings | None) -> bool:
"""
Determines whether multipass should be used based on the search settings
or the default config if settings are unavailable.
"""
if search_settings is not None:
return search_settings.multipass_indexing
return ENABLE_MULTIPASS_INDEXING
def can_use_large_chunks(multipass: bool, search_settings: SearchSettings) -> bool:
"""
Given multipass usage and the search settings, decides whether large chunks
are allowed based on model/provider constraints.
"""
# The only local models that support a larger context are from Nomic
# Cohere does not support larger contexts (they recommend not going above ~512 tokens)
return (
multipass
and search_settings.model_name.startswith("nomic-ai")
and search_settings.provider_type != EmbeddingProvider.COHERE
)
def get_multipass_config(search_settings: SearchSettings) -> MultipassConfig:
"""
Determines whether to enable multipass and large chunks by examining
the current search settings and the embedder configuration.
"""
if not search_settings:
return MultipassConfig(multipass_indexing=False, enable_large_chunks=False)
multipass = should_use_multipass(search_settings)
enable_large_chunks = can_use_large_chunks(multipass, search_settings)
return MultipassConfig(
multipass_indexing=multipass, enable_large_chunks=enable_large_chunks
)
def get_both_index_properties(
db_session: Session,
) -> tuple[str, str | None, bool, bool | None]:
search_settings = get_current_search_settings(db_session)
config_1 = get_multipass_config(search_settings)
search_settings_new = get_secondary_search_settings(db_session)
if not search_settings_new:
return search_settings.index_name, None
return search_settings.index_name, None, config_1.enable_large_chunks, None
return search_settings.index_name, search_settings_new.index_name
config_2 = get_multipass_config(search_settings_new)
return (
search_settings.index_name,
search_settings_new.index_name,
config_1.enable_large_chunks,
config_2.enable_large_chunks,
)
def translate_boost_count_to_multiplier(boost: int) -> float:
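
As a worked example of the decision chain above: only a local Nomic model with multipass enabled gets large chunks. The SearchSettings keyword values below are hypothetical:

    from onyx.db.models import SearchSettings
    from onyx.document_index.document_index_utils import get_multipass_config

    settings = SearchSettings(
        model_name="nomic-ai/nomic-embed-text-v1",  # hypothetical model
        provider_type=None,                         # local model, not Cohere
        multipass_indexing=True,
    )
    config = get_multipass_config(settings)
    assert config.multipass_indexing is True
    assert config.enable_large_chunks is True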

View File

@@ -1,6 +1,8 @@
import httpx
from sqlalchemy.orm import Session
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.vespa.index import VespaIndex
from shared_configs.configs import MULTI_TENANT
@@ -9,15 +11,22 @@ from shared_configs.configs import MULTI_TENANT
def get_default_document_index(
primary_index_name: str,
secondary_index_name: str | None,
large_chunks_enabled: bool,
secondary_large_chunks_enabled: bool | None,
httpx_client: httpx.Client | None = None,
) -> DocumentIndex:
"""Primary index is the index that is used for querying/updating etc.
Secondary index is for when both the currently used index and the upcoming
index need to be updated; updates are applied to both indices"""
# Currently only supporting Vespa
return VespaIndex(
index_name=primary_index_name,
secondary_index_name=secondary_index_name,
large_chunks_enabled=large_chunks_enabled,
secondary_large_chunks_enabled=secondary_large_chunks_enabled,
multitenant=MULTI_TENANT,
httpx_client=httpx_client,
)
@@ -26,7 +35,10 @@ def get_current_primary_default_document_index(db_session: Session) -> DocumentIndex:
TODO: Use redis to cache this or something
"""
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
return get_default_document_index(
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)

View File

@@ -231,21 +231,21 @@ def _get_chunks_via_visit_api(
return document_chunks
@retry(tries=10, delay=1, backoff=2)
def get_all_vespa_ids_for_document_id(
document_id: str,
index_name: str,
filters: IndexFilters | None = None,
get_large_chunks: bool = False,
) -> list[str]:
document_chunks = _get_chunks_via_visit_api(
chunk_request=VespaChunkRequest(document_id=document_id),
index_name=index_name,
filters=filters or IndexFilters(access_control_list=None),
field_names=[DOCUMENT_ID],
get_large_chunks=get_large_chunks,
)
return [chunk["id"].split("::", 1)[-1] for chunk in document_chunks]
# @retry(tries=10, delay=1, backoff=2)
# def get_all_vespa_ids_for_document_id(
# document_id: str,
# index_name: str,
# filters: IndexFilters | None = None,
# get_large_chunks: bool = False,
# ) -> list[str]:
# document_chunks = _get_chunks_via_visit_api(
# chunk_request=VespaChunkRequest(document_id=document_id),
# index_name=index_name,
# filters=filters or IndexFilters(access_control_list=None),
# field_names=[DOCUMENT_ID],
# get_large_chunks=get_large_chunks,
# )
# return [chunk["id"].split("::", 1)[-1] for chunk in document_chunks]
def parallel_visit_api_retrieval(

View File

@@ -10,6 +10,7 @@ import zipfile
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
from typing import Any
from typing import BinaryIO
from typing import cast
from typing import List
@@ -25,7 +26,6 @@ from onyx.configs.chat_configs import VESPA_SEARCHER_THREADS
from onyx.configs.constants import KV_REINDEX_KEY
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import InferenceChunkUncleaned
from onyx.db.engine import get_session_with_tenant
from onyx.document_index.document_index_utils import get_document_chunk_ids
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.interfaces import DocumentInsertionRecord
@@ -41,12 +41,12 @@ from onyx.document_index.vespa.chunk_retrieval import (
)
from onyx.document_index.vespa.chunk_retrieval import query_vespa
from onyx.document_index.vespa.deletion import delete_vespa_chunks
from onyx.document_index.vespa.indexing_utils import BaseHTTPXClientContext
from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks
from onyx.document_index.vespa.indexing_utils import check_for_final_chunk_existence
from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy
from onyx.document_index.vespa.indexing_utils import (
get_multipass_config,
)
from onyx.document_index.vespa.indexing_utils import GlobalHTTPXClientContext
from onyx.document_index.vespa.indexing_utils import TemporaryHTTPXClientContext
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa.shared_utils.utils import (
replace_invalid_doc_id_characters,
@@ -132,12 +132,34 @@ class VespaIndex(DocumentIndex):
self,
index_name: str,
secondary_index_name: str | None,
large_chunks_enabled: bool,
secondary_large_chunks_enabled: bool | None,
multitenant: bool = False,
httpx_client: httpx.Client | None = None,
) -> None:
self.index_name = index_name
self.secondary_index_name = secondary_index_name
self.large_chunks_enabled = large_chunks_enabled
self.secondary_large_chunks_enabled = secondary_large_chunks_enabled
self.multitenant = multitenant
self.http_client = get_vespa_http_client()
self.httpx_client_context: BaseHTTPXClientContext
if httpx_client:
self.httpx_client_context = GlobalHTTPXClientContext(httpx_client)
else:
self.httpx_client_context = TemporaryHTTPXClientContext(
get_vespa_http_client
)
self.index_to_large_chunks_enabled: dict[str, bool] = {}
self.index_to_large_chunks_enabled[index_name] = large_chunks_enabled
if secondary_index_name and secondary_large_chunks_enabled:
self.index_to_large_chunks_enabled[
secondary_index_name
] = secondary_large_chunks_enabled
def ensure_indices_exist(
self,
@@ -331,7 +353,7 @@ class VespaIndex(DocumentIndex):
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
get_vespa_http_client() as http_client,
self.httpx_client_context as http_client,
):
# We require the start and end index for each document in order to
# know precisely which chunks to delete. This information exists for
@@ -349,6 +371,12 @@ class VespaIndex(DocumentIndex):
for doc_id in doc_id_to_new_chunk_cnt.keys()
]
logger.debug("Indexing these doc IDs / counts")
logger.debug(doc_id_to_new_chunk_cnt)
logger.debug("Enriched docs are as follows")
logger.debug(enriched_doc_infos)
for cleaned_doc_info in enriched_doc_infos:
# If the document has previously indexed chunks, we know it previously existed
if cleaned_doc_info.chunk_end_index:
@@ -390,9 +418,11 @@ class VespaIndex(DocumentIndex):
for doc_id in all_doc_ids
}
@staticmethod
@classmethod
def _apply_updates_batched(
cls,
updates: list[_VespaUpdateRequest],
httpx_client: httpx.Client,
batch_size: int = BATCH_SIZE,
) -> None:
"""Runs a batch of updates in parallel via the ThreadPoolExecutor."""
@@ -414,7 +444,7 @@ class VespaIndex(DocumentIndex):
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
get_vespa_http_client() as http_client,
):
# the shared client is owned by the caller's context manager; use it directly
# instead of re-entering it, which would close the pooled client on exit
http_client = httpx_client
for update_batch in batch_generator(updates, batch_size):
future_to_document_id = {
@@ -455,7 +485,7 @@ class VespaIndex(DocumentIndex):
index_names.append(self.secondary_index_name)
chunk_id_start_time = time.monotonic()
with get_vespa_http_client() as http_client:
with self.httpx_client_context as http_client:
for update_request in update_requests:
for doc_info in update_request.minimal_document_indexing_info:
for index_name in index_names:
@@ -511,7 +541,8 @@ class VespaIndex(DocumentIndex):
)
)
self._apply_updates_batched(processed_updates_requests)
with self.httpx_client_context as httpx_client:
self._apply_updates_batched(processed_updates_requests, httpx_client)
logger.debug(
"Finished updating Vespa documents in %.2f seconds",
time.monotonic() - update_start,
@@ -523,6 +554,7 @@ class VespaIndex(DocumentIndex):
index_name: str,
fields: VespaDocumentFields,
doc_id: str,
http_client: httpx.Client,
) -> None:
"""
Update a single "chunk" (document) in Vespa using its chunk ID.
@@ -554,18 +586,17 @@ class VespaIndex(DocumentIndex):
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}?create=true"
with get_vespa_http_client(http2=False) as http_client:
try:
resp = http_client.put(
vespa_url,
headers={"Content-Type": "application/json"},
json=update_dict,
)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}"
logger.error(error_message)
raise
try:
resp = http_client.put(
vespa_url,
headers={"Content-Type": "application/json"},
json=update_dict,
)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}"
logger.error(error_message)
raise
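
The PUT above is a Vespa Document V1 partial update: a JSON body of {"fields": {<field>: {"assign": <value>}}} sent with ?create=true so a missing document is upserted rather than erroring. A standalone sketch with a hypothetical URL and field:

    import httpx

    # hypothetical document/v1 URL; the real one comes from DOCUMENT_ID_ENDPOINT
    vespa_url = (
        "http://localhost:8080/document/v1/default/danswer_chunk/docid/chunk-0"
        "?create=true"
    )
    update_dict = {"fields": {"boost": {"assign": 1.0}}}

    with httpx.Client(http2=False) as http_client:
        resp = http_client.put(
            vespa_url,
            headers={"Content-Type": "application/json"},
            json=update_dict,
        )
        resp.raise_for_status()
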
def update_single(
self,
@@ -579,24 +610,26 @@ class VespaIndex(DocumentIndex):
function will complete with no errors or exceptions.
Handle other exceptions if you wish to implement retry behavior
"""
start = time.monotonic()
timings: dict[str, Any] = {}
doc_chunk_count = 0
index_names = [self.index_name]
if self.secondary_index_name:
index_names.append(self.secondary_index_name)
index = 0
with self.httpx_client_context as httpx_client:
timings["get_vespa_http_client_enter"] = time.monotonic() - start
for (
index_name,
large_chunks_enabled,
) in self.index_to_large_chunks_enabled.items():
index += 1
phase_start = time.monotonic()
with get_vespa_http_client(http2=False) as http_client:
for index_name in index_names:
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
multipass_config = get_multipass_config(
db_session=db_session,
primary_index=index_name == self.index_name,
)
large_chunks_enabled = multipass_config.enable_large_chunks
enriched_doc_infos = VespaIndex.enrich_basic_chunk_info(
index_name=index_name,
http_client=http_client,
http_client=httpx_client,
document_id=doc_id,
previous_chunk_count=chunk_count,
new_chunk_count=0,
@@ -608,16 +641,42 @@ class VespaIndex(DocumentIndex):
large_chunks_enabled=large_chunks_enabled,
)
doc_chunk_count += len(doc_chunk_ids)
for doc_chunk_id in doc_chunk_ids:
self.update_single_chunk(
doc_chunk_id=doc_chunk_id,
index_name=index_name,
fields=fields,
doc_id=doc_id,
if len(doc_chunk_ids) == 0:
logger.warning(
f"len(doc_chunk_ids) == 0: "
f"tenant_id={tenant_id} "
f"enriched_doc_infos={enriched_doc_infos} "
f"large_chunks_enabled={large_chunks_enabled}"
)
logger.warning(
"Document chunk info:\n"
f"Enriched doc info: {enriched_doc_infos}\n"
f"Doc chunk ids: {doc_chunk_ids}\n"
f"Document ID: {doc_id}"
f"Tenant_id: {tenant_id}"
)
doc_chunk_count += len(doc_chunk_ids)
timings[f"prep_{index}"] = time.monotonic() - phase_start
chunk = 0
for doc_chunk_id in doc_chunk_ids:
chunk += 1
phase_start = time.monotonic()
self.update_single_chunk(
doc_chunk_id, index_name, fields, doc_id, httpx_client
)
timings[f"chunk_{index}_{chunk}"] = time.monotonic() - phase_start
phase_start = time.monotonic()
timings["get_vespa_http_client_exit"] = time.monotonic() - phase_start
elapsed = time.monotonic() - start
logger.debug(
f"num_chunks={doc_chunk_count} elapsed={elapsed:.2f} timings={timings}"
)
return doc_chunk_count
def delete_single(
@@ -637,19 +696,13 @@ class VespaIndex(DocumentIndex):
if self.secondary_index_name:
index_names.append(self.secondary_index_name)
with get_vespa_http_client(
http2=False
) as http_client, concurrent.futures.ThreadPoolExecutor(
with self.httpx_client_context as http_client, concurrent.futures.ThreadPoolExecutor(
max_workers=NUM_THREADS
) as executor:
for index_name in index_names:
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
multipass_config = get_multipass_config(
db_session=db_session,
primary_index=index_name == self.index_name,
)
large_chunks_enabled = multipass_config.enable_large_chunks
for (
index_name,
large_chunks_enabled,
) in self.index_to_large_chunks_enabled.items():
enriched_doc_infos = VespaIndex.enrich_basic_chunk_info(
index_name=index_name,
http_client=http_client,
@@ -808,9 +861,8 @@ class VespaIndex(DocumentIndex):
)
return enriched_doc_info
@classmethod
def delete_entries_by_tenant_id(
cls,
self,
*,
tenant_id: str,
index_name: str,
@@ -827,7 +879,7 @@ class VespaIndex(DocumentIndex):
)
# Step 1: Retrieve all document IDs with the given tenant_id
document_ids = cls._get_all_document_ids_by_tenant_id(tenant_id, index_name)
document_ids = self._get_all_document_ids_by_tenant_id(tenant_id, index_name)
if not document_ids:
logger.info(
@@ -841,11 +893,10 @@ class VespaIndex(DocumentIndex):
for doc_id in document_ids
]
cls._apply_deletes_batched(delete_requests)
self._apply_deletes_batched(delete_requests)
@classmethod
def _get_all_document_ids_by_tenant_id(
cls, tenant_id: str, index_name: str
self, tenant_id: str, index_name: str
) -> List[str]:
"""
Retrieves all document IDs with the specified tenant_id, handling pagination.
@@ -882,8 +933,8 @@ class VespaIndex(DocumentIndex):
f"Querying for document IDs with tenant_id: {tenant_id}, offset: {offset}"
)
with get_vespa_http_client(no_timeout=True) as http_client:
response = http_client.get(url, params=query_params)
with self.httpx_client_context as http_client:
response = http_client.get(url, params=query_params, timeout=None)
response.raise_for_status()
search_result = response.json()
@@ -904,9 +955,8 @@ class VespaIndex(DocumentIndex):
)
return document_ids
@classmethod
def _apply_deletes_batched(
cls,
self,
delete_requests: List["_VespaDeleteRequest"],
batch_size: int = BATCH_SIZE,
) -> None:
@@ -925,13 +975,14 @@ class VespaIndex(DocumentIndex):
response = http_client.delete(
delete_request.url,
headers={"Content-Type": "application/json"},
timeout=None,
)
response.raise_for_status()
logger.debug(f"Starting batch deletion for {len(delete_requests)} documents")
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
with get_vespa_http_client(no_timeout=True) as http_client:
with self.httpx_client_context as http_client:
for batch_start in range(0, len(delete_requests), batch_size):
batch = delete_requests[batch_start : batch_start + batch_size]
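
_apply_deletes_batched keeps its thread-pool fan-out but now borrows the shared client instead of opening one per call. Reduced to a runnable toy, the batching shape is (delete_one is a hypothetical stand-in for the HTTP delete):

    import concurrent.futures

    NUM_THREADS = 8
    BATCH_SIZE = 100

    def delete_one(url: str) -> str:
        return url  # stand-in for http_client.delete(url, timeout=None)

    urls = [f"doc/{i}" for i in range(250)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        for start in range(0, len(urls), BATCH_SIZE):
            batch = urls[start : start + BATCH_SIZE]
            # submit one batch, then block until it drains before the next
            list(executor.map(delete_one, batch))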

View File

@@ -1,21 +1,19 @@
import concurrent.futures
import json
import uuid
from abc import ABC
from abc import abstractmethod
from collections.abc import Callable
from datetime import datetime
from datetime import timezone
from http import HTTPStatus
import httpx
from retry import retry
from sqlalchemy.orm import Session
from onyx.configs.app_configs import ENABLE_MULTIPASS_INDEXING
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
get_experts_stores_representations,
)
from onyx.db.models import SearchSettings
from onyx.db.search_settings import get_current_search_settings
from onyx.db.search_settings import get_secondary_search_settings
from onyx.document_index.document_index_utils import get_uuid_from_chunk
from onyx.document_index.document_index_utils import get_uuid_from_chunk_info_old
from onyx.document_index.interfaces import MinimalDocumentIndexingInfo
@@ -50,10 +48,9 @@ from onyx.document_index.vespa_constants import TENANT_ID
from onyx.document_index.vespa_constants import TITLE
from onyx.document_index.vespa_constants import TITLE_EMBEDDING
from onyx.indexing.models import DocMetadataAwareIndexChunk
from onyx.indexing.models import EmbeddingProvider
from onyx.indexing.models import MultipassConfig
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -275,46 +272,63 @@ def check_for_final_chunk_existence(
index += 1
def should_use_multipass(search_settings: SearchSettings | None) -> bool:
"""
Determines whether multipass should be used based on the search settings
or the default config if settings are unavailable.
"""
if search_settings is not None:
return search_settings.multipass_indexing
return ENABLE_MULTIPASS_INDEXING
# def get_multipass_config(
# db_session: Session, primary_index: bool = True
# ) -> MultipassConfig:
# """
# Determines whether to enable multipass and large chunks by examining
# the current search settings and the embedder configuration.
# """
# search_settings = (
# get_current_search_settings(db_session)
# if primary_index
# else get_secondary_search_settings(db_session)
# )
# multipass = should_use_multipass(search_settings)
# if not search_settings:
# return MultipassConfig(multipass_indexing=False, enable_large_chunks=False)
# enable_large_chunks = can_use_large_chunks(multipass, search_settings)
# return MultipassConfig(
# multipass_indexing=multipass, enable_large_chunks=enable_large_chunks
# )
def can_use_large_chunks(multipass: bool, search_settings: SearchSettings) -> bool:
"""
Given multipass usage and the search settings, decides whether large chunks
are allowed based on model/provider constraints.
"""
# The only local models that support a larger context are from Nomic
# Cohere does not support larger contexts (they recommend not going above ~512 tokens)
return (
multipass
and search_settings.model_name.startswith("nomic-ai")
and search_settings.provider_type != EmbeddingProvider.COHERE
)
class BaseHTTPXClientContext(ABC):
"""Abstract base class for an HTTPX client context manager."""
@abstractmethod
def __enter__(self) -> httpx.Client:
pass
@abstractmethod
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
pass
def get_multipass_config(
db_session: Session, primary_index: bool = True
) -> MultipassConfig:
"""
Determines whether to enable multipass and large chunks by examining
the current search settings and the embedder configuration.
"""
search_settings = (
get_current_search_settings(db_session)
if primary_index
else get_secondary_search_settings(db_session)
)
multipass = should_use_multipass(search_settings)
if not search_settings:
return MultipassConfig(multipass_indexing=False, enable_large_chunks=False)
enable_large_chunks = can_use_large_chunks(multipass, search_settings)
return MultipassConfig(
multipass_indexing=multipass, enable_large_chunks=enable_large_chunks
)
class GlobalHTTPXClientContext(BaseHTTPXClientContext):
"""Context manager for a global HTTPX client that does not close it."""
def __init__(self, client: httpx.Client):
self._client = client
def __enter__(self) -> httpx.Client:
return self._client # Reuse the global client
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
pass # Do nothing; don't close the global client
class TemporaryHTTPXClientContext(BaseHTTPXClientContext):
"""Context manager for a temporary HTTPX client that closes it after use."""
def __init__(self, client_factory: Callable[[], httpx.Client]):
self._client_factory = client_factory
self._client: httpx.Client | None = None # Client will be created in __enter__
def __enter__(self) -> httpx.Client:
self._client = self._client_factory() # Create a new client
return self._client
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
if self._client:
self._client.close()
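
These two concrete contexts let every call site keep the same "with ... as http_client:" shape whether the client is pooled or throwaway; only the exit behavior differs. A quick check of that contract:

    import httpx

    from onyx.document_index.vespa.indexing_utils import (
        GlobalHTTPXClientContext,
        TemporaryHTTPXClientContext,
    )

    pooled = httpx.Client()
    with GlobalHTTPXClientContext(pooled) as client:
        pass                      # use the shared client
    assert not pooled.is_closed   # global client survives the with-block

    with TemporaryHTTPXClientContext(httpx.Client) as client:
        temp = client
    assert temp.is_closed         # temporary client is closed on exit
    pooled.close()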

View File

@@ -0,0 +1,57 @@
import threading
from typing import Any
import httpx
class HttpxPool:
"""Class to manage a global httpx Client instance"""
_clients: dict[str, httpx.Client] = {}
_lock: threading.Lock = threading.Lock()
# Default parameters for creation
DEFAULT_KWARGS = {
"http2": True,
"limits": lambda: httpx.Limits(),
}
def __init__(self) -> None:
pass
@classmethod
def _init_client(cls, **kwargs: Any) -> httpx.Client:
"""Private helper method to create and return an httpx.Client."""
merged_kwargs = {**cls.DEFAULT_KWARGS, **kwargs}
# the default "limits" entry is a factory; build the Limits object before
# handing the kwargs to httpx.Client
if callable(merged_kwargs.get("limits")):
merged_kwargs["limits"] = merged_kwargs["limits"]()
return httpx.Client(**merged_kwargs)
@classmethod
def init_client(cls, name: str, **kwargs: Any) -> None:
"""Allow the caller to init the client with extra params."""
with cls._lock:
if name not in cls._clients:
cls._clients[name] = cls._init_client(**kwargs)
@classmethod
def close_client(cls, name: str) -> None:
"""Allow the caller to close the client."""
with cls._lock:
client = cls._clients.pop(name, None)
if client:
client.close()
@classmethod
def close_all(cls) -> None:
"""Close all registered clients."""
with cls._lock:
for client in cls._clients.values():
client.close()
cls._clients.clear()
@classmethod
def get(cls, name: str) -> httpx.Client:
"""Gets the httpx.Client. Will init to default settings if not init'd."""
with cls._lock:
if name not in cls._clients:
cls._clients[name] = cls._init_client()
return cls._clients[name]
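
A hedged sketch of the intended lifecycle: init_client registers at most once per name, get falls back to default construction, and shutdown hooks call close_all (as on_worker_shutdown does earlier in this diff):

    import httpx

    from onyx.httpx.httpx_pool import HttpxPool

    HttpxPool.init_client(
        name="vespa",
        http2=False,
        limits=httpx.Limits(max_keepalive_connections=20),
    )
    HttpxPool.init_client(name="vespa", http2=True)  # no-op: already registered
    assert HttpxPool.get("vespa") is HttpxPool.get("vespa")  # one shared instance
    HttpxPool.close_all()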

View File

@@ -31,14 +31,15 @@ from onyx.db.document import upsert_documents
from onyx.db.document_set import fetch_document_sets_for_documents
from onyx.db.index_attempt import create_index_attempt_error
from onyx.db.models import Document as DBDocument
from onyx.db.search_settings import get_current_search_settings
from onyx.db.tag import create_or_add_document_tag
from onyx.db.tag import create_or_add_document_tag_list
from onyx.document_index.document_index_utils import (
get_multipass_config,
)
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.interfaces import DocumentMetadata
from onyx.document_index.interfaces import IndexBatchParams
from onyx.document_index.vespa.indexing_utils import (
get_multipass_config,
)
from onyx.indexing.chunker import Chunker
from onyx.indexing.embedder import IndexingEmbedder
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
@@ -358,8 +359,15 @@ def index_doc_batch(
)
logger.debug("Filtering Documents")
logger.debug("Document IDs before filtering")
logger.debug([doc.id for doc in document_batch])
filtered_documents = filter_fnc(document_batch)
logger.debug("Document IDs after filtering")
logger.debug([doc.id for doc in filtered_documents])
ctx = index_doc_batch_prepare(
documents=filtered_documents,
index_attempt_metadata=index_attempt_metadata,
@@ -481,6 +489,7 @@ def index_doc_batch(
update_docs_updated_at__no_commit(
ids_to_new_updated_at=ids_to_new_updated_at, db_session=db_session
)
logger.info("UPDATING DOCS")
update_docs_last_modified__no_commit(
document_ids=last_modified_ids, db_session=db_session
@@ -527,7 +536,8 @@ def build_indexing_pipeline(
callback: IndexingHeartbeatInterface | None = None,
) -> IndexingPipelineProtocol:
"""Builds a pipeline which takes in a list (batch) of docs and indexes them."""
multipass_config = get_multipass_config(db_session, primary_index=True)
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
chunker = chunker or Chunker(
tokenizer=embedder.embedding_model.tokenizer,

View File

@@ -55,9 +55,7 @@ class DocAwareChunk(BaseChunk):
def to_short_descriptor(self) -> str:
"""Used when logging the identity of a chunk"""
return (
f"Chunk ID: '{self.chunk_id}'; {self.source_document.to_short_descriptor()}"
)
return f"{self.source_document.to_short_descriptor()} Chunk ID: {self.chunk_id}"
class IndexChunk(DocAwareChunk):

View File

@@ -16,7 +16,7 @@ from onyx.context.search.preprocessing.access_filters import (
from onyx.db.document_set import get_document_sets_by_ids
from onyx.db.models import StarterMessageModel as StarterMessage
from onyx.db.models import User
from onyx.document_index.document_index_utils import get_both_index_names
from onyx.document_index.document_index_utils import get_both_index_properties
from onyx.document_index.factory import get_default_document_index
from onyx.llm.factory import get_default_llms
from onyx.prompts.starter_messages import format_persona_starter_message_prompt
@@ -34,8 +34,18 @@ def get_random_chunks_from_doc_sets(
"""
Retrieves random chunks from the specified document sets.
"""
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
document_index = get_default_document_index(curr_ind_name, sec_ind_name)
(
curr_ind_name,
sec_ind_name,
large_chunks,
secondary_large_chunks,
) = get_both_index_properties(db_session)
document_index = get_default_document_index(
curr_ind_name,
sec_ind_name,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=secondary_large_chunks,
)
acl_filters = build_access_filters_for_user(user, db_session)
filters = IndexFilters(document_set=doc_sets, access_control_list=acl_filters)

View File

@@ -3,6 +3,7 @@ import json
import os
from typing import cast
from sqlalchemy import update
from sqlalchemy.orm import Session
from onyx.access.models import default_public_access
@@ -23,7 +24,9 @@ from onyx.db.document import check_docs_exist
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.index_attempt import mock_successful_index_attempt
from onyx.db.models import Document as DbDocument
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import IndexBatchParams
from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout
@@ -59,6 +62,7 @@ def _create_indexable_chunks(
doc_updated_at=None,
primary_owners=[],
secondary_owners=[],
chunk_count=1,
)
if preprocessed_doc["chunk_ind"] == 0:
ids_to_documents[document.id] = document
@@ -155,8 +159,12 @@ def seed_initial_documents(
logger.info("Embedding model has been updated, skipping")
return
mp_config = get_multipass_config(search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name, secondary_index_name=None
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
# Create a connector so the user can delete it if they want
@@ -240,4 +248,12 @@ def seed_initial_documents(
db_session=db_session,
)
# Since we bypass the indexing flow, we need to manually update the chunk count
for doc in docs:
db_session.execute(
update(DbDocument)
.where(DbDocument.id == doc.id)
.values(chunk_count=doc.chunk_count)
)
kv_store.store(KV_DOCUMENTS_SEEDED_KEY, True)
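
The backfill above issues one UPDATE per document. Under SQLAlchemy 2.0 the same backfill could be a single bulk UPDATE by primary key; a hedged alternative sketch, reusing docs, db_session, and DbDocument from the snippet above:

    from sqlalchemy import update

    # one executemany round trip instead of a statement per document
    db_session.execute(
        update(DbDocument),
        [{"id": doc.id, "chunk_count": doc.chunk_count} for doc in docs],
    )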

View File

@@ -12,6 +12,7 @@ from onyx.context.search.preprocessing.access_filters import (
from onyx.db.engine import get_session
from onyx.db.models import User
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import VespaChunkRequest
from onyx.natural_language_processing.utils import get_tokenizer
@@ -32,9 +33,12 @@ def get_document_info(
db_session: Session = Depends(get_session),
) -> DocumentInfo:
search_settings = get_current_search_settings(db_session)
mp_config = get_multipass_config(search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name, secondary_index_name=None
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
user_acl_filters = build_access_filters_for_user(user, db_session)
@@ -79,9 +83,12 @@ def get_chunk_info(
db_session: Session = Depends(get_session),
) -> ChunkInfo:
search_settings = get_current_search_settings(db_session)
mp_config = get_multipass_config(search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name, secondary_index_name=None
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
user_acl_filters = build_access_filters_for_user(user, db_session)

View File

@@ -22,6 +22,7 @@ from onyx.db.search_settings import get_embedding_provider_from_provider_type
from onyx.db.search_settings import get_secondary_search_settings
from onyx.db.search_settings import update_current_search_settings
from onyx.db.search_settings import update_search_settings_status
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.file_processing.unstructured import delete_unstructured_api_key
from onyx.file_processing.unstructured import get_unstructured_api_key
@@ -97,9 +98,13 @@ def set_new_search_settings(
)
# Ensure Vespa has the new index immediately
mp_config_1 = get_multipass_config(search_settings)
mp_config_2 = get_multipass_config(new_search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name,
secondary_index_name=new_search_settings.index_name,
large_chunks_enabled=mp_config_1.enable_large_chunks,
secondary_large_chunks_enabled=mp_config_2.enable_large_chunks,
)
document_index.ensure_indices_exist(

View File

@@ -16,7 +16,7 @@ from onyx.db.engine import get_session
from onyx.db.models import User
from onyx.db.search_settings import get_current_search_settings
from onyx.db.search_settings import get_secondary_search_settings
from onyx.document_index.document_index_utils import get_both_index_names
from onyx.document_index.document_index_utils import get_both_index_properties
from onyx.document_index.factory import get_default_document_index
from onyx.indexing.embedder import DefaultIndexingEmbedder
from onyx.indexing.indexing_pipeline import build_indexing_pipeline
@@ -89,11 +89,24 @@ def upsert_ingestion_doc(
)
# Need to index for both the primary and secondary index if possible
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
(
curr_ind_name,
sec_ind_name,
large_chunks,
secondary_large_chunks,
) = get_both_index_properties(db_session)
curr_doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=None
primary_index_name=curr_ind_name,
secondary_index_name=None,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=None,
)
# curr_ind_name, sec_ind_name = get_both_index_names(db_session)
# curr_doc_index = get_default_document_index(
# primary_index_name=curr_ind_name, secondary_index_name=None
# )
search_settings = get_current_search_settings(db_session)
index_embedding_model = DefaultIndexingEmbedder.from_db_search_settings(
@@ -118,8 +131,12 @@ def upsert_ingestion_doc(
# If there's a secondary index being built, index the doc but don't use it for return here
if sec_ind_name:
# rkuo: i don't understand why we create the secondary index with the current index again
sec_doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=None
primary_index_name=curr_ind_name,
secondary_index_name=None,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=None,
)
sec_search_settings = get_secondary_search_settings(db_session)

View File

@@ -27,6 +27,7 @@ from onyx.db.engine import get_session
from onyx.db.models import User
from onyx.db.search_settings import get_current_search_settings
from onyx.db.tag import find_tags
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.vespa.index import VespaIndex
from onyx.server.query_and_chat.models import AdminSearchRequest
@@ -64,9 +65,14 @@ def admin_search(
tenant_id=tenant_id,
)
search_settings = get_current_search_settings(db_session)
mp_config = get_multipass_config(search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name, secondary_index_name=None
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
if not isinstance(document_index, VespaIndex):
raise HTTPException(
status_code=400,

View File

@@ -30,6 +30,7 @@ from onyx.db.search_settings import get_secondary_search_settings
from onyx.db.search_settings import update_current_search_settings
from onyx.db.search_settings import update_secondary_search_settings
from onyx.db.swap_index import check_index_swap
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.vespa.index import VespaIndex
@@ -71,7 +72,13 @@ def setup_onyx(
"""
check_index_swap(db_session=db_session)
search_settings = get_current_search_settings(db_session)
multipass_config_1 = get_multipass_config(search_settings)
secondary_large_chunks_enabled: bool | None = None
secondary_search_settings = get_secondary_search_settings(db_session)
if secondary_search_settings:
multipass_config_2 = get_multipass_config(secondary_search_settings)
secondary_large_chunks_enabled = multipass_config_2.enable_large_chunks
# Break bad state for thrashing indexes
if secondary_search_settings and DISABLE_INDEX_UPDATE_ON_SWAP:
@@ -123,9 +130,11 @@ def setup_onyx(
logger.notice("Verifying Document Index(s) is/are available.")
document_index = get_default_document_index(
primary_index_name=search_settings.index_name,
large_chunks_enabled=multipass_config_1.enable_large_chunks,
secondary_index_name=secondary_search_settings.index_name
if secondary_search_settings
else None,
secondary_large_chunks_enabled=secondary_large_chunks_enabled,
)
success = setup_vespa(

View File

@@ -38,7 +38,7 @@ from onyx.db.connector_credential_pair import (
from onyx.db.engine import get_session_context_manager
from onyx.document_index.factory import get_default_document_index
from onyx.file_store.file_store import get_default_file_store
from onyx.document_index.document_index_utils import get_both_index_names
from onyx.document_index.document_index_utils import get_both_index_properties
# pylint: enable=E402
# flake8: noqa: E402
@@ -191,9 +191,17 @@ def _delete_connector(cc_pair_id: int, db_session: Session) -> None:
)
try:
logger.notice("Deleting information from Vespa and Postgres")
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
(
curr_ind_name,
sec_ind_name,
large_chunks,
secondary_large_chunks,
) = get_both_index_properties(db_session)
document_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
primary_index_name=curr_ind_name,
secondary_index_name=sec_ind_name,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=secondary_large_chunks,
)
files_deleted_count = _unsafe_deletion(

View File

@@ -5,6 +5,8 @@ import sys
from sqlalchemy import text
from sqlalchemy.orm import Session
from onyx.document_index.document_index_utils import get_multipass_config
# makes it so `PYTHONPATH=.` is not required when running this script
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
@@ -54,8 +56,14 @@ def main() -> None:
# Setup Vespa index
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
index_name = search_settings.index_name
vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
vespa_index = VespaIndex(
index_name=index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
# Delete chunks from Vespa first
print("Deleting orphaned document chunks from Vespa")

View File

@@ -16,6 +16,7 @@ from onyx.configs.constants import DocumentSource
from onyx.connectors.models import Document
from onyx.db.engine import get_session_context_manager
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.vespa.index import VespaIndex
from onyx.indexing.indexing_pipeline import IndexBatchParams
from onyx.indexing.models import ChunkEmbedding
@@ -133,10 +134,16 @@ def seed_dummy_docs(
) -> None:
with get_session_context_manager() as db_session:
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
index_name = search_settings.index_name
embedding_dim = search_settings.model_dim
vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
vespa_index = VespaIndex(
index_name=index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
print(index_name)
all_chunks = []

View File

@@ -9,6 +9,7 @@ from onyx.configs.model_configs import DOC_EMBEDDING_DIM
from onyx.context.search.models import IndexFilters
from onyx.db.engine import get_session_context_manager
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.vespa.index import VespaIndex
from scripts.query_time_check.seed_dummy_docs import TOTAL_ACL_ENTRIES_PER_CATEGORY
from scripts.query_time_check.seed_dummy_docs import TOTAL_DOC_SETS
@@ -62,9 +63,15 @@ def test_hybrid_retrieval_times(
) -> None:
with get_session_context_manager() as db_session:
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
index_name = search_settings.index_name
vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
vespa_index = VespaIndex(
index_name=index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
# Generate random queries
queries = [f"Random Query {i}" for i in range(number_of_queries)]

View File

@@ -18,6 +18,7 @@ from onyx.db.engine import get_session_with_tenant
from onyx.db.engine import SYNC_DB_API
from onyx.db.search_settings import get_current_search_settings
from onyx.db.swap_index import check_index_swap
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.vespa.index import DOCUMENT_ID_ENDPOINT
from onyx.document_index.vespa.index import VespaIndex
from onyx.indexing.models import IndexingSetting
@@ -173,10 +174,16 @@ def reset_vespa() -> None:
check_index_swap(db_session)
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
index_name = search_settings.index_name
success = setup_vespa(
document_index=VespaIndex(index_name=index_name, secondary_index_name=None),
document_index=VespaIndex(
index_name=index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
),
index_setting=IndexingSetting.from_db_model(search_settings),
secondary_index_setting=None,
)
@@ -250,10 +257,16 @@ def reset_vespa_multitenant() -> None:
check_index_swap(db_session)
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
index_name = search_settings.index_name
success = setup_vespa(
document_index=VespaIndex(index_name=index_name, secondary_index_name=None),
document_index=VespaIndex(
index_name=index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
),
index_setting=IndexingSetting.from_db_model(search_settings),
secondary_index_setting=None,
)

View File

@@ -6,7 +6,7 @@ import pytest
from sqlalchemy.orm import Session
from onyx.db.engine import get_sqlalchemy_engine
from onyx.document_index.document_index_utils import get_both_index_names
from onyx.document_index.document_index_utils import get_both_index_properties
from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
@@ -19,7 +19,7 @@ def test_vespa_update() -> None:
doc_id = "test-vespa-update"
with Session(get_sqlalchemy_engine()) as db_session:
primary_index_name, _ = get_both_index_names(db_session)
primary_index_name, _, _, _ = get_both_index_properties(db_session)
endpoint = (
f"{DOCUMENT_ID_ENDPOINT.format(index_name=primary_index_name)}/{doc_id}"
)