mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-21 01:16:45 +00:00
Compare commits
4 Commits
edge
...
nikg/captc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9eb6eb080b | ||
|
|
6856a7bedd | ||
|
|
ea807a6db3 | ||
|
|
35263d9456 |
@@ -76,24 +76,34 @@ async def verify_captcha_token(
|
||||
f"Captcha verification failed: {', '.join(error_codes)}"
|
||||
)
|
||||
|
||||
# For reCAPTCHA v3, also check the score
|
||||
if result.score is not None:
|
||||
if result.score < RECAPTCHA_SCORE_THRESHOLD:
|
||||
logger.warning(
|
||||
f"Captcha score too low: {result.score} < {RECAPTCHA_SCORE_THRESHOLD}"
|
||||
)
|
||||
raise CaptchaVerificationError(
|
||||
"Captcha verification failed: suspicious activity detected"
|
||||
)
|
||||
# Require v3 score. Google's public test secret returns no score
|
||||
# — that path must not be active in prod since it skips the only
|
||||
# human-vs-bot signal. A missing score here means captcha is
|
||||
# misconfigured (test secret in prod, or a v2 response slipped in
|
||||
# via an action mismatch).
|
||||
if result.score is None:
|
||||
logger.warning(
|
||||
"Captcha verification failed: siteverify returned no score (likely test secret in prod)"
|
||||
)
|
||||
raise CaptchaVerificationError(
|
||||
"Captcha verification failed: missing score"
|
||||
)
|
||||
|
||||
# Optionally verify the action matches
|
||||
if result.action and result.action != expected_action:
|
||||
logger.warning(
|
||||
f"Captcha action mismatch: {result.action} != {expected_action}"
|
||||
)
|
||||
raise CaptchaVerificationError(
|
||||
"Captcha verification failed: action mismatch"
|
||||
)
|
||||
if result.score < RECAPTCHA_SCORE_THRESHOLD:
|
||||
logger.warning(
|
||||
f"Captcha score too low: {result.score} < {RECAPTCHA_SCORE_THRESHOLD}"
|
||||
)
|
||||
raise CaptchaVerificationError(
|
||||
"Captcha verification failed: suspicious activity detected"
|
||||
)
|
||||
|
||||
if result.action and result.action != expected_action:
|
||||
logger.warning(
|
||||
f"Captcha action mismatch: {result.action} != {expected_action}"
|
||||
)
|
||||
raise CaptchaVerificationError(
|
||||
"Captcha verification failed: action mismatch"
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"Captcha verification passed: score={result.score}, action={result.action}"
|
||||
|
||||
@@ -60,7 +60,9 @@ from onyx.configs.constants import DEFAULT_PERSONA_ID
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.configs.constants import MessageType
|
||||
from onyx.configs.constants import MilestoneRecordType
|
||||
from onyx.configs.llm_configs import get_image_extraction_and_analysis_enabled
|
||||
from onyx.context.search.models import BaseFilters
|
||||
from onyx.context.search.models import IndexFilters
|
||||
from onyx.context.search.models import SearchDoc
|
||||
from onyx.db.chat import create_new_chat_message
|
||||
from onyx.db.chat import get_chat_session_by_id
|
||||
@@ -74,12 +76,17 @@ from onyx.db.models import Persona
|
||||
from onyx.db.models import User
|
||||
from onyx.db.models import UserFile
|
||||
from onyx.db.projects import get_user_files_from_project
|
||||
from onyx.db.search_settings import get_active_search_settings
|
||||
from onyx.db.tools import get_tools
|
||||
from onyx.deep_research.dr_loop import run_deep_research_llm_loop
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.document_index.interfaces import DocumentIndex
|
||||
from onyx.document_index.interfaces import VespaChunkRequest
|
||||
from onyx.error_handling.error_codes import OnyxErrorCode
|
||||
from onyx.error_handling.exceptions import log_onyx_error
|
||||
from onyx.error_handling.exceptions import OnyxError
|
||||
from onyx.file_processing.extract_file_text import extract_file_text
|
||||
from onyx.file_processing.extract_file_text import extract_text_and_images
|
||||
from onyx.file_store.models import ChatFileType
|
||||
from onyx.file_store.models import InMemoryChatFile
|
||||
from onyx.file_store.utils import load_in_memory_chat_files
|
||||
@@ -122,6 +129,7 @@ from onyx.tools.tool_constructor import SearchToolConfig
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.telemetry import mt_cloud_telemetry
|
||||
from onyx.utils.timing import log_function_time
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
from shared_configs.contextvars import get_current_tenant_id
|
||||
|
||||
logger = setup_logger()
|
||||
@@ -244,22 +252,106 @@ def _empty_extracted_context_files() -> ExtractedContextFiles:
|
||||
)
|
||||
|
||||
|
||||
def _extract_text_from_in_memory_file(f: InMemoryChatFile) -> str | None:
|
||||
def _fetch_cached_image_captions(
|
||||
user_file: UserFile | None,
|
||||
document_index: DocumentIndex | None,
|
||||
) -> list[str]:
|
||||
"""Read image-caption chunks for a user file from the document index.
|
||||
|
||||
During indexing, embedded images are summarized via a vision LLM and
|
||||
those summaries are stored as chunks whose `image_file_id` is set. Reading
|
||||
them back at chat time avoids re-running vision-LLM calls per turn.
|
||||
Returns an empty list if the index has no chunks yet (e.g. indexing is
|
||||
still in flight) or on any fetch failure.
|
||||
"""
|
||||
if user_file is None or document_index is None:
|
||||
return []
|
||||
try:
|
||||
chunks = document_index.id_based_retrieval(
|
||||
chunk_requests=[VespaChunkRequest(document_id=str(user_file.id))],
|
||||
filters=IndexFilters(
|
||||
access_control_list=None,
|
||||
tenant_id=get_current_tenant_id() if MULTI_TENANT else None,
|
||||
),
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
f"Failed to fetch cached captions for user_file {user_file.id}",
|
||||
exc_info=True,
|
||||
)
|
||||
return []
|
||||
|
||||
# An image can be spread across multiple chunks; combine by image_file_id
|
||||
# so a single caption appears once in the context.
|
||||
combined: dict[str, list[str]] = {}
|
||||
for chunk in chunks:
|
||||
if chunk.image_file_id and chunk.content:
|
||||
combined.setdefault(chunk.image_file_id, []).append(chunk.content)
|
||||
return [
|
||||
f"[Image — {image_file_id}]\n" + "\n".join(contents)
|
||||
for image_file_id, contents in combined.items()
|
||||
]
|
||||
|
||||
|
||||
def _extract_text_from_in_memory_file(
|
||||
f: InMemoryChatFile,
|
||||
user_file: UserFile | None = None,
|
||||
document_index: DocumentIndex | None = None,
|
||||
) -> str | None:
|
||||
"""Extract text content from an InMemoryChatFile.
|
||||
|
||||
PLAIN_TEXT: the content is pre-extracted UTF-8 plaintext stored during
|
||||
ingestion — decode directly.
|
||||
DOC / CSV / other text types: the content is the original file bytes —
|
||||
use extract_file_text which handles encoding detection and format parsing.
|
||||
When image extraction is enabled and the file has embedded images, cached
|
||||
captions are pulled from the document index and appended to the text.
|
||||
The index fetch is skipped for files with no embedded images. We do not
|
||||
re-summarize images inline here — this path is hot and the indexing
|
||||
pipeline writes chunks atomically, so a missed caption means the file
|
||||
is mid-indexing and will be picked up on the next turn.
|
||||
"""
|
||||
try:
|
||||
if f.file_type == ChatFileType.PLAIN_TEXT:
|
||||
return f.content.decode("utf-8", errors="ignore").replace("\x00", "")
|
||||
return extract_file_text(
|
||||
|
||||
filename = f.filename or ""
|
||||
if not get_image_extraction_and_analysis_enabled():
|
||||
return extract_file_text(
|
||||
file=io.BytesIO(f.content),
|
||||
file_name=filename,
|
||||
break_on_unprocessable=False,
|
||||
)
|
||||
|
||||
extraction = extract_text_and_images(
|
||||
file=io.BytesIO(f.content),
|
||||
file_name=f.filename or "",
|
||||
break_on_unprocessable=False,
|
||||
file_name=filename,
|
||||
)
|
||||
text = extraction.text_content
|
||||
has_text = bool(text.strip())
|
||||
has_images = bool(extraction.embedded_images)
|
||||
|
||||
if not has_text and not has_images:
|
||||
# extract_text_and_images has no is_text_file() fallback for
|
||||
# unknown extensions (.py/.rs/.md without a dedicated handler).
|
||||
# Defer to the legacy path so those files remain readable.
|
||||
return extract_file_text(
|
||||
file=io.BytesIO(f.content),
|
||||
file_name=filename,
|
||||
break_on_unprocessable=False,
|
||||
)
|
||||
|
||||
if not has_images:
|
||||
return text if has_text else None
|
||||
|
||||
cached_captions = _fetch_cached_image_captions(user_file, document_index)
|
||||
|
||||
parts: list[str] = []
|
||||
if has_text:
|
||||
parts.append(text)
|
||||
parts.extend(cached_captions)
|
||||
|
||||
return "\n\n".join(parts).strip() or None
|
||||
except Exception:
|
||||
logger.warning(f"Failed to extract text from file {f.file_id}", exc_info=True)
|
||||
return None
|
||||
@@ -341,6 +433,23 @@ def extract_context_files(
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
# The document index is used at chat time to read cached image captions
|
||||
# (produced during indexing) so vision-LLM calls don't re-run per turn.
|
||||
document_index: DocumentIndex | None = None
|
||||
if not DISABLE_VECTOR_DB and get_image_extraction_and_analysis_enabled():
|
||||
try:
|
||||
active_search_settings = get_active_search_settings(db_session)
|
||||
document_index = get_default_document_index(
|
||||
search_settings=active_search_settings.primary,
|
||||
secondary_search_settings=None,
|
||||
db_session=db_session,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to construct document index for caption lookup",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
file_texts: list[str] = []
|
||||
image_files: list[ChatLoadedFile] = []
|
||||
file_metadata: list[ContextFileMetadata] = []
|
||||
@@ -361,7 +470,9 @@ def extract_context_files(
|
||||
continue
|
||||
tool_metadata.append(_build_tool_metadata(uf))
|
||||
elif f.file_type.is_text_file():
|
||||
text_content = _extract_text_from_in_memory_file(f)
|
||||
text_content = _extract_text_from_in_memory_file(
|
||||
f, user_file=uf, document_index=document_index
|
||||
)
|
||||
if not text_content:
|
||||
continue
|
||||
if not uf:
|
||||
|
||||
@@ -3,7 +3,14 @@ from onyx.server.settings.store import load_settings
|
||||
|
||||
|
||||
def get_image_extraction_and_analysis_enabled() -> bool:
|
||||
"""Get image extraction and analysis enabled setting from workspace settings or fallback to False"""
|
||||
"""Return the workspace setting for image extraction/analysis.
|
||||
|
||||
The pydantic `Settings` model defaults this field to True, so production
|
||||
tenants get the feature on by default on first read. The fallback here
|
||||
stays False so environments where settings cannot be loaded at all
|
||||
(e.g. unit tests with no DB/Redis) don't trigger downstream vision-LLM
|
||||
code paths that assume the DB is reachable.
|
||||
"""
|
||||
try:
|
||||
settings = load_settings()
|
||||
if settings.image_extraction_and_analysis_enabled is not None:
|
||||
|
||||
@@ -231,6 +231,8 @@ class DocumentBase(BaseModel):
|
||||
# Set during docfetching after hierarchy nodes are cached
|
||||
parent_hierarchy_node_id: int | None = None
|
||||
|
||||
file_id: str | None = None
|
||||
|
||||
def get_title_for_document_index(
|
||||
self,
|
||||
) -> str | None:
|
||||
@@ -370,6 +372,7 @@ class Document(DocumentBase):
|
||||
secondary_owners=base.secondary_owners,
|
||||
title=base.title,
|
||||
from_ingestion_api=base.from_ingestion_api,
|
||||
file_id=base.file_id,
|
||||
)
|
||||
|
||||
def __sizeof__(self) -> int:
|
||||
|
||||
@@ -75,6 +75,8 @@ from onyx.file_processing.file_types import OnyxMimeTypes
|
||||
from onyx.file_processing.image_utils import store_image_and_create_section
|
||||
from onyx.utils.b64 import get_image_type_from_bytes
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.url import SSRFException
|
||||
from onyx.utils.url import validate_outbound_http_url
|
||||
|
||||
logger = setup_logger()
|
||||
SLIM_BATCH_SIZE = 1000
|
||||
@@ -981,6 +983,42 @@ class SharepointConnector(
|
||||
raise ConnectorValidationError(
|
||||
"Site URLs must be full Sharepoint URLs (e.g. https://your-tenant.sharepoint.com/sites/your-site or https://your-tenant.sharepoint.com/teams/your-team)"
|
||||
)
|
||||
try:
|
||||
validate_outbound_http_url(site_url, https_only=True)
|
||||
except (SSRFException, ValueError) as e:
|
||||
raise ConnectorValidationError(
|
||||
f"Invalid site URL '{site_url}': {e}"
|
||||
) from e
|
||||
|
||||
# Probe RoleAssignments permission — required for permission sync.
|
||||
# Only runs when credentials have been loaded.
|
||||
if self.msal_app and self.sp_tenant_domain and self.sites:
|
||||
try:
|
||||
token_response = acquire_token_for_rest(
|
||||
self.msal_app,
|
||||
self.sp_tenant_domain,
|
||||
self.sharepoint_domain_suffix,
|
||||
)
|
||||
probe_url = (
|
||||
f"{self.sites[0].rstrip('/')}/_api/web/roleassignments?$top=1"
|
||||
)
|
||||
resp = requests.get(
|
||||
probe_url,
|
||||
headers={"Authorization": f"Bearer {token_response.accessToken}"},
|
||||
timeout=10,
|
||||
)
|
||||
if resp.status_code in (401, 403):
|
||||
raise ConnectorValidationError(
|
||||
"The Azure AD app registration is missing the required SharePoint permission "
|
||||
"to read role assignments. Please grant 'Sites.FullControl.All' "
|
||||
"(application permission) in the Azure portal and re-run admin consent."
|
||||
)
|
||||
except ConnectorValidationError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"RoleAssignments permission probe failed (non-blocking): {e}"
|
||||
)
|
||||
|
||||
def _extract_tenant_domain_from_sites(self) -> str | None:
|
||||
"""Extract the tenant domain from configured site URLs.
|
||||
@@ -1876,16 +1914,22 @@ class SharepointConnector(
|
||||
logger.debug(
|
||||
f"Processing site page: {site_page.get('webUrl', site_page.get('name', 'Unknown'))}"
|
||||
)
|
||||
ctx = self._create_rest_client_context(site_descriptor.url)
|
||||
doc_batch.append(
|
||||
_convert_sitepage_to_slim_document(
|
||||
site_page,
|
||||
ctx,
|
||||
self.graph_client,
|
||||
parent_hierarchy_raw_node_id=site_descriptor.url,
|
||||
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
|
||||
try:
|
||||
ctx = self._create_rest_client_context(site_descriptor.url)
|
||||
doc_batch.append(
|
||||
_convert_sitepage_to_slim_document(
|
||||
site_page,
|
||||
ctx,
|
||||
self.graph_client,
|
||||
parent_hierarchy_raw_node_id=site_descriptor.url,
|
||||
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to process site page "
|
||||
f"{site_page.get('webUrl', site_page.get('name', 'Unknown'))}: {e}"
|
||||
)
|
||||
)
|
||||
if len(doc_batch) >= SLIM_BATCH_SIZE:
|
||||
yield doc_batch
|
||||
doc_batch = []
|
||||
|
||||
@@ -696,6 +696,7 @@ def upsert_documents(
|
||||
else {}
|
||||
),
|
||||
doc_metadata=doc.doc_metadata,
|
||||
file_id=doc.file_id,
|
||||
)
|
||||
)
|
||||
for doc in seen_documents.values()
|
||||
@@ -712,6 +713,7 @@ def upsert_documents(
|
||||
"secondary_owners": insert_stmt.excluded.secondary_owners,
|
||||
"doc_metadata": insert_stmt.excluded.doc_metadata,
|
||||
"parent_hierarchy_node_id": insert_stmt.excluded.parent_hierarchy_node_id,
|
||||
"file_id": insert_stmt.excluded.file_id,
|
||||
}
|
||||
if includes_permissions:
|
||||
# Use COALESCE to preserve existing permissions when new values are NULL.
|
||||
|
||||
@@ -62,6 +62,21 @@ def delete_filerecord_by_file_id(
|
||||
db_session.query(FileRecord).filter_by(file_id=file_id).delete()
|
||||
|
||||
|
||||
def update_filerecord_origin(
|
||||
file_id: str,
|
||||
from_origin: FileOrigin,
|
||||
to_origin: FileOrigin,
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
"""Change a file_record's `file_origin`, filtered on the current origin
|
||||
so the update is idempotent. Caller owns the commit.
|
||||
"""
|
||||
db_session.query(FileRecord).filter(
|
||||
FileRecord.file_id == file_id,
|
||||
FileRecord.file_origin == from_origin,
|
||||
).update({FileRecord.file_origin: to_origin})
|
||||
|
||||
|
||||
def upsert_filerecord(
|
||||
file_id: str,
|
||||
display_name: str,
|
||||
|
||||
@@ -98,6 +98,9 @@ class DocumentMetadata:
|
||||
# The resolved database ID of the parent hierarchy node (folder/container)
|
||||
parent_hierarchy_node_id: int | None = None
|
||||
|
||||
# Opt-in pointer to the persisted raw file for this document (file_store id).
|
||||
file_id: str | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class VespaDocumentFields:
|
||||
|
||||
@@ -368,6 +368,40 @@ def extract_docx_images(docx_bytes: IO[Any]) -> Iterator[tuple[bytes, str]]:
|
||||
logger.exception("Failed to extract all docx images")
|
||||
|
||||
|
||||
def count_docx_embedded_images(file: IO[Any], cap: int) -> int:
|
||||
"""Return the number of embedded images in a docx, short-circuiting at cap+1.
|
||||
|
||||
Mirrors count_pdf_embedded_images so upload validation can apply the same
|
||||
per-file/per-batch caps. Returns a value > cap once the count exceeds the
|
||||
cap so callers do not iterate every media entry just to report a number.
|
||||
Always restores the file pointer to its original position before returning.
|
||||
"""
|
||||
try:
|
||||
start_pos = file.tell()
|
||||
except Exception:
|
||||
start_pos = None
|
||||
try:
|
||||
if start_pos is not None:
|
||||
file.seek(0)
|
||||
count = 0
|
||||
with zipfile.ZipFile(file) as z:
|
||||
for name in z.namelist():
|
||||
if name.startswith("word/media/"):
|
||||
count += 1
|
||||
if count > cap:
|
||||
return count
|
||||
return count
|
||||
except Exception:
|
||||
logger.warning("Failed to count embedded images in docx", exc_info=True)
|
||||
return 0
|
||||
finally:
|
||||
if start_pos is not None:
|
||||
try:
|
||||
file.seek(start_pos)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def read_docx_file(
|
||||
file: IO[Any],
|
||||
file_name: str = "",
|
||||
|
||||
@@ -2,7 +2,10 @@ from collections.abc import Callable
|
||||
from typing import Any
|
||||
from typing import IO
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.db.file_record import update_filerecord_origin
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
@@ -61,3 +64,13 @@ def build_raw_file_callback(
|
||||
)
|
||||
|
||||
return _callback
|
||||
|
||||
|
||||
def promote_staged_file(db_session: Session, file_id: str) -> None:
|
||||
"""Mark a previously-staged file as `FileOrigin.CONNECTOR`."""
|
||||
update_filerecord_origin(
|
||||
file_id=file_id,
|
||||
from_origin=FileOrigin.INDEXING_STAGING,
|
||||
to_origin=FileOrigin.CONNECTOR,
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
@@ -30,6 +30,7 @@ from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import IndexAttemptMetadata
|
||||
from onyx.connectors.models import IndexingDocument
|
||||
from onyx.connectors.models import Section
|
||||
from onyx.connectors.models import SectionType
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.db.document import get_documents_by_ids
|
||||
from onyx.db.document import upsert_document_by_connector_credential_pair
|
||||
@@ -49,6 +50,7 @@ from onyx.document_index.interfaces import DocumentMetadata
|
||||
from onyx.document_index.interfaces import IndexBatchParams
|
||||
from onyx.file_processing.image_summarization import summarize_image_with_error_handling
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.file_store.staging import promote_staged_file
|
||||
from onyx.hooks.executor import execute_hook
|
||||
from onyx.hooks.executor import HookSkipped
|
||||
from onyx.hooks.executor import HookSoftFailed
|
||||
@@ -154,6 +156,7 @@ def _upsert_documents_in_db(
|
||||
doc_metadata=doc.doc_metadata,
|
||||
# parent_hierarchy_node_id is resolved in docfetching using Redis cache
|
||||
parent_hierarchy_node_id=doc.parent_hierarchy_node_id,
|
||||
file_id=doc.file_id,
|
||||
)
|
||||
document_metadata_list.append(db_doc_metadata)
|
||||
|
||||
@@ -364,6 +367,45 @@ def index_doc_batch_with_handler(
|
||||
return index_pipeline_result
|
||||
|
||||
|
||||
def _promote_new_staged_files(
|
||||
documents: list[Document],
|
||||
previous_file_ids: dict[str, str],
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
"""Queue STAGING → CONNECTOR origin flips for every new file_id in the batch.
|
||||
|
||||
Intended to run immediately before `_upsert_documents_in_db` so the origin
|
||||
flip lands in the same commit as the `Document.file_id` write. Does not
|
||||
commit — the caller's next commit flushes these UPDATEs.
|
||||
"""
|
||||
for doc in documents:
|
||||
new_file_id = doc.file_id
|
||||
if new_file_id is None or new_file_id == previous_file_ids.get(doc.id):
|
||||
continue
|
||||
promote_staged_file(db_session=db_session, file_id=new_file_id)
|
||||
|
||||
|
||||
def _delete_replaced_files(
|
||||
documents: list[Document],
|
||||
previous_file_ids: dict[str, str],
|
||||
) -> None:
|
||||
"""Best-effort blob deletes for file_ids replaced in this batch.
|
||||
|
||||
Must run AFTER `Document.file_id` has been committed to the new
|
||||
file_id.
|
||||
"""
|
||||
file_store = get_default_file_store()
|
||||
for doc in documents:
|
||||
new_file_id = doc.file_id
|
||||
old_file_id = previous_file_ids.get(doc.id)
|
||||
if old_file_id is None or old_file_id == new_file_id:
|
||||
continue
|
||||
try:
|
||||
file_store.delete_file(old_file_id, error_on_missing=False)
|
||||
except Exception:
|
||||
logger.exception(f"Failed to delete replaced file_id={old_file_id}.")
|
||||
|
||||
|
||||
def index_doc_batch_prepare(
|
||||
documents: list[Document],
|
||||
index_attempt_metadata: IndexAttemptMetadata,
|
||||
@@ -382,6 +424,11 @@ def index_doc_batch_prepare(
|
||||
document_ids=document_ids,
|
||||
)
|
||||
|
||||
# Capture previous file_ids BEFORE any writes so we know what to reap.
|
||||
previous_file_ids: dict[str, str] = {
|
||||
db_doc.id: db_doc.file_id for db_doc in db_docs if db_doc.file_id is not None
|
||||
}
|
||||
|
||||
updatable_docs = (
|
||||
get_doc_ids_to_update(documents=documents, db_docs=db_docs)
|
||||
if not ignore_time_skip
|
||||
@@ -399,11 +446,24 @@ def index_doc_batch_prepare(
|
||||
# for all updatable docs, upsert into the DB
|
||||
# Does not include doc_updated_at which is also used to indicate a successful update
|
||||
if updatable_docs:
|
||||
# Queue the STAGING → CONNECTOR origin flips BEFORE the Document upsert
|
||||
# so `upsert_documents`' commit flushes Document.file_id and the origin
|
||||
# flip atomically
|
||||
_promote_new_staged_files(
|
||||
documents=updatable_docs,
|
||||
previous_file_ids=previous_file_ids,
|
||||
db_session=db_session,
|
||||
)
|
||||
_upsert_documents_in_db(
|
||||
documents=updatable_docs,
|
||||
index_attempt_metadata=index_attempt_metadata,
|
||||
db_session=db_session,
|
||||
)
|
||||
# Blob deletes run only after Document.file_id is durable.
|
||||
_delete_replaced_files(
|
||||
documents=updatable_docs,
|
||||
previous_file_ids=previous_file_ids,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Upserted {len(updatable_docs)} changed docs out of {len(documents)} total docs into the DB"
|
||||
@@ -530,8 +590,15 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
|
||||
Returns:
|
||||
List of IndexingDocument objects with processed_sections as list[Section]
|
||||
"""
|
||||
# Check if image extraction and analysis is enabled before trying to get a vision LLM
|
||||
if not get_image_extraction_and_analysis_enabled():
|
||||
# Check if image extraction and analysis is enabled before trying to get a vision LLM.
|
||||
# Use section.type rather than isinstance because sections can round-trip
|
||||
# through pydantic as base Section instances (not the concrete subclass).
|
||||
has_image_section = any(
|
||||
section.type == SectionType.IMAGE
|
||||
for document in documents
|
||||
for section in document.sections
|
||||
)
|
||||
if not get_image_extraction_and_analysis_enabled() or not has_image_section:
|
||||
llm = None
|
||||
else:
|
||||
# Only get the vision LLM if image processing is enabled
|
||||
|
||||
@@ -11,7 +11,9 @@ from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.app_configs import MAX_EMBEDDED_IMAGES_PER_FILE
|
||||
from onyx.configs.app_configs import MAX_EMBEDDED_IMAGES_PER_UPLOAD
|
||||
from onyx.configs.llm_configs import get_image_extraction_and_analysis_enabled
|
||||
from onyx.db.llm import fetch_default_llm_model
|
||||
from onyx.file_processing.extract_file_text import count_docx_embedded_images
|
||||
from onyx.file_processing.extract_file_text import count_pdf_embedded_images
|
||||
from onyx.file_processing.extract_file_text import extract_file_text
|
||||
from onyx.file_processing.extract_file_text import get_file_ext
|
||||
@@ -198,6 +200,9 @@ def categorize_uploaded_files(
|
||||
# rejected even if they'd individually fit under MAX_EMBEDDED_IMAGES_PER_FILE.
|
||||
batch_image_total = 0
|
||||
|
||||
# Hoisted out of the loop to avoid a KV-store lookup per file.
|
||||
image_extraction_enabled = get_image_extraction_and_analysis_enabled()
|
||||
|
||||
for upload in files:
|
||||
try:
|
||||
filename = get_safe_filename(upload)
|
||||
@@ -260,28 +265,33 @@ def categorize_uploaded_files(
|
||||
)
|
||||
continue
|
||||
|
||||
# Reject PDFs with an unreasonable number of embedded images
|
||||
# (either per-file or accumulated across this upload batch).
|
||||
# A PDF with thousands of embedded images can OOM the
|
||||
# Reject documents with an unreasonable number of embedded
|
||||
# images (either per-file or accumulated across this upload
|
||||
# batch). A file with thousands of embedded images can OOM the
|
||||
# user-file-processing celery worker because every image is
|
||||
# decoded with PIL and then sent to the vision LLM.
|
||||
if extension == ".pdf":
|
||||
count: int = 0
|
||||
image_bearing_ext = extension in (".pdf", ".docx")
|
||||
if image_bearing_ext:
|
||||
file_cap = MAX_EMBEDDED_IMAGES_PER_FILE
|
||||
batch_cap = MAX_EMBEDDED_IMAGES_PER_UPLOAD
|
||||
# Use the larger of the two caps as the short-circuit
|
||||
# threshold so we get a useful count for both checks.
|
||||
# count_pdf_embedded_images restores the stream position.
|
||||
count = count_pdf_embedded_images(
|
||||
upload.file, max(file_cap, batch_cap)
|
||||
# These helpers restore the stream position.
|
||||
counter = (
|
||||
count_pdf_embedded_images
|
||||
if extension == ".pdf"
|
||||
else count_docx_embedded_images
|
||||
)
|
||||
count = counter(upload.file, max(file_cap, batch_cap))
|
||||
if count > file_cap:
|
||||
results.rejected.append(
|
||||
RejectedFile(
|
||||
filename=filename,
|
||||
reason=(
|
||||
f"PDF contains too many embedded images "
|
||||
f"(more than {file_cap}). Try splitting "
|
||||
f"the document into smaller files."
|
||||
f"Document contains too many embedded "
|
||||
f"images (more than {file_cap}). Try "
|
||||
f"splitting it into smaller files."
|
||||
),
|
||||
)
|
||||
)
|
||||
@@ -308,6 +318,21 @@ def categorize_uploaded_files(
|
||||
extension=extension,
|
||||
)
|
||||
if not text_content:
|
||||
# Documents with embedded images (e.g. scans) have no
|
||||
# extractable text but can still be indexed via the
|
||||
# vision-LLM captioning path when image analysis is
|
||||
# enabled.
|
||||
if image_bearing_ext and count > 0 and image_extraction_enabled:
|
||||
results.acceptable.append(upload)
|
||||
results.acceptable_file_to_token_count[filename] = 0
|
||||
try:
|
||||
upload.file.seek(0)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to reset file pointer for '{filename}': {str(e)}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.warning(f"No text content extracted from '{filename}'")
|
||||
results.rejected.append(
|
||||
RejectedFile(
|
||||
|
||||
@@ -80,7 +80,7 @@ class Settings(BaseModel):
|
||||
query_history_type: QueryHistoryType | None = None
|
||||
|
||||
# Image processing settings
|
||||
image_extraction_and_analysis_enabled: bool | None = False
|
||||
image_extraction_and_analysis_enabled: bool | None = True
|
||||
search_time_image_analysis_enabled: bool | None = False
|
||||
image_analysis_max_size_mb: int | None = 20
|
||||
|
||||
|
||||
@@ -0,0 +1,405 @@
|
||||
"""External dependency unit tests for `index_doc_batch_prepare`.
|
||||
|
||||
Validates the file_id lifecycle that runs alongside the document upsert:
|
||||
|
||||
* `document.file_id` is written on insert AND on conflict (upsert path)
|
||||
* Newly-staged files get promoted from INDEXING_STAGING -> CONNECTOR
|
||||
* Replaced files are deleted from both `file_record` and S3
|
||||
* No-op when the file_id is unchanged
|
||||
|
||||
Uses real PostgreSQL + real S3/MinIO via the file store.
|
||||
"""
|
||||
|
||||
from collections.abc import Generator
|
||||
from io import BytesIO
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import IndexAttemptMetadata
|
||||
from onyx.connectors.models import InputType
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.db.enums import AccessType
|
||||
from onyx.db.enums import ConnectorCredentialPairStatus
|
||||
from onyx.db.file_record import get_filerecord_by_file_id_optional
|
||||
from onyx.db.models import Connector
|
||||
from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.db.models import Credential
|
||||
from onyx.db.models import Document as DBDocument
|
||||
from onyx.db.models import DocumentByConnectorCredentialPair
|
||||
from onyx.db.models import FileRecord
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_doc(doc_id: str, file_id: str | None = None) -> Document:
|
||||
"""Minimal Document for indexing-pipeline tests. MOCK_CONNECTOR avoids
|
||||
triggering the hierarchy-node linking branch (NOTION/CONFLUENCE only)."""
|
||||
return Document(
|
||||
id=doc_id,
|
||||
source=DocumentSource.MOCK_CONNECTOR,
|
||||
semantic_identifier=f"semantic-{doc_id}",
|
||||
sections=[TextSection(text="content", link=None)],
|
||||
metadata={},
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
|
||||
def _stage_file(content: bytes = b"raw bytes") -> str:
|
||||
"""Write bytes to the file store as INDEXING_STAGING and return the file_id.
|
||||
|
||||
Mirrors what the connector raw_file_callback would do during fetch.
|
||||
"""
|
||||
return get_default_file_store().save_file(
|
||||
content=BytesIO(content),
|
||||
display_name=None,
|
||||
file_origin=FileOrigin.INDEXING_STAGING,
|
||||
file_type="application/octet-stream",
|
||||
file_metadata={"test": True},
|
||||
)
|
||||
|
||||
|
||||
def _get_doc_row(db_session: Session, doc_id: str) -> DBDocument | None:
|
||||
"""Reload the document row fresh from DB so we see post-upsert state."""
|
||||
db_session.expire_all()
|
||||
return db_session.query(DBDocument).filter(DBDocument.id == doc_id).one_or_none()
|
||||
|
||||
|
||||
def _get_filerecord(db_session: Session, file_id: str) -> FileRecord | None:
|
||||
db_session.expire_all()
|
||||
return get_filerecord_by_file_id_optional(file_id=file_id, db_session=db_session)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cc_pair(
|
||||
db_session: Session,
|
||||
tenant_context: None, # noqa: ARG001
|
||||
initialize_file_store: None, # noqa: ARG001
|
||||
) -> Generator[ConnectorCredentialPair, None, None]:
|
||||
"""Create a connector + credential + cc_pair backing the index attempt.
|
||||
|
||||
Teardown sweeps everything the test created under this cc_pair: the
|
||||
`document_by_connector_credential_pair` join rows, the `Document` rows
|
||||
they point at, the `FileRecord` + blob for each doc's `file_id`, and
|
||||
finally the cc_pair / connector / credential themselves. Without this,
|
||||
every run would leave orphan rows in the dev DB and orphan blobs in
|
||||
MinIO.
|
||||
"""
|
||||
connector = Connector(
|
||||
name=f"test-connector-{uuid4().hex[:8]}",
|
||||
source=DocumentSource.MOCK_CONNECTOR,
|
||||
input_type=InputType.LOAD_STATE,
|
||||
connector_specific_config={},
|
||||
refresh_freq=None,
|
||||
prune_freq=None,
|
||||
indexing_start=None,
|
||||
)
|
||||
db_session.add(connector)
|
||||
db_session.flush()
|
||||
|
||||
credential = Credential(
|
||||
source=DocumentSource.MOCK_CONNECTOR,
|
||||
credential_json={},
|
||||
)
|
||||
db_session.add(credential)
|
||||
db_session.flush()
|
||||
|
||||
pair = ConnectorCredentialPair(
|
||||
connector_id=connector.id,
|
||||
credential_id=credential.id,
|
||||
name=f"test-cc-pair-{uuid4().hex[:8]}",
|
||||
status=ConnectorCredentialPairStatus.ACTIVE,
|
||||
access_type=AccessType.PUBLIC,
|
||||
auto_sync_options=None,
|
||||
)
|
||||
db_session.add(pair)
|
||||
db_session.commit()
|
||||
db_session.refresh(pair)
|
||||
|
||||
connector_id = pair.connector_id
|
||||
credential_id = pair.credential_id
|
||||
|
||||
try:
|
||||
yield pair
|
||||
finally:
|
||||
db_session.expire_all()
|
||||
|
||||
# Collect every doc indexed under this cc_pair so we can delete its
|
||||
# file_record + blob before dropping the Document row itself.
|
||||
doc_ids: list[str] = [
|
||||
row[0]
|
||||
for row in db_session.query(DocumentByConnectorCredentialPair.id)
|
||||
.filter(
|
||||
DocumentByConnectorCredentialPair.connector_id == connector_id,
|
||||
DocumentByConnectorCredentialPair.credential_id == credential_id,
|
||||
)
|
||||
.all()
|
||||
]
|
||||
file_ids: list[str] = [
|
||||
row[0]
|
||||
for row in db_session.query(DBDocument.file_id)
|
||||
.filter(DBDocument.id.in_(doc_ids), DBDocument.file_id.isnot(None))
|
||||
.all()
|
||||
]
|
||||
|
||||
file_store = get_default_file_store()
|
||||
for fid in file_ids:
|
||||
try:
|
||||
file_store.delete_file(fid, error_on_missing=False)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if doc_ids:
|
||||
db_session.query(DocumentByConnectorCredentialPair).filter(
|
||||
DocumentByConnectorCredentialPair.id.in_(doc_ids)
|
||||
).delete(synchronize_session="fetch")
|
||||
db_session.query(DBDocument).filter(DBDocument.id.in_(doc_ids)).delete(
|
||||
synchronize_session="fetch"
|
||||
)
|
||||
|
||||
db_session.query(ConnectorCredentialPair).filter(
|
||||
ConnectorCredentialPair.id == pair.id
|
||||
).delete(synchronize_session="fetch")
|
||||
db_session.query(Connector).filter(Connector.id == connector_id).delete(
|
||||
synchronize_session="fetch"
|
||||
)
|
||||
db_session.query(Credential).filter(Credential.id == credential_id).delete(
|
||||
synchronize_session="fetch"
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def attempt_metadata(cc_pair: ConnectorCredentialPair) -> IndexAttemptMetadata:
|
||||
return IndexAttemptMetadata(
|
||||
connector_id=cc_pair.connector_id,
|
||||
credential_id=cc_pair.credential_id,
|
||||
attempt_id=None,
|
||||
request_id="test-request",
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNewDocuments:
|
||||
"""First-time inserts — no previous file_id to reconcile against."""
|
||||
|
||||
def test_new_doc_without_file_id(
|
||||
self,
|
||||
db_session: Session,
|
||||
attempt_metadata: IndexAttemptMetadata,
|
||||
) -> None:
|
||||
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=None)
|
||||
|
||||
index_doc_batch_prepare(
|
||||
documents=[doc],
|
||||
index_attempt_metadata=attempt_metadata,
|
||||
db_session=db_session,
|
||||
ignore_time_skip=True,
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
row = _get_doc_row(db_session, doc.id)
|
||||
assert row is not None
|
||||
assert row.file_id is None
|
||||
|
||||
def test_new_doc_with_staged_file_id_promotes_to_connector(
|
||||
self,
|
||||
db_session: Session,
|
||||
attempt_metadata: IndexAttemptMetadata,
|
||||
) -> None:
|
||||
file_id = _stage_file()
|
||||
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=file_id)
|
||||
|
||||
index_doc_batch_prepare(
|
||||
documents=[doc],
|
||||
index_attempt_metadata=attempt_metadata,
|
||||
db_session=db_session,
|
||||
ignore_time_skip=True,
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
row = _get_doc_row(db_session, doc.id)
|
||||
assert row is not None and row.file_id == file_id
|
||||
|
||||
record = _get_filerecord(db_session, file_id)
|
||||
assert record is not None
|
||||
assert record.file_origin == FileOrigin.CONNECTOR
|
||||
|
||||
|
||||
class TestExistingDocuments:
|
||||
"""Re-index path — a `document` row already exists with some file_id."""
|
||||
|
||||
def test_unchanged_file_id_is_noop(
|
||||
self,
|
||||
db_session: Session,
|
||||
attempt_metadata: IndexAttemptMetadata,
|
||||
) -> None:
|
||||
file_id = _stage_file()
|
||||
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=file_id)
|
||||
|
||||
# First pass: inserts the row + promotes the file.
|
||||
index_doc_batch_prepare(
|
||||
documents=[doc],
|
||||
index_attempt_metadata=attempt_metadata,
|
||||
db_session=db_session,
|
||||
ignore_time_skip=True,
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
# Second pass with the same file_id — should not delete or re-promote.
|
||||
index_doc_batch_prepare(
|
||||
documents=[doc],
|
||||
index_attempt_metadata=attempt_metadata,
|
||||
db_session=db_session,
|
||||
ignore_time_skip=True,
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
record = _get_filerecord(db_session, file_id)
|
||||
assert record is not None
|
||||
assert record.file_origin == FileOrigin.CONNECTOR
|
||||
|
||||
row = _get_doc_row(db_session, doc.id)
|
||||
assert row is not None and row.file_id == file_id
|
||||
|
||||
def test_swapping_file_id_promotes_new_and_deletes_old(
|
||||
self,
|
||||
db_session: Session,
|
||||
attempt_metadata: IndexAttemptMetadata,
|
||||
) -> None:
|
||||
old_file_id = _stage_file(content=b"old bytes")
|
||||
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=old_file_id)
|
||||
|
||||
index_doc_batch_prepare(
|
||||
documents=[doc],
|
||||
index_attempt_metadata=attempt_metadata,
|
||||
db_session=db_session,
|
||||
ignore_time_skip=True,
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
# Re-fetch produces a new staged file_id for the same doc.
|
||||
new_file_id = _stage_file(content=b"new bytes")
|
||||
doc_v2 = _make_doc(doc.id, file_id=new_file_id)
|
||||
|
||||
index_doc_batch_prepare(
|
||||
documents=[doc_v2],
|
||||
index_attempt_metadata=attempt_metadata,
|
||||
db_session=db_session,
|
||||
ignore_time_skip=True,
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
row = _get_doc_row(db_session, doc.id)
|
||||
assert row is not None and row.file_id == new_file_id
|
||||
|
||||
new_record = _get_filerecord(db_session, new_file_id)
|
||||
assert new_record is not None
|
||||
assert new_record.file_origin == FileOrigin.CONNECTOR
|
||||
|
||||
# Old file_record + S3 object are gone.
|
||||
assert _get_filerecord(db_session, old_file_id) is None
|
||||
|
||||
def test_clearing_file_id_deletes_old_and_nulls_column(
|
||||
self,
|
||||
db_session: Session,
|
||||
attempt_metadata: IndexAttemptMetadata,
|
||||
) -> None:
|
||||
old_file_id = _stage_file()
|
||||
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=old_file_id)
|
||||
|
||||
index_doc_batch_prepare(
|
||||
documents=[doc],
|
||||
index_attempt_metadata=attempt_metadata,
|
||||
db_session=db_session,
|
||||
ignore_time_skip=True,
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
# Connector opts out on next run — yields the doc without a file_id.
|
||||
doc_v2 = _make_doc(doc.id, file_id=None)
|
||||
|
||||
index_doc_batch_prepare(
|
||||
documents=[doc_v2],
|
||||
index_attempt_metadata=attempt_metadata,
|
||||
db_session=db_session,
|
||||
ignore_time_skip=True,
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
row = _get_doc_row(db_session, doc.id)
|
||||
assert row is not None and row.file_id is None
|
||||
assert _get_filerecord(db_session, old_file_id) is None
|
||||
|
||||
|
||||
class TestBatchHandling:
|
||||
"""Mixed batches — multiple docs at different lifecycle states in one call."""
|
||||
|
||||
def test_mixed_batch_each_doc_handled_independently(
|
||||
self,
|
||||
db_session: Session,
|
||||
attempt_metadata: IndexAttemptMetadata,
|
||||
) -> None:
|
||||
# Pre-seed an existing doc with a file_id we'll swap.
|
||||
existing_old_id = _stage_file(content=b"existing-old")
|
||||
existing_doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=existing_old_id)
|
||||
index_doc_batch_prepare(
|
||||
documents=[existing_doc],
|
||||
index_attempt_metadata=attempt_metadata,
|
||||
db_session=db_session,
|
||||
ignore_time_skip=True,
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
# Now: swap the existing one, add a brand-new doc with file_id, and a
|
||||
# brand-new doc without file_id.
|
||||
swap_new_id = _stage_file(content=b"existing-new")
|
||||
new_with_file_id = _stage_file(content=b"new-with-file")
|
||||
existing_v2 = _make_doc(existing_doc.id, file_id=swap_new_id)
|
||||
new_with = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=new_with_file_id)
|
||||
new_without = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=None)
|
||||
|
||||
index_doc_batch_prepare(
|
||||
documents=[existing_v2, new_with, new_without],
|
||||
index_attempt_metadata=attempt_metadata,
|
||||
db_session=db_session,
|
||||
ignore_time_skip=True,
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
# Existing doc was swapped: old file gone, new file promoted.
|
||||
existing_row = _get_doc_row(db_session, existing_doc.id)
|
||||
assert existing_row is not None and existing_row.file_id == swap_new_id
|
||||
assert _get_filerecord(db_session, existing_old_id) is None
|
||||
swap_record = _get_filerecord(db_session, swap_new_id)
|
||||
assert swap_record is not None
|
||||
assert swap_record.file_origin == FileOrigin.CONNECTOR
|
||||
|
||||
# New doc with file_id: row exists, file promoted.
|
||||
new_with_row = _get_doc_row(db_session, new_with.id)
|
||||
assert new_with_row is not None and new_with_row.file_id == new_with_file_id
|
||||
new_with_record = _get_filerecord(db_session, new_with_file_id)
|
||||
assert new_with_record is not None
|
||||
assert new_with_record.file_origin == FileOrigin.CONNECTOR
|
||||
|
||||
# New doc without file_id: row exists, no file_record involvement.
|
||||
new_without_row = _get_doc_row(db_session, new_without.id)
|
||||
assert new_without_row is not None and new_without_row.file_id is None
|
||||
@@ -247,7 +247,7 @@ class DATestSettings(BaseModel):
|
||||
gpu_enabled: bool | None = None
|
||||
product_gating: DATestGatingType = DATestGatingType.NONE
|
||||
anonymous_user_enabled: bool | None = None
|
||||
image_extraction_and_analysis_enabled: bool | None = False
|
||||
image_extraction_and_analysis_enabled: bool | None = True
|
||||
search_time_image_analysis_enabled: bool | None = False
|
||||
|
||||
|
||||
|
||||
78
backend/tests/unit/onyx/auth/test_captcha_require_score.py
Normal file
78
backend/tests/unit/onyx/auth/test_captcha_require_score.py
Normal file
@@ -0,0 +1,78 @@
|
||||
"""Unit tests for the require-score check in verify_captcha_token."""
|
||||
|
||||
from unittest.mock import AsyncMock
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from onyx.auth import captcha as captcha_module
|
||||
from onyx.auth.captcha import CaptchaVerificationError
|
||||
from onyx.auth.captcha import verify_captcha_token
|
||||
|
||||
|
||||
def _fake_httpx_client_returning(payload: dict) -> MagicMock:
|
||||
resp = MagicMock()
|
||||
resp.raise_for_status = MagicMock()
|
||||
resp.json = MagicMock(return_value=payload)
|
||||
client = MagicMock()
|
||||
client.post = AsyncMock(return_value=resp)
|
||||
client.__aenter__ = AsyncMock(return_value=client)
|
||||
client.__aexit__ = AsyncMock(return_value=None)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_rejects_when_score_missing() -> None:
|
||||
"""Siteverify response with no score field is rejected outright —
|
||||
closes the accidental 'test secret in prod' bypass path."""
|
||||
client = _fake_httpx_client_returning(
|
||||
{"success": True, "hostname": "testkey.google.com"}
|
||||
)
|
||||
with (
|
||||
patch.object(captcha_module, "is_captcha_enabled", return_value=True),
|
||||
patch.object(captcha_module.httpx, "AsyncClient", return_value=client),
|
||||
):
|
||||
with pytest.raises(CaptchaVerificationError, match="missing score"):
|
||||
await verify_captcha_token("test-token", expected_action="signup")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_accepts_when_score_present_and_above_threshold() -> None:
|
||||
"""Sanity check the happy path still works with the tighter score rule."""
|
||||
client = _fake_httpx_client_returning(
|
||||
{
|
||||
"success": True,
|
||||
"score": 0.9,
|
||||
"action": "signup",
|
||||
"hostname": "cloud.onyx.app",
|
||||
}
|
||||
)
|
||||
with (
|
||||
patch.object(captcha_module, "is_captcha_enabled", return_value=True),
|
||||
patch.object(captcha_module.httpx, "AsyncClient", return_value=client),
|
||||
):
|
||||
# Should not raise.
|
||||
await verify_captcha_token("fresh-token", expected_action="signup")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_rejects_when_score_below_threshold() -> None:
|
||||
"""A score present but below threshold still rejects (existing behavior,
|
||||
guarding against regression from this PR's restructure)."""
|
||||
client = _fake_httpx_client_returning(
|
||||
{
|
||||
"success": True,
|
||||
"score": 0.1,
|
||||
"action": "signup",
|
||||
"hostname": "cloud.onyx.app",
|
||||
}
|
||||
)
|
||||
with (
|
||||
patch.object(captcha_module, "is_captcha_enabled", return_value=True),
|
||||
patch.object(captcha_module.httpx, "AsyncClient", return_value=client),
|
||||
):
|
||||
with pytest.raises(
|
||||
CaptchaVerificationError, match="suspicious activity detected"
|
||||
):
|
||||
await verify_captcha_token("low-score-token", expected_action="signup")
|
||||
@@ -0,0 +1,194 @@
|
||||
"""Unit tests for SharepointConnector site-page slim resilience and
|
||||
validate_connector_settings RoleAssignments permission probe."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from onyx.connectors.exceptions import ConnectorValidationError
|
||||
from onyx.connectors.sharepoint.connector import SharepointConnector
|
||||
|
||||
SITE_URL = "https://tenant.sharepoint.com/sites/MySite"
|
||||
|
||||
|
||||
def _make_connector() -> SharepointConnector:
|
||||
connector = SharepointConnector(sites=[SITE_URL])
|
||||
connector.msal_app = MagicMock()
|
||||
connector.sp_tenant_domain = "tenant"
|
||||
connector._credential_json = {"sp_client_id": "x", "sp_directory_id": "y"}
|
||||
connector._graph_client = MagicMock()
|
||||
return connector
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _fetch_slim_documents_from_sharepoint — site page error resilience
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@patch("onyx.connectors.sharepoint.connector._convert_sitepage_to_slim_document")
|
||||
@patch(
|
||||
"onyx.connectors.sharepoint.connector.SharepointConnector._create_rest_client_context"
|
||||
)
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_site_pages")
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_driveitems")
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector.fetch_sites")
|
||||
def test_site_page_error_does_not_crash(
|
||||
mock_fetch_sites: MagicMock,
|
||||
mock_fetch_driveitems: MagicMock,
|
||||
mock_fetch_site_pages: MagicMock,
|
||||
_mock_create_ctx: MagicMock,
|
||||
mock_convert: MagicMock,
|
||||
) -> None:
|
||||
"""A 401 (or any exception) on a site page is caught; remaining pages are processed."""
|
||||
from onyx.connectors.models import SlimDocument
|
||||
|
||||
connector = _make_connector()
|
||||
connector.include_site_documents = False
|
||||
connector.include_site_pages = True
|
||||
|
||||
site = MagicMock()
|
||||
site.url = SITE_URL
|
||||
mock_fetch_sites.return_value = [site]
|
||||
mock_fetch_driveitems.return_value = iter([])
|
||||
|
||||
page_ok = {"id": "1", "webUrl": SITE_URL + "/SitePages/Good.aspx"}
|
||||
page_bad = {"id": "2", "webUrl": SITE_URL + "/SitePages/Bad.aspx"}
|
||||
mock_fetch_site_pages.return_value = [page_bad, page_ok]
|
||||
|
||||
good_slim = SlimDocument(id="1")
|
||||
|
||||
def _convert_side_effect(
|
||||
page: dict, *_args: object, **_kwargs: object
|
||||
) -> SlimDocument: # noqa: ANN001
|
||||
if page["id"] == "2":
|
||||
from office365.runtime.client_request import ClientRequestException
|
||||
|
||||
raise ClientRequestException(MagicMock(status_code=401), None)
|
||||
return good_slim
|
||||
|
||||
mock_convert.side_effect = _convert_side_effect
|
||||
|
||||
results = [
|
||||
doc
|
||||
for batch in connector._fetch_slim_documents_from_sharepoint()
|
||||
for doc in batch
|
||||
if isinstance(doc, SlimDocument)
|
||||
]
|
||||
|
||||
# Only the good page makes it through; bad page is skipped, no exception raised.
|
||||
assert any(d.id == "1" for d in results)
|
||||
assert not any(d.id == "2" for d in results)
|
||||
|
||||
|
||||
@patch("onyx.connectors.sharepoint.connector._convert_sitepage_to_slim_document")
|
||||
@patch(
|
||||
"onyx.connectors.sharepoint.connector.SharepointConnector._create_rest_client_context"
|
||||
)
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_site_pages")
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_driveitems")
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector.fetch_sites")
|
||||
def test_all_site_pages_fail_does_not_crash(
|
||||
mock_fetch_sites: MagicMock,
|
||||
mock_fetch_driveitems: MagicMock,
|
||||
mock_fetch_site_pages: MagicMock,
|
||||
_mock_create_ctx: MagicMock,
|
||||
mock_convert: MagicMock,
|
||||
) -> None:
|
||||
"""When every site page fails, the generator completes without raising."""
|
||||
connector = _make_connector()
|
||||
connector.include_site_documents = False
|
||||
connector.include_site_pages = True
|
||||
|
||||
site = MagicMock()
|
||||
site.url = SITE_URL
|
||||
mock_fetch_sites.return_value = [site]
|
||||
mock_fetch_driveitems.return_value = iter([])
|
||||
mock_fetch_site_pages.return_value = [
|
||||
{"id": "1", "webUrl": SITE_URL + "/SitePages/A.aspx"},
|
||||
{"id": "2", "webUrl": SITE_URL + "/SitePages/B.aspx"},
|
||||
]
|
||||
mock_convert.side_effect = RuntimeError("context error")
|
||||
|
||||
from onyx.connectors.models import SlimDocument
|
||||
|
||||
# Should not raise; no SlimDocuments in output (only hierarchy nodes).
|
||||
slim_results = [
|
||||
doc
|
||||
for batch in connector._fetch_slim_documents_from_sharepoint()
|
||||
for doc in batch
|
||||
if isinstance(doc, SlimDocument)
|
||||
]
|
||||
assert slim_results == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# validate_connector_settings — RoleAssignments permission probe
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status_code", [401, 403])
|
||||
@patch("onyx.connectors.sharepoint.connector.requests.get")
|
||||
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
|
||||
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
|
||||
def test_validate_raises_on_401_or_403(
|
||||
mock_acquire: MagicMock,
|
||||
_mock_validate_url: MagicMock,
|
||||
mock_get: MagicMock,
|
||||
status_code: int,
|
||||
) -> None:
|
||||
"""validate_connector_settings raises ConnectorValidationError when probe returns 401 or 403."""
|
||||
mock_acquire.return_value = MagicMock(accessToken="tok")
|
||||
mock_get.return_value = MagicMock(status_code=status_code)
|
||||
|
||||
connector = _make_connector()
|
||||
|
||||
with pytest.raises(ConnectorValidationError, match="Sites.FullControl.All"):
|
||||
connector.validate_connector_settings()
|
||||
|
||||
|
||||
@patch("onyx.connectors.sharepoint.connector.requests.get")
|
||||
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
|
||||
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
|
||||
def test_validate_passes_on_200(
|
||||
mock_acquire: MagicMock,
|
||||
_mock_validate_url: MagicMock,
|
||||
mock_get: MagicMock,
|
||||
) -> None:
|
||||
"""validate_connector_settings does not raise when probe returns 200."""
|
||||
mock_acquire.return_value = MagicMock(accessToken="tok")
|
||||
mock_get.return_value = MagicMock(status_code=200)
|
||||
|
||||
connector = _make_connector()
|
||||
connector.validate_connector_settings() # should not raise
|
||||
|
||||
|
||||
@patch("onyx.connectors.sharepoint.connector.requests.get")
|
||||
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
|
||||
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
|
||||
def test_validate_passes_on_network_error(
|
||||
mock_acquire: MagicMock,
|
||||
_mock_validate_url: MagicMock,
|
||||
mock_get: MagicMock,
|
||||
) -> None:
|
||||
"""Network errors during the probe are non-blocking (logged as warning only)."""
|
||||
mock_acquire.return_value = MagicMock(accessToken="tok")
|
||||
mock_get.side_effect = Exception("timeout")
|
||||
|
||||
connector = _make_connector()
|
||||
connector.validate_connector_settings() # should not raise
|
||||
|
||||
|
||||
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
|
||||
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
|
||||
def test_validate_skips_probe_without_credentials(
|
||||
mock_acquire: MagicMock,
|
||||
_mock_validate_url: MagicMock,
|
||||
) -> None:
|
||||
"""Probe is skipped when credentials have not been loaded."""
|
||||
connector = SharepointConnector(sites=[SITE_URL])
|
||||
# msal_app and sp_tenant_domain are None — probe must be skipped.
|
||||
connector.validate_connector_settings() # should not raise
|
||||
mock_acquire.assert_not_called()
|
||||
@@ -996,6 +996,23 @@ export default function ChatPreferencesPage() {
|
||||
</InputVertical>
|
||||
</Card>
|
||||
|
||||
<Card border="solid" rounding="lg">
|
||||
<InputHorizontal
|
||||
title="Image Extraction & Analysis"
|
||||
description="Extract embedded images from uploaded files (PDFs, DOCX, etc.) and summarize them with a vision-capable LLM so image-only documents become searchable and answerable. Requires a vision-capable default LLM."
|
||||
withLabel
|
||||
>
|
||||
<Switch
|
||||
checked={s.image_extraction_and_analysis_enabled ?? true}
|
||||
onCheckedChange={(checked) => {
|
||||
void saveSettings({
|
||||
image_extraction_and_analysis_enabled: checked,
|
||||
});
|
||||
}}
|
||||
/>
|
||||
</InputHorizontal>
|
||||
</Card>
|
||||
|
||||
<Card border="solid" rounding="lg">
|
||||
<Section>
|
||||
<InputHorizontal
|
||||
|
||||
Reference in New Issue
Block a user