Compare commits

...

9 Commits

48 changed files with 2483 additions and 475 deletions

View File

@@ -42,6 +42,7 @@ env:
CONFLUENCE_ACCESS_TOKEN_SCOPED: ${{ secrets.CONFLUENCE_ACCESS_TOKEN_SCOPED }}
# Jira
JIRA_ADMIN_USER_EMAIL: ${{ vars.JIRA_ADMIN_USER_EMAIL }}
JIRA_ADMIN_API_TOKEN: ${{ secrets.JIRA_ADMIN_API_TOKEN }}
# LLMs

View File

@@ -26,6 +26,7 @@ env:
CONFLUENCE_ACCESS_TOKEN: ${{ secrets.CONFLUENCE_ACCESS_TOKEN }}
CONFLUENCE_ACCESS_TOKEN_SCOPED: ${{ secrets.CONFLUENCE_ACCESS_TOKEN_SCOPED }}
JIRA_BASE_URL: ${{ secrets.JIRA_BASE_URL }}
JIRA_ADMIN_USER_EMAIL: ${{ vars.JIRA_ADMIN_USER_EMAIL }}
JIRA_USER_EMAIL: ${{ secrets.JIRA_USER_EMAIL }}
JIRA_API_TOKEN: ${{ secrets.JIRA_API_TOKEN }}
JIRA_API_TOKEN_SCOPED: ${{ secrets.JIRA_API_TOKEN_SCOPED }}
@@ -431,6 +432,7 @@ jobs:
-e CONFLUENCE_ACCESS_TOKEN=${CONFLUENCE_ACCESS_TOKEN} \
-e CONFLUENCE_ACCESS_TOKEN_SCOPED=${CONFLUENCE_ACCESS_TOKEN_SCOPED} \
-e JIRA_BASE_URL=${JIRA_BASE_URL} \
-e JIRA_ADMIN_USER_EMAIL=${JIRA_ADMIN_USER_EMAIL} \
-e JIRA_USER_EMAIL=${JIRA_USER_EMAIL} \
-e JIRA_API_TOKEN=${JIRA_API_TOKEN} \
-e JIRA_API_TOKEN_SCOPED=${JIRA_API_TOKEN_SCOPED} \

View File

@@ -116,6 +116,7 @@ jobs:
CONFLUENCE_ACCESS_TOKEN, test/confluence-access-token
CONFLUENCE_ACCESS_TOKEN_SCOPED, test/confluence-access-token-scoped
JIRA_BASE_URL, test/jira-base-url
JIRA_ADMIN_USER_EMAIL, test/jira-admin-user-email
JIRA_USER_EMAIL, test/jira-user-email
JIRA_API_TOKEN, test/jira-api-token
JIRA_API_TOKEN_SCOPED, test/jira-api-token-scoped

View File

