Compare commits

..

12 Commits

Author SHA1 Message Date
Nik
c8e565fa75 fix(chat): fix B1/B2/P1 bugs in multi-model streaming + cleanup
B1 — Self-completion race: model finishes before GeneratorExit fires,
exits _run_model with drain_done=False, skips self-completion. Fix:
add completion_locks (one per model); disconnect else-branch claims
lock and calls llm_loop_completion_handle for already-succeeded models.

B2 — Stop-button saves wrong message for errored models: the stop loop
called llm_loop_completion_handle for all models including ones that
threw exceptions, persisting "stopped by user" for an errored model.
Fix: add model_errored flag; stop loop skips errored models.

P1 — Orphaned ChatMessage rows for errored models: reserved_messages
were never cleaned up when a model errored. Fix: delete via
db_session.get(ChatMessage, id) in all three exit paths (normal
completion, stop-button, disconnect).

Also: extract repeated orphan-cleanup into _delete_orphaned_message
nested helper; remove dead check_call_count variable in tests; rename
ctx→worker_context, _completion_done→completion_persisted; replace
functools.partial with captured-variable lambda; fix stale docstring
("bounded"→"unbounded"); add _CANCEL_POLL_INTERVAL_S named constant;
if/if/if→if/elif/elif; %-style logger calls throughout.

Tests: two new regression tests (B1 race, B2 stop-button errored model).
26 tests pass. mypy clean.
2026-04-01 08:17:50 -07:00
Nik
bab95d8bf0 fix(chat): remove duplicate drain_done declaration after rebase 2026-03-31 20:02:29 -07:00
Nik
eb7bc74e1b fix(chat): persist LLM response on HTTP disconnect via drain_done + worker self-completion
When the HTTP client disconnects, Starlette throws GeneratorExit into the
drain loop generator. The old code called executor.shutdown(wait=False) with
no completion handling, leaving the assistant DB message as the TERMINATED
placeholder forever (regressing test_send_message_disconnect_and_cleanup).

New design:
- drain_done (threading.Event) signals emitters to return immediately instead
  of blocking on queue.put — no retry loops, no daemon threads
- One-time queue drain in the else block releases any in-progress puts so
  workers exit within milliseconds
- Workers self-complete: after run_llm_loop returns, each worker checks
  drain_done.is_set() and, if true, opens its own DB session and calls
  llm_loop_completion_handle directly

Unit test updated to reflect the async self-completion semantics: the test
blocks the worker inside run_llm_loop until gen.close() sets drain_done,
then waits for completion_called inside the patch context (while mocks are
still active) to avoid calling the real get_session_with_current_tenant.
2026-03-31 20:02:29 -07:00
Nik
29da0aefb5 feat(chat): add multi-model parallel streaming (N=2-3 LLMs side-by-side)
Adds support for running 2-3 LLMs in parallel within a single chat turn,
with responses streamed interleaved to the frontend via the merged queue
infrastructure introduced in the preceding PR.

Backend changes
- process_message.py: restore llm_overrides param on build_chat_turn and
  _stream_chat_turn; restore is_multi branching for LLM setup, context
  window sizing, and message ID reservation; add _build_model_display_name
  and handle_multi_model_stream (public multi-model entrypoint)
