Compare commits

...

9 Commits

Author                 SHA1        Message                                         Date
pablodanswer           4a318bac26  additional logs to narrow down issue            2025-01-28 13:55:04 -08:00
pablodanswer           52849386bd  various improvements                            2025-01-28 13:40:01 -08:00
pablodanswer           f02e6f00a2  add logs                                        2025-01-28 11:20:21 -08:00
Richard Kuo (Danswer)  7f18afbdb3  more debugging                                  2025-01-28 00:46:55 -08:00
Richard Kuo (Danswer)  6722e87fcf  circular imports?                               2025-01-27 21:21:48 -08:00
Richard Kuo (Danswer)  f87173f18d  refactor multipass/db check out of VespaIndex   2025-01-27 20:37:06 -08:00
Richard Kuo (Danswer)  060a988d13  more debugging                                  2025-01-27 18:18:10 -08:00
Richard Kuo (Danswer)  12dcf6a09b  add more logging                                2025-01-27 14:29:47 -08:00
Richard Kuo (Danswer)  c1a4eea2d0  add timings for syncing                         2025-01-27 11:54:17 -08:00
25 changed files with 375 additions and 129 deletions

View File

@@ -79,9 +79,17 @@ def document_by_cc_pair_cleanup_task(
action = "skip"
chunks_affected = 0
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
(
curr_ind_name,
sec_ind_name,
large_chunks,
secondary_large_chunks,
) = get_both_index_names(db_session)
doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
primary_index_name=curr_ind_name,
secondary_index_name=sec_ind_name,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=secondary_large_chunks,
)
retry_index = RetryDocumentIndex(doc_index)
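
For orientation, a minimal sketch of the new calling convention used at call sites like this one, assuming the imports shown elsewhere in this diff; build_doc_index_for_tenant is a hypothetical helper name, not code from the change:

from onyx.db.engine import get_session_with_tenant
from onyx.document_index.document_index_utils import get_both_index_names
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import DocumentIndex

def build_doc_index_for_tenant(tenant_id: str | None) -> DocumentIndex:
    with get_session_with_tenant(tenant_id) as db_session:
        # get_both_index_names now also returns the large-chunk flag for each index
        (
            curr_ind_name,
            sec_ind_name,
            large_chunks,
            secondary_large_chunks,
        ) = get_both_index_names(db_session)

        # the flags are threaded straight into the index factory
        return get_default_document_index(
            primary_index_name=curr_ind_name,
            secondary_index_name=sec_ind_name,
            large_chunks_enabled=large_chunks,
            secondary_large_chunks_enabled=secondary_large_chunks,
        )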

View File

@@ -1092,30 +1092,50 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
def vespa_metadata_sync_task(
self: Task, document_id: str, tenant_id: str | None
) -> bool:
timings: dict[str, Any] = {}
start = time.monotonic()
timings["start"] = start
try:
with get_session_with_tenant(tenant_id) as db_session:
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
phase_start = time.monotonic()
(
curr_ind_name,
sec_ind_name,
large_chunks,
secondary_large_chunks,
) = get_both_index_names(db_session)
doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
primary_index_name=curr_ind_name,
secondary_index_name=sec_ind_name,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=secondary_large_chunks,
)
timings["get_index"] = time.monotonic() - phase_start
phase_start = time.monotonic()
retry_index = RetryDocumentIndex(doc_index)
doc = get_document(document_id, db_session)
if not doc:
return False
timings["get_document"] = time.monotonic() - phase_start
# document set sync
phase_start = time.monotonic()
doc_sets = fetch_document_sets_for_document(document_id, db_session)
update_doc_sets: set[str] = set(doc_sets)
timings["fetch_document_sets_for_document"] = time.monotonic() - phase_start
# User group sync
phase_start = time.monotonic()
doc_access = get_access_for_document(
document_id=document_id, db_session=db_session
)
timings["get_access_for_document"] = time.monotonic() - phase_start
phase_start = time.monotonic()
fields = VespaDocumentFields(
document_sets=update_doc_sets,
access=doc_access,
@@ -1130,10 +1150,13 @@ def vespa_metadata_sync_task(
chunk_count=doc.chunk_count,
fields=fields,
)
timings["index.update_single"] = time.monotonic() - phase_start
# update db last. Worst case = we crash right before this and
# the sync might repeat again later
phase_start = time.monotonic()
mark_document_as_synced(document_id, db_session)
timings["mark_document_as_synced"] = time.monotonic() - phase_start
# this code checks for and removes a per document sync key that is
# used to block out the same doc from continually resyncing
@@ -1149,7 +1172,8 @@ def vespa_metadata_sync_task(
f"doc={document_id} "
f"action=sync "
f"chunks={chunks_affected} "
f"elapsed={elapsed:.2f}"
f"elapsed={elapsed:.2f} "
f"debug_timings={timings}"
)
except SoftTimeLimitExceeded:
task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")
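
The instrumentation added to this task follows a simple phase-timing pattern; a standalone sketch of the same idea, with illustrative phase names that are not from the diff:

import time
from typing import Any

def timed_phases() -> dict[str, Any]:
    # accumulate per-phase wall-clock durations for a one-line debug log
    timings: dict[str, Any] = {}
    start = time.monotonic()

    phase_start = time.monotonic()
    # ... phase 1 work (e.g. loading the document) ...
    timings["phase_1"] = time.monotonic() - phase_start

    phase_start = time.monotonic()
    # ... phase 2 work (e.g. updating the index) ...
    timings["phase_2"] = time.monotonic() - phase_start

    timings["total"] = time.monotonic() - start
    return timings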

View File

@@ -34,6 +34,7 @@ from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import IndexAttempt
from onyx.db.models import IndexingStatus
from onyx.db.models import IndexModelStatus
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.indexing.embedder import DefaultIndexingEmbedder
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
@@ -149,6 +150,7 @@ class RunIndexingContext(BaseModel):
from_beginning: bool
is_primary: bool
search_settings_status: IndexModelStatus
large_chunks_enabled: bool
def _run_indexing(
@@ -179,6 +181,8 @@ def _run_indexing(
"Search settings must be set for indexing. This should not be possible."
)
multipass_config = get_multipass_config(index_attempt_start.search_settings)
# search_settings = index_attempt_start.search_settings
db_connector = index_attempt_start.connector_credential_pair.connector
db_credential = index_attempt_start.connector_credential_pair.credential
@@ -200,6 +204,7 @@ def _run_indexing(
index_attempt_start.search_settings.status == IndexModelStatus.PRESENT
),
search_settings_status=index_attempt_start.search_settings.status,
large_chunks_enabled=multipass_config.enable_large_chunks,
)
last_successful_index_time = (
@@ -221,7 +226,10 @@ def _run_indexing(
# Indexing is only done into one index at a time
document_index = get_default_document_index(
primary_index_name=ctx.index_name, secondary_index_name=None
primary_index_name=ctx.index_name,
secondary_index_name=None,
large_chunks_enabled=ctx.large_chunks_enabled,
secondary_large_chunks_enabled=None,
)
indexing_pipeline = build_indexing_pipeline(

View File

@@ -63,6 +63,7 @@ from onyx.db.models import ToolCall
from onyx.db.models import User
from onyx.db.persona import get_persona_by_id
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.file_store.models import ChatFileType
from onyx.file_store.models import FileDescriptor
@@ -425,8 +426,12 @@ def stream_chat_message_objects(
)
search_settings = get_current_search_settings(db_session)
mp_config = get_multipass_config(search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name, secondary_index_name=None
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
# Every chat Session begins with an empty root message

View File

@@ -29,6 +29,7 @@ from onyx.context.search.utils import inference_section_from_chunks
from onyx.context.search.utils import relevant_sections_to_indices
from onyx.db.models import User
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import VespaChunkRequest
from onyx.llm.interfaces import LLM
@@ -67,9 +68,12 @@ class SearchPipeline:
self.rerank_metrics_callback = rerank_metrics_callback
self.search_settings = get_current_search_settings(db_session)
mp_config = get_multipass_config(self.search_settings)
self.document_index = get_default_document_index(
primary_index_name=self.search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
self.prompt_config: PromptConfig | None = prompt_config

View File

@@ -452,10 +452,16 @@ def update_docs_chunk_count__no_commit(
doc_id_to_chunk_count: dict[str, int],
db_session: Session,
) -> None:
logger.debug("Updating chunk count for these documents")
logger.debug(document_ids)
logger.debug(doc_id_to_chunk_count)
documents_to_update = (
db_session.query(DbDocument).filter(DbDocument.id.in_(document_ids)).all()
)
for doc in documents_to_update:
logger.debug(
f"Updating chunk count for {doc.id} to {doc_id_to_chunk_count[doc.id]}"
)
doc.chunk_count = doc_id_to_chunk_count[doc.id]

View File

@@ -4,24 +4,76 @@ from uuid import UUID
from sqlalchemy.orm import Session
from onyx.configs.app_configs import ENABLE_MULTIPASS_INDEXING
from onyx.db.models import SearchSettings
from onyx.db.search_settings import get_current_search_settings
from onyx.db.search_settings import get_secondary_search_settings
from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
from onyx.indexing.models import DocMetadataAwareIndexChunk
from onyx.indexing.models import EmbeddingProvider
from onyx.indexing.models import MultipassConfig
from shared_configs.configs import MULTI_TENANT
DEFAULT_BATCH_SIZE = 30
DEFAULT_INDEX_NAME = "danswer_chunk"
def get_both_index_names(db_session: Session) -> tuple[str, str | None]:
def should_use_multipass(search_settings: SearchSettings | None) -> bool:
"""
Determines whether multipass should be used based on the search settings
or the default config if settings are unavailable.
"""
if search_settings is not None:
return search_settings.multipass_indexing
return ENABLE_MULTIPASS_INDEXING
def can_use_large_chunks(multipass: bool, search_settings: SearchSettings) -> bool:
"""
Given multipass usage and an embedder, decides whether large chunks are allowed
based on model/provider constraints.
"""
# Only local models that support a larger context are from Nomic
# Cohere does not support larger contexts (they recommend not going above ~512 tokens)
return (
multipass
and search_settings.model_name.startswith("nomic-ai")
and search_settings.provider_type != EmbeddingProvider.COHERE
)
def get_multipass_config(search_settings: SearchSettings) -> MultipassConfig:
"""
Determines whether to enable multipass and large chunks by examining
the current search settings and the embedder configuration.
"""
if not search_settings:
return MultipassConfig(multipass_indexing=False, enable_large_chunks=False)
multipass = should_use_multipass(search_settings)
enable_large_chunks = can_use_large_chunks(multipass, search_settings)
return MultipassConfig(
multipass_indexing=multipass, enable_large_chunks=enable_large_chunks
)
def get_both_index_names(
db_session: Session,
) -> tuple[str, str | None, bool, bool | None]:
search_settings = get_current_search_settings(db_session)
config_1 = get_multipass_config(search_settings)
search_settings_new = get_secondary_search_settings(db_session)
if not search_settings_new:
return search_settings.index_name, None
return search_settings.index_name, None, config_1.enable_large_chunks, None
return search_settings.index_name, search_settings_new.index_name
config_2 = get_multipass_config(search_settings_new)
return (
search_settings.index_name,
search_settings_new.index_name,
config_1.enable_large_chunks,
config_2.enable_large_chunks,
)
def translate_boost_count_to_multiplier(boost: int) -> float:
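
MultipassConfig itself is not part of this diff; for orientation, a minimal sketch of the shape implied by the constructor calls above, assuming it is a small pydantic model:

from pydantic import BaseModel

class MultipassConfig(BaseModel):
    # both fields are consumed by get_multipass_config() callers in this diff
    multipass_indexing: bool
    enable_large_chunks: bool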

View File

@@ -1,6 +1,7 @@
from sqlalchemy.orm import Session
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.vespa.index import VespaIndex
from shared_configs.configs import MULTI_TENANT
@@ -9,14 +10,19 @@ from shared_configs.configs import MULTI_TENANT
def get_default_document_index(
primary_index_name: str,
secondary_index_name: str | None,
large_chunks_enabled: bool,
secondary_large_chunks_enabled: bool | None,
) -> DocumentIndex:
"""Primary index is the index that is used for querying/updating etc.
Secondary index is for when both the currently used index and the upcoming
index both need to be updated, updates are applied to both indices"""
# Currently only supporting Vespa
return VespaIndex(
index_name=primary_index_name,
secondary_index_name=secondary_index_name,
large_chunks_enabled=large_chunks_enabled,
secondary_large_chunks_enabled=secondary_large_chunks_enabled,
multitenant=MULTI_TENANT,
)
@@ -26,7 +32,10 @@ def get_current_primary_default_document_index(db_session: Session) -> DocumentI
TODO: Use redis to cache this or something
"""
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
return get_default_document_index(
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
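
Most single-index call sites in this diff (chat, search pipeline, document APIs, admin search, seeding) follow the same pattern; a minimal consolidated sketch, with an illustrative function name:

from sqlalchemy.orm import Session

from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import DocumentIndex

def primary_only_index(db_session: Session) -> DocumentIndex:
    # resolve the large-chunk flag once from the active search settings
    search_settings = get_current_search_settings(db_session)
    mp_config = get_multipass_config(search_settings)
    return get_default_document_index(
        primary_index_name=search_settings.index_name,
        secondary_index_name=None,
        large_chunks_enabled=mp_config.enable_large_chunks,
        secondary_large_chunks_enabled=None,
    )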

View File

@@ -10,6 +10,7 @@ import zipfile
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
from typing import Any
from typing import BinaryIO
from typing import cast
from typing import List
@@ -25,7 +26,6 @@ from onyx.configs.chat_configs import VESPA_SEARCHER_THREADS
from onyx.configs.constants import KV_REINDEX_KEY
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import InferenceChunkUncleaned
from onyx.db.engine import get_session_with_tenant
from onyx.document_index.document_index_utils import get_document_chunk_ids
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.interfaces import DocumentInsertionRecord
@@ -44,9 +44,6 @@ from onyx.document_index.vespa.deletion import delete_vespa_chunks
from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks
from onyx.document_index.vespa.indexing_utils import check_for_final_chunk_existence
from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy
from onyx.document_index.vespa.indexing_utils import (
get_multipass_config,
)
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa.shared_utils.utils import (
replace_invalid_doc_id_characters,
@@ -132,13 +129,26 @@ class VespaIndex(DocumentIndex):
self,
index_name: str,
secondary_index_name: str | None,
large_chunks_enabled: bool,
secondary_large_chunks_enabled: bool | None,
multitenant: bool = False,
) -> None:
self.index_name = index_name
self.secondary_index_name = secondary_index_name
self.large_chunks_enabled = large_chunks_enabled
self.secondary_large_chunks_enabled = secondary_large_chunks_enabled
self.multitenant = multitenant
self.http_client = get_vespa_http_client()
self.index_to_large_chunks_enabled: dict[str, bool] = {}
self.index_to_large_chunks_enabled[index_name] = large_chunks_enabled
if secondary_index_name and secondary_large_chunks_enabled:
self.index_to_large_chunks_enabled[
secondary_index_name
] = secondary_large_chunks_enabled
def ensure_indices_exist(
self,
index_embedding_dim: int,
@@ -349,6 +359,12 @@ class VespaIndex(DocumentIndex):
for doc_id in doc_id_to_new_chunk_cnt.keys()
]
logger.debug("Indexing these doc IDs / counts")
logger.debug(doc_id_to_new_chunk_cnt)
logger.debug("Enriched docs are as follows")
logger.debug(enriched_doc_infos)
for cleaned_doc_info in enriched_doc_infos:
# If the document has previously indexed chunks, we know it previously existed
if cleaned_doc_info.chunk_end_index:
@@ -523,6 +539,7 @@ class VespaIndex(DocumentIndex):
index_name: str,
fields: VespaDocumentFields,
doc_id: str,
http_client: httpx.Client,
) -> None:
"""
Update a single "chunk" (document) in Vespa using its chunk ID.
@@ -554,18 +571,17 @@ class VespaIndex(DocumentIndex):
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}?create=true"
with get_vespa_http_client(http2=False) as http_client:
try:
resp = http_client.put(
vespa_url,
headers={"Content-Type": "application/json"},
json=update_dict,
)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}"
logger.error(error_message)
raise
try:
resp = http_client.put(
vespa_url,
headers={"Content-Type": "application/json"},
json=update_dict,
)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
error_message = f"Failed to update doc chunk {doc_chunk_id} (doc_id={doc_id}). Details: {e.response.text}"
logger.error(error_message)
raise
def update_single(
self,
@@ -579,21 +595,23 @@ class VespaIndex(DocumentIndex):
function will complete with no errors or exceptions.
Handle other exceptions if you wish to implement retry behavior
"""
start = time.monotonic()
timings: dict[str, Any] = {}
doc_chunk_count = 0
index_names = [self.index_name]
if self.secondary_index_name:
index_names.append(self.secondary_index_name)
index = 0
with get_vespa_http_client(http2=False) as http_client:
for index_name in index_names:
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
multipass_config = get_multipass_config(
db_session=db_session,
primary_index=index_name == self.index_name,
)
large_chunks_enabled = multipass_config.enable_large_chunks
timings["get_vespa_http_client_enter"] = time.monotonic() - start
for (
index_name,
large_chunks_enabled,
) in self.index_to_large_chunks_enabled.items():
index += 1
phase_start = time.monotonic()
enriched_doc_infos = VespaIndex.enrich_basic_chunk_info(
index_name=index_name,
http_client=http_client,
@@ -608,16 +626,42 @@ class VespaIndex(DocumentIndex):
large_chunks_enabled=large_chunks_enabled,
)
doc_chunk_count += len(doc_chunk_ids)
for doc_chunk_id in doc_chunk_ids:
self.update_single_chunk(
doc_chunk_id=doc_chunk_id,
index_name=index_name,
fields=fields,
doc_id=doc_id,
if len(doc_chunk_ids) == 0:
logger.warning(
f"len(doc_chunk_ids) == 0: "
f"tenant_id={tenant_id} "
f"enriched_doc_infos={enriched_doc_infos} "
f"large_chunks_enabled={large_chunks_enabled}"
)
logger.warning(
"Document chunk info:\n"
f"Enriched doc info: {enriched_doc_infos}\n"
f"Doc chunk ids: {doc_chunk_ids}\n"
f"Document ID: {doc_id}"
f"Tenant_id: {tenant_id}"
)
doc_chunk_count += len(doc_chunk_ids)
timings[f"prep_{index}"] = time.monotonic() - phase_start
chunk = 0
for doc_chunk_id in doc_chunk_ids:
chunk += 1
phase_start = time.monotonic()
self.update_single_chunk(
doc_chunk_id, index_name, fields, doc_id, http_client
)
timings[f"chunk_{index}_{chunk}"] = time.monotonic() - phase_start
phase_start = time.monotonic()
timings["get_vespa_http_client_exit"] = time.monotonic() - phase_start
elapsed = time.monotonic() - start
logger.debug(
f"num_chunks={doc_chunk_count} elapsed={elapsed:.2f} timings={timings}"
)
return doc_chunk_count
def delete_single(
@@ -642,14 +686,10 @@ class VespaIndex(DocumentIndex):
) as http_client, concurrent.futures.ThreadPoolExecutor(
max_workers=NUM_THREADS
) as executor:
for index_name in index_names:
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
multipass_config = get_multipass_config(
db_session=db_session,
primary_index=index_name == self.index_name,
)
large_chunks_enabled = multipass_config.enable_large_chunks
for (
index_name,
large_chunks_enabled,
) in self.index_to_large_chunks_enabled.items():
enriched_doc_infos = VespaIndex.enrich_basic_chunk_info(
index_name=index_name,
http_client=http_client,
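
update_single now opens one HTTP client and threads it through every per-chunk update instead of creating a client per chunk. A stripped-down sketch of that reuse pattern, with URL construction simplified and the chunk URLs/update payload assumed to be prepared as in the diff (apply_partial_updates is an illustrative name):

import httpx

from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client

def apply_partial_updates(doc_chunk_urls: list[str], update_dict: dict) -> None:
    # one client (and one connection pool) shared across all chunk updates
    with get_vespa_http_client(http2=False) as http_client:
        for vespa_url in doc_chunk_urls:
            try:
                resp = http_client.put(
                    vespa_url,
                    headers={"Content-Type": "application/json"},
                    json=update_dict,
                )
                resp.raise_for_status()
            except httpx.HTTPStatusError:
                # surface the failure; callers decide whether to retry
                raise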

View File

@@ -7,15 +7,10 @@ from http import HTTPStatus
import httpx
from retry import retry
from sqlalchemy.orm import Session
from onyx.configs.app_configs import ENABLE_MULTIPASS_INDEXING
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
get_experts_stores_representations,
)
from onyx.db.models import SearchSettings
from onyx.db.search_settings import get_current_search_settings
from onyx.db.search_settings import get_secondary_search_settings
from onyx.document_index.document_index_utils import get_uuid_from_chunk
from onyx.document_index.document_index_utils import get_uuid_from_chunk_info_old
from onyx.document_index.interfaces import MinimalDocumentIndexingInfo
@@ -50,8 +45,6 @@ from onyx.document_index.vespa_constants import TENANT_ID
from onyx.document_index.vespa_constants import TITLE
from onyx.document_index.vespa_constants import TITLE_EMBEDDING
from onyx.indexing.models import DocMetadataAwareIndexChunk
from onyx.indexing.models import EmbeddingProvider
from onyx.indexing.models import MultipassConfig
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -275,46 +268,22 @@ def check_for_final_chunk_existence(
index += 1
def should_use_multipass(search_settings: SearchSettings | None) -> bool:
"""
Determines whether multipass should be used based on the search settings
or the default config if settings are unavailable.
"""
if search_settings is not None:
return search_settings.multipass_indexing
return ENABLE_MULTIPASS_INDEXING
def can_use_large_chunks(multipass: bool, search_settings: SearchSettings) -> bool:
"""
Given multipass usage and an embedder, decides whether large chunks are allowed
based on model/provider constraints.
"""
# Only local models that support a larger context are from Nomic
# Cohere does not support larger contexts (they recommend not going above ~512 tokens)
return (
multipass
and search_settings.model_name.startswith("nomic-ai")
and search_settings.provider_type != EmbeddingProvider.COHERE
)
def get_multipass_config(
db_session: Session, primary_index: bool = True
) -> MultipassConfig:
"""
Determines whether to enable multipass and large chunks by examining
the current search settings and the embedder configuration.
"""
search_settings = (
get_current_search_settings(db_session)
if primary_index
else get_secondary_search_settings(db_session)
)
multipass = should_use_multipass(search_settings)
if not search_settings:
return MultipassConfig(multipass_indexing=False, enable_large_chunks=False)
enable_large_chunks = can_use_large_chunks(multipass, search_settings)
return MultipassConfig(
multipass_indexing=multipass, enable_large_chunks=enable_large_chunks
)
# def get_multipass_config(
# db_session: Session, primary_index: bool = True
# ) -> MultipassConfig:
# """
# Determines whether to enable multipass and large chunks by examining
# the current search settings and the embedder configuration.
# """
# search_settings = (
# get_current_search_settings(db_session)
# if primary_index
# else get_secondary_search_settings(db_session)
# )
# multipass = should_use_multipass(search_settings)
# if not search_settings:
# return MultipassConfig(multipass_indexing=False, enable_large_chunks=False)
# enable_large_chunks = can_use_large_chunks(multipass, search_settings)
# return MultipassConfig(
# multipass_indexing=multipass, enable_large_chunks=enable_large_chunks
# )

View File

@@ -31,14 +31,15 @@ from onyx.db.document import upsert_documents
from onyx.db.document_set import fetch_document_sets_for_documents
from onyx.db.index_attempt import create_index_attempt_error
from onyx.db.models import Document as DBDocument
from onyx.db.search_settings import get_current_search_settings
from onyx.db.tag import create_or_add_document_tag
from onyx.db.tag import create_or_add_document_tag_list
from onyx.document_index.document_index_utils import (
get_multipass_config,
)
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.interfaces import DocumentMetadata
from onyx.document_index.interfaces import IndexBatchParams
from onyx.document_index.vespa.indexing_utils import (
get_multipass_config,
)
from onyx.indexing.chunker import Chunker
from onyx.indexing.embedder import IndexingEmbedder
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
@@ -358,8 +359,15 @@ def index_doc_batch(
)
logger.debug("Filtering Documents")
logger.debug("Document IDs before filtering")
logger.debug([doc.id for doc in document_batch])
filtered_documents = filter_fnc(document_batch)
logger.debug("Document IDs after filtering")
logger.debug([doc.id for doc in filtered_documents])
ctx = index_doc_batch_prepare(
documents=filtered_documents,
index_attempt_metadata=index_attempt_metadata,
@@ -481,6 +489,7 @@ def index_doc_batch(
update_docs_updated_at__no_commit(
ids_to_new_updated_at=ids_to_new_updated_at, db_session=db_session
)
logger.info("UPDATING DOCS")
update_docs_last_modified__no_commit(
document_ids=last_modified_ids, db_session=db_session
@@ -527,7 +536,8 @@ def build_indexing_pipeline(
callback: IndexingHeartbeatInterface | None = None,
) -> IndexingPipelineProtocol:
"""Builds a pipeline which takes in a list (batch) of docs and indexes them."""
multipass_config = get_multipass_config(db_session, primary_index=True)
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
chunker = chunker or Chunker(
tokenizer=embedder.embedding_model.tokenizer,

View File

@@ -55,9 +55,7 @@ class DocAwareChunk(BaseChunk):
def to_short_descriptor(self) -> str:
"""Used when logging the identity of a chunk"""
return (
f"Chunk ID: '{self.chunk_id}'; {self.source_document.to_short_descriptor()}"
)
return f"{self.source_document.to_short_descriptor()} Chunk ID: {self.chunk_id}"
class IndexChunk(DocAwareChunk):

View File

@@ -34,8 +34,18 @@ def get_random_chunks_from_doc_sets(
"""
Retrieves random chunks from the specified document sets.
"""
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
document_index = get_default_document_index(curr_ind_name, sec_ind_name)
(
curr_ind_name,
sec_ind_name,
large_chunks,
secondary_large_chunks,
) = get_both_index_names(db_session)
document_index = get_default_document_index(
curr_ind_name,
sec_ind_name,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=secondary_large_chunks,
)
acl_filters = build_access_filters_for_user(user, db_session)
filters = IndexFilters(document_set=doc_sets, access_control_list=acl_filters)

View File

@@ -3,6 +3,7 @@ import json
import os
from typing import cast
from sqlalchemy import update
from sqlalchemy.orm import Session
from onyx.access.models import default_public_access
@@ -23,7 +24,9 @@ from onyx.db.document import check_docs_exist
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.index_attempt import mock_successful_index_attempt
from onyx.db.models import Document as DbDocument
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import IndexBatchParams
from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout
@@ -59,6 +62,7 @@ def _create_indexable_chunks(
doc_updated_at=None,
primary_owners=[],
secondary_owners=[],
chunk_count=1,
)
if preprocessed_doc["chunk_ind"] == 0:
ids_to_documents[document.id] = document
@@ -155,8 +159,12 @@ def seed_initial_documents(
logger.info("Embedding model has been updated, skipping")
return
mp_config = get_multipass_config(search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name, secondary_index_name=None
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
# Create a connector so the user can delete it if they want
@@ -240,4 +248,12 @@ def seed_initial_documents(
db_session=db_session,
)
# Since we bypass the indexing flow, we need to manually update the chunk count
for doc in docs:
db_session.execute(
update(DbDocument)
.where(DbDocument.id == doc.id)
.values(chunk_count=doc.chunk_count)
)
kv_store.store(KV_DOCUMENTS_SEEDED_KEY, True)

View File

@@ -12,6 +12,7 @@ from onyx.context.search.preprocessing.access_filters import (
from onyx.db.engine import get_session
from onyx.db.models import User
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import VespaChunkRequest
from onyx.natural_language_processing.utils import get_tokenizer
@@ -32,9 +33,12 @@ def get_document_info(
db_session: Session = Depends(get_session),
) -> DocumentInfo:
search_settings = get_current_search_settings(db_session)
mp_config = get_multipass_config(search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name, secondary_index_name=None
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
user_acl_filters = build_access_filters_for_user(user, db_session)
@@ -79,9 +83,12 @@ def get_chunk_info(
db_session: Session = Depends(get_session),
) -> ChunkInfo:
search_settings = get_current_search_settings(db_session)
mp_config = get_multipass_config(search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name, secondary_index_name=None
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
user_acl_filters = build_access_filters_for_user(user, db_session)

View File

@@ -22,6 +22,7 @@ from onyx.db.search_settings import get_embedding_provider_from_provider_type
from onyx.db.search_settings import get_secondary_search_settings
from onyx.db.search_settings import update_current_search_settings
from onyx.db.search_settings import update_search_settings_status
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.file_processing.unstructured import delete_unstructured_api_key
from onyx.file_processing.unstructured import get_unstructured_api_key
@@ -97,9 +98,13 @@ def set_new_search_settings(
)
# Ensure Vespa has the new index immediately
mp_config_1 = get_multipass_config(search_settings)
mp_config_2 = get_multipass_config(new_search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name,
secondary_index_name=new_search_settings.index_name,
large_chunks_enabled=mp_config_1.enable_large_chunks,
secondary_large_chunks_enabled=mp_config_2.enable_large_chunks,
)
document_index.ensure_indices_exist(

View File

@@ -89,11 +89,24 @@ def upsert_ingestion_doc(
)
# Need to index for both the primary and secondary index if possible
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
(
curr_ind_name,
sec_ind_name,
large_chunks,
secondary_large_chunks,
) = get_both_index_names(db_session)
curr_doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=None
primary_index_name=curr_ind_name,
secondary_index_name=None,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=None,
)
# curr_ind_name, sec_ind_name = get_both_index_names(db_session)
# curr_doc_index = get_default_document_index(
# primary_index_name=curr_ind_name, secondary_index_name=None
# )
search_settings = get_current_search_settings(db_session)
index_embedding_model = DefaultIndexingEmbedder.from_db_search_settings(
@@ -118,8 +131,12 @@ def upsert_ingestion_doc(
# If there's a secondary index being built, index the doc but don't use it for return here
if sec_ind_name:
# rkuo: i don't understand why we create the secondary index with the current index again
sec_doc_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=None
primary_index_name=curr_ind_name,
secondary_index_name=None,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=None,
)
sec_search_settings = get_secondary_search_settings(db_session)

View File

@@ -27,6 +27,7 @@ from onyx.db.engine import get_session
from onyx.db.models import User
from onyx.db.search_settings import get_current_search_settings
from onyx.db.tag import find_tags
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.vespa.index import VespaIndex
from onyx.server.query_and_chat.models import AdminSearchRequest
@@ -64,9 +65,14 @@ def admin_search(
tenant_id=tenant_id,
)
search_settings = get_current_search_settings(db_session)
mp_config = get_multipass_config(search_settings)
document_index = get_default_document_index(
primary_index_name=search_settings.index_name, secondary_index_name=None
primary_index_name=search_settings.index_name,
secondary_index_name=None,
large_chunks_enabled=mp_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
if not isinstance(document_index, VespaIndex):
raise HTTPException(
status_code=400,

View File

@@ -30,6 +30,7 @@ from onyx.db.search_settings import get_secondary_search_settings
from onyx.db.search_settings import update_current_search_settings
from onyx.db.search_settings import update_secondary_search_settings
from onyx.db.swap_index import check_index_swap
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.vespa.index import VespaIndex
@@ -71,7 +72,13 @@ def setup_onyx(
"""
check_index_swap(db_session=db_session)
search_settings = get_current_search_settings(db_session)
multipass_config_1 = get_multipass_config(search_settings)
secondary_large_chunks_enabled: bool | None = None
secondary_search_settings = get_secondary_search_settings(db_session)
if secondary_search_settings:
multipass_config_2 = get_multipass_config(secondary_search_settings)
secondary_large_chunks_enabled = multipass_config_2.enable_large_chunks
# Break bad state for thrashing indexes
if secondary_search_settings and DISABLE_INDEX_UPDATE_ON_SWAP:
@@ -123,9 +130,11 @@ def setup_onyx(
logger.notice("Verifying Document Index(s) is/are available.")
document_index = get_default_document_index(
primary_index_name=search_settings.index_name,
large_chunks_enabled=multipass_config_1.enable_large_chunks,
secondary_index_name=secondary_search_settings.index_name
if secondary_search_settings
else None,
secondary_large_chunks_enabled=secondary_large_chunks_enabled,
)
success = setup_vespa(

View File

@@ -191,9 +191,17 @@ def _delete_connector(cc_pair_id: int, db_session: Session) -> None:
)
try:
logger.notice("Deleting information from Vespa and Postgres")
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
(
curr_ind_name,
sec_ind_name,
large_chunks,
secondary_large_chunks,
) = get_both_index_names(db_session)
document_index = get_default_document_index(
primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
primary_index_name=curr_ind_name,
secondary_index_name=sec_ind_name,
large_chunks_enabled=large_chunks,
secondary_large_chunks_enabled=secondary_large_chunks,
)
files_deleted_count = _unsafe_deletion(

View File

@@ -5,6 +5,8 @@ import sys
from sqlalchemy import text
from sqlalchemy.orm import Session
from onyx.document_index.document_index_utils import get_multipass_config
# makes it so `PYTHONPATH=.` is not required when running this script
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
@@ -54,8 +56,14 @@ def main() -> None:
# Setup Vespa index
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
index_name = search_settings.index_name
vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
vespa_index = VespaIndex(
index_name=index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
# Delete chunks from Vespa first
print("Deleting orphaned document chunks from Vespa")

View File

@@ -16,6 +16,7 @@ from onyx.configs.constants import DocumentSource
from onyx.connectors.models import Document
from onyx.db.engine import get_session_context_manager
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.vespa.index import VespaIndex
from onyx.indexing.indexing_pipeline import IndexBatchParams
from onyx.indexing.models import ChunkEmbedding
@@ -133,10 +134,16 @@ def seed_dummy_docs(
) -> None:
with get_session_context_manager() as db_session:
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
index_name = search_settings.index_name
embedding_dim = search_settings.model_dim
vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
vespa_index = VespaIndex(
index_name=index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
print(index_name)
all_chunks = []

View File

@@ -9,6 +9,7 @@ from onyx.configs.model_configs import DOC_EMBEDDING_DIM
from onyx.context.search.models import IndexFilters
from onyx.db.engine import get_session_context_manager
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.vespa.index import VespaIndex
from scripts.query_time_check.seed_dummy_docs import TOTAL_ACL_ENTRIES_PER_CATEGORY
from scripts.query_time_check.seed_dummy_docs import TOTAL_DOC_SETS
@@ -62,9 +63,15 @@ def test_hybrid_retrieval_times(
) -> None:
with get_session_context_manager() as db_session:
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
index_name = search_settings.index_name
vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
vespa_index = VespaIndex(
index_name=index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
)
# Generate random queries
queries = [f"Random Query {i}" for i in range(number_of_queries)]

View File

@@ -18,6 +18,7 @@ from onyx.db.engine import get_session_with_tenant
from onyx.db.engine import SYNC_DB_API
from onyx.db.search_settings import get_current_search_settings
from onyx.db.swap_index import check_index_swap
from onyx.document_index.document_index_utils import get_multipass_config
from onyx.document_index.vespa.index import DOCUMENT_ID_ENDPOINT
from onyx.document_index.vespa.index import VespaIndex
from onyx.indexing.models import IndexingSetting
@@ -173,10 +174,16 @@ def reset_vespa() -> None:
check_index_swap(db_session)
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
index_name = search_settings.index_name
success = setup_vespa(
document_index=VespaIndex(index_name=index_name, secondary_index_name=None),
document_index=VespaIndex(
index_name=index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
),
index_setting=IndexingSetting.from_db_model(search_settings),
secondary_index_setting=None,
)
@@ -250,10 +257,16 @@ def reset_vespa_multitenant() -> None:
check_index_swap(db_session)
search_settings = get_current_search_settings(db_session)
multipass_config = get_multipass_config(search_settings)
index_name = search_settings.index_name
success = setup_vespa(
document_index=VespaIndex(index_name=index_name, secondary_index_name=None),
document_index=VespaIndex(
index_name=index_name,
secondary_index_name=None,
large_chunks_enabled=multipass_config.enable_large_chunks,
secondary_large_chunks_enabled=None,
),
index_setting=IndexingSetting.from_db_model(search_settings),
secondary_index_setting=None,
)

View File

@@ -19,7 +19,7 @@ def test_vespa_update() -> None:
doc_id = "test-vespa-update"
with Session(get_sqlalchemy_engine()) as db_session:
primary_index_name, _ = get_both_index_names(db_session)
primary_index_name, _, _, _ = get_both_index_names(db_session)
endpoint = (
f"{DOCUMENT_ID_ENDPOINT.format(index_name=primary_index_name)}/{doc_id}"
)