Compare commits

...

5 Commits

Author SHA1 Message Date
Evan Lohn
e9a5498a23 fix visit api index name 2025-07-06 17:59:02 -07:00
Evan Lohn
5097cdbb6c correct vespa URL 2025-07-06 17:24:23 -07:00
Evan Lohn
414419f849 cursor based pages 2025-07-06 16:59:10 -07:00
Evan Lohn
994d230019 fix migration2 2025-07-06 16:32:47 -07:00
Evan Lohn
8fedcf30b3 fix migration 2025-07-06 16:32:47 -07:00

View File

@@ -9,14 +9,17 @@ Create Date: 2025-06-20 14:44:54.241159
from alembic import op
import sqlalchemy as sa
from urllib.parse import urlparse, urlunparse
from httpx import HTTPStatusError
import httpx
from onyx.document_index.factory import get_default_document_index
from onyx.db.search_settings import SearchSettings
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa.shared_utils.utils import (
replace_invalid_doc_id_characters,
)
from onyx.document_index.vespa_constants import SEARCH_ENDPOINT, DOCUMENT_ID_ENDPOINT
from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
from onyx.document_index.vespa_constants import VESPA_APP_CONTAINER_URL
from onyx.configs.app_configs import DOCUMENT_INDEX_NAME
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -110,7 +113,9 @@ def get_google_drive_documents_from_database() -> list[dict]:
return documents
def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
def update_document_id_in_database(
old_doc_id: str, new_doc_id: str, index_name: str
) -> None:
"""Update document IDs in all relevant database tables using copy-and-swap approach."""
bind = op.get_bind()
@@ -123,9 +128,9 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
)
row = result.fetchone()
if row and row[0] > 0:
raise RuntimeError(
f"Document with ID {new_doc_id} already exists, cannot create duplicate"
)
print(f"Document with ID {new_doc_id} already exists, deleting old one")
delete_document_from_db(old_doc_id, index_name)
return
# Step 1: Create a new document row with the new ID (copy all fields from old row)
# Use a conservative approach to handle columns that might not exist in all installations
@@ -175,6 +180,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
)
# Update search_doc table (stores search results for chat replay)
# This is critical for agent functionality
bind.execute(
sa.text(
"UPDATE search_doc SET document_id = :new_id WHERE document_id = :old_id"
@@ -273,104 +279,96 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
)
def _visit_chunks(
    *,
    http_client: httpx.Client,
    cluster: str,
    selection: str,
    continuation: str | None = None,
) -> tuple[list[dict], str | None]:
    """Perform one call to Vespa's /document/v1 visit API.

    Args:
        http_client: Open HTTP client used for the GET request.
        cluster: Vespa content cluster to visit.
        selection: Document-selection expression (note: a document selector,
            not YQL) that filters which documents are returned.
        continuation: Continuation token from the previous page, or None for
            the first call.

    Returns:
        Tuple of (documents, next_continuation_token). The token is None when
        the response carries no "continuation" key, i.e. there are no more pages.

    Raises:
        httpx.HTTPStatusError: if the visit request returns an error status.
    """
    # Pass the query parameters via `params=` so httpx URL-encodes them.
    # The selection expression contains double quotes and embeds the raw
    # document id, which may itself contain &, =, + or spaces; interpolating
    # them into the query string unescaped (as an f-string would) corrupts
    # the request.
    params: dict[str, str] = {
        "cluster": cluster,
        "stream": "true",
        "selection": selection,
    }
    if continuation:
        params["continuation"] = continuation
    # timeout=None: visiting a large selection can legitimately take a long
    # time, so disable the client-side timeout for this request.
    resp = http_client.get(
        f"{VESPA_APP_CONTAINER_URL}/document/v1/", params=params, timeout=None
    )
    resp.raise_for_status()  # surface HTTP-level failures to the caller
    payload = resp.json()
    return payload.get("documents", []), payload.get("continuation")
def delete_document_chunks_from_vespa(index_name: str, doc_id: str) -> None:
"""Delete all chunks for a document from Vespa using pagination."""
offset = 0
limit = 400 # Vespa's maximum hits per query
"""Delete all chunks for *doc_id* from Vespa using continuation-token paging (no offset)."""
total_deleted = 0
selection = (
f'document_id contains "{doc_id}"' # NB: this is a document selector, not YQL
)
with get_vespa_http_client() as http_client:
continuation: str | None = None
while True:
# Use pagination to handle the 400 hit limit
yql = f'select documentid, document_id, chunk_id from sources {index_name} where document_id contains "{doc_id}"'
docs, continuation = _visit_chunks(
http_client=http_client,
cluster=DOCUMENT_INDEX_NAME,
selection=selection,
continuation=continuation,
)
params = {
"yql": yql,
"hits": str(limit),
"offset": str(offset),
"timeout": "30s",
"format": "json",
}
if not docs:
break
response = http_client.get(SEARCH_ENDPOINT, params=params, timeout=None)
response.raise_for_status()
search_result = response.json()
hits = search_result.get("root", {}).get("children", [])
if not hits:
break # No more chunks to process
# Delete each chunk in this batch
for hit in hits:
vespa_doc_id = hit.get("id") # This is the internal Vespa document ID
if not vespa_doc_id:
print(f"No Vespa document ID found for chunk {hit}")
for doc in docs:
vespa_full_id = doc.get("id")
if not vespa_full_id:
continue
vespa_doc_id = vespa_doc_id.split("::")[-1] # get the UUID from the end
# Delete the chunk using the internal Vespa document ID
delete_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_id}"
vespa_doc_uuid = vespa_full_id.split("::")[-1]
delete_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_uuid}"
try:
resp = http_client.delete(delete_url)
resp.raise_for_status()
total_deleted += 1
except Exception as e:
print(f"Failed to delete chunk {vespa_doc_id}: {e}")
# Continue trying to delete other chunks even if one fails
continue
print(f"Failed to delete chunk {vespa_doc_uuid}: {e}")
# Move to next batch
offset += limit
# If we got fewer hits than the limit, we're done
if len(hits) < limit:
if not continuation:
break
def update_document_id_in_vespa(
index_name: str, old_doc_id: str, new_doc_id: str
) -> None:
"""Update a document's ID in Vespa by updating the document_id field."""
# Clean the new document ID for storage in Vespa (this handles invalid characters)
"""Update all chunks' document_id field from *old_doc_id* to *new_doc_id* using continuation paging."""
clean_new_doc_id = replace_invalid_doc_id_characters(new_doc_id)
offset = 0
limit = 400 # Vespa's maximum hits per query
total_updated = 0
selection = f'document_id contains "{old_doc_id}"'
with get_vespa_http_client() as http_client:
continuation: str | None = None
while True:
# Use pagination to handle the 400 hit limit
yql = f'select documentid, document_id, chunk_id from sources {index_name} where document_id contains "{old_doc_id}"'
docs, continuation = _visit_chunks(
http_client=http_client,
cluster=DOCUMENT_INDEX_NAME,
selection=selection,
continuation=continuation,
)
params = {
"yql": yql,
"hits": str(limit),
"offset": str(offset),
"timeout": "30s",
"format": "json",
}
if not docs:
break
response = http_client.get(SEARCH_ENDPOINT, params=params, timeout=None)
response.raise_for_status()
search_result = response.json()
hits = search_result.get("root", {}).get("children", [])
if not hits:
break # No more chunks to process
# Update each chunk in this batch
for hit in hits:
vespa_doc_id = hit.get("id") # This is the internal Vespa document ID
if not vespa_doc_id:
print(f"No Vespa document ID found for chunk {hit}")
for doc in docs:
vespa_full_id = doc.get("id")
if not vespa_full_id:
continue
vespa_doc_id = vespa_doc_id.split("::")[-1] # get the UUID from the end
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_id}"
vespa_doc_uuid = vespa_full_id.split("::")[-1]
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_uuid}"
update_request = {
"fields": {"document_id": {"assign": clean_new_doc_id}}
}
@@ -378,16 +376,11 @@ def update_document_id_in_vespa(
try:
resp = http_client.put(vespa_url, json=update_request)
resp.raise_for_status()
total_updated += 1
except Exception as e:
print(f"Failed to update chunk {vespa_doc_id}: {e}")
print(f"Failed to update chunk {vespa_doc_uuid}: {e}")
raise
# Move to next batch
offset += limit
# If we got fewer hits than the limit, we're done
if len(hits) < limit:
if not continuation:
break
@@ -396,6 +389,39 @@ def delete_document_from_db(current_doc_id: str, index_name: str) -> None:
try:
bind = op.get_bind()
# Delete from agent-related tables first (order matters due to foreign keys)
# Delete from agent__sub_query__search_doc first since it references search_doc
bind.execute(
sa.text(
"""
DELETE FROM agent__sub_query__search_doc
WHERE search_doc_id IN (
SELECT id FROM search_doc WHERE document_id = :doc_id
)
"""
),
{"doc_id": current_doc_id},
)
# Delete from chat_message__search_doc
bind.execute(
sa.text(
"""
DELETE FROM chat_message__search_doc
WHERE search_doc_id IN (
SELECT id FROM search_doc WHERE document_id = :doc_id
)
"""
),
{"doc_id": current_doc_id},
)
# Now we can safely delete from search_doc
bind.execute(
sa.text("DELETE FROM search_doc WHERE document_id = :doc_id"),
{"doc_id": current_doc_id},
)
# Delete from document_by_connector_credential_pair
bind.execute(
sa.text(
@@ -405,11 +431,6 @@ def delete_document_from_db(current_doc_id: str, index_name: str) -> None:
)
# Delete from other tables that reference this document
bind.execute(
sa.text("DELETE FROM search_doc WHERE document_id = :doc_id"),
{"doc_id": current_doc_id},
)
bind.execute(
sa.text(
"DELETE FROM document_retrieval_feedback WHERE document_id = :doc_id"
@@ -524,14 +545,15 @@ def upgrade() -> None:
try:
# Update both database and Vespa in order
# Database first to ensure consistency
update_document_id_in_database(current_doc_id, normalized_doc_id)
update_document_id_in_database(
current_doc_id, normalized_doc_id, index_name
)
# For Vespa, we can now use the original document IDs since we're using contains matching
update_document_id_in_vespa(index_name, current_doc_id, normalized_doc_id)
updated_count += 1
except Exception as e:
print(f"Failed to update document {current_doc_id}: {e}")
from httpx import HTTPStatusError
if isinstance(e, HTTPStatusError):
print(f"HTTPStatusError: {e}")