Compare commits

..

249 Commits

Author SHA1 Message Date
Nik
c8e565fa75 fix(chat): fix B1/B2/P1 bugs in multi-model streaming + cleanup
B1 — Self-completion race: model finishes before GeneratorExit fires,
exits _run_model with drain_done=False, skips self-completion. Fix:
add completion_locks (one per model); disconnect else-branch claims
lock and calls llm_loop_completion_handle for already-succeeded models.

B2 — Stop-button saves wrong message for errored models: the stop loop
called llm_loop_completion_handle for all models including ones that
threw exceptions, persisting "stopped by user" for an errored model.
Fix: add model_errored flag; stop loop skips errored models.

P1 — Orphaned ChatMessage rows for errored models: reserved_messages
were never cleaned up when a model errored. Fix: delete via
db_session.get(ChatMessage, id) in all three exit paths (normal
completion, stop-button, disconnect).

Also: extract repeated orphan-cleanup into _delete_orphaned_message
nested helper; remove dead check_call_count variable in tests; rename
ctx→worker_context, _completion_done→completion_persisted; replace
functools.partial with captured-variable lambda; fix stale docstring
("bounded"→"unbounded"); add _CANCEL_POLL_INTERVAL_S named constant;
if/if/if→if/elif/elif; %-style logger calls throughout.

Tests: two new regression tests (B1 race, B2 stop-button errored model).
26 tests pass. mypy clean.
2026-04-01 08:17:50 -07:00
Nik
bab95d8bf0 fix(chat): remove duplicate drain_done declaration after rebase 2026-03-31 20:02:29 -07:00
Nik
eb7bc74e1b fix(chat): persist LLM response on HTTP disconnect via drain_done + worker self-completion
When the HTTP client disconnects, Starlette throws GeneratorExit into the
drain loop generator. The old code called executor.shutdown(wait=False) with
no completion handling, leaving the assistant DB message as the TERMINATED
placeholder forever (regressing test_send_message_disconnect_and_cleanup).

New design:
- drain_done (threading.Event) signals emitters to return immediately instead
  of blocking on queue.put — no retry loops, no daemon threads
- One-time queue drain in the else block releases any in-progress puts so
  workers exit within milliseconds
- Workers self-complete: after run_llm_loop returns, each worker checks
  drain_done.is_set() and, if true, opens its own DB session and calls
  llm_loop_completion_handle directly

Unit test updated to reflect the async self-completion semantics: the test
blocks the worker inside run_llm_loop until gen.close() sets drain_done,
then waits for completion_called inside the patch context (while mocks are
still active) to avoid calling the real get_session_with_current_tenant.
2026-03-31 20:02:29 -07:00
Nik
29da0aefb5 feat(chat): add multi-model parallel streaming (N=2-3 LLMs side-by-side)
Adds support for running 2-3 LLMs in parallel within a single chat turn,
with responses streamed interleaved to the frontend via the merged queue
infrastructure introduced in the preceding PR.

Backend changes
- process_message.py: restore llm_overrides param on build_chat_turn and
  _stream_chat_turn; restore is_multi branching for LLM setup, context
  window sizing, and message ID reservation; add _build_model_display_name
  and handle_multi_model_stream (public multi-model entrypoint)
