Compare commits

...

1 Commit

Author SHA1 Message Date
pablodanswer
e168756280 k 2025-01-01 18:58:35 -05:00
6 changed files with 54 additions and 13 deletions

View File

@@ -57,4 +57,7 @@ def get_uuid_from_chunk(
for referenced_chunk_id in chunk.large_chunk_reference_ids
]
)
# Add tenant_id if it exists
if hasattr(chunk, "tenant_id") and chunk.tenant_id:
unique_identifier_string += f"_tenant_{chunk.tenant_id}"
return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)

View File

@@ -149,6 +149,7 @@ class Indexable(abc.ABC):
self,
chunks: list[DocMetadataAwareIndexChunk],
fresh_index: bool = False,
tenant_id: str | None = None,
) -> set[DocumentInsertionRecord]:
"""
Takes a list of document chunks and indexes them in the document index
@@ -174,6 +175,8 @@ class Indexable(abc.ABC):
- chunks: Document chunks with all of the information needed for indexing to the document
index.
- fresh_index: Boolean indicating whether this is a fresh index with no existing documents.
- tenant_id: The tenant id to index the chunks for. If not provided, the chunks will be
indexed for the default tenant.
Returns:
List of document ids which map to unique documents and are used for deduping chunks

View File

@@ -180,6 +180,8 @@ def _get_chunks_via_visit_api(
selection += f" and {index_name}.chunk_id<={chunk_request.max_chunk_ind}"
if not get_large_chunks:
selection += f" and {index_name}.large_chunk_reference_ids == null"
if filters.tenant_id:
selection += f" and {index_name}.tenant_id == '{filters.tenant_id}'"
# Setting up the selection criteria in the query parameters
params = {
@@ -237,6 +239,7 @@ def get_all_vespa_ids_for_document_id(
index_name: str,
filters: IndexFilters | None = None,
get_large_chunks: bool = False,
tenant_id: str | None = None,
) -> list[str]:
document_chunks = _get_chunks_via_visit_api(
chunk_request=VespaChunkRequest(document_id=document_id),

View File

@@ -1,6 +1,7 @@
import concurrent.futures
import httpx
from pydantic import BaseModel
from retry import retry
from onyx.document_index.vespa.chunk_retrieval import (
@@ -18,12 +19,16 @@ CONTENT_SUMMARY = "content_summary"
@retry(tries=3, delay=1, backoff=2)
def _delete_vespa_doc_chunks(
document_id: str, index_name: str, http_client: httpx.Client
document_id: str,
index_name: str,
http_client: httpx.Client,
tenant_id: str | None = None,
) -> None:
doc_chunk_ids = get_all_vespa_ids_for_document_id(
document_id=document_id,
index_name=index_name,
get_large_chunks=True,
tenant_id=tenant_id,
)
for chunk_id in doc_chunk_ids:
@@ -37,8 +42,13 @@ def _delete_vespa_doc_chunks(
raise
class VespaDeletionRequest(BaseModel):
    """A single document-deletion request for Vespa.

    Bundles the document ID with an optional tenant ID so deletion can be
    scoped per tenant (the tenant ID is threaded through to the chunk-ID
    lookup used before issuing per-chunk deletes).
    """

    document_id: str
    # Default to None so single-tenant callers can omit tenant_id;
    # None means "no tenant scoping" (default-tenant behavior).
    tenant_id: str | None = None
def delete_vespa_docs(
document_ids: list[str],
deletion_requests: list[VespaDeletionRequest],
index_name: str,
http_client: httpx.Client,
executor: concurrent.futures.ThreadPoolExecutor | None = None,
@@ -52,9 +62,13 @@ def delete_vespa_docs(
try:
doc_deletion_future = {
executor.submit(
_delete_vespa_doc_chunks, doc_id, index_name, http_client
): doc_id
for doc_id in document_ids
_delete_vespa_doc_chunks,
deletion_request.document_id,
index_name,
http_client,
deletion_request.tenant_id,
): deletion_request.document_id
for deletion_request in deletion_requests
}
for future in concurrent.futures.as_completed(doc_deletion_future):
# Will raise exception if the deletion raised an exception

View File

@@ -39,6 +39,7 @@ from onyx.document_index.vespa.chunk_retrieval import (
)
from onyx.document_index.vespa.chunk_retrieval import query_vespa
from onyx.document_index.vespa.deletion import delete_vespa_docs
from onyx.document_index.vespa.deletion import VespaDeletionRequest
from onyx.document_index.vespa.indexing_utils import batch_index_vespa_chunks
from onyx.document_index.vespa.indexing_utils import clean_chunk_id_copy
from onyx.document_index.vespa.indexing_utils import (
@@ -316,17 +317,22 @@ class VespaIndex(DocumentIndex):
# IMPORTANT: This must be done one index at a time, do not use secondary index here
cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]
# Build a map from doc_id -> tenant_id (if tenant_id is not None).
# This allows us to create VespaDeletionRequest objects with tenant IDs below.
doc_id_to_tenant_id = {}
for chunk in cleaned_chunks:
doc_id_to_tenant_id[chunk.source_document.id] = chunk.tenant_id
existing_docs: set[str] = set()
# NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
# indexing / updates / deletes since we have to make a large volume of requests.
with (
concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
get_vespa_http_client() as http_client,
):
if not fresh_index:
# Check for existing documents, existing documents need to have all of their chunks deleted
# prior to indexing as the document size (num chunks) may have shrunk
print("Checking for existing documents")
# Determine which documents already exist in Vespa.
first_chunks = [
chunk for chunk in cleaned_chunks if chunk.chunk_id == 0
]
@@ -340,14 +346,23 @@ class VespaIndex(DocumentIndex):
)
)
# Pass VespaDeletionRequest objects (document_id and tenant_id) instead of just doc_ids.
for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
deletion_requests = [
VespaDeletionRequest(
document_id=doc_id,
tenant_id=doc_id_to_tenant_id.get(doc_id), # Might be None
)
for doc_id in doc_id_batch
]
delete_vespa_docs(
document_ids=doc_id_batch,
deletion_requests=deletion_requests,
index_name=self.index_name,
http_client=http_client,
executor=executor,
)
# Index the cleaned chunks in batches.
for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
batch_index_vespa_chunks(
chunks=chunk_batch,
@@ -588,7 +603,7 @@ class VespaIndex(DocumentIndex):
return total_chunks_updated
def delete(self, doc_ids: list[str]) -> None:
def delete(self, doc_ids: list[str], tenant_id: str | None = None) -> None:
logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
doc_ids = [replace_invalid_doc_id_characters(doc_id) for doc_id in doc_ids]
@@ -602,7 +617,10 @@ class VespaIndex(DocumentIndex):
for index_name in index_names:
delete_vespa_docs(
document_ids=doc_ids, index_name=index_name, http_client=http_client
document_ids=doc_ids,
index_name=index_name,
http_client=http_client,
tenant_id=tenant_id,
)
return

View File

@@ -217,7 +217,7 @@ def seed_initial_documents(
# as we just sent over the Vespa schema and there is a slight delay
index_with_retries = retry_builder(tries=15)(document_index.index)
index_with_retries(chunks=chunks, fresh_index=cohere_enabled)
index_with_retries(chunks=chunks, fresh_index=True)
# Mock a run for the UI even though it did not actually call out to anything
mock_successful_index_attempt(