Compare commits

...

6 Commits

Author     SHA1        Message                         Date
Evan Lohn  406e5b9947  use correct endpoint and query  2025-07-06 19:53:47 -07:00
Evan Lohn  e9a5498a23  fix visit api index name        2025-07-06 17:59:02 -07:00
Evan Lohn  5097cdbb6c  correct vespa URL               2025-07-06 17:24:23 -07:00
Evan Lohn  414419f849  cursor based pages              2025-07-06 16:59:10 -07:00
Evan Lohn  994d230019  fix migration2                  2025-07-06 16:32:47 -07:00
Evan Lohn  8fedcf30b3  fix migration                   2025-07-06 16:32:47 -07:00


@@ -9,14 +9,15 @@ Create Date: 2025-06-20 14:44:54.241159
from alembic import op
import sqlalchemy as sa
from urllib.parse import urlparse, urlunparse
from httpx import HTTPStatusError
import httpx
from onyx.document_index.factory import get_default_document_index
from onyx.db.search_settings import SearchSettings
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa.shared_utils.utils import (
replace_invalid_doc_id_characters,
)
from onyx.document_index.vespa_constants import SEARCH_ENDPOINT, DOCUMENT_ID_ENDPOINT
from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -110,11 +111,13 @@ def get_google_drive_documents_from_database() -> list[dict]:
return documents
def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
def update_document_id_in_database(
old_doc_id: str, new_doc_id: str, index_name: str
) -> None:
"""Update document IDs in all relevant database tables using copy-and-swap approach."""
bind = op.get_bind()
logger.info(f"Updating database tables for document {old_doc_id} -> {new_doc_id}")
# print(f"Updating database tables for document {old_doc_id} -> {new_doc_id}")
# Check if new document ID already exists
result = bind.execute(
@@ -123,9 +126,9 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
)
row = result.fetchone()
if row and row[0] > 0:
raise RuntimeError(
f"Document with ID {new_doc_id} already exists, cannot create duplicate"
)
# print(f"Document with ID {new_doc_id} already exists, deleting old one")
delete_document_from_db(old_doc_id, index_name)
return
# Step 1: Create a new document row with the new ID (copy all fields from old row)
# Use a conservative approach to handle columns that might not exist in all installations
@@ -147,6 +150,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated database tables for document {old_doc_id} -> {new_doc_id}")
except Exception as e:
# If the full INSERT fails, try a more basic version with only core columns
logger.warning(f"Full INSERT failed, trying basic version: {e}")
@@ -173,15 +177,17 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated document_by_connector_credential_pair table for document {old_doc_id} -> {new_doc_id}")
# Update search_doc table (stores search results for chat replay)
# This is critical for agent functionality
bind.execute(
sa.text(
"UPDATE search_doc SET document_id = :new_id WHERE document_id = :old_id"
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated search_doc table for document {old_doc_id} -> {new_doc_id}")
# Update document_retrieval_feedback table (user feedback on documents)
bind.execute(
sa.text(
@@ -189,7 +195,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated document_retrieval_feedback table for document {old_doc_id} -> {new_doc_id}")
# Update document__tag table (document-tag relationships)
bind.execute(
sa.text(
@@ -197,7 +203,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated document__tag table for document {old_doc_id} -> {new_doc_id}")
# Update user_file table (user uploaded files linked to documents)
bind.execute(
sa.text(
@@ -205,7 +211,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated user_file table for document {old_doc_id} -> {new_doc_id}")
# Update KG and chunk_stats tables (these may not exist in all installations)
try:
# Update kg_entity table
@@ -215,7 +221,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated kg_entity table for document {old_doc_id} -> {new_doc_id}")
# Update kg_entity_extraction_staging table
bind.execute(
sa.text(
@@ -223,7 +229,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated kg_entity_extraction_staging table for document {old_doc_id} -> {new_doc_id}")
# Update kg_relationship table
bind.execute(
sa.text(
@@ -231,7 +237,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated kg_relationship table for document {old_doc_id} -> {new_doc_id}")
# Update kg_relationship_extraction_staging table
bind.execute(
sa.text(
@@ -239,7 +245,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated kg_relationship_extraction_staging table for document {old_doc_id} -> {new_doc_id}")
# Update chunk_stats table
bind.execute(
sa.text(
@@ -247,7 +253,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
),
{"new_id": new_doc_id, "old_id": old_doc_id},
)
# print(f"Successfully updated chunk_stats table for document {old_doc_id} -> {new_doc_id}")
# Update chunk_stats ID field which includes document_id
bind.execute(
sa.text(
@@ -263,7 +269,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
"old_id_pattern": f"{old_doc_id}__%",
},
)
# print(f"Successfully updated chunk_stats ID field for document {old_doc_id} -> {new_doc_id}")
except Exception as e:
logger.warning(f"Some KG/chunk tables may not exist or failed to update: {e}")
@@ -271,106 +277,108 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
bind.execute(
sa.text("DELETE FROM document WHERE id = :old_id"), {"old_id": old_doc_id}
)
# print(f"Successfully deleted document {old_doc_id} from database")
def _visit_chunks(
*,
http_client: httpx.Client,
index_name: str,
selection: str,
continuation: str | None = None,
) -> tuple[list[dict], str | None]:
"""Helper that calls the /document/v1 visit API once and returns (docs, next_token)."""
# Use the same URL as the document API, but with visit-specific params
base_url = DOCUMENT_ID_ENDPOINT.format(index_name=index_name)
params: dict[str, str] = {
"selection": selection,
"wantedDocumentCount": "1000",
}
if continuation:
params["continuation"] = continuation
# print(f"Visiting chunks for selection '{selection}' with params {params}")
resp = http_client.get(base_url, params=params, timeout=None)
# print(f"Visited chunks for document {selection}")
resp.raise_for_status()
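# The visit API response is expected to look roughly like:
#   {"documents": [{"id": "id:<namespace>:<doctype>::<uuid>", "fields": {...}}, ...],
#    "continuation": "<opaque token>"}
# with "continuation" omitted on the last page; the only keys consumed below are
# "documents" and "continuation".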
payload = resp.json()
return payload.get("documents", []), payload.get("continuation")
def delete_document_chunks_from_vespa(index_name: str, doc_id: str) -> None:
"""Delete all chunks for a document from Vespa using pagination."""
offset = 0
limit = 400 # Vespa's maximum hits per query
"""Delete all chunks for *doc_id* from Vespa using continuation-token paging (no offset)."""
total_deleted = 0
# Use exact match instead of contains - Document Selector Language doesn't support contains
selection = f'{index_name}.document_id=="{doc_id}"'
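# With a hypothetical index name "danswer_chunk" this renders as
#   danswer_chunk.document_id=="<exact document id>"
# Document selection expressions only support exact comparison here; substring
# matching ("contains") is a YQL operator and is not available to the visit API.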
with get_vespa_http_client() as http_client:
continuation: str | None = None
while True:
# Use pagination to handle the 400 hit limit
yql = f'select documentid, document_id, chunk_id from sources {index_name} where document_id contains "{doc_id}"'
docs, continuation = _visit_chunks(
http_client=http_client,
index_name=index_name,
selection=selection,
continuation=continuation,
)
params = {
"yql": yql,
"hits": str(limit),
"offset": str(offset),
"timeout": "30s",
"format": "json",
}
if not docs:
break
response = http_client.get(SEARCH_ENDPOINT, params=params, timeout=None)
response.raise_for_status()
search_result = response.json()
hits = search_result.get("root", {}).get("children", [])
if not hits:
break # No more chunks to process
# Delete each chunk in this batch
for hit in hits:
vespa_doc_id = hit.get("id") # This is the internal Vespa document ID
if not vespa_doc_id:
print(f"No Vespa document ID found for chunk {hit}")
for doc in docs:
vespa_full_id = doc.get("id")
if not vespa_full_id:
continue
vespa_doc_id = vespa_doc_id.split("::")[-1] # get the UUID from the end
# Delete the chunk using the internal Vespa document ID
delete_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_id}"
vespa_doc_uuid = vespa_full_id.split("::")[-1]
delete_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_uuid}"
try:
resp = http_client.delete(delete_url)
resp.raise_for_status()
total_deleted += 1
except Exception as e:
print(f"Failed to delete chunk {vespa_doc_id}: {e}")
# Continue trying to delete other chunks even if one fails
continue
print(f"Failed to delete chunk {vespa_doc_uuid}: {e}")
# Move to next batch
offset += limit
# If we got fewer hits than the limit, we're done
if len(hits) < limit:
if not continuation:
break
def update_document_id_in_vespa(
index_name: str, old_doc_id: str, new_doc_id: str
) -> None:
"""Update a document's ID in Vespa by updating the document_id field."""
# Clean the new document ID for storage in Vespa (this handles invalid characters)
"""Update all chunks' document_id field from *old_doc_id* to *new_doc_id* using continuation paging."""
clean_new_doc_id = replace_invalid_doc_id_characters(new_doc_id)
offset = 0
limit = 400 # Vespa's maximum hits per query
total_updated = 0
# Use exact match instead of contains - Document Selector Language doesn't support contains
selection = f'{index_name}.document_id=="{old_doc_id}"'
with get_vespa_http_client() as http_client:
continuation: str | None = None
while True:
# Use pagination to handle the 400 hit limit
yql = f'select documentid, document_id, chunk_id from sources {index_name} where document_id contains "{old_doc_id}"'
# print(f"Visiting chunks for document {old_doc_id} -> {new_doc_id}")
docs, continuation = _visit_chunks(
http_client=http_client,
index_name=index_name,
selection=selection,
continuation=continuation,
)
params = {
"yql": yql,
"hits": str(limit),
"offset": str(offset),
"timeout": "30s",
"format": "json",
}
if not docs:
break
response = http_client.get(SEARCH_ENDPOINT, params=params, timeout=None)
response.raise_for_status()
search_result = response.json()
hits = search_result.get("root", {}).get("children", [])
if not hits:
break # No more chunks to process
# Update each chunk in this batch
for hit in hits:
vespa_doc_id = hit.get("id") # This is the internal Vespa document ID
if not vespa_doc_id:
print(f"No Vespa document ID found for chunk {hit}")
for doc in docs:
vespa_full_id = doc.get("id")
if not vespa_full_id:
continue
vespa_doc_id = vespa_doc_id.split("::")[-1] # get the UUID from the end
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_id}"
vespa_doc_uuid = vespa_full_id.split("::")[-1]
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_uuid}"
update_request = {
"fields": {"document_id": {"assign": clean_new_doc_id}}
}
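# This is a Vespa partial update rather than a re-feed: a PUT to
#   {DOCUMENT_ID_ENDPOINT}/<uuid> with {"fields": {"document_id": {"assign": "<new id>"}}}
# rewrites only the document_id field of the chunk, leaving the rest of the
# chunk (content, embeddings, other fields) untouched.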
@@ -378,16 +386,11 @@ def update_document_id_in_vespa(
try:
resp = http_client.put(vespa_url, json=update_request)
resp.raise_for_status()
total_updated += 1
except Exception as e:
print(f"Failed to update chunk {vespa_doc_id}: {e}")
print(f"Failed to update chunk {vespa_doc_uuid}: {e}")
raise
# Move to next batch
offset += limit
# If we got fewer hits than the limit, we're done
if len(hits) < limit:
if not continuation:
break
@@ -396,6 +399,39 @@ def delete_document_from_db(current_doc_id: str, index_name: str) -> None:
try:
bind = op.get_bind()
# Delete from agent-related tables first (order matters due to foreign keys)
# Delete from agent__sub_query__search_doc first since it references search_doc
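# Both join tables below reference search_doc.id (hence the
# "WHERE search_doc_id IN (SELECT id FROM search_doc ...)" subqueries), so they
# must be cleared before the search_doc rows for this document can be deleted.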
bind.execute(
sa.text(
"""
DELETE FROM agent__sub_query__search_doc
WHERE search_doc_id IN (
SELECT id FROM search_doc WHERE document_id = :doc_id
)
"""
),
{"doc_id": current_doc_id},
)
# Delete from chat_message__search_doc
bind.execute(
sa.text(
"""
DELETE FROM chat_message__search_doc
WHERE search_doc_id IN (
SELECT id FROM search_doc WHERE document_id = :doc_id
)
"""
),
{"doc_id": current_doc_id},
)
# Now we can safely delete from search_doc
bind.execute(
sa.text("DELETE FROM search_doc WHERE document_id = :doc_id"),
{"doc_id": current_doc_id},
)
# Delete from document_by_connector_credential_pair
bind.execute(
sa.text(
@@ -405,11 +441,6 @@ def delete_document_from_db(current_doc_id: str, index_name: str) -> None:
)
# Delete from other tables that reference this document
bind.execute(
sa.text("DELETE FROM search_doc WHERE document_id = :doc_id"),
{"doc_id": current_doc_id},
)
bind.execute(
sa.text(
"DELETE FROM document_retrieval_feedback WHERE document_id = :doc_id"
@@ -510,8 +541,10 @@ def upgrade() -> None:
current_doc_id = doc_info["document_id"]
normalized_doc_id = normalize_google_drive_url(current_doc_id)
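# normalize_google_drive_url is defined earlier in this migration (outside these
# hunks); given the urlparse/urlunparse imports and the "no query parameters"
# check below, it presumably strips the query string, e.g.
#   .../document/d/<id>/edit?usp=drivesdk  ->  .../document/d/<id>/edit
# (illustrative assumption, not shown in this diff).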
print(f"Processing document {current_doc_id} -> {normalized_doc_id}")
# Check for duplicates
if normalized_doc_id in all_normalized_doc_ids:
# print(f"Deleting duplicate document {current_doc_id}")
delete_document_from_db(current_doc_id, index_name)
continue
@@ -519,19 +552,22 @@ def upgrade() -> None:
# If the document ID already doesn't have query parameters, skip it
if current_doc_id == normalized_doc_id:
# print(f"Skipping document {current_doc_id} -> {normalized_doc_id} because it already has no query parameters")
continue
try:
# Update both database and Vespa in order
# Database first to ensure consistency
update_document_id_in_database(current_doc_id, normalized_doc_id)
update_document_id_in_database(
current_doc_id, normalized_doc_id, index_name
)
# For Vespa, we can now use the original document IDs since we're using contains matching
update_document_id_in_vespa(index_name, current_doc_id, normalized_doc_id)
updated_count += 1
# print(f"Finished updating document {current_doc_id} -> {normalized_doc_id}")
except Exception as e:
print(f"Failed to update document {current_doc_id}: {e}")
from httpx import HTTPStatusError
if isinstance(e, HTTPStatusError):
print(f"HTTPStatusError: {e}")