mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-02-25 11:45:47 +00:00
Compare commits
6 Commits
ci_python_
...
v1.0.0-clo
| Author | SHA1 | Date | |
|---|---|---|---|
| | 406e5b9947 | | |
| | e9a5498a23 | | |
| | 5097cdbb6c | | |
| | 414419f849 | | |
| | 994d230019 | | |
| | 8fedcf30b3 | | |
@@ -9,14 +9,15 @@ Create Date: 2025-06-20 14:44:54.241159
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
|
||||
from httpx import HTTPStatusError
|
||||
import httpx
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.db.search_settings import SearchSettings
|
||||
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
|
||||
from onyx.document_index.vespa.shared_utils.utils import (
|
||||
replace_invalid_doc_id_characters,
|
||||
)
|
||||
from onyx.document_index.vespa_constants import SEARCH_ENDPOINT, DOCUMENT_ID_ENDPOINT
|
||||
from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
@@ -110,11 +111,13 @@ def get_google_drive_documents_from_database() -> list[dict]:
|
||||
return documents
|
||||
|
||||
|
||||
def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
def update_document_id_in_database(
|
||||
old_doc_id: str, new_doc_id: str, index_name: str
|
||||
) -> None:
|
||||
"""Update document IDs in all relevant database tables using copy-and-swap approach."""
|
||||
bind = op.get_bind()
|
||||
|
||||
logger.info(f"Updating database tables for document {old_doc_id} -> {new_doc_id}")
|
||||
# print(f"Updating database tables for document {old_doc_id} -> {new_doc_id}")
|
||||
|
||||
# Check if new document ID already exists
|
||||
result = bind.execute(
|
||||
@@ -123,9 +126,9 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
)
|
||||
row = result.fetchone()
|
||||
if row and row[0] > 0:
|
||||
raise RuntimeError(
|
||||
f"Document with ID {new_doc_id} already exists, cannot create duplicate"
|
||||
)
|
||||
# print(f"Document with ID {new_doc_id} already exists, deleting old one")
|
||||
delete_document_from_db(old_doc_id, index_name)
|
||||
return
|
||||
|
||||
# Step 1: Create a new document row with the new ID (copy all fields from old row)
|
||||
# Use a conservative approach to handle columns that might not exist in all installations
|
||||
@@ -147,6 +150,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
# print(f"Successfully updated database tables for document {old_doc_id} -> {new_doc_id}")
|
||||
except Exception as e:
|
||||
# If the full INSERT fails, try a more basic version with only core columns
|
||||
logger.warning(f"Full INSERT failed, trying basic version: {e}")
|
||||
@@ -173,15 +177,17 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
# print(f"Successfully updated document_by_connector_credential_pair table for document {old_doc_id} -> {new_doc_id}")
|
||||
|
||||
# Update search_doc table (stores search results for chat replay)
|
||||
# This is critical for agent functionality
|
||||
bind.execute(
|
||||
sa.text(
|
||||
"UPDATE search_doc SET document_id = :new_id WHERE document_id = :old_id"
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
|
||||
# print(f"Successfully updated search_doc table for document {old_doc_id} -> {new_doc_id}")
|
||||
# Update document_retrieval_feedback table (user feedback on documents)
|
||||
bind.execute(
|
||||
sa.text(
|
||||
@@ -189,7 +195,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
|
||||
# print(f"Successfully updated document_retrieval_feedback table for document {old_doc_id} -> {new_doc_id}")
|
||||
# Update document__tag table (document-tag relationships)
|
||||
bind.execute(
|
||||
sa.text(
|
||||
@@ -197,7 +203,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
|
||||
# print(f"Successfully updated document__tag table for document {old_doc_id} -> {new_doc_id}")
|
||||
# Update user_file table (user uploaded files linked to documents)
|
||||
bind.execute(
|
||||
sa.text(
|
||||
@@ -205,7 +211,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
|
||||
# print(f"Successfully updated user_file table for document {old_doc_id} -> {new_doc_id}")
|
||||
# Update KG and chunk_stats tables (these may not exist in all installations)
|
||||
try:
|
||||
# Update kg_entity table
|
||||
@@ -215,7 +221,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
|
||||
# print(f"Successfully updated kg_entity table for document {old_doc_id} -> {new_doc_id}")
|
||||
# Update kg_entity_extraction_staging table
|
||||
bind.execute(
|
||||
sa.text(
|
||||
@@ -223,7 +229,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
|
||||
# print(f"Successfully updated kg_entity_extraction_staging table for document {old_doc_id} -> {new_doc_id}")
|
||||
# Update kg_relationship table
|
||||
bind.execute(
|
||||
sa.text(
|
||||
@@ -231,7 +237,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
|
||||
# print(f"Successfully updated kg_relationship table for document {old_doc_id} -> {new_doc_id}")
|
||||
# Update kg_relationship_extraction_staging table
|
||||
bind.execute(
|
||||
sa.text(
|
||||
@@ -239,7 +245,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
|
||||
# print(f"Successfully updated kg_relationship_extraction_staging table for document {old_doc_id} -> {new_doc_id}")
|
||||
# Update chunk_stats table
|
||||
bind.execute(
|
||||
sa.text(
|
||||
@@ -247,7 +253,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
),
|
||||
{"new_id": new_doc_id, "old_id": old_doc_id},
|
||||
)
|
||||
|
||||
# print(f"Successfully updated chunk_stats table for document {old_doc_id} -> {new_doc_id}")
|
||||
# Update chunk_stats ID field which includes document_id
|
||||
bind.execute(
|
||||
sa.text(
|
||||
@@ -263,7 +269,7 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
"old_id_pattern": f"{old_doc_id}__%",
|
||||
},
|
||||
)
|
||||
|
||||
# print(f"Successfully updated chunk_stats ID field for document {old_doc_id} -> {new_doc_id}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Some KG/chunk tables may not exist or failed to update: {e}")
|
||||
|
||||
@@ -271,106 +277,108 @@ def update_document_id_in_database(old_doc_id: str, new_doc_id: str) -> None:
|
||||
bind.execute(
|
||||
sa.text("DELETE FROM document WHERE id = :old_id"), {"old_id": old_doc_id}
|
||||
)
|
||||
# print(f"Successfully deleted document {old_doc_id} from database")
|
||||
|
||||
|
||||
def _visit_chunks(
    *,
    http_client: httpx.Client,
    index_name: str,
    selection: str,
    continuation: str | None = None,
    wanted_document_count: int = 1000,
) -> tuple[list[dict], str | None]:
    """Call the /document/v1 visit API once and return ``(docs, next_token)``.

    Args:
        http_client: Client used to issue the GET request.
        index_name: Substituted into ``DOCUMENT_ID_ENDPOINT`` to build the URL.
        selection: Document selection expression sent as the ``selection``
            query parameter.
        continuation: Token returned by a previous call. It is only sent when
            truthy, so ``None`` (or an empty string) starts a fresh visit.
        wanted_document_count: Value sent as ``wantedDocumentCount``. The
            default of 1000 matches the previously hard-coded page size.

    Returns:
        A tuple of the payload's ``documents`` list (an empty list when the
        key is absent) and its ``continuation`` value (``None`` when absent).

    Raises:
        httpx.HTTPStatusError: If the response has an error status code.
    """
    # Same URL as the document API; only the query parameters are visit-specific.
    visit_url = DOCUMENT_ID_ENDPOINT.format(index_name=index_name)

    query: dict[str, str] = {
        "selection": selection,
        "wantedDocumentCount": str(wanted_document_count),
    }
    if continuation:
        query["continuation"] = continuation

    # timeout=None disables the client-side timeout for this request.
    response = http_client.get(visit_url, params=query, timeout=None)
    response.raise_for_status()

    body = response.json()
    # NOTE(review): a page can presumably be empty while a continuation token
    # is still returned; callers should page on the token - confirm with Vespa docs.
    return body.get("documents", []), body.get("continuation")
|
||||
|
||||
|
||||
def delete_document_chunks_from_vespa(index_name: str, doc_id: str) -> None:
|
||||
"""Delete all chunks for a document from Vespa using pagination."""
|
||||
offset = 0
|
||||
limit = 400 # Vespa's maximum hits per query
|
||||
"""Delete all chunks for *doc_id* from Vespa using continuation-token paging (no offset)."""
|
||||
|
||||
total_deleted = 0
|
||||
# Use exact match instead of contains - Document Selector Language doesn't support contains
|
||||
selection = f'{index_name}.document_id=="{doc_id}"'
|
||||
|
||||
with get_vespa_http_client() as http_client:
|
||||
continuation: str | None = None
|
||||
while True:
|
||||
# Use pagination to handle the 400 hit limit
|
||||
yql = f'select documentid, document_id, chunk_id from sources {index_name} where document_id contains "{doc_id}"'
|
||||
docs, continuation = _visit_chunks(
|
||||
http_client=http_client,
|
||||
index_name=index_name,
|
||||
selection=selection,
|
||||
continuation=continuation,
|
||||
)
|
||||
|
||||
params = {
|
||||
"yql": yql,
|
||||
"hits": str(limit),
|
||||
"offset": str(offset),
|
||||
"timeout": "30s",
|
||||
"format": "json",
|
||||
}
|
||||
if not docs:
|
||||
break
|
||||
|
||||
response = http_client.get(SEARCH_ENDPOINT, params=params, timeout=None)
|
||||
response.raise_for_status()
|
||||
|
||||
search_result = response.json()
|
||||
hits = search_result.get("root", {}).get("children", [])
|
||||
|
||||
if not hits:
|
||||
break # No more chunks to process
|
||||
|
||||
# Delete each chunk in this batch
|
||||
for hit in hits:
|
||||
vespa_doc_id = hit.get("id") # This is the internal Vespa document ID
|
||||
if not vespa_doc_id:
|
||||
print(f"No Vespa document ID found for chunk {hit}")
|
||||
for doc in docs:
|
||||
vespa_full_id = doc.get("id")
|
||||
if not vespa_full_id:
|
||||
continue
|
||||
vespa_doc_id = vespa_doc_id.split("::")[-1] # get the UUID from the end
|
||||
|
||||
# Delete the chunk using the internal Vespa document ID
|
||||
delete_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_id}"
|
||||
vespa_doc_uuid = vespa_full_id.split("::")[-1]
|
||||
delete_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_uuid}"
|
||||
|
||||
try:
|
||||
resp = http_client.delete(delete_url)
|
||||
resp.raise_for_status()
|
||||
total_deleted += 1
|
||||
except Exception as e:
|
||||
print(f"Failed to delete chunk {vespa_doc_id}: {e}")
|
||||
# Continue trying to delete other chunks even if one fails
|
||||
continue
|
||||
print(f"Failed to delete chunk {vespa_doc_uuid}: {e}")
|
||||
|
||||
# Move to next batch
|
||||
offset += limit
|
||||
|
||||
# If we got fewer hits than the limit, we're done
|
||||
if len(hits) < limit:
|
||||
if not continuation:
|
||||
break
|
||||
|
||||
|
||||
def update_document_id_in_vespa(
|
||||
index_name: str, old_doc_id: str, new_doc_id: str
|
||||
) -> None:
|
||||
"""Update a document's ID in Vespa by updating the document_id field."""
|
||||
# Clean the new document ID for storage in Vespa (this handles invalid characters)
|
||||
"""Update all chunks' document_id field from *old_doc_id* to *new_doc_id* using continuation paging."""
|
||||
|
||||
clean_new_doc_id = replace_invalid_doc_id_characters(new_doc_id)
|
||||
|
||||
offset = 0
|
||||
limit = 400 # Vespa's maximum hits per query
|
||||
total_updated = 0
|
||||
# Use exact match instead of contains - Document Selector Language doesn't support contains
|
||||
selection = f'{index_name}.document_id=="{old_doc_id}"'
|
||||
|
||||
with get_vespa_http_client() as http_client:
|
||||
continuation: str | None = None
|
||||
while True:
|
||||
# Use pagination to handle the 400 hit limit
|
||||
yql = f'select documentid, document_id, chunk_id from sources {index_name} where document_id contains "{old_doc_id}"'
|
||||
# print(f"Visiting chunks for document {old_doc_id} -> {new_doc_id}")
|
||||
docs, continuation = _visit_chunks(
|
||||
http_client=http_client,
|
||||
index_name=index_name,
|
||||
selection=selection,
|
||||
continuation=continuation,
|
||||
)
|
||||
|
||||
params = {
|
||||
"yql": yql,
|
||||
"hits": str(limit),
|
||||
"offset": str(offset),
|
||||
"timeout": "30s",
|
||||
"format": "json",
|
||||
}
|
||||
if not docs:
|
||||
break
|
||||
|
||||
response = http_client.get(SEARCH_ENDPOINT, params=params, timeout=None)
|
||||
response.raise_for_status()
|
||||
|
||||
search_result = response.json()
|
||||
hits = search_result.get("root", {}).get("children", [])
|
||||
|
||||
if not hits:
|
||||
break # No more chunks to process
|
||||
|
||||
# Update each chunk in this batch
|
||||
for hit in hits:
|
||||
vespa_doc_id = hit.get("id") # This is the internal Vespa document ID
|
||||
if not vespa_doc_id:
|
||||
print(f"No Vespa document ID found for chunk {hit}")
|
||||
for doc in docs:
|
||||
vespa_full_id = doc.get("id")
|
||||
if not vespa_full_id:
|
||||
continue
|
||||
vespa_doc_id = vespa_doc_id.split("::")[-1] # get the UUID from the end
|
||||
|
||||
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_id}"
|
||||
vespa_doc_uuid = vespa_full_id.split("::")[-1]
|
||||
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_doc_uuid}"
|
||||
|
||||
update_request = {
|
||||
"fields": {"document_id": {"assign": clean_new_doc_id}}
|
||||
}
|
||||
@@ -378,16 +386,11 @@ def update_document_id_in_vespa(
|
||||
try:
|
||||
resp = http_client.put(vespa_url, json=update_request)
|
||||
resp.raise_for_status()
|
||||
total_updated += 1
|
||||
except Exception as e:
|
||||
print(f"Failed to update chunk {vespa_doc_id}: {e}")
|
||||
print(f"Failed to update chunk {vespa_doc_uuid}: {e}")
|
||||
raise
|
||||
|
||||
# Move to next batch
|
||||
offset += limit
|
||||
|
||||
# If we got fewer hits than the limit, we're done
|
||||
if len(hits) < limit:
|
||||
if not continuation:
|
||||
break
|
||||
|
||||
|
||||
@@ -396,6 +399,39 @@ def delete_document_from_db(current_doc_id: str, index_name: str) -> None:
|
||||
try:
|
||||
bind = op.get_bind()
|
||||
|
||||
# Delete from agent-related tables first (order matters due to foreign keys)
|
||||
# Delete from agent__sub_query__search_doc first since it references search_doc
|
||||
bind.execute(
|
||||
sa.text(
|
||||
"""
|
||||
DELETE FROM agent__sub_query__search_doc
|
||||
WHERE search_doc_id IN (
|
||||
SELECT id FROM search_doc WHERE document_id = :doc_id
|
||||
)
|
||||
"""
|
||||
),
|
||||
{"doc_id": current_doc_id},
|
||||
)
|
||||
|
||||
# Delete from chat_message__search_doc
|
||||
bind.execute(
|
||||
sa.text(
|
||||
"""
|
||||
DELETE FROM chat_message__search_doc
|
||||
WHERE search_doc_id IN (
|
||||
SELECT id FROM search_doc WHERE document_id = :doc_id
|
||||
)
|
||||
"""
|
||||
),
|
||||
{"doc_id": current_doc_id},
|
||||
)
|
||||
|
||||
# Now we can safely delete from search_doc
|
||||
bind.execute(
|
||||
sa.text("DELETE FROM search_doc WHERE document_id = :doc_id"),
|
||||
{"doc_id": current_doc_id},
|
||||
)
|
||||
|
||||
# Delete from document_by_connector_credential_pair
|
||||
bind.execute(
|
||||
sa.text(
|
||||
@@ -405,11 +441,6 @@ def delete_document_from_db(current_doc_id: str, index_name: str) -> None:
|
||||
)
|
||||
|
||||
# Delete from other tables that reference this document
|
||||
bind.execute(
|
||||
sa.text("DELETE FROM search_doc WHERE document_id = :doc_id"),
|
||||
{"doc_id": current_doc_id},
|
||||
)
|
||||
|
||||
bind.execute(
|
||||
sa.text(
|
||||
"DELETE FROM document_retrieval_feedback WHERE document_id = :doc_id"
|
||||
@@ -510,8 +541,10 @@ def upgrade() -> None:
|
||||
current_doc_id = doc_info["document_id"]
|
||||
normalized_doc_id = normalize_google_drive_url(current_doc_id)
|
||||
|
||||
print(f"Processing document {current_doc_id} -> {normalized_doc_id}")
|
||||
# Check for duplicates
|
||||
if normalized_doc_id in all_normalized_doc_ids:
|
||||
# print(f"Deleting duplicate document {current_doc_id}")
|
||||
delete_document_from_db(current_doc_id, index_name)
|
||||
continue
|
||||
|
||||
@@ -519,19 +552,22 @@ def upgrade() -> None:
|
||||
|
||||
# If the document ID already doesn't have query parameters, skip it
|
||||
if current_doc_id == normalized_doc_id:
|
||||
# print(f"Skipping document {current_doc_id} -> {normalized_doc_id} because it already has no query parameters")
|
||||
continue
|
||||
|
||||
try:
|
||||
# Update both database and Vespa in order
|
||||
# Database first to ensure consistency
|
||||
update_document_id_in_database(current_doc_id, normalized_doc_id)
|
||||
update_document_id_in_database(
|
||||
current_doc_id, normalized_doc_id, index_name
|
||||
)
|
||||
|
||||
# For Vespa, pass the original document ID: chunks are selected by an exact match on document_id
|
||||
update_document_id_in_vespa(index_name, current_doc_id, normalized_doc_id)
|
||||
updated_count += 1
|
||||
# print(f"Finished updating document {current_doc_id} -> {normalized_doc_id}")
|
||||
except Exception as e:
|
||||
print(f"Failed to update document {current_doc_id}: {e}")
|
||||
from httpx import HTTPStatusError
|
||||
|
||||
if isinstance(e, HTTPStatusError):
|
||||
print(f"HTTPStatusError: {e}")
|
||||
|
||||
Reference in New Issue
Block a user