mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-21 09:26:44 +00:00
Compare commits
3 Commits
dane/infer
...
release/v3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e49b4ca2ae | ||
|
|
eefe0a0146 | ||
|
|
b57acbfbf0 |
@@ -16,13 +16,9 @@
|
||||
"source=onyx-devcontainer-local,target=/home/dev/.local,type=volume"
|
||||
],
|
||||
"containerEnv": {
|
||||
"MODEL_SERVER_HOST": "inference_model_server",
|
||||
"OPENSEARCH_HOST": "opensearch",
|
||||
"POSTGRES_HOST": "relational_db",
|
||||
"REDIS_HOST": "cache",
|
||||
"S3_ENDPOINT_URL": "http://minio:9000",
|
||||
"SSH_AUTH_SOCK": "/tmp/ssh-agent.sock",
|
||||
"VESPA_HOST": "index"
|
||||
"POSTGRES_HOST": "relational_db",
|
||||
"REDIS_HOST": "cache"
|
||||
},
|
||||
"remoteUser": "${localEnv:DEVCONTAINER_REMOTE_USER:dev}",
|
||||
"updateRemoteUserUID": false,
|
||||
|
||||
@@ -40,7 +40,6 @@ ALLOWED_DOMAINS=(
|
||||
"api.anthropic.com"
|
||||
"api-staging.anthropic.com"
|
||||
"files.anthropic.com"
|
||||
"huggingface.co"
|
||||
"sentry.io"
|
||||
"update.code.visualstudio.com"
|
||||
"pypi.org"
|
||||
|
||||
2
.github/workflows/deployment.yml
vendored
2
.github/workflows/deployment.yml
vendored
@@ -403,7 +403,7 @@ jobs:
|
||||
echo "CERT_ID=$CERT_ID" >> $GITHUB_ENV
|
||||
echo "Certificate imported."
|
||||
|
||||
- uses: tauri-apps/tauri-action@84b9d35b5fc46c1e45415bdb6144030364f7ebc5 # ratchet:tauri-apps/tauri-action@action-v0.6.2
|
||||
- uses: tauri-apps/tauri-action@73fb865345c54760d875b94642314f8c0c894afa # ratchet:tauri-apps/tauri-action@action-v0.6.1
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
APPLE_ID: ${{ env.APPLE_ID }}
|
||||
|
||||
@@ -42,7 +42,6 @@ env:
|
||||
CONFLUENCE_ACCESS_TOKEN_SCOPED: ${{ secrets.CONFLUENCE_ACCESS_TOKEN_SCOPED }}
|
||||
|
||||
# Jira
|
||||
JIRA_ADMIN_USER_EMAIL: ${{ vars.JIRA_ADMIN_USER_EMAIL }}
|
||||
JIRA_ADMIN_API_TOKEN: ${{ secrets.JIRA_ADMIN_API_TOKEN }}
|
||||
|
||||
# LLMs
|
||||
|
||||
2
.github/workflows/pr-golang-tests.yml
vendored
2
.github/workflows/pr-golang-tests.yml
vendored
@@ -42,7 +42,7 @@ jobs:
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
|
||||
with:
|
||||
persist-credentials: false
|
||||
- uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # zizmor: ignore[cache-poisoning]
|
||||
- uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # zizmor: ignore[cache-poisoning]
|
||||
with:
|
||||
go-version: ${{ env.GO_VERSION }}
|
||||
cache-dependency-path: "**/go.sum"
|
||||
|
||||
2
.github/workflows/pr-integration-tests.yml
vendored
2
.github/workflows/pr-integration-tests.yml
vendored
@@ -26,7 +26,6 @@ env:
|
||||
CONFLUENCE_ACCESS_TOKEN: ${{ secrets.CONFLUENCE_ACCESS_TOKEN }}
|
||||
CONFLUENCE_ACCESS_TOKEN_SCOPED: ${{ secrets.CONFLUENCE_ACCESS_TOKEN_SCOPED }}
|
||||
JIRA_BASE_URL: ${{ secrets.JIRA_BASE_URL }}
|
||||
JIRA_ADMIN_USER_EMAIL: ${{ vars.JIRA_ADMIN_USER_EMAIL }}
|
||||
JIRA_USER_EMAIL: ${{ secrets.JIRA_USER_EMAIL }}
|
||||
JIRA_API_TOKEN: ${{ secrets.JIRA_API_TOKEN }}
|
||||
JIRA_API_TOKEN_SCOPED: ${{ secrets.JIRA_API_TOKEN_SCOPED }}
|
||||
@@ -432,7 +431,6 @@ jobs:
|
||||
-e CONFLUENCE_ACCESS_TOKEN=${CONFLUENCE_ACCESS_TOKEN} \
|
||||
-e CONFLUENCE_ACCESS_TOKEN_SCOPED=${CONFLUENCE_ACCESS_TOKEN_SCOPED} \
|
||||
-e JIRA_BASE_URL=${JIRA_BASE_URL} \
|
||||
-e JIRA_ADMIN_USER_EMAIL=${JIRA_ADMIN_USER_EMAIL} \
|
||||
-e JIRA_USER_EMAIL=${JIRA_USER_EMAIL} \
|
||||
-e JIRA_API_TOKEN=${JIRA_API_TOKEN} \
|
||||
-e JIRA_API_TOKEN_SCOPED=${JIRA_API_TOKEN_SCOPED} \
|
||||
|
||||
@@ -116,7 +116,6 @@ jobs:
|
||||
CONFLUENCE_ACCESS_TOKEN, test/confluence-access-token
|
||||
CONFLUENCE_ACCESS_TOKEN_SCOPED, test/confluence-access-token-scoped
|
||||
JIRA_BASE_URL, test/jira-base-url
|
||||
JIRA_ADMIN_USER_EMAIL, test/jira-admin-user-email
|
||||
JIRA_USER_EMAIL, test/jira-user-email
|
||||
JIRA_API_TOKEN, test/jira-api-token
|
||||
JIRA_API_TOKEN_SCOPED, test/jira-api-token-scoped
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
"""Add file_id to documents
|
||||
|
||||
Revision ID: 91d150c361f6
|
||||
Revises: a6fcd3d631f9
|
||||
Create Date: 2026-04-16 15:43:30.314823
|
||||
|
||||
"""
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "91d150c361f6"
|
||||
down_revision = "a6fcd3d631f9"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Add a nullable string ``file_id`` column to the ``document`` table."""
    # The column is declared nullable, so the ALTER TABLE does not require a
    # value for rows that already exist.
    file_id_column = sa.Column("file_id", sa.String(), nullable=True)
    op.add_column("document", file_id_column)
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Drop the ``file_id`` column from the ``document`` table."""
    table_name, column_name = "document", "file_id"
    op.drop_column(table_name, column_name)
|
||||
@@ -81,9 +81,6 @@ from onyx.redis.redis_pool import get_redis_client
|
||||
from onyx.redis.redis_pool import get_redis_replica_client
|
||||
from onyx.redis.redis_pool import redis_lock_dump
|
||||
from onyx.redis.redis_tenant_work_gating import maybe_mark_tenant_active
|
||||
from onyx.server.metrics.perm_sync_metrics import inc_doc_perm_sync_docs_processed
|
||||
from onyx.server.metrics.perm_sync_metrics import inc_doc_perm_sync_errors
|
||||
from onyx.server.metrics.perm_sync_metrics import observe_doc_perm_sync_duration
|
||||
from onyx.server.runtime.onyx_runtime import OnyxRuntime
|
||||
from onyx.server.utils import make_short_id
|
||||
from onyx.utils.logger import doc_permission_sync_ctx
|
||||
@@ -478,8 +475,6 @@ def connector_permission_sync_generator_task(
|
||||
_fail_doc_permission_sync_attempt(attempt_id, error_msg)
|
||||
return None
|
||||
|
||||
sync_start = time.monotonic()
|
||||
connector_type: str = "unknown"
|
||||
try:
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
cc_pair = get_connector_credential_pair_from_id(
|
||||
@@ -513,7 +508,6 @@ def connector_permission_sync_generator_task(
|
||||
raise
|
||||
|
||||
source_type = cc_pair.connector.source
|
||||
connector_type = source_type.value
|
||||
sync_config = get_source_perm_sync_config(source_type)
|
||||
if sync_config is None:
|
||||
error_msg = f"No sync config found for {source_type}"
|
||||
@@ -599,7 +593,7 @@ def connector_permission_sync_generator_task(
|
||||
result = redis_connector.permissions.update_db(
|
||||
lock=lock,
|
||||
new_permissions=[doc_external_access],
|
||||
source_string=connector_type,
|
||||
source_string=source_type,
|
||||
connector_id=cc_pair.connector.id,
|
||||
credential_id=cc_pair.credential.id,
|
||||
task_logger=task_logger,
|
||||
@@ -612,10 +606,6 @@ def connector_permission_sync_generator_task(
|
||||
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated} docs_with_errors={docs_with_errors}"
|
||||
)
|
||||
|
||||
inc_doc_perm_sync_docs_processed(connector_type, tasks_generated)
|
||||
if docs_with_errors > 0:
|
||||
inc_doc_perm_sync_errors(connector_type, docs_with_errors)
|
||||
|
||||
complete_doc_permission_sync_attempt(
|
||||
db_session=db_session,
|
||||
attempt_id=attempt_id,
|
||||
@@ -648,7 +638,6 @@ def connector_permission_sync_generator_task(
|
||||
redis_connector.permissions.set_fence(None)
|
||||
raise e
|
||||
finally:
|
||||
observe_doc_perm_sync_duration(time.monotonic() - sync_start, connector_type)
|
||||
if lock.owned():
|
||||
lock.release()
|
||||
|
||||
|
||||
@@ -70,11 +70,6 @@ from onyx.redis.redis_connector_ext_group_sync import (
|
||||
from onyx.redis.redis_pool import get_redis_client
|
||||
from onyx.redis.redis_pool import get_redis_replica_client
|
||||
from onyx.redis.redis_tenant_work_gating import maybe_mark_tenant_active
|
||||
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_errors
|
||||
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_groups_processed
|
||||
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_users_processed
|
||||
from onyx.server.metrics.perm_sync_metrics import observe_group_sync_duration
|
||||
from onyx.server.metrics.perm_sync_metrics import observe_group_sync_upsert_duration
|
||||
from onyx.server.runtime.onyx_runtime import OnyxRuntime
|
||||
from onyx.server.utils import make_short_id
|
||||
from onyx.utils.logger import format_error_for_logging
|
||||
@@ -476,9 +471,7 @@ def _perform_external_group_sync(
|
||||
tenant_id: str,
|
||||
timeout_seconds: int = JOB_TIMEOUT,
|
||||
) -> None:
|
||||
sync_start = time.monotonic()
|
||||
connector_type: str = "unknown"
|
||||
|
||||
# Create attempt record at the start
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
attempt_id = create_external_group_sync_attempt(
|
||||
connector_credential_pair_id=cc_pair_id,
|
||||
@@ -488,23 +481,6 @@ def _perform_external_group_sync(
|
||||
f"Created external group sync attempt: {attempt_id} for cc_pair={cc_pair_id}"
|
||||
)
|
||||
|
||||
try:
|
||||
connector_type = _timed_perform_external_group_sync(
|
||||
cc_pair_id=cc_pair_id,
|
||||
tenant_id=tenant_id,
|
||||
timeout_seconds=timeout_seconds,
|
||||
attempt_id=attempt_id,
|
||||
)
|
||||
finally:
|
||||
observe_group_sync_duration(time.monotonic() - sync_start, connector_type)
|
||||
|
||||
|
||||
def _timed_perform_external_group_sync(
|
||||
cc_pair_id: int,
|
||||
tenant_id: str,
|
||||
attempt_id: int,
|
||||
timeout_seconds: int = JOB_TIMEOUT,
|
||||
) -> str:
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
cc_pair = get_connector_credential_pair_from_id(
|
||||
db_session=db_session,
|
||||
@@ -515,7 +491,6 @@ def _timed_perform_external_group_sync(
|
||||
raise ValueError(f"No connector credential pair found for id: {cc_pair_id}")
|
||||
|
||||
source_type = cc_pair.connector.source
|
||||
connector_type = source_type.value
|
||||
sync_config = get_source_perm_sync_config(source_type)
|
||||
if sync_config is None:
|
||||
msg = f"No sync config found for {source_type} for cc_pair: {cc_pair_id}"
|
||||
@@ -531,18 +506,6 @@ def _timed_perform_external_group_sync(
|
||||
|
||||
ext_group_sync_func = sync_config.group_sync_config.group_sync_func
|
||||
|
||||
# Clean up stale rows from previous cycle BEFORE marking new ones.
|
||||
# This ensures cleanup always runs regardless of whether the current
|
||||
# sync succeeds — previously, cleanup only ran at the END of the sync,
|
||||
# so if the sync failed (e.g. DB connection killed by
|
||||
# idle_in_transaction_session_timeout during long API calls), stale
|
||||
# rows would accumulate indefinitely.
|
||||
logger.info(
|
||||
f"Removing stale external groups from prior cycle for {source_type} "
|
||||
f"for cc_pair: {cc_pair_id}"
|
||||
)
|
||||
remove_stale_external_groups(db_session, cc_pair_id)
|
||||
|
||||
logger.info(
|
||||
f"Marking old external groups as stale for {source_type} for cc_pair: {cc_pair_id}"
|
||||
)
|
||||
@@ -559,7 +522,6 @@ def _timed_perform_external_group_sync(
|
||||
seen_users: set[str] = set() # Track unique users across all groups
|
||||
total_groups_processed = 0
|
||||
total_group_memberships_synced = 0
|
||||
cumulative_upsert_time = 0.0
|
||||
start_time = time.monotonic()
|
||||
try:
|
||||
external_user_group_generator = ext_group_sync_func(tenant_id, cc_pair)
|
||||
@@ -588,26 +550,22 @@ def _timed_perform_external_group_sync(
|
||||
logger.debug(
|
||||
f"New external user groups: {external_user_group_batch}"
|
||||
)
|
||||
upsert_start = time.monotonic()
|
||||
upsert_external_groups(
|
||||
db_session=db_session,
|
||||
cc_pair_id=cc_pair_id,
|
||||
external_groups=external_user_group_batch,
|
||||
source=cc_pair.connector.source,
|
||||
)
|
||||
cumulative_upsert_time += time.monotonic() - upsert_start
|
||||
external_user_group_batch = []
|
||||
|
||||
if external_user_group_batch:
|
||||
logger.debug(f"New external user groups: {external_user_group_batch}")
|
||||
upsert_start = time.monotonic()
|
||||
upsert_external_groups(
|
||||
db_session=db_session,
|
||||
cc_pair_id=cc_pair_id,
|
||||
external_groups=external_user_group_batch,
|
||||
source=cc_pair.connector.source,
|
||||
)
|
||||
cumulative_upsert_time += time.monotonic() - upsert_start
|
||||
except Exception as e:
|
||||
format_error_for_logging(e)
|
||||
|
||||
@@ -617,14 +575,11 @@ def _timed_perform_external_group_sync(
|
||||
)
|
||||
|
||||
# TODO: add some notification to the admins here
|
||||
inc_group_sync_errors(connector_type)
|
||||
logger.exception(
|
||||
f"Error syncing external groups for {source_type} for cc_pair: {cc_pair_id} {e}"
|
||||
)
|
||||
raise e
|
||||
|
||||
observe_group_sync_upsert_duration(cumulative_upsert_time, connector_type)
|
||||
|
||||
logger.info(
|
||||
f"Removing stale external groups for {source_type} for cc_pair: {cc_pair_id}"
|
||||
)
|
||||
@@ -648,13 +603,8 @@ def _timed_perform_external_group_sync(
|
||||
f"{total_group_memberships_synced} memberships"
|
||||
)
|
||||
|
||||
inc_group_sync_groups_processed(connector_type, total_groups_processed)
|
||||
inc_group_sync_users_processed(connector_type, total_users_processed)
|
||||
|
||||
mark_all_relevant_cc_pairs_as_external_group_synced(db_session, cc_pair)
|
||||
|
||||
return connector_type
|
||||
|
||||
|
||||
def validate_external_group_sync_fences(
|
||||
tenant_id: str,
|
||||
|
||||
@@ -5,7 +5,6 @@ from pydantic import BaseModel
|
||||
from sqlalchemy import delete
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import update
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.access.utils import build_ext_group_name_for_onyx
|
||||
@@ -78,15 +77,6 @@ def mark_old_external_groups_as_stale(
|
||||
.where(PublicExternalUserGroup.cc_pair_id == cc_pair_id)
|
||||
.values(stale=True)
|
||||
)
|
||||
# Commit immediately so the transaction closes before potentially long
|
||||
# external API calls (e.g. Google Drive folder iteration). Without this,
|
||||
# the DB connection sits idle-in-transaction during API calls and gets
|
||||
# killed by idle_in_transaction_session_timeout, causing the entire sync
|
||||
# to fail and stale cleanup to never run.
|
||||
db_session.commit()
|
||||
|
||||
|
||||
_UPSERT_BATCH_SIZE = 5000
|
||||
|
||||
|
||||
def upsert_external_groups(
|
||||
@@ -96,102 +86,91 @@ def upsert_external_groups(
|
||||
source: DocumentSource,
|
||||
) -> None:
|
||||
"""
|
||||
Batch upsert external user groups using INSERT ... ON CONFLICT DO UPDATE.
|
||||
- For existing rows (same user_id, external_user_group_id, cc_pair_id),
|
||||
sets stale=False
|
||||
- For new rows, inserts with stale=False
|
||||
- Same logic for PublicExternalUserGroup
|
||||
Performs a true upsert operation for external user groups:
|
||||
- For existing groups (same user_id, external_user_group_id, cc_pair_id), updates the stale flag to False
|
||||
- For new groups, inserts them with stale=False
|
||||
- For public groups, uses upsert logic as well
|
||||
"""
|
||||
# If there are no groups to add, return early
|
||||
if not external_groups:
|
||||
return
|
||||
|
||||
# Collect all emails from all groups to batch-add users at once
|
||||
all_group_member_emails: set[str] = set()
|
||||
# collect all emails from all groups to batch add all users at once for efficiency
|
||||
all_group_member_emails = set()
|
||||
for external_group in external_groups:
|
||||
all_group_member_emails.update(external_group.user_emails)
|
||||
for user_email in external_group.user_emails:
|
||||
all_group_member_emails.add(user_email)
|
||||
|
||||
# Batch add users if they don't exist and get their ids
|
||||
# batch add users if they don't exist and get their ids
|
||||
all_group_members: list[User] = batch_add_ext_perm_user_if_not_exists(
|
||||
db_session=db_session,
|
||||
# NOTE: this function handles case sensitivity for emails
|
||||
emails=list(all_group_member_emails),
|
||||
)
|
||||
|
||||
# map emails to ids
|
||||
email_id_map = {user.email.lower(): user.id for user in all_group_members}
|
||||
|
||||
# Build all user-group mappings and public-group mappings
|
||||
user_group_mappings: list[dict] = []
|
||||
public_group_mappings: list[dict] = []
|
||||
|
||||
# Process each external group
|
||||
for external_group in external_groups:
|
||||
external_group_id = build_ext_group_name_for_onyx(
|
||||
ext_group_name=external_group.id,
|
||||
source=source,
|
||||
)
|
||||
|
||||
# Handle user-group mappings
|
||||
for user_email in external_group.user_emails:
|
||||
user_id = email_id_map.get(user_email.lower())
|
||||
if user_id is None:
|
||||
logger.warning(
|
||||
f"User in group {external_group.id}"
|
||||
f" with email {user_email} not found"
|
||||
f"User in group {external_group.id} with email {user_email} not found"
|
||||
)
|
||||
continue
|
||||
|
||||
user_group_mappings.append(
|
||||
{
|
||||
"user_id": user_id,
|
||||
"external_user_group_id": external_group_id,
|
||||
"cc_pair_id": cc_pair_id,
|
||||
"stale": False,
|
||||
}
|
||||
# Check if the user-group mapping already exists
|
||||
existing_user_group = db_session.scalar(
|
||||
select(User__ExternalUserGroupId).where(
|
||||
User__ExternalUserGroupId.user_id == user_id,
|
||||
User__ExternalUserGroupId.external_user_group_id
|
||||
== external_group_id,
|
||||
User__ExternalUserGroupId.cc_pair_id == cc_pair_id,
|
||||
)
|
||||
)
|
||||
|
||||
if existing_user_group:
|
||||
# Update existing record
|
||||
existing_user_group.stale = False
|
||||
else:
|
||||
# Insert new record
|
||||
new_user_group = User__ExternalUserGroupId(
|
||||
user_id=user_id,
|
||||
external_user_group_id=external_group_id,
|
||||
cc_pair_id=cc_pair_id,
|
||||
stale=False,
|
||||
)
|
||||
db_session.add(new_user_group)
|
||||
|
||||
# Handle public group if needed
|
||||
if external_group.gives_anyone_access:
|
||||
public_group_mappings.append(
|
||||
{
|
||||
"external_user_group_id": external_group_id,
|
||||
"cc_pair_id": cc_pair_id,
|
||||
"stale": False,
|
||||
}
|
||||
# Check if the public group already exists
|
||||
existing_public_group = db_session.scalar(
|
||||
select(PublicExternalUserGroup).where(
|
||||
PublicExternalUserGroup.external_user_group_id == external_group_id,
|
||||
PublicExternalUserGroup.cc_pair_id == cc_pair_id,
|
||||
)
|
||||
)
|
||||
|
||||
# Deduplicate to avoid "ON CONFLICT DO UPDATE command cannot affect row
|
||||
# a second time" when duplicate emails or overlapping groups produce
|
||||
# identical (user_id, external_user_group_id, cc_pair_id) tuples.
|
||||
user_group_mappings_deduped = list(
|
||||
{
|
||||
(m["user_id"], m["external_user_group_id"], m["cc_pair_id"]): m
|
||||
for m in user_group_mappings
|
||||
}.values()
|
||||
)
|
||||
|
||||
# Batch upsert user-group mappings
|
||||
for i in range(0, len(user_group_mappings_deduped), _UPSERT_BATCH_SIZE):
|
||||
chunk = user_group_mappings_deduped[i : i + _UPSERT_BATCH_SIZE]
|
||||
stmt = pg_insert(User__ExternalUserGroupId).values(chunk)
|
||||
stmt = stmt.on_conflict_do_update(
|
||||
index_elements=["user_id", "external_user_group_id", "cc_pair_id"],
|
||||
set_={"stale": False},
|
||||
)
|
||||
db_session.execute(stmt)
|
||||
|
||||
# Deduplicate public group mappings as well
|
||||
public_group_mappings_deduped = list(
|
||||
{
|
||||
(m["external_user_group_id"], m["cc_pair_id"]): m
|
||||
for m in public_group_mappings
|
||||
}.values()
|
||||
)
|
||||
|
||||
# Batch upsert public group mappings
|
||||
for i in range(0, len(public_group_mappings_deduped), _UPSERT_BATCH_SIZE):
|
||||
chunk = public_group_mappings_deduped[i : i + _UPSERT_BATCH_SIZE]
|
||||
stmt = pg_insert(PublicExternalUserGroup).values(chunk)
|
||||
stmt = stmt.on_conflict_do_update(
|
||||
index_elements=["external_user_group_id", "cc_pair_id"],
|
||||
set_={"stale": False},
|
||||
)
|
||||
db_session.execute(stmt)
|
||||
if existing_public_group:
|
||||
# Update existing record
|
||||
existing_public_group.stale = False
|
||||
else:
|
||||
# Insert new record
|
||||
new_public_group = PublicExternalUserGroup(
|
||||
external_user_group_id=external_group_id,
|
||||
cc_pair_id=cc_pair_id,
|
||||
stale=False,
|
||||
)
|
||||
db_session.add(new_public_group)
|
||||
|
||||
db_session.commit()
|
||||
|
||||
|
||||
@@ -27,7 +27,6 @@ from shared_configs.configs import MIN_THREADS_ML_MODELS
|
||||
from shared_configs.configs import MODEL_SERVER_ALLOWED_HOST
|
||||
from shared_configs.configs import MODEL_SERVER_PORT
|
||||
from shared_configs.configs import SENTRY_DSN
|
||||
from shared_configs.configs import SENTRY_TRACES_SAMPLE_RATE
|
||||
|
||||
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
||||
os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"
|
||||
@@ -102,7 +101,7 @@ def get_model_app() -> FastAPI:
|
||||
sentry_sdk.init(
|
||||
dsn=SENTRY_DSN,
|
||||
integrations=[StarletteIntegration(), FastApiIntegration()],
|
||||
traces_sample_rate=SENTRY_TRACES_SAMPLE_RATE,
|
||||
traces_sample_rate=0.1,
|
||||
release=__version__,
|
||||
before_send=_add_instance_tags,
|
||||
)
|
||||
|
||||
@@ -55,7 +55,6 @@ from onyx.utils.logger import setup_logger
|
||||
from shared_configs.configs import DEV_LOGGING_ENABLED
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
|
||||
from shared_configs.configs import SENTRY_CELERY_TRACES_SAMPLE_RATE
|
||||
from shared_configs.configs import SENTRY_DSN
|
||||
from shared_configs.configs import TENANT_ID_PREFIX
|
||||
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
|
||||
@@ -70,7 +69,7 @@ if SENTRY_DSN:
|
||||
sentry_sdk.init(
|
||||
dsn=SENTRY_DSN,
|
||||
integrations=[CeleryIntegration()],
|
||||
traces_sample_rate=SENTRY_CELERY_TRACES_SAMPLE_RATE,
|
||||
traces_sample_rate=0.1,
|
||||
release=__version__,
|
||||
before_send=_add_instance_tags,
|
||||
)
|
||||
|
||||
@@ -37,7 +37,6 @@ from onyx.redis.redis_connector import RedisConnector
|
||||
from onyx.server.metrics.connector_health_metrics import on_index_attempt_status_change
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.variable_functionality import global_version
|
||||
from shared_configs.configs import SENTRY_CELERY_TRACES_SAMPLE_RATE
|
||||
from shared_configs.configs import SENTRY_DSN
|
||||
|
||||
logger = setup_logger()
|
||||
@@ -141,7 +140,7 @@ def _docfetching_task(
|
||||
|
||||
sentry_sdk.init(
|
||||
dsn=SENTRY_DSN,
|
||||
traces_sample_rate=SENTRY_CELERY_TRACES_SAMPLE_RATE,
|
||||
traces_sample_rate=0.1,
|
||||
release=__version__,
|
||||
before_send=_add_instance_tags,
|
||||
)
|
||||
|
||||
@@ -15,7 +15,7 @@ from onyx.background.celery.tasks.shared.RetryDocumentIndex import RetryDocument
|
||||
from onyx.configs.constants import ONYX_CELERY_BEAT_HEARTBEAT_KEY
|
||||
from onyx.configs.constants import OnyxCeleryTask
|
||||
from onyx.db.document import delete_document_by_connector_credential_pair__no_commit
|
||||
from onyx.db.document import delete_documents_complete
|
||||
from onyx.db.document import delete_documents_complete__no_commit
|
||||
from onyx.db.document import fetch_chunk_count_for_document
|
||||
from onyx.db.document import get_document
|
||||
from onyx.db.document import get_document_connector_count
|
||||
@@ -129,10 +129,11 @@ def document_by_cc_pair_cleanup_task(
|
||||
document_id=document_id,
|
||||
)
|
||||
|
||||
delete_documents_complete(
|
||||
delete_documents_complete__no_commit(
|
||||
db_session=db_session,
|
||||
document_ids=[document_id],
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
completion_status = OnyxCeleryTaskCompletionStatus.SUCCEEDED
|
||||
elif count > 1:
|
||||
|
||||
@@ -58,8 +58,6 @@ from onyx.db.indexing_coordination import IndexingCoordination
|
||||
from onyx.db.models import IndexAttempt
|
||||
from onyx.file_store.document_batch_storage import DocumentBatchStorage
|
||||
from onyx.file_store.document_batch_storage import get_document_batch_storage
|
||||
from onyx.file_store.staging import build_raw_file_callback
|
||||
from onyx.file_store.staging import RawFileCallback
|
||||
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
|
||||
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
|
||||
from onyx.redis.redis_hierarchy import cache_hierarchy_nodes_batch
|
||||
@@ -92,7 +90,6 @@ def _get_connector_runner(
|
||||
end_time: datetime,
|
||||
include_permissions: bool,
|
||||
leave_connector_active: bool = LEAVE_CONNECTOR_ACTIVE_ON_INITIALIZATION_FAILURE,
|
||||
raw_file_callback: RawFileCallback | None = None,
|
||||
) -> ConnectorRunner:
|
||||
"""
|
||||
NOTE: `start_time` and `end_time` are only used for poll connectors
|
||||
@@ -111,7 +108,6 @@ def _get_connector_runner(
|
||||
input_type=task,
|
||||
connector_specific_config=attempt.connector_credential_pair.connector.connector_specific_config,
|
||||
credential=attempt.connector_credential_pair.credential,
|
||||
raw_file_callback=raw_file_callback,
|
||||
)
|
||||
|
||||
# validate the connector settings
|
||||
@@ -279,12 +275,6 @@ def run_docfetching_entrypoint(
|
||||
f"credentials='{credential_id}'"
|
||||
)
|
||||
|
||||
raw_file_callback = build_raw_file_callback(
|
||||
index_attempt_id=index_attempt_id,
|
||||
cc_pair_id=connector_credential_pair_id,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
|
||||
connector_document_extraction(
|
||||
app,
|
||||
index_attempt_id,
|
||||
@@ -292,7 +282,6 @@ def run_docfetching_entrypoint(
|
||||
attempt.search_settings_id,
|
||||
tenant_id,
|
||||
callback,
|
||||
raw_file_callback=raw_file_callback,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
@@ -312,7 +301,6 @@ def connector_document_extraction(
|
||||
search_settings_id: int,
|
||||
tenant_id: str,
|
||||
callback: IndexingHeartbeatInterface | None = None,
|
||||
raw_file_callback: RawFileCallback | None = None,
|
||||
) -> None:
|
||||
"""Extract documents from connector and queue them for indexing pipeline processing.
|
||||
|
||||
@@ -463,7 +451,6 @@ def connector_document_extraction(
|
||||
start_time=window_start,
|
||||
end_time=window_end,
|
||||
include_permissions=should_fetch_permissions_during_indexing,
|
||||
raw_file_callback=raw_file_callback,
|
||||
)
|
||||
|
||||
# don't use a checkpoint if we're explicitly indexing from
|
||||
|
||||
@@ -60,9 +60,7 @@ from onyx.configs.constants import DEFAULT_PERSONA_ID
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.configs.constants import MessageType
|
||||
from onyx.configs.constants import MilestoneRecordType
|
||||
from onyx.configs.llm_configs import get_image_extraction_and_analysis_enabled
|
||||
from onyx.context.search.models import BaseFilters
|
||||
from onyx.context.search.models import IndexFilters
|
||||
from onyx.context.search.models import SearchDoc
|
||||
from onyx.db.chat import create_new_chat_message
|
||||
from onyx.db.chat import get_chat_session_by_id
|
||||
@@ -76,17 +74,12 @@ from onyx.db.models import Persona
|
||||
from onyx.db.models import User
|
||||
from onyx.db.models import UserFile
|
||||
from onyx.db.projects import get_user_files_from_project
|
||||
from onyx.db.search_settings import get_active_search_settings
|
||||
from onyx.db.tools import get_tools
|
||||
from onyx.deep_research.dr_loop import run_deep_research_llm_loop
|
||||
from onyx.document_index.factory import get_default_document_index
|
||||
from onyx.document_index.interfaces import DocumentIndex
|
||||
from onyx.document_index.interfaces import VespaChunkRequest
|
||||
from onyx.error_handling.error_codes import OnyxErrorCode
|
||||
from onyx.error_handling.exceptions import log_onyx_error
|
||||
from onyx.error_handling.exceptions import OnyxError
|
||||
from onyx.file_processing.extract_file_text import extract_file_text
|
||||
from onyx.file_processing.extract_file_text import extract_text_and_images
|
||||
from onyx.file_store.models import ChatFileType
|
||||
from onyx.file_store.models import InMemoryChatFile
|
||||
from onyx.file_store.utils import load_in_memory_chat_files
|
||||
@@ -129,7 +122,6 @@ from onyx.tools.tool_constructor import SearchToolConfig
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.telemetry import mt_cloud_telemetry
|
||||
from onyx.utils.timing import log_function_time
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
from shared_configs.contextvars import get_current_tenant_id
|
||||
|
||||
logger = setup_logger()
|
||||
@@ -252,106 +244,22 @@ def _empty_extracted_context_files() -> ExtractedContextFiles:
|
||||
)
|
||||
|
||||
|
||||
def _fetch_cached_image_captions(
    user_file: UserFile | None,
    document_index: DocumentIndex | None,
) -> list[str]:
    """Return cached image captions for a user file, read from the document index.

    Image summaries produced by a vision LLM at indexing time are stored as
    chunks with `image_file_id` set. Looking them up here lets chat reuse those
    summaries instead of calling the vision LLM again on every turn.

    Args:
        user_file: The user file whose chunks should be looked up, keyed by
            `str(user_file.id)`. `None` short-circuits to an empty result.
        document_index: Index to query via `id_based_retrieval`. `None`
            short-circuits to an empty result.

    Returns:
        One string per distinct `image_file_id`, in first-seen order, formatted
        as an `[Image — <id>]` header line followed by that image's chunk
        contents joined with newlines. Empty when either argument is `None`,
        when no chunk carries both an `image_file_id` and content (e.g.
        indexing is still in flight), or when the retrieval raises.
    """
    if document_index is None or user_file is None:
        return []

    try:
        # Everything that can raise stays inside the try so that any failure
        # degrades to "no captions" rather than breaking the chat turn.
        chunk_request = VespaChunkRequest(document_id=str(user_file.id))
        index_filters = IndexFilters(
            access_control_list=None,
            tenant_id=get_current_tenant_id() if MULTI_TENANT else None,
        )
        chunks = document_index.id_based_retrieval(
            chunk_requests=[chunk_request],
            filters=index_filters,
        )
    except Exception:
        logger.warning(
            f"Failed to fetch cached captions for user_file {user_file.id}",
            exc_info=True,
        )
        return []

    # A single image's caption may span several chunks; group the pieces by
    # image_file_id so each image contributes exactly one entry.
    contents_by_image: dict[str, list[str]] = {}
    for chunk in chunks:
        if not (chunk.image_file_id and chunk.content):
            continue
        contents_by_image.setdefault(chunk.image_file_id, []).append(chunk.content)

    captions: list[str] = []
    for image_file_id, contents in contents_by_image.items():
        captions.append(f"[Image — {image_file_id}]\n" + "\n".join(contents))
    return captions
|
||||
|
||||
|
||||
def _extract_text_from_in_memory_file(
|
||||
f: InMemoryChatFile,
|
||||
user_file: UserFile | None = None,
|
||||
document_index: DocumentIndex | None = None,
|
||||
) -> str | None:
|
||||
def _extract_text_from_in_memory_file(f: InMemoryChatFile) -> str | None:
|
||||
"""Extract text content from an InMemoryChatFile.
|
||||
|
||||
PLAIN_TEXT: the content is pre-extracted UTF-8 plaintext stored during
|
||||
ingestion — decode directly.
|
||||
DOC / CSV / other text types: the content is the original file bytes —
|
||||
use extract_file_text which handles encoding detection and format parsing.
|
||||
When image extraction is enabled and the file has embedded images, cached
|
||||
captions are pulled from the document index and appended to the text.
|
||||
The index fetch is skipped for files with no embedded images. We do not
|
||||
re-summarize images inline here — this path is hot and the indexing
|
||||
pipeline writes chunks atomically, so a missed caption means the file
|
||||
is mid-indexing and will be picked up on the next turn.
|
||||
"""
|
||||
try:
|
||||
if f.file_type == ChatFileType.PLAIN_TEXT:
|
||||
return f.content.decode("utf-8", errors="ignore").replace("\x00", "")
|
||||
|
||||
filename = f.filename or ""
|
||||
if not get_image_extraction_and_analysis_enabled():
|
||||
return extract_file_text(
|
||||
file=io.BytesIO(f.content),
|
||||
file_name=filename,
|
||||
break_on_unprocessable=False,
|
||||
)
|
||||
|
||||
extraction = extract_text_and_images(
|
||||
return extract_file_text(
|
||||
file=io.BytesIO(f.content),
|
||||
file_name=filename,
|
||||
file_name=f.filename or "",
|
||||
break_on_unprocessable=False,
|
||||
)
|
||||
text = extraction.text_content
|
||||
has_text = bool(text.strip())
|
||||
has_images = bool(extraction.embedded_images)
|
||||
|
||||
if not has_text and not has_images:
|
||||
# extract_text_and_images has no is_text_file() fallback for
|
||||
# unknown extensions (.py/.rs/.md without a dedicated handler).
|
||||
# Defer to the legacy path so those files remain readable.
|
||||
return extract_file_text(
|
||||
file=io.BytesIO(f.content),
|
||||
file_name=filename,
|
||||
break_on_unprocessable=False,
|
||||
)
|
||||
|
||||
if not has_images:
|
||||
return text if has_text else None
|
||||
|
||||
cached_captions = _fetch_cached_image_captions(user_file, document_index)
|
||||
|
||||
parts: list[str] = []
|
||||
if has_text:
|
||||
parts.append(text)
|
||||
parts.extend(cached_captions)
|
||||
|
||||
return "\n\n".join(parts).strip() or None
|
||||
except Exception:
|
||||
logger.warning(f"Failed to extract text from file {f.file_id}", exc_info=True)
|
||||
return None
|
||||
@@ -433,23 +341,6 @@ def extract_context_files(
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
# The document index is used at chat time to read cached image captions
|
||||
# (produced during indexing) so vision-LLM calls don't re-run per turn.
|
||||
document_index: DocumentIndex | None = None
|
||||
if not DISABLE_VECTOR_DB and get_image_extraction_and_analysis_enabled():
|
||||
try:
|
||||
active_search_settings = get_active_search_settings(db_session)
|
||||
document_index = get_default_document_index(
|
||||
search_settings=active_search_settings.primary,
|
||||
secondary_search_settings=None,
|
||||
db_session=db_session,
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Failed to construct document index for caption lookup",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
file_texts: list[str] = []
|
||||
image_files: list[ChatLoadedFile] = []
|
||||
file_metadata: list[ContextFileMetadata] = []
|
||||
@@ -470,9 +361,7 @@ def extract_context_files(
|
||||
continue
|
||||
tool_metadata.append(_build_tool_metadata(uf))
|
||||
elif f.file_type.is_text_file():
|
||||
text_content = _extract_text_from_in_memory_file(
|
||||
f, user_file=uf, document_index=document_index
|
||||
)
|
||||
text_content = _extract_text_from_in_memory_file(f)
|
||||
if not text_content:
|
||||
continue
|
||||
if not uf:
|
||||
|
||||
@@ -372,7 +372,6 @@ class FileOrigin(str, Enum):
|
||||
CONNECTOR_METADATA = "connector_metadata"
|
||||
GENERATED_REPORT = "generated_report"
|
||||
INDEXING_CHECKPOINT = "indexing_checkpoint"
|
||||
INDEXING_STAGING = "indexing_staging"
|
||||
PLAINTEXT_CACHE = "plaintext_cache"
|
||||
OTHER = "other"
|
||||
QUERY_HISTORY_CSV = "query_history_csv"
|
||||
|
||||
@@ -3,14 +3,7 @@ from onyx.server.settings.store import load_settings
|
||||
|
||||
|
||||
def get_image_extraction_and_analysis_enabled() -> bool:
|
||||
"""Return the workspace setting for image extraction/analysis.
|
||||
|
||||
The pydantic `Settings` model defaults this field to True, so production
|
||||
tenants get the feature on by default on first read. The fallback here
|
||||
stays False so environments where settings cannot be loaded at all
|
||||
(e.g. unit tests with no DB/Redis) don't trigger downstream vision-LLM
|
||||
code paths that assume the DB is reachable.
|
||||
"""
|
||||
"""Get image extraction and analysis enabled setting from workspace settings or fallback to False"""
|
||||
try:
|
||||
settings = load_settings()
|
||||
if settings.image_extraction_and_analysis_enabled is not None:
|
||||
|
||||
@@ -2,23 +2,15 @@ import csv
|
||||
import io
|
||||
from typing import IO
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from onyx.connectors.models import TabularSection
|
||||
from onyx.file_processing.extract_file_text import file_io_to_text
|
||||
from onyx.file_processing.extract_file_text import xlsx_sheet_extraction
|
||||
from onyx.file_processing.file_types import OnyxFileExtensions
|
||||
from onyx.file_store.staging import RawFileCallback
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
class TabularExtractionResult(BaseModel):
|
||||
sections: list[TabularSection]
|
||||
staged_file_id: str
|
||||
|
||||
|
||||
def is_tabular_file(file_name: str) -> bool:
|
||||
lowered = file_name.lower()
|
||||
return any(lowered.endswith(ext) for ext in OnyxFileExtensions.TABULAR_EXTENSIONS)
|
||||
@@ -49,9 +41,6 @@ def tabular_file_to_sections(
|
||||
"""
|
||||
lowered = file_name.lower()
|
||||
|
||||
if not lowered.endswith(tuple(OnyxFileExtensions.TABULAR_EXTENSIONS)):
|
||||
raise ValueError(f"{file_name!r} is not a tabular file")
|
||||
|
||||
if lowered.endswith(tuple(OnyxFileExtensions.SPREADSHEET_EXTENSIONS)):
|
||||
return [
|
||||
TabularSection(
|
||||
@@ -64,6 +53,9 @@ def tabular_file_to_sections(
|
||||
)
|
||||
]
|
||||
|
||||
if not lowered.endswith((".csv", ".tsv")):
|
||||
raise ValueError(f"{file_name!r} is not a tabular file")
|
||||
|
||||
try:
|
||||
text = file_io_to_text(file).strip()
|
||||
except Exception:
|
||||
@@ -75,26 +67,3 @@ def tabular_file_to_sections(
|
||||
if lowered.endswith(".tsv"):
|
||||
text = _tsv_to_csv(text)
|
||||
return [TabularSection(link=link or file_name, text=text)]
|
||||
|
||||
|
||||
def extract_and_stage_tabular_file(
|
||||
file: IO[bytes],
|
||||
file_name: str,
|
||||
content_type: str,
|
||||
raw_file_callback: RawFileCallback,
|
||||
link: str = "",
|
||||
) -> TabularExtractionResult:
|
||||
"""Extract tabular sections AND stage the raw bytes via the callback."""
|
||||
sections = tabular_file_to_sections(
|
||||
file=file,
|
||||
file_name=file_name,
|
||||
link=link,
|
||||
)
|
||||
# rewind so the callback can re-read what extraction consumed
|
||||
file.seek(0)
|
||||
staged_file_id = raw_file_callback(file, content_type)
|
||||
|
||||
return TabularExtractionResult(
|
||||
sections=sections,
|
||||
staged_file_id=staged_file_id,
|
||||
)
|
||||
|
||||
@@ -22,7 +22,6 @@ from onyx.db.credentials import backend_update_credential_json
|
||||
from onyx.db.credentials import fetch_credential_by_id
|
||||
from onyx.db.enums import AccessType
|
||||
from onyx.db.models import Credential
|
||||
from onyx.file_store.staging import RawFileCallback
|
||||
from shared_configs.contextvars import get_current_tenant_id
|
||||
|
||||
|
||||
@@ -108,7 +107,6 @@ def instantiate_connector(
|
||||
input_type: InputType,
|
||||
connector_specific_config: dict[str, Any],
|
||||
credential: Credential,
|
||||
raw_file_callback: RawFileCallback | None = None,
|
||||
) -> BaseConnector:
|
||||
connector_class = identify_connector_class(source, input_type)
|
||||
|
||||
@@ -132,9 +130,6 @@ def instantiate_connector(
|
||||
|
||||
connector.set_allow_images(get_image_extraction_and_analysis_enabled())
|
||||
|
||||
if raw_file_callback is not None:
|
||||
connector.set_raw_file_callback(raw_file_callback)
|
||||
|
||||
return connector
|
||||
|
||||
|
||||
|
||||
@@ -40,22 +40,6 @@ class GongConnectorCheckpoint(ConnectorCheckpoint):
|
||||
cursor: str | None = None
|
||||
# Cached time range — computed once, reused across checkpoint calls
|
||||
time_range: tuple[str, str] | None = None
|
||||
# Transcripts whose call details were not yet available from /v2/calls/extensive
|
||||
# (Gong has a known race where transcript call IDs take time to propagate).
|
||||
# Keyed by call_id. Retried on subsequent checkpoint invocations.
|
||||
#
|
||||
# Invariant: all entries share one resolution session — they're stashed
|
||||
# together from a single page and share the attempt counter and retry
|
||||
# deadline. load_from_checkpoint only fetches a new page when this dict
|
||||
# is empty, so entries from different pages can't mix.
|
||||
pending_transcripts: dict[str, dict[str, Any]] = {}
|
||||
# Number of resolution attempts made for pending_transcripts so far.
|
||||
pending_call_details_attempts: int = 0
|
||||
# Unix timestamp before which we should not retry pending_transcripts.
|
||||
# Enforces exponential backoff independent of worker cadence — Gong's
|
||||
# transcript-ID propagation race can take tens of seconds to minutes,
|
||||
# longer than typical worker reinvocation intervals.
|
||||
pending_retry_after: float | None = None
|
||||
|
||||
|
||||
class _TranscriptPage(BaseModel):
|
||||
@@ -78,15 +62,8 @@ class _CursorExpiredError(Exception):
|
||||
|
||||
class GongConnector(CheckpointedConnector[GongConnectorCheckpoint]):
|
||||
BASE_URL = "https://api.gong.io"
|
||||
# Max number of attempts to resolve missing call details across checkpoint
|
||||
# invocations before giving up and emitting ConnectorFailure.
|
||||
MAX_CALL_DETAILS_ATTEMPTS = 6
|
||||
# Base delay for exponential backoff between pending-transcript retry
|
||||
# attempts. Delay before attempt N (N >= 2) is CALL_DETAILS_DELAY * 2^(N-2)
|
||||
# seconds (30, 60, 120, 240, 480 = ~15.5min total) — matching the original
|
||||
# blocking-retry schedule, but enforced via checkpoint deadline rather
|
||||
# than in-call time.sleep.
|
||||
CALL_DETAILS_DELAY = 30
|
||||
CALL_DETAILS_DELAY = 30 # in seconds
|
||||
# Gong API limit is 3 calls/sec — stay safely under it
|
||||
MIN_REQUEST_INTERVAL = 0.5 # seconds between requests
|
||||
|
||||
@@ -210,6 +187,50 @@ class GongConnector(CheckpointedConnector[GongConnectorCheckpoint]):
|
||||
|
||||
return call_to_metadata
|
||||
|
||||
def _fetch_call_details_with_retry(self, call_ids: list[str]) -> dict[str, Any]:
|
||||
"""Fetch call details with retry for the Gong API race condition.
|
||||
|
||||
The Gong API has a known race where transcript call IDs don't immediately
|
||||
appear in /v2/calls/extensive. Retries with exponential backoff, only
|
||||
re-requesting the missing IDs on each attempt.
|
||||
"""
|
||||
call_details_map = self._get_call_details_by_ids(call_ids)
|
||||
if set(call_ids) == set(call_details_map.keys()):
|
||||
return call_details_map
|
||||
|
||||
for attempt in range(2, self.MAX_CALL_DETAILS_ATTEMPTS + 1):
|
||||
missing_ids = list(set(call_ids) - set(call_details_map.keys()))
|
||||
logger.warning(
|
||||
f"_get_call_details_by_ids is missing call id's: current_attempt={attempt - 1} missing_call_ids={missing_ids}"
|
||||
)
|
||||
|
||||
wait_seconds = self.CALL_DETAILS_DELAY * pow(2, attempt - 2)
|
||||
logger.warning(
|
||||
f"_get_call_details_by_ids waiting to retry: "
|
||||
f"wait={wait_seconds}s "
|
||||
f"current_attempt={attempt - 1} "
|
||||
f"next_attempt={attempt} "
|
||||
f"max_attempts={self.MAX_CALL_DETAILS_ATTEMPTS}"
|
||||
)
|
||||
time.sleep(wait_seconds)
|
||||
|
||||
# Only re-fetch the missing IDs, merge into existing results
|
||||
new_details = self._get_call_details_by_ids(missing_ids)
|
||||
call_details_map.update(new_details)
|
||||
|
||||
if set(call_ids) == set(call_details_map.keys()):
|
||||
return call_details_map
|
||||
|
||||
missing_ids = list(set(call_ids) - set(call_details_map.keys()))
|
||||
logger.error(
|
||||
f"Giving up on missing call id's after "
|
||||
f"{self.MAX_CALL_DETAILS_ATTEMPTS} attempts: "
|
||||
f"missing_call_ids={missing_ids} — "
|
||||
f"proceeding with {len(call_details_map)} of "
|
||||
f"{len(call_ids)} calls"
|
||||
)
|
||||
return call_details_map
|
||||
|
||||
@staticmethod
|
||||
def _parse_parties(parties: list[dict]) -> dict[str, str]:
|
||||
id_mapping = {}
|
||||
@@ -292,119 +313,87 @@ class GongConnector(CheckpointedConnector[GongConnectorCheckpoint]):
|
||||
|
||||
return start_time, end_time
|
||||
|
||||
def _build_document(
|
||||
self,
|
||||
transcript: dict[str, Any],
|
||||
call_details: dict[str, Any],
|
||||
) -> Document:
|
||||
"""Build a single Document from a transcript and its resolved call details."""
|
||||
call_id = transcript["callId"]
|
||||
call_metadata = call_details["metaData"]
|
||||
|
||||
call_time_str = call_metadata["started"]
|
||||
call_title = call_metadata["title"]
|
||||
logger.info(
|
||||
f"Indexing Gong call id {call_id} from {call_time_str.split('T', 1)[0]}: {call_title}"
|
||||
)
|
||||
|
||||
call_parties = cast(list[dict] | None, call_details.get("parties"))
|
||||
if call_parties is None:
|
||||
logger.error(f"Couldn't get parties for Call ID: {call_id}")
|
||||
call_parties = []
|
||||
|
||||
id_to_name_map = self._parse_parties(call_parties)
|
||||
|
||||
speaker_to_name: dict[str, str] = {}
|
||||
|
||||
transcript_text = ""
|
||||
call_purpose = call_metadata["purpose"]
|
||||
if call_purpose:
|
||||
transcript_text += f"Call Description: {call_purpose}\n\n"
|
||||
|
||||
contents = transcript["transcript"]
|
||||
for segment in contents:
|
||||
speaker_id = segment.get("speakerId", "")
|
||||
if speaker_id not in speaker_to_name:
|
||||
if self.hide_user_info:
|
||||
speaker_to_name[speaker_id] = f"User {len(speaker_to_name) + 1}"
|
||||
else:
|
||||
speaker_to_name[speaker_id] = id_to_name_map.get(
|
||||
speaker_id, "Unknown"
|
||||
)
|
||||
|
||||
speaker_name = speaker_to_name[speaker_id]
|
||||
|
||||
sentences = segment.get("sentences", {})
|
||||
monolog = " ".join([sentence.get("text", "") for sentence in sentences])
|
||||
transcript_text += f"{speaker_name}: {monolog}\n\n"
|
||||
|
||||
return Document(
|
||||
id=call_id,
|
||||
sections=[TextSection(link=call_metadata["url"], text=transcript_text)],
|
||||
source=DocumentSource.GONG,
|
||||
semantic_identifier=call_title or "Untitled",
|
||||
doc_updated_at=datetime.fromisoformat(call_time_str).astimezone(
|
||||
timezone.utc
|
||||
),
|
||||
metadata={"client": call_metadata.get("system")},
|
||||
)
|
||||
|
||||
def _process_transcripts(
|
||||
self,
|
||||
transcripts: list[dict[str, Any]],
|
||||
checkpoint: GongConnectorCheckpoint,
|
||||
) -> Generator[Document | ConnectorFailure, None, None]:
|
||||
"""Fetch call details for a page of transcripts and yield resulting
|
||||
Documents. Transcripts whose call details are missing (Gong race
|
||||
condition) are stashed into `checkpoint.pending_transcripts` for retry
|
||||
on a future checkpoint invocation rather than blocking here.
|
||||
"""
|
||||
"""Process a batch of transcripts into Documents or ConnectorFailures."""
|
||||
transcript_call_ids = cast(
|
||||
list[str],
|
||||
[t.get("callId") for t in transcripts if t.get("callId")],
|
||||
)
|
||||
|
||||
call_details_map = (
|
||||
self._get_call_details_by_ids(transcript_call_ids)
|
||||
if transcript_call_ids
|
||||
else {}
|
||||
)
|
||||
|
||||
newly_stashed: list[str] = []
|
||||
call_details_map = self._fetch_call_details_with_retry(transcript_call_ids)
|
||||
|
||||
for transcript in transcripts:
|
||||
call_id = transcript.get("callId")
|
||||
|
||||
if not call_id:
|
||||
logger.error(
|
||||
"Couldn't get call information for transcript missing callId"
|
||||
)
|
||||
if not call_id or call_id not in call_details_map:
|
||||
logger.error(f"Couldn't get call information for Call ID: {call_id}")
|
||||
if call_id:
|
||||
logger.error(
|
||||
f"Call debug info: call_id={call_id} "
|
||||
f"call_ids={transcript_call_ids} "
|
||||
f"call_details_map={call_details_map.keys()}"
|
||||
)
|
||||
yield ConnectorFailure(
|
||||
failed_document=DocumentFailure(document_id="unknown"),
|
||||
failure_message="Transcript missing callId",
|
||||
failed_document=DocumentFailure(
|
||||
document_id=call_id or "unknown",
|
||||
),
|
||||
failure_message=f"Couldn't get call information for Call ID: {call_id}",
|
||||
)
|
||||
continue
|
||||
|
||||
if call_id in call_details_map:
|
||||
yield self._build_document(transcript, call_details_map[call_id])
|
||||
continue
|
||||
call_details = call_details_map[call_id]
|
||||
call_metadata = call_details["metaData"]
|
||||
|
||||
# Details not available yet — stash for retry on next invocation.
|
||||
checkpoint.pending_transcripts[call_id] = transcript
|
||||
newly_stashed.append(call_id)
|
||||
|
||||
if newly_stashed:
|
||||
logger.warning(
|
||||
f"Gong call details not yet available (race condition); "
|
||||
f"deferring to next checkpoint invocation: "
|
||||
f"call_ids={newly_stashed}"
|
||||
call_time_str = call_metadata["started"]
|
||||
call_title = call_metadata["title"]
|
||||
logger.info(
|
||||
f"Indexing Gong call id {call_id} from {call_time_str.split('T', 1)[0]}: {call_title}"
|
||||
)
|
||||
|
||||
call_parties = cast(list[dict] | None, call_details.get("parties"))
|
||||
if call_parties is None:
|
||||
logger.error(f"Couldn't get parties for Call ID: {call_id}")
|
||||
call_parties = []
|
||||
|
||||
id_to_name_map = self._parse_parties(call_parties)
|
||||
|
||||
speaker_to_name: dict[str, str] = {}
|
||||
|
||||
transcript_text = ""
|
||||
call_purpose = call_metadata["purpose"]
|
||||
if call_purpose:
|
||||
transcript_text += f"Call Description: {call_purpose}\n\n"
|
||||
|
||||
contents = transcript["transcript"]
|
||||
for segment in contents:
|
||||
speaker_id = segment.get("speakerId", "")
|
||||
if speaker_id not in speaker_to_name:
|
||||
if self.hide_user_info:
|
||||
speaker_to_name[speaker_id] = f"User {len(speaker_to_name) + 1}"
|
||||
else:
|
||||
speaker_to_name[speaker_id] = id_to_name_map.get(
|
||||
speaker_id, "Unknown"
|
||||
)
|
||||
|
||||
speaker_name = speaker_to_name[speaker_id]
|
||||
|
||||
sentences = segment.get("sentences", {})
|
||||
monolog = " ".join([sentence.get("text", "") for sentence in sentences])
|
||||
transcript_text += f"{speaker_name}: {monolog}\n\n"
|
||||
|
||||
yield Document(
|
||||
id=call_id,
|
||||
sections=[TextSection(link=call_metadata["url"], text=transcript_text)],
|
||||
source=DocumentSource.GONG,
|
||||
semantic_identifier=call_title or "Untitled",
|
||||
doc_updated_at=datetime.fromisoformat(call_time_str).astimezone(
|
||||
timezone.utc
|
||||
),
|
||||
metadata={"client": call_metadata.get("system")},
|
||||
)
|
||||
# First attempt on any newly-stashed transcripts counts as attempt #1.
|
||||
# pending_call_details_attempts is guaranteed 0 here because
|
||||
# load_from_checkpoint only reaches _process_transcripts when
|
||||
# pending_transcripts was empty at entry (see early-return above).
|
||||
checkpoint.pending_call_details_attempts = 1
|
||||
checkpoint.pending_retry_after = time.time() + self._next_retry_delay(1)
|
||||
|
||||
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
|
||||
combined = (
|
||||
@@ -443,18 +432,6 @@ class GongConnector(CheckpointedConnector[GongConnectorCheckpoint]):
|
||||
checkpoint.has_more = True
|
||||
return checkpoint
|
||||
|
||||
# Step 2: Resolve any transcripts stashed by a prior invocation whose
|
||||
# call details were missing due to Gong's propagation race. Worker
|
||||
# cadence between checkpoint calls provides the spacing between retry
|
||||
# attempts — no in-call sleep needed.
|
||||
if checkpoint.pending_transcripts:
|
||||
yield from self._resolve_pending_transcripts(checkpoint)
|
||||
# If pending still exists and we haven't exhausted attempts, defer
|
||||
# the rest of this invocation — _resolve_pending_transcripts set
|
||||
# has_more=True for us.
|
||||
if checkpoint.pending_transcripts:
|
||||
return checkpoint
|
||||
|
||||
workspace_ids = checkpoint.workspace_ids
|
||||
|
||||
# If we've exhausted all workspaces, we're done
|
||||
@@ -473,7 +450,7 @@ class GongConnector(CheckpointedConnector[GongConnectorCheckpoint]):
|
||||
|
||||
workspace_id = workspace_ids[checkpoint.workspace_index]
|
||||
|
||||
# Step 3: Fetch one page of transcripts
|
||||
# Step 2: Fetch one page of transcripts
|
||||
try:
|
||||
page = self._fetch_transcript_page(
|
||||
start_datetime=start_time,
|
||||
@@ -496,102 +473,23 @@ class GongConnector(CheckpointedConnector[GongConnectorCheckpoint]):
|
||||
checkpoint.has_more = True
|
||||
return checkpoint
|
||||
|
||||
# Step 4: Process transcripts into documents. Missing-details
|
||||
# transcripts get stashed into checkpoint.pending_transcripts.
|
||||
# Step 3: Process transcripts into documents
|
||||
if page.transcripts:
|
||||
yield from self._process_transcripts(page.transcripts, checkpoint)
|
||||
yield from self._process_transcripts(page.transcripts)
|
||||
|
||||
# Step 5: Update cursor/workspace state
|
||||
# Step 4: Update checkpoint state
|
||||
if page.next_cursor:
|
||||
# More pages in this workspace
|
||||
checkpoint.cursor = page.next_cursor
|
||||
checkpoint.has_more = True
|
||||
else:
|
||||
# This workspace is exhausted — advance to next
|
||||
checkpoint.workspace_index += 1
|
||||
checkpoint.cursor = None
|
||||
checkpoint.has_more = checkpoint.workspace_index < len(workspace_ids)
|
||||
|
||||
# If pending transcripts were stashed this invocation, we still have
|
||||
# work to do on a future invocation even if pagination is exhausted.
|
||||
if checkpoint.pending_transcripts:
|
||||
checkpoint.has_more = True
|
||||
|
||||
return checkpoint
|
||||
|
||||
def _next_retry_delay(self, attempts_done: int) -> float:
|
||||
"""Seconds to wait before attempt #(attempts_done + 1).
|
||||
Matches the original exponential backoff: 30, 60, 120, 240, 480.
|
||||
"""
|
||||
return self.CALL_DETAILS_DELAY * pow(2, attempts_done - 1)
|
||||
|
||||
def _resolve_pending_transcripts(
|
||||
self,
|
||||
checkpoint: GongConnectorCheckpoint,
|
||||
) -> Generator[Document | ConnectorFailure, None, None]:
|
||||
"""Attempt to resolve transcripts whose call details were unavailable
|
||||
in a prior invocation. Mutates checkpoint in place: resolved transcripts
|
||||
are removed from pending_transcripts; on attempt exhaustion, emits
|
||||
ConnectorFailure for each unresolved call_id and clears pending state.
|
||||
|
||||
If the backoff deadline hasn't elapsed yet, returns without issuing
|
||||
any API call so the next invocation can try again later.
|
||||
"""
|
||||
if (
|
||||
checkpoint.pending_retry_after is not None
|
||||
and time.time() < checkpoint.pending_retry_after
|
||||
):
|
||||
# Backoff still in effect — defer to a later invocation without
|
||||
# burning an attempt or an API call.
|
||||
checkpoint.has_more = True
|
||||
return
|
||||
|
||||
pending_call_ids = list(checkpoint.pending_transcripts.keys())
|
||||
resolved = self._get_call_details_by_ids(pending_call_ids)
|
||||
|
||||
for call_id, details in resolved.items():
|
||||
transcript = checkpoint.pending_transcripts.pop(call_id, None)
|
||||
if transcript is None:
|
||||
continue
|
||||
yield self._build_document(transcript, details)
|
||||
|
||||
if not checkpoint.pending_transcripts:
|
||||
checkpoint.pending_call_details_attempts = 0
|
||||
checkpoint.pending_retry_after = None
|
||||
return
|
||||
|
||||
checkpoint.pending_call_details_attempts += 1
|
||||
logger.warning(
|
||||
f"Gong call details still missing after "
|
||||
f"{checkpoint.pending_call_details_attempts}/"
|
||||
f"{self.MAX_CALL_DETAILS_ATTEMPTS} attempts: "
|
||||
f"missing_call_ids={list(checkpoint.pending_transcripts.keys())}"
|
||||
)
|
||||
|
||||
if checkpoint.pending_call_details_attempts >= self.MAX_CALL_DETAILS_ATTEMPTS:
|
||||
logger.error(
|
||||
f"Giving up on missing Gong call details after "
|
||||
f"{self.MAX_CALL_DETAILS_ATTEMPTS} attempts: "
|
||||
f"missing_call_ids={list(checkpoint.pending_transcripts.keys())}"
|
||||
)
|
||||
for call_id in list(checkpoint.pending_transcripts.keys()):
|
||||
yield ConnectorFailure(
|
||||
failed_document=DocumentFailure(document_id=call_id),
|
||||
failure_message=(
|
||||
f"Couldn't get call details after {self.MAX_CALL_DETAILS_ATTEMPTS} attempts for Call ID: {call_id}"
|
||||
),
|
||||
)
|
||||
checkpoint.pending_transcripts = {}
|
||||
checkpoint.pending_call_details_attempts = 0
|
||||
checkpoint.pending_retry_after = None
|
||||
# has_more is recomputed by the workspace iteration that follows;
|
||||
# reset to False here so a stale True from a prior invocation
|
||||
# can't leak out via any future early-return path.
|
||||
checkpoint.has_more = False
|
||||
else:
|
||||
checkpoint.pending_retry_after = time.time() + self._next_retry_delay(
|
||||
checkpoint.pending_call_details_attempts
|
||||
)
|
||||
checkpoint.has_more = True
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import os
|
||||
|
||||
@@ -578,18 +578,8 @@ class GoogleDriveConnector(
|
||||
current_id, file.user_email, field_type, failed_folder_ids_by_email
|
||||
)
|
||||
if not folder:
|
||||
# Can't access this folder - stop climbing.
|
||||
# If the terminal node is a confirmed orphan, backfill all
|
||||
# intermediate folders into failed_folder_ids_by_email so
|
||||
# future files short-circuit via _get_folder_metadata's
|
||||
# cache check instead of re-climbing the whole chain.
|
||||
if failed_folder_ids_by_email is not None:
|
||||
for email in {file.user_email, self.primary_admin_email}:
|
||||
email_failed_ids = failed_folder_ids_by_email.get(email)
|
||||
if email_failed_ids and current_id in email_failed_ids:
|
||||
failed_folder_ids_by_email.setdefault(
|
||||
email, ThreadSafeSet()
|
||||
).update(set(node_ids_in_walk))
|
||||
# Can't access this folder - stop climbing
|
||||
# Don't mark as fully walked since we didn't reach root
|
||||
break
|
||||
|
||||
folder_parent_id = _get_parent_id_from_file(folder)
|
||||
@@ -1635,7 +1625,6 @@ class GoogleDriveConnector(
|
||||
[retrieved_file.user_email, self.primary_admin_email]
|
||||
+ get_file_owners(retrieved_file.drive_file, self.primary_admin_email),
|
||||
retrieved_file.drive_file,
|
||||
self.raw_file_callback,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
|
||||
@@ -13,9 +13,6 @@ from pydantic import BaseModel
|
||||
from onyx.access.models import ExternalAccess
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
extract_and_stage_tabular_file,
|
||||
)
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
tabular_file_to_sections,
|
||||
@@ -46,7 +43,6 @@ from onyx.file_processing.file_types import OnyxFileExtensions
|
||||
from onyx.file_processing.file_types import OnyxMimeTypes
|
||||
from onyx.file_processing.file_types import SPREADSHEET_MIME_TYPE
|
||||
from onyx.file_processing.image_utils import store_image_and_create_section
|
||||
from onyx.file_store.staging import RawFileCallback
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.variable_functionality import (
|
||||
fetch_versioned_implementation_with_fallback,
|
||||
@@ -55,12 +51,6 @@ from onyx.utils.variable_functionality import noop_fallback
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
class BasicExtractionResult(BaseModel):
|
||||
sections: list[TextSection | ImageSection | TabularSection]
|
||||
staged_file_id: str | None = None
|
||||
|
||||
|
||||
# Cache for folder path lookups to avoid redundant API calls
|
||||
# Maps folder_id -> (folder_name, parent_id)
|
||||
_folder_cache: dict[str, tuple[str, str | None]] = {}
|
||||
@@ -310,8 +300,7 @@ def _download_and_extract_sections_basic(
|
||||
service: GoogleDriveService,
|
||||
allow_images: bool,
|
||||
size_threshold: int,
|
||||
raw_file_callback: RawFileCallback | None = None,
|
||||
) -> BasicExtractionResult:
|
||||
) -> list[TextSection | ImageSection | TabularSection]:
|
||||
"""Extract text and images from a Google Drive file."""
|
||||
file_id = file["id"]
|
||||
file_name = file["name"]
|
||||
@@ -324,35 +313,10 @@ def _download_and_extract_sections_basic(
|
||||
def response_call() -> bytes:
|
||||
return download_request(service, file_id, size_threshold)
|
||||
|
||||
def _extract_tabular(
|
||||
raw_bytes: bytes, name: str, content_type: str
|
||||
) -> BasicExtractionResult:
|
||||
if raw_file_callback is not None:
|
||||
result = extract_and_stage_tabular_file(
|
||||
file=io.BytesIO(raw_bytes),
|
||||
file_name=name,
|
||||
content_type=content_type,
|
||||
raw_file_callback=raw_file_callback,
|
||||
link=link,
|
||||
)
|
||||
return BasicExtractionResult(
|
||||
sections=list(result.sections),
|
||||
staged_file_id=result.staged_file_id,
|
||||
)
|
||||
return BasicExtractionResult(
|
||||
sections=list(
|
||||
tabular_file_to_sections(
|
||||
io.BytesIO(raw_bytes),
|
||||
file_name=name,
|
||||
link=link,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
if mime_type in OnyxMimeTypes.IMAGE_MIME_TYPES:
|
||||
# Skip images if not explicitly enabled
|
||||
if not allow_images:
|
||||
return BasicExtractionResult(sections=[])
|
||||
return []
|
||||
|
||||
# Store images for later processing
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
@@ -368,7 +332,7 @@ def _download_and_extract_sections_basic(
|
||||
sections.append(section)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to process image {file_name}: {e}")
|
||||
return BasicExtractionResult(sections=sections)
|
||||
return sections
|
||||
|
||||
# For Google Docs, Sheets, and Slides, export via the Drive API
|
||||
if mime_type in GOOGLE_MIME_TYPES_TO_EXPORT:
|
||||
@@ -379,35 +343,37 @@ def _download_and_extract_sections_basic(
|
||||
response = _download_request(request, file_id, size_threshold)
|
||||
if not response:
|
||||
logger.warning(f"Failed to export {file_name} as {export_mime_type}")
|
||||
return BasicExtractionResult(sections=[])
|
||||
return []
|
||||
|
||||
if export_mime_type in OnyxMimeTypes.TABULAR_MIME_TYPES:
|
||||
# Synthesize an extension on the filename
|
||||
ext = ".xlsx" if export_mime_type == SPREADSHEET_MIME_TYPE else ".csv"
|
||||
return _extract_tabular(
|
||||
raw_bytes=response,
|
||||
name=f"{file_name}{ext}",
|
||||
content_type=export_mime_type,
|
||||
return list(
|
||||
tabular_file_to_sections(
|
||||
io.BytesIO(response),
|
||||
file_name=f"{file_name}{ext}",
|
||||
link=link,
|
||||
)
|
||||
)
|
||||
|
||||
text = response.decode("utf-8")
|
||||
return BasicExtractionResult(sections=[TextSection(link=link, text=text)])
|
||||
return [TextSection(link=link, text=text)]
|
||||
|
||||
# Process based on mime type
|
||||
if mime_type == "text/plain":
|
||||
try:
|
||||
text = response_call().decode("utf-8")
|
||||
return BasicExtractionResult(sections=[TextSection(link=link, text=text)])
|
||||
return [TextSection(link=link, text=text)]
|
||||
except UnicodeDecodeError as e:
|
||||
logger.warning(f"Failed to extract text from {file_name}: {e}")
|
||||
return BasicExtractionResult(sections=[])
|
||||
return []
|
||||
|
||||
elif (
|
||||
mime_type
|
||||
== "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
|
||||
):
|
||||
text, _ = read_docx_file(io.BytesIO(response_call()))
|
||||
return BasicExtractionResult(sections=[TextSection(link=link, text=text)])
|
||||
return [TextSection(link=link, text=text)]
|
||||
|
||||
elif (
|
||||
mime_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
|
||||
@@ -436,9 +402,7 @@ def _download_and_extract_sections_basic(
|
||||
== "application/vnd.openxmlformats-officedocument.presentationml.presentation"
|
||||
):
|
||||
text = pptx_to_text(io.BytesIO(response_call()), file_name=file_name)
|
||||
return BasicExtractionResult(
|
||||
sections=[TextSection(link=link, text=text)] if text else []
|
||||
)
|
||||
return [TextSection(link=link, text=text)] if text else []
|
||||
|
||||
elif mime_type == "application/pdf":
|
||||
text, _pdf_meta, images = read_pdf_file(io.BytesIO(response_call()))
|
||||
@@ -458,20 +422,20 @@ def _download_and_extract_sections_basic(
|
||||
pdf_sections.append(section)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to process PDF images in {file_name}: {e}")
|
||||
return BasicExtractionResult(sections=pdf_sections)
|
||||
return pdf_sections
|
||||
|
||||
# Final attempt at extracting text
|
||||
file_ext = get_file_ext(file.get("name", ""))
|
||||
if file_ext not in OnyxFileExtensions.ALL_ALLOWED_EXTENSIONS:
|
||||
logger.warning(f"Skipping file {file.get('name')} due to extension.")
|
||||
return BasicExtractionResult(sections=[])
|
||||
return []
|
||||
|
||||
try:
|
||||
text = extract_file_text(io.BytesIO(response_call()), file_name)
|
||||
return BasicExtractionResult(sections=[TextSection(link=link, text=text)])
|
||||
return [TextSection(link=link, text=text)]
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to extract text from {file_name}: {e}")
|
||||
return BasicExtractionResult(sections=[])
|
||||
return []
|
||||
|
||||
|
||||
def _find_nth(haystack: str, needle: str, n: int, start: int = 0) -> int:
|
||||
@@ -600,7 +564,6 @@ def convert_drive_item_to_document(
|
||||
permission_sync_context: PermissionSyncContext | None,
|
||||
retriever_emails: list[str],
|
||||
file: GoogleDriveFileType,
|
||||
raw_file_callback: RawFileCallback | None = None,
|
||||
) -> Document | ConnectorFailure | None:
|
||||
"""
|
||||
Attempt to convert a drive item to a document with each retriever email
|
||||
@@ -627,7 +590,6 @@ def convert_drive_item_to_document(
|
||||
retriever_email,
|
||||
file,
|
||||
permission_sync_context,
|
||||
raw_file_callback,
|
||||
)
|
||||
|
||||
# There are a variety of permissions-based errors that occasionally occur
|
||||
@@ -673,13 +635,11 @@ def _convert_drive_item_to_document(
|
||||
# if not specified, we will not sync permissions
|
||||
# will also be a no-op if EE is not enabled
|
||||
permission_sync_context: PermissionSyncContext | None,
|
||||
raw_file_callback: RawFileCallback | None = None,
|
||||
) -> Document | ConnectorFailure | None:
|
||||
"""
|
||||
Main entry point for converting a Google Drive file => Document object.
|
||||
"""
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
staged_file_id: str | None = None
|
||||
|
||||
# Only construct these services when needed
|
||||
def _get_drive_service() -> GoogleDriveService:
|
||||
@@ -726,17 +686,10 @@ def _convert_drive_item_to_document(
|
||||
logger.debug(
|
||||
f"found smart chips in {file.get('name')}, aligning with basic sections"
|
||||
)
|
||||
basic_extraction = _download_and_extract_sections_basic(
|
||||
file,
|
||||
_get_drive_service(),
|
||||
allow_images,
|
||||
size_threshold,
|
||||
raw_file_callback,
|
||||
basic_sections = _download_and_extract_sections_basic(
|
||||
file, _get_drive_service(), allow_images, size_threshold
|
||||
)
|
||||
sections = align_basic_advanced(
|
||||
basic_extraction.sections, doc_sections
|
||||
)
|
||||
staged_file_id = basic_extraction.staged_file_id
|
||||
sections = align_basic_advanced(basic_sections, doc_sections)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
@@ -744,15 +697,9 @@ def _convert_drive_item_to_document(
|
||||
)
|
||||
# Not Google Doc, attempt basic extraction
|
||||
else:
|
||||
basic_extraction = _download_and_extract_sections_basic(
|
||||
file,
|
||||
_get_drive_service(),
|
||||
allow_images,
|
||||
size_threshold,
|
||||
raw_file_callback,
|
||||
sections = _download_and_extract_sections_basic(
|
||||
file, _get_drive_service(), allow_images, size_threshold
|
||||
)
|
||||
sections = basic_extraction.sections
|
||||
staged_file_id = basic_extraction.staged_file_id
|
||||
|
||||
# If we still don't have any sections, skip this file
|
||||
if not sections:
|
||||
@@ -813,7 +760,6 @@ def _convert_drive_item_to_document(
|
||||
),
|
||||
external_access=external_access,
|
||||
parent_hierarchy_raw_node_id=(file.get("parents") or [None])[0],
|
||||
file_id=staged_file_id,
|
||||
)
|
||||
except Exception as e:
|
||||
doc_id = "unknown"
|
||||
|
||||
@@ -167,12 +167,9 @@ class GoogleDriveCheckpoint(ConnectorCheckpoint):
|
||||
default_factory=ThreadSafeSet
|
||||
)
|
||||
|
||||
# Maps email → set of folder IDs that email should skip when walking the
|
||||
# parent chain. Covers two cases:
|
||||
# 1. Folders where that email confirmed no accessible parent (true orphans).
|
||||
# 2. Intermediate folders on a path that dead-ended at a confirmed orphan —
|
||||
# backfilled so future walks short-circuit earlier in the chain.
|
||||
# In both cases _get_folder_metadata skips the API call and returns None.
|
||||
# Maps email → set of IDs of folders where that email confirmed no accessible parent.
|
||||
# Avoids redundant API calls when the same (folder, email) pair is
|
||||
# encountered again within the same retrieval run.
|
||||
failed_folder_ids_by_email: ThreadSafeDict[str, ThreadSafeSet[str]] = Field(
|
||||
default_factory=ThreadSafeDict
|
||||
)
|
||||
|
||||
@@ -15,7 +15,6 @@ from onyx.connectors.models import ConnectorFailure
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import HierarchyNode
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.file_store.staging import RawFileCallback
|
||||
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
|
||||
from onyx.utils.variable_functionality import fetch_ee_implementation_or_noop
|
||||
|
||||
@@ -43,9 +42,6 @@ class NormalizationResult(BaseModel):
|
||||
class BaseConnector(abc.ABC, Generic[CT]):
|
||||
REDIS_KEY_PREFIX = "da_connector_data:"
|
||||
|
||||
# Optional raw-file persistence hook to save original file
|
||||
raw_file_callback: RawFileCallback | None = None
|
||||
|
||||
@abc.abstractmethod
|
||||
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
|
||||
raise NotImplementedError
|
||||
@@ -92,15 +88,6 @@ class BaseConnector(abc.ABC, Generic[CT]):
|
||||
"""Implement if the underlying connector wants to skip/allow image downloading
|
||||
based on the application level image analysis setting."""
|
||||
|
||||
def set_raw_file_callback(self, callback: RawFileCallback) -> None:
    """Attach the raw-file persistence callback for this indexing attempt.

    The docfetching entrypoint wires this up through `instantiate_connector`.
    A connector that has no interest in persisting raw bytes never needs to
    look at it; when this setter is not called, `raw_file_callback` is
    simply left as `None`.
    """
    self.raw_file_callback = callback
|
||||
|
||||
@classmethod
|
||||
def normalize_url(cls, url: str) -> "NormalizationResult": # noqa: ARG003
|
||||
"""Normalize a URL to match the canonical Document.id format used during ingestion.
|
||||
|
||||
@@ -231,8 +231,6 @@ class DocumentBase(BaseModel):
|
||||
# Set during docfetching after hierarchy nodes are cached
|
||||
parent_hierarchy_node_id: int | None = None
|
||||
|
||||
file_id: str | None = None
|
||||
|
||||
def get_title_for_document_index(
|
||||
self,
|
||||
) -> str | None:
|
||||
@@ -372,7 +370,6 @@ class Document(DocumentBase):
|
||||
secondary_owners=base.secondary_owners,
|
||||
title=base.title,
|
||||
from_ingestion_api=base.from_ingestion_api,
|
||||
file_id=base.file_id,
|
||||
)
|
||||
|
||||
def __sizeof__(self) -> int:
|
||||
|
||||
@@ -75,8 +75,6 @@ from onyx.file_processing.file_types import OnyxMimeTypes
|
||||
from onyx.file_processing.image_utils import store_image_and_create_section
|
||||
from onyx.utils.b64 import get_image_type_from_bytes
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.url import SSRFException
|
||||
from onyx.utils.url import validate_outbound_http_url
|
||||
|
||||
logger = setup_logger()
|
||||
SLIM_BATCH_SIZE = 1000
|
||||
@@ -983,42 +981,6 @@ class SharepointConnector(
|
||||
raise ConnectorValidationError(
|
||||
"Site URLs must be full Sharepoint URLs (e.g. https://your-tenant.sharepoint.com/sites/your-site or https://your-tenant.sharepoint.com/teams/your-team)"
|
||||
)
|
||||
try:
|
||||
validate_outbound_http_url(site_url, https_only=True)
|
||||
except (SSRFException, ValueError) as e:
|
||||
raise ConnectorValidationError(
|
||||
f"Invalid site URL '{site_url}': {e}"
|
||||
) from e
|
||||
|
||||
# Probe RoleAssignments permission — required for permission sync.
|
||||
# Only runs when credentials have been loaded.
|
||||
if self.msal_app and self.sp_tenant_domain and self.sites:
|
||||
try:
|
||||
token_response = acquire_token_for_rest(
|
||||
self.msal_app,
|
||||
self.sp_tenant_domain,
|
||||
self.sharepoint_domain_suffix,
|
||||
)
|
||||
probe_url = (
|
||||
f"{self.sites[0].rstrip('/')}/_api/web/roleassignments?$top=1"
|
||||
)
|
||||
resp = requests.get(
|
||||
probe_url,
|
||||
headers={"Authorization": f"Bearer {token_response.accessToken}"},
|
||||
timeout=10,
|
||||
)
|
||||
if resp.status_code in (401, 403):
|
||||
raise ConnectorValidationError(
|
||||
"The Azure AD app registration is missing the required SharePoint permission "
|
||||
"to read role assignments. Please grant 'Sites.FullControl.All' "
|
||||
"(application permission) in the Azure portal and re-run admin consent."
|
||||
)
|
||||
except ConnectorValidationError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"RoleAssignments permission probe failed (non-blocking): {e}"
|
||||
)
|
||||
|
||||
def _extract_tenant_domain_from_sites(self) -> str | None:
|
||||
"""Extract the tenant domain from configured site URLs.
|
||||
@@ -1914,22 +1876,16 @@ class SharepointConnector(
|
||||
logger.debug(
|
||||
f"Processing site page: {site_page.get('webUrl', site_page.get('name', 'Unknown'))}"
|
||||
)
|
||||
try:
|
||||
ctx = self._create_rest_client_context(site_descriptor.url)
|
||||
doc_batch.append(
|
||||
_convert_sitepage_to_slim_document(
|
||||
site_page,
|
||||
ctx,
|
||||
self.graph_client,
|
||||
parent_hierarchy_raw_node_id=site_descriptor.url,
|
||||
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to process site page "
|
||||
f"{site_page.get('webUrl', site_page.get('name', 'Unknown'))}: {e}"
|
||||
ctx = self._create_rest_client_context(site_descriptor.url)
|
||||
doc_batch.append(
|
||||
_convert_sitepage_to_slim_document(
|
||||
site_page,
|
||||
ctx,
|
||||
self.graph_client,
|
||||
parent_hierarchy_raw_node_id=site_descriptor.url,
|
||||
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
|
||||
)
|
||||
)
|
||||
if len(doc_batch) >= SLIM_BATCH_SIZE:
|
||||
yield doc_batch
|
||||
doc_batch = []
|
||||
|
||||
@@ -19,7 +19,6 @@ from playwright.sync_api import Playwright
|
||||
from playwright.sync_api import sync_playwright
|
||||
from playwright.sync_api import TimeoutError
|
||||
from requests_oauthlib import OAuth2Session
|
||||
from typing_extensions import override
|
||||
from urllib3.exceptions import MaxRetryError
|
||||
|
||||
from onyx.configs.app_configs import INDEX_BATCH_SIZE
|
||||
@@ -33,16 +32,11 @@ from onyx.connectors.exceptions import CredentialExpiredError
|
||||
from onyx.connectors.exceptions import InsufficientPermissionsError
|
||||
from onyx.connectors.exceptions import UnexpectedValidationError
|
||||
from onyx.connectors.interfaces import GenerateDocumentsOutput
|
||||
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
|
||||
from onyx.connectors.interfaces import LoadConnector
|
||||
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
|
||||
from onyx.connectors.interfaces import SlimConnector
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import HierarchyNode
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.file_processing.html_utils import web_html_cleanup
|
||||
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.sitemap import list_pages_for_site
|
||||
from onyx.utils.web_content import extract_pdf_text
|
||||
@@ -61,6 +55,8 @@ class ScrapeSessionContext:
|
||||
self.visited_links: set[str] = set()
|
||||
self.content_hashes: set[int] = set()
|
||||
|
||||
self.doc_batch: list[Document | HierarchyNode] = []
|
||||
|
||||
self.at_least_one_doc: bool = False
|
||||
self.last_error: str | None = None
|
||||
self.needs_retry: bool = False
|
||||
@@ -442,7 +438,7 @@ def _handle_cookies(context: BrowserContext, url: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
class WebConnector(LoadConnector, SlimConnector):
|
||||
class WebConnector(LoadConnector):
|
||||
MAX_RETRIES = 3
|
||||
|
||||
def __init__(
|
||||
@@ -497,14 +493,8 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
index: int,
|
||||
initial_url: str,
|
||||
session_ctx: ScrapeSessionContext,
|
||||
slim: bool = False,
|
||||
) -> ScrapeResult:
|
||||
"""Returns a ScrapeResult object with a doc and retry flag.
|
||||
|
||||
When slim=True, skips scroll, PDF content download, and content extraction.
|
||||
The bot-detection render wait (5s) fires on CF/403 responses regardless of slim.
|
||||
networkidle is always awaited so JS-rendered links are discovered correctly.
|
||||
"""
|
||||
"""Returns a ScrapeResult object with a doc and retry flag."""
|
||||
|
||||
if session_ctx.playwright is None:
|
||||
raise RuntimeError("scrape_context.playwright is None")
|
||||
@@ -525,16 +515,7 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
is_pdf = is_pdf_resource(initial_url, content_type)
|
||||
|
||||
if is_pdf:
|
||||
if slim:
|
||||
result.doc = Document(
|
||||
id=initial_url,
|
||||
sections=[],
|
||||
source=DocumentSource.WEB,
|
||||
semantic_identifier=initial_url,
|
||||
metadata={},
|
||||
)
|
||||
return result
|
||||
|
||||
# PDF files are not checked for links
|
||||
response = requests.get(initial_url, headers=DEFAULT_HEADERS)
|
||||
page_text, metadata = extract_pdf_text(response.content)
|
||||
last_modified = response.headers.get("Last-Modified")
|
||||
@@ -565,20 +546,14 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
timeout=30000, # 30 seconds
|
||||
wait_until="commit", # Wait for navigation to commit
|
||||
)
|
||||
# Give the page a moment to start rendering after navigation commits.
|
||||
# Allows CloudFlare and other bot-detection challenges to complete.
|
||||
page.wait_for_timeout(PAGE_RENDER_TIMEOUT_MS)
|
||||
|
||||
# Bot-detection JS challenges (CloudFlare, Imperva, etc.) need a moment
|
||||
# to start network activity after commit before networkidle is meaningful.
|
||||
# We detect this via the cf-ray header (CloudFlare) or a 403 response,
|
||||
# which is the common entry point for JS-challenge-based bot detection.
|
||||
is_bot_challenge = page_response is not None and (
|
||||
page_response.header_value("cf-ray") is not None
|
||||
or page_response.status == 403
|
||||
)
|
||||
if is_bot_challenge:
|
||||
page.wait_for_timeout(PAGE_RENDER_TIMEOUT_MS)
|
||||
|
||||
# Wait for network activity to settle (handles SPAs, CF challenges, etc.)
|
||||
# Wait for network activity to settle so SPAs that fetch content
|
||||
# asynchronously after the initial JS bundle have time to render.
|
||||
try:
|
||||
# A bit of extra time to account for long-polling, websockets, etc.
|
||||
page.wait_for_load_state("networkidle", timeout=PAGE_RENDER_TIMEOUT_MS)
|
||||
except TimeoutError:
|
||||
pass
|
||||
@@ -601,7 +576,7 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
session_ctx.visited_links.add(initial_url)
|
||||
|
||||
# If we got here, the request was successful
|
||||
if not slim and self.scroll_before_scraping:
|
||||
if self.scroll_before_scraping:
|
||||
scroll_attempts = 0
|
||||
previous_height = page.evaluate("document.body.scrollHeight")
|
||||
while scroll_attempts < WEB_CONNECTOR_MAX_SCROLL_ATTEMPTS:
|
||||
@@ -640,16 +615,6 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
result.retry = True
|
||||
return result
|
||||
|
||||
if slim:
|
||||
result.doc = Document(
|
||||
id=initial_url,
|
||||
sections=[],
|
||||
source=DocumentSource.WEB,
|
||||
semantic_identifier=initial_url,
|
||||
metadata={},
|
||||
)
|
||||
return result
|
||||
|
||||
# after this point, we don't need the caller to retry
|
||||
parsed_html = web_html_cleanup(soup, self.mintlify_cleanup)
|
||||
|
||||
@@ -701,13 +666,9 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
|
||||
return result
|
||||
|
||||
def load_from_state(self, slim: bool = False) -> GenerateDocumentsOutput:
|
||||
"""Traverses through all pages found on the website and converts them into
|
||||
documents.
|
||||
|
||||
When slim=True, yields SlimDocument objects (URL id only, no content).
|
||||
Playwright is used in all modes — slim skips content extraction only.
|
||||
"""
|
||||
def load_from_state(self) -> GenerateDocumentsOutput:
|
||||
"""Traverses through all pages found on the website
|
||||
and converts them into documents"""
|
||||
|
||||
if not self.to_visit_list:
|
||||
raise ValueError("No URLs to visit")
|
||||
@@ -718,8 +679,6 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
session_ctx = ScrapeSessionContext(base_url, self.to_visit_list)
|
||||
session_ctx.initialize()
|
||||
|
||||
batch: list[Document | SlimDocument | HierarchyNode] = []
|
||||
|
||||
while session_ctx.to_visit:
|
||||
initial_url = session_ctx.to_visit.pop()
|
||||
if initial_url in session_ctx.visited_links:
|
||||
@@ -734,9 +693,7 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
continue
|
||||
|
||||
index = len(session_ctx.visited_links)
|
||||
logger.info(
|
||||
f"{index}: {'Slim-visiting' if slim else 'Visiting'} {initial_url}"
|
||||
)
|
||||
logger.info(f"{index}: Visiting {initial_url}")
|
||||
|
||||
# Add retry mechanism with exponential backoff
|
||||
retry_count = 0
|
||||
@@ -751,14 +708,12 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
time.sleep(delay)
|
||||
|
||||
try:
|
||||
result = self._do_scrape(index, initial_url, session_ctx, slim=slim)
|
||||
result = self._do_scrape(index, initial_url, session_ctx)
|
||||
if result.retry:
|
||||
continue
|
||||
|
||||
if result.doc:
|
||||
batch.append(
|
||||
SlimDocument(id=result.doc.id) if slim else result.doc
|
||||
)
|
||||
session_ctx.doc_batch.append(result.doc)
|
||||
except Exception as e:
|
||||
session_ctx.last_error = f"Failed to fetch '{initial_url}': {e}"
|
||||
logger.exception(session_ctx.last_error)
|
||||
@@ -769,16 +724,16 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
|
||||
break # success / don't retry
|
||||
|
||||
if len(batch) >= self.batch_size:
|
||||
if len(session_ctx.doc_batch) >= self.batch_size:
|
||||
session_ctx.initialize()
|
||||
session_ctx.at_least_one_doc = True
|
||||
yield batch # ty: ignore[invalid-yield]
|
||||
batch = []
|
||||
yield session_ctx.doc_batch
|
||||
session_ctx.doc_batch = []
|
||||
|
||||
if batch:
|
||||
if session_ctx.doc_batch:
|
||||
session_ctx.stop()
|
||||
session_ctx.at_least_one_doc = True
|
||||
yield batch # ty: ignore[invalid-yield]
|
||||
yield session_ctx.doc_batch
|
||||
|
||||
if not session_ctx.at_least_one_doc:
|
||||
if session_ctx.last_error:
|
||||
@@ -787,22 +742,6 @@ class WebConnector(LoadConnector, SlimConnector):
|
||||
|
||||
session_ctx.stop()
|
||||
|
||||
@override
def retrieve_all_slim_docs(
    self,
    start: SecondsSinceUnixEpoch | None = None,
    end: SecondsSinceUnixEpoch | None = None,
    callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
    """Stream SlimDocuments for every page reachable from the configured URLs.

    This simply forwards whatever `load_from_state(slim=True)` yields, i.e.
    the regular crawl run in slim mode (no content extraction, no scroll,
    no PDF download).

    `start`, `end` and `callback` are accepted to satisfy the slim-connector
    signature but are never read here — the WEB connector has no
    incremental path.
    """
    # Batching is done by the crawl generator; pass its output straight on.
    yield from self.load_from_state(slim=True)  # ty: ignore[invalid-yield]
|
||||
|
||||
def validate_connector_settings(self) -> None:
|
||||
# Make sure we have at least one valid URL to check
|
||||
if not self.to_visit_list:
|
||||
|
||||
@@ -52,7 +52,6 @@ from onyx.db.utils import DocumentRow
|
||||
from onyx.db.utils import model_to_dict
|
||||
from onyx.db.utils import SortOrder
|
||||
from onyx.document_index.interfaces import DocumentMetadata
|
||||
from onyx.file_store.staging import delete_files_best_effort
|
||||
from onyx.kg.models import KGStage
|
||||
from onyx.server.documents.models import ConnectorCredentialPairIdentifier
|
||||
from onyx.utils.logger import setup_logger
|
||||
@@ -697,7 +696,6 @@ def upsert_documents(
|
||||
else {}
|
||||
),
|
||||
doc_metadata=doc.doc_metadata,
|
||||
file_id=doc.file_id,
|
||||
)
|
||||
)
|
||||
for doc in seen_documents.values()
|
||||
@@ -714,7 +712,6 @@ def upsert_documents(
|
||||
"secondary_owners": insert_stmt.excluded.secondary_owners,
|
||||
"doc_metadata": insert_stmt.excluded.doc_metadata,
|
||||
"parent_hierarchy_node_id": insert_stmt.excluded.parent_hierarchy_node_id,
|
||||
"file_id": insert_stmt.excluded.file_id,
|
||||
}
|
||||
if includes_permissions:
|
||||
# Use COALESCE to preserve existing permissions when new values are NULL.
|
||||
@@ -928,22 +925,6 @@ def delete_documents__no_commit(db_session: Session, document_ids: list[str]) ->
|
||||
db_session.execute(delete(DbDocument).where(DbDocument.id.in_(document_ids)))
|
||||
|
||||
|
||||
def get_file_ids_for_document_ids(
    db_session: Session,
    document_ids: list[str],
) -> list[str]:
    """Collect the `file_id` values attached to the given documents.

    Documents with a NULL `file_id` are left out of the result. An empty
    `document_ids` list returns `[]` without issuing a query.
    """
    if not document_ids:
        return []

    # Multiple criteria in one filter() call are AND-ed together.
    query = db_session.query(DbDocument.file_id).filter(
        DbDocument.id.in_(document_ids),
        DbDocument.file_id.isnot(None),
    )

    file_ids: list[str] = []
    for row in query.all():
        # The SQL filter already excludes NULLs; this re-check narrows the type.
        if row.file_id is not None:
            file_ids.append(row.file_id)
    return file_ids
|
||||
|
||||
|
||||
def delete_documents_complete__no_commit(
|
||||
db_session: Session, document_ids: list[str]
|
||||
) -> None:
|
||||
@@ -987,27 +968,6 @@ def delete_documents_complete__no_commit(
|
||||
delete_documents__no_commit(db_session, document_ids)
|
||||
|
||||
|
||||
def delete_documents_complete(
    db_session: Session,
    document_ids: list[str],
) -> None:
    """Remove documents entirely, then best-effort delete their attached files.

    Intended for documents that are finished with and should be disposed of:
    the document rows are deleted and committed, after which any files they
    pointed at are removed from the file store.
    """
    # Read the file ids first: they live on the document rows that are about
    # to be deleted.
    attached_file_ids = get_file_ids_for_document_ids(
        db_session=db_session,
        document_ids=document_ids,
    )

    delete_documents_complete__no_commit(
        db_session=db_session,
        document_ids=document_ids,
    )
    db_session.commit()

    # File removal happens only after the row deletion has been committed.
    delete_files_best_effort(attached_file_ids)
|
||||
|
||||
|
||||
def delete_all_documents_for_connector_credential_pair(
|
||||
db_session: Session,
|
||||
connector_id: int,
|
||||
@@ -1039,9 +999,10 @@ def delete_all_documents_for_connector_credential_pair(
|
||||
if not document_ids:
|
||||
break
|
||||
|
||||
delete_documents_complete(
|
||||
delete_documents_complete__no_commit(
|
||||
db_session=db_session, document_ids=list(document_ids)
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
if time.monotonic() - start_time > timeout:
|
||||
raise RuntimeError("Timeout reached while deleting documents")
|
||||
|
||||
@@ -62,21 +62,6 @@ def delete_filerecord_by_file_id(
|
||||
db_session.query(FileRecord).filter_by(file_id=file_id).delete()
|
||||
|
||||
|
||||
def update_filerecord_origin(
    file_id: str,
    from_origin: FileOrigin,
    to_origin: FileOrigin,
    db_session: Session,
) -> None:
    """Switch a file_record's `file_origin` from `from_origin` to `to_origin`.

    Only a record whose current origin equals `from_origin` is matched, which
    keeps the update idempotent. Nothing is committed here; the caller owns
    the commit.
    """
    matching_records = db_session.query(FileRecord).filter(
        FileRecord.file_id == file_id,
        FileRecord.file_origin == from_origin,
    )
    matching_records.update({FileRecord.file_origin: to_origin})
|
||||
|
||||
|
||||
def upsert_filerecord(
|
||||
file_id: str,
|
||||
display_name: str,
|
||||
|
||||
@@ -952,7 +952,6 @@ class Document(Base):
|
||||
semantic_id: Mapped[str] = mapped_column(NullFilteredString)
|
||||
# First Section's link
|
||||
link: Mapped[str | None] = mapped_column(NullFilteredString, nullable=True)
|
||||
file_id: Mapped[str | None] = mapped_column(String, nullable=True)
|
||||
|
||||
# The updated time is also used as a measure of the last successful state of the doc
|
||||
# pulled from the source (to help skip reindexing already updated docs in case of
|
||||
|
||||
@@ -2,17 +2,11 @@ import datetime
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import joinedload
|
||||
from sqlalchemy.orm import selectinload
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.db.models import ChatMessage
|
||||
from onyx.db.models import ChatSession
|
||||
from onyx.db.models import ChatSessionSharedStatus
|
||||
from onyx.db.models import FileRecord
|
||||
from onyx.db.models import Persona
|
||||
from onyx.db.models import Project__UserFile
|
||||
from onyx.db.models import UserFile
|
||||
@@ -114,73 +108,13 @@ def update_last_accessed_at_for_user_files(
|
||||
db_session.commit()
|
||||
|
||||
|
||||
def get_file_id_by_user_file_id(
|
||||
user_file_id: str, user_id: UUID, db_session: Session
|
||||
) -> str | None:
|
||||
user_file = (
|
||||
db_session.query(UserFile)
|
||||
.filter(UserFile.id == user_file_id, UserFile.user_id == user_id)
|
||||
.first()
|
||||
)
|
||||
def get_file_id_by_user_file_id(user_file_id: str, db_session: Session) -> str | None:
|
||||
user_file = db_session.query(UserFile).filter(UserFile.id == user_file_id).first()
|
||||
if user_file:
|
||||
return user_file.file_id
|
||||
return None
|
||||
|
||||
|
||||
def user_can_access_chat_file(file_id: str, user_id: UUID, db_session: Session) -> bool:
    """Decide whether `user_id` may read the raw `file_id` served by
    `GET /chat/file/{file_id}`.

    The checks run in order and the first one that matches grants access:

    1. `file_id` is the storage id of a `UserFile` owned by the user.
    2. `file_id` is a persona avatar (`Persona.uploaded_image_id`) whose file
       record has origin `CHAT_UPLOAD`; avatars are visible to any
       authenticated user.
    3. `file_id` appears in a `ChatMessage.files` descriptor of a chat session
       the user owns, or of a session publicly shared via
       `ChatSessionSharedStatus.PUBLIC`.
    """
    # 1. A user file that belongs to the caller.
    owned_user_file = (
        select(UserFile.id)
        .where(UserFile.file_id == file_id, UserFile.user_id == user_id)
        .exists()
    )
    if db_session.query(owned_user_file).scalar():
        return True

    # 2. A persona avatar.
    #
    # TODO: move persona avatars to a dedicated endpoint (e.g.
    # /assistants/{id}/avatar) so this branch can be removed. /chat/file is
    # currently overloaded with multiple asset classes (user files, chat
    # attachments, tool outputs, avatars), forcing this access-check fan-out.
    #
    # Restrict the avatar path to CHAT_UPLOAD-origin files so an attacker
    # cannot bind another user's USER_FILE (or any other origin) to their
    # own persona and read it through this check.
    persona_avatar = (
        select(Persona.id)
        .join(FileRecord, FileRecord.file_id == Persona.uploaded_image_id)
        .where(
            Persona.uploaded_image_id == file_id,
            FileRecord.file_origin == FileOrigin.CHAT_UPLOAD,
        )
        .exists()
    )
    if db_session.query(persona_avatar).scalar():
        return True

    # 3. A file referenced by a message in a session the user owns or that
    #    is publicly shared.
    session_is_visible = or_(
        ChatSession.user_id == user_id,
        ChatSession.shared_status == ChatSessionSharedStatus.PUBLIC,
    )
    chat_message_lookup = (
        select(ChatMessage.id)
        .join(ChatSession, ChatMessage.chat_session_id == ChatSession.id)
        .where(ChatMessage.files.op("@>")([{"id": file_id}]))
        .where(session_is_visible)
        .limit(1)
    )
    first_match = db_session.execute(chat_message_lookup).first()
    return first_match is not None
|
||||
|
||||
|
||||
def get_file_ids_by_user_file_ids(
|
||||
user_file_ids: list[UUID], db_session: Session
|
||||
) -> list[str]:
|
||||
|
||||
@@ -98,9 +98,6 @@ class DocumentMetadata:
|
||||
# The resolved database ID of the parent hierarchy node (folder/container)
|
||||
parent_hierarchy_node_id: int | None = None
|
||||
|
||||
# Opt-in pointer to the persisted raw file for this document (file_store id).
|
||||
file_id: str | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class VespaDocumentFields:
|
||||
|
||||
@@ -368,40 +368,6 @@ def extract_docx_images(docx_bytes: IO[Any]) -> Iterator[tuple[bytes, str]]:
|
||||
logger.exception("Failed to extract all docx images")
|
||||
|
||||
|
||||
def count_docx_embedded_images(file: IO[Any], cap: int) -> int:
    """Return the number of embedded images in a docx, short-circuiting at cap+1.

    Mirrors count_pdf_embedded_images so upload validation can apply the same
    per-file/per-batch caps. An image is any archive member stored under
    ``word/media/``; explicit directory entries (names ending in ``/``) are
    not files and are therefore not counted.

    Args:
        file: Seekable binary stream holding the docx (a zip archive).
        cap: Maximum count the caller cares about. Counting stops as soon as
            the count exceeds it, so the result is at most ``cap + 1``.

    Returns:
        The number of media files found (capped at ``cap + 1``), or 0 if the
        archive cannot be read. The failure is logged, not raised.

    The file pointer is restored to its original position before returning,
    provided the stream supports ``tell()``.
    """
    try:
        start_pos = file.tell()
    except Exception:
        # No usable position: leave the stream alone on entry and on exit.
        start_pos = None
    try:
        if start_pos is not None:
            file.seek(0)
        count = 0
        with zipfile.ZipFile(file) as z:
            for name in z.namelist():
                # Skip non-media members and the "word/media/" directory
                # entry itself, which some zip writers emit explicitly.
                if not name.startswith("word/media/") or name.endswith("/"):
                    continue
                count += 1
                if count > cap:
                    # Over the cap already; no need to scan the rest.
                    return count
        return count
    except Exception:
        # Best-effort: a corrupt or non-zip upload counts as zero images.
        logger.warning("Failed to count embedded images in docx", exc_info=True)
        return 0
    finally:
        if start_pos is not None:
            try:
                file.seek(start_pos)
            except Exception:
                # Restoring the position is best-effort as well.
                pass
|
||||
|
||||
|
||||
def read_docx_file(
|
||||
file: IO[Any],
|
||||
file_name: str = "",
|
||||
|
||||
@@ -1,92 +0,0 @@
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
from typing import IO
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.db.file_record import update_filerecord_origin
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
# (content, content_type) -> file_id
|
||||
RawFileCallback = Callable[[IO[bytes], str], str]
|
||||
|
||||
|
||||
def stage_raw_file(
    content: IO,
    content_type: str,
    *,
    metadata: dict[str, Any],
) -> str:
    """Save raw bytes in the file store under `FileOrigin.INDEXING_STAGING`.

    `metadata` is stored on the file_record so that later steps — promotion
    in docprocessing and orphan reaping by the TTL janitor — can find the
    file through the context it came from.

    Returns the file id reported by the file store.
    """
    return get_default_file_store().save_file(
        content=content,
        display_name=None,
        file_origin=FileOrigin.INDEXING_STAGING,
        file_type=content_type,
        file_metadata=metadata,
    )
|
||||
|
||||
|
||||
def build_raw_file_callback(
    *,
    index_attempt_id: int,
    cc_pair_id: int,
    tenant_id: str,
) -> RawFileCallback:
    """Create the per-attempt callback connectors call to persist raw files.

    The attempt-level context (index attempt, cc pair, tenant) is bound into
    the returned closure as staging metadata. A connector therefore only
    supplies the per-call pieces (bytes and content type) and receives a
    file_id it can attach to its Document.
    """
    attempt_context: dict[str, Any] = dict(
        index_attempt_id=index_attempt_id,
        cc_pair_id=cc_pair_id,
        tenant_id=tenant_id,
    )

    def _persist(content: IO[bytes], content_type: str) -> str:
        # Each invocation passes the same attempt-level metadata dict.
        return stage_raw_file(content, content_type, metadata=attempt_context)

    return _persist
|
||||
|
||||
|
||||
def delete_files_best_effort(file_ids: list[str]) -> None:
    """Remove each listed file from the file store without ever raising.

    A failure on one file is logged and the loop moves on to the next one.
    An empty list is a no-op and does not touch the file store.
    """
    if file_ids:
        store = get_default_file_store()
        for file_id in file_ids:
            try:
                # A file that is already gone is not treated as an error.
                store.delete_file(file_id, error_on_missing=False)
            except Exception:
                logger.exception(
                    f"Failed to delete file_id={file_id} during document cleanup"
                )
|
||||
|
||||
|
||||
def promote_staged_file(db_session: Session, file_id: str) -> None:
    """Flip a staged file's origin from `INDEXING_STAGING` to `CONNECTOR`.

    Delegates to `update_filerecord_origin`, which only matches a record
    still in the staging origin.
    """
    update_filerecord_origin(
        db_session=db_session,
        file_id=file_id,
        from_origin=FileOrigin.INDEXING_STAGING,
        to_origin=FileOrigin.CONNECTOR,
    )
|
||||
@@ -30,7 +30,6 @@ from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import IndexAttemptMetadata
|
||||
from onyx.connectors.models import IndexingDocument
|
||||
from onyx.connectors.models import Section
|
||||
from onyx.connectors.models import SectionType
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.db.document import get_documents_by_ids
|
||||
from onyx.db.document import upsert_document_by_connector_credential_pair
|
||||
@@ -50,7 +49,6 @@ from onyx.document_index.interfaces import DocumentMetadata
|
||||
from onyx.document_index.interfaces import IndexBatchParams
|
||||
from onyx.file_processing.image_summarization import summarize_image_with_error_handling
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.file_store.staging import promote_staged_file
|
||||
from onyx.hooks.executor import execute_hook
|
||||
from onyx.hooks.executor import HookSkipped
|
||||
from onyx.hooks.executor import HookSoftFailed
|
||||
@@ -156,7 +154,6 @@ def _upsert_documents_in_db(
|
||||
doc_metadata=doc.doc_metadata,
|
||||
# parent_hierarchy_node_id is resolved in docfetching using Redis cache
|
||||
parent_hierarchy_node_id=doc.parent_hierarchy_node_id,
|
||||
file_id=doc.file_id,
|
||||
)
|
||||
document_metadata_list.append(db_doc_metadata)
|
||||
|
||||
@@ -367,45 +364,6 @@ def index_doc_batch_with_handler(
|
||||
return index_pipeline_result
|
||||
|
||||
|
||||
def _promote_new_staged_files(
    documents: list[Document],
    previous_file_ids: dict[str, str],
    db_session: Session,
) -> None:
    """Promote every file_id in the batch that differs from the stored one.

    For each document whose `file_id` is set and is not equal to the entry
    recorded for that document id in `previous_file_ids`, call
    `promote_staged_file`. This function does not commit.

    Args:
        documents: Documents of the current batch.
        previous_file_ids: Mapping of document id to the file_id known
            before this batch.
        db_session: Session handed to `promote_staged_file`.
    """
    for document in documents:
        candidate = document.file_id
        # Skip documents without a file, and documents whose file is unchanged.
        if candidate is not None and candidate != previous_file_ids.get(document.id):
            promote_staged_file(db_session=db_session, file_id=candidate)
|
||||
|
||||
|
||||
def _delete_replaced_files(
    documents: list[Document],
    previous_file_ids: dict[str, str],
) -> None:
    """Delete, best-effort, the old files that this batch has replaced.

    A document's previous file is deleted only when one was recorded in
    `previous_file_ids` and it differs from the document's current
    `file_id`. Deletion failures are logged and never raised.

    Per the original contract, this must run only after the new
    `Document.file_id` has been committed.

    Args:
        documents: Documents of the current batch.
        previous_file_ids: Mapping of document id to the file_id known
            before this batch.
    """
    store = get_default_file_store()
    for document in documents:
        old_file_id = previous_file_ids.get(document.id)
        is_replaced = old_file_id is not None and old_file_id != document.file_id
        if not is_replaced:
            continue
        try:
            store.delete_file(old_file_id, error_on_missing=False)
        except Exception:
            logger.exception(f"Failed to delete replaced file_id={old_file_id}.")
|
||||
|
||||
|
||||
def index_doc_batch_prepare(
|
||||
documents: list[Document],
|
||||
index_attempt_metadata: IndexAttemptMetadata,
|
||||
@@ -424,11 +382,6 @@ def index_doc_batch_prepare(
|
||||
document_ids=document_ids,
|
||||
)
|
||||
|
||||
# Capture previous file_ids BEFORE any writes so we know what to reap.
|
||||
previous_file_ids: dict[str, str] = {
|
||||
db_doc.id: db_doc.file_id for db_doc in db_docs if db_doc.file_id is not None
|
||||
}
|
||||
|
||||
updatable_docs = (
|
||||
get_doc_ids_to_update(documents=documents, db_docs=db_docs)
|
||||
if not ignore_time_skip
|
||||
@@ -446,24 +399,11 @@ def index_doc_batch_prepare(
|
||||
# for all updatable docs, upsert into the DB
|
||||
# Does not include doc_updated_at which is also used to indicate a successful update
|
||||
if updatable_docs:
|
||||
# Queue the STAGING → CONNECTOR origin flips BEFORE the Document upsert
|
||||
# so `upsert_documents`' commit flushes Document.file_id and the origin
|
||||
# flip atomically
|
||||
_promote_new_staged_files(
|
||||
documents=updatable_docs,
|
||||
previous_file_ids=previous_file_ids,
|
||||
db_session=db_session,
|
||||
)
|
||||
_upsert_documents_in_db(
|
||||
documents=updatable_docs,
|
||||
index_attempt_metadata=index_attempt_metadata,
|
||||
db_session=db_session,
|
||||
)
|
||||
# Blob deletes run only after Document.file_id is durable.
|
||||
_delete_replaced_files(
|
||||
documents=updatable_docs,
|
||||
previous_file_ids=previous_file_ids,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Upserted {len(updatable_docs)} changed docs out of {len(documents)} total docs into the DB"
|
||||
@@ -590,15 +530,8 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
|
||||
Returns:
|
||||
List of IndexingDocument objects with processed_sections as list[Section]
|
||||
"""
|
||||
# Check if image extraction and analysis is enabled before trying to get a vision LLM.
|
||||
# Use section.type rather than isinstance because sections can round-trip
|
||||
# through pydantic as base Section instances (not the concrete subclass).
|
||||
has_image_section = any(
|
||||
section.type == SectionType.IMAGE
|
||||
for document in documents
|
||||
for section in document.sections
|
||||
)
|
||||
if not get_image_extraction_and_analysis_enabled() or not has_image_section:
|
||||
# Check if image extraction and analysis is enabled before trying to get a vision LLM
|
||||
if not get_image_extraction_and_analysis_enabled():
|
||||
llm = None
|
||||
else:
|
||||
# Only get the vision LLM if image processing is enabled
|
||||
|
||||
@@ -743,7 +743,13 @@ def model_is_reasoning_model(model_name: str, model_provider: str) -> bool:
|
||||
model_name,
|
||||
)
|
||||
if model_obj and "supports_reasoning" in model_obj:
|
||||
return model_obj["supports_reasoning"]
|
||||
reasoning = model_obj["supports_reasoning"]
|
||||
if reasoning is None:
|
||||
logger.error(
|
||||
f"Cannot find reasoning for name={model_name} and provider={model_provider}"
|
||||
)
|
||||
reasoning = False
|
||||
return reasoning
|
||||
|
||||
# Fallback: try using litellm.supports_reasoning() for newer models
|
||||
try:
|
||||
|
||||
@@ -166,7 +166,6 @@ from shared_configs.configs import CORS_ALLOWED_ORIGIN
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
|
||||
from shared_configs.configs import SENTRY_DSN
|
||||
from shared_configs.configs import SENTRY_TRACES_SAMPLE_RATE
|
||||
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
|
||||
|
||||
warnings.filterwarnings(
|
||||
@@ -440,7 +439,7 @@ def get_application(lifespan_override: Lifespan | None = None) -> FastAPI:
|
||||
sentry_sdk.init(
|
||||
dsn=SENTRY_DSN,
|
||||
integrations=[StarletteIntegration(), FastApiIntegration()],
|
||||
traces_sample_rate=SENTRY_TRACES_SAMPLE_RATE,
|
||||
traces_sample_rate=0.1,
|
||||
release=__version__,
|
||||
before_send=_add_instance_tags,
|
||||
)
|
||||
|
||||
@@ -15,9 +15,6 @@ from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
|
||||
from onyx.configs.constants import CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT
|
||||
from onyx.configs.constants import OnyxRedisConstants
|
||||
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
|
||||
from onyx.server.metrics.perm_sync_metrics import (
|
||||
observe_doc_perm_sync_db_update_duration,
|
||||
)
|
||||
from onyx.utils.variable_functionality import fetch_versioned_implementation
|
||||
|
||||
|
||||
@@ -192,7 +189,7 @@ class RedisConnectorPermissionSync:
|
||||
|
||||
num_permissions = 0
|
||||
num_errors = 0
|
||||
cumulative_db_update_time = 0.0
|
||||
# Create a task for each permission sync
|
||||
for permissions in new_permissions:
|
||||
current_time = time.monotonic()
|
||||
if lock and current_time - last_lock_time >= (
|
||||
@@ -231,9 +228,7 @@ class RedisConnectorPermissionSync:
|
||||
|
||||
# This can internally exception due to db issues but still continue
|
||||
# Catch exceptions per-element to avoid breaking the entire sync
|
||||
db_start = time.monotonic()
|
||||
try:
|
||||
|
||||
element_update_permissions_fn(
|
||||
self.tenant_id,
|
||||
permissions,
|
||||
@@ -241,6 +236,7 @@ class RedisConnectorPermissionSync:
|
||||
connector_id,
|
||||
credential_id,
|
||||
)
|
||||
|
||||
num_permissions += 1
|
||||
except Exception:
|
||||
num_errors += 1
|
||||
@@ -253,12 +249,8 @@ class RedisConnectorPermissionSync:
|
||||
task_logger.exception(
|
||||
f"Failed to update permissions for element {element_id}"
|
||||
)
|
||||
finally:
|
||||
cumulative_db_update_time += time.monotonic() - db_start
|
||||
# Continue processing other elements
|
||||
|
||||
observe_doc_perm_sync_db_update_duration(
|
||||
cumulative_db_update_time, source_string
|
||||
)
|
||||
return PermissionSyncResult(num_updated=num_permissions, num_errors=num_errors)
|
||||
|
||||
def reset(self) -> None:
|
||||
|
||||
@@ -11,9 +11,7 @@ from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.app_configs import MAX_EMBEDDED_IMAGES_PER_FILE
|
||||
from onyx.configs.app_configs import MAX_EMBEDDED_IMAGES_PER_UPLOAD
|
||||
from onyx.configs.llm_configs import get_image_extraction_and_analysis_enabled
|
||||
from onyx.db.llm import fetch_default_llm_model
|
||||
from onyx.file_processing.extract_file_text import count_docx_embedded_images
|
||||
from onyx.file_processing.extract_file_text import count_pdf_embedded_images
|
||||
from onyx.file_processing.extract_file_text import extract_file_text
|
||||
from onyx.file_processing.extract_file_text import get_file_ext
|
||||
@@ -200,9 +198,6 @@ def categorize_uploaded_files(
|
||||
# rejected even if they'd individually fit under MAX_EMBEDDED_IMAGES_PER_FILE.
|
||||
batch_image_total = 0
|
||||
|
||||
# Hoisted out of the loop to avoid a KV-store lookup per file.
|
||||
image_extraction_enabled = get_image_extraction_and_analysis_enabled()
|
||||
|
||||
for upload in files:
|
||||
try:
|
||||
filename = get_safe_filename(upload)
|
||||
@@ -265,33 +260,28 @@ def categorize_uploaded_files(
|
||||
)
|
||||
continue
|
||||
|
||||
# Reject documents with an unreasonable number of embedded
|
||||
# images (either per-file or accumulated across this upload
|
||||
# batch). A file with thousands of embedded images can OOM the
|
||||
# Reject PDFs with an unreasonable number of embedded images
|
||||
# (either per-file or accumulated across this upload batch).
|
||||
# A PDF with thousands of embedded images can OOM the
|
||||
# user-file-processing celery worker because every image is
|
||||
# decoded with PIL and then sent to the vision LLM.
|
||||
count: int = 0
|
||||
image_bearing_ext = extension in (".pdf", ".docx")
|
||||
if image_bearing_ext:
|
||||
if extension == ".pdf":
|
||||
file_cap = MAX_EMBEDDED_IMAGES_PER_FILE
|
||||
batch_cap = MAX_EMBEDDED_IMAGES_PER_UPLOAD
|
||||
# Use the larger of the two caps as the short-circuit
|
||||
# threshold so we get a useful count for both checks.
|
||||
# These helpers restore the stream position.
|
||||
counter = (
|
||||
count_pdf_embedded_images
|
||||
if extension == ".pdf"
|
||||
else count_docx_embedded_images
|
||||
# count_pdf_embedded_images restores the stream position.
|
||||
count = count_pdf_embedded_images(
|
||||
upload.file, max(file_cap, batch_cap)
|
||||
)
|
||||
count = counter(upload.file, max(file_cap, batch_cap))
|
||||
if count > file_cap:
|
||||
results.rejected.append(
|
||||
RejectedFile(
|
||||
filename=filename,
|
||||
reason=(
|
||||
f"Document contains too many embedded "
|
||||
f"images (more than {file_cap}). Try "
|
||||
f"splitting it into smaller files."
|
||||
f"PDF contains too many embedded images "
|
||||
f"(more than {file_cap}). Try splitting "
|
||||
f"the document into smaller files."
|
||||
),
|
||||
)
|
||||
)
|
||||
@@ -318,21 +308,6 @@ def categorize_uploaded_files(
|
||||
extension=extension,
|
||||
)
|
||||
if not text_content:
|
||||
# Documents with embedded images (e.g. scans) have no
|
||||
# extractable text but can still be indexed via the
|
||||
# vision-LLM captioning path when image analysis is
|
||||
# enabled.
|
||||
if image_bearing_ext and count > 0 and image_extraction_enabled:
|
||||
results.acceptable.append(upload)
|
||||
results.acceptable_file_to_token_count[filename] = 0
|
||||
try:
|
||||
upload.file.seek(0)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to reset file pointer for '{filename}': {str(e)}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.warning(f"No text content extracted from '{filename}'")
|
||||
results.rejected.append(
|
||||
RejectedFile(
|
||||
|
||||
@@ -1,186 +0,0 @@
|
||||
"""Permission-sync-specific Prometheus metrics.
|
||||
|
||||
Tracks doc permission sync and external group sync phases:
|
||||
|
||||
Doc permission sync (connector_permission_sync_generator_task):
|
||||
1. Overall sync duration (source enumeration + DB updates)
|
||||
2. Cumulative per-element DB update duration within update_db
|
||||
3. Documents successfully synced
|
||||
4. Documents with permission errors
|
||||
|
||||
External group sync (_perform_external_group_sync):
|
||||
1. Overall sync duration
|
||||
2. Cumulative batch upsert duration
|
||||
3. Groups processed
|
||||
4. Unique users discovered
|
||||
5. Errors encountered
|
||||
|
||||
All metrics are labeled by connector_type to identify which connector sources
|
||||
are the most expensive to sync. cc_pair_id is intentionally excluded to avoid
|
||||
unbounded cardinality.
|
||||
|
||||
Usage:
|
||||
from onyx.server.metrics.perm_sync_metrics import (
|
||||
observe_doc_perm_sync_duration,
|
||||
observe_doc_perm_sync_db_update_duration,
|
||||
inc_doc_perm_sync_docs_processed,
|
||||
inc_doc_perm_sync_errors,
|
||||
observe_group_sync_duration,
|
||||
observe_group_sync_upsert_duration,
|
||||
inc_group_sync_groups_processed,
|
||||
inc_group_sync_users_processed,
|
||||
inc_group_sync_errors,
|
||||
)
|
||||
"""
|
||||
|
||||
from prometheus_client import Counter
|
||||
from prometheus_client import Histogram
|
||||
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()

# --- Doc permission sync metrics ---
# Every metric carries a single "connector_type" label.

# Whole-sync duration; buckets range from 5 seconds to 6 hours.
DOC_PERM_SYNC_DURATION = Histogram(
    "onyx_doc_perm_sync_duration_seconds",
    "Overall duration of doc permission sync (source enumeration + DB updates)",
    labelnames=["connector_type"],
    buckets=[5, 60, 600, 1800, 3600, 10800, 21600],
)

# Summed DB-update time of one sync; buckets range from 0.1 seconds to 10 minutes.
DOC_PERM_SYNC_DB_UPDATE_DURATION = Histogram(
    "onyx_doc_perm_sync_db_update_duration_seconds",
    "Cumulative per-element DB update duration within a single doc permission sync",
    labelnames=["connector_type"],
    buckets=[0.1, 0.5, 1, 5, 15, 30, 60, 300, 600],
)

DOC_PERM_SYNC_DOCS_PROCESSED = Counter(
    "onyx_doc_perm_sync_docs_processed_total",
    "Total documents successfully synced during doc permission sync",
    labelnames=["connector_type"],
)

DOC_PERM_SYNC_ERRORS = Counter(
    "onyx_doc_perm_sync_errors_total",
    "Total document permission errors during doc permission sync",
    labelnames=["connector_type"],
)

# --- External group sync metrics ---

# Same bucket layout as the doc permission sync duration histogram.
GROUP_SYNC_DURATION = Histogram(
    "onyx_group_sync_duration_seconds",
    "Overall duration of external group sync",
    labelnames=["connector_type"],
    buckets=[5, 60, 600, 1800, 3600, 10800, 21600],
)

# Same bucket layout as the doc permission sync DB-update histogram.
GROUP_SYNC_UPSERT_DURATION = Histogram(
    "onyx_group_sync_upsert_duration_seconds",
    "Cumulative batch upsert duration within a single external group sync",
    labelnames=["connector_type"],
    buckets=[0.1, 0.5, 1, 5, 15, 30, 60, 300, 600],
)

GROUP_SYNC_GROUPS_PROCESSED = Counter(
    "onyx_group_sync_groups_processed_total",
    "Total groups processed during external group sync",
    labelnames=["connector_type"],
)

GROUP_SYNC_USERS_PROCESSED = Counter(
    "onyx_group_sync_users_processed_total",
    "Total unique users discovered during external group sync",
    labelnames=["connector_type"],
)

GROUP_SYNC_ERRORS = Counter(
    "onyx_group_sync_errors_total",
    "Total errors during external group sync",
    labelnames=["connector_type"],
)
|
||||
|
||||
|
||||
# --- Doc permission sync helpers ---
|
||||
|
||||
|
||||
def observe_doc_perm_sync_duration(
    duration_seconds: float, connector_type: str
) -> None:
    """Observe `duration_seconds` on DOC_PERM_SYNC_DURATION for a connector type.

    Any exception raised while recording is caught and logged at debug
    level, so this call does not propagate metric failures.
    """
    try:
        series = DOC_PERM_SYNC_DURATION.labels(connector_type=connector_type)
        series.observe(duration_seconds)
    except Exception:
        logger.debug("Failed to record doc perm sync duration", exc_info=True)
|
||||
|
||||
|
||||
def observe_doc_perm_sync_db_update_duration(
    duration_seconds: float, connector_type: str
) -> None:
    """Observe `duration_seconds` on DOC_PERM_SYNC_DB_UPDATE_DURATION.

    The observation is labeled with `connector_type`. Any exception raised
    while recording is caught and logged at debug level.
    """
    try:
        series = DOC_PERM_SYNC_DB_UPDATE_DURATION.labels(connector_type=connector_type)
        series.observe(duration_seconds)
    except Exception:
        logger.debug("Failed to record doc perm sync db update duration", exc_info=True)
|
||||
|
||||
|
||||
def inc_doc_perm_sync_docs_processed(connector_type: str, amount: int = 1) -> None:
    """Add `amount` to DOC_PERM_SYNC_DOCS_PROCESSED for a connector type.

    Any exception raised while recording is caught and logged at debug level.
    """
    try:
        series = DOC_PERM_SYNC_DOCS_PROCESSED.labels(connector_type=connector_type)
        series.inc(amount)
    except Exception:
        logger.debug("Failed to record doc perm sync docs processed", exc_info=True)
|
||||
|
||||
|
||||
def inc_doc_perm_sync_errors(connector_type: str, amount: int = 1) -> None:
    """Add `amount` to DOC_PERM_SYNC_ERRORS for a connector type.

    Any exception raised while recording is caught and logged at debug level.
    """
    try:
        series = DOC_PERM_SYNC_ERRORS.labels(connector_type=connector_type)
        series.inc(amount)
    except Exception:
        logger.debug("Failed to record doc perm sync errors", exc_info=True)
|
||||
|
||||
|
||||
# --- External group sync helpers ---
|
||||
|
||||
|
||||
def observe_group_sync_duration(duration_seconds: float, connector_type: str) -> None:
    """Observe `duration_seconds` on GROUP_SYNC_DURATION for a connector type.

    Any exception raised while recording is caught and logged at debug level.
    """
    try:
        series = GROUP_SYNC_DURATION.labels(connector_type=connector_type)
        series.observe(duration_seconds)
    except Exception:
        logger.debug("Failed to record group sync duration", exc_info=True)
|
||||
|
||||
|
||||
def observe_group_sync_upsert_duration(
    duration_seconds: float, connector_type: str
) -> None:
    """Observe `duration_seconds` on GROUP_SYNC_UPSERT_DURATION.

    The observation is labeled with `connector_type`. Any exception raised
    while recording is caught and logged at debug level.
    """
    try:
        series = GROUP_SYNC_UPSERT_DURATION.labels(connector_type=connector_type)
        series.observe(duration_seconds)
    except Exception:
        logger.debug("Failed to record group sync upsert duration", exc_info=True)
|
||||
|
||||
|
||||
def inc_group_sync_groups_processed(connector_type: str, amount: int = 1) -> None:
    """Add `amount` to GROUP_SYNC_GROUPS_PROCESSED for a connector type.

    Any exception raised while recording is caught and logged at debug level.
    """
    try:
        series = GROUP_SYNC_GROUPS_PROCESSED.labels(connector_type=connector_type)
        series.inc(amount)
    except Exception:
        logger.debug("Failed to record group sync groups processed", exc_info=True)
|
||||
|
||||
|
||||
def inc_group_sync_users_processed(connector_type: str, amount: int = 1) -> None:
    """Add `amount` to GROUP_SYNC_USERS_PROCESSED for a connector type.

    Any exception raised while recording is caught and logged at debug level.
    """
    try:
        series = GROUP_SYNC_USERS_PROCESSED.labels(connector_type=connector_type)
        series.inc(amount)
    except Exception:
        logger.debug("Failed to record group sync users processed", exc_info=True)
|
||||
|
||||
|
||||
def inc_group_sync_errors(connector_type: str, amount: int = 1) -> None:
    """Add `amount` to GROUP_SYNC_ERRORS for a connector type.

    Any exception raised while recording is caught and logged at debug level.
    """
    try:
        series = GROUP_SYNC_ERRORS.labels(connector_type=connector_type)
        series.inc(amount)
    except Exception:
        logger.debug("Failed to record group sync errors", exc_info=True)
|
||||
@@ -13,7 +13,7 @@ from onyx.configs.constants import PUBLIC_API_TAGS
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import IndexAttemptMetadata
|
||||
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
|
||||
from onyx.db.document import delete_documents_complete
|
||||
from onyx.db.document import delete_documents_complete__no_commit
|
||||
from onyx.db.document import get_document
|
||||
from onyx.db.document import get_documents_by_cc_pair
|
||||
from onyx.db.document import get_ingestion_documents
|
||||
@@ -210,4 +210,5 @@ def delete_ingestion_doc(
|
||||
)
|
||||
|
||||
# Delete from database
|
||||
delete_documents_complete(db_session, [document_id])
|
||||
delete_documents_complete__no_commit(db_session, [document_id])
|
||||
db_session.commit()
|
||||
|
||||
@@ -63,7 +63,6 @@ from onyx.db.persona import get_persona_by_id
|
||||
from onyx.db.usage import increment_usage
|
||||
from onyx.db.usage import UsageType
|
||||
from onyx.db.user_file import get_file_id_by_user_file_id
|
||||
from onyx.db.user_file import user_can_access_chat_file
|
||||
from onyx.error_handling.error_codes import OnyxErrorCode
|
||||
from onyx.error_handling.exceptions import OnyxError
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
@@ -867,18 +866,14 @@ def seed_chat_from_slack(
|
||||
def fetch_chat_file(
|
||||
file_id: str,
|
||||
request: Request,
|
||||
user: User = Depends(require_permission(Permission.BASIC_ACCESS)),
|
||||
_: User = Depends(require_permission(Permission.BASIC_ACCESS)),
|
||||
db_session: Session = Depends(get_session),
|
||||
) -> Response:
|
||||
|
||||
# For user files, we need to get the file id from the user file id
|
||||
file_id_from_user_file = get_file_id_by_user_file_id(file_id, user.id, db_session)
|
||||
file_id_from_user_file = get_file_id_by_user_file_id(file_id, db_session)
|
||||
if file_id_from_user_file:
|
||||
file_id = file_id_from_user_file
|
||||
elif not user_can_access_chat_file(file_id, user.id, db_session):
|
||||
# Return 404 (rather than 403) so callers cannot probe for file
|
||||
# existence across ownership boundaries.
|
||||
raise OnyxError(OnyxErrorCode.NOT_FOUND, "File not found")
|
||||
|
||||
file_store = get_default_file_store()
|
||||
file_record = file_store.read_file_record(file_id)
|
||||
|
||||
@@ -80,7 +80,7 @@ class Settings(BaseModel):
|
||||
query_history_type: QueryHistoryType | None = None
|
||||
|
||||
# Image processing settings
|
||||
image_extraction_and_analysis_enabled: bool | None = True
|
||||
image_extraction_and_analysis_enabled: bool | None = False
|
||||
search_time_image_analysis_enabled: bool | None = False
|
||||
image_analysis_max_size_mb: int | None = 20
|
||||
|
||||
|
||||
@@ -99,14 +99,6 @@ STRICT_CHUNK_TOKEN_LIMIT = (
|
||||
# Set up Sentry integration (for error logging)
|
||||
SENTRY_DSN = os.environ.get("SENTRY_DSN")
|
||||
|
||||
# Celery task spans dominate ingestion volume (~94%), so default celery
|
||||
# tracing to 0. Web/API traces stay at a small non-zero rate so http.server
|
||||
# traces remain available. Both are env-tunable without a code change.
|
||||
SENTRY_TRACES_SAMPLE_RATE = float(os.environ.get("SENTRY_TRACES_SAMPLE_RATE", "0.01"))
|
||||
SENTRY_CELERY_TRACES_SAMPLE_RATE = float(
|
||||
os.environ.get("SENTRY_CELERY_TRACES_SAMPLE_RATE", "0.0")
|
||||
)
|
||||
|
||||
|
||||
# Fields which should only be set on new search setting
|
||||
PRESERVED_SEARCH_FIELDS = [
|
||||
|
||||
@@ -210,7 +210,7 @@ def test_jira_doc_sync_with_specific_permissions(
|
||||
assert len(docs) > 0, "Expected at least one document from SUP project"
|
||||
|
||||
_EXPECTED_USER_EMAILS = set(
|
||||
["yuhong@onyx.app", "chris@onyx.app", "founders@onyx.app", "oauth@onyx.app"]
|
||||
["yuhong@onyx.app", "chris@onyx.app", "founders@onyx.app"]
|
||||
)
|
||||
_EXPECTED_USER_GROUP_IDS = set(["jira-users-danswerai"])
|
||||
|
||||
|
||||
@@ -46,7 +46,6 @@ _EXPECTED_JIRA_GROUPS = [
|
||||
"chris@onyx.app",
|
||||
"founders@onyx.app",
|
||||
"hagen@danswer.ai",
|
||||
"oauth@onyx.app",
|
||||
"pablo@onyx.app",
|
||||
"yuhong@onyx.app",
|
||||
},
|
||||
@@ -57,11 +56,6 @@ _EXPECTED_JIRA_GROUPS = [
|
||||
user_emails={"founders@onyx.app", "hagen@danswer.ai", "pablo@onyx.app"},
|
||||
gives_anyone_access=False,
|
||||
),
|
||||
ExternalUserGroupSet(
|
||||
id="jira-servicemanagement-users-danswerai",
|
||||
user_emails={"oauth@onyx.app"},
|
||||
gives_anyone_access=False,
|
||||
),
|
||||
ExternalUserGroupSet(
|
||||
id="jira-user-access-admins-danswerai",
|
||||
user_emails={"hagen@danswer.ai"},
|
||||
@@ -73,7 +67,6 @@ _EXPECTED_JIRA_GROUPS = [
|
||||
"chris@onyx.app",
|
||||
"founders@onyx.app",
|
||||
"hagen@danswer.ai",
|
||||
"oauth@onyx.app",
|
||||
"pablo@onyx.app",
|
||||
},
|
||||
gives_anyone_access=False,
|
||||
@@ -83,19 +76,18 @@ _EXPECTED_JIRA_GROUPS = [
|
||||
user_emails={
|
||||
"chris@onyx.app",
|
||||
"founders@onyx.app",
|
||||
"oauth@onyx.app",
|
||||
"yuhong@onyx.app",
|
||||
},
|
||||
gives_anyone_access=False,
|
||||
),
|
||||
ExternalUserGroupSet(
|
||||
id="bitbucket-admins-onyxai",
|
||||
user_emails={"founders@onyx.app", "oauth@onyx.app"},
|
||||
user_emails={"founders@onyx.app"}, # no Oauth, we skip "app" account in jira
|
||||
gives_anyone_access=False,
|
||||
),
|
||||
ExternalUserGroupSet(
|
||||
id="bitbucket-users-onyxai",
|
||||
user_emails={"founders@onyx.app", "oauth@onyx.app"},
|
||||
user_emails={"founders@onyx.app"}, # no Oauth, we skip "app" account in jira
|
||||
gives_anyone_access=False,
|
||||
),
|
||||
]
|
||||
|
||||
@@ -1,130 +0,0 @@
|
||||
"""External dependency tests for onyx.file_store.staging.
|
||||
|
||||
Exercises the raw-file persistence hook used by the docfetching pipeline
|
||||
against a real file store (Postgres + MinIO/S3), since mocking the store
|
||||
would defeat the point of verifying that metadata round-trips through
|
||||
FileRecord.
|
||||
"""
|
||||
|
||||
from collections.abc import Generator
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.interfaces import BaseConnector
|
||||
from onyx.db.file_record import delete_filerecord_by_file_id
|
||||
from onyx.db.file_record import get_filerecord_by_file_id
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.file_store.staging import build_raw_file_callback
|
||||
from onyx.file_store.staging import stage_raw_file
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
def cleanup_file_ids(
    db_session: Session,
) -> Generator[list[str], None, None]:
    """Yield a list that tests fill with file ids to remove at teardown.

    After the test, each collected id is deleted through the default file
    store. If that delete raises, the file record row is removed directly
    and the session is committed instead.
    """
    tracked_ids: list[str] = []
    yield tracked_ids

    store = get_default_file_store()
    for tracked_id in tracked_ids:
        try:
            store.delete_file(tracked_id)
        except Exception:
            # Fall back to dropping just the DB record for this id.
            delete_filerecord_by_file_id(file_id=tracked_id, db_session=db_session)
            db_session.commit()
|
||||
|
||||
|
||||
def test_stage_raw_file_persists_with_origin_and_metadata(
    db_session: Session,
    tenant_context: None,  # noqa: ARG001
    initialize_file_store: None,  # noqa: ARG001
    cleanup_file_ids: list[str],
) -> None:
    """A file staged via stage_raw_file yields a FileRecord whose origin is
    INDEXING_STAGING and whose type and metadata equal what was passed in."""
    expected_type = "application/pdf"
    expected_metadata: dict[str, Any] = {
        "index_attempt_id": 42,
        "cc_pair_id": 7,
        "tenant_id": "tenant-abc",
        "extra": "payload",
    }

    staged_id = stage_raw_file(
        content=BytesIO(b"hello raw file"),
        content_type=expected_type,
        metadata=expected_metadata,
    )
    # Register for teardown before anything else can fail.
    cleanup_file_ids.append(staged_id)
    db_session.commit()

    stored = get_filerecord_by_file_id(file_id=staged_id, db_session=db_session)
    assert stored.file_origin == FileOrigin.INDEXING_STAGING
    assert stored.file_type == expected_type
    assert stored.file_metadata == expected_metadata
|
||||
|
||||
|
||||
def test_build_raw_file_callback_binds_attempt_context_per_call(
    db_session: Session,
    tenant_context: None,  # noqa: ARG001
    initialize_file_store: None,  # noqa: ARG001
    cleanup_file_ids: list[str],
) -> None:
    """Two calls to one built callback produce distinct FileRecords that
    both carry the same attempt-level metadata and their own content type."""
    callback = build_raw_file_callback(
        index_attempt_id=1001,
        cc_pair_id=202,
        tenant_id="tenant-xyz",
    )

    content_types = ("text/plain", "application/octet-stream")
    file_id_a = callback(BytesIO(b"alpha"), content_types[0])
    file_id_b = callback(BytesIO(b"beta"), content_types[1])
    cleanup_file_ids.extend([file_id_a, file_id_b])
    db_session.commit()

    assert file_id_a != file_id_b

    for staged_id, expected_content_type in zip((file_id_a, file_id_b), content_types):
        stored = get_filerecord_by_file_id(file_id=staged_id, db_session=db_session)
        assert stored.file_origin == FileOrigin.INDEXING_STAGING
        assert stored.file_type == expected_content_type
        assert stored.file_metadata == {
            "index_attempt_id": 1001,
            "cc_pair_id": 202,
            "tenant_id": "tenant-xyz",
        }
|
||||
|
||||
|
||||
def test_set_raw_file_callback_on_base_connector() -> None:
    """A fresh connector has no raw_file_callback; after
    set_raw_file_callback the attribute is the installed callable and
    invoking it returns that callable's result."""

    class _MinimalConnector(BaseConnector):
        def load_credentials(
            self,
            credentials: dict[str, Any],  # noqa: ARG002
        ) -> dict[str, Any] | None:
            return None

    expected_file_id = f"sentinel-{uuid4().hex[:8]}"

    def _fake_callback(_content: Any, _content_type: str) -> str:
        return expected_file_id

    connector = _MinimalConnector()
    assert connector.raw_file_callback is None

    connector.set_raw_file_callback(_fake_callback)

    installed = connector.raw_file_callback
    assert installed is _fake_callback
    assert connector.raw_file_callback(BytesIO(b""), "text/plain") == expected_file_id
|
||||
@@ -1,279 +0,0 @@
|
||||
"""External dependency unit tests for the file_id cleanup that runs alongside
|
||||
document deletion across the three deletion paths:
|
||||
|
||||
1. `document_by_cc_pair_cleanup_task` (pruning + connector deletion)
|
||||
2. `delete_ingestion_doc` (public ingestion API DELETE)
|
||||
3. `delete_all_documents_for_connector_credential_pair` (index swap)
|
||||
|
||||
Each path captures attached `Document.file_id`s before the row is removed and
|
||||
best-effort deletes the underlying files after the DB commit.
|
||||
"""
|
||||
|
||||
from collections.abc import Generator
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.background.celery.tasks.shared.tasks import (
|
||||
document_by_cc_pair_cleanup_task,
|
||||
)
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import IndexAttemptMetadata
|
||||
from onyx.db.document import delete_all_documents_for_connector_credential_pair
|
||||
from onyx.db.document import upsert_document_by_connector_credential_pair
|
||||
from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
|
||||
from onyx.server.onyx_api.ingestion import delete_ingestion_doc
|
||||
from tests.external_dependency_unit.constants import TEST_TENANT_ID
|
||||
from tests.external_dependency_unit.indexing_helpers import cleanup_cc_pair
|
||||
from tests.external_dependency_unit.indexing_helpers import get_doc_row
|
||||
from tests.external_dependency_unit.indexing_helpers import get_filerecord
|
||||
from tests.external_dependency_unit.indexing_helpers import make_cc_pair
|
||||
from tests.external_dependency_unit.indexing_helpers import make_doc
|
||||
from tests.external_dependency_unit.indexing_helpers import stage_file
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers (file-local)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _index_doc(
    db_session: Session,
    doc: Document,
    attempt_metadata: IndexAttemptMetadata,
) -> None:
    """Feed one document through `index_doc_batch_prepare` and commit.

    Used as test setup so that the deletion paths under test have an
    indexed document to operate on.
    """
    single_doc_batch = [doc]
    index_doc_batch_prepare(
        documents=single_doc_batch,
        index_attempt_metadata=attempt_metadata,
        db_session=db_session,
        ignore_time_skip=True,
    )
    db_session.commit()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
def cc_pair(
    db_session: Session,
    tenant_context: None,  # noqa: ARG001
    initialize_file_store: None,  # noqa: ARG001
) -> Generator[ConnectorCredentialPair, None, None]:
    """Yield a cc_pair built by `make_cc_pair`; `cleanup_cc_pair` always runs after."""
    created_pair = make_cc_pair(db_session)
    try:
        yield created_pair
    finally:
        # Runs whether the test passed or failed.
        cleanup_cc_pair(db_session, created_pair)
|
||||
|
||||
|
||||
@pytest.fixture
def second_cc_pair(
    db_session: Session,
    tenant_context: None,  # noqa: ARG001
    initialize_file_store: None,  # noqa: ARG001
) -> Generator[ConnectorCredentialPair, None, None]:
    """Yield an additional cc_pair so a test can exercise the count > 1 branch.

    Built and torn down the same way as the primary `cc_pair` fixture.
    """
    extra_pair = make_cc_pair(db_session)
    try:
        yield extra_pair
    finally:
        cleanup_cc_pair(db_session, extra_pair)
|
||||
|
||||
|
||||
@pytest.fixture
def attempt_metadata(cc_pair: ConnectorCredentialPair) -> IndexAttemptMetadata:
    """Index-attempt metadata carrying the `cc_pair` fixture's connector/credential ids."""
    connector_id = cc_pair.connector_id
    credential_id = cc_pair.credential_id
    return IndexAttemptMetadata(
        connector_id=connector_id,
        credential_id=credential_id,
        attempt_id=None,
        request_id="test-request",
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDeleteAllDocumentsForCcPair:
    """Path 3: the bulk delete used during an index swap (`INSTANT` switchover)."""

    def test_cleans_up_files_for_all_docs(
        self,
        db_session: Session,
        cc_pair: ConnectorCredentialPair,
        attempt_metadata: IndexAttemptMetadata,
    ) -> None:
        """Two file-backed docs: the bulk delete removes both rows and both files."""
        staged_ids = [stage_file(content=payload) for payload in (b"a", b"b")]
        docs = [
            make_doc(f"doc-{uuid4().hex[:8]}", file_id=staged_id)
            for staged_id in staged_ids
        ]

        for doc in docs:
            _index_doc(db_session, doc, attempt_metadata)

        # Precondition: every staged file has a file record before the delete.
        for staged_id in staged_ids:
            assert get_filerecord(db_session, staged_id) is not None

        delete_all_documents_for_connector_credential_pair(
            db_session=db_session,
            connector_id=cc_pair.connector_id,
            credential_id=cc_pair.credential_id,
        )

        for doc in docs:
            assert get_doc_row(db_session, doc.id) is None
        for staged_id in staged_ids:
            assert get_filerecord(db_session, staged_id) is None

    def test_handles_mixed_docs_with_and_without_file_ids(
        self,
        db_session: Session,
        cc_pair: ConnectorCredentialPair,
        attempt_metadata: IndexAttemptMetadata,
    ) -> None:
        """A doc with `file_id=None` is deleted alongside a file-backed one
        without raising; the file-backed doc's file record is removed."""
        staged_id = stage_file()
        backed_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=staged_id)
        bare_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=None)

        for doc in (backed_doc, bare_doc):
            _index_doc(db_session, doc, attempt_metadata)

        delete_all_documents_for_connector_credential_pair(
            db_session=db_session,
            connector_id=cc_pair.connector_id,
            credential_id=cc_pair.credential_id,
        )

        for doc in (backed_doc, bare_doc):
            assert get_doc_row(db_session, doc.id) is None
        assert get_filerecord(db_session, staged_id) is None
|
||||
|
||||
|
||||
class TestDeleteIngestionDoc:
    """Path 2: the public ingestion API's DELETE handler, `delete_ingestion_doc`."""

    def test_cleans_up_file_for_ingestion_api_doc(
        self,
        db_session: Session,
        attempt_metadata: IndexAttemptMetadata,
        tenant_context: None,  # noqa: ARG002
        initialize_file_store: None,  # noqa: ARG002
    ) -> None:
        """Deleting an ingestion-API doc removes its row and its file record."""
        staged_id = stage_file()
        doc_id = f"doc-{uuid4().hex[:8]}"
        ingestion_doc = make_doc(
            doc_id,
            file_id=staged_id,
            from_ingestion_api=True,
        )

        _index_doc(db_session, ingestion_doc, attempt_metadata)
        assert get_filerecord(db_session, staged_id) is not None

        # The document index (Vespa) is out of scope for this test, so the
        # handler is given an empty list of indices to work with.
        no_document_indices = patch(
            "onyx.server.onyx_api.ingestion.get_all_document_indices",
            return_value=[],
        )
        with no_document_indices:
            delete_ingestion_doc(
                document_id=ingestion_doc.id,
                _=MagicMock(),  # placeholder for the auth dependency
                db_session=db_session,
            )

        assert get_doc_row(db_session, ingestion_doc.id) is None
        assert get_filerecord(db_session, staged_id) is None
|
||||
|
||||
|
||||
class TestDocumentByCcPairCleanupTask:
    """Path 1: `document_by_cc_pair_cleanup_task`, the per-document cleanup
    task fired by pruning / connector deletion."""

    def test_count_1_branch_cleans_up_file(
        self,
        db_session: Session,
        cc_pair: ConnectorCredentialPair,
        attempt_metadata: IndexAttemptMetadata,
        full_deployment_setup: None,  # noqa: ARG002
    ) -> None:
        """A doc referenced by a single cc_pair is fully deleted, and its
        attached file record goes with it."""
        staged_id = stage_file()
        only_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=staged_id)
        _index_doc(db_session, only_doc, attempt_metadata)

        assert get_filerecord(db_session, staged_id) is not None

        # No chunks were written to the document index and it is not what is
        # being tested, so the task sees an empty list of indices.
        no_document_indices = patch(
            "onyx.background.celery.tasks.shared.tasks.get_all_document_indices",
            return_value=[],
        )
        with no_document_indices:
            task_args = (
                only_doc.id,
                cc_pair.connector_id,
                cc_pair.credential_id,
                TEST_TENANT_ID,
            )
            result = document_by_cc_pair_cleanup_task.apply(args=task_args)

        assert result.successful(), result.traceback
        assert get_doc_row(db_session, only_doc.id) is None
        assert get_filerecord(db_session, staged_id) is None

    def test_count_gt_1_branch_preserves_file(
        self,
        db_session: Session,
        cc_pair: ConnectorCredentialPair,
        second_cc_pair: ConnectorCredentialPair,
        attempt_metadata: IndexAttemptMetadata,
        full_deployment_setup: None,  # noqa: ARG002
    ) -> None:
        """A doc still referenced by another cc_pair keeps both its row and
        its file after the task runs for the detaching cc_pair."""
        staged_id = stage_file()
        shared_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=staged_id)
        _index_doc(db_session, shared_doc, attempt_metadata)

        # Give the doc a second cc_pair reference so two pairs point at it.
        upsert_document_by_connector_credential_pair(
            db_session,
            second_cc_pair.connector_id,
            second_cc_pair.credential_id,
            [shared_doc.id],
        )
        db_session.commit()

        no_document_indices = patch(
            "onyx.background.celery.tasks.shared.tasks.get_all_document_indices",
            return_value=[],
        )
        with no_document_indices:
            task_args = (
                shared_doc.id,
                cc_pair.connector_id,
                cc_pair.credential_id,
                TEST_TENANT_ID,
            )
            result = document_by_cc_pair_cleanup_task.apply(args=task_args)

        assert result.successful(), result.traceback
        # The remaining cc_pair still references the document row.
        assert get_doc_row(db_session, shared_doc.id) is not None
        # The file must survive as well.
        assert get_filerecord(db_session, staged_id) is not None
|
||||
@@ -1,268 +0,0 @@
|
||||
"""External dependency unit tests for `index_doc_batch_prepare`.
|
||||
|
||||
Validates the file_id lifecycle that runs alongside the document upsert:
|
||||
|
||||
* `document.file_id` is written on insert AND on conflict (upsert path)
|
||||
* Newly-staged files get promoted from INDEXING_STAGING -> CONNECTOR
|
||||
* Replaced files are deleted from both `file_record` and S3
|
||||
* No-op when the file_id is unchanged
|
||||
|
||||
Uses real PostgreSQL + real S3/MinIO via the file store.
|
||||
"""
|
||||
|
||||
from collections.abc import Generator
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.models import IndexAttemptMetadata
|
||||
from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
|
||||
from tests.external_dependency_unit.indexing_helpers import cleanup_cc_pair
|
||||
from tests.external_dependency_unit.indexing_helpers import get_doc_row
|
||||
from tests.external_dependency_unit.indexing_helpers import get_filerecord
|
||||
from tests.external_dependency_unit.indexing_helpers import make_cc_pair
|
||||
from tests.external_dependency_unit.indexing_helpers import make_doc
|
||||
from tests.external_dependency_unit.indexing_helpers import stage_file
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
def cc_pair(
    db_session: Session,
    tenant_context: None,  # noqa: ARG001
    initialize_file_store: None,  # noqa: ARG001
) -> Generator[ConnectorCredentialPair, None, None]:
    """Yield a cc_pair built by `make_cc_pair`; `cleanup_cc_pair` always runs after."""
    created_pair = make_cc_pair(db_session)
    try:
        yield created_pair
    finally:
        # Runs whether the test passed or failed.
        cleanup_cc_pair(db_session, created_pair)
|
||||
|
||||
|
||||
@pytest.fixture
def attempt_metadata(cc_pair: ConnectorCredentialPair) -> IndexAttemptMetadata:
    """Index-attempt metadata carrying the `cc_pair` fixture's connector/credential ids."""
    connector_id = cc_pair.connector_id
    credential_id = cc_pair.credential_id
    return IndexAttemptMetadata(
        connector_id=connector_id,
        credential_id=credential_id,
        attempt_id=None,
        request_id="test-request",
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNewDocuments:
    """First-time inserts: there is no earlier file_id to reconcile against."""

    def test_new_doc_without_file_id(
        self,
        db_session: Session,
        attempt_metadata: IndexAttemptMetadata,
    ) -> None:
        """A new doc with `file_id=None` gets a row whose file_id is None."""
        fresh_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=None)

        index_doc_batch_prepare(
            documents=[fresh_doc],
            index_attempt_metadata=attempt_metadata,
            db_session=db_session,
            ignore_time_skip=True,
        )
        db_session.commit()

        stored_row = get_doc_row(db_session, fresh_doc.id)
        assert stored_row is not None
        assert stored_row.file_id is None

    def test_new_doc_with_staged_file_id_promotes_to_connector(
        self,
        db_session: Session,
        attempt_metadata: IndexAttemptMetadata,
    ) -> None:
        """A new doc with a staged file stores the file_id on its row, and the
        file record's origin becomes `FileOrigin.CONNECTOR`."""
        staged_id = stage_file()
        fresh_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=staged_id)

        index_doc_batch_prepare(
            documents=[fresh_doc],
            index_attempt_metadata=attempt_metadata,
            db_session=db_session,
            ignore_time_skip=True,
        )
        db_session.commit()

        stored_row = get_doc_row(db_session, fresh_doc.id)
        assert stored_row is not None and stored_row.file_id == staged_id

        file_record = get_filerecord(db_session, staged_id)
        assert file_record is not None
        assert file_record.file_origin == FileOrigin.CONNECTOR
|
||||
|
||||
|
||||
class TestExistingDocuments:
    """Re-index scenarios: a `document` row with a file_id already exists."""

    def test_unchanged_file_id_is_noop(
        self,
        db_session: Session,
        attempt_metadata: IndexAttemptMetadata,
    ) -> None:
        """Indexing the same doc twice with the same file_id leaves the file
        record in place with origin `CONNECTOR`."""
        staged_id = stage_file()
        doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=staged_id)

        # Pass 1 inserts the row and promotes the file; pass 2 repeats the
        # call with the identical doc and must neither delete nor re-promote.
        for _ in range(2):
            index_doc_batch_prepare(
                documents=[doc],
                index_attempt_metadata=attempt_metadata,
                db_session=db_session,
                ignore_time_skip=True,
            )
            db_session.commit()

        file_record = get_filerecord(db_session, staged_id)
        assert file_record is not None
        assert file_record.file_origin == FileOrigin.CONNECTOR

        stored_row = get_doc_row(db_session, doc.id)
        assert stored_row is not None and stored_row.file_id == staged_id

    def test_swapping_file_id_promotes_new_and_deletes_old(
        self,
        db_session: Session,
        attempt_metadata: IndexAttemptMetadata,
    ) -> None:
        """Re-indexing a doc with a different file_id points the row at the new
        file, promotes it, and removes the old file record."""
        previous_id = stage_file(content=b"old bytes")
        original_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=previous_id)

        index_doc_batch_prepare(
            documents=[original_doc],
            index_attempt_metadata=attempt_metadata,
            db_session=db_session,
            ignore_time_skip=True,
        )
        db_session.commit()

        # Simulate a re-fetch: same doc id, freshly staged file.
        replacement_id = stage_file(content=b"new bytes")
        refetched_doc = make_doc(original_doc.id, file_id=replacement_id)

        index_doc_batch_prepare(
            documents=[refetched_doc],
            index_attempt_metadata=attempt_metadata,
            db_session=db_session,
            ignore_time_skip=True,
        )
        db_session.commit()

        stored_row = get_doc_row(db_session, original_doc.id)
        assert stored_row is not None and stored_row.file_id == replacement_id

        replacement_record = get_filerecord(db_session, replacement_id)
        assert replacement_record is not None
        assert replacement_record.file_origin == FileOrigin.CONNECTOR

        # The previous file's record must no longer exist.
        assert get_filerecord(db_session, previous_id) is None

    def test_clearing_file_id_deletes_old_and_nulls_column(
        self,
        db_session: Session,
        attempt_metadata: IndexAttemptMetadata,
    ) -> None:
        """Re-indexing a doc with `file_id=None` nulls the column and removes
        the previously attached file record."""
        previous_id = stage_file()
        original_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=previous_id)

        index_doc_batch_prepare(
            documents=[original_doc],
            index_attempt_metadata=attempt_metadata,
            db_session=db_session,
            ignore_time_skip=True,
        )
        db_session.commit()

        # Next run: the connector yields the same doc with no file attached.
        cleared_doc = make_doc(original_doc.id, file_id=None)

        index_doc_batch_prepare(
            documents=[cleared_doc],
            index_attempt_metadata=attempt_metadata,
            db_session=db_session,
            ignore_time_skip=True,
        )
        db_session.commit()

        stored_row = get_doc_row(db_session, original_doc.id)
        assert stored_row is not None and stored_row.file_id is None
        assert get_filerecord(db_session, previous_id) is None
|
||||
|
||||
|
||||
class TestBatchHandling:
    """Mixed batches: several docs in different lifecycle states in one call."""

    def test_mixed_batch_each_doc_handled_independently(
        self,
        db_session: Session,
        attempt_metadata: IndexAttemptMetadata,
    ) -> None:
        """One batch holding a file swap, a new file-backed doc and a new
        file-less doc: each is handled as it would be on its own."""
        # Seed: an already-indexed doc whose file will be swapped below.
        seeded_file_id = stage_file(content=b"existing-old")
        seeded_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=seeded_file_id)
        index_doc_batch_prepare(
            documents=[seeded_doc],
            index_attempt_metadata=attempt_metadata,
            db_session=db_session,
            ignore_time_skip=True,
        )
        db_session.commit()

        # Batch under test: swapped file on the seeded doc, plus one new doc
        # with a file and one new doc without.
        swapped_file_id = stage_file(content=b"existing-new")
        added_file_id = stage_file(content=b"new-with-file")
        seeded_doc_v2 = make_doc(seeded_doc.id, file_id=swapped_file_id)
        added_with_file = make_doc(f"doc-{uuid4().hex[:8]}", file_id=added_file_id)
        added_without_file = make_doc(f"doc-{uuid4().hex[:8]}", file_id=None)

        mixed_batch = [seeded_doc_v2, added_with_file, added_without_file]
        index_doc_batch_prepare(
            documents=mixed_batch,
            index_attempt_metadata=attempt_metadata,
            db_session=db_session,
            ignore_time_skip=True,
        )
        db_session.commit()

        # Seeded doc: row points at the new file, old record gone, new promoted.
        seeded_row = get_doc_row(db_session, seeded_doc.id)
        assert seeded_row is not None and seeded_row.file_id == swapped_file_id
        assert get_filerecord(db_session, seeded_file_id) is None
        swapped_record = get_filerecord(db_session, swapped_file_id)
        assert swapped_record is not None
        assert swapped_record.file_origin == FileOrigin.CONNECTOR

        # New file-backed doc: row present, file promoted.
        added_row = get_doc_row(db_session, added_with_file.id)
        assert added_row is not None and added_row.file_id == added_file_id
        added_record = get_filerecord(db_session, added_file_id)
        assert added_record is not None
        assert added_record.file_origin == FileOrigin.CONNECTOR

        # New file-less doc: row present with a null file_id.
        bare_row = get_doc_row(db_session, added_without_file.id)
        assert bare_row is not None and bare_row.file_id is None
|
||||
@@ -1,190 +0,0 @@
|
||||
"""Shared helpers for external-dependency indexing tests.
|
||||
|
||||
Three test files exercise the `Document` / `cc_pair` / `file_store` surfaces
|
||||
against real Postgres + S3: `test_index_doc_batch_prepare`, `test_index_swap_workflow`,
|
||||
and `test_document_deletion_file_cleanup`. The setup + teardown logic is
|
||||
substantial and identical across all three, so it lives here.
|
||||
|
||||
Tests keep their own `cc_pair` fixture (dependencies differ per file), but
|
||||
the body is just `make_cc_pair` + `cleanup_cc_pair`.
|
||||
"""
|
||||
|
||||
from io import BytesIO
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import InputType
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.db.enums import AccessType
|
||||
from onyx.db.enums import ConnectorCredentialPairStatus
|
||||
from onyx.db.file_record import get_filerecord_by_file_id_optional
|
||||
from onyx.db.models import Connector
|
||||
from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.db.models import Credential
|
||||
from onyx.db.models import Document as DBDocument
|
||||
from onyx.db.models import DocumentByConnectorCredentialPair
|
||||
from onyx.db.models import FileRecord
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
|
||||
|
||||
def make_doc(
    doc_id: str,
    file_id: str | None = None,
    from_ingestion_api: bool = False,
) -> Document:
    """Build a minimal `Document` for the indexing-pipeline tests.

    The document has a single text section, empty metadata, and a semantic
    identifier derived from `doc_id`. The source is `MOCK_CONNECTOR`; per the
    original author's note this avoids the hierarchy-node linking branch
    (NOTION/CONFLUENCE only).
    """
    only_section = TextSection(text="content", link=None)
    return Document(
        id=doc_id,
        source=DocumentSource.MOCK_CONNECTOR,
        semantic_identifier=f"semantic-{doc_id}",
        sections=[only_section],
        metadata={},
        file_id=file_id,
        from_ingestion_api=from_ingestion_api,
    )
|
||||
|
||||
|
||||
def stage_file(content: bytes = b"raw bytes") -> str:
    """Save `content` to the default file store and return the new file_id.

    The file is stored with origin `INDEXING_STAGING`, mirroring what the
    connector's raw-file callback is described as doing during fetch. The
    `{"test": True}` metadata tag is there so leftovers can be located by
    manual cleanup scripts if a teardown ever misses one.
    """
    store = get_default_file_store()
    payload = BytesIO(content)
    return store.save_file(
        content=payload,
        display_name=None,
        file_origin=FileOrigin.INDEXING_STAGING,
        file_type="application/octet-stream",
        file_metadata={"test": True},
    )
|
||||
|
||||
|
||||
def get_doc_row(db_session: Session, doc_id: str) -> DBDocument | None:
    """Return the `document` row for `doc_id`, or None when there is none.

    The session is expired first so the row is re-read from the database
    rather than served from cached ORM state.
    """
    db_session.expire_all()
    matching_rows = db_session.query(DBDocument).filter(DBDocument.id == doc_id)
    return matching_rows.one_or_none()
|
||||
|
||||
|
||||
def get_filerecord(db_session: Session, file_id: str) -> FileRecord | None:
    """Look up the file record for `file_id` after expiring cached session state.

    Delegates to `get_filerecord_by_file_id_optional`.
    """
    db_session.expire_all()
    file_record = get_filerecord_by_file_id_optional(
        file_id=file_id,
        db_session=db_session,
    )
    return file_record
|
||||
|
||||
|
||||
def make_cc_pair(db_session: Session) -> ConnectorCredentialPair:
    """Insert a Connector, a Credential and the cc_pair that links them.

    Connector and cc_pair names carry a random UUID fragment so that
    concurrently running tests do not collide on names. The session is
    committed and the returned pair refreshed before returning.
    """
    new_connector = Connector(
        name=f"test-connector-{uuid4().hex[:8]}",
        source=DocumentSource.MOCK_CONNECTOR,
        input_type=InputType.LOAD_STATE,
        connector_specific_config={},
        refresh_freq=None,
        prune_freq=None,
        indexing_start=None,
    )
    db_session.add(new_connector)
    # Flush so the connector's generated id is available below.
    db_session.flush()

    new_credential = Credential(
        source=DocumentSource.MOCK_CONNECTOR,
        credential_json={},
    )
    db_session.add(new_credential)
    # Same for the credential id.
    db_session.flush()

    new_pair = ConnectorCredentialPair(
        connector_id=new_connector.id,
        credential_id=new_credential.id,
        name=f"test-cc-pair-{uuid4().hex[:8]}",
        status=ConnectorCredentialPairStatus.ACTIVE,
        access_type=AccessType.PUBLIC,
        auto_sync_options=None,
    )
    db_session.add(new_pair)
    db_session.commit()
    db_session.refresh(new_pair)
    return new_pair
|
||||
|
||||
|
||||
def cleanup_cc_pair(db_session: Session, pair: ConnectorCredentialPair) -> None:
    """Tear down everything created under `pair`.

    Steps, in order:

    1. Delete this cc_pair's `DocumentByConnectorCredentialPair` join rows
       (they go first so the document rows can be removed afterwards).
    2. For every document that now has no join row left, delete its attached
       file from the file store (best effort) and then the document row.
    3. Delete the cc_pair, connector and credential rows, then commit.

    Documents that are still referenced by another cc_pair are left alone;
    they are removed when their last referencing cc_pair is cleaned up.

    File-store failures are logged as warnings and otherwise ignored, so a
    storage hiccup during teardown cannot mask the outcome of the test itself
    but still leaves a trace of any leaked file.
    """
    import logging

    logger = logging.getLogger(__name__)

    db_session.expire_all()

    connector_id = pair.connector_id
    credential_id = pair.credential_id

    # Filter shared by the "collect ids" query and the delete below.
    owned_by_pair = (
        DocumentByConnectorCredentialPair.connector_id == connector_id,
        DocumentByConnectorCredentialPair.credential_id == credential_id,
    )

    owned_doc_ids: list[str] = [
        row[0]
        for row in db_session.query(DocumentByConnectorCredentialPair.id)
        .filter(*owned_by_pair)
        .all()
    ]

    db_session.query(DocumentByConnectorCredentialPair).filter(
        *owned_by_pair
    ).delete(synchronize_session="fetch")
    db_session.flush()

    # Documents from this pair that no other cc_pair references any more.
    orphan_doc_ids: list[str] = []
    if owned_doc_ids:
        still_referenced = (
            db_session.query(DocumentByConnectorCredentialPair)
            .filter(DocumentByConnectorCredentialPair.id == DBDocument.id)
            .exists()
        )
        orphan_doc_ids = [
            row[0]
            for row in db_session.query(DBDocument.id)
            .filter(DBDocument.id.in_(owned_doc_ids))
            .filter(~still_referenced)
            .all()
        ]

    if orphan_doc_ids:
        orphan_file_ids: list[str] = [
            row[0]
            for row in db_session.query(DBDocument.file_id)
            .filter(
                DBDocument.id.in_(orphan_doc_ids),
                DBDocument.file_id.isnot(None),
            )
            .all()
        ]

        file_store = get_default_file_store()
        for fid in orphan_file_ids:
            try:
                file_store.delete_file(fid, error_on_missing=False)
            except Exception:
                # Best effort: keep tearing down, but record the leak.
                logger.warning(
                    "Failed to delete file %s during cc_pair cleanup",
                    fid,
                    exc_info=True,
                )

        db_session.query(DBDocument).filter(
            DBDocument.id.in_(orphan_doc_ids)
        ).delete(synchronize_session="fetch")

    db_session.query(ConnectorCredentialPair).filter(
        ConnectorCredentialPair.id == pair.id
    ).delete(synchronize_session="fetch")
    db_session.query(Connector).filter(Connector.id == connector_id).delete(
        synchronize_session="fetch"
    )
    db_session.query(Credential).filter(Credential.id == credential_id).delete(
        synchronize_session="fetch"
    )
    db_session.commit()
|
||||
@@ -1,193 +0,0 @@
|
||||
"""Workflow-level test for the INSTANT index swap.
|
||||
|
||||
When `check_and_perform_index_swap` runs against an `INSTANT` switchover, it
|
||||
calls `delete_all_documents_for_connector_credential_pair` for each cc_pair.
|
||||
This test exercises that full workflow end-to-end and asserts that the
|
||||
attached `Document.file_id`s are also reaped — not just the document rows.
|
||||
|
||||
Mocks Vespa (`get_all_document_indices`) since this is testing the postgres +
|
||||
file_store side effects of the swap, not the document index integration.
|
||||
"""
|
||||
|
||||
from collections.abc import Generator
|
||||
from unittest.mock import patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.connectors.models import IndexAttemptMetadata
|
||||
from onyx.context.search.models import SavedSearchSettings
|
||||
from onyx.db.enums import EmbeddingPrecision
|
||||
from onyx.db.enums import SwitchoverType
|
||||
from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.db.models import IndexModelStatus
|
||||
from onyx.db.search_settings import create_search_settings
|
||||
from onyx.db.swap_index import check_and_perform_index_swap
|
||||
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
|
||||
from tests.external_dependency_unit.indexing_helpers import cleanup_cc_pair
|
||||
from tests.external_dependency_unit.indexing_helpers import get_doc_row
|
||||
from tests.external_dependency_unit.indexing_helpers import get_filerecord
|
||||
from tests.external_dependency_unit.indexing_helpers import make_cc_pair
|
||||
from tests.external_dependency_unit.indexing_helpers import make_doc
|
||||
from tests.external_dependency_unit.indexing_helpers import stage_file
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers (file-local)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_saved_search_settings(
    *,
    switchover_type: SwitchoverType = SwitchoverType.REINDEX,
) -> SavedSearchSettings:
    """Build a `SavedSearchSettings` with random model/index names.

    Only `switchover_type` is configurable; every other field is a fixed
    test value.
    """
    model_name = f"test-embedding-model-{uuid4().hex[:8]}"
    index_name = f"test_index_{uuid4().hex[:8]}"
    return SavedSearchSettings(
        model_name=model_name,
        model_dim=768,
        normalize=True,
        query_prefix="",
        passage_prefix="",
        provider_type=None,
        index_name=index_name,
        multipass_indexing=False,
        embedding_precision=EmbeddingPrecision.FLOAT,
        reduced_dimension=None,
        enable_contextual_rag=False,
        contextual_rag_llm_name=None,
        contextual_rag_llm_provider=None,
        switchover_type=switchover_type,
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
def cc_pair(
    db_session: Session,
    tenant_context: None,  # noqa: ARG001
    initialize_file_store: None,  # noqa: ARG001
    full_deployment_setup: None,  # noqa: ARG001
) -> Generator[ConnectorCredentialPair, None, None]:
    """Yield a cc_pair built by `make_cc_pair`; `cleanup_cc_pair` always runs after."""
    created_pair = make_cc_pair(db_session)
    try:
        yield created_pair
    finally:
        # Runs whether the test passed or failed.
        cleanup_cc_pair(db_session, created_pair)
|
||||
|
||||
|
||||
@pytest.fixture
def attempt_metadata(cc_pair: ConnectorCredentialPair) -> IndexAttemptMetadata:
    """Index-attempt metadata carrying the `cc_pair` fixture's connector/credential ids."""
    connector_id = cc_pair.connector_id
    credential_id = cc_pair.credential_id
    return IndexAttemptMetadata(
        connector_id=connector_id,
        credential_id=credential_id,
        attempt_id=None,
        request_id="test-request",
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestInstantIndexSwap:
    """An `INSTANT` switchover deletes the documents of every cc_pair during
    the swap; the raw files attached to those documents must go as well."""

    def test_instant_swap_deletes_docs_and_files(
        self,
        db_session: Session,
        attempt_metadata: IndexAttemptMetadata,
    ) -> None:
        """Two file-backed docs are removed, rows and files, by an INSTANT swap."""
        # Setup: index two file-backed docs through the regular pipeline.
        staged_ids = [stage_file(content=payload) for payload in (b"alpha", b"beta")]
        docs = [
            make_doc(f"doc-{uuid4().hex[:8]}", file_id=staged_id)
            for staged_id in staged_ids
        ]

        index_doc_batch_prepare(
            documents=docs,
            index_attempt_metadata=attempt_metadata,
            db_session=db_session,
            ignore_time_skip=True,
        )
        db_session.commit()

        # Precondition: rows and file records are present before the swap.
        for doc in docs:
            assert get_doc_row(db_session, doc.id) is not None
        for staged_id in staged_ids:
            assert get_filerecord(db_session, staged_id) is not None

        # Register FUTURE search settings with an INSTANT switchover so the
        # next `check_and_perform_index_swap` call performs the swap.
        instant_settings = _make_saved_search_settings(
            switchover_type=SwitchoverType.INSTANT
        )
        create_search_settings(
            search_settings=instant_settings,
            db_session=db_session,
            status=IndexModelStatus.FUTURE,
        )

        # Only the postgres + file-store effects are under test, so the swap
        # is given an empty list of document indices.
        no_document_indices = patch(
            "onyx.db.swap_index.get_all_document_indices",
            return_value=[],
        )
        with no_document_indices:
            old_settings = check_and_perform_index_swap(db_session)

        assert old_settings is not None, "INSTANT swap should have executed"

        # Document rows were deleted.
        for doc in docs:
            assert get_doc_row(db_session, doc.id) is None

        # And so were the attached file records.
        for staged_id in staged_ids:
            assert get_filerecord(db_session, staged_id) is None

    def test_instant_swap_with_mixed_docs_does_not_break(
        self,
        db_session: Session,
        attempt_metadata: IndexAttemptMetadata,
    ) -> None:
        """Docs with and without a file_id are both removed by the swap
        without raising."""
        staged_id = stage_file()
        backed_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=staged_id)
        bare_doc = make_doc(f"doc-{uuid4().hex[:8]}", file_id=None)

        index_doc_batch_prepare(
            documents=[backed_doc, bare_doc],
            index_attempt_metadata=attempt_metadata,
            db_session=db_session,
            ignore_time_skip=True,
        )
        db_session.commit()

        instant_settings = _make_saved_search_settings(
            switchover_type=SwitchoverType.INSTANT
        )
        create_search_settings(
            search_settings=instant_settings,
            db_session=db_session,
            status=IndexModelStatus.FUTURE,
        )

        no_document_indices = patch(
            "onyx.db.swap_index.get_all_document_indices",
            return_value=[],
        )
        with no_document_indices:
            old_settings = check_and_perform_index_swap(db_session)

        assert old_settings is not None

        for doc in (backed_doc, bare_doc):
            assert get_doc_row(db_session, doc.id) is None
        assert get_filerecord(db_session, staged_id) is None
|
||||
@@ -247,7 +247,7 @@ class DATestSettings(BaseModel):
|
||||
gpu_enabled: bool | None = None
|
||||
product_gating: DATestGatingType = DATestGatingType.NONE
|
||||
anonymous_user_enabled: bool | None = None
|
||||
image_extraction_and_analysis_enabled: bool | None = True
|
||||
image_extraction_and_analysis_enabled: bool | None = False
|
||||
search_time_image_analysis_enabled: bool | None = False
|
||||
|
||||
|
||||
|
||||
@@ -8,10 +8,8 @@ import io
|
||||
from typing import NamedTuple
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from onyx.file_store.models import FileDescriptor
|
||||
from tests.integration.common_utils.constants import API_SERVER_URL
|
||||
from tests.integration.common_utils.managers.chat import ChatSessionManager
|
||||
from tests.integration.common_utils.managers.file import FileManager
|
||||
from tests.integration.common_utils.managers.llm_provider import LLMProviderManager
|
||||
@@ -121,31 +119,3 @@ def test_public_assistant_with_user_files(
|
||||
assert (
|
||||
len(chat_history) >= 2
|
||||
), "Expected at least 2 messages (user message and assistant response)"
|
||||
|
||||
|
||||
def test_cannot_download_other_users_file_via_chat_file_endpoint(
|
||||
user_file_setup: UserFileTestSetup,
|
||||
) -> None:
|
||||
storage_file_id = user_file_setup.user1_file_descriptor["id"]
|
||||
user_file_id = user_file_setup.user1_file_id
|
||||
|
||||
owner_response = requests.get(
|
||||
f"{API_SERVER_URL}/chat/file/{storage_file_id}",
|
||||
headers=user_file_setup.user1_file_owner.headers,
|
||||
)
|
||||
assert owner_response.status_code == 200
|
||||
assert owner_response.content, "Owner should receive the file contents"
|
||||
|
||||
for file_id in (storage_file_id, user_file_id):
|
||||
user2_response = requests.get(
|
||||
f"{API_SERVER_URL}/chat/file/{file_id}",
|
||||
headers=user_file_setup.user2_non_owner.headers,
|
||||
)
|
||||
assert user2_response.status_code in (
|
||||
403,
|
||||
404,
|
||||
), (
|
||||
f"Expected access denied for non-owner, got {user2_response.status_code} "
|
||||
f"when fetching file_id={file_id}"
|
||||
)
|
||||
assert user2_response.content != owner_response.content
|
||||
|
||||
@@ -69,9 +69,6 @@ class TestGongConnectorCheckpoint:
|
||||
workspace_ids=["ws1", None],
|
||||
workspace_index=1,
|
||||
cursor="abc123",
|
||||
pending_transcripts={"call1": _make_transcript("call1")},
|
||||
pending_call_details_attempts=2,
|
||||
pending_retry_after=1234567890.5,
|
||||
)
|
||||
json_str = original.model_dump_json()
|
||||
restored = connector.validate_checkpoint_json(json_str)
|
||||
@@ -234,11 +231,7 @@ class TestGongConnectorCheckpoint:
|
||||
mock_request: MagicMock,
|
||||
connector: GongConnector,
|
||||
) -> None:
|
||||
"""Missing call details persist across checkpoint invocations and
|
||||
eventually yield ConnectorFailure once MAX_CALL_DETAILS_ATTEMPTS is hit.
|
||||
No in-call sleep — retries happen on subsequent invocations, gated by
|
||||
the wall-clock retry-after deadline on the checkpoint.
|
||||
"""
|
||||
"""When call details are missing after retries, yield ConnectorFailure."""
|
||||
transcript_response = MagicMock()
|
||||
transcript_response.status_code = 200
|
||||
transcript_response.json.return_value = {
|
||||
@@ -264,42 +257,23 @@ class TestGongConnectorCheckpoint:
|
||||
failures: list[ConnectorFailure] = []
|
||||
docs: list[Document] = []
|
||||
|
||||
# Jump the clock past any retry deadline on each invocation so we
|
||||
# exercise the retry path without real sleeping. The test for the
|
||||
# backoff-gate itself lives in test_backoff_gate_prevents_retry_too_soon.
|
||||
fake_now = [1_000_000.0]
|
||||
|
||||
def _advance_clock() -> float:
|
||||
fake_now[0] += 10_000.0
|
||||
return fake_now[0]
|
||||
|
||||
invocation_cap = GongConnector.MAX_CALL_DETAILS_ATTEMPTS + 5
|
||||
with patch(
|
||||
"onyx.connectors.gong.connector.time.time", side_effect=_advance_clock
|
||||
):
|
||||
for _ in range(invocation_cap):
|
||||
if not checkpoint.has_more:
|
||||
break
|
||||
generator = connector.load_from_checkpoint(0, fake_now[0], checkpoint)
|
||||
try:
|
||||
while True:
|
||||
item = next(generator)
|
||||
if isinstance(item, ConnectorFailure):
|
||||
failures.append(item)
|
||||
elif isinstance(item, Document):
|
||||
docs.append(item)
|
||||
except StopIteration as e:
|
||||
checkpoint = e.value
|
||||
with patch("onyx.connectors.gong.connector.time.sleep"):
|
||||
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
|
||||
try:
|
||||
while True:
|
||||
item = next(generator)
|
||||
if isinstance(item, ConnectorFailure):
|
||||
failures.append(item)
|
||||
elif isinstance(item, Document):
|
||||
docs.append(item)
|
||||
except StopIteration as e:
|
||||
checkpoint = e.value
|
||||
|
||||
assert len(docs) == 0
|
||||
assert len(failures) == 1
|
||||
assert failures[0].failed_document is not None
|
||||
assert failures[0].failed_document.document_id == "call1"
|
||||
assert checkpoint.has_more is False
|
||||
assert checkpoint.pending_transcripts == {}
|
||||
assert checkpoint.pending_call_details_attempts == 0
|
||||
assert checkpoint.pending_retry_after is None
|
||||
assert mock_request.call_count == 1 + GongConnector.MAX_CALL_DETAILS_ATTEMPTS
|
||||
|
||||
@patch.object(GongConnector, "_throttled_request")
|
||||
def test_multi_workspace_iteration(
|
||||
@@ -407,14 +381,12 @@ class TestGongConnectorCheckpoint:
|
||||
assert checkpoint.workspace_index == 1
|
||||
|
||||
@patch.object(GongConnector, "_throttled_request")
|
||||
def test_partial_details_defers_and_resolves_next_invocation(
|
||||
def test_retry_only_fetches_missing_ids(
|
||||
self,
|
||||
mock_request: MagicMock,
|
||||
connector: GongConnector,
|
||||
) -> None:
|
||||
"""A transcript whose call details are missing gets stashed into
|
||||
pending_transcripts and resolves on a later checkpoint invocation.
|
||||
Resolved docs are yielded in the order they become available."""
|
||||
"""Retry for missing call details should only re-request the missing IDs."""
|
||||
transcript_response = MagicMock()
|
||||
transcript_response.status_code = 200
|
||||
transcript_response.json.return_value = {
|
||||
@@ -432,7 +404,7 @@ class TestGongConnectorCheckpoint:
|
||||
"calls": [_make_call_detail("call1", "Call One")]
|
||||
}
|
||||
|
||||
# Second fetch (next invocation): returns call2
|
||||
# Second fetch (retry): returns call2
|
||||
missing_details = MagicMock()
|
||||
missing_details.status_code = 200
|
||||
missing_details.json.return_value = {
|
||||
@@ -452,48 +424,19 @@ class TestGongConnectorCheckpoint:
|
||||
)
|
||||
|
||||
docs: list[Document] = []
|
||||
|
||||
fake_now = [1_000_000.0]
|
||||
|
||||
def _advance_clock() -> float:
|
||||
fake_now[0] += 10_000.0
|
||||
return fake_now[0]
|
||||
|
||||
with patch(
|
||||
"onyx.connectors.gong.connector.time.time", side_effect=_advance_clock
|
||||
):
|
||||
# Invocation 1: fetches page + details, yields call1, stashes call2
|
||||
generator = connector.load_from_checkpoint(0, fake_now[0], checkpoint)
|
||||
with patch("onyx.connectors.gong.connector.time.sleep"):
|
||||
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
|
||||
try:
|
||||
while True:
|
||||
item = next(generator)
|
||||
if isinstance(item, Document):
|
||||
docs.append(item)
|
||||
except StopIteration as e:
|
||||
checkpoint = e.value
|
||||
|
||||
assert len(docs) == 1
|
||||
assert docs[0].semantic_identifier == "Call One"
|
||||
assert "call2" in checkpoint.pending_transcripts
|
||||
assert checkpoint.pending_call_details_attempts == 1
|
||||
assert checkpoint.pending_retry_after is not None
|
||||
assert checkpoint.has_more is True
|
||||
|
||||
# Invocation 2: retries missing (only call2), yields it, clears pending
|
||||
generator = connector.load_from_checkpoint(0, fake_now[0], checkpoint)
|
||||
try:
|
||||
while True:
|
||||
item = next(generator)
|
||||
if isinstance(item, Document):
|
||||
docs.append(item)
|
||||
except StopIteration as e:
|
||||
checkpoint = e.value
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
assert len(docs) == 2
|
||||
assert docs[0].semantic_identifier == "Call One"
|
||||
assert docs[1].semantic_identifier == "Call Two"
|
||||
assert checkpoint.pending_transcripts == {}
|
||||
assert checkpoint.pending_call_details_attempts == 0
|
||||
assert checkpoint.pending_retry_after is None
|
||||
|
||||
# Verify: 3 API calls total (1 transcript + 1 full details + 1 retry for missing only)
|
||||
assert mock_request.call_count == 3
|
||||
@@ -501,107 +444,6 @@ class TestGongConnectorCheckpoint:
|
||||
retry_call_body = mock_request.call_args_list[2][1]["json"]
|
||||
assert retry_call_body["filter"]["callIds"] == ["call2"]
|
||||
|
||||
@patch.object(GongConnector, "_throttled_request")
|
||||
def test_backoff_gate_prevents_retry_too_soon(
|
||||
self,
|
||||
mock_request: MagicMock,
|
||||
connector: GongConnector,
|
||||
) -> None:
|
||||
"""If the retry-after deadline hasn't elapsed, _resolve_pending must
|
||||
NOT issue a /v2/calls/extensive request. Prevents burning through
|
||||
MAX_CALL_DETAILS_ATTEMPTS when workers re-invoke tightly.
|
||||
"""
|
||||
pending_transcript = _make_transcript("call1")
|
||||
fixed_now = 1_000_000.0
|
||||
# Deadline is 30s in the future from fixed_now
|
||||
retry_after = fixed_now + 30
|
||||
|
||||
checkpoint = GongConnectorCheckpoint(
|
||||
has_more=True,
|
||||
workspace_ids=[None],
|
||||
workspace_index=0,
|
||||
pending_transcripts={"call1": pending_transcript},
|
||||
pending_call_details_attempts=1,
|
||||
pending_retry_after=retry_after,
|
||||
)
|
||||
|
||||
with patch("onyx.connectors.gong.connector.time.time", return_value=fixed_now):
|
||||
generator = connector.load_from_checkpoint(0, fixed_now, checkpoint)
|
||||
try:
|
||||
while True:
|
||||
next(generator)
|
||||
except StopIteration as e:
|
||||
checkpoint = e.value
|
||||
|
||||
# No API calls should have been made — we were inside the backoff window
|
||||
mock_request.assert_not_called()
|
||||
# Pending state preserved for later retry
|
||||
assert "call1" in checkpoint.pending_transcripts
|
||||
assert checkpoint.pending_call_details_attempts == 1
|
||||
assert checkpoint.pending_retry_after == retry_after
|
||||
assert checkpoint.has_more is True
|
||||
|
||||
@patch.object(GongConnector, "_throttled_request")
|
||||
def test_pending_retry_does_not_block_on_time_sleep(
|
||||
self,
|
||||
mock_request: MagicMock,
|
||||
connector: GongConnector,
|
||||
) -> None:
|
||||
"""Pending-transcript retry must never call time.sleep() with a
|
||||
non-trivial delay — spacing between retries is enforced via the
|
||||
wall-clock retry-after deadline stored on the checkpoint, not by
|
||||
blocking inside load_from_checkpoint.
|
||||
"""
|
||||
transcript_response = MagicMock()
|
||||
transcript_response.status_code = 200
|
||||
transcript_response.json.return_value = {
|
||||
"callTranscripts": [_make_transcript("call1")],
|
||||
"records": {},
|
||||
}
|
||||
empty_details = MagicMock()
|
||||
empty_details.status_code = 200
|
||||
empty_details.json.return_value = {"calls": []}
|
||||
|
||||
mock_request.side_effect = [transcript_response] + [
|
||||
empty_details
|
||||
] * GongConnector.MAX_CALL_DETAILS_ATTEMPTS
|
||||
|
||||
checkpoint = GongConnectorCheckpoint(
|
||||
has_more=True,
|
||||
workspace_ids=[None],
|
||||
workspace_index=0,
|
||||
)
|
||||
|
||||
fake_now = [1_000_000.0]
|
||||
|
||||
def _advance_clock() -> float:
|
||||
fake_now[0] += 10_000.0
|
||||
return fake_now[0]
|
||||
|
||||
with (
|
||||
patch("onyx.connectors.gong.connector.time.sleep") as mock_sleep,
|
||||
patch(
|
||||
"onyx.connectors.gong.connector.time.time", side_effect=_advance_clock
|
||||
),
|
||||
):
|
||||
invocation_cap = GongConnector.MAX_CALL_DETAILS_ATTEMPTS + 5
|
||||
for _ in range(invocation_cap):
|
||||
if not checkpoint.has_more:
|
||||
break
|
||||
generator = connector.load_from_checkpoint(0, fake_now[0], checkpoint)
|
||||
try:
|
||||
while True:
|
||||
next(generator)
|
||||
except StopIteration as e:
|
||||
checkpoint = e.value
|
||||
|
||||
# The only legitimate sleep is the sub-second throttle in
|
||||
# _throttled_request (<= MIN_REQUEST_INTERVAL). Assert we never
|
||||
# sleep for anything close to the per-retry backoff delays.
|
||||
for call in mock_sleep.call_args_list:
|
||||
delay_arg = call.args[0] if call.args else 0
|
||||
assert delay_arg <= GongConnector.MIN_REQUEST_INTERVAL
|
||||
|
||||
@patch.object(GongConnector, "_throttled_request")
|
||||
def test_expired_cursor_restarts_workspace(
|
||||
self,
|
||||
|
||||
@@ -287,140 +287,3 @@ class TestFailedFolderIdsByEmail:
|
||||
)
|
||||
|
||||
assert len(failed_map) == 0
|
||||
|
||||
|
||||
class TestOrphanedPathBackfill:
|
||||
def _make_failed_map(
|
||||
self, entries: dict[str, set[str]]
|
||||
) -> ThreadSafeDict[str, ThreadSafeSet[str]]:
|
||||
return ThreadSafeDict({k: ThreadSafeSet(v) for k, v in entries.items()})
|
||||
|
||||
def _make_file(self, parent_id: str) -> MagicMock:
|
||||
file = MagicMock()
|
||||
file.user_email = "retriever@example.com"
|
||||
file.drive_file = {"parents": [parent_id]}
|
||||
return file
|
||||
|
||||
def test_backfills_intermediate_folders_into_failed_map(self) -> None:
|
||||
"""When a walk dead-ends at a confirmed orphan, all intermediate folder
|
||||
IDs must be added to failed_folder_ids_by_email for both emails so
|
||||
future files short-circuit via _get_folder_metadata's cache check."""
|
||||
connector = _make_connector()
|
||||
|
||||
# Chain: folderA -> folderB -> folderC (confirmed orphan)
|
||||
failed_map = self._make_failed_map(
|
||||
{
|
||||
"retriever@example.com": {"folderC"},
|
||||
"admin@example.com": {"folderC"},
|
||||
}
|
||||
)
|
||||
|
||||
folder_a = {"id": "folderA", "name": "A", "parents": ["folderB"]}
|
||||
folder_b = {"id": "folderB", "name": "B", "parents": ["folderC"]}
|
||||
|
||||
def mock_get_folder(
|
||||
_service: MagicMock, folder_id: str, _field_type: DriveFileFieldType
|
||||
) -> dict | None:
|
||||
if folder_id == "folderA":
|
||||
return folder_a
|
||||
if folder_id == "folderB":
|
||||
return folder_b
|
||||
return None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"onyx.connectors.google_drive.connector.get_drive_service",
|
||||
return_value=MagicMock(),
|
||||
),
|
||||
patch(
|
||||
"onyx.connectors.google_drive.connector.get_folder_metadata",
|
||||
side_effect=mock_get_folder,
|
||||
),
|
||||
):
|
||||
connector._get_new_ancestors_for_files(
|
||||
files=[self._make_file("folderA")],
|
||||
seen_hierarchy_node_raw_ids=ThreadSafeSet(),
|
||||
fully_walked_hierarchy_node_raw_ids=ThreadSafeSet(),
|
||||
failed_folder_ids_by_email=failed_map,
|
||||
)
|
||||
|
||||
# Both emails confirmed folderC as orphan, so both get the backfill
|
||||
for email in ("retriever@example.com", "admin@example.com"):
|
||||
cached = failed_map.get(email, ThreadSafeSet())
|
||||
assert "folderA" in cached
|
||||
assert "folderB" in cached
|
||||
assert "folderC" in cached
|
||||
|
||||
def test_backfills_only_for_confirming_email(self) -> None:
|
||||
"""Only the email that confirmed the orphan gets the path backfilled."""
|
||||
connector = _make_connector()
|
||||
|
||||
# Only retriever confirmed folderC as orphan; admin has no entry
|
||||
failed_map = self._make_failed_map({"retriever@example.com": {"folderC"}})
|
||||
|
||||
folder_a = {"id": "folderA", "name": "A", "parents": ["folderB"]}
|
||||
folder_b = {"id": "folderB", "name": "B", "parents": ["folderC"]}
|
||||
|
||||
def mock_get_folder(
|
||||
_service: MagicMock, folder_id: str, _field_type: DriveFileFieldType
|
||||
) -> dict | None:
|
||||
if folder_id == "folderA":
|
||||
return folder_a
|
||||
if folder_id == "folderB":
|
||||
return folder_b
|
||||
return None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"onyx.connectors.google_drive.connector.get_drive_service",
|
||||
return_value=MagicMock(),
|
||||
),
|
||||
patch(
|
||||
"onyx.connectors.google_drive.connector.get_folder_metadata",
|
||||
side_effect=mock_get_folder,
|
||||
),
|
||||
):
|
||||
connector._get_new_ancestors_for_files(
|
||||
files=[self._make_file("folderA")],
|
||||
seen_hierarchy_node_raw_ids=ThreadSafeSet(),
|
||||
fully_walked_hierarchy_node_raw_ids=ThreadSafeSet(),
|
||||
failed_folder_ids_by_email=failed_map,
|
||||
)
|
||||
|
||||
retriever_cached = failed_map.get("retriever@example.com", ThreadSafeSet())
|
||||
assert "folderA" in retriever_cached
|
||||
assert "folderB" in retriever_cached
|
||||
|
||||
# admin did not confirm the orphan — must not get the backfill
|
||||
assert failed_map.get("admin@example.com") is None
|
||||
|
||||
def test_short_circuits_on_backfilled_intermediate(self) -> None:
|
||||
"""A second file whose parent is already in failed_folder_ids_by_email
|
||||
must not trigger any folder metadata API calls."""
|
||||
connector = _make_connector()
|
||||
|
||||
# folderA already in the failed map from a previous walk
|
||||
failed_map = self._make_failed_map(
|
||||
{
|
||||
"retriever@example.com": {"folderA"},
|
||||
"admin@example.com": {"folderA"},
|
||||
}
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"onyx.connectors.google_drive.connector.get_drive_service",
|
||||
return_value=MagicMock(),
|
||||
),
|
||||
patch(
|
||||
"onyx.connectors.google_drive.connector.get_folder_metadata"
|
||||
) as mock_api,
|
||||
):
|
||||
connector._get_new_ancestors_for_files(
|
||||
files=[self._make_file("folderA")],
|
||||
seen_hierarchy_node_raw_ids=ThreadSafeSet(),
|
||||
fully_walked_hierarchy_node_raw_ids=ThreadSafeSet(),
|
||||
failed_folder_ids_by_email=failed_map,
|
||||
)
|
||||
|
||||
mock_api.assert_not_called()
|
||||
|
||||
@@ -1,194 +0,0 @@
|
||||
"""Unit tests for SharepointConnector site-page slim resilience and
|
||||
validate_connector_settings RoleAssignments permission probe."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from onyx.connectors.exceptions import ConnectorValidationError
|
||||
from onyx.connectors.sharepoint.connector import SharepointConnector
|
||||
|
||||
SITE_URL = "https://tenant.sharepoint.com/sites/MySite"
|
||||
|
||||
|
||||
def _make_connector() -> SharepointConnector:
|
||||
connector = SharepointConnector(sites=[SITE_URL])
|
||||
connector.msal_app = MagicMock()
|
||||
connector.sp_tenant_domain = "tenant"
|
||||
connector._credential_json = {"sp_client_id": "x", "sp_directory_id": "y"}
|
||||
connector._graph_client = MagicMock()
|
||||
return connector
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _fetch_slim_documents_from_sharepoint — site page error resilience
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@patch("onyx.connectors.sharepoint.connector._convert_sitepage_to_slim_document")
|
||||
@patch(
|
||||
"onyx.connectors.sharepoint.connector.SharepointConnector._create_rest_client_context"
|
||||
)
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_site_pages")
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_driveitems")
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector.fetch_sites")
|
||||
def test_site_page_error_does_not_crash(
|
||||
mock_fetch_sites: MagicMock,
|
||||
mock_fetch_driveitems: MagicMock,
|
||||
mock_fetch_site_pages: MagicMock,
|
||||
_mock_create_ctx: MagicMock,
|
||||
mock_convert: MagicMock,
|
||||
) -> None:
|
||||
"""A 401 (or any exception) on a site page is caught; remaining pages are processed."""
|
||||
from onyx.connectors.models import SlimDocument
|
||||
|
||||
connector = _make_connector()
|
||||
connector.include_site_documents = False
|
||||
connector.include_site_pages = True
|
||||
|
||||
site = MagicMock()
|
||||
site.url = SITE_URL
|
||||
mock_fetch_sites.return_value = [site]
|
||||
mock_fetch_driveitems.return_value = iter([])
|
||||
|
||||
page_ok = {"id": "1", "webUrl": SITE_URL + "/SitePages/Good.aspx"}
|
||||
page_bad = {"id": "2", "webUrl": SITE_URL + "/SitePages/Bad.aspx"}
|
||||
mock_fetch_site_pages.return_value = [page_bad, page_ok]
|
||||
|
||||
good_slim = SlimDocument(id="1")
|
||||
|
||||
def _convert_side_effect(
|
||||
page: dict, *_args: object, **_kwargs: object
|
||||
) -> SlimDocument: # noqa: ANN001
|
||||
if page["id"] == "2":
|
||||
from office365.runtime.client_request import ClientRequestException
|
||||
|
||||
raise ClientRequestException(MagicMock(status_code=401), None)
|
||||
return good_slim
|
||||
|
||||
mock_convert.side_effect = _convert_side_effect
|
||||
|
||||
results = [
|
||||
doc
|
||||
for batch in connector._fetch_slim_documents_from_sharepoint()
|
||||
for doc in batch
|
||||
if isinstance(doc, SlimDocument)
|
||||
]
|
||||
|
||||
# Only the good page makes it through; bad page is skipped, no exception raised.
|
||||
assert any(d.id == "1" for d in results)
|
||||
assert not any(d.id == "2" for d in results)
|
||||
|
||||
|
||||
@patch("onyx.connectors.sharepoint.connector._convert_sitepage_to_slim_document")
|
||||
@patch(
|
||||
"onyx.connectors.sharepoint.connector.SharepointConnector._create_rest_client_context"
|
||||
)
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_site_pages")
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector._fetch_driveitems")
|
||||
@patch("onyx.connectors.sharepoint.connector.SharepointConnector.fetch_sites")
|
||||
def test_all_site_pages_fail_does_not_crash(
|
||||
mock_fetch_sites: MagicMock,
|
||||
mock_fetch_driveitems: MagicMock,
|
||||
mock_fetch_site_pages: MagicMock,
|
||||
_mock_create_ctx: MagicMock,
|
||||
mock_convert: MagicMock,
|
||||
) -> None:
|
||||
"""When every site page fails, the generator completes without raising."""
|
||||
connector = _make_connector()
|
||||
connector.include_site_documents = False
|
||||
connector.include_site_pages = True
|
||||
|
||||
site = MagicMock()
|
||||
site.url = SITE_URL
|
||||
mock_fetch_sites.return_value = [site]
|
||||
mock_fetch_driveitems.return_value = iter([])
|
||||
mock_fetch_site_pages.return_value = [
|
||||
{"id": "1", "webUrl": SITE_URL + "/SitePages/A.aspx"},
|
||||
{"id": "2", "webUrl": SITE_URL + "/SitePages/B.aspx"},
|
||||
]
|
||||
mock_convert.side_effect = RuntimeError("context error")
|
||||
|
||||
from onyx.connectors.models import SlimDocument
|
||||
|
||||
# Should not raise; no SlimDocuments in output (only hierarchy nodes).
|
||||
slim_results = [
|
||||
doc
|
||||
for batch in connector._fetch_slim_documents_from_sharepoint()
|
||||
for doc in batch
|
||||
if isinstance(doc, SlimDocument)
|
||||
]
|
||||
assert slim_results == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# validate_connector_settings — RoleAssignments permission probe
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status_code", [401, 403])
|
||||
@patch("onyx.connectors.sharepoint.connector.requests.get")
|
||||
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
|
||||
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
|
||||
def test_validate_raises_on_401_or_403(
|
||||
mock_acquire: MagicMock,
|
||||
_mock_validate_url: MagicMock,
|
||||
mock_get: MagicMock,
|
||||
status_code: int,
|
||||
) -> None:
|
||||
"""validate_connector_settings raises ConnectorValidationError when probe returns 401 or 403."""
|
||||
mock_acquire.return_value = MagicMock(accessToken="tok")
|
||||
mock_get.return_value = MagicMock(status_code=status_code)
|
||||
|
||||
connector = _make_connector()
|
||||
|
||||
with pytest.raises(ConnectorValidationError, match="Sites.FullControl.All"):
|
||||
connector.validate_connector_settings()
|
||||
|
||||
|
||||
@patch("onyx.connectors.sharepoint.connector.requests.get")
|
||||
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
|
||||
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
|
||||
def test_validate_passes_on_200(
|
||||
mock_acquire: MagicMock,
|
||||
_mock_validate_url: MagicMock,
|
||||
mock_get: MagicMock,
|
||||
) -> None:
|
||||
"""validate_connector_settings does not raise when probe returns 200."""
|
||||
mock_acquire.return_value = MagicMock(accessToken="tok")
|
||||
mock_get.return_value = MagicMock(status_code=200)
|
||||
|
||||
connector = _make_connector()
|
||||
connector.validate_connector_settings() # should not raise
|
||||
|
||||
|
||||
@patch("onyx.connectors.sharepoint.connector.requests.get")
|
||||
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
|
||||
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
|
||||
def test_validate_passes_on_network_error(
|
||||
mock_acquire: MagicMock,
|
||||
_mock_validate_url: MagicMock,
|
||||
mock_get: MagicMock,
|
||||
) -> None:
|
||||
"""Network errors during the probe are non-blocking (logged as warning only)."""
|
||||
mock_acquire.return_value = MagicMock(accessToken="tok")
|
||||
mock_get.side_effect = Exception("timeout")
|
||||
|
||||
connector = _make_connector()
|
||||
connector.validate_connector_settings() # should not raise
|
||||
|
||||
|
||||
@patch("onyx.connectors.sharepoint.connector.validate_outbound_http_url")
|
||||
@patch("onyx.connectors.sharepoint.connector.acquire_token_for_rest")
|
||||
def test_validate_skips_probe_without_credentials(
|
||||
mock_acquire: MagicMock,
|
||||
_mock_validate_url: MagicMock,
|
||||
) -> None:
|
||||
"""Probe is skipped when credentials have not been loaded."""
|
||||
connector = SharepointConnector(sites=[SITE_URL])
|
||||
# msal_app and sp_tenant_domain are None — probe must be skipped.
|
||||
connector.validate_connector_settings() # should not raise
|
||||
mock_acquire.assert_not_called()
|
||||
@@ -1,243 +0,0 @@
|
||||
"""Unit tests for WebConnector.retrieve_all_slim_docs (slim pruning path)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import patch
|
||||
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.web.connector import WEB_CONNECTOR_VALID_SETTINGS
|
||||
from onyx.connectors.web.connector import WebConnector
|
||||
|
||||
BASE_URL = "http://example.com"
|
||||
|
||||
SINGLE_PAGE_HTML = (
|
||||
"<html><body><p>Content that should not appear in slim output</p></body></html>"
|
||||
)
|
||||
|
||||
RECURSIVE_ROOT_HTML = """
|
||||
<html><body>
|
||||
<a href="/page2">Page 2</a>
|
||||
<a href="/page3">Page 3</a>
|
||||
</body></html>
|
||||
"""
|
||||
|
||||
PAGE2_HTML = "<html><body><p>page 2</p></body></html>"
|
||||
PAGE3_HTML = "<html><body><p>page 3</p></body></html>"
|
||||
|
||||
|
||||
def _make_playwright_context_mock(url_to_html: dict[str, str]) -> MagicMock:
|
||||
"""Return a BrowserContext mock whose pages respond based on goto URL."""
|
||||
context = MagicMock()
|
||||
|
||||
def _new_page() -> MagicMock:
|
||||
page = MagicMock()
|
||||
visited: list[str] = []
|
||||
|
||||
def _goto(url: str, **kwargs: Any) -> MagicMock: # noqa: ARG001
|
||||
visited.append(url)
|
||||
page.url = url
|
||||
response = MagicMock()
|
||||
response.status = 200
|
||||
response.header_value.return_value = None # no cf-ray
|
||||
return response
|
||||
|
||||
def _content() -> str:
|
||||
return url_to_html.get(
|
||||
visited[-1] if visited else "", "<html><body></body></html>"
|
||||
)
|
||||
|
||||
page.goto.side_effect = _goto
|
||||
page.content.side_effect = _content
|
||||
return page
|
||||
|
||||
context.new_page.side_effect = _new_page
|
||||
return context
|
||||
|
||||
|
||||
def _make_playwright_mock() -> MagicMock:
|
||||
playwright = MagicMock()
|
||||
playwright.stop = MagicMock()
|
||||
return playwright
|
||||
|
||||
|
||||
def _make_page_mock(
|
||||
html: str, cf_ray: str | None = None, status: int = 200
|
||||
) -> MagicMock:
|
||||
"""Return a Playwright page mock with configurable status and CF header."""
|
||||
page = MagicMock()
|
||||
page.url = BASE_URL + "/"
|
||||
response = MagicMock()
|
||||
response.status = status
|
||||
response.header_value.side_effect = lambda h: cf_ray if h == "cf-ray" else None
|
||||
page.goto.return_value = response
|
||||
page.content.return_value = html
|
||||
return page
|
||||
|
||||
|
||||
@patch("onyx.connectors.web.connector.check_internet_connection")
|
||||
@patch("onyx.connectors.web.connector.requests.head")
|
||||
@patch("onyx.connectors.web.connector.start_playwright")
|
||||
def test_slim_yields_slim_documents(
|
||||
mock_start_playwright: MagicMock,
|
||||
mock_head: MagicMock,
|
||||
_mock_check: MagicMock,
|
||||
) -> None:
|
||||
"""retrieve_all_slim_docs yields SlimDocuments with the correct URL as id."""
|
||||
context = _make_playwright_context_mock({BASE_URL + "/": SINGLE_PAGE_HTML})
|
||||
mock_start_playwright.return_value = (_make_playwright_mock(), context)
|
||||
mock_head.return_value.headers = {"content-type": "text/html"}
|
||||
|
||||
connector = WebConnector(
|
||||
base_url=BASE_URL + "/",
|
||||
web_connector_type=WEB_CONNECTOR_VALID_SETTINGS.SINGLE.value,
|
||||
)
|
||||
|
||||
docs = [doc for batch in connector.retrieve_all_slim_docs() for doc in batch]
|
||||
|
||||
assert len(docs) == 1
|
||||
assert isinstance(docs[0], SlimDocument)
|
||||
assert docs[0].id == BASE_URL + "/"
|
||||
|
||||
|
||||
@patch("onyx.connectors.web.connector.check_internet_connection")
|
||||
@patch("onyx.connectors.web.connector.requests.head")
|
||||
@patch("onyx.connectors.web.connector.start_playwright")
|
||||
def test_slim_skips_content_extraction(
|
||||
mock_start_playwright: MagicMock,
|
||||
mock_head: MagicMock,
|
||||
_mock_check: MagicMock,
|
||||
) -> None:
|
||||
"""web_html_cleanup is never called in slim mode."""
|
||||
context = _make_playwright_context_mock({BASE_URL + "/": SINGLE_PAGE_HTML})
|
||||
mock_start_playwright.return_value = (_make_playwright_mock(), context)
|
||||
mock_head.return_value.headers = {"content-type": "text/html"}
|
||||
|
||||
connector = WebConnector(
|
||||
base_url=BASE_URL + "/",
|
||||
web_connector_type=WEB_CONNECTOR_VALID_SETTINGS.SINGLE.value,
|
||||
)
|
||||
|
||||
with patch("onyx.connectors.web.connector.web_html_cleanup") as mock_cleanup:
|
||||
list(connector.retrieve_all_slim_docs())
|
||||
mock_cleanup.assert_not_called()
|
||||
|
||||
|
||||
@patch("onyx.connectors.web.connector.check_internet_connection")
@patch("onyx.connectors.web.connector.requests.head")
@patch("onyx.connectors.web.connector.start_playwright")
def test_slim_discovers_links_recursively(
    mock_start_playwright: MagicMock,
    mock_head: MagicMock,
    _mock_check: MagicMock,
) -> None:
    """RECURSIVE mode yields a SlimDocument id for the root and both linked pages."""
    root_url = BASE_URL + "/"
    page2_url = BASE_URL + "/page2"
    page3_url = BASE_URL + "/page3"

    pages = {
        root_url: RECURSIVE_ROOT_HTML,
        page2_url: PAGE2_HTML,
        page3_url: PAGE3_HTML,
    }
    browser_context = _make_playwright_context_mock(pages)
    mock_start_playwright.return_value = (_make_playwright_mock(), browser_context)
    mock_head.return_value.headers = {"content-type": "text/html"}

    connector = WebConnector(
        base_url=root_url,
        web_connector_type=WEB_CONNECTOR_VALID_SETTINGS.RECURSIVE.value,
    )

    # Collect ids of SlimDocument entries only; other batch items are ignored.
    discovered = set()
    for batch in connector.retrieve_all_slim_docs():
        for entry in batch:
            if isinstance(entry, SlimDocument):
                discovered.add(entry.id)

    assert discovered == {root_url, page2_url, page3_url}
|
||||
|
||||
|
||||
@patch("onyx.connectors.web.connector.check_internet_connection")
@patch("onyx.connectors.web.connector.requests.head")
@patch("onyx.connectors.web.connector.start_playwright")
def test_normal_200_skips_5s_wait(
    mock_start_playwright: MagicMock,
    mock_head: MagicMock,
    _mock_check: MagicMock,
) -> None:
    """A 200 response without a cf-ray header must not call wait_for_timeout."""
    fake_page = _make_page_mock(SINGLE_PAGE_HTML, cf_ray=None, status=200)

    # Every new_page() call hands back the same prepared page mock.
    browser_context = MagicMock()
    browser_context.new_page.return_value = fake_page

    mock_start_playwright.return_value = (_make_playwright_mock(), browser_context)
    mock_head.return_value.headers = {"content-type": "text/html"}

    connector = WebConnector(
        base_url=BASE_URL + "/",
        web_connector_type=WEB_CONNECTOR_VALID_SETTINGS.SINGLE.value,
    )

    for _batch in connector.retrieve_all_slim_docs():
        pass

    fake_page.wait_for_timeout.assert_not_called()
|
||||
|
||||
|
||||
@patch("onyx.connectors.web.connector.check_internet_connection")
@patch("onyx.connectors.web.connector.requests.head")
@patch("onyx.connectors.web.connector.start_playwright")
def test_cloudflare_applies_5s_wait(
    mock_start_playwright: MagicMock,
    mock_head: MagicMock,
    _mock_check: MagicMock,
) -> None:
    """A page carrying a cf-ray header gets exactly one wait_for_timeout(5000)."""
    fake_page = _make_page_mock(SINGLE_PAGE_HTML, cf_ray="abc123-LAX")

    browser_context = MagicMock()
    browser_context.new_page.return_value = fake_page

    mock_start_playwright.return_value = (_make_playwright_mock(), browser_context)
    mock_head.return_value.headers = {"content-type": "text/html"}

    connector = WebConnector(
        base_url=BASE_URL + "/",
        web_connector_type=WEB_CONNECTOR_VALID_SETTINGS.SINGLE.value,
    )

    for _batch in connector.retrieve_all_slim_docs():
        pass

    fake_page.wait_for_timeout.assert_called_once_with(5000)
|
||||
|
||||
|
||||
@patch("onyx.connectors.web.connector.time")
@patch("onyx.connectors.web.connector.check_internet_connection")
@patch("onyx.connectors.web.connector.requests.head")
@patch("onyx.connectors.web.connector.start_playwright")
def test_403_applies_5s_wait(
    mock_start_playwright: MagicMock,
    mock_head: MagicMock,
    _mock_check: MagicMock,
    _mock_time: MagicMock,
) -> None:
    """A 403 status (typical bot-challenge entry point) triggers wait_for_timeout(5000)."""
    fake_page = _make_page_mock(SINGLE_PAGE_HTML, cf_ray=None, status=403)

    browser_context = MagicMock()
    browser_context.new_page.return_value = fake_page

    mock_start_playwright.return_value = (_make_playwright_mock(), browser_context)
    mock_head.return_value.headers = {"content-type": "text/html"}

    connector = WebConnector(
        base_url=BASE_URL + "/",
        web_connector_type=WEB_CONNECTOR_VALID_SETTINGS.SINGLE.value,
    )

    # Each retry sees 403 again, so the run may end with no documents and a
    # RuntimeError. That outcome is acceptable: the only thing under test is
    # that the 5s wait was requested.
    try:
        for _batch in connector.retrieve_all_slim_docs():
            pass
    except RuntimeError:
        pass

    fake_page.wait_for_timeout.assert_called_with(5000)
|
||||
@@ -1,319 +0,0 @@
|
||||
"""Tests for permission-sync-specific Prometheus metrics."""
|
||||
|
||||
import pytest
|
||||
|
||||
from onyx.server.metrics.perm_sync_metrics import DOC_PERM_SYNC_DB_UPDATE_DURATION
|
||||
from onyx.server.metrics.perm_sync_metrics import DOC_PERM_SYNC_DOCS_PROCESSED
|
||||
from onyx.server.metrics.perm_sync_metrics import DOC_PERM_SYNC_DURATION
|
||||
from onyx.server.metrics.perm_sync_metrics import DOC_PERM_SYNC_ERRORS
|
||||
from onyx.server.metrics.perm_sync_metrics import GROUP_SYNC_DURATION
|
||||
from onyx.server.metrics.perm_sync_metrics import GROUP_SYNC_ERRORS
|
||||
from onyx.server.metrics.perm_sync_metrics import GROUP_SYNC_GROUPS_PROCESSED
|
||||
from onyx.server.metrics.perm_sync_metrics import GROUP_SYNC_UPSERT_DURATION
|
||||
from onyx.server.metrics.perm_sync_metrics import GROUP_SYNC_USERS_PROCESSED
|
||||
from onyx.server.metrics.perm_sync_metrics import inc_doc_perm_sync_docs_processed
|
||||
from onyx.server.metrics.perm_sync_metrics import inc_doc_perm_sync_errors
|
||||
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_errors
|
||||
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_groups_processed
|
||||
from onyx.server.metrics.perm_sync_metrics import inc_group_sync_users_processed
|
||||
from onyx.server.metrics.perm_sync_metrics import (
|
||||
observe_doc_perm_sync_db_update_duration,
|
||||
)
|
||||
from onyx.server.metrics.perm_sync_metrics import observe_doc_perm_sync_duration
|
||||
from onyx.server.metrics.perm_sync_metrics import observe_group_sync_duration
|
||||
from onyx.server.metrics.perm_sync_metrics import observe_group_sync_upsert_duration
|
||||
|
||||
|
||||
# --- Doc permission sync: overall duration ---
|
||||
|
||||
|
||||
class TestObserveDocPermSyncDuration:
    """Exercises observe_doc_perm_sync_duration against DOC_PERM_SYNC_DURATION."""

    def test_observes_duration(self) -> None:
        """An observation raises the google_drive child's sum by the observed value."""

        def read_sum():
            return DOC_PERM_SYNC_DURATION.labels(
                connector_type="google_drive"
            )._sum.get()

        baseline = read_sum()
        observe_doc_perm_sync_duration(10.0, "google_drive")
        assert read_sum() == pytest.approx(baseline + 10.0)

    def test_labels_by_connector_type(self) -> None:
        """Only the observed connector_type's sum moves; the other stays put."""

        def read_sum(connector_type):
            return DOC_PERM_SYNC_DURATION.labels(
                connector_type=connector_type
            )._sum.get()

        gd_start = read_sum("google_drive")
        conf_start = read_sum("confluence")

        observe_doc_perm_sync_duration(5.0, "google_drive")

        gd_end = read_sum("google_drive")
        conf_end = read_sum("confluence")

        assert gd_end == pytest.approx(gd_start + 5.0)
        assert conf_end == pytest.approx(conf_start)

    def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A RuntimeError from labels() must not escape the helper."""

        def failing_labels(**_):
            raise RuntimeError("boom")

        monkeypatch.setattr(DOC_PERM_SYNC_DURATION, "labels", failing_labels)
        observe_doc_perm_sync_duration(1.0, "google_drive")
|
||||
|
||||
|
||||
# --- Doc permission sync: DB update duration ---
|
||||
|
||||
|
||||
class TestObserveDocPermSyncDbUpdateDuration:
    """Exercises observe_doc_perm_sync_db_update_duration and its metric."""

    def test_observes_duration(self) -> None:
        """An observation raises the confluence child's sum by the observed value."""

        def read_sum():
            return DOC_PERM_SYNC_DB_UPDATE_DURATION.labels(
                connector_type="confluence"
            )._sum.get()

        baseline = read_sum()
        observe_doc_perm_sync_db_update_duration(3.0, "confluence")
        assert read_sum() == pytest.approx(baseline + 3.0)

    def test_labels_by_connector_type(self) -> None:
        """Observing for confluence leaves the slack child's sum unchanged."""

        def read_sum(connector_type):
            return DOC_PERM_SYNC_DB_UPDATE_DURATION.labels(
                connector_type=connector_type
            )._sum.get()

        conf_start = read_sum("confluence")
        slack_start = read_sum("slack")

        observe_doc_perm_sync_db_update_duration(2.0, "confluence")

        conf_end = read_sum("confluence")
        slack_end = read_sum("slack")

        assert conf_end == pytest.approx(conf_start + 2.0)
        assert slack_end == pytest.approx(slack_start)

    def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A RuntimeError from labels() must not escape the helper."""

        def failing_labels(**_):
            raise RuntimeError("boom")

        monkeypatch.setattr(DOC_PERM_SYNC_DB_UPDATE_DURATION, "labels", failing_labels)
        observe_doc_perm_sync_db_update_duration(1.0, "confluence")
|
||||
|
||||
|
||||
# --- Doc permission sync: docs processed counter ---
|
||||
|
||||
|
||||
class TestIncDocPermSyncDocsProcessed:
    """Exercises inc_doc_perm_sync_docs_processed and DOC_PERM_SYNC_DOCS_PROCESSED."""

    def test_increments_counter(self) -> None:
        """Incrementing by 5 raises the google_drive child's value by 5."""

        def read_value():
            return DOC_PERM_SYNC_DOCS_PROCESSED.labels(
                connector_type="google_drive"
            )._value.get()

        baseline = read_value()
        inc_doc_perm_sync_docs_processed("google_drive", 5)
        assert read_value() == baseline + 5

    def test_labels_by_connector_type(self) -> None:
        """Incrementing google_drive leaves the jira child's value unchanged."""

        def read_value(connector_type):
            return DOC_PERM_SYNC_DOCS_PROCESSED.labels(
                connector_type=connector_type
            )._value.get()

        gd_start = read_value("google_drive")
        jira_start = read_value("jira")

        inc_doc_perm_sync_docs_processed("google_drive", 3)

        gd_end = read_value("google_drive")
        jira_end = read_value("jira")

        assert gd_end == gd_start + 3
        assert jira_end == jira_start

    def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A RuntimeError from labels() must not escape the helper."""

        def failing_labels(**_):
            raise RuntimeError("boom")

        monkeypatch.setattr(DOC_PERM_SYNC_DOCS_PROCESSED, "labels", failing_labels)
        inc_doc_perm_sync_docs_processed("google_drive")
|
||||
|
||||
|
||||
# --- Doc permission sync: errors counter ---
|
||||
|
||||
|
||||
class TestIncDocPermSyncErrors:
    """Exercises inc_doc_perm_sync_errors against DOC_PERM_SYNC_ERRORS."""

    def test_increments_counter(self) -> None:
        """Incrementing by 2 raises the sharepoint child's value by 2."""

        def read_value():
            return DOC_PERM_SYNC_ERRORS.labels(connector_type="sharepoint")._value.get()

        baseline = read_value()
        inc_doc_perm_sync_errors("sharepoint", 2)
        assert read_value() == baseline + 2

    def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A RuntimeError from labels() must not escape the helper."""

        def failing_labels(**_):
            raise RuntimeError("boom")

        monkeypatch.setattr(DOC_PERM_SYNC_ERRORS, "labels", failing_labels)
        inc_doc_perm_sync_errors("sharepoint")
|
||||
|
||||
|
||||
# --- Group sync: overall duration ---
|
||||
|
||||
|
||||
class TestObserveGroupSyncDuration:
    """Exercises observe_group_sync_duration against GROUP_SYNC_DURATION."""

    def test_observes_duration(self) -> None:
        """An observation raises the google_drive child's sum by the observed value."""

        def read_sum():
            return GROUP_SYNC_DURATION.labels(connector_type="google_drive")._sum.get()

        baseline = read_sum()
        observe_group_sync_duration(20.0, "google_drive")
        assert read_sum() == pytest.approx(baseline + 20.0)

    def test_labels_by_connector_type(self) -> None:
        """Observing for google_drive leaves the slack child's sum unchanged."""

        def read_sum(connector_type):
            return GROUP_SYNC_DURATION.labels(connector_type=connector_type)._sum.get()

        gd_start = read_sum("google_drive")
        slack_start = read_sum("slack")

        observe_group_sync_duration(7.0, "google_drive")

        gd_end = read_sum("google_drive")
        slack_end = read_sum("slack")

        assert gd_end == pytest.approx(gd_start + 7.0)
        assert slack_end == pytest.approx(slack_start)

    def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A RuntimeError from labels() must not escape the helper."""

        def failing_labels(**_):
            raise RuntimeError("boom")

        monkeypatch.setattr(GROUP_SYNC_DURATION, "labels", failing_labels)
        observe_group_sync_duration(1.0, "google_drive")
|
||||
|
||||
|
||||
# --- Group sync: upsert duration ---
|
||||
|
||||
|
||||
class TestObserveGroupSyncUpsertDuration:
    """Exercises observe_group_sync_upsert_duration and GROUP_SYNC_UPSERT_DURATION."""

    def test_observes_duration(self) -> None:
        """An observation raises the confluence child's sum by the observed value."""

        def read_sum():
            return GROUP_SYNC_UPSERT_DURATION.labels(
                connector_type="confluence"
            )._sum.get()

        baseline = read_sum()
        observe_group_sync_upsert_duration(4.0, "confluence")
        assert read_sum() == pytest.approx(baseline + 4.0)

    def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A RuntimeError from labels() must not escape the helper."""

        def failing_labels(**_):
            raise RuntimeError("boom")

        monkeypatch.setattr(GROUP_SYNC_UPSERT_DURATION, "labels", failing_labels)
        observe_group_sync_upsert_duration(1.0, "confluence")
|
||||
|
||||
|
||||
# --- Group sync: groups processed counter ---
|
||||
|
||||
|
||||
class TestIncGroupSyncGroupsProcessed:
    """Exercises inc_group_sync_groups_processed and GROUP_SYNC_GROUPS_PROCESSED."""

    def test_increments_counter(self) -> None:
        """Incrementing by 10 raises the github child's value by 10."""

        def read_value():
            return GROUP_SYNC_GROUPS_PROCESSED.labels(
                connector_type="github"
            )._value.get()

        baseline = read_value()
        inc_group_sync_groups_processed("github", 10)
        assert read_value() == baseline + 10

    def test_labels_by_connector_type(self) -> None:
        """Incrementing github leaves the slack child's value unchanged."""

        def read_value(connector_type):
            return GROUP_SYNC_GROUPS_PROCESSED.labels(
                connector_type=connector_type
            )._value.get()

        gh_start = read_value("github")
        slack_start = read_value("slack")

        inc_group_sync_groups_processed("github", 4)

        gh_end = read_value("github")
        slack_end = read_value("slack")

        assert gh_end == gh_start + 4
        assert slack_end == slack_start

    def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A RuntimeError from labels() must not escape the helper."""

        def failing_labels(**_):
            raise RuntimeError("boom")

        monkeypatch.setattr(GROUP_SYNC_GROUPS_PROCESSED, "labels", failing_labels)
        inc_group_sync_groups_processed("github")
|
||||
|
||||
|
||||
# --- Group sync: users processed counter ---
|
||||
|
||||
|
||||
class TestIncGroupSyncUsersProcessed:
    """Exercises inc_group_sync_users_processed and GROUP_SYNC_USERS_PROCESSED."""

    def test_increments_counter(self) -> None:
        """Incrementing by 25 raises the github child's value by 25."""

        def read_value():
            return GROUP_SYNC_USERS_PROCESSED.labels(
                connector_type="github"
            )._value.get()

        baseline = read_value()
        inc_group_sync_users_processed("github", 25)
        assert read_value() == baseline + 25

    def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A RuntimeError from labels() must not escape the helper."""

        def failing_labels(**_):
            raise RuntimeError("boom")

        monkeypatch.setattr(GROUP_SYNC_USERS_PROCESSED, "labels", failing_labels)
        inc_group_sync_users_processed("github")
|
||||
|
||||
|
||||
# --- Group sync: errors counter ---
|
||||
|
||||
|
||||
class TestIncGroupSyncErrors:
    """Exercises inc_group_sync_errors against GROUP_SYNC_ERRORS."""

    def test_increments_counter(self) -> None:
        """Calling with only the connector type raises the sharepoint value by 1."""

        def read_value():
            return GROUP_SYNC_ERRORS.labels(connector_type="sharepoint")._value.get()

        baseline = read_value()
        inc_group_sync_errors("sharepoint")
        assert read_value() == baseline + 1

    def test_does_not_raise_on_exception(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A RuntimeError from labels() must not escape the helper."""

        def failing_labels(**_):
            raise RuntimeError("boom")

        monkeypatch.setattr(GROUP_SYNC_ERRORS, "labels", failing_labels)
        inc_group_sync_errors("sharepoint")
|
||||
@@ -5,7 +5,7 @@ home: https://www.onyx.app/
|
||||
sources:
|
||||
- "https://github.com/onyx-dot-app/onyx"
|
||||
type: application
|
||||
version: 0.4.47
|
||||
version: 0.4.46
|
||||
appVersion: latest
|
||||
annotations:
|
||||
category: Productivity
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
{{- range .Values.extraManifests }}
|
||||
---
|
||||
{{- if kindIs "string" . }}
|
||||
{{ tpl . $ }}
|
||||
{{- else }}
|
||||
{{ tpl (toYaml .) $ }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
@@ -1316,26 +1316,3 @@ configMap:
|
||||
HARD_DELETE_CHATS: ""
|
||||
MAX_ALLOWED_UPLOAD_SIZE_MB: ""
|
||||
DEFAULT_USER_FILE_MAX_UPLOAD_SIZE_MB: ""
|
||||
|
||||
# -- Additional arbitrary manifests to render as part of the release. Each entry
|
||||
# may be either a YAML mapping (a single Kubernetes object) or a multi-line
|
||||
# string that will be passed through `tpl` so it can reference release values.
|
||||
# Useful for injecting resources not covered by the chart (e.g. NetworkPolicies,
|
||||
# ExternalSecrets, custom CRs) without forking the chart.
|
||||
extraManifests: []
|
||||
# extraManifests:
|
||||
# - apiVersion: v1
|
||||
# kind: ConfigMap
|
||||
# metadata:
|
||||
# name: my-extra-config
|
||||
# data:
|
||||
# key: value
|
||||
# - |
|
||||
# apiVersion: networking.k8s.io/v1
|
||||
# kind: NetworkPolicy
|
||||
# metadata:
|
||||
# name: {{ include "onyx.fullname" . }}-extra
|
||||
# spec:
|
||||
# podSelector: {}
|
||||
# policyTypes:
|
||||
# - Ingress
|
||||
|
||||
377
desktop/src-tauri/Cargo.lock
generated
377
desktop/src-tauri/Cargo.lock
generated
@@ -82,28 +82,6 @@ version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
|
||||
|
||||
[[package]]
|
||||
name = "aws-lc-rs"
|
||||
version = "1.16.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a054912289d18629dc78375ba2c3726a3afe3ff71b4edba9dedfca0e3446d1fc"
|
||||
dependencies = [
|
||||
"aws-lc-sys",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-lc-sys"
|
||||
version = "0.39.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "83a25cf98105baa966497416dbd42565ce3a8cf8dbfd59803ec9ad46f3126399"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cmake",
|
||||
"dunce",
|
||||
"fs_extra",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.21.7"
|
||||
@@ -277,8 +255,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aebf35691d1bfb0ac386a69bac2fde4dd276fb618cf8bf4f5318fe285e821bb2"
|
||||
dependencies = [
|
||||
"find-msvc-tools",
|
||||
"jobserver",
|
||||
"libc",
|
||||
"shlex",
|
||||
]
|
||||
|
||||
@@ -315,12 +291,6 @@ version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
|
||||
|
||||
[[package]]
|
||||
name = "cfg_aliases"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.44"
|
||||
@@ -333,15 +303,6 @@ dependencies = [
|
||||
"windows-link 0.2.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cmake"
|
||||
version = "0.1.58"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678"
|
||||
dependencies = [
|
||||
"cc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "combine"
|
||||
version = "4.6.7"
|
||||
@@ -819,12 +780,6 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fs_extra"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "futf"
|
||||
version = "0.1.5"
|
||||
@@ -1042,10 +997,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"js-sys",
|
||||
"libc",
|
||||
"wasi 0.11.1+wasi-snapshot-preview1",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1055,11 +1008,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"js-sys",
|
||||
"libc",
|
||||
"r-efi 5.3.0",
|
||||
"wasip2",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1334,22 +1285,6 @@ dependencies = [
|
||||
"want",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-rustls"
|
||||
version = "0.27.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58"
|
||||
dependencies = [
|
||||
"http",
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"rustls",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.20"
|
||||
@@ -1652,16 +1587,6 @@ version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130"
|
||||
|
||||
[[package]]
|
||||
name = "jobserver"
|
||||
version = "0.1.34"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33"
|
||||
dependencies = [
|
||||
"getrandom 0.3.4",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.91"
|
||||
@@ -1793,12 +1718,6 @@ version = "0.4.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
|
||||
|
||||
[[package]]
|
||||
name = "lru-slab"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
|
||||
|
||||
[[package]]
|
||||
name = "mac"
|
||||
version = "0.1.1"
|
||||
@@ -2122,7 +2041,6 @@ name = "onyx"
|
||||
version = "0.0.0-dev"
|
||||
dependencies = [
|
||||
"directories",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tauri",
|
||||
@@ -2147,12 +2065,6 @@ dependencies = [
|
||||
"pathdiff",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openssl-probe"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
|
||||
|
||||
[[package]]
|
||||
name = "option-ext"
|
||||
version = "0.2.0"
|
||||
@@ -2543,62 +2455,6 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn"
|
||||
version = "0.11.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"cfg_aliases",
|
||||
"pin-project-lite",
|
||||
"quinn-proto",
|
||||
"quinn-udp",
|
||||
"rustc-hash",
|
||||
"rustls",
|
||||
"socket2",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"web-time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn-proto"
|
||||
version = "0.11.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098"
|
||||
dependencies = [
|
||||
"aws-lc-rs",
|
||||
"bytes",
|
||||
"getrandom 0.3.4",
|
||||
"lru-slab",
|
||||
"rand 0.9.2",
|
||||
"ring",
|
||||
"rustc-hash",
|
||||
"rustls",
|
||||
"rustls-pki-types",
|
||||
"slab",
|
||||
"thiserror 2.0.18",
|
||||
"tinyvec",
|
||||
"tracing",
|
||||
"web-time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn-udp"
|
||||
version = "0.5.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd"
|
||||
dependencies = [
|
||||
"cfg_aliases",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"socket2",
|
||||
"tracing",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.45"
|
||||
@@ -2645,16 +2501,6 @@ dependencies = [
|
||||
"rand_core 0.6.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
|
||||
dependencies = [
|
||||
"rand_chacha 0.9.0",
|
||||
"rand_core 0.9.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.2.2"
|
||||
@@ -2675,16 +2521,6 @@ dependencies = [
|
||||
"rand_core 0.6.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
|
||||
dependencies = [
|
||||
"ppv-lite86",
|
||||
"rand_core 0.9.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.5.1"
|
||||
@@ -2703,15 +2539,6 @@ dependencies = [
|
||||
"getrandom 0.2.17",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.9.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c"
|
||||
dependencies = [
|
||||
"getrandom 0.3.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_hc"
|
||||
version = "0.2.0"
|
||||
@@ -2830,21 +2657,15 @@ dependencies = [
|
||||
"http-body",
|
||||
"http-body-util",
|
||||
"hyper",
|
||||
"hyper-rustls",
|
||||
"hyper-util",
|
||||
"js-sys",
|
||||
"log",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"quinn",
|
||||
"rustls",
|
||||
"rustls-pki-types",
|
||||
"rustls-platform-verifier",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sync_wrapper",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower-http",
|
||||
@@ -2856,26 +2677,6 @@ dependencies = [
|
||||
"web-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"getrandom 0.2.17",
|
||||
"libc",
|
||||
"untrusted",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "2.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe"
|
||||
|
||||
[[package]]
|
||||
name = "rustc_version"
|
||||
version = "0.4.1"
|
||||
@@ -2885,81 +2686,6 @@ dependencies = [
|
||||
"semver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.23.37"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4"
|
||||
dependencies = [
|
||||
"aws-lc-rs",
|
||||
"once_cell",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls-pki-types",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pki-types"
|
||||
version = "1.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd"
|
||||
dependencies = [
|
||||
"web-time",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-platform-verifier"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784"
|
||||
dependencies = [
|
||||
"core-foundation",
|
||||
"core-foundation-sys",
|
||||
"jni",
|
||||
"log",
|
||||
"once_cell",
|
||||
"rustls",
|
||||
"rustls-native-certs",
|
||||
"rustls-platform-verifier-android",
|
||||
"rustls-webpki",
|
||||
"security-framework",
|
||||
"security-framework-sys",
|
||||
"webpki-root-certs",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-platform-verifier-android"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f"
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.103.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef"
|
||||
dependencies = [
|
||||
"aws-lc-rs",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustversion"
|
||||
version = "1.0.22"
|
||||
@@ -2975,15 +2701,6 @@ dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
version = "0.1.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939"
|
||||
dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schemars"
|
||||
version = "0.8.22"
|
||||
@@ -3041,29 +2758,6 @@ version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "security-framework"
|
||||
version = "3.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d"
|
||||
dependencies = [
|
||||
"bitflags 2.11.0",
|
||||
"core-foundation",
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
"security-framework-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "security-framework-sys"
|
||||
version = "2.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3"
|
||||
dependencies = [
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "selectors"
|
||||
version = "0.24.0"
|
||||
@@ -3434,12 +3128,6 @@ version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
|
||||
|
||||
[[package]]
|
||||
name = "subtle"
|
||||
version = "2.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
|
||||
|
||||
[[package]]
|
||||
name = "swift-rs"
|
||||
version = "1.0.7"
|
||||
@@ -3921,21 +3609,6 @@ dependencies = [
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tinyvec"
|
||||
version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3"
|
||||
dependencies = [
|
||||
"tinyvec_macros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tinyvec_macros"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.50.0"
|
||||
@@ -3950,16 +3623,6 @@ dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-rustls"
|
||||
version = "0.26.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61"
|
||||
dependencies = [
|
||||
"rustls",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.18"
|
||||
@@ -4241,12 +3904,6 @@ version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
|
||||
|
||||
[[package]]
|
||||
name = "untrusted"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
|
||||
|
||||
[[package]]
|
||||
name = "url"
|
||||
version = "2.5.8"
|
||||
@@ -4493,16 +4150,6 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "web-time"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
|
||||
dependencies = [
|
||||
"js-sys",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webkit2gtk"
|
||||
version = "2.0.2"
|
||||
@@ -4547,15 +4194,6 @@ dependencies = [
|
||||
"system-deps",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webpki-root-certs"
|
||||
version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca"
|
||||
dependencies = [
|
||||
"rustls-pki-types",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webview2-com"
|
||||
version = "0.38.2"
|
||||
@@ -4810,15 +4448,6 @@ dependencies = [
|
||||
"windows-targets 0.48.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
|
||||
dependencies = [
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.59.0"
|
||||
@@ -5359,12 +4988,6 @@ dependencies = [
|
||||
"synstructure",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zeroize"
|
||||
version = "1.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0"
|
||||
|
||||
[[package]]
|
||||
name = "zerotrie"
|
||||
version = "0.2.3"
|
||||
|
||||
@@ -19,7 +19,6 @@ directories = "5.0"
|
||||
tokio = { version = "1", features = ["time"] }
|
||||
window-vibrancy = "0.7.1"
|
||||
url = "2.5"
|
||||
reqwest = { version = "0.13", default-features = false, features = ["rustls"] }
|
||||
|
||||
[features]
|
||||
default = ["custom-protocol"]
|
||||
|
||||
@@ -583,31 +583,6 @@ fn open_in_default_browser(url: &str) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
async fn check_server_reachable(state: tauri::State<'_, ConfigState>) -> Result<(), String> {
|
||||
let url = state.config.read().unwrap().server_url.clone();
|
||||
let parsed = Url::parse(&url).map_err(|e| format!("Invalid URL: {}", e))?;
|
||||
match parsed.scheme() {
|
||||
"http" | "https" => {}
|
||||
_ => return Err("URL must use http or https".to_string()),
|
||||
}
|
||||
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(std::time::Duration::from_secs(5))
|
||||
.build()
|
||||
.map_err(|e| format!("Failed to build HTTP client: {}", e))?;
|
||||
|
||||
match client.head(parsed).send().await {
|
||||
Ok(_) => Ok(()),
|
||||
// Only definitive "server didn't answer" errors count as unreachable.
|
||||
// TLS / decode / redirect errors imply the server is listening — the
|
||||
// webview, which has its own trust store, is likely to succeed even
|
||||
// when rustls rejects a self-signed cert.
|
||||
Err(e) if e.is_connect() || e.is_timeout() => Err(e.to_string()),
|
||||
Err(_) => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
fn open_in_browser(url: String) -> Result<(), String> {
|
||||
let parsed_url = Url::parse(&url).map_err(|_| "Invalid URL".to_string())?;
|
||||
@@ -1319,7 +1294,6 @@ fn main() {
|
||||
get_bootstrap_state,
|
||||
set_server_url,
|
||||
get_config_path_cmd,
|
||||
check_server_reachable,
|
||||
open_in_browser,
|
||||
open_config_file,
|
||||
open_config_directory,
|
||||
|
||||
@@ -459,19 +459,8 @@
|
||||
return;
|
||||
}
|
||||
|
||||
// Not first launch and not explicit settings — confirm the server
|
||||
// is reachable before handing the webview over. Otherwise the user
|
||||
// lands on a native "connection refused" page with no way back.
|
||||
try {
|
||||
await invoke("check_server_reachable");
|
||||
} catch (reachErr) {
|
||||
showSettings();
|
||||
showError(
|
||||
`Could not connect to ${currentServerUrl}. Check the URL and your network connection.`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Not first launch and not explicit settings
|
||||
// Auto-redirect to configured domain
|
||||
window.location.href = currentServerUrl;
|
||||
} catch (error) {
|
||||
// On error, default to cloud
|
||||
|
||||
@@ -18,18 +18,15 @@ docker compose up -d
|
||||
|
||||
- **Onyx DB Pool Health** — PostgreSQL connection pool utilization
|
||||
- **Onyx Indexing Pipeline v2** — Per-connector indexing throughput, queue depth, task latency
|
||||
- **Onyx Permission Sync** — Doc permission sync and external group sync duration, throughput, errors, and Celery task metrics
|
||||
|
||||
## Scrape targets
|
||||
|
||||
| Job | Port | Source |
|
||||
|----------------------------|-------|-------------------------------|
|
||||
| `onyx-api-server` | 8080 | FastAPI `/metrics` (matches `.vscode/launch.json`) |
|
||||
| `onyx-monitoring-worker` | 9096 | Celery monitoring worker |
|
||||
| `onyx-docfetching-worker` | 9092 | Celery docfetching worker |
|
||||
| `onyx-docprocessing-worker`| 9093 | Celery docprocessing worker |
|
||||
| `onyx-heavy-worker` | 9094 | Celery heavy worker (pruning, perm sync, group sync) |
|
||||
| `onyx-light-worker` | 9095 | Celery light worker (vespa sync, deletion, permissions upsert) |
|
||||
| Job | Port | Source |
|
||||
|--------------------------|-------|-------------------------------|
|
||||
| `onyx-api-server` | 8080 | FastAPI `/metrics` (matches `.vscode/launch.json`) |
|
||||
| `onyx-monitoring-worker` | 9096 | Celery monitoring worker |
|
||||
| `onyx-docfetching-worker`| 9092 | Celery docfetching worker |
|
||||
| `onyx-docprocessing-worker`| 9093 | Celery docprocessing worker |
|
||||
|
||||
## Environment variables
|
||||
|
||||
|
||||
@@ -1,697 +0,0 @@
|
||||
{
|
||||
"id": null,
|
||||
"annotations": {
|
||||
"list": [
|
||||
{
|
||||
"builtIn": 1,
|
||||
"datasource": {
|
||||
"type": "grafana",
|
||||
"uid": "-- Grafana --"
|
||||
},
|
||||
"enable": true,
|
||||
"hide": true,
|
||||
"iconColor": "rgba(0, 211, 255, 1)",
|
||||
"name": "Annotations & Alerts",
|
||||
"type": "dashboard"
|
||||
}
|
||||
]
|
||||
},
|
||||
"editable": true,
|
||||
"fiscalYearStartMonth": 0,
|
||||
"graphTooltip": 1,
|
||||
"links": [],
|
||||
"panels": [
|
||||
{
|
||||
"collapsed": false,
|
||||
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 },
|
||||
"id": 100,
|
||||
"title": "Doc Permission Sync",
|
||||
"type": "row"
|
||||
},
|
||||
{
|
||||
"title": "Doc Perm Sync Duration (p50 / p95 / p99)",
|
||||
"description": "Overall duration of doc permission sync by connector type",
|
||||
"type": "timeseries",
|
||||
"id": 1,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 1 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "s",
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 10,
|
||||
"pointSize": 5,
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "histogram_quantile(0.50, sum(rate(onyx_doc_perm_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p50",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.95, sum(rate(onyx_doc_perm_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p95",
|
||||
"refId": "B"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.99, sum(rate(onyx_doc_perm_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p99",
|
||||
"refId": "C"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Doc Perm Sync DB Update Duration (p50 / p95 / p99)",
|
||||
"description": "Cumulative per-element DB update time within a single sync run",
|
||||
"type": "timeseries",
|
||||
"id": 2,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 1 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "s",
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 10,
|
||||
"pointSize": 5,
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "histogram_quantile(0.50, sum(rate(onyx_doc_perm_sync_db_update_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p50",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.95, sum(rate(onyx_doc_perm_sync_db_update_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p95",
|
||||
"refId": "B"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.99, sum(rate(onyx_doc_perm_sync_db_update_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p99",
|
||||
"refId": "C"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Doc Perm Sync Throughput (docs/min)",
|
||||
"description": "Rate of documents successfully synced per minute",
|
||||
"type": "timeseries",
|
||||
"id": 3,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 8, "x": 0, "y": 9 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "docs/min",
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 18,
|
||||
"pointSize": 5,
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "sum(rate(onyx_doc_perm_sync_docs_processed_total{connector_type=~\"$connector_type\"}[$__rate_interval])) by (connector_type) * 60",
|
||||
"legendFormat": "{{connector_type}}",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Doc Perm Sync Error Rate",
|
||||
"description": "Rate of document permission errors per minute",
|
||||
"type": "timeseries",
|
||||
"id": 4,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 8, "x": 8, "y": 9 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "errors/min",
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 18,
|
||||
"pointSize": 5,
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "sum(rate(onyx_doc_perm_sync_errors_total{connector_type=~\"$connector_type\"}[$__rate_interval])) by (connector_type) * 60",
|
||||
"legendFormat": "{{connector_type}}",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Doc Perm Sync Totals",
|
||||
"description": "Cumulative docs synced and errors",
|
||||
"type": "stat",
|
||||
"id": 5,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 8, "x": 16, "y": 9 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": { "mode": "thresholds" },
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{ "color": "green", "value": null }
|
||||
]
|
||||
}
|
||||
},
|
||||
"overrides": [
|
||||
{
|
||||
"matcher": { "id": "byName", "options": "Errors" },
|
||||
"properties": [
|
||||
{
|
||||
"id": "thresholds",
|
||||
"value": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{ "color": "green", "value": null },
|
||||
{ "color": "red", "value": 1 }
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"options": {
|
||||
"reduceOptions": { "calcs": ["lastNotNull"] },
|
||||
"orientation": "horizontal",
|
||||
"textMode": "auto",
|
||||
"colorMode": "value",
|
||||
"graphMode": "area"
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "sum(onyx_doc_perm_sync_docs_processed_total{connector_type=~\"$connector_type\"})",
|
||||
"legendFormat": "Docs Synced",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "sum(onyx_doc_perm_sync_errors_total{connector_type=~\"$connector_type\"})",
|
||||
"legendFormat": "Errors",
|
||||
"refId": "B"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"collapsed": false,
|
||||
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 17 },
|
||||
"id": 101,
|
||||
"title": "External Group Sync",
|
||||
"type": "row"
|
||||
},
|
||||
{
|
||||
"title": "Group Sync Duration (p50 / p95 / p99)",
|
||||
"description": "Overall duration of external group sync by connector type",
|
||||
"type": "timeseries",
|
||||
"id": 6,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 18 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "s",
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 10,
|
||||
"pointSize": 5,
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "histogram_quantile(0.50, sum(rate(onyx_group_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p50",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.95, sum(rate(onyx_group_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p95",
|
||||
"refId": "B"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.99, sum(rate(onyx_group_sync_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p99",
|
||||
"refId": "C"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Group Sync Upsert Duration (p50 / p95 / p99)",
|
||||
"description": "Cumulative batch upsert time within a single group sync run",
|
||||
"type": "timeseries",
|
||||
"id": 7,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 18 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "s",
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 10,
|
||||
"pointSize": 5,
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "histogram_quantile(0.50, sum(rate(onyx_group_sync_upsert_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p50",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.95, sum(rate(onyx_group_sync_upsert_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p95",
|
||||
"refId": "B"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.99, sum(rate(onyx_group_sync_upsert_duration_seconds_bucket{connector_type=~\"$connector_type\"}[$__rate_interval])) by (le, connector_type))",
|
||||
"legendFormat": "{{connector_type}} p99",
|
||||
"refId": "C"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Group Sync Throughput (groups/min)",
|
||||
"description": "Rate of groups processed per minute",
|
||||
"type": "timeseries",
|
||||
"id": 8,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 8, "x": 0, "y": 26 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "groups/min",
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 18,
|
||||
"pointSize": 5,
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "sum(rate(onyx_group_sync_groups_processed_total{connector_type=~\"$connector_type\"}[$__rate_interval])) by (connector_type) * 60",
|
||||
"legendFormat": "{{connector_type}} groups",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "sum(rate(onyx_group_sync_users_processed_total{connector_type=~\"$connector_type\"}[$__rate_interval])) by (connector_type) * 60",
|
||||
"legendFormat": "{{connector_type}} users",
|
||||
"refId": "B"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Group Sync Error Rate",
|
||||
"description": "Rate of errors during external group sync per minute",
|
||||
"type": "timeseries",
|
||||
"id": 9,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 8, "x": 8, "y": 26 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "errors/min",
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 18,
|
||||
"pointSize": 5,
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "sum(rate(onyx_group_sync_errors_total{connector_type=~\"$connector_type\"}[$__rate_interval])) by (connector_type) * 60",
|
||||
"legendFormat": "{{connector_type}}",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Group Sync Totals",
|
||||
"description": "Cumulative groups, users processed and errors",
|
||||
"type": "stat",
|
||||
"id": 10,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 8, "x": 16, "y": 26 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": { "mode": "thresholds" },
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{ "color": "green", "value": null }
|
||||
]
|
||||
}
|
||||
},
|
||||
"overrides": [
|
||||
{
|
||||
"matcher": { "id": "byName", "options": "Errors" },
|
||||
"properties": [
|
||||
{
|
||||
"id": "thresholds",
|
||||
"value": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{ "color": "green", "value": null },
|
||||
{ "color": "red", "value": 1 }
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"options": {
|
||||
"reduceOptions": { "calcs": ["lastNotNull"] },
|
||||
"orientation": "horizontal",
|
||||
"textMode": "auto",
|
||||
"colorMode": "value",
|
||||
"graphMode": "area"
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "sum(onyx_group_sync_groups_processed_total{connector_type=~\"$connector_type\"})",
|
||||
"legendFormat": "Groups",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "sum(onyx_group_sync_users_processed_total{connector_type=~\"$connector_type\"})",
|
||||
"legendFormat": "Users",
|
||||
"refId": "B"
|
||||
},
|
||||
{
|
||||
"expr": "sum(onyx_group_sync_errors_total{connector_type=~\"$connector_type\"})",
|
||||
"legendFormat": "Errors",
|
||||
"refId": "C"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"collapsed": false,
|
||||
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 34 },
|
||||
"id": 102,
|
||||
"title": "Celery Task Metrics (Perm Sync Tasks)",
|
||||
"type": "row"
|
||||
},
|
||||
{
|
||||
"title": "Perm Sync Celery Task Duration (p95)",
|
||||
"description": "Task execution duration for permission sync celery tasks",
|
||||
"type": "timeseries",
|
||||
"id": 11,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 35 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "s",
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 10,
|
||||
"pointSize": 5,
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "histogram_quantile(0.95, sum(rate(onyx_celery_task_duration_seconds_bucket{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task|check_for_doc_permissions_sync|check_for_external_group_sync\"}[$__rate_interval])) by (le, task_name))",
|
||||
"legendFormat": "{{task_name}} p95",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Perm Sync Celery Task Outcomes",
|
||||
"description": "Success vs failure counts for permission sync celery tasks",
|
||||
"type": "timeseries",
|
||||
"id": 12,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 35 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "bars",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 50,
|
||||
"pointSize": 5,
|
||||
"showPoints": "never",
|
||||
"spanNulls": false,
|
||||
"stacking": { "mode": "normal", "group": "A" },
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["sum"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "sum(increase(onyx_celery_task_completed_total{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\", outcome=\"success\"}[$__rate_interval])) by (task_name)",
|
||||
"legendFormat": "{{task_name}} success",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "sum(increase(onyx_celery_task_completed_total{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\", outcome=\"failure\"}[$__rate_interval])) by (task_name)",
|
||||
"legendFormat": "{{task_name}} failure",
|
||||
"refId": "B"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Perm Sync Celery Revoked / Retried / Rejected",
|
||||
"description": "Revocation, retry, and rejection counts for perm sync tasks",
|
||||
"type": "timeseries",
|
||||
"id": 13,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 43 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "bars",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 50,
|
||||
"pointSize": 5,
|
||||
"showPoints": "never",
|
||||
"spanNulls": false,
|
||||
"stacking": { "mode": "normal", "group": "A" },
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["sum"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "sum(increase(onyx_celery_task_revoked_total{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\"}[$__rate_interval])) by (task_name)",
|
||||
"legendFormat": "{{task_name}} revoked",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "sum(increase(onyx_celery_task_retried_total{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\"}[$__rate_interval])) by (task_name)",
|
||||
"legendFormat": "{{task_name}} retried",
|
||||
"refId": "B"
|
||||
},
|
||||
{
|
||||
"expr": "sum(increase(onyx_celery_task_rejected_total{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\"}[$__rate_interval])) by (task_name)",
|
||||
"legendFormat": "{{task_name}} rejected",
|
||||
"refId": "C"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"title": "Perm Sync Celery Queue Wait Time (p95)",
|
||||
"description": "Time perm sync tasks waited in queue before execution",
|
||||
"type": "timeseries",
|
||||
"id": 14,
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 43 },
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "s",
|
||||
"color": { "mode": "palette-classic" },
|
||||
"custom": {
|
||||
"drawStyle": "line",
|
||||
"lineInterpolation": "linear",
|
||||
"fillOpacity": 10,
|
||||
"pointSize": 5,
|
||||
"showPoints": "auto",
|
||||
"spanNulls": false,
|
||||
"axisBorderShow": false,
|
||||
"axisPlacement": "auto"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": { "displayMode": "table", "placement": "bottom", "calcs": ["mean", "max"] },
|
||||
"tooltip": { "mode": "multi", "sort": "desc" }
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "histogram_quantile(0.95, sum(rate(onyx_celery_task_queue_wait_seconds_bucket{task_name=~\"connector_permission_sync_generator_task|connector_external_group_sync_generator_task\"}[$__rate_interval])) by (le, task_name))",
|
||||
"legendFormat": "{{task_name}} p95",
|
||||
"refId": "A"
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"schemaVersion": 39,
|
||||
"tags": ["onyx", "permissions", "sync"],
|
||||
"templating": {
|
||||
"list": [
|
||||
{
|
||||
"current": { "text": "Prometheus", "value": "prometheus" },
|
||||
"includeAll": false,
|
||||
"name": "DS_PROMETHEUS",
|
||||
"options": [],
|
||||
"query": "prometheus",
|
||||
"refresh": 1,
|
||||
"type": "datasource"
|
||||
},
|
||||
{
|
||||
"allValue": ".*",
|
||||
"current": { "selected": true, "text": "All", "value": "$__all" },
|
||||
"datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" },
|
||||
"definition": "label_values(onyx_doc_perm_sync_duration_seconds_bucket, connector_type)",
|
||||
"includeAll": true,
|
||||
"label": "Connector Type",
|
||||
"multi": true,
|
||||
"name": "connector_type",
|
||||
"options": [],
|
||||
"query": {
|
||||
"query": "label_values(onyx_doc_perm_sync_duration_seconds_bucket, connector_type)",
|
||||
"refId": "StandardVariableQuery"
|
||||
},
|
||||
"refresh": 2,
|
||||
"regex": "",
|
||||
"sort": 1,
|
||||
"type": "query"
|
||||
}
|
||||
]
|
||||
},
|
||||
"time": {
|
||||
"from": "now-6h",
|
||||
"to": "now"
|
||||
},
|
||||
"timepicker": {},
|
||||
"timezone": "",
|
||||
"title": "Onyx Permission Sync",
|
||||
"uid": "onyx-permission-sync",
|
||||
"version": 1,
|
||||
"weekStart": ""
|
||||
}
|
||||
@@ -34,15 +34,3 @@ scrape_configs:
|
||||
metrics_path: /metrics
|
||||
static_configs:
|
||||
- targets: ['host.docker.internal:9093']
|
||||
|
||||
- job_name: 'onyx-heavy-worker'
|
||||
scrape_interval: 5s
|
||||
metrics_path: /metrics
|
||||
static_configs:
|
||||
- targets: ['host.docker.internal:9094']
|
||||
|
||||
- job_name: 'onyx-light-worker'
|
||||
scrape_interval: 5s
|
||||
metrics_path: /metrics
|
||||
static_configs:
|
||||
- targets: ['host.docker.internal:9095']
|
||||
|
||||
@@ -996,23 +996,6 @@ export default function ChatPreferencesPage() {
|
||||
</InputVertical>
|
||||
</Card>
|
||||
|
||||
<Card border="solid" rounding="lg">
|
||||
<InputHorizontal
|
||||
title="Image Extraction & Analysis"
|
||||
description="Extract embedded images from uploaded files (PDFs, DOCX, etc.) and summarize them with a vision-capable LLM so image-only documents become searchable and answerable. Requires a vision-capable default LLM."
|
||||
withLabel
|
||||
>
|
||||
<Switch
|
||||
checked={s.image_extraction_and_analysis_enabled ?? true}
|
||||
onCheckedChange={(checked) => {
|
||||
void saveSettings({
|
||||
image_extraction_and_analysis_enabled: checked,
|
||||
});
|
||||
}}
|
||||
/>
|
||||
</InputHorizontal>
|
||||
</Card>
|
||||
|
||||
<Card border="solid" rounding="lg">
|
||||
<Section>
|
||||
<InputHorizontal
|
||||
|
||||
Reference in New Issue
Block a user