mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-03-24 00:52:47 +00:00
Compare commits
12 Commits
bo/query_p
...
v0.19.0-cl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d8650699eb | ||
|
|
bdaa1c1b0c | ||
|
|
cba041275d | ||
|
|
4a318bac26 | ||
|
|
52849386bd | ||
|
|
f02e6f00a2 | ||
|
|
7f18afbdb3 | ||
|
|
6722e87fcf | ||
|
|
f87173f18d | ||
|
|
060a988d13 | ||
|
|
12dcf6a09b | ||
|
|
c1a4eea2d0 |
@@ -24,6 +24,7 @@ from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX
|
||||
from onyx.configs.constants import OnyxRedisLocks
|
||||
from onyx.db.engine import get_sqlalchemy_engine
|
||||
from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout
|
||||
from onyx.httpx.httpx_pool import HttpxPool
|
||||
from onyx.redis.redis_connector import RedisConnector
|
||||
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
|
||||
from onyx.redis.redis_connector_delete import RedisConnectorDelete
|
||||
@@ -316,6 +317,8 @@ def on_worker_ready(sender: Any, **kwargs: Any) -> None:
|
||||
|
||||
|
||||
def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
|
||||
HttpxPool.close_all()
|
||||
|
||||
if not celery_is_worker_primary(sender):
|
||||
return
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ from celery.signals import worker_ready
|
||||
from celery.signals import worker_shutdown
|
||||
|
||||
import onyx.background.celery.apps.app_base as app_base
|
||||
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
|
||||
from onyx.configs.constants import POSTGRES_CELERY_WORKER_LIGHT_APP_NAME
|
||||
from onyx.db.engine import SqlEngine
|
||||
from onyx.utils.logger import setup_logger
|
||||
@@ -54,12 +55,16 @@ def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None:
|
||||
|
||||
@worker_init.connect
|
||||
def on_worker_init(sender: Worker, **kwargs: Any) -> None:
|
||||
EXTRA_CONCURRENCY = 8 # small extra fudge factor for connection limits
|
||||
|
||||
logger.info("worker_init signal received.")
|
||||
|
||||
logger.info(f"Concurrency: {sender.concurrency}") # type: ignore
|
||||
|
||||
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)
|
||||
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) # type: ignore
|
||||
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=EXTRA_CONCURRENCY) # type: ignore
|
||||
|
||||
httpx_init_vespa_pool(sender.concurrency + EXTRA_CONCURRENCY) # type: ignore
|
||||
|
||||
app_base.wait_for_redis(sender, **kwargs)
|
||||
app_base.wait_for_db(sender, **kwargs)
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
|
||||
import httpx
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.app_configs import MANAGED_VESPA
|
||||
from onyx.configs.app_configs import MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE
|
||||
from onyx.configs.app_configs import VESPA_CLOUD_CERT_PATH
|
||||
from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH
|
||||
from onyx.configs.app_configs import VESPA_REQUEST_TIMEOUT
|
||||
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import (
|
||||
rate_limit_builder,
|
||||
)
|
||||
@@ -17,6 +23,7 @@ from onyx.db.connector_credential_pair import get_connector_credential_pair
|
||||
from onyx.db.enums import ConnectorCredentialPairStatus
|
||||
from onyx.db.enums import TaskStatus
|
||||
from onyx.db.models import TaskQueueState
|
||||
from onyx.httpx.httpx_pool import HttpxPool
|
||||
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
|
||||
from onyx.redis.redis_connector import RedisConnector
|
||||
from onyx.server.documents.models import DeletionAttemptSnapshot
|
||||
@@ -154,3 +161,22 @@ def celery_is_worker_primary(worker: Any) -> bool:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def httpx_init_vespa_pool(max_keepalive_connections: int) -> None:
|
||||
httpx_cert = None
|
||||
httpx_verify = False
|
||||
if MANAGED_VESPA:
|
||||
httpx_cert = cast(
|
||||
tuple[str, str], (VESPA_CLOUD_CERT_PATH, VESPA_CLOUD_KEY_PATH)
|
||||
)
|
||||
httpx_verify = True
|
||||
|
||||
HttpxPool.init_client(
|
||||
name="vespa",
|
||||
cert=httpx_cert,
|
||||
verify=httpx_verify,
|
||||
timeout=VESPA_REQUEST_TIMEOUT,
|
||||
http2=False,
|
||||
limits=httpx.Limits(max_keepalive_connections=max_keepalive_connections),
|
||||
)
|
||||
|
||||
@@ -15,6 +15,7 @@ from redis import Redis
|
||||
from redis.lock import Lock as RedisLock
|
||||
|
||||
from onyx.background.celery.apps.app_base import task_logger
|
||||
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
|
||||
from onyx.background.celery.tasks.indexing.utils import _should_index
|
||||
from onyx.background.celery.tasks.indexing.utils import get_unfenced_index_attempt_ids
|
||||
from onyx.background.celery.tasks.indexing.utils import IndexingCallback
|
||||
@@ -303,6 +304,8 @@ def connector_indexing_task(
|
||||
attempt_found = False
|
||||
n_final_progress: int | None = None
|
||||
|
||||
httpx_init_vespa_pool(20) # documented default
|
||||
|
||||
redis_connector = RedisConnector(tenant_id, cc_pair_id)
|
||||
redis_connector_index = redis_connector.new_index(search_settings_id)
|
||||
|
||||
|
||||
@@ -27,9 +27,10 @@ from onyx.db.document import mark_document_as_synced
|
||||
from onyx.db.document_set import fetch_document_sets_for_document
|
||||
from onyx.db.engine import get_all_tenant_ids
|
||||
from onyx.db.engine import get_session_with_tenant
|
||||
from onyx.document_index.document_index_utils import get_both_index_names
|
||||
from onyx.document_index.document_index_utils import get_both_index_properties
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.document_index.interfaces import VespaDocumentFields
|
||||
from onyx.httpx.httpx_pool import HttpxPool
|
||||
from onyx.redis.redis_pool import get_redis_client
|
||||
from onyx.redis.redis_pool import redis_lock_dump
|
||||
from onyx.server.documents.models import ConnectorCredentialPairIdentifier
|
||||
@@ -79,9 +80,18 @@ def document_by_cc_pair_cleanup_task(
|
||||
action = "skip"
|
||||
chunks_affected = 0
|
||||
|
||||
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
|
||||
(
|
||||
curr_ind_name,
|
||||
sec_ind_name,
|
||||
large_chunks,
|
||||
secondary_large_chunks,
|
||||
) = get_both_index_properties(db_session)
|
||||
doc_index = get_default_document_index(
|
||||
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
|
||||
primary_index_name=curr_ind_name,
|
||||
secondary_index_name=sec_ind_name,
|
||||
large_chunks_enabled=large_chunks,
|
||||
secondary_large_chunks_enabled=secondary_large_chunks,
|
||||
httpx_client=HttpxPool.get("vespa"),
|
||||
)
|
||||
|
||||
retry_index = RetryDocumentIndex(doc_index)
|
||||
|
||||
@@ -64,9 +64,10 @@ from onyx.db.models import UserGroup
|
||||
from onyx.db.sync_record import cleanup_sync_records
|
||||
from onyx.db.sync_record import insert_sync_record
|
||||
from onyx.db.sync_record import update_sync_record_status
|
||||
from onyx.document_index.document_index_utils import get_both_index_names
|
||||
from onyx.document_index.document_index_utils import get_both_index_properties
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.document_index.interfaces import VespaDocumentFields
|
||||
from onyx.httpx.httpx_pool import HttpxPool
|
||||
from onyx.redis.redis_connector import RedisConnector
|
||||
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
|
||||
from onyx.redis.redis_connector_delete import RedisConnectorDelete
|
||||
@@ -1092,30 +1093,51 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
|
||||
def vespa_metadata_sync_task(
|
||||
self: Task, document_id: str, tenant_id: str | None
|
||||
) -> bool:
|
||||
timings: dict[str, Any] = {}
|
||||
|
||||
start = time.monotonic()
|
||||
timings["start"] = start
|
||||
|
||||
try:
|
||||
with get_session_with_tenant(tenant_id) as db_session:
|
||||
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
|
||||
phase_start = time.monotonic()
|
||||
(
|
||||
curr_ind_name,
|
||||
sec_ind_name,
|
||||
large_chunks,
|
||||
secondary_large_chunks,
|
||||
) = get_both_index_properties(db_session)
|
||||
doc_index = get_default_document_index(
|
||||
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
|
||||
primary_index_name=curr_ind_name,
|
||||
secondary_index_name=sec_ind_name,
|
||||
large_chunks_enabled=large_chunks,
|
||||
secondary_large_chunks_enabled=secondary_large_chunks,
|
||||
httpx_client=HttpxPool.get("vespa"),
|
||||
)
|
||||
timings["get_index"] = time.monotonic() - phase_start
|
||||
|
||||
phase_start = time.monotonic()
|
||||
retry_index = RetryDocumentIndex(doc_index)
|
||||
|
||||
doc = get_document(document_id, db_session)
|
||||
if not doc:
|
||||
return False
|
||||
timings["get_document"] = time.monotonic() - phase_start
|
||||
|
||||
# document set sync
|
||||
phase_start = time.monotonic()
|
||||
doc_sets = fetch_document_sets_for_document(document_id, db_session)
|
||||
update_doc_sets: set[str] = set(doc_sets)
|
||||
timings["fetch_document_sets_for_document"] = time.monotonic() - phase_start
|
||||
|
||||
# User group sync
|
||||
phase_start = time.monotonic()
|
||||
doc_access = get_access_for_document(
|
||||
document_id=document_id, db_session=db_session
|
||||
)
|
||||
timings["get_access_for_document"] = time.monotonic() - phase_start
|
||||
|
||||
phase_start = time.monotonic()
|
||||
fields = VespaDocumentFields(
|
||||
document_sets=update_doc_sets,
|
||||
access=doc_access,
|
||||
@@ -1130,10 +1152,13 @@ def vespa_metadata_sync_task(
|
||||
chunk_count=doc.chunk_count,
|
||||
fields=fields,
|
||||
)
|
||||
timings["index.update_single"] = time.monotonic() - phase_start
|
||||
|
||||
# update db last. Worst case = we crash right before this and
|
||||
# the sync might repeat again later
|
||||
phase_start = time.monotonic()
|
||||
mark_document_as_synced(document_id, db_session)
|
||||
timings["mark_document_as_synced"] = time.monotonic() - phase_start
|
||||
|
||||
# this code checks for and removes a per document sync key that is
|
||||
# used to block out the same doc from continualy resyncing
|
||||
@@ -1149,7 +1174,8 @@ def vespa_metadata_sync_task(
|
||||
f"doc={document_id} "
|
||||
f"action=sync "
|
||||
f"chunks={chunks_affected} "
|
||||
f"elapsed={elapsed:.2f}"
|
||||
f"elapsed={elapsed:.2f} "
|
||||
f"debug_timings={timings}"
|
||||
)
|
||||
except SoftTimeLimitExceeded:
|
||||
task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")
|
||||
|
||||
@@ -34,7 +34,9 @@ from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.db.models import IndexAttempt
|
||||
from onyx.db.models import IndexingStatus
|
||||
from onyx.db.models import IndexModelStatus
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.httpx.httpx_pool import HttpxPool
|
||||
from onyx.indexing.embedder import DefaultIndexingEmbedder
|
||||
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
|
||||
from onyx.indexing.indexing_pipeline import build_indexing_pipeline
|
||||
@@ -149,6 +151,7 @@ class RunIndexingContext(BaseModel):
|
||||
from_beginning: bool
|
||||
is_primary: bool
|
||||
search_settings_status: IndexModelStatus
|
||||
large_chunks_enabled: bool
|
||||
|
||||
|
||||
def _run_indexing(
|
||||
@@ -179,6 +182,8 @@ def _run_indexing(
|
||||
"Search settings must be set for indexing. This should not be possible."
|
||||
)
|
||||
|
||||
multipass_config = get_multipass_config(index_attempt_start.search_settings)
|
||||
|
||||
# search_settings = index_attempt_start.search_settings
|
||||
db_connector = index_attempt_start.connector_credential_pair.connector
|
||||
db_credential = index_attempt_start.connector_credential_pair.credential
|
||||
@@ -200,6 +205,7 @@ def _run_indexing(
|
||||
index_attempt_start.search_settings.status == IndexModelStatus.PRESENT
|
||||
),
|
||||
search_settings_status=index_attempt_start.search_settings.status,
|
||||
large_chunks_enabled=multipass_config.enable_large_chunks,
|
||||
)
|
||||
|
||||
last_successful_index_time = (
|
||||
@@ -221,7 +227,11 @@ def _run_indexing(
|
||||
|
||||
# Indexing is only done into one index at a time
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=ctx.index_name, secondary_index_name=None
|
||||
primary_index_name=ctx.index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=ctx.large_chunks_enabled,
|
||||
secondary_large_chunks_enabled=None,
|
||||
httpx_client=HttpxPool.get("vespa"),
|
||||
)
|
||||
|
||||
indexing_pipeline = build_indexing_pipeline(
|
||||
|
||||
@@ -63,6 +63,7 @@ from onyx.db.models import ToolCall
|
||||
from onyx.db.models import User
|
||||
from onyx.db.persona import get_persona_by_id
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.file_store.models import ChatFileType
|
||||
from onyx.file_store.models import FileDescriptor
|
||||
@@ -425,8 +426,12 @@ def stream_chat_message_objects(
|
||||
)
|
||||
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
mp_config = get_multipass_config(search_settings)
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=search_settings.index_name, secondary_index_name=None
|
||||
primary_index_name=search_settings.index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=mp_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
|
||||
# Every chat Session begins with an empty root message
|
||||
|
||||
@@ -29,6 +29,7 @@ from onyx.context.search.utils import inference_section_from_chunks
|
||||
from onyx.context.search.utils import relevant_sections_to_indices
|
||||
from onyx.db.models import User
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.document_index.interfaces import VespaChunkRequest
|
||||
from onyx.llm.interfaces import LLM
|
||||
@@ -67,9 +68,12 @@ class SearchPipeline:
|
||||
self.rerank_metrics_callback = rerank_metrics_callback
|
||||
|
||||
self.search_settings = get_current_search_settings(db_session)
|
||||
mp_config = get_multipass_config(self.search_settings)
|
||||
self.document_index = get_default_document_index(
|
||||
primary_index_name=self.search_settings.index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=mp_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
self.prompt_config: PromptConfig | None = prompt_config
|
||||
|
||||
|
||||
@@ -452,10 +452,16 @@ def update_docs_chunk_count__no_commit(
|
||||
doc_id_to_chunk_count: dict[str, int],
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
logger.debug("Updating chunk count for these documents")
|
||||
logger.debug(document_ids)
|
||||
logger.debug(doc_id_to_chunk_count)
|
||||
documents_to_update = (
|
||||
db_session.query(DbDocument).filter(DbDocument.id.in_(document_ids)).all()
|
||||
)
|
||||
for doc in documents_to_update:
|
||||
logger.debug(
|
||||
f"Updating chunk count for {doc.id} to {doc_id_to_chunk_count[doc.id]}"
|
||||
)
|
||||
doc.chunk_count = doc_id_to_chunk_count[doc.id]
|
||||
|
||||
|
||||
|
||||
@@ -4,24 +4,76 @@ from uuid import UUID
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.app_configs import ENABLE_MULTIPASS_INDEXING
|
||||
from onyx.db.models import SearchSettings
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.db.search_settings import get_secondary_search_settings
|
||||
from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
|
||||
from onyx.indexing.models import DocMetadataAwareIndexChunk
|
||||
from onyx.indexing.models import EmbeddingProvider
|
||||
from onyx.indexing.models import MultipassConfig
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
|
||||
DEFAULT_BATCH_SIZE = 30
|
||||
DEFAULT_INDEX_NAME = "danswer_chunk"
|
||||
|
||||
|
||||
def get_both_index_names(db_session: Session) -> tuple[str, str | None]:
|
||||
def should_use_multipass(search_settings: SearchSettings | None) -> bool:
|
||||
"""
|
||||
Determines whether multipass should be used based on the search settings
|
||||
or the default config if settings are unavailable.
|
||||
"""
|
||||
if search_settings is not None:
|
||||
return search_settings.multipass_indexing
|
||||
return ENABLE_MULTIPASS_INDEXING
|
||||
|
||||
|
||||
def can_use_large_chunks(multipass: bool, search_settings: SearchSettings) -> bool:
|
||||
"""
|
||||
Given multipass usage and an embedder, decides whether large chunks are allowed
|
||||
based on model/provider constraints.
|
||||
"""
|
||||
# Only local models that support a larger context are from Nomic
|
||||
# Cohere does not support larger contexts (they recommend not going above ~512 tokens)
|
||||
return (
|
||||
multipass
|
||||
and search_settings.model_name.startswith("nomic-ai")
|
||||
and search_settings.provider_type != EmbeddingProvider.COHERE
|
||||
)
|
||||
|
||||
|
||||
def get_multipass_config(search_settings: SearchSettings) -> MultipassConfig:
|
||||
"""
|
||||
Determines whether to enable multipass and large chunks by examining
|
||||
the current search settings and the embedder configuration.
|
||||
"""
|
||||
if not search_settings:
|
||||
return MultipassConfig(multipass_indexing=False, enable_large_chunks=False)
|
||||
|
||||
multipass = should_use_multipass(search_settings)
|
||||
enable_large_chunks = can_use_large_chunks(multipass, search_settings)
|
||||
return MultipassConfig(
|
||||
multipass_indexing=multipass, enable_large_chunks=enable_large_chunks
|
||||
)
|
||||
|
||||
|
||||
def get_both_index_properties(
|
||||
db_session: Session,
|
||||
) -> tuple[str, str | None, bool, bool | None]:
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
config_1 = get_multipass_config(search_settings)
|
||||
|
||||
search_settings_new = get_secondary_search_settings(db_session)
|
||||
if not search_settings_new:
|
||||
return search_settings.index_name, None
|
||||
return search_settings.index_name, None, config_1.enable_large_chunks, None
|
||||
|
||||
return search_settings.index_name, search_settings_new.index_name
|
||||
config_2 = get_multipass_config(search_settings)
|
||||
return (
|
||||
search_settings.index_name,
|
||||
search_settings_new.index_name,
|
||||
config_1.enable_large_chunks,
|
||||
config_2.enable_large_chunks,
|
||||
)
|
||||
|
||||
|
||||
def translate_boost_count_to_multiplier(boost: int) -> float:
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import httpx
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.interfaces import DocumentIndex
|
||||
from onyx.document_index.vespa.index import VespaIndex
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
@@ -9,15 +11,22 @@ from shared_configs.configs import MULTI_TENANT
|
||||
def get_default_document_index(
|
||||
primary_index_name: str,
|
||||
secondary_index_name: str | None,
|
||||
large_chunks_enabled: bool,
|
||||
secondary_large_chunks_enabled: bool | None,
|
||||
httpx_client: httpx.Client | None = None,
|
||||
) -> DocumentIndex:
|
||||
"""Primary index is the index that is used for querying/updating etc.
|
||||
Secondary index is for when both the currently used index and the upcoming
|
||||
index both need to be updated, updates are applied to both indices"""
|
||||
|
||||
# Currently only supporting Vespa
|
||||
return VespaIndex(
|
||||
index_name=primary_index_name,
|
||||
secondary_index_name=secondary_index_name,
|
||||
large_chunks_enabled=large_chunks_enabled,
|
||||
secondary_large_chunks_enabled=secondary_large_chunks_enabled,
|
||||
multitenant=MULTI_TENANT,
|
||||
httpx_client=httpx_client,
|
||||
)
|
||||
|
||||
|
||||
@@ -26,7 +35,10 @@ def get_current_primary_default_document_index(db_session: Session) -> DocumentI
|
||||
TODO: Use redis to cache this or something
|
||||
"""
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
multipass_config = get_multipass_config(search_settings)
|
||||
return get_default_document_index(
|
||||
primary_index_name=search_settings.index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=multipass_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
|
||||
@@ -231,21 +231,21 @@ def _get_chunks_via_visit_api(
|
||||
return document_chunks
|
||||
|
||||
|
||||
@retry(tries=10, delay=1, backoff=2)
|
||||
def get_all_vespa_ids_for_document_id(
|
||||
document_id: str,
|
||||
index_name: str,
|
||||
filters: IndexFilters | None = None,
|
||||
get_large_chunks: bool = False,
|
||||
) -> list[str]:
|
||||
document_chunks = _get_chunks_via_visit_api(
|
||||
chunk_request=VespaChunkRequest(document_id=document_id),
|
||||
index_name=index_name,
|
||||
filters=filters or IndexFilters(access_control_list=None),
|
||||
field_names=[DOCUMENT_ID],
|
||||
get_large_chunks=get_large_chunks,
|
||||
)
|
||||
return [chunk["id"].split("::", 1)[-1] for chunk in document_chunks]
|
||||
# @retry(tries=10, delay=1, backoff=2)
|
||||
# def get_all_vespa_ids_for_document_id(
|
||||
# document_id: str,
|
||||
# index_name: str,
|
||||
# filters: IndexFilters | None = None,
|
||||
# get_large_chunks: bool = False,
|
||||
# ) -> list[str]:
|
||||
# document_chunks = _get_chunks_via_visit_api(
|
||||
# chunk_request=VespaChunkRequest(document_id=document_id),
|
||||
# index_name=index_name,
|
||||
# filters=filters or IndexFilters(access_control_list=None),
|
||||
# field_names=[DOCUMENT_ID],
|
||||
# get_large_chunks=get_large_chunks,
|
||||
# )
|
||||
# return [chunk["id"].split("::", 1)[-1] for chunk in document_chunks]
|
||||
|
||||
|
||||
def parallel_visit_api_retrieval(
|
||||
|
||||
@@ -10,6 +10,7 @@ import zipfile
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
from typing import BinaryIO
|
||||
from typing import cast
|
||||
from typing import List
|
||||
@@ -25,7 +26,6 @@ from onyx.configs.chat_configs import VESPA_SEARCHER_THREADS
|
||||
from onyx.configs.constants import KV_REINDEX_KEY
|
||||
from onyx.context.search.models import IndexFilters
|
||||
from onyx.context.search.models import InferenceChunkUncleaned
|
||||
from onyx.db.engine import get_session_with_tenant
|
||||
from onyx.document_index.document_index_utils import get_document_chunk_ids
|
||||
from onyx.document_index.interfaces import DocumentIndex
|
||||
from onyx.document_index.interfaces import DocumentInsertionRecord
|
||||
@@ -41,12 +41,12 @@ from onyx.document_index.vespa.chunk_retrieval import (
|
||||
)
|
||||
from onyx.document_index.vespa.chunk_retrieval import query_vespa
|
||||
from onyx.document_index.vespa.deletion import delete_vespa_chunks
|
||||
from onyx.document_index.vespa.indexing_utils import BaseHTTPXClientContext
|
||||
from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks
|
||||
from onyx.document_index.vespa.indexing_utils import check_for_final_chunk_existence
|
||||
from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy
|
||||
from onyx.document_index.vespa.indexing_utils import (
|
||||
get_multipass_config,
|
||||
)
|
||||
from onyx.document_index.vespa.indexing_utils import GlobalHTTPXClientContext
|
||||
from onyx.document_index.vespa.indexing_utils import TemporaryHTTPXClientContext
|
||||
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
|
||||
from onyx.document_index.vespa.shared_utils.utils import (
|
||||
replace_invalid_doc_id_characters,
|
||||
@@ -132,12 +132,34 @@ class VespaIndex(DocumentIndex):
|
||||
self,
|
||||
index_name: str,
|
||||
secondary_index_name: str | None,
|
||||
large_chunks_enabled: bool,
|
||||
secondary_large_chunks_enabled: bool | None,
|
||||
multitenant: bool = False,
|
||||
httpx_client: httpx.Client | None = None,
|
||||
) -> None:
|
||||
self.index_name = index_name
|
||||
self.secondary_index_name = secondary_index_name
|
||||
|
||||
self.large_chunks_enabled = large_chunks_enabled
|
||||
self.secondary_large_chunks_enabled = secondary_large_chunks_enabled
|
||||
|
||||
self.multitenant = multitenant
|
||||
self.http_client = get_vespa_http_client()
|
||||
|
||||
self.httpx_client_context: BaseHTTPXClientContext
|
||||
|
||||
if httpx_client:
|
||||
self.httpx_client_context = GlobalHTTPXClientContext(httpx_client)
|
||||
else:
|
||||
self.httpx_client_context = TemporaryHTTPXClientContext(
|
||||
get_vespa_http_client
|
||||
)
|
||||
|
||||
self.index_to_large_chunks_enabled: dict[str, bool] = {}
|
||||
self.index_to_large_chunks_enabled[index_name] = large_chunks_enabled
|
||||
if secondary_index_name and secondary_large_chunks_enabled:
|
||||
self.index_to_large_chunks_enabled[
|
||||
secondary_index_name
|
||||
] = secondary_large_chunks_enabled
|
||||
|
||||
def ensure_indices_exist(
|
||||
self,
|
||||
@@ -331,7 +353,7 @@ class VespaIndex(DocumentIndex):
|
||||
# indexing / updates / deletes since we have to make a large volume of requests.
|
||||
with (
|
||||
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
|
||||
get_vespa_http_client() as http_client,
|
||||
self.httpx_client_context as http_client,
|
||||
):
|
||||
# We require the start and end index for each document in order to
|
||||
# know precisely which chunks to delete. This information exists for
|
||||
@@ -349,6 +371,12 @@ class VespaIndex(DocumentIndex):
|
||||
for doc_id in doc_id_to_new_chunk_cnt.keys()
|
||||
]
|
||||
|
||||
logger.debug("Indexing these doc IDs / counts")
|
||||
logger.debug(doc_id_to_new_chunk_cnt)
|
||||
|
||||
logger.debug("Enriched docs are as follows")
|
||||
logger.debug(enriched_doc_infos)
|
||||
|
||||
for cleaned_doc_info in enriched_doc_infos:
|
||||
# If the document has previously indexed chunks, we know it previously existed
|
||||
if cleaned_doc_info.chunk_end_index:
|
||||
@@ -390,9 +418,11 @@ class VespaIndex(DocumentIndex):
|
||||
for doc_id in all_doc_ids
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
@classmethod
|
||||
def _apply_updates_batched(
|
||||
cls,
|
||||
updates: list[_VespaUpdateRequest],
|
||||
httpx_client: httpx.Client,
|
||||
batch_size: int = BATCH_SIZE,
|
||||
) -> None:
|
||||
"""Runs a batch of updates in parallel via the ThreadPoolExecutor."""
|
||||
@@ -414,7 +444,7 @@ class VespaIndex(DocumentIndex):
|
||||
|
||||
with (
|
||||
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
|
||||
get_vespa_http_client() as http_client,
|
||||
httpx_client as http_client,
|
||||
):
|
||||
for update_batch in batch_generator(updates, batch_size):
|
||||
future_to_document_id = {
|
||||
@@ -455,7 +485,7 @@ class VespaIndex(DocumentIndex):
|
||||
index_names.append(self.secondary_index_name)
|
||||
|
||||
chunk_id_start_time = time.monotonic()
|
||||
with get_vespa_http_client() as http_client:
|
||||
with self.httpx_client_context as http_client:
|
||||
for update_request in update_requests:
|
||||
for doc_info in update_request.minimal_document_indexing_info:
|
||||
for index_name in index_names:
|
||||
@@ -511,7 +541,8 @@ class VespaIndex(DocumentIndex):
|
||||
)
|
||||
)
|
||||
|
||||
self._apply_updates_batched(processed_updates_requests)
|
||||
with self.httpx_client_context as httpx_client:
|
||||
self._apply_updates_batched(processed_updates_requests, httpx_client)
|
||||
logger.debug(
|
||||
"Finished updating Vespa documents in %.2f seconds",
|
||||
time.monotonic() - update_start,
|
||||
@@ -523,6 +554,7 @@ class VespaIndex(DocumentIndex):
|
||||
index_name: str,
|
||||
fields: VespaDocumentFields,
|
||||
doc_id: str,
|
||||
http_client: httpx.Client,
|
||||
) -> None:
|
||||
"""
|
||||
Update a single "chunk" (document) in Vespa using its chunk ID.
|
||||
@@ -554,18 +586,17 @@ class VespaIndex(DocumentIndex):
|
||||
|
||||
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}?create=true"
|
||||
|
||||
with get_vespa_http_client(http2=False) as http_client:
|
||||
try:
|
||||
resp = http_client.put(
|
||||
vespa_url,
|
||||
headers={"Content-Type": "application/json"},
|
||||
json=update_dict,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
except httpx.HTTPStatusError as e:
|
||||
error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}"
|
||||
logger.error(error_message)
|
||||
raise
|
||||
try:
|
||||
resp = http_client.put(
|
||||
vespa_url,
|
||||
headers={"Content-Type": "application/json"},
|
||||
json=update_dict,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
except httpx.HTTPStatusError as e:
|
||||
error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}"
|
||||
logger.error(error_message)
|
||||
raise
|
||||
|
||||
def update_single(
|
||||
self,
|
||||
@@ -579,24 +610,26 @@ class VespaIndex(DocumentIndex):
|
||||
function will complete with no errors or exceptions.
|
||||
Handle other exceptions if you wish to implement retry behavior
|
||||
"""
|
||||
start = time.monotonic()
|
||||
|
||||
timings: dict[str, Any] = {}
|
||||
|
||||
doc_chunk_count = 0
|
||||
|
||||
index_names = [self.index_name]
|
||||
if self.secondary_index_name:
|
||||
index_names.append(self.secondary_index_name)
|
||||
index = 0
|
||||
with self.httpx_client_context as httpx_client:
|
||||
timings["get_vespa_http_client_enter"] = time.monotonic() - start
|
||||
for (
|
||||
index_name,
|
||||
large_chunks_enabled,
|
||||
) in self.index_to_large_chunks_enabled.items():
|
||||
index += 1
|
||||
|
||||
phase_start = time.monotonic()
|
||||
|
||||
with get_vespa_http_client(http2=False) as http_client:
|
||||
for index_name in index_names:
|
||||
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
|
||||
multipass_config = get_multipass_config(
|
||||
db_session=db_session,
|
||||
primary_index=index_name == self.index_name,
|
||||
)
|
||||
large_chunks_enabled = multipass_config.enable_large_chunks
|
||||
enriched_doc_infos = VespaIndex.enrich_basic_chunk_info(
|
||||
index_name=index_name,
|
||||
http_client=http_client,
|
||||
http_client=httpx_client,
|
||||
document_id=doc_id,
|
||||
previous_chunk_count=chunk_count,
|
||||
new_chunk_count=0,
|
||||
@@ -608,16 +641,42 @@ class VespaIndex(DocumentIndex):
|
||||
large_chunks_enabled=large_chunks_enabled,
|
||||
)
|
||||
|
||||
doc_chunk_count += len(doc_chunk_ids)
|
||||
|
||||
for doc_chunk_id in doc_chunk_ids:
|
||||
self.update_single_chunk(
|
||||
doc_chunk_id=doc_chunk_id,
|
||||
index_name=index_name,
|
||||
fields=fields,
|
||||
doc_id=doc_id,
|
||||
if len(doc_chunk_ids) == 0:
|
||||
logger.warning(
|
||||
f"len(doc_chunk_ids) == 0: "
|
||||
f"tenant_id={tenant_id} "
|
||||
f"enriched_doc_infos={enriched_doc_infos} "
|
||||
f"large_chunks_enabled={large_chunks_enabled}"
|
||||
)
|
||||
logger.warning(
|
||||
"Document chunk info:\n"
|
||||
f"Enriched doc info: {enriched_doc_infos}\n"
|
||||
f"Doc chunk ids: {doc_chunk_ids}\n"
|
||||
f"Document ID: {doc_id}"
|
||||
f"Tenant_id: {tenant_id}"
|
||||
)
|
||||
|
||||
doc_chunk_count += len(doc_chunk_ids)
|
||||
timings[f"prep_{index}"] = time.monotonic() - phase_start
|
||||
|
||||
chunk = 0
|
||||
for doc_chunk_id in doc_chunk_ids:
|
||||
chunk += 1
|
||||
|
||||
phase_start = time.monotonic()
|
||||
self.update_single_chunk(
|
||||
doc_chunk_id, index_name, fields, doc_id, httpx_client
|
||||
)
|
||||
timings[f"chunk_{index}_{chunk}"] = time.monotonic() - phase_start
|
||||
|
||||
phase_start = time.monotonic()
|
||||
|
||||
timings["get_vespa_http_client_exit"] = time.monotonic() - phase_start
|
||||
|
||||
elapsed = time.monotonic() - start
|
||||
logger.debug(
|
||||
f"num_chunks={doc_chunk_count} elapsed={elapsed:.2f} timings={timings}"
|
||||
)
|
||||
return doc_chunk_count
|
||||
|
||||
def delete_single(
|
||||
@@ -637,19 +696,13 @@ class VespaIndex(DocumentIndex):
|
||||
if self.secondary_index_name:
|
||||
index_names.append(self.secondary_index_name)
|
||||
|
||||
with get_vespa_http_client(
|
||||
http2=False
|
||||
) as http_client, concurrent.futures.ThreadPoolExecutor(
|
||||
with self.httpx_client_context as http_client, concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=NUM_THREADS
|
||||
) as executor:
|
||||
for index_name in index_names:
|
||||
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
|
||||
multipass_config = get_multipass_config(
|
||||
db_session=db_session,
|
||||
primary_index=index_name == self.index_name,
|
||||
)
|
||||
large_chunks_enabled = multipass_config.enable_large_chunks
|
||||
|
||||
for (
|
||||
index_name,
|
||||
large_chunks_enabled,
|
||||
) in self.index_to_large_chunks_enabled.items():
|
||||
enriched_doc_infos = VespaIndex.enrich_basic_chunk_info(
|
||||
index_name=index_name,
|
||||
http_client=http_client,
|
||||
@@ -808,9 +861,8 @@ class VespaIndex(DocumentIndex):
|
||||
)
|
||||
return enriched_doc_info
|
||||
|
||||
@classmethod
|
||||
def delete_entries_by_tenant_id(
|
||||
cls,
|
||||
self,
|
||||
*,
|
||||
tenant_id: str,
|
||||
index_name: str,
|
||||
@@ -827,7 +879,7 @@ class VespaIndex(DocumentIndex):
|
||||
)
|
||||
|
||||
# Step 1: Retrieve all document IDs with the given tenant_id
|
||||
document_ids = cls._get_all_document_ids_by_tenant_id(tenant_id, index_name)
|
||||
document_ids = self._get_all_document_ids_by_tenant_id(tenant_id, index_name)
|
||||
|
||||
if not document_ids:
|
||||
logger.info(
|
||||
@@ -841,11 +893,10 @@ class VespaIndex(DocumentIndex):
|
||||
for doc_id in document_ids
|
||||
]
|
||||
|
||||
cls._apply_deletes_batched(delete_requests)
|
||||
self._apply_deletes_batched(delete_requests)
|
||||
|
||||
@classmethod
|
||||
def _get_all_document_ids_by_tenant_id(
|
||||
cls, tenant_id: str, index_name: str
|
||||
self, tenant_id: str, index_name: str
|
||||
) -> List[str]:
|
||||
"""
|
||||
Retrieves all document IDs with the specified tenant_id, handling pagination.
|
||||
@@ -882,8 +933,8 @@ class VespaIndex(DocumentIndex):
|
||||
f"Querying for document IDs with tenant_id: {tenant_id}, offset: {offset}"
|
||||
)
|
||||
|
||||
with get_vespa_http_client(no_timeout=True) as http_client:
|
||||
response = http_client.get(url, params=query_params)
|
||||
with self.httpx_client_context as http_client:
|
||||
response = http_client.get(url, params=query_params, timeout=None)
|
||||
response.raise_for_status()
|
||||
|
||||
search_result = response.json()
|
||||
@@ -904,9 +955,8 @@ class VespaIndex(DocumentIndex):
|
||||
)
|
||||
return document_ids
|
||||
|
||||
@classmethod
|
||||
def _apply_deletes_batched(
|
||||
cls,
|
||||
self,
|
||||
delete_requests: List["_VespaDeleteRequest"],
|
||||
batch_size: int = BATCH_SIZE,
|
||||
) -> None:
|
||||
@@ -925,13 +975,14 @@ class VespaIndex(DocumentIndex):
|
||||
response = http_client.delete(
|
||||
delete_request.url,
|
||||
headers={"Content-Type": "application/json"},
|
||||
timeout=None,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
logger.debug(f"Starting batch deletion for {len(delete_requests)} documents")
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
|
||||
with get_vespa_http_client(no_timeout=True) as http_client:
|
||||
with self.httpx_client_context as http_client:
|
||||
for batch_start in range(0, len(delete_requests), batch_size):
|
||||
batch = delete_requests[batch_start : batch_start + batch_size]
|
||||
|
||||
|
||||
@@ -1,21 +1,19 @@
|
||||
import concurrent.futures
|
||||
import json
|
||||
import uuid
|
||||
from abc import ABC
|
||||
from abc import abstractmethod
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from http import HTTPStatus
|
||||
|
||||
import httpx
|
||||
from retry import retry
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.app_configs import ENABLE_MULTIPASS_INDEXING
|
||||
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
|
||||
get_experts_stores_representations,
|
||||
)
|
||||
from onyx.db.models import SearchSettings
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.db.search_settings import get_secondary_search_settings
|
||||
from onyx.document_index.document_index_utils import get_uuid_from_chunk
|
||||
from onyx.document_index.document_index_utils import get_uuid_from_chunk_info_old
|
||||
from onyx.document_index.interfaces import MinimalDocumentIndexingInfo
|
||||
@@ -50,10 +48,9 @@ from onyx.document_index.vespa_constants import TENANT_ID
|
||||
from onyx.document_index.vespa_constants import TITLE
|
||||
from onyx.document_index.vespa_constants import TITLE_EMBEDDING
|
||||
from onyx.indexing.models import DocMetadataAwareIndexChunk
|
||||
from onyx.indexing.models import EmbeddingProvider
|
||||
from onyx.indexing.models import MultipassConfig
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
@@ -275,46 +272,63 @@ def check_for_final_chunk_existence(
|
||||
index += 1
|
||||
|
||||
|
||||
def should_use_multipass(search_settings: SearchSettings | None) -> bool:
|
||||
"""
|
||||
Determines whether multipass should be used based on the search settings
|
||||
or the default config if settings are unavailable.
|
||||
"""
|
||||
if search_settings is not None:
|
||||
return search_settings.multipass_indexing
|
||||
return ENABLE_MULTIPASS_INDEXING
|
||||
# def get_multipass_config(
|
||||
# db_session: Session, primary_index: bool = True
|
||||
# ) -> MultipassConfig:
|
||||
# """
|
||||
# Determines whether to enable multipass and large chunks by examining
|
||||
# the current search settings and the embedder configuration.
|
||||
# """
|
||||
# search_settings = (
|
||||
# get_current_search_settings(db_session)
|
||||
# if primary_index
|
||||
# else get_secondary_search_settings(db_session)
|
||||
# )
|
||||
# multipass = should_use_multipass(search_settings)
|
||||
# if not search_settings:
|
||||
# return MultipassConfig(multipass_indexing=False, enable_large_chunks=False)
|
||||
# enable_large_chunks = can_use_large_chunks(multipass, search_settings)
|
||||
# return MultipassConfig(
|
||||
# multipass_indexing=multipass, enable_large_chunks=enable_large_chunks
|
||||
# )
|
||||
|
||||
|
||||
def can_use_large_chunks(multipass: bool, search_settings: SearchSettings) -> bool:
|
||||
"""
|
||||
Given multipass usage and an embedder, decides whether large chunks are allowed
|
||||
based on model/provider constraints.
|
||||
"""
|
||||
# Only local models that support a larger context are from Nomic
|
||||
# Cohere does not support larger contexts (they recommend not going above ~512 tokens)
|
||||
return (
|
||||
multipass
|
||||
and search_settings.model_name.startswith("nomic-ai")
|
||||
and search_settings.provider_type != EmbeddingProvider.COHERE
|
||||
)
|
||||
class BaseHTTPXClientContext(ABC):
|
||||
"""Abstract base class for an HTTPX client context manager."""
|
||||
|
||||
@abstractmethod
|
||||
def __enter__(self) -> httpx.Client:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
|
||||
pass
|
||||
|
||||
|
||||
def get_multipass_config(
|
||||
db_session: Session, primary_index: bool = True
|
||||
) -> MultipassConfig:
|
||||
"""
|
||||
Determines whether to enable multipass and large chunks by examining
|
||||
the current search settings and the embedder configuration.
|
||||
"""
|
||||
search_settings = (
|
||||
get_current_search_settings(db_session)
|
||||
if primary_index
|
||||
else get_secondary_search_settings(db_session)
|
||||
)
|
||||
multipass = should_use_multipass(search_settings)
|
||||
if not search_settings:
|
||||
return MultipassConfig(multipass_indexing=False, enable_large_chunks=False)
|
||||
enable_large_chunks = can_use_large_chunks(multipass, search_settings)
|
||||
return MultipassConfig(
|
||||
multipass_indexing=multipass, enable_large_chunks=enable_large_chunks
|
||||
)
|
||||
class GlobalHTTPXClientContext(BaseHTTPXClientContext):
|
||||
"""Context manager for a global HTTPX client that does not close it."""
|
||||
|
||||
def __init__(self, client: httpx.Client):
|
||||
self._client = client
|
||||
|
||||
def __enter__(self) -> httpx.Client:
|
||||
return self._client # Reuse the global client
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
|
||||
pass # Do nothing; don't close the global client
|
||||
|
||||
|
||||
class TemporaryHTTPXClientContext(BaseHTTPXClientContext):
|
||||
"""Context manager for a temporary HTTPX client that closes it after use."""
|
||||
|
||||
def __init__(self, client_factory: Callable[[], httpx.Client]):
|
||||
self._client_factory = client_factory
|
||||
self._client: httpx.Client | None = None # Client will be created in __enter__
|
||||
|
||||
def __enter__(self) -> httpx.Client:
|
||||
self._client = self._client_factory() # Create a new client
|
||||
return self._client
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
|
||||
if self._client:
|
||||
self._client.close()
|
||||
|
||||
57
backend/onyx/httpx/httpx_pool.py
Normal file
57
backend/onyx/httpx/httpx_pool.py
Normal file
@@ -0,0 +1,57 @@
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
class HttpxPool:
|
||||
"""Class to manage a global httpx Client instance"""
|
||||
|
||||
_clients: dict[str, httpx.Client] = {}
|
||||
_lock: threading.Lock = threading.Lock()
|
||||
|
||||
# Default parameters for creation
|
||||
DEFAULT_KWARGS = {
|
||||
"http2": True,
|
||||
"limits": lambda: httpx.Limits(),
|
||||
}
|
||||
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def _init_client(cls, **kwargs: Any) -> httpx.Client:
|
||||
"""Private helper method to create and return an httpx.Client."""
|
||||
merged_kwargs = {**cls.DEFAULT_KWARGS, **kwargs}
|
||||
return httpx.Client(**merged_kwargs)
|
||||
|
||||
@classmethod
|
||||
def init_client(cls, name: str, **kwargs: Any) -> None:
|
||||
"""Allow the caller to init the client with extra params."""
|
||||
with cls._lock:
|
||||
if name not in cls._clients:
|
||||
cls._clients[name] = cls._init_client(**kwargs)
|
||||
|
||||
@classmethod
|
||||
def close_client(cls, name: str) -> None:
|
||||
"""Allow the caller to close the client."""
|
||||
with cls._lock:
|
||||
client = cls._clients.pop(name, None)
|
||||
if client:
|
||||
client.close()
|
||||
|
||||
@classmethod
|
||||
def close_all(cls) -> None:
|
||||
"""Close all registered clients."""
|
||||
with cls._lock:
|
||||
for client in cls._clients.values():
|
||||
client.close()
|
||||
cls._clients.clear()
|
||||
|
||||
@classmethod
|
||||
def get(cls, name: str) -> httpx.Client:
|
||||
"""Gets the httpx.Client. Will init to default settings if not init'd."""
|
||||
with cls._lock:
|
||||
if name not in cls._clients:
|
||||
cls._clients[name] = cls._init_client()
|
||||
return cls._clients[name]
|
||||
@@ -31,14 +31,15 @@ from onyx.db.document import upsert_documents
|
||||
from onyx.db.document_set import fetch_document_sets_for_documents
|
||||
from onyx.db.index_attempt import create_index_attempt_error
|
||||
from onyx.db.models import Document as DBDocument
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.db.tag import create_or_add_document_tag
|
||||
from onyx.db.tag import create_or_add_document_tag_list
|
||||
from onyx.document_index.document_index_utils import (
|
||||
get_multipass_config,
|
||||
)
|
||||
from onyx.document_index.interfaces import DocumentIndex
|
||||
from onyx.document_index.interfaces import DocumentMetadata
|
||||
from onyx.document_index.interfaces import IndexBatchParams
|
||||
from onyx.document_index.vespa.indexing_utils import (
|
||||
get_multipass_config,
|
||||
)
|
||||
from onyx.indexing.chunker import Chunker
|
||||
from onyx.indexing.embedder import IndexingEmbedder
|
||||
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
|
||||
@@ -358,8 +359,15 @@ def index_doc_batch(
|
||||
)
|
||||
|
||||
logger.debug("Filtering Documents")
|
||||
|
||||
logger.debug("Document IDs before filtering")
|
||||
logger.debug([doc.id for doc in document_batch])
|
||||
|
||||
filtered_documents = filter_fnc(document_batch)
|
||||
|
||||
logger.debug("Document IDs after filtering")
|
||||
logger.debug([doc.id for doc in filtered_documents])
|
||||
|
||||
ctx = index_doc_batch_prepare(
|
||||
documents=filtered_documents,
|
||||
index_attempt_metadata=index_attempt_metadata,
|
||||
@@ -481,6 +489,7 @@ def index_doc_batch(
|
||||
update_docs_updated_at__no_commit(
|
||||
ids_to_new_updated_at=ids_to_new_updated_at, db_session=db_session
|
||||
)
|
||||
logger.info("UPDATING DOCS")
|
||||
|
||||
update_docs_last_modified__no_commit(
|
||||
document_ids=last_modified_ids, db_session=db_session
|
||||
@@ -527,7 +536,8 @@ def build_indexing_pipeline(
|
||||
callback: IndexingHeartbeatInterface | None = None,
|
||||
) -> IndexingPipelineProtocol:
|
||||
"""Builds a pipeline which takes in a list (batch) of docs and indexes them."""
|
||||
multipass_config = get_multipass_config(db_session, primary_index=True)
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
multipass_config = get_multipass_config(search_settings)
|
||||
|
||||
chunker = chunker or Chunker(
|
||||
tokenizer=embedder.embedding_model.tokenizer,
|
||||
|
||||
@@ -55,9 +55,7 @@ class DocAwareChunk(BaseChunk):
|
||||
|
||||
def to_short_descriptor(self) -> str:
|
||||
"""Used when logging the identity of a chunk"""
|
||||
return (
|
||||
f"Chunk ID: '{self.chunk_id}'; {self.source_document.to_short_descriptor()}"
|
||||
)
|
||||
return f"{self.source_document.to_short_descriptor()} Chunk ID: {self.chunk_id}"
|
||||
|
||||
|
||||
class IndexChunk(DocAwareChunk):
|
||||
|
||||
@@ -16,7 +16,7 @@ from onyx.context.search.preprocessing.access_filters import (
|
||||
from onyx.db.document_set import get_document_sets_by_ids
|
||||
from onyx.db.models import StarterMessageModel as StarterMessage
|
||||
from onyx.db.models import User
|
||||
from onyx.document_index.document_index_utils import get_both_index_names
|
||||
from onyx.document_index.document_index_utils import get_both_index_properties
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.llm.factory import get_default_llms
|
||||
from onyx.prompts.starter_messages import format_persona_starter_message_prompt
|
||||
@@ -34,8 +34,18 @@ def get_random_chunks_from_doc_sets(
|
||||
"""
|
||||
Retrieves random chunks from the specified document sets.
|
||||
"""
|
||||
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
|
||||
document_index = get_default_document_index(curr_ind_name, sec_ind_name)
|
||||
(
|
||||
curr_ind_name,
|
||||
sec_ind_name,
|
||||
large_chunks,
|
||||
secondary_large_chunks,
|
||||
) = get_both_index_properties(db_session)
|
||||
document_index = get_default_document_index(
|
||||
curr_ind_name,
|
||||
sec_ind_name,
|
||||
large_chunks_enabled=large_chunks,
|
||||
secondary_large_chunks_enabled=secondary_large_chunks,
|
||||
)
|
||||
|
||||
acl_filters = build_access_filters_for_user(user, db_session)
|
||||
filters = IndexFilters(document_set=doc_sets, access_control_list=acl_filters)
|
||||
|
||||
@@ -3,6 +3,7 @@ import json
|
||||
import os
|
||||
from typing import cast
|
||||
|
||||
from sqlalchemy import update
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.access.models import default_public_access
|
||||
@@ -23,7 +24,9 @@ from onyx.db.document import check_docs_exist
|
||||
from onyx.db.enums import AccessType
|
||||
from onyx.db.enums import ConnectorCredentialPairStatus
|
||||
from onyx.db.index_attempt import mock_successful_index_attempt
|
||||
from onyx.db.models import Document as DbDocument
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.document_index.interfaces import IndexBatchParams
|
||||
from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout
|
||||
@@ -59,6 +62,7 @@ def _create_indexable_chunks(
|
||||
doc_updated_at=None,
|
||||
primary_owners=[],
|
||||
secondary_owners=[],
|
||||
chunk_count=1,
|
||||
)
|
||||
if preprocessed_doc["chunk_ind"] == 0:
|
||||
ids_to_documents[document.id] = document
|
||||
@@ -155,8 +159,12 @@ def seed_initial_documents(
|
||||
logger.info("Embedding model has been updated, skipping")
|
||||
return
|
||||
|
||||
mp_config = get_multipass_config(search_settings)
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=search_settings.index_name, secondary_index_name=None
|
||||
primary_index_name=search_settings.index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=mp_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
|
||||
# Create a connector so the user can delete it if they want
|
||||
@@ -240,4 +248,12 @@ def seed_initial_documents(
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
# Since we bypass the indexing flow, we need to manually update the chunk count
|
||||
for doc in docs:
|
||||
db_session.execute(
|
||||
update(DbDocument)
|
||||
.where(DbDocument.id == doc.id)
|
||||
.values(chunk_count=doc.chunk_count)
|
||||
)
|
||||
|
||||
kv_store.store(KV_DOCUMENTS_SEEDED_KEY, True)
|
||||
|
||||
@@ -12,6 +12,7 @@ from onyx.context.search.preprocessing.access_filters import (
|
||||
from onyx.db.engine import get_session
|
||||
from onyx.db.models import User
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.document_index.interfaces import VespaChunkRequest
|
||||
from onyx.natural_language_processing.utils import get_tokenizer
|
||||
@@ -32,9 +33,12 @@ def get_document_info(
|
||||
db_session: Session = Depends(get_session),
|
||||
) -> DocumentInfo:
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
|
||||
mp_config = get_multipass_config(search_settings)
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=search_settings.index_name, secondary_index_name=None
|
||||
primary_index_name=search_settings.index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=mp_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
|
||||
user_acl_filters = build_access_filters_for_user(user, db_session)
|
||||
@@ -79,9 +83,12 @@ def get_chunk_info(
|
||||
db_session: Session = Depends(get_session),
|
||||
) -> ChunkInfo:
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
|
||||
mp_config = get_multipass_config(search_settings)
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=search_settings.index_name, secondary_index_name=None
|
||||
primary_index_name=search_settings.index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=mp_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
|
||||
user_acl_filters = build_access_filters_for_user(user, db_session)
|
||||
|
||||
@@ -22,6 +22,7 @@ from onyx.db.search_settings import get_embedding_provider_from_provider_type
|
||||
from onyx.db.search_settings import get_secondary_search_settings
|
||||
from onyx.db.search_settings import update_current_search_settings
|
||||
from onyx.db.search_settings import update_search_settings_status
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.file_processing.unstructured import delete_unstructured_api_key
|
||||
from onyx.file_processing.unstructured import get_unstructured_api_key
|
||||
@@ -97,9 +98,13 @@ def set_new_search_settings(
|
||||
)
|
||||
|
||||
# Ensure Vespa has the new index immediately
|
||||
mp_config_1 = get_multipass_config(search_settings)
|
||||
mp_config_2 = get_multipass_config(new_search_settings)
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=search_settings.index_name,
|
||||
secondary_index_name=new_search_settings.index_name,
|
||||
large_chunks_enabled=mp_config_1.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=mp_config_2.enable_large_chunks,
|
||||
)
|
||||
|
||||
document_index.ensure_indices_exist(
|
||||
|
||||
@@ -16,7 +16,7 @@ from onyx.db.engine import get_session
|
||||
from onyx.db.models import User
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.db.search_settings import get_secondary_search_settings
|
||||
from onyx.document_index.document_index_utils import get_both_index_names
|
||||
from onyx.document_index.document_index_utils import get_both_index_properties
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.indexing.embedder import DefaultIndexingEmbedder
|
||||
from onyx.indexing.indexing_pipeline import build_indexing_pipeline
|
||||
@@ -89,11 +89,24 @@ def upsert_ingestion_doc(
|
||||
)
|
||||
|
||||
# Need to index for both the primary and secondary index if possible
|
||||
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
|
||||
(
|
||||
curr_ind_name,
|
||||
sec_ind_name,
|
||||
large_chunks,
|
||||
secondary_large_chunks,
|
||||
) = get_both_index_properties(db_session)
|
||||
curr_doc_index = get_default_document_index(
|
||||
primary_index_name=curr_ind_name, secondary_index_name=None
|
||||
primary_index_name=curr_ind_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
|
||||
# curr_ind_name, sec_ind_name = get_both_index_names(db_session)
|
||||
# curr_doc_index = get_default_document_index(
|
||||
# primary_index_name=curr_ind_name, secondary_index_name=None
|
||||
# )
|
||||
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
|
||||
index_embedding_model = DefaultIndexingEmbedder.from_db_search_settings(
|
||||
@@ -118,8 +131,12 @@ def upsert_ingestion_doc(
|
||||
|
||||
# If there's a secondary index being built, index the doc but don't use it for return here
|
||||
if sec_ind_name:
|
||||
# rkuo: i don't understand why we create the secondaray index with the current index again
|
||||
sec_doc_index = get_default_document_index(
|
||||
primary_index_name=curr_ind_name, secondary_index_name=None
|
||||
primary_index_name=curr_ind_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
|
||||
sec_search_settings = get_secondary_search_settings(db_session)
|
||||
|
||||
@@ -27,6 +27,7 @@ from onyx.db.engine import get_session
|
||||
from onyx.db.models import User
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.db.tag import find_tags
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.document_index.vespa.index import VespaIndex
|
||||
from onyx.server.query_and_chat.models import AdminSearchRequest
|
||||
@@ -64,9 +65,14 @@ def admin_search(
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
mp_config = get_multipass_config(search_settings)
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=search_settings.index_name, secondary_index_name=None
|
||||
primary_index_name=search_settings.index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=mp_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
|
||||
if not isinstance(document_index, VespaIndex):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
|
||||
@@ -30,6 +30,7 @@ from onyx.db.search_settings import get_secondary_search_settings
|
||||
from onyx.db.search_settings import update_current_search_settings
|
||||
from onyx.db.search_settings import update_secondary_search_settings
|
||||
from onyx.db.swap_index import check_index_swap
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.document_index.interfaces import DocumentIndex
|
||||
from onyx.document_index.vespa.index import VespaIndex
|
||||
@@ -71,7 +72,13 @@ def setup_onyx(
|
||||
"""
|
||||
check_index_swap(db_session=db_session)
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
multipass_config_1 = get_multipass_config(search_settings)
|
||||
|
||||
secondary_large_chunks_enabled: bool | None = None
|
||||
secondary_search_settings = get_secondary_search_settings(db_session)
|
||||
if secondary_search_settings:
|
||||
multipass_config_2 = get_multipass_config(secondary_search_settings)
|
||||
secondary_large_chunks_enabled = multipass_config_2.enable_large_chunks
|
||||
|
||||
# Break bad state for thrashing indexes
|
||||
if secondary_search_settings and DISABLE_INDEX_UPDATE_ON_SWAP:
|
||||
@@ -123,9 +130,11 @@ def setup_onyx(
|
||||
logger.notice("Verifying Document Index(s) is/are available.")
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=search_settings.index_name,
|
||||
large_chunks_enabled=multipass_config_1.enable_large_chunks,
|
||||
secondary_index_name=secondary_search_settings.index_name
|
||||
if secondary_search_settings
|
||||
else None,
|
||||
secondary_large_chunks_enabled=secondary_large_chunks_enabled,
|
||||
)
|
||||
|
||||
success = setup_vespa(
|
||||
|
||||
@@ -38,7 +38,7 @@ from onyx.db.connector_credential_pair import (
|
||||
from onyx.db.engine import get_session_context_manager
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.document_index.document_index_utils import get_both_index_names
|
||||
from onyx.document_index.document_index_utils import get_both_index_properties
|
||||
|
||||
# pylint: enable=E402
|
||||
# flake8: noqa: E402
|
||||
@@ -191,9 +191,17 @@ def _delete_connector(cc_pair_id: int, db_session: Session) -> None:
|
||||
)
|
||||
try:
|
||||
logger.notice("Deleting information from Vespa and Postgres")
|
||||
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
|
||||
(
|
||||
curr_ind_name,
|
||||
sec_ind_name,
|
||||
large_chunks,
|
||||
secondary_large_chunks,
|
||||
) = get_both_index_properties(db_session)
|
||||
document_index = get_default_document_index(
|
||||
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
|
||||
primary_index_name=curr_ind_name,
|
||||
secondary_index_name=sec_ind_name,
|
||||
large_chunks_enabled=large_chunks,
|
||||
secondary_large_chunks_enabled=secondary_large_chunks,
|
||||
)
|
||||
|
||||
files_deleted_count = _unsafe_deletion(
|
||||
|
||||
@@ -5,6 +5,8 @@ import sys
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
|
||||
# makes it so `PYTHONPATH=.` is not required when running this script
|
||||
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
sys.path.append(parent_dir)
|
||||
@@ -54,8 +56,14 @@ def main() -> None:
|
||||
|
||||
# Setup Vespa index
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
multipass_config = get_multipass_config(search_settings)
|
||||
index_name = search_settings.index_name
|
||||
vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
|
||||
vespa_index = VespaIndex(
|
||||
index_name=index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=multipass_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
|
||||
# Delete chunks from Vespa first
|
||||
print("Deleting orphaned document chunks from Vespa")
|
||||
|
||||
@@ -16,6 +16,7 @@ from onyx.configs.constants import DocumentSource
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.db.engine import get_session_context_manager
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.vespa.index import VespaIndex
|
||||
from onyx.indexing.indexing_pipeline import IndexBatchParams
|
||||
from onyx.indexing.models import ChunkEmbedding
|
||||
@@ -133,10 +134,16 @@ def seed_dummy_docs(
|
||||
) -> None:
|
||||
with get_session_context_manager() as db_session:
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
multipass_config = get_multipass_config(search_settings)
|
||||
index_name = search_settings.index_name
|
||||
embedding_dim = search_settings.model_dim
|
||||
|
||||
vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
|
||||
vespa_index = VespaIndex(
|
||||
index_name=index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=multipass_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
print(index_name)
|
||||
|
||||
all_chunks = []
|
||||
|
||||
@@ -9,6 +9,7 @@ from onyx.configs.model_configs import DOC_EMBEDDING_DIM
|
||||
from onyx.context.search.models import IndexFilters
|
||||
from onyx.db.engine import get_session_context_manager
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.vespa.index import VespaIndex
|
||||
from scripts.query_time_check.seed_dummy_docs import TOTAL_ACL_ENTRIES_PER_CATEGORY
|
||||
from scripts.query_time_check.seed_dummy_docs import TOTAL_DOC_SETS
|
||||
@@ -62,9 +63,15 @@ def test_hybrid_retrieval_times(
|
||||
) -> None:
|
||||
with get_session_context_manager() as db_session:
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
multipass_config = get_multipass_config(search_settings)
|
||||
index_name = search_settings.index_name
|
||||
|
||||
vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
|
||||
vespa_index = VespaIndex(
|
||||
index_name=index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=multipass_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
)
|
||||
|
||||
# Generate random queries
|
||||
queries = [f"Random Query {i}" for i in range(number_of_queries)]
|
||||
|
||||
@@ -18,6 +18,7 @@ from onyx.db.engine import get_session_with_tenant
|
||||
from onyx.db.engine import SYNC_DB_API
|
||||
from onyx.db.search_settings import get_current_search_settings
|
||||
from onyx.db.swap_index import check_index_swap
|
||||
from onyx.document_index.document_index_utils import get_multipass_config
|
||||
from onyx.document_index.vespa.index import DOCUMENT_ID_ENDPOINT
|
||||
from onyx.document_index.vespa.index import VespaIndex
|
||||
from onyx.indexing.models import IndexingSetting
|
||||
@@ -173,10 +174,16 @@ def reset_vespa() -> None:
|
||||
check_index_swap(db_session)
|
||||
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
multipass_config = get_multipass_config(search_settings)
|
||||
index_name = search_settings.index_name
|
||||
|
||||
success = setup_vespa(
|
||||
document_index=VespaIndex(index_name=index_name, secondary_index_name=None),
|
||||
document_index=VespaIndex(
|
||||
index_name=index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=multipass_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
),
|
||||
index_setting=IndexingSetting.from_db_model(search_settings),
|
||||
secondary_index_setting=None,
|
||||
)
|
||||
@@ -250,10 +257,16 @@ def reset_vespa_multitenant() -> None:
|
||||
check_index_swap(db_session)
|
||||
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
multipass_config = get_multipass_config(search_settings)
|
||||
index_name = search_settings.index_name
|
||||
|
||||
success = setup_vespa(
|
||||
document_index=VespaIndex(index_name=index_name, secondary_index_name=None),
|
||||
document_index=VespaIndex(
|
||||
index_name=index_name,
|
||||
secondary_index_name=None,
|
||||
large_chunks_enabled=multipass_config.enable_large_chunks,
|
||||
secondary_large_chunks_enabled=None,
|
||||
),
|
||||
index_setting=IndexingSetting.from_db_model(search_settings),
|
||||
secondary_index_setting=None,
|
||||
)
|
||||
|
||||
@@ -6,7 +6,7 @@ import pytest
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.db.engine import get_sqlalchemy_engine
|
||||
from onyx.document_index.document_index_utils import get_both_index_names
|
||||
from onyx.document_index.document_index_utils import get_both_index_properties
|
||||
from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ def test_vespa_update() -> None:
|
||||
doc_id = "test-vespa-update"
|
||||
|
||||
with Session(get_sqlalchemy_engine()) as db_session:
|
||||
primary_index_name, _ = get_both_index_names(db_session)
|
||||
primary_index_name, _, _, _ = get_both_index_properties(db_session)
|
||||
endpoint = (
|
||||
f"{DOCUMENT_ID_ENDPOINT.format(index_name=primary_index_name)}/{doc_id}"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user