- db/chat.py: add reserve_multi_model_message_ids (reserves N assistant
  message placeholders sharing the same parent), set_preferred_response
  (marks one response as the user's preferred), and extend
  translate_db_message_to_chat_message_detail with preferred_response_id
  and model_display_name fields
- chat_backend.py: route requests with llm_overrides >1 through
  handle_multi_model_stream; reject non-streaming multi-model requests with
  OnyxError; add /set-preferred-response endpoint

Tests
- test_multi_model_streaming.py: unit tests for _run_models drain loop
  (arrival-order yield, error isolation, cancellation), handle_multi_model_stream
  validation guards, and N=1 backwards-compatibility
2026-03-31 20:01:21 -07:00
Nik
6c86301c51 fix(chat): remove bounded queue and packet drops — match old behavior
Old code used queue.Queue() (unbounded, blocking put). New code introduced
queue.Queue(maxsize=100) + put(timeout=3.0) + silent drop on queue.Full —
a regression in all three callsites:

- Emitter.emit(): data packets silently dropped on queue full
- _run_model exception path: model errors silently lost
- _run_model finally (_MODEL_DONE): if dropped, drain loop hangs forever
  (models_remaining never reaches 0)

Fix: remove maxsize, remove all timeout= arguments, remove all
except queue.Full handlers. The drain_done early-return in emit() is the
correct disconnect mechanism; queue backpressure is not needed.

Also adds _completion_done: bool type annotation and fixes the queue drain
comment (no longer unblocking timed-out puts — just releasing memory).
2026-03-31 20:00:46 -07:00
Nik
631146f48f fix(chat): use model_succeeded instead of check_is_connected on self-completion
On HTTP disconnect, check_is_connected() returns False, causing
llm_loop_completion_handle to treat a completed response as
user-cancelled and append "Generation was stopped by the user."
Use lambda: model_succeeded[model_idx] (always True here) instead,
matching the cancellation path's functools.partial(bool, model_succeeded[i]).
2026-03-31 18:42:04 -07:00
Nik
f327278506 fix(chat): persist LLM response on HTTP disconnect via drain_done + worker self-completion
When the HTTP client disconnects, Starlette throws GeneratorExit into the
drain loop generator. The old else branch just called executor.shutdown(wait=False)
with no completion handling, leaving the assistant DB message as the TERMINATED
placeholder forever (regressing test_send_message_disconnect_and_cleanup).

New design:
- drain_done (threading.Event) signals emitters to return immediately instead
  of blocking on queue.put — no retry loops, no daemon threads
- One-time queue drain in the else block releases any in-progress puts so
  workers exit within milliseconds
- Workers self-complete: after run_llm_loop returns, each worker checks
  drain_done.is_set() and, if true, opens its own DB session and calls
  llm_loop_completion_handle directly
2026-03-31 18:14:50 -07:00
Nik
c7cc439862 fix(emitter): address Greptile P1/P2/P3 and Queue typing
- P1: executor.shutdown(wait=False) on early exit — don't block the
  server thread waiting for LLM workers; they will hit queue.Full
  timeouts and exit on their own (matches old run_chat_loop behavior)
- P2: wrap db_session.commit() in try/finally in build_chat_turn —
  reset processing status before propagating if commit fails, so the
  chat session isn't stuck at "processing" permanently
- P3: fix inaccurate comment "All worker threads have exited" — workers
  may still be closing their own DB sessions at that point; clarify
  that only the main-thread db_session is safe to use
- Queue[Any] → Queue[tuple[int, Packet | Exception | object]] in Emitter
2026-03-31 17:02:46 -07:00
Nik
3365a369e2 fix(review): address Greptile comments
- Add owner to bare TODO comment
- Restore placement field assertions weakened by Emitter refactor
2026-03-31 12:49:09 -07:00
Nik
470bda3fb5 refactor(chat): elegance pass on PR1 changed files
process_message.py:
- Fix `skip_clarification` field in ChatTurnSetup: inline comment inside
  the type annotation → separate `#` comment on the line above the field
- Flatten `model_tools` via list comprehension instead of manual extend loop
- `forced_tool_id` membership test: list → set comprehension (O(1) lookup)
- Trim `_run_model` inner-function docstring — private closure doesn't need
  10-line Args block
- Remove redundant inline param comments from `_stream_chat_turn` and
  `handle_stream_message_objects` where the docstring Args section already
  documents them
- Strip duplicate Args/Returns from `handle_stream_message_objects` docstring
  — it delegates entirely to `_stream_chat_turn`

emitter.py:
- Widen `merged_queue` annotation to `Queue[Any]`: Queue is invariant so
  `Queue[tuple[int, Packet]]` can't be passed a `Queue[tuple[int, Packet |
  Exception | object]]`; the emitter is a write-only producer and doesn't
  care what else lives on the queue
2026-03-31 12:16:38 -07:00
Nik
13f511e209 refactor(emitter): clean up string annotation and use model_copy
- Fix `"Queue"` forward-reference annotation → `Queue[tuple[int, Packet]]`
  (Queue is already imported, the string was unnecessary)
- Replace manual Placement field copy with `base.model_copy(update={...})`
- Remove redundant `key` variable (was just `self._model_idx`)
- Tighten docstring
2026-03-31 11:44:28 -07:00
Nik
c5e8ba1eab refactor(chat): replace bus-polling emitter with merged-queue streaming; fix 429 hang
Switch Emitter from a per-model event bus + polling thread to a single
bounded queue shared across all models.  Each emit() call puts directly onto
the queue; the drain loop in _run_models yields packets in arrival order.

Key changes
- emitter.py: remove Bus, get_default_emitter(); add Emitter(merged_queue, model_idx)
- chat_state.py: remove run_chat_loop_with_state_containers (113-line bus-poll loop)
- process_message.py: add ChatTurnSetup dataclass and build_chat_turn(); rewrite
  _stream_chat_turn + _run_models around the merged queue; single-model (N=1)
  path is fully backwards-compatible
- placement.py, override_models.py: add docstrings; LLMOverride gains display_name
- research_agent.py, custom_tool.py: update Emitter call sites
- test_emitter.py: new unit tests for queue routing, model_index tagging, placement

Frontend 429 fix
- lib.tsx: parse response body for human-readable detail on non-2xx responses
  instead of "HTTP error! status: 429"
- useChatController.ts: surface stack.error after the FIFO drain loop exits so
  the catch block replaces the thinking placeholder with an error message
2026-03-30 22:18:48 -07:00
308 changed files with 7807 additions and 12761 deletions

View File

@@ -704,9 +704,6 @@ jobs:
NEXT_PUBLIC_FORGOT_PASSWORD_ENABLED=true
NEXT_PUBLIC_INCLUDE_ERROR_POPUP_SUPPORT_LINK=true
NODE_OPTIONS=--max-old-space-size=8192
SENTRY_RELEASE=${{ github.sha }}
secrets: |
sentry_auth_token=${{ secrets.SENTRY_AUTH_TOKEN }}
cache-from: |
type=registry,ref=${{ env.RUNS_ON_ECR_CACHE }}:cloudweb-cache-amd64
type=registry,ref=${{ env.REGISTRY_IMAGE }}:latest
@@ -789,9 +786,6 @@ jobs:
NEXT_PUBLIC_FORGOT_PASSWORD_ENABLED=true
NEXT_PUBLIC_INCLUDE_ERROR_POPUP_SUPPORT_LINK=true
NODE_OPTIONS=--max-old-space-size=8192
SENTRY_RELEASE=${{ github.sha }}
secrets: |
sentry_auth_token=${{ secrets.SENTRY_AUTH_TOKEN }}
cache-from: |
type=registry,ref=${{ env.RUNS_ON_ECR_CACHE }}:cloudweb-cache-arm64
type=registry,ref=${{ env.REGISTRY_IMAGE }}:latest
@@ -1509,105 +1503,232 @@ jobs:
$(printf '%s\n' "${META_TAGS}" | xargs -I {} echo -t {}) \
$IMAGES
trivy-scan:
trivy-scan-web:
needs:
- determine-builds
- merge-web
- merge-web-cloud
- merge-backend
- merge-model-server
if: >-
always() && !cancelled() &&
(needs.merge-web.result == 'success' ||
needs.merge-web-cloud.result == 'success' ||
needs.merge-backend.result == 'success' ||
needs.merge-model-server.result == 'success')
if: needs.merge-web.result == 'success'
runs-on:
- runs-on
- runner=2cpu-linux-arm64
- run-id=${{ github.run_id }}-trivy-scan-${{ matrix.component }}
- run-id=${{ github.run_id }}-trivy-scan-web
- extras=ecr-cache
permissions:
security-events: write # needed for SARIF uploads
timeout-minutes: 10
strategy:
fail-fast: false
matrix:
include:
- component: web
registry-image: onyxdotapp/onyx-web-server
- component: web-cloud
registry-image: onyxdotapp/onyx-web-server-cloud
- component: backend
registry-image: ${{ contains(github.ref_name, 'cloud') && 'onyxdotapp/onyx-backend-cloud' || 'onyxdotapp/onyx-backend' }}
trivyignore: backend/.trivyignore
- component: model-server
registry-image: ${{ contains(github.ref_name, 'cloud') && 'onyxdotapp/onyx-model-server-cloud' || 'onyxdotapp/onyx-model-server' }}
timeout-minutes: 90
environment: release
env:
REGISTRY_IMAGE: onyxdotapp/onyx-web-server
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
- name: Get AWS Secrets
uses: aws-actions/aws-secretsmanager-get-secrets@a9a7eb4e2f2871d30dc5b892576fde60a2ecc802
with:
secret-ids: |
DOCKER_USERNAME, deploy/docker-username
DOCKER_TOKEN, deploy/docker-token
parse-json-secrets: true
- name: Run Trivy vulnerability scanner
uses: nick-fields/retry@ce71cc2ab81d554ebbe88c79ab5975992d79ba08 # ratchet:nick-fields/retry@v3
with:
timeout_minutes: 30
max_attempts: 3
retry_wait_seconds: 10
command: |
if [ "${{ needs.determine-builds.outputs.is-test-run }}" == "true" ]; then
SCAN_IMAGE="${{ env.RUNS_ON_ECR_CACHE }}:web-${{ needs.determine-builds.outputs.sanitized-tag }}"
else
SCAN_IMAGE="docker.io/${{ env.REGISTRY_IMAGE }}:${{ github.ref_name }}"
fi
docker run --rm -v $HOME/.cache/trivy:/root/.cache/trivy \
-e TRIVY_DB_REPOSITORY="public.ecr.aws/aquasecurity/trivy-db:2" \
-e TRIVY_JAVA_DB_REPOSITORY="public.ecr.aws/aquasecurity/trivy-java-db:1" \
-e TRIVY_USERNAME="${{ env.DOCKER_USERNAME }}" \
-e TRIVY_PASSWORD="${{ env.DOCKER_TOKEN }}" \
aquasec/trivy@sha256:a22415a38938a56c379387a8163fcb0ce38b10ace73e593475d3658d578b2436 \
image \
--skip-version-check \
--timeout 20m \
--severity CRITICAL,HIGH \
${SCAN_IMAGE}
trivy-scan-web-cloud:
needs:
- determine-builds
- merge-web-cloud
if: needs.merge-web-cloud.result == 'success'
runs-on:
- runs-on
- runner=2cpu-linux-arm64
- run-id=${{ github.run_id }}-trivy-scan-web-cloud
- extras=ecr-cache
timeout-minutes: 90
environment: release
env:
REGISTRY_IMAGE: onyxdotapp/onyx-web-server-cloud
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
- name: Get AWS Secrets
uses: aws-actions/aws-secretsmanager-get-secrets@a9a7eb4e2f2871d30dc5b892576fde60a2ecc802
with:
secret-ids: |
DOCKER_USERNAME, deploy/docker-username
DOCKER_TOKEN, deploy/docker-token
parse-json-secrets: true
- name: Run Trivy vulnerability scanner
uses: nick-fields/retry@ce71cc2ab81d554ebbe88c79ab5975992d79ba08 # ratchet:nick-fields/retry@v3
with:
timeout_minutes: 30
max_attempts: 3
retry_wait_seconds: 10
command: |
if [ "${{ needs.determine-builds.outputs.is-test-run }}" == "true" ]; then
SCAN_IMAGE="${{ env.RUNS_ON_ECR_CACHE }}:web-cloud-${{ needs.determine-builds.outputs.sanitized-tag }}"
else
SCAN_IMAGE="docker.io/${{ env.REGISTRY_IMAGE }}:${{ github.ref_name }}"
fi
docker run --rm -v $HOME/.cache/trivy:/root/.cache/trivy \
-e TRIVY_DB_REPOSITORY="public.ecr.aws/aquasecurity/trivy-db:2" \
-e TRIVY_JAVA_DB_REPOSITORY="public.ecr.aws/aquasecurity/trivy-java-db:1" \
-e TRIVY_USERNAME="${{ env.DOCKER_USERNAME }}" \
-e TRIVY_PASSWORD="${{ env.DOCKER_TOKEN }}" \
aquasec/trivy@sha256:a22415a38938a56c379387a8163fcb0ce38b10ace73e593475d3658d578b2436 \
image \
--skip-version-check \
--timeout 20m \
--severity CRITICAL,HIGH \
${SCAN_IMAGE}
trivy-scan-backend:
needs:
- determine-builds
- merge-backend
if: needs.merge-backend.result == 'success'
runs-on:
- runs-on
- runner=2cpu-linux-arm64
- run-id=${{ github.run_id }}-trivy-scan-backend
- extras=ecr-cache
timeout-minutes: 90
environment: release
env:
REGISTRY_IMAGE: ${{ contains(github.ref_name, 'cloud') && 'onyxdotapp/onyx-backend-cloud' || 'onyxdotapp/onyx-backend' }}
steps:
- name: Check if this scan should run
id: should-run
run: |
case "$COMPONENT" in
web) RESULT="$MERGE_WEB" ;;
web-cloud) RESULT="$MERGE_WEB_CLOUD" ;;
backend) RESULT="$MERGE_BACKEND" ;;
model-server) RESULT="$MERGE_MODEL_SERVER" ;;
esac
if [ "$RESULT" == "success" ]; then
echo "run=true" >> "$GITHUB_OUTPUT"
else
echo "run=false" >> "$GITHUB_OUTPUT"
fi
env:
COMPONENT: ${{ matrix.component }}
MERGE_WEB: ${{ needs.merge-web.result }}
MERGE_WEB_CLOUD: ${{ needs.merge-web-cloud.result }}
MERGE_BACKEND: ${{ needs.merge-backend.result }}
MERGE_MODEL_SERVER: ${{ needs.merge-model-server.result }}
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
if: steps.should-run.outputs.run == 'true'
- name: Checkout
if: steps.should-run.outputs.run == 'true' && matrix.trivyignore != ''
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- name: Determine scan image
if: steps.should-run.outputs.run == 'true'
id: scan-image
run: |
if [ "$IS_TEST_RUN" == "true" ]; then
echo "image=${RUNS_ON_ECR_CACHE}:${TAG_PREFIX}-${SANITIZED_TAG}" >> "$GITHUB_OUTPUT"
else
echo "image=docker.io/${REGISTRY_IMAGE}:${REF_NAME}" >> "$GITHUB_OUTPUT"
fi
env:
IS_TEST_RUN: ${{ needs.determine-builds.outputs.is-test-run }}
TAG_PREFIX: ${{ matrix.component }}
SANITIZED_TAG: ${{ needs.determine-builds.outputs.sanitized-tag }}
REGISTRY_IMAGE: ${{ matrix.registry-image }}
REF_NAME: ${{ github.ref_name }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
- name: Get AWS Secrets
uses: aws-actions/aws-secretsmanager-get-secrets@a9a7eb4e2f2871d30dc5b892576fde60a2ecc802
with:
secret-ids: |
DOCKER_USERNAME, deploy/docker-username
DOCKER_TOKEN, deploy/docker-token
parse-json-secrets: true
- name: Run Trivy vulnerability scanner
if: steps.should-run.outputs.run == 'true'
uses: aquasecurity/trivy-action@57a97c7e7821a5776cebc9bb87c984fa69cba8f1 # ratchet:aquasecurity/trivy-action@v0.35.0
uses: nick-fields/retry@ce71cc2ab81d554ebbe88c79ab5975992d79ba08 # ratchet:nick-fields/retry@v3
with:
image-ref: ${{ steps.scan-image.outputs.image }}
severity: CRITICAL,HIGH
format: "sarif"
output: "trivy-results.sarif"
trivyignores: ${{ matrix.trivyignore }}
env:
TRIVY_USERNAME: ${{ secrets.DOCKER_USERNAME }}
TRIVY_PASSWORD: ${{ secrets.DOCKER_TOKEN }}
timeout_minutes: 30
max_attempts: 3
retry_wait_seconds: 10
command: |
if [ "${{ needs.determine-builds.outputs.is-test-run }}" == "true" ]; then
SCAN_IMAGE="${{ env.RUNS_ON_ECR_CACHE }}:backend-${{ needs.determine-builds.outputs.sanitized-tag }}"
else
SCAN_IMAGE="docker.io/${{ env.REGISTRY_IMAGE }}:${{ github.ref_name }}"
fi
docker run --rm -v $HOME/.cache/trivy:/root/.cache/trivy \
-v ${{ github.workspace }}/backend/.trivyignore:/tmp/.trivyignore:ro \
-e TRIVY_DB_REPOSITORY="public.ecr.aws/aquasecurity/trivy-db:2" \
-e TRIVY_JAVA_DB_REPOSITORY="public.ecr.aws/aquasecurity/trivy-java-db:1" \
-e TRIVY_USERNAME="${{ env.DOCKER_USERNAME }}" \
-e TRIVY_PASSWORD="${{ env.DOCKER_TOKEN }}" \
aquasec/trivy@sha256:a22415a38938a56c379387a8163fcb0ce38b10ace73e593475d3658d578b2436 \
image \
--skip-version-check \
--timeout 20m \
--severity CRITICAL,HIGH \
--ignorefile /tmp/.trivyignore \
${SCAN_IMAGE}
- name: Upload Trivy scan results to GitHub Security tab
if: steps.should-run.outputs.run == 'true'
uses: github/codeql-action/upload-sarif@ba454b8ab46733eb6145342877cd148270bb77ab
trivy-scan-model-server:
needs:
- determine-builds
- merge-model-server
if: needs.merge-model-server.result == 'success'
runs-on:
- runs-on
- runner=2cpu-linux-arm64
- run-id=${{ github.run_id }}-trivy-scan-model-server
- extras=ecr-cache
timeout-minutes: 90
environment: release
env:
REGISTRY_IMAGE: ${{ contains(github.ref_name, 'cloud') && 'onyxdotapp/onyx-model-server-cloud' || 'onyxdotapp/onyx-model-server' }}
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7
with:
sarif_file: "trivy-results.sarif"
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
- name: Get AWS Secrets
uses: aws-actions/aws-secretsmanager-get-secrets@a9a7eb4e2f2871d30dc5b892576fde60a2ecc802
with:
secret-ids: |
DOCKER_USERNAME, deploy/docker-username
DOCKER_TOKEN, deploy/docker-token
parse-json-secrets: true
- name: Run Trivy vulnerability scanner
uses: nick-fields/retry@ce71cc2ab81d554ebbe88c79ab5975992d79ba08 # ratchet:nick-fields/retry@v3
with:
timeout_minutes: 30
max_attempts: 3
retry_wait_seconds: 10
command: |
if [ "${{ needs.determine-builds.outputs.is-test-run }}" == "true" ]; then
SCAN_IMAGE="${{ env.RUNS_ON_ECR_CACHE }}:model-server-${{ needs.determine-builds.outputs.sanitized-tag }}"
else
SCAN_IMAGE="docker.io/${{ env.REGISTRY_IMAGE }}:${{ github.ref_name }}"
fi
docker run --rm -v $HOME/.cache/trivy:/root/.cache/trivy \
-e TRIVY_DB_REPOSITORY="public.ecr.aws/aquasecurity/trivy-db:2" \
-e TRIVY_JAVA_DB_REPOSITORY="public.ecr.aws/aquasecurity/trivy-java-db:1" \
-e TRIVY_USERNAME="${{ env.DOCKER_USERNAME }}" \
-e TRIVY_PASSWORD="${{ env.DOCKER_TOKEN }}" \
aquasec/trivy@sha256:a22415a38938a56c379387a8163fcb0ce38b10ace73e593475d3658d578b2436 \
image \
--skip-version-check \
--timeout 20m \
--severity CRITICAL,HIGH \
${SCAN_IMAGE}
notify-slack-on-failure:
needs:

View File

@@ -41,7 +41,7 @@ jobs:
version: v3.19.0
- name: Set up chart-testing
uses: helm/chart-testing-action@2e2940618cb426dce2999631d543b53cdcfc8527
uses: helm/chart-testing-action@b5eebdd9998021f29756c53432f48dab66394810
with:
uv_version: "0.9.9"

View File

@@ -22,40 +22,132 @@ on:
- cron: "0 16 * * *"
permissions:
id-token: write # Required for OIDC-based AWS credential exchange
contents: read
env:
PYTHONPATH: ./backend
DISABLE_TELEMETRY: "true"
# AWS
AWS_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS: ${{ secrets.AWS_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS }}
AWS_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS: ${{ secrets.AWS_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS }}
# Cloudflare R2
R2_ACCOUNT_ID_DAILY_CONNECTOR_TESTS: ${{ vars.R2_ACCOUNT_ID_DAILY_CONNECTOR_TESTS }}
R2_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS: ${{ secrets.R2_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS }}
R2_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS: ${{ secrets.R2_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS }}
# Google Cloud Storage
GCS_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS: ${{ secrets.GCS_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS }}
GCS_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS: ${{ secrets.GCS_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS }}
# Confluence
CONFLUENCE_TEST_SPACE_URL: ${{ vars.CONFLUENCE_TEST_SPACE_URL }}
CONFLUENCE_TEST_SPACE: ${{ vars.CONFLUENCE_TEST_SPACE }}
CONFLUENCE_TEST_PAGE_ID: ${{ secrets.CONFLUENCE_TEST_PAGE_ID }}
CONFLUENCE_USER_NAME: ${{ vars.CONFLUENCE_USER_NAME }}
CONFLUENCE_ACCESS_TOKEN: ${{ secrets.CONFLUENCE_ACCESS_TOKEN }}
CONFLUENCE_ACCESS_TOKEN_SCOPED: ${{ secrets.CONFLUENCE_ACCESS_TOKEN_SCOPED }}
# Jira
JIRA_BASE_URL: ${{ secrets.JIRA_BASE_URL }}
JIRA_USER_EMAIL: ${{ secrets.JIRA_USER_EMAIL }}
JIRA_API_TOKEN: ${{ secrets.JIRA_API_TOKEN }}
JIRA_API_TOKEN_SCOPED: ${{ secrets.JIRA_API_TOKEN_SCOPED }}
# Gong
GONG_ACCESS_KEY: ${{ secrets.GONG_ACCESS_KEY }}
GONG_ACCESS_KEY_SECRET: ${{ secrets.GONG_ACCESS_KEY_SECRET }}
# Google
GOOGLE_DRIVE_SERVICE_ACCOUNT_JSON_STR: ${{ secrets.GOOGLE_DRIVE_SERVICE_ACCOUNT_JSON_STR }}
GOOGLE_DRIVE_OAUTH_CREDENTIALS_JSON_STR_TEST_USER_1: ${{ secrets.GOOGLE_DRIVE_OAUTH_CREDENTIALS_JSON_STR_TEST_USER_1 }}
GOOGLE_DRIVE_OAUTH_CREDENTIALS_JSON_STR: ${{ secrets.GOOGLE_DRIVE_OAUTH_CREDENTIALS_JSON_STR }}
GOOGLE_GMAIL_SERVICE_ACCOUNT_JSON_STR: ${{ secrets.GOOGLE_GMAIL_SERVICE_ACCOUNT_JSON_STR }}
GOOGLE_GMAIL_OAUTH_CREDENTIALS_JSON_STR: ${{ secrets.GOOGLE_GMAIL_OAUTH_CREDENTIALS_JSON_STR }}
# Slab
SLAB_BOT_TOKEN: ${{ secrets.SLAB_BOT_TOKEN }}
# Zendesk
ZENDESK_SUBDOMAIN: ${{ secrets.ZENDESK_SUBDOMAIN }}
ZENDESK_EMAIL: ${{ secrets.ZENDESK_EMAIL }}
ZENDESK_TOKEN: ${{ secrets.ZENDESK_TOKEN }}
# Salesforce
SF_USERNAME: ${{ vars.SF_USERNAME }}
SF_PASSWORD: ${{ secrets.SF_PASSWORD }}
SF_SECURITY_TOKEN: ${{ secrets.SF_SECURITY_TOKEN }}
# Hubspot
HUBSPOT_ACCESS_TOKEN: ${{ secrets.HUBSPOT_ACCESS_TOKEN }}
# IMAP
IMAP_HOST: ${{ vars.IMAP_HOST }}
IMAP_USERNAME: ${{ vars.IMAP_USERNAME }}
IMAP_PASSWORD: ${{ secrets.IMAP_PASSWORD }}
IMAP_MAILBOXES: ${{ vars.IMAP_MAILBOXES }}
# Airtable
AIRTABLE_TEST_BASE_ID: ${{ vars.AIRTABLE_TEST_BASE_ID }}
AIRTABLE_TEST_TABLE_ID: ${{ vars.AIRTABLE_TEST_TABLE_ID }}
AIRTABLE_TEST_TABLE_NAME: ${{ vars.AIRTABLE_TEST_TABLE_NAME }}
AIRTABLE_ACCESS_TOKEN: ${{ secrets.AIRTABLE_ACCESS_TOKEN }}
# Sharepoint
SHAREPOINT_CLIENT_ID: ${{ vars.SHAREPOINT_CLIENT_ID }}
SHAREPOINT_CLIENT_SECRET: ${{ secrets.SHAREPOINT_CLIENT_SECRET }}
SHAREPOINT_CLIENT_DIRECTORY_ID: ${{ vars.SHAREPOINT_CLIENT_DIRECTORY_ID }}
SHAREPOINT_SITE: ${{ vars.SHAREPOINT_SITE }}
PERM_SYNC_SHAREPOINT_CLIENT_ID: ${{ secrets.PERM_SYNC_SHAREPOINT_CLIENT_ID }}
PERM_SYNC_SHAREPOINT_PRIVATE_KEY: ${{ secrets.PERM_SYNC_SHAREPOINT_PRIVATE_KEY }}
PERM_SYNC_SHAREPOINT_CERTIFICATE_PASSWORD: ${{ secrets.PERM_SYNC_SHAREPOINT_CERTIFICATE_PASSWORD }}
PERM_SYNC_SHAREPOINT_DIRECTORY_ID: ${{ secrets.PERM_SYNC_SHAREPOINT_DIRECTORY_ID }}
# Github
ACCESS_TOKEN_GITHUB: ${{ secrets.ACCESS_TOKEN_GITHUB }}
# Gitlab
GITLAB_ACCESS_TOKEN: ${{ secrets.GITLAB_ACCESS_TOKEN }}
# Gitbook
GITBOOK_SPACE_ID: ${{ secrets.GITBOOK_SPACE_ID }}
GITBOOK_API_KEY: ${{ secrets.GITBOOK_API_KEY }}
# Notion
NOTION_INTEGRATION_TOKEN: ${{ secrets.NOTION_INTEGRATION_TOKEN }}
# Highspot
HIGHSPOT_KEY: ${{ secrets.HIGHSPOT_KEY }}
HIGHSPOT_SECRET: ${{ secrets.HIGHSPOT_SECRET }}
# Slack
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
# Discord
DISCORD_CONNECTOR_BOT_TOKEN: ${{ secrets.DISCORD_CONNECTOR_BOT_TOKEN }}
# Teams
TEAMS_APPLICATION_ID: ${{ secrets.TEAMS_APPLICATION_ID }}
TEAMS_DIRECTORY_ID: ${{ secrets.TEAMS_DIRECTORY_ID }}
TEAMS_SECRET: ${{ secrets.TEAMS_SECRET }}
# Bitbucket
BITBUCKET_WORKSPACE: ${{ secrets.BITBUCKET_WORKSPACE }}
BITBUCKET_REPOSITORIES: ${{ secrets.BITBUCKET_REPOSITORIES }}
BITBUCKET_PROJECTS: ${{ secrets.BITBUCKET_PROJECTS }}
BITBUCKET_EMAIL: ${{ vars.BITBUCKET_EMAIL }}
BITBUCKET_API_TOKEN: ${{ secrets.BITBUCKET_API_TOKEN }}
# Fireflies
FIREFLIES_API_KEY: ${{ secrets.FIREFLIES_API_KEY }}
jobs:
connectors-check:
# See https://runs-on.com/runners/linux/
runs-on:
[
runs-on,
runner=8cpu-linux-x64,
"run-id=${{ github.run_id }}-connectors-check",
"extras=s3-cache",
]
runs-on: [runs-on, runner=8cpu-linux-x64, "run-id=${{ github.run_id }}-connectors-check", "extras=s3-cache"]
timeout-minutes: 45
environment: ci-protected
env:
PYTHONPATH: ./backend
DISABLE_TELEMETRY: "true"
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
@@ -96,66 +188,6 @@ jobs:
- 'backend/onyx/file_processing/**'
- 'uv.lock'
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7 # ratchet:aws-actions/configure-aws-credentials@v4
with:
role-to-assume: ${{ secrets.AWS_OIDC_ROLE_ARN }}
aws-region: us-east-2
- name: Get connector test secrets from AWS Secrets Manager
uses: aws-actions/aws-secretsmanager-get-secrets@a9a7eb4e2f2871d30dc5b892576fde60a2ecc802 # ratchet:aws-actions/aws-secretsmanager-get-secrets@v2
with:
parse-json-secrets: false
secret-ids: |
AWS_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS, test/aws-access-key-id
AWS_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS, test/aws-secret-access-key
R2_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS, test/r2-access-key-id
R2_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS, test/r2-secret-access-key
GCS_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS, test/gcs-access-key-id
GCS_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS, test/gcs-secret-access-key
CONFLUENCE_ACCESS_TOKEN, test/confluence-access-token
CONFLUENCE_ACCESS_TOKEN_SCOPED, test/confluence-access-token-scoped
JIRA_BASE_URL, test/jira-base-url
JIRA_USER_EMAIL, test/jira-user-email
JIRA_API_TOKEN, test/jira-api-token
JIRA_API_TOKEN_SCOPED, test/jira-api-token-scoped
GONG_ACCESS_KEY, test/gong-access-key
GONG_ACCESS_KEY_SECRET, test/gong-access-key-secret
GOOGLE_DRIVE_SERVICE_ACCOUNT_JSON_STR, test/google-drive-service-account-json
GOOGLE_DRIVE_OAUTH_CREDENTIALS_JSON_STR_TEST_USER_1, test/google-drive-oauth-creds-test-user-1
GOOGLE_DRIVE_OAUTH_CREDENTIALS_JSON_STR, test/google-drive-oauth-creds
GOOGLE_GMAIL_SERVICE_ACCOUNT_JSON_STR, test/google-gmail-service-account-json
GOOGLE_GMAIL_OAUTH_CREDENTIALS_JSON_STR, test/google-gmail-oauth-creds
SLAB_BOT_TOKEN, test/slab-bot-token
ZENDESK_SUBDOMAIN, test/zendesk-subdomain
ZENDESK_EMAIL, test/zendesk-email
ZENDESK_TOKEN, test/zendesk-token
SF_PASSWORD, test/sf-password
SF_SECURITY_TOKEN, test/sf-security-token
HUBSPOT_ACCESS_TOKEN, test/hubspot-access-token
IMAP_PASSWORD, test/imap-password
AIRTABLE_ACCESS_TOKEN, test/airtable-access-token
SHAREPOINT_CLIENT_SECRET, test/sharepoint-client-secret
PERM_SYNC_SHAREPOINT_CLIENT_ID, test/perm-sync-sharepoint-client-id
PERM_SYNC_SHAREPOINT_PRIVATE_KEY, test/perm-sync-sharepoint-private-key
PERM_SYNC_SHAREPOINT_CERTIFICATE_PASSWORD, test/perm-sync-sharepoint-cert-password
PERM_SYNC_SHAREPOINT_DIRECTORY_ID, test/perm-sync-sharepoint-directory-id
ACCESS_TOKEN_GITHUB, test/github-access-token
GITLAB_ACCESS_TOKEN, test/gitlab-access-token
GITBOOK_SPACE_ID, test/gitbook-space-id
GITBOOK_API_KEY, test/gitbook-api-key
NOTION_INTEGRATION_TOKEN, test/notion-integration-token
HIGHSPOT_KEY, test/highspot-key
HIGHSPOT_SECRET, test/highspot-secret
SLACK_BOT_TOKEN, test/slack-bot-token
DISCORD_CONNECTOR_BOT_TOKEN, test/discord-bot-token
TEAMS_APPLICATION_ID, test/teams-application-id
TEAMS_DIRECTORY_ID, test/teams-directory-id
TEAMS_SECRET, test/teams-secret
BITBUCKET_WORKSPACE, test/bitbucket-workspace
BITBUCKET_API_TOKEN, test/bitbucket-api-token
FIREFLIES_API_KEY, test/fireflies-api-key
- name: Run Tests (excluding HubSpot, Salesforce, GitHub, and Coda)
shell: script -q -e -c "bash --noprofile --norc -eo pipefail {0}"
run: |

View File

@@ -15,6 +15,7 @@ permissions:
jobs:
Deploy-Preview:
runs-on: ubuntu-latest
environment: ci-protected
timeout-minutes: 30
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd

View File

@@ -6,7 +6,7 @@ Use explicit type annotations for variables to enhance code clarity, especially
## Best Practices
Use the "Engineering Best Practices" section of `CONTRIBUTING.md` as core review context. Prefer consistency with existing patterns, fix issues in code you touch, avoid tacking new features onto muddy interfaces, fail loudly instead of silently swallowing errors, keep code strictly typed, preserve clear state boundaries, remove duplicate or dead logic, break up overly long functions, avoid hidden import-time side effects, respect module boundaries, and favor correctness-by-construction over relying on callers to use an API correctly.
Use `contributing_guides/best_practices.md` as core review context. Prefer consistency with existing patterns, fix issues in code you touch, avoid tacking new features onto muddy interfaces, fail loudly instead of silently swallowing errors, keep code strictly typed, preserve clear state boundaries, remove duplicate or dead logic, break up overly long functions, avoid hidden import-time side effects, respect module boundaries, and favor correctness-by-construction over relying on callers to use an API correctly.
## TODOs
@@ -27,7 +27,6 @@ Code changes must consider both multi-tenant and single-tenant deployments. In m
## Nginx Routing — New Backend Routes
Whenever a new backend route is added that does NOT start with `/api`, it must also be explicitly added to ALL nginx configs:
- `deployment/helm/charts/onyx/templates/nginx-conf.yaml` (Helm/k8s)
- `deployment/data/nginx/app.conf.template` (docker-compose dev)
- `deployment/data/nginx/app.conf.template.prod` (docker-compose prod)
@@ -38,7 +37,3 @@ Routes not starting with `/api` are not caught by the existing `^/(api|openapi\.
## Full vs Lite Deployments
Code changes must consider both regular Onyx deployments and Onyx lite deployments. Lite deployments disable the vector DB, Redis, model servers, and background workers by default, use PostgreSQL-backed cache/auth/file storage, and rely on the API server to handle background work. Do not assume those services are available unless the code path is explicitly limited to full deployments.
## SWR Cache Keys — Always Use SWR_KEYS Registry
All `useSWR()` calls and `mutate()` calls in the frontend must reference the centralized `SWR_KEYS` registry in `web/src/lib/swr-keys.ts` instead of inline endpoint strings or local string constants. Never write `useSWR("/api/some/endpoint", ...)` or `mutate("/api/some/endpoint")` — always use the corresponding `SWR_KEYS.someEndpoint` constant. If the endpoint does not yet exist in the registry, add it there first. This applies to all variants of an endpoint (e.g. query-string variants like `?get_editable=true` must also be registered as their own key).

View File

@@ -357,5 +357,5 @@ raise OnyxError(OnyxErrorCode.BAD_GATEWAY, detail, status_code_override=e.respon
## Best Practices
In addition to the other content in this file, best practices for contributing
to the codebase can be found in the "Engineering Best Practices" section of
`CONTRIBUTING.md`. Understand its contents and follow them.
to the codebase can be found at `contributing_guides/best_practices.md`.
Understand its contents and follow them.

View File

@@ -1,487 +1,32 @@
# Contributing to Onyx
Hey there! We are so excited that you're interested in Onyx.
## Table of Contents
- [Contribution Opportunities](#contribution-opportunities)
- [Contribution Process](#contribution-process)
- [Development Setup](#development-setup)
- [Prerequisites](#prerequisites)
- [Backend: Python Requirements](#backend-python-requirements)
- [Frontend: Node Dependencies](#frontend-node-dependencies)
- [Formatting and Linting](#formatting-and-linting)
- [Running the Application](#running-the-application)
- [VSCode Debugger (Recommended)](#vscode-debugger-recommended)
- [Manually Running for Development](#manually-running-for-development)
- [Running in Docker](#running-in-docker)
- [macOS-Specific Notes](#macos-specific-notes)
- [Engineering Best Practices](#engineering-best-practices)
- [Principles and Collaboration](#principles-and-collaboration)
- [Style and Maintainability](#style-and-maintainability)
- [Performance and Correctness](#performance-and-correctness)
- [Repository Conventions](#repository-conventions)
- [Release Process](#release-process)
- [Getting Help](#getting-help)
- [Enterprise Edition Contributions](#enterprise-edition-contributions)
---
## Contribution Opportunities
The [GitHub Issues](https://github.com/onyx-dot-app/onyx/issues) page is a great place to look for and share contribution ideas.
If you have your own feature that you would like to build, please create an issue and community members can provide feedback and upvote if they feel a common need.
If you have your own feature that you would like to build please create an issue and community members can provide feedback and
thumb it up if they feel a common need.
---
## Contribution Process
## Contributing Code
Please reference the documents in contributing_guides folder to ensure that the code base is kept to a high standard.
1. dev_setup.md (start here): gives you a guide to setting up a local development environment.
2. contribution_process.md: how to ensure you are building valuable features that will get reviewed and merged.
3. best_practices.md: before asking for reviews, ensure your changes meet the repo code quality standards.
To contribute, please follow the
["fork and pull request"](https://docs.github.com/en/get-started/quickstart/contributing-to-projects) workflow.
### 1. Get the feature or enhancement approved
Create a GitHub issue and see if there are upvotes. If you feel the feature is sufficiently value-additive and you would like approval to contribute it to the repo, tag [Yuhong](https://github.com/yuhongsun96) to review.
If you do not get a response within a week, feel free to email yuhong@onyx.app and include the issue in the message.
Not all small features and enhancements will be accepted as there is a balance between feature richness and bloat. We strive to provide the best user experience possible so we have to be intentional about what we include in the app.
### 2. Get the design approved
The Onyx team will either provide a design doc and PRD for the feature or request one from you, the contributor. The scope and detail of the design will depend on the individual feature.
### 3. IP attribution for EE contributions
If you are contributing features to Onyx Enterprise Edition, you are required to sign the [IP Assignment Agreement](contributor_ip_assignment/EE_Contributor_IP_Assignment_Agreement.md).
### 4. Review and testing
Your features must pass all tests and all comments must be addressed prior to merging.
### Implicit agreements
If we approve an issue, we are promising you the following:
- Your work will receive timely attention and we will put aside other important items to ensure you are not blocked.
- You will receive necessary coaching on eng quality, system design, etc. to ensure the feature is completed well.
- The Onyx team will pull resources and bandwidth from design, PM, and engineering to ensure that you have all the resources to build the feature to the quality required for merging.
Because this is a large investment from our team, we ask that you:
- Thoroughly read all the requirements of the design docs, engineering best practices, and try to minimize overhead for the Onyx team.
- Complete the feature in a timely manner to reduce context switching and an ongoing resource pull from the Onyx team.
---
## Development Setup
Onyx being a fully functional app, relies on some external software, specifically:
- [Postgres](https://www.postgresql.org/) (Relational DB)
- [OpenSearch](https://opensearch.org/) (Vector DB/Search Engine)
- [Redis](https://redis.io/) (Cache)
- [MinIO](https://min.io/) (File Store)
- [Nginx](https://nginx.org/) (Not needed for development flows generally)
> **Note:**
> This guide provides instructions to build and run Onyx locally from source with Docker containers providing the above external software.
> We believe this combination is easier for development purposes. If you prefer to use pre-built container images, see [Running in Docker](#running-in-docker) below.
### Prerequisites
- **Python 3.11** — If using a lower version, modifications will have to be made to the code. Higher versions may have library compatibility issues.
- **Docker** — Required for running external services (Postgres, OpenSearch, Redis, MinIO).
- **Node.js v22** — We recommend using [nvm](https://github.com/nvm-sh/nvm) to manage Node installations.
### Backend: Python Requirements
We use [uv](https://docs.astral.sh/uv/) and recommend creating a [virtual environment](https://docs.astral.sh/uv/pip/environments/#using-a-virtual-environment).
```bash
uv venv .venv --python 3.11
source .venv/bin/activate
```
_For Windows, activate the virtual environment using Command Prompt:_
```bash
.venv\Scripts\activate
```
If using PowerShell, the command slightly differs:
```powershell
.venv\Scripts\Activate.ps1
```
Install the required Python dependencies:
```bash
uv sync --all-extras
```
Install Playwright for Python (headless browser required by the Web Connector):
```bash
uv run playwright install
```
### Frontend: Node Dependencies
```bash
nvm install 22 && nvm use 22
node -v # verify your active version
```
Navigate to `onyx/web` and run:
```bash
npm i
```
### Formatting and Linting
#### Backend
Set up pre-commit hooks (black / reorder-python-imports):
```bash
uv run pre-commit install
```
We also use `mypy` for static type checking. Onyx is fully type-annotated, and we want to keep it that way! To run the mypy checks manually:
```bash
uv run mypy . # from onyx/backend
```
#### Frontend
We use `prettier` for formatting. The desired version will be installed via `npm i` from the `onyx/web` directory. To run the formatter:
```bash
npx prettier --write . # from onyx/web
```
Pre-commit will also run prettier automatically on files you've recently touched. If re-formatted, your commit will fail. Re-stage your changes and commit again.
---
## Running the Application
### VSCode Debugger (Recommended)
We highly recommend using VSCode's debugger for development.
#### Initial Setup
1. Copy `.vscode/env_template.txt` to `.vscode/.env`
2. Fill in the necessary environment variables in `.vscode/.env`
#### Using the Debugger
Before starting, make sure the Docker Daemon is running.
1. Open the Debug view in VSCode (Cmd+Shift+D on macOS)
2. From the dropdown at the top, select "Clear and Restart External Volumes and Containers" and press the green play button
3. From the dropdown at the top, select "Run All Onyx Services" and press the green play button
4. Navigate to http://localhost:3000 in your browser to start using the app
5. Set breakpoints by clicking to the left of line numbers to help debug while the app is running
6. Use the debug toolbar to step through code, inspect variables, etc.
> **Note:** "Clear and Restart External Volumes and Containers" will reset your Postgres and OpenSearch (relational-db and index). Only run this if you are okay with wiping your data.
**Features:**
- Hot reload is enabled for the web server and API servers
- Python debugging is configured with debugpy
- Environment variables are loaded from `.vscode/.env`
- Console output is organized in the integrated terminal with labeled tabs
### Manually Running for Development
#### Docker containers for external software
You will need Docker installed to run these containers.
Navigate to `onyx/deployment/docker_compose`, then start up Postgres/OpenSearch/Redis/MinIO with:
```bash
docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d index relational_db cache minio
```
(index refers to OpenSearch, relational_db refers to Postgres, and cache refers to Redis)
#### Running Onyx locally
To start the frontend, navigate to `onyx/web` and run:
```bash
npm run dev
```
Next, start the model server which runs the local NLP models. Navigate to `onyx/backend` and run:
```bash
uvicorn model_server.main:app --reload --port 9000
```
_For Windows (for compatibility with both PowerShell and Command Prompt):_
```bash
powershell -Command "uvicorn model_server.main:app --reload --port 9000"
```
The first time running Onyx, you will need to run the DB migrations for Postgres. After the first time, this is no longer required unless the DB models change.
Navigate to `onyx/backend` and with the venv active, run:
```bash
alembic upgrade head
```
Next, start the task queue which orchestrates the background jobs. Still in `onyx/backend`, run:
```bash
python ./scripts/dev_run_background_jobs.py
```
To run the backend API server, navigate back to `onyx/backend` and run:
```bash
AUTH_TYPE=basic uvicorn onyx.main:app --reload --port 8080
```
_For Windows (for compatibility with both PowerShell and Command Prompt):_
```bash
powershell -Command "
$env:AUTH_TYPE='basic'
uvicorn onyx.main:app --reload --port 8080
"
```
> **Note:** If you need finer logging, add the additional environment variable `LOG_LEVEL=DEBUG` to the relevant services.
#### Wrapping up
You should now have 4 servers running:
- Web server
- Backend API
- Model server
- Background jobs
Now, visit http://localhost:3000 in your browser. You should see the Onyx onboarding wizard where you can connect your external LLM provider to Onyx.
You've successfully set up a local Onyx instance!
### Running in Docker
You can run the full Onyx application stack from pre-built images including all external software dependencies.
Navigate to `onyx/deployment/docker_compose` and run:
```bash
docker compose up -d
```
After Docker pulls and starts these containers, navigate to http://localhost:3000 to use Onyx.
If you want to make changes to Onyx and run those changes in Docker, you can also build a local version of the Onyx container images that incorporates your changes:
```bash
docker compose up -d --build
```
---
## macOS-Specific Notes
### Setting up Python
Ensure [Homebrew](https://brew.sh/) is already set up, then install Python 3.11:
```bash
brew install python@3.11
```
Add Python 3.11 to your path by adding the following line to `~/.zshrc`:
```
export PATH="$(brew --prefix)/opt/python@3.11/libexec/bin:$PATH"
```
> **Note:** You will need to open a new terminal for the path change above to take effect.
### Setting up Docker
On macOS, you will need to install [Docker Desktop](https://www.docker.com/products/docker-desktop/) and ensure it is running before continuing with the docker commands.
### Formatting and Linting
macOS will likely require you to remove some quarantine attributes on some of the hooks for them to execute properly. After installing pre-commit, run the following command:
```bash
sudo xattr -r -d com.apple.quarantine ~/.cache/pre-commit
```
---
## Engineering Best Practices
> These are also what we adhere to as a team internally, we love to build in the open and to uplevel our community and each other through being transparent.
### Principles and Collaboration
- **Use 1-way vs 2-way doors.** For 2-way doors, move faster and iterate. For 1-way doors, be more deliberate.
- **Consistency > being "right."** Prefer consistent patterns across the codebase. If something is truly bad, fix it everywhere.
- **Fix what you touch (selectively).**
- Don't feel obligated to fix every best-practice issue you notice.
- Don't introduce new bad practices.
- If your change touches code that violates best practices, fix it as part of the change.
- **Don't tack features on.** When adding functionality, restructure logically as needed to avoid muddying interfaces and accumulating tech debt.
### Style and Maintainability
#### Comments and readability
Add clear comments:
- At logical boundaries (e.g., interfaces) so the reader doesn't need to dig 10 layers deeper.
- Wherever assumptions are made or something non-obvious/unexpected is done.
- For complicated flows/functions.
- Wherever it saves time (e.g., nontrivial regex patterns).
#### Errors and exceptions
- **Fail loudly** rather than silently skipping work.
- Example: raise and let exceptions propagate instead of silently dropping a document.
- **Don't overuse `try/except`.**
- Put `try/except` at the correct logical level.
- Do not mask exceptions unless it is clearly appropriate.
#### Typing
- Everything should be **as strictly typed as possible**.
- Use `cast` for annoying/loose-typed interfaces (e.g., results of `run_functions_tuples_in_parallel`).
- Only `cast` when the type checker sees `Any` or types are too loose.
- Prefer types that are easy to read.
- Avoid dense types like `dict[tuple[str, str], list[list[float]]]`.
- Prefer domain models, e.g.:
- `EmbeddingModel(provider_name, model_name)` as a Pydantic model
- `dict[EmbeddingModel, list[EmbeddingVector]]`
#### State, objects, and boundaries
- Keep **clear logical boundaries** for state containers and objects.
- A **config** object should never contain things like a `db_session`.
- Avoid state containers that are overly nested, or huge + flat (use judgment).
- Prefer **composition and functional style** over inheritance/OOP.
- Prefer **no mutation** unless there's a strong reason.
- State objects should be **intentional and explicit**, ideally nonmutating.
- Use interfaces/objects to create clear separation of responsibility.
- Prefer simplicity when there's no clear gain.
- Avoid overcomplicated mechanisms like semaphores.
- Prefer **hash maps (dicts)** over tree structures unless there's a strong reason.
#### Naming
- Name variables carefully and intentionally.
- Prefer long, explicit names when undecided.
- Avoid single-character variables except for small, self-contained utilities (or not at all).
- Keep the same object/name consistent through the call stack and within functions when reasonable.
- Good: `for token in tokens:`
- Bad: `for msg in tokens:` (if iterating tokens)
- Function names should bias toward **long + descriptive** for codebase search.
- IntelliSense can miss call sites; search works best with unique names.
#### Correctness by construction
- Prefer self-contained correctness — don't rely on callers to "use it right" if you can make misuse hard.
- Avoid redundancies: if a function takes an arg, it shouldn't also take a state object that contains that same arg.
- No dead code (unless there's a very good reason).
- No commented-out code in main or feature branches (unless there's a very good reason).
- No duplicate logic:
- Don't copy/paste into branches when shared logic can live above the conditional.
- If you're afraid to touch the original, you don't understand it well enough.
- LLMs often create subtle duplicate logic — review carefully and remove it.
- Avoid "nearly identical" objects that confuse when to use which.
- Avoid extremely long functions with chained logic:
- Encapsulate steps into helpers for readability, even if not reused.
- "Pythonic" multi-step expressions are OK in moderation; don't trade clarity for cleverness.
### Performance and Correctness
- Avoid holding resources for extended periods (DB sessions, locks/semaphores).
- Validate objects on creation and right before use.
- Connector code (data to Onyx documents):
- Any in-memory structure that can grow without bound based on input must be periodically size-checked.
- If a connector is OOMing (often shows up as "missing celery tasks"), this is a top thing to check retroactively.
- Async and event loops:
- Never introduce new async/event loop Python code, and try to make existing async code synchronous when possible if it makes sense.
- Writing async code without 100% understanding the code and having a concrete reason to do so is likely to introduce bugs and not add any meaningful performance gains.
### Repository Conventions
#### Where code lives
- Pydantic + data models: `models.py` files.
- DB interface functions (excluding lazy loading): `db/` directory.
- LLM prompts: `prompts/` directory, roughly mirroring the code layout that uses them.
- API routes: `server/` directory.
#### Pydantic and modeling
- Prefer **Pydantic** over dataclasses.
- If absolutely required, use `allow_arbitrary_types`.
#### Data conventions
- Prefer explicit `None` over sentinel empty strings (usually; depends on intent).
- Prefer explicit identifiers: use string enums instead of integer codes.
- Avoid magic numbers (co-location is good when necessary). **Always avoid magic strings.**
#### Logging
- Log messages where they are created.
- Don't propagate log messages around just to log them elsewhere.
#### Encapsulation
- Don't use private attributes/methods/properties from other classes/modules.
- "Private" is private — respect that boundary.
#### SQLAlchemy guidance
- Lazy loading is often bad at scale, especially across multiple list relationships.
- Be careful when accessing SQLAlchemy object attributes:
- It can help avoid redundant DB queries,
- but it can also fail if accessed outside an active session,
- and lazy loading can add hidden DB dependencies to otherwise "simple" functions.
- Reference: https://www.reddit.com/r/SQLAlchemy/comments/138f248/joinedload_vs_selectinload/
#### Trunk-based development and feature flags
- **PRs should contain no more than 500 lines of real change.**
- **Merge to main frequently.** Avoid long-lived feature branches — they create merge conflicts and integration pain.
- **Use feature flags for incremental rollout.**
- Large features should be merged in small, shippable increments behind a flag.
- This allows continuous integration without exposing incomplete functionality.
- **Keep flags short-lived.** Once a feature is fully rolled out, remove the flag and dead code paths promptly.
- **Flag at the right level.** Prefer flagging at API/UI entry points rather than deep in business logic.
- **Test both flag states.** Ensure the codebase works correctly with the flag on and off.
#### Miscellaneous
- Any TODOs you add in the code must be accompanied by either the name/username of the owner of that TODO, or an issue number for an issue referencing that piece of work.
- Avoid module-level logic that runs on import, which leads to import-time side effects. Essentially every piece of meaningful logic should exist within some function that has to be explicitly invoked. Acceptable exceptions may include loading environment variables or setting up loggers.
- If you find yourself needing something like this, you may want that logic to exist in a file dedicated for manual execution (contains `if __name__ == "__main__":`) which should not be imported by anything else.
- Do not conflate Python scripts you intend to run from the command line (contains `if __name__ == "__main__":`) with modules you intend to import from elsewhere. If for some unlikely reason they have to be the same file, any logic specific to executing the file (including imports) should be contained in the `if __name__ == "__main__":` block.
- Generally these executable files exist in `backend/scripts/`.
---
## Release Process
Onyx loosely follows the SemVer versioning standard.
A set of Docker containers will be pushed automatically to DockerHub with every tag.
You can see the containers [here](https://hub.docker.com/search?q=onyx%2F).
---
## Getting Help
## Getting Help 🙋
We have support channels and generally interesting discussions on our [Discord](https://discord.gg/4NA5SbzrWb).
See you there!
---
## Enterprise Edition Contributions
If you are contributing features to Onyx Enterprise Edition (code under any `ee/` directory), you are required to sign the [IP Assignment Agreement](contributor_ip_assignment/EE_Contributor_IP_Assignment_Agreement.md) ([PDF version](contributor_ip_assignment/EE_Contributor_IP_Assignment_Agreement.pdf)).
## Release Process
Onyx loosely follows the SemVer versioning standard.
Major changes are released with a "minor" version bump. Currently we use patch release versions to indicate small feature changes.
A set of Docker containers will be pushed automatically to DockerHub with every tag.
You can see the containers [here](https://hub.docker.com/search?q=onyx%2F).

102
README.md
View File

@@ -4,6 +4,8 @@
<a href="https://www.onyx.app/?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme"> <img width="50%" src="https://github.com/onyx-dot-app/onyx/blob/logo/OnyxLogoCropped.jpg?raw=true" /></a>
</h2>
<p align="center">Open Source AI Platform</p>
<p align="center">
<a href="https://discord.gg/TDJ59cGV2X" target="_blank">
<img src="https://img.shields.io/badge/discord-join-blue.svg?logo=discord&logoColor=white" alt="Discord" />
@@ -25,94 +27,82 @@
</a>
</p>
# Onyx - The Open Source AI Platform
**[Onyx](https://www.onyx.app/?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme)** is the application layer for LLMs - bringing a feature-rich interface that can be easily hosted by anyone.
Onyx enables LLMs through advanced capabilities like RAG, web search, code execution, file creation, deep research and more.
**[Onyx](https://www.onyx.app/?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme)** is a feature-rich, self-hostable Chat UI that works with any LLM. It is easy to deploy and can run in a completely airgapped environment.
Connect your applications with over 50+ indexing based connectors provided out of the box or via MCP.
Onyx comes loaded with advanced features like Agents, Web Search, RAG, MCP, Deep Research, Connectors to 40+ knowledge sources, and more.
> [!TIP]
> Deploy with a single command:
> Run Onyx with one command (or see deployment section below):
> ```
> curl -fsSL https://onyx.app/install_onyx.sh | bash
> ```
![Onyx Chat Silent Demo](https://github.com/onyx-dot-app/onyx/releases/download/v3.0.0/Onyx.gif)
****
![Onyx Chat Silent Demo](https://github.com/onyx-dot-app/onyx/releases/download/v0.21.1/OnyxChatSilentDemo.gif)
---
## ⭐ Features
- **🔍 Agentic RAG:** Get best in class search and answer quality based on hybrid index + AI Agents for information retrieval
- Benchmark to release soon!
- **🔬 Deep Research:** Get in depth reports with a multi-step research flow.
- Top of [leaderboard](https://github.com/onyx-dot-app/onyx_deep_research_bench) as of Feb 2026.
- **🤖 Custom Agents:** Build AI Agents with unique instructions, knowledge, and actions.
- **🌍 Web Search:** Browse the web to get up to date information.
- Supports Serper, Google PSE, Brave, SearXNG, and others.
- Comes with an in house web crawler and support for Firecrawl/Exa.
- **📄 Artifacts:** Generate documents, graphics, and other downloadable artifacts.
- **▶️ Actions & MCP:** Let Onyx agents interact with external applications, comes with flexible Auth options.
- **💻 Code Execution:** Execute code in a sandbox to analyze data, render graphs, or modify files.
- **🎙️ Voice Mode:** Chat with Onyx via text-to-speech and speech-to-text.
- **🤖 Custom Agents:** Build AI Agents with unique instructions, knowledge and actions.
- **🌍 Web Search:** Browse the web with Google PSE, Exa, and Serper as well as an in-house scraper or Firecrawl.
- **🔍 RAG:** Best in class hybrid-search + knowledge graph for uploaded files and ingested documents from connectors.
- **🔄 Connectors:** Pull knowledge, metadata, and access information from over 40 applications.
- **🔬 Deep Research:** Get in depth answers with an agentic multi-step search.
- **▶️ Actions & MCP:** Give AI Agents the ability to interact with external systems.
- **💻 Code Interpreter:** Execute code to analyze data, render graphs and create files.
- **🎨 Image Generation:** Generate images based on user prompts.
- **👥 Collaboration:** Chat sharing, feedback gathering, user management, usage analytics, and more.
Onyx supports all major LLM providers, both self-hosted (like Ollama, LiteLLM, vLLM, etc.) and proprietary (like Anthropic, OpenAI, Gemini, etc.).
Onyx works with all LLMs (like OpenAI, Anthropic, Gemini, etc.) and self-hosted LLMs (like Ollama, vLLM, etc.)
To learn more - check out our [docs](https://docs.onyx.app/welcome?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme)!
To learn more about the features, check out our [documentation](https://docs.onyx.app/welcome?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme)!
---
## 🚀 Deployment Modes
> Onyx supports deployments in Docker, Kubernetes, Helm/Terraform and provides guides for major cloud providers.
> Detailed deployment guides found [here](https://docs.onyx.app/deployment/overview).
## 🚀 Deployment
Onyx supports deployments in Docker, Kubernetes, Terraform, along with guides for major cloud providers.
Onyx supports two separate deployment options: standard and lite.
#### Onyx Lite
The Lite mode can be thought of as a lightweight Chat UI. It requires less resources (under 1GB memory) and runs a less complex stack.
It is great for users who want to test out Onyx quickly or for teams who are only interested in the Chat UI and Agents functionalities.
#### Standard Onyx
The complete feature set of Onyx which is recommended for serious users and larger teams. Additional components not included in Lite mode:
- Vector + Keyword index for RAG.
- Background containers to run job queues and workers for syncing knowledge from connectors.
- AI model inference servers to run deep learning models used during indexing and inference.
- Performance optimizations for large scale use via in memory cache (Redis) and blob store (MinIO).
See guides below:
- [Docker](https://docs.onyx.app/deployment/local/docker?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme) or [Quickstart](https://docs.onyx.app/deployment/getting_started/quickstart?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme) (best for most users)
- [Kubernetes](https://docs.onyx.app/deployment/local/kubernetes?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme) (best for large teams)
- [Terraform](https://docs.onyx.app/deployment/local/terraform?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme) (best for teams already using Terraform)
- Cloud specific guides (best if specifically using [AWS EKS](https://docs.onyx.app/deployment/cloud/aws/eks?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme), [Azure VMs](https://docs.onyx.app/deployment/cloud/azure?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme), etc.)
> [!TIP]
> **To try Onyx for free without deploying, visit [Onyx Cloud](https://cloud.onyx.app/signup?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme)**.
> **To try Onyx for free without deploying, check out [Onyx Cloud](https://cloud.onyx.app/signup?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme)**.
---
## 🏢 Onyx for Enterprise
Onyx is built for teams of all sizes, from individual users to the largest global enterprises:
- 👥 Collaboration: Share chats and agents with other members of your organization.
- 🔐 Single Sign On: SSO via Google OAuth, OIDC, or SAML. Group syncing and user provisioning via SCIM.
- 🛡️ Role Based Access Control: RBAC for sensitive resources like access to agents, actions, etc.
- 📊 Analytics: Usage graphs broken down by teams, LLMs, or agents.
- 🕵️ Query History: Audit usage to ensure safe adoption of AI in your organization.
- 💻 Custom code: Run custom code to remove PII, reject sensitive queries, or to run custom analysis.
- 🎨 Whitelabeling: Customize the look and feel of Onyx with custom naming, icons, banners, and more.
## 🔍 Other Notable Benefits
Onyx is built for teams of all sizes, from individual users to the largest global enterprises.
- **Enterprise Search**: far more than simple RAG, Onyx has custom indexing and retrieval that remains performant and accurate for scales of up to tens of millions of documents.
- **Security**: SSO (OIDC/SAML/OAuth2), RBAC, encryption of credentials, etc.
- **Management UI**: different user roles such as basic, curator, and admin.
- **Document Permissioning**: mirrors user access from external apps for RAG use cases.
## 🚧 Roadmap
To see ongoing and upcoming projects, check out our [roadmap](https://github.com/orgs/onyx-dot-app/projects/2)!
## 📚 Licensing
There are two editions of Onyx:
- Onyx Community Edition (CE) is available freely under the MIT license and covers all of the core features for Chat, RAG, Agents, and Actions.
- Onyx Community Edition (CE) is available freely under the MIT license.
- Onyx Enterprise Edition (EE) includes extra features that are primarily useful for larger organizations.
For feature details, check out [our website](https://www.onyx.app/pricing?utm_source=onyx_repo&utm_medium=github&utm_campaign=readme).
## 👪 Community
## 👪 Community
Join our open source community on **[Discord](https://discord.gg/TDJ59cGV2X)**!
## 💡 Contributing
## 💡 Contributing
Looking to contribute? Please check out the [Contribution Guide](CONTRIBUTING.md) for more details.

View File

@@ -1,54 +0,0 @@
"""csv to tabular chat file type
Revision ID: 8188861f4e92
Revises: d8cdfee5df80
Create Date: 2026-03-31 19:23:05.753184
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "8188861f4e92"
down_revision = "d8cdfee5df80"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute(
"""
UPDATE chat_message
SET files = (
SELECT jsonb_agg(
CASE
WHEN elem->>'type' = 'csv'
THEN jsonb_set(elem, '{type}', '"tabular"')
ELSE elem
END
)
FROM jsonb_array_elements(files) AS elem
)
WHERE files::text LIKE '%"type": "csv"%'
"""
)
def downgrade() -> None:
op.execute(
"""
UPDATE chat_message
SET files = (
SELECT jsonb_agg(
CASE
WHEN elem->>'type' = 'tabular'
THEN jsonb_set(elem, '{type}', '"csv"')
ELSE elem
END
)
FROM jsonb_array_elements(files) AS elem
)
WHERE files::text LIKE '%"type": "tabular"%'
"""
)

View File

@@ -1,55 +0,0 @@
"""add skipped to userfilestatus
Revision ID: d8cdfee5df80
Revises: 1d78c0ca7853
Create Date: 2026-04-01 10:47:12.593950
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "d8cdfee5df80"
down_revision = "1d78c0ca7853"
branch_labels = None
depends_on = None
TABLE = "user_file"
COLUMN = "status"
CONSTRAINT_NAME = "ck_user_file_status"
OLD_VALUES = ("PROCESSING", "INDEXING", "COMPLETED", "FAILED", "CANCELED", "DELETING")
NEW_VALUES = (
"PROCESSING",
"INDEXING",
"COMPLETED",
"SKIPPED",
"FAILED",
"CANCELED",
"DELETING",
)
def _drop_status_check_constraint() -> None:
inspector = sa.inspect(op.get_bind())
for constraint in inspector.get_check_constraints(TABLE):
if COLUMN in constraint.get("sqltext", ""):
constraint_name = constraint["name"]
if constraint_name is not None:
op.drop_constraint(constraint_name, TABLE, type_="check")
def upgrade() -> None:
_drop_status_check_constraint()
in_clause = ", ".join(f"'{v}'" for v in NEW_VALUES)
op.create_check_constraint(CONSTRAINT_NAME, TABLE, f"{COLUMN} IN ({in_clause})")
def downgrade() -> None:
op.execute(f"UPDATE {TABLE} SET {COLUMN} = 'COMPLETED' WHERE {COLUMN} = 'SKIPPED'")
_drop_status_check_constraint()
in_clause = ", ".join(f"'{v}'" for v in OLD_VALUES)
op.create_check_constraint(CONSTRAINT_NAME, TABLE, f"{COLUMN} IN ({in_clause})")

View File

@@ -5,7 +5,6 @@ from onyx.background.celery.apps.primary import celery_app
celery_app.autodiscover_tasks(
app_base.filter_task_modules(
[
"ee.onyx.background.celery.tasks.hooks",
"ee.onyx.background.celery.tasks.doc_permission_syncing",
"ee.onyx.background.celery.tasks.external_group_syncing",
"ee.onyx.background.celery.tasks.cloud",

View File

@@ -55,15 +55,6 @@ ee_tasks_to_schedule: list[dict] = []
if not MULTI_TENANT:
ee_tasks_to_schedule = [
{
"name": "hook-execution-log-cleanup",
"task": OnyxCeleryTask.HOOK_EXECUTION_LOG_CLEANUP_TASK,
"schedule": timedelta(days=1),
"options": {
"priority": OnyxCeleryPriority.LOW,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
"name": "autogenerate-usage-report",
"task": OnyxCeleryTask.GENERATE_USAGE_REPORT_TASK,

View File

@@ -13,7 +13,6 @@ from redis.lock import Lock as RedisLock
from ee.onyx.server.tenants.provisioning import setup_tenant
from ee.onyx.server.tenants.schema_management import create_schema_if_not_exists
from ee.onyx.server.tenants.schema_management import get_current_alembic_version
from ee.onyx.server.tenants.schema_management import run_alembic_migrations
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.app_configs import TARGET_AVAILABLE_TENANTS
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
@@ -30,10 +29,9 @@ from shared_configs.configs import TENANT_ID_PREFIX
# Each tenant takes ~80s (alembic migrations), so 5 tenants ≈ 7 minutes.
_MAX_TENANTS_PER_RUN = 5
# Time limits sized for worst-case: provisioning up to _MAX_TENANTS_PER_RUN new tenants
# (~90s each) plus migrating up to TARGET_AVAILABLE_TENANTS pool tenants (~90s each).
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 20 # 20 minutes
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 25 # 25 minutes
# Time limits sized for worst-case batch: _MAX_TENANTS_PER_RUN × ~90s + buffer.
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 10 # 10 minutes
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 15 # 15 minutes
@shared_task(
@@ -93,7 +91,8 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
batch_size = min(tenants_to_provision, _MAX_TENANTS_PER_RUN)
if batch_size < tenants_to_provision:
task_logger.info(
f"Capping batch to {batch_size} (need {tenants_to_provision}, will catch up next cycle)"
f"Capping batch to {batch_size} "
f"(need {tenants_to_provision}, will catch up next cycle)"
)
provisioned = 0
@@ -104,14 +103,12 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
provisioned += 1
except Exception:
task_logger.exception(
f"Failed to provision tenant {i + 1}/{batch_size}, continuing with remaining tenants"
f"Failed to provision tenant {i + 1}/{batch_size}, "
"continuing with remaining tenants"
)
task_logger.info(f"Provisioning complete: {provisioned}/{batch_size} succeeded")
# Migrate any pool tenants that were provisioned before a new migration was deployed
_migrate_stale_pool_tenants()
except Exception:
task_logger.exception("Error in check_available_tenants task")
@@ -124,46 +121,6 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
)
def _migrate_stale_pool_tenants() -> None:
"""
Run alembic upgrade head on all pool tenants. Since alembic upgrade head is
idempotent, tenants already at head are a fast no-op. This ensures pool
tenants are always current so that signup doesn't hit schema mismatches
(e.g. missing columns added after the tenant was pre-provisioned).
"""
with get_session_with_shared_schema() as db_session:
pool_tenants = db_session.query(AvailableTenant).all()
tenant_ids = [t.tenant_id for t in pool_tenants]
if not tenant_ids:
return
task_logger.info(
f"Checking {len(tenant_ids)} pool tenant(s) for pending migrations"
)
for tenant_id in tenant_ids:
try:
run_alembic_migrations(tenant_id)
new_version = get_current_alembic_version(tenant_id)
with get_session_with_shared_schema() as db_session:
tenant = (
db_session.query(AvailableTenant)
.filter_by(tenant_id=tenant_id)
.first()
)
if tenant and tenant.alembic_version != new_version:
task_logger.info(
f"Migrated pool tenant {tenant_id}: {tenant.alembic_version} -> {new_version}"
)
tenant.alembic_version = new_version
db_session.commit()
except Exception:
task_logger.exception(
f"Failed to migrate pool tenant {tenant_id}, skipping"
)
def pre_provision_tenant() -> bool:
"""
Pre-provision a new tenant and store it in the NewAvailableTenant table.

View File

@@ -69,7 +69,5 @@ EE_ONLY_PATH_PREFIXES: frozenset[str] = frozenset(
"/admin/token-rate-limits",
# Evals
"/evals",
# Hook extensions
"/admin/hooks",
}
)

View File

@@ -1,385 +0,0 @@
"""Hook executor — calls a customer's external HTTP endpoint for a given hook point.
Usage (Celery tasks and FastAPI handlers):
result = execute_hook(
db_session=db_session,
hook_point=HookPoint.QUERY_PROCESSING,
payload={"query": "...", "user_email": "...", "chat_session_id": "..."},
response_type=QueryProcessingResponse,
)
if isinstance(result, HookSkipped):
# no active hook configured — continue with original behavior
...
elif isinstance(result, HookSoftFailed):
# hook failed but fail strategy is SOFT — continue with original behavior
...
else:
# result is a validated Pydantic model instance (response_type)
...
is_reachable update policy
--------------------------
``is_reachable`` on the Hook row is updated selectively — only when the outcome
carries meaningful signal about physical reachability:
NetworkError (DNS, connection refused) → False (cannot reach the server)
HTTP 401 / 403 → False (api_key revoked or invalid)
TimeoutException → None (server may be slow, skip write)
Other HTTP errors (4xx / 5xx) → None (server responded, skip write)
Unknown exception → None (no signal, skip write)
Non-JSON / non-dict response → None (server responded, skip write)
Success (2xx, valid dict) → True (confirmed reachable)
None means "leave the current value unchanged" — no DB round-trip is made.
DB session design
-----------------
The executor uses three sessions:
1. Caller's session (db_session) — used only for the hook lookup read. All
needed fields are extracted from the Hook object before the HTTP call, so
the caller's session is not held open during the external HTTP request.
2. Log session — a separate short-lived session opened after the HTTP call
completes to write the HookExecutionLog row on failure. Success runs are
not recorded. Committed independently of everything else.
3. Reachable session — a second short-lived session to update is_reachable on
the Hook. Kept separate from the log session so a concurrent hook deletion
(which causes update_hook__no_commit to raise OnyxError(NOT_FOUND)) cannot
prevent the execution log from being written. This update is best-effort.
"""
import json
import time
from typing import Any
from typing import TypeVar
import httpx
from pydantic import BaseModel
from pydantic import ValidationError
from sqlalchemy.orm import Session
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.enums import HookFailStrategy
from onyx.db.enums import HookPoint
from onyx.db.hook import create_hook_execution_log__no_commit
from onyx.db.hook import get_non_deleted_hook_by_hook_point
from onyx.db.hook import update_hook__no_commit
from onyx.db.models import Hook
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.hooks.executor import HookSkipped
from onyx.hooks.executor import HookSoftFailed
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
logger = setup_logger()
T = TypeVar("T", bound=BaseModel)
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
class _HttpOutcome(BaseModel):
"""Structured result of an HTTP hook call, returned by _process_response."""
is_success: bool
updated_is_reachable: (
bool | None
) # True/False = write to DB, None = unchanged (skip write)
status_code: int | None
error_message: str | None
response_payload: dict[str, Any] | None
def _lookup_hook(
db_session: Session,
hook_point: HookPoint,
) -> Hook | HookSkipped:
"""Return the active Hook or HookSkipped if hooks are unavailable/unconfigured.
No HTTP call is made and no DB writes are performed for any HookSkipped path.
There is nothing to log and no reachability information to update.
"""
if MULTI_TENANT:
return HookSkipped()
hook = get_non_deleted_hook_by_hook_point(
db_session=db_session, hook_point=hook_point
)
if hook is None or not hook.is_active:
return HookSkipped()
if not hook.endpoint_url:
return HookSkipped()
return hook
def _process_response(
*,
response: httpx.Response | None,
exc: Exception | None,
timeout: float,
) -> _HttpOutcome:
"""Process the result of an HTTP call and return a structured outcome.
Called after the client.post() try/except. If post() raised, exc is set and
response is None. Otherwise response is set and exc is None. Handles
raise_for_status(), JSON decoding, and the dict shape check.
"""
if exc is not None:
if isinstance(exc, httpx.NetworkError):
msg = f"Hook network error (endpoint unreachable): {exc}"
logger.warning(msg, exc_info=exc)
return _HttpOutcome(
is_success=False,
updated_is_reachable=False,
status_code=None,
error_message=msg,
response_payload=None,
)
if isinstance(exc, httpx.TimeoutException):
msg = f"Hook timed out after {timeout}s: {exc}"
logger.warning(msg, exc_info=exc)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # timeout doesn't indicate unreachability
status_code=None,
error_message=msg,
response_payload=None,
)
msg = f"Hook call failed: {exc}"
logger.exception(msg, exc_info=exc)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # unknown error — don't make assumptions
status_code=None,
error_message=msg,
response_payload=None,
)
if response is None:
raise ValueError(
"exactly one of response or exc must be non-None; both are None"
)
status_code = response.status_code
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
msg = f"Hook returned HTTP {e.response.status_code}: {e.response.text}"
logger.warning(msg, exc_info=e)
# 401/403 means the api_key has been revoked or is invalid — mark unreachable
# so the operator knows to update it. All other HTTP errors keep is_reachable
# as-is (server is up, the request just failed for application reasons).
auth_failed = e.response.status_code in (401, 403)
return _HttpOutcome(
is_success=False,
updated_is_reachable=False if auth_failed else None,
status_code=status_code,
error_message=msg,
response_payload=None,
)
try:
response_payload = response.json()
except (json.JSONDecodeError, httpx.DecodingError) as e:
msg = f"Hook returned non-JSON response: {e}"
logger.warning(msg, exc_info=e)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # server responded — reachability unchanged
status_code=status_code,
error_message=msg,
response_payload=None,
)
if not isinstance(response_payload, dict):
msg = f"Hook returned non-dict JSON (got {type(response_payload).__name__})"
logger.warning(msg)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # server responded — reachability unchanged
status_code=status_code,
error_message=msg,
response_payload=None,
)
return _HttpOutcome(
is_success=True,
updated_is_reachable=True,
status_code=status_code,
error_message=None,
response_payload=response_payload,
)
def _persist_result(
*,
hook_id: int,
outcome: _HttpOutcome,
duration_ms: int,
) -> None:
"""Write the execution log on failure and optionally update is_reachable, each
in its own session so a failure in one does not affect the other."""
# Only write the execution log on failure — success runs are not recorded.
# Must not be skipped if the is_reachable update fails (e.g. hook concurrently
# deleted between the initial lookup and here).
if not outcome.is_success:
try:
with get_session_with_current_tenant() as log_session:
create_hook_execution_log__no_commit(
db_session=log_session,
hook_id=hook_id,
is_success=False,
error_message=outcome.error_message,
status_code=outcome.status_code,
duration_ms=duration_ms,
)
log_session.commit()
except Exception:
logger.exception(
f"Failed to persist hook execution log for hook_id={hook_id}"
)
# Update is_reachable separately — best-effort, non-critical.
# None means the value is unchanged (set by the caller to skip the no-op write).
# update_hook__no_commit can raise OnyxError(NOT_FOUND) if the hook was
# concurrently deleted, so keep this isolated from the log write above.
if outcome.updated_is_reachable is not None:
try:
with get_session_with_current_tenant() as reachable_session:
update_hook__no_commit(
db_session=reachable_session,
hook_id=hook_id,
is_reachable=outcome.updated_is_reachable,
)
reachable_session.commit()
except Exception:
logger.warning(f"Failed to update is_reachable for hook_id={hook_id}")
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def _execute_hook_inner(
hook: Hook,
payload: dict[str, Any],
response_type: type[T],
) -> T | HookSoftFailed:
"""Make the HTTP call, validate the response, and return a typed model.
Raises OnyxError on HARD failure. Returns HookSoftFailed on SOFT failure.
"""
timeout = hook.timeout_seconds
hook_id = hook.id
fail_strategy = hook.fail_strategy
endpoint_url = hook.endpoint_url
current_is_reachable: bool | None = hook.is_reachable
if not endpoint_url:
raise ValueError(
f"hook_id={hook_id} is active but has no endpoint_url — "
"active hooks without an endpoint_url must be rejected by _lookup_hook"
)
start = time.monotonic()
response: httpx.Response | None = None
exc: Exception | None = None
try:
api_key: str | None = (
hook.api_key.get_value(apply_mask=False) if hook.api_key else None
)
headers: dict[str, str] = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
with httpx.Client(
timeout=timeout, follow_redirects=False
) as client: # SSRF guard: never follow redirects
response = client.post(endpoint_url, json=payload, headers=headers)
except Exception as e:
exc = e
duration_ms = int((time.monotonic() - start) * 1000)
outcome = _process_response(response=response, exc=exc, timeout=timeout)
# Validate the response payload against response_type.
# A validation failure downgrades the outcome to a failure so it is logged,
# is_reachable is left unchanged (server responded — just a bad payload),
# and fail_strategy is respected below.
validated_model: T | None = None
if outcome.is_success and outcome.response_payload is not None:
try:
validated_model = response_type.model_validate(outcome.response_payload)
except ValidationError as e:
msg = (
f"Hook response failed validation against {response_type.__name__}: {e}"
)
outcome = _HttpOutcome(
is_success=False,
updated_is_reachable=None, # server responded — reachability unchanged
status_code=outcome.status_code,
error_message=msg,
response_payload=None,
)
# Skip the is_reachable write when the value would not change — avoids a
# no-op DB round-trip on every call when the hook is already in the expected state.
if outcome.updated_is_reachable == current_is_reachable:
outcome = outcome.model_copy(update={"updated_is_reachable": None})
_persist_result(hook_id=hook_id, outcome=outcome, duration_ms=duration_ms)
if not outcome.is_success:
if fail_strategy == HookFailStrategy.HARD:
raise OnyxError(
OnyxErrorCode.HOOK_EXECUTION_FAILED,
outcome.error_message or "Hook execution failed.",
)
logger.warning(
f"Hook execution failed (soft fail) for hook_id={hook_id}: {outcome.error_message}"
)
return HookSoftFailed()
if validated_model is None:
raise OnyxError(
OnyxErrorCode.INTERNAL_ERROR,
f"validated_model is None for successful hook call (hook_id={hook_id})",
)
return validated_model
def _execute_hook_impl(
*,
db_session: Session,
hook_point: HookPoint,
payload: dict[str, Any],
response_type: type[T],
) -> T | HookSkipped | HookSoftFailed:
"""EE implementation — loaded by CE's execute_hook via fetch_versioned_implementation.
Returns HookSkipped if no active hook is configured, HookSoftFailed if the
hook failed with SOFT fail strategy, or a validated response model on success.
Raises OnyxError on HARD failure or if the hook is misconfigured.
"""
hook = _lookup_hook(db_session, hook_point)
if isinstance(hook, HookSkipped):
return hook
fail_strategy = hook.fail_strategy
hook_id = hook.id
try:
return _execute_hook_inner(hook, payload, response_type)
except Exception:
if fail_strategy == HookFailStrategy.SOFT:
logger.exception(
f"Unexpected error in hook execution (soft fail) for hook_id={hook_id}"
)
return HookSoftFailed()
raise

View File

@@ -15,7 +15,6 @@ from ee.onyx.server.enterprise_settings.api import (
basic_router as enterprise_settings_router,
)
from ee.onyx.server.evals.api import router as evals_router
from ee.onyx.server.features.hooks.api import router as hook_router
from ee.onyx.server.license.api import router as license_router
from ee.onyx.server.manage.standard_answer import router as standard_answer_router
from ee.onyx.server.middleware.license_enforcement import (
@@ -139,7 +138,6 @@ def get_application() -> FastAPI:
include_router_with_global_prefix_prepended(application, ee_oauth_router)
include_router_with_global_prefix_prepended(application, ee_document_cc_pair_router)
include_router_with_global_prefix_prepended(application, evals_router)
include_router_with_global_prefix_prepended(application, hook_router)
# Enterprise-only global settings
include_router_with_global_prefix_prepended(

View File

@@ -99,26 +99,6 @@ async def get_or_provision_tenant(
tenant_id = await get_available_tenant()
if tenant_id:
# Run migrations to ensure the pre-provisioned tenant schema is current.
# Pool tenants may have been created before a new migration was deployed.
# Capture as a non-optional local so mypy can type the lambda correctly.
_tenant_id: str = tenant_id
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(
None, lambda: run_alembic_migrations(_tenant_id)
)
except Exception:
# The tenant was already dequeued from the pool — roll it back so
# it doesn't end up orphaned (schema exists, but not assigned to anyone).
logger.exception(
f"Migration failed for pre-provisioned tenant {_tenant_id}; rolling back"
)
try:
await rollback_tenant_provisioning(_tenant_id)
except Exception:
logger.exception(f"Failed to rollback orphaned tenant {_tenant_id}")
raise
# If we have a pre-provisioned tenant, assign it to the user
await assign_tenant_to_user(tenant_id, email, referral_source)
logger.info(f"Assigned pre-provisioned tenant {tenant_id} to user {email}")

View File

@@ -100,7 +100,6 @@ def get_model_app() -> FastAPI:
dsn=SENTRY_DSN,
integrations=[StarletteIntegration(), FastApiIntegration()],
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:

View File

@@ -20,7 +20,6 @@ from sentry_sdk.integrations.celery import CeleryIntegration
from sqlalchemy import text
from sqlalchemy.orm import Session
from onyx import __version__
from onyx.background.celery.apps.task_formatters import CeleryTaskColoredFormatter
from onyx.background.celery.apps.task_formatters import CeleryTaskPlainFormatter
from onyx.background.celery.celery_utils import celery_is_worker_primary
@@ -66,7 +65,6 @@ if SENTRY_DSN:
dsn=SENTRY_DSN,
integrations=[CeleryIntegration()],
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:
@@ -517,8 +515,7 @@ def reset_tenant_id(
def wait_for_vespa_or_shutdown(
sender: Any, # noqa: ARG001
**kwargs: Any, # noqa: ARG001
sender: Any, **kwargs: Any # noqa: ARG001
) -> None: # noqa: ARG001
"""Waits for Vespa to become ready subject to a timeout.
Raises WorkerShutdown if the timeout is reached."""

View File

@@ -317,6 +317,7 @@ celery_app.autodiscover_tasks(
"onyx.background.celery.tasks.docprocessing",
"onyx.background.celery.tasks.evals",
"onyx.background.celery.tasks.hierarchyfetching",
"onyx.background.celery.tasks.hooks",
"onyx.background.celery.tasks.periodic",
"onyx.background.celery.tasks.pruning",
"onyx.background.celery.tasks.shared",

View File

@@ -14,6 +14,7 @@ from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.hooks.utils import HOOKS_AVAILABLE
from shared_configs.configs import MULTI_TENANT
# choosing 15 minutes because it roughly gives us enough time to process many tasks
@@ -361,6 +362,19 @@ if not MULTI_TENANT:
tasks_to_schedule.extend(beat_task_templates)
if HOOKS_AVAILABLE:
tasks_to_schedule.append(
{
"name": "hook-execution-log-cleanup",
"task": OnyxCeleryTask.HOOK_EXECUTION_LOG_CLEANUP_TASK,
"schedule": timedelta(days=1),
"options": {
"priority": OnyxCeleryPriority.LOW,
"expires": BEAT_EXPIRES_DEFAULT,
},
}
)
def generate_cloud_tasks(
beat_tasks: list[dict], beat_templates: list[dict], beat_multiplier: float

View File

@@ -9,7 +9,6 @@ from celery import Celery
from celery import shared_task
from celery import Task
from onyx import __version__
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.memory_monitoring import emit_process_memory
from onyx.background.celery.tasks.docprocessing.heartbeat import start_heartbeat
@@ -138,7 +137,6 @@ def _docfetching_task(
sentry_sdk.init(
dsn=SENTRY_DSN,
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:

View File

@@ -319,11 +319,6 @@ def monitor_indexing_attempt_progress(
)
current_db_time = get_db_current_time(db_session)
total_batches: int | str = (
coordination_status.total_batches
if coordination_status.total_batches is not None
else "?"
)
if coordination_status.found:
task_logger.info(
f"Indexing attempt progress: "
@@ -331,7 +326,7 @@ def monitor_indexing_attempt_progress(
f"cc_pair={attempt.connector_credential_pair_id} "
f"search_settings={attempt.search_settings_id} "
f"completed_batches={coordination_status.completed_batches} "
f"total_batches={total_batches} "
f"total_batches={coordination_status.total_batches or '?'} "
f"total_docs={coordination_status.total_docs} "
f"total_failures={coordination_status.total_failures}"
f"elapsed={(current_db_time - attempt.time_created).seconds}"
@@ -415,7 +410,7 @@ def check_indexing_completion(
logger.info(
f"Indexing status: "
f"indexing_completed={indexing_completed} "
f"batches_processed={batches_processed}/{batches_total if batches_total is not None else '?'} "
f"batches_processed={batches_processed}/{batches_total or '?'} "
f"total_docs={coordination_status.total_docs} "
f"total_chunks={coordination_status.total_chunks} "
f"total_failures={coordination_status.total_failures}"

View File

@@ -1,19 +1,8 @@
import threading
import time
from collections.abc import Callable
from collections.abc import Generator
from queue import Empty
from onyx.chat.citation_processor import CitationMapping
from onyx.chat.emitter import Emitter
from onyx.context.search.models import SearchDoc
from onyx.server.query_and_chat.placement import Placement
from onyx.server.query_and_chat.streaming_models import OverallStop
from onyx.server.query_and_chat.streaming_models import Packet
from onyx.server.query_and_chat.streaming_models import PacketException
from onyx.tools.models import ToolCallInfo
from onyx.utils.threadpool_concurrency import run_in_background
from onyx.utils.threadpool_concurrency import wait_on_background
# Type alias for search doc deduplication key
# Simple key: just document_id (str)
@@ -159,114 +148,3 @@ class ChatStateContainer:
"""Thread-safe getter for emitted citations (returns a copy)."""
with self._lock:
return self._emitted_citations.copy()
def run_chat_loop_with_state_containers(
chat_loop_func: Callable[[Emitter, ChatStateContainer], None],
completion_callback: Callable[[ChatStateContainer], None],
is_connected: Callable[[], bool],
emitter: Emitter,
state_container: ChatStateContainer,
) -> Generator[Packet, None]:
"""
Explicit wrapper function that runs a function in a background thread
with event streaming capabilities.
The wrapped function should accept emitter as first arg and use it to emit
Packet objects. This wrapper polls every 300ms to check if stop signal is set.
Args:
func: The function to wrap (should accept emitter and state_container as first and second args)
completion_callback: Callback function to call when the function completes
emitter: Emitter instance for sending packets
state_container: ChatStateContainer instance for accumulating state
is_connected: Callable that returns False when stop signal is set
Usage:
packets = run_chat_loop_with_state_containers(
my_func,
completion_callback=completion_callback,
emitter=emitter,
state_container=state_container,
is_connected=check_func,
)
for packet in packets:
# Process packets
pass
"""
def run_with_exception_capture() -> None:
try:
chat_loop_func(emitter, state_container)
except Exception as e:
# If execution fails, emit an exception packet
emitter.emit(
Packet(
placement=Placement(turn_index=0),
obj=PacketException(type="error", exception=e),
)
)
# Run the function in a background thread
thread = run_in_background(run_with_exception_capture)
pkt: Packet | None = None
last_turn_index = 0 # Track the highest turn_index seen for stop packet
last_cancel_check = time.monotonic()
cancel_check_interval = 0.3 # Check for cancellation every 300ms
try:
while True:
# Poll queue with 300ms timeout for natural stop signal checking
# the 300ms timeout is to avoid busy-waiting and to allow the stop signal to be checked regularly
try:
pkt = emitter.bus.get(timeout=0.3)
except Empty:
if not is_connected():
# Stop signal detected
yield Packet(
placement=Placement(turn_index=last_turn_index + 1),
obj=OverallStop(type="stop", stop_reason="user_cancelled"),
)
break
last_cancel_check = time.monotonic()
continue
if pkt is not None:
# Track the highest turn_index for the stop packet
if pkt.placement and pkt.placement.turn_index > last_turn_index:
last_turn_index = pkt.placement.turn_index
if isinstance(pkt.obj, OverallStop):
yield pkt
break
elif isinstance(pkt.obj, PacketException):
raise pkt.obj.exception
else:
yield pkt
# Check for cancellation periodically even when packets are flowing
# This ensures stop signal is checked during active streaming
current_time = time.monotonic()
if current_time - last_cancel_check >= cancel_check_interval:
if not is_connected():
# Stop signal detected during streaming
yield Packet(
placement=Placement(turn_index=last_turn_index + 1),
obj=OverallStop(type="stop", stop_reason="user_cancelled"),
)
break
last_cancel_check = current_time
finally:
# Wait for thread to complete on normal exit to propagate exceptions and ensure cleanup.
# Skip waiting if user disconnected to exit quickly.
if is_connected():
wait_on_background(thread)
try:
completion_callback(state_container)
except Exception as e:
emitter.emit(
Packet(
placement=Placement(turn_index=last_turn_index + 1),
obj=PacketException(type="error", exception=e),
)
)

View File

@@ -5,7 +5,6 @@ from typing import cast
from uuid import UUID
from fastapi.datastructures import Headers
from pydantic import BaseModel
from sqlalchemy.orm import Session
from onyx.chat.models import ChatHistoryResult
@@ -52,60 +51,6 @@ logger = setup_logger()
IMAGE_GENERATION_TOOL_NAME = "generate_image"
class FileContextResult(BaseModel):
"""Result of building a file's LLM context representation."""
message: ChatMessageSimple
tool_metadata: FileToolMetadata
def build_file_context(
tool_file_id: str,
filename: str,
file_type: ChatFileType,
content_text: str | None = None,
token_count: int = 0,
approx_char_count: int | None = None,
) -> FileContextResult:
"""Build the LLM context representation for a single file.
Centralises how files should appear in the LLM prompt
— the ID that FileReaderTool accepts (``UserFile.id`` for user files).
"""
if file_type.use_metadata_only():
message_text = (
f"File: {filename} (id={tool_file_id})\n"
"Use the file_reader or python tools to access "
"this file's contents."
)
message = ChatMessageSimple(
message=message_text,
token_count=max(1, len(message_text) // 4),
message_type=MessageType.USER,
file_id=tool_file_id,
)
else:
message_text = f"File: {filename}\n{content_text or ''}\nEnd of File"
message = ChatMessageSimple(
message=message_text,
token_count=token_count,
message_type=MessageType.USER,
file_id=tool_file_id,
)
metadata = FileToolMetadata(
file_id=tool_file_id,
filename=filename,
approx_char_count=(
approx_char_count
if approx_char_count is not None
else len(content_text or "")
),
)
return FileContextResult(message=message, tool_metadata=metadata)
def create_chat_session_from_request(
chat_session_request: ChatSessionCreationRequest,
user_id: UUID | None,
@@ -593,7 +538,7 @@ def convert_chat_history(
for idx, chat_message in enumerate(chat_history):
if chat_message.message_type == MessageType.USER:
# Process files attached to this message
text_files: list[tuple[ChatLoadedFile, FileDescriptor]] = []
text_files: list[ChatLoadedFile] = []
image_files: list[ChatLoadedFile] = []
if chat_message.files:
@@ -604,26 +549,34 @@ def convert_chat_history(
if loaded_file.file_type == ChatFileType.IMAGE:
image_files.append(loaded_file)
else:
# Text files (DOC, PLAIN_TEXT, TABULAR) are added as separate messages
text_files.append((loaded_file, file_descriptor))
# Text files (DOC, PLAIN_TEXT, CSV) are added as separate messages
text_files.append(loaded_file)
# Add text files as separate messages before the user message.
# Each message is tagged with ``file_id`` so that forgotten files
# can be detected after context-window truncation.
for text_file, fd in text_files:
# Use user_file_id as the FileReaderTool accepts that.
# Fall back to the file-store path id.
tool_id = fd.get("user_file_id") or text_file.file_id
filename = text_file.filename or "unknown"
ctx = build_file_context(
tool_file_id=tool_id,
filename=filename,
file_type=text_file.file_type,
content_text=text_file.content_text,
token_count=text_file.token_count,
for text_file in text_files:
file_text = text_file.content_text or ""
filename = text_file.filename
message = (
f"File: {filename}\n{file_text}\nEnd of File"
if filename
else file_text
)
simple_messages.append(
ChatMessageSimple(
message=message,
token_count=text_file.token_count,
message_type=MessageType.USER,
image_files=None,
file_id=text_file.file_id,
)
)
all_injected_file_metadata[text_file.file_id] = FileToolMetadata(
file_id=text_file.file_id,
filename=filename or "unknown",
approx_char_count=len(file_text),
)
simple_messages.append(ctx.message)
all_injected_file_metadata[tool_id] = ctx.tool_metadata
# Sum token counts from image files (excluding project image files)
image_token_count = (

View File

@@ -1,19 +1,40 @@
import threading
from queue import Queue
from onyx.server.query_and_chat.placement import Placement
from onyx.server.query_and_chat.streaming_models import Packet
class Emitter:
"""Use this inside tools to emit arbitrary UI progress."""
"""Routes packets from LLM/tool execution to the ``_run_models`` drain loop.
def __init__(self, bus: Queue):
self.bus = bus
Tags every packet with ``model_index`` and places it on ``merged_queue``
as a ``(model_idx, packet)`` tuple for ordered consumption downstream.
Args:
merged_queue: Shared queue owned by ``_run_models``.
model_idx: Index embedded in packet placements (``0`` for N=1 runs).
drain_done: Optional event set by ``_run_models`` when the drain loop
exits early (e.g. HTTP disconnect). When set, ``emit`` returns
immediately so worker threads can exit fast.
"""
def __init__(
self,
merged_queue: Queue[tuple[int, Packet | Exception | object]],
model_idx: int = 0,
drain_done: threading.Event | None = None,
) -> None:
self._model_idx = model_idx
self._merged_queue = merged_queue
self._drain_done = drain_done
def emit(self, packet: Packet) -> None:
self.bus.put(packet) # Thread-safe
def get_default_emitter() -> Emitter:
bus: Queue[Packet] = Queue()
emitter = Emitter(bus)
return emitter
if self._drain_done is not None and self._drain_done.is_set():
return
base = packet.placement or Placement(turn_index=0)
tagged = Packet(
placement=base.model_copy(update={"model_index": self._model_idx}),
obj=packet.obj,
)
self._merged_queue.put((self._model_idx, tagged))

File diff suppressed because it is too large Load Diff

View File

@@ -1079,6 +1079,7 @@ POD_NAMESPACE = os.environ.get("POD_NAMESPACE")
DEV_MODE = os.environ.get("DEV_MODE", "").lower() == "true"
HOOK_ENABLED = os.environ.get("HOOK_ENABLED", "").lower() == "true"
INTEGRATION_TESTS_MODE = os.environ.get("INTEGRATION_TESTS_MODE", "").lower() == "true"

View File

@@ -212,7 +212,6 @@ class DocumentSource(str, Enum):
PRODUCTBOARD = "productboard"
FILE = "file"
CODA = "coda"
CANVAS = "canvas"
NOTION = "notion"
ZULIP = "zulip"
LINEAR = "linear"
@@ -673,7 +672,6 @@ DocumentSourceDescription: dict[DocumentSource, str] = {
DocumentSource.SLAB: "slab data",
DocumentSource.PRODUCTBOARD: "productboard data (boards, etc.)",
DocumentSource.FILE: "files",
DocumentSource.CANVAS: "canvas lms - courses, pages, assignments, and announcements",
DocumentSource.CODA: "coda - team workspace with docs, tables, and pages",
DocumentSource.NOTION: "notion data - a workspace that combines note-taking, \
project management, and collaboration tools into a single, customizable platform",

View File

@@ -1,32 +0,0 @@
"""
Permissioning / AccessControl logic for Canvas courses.
CE stub — returns None (no permissions). The EE implementation is loaded
at runtime via ``fetch_versioned_implementation``.
"""
from collections.abc import Callable
from typing import cast
from onyx.access.models import ExternalAccess
from onyx.connectors.canvas.client import CanvasApiClient
from onyx.utils.variable_functionality import fetch_versioned_implementation
from onyx.utils.variable_functionality import global_version
def get_course_permissions(
canvas_client: CanvasApiClient,
course_id: int,
) -> ExternalAccess | None:
if not global_version.is_ee_version():
return None
ee_get_course_permissions = cast(
Callable[[CanvasApiClient, int], ExternalAccess | None],
fetch_versioned_implementation(
"onyx.external_permissions.canvas.access",
"get_course_permissions",
),
)
return ee_get_course_permissions(canvas_client, course_id)

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
import logging
import re
from collections.abc import Iterator
from typing import Any
from urllib.parse import urlparse
@@ -191,22 +190,3 @@ class CanvasApiClient:
if clean_endpoint:
final_url += "/" + clean_endpoint
return final_url
def paginate(
self,
endpoint: str,
params: dict[str, Any] | None = None,
) -> Iterator[list[Any]]:
"""Yield each page of results, following Link-header pagination.
Makes the first request with endpoint + params, then follows
next_url from Link headers for subsequent pages.
"""
response, next_url = self.get(endpoint, params=params)
while True:
if not response:
break
yield response
if not next_url:
break
response, next_url = self.get(full_url=next_url)

View File

@@ -1,82 +1,17 @@
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import cast
from typing import Literal
from typing import NoReturn
from typing import TypeAlias
from pydantic import BaseModel
from retry import retry
from typing_extensions import override
from onyx.access.models import ExternalAccess
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.canvas.access import get_course_permissions
from onyx.connectors.canvas.client import CanvasApiClient
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedValidationError
from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TextSection
from onyx.error_handling.exceptions import OnyxError
from onyx.file_processing.html_utils import parse_html_page_basic
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
logger = setup_logger()
def _handle_canvas_api_error(e: OnyxError) -> NoReturn:
"""Map Canvas API errors to connector framework exceptions."""
if e.status_code == 401:
raise CredentialExpiredError(
"Canvas API token is invalid or expired (HTTP 401)."
)
elif e.status_code == 403:
raise InsufficientPermissionsError(
"Canvas API token does not have sufficient permissions (HTTP 403)."
)
elif e.status_code == 429:
raise ConnectorValidationError(
"Canvas rate-limit exceeded (HTTP 429). Please try again later."
)
elif e.status_code >= 500:
raise UnexpectedValidationError(
f"Unexpected Canvas HTTP error (status={e.status_code}): {e}"
)
else:
raise ConnectorValidationError(
f"Canvas API error (status={e.status_code}): {e}"
)
class CanvasCourse(BaseModel):
id: int
name: str | None = None
course_code: str | None = None
created_at: str | None = None
workflow_state: str | None = None
@classmethod
def from_api(cls, payload: dict[str, Any]) -> "CanvasCourse":
return cls(
id=payload["id"],
name=payload.get("name"),
course_code=payload.get("course_code"),
created_at=payload.get("created_at"),
workflow_state=payload.get("workflow_state"),
)
name: str
course_code: str
created_at: str
workflow_state: str
class CanvasPage(BaseModel):
@@ -84,22 +19,10 @@ class CanvasPage(BaseModel):
url: str
title: str
body: str | None = None
created_at: str | None = None
updated_at: str | None = None
created_at: str
updated_at: str
course_id: int
@classmethod
def from_api(cls, payload: dict[str, Any], course_id: int) -> "CanvasPage":
return cls(
page_id=payload["page_id"],
url=payload["url"],
title=payload["title"],
body=payload.get("body"),
created_at=payload.get("created_at"),
updated_at=payload.get("updated_at"),
course_id=course_id,
)
class CanvasAssignment(BaseModel):
id: int
@@ -107,23 +30,10 @@ class CanvasAssignment(BaseModel):
description: str | None = None
html_url: str
course_id: int
created_at: str | None = None
updated_at: str | None = None
created_at: str
updated_at: str
due_at: str | None = None
@classmethod
def from_api(cls, payload: dict[str, Any], course_id: int) -> "CanvasAssignment":
return cls(
id=payload["id"],
name=payload["name"],
description=payload.get("description"),
html_url=payload["html_url"],
course_id=course_id,
created_at=payload.get("created_at"),
updated_at=payload.get("updated_at"),
due_at=payload.get("due_at"),
)
class CanvasAnnouncement(BaseModel):
id: int
@@ -133,17 +43,6 @@ class CanvasAnnouncement(BaseModel):
posted_at: str | None = None
course_id: int
@classmethod
def from_api(cls, payload: dict[str, Any], course_id: int) -> "CanvasAnnouncement":
return cls(
id=payload["id"],
title=payload["title"],
message=payload.get("message"),
html_url=payload["html_url"],
posted_at=payload.get("posted_at"),
course_id=course_id,
)
CanvasStage: TypeAlias = Literal["pages", "assignments", "announcements"]
@@ -173,286 +72,3 @@ class CanvasConnectorCheckpoint(ConnectorCheckpoint):
self.current_course_index += 1
self.stage = "pages"
self.next_url = None
class CanvasConnector(
CheckpointedConnectorWithPermSync[CanvasConnectorCheckpoint],
SlimConnectorWithPermSync,
):
def __init__(
self,
canvas_base_url: str,
batch_size: int = INDEX_BATCH_SIZE,
) -> None:
self.canvas_base_url = canvas_base_url.rstrip("/").removesuffix("/api/v1")
self.batch_size = batch_size
self._canvas_client: CanvasApiClient | None = None
self._course_permissions_cache: dict[int, ExternalAccess | None] = {}
@property
def canvas_client(self) -> CanvasApiClient:
if self._canvas_client is None:
raise ConnectorMissingCredentialError("Canvas")
return self._canvas_client
def _get_course_permissions(self, course_id: int) -> ExternalAccess | None:
"""Get course permissions with caching."""
if course_id not in self._course_permissions_cache:
self._course_permissions_cache[course_id] = get_course_permissions(
canvas_client=self.canvas_client,
course_id=course_id,
)
return self._course_permissions_cache[course_id]
@retry(tries=3, delay=1, backoff=2)
def _list_courses(self) -> list[CanvasCourse]:
"""Fetch all courses accessible to the authenticated user."""
logger.debug("Fetching Canvas courses")
courses: list[CanvasCourse] = []
for page in self.canvas_client.paginate(
"courses", params={"per_page": "100", "state[]": "available"}
):
courses.extend(CanvasCourse.from_api(c) for c in page)
return courses
@retry(tries=3, delay=1, backoff=2)
def _list_pages(self, course_id: int) -> list[CanvasPage]:
"""Fetch all pages for a given course."""
logger.debug(f"Fetching pages for course {course_id}")
pages: list[CanvasPage] = []
for page in self.canvas_client.paginate(
f"courses/{course_id}/pages",
params={"per_page": "100", "include[]": "body", "published": "true"},
):
pages.extend(CanvasPage.from_api(p, course_id=course_id) for p in page)
return pages
@retry(tries=3, delay=1, backoff=2)
def _list_assignments(self, course_id: int) -> list[CanvasAssignment]:
"""Fetch all assignments for a given course."""
logger.debug(f"Fetching assignments for course {course_id}")
assignments: list[CanvasAssignment] = []
for page in self.canvas_client.paginate(
f"courses/{course_id}/assignments",
params={"per_page": "100", "published": "true"},
):
assignments.extend(
CanvasAssignment.from_api(a, course_id=course_id) for a in page
)
return assignments
@retry(tries=3, delay=1, backoff=2)
def _list_announcements(self, course_id: int) -> list[CanvasAnnouncement]:
"""Fetch all announcements for a given course."""
logger.debug(f"Fetching announcements for course {course_id}")
announcements: list[CanvasAnnouncement] = []
for page in self.canvas_client.paginate(
"announcements",
params={
"per_page": "100",
"context_codes[]": f"course_{course_id}",
"active_only": "true",
},
):
announcements.extend(
CanvasAnnouncement.from_api(a, course_id=course_id) for a in page
)
return announcements
def _build_document(
self,
doc_id: str,
link: str,
text: str,
semantic_identifier: str,
doc_updated_at: datetime | None,
course_id: int,
doc_type: str,
) -> Document:
"""Build a Document with standard Canvas fields."""
return Document(
id=doc_id,
sections=cast(
list[TextSection | ImageSection],
[TextSection(link=link, text=text)],
),
source=DocumentSource.CANVAS,
semantic_identifier=semantic_identifier,
doc_updated_at=doc_updated_at,
metadata={"course_id": str(course_id), "type": doc_type},
)
def _convert_page_to_document(self, page: CanvasPage) -> Document:
"""Convert a Canvas page to a Document."""
link = f"{self.canvas_base_url}/courses/{page.course_id}/pages/{page.url}"
text_parts = [page.title]
body_text = parse_html_page_basic(page.body) if page.body else ""
if body_text:
text_parts.append(body_text)
doc_updated_at = (
datetime.fromisoformat(page.updated_at.replace("Z", "+00:00")).astimezone(
timezone.utc
)
if page.updated_at
else None
)
document = self._build_document(
doc_id=f"canvas-page-{page.course_id}-{page.page_id}",
link=link,
text="\n\n".join(text_parts),
semantic_identifier=page.title or f"Page {page.page_id}",
doc_updated_at=doc_updated_at,
course_id=page.course_id,
doc_type="page",
)
return document
def _convert_assignment_to_document(self, assignment: CanvasAssignment) -> Document:
"""Convert a Canvas assignment to a Document."""
text_parts = [assignment.name]
desc_text = (
parse_html_page_basic(assignment.description)
if assignment.description
else ""
)
if desc_text:
text_parts.append(desc_text)
if assignment.due_at:
due_dt = datetime.fromisoformat(
assignment.due_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
text_parts.append(f"Due: {due_dt.strftime('%B %d, %Y %H:%M UTC')}")
doc_updated_at = (
datetime.fromisoformat(
assignment.updated_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
if assignment.updated_at
else None
)
document = self._build_document(
doc_id=f"canvas-assignment-{assignment.course_id}-{assignment.id}",
link=assignment.html_url,
text="\n\n".join(text_parts),
semantic_identifier=assignment.name or f"Assignment {assignment.id}",
doc_updated_at=doc_updated_at,
course_id=assignment.course_id,
doc_type="assignment",
)
return document
def _convert_announcement_to_document(
self, announcement: CanvasAnnouncement
) -> Document:
"""Convert a Canvas announcement to a Document."""
text_parts = [announcement.title]
msg_text = (
parse_html_page_basic(announcement.message) if announcement.message else ""
)
if msg_text:
text_parts.append(msg_text)
doc_updated_at = (
datetime.fromisoformat(
announcement.posted_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
if announcement.posted_at
else None
)
document = self._build_document(
doc_id=f"canvas-announcement-{announcement.course_id}-{announcement.id}",
link=announcement.html_url,
text="\n\n".join(text_parts),
semantic_identifier=announcement.title or f"Announcement {announcement.id}",
doc_updated_at=doc_updated_at,
course_id=announcement.course_id,
doc_type="announcement",
)
return document
@override
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
"""Load and validate Canvas credentials."""
access_token = credentials.get("canvas_access_token")
if not access_token:
raise ConnectorMissingCredentialError("Canvas")
try:
client = CanvasApiClient(
bearer_token=access_token,
canvas_base_url=self.canvas_base_url,
)
client.get("courses", params={"per_page": "1"})
except ValueError as e:
raise ConnectorValidationError(f"Invalid Canvas base URL: {e}")
except OnyxError as e:
_handle_canvas_api_error(e)
self._canvas_client = client
return None
@override
def validate_connector_settings(self) -> None:
"""Validate Canvas connector settings by testing API access."""
try:
self.canvas_client.get("courses", params={"per_page": "1"})
logger.info("Canvas connector settings validated successfully")
except OnyxError as e:
_handle_canvas_api_error(e)
except ConnectorMissingCredentialError:
raise
except Exception as exc:
raise UnexpectedValidationError(
f"Unexpected error during Canvas settings validation: {exc}"
)
@override
def load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def load_from_checkpoint_with_perm_sync(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def build_dummy_checkpoint(self) -> CanvasConnectorCheckpoint:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def validate_checkpoint_json(
self, checkpoint_json: str
) -> CanvasConnectorCheckpoint:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
# TODO(benwu408): implemented in PR4 (perm sync)
raise NotImplementedError

View File

@@ -11,13 +11,11 @@ from discord import Client
from discord.channel import TextChannel
from discord.channel import Thread
from discord.enums import MessageType
from discord.errors import LoginFailure
from discord.flags import Intents
from discord.message import Message as DiscordMessage
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.exceptions import CredentialInvalidError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
@@ -211,19 +209,8 @@ def _manage_async_retrieval(
intents = Intents.default()
intents.message_content = True
async with Client(intents=intents) as discord_client:
start_task = asyncio.create_task(discord_client.start(token))
ready_task = asyncio.create_task(discord_client.wait_until_ready())
done, _ = await asyncio.wait(
{start_task, ready_task},
return_when=asyncio.FIRST_COMPLETED,
)
# start() runs indefinitely once connected, so it only lands
# in `done` when login/connection failed — propagate the error.
if start_task in done:
ready_task.cancel()
start_task.result()
asyncio.create_task(discord_client.start(token))
await discord_client.wait_until_ready()
filtered_channels: list[TextChannel] = await _fetch_filtered_channels(
discord_client=discord_client,
@@ -289,19 +276,6 @@ class DiscordConnector(PollConnector, LoadConnector):
self._discord_bot_token = credentials["discord_bot_token"]
return None
def validate_connector_settings(self) -> None:
loop = asyncio.new_event_loop()
try:
client = Client(intents=Intents.default())
try:
loop.run_until_complete(client.login(self.discord_bot_token))
except LoginFailure as e:
raise CredentialInvalidError(f"Invalid Discord bot token: {e}")
finally:
loop.run_until_complete(client.close())
finally:
loop.close()
def _manage_doc_batching(
self,
start: datetime | None = None,

View File

@@ -8,6 +8,7 @@ from collections.abc import Generator
from collections.abc import Iterator
from datetime import datetime
from enum import Enum
from functools import partial
from typing import Any
from typing import cast
from typing import Protocol
@@ -42,9 +43,6 @@ from onyx.connectors.google_drive.file_retrieval import (
get_all_files_in_my_drive_and_shared,
)
from onyx.connectors.google_drive.file_retrieval import get_external_access_for_folder
from onyx.connectors.google_drive.file_retrieval import (
get_files_by_web_view_links_batch,
)
from onyx.connectors.google_drive.file_retrieval import get_files_in_shared_drive
from onyx.connectors.google_drive.file_retrieval import get_folder_metadata
from onyx.connectors.google_drive.file_retrieval import get_root_folder_id
@@ -73,13 +71,11 @@ from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import NormalizationResult
from onyx.connectors.interfaces import Resolver
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import EntityFailure
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import SlimDocument
@@ -207,9 +203,7 @@ class DriveIdStatus(Enum):
class GoogleDriveConnector(
SlimConnectorWithPermSync,
CheckpointedConnectorWithPermSync[GoogleDriveCheckpoint],
Resolver,
SlimConnectorWithPermSync, CheckpointedConnectorWithPermSync[GoogleDriveCheckpoint]
):
def __init__(
self,
@@ -1493,113 +1487,134 @@ class GoogleDriveConnector(
end=end,
)
def _convert_retrieved_files_to_documents(
def _extract_docs_from_google_drive(
self,
drive_files_iter: Iterator[RetrievedDriveFile],
checkpoint: GoogleDriveCheckpoint,
start: SecondsSinceUnixEpoch | None,
end: SecondsSinceUnixEpoch | None,
include_permissions: bool,
) -> Iterator[Document | ConnectorFailure | HierarchyNode]:
"""
Converts retrieved files to documents, yielding HierarchyNode
objects for ancestor folders before the converted documents.
Retrieves and converts Google Drive files to documents.
Also yields HierarchyNode objects for ancestor folders.
"""
permission_sync_context = (
PermissionSyncContext(
primary_admin_email=self.primary_admin_email,
google_domain=self.google_domain,
)
if include_permissions
else None
field_type = (
DriveFileFieldType.WITH_PERMISSIONS
if include_permissions or self.exclude_domain_link_only
else DriveFileFieldType.STANDARD
)
files_batch: list[RetrievedDriveFile] = []
for retrieved_file in drive_files_iter:
if self.exclude_domain_link_only and has_link_only_permission(
retrieved_file.drive_file
):
continue
if retrieved_file.error is None:
files_batch.append(retrieved_file)
continue
failure_stage = retrieved_file.completion_stage.value
failure_message = f"retrieval failure during stage: {failure_stage},"
failure_message += f"user: {retrieved_file.user_email},"
failure_message += f"parent drive/folder: {retrieved_file.parent_id},"
failure_message += f"error: {retrieved_file.error}"
logger.error(failure_message)
yield ConnectorFailure(
failed_entity=EntityFailure(
entity_id=retrieved_file.drive_file.get("id", failure_stage),
),
failure_message=failure_message,
exception=retrieved_file.error,
)
new_ancestors = self._get_new_ancestors_for_files(
files=files_batch,
seen_hierarchy_node_raw_ids=checkpoint.seen_hierarchy_node_raw_ids,
fully_walked_hierarchy_node_raw_ids=checkpoint.fully_walked_hierarchy_node_raw_ids,
permission_sync_context=permission_sync_context,
add_prefix=True,
)
if new_ancestors:
logger.debug(f"Yielding {len(new_ancestors)} new hierarchy nodes")
yield from new_ancestors
func_with_args = [
(
self._convert_retrieved_file_to_document,
(retrieved_file, permission_sync_context),
)
for retrieved_file in files_batch
]
raw_results = cast(
list[Document | ConnectorFailure | None],
run_functions_tuples_in_parallel(func_with_args, max_workers=8),
)
results: list[Document | ConnectorFailure] = [
r for r in raw_results if r is not None
]
logger.debug(f"batch has {len(results)} docs or failures")
yield from results
checkpoint.retrieved_folder_and_drive_ids = self._retrieved_folder_and_drive_ids
def _convert_retrieved_file_to_document(
self,
retrieved_file: RetrievedDriveFile,
permission_sync_context: PermissionSyncContext | None,
) -> Document | ConnectorFailure | None:
"""
Converts a single retrieved file to a document.
"""
try:
return convert_drive_item_to_document(
# Build permission sync context if needed
permission_sync_context = (
PermissionSyncContext(
primary_admin_email=self.primary_admin_email,
google_domain=self.google_domain,
)
if include_permissions
else None
)
# Prepare a partial function with the credentials and admin email
convert_func = partial(
convert_drive_item_to_document,
self.creds,
self.allow_images,
self.size_threshold,
permission_sync_context,
[retrieved_file.user_email, self.primary_admin_email]
+ get_file_owners(retrieved_file.drive_file, self.primary_admin_email),
retrieved_file.drive_file,
)
# Fetch files in batches
batches_complete = 0
files_batch: list[RetrievedDriveFile] = []
def _yield_batch(
files_batch: list[RetrievedDriveFile],
) -> Iterator[Document | ConnectorFailure | HierarchyNode]:
nonlocal batches_complete
# First, yield any new ancestor hierarchy nodes
new_ancestors = self._get_new_ancestors_for_files(
files=files_batch,
seen_hierarchy_node_raw_ids=checkpoint.seen_hierarchy_node_raw_ids,
fully_walked_hierarchy_node_raw_ids=checkpoint.fully_walked_hierarchy_node_raw_ids,
permission_sync_context=permission_sync_context,
add_prefix=True, # Indexing path - prefix here
)
if new_ancestors:
logger.debug(
f"Yielding {len(new_ancestors)} new hierarchy nodes for batch {batches_complete}"
)
yield from new_ancestors
# Process the batch using run_functions_tuples_in_parallel
func_with_args = [
(
convert_func,
(
[file.user_email, self.primary_admin_email]
+ get_file_owners(
file.drive_file, self.primary_admin_email
),
file.drive_file,
),
)
for file in files_batch
]
results = cast(
list[Document | ConnectorFailure | None],
run_functions_tuples_in_parallel(func_with_args, max_workers=8),
)
logger.debug(
f"finished processing batch {batches_complete} with {len(results)} results"
)
docs_and_failures = [result for result in results if result is not None]
logger.debug(
f"batch {batches_complete} has {len(docs_and_failures)} docs or failures"
)
if docs_and_failures:
yield from docs_and_failures
batches_complete += 1
logger.debug(f"finished yielding batch {batches_complete}")
for retrieved_file in self._fetch_drive_items(
field_type=field_type,
checkpoint=checkpoint,
start=start,
end=end,
):
if self.exclude_domain_link_only and has_link_only_permission(
retrieved_file.drive_file
):
continue
if retrieved_file.error is None:
files_batch.append(retrieved_file)
continue
# handle retrieval errors
failure_stage = retrieved_file.completion_stage.value
failure_message = f"retrieval failure during stage: {failure_stage},"
failure_message += f"user: {retrieved_file.user_email},"
failure_message += f"parent drive/folder: {retrieved_file.parent_id},"
failure_message += f"error: {retrieved_file.error}"
logger.error(failure_message)
yield ConnectorFailure(
failed_entity=EntityFailure(
entity_id=failure_stage,
),
failure_message=failure_message,
exception=retrieved_file.error,
)
yield from _yield_batch(files_batch)
checkpoint.retrieved_folder_and_drive_ids = (
self._retrieved_folder_and_drive_ids
)
except Exception as e:
logger.exception(
f"Error extracting document: "
f"{retrieved_file.drive_file.get('name')} from Google Drive"
)
return ConnectorFailure(
failed_entity=EntityFailure(
entity_id=retrieved_file.drive_file.get("id", "unknown"),
),
failure_message=(
f"Error extracting document: "
f"{retrieved_file.drive_file.get('name')}"
),
exception=e,
)
logger.exception(f"Error extracting documents from Google Drive: {e}")
raise e
def _load_from_checkpoint(
self,
@@ -1623,19 +1638,8 @@ class GoogleDriveConnector(
checkpoint = copy.deepcopy(checkpoint)
self._retrieved_folder_and_drive_ids = checkpoint.retrieved_folder_and_drive_ids
try:
field_type = (
DriveFileFieldType.WITH_PERMISSIONS
if include_permissions or self.exclude_domain_link_only
else DriveFileFieldType.STANDARD
)
drive_files_iter = self._fetch_drive_items(
field_type=field_type,
checkpoint=checkpoint,
start=start,
end=end,
)
yield from self._convert_retrieved_files_to_documents(
drive_files_iter, checkpoint, include_permissions
yield from self._extract_docs_from_google_drive(
checkpoint, start, end, include_permissions
)
except Exception as e:
if MISSING_SCOPES_ERROR_STR in str(e):
@@ -1672,82 +1676,6 @@ class GoogleDriveConnector(
start, end, checkpoint, include_permissions=True
)
@override
def resolve_errors(
self,
errors: list[ConnectorFailure],
include_permissions: bool = False,
) -> Generator[Document | ConnectorFailure | HierarchyNode, None, None]:
if self._creds is None or self._primary_admin_email is None:
raise RuntimeError(
"Credentials missing, should not call this method before calling load_credentials"
)
logger.info(f"Resolving {len(errors)} errors")
doc_ids = [
failure.failed_document.document_id
for failure in errors
if failure.failed_document
]
service = get_drive_service(self.creds, self.primary_admin_email)
field_type = (
DriveFileFieldType.WITH_PERMISSIONS
if include_permissions or self.exclude_domain_link_only
else DriveFileFieldType.STANDARD
)
batch_result = get_files_by_web_view_links_batch(service, doc_ids, field_type)
for doc_id, error in batch_result.errors.items():
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=doc_id,
document_link=doc_id,
),
failure_message=f"Failed to retrieve file during error resolution: {error}",
exception=error,
)
permission_sync_context = (
PermissionSyncContext(
primary_admin_email=self.primary_admin_email,
google_domain=self.google_domain,
)
if include_permissions
else None
)
retrieved_files = [
RetrievedDriveFile(
drive_file=file,
user_email=self.primary_admin_email,
completion_stage=DriveRetrievalStage.DONE,
)
for file in batch_result.files.values()
]
yield from self._get_new_ancestors_for_files(
files=retrieved_files,
seen_hierarchy_node_raw_ids=ThreadSafeSet(),
fully_walked_hierarchy_node_raw_ids=ThreadSafeSet(),
permission_sync_context=permission_sync_context,
add_prefix=True,
)
func_with_args = [
(
self._convert_retrieved_file_to_document,
(rf, permission_sync_context),
)
for rf in retrieved_files
]
results = cast(
list[Document | ConnectorFailure | None],
run_functions_tuples_in_parallel(func_with_args, max_workers=8),
)
for result in results:
if result is not None:
yield result
def _extract_slim_docs_from_google_drive(
self,
checkpoint: GoogleDriveCheckpoint,

View File

@@ -4,12 +4,9 @@ from datetime import datetime
from datetime import timezone
from enum import Enum
from typing import cast
from urllib.parse import parse_qs
from urllib.parse import urlparse
from googleapiclient.discovery import Resource # type: ignore
from googleapiclient.errors import HttpError # type: ignore
from googleapiclient.http import BatchHttpRequest # type: ignore
from onyx.access.models import ExternalAccess
from onyx.connectors.google_drive.constants import DRIVE_FOLDER_TYPE
@@ -61,8 +58,6 @@ SLIM_FILE_FIELDS = (
)
FOLDER_FIELDS = "nextPageToken, files(id, name, permissions, modifiedTime, webViewLink, shortcutDetails)"
MAX_BATCH_SIZE = 100
HIERARCHY_FIELDS = "id, name, parents, webViewLink, mimeType, driveId"
HIERARCHY_FIELDS_WITH_PERMISSIONS = (
@@ -219,7 +214,7 @@ def get_external_access_for_folder(
def _get_fields_for_file_type(field_type: DriveFileFieldType) -> str:
"""Get the appropriate fields string for files().list() based on the field type enum."""
"""Get the appropriate fields string based on the field type enum"""
if field_type == DriveFileFieldType.SLIM:
return SLIM_FILE_FIELDS
elif field_type == DriveFileFieldType.WITH_PERMISSIONS:
@@ -228,25 +223,6 @@ def _get_fields_for_file_type(field_type: DriveFileFieldType) -> str:
return FILE_FIELDS
def _extract_single_file_fields(list_fields: str) -> str:
"""Convert a files().list() fields string to one suitable for files().get().
List fields look like "nextPageToken, files(field1, field2, ...)"
Single-file fields should be just "field1, field2, ..."
"""
start = list_fields.find("files(")
if start == -1:
return list_fields
inner_start = start + len("files(")
inner_end = list_fields.rfind(")")
return list_fields[inner_start:inner_end]
def _get_single_file_fields(field_type: DriveFileFieldType) -> str:
"""Get the appropriate fields string for files().get() based on the field type enum."""
return _extract_single_file_fields(_get_fields_for_file_type(field_type))
def _get_files_in_parent(
service: Resource,
parent_id: str,
@@ -520,112 +496,3 @@ def get_root_folder_id(service: Resource) -> str:
.get(fileId="root", fields=GoogleFields.ID.value)
.execute()[GoogleFields.ID.value]
)
def _extract_file_id_from_web_view_link(web_view_link: str) -> str:
parsed = urlparse(web_view_link)
path_parts = [part for part in parsed.path.split("/") if part]
if "d" in path_parts:
idx = path_parts.index("d")
if idx + 1 < len(path_parts):
return path_parts[idx + 1]
query_params = parse_qs(parsed.query)
for key in ("id", "fileId"):
value = query_params.get(key)
if value and value[0]:
return value[0]
raise ValueError(
f"Unable to extract Drive file id from webViewLink: {web_view_link}"
)
def get_file_by_web_view_link(
service: GoogleDriveService,
web_view_link: str,
fields: str,
) -> GoogleDriveFileType:
"""Retrieve a Google Drive file using its webViewLink."""
file_id = _extract_file_id_from_web_view_link(web_view_link)
return (
service.files()
.get(
fileId=file_id,
supportsAllDrives=True,
fields=fields,
)
.execute()
)
class BatchRetrievalResult:
"""Result of a batch file retrieval, separating successes from errors."""
def __init__(self) -> None:
self.files: dict[str, GoogleDriveFileType] = {}
self.errors: dict[str, Exception] = {}
def get_files_by_web_view_links_batch(
service: GoogleDriveService,
web_view_links: list[str],
field_type: DriveFileFieldType,
) -> BatchRetrievalResult:
"""Retrieve multiple Google Drive files by webViewLink using the batch API.
Returns a BatchRetrievalResult containing successful file retrievals
and errors for any files that could not be fetched.
Automatically splits into chunks of MAX_BATCH_SIZE.
"""
fields = _get_single_file_fields(field_type)
if len(web_view_links) <= MAX_BATCH_SIZE:
return _get_files_by_web_view_links_batch(service, web_view_links, fields)
combined = BatchRetrievalResult()
for i in range(0, len(web_view_links), MAX_BATCH_SIZE):
chunk = web_view_links[i : i + MAX_BATCH_SIZE]
chunk_result = _get_files_by_web_view_links_batch(service, chunk, fields)
combined.files.update(chunk_result.files)
combined.errors.update(chunk_result.errors)
return combined
def _get_files_by_web_view_links_batch(
service: GoogleDriveService,
web_view_links: list[str],
fields: str,
) -> BatchRetrievalResult:
"""Single-batch implementation."""
result = BatchRetrievalResult()
def callback(
request_id: str,
response: GoogleDriveFileType,
exception: Exception | None,
) -> None:
if exception:
logger.warning(f"Error retrieving file {request_id}: {exception}")
result.errors[request_id] = exception
else:
result.files[request_id] = response
batch = cast(BatchHttpRequest, service.new_batch_http_request(callback=callback))
for web_view_link in web_view_links:
try:
file_id = _extract_file_id_from_web_view_link(web_view_link)
request = service.files().get(
fileId=file_id,
supportsAllDrives=True,
fields=fields,
)
batch.add(request, request_id=web_view_link)
except ValueError as e:
logger.warning(f"Failed to extract file ID from {web_view_link}: {e}")
result.errors[web_view_link] = e
batch.execute()
return result

View File

@@ -298,22 +298,6 @@ class CheckpointedConnectorWithPermSync(CheckpointedConnector[CT]):
raise NotImplementedError
class Resolver(BaseConnector):
@abc.abstractmethod
def resolve_errors(
self,
errors: list[ConnectorFailure],
include_permissions: bool = False,
) -> Generator[Document | ConnectorFailure | HierarchyNode, None, None]:
"""Attempts to yield back ALL the documents described by the errors, no checkpointing.
Caller's responsibility is to delete the old ConnectorFailures and replace with the new ones.
If include_permissions is True, the documents will have permissions synced.
May also yield HierarchyNode objects for ancestor folders of resolved documents.
"""
raise NotImplementedError
class HierarchyConnector(BaseConnector):
@abc.abstractmethod
def load_hierarchy(

View File

@@ -72,10 +72,6 @@ CONNECTOR_CLASS_MAP = {
module_path="onyx.connectors.coda.connector",
class_name="CodaConnector",
),
DocumentSource.CANVAS: ConnectorMapping(
module_path="onyx.connectors.canvas.connector",
class_name="CanvasConnector",
),
DocumentSource.NOTION: ConnectorMapping(
module_path="onyx.connectors.notion.connector",
class_name="NotionConnector",

View File

@@ -8,6 +8,7 @@ from uuid import UUID
from fastapi import HTTPException
from sqlalchemy import delete
from sqlalchemy import desc
from sqlalchemy import exists
from sqlalchemy import func
from sqlalchemy import nullsfirst
from sqlalchemy import or_
@@ -131,47 +132,32 @@ def get_chat_sessions_by_user(
if before is not None:
stmt = stmt.where(ChatSession.time_updated < before)
if limit:
stmt = stmt.limit(limit)
if project_id is not None:
stmt = stmt.where(ChatSession.project_id == project_id)
elif only_non_project_chats:
stmt = stmt.where(ChatSession.project_id.is_(None))
# When filtering out failed chats, we apply the limit in Python after
# filtering rather than in SQL, since the post-filter may remove rows.
if limit and include_failed_chats:
stmt = stmt.limit(limit)
if not include_failed_chats:
non_system_message_exists_subq = (
exists()
.where(ChatMessage.chat_session_id == ChatSession.id)
.where(ChatMessage.message_type != MessageType.SYSTEM)
.correlate(ChatSession)
)
# Leeway for newly created chats that don't have messages yet
time = datetime.now(timezone.utc) - timedelta(minutes=5)
recently_created = ChatSession.time_created >= time
stmt = stmt.where(or_(non_system_message_exists_subq, recently_created))
result = db_session.execute(stmt)
chat_sessions = list(result.scalars().all())
chat_sessions = result.scalars().all()
if not include_failed_chats and chat_sessions:
# Filter out "failed" sessions (those with only SYSTEM messages)
# using a separate efficient query instead of a correlated EXISTS
# subquery, which causes full sequential scans of chat_message.
leeway = datetime.now(timezone.utc) - timedelta(minutes=5)
session_ids = [cs.id for cs in chat_sessions if cs.time_created < leeway]
if session_ids:
valid_session_ids_stmt = (
select(ChatMessage.chat_session_id)
.where(ChatMessage.chat_session_id.in_(session_ids))
.where(ChatMessage.message_type != MessageType.SYSTEM)
.distinct()
)
valid_session_ids = set(
db_session.execute(valid_session_ids_stmt).scalars().all()
)
chat_sessions = [
cs
for cs in chat_sessions
if cs.time_created >= leeway or cs.id in valid_session_ids
]
if limit:
chat_sessions = chat_sessions[:limit]
return chat_sessions
return list(chat_sessions)
def delete_orphaned_search_docs(db_session: Session) -> None:
@@ -631,6 +617,92 @@ def reserve_message_id(
return empty_message
def reserve_multi_model_message_ids(
db_session: Session,
chat_session_id: UUID,
parent_message_id: int,
model_display_names: list[str],
) -> list[ChatMessage]:
"""Reserve N assistant message placeholders for multi-model parallel streaming.
All messages share the same parent (the user message). The parent's
latest_child_message_id points to the LAST reserved message so that the
default history-chain walker picks it up.
"""
reserved: list[ChatMessage] = []
for display_name in model_display_names:
msg = ChatMessage(
chat_session_id=chat_session_id,
parent_message_id=parent_message_id,
latest_child_message_id=None,
message="Response was terminated prior to completion, try regenerating.",
token_count=15, # placeholder; updated on completion by llm_loop_completion_handle
message_type=MessageType.ASSISTANT,
model_display_name=display_name,
)
db_session.add(msg)
reserved.append(msg)
# Flush to assign IDs without committing yet
db_session.flush()
# Point parent's latest_child to the last reserved message
parent = (
db_session.query(ChatMessage)
.filter(ChatMessage.id == parent_message_id)
.first()
)
if parent:
parent.latest_child_message_id = reserved[-1].id
db_session.commit()
return reserved
def set_preferred_response(
db_session: Session,
user_message_id: int,
preferred_assistant_message_id: int,
) -> None:
"""Mark one assistant response as the user's preferred choice in a multi-model turn.
Also advances ``latest_child_message_id`` so the preferred response becomes
the active branch for any subsequent messages in the conversation.
Args:
db_session: Active database session.
user_message_id: Primary key of the ``USER``-type ``ChatMessage`` whose
preferred response is being set.
preferred_assistant_message_id: Primary key of the ``ASSISTANT``-type
``ChatMessage`` to prefer. Must be a direct child of ``user_message_id``.
Raises:
ValueError: If either message is not found, if ``user_message_id`` does not
refer to a USER message, or if the assistant message is not a direct child
of the user message.
"""
user_msg = db_session.get(ChatMessage, user_message_id)
if user_msg is None:
raise ValueError(f"User message {user_message_id} not found")
if user_msg.message_type != MessageType.USER:
raise ValueError(f"Message {user_message_id} is not a user message")
assistant_msg = db_session.get(ChatMessage, preferred_assistant_message_id)
if assistant_msg is None:
raise ValueError(
f"Assistant message {preferred_assistant_message_id} not found"
)
if assistant_msg.parent_message_id != user_message_id:
raise ValueError(
f"Assistant message {preferred_assistant_message_id} is not a child "
f"of user message {user_message_id}"
)
user_msg.preferred_response_id = preferred_assistant_message_id
user_msg.latest_child_message_id = preferred_assistant_message_id
db_session.commit()
def create_new_chat_message(
chat_session_id: UUID,
parent_message: ChatMessage,
@@ -853,6 +925,8 @@ def translate_db_message_to_chat_message_detail(
error=chat_message.error,
current_feedback=current_feedback,
processing_duration_seconds=chat_message.processing_duration_seconds,
preferred_response_id=chat_message.preferred_response_id,
model_display_name=chat_message.model_display_name,
)
return chat_msg_detail

View File

@@ -215,7 +215,6 @@ class UserFileStatus(str, PyEnum):
PROCESSING = "PROCESSING"
INDEXING = "INDEXING"
COMPLETED = "COMPLETED"
SKIPPED = "SKIPPED"
FAILED = "FAILED"
CANCELED = "CANCELED"
DELETING = "DELETING"

View File

@@ -7,7 +7,6 @@ from fastapi import HTTPException
from fastapi import UploadFile
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from sqlalchemy import func
from sqlalchemy.orm import Session
from starlette.background import BackgroundTasks
@@ -18,7 +17,6 @@ from onyx.configs.constants import FileOrigin
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.enums import UserFileStatus
from onyx.db.models import Project__UserFile
from onyx.db.models import User
from onyx.db.models import UserFile
@@ -36,19 +34,9 @@ class CategorizedFilesResult(BaseModel):
user_files: list[UserFile]
rejected_files: list[RejectedFile]
id_to_temp_id: dict[str, str]
# Filenames that should be stored but not indexed.
skip_indexing_filenames: set[str] = Field(default_factory=set)
# Allow SQLAlchemy ORM models inside this result container
model_config = ConfigDict(arbitrary_types_allowed=True)
@property
def indexable_files(self) -> list[UserFile]:
return [
uf
for uf in self.user_files
if (uf.name or "") not in self.skip_indexing_filenames
]
def build_hashed_file_key(file: UploadFile) -> str:
name_prefix = (file.filename or "")[:50]
@@ -82,7 +70,6 @@ def create_user_files(
)
if new_temp_id is not None:
id_to_temp_id[str(new_id)] = new_temp_id
should_skip = (file.filename or "") in categorized_files.skip_indexing
new_file = UserFile(
id=new_id,
user_id=user.id,
@@ -94,7 +81,6 @@ def create_user_files(
link_url=link_url,
content_type=file.content_type,
file_type=file.content_type,
status=UserFileStatus.SKIPPED if should_skip else UserFileStatus.PROCESSING,
last_accessed_at=datetime.datetime.now(datetime.timezone.utc),
)
# Persist the UserFile first to satisfy FK constraints for association table
@@ -112,7 +98,6 @@ def create_user_files(
user_files=user_files,
rejected_files=rejected_files,
id_to_temp_id=id_to_temp_id,
skip_indexing_filenames=categorized_files.skip_indexing,
)
@@ -138,7 +123,6 @@ def upload_files_to_user_files_with_indexing(
user_files = categorized_files_result.user_files
rejected_files = categorized_files_result.rejected_files
id_to_temp_id = categorized_files_result.id_to_temp_id
indexable_files = categorized_files_result.indexable_files
# Trigger per-file processing immediately for the current tenant
tenant_id = get_current_tenant_id()
for rejected_file in rejected_files:
@@ -150,12 +134,12 @@ def upload_files_to_user_files_with_indexing(
from onyx.background.task_utils import drain_processing_loop
background_tasks.add_task(drain_processing_loop, tenant_id)
for user_file in indexable_files:
for user_file in user_files:
logger.info(f"Queued in-process processing for user_file_id={user_file.id}")
else:
from onyx.background.celery.versioned_apps.client import app as client_app
for user_file in indexable_files:
for user_file in user_files:
task = client_app.send_task(
OnyxCeleryTask.PROCESS_SINGLE_USER_FILE,
kwargs={"user_file_id": user_file.id, "tenant_id": tenant_id},
@@ -171,7 +155,6 @@ def upload_files_to_user_files_with_indexing(
user_files=user_files,
rejected_files=rejected_files,
id_to_temp_id=id_to_temp_id,
skip_indexing_filenames=categorized_files_result.skip_indexing_filenames,
)

View File

@@ -932,7 +932,7 @@ class OpenSearchIndexClient(OpenSearchClient):
def search_for_document_ids(
self,
body: dict[str, Any],
search_type: OpenSearchSearchType = OpenSearchSearchType.UNKNOWN,
search_type: OpenSearchSearchType = OpenSearchSearchType.DOCUMENT_IDS,
) -> list[str]:
"""Searches the index and returns only document chunk IDs.

View File

@@ -60,7 +60,8 @@ class OpenSearchSearchType(str, Enum):
KEYWORD = "keyword"
SEMANTIC = "semantic"
RANDOM = "random"
DOC_ID_RETRIEVAL = "doc_id_retrieval"
ID_RETRIEVAL = "id_retrieval"
DOCUMENT_IDS = "document_ids"
UNKNOWN = "unknown"

View File

@@ -928,7 +928,7 @@ class OpenSearchDocumentIndex(DocumentIndex):
search_hits = self._client.search(
body=query_body,
search_pipeline_id=None,
search_type=OpenSearchSearchType.DOC_ID_RETRIEVAL,
search_type=OpenSearchSearchType.ID_RETRIEVAL,
)
inference_chunks_uncleaned: list[InferenceChunkUncleaned] = [
_convert_retrieved_opensearch_chunk_to_inference_chunk_uncleaned(

View File

@@ -15,7 +15,6 @@ PLAIN_TEXT_MIME_TYPE = "text/plain"
class OnyxMimeTypes:
IMAGE_MIME_TYPES = {"image/jpg", "image/jpeg", "image/png", "image/webp"}
CSV_MIME_TYPES = {"text/csv"}
TABULAR_MIME_TYPES = CSV_MIME_TYPES | {SPREADSHEET_MIME_TYPE}
TEXT_MIME_TYPES = {
PLAIN_TEXT_MIME_TYPE,
"text/markdown",
@@ -35,12 +34,13 @@ class OnyxMimeTypes:
PDF_MIME_TYPE,
WORD_PROCESSING_MIME_TYPE,
PRESENTATION_MIME_TYPE,
SPREADSHEET_MIME_TYPE,
"message/rfc822",
"application/epub+zip",
}
ALLOWED_MIME_TYPES = IMAGE_MIME_TYPES.union(
TEXT_MIME_TYPES, DOCUMENT_MIME_TYPES, TABULAR_MIME_TYPES
TEXT_MIME_TYPES, DOCUMENT_MIME_TYPES, CSV_MIME_TYPES
)
EXCLUDED_IMAGE_TYPES = {
@@ -53,11 +53,6 @@ class OnyxMimeTypes:
class OnyxFileExtensions:
TABULAR_EXTENSIONS = {
".csv",
".tsv",
".xlsx",
}
PLAIN_TEXT_EXTENSIONS = {
".txt",
".md",

View File

@@ -13,21 +13,15 @@ class ChatFileType(str, Enum):
DOC = "document"
# Plain text only contain the text
PLAIN_TEXT = "plain_text"
# Tabular data files (CSV, XLSX)
TABULAR = "tabular"
CSV = "csv"
def is_text_file(self) -> bool:
return self in (
ChatFileType.PLAIN_TEXT,
ChatFileType.DOC,
ChatFileType.TABULAR,
ChatFileType.CSV,
)
def use_metadata_only(self) -> bool:
"""File types where we can ignore the file content
and only use the metadata."""
return self in (ChatFileType.TABULAR,)
class FileDescriptor(TypedDict):
"""NOTE: is a `TypedDict` so it can be used as a type hint for a JSONB column

View File

@@ -110,20 +110,16 @@ def load_user_file(file_id: UUID, db_session: Session) -> InMemoryChatFile:
# check for plain text normalized version first, then use original file otherwise
try:
file_io = file_store.read_file(plaintext_file_name, mode="b")
# Metadata-only file types preserve their original type so
# downstream injection paths can route them correctly.
if chat_file_type.use_metadata_only():
plaintext_chat_file_type = chat_file_type
elif file_io is not None:
# if we have plaintext for image (which happens when image
# extraction is enabled), we use PLAIN_TEXT type
# For plaintext versions, use PLAIN_TEXT type (unless it's an image which doesn't have plaintext)
plaintext_chat_file_type = (
ChatFileType.PLAIN_TEXT
if chat_file_type != ChatFileType.IMAGE
else chat_file_type
)
# if we have plaintext for image (which happens when image extraction is enabled), we use PLAIN_TEXT type
if file_io is not None:
plaintext_chat_file_type = ChatFileType.PLAIN_TEXT
else:
plaintext_chat_file_type = (
ChatFileType.PLAIN_TEXT
if chat_file_type != ChatFileType.IMAGE
else chat_file_type
)
chat_file = InMemoryChatFile(
file_id=str(user_file.file_id),

View File

@@ -1,3 +1,4 @@
from onyx.configs.app_configs import HOOK_ENABLED
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from shared_configs.configs import MULTI_TENANT
@@ -6,7 +7,10 @@ from shared_configs.configs import MULTI_TENANT
def require_hook_enabled() -> None:
"""FastAPI dependency that gates all hook management endpoints.
Hooks are only available in single-tenant / self-hosted EE deployments.
Hooks are only available in single-tenant / self-hosted deployments with
HOOK_ENABLED=true explicitly set. Two layers of protection:
1. MULTI_TENANT check — rejects even if HOOK_ENABLED is accidentally set true
2. HOOK_ENABLED flag — explicit opt-in by the operator
Use as: Depends(require_hook_enabled)
"""
@@ -15,3 +19,8 @@ def require_hook_enabled() -> None:
OnyxErrorCode.SINGLE_TENANT_ONLY,
"Hooks are not available in multi-tenant deployments",
)
if not HOOK_ENABLED:
raise OnyxError(
OnyxErrorCode.ENV_VAR_GATED,
"Hooks are not enabled. Set HOOK_ENABLED=true to enable.",
)

View File

@@ -1,22 +1,79 @@
"""CE hook executor.
"""Hook executor — calls a customer's external HTTP endpoint for a given hook point.
HookSkipped and HookSoftFailed are real classes kept here because
process_message.py (CE code) uses isinstance checks against them.
Usage (Celery tasks and FastAPI handlers):
result = execute_hook(
db_session=db_session,
hook_point=HookPoint.QUERY_PROCESSING,
payload={"query": "...", "user_email": "...", "chat_session_id": "..."},
response_type=QueryProcessingResponse,
)
execute_hook is the public entry point. It dispatches to _execute_hook_impl
via fetch_versioned_implementation so that:
- CE: onyx.hooks.executor._execute_hook_impl → no-op, returns HookSkipped()
- EE: ee.onyx.hooks.executor._execute_hook_impl → real HTTP call
if isinstance(result, HookSkipped):
# no active hook configured — continue with original behavior
...
elif isinstance(result, HookSoftFailed):
# hook failed but fail strategy is SOFT — continue with original behavior
...
else:
# result is a validated Pydantic model instance (response_type)
...
is_reachable update policy
--------------------------
``is_reachable`` on the Hook row is updated selectively — only when the outcome
carries meaningful signal about physical reachability:
NetworkError (DNS, connection refused) → False (cannot reach the server)
HTTP 401 / 403 → False (api_key revoked or invalid)
TimeoutException → None (server may be slow, skip write)
Other HTTP errors (4xx / 5xx) → None (server responded, skip write)
Unknown exception → None (no signal, skip write)
Non-JSON / non-dict response → None (server responded, skip write)
Success (2xx, valid dict) → True (confirmed reachable)
None means "leave the current value unchanged" — no DB round-trip is made.
DB session design
-----------------
The executor uses three sessions:
1. Caller's session (db_session) — used only for the hook lookup read. All
needed fields are extracted from the Hook object before the HTTP call, so
the caller's session is not held open during the external HTTP request.
2. Log session — a separate short-lived session opened after the HTTP call
completes to write the HookExecutionLog row on failure. Success runs are
not recorded. Committed independently of everything else.
3. Reachable session — a second short-lived session to update is_reachable on
the Hook. Kept separate from the log session so a concurrent hook deletion
(which causes update_hook__no_commit to raise OnyxError(NOT_FOUND)) cannot
prevent the execution log from being written. This update is best-effort.
"""
import json
import time
from typing import Any
from typing import TypeVar
import httpx
from pydantic import BaseModel
from pydantic import ValidationError
from sqlalchemy.orm import Session
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.enums import HookFailStrategy
from onyx.db.enums import HookPoint
from onyx.utils.variable_functionality import fetch_versioned_implementation
from onyx.db.hook import create_hook_execution_log__no_commit
from onyx.db.hook import get_non_deleted_hook_by_hook_point
from onyx.db.hook import update_hook__no_commit
from onyx.db.models import Hook
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.hooks.utils import HOOKS_AVAILABLE
from onyx.utils.logger import setup_logger
logger = setup_logger()
class HookSkipped:
@@ -30,15 +87,277 @@ class HookSoftFailed:
T = TypeVar("T", bound=BaseModel)
def _execute_hook_impl(
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
class _HttpOutcome(BaseModel):
"""Structured result of an HTTP hook call, returned by _process_response."""
is_success: bool
updated_is_reachable: (
bool | None
) # True/False = write to DB, None = unchanged (skip write)
status_code: int | None
error_message: str | None
response_payload: dict[str, Any] | None
def _lookup_hook(
db_session: Session,
hook_point: HookPoint,
) -> Hook | HookSkipped:
"""Return the active Hook or HookSkipped if hooks are unavailable/unconfigured.
No HTTP call is made and no DB writes are performed for any HookSkipped path.
There is nothing to log and no reachability information to update.
"""
if not HOOKS_AVAILABLE:
return HookSkipped()
hook = get_non_deleted_hook_by_hook_point(
db_session=db_session, hook_point=hook_point
)
if hook is None or not hook.is_active:
return HookSkipped()
if not hook.endpoint_url:
return HookSkipped()
return hook
def _process_response(
*,
db_session: Session, # noqa: ARG001
hook_point: HookPoint, # noqa: ARG001
payload: dict[str, Any], # noqa: ARG001
response_type: type[T], # noqa: ARG001
) -> T | HookSkipped | HookSoftFailed:
"""CE no-op — hooks are not available without EE."""
return HookSkipped()
response: httpx.Response | None,
exc: Exception | None,
timeout: float,
) -> _HttpOutcome:
"""Process the result of an HTTP call and return a structured outcome.
Called after the client.post() try/except. If post() raised, exc is set and
response is None. Otherwise response is set and exc is None. Handles
raise_for_status(), JSON decoding, and the dict shape check.
"""
if exc is not None:
if isinstance(exc, httpx.NetworkError):
msg = f"Hook network error (endpoint unreachable): {exc}"
logger.warning(msg, exc_info=exc)
return _HttpOutcome(
is_success=False,
updated_is_reachable=False,
status_code=None,
error_message=msg,
response_payload=None,
)
if isinstance(exc, httpx.TimeoutException):
msg = f"Hook timed out after {timeout}s: {exc}"
logger.warning(msg, exc_info=exc)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # timeout doesn't indicate unreachability
status_code=None,
error_message=msg,
response_payload=None,
)
msg = f"Hook call failed: {exc}"
logger.exception(msg, exc_info=exc)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # unknown error — don't make assumptions
status_code=None,
error_message=msg,
response_payload=None,
)
if response is None:
raise ValueError(
"exactly one of response or exc must be non-None; both are None"
)
status_code = response.status_code
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
msg = f"Hook returned HTTP {e.response.status_code}: {e.response.text}"
logger.warning(msg, exc_info=e)
# 401/403 means the api_key has been revoked or is invalid — mark unreachable
# so the operator knows to update it. All other HTTP errors keep is_reachable
# as-is (server is up, the request just failed for application reasons).
auth_failed = e.response.status_code in (401, 403)
return _HttpOutcome(
is_success=False,
updated_is_reachable=False if auth_failed else None,
status_code=status_code,
error_message=msg,
response_payload=None,
)
try:
response_payload = response.json()
except (json.JSONDecodeError, httpx.DecodingError) as e:
msg = f"Hook returned non-JSON response: {e}"
logger.warning(msg, exc_info=e)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # server responded — reachability unchanged
status_code=status_code,
error_message=msg,
response_payload=None,
)
if not isinstance(response_payload, dict):
msg = f"Hook returned non-dict JSON (got {type(response_payload).__name__})"
logger.warning(msg)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # server responded — reachability unchanged
status_code=status_code,
error_message=msg,
response_payload=None,
)
return _HttpOutcome(
is_success=True,
updated_is_reachable=True,
status_code=status_code,
error_message=None,
response_payload=response_payload,
)
def _persist_result(
*,
hook_id: int,
outcome: _HttpOutcome,
duration_ms: int,
) -> None:
"""Write the execution log on failure and optionally update is_reachable, each
in its own session so a failure in one does not affect the other."""
# Only write the execution log on failure — success runs are not recorded.
# Must not be skipped if the is_reachable update fails (e.g. hook concurrently
# deleted between the initial lookup and here).
if not outcome.is_success:
try:
with get_session_with_current_tenant() as log_session:
create_hook_execution_log__no_commit(
db_session=log_session,
hook_id=hook_id,
is_success=False,
error_message=outcome.error_message,
status_code=outcome.status_code,
duration_ms=duration_ms,
)
log_session.commit()
except Exception:
logger.exception(
f"Failed to persist hook execution log for hook_id={hook_id}"
)
# Update is_reachable separately — best-effort, non-critical.
# None means the value is unchanged (set by the caller to skip the no-op write).
# update_hook__no_commit can raise OnyxError(NOT_FOUND) if the hook was
# concurrently deleted, so keep this isolated from the log write above.
if outcome.updated_is_reachable is not None:
try:
with get_session_with_current_tenant() as reachable_session:
update_hook__no_commit(
db_session=reachable_session,
hook_id=hook_id,
is_reachable=outcome.updated_is_reachable,
)
reachable_session.commit()
except Exception:
logger.warning(f"Failed to update is_reachable for hook_id={hook_id}")
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def _execute_hook_inner(
hook: Hook,
payload: dict[str, Any],
response_type: type[T],
) -> T | HookSoftFailed:
"""Make the HTTP call, validate the response, and return a typed model.
Raises OnyxError on HARD failure. Returns HookSoftFailed on SOFT failure.
"""
timeout = hook.timeout_seconds
hook_id = hook.id
fail_strategy = hook.fail_strategy
endpoint_url = hook.endpoint_url
current_is_reachable: bool | None = hook.is_reachable
if not endpoint_url:
raise ValueError(
f"hook_id={hook_id} is active but has no endpoint_url — "
"active hooks without an endpoint_url must be rejected by _lookup_hook"
)
start = time.monotonic()
response: httpx.Response | None = None
exc: Exception | None = None
try:
api_key: str | None = (
hook.api_key.get_value(apply_mask=False) if hook.api_key else None
)
headers: dict[str, str] = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
with httpx.Client(
timeout=timeout, follow_redirects=False
) as client: # SSRF guard: never follow redirects
response = client.post(endpoint_url, json=payload, headers=headers)
except Exception as e:
exc = e
duration_ms = int((time.monotonic() - start) * 1000)
outcome = _process_response(response=response, exc=exc, timeout=timeout)
# Validate the response payload against response_type.
# A validation failure downgrades the outcome to a failure so it is logged,
# is_reachable is left unchanged (server responded — just a bad payload),
# and fail_strategy is respected below.
validated_model: T | None = None
if outcome.is_success and outcome.response_payload is not None:
try:
validated_model = response_type.model_validate(outcome.response_payload)
except ValidationError as e:
msg = (
f"Hook response failed validation against {response_type.__name__}: {e}"
)
outcome = _HttpOutcome(
is_success=False,
updated_is_reachable=None, # server responded — reachability unchanged
status_code=outcome.status_code,
error_message=msg,
response_payload=None,
)
# Skip the is_reachable write when the value would not change — avoids a
# no-op DB round-trip on every call when the hook is already in the expected state.
if outcome.updated_is_reachable == current_is_reachable:
outcome = outcome.model_copy(update={"updated_is_reachable": None})
_persist_result(hook_id=hook_id, outcome=outcome, duration_ms=duration_ms)
if not outcome.is_success:
if fail_strategy == HookFailStrategy.HARD:
raise OnyxError(
OnyxErrorCode.HOOK_EXECUTION_FAILED,
outcome.error_message or "Hook execution failed.",
)
logger.warning(
f"Hook execution failed (soft fail) for hook_id={hook_id}: {outcome.error_message}"
)
return HookSoftFailed()
if validated_model is None:
raise OnyxError(
OnyxErrorCode.INTERNAL_ERROR,
f"validated_model is None for successful hook call (hook_id={hook_id})",
)
return validated_model
def execute_hook(
@@ -48,15 +367,25 @@ def execute_hook(
payload: dict[str, Any],
response_type: type[T],
) -> T | HookSkipped | HookSoftFailed:
"""Execute the hook for the given hook point.
"""Execute the hook for the given hook point synchronously.
Dispatches to the versioned implementation so EE gets the real executor
and CE gets the no-op stub, without any changes at the call site.
Returns HookSkipped if no active hook is configured, HookSoftFailed if the
hook failed with SOFT fail strategy, or a validated response model on success.
Raises OnyxError on HARD failure or if the hook is misconfigured.
"""
impl = fetch_versioned_implementation("onyx.hooks.executor", "_execute_hook_impl")
return impl(
db_session=db_session,
hook_point=hook_point,
payload=payload,
response_type=response_type,
)
hook = _lookup_hook(db_session, hook_point)
if isinstance(hook, HookSkipped):
return hook
fail_strategy = hook.fail_strategy
hook_id = hook.id
try:
return _execute_hook_inner(hook, payload, response_type)
except Exception:
if fail_strategy == HookFailStrategy.SOFT:
logger.exception(
f"Unexpected error in hook execution (soft fail) for hook_id={hook_id}"
)
return HookSoftFailed()
raise

View File

@@ -0,0 +1,5 @@
from onyx.configs.app_configs import HOOK_ENABLED
from shared_configs.configs import MULTI_TENANT
# True only when hooks are available: single-tenant deployment with HOOK_ENABLED=true.
HOOKS_AVAILABLE: bool = HOOK_ENABLED and not MULTI_TENANT

View File

@@ -8,6 +8,24 @@ from pydantic import BaseModel
class LLMOverride(BaseModel):
"""Per-request LLM settings that override persona defaults.
All fields are optional — only the fields that differ from the persona's
configured LLM need to be supplied. Used both over the wire (API requests)
and for multi-model comparison, where one override is supplied per model.
Attributes:
model_provider: LLM provider slug (e.g. ``"openai"``, ``"anthropic"``).
When ``None``, the persona's default provider is used.
model_version: Specific model version string (e.g. ``"gpt-4o"``).
When ``None``, the persona's default model is used.
temperature: Sampling temperature in ``[0, 2]``. When ``None``, the
persona's default temperature is used.
display_name: Human-readable label shown in the UI for this model,
e.g. ``"GPT-4 Turbo"``. Optional; falls back to ``model_version``
when not set.
"""
model_provider: str | None = None
model_version: str | None = None
temperature: float | None = None

View File

@@ -77,6 +77,7 @@ from onyx.server.features.default_assistant.api import (
)
from onyx.server.features.document_set.api import router as document_set_router
from onyx.server.features.hierarchy.api import router as hierarchy_router
from onyx.server.features.hooks.api import router as hook_router
from onyx.server.features.input_prompt.api import (
admin_router as admin_input_prompt_router,
)
@@ -438,7 +439,6 @@ def get_application(lifespan_override: Lifespan | None = None) -> FastAPI:
dsn=SENTRY_DSN,
integrations=[StarletteIntegration(), FastApiIntegration()],
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:
@@ -454,6 +454,7 @@ def get_application(lifespan_override: Lifespan | None = None) -> FastAPI:
register_onyx_exception_handlers(application)
include_router_with_global_prefix_prepended(application, hook_router)
include_router_with_global_prefix_prepended(application, password_router)
include_router_with_global_prefix_prepended(application, chat_router)
include_router_with_global_prefix_prepended(application, query_router)

View File

@@ -76,18 +76,11 @@ class CategorizedFiles(BaseModel):
acceptable: list[UploadFile] = Field(default_factory=list)
rejected: list[RejectedFile] = Field(default_factory=list)
acceptable_file_to_token_count: dict[str, int] = Field(default_factory=dict)
# Filenames within `acceptable` that should be stored but not indexed.
skip_indexing: set[str] = Field(default_factory=set)
# Allow FastAPI UploadFile instances
model_config = ConfigDict(arbitrary_types_allowed=True)
def _skip_token_threshold(extension: str) -> bool:
"""Return True if this file extension should bypass the token limit."""
return extension.lower() in OnyxFileExtensions.TABULAR_EXTENSIONS
def _apply_long_side_cap(width: int, height: int, cap: int) -> tuple[int, int]:
if max(width, height) <= cap:
return width, height
@@ -271,17 +264,7 @@ def categorize_uploaded_files(
token_count = count_tokens(
text_content, tokenizer, token_limit=token_threshold
)
exceeds_threshold = (
token_threshold is not None and token_count > token_threshold
)
if exceeds_threshold and _skip_token_threshold(extension):
# Exempt extensions (e.g. spreadsheets) are accepted
# but flagged to skip indexing — only metadata is
# injected into the LLM context.
results.acceptable.append(upload)
results.acceptable_file_to_token_count[filename] = token_count
results.skip_indexing.add(filename)
elif exceeds_threshold:
if token_threshold is not None and token_count > token_threshold:
results.rejected.append(
RejectedFile(
filename=filename,

View File

@@ -28,6 +28,7 @@ from onyx.chat.chat_utils import extract_headers
from onyx.chat.models import ChatFullResponse
from onyx.chat.models import CreateChatSessionID
from onyx.chat.process_message import gather_stream_full
from onyx.chat.process_message import handle_multi_model_stream
from onyx.chat.process_message import handle_stream_message_objects
from onyx.chat.prompt_utils import get_default_base_system_prompt
from onyx.chat.stop_signal_checker import set_fence
@@ -46,6 +47,7 @@ from onyx.db.chat import get_chat_messages_by_session
from onyx.db.chat import get_chat_session_by_id
from onyx.db.chat import get_chat_sessions_by_user
from onyx.db.chat import set_as_latest_chat_message
from onyx.db.chat import set_preferred_response
from onyx.db.chat import translate_db_message_to_chat_message_detail
from onyx.db.chat import update_chat_session
from onyx.db.chat_search import search_chat_sessions
@@ -60,6 +62,8 @@ from onyx.db.persona import get_persona_by_id
from onyx.db.usage import increment_usage
from onyx.db.usage import UsageType
from onyx.db.user_file import get_file_id_by_user_file_id
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.file_store.file_store import get_default_file_store
from onyx.llm.constants import LlmProviderNames
from onyx.llm.factory import get_default_llm
@@ -81,6 +85,7 @@ from onyx.server.query_and_chat.models import ChatSessionUpdateRequest
from onyx.server.query_and_chat.models import MessageOrigin
from onyx.server.query_and_chat.models import RenameChatSessionResponse
from onyx.server.query_and_chat.models import SendMessageRequest
from onyx.server.query_and_chat.models import SetPreferredResponseRequest
from onyx.server.query_and_chat.models import UpdateChatSessionTemperatureRequest
from onyx.server.query_and_chat.models import UpdateChatSessionThreadRequest
from onyx.server.query_and_chat.session_loading import (
@@ -570,6 +575,46 @@ def handle_send_chat_message(
if get_hashed_api_key_from_request(request) or get_hashed_pat_from_request(request):
chat_message_req.origin = MessageOrigin.API
# Multi-model streaming path: 2-3 LLMs in parallel (streaming only)
is_multi_model = (
chat_message_req.llm_overrides is not None
and len(chat_message_req.llm_overrides) > 1
)
if is_multi_model and chat_message_req.stream:
# Narrowed here; is_multi_model already checked llm_overrides is not None
llm_overrides = chat_message_req.llm_overrides or []
def multi_model_stream_generator() -> Generator[str, None, None]:
try:
with get_session_with_current_tenant() as db_session:
for obj in handle_multi_model_stream(
new_msg_req=chat_message_req,
user=user,
db_session=db_session,
llm_overrides=llm_overrides,
litellm_additional_headers=extract_headers(
request.headers, LITELLM_PASS_THROUGH_HEADERS
),
custom_tool_additional_headers=get_custom_tool_additional_request_headers(
request.headers
),
mcp_headers=chat_message_req.mcp_headers,
):
yield get_json_line(obj.model_dump())
except Exception as e:
logger.exception("Error in multi-model streaming")
yield json.dumps({"error": str(e)})
return StreamingResponse(
multi_model_stream_generator(), media_type="text/event-stream"
)
if is_multi_model and not chat_message_req.stream:
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Multi-model mode (llm_overrides with >1 entry) requires stream=True.",
)
# Non-streaming path: consume all packets and return complete response
if not chat_message_req.stream:
with get_session_with_current_tenant() as db_session:
@@ -660,6 +705,30 @@ def set_message_as_latest(
)
@router.put("/set-preferred-response")
def set_preferred_response_endpoint(
request_body: SetPreferredResponseRequest,
user: User | None = Depends(current_user),
db_session: Session = Depends(get_session),
) -> None:
"""Set the preferred assistant response for a multi-model turn."""
try:
# Ownership check: get_chat_message raises ValueError if the message
# doesn't belong to this user, preventing cross-user mutation.
get_chat_message(
chat_message_id=request_body.user_message_id,
user_id=user.id if user else None,
db_session=db_session,
)
set_preferred_response(
db_session=db_session,
user_message_id=request_body.user_message_id,
preferred_assistant_message_id=request_body.preferred_response_id,
)
except ValueError as e:
raise OnyxError(OnyxErrorCode.INVALID_INPUT, str(e))
@router.post("/create-chat-message-feedback")
def create_chat_feedback(
feedback: ChatFeedbackRequest,

View File

@@ -9,8 +9,8 @@ def mime_type_to_chat_file_type(mime_type: str | None) -> ChatFileType:
if mime_type in OnyxMimeTypes.IMAGE_MIME_TYPES:
return ChatFileType.IMAGE
if mime_type in OnyxMimeTypes.TABULAR_MIME_TYPES:
return ChatFileType.TABULAR
if mime_type in OnyxMimeTypes.CSV_MIME_TYPES:
return ChatFileType.CSV
if mime_type in OnyxMimeTypes.DOCUMENT_MIME_TYPES:
return ChatFileType.DOC

View File

@@ -2,11 +2,25 @@ from pydantic import BaseModel
class Placement(BaseModel):
# Which iterative block in the UI is this part of, these are ordered and smaller ones happened first
"""Coordinates that identify where a streaming packet belongs in the UI.
The frontend uses these fields to route each packet to the correct turn,
tool tab, agent sub-turn, and (in multi-model mode) response column.
Attributes:
turn_index: Monotonically increasing index of the iterative reasoning block
(e.g. tool call round) within this chat message. Lower values happened first.
tab_index: Disambiguates parallel tool calls within the same turn so each
tool's output can be displayed in its own tab.
sub_turn_index: Nesting level for tools that invoke other tools. ``None`` for
top-level packets; an integer for tool-within-tool output.
model_index: Which model this packet belongs to. ``0`` for single-model
responses; ``0``, ``1``, or ``2`` for multi-model comparison. ``None``
for pre-LLM setup packets (e.g. message ID info) that are yielded
before any Emitter runs.
"""
turn_index: int
# For parallel tool calls to preserve order of execution
tab_index: int = 0
# Used for tools/agents that call other tools, this currently doesn't support nested agents but can be added later
sub_turn_index: int | None = None
# For multi-model streaming: identifies which model (0, 1, 2) this packet belongs to.
model_index: int | None = None

View File

@@ -21,6 +21,7 @@ from onyx.db.notification import get_notifications
from onyx.db.notification import update_notification_last_shown
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.hooks.utils import HOOKS_AVAILABLE
from onyx.key_value_store.factory import get_kv_store
from onyx.key_value_store.interface import KvKeyNotFoundError
from onyx.server.features.build.utils import is_onyx_craft_enabled
@@ -37,7 +38,6 @@ from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import (
fetch_versioned_implementation_with_fallback,
)
from shared_configs.configs import MULTI_TENANT
logger = setup_logger()
@@ -98,7 +98,7 @@ def fetch_settings(
needs_reindexing=needs_reindexing,
onyx_craft_enabled=onyx_craft_enabled_for_user,
vector_db_enabled=not DISABLE_VECTOR_DB,
hooks_enabled=not MULTI_TENANT,
hooks_enabled=HOOKS_AVAILABLE,
version=onyx_version,
max_allowed_upload_size_mb=MAX_ALLOWED_UPLOAD_SIZE_MB,
default_user_file_max_upload_size_mb=min(

View File

@@ -116,7 +116,7 @@ class UserSettings(Settings):
# False when DISABLE_VECTOR_DB is set — connectors, RAG search, and
# document sets are unavailable.
vector_db_enabled: bool = True
# True when hooks are available: single-tenant EE deployments only.
# True when hooks are available: single-tenant deployment with HOOK_ENABLED=true.
hooks_enabled: bool = False
# Application version, read from the ONYX_VERSION env var at startup.
version: str | None = None

View File

@@ -1,3 +1,4 @@
import queue
import time
from collections.abc import Callable
from typing import Any
@@ -708,7 +709,6 @@ def run_research_agent_calls(
if __name__ == "__main__":
from queue import Queue
from uuid import uuid4
from onyx.chat.chat_state import ChatStateContainer
@@ -744,8 +744,8 @@ if __name__ == "__main__":
if user is None:
raise ValueError("No users found in database. Please create a user first.")
bus: Queue[Packet] = Queue()
emitter = Emitter(bus)
emitter_queue: queue.Queue = queue.Queue()
emitter = Emitter(merged_queue=emitter_queue)
state_container = ChatStateContainer()
tool_dict = construct_tools(
@@ -792,4 +792,4 @@ if __name__ == "__main__":
print(result.intermediate_report)
print("=" * 80)
print(f"Citations: {result.citation_mapping}")
print(f"Total packets emitted: {bus.qsize()}")
print(f"Total packets emitted: {emitter_queue.qsize()}")

View File

@@ -1,5 +1,6 @@
import csv
import json
import queue
import uuid
from io import BytesIO
from io import StringIO
@@ -11,7 +12,6 @@ import requests
from requests import JSONDecodeError
from onyx.chat.emitter import Emitter
from onyx.chat.emitter import get_default_emitter
from onyx.configs.constants import FileOrigin
from onyx.file_store.file_store import get_default_file_store
from onyx.server.query_and_chat.placement import Placement
@@ -296,9 +296,9 @@ def build_custom_tools_from_openapi_schema_and_headers(
url = openapi_to_url(openapi_schema)
method_specs = openapi_to_method_specs(openapi_schema)
# Use default emitter if none provided
# Use a discard emitter if none provided (packets go nowhere)
if emitter is None:
emitter = get_default_emitter()
emitter = Emitter(merged_queue=queue.Queue())
return [
CustomTool(
@@ -367,7 +367,7 @@ if __name__ == "__main__":
tools = build_custom_tools_from_openapi_schema_and_headers(
tool_id=0, # dummy tool id
openapi_schema=openapi_schema,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
dynamic_schema_info=None,
)

View File

@@ -1,4 +1,3 @@
import io
import json
from typing import Any
from typing import cast
@@ -10,7 +9,6 @@ from typing_extensions import override
from onyx.chat.emitter import Emitter
from onyx.configs.app_configs import DISABLE_VECTOR_DB
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_store.models import ChatFileType
from onyx.file_store.models import InMemoryChatFile
from onyx.file_store.utils import load_chat_file_by_id
@@ -171,13 +169,10 @@ class FileReaderTool(Tool[FileReaderToolOverrideKwargs]):
chat_file = self._load_file(file_id)
# Only PLAIN_TEXT and TABULAR are guaranteed to contain actual text bytes.
# Only PLAIN_TEXT and CSV are guaranteed to contain actual text bytes.
# DOC type in a loaded file means plaintext extraction failed and the
# content is the original binary (e.g. raw PDF/DOCX bytes).
if chat_file.file_type not in (
ChatFileType.PLAIN_TEXT,
ChatFileType.TABULAR,
):
if chat_file.file_type not in (ChatFileType.PLAIN_TEXT, ChatFileType.CSV):
raise ToolCallException(
message=f"File {file_id} is not a text file (type={chat_file.file_type})",
llm_facing_message=(
@@ -186,19 +181,7 @@ class FileReaderTool(Tool[FileReaderToolOverrideKwargs]):
)
try:
if chat_file.file_type == ChatFileType.PLAIN_TEXT:
full_text = chat_file.content.decode("utf-8", errors="replace")
else:
full_text = (
extract_file_text(
file=io.BytesIO(chat_file.content),
file_name=chat_file.filename or "",
break_on_unprocessable=False,
)
or ""
)
except ToolCallException:
raise
full_text = chat_file.content.decode("utf-8", errors="replace")
except Exception:
raise ToolCallException(
message=f"Failed to decode file {file_id}",

View File

@@ -14,7 +14,7 @@ aiofiles==25.1.0
# unstructured-client
aiohappyeyeballs==2.6.1
# via aiohttp
aiohttp==3.13.4
aiohttp==3.13.3
# via
# aiobotocore
# discord-py
@@ -271,7 +271,7 @@ fastapi-users-db-sqlalchemy==7.0.0
# via onyx
fastavro==1.12.1
# via cohere
fastmcp==3.2.0
fastmcp==3.0.2
# via onyx
fastuuid==0.14.0
# via litellm
@@ -1102,8 +1102,6 @@ tzdata==2025.2
# tzlocal
tzlocal==5.3.1
# via dateparser
uncalled-for==0.2.0
# via fastmcp
unstructured==0.18.27
# via onyx
unstructured-client==0.42.6

View File

@@ -10,7 +10,7 @@ aiofiles==25.1.0
# via aioboto3
aiohappyeyeballs==2.6.1
# via aiohttp
aiohttp==3.13.4
aiohttp==3.13.3
# via
# aiobotocore
# discord-py

View File

@@ -10,7 +10,7 @@ aiofiles==25.1.0
# via aioboto3
aiohappyeyeballs==2.6.1
# via aiohttp
aiohttp==3.13.4
aiohttp==3.13.3
# via
# aiobotocore
# discord-py

View File

@@ -12,7 +12,7 @@ aiofiles==25.1.0
# via aioboto3
aiohappyeyeballs==2.6.1
# via aiohttp
aiohttp==3.13.4
aiohttp==3.13.3
# via
# aiobotocore
# discord-py

View File

@@ -5,7 +5,6 @@ import asyncio
import json
import logging
import sys
import time
from dataclasses import asdict
from dataclasses import dataclass
from pathlib import Path
@@ -28,9 +27,6 @@ INTERNAL_SEARCH_TOOL_NAME = "internal_search"
INTERNAL_SEARCH_IN_CODE_TOOL_ID = "SearchTool"
MAX_REQUEST_ATTEMPTS = 5
RETRIABLE_STATUS_CODES = {429, 500, 502, 503, 504}
QUESTION_TIMEOUT_SECONDS = 300
QUESTION_RETRY_PAUSE_SECONDS = 30
MAX_QUESTION_ATTEMPTS = 3
@dataclass(frozen=True)
@@ -113,27 +109,6 @@ def normalize_api_base(api_base: str) -> str:
return f"{normalized}/api"
def load_completed_question_ids(output_file: Path) -> set[str]:
if not output_file.exists():
return set()
completed_ids: set[str] = set()
with output_file.open("r", encoding="utf-8") as file:
for line in file:
stripped = line.strip()
if not stripped:
continue
try:
record = json.loads(stripped)
except json.JSONDecodeError:
continue
question_id = record.get("question_id")
if isinstance(question_id, str) and question_id:
completed_ids.add(question_id)
return completed_ids
def load_questions(questions_file: Path) -> list[QuestionRecord]:
if not questions_file.exists():
raise FileNotFoundError(f"Questions file not found: {questions_file}")
@@ -373,7 +348,6 @@ async def generate_answers(
api_base: str,
api_key: str,
parallelism: int,
skipped: int,
) -> None:
if parallelism < 1:
raise ValueError("`--parallelism` must be at least 1.")
@@ -408,178 +382,58 @@ async def generate_answers(
write_lock = asyncio.Lock()
completed = 0
successful = 0
stuck_count = 0
failed_questions: list[FailedQuestionRecord] = []
remaining_count = len(questions)
overall_total = remaining_count + skipped
question_durations: list[float] = []
run_start_time = time.monotonic()
def print_progress() -> None:
avg_time = (
sum(question_durations) / len(question_durations)
if question_durations
else 0.0
)
elapsed = time.monotonic() - run_start_time
eta = avg_time * (remaining_count - completed) / max(parallelism, 1)
done = skipped + completed
bar_width = 30
filled = (
int(bar_width * done / overall_total)
if overall_total
else bar_width
)
bar = "" * filled + "" * (bar_width - filled)
pct = (done / overall_total * 100) if overall_total else 100.0
parts = (
f"\r{bar} {pct:5.1f}% "
f"[{done}/{overall_total}] "
f"avg {avg_time:.1f}s/q "
f"elapsed {elapsed:.0f}s "
f"ETA {eta:.0f}s "
f"(ok:{successful} fail:{len(failed_questions)}"
)
if stuck_count:
parts += f" stuck:{stuck_count}"
if skipped:
parts += f" skip:{skipped}"
parts += ")"
sys.stderr.write(parts)
sys.stderr.flush()
print_progress()
total = len(questions)
async def process_question(question_record: QuestionRecord) -> None:
nonlocal completed
nonlocal successful
nonlocal stuck_count
last_error: Exception | None = None
for attempt in range(1, MAX_QUESTION_ATTEMPTS + 1):
q_start = time.monotonic()
try:
async with semaphore:
result = await asyncio.wait_for(
submit_question(
session=session,
api_base=api_base,
headers=headers,
internal_search_tool_id=internal_search_tool_id,
question_record=question_record,
),
timeout=QUESTION_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
async with progress_lock:
stuck_count += 1
logger.warning(
"Question %s timed out after %ss (attempt %s/%s, "
"total stuck: %s) — retrying in %ss",
question_record.question_id,
QUESTION_TIMEOUT_SECONDS,
attempt,
MAX_QUESTION_ATTEMPTS,
stuck_count,
QUESTION_RETRY_PAUSE_SECONDS,
)
print_progress()
last_error = TimeoutError(
f"Timed out after {QUESTION_TIMEOUT_SECONDS}s "
f"on attempt {attempt}/{MAX_QUESTION_ATTEMPTS}"
try:
async with semaphore:
result = await submit_question(
session=session,
api_base=api_base,
headers=headers,
internal_search_tool_id=internal_search_tool_id,
question_record=question_record,
)
await asyncio.sleep(QUESTION_RETRY_PAUSE_SECONDS)
continue
except Exception as exc:
duration = time.monotonic() - q_start
async with progress_lock:
completed += 1
question_durations.append(duration)
failed_questions.append(
FailedQuestionRecord(
question_id=question_record.question_id,
error=str(exc),
)
)
logger.exception(
"Failed question %s (%s/%s)",
question_record.question_id,
completed,
remaining_count,
)
print_progress()
return
duration = time.monotonic() - q_start
async with write_lock:
file.write(json.dumps(asdict(result), ensure_ascii=False))
file.write("\n")
file.flush()
except Exception as exc:
async with progress_lock:
completed += 1
successful += 1
question_durations.append(duration)
print_progress()
failed_questions.append(
FailedQuestionRecord(
question_id=question_record.question_id,
error=str(exc),
)
)
logger.exception(
"Failed question %s (%s/%s)",
question_record.question_id,
completed,
total,
)
return
# All attempts exhausted due to timeouts
async with write_lock:
file.write(json.dumps(asdict(result), ensure_ascii=False))
file.write("\n")
file.flush()
async with progress_lock:
completed += 1
failed_questions.append(
FailedQuestionRecord(
question_id=question_record.question_id,
error=str(last_error),
)
)
logger.error(
"Question %s failed after %s timeout attempts (%s/%s)",
question_record.question_id,
MAX_QUESTION_ATTEMPTS,
completed,
remaining_count,
)
print_progress()
successful += 1
logger.info("Processed %s/%s questions", completed, total)
await asyncio.gather(
*(process_question(question_record) for question_record in questions)
)
# Final newline after progress bar
sys.stderr.write("\n")
sys.stderr.flush()
total_elapsed = time.monotonic() - run_start_time
avg_time = (
sum(question_durations) / len(question_durations)
if question_durations
else 0.0
)
stuck_suffix = f", {stuck_count} stuck timeouts" if stuck_count else ""
resume_suffix = (
f"{skipped} previously completed, "
f"{skipped + successful}/{overall_total} overall"
if skipped
else ""
)
logger.info(
"Done: %s/%s successful in %.1fs (avg %.1fs/question%s)%s",
successful,
remaining_count,
total_elapsed,
avg_time,
stuck_suffix,
resume_suffix,
)
if failed_questions:
logger.warning(
"%s questions failed:",
"Completed with %s failed questions and %s successful questions.",
len(failed_questions),
successful,
)
for failed_question in failed_questions:
logger.warning(
@@ -599,30 +453,7 @@ def main() -> None:
raise ValueError("`--max-questions` must be at least 1 when provided.")
questions = questions[: args.max_questions]
completed_ids = load_completed_question_ids(args.output_file)
logger.info(
"Found %s already-answered question IDs in %s",
len(completed_ids),
args.output_file,
)
total_before_filter = len(questions)
questions = [q for q in questions if q.question_id not in completed_ids]
skipped = total_before_filter - len(questions)
if skipped:
logger.info(
"Resuming: %s/%s already answered, %s remaining",
skipped,
total_before_filter,
len(questions),
)
else:
logger.info("Loaded %s questions from %s", len(questions), args.questions_file)
if not questions:
logger.info("All questions already answered. Nothing to do.")
return
logger.info("Loaded %s questions from %s", len(questions), args.questions_file)
logger.info("Writing answers to %s", args.output_file)
asyncio.run(
@@ -632,7 +463,6 @@ def main() -> None:
api_base=api_base,
api_key=args.api_key,
parallelism=args.parallelism,
skipped=skipped,
)
)

View File

@@ -1,8 +1,10 @@
from collections.abc import Iterable
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
from onyx.connectors.google_drive.connector import GoogleDriveConnector
from onyx.connectors.google_drive.file_retrieval import DriveFileFieldType
from onyx.connectors.google_drive.file_retrieval import has_link_only_permission
from onyx.connectors.google_drive.models import DriveRetrievalStage
from onyx.connectors.google_drive.models import RetrievedDriveFile
@@ -73,8 +75,10 @@ def test_connector_skips_link_only_files_when_enabled() -> None:
retrieved_file = _build_retrieved_file(
[{"type": "domain", "allowFileDiscovery": False}]
)
fetch_mock = MagicMock(return_value=iter([retrieved_file]))
with (
patch.object(connector, "_fetch_drive_items", fetch_mock),
patch(
"onyx.connectors.google_drive.connector.run_functions_tuples_in_parallel",
side_effect=_stub_run_functions,
@@ -89,16 +93,21 @@ def test_connector_skips_link_only_files_when_enabled() -> None:
convert_mock.return_value = "doc"
checkpoint = connector.build_dummy_checkpoint()
results = list(
connector._convert_retrieved_files_to_documents(
drive_files_iter=iter([retrieved_file]),
connector._extract_docs_from_google_drive(
checkpoint=checkpoint,
start=None,
end=None,
include_permissions=False,
)
)
assert results == []
convert_mock.assert_not_called()
fetch_mock.assert_called_once()
get_new_ancestors_mock.assert_called_once()
assert (
fetch_mock.call_args.kwargs["field_type"] == DriveFileFieldType.WITH_PERMISSIONS
)
def test_connector_processes_files_when_option_disabled() -> None:
@@ -106,8 +115,10 @@ def test_connector_processes_files_when_option_disabled() -> None:
retrieved_file = _build_retrieved_file(
[{"type": "domain", "allowFileDiscovery": False}]
)
fetch_mock = MagicMock(return_value=iter([retrieved_file]))
with (
patch.object(connector, "_fetch_drive_items", fetch_mock),
patch(
"onyx.connectors.google_drive.connector.run_functions_tuples_in_parallel",
side_effect=_stub_run_functions,
@@ -122,13 +133,16 @@ def test_connector_processes_files_when_option_disabled() -> None:
convert_mock.return_value = "doc"
checkpoint = connector.build_dummy_checkpoint()
results = list(
connector._convert_retrieved_files_to_documents(
drive_files_iter=iter([retrieved_file]),
connector._extract_docs_from_google_drive(
checkpoint=checkpoint,
start=None,
end=None,
include_permissions=False,
)
)
assert len(results) == 1
convert_mock.assert_called_once()
fetch_mock.assert_called_once()
get_new_ancestors_mock.assert_called_once()
assert fetch_mock.call_args.kwargs["field_type"] == DriveFileFieldType.STANDARD

View File

@@ -1,239 +0,0 @@
"""Tests for GoogleDriveConnector.resolve_errors against real Google Drive."""
import json
import os
from collections.abc import Callable
from unittest.mock import patch
from onyx.connectors.google_drive.connector import GoogleDriveConnector
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import Document
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import HierarchyNode
from tests.daily.connectors.google_drive.consts_and_utils import ADMIN_EMAIL
from tests.daily.connectors.google_drive.consts_and_utils import (
ALL_EXPECTED_HIERARCHY_NODES,
)
from tests.daily.connectors.google_drive.consts_and_utils import FOLDER_1_ID
from tests.daily.connectors.google_drive.consts_and_utils import SHARED_DRIVE_1_ID
_DRIVE_ID_MAPPING_PATH = os.path.join(
os.path.dirname(__file__), "drive_id_mapping.json"
)
def _load_web_view_links(file_ids: list[int]) -> list[str]:
with open(_DRIVE_ID_MAPPING_PATH) as f:
mapping: dict[str, str] = json.load(f)
return [mapping[str(fid)] for fid in file_ids]
def _build_failures(web_view_links: list[str]) -> list[ConnectorFailure]:
return [
ConnectorFailure(
failed_document=DocumentFailure(
document_id=link,
document_link=link,
),
failure_message=f"Synthetic failure for {link}",
)
for link in web_view_links
]
@patch("onyx.file_processing.extract_file_text.get_unstructured_api_key")
def test_resolve_single_file(
mock_api_key: None, # noqa: ARG001
google_drive_service_acct_connector_factory: Callable[..., GoogleDriveConnector],
) -> None:
"""Resolve a single known file and verify we get back exactly one Document."""
connector = google_drive_service_acct_connector_factory(
primary_admin_email=ADMIN_EMAIL,
include_shared_drives=True,
shared_drive_urls=None,
include_my_drives=True,
my_drive_emails=None,
shared_folder_urls=None,
include_files_shared_with_me=False,
)
web_view_links = _load_web_view_links([0])
failures = _build_failures(web_view_links)
results = list(connector.resolve_errors(failures))
docs = [r for r in results if isinstance(r, Document)]
new_failures = [r for r in results if isinstance(r, ConnectorFailure)]
hierarchy_nodes = [r for r in results if isinstance(r, HierarchyNode)]
assert len(docs) == 1
assert len(new_failures) == 0
assert docs[0].semantic_identifier == "file_0.txt"
# Should yield at least one hierarchy node (the file's parent folder chain)
assert len(hierarchy_nodes) > 0
@patch("onyx.file_processing.extract_file_text.get_unstructured_api_key")
def test_resolve_multiple_files(
mock_api_key: None, # noqa: ARG001
google_drive_service_acct_connector_factory: Callable[..., GoogleDriveConnector],
) -> None:
"""Resolve multiple files across different folders via batch API."""
connector = google_drive_service_acct_connector_factory(
primary_admin_email=ADMIN_EMAIL,
include_shared_drives=True,
shared_drive_urls=None,
include_my_drives=True,
my_drive_emails=None,
shared_folder_urls=None,
include_files_shared_with_me=False,
)
# Pick files from different folders: admin files (0-4), shared drive 1 (20-24), folder_2 (45-49)
file_ids = [0, 1, 20, 21, 45]
web_view_links = _load_web_view_links(file_ids)
failures = _build_failures(web_view_links)
results = list(connector.resolve_errors(failures))
docs = [r for r in results if isinstance(r, Document)]
new_failures = [r for r in results if isinstance(r, ConnectorFailure)]
hierarchy_nodes = [r for r in results if isinstance(r, HierarchyNode)]
assert len(new_failures) == 0
retrieved_names = {doc.semantic_identifier for doc in docs}
expected_names = {f"file_{fid}.txt" for fid in file_ids}
assert expected_names == retrieved_names
# Files span multiple folders, so we should get hierarchy nodes
assert len(hierarchy_nodes) > 0
@patch("onyx.file_processing.extract_file_text.get_unstructured_api_key")
def test_resolve_hierarchy_nodes_are_valid(
mock_api_key: None, # noqa: ARG001
google_drive_service_acct_connector_factory: Callable[..., GoogleDriveConnector],
) -> None:
"""Verify that hierarchy nodes from resolve_errors match expected structure."""
connector = google_drive_service_acct_connector_factory(
primary_admin_email=ADMIN_EMAIL,
include_shared_drives=True,
shared_drive_urls=None,
include_my_drives=True,
my_drive_emails=None,
shared_folder_urls=None,
include_files_shared_with_me=False,
)
# File in folder_1 (inside shared_drive_1) — should walk up to shared_drive_1 root
web_view_links = _load_web_view_links([25])
failures = _build_failures(web_view_links)
results = list(connector.resolve_errors(failures))
hierarchy_nodes = [r for r in results if isinstance(r, HierarchyNode)]
node_ids = {node.raw_node_id for node in hierarchy_nodes}
# File 25 is in folder_1 which is inside shared_drive_1.
# The parent walk must yield at least these two ancestors.
assert (
FOLDER_1_ID in node_ids
), f"Expected folder_1 ({FOLDER_1_ID}) in hierarchy nodes, got: {node_ids}"
assert (
SHARED_DRIVE_1_ID in node_ids
), f"Expected shared_drive_1 ({SHARED_DRIVE_1_ID}) in hierarchy nodes, got: {node_ids}"
for node in hierarchy_nodes:
if node.raw_node_id not in ALL_EXPECTED_HIERARCHY_NODES:
continue
expected = ALL_EXPECTED_HIERARCHY_NODES[node.raw_node_id]
assert node.display_name == expected.display_name, (
f"Display name mismatch for {node.raw_node_id}: "
f"expected '{expected.display_name}', got '{node.display_name}'"
)
assert node.node_type == expected.node_type, (
f"Node type mismatch for {node.raw_node_id}: "
f"expected '{expected.node_type}', got '{node.node_type}'"
)
@patch("onyx.file_processing.extract_file_text.get_unstructured_api_key")
def test_resolve_with_invalid_link(
mock_api_key: None, # noqa: ARG001
google_drive_service_acct_connector_factory: Callable[..., GoogleDriveConnector],
) -> None:
"""Resolve with a mix of valid and invalid links — invalid ones yield ConnectorFailure."""
connector = google_drive_service_acct_connector_factory(
primary_admin_email=ADMIN_EMAIL,
include_shared_drives=True,
shared_drive_urls=None,
include_my_drives=True,
my_drive_emails=None,
shared_folder_urls=None,
include_files_shared_with_me=False,
)
valid_links = _load_web_view_links([0])
invalid_link = "https://drive.google.com/file/d/NONEXISTENT_FILE_ID_12345"
failures = _build_failures(valid_links + [invalid_link])
results = list(connector.resolve_errors(failures))
docs = [r for r in results if isinstance(r, Document)]
new_failures = [r for r in results if isinstance(r, ConnectorFailure)]
assert len(docs) == 1
assert docs[0].semantic_identifier == "file_0.txt"
assert len(new_failures) == 1
assert new_failures[0].failed_document is not None
assert new_failures[0].failed_document.document_id == invalid_link
@patch("onyx.file_processing.extract_file_text.get_unstructured_api_key")
def test_resolve_empty_errors(
mock_api_key: None, # noqa: ARG001
google_drive_service_acct_connector_factory: Callable[..., GoogleDriveConnector],
) -> None:
"""Resolving an empty error list should yield nothing."""
connector = google_drive_service_acct_connector_factory(
primary_admin_email=ADMIN_EMAIL,
include_shared_drives=True,
shared_drive_urls=None,
include_my_drives=True,
my_drive_emails=None,
shared_folder_urls=None,
include_files_shared_with_me=False,
)
results = list(connector.resolve_errors([]))
assert len(results) == 0
@patch("onyx.file_processing.extract_file_text.get_unstructured_api_key")
def test_resolve_entity_failures_are_skipped(
mock_api_key: None, # noqa: ARG001
google_drive_service_acct_connector_factory: Callable[..., GoogleDriveConnector],
) -> None:
"""Entity failures (not document failures) should be skipped by resolve_errors."""
from onyx.connectors.models import EntityFailure
connector = google_drive_service_acct_connector_factory(
primary_admin_email=ADMIN_EMAIL,
include_shared_drives=True,
shared_drive_urls=None,
include_my_drives=True,
my_drive_emails=None,
shared_folder_urls=None,
include_files_shared_with_me=False,
)
entity_failure = ConnectorFailure(
failed_entity=EntityFailure(entity_id="some_stage"),
failure_message="retrieval failure",
)
results = list(connector.resolve_errors([entity_failure]))
assert len(results) == 0

View File

@@ -27,11 +27,13 @@ def create_placement(
turn_index: int,
tab_index: int = 0,
sub_turn_index: int | None = None,
model_index: int | None = 0,
) -> Placement:
return Placement(
turn_index=turn_index,
tab_index=tab_index,
sub_turn_index=sub_turn_index,
model_index=model_index,
)

View File

@@ -13,6 +13,7 @@ This test:
All external HTTP calls are mocked, but Postgres and Redis are running.
"""
import queue
from typing import Any
from unittest.mock import patch
from uuid import uuid4
@@ -20,7 +21,7 @@ from uuid import uuid4
import pytest
from sqlalchemy.orm import Session
from onyx.chat.emitter import get_default_emitter
from onyx.chat.emitter import Emitter
from onyx.db.enums import MCPAuthenticationPerformer
from onyx.db.enums import MCPAuthenticationType
from onyx.db.enums import MCPTransport
@@ -137,7 +138,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
search_tool_config=search_tool_config,
@@ -200,7 +201,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
search_tool_config=SearchToolConfig(),
@@ -275,7 +276,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
search_tool_config=SearchToolConfig(),
@@ -350,7 +351,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
search_tool_config=SearchToolConfig(),
@@ -458,7 +459,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
search_tool_config=SearchToolConfig(),
@@ -541,7 +542,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
search_tool_config=SearchToolConfig(),

View File

@@ -8,6 +8,7 @@ Tests the priority logic for OAuth tokens when constructing custom tools:
All external HTTP calls are mocked, but Postgres and Redis are running.
"""
import queue
from typing import Any
from unittest.mock import Mock
from unittest.mock import patch
@@ -16,7 +17,7 @@ from uuid import uuid4
import pytest
from sqlalchemy.orm import Session
from onyx.chat.emitter import get_default_emitter
from onyx.chat.emitter import Emitter
from onyx.db.models import OAuthAccount
from onyx.db.models import OAuthConfig
from onyx.db.models import Persona
@@ -174,7 +175,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
search_tool_config=search_tool_config,
@@ -232,7 +233,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
)
@@ -284,7 +285,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
)
@@ -345,7 +346,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
)
@@ -416,7 +417,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
)
@@ -483,7 +484,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
)
@@ -536,7 +537,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=get_default_emitter(),
emitter=Emitter(merged_queue=queue.Queue()),
user=user,
llm=llm,
)

View File

@@ -1175,7 +1175,7 @@ def test_code_interpreter_receives_chat_files(
file_descriptor: FileDescriptor = {
"id": user_file.file_id,
"type": ChatFileType.TABULAR,
"type": ChatFileType.CSV,
"name": "data.csv",
"user_file_id": str(user_file.id),
}

View File

@@ -1,9 +1,3 @@
import mimetypes
from typing import Any
import requests
from tests.integration.common_utils.constants import API_SERVER_URL
from tests.integration.common_utils.managers.chat import ChatSessionManager
from tests.integration.common_utils.managers.file import FileManager
from tests.integration.common_utils.managers.llm_provider import LLMProviderManager
@@ -85,90 +79,3 @@ def test_send_message_with_text_file_attachment(admin_user: DATestUser) -> None:
assert (
"third line" in response.full_message.lower()
), "Chat response should contain the contents of the file"
def _set_token_threshold(admin_user: DATestUser, threshold_k: int) -> None:
"""Set the file token count threshold via admin settings API."""
response = requests.put(
f"{API_SERVER_URL}/admin/settings",
json={"file_token_count_threshold_k": threshold_k},
headers=admin_user.headers,
)
response.raise_for_status()
def _upload_raw(
filename: str,
content: bytes,
user: DATestUser,
) -> dict[str, Any]:
"""Upload a file and return the full JSON response (user_files + rejected_files)."""
mime_type, _ = mimetypes.guess_type(filename)
headers = user.headers.copy()
headers.pop("Content-Type", None)
response = requests.post(
f"{API_SERVER_URL}/user/projects/file/upload",
files=[("files", (filename, content, mime_type or "application/octet-stream"))],
headers=headers,
)
response.raise_for_status()
return response.json()
def test_csv_over_token_threshold_uploaded_not_indexed(
admin_user: DATestUser,
) -> None:
"""CSV exceeding token threshold is uploaded (accepted) but skips indexing."""
_set_token_threshold(admin_user, threshold_k=1)
try:
# ~2000 tokens with default tokenizer, well over 1K threshold
content = ("x " * 100 + "\n") * 20
result = _upload_raw("large.csv", content.encode(), admin_user)
assert len(result["user_files"]) == 1, "CSV should be accepted"
assert len(result["rejected_files"]) == 0, "CSV should not be rejected"
assert (
result["user_files"][0]["status"] == "SKIPPED"
), "CSV over threshold should be SKIPPED (uploaded but not indexed)"
assert (
result["user_files"][0]["chunk_count"] is None
), "Skipped file should have no chunks"
finally:
_set_token_threshold(admin_user, threshold_k=200)
def test_csv_under_token_threshold_uploaded_and_indexed(
admin_user: DATestUser,
) -> None:
"""CSV under token threshold is uploaded and queued for indexing."""
_set_token_threshold(admin_user, threshold_k=200)
try:
content = "col1,col2\na,b\n"
result = _upload_raw("small.csv", content.encode(), admin_user)
assert len(result["user_files"]) == 1, "CSV should be accepted"
assert len(result["rejected_files"]) == 0, "CSV should not be rejected"
assert (
result["user_files"][0]["status"] == "PROCESSING"
), "CSV under threshold should be PROCESSING (queued for indexing)"
finally:
_set_token_threshold(admin_user, threshold_k=200)
def test_txt_over_token_threshold_rejected(
admin_user: DATestUser,
) -> None:
"""Non-exempt file exceeding token threshold is rejected entirely."""
_set_token_threshold(admin_user, threshold_k=1)
try:
# ~2000 tokens, well over 1K threshold. Unlike CSV, .txt is not
# exempt from the threshold so the file should be rejected.
content = ("x " * 100 + "\n") * 20
result = _upload_raw("big.txt", content.encode(), admin_user)
assert len(result["user_files"]) == 0, "File should not be accepted"
assert len(result["rejected_files"]) == 1, "File should be rejected"
assert "token limit" in result["rejected_files"][0]["reason"].lower()
finally:
_set_token_threshold(admin_user, threshold_k=200)

View File

@@ -300,66 +300,6 @@ class TestExtractContextFiles:
assert result.file_texts == []
assert result.total_token_count == 50
@patch("onyx.chat.process_message.load_in_memory_chat_files")
def test_tool_metadata_file_id_matches_chat_history_file_id(
self, mock_load: MagicMock
) -> None:
"""The file_id in tool metadata (from extract_context_files) and the
file_id in chat history messages (from build_file_context) must
agree, otherwise the LLM sees different IDs for the same file across
turns.
In production, UserFile.id (UUID PK) differs from UserFile.file_id
(file-store path). Both pathways should produce the same file_id
(UserFile.id) for FileReaderTool."""
from onyx.chat.chat_utils import build_file_context
user_file_uuid = uuid4()
file_store_path = f"user_files/{user_file_uuid}/data.csv"
uf = UserFile(
id=user_file_uuid,
file_id=file_store_path,
name="data.csv",
token_count=100,
file_type="text/csv",
)
in_memory = InMemoryChatFile(
file_id=file_store_path,
content=b"col1,col2\na,b",
file_type=ChatFileType.TABULAR,
filename="data.csv",
)
mock_load.return_value = [in_memory]
# Pathway 1: extract_context_files (project/persona context)
result = extract_context_files(
user_files=[uf],
llm_max_context_window=10000,
reserved_token_count=0,
db_session=MagicMock(),
)
assert len(result.file_metadata_for_tool) == 1
tool_metadata_file_id = result.file_metadata_for_tool[0].file_id
# Pathway 2: build_file_context (chat history path)
# In convert_chat_history, tool_file_id comes from
# file_descriptor["user_file_id"], which is str(UserFile.id)
ctx = build_file_context(
tool_file_id=str(user_file_uuid),
filename="data.csv",
file_type=ChatFileType.TABULAR,
)
chat_history_file_id = ctx.tool_metadata.file_id
# Both pathways must produce the same ID for the LLM
assert tool_metadata_file_id == chat_history_file_id, (
f"File ID mismatch: extract_context_files uses '{tool_metadata_file_id}' "
f"but build_file_context uses '{chat_history_file_id}'."
)
@patch("onyx.chat.process_message.DISABLE_VECTOR_DB", True)
def test_overflow_with_vector_db_disabled_provides_tool_metadata(self) -> None:
"""When vector DB is disabled, overflow produces FileToolMetadata."""
@@ -376,128 +316,6 @@ class TestExtractContextFiles:
assert len(result.file_metadata_for_tool) == 1
assert result.file_metadata_for_tool[0].filename == "bigfile.txt"
@patch("onyx.chat.process_message.load_in_memory_chat_files")
def test_metadata_only_files_not_counted_in_aggregate_tokens(
self, mock_load: MagicMock
) -> None:
"""Metadata-only files (TABULAR) should not count toward the token budget."""
text_file_id = str(uuid4())
text_uf = _make_user_file(token_count=100, file_id=text_file_id)
# TABULAR file with large token count — should be excluded from aggregate
tabular_uf = _make_user_file(
token_count=50000, name="huge.xlsx", file_id=str(uuid4())
)
tabular_uf.file_type = (
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
mock_load.return_value = [
_make_in_memory_file(file_id=text_file_id, content="text content"),
InMemoryChatFile(
file_id=str(tabular_uf.id),
content=b"binary xlsx",
file_type=ChatFileType.TABULAR,
filename="huge.xlsx",
),
]
result = extract_context_files(
user_files=[text_uf, tabular_uf],
llm_max_context_window=10000,
reserved_token_count=0,
db_session=MagicMock(),
)
# Text file fits (100 < 6000), so files should be loaded
assert result.file_texts == ["text content"]
# TABULAR file should appear as tool metadata, not in file_texts
assert len(result.file_metadata_for_tool) == 1
assert result.file_metadata_for_tool[0].filename == "huge.xlsx"
@patch("onyx.chat.process_message.load_in_memory_chat_files")
def test_metadata_only_files_loaded_as_tool_metadata(
self, mock_load: MagicMock
) -> None:
"""When files fit, metadata-only files appear in file_metadata_for_tool."""
text_file_id = str(uuid4())
tabular_file_id = str(uuid4())
text_uf = _make_user_file(token_count=100, file_id=text_file_id)
tabular_uf = _make_user_file(
token_count=500, name="data.csv", file_id=tabular_file_id
)
tabular_uf.file_type = "text/csv"
mock_load.return_value = [
_make_in_memory_file(file_id=text_file_id, content="hello"),
InMemoryChatFile(
file_id=tabular_file_id,
content=b"col1,col2\na,b",
file_type=ChatFileType.TABULAR,
filename="data.csv",
),
]
result = extract_context_files(
user_files=[text_uf, tabular_uf],
llm_max_context_window=10000,
reserved_token_count=0,
db_session=MagicMock(),
)
assert result.file_texts == ["hello"]
assert len(result.file_metadata_for_tool) == 1
assert result.file_metadata_for_tool[0].filename == "data.csv"
# TABULAR should not appear in file_metadata (that's for citation)
assert all(m.filename != "data.csv" for m in result.file_metadata)
def test_overflow_with_vector_db_preserves_metadata_only_tool_metadata(
self,
) -> None:
"""When text files overflow with vector DB enabled, metadata-only files
should still be exposed via file_metadata_for_tool since they aren't
in the vector DB and would otherwise be inaccessible."""
text_uf = _make_user_file(token_count=7000, name="bigfile.txt")
tabular_uf = _make_user_file(token_count=500, name="data.xlsx")
tabular_uf.file_type = (
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
result = extract_context_files(
user_files=[text_uf, tabular_uf],
llm_max_context_window=10000,
reserved_token_count=0,
db_session=MagicMock(),
)
# Text files overflow → search filter enabled
assert result.use_as_search_filter is True
assert result.file_texts == []
# TABULAR file should still be in tool metadata
assert len(result.file_metadata_for_tool) == 1
assert result.file_metadata_for_tool[0].filename == "data.xlsx"
@patch("onyx.chat.process_message.DISABLE_VECTOR_DB", True)
def test_overflow_no_vector_db_includes_all_files_in_tool_metadata(self) -> None:
"""When vector DB is disabled and files overflow, all files
(both text and metadata-only) appear in file_metadata_for_tool."""
text_uf = _make_user_file(token_count=7000, name="bigfile.txt")
tabular_uf = _make_user_file(token_count=500, name="data.xlsx")
tabular_uf.file_type = (
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
result = extract_context_files(
user_files=[text_uf, tabular_uf],
llm_max_context_window=10000,
reserved_token_count=0,
db_session=MagicMock(),
)
assert result.use_as_search_filter is False
assert len(result.file_metadata_for_tool) == 2
filenames = {m.filename for m in result.file_metadata_for_tool}
assert filenames == {"bigfile.txt", "data.xlsx"}
# ===========================================================================
# Search filter + search_usage determination

View File

@@ -0,0 +1,173 @@
"""Unit tests for the Emitter class.
All tests use the streaming mode (merged_queue required). Emitter has a single
code path — no standalone bus.
"""
import queue
from onyx.chat.emitter import Emitter
from onyx.server.query_and_chat.placement import Placement
from onyx.server.query_and_chat.streaming_models import OverallStop
from onyx.server.query_and_chat.streaming_models import Packet
from onyx.server.query_and_chat.streaming_models import ReasoningStart
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _placement(
turn_index: int = 0,
tab_index: int = 0,
sub_turn_index: int | None = None,
) -> Placement:
return Placement(
turn_index=turn_index,
tab_index=tab_index,
sub_turn_index=sub_turn_index,
)
def _packet(
turn_index: int = 0,
tab_index: int = 0,
sub_turn_index: int | None = None,
) -> Packet:
"""Build a minimal valid packet with an OverallStop payload."""
return Packet(
placement=_placement(turn_index, tab_index, sub_turn_index),
obj=OverallStop(stop_reason="test"),
)
def _make_emitter(model_idx: int = 0) -> tuple["Emitter", "queue.Queue"]:
"""Return (emitter, queue) wired together."""
mq: queue.Queue = queue.Queue()
return Emitter(merged_queue=mq, model_idx=model_idx), mq
# ---------------------------------------------------------------------------
# Queue routing
# ---------------------------------------------------------------------------
class TestEmitterQueueRouting:
def test_emit_lands_on_merged_queue(self) -> None:
emitter, mq = _make_emitter()
emitter.emit(_packet())
assert not mq.empty()
def test_queue_item_is_tuple_of_key_and_packet(self) -> None:
emitter, mq = _make_emitter(model_idx=1)
emitter.emit(_packet())
item = mq.get_nowait()
assert isinstance(item, tuple)
assert len(item) == 2
def test_multiple_packets_delivered_fifo(self) -> None:
emitter, mq = _make_emitter()
p1 = _packet(turn_index=0)
p2 = _packet(turn_index=1)
emitter.emit(p1)
emitter.emit(p2)
_, t1 = mq.get_nowait()
_, t2 = mq.get_nowait()
assert t1.placement.turn_index == 0
assert t2.placement.turn_index == 1
# ---------------------------------------------------------------------------
# model_index tagging
# ---------------------------------------------------------------------------
class TestEmitterModelIndexTagging:
def test_n1_default_model_idx_tags_model_index_zero(self) -> None:
"""N=1: default model_idx=0, so packet gets model_index=0."""
emitter, mq = _make_emitter(model_idx=0)
emitter.emit(_packet())
_key, tagged = mq.get_nowait()
assert tagged.placement.model_index == 0
def test_model_idx_one_tags_packet(self) -> None:
emitter, mq = _make_emitter(model_idx=1)
emitter.emit(_packet())
_key, tagged = mq.get_nowait()
assert tagged.placement.model_index == 1
def test_model_idx_two_tags_packet(self) -> None:
"""Boundary: third model in a 3-model run."""
emitter, mq = _make_emitter(model_idx=2)
emitter.emit(_packet())
_key, tagged = mq.get_nowait()
assert tagged.placement.model_index == 2
# ---------------------------------------------------------------------------
# Queue key
# ---------------------------------------------------------------------------
class TestEmitterQueueKey:
def test_key_equals_model_idx(self) -> None:
"""Drain loop uses the key to route packets; it must match model_idx."""
emitter, mq = _make_emitter(model_idx=2)
emitter.emit(_packet())
key, _ = mq.get_nowait()
assert key == 2
def test_n1_key_is_zero(self) -> None:
emitter, mq = _make_emitter(model_idx=0)
emitter.emit(_packet())
key, _ = mq.get_nowait()
assert key == 0
# ---------------------------------------------------------------------------
# Placement field preservation
# ---------------------------------------------------------------------------
class TestEmitterPlacementPreservation:
def test_turn_index_is_preserved(self) -> None:
emitter, mq = _make_emitter()
emitter.emit(_packet(turn_index=5))
_, tagged = mq.get_nowait()
assert tagged.placement.turn_index == 5
def test_tab_index_is_preserved(self) -> None:
emitter, mq = _make_emitter()
emitter.emit(_packet(tab_index=3))
_, tagged = mq.get_nowait()
assert tagged.placement.tab_index == 3
def test_sub_turn_index_is_preserved(self) -> None:
emitter, mq = _make_emitter()
emitter.emit(_packet(sub_turn_index=2))
_, tagged = mq.get_nowait()
assert tagged.placement.sub_turn_index == 2
def test_sub_turn_index_none_is_preserved(self) -> None:
emitter, mq = _make_emitter()
emitter.emit(_packet(sub_turn_index=None))
_, tagged = mq.get_nowait()
assert tagged.placement.sub_turn_index is None
def test_packet_obj_is_not_modified(self) -> None:
"""The payload object must survive tagging untouched."""
emitter, mq = _make_emitter()
original_obj = OverallStop(stop_reason="sentinel")
pkt = Packet(placement=_placement(), obj=original_obj)
emitter.emit(pkt)
_, tagged = mq.get_nowait()
assert tagged.obj is original_obj
def test_different_obj_types_are_handled(self) -> None:
"""Any valid PacketObj type passes through correctly."""
emitter, mq = _make_emitter()
pkt = Packet(placement=_placement(), obj=ReasoningStart())
emitter.emit(pkt)
_, tagged = mq.get_nowait()
assert isinstance(tagged.obj, ReasoningStart)

View File

@@ -644,92 +644,6 @@ class TestConstructMessageHistory:
assert "Project file 0 content" in project_message.message
assert "Project file 1 content" in project_message.message
def test_file_metadata_for_tool_produces_message(self) -> None:
"""When context_files has file_metadata_for_tool, a metadata listing
message should be injected into the history."""
system_prompt = create_message("System", MessageType.SYSTEM, 10)
user_msg = create_message("Analyze the spreadsheet", MessageType.USER, 5)
context_files = ExtractedContextFiles(
file_texts=[],
image_files=[],
use_as_search_filter=False,
total_token_count=0,
file_metadata=[],
uncapped_token_count=0,
file_metadata_for_tool=[
FileToolMetadata(
file_id="xlsx-1",
filename="report.xlsx",
approx_char_count=100000,
),
],
)
result = construct_message_history(
system_prompt=system_prompt,
custom_agent_prompt=None,
simple_chat_history=[user_msg],
reminder_message=None,
context_files=context_files,
available_tokens=1000,
token_counter=_simple_token_counter,
)
# Should have: system, tool_metadata_message, user
assert len(result) == 3
metadata_msg = result[1]
assert metadata_msg.message_type == MessageType.USER
assert "report.xlsx" in metadata_msg.message
assert "xlsx-1" in metadata_msg.message
def test_metadata_only_and_text_files_both_present(self) -> None:
"""When both text content and tool metadata are present, both messages
should appear in the history."""
system_prompt = create_message("System", MessageType.SYSTEM, 10)
user_msg = create_message("Summarize everything", MessageType.USER, 5)
context_files = ExtractedContextFiles(
file_texts=["Text file content here"],
image_files=[],
use_as_search_filter=False,
total_token_count=100,
file_metadata=[
ContextFileMetadata(
file_id="txt-1",
filename="notes.txt",
file_content="Text file content here",
),
],
uncapped_token_count=100,
file_metadata_for_tool=[
FileToolMetadata(
file_id="xlsx-1",
filename="data.xlsx",
approx_char_count=50000,
),
],
)
result = construct_message_history(
system_prompt=system_prompt,
custom_agent_prompt=None,
simple_chat_history=[user_msg],
reminder_message=None,
context_files=context_files,
available_tokens=2000,
token_counter=_simple_token_counter,
)
# Should have: system, context_files_message, tool_metadata_message, user
assert len(result) == 4
# Context files message (text content)
assert "documents" in result[1].message
assert "Text file content here" in result[1].message
# Tool metadata message
assert "data.xlsx" in result[2].message
assert result[3] == user_msg
def _simple_token_counter(text: str) -> int:
"""Approximate token counter for tests (~4 chars per token)."""

View File

@@ -0,0 +1,768 @@
"""Unit tests for multi-model streaming validation and DB helpers.
These are pure unit tests — no real database or LLM calls required.
The validation logic in handle_multi_model_stream fires before any external
calls, so we can trigger it with lightweight mocks.
"""
import time
from collections.abc import Generator
from typing import Any
from typing import cast
from unittest.mock import MagicMock
from unittest.mock import patch
from uuid import uuid4
import pytest
from onyx.chat.models import StreamingError
from onyx.configs.constants import MessageType
from onyx.db.chat import set_preferred_response
from onyx.llm.override_models import LLMOverride
from onyx.server.query_and_chat.models import SendMessageRequest
from onyx.server.query_and_chat.placement import Placement
from onyx.server.query_and_chat.streaming_models import OverallStop
from onyx.server.query_and_chat.streaming_models import Packet
from onyx.server.query_and_chat.streaming_models import ReasoningStart
from onyx.utils.variable_functionality import global_version
@pytest.fixture(autouse=True)
def _restore_ee_version() -> Generator[None, None, None]:
"""Reset EE global state after each test.
Importing onyx.chat.process_message triggers set_is_ee_based_on_env_variable()
(via the celery client import chain). Without this fixture, the EE flag stays
True for the rest of the session and breaks unrelated tests that mock Confluence
or other connectors and assume EE is disabled.
"""
original = global_version._is_ee
yield
global_version._is_ee = original
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_request(**kwargs: Any) -> SendMessageRequest:
defaults: dict[str, Any] = {
"message": "hello",
"chat_session_id": uuid4(),
}
defaults.update(kwargs)
return SendMessageRequest(**defaults)
def _make_override(provider: str = "openai", version: str = "gpt-4") -> LLMOverride:
return LLMOverride(model_provider=provider, model_version=version)
def _first_from_stream(req: SendMessageRequest, overrides: list[LLMOverride]) -> Any:
"""Return the first item yielded by handle_multi_model_stream."""
from onyx.chat.process_message import handle_multi_model_stream
user = MagicMock()
user.is_anonymous = False
user.email = "test@example.com"
db = MagicMock()
gen = handle_multi_model_stream(req, user, db, overrides)
return next(gen)
# ---------------------------------------------------------------------------
# handle_multi_model_stream — validation
# ---------------------------------------------------------------------------
class TestRunMultiModelStreamValidation:
def test_single_override_yields_error(self) -> None:
"""Exactly 1 override is not multi-model — yields StreamingError."""
req = _make_request()
result = _first_from_stream(req, [_make_override()])
assert isinstance(result, StreamingError)
assert "2-3" in result.error
def test_four_overrides_yields_error(self) -> None:
"""4 overrides exceeds maximum — yields StreamingError."""
req = _make_request()
result = _first_from_stream(
req,
[
_make_override("openai", "gpt-4"),
_make_override("anthropic", "claude-3"),
_make_override("google", "gemini-pro"),
_make_override("cohere", "command-r"),
],
)
assert isinstance(result, StreamingError)
assert "2-3" in result.error
def test_zero_overrides_yields_error(self) -> None:
"""Empty override list yields StreamingError."""
req = _make_request()
result = _first_from_stream(req, [])
assert isinstance(result, StreamingError)
assert "2-3" in result.error
def test_deep_research_yields_error(self) -> None:
"""deep_research=True is incompatible with multi-model — yields StreamingError."""
req = _make_request(deep_research=True)
result = _first_from_stream(
req, [_make_override(), _make_override("anthropic", "claude-3")]
)
assert isinstance(result, StreamingError)
assert "not supported" in result.error
def test_exactly_two_overrides_is_minimum(self) -> None:
"""Boundary: 1 override yields error, 2 overrides passes validation."""
req = _make_request()
# 1 override must yield a StreamingError
result = _first_from_stream(req, [_make_override()])
assert isinstance(
result, StreamingError
), "1 override should yield StreamingError"
# 2 overrides must NOT yield a validation StreamingError (may raise later due to
# missing session, that's OK — validation itself passed)
try:
result2 = _first_from_stream(
req, [_make_override(), _make_override("anthropic", "claude-3")]
)
if isinstance(result2, StreamingError) and "2-3" in result2.error:
pytest.fail(
f"2 overrides should pass validation, got StreamingError: {result2.error}"
)
except Exception:
pass # Any non-validation error means validation passed
# ---------------------------------------------------------------------------
# set_preferred_response — validation (mocked db)
# ---------------------------------------------------------------------------
class TestSetPreferredResponseValidation:
def test_user_message_not_found(self) -> None:
db = MagicMock()
db.get.return_value = None
with pytest.raises(ValueError, match="not found"):
set_preferred_response(
db, user_message_id=999, preferred_assistant_message_id=1
)
def test_wrong_message_type(self) -> None:
"""Cannot set preferred response on a non-USER message."""
db = MagicMock()
user_msg = MagicMock()
user_msg.message_type = MessageType.ASSISTANT # wrong type
db.get.return_value = user_msg
with pytest.raises(ValueError, match="not a user message"):
set_preferred_response(
db, user_message_id=1, preferred_assistant_message_id=2
)
def test_assistant_message_not_found(self) -> None:
db = MagicMock()
user_msg = MagicMock()
user_msg.message_type = MessageType.USER
# First call returns user_msg, second call (for assistant) returns None
db.get.side_effect = [user_msg, None]
with pytest.raises(ValueError, match="not found"):
set_preferred_response(
db, user_message_id=1, preferred_assistant_message_id=2
)
def test_assistant_not_child_of_user(self) -> None:
db = MagicMock()
user_msg = MagicMock()
user_msg.message_type = MessageType.USER
assistant_msg = MagicMock()
assistant_msg.parent_message_id = 999 # different parent
db.get.side_effect = [user_msg, assistant_msg]
with pytest.raises(ValueError, match="not a child"):
set_preferred_response(
db, user_message_id=1, preferred_assistant_message_id=2
)
def test_valid_call_sets_preferred_response_id(self) -> None:
db = MagicMock()
user_msg = MagicMock()
user_msg.message_type = MessageType.USER
assistant_msg = MagicMock()
assistant_msg.parent_message_id = 1 # correct parent
db.get.side_effect = [user_msg, assistant_msg]
set_preferred_response(db, user_message_id=1, preferred_assistant_message_id=2)
assert user_msg.preferred_response_id == 2
assert user_msg.latest_child_message_id == 2
# ---------------------------------------------------------------------------
# LLMOverride — display_name field
# ---------------------------------------------------------------------------
class TestLLMOverrideDisplayName:
def test_display_name_defaults_none(self) -> None:
override = LLMOverride(model_provider="openai", model_version="gpt-4")
assert override.display_name is None
def test_display_name_set(self) -> None:
override = LLMOverride(
model_provider="openai",
model_version="gpt-4",
display_name="GPT-4 Turbo",
)
assert override.display_name == "GPT-4 Turbo"
def test_display_name_serializes(self) -> None:
override = LLMOverride(
model_provider="anthropic",
model_version="claude-opus-4-6",
display_name="Claude Opus",
)
d = override.model_dump()
assert d["display_name"] == "Claude Opus"
# ---------------------------------------------------------------------------
# _run_models — drain loop behaviour
# ---------------------------------------------------------------------------
def _make_setup(n_models: int = 1) -> MagicMock:
"""Minimal ChatTurnSetup mock whose fields pass Pydantic validation in _run_model."""
setup = MagicMock()
setup.llms = [MagicMock() for _ in range(n_models)]
setup.model_display_names = [f"model-{i}" for i in range(n_models)]
setup.check_is_connected = MagicMock(return_value=True)
setup.reserved_messages = [MagicMock() for _ in range(n_models)]
setup.reserved_token_count = 100
# Fields consumed by SearchToolConfig / CustomToolConfig / FileReaderToolConfig
# constructors inside _run_model — must be typed correctly for Pydantic.
setup.new_msg_req.deep_research = False
setup.new_msg_req.internal_search_filters = None
setup.new_msg_req.allowed_tool_ids = None
setup.new_msg_req.include_citations = True
setup.search_params.project_id_filter = None
setup.search_params.persona_id_filter = None
setup.bypass_acl = False
setup.slack_context = None
setup.available_files.user_file_ids = []
setup.available_files.chat_file_ids = []
setup.forced_tool_id = None
setup.simple_chat_history = []
setup.chat_session.id = uuid4()
setup.user_message.id = None
setup.custom_tool_additional_headers = None
setup.mcp_headers = None
return setup
def _run_models_collect(setup: MagicMock) -> list:
"""Drive _run_models to completion and return all yielded items."""
from onyx.chat.process_message import _run_models
return list(_run_models(setup, MagicMock(), MagicMock()))
class TestRunModels:
"""Tests for the _run_models worker-thread drain loop.
All external dependencies (LLM, DB, tools) are patched out. Worker threads
still run but return immediately since run_llm_loop is mocked.
"""
def test_n1_overall_stop_from_llm_loop_passes_through(self) -> None:
"""OverallStop emitted by run_llm_loop is passed through the drain loop unchanged."""
def emit_stop(**kwargs: Any) -> None:
kwargs["emitter"].emit(
Packet(
placement=Placement(turn_index=0),
obj=OverallStop(stop_reason="complete"),
)
)
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=emit_stop),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(_make_setup(n_models=1))
stops = [
p
for p in packets
if isinstance(p, Packet) and isinstance(p.obj, OverallStop)
]
assert len(stops) == 1
stop_obj = stops[0].obj
assert isinstance(stop_obj, OverallStop)
assert stop_obj.stop_reason == "complete"
def test_n1_emitted_packet_has_model_index_zero(self) -> None:
"""Single-model path: model_index is 0 (Emitter defaults model_idx=0)."""
def emit_one(**kwargs: Any) -> None:
kwargs["emitter"].emit(
Packet(placement=Placement(turn_index=0), obj=ReasoningStart())
)
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=emit_one),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(_make_setup(n_models=1))
reasoning = [
p
for p in packets
if isinstance(p, Packet) and isinstance(p.obj, ReasoningStart)
]
assert len(reasoning) == 1
assert reasoning[0].placement.model_index == 0
def test_n2_each_model_packet_tagged_with_its_index(self) -> None:
"""Multi-model path: packets from model 0 get index=0, model 1 gets index=1."""
def emit_one(**kwargs: Any) -> None:
# _model_idx is set by _run_model based on position in setup.llms
emitter = kwargs["emitter"]
emitter.emit(
Packet(placement=Placement(turn_index=0), obj=ReasoningStart())
)
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=emit_one),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(_make_setup(n_models=2))
reasoning = [
p
for p in packets
if isinstance(p, Packet) and isinstance(p.obj, ReasoningStart)
]
assert len(reasoning) == 2
indices = {p.placement.model_index for p in reasoning}
assert indices == {0, 1}
def test_model_error_yields_streaming_error(self) -> None:
"""An exception inside a worker thread is surfaced as a StreamingError."""
def always_fail(**_kwargs: Any) -> None:
raise RuntimeError("intentional test failure")
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=always_fail),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(_make_setup(n_models=1))
errors = [p for p in packets if isinstance(p, StreamingError)]
assert len(errors) == 1
assert errors[0].error_code == "MODEL_ERROR"
assert "intentional test failure" in errors[0].error
def test_one_model_error_does_not_stop_other_models(self) -> None:
"""A failing model yields StreamingError; the surviving model's packets still arrive."""
setup = _make_setup(n_models=2)
def fail_model_0_succeed_model_1(**kwargs: Any) -> None:
if kwargs["llm"] is setup.llms[0]:
raise RuntimeError("model 0 failed")
kwargs["emitter"].emit(
Packet(placement=Placement(turn_index=0), obj=ReasoningStart())
)
with (
patch(
"onyx.chat.process_message.run_llm_loop",
side_effect=fail_model_0_succeed_model_1,
),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(setup)
errors = [p for p in packets if isinstance(p, StreamingError)]
assert len(errors) == 1
reasoning = [
p
for p in packets
if isinstance(p, Packet) and isinstance(p.obj, ReasoningStart)
]
assert len(reasoning) == 1
assert reasoning[0].placement.model_index == 1
def test_cancellation_yields_user_cancelled_stop(self) -> None:
"""If check_is_connected returns False, drain loop emits user_cancelled."""
def slow_llm(**_kwargs: Any) -> None:
time.sleep(0.3) # Outlasts the 50 ms queue-poll interval
setup = _make_setup(n_models=1)
setup.check_is_connected = MagicMock(return_value=False)
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=slow_llm),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(setup)
stops = [
p
for p in packets
if isinstance(p, Packet) and isinstance(p.obj, OverallStop)
]
assert any(
isinstance(s.obj, OverallStop) and s.obj.stop_reason == "user_cancelled"
for s in stops
)
def test_stop_button_calls_completion_for_all_models(self) -> None:
"""llm_loop_completion_handle must be called for all models when the stop button fires.
Regression test for the disconnect-cleanup bug: the old
run_chat_loop_with_state_containers always called completion_callback in
its finally block (even on disconnect) so the DB message was updated from
the TERMINATED placeholder to a partial answer. The new _run_models must
replicate this — otherwise the integration test
test_send_message_disconnect_and_cleanup fails because the message stays
as "Response was terminated prior to completion, try regenerating."
"""
def slow_llm(**_kwargs: Any) -> None:
time.sleep(0.3)
setup = _make_setup(n_models=2)
setup.check_is_connected = MagicMock(return_value=False)
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=slow_llm),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle"
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
_run_models_collect(setup)
# Must be called once per model, not zero times
assert mock_handle.call_count == 2
def test_completion_handle_called_for_each_successful_model(self) -> None:
"""llm_loop_completion_handle must be called once per model that succeeded."""
setup = _make_setup(n_models=2)
with (
patch("onyx.chat.process_message.run_llm_loop"),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle"
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
_run_models_collect(setup)
assert mock_handle.call_count == 2
def test_completion_handle_not_called_for_failed_model(self) -> None:
"""llm_loop_completion_handle must be skipped for a model that raised."""
def always_fail(**_kwargs: Any) -> None:
raise RuntimeError("fail")
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=always_fail),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle"
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
_run_models_collect(_make_setup(n_models=1))
mock_handle.assert_not_called()
def test_http_disconnect_completion_via_generator_exit(self) -> None:
"""GeneratorExit from HTTP disconnect triggers worker self-completion.
When the HTTP client closes the connection, Starlette throws GeneratorExit
into the stream generator. The finally block sets drain_done (signalling
emitters to stop blocking) and calls executor.shutdown(wait=False) so the
server thread is never blocked. Worker threads detect drain_done.is_set()
after run_llm_loop completes and self-persist the result via
llm_loop_completion_handle using their own DB session.
This is the primary regression for test_send_message_disconnect_and_cleanup:
the integration test disconnects mid-stream and expects the DB message to be
updated from the TERMINATED placeholder to the real response.
"""
import threading
# Signals the worker to unblock from run_llm_loop after gen.close() returns.
# This guarantees drain_done is set BEFORE the worker returns from run_llm_loop,
# so the self-completion path (drain_done.is_set() check) is always taken.
disconnect_received = threading.Event()
# Set by the llm_loop_completion_handle mock when called.
completion_called = threading.Event()
def emit_then_complete(**kwargs: Any) -> None:
"""Emit one packet (to give the drain loop a yield point), then block
until the main thread signals that gen.close() has been called. This
ensures drain_done is set before we return so model_succeeded is checked
against a set drain_done — no race condition.
"""
emitter = kwargs["emitter"]
emitter.emit(
Packet(placement=Placement(turn_index=0), obj=ReasoningStart())
)
disconnect_received.wait(timeout=5)
setup = _make_setup(n_models=1)
# is_connected() always True — HTTP disconnect does NOT set the Redis stop fence.
setup.check_is_connected = MagicMock(return_value=True)
with (
patch(
"onyx.chat.process_message.run_llm_loop",
side_effect=emit_then_complete,
),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle",
side_effect=lambda *_, **__: completion_called.set(),
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
from onyx.chat.process_message import _run_models
# cast to Generator so .close() is available; _run_models returns
# AnswerStream (= Iterator) but the actual object is always a generator.
gen = cast(Generator, _run_models(setup, MagicMock(), MagicMock()))
# Advance to the first yielded packet — generator suspends at `yield item`.
first = next(gen)
assert isinstance(first, Packet)
# Simulate Starlette closing the stream on HTTP client disconnect.
# GeneratorExit is thrown at the `yield item` suspension point.
gen.close()
# Unblock the worker now that drain_done has been set by gen.close().
disconnect_received.set()
# Worker self-completes asynchronously (executor.shutdown(wait=False)).
# Wait here, inside the patch context, so that get_session_with_current_tenant
# and llm_loop_completion_handle mocks are still active when the worker calls them.
assert completion_called.wait(
timeout=5
), "worker must self-complete via drain_done within 5 seconds"
assert (
mock_handle.call_count == 1
), "completion handle must be called once for the successful model"
def test_b1_race_disconnect_handler_completes_already_finished_model(self) -> None:
"""B1 regression: model finishes BEFORE GeneratorExit fires.
The worker exits _run_model with drain_done.is_set()=False and skips
self-completion. When gen.close() fires afterward, the finally else-branch
must detect model_succeeded=True and call llm_loop_completion_handle itself.
Contrast with test_http_disconnect_completion_via_generator_exit, which
tests the opposite ordering (worker finishes AFTER disconnect).
"""
import threading
import time
completion_called = threading.Event()
def emit_and_return_immediately(**kwargs: Any) -> None:
# Emit one packet so the drain loop has something to yield, then return
# immediately — no blocking. The worker will be done in microseconds.
kwargs["emitter"].emit(
Packet(placement=Placement(turn_index=0), obj=ReasoningStart())
)
setup = _make_setup(n_models=1)
setup.check_is_connected = MagicMock(return_value=True)
with (
patch(
"onyx.chat.process_message.run_llm_loop",
side_effect=emit_and_return_immediately,
),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle",
side_effect=lambda *_, **__: completion_called.set(),
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
from onyx.chat.process_message import _run_models
gen = cast(Generator, _run_models(setup, MagicMock(), MagicMock()))
first = next(gen)
assert isinstance(first, Packet)
# Give the worker thread time to finish completely (emit + return +
# finally + self-completion check). It does almost no work, so 100 ms
# is far more than enough while still keeping the test fast.
time.sleep(0.1)
# Now close — worker is already done, so else-branch handles completion.
gen.close()
assert completion_called.wait(
timeout=5
), "disconnect handler must call completion for a model that already finished"
assert mock_handle.call_count == 1, "completion must be called exactly once"
def test_stop_button_does_not_call_completion_for_errored_model(self) -> None:
"""B2 regression: stop-button must NOT call completion for an errored model.
When model 0 raises an exception, its reserved ChatMessage must not be
saved with 'stopped by user' — that message is wrong for a model that
errored. llm_loop_completion_handle must only be called for non-errored
models when the stop button fires.
"""
def fail_model_0(**kwargs: Any) -> None:
if kwargs["llm"] is setup.llms[0]:
raise RuntimeError("model 0 errored")
# Model 1: run forever (stop button fires before it finishes)
time.sleep(10)
setup = _make_setup(n_models=2)
# Return False immediately so the stop-button path fires while model 1
# is still sleeping (model 0 has already errored by then).
setup.check_is_connected = lambda: False
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=fail_model_0),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle"
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
_run_models_collect(setup)
# Completion must NOT be called for model 0 (it errored).
# It MAY be called for model 1 (still in-flight when stop fired).
for call in mock_handle.call_args_list:
assert (
call.kwargs.get("llm") is not setup.llms[0]
), "llm_loop_completion_handle must not be called for the errored model"
def test_external_state_container_used_for_model_zero(self) -> None:
"""When provided, external_state_container is used as state_containers[0]."""
from onyx.chat.chat_state import ChatStateContainer
from onyx.chat.process_message import _run_models
external = ChatStateContainer()
setup = _make_setup(n_models=1)
with (
patch("onyx.chat.process_message.run_llm_loop") as mock_llm,
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
list(
_run_models(
setup, MagicMock(), MagicMock(), external_state_container=external
)
)
# The state_container kwarg passed to run_llm_loop must be the external one
call_kwargs = mock_llm.call_args.kwargs
assert call_kwargs["state_container"] is external

View File

@@ -139,7 +139,7 @@ def test_csv_file_type() -> None:
result = _extract_referenced_file_descriptors([tool_call], message)
assert len(result) == 1
assert result[0]["type"] == ChatFileType.TABULAR
assert result[0]["type"] == ChatFileType.CSV
def test_unknown_extension_defaults_to_plain_text() -> None:

View File

@@ -1,23 +1,15 @@
"""Tests for Canvas connector — client, credentials, conversion."""
"""Tests for Canvas connector — client (PR1)."""
from datetime import datetime
from datetime import timezone
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from onyx.configs.constants import DocumentSource
from onyx.connectors.canvas.client import CanvasApiClient
from onyx.connectors.canvas.connector import CanvasConnector
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedValidationError
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.error_handling.exceptions import OnyxError
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@@ -26,77 +18,6 @@ FAKE_BASE_URL = "https://myschool.instructure.com"
FAKE_TOKEN = "fake-canvas-token"
def _mock_course(
course_id: int = 1,
name: str = "Intro to CS",
course_code: str = "CS101",
) -> dict[str, Any]:
return {
"id": course_id,
"name": name,
"course_code": course_code,
"created_at": "2025-01-01T00:00:00Z",
"workflow_state": "available",
}
def _build_connector(base_url: str = FAKE_BASE_URL) -> CanvasConnector:
"""Build a connector with mocked credential validation."""
with patch("onyx.connectors.canvas.client.rl_requests") as mock_req:
mock_req.get.return_value = _mock_response(json_data=[_mock_course()])
connector = CanvasConnector(canvas_base_url=base_url)
connector.load_credentials({"canvas_access_token": FAKE_TOKEN})
return connector
def _mock_page(
page_id: int = 10,
title: str = "Syllabus",
updated_at: str = "2025-06-01T12:00:00Z",
) -> dict[str, Any]:
return {
"page_id": page_id,
"url": "syllabus",
"title": title,
"body": "<p>Welcome to the course</p>",
"created_at": "2025-01-15T00:00:00Z",
"updated_at": updated_at,
}
def _mock_assignment(
assignment_id: int = 20,
name: str = "Homework 1",
course_id: int = 1,
updated_at: str = "2025-06-01T12:00:00Z",
) -> dict[str, Any]:
return {
"id": assignment_id,
"name": name,
"description": "<p>Solve these problems</p>",
"html_url": f"{FAKE_BASE_URL}/courses/{course_id}/assignments/{assignment_id}",
"course_id": course_id,
"created_at": "2025-01-20T00:00:00Z",
"updated_at": updated_at,
"due_at": "2025-02-01T23:59:00Z",
}
def _mock_announcement(
announcement_id: int = 30,
title: str = "Class Cancelled",
course_id: int = 1,
posted_at: str = "2025-06-01T12:00:00Z",
) -> dict[str, Any]:
return {
"id": announcement_id,
"title": title,
"message": "<p>No class today</p>",
"html_url": f"{FAKE_BASE_URL}/courses/{course_id}/discussion_topics/{announcement_id}",
"posted_at": posted_at,
}
def _mock_response(
status_code: int = 200,
json_data: Any = None,
@@ -404,57 +325,6 @@ class TestGet:
assert result == expected
# ---------------------------------------------------------------------------
# CanvasApiClient.paginate tests
# ---------------------------------------------------------------------------
class TestPaginate:
@patch("onyx.connectors.canvas.client.rl_requests")
def test_single_page(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(
json_data=[{"id": 1}, {"id": 2}]
)
client = CanvasApiClient(
bearer_token=FAKE_TOKEN,
canvas_base_url=FAKE_BASE_URL,
)
pages = list(client.paginate("courses"))
assert len(pages) == 1
assert pages[0] == [{"id": 1}, {"id": 2}]
@patch("onyx.connectors.canvas.client.rl_requests")
def test_two_pages(self, mock_requests: MagicMock) -> None:
next_link = f'<{FAKE_BASE_URL}/api/v1/courses?page=2>; rel="next"'
page1 = _mock_response(json_data=[{"id": 1}], link_header=next_link)
page2 = _mock_response(json_data=[{"id": 2}])
mock_requests.get.side_effect = [page1, page2]
client = CanvasApiClient(
bearer_token=FAKE_TOKEN,
canvas_base_url=FAKE_BASE_URL,
)
pages = list(client.paginate("courses"))
assert len(pages) == 2
assert pages[0] == [{"id": 1}]
assert pages[1] == [{"id": 2}]
@patch("onyx.connectors.canvas.client.rl_requests")
def test_empty_response(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[])
client = CanvasApiClient(
bearer_token=FAKE_TOKEN,
canvas_base_url=FAKE_BASE_URL,
)
pages = list(client.paginate("courses"))
assert pages == []
# ---------------------------------------------------------------------------
# CanvasApiClient._parse_next_link tests
# ---------------------------------------------------------------------------
@@ -509,368 +379,3 @@ class TestParseNextLink:
with pytest.raises(OnyxError, match="must use https"):
self.client._parse_next_link(header)
# ---------------------------------------------------------------------------
# CanvasConnector — credential loading
# ---------------------------------------------------------------------------
class TestLoadCredentials:
def _assert_load_credentials_raises(
self,
status_code: int,
expected_error: type[Exception],
mock_requests: MagicMock,
) -> None:
"""Helper: assert load_credentials raises expected_error for a given status."""
mock_requests.get.return_value = _mock_response(status_code, {})
connector = CanvasConnector(canvas_base_url=FAKE_BASE_URL)
with pytest.raises(expected_error):
connector.load_credentials({"canvas_access_token": FAKE_TOKEN})
@patch("onyx.connectors.canvas.client.rl_requests")
def test_load_credentials_success(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[_mock_course()])
connector = CanvasConnector(canvas_base_url=FAKE_BASE_URL)
result = connector.load_credentials({"canvas_access_token": FAKE_TOKEN})
assert result is None
assert connector._canvas_client is not None
def test_canvas_client_raises_without_credentials(self) -> None:
connector = CanvasConnector(canvas_base_url=FAKE_BASE_URL)
with pytest.raises(ConnectorMissingCredentialError):
_ = connector.canvas_client
@patch("onyx.connectors.canvas.client.rl_requests")
def test_load_credentials_invalid_token(self, mock_requests: MagicMock) -> None:
self._assert_load_credentials_raises(401, CredentialExpiredError, mock_requests)
@patch("onyx.connectors.canvas.client.rl_requests")
def test_load_credentials_insufficient_permissions(
self, mock_requests: MagicMock
) -> None:
self._assert_load_credentials_raises(
403, InsufficientPermissionsError, mock_requests
)
# ---------------------------------------------------------------------------
# CanvasConnector — URL normalization
# ---------------------------------------------------------------------------
class TestConnectorUrlNormalization:
def test_strips_api_v1_suffix(self) -> None:
connector = _build_connector(base_url=f"{FAKE_BASE_URL}/api/v1")
result = connector.canvas_base_url
expected = FAKE_BASE_URL
assert result == expected
def test_strips_trailing_slash(self) -> None:
connector = _build_connector(base_url=f"{FAKE_BASE_URL}/")
result = connector.canvas_base_url
expected = FAKE_BASE_URL
assert result == expected
def test_no_change_for_clean_url(self) -> None:
connector = _build_connector(base_url=FAKE_BASE_URL)
result = connector.canvas_base_url
expected = FAKE_BASE_URL
assert result == expected
# ---------------------------------------------------------------------------
# CanvasConnector — document conversion
# ---------------------------------------------------------------------------
class TestDocumentConversion:
def setup_method(self) -> None:
self.connector = _build_connector()
def test_convert_page_to_document(self) -> None:
from onyx.connectors.canvas.connector import CanvasPage
page = CanvasPage(
page_id=10,
url="syllabus",
title="Syllabus",
body="<p>Welcome</p>",
created_at="2025-01-15T00:00:00Z",
updated_at="2025-06-01T12:00:00Z",
course_id=1,
)
doc = self.connector._convert_page_to_document(page)
expected_id = "canvas-page-1-10"
expected_metadata = {"course_id": "1", "type": "page"}
expected_updated_at = datetime(2025, 6, 1, 12, 0, tzinfo=timezone.utc)
assert doc.id == expected_id
assert doc.source == DocumentSource.CANVAS
assert doc.semantic_identifier == "Syllabus"
assert doc.metadata == expected_metadata
assert doc.sections[0].link is not None
assert f"{FAKE_BASE_URL}/courses/1/pages/syllabus" in doc.sections[0].link
assert doc.doc_updated_at == expected_updated_at
def test_convert_page_without_body(self) -> None:
from onyx.connectors.canvas.connector import CanvasPage
page = CanvasPage(
page_id=11,
url="empty-page",
title="Empty Page",
body=None,
created_at="2025-01-15T00:00:00Z",
updated_at="2025-06-01T12:00:00Z",
course_id=1,
)
doc = self.connector._convert_page_to_document(page)
section_text = doc.sections[0].text
assert section_text is not None
assert "Empty Page" in section_text
assert "<p>" not in section_text
def test_convert_assignment_to_document(self) -> None:
from onyx.connectors.canvas.connector import CanvasAssignment
assignment = CanvasAssignment(
id=20,
name="Homework 1",
description="<p>Solve these</p>",
html_url=f"{FAKE_BASE_URL}/courses/1/assignments/20",
course_id=1,
created_at="2025-01-20T00:00:00Z",
updated_at="2025-06-01T12:00:00Z",
due_at="2025-02-01T23:59:00Z",
)
doc = self.connector._convert_assignment_to_document(assignment)
expected_id = "canvas-assignment-1-20"
expected_due_text = "Due: February 01, 2025 23:59 UTC"
assert doc.id == expected_id
assert doc.source == DocumentSource.CANVAS
assert doc.semantic_identifier == "Homework 1"
assert doc.sections[0].text is not None
assert expected_due_text in doc.sections[0].text
def test_convert_assignment_without_description(self) -> None:
from onyx.connectors.canvas.connector import CanvasAssignment
assignment = CanvasAssignment(
id=21,
name="Quiz 1",
description=None,
html_url=f"{FAKE_BASE_URL}/courses/1/assignments/21",
course_id=1,
created_at="2025-01-20T00:00:00Z",
updated_at="2025-06-01T12:00:00Z",
due_at=None,
)
doc = self.connector._convert_assignment_to_document(assignment)
section_text = doc.sections[0].text
assert section_text is not None
assert "Quiz 1" in section_text
assert "Due:" not in section_text
def test_convert_announcement_to_document(self) -> None:
from onyx.connectors.canvas.connector import CanvasAnnouncement
announcement = CanvasAnnouncement(
id=30,
title="Class Cancelled",
message="<p>No class today</p>",
html_url=f"{FAKE_BASE_URL}/courses/1/discussion_topics/30",
posted_at="2025-06-01T12:00:00Z",
course_id=1,
)
doc = self.connector._convert_announcement_to_document(announcement)
expected_id = "canvas-announcement-1-30"
expected_updated_at = datetime(2025, 6, 1, 12, 0, tzinfo=timezone.utc)
assert doc.id == expected_id
assert doc.source == DocumentSource.CANVAS
assert doc.semantic_identifier == "Class Cancelled"
assert doc.doc_updated_at == expected_updated_at
def test_convert_announcement_without_posted_at(self) -> None:
from onyx.connectors.canvas.connector import CanvasAnnouncement
announcement = CanvasAnnouncement(
id=31,
title="TBD Announcement",
message=None,
html_url=f"{FAKE_BASE_URL}/courses/1/discussion_topics/31",
posted_at=None,
course_id=1,
)
doc = self.connector._convert_announcement_to_document(announcement)
assert doc.doc_updated_at is None
# ---------------------------------------------------------------------------
# CanvasConnector — validate_connector_settings
# ---------------------------------------------------------------------------
class TestValidateConnectorSettings:
def _assert_validate_raises(
self,
status_code: int,
expected_error: type[Exception],
mock_requests: MagicMock,
) -> None:
"""Helper: assert validate_connector_settings raises expected_error."""
success_resp = _mock_response(json_data=[_mock_course()])
fail_resp = _mock_response(status_code, {})
mock_requests.get.side_effect = [success_resp, fail_resp]
connector = CanvasConnector(canvas_base_url=FAKE_BASE_URL)
connector.load_credentials({"canvas_access_token": FAKE_TOKEN})
with pytest.raises(expected_error):
connector.validate_connector_settings()
@patch("onyx.connectors.canvas.client.rl_requests")
def test_validate_success(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[_mock_course()])
connector = _build_connector()
connector.validate_connector_settings() # should not raise
@patch("onyx.connectors.canvas.client.rl_requests")
def test_validate_expired_credential(self, mock_requests: MagicMock) -> None:
self._assert_validate_raises(401, CredentialExpiredError, mock_requests)
@patch("onyx.connectors.canvas.client.rl_requests")
def test_validate_insufficient_permissions(self, mock_requests: MagicMock) -> None:
self._assert_validate_raises(403, InsufficientPermissionsError, mock_requests)
@patch("onyx.connectors.canvas.client.rl_requests")
def test_validate_rate_limited(self, mock_requests: MagicMock) -> None:
self._assert_validate_raises(429, ConnectorValidationError, mock_requests)
@patch("onyx.connectors.canvas.client.rl_requests")
def test_validate_unexpected_error(self, mock_requests: MagicMock) -> None:
self._assert_validate_raises(500, UnexpectedValidationError, mock_requests)
# ---------------------------------------------------------------------------
# _list_* pagination tests
# ---------------------------------------------------------------------------
class TestListCourses:
@patch("onyx.connectors.canvas.client.rl_requests")
def test_single_page(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(
json_data=[_mock_course(1), _mock_course(2, "CS201", "Data Structures")]
)
connector = _build_connector()
result = connector._list_courses()
assert len(result) == 2
assert result[0].id == 1
assert result[1].id == 2
@patch("onyx.connectors.canvas.client.rl_requests")
def test_empty_response(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[])
connector = _build_connector()
result = connector._list_courses()
assert result == []
class TestListPages:
@patch("onyx.connectors.canvas.client.rl_requests")
def test_single_page(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(
json_data=[_mock_page(10), _mock_page(11, "Notes")]
)
connector = _build_connector()
result = connector._list_pages(course_id=1)
assert len(result) == 2
assert result[0].page_id == 10
assert result[1].page_id == 11
@patch("onyx.connectors.canvas.client.rl_requests")
def test_empty_response(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[])
connector = _build_connector()
result = connector._list_pages(course_id=1)
assert result == []
class TestListAssignments:
@patch("onyx.connectors.canvas.client.rl_requests")
def test_single_page(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(
json_data=[_mock_assignment(20), _mock_assignment(21, "Quiz 1")]
)
connector = _build_connector()
result = connector._list_assignments(course_id=1)
assert len(result) == 2
assert result[0].id == 20
assert result[1].id == 21
@patch("onyx.connectors.canvas.client.rl_requests")
def test_empty_response(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[])
connector = _build_connector()
result = connector._list_assignments(course_id=1)
assert result == []
class TestListAnnouncements:
@patch("onyx.connectors.canvas.client.rl_requests")
def test_single_page(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(
json_data=[_mock_announcement(30), _mock_announcement(31, "Update")]
)
connector = _build_connector()
result = connector._list_announcements(course_id=1)
assert len(result) == 2
assert result[0].id == 30
assert result[1].id == 31
@patch("onyx.connectors.canvas.client.rl_requests")
def test_empty_response(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[])
connector = _build_connector()
result = connector._list_announcements(course_id=1)
assert result == []

View File

@@ -1,45 +0,0 @@
from unittest.mock import AsyncMock
from unittest.mock import patch
import pytest
from discord.errors import LoginFailure
from onyx.connectors.discord.connector import DiscordConnector
from onyx.connectors.exceptions import CredentialInvalidError
def _build_connector(token: str = "fake-bot-token") -> DiscordConnector:
connector = DiscordConnector()
connector.load_credentials({"discord_bot_token": token})
return connector
@patch("onyx.connectors.discord.connector.Client.close", new_callable=AsyncMock)
@patch("onyx.connectors.discord.connector.Client.login", new_callable=AsyncMock)
def test_validate_success(
mock_login: AsyncMock,
mock_close: AsyncMock,
) -> None:
connector = _build_connector()
connector.validate_connector_settings()
mock_login.assert_awaited_once_with("fake-bot-token")
mock_close.assert_awaited_once()
@patch("onyx.connectors.discord.connector.Client.close", new_callable=AsyncMock)
@patch(
"onyx.connectors.discord.connector.Client.login",
new_callable=AsyncMock,
side_effect=LoginFailure("Improper token has been passed."),
)
def test_validate_invalid_token(
mock_login: AsyncMock, # noqa: ARG001
mock_close: AsyncMock,
) -> None:
connector = _build_connector(token="bad-token")
with pytest.raises(CredentialInvalidError, match="Invalid Discord bot token"):
connector.validate_connector_settings()
mock_close.assert_awaited_once()

View File

@@ -1,225 +0,0 @@
"""Tests for get_chat_sessions_by_user filtering behavior.
Verifies that failed chat sessions (those with only SYSTEM messages) are
correctly filtered out while preserving recently created sessions, matching
the behavior specified in PR #7233.
"""
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from unittest.mock import MagicMock
from uuid import UUID
from uuid import uuid4
import pytest
from sqlalchemy.orm import Session
from onyx.db.chat import get_chat_sessions_by_user
from onyx.db.models import ChatSession
def _make_session(
user_id: UUID,
time_created: datetime | None = None,
time_updated: datetime | None = None,
description: str = "",
) -> MagicMock:
"""Create a mock ChatSession with the given attributes."""
session = MagicMock(spec=ChatSession)
session.id = uuid4()
session.user_id = user_id
session.time_created = time_created or datetime.now(timezone.utc)
session.time_updated = time_updated or session.time_created
session.description = description
session.deleted = False
session.onyxbot_flow = False
session.project_id = None
return session
@pytest.fixture
def user_id() -> UUID:
return uuid4()
@pytest.fixture
def old_time() -> datetime:
"""A timestamp well outside the 5-minute leeway window."""
return datetime.now(timezone.utc) - timedelta(hours=1)
@pytest.fixture
def recent_time() -> datetime:
"""A timestamp within the 5-minute leeway window."""
return datetime.now(timezone.utc) - timedelta(minutes=2)
class TestGetChatSessionsByUser:
"""Tests for the failed chat filtering logic in get_chat_sessions_by_user."""
def test_filters_out_failed_sessions(
self, user_id: UUID, old_time: datetime
) -> None:
"""Sessions with only SYSTEM messages should be excluded."""
valid_session = _make_session(user_id, time_created=old_time)
failed_session = _make_session(user_id, time_created=old_time)
db_session = MagicMock(spec=Session)
# First execute: returns all sessions
# Second execute: returns only the valid session's ID (has non-system msgs)
mock_result_1 = MagicMock()
mock_result_1.scalars.return_value.all.return_value = [
valid_session,
failed_session,
]
mock_result_2 = MagicMock()
mock_result_2.scalars.return_value.all.return_value = [valid_session.id]
db_session.execute.side_effect = [mock_result_1, mock_result_2]
result = get_chat_sessions_by_user(
user_id=user_id,
deleted=False,
db_session=db_session,
include_failed_chats=False,
)
assert len(result) == 1
assert result[0].id == valid_session.id
def test_keeps_recent_sessions_without_messages(
self, user_id: UUID, recent_time: datetime
) -> None:
"""Recently created sessions should be kept even without messages."""
recent_session = _make_session(user_id, time_created=recent_time)
db_session = MagicMock(spec=Session)
mock_result_1 = MagicMock()
mock_result_1.scalars.return_value.all.return_value = [recent_session]
db_session.execute.side_effect = [mock_result_1]
result = get_chat_sessions_by_user(
user_id=user_id,
deleted=False,
db_session=db_session,
include_failed_chats=False,
)
assert len(result) == 1
assert result[0].id == recent_session.id
# Should only have been called once — no second query needed
# because the recent session is within the leeway window
assert db_session.execute.call_count == 1
def test_include_failed_chats_skips_filtering(
self, user_id: UUID, old_time: datetime
) -> None:
"""When include_failed_chats=True, no filtering should occur."""
session_a = _make_session(user_id, time_created=old_time)
session_b = _make_session(user_id, time_created=old_time)
db_session = MagicMock(spec=Session)
mock_result = MagicMock()
mock_result.scalars.return_value.all.return_value = [session_a, session_b]
db_session.execute.side_effect = [mock_result]
result = get_chat_sessions_by_user(
user_id=user_id,
deleted=False,
db_session=db_session,
include_failed_chats=True,
)
assert len(result) == 2
# Only one DB call — no second query for message validation
assert db_session.execute.call_count == 1
def test_limit_applied_after_filtering(
self, user_id: UUID, old_time: datetime
) -> None:
"""Limit should be applied after filtering, not before."""
sessions = [_make_session(user_id, time_created=old_time) for _ in range(5)]
valid_ids = [s.id for s in sessions[:3]]
db_session = MagicMock(spec=Session)
mock_result_1 = MagicMock()
mock_result_1.scalars.return_value.all.return_value = sessions
mock_result_2 = MagicMock()
mock_result_2.scalars.return_value.all.return_value = valid_ids
db_session.execute.side_effect = [mock_result_1, mock_result_2]
result = get_chat_sessions_by_user(
user_id=user_id,
deleted=False,
db_session=db_session,
include_failed_chats=False,
limit=2,
)
assert len(result) == 2
# Should be the first 2 valid sessions (order preserved)
assert result[0].id == sessions[0].id
assert result[1].id == sessions[1].id
def test_mixed_recent_and_old_sessions(
self, user_id: UUID, old_time: datetime, recent_time: datetime
) -> None:
"""Mix of recent and old sessions should filter correctly."""
old_valid = _make_session(user_id, time_created=old_time)
old_failed = _make_session(user_id, time_created=old_time)
recent_no_msgs = _make_session(user_id, time_created=recent_time)
db_session = MagicMock(spec=Session)
mock_result_1 = MagicMock()
mock_result_1.scalars.return_value.all.return_value = [
old_valid,
old_failed,
recent_no_msgs,
]
mock_result_2 = MagicMock()
mock_result_2.scalars.return_value.all.return_value = [old_valid.id]
db_session.execute.side_effect = [mock_result_1, mock_result_2]
result = get_chat_sessions_by_user(
user_id=user_id,
deleted=False,
db_session=db_session,
include_failed_chats=False,
)
result_ids = {cs.id for cs in result}
assert old_valid.id in result_ids
assert recent_no_msgs.id in result_ids
assert old_failed.id not in result_ids
def test_empty_result(self, user_id: UUID) -> None:
"""No sessions should return empty list without errors."""
db_session = MagicMock(spec=Session)
mock_result = MagicMock()
mock_result.scalars.return_value.all.return_value = []
db_session.execute.side_effect = [mock_result]
result = get_chat_sessions_by_user(
user_id=user_id,
deleted=False,
db_session=db_session,
include_failed_chats=False,
)
assert result == []
assert db_session.execute.call_count == 1

View File

@@ -40,8 +40,6 @@ def test_send_task_includes_expires(
user_files=user_files,
rejected_files=[],
id_to_temp_id={},
skip_indexing_filenames=set(),
indexable_files=user_files,
)
mock_user = MagicMock()

View File

@@ -11,13 +11,30 @@ from onyx.hooks.api_dependencies import require_hook_enabled
class TestRequireHookEnabled:
def test_raises_when_multi_tenant(self) -> None:
with patch("onyx.hooks.api_dependencies.MULTI_TENANT", True):
with (
patch("onyx.hooks.api_dependencies.MULTI_TENANT", True),
patch("onyx.hooks.api_dependencies.HOOK_ENABLED", True),
):
with pytest.raises(OnyxError) as exc_info:
require_hook_enabled()
assert exc_info.value.error_code is OnyxErrorCode.SINGLE_TENANT_ONLY
assert exc_info.value.status_code == 403
assert "multi-tenant" in exc_info.value.detail
def test_passes_when_single_tenant(self) -> None:
with patch("onyx.hooks.api_dependencies.MULTI_TENANT", False):
def test_raises_when_flag_disabled(self) -> None:
with (
patch("onyx.hooks.api_dependencies.MULTI_TENANT", False),
patch("onyx.hooks.api_dependencies.HOOK_ENABLED", False),
):
with pytest.raises(OnyxError) as exc_info:
require_hook_enabled()
assert exc_info.value.error_code is OnyxErrorCode.ENV_VAR_GATED
assert exc_info.value.status_code == 403
assert "HOOK_ENABLED" in exc_info.value.detail
def test_passes_when_enabled_single_tenant(self) -> None:
with (
patch("onyx.hooks.api_dependencies.MULTI_TENANT", False),
patch("onyx.hooks.api_dependencies.HOOK_ENABLED", True),
):
require_hook_enabled() # must not raise

View File

@@ -9,11 +9,11 @@ import httpx
import pytest
from pydantic import BaseModel
from ee.onyx.hooks.executor import _execute_hook_impl as execute_hook
from onyx.db.enums import HookFailStrategy
from onyx.db.enums import HookPoint
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.hooks.executor import execute_hook
from onyx.hooks.executor import HookSkipped
from onyx.hooks.executor import HookSoftFailed
from onyx.hooks.points.query_processing import QueryProcessingResponse
@@ -118,30 +118,28 @@ def db_session() -> MagicMock:
@pytest.mark.parametrize(
"multi_tenant,hook",
"hooks_available,hook",
[
# MULTI_TENANT=True exits before the DB lookup — hook is irrelevant.
pytest.param(True, None, id="multi_tenant"),
pytest.param(False, None, id="hook_not_found"),
pytest.param(False, _make_hook(is_active=False), id="hook_inactive"),
pytest.param(False, _make_hook(endpoint_url=None), id="no_endpoint_url"),
# HOOKS_AVAILABLE=False exits before the DB lookup — hook is irrelevant.
pytest.param(False, None, id="hooks_not_available"),
pytest.param(True, None, id="hook_not_found"),
pytest.param(True, _make_hook(is_active=False), id="hook_inactive"),
pytest.param(True, _make_hook(endpoint_url=None), id="no_endpoint_url"),
],
)
def test_early_exit_returns_skipped_with_no_db_writes(
db_session: MagicMock,
multi_tenant: bool,
hooks_available: bool,
hook: MagicMock | None,
) -> None:
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", multi_tenant),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", hooks_available),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch("ee.onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch(
"ee.onyx.hooks.executor.create_hook_execution_log__no_commit"
) as mock_log,
patch("onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch("onyx.hooks.executor.create_hook_execution_log__no_commit") as mock_log,
):
result = execute_hook(
db_session=db_session,
@@ -166,16 +164,14 @@ def test_success_returns_validated_model_and_sets_reachable(
hook = _make_hook()
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", False),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", True),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch("ee.onyx.hooks.executor.get_session_with_current_tenant"),
patch("ee.onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch(
"ee.onyx.hooks.executor.create_hook_execution_log__no_commit"
) as mock_log,
patch("onyx.hooks.executor.get_session_with_current_tenant"),
patch("onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch("onyx.hooks.executor.create_hook_execution_log__no_commit") as mock_log,
patch("httpx.Client") as mock_client_cls,
):
_setup_client(mock_client_cls, response=_make_response())
@@ -199,14 +195,14 @@ def test_success_skips_reachable_write_when_already_true(db_session: MagicMock)
hook = _make_hook(is_reachable=True)
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", False),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", True),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch("ee.onyx.hooks.executor.get_session_with_current_tenant"),
patch("ee.onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch("ee.onyx.hooks.executor.create_hook_execution_log__no_commit"),
patch("onyx.hooks.executor.get_session_with_current_tenant"),
patch("onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch("onyx.hooks.executor.create_hook_execution_log__no_commit"),
patch("httpx.Client") as mock_client_cls,
):
_setup_client(mock_client_cls, response=_make_response())
@@ -228,16 +224,14 @@ def test_non_dict_json_response_is_a_failure(db_session: MagicMock) -> None:
hook = _make_hook(fail_strategy=HookFailStrategy.SOFT)
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", False),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", True),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch("ee.onyx.hooks.executor.get_session_with_current_tenant"),
patch("ee.onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch(
"ee.onyx.hooks.executor.create_hook_execution_log__no_commit"
) as mock_log,
patch("onyx.hooks.executor.get_session_with_current_tenant"),
patch("onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch("onyx.hooks.executor.create_hook_execution_log__no_commit") as mock_log,
patch("httpx.Client") as mock_client_cls,
):
_setup_client(
@@ -264,16 +258,14 @@ def test_json_decode_failure_is_a_failure(db_session: MagicMock) -> None:
hook = _make_hook(fail_strategy=HookFailStrategy.SOFT)
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", False),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", True),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch("ee.onyx.hooks.executor.get_session_with_current_tenant"),
patch("ee.onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch(
"ee.onyx.hooks.executor.create_hook_execution_log__no_commit"
) as mock_log,
patch("onyx.hooks.executor.get_session_with_current_tenant"),
patch("onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch("onyx.hooks.executor.create_hook_execution_log__no_commit") as mock_log,
patch("httpx.Client") as mock_client_cls,
):
_setup_client(
@@ -392,14 +384,14 @@ def test_http_failure_paths(
hook = _make_hook(fail_strategy=fail_strategy)
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", False),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", True),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch("ee.onyx.hooks.executor.get_session_with_current_tenant"),
patch("ee.onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch("ee.onyx.hooks.executor.create_hook_execution_log__no_commit"),
patch("onyx.hooks.executor.get_session_with_current_tenant"),
patch("onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch("onyx.hooks.executor.create_hook_execution_log__no_commit"),
patch("httpx.Client") as mock_client_cls,
):
_setup_client(mock_client_cls, side_effect=exception)
@@ -451,14 +443,14 @@ def test_authorization_header(
hook = _make_hook(api_key=api_key)
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", False),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", True),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch("ee.onyx.hooks.executor.get_session_with_current_tenant"),
patch("ee.onyx.hooks.executor.update_hook__no_commit"),
patch("ee.onyx.hooks.executor.create_hook_execution_log__no_commit"),
patch("onyx.hooks.executor.get_session_with_current_tenant"),
patch("onyx.hooks.executor.update_hook__no_commit"),
patch("onyx.hooks.executor.create_hook_execution_log__no_commit"),
patch("httpx.Client") as mock_client_cls,
):
mock_client = _setup_client(mock_client_cls, response=_make_response())
@@ -497,13 +489,13 @@ def test_persist_session_failure_is_swallowed(
hook = _make_hook(fail_strategy=HookFailStrategy.HARD)
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", False),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", True),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch(
"ee.onyx.hooks.executor.get_session_with_current_tenant",
"onyx.hooks.executor.get_session_with_current_tenant",
side_effect=RuntimeError("DB unavailable"),
),
patch("httpx.Client") as mock_client_cls,
@@ -564,16 +556,14 @@ def test_response_validation_failure_respects_fail_strategy(
hook = _make_hook(fail_strategy=fail_strategy)
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", False),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", True),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch("ee.onyx.hooks.executor.get_session_with_current_tenant"),
patch("ee.onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch(
"ee.onyx.hooks.executor.create_hook_execution_log__no_commit"
) as mock_log,
patch("onyx.hooks.executor.get_session_with_current_tenant"),
patch("onyx.hooks.executor.update_hook__no_commit") as mock_update,
patch("onyx.hooks.executor.create_hook_execution_log__no_commit") as mock_log,
patch("httpx.Client") as mock_client_cls,
):
# Response payload is missing required_field → ValidationError
@@ -629,13 +619,13 @@ def test_unexpected_exception_in_inner_respects_fail_strategy(
hook = _make_hook(fail_strategy=fail_strategy)
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", False),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", True),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch(
"ee.onyx.hooks.executor._execute_hook_inner",
"onyx.hooks.executor._execute_hook_inner",
side_effect=ValueError("unexpected bug"),
),
):
@@ -668,19 +658,17 @@ def test_is_reachable_failure_does_not_prevent_log(db_session: MagicMock) -> Non
hook = _make_hook(fail_strategy=HookFailStrategy.SOFT)
with (
patch("ee.onyx.hooks.executor.MULTI_TENANT", False),
patch("onyx.hooks.executor.HOOKS_AVAILABLE", True),
patch(
"ee.onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
"onyx.hooks.executor.get_non_deleted_hook_by_hook_point",
return_value=hook,
),
patch("ee.onyx.hooks.executor.get_session_with_current_tenant"),
patch("onyx.hooks.executor.get_session_with_current_tenant"),
patch(
"ee.onyx.hooks.executor.update_hook__no_commit",
"onyx.hooks.executor.update_hook__no_commit",
side_effect=OnyxError(OnyxErrorCode.NOT_FOUND, "hook deleted"),
),
patch(
"ee.onyx.hooks.executor.create_hook_execution_log__no_commit"
) as mock_log,
patch("onyx.hooks.executor.create_hook_execution_log__no_commit") as mock_log,
patch("httpx.Client") as mock_client_cls,
):
_setup_client(mock_client_cls, side_effect=httpx.ConnectError("refused"))

View File

@@ -1,4 +1,4 @@
"""Unit tests for ee.onyx.server.features.hooks.api helpers.
"""Unit tests for onyx.server.features.hooks.api helpers.
Covers:
- _check_ssrf_safety: scheme enforcement and private-IP blocklist
@@ -16,13 +16,13 @@ from unittest.mock import patch
import httpx
import pytest
from ee.onyx.server.features.hooks.api import _check_ssrf_safety
from ee.onyx.server.features.hooks.api import _raise_for_validation_failure
from ee.onyx.server.features.hooks.api import _validate_endpoint
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.hooks.models import HookValidateResponse
from onyx.hooks.models import HookValidateStatus
from onyx.server.features.hooks.api import _check_ssrf_safety
from onyx.server.features.hooks.api import _raise_for_validation_failure
from onyx.server.features.hooks.api import _validate_endpoint
# ---------------------------------------------------------------------------
# Helpers
@@ -117,28 +117,28 @@ class TestCheckSsrfSafety:
class TestValidateEndpoint:
def _call(self, *, api_key: str | None = _API_KEY) -> HookValidateResponse:
# Bypass SSRF check — tested separately in TestCheckSsrfSafety.
with patch("ee.onyx.server.features.hooks.api._check_ssrf_safety"):
with patch("onyx.server.features.hooks.api._check_ssrf_safety"):
return _validate_endpoint(
endpoint_url=_URL,
api_key=api_key,
timeout_seconds=_TIMEOUT,
)
@patch("ee.onyx.server.features.hooks.api.httpx.Client")
@patch("onyx.server.features.hooks.api.httpx.Client")
def test_2xx_returns_passed(self, mock_client_cls: MagicMock) -> None:
mock_client_cls.return_value.__enter__.return_value.post.return_value = (
_mock_response(200)
)
assert self._call().status == HookValidateStatus.passed
@patch("ee.onyx.server.features.hooks.api.httpx.Client")
@patch("onyx.server.features.hooks.api.httpx.Client")
def test_5xx_returns_passed(self, mock_client_cls: MagicMock) -> None:
mock_client_cls.return_value.__enter__.return_value.post.return_value = (
_mock_response(500)
)
assert self._call().status == HookValidateStatus.passed
@patch("ee.onyx.server.features.hooks.api.httpx.Client")
@patch("onyx.server.features.hooks.api.httpx.Client")
@pytest.mark.parametrize("status_code", [401, 403])
def test_401_403_returns_auth_failed(
self, mock_client_cls: MagicMock, status_code: int
@@ -150,21 +150,21 @@ class TestValidateEndpoint:
assert result.status == HookValidateStatus.auth_failed
assert str(status_code) in (result.error_message or "")
@patch("ee.onyx.server.features.hooks.api.httpx.Client")
@patch("onyx.server.features.hooks.api.httpx.Client")
def test_4xx_non_auth_returns_passed(self, mock_client_cls: MagicMock) -> None:
mock_client_cls.return_value.__enter__.return_value.post.return_value = (
_mock_response(422)
)
assert self._call().status == HookValidateStatus.passed
@patch("ee.onyx.server.features.hooks.api.httpx.Client")
@patch("onyx.server.features.hooks.api.httpx.Client")
def test_connect_timeout_returns_timeout(self, mock_client_cls: MagicMock) -> None:
mock_client_cls.return_value.__enter__.return_value.post.side_effect = (
httpx.ConnectTimeout("timed out")
)
assert self._call().status == HookValidateStatus.timeout
@patch("ee.onyx.server.features.hooks.api.httpx.Client")
@patch("onyx.server.features.hooks.api.httpx.Client")
@pytest.mark.parametrize(
"exc",
[
@@ -179,7 +179,7 @@ class TestValidateEndpoint:
mock_client_cls.return_value.__enter__.return_value.post.side_effect = exc
assert self._call().status == HookValidateStatus.timeout
@patch("ee.onyx.server.features.hooks.api.httpx.Client")
@patch("onyx.server.features.hooks.api.httpx.Client")
def test_connect_error_returns_cannot_connect(
self, mock_client_cls: MagicMock
) -> None:
@@ -189,7 +189,7 @@ class TestValidateEndpoint:
)
assert self._call().status == HookValidateStatus.cannot_connect
@patch("ee.onyx.server.features.hooks.api.httpx.Client")
@patch("onyx.server.features.hooks.api.httpx.Client")
def test_arbitrary_exception_returns_cannot_connect(
self, mock_client_cls: MagicMock
) -> None:
@@ -198,7 +198,7 @@ class TestValidateEndpoint:
)
assert self._call().status == HookValidateStatus.cannot_connect
@patch("ee.onyx.server.features.hooks.api.httpx.Client")
@patch("onyx.server.features.hooks.api.httpx.Client")
def test_api_key_sent_as_bearer(self, mock_client_cls: MagicMock) -> None:
mock_post = mock_client_cls.return_value.__enter__.return_value.post
mock_post.return_value = _mock_response(200)
@@ -206,7 +206,7 @@ class TestValidateEndpoint:
_, kwargs = mock_post.call_args
assert kwargs["headers"]["Authorization"] == "Bearer mykey"
@patch("ee.onyx.server.features.hooks.api.httpx.Client")
@patch("onyx.server.features.hooks.api.httpx.Client")
def test_no_api_key_omits_auth_header(self, mock_client_cls: MagicMock) -> None:
mock_post = mock_client_cls.return_value.__enter__.return_value.post
mock_post.return_value = _mock_response(200)

View File

@@ -417,57 +417,3 @@ def test_categorize_text_under_token_limit_accepted(
assert len(result.acceptable) == 1
assert result.acceptable_file_to_token_count["ok.txt"] == 500
# --- skip-indexing vs rejection by file type ---
def test_csv_over_token_threshold_accepted_skip_indexing(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""CSV exceeding token threshold is uploaded but flagged to skip indexing."""
_patch_common_dependencies(monkeypatch, upload_size_mb=1000, token_threshold_k=1)
text = "x" * 2000 # 2000 tokens > 1000 threshold
monkeypatch.setattr(utils, "extract_file_text", lambda **_kwargs: text)
upload = _make_upload("large.csv", size=2000, content=text.encode())
result = utils.categorize_uploaded_files([upload], MagicMock())
assert len(result.acceptable) == 1
assert result.acceptable[0].filename == "large.csv"
assert "large.csv" in result.skip_indexing
assert len(result.rejected) == 0
def test_csv_under_token_threshold_accepted_and_indexed(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""CSV under token threshold is uploaded and indexed normally."""
_patch_common_dependencies(monkeypatch, upload_size_mb=1000, token_threshold_k=1)
text = "x" * 500 # 500 tokens < 1000 threshold
monkeypatch.setattr(utils, "extract_file_text", lambda **_kwargs: text)
upload = _make_upload("small.csv", size=500, content=text.encode())
result = utils.categorize_uploaded_files([upload], MagicMock())
assert len(result.acceptable) == 1
assert result.acceptable[0].filename == "small.csv"
assert "small.csv" not in result.skip_indexing
assert len(result.rejected) == 0
def test_pdf_over_token_threshold_rejected(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""PDF exceeding token threshold is rejected entirely (not uploaded)."""
_patch_common_dependencies(monkeypatch, upload_size_mb=1000, token_threshold_k=1)
text = "x" * 2000 # 2000 tokens > 1000 threshold
monkeypatch.setattr(utils, "extract_file_text", lambda **_kwargs: text)
upload = _make_upload("big.pdf", size=2000, content=text.encode())
result = utils.categorize_uploaded_files([upload], MagicMock())
assert len(result.rejected) == 1
assert result.rejected[0].filename == "big.pdf"
assert "1K token limit" in result.rejected[0].reason
assert len(result.acceptable) == 0

View File

@@ -82,7 +82,7 @@ class TestChatFileConversion:
ChatLoadedFile(
file_id="file-2",
content=b"csv,data\n1,2",
file_type=ChatFileType.TABULAR,
file_type=ChatFileType.CSV,
filename="data.csv",
content_text="csv,data\n1,2",
token_count=5,

View File

@@ -1,6 +1,6 @@
"""Tests for memory tool streaming packet emissions."""
from queue import Queue
import queue
from unittest.mock import MagicMock
from unittest.mock import patch
@@ -18,9 +18,13 @@ from onyx.tools.tool_implementations.memory.models import MemoryToolResponse
@pytest.fixture
def emitter() -> Emitter:
bus: Queue = Queue()
return Emitter(bus)
def emitter_queue() -> queue.Queue:
return queue.Queue()
@pytest.fixture
def emitter(emitter_queue: queue.Queue) -> Emitter:
return Emitter(merged_queue=emitter_queue)
@pytest.fixture
@@ -53,24 +57,27 @@ class TestMemoryToolEmitStart:
def test_emit_start_emits_memory_tool_start_packet(
self,
memory_tool: MemoryTool,
emitter: Emitter,
emitter_queue: queue.Queue,
placement: Placement,
) -> None:
memory_tool.emit_start(placement)
packet = emitter.bus.get_nowait()
_key, packet = emitter_queue.get_nowait()
assert isinstance(packet.obj, MemoryToolStart)
assert packet.placement == placement
assert packet.placement is not None
assert packet.placement.turn_index == placement.turn_index
assert packet.placement.tab_index == placement.tab_index
assert packet.placement.model_index == 0 # emitter stamps model_index=0
def test_emit_start_with_different_placement(
self,
memory_tool: MemoryTool,
emitter: Emitter,
emitter_queue: queue.Queue,
) -> None:
placement = Placement(turn_index=2, tab_index=1)
memory_tool.emit_start(placement)
packet = emitter.bus.get_nowait()
_key, packet = emitter_queue.get_nowait()
assert packet.placement.turn_index == 2
assert packet.placement.tab_index == 1
@@ -81,7 +88,7 @@ class TestMemoryToolRun:
self,
mock_process: MagicMock,
memory_tool: MemoryTool,
emitter: Emitter,
emitter_queue: queue.Queue,
placement: Placement,
override_kwargs: MemoryToolOverrideKwargs,
) -> None:
@@ -93,21 +100,19 @@ class TestMemoryToolRun:
memory="User prefers Python",
)
# The delta packet should be in the queue
packet = emitter.bus.get_nowait()
_key, packet = emitter_queue.get_nowait()
assert isinstance(packet.obj, MemoryToolDelta)
assert packet.obj.memory_text == "User prefers Python"
assert packet.obj.operation == "add"
assert packet.obj.memory_id is None
assert packet.obj.index is None
assert packet.placement == placement
@patch("onyx.tools.tool_implementations.memory.memory_tool.process_memory_update")
def test_run_emits_delta_for_update_operation(
self,
mock_process: MagicMock,
memory_tool: MemoryTool,
emitter: Emitter,
emitter_queue: queue.Queue,
placement: Placement,
override_kwargs: MemoryToolOverrideKwargs,
) -> None:
@@ -119,7 +124,7 @@ class TestMemoryToolRun:
memory="User prefers light mode",
)
packet = emitter.bus.get_nowait()
_key, packet = emitter_queue.get_nowait()
assert isinstance(packet.obj, MemoryToolDelta)
assert packet.obj.memory_text == "User prefers light mode"
assert packet.obj.operation == "update"

Some files were not shown because too many files have changed in this diff Show More