@@ -0,0 +1,75 @@
"""add missing FK indexes for hot tables
Adds indexes on foreign key columns that are frequently queried by celery
worker tasks but were missing indexes, causing full sequential scans.
Postgres does NOT automatically create indexes on foreign key columns
(unlike MySQL). These four columns were identified via Aurora Performance
Insights as the top contributors to primary-worker AAS after the
ix_document_needs_sync partial index was deployed.
- index_attempt_errors.index_attempt_id: #1 query in pg_stat_statements
(56ms mean on a 532K-row / 997MB table). Queried every check_for_indexing
cycle to load errors per attempt.
- index_attempt_errors.connector_credential_pair_id: same table, queried
when filtering errors by cc_pair.
- index_attempt.connector_credential_pair_id: queried by check_for_indexing
to find attempts for each cc_pair. 73ms mean on tenant_dev.
- hierarchy_node.document_id: 18MB+ on large tenants, queried during
document deletion and hierarchy rebuilds.
Note: Index names follow SQLAlchemy's ix_<table>_<column> convention to match
the `index=True` declarations in models.py. This prevents autogenerate from
detecting a mismatch and creating duplicate indexes.
Revision ID: 856bcbe14d79
Revises: a6fcd3d631f9
Create Date: 2026-04-19 18:00:00.000000
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "856bcbe14d79"
down_revision = "91d150c361f6"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_index(
"ix_index_attempt_errors_index_attempt_id",
"index_attempt_errors",
["index_attempt_id"],
)
op.create_index(
"ix_index_attempt_errors_connector_credential_pair_id",
"index_attempt_errors",
["connector_credential_pair_id"],
)
op.create_index(
"ix_index_attempt_connector_credential_pair_id",
"index_attempt",
["connector_credential_pair_id"],
)
op.create_index(
"ix_hierarchy_node_document_id",
"hierarchy_node",
["document_id"],
)
def downgrade() -> None:
op.drop_index("ix_hierarchy_node_document_id", table_name="hierarchy_node")
op.drop_index(
"ix_index_attempt_connector_credential_pair_id", table_name="index_attempt"
)
op.drop_index(
"ix_index_attempt_errors_connector_credential_pair_id",
table_name="index_attempt_errors",
)
op.drop_index(
"ix_index_attempt_errors_index_attempt_id", table_name="index_attempt_errors"
)

View File

@@ -81,6 +81,9 @@ from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_tenant_work_gating import maybe_mark_tenant_active
from onyx.server.metrics.perm_sync_metrics import inc_doc_perm_sync_docs_processed
from onyx.server.metrics.perm_sync_metrics import inc_doc_perm_sync_errors
from onyx.server.metrics.perm_sync_metrics import observe_doc_perm_sync_duration
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.server.utils import make_short_id
from onyx.utils.logger import doc_permission_sync_ctx
@@ -475,6 +478,8 @@ def connector_permission_sync_generator_task(
_fail_doc_permission_sync_attempt(attempt_id, error_msg)
return None
sync_start = time.monotonic()
connector_type: str = "unknown"
try:
with get_session_with_current_tenant() as db_session:
cc_pair = get_connector_credential_pair_from_id(
@@ -508,6 +513,7 @@ def connector_permission_sync_generator_task(
raise
source_type = cc_pair.connector.source
connector_type = source_type.value
sync_config = get_source_perm_sync_config(source_type)
if sync_config is None:
error_msg = f"No sync config found for {source_type}"
@@ -593,7 +599,7 @@ def connector_permission_sync_generator_task(
result = redis_connector.permissions.update_db(
lock=lock,
new_permissions=[doc_external_access],
source_string=source_type,
source_string=connector_type,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
task_logger=task_logger,
@@ -606,6 +612,10 @@ def connector_permission_sync_generator_task(
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated} docs_with_errors={docs_with_errors}"
)
inc_doc_perm_sync_docs_processed(connector_type, tasks_generated)
if docs_with_errors > 0:
inc_doc_perm_sync_errors(connector_type, docs_with_errors)
complete_doc_permission_sync_attempt(
db_session=db_session,
attempt_id=attempt_id,
@@ -638,6 +648,7 @@ def connector_permission_sync_generator_task(
redis_connector.permissions.set_fence(None)
raise e
finally:
observe_doc_perm_sync_duration(time.monotonic() - sync_start, connector_type)
if lock.owned():
lock.release()

View File

@@ -70,6 +70,11 @@ from onyx.redis.redis_connector_ext_group_sync import (
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_tenant_work_gating import maybe_mark_tenant_active
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_errors
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_groups_processed
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_users_processed
from onyx.server.metrics.perm_sync_metrics import observe_group_sync_duration
from onyx.server.metrics.perm_sync_metrics import observe_group_sync_upsert_duration
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.server.utils import make_short_id
from onyx.utils.logger import format_error_for_logging
@@ -471,7 +476,9 @@ def _perform_external_group_sync(
tenant_id: str,
timeout_seconds: int = JOB_TIMEOUT,
) -> None:
# Create attempt record at the start
sync_start = time.monotonic()
connector_type: str = "unknown"
with get_session_with_current_tenant() as db_session:
attempt_id = create_external_group_sync_attempt(
connector_credential_pair_id=cc_pair_id,
@@ -481,6 +488,23 @@ def _perform_external_group_sync(
f"Created external group sync attempt: {attempt_id} for cc_pair={cc_pair_id}"
)
try:
connector_type = _timed_perform_external_group_sync(
cc_pair_id=cc_pair_id,
tenant_id=tenant_id,
timeout_seconds=timeout_seconds,
attempt_id=attempt_id,
)
finally:
observe_group_sync_duration(time.monotonic() - sync_start, connector_type)
def _timed_perform_external_group_sync(
cc_pair_id: int,
tenant_id: str,
attempt_id: int,
timeout_seconds: int = JOB_TIMEOUT,
) -> str:
with get_session_with_current_tenant() as db_session:
cc_pair = get_connector_credential_pair_from_id(
db_session=db_session,
@@ -491,6 +515,7 @@ def _perform_external_group_sync(
raise ValueError(f"No connector credential pair found for id: {cc_pair_id}")
source_type = cc_pair.connector.source
connector_type = source_type.value
sync_config = get_source_perm_sync_config(source_type)
if sync_config is None:
msg = f"No sync config found for {source_type} for cc_pair: {cc_pair_id}"
@@ -534,6 +559,7 @@ def _perform_external_group_sync(
seen_users: set[str] = set() # Track unique users across all groups
total_groups_processed = 0
total_group_memberships_synced = 0
cumulative_upsert_time = 0.0
start_time = time.monotonic()
try:
external_user_group_generator = ext_group_sync_func(tenant_id, cc_pair)
@@ -562,22 +588,26 @@ def _perform_external_group_sync(
logger.debug(
f"New external user groups: {external_user_group_batch}"
)
upsert_start = time.monotonic()
upsert_external_groups(
db_session=db_session,
cc_pair_id=cc_pair_id,
external_groups=external_user_group_batch,
source=cc_pair.connector.source,
)
cumulative_upsert_time += time.monotonic() - upsert_start
external_user_group_batch = []
if external_user_group_batch:
logger.debug(f"New external user groups: {external_user_group_batch}")
upsert_start = time.monotonic()
upsert_external_groups(
db_session=db_session,
cc_pair_id=cc_pair_id,
external_groups=external_user_group_batch,
source=cc_pair.connector.source,
)
cumulative_upsert_time += time.monotonic() - upsert_start
except Exception as e:
format_error_for_logging(e)
@@ -587,11 +617,14 @@ def _perform_external_group_sync(
)
# TODO: add some notification to the admins here
inc_group_sync_errors(connector_type)
logger.exception(
f"Error syncing external groups for {source_type} for cc_pair: {cc_pair_id} {e}"
)
raise e
observe_group_sync_upsert_duration(cumulative_upsert_time, connector_type)
logger.info(
f"Removing stale external groups for {source_type} for cc_pair: {cc_pair_id}"
)
@@ -615,8 +648,13 @@ def _perform_external_group_sync(
f"{total_group_memberships_synced} memberships"
)
inc_group_sync_groups_processed(connector_type, total_groups_processed)
inc_group_sync_users_processed(connector_type, total_users_processed)
mark_all_relevant_cc_pairs_as_external_group_synced(db_session, cc_pair)
return connector_type
def validate_external_group_sync_fences(
tenant_id: str,

View File

@@ -1,80 +0,0 @@
# import asyncio
# from typing import Optional
# from typing import TYPE_CHECKING
# from fastapi import APIRouter
# from fastapi import HTTPException
# from model_server.utils import simple_log_function_time
# from onyx.utils.logger import setup_logger
# from shared_configs.configs import INDEXING_ONLY
# from shared_configs.model_server_models import RerankRequest
# from shared_configs.model_server_models import RerankResponse
# if TYPE_CHECKING:
# from sentence_transformers import CrossEncoder
# logger = setup_logger()
# router = APIRouter(prefix="/encoder")
# _RERANK_MODEL: Optional["CrossEncoder"] = None
# def get_local_reranking_model(
# model_name: str,
# ) -> "CrossEncoder":
# global _RERANK_MODEL
# from sentence_transformers import CrossEncoder
# if _RERANK_MODEL is None:
# logger.notice(f"Loading {model_name}")
# model = CrossEncoder(model_name)
# _RERANK_MODEL = model
# return _RERANK_MODEL
# @simple_log_function_time()
# async def local_rerank(query: str, docs: list[str], model_name: str) -> list[float]:
# cross_encoder = get_local_reranking_model(model_name)
# # Run CPU-bound reranking in a thread pool
# return await asyncio.get_event_loop().run_in_executor(
# None,
# lambda: cross_encoder.predict([(query, doc) for doc in docs]).tolist(),
# )
# @router.post("/cross-encoder-scores")
# async def process_rerank_request(rerank_request: RerankRequest) -> RerankResponse:
# """Cross encoders can be purely black box from the app perspective"""
# # Only local models should use this endpoint - API providers should make direct API calls
# if rerank_request.provider_type is not None:
# raise ValueError(
# f"Model server reranking endpoint should only be used for local models. "
# f"API provider '{rerank_request.provider_type}' should make direct API calls instead."
# )
# if INDEXING_ONLY:
# raise RuntimeError("Indexing model server should not call reranking endpoint")
# if not rerank_request.documents or not rerank_request.query:
# raise HTTPException(
# status_code=400, detail="Missing documents or query for reranking"
# )
# if not all(rerank_request.documents):
# raise ValueError("Empty documents cannot be reranked.")
# try:
# # At this point, provider_type is None, so handle local reranking
# sim_scores = await local_rerank(
# query=rerank_request.query,
# docs=rerank_request.documents,
# model_name=rerank_request.model_name,
# )
# return RerankResponse(scores=sim_scores)
# except Exception as e:
# logger.exception(f"Error during reranking process:\n{str(e)}")
# raise HTTPException(
# status_code=500, detail="Failed to run Cross-Encoder reranking"
# )

View File

@@ -76,24 +76,34 @@ async def verify_captcha_token(
f"Captcha verification failed: {', '.join(error_codes)}"
)
# For reCAPTCHA v3, also check the score
if result.score is not None:
if result.score < RECAPTCHA_SCORE_THRESHOLD:
logger.warning(
f"Captcha score too low: {result.score} < {RECAPTCHA_SCORE_THRESHOLD}"
)
raise CaptchaVerificationError(
"Captcha verification failed: suspicious activity detected"
)
# Require v3 score. Google's public test secret returns no score
# — that path must not be active in prod since it skips the only
# human-vs-bot signal. A missing score here means captcha is
# misconfigured (test secret in prod, or a v2 response slipped in
# via an action mismatch).
if result.score is None:
logger.warning(
"Captcha verification failed: siteverify returned no score (likely test secret in prod)"
)
raise CaptchaVerificationError(
"Captcha verification failed: missing score"
)
# Optionally verify the action matches
if result.action and result.action != expected_action:
logger.warning(
f"Captcha action mismatch: {result.action} != {expected_action}"
)
raise CaptchaVerificationError(
"Captcha verification failed: action mismatch"
)
if result.score < RECAPTCHA_SCORE_THRESHOLD:
logger.warning(
f"Captcha score too low: {result.score} < {RECAPTCHA_SCORE_THRESHOLD}"
)
raise CaptchaVerificationError(
"Captcha verification failed: suspicious activity detected"
)
if result.action and result.action != expected_action:
logger.warning(
f"Captcha action mismatch: {result.action} != {expected_action}"
)
raise CaptchaVerificationError(
"Captcha verification failed: action mismatch"
)
logger.debug(
f"Captcha verification passed: score={result.score}, action={result.action}"

View File

@@ -380,6 +380,24 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
safe: bool = False,
request: Optional[Request] = None,
) -> User:
# Check for disposable emails FIRST so obvious throwaway domains are
# rejected before hitting Google's siteverify API. Cheap local check.
try:
verify_email_domain(user_create.email, is_registration=True)
except OnyxError as e:
# Log blocked disposable email attempts
if "Disposable email" in e.detail:
domain = (
user_create.email.split("@")[-1]
if "@" in user_create.email
else "unknown"
)
logger.warning(
f"Blocked disposable email registration attempt: {domain}",
extra={"email_domain": domain},
)
raise
# Verify captcha if enabled (for cloud signup protection)
from onyx.auth.captcha import CaptchaVerificationError
from onyx.auth.captcha import is_captcha_enabled
@@ -407,24 +425,6 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
user_create.password, cast(schemas.UC, user_create)
)
# Check for disposable emails BEFORE provisioning tenant
# This prevents creating tenants for throwaway email addresses
try:
verify_email_domain(user_create.email, is_registration=True)
except OnyxError as e:
# Log blocked disposable email attempts
if "Disposable email" in e.detail:
domain = (
user_create.email.split("@")[-1]
if "@" in user_create.email
else "unknown"
)
logger.warning(
f"Blocked disposable email registration attempt: {domain}",
extra={"email_domain": domain},
)
raise
user_count: int | None = None
referral_source = (
request.cookies.get("referral_source", None)
@@ -620,8 +620,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
sync_db.commit()
else:
logger.warning(
"User %s not found in sync session during upgrade to standard; "
"skipping upgrade",
"User %s not found in sync session during upgrade to standard; skipping upgrade",
user_id,
)
@@ -1614,7 +1613,6 @@ async def optional_user(
async_db_session: AsyncSession = Depends(get_async_session),
user: User | None = Depends(optional_fastapi_current_user),
) -> User | None:
if user := await _check_for_saml_and_jwt(request, user, async_db_session):
# If user is already set, _check_for_saml_and_jwt returns the same user object
return user

View File

@@ -60,7 +60,9 @@ from onyx.configs.constants import DEFAULT_PERSONA_ID
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import MessageType
from onyx.configs.constants import MilestoneRecordType
from onyx.configs.llm_configs import get_image_extraction_and_analysis_enabled
from onyx.context.search.models import BaseFilters
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import SearchDoc
from onyx.db.chat import create_new_chat_message
from onyx.db.chat import get_chat_session_by_id
@@ -74,12 +76,17 @@ from onyx.db.models import Persona
from onyx.db.models import User
from onyx.db.models import UserFile
from onyx.db.projects import get_user_files_from_project
from onyx.db.search_settings import get_active_search_settings
from onyx.db.tools import get_tools
from onyx.deep_research.dr_loop import run_deep_research_llm_loop
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.interfaces import VespaChunkRequest
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import log_onyx_error
from onyx.error_handling.exceptions import OnyxError
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_store.models import ChatFileType
from onyx.file_store.models import InMemoryChatFile
from onyx.file_store.utils import load_in_memory_chat_files
@@ -122,6 +129,7 @@ from onyx.tools.tool_constructor import SearchToolConfig
from onyx.utils.logger import setup_logger
from onyx.utils.telemetry import mt_cloud_telemetry
from onyx.utils.timing import log_function_time
from shared_configs.configs import MULTI_TENANT
from shared_configs.contextvars import get_current_tenant_id
logger = setup_logger()
@@ -244,22 +252,106 @@ def _empty_extracted_context_files() -> ExtractedContextFiles:
)
def _extract_text_from_in_memory_file(f: InMemoryChatFile) -> str | None:
def _fetch_cached_image_captions(
user_file: UserFile | None,
document_index: DocumentIndex | None,
) -> list[str]:
"""Read image-caption chunks for a user file from the document index.
During indexing, embedded images are summarized via a vision LLM and
those summaries are stored as chunks whose `image_file_id` is set. Reading
them back at chat time avoids re-running vision-LLM calls per turn.
Returns an empty list if the index has no chunks yet (e.g. indexing is
still in flight) or on any fetch failure.
"""
if user_file is None or document_index is None:
return []
try:
chunks = document_index.id_based_retrieval(
chunk_requests=[VespaChunkRequest(document_id=str(user_file.id))],
filters=IndexFilters(
access_control_list=None,
tenant_id=get_current_tenant_id() if MULTI_TENANT else None,
),
)
except Exception:
logger.warning(
f"Failed to fetch cached captions for user_file {user_file.id}",
exc_info=True,
)
return []
# An image can be spread across multiple chunks; combine by image_file_id
# so a single caption appears once in the context.
combined: dict[str, list[str]] = {}
for chunk in chunks:
if chunk.image_file_id and chunk.content:
combined.setdefault(chunk.image_file_id, []).append(chunk.content)
return [
f"[Image — {image_file_id}]\n" + "\n".join(contents)
for image_file_id, contents in combined.items()
]
def _extract_text_from_in_memory_file(
f: InMemoryChatFile,
user_file: UserFile | None = None,
document_index: DocumentIndex | None = None,
) -> str | None:
"""Extract text content from an InMemoryChatFile.
PLAIN_TEXT: the content is pre-extracted UTF-8 plaintext stored during
ingestion — decode directly.
DOC / CSV / other text types: the content is the original file bytes —
use extract_file_text which handles encoding detection and format parsing.
When image extraction is enabled and the file has embedded images, cached
captions are pulled from the document index and appended to the text.
The index fetch is skipped for files with no embedded images. We do not
re-summarize images inline here — this path is hot and the indexing
pipeline writes chunks atomically, so a missed caption means the file
is mid-indexing and will be picked up on the next turn.
"""
try:
if f.file_type == ChatFileType.PLAIN_TEXT:
return f.content.decode("utf-8", errors="ignore").replace("\x00", "")
return extract_file_text(
filename = f.filename or ""
if not get_image_extraction_and_analysis_enabled():
return extract_file_text(
file=io.BytesIO(f.content),
file_name=filename,
break_on_unprocessable=False,
)
extraction = extract_text_and_images(
file=io.BytesIO(f.content),
file_name=f.filename or "",
break_on_unprocessable=False,
file_name=filename,
)
text = extraction.text_content
has_text = bool(text.strip())
has_images = bool(extraction.embedded_images)
if not has_text and not has_images:
# extract_text_and_images has no is_text_file() fallback for
# unknown extensions (.py/.rs/.md without a dedicated handler).
# Defer to the legacy path so those files remain readable.
return extract_file_text(
file=io.BytesIO(f.content),
file_name=filename,
break_on_unprocessable=False,
)
if not has_images:
return text if has_text else None
cached_captions = _fetch_cached_image_captions(user_file, document_index)
parts: list[str] = []
if has_text:
parts.append(text)
parts.extend(cached_captions)
return "\n\n".join(parts).strip() or None
except Exception:
logger.warning(f"Failed to extract text from file {f.file_id}", exc_info=True)
return None
@@ -341,6 +433,23 @@ def extract_context_files(
db_session=db_session,
)
# The document index is used at chat time to read cached image captions
# (produced during indexing) so vision-LLM calls don't re-run per turn.
document_index: DocumentIndex | None = None
if not DISABLE_VECTOR_DB and get_image_extraction_and_analysis_enabled():
try:
active_search_settings = get_active_search_settings(db_session)
document_index = get_default_document_index(
search_settings=active_search_settings.primary,
secondary_search_settings=None,
db_session=db_session,
)
except Exception:
logger.warning(
"Failed to construct document index for caption lookup",
exc_info=True,
)
file_texts: list[str] = []
image_files: list[ChatLoadedFile] = []
file_metadata: list[ContextFileMetadata] = []
@@ -361,7 +470,9 @@ def extract_context_files(
continue
tool_metadata.append(_build_tool_metadata(uf))
elif f.file_type.is_text_file():
text_content = _extract_text_from_in_memory_file(f)
text_content = _extract_text_from_in_memory_file(
f, user_file=uf, document_index=document_index
)
if not text_content:
continue
if not uf:

View File

@@ -3,7 +3,14 @@ from onyx.server.settings.store import load_settings
def get_image_extraction_and_analysis_enabled() -> bool:
"""Get image extraction and analysis enabled setting from workspace settings or fallback to False"""
"""Return the workspace setting for image extraction/analysis.
The pydantic `Settings` model defaults this field to True, so production
tenants get the feature on by default on first read. The fallback here
stays False so environments where settings cannot be loaded at all
(e.g. unit tests with no DB/Redis) don't trigger downstream vision-LLM
code paths that assume the DB is reachable.
"""
try:
settings = load_settings()
if settings.image_extraction_and_analysis_enabled is not None:

View File

@@ -2,7 +2,7 @@ import json
import os
#####
# Embedding/Reranking Model Configs
# Embedding Model Configs
#####
# Important considerations when choosing models
# Max tokens count needs to be high considering use case (at least 512)
@@ -27,10 +27,6 @@ OLD_DEFAULT_DOCUMENT_ENCODER_MODEL = "thenlper/gte-small"
OLD_DEFAULT_MODEL_DOC_EMBEDDING_DIM = 384
OLD_DEFAULT_MODEL_NORMALIZE_EMBEDDINGS = False
# These are only used if reranking is turned off, to normalize the direct retrieval scores for display
# Currently unused
SIM_SCORE_RANGE_LOW = float(os.environ.get("SIM_SCORE_RANGE_LOW") or 0.0)
SIM_SCORE_RANGE_HIGH = float(os.environ.get("SIM_SCORE_RANGE_HIGH") or 1.0)
# Certain models like e5, BGE, etc use a prefix for asymmetric retrievals (query generally shorter than docs)
ASYM_QUERY_PREFIX = os.environ.get("ASYM_QUERY_PREFIX", "search_query: ")
ASYM_PASSAGE_PREFIX = os.environ.get("ASYM_PASSAGE_PREFIX", "search_document: ")
@@ -42,9 +38,6 @@ EMBEDDING_BATCH_SIZE = int(os.environ.get("EMBEDDING_BATCH_SIZE") or 0) or None
BATCH_SIZE_ENCODE_CHUNKS = EMBEDDING_BATCH_SIZE or 8
# don't send over too many chunks at once, as sending too many could cause timeouts
BATCH_SIZE_ENCODE_CHUNKS_FOR_API_EMBEDDING_SERVICES = EMBEDDING_BATCH_SIZE or 512
# For score display purposes, only way is to know the expected ranges
CROSS_ENCODER_RANGE_MAX = 1
CROSS_ENCODER_RANGE_MIN = 0
#####

View File

@@ -231,6 +231,8 @@ class DocumentBase(BaseModel):
# Set during docfetching after hierarchy nodes are cached
parent_hierarchy_node_id: int | None = None
file_id: str | None = None
def get_title_for_document_index(
self,
) -> str | None:
@@ -370,6 +372,7 @@ class Document(DocumentBase):
secondary_owners=base.secondary_owners,
title=base.title,
from_ingestion_api=base.from_ingestion_api,
file_id=base.file_id,
)
def __sizeof__(self) -> int:

View File

@@ -75,6 +75,8 @@ from onyx.file_processing.file_types import OnyxMimeTypes
from onyx.file_processing.image_utils import store_image_and_create_section
from onyx.utils.b64 import get_image_type_from_bytes
from onyx.utils.logger import setup_logger
from onyx.utils.url import SSRFException
from onyx.utils.url import validate_outbound_http_url
logger = setup_logger()
SLIM_BATCH_SIZE = 1000
@@ -981,6 +983,42 @@ class SharepointConnector(
raise ConnectorValidationError(
"Site URLs must be full Sharepoint URLs (e.g. https://your-tenant.sharepoint.com/sites/your-site or https://your-tenant.sharepoint.com/teams/your-team)"
)
try:
validate_outbound_http_url(site_url, https_only=True)
except (SSRFException, ValueError) as e:
raise ConnectorValidationError(
f"Invalid site URL '{site_url}': {e}"
) from e
# Probe RoleAssignments permission — required for permission sync.
# Only runs when credentials have been loaded.
if self.msal_app and self.sp_tenant_domain and self.sites:
try:
token_response = acquire_token_for_rest(
self.msal_app,
self.sp_tenant_domain,
self.sharepoint_domain_suffix,
)
probe_url = (
f"{self.sites[0].rstrip('/')}/_api/web/roleassignments?$top=1"
)
resp = requests.get(
probe_url,
headers={"Authorization": f"Bearer {token_response.accessToken}"},
timeout=10,
)
if resp.status_code in (401, 403):
raise ConnectorValidationError(
"The Azure AD app registration is missing the required SharePoint permission "
"to read role assignments. Please grant 'Sites.FullControl.All' "
"(application permission) in the Azure portal and re-run admin consent."
)
except ConnectorValidationError:
raise
except Exception as e:
logger.warning(
f"RoleAssignments permission probe failed (non-blocking): {e}"
)
def _extract_tenant_domain_from_sites(self) -> str | None:
"""Extract the tenant domain from configured site URLs.
@@ -1876,16 +1914,22 @@ class SharepointConnector(
logger.debug(
f"Processing site page: {site_page.get('webUrl', site_page.get('name', 'Unknown'))}"
)
ctx = self._create_rest_client_context(site_descriptor.url)
doc_batch.append(
_convert_sitepage_to_slim_document(
site_page,
ctx,
self.graph_client,
parent_hierarchy_raw_node_id=site_descriptor.url,
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
try:
ctx = self._create_rest_client_context(site_descriptor.url)
doc_batch.append(
_convert_sitepage_to_slim_document(
site_page,
ctx,
self.graph_client,
parent_hierarchy_raw_node_id=site_descriptor.url,
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
)
)
except Exception as e:
logger.warning(
f"Failed to process site page "
f"{site_page.get('webUrl', site_page.get('name', 'Unknown'))}: {e}"
)
)
if len(doc_batch) >= SLIM_BATCH_SIZE:
yield doc_batch
doc_batch = []

View File

@@ -696,6 +696,7 @@ def upsert_documents(
else {}
),
doc_metadata=doc.doc_metadata,
file_id=doc.file_id,
)
)
for doc in seen_documents.values()
@@ -712,6 +713,7 @@ def upsert_documents(
"secondary_owners": insert_stmt.excluded.secondary_owners,
"doc_metadata": insert_stmt.excluded.doc_metadata,
"parent_hierarchy_node_id": insert_stmt.excluded.parent_hierarchy_node_id,
"file_id": insert_stmt.excluded.file_id,
}
if includes_permissions:
# Use COALESCE to preserve existing permissions when new values are NULL.

