Compare commits

..

16 Commits

Author SHA1 Message Date
Nikolas Garza
dfd168cde9 fix(fe): bump flatted to patch CVE-2026-32141 (#9350) 2026-03-14 05:46:04 +00:00
Raunak Bhagat
6c7ae243d0 feat: refresh admin sidebar with new sections, search, and disabled EE tabs (#9344) 2026-03-14 04:09:16 +00:00
Raunak Bhagat
c4a2ff2593 feat: add progress-bars opal icon (#9349) 2026-03-14 02:18:41 +00:00
Danelegend
4b74a6dc76 fix(litellm): filter embedding models (#9347) 2026-03-14 01:40:06 +00:00
dependabot[bot]
eea5f5b380 chore(deps): bump pyjwt from 2.11.0 to 2.12.0 (#9341)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-13 21:57:49 +00:00
Raunak Bhagat
ae428ba684 feat: add curate and user variant opal icons (#9343) 2026-03-13 21:51:02 +00:00
Jamison Lahman
7b927e79c2 chore(format): format files where ruff and black agree (#9339) 2026-03-13 20:18:49 +00:00
Raunak Bhagat
a6815d1221 feat(llm-modal-consolidation): Update components (#9334) 2026-03-13 19:26:01 +00:00
Wenxi
f73d103b6b chore: refactor ph events for typing and consolidation and add event on llm configuration (#9328) 2026-03-13 18:46:15 +00:00
Justin Tahara
5ec424a3f3 feat(cherry-pick): notify Slack on successful PR creation (#9331) 2026-03-13 18:30:23 +00:00
Jessica Singh
0bd3e9a11c fix(voice): sanitized error and fix replay voice on revisit chat (#9326) 2026-03-13 18:30:06 +00:00
Jamison Lahman
a336691882 chore(playwright): remove .only typo (#9336) 2026-03-13 11:34:22 -07:00
Jamison Lahman
bd4965b4d9 chore(deps): upgrade katex: v0.16.17->v0.16.38 (#9327) 2026-03-13 18:06:47 +00:00
Justin Tahara
3c8a24eeba chore(cherry-pick): Whitelist for Users who can CP (#9330) 2026-03-13 17:59:40 +00:00
Evan Lohn
613be0de66 fix: sharepoint pages 400 list expand (#9321) 2026-03-13 17:55:55 +00:00
Justin Tahara
6f05dbd650 chore(cherry-pick): CODEOWNERS for cherry-pick (#9329) 2026-03-13 17:51:48 +00:00
455 changed files with 2811 additions and 2540 deletions

3
.github/CODEOWNERS vendored
View File

@@ -8,3 +8,6 @@
# Agent context files
/CLAUDE.md @Weves
/AGENTS.md @Weves
# Beta cherry-pick workflow owners
/.github/workflows/post-merge-beta-cherry-pick.yml @justin-tahara @jmelahman

View File

@@ -1,11 +1,14 @@
name: "Slack Notify on Failure"
description: "Sends a Slack notification when a workflow fails"
name: "Slack Notify"
description: "Sends a Slack notification for workflow events"
inputs:
webhook-url:
description: "Slack webhook URL (can also use SLACK_WEBHOOK_URL env var)"
required: false
details:
description: "Additional message body content"
required: false
failed-jobs:
description: "List of failed job names (newline-separated)"
description: "Deprecated alias for details"
required: false
title:
description: "Title for the notification"
@@ -21,6 +24,7 @@ runs:
shell: bash
env:
SLACK_WEBHOOK_URL: ${{ inputs.webhook-url }}
DETAILS: ${{ inputs.details }}
FAILED_JOBS: ${{ inputs.failed-jobs }}
TITLE: ${{ inputs.title }}
REF_NAME: ${{ inputs.ref-name }}
@@ -44,6 +48,18 @@ runs:
REF_NAME="$GITHUB_REF_NAME"
fi
if [ -z "$DETAILS" ]; then
DETAILS="$FAILED_JOBS"
fi
normalize_multiline() {
printf '%s' "$1" | awk 'BEGIN { ORS=""; first=1 } { if (!first) printf "\\n"; printf "%s", $0; first=0 }'
}
DETAILS="$(normalize_multiline "$DETAILS")"
REF_NAME="$(normalize_multiline "$REF_NAME")"
TITLE="$(normalize_multiline "$TITLE")"
# Escape JSON special characters
escape_json() {
local input="$1"
@@ -59,12 +75,12 @@ runs:
}
REF_NAME_ESC=$(escape_json "$REF_NAME")
FAILED_JOBS_ESC=$(escape_json "$FAILED_JOBS")
DETAILS_ESC=$(escape_json "$DETAILS")
WORKFLOW_URL_ESC=$(escape_json "$WORKFLOW_URL")
TITLE_ESC=$(escape_json "$TITLE")
# Build JSON payload piece by piece
# Note: FAILED_JOBS_ESC already contains \n sequences that should remain as \n in JSON
# Note: DETAILS_ESC already contains \n sequences that should remain as \n in JSON
PAYLOAD="{"
PAYLOAD="${PAYLOAD}\"text\":\"${TITLE_ESC}\","
PAYLOAD="${PAYLOAD}\"blocks\":[{"
@@ -79,10 +95,10 @@ runs:
PAYLOAD="${PAYLOAD}{\"type\":\"mrkdwn\",\"text\":\"*Run ID:*\\n#${RUN_NUMBER}\"}"
PAYLOAD="${PAYLOAD}]"
PAYLOAD="${PAYLOAD}}"
if [ -n "$FAILED_JOBS" ]; then
if [ -n "$DETAILS" ]; then
PAYLOAD="${PAYLOAD},{"
PAYLOAD="${PAYLOAD}\"type\":\"section\","
PAYLOAD="${PAYLOAD}\"text\":{\"type\":\"mrkdwn\",\"text\":\"*Failed Jobs:*\\n${FAILED_JOBS_ESC}\"}"
PAYLOAD="${PAYLOAD}\"text\":{\"type\":\"mrkdwn\",\"text\":\"${DETAILS_ESC}\"}"
PAYLOAD="${PAYLOAD}}"
fi
PAYLOAD="${PAYLOAD},{"
@@ -99,4 +115,3 @@ runs:
curl -X POST -H 'Content-type: application/json' \
--data "$PAYLOAD" \
"$SLACK_WEBHOOK_URL"

View File

@@ -37,10 +37,27 @@ jobs:
PR_BODY: ${{ github.event.pull_request.body }}
MERGE_COMMIT_SHA: ${{ github.event.pull_request.merge_commit_sha }}
MERGED_BY: ${{ github.event.pull_request.merged_by.login }}
# GitHub team slug authorized to trigger cherry-picks (e.g. "core-eng").
# For private/secret teams the GITHUB_TOKEN may need org:read scope;
# visible teams work with the default token.
ALLOWED_TEAM: "onyx-core-team"
# Explicit merger allowlist used because pull_request_target runs with
# the default GITHUB_TOKEN, which cannot reliably read org/team
# membership for this repository context.
ALLOWED_MERGERS: |
acaprau
bo-onyx
danelegend
duo-onyx
evan-onyx
jessicasingh7
jmelahman
joachim-danswer
justin-tahara
nmgarza5
raunakab
rohoswagger
subash-mohan
trial2onyx
wenxi-onyx
weves
yuhongsun96
run: |
echo "pr_number=${PR_NUMBER}" >> "$GITHUB_OUTPUT"
echo "merged_by=${MERGED_BY}" >> "$GITHUB_OUTPUT"
@@ -64,19 +81,11 @@ jobs:
echo "merge_commit_sha=${MERGE_COMMIT_SHA}" >> "$GITHUB_OUTPUT"
member_state_file="$(mktemp)"
member_err_file="$(mktemp)"
if ! gh api "orgs/${GITHUB_REPOSITORY_OWNER}/teams/${ALLOWED_TEAM}/memberships/${MERGED_BY}" --jq '.state' >"${member_state_file}" 2>"${member_err_file}"; then
api_err="$(tr '\n' ' ' < "${member_err_file}" | sed 's/[[:space:]]\+/ /g' | cut -c1-300)"
echo "gate_error=team-api-error" >> "$GITHUB_OUTPUT"
echo "::error::Team membership API call failed for ${MERGED_BY} in ${ALLOWED_TEAM}: ${api_err}"
exit 1
fi
member_state="$(cat "${member_state_file}")"
if [ "${member_state}" != "active" ]; then
echo "gate_error=not-team-member" >> "$GITHUB_OUTPUT"
echo "::error::${MERGED_BY} is not an active member of team ${ALLOWED_TEAM} (state: ${member_state}). Failing cherry-pick gate."
normalized_merged_by="$(printf '%s' "${MERGED_BY}" | tr '[:upper:]' '[:lower:]')"
normalized_allowed_mergers="$(printf '%s\n' "${ALLOWED_MERGERS}" | tr '[:upper:]' '[:lower:]')"
if ! printf '%s\n' "${normalized_allowed_mergers}" | grep -Fxq "${normalized_merged_by}"; then
echo "gate_error=not-allowed-merger" >> "$GITHUB_OUTPUT"
echo "::error::${MERGED_BY} is not in the explicit cherry-pick merger allowlist. Failing cherry-pick gate."
exit 1
fi
@@ -90,6 +99,7 @@ jobs:
contents: write
pull-requests: write
outputs:
cherry_pick_pr_url: ${{ steps.run_cherry_pick.outputs.pr_url }}
cherry_pick_reason: ${{ steps.run_cherry_pick.outputs.reason }}
cherry_pick_details: ${{ steps.run_cherry_pick.outputs.details }}
runs-on: ubuntu-latest
@@ -137,7 +147,11 @@ jobs:
fi
if [ "${exit_code}" -eq 0 ]; then
pr_url="$(sed -n 's/^.*PR created successfully: \(https:\/\/github\.com\/[^[:space:]]\+\/pull\/[0-9]\+\).*$/\1/p' "$output_file" | tail -n 1)"
echo "status=success" >> "$GITHUB_OUTPUT"
if [ -n "${pr_url}" ]; then
echo "pr_url=${pr_url}" >> "$GITHUB_OUTPUT"
fi
exit 0
fi
@@ -163,6 +177,54 @@ jobs:
echo "::error::Automated cherry-pick failed (${CHERRY_PICK_REASON})."
exit 1
notify-slack-on-cherry-pick-success:
needs:
- resolve-cherry-pick-request
- cherry-pick-to-latest-release
if: needs.resolve-cherry-pick-request.outputs.should_cherrypick == 'true' && needs.resolve-cherry-pick-request.result == 'success' && needs.cherry-pick-to-latest-release.result == 'success'
runs-on: ubuntu-slim
timeout-minutes: 10
steps:
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- name: Fail if Slack webhook secret is missing
env:
CHERRY_PICK_PRS_WEBHOOK: ${{ secrets.CHERRY_PICK_PRS_WEBHOOK }}
run: |
if [ -z "${CHERRY_PICK_PRS_WEBHOOK}" ]; then
echo "::error::CHERRY_PICK_PRS_WEBHOOK is not configured."
exit 1
fi
- name: Build cherry-pick success summary
id: success-summary
env:
SOURCE_PR_NUMBER: ${{ needs.resolve-cherry-pick-request.outputs.pr_number }}
MERGE_COMMIT_SHA: ${{ needs.resolve-cherry-pick-request.outputs.merge_commit_sha }}
CHERRY_PICK_PR_URL: ${{ needs.cherry-pick-to-latest-release.outputs.cherry_pick_pr_url }}
run: |
source_pr_url="https://github.com/${GITHUB_REPOSITORY}/pull/${SOURCE_PR_NUMBER}"
details="*Cherry-pick PR opened successfully.*\\n• source PR: ${source_pr_url}"
if [ -n "${CHERRY_PICK_PR_URL}" ]; then
details="${details}\\n• cherry-pick PR: ${CHERRY_PICK_PR_URL}"
fi
if [ -n "${MERGE_COMMIT_SHA}" ]; then
details="${details}\\n• merge SHA: ${MERGE_COMMIT_SHA}"
fi
echo "details=${details}" >> "$GITHUB_OUTPUT"
- name: Notify #cherry-pick-prs about cherry-pick success
uses: ./.github/actions/slack-notify
with:
webhook-url: ${{ secrets.CHERRY_PICK_PRS_WEBHOOK }}
details: ${{ steps.success-summary.outputs.details }}
title: "✅ Automated Cherry-Pick PR Opened"
ref-name: ${{ github.event.pull_request.base.ref }}
notify-slack-on-cherry-pick-failure:
needs:
- resolve-cherry-pick-request
@@ -199,10 +261,8 @@ jobs:
reason_text="cherry-pick command failed"
if [ "${GATE_ERROR}" = "missing-merge-commit-sha" ]; then
reason_text="requested cherry-pick but merge commit SHA was missing"
elif [ "${GATE_ERROR}" = "team-api-error" ]; then
reason_text="team membership lookup failed while validating cherry-pick permissions"
elif [ "${GATE_ERROR}" = "not-team-member" ]; then
reason_text="merger is not an active member of the allowed team"
elif [ "${GATE_ERROR}" = "not-allowed-merger" ]; then
reason_text="merger is not in the explicit cherry-pick allowlist"
elif [ "${CHERRY_PICK_REASON}" = "output-capture-failed" ]; then
reason_text="failed to capture cherry-pick output for classification"
elif [ "${CHERRY_PICK_REASON}" = "merge-conflict" ]; then
@@ -229,6 +289,6 @@ jobs:
uses: ./.github/actions/slack-notify
with:
webhook-url: ${{ secrets.CHERRY_PICK_PRS_WEBHOOK }}
failed-jobs: ${{ steps.failure-summary.outputs.jobs }}
details: ${{ steps.failure-summary.outputs.jobs }}
title: "🚨 Automated Cherry-Pick Failed"
ref-name: ${{ github.event.pull_request.base.ref }}

View File

@@ -244,7 +244,10 @@ def do_run_migrations(
def provide_iam_token_for_alembic(
dialect: Any, conn_rec: Any, cargs: Any, cparams: Any # noqa: ARG001
dialect: Any, # noqa: ARG001
conn_rec: Any, # noqa: ARG001
cargs: Any, # noqa: ARG001
cparams: Any,
) -> None:
if USE_IAM_AUTH:
# Database connection settings
@@ -360,8 +363,7 @@ async def run_async_migrations() -> None:
# upgrade_all_tenants=true or schemas in multi-tenant mode
# and for non-multi-tenant mode, we should use schemas with the default schema
raise ValueError(
"No migration target specified. Use either upgrade_all_tenants=true for all tenants "
"or schemas for specific schemas."
"No migration target specified. Use either upgrade_all_tenants=true for all tenants or schemas for specific schemas."
)
await engine.dispose()
@@ -457,8 +459,7 @@ def run_migrations_offline() -> None:
else:
# This should not happen in the new design
raise ValueError(
"No migration target specified. Use either upgrade_all_tenants=true for all tenants "
"or schemas for specific schemas."
"No migration target specified. Use either upgrade_all_tenants=true for all tenants or schemas for specific schemas."
)

View File

@@ -13,6 +13,7 @@ Usage examples::
# custom settings
python alembic/run_multitenant_migrations.py -j 8 -b 100
"""
from __future__ import annotations
import argparse
@@ -117,8 +118,7 @@ def run_migrations_parallel(
batches = [schemas[i : i + batch_size] for i in range(0, len(schemas), batch_size)]
total_batches = len(batches)
print(
f"{len(schemas)} schemas in {total_batches} batch(es) "
f"with {max_workers} workers (batch size: {batch_size})...",
f"{len(schemas)} schemas in {total_batches} batch(es) with {max_workers} workers (batch size: {batch_size})...",
flush=True,
)
all_success = True
@@ -166,8 +166,7 @@ def run_migrations_parallel(
with lock:
in_flight[batch_idx] = batch
print(
f"Batch {batch_idx + 1}/{total_batches} started "
f"({len(batch)} schemas): {', '.join(batch)}",
f"Batch {batch_idx + 1}/{total_batches} started ({len(batch)} schemas): {', '.join(batch)}",
flush=True,
)
result = run_alembic_for_batch(batch)
@@ -201,7 +200,7 @@ def run_migrations_parallel(
except Exception as e:
print(
f"Batch {batch_idx + 1}/{total_batches} " f"✗ exception: {e}",
f"Batch {batch_idx + 1}/{total_batches} ✗ exception: {e}",
flush=True,
)
all_success = False
@@ -268,14 +267,12 @@ def main() -> int:
if not schemas_to_migrate:
print(
f"All {len(tenant_schemas)} tenants are already at head "
f"revision ({head_rev})."
f"All {len(tenant_schemas)} tenants are already at head revision ({head_rev})."
)
return 0
print(
f"{len(schemas_to_migrate)}/{len(tenant_schemas)} tenants need "
f"migration (head: {head_rev})."
f"{len(schemas_to_migrate)}/{len(tenant_schemas)} tenants need migration (head: {head_rev})."
)
success = run_migrations_parallel(

View File

@@ -50,8 +50,7 @@ def upgrade() -> None:
if orphaned_count > 0:
logger.warning(
f"WARNING: {orphaned_count} chat_session records still have "
f"folder_id without project_id. Proceeding anyway."
f"WARNING: {orphaned_count} chat_session records still have folder_id without project_id. Proceeding anyway."
)
# === Step 2: Drop chat_session.folder_id ===

View File

@@ -75,8 +75,7 @@ def batch_delete(
if failed_batches:
logger.warning(
f"Failed to delete {len(failed_batches)} batches from {table_name}. "
f"Total deleted: {total_deleted}/{total_count}"
f"Failed to delete {len(failed_batches)} batches from {table_name}. Total deleted: {total_deleted}/{total_count}"
)
# Fail the migration to avoid silently succeeding on partial cleanup
raise RuntimeError(

View File

@@ -18,8 +18,7 @@ depends_on = None
def upgrade() -> None:
# Set all existing records to not migrated
op.execute(
"UPDATE user_file SET document_id_migrated = FALSE "
"WHERE document_id_migrated IS DISTINCT FROM FALSE;"
"UPDATE user_file SET document_id_migrated = FALSE WHERE document_id_migrated IS DISTINCT FROM FALSE;"
)

View File

@@ -35,7 +35,6 @@ def upgrade() -> None:
# environment variables MUST be set. Otherwise, an exception will be raised.
if not MULTI_TENANT:
# Enable pg_trgm extension if not already enabled
op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")
@@ -481,8 +480,7 @@ def upgrade() -> None:
f"ON kg_entity USING GIN (name {POSTGRES_DEFAULT_SCHEMA}.gin_trgm_ops)"
)
op.execute(
"CREATE INDEX IF NOT EXISTS idx_kg_entity_normalization_trigrams "
"ON kg_entity USING GIN (name_trigrams)"
"CREATE INDEX IF NOT EXISTS idx_kg_entity_normalization_trigrams ON kg_entity USING GIN (name_trigrams)"
)
# Create kg_entity trigger to update kg_entity.name and its trigrams

View File

@@ -51,10 +51,7 @@ def upgrade() -> None:
next_email = f"{username.lower()}_{attempt}@{domain.lower()}"
# Email conflict occurred, append `_1`, `_2`, etc., to the username
logger.warning(
f"Conflict while lowercasing email: "
f"old_email={email} "
f"conflicting_email={new_email} "
f"next_email={next_email}"
f"Conflict while lowercasing email: old_email={email} conflicting_email={new_email} next_email={next_email}"
)
new_email = next_email
attempt += 1

View File

@@ -24,12 +24,10 @@ depends_on = None
def upgrade() -> None:
# Convert existing lowercase values to uppercase to match enum member names
op.execute(
"UPDATE connector_credential_pair SET processing_mode = 'REGULAR' "
"WHERE processing_mode = 'regular'"
"UPDATE connector_credential_pair SET processing_mode = 'REGULAR' WHERE processing_mode = 'regular'"
)
op.execute(
"UPDATE connector_credential_pair SET processing_mode = 'FILE_SYSTEM' "
"WHERE processing_mode = 'file_system'"
"UPDATE connector_credential_pair SET processing_mode = 'FILE_SYSTEM' WHERE processing_mode = 'file_system'"
)
# Update the server default to use uppercase

View File

@@ -289,8 +289,7 @@ def upgrade() -> None:
attributes_str = json.dumps(attributes).replace("'", "''")
op.execute(
sa.text(
f"UPDATE kg_entity_type SET attributes = '{attributes_str}'"
f"WHERE id_name = '{entity_type}'"
f"UPDATE kg_entity_type SET attributes = '{attributes_str}'WHERE id_name = '{entity_type}'"
),
)
@@ -312,7 +311,6 @@ def downgrade() -> None:
attributes_str = json.dumps(attributes).replace("'", "''")
op.execute(
sa.text(
f"UPDATE kg_entity_type SET attributes = '{attributes_str}'"
f"WHERE id_name = '{entity_type}'"
f"UPDATE kg_entity_type SET attributes = '{attributes_str}'WHERE id_name = '{entity_type}'"
),
)

View File

@@ -160,7 +160,7 @@ def remove_old_tags() -> None:
f"""
DELETE FROM document__tag
WHERE document_id = '{document_id}'
AND tag_id IN ({','.join(to_delete)})
AND tag_id IN ({",".join(to_delete)})
"""
)
)
@@ -239,7 +239,7 @@ def _get_batch_documents_with_multiple_tags(
).fetchall()
if not batch:
break
doc_ids = [document_id for document_id, in batch]
doc_ids = [document_id for (document_id,) in batch]
yield doc_ids
offset_clause = f"AND document__tag.document_id > '{doc_ids[-1]}'"

View File

@@ -24,8 +24,7 @@ TOOL_DESCRIPTIONS = {
"The action will be used when the user asks the agent to generate an image."
),
"WebSearchTool": (
"The Web Search Action allows the agent "
"to perform internet searches for up-to-date information."
"The Web Search Action allows the agent to perform internet searches for up-to-date information."
),
"KnowledgeGraphTool": (
"The Knowledge Graph Search Action allows the agent to search the "

View File

@@ -140,8 +140,7 @@ def _migrate_files_to_postgres() -> None:
# Fetch rows that have external storage pointers (bucket/object_key not NULL)
result = session.execute(
text(
"SELECT file_id, bucket_name, object_key FROM file_record "
"WHERE bucket_name IS NOT NULL AND object_key IS NOT NULL"
"SELECT file_id, bucket_name, object_key FROM file_record WHERE bucket_name IS NOT NULL AND object_key IS NOT NULL"
)
)
@@ -182,8 +181,7 @@ def _migrate_files_to_postgres() -> None:
# Update DB row: set lobj_oid, clear bucket/object_key
session.execute(
text(
"UPDATE file_record SET lobj_oid = :lobj_oid, bucket_name = NULL, "
"object_key = NULL WHERE file_id = :file_id"
"UPDATE file_record SET lobj_oid = :lobj_oid, bucket_name = NULL, object_key = NULL WHERE file_id = :file_id"
),
{"lobj_oid": lobj_oid, "file_id": file_id},
)
@@ -224,8 +222,7 @@ def _migrate_files_to_external_storage() -> None:
# Find all files currently stored in PostgreSQL (lobj_oid is not null)
result = session.execute(
text(
"SELECT file_id FROM file_record WHERE lobj_oid IS NOT NULL "
"AND bucket_name IS NULL AND object_key IS NULL"
"SELECT file_id FROM file_record WHERE lobj_oid IS NOT NULL AND bucket_name IS NULL AND object_key IS NULL"
)
)

View File

@@ -39,8 +39,7 @@ BUILT_IN_TOOLS = [
"name": "WebSearchTool",
"display_name": "Web Search",
"description": (
"The Web Search Action allows the assistant "
"to perform internet searches for up-to-date information."
"The Web Search Action allows the assistant to perform internet searches for up-to-date information."
),
"in_code_tool_id": "WebSearchTool",
},

View File

@@ -25,8 +25,7 @@ def verify_auth_setting() -> None:
raw_auth_type = (os.environ.get("AUTH_TYPE") or "").lower()
if raw_auth_type == "disabled":
logger.warning(
"AUTH_TYPE='disabled' is no longer supported. "
"Using 'basic' instead. Please update your configuration."
"AUTH_TYPE='disabled' is no longer supported. Using 'basic' instead. Please update your configuration."
)
logger.notice(f"Using Auth Type: {AUTH_TYPE.value}")

View File

@@ -59,7 +59,6 @@ def cloud_beat_task_generator(
# gated_tenants = get_gated_tenants()
for tenant_id in tenant_ids:
# Same comment here as the above NOTE
# if tenant_id in gated_tenants:
# continue

View File

@@ -424,10 +424,7 @@ def connector_permission_sync_generator_task(
raise ValueError(error_msg)
if not redis_connector.permissions.fenced: # The fence must exist
error_msg = (
f"connector_permission_sync_generator_task - fence not found: "
f"fence={redis_connector.permissions.fence_key}"
)
error_msg = f"connector_permission_sync_generator_task - fence not found: fence={redis_connector.permissions.fence_key}"
_fail_doc_permission_sync_attempt(attempt_id, error_msg)
raise ValueError(error_msg)
@@ -441,8 +438,7 @@ def connector_permission_sync_generator_task(
if payload.celery_task_id is None:
logger.info(
f"connector_permission_sync_generator_task - Waiting for fence: "
f"fence={redis_connector.permissions.fence_key}"
f"connector_permission_sync_generator_task - Waiting for fence: fence={redis_connector.permissions.fence_key}"
)
sleep(1)
continue
@@ -608,8 +604,7 @@ def connector_permission_sync_generator_task(
docs_with_permission_errors=docs_with_errors,
)
task_logger.info(
f"Completed doc permission sync attempt {attempt_id}: "
f"{tasks_generated} docs, {docs_with_errors} errors"
f"Completed doc permission sync attempt {attempt_id}: {tasks_generated} docs, {docs_with_errors} errors"
)
redis_connector.permissions.generator_complete = tasks_generated
@@ -716,9 +711,7 @@ def element_update_permissions(
elapsed = time.monotonic() - start
task_logger.info(
f"{element_type}={element_id} "
f"action=update_permissions "
f"elapsed={elapsed:.2f}"
f"{element_type}={element_id} action=update_permissions elapsed={elapsed:.2f}"
)
except Exception as e:
task_logger.exception(
@@ -900,8 +893,7 @@ def validate_permission_sync_fence(
tasks_not_in_celery += 1
task_logger.info(
"validate_permission_sync_fence task check: "
f"tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
f"validate_permission_sync_fence task check: tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
)
# we're active if there are still tasks to run and those tasks all exist in celery
@@ -1007,7 +999,10 @@ class PermissionSyncCallback(IndexingHeartbeatInterface):
def monitor_ccpair_permissions_taskset(
tenant_id: str, key_bytes: bytes, r: Redis, db_session: Session # noqa: ARG001
tenant_id: str,
key_bytes: bytes,
r: Redis, # noqa: ARG001
db_session: Session,
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
@@ -1031,8 +1026,7 @@ def monitor_ccpair_permissions_taskset(
payload = redis_connector.permissions.payload
except ValidationError:
task_logger.exception(
"Permissions sync payload failed to validate. "
"Schema may have been updated."
"Permissions sync payload failed to validate. Schema may have been updated."
)
return
@@ -1041,11 +1035,7 @@ def monitor_ccpair_permissions_taskset(
remaining = redis_connector.permissions.get_remaining()
task_logger.info(
f"Permissions sync progress: "
f"cc_pair={cc_pair_id} "
f"id={payload.id} "
f"remaining={remaining} "
f"initial={initial}"
f"Permissions sync progress: cc_pair={cc_pair_id} id={payload.id} remaining={remaining} initial={initial}"
)
# Add telemetry for permission syncing progress
@@ -1064,10 +1054,7 @@ def monitor_ccpair_permissions_taskset(
mark_cc_pair_as_permissions_synced(db_session, int(cc_pair_id), payload.started)
task_logger.info(
f"Permissions sync finished: "
f"cc_pair={cc_pair_id} "
f"id={payload.id} "
f"num_synced={initial}"
f"Permissions sync finished: cc_pair={cc_pair_id} id={payload.id} num_synced={initial}"
)
# Add telemetry for permission syncing complete

View File

@@ -111,23 +111,20 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
if cc_pair.access_type != AccessType.SYNC:
task_logger.error(
f"Received non-sync CC Pair {cc_pair.id} for external "
f"group sync. Actual access type: {cc_pair.access_type}"
f"Received non-sync CC Pair {cc_pair.id} for external group sync. Actual access type: {cc_pair.access_type}"
)
return False
if cc_pair.status == ConnectorCredentialPairStatus.DELETING:
task_logger.debug(
f"Skipping group sync for CC Pair {cc_pair.id} - "
f"CC Pair is being deleted"
f"Skipping group sync for CC Pair {cc_pair.id} - CC Pair is being deleted"
)
return False
sync_config = get_source_perm_sync_config(cc_pair.connector.source)
if sync_config is None:
task_logger.debug(
f"Skipping group sync for CC Pair {cc_pair.id} - "
f"no sync config found for {cc_pair.connector.source}"
f"Skipping group sync for CC Pair {cc_pair.id} - no sync config found for {cc_pair.connector.source}"
)
return False
@@ -135,8 +132,7 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
# This is fine because all sources dont necessarily have a concept of groups
if sync_config.group_sync_config is None:
task_logger.debug(
f"Skipping group sync for CC Pair {cc_pair.id} - "
f"no group sync config found for {cc_pair.connector.source}"
f"Skipping group sync for CC Pair {cc_pair.id} - no group sync config found for {cc_pair.connector.source}"
)
return False

View File

@@ -74,8 +74,7 @@ def perform_ttl_management_task(
except Exception:
logger.exception(
"delete_chat_session exceptioned. "
f"user_id={user_id} session_id={session_id}"
f"delete_chat_session exceptioned. user_id={user_id} session_id={session_id}"
)
with get_session_with_current_tenant() as db_session:
mark_task_as_finished_with_id(

View File

@@ -7,7 +7,8 @@ QUERY_HISTORY_TASK_NAME_PREFIX = OnyxCeleryTask.EXPORT_QUERY_HISTORY_TASK
def name_chat_ttl_task(
retention_limit_days: float, tenant_id: str | None = None # noqa: ARG001
retention_limit_days: float,
tenant_id: str | None = None, # noqa: ARG001
) -> str:
return f"chat_ttl_{retention_limit_days}_days"

View File

@@ -31,7 +31,8 @@ def fetch_query_analytics(
func.sum(case((ChatMessageFeedback.is_positive, 1), else_=0)),
func.sum(
case(
(ChatMessageFeedback.is_positive == False, 1), else_=0 # noqa: E712
(ChatMessageFeedback.is_positive == False, 1), # noqa: E712
else_=0, # noqa: E712
)
),
cast(ChatMessage.time_sent, Date),
@@ -66,7 +67,8 @@ def fetch_per_user_query_analytics(
func.sum(case((ChatMessageFeedback.is_positive, 1), else_=0)),
func.sum(
case(
(ChatMessageFeedback.is_positive == False, 1), else_=0 # noqa: E712
(ChatMessageFeedback.is_positive == False, 1), # noqa: E712
else_=0, # noqa: E712
)
),
cast(ChatMessage.time_sent, Date),

View File

@@ -23,8 +23,7 @@ def _delete_connector_credential_pair_user_groups_relationship__no_commit(
)
if cc_pair is None:
raise ValueError(
f"ConnectorCredentialPair with connector_id: {connector_id} "
f"and credential_id: {credential_id} not found"
f"ConnectorCredentialPair with connector_id: {connector_id} and credential_id: {credential_id} not found"
)
stmt = delete(UserGroup__ConnectorCredentialPair).where(

View File

@@ -123,8 +123,7 @@ def upsert_external_groups(
user_id = email_id_map.get(user_email.lower())
if user_id is None:
logger.warning(
f"User in group {external_group.id}"
f" with email {user_email} not found"
f"User in group {external_group.id} with email {user_email} not found"
)
continue

View File

@@ -191,8 +191,7 @@ def create_initial_default_standard_answer_category(db_session: Session) -> None
if default_category is not None:
if default_category.name != default_category_name:
raise ValueError(
"DB is not in a valid initial state. "
"Default standard answer category does not have expected name."
"DB is not in a valid initial state. Default standard answer category does not have expected name."
)
return

View File

@@ -424,8 +424,7 @@ def fetch_user_groups_for_documents(
def _check_user_group_is_modifiable(user_group: UserGroup) -> None:
if not user_group.is_up_to_date:
raise ValueError(
"Specified user group is currently syncing. Wait until the current "
"sync has finished before editing."
"Specified user group is currently syncing. Wait until the current sync has finished before editing."
)

View File

@@ -56,8 +56,7 @@ def _run_with_retry(
if retry_count < MAX_RETRY_COUNT:
sleep_after_rate_limit_exception(github_client)
logger.warning(
f"Rate limit exceeded while {description}. Retrying... "
f"(attempt {retry_count + 1}/{MAX_RETRY_COUNT})"
f"Rate limit exceeded while {description}. Retrying... (attempt {retry_count + 1}/{MAX_RETRY_COUNT})"
)
return _run_with_retry(
operation, description, github_client, retry_count + 1
@@ -91,7 +90,9 @@ class TeamInfo(BaseModel):
def _fetch_organization_members(
github_client: Github, org_name: str, retry_count: int = 0 # noqa: ARG001
github_client: Github,
org_name: str,
retry_count: int = 0, # noqa: ARG001
) -> List[UserInfo]:
"""Fetch all organization members including owners and regular members."""
org_members: List[UserInfo] = []
@@ -124,7 +125,9 @@ def _fetch_organization_members(
def _fetch_repository_teams_detailed(
repo: Repository, github_client: Github, retry_count: int = 0 # noqa: ARG001
repo: Repository,
github_client: Github,
retry_count: int = 0, # noqa: ARG001
) -> List[TeamInfo]:
"""Fetch teams with access to the repository and their members."""
teams_data: List[TeamInfo] = []
@@ -167,7 +170,9 @@ def _fetch_repository_teams_detailed(
def fetch_repository_team_slugs(
repo: Repository, github_client: Github, retry_count: int = 0 # noqa: ARG001
repo: Repository,
github_client: Github,
retry_count: int = 0, # noqa: ARG001
) -> List[str]:
"""Fetch team slugs with access to the repository."""
logger.info(f"Fetching team slugs for repository {repo.full_name}")

View File

@@ -115,8 +115,7 @@ def get_external_access_for_raw_gdrive_file(
)
if len(permissions_list) != len(permission_ids) and retriever_drive_service:
logger.warning(
f"Failed to get all permissions for file {doc_id} with retriever service, "
"trying admin service"
f"Failed to get all permissions for file {doc_id} with retriever service, trying admin service"
)
backup_permissions_list = _get_permissions(admin_drive_service)
permissions_list = _merge_permissions_lists(
@@ -166,9 +165,7 @@ def get_external_access_for_raw_gdrive_file(
user_emails.add(permission.email_address)
else:
logger.error(
"Permission is type `user` but no email address is "
f"provided for document {doc_id}"
f"\n {permission}"
f"Permission is type `user` but no email address is provided for document {doc_id}\n {permission}"
)
elif permission.type == PermissionType.GROUP:
# groups are represented as email addresses within Drive
@@ -176,17 +173,14 @@ def get_external_access_for_raw_gdrive_file(
group_emails.add(permission.email_address)
else:
logger.error(
"Permission is type `group` but no email address is "
f"provided for document {doc_id}"
f"\n {permission}"
f"Permission is type `group` but no email address is provided for document {doc_id}\n {permission}"
)
elif permission.type == PermissionType.DOMAIN and company_domain:
if permission.domain == company_domain:
public = True
else:
logger.warning(
"Permission is type domain but does not match company domain:"
f"\n {permission}"
f"Permission is type domain but does not match company domain:\n {permission}"
)
elif permission.type == PermissionType.ANYONE:
public = True

View File

@@ -18,10 +18,7 @@ logger = setup_logger()
# Only include fields we need - folder ID and permissions
# IMPORTANT: must fetch permissionIds, since sometimes the drive API
# seems to miss permissions when requesting them directly
FOLDER_PERMISSION_FIELDS = (
"nextPageToken, files(id, name, permissionIds, "
"permissions(id, emailAddress, type, domain, permissionDetails))"
)
FOLDER_PERMISSION_FIELDS = "nextPageToken, files(id, name, permissionIds, permissions(id, emailAddress, type, domain, permissionDetails))"
def get_folder_permissions_by_ids(

View File

@@ -142,8 +142,7 @@ def _drive_folder_to_onyx_group(
elif permission.type == PermissionType.GROUP:
if permission.email_address not in group_email_to_member_emails_map:
logger.warning(
f"Group email {permission.email_address} for folder {folder.id} "
"not found in group_email_to_member_emails_map"
f"Group email {permission.email_address} for folder {folder.id} not found in group_email_to_member_emails_map"
)
continue
folder_member_emails.update(
@@ -238,8 +237,7 @@ def _drive_member_map_to_onyx_groups(
for group_email in group_emails:
if group_email not in group_email_to_member_emails_map:
logger.warning(
f"Group email {group_email} for drive {drive_id} not found in "
"group_email_to_member_emails_map"
f"Group email {group_email} for drive {drive_id} not found in group_email_to_member_emails_map"
)
continue
drive_member_emails.update(group_email_to_member_emails_map[group_email])
@@ -326,8 +324,7 @@ def _build_onyx_groups(
for group_email in group_emails:
if group_email not in group_email_to_member_emails_map:
logger.warning(
f"Group email {group_email} for drive {drive_id} not found in "
"group_email_to_member_emails_map"
f"Group email {group_email} for drive {drive_id} not found in group_email_to_member_emails_map"
)
continue
drive_member_emails.update(group_email_to_member_emails_map[group_email])

View File

@@ -55,8 +55,7 @@ def get_permissions_by_ids(
if len(filtered_permissions) < len(permission_ids):
missing_ids = permission_id_set - {p.id for p in filtered_permissions if p.id}
logger.warning(
f"Could not find all requested permission IDs for document {doc_id}. "
f"Missing IDs: {missing_ids}"
f"Could not find all requested permission IDs for document {doc_id}. Missing IDs: {missing_ids}"
)
return filtered_permissions

View File

@@ -89,8 +89,7 @@ def _get_group_member_emails(
emails.add(email)
else:
logger.warning(
f"Atlassian user {member.get('accountId', 'unknown')} "
f"in group {group_name} has no visible email address"
f"Atlassian user {member.get('accountId', 'unknown')} in group {group_name} has no visible email address"
)
if page.get("isLast", True) or not members:

View File

@@ -69,8 +69,7 @@ def _post_query_chunk_censoring(
censored_chunks = censor_chunks_for_source(chunks_for_source, user.email)
except Exception as e:
logger.exception(
f"Failed to censor chunks for source {source} so throwing out all"
f" chunks for this source and continuing: {e}"
f"Failed to censor chunks for source {source} so throwing out all chunks for this source and continuing: {e}"
)
continue

View File

@@ -23,7 +23,9 @@ ContentRange = tuple[int, int | None] # (start_index, end_index) None means to
# NOTE: Used for testing timing
def _get_dummy_object_access_map(
object_ids: set[str], user_email: str, chunks: list[InferenceChunk] # noqa: ARG001
object_ids: set[str],
user_email: str, # noqa: ARG001
chunks: list[InferenceChunk], # noqa: ARG001
) -> dict[str, bool]:
time.sleep(0.15)
# return {object_id: True for object_id in object_ids}

View File

@@ -61,8 +61,7 @@ def _graph_api_get(
):
wait = min(int(resp.headers.get("Retry-After", str(2**attempt))), 60)
logger.warning(
f"Graph API {resp.status_code} on attempt {attempt + 1}, "
f"retrying in {wait}s: {url}"
f"Graph API {resp.status_code} on attempt {attempt + 1}, retrying in {wait}s: {url}"
)
time.sleep(wait)
continue
@@ -72,8 +71,7 @@ def _graph_api_get(
if attempt < GRAPH_API_MAX_RETRIES:
wait = min(2**attempt, 60)
logger.warning(
f"Graph API connection error on attempt {attempt + 1}, "
f"retrying in {wait}s: {url}"
f"Graph API connection error on attempt {attempt + 1}, retrying in {wait}s: {url}"
)
time.sleep(wait)
continue
@@ -767,8 +765,7 @@ def get_sharepoint_external_groups(
if not enumerate_all_ad_groups or get_access_token is None:
logger.info(
"Skipping exhaustive Azure AD group enumeration. "
"Only groups found in site role assignments are included."
"Skipping exhaustive Azure AD group enumeration. Only groups found in site role assignments are included."
)
return external_user_groups

View File

@@ -166,8 +166,7 @@ def slack_doc_sync(
user_id_to_email_map = fetch_user_id_to_email_map(slack_client)
if not user_id_to_email_map:
raise ValueError(
"No user id to email map found. Please check to make sure that "
"your Slack bot token has the `users:read.email` scope"
"No user id to email map found. Please check to make sure that your Slack bot token has the `users:read.email` scope"
)
workspace_permissions = _fetch_workspace_permissions(

View File

@@ -152,10 +152,7 @@ def create_new_usage_report(
zip_buffer.seek(0)
# store zip blob to file_store
report_name = (
f"{datetime.now(tz=timezone.utc).strftime('%Y-%m-%d')}"
f"_{report_id}_usage_report.zip"
)
report_name = f"{datetime.now(tz=timezone.utc).strftime('%Y-%m-%d')}_{report_id}_usage_report.zip"
file_store.save_file(
content=zip_buffer,
display_name=report_name,

View File

@@ -449,8 +449,7 @@ def _apply_group_remove(
match = _MEMBER_FILTER_RE.match(op.path)
if not match:
raise ScimPatchError(
f"Unsupported remove path '{op.path}'. "
'Expected: members[value eq "user-id"]'
f"Unsupported remove path '{op.path}'. Expected: members[value eq \"user-id\"]"
)
target_id = match.group(1)

View File

@@ -123,7 +123,8 @@ async def get_or_provision_tenant(
async def create_tenant(
email: str, referral_source: str | None = None # noqa: ARG001
email: str,
referral_source: str | None = None, # noqa: ARG001
) -> str:
"""
Create a new tenant on-demand when no pre-provisioned tenants are available.
@@ -679,7 +680,9 @@ async def setup_tenant(tenant_id: str) -> None:
async def assign_tenant_to_user(
tenant_id: str, email: str, referral_source: str | None = None # noqa: ARG001
tenant_id: str,
email: str,
referral_source: str | None = None, # noqa: ARG001
) -> None:
"""
Assign a tenant to a user and perform necessary operations.

View File

@@ -75,8 +75,7 @@ def _decrypt_bytes(input_bytes: bytes, key: str | None = None) -> str:
# Does NOT handle data encrypted with a different key — that
# ciphertext is not valid UTF-8 and will raise below.
logger.warning(
"AES decryption failed — falling back to raw decode. "
"Run the re-encrypt secrets script to rotate to the current key."
"AES decryption failed — falling back to raw decode. Run the re-encrypt secrets script to rotate to the current key."
)
try:
return input_bytes.decode()

View File

@@ -96,7 +96,9 @@ def get_access_for_documents(
return versioned_get_access_for_documents_fn(document_ids, db_session)
def _get_acl_for_user(user: User, db_session: Session) -> set[str]: # noqa: ARG001
def _get_acl_for_user(
user: User, db_session: Session # noqa: ARG001
) -> set[str]: # noqa: ARG001
"""Returns a list of ACL entries that the user has access to. This is meant to be
used downstream to filter out documents that the user does not have access to. The
user should have access to a document if at least one entry in the document's ACL

View File

@@ -5,7 +5,8 @@ from onyx.utils.variable_functionality import fetch_versioned_implementation
def _get_user_external_group_ids(
db_session: Session, user: User # noqa: ARG001
db_session: Session, # noqa: ARG001
user: User, # noqa: ARG001
) -> list[str]:
return []

View File

@@ -8,7 +8,6 @@ from onyx.configs.constants import PUBLIC_DOC_PAT
@dataclass(frozen=True)
class ExternalAccess:
# arbitrary limit to prevent excessively large permissions sets
# not internally enforced ... the caller can check this before using the instance
MAX_NUM_ENTRIES = 5000

View File

@@ -96,8 +96,7 @@ async def verify_captcha_token(
)
logger.debug(
f"Captcha verification passed: score={result.score}, "
f"action={result.action}"
f"Captcha verification passed: score={result.score}, action={result.action}"
)
except httpx.HTTPError as e:

View File

@@ -353,20 +353,11 @@ def build_user_email_invite(
"or login with Google and complete your registration.</p>"
)
elif auth_type == AuthType.BASIC:
message += (
"<p>To join the organization, please click the button below to set a password "
"and complete your registration.</p>"
)
message += "<p>To join the organization, please click the button below to set a password and complete your registration.</p>"
elif auth_type == AuthType.GOOGLE_OAUTH:
message += (
"<p>To join the organization, please click the button below to login with Google "
"and complete your registration.</p>"
)
message += "<p>To join the organization, please click the button below to login with Google and complete your registration.</p>"
elif auth_type == AuthType.OIDC or auth_type == AuthType.SAML:
message += (
"<p>To join the organization, please click the button below to"
" complete your registration.</p>"
)
message += "<p>To join the organization, please click the button below to complete your registration.</p>"
else:
raise ValueError(f"Invalid auth type: {auth_type}")

View File

@@ -168,8 +168,7 @@ def verify_auth_setting() -> None:
)
if raw_auth_type == "disabled":
logger.warning(
"AUTH_TYPE='disabled' is no longer supported. "
"Using 'basic' instead. Please update your configuration."
"AUTH_TYPE='disabled' is no longer supported. Using 'basic' instead. Please update your configuration."
)
logger.notice(f"Using Auth Type: {AUTH_TYPE.value}")
@@ -612,8 +611,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
char in PASSWORD_SPECIAL_CHARS for char in password
):
raise exceptions.InvalidPasswordException(
reason="Password must contain at least one special character from the following set: "
f"{PASSWORD_SPECIAL_CHARS}."
reason=f"Password must contain at least one special character from the following set: {PASSWORD_SPECIAL_CHARS}."
)
return
@@ -880,7 +878,10 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
)
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None # noqa: ARG002
self,
user: User,
token: str,
request: Optional[Request] = None, # noqa: ARG002
) -> None:
if not EMAIL_CONFIGURED:
logger.error(
@@ -899,7 +900,10 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
send_forgot_password_email(user.email, tenant_id=tenant_id, token=token)
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None # noqa: ARG002
self,
user: User,
token: str,
request: Optional[Request] = None, # noqa: ARG002
) -> None:
verify_email_domain(user.email)
@@ -1195,7 +1199,9 @@ class SingleTenantJWTStrategy(JWTStrategy[User, uuid.UUID]):
return
async def refresh_token(
self, token: Optional[str], user: User # noqa: ARG002
self,
token: Optional[str], # noqa: ARG002
user: User, # noqa: ARG002
) -> str:
"""Issue a fresh JWT with a new expiry."""
return await self.write_token(user)
@@ -1223,8 +1229,7 @@ def get_jwt_strategy() -> SingleTenantJWTStrategy:
if AUTH_BACKEND == AuthBackend.JWT:
if MULTI_TENANT or AUTH_TYPE == AuthType.CLOUD:
raise ValueError(
"JWT auth backend is only supported for single-tenant, self-hosted deployments. "
"Use 'redis' or 'postgres' instead."
"JWT auth backend is only supported for single-tenant, self-hosted deployments. Use 'redis' or 'postgres' instead."
)
if not USER_AUTH_SECRET:
raise ValueError("USER_AUTH_SECRET is required for JWT auth backend.")

View File

@@ -154,8 +154,7 @@ def on_task_postrun(
tenant_id = cast(str, kwargs.get("tenant_id", POSTGRES_DEFAULT_SCHEMA))
task_logger.debug(
f"Task {task.name} (ID: {task_id}) completed with state: {state} "
f"{f'for tenant_id={tenant_id}' if tenant_id else ''}"
f"Task {task.name} (ID: {task_id}) completed with state: {state} {f'for tenant_id={tenant_id}' if tenant_id else ''}"
)
r = get_redis_client(tenant_id=tenant_id)
@@ -211,7 +210,9 @@ def on_task_postrun(
def on_celeryd_init(
sender: str, conf: Any = None, **kwargs: Any # noqa: ARG001
sender: str, # noqa: ARG001
conf: Any = None, # noqa: ARG001
**kwargs: Any, # noqa: ARG001
) -> None:
"""The first signal sent on celery worker startup"""
@@ -277,10 +278,7 @@ def wait_for_redis(sender: Any, **kwargs: Any) -> None: # noqa: ARG001
time.sleep(WAIT_INTERVAL)
if not ready:
msg = (
f"Redis: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
msg = f"Redis: Readiness probe did not succeed within the timeout ({WAIT_LIMIT} seconds). Exiting..."
logger.error(msg)
raise WorkerShutdown(msg)
@@ -319,10 +317,7 @@ def wait_for_db(sender: Any, **kwargs: Any) -> None: # noqa: ARG001
time.sleep(WAIT_INTERVAL)
if not ready:
msg = (
f"Database: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
msg = f"Database: Readiness probe did not succeed within the timeout ({WAIT_LIMIT} seconds). Exiting..."
logger.error(msg)
raise WorkerShutdown(msg)
@@ -349,10 +344,7 @@ def on_secondary_worker_init(sender: Any, **kwargs: Any) -> None: # noqa: ARG00
f"Primary worker is not ready yet. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
)
if time_elapsed > WAIT_LIMIT:
msg = (
f"Primary worker was not ready within the timeout. "
f"({WAIT_LIMIT} seconds). Exiting..."
)
msg = f"Primary worker was not ready within the timeout. ({WAIT_LIMIT} seconds). Exiting..."
logger.error(msg)
raise WorkerShutdown(msg)
@@ -522,7 +514,9 @@ def reset_tenant_id(
CURRENT_TENANT_ID_CONTEXTVAR.set(POSTGRES_DEFAULT_SCHEMA)
def wait_for_vespa_or_shutdown(sender: Any, **kwargs: Any) -> None: # noqa: ARG001
def wait_for_vespa_or_shutdown(
sender: Any, **kwargs: Any # noqa: ARG001
) -> None: # noqa: ARG001
"""Waits for Vespa to become ready subject to a timeout.
Raises WorkerShutdown if the timeout is reached."""

View File

@@ -181,9 +181,7 @@ class DynamicTenantScheduler(PersistentScheduler):
if not do_update:
# exit early if nothing changed
task_logger.info(
f"_try_updating_schedule - Schedule unchanged: "
f"tasks={len(new_schedule)} "
f"beat_multiplier={beat_multiplier}"
f"_try_updating_schedule - Schedule unchanged: tasks={len(new_schedule)} beat_multiplier={beat_multiplier}"
)
return

View File

@@ -186,7 +186,6 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
# Check if the Celery task actually exists
try:
result: AsyncResult = AsyncResult(attempt.celery_task_id)
# If the task is not in PENDING state, it exists in Celery
@@ -207,8 +206,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
except Exception:
# If we can't check the task status, be conservative and continue
logger.warning(
f"Could not verify Celery task status on startup for attempt {attempt.id}, "
f"task_id={attempt.celery_task_id}"
f"Could not verify Celery task status on startup for attempt {attempt.id}, task_id={attempt.celery_task_id}"
)
@@ -278,8 +276,7 @@ class HubPeriodicTask(bootsteps.StartStopStep):
lock.reacquire()
else:
task_logger.warning(
"Full acquisition of primary worker lock. "
"Reasons could be worker restart or lock expiration."
"Full acquisition of primary worker lock. Reasons could be worker restart or lock expiration."
)
lock = r.lock(
OnyxRedisLocks.PRIMARY_WORKER,

View File

@@ -120,7 +120,7 @@ def _extract_from_batch(
if failed_id:
ids[failed_id] = None
logger.warning(
f"Failed to retrieve document {failed_id}: " f"{item.failure_message}"
f"Failed to retrieve document {failed_id}: {item.failure_message}"
)
else:
ids[item.id] = item.parent_hierarchy_raw_node_id

View File

@@ -307,14 +307,12 @@ def try_generate_document_cc_pair_cleanup_tasks(
if redis_connector.prune.fenced:
raise TaskDependencyError(
"Connector deletion - Delayed (pruning in progress): "
f"cc_pair={cc_pair_id}"
f"Connector deletion - Delayed (pruning in progress): cc_pair={cc_pair_id}"
)
if redis_connector.permissions.fenced:
raise TaskDependencyError(
f"Connector deletion - Delayed (permissions in progress): "
f"cc_pair={cc_pair_id}"
f"Connector deletion - Delayed (permissions in progress): cc_pair={cc_pair_id}"
)
# add tasks to celery and build up the task set to monitor in redis
@@ -354,8 +352,7 @@ def try_generate_document_cc_pair_cleanup_tasks(
# return 0
task_logger.info(
"RedisConnectorDeletion.generate_tasks finished. "
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
f"RedisConnectorDeletion.generate_tasks finished. cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
)
# set this only after all tasks have been added
@@ -366,7 +363,9 @@ def try_generate_document_cc_pair_cleanup_tasks(
def monitor_connector_deletion_taskset(
tenant_id: str, key_bytes: bytes, r: Redis # noqa: ARG001
tenant_id: str,
key_bytes: bytes,
r: Redis, # noqa: ARG001
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
@@ -690,8 +689,7 @@ def validate_connector_deletion_fence(
tasks_not_in_celery += 1
task_logger.info(
"validate_connector_deletion_fence task check: "
f"tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
f"validate_connector_deletion_fence task check: tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
)
# we're active if there are still tasks to run and those tasks all exist in celery

View File

@@ -109,9 +109,7 @@ def try_creating_docfetching_task(
except Exception:
task_logger.exception(
f"try_creating_indexing_task - Unexpected exception: "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings.id}"
f"try_creating_indexing_task - Unexpected exception: cc_pair={cc_pair.id} search_settings={search_settings.id}"
)
# Clean up on failure

View File

@@ -60,15 +60,13 @@ def _verify_indexing_attempt(
if attempt.connector_credential_pair_id != cc_pair_id:
raise SimpleJobException(
f"docfetching_task - CC pair mismatch: "
f"expected={cc_pair_id} actual={attempt.connector_credential_pair_id}",
f"docfetching_task - CC pair mismatch: expected={cc_pair_id} actual={attempt.connector_credential_pair_id}",
code=IndexingWatchdogTerminalStatus.FENCE_MISMATCH.code,
)
if attempt.search_settings_id != search_settings_id:
raise SimpleJobException(
f"docfetching_task - Search settings mismatch: "
f"expected={search_settings_id} actual={attempt.search_settings_id}",
f"docfetching_task - Search settings mismatch: expected={search_settings_id} actual={attempt.search_settings_id}",
code=IndexingWatchdogTerminalStatus.FENCE_MISMATCH.code,
)
@@ -77,8 +75,7 @@ def _verify_indexing_attempt(
IndexingStatus.IN_PROGRESS,
]:
raise SimpleJobException(
f"docfetching_task - Invalid attempt status: "
f"attempt_id={index_attempt_id} status={attempt.status}",
f"docfetching_task - Invalid attempt status: attempt_id={index_attempt_id} status={attempt.status}",
code=IndexingWatchdogTerminalStatus.FENCE_MISMATCH.code,
)
@@ -248,9 +245,7 @@ def _docfetching_task(
raise e
logger.info(
f"Indexing spawned task finished: attempt={index_attempt_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
f"Indexing spawned task finished: attempt={index_attempt_id} cc_pair={cc_pair_id} search_settings={search_settings_id}"
)
os._exit(0) # ensure process exits cleanly
@@ -286,8 +281,7 @@ def process_job_result(
result.status = IndexingWatchdogTerminalStatus.SUCCEEDED
task_logger.warning(
log_builder.build(
"Indexing watchdog - spawned task has non-zero exit code "
"but completion signal is OK. Continuing...",
"Indexing watchdog - spawned task has non-zero exit code but completion signal is OK. Continuing...",
exit_code=str(result.exit_code),
)
)
@@ -296,10 +290,7 @@ def process_job_result(
result.status = IndexingWatchdogTerminalStatus.from_code(result.exit_code)
job_level_exception = job.exception()
result.exception_str = (
f"Docfetching returned exit code {result.exit_code} "
f"with exception: {job_level_exception}"
)
result.exception_str = f"Docfetching returned exit code {result.exit_code} with exception: {job_level_exception}"
return result

View File

@@ -158,7 +158,6 @@ def validate_active_indexing_attempts(
logger.info("Validating active indexing attempts")
with get_session_with_current_tenant() as db_session:
# Find all active indexing attempts
active_attempts = (
db_session.execute(
@@ -190,8 +189,7 @@ def validate_active_indexing_attempts(
db_session.commit()
task_logger.info(
f"Initialized heartbeat tracking for attempt {fresh_attempt.id}: "
f"counter={fresh_attempt.heartbeat_counter}"
f"Initialized heartbeat tracking for attempt {fresh_attempt.id}: counter={fresh_attempt.heartbeat_counter}"
)
continue
@@ -214,8 +212,7 @@ def validate_active_indexing_attempts(
db_session.commit()
task_logger.debug(
f"Heartbeat advanced for attempt {fresh_attempt.id}: "
f"new_counter={current_counter}"
f"Heartbeat advanced for attempt {fresh_attempt.id}: new_counter={current_counter}"
)
continue
@@ -350,9 +347,7 @@ def monitor_indexing_attempt_progress(
)
except Exception as e:
logger.exception(
f"Failed to monitor document processing completion: "
f"attempt={attempt.id} "
f"error={str(e)}"
f"Failed to monitor document processing completion: attempt={attempt.id} error={str(e)}"
)
# Mark the attempt as failed if monitoring fails
@@ -401,9 +396,7 @@ def check_indexing_completion(
) -> None:
logger.info(
f"Checking for indexing completion: "
f"attempt={index_attempt_id} "
f"tenant={tenant_id}"
f"Checking for indexing completion: attempt={index_attempt_id} tenant={tenant_id}"
)
# Check if indexing is complete and all batches are processed
@@ -445,7 +438,7 @@ def check_indexing_completion(
if attempt.status == IndexingStatus.IN_PROGRESS:
logger.error(
f"Indexing attempt {index_attempt_id} has been indexing for "
f"{stalled_timeout_hours//2}-{stalled_timeout_hours} hours without progress. "
f"{stalled_timeout_hours // 2}-{stalled_timeout_hours} hours without progress. "
f"Marking it as failed."
)
mark_attempt_failed(
@@ -695,17 +688,12 @@ def _kickoff_indexing_tasks(
if attempt_id is not None:
task_logger.info(
f"Connector indexing queued: "
f"index_attempt={attempt_id} "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings.id}"
f"Connector indexing queued: index_attempt={attempt_id} cc_pair={cc_pair.id} search_settings={search_settings.id}"
)
tasks_created += 1
else:
task_logger.error(
f"Failed to create indexing task: "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings.id}"
f"Failed to create indexing task: cc_pair={cc_pair.id} search_settings={search_settings.id}"
)
return tasks_created
@@ -901,9 +889,7 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
and secondary_search_settings.switchover_type == SwitchoverType.INSTANT
):
task_logger.info(
f"Skipping secondary indexing: "
f"switchover_type=INSTANT "
f"for search_settings={secondary_search_settings.id}"
f"Skipping secondary indexing: switchover_type=INSTANT for search_settings={secondary_search_settings.id}"
)
# 2/3: VALIDATE
@@ -1005,8 +991,7 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
lock_beat.release()
else:
task_logger.error(
"check_for_indexing - Lock not owned on completion: "
f"tenant={tenant_id}"
f"check_for_indexing - Lock not owned on completion: tenant={tenant_id}"
)
redis_lock_dump(lock_beat, redis_client)
@@ -1060,8 +1045,7 @@ def check_for_checkpoint_cleanup(self: Task, *, tenant_id: str) -> None:
lock.release()
else:
task_logger.error(
"check_for_checkpoint_cleanup - Lock not owned on completion: "
f"tenant={tenant_id}"
f"check_for_checkpoint_cleanup - Lock not owned on completion: tenant={tenant_id}"
)
@@ -1071,7 +1055,10 @@ def check_for_checkpoint_cleanup(self: Task, *, tenant_id: str) -> None:
bind=True,
)
def cleanup_checkpoint_task(
self: Task, *, index_attempt_id: int, tenant_id: str | None # noqa: ARG001
self: Task, # noqa: ARG001
*,
index_attempt_id: int,
tenant_id: str | None,
) -> None:
"""Clean up a checkpoint for a given index attempt"""
@@ -1084,9 +1071,7 @@ def cleanup_checkpoint_task(
elapsed = time.monotonic() - start
task_logger.info(
f"cleanup_checkpoint_task completed: tenant_id={tenant_id} "
f"index_attempt_id={index_attempt_id} "
f"elapsed={elapsed:.2f}"
f"cleanup_checkpoint_task completed: tenant_id={tenant_id} index_attempt_id={index_attempt_id} elapsed={elapsed:.2f}"
)
@@ -1149,8 +1134,7 @@ def check_for_index_attempt_cleanup(self: Task, *, tenant_id: str) -> None:
lock.release()
else:
task_logger.error(
"check_for_index_attempt_cleanup - Lock not owned on completion: "
f"tenant={tenant_id}"
f"check_for_index_attempt_cleanup - Lock not owned on completion: tenant={tenant_id}"
)
@@ -1160,7 +1144,10 @@ def check_for_index_attempt_cleanup(self: Task, *, tenant_id: str) -> None:
bind=True,
)
def cleanup_index_attempt_task(
self: Task, *, index_attempt_ids: list[int], tenant_id: str # noqa: ARG001
self: Task, # noqa: ARG001
*,
index_attempt_ids: list[int],
tenant_id: str,
) -> None:
"""Clean up an index attempt"""
start = time.monotonic()
@@ -1207,15 +1194,13 @@ def _check_failure_threshold(
FAILURE_RATIO_THRESHOLD = 0.1
if total_failures > FAILURE_THRESHOLD and failure_ratio > FAILURE_RATIO_THRESHOLD:
logger.error(
f"Connector run failed with '{total_failures}' errors "
f"after '{batch_num}' batches."
f"Connector run failed with '{total_failures}' errors after '{batch_num}' batches."
)
if last_failure and last_failure.exception:
raise last_failure.exception from last_failure.exception
raise RuntimeError(
f"Connector run encountered too many errors, aborting. "
f"Last error: {last_failure}"
f"Connector run encountered too many errors, aborting. Last error: {last_failure}"
)
@@ -1339,9 +1324,7 @@ def _docprocessing_task(
raise
task_logger.info(
f"Processing document batch: "
f"attempt={index_attempt_id} "
f"batch_num={batch_num} "
f"Processing document batch: attempt={index_attempt_id} batch_num={batch_num} "
)
# Get the document batch storage
@@ -1599,9 +1582,7 @@ def _docprocessing_task(
except Exception:
task_logger.exception(
f"Document batch processing failed: "
f"batch_num={batch_num} "
f"attempt={index_attempt_id} "
f"Document batch processing failed: batch_num={batch_num} attempt={index_attempt_id} "
)
raise

View File

@@ -84,8 +84,7 @@ def scheduled_eval_task(self: Task, **kwargs: Any) -> None: # noqa: ARG001
run_timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d")
logger.info(
f"Starting scheduled eval pipeline for project '{project_name}' "
f"with {len(dataset_names)} dataset(s): {dataset_names}"
f"Starting scheduled eval pipeline for project '{project_name}' with {len(dataset_names)} dataset(s): {dataset_names}"
)
pipeline_start = datetime.now(timezone.utc)
@@ -101,8 +100,7 @@ def scheduled_eval_task(self: Task, **kwargs: Any) -> None: # noqa: ARG001
try:
logger.info(
f"Running scheduled eval for dataset: {dataset_name} "
f"(project: {project_name})"
f"Running scheduled eval for dataset: {dataset_name} (project: {project_name})"
)
configuration = EvalConfigurationOptions(
@@ -142,6 +140,5 @@ def scheduled_eval_task(self: Task, **kwargs: Any) -> None: # noqa: ARG001
passed_count = sum(1 for r in results if r["success"])
logger.info(
f"Scheduled eval pipeline completed: {passed_count}/{len(results)} passed "
f"in {total_duration:.1f}s"
f"Scheduled eval pipeline completed: {passed_count}/{len(results)} passed in {total_duration:.1f}s"
)

View File

@@ -127,9 +127,7 @@ def _try_creating_hierarchy_fetching_task(
raise RuntimeError("send_task for hierarchy_fetching_task failed.")
task_logger.info(
f"Created hierarchy fetching task: "
f"cc_pair={cc_pair.id} "
f"celery_task_id={custom_task_id}"
f"Created hierarchy fetching task: cc_pair={cc_pair.id} celery_task_id={custom_task_id}"
)
return custom_task_id
@@ -215,8 +213,7 @@ def check_for_hierarchy_fetching(self: Task, *, tenant_id: str) -> int | None:
time_elapsed = time.monotonic() - time_start
task_logger.info(
f"check_for_hierarchy_fetching finished: "
f"tasks_created={tasks_created} elapsed={time_elapsed:.2f}s"
f"check_for_hierarchy_fetching finished: tasks_created={tasks_created} elapsed={time_elapsed:.2f}s"
)
return tasks_created
@@ -342,8 +339,7 @@ def connector_hierarchy_fetching_task(
from the connector source and stores it in the database.
"""
task_logger.info(
f"connector_hierarchy_fetching_task starting: "
f"cc_pair={cc_pair_id} tenant={tenant_id}"
f"connector_hierarchy_fetching_task starting: cc_pair={cc_pair_id} tenant={tenant_id}"
)
try:
@@ -361,8 +357,7 @@ def connector_hierarchy_fetching_task(
if cc_pair.status == ConnectorCredentialPairStatus.DELETING:
task_logger.info(
f"Skipping hierarchy fetching for deleting connector: "
f"cc_pair={cc_pair_id}"
f"Skipping hierarchy fetching for deleting connector: cc_pair={cc_pair_id}"
)
return
@@ -375,8 +370,7 @@ def connector_hierarchy_fetching_task(
)
task_logger.info(
f"connector_hierarchy_fetching_task: "
f"Extracted {total_nodes} hierarchy nodes for cc_pair={cc_pair_id}"
f"connector_hierarchy_fetching_task: Extracted {total_nodes} hierarchy nodes for cc_pair={cc_pair_id}"
)
# Update the last fetch time to prevent re-running until next interval

View File

@@ -18,7 +18,9 @@ from onyx.llm.well_known_providers.auto_update_service import (
bind=True,
)
def check_for_auto_llm_updates(
self: Task, *, tenant_id: str # noqa: ARG001
self: Task, # noqa: ARG001
*,
tenant_id: str, # noqa: ARG001
) -> bool | None:
"""Periodic task to fetch LLM model updates from GitHub
and sync them to providers in Auto mode.

View File

@@ -116,8 +116,7 @@ class Metric(BaseModel):
string_value = self.value
else:
task_logger.error(
f"Invalid metric value type: {type(self.value)} "
f"({self.value}) for metric {self.name}."
f"Invalid metric value type: {type(self.value)} ({self.value}) for metric {self.name}."
)
return
@@ -260,8 +259,7 @@ def _build_connector_final_metrics(
)
if _has_metric_been_emitted(redis_std, metric_key):
task_logger.info(
f"Skipping final metrics for connector {cc_pair.connector.id} "
f"index attempt {attempt.id}, already emitted."
f"Skipping final metrics for connector {cc_pair.connector.id} index attempt {attempt.id}, already emitted."
)
continue
@@ -1036,8 +1034,7 @@ def monitor_process_memory(self: Task, *, tenant_id: str) -> None: # noqa: ARG0
if process_name in cmdline:
if process_type in supervisor_processes.values():
task_logger.error(
f"Duplicate process type for type {process_type} "
f"with cmd {cmdline} with pid={proc.pid}."
f"Duplicate process type for type {process_type} with cmd {cmdline} with pid={proc.pid}."
)
continue
@@ -1046,8 +1043,7 @@ def monitor_process_memory(self: Task, *, tenant_id: str) -> None: # noqa: ARG0
if len(supervisor_processes) != len(process_type_mapping):
task_logger.error(
"Missing processes: "
f"{set(process_type_mapping.keys()).symmetric_difference(supervisor_processes.values())}"
f"Missing processes: {set(process_type_mapping.keys()).symmetric_difference(supervisor_processes.values())}"
)
# Log memory usage for each process
@@ -1101,9 +1097,7 @@ def cloud_monitor_celery_pidbox(
r_celery.delete(key)
task_logger.info(
f"Deleted idle pidbox: pidbox={key_str} "
f"idletime={idletime} "
f"max_idletime={MAX_PIDBOX_IDLE}"
f"Deleted idle pidbox: pidbox={key_str} idletime={idletime} max_idletime={MAX_PIDBOX_IDLE}"
)
num_deleted += 1

View File

@@ -205,8 +205,7 @@ def migrate_chunks_from_vespa_to_opensearch_task(
) = get_vespa_visit_state(db_session)
if is_continuation_token_done_for_all_slices(continuation_token_map):
task_logger.info(
f"OpenSearch migration COMPLETED for tenant {tenant_id}. "
f"Total chunks migrated: {total_chunks_migrated}."
f"OpenSearch migration COMPLETED for tenant {tenant_id}. Total chunks migrated: {total_chunks_migrated}."
)
mark_migration_completed_time_if_not_set_with_commit(db_session)
break

View File

@@ -151,8 +151,7 @@ def _resolve_and_update_document_parents(
commit=True,
)
task_logger.info(
f"Pruning: resolved and updated parent hierarchy for "
f"{len(resolved)} documents (source={source.value})"
f"Pruning: resolved and updated parent hierarchy for {len(resolved)} documents (source={source.value})"
)
@@ -220,7 +219,6 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
# but pruning only kicks off once per hour
if not r.exists(OnyxRedisSignals.BLOCK_PRUNING):
task_logger.info("Checking for pruning due")
cc_pair_ids: list[int] = []
@@ -484,8 +482,7 @@ def connector_pruning_generator_task(
if not redis_connector.prune.fenced: # The fence must exist
raise ValueError(
f"connector_prune_generator_task - fence not found: "
f"fence={redis_connector.prune.fence_key}"
f"connector_prune_generator_task - fence not found: fence={redis_connector.prune.fence_key}"
)
payload = redis_connector.prune.payload # The payload must exist
@@ -496,8 +493,7 @@ def connector_pruning_generator_task(
if payload.celery_task_id is None:
logger.info(
f"connector_prune_generator_task - Waiting for fence: "
f"fence={redis_connector.prune.fence_key}"
f"connector_prune_generator_task - Waiting for fence: fence={redis_connector.prune.fence_key}"
)
time.sleep(1)
continue
@@ -553,9 +549,7 @@ def connector_pruning_generator_task(
redis_connector.prune.set_fence(new_payload)
task_logger.info(
f"Pruning generator running connector: "
f"cc_pair={cc_pair_id} "
f"connector_source={cc_pair.connector.source}"
f"Pruning generator running connector: cc_pair={cc_pair_id} connector_source={cc_pair.connector.source}"
)
runnable_connector = instantiate_connector(
@@ -673,8 +667,7 @@ def connector_pruning_generator_task(
return None
task_logger.info(
"RedisConnector.prune.generate_tasks finished. "
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
f"RedisConnector.prune.generate_tasks finished. cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
)
redis_connector.prune.generator_complete = tasks_generated
@@ -717,9 +710,7 @@ def connector_pruning_generator_task(
)
except Exception as e:
task_logger.exception(
f"Pruning exceptioned: cc_pair={cc_pair_id} "
f"connector={connector_id} "
f"payload_id={payload_id}"
f"Pruning exceptioned: cc_pair={cc_pair_id} connector={connector_id} payload_id={payload_id}"
)
redis_connector.prune.reset()
@@ -737,7 +728,10 @@ def connector_pruning_generator_task(
def monitor_ccpair_pruning_taskset(
tenant_id: str, key_bytes: bytes, r: Redis, db_session: Session # noqa: ARG001
tenant_id: str,
key_bytes: bytes,
r: Redis, # noqa: ARG001
db_session: Session,
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
@@ -931,8 +925,7 @@ def validate_pruning_fence(
tasks_not_in_celery += 1
task_logger.info(
"validate_pruning_fence task check: "
f"tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
f"validate_pruning_fence task check: tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
)
# we're active if there are still tasks to run and those tasks all exist in celery

View File

@@ -192,10 +192,7 @@ def document_by_cc_pair_cleanup_task(
elapsed = time.monotonic() - start
task_logger.info(
f"doc={document_id} "
f"action={action} "
f"refcount={count} "
f"elapsed={elapsed:.2f}"
f"doc={document_id} action={action} refcount={count} elapsed={elapsed:.2f}"
)
except SoftTimeLimitExceeded:
task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")
@@ -218,9 +215,7 @@ def document_by_cc_pair_cleanup_task(
if isinstance(e, httpx.HTTPStatusError):
if e.response.status_code == HTTPStatus.BAD_REQUEST:
task_logger.exception(
f"Non-retryable HTTPStatusError: "
f"doc={document_id} "
f"status={e.response.status_code}"
f"Non-retryable HTTPStatusError: doc={document_id} status={e.response.status_code}"
)
completion_status = (
OnyxCeleryTaskCompletionStatus.NON_RETRYABLE_EXCEPTION
@@ -239,8 +234,7 @@ def document_by_cc_pair_cleanup_task(
# This is the last attempt! mark the document as dirty in the db so that it
# eventually gets fixed out of band via stale document reconciliation
task_logger.warning(
f"Max celery task retries reached. Marking doc as dirty for reconciliation: "
f"doc={document_id}"
f"Max celery task retries reached. Marking doc as dirty for reconciliation: doc={document_id}"
)
with get_session_with_current_tenant() as db_session:
# delete the cc pair relationship now and let reconciliation clean it up
@@ -285,4 +279,4 @@ def celery_beat_heartbeat(self: Task, *, tenant_id: str) -> None: # noqa: ARG00
r: Redis = get_redis_client()
r.set(ONYX_CELERY_BEAT_HEARTBEAT_KEY, 1, ex=600)
time_elapsed = time.monotonic() - time_start
task_logger.info(f"celery_beat_heartbeat finished: " f"elapsed={time_elapsed:.2f}")
task_logger.info(f"celery_beat_heartbeat finished: elapsed={time_elapsed:.2f}")

View File

@@ -285,8 +285,7 @@ def check_user_file_processing(self: Task, *, tenant_id: str) -> None:
lock.release()
task_logger.info(
f"check_user_file_processing - Enqueued {enqueued} skipped_guard={skipped_guard} "
f"tasks for tenant={tenant_id}"
f"check_user_file_processing - Enqueued {enqueued} skipped_guard={skipped_guard} tasks for tenant={tenant_id}"
)
return None
@@ -317,8 +316,7 @@ def _process_user_file_without_vector_db(
token_count: int | None = len(encode(combined_text))
except Exception:
task_logger.warning(
f"_process_user_file_without_vector_db - "
f"Failed to compute token count for {uf.id}, falling back to None"
f"_process_user_file_without_vector_db - Failed to compute token count for {uf.id}, falling back to None"
)
token_count = None
@@ -338,8 +336,7 @@ def _process_user_file_without_vector_db(
db_session.commit()
task_logger.info(
f"_process_user_file_without_vector_db - "
f"Completed id={uf.id} tokens={token_count}"
f"_process_user_file_without_vector_db - Completed id={uf.id} tokens={token_count}"
)
@@ -366,8 +363,7 @@ def _process_user_file_with_indexing(
)
if current_search_settings is None:
raise RuntimeError(
f"_process_user_file_with_indexing - "
f"No current search settings found for tenant={tenant_id}"
f"_process_user_file_with_indexing - No current search settings found for tenant={tenant_id}"
)
adapter = UserFileIndexingAdapter(
@@ -397,8 +393,7 @@ def _process_user_file_with_indexing(
)
task_logger.info(
f"_process_user_file_with_indexing - "
f"Indexing pipeline completed ={index_pipeline_result}"
f"_process_user_file_with_indexing - Indexing pipeline completed ={index_pipeline_result}"
)
if (
@@ -407,8 +402,7 @@ def _process_user_file_with_indexing(
or index_pipeline_result.total_chunks == 0
):
task_logger.error(
f"_process_user_file_with_indexing - "
f"Indexing pipeline failed id={user_file_id}"
f"_process_user_file_with_indexing - Indexing pipeline failed id={user_file_id}"
)
if uf.status != UserFileStatus.DELETING:
uf.status = UserFileStatus.FAILED
@@ -535,7 +529,10 @@ def process_user_file_impl(
ignore_result=True,
)
def process_single_user_file(
self: Task, *, user_file_id: str, tenant_id: str # noqa: ARG001
self: Task, # noqa: ARG001
*,
user_file_id: str,
tenant_id: str,
) -> None:
process_user_file_impl(
user_file_id=user_file_id, tenant_id=tenant_id, redis_locking=True
@@ -691,7 +688,10 @@ def delete_user_file_impl(
ignore_result=True,
)
def process_single_user_file_delete(
self: Task, *, user_file_id: str, tenant_id: str # noqa: ARG001
self: Task, # noqa: ARG001
*,
user_file_id: str,
tenant_id: str,
) -> None:
delete_user_file_impl(
user_file_id=user_file_id, tenant_id=tenant_id, redis_locking=True
@@ -761,8 +761,7 @@ def check_for_user_file_project_sync(self: Task, *, tenant_id: str) -> None:
lock.release()
task_logger.info(
f"Enqueued {enqueued} "
f"Skipped guard {skipped_guard} tasks for tenant={tenant_id}"
f"Enqueued {enqueued} Skipped guard {skipped_guard} tasks for tenant={tenant_id}"
)
return None
@@ -876,7 +875,10 @@ def project_sync_user_file_impl(
ignore_result=True,
)
def process_single_user_file_project_sync(
self: Task, *, user_file_id: str, tenant_id: str # noqa: ARG001
self: Task, # noqa: ARG001
*,
user_file_id: str,
tenant_id: str,
) -> None:
project_sync_user_file_impl(
user_file_id=user_file_id, tenant_id=tenant_id, redis_locking=True

View File

@@ -199,8 +199,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str) -> bool | None:
lock_beat.release()
else:
task_logger.error(
"check_for_vespa_sync_task - Lock not owned on completion: "
f"tenant={tenant_id}"
f"check_for_vespa_sync_task - Lock not owned on completion: tenant={tenant_id}"
)
redis_lock_dump(lock_beat, r)
@@ -266,8 +265,7 @@ def try_generate_document_set_sync_tasks(
# return 0
task_logger.info(
f"RedisDocumentSet.generate_tasks finished. "
f"document_set={document_set.id} tasks_generated={tasks_generated}"
f"RedisDocumentSet.generate_tasks finished. document_set={document_set.id} tasks_generated={tasks_generated}"
)
# create before setting fence to avoid race condition where the monitoring
@@ -342,8 +340,7 @@ def try_generate_user_group_sync_tasks(
# return 0
task_logger.info(
f"RedisUserGroup.generate_tasks finished. "
f"usergroup={usergroup.id} tasks_generated={tasks_generated}"
f"RedisUserGroup.generate_tasks finished. usergroup={usergroup.id} tasks_generated={tasks_generated}"
)
# create before setting fence to avoid race condition where the monitoring
@@ -398,8 +395,7 @@ def monitor_document_set_taskset(
count = cast(int, r.scard(rds.taskset_key))
task_logger.info(
f"Document set sync progress: document_set={document_set_id} "
f"remaining={count} initial={initial_count}"
f"Document set sync progress: document_set={document_set_id} remaining={count} initial={initial_count}"
)
if count > 0:
update_sync_record_status(
@@ -444,9 +440,7 @@ def monitor_document_set_taskset(
)
except Exception:
task_logger.exception(
"update_sync_record_status exceptioned. "
f"document_set_id={document_set_id} "
"Resetting document set regardless."
f"update_sync_record_status exceptioned. document_set_id={document_set_id} Resetting document set regardless."
)
rds.reset()
@@ -483,9 +477,7 @@ def vespa_metadata_sync_task(self: Task, document_id: str, *, tenant_id: str) ->
if not doc:
elapsed = time.monotonic() - start
task_logger.info(
f"doc={document_id} "
f"action=no_operation "
f"elapsed={elapsed:.2f}"
f"doc={document_id} action=no_operation elapsed={elapsed:.2f}"
)
completion_status = OnyxCeleryTaskCompletionStatus.SKIPPED
else:
@@ -524,9 +516,7 @@ def vespa_metadata_sync_task(self: Task, document_id: str, *, tenant_id: str) ->
mark_document_as_synced(document_id, db_session)
elapsed = time.monotonic() - start
task_logger.info(
f"doc={document_id} " f"action=sync " f"elapsed={elapsed:.2f}"
)
task_logger.info(f"doc={document_id} action=sync elapsed={elapsed:.2f}")
completion_status = OnyxCeleryTaskCompletionStatus.SUCCEEDED
except SoftTimeLimitExceeded:
task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")
@@ -549,9 +539,7 @@ def vespa_metadata_sync_task(self: Task, document_id: str, *, tenant_id: str) ->
if isinstance(e, httpx.HTTPStatusError):
if e.response.status_code == HTTPStatus.BAD_REQUEST:
task_logger.exception(
f"Non-retryable HTTPStatusError: "
f"doc={document_id} "
f"status={e.response.status_code}"
f"Non-retryable HTTPStatusError: doc={document_id} status={e.response.status_code}"
)
completion_status = (
OnyxCeleryTaskCompletionStatus.NON_RETRYABLE_EXCEPTION

View File

@@ -175,14 +175,16 @@ class SimpleJobClient:
del self.jobs[job.id]
def submit(
self, func: Callable, *args: Any, pure: bool = True # noqa: ARG002
self,
func: Callable,
*args: Any,
pure: bool = True, # noqa: ARG002
) -> SimpleJob | None:
"""NOTE: `pure` arg is needed so this can be a drop in replacement for Dask"""
self._cleanup_completed_jobs()
if len(self.jobs) >= self.n_workers:
logger.debug(
f"No available workers to run job. "
f"Currently running '{len(self.jobs)}' jobs, with a limit of '{self.n_workers}'."
f"No available workers to run job. Currently running '{len(self.jobs)}' jobs, with a limit of '{self.n_workers}'."
)
return None

View File

@@ -226,15 +226,13 @@ def _check_failure_threshold(
FAILURE_RATIO_THRESHOLD = 0.1
if total_failures > FAILURE_THRESHOLD and failure_ratio > FAILURE_RATIO_THRESHOLD:
logger.error(
f"Connector run failed with '{total_failures}' errors "
f"after '{batch_num}' batches."
f"Connector run failed with '{total_failures}' errors after '{batch_num}' batches."
)
if last_failure and last_failure.exception:
raise last_failure.exception from last_failure.exception
raise RuntimeError(
f"Connector run encountered too many errors, aborting. "
f"Last error: {last_failure}"
f"Connector run encountered too many errors, aborting. Last error: {last_failure}"
)
@@ -609,8 +607,7 @@ def connector_document_extraction(
)
logger.debug(
f"Persisted and cached {len(hierarchy_node_batch_cleaned)} hierarchy nodes "
f"for attempt={index_attempt_id}"
f"Persisted and cached {len(hierarchy_node_batch_cleaned)} hierarchy nodes for attempt={index_attempt_id}"
)
# below is all document processing task, so if no batch we can just continue
@@ -812,15 +809,12 @@ def connector_document_extraction(
queue=OnyxCeleryQueues.SANDBOX,
)
logger.info(
f"Triggered sandbox file sync for user {creator_id} "
f"source={source_value} after indexing complete"
f"Triggered sandbox file sync for user {creator_id} source={source_value} after indexing complete"
)
except Exception as e:
logger.exception(
f"Document extraction failed: "
f"attempt={index_attempt_id} "
f"error={str(e)}"
f"Document extraction failed: attempt={index_attempt_id} error={str(e)}"
)
# Do NOT clean up batches on failure; future runs will use those batches
@@ -956,7 +950,6 @@ def reissue_old_batches(
# is still in the filestore waiting for processing or not.
last_batch_num = len(old_batches) + recent_batches
logger.info(
f"Starting from batch {last_batch_num} due to "
f"re-issued batches: {old_batches}, completed batches: {recent_batches}"
f"Starting from batch {last_batch_num} due to re-issued batches: {old_batches}, completed batches: {recent_batches}"
)
return len(old_batches), recent_batches

View File

@@ -259,8 +259,7 @@ def _poller_loop(tenant_id: str) -> None:
periodic_tasks = _build_periodic_tasks()
logger.info(
f"Periodic poller started with {len(periodic_tasks)} periodic task(s): "
f"{[t.name for t in periodic_tasks]}"
f"Periodic poller started with {len(periodic_tasks)} periodic task(s): {[t.name for t in periodic_tasks]}"
)
while not _shutdown_event.is_set():

View File

@@ -38,8 +38,7 @@ def get_cache_backend(*, tenant_id: str | None = None) -> CacheBackend:
builder = _BACKEND_BUILDERS.get(CACHE_BACKEND)
if builder is None:
raise ValueError(
f"Unsupported CACHE_BACKEND={CACHE_BACKEND!r}. "
f"Supported values: {[t.value for t in CacheBackendType]}"
f"Unsupported CACHE_BACKEND={CACHE_BACKEND!r}. Supported values: {[t.value for t in CacheBackendType]}"
)
return builder(tenant_id)

View File

@@ -270,7 +270,10 @@ def extract_headers(
def process_kg_commands(
message: str, persona_name: str, tenant_id: str, db_session: Session # noqa: ARG001
message: str,
persona_name: str,
tenant_id: str, # noqa: ARG001
db_session: Session,
) -> None:
# Temporarily, until we have a draft UI for the KG Operations/Management
# TODO: move to api endpoint once we get frontend

View File

@@ -472,8 +472,7 @@ class DynamicCitationProcessor:
# Check if we have a mapping for this citation number
if num not in self.citation_to_doc:
logger.warning(
f"Citation number {num} not found in mapping. "
f"Available: {list(self.citation_to_doc.keys())}"
f"Citation number {num} not found in mapping. Available: {list(self.citation_to_doc.keys())}"
)
continue

View File

@@ -157,8 +157,7 @@ def _try_fallback_tool_extraction(
)
if extracted_tool_calls:
logger.info(
f"Extracted {len(extracted_tool_calls)} tool call(s) from response text "
"as fallback"
f"Extracted {len(extracted_tool_calls)} tool call(s) from response text as fallback"
)
return (
LlmStepResult(
@@ -397,8 +396,7 @@ def construct_message_history(
]
if forgotten_meta:
logger.debug(
f"FileReader: building forgotten-files message for "
f"{[(m.file_id, m.filename) for m in forgotten_meta]}"
f"FileReader: building forgotten-files message for {[(m.file_id, m.filename) for m in forgotten_meta]}"
)
forgotten_files_message = _create_file_tool_metadata_message(
forgotten_meta, token_counter
@@ -488,8 +486,7 @@ def _drop_orphaned_tool_call_responses(
sanitized.append(msg)
else:
logger.debug(
"Dropping orphaned tool response with tool_call_id=%s while "
"constructing message history",
"Dropping orphaned tool response with tool_call_id=%s while constructing message history",
msg.tool_call_id,
)
continue
@@ -515,8 +512,7 @@ def _create_file_tool_metadata_message(
]
for meta in file_metadata:
lines.append(
f'- file_id="{meta.file_id}" filename="{meta.filename}" '
f"(~{meta.approx_char_count:,} chars)"
f'- file_id="{meta.file_id}" filename="{meta.filename}" (~{meta.approx_char_count:,} chars)'
)
message_content = "\n".join(lines)

View File

@@ -695,8 +695,7 @@ def _build_structured_assistant_message(msg: ChatMessageSimple) -> AssistantMess
def _build_structured_tool_response_message(msg: ChatMessageSimple) -> ToolMessage:
if not msg.tool_call_id:
raise ValueError(
"Tool call response message encountered but tool_call_id is not available. "
f"Message: {msg}"
f"Tool call response message encountered but tool_call_id is not available. Message: {msg}"
)
return ToolMessage(
@@ -731,8 +730,7 @@ class _OllamaHistoryMessageFormatter(_HistoryMessageFormatter):
tool_call_lines = [
(
f"[Tool Call] name={tc.tool_name} "
f"id={tc.tool_call_id} args={json.dumps(tc.tool_arguments)}"
f"[Tool Call] name={tc.tool_name} id={tc.tool_call_id} args={json.dumps(tc.tool_arguments)}"
)
for tc in msg.tool_calls
]
@@ -750,8 +748,7 @@ class _OllamaHistoryMessageFormatter(_HistoryMessageFormatter):
def format_tool_response_message(self, msg: ChatMessageSimple) -> UserMessage:
if not msg.tool_call_id:
raise ValueError(
"Tool call response message encountered but tool_call_id is not available. "
f"Message: {msg}"
f"Tool call response message encountered but tool_call_id is not available. Message: {msg}"
)
return UserMessage(
@@ -839,8 +836,7 @@ def translate_history_to_llm_format(
content_parts.append(image_part)
except Exception as e:
logger.warning(
f"Failed to process image file {img_file.file_id}: {e}. "
"Skipping image."
f"Failed to process image file {img_file.file_id}: {e}. Skipping image."
)
user_msg = UserMessage(
role="user",

View File

@@ -796,8 +796,7 @@ def handle_stream_message_objects(
if all_injected_file_metadata:
logger.debug(
"FileReader: file metadata for LLM: "
f"{[(fid, m.filename) for fid, m in all_injected_file_metadata.items()]}"
f"FileReader: file metadata for LLM: {[(fid, m.filename) for fid, m in all_injected_file_metadata.items()]}"
)
# Prepend summary message if compression exists

View File

@@ -87,8 +87,7 @@ def _create_and_link_tool_calls(
tool_call_tokens = len(default_tokenizer.encode(arguments_json_str))
except Exception as e:
logger.warning(
f"Failed to tokenize tool call arguments for {tool_call_info.tool_call_id}: {e}. "
f"Using length as (over) estimate."
f"Failed to tokenize tool call arguments for {tool_call_info.tool_call_id}: {e}. Using length as (over) estimate."
)
arguments_json_str = json.dumps(tool_call_info.tool_call_arguments)
tool_call_tokens = len(arguments_json_str)

View File

@@ -90,8 +90,7 @@ def parse_airtable_url(
match = _AIRTABLE_URL_PATTERN.search(url.strip())
if not match:
raise ValueError(
f"Could not parse Airtable URL: '{url}'. "
"Expected format: https://airtable.com/appXXX/tblYYY[/viwZZZ]"
f"Could not parse Airtable URL: '{url}'. Expected format: https://airtable.com/appXXX/tblYYY[/viwZZZ]"
)
return match.group(1), match.group(2), match.group(3)
@@ -170,16 +169,14 @@ class AirtableConnector(LoadConnector):
else:
if not self.base_id or not self.table_name_or_id:
raise ConnectorValidationError(
"A valid Airtable URL or base_id and table_name_or_id are required "
"when not using index_all mode."
"A valid Airtable URL or base_id and table_name_or_id are required when not using index_all mode."
)
try:
table = self.airtable_client.table(self.base_id, self.table_name_or_id)
table.schema()
except Exception as e:
raise ConnectorValidationError(
f"Failed to access table '{self.table_name_or_id}' "
f"in base '{self.base_id}': {e}"
f"Failed to access table '{self.table_name_or_id}' in base '{self.base_id}': {e}"
)
@classmethod
@@ -391,10 +388,7 @@ class AirtableConnector(LoadConnector):
TextSection(
link=link,
text=(
f"{field_name}:\n"
"------------------------\n"
f"{text}\n"
"------------------------"
f"{field_name}:\n------------------------\n{text}\n------------------------"
),
)
for text, link in field_value_and_links
@@ -440,8 +434,7 @@ class AirtableConnector(LoadConnector):
field_type = field_schema.type
logger.debug(
f"Processing field '{field_name}' of type '{field_type}' "
f"for record '{record_id}'."
f"Processing field '{field_name}' of type '{field_type}' for record '{record_id}'."
)
field_sections, field_metadata = self._process_field(
@@ -534,8 +527,7 @@ class AirtableConnector(LoadConnector):
break
logger.info(
f"Processing {len(records)} records from table "
f"'{table_schema.name}' in base '{base_name or base_id}'."
f"Processing {len(records)} records from table '{table_schema.name}' in base '{base_name or base_id}'."
)
if not records:
@@ -629,7 +621,6 @@ class AirtableConnector(LoadConnector):
)
except Exception:
logger.exception(
f"Failed to index table '{table.name}' ({table.id}) "
f"in base '{base_name}' ({base_id}), skipping."
f"Failed to index table '{table.name}' ({table.id}) in base '{base_name}' ({base_id}), skipping."
)
continue

View File

@@ -68,7 +68,7 @@ class ClickupConnector(LoadConnector, PollConnector):
response = self._make_request(url_endpoint)
comments = [
TextSection(
link=f'https://app.clickup.com/t/{task_id}?comment={comment_dict["id"]}',
link=f"https://app.clickup.com/t/{task_id}?comment={comment_dict['id']}",
text=comment_dict["comment_text"],
)
for comment_dict in response["comments"]

View File

@@ -698,8 +698,7 @@ class CodaConnector(LoadConnector, PollConnector):
)
elif e.status_code == 429:
raise ConnectorValidationError(
"Validation failed due to Coda rate-limits being exceeded (HTTP 429). "
"Please try again later."
"Validation failed due to Coda rate-limits being exceeded (HTTP 429). Please try again later."
)
else:
raise UnexpectedValidationError(

View File

@@ -95,7 +95,6 @@ def _get_page_id(page: dict[str, Any], allow_missing: bool = False) -> str:
class ConfluenceCheckpoint(ConnectorCheckpoint):
next_page_url: str | None

View File

@@ -296,8 +296,7 @@ class OnyxConfluence:
except HTTPError as e:
if e.response.status_code == 404 and use_v2:
logger.warning(
"v2 spaces API returned 404, falling back to v1 API. "
"This may indicate an older Confluence Cloud instance."
"v2 spaces API returned 404, falling back to v1 API. This may indicate an older Confluence Cloud instance."
)
# Fallback to v1
yield from self._paginate_spaces_for_endpoint(
@@ -354,9 +353,7 @@ class OnyxConfluence:
if not first_space:
raise RuntimeError(
f"No spaces found at {self._url}! "
"Check your credentials and wiki_base and make sure "
"is_cloud is set correctly."
f"No spaces found at {self._url}! Check your credentials and wiki_base and make sure is_cloud is set correctly."
)
logger.info("Confluence probe succeeded.")
@@ -461,8 +458,7 @@ class OnyxConfluence:
except HTTPError as e:
delay_until = _handle_http_error(e, attempt)
logger.warning(
f"HTTPError in confluence call. "
f"Retrying in {delay_until} seconds..."
f"HTTPError in confluence call. Retrying in {delay_until} seconds..."
)
while time.monotonic() < delay_until:
# in the future, check a signal here to exit
@@ -544,8 +540,7 @@ class OnyxConfluence:
if not latest_results:
# no more results, break out of the loop
logger.info(
f"No results found for call '{temp_url_suffix}'"
"Stopping pagination."
f"No results found for call '{temp_url_suffix}'Stopping pagination."
)
found_empty_page = True
break
@@ -606,8 +601,7 @@ class OnyxConfluence:
# If that fails, raise the error
if _PROBLEMATIC_EXPANSIONS in url_suffix:
logger.warning(
f"Replacing {_PROBLEMATIC_EXPANSIONS} with {_REPLACEMENT_EXPANSIONS}"
" and trying again."
f"Replacing {_PROBLEMATIC_EXPANSIONS} with {_REPLACEMENT_EXPANSIONS} and trying again."
)
url_suffix = url_suffix.replace(
_PROBLEMATIC_EXPANSIONS,
@@ -711,8 +705,7 @@ class OnyxConfluence:
# stop paginating.
if url_suffix and not results:
logger.info(
f"No results found for call '{old_url_suffix}' despite next link "
"being present. Stopping pagination."
f"No results found for call '{old_url_suffix}' despite next link being present. Stopping pagination."
)
break
@@ -934,8 +927,7 @@ class OnyxConfluence:
logger.debug(f"jsonrpc response: {response}")
if not response.get("result"):
logger.warning(
f"No jsonrpc response for space permissions for space {space_key}"
f"\nResponse: {response}"
f"No jsonrpc response for space permissions for space {space_key}\nResponse: {response}"
)
return response.get("result", [])
@@ -978,8 +970,7 @@ def get_user_email_from_username__server(
except HTTPError as e:
status_code = e.response.status_code if e.response is not None else "N/A"
logger.warning(
f"Failed to get confluence email for {user_name}: "
f"HTTP {status_code} - {e}"
f"Failed to get confluence email for {user_name}: HTTP {status_code} - {e}"
)
# For now, we'll just return None and log a warning. This means
# we will keep retrying to get the email every group sync.
@@ -1060,7 +1051,7 @@ def extract_text_from_confluence_html(
)
if not user_id:
logger.warning(
"ri:userkey not found in ri:user element. " f"Found attrs: {user.attrs}"
f"ri:userkey not found in ri:user element. Found attrs: {user.attrs}"
)
continue
# Include @ sign for tagging, more clear for LLM

View File

@@ -155,10 +155,7 @@ def process_attachment(
)
logger.info(
f"Downloading attachment: "
f"title={attachment['title']} "
f"length={attachment_size} "
f"link={attachment_link}"
f"Downloading attachment: title={attachment['title']} length={attachment_size} link={attachment_link}"
)
# Download the attachment
@@ -368,8 +365,7 @@ def handle_confluence_rate_limit(confluence_call: F) -> F:
except requests.HTTPError as e:
delay_until = _handle_http_error(e, attempt)
logger.warning(
f"HTTPError in confluence call. "
f"Retrying in {delay_until} seconds..."
f"HTTPError in confluence call. Retrying in {delay_until} seconds..."
)
while time.monotonic() < delay_until:
# in the future, check a signal here to exit

View File

@@ -263,8 +263,7 @@ class ConnectorRunner(Generic[CT]):
f"{key}: {value}" for key, value in local_vars.items()
)
logger.error(
f"Error in connector. type: {exc_type};\n"
f"local_vars below -> \n{local_vars_str[:1024]}"
f"Error in connector. type: {exc_type};\nlocal_vars below -> \n{local_vars_str[:1024]}"
)
raise

View File

@@ -138,8 +138,7 @@ def _parse_document_source(connector_type: Any) -> DocumentSource | None:
return DocumentSource(normalized)
except ValueError:
logger.warning(
f"Invalid connector_type value: '{connector_type}' "
f"(normalized: '{normalized}')"
f"Invalid connector_type value: '{connector_type}' (normalized: '{normalized}')"
)
return None

View File

@@ -57,8 +57,7 @@ class _RateLimitDecorator:
while len(self.call_history) == self.max_calls:
sleep_time = self.sleep_time * (self.sleep_backoff**sleep_cnt)
logger.notice(
f"Rate limit exceeded for function {func.__name__}. "
f"Waiting {sleep_time} seconds before retrying."
f"Rate limit exceeded for function {func.__name__}. Waiting {sleep_time} seconds before retrying."
)
time.sleep(sleep_time)
sleep_cnt += 1

View File

@@ -132,8 +132,7 @@ class EgnyteConnector(LoadConnector, PollConnector, OAuthConnector):
egnyte_domain: str = Field(
title="Egnyte Domain",
description=(
"The domain for the Egnyte instance "
"(e.g. 'company' for company.egnyte.com)"
"The domain for the Egnyte instance (e.g. 'company' for company.egnyte.com)"
),
)

View File

@@ -202,8 +202,7 @@ def _process_file(
)
sections.append(image_section)
logger.debug(
f"Created ImageSection for embedded image {idx} "
f"in {file_name}, stored as: {stored_file_name}"
f"Created ImageSection for embedded image {idx} in {file_name}, stored as: {stored_file_name}"
)
except Exception as e:
logger.warning(
@@ -279,8 +278,7 @@ class LocalFileConnector(LoadConnector):
logger.warning(f"Failed to load metadata from file store: {e}")
elif self._zip_metadata_deprecated:
logger.warning(
"Using deprecated inline zip_metadata dict. "
"Re-upload files to use the new file store format."
"Using deprecated inline zip_metadata dict. Re-upload files to use the new file store format."
)
zip_metadata = self._zip_metadata_deprecated

View File

@@ -195,7 +195,9 @@ class FreshdeskConnector(PollConnector, LoadConnector):
self.domain = domain
def _fetch_tickets(
self, start: datetime | None = None, end: datetime | None = None # noqa: ARG002
self,
start: datetime | None = None,
end: datetime | None = None, # noqa: ARG002
) -> Iterator[List[dict]]:
"""
'end' is not currently used, so we may double fetch tickets created after the indexing

View File

@@ -865,8 +865,7 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
total_count = org.get_repos().totalCount
if total_count == 0:
raise ConnectorValidationError(
f"Found no repos for organization: {self.repo_owner}. "
"Does the credential have the right scopes?"
f"Found no repos for organization: {self.repo_owner}. Does the credential have the right scopes?"
)
except GithubException as e:
# Check for missing SSO
@@ -889,8 +888,7 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
total_count = user.get_repos().totalCount
if total_count == 0:
raise ConnectorValidationError(
f"Found no repos for user: {self.repo_owner}. "
"Does the credential have the right scopes?"
f"Found no repos for user: {self.repo_owner}. Does the credential have the right scopes?"
)
except RateLimitExceededException:

View File

@@ -95,7 +95,8 @@ def _convert_code_to_document(
# Fetch the file content using the correct branch
file_content_obj = project.files.get(
file_path=file["path"], ref=default_branch # Use the default branch
file_path=file["path"],
ref=default_branch, # Use the default branch
)
try:
file_content = file_content_obj.decode().decode("utf-8")

View File

@@ -351,9 +351,7 @@ class GmailConnector(
def primary_admin_email(self) -> str:
if self._primary_admin_email is None:
raise RuntimeError(
"Primary admin email missing, "
"should not call this property "
"before calling load_credentials"
"Primary admin email missing, should not call this property before calling load_credentials"
)
return self._primary_admin_email
@@ -361,9 +359,7 @@ class GmailConnector(
def google_domain(self) -> str:
if self._primary_admin_email is None:
raise RuntimeError(
"Primary admin email missing, "
"should not call this property "
"before calling load_credentials"
"Primary admin email missing, should not call this property before calling load_credentials"
)
return self._primary_admin_email.split("@")[-1]
@@ -371,9 +367,7 @@ class GmailConnector(
def creds(self) -> OAuthCredentials | ServiceAccountCredentials:
if self._creds is None:
raise RuntimeError(
"Creds missing, "
"should not call this property "
"before calling load_credentials"
"Creds missing, should not call this property before calling load_credentials"
)
return self._creds

View File

@@ -250,7 +250,7 @@ class GongConnector(LoadConnector, PollConnector):
f"_get_call_details_by_ids waiting to retry: "
f"wait={wait_seconds}s "
f"current_attempt={current_attempt} "
f"next_attempt={current_attempt+1} "
f"next_attempt={current_attempt + 1} "
f"max_attempts={self.MAX_CALL_DETAILS_ATTEMPTS}"
)
time.sleep(wait_seconds)
@@ -283,8 +283,7 @@ class GongConnector(LoadConnector, PollConnector):
call_time_str = call_metadata["started"]
call_title = call_metadata["title"]
logger.info(
f"{num_calls+1}: Indexing Gong call id {call_id} "
f"from {call_time_str.split('T', 1)[0]}: {call_title}"
f"{num_calls + 1}: Indexing Gong call id {call_id} from {call_time_str.split('T', 1)[0]}: {call_title}"
)
call_parties = cast(list[dict] | None, call_details.get("parties"))
@@ -352,7 +351,7 @@ class GongConnector(LoadConnector, PollConnector):
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
combined = (
f'{credentials["gong_access_key"]}:{credentials["gong_access_key_secret"]}'
f"{credentials['gong_access_key']}:{credentials['gong_access_key_secret']}"
)
self.auth_token_basic = base64.b64encode(combined.encode("utf-8")).decode(
"utf-8"

View File

@@ -309,9 +309,7 @@ class GoogleDriveConnector(
def primary_admin_email(self) -> str:
if self._primary_admin_email is None:
raise RuntimeError(
"Primary admin email missing, "
"should not call this property "
"before calling load_credentials"
"Primary admin email missing, should not call this property before calling load_credentials"
)
return self._primary_admin_email
@@ -319,9 +317,7 @@ class GoogleDriveConnector(
def google_domain(self) -> str:
if self._primary_admin_email is None:
raise RuntimeError(
"Primary admin email missing, "
"should not call this property "
"before calling load_credentials"
"Primary admin email missing, should not call this property before calling load_credentials"
)
return self._primary_admin_email.split("@")[-1]
@@ -329,9 +325,7 @@ class GoogleDriveConnector(
def creds(self) -> OAuthCredentials | ServiceAccountCredentials:
if self._creds is None:
raise RuntimeError(
"Creds missing, "
"should not call this property "
"before calling load_credentials"
"Creds missing, should not call this property before calling load_credentials"
)
return self._creds
@@ -683,8 +677,7 @@ class GoogleDriveConnector(
if best_folder is None:
best_folder = folder
logger.debug(
f"Folder {folder_id} has no parents when fetched by {email}, "
f"will try admin to check for parent access"
f"Folder {folder_id} has no parents when fetched by {email}, will try admin to check for parent access"
)
if best_folder:
@@ -694,8 +687,7 @@ class GoogleDriveConnector(
return best_folder
logger.debug(
f"All attempts failed to fetch folder {folder_id} "
f"(tried {retriever_email} and {self.primary_admin_email})"
f"All attempts failed to fetch folder {folder_id} (tried {retriever_email} and {self.primary_admin_email})"
)
return None
@@ -846,7 +838,6 @@ class GoogleDriveConnector(
# - the current user's email is in the requested emails
if curr_stage.stage == DriveRetrievalStage.MY_DRIVE_FILES:
if self.include_my_drives or user_email in self._requested_my_drive_emails:
logger.info(
f"Getting all files in my drive as '{user_email}. Resuming: {resuming}. "
f"Stage completed until: {curr_stage.completed_until}. "
@@ -1105,8 +1096,7 @@ class GoogleDriveConnector(
for user_email in all_org_emails
):
logger.info(
"some users did not complete retrieval, "
"returning checkpoint for another run"
"some users did not complete retrieval, returning checkpoint for another run"
)
return
checkpoint.completion_stage = DriveRetrievalStage.DONE
@@ -1515,7 +1505,6 @@ class GoogleDriveConnector(
)
try:
# Build permission sync context if needed
permission_sync_context = (
PermissionSyncContext(
@@ -1791,8 +1780,7 @@ class GoogleDriveConnector(
if self._primary_admin_email is None:
raise ConnectorValidationError(
"Primary admin email not found in credentials. "
"Ensure DB_CREDENTIALS_PRIMARY_ADMIN_KEY is set."
"Primary admin email not found in credentials. Ensure DB_CREDENTIALS_PRIMARY_ADMIN_KEY is set."
)
try:
@@ -1825,8 +1813,7 @@ class GoogleDriveConnector(
# Check for scope-related hints from the error message
if MISSING_SCOPES_ERROR_STR in str(e):
raise InsufficientPermissionsError(
"Google Drive credentials are missing required scopes. "
f"{ONYX_SCOPE_INSTRUCTIONS}"
f"Google Drive credentials are missing required scopes. {ONYX_SCOPE_INSTRUCTIONS}"
)
raise ConnectorValidationError(
f"Unexpected error during Google Drive validation: {e}"
@@ -1969,9 +1956,11 @@ if __name__ == "__main__":
):
if num % 200 == 0:
f.write(f"Processed {num} files\n")
f.write(f"Max file size: {max_fsize/1000_000:.2f} MB\n")
f.write(f"Max file size: {max_fsize / 1000_000:.2f} MB\n")
f.write(f"Time so far: {time.time() - start_time:.2f} seconds\n")
f.write(f"Docs per minute: {num/(time.time() - start_time)*60:.2f}\n")
f.write(
f"Docs per minute: {num / (time.time() - start_time) * 60:.2f}\n"
)
biggest_fsize = max(biggest_fsize, max_fsize)
max_fsize = 0
if isinstance(doc_or_failure, Document):
@@ -1979,5 +1968,5 @@ if __name__ == "__main__":
elif isinstance(doc_or_failure, ConnectorFailure):
num_errors += 1
print(f"Num errors: {num_errors}")
print(f"Biggest file size: {biggest_fsize/1000_000:.2f} MB")
print(f"Biggest file size: {biggest_fsize / 1000_000:.2f} MB")
print(f"Time taken: {time.time() - start_time:.2f} seconds")

View File

@@ -231,8 +231,7 @@ def onyx_document_id_from_drive_file(file: GoogleDriveFileType) -> str:
else:
link = template.format(file_id)
logger.debug(
"Missing webViewLink for Google Drive file with id %s. "
"Falling back to constructed link %s",
"Missing webViewLink for Google Drive file with id %s. Falling back to constructed link %s",
file_id,
link,
)
@@ -643,8 +642,7 @@ def _convert_drive_item_to_document(
sections = cast(list[TextSection | ImageSection], doc_sections)
if any(SMART_CHIP_CHAR in section.text for section in doc_sections):
logger.debug(
f"found smart chips in {file.get('name')},"
" aligning with basic sections"
f"found smart chips in {file.get('name')}, aligning with basic sections"
)
basic_sections = _download_and_extract_sections_basic(
file, _get_drive_service(), allow_images, size_threshold

View File

@@ -112,8 +112,7 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync)
]
if not spots_to_process:
raise ValueError(
f"No valid spots found in Highspot. Found {spots} "
f"but {self.spot_names} were requested."
f"No valid spots found in Highspot. Found {spots} but {self.spot_names} were requested."
)
return spots_to_process

View File

@@ -59,7 +59,7 @@ class BaseConnector(abc.ABC, Generic[CT]):
elif isinstance(metadata_value, list):
if not all([isinstance(val, str) for val in metadata_value]):
raise RuntimeError(custom_parser_req_msg)
metadata_lines.append(f'{metadata_key}: {", ".join(metadata_value)}')
metadata_lines.append(f"{metadata_key}: {', '.join(metadata_value)}")
else:
raise RuntimeError(custom_parser_req_msg)
return metadata_lines

View File

@@ -193,8 +193,7 @@ def _handle_jira_search_error(e: Exception, jql: str) -> None:
if status_code == 400:
if "does not exist for the field 'project'" in error_text:
raise ConnectorValidationError(
f"The specified Jira project does not exist or you don't have access to it. "
f"JQL query: {jql}. Error: {error_text}"
f"The specified Jira project does not exist or you don't have access to it. JQL query: {jql}. Error: {error_text}"
)
raise ConnectorValidationError(
f"Invalid JQL query. JQL: {jql}. Error: {error_text}"
@@ -314,8 +313,7 @@ def _perform_jql_search_v2(
Unfortunately, jira server/data center will forever use the v2 APIs that are now deprecated.
"""
logger.debug(
f"Fetching Jira issues with JQL: {jql}, "
f"starting at {start}, max results: {max_results}"
f"Fetching Jira issues with JQL: {jql}, starting at {start}, max results: {max_results}"
)
try:
issues = jira_client.search_issues(
@@ -366,8 +364,7 @@ def process_jira_issue(
# Check ticket size
if len(ticket_content.encode("utf-8")) > JIRA_CONNECTOR_MAX_TICKET_SIZE:
logger.info(
f"Skipping {issue.key} because it exceeds the maximum size of "
f"{JIRA_CONNECTOR_MAX_TICKET_SIZE} bytes."
f"Skipping {issue.key} because it exceeds the maximum size of {JIRA_CONNECTOR_MAX_TICKET_SIZE} bytes."
)
return None

View File

@@ -147,7 +147,8 @@ class MediaWikiConnector(LoadConnector, PollConnector):
self.pages.append(pywikibot.Page(self.site, page))
def load_credentials(
self, credentials: dict[str, Any] # noqa: ARG002
self,
credentials: dict[str, Any], # noqa: ARG002
) -> dict[str, Any] | None:
"""Load credentials for a MediaWiki site.

View File

@@ -81,8 +81,7 @@ def resolve_microsoft_environment(
if env is None:
known = ", ".join(sorted(_GRAPH_HOST_INDEX))
raise ConnectorValidationError(
f"Unsupported Microsoft Graph API host '{graph_api_host}'. "
f"Recognised hosts: {known}"
f"Unsupported Microsoft Graph API host '{graph_api_host}'. Recognised hosts: {known}"
)
if env.authority_host != authority_host:

View File

@@ -46,7 +46,8 @@ class MockConnector(CheckpointedConnectorWithPermSync[MockConnectorCheckpoint]):
self.current_yield_index: int = 0
def load_credentials(
self, credentials: dict[str, Any] # noqa: ARG002
self,
credentials: dict[str, Any], # noqa: ARG002
) -> dict[str, Any] | None:
response = self.client.get(self._get_mock_server_url("get-documents"))
response.raise_for_status()

View File

@@ -148,8 +148,7 @@ class NotionConnector(LoadConnector, PollConnector):
if len(cleaned) == 32 and re.fullmatch(r"[0-9a-fA-F]{32}", cleaned):
normalized_uuid = (
f"{cleaned[0:8]}-{cleaned[8:12]}-{cleaned[12:16]}-"
f"{cleaned[16:20]}-{cleaned[20:]}"
f"{cleaned[0:8]}-{cleaned[8:12]}-{cleaned[12:16]}-{cleaned[16:20]}-{cleaned[20:]}"
).lower()
return NormalizationResult(
normalized_url=normalized_uuid, use_default=False
@@ -162,8 +161,7 @@ class NotionConnector(LoadConnector, PollConnector):
candidate = params[key][0].replace("-", "")
if len(candidate) == 32 and re.fullmatch(r"[0-9a-fA-F]{32}", candidate):
normalized_uuid = (
f"{candidate[0:8]}-{candidate[8:12]}-{candidate[12:16]}-"
f"{candidate[16:20]}-{candidate[20:]}"
f"{candidate[0:8]}-{candidate[8:12]}-{candidate[12:16]}-{candidate[16:20]}-{candidate[20:]}"
).lower()
return NormalizationResult(
normalized_url=normalized_uuid, use_default=False
@@ -883,8 +881,7 @@ class NotionConnector(LoadConnector, PollConnector):
def _recursive_load(self) -> GenerateDocumentsOutput:
if self.root_page_id is None or not self.recursive_index_enabled:
raise RuntimeError(
"Recursive page lookup is not enabled, but we are trying to "
"recursively load pages. This should never happen."
"Recursive page lookup is not enabled, but we are trying to recursively load pages. This should never happen."
)
# Yield workspace hierarchy node FIRST before any pages
@@ -893,8 +890,7 @@ class NotionConnector(LoadConnector, PollConnector):
yield [workspace_node]
logger.info(
"Recursively loading pages from Notion based on root page with "
f"ID: {self.root_page_id}"
f"Recursively loading pages from Notion based on root page with ID: {self.root_page_id}"
)
pages = [self._fetch_page(page_id=self.root_page_id)]
yield from batch_generator(self._read_pages(pages), self.batch_size)
@@ -902,7 +898,7 @@ class NotionConnector(LoadConnector, PollConnector):
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
"""Applies integration token to headers"""
self.headers["Authorization"] = (
f'Bearer {credentials["notion_integration_token"]}'
f"Bearer {credentials['notion_integration_token']}"
)
return None
@@ -1030,8 +1026,7 @@ class NotionConnector(LoadConnector, PollConnector):
)
elif status_code == 429:
raise ConnectorValidationError(
"Validation failed due to Notion rate-limits being exceeded (HTTP 429). "
"Please try again later."
"Validation failed due to Notion rate-limits being exceeded (HTTP 429). Please try again later."
)
else:
raise UnexpectedValidationError(

View File

@@ -75,8 +75,7 @@ class ProductboardConnector(PollConnector):
# The delay in this retry should handle this while this is
# not parallelized.
raise ProductboardApiError(
"Failed to fetch from productboard - status code:"
f" {response.status_code} - response: {response.text}"
f"Failed to fetch from productboard - status code: {response.status_code} - response: {response.text}"
)
return response.json()

Some files were not shown because too many files have changed in this diff Show More