- db/chat.py: add reserve_multi_model_message_ids (reserves N assistant
  message placeholders sharing the same parent), set_preferred_response
  (marks one response as the user's preferred), and extend
  translate_db_message_to_chat_message_detail with preferred_response_id
  and model_display_name fields
- chat_backend.py: route requests with llm_overrides >1 through
  handle_multi_model_stream; reject non-streaming multi-model requests with
  OnyxError; add /set-preferred-response endpoint

Tests
- test_multi_model_streaming.py: unit tests for _run_models drain loop
  (arrival-order yield, error isolation, cancellation), handle_multi_model_stream
  validation guards, and N=1 backwards-compatibility
2026-03-31 20:01:21 -07:00
Nik
6c86301c51 fix(chat): remove bounded queue and packet drops — match old behavior
Old code used queue.Queue() (unbounded, blocking put). New code introduced
queue.Queue(maxsize=100) + put(timeout=3.0) + silent drop on queue.Full —
a regression in all three callsites:

- Emitter.emit(): data packets silently dropped on queue full
- _run_model exception path: model errors silently lost
- _run_model finally (_MODEL_DONE): if dropped, drain loop hangs forever
  (models_remaining never reaches 0)

Fix: remove maxsize, remove all timeout= arguments, remove all
except queue.Full handlers. The drain_done early-return in emit() is the
correct disconnect mechanism; queue backpressure is not needed.

Also adds _completion_done: bool type annotation and fixes the queue drain
comment (no longer unblocking timed-out puts — just releasing memory).
2026-03-31 20:00:46 -07:00
Nik
631146f48f fix(chat): use model_succeeded instead of check_is_connected on self-completion
On HTTP disconnect, check_is_connected() returns False, causing
llm_loop_completion_handle to treat a completed response as
user-cancelled and append "Generation was stopped by the user."
Use lambda: model_succeeded[model_idx] (always True here) instead,
matching the cancellation path's functools.partial(bool, model_succeeded[i]).
2026-03-31 18:42:04 -07:00
Nik
f327278506 fix(chat): persist LLM response on HTTP disconnect via drain_done + worker self-completion
When the HTTP client disconnects, Starlette throws GeneratorExit into the
drain loop generator. The old else branch just called executor.shutdown(wait=False)
with no completion handling, leaving the assistant DB message as the TERMINATED
placeholder forever (regressing test_send_message_disconnect_and_cleanup).

New design:
- drain_done (threading.Event) signals emitters to return immediately instead
  of blocking on queue.put — no retry loops, no daemon threads
- One-time queue drain in the else block releases any in-progress puts so
  workers exit within milliseconds
- Workers self-complete: after run_llm_loop returns, each worker checks
  drain_done.is_set() and, if true, opens its own DB session and calls
  llm_loop_completion_handle directly
2026-03-31 18:14:50 -07:00
Nik
c7cc439862 fix(emitter): address Greptile P1/P2/P3 and Queue typing
- P1: executor.shutdown(wait=False) on early exit — don't block the
  server thread waiting for LLM workers; they will hit queue.Full
  timeouts and exit on their own (matches old run_chat_loop behavior)
- P2: wrap db_session.commit() in try/finally in build_chat_turn —
  reset processing status before propagating if commit fails, so the
  chat session isn't stuck at "processing" permanently
- P3: fix inaccurate comment "All worker threads have exited" — workers
  may still be closing their own DB sessions at that point; clarify
  that only the main-thread db_session is safe to use
- Queue[Any] → Queue[tuple[int, Packet | Exception | object]] in Emitter
2026-03-31 17:02:46 -07:00
Nik
3365a369e2 fix(review): address Greptile comments
- Add owner to bare TODO comment
- Restore placement field assertions weakened by Emitter refactor
2026-03-31 12:49:09 -07:00
Nik
470bda3fb5 refactor(chat): elegance pass on PR1 changed files
process_message.py:
- Fix `skip_clarification` field in ChatTurnSetup: inline comment inside
  the type annotation → separate `#` comment on the line above the field
- Flatten `model_tools` via list comprehension instead of manual extend loop
- `forced_tool_id` membership test: list → set comprehension (O(1) lookup)
- Trim `_run_model` inner-function docstring — private closure doesn't need
  10-line Args block
- Remove redundant inline param comments from `_stream_chat_turn` and
  `handle_stream_message_objects` where the docstring Args section already
  documents them
- Strip duplicate Args/Returns from `handle_stream_message_objects` docstring
  — it delegates entirely to `_stream_chat_turn`

emitter.py:
- Widen `merged_queue` annotation to `Queue[Any]`: Queue is invariant so
  `Queue[tuple[int, Packet]]` can't be passed a `Queue[tuple[int, Packet |
  Exception | object]]`; the emitter is a write-only producer and doesn't
  care what else lives on the queue
2026-03-31 12:16:38 -07:00
Nik
13f511e209 refactor(emitter): clean up string annotation and use model_copy
- Fix `"Queue"` forward-reference annotation → `Queue[tuple[int, Packet]]`
  (Queue is already imported, the string was unnecessary)
- Replace manual Placement field copy with `base.model_copy(update={...})`
- Remove redundant `key` variable (was just `self._model_idx`)
- Tighten docstring
2026-03-31 11:44:28 -07:00
Nik
c5e8ba1eab refactor(chat): replace bus-polling emitter with merged-queue streaming; fix 429 hang
Switch Emitter from a per-model event bus + polling thread to a single
bounded queue shared across all models.  Each emit() call puts directly onto
the queue; the drain loop in _run_models yields packets in arrival order.

Key changes
- emitter.py: remove Bus, get_default_emitter(); add Emitter(merged_queue, model_idx)
- chat_state.py: remove run_chat_loop_with_state_containers (113-line bus-poll loop)
- process_message.py: add ChatTurnSetup dataclass and build_chat_turn(); rewrite
  _stream_chat_turn + _run_models around the merged queue; single-model (N=1)
  path is fully backwards-compatible
- placement.py, override_models.py: add docstrings; LLMOverride gains display_name
- research_agent.py, custom_tool.py: update Emitter call sites
- test_emitter.py: new unit tests for queue routing, model_index tagging, placement

Frontend 429 fix
- lib.tsx: parse response body for human-readable detail on non-2xx responses
  instead of "HTTP error! status: 429"
- useChatController.ts: surface stack.error after the FIFO drain loop exits so
  the catch block replaces the thinking placeholder with an error message
2026-03-30 22:18:48 -07:00
dependabot[bot]
0b9d154a73 chore(deps): bump runs-on/cache from 50350ad4242587b6c8c2baa2e740b1bc11285ff4 to a5f51d6f3fece787d03b7b4e981c82538a0654ed (#9763)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-30 13:54:43 -07:00
dependabot[bot]
6e65e55bf5 chore(deps): bump actions/cache from 5.0.3 to 5.0.4 (#9765)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-30 13:46:53 -07:00
Raunak Bhagat
3f9e208759 feat(opal): SelectCard + CardHeaderLayout (#9760) 2026-03-30 19:54:54 +00:00
dependabot[bot]
fb8edda14a chore(deps): bump pygments from 2.19.2 to 2.20.0 (#9757)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-30 18:30:18 +00:00
Jamison Lahman
58decd8a6b chore(gha): prefer ci-protected env (#9728) 2026-03-30 17:20:54 +00:00
Danelegend
e97204d9cc feat(indexing): Batch chunks during doc processing (#9468) 2026-03-30 11:49:36 +00:00
Danelegend
44ab02c94f refactor(indexing): Refactor indexing vector db abstraction (#9653) 2026-03-30 09:57:16 +00:00
Danelegend
a98cc30f25 refactor(indexing): Change adapters to support iterables (#9469) 2026-03-30 01:43:10 +00:00
Danelegend
a709dcb8fa feat(indexing): Max chunk processing (#9400) 2026-03-30 00:10:24 +00:00
Raunak Bhagat
a3dfe6aa1b refactor(opal): unify Interactive color system (#9717) 2026-03-28 00:40:23 +00:00
Nikolas Garza
23e4d55fb1 perf(swr): convert raw-fetch hooks to SWR to eliminate duplicate requests (#9694) 2026-03-28 00:26:20 +00:00
Jamison Lahman
470cc85f83 feat(cli): onyx-cli serve over SSH (#9726) 2026-03-27 23:46:14 +00:00
Justin Tahara
64d9be5a41 fix(openpyxl): Colors must be aRGB hex values (#9727) 2026-03-27 23:14:36 +00:00
roshan
71a5b469b0 feat(widget): add citation badges to chat widget (#9714) 2026-03-27 22:39:46 +00:00
Evan Lohn
462eb0697f fix: Anthropic litellm thinking workaround (#9713) 2026-03-27 21:03:05 +00:00
dependabot[bot]
b708dc8796 chore(deps): bump langchain-core from 1.2.11 to 1.2.22 (#9720)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-27 20:50:19 +00:00
dependabot[bot]
c9e2c32f55 chore(deps): bump cryptography from 46.0.5 to 46.0.6 (#9721)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-27 20:48:59 +00:00
Jamison Lahman
d725df62e7 feat(cli): --version and validate-config warn if backend version is incompatible (#9715) 2026-03-27 13:13:16 -07:00
Jamison Lahman
d1460972b6 fix(cli): onyx-cli --version interpolation (#9712) 2026-03-27 19:22:31 +00:00
Jamison Lahman
706872f0b7 chore(deps): upgrade go deps (#9711) 2026-03-27 12:24:25 -07:00
Jamison Lahman
ed3856be2b chore(release): build all CLI wheels before publishing (#9710) 2026-03-27 19:04:02 +00:00
Jamison Lahman
6326c7f0b9 chore(gha): fix git error after helm release migration to alpine base image (#9709) 2026-03-27 11:21:34 -07:00
Jamison Lahman
40420fc4e6 chore(gha): helm release upstream nits (#9708) 2026-03-27 11:10:41 -07:00
Nikolas Garza
1a2b6a66cc fix(celery): use broker connection pool to prevent Redis connection leak (#9682) 2026-03-27 17:53:49 +00:00
Jamison Lahman
d1b1529ccf chore(gha): fix helm release after image update (#9707) 2026-03-27 10:37:43 -07:00
Bo-Onyx
fedd9c76e5 feat(hook): admin page create or edit hook (#9690) 2026-03-27 17:10:14 +00:00
Jamison Lahman
0b34b40b79 chore(gha): pin helm release docker image (#9706) 2026-03-27 10:16:41 -07:00
Yuhong Sun
fe82ddb1b9 Update README.md (#9703) 2026-03-27 10:03:56 -07:00
Jamison Lahman
32d3d70525 chore(playwright): deflake settings_pages.spec.ts (#9684) 2026-03-27 15:54:23 +00:00
Jamison Lahman
40b9e10890 chore(devtools): upgrade ods: 0.7.1->0.7.2 (#9701) 2026-03-27 08:17:42 -07:00
dependabot[bot]
e21b204b8a chore(deps): bump brace-expansion in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#9698)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-27 08:10:15 -07:00
Jamison Lahman
2f672b3a4f fix(fe): Popover content doesnt overflow on small screens (#9612) 2026-03-27 08:07:52 -07:00
Nikolas Garza
cf19d0df4f feat(helm): add Prometheus metrics ports and Services for celery workers (#9630) 2026-03-27 08:03:48 +00:00
Danelegend
86a6a4c134 refactor(indexing): Vespa & Opensearch index function use Iterable (#9384) 2026-03-27 04:36:59 +00:00
SubashMohan
146b5449d2 feat: configurable file upload size and token limits via admin settings (#9232) 2026-03-27 04:23:16 +00:00
Jamison Lahman
b66991b5c5 chore(devtools): ods trace (#9688)
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-27 03:56:38 +00:00
dependabot[bot]
9cb76dc027 chore(deps-dev): bump picomatch from 2.3.1 to 2.3.2 in /web (#9691)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-27 02:22:22 +00:00
dependabot[bot]
f66891d19e chore(deps-dev): bump handlebars from 4.7.8 to 4.7.9 in /web (#9689)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-27 01:41:29 +00:00
Nikolas Garza
c07c952ad5 chore(greptile): add nginx routing rule for non-api backend routes (#9687) 2026-03-27 00:34:15 +00:00
Nikolas Garza
be7f40a28a fix(nginx): route /scim/* to api_server (#9686) 2026-03-26 17:21:57 -07:00
Evan Lohn
26f941b5da perf: perm sync start time (#9685) 2026-03-27 00:07:53 +00:00
Jamison Lahman
b9e84c42a8 feat(providers): allow deleting all types of providers (#9625)
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-26 15:20:56 -07:00
Bo-Onyx
0a1df52c2f feat(hook): Hook Form Modal Polish. (#9683) 2026-03-26 22:12:33 +00:00
Nikolas Garza
306b0d452f fix(billing): retry claimLicense up to 3x after Stripe checkout return (#9669) 2026-03-26 21:06:19 +00:00
Justin Tahara
5fdb34ba8e feat(llm): add Bifrost gateway frontend modal and provider registration (#9617) 2026-03-26 20:50:25 +00:00
Jamison Lahman
2d066631e3 fix(voice): dont soft-delete providers (#9679) 2026-03-26 19:26:32 +00:00
Evan Lohn
5c84f6c61b fix(jira): large batches fail json decode (#9677) 2026-03-26 18:53:37 +00:00
Nikolas Garza
899179d4b6 fix(api-key): clarify upgrade message for trial accounts (#9678) 2026-03-26 18:32:41 +00:00
Bo-Onyx
80d6bafc74 feat(hook): Hook connect/manage modal (#9645) 2026-03-26 18:16:33 +00:00
Nikolas Garza
2cc325cb0e chore(greptile): split greptile.json into .greptile/ directory (#9668) 2026-03-26 17:05:43 +00:00
Raunak Bhagat
849385b756 refactor: migrate legacy components/Text (#9628) 2026-03-26 16:14:03 +00:00
Ben Wu
417b9c12e4 feat(canvas): add API client, data models, and connector scaffold 1/6 (#9385)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 15:26:52 +00:00
Raunak Bhagat
30b37d0a77 fix(admin): wrap system prompt modal in Formik with markdown subDescription (#9667) 2026-03-26 07:08:56 -07:00
Justin Tahara
b48be0cd3a feat(llm): add Bifrost gateway as LLM provider (backend) (#9616) 2026-03-26 05:09:20 +00:00
Nikolas Garza
127fd90424 fix(metrics): replace inspect.ping() with event-based worker health monitoring (#9633) 2026-03-26 03:36:07 +00:00
Raunak Bhagat
f9c9e55f32 refactor(opal): accept string | RichStr in all Opal text-rendering components, modals, and input-layouts (#9656) 2026-03-26 02:46:34 +00:00
Raunak Bhagat
5afcf1acea fix(opal): remove gap between title and description in ContentMd (#9666) 2026-03-25 19:45:21 -07:00
Nikolas Garza
eb1244a9d7 feat(chat): add DB schema and Pydantic models for multi-model answers (#9646) 2026-03-26 02:21:00 +00:00
Evan Lohn
2433a9a4c5 feat: sharepoint filters (denylist) (#9649) 2026-03-26 01:33:18 +00:00
dependabot[bot]
60bc8fcac6 chore(deps): bump nltk from 3.9.3 to 3.9.4 (#9663)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-26 00:50:52 +00:00
dependabot[bot]
1ddc958a51 chore(deps): bump picomatch in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#9662)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-25 17:54:28 -07:00
acaprau
de37acbe07 chore(opensearch): Optimize terms filters; add type aliases (#9619) 2026-03-26 00:35:53 +00:00
Wenxi
08cd2f2c3e fix(ci): tag web-server and model-server with craft-latest (#9661)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 00:35:37 +00:00
acaprau
fc29f20914 feat(opensearch): Add Prometheus metrics for OpenSearch retrieval (#9654) 2026-03-26 00:29:29 +00:00
dependabot[bot]
c43cb80a7a chore(deps): bump yaml from 1.10.2 to 1.10.3 in /web (#9655)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-25 23:59:17 +00:00
dependabot[bot]
56f0be2ec8 chore(deps): bump requests from 2.32.5 to 2.33.0 (#9652)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-25 23:59:00 +00:00
acaprau
42f9ddf247 feat(opensearch): Search UI search flow can be configured to use pure keyword search (#9500)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-25 23:56:32 +00:00
dependabot[bot]
a10a85c73c chore(deps-dev): bump picomatch from 4.0.3 to 4.0.4 in /widget (#9659)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-25 17:01:38 -07:00
Jamison Lahman
31d8ae9718 chore(playwright): rework admin navigation tests (#9650)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-25 23:27:08 +00:00
Nikolas Garza
00a0a99842 fix: clarify service account API key upgrade message for trial accounts (#9581) 2026-03-25 23:22:45 +00:00
dependabot[bot]
90040f8973 chore(deps): bump picomatch in /examples/widget (#9651)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-25 16:20:36 -07:00
Raunak Bhagat
4f5d081f26 feat(opal): add Text component with inline markdown support (#9623) 2026-03-25 23:06:18 +00:00
dependabot[bot]
c51a6dbd0d chore(deps): bump pypdf from 6.9.1 to 6.9.2 (#9637)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-25 23:04:27 +00:00
Evan Lohn
8b90ecc189 feat: sharepoint shareable links non-public (#9636) 2026-03-25 22:50:29 +00:00
Justin Tahara
865c893a09 chore(agents): Match Mocks & Add Date Validation (#9632) 2026-03-25 21:57:31 +00:00
Bo-Onyx
ef5628bfa7 feat(hook): Frontend hook infrastructure (#9634) 2026-03-25 21:38:04 +00:00
Jessica Singh
6ffee0021e chore(voice): align fe with other admin pages (#9505) 2026-03-25 20:00:36 +00:00
Jessica Singh
28dc84b831 fix(notion): upgrade API version + logical changes (#9609)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 19:18:51 +00:00
Jamison Lahman
230f035500 fix(chat): dont clear input message after errors submitting (#9624) 2026-03-25 12:00:23 -07:00
Jamison Lahman
55b24d72b4 fix(fe): redirect to status page after deleting connector (#9620) 2026-03-25 17:24:41 +00:00
Raunak Bhagat
3321a84c7d fix(sidebar): fix icon alignment for user-avatar-popover (#9615)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-25 17:07:50 +00:00
SubashMohan
54bf32a5f8 fix: use persisted source functions when toggling search tool (#9548) 2026-03-25 16:50:25 +00:00
Nikolas Garza
4bb6b76be6 feat(groups): switchover to /admin/groups and rewrite e2e tests (#9545) 2026-03-25 08:11:13 +00:00
SubashMohan
db94562474 feat: Group-based permissions — Phase 1 schema (AccountType, Permission, PermissionGrant) (#9547) 2026-03-25 06:24:43 +00:00
Nikolas Garza
582d4642c1 feat(metrics): add task lifecycle and per-connector Prometheus metrics (#9602) 2026-03-25 06:02:43 +00:00
Nikolas Garza
3caaecdb0e feat(groups): polish edit page table and delete UX (#9544) 2026-03-25 04:57:50 +00:00
Nikolas Garza
039b69806b feat(metrics): add queue depth and connector health Prometheus collectors (#9590) 2026-03-25 03:53:26 +00:00
Evan Lohn
63971d4958 fix: confluence client retries (#9605) 2026-03-25 03:32:29 +00:00
Nikolas Garza
ffd897f380 feat(metrics): add reusable Prometheus metrics server for celery workers (#9589) 2026-03-25 01:47:06 +00:00
Evan Lohn
4745069232 fix: no more lazy queries per search call (#9578) 2026-03-25 01:38:35 +00:00
Nikolas Garza
386782f188 feat(groups): add edit group page (#9543) 2026-03-25 01:22:57 +00:00
Raunak Bhagat
ff009c4129 fix: Fix tag widths (#9618) 2026-03-25 01:18:51 +00:00
Bo-Onyx
b20a5ebf69 feat(hook): Add frontend feature control and admin hook page (#9575) 2026-03-25 00:37:37 +00:00
Bo-Onyx
8645adb807 fix(width): UI update model width definition. (#9613) 2026-03-25 00:11:32 +00:00
Nikolas Garza
2425bd4d8d feat(groups): add shared resources and token limit sections (#9538) 2026-03-24 23:44:44 +00:00
Raunak Bhagat
333b2b19cb refactor: fix sidebar layout (#9601)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-24 23:22:00 +00:00
Jamison Lahman
44895b3bd6 fix(ux): disable MCP Tools toggle if needs authenticated (#9607) 2026-03-24 22:45:23 +00:00
Raunak Bhagat
78c2ecf99f refactor(opal): restructure Onyx logo icons into composable parts (#9606)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-24 22:26:28 +00:00
Ciaran Sweet
e3e0e04edc fix: update values.yaml comment for opensearch admin password secretKeyRef (#9595) 2026-03-24 21:54:03 +00:00
Justin Tahara
a19fe03bd8 fix(ui): Text focused paste from PowerPoint (#9603) 2026-03-24 21:23:58 +00:00
Nikolas Garza
415c05b5f8 feat(groups): add create group page (#9515)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 20:55:18 +00:00
Nikolas Garza
352fd19f0a feat(admin): inline group renaming (#9491) 2026-03-24 20:12:17 +00:00
Raunak Bhagat
41ae039bfa refactor(opal): cleanup button types in Opal (#9598) 2026-03-24 20:06:39 +00:00
Bo-Onyx
782c734287 feat(hook): integrate query processing hook point (#9533) 2026-03-24 19:47:17 +00:00
Justin Tahara
728cdb0715 feat(helm): Adding pginto specific host (#9600) 2026-03-24 19:31:02 +00:00
Justin Tahara
baf6437117 fix(mt): Preprovision all tenants at once (#9576) 2026-03-24 19:13:10 +00:00
Raunak Bhagat
f187165077 refactor(opal): opalify FilterButton + migrate all instances away from old one (#9597) 2026-03-24 19:12:00 +00:00
Evan Lohn
727be3d663 fix: eager load chat session persona (#9577) 2026-03-24 19:03:57 +00:00
Jessica Singh
98c8f9884b fix(voice): add WebSocket upgrade headers to nginx configs (#9558)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 18:53:39 +00:00
Jamison Lahman
d79a068984 fix(fe): settings page layout shift on load (#9594) 2026-03-24 18:15:54 +00:00
Wenxi
ba0740d15f fix(fe): map snake_case auth type API response to camelCase (#9586)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 18:01:51 +00:00
Jamison Lahman
86b7bed90b chore(gha): basic test selection for external deps and connector tests (#9596)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-24 10:52:40 -07:00
Jamison Lahman
aead6ab9a5 fix(fe): properly style "Sign In" button (#9480) 2026-03-24 10:02:02 -07:00
Jamison Lahman
c9d4c186dd chore(blame): ignore ruff formatting change (#9345) 2026-03-24 10:01:23 -07:00
Jamison Lahman
70aad1ec46 fix(fe): editing an LLM provider uses the global default model (#9502) 2026-03-24 10:01:07 -07:00
Wenxi
ca3cc16ead fix(fe): stop SWR retry spam and spurious logout on auth pages (#9587) 2026-03-24 16:53:56 +00:00
Jamison Lahman
9ea1780ce5 chore(fe): memory input defaults to 1 row with max of 3 (#9563) 2026-03-24 09:35:47 -07:00
Jamison Lahman
f70e5e605e feat(ux): handle when chat session id cannot be found (#9524) 2026-03-24 16:18:52 +00:00
Jamison Lahman
84b134e226 fix(a11y): hidden buttons appear on tabbing (#9518) 2026-03-24 16:18:11 +00:00
Raunak Bhagat
b17c63a7d6 feat(admin): refresh agents page with DataTable and opal components (#9376) 2026-03-24 16:04:10 +00:00
Jamison Lahman
76c41d1b0b chore(fe): always load an empty memory card (#9560) 2026-03-24 15:47:12 +00:00
Jamison Lahman
579b86f1ce chore(fe): memories save after pressing enter (#9553)
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-24 15:42:36 +00:00
Jamison Lahman
a53cf13db1 chore(fe): position memories modal at the top (#9554) 2026-03-24 15:42:32 +00:00
Danelegend
06f76bac0c chore(indexing): Add tests for DocumentIndex index function (#9477) 2026-03-24 03:02:25 +00:00
Evan Lohn
e59b0c0d23 refactor: filter fields (#9574) 2026-03-24 02:32:46 +00:00
Evan Lohn
dfa37cce8b chore: use efficient persona id query path (#9573) 2026-03-24 01:49:20 +00:00
Jamison Lahman
6dce6b09e4 chore(playwright): mask date switcher in screenshots (#9584) 2026-03-24 01:39:16 +00:00
acaprau
0eba41c487 chore(opensearch, devtools): Generate embedding script (#9580) 2026-03-24 01:18:38 +00:00
acaprau
a426930123 chore(opensearch, devtools): Benchmarking script (#9579) 2026-03-24 00:35:18 +00:00
Jamison Lahman
73d98c7fa5 fix(ux): display invalid agent fields on load (#9582) 2026-03-24 00:22:15 +00:00
Justin Tahara
a096cf3997 feat(tf): Introduce Opensearch Terraform for AWS (#9523) 2026-03-24 00:16:52 +00:00
Justin Tahara
1e01ff8f10 fix(migration): Fix duplicate Null Users issue (#9568)
Co-authored-by: Jessica Singh <jessicasingh@outlook.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 00:01:56 +00:00
Jamison Lahman
3665bb23c2 chore(fe): action popover item UX (#8831) 2026-03-23 23:29:33 +00:00
Raunak Bhagat
8ff0d5fc15 refactor: update names in Persona table (#9569) 2026-03-23 22:20:47 +00:00
Wenxi
645d45776a fix: alias anonymous ph users with registered users (#9570) 2026-03-23 20:51:15 +00:00
Jamison Lahman
fa06e4ebd5 fix(ux): give a tooltip with reason agent edit cannot save (#9571) 2026-03-23 20:46:03 +00:00
Wenxi
2a61e3ce4c refactor: update auth paths to use onyx error and correctly pass error detail to auth error page (#9565)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-23 20:15:29 +00:00
dependabot[bot]
d8733dd89f chore(deps-dev): bump flatted from 3.3.3 to 3.4.2 in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#9535)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-23 12:48:01 -07:00
Jamison Lahman
c651177529 chore(fe): remove Inter font (#9566) 2026-03-23 19:16:27 +00:00
Wenxi
3193fe76e4 chore: don't allow periods in gmail signup on cloud (#9564)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-23 19:09:20 +00:00
Wenxi
b9025c57f6 chore: web connector wait for page networkidle before continuing (#9556) 2026-03-23 18:16:43 +00:00
dependabot[bot]
1b0d62c16e chore(deps): bump actions/setup-node from 6.2.0 to 6.3.0 (#9561)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-23 11:19:18 -07:00
dependabot[bot]
7a0c977eb7 chore(deps): bump astral-sh/setup-uv from 7.3.1 to 7.6.0 (#9562)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-23 11:19:08 -07:00
dependabot[bot]
a9c04dca89 chore(deps): bump docker/bake-action from 6.10.0 to 7.0.0 (#9559)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-23 11:18:54 -07:00
Nikolas Garza
8d76a95c86 feat(fe): migrate root layout from SSR to CSR with SWR data fetching (#9529) 2026-03-23 18:03:55 +00:00
Raunak Bhagat
2a69561ec5 feat(opal): add SvgStarOff icon (#9555) 2026-03-23 17:45:40 +00:00
Evan Lohn
e1655426d6 chore: [mirror of #9267] pass OAuthClientProvider to call_mcp_tool for automatic token refresh (#9414)
Co-authored-by: Fizza-Mukhtar <fizzamukhtar01@gmail.com>
2026-03-23 17:34:18 +00:00
Evan Lohn
9634266bc6 fix: last index time consistency (#9546) 2026-03-23 17:20:11 +00:00
Raunak Bhagat
0c962d882d fix(opal): reduce table qualifier icon sizes (#9552) 2026-03-23 17:17:05 +00:00
Raunak Bhagat
4bf3edf83b fix(opal): remove table-layout fixed, right-align actions column (#9551) 2026-03-23 16:47:38 +00:00
dependabot[bot]
c48a77c644 chore(deps-dev): bump flatted from 3.3.3 to 3.4.2 in /examples/widget (#9550)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-23 09:14:07 -07:00
dependabot[bot]
26d70ab16b chore(deps-dev): bump flatted from 3.4.1 to 3.4.2 in /web (#9539)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-23 09:07:24 -07:00
Raunak Bhagat
f8a2f3ac93 chore(fe): clean up table qualifier API and internals (#9528) 2026-03-23 07:58:36 +00:00
Evan Lohn
5186356a26 fix: windows install improvements (#9542) 2026-03-22 20:59:35 +00:00
Jamison Lahman
7b826e2a4e chore(fe): auto-focus clicked memory, improve action hover style (#9532) 2026-03-22 19:16:10 +00:00
Justin Tahara
c175dc8f6a fix(mt): Tenant Provisioning Fixes (#9541) 2026-03-22 17:50:00 +00:00
Raunak Bhagat
aa11813cc0 feat: UserAvatar (#9527) 2026-03-21 02:05:00 +00:00
Evan Lohn
6235f49b49 fix: csv test with newlines (#9534) 2026-03-21 01:30:11 +00:00
Evan Lohn
fd6a110794 feat: installer invocable from other bash script (#9531) 2026-03-21 01:18:20 +00:00
Jamison Lahman
bd42c459d6 chore(fe): update memories dropdown padding (#9526) 2026-03-20 23:38:32 +00:00
Danelegend
aede532e63 fix(chat): Cache plaintext file results (#9511) 2026-03-20 23:21:12 +00:00
Evan Lohn
068ac543ad fix: deadlock in multitenant test (#9530) 2026-03-20 23:05:20 +00:00
Bo-Onyx
30e7a831a5 feat(hook): Add hook management API (#9513)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 22:53:59 +00:00
Evan Lohn
276261c96d fix: windows installer (#9507) 2026-03-20 22:53:46 +00:00
Bo-Onyx
205f1410e4 chore(hook): Hook executor. (#9467) 2026-03-20 22:47:01 +00:00
Bo-Onyx
a93d154c27 feat(hook): improve on hook point definition (#9522) 2026-03-20 22:20:42 +00:00
Jamison Lahman
1361879bd0 fix(fe): clicking outside chat area keeps chat input focused (#9521) 2026-03-20 19:22:11 +00:00
Justin Tahara
c58cc320b2 feat(tf): Port over WAF updates (#9520) 2026-03-20 18:45:09 +00:00
Jamison Lahman
461350958a fix(fe): dim project name in sidebar color (#9519) 2026-03-20 17:47:49 +00:00
Raunak Bhagat
50dde0be1a chore: edit AGENTS.md and CLAUDE.md files (#9486) 2026-03-20 00:59:30 +00:00
acaprau
199e1df453 feat(opensearch): Add functions for keyword and semantic retrieval (#9479)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-20 00:48:01 +00:00
Justin Tahara
996b674840 feat(backend): Adding procps (#9509) 2026-03-19 23:26:36 +00:00
Justin Tahara
5413723ccc feat(ods): Rerun run-ci workflow (#9501) 2026-03-19 22:11:59 +00:00
Evan Lohn
9660056a51 fix: drive rate limit retry (#9498) 2026-03-19 21:32:08 +00:00
Fizza Mukhtar
3105177238 fix(llm): don't send tool_choice when no tools are provided (#9224) 2026-03-19 21:26:46 +00:00
Evan Lohn
24bb4bda8b feat: windows installer and install improvements (#9476) 2026-03-19 20:47:44 +00:00
Raunak Bhagat
9532af4ceb chore: move Hoverable story (#9495) 2026-03-19 20:40:27 +00:00
Jamison Lahman
0a913f6af5 fix(fe): fix memories immediately losing focus on click (#9493) 2026-03-19 20:15:34 +00:00
Justin Tahara
fe30c55199 fix(code interpreter): Caching files (#9484) 2026-03-19 19:32:37 +00:00
Jamison Lahman
2cf0a65dd3 chore(fe): reduce padding on elements at the bottom of modal headers (#9488) 2026-03-19 19:27:37 +00:00
Nikolas Garza
659416f363 feat(admin): groups page - list page and group cards (#9453) 2026-03-19 18:23:15 +00:00
Raunak Bhagat
40aecbc4b9 refactor(fe): move table to opal, update size API (#9438) 2026-03-19 17:23:41 +00:00
Jamison Lahman
710b39074f chore(fe): remove opal-button* class names (#9471) 2026-03-19 02:15:00 +00:00
acaprau
8fe2f67d38 chore(opensearch): Allow disabling match highlights via env var; default to disabled (#9436) 2026-03-19 00:43:17 +00:00
Justin Tahara
f00aaf9fc0 fix(agents): Agents are Private by Default (#9465) 2026-03-19 00:01:46 +00:00
Bo-Onyx
5b2426b002 chore(hooks): Define Hook Point in the backend (#9391) 2026-03-18 23:43:26 +00:00
Justin Tahara
ba6ab0245b fix(celery): add dedup guardrails to user file delete queue (#9454)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-18 23:38:52 +00:00
Justin Tahara
b64ebb57e1 fix(logging): extract LiteLLM error details in image summarization failures (#9458) 2026-03-18 23:29:04 +00:00
Justin Tahara
2fcfdbabde fix(celery): add task expiry to upload API send_task call (#9456)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-18 23:17:08 +00:00
Justin Tahara
ea1a2749c1 fix(image): add diagnostic logging to vision model selection (#9460) 2026-03-18 22:06:56 +00:00
Justin Tahara
73c4e22588 fix(image): stop dumping base64 image data into error logs (#9457) 2026-03-18 21:43:55 +00:00
Jamison Lahman
fceaac6e13 fix(fe): make indexing attempt error rows click to show trace (#9463)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-18 21:38:53 +00:00
Jamison Lahman
e8bf45cfd2 feat(fe): "Full Exception Trace" modal uses CodePreview rendering (#9464)
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-18 21:04:55 +00:00
Bo-Onyx
13ff648fcd chore(hooks): Add Celery task to remove hook running records older than 30 days (#9433) 2026-03-18 21:03:01 +00:00
Jamison Lahman
ae8268afb1 fix(fe): truncate connector names in table (#9459) 2026-03-18 20:59:49 +00:00
acaprau
b338bd9e97 feat(opensearch): Can override number of shards and replicas via env var (#9431) 2026-03-18 20:16:05 +00:00
acaprau
0dcc90a042 fix(opensearch): Exclude retrieving vectors during hybrid and random search (#9430) 2026-03-18 20:13:12 +00:00
Jamison Lahman
0f6a6693d3 fix(fe): truncate project name in sidebar button (#9462) 2026-03-18 20:06:09 +00:00
Jamison Lahman
e32cc450b2 fix(fe): update connector indexing error modal (#9426)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-18 11:57:28 -07:00
Jamison Lahman
732fb71edf chore(tests): unit tests for pdf processing (#9452)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-18 18:31:37 +00:00
dependabot[bot]
ca3320c0e0 chore(deps): bump pypdf from 6.8.0 to 6.9.1 (#9450)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-18 17:52:50 +00:00
Jamison Lahman
d7c554aca7 chore(ruff): fix and enable S324 (#9451) 2026-03-18 17:26:29 +00:00
dependabot[bot]
69e5c19695 chore(deps): bump next from 16.1.5 to 16.1.7 in /examples/widget (#9425)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-18 09:25:27 -07:00
Nikolas Garza
b4ce1c7a97 chore: bump next to 16.1.7 (#9423) 2026-03-18 09:22:40 -07:00
Jamison Lahman
cd64a91154 fix(fe): display name on attachment file card hover (#9446) 2026-03-18 16:13:21 +00:00
Danelegend
c282cdc096 fix(file upload): Allow zip file upload via query param (#9432) 2026-03-18 07:32:07 +00:00
Jamison Lahman
b1de1c59b6 chore(playwright): projects screenshot is main container only (#9440) 2026-03-18 05:35:30 +00:00
acaprau
64d484039f chore(opensearch): Disable test_update_single_can_clear_user_projects_and_personas (#9434) 2026-03-18 00:40:29 +00:00
Jamison Lahman
0530095b71 fix(fe): replace users table buttons with LineItems (#9435) 2026-03-17 23:45:15 +00:00
acaprau
23280d5b91 fix(opensearch): Fix env var mismatch issue with configuring subquery results; set default to 100 (#9428) 2026-03-17 16:01:45 -07:00
Bo-Onyx
229442679c chore(hooks): Add db CRUD (#9411) 2026-03-17 22:36:50 +00:00
Jamison Lahman
95a192fb0f chore(devtools): upgrade ods: 0.7.0->0.7.1 (#9429) 2026-03-17 15:18:21 -07:00
Yuhong Sun
6bd96ec906 chore: Scripts for search quality eval (#9421) 2026-03-17 14:53:32 -07:00
Jamison Lahman
a1ec88269f chore(docker): configurable api_server resource limits (#9424)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-17 14:52:21 -07:00
Raunak Bhagat
b929518c34 refactor: update size variant and dimension names in opal components (#9416)
Co-authored-by: Nikolas Garza <90273783+nmgarza5@users.noreply.github.com>
2026-03-17 21:43:19 +00:00
Justin Tahara
479220e774 chore(opensearch): Make Password Default Empty (#9415) 2026-03-17 21:41:08 +00:00
dependabot[bot]
d3e0acf905 chore(deps): bump pyasn1 from 0.6.2 to 0.6.3 (#9417)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-17 21:28:07 +00:00
acaprau
cbd1a344f2 feat(opensearch): Configure hybrid search subquery groups and pipelines via env var (#9407) 2026-03-17 21:25:53 +00:00
Jamison Lahman
e79264b69b chore(fe): prefer INTERNAL_URL (#9419) 2026-03-17 14:17:05 -07:00
Justin Tahara
1e0a8e9a0e fix(llm): surface masked OpenAI quota failures (#9308) 2026-03-17 21:11:43 +00:00
Jamison Lahman
b7841a513d chore(devtools): ods backend scans for available ports (#9418) 2026-03-17 14:09:23 -07:00
dependabot[bot]
c779bf722d chore(deps): bump next from 16.1.5 to 16.1.7 in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#9420)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-17 14:03:51 -07:00
Jamison Lahman
a5aff0d199 chore(fe): rm dead stackTraceModalContent (#9412) 2026-03-17 20:42:29 +00:00
Wenxi
8ed170b070 fix: make ConnectorCredentialPair name required (#9408)
Co-authored-by: Ciaran Sweet <ciaran@developmentseed.org>
2026-03-17 18:54:34 +00:00
Raunak Bhagat
c890cd4767 feat(llm-config): replace AdvancedOptions with unified ModelsAccessField (#9270)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 18:19:52 +00:00
Nikolas Garza
2b2df18463 fix(vespa): use weightedSet for ACL filters to prevent query failures (#9403) 2026-03-17 17:21:13 +00:00
Bo-Onyx
11cfc92f15 chore(hook): DB changes (#9337) 2026-03-17 01:04:06 +00:00
Jamison Lahman
c7da99cfd7 chore(playwright): make project name human-readable (#9394) 2026-03-16 17:26:20 -07:00
Jamison Lahman
b384c77863 chore(fe): admin navigation always goes to LLM config page (#9395) 2026-03-16 17:15:50 -07:00
Raunak Bhagat
b0f31cd46b fix(search-ui): center pagination in SearchUI (#9396) 2026-03-16 23:59:17 +00:00
Jamison Lahman
323eb9bbba chore(fe): make sidebar scrollbar flush with edge (#9383)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-16 23:33:40 +00:00
Raunak Bhagat
708e310849 refactor: refreshed Pagination component (#9380) 2026-03-16 23:14:59 +00:00
Wenxi
c25509e212 chore: run identify from backend (#9392) 2026-03-16 23:12:10 +00:00
Nikolas Garza
6af0da41bd test(admin): add E2E Playwright tests for Users page (#9266) 2026-03-16 21:41:24 +00:00
Evan Lohn
b94da25d7c chore: update install script (#9068) 2026-03-16 21:29:56 +00:00
Jamison Lahman
7d443c1b53 chore(ws): ignore port when determining origin in dev (#9382) 2026-03-16 21:24:34 +00:00
Justin Tahara
d6b7b3c68f fix(celery): Limiting connector_hierarchy_fetching jobs (#9381) 2026-03-16 21:14:04 +00:00
789 changed files with 48888 additions and 22995 deletions

View File

@@ -6,3 +6,4 @@
3134e5f840c12c8f32613ce520101a047c89dcc2 # refactor(whitespace): rm temporary react fragments (#7161)
ed3f72bc75f3e3a9ae9e4d8cd38278f9c97e78b4 # refactor(whitespace): rm react fragment #7190
7b927e79c25f4ddfd18a067f489e122acd2c89de # chore(format): format files where `ruff` and `black` agree (#9339)

View File

@@ -44,7 +44,7 @@ jobs:
fetch-tags: true
- name: Setup uv
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
with:
version: "0.9.9"
enable-cache: false
@@ -165,7 +165,7 @@ jobs:
fetch-depth: 0
- name: Setup uv
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
with:
version: "0.9.9"
# NOTE: This isn't caching much and zizmor suggests this could be poisoned, so disable.
@@ -307,7 +307,7 @@ jobs:
xdg-utils
- name: setup node
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v6.2.0
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v6.3.0
with:
node-version: 24
package-manager-cache: false
@@ -615,6 +615,7 @@ jobs:
tags: |
type=raw,value=${{ needs.determine-builds.outputs.is-test-run == 'true' && format('web-{0}', needs.determine-builds.outputs.sanitized-tag) || github.ref_name }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'craft-latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && env.EDGE_TAG == 'true' && 'edge' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-beta == 'true' && 'beta' || '' }}
@@ -1263,8 +1264,6 @@ jobs:
latest=false
tags: |
type=raw,value=craft-latest
# TODO: Consider aligning craft-latest tags with regular backend builds (e.g., latest, edge, beta)
# to keep tagging strategy consistent across all backend images
- name: Create and push manifest
env:
@@ -1488,6 +1487,7 @@ jobs:
tags: |
type=raw,value=${{ needs.determine-builds.outputs.is-test-run == 'true' && format('model-server-{0}', needs.determine-builds.outputs.sanitized-tag) || github.ref_name }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'craft-latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && env.EDGE_TAG == 'true' && 'edge' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-beta-standalone == 'true' && 'beta' || '' }}

View File

@@ -47,7 +47,8 @@ jobs:
done
- name: Publish Helm charts to gh-pages
uses: stefanprodan/helm-gh-pages@0ad2bb377311d61ac04ad9eb6f252fb68e207260 # ratchet:stefanprodan/helm-gh-pages@v1.7.0
# NOTE: HEAD of https://github.com/stefanprodan/helm-gh-pages/pull/43
uses: stefanprodan/helm-gh-pages@ad32ad3b8720abfeaac83532fd1e9bdfca5bbe27 # zizmor: ignore[impostor-commit]
with:
token: ${{ secrets.GITHUB_TOKEN }}
charts_dir: deployment/helm/charts

View File

@@ -35,6 +35,7 @@ jobs:
needs: [provider-chat-test]
if: failure() && github.event_name == 'schedule'
runs-on: ubuntu-slim
environment: ci-protected
timeout-minutes: 5
steps:
- name: Checkout

View File

@@ -114,7 +114,7 @@ jobs:
ref: main
- name: Install the latest version of uv
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"
@@ -183,6 +183,7 @@ jobs:
- cherry-pick-to-latest-release
if: needs.resolve-cherry-pick-request.outputs.should_cherrypick == 'true' && needs.resolve-cherry-pick-request.result == 'success' && needs.cherry-pick-to-latest-release.result == 'success'
runs-on: ubuntu-slim
environment: ci-protected
timeout-minutes: 10
steps:
- name: Checkout
@@ -232,6 +233,7 @@ jobs:
- cherry-pick-to-latest-release
if: always() && needs.resolve-cherry-pick-request.outputs.should_cherrypick == 'true' && (needs.resolve-cherry-pick-request.result == 'failure' || needs.cherry-pick-to-latest-release.result == 'failure')
runs-on: ubuntu-slim
environment: ci-protected
timeout-minutes: 10
steps:
- name: Checkout

View File

@@ -50,7 +50,7 @@ jobs:
persist-credentials: false
- name: Setup node
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f
with:
node-version: 24
cache: "npm" # zizmor: ignore[cache-poisoning]
@@ -63,7 +63,7 @@ jobs:
targets: ${{ matrix.target }}
- name: Cache Cargo registry and build
uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # zizmor: ignore[cache-poisoning]
uses: actions/cache@668228422ae6a00e4ad889ee87cd7109ec5666a7 # zizmor: ignore[cache-poisoning]
with:
path: |
~/.cargo/bin/

View File

@@ -7,6 +7,15 @@ on:
merge_group:
pull_request:
branches: [main]
paths:
- "backend/**"
- "pyproject.toml"
- "uv.lock"
- ".github/workflows/pr-external-dependency-unit-tests.yml"
- ".github/actions/setup-python-and-install-dependencies/**"
- ".github/actions/setup-playwright/**"
- "deployment/docker_compose/docker-compose.yml"
- "deployment/docker_compose/docker-compose.dev.yml"
push:
tags:
- "v*.*.*"

View File

@@ -28,7 +28,7 @@ jobs:
persist-credentials: false
- name: Setup node
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm" # zizmor: ignore[cache-poisoning] test-only workflow; no deploy artifacts

View File

@@ -272,7 +272,7 @@ jobs:
- name: Setup node
# zizmor: ignore[cache-poisoning] ephemeral runners; no release artifacts
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm" # zizmor: ignore[cache-poisoning]
@@ -284,7 +284,7 @@ jobs:
- name: Cache playwright cache
# zizmor: ignore[cache-poisoning] ephemeral runners; no release artifacts
uses: runs-on/cache@50350ad4242587b6c8c2baa2e740b1bc11285ff4 # ratchet:runs-on/cache@v4
uses: runs-on/cache@a5f51d6f3fece787d03b7b4e981c82538a0654ed # ratchet:runs-on/cache@v4
with:
path: ~/.cache/ms-playwright
key: ${{ runner.os }}-playwright-npm-${{ hashFiles('web/package-lock.json') }}
@@ -471,7 +471,7 @@ jobs:
- name: Install the latest version of uv
if: always()
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"
@@ -614,7 +614,7 @@ jobs:
- name: Setup node
# zizmor: ignore[cache-poisoning] ephemeral runners; no release artifacts
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm" # zizmor: ignore[cache-poisoning]
@@ -626,7 +626,7 @@ jobs:
- name: Cache playwright cache
# zizmor: ignore[cache-poisoning] ephemeral runners; no release artifacts
uses: runs-on/cache@50350ad4242587b6c8c2baa2e740b1bc11285ff4 # ratchet:runs-on/cache@v4
uses: runs-on/cache@a5f51d6f3fece787d03b7b4e981c82538a0654ed # ratchet:runs-on/cache@v4
with:
path: ~/.cache/ms-playwright
key: ${{ runner.os }}-playwright-npm-${{ hashFiles('web/package-lock.json') }}

View File

@@ -56,7 +56,7 @@ jobs:
- name: Cache mypy cache
if: ${{ vars.DISABLE_MYPY_CACHE != 'true' }}
uses: runs-on/cache@50350ad4242587b6c8c2baa2e740b1bc11285ff4 # ratchet:runs-on/cache@v4
uses: runs-on/cache@a5f51d6f3fece787d03b7b4e981c82538a0654ed # ratchet:runs-on/cache@v4
with:
path: .mypy_cache
key: mypy-${{ runner.os }}-${{ github.base_ref || github.event.merge_group.base_ref || 'main' }}-${{ hashFiles('**/*.py', '**/*.pyi', 'pyproject.toml') }}

View File

@@ -7,6 +7,13 @@ on:
merge_group:
pull_request:
branches: [main]
paths:
- "backend/**"
- "pyproject.toml"
- "uv.lock"
- ".github/workflows/pr-python-connector-tests.yml"
- ".github/actions/setup-python-and-install-dependencies/**"
- ".github/actions/setup-playwright/**"
push:
tags:
- "v*.*.*"

View File

@@ -31,6 +31,7 @@ jobs:
- runner=4cpu-linux-arm64
- "run-id=${{ github.run_id }}-model-check"
- "extras=ecr-cache"
environment: ci-protected
timeout-minutes: 45
env:
@@ -73,7 +74,7 @@ jobs:
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f
- name: Build and load
uses: docker/bake-action@5be5f02ff8819ecd3092ea6b2e6261c31774f2b4 # ratchet:docker/bake-action@v6
uses: docker/bake-action@82490499d2e5613fcead7e128237ef0b0ea210f7 # ratchet:docker/bake-action@v7.0.0
env:
TAG: model-server-${{ github.run_id }}
with:

View File

@@ -30,7 +30,7 @@ jobs:
- name: Setup Terraform
uses: hashicorp/setup-terraform@5e8dbf3c6d9deaf4193ca7a8fb23f2ac83bb6c85 # ratchet:hashicorp/setup-terraform@v4.0.0
- name: Setup node
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v6
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v6
with: # zizmor: ignore[cache-poisoning]
node-version: 22
cache: "npm"

View File

@@ -15,6 +15,7 @@ permissions:
jobs:
Deploy-Preview:
runs-on: ubuntu-latest
environment: ci-protected
timeout-minutes: 30
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd
@@ -22,7 +23,7 @@ jobs:
persist-credentials: false
- name: Setup node
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm"

View File

@@ -13,27 +13,20 @@ jobs:
permissions:
id-token: write
timeout-minutes: 10
strategy:
matrix:
os-arch:
- { goos: "linux", goarch: "amd64" }
- { goos: "linux", goarch: "arm64" }
- { goos: "windows", goarch: "amd64" }
- { goos: "windows", goarch: "arm64" }
- { goos: "darwin", goarch: "amd64" }
- { goos: "darwin", goarch: "arm64" }
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
- uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"
- run: |
GOOS="${{ matrix.os-arch.goos }}" \
GOARCH="${{ matrix.os-arch.goarch }}" \
uv build --wheel
for goos in linux windows darwin; do
for goarch in amd64 arm64; do
GOOS="$goos" GOARCH="$goarch" uv build --wheel
done
done
working-directory: cli
- run: uv publish
working-directory: cli

View File

@@ -26,7 +26,7 @@ jobs:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
- uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"

View File

@@ -25,6 +25,7 @@ permissions:
jobs:
Deploy-Storybook:
runs-on: ubuntu-latest
environment: ci-protected
timeout-minutes: 30
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v4
@@ -32,7 +33,7 @@ jobs:
persist-credentials: false
- name: Setup node
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm"
@@ -54,6 +55,7 @@ jobs:
needs: Deploy-Storybook
if: always() && needs.Deploy-Storybook.result == 'failure'
runs-on: ubuntu-latest
environment: ci-protected
timeout-minutes: 10
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v4

View File

@@ -9,6 +9,7 @@ on:
jobs:
sync-foss:
runs-on: ubuntu-latest
environment: ci-protected
timeout-minutes: 45
permissions:
contents: read

View File

@@ -11,6 +11,7 @@ permissions:
jobs:
create-and-push-tag:
runs-on: ubuntu-slim
environment: ci-protected
timeout-minutes: 45
steps:

View File

@@ -24,7 +24,7 @@ jobs:
persist-credentials: false
- name: Install the latest version of uv
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"

64
.greptile/config.json Normal file
View File

@@ -0,0 +1,64 @@
{
"labels": [],
"comment": "",
"fixWithAI": true,
"hideFooter": false,
"strictness": 3,
"statusCheck": true,
"commentTypes": [
"logic",
"syntax",
"style"
],
"instructions": "",
"disabledLabels": [],
"excludeAuthors": [
"dependabot[bot]",
"renovate[bot]"
],
"ignoreKeywords": "",
"ignorePatterns": "",
"includeAuthors": [],
"summarySection": {
"included": true,
"collapsible": false,
"defaultOpen": false
},
"excludeBranches": [],
"fileChangeLimit": 300,
"includeBranches": [],
"includeKeywords": "",
"triggerOnUpdates": true,
"updateExistingSummaryComment": true,
"updateSummaryOnly": false,
"issuesTableSection": {
"included": true,
"collapsible": false,
"defaultOpen": false
},
"statusCommentsEnabled": true,
"confidenceScoreSection": {
"included": true,
"collapsible": false
},
"sequenceDiagramSection": {
"included": true,
"collapsible": false,
"defaultOpen": false
},
"shouldUpdateDescription": false,
"rules": [
{
"scope": ["web/**"],
"rule": "In Onyx's Next.js app, the `app/ee/admin/` directory is a filesystem convention for Enterprise Edition route overrides — it does NOT add an `/ee/` prefix to the URL. Both `app/admin/groups/page.tsx` and `app/ee/admin/groups/page.tsx` serve the same URL `/admin/groups`. Hardcoded `/admin/...` paths in router.push() calls are correct and do NOT break EE deployments. Do not flag hardcoded admin paths as bugs."
},
{
"scope": ["web/**"],
"rule": "In Onyx, each API key creates a unique user row in the database with a unique `user_id` (UUID). There is a 1:1 mapping between API keys and their backing user records. Multiple API keys do NOT share the same `user_id`. Do not flag potential duplicate row IDs when using `user_id` from API key descriptors."
},
{
"scope": ["backend/**/*.py"],
"rule": "Never raise HTTPException directly in business code. Use `raise OnyxError(OnyxErrorCode.XXX, \"message\")` from `onyx.error_handling.exceptions`. A global FastAPI exception handler converts OnyxError into structured JSON responses with {\"error_code\": \"...\", \"detail\": \"...\"}. Error codes are defined in `onyx.error_handling.error_codes.OnyxErrorCode`. For upstream errors with dynamic HTTP status codes, use `status_code_override`: `raise OnyxError(OnyxErrorCode.BAD_GATEWAY, detail, status_code_override=upstream_status)`."
}
]
}

57
.greptile/files.json Normal file
View File

@@ -0,0 +1,57 @@
[
{
"scope": [],
"path": "contributing_guides/best_practices.md",
"description": "Best practices for contributing to the codebase"
},
{
"scope": ["web/**"],
"path": "web/AGENTS.md",
"description": "Frontend coding standards for the web directory"
},
{
"scope": ["web/**"],
"path": "web/tests/README.md",
"description": "Frontend testing guide and conventions"
},
{
"scope": ["web/**"],
"path": "web/CLAUDE.md",
"description": "Single source of truth for frontend coding standards"
},
{
"scope": ["web/**"],
"path": "web/lib/opal/README.md",
"description": "Opal component library usage guide"
},
{
"scope": ["backend/**"],
"path": "backend/tests/README.md",
"description": "Backend testing guide covering all 4 test types, fixtures, and conventions"
},
{
"scope": ["backend/onyx/connectors/**"],
"path": "backend/onyx/connectors/README.md",
"description": "Connector development guide covering design, interfaces, and required changes"
},
{
"scope": [],
"path": "CLAUDE.md",
"description": "Project instructions and coding standards"
},
{
"scope": [],
"path": "backend/alembic/README.md",
"description": "Migration guidance, including multi-tenant migration behavior"
},
{
"scope": [],
"path": "deployment/helm/charts/onyx/values-lite.yaml",
"description": "Lite deployment Helm values and service assumptions"
},
{
"scope": [],
"path": "deployment/docker_compose/docker-compose.onyx-lite.yml",
"description": "Lite deployment Docker Compose overlay and disabled service behavior"
}
]

39
.greptile/rules.md Normal file
View File

