Compare commits

..

2 Commits

Author SHA1 Message Date
Dane Urban
4f6826ca0c . 2026-04-17 13:50:33 -07:00
Dane Urban
895c329c00 file saving 2026-04-17 11:53:55 -07:00
8 changed files with 433 additions and 0 deletions

View File

@@ -231,6 +231,8 @@ class DocumentBase(BaseModel):
# Set during docfetching after hierarchy nodes are cached
parent_hierarchy_node_id: int | None = None
file_id: str | None = None
def get_title_for_document_index(
self,
) -> str | None:
@@ -370,6 +372,7 @@ class Document(DocumentBase):
secondary_owners=base.secondary_owners,
title=base.title,
from_ingestion_api=base.from_ingestion_api,
file_id=base.file_id,
)
def __sizeof__(self) -> int:

View File

@@ -696,6 +696,7 @@ def upsert_documents(
else {}
),
doc_metadata=doc.doc_metadata,
file_id=doc.file_id,
)
)
for doc in seen_documents.values()
@@ -712,6 +713,7 @@ def upsert_documents(
"secondary_owners": insert_stmt.excluded.secondary_owners,
"doc_metadata": insert_stmt.excluded.doc_metadata,
"parent_hierarchy_node_id": insert_stmt.excluded.parent_hierarchy_node_id,
"file_id": insert_stmt.excluded.file_id,
}
if includes_permissions:
# Use COALESCE to preserve existing permissions when new values are NULL.

View File

