Compare commits


199 Commits

Author SHA1 Message Date
SubashMohan
508c10daaf fix(user-create): ensure account_type is always set to STANDARD in create_update_dict 2026-04-01 14:29:48 +05:30
SubashMohan
c0003f0b77 fix(user-create): enforce STANDARD account type for self-registration 2026-04-01 13:04:55 +05:30
SubashMohan
dbf202ba05 fix(users): move user group assignment inside tenant context for proper schema targeting 2026-04-01 13:04:55 +05:30
SubashMohan
4987c9581f feat(enums): update GrantSource values to uppercase for consistency 2026-04-01 13:04:55 +05:30
SubashMohan
6a69347bd6 fix(migration): remove check for deleted permissions in basic grant logic 2026-04-01 13:04:55 +05:30
SubashMohan
b8cd332a36 refactor(users): remove user assignment to default groups after account upgrade 2026-04-01 13:04:55 +05:30
SubashMohan
e319728544 feat(permissions): enhance permission recomputation logic to exclude deleted grants 2026-04-01 13:04:55 +05:30
SubashMohan
b42a7858ed fix(docs): clarify user permissions aggregation in effective_permissions migration 2026-04-01 13:04:55 +05:30
SubashMohan
44e2fccbef feat(enums): update AccountType values to uppercase for consistency 2026-04-01 13:04:55 +05:30
SubashMohan
fbf0effcdf fix(permissions): ensure valid user IDs are processed in group permission recomputation 2026-04-01 13:04:55 +05:30
SubashMohan
a71c18c454 refactor(permissions): rename IMPLIES to IMPLIED_PERMISSIONS for clarity 2026-04-01 13:04:55 +05:30
SubashMohan
ccc813e075 feat(permissions): enhance effective permissions logic with user group and permission grant tables 2026-04-01 13:04:55 +05:30
SubashMohan
6eeeda2ab8 feat(permissions): refactor permission recomputation logic and introduce no-commit variants 2026-04-01 13:04:55 +05:30
SubashMohan
7cc4bf2286 feat(permissions): add endpoint to retrieve current user permissions and update permissions on user group changes 2026-04-01 13:04:55 +05:30
SubashMohan
ab0eeb5585 feat(permissions): grant basic permission to new user groups and add API endpoint for permission retrieval 2026-04-01 13:04:55 +05:30
SubashMohan
12b0b01787 feat(permissions): add effective_permissions JSONB column and related logic for permission resolution 2026-04-01 13:04:55 +05:30
SubashMohan
cb11a5c472 feat(notification): add USER_GROUP_ASSIGNMENT_FAILED type and improve notification query logic 2026-04-01 13:04:55 +05:30
SubashMohan
ee80b91b20 feat(migration): add error handling for missing default groups in user assignment
feat(users): update account type for external permissioned users to BOT
fix(tests): update tests to reflect changes in default group assignment behavior
feat(types): introduce AccountType enum for better user role management
2026-04-01 13:04:55 +05:30
SubashMohan
3a643067b9 fix(tests): comment out group visibility checks pending permission implementation 2026-04-01 13:04:55 +05:30
SubashMohan
c4c1de8d87 feat(migration): enhance downgrade logic to prevent FK violations and improve user creation tests with account type handling 2026-04-01 13:04:55 +05:30
SubashMohan
346de4fb39 fix(users): update permission handling in default group assignment logic 2026-04-01 13:04:55 +05:30
SubashMohan
4c08d1730f feat(users): add account_type to user creation and assign to default groups 2026-04-01 13:04:55 +05:30
SubashMohan
cfc2881b97 feat(migration): assign existing users to default groups
Add migration to assign all existing users to Admin/Basic default groups
based on their role and account_type. Add integration tests for group
assignment across all user creation paths (API keys, SAML, SCIM, registration).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 13:04:55 +05:30
SubashMohan
3c038165bb feat(users): backfill account_type and assign new users to default groups
Add migration to backfill account_type column (NOT NULL, default STANDARD)
based on existing user roles. Update all user creation paths (OAuth, SAML,
SCIM, API keys, Slack, anonymous) to set account_type and assign users to
appropriate default groups. Add comprehensive unit tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 13:04:55 +05:30
SubashMohan
04bdfe4749 feat(groups): seed default Admin and Basic user groups
Add migration to create system default user groups (Admin, Basic) with
permission grants. Update API and frontend to support is_default flag,
protect default groups from rename/delete, and add include_default
query parameter for group listing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 13:04:55 +05:30
Danelegend
de0f42f6cc refactor(files): Port csv type to tabular (#9785) 2026-04-01 03:37:13 +00:00
Raunak Bhagat
7ecefdc90f refactor(opal): split Card sizeVariant into padding + rounding (#9823) 2026-04-01 03:32:08 +00:00
Danelegend
21fc013893 feat(file-upload): Upload files exceeding tokens but skip indexing (#9751) 2026-04-01 02:14:51 +00:00
Justin Tahara
a1c3a68ba4 fix(perf): optimize chat sessions query to prevent DB cascading failures (#9802) 2026-04-01 01:28:37 +00:00
Evan Lohn
4fb175ae65 fix: install early exit (#9818) 2026-04-01 01:09:05 +00:00
Evan Lohn
800ad326df fix: discord token validation (#9817) 2026-04-01 01:08:38 +00:00
Bo-Onyx
6b920e8a3e feat(hook): refactor under ee (#9776) 2026-04-01 01:07:55 +00:00
Justin Tahara
ef3760796d feat(rds): Adding IO Metrics Alarms (#9789) 2026-04-01 01:07:45 +00:00
Jessica Singh
fa5b90df92 fix(connectors): fix reindex on paused file connectors (#9812) 2026-03-31 23:10:09 +00:00
Evan Lohn
53953ac4fa chore: fix indexing log2 (#9811) 2026-03-31 21:02:54 +00:00
Yuhong Sun
26bb5c990c chore: Rag script for benchmark/regression (#9781) 2026-03-31 20:46:17 +00:00
Evan Lohn
27b4ed301f chore: fix batch logging (#9808) 2026-03-31 20:10:33 +00:00
Jessica Singh
93ec270ccc feat(voice): VAD auto-stop only when auto-send is enabled (#9809) 2026-03-31 19:31:31 +00:00
Raunak Bhagat
9e2d6c8a1d refactor(admin): code-interpreter (#9790) 2026-03-31 19:08:55 +00:00
Nikolas Garza
fc934214d0 perf(swr): add SWR_KEYS registry and skip revalidation for stable hooks (#9695) 2026-03-31 19:07:42 +00:00
Raunak Bhagat
48fc45a0cd refactor(admin): web-search (#9761) 2026-03-31 19:04:18 +00:00
Jessica Singh
009266e53e fix(llm): when multiple providers are same type ensure name is prioritized when default (#9777) 2026-03-31 19:03:38 +00:00
Raunak Bhagat
ffb9df7308 refactor(admin): LLM Config (#9806) 2026-03-31 19:03:17 +00:00
Raunak Bhagat
b0f5e0b8d9 refactor(admin): image-generation (#9769) 2026-03-31 18:13:23 +00:00
acaprau
43aea5d614 chore(opensearch): Add Grafana dashboard for retrieval (#9657)
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-31 16:56:40 +00:00
Bo-Onyx
593d82f431 feat(hook): hook status and logs (#9770) 2026-03-31 16:10:12 +00:00
Ben Wu
adf5691b5f feat(canvas 2/4): Canvas Connector data fetching (#9386) 2026-03-31 03:07:05 +00:00
Nikolas Garza
c1a8a5bd83 fix(tenants): run migrations on pool tenants before assigning to new users (#9788) 2026-03-31 01:24:01 +00:00
Justin Tahara
8fd486da99 feat(rds): Add Freeable Memory alert (#9787) 2026-03-31 00:59:30 +00:00
Raunak Bhagat
4bda4d3637 refactor: migrate away from cards/Select (#9771) 2026-03-31 00:27:01 +00:00
Justin Tahara
13c25eadad feat(rds): Adding CPU Alerts (#9784) 2026-03-31 00:22:15 +00:00
Justin Tahara
1f244e6388 feat(eks): Adding Cloudwatch logging (#9783) 2026-03-30 23:52:44 +00:00
Nikolas Garza
18b0416d30 feat(sentry): enable frontend source map uploads in cloud CI (#9775) 2026-03-30 23:42:57 +00:00
Nikolas Garza
4bc0bc1efb feat(helm): add Grafana dashboard provisioning (#9725) 2026-03-30 23:42:32 +00:00
Justin Tahara
1555217061 feat(rds): Adding RDS Snapshots (#9779) 2026-03-30 23:17:08 +00:00
Nikolas Garza
d177a833f0 feat(sentry): add release tracking to backend and frontend (#9773) 2026-03-30 22:35:38 +00:00
Jamison Lahman
086997d3c5 chore(types): fix IconButton size props (#9772) 2026-03-30 21:40:25 +00:00
dependabot[bot]
dccec78397 chore(deps): bump helm/chart-testing-action from b5eebdd9998021f29756c53432f48dab66394810 to 2e2940618cb426dce2999631d543b53cdcfc8527 (#9764)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-30 14:41:01 -07:00
Jamison Lahman
0123133621 chore(fe): polish Query History table (#9767) 2026-03-30 21:30:13 +00:00
dependabot[bot]
0b9d154a73 chore(deps): bump runs-on/cache from 50350ad4242587b6c8c2baa2e740b1bc11285ff4 to a5f51d6f3fece787d03b7b4e981c82538a0654ed (#9763)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-30 13:54:43 -07:00
dependabot[bot]
6e65e55bf5 chore(deps): bump actions/cache from 5.0.3 to 5.0.4 (#9765)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-30 13:46:53 -07:00
Raunak Bhagat
3f9e208759 feat(opal): SelectCard + CardHeaderLayout (#9760) 2026-03-30 19:54:54 +00:00
dependabot[bot]
fb8edda14a chore(deps): bump pygments from 2.19.2 to 2.20.0 (#9757)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-30 18:30:18 +00:00
Jamison Lahman
58decd8a6b chore(gha): prefer ci-protected env (#9728) 2026-03-30 17:20:54 +00:00
Danelegend
e97204d9cc feat(indexing): Batch chunks during doc processing (#9468) 2026-03-30 11:49:36 +00:00
Danelegend
44ab02c94f refactor(indexing): Refactor indexing vector db abstraction (#9653) 2026-03-30 09:57:16 +00:00
Danelegend
a98cc30f25 refactor(indexing): Change adapters to support iterables (#9469) 2026-03-30 01:43:10 +00:00
Danelegend
a709dcb8fa feat(indexing): Max chunk processing (#9400) 2026-03-30 00:10:24 +00:00
Raunak Bhagat
a3dfe6aa1b refactor(opal): unify Interactive color system (#9717) 2026-03-28 00:40:23 +00:00
Nikolas Garza
23e4d55fb1 perf(swr): convert raw-fetch hooks to SWR to eliminate duplicate requests (#9694) 2026-03-28 00:26:20 +00:00
Jamison Lahman
470cc85f83 feat(cli): onyx-cli serve over SSH (#9726) 2026-03-27 23:46:14 +00:00
Justin Tahara
64d9be5a41 fix(openpyxl): Colors must be aRGB hex values (#9727) 2026-03-27 23:14:36 +00:00
roshan
71a5b469b0 feat(widget): add citation badges to chat widget (#9714) 2026-03-27 22:39:46 +00:00
Evan Lohn
462eb0697f fix: Anthropic litellm thinking workaround (#9713) 2026-03-27 21:03:05 +00:00
dependabot[bot]
b708dc8796 chore(deps): bump langchain-core from 1.2.11 to 1.2.22 (#9720)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-27 20:50:19 +00:00
dependabot[bot]
c9e2c32f55 chore(deps): bump cryptography from 46.0.5 to 46.0.6 (#9721)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-27 20:48:59 +00:00
Jamison Lahman
d725df62e7 feat(cli): --version and validate-config warn if backend version is incompatible (#9715) 2026-03-27 13:13:16 -07:00
Jamison Lahman
d1460972b6 fix(cli): onyx-cli --version interpolation (#9712) 2026-03-27 19:22:31 +00:00
Jamison Lahman
706872f0b7 chore(deps): upgrade go deps (#9711) 2026-03-27 12:24:25 -07:00
Jamison Lahman
ed3856be2b chore(release): build all CLI wheels before publishing (#9710) 2026-03-27 19:04:02 +00:00
Jamison Lahman
6326c7f0b9 chore(gha): fix git error after helm release migration to alpine base image (#9709) 2026-03-27 11:21:34 -07:00
Jamison Lahman
40420fc4e6 chore(gha): helm release upstream nits (#9708) 2026-03-27 11:10:41 -07:00
Nikolas Garza
1a2b6a66cc fix(celery): use broker connection pool to prevent Redis connection leak (#9682) 2026-03-27 17:53:49 +00:00
Jamison Lahman
d1b1529ccf chore(gha): fix helm release after image update (#9707) 2026-03-27 10:37:43 -07:00
Bo-Onyx
fedd9c76e5 feat(hook): admin page create or edit hook (#9690) 2026-03-27 17:10:14 +00:00
Jamison Lahman
0b34b40b79 chore(gha): pin helm release docker image (#9706) 2026-03-27 10:16:41 -07:00
Yuhong Sun
fe82ddb1b9 Update README.md (#9703) 2026-03-27 10:03:56 -07:00
Jamison Lahman
32d3d70525 chore(playwright): deflake settings_pages.spec.ts (#9684) 2026-03-27 15:54:23 +00:00
Jamison Lahman
40b9e10890 chore(devtools): upgrade ods: 0.7.1->0.7.2 (#9701) 2026-03-27 08:17:42 -07:00
dependabot[bot]
e21b204b8a chore(deps): bump brace-expansion in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#9698)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-27 08:10:15 -07:00
Jamison Lahman
2f672b3a4f fix(fe): Popover content doesnt overflow on small screens (#9612) 2026-03-27 08:07:52 -07:00
Nikolas Garza
cf19d0df4f feat(helm): add Prometheus metrics ports and Services for celery workers (#9630) 2026-03-27 08:03:48 +00:00
Danelegend
86a6a4c134 refactor(indexing): Vespa & Opensearch index function use Iterable (#9384) 2026-03-27 04:36:59 +00:00
SubashMohan
146b5449d2 feat: configurable file upload size and token limits via admin settings (#9232) 2026-03-27 04:23:16 +00:00
Jamison Lahman
b66991b5c5 chore(devtools): ods trace (#9688)
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-27 03:56:38 +00:00
dependabot[bot]
9cb76dc027 chore(deps-dev): bump picomatch from 2.3.1 to 2.3.2 in /web (#9691)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-27 02:22:22 +00:00
dependabot[bot]
f66891d19e chore(deps-dev): bump handlebars from 4.7.8 to 4.7.9 in /web (#9689)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-27 01:41:29 +00:00
Nikolas Garza
c07c952ad5 chore(greptile): add nginx routing rule for non-api backend routes (#9687) 2026-03-27 00:34:15 +00:00
Nikolas Garza
be7f40a28a fix(nginx): route /scim/* to api_server (#9686) 2026-03-26 17:21:57 -07:00
Evan Lohn
26f941b5da perf: perm sync start time (#9685) 2026-03-27 00:07:53 +00:00
Jamison Lahman
b9e84c42a8 feat(providers): allow deleting all types of providers (#9625)
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-26 15:20:56 -07:00
Bo-Onyx
0a1df52c2f feat(hook): Hook Form Modal Polish. (#9683) 2026-03-26 22:12:33 +00:00
Nikolas Garza
306b0d452f fix(billing): retry claimLicense up to 3x after Stripe checkout return (#9669) 2026-03-26 21:06:19 +00:00
Justin Tahara
5fdb34ba8e feat(llm): add Bifrost gateway frontend modal and provider registration (#9617) 2026-03-26 20:50:25 +00:00
Jamison Lahman
2d066631e3 fix(voice): dont soft-delete providers (#9679) 2026-03-26 19:26:32 +00:00
Evan Lohn
5c84f6c61b fix(jira): large batches fail json decode (#9677) 2026-03-26 18:53:37 +00:00
Nikolas Garza
899179d4b6 fix(api-key): clarify upgrade message for trial accounts (#9678) 2026-03-26 18:32:41 +00:00
Bo-Onyx
80d6bafc74 feat(hook): Hook connect/manage modal (#9645) 2026-03-26 18:16:33 +00:00
Nikolas Garza
2cc325cb0e chore(greptile): split greptile.json into .greptile/ directory (#9668) 2026-03-26 17:05:43 +00:00
Raunak Bhagat
849385b756 refactor: migrate legacy components/Text (#9628) 2026-03-26 16:14:03 +00:00
Ben Wu
417b9c12e4 feat(canvas): add API client, data models, and connector scaffold 1/6 (#9385)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 15:26:52 +00:00
Raunak Bhagat
30b37d0a77 fix(admin): wrap system prompt modal in Formik with markdown subDescription (#9667) 2026-03-26 07:08:56 -07:00
Justin Tahara
b48be0cd3a feat(llm): add Bifrost gateway as LLM provider (backend) (#9616) 2026-03-26 05:09:20 +00:00
Nikolas Garza
127fd90424 fix(metrics): replace inspect.ping() with event-based worker health monitoring (#9633) 2026-03-26 03:36:07 +00:00
Raunak Bhagat
f9c9e55f32 refactor(opal): accept string | RichStr in all Opal text-rendering components, modals, and input-layouts (#9656) 2026-03-26 02:46:34 +00:00
Raunak Bhagat
5afcf1acea fix(opal): remove gap between title and description in ContentMd (#9666) 2026-03-25 19:45:21 -07:00
Nikolas Garza
eb1244a9d7 feat(chat): add DB schema and Pydantic models for multi-model answers (#9646) 2026-03-26 02:21:00 +00:00
Evan Lohn
2433a9a4c5 feat: sharepoint filters (denylist) (#9649) 2026-03-26 01:33:18 +00:00
dependabot[bot]
60bc8fcac6 chore(deps): bump nltk from 3.9.3 to 3.9.4 (#9663)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-26 00:50:52 +00:00
dependabot[bot]
1ddc958a51 chore(deps): bump picomatch in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#9662)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-25 17:54:28 -07:00
acaprau
de37acbe07 chore(opensearch): Optimize terms filters; add type aliases (#9619) 2026-03-26 00:35:53 +00:00
Wenxi
08cd2f2c3e fix(ci): tag web-server and model-server with craft-latest (#9661)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 00:35:37 +00:00
acaprau
fc29f20914 feat(opensearch): Add Prometheus metrics for OpenSearch retrieval (#9654) 2026-03-26 00:29:29 +00:00
dependabot[bot]
c43cb80a7a chore(deps): bump yaml from 1.10.2 to 1.10.3 in /web (#9655)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-25 23:59:17 +00:00
dependabot[bot]
56f0be2ec8 chore(deps): bump requests from 2.32.5 to 2.33.0 (#9652)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-25 23:59:00 +00:00
acaprau
42f9ddf247 feat(opensearch): Search UI search flow can be configured to use pure keyword search (#9500)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-25 23:56:32 +00:00
dependabot[bot]
a10a85c73c chore(deps-dev): bump picomatch from 4.0.3 to 4.0.4 in /widget (#9659)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-25 17:01:38 -07:00
Jamison Lahman
31d8ae9718 chore(playwright): rework admin navigation tests (#9650)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-25 23:27:08 +00:00
Nikolas Garza
00a0a99842 fix: clarify service account API key upgrade message for trial accounts (#9581) 2026-03-25 23:22:45 +00:00
dependabot[bot]
90040f8973 chore(deps): bump picomatch in /examples/widget (#9651)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-25 16:20:36 -07:00
Raunak Bhagat
4f5d081f26 feat(opal): add Text component with inline markdown support (#9623) 2026-03-25 23:06:18 +00:00
dependabot[bot]
c51a6dbd0d chore(deps): bump pypdf from 6.9.1 to 6.9.2 (#9637)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-03-25 23:04:27 +00:00
Evan Lohn
8b90ecc189 feat: sharepoint shareable links non-public (#9636) 2026-03-25 22:50:29 +00:00
Justin Tahara
865c893a09 chore(agents): Match Mocks & Add Date Validation (#9632) 2026-03-25 21:57:31 +00:00
Bo-Onyx
ef5628bfa7 feat(hook): Frontend hook infrastructure (#9634) 2026-03-25 21:38:04 +00:00
Jessica Singh
6ffee0021e chore(voice): align fe with other admin pages (#9505) 2026-03-25 20:00:36 +00:00
Jessica Singh
28dc84b831 fix(notion): upgrade API version + logical changes (#9609)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 19:18:51 +00:00
Jamison Lahman
230f035500 fix(chat): dont clear input message after errors submitting (#9624) 2026-03-25 12:00:23 -07:00
Jamison Lahman
55b24d72b4 fix(fe): redirect to status page after deleting connector (#9620) 2026-03-25 17:24:41 +00:00
Raunak Bhagat
3321a84c7d fix(sidebar): fix icon alignment for user-avatar-popover (#9615)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-25 17:07:50 +00:00
SubashMohan
54bf32a5f8 fix: use persisted source functions when toggling search tool (#9548) 2026-03-25 16:50:25 +00:00
Nikolas Garza
4bb6b76be6 feat(groups): switchover to /admin/groups and rewrite e2e tests (#9545) 2026-03-25 08:11:13 +00:00
SubashMohan
db94562474 feat: Group-based permissions — Phase 1 schema (AccountType, Permission, PermissionGrant) (#9547) 2026-03-25 06:24:43 +00:00
Nikolas Garza
582d4642c1 feat(metrics): add task lifecycle and per-connector Prometheus metrics (#9602) 2026-03-25 06:02:43 +00:00
Nikolas Garza
3caaecdb0e feat(groups): polish edit page table and delete UX (#9544) 2026-03-25 04:57:50 +00:00
Nikolas Garza
039b69806b feat(metrics): add queue depth and connector health Prometheus collectors (#9590) 2026-03-25 03:53:26 +00:00
Evan Lohn
63971d4958 fix: confluence client retries (#9605) 2026-03-25 03:32:29 +00:00
Nikolas Garza
ffd897f380 feat(metrics): add reusable Prometheus metrics server for celery workers (#9589) 2026-03-25 01:47:06 +00:00
Evan Lohn
4745069232 fix: no more lazy queries per search call (#9578) 2026-03-25 01:38:35 +00:00
Nikolas Garza
386782f188 feat(groups): add edit group page (#9543) 2026-03-25 01:22:57 +00:00
Raunak Bhagat
ff009c4129 fix: Fix tag widths (#9618) 2026-03-25 01:18:51 +00:00
Bo-Onyx
b20a5ebf69 feat(hook): Add frontend feature control and admin hook page (#9575) 2026-03-25 00:37:37 +00:00
Bo-Onyx
8645adb807 fix(width): UI update model width definition. (#9613) 2026-03-25 00:11:32 +00:00
Nikolas Garza
2425bd4d8d feat(groups): add shared resources and token limit sections (#9538) 2026-03-24 23:44:44 +00:00
Raunak Bhagat
333b2b19cb refactor: fix sidebar layout (#9601)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-24 23:22:00 +00:00
Jamison Lahman
44895b3bd6 fix(ux): disable MCP Tools toggle if needs authenticated (#9607) 2026-03-24 22:45:23 +00:00
Raunak Bhagat
78c2ecf99f refactor(opal): restructure Onyx logo icons into composable parts (#9606)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-24 22:26:28 +00:00
Ciaran Sweet
e3e0e04edc fix: update values.yaml comment for opensearch admin password secretKeyRef (#9595) 2026-03-24 21:54:03 +00:00
Justin Tahara
a19fe03bd8 fix(ui): Text focused paste from PowerPoint (#9603) 2026-03-24 21:23:58 +00:00
Nikolas Garza
415c05b5f8 feat(groups): add create group page (#9515)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 20:55:18 +00:00
Nikolas Garza
352fd19f0a feat(admin): inline group renaming (#9491) 2026-03-24 20:12:17 +00:00
Raunak Bhagat
41ae039bfa refactor(opal): cleanup button types in Opal (#9598) 2026-03-24 20:06:39 +00:00
Bo-Onyx
782c734287 feat(hook): integrate query processing hook point (#9533) 2026-03-24 19:47:17 +00:00
Justin Tahara
728cdb0715 feat(helm): Adding pginto specific host (#9600) 2026-03-24 19:31:02 +00:00
Justin Tahara
baf6437117 fix(mt): Preprovision all tenants at once (#9576) 2026-03-24 19:13:10 +00:00
Raunak Bhagat
f187165077 refactor(opal): opalify FilterButton + migrate all instances away from old one (#9597) 2026-03-24 19:12:00 +00:00
Evan Lohn
727be3d663 fix: eager load chat session persona (#9577) 2026-03-24 19:03:57 +00:00
Jessica Singh
98c8f9884b fix(voice): add WebSocket upgrade headers to nginx configs (#9558)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 18:53:39 +00:00
Jamison Lahman
d79a068984 fix(fe): settings page layout shift on load (#9594) 2026-03-24 18:15:54 +00:00
Wenxi
ba0740d15f fix(fe): map snake_case auth type API response to camelCase (#9586)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 18:01:51 +00:00
Jamison Lahman
86b7bed90b chore(gha): basic test selection for external deps and connector tests (#9596)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-03-24 10:52:40 -07:00
Jamison Lahman
aead6ab9a5 fix(fe): properly style "Sign In" button (#9480) 2026-03-24 10:02:02 -07:00
Jamison Lahman
c9d4c186dd chore(blame): ignore ruff formatting change (#9345) 2026-03-24 10:01:23 -07:00
Jamison Lahman
70aad1ec46 fix(fe): editing an LLM provider uses the global default model (#9502) 2026-03-24 10:01:07 -07:00
Wenxi
ca3cc16ead fix(fe): stop SWR retry spam and spurious logout on auth pages (#9587) 2026-03-24 16:53:56 +00:00
Jamison Lahman
9ea1780ce5 chore(fe): memory input defaults to 1 row with max of 3 (#9563) 2026-03-24 09:35:47 -07:00
Jamison Lahman
f70e5e605e feat(ux): handle when chat session id cannot be found (#9524) 2026-03-24 16:18:52 +00:00
Jamison Lahman
84b134e226 fix(a11y): hidden buttons appear on tabbing (#9518) 2026-03-24 16:18:11 +00:00
Raunak Bhagat
b17c63a7d6 feat(admin): refresh agents page with DataTable and opal components (#9376) 2026-03-24 16:04:10 +00:00
Jamison Lahman
76c41d1b0b chore(fe): always load an empty memory card (#9560) 2026-03-24 15:47:12 +00:00
Jamison Lahman
579b86f1ce chore(fe): memories save after pressing enter (#9553)
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
2026-03-24 15:42:36 +00:00
Jamison Lahman
a53cf13db1 chore(fe): position memories modal at the top (#9554) 2026-03-24 15:42:32 +00:00
Danelegend
06f76bac0c chore(indexing): Add tests for DocumentIndex index function (#9477) 2026-03-24 03:02:25 +00:00
Evan Lohn
e59b0c0d23 refactor: filter fields (#9574) 2026-03-24 02:32:46 +00:00
Evan Lohn
dfa37cce8b chore: use efficient persona id query path (#9573) 2026-03-24 01:49:20 +00:00
Jamison Lahman
6dce6b09e4 chore(playwright): mask date switcher in screenshots (#9584) 2026-03-24 01:39:16 +00:00
acaprau
0eba41c487 chore(opensearch, devtools): Generate embedding script (#9580) 2026-03-24 01:18:38 +00:00
acaprau
a426930123 chore(opensearch, devtools): Benchmarking script (#9579) 2026-03-24 00:35:18 +00:00
Jamison Lahman
73d98c7fa5 fix(ux): display invalid agent fields on load (#9582) 2026-03-24 00:22:15 +00:00
Justin Tahara
a096cf3997 feat(tf): Introduce Opensearch Terraform for AWS (#9523) 2026-03-24 00:16:52 +00:00
Justin Tahara
1e01ff8f10 fix(migration): Fix duplicate Null Users issue (#9568)
Co-authored-by: Jessica Singh <jessicasingh@outlook.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 00:01:56 +00:00
Jamison Lahman
3665bb23c2 chore(fe): action popover item UX (#8831) 2026-03-23 23:29:33 +00:00
Raunak Bhagat
8ff0d5fc15 refactor: update names in Persona table (#9569) 2026-03-23 22:20:47 +00:00
Wenxi
645d45776a fix: alias anonymous ph users with registered users (#9570) 2026-03-23 20:51:15 +00:00
Jamison Lahman
fa06e4ebd5 fix(ux): give a tooltip with reason agent edit cannot save (#9571) 2026-03-23 20:46:03 +00:00
Wenxi
2a61e3ce4c refactor: update auth paths to use onyx error and correctly pass error detail to auth error page (#9565)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-23 20:15:29 +00:00
dependabot[bot]
d8733dd89f chore(deps-dev): bump flatted from 3.3.3 to 3.4.2 in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#9535)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-23 12:48:01 -07:00
Jamison Lahman
c651177529 chore(fe): remove Inter font (#9566) 2026-03-23 19:16:27 +00:00
Wenxi
3193fe76e4 chore: don't allow periods in gmail signup on cloud (#9564)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-23 19:09:20 +00:00
718 changed files with 38423 additions and 10104 deletions

View File

@@ -6,3 +6,4 @@
3134e5f840c12c8f32613ce520101a047c89dcc2 # refactor(whitespace): rm temporary react fragments (#7161)
ed3f72bc75f3e3a9ae9e4d8cd38278f9c97e78b4 # refactor(whitespace): rm react fragment #7190
7b927e79c25f4ddfd18a067f489e122acd2c89de # chore(format): format files where `ruff` and `black` agree (#9339)

View File

@@ -615,6 +615,7 @@ jobs:
tags: |
type=raw,value=${{ needs.determine-builds.outputs.is-test-run == 'true' && format('web-{0}', needs.determine-builds.outputs.sanitized-tag) || github.ref_name }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'craft-latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && env.EDGE_TAG == 'true' && 'edge' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-beta == 'true' && 'beta' || '' }}
@@ -703,6 +704,9 @@ jobs:
NEXT_PUBLIC_FORGOT_PASSWORD_ENABLED=true
NEXT_PUBLIC_INCLUDE_ERROR_POPUP_SUPPORT_LINK=true
NODE_OPTIONS=--max-old-space-size=8192
SENTRY_RELEASE=${{ github.sha }}
secrets: |
sentry_auth_token=${{ secrets.SENTRY_AUTH_TOKEN }}
cache-from: |
type=registry,ref=${{ env.RUNS_ON_ECR_CACHE }}:cloudweb-cache-amd64
type=registry,ref=${{ env.REGISTRY_IMAGE }}:latest
@@ -785,6 +789,9 @@ jobs:
NEXT_PUBLIC_FORGOT_PASSWORD_ENABLED=true
NEXT_PUBLIC_INCLUDE_ERROR_POPUP_SUPPORT_LINK=true
NODE_OPTIONS=--max-old-space-size=8192
SENTRY_RELEASE=${{ github.sha }}
secrets: |
sentry_auth_token=${{ secrets.SENTRY_AUTH_TOKEN }}
cache-from: |
type=registry,ref=${{ env.RUNS_ON_ECR_CACHE }}:cloudweb-cache-arm64
type=registry,ref=${{ env.REGISTRY_IMAGE }}:latest
@@ -1263,8 +1270,6 @@ jobs:
latest=false
tags: |
type=raw,value=craft-latest
# TODO: Consider aligning craft-latest tags with regular backend builds (e.g., latest, edge, beta)
# to keep tagging strategy consistent across all backend images
- name: Create and push manifest
env:
@@ -1488,6 +1493,7 @@ jobs:
tags: |
type=raw,value=${{ needs.determine-builds.outputs.is-test-run == 'true' && format('model-server-{0}', needs.determine-builds.outputs.sanitized-tag) || github.ref_name }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-latest == 'true' && 'craft-latest' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && env.EDGE_TAG == 'true' && 'edge' || '' }}
type=raw,value=${{ needs.determine-builds.outputs.is-test-run != 'true' && needs.determine-builds.outputs.is-beta-standalone == 'true' && 'beta' || '' }}

View File

@@ -47,7 +47,8 @@ jobs:
done
- name: Publish Helm charts to gh-pages
uses: stefanprodan/helm-gh-pages@0ad2bb377311d61ac04ad9eb6f252fb68e207260 # ratchet:stefanprodan/helm-gh-pages@v1.7.0
# NOTE: HEAD of https://github.com/stefanprodan/helm-gh-pages/pull/43
uses: stefanprodan/helm-gh-pages@ad32ad3b8720abfeaac83532fd1e9bdfca5bbe27 # zizmor: ignore[impostor-commit]
with:
token: ${{ secrets.GITHUB_TOKEN }}
charts_dir: deployment/helm/charts

View File

@@ -35,6 +35,7 @@ jobs:
needs: [provider-chat-test]
if: failure() && github.event_name == 'schedule'
runs-on: ubuntu-slim
environment: ci-protected
timeout-minutes: 5
steps:
- name: Checkout

View File

@@ -183,6 +183,7 @@ jobs:
- cherry-pick-to-latest-release
if: needs.resolve-cherry-pick-request.outputs.should_cherrypick == 'true' && needs.resolve-cherry-pick-request.result == 'success' && needs.cherry-pick-to-latest-release.result == 'success'
runs-on: ubuntu-slim
environment: ci-protected
timeout-minutes: 10
steps:
- name: Checkout
@@ -232,6 +233,7 @@ jobs:
- cherry-pick-to-latest-release
if: always() && needs.resolve-cherry-pick-request.outputs.should_cherrypick == 'true' && (needs.resolve-cherry-pick-request.result == 'failure' || needs.cherry-pick-to-latest-release.result == 'failure')
runs-on: ubuntu-slim
environment: ci-protected
timeout-minutes: 10
steps:
- name: Checkout

View File

@@ -63,7 +63,7 @@ jobs:
targets: ${{ matrix.target }}
- name: Cache Cargo registry and build
uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # zizmor: ignore[cache-poisoning]
uses: actions/cache@668228422ae6a00e4ad889ee87cd7109ec5666a7 # zizmor: ignore[cache-poisoning]
with:
path: |
~/.cargo/bin/

View File

@@ -7,6 +7,15 @@ on:
merge_group:
pull_request:
branches: [main]
paths:
- "backend/**"
- "pyproject.toml"
- "uv.lock"
- ".github/workflows/pr-external-dependency-unit-tests.yml"
- ".github/actions/setup-python-and-install-dependencies/**"
- ".github/actions/setup-playwright/**"
- "deployment/docker_compose/docker-compose.yml"
- "deployment/docker_compose/docker-compose.dev.yml"
push:
tags:
- "v*.*.*"

View File

@@ -41,7 +41,7 @@ jobs:
version: v3.19.0
- name: Set up chart-testing
uses: helm/chart-testing-action@b5eebdd9998021f29756c53432f48dab66394810
uses: helm/chart-testing-action@2e2940618cb426dce2999631d543b53cdcfc8527
with:
uv_version: "0.9.9"

View File

@@ -284,7 +284,7 @@ jobs:
- name: Cache playwright cache
# zizmor: ignore[cache-poisoning] ephemeral runners; no release artifacts
uses: runs-on/cache@50350ad4242587b6c8c2baa2e740b1bc11285ff4 # ratchet:runs-on/cache@v4
uses: runs-on/cache@a5f51d6f3fece787d03b7b4e981c82538a0654ed # ratchet:runs-on/cache@v4
with:
path: ~/.cache/ms-playwright
key: ${{ runner.os }}-playwright-npm-${{ hashFiles('web/package-lock.json') }}
@@ -626,7 +626,7 @@ jobs:
- name: Cache playwright cache
# zizmor: ignore[cache-poisoning] ephemeral runners; no release artifacts
uses: runs-on/cache@50350ad4242587b6c8c2baa2e740b1bc11285ff4 # ratchet:runs-on/cache@v4
uses: runs-on/cache@a5f51d6f3fece787d03b7b4e981c82538a0654ed # ratchet:runs-on/cache@v4
with:
path: ~/.cache/ms-playwright
key: ${{ runner.os }}-playwright-npm-${{ hashFiles('web/package-lock.json') }}

View File

@@ -56,7 +56,7 @@ jobs:
- name: Cache mypy cache
if: ${{ vars.DISABLE_MYPY_CACHE != 'true' }}
uses: runs-on/cache@50350ad4242587b6c8c2baa2e740b1bc11285ff4 # ratchet:runs-on/cache@v4
uses: runs-on/cache@a5f51d6f3fece787d03b7b4e981c82538a0654ed # ratchet:runs-on/cache@v4
with:
path: .mypy_cache
key: mypy-${{ runner.os }}-${{ github.base_ref || github.event.merge_group.base_ref || 'main' }}-${{ hashFiles('**/*.py', '**/*.pyi', 'pyproject.toml') }}

View File

@@ -7,6 +7,13 @@ on:
merge_group:
pull_request:
branches: [main]
paths:
- "backend/**"
- "pyproject.toml"
- "uv.lock"
- ".github/workflows/pr-python-connector-tests.yml"
- ".github/actions/setup-python-and-install-dependencies/**"
- ".github/actions/setup-playwright/**"
push:
tags:
- "v*.*.*"

View File

@@ -31,6 +31,7 @@ jobs:
- runner=4cpu-linux-arm64
- "run-id=${{ github.run_id }}-model-check"
- "extras=ecr-cache"
environment: ci-protected
timeout-minutes: 45
env:

View File

@@ -15,6 +15,7 @@ permissions:
jobs:
Deploy-Preview:
runs-on: ubuntu-latest
environment: ci-protected
timeout-minutes: 30
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd

View File

@@ -13,15 +13,6 @@ jobs:
permissions:
id-token: write
timeout-minutes: 10
strategy:
matrix:
os-arch:
- { goos: "linux", goarch: "amd64" }
- { goos: "linux", goarch: "arm64" }
- { goos: "windows", goarch: "amd64" }
- { goos: "windows", goarch: "arm64" }
- { goos: "darwin", goarch: "amd64" }
- { goos: "darwin", goarch: "arm64" }
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
@@ -31,9 +22,11 @@ jobs:
enable-cache: false
version: "0.9.9"
- run: |
GOOS="${{ matrix.os-arch.goos }}" \
GOARCH="${{ matrix.os-arch.goarch }}" \
uv build --wheel
for goos in linux windows darwin; do
for goarch in amd64 arm64; do
GOOS="$goos" GOARCH="$goarch" uv build --wheel
done
done
working-directory: cli
- run: uv publish
working-directory: cli

View File

@@ -25,6 +25,7 @@ permissions:
jobs:
Deploy-Storybook:
runs-on: ubuntu-latest
environment: ci-protected
timeout-minutes: 30
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v4
@@ -54,6 +55,7 @@ jobs:
needs: Deploy-Storybook
if: always() && needs.Deploy-Storybook.result == 'failure'
runs-on: ubuntu-latest
environment: ci-protected
timeout-minutes: 10
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v4

View File

@@ -9,6 +9,7 @@ on:
jobs:
sync-foss:
runs-on: ubuntu-latest
environment: ci-protected
timeout-minutes: 45
permissions:
contents: read

View File

@@ -11,6 +11,7 @@ permissions:
jobs:
create-and-push-tag:
runs-on: ubuntu-slim
environment: ci-protected
timeout-minutes: 45
steps:

64
.greptile/config.json Normal file
View File

@@ -0,0 +1,64 @@
{
"labels": [],
"comment": "",
"fixWithAI": true,
"hideFooter": false,
"strictness": 3,
"statusCheck": true,
"commentTypes": [
"logic",
"syntax",
"style"
],
"instructions": "",
"disabledLabels": [],
"excludeAuthors": [
"dependabot[bot]",
"renovate[bot]"
],
"ignoreKeywords": "",
"ignorePatterns": "",
"includeAuthors": [],
"summarySection": {
"included": true,
"collapsible": false,
"defaultOpen": false
},
"excludeBranches": [],
"fileChangeLimit": 300,
"includeBranches": [],
"includeKeywords": "",
"triggerOnUpdates": true,
"updateExistingSummaryComment": true,
"updateSummaryOnly": false,
"issuesTableSection": {
"included": true,
"collapsible": false,
"defaultOpen": false
},
"statusCommentsEnabled": true,
"confidenceScoreSection": {
"included": true,
"collapsible": false
},
"sequenceDiagramSection": {
"included": true,
"collapsible": false,
"defaultOpen": false
},
"shouldUpdateDescription": false,
"rules": [
{
"scope": ["web/**"],
"rule": "In Onyx's Next.js app, the `app/ee/admin/` directory is a filesystem convention for Enterprise Edition route overrides — it does NOT add an `/ee/` prefix to the URL. Both `app/admin/groups/page.tsx` and `app/ee/admin/groups/page.tsx` serve the same URL `/admin/groups`. Hardcoded `/admin/...` paths in router.push() calls are correct and do NOT break EE deployments. Do not flag hardcoded admin paths as bugs."
},
{
"scope": ["web/**"],
"rule": "In Onyx, each API key creates a unique user row in the database with a unique `user_id` (UUID). There is a 1:1 mapping between API keys and their backing user records. Multiple API keys do NOT share the same `user_id`. Do not flag potential duplicate row IDs when using `user_id` from API key descriptors."
},
{
"scope": ["backend/**/*.py"],
"rule": "Never raise HTTPException directly in business code. Use `raise OnyxError(OnyxErrorCode.XXX, \"message\")` from `onyx.error_handling.exceptions`. A global FastAPI exception handler converts OnyxError into structured JSON responses with {\"error_code\": \"...\", \"detail\": \"...\"}. Error codes are defined in `onyx.error_handling.error_codes.OnyxErrorCode`. For upstream errors with dynamic HTTP status codes, use `status_code_override`: `raise OnyxError(OnyxErrorCode.BAD_GATEWAY, detail, status_code_override=upstream_status)`."
}
]
}

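The last rule in the config above describes Onyx's error-handling convention in prose. A minimal sketch of a compliant code path, assuming the module paths, the `OnyxError(OnyxErrorCode.XXX, "message")` constructor, and the `status_code_override` keyword are exactly as quoted in that rule; the function itself and the use of `requests` are illustrative and not taken from the diff:

```python
# Sketch only: OnyxError, OnyxErrorCode, and status_code_override are taken
# verbatim from the .greptile rule above; fetch_upstream_document and the use
# of requests are illustrative.
import requests

from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError


def fetch_upstream_document(url: str) -> dict:
    response = requests.get(url, timeout=10)
    if response.status_code >= 400:
        # Business code never raises HTTPException directly; the global FastAPI
        # handler converts OnyxError into {"error_code": "...", "detail": "..."}.
        raise OnyxError(
            OnyxErrorCode.BAD_GATEWAY,
            f"upstream returned {response.status_code}",
            status_code_override=response.status_code,
        )
    return response.json()
```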
57
.greptile/files.json Normal file
View File

@@ -0,0 +1,57 @@
[
{
"scope": [],
"path": "contributing_guides/best_practices.md",
"description": "Best practices for contributing to the codebase"
},
{
"scope": ["web/**"],
"path": "web/AGENTS.md",
"description": "Frontend coding standards for the web directory"
},
{
"scope": ["web/**"],
"path": "web/tests/README.md",
"description": "Frontend testing guide and conventions"
},
{
"scope": ["web/**"],
"path": "web/CLAUDE.md",
"description": "Single source of truth for frontend coding standards"
},
{
"scope": ["web/**"],
"path": "web/lib/opal/README.md",
"description": "Opal component library usage guide"
},
{
"scope": ["backend/**"],
"path": "backend/tests/README.md",
"description": "Backend testing guide covering all 4 test types, fixtures, and conventions"
},
{
"scope": ["backend/onyx/connectors/**"],
"path": "backend/onyx/connectors/README.md",
"description": "Connector development guide covering design, interfaces, and required changes"
},
{
"scope": [],
"path": "CLAUDE.md",
"description": "Project instructions and coding standards"
},
{
"scope": [],
"path": "backend/alembic/README.md",
"description": "Migration guidance, including multi-tenant migration behavior"
},
{
"scope": [],
"path": "deployment/helm/charts/onyx/values-lite.yaml",
"description": "Lite deployment Helm values and service assumptions"
},
{
"scope": [],
"path": "deployment/docker_compose/docker-compose.onyx-lite.yml",
"description": "Lite deployment Docker Compose overlay and disabled service behavior"
}
]

39
.greptile/rules.md Normal file
View File

@@ -0,0 +1,39 @@
# Greptile Review Rules
## Type Annotations
Use explicit type annotations for variables to enhance code clarity, especially when moving type hints around in the code.
## Best Practices
Use `contributing_guides/best_practices.md` as core review context. Prefer consistency with existing patterns, fix issues in code you touch, avoid tacking new features onto muddy interfaces, fail loudly instead of silently swallowing errors, keep code strictly typed, preserve clear state boundaries, remove duplicate or dead logic, break up overly long functions, avoid hidden import-time side effects, respect module boundaries, and favor correctness-by-construction over relying on callers to use an API correctly.
## TODOs
Whenever a TODO is added, there must always be an associated name or ticket with that TODO in the style of `TODO(name): ...` or `TODO(1234): ...`
## Debugging Code
Remove temporary debugging code before merging to production, especially tenant-specific debugging logs.
## Hardcoded Booleans
When hardcoding a boolean variable to a constant value, remove the variable entirely and clean up all places where it's used rather than just setting it to a constant.
## Multi-tenant vs Single-tenant
Code changes must consider both multi-tenant and single-tenant deployments. In multi-tenant mode, preserve tenant isolation, ensure tenant context is propagated correctly, and avoid assumptions that only hold for a single shared schema or globally shared state. In single-tenant mode, avoid introducing unnecessary tenant-specific requirements or cloud-only control-plane dependencies.
## Nginx Routing — New Backend Routes
Whenever a new backend route is added that does NOT start with `/api`, it must also be explicitly added to ALL nginx configs:
- `deployment/helm/charts/onyx/templates/nginx-conf.yaml` (Helm/k8s)
- `deployment/data/nginx/app.conf.template` (docker-compose dev)
- `deployment/data/nginx/app.conf.template.prod` (docker-compose prod)
- `deployment/data/nginx/app.conf.template.no-letsencrypt` (docker-compose no-letsencrypt)
Routes not starting with `/api` are not caught by the existing `^/(api|openapi\.json)` location block and will fall through to `location /`, which proxies to the Next.js web server and returns an HTML 404. The new location block must be placed before the `/api` block. Examples of routes that need this treatment: `/scim`, `/mcp`.
## Full vs Lite Deployments
Code changes must consider both regular Onyx deployments and Onyx lite deployments. Lite deployments disable the vector DB, Redis, model servers, and background workers by default, use PostgreSQL-backed cache/auth/file storage, and rely on the API server to handle background work. Do not assume those services are available unless the code path is explicitly limited to full deployments.

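The "Hardcoded Booleans" rule above is easiest to see as a small before/after. This is a purely hypothetical illustration; none of the names below appear anywhere in this compare view:

```python
def run_query(query: str) -> list[str]:
    # Stand-in so the sketch is self-contained.
    return [query]


# Before: the flag is pinned to a constant but still threads through the code.
ENABLE_RERANK = False


def search_before(query: str) -> list[str]:
    results = run_query(query)
    if ENABLE_RERANK:
        results = sorted(results)
    return results


# After: the dead flag and its branch are removed entirely, per the rule.
def search_after(query: str) -> list[str]:
    return run_query(query)
```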
View File

@@ -122,7 +122,7 @@ repos:
rev: 5d1e709b7be35cb2025444e19de266b056b7b7ee # frozen: v2.10.1
hooks:
- id: golangci-lint
language_version: "1.26.0"
language_version: "1.26.1"
entry: bash -c "find . -name go.mod -not -path './.venv/*' -print0 | xargs -0 -I{} bash -c 'cd \"$(dirname {})\" && golangci-lint run ./...'"
- repo: https://github.com/astral-sh/ruff-pre-commit

12
.vscode/launch.json vendored
View File

@@ -117,7 +117,8 @@
"presentation": {
"group": "2"
},
"consoleTitle": "API Server Console"
"consoleTitle": "API Server Console",
"justMyCode": false
},
{
"name": "Slack Bot",
@@ -268,7 +269,8 @@
"presentation": {
"group": "2"
},
"consoleTitle": "Celery heavy Console"
"consoleTitle": "Celery heavy Console",
"justMyCode": false
},
{
"name": "Celery kg_processing",
@@ -355,7 +357,8 @@
"presentation": {
"group": "2"
},
"consoleTitle": "Celery user_file_processing Console"
"consoleTitle": "Celery user_file_processing Console",
"justMyCode": false
},
{
"name": "Celery docfetching",
@@ -413,7 +416,8 @@
"presentation": {
"group": "2"
},
"consoleTitle": "Celery docprocessing Console"
"consoleTitle": "Celery docprocessing Console",
"justMyCode": false
},
{
"name": "Celery beat",

View File

@@ -35,7 +35,7 @@ Onyx comes loaded with advanced features like Agents, Web Search, RAG, MCP, Deep
> [!TIP]
> Run Onyx with one command (or see deployment section below):
> ```
> curl -fsSL https://raw.githubusercontent.com/onyx-dot-app/onyx/main/deployment/docker_compose/install.sh > install.sh && chmod +x install.sh && ./install.sh
> curl -fsSL https://onyx.app/install_onyx.sh | bash
> ```
****

View File

@@ -0,0 +1,108 @@
"""backfill_account_type
Revision ID: 03d085c5c38d
Revises: 977e834c1427
Create Date: 2026-03-25 16:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "03d085c5c38d"
down_revision = "977e834c1427"
branch_labels = None
depends_on = None
_STANDARD = "STANDARD"
_BOT = "BOT"
_EXT_PERM_USER = "EXT_PERM_USER"
_SERVICE_ACCOUNT = "SERVICE_ACCOUNT"
_ANONYMOUS = "ANONYMOUS"
# Well-known anonymous user UUID
ANONYMOUS_USER_ID = "00000000-0000-0000-0000-000000000002"
# Email pattern for API key virtual users
API_KEY_EMAIL_PATTERN = r"API\_KEY\_\_%"
# Reflect the table structure for use in DML
user_table = sa.table(
"user",
sa.column("id", sa.Uuid),
sa.column("email", sa.String),
sa.column("role", sa.String),
sa.column("account_type", sa.String),
)
def upgrade() -> None:
# ------------------------------------------------------------------
# Step 1: Backfill account_type from role.
# Order matters — most-specific matches first so the final catch-all
# only touches rows that haven't been classified yet.
# ------------------------------------------------------------------
# 1a. API key virtual users → SERVICE_ACCOUNT
op.execute(
sa.update(user_table)
.where(
user_table.c.email.ilike(API_KEY_EMAIL_PATTERN),
user_table.c.account_type.is_(None),
)
.values(account_type=_SERVICE_ACCOUNT)
)
# 1b. Anonymous user → ANONYMOUS
op.execute(
sa.update(user_table)
.where(
user_table.c.id == ANONYMOUS_USER_ID,
user_table.c.account_type.is_(None),
)
.values(account_type=_ANONYMOUS)
)
# 1c. SLACK_USER role → BOT
op.execute(
sa.update(user_table)
.where(
user_table.c.role == "SLACK_USER",
user_table.c.account_type.is_(None),
)
.values(account_type=_BOT)
)
# 1d. EXT_PERM_USER role → EXT_PERM_USER
op.execute(
sa.update(user_table)
.where(
user_table.c.role == "EXT_PERM_USER",
user_table.c.account_type.is_(None),
)
.values(account_type=_EXT_PERM_USER)
)
# 1e. Everything else → STANDARD
op.execute(
sa.update(user_table)
.where(user_table.c.account_type.is_(None))
.values(account_type=_STANDARD)
)
# ------------------------------------------------------------------
# Step 2: Set account_type to NOT NULL now that every row is filled.
# ------------------------------------------------------------------
op.alter_column(
"user",
"account_type",
nullable=False,
server_default="STANDARD",
)
def downgrade() -> None:
op.alter_column("user", "account_type", nullable=True, server_default=None)
op.execute(sa.update(user_table).values(account_type=None))

View File

@@ -0,0 +1,35 @@
"""remove voice_provider deleted column
Revision ID: 1d78c0ca7853
Revises: a3f8b2c1d4e5
Create Date: 2026-03-26 11:30:53.883127
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "1d78c0ca7853"
down_revision = "a3f8b2c1d4e5"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Hard-delete any soft-deleted rows before dropping the column
op.execute("DELETE FROM voice_provider WHERE deleted = true")
op.drop_column("voice_provider", "deleted")
def downgrade() -> None:
op.add_column(
"voice_provider",
sa.Column(
"deleted",
sa.Boolean(),
nullable=False,
server_default=sa.text("false"),
),
)

View File

@@ -0,0 +1,109 @@
"""group_permissions_phase1
Revision ID: 25a5501dc766
Revises: b728689f45b1
Create Date: 2026-03-23 11:41:25.557442
"""
from alembic import op
import fastapi_users_db_sqlalchemy
import sqlalchemy as sa
from onyx.db.enums import AccountType
from onyx.db.enums import GrantSource
from onyx.db.enums import Permission
# revision identifiers, used by Alembic.
revision = "25a5501dc766"
down_revision = "b728689f45b1"
branch_labels = None
depends_on = None
def upgrade() -> None:
# 1. Add account_type column to user table (nullable for now).
# TODO(subash): backfill account_type for existing rows and add NOT NULL.
op.add_column(
"user",
sa.Column(
"account_type",
sa.Enum(AccountType, native_enum=False),
nullable=True,
),
)
# 2. Add is_default column to user_group table
op.add_column(
"user_group",
sa.Column(
"is_default",
sa.Boolean(),
nullable=False,
server_default=sa.false(),
),
)
# 3. Create permission_grant table
op.create_table(
"permission_grant",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("group_id", sa.Integer(), nullable=False),
sa.Column(
"permission",
sa.Enum(Permission, native_enum=False),
nullable=False,
),
sa.Column(
"grant_source",
sa.Enum(GrantSource, native_enum=False),
nullable=False,
),
sa.Column(
"granted_by",
fastapi_users_db_sqlalchemy.generics.GUID(),
nullable=True,
),
sa.Column(
"granted_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"is_deleted",
sa.Boolean(),
nullable=False,
server_default=sa.false(),
),
sa.PrimaryKeyConstraint("id"),
sa.ForeignKeyConstraint(
["group_id"],
["user_group.id"],
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["granted_by"],
["user.id"],
ondelete="SET NULL",
),
sa.UniqueConstraint(
"group_id", "permission", name="uq_permission_grant_group_permission"
),
)
# 4. Index on user__user_group(user_id) — existing composite PK
# has user_group_id as leading column; user-filtered queries need this
op.create_index(
"ix_user__user_group_user_id",
"user__user_group",
["user_id"],
)
def downgrade() -> None:
op.drop_index("ix_user__user_group_user_id", table_name="user__user_group")
op.drop_table("permission_grant")
op.drop_column("user_group", "is_default")
op.drop_column("user", "account_type")

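The comment in step 4 above explains that the composite primary key on `user__user_group` leads with `user_group_id`, so lookups filtered by `user_id` need the new index. A small sketch of that kind of lookup, reusing the table, column, and index names from the migration; the helper itself is illustrative and not part of the diff:

```python
import uuid

import sqlalchemy as sa

user__user_group = sa.table(
    "user__user_group",
    sa.column("user_id", sa.Uuid),
    sa.column("user_group_id", sa.Integer),
)


def group_ids_for_user(conn: sa.engine.Connection, user_id: uuid.UUID) -> list[int]:
    # Filters on user_id only, so it is served by ix_user__user_group_user_id
    # rather than the composite PK whose leading column is user_group_id.
    rows = conn.execute(
        sa.select(user__user_group.c.user_group_id).where(
            user__user_group.c.user_id == user_id
        )
    )
    return [row.user_group_id for row in rows]
```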
View File

@@ -0,0 +1,104 @@
"""add_effective_permissions
Adds a JSONB column `effective_permissions` to the user table to store
directly granted permissions (e.g. ["admin"] or ["basic"]). Implied
permissions are expanded at read time, not stored.
Backfill: joins user__user_group → permission_grant to collect each
user's granted permissions into a JSON array. Users without group
memberships keep the default [].
Revision ID: 503883791c39
Revises: b4b7e1028dfd
Create Date: 2026-03-30 14:49:22.261748
"""
from collections.abc import Sequence
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "503883791c39"
down_revision = "b4b7e1028dfd"
branch_labels: str | None = None
depends_on: str | Sequence[str] | None = None
user_table = sa.table(
"user",
sa.column("id", sa.Uuid),
sa.column("effective_permissions", postgresql.JSONB),
)
user_user_group = sa.table(
"user__user_group",
sa.column("user_id", sa.Uuid),
sa.column("user_group_id", sa.Integer),
)
permission_grant = sa.table(
"permission_grant",
sa.column("group_id", sa.Integer),
sa.column("permission", sa.String),
sa.column("is_deleted", sa.Boolean),
)
def upgrade() -> None:
op.add_column(
"user",
sa.Column(
"effective_permissions",
postgresql.JSONB(),
nullable=False,
server_default=sa.text("'[]'::jsonb"),
),
)
conn = op.get_bind()
# Deduplicated permissions per user
deduped = (
sa.select(
user_user_group.c.user_id,
permission_grant.c.permission,
)
.select_from(
user_user_group.join(
permission_grant,
sa.and_(
permission_grant.c.group_id == user_user_group.c.user_group_id,
permission_grant.c.is_deleted == sa.false(),
),
)
)
.distinct()
.subquery("deduped")
)
# Aggregate into JSONB array per user (order is not guaranteed;
# consumers read this as a set so ordering does not matter)
perms_per_user = (
sa.select(
deduped.c.user_id,
sa.func.jsonb_agg(
deduped.c.permission,
type_=postgresql.JSONB,
).label("perms"),
)
.group_by(deduped.c.user_id)
.subquery("sub")
)
conn.execute(
user_table.update()
.where(user_table.c.id == perms_per_user.c.user_id)
.values(effective_permissions=perms_per_user.c.perms)
)
def downgrade() -> None:
op.drop_column("user", "effective_permissions")

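The docstring above notes that only directly granted permissions are stored and that implied permissions are expanded at read time; the commit list also includes a rename of `IMPLIES` to `IMPLIED_PERMISSIONS` (a71c18c454). A hypothetical sketch of what such read-time expansion could look like; the mapping contents and the helper are illustrative, and only the stored-list shape and the `IMPLIED_PERMISSIONS` name come from this compare view:

```python
# Hypothetical: the real Onyx mapping and expansion logic are not shown in this
# diff; only the name IMPLIED_PERMISSIONS and the stored ["admin"] / ["basic"]
# shape appear in the compare view above.
IMPLIED_PERMISSIONS: dict[str, set[str]] = {
    "admin": {"basic"},  # assumed for illustration
}


def expand_permissions(stored: list[str]) -> set[str]:
    """Expand directly granted permissions (the JSONB effective_permissions
    value, e.g. ["admin"]) into the full set including implied permissions."""
    expanded: set[str] = set(stored)
    frontier = list(stored)
    while frontier:
        perm = frontier.pop()
        for implied in IMPLIED_PERMISSIONS.get(perm, set()):
            if implied not in expanded:
                expanded.add(implied)
                frontier.append(implied)
    return expanded
```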
View File

@@ -0,0 +1,54 @@
"""csv to tabular chat file type
Revision ID: 8188861f4e92
Revises: d8cdfee5df80
Create Date: 2026-03-31 19:23:05.753184
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "8188861f4e92"
down_revision = "d8cdfee5df80"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute(
"""
UPDATE chat_message
SET files = (
SELECT jsonb_agg(
CASE
WHEN elem->>'type' = 'csv'
THEN jsonb_set(elem, '{type}', '"tabular"')
ELSE elem
END
)
FROM jsonb_array_elements(files) AS elem
)
WHERE files::text LIKE '%"type": "csv"%'
"""
)
def downgrade() -> None:
op.execute(
"""
UPDATE chat_message
SET files = (
SELECT jsonb_agg(
CASE
WHEN elem->>'type' = 'tabular'
THEN jsonb_set(elem, '{type}', '"csv"')
ELSE elem
END
)
FROM jsonb_array_elements(files) AS elem
)
WHERE files::text LIKE '%"type": "tabular"%'
"""
)

View File

@@ -0,0 +1,136 @@
"""seed_default_groups
Revision ID: 977e834c1427
Revises: 8188861f4e92
Create Date: 2026-03-25 14:59:41.313091
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert as pg_insert
# revision identifiers, used by Alembic.
revision = "977e834c1427"
down_revision = "8188861f4e92"
branch_labels = None
depends_on = None
# (group_name, permission_value)
DEFAULT_GROUPS = [
("Admin", "admin"),
("Basic", "basic"),
]
CUSTOM_SUFFIX = "(Custom)"
MAX_RENAME_ATTEMPTS = 100
# Reflect table structures for use in DML
user_group_table = sa.table(
"user_group",
sa.column("id", sa.Integer),
sa.column("name", sa.String),
sa.column("is_up_to_date", sa.Boolean),
sa.column("is_up_for_deletion", sa.Boolean),
sa.column("is_default", sa.Boolean),
)
permission_grant_table = sa.table(
"permission_grant",
sa.column("group_id", sa.Integer),
sa.column("permission", sa.String),
sa.column("grant_source", sa.String),
)
user__user_group_table = sa.table(
"user__user_group",
sa.column("user_group_id", sa.Integer),
sa.column("user_id", sa.Uuid),
)
def _find_available_name(conn: sa.engine.Connection, base: str) -> str:
"""Return a name like 'Admin (Custom)' or 'Admin (Custom 2)' that is not taken."""
candidate = f"{base} {CUSTOM_SUFFIX}"
attempt = 1
while attempt <= MAX_RENAME_ATTEMPTS:
exists = conn.execute(
sa.select(sa.literal(1))
.select_from(user_group_table)
.where(user_group_table.c.name == candidate)
.limit(1)
).fetchone()
if exists is None:
return candidate
attempt += 1
candidate = f"{base} (Custom {attempt})"
raise RuntimeError(
f"Could not find an available name for group '{base}' "
f"after {MAX_RENAME_ATTEMPTS} attempts"
)
def upgrade() -> None:
conn = op.get_bind()
for group_name, permission_value in DEFAULT_GROUPS:
# Step 1: Rename ALL existing groups that clash with the canonical name.
conflicting = conn.execute(
sa.select(user_group_table.c.id, user_group_table.c.name).where(
user_group_table.c.name == group_name
)
).fetchall()
for row_id, row_name in conflicting:
new_name = _find_available_name(conn, row_name)
op.execute(
sa.update(user_group_table)
.where(user_group_table.c.id == row_id)
.values(name=new_name, is_up_to_date=False)
)
# Step 2: Create a fresh default group.
result = conn.execute(
user_group_table.insert()
.values(
name=group_name,
is_up_to_date=True,
is_up_for_deletion=False,
is_default=True,
)
.returning(user_group_table.c.id)
).fetchone()
assert result is not None
group_id = result[0]
# Step 3: Upsert permission grant.
op.execute(
pg_insert(permission_grant_table)
.values(
group_id=group_id,
permission=permission_value,
grant_source="SYSTEM",
)
.on_conflict_do_nothing(index_elements=["group_id", "permission"])
)
def downgrade() -> None:
# Remove the default groups created by this migration.
# First remove user-group memberships that reference default groups
# to avoid FK violations, then delete the groups themselves.
default_group_ids = sa.select(user_group_table.c.id).where(
user_group_table.c.is_default == True # noqa: E712
)
op.execute(
sa.delete(user__user_group_table).where(
user__user_group_table.c.user_group_id.in_(default_group_ids)
)
)
op.execute(
sa.delete(user_group_table).where(
user_group_table.c.is_default == True # noqa: E712
)
)
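# Illustration (not part of the migration): a worked trace of _find_available_name,
# assuming hypothetical pre-existing group names.
#   existing groups: "Admin", "Admin (Custom)"
#   candidate "Admin (Custom)"   -> taken, attempt becomes 2
#   candidate "Admin (Custom 2)" -> free, returned and used for the Step 1 rename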

View File

@@ -0,0 +1,36 @@
"""add preferred_response_id and model_display_name to chat_message
Revision ID: a3f8b2c1d4e5
Create Date: 2026-03-22
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "a3f8b2c1d4e5"
down_revision = "25a5501dc766"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"chat_message",
sa.Column(
"preferred_response_id",
sa.Integer(),
sa.ForeignKey("chat_message.id", ondelete="SET NULL"),
nullable=True,
),
)
op.add_column(
"chat_message",
sa.Column("model_display_name", sa.String(), nullable=True),
)
def downgrade() -> None:
op.drop_column("chat_message", "model_display_name")
op.drop_column("chat_message", "preferred_response_id")

View File

@@ -0,0 +1,84 @@
"""grant_basic_to_existing_groups
Grants the "basic" permission to all existing groups that don't already
have it. Every group should have at least "basic" so that its members
get basic access when effective_permissions is backfilled.
Revision ID: b4b7e1028dfd
Revises: b7bcc991d722
Create Date: 2026-03-30 16:15:17.093498
"""
from collections.abc import Sequence
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "b4b7e1028dfd"
down_revision = "b7bcc991d722"
branch_labels: str | None = None
depends_on: str | Sequence[str] | None = None
user_group = sa.table(
"user_group",
sa.column("id", sa.Integer),
sa.column("is_default", sa.Boolean),
)
permission_grant = sa.table(
"permission_grant",
sa.column("group_id", sa.Integer),
sa.column("permission", sa.String),
sa.column("grant_source", sa.String),
sa.column("is_deleted", sa.Boolean),
)
def upgrade() -> None:
conn = op.get_bind()
already_has_basic = (
sa.select(sa.literal(1))
.select_from(permission_grant)
.where(
permission_grant.c.group_id == user_group.c.id,
permission_grant.c.permission == "basic",
)
.exists()
)
groups_needing_basic = sa.select(
user_group.c.id,
sa.literal("basic").label("permission"),
sa.literal("SYSTEM").label("grant_source"),
sa.literal(False).label("is_deleted"),
).where(
user_group.c.is_default == sa.false(),
~already_has_basic,
)
conn.execute(
permission_grant.insert().from_select(
["group_id", "permission", "grant_source", "is_deleted"],
groups_needing_basic,
)
)
def downgrade() -> None:
conn = op.get_bind()
non_default_group_ids = sa.select(user_group.c.id).where(
user_group.c.is_default == sa.false()
)
conn.execute(
permission_grant.delete().where(
permission_grant.c.permission == "basic",
permission_grant.c.grant_source == "SYSTEM",
permission_grant.c.group_id.in_(non_default_group_ids),
)
)
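# Illustration (not part of the migration): the insert-from-select above compiles to
# SQL roughly of this shape — a sketch, not the exact statement SQLAlchemy emits.
#   INSERT INTO permission_grant (group_id, permission, grant_source, is_deleted)
#   SELECT user_group.id, 'basic', 'SYSTEM', false
#   FROM user_group
#   WHERE user_group.is_default = false
#     AND NOT EXISTS (
#       SELECT 1 FROM permission_grant
#       WHERE permission_grant.group_id = user_group.id
#         AND permission_grant.permission = 'basic'
#     )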

View File

@@ -0,0 +1,26 @@
"""rename persona is_visible to is_listed and featured to is_featured
Revision ID: b728689f45b1
Revises: 689433b0d8de
Create Date: 2026-03-23 12:36:26.607305
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "b728689f45b1"
down_revision = "689433b0d8de"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.alter_column("persona", "is_visible", new_column_name="is_listed")
op.alter_column("persona", "featured", new_column_name="is_featured")
def downgrade() -> None:
op.alter_column("persona", "is_listed", new_column_name="is_visible")
op.alter_column("persona", "is_featured", new_column_name="featured")

View File

@@ -0,0 +1,116 @@
"""assign_users_to_default_groups
Revision ID: b7bcc991d722
Revises: 03d085c5c38d
Create Date: 2026-03-25 16:30:39.529301
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert as pg_insert
# revision identifiers, used by Alembic.
revision = "b7bcc991d722"
down_revision = "03d085c5c38d"
branch_labels = None
depends_on = None
# Reflect table structures for use in DML
user_group_table = sa.table(
"user_group",
sa.column("id", sa.Integer),
sa.column("name", sa.String),
sa.column("is_default", sa.Boolean),
)
user_table = sa.table(
"user",
sa.column("id", sa.Uuid),
sa.column("role", sa.String),
sa.column("account_type", sa.String),
sa.column("is_active", sa.Boolean),
)
user__user_group_table = sa.table(
"user__user_group",
sa.column("user_group_id", sa.Integer),
sa.column("user_id", sa.Uuid),
)
def upgrade() -> None:
conn = op.get_bind()
# Look up default group IDs
admin_row = conn.execute(
sa.select(user_group_table.c.id).where(
user_group_table.c.name == "Admin",
user_group_table.c.is_default == True, # noqa: E712
)
).fetchone()
basic_row = conn.execute(
sa.select(user_group_table.c.id).where(
user_group_table.c.name == "Basic",
user_group_table.c.is_default == True, # noqa: E712
)
).fetchone()
if admin_row is None:
raise RuntimeError(
"Default 'Admin' group not found. "
"Ensure migration 977e834c1427 (seed_default_groups) ran successfully."
)
if basic_row is None:
raise RuntimeError(
"Default 'Basic' group not found. "
"Ensure migration 977e834c1427 (seed_default_groups) ran successfully."
)
# Users with role=admin → Admin group
# Exclude inactive placeholder/anonymous users that are not real users
admin_users = sa.select(
sa.literal(admin_row[0]).label("user_group_id"),
user_table.c.id.label("user_id"),
).where(
user_table.c.role == "ADMIN",
user_table.c.is_active == True, # noqa: E712
)
op.execute(
pg_insert(user__user_group_table)
.from_select(["user_group_id", "user_id"], admin_users)
.on_conflict_do_nothing(index_elements=["user_group_id", "user_id"])
)
# STANDARD users (non-admin) and SERVICE_ACCOUNT users (role=basic) → Basic group
# Exclude inactive placeholder/anonymous users that are not real users
basic_users = sa.select(
sa.literal(basic_row[0]).label("user_group_id"),
user_table.c.id.label("user_id"),
).where(
user_table.c.is_active == True, # noqa: E712
sa.or_(
sa.and_(
user_table.c.account_type == "STANDARD",
user_table.c.role != "ADMIN",
),
sa.and_(
user_table.c.account_type == "SERVICE_ACCOUNT",
user_table.c.role == "BASIC",
),
),
)
op.execute(
pg_insert(user__user_group_table)
.from_select(["user_group_id", "user_id"], basic_users)
.on_conflict_do_nothing(index_elements=["user_group_id", "user_id"])
)
def downgrade() -> None:
# Group memberships are left in place — removing them risks
# deleting memberships that existed before this migration.
pass
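# Illustration (not part of the migration): the selection rules above, summarized.
#   role = ADMIN, is_active = true                                 -> Admin group
#   account_type = STANDARD, role != ADMIN, is_active = true       -> Basic group
#   account_type = SERVICE_ACCOUNT, role = BASIC, is_active = true -> Basic group
#   is_active = false (placeholder/anonymous users)                -> not assigned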

View File

@@ -0,0 +1,55 @@
"""add skipped to userfilestatus
Revision ID: d8cdfee5df80
Revises: 1d78c0ca7853
Create Date: 2026-04-01 10:47:12.593950
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "d8cdfee5df80"
down_revision = "1d78c0ca7853"
branch_labels = None
depends_on = None
TABLE = "user_file"
COLUMN = "status"
CONSTRAINT_NAME = "ck_user_file_status"
OLD_VALUES = ("PROCESSING", "INDEXING", "COMPLETED", "FAILED", "CANCELED", "DELETING")
NEW_VALUES = (
"PROCESSING",
"INDEXING",
"COMPLETED",
"SKIPPED",
"FAILED",
"CANCELED",
"DELETING",
)
def _drop_status_check_constraint() -> None:
inspector = sa.inspect(op.get_bind())
for constraint in inspector.get_check_constraints(TABLE):
if COLUMN in constraint.get("sqltext", ""):
constraint_name = constraint["name"]
if constraint_name is not None:
op.drop_constraint(constraint_name, TABLE, type_="check")
def upgrade() -> None:
_drop_status_check_constraint()
in_clause = ", ".join(f"'{v}'" for v in NEW_VALUES)
op.create_check_constraint(CONSTRAINT_NAME, TABLE, f"{COLUMN} IN ({in_clause})")
def downgrade() -> None:
op.execute(f"UPDATE {TABLE} SET {COLUMN} = 'COMPLETED' WHERE {COLUMN} = 'SKIPPED'")
_drop_status_check_constraint()
in_clause = ", ".join(f"'{v}'" for v in OLD_VALUES)
op.create_check_constraint(CONSTRAINT_NAME, TABLE, f"{COLUMN} IN ({in_clause})")

View File

@@ -36,6 +36,56 @@ TABLES_WITH_USER_ID = [
]
def _dedupe_null_notifications(connection: sa.Connection) -> None:
# Multiple NULL-owned notifications can exist because the unique index treats
# NULL user_id values as distinct. Before migrating them to the anonymous
# user, collapse duplicates and remove rows that would conflict with an
# already-existing anonymous notification.
result = connection.execute(
sa.text(
"""
WITH ranked_null_notifications AS (
SELECT
id,
ROW_NUMBER() OVER (
PARTITION BY notif_type, COALESCE(additional_data, '{}'::jsonb)
ORDER BY first_shown DESC, last_shown DESC, id DESC
) AS row_num
FROM notification
WHERE user_id IS NULL
)
DELETE FROM notification
WHERE id IN (
SELECT id
FROM ranked_null_notifications
WHERE row_num > 1
)
"""
)
)
if result.rowcount > 0:
print(f"Deleted {result.rowcount} duplicate NULL-owned notifications")
result = connection.execute(
sa.text(
"""
DELETE FROM notification AS null_owned
USING notification AS anonymous_owned
WHERE null_owned.user_id IS NULL
AND anonymous_owned.user_id = :user_id
AND null_owned.notif_type = anonymous_owned.notif_type
AND COALESCE(null_owned.additional_data, '{}'::jsonb) =
COALESCE(anonymous_owned.additional_data, '{}'::jsonb)
"""
),
{"user_id": ANONYMOUS_USER_UUID},
)
if result.rowcount > 0:
print(
f"Deleted {result.rowcount} NULL-owned notifications that conflict with existing anonymous-owned notifications"
)
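# Illustration (hypothetical rows): within each (notif_type, additional_data) partition of
# NULL-owned notifications, the row with the latest first_shown (ties broken by last_shown,
# then id) gets row_num = 1 and is kept; every other row gets row_num > 1 and is deleted.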
def upgrade() -> None:
"""
Create the anonymous user for anonymous access feature.
@@ -65,7 +115,12 @@ def upgrade() -> None:
# Migrate any remaining user_id=NULL records to anonymous user
for table in TABLES_WITH_USER_ID:
try:
# Dedup notifications outside the savepoint so deletions persist
# even if the subsequent UPDATE rolls back
if table == "notification":
_dedupe_null_notifications(connection)
with connection.begin_nested():
# Exclude public credential (id=0) which must remain user_id=NULL
# Exclude builtin tools (in_code_tool_id IS NOT NULL) which must remain user_id=NULL
# Exclude builtin personas (builtin_persona=True) which must remain user_id=NULL
@@ -80,6 +135,7 @@ def upgrade() -> None:
condition = "user_id IS NULL AND is_public = false"
else:
condition = "user_id IS NULL"
result = connection.execute(
sa.text(
f"""
@@ -92,19 +148,19 @@ def upgrade() -> None:
)
if result.rowcount > 0:
print(f"Updated {result.rowcount} rows in {table} to anonymous user")
except Exception as e:
print(f"Skipping {table}: {e}")
def downgrade() -> None:
"""
Set anonymous user's records back to NULL and delete the anonymous user.
Note: Duplicate NULL-owned notifications removed during upgrade are not restored.
"""
connection = op.get_bind()
# Set records back to NULL
for table in TABLES_WITH_USER_ID:
try:
with connection.begin_nested():
connection.execute(
sa.text(
f"""
@@ -115,8 +171,6 @@ def downgrade() -> None:
),
{"user_id": ANONYMOUS_USER_UUID},
)
except Exception:
pass
# Delete the anonymous user
connection.execute(

View File

@@ -5,6 +5,7 @@ from onyx.background.celery.apps.primary import celery_app
celery_app.autodiscover_tasks(
app_base.filter_task_modules(
[
"ee.onyx.background.celery.tasks.hooks",
"ee.onyx.background.celery.tasks.doc_permission_syncing",
"ee.onyx.background.celery.tasks.external_group_syncing",
"ee.onyx.background.celery.tasks.cloud",

View File

@@ -55,6 +55,15 @@ ee_tasks_to_schedule: list[dict] = []
if not MULTI_TENANT:
ee_tasks_to_schedule = [
{
"name": "hook-execution-log-cleanup",
"task": OnyxCeleryTask.HOOK_EXECUTION_LOG_CLEANUP_TASK,
"schedule": timedelta(days=1),
"options": {
"priority": OnyxCeleryPriority.LOW,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
"name": "autogenerate-usage-report",
"task": OnyxCeleryTask.GENERATE_USAGE_REPORT_TASK,

View File

@@ -28,6 +28,7 @@ from onyx.access.models import DocExternalAccess
from onyx.access.models import ElementExternalAccess
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_queued_task_ids
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
@@ -187,7 +188,6 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str) -> bool | None
# (which lives on a different db number)
r = get_redis_client()
r_replica = get_redis_replica_client()
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_DOC_PERMISSIONS_SYNC_BEAT_LOCK,
@@ -227,6 +227,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str) -> bool | None
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
r_celery = celery_get_broker_client(self.app)
validate_permission_sync_fences(
tenant_id, r, r_replica, r_celery, lock_beat
)
@@ -473,6 +474,8 @@ def connector_permission_sync_generator_task(
cc_pair = get_connector_credential_pair_from_id(
db_session=db_session,
cc_pair_id=cc_pair_id,
eager_load_connector=True,
eager_load_credential=True,
)
if cc_pair is None:
raise ValueError(

View File

@@ -29,6 +29,7 @@ from ee.onyx.external_permissions.sync_params import (
from ee.onyx.external_permissions.sync_params import get_source_perm_sync_config
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
from onyx.background.error_logging import emit_background_error
@@ -162,7 +163,6 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str) -> bool | None:
# (which lives on a different db number)
r = get_redis_client()
r_replica = get_redis_replica_client()
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_EXTERNAL_GROUP_SYNC_BEAT_LOCK,
@@ -221,6 +221,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str) -> bool | None:
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
r_celery = celery_get_broker_client(self.app)
validate_external_group_sync_fences(
tenant_id, self.app, r, r_replica, r_celery, lock_beat
)

View File

@@ -13,6 +13,7 @@ from redis.lock import Lock as RedisLock
from ee.onyx.server.tenants.provisioning import setup_tenant
from ee.onyx.server.tenants.schema_management import create_schema_if_not_exists
from ee.onyx.server.tenants.schema_management import get_current_alembic_version
from ee.onyx.server.tenants.schema_management import run_alembic_migrations
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.app_configs import TARGET_AVAILABLE_TENANTS
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
@@ -25,10 +26,14 @@ from onyx.redis.redis_pool import get_redis_client
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import TENANT_ID_PREFIX
# Soft time limit for tenant pre-provisioning tasks (in seconds)
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes
# Hard time limit for tenant pre-provisioning tasks (in seconds)
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 10 # 10 minutes
# Maximum tenants to provision in a single task run.
# Each tenant takes ~80s (alembic migrations), so 5 tenants ≈ 7 minutes.
_MAX_TENANTS_PER_RUN = 5
# Time limits sized for worst-case: provisioning up to _MAX_TENANTS_PER_RUN new tenants
# (~90s each) plus migrating up to TARGET_AVAILABLE_TENANTS pool tenants (~90s each).
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 20 # 20 minutes
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 25 # 25 minutes
@shared_task(
@@ -85,9 +90,27 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
f"To provision: {tenants_to_provision}"
)
# just provision one tenant each time we run this ... increase if needed.
if tenants_to_provision > 0:
pre_provision_tenant()
batch_size = min(tenants_to_provision, _MAX_TENANTS_PER_RUN)
if batch_size < tenants_to_provision:
task_logger.info(
f"Capping batch to {batch_size} (need {tenants_to_provision}, will catch up next cycle)"
)
provisioned = 0
for i in range(batch_size):
task_logger.info(f"Provisioning tenant {i + 1}/{batch_size}")
try:
if pre_provision_tenant():
provisioned += 1
except Exception:
task_logger.exception(
f"Failed to provision tenant {i + 1}/{batch_size}, continuing with remaining tenants"
)
task_logger.info(f"Provisioning complete: {provisioned}/{batch_size} succeeded")
# Migrate any pool tenants that were provisioned before a new migration was deployed
_migrate_stale_pool_tenants()
except Exception:
task_logger.exception("Error in check_available_tenants task")
@@ -101,11 +124,53 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
)
def pre_provision_tenant() -> None:
def _migrate_stale_pool_tenants() -> None:
"""
Run alembic upgrade head on all pool tenants. Since alembic upgrade head is
idempotent, tenants already at head are a fast no-op. This ensures pool
tenants are always current so that signup doesn't hit schema mismatches
(e.g. missing columns added after the tenant was pre-provisioned).
"""
with get_session_with_shared_schema() as db_session:
pool_tenants = db_session.query(AvailableTenant).all()
tenant_ids = [t.tenant_id for t in pool_tenants]
if not tenant_ids:
return
task_logger.info(
f"Checking {len(tenant_ids)} pool tenant(s) for pending migrations"
)
for tenant_id in tenant_ids:
try:
run_alembic_migrations(tenant_id)
new_version = get_current_alembic_version(tenant_id)
with get_session_with_shared_schema() as db_session:
tenant = (
db_session.query(AvailableTenant)
.filter_by(tenant_id=tenant_id)
.first()
)
if tenant and tenant.alembic_version != new_version:
task_logger.info(
f"Migrated pool tenant {tenant_id}: {tenant.alembic_version} -> {new_version}"
)
tenant.alembic_version = new_version
db_session.commit()
except Exception:
task_logger.exception(
f"Failed to migrate pool tenant {tenant_id}, skipping"
)
def pre_provision_tenant() -> bool:
"""
Pre-provision a new tenant and store it in the NewAvailableTenant table.
This function fully sets up the tenant with all necessary configurations,
so it's ready to be assigned to a user immediately.
Returns True if a tenant was successfully provisioned, False otherwise.
"""
# The MULTI_TENANT check is now done at the caller level (check_available_tenants)
# rather than inside this function
@@ -118,10 +183,10 @@ def pre_provision_tenant() -> None:
# Allow multiple pre-provisioning tasks to run, but ensure they don't overlap
if not lock_provision.acquire(blocking=False):
task_logger.debug(
"Skipping pre_provision_tenant task because it is already running"
task_logger.warning(
"Skipping pre_provision_tenant — could not acquire provision lock"
)
return
return False
tenant_id: str | None = None
try:
@@ -161,6 +226,7 @@ def pre_provision_tenant() -> None:
db_session.add(new_tenant)
db_session.commit()
task_logger.info(f"Successfully pre-provisioned tenant: {tenant_id}")
return True
except Exception:
db_session.rollback()
task_logger.error(
@@ -184,6 +250,7 @@ def pre_provision_tenant() -> None:
asyncio.run(rollback_tenant_provisioning(tenant_id))
except Exception:
task_logger.exception(f"Error during rollback for tenant: {tenant_id}")
return False
finally:
try:
lock_provision.release()

View File

@@ -69,5 +69,7 @@ EE_ONLY_PATH_PREFIXES: frozenset[str] = frozenset(
"/admin/token-rate-limits",
# Evals
"/evals",
# Hook extensions
"/admin/hooks",
}
)

View File

@@ -115,8 +115,14 @@ def fetch_user_group_token_rate_limits_for_user(
ordered: bool = True,
get_editable: bool = True,
) -> Sequence[TokenRateLimit]:
stmt = select(TokenRateLimit)
stmt = stmt.where(User__UserGroup.user_group_id == group_id)
stmt = (
select(TokenRateLimit)
.join(
TokenRateLimit__UserGroup,
TokenRateLimit.id == TokenRateLimit__UserGroup.rate_limit_id,
)
.where(TokenRateLimit__UserGroup.user_group_id == group_id)
)
stmt = _add_user_filters(stmt, user, get_editable)
if enabled_only:

View File

@@ -19,6 +19,8 @@ from onyx.configs.app_configs import DISABLE_VECTOR_DB
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import GrantSource
from onyx.db.enums import Permission
from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import Credential
from onyx.db.models import Credential__UserGroup
@@ -28,6 +30,7 @@ from onyx.db.models import DocumentSet
from onyx.db.models import DocumentSet__UserGroup
from onyx.db.models import FederatedConnector__DocumentSet
from onyx.db.models import LLMProvider__UserGroup
from onyx.db.models import PermissionGrant
from onyx.db.models import Persona
from onyx.db.models import Persona__UserGroup
from onyx.db.models import TokenRateLimit__UserGroup
@@ -36,6 +39,7 @@ from onyx.db.models import User__UserGroup
from onyx.db.models import UserGroup
from onyx.db.models import UserGroup__ConnectorCredentialPair
from onyx.db.models import UserRole
from onyx.db.permissions import recompute_user_permissions__no_commit
from onyx.db.users import fetch_user_by_id
from onyx.utils.logger import setup_logger
@@ -255,6 +259,7 @@ def fetch_user_groups(
db_session: Session,
only_up_to_date: bool = True,
eager_load_for_snapshot: bool = False,
include_default: bool = True,
) -> Sequence[UserGroup]:
"""
Fetches user groups from the database.
@@ -269,6 +274,7 @@ def fetch_user_groups(
to include only up to date user groups. Defaults to `True`.
eager_load_for_snapshot: If True, adds eager loading for all relationships
needed by UserGroup.from_model snapshot creation.
include_default: If False, excludes system default groups (is_default=True).
Returns:
Sequence[UserGroup]: A sequence of `UserGroup` objects matching the query criteria.
@@ -276,6 +282,8 @@ def fetch_user_groups(
stmt = select(UserGroup)
if only_up_to_date:
stmt = stmt.where(UserGroup.is_up_to_date == True) # noqa: E712
if not include_default:
stmt = stmt.where(UserGroup.is_default == False) # noqa: E712
if eager_load_for_snapshot:
stmt = _add_user_group_snapshot_eager_loads(stmt)
return db_session.scalars(stmt).unique().all()
@@ -286,6 +294,7 @@ def fetch_user_groups_for_user(
user_id: UUID,
only_curator_groups: bool = False,
eager_load_for_snapshot: bool = False,
include_default: bool = True,
) -> Sequence[UserGroup]:
stmt = (
select(UserGroup)
@@ -295,6 +304,8 @@ def fetch_user_groups_for_user(
)
if only_curator_groups:
stmt = stmt.where(User__UserGroup.is_curator == True) # noqa: E712
if not include_default:
stmt = stmt.where(UserGroup.is_default == False) # noqa: E712
if eager_load_for_snapshot:
stmt = _add_user_group_snapshot_eager_loads(stmt)
return db_session.scalars(stmt).unique().all()
@@ -478,6 +489,16 @@ def insert_user_group(db_session: Session, user_group: UserGroupCreate) -> UserG
db_session.add(db_user_group)
db_session.flush() # give the group an ID
# Every group gets the "basic" permission by default
db_session.add(
PermissionGrant(
group_id=db_user_group.id,
permission=Permission.BASIC_ACCESS,
grant_source=GrantSource.SYSTEM,
)
)
db_session.flush()
_add_user__user_group_relationships__no_commit(
db_session=db_session,
user_group_id=db_user_group.id,
@@ -489,6 +510,9 @@ def insert_user_group(db_session: Session, user_group: UserGroupCreate) -> UserG
cc_pair_ids=user_group.cc_pair_ids,
)
for uid in user_group.user_ids:
recompute_user_permissions__no_commit(uid, db_session)
db_session.commit()
return db_user_group
@@ -796,6 +820,36 @@ def update_user_group(
# update "time_updated" to now
db_user_group.time_last_modified_by_user = func.now()
for uid in set(added_user_ids) | set(removed_user_ids):
recompute_user_permissions__no_commit(uid, db_session)
db_session.commit()
return db_user_group
def rename_user_group(
db_session: Session,
user_group_id: int,
new_name: str,
) -> UserGroup:
stmt = select(UserGroup).where(UserGroup.id == user_group_id)
db_user_group = db_session.scalar(stmt)
if db_user_group is None:
raise ValueError(f"UserGroup with id '{user_group_id}' not found")
_check_user_group_is_modifiable(db_user_group)
db_user_group.name = new_name
db_user_group.time_last_modified_by_user = func.now()
# CC pair documents in Vespa contain the group name, so we need to
# trigger a sync to update them with the new name.
_mark_user_group__cc_pair_relationships_outdated__no_commit(
db_session=db_session, user_group_id=user_group_id
)
if not DISABLE_VECTOR_DB:
db_user_group.is_up_to_date = False
db_session.commit()
return db_user_group
@@ -808,6 +862,17 @@ def prepare_user_group_for_deletion(db_session: Session, user_group_id: int) ->
_check_user_group_is_modifiable(db_user_group)
# Collect affected user IDs before cleanup deletes the relationships
affected_user_ids = (
db_session.execute(
select(User__UserGroup.user_id).where(
User__UserGroup.user_group_id == user_group_id
)
)
.scalars()
.all()
)
_mark_user_group__cc_pair_relationships_outdated__no_commit(
db_session=db_session, user_group_id=user_group_id
)
@@ -836,6 +901,11 @@ def prepare_user_group_for_deletion(db_session: Session, user_group_id: int) ->
db_session=db_session, user_group_id=user_group_id
)
# Recompute permissions for affected users now that their
# membership in this group has been removed
for uid in affected_user_ids:
recompute_user_permissions__no_commit(uid, db_session)
db_user_group.is_up_to_date = False
db_user_group.is_up_for_deletion = True
db_session.commit()
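# Illustration (not part of the diff): with these changes, recompute_user_permissions__no_commit
# runs for every affected user on group creation (initial members), on membership add/remove in
# update_user_group, and in prepare_user_group_for_deletion — always before the final commit.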

View File

@@ -250,20 +250,24 @@ def _get_sharepoint_list_item_id(drive_item: DriveItem) -> str | None:
raise e
def _is_public_item(drive_item: DriveItem) -> bool:
is_public = False
def _is_public_item(
drive_item: DriveItem,
treat_sharing_link_as_public: bool = False,
) -> bool:
if not treat_sharing_link_as_public:
return False
try:
permissions = sleep_and_retry(
drive_item.permissions.get_all(page_loaded=lambda _: None), "is_public_item"
)
for permission in permissions:
if permission.link and (
permission.link.scope == "anonymous"
or permission.link.scope == "organization"
if permission.link and permission.link.scope in (
"anonymous",
"organization",
):
is_public = True
break
return is_public
return True
return False
except Exception as e:
logger.error(f"Failed to check if item {drive_item.id} is public: {e}")
return False
@@ -504,6 +508,7 @@ def get_external_access_from_sharepoint(
drive_item: DriveItem | None,
site_page: dict[str, Any] | None,
add_prefix: bool = False,
treat_sharing_link_as_public: bool = False,
) -> ExternalAccess:
"""
Get external access information from SharePoint.
@@ -563,8 +568,7 @@ def get_external_access_from_sharepoint(
)
if drive_item and drive_name:
# Here we check if the item has any public links; if so we return early
is_public = _is_public_item(drive_item)
is_public = _is_public_item(drive_item, treat_sharing_link_as_public)
if is_public:
logger.info(f"Item {drive_item.id} is public")
return ExternalAccess(

View File

@@ -8,6 +8,7 @@ from ee.onyx.external_permissions.slack.utils import fetch_user_id_to_email_map
from onyx.access.models import DocExternalAccess
from onyx.access.models import ExternalAccess
from onyx.connectors.credentials_provider import OnyxDBCredentialsProvider
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import HierarchyNode
from onyx.connectors.slack.connector import get_channels
from onyx.connectors.slack.connector import make_paginated_slack_api_call
@@ -105,9 +106,11 @@ def _get_slack_document_access(
slack_connector: SlackConnector,
channel_permissions: dict[str, ExternalAccess], # noqa: ARG001
callback: IndexingHeartbeatInterface | None,
indexing_start: SecondsSinceUnixEpoch | None = None,
) -> Generator[DocExternalAccess, None, None]:
slim_doc_generator = slack_connector.retrieve_all_slim_docs_perm_sync(
callback=callback
callback=callback,
start=indexing_start,
)
for doc_metadata_batch in slim_doc_generator:
@@ -180,9 +183,15 @@ def slack_doc_sync(
slack_connector = SlackConnector(**cc_pair.connector.connector_specific_config)
slack_connector.set_credentials_provider(provider)
indexing_start_ts: SecondsSinceUnixEpoch | None = (
cc_pair.connector.indexing_start.timestamp()
if cc_pair.connector.indexing_start is not None
else None
)
yield from _get_slack_document_access(
slack_connector,
slack_connector=slack_connector,
channel_permissions=channel_permissions,
callback=callback,
indexing_start=indexing_start_ts,
)

View File

@@ -6,6 +6,7 @@ from onyx.access.models import ElementExternalAccess
from onyx.access.models import ExternalAccess
from onyx.access.models import NodeExternalAccess
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import HierarchyNode
from onyx.db.models import ConnectorCredentialPair
@@ -40,10 +41,19 @@ def generic_doc_sync(
logger.info(f"Starting {doc_source} doc sync for CC Pair ID: {cc_pair.id}")
indexing_start: SecondsSinceUnixEpoch | None = (
cc_pair.connector.indexing_start.timestamp()
if cc_pair.connector.indexing_start is not None
else None
)
newly_fetched_doc_ids: set[str] = set()
logger.info(f"Fetching all slim documents from {doc_source}")
for doc_batch in slim_connector.retrieve_all_slim_docs_perm_sync(callback=callback):
for doc_batch in slim_connector.retrieve_all_slim_docs_perm_sync(
start=indexing_start,
callback=callback,
):
logger.info(f"Got {len(doc_batch)} slim documents from {doc_source}")
if callback:

View File

@@ -0,0 +1,385 @@
"""Hook executor — calls a customer's external HTTP endpoint for a given hook point.
Usage (Celery tasks and FastAPI handlers):
result = execute_hook(
db_session=db_session,
hook_point=HookPoint.QUERY_PROCESSING,
payload={"query": "...", "user_email": "...", "chat_session_id": "..."},
response_type=QueryProcessingResponse,
)
if isinstance(result, HookSkipped):
# no active hook configured — continue with original behavior
...
elif isinstance(result, HookSoftFailed):
# hook failed but fail strategy is SOFT — continue with original behavior
...
else:
# result is a validated Pydantic model instance (response_type)
...
is_reachable update policy
--------------------------
``is_reachable`` on the Hook row is updated selectively — only when the outcome
carries meaningful signal about physical reachability:
NetworkError (DNS, connection refused) → False (cannot reach the server)
HTTP 401 / 403 → False (api_key revoked or invalid)
TimeoutException → None (server may be slow, skip write)
Other HTTP errors (4xx / 5xx) → None (server responded, skip write)
Unknown exception → None (no signal, skip write)
Non-JSON / non-dict response → None (server responded, skip write)
Success (2xx, valid dict) → True (confirmed reachable)
None means "leave the current value unchanged" — no DB round-trip is made.
DB session design
-----------------
The executor uses three sessions:
1. Caller's session (db_session) — used only for the hook lookup read. All
needed fields are extracted from the Hook object before the HTTP call, so
the caller's session is not held open during the external HTTP request.
2. Log session — a separate short-lived session opened after the HTTP call
completes to write the HookExecutionLog row on failure. Success runs are
not recorded. Committed independently of everything else.
3. Reachable session — a second short-lived session to update is_reachable on
the Hook. Kept separate from the log session so a concurrent hook deletion
(which causes update_hook__no_commit to raise OnyxError(NOT_FOUND)) cannot
prevent the execution log from being written. This update is best-effort.
"""
import json
import time
from typing import Any
from typing import TypeVar
import httpx
from pydantic import BaseModel
from pydantic import ValidationError
from sqlalchemy.orm import Session
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.enums import HookFailStrategy
from onyx.db.enums import HookPoint
from onyx.db.hook import create_hook_execution_log__no_commit
from onyx.db.hook import get_non_deleted_hook_by_hook_point
from onyx.db.hook import update_hook__no_commit
from onyx.db.models import Hook
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.hooks.executor import HookSkipped
from onyx.hooks.executor import HookSoftFailed
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
logger = setup_logger()
T = TypeVar("T", bound=BaseModel)
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
class _HttpOutcome(BaseModel):
"""Structured result of an HTTP hook call, returned by _process_response."""
is_success: bool
updated_is_reachable: (
bool | None
) # True/False = write to DB, None = unchanged (skip write)
status_code: int | None
error_message: str | None
response_payload: dict[str, Any] | None
def _lookup_hook(
db_session: Session,
hook_point: HookPoint,
) -> Hook | HookSkipped:
"""Return the active Hook or HookSkipped if hooks are unavailable/unconfigured.
No HTTP call is made and no DB writes are performed for any HookSkipped path.
There is nothing to log and no reachability information to update.
"""
if MULTI_TENANT:
return HookSkipped()
hook = get_non_deleted_hook_by_hook_point(
db_session=db_session, hook_point=hook_point
)
if hook is None or not hook.is_active:
return HookSkipped()
if not hook.endpoint_url:
return HookSkipped()
return hook
def _process_response(
*,
response: httpx.Response | None,
exc: Exception | None,
timeout: float,
) -> _HttpOutcome:
"""Process the result of an HTTP call and return a structured outcome.
Called after the client.post() try/except. If post() raised, exc is set and
response is None. Otherwise response is set and exc is None. Handles
raise_for_status(), JSON decoding, and the dict shape check.
"""
if exc is not None:
if isinstance(exc, httpx.NetworkError):
msg = f"Hook network error (endpoint unreachable): {exc}"
logger.warning(msg, exc_info=exc)
return _HttpOutcome(
is_success=False,
updated_is_reachable=False,
status_code=None,
error_message=msg,
response_payload=None,
)
if isinstance(exc, httpx.TimeoutException):
msg = f"Hook timed out after {timeout}s: {exc}"
logger.warning(msg, exc_info=exc)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # timeout doesn't indicate unreachability
status_code=None,
error_message=msg,
response_payload=None,
)
msg = f"Hook call failed: {exc}"
logger.exception(msg, exc_info=exc)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # unknown error — don't make assumptions
status_code=None,
error_message=msg,
response_payload=None,
)
if response is None:
raise ValueError(
"exactly one of response or exc must be non-None; both are None"
)
status_code = response.status_code
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
msg = f"Hook returned HTTP {e.response.status_code}: {e.response.text}"
logger.warning(msg, exc_info=e)
# 401/403 means the api_key has been revoked or is invalid — mark unreachable
# so the operator knows to update it. All other HTTP errors keep is_reachable
# as-is (server is up, the request just failed for application reasons).
auth_failed = e.response.status_code in (401, 403)
return _HttpOutcome(
is_success=False,
updated_is_reachable=False if auth_failed else None,
status_code=status_code,
error_message=msg,
response_payload=None,
)
try:
response_payload = response.json()
except (json.JSONDecodeError, httpx.DecodingError) as e:
msg = f"Hook returned non-JSON response: {e}"
logger.warning(msg, exc_info=e)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # server responded — reachability unchanged
status_code=status_code,
error_message=msg,
response_payload=None,
)
if not isinstance(response_payload, dict):
msg = f"Hook returned non-dict JSON (got {type(response_payload).__name__})"
logger.warning(msg)
return _HttpOutcome(
is_success=False,
updated_is_reachable=None, # server responded — reachability unchanged
status_code=status_code,
error_message=msg,
response_payload=None,
)
return _HttpOutcome(
is_success=True,
updated_is_reachable=True,
status_code=status_code,
error_message=None,
response_payload=response_payload,
)
def _persist_result(
*,
hook_id: int,
outcome: _HttpOutcome,
duration_ms: int,
) -> None:
"""Write the execution log on failure and optionally update is_reachable, each
in its own session so a failure in one does not affect the other."""
# Only write the execution log on failure — success runs are not recorded.
# Must not be skipped if the is_reachable update fails (e.g. hook concurrently
# deleted between the initial lookup and here).
if not outcome.is_success:
try:
with get_session_with_current_tenant() as log_session:
create_hook_execution_log__no_commit(
db_session=log_session,
hook_id=hook_id,
is_success=False,
error_message=outcome.error_message,
status_code=outcome.status_code,
duration_ms=duration_ms,
)
log_session.commit()
except Exception:
logger.exception(
f"Failed to persist hook execution log for hook_id={hook_id}"
)
# Update is_reachable separately — best-effort, non-critical.
# None means the value is unchanged (set by the caller to skip the no-op write).
# update_hook__no_commit can raise OnyxError(NOT_FOUND) if the hook was
# concurrently deleted, so keep this isolated from the log write above.
if outcome.updated_is_reachable is not None:
try:
with get_session_with_current_tenant() as reachable_session:
update_hook__no_commit(
db_session=reachable_session,
hook_id=hook_id,
is_reachable=outcome.updated_is_reachable,
)
reachable_session.commit()
except Exception:
logger.warning(f"Failed to update is_reachable for hook_id={hook_id}")
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def _execute_hook_inner(
hook: Hook,
payload: dict[str, Any],
response_type: type[T],
) -> T | HookSoftFailed:
"""Make the HTTP call, validate the response, and return a typed model.
Raises OnyxError on HARD failure. Returns HookSoftFailed on SOFT failure.
"""
timeout = hook.timeout_seconds
hook_id = hook.id
fail_strategy = hook.fail_strategy
endpoint_url = hook.endpoint_url
current_is_reachable: bool | None = hook.is_reachable
if not endpoint_url:
raise ValueError(
f"hook_id={hook_id} is active but has no endpoint_url — "
"active hooks without an endpoint_url must be rejected by _lookup_hook"
)
start = time.monotonic()
response: httpx.Response | None = None
exc: Exception | None = None
try:
api_key: str | None = (
hook.api_key.get_value(apply_mask=False) if hook.api_key else None
)
headers: dict[str, str] = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
with httpx.Client(
timeout=timeout, follow_redirects=False
) as client: # SSRF guard: never follow redirects
response = client.post(endpoint_url, json=payload, headers=headers)
except Exception as e:
exc = e
duration_ms = int((time.monotonic() - start) * 1000)
outcome = _process_response(response=response, exc=exc, timeout=timeout)
# Validate the response payload against response_type.
# A validation failure downgrades the outcome to a failure so it is logged,
# is_reachable is left unchanged (server responded — just a bad payload),
# and fail_strategy is respected below.
validated_model: T | None = None
if outcome.is_success and outcome.response_payload is not None:
try:
validated_model = response_type.model_validate(outcome.response_payload)
except ValidationError as e:
msg = (
f"Hook response failed validation against {response_type.__name__}: {e}"
)
outcome = _HttpOutcome(
is_success=False,
updated_is_reachable=None, # server responded — reachability unchanged
status_code=outcome.status_code,
error_message=msg,
response_payload=None,
)
# Skip the is_reachable write when the value would not change — avoids a
# no-op DB round-trip on every call when the hook is already in the expected state.
if outcome.updated_is_reachable == current_is_reachable:
outcome = outcome.model_copy(update={"updated_is_reachable": None})
_persist_result(hook_id=hook_id, outcome=outcome, duration_ms=duration_ms)
if not outcome.is_success:
if fail_strategy == HookFailStrategy.HARD:
raise OnyxError(
OnyxErrorCode.HOOK_EXECUTION_FAILED,
outcome.error_message or "Hook execution failed.",
)
logger.warning(
f"Hook execution failed (soft fail) for hook_id={hook_id}: {outcome.error_message}"
)
return HookSoftFailed()
if validated_model is None:
raise OnyxError(
OnyxErrorCode.INTERNAL_ERROR,
f"validated_model is None for successful hook call (hook_id={hook_id})",
)
return validated_model
def _execute_hook_impl(
*,
db_session: Session,
hook_point: HookPoint,
payload: dict[str, Any],
response_type: type[T],
) -> T | HookSkipped | HookSoftFailed:
"""EE implementation — loaded by CE's execute_hook via fetch_versioned_implementation.
Returns HookSkipped if no active hook is configured, HookSoftFailed if the
hook failed with SOFT fail strategy, or a validated response model on success.
Raises OnyxError on HARD failure or if the hook is misconfigured.
"""
hook = _lookup_hook(db_session, hook_point)
if isinstance(hook, HookSkipped):
return hook
fail_strategy = hook.fail_strategy
hook_id = hook.id
try:
return _execute_hook_inner(hook, payload, response_type)
except Exception:
if fail_strategy == HookFailStrategy.SOFT:
logger.exception(
f"Unexpected error in hook execution (soft fail) for hook_id={hook_id}"
)
return HookSoftFailed()
raise
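# Illustration (not part of this module): the is_reachable policy from the module docstring,
# restated as a standalone function. The authoritative decision is made in _process_response.
def _illustrative_is_reachable_update(
    exc: Exception | None, status_code: int | None, got_valid_dict: bool
) -> bool | None:
    """Return True/False to persist, or None to leave is_reachable unchanged."""
    if isinstance(exc, httpx.NetworkError):
        return False  # DNS failure / connection refused -> definitely unreachable
    if isinstance(exc, httpx.TimeoutException):
        return None  # a slow server is not evidence of unreachability
    if exc is not None:
        return None  # unknown error -> no signal, skip the write
    if status_code in (401, 403):
        return False  # api_key revoked or invalid
    if status_code is not None and status_code >= 400:
        return None  # server responded -> leave as-is
    return True if got_valid_dict else None  # non-JSON / non-dict payloads carry no signal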

View File

@@ -15,6 +15,7 @@ from ee.onyx.server.enterprise_settings.api import (
basic_router as enterprise_settings_router,
)
from ee.onyx.server.evals.api import router as evals_router
from ee.onyx.server.features.hooks.api import router as hook_router
from ee.onyx.server.license.api import router as license_router
from ee.onyx.server.manage.standard_answer import router as standard_answer_router
from ee.onyx.server.middleware.license_enforcement import (
@@ -138,6 +139,7 @@ def get_application() -> FastAPI:
include_router_with_global_prefix_prepended(application, ee_oauth_router)
include_router_with_global_prefix_prepended(application, ee_document_cc_pair_router)
include_router_with_global_prefix_prepended(application, evals_router)
include_router_with_global_prefix_prepended(application, hook_router)
# Enterprise-only global settings
include_router_with_global_prefix_prepended(

View File

@@ -44,19 +44,21 @@ def _run_single_search(
user: User,
db_session: Session,
num_hits: int | None = None,
hybrid_alpha: float | None = None,
) -> list[InferenceChunk]:
"""Execute a single search query and return chunks."""
chunk_search_request = ChunkSearchRequest(
query=query,
user_selected_filters=filters,
limit=num_hits,
hybrid_alpha=hybrid_alpha,
)
return search_pipeline(
chunk_search_request=chunk_search_request,
document_index=document_index,
user=user,
persona=None, # No persona for direct search
persona_search_info=None,
db_session=db_session,
)
@@ -74,7 +76,7 @@ def stream_search_query(
Core search function that yields streaming packets.
Used by both streaming and non-streaming endpoints.
"""
# Get document index
# Get document index.
search_settings = get_current_search_settings(db_session)
# This flow is for search so we do not get all indices.
document_index = get_default_document_index(search_settings, None, db_session)
@@ -119,6 +121,7 @@ def stream_search_query(
user=user,
db_session=db_session,
num_hits=request.num_hits,
hybrid_alpha=request.hybrid_alpha,
)
else:
# Multiple queries - run in parallel and merge with RRF
@@ -133,6 +136,7 @@ def stream_search_query(
user,
db_session,
request.num_hits,
request.hybrid_alpha,
),
)
for query in all_executed_queries

View File

@@ -44,11 +44,12 @@ def _check_ssrf_safety(endpoint_url: str) -> None:
"""Raise OnyxError if endpoint_url could be used for SSRF.
Delegates to validate_outbound_http_url with https_only=True.
Uses BAD_GATEWAY so the frontend maps the error to the Endpoint URL field.
"""
try:
validate_outbound_http_url(endpoint_url, https_only=True)
except (SSRFException, ValueError) as e:
raise OnyxError(OnyxErrorCode.INVALID_INPUT, str(e))
raise OnyxError(OnyxErrorCode.BAD_GATEWAY, str(e))
# ---------------------------------------------------------------------------
@@ -62,6 +63,9 @@ def _hook_to_response(hook: Hook, creator_email: str | None = None) -> HookRespo
name=hook.name,
hook_point=hook.hook_point,
endpoint_url=hook.endpoint_url,
api_key_masked=(
hook.api_key.get_value(apply_mask=True) if hook.api_key else None
),
fail_strategy=hook.fail_strategy,
timeout_seconds=hook.timeout_seconds,
is_active=hook.is_active,
@@ -119,9 +123,8 @@ def _validate_endpoint(
(not reachable indicates the api_key is invalid).
Timeout handling:
- ConnectTimeout: TCP handshake never completed → cannot_connect.
- ReadTimeout / WriteTimeout: TCP was established, server responded slowly → timeout
(operator should consider increasing timeout_seconds).
- Any httpx.TimeoutException (ConnectTimeout, ReadTimeout, WriteTimeout, PoolTimeout)
→ timeout (operator should consider increasing timeout_seconds).
- All other exceptions → cannot_connect.
"""
_check_ssrf_safety(endpoint_url)
@@ -138,19 +141,11 @@ def _validate_endpoint(
)
return HookValidateResponse(status=HookValidateStatus.passed)
except httpx.TimeoutException as exc:
# ConnectTimeout: TCP handshake never completed → cannot_connect.
# ReadTimeout / WriteTimeout: TCP was established, server just responded slowly → timeout.
if isinstance(exc, httpx.ConnectTimeout):
logger.warning(
"Hook endpoint validation: connect timeout for %s",
endpoint_url,
exc_info=exc,
)
return HookValidateResponse(
status=HookValidateStatus.cannot_connect, error_message=str(exc)
)
# Any timeout (connect, read, or write) means the configured timeout_seconds
# is too low for this endpoint. Report as timeout so the UI directs the user
# to increase the timeout setting.
logger.warning(
"Hook endpoint validation: read/write timeout for %s",
"Hook endpoint validation: timeout for %s",
endpoint_url,
exc_info=exc,
)
@@ -220,8 +215,8 @@ def create_hook(
db_session: Session = Depends(get_session),
) -> HookResponse:
"""Create a new hook. The endpoint is validated before persisting — creation fails if
the endpoint cannot be reached or the api_key is invalid. Hooks are created inactive;
use POST /{hook_id}/activate once ready to receive traffic."""
the endpoint cannot be reached or the api_key is invalid. Hooks are created active.
"""
spec = get_hook_point_spec(req.hook_point)
api_key = req.api_key.get_secret_value() if req.api_key else None
validation = _validate_endpoint(
@@ -240,9 +235,10 @@ def create_hook(
api_key=api_key,
fail_strategy=req.fail_strategy or spec.default_fail_strategy,
timeout_seconds=req.timeout_seconds or spec.default_timeout_seconds,
is_active=True,
is_reachable=True,
creator_id=user.id,
)
hook.is_reachable = True
db_session.commit()
return _hook_to_response(hook, creator_email=user.email)

View File

@@ -27,15 +27,17 @@ class SearchFlowClassificationResponse(BaseModel):
is_search_flow: bool
# NOTE: This model is used for the core flow of the Onyx application, any changes to it should be reviewed and approved by an
# experienced team member. It is very important to 1. avoid bloat and 2. that this remains backwards compatible across versions.
# NOTE: This model is used for the core flow of the Onyx application, any
# changes to it should be reviewed and approved by an experienced team member.
# It is very important to 1. avoid bloat and 2. that this remains backwards
# compatible across versions.
class SendSearchQueryRequest(BaseModel):
search_query: str
filters: BaseFilters | None = None
num_docs_fed_to_llm_selection: int | None = None
run_query_expansion: bool = False
num_hits: int = 30
hybrid_alpha: float | None = None
include_content: bool = False
stream: bool = False
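# Illustration (not part of the model): a hypothetical request body for the
# /send-search-message endpoint using the new hybrid_alpha field — field names come from
# SendSearchQueryRequest above, values are made up. Per the handler change later in this
# diff, hybrid_alpha=0.0 corresponds to pure keyword search.
#   {
#     "search_query": "quarterly revenue report",
#     "num_hits": 30,
#     "hybrid_alpha": 0.0,
#     "stream": false
#   }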

View File

@@ -20,6 +20,7 @@ from ee.onyx.server.query_and_chat.models import SearchQueryResponse
from ee.onyx.server.query_and_chat.models import SendSearchQueryRequest
from ee.onyx.server.query_and_chat.streaming_models import SearchErrorPacket
from onyx.auth.users import current_user
from onyx.configs.app_configs import ONYX_SEARCH_UI_USES_OPENSEARCH_KEYWORD_SEARCH
from onyx.db.engine.sql_engine import get_session
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.models import User
@@ -67,8 +68,10 @@ def search_flow_classification(
return SearchFlowClassificationResponse(is_search_flow=is_search_flow)
# NOTE: This endpoint is used for the core flow of the Onyx application, any changes to it should be reviewed and approved by an
# experienced team member. It is very important to 1. avoid bloat and 2. that this remains backwards compatible across versions.
# NOTE: This endpoint is used for the core flow of the Onyx application, any
# changes to it should be reviewed and approved by an experienced team member.
# It is very important to 1. avoid bloat and 2. that this remains backwards
# compatible across versions.
@router.post(
"/send-search-message",
response_model=None,
@@ -80,13 +83,19 @@ def handle_send_search_message(
db_session: Session = Depends(get_session),
) -> StreamingResponse | SearchFullResponse:
"""
Execute a search query with optional streaming.
Executes a search query with optional streaming.
When stream=True: Returns StreamingResponse with SSE
When stream=False: Returns SearchFullResponse
If hybrid_alpha is unset and ONYX_SEARCH_UI_USES_OPENSEARCH_KEYWORD_SEARCH
is True, executes pure keyword search.
Returns:
StreamingResponse with SSE if stream=True, otherwise SearchFullResponse.
"""
logger.debug(f"Received search query: {request.search_query}")
if request.hybrid_alpha is None and ONYX_SEARCH_UI_USES_OPENSEARCH_KEYWORD_SEARCH:
request.hybrid_alpha = 0.0
# Non-streaming path
if not request.stream:
try:

View File

@@ -52,11 +52,13 @@ from ee.onyx.server.scim.schema_definitions import SERVICE_PROVIDER_CONFIG
from ee.onyx.server.scim.schema_definitions import USER_RESOURCE_TYPE
from ee.onyx.server.scim.schema_definitions import USER_SCHEMA_DEF
from onyx.db.engine.sql_engine import get_session
from onyx.db.enums import AccountType
from onyx.db.models import ScimToken
from onyx.db.models import ScimUserMapping
from onyx.db.models import User
from onyx.db.models import UserGroup
from onyx.db.models import UserRole
from onyx.db.users import assign_user_to_default_groups__no_commit
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import fetch_ee_implementation_or_noop
@@ -486,6 +488,7 @@ def create_user(
email=email,
hashed_password=_pw_helper.hash(_pw_helper.generate()),
role=UserRole.BASIC,
account_type=AccountType.STANDARD,
is_active=user_resource.active,
is_verified=True,
personal_name=personal_name,
@@ -506,13 +509,25 @@ def create_user(
scim_username=scim_username,
fields=fields,
)
dal.commit()
except IntegrityError:
dal.rollback()
return _scim_error_response(
409, f"User with email {email} already has a SCIM mapping"
)
# Assign user to default group BEFORE commit so everything is atomic.
# If this fails, the entire user creation rolls back and IdP can retry.
try:
assign_user_to_default_groups__no_commit(db_session, user)
except Exception:
dal.rollback()
logger.exception(f"Failed to assign SCIM user {email} to default groups")
return _scim_error_response(
500, f"Failed to assign user {email} to default group"
)
dal.commit()
return _scim_resource_response(
provider.build_user_resource(
user,

View File

@@ -178,7 +178,7 @@ def _seed_personas(db_session: Session, personas: list[PersonaUpsertRequest]) ->
system_prompt=persona.system_prompt,
task_prompt=persona.task_prompt,
datetime_aware=persona.datetime_aware,
featured=persona.featured,
is_featured=persona.is_featured,
commit=False,
)
db_session.commit()

View File

@@ -99,6 +99,26 @@ async def get_or_provision_tenant(
tenant_id = await get_available_tenant()
if tenant_id:
# Run migrations to ensure the pre-provisioned tenant schema is current.
# Pool tenants may have been created before a new migration was deployed.
# Capture as a non-optional local so mypy can type the lambda correctly.
_tenant_id: str = tenant_id
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(
None, lambda: run_alembic_migrations(_tenant_id)
)
except Exception:
# The tenant was already dequeued from the pool — roll it back so
# it doesn't end up orphaned (schema exists, but not assigned to anyone).
logger.exception(
f"Migration failed for pre-provisioned tenant {_tenant_id}; rolling back"
)
try:
await rollback_tenant_provisioning(_tenant_id)
except Exception:
logger.exception(f"Failed to rollback orphaned tenant {_tenant_id}")
raise
# If we have a pre-provisioned tenant, assign it to the user
await assign_tenant_to_user(tenant_id, email, referral_source)
logger.info(f"Assigned pre-provisioned tenant {tenant_id} to user {email}")

View File

@@ -4,6 +4,7 @@ from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from ee.onyx.db.persona import update_persona_access
from ee.onyx.db.user_group import add_users_to_user_group
from ee.onyx.db.user_group import delete_user_group as db_delete_user_group
from ee.onyx.db.user_group import fetch_user_group
@@ -11,13 +12,16 @@ from ee.onyx.db.user_group import fetch_user_groups
from ee.onyx.db.user_group import fetch_user_groups_for_user
from ee.onyx.db.user_group import insert_user_group
from ee.onyx.db.user_group import prepare_user_group_for_deletion
from ee.onyx.db.user_group import rename_user_group
from ee.onyx.db.user_group import update_user_curator_relationship
from ee.onyx.db.user_group import update_user_group
from ee.onyx.server.user_group.models import AddUsersToUserGroupRequest
from ee.onyx.server.user_group.models import MinimalUserGroupSnapshot
from ee.onyx.server.user_group.models import SetCuratorRequest
from ee.onyx.server.user_group.models import UpdateGroupAgentsRequest
from ee.onyx.server.user_group.models import UserGroup
from ee.onyx.server.user_group.models import UserGroupCreate
from ee.onyx.server.user_group.models import UserGroupRename
from ee.onyx.server.user_group.models import UserGroupUpdate
from onyx.auth.users import current_admin_user
from onyx.auth.users import current_curator_or_admin_user
@@ -27,6 +31,9 @@ from onyx.configs.constants import PUBLIC_API_TAGS
from onyx.db.engine.sql_engine import get_session
from onyx.db.models import User
from onyx.db.models import UserRole
from onyx.db.persona import get_persona_by_id
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -36,12 +43,16 @@ router = APIRouter(prefix="/manage", tags=PUBLIC_API_TAGS)
@router.get("/admin/user-group")
def list_user_groups(
include_default: bool = False,
user: User = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
) -> list[UserGroup]:
if user.role == UserRole.ADMIN:
user_groups = fetch_user_groups(
db_session, only_up_to_date=False, eager_load_for_snapshot=True
db_session,
only_up_to_date=False,
eager_load_for_snapshot=True,
include_default=include_default,
)
else:
user_groups = fetch_user_groups_for_user(
@@ -49,27 +60,50 @@ def list_user_groups(
user_id=user.id,
only_curator_groups=user.role == UserRole.CURATOR,
eager_load_for_snapshot=True,
include_default=include_default,
)
return [UserGroup.from_model(user_group) for user_group in user_groups]
@router.get("/user-groups/minimal")
def list_minimal_user_groups(
include_default: bool = False,
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> list[MinimalUserGroupSnapshot]:
if user.role == UserRole.ADMIN:
user_groups = fetch_user_groups(db_session, only_up_to_date=False)
user_groups = fetch_user_groups(
db_session,
only_up_to_date=False,
include_default=include_default,
)
else:
user_groups = fetch_user_groups_for_user(
db_session=db_session,
user_id=user.id,
include_default=include_default,
)
return [
MinimalUserGroupSnapshot.from_model(user_group) for user_group in user_groups
]
@router.get("/admin/user-group/{user_group_id}/permissions")
def get_user_group_permissions(
user_group_id: int,
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> list[str]:
group = fetch_user_group(db_session, user_group_id)
if group is None:
raise OnyxError(OnyxErrorCode.NOT_FOUND, "User group not found")
return [
grant.permission.value
for grant in group.permission_grants
if not grant.is_deleted
]
@router.post("/admin/user-group")
def create_user_group(
user_group: UserGroupCreate,
@@ -87,6 +121,35 @@ def create_user_group(
return UserGroup.from_model(db_user_group)
@router.patch("/admin/user-group/rename")
def rename_user_group_endpoint(
rename_request: UserGroupRename,
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> UserGroup:
group = fetch_user_group(db_session, rename_request.id)
if group and group.is_default:
raise OnyxError(OnyxErrorCode.CONFLICT, "Cannot rename a default system group.")
try:
return UserGroup.from_model(
rename_user_group(
db_session=db_session,
user_group_id=rename_request.id,
new_name=rename_request.name,
)
)
except IntegrityError:
raise OnyxError(
OnyxErrorCode.DUPLICATE_RESOURCE,
f"User group with name '{rename_request.name}' already exists.",
)
except ValueError as e:
msg = str(e)
if "not found" in msg.lower():
raise OnyxError(OnyxErrorCode.NOT_FOUND, msg)
raise OnyxError(OnyxErrorCode.CONFLICT, msg)
@router.patch("/admin/user-group/{user_group_id}")
def patch_user_group(
user_group_id: int,
@@ -152,6 +215,9 @@ def delete_user_group(
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> None:
group = fetch_user_group(db_session, user_group_id)
if group and group.is_default:
raise OnyxError(OnyxErrorCode.CONFLICT, "Cannot delete a default system group.")
try:
prepare_user_group_for_deletion(db_session, user_group_id)
except ValueError as e:
@@ -161,3 +227,38 @@ def delete_user_group(
user_group = fetch_user_group(db_session, user_group_id)
if user_group:
db_delete_user_group(db_session, user_group)
@router.patch("/admin/user-group/{user_group_id}/agents")
def update_group_agents(
user_group_id: int,
request: UpdateGroupAgentsRequest,
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> None:
for agent_id in request.added_agent_ids:
persona = get_persona_by_id(
persona_id=agent_id, user=user, db_session=db_session
)
current_group_ids = [g.id for g in persona.groups]
if user_group_id not in current_group_ids:
update_persona_access(
persona_id=agent_id,
creator_user_id=user.id,
db_session=db_session,
group_ids=current_group_ids + [user_group_id],
)
for agent_id in request.removed_agent_ids:
persona = get_persona_by_id(
persona_id=agent_id, user=user, db_session=db_session
)
current_group_ids = [g.id for g in persona.groups]
update_persona_access(
persona_id=agent_id,
creator_user_id=user.id,
db_session=db_session,
group_ids=[gid for gid in current_group_ids if gid != user_group_id],
)
db_session.commit()
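For reference, a minimal client-side sketch of the two new admin endpoints added above. The base URL, auth header, group ID, and agent IDs are placeholders, not values taken from this change:

import requests

BASE_URL = "http://localhost:8080"  # hypothetical deployment URL
HEADERS = {"Authorization": "Bearer <admin-api-key>"}  # placeholder admin credential

# List the permission values of a group's non-deleted grants (new GET endpoint).
resp = requests.get(f"{BASE_URL}/manage/admin/user-group/1/permissions", headers=HEADERS)
resp.raise_for_status()
print(resp.json())  # list of permission strings for the group

# Attach agent 5 to the group and detach agent 7 (new PATCH endpoint).
resp = requests.patch(
    f"{BASE_URL}/manage/admin/user-group/1/agents",
    headers=HEADERS,
    json={"added_agent_ids": [5], "removed_agent_ids": [7]},
)
resp.raise_for_status()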

View File

@@ -22,6 +22,7 @@ class UserGroup(BaseModel):
personas: list[PersonaSnapshot]
is_up_to_date: bool
is_up_for_deletion: bool
is_default: bool
@classmethod
def from_model(cls, user_group_model: UserGroupModel) -> "UserGroup":
@@ -74,18 +75,21 @@ class UserGroup(BaseModel):
],
is_up_to_date=user_group_model.is_up_to_date,
is_up_for_deletion=user_group_model.is_up_for_deletion,
is_default=user_group_model.is_default,
)
class MinimalUserGroupSnapshot(BaseModel):
id: int
name: str
is_default: bool
@classmethod
def from_model(cls, user_group_model: UserGroupModel) -> "MinimalUserGroupSnapshot":
return cls(
id=user_group_model.id,
name=user_group_model.name,
is_default=user_group_model.is_default,
)
@@ -104,6 +108,16 @@ class AddUsersToUserGroupRequest(BaseModel):
user_ids: list[UUID]
class UserGroupRename(BaseModel):
id: int
name: str
class SetCuratorRequest(BaseModel):
user_id: UUID
is_curator: bool
class UpdateGroupAgentsRequest(BaseModel):
added_agent_ids: list[int]
removed_agent_ids: list[int]

View File

@@ -80,15 +80,45 @@ def capture_and_sync_with_alternate_posthog(
logger.error(f"Error identifying cloud posthog user: {e}")
def alias_user(distinct_id: str, anonymous_id: str) -> None:
"""Link an anonymous distinct_id to an identified user, merging person profiles.
No-ops when the IDs match (e.g. returning users whose PostHog cookie
already contains their identified user ID).
"""
if not posthog or anonymous_id == distinct_id:
return
try:
posthog.alias(previous_id=anonymous_id, distinct_id=distinct_id)
posthog.flush()
except Exception as e:
logger.error(f"Error aliasing PostHog user: {e}")
def get_anon_id_from_request(request: Any) -> str | None:
"""Extract the anonymous distinct_id from the app PostHog cookie on a request."""
if not POSTHOG_API_KEY:
return None
cookie_name = f"ph_{POSTHOG_API_KEY}_posthog"
if (cookie_value := request.cookies.get(cookie_name)) and (
parsed := parse_posthog_cookie(cookie_value)
):
return parsed.get("distinct_id")
return None
def get_marketing_posthog_cookie_name() -> str | None:
if not MARKETING_POSTHOG_API_KEY:
return None
return f"onyx_custom_ph_{MARKETING_POSTHOG_API_KEY}_posthog"
def parse_marketing_cookie(cookie_value: str) -> dict[str, Any] | None:
def parse_posthog_cookie(cookie_value: str) -> dict[str, Any] | None:
"""
Parse the URL-encoded JSON marketing cookie.
Parse a URL-encoded JSON PostHog cookie
Expected format (URL-encoded):
{"distinct_id":"...", "featureFlags":{"landing_page_variant":"..."}, ...}
@@ -102,7 +132,7 @@ def parse_marketing_cookie(cookie_value: str) -> dict[str, Any] | None:
cookie_data = json.loads(decoded_cookie)
distinct_id = cookie_data.get("distinct_id")
if not distinct_id:
if not distinct_id or not isinstance(distinct_id, str):
return None
return cookie_data
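To illustrate the cookie format these helpers expect, here is a self-contained sketch of the decode step; the cookie value is fabricated for demonstration:

import json
from urllib.parse import quote, unquote

# A fabricated PostHog cookie value: URL-encoded JSON carrying a distinct_id.
raw_cookie = quote(json.dumps({"distinct_id": "anon-1234", "featureFlags": {}}))

decoded = json.loads(unquote(raw_cookie))
distinct_id = decoded.get("distinct_id")
# Mirrors the validation above: distinct_id must be a non-empty string.
assert isinstance(distinct_id, str) and distinct_id
print(distinct_id)  # -> anon-1234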

View File

@@ -100,6 +100,7 @@ def get_model_app() -> FastAPI:
dsn=SENTRY_DSN,
integrations=[StarletteIntegration(), FastApiIntegration()],
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:

View File

@@ -0,0 +1,110 @@
"""
Permission resolution for group-based authorization.
Granted permissions are stored as a JSONB column on the User table and
loaded for free with every auth query. Implied permissions are expanded
at read time — only directly granted permissions are persisted.
"""
from collections.abc import Callable
from collections.abc import Coroutine
from typing import Any
from fastapi import Depends
from onyx.auth.users import current_user
from onyx.db.enums import Permission
from onyx.db.models import User
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.utils.logger import setup_logger
logger = setup_logger()
ALL_PERMISSIONS: frozenset[str] = frozenset(p.value for p in Permission)
# Implication map: granted permission -> set of permissions it implies.
IMPLIED_PERMISSIONS: dict[str, set[str]] = {
Permission.ADD_AGENTS.value: {Permission.READ_AGENTS.value},
Permission.MANAGE_AGENTS.value: {
Permission.ADD_AGENTS.value,
Permission.READ_AGENTS.value,
},
Permission.MANAGE_DOCUMENT_SETS.value: {
Permission.READ_DOCUMENT_SETS.value,
Permission.READ_CONNECTORS.value,
},
Permission.ADD_CONNECTORS.value: {Permission.READ_CONNECTORS.value},
Permission.MANAGE_CONNECTORS.value: {
Permission.ADD_CONNECTORS.value,
Permission.READ_CONNECTORS.value,
},
Permission.MANAGE_USER_GROUPS.value: {
Permission.READ_CONNECTORS.value,
Permission.READ_DOCUMENT_SETS.value,
Permission.READ_AGENTS.value,
Permission.READ_USERS.value,
},
}
def resolve_effective_permissions(granted: set[str]) -> set[str]:
"""Expand granted permissions with their implied permissions.
If "admin" is present, returns all 19 permissions.
"""
if Permission.FULL_ADMIN_PANEL_ACCESS.value in granted:
return set(ALL_PERMISSIONS)
effective = set(granted)
changed = True
while changed:
changed = False
for perm in list(effective):
implied = IMPLIED_PERMISSIONS.get(perm)
if implied and not implied.issubset(effective):
effective |= implied
changed = True
return effective
def get_effective_permissions(user: User) -> set[Permission]:
"""Read granted permissions from the column and expand implied permissions."""
granted: set[Permission] = set()
for p in user.effective_permissions:
try:
granted.add(Permission(p))
except ValueError:
logger.warning(f"Skipping unknown permission '{p}' for user {user.id}")
if Permission.FULL_ADMIN_PANEL_ACCESS in granted:
return set(Permission)
expanded = resolve_effective_permissions({p.value for p in granted})
return {Permission(p) for p in expanded}
def require_permission(
required: Permission,
) -> Callable[..., Coroutine[Any, Any, User]]:
"""FastAPI dependency factory for permission-based access control.
Usage:
@router.get("/endpoint")
def endpoint(user: User = Depends(require_permission(Permission.MANAGE_CONNECTORS))):
...
"""
async def dependency(user: User = Depends(current_user)) -> User:
effective = get_effective_permissions(user)
if Permission.FULL_ADMIN_PANEL_ACCESS in effective:
return user
if required not in effective:
raise OnyxError(
OnyxErrorCode.INSUFFICIENT_PERMISSIONS,
"You do not have the required permissions for this action.",
)
return user
return dependency
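As an illustration of the expansion rule, here is a self-contained sketch of the same fixed-point loop over plain strings; the permission names are stand-ins, not the exact values of the Permission enum:

# Granted permission -> permissions it implies (stand-in strings for illustration).
IMPLIED = {
    "manage_connectors": {"add_connectors", "read_connectors"},
    "add_connectors": {"read_connectors"},
}

def expand(granted: set[str]) -> set[str]:
    effective = set(granted)
    changed = True
    while changed:
        changed = False
        for perm in list(effective):
            implied = IMPLIED.get(perm, set())
            if not implied.issubset(effective):
                effective |= implied
                changed = True
    return effective

print(sorted(expand({"manage_connectors"})))
# -> ['add_connectors', 'manage_connectors', 'read_connectors']

Because only directly granted permissions are persisted, adding a new entry to IMPLIED_PERMISSIONS takes effect on the next read without a data migration.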

View File

@@ -5,6 +5,8 @@ from typing import Any
from fastapi_users import schemas
from typing_extensions import override
from onyx.db.enums import AccountType
class UserRole(str, Enum):
"""
@@ -41,6 +43,7 @@ class UserRead(schemas.BaseUser[uuid.UUID]):
class UserCreate(schemas.BaseUserCreate):
role: UserRole = UserRole.BASIC
account_type: AccountType = AccountType.STANDARD
tenant_id: str | None = None
# Captcha token for cloud signup protection (optional, only used when captcha is enabled)
# Excluded from create_update_dict so it never reaches the DB layer
@@ -50,12 +53,16 @@ class UserCreate(schemas.BaseUserCreate):
def create_update_dict(self) -> dict[str, Any]:
d = super().create_update_dict()
d.pop("captcha_token", None)
# Force STANDARD for self-registration; only trusted paths
# (SCIM, API key creation) supply a different account_type directly.
d["account_type"] = AccountType.STANDARD
return d
@override
def create_update_dict_superuser(self) -> dict[str, Any]:
d = super().create_update_dict_superuser()
d.pop("captcha_token", None)
d.setdefault("account_type", self.account_type)
return d

View File

@@ -120,11 +120,13 @@ from onyx.db.engine.async_sql_engine import get_async_session
from onyx.db.engine.async_sql_engine import get_async_session_context_manager
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.engine.sql_engine import get_session_with_tenant
from onyx.db.enums import AccountType
from onyx.db.models import AccessToken
from onyx.db.models import OAuthAccount
from onyx.db.models import Persona
from onyx.db.models import User
from onyx.db.pat import fetch_user_for_pat
from onyx.db.users import assign_user_to_default_groups__no_commit
from onyx.db.users import get_user_by_email
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import log_onyx_error
@@ -135,6 +137,8 @@ from onyx.redis.redis_pool import retrieve_ws_token_data
from onyx.server.settings.store import load_settings
from onyx.server.utils import BasicAuthenticationError
from onyx.utils.logger import setup_logger
from onyx.utils.telemetry import mt_cloud_alias
from onyx.utils.telemetry import mt_cloud_get_anon_id
from onyx.utils.telemetry import mt_cloud_identify
from onyx.utils.telemetry import mt_cloud_telemetry
from onyx.utils.telemetry import optional_telemetry
@@ -251,18 +255,12 @@ def verify_email_is_invited(email: str) -> None:
whitelist = get_invited_users()
if not email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": "Email must be specified"},
)
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email must be specified")
try:
email_info = validate_email(email, check_deliverability=False)
except EmailUndeliverableError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": "Email is not valid"},
)
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email is not valid")
for email_whitelist in whitelist:
try:
@@ -279,12 +277,9 @@ def verify_email_is_invited(email: str) -> None:
if email_info.normalized.lower() == email_info_whitelist.normalized.lower():
return
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"code": REGISTER_INVITE_ONLY_CODE,
"reason": "This workspace is invite-only. Please ask your admin to invite you.",
},
raise OnyxError(
OnyxErrorCode.UNAUTHORIZED,
"This workspace is invite-only. Please ask your admin to invite you.",
)
@@ -294,48 +289,47 @@ def verify_email_in_whitelist(email: str, tenant_id: str) -> None:
verify_email_is_invited(email)
def verify_email_domain(email: str) -> None:
def verify_email_domain(email: str, *, is_registration: bool = False) -> None:
if email.count("@") != 1:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email is not valid",
)
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email is not valid")
local_part, domain = email.split("@")
domain = domain.lower()
local_part = local_part.lower()
if AUTH_TYPE == AuthType.CLOUD:
# Reject googlemail.com and ask the user to sign up with the equivalent gmail.com address (they deliver to the same inbox)
if domain == "googlemail.com":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": "Please use @gmail.com instead of @googlemail.com."},
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Please use @gmail.com instead of @googlemail.com.",
)
# Only block dotted Gmail on new signups — existing users must still be
# able to sign in with the address they originally registered with.
if is_registration and domain == "gmail.com" and "." in local_part:
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Gmail addresses with '.' are not allowed. Please use your base email address.",
)
if "+" in local_part and domain != "onyx.app":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"reason": "Email addresses with '+' are not allowed. Please use your base email address."
},
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Email addresses with '+' are not allowed. Please use your base email address.",
)
# Check if email uses a disposable/temporary domain
if is_disposable_email(email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"reason": "Disposable email addresses are not allowed. Please use a permanent email address."
},
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Disposable email addresses are not allowed. Please use a permanent email address.",
)
# Check domain whitelist if configured
if VALID_EMAIL_DOMAINS:
if domain not in VALID_EMAIL_DOMAINS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email domain is not valid",
)
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email domain is not valid")
def enforce_seat_limit(db_session: Session, seats_needed: int = 1) -> None:
@@ -351,7 +345,7 @@ def enforce_seat_limit(db_session: Session, seats_needed: int = 1) -> None:
)(db_session, seats_needed=seats_needed)
if result is not None and not result.available:
raise HTTPException(status_code=402, detail=result.error_message)
raise OnyxError(OnyxErrorCode.SEAT_LIMIT_EXCEEDED, result.error_message)
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
@@ -404,10 +398,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
captcha_token or "", expected_action="signup"
)
except CaptchaVerificationError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": str(e)},
)
raise OnyxError(OnyxErrorCode.INVALID_INPUT, str(e))
# We verify the password here to make sure it's valid before we proceed
await self.validate_password(
@@ -417,13 +408,10 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# Check for disposable emails BEFORE provisioning tenant
# This prevents creating tenants for throwaway email addresses
try:
verify_email_domain(user_create.email)
except HTTPException as e:
verify_email_domain(user_create.email, is_registration=True)
except OnyxError as e:
# Log blocked disposable email attempts
if (
e.status_code == status.HTTP_400_BAD_REQUEST
and "Disposable email" in str(e.detail)
):
if "Disposable email" in e.detail:
domain = (
user_create.email.split("@")[-1]
if "@" in user_create.email
@@ -567,9 +555,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
result = await db_session.execute(
select(Persona.id)
.where(
Persona.featured.is_(True),
Persona.is_featured.is_(True),
Persona.is_public.is_(True),
Persona.is_visible.is_(True),
Persona.is_listed.is_(True),
Persona.deleted.is_(False),
)
.order_by(
@@ -697,6 +685,8 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
raise exceptions.UserNotExists()
except exceptions.UserNotExists:
verify_email_domain(account_email, is_registration=True)
# Check seat availability before creating (single-tenant only)
with get_session_with_current_tenant() as sync_db:
enforce_seat_limit(sync_db)
@@ -706,6 +696,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
"email": account_email,
"hashed_password": self.password_helper.hash(password),
"is_verified": is_verified_by_default,
"account_type": AccountType.STANDARD,
}
user = await self.user_db.create(user_dict)
@@ -755,14 +746,23 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
with get_session_with_current_tenant() as sync_db:
enforce_seat_limit(sync_db)
await self.user_db.update(
user,
{
"is_verified": is_verified_by_default,
"role": UserRole.BASIC,
**({"is_active": True} if not user.is_active else {}),
},
)
# Upgrade the user and assign default groups in a single
# transaction so neither change is visible without the other.
was_inactive = not user.is_active
with get_session_with_current_tenant() as sync_db:
sync_user = sync_db.query(User).filter(User.id == user.id).first() # type: ignore[arg-type]
if sync_user:
sync_user.is_verified = is_verified_by_default
sync_user.role = UserRole.BASIC
sync_user.account_type = AccountType.STANDARD
if was_inactive:
sync_user.is_active = True
assign_user_to_default_groups__no_commit(sync_db, sync_user)
sync_db.commit()
# Refresh the async user object so downstream code
# (e.g. oidc_expiry check) sees the updated fields.
user = await self.user_db.get(user.id) # type: ignore[arg-type]
# this is needed if an organization goes from `TRACK_EXTERNAL_IDP_EXPIRY=true` to `false`
# otherwise, the oidc expiry will always be old, and the user will never be able to login
@@ -795,6 +795,12 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
logger.exception("Error deleting anonymous user cookie")
tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get()
# Link the anonymous PostHog session to the identified user so that
# pre-login session recordings and events merge into one person profile.
if anon_id := mt_cloud_get_anon_id(request):
mt_cloud_alias(distinct_id=str(user.id), anonymous_id=anon_id)
mt_cloud_identify(
distinct_id=str(user.id),
properties={"email": user.email, "tenant_id": tenant_id},
@@ -818,6 +824,11 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
user_count = await get_user_count()
logger.debug(f"Current tenant user count: {user_count}")
# Link the anonymous PostHog session to the identified user so
# that pre-signup session recordings merge into one person profile.
if anon_id := mt_cloud_get_anon_id(request):
mt_cloud_alias(distinct_id=str(user.id), anonymous_id=anon_id)
# Ensure a PostHog person profile exists for this user.
mt_cloud_identify(
distinct_id=str(user.id),
@@ -837,6 +848,16 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
event=MilestoneRecordType.TENANT_CREATED,
)
# Assign user to the appropriate default group (Admin or Basic).
# Must happen inside the try block while tenant context is active,
# otherwise get_session_with_current_tenant() targets the wrong schema.
is_admin = user_count == 1 or user.email in get_default_admin_user_emails()
with get_session_with_current_tenant() as db_session:
assign_user_to_default_groups__no_commit(
db_session, user, is_admin=is_admin
)
db_session.commit()
finally:
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
@@ -846,9 +867,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
attribute="get_marketing_posthog_cookie_name",
noop_return_value=None,
)
parse_marketing_cookie = fetch_ee_implementation_or_noop(
parse_posthog_cookie = fetch_ee_implementation_or_noop(
module="onyx.utils.posthog_client",
attribute="parse_marketing_cookie",
attribute="parse_posthog_cookie",
noop_return_value=None,
)
capture_and_sync_with_alternate_posthog = fetch_ee_implementation_or_noop(
@@ -862,7 +883,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
and user_count is not None
and (marketing_cookie_name := get_marketing_posthog_cookie_name())
and (marketing_cookie_value := request.cookies.get(marketing_cookie_name))
and (parsed_cookie := parse_marketing_cookie(marketing_cookie_value))
and (parsed_cookie := parse_posthog_cookie(marketing_cookie_value))
):
marketing_anonymous_id = parsed_cookie["distinct_id"]
@@ -1555,6 +1576,7 @@ def get_anonymous_user() -> User:
is_verified=True,
is_superuser=False,
role=UserRole.LIMITED,
account_type=AccountType.ANONYMOUS,
use_memories=False,
enable_memory_tool=False,
)

View File

@@ -20,6 +20,7 @@ from sentry_sdk.integrations.celery import CeleryIntegration
from sqlalchemy import text
from sqlalchemy.orm import Session
from onyx import __version__
from onyx.background.celery.apps.task_formatters import CeleryTaskColoredFormatter
from onyx.background.celery.apps.task_formatters import CeleryTaskPlainFormatter
from onyx.background.celery.celery_utils import celery_is_worker_primary
@@ -65,6 +66,7 @@ if SENTRY_DSN:
dsn=SENTRY_DSN,
integrations=[CeleryIntegration()],
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:
@@ -515,7 +517,8 @@ def reset_tenant_id(
def wait_for_vespa_or_shutdown(
sender: Any, **kwargs: Any # noqa: ARG001
sender: Any, # noqa: ARG001
**kwargs: Any, # noqa: ARG001
) -> None: # noqa: ARG001
"""Waits for Vespa to become ready subject to a timeout.
Raises WorkerShutdown if the timeout is reached."""

View File

@@ -13,6 +13,14 @@ from celery.signals import worker_shutdown
import onyx.background.celery.apps.app_base as app_base
from onyx.configs.constants import POSTGRES_CELERY_WORKER_DOCFETCHING_APP_NAME
from onyx.db.engine.sql_engine import SqlEngine
from onyx.server.metrics.celery_task_metrics import on_celery_task_postrun
from onyx.server.metrics.celery_task_metrics import on_celery_task_prerun
from onyx.server.metrics.celery_task_metrics import on_celery_task_rejected
from onyx.server.metrics.celery_task_metrics import on_celery_task_retry
from onyx.server.metrics.celery_task_metrics import on_celery_task_revoked
from onyx.server.metrics.indexing_task_metrics import on_indexing_task_postrun
from onyx.server.metrics.indexing_task_metrics import on_indexing_task_prerun
from onyx.server.metrics.metrics_server import start_metrics_server
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
@@ -34,6 +42,8 @@ def on_task_prerun(
**kwds: Any,
) -> None:
app_base.on_task_prerun(sender, task_id, task, args, kwargs, **kwds)
on_celery_task_prerun(task_id, task)
on_indexing_task_prerun(task_id, task, kwargs)
@signals.task_postrun.connect
@@ -48,6 +58,36 @@ def on_task_postrun(
**kwds: Any,
) -> None:
app_base.on_task_postrun(sender, task_id, task, args, kwargs, retval, state, **kwds)
on_celery_task_postrun(task_id, task, state)
on_indexing_task_postrun(task_id, task, kwargs, state)
@signals.task_retry.connect
def on_task_retry(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
# task_retry signal doesn't pass task_id in kwargs; get it from
# the sender (the task instance) via sender.request.id.
task_id = getattr(getattr(sender, "request", None), "id", None)
on_celery_task_retry(task_id, sender)
@signals.task_revoked.connect
def on_task_revoked(sender: Any | None = None, **kwargs: Any) -> None:
task_name = getattr(sender, "name", None) or str(sender)
on_celery_task_revoked(kwargs.get("task_id"), task_name)
@signals.task_rejected.connect
def on_task_rejected(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
# task_rejected sends the Consumer as sender, not the task instance.
# The task name must be extracted from the Celery message headers.
message = kwargs.get("message")
task_name: str | None = None
if message is not None:
headers = getattr(message, "headers", None) or {}
task_name = headers.get("task")
if task_name is None:
task_name = "unknown"
on_celery_task_rejected(None, task_name)
@celeryd_init.connect
@@ -76,6 +116,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
start_metrics_server("docfetching")
app_base.on_worker_ready(sender, **kwargs)

View File

@@ -14,6 +14,14 @@ from celery.signals import worker_shutdown
import onyx.background.celery.apps.app_base as app_base
from onyx.configs.constants import POSTGRES_CELERY_WORKER_DOCPROCESSING_APP_NAME
from onyx.db.engine.sql_engine import SqlEngine
from onyx.server.metrics.celery_task_metrics import on_celery_task_postrun
from onyx.server.metrics.celery_task_metrics import on_celery_task_prerun
from onyx.server.metrics.celery_task_metrics import on_celery_task_rejected
from onyx.server.metrics.celery_task_metrics import on_celery_task_retry
from onyx.server.metrics.celery_task_metrics import on_celery_task_revoked
from onyx.server.metrics.indexing_task_metrics import on_indexing_task_postrun
from onyx.server.metrics.indexing_task_metrics import on_indexing_task_prerun
from onyx.server.metrics.metrics_server import start_metrics_server
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
@@ -35,6 +43,8 @@ def on_task_prerun(
**kwds: Any,
) -> None:
app_base.on_task_prerun(sender, task_id, task, args, kwargs, **kwds)
on_celery_task_prerun(task_id, task)
on_indexing_task_prerun(task_id, task, kwargs)
@signals.task_postrun.connect
@@ -49,6 +59,36 @@ def on_task_postrun(
**kwds: Any,
) -> None:
app_base.on_task_postrun(sender, task_id, task, args, kwargs, retval, state, **kwds)
on_celery_task_postrun(task_id, task, state)
on_indexing_task_postrun(task_id, task, kwargs, state)
@signals.task_retry.connect
def on_task_retry(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
# task_retry signal doesn't pass task_id in kwargs; get it from
# the sender (the task instance) via sender.request.id.
task_id = getattr(getattr(sender, "request", None), "id", None)
on_celery_task_retry(task_id, sender)
@signals.task_revoked.connect
def on_task_revoked(sender: Any | None = None, **kwargs: Any) -> None:
task_name = getattr(sender, "name", None) or str(sender)
on_celery_task_revoked(kwargs.get("task_id"), task_name)
@signals.task_rejected.connect
def on_task_rejected(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
# task_rejected sends the Consumer as sender, not the task instance.
# The task name must be extracted from the Celery message headers.
message = kwargs.get("message")
task_name: str | None = None
if message is not None:
headers = getattr(message, "headers", None) or {}
task_name = headers.get("task")
if task_name is None:
task_name = "unknown"
on_celery_task_rejected(None, task_name)
@celeryd_init.connect
@@ -82,6 +122,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
start_metrics_server("docprocessing")
app_base.on_worker_ready(sender, **kwargs)
@@ -90,6 +131,12 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
app_base.on_worker_shutdown(sender, **kwargs)
# Note: worker_process_init only fires in prefork pool mode. Docprocessing uses
# worker_pool="threads" (see configs/docprocessing.py), so this handler is
# effectively a no-op in normal operation. It remains as a safety net in case
# the pool type is ever changed to prefork. Prometheus metrics are safe in
# thread-pool mode since all threads share the same process memory and can
# update the same Counter/Gauge/Histogram objects directly.
@worker_process_init.connect
def init_worker(**kwargs: Any) -> None: # noqa: ARG001
SqlEngine.reset_engine()

View File

@@ -54,8 +54,14 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
app_base.on_celeryd_init(sender, conf, **kwargs)
# Set by on_worker_init so on_worker_ready knows whether to start the server.
_prometheus_collectors_ok: bool = False
@worker_init.connect
def on_worker_init(sender: Any, **kwargs: Any) -> None:
global _prometheus_collectors_ok
logger.info("worker_init signal received.")
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")
@@ -65,6 +71,8 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
_prometheus_collectors_ok = _setup_prometheus_collectors(sender)
# Fewer startup checks in the multi-tenant case
if MULTI_TENANT:
return
@@ -72,8 +80,37 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
app_base.on_secondary_worker_init(sender, **kwargs)
def _setup_prometheus_collectors(sender: Any) -> bool:
"""Register Prometheus collectors that need Redis/DB access.
Passes the Celery app so the queue depth collector can obtain a fresh
broker Redis client on each scrape (rather than holding a stale reference).
Returns True if registration succeeded, False otherwise.
"""
try:
from onyx.server.metrics.indexing_pipeline_setup import (
setup_indexing_pipeline_metrics,
)
setup_indexing_pipeline_metrics(sender.app)
logger.info("Prometheus indexing pipeline collectors registered")
return True
except Exception:
logger.exception("Failed to register Prometheus indexing pipeline collectors")
return False
@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
if _prometheus_collectors_ok:
from onyx.server.metrics.metrics_server import start_metrics_server
start_metrics_server("monitoring")
else:
logger.warning(
"Skipping Prometheus metrics server — collector registration failed"
)
app_base.on_worker_ready(sender, **kwargs)

View File

@@ -317,7 +317,6 @@ celery_app.autodiscover_tasks(
"onyx.background.celery.tasks.docprocessing",
"onyx.background.celery.tasks.evals",
"onyx.background.celery.tasks.hierarchyfetching",
"onyx.background.celery.tasks.hooks",
"onyx.background.celery.tasks.periodic",
"onyx.background.celery.tasks.pruning",
"onyx.background.celery.tasks.shared",

View File

@@ -1,5 +1,6 @@
# These are helper objects for tracking the keys we need to write in redis
import json
import threading
from typing import Any
from typing import cast
@@ -7,7 +8,59 @@ from celery import Celery
from redis import Redis
from onyx.background.celery.configs.base import CELERY_SEPARATOR
from onyx.configs.app_configs import REDIS_HEALTH_CHECK_INTERVAL
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import REDIS_SOCKET_KEEPALIVE_OPTIONS
_broker_client: Redis | None = None
_broker_url: str | None = None
_broker_client_lock = threading.Lock()
def celery_get_broker_client(app: Celery) -> Redis:
"""Return a shared Redis client connected to the Celery broker DB.
Uses a module-level singleton so all tasks on a worker share one
connection instead of creating a new one per call. The client
connects directly to the broker Redis DB (parsed from the broker URL).
Thread-safe via lock — safe for use in Celery thread-pool workers.
Usage:
r_celery = celery_get_broker_client(self.app)
length = celery_get_queue_length(queue, r_celery)
"""
global _broker_client, _broker_url
with _broker_client_lock:
url = app.conf.broker_url
if _broker_client is not None and _broker_url == url:
try:
_broker_client.ping()
return _broker_client
except Exception:
try:
_broker_client.close()
except Exception:
pass
_broker_client = None
elif _broker_client is not None:
try:
_broker_client.close()
except Exception:
pass
_broker_client = None
_broker_url = url
_broker_client = Redis.from_url(
url,
decode_responses=False,
health_check_interval=REDIS_HEALTH_CHECK_INTERVAL,
socket_keepalive=True,
socket_keepalive_options=REDIS_SOCKET_KEEPALIVE_OPTIONS,
retry_on_timeout=True,
)
return _broker_client
def celery_get_unacked_length(r: Redis) -> int:

View File

@@ -14,7 +14,6 @@ from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.hooks.utils import HOOKS_AVAILABLE
from shared_configs.configs import MULTI_TENANT
# choosing 15 minutes because it roughly gives us enough time to process many tasks
@@ -362,19 +361,6 @@ if not MULTI_TENANT:
tasks_to_schedule.extend(beat_task_templates)
if HOOKS_AVAILABLE:
tasks_to_schedule.append(
{
"name": "hook-execution-log-cleanup",
"task": OnyxCeleryTask.HOOK_EXECUTION_LOG_CLEANUP_TASK,
"schedule": timedelta(days=1),
"options": {
"priority": OnyxCeleryPriority.LOW,
"expires": BEAT_EXPIRES_DEFAULT,
},
}
)
def generate_cloud_tasks(
beat_tasks: list[dict], beat_templates: list[dict], beat_multiplier: float

View File

@@ -14,6 +14,7 @@ from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_queued_task_ids
from onyx.configs.app_configs import JOB_TIMEOUT
@@ -132,7 +133,6 @@ def revoke_tasks_blocking_deletion(
def check_for_connector_deletion_task(self: Task, *, tenant_id: str) -> bool | None:
r = get_redis_client()
r_replica = get_redis_replica_client()
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_CONNECTOR_DELETION_BEAT_LOCK,
@@ -149,6 +149,7 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str) -> bool | N
if not r.exists(OnyxRedisSignals.BLOCK_VALIDATE_CONNECTOR_DELETION_FENCES):
# clear fences that don't have associated celery tasks in progress
try:
r_celery = celery_get_broker_client(self.app)
validate_connector_deletion_fences(
tenant_id, r, r_replica, r_celery, lock_beat
)

View File

@@ -9,6 +9,7 @@ from celery import Celery
from celery import shared_task
from celery import Task
from onyx import __version__
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.memory_monitoring import emit_process_memory
from onyx.background.celery.tasks.docprocessing.heartbeat import start_heartbeat
@@ -137,6 +138,7 @@ def _docfetching_task(
sentry_sdk.init(
dsn=SENTRY_DSN,
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:

View File

@@ -22,6 +22,7 @@ from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
from onyx.background.celery.memory_monitoring import emit_process_memory
@@ -318,6 +319,11 @@ def monitor_indexing_attempt_progress(
)
current_db_time = get_db_current_time(db_session)
total_batches: int | str = (
coordination_status.total_batches
if coordination_status.total_batches is not None
else "?"
)
if coordination_status.found:
task_logger.info(
f"Indexing attempt progress: "
@@ -325,7 +331,7 @@ def monitor_indexing_attempt_progress(
f"cc_pair={attempt.connector_credential_pair_id} "
f"search_settings={attempt.search_settings_id} "
f"completed_batches={coordination_status.completed_batches} "
f"total_batches={coordination_status.total_batches or '?'} "
f"total_batches={total_batches} "
f"total_docs={coordination_status.total_docs} "
f"total_failures={coordination_status.total_failures}"
f"elapsed={(current_db_time - attempt.time_created).seconds}"
@@ -409,7 +415,7 @@ def check_indexing_completion(
logger.info(
f"Indexing status: "
f"indexing_completed={indexing_completed} "
f"batches_processed={batches_processed}/{batches_total or '?'} "
f"batches_processed={batches_processed}/{batches_total if batches_total is not None else '?'} "
f"total_docs={coordination_status.total_docs} "
f"total_chunks={coordination_status.total_chunks} "
f"total_failures={coordination_status.total_failures}"
@@ -449,7 +455,7 @@ def check_indexing_completion(
):
# Check if the task exists in the celery queue
# This handles the case where Redis dies after task creation but before task execution
redis_celery = task.app.broker_connection().channel().client # type: ignore
redis_celery = celery_get_broker_client(task.app)
task_exists = celery_find_task(
attempt.celery_task_id,
OnyxCeleryQueues.CONNECTOR_DOC_FETCHING,

View File

@@ -1,6 +1,5 @@
import json
import time
from collections.abc import Callable
from datetime import timedelta
from itertools import islice
from typing import Any
@@ -19,6 +18,7 @@ from sqlalchemy import text
from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.memory_monitoring import emit_process_memory
@@ -698,31 +698,27 @@ def monitor_background_processes(self: Task, *, tenant_id: str) -> None:
return None
try:
# Get Redis client for Celery broker
redis_celery = self.app.broker_connection().channel().client # type: ignore
redis_std = get_redis_client()
# Define metric collection functions and their dependencies
metric_functions: list[Callable[[], list[Metric]]] = [
lambda: _collect_queue_metrics(redis_celery),
lambda: _collect_connector_metrics(db_session, redis_std),
lambda: _collect_sync_metrics(db_session, redis_std),
]
# Collect queue metrics with broker connection
r_celery = celery_get_broker_client(self.app)
queue_metrics = _collect_queue_metrics(r_celery)
# Collect and log each metric
# Collect remaining metrics (no broker connection needed)
with get_session_with_current_tenant() as db_session:
for metric_fn in metric_functions:
metrics = metric_fn()
for metric in metrics:
# double check to make sure we aren't double-emitting metrics
if metric.key is None or not _has_metric_been_emitted(
redis_std, metric.key
):
metric.log()
metric.emit(tenant_id)
all_metrics: list[Metric] = queue_metrics
all_metrics.extend(_collect_connector_metrics(db_session, redis_std))
all_metrics.extend(_collect_sync_metrics(db_session, redis_std))
if metric.key is not None:
_mark_metric_as_emitted(redis_std, metric.key)
for metric in all_metrics:
if metric.key is None or not _has_metric_been_emitted(
redis_std, metric.key
):
metric.log()
metric.emit(tenant_id)
if metric.key is not None:
_mark_metric_as_emitted(redis_std, metric.key)
task_logger.info("Successfully collected background metrics")
except SoftTimeLimitExceeded:
@@ -890,7 +886,7 @@ def monitor_celery_queues_helper(
) -> None:
"""A task to monitor all celery queue lengths."""
r_celery = task.app.broker_connection().channel().client # type: ignore
r_celery = celery_get_broker_client(task.app)
n_celery = celery_get_queue_length(OnyxCeleryQueues.PRIMARY, r_celery)
n_docfetching = celery_get_queue_length(
OnyxCeleryQueues.CONNECTOR_DOC_FETCHING, r_celery
@@ -1080,7 +1076,7 @@ def cloud_monitor_celery_pidbox(
num_deleted = 0
MAX_PIDBOX_IDLE = 24 * 3600 # 1 day in seconds
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
r_celery = celery_get_broker_client(self.app)
for key in r_celery.scan_iter("*.reply.celery.pidbox"):
key_bytes = cast(bytes, key)
key_str = key_bytes.decode("utf-8")

View File

@@ -17,6 +17,7 @@ from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_redis import celery_get_queued_task_ids
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
@@ -203,7 +204,6 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
r = get_redis_client()
r_replica = get_redis_replica_client()
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
lock_beat: RedisLock = r.lock(
OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK,
@@ -261,6 +261,7 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
r_celery = celery_get_broker_client(self.app)
validate_pruning_fences(tenant_id, r, r_replica, r_celery, lock_beat)
except Exception:
task_logger.exception("Exception while validating pruning fences")

View File

@@ -16,6 +16,7 @@ from sqlalchemy.orm import Session
from onyx.access.access import build_access_for_user_files
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_get_broker_client
from onyx.background.celery.celery_redis import celery_get_queue_length
from onyx.background.celery.celery_utils import httpx_init_vespa_pool
from onyx.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex
@@ -105,7 +106,7 @@ def _user_file_delete_queued_key(user_file_id: str | UUID) -> str:
def get_user_file_project_sync_queue_depth(celery_app: Celery) -> int:
redis_celery: Redis = celery_app.broker_connection().channel().client # type: ignore
redis_celery = celery_get_broker_client(celery_app)
return celery_get_queue_length(
OnyxCeleryQueues.USER_FILE_PROJECT_SYNC, redis_celery
)
@@ -238,7 +239,7 @@ def check_user_file_processing(self: Task, *, tenant_id: str) -> None:
skipped_guard = 0
try:
# --- Protection 1: queue depth backpressure ---
r_celery = self.app.broker_connection().channel().client # type: ignore
r_celery = celery_get_broker_client(self.app)
queue_len = celery_get_queue_length(
OnyxCeleryQueues.USER_FILE_PROCESSING, r_celery
)
@@ -591,7 +592,7 @@ def check_for_user_file_delete(self: Task, *, tenant_id: str) -> None:
# --- Protection 1: queue depth backpressure ---
# NOTE: must use the broker's Redis client (not redis_client) because
# Celery queues live on a separate Redis DB with CELERY_SEPARATOR keys.
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
r_celery = celery_get_broker_client(self.app)
queue_len = celery_get_queue_length(OnyxCeleryQueues.USER_FILE_DELETE, r_celery)
if queue_len > USER_FILE_DELETE_MAX_QUEUE_DEPTH:
task_logger.warning(

View File

@@ -8,6 +8,7 @@ from onyx.configs.constants import MessageType
from onyx.context.search.models import SearchDoc
from onyx.file_store.models import InMemoryChatFile
from onyx.server.query_and_chat.models import MessageResponseIDInfo
from onyx.server.query_and_chat.models import MultiModelMessageResponseIDInfo
from onyx.server.query_and_chat.streaming_models import CitationInfo
from onyx.server.query_and_chat.streaming_models import GeneratedImage
from onyx.server.query_and_chat.streaming_models import Packet
@@ -35,7 +36,13 @@ class CreateChatSessionID(BaseModel):
chat_session_id: UUID
AnswerStreamPart = Packet | MessageResponseIDInfo | StreamingError | CreateChatSessionID
AnswerStreamPart = (
Packet
| MessageResponseIDInfo
| MultiModelMessageResponseIDInfo
| StreamingError
| CreateChatSessionID
)
AnswerStream = Iterator[AnswerStreamPart]
@@ -177,8 +184,8 @@ class ExtractedContextFiles(BaseModel):
class SearchParams(BaseModel):
"""Resolved search filter IDs and search-tool usage for a chat turn."""
search_project_id: int | None
search_persona_id: int | None
project_id_filter: int | None
persona_id_filter: int | None
search_usage: SearchToolUsage

View File

@@ -59,6 +59,7 @@ from onyx.db.chat import create_new_chat_message
from onyx.db.chat import get_chat_session_by_id
from onyx.db.chat import get_or_create_root_message
from onyx.db.chat import reserve_message_id
from onyx.db.enums import HookPoint
from onyx.db.memory import get_memories
from onyx.db.models import ChatMessage
from onyx.db.models import ChatSession
@@ -68,11 +69,19 @@ from onyx.db.models import UserFile
from onyx.db.projects import get_user_files_from_project
from onyx.db.tools import get_tools
from onyx.deep_research.dr_loop import run_deep_research_llm_loop
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import log_onyx_error
from onyx.error_handling.exceptions import OnyxError
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_store.models import ChatFileType
from onyx.file_store.models import InMemoryChatFile
from onyx.file_store.utils import load_in_memory_chat_files
from onyx.file_store.utils import verify_user_files
from onyx.hooks.executor import execute_hook
from onyx.hooks.executor import HookSkipped
from onyx.hooks.executor import HookSoftFailed
from onyx.hooks.points.query_processing import QueryProcessingPayload
from onyx.hooks.points.query_processing import QueryProcessingResponse
from onyx.llm.factory import get_llm_for_persona
from onyx.llm.factory import get_llm_token_counter
from onyx.llm.interfaces import LLM
@@ -399,13 +408,13 @@ def determine_search_params(
"""
is_custom_persona = persona_id != DEFAULT_PERSONA_ID
search_project_id: int | None = None
search_persona_id: int | None = None
project_id_filter: int | None = None
persona_id_filter: int | None = None
if extracted_context_files.use_as_search_filter:
if is_custom_persona:
search_persona_id = persona_id
persona_id_filter = persona_id
else:
search_project_id = project_id
project_id_filter = project_id
search_usage = SearchToolUsage.AUTO
if not is_custom_persona and project_id:
@@ -418,12 +427,34 @@ def determine_search_params(
search_usage = SearchToolUsage.DISABLED
return SearchParams(
search_project_id=search_project_id,
search_persona_id=search_persona_id,
project_id_filter=project_id_filter,
persona_id_filter=persona_id_filter,
search_usage=search_usage,
)
def _resolve_query_processing_hook_result(
hook_result: QueryProcessingResponse | HookSkipped | HookSoftFailed,
message_text: str,
) -> str:
"""Apply the Query Processing hook result to the message text.
Returns the (possibly rewritten) message text, or raises OnyxError with
QUERY_REJECTED if the hook signals rejection (query is null or empty).
HookSkipped and HookSoftFailed are pass-throughs — the original text is
returned unchanged.
"""
if isinstance(hook_result, (HookSkipped, HookSoftFailed)):
return message_text
if not (hook_result.query and hook_result.query.strip()):
raise OnyxError(
OnyxErrorCode.QUERY_REJECTED,
hook_result.rejection_message
or "The hook extension for query processing did not return a valid query. No rejection reason was provided.",
)
return hook_result.query.strip()
def handle_stream_message_objects(
new_msg_req: SendMessageRequest,
user: User,
@@ -474,16 +505,24 @@ def handle_stream_message_objects(
db_session=db_session,
)
yield CreateChatSessionID(chat_session_id=chat_session.id)
chat_session = get_chat_session_by_id(
chat_session_id=chat_session.id,
user_id=user_id,
db_session=db_session,
eager_load_persona=True,
)
else:
chat_session = get_chat_session_by_id(
chat_session_id=new_msg_req.chat_session_id,
user_id=user_id,
db_session=db_session,
eager_load_persona=True,
)
persona = chat_session.persona
message_text = new_msg_req.message
user_identity = LLMUserIdentity(
user_id=llm_user_identifier, session_id=str(chat_session.id)
)
@@ -575,6 +614,28 @@ def handle_stream_message_objects(
if parent_message.message_type == MessageType.USER:
user_message = parent_message
else:
# New message — run the Query Processing hook before saving to DB.
# Skipped on regeneration: the message already exists and was accepted previously.
# Skip the hook for empty/whitespace-only messages — no meaningful query
# to process, and SendMessageRequest.message has no min_length guard.
if message_text.strip():
hook_result = execute_hook(
db_session=db_session,
hook_point=HookPoint.QUERY_PROCESSING,
payload=QueryProcessingPayload(
query=message_text,
# Pass None for anonymous users or authenticated users without an email
# (e.g. some SSO flows). QueryProcessingPayload.user_email is str | None,
# so None is accepted and serialised as null in both cases.
user_email=None if user.is_anonymous else user.email,
chat_session_id=str(chat_session.id),
).model_dump(),
response_type=QueryProcessingResponse,
)
message_text = _resolve_query_processing_hook_result(
hook_result, message_text
)
user_message = create_new_chat_message(
chat_session_id=chat_session.id,
parent_message=parent_message,
@@ -711,8 +772,8 @@ def handle_stream_message_objects(
llm=llm,
search_tool_config=SearchToolConfig(
user_selected_filters=new_msg_req.internal_search_filters,
project_id=search_params.search_project_id,
persona_id=search_params.search_persona_id,
project_id_filter=search_params.project_id_filter,
persona_id_filter=search_params.persona_id_filter,
bypass_acl=bypass_acl,
slack_context=slack_context,
enable_slack_search=_should_enable_slack_search(
@@ -914,6 +975,17 @@ def handle_stream_message_objects(
state_container=state_container,
)
except OnyxError as e:
if e.error_code is not OnyxErrorCode.QUERY_REJECTED:
log_onyx_error(e)
yield StreamingError(
error=e.detail,
error_code=e.error_code.code,
is_retryable=e.status_code >= 500,
)
db_session.rollback()
return
except ValueError as e:
logger.exception("Failed to process chat message.")

View File

@@ -44,6 +44,31 @@ SEND_USER_METADATA_TO_LLM_PROVIDER = (
# User Facing Features Configs
#####
BLURB_SIZE = 128 # Number Encoder Tokens included in the chunk blurb
# Hard ceiling for the admin-configurable file upload size (in MB).
# Self-hosted customers can raise or lower this via the environment variable.
_raw_max_upload_size_mb = int(os.environ.get("MAX_ALLOWED_UPLOAD_SIZE_MB", "250"))
if _raw_max_upload_size_mb < 0:
logger.warning(
"MAX_ALLOWED_UPLOAD_SIZE_MB=%d is negative; falling back to 250",
_raw_max_upload_size_mb,
)
_raw_max_upload_size_mb = 250
MAX_ALLOWED_UPLOAD_SIZE_MB = _raw_max_upload_size_mb
# Default fallback for the per-user file upload size limit (in MB) when no
# admin-configured value exists. Clamped to MAX_ALLOWED_UPLOAD_SIZE_MB at
# runtime so this never silently exceeds the hard ceiling.
_raw_default_upload_size_mb = int(
os.environ.get("DEFAULT_USER_FILE_MAX_UPLOAD_SIZE_MB", "100")
)
if _raw_default_upload_size_mb < 0:
logger.warning(
"DEFAULT_USER_FILE_MAX_UPLOAD_SIZE_MB=%d is negative; falling back to 100",
_raw_default_upload_size_mb,
)
_raw_default_upload_size_mb = 100
DEFAULT_USER_FILE_MAX_UPLOAD_SIZE_MB = _raw_default_upload_size_mb
GENERATIVE_MODEL_ACCESS_CHECK_FREQ = int(
os.environ.get("GENERATIVE_MODEL_ACCESS_CHECK_FREQ") or 86400
) # 1 day
@@ -61,17 +86,6 @@ CACHE_BACKEND = CacheBackendType(
os.environ.get("CACHE_BACKEND", CacheBackendType.REDIS)
)
# Maximum token count for a single uploaded file. Files exceeding this are rejected.
# Defaults to 100k tokens (or 10M when vector DB is disabled).
_DEFAULT_FILE_TOKEN_LIMIT = 10_000_000 if DISABLE_VECTOR_DB else 100_000
FILE_TOKEN_COUNT_THRESHOLD = int(
os.environ.get("FILE_TOKEN_COUNT_THRESHOLD", str(_DEFAULT_FILE_TOKEN_LIMIT))
)
# Maximum upload size for a single user file (chat/projects) in MB.
USER_FILE_MAX_UPLOAD_SIZE_MB = int(os.environ.get("USER_FILE_MAX_UPLOAD_SIZE_MB") or 50)
USER_FILE_MAX_UPLOAD_SIZE_BYTES = USER_FILE_MAX_UPLOAD_SIZE_MB * 1024 * 1024
# If set to true, will show extra/uncommon connectors in the "Other" category
SHOW_EXTRA_CONNECTORS = os.environ.get("SHOW_EXTRA_CONNECTORS", "").lower() == "true"
@@ -332,6 +346,10 @@ OPENSEARCH_INDEX_NUM_REPLICAS: int | None = (
if os.environ.get("OPENSEARCH_INDEX_NUM_REPLICAS", None) is not None
else None
)
ONYX_SEARCH_UI_USES_OPENSEARCH_KEYWORD_SEARCH = (
os.environ.get("ONYX_SEARCH_UI_USES_OPENSEARCH_KEYWORD_SEARCH", "").lower()
== "true"
)
VESPA_HOST = os.environ.get("VESPA_HOST") or "localhost"
# NOTE: this is used if and only if the vespa config server is accessible via a
@@ -787,6 +805,10 @@ MINI_CHUNK_SIZE = 150
# This is the number of regular chunks per large chunk
LARGE_CHUNK_RATIO = 4
# The maximum number of chunks that can be held for 1 document processing batch
# The purpose of this is to set an upper bound on memory usage
MAX_CHUNKS_PER_DOC_BATCH = int(os.environ.get("MAX_CHUNKS_PER_DOC_BATCH") or 1000)
# Include the document level metadata in each chunk. If the metadata is too long, then it is thrown out
# We don't want the metadata to overwhelm the actual contents of the chunk
SKIP_METADATA_IN_CHUNK = os.environ.get("SKIP_METADATA_IN_CHUNK", "").lower() == "true"
@@ -1057,7 +1079,6 @@ POD_NAMESPACE = os.environ.get("POD_NAMESPACE")
DEV_MODE = os.environ.get("DEV_MODE", "").lower() == "true"
HOOK_ENABLED = os.environ.get("HOOK_ENABLED", "").lower() == "true"
INTEGRATION_TESTS_MODE = os.environ.get("INTEGRATION_TESTS_MODE", "").lower() == "true"

View File

@@ -24,11 +24,11 @@ CONTEXT_CHUNKS_BELOW = int(os.environ.get("CONTEXT_CHUNKS_BELOW") or 1)
LLM_SOCKET_READ_TIMEOUT = int(
os.environ.get("LLM_SOCKET_READ_TIMEOUT") or "60"
) # 60 seconds
# Weighting factor between Vector and Keyword Search, 1 for completely vector search
# Weighting factor between vector and keyword search; 1 for completely vector
# search, 0 for keyword. Enforces a valid range of [0, 1]. A supplied value from
# the env outside of this range will be clipped to the respective end of the
# range. Defaults to 0.5.
HYBRID_ALPHA = max(0, min(1, float(os.environ.get("HYBRID_ALPHA") or 0.5)))
HYBRID_ALPHA_KEYWORD = max(
0, min(1, float(os.environ.get("HYBRID_ALPHA_KEYWORD") or 0.4))
)
# Weighting factor between Title and Content of documents during search, 1 for completely
# Title based. Default heavily favors Content because Title is also included at the top of
# Content. This is to avoid cases where the Content is very relevant but it may not be clear

View File

@@ -212,6 +212,7 @@ class DocumentSource(str, Enum):
PRODUCTBOARD = "productboard"
FILE = "file"
CODA = "coda"
CANVAS = "canvas"
NOTION = "notion"
ZULIP = "zulip"
LINEAR = "linear"
@@ -277,6 +278,7 @@ class NotificationType(str, Enum):
RELEASE_NOTES = "release_notes"
ASSISTANT_FILES_READY = "assistant_files_ready"
FEATURE_ANNOUNCEMENT = "feature_announcement"
USER_GROUP_ASSIGNMENT_FAILED = "user_group_assignment_failed"
class BlobType(str, Enum):
@@ -672,6 +674,7 @@ DocumentSourceDescription: dict[DocumentSource, str] = {
DocumentSource.SLAB: "slab data",
DocumentSource.PRODUCTBOARD: "productboard data (boards, etc.)",
DocumentSource.FILE: "files",
DocumentSource.CANVAS: "canvas lms - courses, pages, assignments, and announcements",
DocumentSource.CODA: "coda - team workspace with docs, tables, and pages",
DocumentSource.NOTION: "notion data - a workspace that combines note-taking, \
project management, and collaboration tools into a single, customizable platform",

View File

@@ -0,0 +1,32 @@
"""
Permissioning / AccessControl logic for Canvas courses.
CE stub — returns None (no permissions). The EE implementation is loaded
at runtime via ``fetch_versioned_implementation``.
"""
from collections.abc import Callable
from typing import cast
from onyx.access.models import ExternalAccess
from onyx.connectors.canvas.client import CanvasApiClient
from onyx.utils.variable_functionality import fetch_versioned_implementation
from onyx.utils.variable_functionality import global_version
def get_course_permissions(
canvas_client: CanvasApiClient,
course_id: int,
) -> ExternalAccess | None:
if not global_version.is_ee_version():
return None
ee_get_course_permissions = cast(
Callable[[CanvasApiClient, int], ExternalAccess | None],
fetch_versioned_implementation(
"onyx.external_permissions.canvas.access",
"get_course_permissions",
),
)
return ee_get_course_permissions(canvas_client, course_id)
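A minimal usage sketch of the stub above. The import path assumes the stub lives at the module path it passes to fetch_versioned_implementation; the Canvas URL, token, and course ID are placeholders. On CE builds the call simply returns None:

from onyx.connectors.canvas.client import CanvasApiClient
from onyx.external_permissions.canvas.access import get_course_permissions  # assumed path

client = CanvasApiClient(
    bearer_token="<canvas-api-token>",  # placeholder credential
    canvas_base_url="https://canvas.example.com",  # must be https with a valid host
)
access = get_course_permissions(client, course_id=101)  # hypothetical course ID
if access is None:
    # CE build: no external permissions are returned for the course.
    pass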

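The stub above is a thin CE/EE switch: community builds short-circuit to None, while enterprise builds resolve the real permission logic at runtime through fetch_versioned_implementation. A hedged sketch of what such a runtime loader typically boils down to (this is an illustrative stand-in, not the actual onyx implementation):

import importlib
from collections.abc import Callable
from typing import Any

def load_versioned(module_path: str, attribute_name: str) -> Callable[..., Any]:
    # Hypothetical stand-in for fetch_versioned_implementation: resolve a dotted
    # module path at runtime and return the named attribute, letting the EE build
    # transparently override the CE stub.
    module = importlib.import_module(module_path)
    return getattr(module, attribute_name)

# Usage mirrors the stub above (paths shown for illustration only):
# get_perms = load_versioned(
#     "onyx.external_permissions.canvas.access", "get_course_permissions"
# )
# access = get_perms(canvas_client, course_id)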
View File

@@ -0,0 +1,212 @@
from __future__ import annotations
import logging
import re
from collections.abc import Iterator
from typing import Any
from urllib.parse import urlparse
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import (
rl_requests,
)
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
logger = logging.getLogger(__name__)
# Requests timeout in seconds.
_CANVAS_CALL_TIMEOUT: int = 30
_CANVAS_API_VERSION: str = "/api/v1"
# Matches the "next" URL in a Canvas Link header, e.g.:
# <https://canvas.example.com/api/v1/courses?page=2>; rel="next"
# Captures the URL inside the angle brackets.
_NEXT_LINK_PATTERN: re.Pattern[str] = re.compile(r'<([^>]+)>;\s*rel="next"')
_STATUS_TO_ERROR_CODE: dict[int, OnyxErrorCode] = {
401: OnyxErrorCode.CREDENTIAL_EXPIRED,
403: OnyxErrorCode.INSUFFICIENT_PERMISSIONS,
404: OnyxErrorCode.BAD_GATEWAY,
429: OnyxErrorCode.RATE_LIMITED,
}
def _error_code_for_status(status_code: int) -> OnyxErrorCode:
"""Map an HTTP status code to the appropriate OnyxErrorCode.
Expects a >= 400 status code. Known codes (401, 403, 404, 429) are
mapped to specific error codes; all other codes (unrecognised 4xx
and 5xx) map to BAD_GATEWAY as unexpected upstream errors.
"""
if status_code in _STATUS_TO_ERROR_CODE:
return _STATUS_TO_ERROR_CODE[status_code]
return OnyxErrorCode.BAD_GATEWAY
class CanvasApiClient:
def __init__(
self,
bearer_token: str,
canvas_base_url: str,
) -> None:
parsed_base = urlparse(canvas_base_url)
if not parsed_base.hostname:
raise ValueError("canvas_base_url must include a valid host")
if parsed_base.scheme != "https":
raise ValueError("canvas_base_url must use https")
self._bearer_token = bearer_token
self.base_url = (
canvas_base_url.rstrip("/").removesuffix(_CANVAS_API_VERSION)
+ _CANVAS_API_VERSION
)
# Hostname is already validated above; reuse parsed_base instead
# of re-parsing. Used by _parse_next_link to validate pagination URLs.
self._expected_host: str = parsed_base.hostname
def get(
self,
endpoint: str = "",
params: dict[str, Any] | None = None,
full_url: str | None = None,
) -> tuple[Any, str | None]:
"""Make a GET request to the Canvas API.
Returns a tuple of (json_body, next_url).
next_url is parsed from the Link header and is None if there are no more pages.
If full_url is provided, it is used directly (for following pagination links).
Security note: full_url must only be set to values returned by
``_parse_next_link``, which validates the host against the configured
Canvas base URL. Passing an arbitrary URL would leak the bearer token.
"""
# full_url is used when following pagination (Canvas returns the
# next-page URL in the Link header). For the first request we build
# the URL from the endpoint name instead.
url = full_url if full_url else self._build_url(endpoint)
headers = self._build_headers()
response = rl_requests.get(
url,
headers=headers,
params=params if not full_url else None,
timeout=_CANVAS_CALL_TIMEOUT,
)
try:
response_json = response.json()
except ValueError as e:
if response.status_code < 300:
raise OnyxError(
OnyxErrorCode.BAD_GATEWAY,
detail=f"Invalid JSON in Canvas response: {e}",
)
logger.warning(
"Failed to parse JSON from Canvas error response (status=%d): %s",
response.status_code,
e,
)
response_json = {}
if response.status_code >= 400:
# Try to extract the most specific error message from the
# Canvas response body. Canvas uses three different shapes
# depending on the endpoint and error type:
default_error: str = response.reason or f"HTTP {response.status_code}"
error = default_error
if isinstance(response_json, dict):
# Shape 1: {"error": {"message": "Not authorized"}}
error_field = response_json.get("error")
if isinstance(error_field, dict):
response_error = error_field.get("message", "")
if response_error:
error = response_error
# Shape 2: {"error": "Invalid access token"}
elif isinstance(error_field, str):
error = error_field
# Shape 3: {"errors": [{"message": "..."}]}
# Used for validation errors. Only use as fallback if
# we didn't already find a more specific message above.
if error == default_error:
errors_list = response_json.get("errors")
if isinstance(errors_list, list) and errors_list:
first_error = errors_list[0]
if isinstance(first_error, dict):
msg = first_error.get("message", "")
if msg:
error = msg
raise OnyxError(
_error_code_for_status(response.status_code),
detail=error,
status_code_override=response.status_code,
)
next_url = self._parse_next_link(response.headers.get("Link", ""))
return response_json, next_url
def _parse_next_link(self, link_header: str) -> str | None:
"""Extract the 'next' URL from a Canvas Link header.
Only returns URLs whose host matches the configured Canvas base URL
to prevent leaking the bearer token to arbitrary hosts.
"""
expected_host = self._expected_host
for match in _NEXT_LINK_PATTERN.finditer(link_header):
url = match.group(1)
parsed_url = urlparse(url)
if parsed_url.hostname != expected_host:
raise OnyxError(
OnyxErrorCode.BAD_GATEWAY,
detail=(
"Canvas pagination returned an unexpected host "
f"({parsed_url.hostname}); expected {expected_host}"
),
)
if parsed_url.scheme != "https":
raise OnyxError(
OnyxErrorCode.BAD_GATEWAY,
detail=(
"Canvas pagination link must use https, "
f"got {parsed_url.scheme!r}"
),
)
return url
return None
def _build_headers(self) -> dict[str, str]:
"""Return the Authorization header with the bearer token."""
return {"Authorization": f"Bearer {self._bearer_token}"}
def _build_url(self, endpoint: str) -> str:
"""Build a full Canvas API URL from an endpoint path.
Assumes endpoint is non-empty (e.g. ``"courses"``, ``"announcements"``).
Only called for the first request, where the endpoint must be set; the
emptiness check below is defensive in case the endpoint ever becomes optional.
Leading slashes are stripped to avoid double-slash in the result.
self.base_url is already normalized with no trailing slash.
"""
final_url = self.base_url
clean_endpoint = endpoint.lstrip("/")
if clean_endpoint:
final_url += "/" + clean_endpoint
return final_url
def paginate(
self,
endpoint: str,
params: dict[str, Any] | None = None,
) -> Iterator[list[Any]]:
"""Yield each page of results, following Link-header pagination.
Makes the first request with endpoint + params, then follows
next_url from Link headers for subsequent pages.
"""
response, next_url = self.get(endpoint, params=params)
while True:
if not response:
break
yield response
if not next_url:
break
response, next_url = self.get(full_url=next_url)

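For reference, _NEXT_LINK_PATTERN and paginate() implement standard Link-header (RFC 5988) pagination. A self-contained check of the parsing step, with the regex copied from the client above (the host and scheme validation from _parse_next_link is omitted here):

import re

_NEXT_LINK_PATTERN = re.compile(r'<([^>]+)>;\s*rel="next"')

link_header = (
    '<https://canvas.example.com/api/v1/courses?page=2&per_page=100>; rel="next", '
    '<https://canvas.example.com/api/v1/courses?page=5&per_page=100>; rel="last"'
)

# Capture the URL inside the angle brackets of the rel="next" entry.
match = _NEXT_LINK_PATTERN.search(link_header)
next_url = match.group(1) if match else None
print(next_url)  # https://canvas.example.com/api/v1/courses?page=2&per_page=100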
View File

@@ -0,0 +1,458 @@
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import cast
from typing import Literal
from typing import NoReturn
from typing import TypeAlias
from pydantic import BaseModel
from retry import retry
from typing_extensions import override
from onyx.access.models import ExternalAccess
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.canvas.access import get_course_permissions
from onyx.connectors.canvas.client import CanvasApiClient
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedValidationError
from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TextSection
from onyx.error_handling.exceptions import OnyxError
from onyx.file_processing.html_utils import parse_html_page_basic
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
logger = setup_logger()
def _handle_canvas_api_error(e: OnyxError) -> NoReturn:
"""Map Canvas API errors to connector framework exceptions."""
if e.status_code == 401:
raise CredentialExpiredError(
"Canvas API token is invalid or expired (HTTP 401)."
)
elif e.status_code == 403:
raise InsufficientPermissionsError(
"Canvas API token does not have sufficient permissions (HTTP 403)."
)
elif e.status_code == 429:
raise ConnectorValidationError(
"Canvas rate-limit exceeded (HTTP 429). Please try again later."
)
elif e.status_code >= 500:
raise UnexpectedValidationError(
f"Unexpected Canvas HTTP error (status={e.status_code}): {e}"
)
else:
raise ConnectorValidationError(
f"Canvas API error (status={e.status_code}): {e}"
)
class CanvasCourse(BaseModel):
id: int
name: str | None = None
course_code: str | None = None
created_at: str | None = None
workflow_state: str | None = None
@classmethod
def from_api(cls, payload: dict[str, Any]) -> "CanvasCourse":
return cls(
id=payload["id"],
name=payload.get("name"),
course_code=payload.get("course_code"),
created_at=payload.get("created_at"),
workflow_state=payload.get("workflow_state"),
)
class CanvasPage(BaseModel):
page_id: int
url: str
title: str
body: str | None = None
created_at: str | None = None
updated_at: str | None = None
course_id: int
@classmethod
def from_api(cls, payload: dict[str, Any], course_id: int) -> "CanvasPage":
return cls(
page_id=payload["page_id"],
url=payload["url"],
title=payload["title"],
body=payload.get("body"),
created_at=payload.get("created_at"),
updated_at=payload.get("updated_at"),
course_id=course_id,
)
class CanvasAssignment(BaseModel):
id: int
name: str
description: str | None = None
html_url: str
course_id: int
created_at: str | None = None
updated_at: str | None = None
due_at: str | None = None
@classmethod
def from_api(cls, payload: dict[str, Any], course_id: int) -> "CanvasAssignment":
return cls(
id=payload["id"],
name=payload["name"],
description=payload.get("description"),
html_url=payload["html_url"],
course_id=course_id,
created_at=payload.get("created_at"),
updated_at=payload.get("updated_at"),
due_at=payload.get("due_at"),
)
class CanvasAnnouncement(BaseModel):
id: int
title: str
message: str | None = None
html_url: str
posted_at: str | None = None
course_id: int
@classmethod
def from_api(cls, payload: dict[str, Any], course_id: int) -> "CanvasAnnouncement":
return cls(
id=payload["id"],
title=payload["title"],
message=payload.get("message"),
html_url=payload["html_url"],
posted_at=payload.get("posted_at"),
course_id=course_id,
)
CanvasStage: TypeAlias = Literal["pages", "assignments", "announcements"]
class CanvasConnectorCheckpoint(ConnectorCheckpoint):
"""Checkpoint state for resumable Canvas indexing.
Fields:
course_ids: Materialized list of course IDs to process.
current_course_index: Index into course_ids for current course.
stage: Which item type we're processing for the current course.
next_url: Pagination cursor within the current stage. None means
start from the first page; a URL means resume from that page.
Invariant:
If current_course_index is incremented, stage must be reset to
"pages" and next_url must be reset to None.
"""
course_ids: list[int] = []
current_course_index: int = 0
stage: CanvasStage = "pages"
next_url: str | None = None
def advance_course(self) -> None:
"""Move to the next course and reset within-course state."""
self.current_course_index += 1
self.stage = "pages"
self.next_url = None
class CanvasConnector(
CheckpointedConnectorWithPermSync[CanvasConnectorCheckpoint],
SlimConnectorWithPermSync,
):
def __init__(
self,
canvas_base_url: str,
batch_size: int = INDEX_BATCH_SIZE,
) -> None:
self.canvas_base_url = canvas_base_url.rstrip("/").removesuffix("/api/v1")
self.batch_size = batch_size
self._canvas_client: CanvasApiClient | None = None
self._course_permissions_cache: dict[int, ExternalAccess | None] = {}
@property
def canvas_client(self) -> CanvasApiClient:
if self._canvas_client is None:
raise ConnectorMissingCredentialError("Canvas")
return self._canvas_client
def _get_course_permissions(self, course_id: int) -> ExternalAccess | None:
"""Get course permissions with caching."""
if course_id not in self._course_permissions_cache:
self._course_permissions_cache[course_id] = get_course_permissions(
canvas_client=self.canvas_client,
course_id=course_id,
)
return self._course_permissions_cache[course_id]
@retry(tries=3, delay=1, backoff=2)
def _list_courses(self) -> list[CanvasCourse]:
"""Fetch all courses accessible to the authenticated user."""
logger.debug("Fetching Canvas courses")
courses: list[CanvasCourse] = []
for page in self.canvas_client.paginate(
"courses", params={"per_page": "100", "state[]": "available"}
):
courses.extend(CanvasCourse.from_api(c) for c in page)
return courses
@retry(tries=3, delay=1, backoff=2)
def _list_pages(self, course_id: int) -> list[CanvasPage]:
"""Fetch all pages for a given course."""
logger.debug(f"Fetching pages for course {course_id}")
pages: list[CanvasPage] = []
for page in self.canvas_client.paginate(
f"courses/{course_id}/pages",
params={"per_page": "100", "include[]": "body", "published": "true"},
):
pages.extend(CanvasPage.from_api(p, course_id=course_id) for p in page)
return pages
@retry(tries=3, delay=1, backoff=2)
def _list_assignments(self, course_id: int) -> list[CanvasAssignment]:
"""Fetch all assignments for a given course."""
logger.debug(f"Fetching assignments for course {course_id}")
assignments: list[CanvasAssignment] = []
for page in self.canvas_client.paginate(
f"courses/{course_id}/assignments",
params={"per_page": "100", "published": "true"},
):
assignments.extend(
CanvasAssignment.from_api(a, course_id=course_id) for a in page
)
return assignments
@retry(tries=3, delay=1, backoff=2)
def _list_announcements(self, course_id: int) -> list[CanvasAnnouncement]:
"""Fetch all announcements for a given course."""
logger.debug(f"Fetching announcements for course {course_id}")
announcements: list[CanvasAnnouncement] = []
for page in self.canvas_client.paginate(
"announcements",
params={
"per_page": "100",
"context_codes[]": f"course_{course_id}",
"active_only": "true",
},
):
announcements.extend(
CanvasAnnouncement.from_api(a, course_id=course_id) for a in page
)
return announcements
def _build_document(
self,
doc_id: str,
link: str,
text: str,
semantic_identifier: str,
doc_updated_at: datetime | None,
course_id: int,
doc_type: str,
) -> Document:
"""Build a Document with standard Canvas fields."""
return Document(
id=doc_id,
sections=cast(
list[TextSection | ImageSection],
[TextSection(link=link, text=text)],
),
source=DocumentSource.CANVAS,
semantic_identifier=semantic_identifier,
doc_updated_at=doc_updated_at,
metadata={"course_id": str(course_id), "type": doc_type},
)
def _convert_page_to_document(self, page: CanvasPage) -> Document:
"""Convert a Canvas page to a Document."""
link = f"{self.canvas_base_url}/courses/{page.course_id}/pages/{page.url}"
text_parts = [page.title]
body_text = parse_html_page_basic(page.body) if page.body else ""
if body_text:
text_parts.append(body_text)
doc_updated_at = (
datetime.fromisoformat(page.updated_at.replace("Z", "+00:00")).astimezone(
timezone.utc
)
if page.updated_at
else None
)
document = self._build_document(
doc_id=f"canvas-page-{page.course_id}-{page.page_id}",
link=link,
text="\n\n".join(text_parts),
semantic_identifier=page.title or f"Page {page.page_id}",
doc_updated_at=doc_updated_at,
course_id=page.course_id,
doc_type="page",
)
return document
def _convert_assignment_to_document(self, assignment: CanvasAssignment) -> Document:
"""Convert a Canvas assignment to a Document."""
text_parts = [assignment.name]
desc_text = (
parse_html_page_basic(assignment.description)
if assignment.description
else ""
)
if desc_text:
text_parts.append(desc_text)
if assignment.due_at:
due_dt = datetime.fromisoformat(
assignment.due_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
text_parts.append(f"Due: {due_dt.strftime('%B %d, %Y %H:%M UTC')}")
doc_updated_at = (
datetime.fromisoformat(
assignment.updated_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
if assignment.updated_at
else None
)
document = self._build_document(
doc_id=f"canvas-assignment-{assignment.course_id}-{assignment.id}",
link=assignment.html_url,
text="\n\n".join(text_parts),
semantic_identifier=assignment.name or f"Assignment {assignment.id}",
doc_updated_at=doc_updated_at,
course_id=assignment.course_id,
doc_type="assignment",
)
return document
def _convert_announcement_to_document(
self, announcement: CanvasAnnouncement
) -> Document:
"""Convert a Canvas announcement to a Document."""
text_parts = [announcement.title]
msg_text = (
parse_html_page_basic(announcement.message) if announcement.message else ""
)
if msg_text:
text_parts.append(msg_text)
doc_updated_at = (
datetime.fromisoformat(
announcement.posted_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
if announcement.posted_at
else None
)
document = self._build_document(
doc_id=f"canvas-announcement-{announcement.course_id}-{announcement.id}",
link=announcement.html_url,
text="\n\n".join(text_parts),
semantic_identifier=announcement.title or f"Announcement {announcement.id}",
doc_updated_at=doc_updated_at,
course_id=announcement.course_id,
doc_type="announcement",
)
return document
@override
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
"""Load and validate Canvas credentials."""
access_token = credentials.get("canvas_access_token")
if not access_token:
raise ConnectorMissingCredentialError("Canvas")
try:
client = CanvasApiClient(
bearer_token=access_token,
canvas_base_url=self.canvas_base_url,
)
client.get("courses", params={"per_page": "1"})
except ValueError as e:
raise ConnectorValidationError(f"Invalid Canvas base URL: {e}")
except OnyxError as e:
_handle_canvas_api_error(e)
self._canvas_client = client
return None
@override
def validate_connector_settings(self) -> None:
"""Validate Canvas connector settings by testing API access."""
try:
self.canvas_client.get("courses", params={"per_page": "1"})
logger.info("Canvas connector settings validated successfully")
except OnyxError as e:
_handle_canvas_api_error(e)
except ConnectorMissingCredentialError:
raise
except Exception as exc:
raise UnexpectedValidationError(
f"Unexpected error during Canvas settings validation: {exc}"
)
@override
def load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def load_from_checkpoint_with_perm_sync(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def build_dummy_checkpoint(self) -> CanvasConnectorCheckpoint:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def validate_checkpoint_json(
self, checkpoint_json: str
) -> CanvasConnectorCheckpoint:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
# TODO(benwu408): implemented in PR4 (perm sync)
raise NotImplementedError

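One detail worth calling out from the connector above: Canvas timestamps arrive as ISO-8601 strings with a trailing "Z", which datetime.fromisoformat cannot parse directly on Python versions before 3.11, hence the repeated replace("Z", "+00:00") normalization. A minimal standalone version of that conversion:

from datetime import datetime, timezone

def parse_canvas_timestamp(raw: str | None) -> datetime | None:
    # Same normalization as the connector: rewrite the "Z" suffix as an explicit
    # UTC offset, then coerce to an aware UTC datetime. None stays None.
    if not raw:
        return None
    return datetime.fromisoformat(raw.replace("Z", "+00:00")).astimezone(timezone.utc)

print(parse_canvas_timestamp("2026-04-01T08:30:00Z"))  # 2026-04-01 08:30:00+00:00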
View File

@@ -890,8 +890,8 @@ class ConfluenceConnector(
def _retrieve_all_slim_docs(
self,
start: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
end: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
include_permissions: bool = True,
) -> GenerateSlimDocumentOutput:
@@ -915,8 +915,8 @@ class ConfluenceConnector(
self.confluence_client, doc_id, restrictions, ancestors
) or space_level_access_info.get(page_space_key)
# Query pages
page_query = self.base_cql_page_query + self.cql_label_filter
# Query pages (with optional time filtering for indexing_start)
page_query = self._construct_page_cql_query(start, end)
for page in self.confluence_client.cql_paginate_all_expansions(
cql=page_query,
expand=restrictions_expand,
@@ -950,7 +950,9 @@ class ConfluenceConnector(
# Query attachments for each page
page_hierarchy_node_yielded = False
attachment_query = self._construct_attachment_query(_get_page_id(page))
attachment_query = self._construct_attachment_query(
_get_page_id(page), start, end
)
for attachment in self.confluence_client.cql_paginate_all_expansions(
cql=attachment_query,
expand=restrictions_expand,

View File

@@ -123,7 +123,7 @@ class OnyxConfluence:
self.shared_base_kwargs: dict[str, str | int | bool] = {
"api_version": "cloud" if is_cloud else "latest",
"backoff_and_retry": True,
"backoff_and_retry": False,
"cloud": is_cloud,
}
if timeout:
@@ -456,7 +456,7 @@ class OnyxConfluence:
return attr(*args, **kwargs)
except HTTPError as e:
delay_until = _handle_http_error(e, attempt)
delay_until = _handle_http_error(e, attempt, MAX_RETRIES)
logger.warning(
f"HTTPError in confluence call. Retrying in {delay_until} seconds..."
)

View File

@@ -363,7 +363,7 @@ def handle_confluence_rate_limit(confluence_call: F) -> F:
# and applying our own retries in a more specific set of circumstances
return confluence_call(*args, **kwargs)
except requests.HTTPError as e:
delay_until = _handle_http_error(e, attempt)
delay_until = _handle_http_error(e, attempt, MAX_RETRIES)
logger.warning(
f"HTTPError in confluence call. Retrying in {delay_until} seconds..."
)
@@ -384,7 +384,7 @@ def handle_confluence_rate_limit(confluence_call: F) -> F:
return cast(F, wrapped_call)
def _handle_http_error(e: requests.HTTPError, attempt: int) -> int:
def _handle_http_error(e: requests.HTTPError, attempt: int, max_retries: int) -> int:
MIN_DELAY = 2
MAX_DELAY = 60
STARTING_DELAY = 5
@@ -408,6 +408,17 @@ def _handle_http_error(e: requests.HTTPError, attempt: int) -> int:
raise e
if e.response.status_code >= 500:
if attempt >= max_retries - 1:
raise e
delay = min(STARTING_DELAY * (BACKOFF**attempt), MAX_DELAY)
logger.warning(
f"Server error {e.response.status_code}. "
f"Retrying in {delay} seconds (attempt {attempt + 1})..."
)
return math.ceil(time.monotonic() + delay)
if (
e.response.status_code != 429
and RATE_LIMIT_MESSAGE_LOWERCASE not in e.response.text.lower()

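The new 5xx branch above retries with capped exponential backoff and hands the caller an absolute monotonic-clock deadline. A quick sketch of the resulting schedule; STARTING_DELAY and MAX_DELAY come from the hunk, while BACKOFF = 2 is an assumption since its value is outside the shown context:

import math
import time

STARTING_DELAY = 5
MAX_DELAY = 60
BACKOFF = 2  # assumed; the real constant is defined outside this hunk

def retry_deadline(attempt: int) -> int:
    # Mirrors the 5xx branch: exponential delay capped at MAX_DELAY, returned
    # as a monotonic-clock deadline the caller can sleep until.
    delay = min(STARTING_DELAY * (BACKOFF ** attempt), MAX_DELAY)
    return math.ceil(time.monotonic() + delay)

for attempt in range(5):
    print(attempt, min(STARTING_DELAY * (BACKOFF ** attempt), MAX_DELAY))
# attempts 0..4 give delays 5, 10, 20, 40, 60 (capped at MAX_DELAY)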
View File

@@ -11,11 +11,13 @@ from discord import Client
from discord.channel import TextChannel
from discord.channel import Thread
from discord.enums import MessageType
from discord.errors import LoginFailure
from discord.flags import Intents
from discord.message import Message as DiscordMessage
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.exceptions import CredentialInvalidError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
@@ -209,8 +211,19 @@ def _manage_async_retrieval(
intents = Intents.default()
intents.message_content = True
async with Client(intents=intents) as discord_client:
asyncio.create_task(discord_client.start(token))
await discord_client.wait_until_ready()
start_task = asyncio.create_task(discord_client.start(token))
ready_task = asyncio.create_task(discord_client.wait_until_ready())
done, _ = await asyncio.wait(
{start_task, ready_task},
return_when=asyncio.FIRST_COMPLETED,
)
# start() runs indefinitely once connected, so it only lands
# in `done` when login/connection failed — propagate the error.
if start_task in done:
ready_task.cancel()
start_task.result()
filtered_channels: list[TextChannel] = await _fetch_filtered_channels(
discord_client=discord_client,
@@ -276,6 +289,19 @@ class DiscordConnector(PollConnector, LoadConnector):
self._discord_bot_token = credentials["discord_bot_token"]
return None
def validate_connector_settings(self) -> None:
loop = asyncio.new_event_loop()
try:
client = Client(intents=Intents.default())
try:
loop.run_until_complete(client.login(self.discord_bot_token))
except LoginFailure as e:
raise CredentialInvalidError(f"Invalid Discord bot token: {e}")
finally:
loop.run_until_complete(client.close())
finally:
loop.close()
def _manage_doc_batching(
self,
start: datetime | None = None,

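The Discord change races client.start() against wait_until_ready() so that a failed login surfaces immediately instead of the old behavior of awaiting wait_until_ready() forever. A generic, self-contained illustration of that asyncio.FIRST_COMPLETED pattern, using stand-in coroutines rather than discord.py:

import asyncio

async def start(fail: bool) -> None:
    # Stand-in for discord_client.start(): raises quickly on bad credentials,
    # otherwise runs indefinitely once connected.
    if fail:
        raise RuntimeError("login failed")
    await asyncio.Event().wait()

async def wait_until_ready() -> None:
    await asyncio.sleep(0.1)  # stand-in for the client becoming ready

async def connect(fail: bool) -> None:
    start_task = asyncio.create_task(start(fail))
    ready_task = asyncio.create_task(wait_until_ready())
    done, _ = await asyncio.wait(
        {start_task, ready_task}, return_when=asyncio.FIRST_COMPLETED
    )
    if start_task in done:
        # start() only finishes first when it failed; re-raise its error.
        ready_task.cancel()
        start_task.result()
    print("ready")
    start_task.cancel()  # cleanup for this toy example

asyncio.run(connect(fail=False))   # prints "ready"
# asyncio.run(connect(fail=True))  # raises RuntimeError: login failed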
View File

@@ -10,6 +10,7 @@ from datetime import timedelta
from datetime import timezone
from typing import Any
import requests
from jira import JIRA
from jira.exceptions import JIRAError
from jira.resources import Issue
@@ -239,29 +240,53 @@ def enhanced_search_ids(
)
def bulk_fetch_issues(
jira_client: JIRA, issue_ids: list[str], fields: str | None = None
) -> list[Issue]:
# TODO: move away from this jira library if they continue to not support
# the endpoints we need. Using private fields is not ideal, but
# is likely fine for now since we pin the library version
def _bulk_fetch_request(
jira_client: JIRA, issue_ids: list[str], fields: str | None
) -> list[dict[str, Any]]:
"""Raw POST to the bulkfetch endpoint. Returns the list of raw issue dicts."""
bulk_fetch_path = jira_client._get_url("issue/bulkfetch")
# Prepare the payload according to Jira API v3 specification
payload: dict[str, Any] = {"issueIdsOrKeys": issue_ids}
# Only restrict fields if specified; we might want to do this explicitly in the future
# to avoid reading unnecessary data
payload["fields"] = fields.split(",") if fields else ["*all"]
resp = jira_client._session.post(bulk_fetch_path, json=payload)
return resp.json()["issues"]
def bulk_fetch_issues(
jira_client: JIRA, issue_ids: list[str], fields: str | None = None
) -> list[Issue]:
# TODO(evan): move away from this jira library if they continue to not support
# the endpoints we need. Using private fields is not ideal, but
# is likely fine for now since we pin the library version
try:
response = jira_client._session.post(bulk_fetch_path, json=payload).json()
raw_issues = _bulk_fetch_request(jira_client, issue_ids, fields)
except requests.exceptions.JSONDecodeError:
if len(issue_ids) <= 1:
logger.exception(
f"Jira bulk-fetch response for issue(s) {issue_ids} could not "
f"be decoded as JSON (response too large or truncated)."
)
raise
mid = len(issue_ids) // 2
logger.warning(
f"Jira bulk-fetch JSON decode failed for batch of {len(issue_ids)} issues. "
f"Splitting into sub-batches of {mid} and {len(issue_ids) - mid}."
)
left = bulk_fetch_issues(jira_client, issue_ids[:mid], fields)
right = bulk_fetch_issues(jira_client, issue_ids[mid:], fields)
return left + right
except Exception as e:
logger.error(f"Error fetching issues: {e}")
raise e
raise
return [
Issue(jira_client._options, jira_client._session, raw=issue)
for issue in response["issues"]
for issue in raw_issues
]

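The Jira change wraps the raw bulkfetch call and, when a response is too large or truncated to decode, recursively bisects the batch until the sub-batches succeed or a single issue still fails. A generic sketch of that bisection strategy with a stand-in fetch function:

import json

def fetch_batch(items: list[str]) -> list[dict]:
    # Stand-in for _bulk_fetch_request: pretend any response with more than
    # two items comes back truncated and fails to decode.
    if len(items) > 2:
        raise json.JSONDecodeError("truncated", doc="", pos=0)
    return [{"id": item} for item in items]

def fetch_with_bisection(items: list[str]) -> list[dict]:
    try:
        return fetch_batch(items)
    except json.JSONDecodeError:
        if len(items) <= 1:
            raise  # a single item still failing is a real error, not a size issue
        mid = len(items) // 2
        return fetch_with_bisection(items[:mid]) + fetch_with_bisection(items[mid:])

print(fetch_with_bisection(["A", "B", "C", "D", "E"]))  # five single-issue dicts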
View File

@@ -53,7 +53,7 @@ class NotionPage(BaseModel):
id: str
created_time: str
last_edited_time: str
archived: bool
in_trash: bool
properties: dict[str, Any]
url: str
@@ -63,6 +63,13 @@ class NotionPage(BaseModel):
)
class NotionDataSource(BaseModel):
"""Represents a Notion Data Source within a database."""
id: str
name: str = ""
class NotionBlock(BaseModel):
"""Represents a Notion Block object"""
@@ -107,7 +114,7 @@ class NotionConnector(LoadConnector, PollConnector):
self.batch_size = batch_size
self.headers = {
"Content-Type": "application/json",
"Notion-Version": "2022-06-28",
"Notion-Version": "2026-03-11",
}
self.indexed_pages: set[str] = set()
self.root_page_id = root_page_id
@@ -127,6 +134,9 @@ class NotionConnector(LoadConnector, PollConnector):
# Maps child page IDs to their containing page ID (discovered in _read_blocks).
# Used to resolve block_id parent types to the actual containing page.
self._child_page_parent_map: dict[str, str] = {}
# Maps data_source_id -> database_id (populated in _read_pages_from_database).
# Used to resolve data_source_id parent types back to the database.
self._data_source_to_database_map: dict[str, str] = {}
@classmethod
@override
@@ -227,7 +237,11 @@ class NotionConnector(LoadConnector, PollConnector):
@retry(tries=3, delay=1, backoff=2)
def _fetch_database_as_page(self, database_id: str) -> NotionPage:
"""Attempt to fetch a database as a page."""
"""Attempt to fetch a database as a page.
Note: As of API 2025-09-03, database objects no longer include
`properties` (schema moved to individual data sources).
"""
logger.debug(f"Fetching database for ID '{database_id}' as a page")
database_url = f"https://api.notion.com/v1/databases/{database_id}"
res = rl_requests.get(
@@ -246,18 +260,52 @@ class NotionConnector(LoadConnector, PollConnector):
database_name[0].get("text", {}).get("content") if database_name else None
)
db_data.setdefault("properties", {})
return NotionPage(**db_data, database_name=database_name)
@retry(tries=3, delay=1, backoff=2)
def _fetch_database(
self, database_id: str, cursor: str | None = None
def _fetch_data_sources_for_database(
self, database_id: str
) -> list[NotionDataSource]:
"""Fetch the list of data sources for a database."""
logger.debug(f"Fetching data sources for database '{database_id}'")
res = rl_requests.get(
f"https://api.notion.com/v1/databases/{database_id}",
headers=self.headers,
timeout=_NOTION_CALL_TIMEOUT,
)
try:
res.raise_for_status()
except Exception as e:
if res.status_code in (403, 404):
logger.error(
f"Unable to access database with ID '{database_id}'. "
f"This is likely due to the database not being shared "
f"with the Onyx integration. Exact exception:\n{e}"
)
return []
logger.exception(f"Error fetching database - {res.json()}")
raise e
db_data = res.json()
data_sources = db_data.get("data_sources", [])
return [
NotionDataSource(id=ds["id"], name=ds.get("name", ""))
for ds in data_sources
if ds.get("id")
]
@retry(tries=3, delay=1, backoff=2)
def _fetch_data_source(
self, data_source_id: str, cursor: str | None = None
) -> dict[str, Any]:
"""Fetch a database from it's ID via the Notion API."""
logger.debug(f"Fetching database for ID '{database_id}'")
block_url = f"https://api.notion.com/v1/databases/{database_id}/query"
"""Query a data source via POST /v1/data_sources/{id}/query."""
logger.debug(f"Querying data source '{data_source_id}'")
url = f"https://api.notion.com/v1/data_sources/{data_source_id}/query"
body = None if not cursor else {"start_cursor": cursor}
res = rl_requests.post(
block_url,
url,
headers=self.headers,
json=body,
timeout=_NOTION_CALL_TIMEOUT,
@@ -265,25 +313,14 @@ class NotionConnector(LoadConnector, PollConnector):
try:
res.raise_for_status()
except Exception as e:
json_data = res.json()
code = json_data.get("code")
# Sep 3 2025 backend changed the error message for this case
# TODO: it is also now possible for there to be multiple data sources per database; at present we
# just don't handle that. We will need to upgrade the API to the current version + query the
# new data sources endpoint to handle that case correctly.
if code == "object_not_found" or (
code == "validation_error"
and "does not contain any data sources" in json_data.get("message", "")
):
# this happens when a database is not shared with the integration
# in this case, we should just ignore the database
if res.status_code in (403, 404):
logger.error(
f"Unable to access database with ID '{database_id}'. "
f"This is likely due to the database not being shared "
f"Unable to access data source with ID '{data_source_id}'. "
f"This is likely due to it not being shared "
f"with the Onyx integration. Exact exception:\n{e}"
)
return {"results": [], "next_cursor": None}
logger.exception(f"Error fetching database - {res.json()}")
logger.exception(f"Error querying data source - {res.json()}")
raise e
return res.json()
@@ -348,8 +385,9 @@ class NotionConnector(LoadConnector, PollConnector):
# Fallback to workspace if we don't know the parent
return self.workspace_id
elif parent_type == "data_source_id":
# Newer Notion API may use data_source_id for databases
return parent.get("database_id") or parent.get("data_source_id")
ds_id = parent.get("data_source_id")
if ds_id:
return self._data_source_to_database_map.get(ds_id, self.workspace_id)
elif parent_type in ["page_id", "database_id"]:
return parent.get(parent_type)
@@ -497,18 +535,32 @@ class NotionConnector(LoadConnector, PollConnector):
if db_node:
hierarchy_nodes.append(db_node)
cursor = None
while True:
data = self._fetch_database(database_id, cursor)
# Discover all data sources under this database, then query each one.
# Even legacy single-source databases have one entry in the array.
data_sources = self._fetch_data_sources_for_database(database_id)
if not data_sources:
logger.warning(
f"Database '{database_id}' returned zero data sources — "
f"no pages will be indexed from this database."
)
for ds in data_sources:
self._data_source_to_database_map[ds.id] = database_id
cursor = None
while True:
data = self._fetch_data_source(ds.id, cursor)
for result in data["results"]:
obj_id = result["id"]
obj_type = result["object"]
text = self._properties_to_str(result.get("properties", {}))
if text:
result_blocks.append(NotionBlock(id=obj_id, text=text, prefix="\n"))
for result in data["results"]:
obj_id = result["id"]
obj_type = result["object"]
text = self._properties_to_str(result.get("properties", {}))
if text:
result_blocks.append(
NotionBlock(id=obj_id, text=text, prefix="\n")
)
if not self.recursive_index_enabled:
continue
if self.recursive_index_enabled:
if obj_type == "page":
logger.debug(
f"Found page with ID '{obj_id}' in database '{database_id}'"
@@ -518,7 +570,6 @@ class NotionConnector(LoadConnector, PollConnector):
logger.debug(
f"Found database with ID '{obj_id}' in database '{database_id}'"
)
# Get nested database name from properties if available
nested_db_title = result.get("title", [])
nested_db_name = None
if nested_db_title and len(nested_db_title) > 0:
@@ -533,10 +584,10 @@ class NotionConnector(LoadConnector, PollConnector):
result_pages.extend(nested_output.child_page_ids)
hierarchy_nodes.extend(nested_output.hierarchy_nodes)
if data["next_cursor"] is None:
break
if data["next_cursor"] is None:
break
cursor = data["next_cursor"]
cursor = data["next_cursor"]
return BlockReadOutput(
blocks=result_blocks,
@@ -807,36 +858,55 @@ class NotionConnector(LoadConnector, PollConnector):
def _yield_database_hierarchy_nodes(
self,
) -> Generator[HierarchyNode | Document, None, None]:
"""Search for all databases and yield hierarchy nodes for each.
"""Search for all data sources and yield hierarchy nodes for their parent databases.
This must be called BEFORE page indexing so that database hierarchy nodes
exist when pages inside databases reference them as parents.
With the new API, search returns data source objects instead of databases.
Multiple data sources can share the same parent database, so we use
database_id as the hierarchy node key and deduplicate via
_maybe_yield_hierarchy_node.
"""
query_dict: dict[str, Any] = {
"filter": {"property": "object", "value": "database"},
"filter": {"property": "object", "value": "data_source"},
"page_size": _NOTION_PAGE_SIZE,
}
pages_seen = 0
while pages_seen < _MAX_PAGES:
db_res = self._search_notion(query_dict)
for db in db_res.results:
db_id = db["id"]
# Extract title from the title array
title_arr = db.get("title", [])
db_name = None
if title_arr:
db_name = " ".join(
t.get("plain_text", "") for t in title_arr
).strip()
if not db_name:
for ds in db_res.results:
# Extract the parent database_id from the data source's parent
ds_parent = ds.get("parent", {})
db_id = ds_parent.get("database_id")
if not db_id:
continue
# Populate the mapping so _get_parent_raw_id can resolve later
ds_id = ds.get("id")
if not ds_id:
continue
self._data_source_to_database_map[ds_id] = db_id
# Fetch the database to get its actual name and parent
try:
db_page = self._fetch_database_as_page(db_id)
db_name = db_page.database_name or f"Database {db_id}"
parent_raw_id = self._get_parent_raw_id(db_page.parent)
db_url = (
db_page.url or f"https://notion.so/{db_id.replace('-', '')}"
)
except requests.exceptions.RequestException as e:
logger.warning(
f"Could not fetch database '{db_id}', "
f"defaulting to workspace root. Error: {e}"
)
db_name = f"Database {db_id}"
parent_raw_id = self.workspace_id
db_url = f"https://notion.so/{db_id.replace('-', '')}"
# Get parent using existing helper
parent_raw_id = self._get_parent_raw_id(db.get("parent"))
# Notion URLs omit dashes from UUIDs
db_url = db.get("url") or f"https://notion.so/{db_id.replace('-', '')}"
# _maybe_yield_hierarchy_node deduplicates by raw_node_id,
# so multiple data sources under one database produce one node.
node = self._maybe_yield_hierarchy_node(
raw_node_id=db_id,
raw_parent_id=parent_raw_id or self.workspace_id,

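The Notion migration replaces the single databases/{id}/query call with one query per data source, each following the same start_cursor / next_cursor contract as before. A minimal sketch of that cursor loop, with a canned fake in place of _fetch_data_source:

from typing import Any

FAKE_PAGES: dict[str | None, dict[str, Any]] = {
    None: {"results": ["page-1", "page-2"], "next_cursor": "c2"},
    "c2": {"results": ["page-3"], "next_cursor": None},
}

def query_data_source(data_source_id: str, cursor: str | None) -> dict[str, Any]:
    # Stand-in for _fetch_data_source: POST /v1/data_sources/{id}/query with an
    # optional start_cursor, returning a page of results plus the next cursor.
    return FAKE_PAGES[cursor]

def collect_all_results(data_source_id: str) -> list[str]:
    results: list[str] = []
    cursor: str | None = None
    while True:
        data = query_data_source(data_source_id, cursor)
        results.extend(data["results"])
        if data["next_cursor"] is None:
            break
        cursor = data["next_cursor"]
    return results

print(collect_all_results("ds-123"))  # ['page-1', 'page-2', 'page-3']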
View File

@@ -72,6 +72,10 @@ CONNECTOR_CLASS_MAP = {
module_path="onyx.connectors.coda.connector",
class_name="CodaConnector",
),
DocumentSource.CANVAS: ConnectorMapping(
module_path="onyx.connectors.canvas.connector",
class_name="CanvasConnector",
),
DocumentSource.NOTION: ConnectorMapping(
module_path="onyx.connectors.notion.connector",
class_name="NotionConnector",

View File

@@ -1,5 +1,6 @@
import base64
import copy
import fnmatch
import html
import io
import os
@@ -84,6 +85,44 @@ SHARED_DOCUMENTS_MAP_REVERSE = {v: k for k, v in SHARED_DOCUMENTS_MAP.items()}
ASPX_EXTENSION = ".aspx"
def _is_site_excluded(site_url: str, excluded_site_patterns: list[str]) -> bool:
"""Check if a site URL matches any of the exclusion glob patterns."""
for pattern in excluded_site_patterns:
if fnmatch.fnmatch(site_url, pattern) or fnmatch.fnmatch(
site_url.rstrip("/"), pattern.rstrip("/")
):
return True
return False
def _is_path_excluded(item_path: str, excluded_path_patterns: list[str]) -> bool:
"""Check if a drive item path matches any of the exclusion glob patterns.
item_path is the relative path within a drive, e.g. "Engineering/API/report.docx".
Matches are attempted against the full path and the filename alone so that
patterns like "*.tmp" match files at any depth.
"""
filename = item_path.rsplit("/", 1)[-1] if "/" in item_path else item_path
for pattern in excluded_path_patterns:
if fnmatch.fnmatch(item_path, pattern) or fnmatch.fnmatch(filename, pattern):
return True
return False
def _build_item_relative_path(parent_reference_path: str | None, item_name: str) -> str:
"""Build the relative path of a drive item from its parentReference.path and name.
Example: parentReference.path="/drives/abc/root:/Eng/API", name="report.docx"
=> "Eng/API/report.docx"
"""
if parent_reference_path and "root:/" in parent_reference_path:
folder = unquote(parent_reference_path.split("root:/", 1)[1])
if folder:
return f"{folder}/{item_name}"
return item_name
DEFAULT_AUTHORITY_HOST = "https://login.microsoftonline.com"
DEFAULT_GRAPH_API_HOST = "https://graph.microsoft.com"
DEFAULT_SHAREPOINT_DOMAIN_SUFFIX = "sharepoint.com"
@@ -478,6 +517,7 @@ def _convert_driveitem_to_document_with_permissions(
include_permissions: bool = False,
parent_hierarchy_raw_node_id: str | None = None,
access_token: str | None = None,
treat_sharing_link_as_public: bool = False,
) -> Document | ConnectorFailure | None:
if not driveitem.name or not driveitem.id:
@@ -610,6 +650,7 @@ def _convert_driveitem_to_document_with_permissions(
drive_item=sdk_item,
drive_name=drive_name,
add_prefix=True,
treat_sharing_link_as_public=treat_sharing_link_as_public,
)
else:
external_access = ExternalAccess.empty()
@@ -644,6 +685,7 @@ def _convert_sitepage_to_document(
graph_client: GraphClient,
include_permissions: bool = False,
parent_hierarchy_raw_node_id: str | None = None,
treat_sharing_link_as_public: bool = False,
) -> Document:
"""Convert a SharePoint site page to a Document object."""
# Extract text content from the site page
@@ -773,6 +815,7 @@ def _convert_sitepage_to_document(
graph_client=graph_client,
site_page=site_page,
add_prefix=True,
treat_sharing_link_as_public=treat_sharing_link_as_public,
)
else:
external_access = ExternalAccess.empty()
@@ -803,6 +846,7 @@ def _convert_driveitem_to_slim_document(
ctx: ClientContext,
graph_client: GraphClient,
parent_hierarchy_raw_node_id: str | None = None,
treat_sharing_link_as_public: bool = False,
) -> SlimDocument:
if driveitem.id is None:
raise ValueError("DriveItem ID is required")
@@ -813,6 +857,7 @@ def _convert_driveitem_to_slim_document(
graph_client=graph_client,
drive_item=sdk_item,
drive_name=drive_name,
treat_sharing_link_as_public=treat_sharing_link_as_public,
)
return SlimDocument(
@@ -827,6 +872,7 @@ def _convert_sitepage_to_slim_document(
ctx: ClientContext | None,
graph_client: GraphClient,
parent_hierarchy_raw_node_id: str | None = None,
treat_sharing_link_as_public: bool = False,
) -> SlimDocument:
"""Convert a SharePoint site page to a SlimDocument object."""
if site_page.get("id") is None:
@@ -836,6 +882,7 @@ def _convert_sitepage_to_slim_document(
ctx=ctx,
graph_client=graph_client,
site_page=site_page,
treat_sharing_link_as_public=treat_sharing_link_as_public,
)
id = site_page.get("id")
if id is None:
@@ -855,14 +902,20 @@ class SharepointConnector(
self,
batch_size: int = INDEX_BATCH_SIZE,
sites: list[str] = [],
excluded_sites: list[str] = [],
excluded_paths: list[str] = [],
include_site_pages: bool = True,
include_site_documents: bool = True,
treat_sharing_link_as_public: bool = False,
authority_host: str = DEFAULT_AUTHORITY_HOST,
graph_api_host: str = DEFAULT_GRAPH_API_HOST,
sharepoint_domain_suffix: str = DEFAULT_SHAREPOINT_DOMAIN_SUFFIX,
) -> None:
self.batch_size = batch_size
self.sites = list(sites)
self.excluded_sites = [s for p in excluded_sites if (s := p.strip())]
self.excluded_paths = [s for p in excluded_paths if (s := p.strip())]
self.treat_sharing_link_as_public = treat_sharing_link_as_public
self.site_descriptors: list[SiteDescriptor] = self._extract_site_and_drive_info(
sites
)
@@ -1233,6 +1286,29 @@ class SharepointConnector(
break
sites = sites._get_next().execute_query()
def _is_driveitem_excluded(self, driveitem: DriveItemData) -> bool:
"""Check if a drive item should be excluded based on excluded_paths patterns."""
if not self.excluded_paths:
return False
relative_path = _build_item_relative_path(
driveitem.parent_reference_path, driveitem.name
)
return _is_path_excluded(relative_path, self.excluded_paths)
def _filter_excluded_sites(
self, site_descriptors: list[SiteDescriptor]
) -> list[SiteDescriptor]:
"""Remove sites matching any excluded_sites glob pattern."""
if not self.excluded_sites:
return site_descriptors
result = []
for sd in site_descriptors:
if _is_site_excluded(sd.url, self.excluded_sites):
logger.info(f"Excluding site by denylist: {sd.url}")
continue
result.append(sd)
return result
def fetch_sites(self) -> list[SiteDescriptor]:
sites = self.graph_client.sites.get_all_sites().execute_query()
@@ -1249,7 +1325,7 @@ class SharepointConnector(
for site in self._handle_paginated_sites(sites)
if "-my.sharepoint" not in site.web_url
]
return site_descriptors
return self._filter_excluded_sites(site_descriptors)
def _fetch_site_pages(
self,
@@ -1689,8 +1765,14 @@ class SharepointConnector(
checkpoint.current_drive_delta_next_link = None
checkpoint.seen_document_ids.clear()
def _fetch_slim_documents_from_sharepoint(self) -> GenerateSlimDocumentOutput:
site_descriptors = self.site_descriptors or self.fetch_sites()
def _fetch_slim_documents_from_sharepoint(
self,
start: datetime | None = None,
end: datetime | None = None,
) -> GenerateSlimDocumentOutput:
site_descriptors = self._filter_excluded_sites(
self.site_descriptors or self.fetch_sites()
)
# Create a temporary checkpoint for hierarchy node tracking
temp_checkpoint = SharepointConnectorCheckpoint(has_more=True)
@@ -1708,8 +1790,14 @@ class SharepointConnector(
# Process site documents if flag is True
if self.include_site_documents:
for driveitem, drive_name, drive_web_url in self._fetch_driveitems(
site_descriptor=site_descriptor
site_descriptor=site_descriptor,
start=start,
end=end,
):
if self._is_driveitem_excluded(driveitem):
logger.debug(f"Excluding by path denylist: {driveitem.web_url}")
continue
if drive_web_url:
doc_batch.extend(
self._yield_drive_hierarchy_node(
@@ -1747,6 +1835,7 @@ class SharepointConnector(
ctx,
self.graph_client,
parent_hierarchy_raw_node_id=parent_hierarchy_url,
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
)
)
except Exception as e:
@@ -1758,7 +1847,9 @@ class SharepointConnector(
# Process site pages if flag is True
if self.include_site_pages:
site_pages = self._fetch_site_pages(site_descriptor)
site_pages = self._fetch_site_pages(
site_descriptor, start=start, end=end
)
for site_page in site_pages:
logger.debug(
f"Processing site page: {site_page.get('webUrl', site_page.get('name', 'Unknown'))}"
@@ -1770,6 +1861,7 @@ class SharepointConnector(
ctx,
self.graph_client,
parent_hierarchy_raw_node_id=site_descriptor.url,
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
)
)
if len(doc_batch) >= SLIM_BATCH_SIZE:
@@ -2043,7 +2135,9 @@ class SharepointConnector(
and not checkpoint.process_site_pages
):
logger.info("Initializing SharePoint sites for processing")
site_descs = self.site_descriptors or self.fetch_sites()
site_descs = self._filter_excluded_sites(
self.site_descriptors or self.fetch_sites()
)
checkpoint.cached_site_descriptors = deque(site_descs)
if not checkpoint.cached_site_descriptors:
@@ -2264,6 +2358,10 @@ class SharepointConnector(
for driveitem in driveitems:
item_count += 1
if self._is_driveitem_excluded(driveitem):
logger.debug(f"Excluding by path denylist: {driveitem.web_url}")
continue
if driveitem.id and driveitem.id in checkpoint.seen_document_ids:
logger.debug(
f"Skipping duplicate document {driveitem.id} ({driveitem.name})"
@@ -2318,6 +2416,7 @@ class SharepointConnector(
parent_hierarchy_raw_node_id=parent_hierarchy_url,
graph_api_base=self.graph_api_base,
access_token=access_token,
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
)
if isinstance(doc_or_failure, Document):
@@ -2398,6 +2497,7 @@ class SharepointConnector(
include_permissions=include_permissions,
# Site pages have the site as their parent
parent_hierarchy_raw_node_id=site_descriptor.url,
treat_sharing_link_as_public=self.treat_sharing_link_as_public,
)
)
logger.info(
@@ -2473,12 +2573,22 @@ class SharepointConnector(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
end: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None, # noqa: ARG002
) -> GenerateSlimDocumentOutput:
yield from self._fetch_slim_documents_from_sharepoint()
start_dt = (
datetime.fromtimestamp(start, tz=timezone.utc)
if start is not None
else None
)
end_dt = (
datetime.fromtimestamp(end, tz=timezone.utc) if end is not None else None
)
yield from self._fetch_slim_documents_from_sharepoint(
start=start_dt,
end=end_dt,
)
if __name__ == "__main__":

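The new excluded_sites / excluded_paths options rely on fnmatch globs, matching both the full drive-relative path and the bare filename so a pattern like *.tmp applies at any depth. A standalone check of the helper logic, restated from _build_item_relative_path and _is_path_excluded above:

import fnmatch
from urllib.parse import unquote

def build_item_relative_path(parent_reference_path: str | None, item_name: str) -> str:
    # "/drives/abc/root:/Eng/API" + "report.docx" -> "Eng/API/report.docx"
    if parent_reference_path and "root:/" in parent_reference_path:
        folder = unquote(parent_reference_path.split("root:/", 1)[1])
        if folder:
            return f"{folder}/{item_name}"
    return item_name

def is_path_excluded(item_path: str, patterns: list[str]) -> bool:
    # Try the full relative path first, then the filename alone.
    filename = item_path.rsplit("/", 1)[-1]
    return any(
        fnmatch.fnmatch(item_path, p) or fnmatch.fnmatch(filename, p)
        for p in patterns
    )

path = build_item_relative_path("/drives/abc/root:/Eng/API", "report.tmp")
print(path)                                   # Eng/API/report.tmp
print(is_path_excluded(path, ["*.tmp"]))      # True
print(is_path_excluded(path, ["Finance/*"]))  # False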
View File

@@ -17,6 +17,7 @@ def get_sharepoint_external_access(
drive_name: str | None = None,
site_page: dict[str, Any] | None = None,
add_prefix: bool = False,
treat_sharing_link_as_public: bool = False,
) -> ExternalAccess:
if drive_item and drive_item.id is None:
raise ValueError("DriveItem ID is required")
@@ -34,7 +35,13 @@ def get_sharepoint_external_access(
)
external_access = get_external_access_func(
ctx, graph_client, drive_name, drive_item, site_page, add_prefix
ctx,
graph_client,
drive_name,
drive_item,
site_page,
add_prefix,
treat_sharing_link_as_public,
)
return external_access

View File

@@ -516,6 +516,8 @@ def _get_all_doc_ids(
] = default_msg_filter,
callback: IndexingHeartbeatInterface | None = None,
workspace_url: str | None = None,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> GenerateSlimDocumentOutput:
"""
Get all document ids in the workspace, channel by channel
@@ -546,6 +548,8 @@ def _get_all_doc_ids(
client=client,
channel=channel,
callback=callback,
oldest=str(start) if start else None, # 0.0 -> None intentionally
latest=str(end) if end is not None else None,
)
for message_batch in channel_message_batches:
@@ -847,8 +851,8 @@ class SlackConnector(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
end: SecondsSinceUnixEpoch | None = None, # noqa: ARG002
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
if self.client is None:
@@ -861,6 +865,8 @@ class SlackConnector(
msg_filter_func=self.msg_filter_func,
callback=callback,
workspace_url=self._workspace_url,
start=start,
end=end,
)
def _load_from_checkpoint(

View File

@@ -2,7 +2,6 @@ from collections.abc import Sequence
from datetime import datetime
from enum import Enum
from typing import Any
from uuid import UUID
from pydantic import BaseModel
from pydantic import Field
@@ -70,9 +69,13 @@ class BaseFilters(BaseModel):
class UserFileFilters(BaseModel):
user_file_ids: list[UUID] | None = None
project_id: int | None = None
persona_id: int | None = None
# Scopes search to user files tagged with a given project/persona in Vespa.
# These are NOT simply the IDs of the current project or persona — they are
# only set when the persona's/project's user files overflowed the LLM
# context window and must be searched via vector DB instead of being loaded
# directly into the prompt.
project_id_filter: int | None = None
persona_id_filter: int | None = None
class AssistantKnowledgeFilters(BaseModel):
@@ -398,3 +401,16 @@ class SavedSearchDocWithContent(SavedSearchDoc):
section in addition to the match_highlights."""
content: str
class PersonaSearchInfo(BaseModel):
"""Snapshot of persona data needed by the search pipeline.
Extracted from the ORM Persona before the DB session is released so that
SearchTool and search_pipeline never lazy-load relationships post-commit.
"""
document_set_names: list[str]
search_start_date: datetime | None
attached_document_ids: list[str]
hierarchy_node_ids: list[int]

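PersonaSearchInfo exists so the search pipeline works from plain data rather than touching ORM relationships after the session that loaded the Persona has been released. A hedged sketch of building such a snapshot while the session is still open; the model fields come from the diff, but the attribute names on the ORM Persona are assumptions for illustration:

from datetime import datetime
from typing import Any

from pydantic import BaseModel

class PersonaSearchInfo(BaseModel):
    document_set_names: list[str]
    search_start_date: datetime | None
    attached_document_ids: list[str]
    hierarchy_node_ids: list[int]

def snapshot_persona(persona: Any) -> PersonaSearchInfo:
    # Read every lazy relationship NOW, while the DB session is open, so SearchTool
    # never triggers a lazy load after the session is committed/closed.
    return PersonaSearchInfo(
        document_set_names=[ds.name for ds in persona.document_sets],         # assumed attr
        search_start_date=persona.search_start_date,                          # assumed attr
        attached_document_ids=[doc.id for doc in persona.attached_documents], # assumed attr
        hierarchy_node_ids=[node.id for node in persona.hierarchy_nodes],     # assumed attr
    )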
Some files were not shown because too many files have changed in this diff.