View File

@@ -62,6 +62,21 @@ def delete_filerecord_by_file_id(
db_session.query(FileRecord).filter_by(file_id=file_id).delete()
def update_filerecord_origin(
file_id: str,
from_origin: FileOrigin,
to_origin: FileOrigin,
db_session: Session,
) -> None:
"""Change a file_record's `file_origin`, filtered on the current origin
so the update is idempotent. Caller owns the commit.
"""
db_session.query(FileRecord).filter(
FileRecord.file_id == file_id,
FileRecord.file_origin == from_origin,
).update({FileRecord.file_origin: to_origin})
def upsert_filerecord(
file_id: str,
display_name: str,

View File

@@ -894,7 +894,7 @@ class HierarchyNode(Base):
# For hierarchy nodes that are also documents (e.g., Confluence pages)
# SET NULL when document is deleted - node can exist without its document
document_id: Mapped[str | None] = mapped_column(
ForeignKey("document.id", ondelete="SET NULL"), nullable=True
ForeignKey("document.id", ondelete="SET NULL"), nullable=True, index=True
)
# Self-referential FK for tree structure
@@ -2204,6 +2204,7 @@ class IndexAttempt(Base):
connector_credential_pair_id: Mapped[int] = mapped_column(
ForeignKey("connector_credential_pair.id"),
nullable=False,
index=True,
)
# Some index attempts that run from beginning will still have this as False
@@ -2407,10 +2408,12 @@ class IndexAttemptError(Base):
index_attempt_id: Mapped[int] = mapped_column(
ForeignKey("index_attempt.id"),
nullable=False,
index=True,
)
connector_credential_pair_id: Mapped[int] = mapped_column(
ForeignKey("connector_credential_pair.id"),
nullable=False,
index=True,
)
document_id: Mapped[str | None] = mapped_column(String, nullable=True)

View File

@@ -98,6 +98,9 @@ class DocumentMetadata:
# The resolved database ID of the parent hierarchy node (folder/container)
parent_hierarchy_node_id: int | None = None
# Opt-in pointer to the persisted raw file for this document (file_store id).
file_id: str | None = None
@dataclass
class VespaDocumentFields:

View File

@@ -368,6 +368,40 @@ def extract_docx_images(docx_bytes: IO[Any]) -> Iterator[tuple[bytes, str]]:
logger.exception("Failed to extract all docx images")
def count_docx_embedded_images(file: IO[Any], cap: int) -> int:
"""Return the number of embedded images in a docx, short-circuiting at cap+1.
Mirrors count_pdf_embedded_images so upload validation can apply the same
per-file/per-batch caps. Returns a value > cap once the count exceeds the
cap so callers do not iterate every media entry just to report a number.
Always restores the file pointer to its original position before returning.
"""
try:
start_pos = file.tell()
except Exception:
start_pos = None
try:
if start_pos is not None:
file.seek(0)
count = 0
with zipfile.ZipFile(file) as z:
for name in z.namelist():
if name.startswith("word/media/"):
count += 1
if count > cap:
return count
return count
except Exception:
logger.warning("Failed to count embedded images in docx", exc_info=True)
return 0
finally:
if start_pos is not None:
try:
file.seek(start_pos)
except Exception:
pass
def read_docx_file(
file: IO[Any],
file_name: str = "",

View File

@@ -2,7 +2,10 @@ from collections.abc import Callable
from typing import Any
from typing import IO
from sqlalchemy.orm import Session
from onyx.configs.constants import FileOrigin
from onyx.db.file_record import update_filerecord_origin
from onyx.file_store.file_store import get_default_file_store
from onyx.utils.logger import setup_logger
@@ -61,3 +64,13 @@ def build_raw_file_callback(
)
return _callback
def promote_staged_file(db_session: Session, file_id: str) -> None:
"""Mark a previously-staged file as `FileOrigin.CONNECTOR`."""
update_filerecord_origin(
file_id=file_id,
from_origin=FileOrigin.INDEXING_STAGING,
to_origin=FileOrigin.CONNECTOR,
db_session=db_session,
)

View File

@@ -30,6 +30,7 @@ from onyx.connectors.models import ImageSection
from onyx.connectors.models import IndexAttemptMetadata
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.connectors.models import SectionType
from onyx.connectors.models import TextSection
from onyx.db.document import get_documents_by_ids
from onyx.db.document import upsert_document_by_connector_credential_pair
@@ -49,6 +50,7 @@ from onyx.document_index.interfaces import DocumentMetadata
from onyx.document_index.interfaces import IndexBatchParams
from onyx.file_processing.image_summarization import summarize_image_with_error_handling
from onyx.file_store.file_store import get_default_file_store
from onyx.file_store.staging import promote_staged_file
from onyx.hooks.executor import execute_hook
from onyx.hooks.executor import HookSkipped
from onyx.hooks.executor import HookSoftFailed
@@ -154,6 +156,7 @@ def _upsert_documents_in_db(
doc_metadata=doc.doc_metadata,
# parent_hierarchy_node_id is resolved in docfetching using Redis cache
parent_hierarchy_node_id=doc.parent_hierarchy_node_id,
file_id=doc.file_id,
)
document_metadata_list.append(db_doc_metadata)
@@ -364,6 +367,45 @@ def index_doc_batch_with_handler(
return index_pipeline_result
def _promote_new_staged_files(
documents: list[Document],
previous_file_ids: dict[str, str],
db_session: Session,
) -> None:
"""Queue STAGING → CONNECTOR origin flips for every new file_id in the batch.
Intended to run immediately before `_upsert_documents_in_db` so the origin
flip lands in the same commit as the `Document.file_id` write. Does not
commit — the caller's next commit flushes these UPDATEs.
"""
for doc in documents:
new_file_id = doc.file_id
if new_file_id is None or new_file_id == previous_file_ids.get(doc.id):
continue
promote_staged_file(db_session=db_session, file_id=new_file_id)
def _delete_replaced_files(
documents: list[Document],
previous_file_ids: dict[str, str],
) -> None:
"""Best-effort blob deletes for file_ids replaced in this batch.
Must run AFTER `Document.file_id` has been committed to the new
file_id.
"""
file_store = get_default_file_store()
for doc in documents:
new_file_id = doc.file_id
old_file_id = previous_file_ids.get(doc.id)
if old_file_id is None or old_file_id == new_file_id:
continue
try:
file_store.delete_file(old_file_id, error_on_missing=False)
except Exception:
logger.exception(f"Failed to delete replaced file_id={old_file_id}.")
def index_doc_batch_prepare(
documents: list[Document],
index_attempt_metadata: IndexAttemptMetadata,
@@ -382,6 +424,11 @@ def index_doc_batch_prepare(
document_ids=document_ids,
)
# Capture previous file_ids BEFORE any writes so we know what to reap.
previous_file_ids: dict[str, str] = {
db_doc.id: db_doc.file_id for db_doc in db_docs if db_doc.file_id is not None
}
updatable_docs = (
get_doc_ids_to_update(documents=documents, db_docs=db_docs)
if not ignore_time_skip
@@ -399,11 +446,24 @@ def index_doc_batch_prepare(
# for all updatable docs, upsert into the DB
# Does not include doc_updated_at which is also used to indicate a successful update
if updatable_docs:
# Queue the STAGING → CONNECTOR origin flips BEFORE the Document upsert
# so `upsert_documents`' commit flushes Document.file_id and the origin
# flip atomically
_promote_new_staged_files(
documents=updatable_docs,
previous_file_ids=previous_file_ids,
db_session=db_session,
)
_upsert_documents_in_db(
documents=updatable_docs,
index_attempt_metadata=index_attempt_metadata,
db_session=db_session,
)
# Blob deletes run only after Document.file_id is durable.
_delete_replaced_files(
documents=updatable_docs,
previous_file_ids=previous_file_ids,
)
logger.info(
f"Upserted {len(updatable_docs)} changed docs out of {len(documents)} total docs into the DB"
@@ -530,8 +590,15 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
Returns:
List of IndexingDocument objects with processed_sections as list[Section]
"""
# Check if image extraction and analysis is enabled before trying to get a vision LLM
if not get_image_extraction_and_analysis_enabled():
# Check if image extraction and analysis is enabled before trying to get a vision LLM.
# Use section.type rather than isinstance because sections can round-trip
# through pydantic as base Section instances (not the concrete subclass).
has_image_section = any(
section.type == SectionType.IMAGE
for document in documents
for section in document.sections
)
if not get_image_extraction_and_analysis_enabled() or not has_image_section:
llm = None
else:
# Only get the vision LLM if image processing is enabled

View File

@@ -743,7 +743,13 @@ def model_is_reasoning_model(model_name: str, model_provider: str) -> bool:
model_name,
)
if model_obj and "supports_reasoning" in model_obj:
return model_obj["supports_reasoning"]
reasoning = model_obj["supports_reasoning"]
if reasoning is None:
logger.error(
f"Cannot find reasoning for name={model_name} and provider={model_provider}"
)
reasoning = False
return reasoning
# Fallback: try using litellm.supports_reasoning() for newer models
try:

View File

@@ -1,9 +1,4 @@
"""
Constants for natural language processing, including embedding and reranking models.
This file contains constants moved from model_server to support the gradual migration
of API-based calls to bypass the model server.
"""
"""Constants for natural language processing embeddings and related helpers."""
from shared_configs.enums import EmbeddingProvider
from shared_configs.enums import EmbedTextType

View File

@@ -2,9 +2,3 @@ class ModelServerRateLimitError(Exception):
"""
Exception raised for rate limiting errors from the model server.
"""
class CohereBillingLimitError(Exception):
"""
Raised when Cohere rejects requests because the billing cap is reached.
"""

View File

@@ -12,12 +12,10 @@ from types import TracebackType
from typing import Any
from typing import cast
import aioboto3
import httpx
import requests
import voyageai
from cohere import AsyncClient as CohereAsyncClient
from cohere.core.api_error import ApiError
from google.oauth2 import service_account
from httpx import HTTPError
from requests import JSONDecodeError
@@ -39,12 +37,10 @@ from onyx.natural_language_processing.constants import DEFAULT_OPENAI_MODEL
from onyx.natural_language_processing.constants import DEFAULT_VERTEX_MODEL
from onyx.natural_language_processing.constants import DEFAULT_VOYAGE_MODEL
from onyx.natural_language_processing.constants import EmbeddingModelTextType
from onyx.natural_language_processing.exceptions import CohereBillingLimitError
from onyx.natural_language_processing.exceptions import ModelServerRateLimitError
from onyx.natural_language_processing.utils import get_tokenizer
from onyx.natural_language_processing.utils import tokenizer_trim_content
from onyx.utils.logger import setup_logger
from onyx.utils.search_nlp_models_utils import pass_aws_key
from onyx.utils.text_processing import remove_invalid_unicode_chars
from onyx.utils.timing import log_function_time
from shared_configs.configs import API_BASED_EMBEDDING_TIMEOUT
@@ -57,14 +53,11 @@ from shared_configs.configs import SKIP_WARM_UP
from shared_configs.configs import VERTEXAI_EMBEDDING_LOCAL_BATCH_SIZE
from shared_configs.enums import EmbeddingProvider
from shared_configs.enums import EmbedTextType
from shared_configs.enums import RerankerProvider
from shared_configs.model_server_models import Embedding
from shared_configs.model_server_models import EmbedRequest
from shared_configs.model_server_models import EmbedResponse
from shared_configs.model_server_models import IntentRequest
from shared_configs.model_server_models import IntentResponse
from shared_configs.model_server_models import RerankRequest
from shared_configs.model_server_models import RerankResponse
from shared_configs.utils import batch_list
logger = setup_logger()
@@ -570,90 +563,6 @@ class CloudEmbedding:
)
# API-based reranking functions (moved from model server)
async def cohere_rerank_api(
query: str, docs: list[str], model_name: str, api_key: str
) -> list[float]:
cohere_client = CohereAsyncClient(api_key=api_key)
try:
response = await cohere_client.rerank(
query=query, documents=docs, model=model_name
)
except ApiError as err:
if err.status_code == 402:
logger.warning(
"Cohere rerank request rejected due to billing cap. Falling back to retrieval ordering until billing resets."
)
raise CohereBillingLimitError(
"Cohere billing limit reached for reranking"
) from err
raise
results = response.results
sorted_results = sorted(results, key=lambda item: item.index)
return [result.relevance_score for result in sorted_results]
async def cohere_rerank_aws(
query: str,
docs: list[str],
model_name: str,
region_name: str,
aws_access_key_id: str,
aws_secret_access_key: str,
) -> list[float]:
session = aioboto3.Session(
aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key
)
async with session.client(
"bedrock-runtime", region_name=region_name
) as bedrock_client:
body = json.dumps(
{
"query": query,
"documents": docs,
"api_version": 2,
}
)
# Invoke the Bedrock model asynchronously
response = await bedrock_client.invoke_model(
modelId=model_name,
accept="application/json",
contentType="application/json",
body=body,
)
# Read the response asynchronously
response_body = json.loads(await response["body"].read())
# Extract and sort the results
results = response_body.get("results", [])
sorted_results = sorted(results, key=lambda item: item["index"])
return [result["relevance_score"] for result in sorted_results]
async def litellm_rerank(
query: str, docs: list[str], api_url: str, model_name: str, api_key: str | None
) -> list[float]:
headers = {} if not api_key else {"Authorization": f"Bearer {api_key}"}
async with httpx.AsyncClient() as client:
response = await client.post(
api_url,
json={
"model": model_name,
"query": query,
"documents": docs,
},
headers=headers,
)
response.raise_for_status()
result = response.json()
return [
item["relevance_score"]
for item in sorted(result["results"], key=lambda x: x["index"])
]
class EmbeddingModel:
def __init__(
self,
@@ -1027,104 +936,6 @@ class EmbeddingModel:
)
class RerankingModel:
def __init__(
self,
model_name: str,
provider_type: RerankerProvider | None,
api_key: str | None,
api_url: str | None,
model_server_host: str = MODEL_SERVER_HOST,
model_server_port: int = MODEL_SERVER_PORT,
) -> None:
self.model_name = model_name
self.provider_type = provider_type
self.api_key = api_key
self.api_url = api_url
# Only build model server endpoint for local models
if self.provider_type is None:
model_server_url = build_model_server_url(
model_server_host, model_server_port
)
self.rerank_server_endpoint: str | None = (
model_server_url + "/encoder/cross-encoder-scores"
)
else:
# API providers don't need model server endpoint
self.rerank_server_endpoint = None
async def _make_direct_rerank_call(
self, query: str, passages: list[str]
) -> list[float]:
"""Make direct API call to cloud provider, bypassing model server."""
if self.provider_type is None:
raise ValueError("Provider type is required for direct API calls")
if self.api_key is None:
raise ValueError("API key is required for cloud provider")
if self.provider_type == RerankerProvider.COHERE:
return await cohere_rerank_api(
query, passages, self.model_name, self.api_key
)
elif self.provider_type == RerankerProvider.BEDROCK:
aws_access_key_id, aws_secret_access_key, aws_region = pass_aws_key(
self.api_key
)
return await cohere_rerank_aws(
query,
passages,
self.model_name,
aws_region,
aws_access_key_id,
aws_secret_access_key,
)
elif self.provider_type == RerankerProvider.LITELLM:
if self.api_url is None:
raise ValueError("API URL is required for LiteLLM reranking.")
return await litellm_rerank(
query, passages, self.api_url, self.model_name, self.api_key
)
else:
raise ValueError(f"Unsupported reranking provider: {self.provider_type}")
def predict(self, query: str, passages: list[str]) -> list[float]:
# Route between direct API calls and model server calls
if self.provider_type is not None:
# For API providers, make direct API call
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
return loop.run_until_complete(
self._make_direct_rerank_call(query, passages)
)
finally:
loop.close()
else:
# For local models, use model server
if self.rerank_server_endpoint is None:
raise ValueError(
"Rerank server endpoint is not configured for local models"
)
rerank_request = RerankRequest(
query=query,
documents=passages,
model_name=self.model_name,
provider_type=self.provider_type,
api_key=self.api_key,
api_url=self.api_url,
)
response = requests.post(
self.rerank_server_endpoint, json=rerank_request.model_dump()
)
response.raise_for_status()
return RerankResponse(**response.json()).scores
class QueryAnalysisModel:
def __init__(
self,
@@ -1218,39 +1029,3 @@ def warm_up_bi_encoder(
else:
retry_encode = warm_up_retry(embedding_model.encode)
retry_encode(texts=[warm_up_str], text_type=EmbedTextType.QUERY)
# No longer used
def warm_up_cross_encoder(
rerank_model_name: str,
non_blocking: bool = False,
) -> None:
if SKIP_WARM_UP:
return
logger.debug(f"Warming up reranking model: {rerank_model_name}")
reranking_model = RerankingModel(
model_name=rerank_model_name,
provider_type=None,
api_url=None,
api_key=None,
)
def _warm_up() -> None:
try:
reranking_model.predict(WARM_UP_STRINGS[0], WARM_UP_STRINGS[1:])
logger.debug(f"Warm-up complete for reranking model: {rerank_model_name}")
except Exception as e:
logger.warning(
f"Warm-up request failed for reranking model {rerank_model_name}: {e}"
)
if non_blocking:
threading.Thread(target=_warm_up, daemon=True).start()
logger.debug(
f"Started non-blocking warm-up for reranking model: {rerank_model_name}"
)
else:
retry_rerank = warm_up_retry(reranking_model.predict)
retry_rerank(WARM_UP_STRINGS[0], WARM_UP_STRINGS[1:])

View File

@@ -15,6 +15,9 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT
from onyx.configs.constants import OnyxRedisConstants
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.server.metrics.perm_sync_metrics import (
observe_doc_perm_sync_db_update_duration,
)
from onyx.utils.variable_functionality import fetch_versioned_implementation
@@ -189,7 +192,7 @@ class RedisConnectorPermissionSync:
num_permissions = 0
num_errors = 0
# Create a task for each permission sync
cumulative_db_update_time = 0.0
for permissions in new_permissions:
current_time = time.monotonic()
if lock and current_time - last_lock_time >= (
@@ -228,7 +231,9 @@ class RedisConnectorPermissionSync:
# This can internally exception due to db issues but still continue
# Catch exceptions per-element to avoid breaking the entire sync
db_start = time.monotonic()
try:
element_update_permissions_fn(
self.tenant_id,
permissions,
@@ -236,7 +241,6 @@ class RedisConnectorPermissionSync:
connector_id,
credential_id,
)
num_permissions += 1
except Exception:
num_errors += 1
@@ -249,8 +253,12 @@ class RedisConnectorPermissionSync:
task_logger.exception(
f"Failed to update permissions for element {element_id}"
)
# Continue processing other elements
finally:
cumulative_db_update_time += time.monotonic() - db_start
observe_doc_perm_sync_db_update_duration(
cumulative_db_update_time, source_string
)
return PermissionSyncResult(num_updated=num_permissions, num_errors=num_errors)
def reset(self) -> None:

View File

@@ -11,7 +11,9 @@ from sqlalchemy.orm import Session
from onyx.configs.app_configs import MAX_EMBEDDED_IMAGES_PER_FILE
from onyx.configs.app_configs import MAX_EMBEDDED_IMAGES_PER_UPLOAD
from onyx.configs.llm_configs import get_image_extraction_and_analysis_enabled
from onyx.db.llm import fetch_default_llm_model
from onyx.file_processing.extract_file_text import count_docx_embedded_images
from onyx.file_processing.extract_file_text import count_pdf_embedded_images
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_processing.extract_file_text import get_file_ext
@@ -198,6 +200,9 @@ def categorize_uploaded_files(
# rejected even if they'd individually fit under MAX_EMBEDDED_IMAGES_PER_FILE.
batch_image_total = 0
# Hoisted out of the loop to avoid a KV-store lookup per file.
image_extraction_enabled = get_image_extraction_and_analysis_enabled()
for upload in files:
try:
filename = get_safe_filename(upload)
@@ -260,28 +265,33 @@ def categorize_uploaded_files(
)
continue
# Reject PDFs with an unreasonable number of embedded images
# (either per-file or accumulated across this upload batch).
# A PDF with thousands of embedded images can OOM the
# Reject documents with an unreasonable number of embedded
# images (either per-file or accumulated across this upload
# batch). A file with thousands of embedded images can OOM the
# user-file-processing celery worker because every image is
# decoded with PIL and then sent to the vision LLM.
if extension == ".pdf":
count: int = 0
image_bearing_ext = extension in (".pdf", ".docx")
if image_bearing_ext:
file_cap = MAX_EMBEDDED_IMAGES_PER_FILE
batch_cap = MAX_EMBEDDED_IMAGES_PER_UPLOAD
# Use the larger of the two caps as the short-circuit
# threshold so we get a useful count for both checks.
# count_pdf_embedded_images restores the stream position.
count = count_pdf_embedded_images(
upload.file, max(file_cap, batch_cap)
# These helpers restore the stream position.
counter = (
count_pdf_embedded_images
if extension == ".pdf"
else count_docx_embedded_images
)
count = counter(upload.file, max(file_cap, batch_cap))
if count > file_cap:
results.rejected.append(
RejectedFile(
filename=filename,
reason=(
f"PDF contains too many embedded images "
f"(more than {file_cap}). Try splitting "
f"the document into smaller files."
f"Document contains too many embedded "
f"images (more than {file_cap}). Try "
f"splitting it into smaller files."
),
)
)
@@ -308,6 +318,21 @@ def categorize_uploaded_files(
extension=extension,
)
if not text_content:
# Documents with embedded images (e.g. scans) have no
# extractable text but can still be indexed via the
# vision-LLM captioning path when image analysis is
# enabled.
if image_bearing_ext and count > 0 and image_extraction_enabled:
results.acceptable.append(upload)
results.acceptable_file_to_token_count[filename] = 0
try:
upload.file.seek(0)
except Exception as e:
logger.warning(
f"Failed to reset file pointer for '{filename}': {str(e)}"
)
continue
logger.warning(f"No text content extracted from '{filename}'")
results.rejected.append(
RejectedFile(

View File

@@ -0,0 +1,186 @@
"""Permission-sync-specific Prometheus metrics.
Tracks doc permission sync and external group sync phases:
Doc permission sync (connector_permission_sync_generator_task):
1. Overall sync duration (source enumeration + DB updates)
2. Cumulative per-element DB update duration within update_db
3. Documents successfully synced
4. Documents with permission errors
External group sync (_perform_external_group_sync):
1. Overall sync duration
2. Cumulative batch upsert duration
3. Groups processed
4. Unique users discovered
5. Errors encountered
All metrics are labeled by connector_type to identify which connector sources
are the most expensive to sync. cc_pair_id is intentionally excluded to avoid
unbounded cardinality.
Usage:
from onyx.server.metrics.perm_sync_metrics import (
observe_doc_perm_sync_duration,
observe_doc_perm_sync_db_update_duration,
inc_doc_perm_sync_docs_processed,
inc_doc_perm_sync_errors,
observe_group_sync_duration,
observe_group_sync_upsert_duration,
inc_group_sync_groups_processed,
inc_group_sync_users_processed,
inc_group_sync_errors,
)
"""
from prometheus_client import Counter
from prometheus_client import Histogram
from onyx.utils.logger import setup_logger
logger = setup_logger()
# --- Doc permission sync metrics ---
DOC_PERM_SYNC_DURATION = Histogram(
"onyx_doc_perm_sync_duration_seconds",
"Overall duration of doc permission sync (source enumeration + DB updates)",
["connector_type"],
buckets=[5, 60, 600, 1800, 3600, 10800, 21600],
)
DOC_PERM_SYNC_DB_UPDATE_DURATION = Histogram(
"onyx_doc_perm_sync_db_update_duration_seconds",
"Cumulative per-element DB update duration within a single doc permission sync",
["connector_type"],
buckets=[0.1, 0.5, 1, 5, 15, 30, 60, 300, 600],
)
DOC_PERM_SYNC_DOCS_PROCESSED = Counter(
"onyx_doc_perm_sync_docs_processed_total",
"Total documents successfully synced during doc permission sync",
["connector_type"],
)
DOC_PERM_SYNC_ERRORS = Counter(
"onyx_doc_perm_sync_errors_total",
"Total document permission errors during doc permission sync",
["connector_type"],
)
# --- External group sync metrics ---
GROUP_SYNC_DURATION = Histogram(
"onyx_group_sync_duration_seconds",
"Overall duration of external group sync",
["connector_type"],
buckets=[5, 60, 600, 1800, 3600, 10800, 21600],
)
GROUP_SYNC_UPSERT_DURATION = Histogram(
"onyx_group_sync_upsert_duration_seconds",
"Cumulative batch upsert duration within a single external group sync",
["connector_type"],
buckets=[0.1, 0.5, 1, 5, 15, 30, 60, 300, 600],
)
GROUP_SYNC_GROUPS_PROCESSED = Counter(
"onyx_group_sync_groups_processed_total",
"Total groups processed during external group sync",
["connector_type"],
)
GROUP_SYNC_USERS_PROCESSED = Counter(
"onyx_group_sync_users_processed_total",
"Total unique users discovered during external group sync",
["connector_type"],
)
GROUP_SYNC_ERRORS = Counter(
"onyx_group_sync_errors_total",
"Total errors during external group sync",
["connector_type"],
)
# --- Doc permission sync helpers ---
def observe_doc_perm_sync_duration(
duration_seconds: float, connector_type: str
) -> None:
try:
DOC_PERM_SYNC_DURATION.labels(connector_type=connector_type).observe(
duration_seconds
)
except Exception:
logger.debug("Failed to record doc perm sync duration", exc_info=True)
def observe_doc_perm_sync_db_update_duration(
duration_seconds: float, connector_type: str
) -> None:
try:
DOC_PERM_SYNC_DB_UPDATE_DURATION.labels(connector_type=connector_type).observe(
duration_seconds
)
except Exception:
logger.debug("Failed to record doc perm sync db update duration", exc_info=True)
def inc_doc_perm_sync_docs_processed(connector_type: str, amount: int = 1) -> None:
try:
DOC_PERM_SYNC_DOCS_PROCESSED.labels(connector_type=connector_type).inc(amount)
except Exception:
logger.debug("Failed to record doc perm sync docs processed", exc_info=True)
def inc_doc_perm_sync_errors(connector_type: str, amount: int = 1) -> None:
try:
DOC_PERM_SYNC_ERRORS.labels(connector_type=connector_type).inc(amount)
except Exception:
logger.debug("Failed to record doc perm sync errors", exc_info=True)
# --- External group sync helpers ---
def observe_group_sync_duration(duration_seconds: float, connector_type: str) -> None:
try:
GROUP_SYNC_DURATION.labels(connector_type=connector_type).observe(
duration_seconds
)
except Exception:
logger.debug("Failed to record group sync duration", exc_info=True)
def observe_group_sync_upsert_duration(
duration_seconds: float, connector_type: str
) -> None:
try:
GROUP_SYNC_UPSERT_DURATION.labels(connector_type=connector_type).observe(
duration_seconds
)
except Exception:
logger.debug("Failed to record group sync upsert duration", exc_info=True)
def inc_group_sync_groups_processed(connector_type: str, amount: int = 1) -> None:
try:
GROUP_SYNC_GROUPS_PROCESSED.labels(connector_type=connector_type).inc(amount)
except Exception:
logger.debug("Failed to record group sync groups processed", exc_info=True)
def inc_group_sync_users_processed(connector_type: str, amount: int = 1) -> None:
try:
GROUP_SYNC_USERS_PROCESSED.labels(connector_type=connector_type).inc(amount)
except Exception:
logger.debug("Failed to record group sync users processed", exc_info=True)
def inc_group_sync_errors(connector_type: str, amount: int = 1) -> None:
try:
GROUP_SYNC_ERRORS.labels(connector_type=connector_type).inc(amount)
except Exception:
logger.debug("Failed to record group sync errors", exc_info=True)

View File

@@ -80,7 +80,7 @@ class Settings(BaseModel):
query_history_type: QueryHistoryType | None = None
# Image processing settings
image_extraction_and_analysis_enabled: bool | None = False
image_extraction_and_analysis_enabled: bool | None = True
search_time_image_analysis_enabled: bool | None = False
image_analysis_max_size_mb: int | None = 20

View File

@@ -1,26 +0,0 @@
def pass_aws_key(api_key: str) -> tuple[str, str, str]:
"""Parse AWS API key string into components.
Args:
api_key: String in format 'aws_ACCESSKEY_SECRETKEY_REGION'
Returns:
Tuple of (access_key, secret_key, region)
Raises:
ValueError: If key format is invalid
"""
if not api_key.startswith("aws"):
raise ValueError("API key must start with 'aws' prefix")
parts = api_key.split("_")
if len(parts) != 4:
raise ValueError(
f"API key must be in format 'aws_ACCESSKEY_SECRETKEY_REGION', got {len(parts) - 1} parts. "
"This is an onyx specific format for formatting the aws secrets for bedrock"
)
try:
_, aws_access_key_id, aws_secret_access_key, aws_region = parts
return aws_access_key_id, aws_secret_access_key, aws_region
except Exception as e:
raise ValueError(f"Failed to parse AWS key components: {str(e)}")

View File

@@ -44,25 +44,12 @@ DOC_EMBEDDING_CONTEXT_SIZE = 512
# Used to distinguish alternative indices
ALT_INDEX_SUFFIX = "__danswer_alt_index"
# Used for loading defaults for automatic deployments and dev flows
# For local, use: mixedbread-ai/mxbai-rerank-xsmall-v1
DEFAULT_CROSS_ENCODER_MODEL_NAME = (
os.environ.get("DEFAULT_CROSS_ENCODER_MODEL_NAME") or None
)
DEFAULT_CROSS_ENCODER_API_KEY = os.environ.get("DEFAULT_CROSS_ENCODER_API_KEY") or None
DEFAULT_CROSS_ENCODER_PROVIDER_TYPE = (
os.environ.get("DEFAULT_CROSS_ENCODER_PROVIDER_TYPE") or None
)
DISABLE_RERANK_FOR_STREAMING = (
os.environ.get("DISABLE_RERANK_FOR_STREAMING", "").lower() == "true"
)
# This controls the minimum number of pytorch "threads" to allocate to the embedding
# model. If torch finds more threads on its own, this value is not used.
MIN_THREADS_ML_MODELS = int(os.environ.get("MIN_THREADS_ML_MODELS") or 1)
# Model server that has indexing only set will throw exception if used for reranking
# or intent classification
# Model server that has indexing only set will throw exception if used for intent
# classification.
INDEXING_ONLY = os.environ.get("INDEXING_ONLY", "").lower() == "true"
# The process needs to have this for the log file to write to

View File

@@ -10,12 +10,6 @@ class EmbeddingProvider(str, Enum):
AZURE = "azure"
class RerankerProvider(str, Enum):
COHERE = "cohere"
LITELLM = "litellm"
BEDROCK = "bedrock"
class EmbedTextType(str, Enum):
QUERY = "query"
PASSAGE = "passage"

View File

@@ -2,7 +2,6 @@ from pydantic import BaseModel
from shared_configs.enums import EmbeddingProvider
from shared_configs.enums import EmbedTextType
from shared_configs.enums import RerankerProvider
Embedding = list[float]
@@ -36,22 +35,6 @@ class EmbedResponse(BaseModel):
embeddings: list[Embedding]
class RerankRequest(BaseModel):
query: str
documents: list[str]
model_name: str
provider_type: RerankerProvider | None = None
api_key: str | None = None
api_url: str | None = None
# This disables the "model_" protected namespace for pydantic
model_config = {"protected_namespaces": ()}
class RerankResponse(BaseModel):
scores: list[float]
class IntentRequest(BaseModel):
query: str
# Sequence classification threshold

View File

@@ -210,7 +210,7 @@ def test_jira_doc_sync_with_specific_permissions(
assert len(docs) > 0, "Expected at least one document from SUP project"
_EXPECTED_USER_EMAILS = set(
["yuhong@onyx.app", "chris@onyx.app", "founders@onyx.app"]
["yuhong@onyx.app", "chris@onyx.app", "founders@onyx.app", "oauth@onyx.app"]
)
_EXPECTED_USER_GROUP_IDS = set(["jira-users-danswerai"])

View File

@@ -46,6 +46,7 @@ _EXPECTED_JIRA_GROUPS = [
"chris@onyx.app",
"founders@onyx.app",
"hagen@danswer.ai",
"oauth@onyx.app",
"pablo@onyx.app",
"yuhong@onyx.app",
},
@@ -56,6 +57,11 @@ _EXPECTED_JIRA_GROUPS = [
user_emails={"founders@onyx.app", "hagen@danswer.ai", "pablo@onyx.app"},
gives_anyone_access=False,
),
ExternalUserGroupSet(
id="jira-servicemanagement-users-danswerai",
user_emails={"oauth@onyx.app"},
gives_anyone_access=False,
),
ExternalUserGroupSet(
id="jira-user-access-admins-danswerai",
user_emails={"hagen@danswer.ai"},
@@ -67,6 +73,7 @@ _EXPECTED_JIRA_GROUPS = [
"chris@onyx.app",
"founders@onyx.app",
"hagen@danswer.ai",
"oauth@onyx.app",
"pablo@onyx.app",
},
gives_anyone_access=False,
@@ -76,18 +83,19 @@ _EXPECTED_JIRA_GROUPS = [
user_emails={
"chris@onyx.app",
"founders@onyx.app",
"oauth@onyx.app",
"yuhong@onyx.app",
},
gives_anyone_access=False,
),
ExternalUserGroupSet(
id="bitbucket-admins-onyxai",
user_emails={"founders@onyx.app"}, # no Oauth, we skip "app" account in jira
user_emails={"founders@onyx.app", "oauth@onyx.app"},
gives_anyone_access=False,
),
ExternalUserGroupSet(
id="bitbucket-users-onyxai",
user_emails={"founders@onyx.app"}, # no Oauth, we skip "app" account in jira
user_emails={"founders@onyx.app", "oauth@onyx.app"},
gives_anyone_access=False,
),
]

View File

@@ -0,0 +1,405 @@
"""External dependency unit tests for `index_doc_batch_prepare`.
Validates the file_id lifecycle that runs alongside the document upsert:
* `document.file_id` is written on insert AND on conflict (upsert path)
* Newly-staged files get promoted from INDEXING_STAGING -> CONNECTOR
* Replaced files are deleted from both `file_record` and S3
* No-op when the file_id is unchanged
Uses real PostgreSQL + real S3/MinIO via the file store.
"""
from collections.abc import Generator
from io import BytesIO
from uuid import uuid4
import pytest
from sqlalchemy.orm import Session
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import FileOrigin
from onyx.connectors.models import Document
from onyx.connectors.models import IndexAttemptMetadata
from onyx.connectors.models import InputType
from onyx.connectors.models import TextSection
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.file_record import get_filerecord_by_file_id_optional
from onyx.db.models import Connector
from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import Credential
from onyx.db.models import Document as DBDocument
from onyx.db.models import DocumentByConnectorCredentialPair
from onyx.db.models import FileRecord
from onyx.file_store.file_store import get_default_file_store
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_doc(doc_id: str, file_id: str | None = None) -> Document:
"""Minimal Document for indexing-pipeline tests. MOCK_CONNECTOR avoids
triggering the hierarchy-node linking branch (NOTION/CONFLUENCE only)."""
return Document(
id=doc_id,
source=DocumentSource.MOCK_CONNECTOR,
semantic_identifier=f"semantic-{doc_id}",
sections=[TextSection(text="content", link=None)],
metadata={},
file_id=file_id,
)
def _stage_file(content: bytes = b"raw bytes") -> str:
"""Write bytes to the file store as INDEXING_STAGING and return the file_id.
Mirrors what the connector raw_file_callback would do during fetch.
"""
return get_default_file_store().save_file(
content=BytesIO(content),
display_name=None,
file_origin=FileOrigin.INDEXING_STAGING,
file_type="application/octet-stream",
file_metadata={"test": True},
)
def _get_doc_row(db_session: Session, doc_id: str) -> DBDocument | None:
"""Reload the document row fresh from DB so we see post-upsert state."""
db_session.expire_all()
return db_session.query(DBDocument).filter(DBDocument.id == doc_id).one_or_none()
def _get_filerecord(db_session: Session, file_id: str) -> FileRecord | None:
db_session.expire_all()
return get_filerecord_by_file_id_optional(file_id=file_id, db_session=db_session)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def cc_pair(
db_session: Session,
tenant_context: None, # noqa: ARG001
initialize_file_store: None, # noqa: ARG001
) -> Generator[ConnectorCredentialPair, None, None]:
"""Create a connector + credential + cc_pair backing the index attempt.
Teardown sweeps everything the test created under this cc_pair: the
`document_by_connector_credential_pair` join rows, the `Document` rows
they point at, the `FileRecord` + blob for each doc's `file_id`, and
finally the cc_pair / connector / credential themselves. Without this,
every run would leave orphan rows in the dev DB and orphan blobs in
MinIO.
"""
connector = Connector(
name=f"test-connector-{uuid4().hex[:8]}",
source=DocumentSource.MOCK_CONNECTOR,
input_type=InputType.LOAD_STATE,
connector_specific_config={},
refresh_freq=None,
prune_freq=None,
indexing_start=None,
)
db_session.add(connector)
db_session.flush()
credential = Credential(
source=DocumentSource.MOCK_CONNECTOR,
credential_json={},
)
db_session.add(credential)
db_session.flush()
pair = ConnectorCredentialPair(
connector_id=connector.id,
credential_id=credential.id,
name=f"test-cc-pair-{uuid4().hex[:8]}",
status=ConnectorCredentialPairStatus.ACTIVE,
access_type=AccessType.PUBLIC,
auto_sync_options=None,
)
db_session.add(pair)
db_session.commit()
db_session.refresh(pair)
connector_id = pair.connector_id
credential_id = pair.credential_id
try:
yield pair
finally:
db_session.expire_all()
# Collect every doc indexed under this cc_pair so we can delete its
# file_record + blob before dropping the Document row itself.
doc_ids: list[str] = [
row[0]
for row in db_session.query(DocumentByConnectorCredentialPair.id)
.filter(
DocumentByConnectorCredentialPair.connector_id == connector_id,
DocumentByConnectorCredentialPair.credential_id == credential_id,
)
.all()
]
file_ids: list[str] = [
row[0]
for row in db_session.query(DBDocument.file_id)
.filter(DBDocument.id.in_(doc_ids), DBDocument.file_id.isnot(None))
.all()
]
file_store = get_default_file_store()
for fid in file_ids:
try:
file_store.delete_file(fid, error_on_missing=False)
except Exception:
pass
if doc_ids:
db_session.query(DocumentByConnectorCredentialPair).filter(
DocumentByConnectorCredentialPair.id.in_(doc_ids)
).delete(synchronize_session="fetch")
db_session.query(DBDocument).filter(DBDocument.id.in_(doc_ids)).delete(
synchronize_session="fetch"
)
db_session.query(ConnectorCredentialPair).filter(
ConnectorCredentialPair.id == pair.id
).delete(synchronize_session="fetch")
db_session.query(Connector).filter(Connector.id == connector_id).delete(
synchronize_session="fetch"
)
db_session.query(Credential).filter(Credential.id == credential_id).delete(
synchronize_session="fetch"
)
db_session.commit()
@pytest.fixture
def attempt_metadata(cc_pair: ConnectorCredentialPair) -> IndexAttemptMetadata:
return IndexAttemptMetadata(
connector_id=cc_pair.connector_id,
credential_id=cc_pair.credential_id,
attempt_id=None,
request_id="test-request",
)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestNewDocuments:
"""First-time inserts — no previous file_id to reconcile against."""
def test_new_doc_without_file_id(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=None)
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
row = _get_doc_row(db_session, doc.id)
assert row is not None
assert row.file_id is None
def test_new_doc_with_staged_file_id_promotes_to_connector(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
file_id = _stage_file()
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=file_id)
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
row = _get_doc_row(db_session, doc.id)
assert row is not None and row.file_id == file_id
record = _get_filerecord(db_session, file_id)
assert record is not None
assert record.file_origin == FileOrigin.CONNECTOR
class TestExistingDocuments:
"""Re-index path — a `document` row already exists with some file_id."""
def test_unchanged_file_id_is_noop(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
file_id = _stage_file()
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=file_id)
# First pass: inserts the row + promotes the file.
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
# Second pass with the same file_id — should not delete or re-promote.
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
record = _get_filerecord(db_session, file_id)
assert record is not None
assert record.file_origin == FileOrigin.CONNECTOR
row = _get_doc_row(db_session, doc.id)
assert row is not None and row.file_id == file_id
def test_swapping_file_id_promotes_new_and_deletes_old(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
old_file_id = _stage_file(content=b"old bytes")
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=old_file_id)
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
# Re-fetch produces a new staged file_id for the same doc.
new_file_id = _stage_file(content=b"new bytes")
doc_v2 = _make_doc(doc.id, file_id=new_file_id)
index_doc_batch_prepare(
documents=[doc_v2],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
row = _get_doc_row(db_session, doc.id)
assert row is not None and row.file_id == new_file_id
new_record = _get_filerecord(db_session, new_file_id)
assert new_record is not None
assert new_record.file_origin == FileOrigin.CONNECTOR
# Old file_record + S3 object are gone.
assert _get_filerecord(db_session, old_file_id) is None
def test_clearing_file_id_deletes_old_and_nulls_column(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
old_file_id = _stage_file()
doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=old_file_id)
index_doc_batch_prepare(
documents=[doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
# Connector opts out on next run — yields the doc without a file_id.
doc_v2 = _make_doc(doc.id, file_id=None)
index_doc_batch_prepare(
documents=[doc_v2],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
row = _get_doc_row(db_session, doc.id)
assert row is not None and row.file_id is None
assert _get_filerecord(db_session, old_file_id) is None
class TestBatchHandling:
"""Mixed batches — multiple docs at different lifecycle states in one call."""
def test_mixed_batch_each_doc_handled_independently(
self,
db_session: Session,
attempt_metadata: IndexAttemptMetadata,
) -> None:
# Pre-seed an existing doc with a file_id we'll swap.
existing_old_id = _stage_file(content=b"existing-old")
existing_doc = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=existing_old_id)
index_doc_batch_prepare(
documents=[existing_doc],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
# Now: swap the existing one, add a brand-new doc with file_id, and a
# brand-new doc without file_id.
swap_new_id = _stage_file(content=b"existing-new")
new_with_file_id = _stage_file(content=b"new-with-file")
existing_v2 = _make_doc(existing_doc.id, file_id=swap_new_id)
new_with = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=new_with_file_id)
new_without = _make_doc(f"doc-{uuid4().hex[:8]}", file_id=None)
index_doc_batch_prepare(
documents=[existing_v2, new_with, new_without],
index_attempt_metadata=attempt_metadata,
db_session=db_session,
ignore_time_skip=True,
)
db_session.commit()
# Existing doc was swapped: old file gone, new file promoted.
existing_row = _get_doc_row(db_session, existing_doc.id)
assert existing_row is not None and existing_row.file_id == swap_new_id
assert _get_filerecord(db_session, existing_old_id) is None
swap_record = _get_filerecord(db_session, swap_new_id)
assert swap_record is not None
assert swap_record.file_origin == FileOrigin.CONNECTOR
# New doc with file_id: row exists, file promoted.
new_with_row = _get_doc_row(db_session, new_with.id)
assert new_with_row is not None and new_with_row.file_id == new_with_file_id
new_with_record = _get_filerecord(db_session, new_with_file_id)
assert new_with_record is not None
assert new_with_record.file_origin == FileOrigin.CONNECTOR
# New doc without file_id: row exists, no file_record involvement.
new_without_row = _get_doc_row(db_session, new_without.id)
assert new_without_row is not None and new_without_row.file_id is None

View File

@@ -247,7 +247,7 @@ class DATestSettings(BaseModel):
gpu_enabled: bool | None = None
product_gating: DATestGatingType = DATestGatingType.NONE
anonymous_user_enabled: bool | None = None
image_extraction_and_analysis_enabled: bool | None = False
image_extraction_and_analysis_enabled: bool | None = True
search_time_image_analysis_enabled: bool | None = False

View File

@@ -0,0 +1,78 @@
"""Unit tests for the require-score check in verify_captcha_token."""
from unittest.mock import AsyncMock
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from onyx.auth import captcha as captcha_module
from onyx.auth.captcha import CaptchaVerificationError
from onyx.auth.captcha import verify_captcha_token
def _fake_httpx_client_returning(payload: dict) -> MagicMock:
resp = MagicMock()
resp.raise_for_status = MagicMock()
resp.json = MagicMock(return_value=payload)
client = MagicMock()
client.post = AsyncMock(return_value=resp)
client.__aenter__ = AsyncMock(return_value=client)
client.__aexit__ = AsyncMock(return_value=None)
return client
@pytest.mark.asyncio
async def test_rejects_when_score_missing() -> None:
"""Siteverify response with no score field is rejected outright —
closes the accidental 'test secret in prod' bypass path."""
client = _fake_httpx_client_returning(
{"success": True, "hostname": "testkey.google.com"}
)
with (
patch.object(captcha_module, "is_captcha_enabled", return_value=True),
patch.object(captcha_module.httpx, "AsyncClient", return_value=client),
):
with pytest.raises(CaptchaVerificationError, match="missing score"):
await verify_captcha_token("test-token", expected_action="signup")
@pytest.mark.asyncio
async def test_accepts_when_score_present_and_above_threshold() -> None:
"""Sanity check the happy path still works with the tighter score rule."""
client = _fake_httpx_client_returning(
{
"success": True,
"score": 0.9,
"action": "signup",
"hostname": "cloud.onyx.app",
}
)
with (
patch.object(captcha_module, "is_captcha_enabled", return_value=True),
patch.object(captcha_module.httpx, "AsyncClient", return_value=client),
):
# Should not raise.
await verify_captcha_token("fresh-token", expected_action="signup")
@pytest.mark.asyncio
async def test_rejects_when_score_below_threshold() -> None:
"""A score present but below threshold still rejects (existing behavior,
guarding against regression from this PR's restructure)."""
client = _fake_httpx_client_returning(
{
"success": True,
"score": 0.1,
"action": "signup",
"hostname": "cloud.onyx.app",
}
)
with (
patch.object(captcha_module, "is_captcha_enabled", return_value=True),
patch.object(captcha_module.httpx, "AsyncClient", return_value=client),
):
with pytest.raises(
CaptchaVerificationError, match="suspicious activity detected"
):
await verify_captcha_token("low-score-token", expected_action="signup")

View File

@@ -0,0 +1,194 @@
"""Unit tests for SharepointConnector site-page slim resilience and
validate_connector_settings RoleAssignments permission probe."""
from __future__ import annotations
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.sharepoint.connector import SharepointConnector
SITE_URL = "https://tenant.sharepoint.com/sites/MySite"
def _make_connector() -> SharepointConnector:
connector = SharepointConnector(sites=[SITE_URL])
connector.msal_app = MagicMock()
connector.sp_tenant_domain = "tenant"
connector._credential_json = {"sp_client_id": "x", "sp_directory_id": "y"}
connector._graph_client = MagicMock()
return connector
# ---------------------------------------------------------------------------
# _fetch_slim_documents_from_sharepoint — site page error resilience
# ---------------------------------------------------------------------------
@patch("onyx.connectors.sharepoint.connector._convert_sitepage_to_slim_document")
@patch(
"onyx.connectors.sharepoint.connector.SharepointConnector._create_rest_client_context"
)
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_site_pages")
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_driveitems")
@patch("onyx.connectors.sharepoint.connector.SharepointConnector.fetch_sites")
def test_site_page_error_does_not_crash(
mock_fetch_sites: MagicMock,
mock_fetch_driveitems: MagicMock,
mock_fetch_site_pages: MagicMock,
_mock_create_ctx: MagicMock,
mock_convert: MagicMock,
) -> None:
"""A 401 (or any exception) on a site page is caught; remaining pages are processed."""
from onyx.connectors.models import SlimDocument
connector = _make_connector()
connector.include_site_documents = False
connector.include_site_pages = True
site = MagicMock()
site.url = SITE_URL
mock_fetch_sites.return_value = [site]
mock_fetch_driveitems.return_value = iter([])
page_ok = {"id": "1", "webUrl": SITE_URL + "/SitePages/Good.aspx"}
page_bad = {"id": "2", "webUrl": SITE_URL + "/SitePages/Bad.aspx"}
mock_fetch_site_pages.return_value = [page_bad, page_ok]
good_slim = SlimDocument(id="1")
def _convert_side_effect(
page: dict, *_args: object, **_kwargs: object
) -> SlimDocument: # noqa: ANN001
if page["id"] == "2":
from office365.runtime.client_request import ClientRequestException
raise ClientRequestException(MagicMock(status_code=401), None)
return good_slim
mock_convert.side_effect = _convert_side_effect
results = [
doc
for batch in connector._fetch_slim_documents_from_sharepoint()
for doc in batch
if isinstance(doc, SlimDocument)
]
# Only the good page makes it through; bad page is skipped, no exception raised.
assert any(d.id == "1" for d in results)
assert not any(d.id == "2" for d in results)
@patch("onyx.connectors.sharepoint.connector._convert_sitepage_to_slim_document")
@patch(
"onyx.connectors.sharepoint.connector.SharepointConnector._create_rest_client_context"
)
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_site_pages")
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_driveitems")
@patch("onyx.connectors.sharepoint.connector.SharepointConnector.fetch_sites")
def test_all_site_pages_fail_does_not_crash(
mock_fetch_sites: MagicMock,
mock_fetch_driveitems: MagicMock,
mock_fetch_site_pages: MagicMock,
_mock_create_ctx: MagicMock,
mock_convert: MagicMock,
) -> None:
"""When every site page fails, the generator completes without raising."""
connector = _make_connector()
connector.include_site_documents = False
connector.include_site_pages = True
site = MagicMock()
site.url = SITE_URL
mock_fetch_sites.return_value = [site]
mock_fetch_driveitems.return_value = iter([])
mock_fetch_site_pages.return_value = [
{"id": "1", "webUrl": SITE_URL + "/SitePages/A.aspx"},
{"id": "2", "webUrl": SITE_URL + "/SitePages/B.aspx"},
]
mock_convert.side_effect = RuntimeError("context error")
from onyx.connectors.models import SlimDocument
# Should not raise; no SlimDocuments in output (only hierarchy nodes).
slim_results = [
doc
for batch in connector._fetch_slim_documents_from_sharepoint()
for doc in batch
if isinstance(doc, SlimDocument)
]
assert slim_results == []
# ---------------------------------------------------------------------------
# validate_connector_settings — RoleAssignments permission probe
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("status_code", [401, 403])
@patch("onyx.connectors.sharepoint.connector.requests.get")
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
def test_validate_raises_on_401_or_403(
mock_acquire: MagicMock,
_mock_validate_url: MagicMock,
mock_get: MagicMock,
status_code: int,
) -> None:
"""validate_connector_settings raises ConnectorValidationError when probe returns 401 or 403."""
mock_acquire.return_value = MagicMock(accessToken="tok")
mock_get.return_value = MagicMock(status_code=status_code)
connector = _make_connector()
with pytest.raises(ConnectorValidationError, match="Sites.FullControl.All"):
connector.validate_connector_settings()
@patch("onyx.connectors.sharepoint.connector.requests.get")
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
def test_validate_passes_on_200(
mock_acquire: MagicMock,
_mock_validate_url: MagicMock,
mock_get: MagicMock,
) -> None:
"""validate_connector_settings does not raise when probe returns 200."""
mock_acquire.return_value = MagicMock(accessToken="tok")
mock_get.return_value = MagicMock(status_code=200)
connector = _make_connector()
connector.validate_connector_settings() # should not raise
@patch("onyx.connectors.sharepoint.connector.requests.get")
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
def test_validate_passes_on_network_error(
mock_acquire: MagicMock,
_mock_validate_url: MagicMock,
mock_get: MagicMock,
) -> None:
"""Network errors during the probe are non-blocking (logged as warning only)."""
mock_acquire.return_value = MagicMock(accessToken="tok")
mock_get.side_effect = Exception("timeout")
connector = _make_connector()
connector.validate_connector_settings() # should not raise
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
def test_validate_skips_probe_without_credentials(
mock_acquire: MagicMock,
_mock_validate_url: MagicMock,
) -> None:
"""Probe is skipped when credentials have not been loaded."""
connector = SharepointConnector(sites=[SITE_URL])
# msal_app and sp_tenant_domain are None — probe must be skipped.
connector.validate_connector_settings() # should not raise
mock_acquire.assert_not_called()

View File

@@ -0,0 +1,319 @@
"""Tests for permission-sync-specific Prometheus metrics."""
import pytest
from onyx.server.metrics.perm_sync_metrics import DOC_PERM_SYNC_DB_UPDATE_DURATION
from onyx.server.metrics.perm_sync_metrics import DOC_PERM_SYNC_DOCS_PROCESSED
from onyx.server.metrics.perm_sync_metrics import DOC_PERM_SYNC_DURATION
from onyx.server.metrics.perm_sync_metrics import DOC_PERM_SYNC_ERRORS
from onyx.server.metrics.perm_sync_metrics import GROUP_SYNC_DURATION
from onyx.server.metrics.perm_sync_metrics import GROUP_SYNC_ERRORS
from onyx.server.metrics.perm_sync_metrics import GROUP_SYNC_GROUPS_PROCESSED
from onyx.server.metrics.perm_sync_metrics import GROUP_SYNC_UPSERT_DURATION
from onyx.server.metrics.perm_sync_metrics import GROUP_SYNC_USERS_PROCESSED
from onyx.server.metrics.perm_sync_metrics import inc_doc_perm_sync_docs_processed
from onyx.server.metrics.perm_sync_metrics import inc_doc_perm_sync_errors
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_errors
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_groups_processed
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_users_processed
from onyx.server.metrics.perm_sync_metrics import (
observe_doc_perm_sync_db_update_duration,
)
from onyx.server.metrics.perm_sync_metrics import observe_doc_perm_sync_duration
from onyx.server.metrics.perm_sync_metrics import observe_group_sync_duration
from onyx.server.metrics.perm_sync_metrics import observe_group_sync_upsert_duration
# --- Doc permission sync: overall duration ---
class TestObserveDocPermSyncDuration:
def test_observes_duration(self) -> None:
before = DOC_PERM_SYNC_DURATION.labels(connector_type="google_drive")._sum.get()
observe_doc_perm_sync_duration(10.0, "google_drive")
after = DOC_PERM_SYNC_DURATION.labels(connector_type="google_drive")._sum.get()
assert after == pytest.approx(before + 10.0)
def test_labels_by_connector_type(self) -> None:
before_gd = DOC_PERM_SYNC_DURATION.labels(
connector_type="google_drive"
)._sum.get()
before_conf = DOC_PERM_SYNC_DURATION.labels(
connector_type="confluence"
)._sum.get()
observe_doc_perm_sync_duration(5.0, "google_drive")
after_gd = DOC_PERM_SYNC_DURATION.labels(
connector_type="google_drive"
)._sum.get()
after_conf = DOC_PERM_SYNC_DURATION.labels(
connector_type="confluence"
)._sum.get()
assert after_gd == pytest.approx(before_gd + 5.0)
assert after_conf == pytest.approx(before_conf)
def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
DOC_PERM_SYNC_DURATION,
"labels",
lambda **_: (_ for _ in ()).throw(RuntimeError("boom")),
)
observe_doc_perm_sync_duration(1.0, "google_drive")
# --- Doc permission sync: DB update duration ---
class TestObserveDocPermSyncDbUpdateDuration:
def test_observes_duration(self) -> None:
before = DOC_PERM_SYNC_DB_UPDATE_DURATION.labels(
connector_type="confluence"
)._sum.get()
observe_doc_perm_sync_db_update_duration(3.0, "confluence")
after = DOC_PERM_SYNC_DB_UPDATE_DURATION.labels(
connector_type="confluence"
)._sum.get()
assert after == pytest.approx(before + 3.0)
def test_labels_by_connector_type(self) -> None:
before_conf = DOC_PERM_SYNC_DB_UPDATE_DURATION.labels(
connector_type="confluence"
)._sum.get()
before_slack = DOC_PERM_SYNC_DB_UPDATE_DURATION.labels(
connector_type="slack"
)._sum.get()
observe_doc_perm_sync_db_update_duration(2.0, "confluence")
after_conf = DOC_PERM_SYNC_DB_UPDATE_DURATION.labels(
connector_type="confluence"
)._sum.get()
after_slack = DOC_PERM_SYNC_DB_UPDATE_DURATION.labels(
connector_type="slack"
)._sum.get()
assert after_conf == pytest.approx(before_conf + 2.0)
assert after_slack == pytest.approx(before_slack)
def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
DOC_PERM_SYNC_DB_UPDATE_DURATION,
"labels",
lambda **_: (_ for _ in ()).throw(RuntimeError("boom")),
)
observe_doc_perm_sync_db_update_duration(1.0, "confluence")
# --- Doc permission sync: docs processed counter ---
class TestIncDocPermSyncDocsProcessed:
def test_increments_counter(self) -> None:
before = DOC_PERM_SYNC_DOCS_PROCESSED.labels(
connector_type="google_drive"
)._value.get()
inc_doc_perm_sync_docs_processed("google_drive", 5)
after = DOC_PERM_SYNC_DOCS_PROCESSED.labels(
connector_type="google_drive"
)._value.get()
assert after == before + 5
def test_labels_by_connector_type(self) -> None:
before_gd = DOC_PERM_SYNC_DOCS_PROCESSED.labels(
connector_type="google_drive"
)._value.get()
before_jira = DOC_PERM_SYNC_DOCS_PROCESSED.labels(
connector_type="jira"
)._value.get()
inc_doc_perm_sync_docs_processed("google_drive", 3)
after_gd = DOC_PERM_SYNC_DOCS_PROCESSED.labels(
connector_type="google_drive"
)._value.get()
after_jira = DOC_PERM_SYNC_DOCS_PROCESSED.labels(
connector_type="jira"
)._value.get()
assert after_gd == before_gd + 3
assert after_jira == before_jira
def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
DOC_PERM_SYNC_DOCS_PROCESSED,
"labels",
lambda **_: (_ for _ in ()).throw(RuntimeError("boom")),
)
inc_doc_perm_sync_docs_processed("google_drive")
# --- Doc permission sync: errors counter ---
class TestIncDocPermSyncErrors:
def test_increments_counter(self) -> None:
before = DOC_PERM_SYNC_ERRORS.labels(connector_type="sharepoint")._value.get()
inc_doc_perm_sync_errors("sharepoint", 2)
after = DOC_PERM_SYNC_ERRORS.labels(connector_type="sharepoint")._value.get()
assert after == before + 2
def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
DOC_PERM_SYNC_ERRORS,
"labels",
lambda **_: (_ for _ in ()).throw(RuntimeError("boom")),
)
inc_doc_perm_sync_errors("sharepoint")
# --- Group sync: overall duration ---
class TestObserveGroupSyncDuration:
def test_observes_duration(self) -> None:
before = GROUP_SYNC_DURATION.labels(connector_type="google_drive")._sum.get()
observe_group_sync_duration(20.0, "google_drive")
after = GROUP_SYNC_DURATION.labels(connector_type="google_drive")._sum.get()
assert after == pytest.approx(before + 20.0)
def test_labels_by_connector_type(self) -> None:
before_gd = GROUP_SYNC_DURATION.labels(connector_type="google_drive")._sum.get()
before_slack = GROUP_SYNC_DURATION.labels(connector_type="slack")._sum.get()
observe_group_sync_duration(7.0, "google_drive")
after_gd = GROUP_SYNC_DURATION.labels(connector_type="google_drive")._sum.get()
after_slack = GROUP_SYNC_DURATION.labels(connector_type="slack")._sum.get()
assert after_gd == pytest.approx(before_gd + 7.0)
assert after_slack == pytest.approx(before_slack)
def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
GROUP_SYNC_DURATION,
"labels",
lambda **_: (_ for _ in ()).throw(RuntimeError("boom")),
)
observe_group_sync_duration(1.0, "google_drive")
# --- Group sync: upsert duration ---
class TestObserveGroupSyncUpsertDuration:
def test_observes_duration(self) -> None:
before = GROUP_SYNC_UPSERT_DURATION.labels(
connector_type="confluence"
)._sum.get()
observe_group_sync_upsert_duration(4.0, "confluence")
after = GROUP_SYNC_UPSERT_DURATION.labels(
connector_type="confluence"
)._sum.get()
assert after == pytest.approx(before + 4.0)
def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
GROUP_SYNC_UPSERT_DURATION,
"labels",
lambda **_: (_ for _ in ()).throw(RuntimeError("boom")),
)
observe_group_sync_upsert_duration(1.0, "confluence")
# --- Group sync: groups processed counter ---
class TestIncGroupSyncGroupsProcessed:
def test_increments_counter(self) -> None:
before = GROUP_SYNC_GROUPS_PROCESSED.labels(
connector_type="github"
)._value.get()
inc_group_sync_groups_processed("github", 10)
after = GROUP_SYNC_GROUPS_PROCESSED.labels(connector_type="github")._value.get()
assert after == before + 10
def test_labels_by_connector_type(self) -> None:
before_gh = GROUP_SYNC_GROUPS_PROCESSED.labels(
connector_type="github"
)._value.get()
before_slack = GROUP_SYNC_GROUPS_PROCESSED.labels(
connector_type="slack"
)._value.get()
inc_group_sync_groups_processed("github", 4)
after_gh = GROUP_SYNC_GROUPS_PROCESSED.labels(
connector_type="github"
)._value.get()
after_slack = GROUP_SYNC_GROUPS_PROCESSED.labels(
connector_type="slack"
)._value.get()
assert after_gh == before_gh + 4
assert after_slack == before_slack
def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
GROUP_SYNC_GROUPS_PROCESSED,
"labels",
lambda **_: (_ for _ in ()).throw(RuntimeError("boom")),
)
inc_group_sync_groups_processed("github")
# --- Group sync: users processed counter ---
class TestIncGroupSyncUsersProcessed:
def test_increments_counter(self) -> None:
before = GROUP_SYNC_USERS_PROCESSED.labels(connector_type="github")._value.get()
inc_group_sync_users_processed("github", 25)
after = GROUP_SYNC_USERS_PROCESSED.labels(connector_type="github")._value.get()
assert after == before + 25
def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
GROUP_SYNC_USERS_PROCESSED,
"labels",
lambda **_: (_ for _ in ()).throw(RuntimeError("boom")),
)
inc_group_sync_users_processed("github")
# --- Group sync: errors counter ---
class TestIncGroupSyncErrors:
def test_increments_counter(self) -> None:
before = GROUP_SYNC_ERRORS.labels(connector_type="sharepoint")._value.get()
inc_group_sync_errors("sharepoint")
after = GROUP_SYNC_ERRORS.labels(connector_type="sharepoint")._value.get()
assert after == before + 1
def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
GROUP_SYNC_ERRORS,
"labels",
lambda **_: (_ for _ in ()).throw(RuntimeError("boom")),
)
inc_group_sync_errors("sharepoint")

