Compare commits

..

1 Commits

Author SHA1 Message Date
Nik
649a2af6be refactor(be): replace HTTPException with OnyxError in small API files 2026-03-09 15:48:08 -07:00
1103 changed files with 11716 additions and 55258 deletions

View File

@@ -1,186 +0,0 @@
---
name: onyx-cli
description: Query the Onyx knowledge base using the onyx-cli command. Use when the user wants to search company documents, ask questions about internal knowledge, query connected data sources, or look up information stored in Onyx.
---
# Onyx CLI — Agent Tool
Onyx is an enterprise search and Gen-AI platform that connects to company documents, apps, and people. The `onyx-cli` CLI provides non-interactive commands to query the Onyx knowledge base and list available agents.
## Prerequisites
### 1. Check if installed
```bash
which onyx-cli
```
### 2. Install (if needed)
**Primary — pip:**
```bash
pip install onyx-cli
```
**From source (Go):**
```bash
cd cli && go build -o onyx-cli . && sudo mv onyx-cli /usr/local/bin/
```
### 3. Check if configured
```bash
onyx-cli validate-config
```
This checks the config file exists, API key is present, and tests the server connection via `/api/me`. Exit code 0 on success, non-zero with a descriptive error on failure.
If unconfigured, you have two options:
**Option A — Interactive setup (requires user input):**
```bash
onyx-cli configure
```
This prompts for the Onyx server URL and API key, tests the connection, and saves config.
**Option B — Environment variables (non-interactive, preferred for agents):**
```bash
export ONYX_SERVER_URL="https://your-onyx-server.com" # default: https://cloud.onyx.app
export ONYX_API_KEY="your-api-key"
```
Environment variables override the config file. If these are set, no config file is needed.
| Variable | Required | Description |
|----------|----------|-------------|
| `ONYX_SERVER_URL` | No | Onyx server base URL (default: `https://cloud.onyx.app`) |
| `ONYX_API_KEY` | Yes | API key for authentication |
| `ONYX_PERSONA_ID` | No | Default agent/persona ID |
If neither the config file nor environment variables are set, tell the user that `onyx-cli` needs to be configured and ask them to either:
- Run `onyx-cli configure` interactively, or
- Set `ONYX_SERVER_URL` and `ONYX_API_KEY` environment variables
## Commands
### Validate configuration
```bash
onyx-cli validate-config
```
Checks config file exists, API key is present, and tests the server connection. Use this before `ask` or `agents` to confirm the CLI is properly set up.
### List available agents
```bash
onyx-cli agents
```
Prints a table of agent IDs, names, and descriptions. Use `--json` for structured output:
```bash
onyx-cli agents --json
```
Use agent IDs with `ask --agent-id` to query a specific agent.
### Basic query (plain text output)
```bash
onyx-cli ask "What is our company's PTO policy?"
```
Streams the answer as plain text to stdout. Exit code 0 on success, non-zero on error.
### JSON output (structured events)
```bash
onyx-cli ask --json "What authentication methods do we support?"
```
Outputs JSON-encoded parsed stream events (one object per line). Key event objects include message deltas, stop, errors, search-start, and citation payloads.
Each line is a JSON object with this envelope:
```json
{"type": "<event_type>", "event": { ... }}
```
| Event Type | Description |
|------------|-------------|
| `message_delta` | Content token — concatenate all `content` fields for the full answer |
| `stop` | Stream complete |
| `error` | Error with `error` message field |
| `search_tool_start` | Onyx started searching documents |
| `citation_info` | Source citation — see shape below |
`citation_info` event shape:
```json
{
"type": "citation_info",
"event": {
"citation_number": 1,
"document_id": "abc123def456",
"placement": {"turn_index": 0, "tab_index": 0, "sub_turn_index": null}
}
}
```
`placement` is metadata about where in the conversation the citation appeared and can be ignored for most use cases.
### Specify an agent
```bash
onyx-cli ask --agent-id 5 "Summarize our Q4 roadmap"
```
Uses a specific Onyx agent/persona instead of the default.
### All flags
| Flag | Type | Description |
|------|------|-------------|
| `--agent-id` | int | Agent ID to use (overrides default) |
| `--json` | bool | Output raw NDJSON events instead of plain text |
## Statelessness
Each `onyx-cli ask` call creates an independent chat session. There is no built-in way to chain context across multiple `ask` invocations — every call starts fresh. If you need multi-turn conversation with memory, use the interactive TUI (`onyx-cli` or `onyx-cli chat`) instead.
## When to Use
Use `onyx-cli ask` when:
- The user asks about company-specific information (policies, docs, processes)
- You need to search internal knowledge bases or connected data sources
- The user references Onyx, asks you to "search Onyx", or wants to query their documents
- You need context from company wikis, Confluence, Google Drive, Slack, or other connected sources
Do NOT use when:
- The question is about general programming knowledge (use your own knowledge)
- The user is asking about code in the current repository (use grep/read tools)
- The user hasn't mentioned Onyx and the question doesn't require internal company data
## Examples
```bash
# Simple question
onyx-cli ask "What are the steps to deploy to production?"
# Get structured output for parsing
onyx-cli ask --json "List all active API integrations"
# Use a specialized agent
onyx-cli ask --agent-id 3 "What were the action items from last week's standup?"
# Pipe the answer into another command
onyx-cli ask "What is the database schema for users?" | head -20
```

3
.github/CODEOWNERS vendored
View File

@@ -8,6 +8,3 @@
# Agent context files
/CLAUDE.md @Weves
/AGENTS.md @Weves
# Beta cherry-pick workflow owners
/.github/workflows/post-merge-beta-cherry-pick.yml @justin-tahara @jmelahman

View File

@@ -1,14 +1,11 @@
name: "Slack Notify"
description: "Sends a Slack notification for workflow events"
name: "Slack Notify on Failure"
description: "Sends a Slack notification when a workflow fails"
inputs:
webhook-url:
description: "Slack webhook URL (can also use SLACK_WEBHOOK_URL env var)"
required: false
details:
description: "Additional message body content"
required: false
failed-jobs:
description: "Deprecated alias for details"
description: "List of failed job names (newline-separated)"
required: false
title:
description: "Title for the notification"
@@ -24,7 +21,6 @@ runs:
shell: bash
env:
SLACK_WEBHOOK_URL: ${{ inputs.webhook-url }}
DETAILS: ${{ inputs.details }}
FAILED_JOBS: ${{ inputs.failed-jobs }}
TITLE: ${{ inputs.title }}
REF_NAME: ${{ inputs.ref-name }}
@@ -48,18 +44,6 @@ runs:
REF_NAME="$GITHUB_REF_NAME"
fi
if [ -z "$DETAILS" ]; then
DETAILS="$FAILED_JOBS"
fi
normalize_multiline() {
printf '%s' "$1" | awk 'BEGIN { ORS=""; first=1 } { if (!first) printf "\\n"; printf "%s", $0; first=0 }'
}
DETAILS="$(normalize_multiline "$DETAILS")"
REF_NAME="$(normalize_multiline "$REF_NAME")"
TITLE="$(normalize_multiline "$TITLE")"
# Escape JSON special characters
escape_json() {
local input="$1"
@@ -75,12 +59,12 @@ runs:
}
REF_NAME_ESC=$(escape_json "$REF_NAME")
DETAILS_ESC=$(escape_json "$DETAILS")
FAILED_JOBS_ESC=$(escape_json "$FAILED_JOBS")
WORKFLOW_URL_ESC=$(escape_json "$WORKFLOW_URL")
TITLE_ESC=$(escape_json "$TITLE")
# Build JSON payload piece by piece
# Note: DETAILS_ESC already contains \n sequences that should remain as \n in JSON
# Note: FAILED_JOBS_ESC already contains \n sequences that should remain as \n in JSON
PAYLOAD="{"
PAYLOAD="${PAYLOAD}\"text\":\"${TITLE_ESC}\","
PAYLOAD="${PAYLOAD}\"blocks\":[{"
@@ -95,10 +79,10 @@ runs:
PAYLOAD="${PAYLOAD}{\"type\":\"mrkdwn\",\"text\":\"*Run ID:*\\n#${RUN_NUMBER}\"}"
PAYLOAD="${PAYLOAD}]"
PAYLOAD="${PAYLOAD}}"
if [ -n "$DETAILS" ]; then
if [ -n "$FAILED_JOBS" ]; then
PAYLOAD="${PAYLOAD},{"
PAYLOAD="${PAYLOAD}\"type\":\"section\","
PAYLOAD="${PAYLOAD}\"text\":{\"type\":\"mrkdwn\",\"text\":\"${DETAILS_ESC}\"}"
PAYLOAD="${PAYLOAD}\"text\":{\"type\":\"mrkdwn\",\"text\":\"*Failed Jobs:*\\n${FAILED_JOBS_ESC}\"}"
PAYLOAD="${PAYLOAD}}"
fi
PAYLOAD="${PAYLOAD},{"
@@ -115,3 +99,4 @@ runs:
curl -X POST -H 'Content-type: application/json' \
--data "$PAYLOAD" \
"$SLACK_WEBHOOK_URL"

View File