@@ -62,6 +62,21 @@ def delete_filerecord_by_file_id(
db_session.query(FileRecord).filter_by(file_id=file_id).delete()
def update_filerecord_origin(
file_id: str,
from_origin: FileOrigin,
to_origin: FileOrigin,
db_session: Session,
) -> None:
"""Change a file_record's `file_origin`, filtered on the current origin
so the update is idempotent. Caller owns the commit.
"""
db_session.query(FileRecord).filter(
FileRecord.file_id == file_id,
FileRecord.file_origin == from_origin,
).update({FileRecord.file_origin: to_origin})
def upsert_filerecord(
file_id: str,
display_name: str,

View File

@@ -98,6 +98,9 @@ class DocumentMetadata:
# The resolved database ID of the parent hierarchy node (folder/container)
parent_hierarchy_node_id: int | None = None
# Opt-in pointer to the persisted raw file for this document (file_store id).
file_id: str | None = None
@dataclass
class VespaDocumentFields:

View File

@@ -2,7 +2,10 @@ from collections.abc import Callable
from typing import Any
from typing import IO
from sqlalchemy.orm import Session
from onyx.configs.constants import FileOrigin
from onyx.db.file_record import update_filerecord_origin
from onyx.file_store.file_store import get_default_file_store
from onyx.utils.logger import setup_logger
@@ -61,3 +64,19 @@ def build_raw_file_callback(
)
return _callback
def promote_staged_file(db_session: Session, file_id: str) -> None:
"""Mark a previously-staged file as `FileOrigin.CONNECTOR`.
Idempotent — the underlying update filters on the STAGING origin so
repeated calls no-op once the file has already been promoted or removed.
Caller owns the commit so promotion stays transactional with whatever
document-level bookkeeping the caller is doing.
"""
update_filerecord_origin(
file_id=file_id,
from_origin=FileOrigin.INDEXING_STAGING,
to_origin=FileOrigin.CONNECTOR,
db_session=db_session,
)

View File

@@ -49,6 +49,7 @@ from onyx.document_index.interfaces import DocumentMetadata
from onyx.document_index.interfaces import IndexBatchParams
from onyx.file_processing.image_summarization import summarize_image_with_error_handling
from onyx.file_store.file_store import get_default_file_store
from onyx.file_store.staging import promote_staged_file
from onyx.hooks.executor import execute_hook
from onyx.hooks.executor import HookSkipped
from onyx.hooks.executor import HookSoftFailed
@@ -154,6 +155,7 @@ def _upsert_documents_in_db(
doc_metadata=doc.doc_metadata,
# parent_hierarchy_node_id is resolved in docfetching using Redis cache
parent_hierarchy_node_id=doc.parent_hierarchy_node_id,
file_id=doc.file_id,
)
document_metadata_list.append(db_doc_metadata)
@@ -364,6 +366,39 @@ def index_doc_batch_with_handler(
return index_pipeline_result
def _apply_file_id_transitions(
documents: list[Document],
previous_file_ids: dict[str, str],
db_session: Session,
) -> None:
"""Finalize file_id lifecycle for the batch.
`document.file_id` is already written by `upsert_documents`. For each doc
whose file_id changed, promote the new staged file to `CONNECTOR` (so the
TTL janitor leaves it alone) and delete the replaced one. The delete is
best-effort; if it fails the janitor will reap the orphan.
"""
file_store = get_default_file_store()
for doc in documents:
new_file_id = doc.file_id
old_file_id = previous_file_ids.get(doc.id)
if new_file_id == old_file_id:
continue
if new_file_id is not None:
promote_staged_file(db_session=db_session, file_id=new_file_id)
if old_file_id is not None:
try:
file_store.delete_file(old_file_id, error_on_missing=False)
except Exception:
logger.exception(
f"Failed to delete replaced file_id={old_file_id}; "
"will be reaped by janitor."
)
def index_doc_batch_prepare(
documents: list[Document],
index_attempt_metadata: IndexAttemptMetadata,
@@ -382,6 +417,11 @@ def index_doc_batch_prepare(
document_ids=document_ids,
)
# Capture previous file_ids BEFORE any writes so we know what to reap.
previous_file_ids: dict[str, str] = {
db_doc.id: db_doc.file_id for db_doc in db_docs if db_doc.file_id is not None
}
updatable_docs = (
get_doc_ids_to_update(documents=documents, db_docs=db_docs)
if not ignore_time_skip
@@ -404,6 +444,11 @@ def index_doc_batch_prepare(
index_attempt_metadata=index_attempt_metadata,
db_session=db_session,
)
_apply_file_id_transitions(
documents=updatable_docs,
previous_file_ids=previous_file_ids,
db_session=db_session,
)
logger.info(
f"Upserted {len(updatable_docs)} changed docs out of {len(documents)} total docs into the DB"

View File

@@ -0,0 +1,346 @@
"""External dependency unit tests for `index_doc_batch_prepare`.
Validates the file_id lifecycle that runs alongside the document upsert:
* `document.file_id` is written on insert AND on conflict (upsert path)
* Newly-staged files get promoted from INDEXING_STAGING -> CONNECTOR
* Replaced files are deleted from both `file_record` and S3
* No-op when the file_id is unchanged
Uses real PostgreSQL + real S3/MinIO via the file store.
"""
from collections.abc import Generator
from io import BytesIO
from uuid import uuid4
import pytest
from sqlalchemy.orm import Session
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import FileOrigin
from onyx.connectors.models import Document
from onyx.connectors.models import IndexAttemptMetadata
from onyx.connectors.models import InputType
from onyx.connectors.models import TextSection
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.file_record import get_filerecord_by_file_id_optional
from onyx.db.models import Connector
from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import Credential
from onyx.db.models import Document as DBDocument
from onyx.db.models import FileRecord
from onyx.file_store.file_store import get_default_file_store
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_doc(doc_id: str, file_id: str | None = None) -> Document:
"""Minimal Document for indexing-pipeline tests. MOCK_CONNECTOR avoids
triggering the hierarchy-node linking branch (NOTION/CONFLUENCE only)."""
return Document(
id=doc_id,
source=DocumentSource.MOCK_CONNECTOR,
semantic_identifier=f"semantic-{doc_id}",
sections=[TextSection(text="content", link=None)],
metadata={},
file_id=file_id,
)
def _stage_file(content: bytes = b"raw bytes") -> str:
"""Write bytes to the file store as INDEXING_STAGING and return the file_id.
Mirrors what the connector raw_file_callback would do during fetch.
"""
return get_default_file_store().save_file(
content=BytesIO(content),
display_name=None,
file_origin=FileOrigin.INDEXING_STAGING,
file_type="application/octet-stream",
file_metadata={"test": True},
)
def _get_doc_row(db_session: Session, doc_id: str) -> DBDocument | None:
"""Reload the document row fresh from DB so we see post-upsert state."""
db_session.expire_all()
return db_session.query(DBDocument).filter(DBDocument.id == doc_id).one_or_none()
def _get_filerecord(db_session: Session, file_id: str) -> FileRecord | None:
db_session.expire_all()
return get_filerecord_by_file_id_optional(file_id=file_id, db_session=db_session)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def cc_pair(
db_session: Session,
tenant_context: None, # noqa: ARG001
initialize_file_store: None, # noqa: ARG001
) -> Generator[ConnectorCredentialPair, None, None]:
"""Create a connector + credential + cc_pair backing the index attempt."""
connector = Connector(
name=f"test-connector-{uuid4().hex[:8]}",
source=DocumentSource.MOCK_CONNECTOR,
input_type=InputType.LOAD_STATE,
connector_specific_config={},
refresh_freq=None,
prune_freq=None,
indexing_start=None,
)
db_session.add(connector)
db_session.flush()
credential = Credential(
source=DocumentSource.MOCK_CONNECTOR,
credential_json={},
)
db_session.add(credential)
db_session.flush()
pair = ConnectorCredentialPair(
connector_id=connector.id,
credential_id=credential.id,
name=f"test-cc-pair-{uuid4().hex[:8]}",
status=ConnectorCredentialPairStatus.ACTIVE,
access_type=AccessType.PUBLIC,
auto_sync_options=None,
)
db_session.add(pair)
db_session.commit()
db_session.refresh(pair)
yield pair
@pytest.fixture
def attempt_metadata(cc_pair: ConnectorCredentialPair) -> IndexAttemptMetadata:
return IndexAttemptMetadata(
connector_id=cc_pair.connector_id,
credential_id=cc_pair.credential_id,
attempt_id=None,
request_id="test-request",
)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestNewDocuments:
"""First-time inserts — no previous file_id to reconcile against."""
def test_new_doc_without_file_id(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=None)
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
row = _get_doc_row(db_session, doc.id)
assert row is not None
assert row.file_id is None
def test_new_doc_with_staged_file_id_promotes_to_connector(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
file_id = _stage_file()
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=file_id)
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
row = _get_doc_row(db_session, doc.id)
assert row is not None and row.file_id == file_id
record = _get_filerecord(db_session, file_id)
assert record is not None
assert record.file_origin == FileOrigin.CONNECTOR
class TestExistingDocuments:
"""Re-index path — a `document` row already exists with some file_id."""
def test_unchanged_file_id_is_noop(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
file_id = _stage_file()
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=file_id)
# First pass: inserts the row + promotes the file.
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
# Second pass with the same file_id — should not delete or re-promote.
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
record = _get_filerecord(db_session, file_id)
assert record is not None
assert record.file_origin == FileOrigin.CONNECTOR
row = _get_doc_row(db_session, doc.id)
assert row is not None and row.file_id == file_id
def test_swapping_file_id_promotes_new_and_deletes_old(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
old_file_id = _stage_file(content=b"old bytes")
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=old_file_id)
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
# Re-fetch produces a new staged file_id for the same doc.
new_file_id = _stage_file(content=b"new bytes")
doc_v2 = _make_doc(doc.id, file_id=new_file_id)
index_doc_batch_prepare(
documents=[doc_v2],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
row = _get_doc_row(db_session, doc.id)
assert row is not None and row.file_id == new_file_id
new_record = _get_filerecord(db_session, new_file_id)
assert new_record is not None
assert new_record.file_origin == FileOrigin.CONNECTOR
# Old file_record + S3 object are gone.
assert _get_filerecord(db_session, old_file_id) is None
def test_clearing_file_id_deletes_old_and_nulls_column(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
old_file_id = _stage_file()
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=old_file_id)
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
# Connector opts out on next run — yields the doc without a file_id.
doc_v2 = _make_doc(doc.id, file_id=None)
index_doc_batch_prepare(
documents=[doc_v2],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
row = _get_doc_row(db_session, doc.id)
assert row is not None and row.file_id is None
assert _get_filerecord(db_session, old_file_id) is None
class TestBatchHandling:
"""Mixed batches — multiple docs at different lifecycle states in one call."""
def test_mixed_batch_each_doc_handled_independently(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
# Pre-seed an existing doc with a file_id we'll swap.
existing_old_id = _stage_file(content=b"existing-old")
existing_doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=existing_old_id)
index_doc_batch_prepare(
documents=[existing_doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
# Now: swap the existing one, add a brand-new doc with file_id, and a
# brand-new doc without file_id.
swap_new_id = _stage_file(content=b"existing-new")
new_with_file_id = _stage_file(content=b"new-with-file")
existing_v2 = _make_doc(existing_doc.id, file_id=swap_new_id)
new_with = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=new_with_file_id)
new_without = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=None)
index_doc_batch_prepare(
documents=[existing_v2, new_with, new_without],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
# Existing doc was swapped: old file gone, new file promoted.
existing_row = _get_doc_row(db_session, existing_doc.id)
assert existing_row is not None and existing_row.file_id == swap_new_id
assert _get_filerecord(db_session, existing_old_id) is None
swap_record = _get_filerecord(db_session, swap_new_id)
assert swap_record is not None
assert swap_record.file_origin == FileOrigin.CONNECTOR
# New doc with file_id: row exists, file promoted.
new_with_row = _get_doc_row(db_session, new_with.id)
assert new_with_row is not None and new_with_row.file_id == new_with_file_id
new_with_record = _get_filerecord(db_session, new_with_file_id)
assert new_with_record is not None
assert new_with_record.file_origin == FileOrigin.CONNECTOR
# New doc without file_id: row exists, no file_record involvement.
new_without_row = _get_doc_row(db_session, new_without.id)
assert new_without_row is not None and new_without_row.file_id is None