View File

@@ -77,7 +77,6 @@ services:
- DOC_EMBEDDING_DIM=${DOC_EMBEDDING_DIM:-}
- NORMALIZE_EMBEDDINGS=${NORMALIZE_EMBEDDINGS:-}
- ASYM_QUERY_PREFIX=${ASYM_QUERY_PREFIX:-}
- DISABLE_RERANK_FOR_STREAMING=${DISABLE_RERANK_FOR_STREAMING:-}
- MODEL_SERVER_HOST=${MODEL_SERVER_HOST:-inference_model_server}
- MODEL_SERVER_PORT=${MODEL_SERVER_PORT:-}
- CODE_INTERPRETER_BASE_URL=${CODE_INTERPRETER_BASE_URL:-http://code-interpreter:8000}

View File

@@ -193,7 +193,6 @@ LOG_ONYX_MODEL_INTERACTIONS=False
# NORMALIZE_EMBEDDINGS=
# ASYM_QUERY_PREFIX=
# ASYM_PASSAGE_PREFIX=
# DISABLE_RERANK_FOR_STREAMING=
# MODEL_SERVER_PORT=
# INDEX_BATCH_SIZE=
# MIN_THREADS_ML_MODELS=

View File

@@ -1274,7 +1274,6 @@ configMap:
NORMALIZE_EMBEDDINGS: ""
ASYM_QUERY_PREFIX: ""
ASYM_PASSAGE_PREFIX: ""
DISABLE_RERANK_FOR_STREAMING: ""
MODEL_SERVER_PORT: ""
MIN_THREADS_ML_MODELS: ""
# Indexing Configs

View File

@@ -18,15 +18,18 @@ docker compose up -d
- **Onyx DB Pool Health** — PostgreSQL connection pool utilization
- **Onyx Indexing Pipeline v2** — Per-connector indexing throughput, queue depth, task latency
- **Onyx Permission Sync** — Doc permission sync and external group sync duration, throughput, errors, and Celery task metrics
## Scrape targets
| Job | Port | Source |
|--------------------------|-------|-------------------------------|
| `onyx-api-server` | 8080 | FastAPI `/metrics` (matches `.vscode/launch.json`) |
| `onyx-monitoring-worker` | 9096 | Celery monitoring worker |
| `onyx-docfetching-worker`| 9092 | Celery docfetching worker |
| `onyx-docprocessing-worker`| 9093 | Celery docprocessing worker |
| Job | Port | Source |
|----------------------------|-------|-------------------------------|
| `onyx-api-server` | 8080 | FastAPI `/metrics` (matches `.vscode/launch.json`) |
| `onyx-monitoring-worker` | 9096 | Celery monitoring worker |
| `onyx-docfetching-worker` | 9092 | Celery docfetching worker |
| `onyx-docprocessing-worker`| 9093 | Celery docprocessing worker |
| `onyx-heavy-worker` | 9094 | Celery heavy worker (pruning, perm sync, group sync) |
| `onyx-light-worker` | 9095 | Celery light worker (vespa sync, deletion, permissions upsert) |
## Environment variables

View File

@@ -0,0 +1,697 @@
{
"id": null,
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 1,
"links": [],
"panels": [
{
"collapsed": false,
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 },
"id": 100,
"title": "Doc Permission Sync",
"type": "row"
},
{
"title": "Doc Perm Sync Duration (p50 / p95 / p99)",
"description": "Overall duration of doc permission sync by connector type",
"type": "timeseries",
"id": 1,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 1 },
"fieldConfig": {
"defaults": {
"unit": "s",
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"fillOpacity": 10,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "histogram_quantile(0.50, sum(rate(onyx_doc_perm_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p50",
"refId": "A"
},
{
"expr": "histogram_quantile(0.95, sum(rate(onyx_doc_perm_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p95",
"refId": "B"
},
{
"expr": "histogram_quantile(0.99, sum(rate(onyx_doc_perm_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p99",
"refId": "C"
}
]
},
{
"title": "Doc Perm Sync DB Update Duration (p50 / p95 / p99)",
"description": "Cumulative per-element DB update time within a single sync run",
"type": "timeseries",
"id": 2,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 1 },
"fieldConfig": {
"defaults": {
"unit": "s",
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"fillOpacity": 10,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "histogram_quantile(0.50, sum(rate(onyx_doc_perm_sync_db_update_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p50",
"refId": "A"
},
{
"expr": "histogram_quantile(0.95, sum(rate(onyx_doc_perm_sync_db_update_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p95",
"refId": "B"
},
{
"expr": "histogram_quantile(0.99, sum(rate(onyx_doc_perm_sync_db_update_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p99",
"refId": "C"
}
]
},
{
"title": "Doc Perm Sync Throughput (docs/min)",
"description": "Rate of documents successfully synced per minute",
"type": "timeseries",
"id": 3,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 8, "x": 0, "y": 9 },
"fieldConfig": {
"defaults": {
"unit": "docs/min",
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"fillOpacity": 18,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "sum(rate(onyx_doc_perm_sync_docs_processed_total{connector_type=~\"$connector_type\"}[$__rate_interval])) by (connector_type) * 60",
"legendFormat": "{{connector_type}}",
"refId": "A"
}
]
},
{
"title": "Doc Perm Sync Error Rate",
"description": "Rate of document permission errors per minute",
"type": "timeseries",
"id": 4,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 8, "x": 8, "y": 9 },
"fieldConfig": {
"defaults": {
"unit": "errors/min",
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"fillOpacity": 18,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "sum(rate(onyx_doc_perm_sync_errors_total{connector_type=~\"$connector_type\"}[$__rate_interval])) by (connector_type) * 60",
"legendFormat": "{{connector_type}}",
"refId": "A"
}
]
},
{
"title": "Doc Perm Sync Totals",
"description": "Cumulative docs synced and errors",
"type": "stat",
"id": 5,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 8, "x": 16, "y": 9 },
"fieldConfig": {
"defaults": {
"color": { "mode": "thresholds" },
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null }
]
}
},
"overrides": [
{
"matcher": { "id": "byName", "options": "Errors" },
"properties": [
{
"id": "thresholds",
"value": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null },
{ "color": "red", "value": 1 }
]
}
}
]
}
]
},
"options": {
"reduceOptions": { "calcs": ["lastNotNull"] },
"orientation": "horizontal",
"textMode": "auto",
"colorMode": "value",
"graphMode": "area"
},
"targets": [
{
"expr": "sum(onyx_doc_perm_sync_docs_processed_total{connector_type=~\"$connector_type\"})",
"legendFormat": "Docs Synced",
"refId": "A"
},
{
"expr": "sum(onyx_doc_perm_sync_errors_total{connector_type=~\"$connector_type\"})",
"legendFormat": "Errors",
"refId": "B"
}
]
},
{
"collapsed": false,
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 17 },
"id": 101,
"title": "External Group Sync",
"type": "row"
},
{
"title": "Group Sync Duration (p50 / p95 / p99)",
"description": "Overall duration of external group sync by connector type",
"type": "timeseries",
"id": 6,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 18 },
"fieldConfig": {
"defaults": {
"unit": "s",
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"fillOpacity": 10,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "histogram_quantile(0.50, sum(rate(onyx_group_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p50",
"refId": "A"
},
{
"expr": "histogram_quantile(0.95, sum(rate(onyx_group_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p95",
"refId": "B"
},
{
"expr": "histogram_quantile(0.99, sum(rate(onyx_group_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p99",
"refId": "C"
}
]
},
{
"title": "Group Sync Upsert Duration (p50 / p95 / p99)",
"description": "Cumulative batch upsert time within a single group sync run",
"type": "timeseries",
"id": 7,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 18 },
"fieldConfig": {
"defaults": {
"unit": "s",
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"fillOpacity": 10,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "histogram_quantile(0.50, sum(rate(onyx_group_sync_upsert_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p50",
"refId": "A"
},
{
"expr": "histogram_quantile(0.95, sum(rate(onyx_group_sync_upsert_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p95",
"refId": "B"
},
{
"expr": "histogram_quantile(0.99, sum(rate(onyx_group_sync_upsert_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
"legendFormat": "{{connector_type}} p99",
"refId": "C"
}
]
},
{
"title": "Group Sync Throughput (groups/min)",
"description": "Rate of groups processed per minute",
"type": "timeseries",
"id": 8,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 8, "x": 0, "y": 26 },
"fieldConfig": {
"defaults": {
"unit": "groups/min",
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"fillOpacity": 18,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "sum(rate(onyx_group_sync_groups_processed_total{connector_type=~\"$connector_type\"}[$__rate_interval])) by (connector_type) * 60",
"legendFormat": "{{connector_type}} groups",
"refId": "A"
},
{
"expr": "sum(rate(onyx_group_sync_users_processed_total{connector_type=~\"$connector_type\"}[$__rate_interval])) by (connector_type) * 60",
"legendFormat": "{{connector_type}} users",
"refId": "B"
}
]
},
{
"title": "Group Sync Error Rate",
"description": "Rate of errors during external group sync per minute",
"type": "timeseries",
"id": 9,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 8, "x": 8, "y": 26 },
"fieldConfig": {
"defaults": {
"unit": "errors/min",
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"fillOpacity": 18,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "sum(rate(onyx_group_sync_errors_total{connector_type=~\"$connector_type\"}[$__rate_interval])) by (connector_type) * 60",
"legendFormat": "{{connector_type}}",
"refId": "A"
}
]
},
{
"title": "Group Sync Totals",
"description": "Cumulative groups, users processed and errors",
"type": "stat",
"id": 10,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 8, "x": 16, "y": 26 },
"fieldConfig": {
"defaults": {
"color": { "mode": "thresholds" },
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null }
]
}
},
"overrides": [
{
"matcher": { "id": "byName", "options": "Errors" },
"properties": [
{
"id": "thresholds",
"value": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null },
{ "color": "red", "value": 1 }
]
}
}
]
}
]
},
"options": {
"reduceOptions": { "calcs": ["lastNotNull"] },
"orientation": "horizontal",
"textMode": "auto",
"colorMode": "value",
"graphMode": "area"
},
"targets": [
{
"expr": "sum(onyx_group_sync_groups_processed_total{connector_type=~\"$connector_type\"})",
"legendFormat": "Groups",
"refId": "A"
},
{
"expr": "sum(onyx_group_sync_users_processed_total{connector_type=~\"$connector_type\"})",
"legendFormat": "Users",
"refId": "B"
},
{
"expr": "sum(onyx_group_sync_errors_total{connector_type=~\"$connector_type\"})",
"legendFormat": "Errors",
"refId": "C"
}
]
},
{
"collapsed": false,
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 34 },
"id": 102,
"title": "Celery Task Metrics (Perm Sync Tasks)",
"type": "row"
},
{
"title": "Perm Sync Celery Task Duration (p95)",
"description": "Task execution duration for permission sync celery tasks",
"type": "timeseries",
"id": 11,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 35 },
"fieldConfig": {
"defaults": {
"unit": "s",
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"fillOpacity": 10,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "histogram_quantile(0.95, sum(rate(onyx_celery_task_duration_seconds_bucket{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task|check_for_doc_permissions_sync|check_for_external_group_sync\"}[$__rate_interval])) by (le, task_name))",
"legendFormat": "{{task_name}} p95",
"refId": "A"
}
]
},
{
"title": "Perm Sync Celery Task Outcomes",
"description": "Success vs failure counts for permission sync celery tasks",
"type": "timeseries",
"id": 12,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 35 },
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "bars",
"lineInterpolation": "linear",
"fillOpacity": 50,
"pointSize": 5,
"showPoints": "never",
"spanNulls": false,
"stacking": { "mode": "normal", "group": "A" },
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["sum"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "sum(increase(onyx_celery_task_completed_total{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\", outcome=\"success\"}[$__rate_interval])) by (task_name)",
"legendFormat": "{{task_name}} success",
"refId": "A"
},
{
"expr": "sum(increase(onyx_celery_task_completed_total{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\", outcome=\"failure\"}[$__rate_interval])) by (task_name)",
"legendFormat": "{{task_name}} failure",
"refId": "B"
}
]
},
{
"title": "Perm Sync Celery Revoked / Retried / Rejected",
"description": "Revocation, retry, and rejection counts for perm sync tasks",
"type": "timeseries",
"id": 13,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 43 },
"fieldConfig": {
"defaults": {
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "bars",
"lineInterpolation": "linear",
"fillOpacity": 50,
"pointSize": 5,
"showPoints": "never",
"spanNulls": false,
"stacking": { "mode": "normal", "group": "A" },
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["sum"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "sum(increase(onyx_celery_task_revoked_total{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\"}[$__rate_interval])) by (task_name)",
"legendFormat": "{{task_name}} revoked",
"refId": "A"
},
{
"expr": "sum(increase(onyx_celery_task_retried_total{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\"}[$__rate_interval])) by (task_name)",
"legendFormat": "{{task_name}} retried",
"refId": "B"
},
{
"expr": "sum(increase(onyx_celery_task_rejected_total{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\"}[$__rate_interval])) by (task_name)",
"legendFormat": "{{task_name}} rejected",
"refId": "C"
}
]
},
{
"title": "Perm Sync Celery Queue Wait Time (p95)",
"description": "Time perm sync tasks waited in queue before execution",
"type": "timeseries",
"id": 14,
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 43 },
"fieldConfig": {
"defaults": {
"unit": "s",
"color": { "mode": "palette-classic" },
"custom": {
"drawStyle": "line",
"lineInterpolation": "linear",
"fillOpacity": 10,
"pointSize": 5,
"showPoints": "auto",
"spanNulls": false,
"axisBorderShow": false,
"axisPlacement": "auto"
}
},
"overrides": []
},
"options": {
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
"tooltip": { "mode": "multi", "sort": "desc" }
},
"targets": [
{
"expr": "histogram_quantile(0.95, sum(rate(onyx_celery_task_queue_wait_seconds_bucket{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\"}[$__rate_interval])) by (le, task_name))",
"legendFormat": "{{task_name}} p95",
"refId": "A"
}
]
}
],
"schemaVersion": 39,
"tags": ["onyx", "permissions", "sync"],
"templating": {
"list": [
{
"current": { "text": "Prometheus", "value": "prometheus" },
"includeAll": false,
"name": "DS_PROMETHEUS",
"options": [],
"query": "prometheus",
"refresh": 1,
"type": "datasource"
},
{
"allValue": ".*",
"current": { "selected": true, "text": "All", "value": "$__all" },
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
"definition": "label_values(onyx_doc_perm_sync_duration_seconds_bucket, connector_type)",
"includeAll": true,
"label": "Connector Type",
"multi": true,
"name": "connector_type",
"options": [],
"query": {
"query": "label_values(onyx_doc_perm_sync_duration_seconds_bucket, connector_type)",
"refId": "StandardVariableQuery"
},
"refresh": 2,
"regex": "",
"sort": 1,
"type": "query"
}
]
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "Onyx Permission Sync",
"uid": "onyx-permission-sync",
"version": 1,
"weekStart": ""
}

View File

@@ -34,3 +34,15 @@ scrape_configs:
metrics_path: /metrics
static_configs:
- targets: ['host.docker.internal:9093']
- job_name: 'onyx-heavy-worker'
scrape_interval: 5s
metrics_path: /metrics
static_configs:
- targets: ['host.docker.internal:9094']
- job_name: 'onyx-light-worker'
scrape_interval: 5s
metrics_path: /metrics
static_configs:
- targets: ['host.docker.internal:9095']

View File

@@ -996,6 +996,23 @@ export default function ChatPreferencesPage() {
</InputVertical>
</Card>
<Card border="solid" rounding="lg">
<InputHorizontal
title="Image Extraction & Analysis"
description="Extract embedded images from uploaded files (PDFs, DOCX, etc.) and summarize them with a vision-capable LLM so image-only documents become searchable and answerable. Requires a vision-capable default LLM."
withLabel
>
<Switch
checked={s.image_extraction_and_analysis_enabled ?? true}
onCheckedChange={(checked) => {
void saveSettings({
image_extraction_and_analysis_enabled: checked,
});
}}
/>
</InputHorizontal>
</Card>
<Card border="solid" rounding="lg">
<Section>
<InputHorizontal