@@ -29,32 +29,20 @@ jobs:
build-backend-craft: ${{ steps.check.outputs.build-backend-craft }}
build-model-server: ${{ steps.check.outputs.build-model-server }}
is-cloud-tag: ${{ steps.check.outputs.is-cloud-tag }}
is-stable: ${{ steps.check.outputs.is-stable }}
is-beta: ${{ steps.check.outputs.is-beta }}
is-stable-standalone: ${{ steps.check.outputs.is-stable-standalone }}
is-beta-standalone: ${{ steps.check.outputs.is-beta-standalone }}
is-latest: ${{ steps.check.outputs.is-latest }}
is-craft-latest: ${{ steps.check.outputs.is-craft-latest }}
is-test-run: ${{ steps.check.outputs.is-test-run }}
sanitized-tag: ${{ steps.check.outputs.sanitized-tag }}
short-sha: ${{ steps.check.outputs.short-sha }}
steps:
- name: Checkout (for git tags)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
fetch-depth: 0
fetch-tags: true
- name: Setup uv
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
with:
version: "0.9.9"
enable-cache: false
- name: Check which components to build and version info
id: check
env:
EVENT_NAME: ${{ github.event_name }}
run: |
set -eo pipefail
TAG="${GITHUB_REF_NAME}"
# Sanitize tag name by replacing slashes with hyphens (for Docker tag compatibility)
SANITIZED_TAG=$(echo "$TAG" | tr '/' '-')
@@ -66,8 +54,9 @@ jobs:
IS_VERSION_TAG=false
IS_STABLE=false
IS_BETA=false
IS_STABLE_STANDALONE=false
IS_BETA_STANDALONE=false
IS_LATEST=false
IS_CRAFT_LATEST=false
IS_PROD_TAG=false
IS_TEST_RUN=false
BUILD_DESKTOP=false
@@ -78,6 +67,9 @@ jobs:
BUILD_MODEL_SERVER=true
# Determine tag type based on pattern matching (do regex checks once)
if [[ "$TAG" == craft-* ]]; then
IS_CRAFT_LATEST=true
fi
if [[ "$TAG" == *cloud* ]]; then
IS_CLOUD=true
fi
@@ -105,28 +97,20 @@ jobs:
fi
fi
# Craft-latest builds backend with Craft enabled
if [[ "$IS_CRAFT_LATEST" == "true" ]]; then
BUILD_BACKEND_CRAFT=true
BUILD_BACKEND=false
fi
# Standalone version checks (for backend/model-server - version excluding cloud tags)
if [[ "$IS_STABLE" == "true" ]] && [[ "$IS_CLOUD" != "true" ]]; then
IS_STABLE_STANDALONE=true
fi
if [[ "$IS_BETA" == "true" ]] && [[ "$IS_CLOUD" != "true" ]]; then
IS_BETA_STANDALONE=true
fi
# Determine if this tag should get the "latest" Docker tag.
# Only the highest semver stable tag (vX.Y.Z exactly) gets "latest".
if [[ "$IS_STABLE" == "true" ]]; then
HIGHEST_STABLE=$(uv run --no-sync --with onyx-devtools ods latest-stable-tag) || {
echo "::error::Failed to determine highest stable tag via 'ods latest-stable-tag'"
exit 1
}
if [[ "$TAG" == "$HIGHEST_STABLE" ]]; then
IS_LATEST=true
fi
fi
# Build craft-latest backend alongside the regular latest.
if [[ "$IS_LATEST" == "true" ]]; then
BUILD_BACKEND_CRAFT=true
fi
# Determine if this is a production tag
# Production tags are: version tags (v1.2.3*) or nightly tags
if [[ "$IS_VERSION_TAG" == "true" ]] || [[ "$IS_NIGHTLY" == "true" ]]; then
@@ -145,9 +129,11 @@ jobs:
echo "build-backend-craft=$BUILD_BACKEND_CRAFT"
echo "build-model-server=$BUILD_MODEL_SERVER"
echo "is-cloud-tag=$IS_CLOUD"
echo "is-stable=$IS_STABLE"
echo "is-beta=$IS_BETA"
echo "is-stable-standalone=$IS_STABLE_STANDALONE"
echo "is-beta-standalone=$IS_BETA_STANDALONE"
echo "is-latest=$IS_LATEST"
echo "is-craft-latest=$IS_CRAFT_LATEST"
echo "is-test-run=$IS_TEST_RUN"
echo "sanitized-tag=$SANITIZED_TAG"
echo "short-sha=$SHORT_SHA"
@@ -165,7 +151,7 @@ jobs:
fetch-depth: 0
- name: Setup uv
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # ratchet:astral-sh/setup-uv@v7
with:
version: "0.9.9"
# NOTE: This isn't caching much and zizmor suggests this could be poisoned, so disable.
@@ -196,52 +182,8 @@ jobs:
title: "🚨 Version Tag Check Failed"
ref-name: ${{ github.ref_name }}
# Create GitHub release first, before desktop builds start.
# This ensures all desktop matrix jobs upload to the same release instead of
# racing to create duplicate releases.
create-release:
needs: determine-builds
if: needs.determine-builds.outputs.build-desktop == 'true'
runs-on: ubuntu-slim
timeout-minutes: 10
permissions:
contents: write
outputs:
release-id: ${{ steps.create-release.outputs.id }}
steps:
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- name: Determine release tag
id: release-tag
env:
IS_TEST_RUN: ${{ needs.determine-builds.outputs.is-test-run }}
SHORT_SHA: ${{ needs.determine-builds.outputs.short-sha }}
run: |
if [ "${IS_TEST_RUN}" == "true" ]; then
echo "tag=v0.0.0-dev+${SHORT_SHA}" >> "$GITHUB_OUTPUT"
else
echo "tag=${GITHUB_REF_NAME}" >> "$GITHUB_OUTPUT"
fi
- name: Create GitHub Release
id: create-release
uses: softprops/action-gh-release@da05d552573ad5aba039eaac05058a918a7bf631 # ratchet:softprops/action-gh-release@v2
with:
tag_name: ${{ steps.release-tag.outputs.tag }}
name: ${{ steps.release-tag.outputs.tag }}
body: "See the assets to download this version and install."
draft: true
prerelease: false
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
build-desktop:
needs:
- determine-builds
- create-release
needs: determine-builds
if: needs.determine-builds.outputs.build-desktop == 'true'
permissions:
id-token: write
@@ -266,12 +208,12 @@ jobs:
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6.0.2
with:
# NOTE: persist-credentials is needed for tauri-action to upload assets to GitHub releases.
# NOTE: persist-credentials is needed for tauri-action to create GitHub releases.
persist-credentials: true # zizmor: ignore[artipacked]
- name: Configure AWS credentials
if: startsWith(matrix.platform, 'macos-')
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -411,9 +353,11 @@ jobs:
APPLE_SIGNING_IDENTITY: ${{ env.CERT_ID }}
APPLE_TEAM_ID: ${{ env.APPLE_TEAM_ID }}
with:
# Use the release created by the create-release job to avoid race conditions
# when multiple matrix jobs try to create/update the same release simultaneously
releaseId: ${{ needs.create-release.outputs.release-id }}
tagName: ${{ needs.determine-builds.outputs.is-test-run != 'true' && 'v__VERSION__' || format('v0.0.0-dev+{0}', needs.determine-builds.outputs.short-sha) }}
releaseName: ${{ needs.determine-builds.outputs.is-test-run != 'true' && 'v__VERSION__' || format('v0.0.0-dev+{0}', needs.determine-builds.outputs.short-sha) }}
releaseBody: "See the assets to download this version and install."
releaseDraft: true
prerelease: false
assetNamePattern: "[name]_[arch][ext]"
args: ${{ matrix.args }}
@@ -440,7 +384,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -514,7 +458,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -583,7 +527,7 @@ jobs:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -614,7 +558,7 @@ jobs:
latest=false
tags: |
type=raw,value=${{ needs.determine-builds.outputs.is-test-run == 'true' && format('web-{0}', needs.determine-builds.outputs.sanitized-tag) || github.ref_name }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-stable == 'true' && 'latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && env.EDGE_TAG == 'true' && 'edge' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-beta == 'true' && 'beta' || '' }}
@@ -653,7 +597,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -735,7 +679,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -812,7 +756,7 @@ jobs:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -879,7 +823,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -952,7 +896,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1020,7 +964,7 @@ jobs:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1051,7 +995,7 @@ jobs:
latest=false
tags: |
type=raw,value=${{ needs.determine-builds.outputs.is-test-run == 'true' && format('backend-{0}', needs.determine-builds.outputs.sanitized-tag) || github.ref_name }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-stable-standalone == 'true' && 'latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && env.EDGE_TAG == 'true' && 'edge' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-beta-standalone == 'true' && 'beta' || '' }}
@@ -1090,7 +1034,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1163,7 +1107,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1232,7 +1176,7 @@ jobs:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1302,7 +1246,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1382,7 +1326,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1456,7 +1400,7 @@ jobs:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1487,7 +1431,7 @@ jobs:
latest=false
tags: |
type=raw,value=${{ needs.determine-builds.outputs.is-test-run == 'true' && format('model-server-{0}', needs.determine-builds.outputs.sanitized-tag) || github.ref_name }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-stable-standalone == 'true' && 'latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && env.EDGE_TAG == 'true' && 'edge' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-beta-standalone == 'true' && 'beta' || '' }}
@@ -1521,7 +1465,7 @@ jobs:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1576,7 +1520,7 @@ jobs:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1636,7 +1580,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -1693,7 +1637,7 @@ jobs:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2

View File

@@ -15,8 +15,7 @@ permissions:
jobs:
provider-chat-test:
uses: ./.github/workflows/reusable-nightly-llm-provider-chat.yml
secrets:
AWS_OIDC_ROLE_ARN: ${{ secrets.AWS_OIDC_ROLE_ARN }}
secrets: inherit
permissions:
contents: read
id-token: write

View File

@@ -1,112 +1,65 @@
name: Post-Merge Beta Cherry-Pick
on:
pull_request_target:
types:
- closed
push:
branches:
- main
# SECURITY NOTE:
# This workflow intentionally uses pull_request_target so post-merge automation can
# use base-repo credentials. Do not checkout PR head refs in this workflow
# (e.g. github.event.pull_request.head.sha). Only trusted base refs are allowed.
permissions:
contents: read
contents: write
pull-requests: write
jobs:
resolve-cherry-pick-request:
if: >-
github.event.pull_request.merged == true
&& github.event.pull_request.base.ref == 'main'
&& github.event.pull_request.head.repo.full_name == github.repository
cherry-pick-to-latest-release:
outputs:
should_cherrypick: ${{ steps.gate.outputs.should_cherrypick }}
pr_number: ${{ steps.gate.outputs.pr_number }}
merge_commit_sha: ${{ steps.gate.outputs.merge_commit_sha }}
merged_by: ${{ steps.gate.outputs.merged_by }}
gate_error: ${{ steps.gate.outputs.gate_error }}
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: Resolve merged PR and checkbox state
id: gate
env:
GH_TOKEN: ${{ github.token }}
PR_NUMBER: ${{ github.event.pull_request.number }}
# SECURITY: keep PR body in env/plain-text handling; avoid directly
# inlining github.event.pull_request.body into shell commands.
PR_BODY: ${{ github.event.pull_request.body }}
MERGE_COMMIT_SHA: ${{ github.event.pull_request.merge_commit_sha }}
MERGED_BY: ${{ github.event.pull_request.merged_by.login }}
# Explicit merger allowlist used because pull_request_target runs with
# the default GITHUB_TOKEN, which cannot reliably read org/team
# membership for this repository context.
ALLOWED_MERGERS: |
acaprau
bo-onyx
danelegend
duo-onyx
evan-onyx
jessicasingh7
jmelahman
joachim-danswer
justin-tahara
nmgarza5
raunakab
rohoswagger
subash-mohan
trial2onyx
wenxi-onyx
weves
yuhongsun96
run: |
echo "pr_number=${PR_NUMBER}" >> "$GITHUB_OUTPUT"
echo "merged_by=${MERGED_BY}" >> "$GITHUB_OUTPUT"
if ! echo "${PR_BODY}" | grep -qiE "\\[x\\][[:space:]]*(\\[[^]]+\\][[:space:]]*)?Please cherry-pick this PR to the latest release version"; then
echo "should_cherrypick=false" >> "$GITHUB_OUTPUT"
echo "Cherry-pick checkbox not checked for PR #${PR_NUMBER}. Skipping."
exit 0
fi
# Keep should_cherrypick output before any possible exit 1 below so
# notify-slack can still gate on this output even if this job fails.
echo "should_cherrypick=true" >> "$GITHUB_OUTPUT"
echo "Cherry-pick checkbox checked for PR #${PR_NUMBER}."
if [ -z "${MERGE_COMMIT_SHA}" ] || [ "${MERGE_COMMIT_SHA}" = "null" ]; then
echo "gate_error=missing-merge-commit-sha" >> "$GITHUB_OUTPUT"
echo "::error::PR #${PR_NUMBER} requested cherry-pick, but merge_commit_sha is missing."
exit 1
fi
echo "merge_commit_sha=${MERGE_COMMIT_SHA}" >> "$GITHUB_OUTPUT"
normalized_merged_by="$(printf '%s' "${MERGED_BY}" | tr '[:upper:]' '[:lower:]')"
normalized_allowed_mergers="$(printf '%s\n' "${ALLOWED_MERGERS}" | tr '[:upper:]' '[:lower:]')"
if ! printf '%s\n' "${normalized_allowed_mergers}" | grep -Fxq "${normalized_merged_by}"; then
echo "gate_error=not-allowed-merger" >> "$GITHUB_OUTPUT"
echo "::error::${MERGED_BY} is not in the explicit cherry-pick merger allowlist. Failing cherry-pick gate."
exit 1
fi
exit 0
cherry-pick-to-latest-release:
needs:
- resolve-cherry-pick-request
if: needs.resolve-cherry-pick-request.outputs.should_cherrypick == 'true' && needs.resolve-cherry-pick-request.result == 'success'
permissions:
contents: write
pull-requests: write
outputs:
cherry_pick_pr_url: ${{ steps.run_cherry_pick.outputs.pr_url }}
cherry_pick_reason: ${{ steps.run_cherry_pick.outputs.reason }}
cherry_pick_details: ${{ steps.run_cherry_pick.outputs.details }}
runs-on: ubuntu-latest
timeout-minutes: 45
steps:
- name: Resolve merged PR and checkbox state
id: gate
env:
GH_TOKEN: ${{ github.token }}
run: |
# For the commit that triggered this workflow (HEAD on main), fetch all
# associated PRs and keep only the PR that was actually merged into main
# with this exact merge commit SHA.
pr_numbers="$(gh api "repos/${GITHUB_REPOSITORY}/commits/${GITHUB_SHA}/pulls" | jq -r --arg sha "${GITHUB_SHA}" '.[] | select(.merged_at != null and .base.ref == "main" and .merge_commit_sha == $sha) | .number')"
match_count="$(printf '%s\n' "$pr_numbers" | sed '/^[[:space:]]*$/d' | wc -l | tr -d ' ')"
pr_number="$(printf '%s\n' "$pr_numbers" | sed '/^[[:space:]]*$/d' | head -n 1)"
if [ "${match_count}" -gt 1 ]; then
echo "::warning::Multiple merged PRs matched commit ${GITHUB_SHA}. Using PR #${pr_number}."
fi
if [ -z "$pr_number" ]; then
echo "No merged PR associated with commit ${GITHUB_SHA}; skipping."
echo "should_cherrypick=false" >> "$GITHUB_OUTPUT"
exit 0
fi
# Read the PR once so we can gate behavior and infer preferred actor.
pr_json="$(gh api "repos/${GITHUB_REPOSITORY}/pulls/${pr_number}")"
pr_body="$(printf '%s' "$pr_json" | jq -r '.body // ""')"
merged_by="$(printf '%s' "$pr_json" | jq -r '.merged_by.login // ""')"
echo "pr_number=$pr_number" >> "$GITHUB_OUTPUT"
echo "merged_by=$merged_by" >> "$GITHUB_OUTPUT"
if echo "$pr_body" | grep -qiE "\\[x\\][[:space:]]*(\\[[^]]+\\][[:space:]]*)?Please cherry-pick this PR to the latest release version"; then
echo "should_cherrypick=true" >> "$GITHUB_OUTPUT"
echo "Cherry-pick checkbox checked for PR #${pr_number}."
exit 0
fi
echo "should_cherrypick=false" >> "$GITHUB_OUTPUT"
echo "Cherry-pick checkbox not checked for PR #${pr_number}. Skipping."
- name: Checkout repository
# SECURITY: keep checkout pinned to trusted base branch; do not switch to PR head refs.
if: steps.gate.outputs.should_cherrypick == 'true'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
fetch-depth: 0
@@ -114,44 +67,34 @@ jobs:
ref: main
- name: Install the latest version of uv
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
if: steps.gate.outputs.should_cherrypick == 'true'
uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"
- name: Configure git identity
if: steps.gate.outputs.should_cherrypick == 'true'
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
- name: Create cherry-pick PR to latest release
id: run_cherry_pick
if: steps.gate.outputs.should_cherrypick == 'true'
continue-on-error: true
env:
GH_TOKEN: ${{ github.token }}
GITHUB_TOKEN: ${{ github.token }}
CHERRY_PICK_ASSIGNEE: ${{ needs.resolve-cherry-pick-request.outputs.merged_by }}
MERGE_COMMIT_SHA: ${{ needs.resolve-cherry-pick-request.outputs.merge_commit_sha }}
CHERRY_PICK_ASSIGNEE: ${{ steps.gate.outputs.merged_by }}
run: |
set -o pipefail
output_file="$(mktemp)"
set +e
uv run --no-sync --with onyx-devtools ods cherry-pick "${MERGE_COMMIT_SHA}" --yes --no-verify 2>&1 | tee "$output_file"
pipe_statuses=("${PIPESTATUS[@]}")
exit_code="${pipe_statuses[0]}"
tee_exit="${pipe_statuses[1]:-0}"
set -e
if [ "${tee_exit}" -ne 0 ]; then
echo "status=failure" >> "$GITHUB_OUTPUT"
echo "reason=output-capture-failed" >> "$GITHUB_OUTPUT"
echo "::error::tee failed to capture cherry-pick output (exit ${tee_exit}); cannot classify result."
exit 1
fi
uv run --no-sync --with onyx-devtools ods cherry-pick "${GITHUB_SHA}" --yes --no-verify 2>&1 | tee "$output_file"
exit_code="${PIPESTATUS[0]}"
if [ "${exit_code}" -eq 0 ]; then
pr_url="$(sed -n 's/^.*PR created successfully: \(https:\/\/github\.com\/[^[:space:]]\+\/pull\/[0-9]\+\).*$/\1/p' "$output_file" | tail -n 1)"
echo "status=success" >> "$GITHUB_OUTPUT"
if [ -n "${pr_url}" ]; then
echo "pr_url=${pr_url}" >> "$GITHUB_OUTPUT"
fi
exit 0
fi
@@ -170,66 +113,17 @@ jobs:
} >> "$GITHUB_OUTPUT"
- name: Mark workflow as failed if cherry-pick failed
if: steps.run_cherry_pick.outputs.status == 'failure'
if: steps.gate.outputs.should_cherrypick == 'true' && steps.run_cherry_pick.outputs.status == 'failure'
env:
CHERRY_PICK_REASON: ${{ steps.run_cherry_pick.outputs.reason }}
run: |
echo "::error::Automated cherry-pick failed (${CHERRY_PICK_REASON})."
exit 1
notify-slack-on-cherry-pick-success:
needs:
- resolve-cherry-pick-request
- cherry-pick-to-latest-release
if: needs.resolve-cherry-pick-request.outputs.should_cherrypick == 'true' && needs.resolve-cherry-pick-request.result == 'success' && needs.cherry-pick-to-latest-release.result == 'success'
runs-on: ubuntu-slim
timeout-minutes: 10
steps:
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- name: Fail if Slack webhook secret is missing
env:
CHERRY_PICK_PRS_WEBHOOK: ${{ secrets.CHERRY_PICK_PRS_WEBHOOK }}
run: |
if [ -z "${CHERRY_PICK_PRS_WEBHOOK}" ]; then
echo "::error::CHERRY_PICK_PRS_WEBHOOK is not configured."
exit 1
fi
- name: Build cherry-pick success summary
id: success-summary
env:
SOURCE_PR_NUMBER: ${{ needs.resolve-cherry-pick-request.outputs.pr_number }}
MERGE_COMMIT_SHA: ${{ needs.resolve-cherry-pick-request.outputs.merge_commit_sha }}
CHERRY_PICK_PR_URL: ${{ needs.cherry-pick-to-latest-release.outputs.cherry_pick_pr_url }}
run: |
source_pr_url="https://github.com/${GITHUB_REPOSITORY}/pull/${SOURCE_PR_NUMBER}"
details="*Cherry-pick PR opened successfully.*\\n• source PR: ${source_pr_url}"
if [ -n "${CHERRY_PICK_PR_URL}" ]; then
details="${details}\\n• cherry-pick PR: ${CHERRY_PICK_PR_URL}"
fi
if [ -n "${MERGE_COMMIT_SHA}" ]; then
details="${details}\\n• merge SHA: ${MERGE_COMMIT_SHA}"
fi
echo "details=${details}" >> "$GITHUB_OUTPUT"
- name: Notify #cherry-pick-prs about cherry-pick success
uses: ./.github/actions/slack-notify
with:
webhook-url: ${{ secrets.CHERRY_PICK_PRS_WEBHOOK }}
details: ${{ steps.success-summary.outputs.details }}
title: "✅ Automated Cherry-Pick PR Opened"
ref-name: ${{ github.event.pull_request.base.ref }}
notify-slack-on-cherry-pick-failure:
needs:
- resolve-cherry-pick-request
- cherry-pick-to-latest-release
if: always() && needs.resolve-cherry-pick-request.outputs.should_cherrypick == 'true' && (needs.resolve-cherry-pick-request.result == 'failure' || needs.cherry-pick-to-latest-release.result == 'failure')
if: always() && needs.cherry-pick-to-latest-release.outputs.should_cherrypick == 'true' && needs.cherry-pick-to-latest-release.result != 'success'
runs-on: ubuntu-slim
timeout-minutes: 10
steps:
@@ -238,47 +132,22 @@ jobs:
with:
persist-credentials: false
- name: Fail if Slack webhook secret is missing
env:
CHERRY_PICK_PRS_WEBHOOK: ${{ secrets.CHERRY_PICK_PRS_WEBHOOK }}
run: |
if [ -z "${CHERRY_PICK_PRS_WEBHOOK}" ]; then
echo "::error::CHERRY_PICK_PRS_WEBHOOK is not configured."
exit 1
fi
- name: Build cherry-pick failure summary
id: failure-summary
env:
SOURCE_PR_NUMBER: ${{ needs.resolve-cherry-pick-request.outputs.pr_number }}
MERGE_COMMIT_SHA: ${{ needs.resolve-cherry-pick-request.outputs.merge_commit_sha }}
GATE_ERROR: ${{ needs.resolve-cherry-pick-request.outputs.gate_error }}
SOURCE_PR_NUMBER: ${{ needs.cherry-pick-to-latest-release.outputs.pr_number }}
CHERRY_PICK_REASON: ${{ needs.cherry-pick-to-latest-release.outputs.cherry_pick_reason }}
CHERRY_PICK_DETAILS: ${{ needs.cherry-pick-to-latest-release.outputs.cherry_pick_details }}
run: |
source_pr_url="https://github.com/${GITHUB_REPOSITORY}/pull/${SOURCE_PR_NUMBER}"
reason_text="cherry-pick command failed"
if [ "${GATE_ERROR}" = "missing-merge-commit-sha" ]; then
reason_text="requested cherry-pick but merge commit SHA was missing"
elif [ "${GATE_ERROR}" = "not-allowed-merger" ]; then
reason_text="merger is not in the explicit cherry-pick allowlist"
elif [ "${CHERRY_PICK_REASON}" = "output-capture-failed" ]; then
reason_text="failed to capture cherry-pick output for classification"
elif [ "${CHERRY_PICK_REASON}" = "merge-conflict" ]; then
if [ "${CHERRY_PICK_REASON}" = "merge-conflict" ]; then
reason_text="merge conflict during cherry-pick"
fi
details_excerpt="$(printf '%s' "${CHERRY_PICK_DETAILS}" | tail -n 8 | tr '\n' ' ' | sed "s/[[:space:]]\\+/ /g" | sed "s/\"/'/g" | cut -c1-350)"
if [ -n "${GATE_ERROR}" ]; then
failed_job_label="resolve-cherry-pick-request"
else
failed_job_label="cherry-pick-to-latest-release"
fi
failed_jobs="• ${failed_job_label}\\n• source PR: ${source_pr_url}\\n• reason: ${reason_text}"
if [ -n "${MERGE_COMMIT_SHA}" ]; then
failed_jobs="${failed_jobs}\\n• merge SHA: ${MERGE_COMMIT_SHA}"
fi
failed_jobs="• cherry-pick-to-latest-release\\n• source PR: ${source_pr_url}\\n• reason: ${reason_text}"
if [ -n "${details_excerpt}" ]; then
failed_jobs="${failed_jobs}\\n• excerpt: ${details_excerpt}"
fi
@@ -289,6 +158,6 @@ jobs:
uses: ./.github/actions/slack-notify
with:
webhook-url: ${{ secrets.CHERRY_PICK_PRS_WEBHOOK }}
details: ${{ steps.failure-summary.outputs.jobs }}
failed-jobs: ${{ steps.failure-summary.outputs.jobs }}
title: "🚨 Automated Cherry-Pick Failed"
ref-name: ${{ github.event.pull_request.base.ref }}
ref-name: ${{ github.ref_name }}

View File

@@ -57,7 +57,7 @@ jobs:
cache-dependency-path: ./desktop/package-lock.json
- name: Setup Rust
uses: dtolnay/rust-toolchain@efa25f7f19611383d5b0ccf2d1c8914531636bf9
uses: dtolnay/rust-toolchain@4be9e76fd7c4901c61fb841f559994984270fce7
with:
toolchain: stable
targets: ${{ matrix.target }}

View File

@@ -1,56 +0,0 @@
name: Golang Tests
concurrency:
group: Golang-Tests-${{ github.workflow }}-${{ github.head_ref || github.event.workflow_run.head_branch || github.run_id }}
cancel-in-progress: true
on:
merge_group:
pull_request:
branches:
- main
- "release/**"
push:
tags:
- "v*.*.*"
permissions: {}
env:
GO_VERSION: "1.26"
jobs:
detect-modules:
runs-on: ubuntu-latest
timeout-minutes: 10
outputs:
modules: ${{ steps.set-modules.outputs.modules }}
steps:
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8
with:
persist-credentials: false
- id: set-modules
run: echo "modules=$(find . -name 'go.mod' -exec dirname {} \; | jq -Rc '[.,inputs]')" >> "$GITHUB_OUTPUT"
golang:
needs: detect-modules
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
matrix:
modules: ${{ fromJSON(needs.detect-modules.outputs.modules) }}
steps:
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
persist-credentials: false
- uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # zizmor: ignore[cache-poisoning]
with:
go-version: ${{ env.GO_VERSION }}
cache-dependency-path: "**/go.sum"
- run: go mod tidy
working-directory: ${{ matrix.modules }}
- run: git diff --exit-code go.mod go.sum
working-directory: ${{ matrix.modules }}
- run: go test ./...
working-directory: ${{ matrix.modules }}

View File

@@ -71,7 +71,7 @@ jobs:
- name: Create kind cluster
if: steps.list-changed.outputs.changed == 'true'
uses: helm/kind-action@ef37e7f390d99f746eb8b610417061a60e82a6cc # ratchet:helm/kind-action@v1.14.0
uses: helm/kind-action@92086f6be054225fa813e0a4b13787fc9088faab # ratchet:helm/kind-action@v1.13.0
- name: Pre-install cluster status check
if: steps.list-changed.outputs.changed == 'true'
@@ -133,7 +133,7 @@ jobs:
echo "=== Validating chart dependencies ==="
cd deployment/helm/charts/onyx
helm dependency update
helm lint . --set auth.userauth.values.user_auth_secret=placeholder
helm lint .
- name: Run chart-testing (install) with enhanced monitoring
timeout-minutes: 25
@@ -194,7 +194,6 @@ jobs:
--set=vespa.enabled=false \
--set=opensearch.enabled=true \
--set=auth.opensearch.enabled=true \
--set=auth.userauth.values.user_auth_secret=test-secret \
--set=slackbot.enabled=false \
--set=postgresql.enabled=true \
--set=postgresql.cluster.storage.storageClass=standard \
@@ -231,10 +230,6 @@ jobs:
if: steps.list-changed.outputs.changed == 'true'
run: |
echo "=== Post-install verification ==="
if ! kubectl cluster-info >/dev/null 2>&1; then
echo "ERROR: Kubernetes cluster is not reachable after install"
exit 1
fi
kubectl get pods --all-namespaces
kubectl get services --all-namespaces
# Only show issues if they exist
@@ -244,10 +239,6 @@ jobs:
if: failure() && steps.list-changed.outputs.changed == 'true'
run: |
echo "=== Cleanup on failure ==="
if ! kubectl cluster-info >/dev/null 2>&1; then
echo "Skipping failure cleanup: Kubernetes cluster is not reachable"
exit 0
fi
echo "=== Final cluster state ==="
kubectl get pods --all-namespaces
kubectl get events --all-namespaces --sort-by=.lastTimestamp | tail -10

View File

@@ -316,7 +316,6 @@ jobs:
# Base config shared by both editions
cat <<EOF > deployment/docker_compose/.env
COMPOSE_PROFILES=s3-filestore
OPENSEARCH_FOR_ONYX_ENABLED=false
AUTH_TYPE=basic
POSTGRES_POOL_PRE_PING=true
POSTGRES_USE_NULL_POOL=true
@@ -419,7 +418,6 @@ jobs:
-e POSTGRES_POOL_PRE_PING=true \
-e POSTGRES_USE_NULL_POOL=true \
-e VESPA_HOST=index \
-e ENABLE_OPENSEARCH_INDEXING_FOR_ONYX=false \
-e REDIS_HOST=cache \
-e API_SERVER_HOST=api_server \
-e OPENAI_API_KEY=${OPENAI_API_KEY} \
@@ -639,7 +637,6 @@ jobs:
ONYX_BACKEND_IMAGE=${ECR_CACHE}:integration-test-backend-test-${RUN_ID} \
ONYX_MODEL_SERVER_IMAGE=${ECR_CACHE}:integration-test-model-server-test-${RUN_ID} \
DEV_MODE=true \
OPENSEARCH_FOR_ONYX_ENABLED=false \
docker compose -f docker-compose.multitenant-dev.yml up \
relational_db \
index \
@@ -694,7 +691,6 @@ jobs:
-e POSTGRES_DB=postgres \
-e POSTGRES_USE_NULL_POOL=true \
-e VESPA_HOST=index \
-e ENABLE_OPENSEARCH_INDEXING_FOR_ONYX=false \
-e REDIS_HOST=cache \
-e API_SERVER_HOST=api_server \
-e OPENAI_API_KEY=${OPENAI_API_KEY} \

View File

@@ -31,7 +31,7 @@ jobs:
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm" # zizmor: ignore[cache-poisoning] test-only workflow; no deploy artifacts
cache: "npm"
cache-dependency-path: ./web/package-lock.json
- name: Install node dependencies

View File

@@ -12,9 +12,6 @@ on:
push:
tags:
- "v*.*.*"
# TODO: Remove this if we enable merge-queues for release branches.
branches:
- "release/**"
permissions:
contents: read
@@ -464,14 +461,14 @@ jobs:
# --- Visual Regression Diff ---
- name: Configure AWS credentials
if: always()
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
- name: Install the latest version of uv
if: always()
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"
@@ -710,7 +707,7 @@ jobs:
pull-requests: write
steps:
- name: Download visual diff summaries
uses: actions/download-artifact@70fc10c6e5e1ce46ad2ea6f2b72d43f7d47b13c3
uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131
with:
pattern: screenshot-diff-summary-*
path: summaries/

View File

@@ -28,7 +28,7 @@ jobs:
with:
python-version: "3.11"
- name: Setup Terraform
uses: hashicorp/setup-terraform@5e8dbf3c6d9deaf4193ca7a8fb23f2ac83bb6c85 # ratchet:hashicorp/setup-terraform@v4.0.0
uses: hashicorp/setup-terraform@b9cd54a3c349d3f38e8881555d616ced269862dd # ratchet:hashicorp/setup-terraform@v3
- name: Setup node
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v6
with: # zizmor: ignore[cache-poisoning]
@@ -38,9 +38,9 @@ jobs:
- name: Install node dependencies
working-directory: ./web
run: npm ci
- uses: j178/prek-action@0bb87d7f00b0c99306c8bcb8b8beba1eb581c037 # ratchet:j178/prek-action@v1
- uses: j178/prek-action@9d6a3097e0c1865ecce00cfb89fe80f2ee91b547 # ratchet:j178/prek-action@v1
with:
prek-version: '0.3.4'
prek-version: '0.2.21'
extra-args: ${{ github.event_name == 'pull_request' && format('--from-ref {0} --to-ref {1}', github.event.pull_request.base.sha, github.event.pull_request.head.sha) || github.event_name == 'merge_group' && format('--from-ref {0} --to-ref {1}', github.event.merge_group.base_sha, github.event.merge_group.head_sha) || github.ref_name == 'main' && '--all-files' || '' }}
- name: Check Actions
uses: giner/check-actions@28d366c7cbbe235f9624a88aa31a628167eee28c # ratchet:giner/check-actions@v1.0.1

View File

@@ -1,214 +0,0 @@
name: Release CLI
on:
push:
tags:
- "cli/v*.*.*"
jobs:
pypi:
runs-on: ubuntu-latest
environment:
name: release-cli
permissions:
id-token: write
timeout-minutes: 10
strategy:
matrix:
os-arch:
- { goos: "linux", goarch: "amd64" }
- { goos: "linux", goarch: "arm64" }
- { goos: "windows", goarch: "amd64" }
- { goos: "windows", goarch: "arm64" }
- { goos: "darwin", goarch: "amd64" }
- { goos: "darwin", goarch: "arm64" }
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"
- run: |
GOOS="${{ matrix.os-arch.goos }}" \
GOARCH="${{ matrix.os-arch.goarch }}" \
uv build --wheel
working-directory: cli
- run: uv publish
working-directory: cli
docker-amd64:
runs-on:
- runs-on
- runner=2cpu-linux-x64
- run-id=${{ github.run_id }}-cli-amd64
- extras=ecr-cache
environment: deploy
permissions:
id-token: write
timeout-minutes: 30
outputs:
digest: ${{ steps.build.outputs.digest }}
env:
REGISTRY_IMAGE: onyxdotapp/onyx-cli
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7 # ratchet:aws-actions/configure-aws-credentials@v6.0.0
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
- name: Get AWS Secrets
uses: aws-actions/aws-secretsmanager-get-secrets@a9a7eb4e2f2871d30dc5b892576fde60a2ecc802 # ratchet:aws-actions/aws-secretsmanager-get-secrets@v2.0.10
with:
secret-ids: |
DOCKER_USERNAME, deploy/docker-username
DOCKER_TOKEN, deploy/docker-token
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@b45d80f862d83dbcd57f89517bcf500b2ab88fb2 # ratchet:docker/login-action@v4
with:
username: ${{ env.DOCKER_USERNAME }}
password: ${{ env.DOCKER_TOKEN }}
- name: Build and push AMD64
id: build
uses: docker/build-push-action@d08e5c354a6adb9ed34480a06d141179aa583294 # ratchet:docker/build-push-action@v7
with:
context: ./cli
file: ./cli/Dockerfile
platforms: linux/amd64
cache-from: type=registry,ref=${{ env.REGISTRY_IMAGE }}:latest
cache-to: type=inline
outputs: type=image,name=${{ env.REGISTRY_IMAGE }},push-by-digest=true,name-canonical=true,push=true
docker-arm64:
runs-on:
- runs-on
- runner=2cpu-linux-arm64
- run-id=${{ github.run_id }}-cli-arm64
- extras=ecr-cache
environment: deploy
permissions:
id-token: write
timeout-minutes: 30
outputs:
digest: ${{ steps.build.outputs.digest }}
env:
REGISTRY_IMAGE: onyxdotapp/onyx-cli
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7 # ratchet:aws-actions/configure-aws-credentials@v6.0.0
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
- name: Get AWS Secrets
uses: aws-actions/aws-secretsmanager-get-secrets@a9a7eb4e2f2871d30dc5b892576fde60a2ecc802 # ratchet:aws-actions/aws-secretsmanager-get-secrets@v2.0.10
with:
secret-ids: |
DOCKER_USERNAME, deploy/docker-username
DOCKER_TOKEN, deploy/docker-token
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@b45d80f862d83dbcd57f89517bcf500b2ab88fb2 # ratchet:docker/login-action@v4
with:
username: ${{ env.DOCKER_USERNAME }}
password: ${{ env.DOCKER_TOKEN }}
- name: Build and push ARM64
id: build
uses: docker/build-push-action@d08e5c354a6adb9ed34480a06d141179aa583294 # ratchet:docker/build-push-action@v7
with:
context: ./cli
file: ./cli/Dockerfile
platforms: linux/arm64
cache-from: type=registry,ref=${{ env.REGISTRY_IMAGE }}:latest
cache-to: type=inline
outputs: type=image,name=${{ env.REGISTRY_IMAGE }},push-by-digest=true,name-canonical=true,push=true
merge-docker:
needs:
- docker-amd64
- docker-arm64
runs-on:
- runs-on
- runner=2cpu-linux-x64
- run-id=${{ github.run_id }}-cli-merge
environment: deploy
permissions:
id-token: write
timeout-minutes: 10
env:
REGISTRY_IMAGE: onyxdotapp/onyx-cli
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7 # ratchet:aws-actions/configure-aws-credentials@v6.0.0
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
- name: Get AWS Secrets
uses: aws-actions/aws-secretsmanager-get-secrets@a9a7eb4e2f2871d30dc5b892576fde60a2ecc802 # ratchet:aws-actions/aws-secretsmanager-get-secrets@v2.0.10
with:
secret-ids: |
DOCKER_USERNAME, deploy/docker-username
DOCKER_TOKEN, deploy/docker-token
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@b45d80f862d83dbcd57f89517bcf500b2ab88fb2 # ratchet:docker/login-action@v4
with:
username: ${{ env.DOCKER_USERNAME }}
password: ${{ env.DOCKER_TOKEN }}
- name: Create and push manifest
env:
AMD64_DIGEST: ${{ needs.docker-amd64.outputs.digest }}
ARM64_DIGEST: ${{ needs.docker-arm64.outputs.digest }}
TAG: ${{ github.ref_name }}
run: |
SANITIZED_TAG="${TAG#cli/}"
IMAGES=(
"${REGISTRY_IMAGE}@${AMD64_DIGEST}"
"${REGISTRY_IMAGE}@${ARM64_DIGEST}"
)
if [[ "$TAG" =~ ^cli/v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
docker buildx imagetools create \
-t "${REGISTRY_IMAGE}:${SANITIZED_TAG}" \
-t "${REGISTRY_IMAGE}:latest" \
"${IMAGES[@]}"
else
docker buildx imagetools create \
-t "${REGISTRY_IMAGE}:${SANITIZED_TAG}" \
"${IMAGES[@]}"
fi

View File

@@ -22,11 +22,13 @@ jobs:
- { goos: "windows", goarch: "arm64" }
- { goos: "darwin", goarch: "amd64" }
- { goos: "darwin", goarch: "arm64" }
- { goos: "", goarch: "" }
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
fetch-depth: 0
- uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"

View File

@@ -48,10 +48,6 @@ on:
required: false
default: true
type: boolean
secrets:
AWS_OIDC_ROLE_ARN:
description: "AWS role ARN for OIDC auth"
required: true
permissions:
contents: read
@@ -77,7 +73,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -120,7 +116,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -162,7 +158,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -268,7 +264,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2

View File

@@ -110,7 +110,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -180,7 +180,7 @@ jobs:
persist-credentials: false
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
@@ -244,7 +244,7 @@ jobs:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
uses: aws-actions/configure-aws-credentials@61815dcd50bd041e203e49132bacad1fd04d2708
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2

View File

@@ -1,69 +0,0 @@
name: Storybook Deploy
env:
VERCEL_ORG_ID: ${{ secrets.VERCEL_ORG_ID }}
VERCEL_PROJECT_ID: prj_sG49mVsA25UsxIPhN2pmBJlikJZM
VERCEL_CLI: vercel@50.14.1
VERCEL_TOKEN: ${{ secrets.VERCEL_TOKEN }}
concurrency:
group: storybook-deploy-production
cancel-in-progress: true
on:
workflow_dispatch:
push:
branches:
- main
paths:
- "web/lib/opal/**"
- "web/src/refresh-components/**"
- "web/.storybook/**"
- "web/package.json"
- "web/package-lock.json"
permissions:
contents: read
jobs:
Deploy-Storybook:
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v4
with:
persist-credentials: false
- name: Setup node
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm"
cache-dependency-path: ./web/package-lock.json
- name: Install dependencies
working-directory: web
run: npm ci
- name: Build Storybook
working-directory: web
run: npm run storybook:build
- name: Deploy to Vercel (Production)
working-directory: web
run: npx --yes "$VERCEL_CLI" deploy storybook-static/ --prod --yes --token="$VERCEL_TOKEN"
notify-slack-on-failure:
needs: Deploy-Storybook
if: always() && needs.Deploy-Storybook.result == 'failure'
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v4
with:
persist-credentials: false
sparse-checkout: .github/actions/slack-notify
- name: Send Slack notification
uses: ./.github/actions/slack-notify
with:
webhook-url: ${{ secrets.MONITOR_DEPLOYMENTS_WEBHOOK }}
failed-jobs: "• Deploy-Storybook"
title: "🚨 Storybook Deploy Failed"

View File

@@ -24,7 +24,7 @@ jobs:
persist-credentials: false
- name: Install the latest version of uv
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"

View File

@@ -119,11 +119,10 @@ repos:
]
- repo: https://github.com/golangci/golangci-lint
rev: 5d1e709b7be35cb2025444e19de266b056b7b7ee # frozen: v2.10.1
rev: 9f61b0f53f80672872fced07b6874397c3ed197b # frozen: v2.7.2
hooks:
- id: golangci-lint
language_version: "1.26.0"
entry: bash -c "find . -name go.mod -not -path './.venv/*' -print0 | xargs -0 -I{} bash -c 'cd \"$(dirname {})\" && golangci-lint run ./...'"
entry: bash -c "find tools/ -name go.mod -print0 | xargs -0 -I{} bash -c 'cd \"$(dirname {})\" && golangci-lint run ./...'"
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.

View File

@@ -7,9 +7,6 @@
AUTH_TYPE=basic
# Recommended for basic auth - used for signing password reset and verification tokens
# Generate a secure value with: openssl rand -hex 32
USER_AUTH_SECRET=""
DEV_MODE=true

View File

@@ -104,10 +104,6 @@ Onyx uses Celery for asynchronous task processing with multiple specialized work
- Always use `@shared_task` rather than `@celery_app`
- Put tasks under `background/celery/tasks/` or `ee/background/celery/tasks`
- Never enqueue a task without an expiration. Always supply `expires=` when
sending tasks, either from the beat schedule or directly from another task. It
should never be acceptable to submit code which enqueues tasks without an
expiration, as doing so can lead to unbounded task queue growth.
**Defining APIs**:
When creating new FastAPI APIs, do NOT use the `response_model` field. Instead, just type the
@@ -544,8 +540,6 @@ To run them:
npx playwright test <TEST_NAME>
```
For shared fixtures, best practices, and detailed guidance, see `backend/tests/README.md`.
## Logs
When (1) writing integration tests or (2) doing live tests (e.g. curl / playwright) you can get access
@@ -598,7 +592,7 @@ Before writing your plan, make sure to do research. Explore the relevant section
Never hardcode status codes or use `starlette.status` / `fastapi.status` constants directly.**
A global FastAPI exception handler converts `OnyxError` into a JSON response with the standard
`{"error_code": "...", "detail": "..."}` shape. This eliminates boilerplate and keeps error
`{"error_code": "...", "message": "..."}` shape. This eliminates boilerplate and keeps error
handling consistent across the entire backend.
```python

View File

@@ -46,9 +46,7 @@ RUN apt-get update && \
pkg-config \
gcc \
nano \
vim \
libjemalloc2 \
&& \
vim && \
rm -rf /var/lib/apt/lists/* && \
apt-get clean
@@ -143,7 +141,6 @@ COPY --chown=onyx:onyx ./scripts/debugging /app/scripts/debugging
COPY --chown=onyx:onyx ./scripts/force_delete_connector_by_id.py /app/scripts/force_delete_connector_by_id.py
COPY --chown=onyx:onyx ./scripts/supervisord_entrypoint.sh /app/scripts/supervisord_entrypoint.sh
COPY --chown=onyx:onyx ./scripts/setup_craft_templates.sh /app/scripts/setup_craft_templates.sh
COPY --chown=onyx:onyx ./scripts/reencrypt_secrets.py /app/scripts/reencrypt_secrets.py
RUN chmod +x /app/scripts/supervisord_entrypoint.sh /app/scripts/setup_craft_templates.sh
# Run Craft template setup at build time when ENABLE_CRAFT=true
@@ -167,13 +164,6 @@ ENV PYTHONPATH=/app
ARG ONYX_VERSION=0.0.0-dev
ENV ONYX_VERSION=${ONYX_VERSION}
# Use jemalloc instead of glibc malloc to reduce memory fragmentation
# in long-running Python processes (API server, Celery workers).
# The soname is architecture-independent; the dynamic linker resolves
# the correct path from standard library directories.
# Placed after all RUN steps so build-time processes are unaffected.
ENV LD_PRELOAD=libjemalloc.so.2
# Default command which does nothing
# This container is used by api server and background which specify their own CMD
CMD ["tail", "-f", "/dev/null"]

View File

@@ -244,10 +244,7 @@ def do_run_migrations(
def provide_iam_token_for_alembic(
dialect: Any, # noqa: ARG001
conn_rec: Any, # noqa: ARG001
cargs: Any, # noqa: ARG001
cparams: Any,
dialect: Any, conn_rec: Any, cargs: Any, cparams: Any # noqa: ARG001
) -> None:
if USE_IAM_AUTH:
# Database connection settings
@@ -363,7 +360,8 @@ async def run_async_migrations() -> None:
# upgrade_all_tenants=true or schemas in multi-tenant mode
# and for non-multi-tenant mode, we should use schemas with the default schema
raise ValueError(
"No migration target specified. Use either upgrade_all_tenants=true for all tenants or schemas for specific schemas."
"No migration target specified. Use either upgrade_all_tenants=true for all tenants "
"or schemas for specific schemas."
)
await engine.dispose()
@@ -459,7 +457,8 @@ def run_migrations_offline() -> None:
else:
# This should not happen in the new design
raise ValueError(
"No migration target specified. Use either upgrade_all_tenants=true for all tenants or schemas for specific schemas."
"No migration target specified. Use either upgrade_all_tenants=true for all tenants "
"or schemas for specific schemas."
)

View File

@@ -13,7 +13,6 @@ Usage examples::
# custom settings
python alembic/run_multitenant_migrations.py -j 8 -b 100
"""
from __future__ import annotations
import argparse
@@ -118,7 +117,8 @@ def run_migrations_parallel(
batches = [schemas[i : i + batch_size] for i in range(0, len(schemas), batch_size)]
total_batches = len(batches)
print(
f"{len(schemas)} schemas in {total_batches} batch(es) with {max_workers} workers (batch size: {batch_size})...",
f"{len(schemas)} schemas in {total_batches} batch(es) "
f"with {max_workers} workers (batch size: {batch_size})...",
flush=True,
)
all_success = True
@@ -166,7 +166,8 @@ def run_migrations_parallel(
with lock:
in_flight[batch_idx] = batch
print(
f"Batch {batch_idx + 1}/{total_batches} started ({len(batch)} schemas): {', '.join(batch)}",
f"Batch {batch_idx + 1}/{total_batches} started "
f"({len(batch)} schemas): {', '.join(batch)}",
flush=True,
)
result = run_alembic_for_batch(batch)
@@ -200,7 +201,7 @@ def run_migrations_parallel(
except Exception as e:
print(
f"Batch {batch_idx + 1}/{total_batches} ✗ exception: {e}",
f"Batch {batch_idx + 1}/{total_batches} " f"✗ exception: {e}",
flush=True,
)
all_success = False
@@ -267,12 +268,14 @@ def main() -> int:
if not schemas_to_migrate:
print(
f"All {len(tenant_schemas)} tenants are already at head revision ({head_rev})."
f"All {len(tenant_schemas)} tenants are already at head "
f"revision ({head_rev})."
)
return 0
print(
f"{len(schemas_to_migrate)}/{len(tenant_schemas)} tenants need migration (head: {head_rev})."
f"{len(schemas_to_migrate)}/{len(tenant_schemas)} tenants need "
f"migration (head: {head_rev})."
)
success = run_migrations_parallel(

View File

@@ -1,43 +0,0 @@
"""add timestamps to user table
Revision ID: 27fb147a843f
Revises: b5c4d7e8f9a1
Create Date: 2026-03-08 17:18:40.828644
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "27fb147a843f"
down_revision = "b5c4d7e8f9a1"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"user",
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
)
op.add_column(
"user",
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
)
def downgrade() -> None:
op.drop_column("user", "updated_at")
op.drop_column("user", "created_at")

View File

@@ -50,7 +50,8 @@ def upgrade() -> None:
if orphaned_count > 0:
logger.warning(
f"WARNING: {orphaned_count} chat_session records still have folder_id without project_id. Proceeding anyway."
f"WARNING: {orphaned_count} chat_session records still have "
f"folder_id without project_id. Proceeding anyway."
)
# === Step 2: Drop chat_session.folder_id ===

View File

@@ -75,7 +75,8 @@ def batch_delete(
if failed_batches:
logger.warning(
f"Failed to delete {len(failed_batches)} batches from {table_name}. Total deleted: {total_deleted}/{total_count}"
f"Failed to delete {len(failed_batches)} batches from {table_name}. "
f"Total deleted: {total_deleted}/{total_count}"
)
# Fail the migration to avoid silently succeeding on partial cleanup
raise RuntimeError(

View File

@@ -18,7 +18,8 @@ depends_on = None
def upgrade() -> None:
# Set all existing records to not migrated
op.execute(
"UPDATE user_file SET document_id_migrated = FALSE WHERE document_id_migrated IS DISTINCT FROM FALSE;"
"UPDATE user_file SET document_id_migrated = FALSE "
"WHERE document_id_migrated IS DISTINCT FROM FALSE;"
)

View File

@@ -35,6 +35,7 @@ def upgrade() -> None:
# environment variables MUST be set. Otherwise, an exception will be raised.
if not MULTI_TENANT:
# Enable pg_trgm extension if not already enabled
op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")
@@ -480,7 +481,8 @@ def upgrade() -> None:
f"ON kg_entity USING GIN (name {POSTGRES_DEFAULT_SCHEMA}.gin_trgm_ops)"
)
op.execute(
"CREATE INDEX IF NOT EXISTS idx_kg_entity_normalization_trigrams ON kg_entity USING GIN (name_trigrams)"
"CREATE INDEX IF NOT EXISTS idx_kg_entity_normalization_trigrams "
"ON kg_entity USING GIN (name_trigrams)"
)
# Create kg_entity trigger to update kg_entity.name and its trigrams

View File

@@ -51,7 +51,10 @@ def upgrade() -> None:
next_email = f"{username.lower()}_{attempt}@{domain.lower()}"
# Email conflict occurred, append `_1`, `_2`, etc., to the username
logger.warning(
f"Conflict while lowercasing email: old_email={email} conflicting_email={new_email} next_email={next_email}"
f"Conflict while lowercasing email: "
f"old_email={email} "
f"conflicting_email={new_email} "
f"next_email={next_email}"
)
new_email = next_email
attempt += 1

View File

@@ -24,10 +24,12 @@ depends_on = None
def upgrade() -> None:
# Convert existing lowercase values to uppercase to match enum member names
op.execute(
"UPDATE connector_credential_pair SET processing_mode = 'REGULAR' WHERE processing_mode = 'regular'"
"UPDATE connector_credential_pair SET processing_mode = 'REGULAR' "
"WHERE processing_mode = 'regular'"
)
op.execute(
"UPDATE connector_credential_pair SET processing_mode = 'FILE_SYSTEM' WHERE processing_mode = 'file_system'"
"UPDATE connector_credential_pair SET processing_mode = 'FILE_SYSTEM' "
"WHERE processing_mode = 'file_system'"
)
# Update the server default to use uppercase

View File

@@ -289,7 +289,8 @@ def upgrade() -> None:
attributes_str = json.dumps(attributes).replace("'", "''")
op.execute(
sa.text(
f"UPDATE kg_entity_type SET attributes = '{attributes_str}'WHERE id_name = '{entity_type}'"
f"UPDATE kg_entity_type SET attributes = '{attributes_str}'"
f"WHERE id_name = '{entity_type}'"
),
)
@@ -311,6 +312,7 @@ def downgrade() -> None:
attributes_str = json.dumps(attributes).replace("'", "''")
op.execute(
sa.text(
f"UPDATE kg_entity_type SET attributes = '{attributes_str}'WHERE id_name = '{entity_type}'"
f"UPDATE kg_entity_type SET attributes = '{attributes_str}'"
f"WHERE id_name = '{entity_type}'"
),
)

View File

@@ -160,7 +160,7 @@ def remove_old_tags() -> None:
f"""
DELETE FROM document__tag
WHERE document_id = '{document_id}'
AND tag_id IN ({",".join(to_delete)})
AND tag_id IN ({','.join(to_delete)})
"""
)
)
@@ -239,7 +239,7 @@ def _get_batch_documents_with_multiple_tags(
).fetchall()
if not batch:
break
doc_ids = [document_id for (document_id,) in batch]
doc_ids = [document_id for document_id, in batch]
yield doc_ids
offset_clause = f"AND document__tag.document_id > '{doc_ids[-1]}'"

View File

@@ -1,117 +0,0 @@
"""add_voice_provider_and_user_voice_prefs
Revision ID: 93a2e195e25c
Revises: 27fb147a843f
Create Date: 2026-02-23 15:16:39.507304
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import column
from sqlalchemy import true
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "93a2e195e25c"
down_revision = "27fb147a843f"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create voice_provider table
op.create_table(
"voice_provider",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("name", sa.String(), unique=True, nullable=False),
sa.Column("provider_type", sa.String(), nullable=False),
sa.Column("api_key", sa.LargeBinary(), nullable=True),
sa.Column("api_base", sa.String(), nullable=True),
sa.Column("custom_config", postgresql.JSONB(), nullable=True),
sa.Column("stt_model", sa.String(), nullable=True),
sa.Column("tts_model", sa.String(), nullable=True),
sa.Column("default_voice", sa.String(), nullable=True),
sa.Column(
"is_default_stt", sa.Boolean(), nullable=False, server_default="false"
),
sa.Column(
"is_default_tts", sa.Boolean(), nullable=False, server_default="false"
),
sa.Column("deleted", sa.Boolean(), nullable=False, server_default="false"),
sa.Column(
"time_created",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"time_updated",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
onupdate=sa.func.now(),
nullable=False,
),
)
# Add partial unique indexes to enforce only one default STT/TTS provider
op.create_index(
"ix_voice_provider_one_default_stt",
"voice_provider",
["is_default_stt"],
unique=True,
postgresql_where=column("is_default_stt") == true(),
)
op.create_index(
"ix_voice_provider_one_default_tts",
"voice_provider",
["is_default_tts"],
unique=True,
postgresql_where=column("is_default_tts") == true(),
)
# Add voice preference columns to user table
op.add_column(
"user",
sa.Column(
"voice_auto_send",
sa.Boolean(),
default=False,
nullable=False,
server_default="false",
),
)
op.add_column(
"user",
sa.Column(
"voice_auto_playback",
sa.Boolean(),
default=False,
nullable=False,
server_default="false",
),
)
op.add_column(
"user",
sa.Column(
"voice_playback_speed",
sa.Float(),
default=1.0,
nullable=False,
server_default="1.0",
),
)
def downgrade() -> None:
# Remove user voice preference columns
op.drop_column("user", "voice_playback_speed")
op.drop_column("user", "voice_auto_playback")
op.drop_column("user", "voice_auto_send")
op.drop_index("ix_voice_provider_one_default_tts", table_name="voice_provider")
op.drop_index("ix_voice_provider_one_default_stt", table_name="voice_provider")
# Drop voice_provider table
op.drop_table("voice_provider")

View File

@@ -24,7 +24,8 @@ TOOL_DESCRIPTIONS = {
"The action will be used when the user asks the agent to generate an image."
),
"WebSearchTool": (
"The Web Search Action allows the agent to perform internet searches for up-to-date information."
"The Web Search Action allows the agent "
"to perform internet searches for up-to-date information."
),
"KnowledgeGraphTool": (
"The Knowledge Graph Search Action allows the agent to search the "

View File

@@ -1,51 +0,0 @@
"""add hierarchy_node_by_connector_credential_pair table
Revision ID: b5c4d7e8f9a1
Revises: a3b8d9e2f1c4
Create Date: 2026-03-04
"""
import sqlalchemy as sa
from alembic import op
revision = "b5c4d7e8f9a1"
down_revision = "a3b8d9e2f1c4"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"hierarchy_node_by_connector_credential_pair",
sa.Column("hierarchy_node_id", sa.Integer(), nullable=False),
sa.Column("connector_id", sa.Integer(), nullable=False),
sa.Column("credential_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["hierarchy_node_id"],
["hierarchy_node.id"],
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["connector_id", "credential_id"],
[
"connector_credential_pair.connector_id",
"connector_credential_pair.credential_id",
],
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("hierarchy_node_id", "connector_id", "credential_id"),
)
op.create_index(
"ix_hierarchy_node_cc_pair_connector_credential",
"hierarchy_node_by_connector_credential_pair",
["connector_id", "credential_id"],
)
def downgrade() -> None:
op.drop_index(
"ix_hierarchy_node_cc_pair_connector_credential",
table_name="hierarchy_node_by_connector_credential_pair",
)
op.drop_table("hierarchy_node_by_connector_credential_pair")

View File

@@ -140,7 +140,8 @@ def _migrate_files_to_postgres() -> None:
# Fetch rows that have external storage pointers (bucket/object_key not NULL)
result = session.execute(
text(
"SELECT file_id, bucket_name, object_key FROM file_record WHERE bucket_name IS NOT NULL AND object_key IS NOT NULL"
"SELECT file_id, bucket_name, object_key FROM file_record "
"WHERE bucket_name IS NOT NULL AND object_key IS NOT NULL"
)
)
@@ -181,7 +182,8 @@ def _migrate_files_to_postgres() -> None:
# Update DB row: set lobj_oid, clear bucket/object_key
session.execute(
text(
"UPDATE file_record SET lobj_oid = :lobj_oid, bucket_name = NULL, object_key = NULL WHERE file_id = :file_id"
"UPDATE file_record SET lobj_oid = :lobj_oid, bucket_name = NULL, "
"object_key = NULL WHERE file_id = :file_id"
),
{"lobj_oid": lobj_oid, "file_id": file_id},
)
@@ -222,7 +224,8 @@ def _migrate_files_to_external_storage() -> None:
# Find all files currently stored in PostgreSQL (lobj_oid is not null)
result = session.execute(
text(
"SELECT file_id FROM file_record WHERE lobj_oid IS NOT NULL AND bucket_name IS NULL AND object_key IS NULL"
"SELECT file_id FROM file_record WHERE lobj_oid IS NOT NULL "
"AND bucket_name IS NULL AND object_key IS NULL"
)
)

View File

@@ -39,7 +39,8 @@ BUILT_IN_TOOLS = [
"name": "WebSearchTool",
"display_name": "Web Search",
"description": (
"The Web Search Action allows the assistant to perform internet searches for up-to-date information."
"The Web Search Action allows the assistant "
"to perform internet searches for up-to-date information."
),
"in_code_tool_id": "WebSearchTool",
},

View File

@@ -11,6 +11,7 @@ from sqlalchemy import text
from alembic import op
from onyx.configs.app_configs import DB_READONLY_PASSWORD
from onyx.configs.app_configs import DB_READONLY_USER
from shared_configs.configs import MULTI_TENANT
# revision identifiers, used by Alembic.
@@ -21,52 +22,59 @@ depends_on = None
def upgrade() -> None:
# Enable pg_trgm extension if not already enabled
op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")
if MULTI_TENANT:
# Create the read-only db user if it does not already exist.
if not (DB_READONLY_USER and DB_READONLY_PASSWORD):
raise Exception("DB_READONLY_USER or DB_READONLY_PASSWORD is not set")
# Enable pg_trgm extension if not already enabled
op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")
op.execute(
text(
f"""
DO $$
BEGIN
-- Check if the read-only user already exists
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{DB_READONLY_USER}') THEN
-- Create the read-only user with the specified password
EXECUTE format('CREATE USER %I WITH PASSWORD %L', '{DB_READONLY_USER}', '{DB_READONLY_PASSWORD}');
-- First revoke all privileges to ensure a clean slate
EXECUTE format('REVOKE ALL ON DATABASE %I FROM %I', current_database(), '{DB_READONLY_USER}');
-- Grant only the CONNECT privilege to allow the user to connect to the database
-- but not perform any operations without additional specific grants
EXECUTE format('GRANT CONNECT ON DATABASE %I TO %I', current_database(), '{DB_READONLY_USER}');
END IF;
END
$$;
"""
# Create read-only db user here only in multi-tenant mode. For single-tenant mode,
# the user is created in the standard migration.
if not (DB_READONLY_USER and DB_READONLY_PASSWORD):
raise Exception("DB_READONLY_USER or DB_READONLY_PASSWORD is not set")
op.execute(
text(
f"""
DO $$
BEGIN
-- Check if the read-only user already exists
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{DB_READONLY_USER}') THEN
-- Create the read-only user with the specified password
EXECUTE format('CREATE USER %I WITH PASSWORD %L', '{DB_READONLY_USER}', '{DB_READONLY_PASSWORD}');
-- First revoke all privileges to ensure a clean slate
EXECUTE format('REVOKE ALL ON DATABASE %I FROM %I', current_database(), '{DB_READONLY_USER}');
-- Grant only the CONNECT privilege to allow the user to connect to the database
-- but not perform any operations without additional specific grants
EXECUTE format('GRANT CONNECT ON DATABASE %I TO %I', current_database(), '{DB_READONLY_USER}');
END IF;
END
$$;
"""
)
)
)
def downgrade() -> None:
op.execute(
text(
f"""
DO $$
BEGIN
IF EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{DB_READONLY_USER}') THEN
-- First revoke all privileges from the database
EXECUTE format('REVOKE ALL ON DATABASE %I FROM %I', current_database(), '{DB_READONLY_USER}');
-- Then revoke all privileges from the public schema
EXECUTE format('REVOKE ALL ON SCHEMA public FROM %I', '{DB_READONLY_USER}');
-- Then drop the user
EXECUTE format('DROP USER %I', '{DB_READONLY_USER}');
END IF;
END
$$;
"""
if MULTI_TENANT:
# Drop read-only db user here only in single tenant mode. For multi-tenant mode,
# the user is dropped in the alembic_tenants migration.
op.execute(
text(
f"""
DO $$
BEGIN
IF EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{DB_READONLY_USER}') THEN
-- First revoke all privileges from the database
EXECUTE format('REVOKE ALL ON DATABASE %I FROM %I', current_database(), '{DB_READONLY_USER}');
-- Then revoke all privileges from the public schema
EXECUTE format('REVOKE ALL ON SCHEMA public FROM %I', '{DB_READONLY_USER}');
-- Then drop the user
EXECUTE format('DROP USER %I', '{DB_READONLY_USER}');
END IF;
END
$$;
"""
)
)
)
op.execute(text("DROP EXTENSION IF EXISTS pg_trgm"))
op.execute(text("DROP EXTENSION IF EXISTS pg_trgm"))

View File

@@ -9,15 +9,12 @@ from onyx.access.access import (
_get_access_for_documents as get_access_for_documents_without_groups,
)
from onyx.access.access import _get_acl_for_user as get_acl_for_user_without_groups
from onyx.access.access import collect_user_file_access
from onyx.access.models import DocumentAccess
from onyx.access.utils import prefix_external_group
from onyx.access.utils import prefix_user_group
from onyx.db.document import get_document_sources
from onyx.db.document import get_documents_by_ids
from onyx.db.models import User
from onyx.db.models import UserFile
from onyx.db.user_file import fetch_user_files_with_access_relationships
from onyx.utils.logger import setup_logger
@@ -119,68 +116,6 @@ def _get_access_for_documents(
return access_map
def _collect_user_file_group_names(user_file: UserFile) -> set[str]:
"""Extract user-group names from the already-loaded Persona.groups
relationships on a UserFile (skipping deleted personas)."""
groups: set[str] = set()
for persona in user_file.assistants:
if persona.deleted:
continue
for group in persona.groups:
groups.add(group.name)
return groups
def get_access_for_user_files_impl(
user_file_ids: list[str],
db_session: Session,
) -> dict[str, DocumentAccess]:
"""EE version: extends the MIT user file ACL with user group names
from personas shared via user groups.
Uses a single DB query (via fetch_user_files_with_access_relationships)
that eagerly loads both the MIT-needed and EE-needed relationships.
NOTE: is imported in onyx.access.access by `fetch_versioned_implementation`
DO NOT REMOVE."""
user_files = fetch_user_files_with_access_relationships(
user_file_ids, db_session, eager_load_groups=True
)
return build_access_for_user_files_impl(user_files)
def build_access_for_user_files_impl(
user_files: list[UserFile],
) -> dict[str, DocumentAccess]:
"""EE version: works on pre-loaded UserFile objects.
Expects Persona.groups to be eagerly loaded.
NOTE: is imported in onyx.access.access by `fetch_versioned_implementation`
DO NOT REMOVE."""
result: dict[str, DocumentAccess] = {}
for user_file in user_files:
if user_file.user is None:
result[str(user_file.id)] = DocumentAccess.build(
user_emails=[],
user_groups=[],
is_public=True,
external_user_emails=[],
external_user_group_ids=[],
)
continue
emails, is_public = collect_user_file_access(user_file)
group_names = _collect_user_file_group_names(user_file)
result[str(user_file.id)] = DocumentAccess.build(
user_emails=list(emails),
user_groups=list(group_names),
is_public=is_public,
external_user_emails=[],
external_user_group_ids=[],
)
return result
def _get_acl_for_user(user: User, db_session: Session) -> set[str]:
"""Returns a list of ACL entries that the user has access to. This is meant to be
used downstream to filter out documents that the user does not have access to. The

View File

@@ -1,4 +1,3 @@
import os
from datetime import datetime
import jwt
@@ -21,12 +20,7 @@ logger = setup_logger()
def verify_auth_setting() -> None:
# All the Auth flows are valid for EE version, but warn about deprecated 'disabled'
raw_auth_type = (os.environ.get("AUTH_TYPE") or "").lower()
if raw_auth_type == "disabled":
logger.warning(
"AUTH_TYPE='disabled' is no longer supported. Using 'basic' instead. Please update your configuration."
)
# All the Auth flows are valid for EE version
logger.notice(f"Using Auth Type: {AUTH_TYPE.value}")

View File

@@ -59,6 +59,7 @@ def cloud_beat_task_generator(
# gated_tenants = get_gated_tenants()
for tenant_id in tenant_ids:
# Same comment here as the above NOTE
# if tenant_id in gated_tenants:
# continue

View File

@@ -424,7 +424,10 @@ def connector_permission_sync_generator_task(
raise ValueError(error_msg)
if not redis_connector.permissions.fenced: # The fence must exist
error_msg = f"connector_permission_sync_generator_task - fence not found: fence={redis_connector.permissions.fence_key}"
error_msg = (
f"connector_permission_sync_generator_task - fence not found: "
f"fence={redis_connector.permissions.fence_key}"
)
_fail_doc_permission_sync_attempt(attempt_id, error_msg)
raise ValueError(error_msg)
@@ -438,7 +441,8 @@ def connector_permission_sync_generator_task(
if payload.celery_task_id is None:
logger.info(
f"connector_permission_sync_generator_task - Waiting for fence: fence={redis_connector.permissions.fence_key}"
f"connector_permission_sync_generator_task - Waiting for fence: "
f"fence={redis_connector.permissions.fence_key}"
)
sleep(1)
continue
@@ -604,7 +608,8 @@ def connector_permission_sync_generator_task(
docs_with_permission_errors=docs_with_errors,
)
task_logger.info(
f"Completed doc permission sync attempt {attempt_id}: {tasks_generated} docs, {docs_with_errors} errors"
f"Completed doc permission sync attempt {attempt_id}: "
f"{tasks_generated} docs, {docs_with_errors} errors"
)
redis_connector.permissions.generator_complete = tasks_generated
@@ -711,7 +716,9 @@ def element_update_permissions(
elapsed = time.monotonic() - start
task_logger.info(
f"{element_type}={element_id} action=update_permissions elapsed={elapsed:.2f}"
f"{element_type}={element_id} "
f"action=update_permissions "
f"elapsed={elapsed:.2f}"
)
except Exception as e:
task_logger.exception(
@@ -893,7 +900,8 @@ def validate_permission_sync_fence(
tasks_not_in_celery += 1
task_logger.info(
f"validate_permission_sync_fence task check: tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
"validate_permission_sync_fence task check: "
f"tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
)
# we're active if there are still tasks to run and those tasks all exist in celery
@@ -999,10 +1007,7 @@ class PermissionSyncCallback(IndexingHeartbeatInterface):
def monitor_ccpair_permissions_taskset(
tenant_id: str,
key_bytes: bytes,
r: Redis, # noqa: ARG001
db_session: Session,
tenant_id: str, key_bytes: bytes, r: Redis, db_session: Session # noqa: ARG001
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
@@ -1026,7 +1031,8 @@ def monitor_ccpair_permissions_taskset(
payload = redis_connector.permissions.payload
except ValidationError:
task_logger.exception(
"Permissions sync payload failed to validate. Schema may have been updated."
"Permissions sync payload failed to validate. "
"Schema may have been updated."
)
return
@@ -1035,7 +1041,11 @@ def monitor_ccpair_permissions_taskset(
remaining = redis_connector.permissions.get_remaining()
task_logger.info(
f"Permissions sync progress: cc_pair={cc_pair_id} id={payload.id} remaining={remaining} initial={initial}"
f"Permissions sync progress: "
f"cc_pair={cc_pair_id} "
f"id={payload.id} "
f"remaining={remaining} "
f"initial={initial}"
)
# Add telemetry for permission syncing progress
@@ -1054,7 +1064,10 @@ def monitor_ccpair_permissions_taskset(
mark_cc_pair_as_permissions_synced(db_session, int(cc_pair_id), payload.started)
task_logger.info(
f"Permissions sync finished: cc_pair={cc_pair_id} id={payload.id} num_synced={initial}"
f"Permissions sync finished: "
f"cc_pair={cc_pair_id} "
f"id={payload.id} "
f"num_synced={initial}"
)
# Add telemetry for permission syncing complete

View File

@@ -111,20 +111,23 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
if cc_pair.access_type != AccessType.SYNC:
task_logger.error(
f"Received non-sync CC Pair {cc_pair.id} for external group sync. Actual access type: {cc_pair.access_type}"
f"Received non-sync CC Pair {cc_pair.id} for external "
f"group sync. Actual access type: {cc_pair.access_type}"
)
return False
if cc_pair.status == ConnectorCredentialPairStatus.DELETING:
task_logger.debug(
f"Skipping group sync for CC Pair {cc_pair.id} - CC Pair is being deleted"
f"Skipping group sync for CC Pair {cc_pair.id} - "
f"CC Pair is being deleted"
)
return False
sync_config = get_source_perm_sync_config(cc_pair.connector.source)
if sync_config is None:
task_logger.debug(
f"Skipping group sync for CC Pair {cc_pair.id} - no sync config found for {cc_pair.connector.source}"
f"Skipping group sync for CC Pair {cc_pair.id} - "
f"no sync config found for {cc_pair.connector.source}"
)
return False
@@ -132,7 +135,8 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
# This is fine because all sources dont necessarily have a concept of groups
if sync_config.group_sync_config is None:
task_logger.debug(
f"Skipping group sync for CC Pair {cc_pair.id} - no group sync config found for {cc_pair.connector.source}"
f"Skipping group sync for CC Pair {cc_pair.id} - "
f"no group sync config found for {cc_pair.connector.source}"
)
return False

View File

@@ -74,7 +74,8 @@ def perform_ttl_management_task(
except Exception:
logger.exception(
f"delete_chat_session exceptioned. user_id={user_id} session_id={session_id}"
"delete_chat_session exceptioned. "
f"user_id={user_id} session_id={session_id}"
)
with get_session_with_current_tenant() as db_session:
mark_task_as_finished_with_id(

View File

@@ -7,8 +7,7 @@ QUERY_HISTORY_TASK_NAME_PREFIX = OnyxCeleryTask.EXPORT_QUERY_HISTORY_TASK
def name_chat_ttl_task(
retention_limit_days: float,
tenant_id: str | None = None, # noqa: ARG001
retention_limit_days: float, tenant_id: str | None = None # noqa: ARG001
) -> str:
return f"chat_ttl_{retention_limit_days}_days"

View File

@@ -31,8 +31,7 @@ def fetch_query_analytics(
func.sum(case((ChatMessageFeedback.is_positive, 1), else_=0)),
func.sum(
case(
(ChatMessageFeedback.is_positive == False, 1), # noqa: E712
else_=0, # noqa: E712
(ChatMessageFeedback.is_positive == False, 1), else_=0 # noqa: E712
)
),
cast(ChatMessage.time_sent, Date),
@@ -67,8 +66,7 @@ def fetch_per_user_query_analytics(
func.sum(case((ChatMessageFeedback.is_positive, 1), else_=0)),
func.sum(
case(
(ChatMessageFeedback.is_positive == False, 1), # noqa: E712
else_=0, # noqa: E712
(ChatMessageFeedback.is_positive == False, 1), else_=0 # noqa: E712
)
),
cast(ChatMessage.time_sent, Date),

View File

@@ -23,7 +23,8 @@ def _delete_connector_credential_pair_user_groups_relationship__no_commit(
)
if cc_pair is None:
raise ValueError(
f"ConnectorCredentialPair with connector_id: {connector_id} and credential_id: {credential_id} not found"
f"ConnectorCredentialPair with connector_id: {connector_id} "
f"and credential_id: {credential_id} not found"
)
stmt = delete(UserGroup__ConnectorCredentialPair).where(

View File

@@ -123,7 +123,8 @@ def upsert_external_groups(
user_id = email_id_map.get(user_email.lower())
if user_id is None:
logger.warning(
f"User in group {external_group.id} with email {user_email} not found"
f"User in group {external_group.id}"
f" with email {user_email} not found"
)
continue

View File

@@ -18,7 +18,7 @@ from onyx.db.models import HierarchyNode
def _build_hierarchy_access_filter(
user_email: str,
user_email: str | None,
external_group_ids: list[str],
) -> ColumnElement[bool]:
"""Build SQLAlchemy filter for hierarchy node access.
@@ -43,7 +43,7 @@ def _build_hierarchy_access_filter(
def _get_accessible_hierarchy_nodes_for_source(
db_session: Session,
source: DocumentSource,
user_email: str,
user_email: str | None,
external_group_ids: list[str],
) -> list[HierarchyNode]:
"""

View File

@@ -7,7 +7,6 @@ from onyx.db.models import Persona
from onyx.db.models import Persona__User
from onyx.db.models import Persona__UserGroup
from onyx.db.notification import create_notification
from onyx.db.persona import mark_persona_user_files_for_sync
from onyx.server.features.persona.models import PersonaSharedNotificationData
@@ -27,9 +26,7 @@ def update_persona_access(
NOTE: Callers are responsible for committing."""
needs_sync = False
if is_public is not None:
needs_sync = True
persona = db_session.query(Persona).filter(Persona.id == persona_id).first()
if persona:
persona.is_public = is_public
@@ -38,7 +35,6 @@ def update_persona_access(
# and a non-empty list means "replace with these shares".
if user_ids is not None:
needs_sync = True
db_session.query(Persona__User).filter(
Persona__User.persona_id == persona_id
).delete(synchronize_session="fetch")
@@ -58,7 +54,6 @@ def update_persona_access(
)
if group_ids is not None:
needs_sync = True
db_session.query(Persona__UserGroup).filter(
Persona__UserGroup.persona_id == persona_id
).delete(synchronize_session="fetch")
@@ -68,7 +63,3 @@ def update_persona_access(
db_session.add(
Persona__UserGroup(persona_id=persona_id, user_group_id=group_id)
)
# When sharing changes, user file ACLs need to be updated in the vector DB
if needs_sync:
mark_persona_user_files_for_sync(persona_id, db_session)

View File

@@ -191,7 +191,8 @@ def create_initial_default_standard_answer_category(db_session: Session) -> None
if default_category is not None:
if default_category.name != default_category_name:
raise ValueError(
"DB is not in a valid initial state. Default standard answer category does not have expected name."
"DB is not in a valid initial state. "
"Default standard answer category does not have expected name."
)
return

View File

@@ -424,7 +424,8 @@ def fetch_user_groups_for_documents(
def _check_user_group_is_modifiable(user_group: UserGroup) -> None:
if not user_group.is_up_to_date:
raise ValueError(
"Specified user group is currently syncing. Wait until the current sync has finished before editing."
"Specified user group is currently syncing. Wait until the current "
"sync has finished before editing."
)

View File

@@ -56,7 +56,8 @@ def _run_with_retry(
if retry_count < MAX_RETRY_COUNT:
sleep_after_rate_limit_exception(github_client)
logger.warning(
f"Rate limit exceeded while {description}. Retrying... (attempt {retry_count + 1}/{MAX_RETRY_COUNT})"
f"Rate limit exceeded while {description}. Retrying... "
f"(attempt {retry_count + 1}/{MAX_RETRY_COUNT})"
)
return _run_with_retry(
operation, description, github_client, retry_count + 1
@@ -90,9 +91,7 @@ class TeamInfo(BaseModel):
def _fetch_organization_members(
github_client: Github,
org_name: str,
retry_count: int = 0, # noqa: ARG001
github_client: Github, org_name: str, retry_count: int = 0 # noqa: ARG001
) -> List[UserInfo]:
"""Fetch all organization members including owners and regular members."""
org_members: List[UserInfo] = []
@@ -125,9 +124,7 @@ def _fetch_organization_members(
def _fetch_repository_teams_detailed(
repo: Repository,
github_client: Github,
retry_count: int = 0, # noqa: ARG001
repo: Repository, github_client: Github, retry_count: int = 0 # noqa: ARG001
) -> List[TeamInfo]:
"""Fetch teams with access to the repository and their members."""
teams_data: List[TeamInfo] = []
@@ -170,9 +167,7 @@ def _fetch_repository_teams_detailed(
def fetch_repository_team_slugs(
repo: Repository,
github_client: Github,
retry_count: int = 0, # noqa: ARG001
repo: Repository, github_client: Github, retry_count: int = 0 # noqa: ARG001
) -> List[str]:
"""Fetch team slugs with access to the repository."""
logger.info(f"Fetching team slugs for repository {repo.full_name}")

View File

@@ -68,7 +68,6 @@ def get_external_access_for_raw_gdrive_file(
company_domain: str,
retriever_drive_service: GoogleDriveService | None,
admin_drive_service: GoogleDriveService,
fallback_user_email: str,
add_prefix: bool = False,
) -> ExternalAccess:
"""
@@ -80,11 +79,6 @@ def get_external_access_for_raw_gdrive_file(
set add_prefix to True so group IDs are prefixed with the source type.
When invoked from doc_sync (permission sync), use the default (False)
since upsert_document_external_perms handles prefixing.
fallback_user_email: When we cannot retrieve any permission info for a file
(e.g. externally-owned files where the API returns no permissions
and permissions.list returns 403), fall back to granting access
to this user. This is typically the impersonated org user whose
drive contained the file.
"""
doc_id = file.get("id")
if not doc_id:
@@ -115,33 +109,14 @@ def get_external_access_for_raw_gdrive_file(
)
if len(permissions_list) != len(permission_ids) and retriever_drive_service:
logger.warning(
f"Failed to get all permissions for file {doc_id} with retriever service, trying admin service"
f"Failed to get all permissions for file {doc_id} with retriever service, "
"trying admin service"
)
backup_permissions_list = _get_permissions(admin_drive_service)
permissions_list = _merge_permissions_lists(
[permissions_list, backup_permissions_list]
)
# For externally-owned files, the Drive API may return no permissions
# and permissions.list may return 403. In this case, fall back to
# granting access to the user who found the file in their drive.
# Note, even if other users also have access to this file,
# they will not be granted access in Onyx.
# We check permissions_list (the final result after all fetch attempts)
# rather than the raw fields, because permission_ids may be present
# but the actual fetch can still return empty due to a 403.
if not permissions_list:
logger.info(
f"No permission info available for file {doc_id} "
f"(likely owned by a user outside of your organization). "
f"Falling back to granting access to retriever user: {fallback_user_email}"
)
return ExternalAccess(
external_user_emails={fallback_user_email},
external_user_group_ids=set(),
is_public=False,
)
folder_ids_to_inherit_permissions_from: set[str] = set()
user_emails: set[str] = set()
group_emails: set[str] = set()
@@ -165,7 +140,9 @@ def get_external_access_for_raw_gdrive_file(
user_emails.add(permission.email_address)
else:
logger.error(
f"Permission is type `user` but no email address is provided for document {doc_id}\n {permission}"
"Permission is type `user` but no email address is "
f"provided for document {doc_id}"
f"\n {permission}"
)
elif permission.type == PermissionType.GROUP:
# groups are represented as email addresses within Drive
@@ -173,14 +150,17 @@ def get_external_access_for_raw_gdrive_file(
group_emails.add(permission.email_address)
else:
logger.error(
f"Permission is type `group` but no email address is provided for document {doc_id}\n {permission}"
"Permission is type `group` but no email address is "
f"provided for document {doc_id}"
f"\n {permission}"
)
elif permission.type == PermissionType.DOMAIN and company_domain:
if permission.domain == company_domain:
public = True
else:
logger.warning(
f"Permission is type domain but does not match company domain:\n {permission}"
"Permission is type domain but does not match company domain:"
f"\n {permission}"
)
elif permission.type == PermissionType.ANYONE:
public = True

View File

@@ -18,7 +18,10 @@ logger = setup_logger()
# Only include fields we need - folder ID and permissions
# IMPORTANT: must fetch permissionIds, since sometimes the drive API
# seems to miss permissions when requesting them directly
FOLDER_PERMISSION_FIELDS = "nextPageToken, files(id, name, permissionIds, permissions(id, emailAddress, type, domain, permissionDetails))"
FOLDER_PERMISSION_FIELDS = (
"nextPageToken, files(id, name, permissionIds, "
"permissions(id, emailAddress, type, domain, permissionDetails))"
)
def get_folder_permissions_by_ids(

View File

@@ -142,7 +142,8 @@ def _drive_folder_to_onyx_group(
elif permission.type == PermissionType.GROUP:
if permission.email_address not in group_email_to_member_emails_map:
logger.warning(
f"Group email {permission.email_address} for folder {folder.id} not found in group_email_to_member_emails_map"
f"Group email {permission.email_address} for folder {folder.id} "
"not found in group_email_to_member_emails_map"
)
continue
folder_member_emails.update(
@@ -237,7 +238,8 @@ def _drive_member_map_to_onyx_groups(
for group_email in group_emails:
if group_email not in group_email_to_member_emails_map:
logger.warning(
f"Group email {group_email} for drive {drive_id} not found in group_email_to_member_emails_map"
f"Group email {group_email} for drive {drive_id} not found in "
"group_email_to_member_emails_map"
)
continue
drive_member_emails.update(group_email_to_member_emails_map[group_email])
@@ -324,7 +326,8 @@ def _build_onyx_groups(
for group_email in group_emails:
if group_email not in group_email_to_member_emails_map:
logger.warning(
f"Group email {group_email} for drive {drive_id} not found in group_email_to_member_emails_map"
f"Group email {group_email} for drive {drive_id} not found in "
"group_email_to_member_emails_map"
)
continue
drive_member_emails.update(group_email_to_member_emails_map[group_email])

View File

@@ -55,7 +55,8 @@ def get_permissions_by_ids(
if len(filtered_permissions) < len(permission_ids):
missing_ids = permission_id_set - {p.id for p in filtered_permissions if p.id}
logger.warning(
f"Could not find all requested permission IDs for document {doc_id}. Missing IDs: {missing_ids}"
f"Could not find all requested permission IDs for document {doc_id}. "
f"Missing IDs: {missing_ids}"
)
return filtered_permissions

View File

@@ -1,8 +1,6 @@
from collections.abc import Generator
from typing import Any
from jira import JIRA
from jira.exceptions import JIRAError
from ee.onyx.db.external_perm import ExternalUserGroup
from onyx.connectors.jira.utils import build_jira_client
@@ -11,101 +9,107 @@ from onyx.utils.logger import setup_logger
logger = setup_logger()
_ATLASSIAN_ACCOUNT_TYPE = "atlassian"
_GROUP_MEMBER_PAGE_SIZE = 50
# The GET /group/member endpoint was introduced in Jira 6.0.
# Jira versions older than 6.0 do not have group management REST APIs at all.
_MIN_JIRA_VERSION_FOR_GROUP_MEMBER = "6.0"
def _fetch_group_member_page(
def _get_jira_group_members_email(
jira_client: JIRA,
group_name: str,
start_at: int,
) -> dict[str, Any]:
"""Fetch a single page from the non-deprecated GET /group/member endpoint.
) -> list[str]:
"""Get all member emails for a Jira group.
The old GET /group endpoint (used by jira_client.group_members()) is deprecated
and decommissioned in Jira Server 10.3+. This uses the replacement endpoint
directly via the library's internal _get_json helper, following the same pattern
as enhanced_search_ids / bulk_fetch_issues in connector.py.
There is an open PR to the library to switch to this endpoint since last year:
https://github.com/pycontribs/jira/pull/2356
so once it is merged and released, we can switch to using the library function.
Filters out app accounts (bots, integrations) and only returns real user emails.
"""
emails: list[str] = []
try:
return jira_client._get_json(
"group/member",
params={
"groupname": group_name,
"includeInactiveUsers": "false",
"startAt": start_at,
"maxResults": _GROUP_MEMBER_PAGE_SIZE,
},
)
except JIRAError as e:
if e.status_code == 404:
raise RuntimeError(
f"GET /group/member returned 404 for group '{group_name}'. "
f"This endpoint requires Jira {_MIN_JIRA_VERSION_FOR_GROUP_MEMBER}+. "
f"If you are running a self-hosted Jira instance, please upgrade "
f"to at least Jira {_MIN_JIRA_VERSION_FOR_GROUP_MEMBER}."
) from e
raise
# group_members returns an OrderedDict of account_id -> member_info
members = jira_client.group_members(group=group_name)
if not members:
logger.warning(f"No members found for group {group_name}")
return emails
def _get_group_member_emails(
jira_client: JIRA,
group_name: str,
) -> set[str]:
"""Get all member emails for a single Jira group.
for account_id, member_info in members.items():
# member_info is a dict with keys like 'fullname', 'email', 'active'
email = member_info.get("email")
Uses the non-deprecated GET /group/member endpoint which returns full user
objects including accountType, so we can filter out app/customer accounts
without making separate user() calls.
"""
emails: set[str] = set()
start_at = 0
while True:
try:
page = _fetch_group_member_page(jira_client, group_name, start_at)
except Exception as e:
logger.error(f"Error fetching members for group {group_name}: {e}")
raise
members: list[dict[str, Any]] = page.get("values", [])
for member in members:
account_type = member.get("accountType")
# On Jira DC < 9.0, accountType is absent; include those users.
# On Cloud / DC 9.0+, filter to real user accounts only.
if account_type is not None and account_type != _ATLASSIAN_ACCOUNT_TYPE:
continue
email = member.get("emailAddress")
if email:
emails.add(email)
# Skip "hidden" emails - these are typically app accounts
if email and email != "hidden":
emails.append(email)
else:
logger.warning(
f"Atlassian user {member.get('accountId', 'unknown')} in group {group_name} has no visible email address"
)
# For cloud, we might need to fetch user details separately
try:
user = jira_client.user(id=account_id)
if page.get("isLast", True) or not members:
break
start_at += len(members)
# Skip app accounts (bots, integrations, etc.)
if hasattr(user, "accountType") and user.accountType == "app":
logger.info(
f"Skipping app account {account_id} for group {group_name}"
)
continue
if hasattr(user, "emailAddress") and user.emailAddress:
emails.append(user.emailAddress)
else:
logger.warning(f"User {account_id} has no email address")
except Exception as e:
logger.warning(
f"Could not fetch email for user {account_id} in group {group_name}: {e}"
)
except Exception as e:
logger.error(f"Error fetching members for group {group_name}: {e}")
return emails
def _build_group_member_email_map(
jira_client: JIRA,
) -> dict[str, set[str]]:
"""Build a map of group names to member emails."""
group_member_emails: dict[str, set[str]] = {}
try:
# Get all groups from Jira - returns a list of group name strings
group_names = jira_client.groups()
if not group_names:
logger.warning("No groups found in Jira")
return group_member_emails
logger.info(f"Found {len(group_names)} groups in Jira")
for group_name in group_names:
if not group_name:
continue
member_emails = _get_jira_group_members_email(
jira_client=jira_client,
group_name=group_name,
)
if member_emails:
group_member_emails[group_name] = set(member_emails)
logger.debug(
f"Found {len(member_emails)} members for group {group_name}"
)
else:
logger.debug(f"No members found for group {group_name}")
except Exception as e:
logger.error(f"Error building group member email map: {e}")
return group_member_emails
def jira_group_sync(
tenant_id: str, # noqa: ARG001
cc_pair: ConnectorCredentialPair,
) -> Generator[ExternalUserGroup, None, None]:
"""Sync Jira groups and their members, yielding one group at a time.
"""
Sync Jira groups and their members.
Streams group-by-group rather than accumulating all groups in memory.
This function fetches all groups from Jira and yields ExternalUserGroup
objects containing the group ID and member emails.
"""
jira_base_url = cc_pair.connector.connector_specific_config.get("jira_base_url", "")
scoped_token = cc_pair.connector.connector_specific_config.get(
@@ -126,26 +130,12 @@ def jira_group_sync(
scoped_token=scoped_token,
)
group_names = jira_client.groups()
if not group_names:
raise ValueError(f"No groups found for cc_pair_id={cc_pair.id}")
group_member_email_map = _build_group_member_email_map(jira_client=jira_client)
if not group_member_email_map:
raise ValueError(f"No groups with members found for cc_pair_id={cc_pair.id}")
logger.info(f"Found {len(group_names)} groups in Jira")
for group_name in group_names:
if not group_name:
continue
member_emails = _get_group_member_emails(
jira_client=jira_client,
group_name=group_name,
)
if not member_emails:
logger.debug(f"No members found for group {group_name}")
continue
logger.debug(f"Found {len(member_emails)} members for group {group_name}")
for group_id, group_member_emails in group_member_email_map.items():
yield ExternalUserGroup(
id=group_name,
user_emails=list(member_emails),
id=group_id,
user_emails=list(group_member_emails),
)

View File

@@ -69,7 +69,8 @@ def _post_query_chunk_censoring(
censored_chunks = censor_chunks_for_source(chunks_for_source, user.email)
except Exception as e:
logger.exception(
f"Failed to censor chunks for source {source} so throwing out all chunks for this source and continuing: {e}"
f"Failed to censor chunks for source {source} so throwing out all"
f" chunks for this source and continuing: {e}"
)
continue

View File

@@ -23,9 +23,7 @@ ContentRange = tuple[int, int | None] # (start_index, end_index) None means to
# NOTE: Used for testing timing
def _get_dummy_object_access_map(
object_ids: set[str],
user_email: str, # noqa: ARG001
chunks: list[InferenceChunk], # noqa: ARG001
object_ids: set[str], user_email: str, chunks: list[InferenceChunk] # noqa: ARG001
) -> dict[str, bool]:
time.sleep(0.15)
# return {object_id: True for object_id in object_ids}

View File

@@ -61,7 +61,8 @@ def _graph_api_get(
):
wait = min(int(resp.headers.get("Retry-After", str(2**attempt))), 60)
logger.warning(
f"Graph API {resp.status_code} on attempt {attempt + 1}, retrying in {wait}s: {url}"
f"Graph API {resp.status_code} on attempt {attempt + 1}, "
f"retrying in {wait}s: {url}"
)
time.sleep(wait)
continue
@@ -71,7 +72,8 @@ def _graph_api_get(
if attempt < GRAPH_API_MAX_RETRIES:
wait = min(2**attempt, 60)
logger.warning(
f"Graph API connection error on attempt {attempt + 1}, retrying in {wait}s: {url}"
f"Graph API connection error on attempt {attempt + 1}, "
f"retrying in {wait}s: {url}"
)
time.sleep(wait)
continue
@@ -765,7 +767,8 @@ def get_sharepoint_external_groups(
if not enumerate_all_ad_groups or get_access_token is None:
logger.info(
"Skipping exhaustive Azure AD group enumeration. Only groups found in site role assignments are included."
"Skipping exhaustive Azure AD group enumeration. "
"Only groups found in site role assignments are included."
)
return external_user_groups

View File

@@ -166,7 +166,8 @@ def slack_doc_sync(
user_id_to_email_map = fetch_user_id_to_email_map(slack_client)
if not user_id_to_email_map:
raise ValueError(
"No user id to email map found. Please check to make sure that your Slack bot token has the `users:read.email` scope"
"No user id to email map found. Please check to make sure that "
"your Slack bot token has the `users:read.email` scope"
)
workspace_permissions = _fetch_workspace_permissions(

View File

@@ -4,6 +4,7 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI
from httpx_oauth.clients.google import GoogleOAuth2
from ee.onyx.configs.app_configs import LICENSE_ENFORCEMENT_ENABLED
from ee.onyx.server.analytics.api import router as analytics_router
from ee.onyx.server.auth_check import check_ee_router_auth
from ee.onyx.server.billing.api import router as billing_router
@@ -152,9 +153,12 @@ def get_application() -> FastAPI:
# License management
include_router_with_global_prefix_prepended(application, license_router)
# Unified billing API - always registered in EE.
# Each endpoint is protected by the `current_admin_user` dependency (admin auth).
include_router_with_global_prefix_prepended(application, billing_router)
# Unified billing API - available when license system is enabled
# Works for both self-hosted and cloud deployments
# TODO(ENG-3533): Once frontend migrates to /admin/billing/*, this becomes the
# primary billing API and /tenants/* billing endpoints can be removed
if LICENSE_ENFORCEMENT_ENABLED:
include_router_with_global_prefix_prepended(application, billing_router)
if MULTI_TENANT:
# Tenant management

View File

@@ -4,7 +4,6 @@ from typing import List
from fastapi import APIRouter
from fastapi import Depends
from fastapi import HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
@@ -22,6 +21,8 @@ from onyx.auth.users import current_user
from onyx.configs.constants import PUBLIC_API_TAGS
from onyx.db.engine.sql_engine import get_session
from onyx.db.models import User
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
router = APIRouter(prefix="/analytics", tags=PUBLIC_API_TAGS)
@@ -231,8 +232,9 @@ def get_assistant_stats(
end = end or datetime.datetime.utcnow()
if not user_can_view_assistant_stats(db_session, user, assistant_id):
raise HTTPException(
status_code=403, detail="Not allowed to access this assistant's stats."
raise OnyxError(
OnyxErrorCode.INSUFFICIENT_PERMISSIONS,
"Not allowed to access this assistant's stats.",
)
# Pull daily usage from the DB calls

View File

@@ -246,11 +246,7 @@ async def get_billing_information(
)
except OnyxError as e:
# Open circuit breaker on connection failures (self-hosted only)
if e.status_code in (
OnyxErrorCode.BAD_GATEWAY.status_code,
OnyxErrorCode.SERVICE_UNAVAILABLE.status_code,
OnyxErrorCode.GATEWAY_TIMEOUT.status_code,
):
if e.status_code in (502, 503, 504):
_open_billing_circuit()
raise

View File

@@ -4,7 +4,6 @@ from typing import Any
from typing import cast
from typing import IO
from fastapi import HTTPException
from fastapi import UploadFile
from ee.onyx.server.enterprise_settings.models import AnalyticsScriptUpload
@@ -13,6 +12,8 @@ from onyx.configs.constants import FileOrigin
from onyx.configs.constants import KV_CUSTOM_ANALYTICS_SCRIPT_KEY
from onyx.configs.constants import KV_ENTERPRISE_SETTINGS_KEY
from onyx.configs.constants import ONYX_DEFAULT_APPLICATION_NAME
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.file_store.file_store import get_default_file_store
from onyx.key_value_store.factory import get_kv_store
from onyx.key_value_store.interface import KvKeyNotFoundError
@@ -118,9 +119,9 @@ def upload_logo(file: UploadFile | str, is_logotype: bool = False) -> bool:
else:
logger.notice("Uploading logo from uploaded file")
if not file.filename or not is_valid_file_type(file.filename):
raise HTTPException(
status_code=400,
detail="Invalid file type- only .png, .jpg, and .jpeg files are allowed",
raise OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
"Invalid file type- only .png, .jpg, and .jpeg files are allowed",
)
content = file.file
display_name = file.filename

View File

@@ -1,6 +1,5 @@
from fastapi import APIRouter
from fastapi import Depends
from fastapi import HTTPException
from sqlalchemy.orm import Session
from ee.onyx.db.standard_answer import fetch_standard_answer
@@ -15,6 +14,8 @@ from ee.onyx.db.standard_answer import update_standard_answer_category
from onyx.auth.users import current_admin_user
from onyx.db.engine.sql_engine import get_session
from onyx.db.models import User
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.server.manage.models import StandardAnswer
from onyx.server.manage.models import StandardAnswerCategory
from onyx.server.manage.models import StandardAnswerCategoryCreationRequest
@@ -65,7 +66,7 @@ def patch_standard_answer(
)
if existing_standard_answer is None:
raise HTTPException(status_code=404, detail="Standard answer not found")
raise OnyxError(OnyxErrorCode.NOT_FOUND, "Standard answer not found")
standard_answer_model = update_standard_answer(
standard_answer_id=standard_answer_id,
@@ -131,9 +132,7 @@ def patch_standard_answer_category(
)
if existing_standard_answer_category is None:
raise HTTPException(
status_code=404, detail="Standard answer category not found"
)
raise OnyxError(OnyxErrorCode.NOT_FOUND, "Standard answer category not found")
standard_answer_category_model = update_standard_answer_category(
standard_answer_category_id=standard_answer_category_id,

View File

@@ -152,7 +152,10 @@ def create_new_usage_report(
zip_buffer.seek(0)
# store zip blob to file_store
report_name = f"{datetime.now(tz=timezone.utc).strftime('%Y-%m-%d')}_{report_id}_usage_report.zip"
report_name = (
f"{datetime.now(tz=timezone.utc).strftime('%Y-%m-%d')}"
f"_{report_id}_usage_report.zip"
)
file_store.save_file(
content=zip_buffer,
display_name=report_name,

View File

@@ -449,7 +449,8 @@ def _apply_group_remove(
match = _MEMBER_FILTER_RE.match(op.path)
if not match:
raise ScimPatchError(
f"Unsupported remove path '{op.path}'. Expected: members[value eq \"user-id\"]"
f"Unsupported remove path '{op.path}'. "
'Expected: members[value eq "user-id"]'
)
target_id = match.group(1)

View File

@@ -26,7 +26,6 @@ from onyx.db.models import Tool
from onyx.db.persona import upsert_persona
from onyx.server.features.persona.models import PersonaUpsertRequest
from onyx.server.manage.llm.models import LLMProviderUpsertRequest
from onyx.server.manage.llm.models import LLMProviderView
from onyx.server.settings.models import Settings
from onyx.server.settings.store import store_settings as store_base_settings
from onyx.utils.logger import setup_logger
@@ -126,16 +125,10 @@ def _seed_llms(
existing = fetch_existing_llm_provider(name=request.name, db_session=db_session)
if existing:
request.id = existing.id
seeded_providers: list[LLMProviderView] = []
for llm_upsert_request in llm_upsert_requests:
try:
seeded_providers.append(upsert_llm_provider(llm_upsert_request, db_session))
except ValueError as e:
logger.warning(
"Failed to upsert LLM provider '%s' during seeding: %s",
llm_upsert_request.name,
e,
)
seeded_providers = [
upsert_llm_provider(llm_upsert_request, db_session)
for llm_upsert_request in llm_upsert_requests
]
default_provider = next(
(p for p in seeded_providers if p.model_configurations), None

View File

@@ -123,8 +123,7 @@ async def get_or_provision_tenant(
async def create_tenant(
email: str,
referral_source: str | None = None, # noqa: ARG001
email: str, referral_source: str | None = None # noqa: ARG001
) -> str:
"""
Create a new tenant on-demand when no pre-provisioned tenants are available.
@@ -680,9 +679,7 @@ async def setup_tenant(tenant_id: str) -> None:
async def assign_tenant_to_user(
tenant_id: str,
email: str,
referral_source: str | None = None, # noqa: ARG001
tenant_id: str, email: str, referral_source: str | None = None # noqa: ARG001
) -> None:
"""
Assign a tenant to a user and perform necessary operations.

View File

@@ -14,90 +14,67 @@ from onyx.utils.variable_functionality import fetch_versioned_implementation
logger = setup_logger()
@lru_cache(maxsize=2)
@lru_cache(maxsize=1)
def _get_trimmed_key(key: str) -> bytes:
encoded_key = key.encode()
key_length = len(encoded_key)
if key_length < 16:
raise RuntimeError("Invalid ENCRYPTION_KEY_SECRET - too short")
elif key_length > 32:
key = key[:32]
elif key_length not in (16, 24, 32):
valid_lengths = [16, 24, 32]
key = key[: min(valid_lengths, key=lambda x: abs(x - key_length))]
# Trim to the largest valid AES key size that fits
valid_lengths = [32, 24, 16]
for size in valid_lengths:
if key_length >= size:
return encoded_key[:size]
raise AssertionError("unreachable")
return encoded_key
def _encrypt_string(input_str: str, key: str | None = None) -> bytes:
effective_key = key if key is not None else ENCRYPTION_KEY_SECRET
if not effective_key:
def _encrypt_string(input_str: str) -> bytes:
if not ENCRYPTION_KEY_SECRET:
return input_str.encode()
trimmed = _get_trimmed_key(effective_key)
key = _get_trimmed_key(ENCRYPTION_KEY_SECRET)
iv = urandom(16)
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(input_str.encode()) + padder.finalize()
cipher = Cipher(algorithms.AES(trimmed), modes.CBC(iv), backend=default_backend())
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
return iv + encrypted_data
def _decrypt_bytes(input_bytes: bytes, key: str | None = None) -> str:
effective_key = key if key is not None else ENCRYPTION_KEY_SECRET
if not effective_key:
def _decrypt_bytes(input_bytes: bytes) -> str:
if not ENCRYPTION_KEY_SECRET:
return input_bytes.decode()
trimmed = _get_trimmed_key(effective_key)
try:
iv = input_bytes[:16]
encrypted_data = input_bytes[16:]
key = _get_trimmed_key(ENCRYPTION_KEY_SECRET)
iv = input_bytes[:16]
encrypted_data = input_bytes[16:]
cipher = Cipher(
algorithms.AES(trimmed), modes.CBC(iv), backend=default_backend()
)
decryptor = cipher.decryptor()
decrypted_padded_data = decryptor.update(encrypted_data) + decryptor.finalize()
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
decrypted_padded_data = decryptor.update(encrypted_data) + decryptor.finalize()
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
decrypted_data = unpadder.update(decrypted_padded_data) + unpadder.finalize()
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
decrypted_data = unpadder.update(decrypted_padded_data) + unpadder.finalize()
return decrypted_data.decode()
except (ValueError, UnicodeDecodeError):
if key is not None:
# Explicit key was provided — don't fall back silently
raise
# Read path: attempt raw UTF-8 decode as a fallback for legacy data.
# Does NOT handle data encrypted with a different key — that
# ciphertext is not valid UTF-8 and will raise below.
logger.warning(
"AES decryption failed — falling back to raw decode. Run the re-encrypt secrets script to rotate to the current key."
)
try:
return input_bytes.decode()
except UnicodeDecodeError:
raise ValueError(
"Data is not valid UTF-8 — likely encrypted with a different key. "
"Run the re-encrypt secrets script to rotate to the current key."
) from None
return decrypted_data.decode()
def encrypt_string_to_bytes(input_str: str, key: str | None = None) -> bytes:
def encrypt_string_to_bytes(input_str: str) -> bytes:
versioned_encryption_fn = fetch_versioned_implementation(
"onyx.utils.encryption", "_encrypt_string"
)
return versioned_encryption_fn(input_str, key=key)
return versioned_encryption_fn(input_str)
def decrypt_bytes_to_string(input_bytes: bytes, key: str | None = None) -> str:
def decrypt_bytes_to_string(input_bytes: bytes) -> str:
versioned_decryption_fn = fetch_versioned_implementation(
"onyx.utils.encryption", "_decrypt_bytes"
)
return versioned_decryption_fn(input_bytes, key=key)
return versioned_decryption_fn(input_bytes)
def test_encryption() -> None:

View File

@@ -1,6 +1,7 @@
from collections.abc import Callable
from typing import cast
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import Session
from onyx.access.models import DocumentAccess
@@ -11,7 +12,6 @@ from onyx.db.document import get_access_info_for_document
from onyx.db.document import get_access_info_for_documents
from onyx.db.models import User
from onyx.db.models import UserFile
from onyx.db.user_file import fetch_user_files_with_access_relationships
from onyx.utils.variable_functionality import fetch_ee_implementation_or_noop
from onyx.utils.variable_functionality import fetch_versioned_implementation
@@ -96,9 +96,7 @@ def get_access_for_documents(
return versioned_get_access_for_documents_fn(document_ids, db_session)
def _get_acl_for_user(
user: User, db_session: Session # noqa: ARG001
) -> set[str]: # noqa: ARG001
def _get_acl_for_user(user: User, db_session: Session) -> set[str]: # noqa: ARG001
"""Returns a list of ACL entries that the user has access to. This is meant to be
used downstream to filter out documents that the user does not have access to. The
user should have access to a document if at least one entry in the document's ACL
@@ -134,61 +132,19 @@ def get_access_for_user_files(
user_file_ids: list[str],
db_session: Session,
) -> dict[str, DocumentAccess]:
versioned_fn = fetch_versioned_implementation(
"onyx.access.access", "get_access_for_user_files_impl"
user_files = (
db_session.query(UserFile)
.options(joinedload(UserFile.user)) # Eager load the user relationship
.filter(UserFile.id.in_(user_file_ids))
.all()
)
return versioned_fn(user_file_ids, db_session)
def get_access_for_user_files_impl(
user_file_ids: list[str],
db_session: Session,
) -> dict[str, DocumentAccess]:
user_files = fetch_user_files_with_access_relationships(user_file_ids, db_session)
return build_access_for_user_files_impl(user_files)
def build_access_for_user_files(
user_files: list[UserFile],
) -> dict[str, DocumentAccess]:
"""Compute access from pre-loaded UserFile objects (with relationships).
Callers must ensure UserFile.user, Persona.users, and Persona.user are
eagerly loaded (and Persona.groups for the EE path)."""
versioned_fn = fetch_versioned_implementation(
"onyx.access.access", "build_access_for_user_files_impl"
)
return versioned_fn(user_files)
def build_access_for_user_files_impl(
user_files: list[UserFile],
) -> dict[str, DocumentAccess]:
result: dict[str, DocumentAccess] = {}
for user_file in user_files:
emails, is_public = collect_user_file_access(user_file)
result[str(user_file.id)] = DocumentAccess.build(
user_emails=list(emails),
return {
str(user_file.id): DocumentAccess.build(
user_emails=[user_file.user.email] if user_file.user else [],
user_groups=[],
is_public=is_public,
is_public=True if user_file.user is None else False,
external_user_emails=[],
external_user_group_ids=[],
)
return result
def collect_user_file_access(user_file: UserFile) -> tuple[set[str], bool]:
"""Collect all user emails that should have access to this user file.
Includes the owner plus any users who have access via shared personas.
Returns (emails, is_public)."""
emails: set[str] = {user_file.user.email}
is_public = False
for persona in user_file.assistants:
if persona.deleted:
continue
if persona.is_public:
is_public = True
if persona.user_id is not None and persona.user:
emails.add(persona.user.email)
for shared_user in persona.users:
emails.add(shared_user.email)
return emails, is_public
for user_file in user_files
}

View File

@@ -5,8 +5,7 @@ from onyx.utils.variable_functionality import fetch_versioned_implementation
def _get_user_external_group_ids(
db_session: Session, # noqa: ARG001
user: User, # noqa: ARG001
db_session: Session, user: User # noqa: ARG001
) -> list[str]:
return []

View File

@@ -8,6 +8,7 @@ from onyx.configs.constants import PUBLIC_DOC_PAT
@dataclass(frozen=True)
class ExternalAccess:
# arbitrary limit to prevent excessively large permissions sets
# not internally enforced ... the caller can check this before using the instance
MAX_NUM_ENTRIES = 5000

View File

@@ -96,7 +96,8 @@ async def verify_captcha_token(
)
logger.debug(
f"Captcha verification passed: score={result.score}, action={result.action}"
f"Captcha verification passed: score={result.score}, "
f"action={result.action}"
)
except httpx.HTTPError as e:

View File

@@ -353,11 +353,20 @@ def build_user_email_invite(
"or login with Google and complete your registration.</p>"
)
elif auth_type == AuthType.BASIC:
message += "<p>To join the organization, please click the button below to set a password and complete your registration.</p>"
message += (
"<p>To join the organization, please click the button below to set a password "
"and complete your registration.</p>"
)
elif auth_type == AuthType.GOOGLE_OAUTH:
message += "<p>To join the organization, please click the button below to login with Google and complete your registration.</p>"
message += (
"<p>To join the organization, please click the button below to login with Google "
"and complete your registration.</p>"
)
elif auth_type == AuthType.OIDC or auth_type == AuthType.SAML:
message += "<p>To join the organization, please click the button below to complete your registration.</p>"
message += (
"<p>To join the organization, please click the button below to"
" complete your registration.</p>"
)
else:
raise ValueError(f"Invalid auth type: {auth_type}")

View File

@@ -1,7 +1,4 @@
import base64
import hashlib
import json
import os
import random
import secrets
import string
@@ -31,8 +28,6 @@ from fastapi import Query
from fastapi import Request
from fastapi import Response
from fastapi import status
from fastapi import WebSocket
from fastapi.responses import JSONResponse
from fastapi.responses import RedirectResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_users import BaseUserManager
@@ -59,7 +54,6 @@ from fastapi_users.router.common import ErrorModel
from fastapi_users_db_sqlalchemy import SQLAlchemyUserDatabase
from httpx_oauth.integrations.fastapi import OAuth2AuthorizeCallback
from httpx_oauth.oauth2 import BaseOAuth2
from httpx_oauth.oauth2 import GetAccessTokenError
from httpx_oauth.oauth2 import OAuth2Token
from pydantic import BaseModel
from sqlalchemy import nulls_last
@@ -125,12 +119,7 @@ from onyx.db.models import Persona
from onyx.db.models import User
from onyx.db.pat import fetch_user_for_pat
from onyx.db.users import get_user_by_email
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import log_onyx_error
from onyx.error_handling.exceptions import onyx_error_to_json_response
from onyx.error_handling.exceptions import OnyxError
from onyx.redis.redis_pool import get_async_redis_connection
from onyx.redis.redis_pool import retrieve_ws_token_data
from onyx.server.settings.store import load_settings
from onyx.server.utils import BasicAuthenticationError
from onyx.utils.logger import setup_logger
@@ -156,21 +145,10 @@ def is_user_admin(user: User) -> bool:
def verify_auth_setting() -> None:
"""Log warnings for AUTH_TYPE issues.
This only runs on app startup not during migrations/scripts.
"""
raw_auth_type = (os.environ.get("AUTH_TYPE") or "").lower()
if raw_auth_type == "cloud":
if AUTH_TYPE == AuthType.CLOUD:
raise ValueError(
"'cloud' is not a valid auth type for self-hosted deployments."
f"{AUTH_TYPE.value} is not a valid auth type for self-hosted deployments."
)
if raw_auth_type == "disabled":
logger.warning(
"AUTH_TYPE='disabled' is no longer supported. Using 'basic' instead. Please update your configuration."
)
logger.notice(f"Using Auth Type: {AUTH_TYPE.value}")
@@ -611,7 +589,8 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
char in PASSWORD_SPECIAL_CHARS for char in password
):
raise exceptions.InvalidPasswordException(
reason=f"Password must contain at least one special character from the following set: {PASSWORD_SPECIAL_CHARS}."
reason="Password must contain at least one special character from the following set: "
f"{PASSWORD_SPECIAL_CHARS}."
)
return
@@ -878,10 +857,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
)
async def on_after_forgot_password(
self,
user: User,
token: str,
request: Optional[Request] = None, # noqa: ARG002
self, user: User, token: str, request: Optional[Request] = None # noqa: ARG002
) -> None:
if not EMAIL_CONFIGURED:
logger.error(
@@ -900,10 +876,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
send_forgot_password_email(user.email, tenant_id=tenant_id, token=token)
async def on_after_request_verify(
self,
user: User,
token: str,
request: Optional[Request] = None, # noqa: ARG002
self, user: User, token: str, request: Optional[Request] = None # noqa: ARG002
) -> None:
verify_email_domain(user.email)
@@ -1199,9 +1172,7 @@ class SingleTenantJWTStrategy(JWTStrategy[User, uuid.UUID]):
return
async def refresh_token(
self,
token: Optional[str], # noqa: ARG002
user: User, # noqa: ARG002
self, token: Optional[str], user: User # noqa: ARG002
) -> str:
"""Issue a fresh JWT with a new expiry."""
return await self.write_token(user)
@@ -1229,7 +1200,8 @@ def get_jwt_strategy() -> SingleTenantJWTStrategy:
if AUTH_BACKEND == AuthBackend.JWT:
if MULTI_TENANT or AUTH_TYPE == AuthType.CLOUD:
raise ValueError(
"JWT auth backend is only supported for single-tenant, self-hosted deployments. Use 'redis' or 'postgres' instead."
"JWT auth backend is only supported for single-tenant, self-hosted deployments. "
"Use 'redis' or 'postgres' instead."
)
if not USER_AUTH_SECRET:
raise ValueError("USER_AUTH_SECRET is required for JWT auth backend.")
@@ -1627,102 +1599,6 @@ async def current_admin_user(user: User = Depends(current_user)) -> User:
return user
async def _get_user_from_token_data(token_data: dict) -> User | None:
"""Shared logic: token data dict → User object.
Args:
token_data: Decoded token data containing 'sub' (user ID).
Returns:
User object if found and active, None otherwise.
"""
user_id = token_data.get("sub")
if not user_id:
return None
try:
user_uuid = uuid.UUID(user_id)
except ValueError:
return None
async with get_async_session_context_manager() as async_db_session:
user = await async_db_session.get(User, user_uuid)
if user is None or not user.is_active:
return None
return user
async def current_user_from_websocket(
websocket: WebSocket,
token: str = Query(..., description="WebSocket authentication token"),
) -> User:
"""
WebSocket authentication dependency using query parameter.
Validates the WS token from query param and returns the User.
Raises BasicAuthenticationError if authentication fails.
The token must be obtained from POST /voice/ws-token before connecting.
Tokens are single-use and expire after 60 seconds.
Usage:
1. POST /voice/ws-token -> {"token": "xxx"}
2. Connect to ws://host/path?token=xxx
This applies the same auth checks as current_user() for HTTP endpoints.
"""
# Check Origin header to prevent Cross-Site WebSocket Hijacking (CSWSH)
# Browsers always send Origin on WebSocket connections
origin = websocket.headers.get("origin")
expected_origin = WEB_DOMAIN.rstrip("/")
if not origin:
logger.warning("WS auth: missing Origin header")
raise BasicAuthenticationError(detail="Access denied. Missing origin.")
actual_origin = origin.rstrip("/")
if actual_origin != expected_origin:
logger.warning(
f"WS auth: origin mismatch. Expected {expected_origin}, got {actual_origin}"
)
raise BasicAuthenticationError(detail="Access denied. Invalid origin.")
# Validate WS token in Redis (single-use, deleted after retrieval)
try:
token_data = await retrieve_ws_token_data(token)
if token_data is None:
raise BasicAuthenticationError(
detail="Access denied. Invalid or expired authentication token."
)
except BasicAuthenticationError:
raise
except Exception as e:
logger.error(f"WS auth: error during token validation: {e}")
raise BasicAuthenticationError(
detail="Authentication verification failed."
) from e
# Get user from token data
user = await _get_user_from_token_data(token_data)
if user is None:
logger.warning(f"WS auth: user not found for id={token_data.get('sub')}")
raise BasicAuthenticationError(
detail="Access denied. User not found or inactive."
)
# Apply same checks as HTTP auth (verification, OIDC expiry, role)
user = await double_check_user(user)
# Block LIMITED users (same as current_user)
if user.role == UserRole.LIMITED:
logger.warning(f"WS auth: user {user.email} has LIMITED role")
raise BasicAuthenticationError(
detail="Access denied. User role is LIMITED. BASIC or higher permissions are required.",
)
logger.debug(f"WS auth: authenticated {user.email}")
return user
def get_default_admin_user_emails_() -> list[str]:
# No default seeding available for Onyx MIT
return []
@@ -1732,7 +1608,6 @@ STATE_TOKEN_AUDIENCE = "fastapi-users:oauth-state"
STATE_TOKEN_LIFETIME_SECONDS = 3600
CSRF_TOKEN_KEY = "csrftoken"
CSRF_TOKEN_COOKIE_NAME = "fastapiusersoauthcsrf"
PKCE_COOKIE_NAME_PREFIX = "fastapiusersoauthpkce"
class OAuth2AuthorizeResponse(BaseModel):
@@ -1753,21 +1628,6 @@ def generate_csrf_token() -> str:
return secrets.token_urlsafe(32)
def _base64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def generate_pkce_pair() -> tuple[str, str]:
verifier = secrets.token_urlsafe(64)
challenge = _base64url_encode(hashlib.sha256(verifier.encode("ascii")).digest())
return verifier, challenge
def get_pkce_cookie_name(state: str) -> str:
state_hash = hashlib.sha256(state.encode("utf-8")).hexdigest()
return f"{PKCE_COOKIE_NAME_PREFIX}_{state_hash}"
# refer to https://github.com/fastapi-users/fastapi-users/blob/42ddc241b965475390e2bce887b084152ae1a2cd/fastapi_users/fastapi_users.py#L91
def create_onyx_oauth_router(
oauth_client: BaseOAuth2,
@@ -1776,7 +1636,6 @@ def create_onyx_oauth_router(
redirect_url: Optional[str] = None,
associate_by_email: bool = False,
is_verified_by_default: bool = False,
enable_pkce: bool = False,
) -> APIRouter:
return get_oauth_router(
oauth_client,
@@ -1786,7 +1645,6 @@ def create_onyx_oauth_router(
redirect_url,
associate_by_email,
is_verified_by_default,
enable_pkce=enable_pkce,
)
@@ -1805,7 +1663,6 @@ def get_oauth_router(
csrf_token_cookie_secure: Optional[bool] = None,
csrf_token_cookie_httponly: bool = True,
csrf_token_cookie_samesite: Optional[Literal["lax", "strict", "none"]] = "lax",
enable_pkce: bool = False,
) -> APIRouter:
"""Generate a router with the OAuth routes."""
router = APIRouter()
@@ -1822,13 +1679,6 @@ def get_oauth_router(
route_name=callback_route_name,
)
async def null_access_token_state() -> tuple[OAuth2Token, Optional[str]] | None:
return None
access_token_state_dependency = (
oauth2_authorize_callback if not enable_pkce else null_access_token_state
)
if csrf_token_cookie_secure is None:
csrf_token_cookie_secure = WEB_DOMAIN.startswith("https")
@@ -1862,26 +1712,13 @@ def get_oauth_router(
CSRF_TOKEN_KEY: csrf_token,
}
state = generate_state_token(state_data, state_secret)
pkce_cookie: tuple[str, str] | None = None
if enable_pkce:
code_verifier, code_challenge = generate_pkce_pair()
pkce_cookie_name = get_pkce_cookie_name(state)
pkce_cookie = (pkce_cookie_name, code_verifier)
authorization_url = await oauth_client.get_authorization_url(
authorize_redirect_url,
state,
scopes,
code_challenge=code_challenge,
code_challenge_method="S256",
)
else:
# Get the basic authorization URL
authorization_url = await oauth_client.get_authorization_url(
authorize_redirect_url,
state,
scopes,
)
# Get the basic authorization URL
authorization_url = await oauth_client.get_authorization_url(
authorize_redirect_url,
state,
scopes,
)
# For Google OAuth, add parameters to request refresh tokens
if oauth_client.name == "google":
@@ -1889,15 +1726,11 @@ def get_oauth_router(
authorization_url, {"access_type": "offline", "prompt": "consent"}
)
def set_oauth_cookie(
target_response: Response,
*,
key: str,
value: str,
) -> None:
target_response.set_cookie(
key=key,
value=value,
if redirect:
redirect_response = RedirectResponse(authorization_url, status_code=302)
redirect_response.set_cookie(
key=csrf_token_cookie_name,
value=csrf_token,
max_age=STATE_TOKEN_LIFETIME_SECONDS,
path=csrf_token_cookie_path,
domain=csrf_token_cookie_domain,
@@ -1905,28 +1738,18 @@ def get_oauth_router(
httponly=csrf_token_cookie_httponly,
samesite=csrf_token_cookie_samesite,
)
return redirect_response
response_with_cookies: Response
if redirect:
response_with_cookies = RedirectResponse(authorization_url, status_code=302)
else:
response_with_cookies = response
set_oauth_cookie(
response_with_cookies,
response.set_cookie(
key=csrf_token_cookie_name,
value=csrf_token,
max_age=STATE_TOKEN_LIFETIME_SECONDS,
path=csrf_token_cookie_path,
domain=csrf_token_cookie_domain,
secure=csrf_token_cookie_secure,
httponly=csrf_token_cookie_httponly,
samesite=csrf_token_cookie_samesite,
)
if pkce_cookie is not None:
pkce_cookie_name, code_verifier = pkce_cookie
set_oauth_cookie(
response_with_cookies,
key=pkce_cookie_name,
value=code_verifier,
)
if redirect:
return response_with_cookies
return OAuth2AuthorizeResponse(authorization_url=authorization_url)
@@ -1957,242 +1780,119 @@ def get_oauth_router(
)
async def callback(
request: Request,
access_token_state: Tuple[OAuth2Token, Optional[str]] | None = Depends(
access_token_state_dependency
access_token_state: Tuple[OAuth2Token, str] = Depends(
oauth2_authorize_callback
),
code: Optional[str] = None,
state: Optional[str] = None,
error: Optional[str] = None,
user_manager: BaseUserManager[models.UP, models.ID] = Depends(get_user_manager),
strategy: Strategy[models.UP, models.ID] = Depends(backend.get_strategy),
) -> Response:
pkce_cookie_name: str | None = None
) -> RedirectResponse:
token, state = access_token_state
account_id, account_email = await oauth_client.get_id_email(
token["access_token"]
)
def delete_pkce_cookie(response: Response) -> None:
if enable_pkce and pkce_cookie_name:
response.delete_cookie(
key=pkce_cookie_name,
path=csrf_token_cookie_path,
domain=csrf_token_cookie_domain,
secure=csrf_token_cookie_secure,
httponly=csrf_token_cookie_httponly,
samesite=csrf_token_cookie_samesite,
)
def build_error_response(exc: OnyxError) -> JSONResponse:
log_onyx_error(exc)
error_response = onyx_error_to_json_response(exc)
delete_pkce_cookie(error_response)
return error_response
def decode_and_validate_state(state_value: str) -> Dict[str, str]:
try:
state_data = decode_jwt(
state_value, state_secret, [STATE_TOKEN_AUDIENCE]
)
except jwt.DecodeError:
raise OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
getattr(
ErrorCode,
"ACCESS_TOKEN_DECODE_ERROR",
"ACCESS_TOKEN_DECODE_ERROR",
),
)
except jwt.ExpiredSignatureError:
raise OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
getattr(
ErrorCode,
"ACCESS_TOKEN_ALREADY_EXPIRED",
"ACCESS_TOKEN_ALREADY_EXPIRED",
),
)
except jwt.PyJWTError:
raise OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
getattr(
ErrorCode,
"ACCESS_TOKEN_DECODE_ERROR",
"ACCESS_TOKEN_DECODE_ERROR",
),
)
cookie_csrf_token = request.cookies.get(csrf_token_cookie_name)
state_csrf_token = state_data.get(CSRF_TOKEN_KEY)
if (
not cookie_csrf_token
or not state_csrf_token
or not secrets.compare_digest(cookie_csrf_token, state_csrf_token)
):
raise OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
getattr(ErrorCode, "OAUTH_INVALID_STATE", "OAUTH_INVALID_STATE"),
)
return state_data
token: OAuth2Token
state_data: Dict[str, str]
# `code`, `state`, and `error` are read directly only in the PKCE path.
# In the non-PKCE path, `oauth2_authorize_callback` consumes them.
if enable_pkce:
if state is not None:
pkce_cookie_name = get_pkce_cookie_name(state)
if error is not None:
return build_error_response(
OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
"Authorization request failed or was denied",
)
)
if code is None:
return build_error_response(
OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
"Missing authorization code in OAuth callback",
)
)
if state is None:
return build_error_response(
OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
"Missing state parameter in OAuth callback",
)
)
state_value = state
if redirect_url is not None:
callback_redirect_url = redirect_url
else:
callback_path = request.app.url_path_for(callback_route_name)
callback_redirect_url = f"{WEB_DOMAIN}{callback_path}"
code_verifier = request.cookies.get(cast(str, pkce_cookie_name))
if not code_verifier:
return build_error_response(
OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
"Missing PKCE verifier cookie in OAuth callback",
)
)
try:
state_data = decode_and_validate_state(state_value)
except OnyxError as e:
return build_error_response(e)
try:
token = await oauth_client.get_access_token(
code, callback_redirect_url, code_verifier
)
except GetAccessTokenError:
return build_error_response(
OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
"Authorization code exchange failed",
)
)
else:
if access_token_state is None:
raise OnyxError(
OnyxErrorCode.INTERNAL_ERROR, "Missing OAuth callback state"
)
token, callback_state = access_token_state
if callback_state is None:
raise OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
"Missing state parameter in OAuth callback",
)
state_data = decode_and_validate_state(callback_state)
async def complete_login_flow(
token: OAuth2Token, state_data: Dict[str, str]
) -> RedirectResponse:
account_id, account_email = await oauth_client.get_id_email(
token["access_token"]
if account_email is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorCode.OAUTH_NOT_AVAILABLE_EMAIL,
)
if account_email is None:
raise OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
ErrorCode.OAUTH_NOT_AVAILABLE_EMAIL,
)
try:
state_data = decode_jwt(state, state_secret, [STATE_TOKEN_AUDIENCE])
except jwt.DecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=getattr(
ErrorCode, "ACCESS_TOKEN_DECODE_ERROR", "ACCESS_TOKEN_DECODE_ERROR"
),
)
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=getattr(
ErrorCode,
"ACCESS_TOKEN_ALREADY_EXPIRED",
"ACCESS_TOKEN_ALREADY_EXPIRED",
),
)
next_url = state_data.get("next_url", "/")
referral_source = state_data.get("referral_source", None)
try:
tenant_id = fetch_ee_implementation_or_noop(
"onyx.server.tenants.user_mapping", "get_tenant_id_for_email", None
)(account_email)
except exceptions.UserNotExists:
tenant_id = None
cookie_csrf_token = request.cookies.get(csrf_token_cookie_name)
state_csrf_token = state_data.get(CSRF_TOKEN_KEY)
if (
not cookie_csrf_token
or not state_csrf_token
or not secrets.compare_digest(cookie_csrf_token, state_csrf_token)
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=getattr(ErrorCode, "OAUTH_INVALID_STATE", "OAUTH_INVALID_STATE"),
)
request.state.referral_source = referral_source
next_url = state_data.get("next_url", "/")
referral_source = state_data.get("referral_source", None)
try:
tenant_id = fetch_ee_implementation_or_noop(
"onyx.server.tenants.user_mapping", "get_tenant_id_for_email", None
)(account_email)
except exceptions.UserNotExists:
tenant_id = None
# Proceed to authenticate or create the user
try:
user = await user_manager.oauth_callback(
oauth_client.name,
token["access_token"],
account_id,
account_email,
token.get("expires_at"),
token.get("refresh_token"),
request,
associate_by_email=associate_by_email,
is_verified_by_default=is_verified_by_default,
)
except UserAlreadyExists:
raise OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
ErrorCode.OAUTH_USER_ALREADY_EXISTS,
)
request.state.referral_source = referral_source
if not user.is_active:
raise OnyxError(
OnyxErrorCode.VALIDATION_ERROR,
ErrorCode.LOGIN_BAD_CREDENTIALS,
)
# Proceed to authenticate or create the user
try:
user = await user_manager.oauth_callback(
oauth_client.name,
token["access_token"],
account_id,
account_email,
token.get("expires_at"),
token.get("refresh_token"),
request,
associate_by_email=associate_by_email,
is_verified_by_default=is_verified_by_default,
)
except UserAlreadyExists:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorCode.OAUTH_USER_ALREADY_EXISTS,
)
# Login user
response = await backend.login(strategy, user)
await user_manager.on_after_login(user, request, response)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorCode.LOGIN_BAD_CREDENTIALS,
)
# Prepare redirect response
if tenant_id is None:
# Use URL utility to add parameters
redirect_destination = add_url_params(next_url, {"new_team": "true"})
redirect_response = RedirectResponse(
redirect_destination, status_code=302
)
# Login user
response = await backend.login(strategy, user)
await user_manager.on_after_login(user, request, response)
# Prepare redirect response
if tenant_id is None:
# Use URL utility to add parameters
redirect_url = add_url_params(next_url, {"new_team": "true"})
redirect_response = RedirectResponse(redirect_url, status_code=302)
else:
# No parameters to add
redirect_response = RedirectResponse(next_url, status_code=302)
# Copy headers from auth response to redirect response, with special handling for Set-Cookie
for header_name, header_value in response.headers.items():
# FastAPI can have multiple Set-Cookie headers as a list
if header_name.lower() == "set-cookie" and isinstance(header_value, list):
for cookie_value in header_value:
redirect_response.headers.append(header_name, cookie_value)
else:
# No parameters to add
redirect_response = RedirectResponse(next_url, status_code=302)
# Copy headers from auth response to redirect response, with special handling for Set-Cookie
for header_name, header_value in response.headers.items():
header_name_lower = header_name.lower()
if header_name_lower == "set-cookie":
redirect_response.headers.append(header_name, header_value)
continue
if header_name_lower in {"location", "content-length"}:
continue
redirect_response.headers[header_name] = header_value
return redirect_response
if hasattr(response, "body"):
redirect_response.body = response.body
if hasattr(response, "status_code"):
redirect_response.status_code = response.status_code
if hasattr(response, "media_type"):
redirect_response.media_type = response.media_type
if enable_pkce:
try:
redirect_response = await complete_login_flow(token, state_data)
except OnyxError as e:
return build_error_response(e)
delete_pkce_cookie(redirect_response)
return redirect_response
return await complete_login_flow(token, state_data)
return redirect_response
return router

View File

@@ -154,7 +154,8 @@ def on_task_postrun(
tenant_id = cast(str, kwargs.get("tenant_id", POSTGRES_DEFAULT_SCHEMA))
task_logger.debug(
f"Task {task.name} (ID: {task_id}) completed with state: {state} {f'for tenant_id={tenant_id}' if tenant_id else ''}"
f"Task {task.name} (ID: {task_id}) completed with state: {state} "
f"{f'for tenant_id={tenant_id}' if tenant_id else ''}"
)
r = get_redis_client(tenant_id=tenant_id)
@@ -210,9 +211,7 @@ def on_task_postrun(
def on_celeryd_init(
sender: str, # noqa: ARG001
conf: Any = None, # noqa: ARG001
**kwargs: Any, # noqa: ARG001
sender: str, conf: Any = None, **kwargs: Any # noqa: ARG001
) -> None:
"""The first signal sent on celery worker startup"""
@@ -278,7 +277,10 @@ def wait_for_redis(sender: Any, **kwargs: Any) -> None: # noqa: ARG001
time.sleep(WAIT_INTERVAL)
if not ready:
msg = f"Redis: Readiness probe did not succeed within the timeout ({WAIT_LIMIT} seconds). Exiting..."
msg = (
f"Redis: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)
@@ -317,7 +319,10 @@ def wait_for_db(sender: Any, **kwargs: Any) -> None: # noqa: ARG001
time.sleep(WAIT_INTERVAL)
if not ready:
msg = f"Database: Readiness probe did not succeed within the timeout ({WAIT_LIMIT} seconds). Exiting..."
msg = (
f"Database: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)
@@ -344,7 +349,10 @@ def on_secondary_worker_init(sender: Any, **kwargs: Any) -> None: # noqa: ARG00
f"Primary worker is not ready yet. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
)
if time_elapsed > WAIT_LIMIT:
msg = f"Primary worker was not ready within the timeout. ({WAIT_LIMIT} seconds). Exiting..."
msg = (
f"Primary worker was not ready within the timeout. "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)
@@ -514,9 +522,7 @@ def reset_tenant_id(
CURRENT_TENANT_ID_CONTEXTVAR.set(POSTGRES_DEFAULT_SCHEMA)
def wait_for_vespa_or_shutdown(
sender: Any, **kwargs: Any # noqa: ARG001
) -> None: # noqa: ARG001
def wait_for_vespa_or_shutdown(sender: Any, **kwargs: Any) -> None: # noqa: ARG001
"""Waits for Vespa to become ready subject to a timeout.
Raises WorkerShutdown if the timeout is reached."""

View File

@@ -181,7 +181,9 @@ class DynamicTenantScheduler(PersistentScheduler):
if not do_update:
# exit early if nothing changed
task_logger.info(
f"_try_updating_schedule - Schedule unchanged: tasks={len(new_schedule)} beat_multiplier={beat_multiplier}"
f"_try_updating_schedule - Schedule unchanged: "
f"tasks={len(new_schedule)} "
f"beat_multiplier={beat_multiplier}"
)
return

View File

@@ -186,6 +186,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
# Check if the Celery task actually exists
try:
result: AsyncResult = AsyncResult(attempt.celery_task_id)
# If the task is not in PENDING state, it exists in Celery
@@ -206,7 +207,8 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
except Exception:
# If we can't check the task status, be conservative and continue
logger.warning(
f"Could not verify Celery task status on startup for attempt {attempt.id}, task_id={attempt.celery_task_id}"
f"Could not verify Celery task status on startup for attempt {attempt.id}, "
f"task_id={attempt.celery_task_id}"
)
@@ -276,7 +278,8 @@ class HubPeriodicTask(bootsteps.StartStopStep):
lock.reacquire()
else:
task_logger.warning(
"Full acquisition of primary worker lock. Reasons could be worker restart or lock expiration."
"Full acquisition of primary worker lock. "
"Reasons could be worker restart or lock expiration."
)
lock = r.lock(
OnyxRedisLocks.PRIMARY_WORKER,

View File

@@ -115,15 +115,18 @@ def _extract_from_batch(
for item in doc_list:
if isinstance(item, HierarchyNode):
hierarchy_nodes.append(item)
if item.raw_node_id not in ids:
ids[item.raw_node_id] = None
elif isinstance(item, ConnectorFailure):
failed_id = _get_failure_id(item)
if failed_id:
ids[failed_id] = None
logger.warning(
f"Failed to retrieve document {failed_id}: {item.failure_message}"
f"Failed to retrieve document {failed_id}: " f"{item.failure_message}"
)
else:
ids[item.id] = item.parent_hierarchy_raw_node_id
parent_raw = getattr(item, "parent_hierarchy_raw_node_id", None)
ids[item.id] = parent_raw
return BatchResult(raw_id_to_parent=ids, hierarchy_nodes=hierarchy_nodes)
@@ -189,7 +192,9 @@ def extract_ids_from_runnable_connector(
batch_ids = batch_result.raw_id_to_parent
batch_nodes = batch_result.hierarchy_nodes
doc_batch_processing_func(batch_ids)
all_raw_id_to_parent.update(batch_ids)
for k, v in batch_ids.items():
if v is not None or k not in all_raw_id_to_parent:
all_raw_id_to_parent[k] = v
all_hierarchy_nodes.extend(batch_nodes)
if callback:

View File

@@ -307,12 +307,14 @@ def try_generate_document_cc_pair_cleanup_tasks(
if redis_connector.prune.fenced:
raise TaskDependencyError(
f"Connector deletion - Delayed (pruning in progress): cc_pair={cc_pair_id}"
"Connector deletion - Delayed (pruning in progress): "
f"cc_pair={cc_pair_id}"
)
if redis_connector.permissions.fenced:
raise TaskDependencyError(
f"Connector deletion - Delayed (permissions in progress): cc_pair={cc_pair_id}"
f"Connector deletion - Delayed (permissions in progress): "
f"cc_pair={cc_pair_id}"
)
# add tasks to celery and build up the task set to monitor in redis
@@ -352,7 +354,8 @@ def try_generate_document_cc_pair_cleanup_tasks(
# return 0
task_logger.info(
f"RedisConnectorDeletion.generate_tasks finished. cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
"RedisConnectorDeletion.generate_tasks finished. "
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
)
# set this only after all tasks have been added
@@ -363,9 +366,7 @@ def try_generate_document_cc_pair_cleanup_tasks(
def monitor_connector_deletion_taskset(
tenant_id: str,
key_bytes: bytes,
r: Redis, # noqa: ARG001
tenant_id: str, key_bytes: bytes, r: Redis # noqa: ARG001
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
@@ -689,7 +690,8 @@ def validate_connector_deletion_fence(
tasks_not_in_celery += 1
task_logger.info(
f"validate_connector_deletion_fence task check: tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
"validate_connector_deletion_fence task check: "
f"tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
)
# we're active if there are still tasks to run and those tasks all exist in celery

View File

@@ -109,7 +109,9 @@ def try_creating_docfetching_task(
except Exception:
task_logger.exception(
f"try_creating_indexing_task - Unexpected exception: cc_pair={cc_pair.id} search_settings={search_settings.id}"
f"try_creating_indexing_task - Unexpected exception: "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings.id}"
)
# Clean up on failure

View File

@@ -60,13 +60,15 @@ def _verify_indexing_attempt(
if attempt.connector_credential_pair_id != cc_pair_id:
raise SimpleJobException(
f"docfetching_task - CC pair mismatch: expected={cc_pair_id} actual={attempt.connector_credential_pair_id}",
f"docfetching_task - CC pair mismatch: "
f"expected={cc_pair_id} actual={attempt.connector_credential_pair_id}",
code=IndexingWatchdogTerminalStatus.FENCE_MISMATCH.code,
)
if attempt.search_settings_id != search_settings_id:
raise SimpleJobException(
f"docfetching_task - Search settings mismatch: expected={search_settings_id} actual={attempt.search_settings_id}",
f"docfetching_task - Search settings mismatch: "
f"expected={search_settings_id} actual={attempt.search_settings_id}",
code=IndexingWatchdogTerminalStatus.FENCE_MISMATCH.code,
)
@@ -75,7 +77,8 @@ def _verify_indexing_attempt(
IndexingStatus.IN_PROGRESS,
]:
raise SimpleJobException(
f"docfetching_task - Invalid attempt status: attempt_id={index_attempt_id} status={attempt.status}",
f"docfetching_task - Invalid attempt status: "
f"attempt_id={index_attempt_id} status={attempt.status}",
code=IndexingWatchdogTerminalStatus.FENCE_MISMATCH.code,
)
@@ -245,7 +248,9 @@ def _docfetching_task(
raise e
logger.info(
f"Indexing spawned task finished: attempt={index_attempt_id} cc_pair={cc_pair_id} search_settings={search_settings_id}"
f"Indexing spawned task finished: attempt={index_attempt_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
os._exit(0) # ensure process exits cleanly
@@ -281,7 +286,8 @@ def process_job_result(
result.status = IndexingWatchdogTerminalStatus.SUCCEEDED
task_logger.warning(
log_builder.build(
"Indexing watchdog - spawned task has non-zero exit code but completion signal is OK. Continuing...",
"Indexing watchdog - spawned task has non-zero exit code "
"but completion signal is OK. Continuing...",
exit_code=str(result.exit_code),
)
)
@@ -290,7 +296,10 @@ def process_job_result(
result.status = IndexingWatchdogTerminalStatus.from_code(result.exit_code)
job_level_exception = job.exception()
result.exception_str = f"Docfetching returned exit code {result.exit_code} with exception: {job_level_exception}"
result.exception_str = (
f"Docfetching returned exit code {result.exit_code} "
f"with exception: {job_level_exception}"
)
return result

View File

@@ -158,6 +158,7 @@ def validate_active_indexing_attempts(
logger.info("Validating active indexing attempts")
with get_session_with_current_tenant() as db_session:
# Find all active indexing attempts
active_attempts = (
db_session.execute(
@@ -189,7 +190,8 @@ def validate_active_indexing_attempts(
db_session.commit()
task_logger.info(
f"Initialized heartbeat tracking for attempt {fresh_attempt.id}: counter={fresh_attempt.heartbeat_counter}"
f"Initialized heartbeat tracking for attempt {fresh_attempt.id}: "
f"counter={fresh_attempt.heartbeat_counter}"
)
continue
@@ -212,7 +214,8 @@ def validate_active_indexing_attempts(
db_session.commit()
task_logger.debug(
f"Heartbeat advanced for attempt {fresh_attempt.id}: new_counter={current_counter}"
f"Heartbeat advanced for attempt {fresh_attempt.id}: "
f"new_counter={current_counter}"
)
continue
@@ -347,7 +350,9 @@ def monitor_indexing_attempt_progress(
)
except Exception as e:
logger.exception(
f"Failed to monitor document processing completion: attempt={attempt.id} error={str(e)}"
f"Failed to monitor document processing completion: "
f"attempt={attempt.id} "
f"error={str(e)}"
)
# Mark the attempt as failed if monitoring fails
@@ -396,7 +401,9 @@ def check_indexing_completion(
) -> None:
logger.info(
f"Checking for indexing completion: attempt={index_attempt_id} tenant={tenant_id}"
f"Checking for indexing completion: "
f"attempt={index_attempt_id} "
f"tenant={tenant_id}"
)
# Check if indexing is complete and all batches are processed
@@ -438,7 +445,7 @@ def check_indexing_completion(
if attempt.status == IndexingStatus.IN_PROGRESS:
logger.error(
f"Indexing attempt {index_attempt_id} has been indexing for "
f"{stalled_timeout_hours // 2}-{stalled_timeout_hours} hours without progress. "
f"{stalled_timeout_hours//2}-{stalled_timeout_hours} hours without progress. "
f"Marking it as failed."
)
mark_attempt_failed(
@@ -688,12 +695,17 @@ def _kickoff_indexing_tasks(
if attempt_id is not None:
task_logger.info(
f"Connector indexing queued: index_attempt={attempt_id} cc_pair={cc_pair.id} search_settings={search_settings.id}"
f"Connector indexing queued: "
f"index_attempt={attempt_id} "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings.id}"
)
tasks_created += 1
else:
task_logger.error(
f"Failed to create indexing task: cc_pair={cc_pair.id} search_settings={search_settings.id}"
f"Failed to create indexing task: "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings.id}"
)
return tasks_created
@@ -889,7 +901,9 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
and secondary_search_settings.switchover_type == SwitchoverType.INSTANT
):
task_logger.info(
f"Skipping secondary indexing: switchover_type=INSTANT for search_settings={secondary_search_settings.id}"
f"Skipping secondary indexing: "
f"switchover_type=INSTANT "
f"for search_settings={secondary_search_settings.id}"
)
# 2/3: VALIDATE
@@ -991,7 +1005,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
lock_beat.release()
else:
task_logger.error(
f"check_for_indexing - Lock not owned on completion: tenant={tenant_id}"
"check_for_indexing - Lock not owned on completion: "
f"tenant={tenant_id}"
)
redis_lock_dump(lock_beat, redis_client)
@@ -1045,7 +1060,8 @@ def check_for_checkpoint_cleanup(self: Task, *, tenant_id: str) -> None:
lock.release()
else:
task_logger.error(
f"check_for_checkpoint_cleanup - Lock not owned on completion: tenant={tenant_id}"
"check_for_checkpoint_cleanup - Lock not owned on completion: "
f"tenant={tenant_id}"
)
@@ -1055,10 +1071,7 @@ def check_for_checkpoint_cleanup(self: Task, *, tenant_id: str) -> None:
bind=True,
)
def cleanup_checkpoint_task(
self: Task, # noqa: ARG001
*,
index_attempt_id: int,
tenant_id: str | None,
self: Task, *, index_attempt_id: int, tenant_id: str | None # noqa: ARG001
) -> None:
"""Clean up a checkpoint for a given index attempt"""
@@ -1071,7 +1084,9 @@ def cleanup_checkpoint_task(
elapsed = time.monotonic() - start
task_logger.info(
f"cleanup_checkpoint_task completed: tenant_id={tenant_id} index_attempt_id={index_attempt_id} elapsed={elapsed:.2f}"
f"cleanup_checkpoint_task completed: tenant_id={tenant_id} "
f"index_attempt_id={index_attempt_id} "
f"elapsed={elapsed:.2f}"
)
@@ -1134,7 +1149,8 @@ def check_for_index_attempt_cleanup(self: Task, *, tenant_id: str) -> None:
lock.release()
else:
task_logger.error(
f"check_for_index_attempt_cleanup - Lock not owned on completion: tenant={tenant_id}"
"check_for_index_attempt_cleanup - Lock not owned on completion: "
f"tenant={tenant_id}"
)
@@ -1144,10 +1160,7 @@ def check_for_index_attempt_cleanup(self: Task, *, tenant_id: str) -> None:
bind=True,
)
def cleanup_index_attempt_task(
self: Task, # noqa: ARG001
*,
index_attempt_ids: list[int],
tenant_id: str,
self: Task, *, index_attempt_ids: list[int], tenant_id: str # noqa: ARG001
) -> None:
"""Clean up an index attempt"""
start = time.monotonic()
@@ -1194,13 +1207,15 @@ def _check_failure_threshold(
FAILURE_RATIO_THRESHOLD = 0.1
if total_failures > FAILURE_THRESHOLD and failure_ratio > FAILURE_RATIO_THRESHOLD:
logger.error(
f"Connector run failed with '{total_failures}' errors after '{batch_num}' batches."
f"Connector run failed with '{total_failures}' errors "
f"after '{batch_num}' batches."
)
if last_failure and last_failure.exception:
raise last_failure.exception from last_failure.exception
raise RuntimeError(
f"Connector run encountered too many errors, aborting. Last error: {last_failure}"
f"Connector run encountered too many errors, aborting. "
f"Last error: {last_failure}"
)
@@ -1324,7 +1339,9 @@ def _docprocessing_task(
raise
task_logger.info(
f"Processing document batch: attempt={index_attempt_id} batch_num={batch_num} "
f"Processing document batch: "
f"attempt={index_attempt_id} "
f"batch_num={batch_num} "
)
# Get the document batch storage
@@ -1582,7 +1599,9 @@ def _docprocessing_task(
except Exception:
task_logger.exception(
f"Document batch processing failed: batch_num={batch_num} attempt={index_attempt_id} "
f"Document batch processing failed: "
f"batch_num={batch_num} "
f"attempt={index_attempt_id} "
)
raise

View File

@@ -84,7 +84,8 @@ def scheduled_eval_task(self: Task, **kwargs: Any) -> None: # noqa: ARG001
run_timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d")
logger.info(
f"Starting scheduled eval pipeline for project '{project_name}' with {len(dataset_names)} dataset(s): {dataset_names}"
f"Starting scheduled eval pipeline for project '{project_name}' "
f"with {len(dataset_names)} dataset(s): {dataset_names}"
)
pipeline_start = datetime.now(timezone.utc)
@@ -100,7 +101,8 @@ def scheduled_eval_task(self: Task, **kwargs: Any) -> None: # noqa: ARG001
try:
logger.info(
f"Running scheduled eval for dataset: {dataset_name} (project: {project_name})"
f"Running scheduled eval for dataset: {dataset_name} "
f"(project: {project_name})"
)
configuration = EvalConfigurationOptions(
@@ -140,5 +142,6 @@ def scheduled_eval_task(self: Task, **kwargs: Any) -> None: # noqa: ARG001
passed_count = sum(1 for r in results if r["success"])
logger.info(
f"Scheduled eval pipeline completed: {passed_count}/{len(results)} passed in {total_duration:.1f}s"
f"Scheduled eval pipeline completed: {passed_count}/{len(results)} passed "
f"in {total_duration:.1f}s"
)

View File

@@ -40,7 +40,6 @@ from onyx.db.connector_credential_pair import get_connector_credential_pair_from
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.hierarchy import upsert_hierarchy_node_cc_pair_entries
from onyx.db.hierarchy import upsert_hierarchy_nodes_batch
from onyx.db.models import ConnectorCredentialPair
from onyx.redis.redis_hierarchy import cache_hierarchy_nodes_batch
@@ -127,7 +126,9 @@ def _try_creating_hierarchy_fetching_task(
raise RuntimeError("send_task for hierarchy_fetching_task failed.")
task_logger.info(
f"Created hierarchy fetching task: cc_pair={cc_pair.id} celery_task_id={custom_task_id}"
f"Created hierarchy fetching task: "
f"cc_pair={cc_pair.id} "
f"celery_task_id={custom_task_id}"
)
return custom_task_id
@@ -213,7 +214,8 @@ def check_for_hierarchy_fetching(self: Task, *, tenant_id: str) -> int | None:
time_elapsed = time.monotonic() - time_start
task_logger.info(
f"check_for_hierarchy_fetching finished: tasks_created={tasks_created} elapsed={time_elapsed:.2f}s"
f"check_for_hierarchy_fetching finished: "
f"tasks_created={tasks_created} elapsed={time_elapsed:.2f}s"
)
return tasks_created
@@ -287,14 +289,6 @@ def _run_hierarchy_extraction(
is_connector_public=is_connector_public,
)
upsert_hierarchy_node_cc_pair_entries(
db_session=db_session,
hierarchy_node_ids=[n.id for n in upserted_nodes],
connector_id=cc_pair.connector_id,
credential_id=cc_pair.credential_id,
commit=True,
)
# Cache in Redis for fast ancestor resolution
cache_entries = [
HierarchyNodeCacheEntry.from_db_model(node) for node in upserted_nodes
@@ -339,7 +333,8 @@ def connector_hierarchy_fetching_task(
from the connector source and stores it in the database.
"""
task_logger.info(
f"connector_hierarchy_fetching_task starting: cc_pair={cc_pair_id} tenant={tenant_id}"
f"connector_hierarchy_fetching_task starting: "
f"cc_pair={cc_pair_id} tenant={tenant_id}"
)
try:
@@ -357,7 +352,8 @@ def connector_hierarchy_fetching_task(
if cc_pair.status == ConnectorCredentialPairStatus.DELETING:
task_logger.info(
f"Skipping hierarchy fetching for deleting connector: cc_pair={cc_pair_id}"
f"Skipping hierarchy fetching for deleting connector: "
f"cc_pair={cc_pair_id}"
)
return
@@ -370,7 +366,8 @@ def connector_hierarchy_fetching_task(
)
task_logger.info(
f"connector_hierarchy_fetching_task: Extracted {total_nodes} hierarchy nodes for cc_pair={cc_pair_id}"
f"connector_hierarchy_fetching_task: "
f"Extracted {total_nodes} hierarchy nodes for cc_pair={cc_pair_id}"
)
# Update the last fetch time to prevent re-running until next interval

View File

@@ -18,9 +18,7 @@ from onyx.llm.well_known_providers.auto_update_service import (
bind=True,
)
def check_for_auto_llm_updates(
self: Task, # noqa: ARG001
*,
tenant_id: str, # noqa: ARG001
self: Task, *, tenant_id: str # noqa: ARG001
) -> bool | None:
"""Periodic task to fetch LLM model updates from GitHub
and sync them to providers in Auto mode.

View File

@@ -116,7 +116,8 @@ class Metric(BaseModel):
string_value = self.value
else:
task_logger.error(
f"Invalid metric value type: {type(self.value)} ({self.value}) for metric {self.name}."
f"Invalid metric value type: {type(self.value)} "
f"({self.value}) for metric {self.name}."
)
return
@@ -259,7 +260,8 @@ def _build_connector_final_metrics(
)
if _has_metric_been_emitted(redis_std, metric_key):
task_logger.info(
f"Skipping final metrics for connector {cc_pair.connector.id} index attempt {attempt.id}, already emitted."
f"Skipping final metrics for connector {cc_pair.connector.id} "
f"index attempt {attempt.id}, already emitted."
)
continue
@@ -1034,7 +1036,8 @@ def monitor_process_memory(self: Task, *, tenant_id: str) -> None: # noqa: ARG0
if process_name in cmdline:
if process_type in supervisor_processes.values():
task_logger.error(
f"Duplicate process type for type {process_type} with cmd {cmdline} with pid={proc.pid}."
f"Duplicate process type for type {process_type} "
f"with cmd {cmdline} with pid={proc.pid}."
)
continue
@@ -1043,7 +1046,8 @@ def monitor_process_memory(self: Task, *, tenant_id: str) -> None: # noqa: ARG0
if len(supervisor_processes) != len(process_type_mapping):
task_logger.error(
f"Missing processes: {set(process_type_mapping.keys()).symmetric_difference(supervisor_processes.values())}"
"Missing processes: "
f"{set(process_type_mapping.keys()).symmetric_difference(supervisor_processes.values())}"
)
# Log memory usage for each process
@@ -1097,7 +1101,9 @@ def cloud_monitor_celery_pidbox(
r_celery.delete(key)
task_logger.info(
f"Deleted idle pidbox: pidbox={key_str} idletime={idletime} max_idletime={MAX_PIDBOX_IDLE}"
f"Deleted idle pidbox: pidbox={key_str} "
f"idletime={idletime} "
f"max_idletime={MAX_PIDBOX_IDLE}"
)
num_deleted += 1

View File

@@ -11,9 +11,6 @@
# lock after its cleanup which happens at most after its soft timeout.
# Constants corresponding to migrate_documents_from_vespa_to_opensearch_task.
from onyx.configs.app_configs import OPENSEARCH_MIGRATION_GET_VESPA_CHUNKS_PAGE_SIZE
MIGRATION_TASK_SOFT_TIME_LIMIT_S = 60 * 5 # 5 minutes.
MIGRATION_TASK_TIME_LIMIT_S = 60 * 6 # 6 minutes.
# The maximum time the lock can be held for. Will automatically be released
@@ -47,7 +44,7 @@ TOTAL_ALLOWABLE_DOC_MIGRATION_ATTEMPTS_BEFORE_PERMANENT_FAILURE = 15
# WARNING: Do not change these values without knowing what changes also need to
# be made to OpenSearchTenantMigrationRecord.
GET_VESPA_CHUNKS_PAGE_SIZE = OPENSEARCH_MIGRATION_GET_VESPA_CHUNKS_PAGE_SIZE
GET_VESPA_CHUNKS_PAGE_SIZE = 500
GET_VESPA_CHUNKS_SLICE_COUNT = 4
# String used to indicate in the vespa_visit_continuation_token mapping that the

View File

@@ -205,7 +205,8 @@ def migrate_chunks_from_vespa_to_opensearch_task(
) = get_vespa_visit_state(db_session)
if is_continuation_token_done_for_all_slices(continuation_token_map):
task_logger.info(
f"OpenSearch migration COMPLETED for tenant {tenant_id}. Total chunks migrated: {total_chunks_migrated}."
f"OpenSearch migration COMPLETED for tenant {tenant_id}. "
f"Total chunks migrated: {total_chunks_migrated}."
)
mark_migration_completed_time_if_not_set_with_commit(db_session)
break

View File

@@ -48,15 +48,10 @@ from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.hierarchy import delete_orphaned_hierarchy_nodes
from onyx.db.hierarchy import link_hierarchy_nodes_to_documents
from onyx.db.hierarchy import remove_stale_hierarchy_node_cc_pair_entries
from onyx.db.hierarchy import reparent_orphaned_hierarchy_nodes
from onyx.db.hierarchy import update_document_parent_hierarchy_nodes
from onyx.db.hierarchy import upsert_hierarchy_node_cc_pair_entries
from onyx.db.hierarchy import upsert_hierarchy_nodes_batch
from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import HierarchyNode as DBHierarchyNode
from onyx.db.sync_record import insert_sync_record
from onyx.db.sync_record import update_sync_record_status
from onyx.db.tag import delete_orphan_tags__no_commit
@@ -65,7 +60,6 @@ from onyx.redis.redis_connector_prune import RedisConnectorPrune
from onyx.redis.redis_connector_prune import RedisConnectorPrunePayload
from onyx.redis.redis_hierarchy import cache_hierarchy_nodes_batch
from onyx.redis.redis_hierarchy import ensure_source_node_exists
from onyx.redis.redis_hierarchy import evict_hierarchy_nodes_from_cache
from onyx.redis.redis_hierarchy import get_node_id_from_raw_id
from onyx.redis.redis_hierarchy import get_source_node_id_from_cache
from onyx.redis.redis_hierarchy import HierarchyNodeCacheEntry
@@ -151,7 +145,8 @@ def _resolve_and_update_document_parents(
commit=True,
)
task_logger.info(
f"Pruning: resolved and updated parent hierarchy for {len(resolved)} documents (source={source.value})"
f"Pruning: resolved and updated parent hierarchy for "
f"{len(resolved)} documents (source={source.value})"
)
@@ -219,6 +214,7 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
# but pruning only kicks off once per hour
if not r.exists(OnyxRedisSignals.BLOCK_PRUNING):
task_logger.info("Checking for pruning due")
cc_pair_ids: list[int] = []
@@ -482,7 +478,8 @@ def connector_pruning_generator_task(
if not redis_connector.prune.fenced: # The fence must exist
raise ValueError(
f"connector_prune_generator_task - fence not found: fence={redis_connector.prune.fence_key}"
f"connector_prune_generator_task - fence not found: "
f"fence={redis_connector.prune.fence_key}"
)
payload = redis_connector.prune.payload # The payload must exist
@@ -493,7 +490,8 @@ def connector_pruning_generator_task(
if payload.celery_task_id is None:
logger.info(
f"connector_prune_generator_task - Waiting for fence: fence={redis_connector.prune.fence_key}"
f"connector_prune_generator_task - Waiting for fence: "
f"fence={redis_connector.prune.fence_key}"
)
time.sleep(1)
continue
@@ -549,7 +547,9 @@ def connector_pruning_generator_task(
redis_connector.prune.set_fence(new_payload)
task_logger.info(
f"Pruning generator running connector: cc_pair={cc_pair_id} connector_source={cc_pair.connector.source}"
f"Pruning generator running connector: "
f"cc_pair={cc_pair_id} "
f"connector_source={cc_pair.connector.source}"
)
runnable_connector = instantiate_connector(
@@ -579,12 +579,11 @@ def connector_pruning_generator_task(
source = cc_pair.connector.source
redis_client = get_redis_client(tenant_id=tenant_id)
ensure_source_node_exists(redis_client, db_session, source)
upserted_nodes: list[DBHierarchyNode] = []
if extraction_result.hierarchy_nodes:
is_connector_public = cc_pair.access_type == AccessType.PUBLIC
ensure_source_node_exists(redis_client, db_session, source)
upserted_nodes = upsert_hierarchy_nodes_batch(
db_session=db_session,
nodes=extraction_result.hierarchy_nodes,
@@ -593,14 +592,6 @@ def connector_pruning_generator_task(
is_connector_public=is_connector_public,
)
upsert_hierarchy_node_cc_pair_entries(
db_session=db_session,
hierarchy_node_ids=[n.id for n in upserted_nodes],
connector_id=connector_id,
credential_id=credential_id,
commit=True,
)
cache_entries = [
HierarchyNodeCacheEntry.from_db_model(node)
for node in upserted_nodes
@@ -616,6 +607,7 @@ def connector_pruning_generator_task(
f"hierarchy nodes for cc_pair={cc_pair_id}"
)
ensure_source_node_exists(redis_client, db_session, source)
# Resolve parent_hierarchy_raw_node_id → parent_hierarchy_node_id
# and bulk-update documents, mirroring the docfetching resolution
_resolve_and_update_document_parents(
@@ -667,50 +659,16 @@ def connector_pruning_generator_task(
return None
task_logger.info(
f"RedisConnector.prune.generate_tasks finished. cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
"RedisConnector.prune.generate_tasks finished. "
f"cc_pair={cc_pair_id} tasks_generated={tasks_generated}"
)
redis_connector.prune.generator_complete = tasks_generated
# --- Hierarchy node pruning ---
live_node_ids = {n.id for n in upserted_nodes}
stale_removed = remove_stale_hierarchy_node_cc_pair_entries(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
live_hierarchy_node_ids=live_node_ids,
commit=True,
)
deleted_raw_ids = delete_orphaned_hierarchy_nodes(
db_session=db_session,
source=source,
commit=True,
)
reparented_nodes = reparent_orphaned_hierarchy_nodes(
db_session=db_session,
source=source,
commit=True,
)
if deleted_raw_ids:
evict_hierarchy_nodes_from_cache(redis_client, source, deleted_raw_ids)
if reparented_nodes:
reparented_cache_entries = [
HierarchyNodeCacheEntry.from_db_model(node)
for node in reparented_nodes
]
cache_hierarchy_nodes_batch(
redis_client, source, reparented_cache_entries
)
if stale_removed or deleted_raw_ids or reparented_nodes:
task_logger.info(
f"Hierarchy node pruning: cc_pair={cc_pair_id} "
f"stale_entries_removed={stale_removed} "
f"nodes_deleted={len(deleted_raw_ids)} "
f"nodes_reparented={len(reparented_nodes)}"
)
except Exception as e:
task_logger.exception(
f"Pruning exceptioned: cc_pair={cc_pair_id} connector={connector_id} payload_id={payload_id}"
f"Pruning exceptioned: cc_pair={cc_pair_id} "
f"connector={connector_id} "
f"payload_id={payload_id}"
)
redis_connector.prune.reset()
@@ -728,10 +686,7 @@ def connector_pruning_generator_task(
def monitor_ccpair_pruning_taskset(
tenant_id: str,
key_bytes: bytes,
r: Redis, # noqa: ARG001
db_session: Session,
tenant_id: str, key_bytes: bytes, r: Redis, db_session: Session # noqa: ARG001
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
@@ -925,7 +880,8 @@ def validate_pruning_fence(
tasks_not_in_celery += 1
task_logger.info(
f"validate_pruning_fence task check: tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
"validate_pruning_fence task check: "
f"tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}"
)
# we're active if there are still tasks to run and those tasks all exist in celery

View File

@@ -192,7 +192,10 @@ def document_by_cc_pair_cleanup_task(
elapsed = time.monotonic() - start
task_logger.info(
f"doc={document_id} action={action} refcount={count} elapsed={elapsed:.2f}"
f"doc={document_id} "
f"action={action} "
f"refcount={count} "
f"elapsed={elapsed:.2f}"
)
except SoftTimeLimitExceeded:
task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")
@@ -215,7 +218,9 @@ def document_by_cc_pair_cleanup_task(
if isinstance(e, httpx.HTTPStatusError):
if e.response.status_code == HTTPStatus.BAD_REQUEST:
task_logger.exception(
f"Non-retryable HTTPStatusError: doc={document_id} status={e.response.status_code}"
f"Non-retryable HTTPStatusError: "
f"doc={document_id} "
f"status={e.response.status_code}"
)
completion_status = (
OnyxCeleryTaskCompletionStatus.NON_RETRYABLE_EXCEPTION
@@ -234,7 +239,8 @@ def document_by_cc_pair_cleanup_task(
# This is the last attempt! mark the document as dirty in the db so that it
# eventually gets fixed out of band via stale document reconciliation
task_logger.warning(
f"Max celery task retries reached. Marking doc as dirty for reconciliation: doc={document_id}"
f"Max celery task retries reached. Marking doc as dirty for reconciliation: "
f"doc={document_id}"
)
with get_session_with_current_tenant() as db_session:
# delete the cc pair relationship now and let reconciliation clean it up
@@ -279,4 +285,4 @@ def celery_beat_heartbeat(self: Task, *, tenant_id: str) -> None: # noqa: ARG00
r: Redis = get_redis_client()
r.set(ONYX_CELERY_BEAT_HEARTBEAT_KEY, 1, ex=600)
time_elapsed = time.monotonic() - time_start
task_logger.info(f"celery_beat_heartbeat finished: elapsed={time_elapsed:.2f}")
task_logger.info(f"celery_beat_heartbeat finished: " f"elapsed={time_elapsed:.2f}")

View File

@@ -12,9 +12,9 @@ from redis import Redis
from redis.lock import Lock as RedisLock
from retry import retry
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
from onyx.access.access import build_access_for_user_files
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
@@ -43,9 +43,7 @@ from onyx.db.enums import UserFileStatus
from onyx.db.models import UserFile
from onyx.db.search_settings import get_active_search_settings
from onyx.db.search_settings import get_active_search_settings_list
from onyx.db.user_file import fetch_user_files_with_access_relationships
from onyx.document_index.factory import get_all_document_indices
from onyx.document_index.interfaces import VespaDocumentFields
from onyx.document_index.interfaces import VespaDocumentUserFields
from onyx.document_index.vespa_constants import DOCUMENT_ID_ENDPOINT
from onyx.file_store.file_store import get_default_file_store
@@ -56,7 +54,6 @@ from onyx.indexing.adapters.user_file_indexing_adapter import UserFileIndexingAd
from onyx.indexing.embedder import DefaultIndexingEmbedder
from onyx.indexing.indexing_pipeline import run_indexing_pipeline
from onyx.redis.redis_pool import get_redis_client
from onyx.utils.variable_functionality import global_version
def _as_uuid(value: str | UUID) -> UUID:
@@ -285,7 +282,8 @@ def check_user_file_processing(self: Task, *, tenant_id: str) -> None:
lock.release()
task_logger.info(
f"check_user_file_processing - Enqueued {enqueued} skipped_guard={skipped_guard} tasks for tenant={tenant_id}"
f"check_user_file_processing - Enqueued {enqueued} skipped_guard={skipped_guard} "
f"tasks for tenant={tenant_id}"
)
return None
@@ -316,7 +314,8 @@ def _process_user_file_without_vector_db(
token_count: int | None = len(encode(combined_text))
except Exception:
task_logger.warning(
f"_process_user_file_without_vector_db - Failed to compute token count for {uf.id}, falling back to None"
f"_process_user_file_without_vector_db - "
f"Failed to compute token count for {uf.id}, falling back to None"
)
token_count = None
@@ -336,7 +335,8 @@ def _process_user_file_without_vector_db(
db_session.commit()
task_logger.info(
f"_process_user_file_without_vector_db - Completed id={uf.id} tokens={token_count}"
f"_process_user_file_without_vector_db - "
f"Completed id={uf.id} tokens={token_count}"
)
@@ -363,7 +363,8 @@ def _process_user_file_with_indexing(
)
if current_search_settings is None:
raise RuntimeError(
f"_process_user_file_with_indexing - No current search settings found for tenant={tenant_id}"
f"_process_user_file_with_indexing - "
f"No current search settings found for tenant={tenant_id}"
)
adapter = UserFileIndexingAdapter(
@@ -393,7 +394,8 @@ def _process_user_file_with_indexing(
)
task_logger.info(
f"_process_user_file_with_indexing - Indexing pipeline completed ={index_pipeline_result}"
f"_process_user_file_with_indexing - "
f"Indexing pipeline completed ={index_pipeline_result}"
)
if (
@@ -402,7 +404,8 @@ def _process_user_file_with_indexing(
or index_pipeline_result.total_chunks == 0
):
task_logger.error(
f"_process_user_file_with_indexing - Indexing pipeline failed id={user_file_id}"
f"_process_user_file_with_indexing - "
f"Indexing pipeline failed id={user_file_id}"
)
if uf.status != UserFileStatus.DELETING:
uf.status = UserFileStatus.FAILED
@@ -529,10 +532,7 @@ def process_user_file_impl(
ignore_result=True,
)
def process_single_user_file(
self: Task, # noqa: ARG001
*,
user_file_id: str,
tenant_id: str,
self: Task, *, user_file_id: str, tenant_id: str # noqa: ARG001
) -> None:
process_user_file_impl(
user_file_id=user_file_id, tenant_id=tenant_id, redis_locking=True
@@ -688,10 +688,7 @@ def delete_user_file_impl(
ignore_result=True,
)
def process_single_user_file_delete(
self: Task, # noqa: ARG001
*,
user_file_id: str,
tenant_id: str,
self: Task, *, user_file_id: str, tenant_id: str # noqa: ARG001
) -> None:
delete_user_file_impl(
user_file_id=user_file_id, tenant_id=tenant_id, redis_locking=True
@@ -761,7 +758,8 @@ def check_for_user_file_project_sync(self: Task, *, tenant_id: str) -> None:
lock.release()
task_logger.info(
f"Enqueued {enqueued} Skipped guard {skipped_guard} tasks for tenant={tenant_id}"
f"Enqueued {enqueued} "
f"Skipped guard {skipped_guard} tasks for tenant={tenant_id}"
)
return None
@@ -793,12 +791,11 @@ def project_sync_user_file_impl(
try:
with get_session_with_current_tenant() as db_session:
user_files = fetch_user_files_with_access_relationships(
[user_file_id],
db_session,
eager_load_groups=global_version.is_ee_version(),
)
user_file = user_files[0] if user_files else None
user_file = db_session.execute(
select(UserFile)
.where(UserFile.id == _as_uuid(user_file_id))
.options(selectinload(UserFile.assistants))
).scalar_one_or_none()
if not user_file:
task_logger.info(
f"project_sync_user_file_impl - User file not found id={user_file_id}"
@@ -826,21 +823,12 @@ def project_sync_user_file_impl(
project_ids = [project.id for project in user_file.projects]
persona_ids = [p.id for p in user_file.assistants if not p.deleted]
file_id_str = str(user_file.id)
access_map = build_access_for_user_files([user_file])
access = access_map.get(file_id_str)
for retry_document_index in retry_document_indices:
retry_document_index.update_single(
doc_id=file_id_str,
doc_id=str(user_file.id),
tenant_id=tenant_id,
chunk_count=user_file.chunk_count,
fields=(
VespaDocumentFields(access=access)
if access is not None
else None
),
fields=None,
user_fields=VespaDocumentUserFields(
user_projects=project_ids,
personas=persona_ids,
@@ -875,10 +863,7 @@ def project_sync_user_file_impl(
ignore_result=True,
)
def process_single_user_file_project_sync(
self: Task, # noqa: ARG001
*,
user_file_id: str,
tenant_id: str,
self: Task, *, user_file_id: str, tenant_id: str # noqa: ARG001
) -> None:
project_sync_user_file_impl(
user_file_id=user_file_id, tenant_id=tenant_id, redis_locking=True

View File

@@ -199,7 +199,8 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str) -> bool | None:
lock_beat.release()
else:
task_logger.error(
f"check_for_vespa_sync_task - Lock not owned on completion: tenant={tenant_id}"
"check_for_vespa_sync_task - Lock not owned on completion: "
f"tenant={tenant_id}"
)
redis_lock_dump(lock_beat, r)
@@ -265,7 +266,8 @@ def try_generate_document_set_sync_tasks(
# return 0
task_logger.info(
f"RedisDocumentSet.generate_tasks finished. document_set={document_set.id} tasks_generated={tasks_generated}"
f"RedisDocumentSet.generate_tasks finished. "
f"document_set={document_set.id} tasks_generated={tasks_generated}"
)
# create before setting fence to avoid race condition where the monitoring
@@ -340,7 +342,8 @@ def try_generate_user_group_sync_tasks(
# return 0
task_logger.info(
f"RedisUserGroup.generate_tasks finished. usergroup={usergroup.id} tasks_generated={tasks_generated}"
f"RedisUserGroup.generate_tasks finished. "
f"usergroup={usergroup.id} tasks_generated={tasks_generated}"
)
# create before setting fence to avoid race condition where the monitoring
@@ -395,7 +398,8 @@ def monitor_document_set_taskset(
count = cast(int, r.scard(rds.taskset_key))
task_logger.info(
f"Document set sync progress: document_set={document_set_id} remaining={count} initial={initial_count}"
f"Document set sync progress: document_set={document_set_id} "
f"remaining={count} initial={initial_count}"
)
if count > 0:
update_sync_record_status(
@@ -440,7 +444,9 @@ def monitor_document_set_taskset(
)
except Exception:
task_logger.exception(
f"update_sync_record_status exceptioned. document_set_id={document_set_id} Resetting document set regardless."
"update_sync_record_status exceptioned. "
f"document_set_id={document_set_id} "
"Resetting document set regardless."
)
rds.reset()
@@ -477,7 +483,9 @@ def vespa_metadata_sync_task(self: Task, document_id: str, *, tenant_id: str) ->
if not doc:
elapsed = time.monotonic() - start
task_logger.info(
f"doc={document_id} action=no_operation elapsed={elapsed:.2f}"
f"doc={document_id} "
f"action=no_operation "
f"elapsed={elapsed:.2f}"
)
completion_status = OnyxCeleryTaskCompletionStatus.SKIPPED
else:
@@ -516,7 +524,9 @@ def vespa_metadata_sync_task(self: Task, document_id: str, *, tenant_id: str) ->
mark_document_as_synced(document_id, db_session)
elapsed = time.monotonic() - start
task_logger.info(f"doc={document_id} action=sync elapsed={elapsed:.2f}")
task_logger.info(
f"doc={document_id} " f"action=sync " f"elapsed={elapsed:.2f}"
)
completion_status = OnyxCeleryTaskCompletionStatus.SUCCEEDED
except SoftTimeLimitExceeded:
task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")
@@ -539,7 +549,9 @@ def vespa_metadata_sync_task(self: Task, document_id: str, *, tenant_id: str) ->
if isinstance(e, httpx.HTTPStatusError):
if e.response.status_code == HTTPStatus.BAD_REQUEST:
task_logger.exception(
f"Non-retryable HTTPStatusError: doc={document_id} status={e.response.status_code}"
f"Non-retryable HTTPStatusError: "
f"doc={document_id} "
f"status={e.response.status_code}"
)
completion_status = (
OnyxCeleryTaskCompletionStatus.NON_RETRYABLE_EXCEPTION

View File

@@ -175,16 +175,14 @@ class SimpleJobClient:
del self.jobs[job.id]
def submit(
self,
func: Callable,
*args: Any,
pure: bool = True, # noqa: ARG002
self, func: Callable, *args: Any, pure: bool = True # noqa: ARG002
) -> SimpleJob | None:
"""NOTE: `pure` arg is needed so this can be a drop in replacement for Dask"""
self._cleanup_completed_jobs()
if len(self.jobs) >= self.n_workers:
logger.debug(
f"No available workers to run job. Currently running '{len(self.jobs)}' jobs, with a limit of '{self.n_workers}'."
f"No available workers to run job. "
f"Currently running '{len(self.jobs)}' jobs, with a limit of '{self.n_workers}'."
)
return None

View File

@@ -45,7 +45,6 @@ from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import IndexingStatus
from onyx.db.enums import IndexModelStatus
from onyx.db.enums import ProcessingMode
from onyx.db.hierarchy import upsert_hierarchy_node_cc_pair_entries
from onyx.db.hierarchy import upsert_hierarchy_nodes_batch
from onyx.db.index_attempt import create_index_attempt_error
from onyx.db.index_attempt import get_index_attempt
@@ -59,6 +58,8 @@ from onyx.file_store.document_batch_storage import DocumentBatchStorage
from onyx.file_store.document_batch_storage import get_document_batch_storage
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
from onyx.indexing.postgres_sanitization import sanitize_document_for_postgres
from onyx.indexing.postgres_sanitization import sanitize_hierarchy_nodes_for_postgres
from onyx.redis.redis_hierarchy import cache_hierarchy_nodes_batch
from onyx.redis.redis_hierarchy import ensure_source_node_exists
from onyx.redis.redis_hierarchy import get_node_id_from_raw_id
@@ -70,8 +71,6 @@ from onyx.server.features.build.indexing.persistent_document_writer import (
)
from onyx.utils.logger import setup_logger
from onyx.utils.middleware import make_randomized_onyx_request_id
from onyx.utils.postgres_sanitization import sanitize_document_for_postgres
from onyx.utils.postgres_sanitization import sanitize_hierarchy_nodes_for_postgres
from onyx.utils.variable_functionality import global_version
from shared_configs.configs import MULTI_TENANT
from shared_configs.contextvars import INDEX_ATTEMPT_INFO_CONTEXTVAR
@@ -226,13 +225,15 @@ def _check_failure_threshold(
FAILURE_RATIO_THRESHOLD = 0.1
if total_failures > FAILURE_THRESHOLD and failure_ratio > FAILURE_RATIO_THRESHOLD:
logger.error(
f"Connector run failed with '{total_failures}' errors after '{batch_num}' batches."
f"Connector run failed with '{total_failures}' errors "
f"after '{batch_num}' batches."
)
if last_failure and last_failure.exception:
raise last_failure.exception from last_failure.exception
raise RuntimeError(
f"Connector run encountered too many errors, aborting. Last error: {last_failure}"
f"Connector run encountered too many errors, aborting. "
f"Last error: {last_failure}"
)
@@ -586,14 +587,6 @@ def connector_document_extraction(
is_connector_public=is_connector_public,
)
upsert_hierarchy_node_cc_pair_entries(
db_session=db_session,
hierarchy_node_ids=[n.id for n in upserted_nodes],
connector_id=db_connector.id,
credential_id=db_credential.id,
commit=True,
)
# Cache in Redis for fast ancestor resolution during doc processing
redis_client = get_redis_client(tenant_id=tenant_id)
cache_entries = [
@@ -607,7 +600,8 @@ def connector_document_extraction(
)
logger.debug(
f"Persisted and cached {len(hierarchy_node_batch_cleaned)} hierarchy nodes for attempt={index_attempt_id}"
f"Persisted and cached {len(hierarchy_node_batch_cleaned)} hierarchy nodes "
f"for attempt={index_attempt_id}"
)
# below is all document processing task, so if no batch we can just continue
@@ -809,12 +803,15 @@ def connector_document_extraction(
queue=OnyxCeleryQueues.SANDBOX,
)
logger.info(
f"Triggered sandbox file sync for user {creator_id} source={source_value} after indexing complete"
f"Triggered sandbox file sync for user {creator_id} "
f"source={source_value} after indexing complete"
)
except Exception as e:
logger.exception(
f"Document extraction failed: attempt={index_attempt_id} error={str(e)}"
f"Document extraction failed: "
f"attempt={index_attempt_id} "
f"error={str(e)}"
)
# Do NOT clean up batches on failure; future runs will use those batches
@@ -950,6 +947,7 @@ def reissue_old_batches(
# is still in the filestore waiting for processing or not.
last_batch_num = len(old_batches) + recent_batches
logger.info(
f"Starting from batch {last_batch_num} due to re-issued batches: {old_batches}, completed batches: {recent_batches}"
f"Starting from batch {last_batch_num} due to "
f"re-issued batches: {old_batches}, completed batches: {recent_batches}"
)
return len(old_batches), recent_batches

Some files were not shown because too many files have changed in this diff Show More