@@ -0,0 +1,39 @@
# Greptile Review Rules
## Type Annotations
Use explicit type annotations for variables to enhance code clarity, especially when moving type hints around in the code.
## Best Practices
Use `contributing_guides/best_practices.md` as core review context. Prefer consistency with existing patterns, fix issues in code you touch, avoid tacking new features onto muddy interfaces, fail loudly instead of silently swallowing errors, keep code strictly typed, preserve clear state boundaries, remove duplicate or dead logic, break up overly long functions, avoid hidden import-time side effects, respect module boundaries, and favor correctness-by-construction over relying on callers to use an API correctly.
## TODOs
Whenever a TODO is added, there must always be an associated name or ticket with that TODO in the style of `TODO(name): ...` or `TODO(1234): ...`
## Debugging Code
Remove temporary debugging code before merging to production, especially tenant-specific debugging logs.
## Hardcoded Booleans
When hardcoding a boolean variable to a constant value, remove the variable entirely and clean up all places where it's used rather than just setting it to a constant.
## Multi-tenant vs Single-tenant
Code changes must consider both multi-tenant and single-tenant deployments. In multi-tenant mode, preserve tenant isolation, ensure tenant context is propagated correctly, and avoid assumptions that only hold for a single shared schema or globally shared state. In single-tenant mode, avoid introducing unnecessary tenant-specific requirements or cloud-only control-plane dependencies.
## Nginx Routing — New Backend Routes
Whenever a new backend route is added that does NOT start with `/api`, it must also be explicitly added to ALL nginx configs:
- `deployment/helm/charts/onyx/templates/nginx-conf.yaml` (Helm/k8s)
- `deployment/data/nginx/app.conf.template` (docker-compose dev)
- `deployment/data/nginx/app.conf.template.prod` (docker-compose prod)
- `deployment/data/nginx/app.conf.template.no-letsencrypt` (docker-compose no-letsencrypt)
Routes not starting with `/api` are not caught by the existing `^/(api|openapi\.json)` location block and will fall through to `location /`, which proxies to the Next.js web server and returns an HTML 404. The new location block must be placed before the `/api` block. Examples of routes that need this treatment: `/scim`, `/mcp`.
## Full vs Lite Deployments
Code changes must consider both regular Onyx deployments and Onyx lite deployments. Lite deployments disable the vector DB, Redis, model servers, and background workers by default, use PostgreSQL-backed cache/auth/file storage, and rely on the API server to handle background work. Do not assume those services are available unless the code path is explicitly limited to full deployments.

View File

@@ -122,7 +122,7 @@ repos:
rev: 5d1e709b7be35cb2025444e19de266b056b7b7ee # frozen: v2.10.1
hooks:
- id: golangci-lint
language_version: "1.26.0"
language_version: "1.26.1"
entry: bash -c "find . -name go.mod -not -path './.venv/*' -print0 | xargs -0 -I{} bash -c 'cd \"$(dirname {})\" && golangci-lint run ./...'"
- repo: https://github.com/astral-sh/ruff-pre-commit

12
.vscode/launch.json vendored
View File

@@ -117,7 +117,8 @@
"presentation": {
"group": "2"
},
"consoleTitle": "API Server Console"
"consoleTitle": "API Server Console",
"justMyCode": false
},
{
"name": "Slack Bot",
@@ -268,7 +269,8 @@
"presentation": {
"group": "2"
},
"consoleTitle": "Celery heavy Console"
"consoleTitle": "Celery heavy Console",
"justMyCode": false
},
{
"name": "Celery kg_processing",
@@ -355,7 +357,8 @@
"presentation": {
"group": "2"
},
"consoleTitle": "Celery user_file_processing Console"
"consoleTitle": "Celery user_file_processing Console",
"justMyCode": false
},
{
"name": "Celery docfetching",
@@ -413,7 +416,8 @@
"presentation": {
"group": "2"
},
"consoleTitle": "Celery docprocessing Console"
"consoleTitle": "Celery docprocessing Console",
"justMyCode": false
},
{
"name": "Celery beat",

279
AGENTS.md
View File

@@ -167,284 +167,7 @@ web/
## Frontend Standards
### 1. Import Standards
**Always use absolute imports with the `@` prefix.**
**Reason:** Moving files around becomes easier since you don't also have to update those import statements. This makes modifications to the codebase much nicer.
```typescript
// ✅ Good
import { Button } from "@/components/ui/button";
import { useAuth } from "@/hooks/useAuth";
import { Text } from "@/refresh-components/texts/Text";
// ❌ Bad
import { Button } from "../../../components/ui/button";
import { useAuth } from "./hooks/useAuth";
```
### 2. React Component Functions
**Prefer regular functions over arrow functions for React components.**
**Reason:** Functions just become easier to read.
```typescript
// ✅ Good
function UserProfile({ userId }: UserProfileProps) {
return <div>User Profile</div>
}
// ❌ Bad
const UserProfile = ({ userId }: UserProfileProps) => {
return <div>User Profile</div>
}
```
### 3. Props Interface Extraction
**Extract prop types into their own interface definitions.**
**Reason:** Functions just become easier to read.
```typescript
// ✅ Good
interface UserCardProps {
user: User
showActions?: boolean
onEdit?: (userId: string) => void
}
function UserCard({ user, showActions = false, onEdit }: UserCardProps) {
return <div>User Card</div>
}
// ❌ Bad
function UserCard({
user,
showActions = false,
onEdit
}: {
user: User
showActions?: boolean
onEdit?: (userId: string) => void
}) {
return <div>User Card</div>
}
```
### 4. Spacing Guidelines
**Prefer padding over margins for spacing.**
**Reason:** We want to consolidate usage to paddings instead of margins.
```typescript
// ✅ Good
<div className="p-4 space-y-2">
<div className="p-2">Content</div>
</div>
// ❌ Bad
<div className="m-4 space-y-2">
<div className="m-2">Content</div>
</div>
```
### 5. Tailwind Dark Mode
**Strictly forbid using the `dark:` modifier in Tailwind classes, except for logo icon handling.**
**Reason:** The `colors.css` file already, VERY CAREFULLY, defines what the exact opposite colour of each light-mode colour is. Overriding this behaviour is VERY bad and will lead to horrible UI breakages.
**Exception:** The `createLogoIcon` helper in `web/src/components/icons/icons.tsx` uses `dark:` modifiers (`dark:invert`, `dark:hidden`, `dark:block`) to handle third-party logo icons that cannot automatically adapt through `colors.css`. This is the ONLY acceptable use of dark mode modifiers.
```typescript
// ✅ Good - Standard components use `tailwind-themes/tailwind.config.js` / `src/app/css/colors.css`
<div className="bg-background-neutral-03 text-text-02">
Content
</div>
// ✅ Good - Logo icons with dark mode handling via createLogoIcon
export const GithubIcon = createLogoIcon(githubLightIcon, {
monochromatic: true, // Will apply dark:invert internally
});
export const GitbookIcon = createLogoIcon(gitbookLightIcon, {
darkSrc: gitbookDarkIcon, // Will use dark:hidden/dark:block internally
});
// ❌ Bad - Manual dark mode overrides
<div className="bg-white dark:bg-black text-black dark:text-white">
Content
</div>
```
### 6. Class Name Utilities
**Use the `cn` utility instead of raw string formatting for classNames.**
**Reason:** `cn`s are easier to read. They also allow for more complex types (i.e., string-arrays) to get formatted properly (it flattens each element in that string array down). As a result, it can allow things such as conditionals (i.e., `myCondition && "some-tailwind-class"`, which evaluates to `false` when `myCondition` is `false`) to get filtered out.
```typescript
import { cn } from '@/lib/utils'
// ✅ Good
<div className={cn(
'base-class',
isActive && 'active-class',
className
)}>
Content
</div>
// ❌ Bad
<div className={`base-class ${isActive ? 'active-class' : ''} ${className}`}>
Content
</div>
```
### 7. Custom Hooks Organization
**Follow a "hook-per-file" layout. Each hook should live in its own file within `web/src/hooks`.**
**Reason:** This is just a layout preference. Keeps code clean.
```typescript
// web/src/hooks/useUserData.ts
export function useUserData(userId: string) {
// hook implementation
}
// web/src/hooks/useLocalStorage.ts
export function useLocalStorage<T>(key: string, initialValue: T) {
// hook implementation
}
```
### 8. Icon Usage
**ONLY use icons from the `web/src/icons` directory. Do NOT use icons from `react-icons`, `lucide`, or other external libraries.**
**Reason:** We have a very carefully curated selection of icons that match our Onyx guidelines. We do NOT want to muddy those up with different aesthetic stylings.
```typescript
// ✅ Good
import SvgX from "@/icons/x";
import SvgMoreHorizontal from "@/icons/more-horizontal";
// ❌ Bad
import { User } from "lucide-react";
import { FiSearch } from "react-icons/fi";
```
**Missing Icons**: If an icon is needed but doesn't exist in the `web/src/icons` directory, import it from Figma using the Figma MCP tool and add it to the icons directory.
If you need help with this step, reach out to `raunak@onyx.app`.
### 9. Text Rendering
**Prefer using the `refresh-components/texts/Text` component for all text rendering. Avoid "naked" text nodes.**
**Reason:** The `Text` component is fully compliant with the stylings provided in Figma. It provides easy utilities to specify the text-colour and font-size in the form of flags. Super duper easy.
```typescript
// ✅ Good
import { Text } from '@/refresh-components/texts/Text'
function UserCard({ name }: { name: string }) {
return (
<Text
{/* The `text03` flag makes the text it renders to be coloured the 3rd-scale grey */}
text03
{/* The `mainAction` flag makes the text it renders to be "main-action" font + line-height + weightage, as described in the Figma */}
mainAction
>
{name}
</Text>
)
}
// ❌ Bad
function UserCard({ name }: { name: string }) {
return (
<div>
<h2>{name}</h2>
<p>User details</p>
</div>
)
}
```
### 10. Component Usage
**Heavily avoid raw HTML input components. Always use components from the `web/src/refresh-components` or `web/lib/opal/src` directory.**
**Reason:** We've put in a lot of effort to unify the components that are rendered in the Onyx app. Using raw components breaks the entire UI of the application, and leaves it in a muddier state than before.
```typescript
// ✅ Good
import Button from '@/refresh-components/buttons/Button'
import InputTypeIn from '@/refresh-components/inputs/InputTypeIn'
import SvgPlusCircle from '@/icons/plus-circle'
function ContactForm() {
return (
<form>
<InputTypeIn placeholder="Search..." />
<Button type="submit" leftIcon={SvgPlusCircle}>Submit</Button>
</form>
)
}
// ❌ Bad
function ContactForm() {
return (
<form>
<input placeholder="Name" />
<textarea placeholder="Message" />
<button type="submit">Submit</button>
</form>
)
}
```
### 11. Colors
**Always use custom overrides for colors and borders rather than built in Tailwind CSS colors. These overrides live in `web/tailwind-themes/tailwind.config.js`.**
**Reason:** Our custom color system uses CSS variables that automatically handle dark mode and maintain design consistency across the app. Standard Tailwind colors bypass this system.
**Available color categories:**
- **Text:** `text-01` through `text-05`, `text-inverted-XX`
- **Backgrounds:** `background-neutral-XX`, `background-tint-XX` (and inverted variants)
- **Borders:** `border-01` through `border-05`, `border-inverted-XX`
- **Actions:** `action-link-XX`, `action-danger-XX`
- **Status:** `status-info-XX`, `status-success-XX`, `status-warning-XX`, `status-error-XX`
- **Theme:** `theme-primary-XX`, `theme-red-XX`, `theme-blue-XX`, etc.
```typescript
// ✅ Good - Use custom Onyx color classes
<div className="bg-background-neutral-01 border border-border-02" />
<div className="bg-background-tint-02 border border-border-01" />
<div className="bg-status-success-01" />
<div className="bg-action-link-01" />
<div className="bg-theme-primary-05" />
// ❌ Bad - Do NOT use standard Tailwind colors
<div className="bg-gray-100 border border-gray-300 text-gray-600" />
<div className="bg-white border border-slate-200" />
<div className="bg-green-100 text-green-700" />
<div className="bg-blue-100 text-blue-600" />
<div className="bg-indigo-500" />
```
### 12. Data Fetching
**Prefer using `useSWR` for data fetching. Data should generally be fetched on the client side. Components that need data should display a loader / placeholder while waiting for that data. Prefer loading data within the component that needs it rather than at the top level and passing it down.**
**Reason:** Client side fetching allows us to load the skeleton of the page without waiting for data to load, leading to a snappier UX. Loading data where needed reduces dependencies between a component and its parent component(s).
Frontend standards for the `web/` and `desktop/` projects live in `web/AGENTS.md`.
## Database & Migrations

View File

@@ -35,7 +35,7 @@ Onyx comes loaded with advanced features like Agents, Web Search, RAG, MCP, Deep
> [!TIP]
> Run Onyx with one command (or see deployment section below):
> ```
> curl -fsSL https://raw.githubusercontent.com/onyx-dot-app/onyx/main/deployment/docker_compose/install.sh > install.sh && chmod +x install.sh && ./install.sh
> curl -fsSL https://onyx.app/install_onyx.sh | bash
> ```
****

View File

@@ -47,6 +47,8 @@ RUN apt-get update && \
gcc \
nano \
vim \
# Install procps so kubernetes exec sessions can use ps aux for debugging
procps \
libjemalloc2 \
&& \
rm -rf /var/lib/apt/lists/* && \

View File

@@ -0,0 +1,35 @@
"""remove voice_provider deleted column
Revision ID: 1d78c0ca7853
Revises: a3f8b2c1d4e5
Create Date: 2026-03-26 11:30:53.883127
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "1d78c0ca7853"
down_revision = "a3f8b2c1d4e5"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Hard-delete any soft-deleted rows before dropping the column
op.execute("DELETE FROM voice_provider WHERE deleted = true")
op.drop_column("voice_provider", "deleted")
def downgrade() -> None:
op.add_column(
"voice_provider",
sa.Column(
"deleted",
sa.Boolean(),
nullable=False,
server_default=sa.text("false"),
),
)

View File

@@ -0,0 +1,109 @@
"""group_permissions_phase1
Revision ID: 25a5501dc766
Revises: b728689f45b1
Create Date: 2026-03-23 11:41:25.557442
"""
from alembic import op
import fastapi_users_db_sqlalchemy
import sqlalchemy as sa
from onyx.db.enums import AccountType
from onyx.db.enums import GrantSource
from onyx.db.enums import Permission
# revision identifiers, used by Alembic.
revision = "25a5501dc766"
down_revision = "b728689f45b1"
branch_labels = None
depends_on = None
def upgrade() -> None:
# 1. Add account_type column to user table (nullable for now).
# TODO(subash): backfill account_type for existing rows and add NOT NULL.
op.add_column(
"user",
sa.Column(
"account_type",
sa.Enum(AccountType, native_enum=False),
nullable=True,
),
)
# 2. Add is_default column to user_group table
op.add_column(
"user_group",
sa.Column(
"is_default",
sa.Boolean(),
nullable=False,
server_default=sa.false(),
),
)
# 3. Create permission_grant table
op.create_table(
"permission_grant",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("group_id", sa.Integer(), nullable=False),
sa.Column(
"permission",
sa.Enum(Permission, native_enum=False),
nullable=False,
),
sa.Column(
"grant_source",
sa.Enum(GrantSource, native_enum=False),
nullable=False,
),
sa.Column(
"granted_by",
fastapi_users_db_sqlalchemy.generics.GUID(),
nullable=True,
),
sa.Column(
"granted_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"is_deleted",
sa.Boolean(),
nullable=False,
server_default=sa.false(),
),
sa.PrimaryKeyConstraint("id"),
sa.ForeignKeyConstraint(
["group_id"],
["user_group.id"],
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["granted_by"],
["user.id"],
ondelete="SET NULL",
),
sa.UniqueConstraint(
"group_id", "permission", name="uq_permission_grant_group_permission"
),
)
# 4. Index on user__user_group(user_id) — existing composite PK
# has user_group_id as leading column; user-filtered queries need this
op.create_index(
"ix_user__user_group_user_id",
"user__user_group",
["user_id"],
)
def downgrade() -> None:
op.drop_index("ix_user__user_group_user_id", table_name="user__user_group")
op.drop_table("permission_grant")
op.drop_column("user_group", "is_default")
op.drop_column("user", "account_type")

View File

@@ -0,0 +1,103 @@
"""add_hook_and_hook_execution_log_tables
Revision ID: 689433b0d8de
Revises: 93a2e195e25c
Create Date: 2026-03-13 11:25:06.547474
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID as PGUUID
# revision identifiers, used by Alembic.
revision = "689433b0d8de"
down_revision = "93a2e195e25c"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"hook",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.Column(
"hook_point",
sa.Enum("document_ingestion", "query_processing", native_enum=False),
nullable=False,
),
sa.Column("endpoint_url", sa.Text(), nullable=True),
sa.Column("api_key", sa.LargeBinary(), nullable=True),
sa.Column("is_reachable", sa.Boolean(), nullable=True),
sa.Column(
"fail_strategy",
sa.Enum("hard", "soft", native_enum=False),
nullable=False,
),
sa.Column("timeout_seconds", sa.Float(), nullable=False),
sa.Column(
"is_active", sa.Boolean(), nullable=False, server_default=sa.text("false")
),
sa.Column(
"deleted", sa.Boolean(), nullable=False, server_default=sa.text("false")
),
sa.Column("creator_id", PGUUID(as_uuid=True), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(["creator_id"], ["user.id"], ondelete="SET NULL"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
"ix_hook_one_non_deleted_per_point",
"hook",
["hook_point"],
unique=True,
postgresql_where=sa.text("deleted = false"),
)
op.create_table(
"hook_execution_log",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("hook_id", sa.Integer(), nullable=False),
sa.Column(
"is_success",
sa.Boolean(),
nullable=False,
),
sa.Column("error_message", sa.Text(), nullable=True),
sa.Column("status_code", sa.Integer(), nullable=True),
sa.Column("duration_ms", sa.Integer(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(["hook_id"], ["hook.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("ix_hook_execution_log_hook_id", "hook_execution_log", ["hook_id"])
op.create_index(
"ix_hook_execution_log_created_at", "hook_execution_log", ["created_at"]
)
def downgrade() -> None:
op.drop_index("ix_hook_execution_log_created_at", table_name="hook_execution_log")
op.drop_index("ix_hook_execution_log_hook_id", table_name="hook_execution_log")
op.drop_table("hook_execution_log")
op.drop_index("ix_hook_one_non_deleted_per_point", table_name="hook")
op.drop_table("hook")

View File

@@ -0,0 +1,36 @@
"""add preferred_response_id and model_display_name to chat_message
Revision ID: a3f8b2c1d4e5
Create Date: 2026-03-22
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "a3f8b2c1d4e5"
down_revision = "25a5501dc766"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"chat_message",
sa.Column(
"preferred_response_id",
sa.Integer(),
sa.ForeignKey("chat_message.id", ondelete="SET NULL"),
nullable=True,
),
)
op.add_column(
"chat_message",
sa.Column("model_display_name", sa.String(), nullable=True),
)
def downgrade() -> None:
op.drop_column("chat_message", "model_display_name")
op.drop_column("chat_message", "preferred_response_id")

View File

@@ -0,0 +1,26 @@
"""rename persona is_visible to is_listed and featured to is_featured
Revision ID: b728689f45b1
Revises: 689433b0d8de
Create Date: 2026-03-23 12:36:26.607305
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "b728689f45b1"
down_revision = "689433b0d8de"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.alter_column("persona", "is_visible", new_column_name="is_listed")
op.alter_column("persona", "featured", new_column_name="is_featured")
def downgrade() -> None:
op.alter_column("persona", "is_listed", new_column_name="is_visible")
op.alter_column("persona", "is_featured", new_column_name="featured")

View File

@@ -36,6 +36,56 @@ TABLES_WITH_USER_ID = [
]
def _dedupe_null_notifications(connection: sa.Connection) -> None:
# Multiple NULL-owned notifications can exist because the unique index treats
# NULL user_id values as distinct. Before migrating them to the anonymous
# user, collapse duplicates and remove rows that would conflict with an
# already-existing anonymous notification.
result = connection.execute(
sa.text(
"""
WITH ranked_null_notifications AS (
SELECT
id,
ROW_NUMBER() OVER (
PARTITION BY notif_type, COALESCE(additional_data, '{}'::jsonb)
ORDER BY first_shown DESC, last_shown DESC, id DESC
) AS row_num
FROM notification
WHERE user_id IS NULL
)
DELETE FROM notification
WHERE id IN (
SELECT id
FROM ranked_null_notifications
WHERE row_num > 1
)
"""
)
)
if result.rowcount > 0:
print(f"Deleted {result.rowcount} duplicate NULL-owned notifications")
result = connection.execute(
sa.text(
"""
DELETE FROM notification AS null_owned
USING notification AS anonymous_owned
WHERE null_owned.user_id IS NULL
AND anonymous_owned.user_id = :user_id
AND null_owned.notif_type = anonymous_owned.notif_type
AND COALESCE(null_owned.additional_data, '{}'::jsonb) =
COALESCE(anonymous_owned.additional_data, '{}'::jsonb)
"""
),
{"user_id": ANONYMOUS_USER_UUID},
)
if result.rowcount > 0:
print(
f"Deleted {result.rowcount} NULL-owned notifications that conflict with existing anonymous-owned notifications"
)
def upgrade() -> None:
"""
Create the anonymous user for anonymous access feature.
@@ -65,7 +115,12 @@ def upgrade() -> None:
# Migrate any remaining user_id=NULL records to anonymous user
for table in TABLES_WITH_USER_ID:
try:
# Dedup notifications outside the savepoint so deletions persist
# even if the subsequent UPDATE rolls back
if table == "notification":
_dedupe_null_notifications(connection)
with connection.begin_nested():
# Exclude public credential (id=0) which must remain user_id=NULL
# Exclude builtin tools (in_code_tool_id IS NOT NULL) which must remain user_id=NULL
# Exclude builtin personas (builtin_persona=True) which must remain user_id=NULL
@@ -80,6 +135,7 @@ def upgrade() -> None:
condition = "user_id IS NULL AND is_public = false"
else:
condition = "user_id IS NULL"
result = connection.execute(
sa.text(
f"""
@@ -92,19 +148,19 @@ def upgrade() -> None:
)
if result.rowcount > 0:
print(f"Updated {result.rowcount} rows in {table} to anonymous user")
except Exception as e:
print(f"Skipping {table}: {e}")
def downgrade() -> None:
"""
Set anonymous user's records back to NULL and delete the anonymous user.
Note: Duplicate NULL-owned notifications removed during upgrade are not restored.
"""
connection = op.get_bind()
# Set records back to NULL
for table in TABLES_WITH_USER_ID:
try:
with connection.begin_nested():
connection.execute(
sa.text(
f"""
@@ -115,8 +171,6 @@ def downgrade() -> None:
),
{"user_id": ANONYMOUS_USER_UUID},
)
except Exception:
pass
# Delete the anonymous user
connection.execute(

View File

@@ -28,6 +28,7 @@ from onyx.access.models import DocExternalAccess
from onyx.access.models import ElementExternalAccess
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_queued_task_ids
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
@@ -187,7 +188,6 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str) -> bool | None
# (which lives on a different db number)
r = get_redis_client()
r_replica = get_redis_replica_client()
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_DOC_PERMISSIONS_SYNC_BEAT_LOCK,
@@ -227,6 +227,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str) -> bool | None
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
r_celery = celery_get_broker_client(self.app)
validate_permission_sync_fences(
tenant_id, r, r_replica, r_celery, lock_beat
)
@@ -473,6 +474,8 @@ def connector_permission_sync_generator_task(
cc_pair = get_connector_credential_pair_from_id(
db_session=db_session,
cc_pair_id=cc_pair_id,
eager_load_connector=True,
eager_load_credential=True,
)
if cc_pair is None:
raise ValueError(

View File

@@ -29,6 +29,7 @@ from ee.onyx.external_permissions.sync_params import (
from ee.onyx.external_permissions.sync_params import get_source_perm_sync_config
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
from onyx.background.error_logging import emit_background_error
@@ -162,7 +163,6 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str) -> bool | None:
# (which lives on a different db number)
r = get_redis_client()
r_replica = get_redis_replica_client()
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_EXTERNAL_GROUP_SYNC_BEAT_LOCK,
@@ -221,6 +221,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str) -> bool | None:
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
r_celery = celery_get_broker_client(self.app)
validate_external_group_sync_fences(
tenant_id, self.app, r, r_replica, r_celery, lock_beat
)

View File

@@ -25,13 +25,13 @@ from onyx.redis.redis_pool import get_redis_client
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import TENANT_ID_PREFIX
# Default number of pre-provisioned tenants to maintain
DEFAULT_TARGET_AVAILABLE_TENANTS = 5
# Maximum tenants to provision in a single task run.
# Each tenant takes ~80s (alembic migrations), so 5 tenants ≈ 7 minutes.
_MAX_TENANTS_PER_RUN = 5
# Soft time limit for tenant pre-provisioning tasks (in seconds)
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes
# Hard time limit for tenant pre-provisioning tasks (in seconds)
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 10 # 10 minutes
# Time limits sized for worst-case batch: _MAX_TENANTS_PER_RUN × ~90s + buffer.
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 10 # 10 minutes
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 15 # 15 minutes
@shared_task(
@@ -58,7 +58,7 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
r = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
lock_check: RedisLock = r.lock(
OnyxRedisLocks.CHECK_AVAILABLE_TENANTS_LOCK,
timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
timeout=_TENANT_PROVISIONING_TIME_LIMIT,
)
# These tasks should never overlap
@@ -74,9 +74,7 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
num_available_tenants = db_session.query(AvailableTenant).count()
# Get the target number of available tenants
num_minimum_available_tenants = getattr(
TARGET_AVAILABLE_TENANTS, "value", DEFAULT_TARGET_AVAILABLE_TENANTS
)
num_minimum_available_tenants = TARGET_AVAILABLE_TENANTS
# Calculate how many new tenants we need to provision
if num_available_tenants < num_minimum_available_tenants:
@@ -90,22 +88,46 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
f"To provision: {tenants_to_provision}"
)
# just provision one tenant each time we run this ... increase if needed.
if tenants_to_provision > 0:
pre_provision_tenant()
batch_size = min(tenants_to_provision, _MAX_TENANTS_PER_RUN)
if batch_size < tenants_to_provision:
task_logger.info(
f"Capping batch to {batch_size} "
f"(need {tenants_to_provision}, will catch up next cycle)"
)
provisioned = 0
for i in range(batch_size):
task_logger.info(f"Provisioning tenant {i + 1}/{batch_size}")
try:
if pre_provision_tenant():
provisioned += 1
except Exception:
task_logger.exception(
f"Failed to provision tenant {i + 1}/{batch_size}, "
"continuing with remaining tenants"
)
task_logger.info(f"Provisioning complete: {provisioned}/{batch_size} succeeded")
except Exception:
task_logger.exception("Error in check_available_tenants task")
finally:
lock_check.release()
try:
lock_check.release()
except Exception:
task_logger.warning(
"Could not release check lock (likely expired), continuing"
)
def pre_provision_tenant() -> None:
def pre_provision_tenant() -> bool:
"""
Pre-provision a new tenant and store it in the NewAvailableTenant table.
This function fully sets up the tenant with all necessary configurations,
so it's ready to be assigned to a user immediately.
Returns True if a tenant was successfully provisioned, False otherwise.
"""
# The MULTI_TENANT check is now done at the caller level (check_available_tenants)
# rather than inside this function
@@ -113,15 +135,15 @@ def pre_provision_tenant() -> None:
r = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
lock_provision: RedisLock = r.lock(
OnyxRedisLocks.CLOUD_PRE_PROVISION_TENANT_LOCK,
timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
timeout=_TENANT_PROVISIONING_TIME_LIMIT,
)
# Allow multiple pre-provisioning tasks to run, but ensure they don't overlap
if not lock_provision.acquire(blocking=False):
task_logger.debug(
"Skipping pre_provision_tenant task because it is already running"
task_logger.warning(
"Skipping pre_provision_tenant — could not acquire provision lock"
)
return
return False
tenant_id: str | None = None
try:
@@ -161,6 +183,7 @@ def pre_provision_tenant() -> None:
db_session.add(new_tenant)
db_session.commit()
task_logger.info(f"Successfully pre-provisioned tenant: {tenant_id}")
return True
except Exception:
db_session.rollback()
task_logger.error(
@@ -184,5 +207,11 @@ def pre_provision_tenant() -> None:
asyncio.run(rollback_tenant_provisioning(tenant_id))
except Exception:
task_logger.exception(f"Error during rollback for tenant: {tenant_id}")
return False
finally:
lock_provision.release()
try:
lock_provision.release()
except Exception:
task_logger.warning(
"Could not release provision lock (likely expired), continuing"
)

View File

@@ -115,8 +115,14 @@ def fetch_user_group_token_rate_limits_for_user(
ordered: bool = True,
get_editable: bool = True,
) -> Sequence[TokenRateLimit]:
stmt = select(TokenRateLimit)
stmt = stmt.where(User__UserGroup.user_group_id == group_id)
stmt = (
select(TokenRateLimit)
.join(
TokenRateLimit__UserGroup,
TokenRateLimit.id == TokenRateLimit__UserGroup.rate_limit_id,
)
.where(TokenRateLimit__UserGroup.user_group_id == group_id)
)
stmt = _add_user_filters(stmt, user, get_editable)
if enabled_only:

View File

@@ -800,6 +800,33 @@ def update_user_group(
return db_user_group
def rename_user_group(
db_session: Session,
user_group_id: int,
new_name: str,
) -> UserGroup:
stmt = select(UserGroup).where(UserGroup.id == user_group_id)
db_user_group = db_session.scalar(stmt)
if db_user_group is None:
raise ValueError(f"UserGroup with id '{user_group_id}' not found")
_check_user_group_is_modifiable(db_user_group)
db_user_group.name = new_name
db_user_group.time_last_modified_by_user = func.now()
# CC pair documents in Vespa contain the group name, so we need to
# trigger a sync to update them with the new name.
_mark_user_group__cc_pair_relationships_outdated__no_commit(
db_session=db_session, user_group_id=user_group_id
)
if not DISABLE_VECTOR_DB:
db_user_group.is_up_to_date = False
db_session.commit()
return db_user_group
def prepare_user_group_for_deletion(db_session: Session, user_group_id: int) -> None:
stmt = select(UserGroup).where(UserGroup.id == user_group_id)
db_user_group = db_session.scalar(stmt)

View File

@@ -250,20 +250,24 @@ def _get_sharepoint_list_item_id(drive_item: DriveItem) -> str | None:
raise e
def _is_public_item(drive_item: DriveItem) -> bool:
is_public = False
def _is_public_item(
drive_item: DriveItem,
treat_sharing_link_as_public: bool = False,
) -> bool:
if not treat_sharing_link_as_public:
return False
try:
permissions = sleep_and_retry(
drive_item.permissions.get_all(page_loaded=lambda _: None), "is_public_item"
)
for permission in permissions:
if permission.link and (
permission.link.scope == "anonymous"
or permission.link.scope == "organization"
if permission.link and permission.link.scope in (
"anonymous",
"organization",
):
is_public = True
break
return is_public
return True
return False
except Exception as e:
logger.error(f"Failed to check if item {drive_item.id} is public: {e}")
return False
@@ -504,6 +508,7 @@ def get_external_access_from_sharepoint(
drive_item: DriveItem | None,
site_page: dict[str, Any] | None,
add_prefix: bool = False,
treat_sharing_link_as_public: bool = False,
) -> ExternalAccess:
"""
Get external access information from SharePoint.
@@ -563,8 +568,7 @@ def get_external_access_from_sharepoint(
)
if drive_item and drive_name:
# Here we check if the item have have any public links, if so we return early
is_public = _is_public_item(drive_item)
is_public = _is_public_item(drive_item, treat_sharing_link_as_public)
if is_public:
logger.info(f"Item {drive_item.id} is public")
return ExternalAccess(

View File

@@ -8,6 +8,7 @@ from ee.onyx.external_permissions.slack.utils import fetch_user_id_to_email_map
from onyx.access.models import DocExternalAccess
from onyx.access.models import ExternalAccess
from onyx.connectors.credentials_provider import OnyxDBCredentialsProvider
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import HierarchyNode
from onyx.connectors.slack.connector import get_channels
from onyx.connectors.slack.connector import make_paginated_slack_api_call
@@ -105,9 +106,11 @@ def _get_slack_document_access(
slack_connector: SlackConnector,
channel_permissions: dict[str, ExternalAccess], # noqa: ARG001
callback: IndexingHeartbeatInterface | None,
indexing_start: SecondsSinceUnixEpoch | None = None,
) -> Generator[DocExternalAccess, None, None]:
slim_doc_generator = slack_connector.retrieve_all_slim_docs_perm_sync(
callback=callback
callback=callback,
start=indexing_start,
)
for doc_metadata_batch in slim_doc_generator:
@@ -180,9 +183,15 @@ def slack_doc_sync(
slack_connector = SlackConnector(**cc_pair.connector.connector_specific_config)
slack_connector.set_credentials_provider(provider)
indexing_start_ts: SecondsSinceUnixEpoch | None = (
cc_pair.connector.indexing_start.timestamp()
if cc_pair.connector.indexing_start is not None
else None
)
yield from _get_slack_document_access(
slack_connector,
slack_connector=slack_connector,
channel_permissions=channel_permissions,
callback=callback,
indexing_start=indexing_start_ts,
)

View File

@@ -6,6 +6,7 @@ from onyx.access.models import ElementExternalAccess
from onyx.access.models import ExternalAccess
from onyx.access.models import NodeExternalAccess
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import HierarchyNode
from onyx.db.models import ConnectorCredentialPair
@@ -40,10 +41,19 @@ def generic_doc_sync(
logger.info(f"Starting {doc_source} doc sync for CC Pair ID: {cc_pair.id}")
indexing_start: SecondsSinceUnixEpoch | None = (
cc_pair.connector.indexing_start.timestamp()
if cc_pair.connector.indexing_start is not None
else None
)
newly_fetched_doc_ids: set[str] = set()
logger.info(f"Fetching all slim documents from {doc_source}")
for doc_batch in slim_connector.retrieve_all_slim_docs_perm_sync(callback=callback):
for doc_batch in slim_connector.retrieve_all_slim_docs_perm_sync(
start=indexing_start,
callback=callback,
):
logger.info(f"Got {len(doc_batch)} slim documents from {doc_source}")
if callback:

View File

@@ -44,19 +44,21 @@ def _run_single_search(
user: User,
db_session: Session,
num_hits: int | None = None,
hybrid_alpha: float | None = None,
) -> list[InferenceChunk]:
"""Execute a single search query and return chunks."""
chunk_search_request = ChunkSearchRequest(
query=query,
user_selected_filters=filters,
limit=num_hits,
hybrid_alpha=hybrid_alpha,
)
return search_pipeline(
chunk_search_request=chunk_search_request,
document_index=document_index,
user=user,
persona=None, # No persona for direct search
persona_search_info=None,
db_session=db_session,
)
@@ -74,7 +76,7 @@ def stream_search_query(
Core search function that yields streaming packets.
Used by both streaming and non-streaming endpoints.
"""
# Get document index
# Get document index.
search_settings = get_current_search_settings(db_session)
# This flow is for search so we do not get all indices.
document_index = get_default_document_index(search_settings, None, db_session)
@@ -119,6 +121,7 @@ def stream_search_query(
user=user,
db_session=db_session,
num_hits=request.num_hits,
hybrid_alpha=request.hybrid_alpha,
)
else:
# Multiple queries - run in parallel and merge with RRF
@@ -133,6 +136,7 @@ def stream_search_query(
user,
db_session,
request.num_hits,
request.hybrid_alpha,
),
)
for query in all_executed_queries

View File

@@ -157,7 +157,11 @@ def fetch_logo_helper(db_session: Session) -> Response: # noqa: ARG001
detail="No logo file found",
)
else:
return Response(content=onyx_file.data, media_type=onyx_file.mime_type)
return Response(
content=onyx_file.data,
media_type=onyx_file.mime_type,
headers={"Cache-Control": "no-cache"},
)
def fetch_logotype_helper(db_session: Session) -> Response: # noqa: ARG001

View File

@@ -27,15 +27,17 @@ class SearchFlowClassificationResponse(BaseModel):
is_search_flow: bool
# NOTE: This model is used for the core flow of the Onyx application, any changes to it should be reviewed and approved by an
# experienced team member. It is very important to 1. avoid bloat and 2. that this remains backwards compatible across versions.
# NOTE: This model is used for the core flow of the Onyx application, any
# changes to it should be reviewed and approved by an experienced team member.
# It is very important to 1. avoid bloat and 2. that this remains backwards
# compatible across versions.
class SendSearchQueryRequest(BaseModel):
search_query: str
filters: BaseFilters | None = None
num_docs_fed_to_llm_selection: int | None = None
run_query_expansion: bool = False
num_hits: int = 30
hybrid_alpha: float | None = None
include_content: bool = False
stream: bool = False

View File

@@ -20,6 +20,7 @@ from ee.onyx.server.query_and_chat.models import SearchQueryResponse
from ee.onyx.server.query_and_chat.models import SendSearchQueryRequest
from ee.onyx.server.query_and_chat.streaming_models import SearchErrorPacket
from onyx.auth.users import current_user
from onyx.configs.app_configs import ONYX_SEARCH_UI_USES_OPENSEARCH_KEYWORD_SEARCH
from onyx.db.engine.sql_engine import get_session
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.models import User
@@ -67,8 +68,10 @@ def search_flow_classification(
return SearchFlowClassificationResponse(is_search_flow=is_search_flow)
# NOTE: This endpoint is used for the core flow of the Onyx application, any changes to it should be reviewed and approved by an
# experienced team member. It is very important to 1. avoid bloat and 2. that this remains backwards compatible across versions.
# NOTE: This endpoint is used for the core flow of the Onyx application, any
# changes to it should be reviewed and approved by an experienced team member.
# It is very important to 1. avoid bloat and 2. that this remains backwards
# compatible across versions.
@router.post(
"/send-search-message",
response_model=None,
@@ -80,13 +83,19 @@ def handle_send_search_message(
db_session: Session = Depends(get_session),
) -> StreamingResponse | SearchFullResponse:
"""
Execute a search query with optional streaming.
Executes a search query with optional streaming.
When stream=True: Returns StreamingResponse with SSE
When stream=False: Returns SearchFullResponse
If hybrid_alpha is unset and ONYX_SEARCH_UI_USES_OPENSEARCH_KEYWORD_SEARCH
is True, executes pure keyword search.
Returns:
StreamingResponse with SSE if stream=True, otherwise SearchFullResponse.
"""
logger.debug(f"Received search query: {request.search_query}")
if request.hybrid_alpha is None and ONYX_SEARCH_UI_USES_OPENSEARCH_KEYWORD_SEARCH:
request.hybrid_alpha = 0.0
# Non-streaming path
if not request.stream:
try:

View File

@@ -178,7 +178,7 @@ def _seed_personas(db_session: Session, personas: list[PersonaUpsertRequest]) ->
system_prompt=persona.system_prompt,
task_prompt=persona.task_prompt,
datetime_aware=persona.datetime_aware,
featured=persona.featured,
is_featured=persona.is_featured,
commit=False,
)
db_session.commit()

View File

@@ -4,6 +4,7 @@ from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from ee.onyx.db.persona import update_persona_access
from ee.onyx.db.user_group import add_users_to_user_group
from ee.onyx.db.user_group import delete_user_group as db_delete_user_group
from ee.onyx.db.user_group import fetch_user_group
@@ -11,13 +12,16 @@ from ee.onyx.db.user_group import fetch_user_groups
from ee.onyx.db.user_group import fetch_user_groups_for_user
from ee.onyx.db.user_group import insert_user_group
from ee.onyx.db.user_group import prepare_user_group_for_deletion
from ee.onyx.db.user_group import rename_user_group
from ee.onyx.db.user_group import update_user_curator_relationship
from ee.onyx.db.user_group import update_user_group
from ee.onyx.server.user_group.models import AddUsersToUserGroupRequest
from ee.onyx.server.user_group.models import MinimalUserGroupSnapshot
from ee.onyx.server.user_group.models import SetCuratorRequest
from ee.onyx.server.user_group.models import UpdateGroupAgentsRequest
from ee.onyx.server.user_group.models import UserGroup
from ee.onyx.server.user_group.models import UserGroupCreate
from ee.onyx.server.user_group.models import UserGroupRename
from ee.onyx.server.user_group.models import UserGroupUpdate
from onyx.auth.users import current_admin_user
from onyx.auth.users import current_curator_or_admin_user
@@ -27,6 +31,9 @@ from onyx.configs.constants import PUBLIC_API_TAGS
from onyx.db.engine.sql_engine import get_session
from onyx.db.models import User
from onyx.db.models import UserRole
from onyx.db.persona import get_persona_by_id
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -87,6 +94,32 @@ def create_user_group(
return UserGroup.from_model(db_user_group)
@router.patch("/admin/user-group/rename")
def rename_user_group_endpoint(
rename_request: UserGroupRename,
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> UserGroup:
try:
return UserGroup.from_model(
rename_user_group(
db_session=db_session,
user_group_id=rename_request.id,
new_name=rename_request.name,
)
)
except IntegrityError:
raise OnyxError(
OnyxErrorCode.DUPLICATE_RESOURCE,
f"User group with name '{rename_request.name}' already exists.",
)
except ValueError as e:
msg = str(e)
if "not found" in msg.lower():
raise OnyxError(OnyxErrorCode.NOT_FOUND, msg)
raise OnyxError(OnyxErrorCode.CONFLICT, msg)
@router.patch("/admin/user-group/{user_group_id}")
def patch_user_group(
user_group_id: int,
@@ -161,3 +194,38 @@ def delete_user_group(
user_group = fetch_user_group(db_session, user_group_id)
if user_group:
db_delete_user_group(db_session, user_group)
@router.patch("/admin/user-group/{user_group_id}/agents")
def update_group_agents(
user_group_id: int,
request: UpdateGroupAgentsRequest,
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> None:
for agent_id in request.added_agent_ids:
persona = get_persona_by_id(
persona_id=agent_id, user=user, db_session=db_session
)
current_group_ids = [g.id for g in persona.groups]
if user_group_id not in current_group_ids:
update_persona_access(
persona_id=agent_id,
creator_user_id=user.id,
db_session=db_session,
group_ids=current_group_ids + [user_group_id],
)
for agent_id in request.removed_agent_ids:
persona = get_persona_by_id(
persona_id=agent_id, user=user, db_session=db_session
)
current_group_ids = [g.id for g in persona.groups]
update_persona_access(
persona_id=agent_id,
creator_user_id=user.id,
db_session=db_session,
group_ids=[gid for gid in current_group_ids if gid != user_group_id],
)
db_session.commit()

View File

@@ -104,6 +104,16 @@ class AddUsersToUserGroupRequest(BaseModel):
user_ids: list[UUID]
class UserGroupRename(BaseModel):
id: int
name: str
class SetCuratorRequest(BaseModel):
user_id: UUID
is_curator: bool
class UpdateGroupAgentsRequest(BaseModel):
added_agent_ids: list[int]
removed_agent_ids: list[int]

View File

@@ -80,15 +80,45 @@ def capture_and_sync_with_alternate_posthog(
logger.error(f"Error identifying cloud posthog user: {e}")
def alias_user(distinct_id: str, anonymous_id: str) -> None:
"""Link an anonymous distinct_id to an identified user, merging person profiles.
No-ops when the IDs match (e.g. returning users whose PostHog cookie
already contains their identified user ID).
"""
if not posthog or anonymous_id == distinct_id:
return
try:
posthog.alias(previous_id=anonymous_id, distinct_id=distinct_id)
posthog.flush()
except Exception as e:
logger.error(f"Error aliasing PostHog user: {e}")
def get_anon_id_from_request(request: Any) -> str | None:
"""Extract the anonymous distinct_id from the app PostHog cookie on a request."""
if not POSTHOG_API_KEY:
return None
cookie_name = f"ph_{POSTHOG_API_KEY}_posthog"
if (cookie_value := request.cookies.get(cookie_name)) and (
parsed := parse_posthog_cookie(cookie_value)
):
return parsed.get("distinct_id")
return None
def get_marketing_posthog_cookie_name() -> str | None:
if not MARKETING_POSTHOG_API_KEY:
return None
return f"onyx_custom_ph_{MARKETING_POSTHOG_API_KEY}_posthog"
def parse_marketing_cookie(cookie_value: str) -> dict[str, Any] | None:
def parse_posthog_cookie(cookie_value: str) -> dict[str, Any] | None:
"""
Parse the URL-encoded JSON marketing cookie.
Parse a URL-encoded JSON PostHog cookie
Expected format (URL-encoded):
{"distinct_id":"...", "featureFlags":{"landing_page_variant":"..."}, ...}
@@ -102,7 +132,7 @@ def parse_marketing_cookie(cookie_value: str) -> dict[str, Any] | None:
cookie_data = json.loads(decoded_cookie)
distinct_id = cookie_data.get("distinct_id")
if not distinct_id:
if not distinct_id or not isinstance(distinct_id, str):
return None
return cookie_data

View File

@@ -1,3 +1,5 @@
from typing import Any
from ee.onyx.utils.posthog_client import posthog
from onyx.utils.logger import setup_logger
@@ -5,7 +7,7 @@ logger = setup_logger()
def event_telemetry(
distinct_id: str, event: str, properties: dict | None = None
distinct_id: str, event: str, properties: dict[str, Any] | None = None
) -> None:
"""Capture and send an event to PostHog, flushing immediately."""
if not posthog:
@@ -17,3 +19,15 @@ def event_telemetry(
posthog.flush()
except Exception as e:
logger.error(f"Error capturing PostHog event: {e}")
def identify_user(distinct_id: str, properties: dict[str, Any] | None = None) -> None:
"""Create/update a PostHog person profile, flushing immediately."""
if not posthog:
return
try:
posthog.identify(distinct_id, properties)
posthog.flush()
except Exception as e:
logger.error(f"Error identifying PostHog user: {e}")

View File

@@ -19,6 +19,7 @@ from typing import Optional
from typing import Protocol
from typing import Tuple
from typing import TypeVar
from urllib.parse import urlparse
import jwt
from email_validator import EmailNotValidError
@@ -134,6 +135,9 @@ from onyx.redis.redis_pool import retrieve_ws_token_data
from onyx.server.settings.store import load_settings
from onyx.server.utils import BasicAuthenticationError
from onyx.utils.logger import setup_logger
from onyx.utils.telemetry import mt_cloud_alias
from onyx.utils.telemetry import mt_cloud_get_anon_id
from onyx.utils.telemetry import mt_cloud_identify
from onyx.utils.telemetry import mt_cloud_telemetry
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType
@@ -249,18 +253,12 @@ def verify_email_is_invited(email: str) -> None:
whitelist = get_invited_users()
if not email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": "Email must be specified"},
)
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email must be specified")
try:
email_info = validate_email(email, check_deliverability=False)
except EmailUndeliverableError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": "Email is not valid"},
)
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email is not valid")
for email_whitelist in whitelist:
try:
@@ -277,12 +275,9 @@ def verify_email_is_invited(email: str) -> None:
if email_info.normalized.lower() == email_info_whitelist.normalized.lower():
return
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"code": REGISTER_INVITE_ONLY_CODE,
"reason": "This workspace is invite-only. Please ask your admin to invite you.",
},
raise OnyxError(
OnyxErrorCode.UNAUTHORIZED,
"This workspace is invite-only. Please ask your admin to invite you.",
)
@@ -292,48 +287,47 @@ def verify_email_in_whitelist(email: str, tenant_id: str) -> None:
verify_email_is_invited(email)
def verify_email_domain(email: str) -> None:
def verify_email_domain(email: str, *, is_registration: bool = False) -> None:
if email.count("@") != 1:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email is not valid",
)
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email is not valid")
local_part, domain = email.split("@")
domain = domain.lower()
local_part = local_part.lower()
if AUTH_TYPE == AuthType.CLOUD:
# Normalize googlemail.com to gmail.com (they deliver to the same inbox)
if domain == "googlemail.com":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": "Please use @gmail.com instead of @googlemail.com."},
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Please use @gmail.com instead of @googlemail.com.",
)
# Only block dotted Gmail on new signups — existing users must still be
# able to sign in with the address they originally registered with.
if is_registration and domain == "gmail.com" and "." in local_part:
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Gmail addresses with '.' are not allowed. Please use your base email address.",
)
if "+" in local_part and domain != "onyx.app":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"reason": "Email addresses with '+' are not allowed. Please use your base email address."
},
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Email addresses with '+' are not allowed. Please use your base email address.",
)
# Check if email uses a disposable/temporary domain
if is_disposable_email(email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"reason": "Disposable email addresses are not allowed. Please use a permanent email address."
},
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Disposable email addresses are not allowed. Please use a permanent email address.",
)
# Check domain whitelist if configured
if VALID_EMAIL_DOMAINS:
if domain not in VALID_EMAIL_DOMAINS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email domain is not valid",
)
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email domain is not valid")
def enforce_seat_limit(db_session: Session, seats_needed: int = 1) -> None:
@@ -349,7 +343,7 @@ def enforce_seat_limit(db_session: Session, seats_needed: int = 1) -> None:
)(db_session, seats_needed=seats_needed)
if result is not None and not result.available:
raise HTTPException(status_code=402, detail=result.error_message)
raise OnyxError(OnyxErrorCode.SEAT_LIMIT_EXCEEDED, result.error_message)
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
@@ -402,10 +396,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
captcha_token or "", expected_action="signup"
)
except CaptchaVerificationError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": str(e)},
)
raise OnyxError(OnyxErrorCode.INVALID_INPUT, str(e))
# We verify the password here to make sure it's valid before we proceed
await self.validate_password(
@@ -415,13 +406,10 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# Check for disposable emails BEFORE provisioning tenant
# This prevents creating tenants for throwaway email addresses
try:
verify_email_domain(user_create.email)
except HTTPException as e:
verify_email_domain(user_create.email, is_registration=True)
except OnyxError as e:
# Log blocked disposable email attempts
if (
e.status_code == status.HTTP_400_BAD_REQUEST
and "Disposable email" in str(e.detail)
):
if "Disposable email" in e.detail:
domain = (
user_create.email.split("@")[-1]
if "@" in user_create.email
@@ -565,9 +553,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
result = await db_session.execute(
select(Persona.id)
.where(
Persona.featured.is_(True),
Persona.is_featured.is_(True),
Persona.is_public.is_(True),
Persona.is_visible.is_(True),
Persona.is_listed.is_(True),
Persona.deleted.is_(False),
)
.order_by(
@@ -695,6 +683,8 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
raise exceptions.UserNotExists()
except exceptions.UserNotExists:
verify_email_domain(account_email, is_registration=True)
# Check seat availability before creating (single-tenant only)
with get_session_with_current_tenant() as sync_db:
enforce_seat_limit(sync_db)
@@ -792,6 +782,18 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
except Exception:
logger.exception("Error deleting anonymous user cookie")
tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get()
# Link the anonymous PostHog session to the identified user so that
# pre-login session recordings and events merge into one person profile.
if anon_id := mt_cloud_get_anon_id(request):
mt_cloud_alias(distinct_id=str(user.id), anonymous_id=anon_id)
mt_cloud_identify(
distinct_id=str(user.id),
properties={"email": user.email, "tenant_id": tenant_id},
)
async def on_after_register(
self, user: User, request: Optional[Request] = None
) -> None:
@@ -810,6 +812,17 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
user_count = await get_user_count()
logger.debug(f"Current tenant user count: {user_count}")
# Link the anonymous PostHog session to the identified user so
# that pre-signup session recordings merge into one person profile.
if anon_id := mt_cloud_get_anon_id(request):
mt_cloud_alias(distinct_id=str(user.id), anonymous_id=anon_id)
# Ensure a PostHog person profile exists for this user.
mt_cloud_identify(
distinct_id=str(user.id),
properties={"email": user.email, "tenant_id": tenant_id},
)
mt_cloud_telemetry(
tenant_id=tenant_id,
distinct_id=str(user.id),
@@ -832,9 +845,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
attribute="get_marketing_posthog_cookie_name",
noop_return_value=None,
)
parse_marketing_cookie = fetch_ee_implementation_or_noop(
parse_posthog_cookie = fetch_ee_implementation_or_noop(
module="onyx.utils.posthog_client",
attribute="parse_marketing_cookie",
attribute="parse_posthog_cookie",
noop_return_value=None,
)
capture_and_sync_with_alternate_posthog = fetch_ee_implementation_or_noop(
@@ -848,7 +861,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
and user_count is not None
and (marketing_cookie_name := get_marketing_posthog_cookie_name())
and (marketing_cookie_value := request.cookies.get(marketing_cookie_name))
and (parsed_cookie := parse_marketing_cookie(marketing_cookie_value))
and (parsed_cookie := parse_posthog_cookie(marketing_cookie_value))
):
marketing_anonymous_id = parsed_cookie["distinct_id"]
@@ -1659,6 +1672,33 @@ async def _get_user_from_token_data(token_data: dict) -> User | None:
return user
_LOOPBACK_HOSTNAMES = frozenset({"localhost", "127.0.0.1", "::1"})
def _is_same_origin(actual: str, expected: str) -> bool:
"""Compare two origins for the WebSocket CSWSH check.
Scheme and hostname must match exactly. Port must also match, except
when the hostname is a loopback address (localhost / 127.0.0.1 / ::1),
where port is ignored. On loopback, all ports belong to the same
operator, so port differences carry no security significance — the
CSWSH threat is remote origins, not local ones.
"""
a = urlparse(actual.rstrip("/"))
e = urlparse(expected.rstrip("/"))
if a.scheme != e.scheme or a.hostname != e.hostname:
return False
if a.hostname in _LOOPBACK_HOSTNAMES:
return True
actual_port = a.port or (443 if a.scheme == "https" else 80)
expected_port = e.port or (443 if e.scheme == "https" else 80)
return actual_port == expected_port
async def current_user_from_websocket(
websocket: WebSocket,
token: str = Query(..., description="WebSocket authentication token"),
@@ -1678,19 +1718,15 @@ async def current_user_from_websocket(
This applies the same auth checks as current_user() for HTTP endpoints.
"""
# Check Origin header to prevent Cross-Site WebSocket Hijacking (CSWSH)
# Browsers always send Origin on WebSocket connections
# Check Origin header to prevent Cross-Site WebSocket Hijacking (CSWSH).
# Browsers always send Origin on WebSocket connections.
origin = websocket.headers.get("origin")
expected_origin = WEB_DOMAIN.rstrip("/")
if not origin:
logger.warning("WS auth: missing Origin header")
raise BasicAuthenticationError(detail="Access denied. Missing origin.")
actual_origin = origin.rstrip("/")
if actual_origin != expected_origin:
logger.warning(
f"WS auth: origin mismatch. Expected {expected_origin}, got {actual_origin}"
)
if not _is_same_origin(origin, WEB_DOMAIN):
logger.warning(f"WS auth: origin mismatch. Expected {WEB_DOMAIN}, got {origin}")
raise BasicAuthenticationError(detail="Access denied. Invalid origin.")
# Validate WS token in Redis (single-use, deleted after retrieval)

View File

@@ -13,6 +13,14 @@ from celery.signals import worker_shutdown
import onyx.background.celery.apps.app_base as app_base
from onyx.configs.constants import POSTGRES_CELERY_WORKER_DOCFETCHING_APP_NAME
from onyx.db.engine.sql_engine import SqlEngine
from onyx.server.metrics.celery_task_metrics import on_celery_task_postrun
from onyx.server.metrics.celery_task_metrics import on_celery_task_prerun
from onyx.server.metrics.celery_task_metrics import on_celery_task_rejected
from onyx.server.metrics.celery_task_metrics import on_celery_task_retry
from onyx.server.metrics.celery_task_metrics import on_celery_task_revoked
from onyx.server.metrics.indexing_task_metrics import on_indexing_task_postrun
from onyx.server.metrics.indexing_task_metrics import on_indexing_task_prerun
from onyx.server.metrics.metrics_server import start_metrics_server
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
@@ -34,6 +42,8 @@ def on_task_prerun(
**kwds: Any,
) -> None:
app_base.on_task_prerun(sender, task_id, task, args, kwargs, **kwds)
on_celery_task_prerun(task_id, task)
on_indexing_task_prerun(task_id, task, kwargs)
@signals.task_postrun.connect
@@ -48,6 +58,36 @@ def on_task_postrun(
**kwds: Any,
) -> None:
app_base.on_task_postrun(sender, task_id, task, args, kwargs, retval, state, **kwds)
on_celery_task_postrun(task_id, task, state)
on_indexing_task_postrun(task_id, task, kwargs, state)
@signals.task_retry.connect
def on_task_retry(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
# task_retry signal doesn't pass task_id in kwargs; get it from
# the sender (the task instance) via sender.request.id.
task_id = getattr(getattr(sender, "request", None), "id", None)
on_celery_task_retry(task_id, sender)
@signals.task_revoked.connect
def on_task_revoked(sender: Any | None = None, **kwargs: Any) -> None:
task_name = getattr(sender, "name", None) or str(sender)
on_celery_task_revoked(kwargs.get("task_id"), task_name)
@signals.task_rejected.connect
def on_task_rejected(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
# task_rejected sends the Consumer as sender, not the task instance.
# The task name must be extracted from the Celery message headers.
message = kwargs.get("message")
task_name: str | None = None
if message is not None:
headers = getattr(message, "headers", None) or {}
task_name = headers.get("task")
if task_name is None:
task_name = "unknown"
on_celery_task_rejected(None, task_name)
@celeryd_init.connect
@@ -76,6 +116,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
start_metrics_server("docfetching")
app_base.on_worker_ready(sender, **kwargs)

View File

@@ -14,6 +14,14 @@ from celery.signals import worker_shutdown
import onyx.background.celery.apps.app_base as app_base
from onyx.configs.constants import POSTGRES_CELERY_WORKER_DOCPROCESSING_APP_NAME
from onyx.db.engine.sql_engine import SqlEngine
from onyx.server.metrics.celery_task_metrics import on_celery_task_postrun
from onyx.server.metrics.celery_task_metrics import on_celery_task_prerun
from onyx.server.metrics.celery_task_metrics import on_celery_task_rejected
from onyx.server.metrics.celery_task_metrics import on_celery_task_retry
from onyx.server.metrics.celery_task_metrics import on_celery_task_revoked
from onyx.server.metrics.indexing_task_metrics import on_indexing_task_postrun
from onyx.server.metrics.indexing_task_metrics import on_indexing_task_prerun
from onyx.server.metrics.metrics_server import start_metrics_server
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
@@ -35,6 +43,8 @@ def on_task_prerun(
**kwds: Any,
) -> None:
app_base.on_task_prerun(sender, task_id, task, args, kwargs, **kwds)
on_celery_task_prerun(task_id, task)
on_indexing_task_prerun(task_id, task, kwargs)
@signals.task_postrun.connect
@@ -49,6 +59,36 @@ def on_task_postrun(
**kwds: Any,
) -> None:
app_base.on_task_postrun(sender, task_id, task, args, kwargs, retval, state, **kwds)
on_celery_task_postrun(task_id, task, state)
on_indexing_task_postrun(task_id, task, kwargs, state)
@signals.task_retry.connect
def on_task_retry(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
# task_retry signal doesn't pass task_id in kwargs; get it from
# the sender (the task instance) via sender.request.id.
task_id = getattr(getattr(sender, "request", None), "id", None)
on_celery_task_retry(task_id, sender)
@signals.task_revoked.connect
def on_task_revoked(sender: Any | None = None, **kwargs: Any) -> None:
task_name = getattr(sender, "name", None) or str(sender)
on_celery_task_revoked(kwargs.get("task_id"), task_name)
@signals.task_rejected.connect
def on_task_rejected(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
# task_rejected sends the Consumer as sender, not the task instance.
# The task name must be extracted from the Celery message headers.
message = kwargs.get("message")
task_name: str | None = None
if message is not None:
headers = getattr(message, "headers", None) or {}
task_name = headers.get("task")
if task_name is None:
task_name = "unknown"
on_celery_task_rejected(None, task_name)
@celeryd_init.connect
@@ -82,6 +122,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
start_metrics_server("docprocessing")
app_base.on_worker_ready(sender, **kwargs)
@@ -90,6 +131,12 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
app_base.on_worker_shutdown(sender, **kwargs)
# Note: worker_process_init only fires in prefork pool mode. Docprocessing uses
# worker_pool="threads" (see configs/docprocessing.py), so this handler is
# effectively a no-op in normal operation. It remains as a safety net in case
# the pool type is ever changed to prefork. Prometheus metrics are safe in
# thread-pool mode since all threads share the same process memory and can
# update the same Counter/Gauge/Histogram objects directly.
@worker_process_init.connect
def init_worker(**kwargs: Any) -> None: # noqa: ARG001
SqlEngine.reset_engine()

View File

@@ -54,8 +54,14 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
app_base.on_celeryd_init(sender, conf, **kwargs)
# Set by on_worker_init so on_worker_ready knows whether to start the server.
_prometheus_collectors_ok: bool = False
@worker_init.connect
def on_worker_init(sender: Any, **kwargs: Any) -> None:
global _prometheus_collectors_ok
logger.info("worker_init signal received.")
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")
@@ -65,6 +71,8 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
_prometheus_collectors_ok = _setup_prometheus_collectors(sender)
# Less startup checks in multi-tenant case
if MULTI_TENANT:
return
@@ -72,8 +80,37 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
app_base.on_secondary_worker_init(sender, **kwargs)
def _setup_prometheus_collectors(sender: Any) -> bool:
"""Register Prometheus collectors that need Redis/DB access.
Passes the Celery app so the queue depth collector can obtain a fresh
broker Redis client on each scrape (rather than holding a stale reference).
Returns True if registration succeeded, False otherwise.
"""
try:
from onyx.server.metrics.indexing_pipeline_setup import (
setup_indexing_pipeline_metrics,
)
setup_indexing_pipeline_metrics(sender.app)
logger.info("Prometheus indexing pipeline collectors registered")
return True
except Exception:
logger.exception("Failed to register Prometheus indexing pipeline collectors")
return False
@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
if _prometheus_collectors_ok:
from onyx.server.metrics.metrics_server import start_metrics_server
start_metrics_server("monitoring")
else:
logger.warning(
"Skipping Prometheus metrics server — collector registration failed"
)
app_base.on_worker_ready(sender, **kwargs)

View File

@@ -317,6 +317,7 @@ celery_app.autodiscover_tasks(
"onyx.background.celery.tasks.docprocessing",
"onyx.background.celery.tasks.evals",
"onyx.background.celery.tasks.hierarchyfetching",
"onyx.background.celery.tasks.hooks",
"onyx.background.celery.tasks.periodic",
"onyx.background.celery.tasks.pruning",
"onyx.background.celery.tasks.shared",

View File

@@ -1,5 +1,6 @@
# These are helper objects for tracking the keys we need to write in redis
import json
import threading
from typing import Any
from typing import cast
@@ -7,7 +8,59 @@ from celery import Celery
from redis import Redis
from onyx.background.celery.configs.base import CELERY_SEPARATOR
from onyx.configs.app_configs import REDIS_HEALTH_CHECK_INTERVAL
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import REDIS_SOCKET_KEEPALIVE_OPTIONS
_broker_client: Redis | None = None
_broker_url: str | None = None
_broker_client_lock = threading.Lock()
def celery_get_broker_client(app: Celery) -> Redis:
"""Return a shared Redis client connected to the Celery broker DB.
Uses a module-level singleton so all tasks on a worker share one
connection instead of creating a new one per call. The client
connects directly to the broker Redis DB (parsed from the broker URL).
Thread-safe via lock — safe for use in Celery thread-pool workers.
Usage:
r_celery = celery_get_broker_client(self.app)
length = celery_get_queue_length(queue, r_celery)
"""
global _broker_client, _broker_url
with _broker_client_lock:
url = app.conf.broker_url
if _broker_client is not None and _broker_url == url:
try:
_broker_client.ping()
return _broker_client
except Exception:
try:
_broker_client.close()
except Exception:
pass
_broker_client = None
elif _broker_client is not None:
try:
_broker_client.close()
except Exception:
pass
_broker_client = None
_broker_url = url
_broker_client = Redis.from_url(
url,
decode_responses=False,
health_check_interval=REDIS_HEALTH_CHECK_INTERVAL,
socket_keepalive=True,
socket_keepalive_options=REDIS_SOCKET_KEEPALIVE_OPTIONS,
retry_on_timeout=True,
)
return _broker_client
def celery_get_unacked_length(r: Redis) -> int:

View File

@@ -14,6 +14,7 @@ from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.hooks.utils import HOOKS_AVAILABLE
from shared_configs.configs import MULTI_TENANT
# choosing 15 minutes because it roughly gives us enough time to process many tasks
@@ -361,6 +362,19 @@ if not MULTI_TENANT:
tasks_to_schedule.extend(beat_task_templates)
if HOOKS_AVAILABLE:
tasks_to_schedule.append(
{
"name": "hook-execution-log-cleanup",
"task": OnyxCeleryTask.HOOK_EXECUTION_LOG_CLEANUP_TASK,
"schedule": timedelta(days=1),
"options": {
"priority": OnyxCeleryPriority.LOW,
"expires": BEAT_EXPIRES_DEFAULT,
},
}
)
def generate_cloud_tasks(
beat_tasks: list[dict], beat_templates: list[dict], beat_multiplier: float

View File

@@ -14,6 +14,7 @@ from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_queued_task_ids
from onyx.configs.app_configs import JOB_TIMEOUT
@@ -132,7 +133,6 @@ def revoke_tasks_blocking_deletion(
def check_for_connector_deletion_task(self: Task, *, tenant_id: str) -> bool | None:
r = get_redis_client()
r_replica = get_redis_replica_client()
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_DELETION_BEAT_LOCK,
@@ -149,6 +149,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str) -> bool | N
if not r.exists(OnyxRedisSignals.BLOCK_VALIDATE_CONNECTOR_DELETION_FENCES):
# clear fences that don't have associated celery tasks in progress
try:
r_celery = celery_get_broker_client(self.app)
validate_connector_deletion_fences(
tenant_id, r, r_replica, r_celery, lock_beat
)

View File

@@ -22,6 +22,7 @@ from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
from onyx.background.celery.memory_monitoring import emit_process_memory
@@ -449,7 +450,7 @@ def check_indexing_completion(
):
# Check if the task exists in the celery queue
# This handles the case where Redis dies after task creation but before task execution
redis_celery = task.app.broker_connection().channel().client # type: ignore
redis_celery = celery_get_broker_client(task.app)
task_exists = celery_find_task(
attempt.celery_task_id,
OnyxCeleryQueues.CONNECTOR_DOC_FETCHING,

View File

@@ -29,6 +29,8 @@ from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.connectors.factory import ConnectorMissingException
from onyx.connectors.factory import identify_connector_class
from onyx.connectors.factory import instantiate_connector
from onyx.connectors.interfaces import HierarchyConnector
from onyx.connectors.models import HierarchyNode as PydanticHierarchyNode
@@ -55,6 +57,26 @@ logger = setup_logger()
HIERARCHY_FETCH_INTERVAL_SECONDS = 24 * 60 * 60
def _connector_supports_hierarchy_fetching(
cc_pair: ConnectorCredentialPair,
) -> bool:
"""Return True only for connectors whose class implements HierarchyConnector."""
try:
connector_class = identify_connector_class(
cc_pair.connector.source,
)
except ConnectorMissingException as e:
task_logger.warning(
"Skipping hierarchy fetching enqueue for source=%s input_type=%s: %s",
cc_pair.connector.source,
cc_pair.connector.input_type,
str(e),
)
return False
return issubclass(connector_class, HierarchyConnector)
def _is_hierarchy_fetching_due(cc_pair: ConnectorCredentialPair) -> bool:
"""Returns boolean indicating if hierarchy fetching is due for this connector.
@@ -186,7 +208,10 @@ def check_for_hierarchy_fetching(self: Task, *, tenant_id: str) -> int | None:
cc_pair_id=cc_pair_id,
)
if not cc_pair or not _is_hierarchy_fetching_due(cc_pair):
if not cc_pair or not _connector_supports_hierarchy_fetching(cc_pair):
continue
if not _is_hierarchy_fetching_due(cc_pair):
continue
task_id = _try_creating_hierarchy_fetching_task(

View File

@@ -0,0 +1,35 @@
from celery import shared_task
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.hook import cleanup_old_execution_logs__no_commit
from onyx.utils.logger import setup_logger
logger = setup_logger()
_HOOK_EXECUTION_LOG_RETENTION_DAYS: int = 30
@shared_task(
name=OnyxCeleryTask.HOOK_EXECUTION_LOG_CLEANUP_TASK,
ignore_result=True,
soft_time_limit=JOB_TIMEOUT,
trail=False,
)
def hook_execution_log_cleanup_task(*, tenant_id: str) -> None: # noqa: ARG001
try:
with get_session_with_current_tenant() as db_session:
deleted: int = cleanup_old_execution_logs__no_commit(
db_session=db_session,
max_age_days=_HOOK_EXECUTION_LOG_RETENTION_DAYS,
)
db_session.commit()
if deleted:
logger.info(
f"Deleted {deleted} hook execution log(s) older than "
f"{_HOOK_EXECUTION_LOG_RETENTION_DAYS} days."
)
except Exception:
logger.exception("Failed to clean up hook execution logs")
raise

View File

@@ -1,6 +1,5 @@
import json
import time
from collections.abc import Callable
from datetime import timedelta
from itertools import islice
from typing import Any
@@ -19,6 +18,7 @@ from sqlalchemy import text
from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.memory_monitoring import emit_process_memory
@@ -698,31 +698,27 @@ def monitor_background_processes(self: Task, *, tenant_id: str) -> None:
return None
try:
# Get Redis client for Celery broker
redis_celery = self.app.broker_connection().channel().client # type: ignore
redis_std = get_redis_client()
# Define metric collection functions and their dependencies
metric_functions: list[Callable[[], list[Metric]]] = [
lambda: _collect_queue_metrics(redis_celery),
lambda: _collect_connector_metrics(db_session, redis_std),
lambda: _collect_sync_metrics(db_session, redis_std),
]
# Collect queue metrics with broker connection
r_celery = celery_get_broker_client(self.app)
queue_metrics = _collect_queue_metrics(r_celery)
# Collect and log each metric
# Collect remaining metrics (no broker connection needed)
with get_session_with_current_tenant() as db_session:
for metric_fn in metric_functions:
metrics = metric_fn()
for metric in metrics:
# double check to make sure we aren't double-emitting metrics
if metric.key is None or not _has_metric_been_emitted(
redis_std, metric.key
):
metric.log()
metric.emit(tenant_id)
all_metrics: list[Metric] = queue_metrics
all_metrics.extend(_collect_connector_metrics(db_session, redis_std))
all_metrics.extend(_collect_sync_metrics(db_session, redis_std))
if metric.key is not None:
_mark_metric_as_emitted(redis_std, metric.key)
for metric in all_metrics:
if metric.key is None or not _has_metric_been_emitted(
redis_std, metric.key
):
metric.log()
metric.emit(tenant_id)
if metric.key is not None:
_mark_metric_as_emitted(redis_std, metric.key)
task_logger.info("Successfully collected background metrics")
except SoftTimeLimitExceeded:
@@ -890,7 +886,7 @@ def monitor_celery_queues_helper(
) -> None:
"""A task to monitor all celery queue lengths."""
r_celery = task.app.broker_connection().channel().client # type: ignore
r_celery = celery_get_broker_client(task.app)
n_celery = celery_get_queue_length(OnyxCeleryQueues.PRIMARY, r_celery)
n_docfetching = celery_get_queue_length(
OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery
@@ -1080,7 +1076,7 @@ def cloud_monitor_celery_pidbox(
num_deleted = 0
MAX_PIDBOX_IDLE = 24 * 3600 # 1 day in seconds
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
r_celery = celery_get_broker_client(self.app)
for key in r_celery.scan_iter("*.reply.celery.pidbox"):
key_bytes = cast(bytes, key)
key_str = key_bytes.decode("utf-8")

View File

@@ -17,6 +17,7 @@ from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_queued_task_ids
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
@@ -203,7 +204,6 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
r = get_redis_client()
r_replica = get_redis_replica_client()
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK,
@@ -261,6 +261,7 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
r_celery = celery_get_broker_client(self.app)
validate_pruning_fences(tenant_id, r, r_replica, r_celery, lock_beat)
except Exception:
task_logger.exception("Exception while validating pruning fences")

View File

@@ -16,6 +16,7 @@ from sqlalchemy.orm import Session
from onyx.access.access import build_access_for_user_files
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
from onyx.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex
@@ -24,6 +25,7 @@ from onyx.configs.app_configs import MANAGED_VESPA
from onyx.configs.app_configs import VESPA_CLOUD_CERT_PATH
from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_USER_FILE_DELETE_TASK_EXPIRES
from onyx.configs.constants import CELERY_USER_FILE_PROCESSING_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_USER_FILE_PROCESSING_TASK_EXPIRES
from onyx.configs.constants import CELERY_USER_FILE_PROJECT_SYNC_LOCK_TIMEOUT
@@ -33,6 +35,7 @@ from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import USER_FILE_DELETE_MAX_QUEUE_DEPTH
from onyx.configs.constants import USER_FILE_PROCESSING_MAX_QUEUE_DEPTH
from onyx.configs.constants import USER_FILE_PROJECT_SYNC_MAX_QUEUE_DEPTH
from onyx.connectors.file.connector import LocalFileConnector
@@ -91,8 +94,19 @@ def _user_file_delete_lock_key(user_file_id: str | UUID) -> str:
return f"{OnyxRedisLocks.USER_FILE_DELETE_LOCK_PREFIX}:{user_file_id}"
def _user_file_delete_queued_key(user_file_id: str | UUID) -> str:
"""Key that exists while a delete_single_user_file task is sitting in the queue.
The beat generator sets this with a TTL equal to CELERY_USER_FILE_DELETE_TASK_EXPIRES
before enqueuing and the worker deletes it as its first action. This prevents
the beat from adding duplicate tasks for files that already have a live task
in flight.
"""
return f"{OnyxRedisLocks.USER_FILE_DELETE_QUEUED_PREFIX}:{user_file_id}"
def get_user_file_project_sync_queue_depth(celery_app: Celery) -> int:
redis_celery: Redis = celery_app.broker_connection().channel().client # type: ignore
redis_celery = celery_get_broker_client(celery_app)
return celery_get_queue_length(
OnyxCeleryQueues.USER_FILE_PROJECT_SYNC, redis_celery
)
@@ -225,7 +239,7 @@ def check_user_file_processing(self: Task, *, tenant_id: str) -> None:
skipped_guard = 0
try:
# --- Protection 1: queue depth backpressure ---
r_celery = self.app.broker_connection().channel().client # type: ignore
r_celery = celery_get_broker_client(self.app)
queue_len = celery_get_queue_length(
OnyxCeleryQueues.USER_FILE_PROCESSING, r_celery
)
@@ -546,7 +560,23 @@ def process_single_user_file(
ignore_result=True,
)
def check_for_user_file_delete(self: Task, *, tenant_id: str) -> None:
"""Scan for user files with DELETING status and enqueue per-file tasks."""
"""Scan for user files with DELETING status and enqueue per-file tasks.
Three mechanisms prevent queue runaway (mirrors check_user_file_processing):
1. **Queue depth backpressure** if the broker queue already has more than
USER_FILE_DELETE_MAX_QUEUE_DEPTH items we skip this beat cycle entirely.
2. **Per-file queued guard** before enqueuing a task we set a short-lived
Redis key (TTL = CELERY_USER_FILE_DELETE_TASK_EXPIRES). If that key
already exists the file already has a live task in the queue, so we skip
it. The worker deletes the key the moment it picks up the task so the
next beat cycle can re-enqueue if the file is still DELETING.
3. **Task expiry** every enqueued task carries an `expires` value equal to
CELERY_USER_FILE_DELETE_TASK_EXPIRES. If a task is still sitting in
the queue after that deadline, Celery discards it without touching the DB.
"""
task_logger.info("check_for_user_file_delete - Starting")
redis_client = get_redis_client(tenant_id=tenant_id)
lock: RedisLock = redis_client.lock(
@@ -555,8 +585,23 @@ def check_for_user_file_delete(self: Task, *, tenant_id: str) -> None:
)
if not lock.acquire(blocking=False):
return None
enqueued = 0
skipped_guard = 0
try:
# --- Protection 1: queue depth backpressure ---
# NOTE: must use the broker's Redis client (not redis_client) because
# Celery queues live on a separate Redis DB with CELERY_SEPARATOR keys.
r_celery = celery_get_broker_client(self.app)
queue_len = celery_get_queue_length(OnyxCeleryQueues.USER_FILE_DELETE, r_celery)
if queue_len > USER_FILE_DELETE_MAX_QUEUE_DEPTH:
task_logger.warning(
f"check_for_user_file_delete - Queue depth {queue_len} exceeds "
f"{USER_FILE_DELETE_MAX_QUEUE_DEPTH}, skipping enqueue for "
f"tenant={tenant_id}"
)
return None
with get_session_with_current_tenant() as db_session:
user_file_ids = (
db_session.execute(
@@ -568,23 +613,40 @@ def check_for_user_file_delete(self: Task, *, tenant_id: str) -> None:
.all()
)
for user_file_id in user_file_ids:
self.app.send_task(
OnyxCeleryTask.DELETE_SINGLE_USER_FILE,
kwargs={"user_file_id": str(user_file_id), "tenant_id": tenant_id},
queue=OnyxCeleryQueues.USER_FILE_DELETE,
priority=OnyxCeleryPriority.HIGH,
# --- Protection 2: per-file queued guard ---
queued_key = _user_file_delete_queued_key(user_file_id)
guard_set = redis_client.set(
queued_key,
1,
ex=CELERY_USER_FILE_DELETE_TASK_EXPIRES,
nx=True,
)
if not guard_set:
skipped_guard += 1
continue
# --- Protection 3: task expiry ---
try:
self.app.send_task(
OnyxCeleryTask.DELETE_SINGLE_USER_FILE,
kwargs={
"user_file_id": str(user_file_id),
"tenant_id": tenant_id,
},
queue=OnyxCeleryQueues.USER_FILE_DELETE,
priority=OnyxCeleryPriority.HIGH,
expires=CELERY_USER_FILE_DELETE_TASK_EXPIRES,
)
except Exception:
redis_client.delete(queued_key)
raise
enqueued += 1
except Exception as e:
task_logger.exception(
f"check_for_user_file_delete - Error enqueuing deletes - {e.__class__.__name__}"
)
return None
finally:
if lock.owned():
lock.release()
task_logger.info(
f"check_for_user_file_delete - Enqueued {enqueued} tasks for tenant={tenant_id}"
f"check_for_user_file_delete - Enqueued {enqueued} tasks, skipped_guard={skipped_guard} for tenant={tenant_id}"
)
return None
@@ -602,6 +664,9 @@ def delete_user_file_impl(
file_lock: RedisLock | None = None
if redis_locking:
redis_client = get_redis_client(tenant_id=tenant_id)
# Clear the queued guard so the beat can re-enqueue if deletion fails
# and the file remains in DELETING status.
redis_client.delete(_user_file_delete_queued_key(user_file_id))
file_lock = redis_client.lock(
_user_file_delete_lock_key(user_file_id),
timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT,

View File

@@ -297,7 +297,9 @@ class PostgresCacheBackend(CacheBackend):
def _lock_id_for(self, name: str) -> int:
"""Map *name* to a 64-bit signed int for ``pg_advisory_lock``."""
h = hashlib.md5(f"{self._tenant_id}:{name}".encode()).digest()
h = hashlib.md5(
f"{self._tenant_id}:{name}".encode(), usedforsecurity=False
).digest()
return struct.unpack("q", h[:8])[0]

View File

@@ -1,19 +1,8 @@
import threading
import time
from collections.abc import Callable
from collections.abc import Generator
from queue import Empty
from onyx.chat.citation_processor import CitationMapping
from onyx.chat.emitter import Emitter
from onyx.context.search.models import SearchDoc
from onyx.server.query_and_chat.placement import Placement
from onyx.server.query_and_chat.streaming_models import OverallStop
from onyx.server.query_and_chat.streaming_models import Packet
from onyx.server.query_and_chat.streaming_models import PacketException
from onyx.tools.models import ToolCallInfo
from onyx.utils.threadpool_concurrency import run_in_background
from onyx.utils.threadpool_concurrency import wait_on_background
# Type alias for search doc deduplication key
# Simple key: just document_id (str)
@@ -159,114 +148,3 @@ class ChatStateContainer:
"""Thread-safe getter for emitted citations (returns a copy)."""
with self._lock:
return self._emitted_citations.copy()
def run_chat_loop_with_state_containers(
chat_loop_func: Callable[[Emitter, ChatStateContainer], None],
completion_callback: Callable[[ChatStateContainer], None],
is_connected: Callable[[], bool],
emitter: Emitter,
state_container: ChatStateContainer,
) -> Generator[Packet, None]:
"""
Explicit wrapper function that runs a function in a background thread
with event streaming capabilities.
The wrapped function should accept emitter as first arg and use it to emit
Packet objects. This wrapper polls every 300ms to check if stop signal is set.
Args:
func: The function to wrap (should accept emitter and state_container as first and second args)
completion_callback: Callback function to call when the function completes
emitter: Emitter instance for sending packets
state_container: ChatStateContainer instance for accumulating state
is_connected: Callable that returns False when stop signal is set
Usage:
packets = run_chat_loop_with_state_containers(
my_func,
completion_callback=completion_callback,
emitter=emitter,
state_container=state_container,
is_connected=check_func,
)
for packet in packets:
# Process packets
pass
"""
def run_with_exception_capture() -> None:
try:
chat_loop_func(emitter, state_container)
except Exception as e:
# If execution fails, emit an exception packet
emitter.emit(
Packet(
placement=Placement(turn_index=0),
obj=PacketException(type="error", exception=e),
)
)
# Run the function in a background thread
thread = run_in_background(run_with_exception_capture)
pkt: Packet | None = None
last_turn_index = 0 # Track the highest turn_index seen for stop packet
last_cancel_check = time.monotonic()
cancel_check_interval = 0.3 # Check for cancellation every 300ms
try:
while True:
# Poll queue with 300ms timeout for natural stop signal checking
# the 300ms timeout is to avoid busy-waiting and to allow the stop signal to be checked regularly
try:
pkt = emitter.bus.get(timeout=0.3)
except Empty:
if not is_connected():
# Stop signal detected
yield Packet(
placement=Placement(turn_index=last_turn_index + 1),
obj=OverallStop(type="stop", stop_reason="user_cancelled"),
)
break
last_cancel_check = time.monotonic()
continue
if pkt is not None:
# Track the highest turn_index for the stop packet
if pkt.placement and pkt.placement.turn_index > last_turn_index:
last_turn_index = pkt.placement.turn_index
if isinstance(pkt.obj, OverallStop):
yield pkt
break
elif isinstance(pkt.obj, PacketException):
raise pkt.obj.exception
else:
yield pkt
# Check for cancellation periodically even when packets are flowing
# This ensures stop signal is checked during active streaming
current_time = time.monotonic()
if current_time - last_cancel_check >= cancel_check_interval:
if not is_connected():
# Stop signal detected during streaming
yield Packet(
placement=Placement(turn_index=last_turn_index + 1),
obj=OverallStop(type="stop", stop_reason="user_cancelled"),
)
break
last_cancel_check = current_time
finally:
# Wait for thread to complete on normal exit to propagate exceptions and ensure cleanup.
# Skip waiting if user disconnected to exit quickly.
if is_connected():
wait_on_background(thread)
try:
completion_callback(state_container)
except Exception as e:
emitter.emit(
Packet(
placement=Placement(turn_index=last_turn_index + 1),
obj=PacketException(type="error", exception=e),
)
)

View File

@@ -30,6 +30,8 @@ from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_store.file_store import get_default_file_store
from onyx.file_store.models import ChatFileType
from onyx.file_store.models import FileDescriptor
from onyx.file_store.utils import plaintext_file_name_for_id
from onyx.file_store.utils import store_plaintext
from onyx.kg.models import KGException
from onyx.kg.setup.kg_default_entity_definitions import (
populate_missing_default_entity_types__commit,
@@ -289,6 +291,33 @@ def process_kg_commands(
raise KGException("KG setup done")
def _get_or_extract_plaintext(
file_id: str,
extract_fn: Callable[[], str],
) -> str:
"""Load cached plaintext for a file, or extract and store it.
Tries to read pre-stored plaintext from the file store. On a miss,
calls extract_fn to produce the text, then stores the result so
future calls skip the expensive extraction.
"""
file_store = get_default_file_store()
plaintext_key = plaintext_file_name_for_id(file_id)
# Try cached plaintext first.
try:
plaintext_io = file_store.read_file(plaintext_key, mode="b")
return plaintext_io.read().decode("utf-8")
except Exception:
logger.exception(f"Error when reading file, id={file_id}")
# Cache miss — extract and store.
content_text = extract_fn()
if content_text:
store_plaintext(file_id, content_text)
return content_text
@log_function_time(print_only=True)
def load_chat_file(
file_descriptor: FileDescriptor, db_session: Session
@@ -303,12 +332,23 @@ def load_chat_file(
file_type = ChatFileType(file_descriptor["type"])
if file_type.is_text_file():
try:
content_text = extract_file_text(
file_id = file_descriptor["id"]
def _extract() -> str:
return extract_file_text(
file=file_io,
file_name=file_descriptor.get("name") or "",
break_on_unprocessable=False,
)
# Use the user_file_id as cache key when available (matches what
# the celery indexing worker stores), otherwise fall back to the
# file store id (covers code-interpreter-generated files, etc.).
user_file_id_str = file_descriptor.get("user_file_id")
cache_key = user_file_id_str or file_id
try:
content_text = _get_or_extract_plaintext(cache_key, _extract)
except Exception as e:
logger.warning(
f"Failed to retrieve content for file {file_descriptor['id']}: {str(e)}"

View File

@@ -1,19 +1,40 @@
import threading
from queue import Queue
from onyx.server.query_and_chat.placement import Placement
from onyx.server.query_and_chat.streaming_models import Packet
class Emitter:
"""Use this inside tools to emit arbitrary UI progress."""
"""Routes packets from LLM/tool execution to the ``_run_models`` drain loop.
def __init__(self, bus: Queue):
self.bus = bus
Tags every packet with ``model_index`` and places it on ``merged_queue``
as a ``(model_idx, packet)`` tuple for ordered consumption downstream.
Args:
merged_queue: Shared queue owned by ``_run_models``.
model_idx: Index embedded in packet placements (``0`` for N=1 runs).
drain_done: Optional event set by ``_run_models`` when the drain loop
exits early (e.g. HTTP disconnect). When set, ``emit`` returns
immediately so worker threads can exit fast.
"""
def __init__(
self,
merged_queue: Queue[tuple[int, Packet | Exception | object]],
model_idx: int = 0,
drain_done: threading.Event | None = None,
) -> None:
self._model_idx = model_idx
self._merged_queue = merged_queue
self._drain_done = drain_done
def emit(self, packet: Packet) -> None:
self.bus.put(packet) # Thread-safe
def get_default_emitter() -> Emitter:
bus: Queue[Packet] = Queue()
emitter = Emitter(bus)
return emitter
if self._drain_done is not None and self._drain_done.is_set():
return
base = packet.placement or Placement(turn_index=0)
tagged = Packet(
placement=base.model_copy(update={"model_index": self._model_idx}),
obj=packet.obj,
)
self._merged_queue.put((self._model_idx, tagged))

View File

@@ -36,9 +36,11 @@ from onyx.db.memory import add_memory
from onyx.db.memory import update_memory_at_index
from onyx.db.memory import UserMemoryContext
from onyx.db.models import Persona
from onyx.llm.constants import LlmProviderNames
from onyx.llm.interfaces import LLM
from onyx.llm.interfaces import LLMUserIdentity
from onyx.llm.interfaces import ToolChoiceOptions
from onyx.llm.utils import is_true_openai_model
from onyx.prompts.chat_prompts import IMAGE_GEN_REMINDER
from onyx.prompts.chat_prompts import OPEN_URL_REMINDER
from onyx.server.query_and_chat.placement import Placement
@@ -72,6 +74,70 @@ from shared_configs.contextvars import get_current_tenant_id
logger = setup_logger()
class EmptyLLMResponseError(RuntimeError):
"""Raised when the streamed LLM response completes without a usable answer."""
def __init__(
self,
*,
provider: str,
model: str,
tool_choice: ToolChoiceOptions,
client_error_msg: str,
error_code: str = "EMPTY_LLM_RESPONSE",
is_retryable: bool = True,
) -> None:
super().__init__(client_error_msg)
self.provider = provider
self.model = model
self.tool_choice = tool_choice
self.client_error_msg = client_error_msg
self.error_code = error_code
self.is_retryable = is_retryable
def _build_empty_llm_response_error(
llm: LLM,
llm_step_result: LlmStepResult,
tool_choice: ToolChoiceOptions,
) -> EmptyLLMResponseError:
provider = llm.config.model_provider
model = llm.config.model_name
# OpenAI quota exhaustion has reached us as a streamed "stop" with zero content.
# When the stream is completely empty and there is no reasoning/tool output, surface
# the likely account-level cause instead of a generic tool-calling error.
if (
not llm_step_result.reasoning
and provider == LlmProviderNames.OPENAI
and is_true_openai_model(provider, model)
):
return EmptyLLMResponseError(
provider=provider,
model=model,
tool_choice=tool_choice,
client_error_msg=(
"The selected OpenAI model returned an empty streamed response "
"before producing any tokens. This commonly happens when the API "
"key or project has no remaining quota or billing is not enabled. "
"Verify quota and billing for this key and try again."
),
error_code="BUDGET_EXCEEDED",
is_retryable=False,
)
return EmptyLLMResponseError(
provider=provider,
model=model,
tool_choice=tool_choice,
client_error_msg=(
"The selected model returned no final answer before the stream "
"completed. No text or tool calls were received from the upstream "
"provider."
),
)
def _looks_like_xml_tool_call_payload(text: str | None) -> bool:
"""Detect XML-style marshaled tool calls emitted as plain text."""
if not text:
@@ -613,7 +679,12 @@ def run_llm_loop(
)
citation_processor.update_citation_mapping(project_citation_mapping)
llm_step_result: LlmStepResult | None = None
llm_step_result = LlmStepResult(
reasoning=None,
answer=None,
tool_calls=None,
raw_answer=None,
)
# Pass the total budget to construct_message_history, which will handle token allocation
available_tokens = llm.config.max_input_tokens
@@ -1084,12 +1155,18 @@ def run_llm_loop(
# As long as 1 tool with citeable documents is called at any point, we ask the LLM to try to cite
should_cite_documents = True
if not llm_step_result or not llm_step_result.answer:
if not llm_step_result.answer and not llm_step_result.tool_calls:
raise _build_empty_llm_response_error(
llm=llm,
llm_step_result=llm_step_result,
tool_choice=tool_choice,
)
if not llm_step_result.answer:
raise RuntimeError(
"The LLM did not return an answer. "
"Typically this is an issue with LLMs that do not support tool calling natively, "
"or the model serving API is not configured correctly. "
"This may also happen with models that are lower quality outputting invalid tool calls."
"The LLM did not return a final answer after tool execution. "
"Typically this indicates invalid tool-call output, a model/provider mismatch, "
"or serving API misconfiguration."
)
emitter.emit(

View File

@@ -1013,6 +1013,10 @@ def run_llm_step_pkt_generator(
accumulated_reasoning = ""
accumulated_answer = ""
accumulated_raw_answer = ""
stream_chunk_count = 0
actionable_chunk_count = 0
empty_chunk_count = 0
finish_reasons: set[str] = set()
xml_tool_call_content_filter = _XmlToolCallContentFilter()
processor_state: Any = None
@@ -1145,6 +1149,7 @@ def run_llm_step_pkt_generator(
user_identity=user_identity,
timeout_override=timeout_override,
):
stream_chunk_count += 1
if packet.usage:
usage = packet.usage
span_generation.span_data.usage = {
@@ -1154,16 +1159,21 @@ def run_llm_step_pkt_generator(
"cache_creation_input_tokens": usage.cache_creation_input_tokens,
}
# Note: LLM cost tracking is now handled in multi_llm.py
finish_reason = packet.choice.finish_reason
if finish_reason:
finish_reasons.add(str(finish_reason))
delta = packet.choice.delta
# Weird behavior from some model providers, just log and ignore for now
if (
delta.content is None
not delta.content
and delta.reasoning_content is None
and delta.tool_calls is None
and not delta.tool_calls
):
empty_chunk_count += 1
logger.warning(
f"LLM packet is empty (no contents, reasoning or tool calls). Skipping: {packet}"
"LLM packet is empty (no content, reasoning, or tool calls). "
f"finish_reason={finish_reason}. Skipping: {packet}"
)
continue
@@ -1172,6 +1182,8 @@ def run_llm_step_pkt_generator(
time.monotonic() - stream_start_time
)
first_action_recorded = True
if _delta_has_action(delta):
actionable_chunk_count += 1
if custom_token_processor:
# The custom token processor can modify the deltas for specific custom logic
@@ -1307,6 +1319,15 @@ def run_llm_step_pkt_generator(
else:
logger.debug("Tool calls: []")
if actionable_chunk_count == 0:
logger.warning(
"LLM stream completed with no actionable deltas. "
f"chunks={stream_chunk_count}, empty_chunks={empty_chunk_count}, "
f"finish_reasons={sorted(finish_reasons)}, "
f"provider={llm.config.model_provider}, model={llm.config.model_name}, "
f"tool_choice={tool_choice}, tools_sent={len(tool_definitions)}"
)
return (
LlmStepResult(
reasoning=accumulated_reasoning if accumulated_reasoning else None,

View File

@@ -8,6 +8,7 @@ from onyx.configs.constants import MessageType
from onyx.context.search.models import SearchDoc
from onyx.file_store.models import InMemoryChatFile
from onyx.server.query_and_chat.models import MessageResponseIDInfo
from onyx.server.query_and_chat.models import MultiModelMessageResponseIDInfo
from onyx.server.query_and_chat.streaming_models import CitationInfo
from onyx.server.query_and_chat.streaming_models import GeneratedImage
from onyx.server.query_and_chat.streaming_models import Packet
@@ -35,7 +36,13 @@ class CreateChatSessionID(BaseModel):
chat_session_id: UUID
AnswerStreamPart = Packet | MessageResponseIDInfo | StreamingError | CreateChatSessionID
AnswerStreamPart = (
Packet
| MessageResponseIDInfo
| MultiModelMessageResponseIDInfo
| StreamingError
| CreateChatSessionID
)
AnswerStream = Iterator[AnswerStreamPart]
@@ -177,8 +184,8 @@ class ExtractedContextFiles(BaseModel):
class SearchParams(BaseModel):
"""Resolved search filter IDs and search-tool usage for a chat turn."""
search_project_id: int | None
search_persona_id: int | None
project_id_filter: int | None
persona_id_filter: int | None
search_usage: SearchToolUsage

File diff suppressed because it is too large Load Diff

View File

@@ -44,6 +44,31 @@ SEND_USER_METADATA_TO_LLM_PROVIDER = (
# User Facing Features Configs
#####
BLURB_SIZE = 128 # Number Encoder Tokens included in the chunk blurb
# Hard ceiling for the admin-configurable file upload size (in MB).
# Self-hosted customers can raise or lower this via the environment variable.
_raw_max_upload_size_mb = int(os.environ.get("MAX_ALLOWED_UPLOAD_SIZE_MB", "250"))
if _raw_max_upload_size_mb < 0:
logger.warning(
"MAX_ALLOWED_UPLOAD_SIZE_MB=%d is negative; falling back to 250",
_raw_max_upload_size_mb,
)
_raw_max_upload_size_mb = 250
MAX_ALLOWED_UPLOAD_SIZE_MB = _raw_max_upload_size_mb
# Default fallback for the per-user file upload size limit (in MB) when no
# admin-configured value exists. Clamped to MAX_ALLOWED_UPLOAD_SIZE_MB at
# runtime so this never silently exceeds the hard ceiling.
_raw_default_upload_size_mb = int(
os.environ.get("DEFAULT_USER_FILE_MAX_UPLOAD_SIZE_MB", "100")
)
if _raw_default_upload_size_mb < 0:
logger.warning(
"DEFAULT_USER_FILE_MAX_UPLOAD_SIZE_MB=%d is negative; falling back to 100",
_raw_default_upload_size_mb,
)
_raw_default_upload_size_mb = 100
DEFAULT_USER_FILE_MAX_UPLOAD_SIZE_MB = _raw_default_upload_size_mb
GENERATIVE_MODEL_ACCESS_CHECK_FREQ = int(
os.environ.get("GENERATIVE_MODEL_ACCESS_CHECK_FREQ") or 86400
) # 1 day
@@ -61,17 +86,6 @@ CACHE_BACKEND = CacheBackendType(
os.environ.get("CACHE_BACKEND", CacheBackendType.REDIS)
)
# Maximum token count for a single uploaded file. Files exceeding this are rejected.
# Defaults to 100k tokens (or 10M when vector DB is disabled).
_DEFAULT_FILE_TOKEN_LIMIT = 10_000_000 if DISABLE_VECTOR_DB else 100_000
FILE_TOKEN_COUNT_THRESHOLD = int(
os.environ.get("FILE_TOKEN_COUNT_THRESHOLD", str(_DEFAULT_FILE_TOKEN_LIMIT))
)
# Maximum upload size for a single user file (chat/projects) in MB.
USER_FILE_MAX_UPLOAD_SIZE_MB = int(os.environ.get("USER_FILE_MAX_UPLOAD_SIZE_MB") or 50)
USER_FILE_MAX_UPLOAD_SIZE_BYTES = USER_FILE_MAX_UPLOAD_SIZE_MB * 1024 * 1024
# If set to true, will show extra/uncommon connectors in the "Other" category
SHOW_EXTRA_CONNECTORS = os.environ.get("SHOW_EXTRA_CONNECTORS", "").lower() == "true"
@@ -278,14 +292,17 @@ USING_AWS_MANAGED_OPENSEARCH = (
OPENSEARCH_PROFILING_DISABLED = (
os.environ.get("OPENSEARCH_PROFILING_DISABLED", "").lower() == "true"
)
# Whether to disable match highlights for OpenSearch. Defaults to True for now
# as we investigate query performance.
OPENSEARCH_MATCH_HIGHLIGHTS_DISABLED = (
os.environ.get("OPENSEARCH_MATCH_HIGHLIGHTS_DISABLED", "true").lower() == "true"
)
# When enabled, OpenSearch returns detailed score breakdowns for each hit.
# Useful for debugging and tuning search relevance. Has ~10-30% performance overhead according to documentation.
# Seems for Hybrid Search in practice, the impact is actually more like 1000x slower.
OPENSEARCH_EXPLAIN_ENABLED = (
os.environ.get("OPENSEARCH_EXPLAIN_ENABLED", "").lower() == "true"
)
# Analyzer used for full-text fields (title, content). Use OpenSearch built-in analyzer
# names (e.g. "english", "standard", "german"). Affects stemming and tokenization;
# existing indices need reindexing after a change.
@@ -318,8 +335,20 @@ VERIFY_CREATE_OPENSEARCH_INDEX_ON_INIT_MT = (
OPENSEARCH_MIGRATION_GET_VESPA_CHUNKS_PAGE_SIZE = int(
os.environ.get("OPENSEARCH_MIGRATION_GET_VESPA_CHUNKS_PAGE_SIZE") or 500
)
OPENSEARCH_OVERRIDE_DEFAULT_NUM_HYBRID_SEARCH_CANDIDATES = int(
os.environ.get("OPENSEARCH_DEFAULT_NUM_HYBRID_SEARCH_CANDIDATES") or 0
# If set, will override the default number of shards and replicas for the index.
OPENSEARCH_INDEX_NUM_SHARDS: int | None = (
int(os.environ["OPENSEARCH_INDEX_NUM_SHARDS"])
if os.environ.get("OPENSEARCH_INDEX_NUM_SHARDS", None) is not None
else None
)
OPENSEARCH_INDEX_NUM_REPLICAS: int | None = (
int(os.environ["OPENSEARCH_INDEX_NUM_REPLICAS"])
if os.environ.get("OPENSEARCH_INDEX_NUM_REPLICAS", None) is not None
else None
)
ONYX_SEARCH_UI_USES_OPENSEARCH_KEYWORD_SEARCH = (
os.environ.get("ONYX_SEARCH_UI_USES_OPENSEARCH_KEYWORD_SEARCH", "").lower()
== "true"
)
VESPA_HOST = os.environ.get("VESPA_HOST") or "localhost"
@@ -776,6 +805,10 @@ MINI_CHUNK_SIZE = 150
# This is the number of regular chunks per large chunk
LARGE_CHUNK_RATIO = 4
# The maximum number of chunks that can be held for 1 document processing batch
# The purpose of this is to set an upper bound on memory usage
MAX_CHUNKS_PER_DOC_BATCH = int(os.environ.get("MAX_CHUNKS_PER_DOC_BATCH") or 1000)
# Include the document level metadata in each chunk. If the metadata is too long, then it is thrown out
# We don't want the metadata to overwhelm the actual contents of the chunk
SKIP_METADATA_IN_CHUNK = os.environ.get("SKIP_METADATA_IN_CHUNK", "").lower() == "true"

View File

@@ -24,11 +24,11 @@ CONTEXT_CHUNKS_BELOW = int(os.environ.get("CONTEXT_CHUNKS_BELOW") or 1)
LLM_SOCKET_READ_TIMEOUT = int(
os.environ.get("LLM_SOCKET_READ_TIMEOUT") or "60"
) # 60 seconds
# Weighting factor between Vector and Keyword Search, 1 for completely vector search
# Weighting factor between vector and keyword Search; 1 for completely vector
# search, 0 for keyword. Enforces a valid range of [0, 1]. A supplied value from
# the env outside of this range will be clipped to the respective end of the
# range. Defaults to 0.5.
HYBRID_ALPHA = max(0, min(1, float(os.environ.get("HYBRID_ALPHA") or 0.5)))
HYBRID_ALPHA_KEYWORD = max(
0, min(1, float(os.environ.get("HYBRID_ALPHA_KEYWORD") or 0.4))
)
# Weighting factor between Title and Content of documents during search, 1 for completely
# Title based. Default heavily favors Content because Title is also included at the top of
# Content. This is to avoid cases where the Content is very relevant but it may not be clear

View File

@@ -177,6 +177,14 @@ USER_FILE_PROJECT_SYNC_MAX_QUEUE_DEPTH = 500
CELERY_USER_FILE_PROJECT_SYNC_LOCK_TIMEOUT = 5 * 60 # 5 minutes (in seconds)
# How long a queued user-file-delete task is valid before workers discard it.
# Mirrors the processing task expiry to prevent indefinite queue growth when
# files are stuck in DELETING status and the beat keeps re-enqueuing them.
CELERY_USER_FILE_DELETE_TASK_EXPIRES = 60 # 1 minute (in seconds)
# Max queue depth before the delete beat stops enqueuing more delete tasks.
USER_FILE_DELETE_MAX_QUEUE_DEPTH = 500
CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT = 5 * 60 # 5 minutes (in seconds)
DANSWER_REDIS_FUNCTION_LOCK_PREFIX = "da_function_lock:"
@@ -469,6 +477,9 @@ class OnyxRedisLocks:
USER_FILE_PROJECT_SYNC_QUEUED_PREFIX = "da_lock:user_file_project_sync_queued"
USER_FILE_DELETE_BEAT_LOCK = "da_lock:check_user_file_delete_beat"
USER_FILE_DELETE_LOCK_PREFIX = "da_lock:user_file_delete"
# Short-lived key set when a delete task is enqueued; cleared when the worker picks it up.
# Prevents the beat from re-enqueuing the same file while a delete task is already queued.
USER_FILE_DELETE_QUEUED_PREFIX = "da_lock:user_file_delete_queued"
# Release notes
RELEASE_NOTES_FETCH_LOCK = "da_lock:release_notes_fetch"
@@ -597,6 +608,9 @@ class OnyxCeleryTask:
EXPORT_QUERY_HISTORY_TASK = "export_query_history_task"
EXPORT_QUERY_HISTORY_CLEANUP_TASK = "export_query_history_cleanup_task"
# Hook execution log retention
HOOK_EXECUTION_LOG_CLEANUP_TASK = "hook_execution_log_cleanup_task"
# Sandbox cleanup
CLEANUP_IDLE_SANDBOXES = "cleanup_idle_sandboxes"
CLEANUP_OLD_SNAPSHOTS = "cleanup_old_snapshots"

View File

@@ -890,8 +890,8 @@ class ConfluenceConnector(
def _retrieve_all_slim_docs(
self,
start: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
end: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
include_permissions: bool = True,
) -> GenerateSlimDocumentOutput:
@@ -915,8 +915,8 @@ class ConfluenceConnector(
self.confluence_client, doc_id, restrictions, ancestors
) or space_level_access_info.get(page_space_key)
# Query pages
page_query = self.base_cql_page_query + self.cql_label_filter
# Query pages (with optional time filtering for indexing_start)
page_query = self._construct_page_cql_query(start, end)
for page in self.confluence_client.cql_paginate_all_expansions(
cql=page_query,
expand=restrictions_expand,
@@ -950,7 +950,9 @@ class ConfluenceConnector(
# Query attachments for each page
page_hierarchy_node_yielded = False
attachment_query = self._construct_attachment_query(_get_page_id(page))
attachment_query = self._construct_attachment_query(
_get_page_id(page), start, end
)
for attachment in self.confluence_client.cql_paginate_all_expansions(
cql=attachment_query,
expand=restrictions_expand,

View File

@@ -123,7 +123,7 @@ class OnyxConfluence:
self.shared_base_kwargs: dict[str, str | int | bool] = {
"api_version": "cloud" if is_cloud else "latest",
"backoff_and_retry": True,
"backoff_and_retry": False,
"cloud": is_cloud,
}
if timeout:
@@ -456,7 +456,7 @@ class OnyxConfluence:
return attr(*args, **kwargs)
except HTTPError as e:
delay_until = _handle_http_error(e, attempt)
delay_until = _handle_http_error(e, attempt, MAX_RETRIES)
logger.warning(
f"HTTPError in confluence call. Retrying in {delay_until} seconds..."
)

View File

@@ -363,7 +363,7 @@ def handle_confluence_rate_limit(confluence_call: F) -> F:
# and applying our own retries in a more specific set of circumstances
return confluence_call(*args, **kwargs)
except requests.HTTPError as e:
delay_until = _handle_http_error(e, attempt)
delay_until = _handle_http_error(e, attempt, MAX_RETRIES)
logger.warning(
f"HTTPError in confluence call. Retrying in {delay_until} seconds..."
)
@@ -384,7 +384,7 @@ def handle_confluence_rate_limit(confluence_call: F) -> F:
return cast(F, wrapped_call)
def _handle_http_error(e: requests.HTTPError, attempt: int) -> int:
def _handle_http_error(e: requests.HTTPError, attempt: int, max_retries: int) -> int:
MIN_DELAY = 2
MAX_DELAY = 60
STARTING_DELAY = 5
@@ -408,6 +408,17 @@ def _handle_http_error(e: requests.HTTPError, attempt: int) -> int:
raise e
if e.response.status_code >= 500:
if attempt >= max_retries - 1:
raise e
delay = min(STARTING_DELAY * (BACKOFF**attempt), MAX_DELAY)
logger.warning(
f"Server error {e.response.status_code}. "
f"Retrying in {delay} seconds (attempt {attempt + 1})..."
)
return math.ceil(time.monotonic() + delay)
if (
e.response.status_code != 429
and RATE_LIMIT_MESSAGE_LOWERCASE not in e.response.text.lower()

View File

@@ -157,9 +157,7 @@ def _execute_single_retrieval(
logger.error(f"Error executing request: {e}")
raise e
elif _is_rate_limit_error(e):
results = _execute_with_retry(
lambda: retrieval_function(**request_kwargs).execute()
)
results = _execute_with_retry(retrieval_function(**request_kwargs))
elif e.resp.status == 404 or e.resp.status == 403:
if continue_on_404_or_403:
logger.debug(f"Error executing request: {e}")

View File

@@ -10,6 +10,7 @@ from datetime import timedelta
from datetime import timezone
from typing import Any
import requests
from jira import JIRA
from jira.exceptions import JIRAError
from jira.resources import Issue
@@ -239,29 +240,53 @@ def enhanced_search_ids(
)
def bulk_fetch_issues(
jira_client: JIRA, issue_ids: list[str], fields: str | None = None
) -> list[Issue]:
# TODO: move away from this jira library if they continue to not support
# the endpoints we need. Using private fields is not ideal, but
# is likely fine for now since we pin the library version
def _bulk_fetch_request(
jira_client: JIRA, issue_ids: list[str], fields: str | None
) -> list[dict[str, Any]]:
"""Raw POST to the bulkfetch endpoint. Returns the list of raw issue dicts."""
bulk_fetch_path = jira_client._get_url("issue/bulkfetch")
# Prepare the payload according to Jira API v3 specification
payload: dict[str, Any] = {"issueIdsOrKeys": issue_ids}
# Only restrict fields if specified, might want to explicitly do this in the future
# to avoid reading unnecessary data
payload["fields"] = fields.split(",") if fields else ["*all"]
resp = jira_client._session.post(bulk_fetch_path, json=payload)
return resp.json()["issues"]
def bulk_fetch_issues(
jira_client: JIRA, issue_ids: list[str], fields: str | None = None
) -> list[Issue]:
# TODO(evan): move away from this jira library if they continue to not support
# the endpoints we need. Using private fields is not ideal, but
# is likely fine for now since we pin the library version
try:
response = jira_client._session.post(bulk_fetch_path, json=payload).json()
raw_issues = _bulk_fetch_request(jira_client, issue_ids, fields)
except requests.exceptions.JSONDecodeError:
if len(issue_ids) <= 1:
logger.exception(
f"Jira bulk-fetch response for issue(s) {issue_ids} could not "
f"be decoded as JSON (response too large or truncated)."
)
raise
mid = len(issue_ids) // 2
logger.warning(
f"Jira bulk-fetch JSON decode failed for batch of {len(issue_ids)} issues. "
f"Splitting into sub-batches of {mid} and {len(issue_ids) - mid}."
)
left = bulk_fetch_issues(jira_client, issue_ids[:mid], fields)
right = bulk_fetch_issues(jira_client, issue_ids[mid:], fields)
return left + right
except Exception as e:
logger.error(f"Error fetching issues: {e}")
raise e
raise
return [
Issue(jira_client._options, jira_client._session, raw=issue)
for issue in response["issues"]
for issue in raw_issues
]

View File

@@ -53,7 +53,7 @@ class NotionPage(BaseModel):
id: str
created_time: str
last_edited_time: str
archived: bool
in_trash: bool
properties: dict[str, Any]
url: str
@@ -63,6 +63,13 @@ class NotionPage(BaseModel):
)
class NotionDataSource(BaseModel):
"""Represents a Notion Data Source within a database."""
id: str
name: str = ""
class NotionBlock(BaseModel):
"""Represents a Notion Block object"""
@@ -107,7 +114,7 @@ class NotionConnector(LoadConnector, PollConnector):
self.batch_size = batch_size
self.headers = {
"Content-Type": "application/json",
"Notion-Version": "2022-06-28",
"Notion-Version": "2026-03-11",
}
self.indexed_pages: set[str] = set()
self.root_page_id = root_page_id
@@ -127,6 +134,9 @@ class NotionConnector(LoadConnector, PollConnector):
# Maps child page IDs to their containing page ID (discovered in _read_blocks).
# Used to resolve block_id parent types to the actual containing page.
self._child_page_parent_map: dict[str, str] = {}
# Maps data_source_id -> database_id (populated in _read_pages_from_database).
# Used to resolve data_source_id parent types back to the database.
self._data_source_to_database_map: dict[str, str] = {}
@classmethod
@override
@@ -227,7 +237,11 @@ class NotionConnector(LoadConnector, PollConnector):
@retry(tries=3, delay=1, backoff=2)
def _fetch_database_as_page(self, database_id: str) -> NotionPage:
"""Attempt to fetch a database as a page."""
"""Attempt to fetch a database as a page.
Note: As of API 2025-09-03, database objects no longer include
`properties` (schema moved to individual data sources).
"""
logger.debug(f"Fetching database for ID '{database_id}' as a page")
database_url = f"https://api.notion.com/v1/databases/{database_id}"
res = rl_requests.get(
@@ -246,18 +260,52 @@ class NotionConnector(LoadConnector, PollConnector):
database_name[0].get("text", {}).get("content") if database_name else None
)
db_data.setdefault("properties", {})
return NotionPage(**db_data, database_name=database_name)
@retry(tries=3, delay=1, backoff=2)
def _fetch_database(
self, database_id: str, cursor: str | None = None
def _fetch_data_sources_for_database(
self, database_id: str
) -> list[NotionDataSource]:
"""Fetch the list of data sources for a database."""
logger.debug(f"Fetching data sources for database '{database_id}'")
res = rl_requests.get(
f"https://api.notion.com/v1/databases/{database_id}",
headers=self.headers,
timeout=_NOTION_CALL_TIMEOUT,
)
try:
res.raise_for_status()
except Exception as e:
if res.status_code in (403, 404):
logger.error(
f"Unable to access database with ID '{database_id}'. "
f"This is likely due to the database not being shared "
f"with the Onyx integration. Exact exception:\n{e}"
)
return []
logger.exception(f"Error fetching database - {res.json()}")
raise e
db_data = res.json()
data_sources = db_data.get("data_sources", [])
return [
NotionDataSource(id=ds["id"], name=ds.get("name", ""))
for ds in data_sources
if ds.get("id")
]
@retry(tries=3, delay=1, backoff=2)
def _fetch_data_source(
self, data_source_id: str, cursor: str | None = None
) -> dict[str, Any]:
"""Fetch a database from it's ID via the Notion API."""
logger.debug(f"Fetching database for ID '{database_id}'")
block_url = f"https://api.notion.com/v1/databases/{database_id}/query"
"""Query a data source via POST /v1/data_sources/{id}/query."""
logger.debug(f"Querying data source '{data_source_id}'")
url = f"https://api.notion.com/v1/data_sources/{data_source_id}/query"
body = None if not cursor else {"start_cursor": cursor}
res = rl_requests.post(
block_url,
url,
headers=self.headers,
json=body,
timeout=_NOTION_CALL_TIMEOUT,
@@ -265,25 +313,14 @@ class NotionConnector(LoadConnector, PollConnector):
try:
res.raise_for_status()
except Exception as e:
json_data = res.json()
code = json_data.get("code")
# Sep 3 2025 backend changed the error message for this case
# TODO: it is also now possible for there to be multiple data sources per database; at present we
# just don't handle that. We will need to upgrade the API to the current version + query the
# new data sources endpoint to handle that case correctly.
if code == "object_not_found" or (
code == "validation_error"
and "does not contain any data sources" in json_data.get("message", "")
):
# this happens when a database is not shared with the integration
# in this case, we should just ignore the database
if res.status_code in (403, 404):
logger.error(
f"Unable to access database with ID '{database_id}'. "
f"This is likely due to the database not being shared "
f"Unable to access data source with ID '{data_source_id}'. "
f"This is likely due to it not being shared "
f"with the Onyx integration. Exact exception:\n{e}"
)
return {"results": [], "next_cursor": None}
logger.exception(f"Error fetching database - {res.json()}")
logger.exception(f"Error querying data source - {res.json()}")
raise e
return res.json()
@@ -348,8 +385,9 @@ class NotionConnector(LoadConnector, PollConnector):
# Fallback to workspace if we don't know the parent
return self.workspace_id
elif parent_type == "data_source_id":
# Newer Notion API may use data_source_id for databases
return parent.get("database_id") or parent.get("data_source_id")
ds_id = parent.get("data_source_id")
if ds_id:
return self._data_source_to_database_map.get(ds_id, self.workspace_id)
elif parent_type in ["page_id", "database_id"]:
return parent.get(parent_type)
@@ -497,18 +535,32 @@ class NotionConnector(LoadConnector, PollConnector):
if db_node:
hierarchy_nodes.append(db_node)
cursor = None
while True:
data = self._fetch_database(database_id, cursor)
# Discover all data sources under this database, then query each one.
# Even legacy single-source databases have one entry in the array.
data_sources = self._fetch_data_sources_for_database(database_id)
if not data_sources:
logger.warning(
f"Database '{database_id}' returned zero data sources — "
f"no pages will be indexed from this database."
)
for ds in data_sources:
self._data_source_to_database_map[ds.id] = database_id
cursor = None
while True:
data = self._fetch_data_source(ds.id, cursor)
for result in data["results"]:
obj_id = result["id"]
obj_type = result["object"]
text = self._properties_to_str(result.get("properties", {}))
if text:
result_blocks.append(NotionBlock(id=obj_id, text=text, prefix="\n"))
for result in data["results"]:
obj_id = result["id"]
obj_type = result["object"]
text = self._properties_to_str(result.get("properties", {}))
if text:
result_blocks.append(
NotionBlock(id=obj_id, text=text, prefix="\n")
)
if not self.recursive_index_enabled:
continue
if self.recursive_index_enabled:
if obj_type == "page":
logger.debug(
f"Found page with ID '{obj_id}' in database '{database_id}'"
@@ -518,7 +570,6 @@ class NotionConnector(LoadConnector, PollConnector):
logger.debug(
f"Found database with ID '{obj_id}' in database '{database_id}'"
)
# Get nested database name from properties if available
nested_db_title = result.get("title", [])
nested_db_name = None
if nested_db_title and len(nested_db_title) > 0:
@@ -533,10 +584,10 @@ class NotionConnector(LoadConnector, PollConnector):
result_pages.extend(nested_output.child_page_ids)
hierarchy_nodes.extend(nested_output.hierarchy_nodes)
if data["next_cursor"] is None:
break
if data["next_cursor"] is None:
break
cursor = data["next_cursor"]
cursor = data["next_cursor"]
return BlockReadOutput(
blocks=result_blocks,
@@ -807,36 +858,55 @@ class NotionConnector(LoadConnector, PollConnector):
def _yield_database_hierarchy_nodes(
self,
) -> Generator[HierarchyNode | Document, None, None]:
"""Search for all databases and yield hierarchy nodes for each.
"""Search for all data sources and yield hierarchy nodes for their parent databases.
This must be called BEFORE page indexing so that database hierarchy nodes
exist when pages inside databases reference them as parents.
With the new API, search returns data source objects instead of databases.
Multiple data sources can share the same parent database, so we use
database_id as the hierarchy node key and deduplicate via
_maybe_yield_hierarchy_node.
"""
query_dict: dict[str, Any] = {
"filter": {"property": "object", "value": "database"},
"filter": {"property": "object", "value": "data_source"},
"page_size": _NOTION_PAGE_SIZE,
}
pages_seen = 0
while pages_seen < _MAX_PAGES:
db_res = self._search_notion(query_dict)
for db in db_res.results:
db_id = db["id"]
# Extract title from the title array
title_arr = db.get("title", [])
db_name = None
if title_arr:
db_name = " ".join(
t.get("plain_text", "") for t in title_arr
).strip()
if not db_name:
for ds in db_res.results:
# Extract the parent database_id from the data source's parent
ds_parent = ds.get("parent", {})
db_id = ds_parent.get("database_id")
if not db_id:
continue
# Populate the mapping so _get_parent_raw_id can resolve later
ds_id = ds.get("id")
if not ds_id:
continue
self._data_source_to_database_map[ds_id] = db_id
# Fetch the database to get its actual name and parent
try:
db_page = self._fetch_database_as_page(db_id)
db_name = db_page.database_name or f"Database {db_id}"
parent_raw_id = self._get_parent_raw_id(db_page.parent)
db_url = (
db_page.url or f"https://notion.so/{db_id.replace('-', '')}"
)
except requests.exceptions.RequestException as e:
logger.warning(
f"Could not fetch database '{db_id}', "
f"defaulting to workspace root. Error: {e}"
)
db_name = f"Database {db_id}"
parent_raw_id = self.workspace_id
db_url = f"https://notion.so/{db_id.replace('-', '')}"
# Get parent using existing helper
parent_raw_id = self._get_parent_raw_id(db.get("parent"))
# Notion URLs omit dashes from UUIDs
db_url = db.get("url") or f"https://notion.so/{db_id.replace('-', '')}"
# _maybe_yield_hierarchy_node deduplicates by raw_node_id,
# so multiple data sources under one database produce one node.
node = self._maybe_yield_hierarchy_node(
raw_node_id=db_id,
raw_parent_id=parent_raw_id or self.workspace_id,

View File

@@ -1,5 +1,6 @@
import base64
import copy
import fnmatch
import html
import io
import os
@@ -84,6 +85,44 @@ SHARED_DOCUMENTS_MAP_REVERSE = {v: k for k, v in SHARED_DOCUMENTS_MAP.items()}
ASPX_EXTENSION = ".aspx"
def _is_site_excluded(site_url: str, excluded_site_patterns: list[str]) -> bool:
"""Check if a site URL matches any of the exclusion glob patterns."""
for pattern in excluded_site_patterns:
if fnmatch.fnmatch(site_url, pattern) or fnmatch.fnmatch(
site_url.rstrip("/"), pattern.rstrip("/")
):
return True
return False
def _is_path_excluded(item_path: str, excluded_path_patterns: list[str]) -> bool:
"""Check if a drive item path matches any of the exclusion glob patterns.
item_path is the relative path within a drive, e.g. "Engineering/API/report.docx".
Matches are attempted against the full path and the filename alone so that
patterns like "*.tmp" match files at any depth.
"""
filename = item_path.rsplit("/", 1)[-1] if "/" in item_path else item_path
for pattern in excluded_path_patterns:
if fnmatch.fnmatch(item_path, pattern) or fnmatch.fnmatch(filename, pattern):
return True
return False
def _build_item_relative_path(parent_reference_path: str | None, item_name: str) -> str:
"""Build the relative path of a drive item from its parentReference.path and name.
Example: parentReference.path="/drives/abc/root:/Eng/API", name="report.docx"
=> "Eng/API/report.docx"
"""
if parent_reference_path and "root:/" in parent_reference_path:
folder = unquote(parent_reference_path.split("root:/", 1)[1])
if folder:
return f"{folder}/{item_name}"
return item_name
DEFAULT_AUTHORITY_HOST = "https://login.microsoftonline.com"
DEFAULT_GRAPH_API_HOST = "https://graph.microsoft.com"
DEFAULT_SHAREPOINT_DOMAIN_SUFFIX = "sharepoint.com"
@@ -478,6 +517,7 @@ def _convert_driveitem_to_document_with_permissions(
include_permissions: bool = False,
parent_hierarchy_raw_node_id: str | None = None,
access_token: str | None = None,
treat_sharing_link_as_public: bool = False,
) -> Document | ConnectorFailure | None:
if not driveitem.name or not driveitem.id:
@@ -610,6 +650,7 @@ def _convert_driveitem_to_document_with_permissions(
drive_item=sdk_item,
drive_name=drive_name,
add_prefix=True,
treat_sharing_link_as_public=treat_sharing_link_as_public,
)
else:
external_access = ExternalAccess.empty()
@@ -644,6 +685,7 @@ def _convert_sitepage_to_document(
graph_client: GraphClient,
include_permissions: bool = False,
parent_hierarchy_raw_node_id: str | None = None,
treat_sharing_link_as_public: bool = False,
) -> Document:
"""Convert a SharePoint site page to a Document object."""
# Extract text content from the site page
@@ -773,6 +815,7 @@ def _convert_sitepage_to_document(
graph_client=graph_client,
site_page=site_page,
add_prefix=True,
treat_sharing_link_as_public=treat_sharing_link_as_public,
)
else:
external_access = ExternalAccess.empty()
@@ -803,6 +846,7 @@ def _convert_driveitem_to_slim_document(
ctx: ClientContext,
graph_client: GraphClient,
parent_hierarchy_raw_node_id: str | None = None,
treat_sharing_link_as_public: bool = False,
) -> SlimDocument:
if driveitem.id is None:
raise ValueError("DriveItem ID is required")
@@ -813,6 +857,7 @@ def _convert_driveitem_to_slim_document(
graph_client=graph_client,
drive_item=sdk_item,
drive_name=drive_name,
treat_sharing_link_as_public=treat_sharing_link_as_public,
)
return SlimDocument(
@@ -827,6 +872,7 @@ def _convert_sitepage_to_slim_document(
ctx: ClientContext | None,
graph_client: GraphClient,
parent_hierarchy_raw_node_id: str | None = None,
treat_sharing_link_as_public: bool = False,
) -> SlimDocument:
"""Convert a SharePoint site page to a SlimDocument object."""
if site_page.get("id") is None:
@@ -836,6 +882,7 @@ def _convert_sitepage_to_slim_document(
ctx=ctx,
graph_client=graph_client,
site_page=site_page,
treat_sharing_link_as_public=treat_sharing_link_as_public,
)
id = site_page.get("id")
if id is None:
@@ -855,14 +902,20 @@ class SharepointConnector(
self,
batch_size: int = INDEX_BATCH_SIZE,
sites: list[str] = [],
excluded_sites: list[str] = [],
excluded_paths: list[str] = [],
include_site_pages: bool = True,
include_site_documents: bool = True,
treat_sharing_link_as_public: bool = False,
authority_host: str = DEFAULT_AUTHORITY_HOST,
graph_api_host: str = DEFAULT_GRAPH_API_HOST,
sharepoint_domain_suffix: str = DEFAULT_SHAREPOINT_DOMAIN_SUFFIX,
) -> None:
self.batch_size = batch_size
self.sites = list(sites)
self.excluded_sites = [s for p in excluded_sites if (s := p.strip())]
self.excluded_paths = [s for p in excluded_paths if (s := p.strip())]
self.treat_sharing_link_as_public = treat_sharing_link_as_public
self.site_descriptors: list[SiteDescriptor] = self._extract_site_and_drive_info(
sites
)
@@ -1233,6 +1286,29 @@ class SharepointConnector(
break
sites = sites._get_next().execute_query()
def _is_driveitem_excluded(self, driveitem: DriveItemData) -> bool:
"""Check if a drive item should be excluded based on excluded_paths patterns."""
if not self.excluded_paths:
return False
relative_path = _build_item_relative_path(
driveitem.parent_reference_path, driveitem.name
)
return _is_path_excluded(relative_path, self.excluded_paths)
def _filter_excluded_sites(
self, site_descriptors: list[SiteDescriptor]
) -> list[SiteDescriptor]:
"""Remove sites matching any excluded_sites glob pattern."""
if not self.excluded_sites:
return site_descriptors
result = []
for sd in site_descriptors:
if _is_site_excluded(sd.url, self.excluded_sites):
logger.info(f"Excluding site by denylist: {sd.url}")
continue
result.append(sd)
return result
def fetch_sites(self) -> list[SiteDescriptor]:
sites = self.graph_client.sites.get_all_sites().execute_query()
@@ -1249,7 +1325,7 @@ class SharepointConnector(
for site in self._handle_paginated_sites(sites)
if "-my.sharepoint" not in site.web_url
]
return site_descriptors
return self._filter_excluded_sites(site_descriptors)
def _fetch_site_pages(
self,
@@ -1689,8 +1765,14 @@ class SharepointConnector(
checkpoint.current_drive_delta_next_link = None
checkpoint.seen_document_ids.clear()
def _fetch_slim_documents_from_sharepoint(self) -> GenerateSlimDocumentOutput:
site_descriptors = self.site_descriptors or self.fetch_sites()
def _fetch_slim_documents_from_sharepoint(
self,
start: datetime | None = None,
end: datetime | None = None,
) -> GenerateSlimDocumentOutput:
site_descriptors = self._filter_excluded_sites(
self.site_descriptors or self.fetch_sites()
)
# Create a temporary checkpoint for hierarchy node tracking
temp_checkpoint = SharepointConnectorCheckpoint(has_more=True)
@@ -1708,8 +1790,14 @@ class SharepointConnector(
# Process site documents if flag is True
if self.include_site_documents:
for driveitem, drive_name, drive_web_url in self._fetch_driveitems(
site_descriptor=site_descriptor
site_descriptor=site_descriptor,
start=start,
end=end,
):
if self._is_driveitem_excluded(driveitem):
logger.debug(f"Excluding by path denylist: {driveitem.web_url}")
continue
if drive_web_url:
doc_batch.extend(
self._yield_drive_hierarchy_node(
@@ -1747,6 +1835,7 @@ class SharepointConnector(
ctx,
self.graph_client,
parent_hierarchy_raw_node_id=parent_hierarchy_url,
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
)
)
except Exception as e:
@@ -1758,7 +1847,9 @@ class SharepointConnector(
# Process site pages if flag is True
if self.include_site_pages:
site_pages = self._fetch_site_pages(site_descriptor)
site_pages = self._fetch_site_pages(
site_descriptor, start=start, end=end
)
for site_page in site_pages:
logger.debug(
f"Processing site page: {site_page.get('webUrl', site_page.get('name', 'Unknown'))}"
@@ -1770,6 +1861,7 @@ class SharepointConnector(
ctx,
self.graph_client,
parent_hierarchy_raw_node_id=site_descriptor.url,
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
)
)
if len(doc_batch) >= SLIM_BATCH_SIZE:
@@ -2043,7 +2135,9 @@ class SharepointConnector(
and not checkpoint.process_site_pages
):
logger.info("Initializing SharePoint sites for processing")
site_descs = self.site_descriptors or self.fetch_sites()
site_descs = self._filter_excluded_sites(
self.site_descriptors or self.fetch_sites()
)
checkpoint.cached_site_descriptors = deque(site_descs)
if not checkpoint.cached_site_descriptors:
@@ -2264,6 +2358,10 @@ class SharepointConnector(
for driveitem in driveitems:
item_count += 1
if self._is_driveitem_excluded(driveitem):
logger.debug(f"Excluding by path denylist: {driveitem.web_url}")
continue
if driveitem.id and driveitem.id in checkpoint.seen_document_ids:
logger.debug(
f"Skipping duplicate document {driveitem.id} ({driveitem.name})"
@@ -2318,6 +2416,7 @@ class SharepointConnector(
parent_hierarchy_raw_node_id=parent_hierarchy_url,
graph_api_base=self.graph_api_base,
access_token=access_token,
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
)
if isinstance(doc_or_failure, Document):
@@ -2398,6 +2497,7 @@ class SharepointConnector(
include_permissions=include_permissions,
# Site pages have the site as their parent
parent_hierarchy_raw_node_id=site_descriptor.url,
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
)
)
logger.info(
@@ -2473,12 +2573,22 @@ class SharepointConnector(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
end: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None, # noqa: ARG002
) -> GenerateSlimDocumentOutput:
yield from self._fetch_slim_documents_from_sharepoint()
start_dt = (
datetime.fromtimestamp(start, tz=timezone.utc)
if start is not None
else None
)
end_dt = (
datetime.fromtimestamp(end, tz=timezone.utc) if end is not None else None
)
yield from self._fetch_slim_documents_from_sharepoint(
start=start_dt,
end=end_dt,
)
if __name__ == "__main__":

View File

@@ -17,6 +17,7 @@ def get_sharepoint_external_access(
drive_name: str | None = None,
site_page: dict[str, Any] | None = None,
add_prefix: bool = False,
treat_sharing_link_as_public: bool = False,
) -> ExternalAccess:
if drive_item and drive_item.id is None:
raise ValueError("DriveItem ID is required")
@@ -34,7 +35,13 @@ def get_sharepoint_external_access(
)
external_access = get_external_access_func(
ctx, graph_client, drive_name, drive_item, site_page, add_prefix
ctx,
graph_client,
drive_name,
drive_item,
site_page,
add_prefix,
treat_sharing_link_as_public,
)
return external_access

View File

@@ -516,6 +516,8 @@ def _get_all_doc_ids(
] = default_msg_filter,
callback: IndexingHeartbeatInterface | None = None,
workspace_url: str | None = None,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> GenerateSlimDocumentOutput:
"""
Get all document ids in the workspace, channel by channel
@@ -546,6 +548,8 @@ def _get_all_doc_ids(
client=client,
channel=channel,
callback=callback,
oldest=str(start) if start else None, # 0.0 -> None intentionally
latest=str(end) if end is not None else None,
)
for message_batch in channel_message_batches:
@@ -847,8 +851,8 @@ class SlackConnector(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
end: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
if self.client is None:
@@ -861,6 +865,8 @@ class SlackConnector(
msg_filter_func=self.msg_filter_func,
callback=callback,
workspace_url=self._workspace_url,
start=start,
end=end,
)
def _load_from_checkpoint(

View File

@@ -88,8 +88,9 @@ WEB_CONNECTOR_MAX_SCROLL_ATTEMPTS = 20
IFRAME_TEXT_LENGTH_THRESHOLD = 700
# Message indicating JavaScript is disabled, which often appears when scraping fails
JAVASCRIPT_DISABLED_MESSAGE = "You have JavaScript disabled in your browser"
# Grace period after page navigation to allow bot-detection challenges to complete
BOT_DETECTION_GRACE_PERIOD_MS = 5000
# Grace period after page navigation to allow bot-detection challenges
# and SPA content rendering to complete
PAGE_RENDER_TIMEOUT_MS = 5000
# Define common headers that mimic a real browser
DEFAULT_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
@@ -547,7 +548,15 @@ class WebConnector(LoadConnector):
)
# Give the page a moment to start rendering after navigation commits.
# Allows CloudFlare and other bot-detection challenges to complete.
page.wait_for_timeout(BOT_DETECTION_GRACE_PERIOD_MS)
page.wait_for_timeout(PAGE_RENDER_TIMEOUT_MS)
# Wait for network activity to settle so SPAs that fetch content
# asynchronously after the initial JS bundle have time to render.
try:
# A bit of extra time to account for long-polling, websockets, etc.
page.wait_for_load_state("networkidle", timeout=PAGE_RENDER_TIMEOUT_MS)
except TimeoutError:
pass
last_modified = (
page_response.header_value("Last-Modified") if page_response else None
@@ -576,7 +585,7 @@ class WebConnector(LoadConnector):
# (e.g., CloudFlare protection keeps making requests)
try:
page.wait_for_load_state(
"networkidle", timeout=BOT_DETECTION_GRACE_PERIOD_MS
"networkidle", timeout=PAGE_RENDER_TIMEOUT_MS
)
except TimeoutError:
# If networkidle times out, just give it a moment for content to render

View File

@@ -2,7 +2,6 @@ from collections.abc import Sequence
from datetime import datetime
from enum import Enum
from typing import Any
from uuid import UUID
from pydantic import BaseModel
from pydantic import Field
@@ -70,9 +69,13 @@ class BaseFilters(BaseModel):
class UserFileFilters(BaseModel):
user_file_ids: list[UUID] | None = None
project_id: int | None = None
persona_id: int | None = None
# Scopes search to user files tagged with a given project/persona in Vespa.
# These are NOT simply the IDs of the current project or persona — they are
# only set when the persona's/project's user files overflowed the LLM
# context window and must be searched via vector DB instead of being loaded
# directly into the prompt.
project_id_filter: int | None = None
persona_id_filter: int | None = None
class AssistantKnowledgeFilters(BaseModel):
@@ -398,3 +401,16 @@ class SavedSearchDocWithContent(SavedSearchDoc):
section in addition to the match_highlights."""
content: str
class PersonaSearchInfo(BaseModel):
"""Snapshot of persona data needed by the search pipeline.
Extracted from the ORM Persona before the DB session is released so that
SearchTool and search_pipeline never lazy-load relationships post-commit.
"""
document_set_names: list[str]
search_start_date: datetime | None
attached_document_ids: list[str]
hierarchy_node_ids: list[int]

View File

@@ -1,6 +1,5 @@
from collections import defaultdict
from datetime import datetime
from uuid import UUID
from sqlalchemy.orm import Session
@@ -10,12 +9,12 @@ from onyx.context.search.models import ChunkSearchRequest
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import InferenceChunk
from onyx.context.search.models import InferenceSection
from onyx.context.search.models import PersonaSearchInfo
from onyx.context.search.preprocessing.access_filters import (
build_access_filters_for_user,
)
from onyx.context.search.retrieval.search_runner import search_chunks
from onyx.context.search.utils import inference_section_from_chunks
from onyx.db.models import Persona
from onyx.db.models import User
from onyx.document_index.interfaces import DocumentIndex
from onyx.federated_connectors.federated_retrieval import FederatedRetrievalInfo
@@ -39,9 +38,8 @@ logger = setup_logger()
def _build_index_filters(
user_provided_filters: BaseFilters | None,
user: User, # Used for ACLs, anonymous users only see public docs
project_id: int | None,
persona_id: int | None,
user_file_ids: list[UUID] | None,
project_id_filter: int | None,
persona_id_filter: int | None,
persona_document_sets: list[str] | None,
persona_time_cutoff: datetime | None,
db_session: Session | None = None,
@@ -97,16 +95,6 @@ def _build_index_filters(
if not source_filter and detected_source_filter:
source_filter = detected_source_filter
# CRITICAL FIX: If user_file_ids are present, we must ensure "user_file"
# source type is included in the filter, otherwise user files will be excluded!
if user_file_ids and source_filter:
from onyx.configs.constants import DocumentSource
# Add user_file to the source filter if not already present
if DocumentSource.USER_FILE not in source_filter:
source_filter = list(source_filter) + [DocumentSource.USER_FILE]
logger.debug("Added USER_FILE to source_filter for user knowledge search")
if bypass_acl:
user_acl_filters = None
elif acl_filters is not None:
@@ -117,9 +105,8 @@ def _build_index_filters(
user_acl_filters = build_access_filters_for_user(user, db_session)
final_filters = IndexFilters(
user_file_ids=user_file_ids,
project_id=project_id,
persona_id=persona_id,
project_id_filter=project_id_filter,
persona_id_filter=persona_id_filter,
source_type=source_filter,
document_set=document_set_filter,
time_cutoff=time_filter,
@@ -260,51 +247,41 @@ def search_pipeline(
document_index: DocumentIndex,
# Used for ACLs and federated search, anonymous users only see public docs
user: User,
# Used for default filters and settings
persona: Persona | None,
# Pre-extracted persona search configuration (None when no persona)
persona_search_info: PersonaSearchInfo | None,
db_session: Session | None = None,
auto_detect_filters: bool = False,
llm: LLM | None = None,
# If a project ID is provided, it will be exclusively scoped to that project
project_id: int | None = None,
# If a persona_id is provided, search scopes to files attached to this persona
persona_id: int | None = None,
# Vespa metadata filters for overflowing user files. NOT the raw IDs
# of the current project/persona — only set when user files couldn't fit
# in the LLM context and need to be searched via vector DB.
project_id_filter: int | None = None,
persona_id_filter: int | None = None,
# Pre-fetched data — when provided, avoids DB queries (no session needed)
acl_filters: list[str] | None = None,
embedding_model: EmbeddingModel | None = None,
prefetched_federated_retrieval_infos: list[FederatedRetrievalInfo] | None = None,
) -> list[InferenceChunk]:
user_uploaded_persona_files: list[UUID] | None = (
[user_file.id for user_file in persona.user_files] if persona else None
)
persona_document_sets: list[str] | None = (
[persona_document_set.name for persona_document_set in persona.document_sets]
if persona
else None
persona_search_info.document_set_names if persona_search_info else None
)
persona_time_cutoff: datetime | None = (
persona.search_start_date if persona else None
persona_search_info.search_start_date if persona_search_info else None
)
# Extract assistant knowledge filters from persona
attached_document_ids: list[str] | None = (
[doc.id for doc in persona.attached_documents]
if persona and persona.attached_documents
persona_search_info.attached_document_ids or None
if persona_search_info
else None
)
hierarchy_node_ids: list[int] | None = (
[node.id for node in persona.hierarchy_nodes]
if persona and persona.hierarchy_nodes
else None
persona_search_info.hierarchy_node_ids or None if persona_search_info else None
)
filters = _build_index_filters(
user_provided_filters=chunk_search_request.user_selected_filters,
user=user,
project_id=project_id,
persona_id=persona_id,
user_file_ids=user_uploaded_persona_files,
project_id_filter=project_id_filter,
persona_id_filter=persona_id_filter,
persona_document_sets=persona_document_sets,
persona_time_cutoff=persona_time_cutoff,
db_session=db_session,

View File

@@ -14,6 +14,10 @@ from onyx.context.search.utils import get_query_embedding
from onyx.context.search.utils import inference_section_from_chunks
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.interfaces import VespaChunkRequest
from onyx.document_index.interfaces_new import DocumentIndex as NewDocumentIndex
from onyx.document_index.opensearch.opensearch_document_index import (
OpenSearchOldDocumentIndex,
)
from onyx.federated_connectors.federated_retrieval import FederatedRetrievalInfo
from onyx.federated_connectors.federated_retrieval import (
get_federated_retrieval_functions,
@@ -49,7 +53,7 @@ def combine_retrieval_results(
return sorted_chunks
def _embed_and_search(
def _embed_and_hybrid_search(
query_request: ChunkIndexRequest,
document_index: DocumentIndex,
db_session: Session | None = None,
@@ -81,6 +85,17 @@ def _embed_and_search(
return top_chunks
def _keyword_search(
query_request: ChunkIndexRequest,
document_index: NewDocumentIndex,
) -> list[InferenceChunk]:
return document_index.keyword_retrieval(
query=query_request.query,
filters=query_request.filters,
num_to_retrieve=query_request.limit or NUM_RETURNED_HITS,
)
def search_chunks(
query_request: ChunkIndexRequest,
user_id: UUID | None,
@@ -110,7 +125,6 @@ def search_chunks(
user_id=user_id,
source_types=list(source_filters) if source_filters else None,
document_set_names=query_request.filters.document_set,
user_file_ids=query_request.filters.user_file_ids,
)
federated_sources = set(
@@ -129,21 +143,38 @@ def search_chunks(
)
if normal_search_enabled:
run_queries.append(
(
_embed_and_search,
(query_request, document_index, db_session, embedding_model),
if (
query_request.hybrid_alpha is not None
and query_request.hybrid_alpha == 0.0
and isinstance(document_index, OpenSearchOldDocumentIndex)
):
# If hybrid alpha is explicitly set to keyword only, do pure keyword
# search without generating an embedding. This is currently only
# supported with OpenSearchDocumentIndex.
opensearch_new_document_index: NewDocumentIndex = document_index._real_index
run_queries.append(
(
lambda: _keyword_search(
query_request, opensearch_new_document_index
),
(),
)
)
else:
run_queries.append(
(
_embed_and_hybrid_search,
(query_request, document_index, db_session, embedding_model),
)
)
)
parallel_search_results = run_functions_tuples_in_parallel(run_queries)
top_chunks = combine_retrieval_results(parallel_search_results)
if not top_chunks:
logger.debug(
f"Hybrid search returned no results for query: {query_request.query}with filters: {query_request.filters}"
f"Search returned no results for query: {query_request.query} with filters: {query_request.filters}."
)
return []
return top_chunks

View File

@@ -16,6 +16,7 @@ from sqlalchemy import Row
from sqlalchemy import select
from sqlalchemy import update
from sqlalchemy.exc import MultipleResultsFound
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
@@ -28,6 +29,7 @@ from onyx.db.models import ChatMessage
from onyx.db.models import ChatMessage__SearchDoc
from onyx.db.models import ChatSession
from onyx.db.models import ChatSessionSharedStatus
from onyx.db.models import Persona
from onyx.db.models import SearchDoc as DBSearchDoc
from onyx.db.models import ToolCall
from onyx.db.models import User
@@ -53,9 +55,22 @@ def get_chat_session_by_id(
db_session: Session,
include_deleted: bool = False,
is_shared: bool = False,
eager_load_persona: bool = False,
) -> ChatSession:
stmt = select(ChatSession).where(ChatSession.id == chat_session_id)
if eager_load_persona:
stmt = stmt.options(
joinedload(ChatSession.persona).options(
selectinload(Persona.tools),
selectinload(Persona.user_files),
selectinload(Persona.document_sets),
selectinload(Persona.attached_documents),
selectinload(Persona.hierarchy_nodes),
),
joinedload(ChatSession.project),
)
if is_shared:
stmt = stmt.where(ChatSession.shared_status == ChatSessionSharedStatus.PUBLIC)
else:
@@ -602,6 +617,92 @@ def reserve_message_id(
return empty_message
def reserve_multi_model_message_ids(
db_session: Session,
chat_session_id: UUID,
parent_message_id: int,
model_display_names: list[str],
) -> list[ChatMessage]:
"""Reserve N assistant message placeholders for multi-model parallel streaming.
All messages share the same parent (the user message). The parent's
latest_child_message_id points to the LAST reserved message so that the
default history-chain walker picks it up.
"""
reserved: list[ChatMessage] = []
for display_name in model_display_names:
msg = ChatMessage(
chat_session_id=chat_session_id,
parent_message_id=parent_message_id,
latest_child_message_id=None,
message="Response was terminated prior to completion, try regenerating.",
token_count=15, # placeholder; updated on completion by llm_loop_completion_handle
message_type=MessageType.ASSISTANT,
model_display_name=display_name,
)
db_session.add(msg)
reserved.append(msg)
# Flush to assign IDs without committing yet
db_session.flush()
# Point parent's latest_child to the last reserved message
parent = (
db_session.query(ChatMessage)
.filter(ChatMessage.id == parent_message_id)
.first()
)
if parent:
parent.latest_child_message_id = reserved[-1].id
db_session.commit()
return reserved
def set_preferred_response(
db_session: Session,
user_message_id: int,
preferred_assistant_message_id: int,
) -> None:
"""Mark one assistant response as the user's preferred choice in a multi-model turn.
Also advances ``latest_child_message_id`` so the preferred response becomes
the active branch for any subsequent messages in the conversation.
Args:
db_session: Active database session.
user_message_id: Primary key of the ``USER``-type ``ChatMessage`` whose
preferred response is being set.
preferred_assistant_message_id: Primary key of the ``ASSISTANT``-type
``ChatMessage`` to prefer. Must be a direct child of ``user_message_id``.
Raises:
ValueError: If either message is not found, if ``user_message_id`` does not
refer to a USER message, or if the assistant message is not a direct child
of the user message.
"""
user_msg = db_session.get(ChatMessage, user_message_id)
if user_msg is None:
raise ValueError(f"User message {user_message_id} not found")
if user_msg.message_type != MessageType.USER:
raise ValueError(f"Message {user_message_id} is not a user message")
assistant_msg = db_session.get(ChatMessage, preferred_assistant_message_id)
if assistant_msg is None:
raise ValueError(
f"Assistant message {preferred_assistant_message_id} not found"
)
if assistant_msg.parent_message_id != user_message_id:
raise ValueError(
f"Assistant message {preferred_assistant_message_id} is not a child "
f"of user message {user_message_id}"
)
user_msg.preferred_response_id = preferred_assistant_message_id
user_msg.latest_child_message_id = preferred_assistant_message_id
db_session.commit()
def create_new_chat_message(
chat_session_id: UUID,
parent_message: ChatMessage,
@@ -824,6 +925,8 @@ def translate_db_message_to_chat_message_detail(
error=chat_message.error,
current_feedback=current_feedback,
processing_duration_seconds=chat_message.processing_duration_seconds,
preferred_response_id=chat_message.preferred_response_id,
model_display_name=chat_message.model_display_name,
)
return chat_msg_detail

View File

@@ -511,7 +511,7 @@ def add_credential_to_connector(
user: User,
connector_id: int,
credential_id: int,
cc_pair_name: str | None,
cc_pair_name: str,
access_type: AccessType,
groups: list[int] | None,
auto_sync_options: dict | None = None,
@@ -750,3 +750,31 @@ def resync_cc_pair(
)
db_session.commit()
# ── Metrics query helpers ──────────────────────────────────────────────
def get_connector_health_for_metrics(
db_session: Session,
) -> list: # Returns list of Row tuples
"""Return connector health data for Prometheus metrics.
Each row is (cc_pair_id, status, in_repeated_error_state,
last_successful_index_time, name, source).
"""
return (
db_session.query(
ConnectorCredentialPair.id,
ConnectorCredentialPair.status,
ConnectorCredentialPair.in_repeated_error_state,
ConnectorCredentialPair.last_successful_index_time,
ConnectorCredentialPair.name,
Connector.source,
)
.join(
Connector,
ConnectorCredentialPair.connector_id == Connector.id,
)
.all()
)

View File

@@ -1,4 +1,31 @@
from __future__ import annotations
from enum import Enum as PyEnum
from typing import ClassVar
class AccountType(str, PyEnum):
"""
What kind of account this is — determines whether the user
enters the group-based permission system.
STANDARD + SERVICE_ACCOUNT → participate in group system
BOT, EXT_PERM_USER, ANONYMOUS → fixed behavior
"""
STANDARD = "standard"
BOT = "bot"
EXT_PERM_USER = "ext_perm_user"
SERVICE_ACCOUNT = "service_account"
ANONYMOUS = "anonymous"
class GrantSource(str, PyEnum):
"""How a permission grant was created."""
USER = "user"
SCIM = "scim"
SYSTEM = "system"
class IndexingStatus(str, PyEnum):
@@ -304,3 +331,64 @@ class LLMModelFlowType(str, PyEnum):
CHAT = "chat"
VISION = "vision"
CONTEXTUAL_RAG = "contextual_rag"
class HookPoint(str, PyEnum):
DOCUMENT_INGESTION = "document_ingestion"
QUERY_PROCESSING = "query_processing"
class HookFailStrategy(str, PyEnum):
HARD = "hard" # exception propagates, pipeline aborts
SOFT = "soft" # log error, return original input, pipeline continues
class Permission(str, PyEnum):
"""
Permission tokens for group-based authorization.
19 tokens total. full_admin_panel_access is an override —
if present, any permission check passes.
"""
# Basic (auto-granted to every new group)
BASIC_ACCESS = "basic"
# Read tokens — implied only, never granted directly
READ_CONNECTORS = "read:connectors"
READ_DOCUMENT_SETS = "read:document_sets"
READ_AGENTS = "read:agents"
READ_USERS = "read:users"
# Add / Manage pairs
ADD_AGENTS = "add:agents"
MANAGE_AGENTS = "manage:agents"
MANAGE_DOCUMENT_SETS = "manage:document_sets"
ADD_CONNECTORS = "add:connectors"
MANAGE_CONNECTORS = "manage:connectors"
MANAGE_LLMS = "manage:llms"
# Toggle tokens
READ_AGENT_ANALYTICS = "read:agent_analytics"
MANAGE_ACTIONS = "manage:actions"
READ_QUERY_HISTORY = "read:query_history"
MANAGE_USER_GROUPS = "manage:user_groups"
CREATE_USER_API_KEYS = "create:user_api_keys"
CREATE_SERVICE_ACCOUNT_API_KEYS = "create:service_account_api_keys"
CREATE_SLACK_DISCORD_BOTS = "create:slack_discord_bots"
# Override — any permission check passes
FULL_ADMIN_PANEL_ACCESS = "admin"
# Permissions that are implied by other grants and must never be stored
# directly in the permission_grant table.
IMPLIED: ClassVar[frozenset[Permission]]
Permission.IMPLIED = frozenset(
{
Permission.READ_CONNECTORS,
Permission.READ_DOCUMENT_SETS,
Permission.READ_AGENTS,
Permission.READ_USERS,
}
)

235
backend/onyx/db/hook.py Normal file
View File

@@ -0,0 +1,235 @@
import datetime
from uuid import UUID
from sqlalchemy import delete
from sqlalchemy import select
from sqlalchemy.engine import CursorResult
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
from onyx.db.constants import UNSET
from onyx.db.constants import UnsetType
from onyx.db.enums import HookFailStrategy
from onyx.db.enums import HookPoint
from onyx.db.models import Hook
from onyx.db.models import HookExecutionLog
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
# ── Hook CRUD ────────────────────────────────────────────────────────────
def get_hook_by_id(
*,
db_session: Session,
hook_id: int,
include_deleted: bool = False,
include_creator: bool = False,
) -> Hook | None:
stmt = select(Hook).where(Hook.id == hook_id)
if not include_deleted:
stmt = stmt.where(Hook.deleted.is_(False))
if include_creator:
stmt = stmt.options(selectinload(Hook.creator))
return db_session.scalar(stmt)
def get_non_deleted_hook_by_hook_point(
*,
db_session: Session,
hook_point: HookPoint,
include_creator: bool = False,
) -> Hook | None:
stmt = (
select(Hook).where(Hook.hook_point == hook_point).where(Hook.deleted.is_(False))
)
if include_creator:
stmt = stmt.options(selectinload(Hook.creator))
return db_session.scalar(stmt)
def get_hooks(
*,
db_session: Session,
include_deleted: bool = False,
include_creator: bool = False,
) -> list[Hook]:
stmt = select(Hook)
if not include_deleted:
stmt = stmt.where(Hook.deleted.is_(False))
if include_creator:
stmt = stmt.options(selectinload(Hook.creator))
stmt = stmt.order_by(Hook.hook_point, Hook.created_at.desc())
return list(db_session.scalars(stmt).all())
def create_hook__no_commit(
*,
db_session: Session,
name: str,
hook_point: HookPoint,
endpoint_url: str | None = None,
api_key: str | None = None,
fail_strategy: HookFailStrategy,
timeout_seconds: float,
is_active: bool = False,
is_reachable: bool | None = None,
creator_id: UUID | None = None,
) -> Hook:
"""Create a new hook for the given hook point.
At most one non-deleted hook per hook point is allowed. Raises
OnyxError(CONFLICT) if a hook already exists, including under concurrent
duplicate creates where the partial unique index fires an IntegrityError.
"""
existing = get_non_deleted_hook_by_hook_point(
db_session=db_session, hook_point=hook_point
)
if existing:
raise OnyxError(
OnyxErrorCode.CONFLICT,
f"A hook for '{hook_point.value}' already exists (id={existing.id}).",
)
hook = Hook(
name=name,
hook_point=hook_point,
endpoint_url=endpoint_url,
api_key=api_key,
fail_strategy=fail_strategy,
timeout_seconds=timeout_seconds,
is_active=is_active,
is_reachable=is_reachable,
creator_id=creator_id,
)
# Use a savepoint so that a failed insert only rolls back this operation,
# not the entire outer transaction.
savepoint = db_session.begin_nested()
try:
db_session.add(hook)
savepoint.commit()
except IntegrityError as exc:
savepoint.rollback()
if "ix_hook_one_non_deleted_per_point" in str(exc.orig):
raise OnyxError(
OnyxErrorCode.CONFLICT,
f"A hook for '{hook_point.value}' already exists.",
)
raise # re-raise unrelated integrity errors (FK violations, etc.)
return hook
def update_hook__no_commit(
*,
db_session: Session,
hook_id: int,
name: str | None = None,
endpoint_url: str | None | UnsetType = UNSET,
api_key: str | None | UnsetType = UNSET,
fail_strategy: HookFailStrategy | None = None,
timeout_seconds: float | None = None,
is_active: bool | None = None,
is_reachable: bool | None = None,
include_creator: bool = False,
) -> Hook:
"""Update hook fields.
Sentinel conventions:
- endpoint_url, api_key: pass UNSET to leave unchanged; pass None to clear.
- name, fail_strategy, timeout_seconds, is_active, is_reachable: pass None to leave unchanged.
"""
hook = get_hook_by_id(
db_session=db_session, hook_id=hook_id, include_creator=include_creator
)
if hook is None:
raise OnyxError(OnyxErrorCode.NOT_FOUND, f"Hook with id {hook_id} not found.")
if name is not None:
hook.name = name
if not isinstance(endpoint_url, UnsetType):
hook.endpoint_url = endpoint_url
if not isinstance(api_key, UnsetType):
hook.api_key = api_key # type: ignore[assignment] # EncryptedString coerces str → SensitiveValue at the ORM level
if fail_strategy is not None:
hook.fail_strategy = fail_strategy
if timeout_seconds is not None:
hook.timeout_seconds = timeout_seconds
if is_active is not None:
hook.is_active = is_active
if is_reachable is not None:
hook.is_reachable = is_reachable
db_session.flush()
return hook
def delete_hook__no_commit(
*,
db_session: Session,
hook_id: int,
) -> None:
hook = get_hook_by_id(db_session=db_session, hook_id=hook_id)
if hook is None:
raise OnyxError(OnyxErrorCode.NOT_FOUND, f"Hook with id {hook_id} not found.")
hook.deleted = True
hook.is_active = False
db_session.flush()
# ── HookExecutionLog CRUD ────────────────────────────────────────────────
def create_hook_execution_log__no_commit(
*,
db_session: Session,
hook_id: int,
is_success: bool,
error_message: str | None = None,
status_code: int | None = None,
duration_ms: int | None = None,
) -> HookExecutionLog:
log = HookExecutionLog(
hook_id=hook_id,
is_success=is_success,
error_message=error_message,
status_code=status_code,
duration_ms=duration_ms,
)
db_session.add(log)
db_session.flush()
return log
def get_hook_execution_logs(
*,
db_session: Session,
hook_id: int,
limit: int,
) -> list[HookExecutionLog]:
stmt = (
select(HookExecutionLog)
.where(HookExecutionLog.hook_id == hook_id)
.order_by(HookExecutionLog.created_at.desc())
.limit(limit)
)
return list(db_session.scalars(stmt).all())
def cleanup_old_execution_logs__no_commit(
*,
db_session: Session,
max_age_days: int,
) -> int:
"""Delete execution logs older than max_age_days. Returns the number of rows deleted."""
cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
days=max_age_days
)
result: CursorResult = db_session.execute( # type: ignore[assignment]
delete(HookExecutionLog)
.where(HookExecutionLog.created_at < cutoff)
.execution_options(synchronize_session=False)
)
return result.rowcount

View File

@@ -2,6 +2,8 @@ from collections.abc import Sequence
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import NamedTuple
from typing import TYPE_CHECKING
from typing import TypeVarTuple
from sqlalchemy import and_
@@ -28,6 +30,9 @@ from onyx.utils.logger import setup_logger
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType
if TYPE_CHECKING:
from onyx.configs.constants import DocumentSource
# from sqlalchemy.sql.selectable import Select
# Comment out unused imports that cause mypy errors
@@ -583,6 +588,67 @@ def get_latest_index_attempt_for_cc_pair_id(
return db_session.execute(stmt).scalar_one_or_none()
def get_latest_successful_index_attempt_for_cc_pair_id(
db_session: Session,
connector_credential_pair_id: int,
secondary_index: bool = False,
) -> IndexAttempt | None:
"""Returns the most recent successful index attempt for the given cc pair,
filtered to the current (or future) search settings.
Uses MAX(id) semantics to match get_latest_index_attempts_by_status."""
status = IndexModelStatus.FUTURE if secondary_index else IndexModelStatus.PRESENT
stmt = (
select(IndexAttempt)
.where(
IndexAttempt.connector_credential_pair_id == connector_credential_pair_id,
IndexAttempt.status.in_(
[IndexingStatus.SUCCESS, IndexingStatus.COMPLETED_WITH_ERRORS]
),
)
.join(SearchSettings)
.where(SearchSettings.status == status)
.order_by(desc(IndexAttempt.id))
.limit(1)
)
return db_session.execute(stmt).scalar_one_or_none()
def get_latest_successful_index_attempts_parallel(
secondary_index: bool = False,
) -> Sequence[IndexAttempt]:
"""Batch version: returns the latest successful index attempt per cc pair.
Covers both SUCCESS and COMPLETED_WITH_ERRORS (matching is_successful())."""
model_status = (
IndexModelStatus.FUTURE if secondary_index else IndexModelStatus.PRESENT
)
with get_session_with_current_tenant() as db_session:
latest_ids = (
select(
IndexAttempt.connector_credential_pair_id,
func.max(IndexAttempt.id).label("max_id"),
)
.join(SearchSettings, IndexAttempt.search_settings_id == SearchSettings.id)
.where(
SearchSettings.status == model_status,
IndexAttempt.status.in_(
[IndexingStatus.SUCCESS, IndexingStatus.COMPLETED_WITH_ERRORS]
),
)
.group_by(IndexAttempt.connector_credential_pair_id)
.subquery()
)
stmt = select(IndexAttempt).join(
latest_ids,
(
IndexAttempt.connector_credential_pair_id
== latest_ids.c.connector_credential_pair_id
)
& (IndexAttempt.id == latest_ids.c.max_id),
)
return db_session.execute(stmt).scalars().all()
def count_index_attempts_for_cc_pair(
db_session: Session,
cc_pair_id: int,
@@ -911,3 +977,106 @@ def get_index_attempt_errors_for_cc_pair(
stmt = stmt.offset(page * page_size).limit(page_size)
return list(db_session.scalars(stmt).all())
# ── Metrics query helpers ──────────────────────────────────────────────
class ActiveIndexAttemptMetric(NamedTuple):
"""Row returned by get_active_index_attempts_for_metrics."""
status: IndexingStatus
source: "DocumentSource"
cc_pair_id: int
cc_pair_name: str | None
attempt_count: int
def get_active_index_attempts_for_metrics(
db_session: Session,
) -> list[ActiveIndexAttemptMetric]:
"""Return non-terminal index attempts grouped by status, source, and connector.
Each row is (status, source, cc_pair_id, cc_pair_name, attempt_count).
"""
from onyx.db.models import Connector
terminal_statuses = [s for s in IndexingStatus if s.is_terminal()]
rows = (
db_session.query(
IndexAttempt.status,
Connector.source,
ConnectorCredentialPair.id,
ConnectorCredentialPair.name,
func.count(),
)
.join(
ConnectorCredentialPair,
IndexAttempt.connector_credential_pair_id == ConnectorCredentialPair.id,
)
.join(
Connector,
ConnectorCredentialPair.connector_id == Connector.id,
)
.filter(IndexAttempt.status.notin_(terminal_statuses))
.group_by(
IndexAttempt.status,
Connector.source,
ConnectorCredentialPair.id,
ConnectorCredentialPair.name,
)
.all()
)
return [ActiveIndexAttemptMetric(*row) for row in rows]
def get_failed_attempt_counts_by_cc_pair(
db_session: Session,
since: datetime | None = None,
) -> dict[int, int]:
"""Return {cc_pair_id: failed_attempt_count} for all connectors.
When ``since`` is provided, only attempts created after that timestamp
are counted. Defaults to the last 90 days to avoid unbounded historical
aggregation.
"""
if since is None:
since = datetime.now(timezone.utc) - timedelta(days=90)
rows = (
db_session.query(
IndexAttempt.connector_credential_pair_id,
func.count(),
)
.filter(IndexAttempt.status == IndexingStatus.FAILED)
.filter(IndexAttempt.time_created >= since)
.group_by(IndexAttempt.connector_credential_pair_id)
.all()
)
return {cc_id: count for cc_id, count in rows}
def get_docs_indexed_by_cc_pair(
db_session: Session,
since: datetime | None = None,
) -> dict[int, int]:
"""Return {cc_pair_id: total_new_docs_indexed} across successful attempts.
Only counts attempts with status SUCCESS to avoid inflating counts with
partial results from failed attempts. When ``since`` is provided, only
attempts created after that timestamp are included.
"""
if since is None:
since = datetime.now(timezone.utc) - timedelta(days=90)
query = (
db_session.query(
IndexAttempt.connector_credential_pair_id,
func.sum(func.coalesce(IndexAttempt.new_docs_indexed, 0)),
)
.filter(IndexAttempt.status == IndexingStatus.SUCCESS)
.filter(IndexAttempt.time_created >= since)
.group_by(IndexAttempt.connector_credential_pair_id)
)
rows = query.all()
return {cc_id: int(total or 0) for cc_id, total in rows}

View File

@@ -48,6 +48,7 @@ from sqlalchemy.types import LargeBinary
from sqlalchemy.types import TypeDecorator
from sqlalchemy import PrimaryKeyConstraint
from onyx.db.enums import AccountType
from onyx.auth.schemas import UserRole
from onyx.configs.constants import (
ANONYMOUS_USER_UUID,
@@ -64,6 +65,8 @@ from onyx.db.enums import (
BuildSessionStatus,
EmbeddingPrecision,
HierarchyNodeType,
HookFailStrategy,
HookPoint,
IndexingMode,
OpenSearchDocumentMigrationStatus,
OpenSearchTenantMigrationStatus,
@@ -76,6 +79,8 @@ from onyx.db.enums import (
MCPAuthenticationPerformer,
MCPTransport,
MCPServerStatus,
Permission,
GrantSource,
LLMModelFlowType,
ThemePreference,
DefaultAppMode,
@@ -300,6 +305,9 @@ class User(SQLAlchemyBaseUserTableUUID, Base):
role: Mapped[UserRole] = mapped_column(
Enum(UserRole, native_enum=False, default=UserRole.BASIC)
)
account_type: Mapped[AccountType | None] = mapped_column(
Enum(AccountType, native_enum=False), nullable=True
)
"""
Preferences probably should be in a separate table at some point, but for now
@@ -2643,6 +2651,15 @@ class ChatMessage(Base):
nullable=True,
)
# For multi-model turns: the user message points to which assistant response
# was selected as the preferred one to continue the conversation with.
preferred_response_id: Mapped[int | None] = mapped_column(
ForeignKey("chat_message.id", ondelete="SET NULL"), nullable=True
)
# The display name of the model that generated this assistant message
model_display_name: Mapped[str | None] = mapped_column(String, nullable=True)
# What does this message contain
reasoning_tokens: Mapped[str | None] = mapped_column(Text, nullable=True)
message: Mapped[str] = mapped_column(Text)
@@ -2710,6 +2727,12 @@ class ChatMessage(Base):
remote_side="ChatMessage.id",
)
preferred_response: Mapped["ChatMessage | None"] = relationship(
"ChatMessage",
foreign_keys=[preferred_response_id],
remote_side="ChatMessage.id",
)
# Chat messages only need to know their immediate tool call children
# If there are nested tool calls, they are stored in the tool_call_children relationship.
tool_calls: Mapped[list["ToolCall"] | None] = relationship(
@@ -3112,8 +3135,6 @@ class VoiceProvider(Base):
is_default_stt: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
is_default_tts: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
deleted: Mapped[bool] = mapped_column(Boolean, default=False)
time_created: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
@@ -3465,9 +3486,9 @@ class Persona(Base):
builtin_persona: Mapped[bool] = mapped_column(Boolean, default=False)
# Featured personas are highlighted in the UI
featured: Mapped[bool] = mapped_column(Boolean, default=False)
# controls whether the persona is available to be selected by users
is_visible: Mapped[bool] = mapped_column(Boolean, default=True)
is_featured: Mapped[bool] = mapped_column(Boolean, default=False)
# controls whether the persona is listed in user-facing agent lists
is_listed: Mapped[bool] = mapped_column(Boolean, default=True)
# controls the ordering of personas in the UI
# higher priority personas are displayed first, ties are resolved by the ID,
# where lower value IDs (e.g. created earlier) are displayed first
@@ -3969,6 +3990,8 @@ class SamlAccount(Base):
class User__UserGroup(Base):
__tablename__ = "user__user_group"
__table_args__ = (Index("ix_user__user_group_user_id", "user_id"),)
is_curator: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
user_group_id: Mapped[int] = mapped_column(
@@ -3979,6 +4002,48 @@ class User__UserGroup(Base):
)
class PermissionGrant(Base):
__tablename__ = "permission_grant"
__table_args__ = (
UniqueConstraint(
"group_id", "permission", name="uq_permission_grant_group_permission"
),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
group_id: Mapped[int] = mapped_column(
ForeignKey("user_group.id", ondelete="CASCADE"), nullable=False
)
permission: Mapped[Permission] = mapped_column(
Enum(Permission, native_enum=False), nullable=False
)
grant_source: Mapped[GrantSource] = mapped_column(
Enum(GrantSource, native_enum=False), nullable=False
)
granted_by: Mapped[UUID | None] = mapped_column(
ForeignKey("user.id", ondelete="SET NULL"), nullable=True
)
granted_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
is_deleted: Mapped[bool] = mapped_column(
Boolean, nullable=False, default=False, server_default=text("false")
)
group: Mapped["UserGroup"] = relationship(
"UserGroup", back_populates="permission_grants"
)
@validates("permission")
def _validate_permission(self, _key: str, value: Permission) -> Permission:
if value in Permission.IMPLIED:
raise ValueError(
f"{value!r} is an implied permission and cannot be granted directly"
)
return value
class UserGroup__ConnectorCredentialPair(Base):
__tablename__ = "user_group__connector_credential_pair"
@@ -4073,6 +4138,8 @@ class UserGroup(Base):
is_up_for_deletion: Mapped[bool] = mapped_column(
Boolean, nullable=False, default=False
)
# whether this is a default group (e.g. "Basic", "Admins") that cannot be deleted
is_default: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
# Last time a user updated this user group
time_last_modified_by_user: Mapped[datetime.datetime] = mapped_column(
@@ -4116,6 +4183,9 @@ class UserGroup(Base):
accessible_mcp_servers: Mapped[list["MCPServer"]] = relationship(
"MCPServer", secondary="mcp_server__user_group", back_populates="user_groups"
)
permission_grants: Mapped[list["PermissionGrant"]] = relationship(
"PermissionGrant", back_populates="group", cascade="all, delete-orphan"
)
"""Tables related to Token Rate Limiting
@@ -5178,3 +5248,90 @@ class CacheStore(Base):
expires_at: Mapped[datetime.datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
class Hook(Base):
"""Pairs a HookPoint with a customer-provided API endpoint.
At most one non-deleted Hook per HookPoint is allowed, enforced by a
partial unique index on (hook_point) where deleted=false.
"""
__tablename__ = "hook"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = mapped_column(String, nullable=False)
hook_point: Mapped[HookPoint] = mapped_column(
Enum(HookPoint, native_enum=False), nullable=False
)
endpoint_url: Mapped[str | None] = mapped_column(Text, nullable=True)
api_key: Mapped[SensitiveValue[str] | None] = mapped_column(
EncryptedString(), nullable=True
)
is_reachable: Mapped[bool | None] = mapped_column(
Boolean, nullable=True, default=None
) # null = never validated, true = last check passed, false = last check failed
fail_strategy: Mapped[HookFailStrategy] = mapped_column(
Enum(HookFailStrategy, native_enum=False),
nullable=False,
default=HookFailStrategy.HARD,
)
timeout_seconds: Mapped[float] = mapped_column(Float, nullable=False, default=30.0)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
deleted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
creator_id: Mapped[UUID | None] = mapped_column(
PGUUID(as_uuid=True),
ForeignKey("user.id", ondelete="SET NULL"),
nullable=True,
)
created_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
updated_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
)
creator: Mapped["User | None"] = relationship("User", foreign_keys=[creator_id])
execution_logs: Mapped[list["HookExecutionLog"]] = relationship(
"HookExecutionLog", back_populates="hook", cascade="all, delete-orphan"
)
__table_args__ = (
Index(
"ix_hook_one_non_deleted_per_point",
"hook_point",
unique=True,
postgresql_where=(deleted == False), # noqa: E712
),
)
class HookExecutionLog(Base):
"""Records hook executions for health monitoring and debugging.
Currently only failures are logged; the is_success column exists so
success logging can be added later without a schema change.
Retention: rows older than 30 days are deleted by a nightly Celery task.
"""
__tablename__ = "hook_execution_log"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
hook_id: Mapped[int] = mapped_column(
Integer,
ForeignKey("hook.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
is_success: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
status_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
duration_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
created_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False, index=True
)
hook: Mapped["Hook"] = relationship("Hook", back_populates="execution_logs")

View File

@@ -50,8 +50,18 @@ from onyx.utils.variable_functionality import fetch_versioned_implementation
logger = setup_logger()
def get_default_behavior_persona(db_session: Session) -> Persona | None:
def get_default_behavior_persona(
db_session: Session,
eager_load_for_tools: bool = False,
) -> Persona | None:
stmt = select(Persona).where(Persona.id == DEFAULT_PERSONA_ID)
if eager_load_for_tools:
stmt = stmt.options(
selectinload(Persona.tools),
selectinload(Persona.document_sets),
selectinload(Persona.attached_documents),
selectinload(Persona.hierarchy_nodes),
)
return db_session.scalars(stmt).first()
@@ -126,7 +136,7 @@ def _add_user_filters(
else:
# Group the public persona conditions
public_condition = (Persona.is_public == True) & ( # noqa: E712
Persona.is_visible == True # noqa: E712
Persona.is_listed == True # noqa: E712
)
where_clause |= public_condition
@@ -260,7 +270,7 @@ def create_update_persona(
try:
# Featured persona validation
if create_persona_request.featured:
if create_persona_request.is_featured:
# Curators can edit featured personas, but not make them
# TODO this will be reworked soon with RBAC permissions feature
if user.role == UserRole.CURATOR or user.role == UserRole.GLOBAL_CURATOR:
@@ -300,7 +310,7 @@ def create_update_persona(
remove_image=create_persona_request.remove_image,
search_start_date=create_persona_request.search_start_date,
label_ids=create_persona_request.label_ids,
featured=create_persona_request.featured,
is_featured=create_persona_request.is_featured,
user_file_ids=converted_user_file_ids,
commit=False,
hierarchy_node_ids=create_persona_request.hierarchy_node_ids,
@@ -910,11 +920,11 @@ def upsert_persona(
uploaded_image_id: str | None = None,
icon_name: str | None = None,
display_priority: int | None = None,
is_visible: bool = True,
is_listed: bool = True,
remove_image: bool | None = None,
search_start_date: datetime | None = None,
builtin_persona: bool = False,
featured: bool | None = None,
is_featured: bool | None = None,
label_ids: list[int] | None = None,
user_file_ids: list[UUID] | None = None,
hierarchy_node_ids: list[int] | None = None,
@@ -1037,13 +1047,13 @@ def upsert_persona(
if remove_image or uploaded_image_id:
existing_persona.uploaded_image_id = uploaded_image_id
existing_persona.icon_name = icon_name
existing_persona.is_visible = is_visible
existing_persona.is_listed = is_listed
existing_persona.search_start_date = search_start_date
if label_ids is not None:
existing_persona.labels.clear()
existing_persona.labels = labels or []
existing_persona.featured = (
featured if featured is not None else existing_persona.featured
existing_persona.is_featured = (
is_featured if is_featured is not None else existing_persona.is_featured
)
# Update embedded prompt fields if provided
if system_prompt is not None:
@@ -1109,9 +1119,9 @@ def upsert_persona(
uploaded_image_id=uploaded_image_id,
icon_name=icon_name,
display_priority=display_priority,
is_visible=is_visible,
is_listed=is_listed,
search_start_date=search_start_date,
featured=(featured if featured is not None else False),
is_featured=(is_featured if is_featured is not None else False),
user_files=user_files or [],
labels=labels or [],
hierarchy_nodes=hierarchy_nodes or [],
@@ -1158,7 +1168,7 @@ def delete_old_default_personas(
def update_persona_featured(
persona_id: int,
featured: bool,
is_featured: bool,
db_session: Session,
user: User,
) -> None:
@@ -1166,13 +1176,13 @@ def update_persona_featured(
db_session=db_session, persona_id=persona_id, user=user, get_editable=True
)
persona.featured = featured
persona.is_featured = is_featured
db_session.commit()
def update_persona_visibility(
persona_id: int,
is_visible: bool,
is_listed: bool,
db_session: Session,
user: User,
) -> None:
@@ -1180,7 +1190,7 @@ def update_persona_visibility(
db_session=db_session, persona_id=persona_id, user=user, get_editable=True
)
persona.is_visible = is_visible
persona.is_listed = is_listed
db_session.commit()

View File

@@ -12,6 +12,7 @@ from sqlalchemy.orm import Session
from starlette.background import BackgroundTasks
from onyx.configs.app_configs import DISABLE_VECTOR_DB
from onyx.configs.constants import CELERY_USER_FILE_PROCESSING_TASK_EXPIRES
from onyx.configs.constants import FileOrigin
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
@@ -144,6 +145,7 @@ def upload_files_to_user_files_with_indexing(
kwargs={"user_file_id": user_file.id, "tenant_id": tenant_id},
queue=OnyxCeleryQueues.USER_FILE_PROCESSING,
priority=OnyxCeleryPriority.HIGH,
expires=CELERY_USER_FILE_PROCESSING_TASK_EXPIRES,
)
logger.info(
f"Triggered indexing for user_file_id={user_file.id} with task_id={task.id}"

View File

@@ -75,7 +75,7 @@ def create_slack_channel_persona(
llm_model_version_override=None,
starter_messages=None,
is_public=True,
featured=False,
is_featured=False,
db_session=db_session,
commit=False,
)

View File

@@ -2,6 +2,7 @@ import time
from sqlalchemy.orm import Session
from onyx.configs.app_configs import DISABLE_VECTOR_DB
from onyx.configs.app_configs import VESPA_NUM_ATTEMPTS_ON_STARTUP
from onyx.configs.constants import KV_REINDEX_KEY
from onyx.db.connector_credential_pair import get_connector_credential_pairs
@@ -149,6 +150,9 @@ def check_and_perform_index_swap(db_session: Session) -> SearchSettings | None:
Returns None if search settings did not change, or the old search settings if they
did change.
"""
if DISABLE_VECTOR_DB:
return None
# Default CC-pair created for Ingestion API unused here
all_cc_pairs = get_connector_credential_pairs(db_session)
cc_pair_count = max(len(all_cc_pairs) - 1, 0)

Some files were not shown because too many files have changed in this diff Show More