Compare commits

..

148 Commits

Author SHA1 Message Date
Jamison Lahman
de333a7f86 nit 2026-04-15 21:53:08 -07:00
Jamison Lahman
25cf97efc3 nit 2026-04-15 21:52:51 -07:00
Jamison Lahman
51f4383d3b nit 2026-04-15 21:52:18 -07:00
Jamison Lahman
6501d6204a nit 2026-04-15 21:52:10 -07:00
Jamison Lahman
d6af67a88e nit 2026-04-15 21:50:52 -07:00
Jamison Lahman
4e9096a42c nit 2026-04-15 21:50:40 -07:00
Jamison Lahman
63e6fe08c9 nit 2026-04-15 21:50:33 -07:00
Jamison Lahman
805894bddf nit 2026-04-15 21:50:23 -07:00
Jamison Lahman
d233f2eb40 nit 2026-04-15 21:50:15 -07:00
Jamison Lahman
bb44f83860 nit 2026-04-15 21:50:07 -07:00
Jamison Lahman
3cdba62c8a nit 2026-04-15 21:49:59 -07:00
Jamison Lahman
590351e6bf nit 2026-04-15 21:49:51 -07:00
Jamison Lahman
40408c1b83 nit 2026-04-15 21:49:14 -07:00
Jamison Lahman
60ffd7b4b4 nit 2026-04-15 21:46:49 -07:00
Jamison Lahman
0cb13d6e69 nit 2026-04-15 21:22:30 -07:00
Jamison Lahman
976ecfed74 nit 2026-04-15 21:21:27 -07:00
Jamison Lahman
70b736648a nit 2026-04-15 21:08:40 -07:00
Jamison Lahman
e92de2cba6 chore(python): replace mypy with ty 2026-04-15 21:01:37 -07:00
dependabot[bot]
a3cb45e56d chore(deps): bump hono from 4.12.12 to 4.12.14 in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#10252)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-15 20:06:35 -07:00
dependabot[bot]
6fd07f44e1 chore(deps): bump langsmith from 0.3.45 to 0.7.31 in /backend/requirements (#10250)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-15 19:59:31 -07:00
Jamison Lahman
2a3b487fad chore(tests): remove defunct test_litellm_embedding (#10244) 2026-04-16 00:11:06 +00:00
Nikolas Garza
a14dc4e632 chore(helm): update Grafana dashboard for new push-based metric names (#10238) 2026-04-16 00:06:23 +00:00
jaffar keikei
b6467e8e3e fix: invert already_existed logic in ingestion API response (#9999) 2026-04-15 23:18:56 +00:00
Nikolas Garza
546da624a1 feat(metrics): add connector_name label to push-based connector metrics (#10237) 2026-04-15 22:58:49 +00:00
Nikolas Garza
1a88dea760 fix(model-server): add missing onyx/configs to Dockerfile for sentry support (#10236) 2026-04-15 22:42:00 +00:00
Justin Tahara
53d2d647c5 fix(deletion): Commit Session in per-doc cleanup (#10193) 2026-04-15 22:37:00 +00:00
Justin Tahara
560a8f7ab4 feat(mt): Infra setup for Redis Set (1/3) (#10209) 2026-04-15 22:29:49 +00:00
Bo-Onyx
eaabb19c72 fix(pruning): GitHub connector pruning timeout via SlimConnector (#10205) 2026-04-15 22:25:48 +00:00
Bo-Onyx
d3e5e16150 fix(pruning): Resolve hierarchy node FK error for Confluence and Notion (#10235) 2026-04-15 22:25:34 +00:00
Danelegend
d3739611ba feat(connectors): Connectors output TabularSections for tabular files (#10096) 2026-04-15 22:09:28 +00:00
Justin Tahara
73f9a47364 fix(xlsx): Openpyxl Formatting Issues (#10230) 2026-04-15 21:22:58 +00:00
Raunak Bhagat
a808445d96 feat: opalify MessageCard (#10223) 2026-04-15 21:11:18 +00:00
Nikolas Garza
c31215197a fix(chat): hide incomplete citation links during streaming (#10224) 2026-04-15 21:10:06 +00:00
Nikolas Garza
9ebd9ebd73 fix(chat): snap typewriter to full content on tab re-focus (#10226) 2026-04-15 21:07:00 +00:00
Nikolas Garza
f0bb0a6bb0 fix(chat): only header click selects preferred in multi-model panels (#10198) 2026-04-15 21:06:19 +00:00
Ben Wu
01bec19d19 feat(canvas): checkpoint logic (3/4) (#9807) 2026-04-15 20:48:16 +00:00
Danelegend
7b40c2cde7 feat(indexing): CSV Chunker - Field-Value Implementation (#10099) 2026-04-15 19:57:50 +00:00
Jamison Lahman
e2c38d2899 chore(devtools): connect databases and github remote to devcontainer (#10222) 2026-04-15 19:50:11 +00:00
Nikolas Garza
24768f9e4f feat(metrics): replace pull-based connector metrics with push-based for multi-tenant (#10189) 2026-04-15 18:15:34 +00:00
Bo-Onyx
aec1c169b6 feat(pruning): pruning grafana dashboard for single tenant (#10208) 2026-04-15 17:50:28 +00:00
Jamison Lahman
5a16ad3473 chore(tests): avoid openapi client import in tests (#10220) 2026-04-15 17:38:24 +00:00
dependabot[bot]
7e28e59f23 chore(deps): bump transformers from 4.53.0 to 5.5.4 (#9987)
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-15 10:39:50 -07:00
Nikolas Garza
879ae6c02d feat(monitoring): add local Prometheus + Grafana docker-compose stack (#9627) 2026-04-15 17:25:28 +00:00
Nikolas Garza
f84f367eb4 fix(voice): send TTS text in POST body instead of query params (#10213) 2026-04-15 17:20:29 +00:00
Jamison Lahman
d81efe3877 fix(ollama): always include model tag in display name (#10218)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2026-04-15 09:17:37 -07:00
Nikolas Garza
d4619f93c4 feat(indexing): notify admins when connector enters repeated error state (#10207) 2026-04-15 06:10:25 +00:00
Nikolas Garza
70fcfb1d73 feat(indexing): add admin API for failed documents (#10204) 2026-04-15 06:10:06 +00:00
Nikolas Garza
32ba393b32 fix(chat): keep model selector popover open until max models reached (#10203) 2026-04-15 06:09:24 +00:00
Nikolas Garza
f9d2bf78ed fix(chat): disable hover/pointer states on multi-model panels during streaming (#10202) 2026-04-15 06:09:11 +00:00
Nikolas Garza
5567a078fe fix(chat): fix fade gradient missing on last multi-model panel (#10199) 2026-04-15 06:08:48 +00:00
Raunak Bhagat
fc0e8560bc feat: opalify Tooltip component, migrate all consumers (#10210) 2026-04-15 03:42:15 +00:00
Nikolas Garza
60b2701eed feat(indexing): add diagnostic logging to check_for_indexing beat task (#10200) 2026-04-14 20:29:47 -07:00
Jamison Lahman
3682d9844b fix(fe): handle file attachment overflow (#10211) 2026-04-15 02:00:58 +00:00
Raunak Bhagat
a420f9a37c feat: add ref forwarding to input layout components (#10206) 2026-04-15 00:20:50 +00:00
Jamison Lahman
20c5107ba6 chore(devtools): install java runtime into devcontainer (#10197) 2026-04-14 23:10:12 +00:00
Nikolas Garza
357bc91aee feat(indexing): capture swallowed per-doc exceptions in Sentry (#10149) 2026-04-14 23:01:42 +00:00
Nikolas Garza
09653872a2 fix(chat): render inline citation chips in multi-model panels (#10196) 2026-04-14 22:59:10 +00:00
dependabot[bot]
ff01a53f83 chore(deps): bump next from 16.1.7 to 16.2.3 in /web (#10195)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-14 22:49:31 +00:00
Danelegend
03ddd5ca9b feat(indexing): Add TabularSection (#10095) 2026-04-14 22:16:35 +00:00
Bo-Onyx
8c49e4573c fix(pruning): Skip Permission Sync During Google Drive Pruning (#10185) 2026-04-14 22:14:09 +00:00
Jamison Lahman
f1696ffa16 chore(deps): upgrade playwright: 1.55.0->1.58.0 (#10194) 2026-04-14 15:12:14 -07:00
Jamison Lahman
a427cb5b0c chore(deps): upgrade python patch version in docker (#10192) 2026-04-14 15:10:00 -07:00
Evan Lohn
f7e4be18dd fix: uploaded files as knowledge source (#10167) 2026-04-14 21:51:00 +00:00
acaprau
0f31c490fa chore(opensearch): Add debug log for when the migration task releases its lock (#10190) 2026-04-14 14:08:48 -07:00
Wenxi
c9a4a6e42b fix: text shimmer animation nice and fast (#10184) 2026-04-14 20:59:00 +00:00
Nikolas Garza
558c9df3c7 fix(chat): eliminate long-lived DB session in multi-model worker threads (#10159) 2026-04-14 20:37:05 +00:00
Jamison Lahman
30003036d3 chore(fe): Toast logs to the console by default in dev (#10183) 2026-04-14 20:34:04 +00:00
Nikolas Garza
4b2f18c239 fix(chat): speed up text gen (#10186) 2026-04-14 13:41:29 -07:00
Wenxi
4290b097f5 fix: auth logout modal on fresh load (#10007) 2026-04-14 18:43:34 +00:00
Justin Tahara
b0f621a08b fix(llm): Fix the Auto Fetch workflow (#10181) 2026-04-14 18:06:47 +00:00
Raunak Bhagat
112edf41c5 refactor: replace Radix Slot with div wrapper in @opal/core.Disabled (#10119) 2026-04-14 17:40:32 +00:00
SubashMohan
74eb1d7212 feat(notifications): announce upcoming group-based permissions migration (#10178) 2026-04-14 16:23:33 +00:00
dependabot[bot]
e62d592b11 chore(deps): bump alembic from 1.10.4 to 1.18.4 in /backend (#9768)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-14 16:04:58 +00:00
Wenxi
57a0d25321 fix: use static provider list instead of querying be (#10166) 2026-04-14 15:34:57 +00:00
dependabot[bot]
887f79d7a5 chore(deps-dev): bump langchain-core from 1.2.22 to 1.2.28 (#10010)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-14 08:23:30 -07:00
Evan Lohn
65fd1c3ec8 fix: document set name patch (#10162) 2026-04-14 01:53:40 +00:00
Danelegend
6e3ee287b9 fix(files): Lower log level on file store cache miss (#10164) 2026-04-14 01:46:46 +00:00
Raunak Bhagat
dee0b7867e refactor: opalify input layouts with trinary withLabel prop (#10144) 2026-04-14 01:28:37 +00:00
Wenxi
77beb8044e fix(google): handle JSON credential payloads in KV storage (@jack-larch) (#10160)
Co-authored-by: Jack Larch <jack.larch@biograph.com>
2026-04-14 01:20:44 +00:00
Wenxi
750d3ac4ed fix: llm popover should refresh on admin provider edit (#10152) 2026-04-14 01:13:50 +00:00
Bo-Onyx
6c02087ba4 chore(pruning): Add Celery task queue wait time metric (#10161) 2026-04-14 01:08:25 +00:00
Wenxi
0425283ed0 fix: show correct knowledge toggle status on agent edit page (#10151) 2026-04-14 01:07:21 +00:00
Justin Tahara
da97a57c58 feat(metrics): Add Deletion-specific Prometheus Metrics (#10157) 2026-04-14 00:57:16 +00:00
dependabot[bot]
8087ddb97c chore(deps): bump hono from 4.12.7 to 4.12.12 in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#9986)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-14 00:34:07 +00:00
Wenxi
d9d5943dc4 fix: properly refresh settings/ee settings on license upload success (#10158) 2026-04-14 00:21:12 +00:00
Bo-Onyx
97a7fa6f7f fix(pruning): Release DB connection before connector enumeration in pruning (#10154) 2026-04-13 23:58:30 +00:00
Bo-Onyx
8027e62446 fix(pruning): Commit hierarchy node upserts atomically in pruning (#10156) 2026-04-13 23:53:21 +00:00
Bo-Onyx
571e860d4f fix(pruning): Adjust Prometheus histogram buckets for pruning metrics (#10155) 2026-04-13 23:49:49 +00:00
dependabot[bot]
89b91ac384 chore(deps): bump cryptography from 46.0.6 to 46.0.7 (#10012)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-13 23:40:35 +00:00
Evan Lohn
069b1f3efb fix: confluence 504 retry with smaller page size (#10147) 2026-04-13 23:36:18 +00:00
dependabot[bot]
ef2fffcd6e chore(deps): bump lodash-es from 4.17.23 to 4.18.1 in /web (#9858)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-13 23:29:29 +00:00
Justin Tahara
925be18424 feat(metrics): Wire Promtheus Metrics for Light Worker (#10148) 2026-04-13 23:29:29 +00:00
dependabot[bot]
38fffc8ad8 chore(deps): bump next from 16.1.7 to 16.2.3 in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#10062)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-13 23:10:31 +00:00
dependabot[bot]
3e9e2f08d5 chore(deps-dev): bump black from 25.1.0 to 26.3.1 (#9313)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-13 23:00:11 +00:00
dependabot[bot]
243d93ecd8 chore(deps): bump @hono/node-server from 1.19.10 to 1.19.13 in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#9985)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-13 22:47:58 +00:00
dependabot[bot]
4effe77225 chore(deps): bump pytest from 8.3.5 to 9.0.3 in /backend/requirements (#10124)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-13 16:03:16 -07:00
dependabot[bot]
ef2df458a3 chore(deps): bump lodash from 4.17.23 to 4.18.1 in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#9937)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-13 15:55:51 -07:00
dependabot[bot]
d3000da3d0 chore(deps-dev): bump pypdf from 6.9.2 to 6.10.0 (#10070)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-13 15:53:50 -07:00
Nikolas Garza
a5c703f9ca feat(indexing): add error_type to index_attempt_errors (#10134) 2026-04-13 22:12:18 +00:00
Nikolas Garza
d10c901c43 chore(ci): disable automatic Greptile reviews on push (#10146) 2026-04-13 22:01:21 +00:00
Nikolas Garza
f1ac555c57 chore(admin): rework chat preferences page layout (#10143) 2026-04-13 21:55:46 +00:00
Nikolas Garza
ed52384c21 fix(widget): surface descriptive error for trial account 429 (#10141) 2026-04-13 21:55:44 +00:00
dependabot[bot]
cb10376a0d chore(deps): bump pillow from 12.1.1 to 12.2.0 (#10129)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-13 21:17:30 +00:00
Raunak Bhagat
5a25b70b9c refactor: rename nonInteractive to withLabel in input layouts (#10117) 2026-04-13 21:13:57 +00:00
dependabot[bot]
8cbc37f281 chore(deps): bump next from 16.1.7 to 16.2.3 in /examples/widget (#10125)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-13 21:03:52 +00:00
Nikolas Garza
9d78f71f23 fix(chat): disable Deep Research in multi-model mode (ENG-4009) (#10126) 2026-04-13 20:52:56 +00:00
dependabot[bot]
fbf3179d84 chore(deps): bump astral-sh/setup-uv from 7.6.0 to 8.0.0 (#10121)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-13 20:45:02 +00:00
Danelegend
779470b553 refactor(indexing): Split document chunking into section-based strategies (#10087) 2026-04-13 20:29:58 +00:00
Evan Lohn
151e189898 fix: CLAUDE.md incorrect task info (#10116) 2026-04-13 20:26:57 +00:00
Nikolas Garza
72e08f81a4 feat(admin): add global multi-model chat toggle (#10132) 2026-04-13 20:24:54 +00:00
Justin Tahara
65792a8ad8 fix(deletion): Handle Null Connector Properly (#10131) 2026-04-13 20:20:42 +00:00
Justin Tahara
497b700b3d chore(deletion): Cleanup log (#10133) 2026-04-13 20:20:28 +00:00
Alex Kim
c3ed2135f1 Add Datadog admission opt-out label to sandbox pods (#10040) 2026-04-13 13:18:46 -07:00
Nikolas Garza
a969d56818 fix: welcome message alignment in chrome extension/desktop (#10094) 2026-04-13 19:54:18 +00:00
dependabot[bot]
a31d862f48 chore(deps): bump actions/download-artifact from 8.0.0 to 8.0.1 (#10122)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-13 19:38:08 +00:00
Nikolas Garza
a4e6d4cf43 fix(chat): isolate multi-model streaming errors to their panels (#10113) 2026-04-13 19:28:50 +00:00
Nikolas Garza
1e6f94e00d feat(chat): scrollable tables with overflow fade (#10097) 2026-04-13 19:26:05 +00:00
dependabot[bot]
a769b87a9d chore(deps): bump j178/prek-action from 1.1.1 to 2.0.1 (#10120)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-13 19:11:53 +00:00
Danelegend
278fc7e9b1 chore(indexing): Add kind enum to Section model (#10092) 2026-04-13 18:51:07 +00:00
Raunak Bhagat
eb34df470f chore: remove top-level Formik from ChatPreferencesPage (#10112) 2026-04-13 17:07:38 +00:00
Danelegend
9d1785273f chore(indexing): Add tests for current document chunking (#10086) 2026-04-13 17:00:11 +00:00
Nikolas Garza
ef69b17d26 feat(chat): smooth character-level streaming (#10093) 2026-04-13 16:13:49 +00:00
Raunak Bhagat
787c961802 refactor: migrate refresh-components/Separator and refresh-components/Divider to @opal/components.Divider (#10064) 2026-04-13 09:08:39 -07:00
Raunak Bhagat
62bc4fa2a3 chore: remove Knowledge Graph admin page (#10110) 2026-04-13 15:50:38 +00:00
Jamison Lahman
bb1c44daff fix(copy-button): fall back when Clipboard API unavailable (#10080) 2026-04-11 05:38:20 +00:00
Nikolas Garza
f26ecafb51 Revert "feat(chat): smooth character-level streaming" (#10083) 2026-04-10 20:51:27 -07:00
Nikolas Garza
9fdb425c0d feat(chat): smooth character-level streaming (#10076) 2026-04-11 03:32:33 +00:00
Jamison Lahman
47e20e89c5 chore(devtools): rm docker socket from devcontainer (#10079) 2026-04-11 02:37:54 +00:00
Raunak Bhagat
8b28c127f2 feat: add padding API to Divider (#10077) 2026-04-11 02:16:12 +00:00
Nikolas Garza
9a861a71ad fix(chat): model selection + multi-model follow-up correctness (#10075) 2026-04-11 00:14:15 +00:00
Jamison Lahman
b4bc12f6dc fix(mcp): add Ingress route for OAuth callback to reach web server (#10074) 2026-04-11 00:05:19 +00:00
Raunak Bhagat
9af9148ca7 fix: italicize proper nouns in modal titles (#10073) 2026-04-10 22:36:29 +00:00
Jamison Lahman
8a517c4f10 fix(mcp): route OAuth callback to web server instead of MCP server (#10071) 2026-04-10 15:11:46 -07:00
Jamison Lahman
6959d851ea fix(mcp): prevent masked OAuth credentials from being stored on re-auth (#10066) 2026-04-10 21:30:21 +00:00
dependabot[bot]
6a2550fc2d chore(deps): bump lodash from 4.17.23 to 4.18.1 in /web (#9901)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-10 19:43:34 +00:00
Nikolas Garza
b1cc0c2bf9 fix(scim): add advisory lock to prevent seat limit race condition (#10048) 2026-04-10 18:50:24 +00:00
Raunak Bhagat
c28b17064b feat: opalified Divider (#10063) 2026-04-10 11:40:50 -07:00
Nikolas Garza
4dab92ab52 fix(license): exclude service account users from seat count (#10053) 2026-04-10 17:15:33 +00:00
Jamison Lahman
7eb68d61b0 chore(devtools): upgrade ods: 0.7.4->0.7.5 (#10060) 2026-04-10 17:02:58 +00:00
Raunak Bhagat
8c7810d688 feat: add logos for embedding providers (Cohere, Nomic, Voyage) to @opal/logos (#10034) 2026-04-10 16:51:28 +00:00
Evan Lohn
712e6fdf5e feat: google drive error resolution (#9842) 2026-04-10 16:16:32 +00:00
Jamison Lahman
f1a9a3b41e fix(LLM config): resolve API Key before fetching models (#10056) 2026-04-10 06:53:07 +00:00
Jamison Lahman
c3405fb6bf chore(devtools): improve devcontainer usability w/ rootless docker (#10054)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 06:18:41 +00:00
Nikolas Garza
3e962935f4 fix(chat): hide ModelSelector in search mode (#10052) 2026-04-10 03:43:22 +00:00
Jamison Lahman
0aa1aa7ea0 fix(fe): Query History table has constrained column size (#10047) 2026-04-09 19:47:56 -07:00
Nikolas Garza
771d2cf101 feat(helm): add OpenSearch and Redis queues Grafana dashboards (#10042) 2026-04-10 01:23:40 +00:00
Nikolas Garza
7ec50280ed feat(federated): full thread replies + direct URL fetch in Slack search (#9940) 2026-04-09 18:17:46 -07:00
Evan Lohn
5b2ba5caeb fix: jira bulk issue fetch batching (#10044) 2026-04-10 00:38:55 +00:00
634 changed files with 19660 additions and 7050 deletions

View File

@@ -1,8 +1,8 @@
FROM ubuntu:26.04@sha256:cc925e589b7543b910fea57a240468940003fbfc0515245a495dd0ad8fe7cef1
RUN apt-get update && apt-get install -y --no-install-recommends \
acl \
curl \
default-jre \
fd-find \
fzf \
git \
@@ -25,13 +25,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
&& curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
&& apt-get install -y nodejs \
&& install -m 0755 -d /etc/apt/keyrings \
&& curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu $(. /etc/os-release && echo "$VERSION_CODENAME") stable" > /etc/apt/sources.list.d/docker.list \
&& curl -fsSL https://cli.github.com/packages/githubcli-archive-keyring.gpg -o /etc/apt/keyrings/githubcli-archive-keyring.gpg \
&& chmod go+r /etc/apt/keyrings/githubcli-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/githubcli-archive-keyring.gpg] https://cli.github.com/packages stable main" > /etc/apt/sources.list.d/github-cli.list \
&& apt-get update \
&& apt-get install -y --no-install-recommends docker-ce-cli docker-compose-plugin gh \
&& apt-get install -y --no-install-recommends gh \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
# fd-find installs as fdfind on Debian/Ubuntu — symlink to fd

View File

@@ -6,7 +6,7 @@ A containerized development environment for working on Onyx.
- Ubuntu 26.04 base image
- Node.js 20, uv, Claude Code
- Docker CLI, GitHub CLI (`gh`)
- GitHub CLI (`gh`)
- Neovim, ripgrep, fd, fzf, jq, make, wget, unzip
- Zsh as default shell (sources host `~/.zshrc` if available)
- Python venv auto-activation
@@ -14,12 +14,6 @@ A containerized development environment for working on Onyx.
## Usage
### VS Code
1. Install the [Dev Containers extension](https://marketplace.visualstudio.com/items?itemName=ms-vscode-remote.remote-containers)
2. Open this repo in VS Code
3. "Reopen in Container" when prompted
### CLI (`ods dev`)
The [`ods` devtools CLI](../tools/ods/README.md) provides workspace-aware wrappers
@@ -39,25 +33,8 @@ ods dev exec npm test
ods dev stop
```
If you don't have `ods` installed, use the `devcontainer` CLI directly:
```bash
npm install -g @devcontainers/cli
devcontainer up --workspace-folder .
devcontainer exec --workspace-folder . zsh
```
## Restarting the container
### VS Code
Open the Command Palette (`Ctrl+Shift+P` / `Cmd+Shift+P`) and run:
- **Dev Containers: Reopen in Container** — restarts the container without rebuilding
### CLI
```bash
# Restart the container
ods dev restart
@@ -66,12 +43,6 @@ ods dev restart
ods dev rebuild
```
Or without `ods`:
```bash
devcontainer up --workspace-folder . --remove-existing-container
```
## Image
The devcontainer uses a prebuilt image published to `onyxdotapp/onyx-devcontainer`.
@@ -88,30 +59,19 @@ The `devcontainer` target is defined in `docker-bake.hcl` at the repo root.
## User & permissions
The container runs as the `dev` user by default (`remoteUser` in devcontainer.json).
An init script (`init-dev-user.sh`) runs at container start to ensure `dev` has
read/write access to the bind-mounted workspace:
An init script (`init-dev-user.sh`) runs at container start to ensure the active
user has read/write access to the bind-mounted workspace:
- **Standard Docker** — `dev`'s UID/GID is remapped to match the workspace owner,
so file permissions work seamlessly.
- **Rootless Docker** — The workspace appears as root-owned (UID 0) inside the
container due to user-namespace mapping. The init script grants `dev` access via
POSIX ACLs (`setfacl`), which adds a few seconds to the first container start on
large repos.
container due to user-namespace mapping. `ods dev up` auto-detects rootless Docker
and sets `DEVCONTAINER_REMOTE_USER=root` so the container runs as root — which
maps back to your host user via the user namespace. New files are owned by your
host UID and no ACL workarounds are needed.
## Docker socket
The container mounts the host's Docker socket so you can run `docker` commands
from inside. `ods dev` auto-detects the socket path and sets `DOCKER_SOCK`:
| Environment | Socket path |
| ----------------------- | ------------------------------ |
| Linux (rootless Docker) | `$XDG_RUNTIME_DIR/docker.sock` |
| macOS (Docker Desktop) | `~/.docker/run/docker.sock` |
| Linux (standard Docker) | `/var/run/docker.sock` |
To override, set `DOCKER_SOCK` before running `ods dev up`. When using the
VS Code extension or `devcontainer` CLI directly (without `ods`), you must set
`DOCKER_SOCK` yourself.
To override the auto-detection, set `DEVCONTAINER_REMOTE_USER` before running
`ods dev up`.
## Firewall

View File

@@ -1,20 +1,24 @@
{
"name": "Onyx Dev Sandbox",
"image": "onyxdotapp/onyx-devcontainer@sha256:12184169c5bcc9cca0388286d5ffe504b569bc9c37bfa631b76ee8eee2064055",
"runArgs": ["--cap-add=NET_ADMIN", "--cap-add=NET_RAW"],
"image": "onyxdotapp/onyx-devcontainer@sha256:0f02d9299928849c7b15f3b348dcfdcdcb64411ff7a4580cbc026a6ee7aa1554",
"runArgs": ["--cap-add=NET_ADMIN", "--cap-add=NET_RAW", "--network=onyx_default"],
"mounts": [
"source=${localEnv:DOCKER_SOCK},target=/var/run/docker.sock,type=bind",
"source=${localEnv:HOME}/.claude,target=/home/dev/.claude,type=bind",
"source=${localEnv:HOME}/.claude.json,target=/home/dev/.claude.json,type=bind",
"source=${localEnv:HOME}/.zshrc,target=/home/dev/.zshrc.host,type=bind,readonly",
"source=${localEnv:HOME}/.gitconfig,target=/home/dev/.gitconfig.host,type=bind,readonly",
"source=${localEnv:HOME}/.ssh,target=/home/dev/.ssh.host,type=bind,readonly",
"source=${localEnv:HOME}/.config/nvim,target=/home/dev/.config/nvim.host,type=bind,readonly",
"source=${localEnv:HOME}/.gitconfig,target=/home/dev/.gitconfig,type=bind,readonly",
"source=${localEnv:HOME}/.config/nvim,target=/home/dev/.config/nvim,type=bind,readonly",
"source=onyx-devcontainer-cache,target=/home/dev/.cache,type=volume",
"source=onyx-devcontainer-local,target=/home/dev/.local,type=volume"
],
"remoteUser": "dev",
"containerEnv": {
"SSH_AUTH_SOCK": "/tmp/ssh-agent.sock",
"POSTGRES_HOST": "relational_db",
"REDIS_HOST": "cache"
},
"remoteUser": "${localEnv:DEVCONTAINER_REMOTE_USER:dev}",
"updateRemoteUserUID": false,
"initializeCommand": "docker network create onyx_default 2>/dev/null || true",
"workspaceMount": "source=${localWorkspaceFolder},target=/workspace,type=bind,consistency=delegated",
"workspaceFolder": "/workspace",
"postStartCommand": "sudo bash /workspace/.devcontainer/init-dev-user.sh && sudo bash /workspace/.devcontainer/init-firewall.sh",

View File

@@ -8,38 +8,68 @@ set -euo pipefail
# We remap dev to that UID -- fast and seamless.
#
# Rootless Docker: Workspace appears as root-owned (UID 0) inside the
# container due to user-namespace mapping. We can't remap
# dev to UID 0 (that's root), so we grant access with
# POSIX ACLs instead.
# container due to user-namespace mapping. Requires
# DEVCONTAINER_REMOTE_USER=root (set automatically by
# ods dev up). Container root IS the host user, so
# bind-mounts and named volumes are symlinked into /root.
WORKSPACE=/workspace
TARGET_USER=dev
REMOTE_USER="${SUDO_USER:-$TARGET_USER}"
WS_UID=$(stat -c '%u' "$WORKSPACE")
WS_GID=$(stat -c '%g' "$WORKSPACE")
DEV_UID=$(id -u "$TARGET_USER")
DEV_GID=$(id -g "$TARGET_USER")
DEV_HOME=/home/"$TARGET_USER"
# devcontainer.json bind-mounts and named volumes target /home/dev regardless
# of remoteUser. When running as root ($HOME=/root), Phase 1 bridges the gap
# with symlinks from ACTIVE_HOME → MOUNT_HOME.
MOUNT_HOME=/home/"$TARGET_USER"
# Ensure directories that tools expect exist under ~dev.
# ~/.local and ~/.cache are named Docker volumes -- ensure they are owned by dev.
mkdir -p "$DEV_HOME"/.local/state "$DEV_HOME"/.local/share
chown -R "$TARGET_USER":"$TARGET_USER" "$DEV_HOME"/.local
chown -R "$TARGET_USER":"$TARGET_USER" "$DEV_HOME"/.cache
# Copy host configs mounted as *.host into their real locations.
# This gives the dev user owned copies without touching host originals.
if [ -d "$DEV_HOME/.ssh.host" ]; then
cp -a "$DEV_HOME/.ssh.host" "$DEV_HOME/.ssh"
chmod 700 "$DEV_HOME/.ssh"
chmod 600 "$DEV_HOME"/.ssh/id_* 2>/dev/null || true
chown -R "$TARGET_USER":"$TARGET_USER" "$DEV_HOME/.ssh"
if [ "$REMOTE_USER" = "root" ]; then
ACTIVE_HOME="/root"
else
ACTIVE_HOME="$MOUNT_HOME"
fi
if [ -d "$DEV_HOME/.config/nvim.host" ]; then
mkdir -p "$DEV_HOME/.config"
cp -a "$DEV_HOME/.config/nvim.host" "$DEV_HOME/.config/nvim"
chown -R "$TARGET_USER":"$TARGET_USER" "$DEV_HOME/.config/nvim"
# ── Phase 1: home directory setup ───────────────────────────────────
# ~/.local and ~/.cache are named Docker volumes mounted under MOUNT_HOME.
mkdir -p "$MOUNT_HOME"/.local/state "$MOUNT_HOME"/.local/share
# When running as root, symlink bind-mounts and named volumes into /root
# so that $HOME-relative tools (Claude Code, git, etc.) find them.
if [ "$ACTIVE_HOME" != "$MOUNT_HOME" ]; then
for item in .claude .cache .local; do
[ -d "$MOUNT_HOME/$item" ] || continue
if [ -e "$ACTIVE_HOME/$item" ] && [ ! -L "$ACTIVE_HOME/$item" ]; then
echo "warning: replacing $ACTIVE_HOME/$item with symlink to $MOUNT_HOME/$item" >&2
rm -rf "$ACTIVE_HOME/$item"
fi
ln -sfn "$MOUNT_HOME/$item" "$ACTIVE_HOME/$item"
done
# Symlink files (not directories).
for file in .claude.json .gitconfig .zshrc.host; do
[ -f "$MOUNT_HOME/$file" ] && ln -sf "$MOUNT_HOME/$file" "$ACTIVE_HOME/$file"
done
# Nested mount: .config/nvim
if [ -d "$MOUNT_HOME/.config/nvim" ]; then
mkdir -p "$ACTIVE_HOME/.config"
if [ -e "$ACTIVE_HOME/.config/nvim" ] && [ ! -L "$ACTIVE_HOME/.config/nvim" ]; then
echo "warning: replacing $ACTIVE_HOME/.config/nvim with symlink" >&2
rm -rf "$ACTIVE_HOME/.config/nvim"
fi
ln -sfn "$MOUNT_HOME/.config/nvim" "$ACTIVE_HOME/.config/nvim"
fi
fi
# ── Phase 2: workspace access ───────────────────────────────────────
# Root always has workspace access; Phase 1 handled home setup.
if [ "$REMOTE_USER" = "root" ]; then
exit 0
fi
# Already matching -- nothing to do.
@@ -61,45 +91,17 @@ if [ "$WS_UID" != "0" ]; then
echo "warning: failed to remap $TARGET_USER UID to $WS_UID" >&2
fi
fi
if ! chown -R "$TARGET_USER":"$TARGET_USER" /home/"$TARGET_USER" 2>&1; then
echo "warning: failed to chown /home/$TARGET_USER" >&2
if ! chown -R "$TARGET_USER":"$TARGET_USER" "$MOUNT_HOME" 2>&1; then
echo "warning: failed to chown $MOUNT_HOME" >&2
fi
else
# ── Rootless Docker ──────────────────────────────────────────────
# Workspace is root-owned inside the container. Grant dev access
# via POSIX ACLs (preserves ownership, works across the namespace
# boundary).
if command -v setfacl &>/dev/null; then
setfacl -Rm "u:${TARGET_USER}:rwX" "$WORKSPACE"
setfacl -Rdm "u:${TARGET_USER}:rwX" "$WORKSPACE" # default ACL for new files
# Git refuses to operate in repos owned by a different UID.
# Host gitconfig is mounted readonly as ~/.gitconfig.host.
# Create a real ~/.gitconfig that includes it plus container overrides.
printf '[include]\n\tpath = %s/.gitconfig.host\n[safe]\n\tdirectory = %s\n' \
"$DEV_HOME" "$WORKSPACE" > "$DEV_HOME/.gitconfig"
chown "$TARGET_USER":"$TARGET_USER" "$DEV_HOME/.gitconfig"
# If this is a worktree, the main .git dir is bind-mounted at its
# host absolute path. Grant dev access so git operations work.
GIT_COMMON_DIR=$(git -C "$WORKSPACE" rev-parse --git-common-dir 2>/dev/null || true)
if [ -n "$GIT_COMMON_DIR" ] && [ "$GIT_COMMON_DIR" != "$WORKSPACE/.git" ]; then
[ ! -d "$GIT_COMMON_DIR" ] && GIT_COMMON_DIR="$WORKSPACE/$GIT_COMMON_DIR"
if [ -d "$GIT_COMMON_DIR" ]; then
setfacl -Rm "u:${TARGET_USER}:rwX" "$GIT_COMMON_DIR"
setfacl -Rdm "u:${TARGET_USER}:rwX" "$GIT_COMMON_DIR"
git config -f "$DEV_HOME/.gitconfig" --add safe.directory "$(dirname "$GIT_COMMON_DIR")"
fi
fi
# Also fix bind-mounted dirs under ~dev that appear root-owned.
for dir in /home/"$TARGET_USER"/.claude; do
[ -d "$dir" ] && setfacl -Rm "u:${TARGET_USER}:rwX" "$dir" && setfacl -Rdm "u:${TARGET_USER}:rwX" "$dir"
done
[ -f /home/"$TARGET_USER"/.claude.json ] && \
setfacl -m "u:${TARGET_USER}:rw" /home/"$TARGET_USER"/.claude.json
else
echo "warning: setfacl not found; dev user may not have write access to workspace" >&2
echo " install the 'acl' package or set remoteUser to root" >&2
fi
# Workspace is root-owned (UID 0) due to user-namespace mapping.
# The supported path is remoteUser=root (set DEVCONTAINER_REMOTE_USER=root),
# which is handled above. If we reach here, the user is running as dev
# under rootless Docker without the override.
echo "error: rootless Docker detected but remoteUser is not root." >&2
echo " Set DEVCONTAINER_REMOTE_USER=root before starting the container," >&2
echo " or use 'ods dev up' which sets it automatically." >&2
exit 1
fi

View File

@@ -4,22 +4,12 @@ set -euo pipefail
echo "Setting up firewall..."
# Preserve docker dns resolution
DOCKER_DNS_RULES=$(iptables-save | grep -E "^-A.*-d 127.0.0.11/32" || true)
# Flush all rules
iptables -t nat -F
iptables -t nat -X
iptables -t mangle -F
iptables -t mangle -X
# Only flush the filter table. The nat and mangle tables are managed by Docker
# (DNS DNAT to 127.0.0.11, container networking, etc.) and must not be touched —
# flushing them breaks Docker's embedded DNS resolver.
iptables -F
iptables -X
# Restore docker dns rules
if [ -n "$DOCKER_DNS_RULES" ]; then
echo "$DOCKER_DNS_RULES" | iptables-restore -n
fi
# Create ipset for allowed destinations
ipset create allowed-domains hash:net || true
ipset flush allowed-domains
@@ -34,6 +24,7 @@ done
# Resolve allowed domains
ALLOWED_DOMAINS=(
"github.com"
"registry.npmjs.org"
"api.anthropic.com"
"api-staging.anthropic.com"
@@ -56,14 +47,23 @@ for domain in "${ALLOWED_DOMAINS[@]}"; do
done
done
# Detect host network
if [[ "${DOCKER_HOST:-}" == "unix://"* ]]; then
DOCKER_GATEWAY=$(ip -4 route show | grep "^default" | awk '{print $3}')
# Allow traffic to the Docker gateway so the container can reach host services
# (e.g. the Onyx stack at localhost:3000, localhost:8080, etc.)
DOCKER_GATEWAY=$(ip -4 route show default | awk '{print $3}')
if [ -n "$DOCKER_GATEWAY" ]; then
if ! ipset add allowed-domains "$DOCKER_GATEWAY/32" -exist 2>&1; then
echo "warning: failed to add Docker gateway $DOCKER_GATEWAY to allowlist" >&2
fi
fi
# Allow traffic to all attached Docker network subnets so the container can
# reach sibling services (e.g. relational_db, cache) on shared compose networks.
for subnet in $(ip -4 -o addr show scope global | awk '{print $4}'); do
if ! ipset add allowed-domains "$subnet" -exist 2>&1; then
echo "warning: failed to add Docker subnet $subnet to allowlist" >&2
fi
done
# Set default policies to DROP
iptables -P FORWARD DROP
iptables -P INPUT DROP

View File

@@ -44,7 +44,7 @@ jobs:
fetch-tags: true
- name: Setup uv
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # ratchet:astral-sh/setup-uv@v8.0.0
with:
version: "0.9.9"
enable-cache: false
@@ -165,7 +165,7 @@ jobs:
fetch-depth: 0
- name: Setup uv
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # ratchet:astral-sh/setup-uv@v8.0.0
with:
version: "0.9.9"
# NOTE: This isn't caching much and zizmor suggests this could be poisoned, so disable.

View File

@@ -114,7 +114,7 @@ jobs:
ref: main
- name: Install the latest version of uv
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # ratchet:astral-sh/setup-uv@v8.0.0
with:
enable-cache: false
version: "0.9.9"

View File

@@ -471,7 +471,7 @@ jobs:
- name: Install the latest version of uv
if: always()
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # ratchet:astral-sh/setup-uv@v8.0.0
with:
enable-cache: false
version: "0.9.9"
@@ -710,7 +710,7 @@ jobs:
pull-requests: write
steps:
- name: Download visual diff summaries
uses: actions/download-artifact@70fc10c6e5e1ce46ad2ea6f2b72d43f7d47b13c3
uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c
with:
pattern: screenshot-diff-summary-*
path: summaries/

View File

@@ -19,16 +19,15 @@ permissions:
jobs:
mypy-check:
# See https://runs-on.com/runners/linux/
# Note: Mypy seems quite optimized for x64 compared to arm64.
# Similarly, mypy is single-threaded and incremental, so 2cpu is sufficient.
# NOTE: This job is named mypy-check for branch protection compatibility,
# but it actually runs ty (astral-sh's Rust type checker).
runs-on:
[
runs-on,
runner=2cpu-linux-x64,
runner=2cpu-linux-arm64,
"run-id=${{ github.run_id }}-mypy-check",
"extras=s3-cache",
]
timeout-minutes: 45
timeout-minutes: 15
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
@@ -46,26 +45,7 @@ jobs:
backend/requirements/model_server.txt
backend/requirements/ee.txt
- name: Generate OpenAPI schema and Python client
shell: bash
# TODO(Nik): https://linear.app/onyx-app/issue/ENG-1/update-test-infra-to-use-test-license
- name: Run ty
env:
LICENSE_ENFORCEMENT_ENABLED: "false"
run: |
ods openapi all
- name: Cache mypy cache
if: ${{ vars.DISABLE_MYPY_CACHE != 'true' }}
uses: runs-on/cache@a5f51d6f3fece787d03b7b4e981c82538a0654ed # ratchet:runs-on/cache@v4
with:
path: .mypy_cache
key: mypy-${{ runner.os }}-${{ github.base_ref || github.event.merge_group.base_ref || 'main' }}-${{ hashFiles('**/*.py', '**/*.pyi', 'pyproject.toml') }}
restore-keys: |
mypy-${{ runner.os }}-${{ github.base_ref || github.event.merge_group.base_ref || 'main' }}-
mypy-${{ runner.os }}-
- name: Run MyPy
env:
MYPY_FORCE_COLOR: 1
TERM: xterm-256color
run: mypy .
run: ty check --output-format github

View File

@@ -17,8 +17,6 @@ env:
# API keys for testing
COHERE_API_KEY: ${{ secrets.COHERE_API_KEY }}
LITELLM_API_KEY: ${{ secrets.LITELLM_API_KEY }}
LITELLM_API_URL: ${{ secrets.LITELLM_API_URL }}
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
AZURE_API_KEY: ${{ secrets.AZURE_API_KEY }}
AZURE_API_URL: ${{ vars.AZURE_API_URL }}

View File

@@ -38,7 +38,7 @@ jobs:
- name: Install node dependencies
working-directory: ./web
run: npm ci
- uses: j178/prek-action@0bb87d7f00b0c99306c8bcb8b8beba1eb581c037 # ratchet:j178/prek-action@v1
- uses: j178/prek-action@cbc2f23eb5539cf20d82d1aabd0d0ecbcc56f4e3
with:
prek-version: '0.3.4'
extra-args: ${{ github.event_name == 'pull_request' && format('--from-ref {0} --to-ref {1}', github.event.pull_request.base.sha, github.event.pull_request.head.sha) || github.event_name == 'merge_group' && format('--from-ref {0} --to-ref {1}', github.event.merge_group.base_sha, github.event.merge_group.head_sha) || github.ref_name == 'main' && '--all-files' || '' }}

View File

@@ -17,7 +17,7 @@ jobs:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
- uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # ratchet:astral-sh/setup-uv@v8.0.0
with:
enable-cache: false
version: "0.9.9"

View File

@@ -26,7 +26,7 @@ jobs:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
- uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # ratchet:astral-sh/setup-uv@v8.0.0
with:
enable-cache: false
version: "0.9.9"

View File

@@ -24,7 +24,7 @@ jobs:
persist-credentials: false
- name: Install the latest version of uv
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # ratchet:astral-sh/setup-uv@v8.0.0
with:
enable-cache: false
version: "0.9.9"

View File

@@ -1,64 +1,57 @@
{
"labels": [],
"comment": "",
"fixWithAI": true,
"hideFooter": false,
"strictness": 3,
"statusCheck": true,
"commentTypes": [
"logic",
"syntax",
"style"
],
"instructions": "",
"disabledLabels": [],
"excludeAuthors": [
"dependabot[bot]",
"renovate[bot]"
],
"ignoreKeywords": "",
"ignorePatterns": "",
"includeAuthors": [],
"summarySection": {
"included": true,
"collapsible": false,
"defaultOpen": false
"labels": [],
"comment": "",
"fixWithAI": true,
"hideFooter": false,
"strictness": 3,
"statusCheck": true,
"commentTypes": ["logic", "syntax", "style"],
"instructions": "",
"disabledLabels": [],
"excludeAuthors": ["dependabot[bot]", "renovate[bot]"],
"ignoreKeywords": "",
"ignorePatterns": "",
"includeAuthors": [],
"summarySection": {
"included": true,
"collapsible": false,
"defaultOpen": false
},
"excludeBranches": [],
"fileChangeLimit": 300,
"includeBranches": [],
"includeKeywords": "",
"triggerOnUpdates": false,
"updateExistingSummaryComment": true,
"updateSummaryOnly": false,
"issuesTableSection": {
"included": true,
"collapsible": false,
"defaultOpen": false
},
"statusCommentsEnabled": true,
"confidenceScoreSection": {
"included": true,
"collapsible": false
},
"sequenceDiagramSection": {
"included": true,
"collapsible": false,
"defaultOpen": false
},
"shouldUpdateDescription": false,
"rules": [
{
"scope": ["web/**"],
"rule": "In Onyx's Next.js app, the `app/ee/admin/` directory is a filesystem convention for Enterprise Edition route overrides — it does NOT add an `/ee/` prefix to the URL. Both `app/admin/groups/page.tsx` and `app/ee/admin/groups/page.tsx` serve the same URL `/admin/groups`. Hardcoded `/admin/...` paths in router.push() calls are correct and do NOT break EE deployments. Do not flag hardcoded admin paths as bugs."
},
"excludeBranches": [],
"fileChangeLimit": 300,
"includeBranches": [],
"includeKeywords": "",
"triggerOnUpdates": true,
"updateExistingSummaryComment": true,
"updateSummaryOnly": false,
"issuesTableSection": {
"included": true,
"collapsible": false,
"defaultOpen": false
{
"scope": ["web/**"],
"rule": "In Onyx, each API key creates a unique user row in the database with a unique `user_id` (UUID). There is a 1:1 mapping between API keys and their backing user records. Multiple API keys do NOT share the same `user_id`. Do not flag potential duplicate row IDs when using `user_id` from API key descriptors."
},
"statusCommentsEnabled": true,
"confidenceScoreSection": {
"included": true,
"collapsible": false
},
"sequenceDiagramSection": {
"included": true,
"collapsible": false,
"defaultOpen": false
},
"shouldUpdateDescription": false,
"rules": [
{
"scope": ["web/**"],
"rule": "In Onyx's Next.js app, the `app/ee/admin/` directory is a filesystem convention for Enterprise Edition route overrides — it does NOT add an `/ee/` prefix to the URL. Both `app/admin/groups/page.tsx` and `app/ee/admin/groups/page.tsx` serve the same URL `/admin/groups`. Hardcoded `/admin/...` paths in router.push() calls are correct and do NOT break EE deployments. Do not flag hardcoded admin paths as bugs."
},
{
"scope": ["web/**"],
"rule": "In Onyx, each API key creates a unique user row in the database with a unique `user_id` (UUID). There is a 1:1 mapping between API keys and their backing user records. Multiple API keys do NOT share the same `user_id`. Do not flag potential duplicate row IDs when using `user_id` from API key descriptors."
},
{
"scope": ["backend/**/*.py"],
"rule": "Never raise HTTPException directly in business code. Use `raise OnyxError(OnyxErrorCode.XXX, \"message\")` from `onyx.error_handling.exceptions`. A global FastAPI exception handler converts OnyxError into structured JSON responses with {\"error_code\": \"...\", \"detail\": \"...\"}. Error codes are defined in `onyx.error_handling.error_codes.OnyxErrorCode`. For upstream errors with dynamic HTTP status codes, use `status_code_override`: `raise OnyxError(OnyxErrorCode.BAD_GATEWAY, detail, status_code_override=upstream_status)`."
}
]
{
"scope": ["backend/**/*.py"],
"rule": "Never raise HTTPException directly in business code. Use `raise OnyxError(OnyxErrorCode.XXX, \"message\")` from `onyx.error_handling.exceptions`. A global FastAPI exception handler converts OnyxError into structured JSON responses with {\"error_code\": \"...\", \"detail\": \"...\"}. Error codes are defined in `onyx.error_handling.error_codes.OnyxErrorCode`. For upstream errors with dynamic HTTP status codes, use `status_code_override`: `raise OnyxError(OnyxErrorCode.BAD_GATEWAY, detail, status_code_override=upstream_status)`."
}
]
}

View File

@@ -67,11 +67,12 @@ repos:
args: ["--active", "--with=onyx-devtools", "ods", "check-lazy-imports"]
pass_filenames: true
files: ^backend/(?!\.venv/|scripts/).*\.py$
# NOTE: This takes ~6s on a single, large module which is prohibitively slow.
# NOTE: ty is fast enough to run in pre-commit but is pre-release (v0.0.31).
# Uncomment to enable.
# - id: uv-run
# name: mypy
# args: ["--all-extras", "mypy"]
# pass_filenames: true
# name: ty
# args: ["--active", "ty", "check"]
# pass_filenames: false
# files: ^backend/.*\.py$
- repo: https://github.com/pre-commit/pre-commit-hooks

12
.vscode/launch.json vendored
View File

@@ -475,6 +475,18 @@
"order": 0
}
},
{
"name": "Start Monitoring Stack (Prometheus + Grafana)",
"type": "node",
"request": "launch",
"runtimeExecutable": "docker",
"runtimeArgs": ["compose", "up", "-d"],
"cwd": "${workspaceFolder}/profiling",
"console": "integratedTerminal",
"presentation": {
"group": "3"
}
},
{
"name": "Clear and Restart External Volumes and Containers",
"type": "node",

View File

@@ -49,12 +49,12 @@ Onyx uses Celery for asynchronous task processing with multiple specialized work
4. **Light Worker** (`light`)
- Handles lightweight, fast operations
- Tasks: vespa operations, document permissions sync, external group sync
- Tasks: vespa metadata sync, connector deletion, doc permissions upsert, checkpoint cleanup, index attempt cleanup
- Higher concurrency for quick tasks
5. **Heavy Worker** (`heavy`)
- Handles resource-intensive operations
- Primary task: document pruning operations
- Tasks: connector pruning, document permissions sync, external group sync, CSV generation
- Runs with 4 threads concurrency
6. **KG Processing Worker** (`kg_processing`)

View File

@@ -1,4 +1,4 @@
FROM python:3.11.7-slim-bookworm
FROM python:3.11-slim-bookworm@sha256:9c6f90801e6b68e772b7c0ca74260cbf7af9f320acec894e26fccdaccfbe3b47
LABEL com.danswer.maintainer="founders@onyx.app"
LABEL com.danswer.description="This image is the web/frontend container of Onyx which \

View File

@@ -1,5 +1,5 @@
# Base stage with dependencies
FROM python:3.11.7-slim-bookworm AS base
FROM python:3.11-slim-bookworm@sha256:9c6f90801e6b68e772b7c0ca74260cbf7af9f320acec894e26fccdaccfbe3b47 AS base
ENV DANSWER_RUNNING_IN_DOCKER="true" \
HF_HOME=/app/.cache/huggingface
@@ -50,6 +50,10 @@ COPY ./onyx/utils/logger.py /app/onyx/utils/logger.py
COPY ./onyx/utils/middleware.py /app/onyx/utils/middleware.py
COPY ./onyx/utils/tenant.py /app/onyx/utils/tenant.py
# Sentry configuration (used when SENTRY_DSN is set)
COPY ./onyx/configs/__init__.py /app/onyx/configs/__init__.py
COPY ./onyx/configs/sentry.py /app/onyx/configs/sentry.py
# Place to fetch version information
COPY ./onyx/__init__.py /app/onyx/__init__.py

View File

@@ -26,7 +26,9 @@ from shared_configs.configs import (
TENANT_ID_PREFIX,
)
from onyx.db.models import Base
from celery.backends.database.session import ResultModelBase # type: ignore
from celery.backends.database.session import ( # ty: ignore[unresolved-import]
ResultModelBase,
)
from onyx.db.engine.sql_engine import SqlEngine
# Make sure in alembic.ini [logger_root] level=INFO is set or most logging will be
@@ -208,7 +210,7 @@ def do_run_migrations(
context.configure(
connection=connection,
target_metadata=target_metadata, # type: ignore
target_metadata=target_metadata,
version_table_schema=schema_name,
include_schemas=True,
compare_type=True,
@@ -380,7 +382,7 @@ def run_migrations_offline() -> None:
logger.info(f"Migrating schema: {schema}")
context.configure(
url=url,
target_metadata=target_metadata, # type: ignore
target_metadata=target_metadata,
literal_binds=True,
version_table_schema=schema,
include_schemas=True,
@@ -421,7 +423,7 @@ def run_migrations_offline() -> None:
logger.info(f"Migrating schema: {schema}")
context.configure(
url=url,
target_metadata=target_metadata, # type: ignore
target_metadata=target_metadata,
literal_binds=True,
version_table_schema=schema,
include_schemas=True,
@@ -464,7 +466,7 @@ def run_migrations_online() -> None:
context.configure(
connection=connection,
target_metadata=target_metadata, # type: ignore
target_metadata=target_metadata,
version_table_schema=schema_name,
include_schemas=True,
compare_type=True,

View File

@@ -25,7 +25,7 @@ def upgrade() -> None:
# Use batch mode to modify the enum type
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.alter_column( # type: ignore[attr-defined]
batch_op.alter_column(
"role",
type_=sa.Enum(
"BASIC",
@@ -71,7 +71,7 @@ def downgrade() -> None:
op.drop_column("user__user_group", "is_curator")
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.alter_column( # type: ignore[attr-defined]
batch_op.alter_column(
"role",
type_=sa.Enum(
"BASIC", "ADMIN", name="userrole", native_enum=False, length=20

View File

@@ -49,7 +49,7 @@ def upgrade() -> None:
"time_updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
server_onupdate=sa.text("now()"), # type: ignore
server_onupdate=sa.text("now()"),
nullable=True,
),
sa.Column(

View File

@@ -68,7 +68,7 @@ def upgrade() -> None:
sa.text("SELECT id FROM tool WHERE in_code_tool_id = :in_code_tool_id"),
{"in_code_tool_id": OPEN_URL_TOOL["in_code_tool_id"]},
).fetchone()
tool_id = result[0] # type: ignore
tool_id = result[0]
# Associate the tool with all existing personas
# Get all persona IDs

View File

@@ -63,7 +63,7 @@ def upgrade() -> None:
"time_created",
existing_type=postgresql.TIMESTAMP(timezone=True),
nullable=False,
existing_server_default=sa.text("now()"), # type: ignore
existing_server_default=sa.text("now()"),
)
op.alter_column(
"index_attempt",
@@ -85,7 +85,7 @@ def downgrade() -> None:
"time_created",
existing_type=postgresql.TIMESTAMP(timezone=True),
nullable=True,
existing_server_default=sa.text("now()"), # type: ignore
existing_server_default=sa.text("now()"),
)
op.drop_index(op.f("ix_accesstoken_created_at"), table_name="accesstoken")
op.drop_table("accesstoken")

View File

@@ -19,7 +19,7 @@ depends_on: None = None
def upgrade() -> None:
sequence = Sequence("connector_credential_pair_id_seq")
op.execute(CreateSequence(sequence)) # type: ignore
op.execute(CreateSequence(sequence))
op.add_column(
"connector_credential_pair",
sa.Column(

View File

@@ -0,0 +1,28 @@
"""add_error_tracking_fields_to_index_attempt_errors
Revision ID: d129f37b3d87
Revises: 503883791c39
Create Date: 2026-04-06 19:11:18.261800
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "d129f37b3d87"
down_revision = "503883791c39"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"index_attempt_errors",
sa.Column("error_type", sa.String(), nullable=True),
)
def downgrade() -> None:
op.drop_column("index_attempt_errors", "error_type")

View File

@@ -49,7 +49,7 @@ def run_migrations_offline() -> None:
url = build_connection_string()
context.configure(
url=url,
target_metadata=target_metadata, # type: ignore
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
@@ -61,7 +61,7 @@ def run_migrations_offline() -> None:
def do_run_migrations(connection: Connection) -> None:
context.configure(
connection=connection,
target_metadata=target_metadata, # type: ignore[arg-type]
target_metadata=target_metadata,
)
with context.begin_transaction():

View File

@@ -112,7 +112,7 @@ def _get_access_for_documents(
access_map[document_id] = DocumentAccess.build(
user_emails=list(non_ee_access.user_emails),
user_groups=user_group_info.get(document_id, []),
is_public=is_public_anywhere,
is_public=is_public_anywhere, # ty: ignore[invalid-argument-type]
external_user_emails=list(ext_u_emails),
external_user_group_ids=list(ext_u_groups),
)

View File

@@ -53,7 +53,7 @@ def fetch_query_analytics(
.order_by(cast(ChatMessage.time_sent, Date))
)
return db_session.execute(stmt).all() # type: ignore
return db_session.execute(stmt).all() # ty: ignore[invalid-return-type]
def fetch_per_user_query_analytics(
@@ -92,7 +92,7 @@ def fetch_per_user_query_analytics(
.order_by(cast(ChatMessage.time_sent, Date), ChatSession.user_id)
)
return db_session.execute(stmt).all() # type: ignore
return db_session.execute(stmt).all() # ty: ignore[invalid-return-type]
def fetch_onyxbot_analytics(

View File

@@ -9,7 +9,7 @@ logger = setup_logger()
def fetch_sources_with_connectors(db_session: Session) -> list[DocumentSource]:
sources = db_session.query(distinct(Connector.source)).all() # type: ignore
sources = db_session.query(distinct(Connector.source)).all()
document_sources = [source[0] for source in sources]

View File

@@ -13,6 +13,7 @@ from ee.onyx.server.license.models import LicenseSource
from onyx.auth.schemas import UserRole
from onyx.cache.factory import get_cache_backend
from onyx.configs.constants import ANONYMOUS_USER_EMAIL
from onyx.db.enums import AccountType
from onyx.db.models import License
from onyx.db.models import User
from onyx.utils.logger import setup_logger
@@ -107,12 +108,13 @@ def get_used_seats(tenant_id: str | None = None) -> int:
Get current seat usage directly from database.
For multi-tenant: counts users in UserTenantMapping for this tenant.
For self-hosted: counts all active users (excludes EXT_PERM_USER role
and the anonymous system user).
For self-hosted: counts all active users.
TODO: Exclude API key dummy users from seat counting. API keys create
users with emails like `__DANSWER_API_KEY_*` that should not count toward
seat limits. See: https://linear.app/onyx-app/issue/ENG-3518
Only human accounts count toward seat limits.
SERVICE_ACCOUNT (API key dummy users), EXT_PERM_USER, and the
anonymous system user are excluded. BOT (Slack users) ARE counted
because they represent real humans and get upgraded to STANDARD
when they log in via web.
"""
if MULTI_TENANT:
from ee.onyx.server.tenants.user_mapping import get_tenant_count
@@ -126,9 +128,10 @@ def get_used_seats(tenant_id: str | None = None) -> int:
select(func.count())
.select_from(User)
.where(
User.is_active == True, # type: ignore # noqa: E712
User.is_active == True, # noqa: E712
User.role != UserRole.EXT_PERM_USER,
User.email != ANONYMOUS_USER_EMAIL, # type: ignore
User.email != ANONYMOUS_USER_EMAIL,
User.account_type != AccountType.SERVICE_ACCOUNT,
)
)
return result.scalar() or 0

View File

@@ -121,7 +121,7 @@ class ScimDAL(DAL):
"""Update the last_used_at timestamp for a token."""
token = self._session.get(ScimToken, token_id)
if token:
token.last_used_at = func.now() # type: ignore[assignment]
token.last_used_at = func.now()
# ------------------------------------------------------------------
# User mapping operations
@@ -229,7 +229,7 @@ class ScimDAL(DAL):
def get_user(self, user_id: UUID) -> User | None:
"""Fetch a user by ID."""
return self._session.scalar(
select(User).where(User.id == user_id) # type: ignore[arg-type]
select(User).where(User.id == user_id) # ty: ignore[invalid-argument-type]
)
def get_user_by_email(self, email: str) -> User | None:
@@ -293,16 +293,22 @@ class ScimDAL(DAL):
if attr == "username":
# arg-type: fastapi-users types User.email as str, not a column expression
# assignment: union return type widens but query is still Select[tuple[User]]
query = _apply_scim_string_op(query, User.email, scim_filter) # type: ignore[arg-type, assignment]
query = _apply_scim_string_op(
query, User.email, scim_filter # ty: ignore[invalid-argument-type]
)
elif attr == "active":
query = query.where(
User.is_active.is_(scim_filter.value.lower() == "true") # type: ignore[attr-defined]
User.is_active.is_( # ty: ignore[unresolved-attribute]
scim_filter.value.lower() == "true"
)
)
elif attr == "externalid":
mapping = self.get_user_mapping_by_external_id(scim_filter.value)
if not mapping:
return [], 0
query = query.where(User.id == mapping.user_id) # type: ignore[arg-type]
query = query.where(
User.id == mapping.user_id # ty: ignore[invalid-argument-type]
)
else:
raise ValueError(
f"Unsupported filter attribute: {scim_filter.attribute}"
@@ -318,7 +324,9 @@ class ScimDAL(DAL):
offset = max(start_index - 1, 0)
users = list(
self._session.scalars(
query.order_by(User.id).offset(offset).limit(count) # type: ignore[arg-type]
query.order_by(User.id) # ty: ignore[invalid-argument-type]
.offset(offset)
.limit(count)
)
.unique()
.all()
@@ -577,7 +585,7 @@ class ScimDAL(DAL):
attr = scim_filter.attribute.lower()
if attr == "displayname":
# assignment: union return type widens but query is still Select[tuple[UserGroup]]
query = _apply_scim_string_op(query, UserGroup.name, scim_filter) # type: ignore[assignment]
query = _apply_scim_string_op(query, UserGroup.name, scim_filter)
elif attr == "externalid":
mapping = self.get_group_mapping_by_external_id(scim_filter.value)
if not mapping:
@@ -615,7 +623,9 @@ class ScimDAL(DAL):
users = (
self._session.scalars(
select(User).where(User.id.in_(user_ids)) # type: ignore[attr-defined]
select(User).where(
User.id.in_(user_ids) # ty: ignore[unresolved-attribute]
)
)
.unique()
.all()
@@ -640,7 +650,9 @@ class ScimDAL(DAL):
return []
existing_users = (
self._session.scalars(
select(User).where(User.id.in_(uuids)) # type: ignore[attr-defined]
select(User).where(
User.id.in_(uuids) # ty: ignore[unresolved-attribute]
)
)
.unique()
.all()

View File

@@ -300,8 +300,11 @@ def fetch_user_groups_for_user(
stmt = (
select(UserGroup)
.join(User__UserGroup, User__UserGroup.user_group_id == UserGroup.id)
.join(User, User.id == User__UserGroup.user_id) # type: ignore
.where(User.id == user_id) # type: ignore
.join(
User,
User.id == User__UserGroup.user_id, # ty: ignore[invalid-argument-type]
)
.where(User.id == user_id) # ty: ignore[invalid-argument-type]
)
if only_curator_groups:
stmt = stmt.where(User__UserGroup.is_curator == True) # noqa: E712
@@ -430,7 +433,7 @@ def fetch_user_groups_for_documents(
.group_by(Document.id)
)
return db_session.execute(stmt).all() # type: ignore
return db_session.execute(stmt).all() # ty: ignore[invalid-return-type]
def _check_user_group_is_modifiable(user_group: UserGroup) -> None:
@@ -804,7 +807,9 @@ def update_user_group(
db_user_group.is_up_to_date = False
removed_users = db_session.scalars(
select(User).where(User.id.in_(removed_user_ids)) # type: ignore
select(User).where(
User.id.in_(removed_user_ids) # ty: ignore[unresolved-attribute]
)
).unique()
# Filter out admin and global curator users before validating curator status

View File

@@ -1,6 +1,6 @@
from collections.abc import Iterator
from googleapiclient.discovery import Resource # type: ignore
from googleapiclient.discovery import Resource
from ee.onyx.external_permissions.google_drive.models import GoogleDrivePermission
from ee.onyx.external_permissions.google_drive.permission_retrieval import (
@@ -38,7 +38,7 @@ def get_folder_permissions_by_ids(
A list of permissions matching the provided permission IDs
"""
return get_permissions_by_ids(
drive_service=service,
drive_service=service, # ty: ignore[invalid-argument-type]
doc_id=folder_id,
permission_ids=permission_ids,
)
@@ -68,7 +68,7 @@ def get_modified_folders(
# Retrieve and yield folders
for folder in execute_paginated_retrieval(
retrieval_function=service.files().list,
retrieval_function=service.files().list, # ty: ignore[unresolved-attribute]
list_key="files",
continue_on_404_or_403=True,
corpora="allDrives",

View File

@@ -1,6 +1,6 @@
from collections.abc import Generator
from googleapiclient.errors import HttpError # type: ignore
from googleapiclient.errors import HttpError
from pydantic import BaseModel
from ee.onyx.db.external_perm import ExternalUserGroup
@@ -183,7 +183,7 @@ def _get_drive_members(
)
admin_user_info = (
admin_service.users()
admin_service.users() # ty: ignore[unresolved-attribute]
.get(userKey=google_drive_connector.primary_admin_email)
.execute()
)
@@ -197,7 +197,7 @@ def _get_drive_members(
try:
for permission in execute_paginated_retrieval(
drive_service.permissions().list,
drive_service.permissions().list, # ty: ignore[unresolved-attribute]
list_key="permissions",
fileId=drive_id,
fields="permissions(emailAddress, type),nextPageToken",
@@ -256,7 +256,7 @@ def _get_all_google_groups(
"""
group_emails: set[str] = set()
for group in execute_paginated_retrieval(
admin_service.groups().list,
admin_service.groups().list, # ty: ignore[unresolved-attribute]
list_key="groups",
domain=google_domain,
fields="groups(email),nextPageToken",
@@ -274,7 +274,7 @@ def _google_group_to_onyx_group(
"""
group_member_emails: set[str] = set()
for member in execute_paginated_retrieval(
admin_service.members().list,
admin_service.members().list, # ty: ignore[unresolved-attribute]
list_key="members",
groupKey=group_email,
fields="members(email),nextPageToken",
@@ -298,7 +298,7 @@ def _map_group_email_to_member_emails(
for group_email in group_emails:
group_member_emails: set[str] = set()
for member in execute_paginated_retrieval(
admin_service.members().list,
admin_service.members().list, # ty: ignore[unresolved-attribute]
list_key="members",
groupKey=group_email,
fields="members(email),nextPageToken",

View File

@@ -33,7 +33,7 @@ def get_permissions_by_ids(
# Fetch all permissions for the document
fetched_permissions = execute_paginated_retrieval(
retrieval_function=drive_service.permissions().list,
retrieval_function=drive_service.permissions().list, # ty: ignore[unresolved-attribute]
list_key="permissions",
fileId=doc_id,
fields="permissions(id, emailAddress, type, domain, allowFileDiscovery, permissionDetails),nextPageToken",

View File

@@ -68,7 +68,7 @@ def _build_holder_map(permissions: list[dict]) -> dict[str, list[Holder]]:
logger.warning(f"Expected a 'raw' field, but none was found: {raw_perm=}")
continue
permission = Permission(**raw_perm.raw)
permission = Permission(**raw_perm.raw) # ty: ignore[invalid-argument-type]
# We only care about ability to browse through projects + issues (not other permissions such as read/write).
if permission.permission != "BROWSE_PROJECTS":

View File

@@ -1,6 +1,6 @@
from collections.abc import Generator
from office365.sharepoint.client_context import ClientContext # type: ignore[import-untyped]
from office365.sharepoint.client_context import ClientContext
from ee.onyx.db.external_perm import ExternalUserGroup
from ee.onyx.external_permissions.sharepoint.permission_utils import (

View File

@@ -7,11 +7,11 @@ from typing import Any
from urllib.parse import urlparse
import requests as _requests
from office365.graph_client import GraphClient # type: ignore[import-untyped]
from office365.onedrive.driveitems.driveItem import DriveItem # type: ignore[import-untyped]
from office365.runtime.client_request import ClientRequestException # type: ignore
from office365.sharepoint.client_context import ClientContext # type: ignore[import-untyped]
from office365.sharepoint.permissions.securable_object import RoleAssignmentCollection # type: ignore[import-untyped]
from office365.graph_client import GraphClient
from office365.onedrive.driveitems.driveItem import DriveItem
from office365.runtime.client_request import ClientRequestException
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.permissions.securable_object import RoleAssignmentCollection
from pydantic import BaseModel
from ee.onyx.db.external_perm import ExternalUserGroup

View File

@@ -287,8 +287,10 @@ def update_hook(
validated_is_reachable: bool | None = None
if endpoint_url_changing or api_key_changing or timeout_changing:
existing = _get_hook_or_404(db_session, hook_id)
effective_url: str = (
req.endpoint_url if endpoint_url_changing else existing.endpoint_url # type: ignore[assignment] # endpoint_url is required on create and cannot be cleared on update
effective_url: str = ( # ty: ignore[invalid-assignment]
req.endpoint_url
if endpoint_url_changing
else existing.endpoint_url # endpoint_url is required on create and cannot be cleared on update
)
effective_api_key: str | None = (
(api_key if not isinstance(api_key, UnsetType) else None)
@@ -299,8 +301,10 @@ def update_hook(
else None
)
)
effective_timeout: float = (
req.timeout_seconds if timeout_changing else existing.timeout_seconds # type: ignore[assignment] # req.timeout_seconds is non-None when timeout_changing (validated by HookUpdateRequest)
effective_timeout: float = ( # ty: ignore[invalid-assignment]
req.timeout_seconds
if timeout_changing
else existing.timeout_seconds # req.timeout_seconds is non-None when timeout_changing (validated by HookUpdateRequest)
)
validation = _validate_endpoint(
endpoint_url=effective_url,

View File

@@ -97,7 +97,7 @@ def fetch_and_process_chat_session_history(
break
paged_snapshots = parallel_yield(
[
[ # ty: ignore[invalid-argument-type]
yield_snapshot_from_chat_session(
db_session=db_session,
chat_session=chat_session,

View File

@@ -11,6 +11,8 @@ require a valid SCIM bearer token.
from __future__ import annotations
import hashlib
import struct
from uuid import UUID
from fastapi import APIRouter
@@ -22,6 +24,7 @@ from fastapi import Response
from fastapi.responses import JSONResponse
from fastapi_users.password import PasswordHelper
from sqlalchemy import func
from sqlalchemy import text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
@@ -65,12 +68,25 @@ from onyx.db.permissions import recompute_user_permissions__no_commit
from onyx.db.users import assign_user_to_default_groups__no_commit
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import fetch_ee_implementation_or_noop
from shared_configs.contextvars import get_current_tenant_id
logger = setup_logger()
# Group names reserved for system default groups (seeded by migration).
_RESERVED_GROUP_NAMES = frozenset({"Admin", "Basic"})
# Namespace prefix for the seat-allocation advisory lock. Hashed together
# with the tenant ID so the lock is scoped per-tenant (unrelated tenants
# never block each other) and cannot collide with unrelated advisory locks.
_SEAT_LOCK_NAMESPACE = "onyx_scim_seat_lock"
def _seat_lock_id_for_tenant(tenant_id: str) -> int:
"""Derive a stable 64-bit signed int lock id for this tenant's seat lock."""
digest = hashlib.sha256(f"{_SEAT_LOCK_NAMESPACE}:{tenant_id}".encode()).digest()
# pg_advisory_xact_lock takes a signed 8-byte int; unpack as such.
return struct.unpack("q", digest[:8])[0]
class ScimJSONResponse(JSONResponse):
"""JSONResponse with Content-Type: application/scim+json (RFC 7644 §3.1)."""
@@ -209,12 +225,37 @@ def _apply_exclusions(
def _check_seat_availability(dal: ScimDAL) -> str | None:
"""Return an error message if seat limit is reached, else None."""
"""Return an error message if seat limit is reached, else None.
Acquires a transaction-scoped advisory lock so that concurrent
SCIM requests are serialized. IdPs like Okta send provisioning
requests in parallel batches — without serialization the check is
vulnerable to a TOCTOU race where N concurrent requests each see
"seats available", all insert, and the tenant ends up over its
seat limit.
The lock is held until the caller's next COMMIT or ROLLBACK, which
means the seat count cannot change between the check here and the
subsequent INSERT/UPDATE. Each call site in this module follows
the pattern: _check_seat_availability → write → dal.commit()
(which releases the lock for the next waiting request).
"""
check_fn = fetch_ee_implementation_or_noop(
"onyx.db.license", "check_seat_availability", None
)
if check_fn is None:
return None
# Transaction-scoped advisory lock — released on dal.commit() / dal.rollback().
# The lock id is derived from the tenant so unrelated tenants never block
# each other, and from a namespace string so it cannot collide with
# unrelated advisory locks elsewhere in the codebase.
lock_id = _seat_lock_id_for_tenant(get_current_tenant_id())
dal.session.execute(
text("SELECT pg_advisory_xact_lock(:lock_id)"),
{"lock_id": lock_id},
)
result = check_fn(dal.session, seats_needed=1)
if not result.available:
return result.error_message or "Seat limit reached"

View File

@@ -55,8 +55,10 @@ def run_alembic_migrations(schema_name: str) -> None:
alembic_cfg.attributes["configure_logger"] = False
# Mimic command-line options by adding 'cmd_opts' to the config
alembic_cfg.cmd_opts = SimpleNamespace() # type: ignore
alembic_cfg.cmd_opts.x = [f"schemas={schema_name}"] # type: ignore
alembic_cfg.cmd_opts = SimpleNamespace() # ty: ignore[invalid-assignment]
alembic_cfg.cmd_opts.x = [ # ty: ignore[invalid-assignment]
f"schemas={schema_name}"
]
# Run migrations programmatically
command.upgrade(alembic_cfg, "head")

View File

@@ -349,8 +349,9 @@ def get_tenant_count(tenant_id: str) -> int:
user_count = (
db_session.query(User)
.filter(
User.email.in_(emails), # type: ignore
User.is_active == True, # type: ignore # noqa: E712
User.email.in_(emails), # ty: ignore[unresolved-attribute]
User.is_active # noqa: E712 # ty: ignore[invalid-argument-type]
== True,
)
.count()
)

View File

@@ -73,7 +73,7 @@ def capture_and_sync_with_alternate_posthog(
cloud_props.pop("onyx_cloud_user_id", None)
posthog.identify(
distinct_id=cloud_user_id,
distinct_id=cloud_user_id, # ty: ignore[possibly-unresolved-reference]
properties=cloud_props,
)
except Exception as e:
@@ -105,7 +105,7 @@ def get_anon_id_from_request(request: Any) -> str | None:
if (cookie_value := request.cookies.get(cookie_name)) and (
parsed := parse_posthog_cookie(cookie_value)
):
return parsed.get("distinct_id")
return parsed.get("distinct_id") # ty: ignore[possibly-unresolved-reference]
return None

View File

@@ -23,7 +23,7 @@
# from shared_configs.model_server_models import IntentResponse
# if TYPE_CHECKING:
# from setfit import SetFitModel # type: ignore[import-untyped]
# from setfit import SetFitModel
# from transformers import PreTrainedTokenizer, BatchEncoding
@@ -423,7 +423,7 @@
# def map_keywords(
# input_ids: torch.Tensor, tokenizer: "PreTrainedTokenizer", is_keyword: list[bool]
# ) -> list[str]:
# tokens = tokenizer.convert_ids_to_tokens(input_ids) # type: ignore
# tokens = tokenizer.convert_ids_to_tokens(input_ids)
# if not len(tokens) == len(is_keyword):
# raise ValueError("Length of tokens and keyword predictions must match")

View File

@@ -18,7 +18,7 @@
# super().__init__()
# config = DistilBertConfig()
# self.distilbert = DistilBertModel(config)
# config = self.distilbert.config # type: ignore
# config = self.distilbert.config
# # Keyword tokenwise binary classification layer
# self.keyword_classifier = nn.Linear(config.dim, 2)
@@ -85,7 +85,7 @@
# self.config = config
# self.distilbert = DistilBertModel(config)
# config = self.distilbert.config # type: ignore
# config = self.distilbert.config
# self.connector_global_classifier = nn.Linear(config.dim, 1)
# self.connector_match_classifier = nn.Linear(config.dim, 1)
# self.tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

View File

@@ -96,11 +96,14 @@ def get_model_app() -> FastAPI:
title="Onyx Model Server", version=__version__, lifespan=lifespan
)
if SENTRY_DSN:
from onyx.configs.sentry import _add_instance_tags
sentry_sdk.init(
dsn=SENTRY_DSN,
integrations=[StarletteIntegration(), FastApiIntegration()],
traces_sample_rate=0.1,
release=__version__,
before_send=_add_instance_tags,
)
logger.info("Sentry initialized")
else:

View File

@@ -7,8 +7,8 @@ from email.mime.text import MIMEText
from email.utils import formatdate
from email.utils import make_msgid
import sendgrid # type: ignore
from sendgrid.helpers.mail import Attachment # type: ignore
import sendgrid
from sendgrid.helpers.mail import Attachment
from sendgrid.helpers.mail import Content
from sendgrid.helpers.mail import ContentId
from sendgrid.helpers.mail import Disposition

View File

@@ -10,7 +10,7 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from jwt import decode as jwt_decode
from jwt import InvalidTokenError
from jwt import PyJWTError
from jwt.algorithms import RSAAlgorithm
from jwt.algorithms import RSAAlgorithm # ty: ignore[possibly-missing-import]
from onyx.configs.app_configs import JWT_PUBLIC_KEY_URL
from onyx.utils.logger import setup_logger

View File

@@ -46,8 +46,10 @@ async def _test_expire_oauth_token(
updated_data: Dict[str, Any] = {"expires_at": new_expires_at}
await user_manager.user_db.update_oauth_account(
user, cast(Any, oauth_account), updated_data
await user_manager.user_db.update_oauth_account( # ty: ignore[invalid-argument-type]
user, # ty: ignore[invalid-argument-type]
cast(Any, oauth_account),
updated_data,
)
return True
@@ -132,8 +134,10 @@ async def refresh_oauth_token(
)
# Update the OAuth account
await user_manager.user_db.update_oauth_account(
user, cast(Any, oauth_account), updated_data
await user_manager.user_db.update_oauth_account( # ty: ignore[invalid-argument-type]
user, # ty: ignore[invalid-argument-type]
cast(Any, oauth_account),
updated_data,
)
logger.info(f"Successfully refreshed OAuth token for {user.email}")

View File

@@ -191,7 +191,7 @@ class OAuthTokenManager:
@staticmethod
def _unwrap_sensitive_str(value: SensitiveValue[str] | str) -> str:
if isinstance(value, SensitiveValue):
return value.get_value(apply_mask=False)
return value.get_value(apply_mask=False) # ty: ignore[invalid-return-type]
return value
@staticmethod
@@ -199,5 +199,7 @@ class OAuthTokenManager:
token_data: SensitiveValue[dict[str, Any]] | dict[str, Any],
) -> dict[str, Any]:
if isinstance(token_data, SensitiveValue):
return token_data.get_value(apply_mask=False)
return token_data.get_value( # ty: ignore[invalid-return-type]
apply_mask=False
)
return token_data

View File

@@ -121,5 +121,7 @@ def require_permission(
return user
dependency._is_require_permission = True # type: ignore[attr-defined] # sentinel for auth_check detection
dependency._is_require_permission = ( # ty: ignore[unresolved-attribute]
True # sentinel for auth_check detection
)
return dependency

View File

@@ -45,7 +45,9 @@ from fastapi_users import UUIDIDMixin
from fastapi_users.authentication import AuthenticationBackend
from fastapi_users.authentication import CookieTransport
from fastapi_users.authentication import JWTStrategy
from fastapi_users.authentication import RedisStrategy
from fastapi_users.authentication import (
RedisStrategy, # ty: ignore[possibly-missing-import]
)
from fastapi_users.authentication import Strategy
from fastapi_users.authentication.strategy.db import AccessTokenDatabase
from fastapi_users.authentication.strategy.db import DatabaseStrategy
@@ -462,14 +464,16 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
self.user_db = tenant_user_db
if hasattr(user_create, "role"):
user_create.role = UserRole.BASIC
user_create.role = UserRole.BASIC # ty: ignore[invalid-assignment]
user_count = await get_user_count()
if (
user_count == 0
or user_create.email in get_default_admin_user_emails()
):
user_create.role = UserRole.ADMIN
user_create.role = ( # ty: ignore[invalid-assignment]
UserRole.ADMIN
)
# Check seat availability for new users (single-tenant only)
with get_session_with_current_tenant() as sync_db:
@@ -516,7 +520,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# Expire so the async session re-fetches the row updated by
# the sync session above.
self.user_db.session.expire(user)
user = await self.user_db.get(user_id) # type: ignore[assignment]
user = await self.user_db.get( # ty: ignore[invalid-assignment]
user_id
)
except exceptions.UserAlreadyExists:
user = await self.get_by_email(user_create.email)
@@ -544,7 +550,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# Expire so the async session re-fetches the row updated by
# the sync session above.
self.user_db.session.expire(user)
user = await self.user_db.get(user_id) # type: ignore[assignment]
user = await self.user_db.get( # ty: ignore[invalid-assignment]
user_id
)
if user_created:
await self._assign_default_pinned_assistants(user, db_session)
remove_user_from_invited_users(user_create.email)
@@ -592,7 +600,11 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
update nor the group assignment is visible without the other.
"""
with get_session_with_current_tenant() as sync_db:
sync_user = sync_db.query(User).filter(User.id == user_id).first() # type: ignore[arg-type]
sync_user = (
sync_db.query(User)
.filter(User.id == user_id) # ty: ignore[invalid-argument-type]
.first()
)
if sync_user:
sync_user.hashed_password = self.password_helper.hash(
user_create.password
@@ -613,7 +625,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
user_id,
)
async def validate_password(self, password: str, _: schemas.UC | models.UP) -> None:
async def validate_password( # ty: ignore[invalid-method-override]
self, password: str, _: schemas.UC | models.UP
) -> None:
# Validate password according to configurable security policy (defined via environment variables)
if len(password) < PASSWORD_MIN_LENGTH:
raise exceptions.InvalidPasswordException(
@@ -644,7 +658,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
return
@log_function_time(print_only=True)
async def oauth_callback(
async def oauth_callback( # ty: ignore[invalid-method-override]
self,
oauth_name: str,
access_token: str,
@@ -754,7 +768,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
user,
# NOTE: OAuthAccount DOES implement the OAuthAccountProtocol
# but the type checker doesn't know that :(
existing_oauth_account, # type: ignore
existing_oauth_account, # ty: ignore[invalid-argument-type]
oauth_account_dict,
)
@@ -788,7 +802,11 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# transaction so neither change is visible without the other.
was_inactive = not user.is_active
with get_session_with_current_tenant() as sync_db:
sync_user = sync_db.query(User).filter(User.id == user.id).first() # type: ignore[arg-type]
sync_user = (
sync_db.query(User)
.filter(User.id == user.id) # ty: ignore[invalid-argument-type]
.first()
)
if sync_user:
sync_user.is_verified = is_verified_by_default
sync_user.role = UserRole.BASIC
@@ -808,7 +826,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# otherwise, the oidc expiry will always be old, and the user will never be able to login
if user.oidc_expiry is not None and not TRACK_EXTERNAL_IDP_EXPIRY:
await self.user_db.update(user, {"oidc_expiry": None})
user.oidc_expiry = None # type: ignore
user.oidc_expiry = None # ty: ignore[invalid-assignment]
remove_user_from_invited_users(user.email)
if token:
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
@@ -925,7 +943,11 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
and (marketing_cookie_value := request.cookies.get(marketing_cookie_name))
and (parsed_cookie := parse_posthog_cookie(marketing_cookie_value))
):
marketing_anonymous_id = parsed_cookie["distinct_id"]
marketing_anonymous_id = (
parsed_cookie[ # ty: ignore[possibly-unresolved-reference]
"distinct_id"
]
)
# Technically, USER_SIGNED_UP is only fired from the cloud site when
# it is the first user in a tenant. However, it is semantically correct
@@ -942,7 +964,10 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
}
# Add all other values from the marketing cookie (featureFlags, etc.)
for key, value in parsed_cookie.items():
for (
key,
value,
) in parsed_cookie.items(): # ty: ignore[possibly-unresolved-reference]
if key != "distinct_id":
properties.setdefault(key, value)
@@ -1504,7 +1529,7 @@ async def _sync_jwt_oidc_expiry(
if user.oidc_expiry is not None:
await user_manager.user_db.update(user, {"oidc_expiry": None})
user.oidc_expiry = None # type: ignore
user.oidc_expiry = None # ty: ignore[invalid-assignment]
async def _get_or_create_user_from_jwt(
@@ -2232,7 +2257,7 @@ def get_oauth_router(
# Proceed to authenticate or create the user
try:
user = await user_manager.oauth_callback(
user = await user_manager.oauth_callback( # ty: ignore[invalid-argument-type]
oauth_client.name,
token["access_token"],
account_id,

View File

@@ -6,15 +6,16 @@ from typing import Any
from typing import cast
import sentry_sdk
from celery import bootsteps # type: ignore
from celery import bootsteps # ty: ignore[unresolved-import]
from celery import Task
from celery.app import trace
from celery.app import trace # ty: ignore[unresolved-import]
from celery.exceptions import WorkerShutdown
from celery.signals import before_task_publish
from celery.signals import task_postrun
from celery.signals import task_prerun
from celery.states import READY_STATES
from celery.utils.log import get_task_logger
from celery.worker import strategy # type: ignore
from celery.worker import strategy # ty: ignore[unresolved-import]
from redis.lock import Lock as RedisLock
from sentry_sdk.integrations.celery import CeleryIntegration
from sqlalchemy import text
@@ -62,11 +63,14 @@ logger = setup_logger()
task_logger = get_task_logger(__name__)
if SENTRY_DSN:
from onyx.configs.sentry import _add_instance_tags
sentry_sdk.init(
dsn=SENTRY_DSN,
integrations=[CeleryIntegration()],
traces_sample_rate=0.1,
release=__version__,
before_send=_add_instance_tags,
)
logger.info("Sentry initialized")
else:
@@ -94,6 +98,17 @@ class TenantAwareTask(Task):
CURRENT_TENANT_ID_CONTEXTVAR.set(None)
@before_task_publish.connect
def on_before_task_publish(
headers: dict[str, Any] | None = None,
**kwargs: Any, # noqa: ARG001
) -> None:
"""Stamp the current wall-clock time into the task message headers so that
workers can compute queue wait time (time between publish and execution)."""
if headers is not None:
headers["enqueued_at"] = time.time()
@task_prerun.connect
def on_task_prerun(
sender: Any | None = None, # noqa: ARG001

View File

@@ -3,7 +3,7 @@ from typing import Any
from celery import Celery
from celery import signals
from celery.beat import PersistentScheduler # type: ignore
from celery.beat import PersistentScheduler # ty: ignore[unresolved-import]
from celery.signals import beat_init
from celery.utils.log import get_task_logger

View File

@@ -4,4 +4,4 @@ import onyx.background.celery.apps.app_base as app_base
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.client")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]

View File

@@ -29,7 +29,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.docfetching")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -100,7 +100,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_DOCFETCHING_APP_NAME)
pool_size = cast(int, sender.concurrency) # type: ignore
pool_size = cast(int, sender.concurrency) # ty: ignore[unresolved-attribute]
SqlEngine.init_engine(pool_size=pool_size, max_overflow=8)
app_base.wait_for_redis(sender, **kwargs)

View File

@@ -30,7 +30,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.docprocessing")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -106,7 +106,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
# "SSL connection has been closed unexpectedly"
# actually setting the spawn method in the cloud fixes 95% of these.
# setting pre ping might help even more, but not worrying about that yet
pool_size = cast(int, sender.concurrency) # type: ignore
pool_size = cast(int, sender.concurrency) # ty: ignore[unresolved-attribute]
SqlEngine.init_engine(pool_size=pool_size, max_overflow=8)
app_base.wait_for_redis(sender, **kwargs)

View File

@@ -27,7 +27,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.heavy")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -92,7 +92,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
pool_size = cast(int, sender.concurrency) # type: ignore
pool_size = cast(int, sender.concurrency) # ty: ignore[unresolved-attribute]
SqlEngine.init_engine(pool_size=pool_size, max_overflow=8)
app_base.wait_for_redis(sender, **kwargs)

View File

@@ -16,6 +16,12 @@ from onyx.configs.app_configs import VESPA_CLOUD_CERT_PATH
from onyx.configs.app_configs import VESPA_CLOUD_KEY_PATH
from onyx.configs.constants import POSTGRES_CELERY_WORKER_LIGHT_APP_NAME
from onyx.db.engine.sql_engine import SqlEngine
from onyx.server.metrics.celery_task_metrics import on_celery_task_postrun
from onyx.server.metrics.celery_task_metrics import on_celery_task_prerun
from onyx.server.metrics.celery_task_metrics import on_celery_task_rejected
from onyx.server.metrics.celery_task_metrics import on_celery_task_retry
from onyx.server.metrics.celery_task_metrics import on_celery_task_revoked
from onyx.server.metrics.metrics_server import start_metrics_server
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
@@ -23,7 +29,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.light")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -36,6 +42,7 @@ def on_task_prerun(
**kwds: Any,
) -> None:
app_base.on_task_prerun(sender, task_id, task, args, kwargs, **kwds)
on_celery_task_prerun(task_id, task)
@signals.task_postrun.connect
@@ -50,6 +57,31 @@ def on_task_postrun(
**kwds: Any,
) -> None:
app_base.on_task_postrun(sender, task_id, task, args, kwargs, retval, state, **kwds)
on_celery_task_postrun(task_id, task, state)
@signals.task_retry.connect
def on_task_retry(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
task_id = getattr(getattr(sender, "request", None), "id", None)
on_celery_task_retry(task_id, sender)
@signals.task_revoked.connect
def on_task_revoked(sender: Any | None = None, **kwargs: Any) -> None:
task_name = getattr(sender, "name", None) or str(sender)
on_celery_task_revoked(kwargs.get("task_id"), task_name)
@signals.task_rejected.connect
def on_task_rejected(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
message = kwargs.get("message")
task_name: str | None = None
if message is not None:
headers = getattr(message, "headers", None) or {}
task_name = headers.get("task")
if task_name is None:
task_name = "unknown"
on_celery_task_rejected(None, task_name)
@celeryd_init.connect
@@ -63,19 +95,26 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
logger.info(f"Concurrency: {sender.concurrency}") # type: ignore
logger.info(
f"Concurrency: {sender.concurrency}" # ty: ignore[unresolved-attribute]
)
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=EXTRA_CONCURRENCY) # type: ignore
SqlEngine.init_engine(
pool_size=sender.concurrency, # ty: ignore[unresolved-attribute]
max_overflow=EXTRA_CONCURRENCY,
)
if MANAGED_VESPA:
httpx_init_vespa_pool(
sender.concurrency + EXTRA_CONCURRENCY, # type: ignore
sender.concurrency + EXTRA_CONCURRENCY, # ty: ignore[unresolved-attribute]
ssl_cert=VESPA_CLOUD_CERT_PATH,
ssl_key=VESPA_CLOUD_KEY_PATH,
)
else:
httpx_init_vespa_pool(sender.concurrency + EXTRA_CONCURRENCY) # type: ignore
httpx_init_vespa_pool(
sender.concurrency + EXTRA_CONCURRENCY # ty: ignore[unresolved-attribute]
)
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
@@ -90,6 +129,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
start_metrics_server("light")
app_base.on_worker_ready(sender, **kwargs)

View File

@@ -20,7 +20,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.monitoring")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect

View File

@@ -3,7 +3,7 @@ import os
from typing import Any
from typing import cast
from celery import bootsteps # type: ignore
from celery import bootsteps # ty: ignore[unresolved-import]
from celery import Celery
from celery import signals
from celery import Task
@@ -46,7 +46,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.primary")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -85,7 +85,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME)
pool_size = cast(int, sender.concurrency) # type: ignore
pool_size = cast(int, sender.concurrency) # ty: ignore[unresolved-attribute]
SqlEngine.init_engine(
pool_size=pool_size, max_overflow=CELERY_WORKER_PRIMARY_POOL_OVERFLOW
)
@@ -145,7 +145,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
raise WorkerShutdown("Primary worker lock could not be acquired!")
# tacking on our own user data to the sender
sender.primary_worker_lock = lock # type: ignore
sender.primary_worker_lock = lock # ty: ignore[unresolved-attribute]
# As currently designed, when this worker starts as "primary", we reinitialize redis
# to a clean state (for our purposes, anyway)

View File

@@ -22,7 +22,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.user_file_processing")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -66,7 +66,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
# "SSL connection has been closed unexpectedly"
# actually setting the spawn method in the cloud fixes 95% of these.
# setting pre ping might help even more, but not worrying about that yet
pool_size = cast(int, sender.concurrency) # type: ignore
pool_size = cast(int, sender.concurrency) # ty: ignore[unresolved-attribute]
SqlEngine.init_engine(pool_size=pool_size, max_overflow=8)
app_base.wait_for_redis(sender, **kwargs)

View File

@@ -179,7 +179,7 @@ def celery_inspect_get_workers(name_filter: str | None, app: Celery) -> list[str
# filter for and create an indexing specific inspect object
inspect = app.control.inspect()
workers: dict[str, Any] = inspect.ping() # type: ignore
workers: dict[str, Any] = inspect.ping() # ty: ignore[invalid-assignment]
if workers:
for worker_name in list(workers.keys()):
# if the name filter not set, return all worker names
@@ -208,7 +208,9 @@ def celery_inspect_get_reserved(worker_names: list[str], app: Celery) -> set[str
inspect = app.control.inspect(destination=worker_names)
# get the list of reserved tasks
reserved_tasks: dict[str, list] | None = inspect.reserved() # type: ignore
reserved_tasks: dict[str, list] | None = ( # ty: ignore[invalid-assignment]
inspect.reserved()
)
if reserved_tasks:
for _, task_list in reserved_tasks.items():
for task in task_list:
@@ -229,7 +231,9 @@ def celery_inspect_get_active(worker_names: list[str], app: Celery) -> set[str]:
inspect = app.control.inspect(destination=worker_names)
# get the list of reserved tasks
active_tasks: dict[str, list] | None = inspect.active() # type: ignore
active_tasks: dict[str, list] | None = ( # ty: ignore[invalid-assignment]
inspect.active()
)
if active_tasks:
for _, task_list in active_tasks.items():
for task in task_list:

View File

@@ -59,6 +59,11 @@ from onyx.redis.redis_connector_delete import RedisConnectorDelete
from onyx.redis.redis_connector_delete import RedisConnectorDeletePayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.server.metrics.deletion_metrics import inc_deletion_blocked
from onyx.server.metrics.deletion_metrics import inc_deletion_completed
from onyx.server.metrics.deletion_metrics import inc_deletion_fence_reset
from onyx.server.metrics.deletion_metrics import inc_deletion_started
from onyx.server.metrics.deletion_metrics import observe_deletion_taskset_duration
from onyx.utils.variable_functionality import (
fetch_versioned_implementation_with_fallback,
)
@@ -102,7 +107,7 @@ def revoke_tasks_blocking_deletion(
f"Revoked permissions sync task {permissions_sync_payload.celery_task_id}."
)
except Exception:
task_logger.exception("Exception while revoking pruning task")
task_logger.exception("Exception while revoking permissions sync task")
try:
prune_payload = redis_connector.prune.payload
@@ -110,7 +115,7 @@ def revoke_tasks_blocking_deletion(
app.control.revoke(prune_payload.celery_task_id)
task_logger.info(f"Revoked pruning task {prune_payload.celery_task_id}.")
except Exception:
task_logger.exception("Exception while revoking permissions sync task")
task_logger.exception("Exception while revoking pruning task")
try:
external_group_sync_payload = redis_connector.external_group_sync.payload
@@ -300,6 +305,7 @@ def try_generate_document_cc_pair_cleanup_tasks(
recent_index_attempts
and recent_index_attempts[0].status == IndexingStatus.IN_PROGRESS
):
inc_deletion_blocked(tenant_id, "indexing")
raise TaskDependencyError(
"Connector deletion - Delayed (indexing in progress): "
f"cc_pair={cc_pair_id} "
@@ -307,11 +313,13 @@ def try_generate_document_cc_pair_cleanup_tasks(
)
if redis_connector.prune.fenced:
inc_deletion_blocked(tenant_id, "pruning")
raise TaskDependencyError(
f"Connector deletion - Delayed (pruning in progress): cc_pair={cc_pair_id}"
)
if redis_connector.permissions.fenced:
inc_deletion_blocked(tenant_id, "permissions")
raise TaskDependencyError(
f"Connector deletion - Delayed (permissions in progress): cc_pair={cc_pair_id}"
)
@@ -359,6 +367,7 @@ def try_generate_document_cc_pair_cleanup_tasks(
# set this only after all tasks have been added
fence_payload.num_tasks = tasks_generated
redis_connector.delete.set_fence(fence_payload)
inc_deletion_started(tenant_id)
return tasks_generated
@@ -508,7 +517,11 @@ def monitor_connector_deletion_taskset(
db_session=db_session,
connector_id=connector_id_to_delete,
)
if not connector or not len(connector.credentials):
if not connector:
task_logger.info(
"Connector deletion - Connector already deleted, skipping connector cleanup"
)
elif not len(connector.credentials):
task_logger.info(
"Connector deletion - Found no credentials left for connector, deleting connector"
)
@@ -523,6 +536,12 @@ def monitor_connector_deletion_taskset(
num_docs_synced=fence_data.num_tasks,
)
duration = (
datetime.now(timezone.utc) - fence_data.submitted
).total_seconds()
observe_deletion_taskset_duration(tenant_id, "success", duration)
inc_deletion_completed(tenant_id, "success")
except Exception as e:
db_session.rollback()
stack_trace = traceback.format_exc()
@@ -541,6 +560,11 @@ def monitor_connector_deletion_taskset(
f"Connector deletion exceptioned: "
f"cc_pair={cc_pair_id} connector={connector_id_to_delete} credential={credential_id_to_delete}"
)
duration = (
datetime.now(timezone.utc) - fence_data.submitted
).total_seconds()
observe_deletion_taskset_duration(tenant_id, "failure", duration)
inc_deletion_completed(tenant_id, "failure")
raise e
task_logger.info(
@@ -717,5 +741,6 @@ def validate_connector_deletion_fence(
f"fence={fence_key}"
)
inc_deletion_fence_reset(tenant_id)
redis_connector.delete.reset()
return

View File

@@ -135,10 +135,13 @@ def _docfetching_task(
# Since connector_indexing_proxy_task spawns a new process using this function as
# the entrypoint, we init Sentry here.
if SENTRY_DSN:
from onyx.configs.sentry import _add_instance_tags
sentry_sdk.init(
dsn=SENTRY_DSN,
traces_sample_rate=0.1,
release=__version__,
before_send=_add_instance_tags,
)
logger.info("Sentry initialized")
else:

View File

@@ -3,6 +3,7 @@ import os
import time
import traceback
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
from datetime import timezone
@@ -50,6 +51,7 @@ from onyx.configs.constants import AuthType
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT
from onyx.configs.constants import MilestoneRecordType
from onyx.configs.constants import NotificationType
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
@@ -85,6 +87,8 @@ from onyx.db.indexing_coordination import INDEXING_PROGRESS_TIMEOUT_HOURS
from onyx.db.indexing_coordination import IndexingCoordination
from onyx.db.models import IndexAttempt
from onyx.db.models import SearchSettings
from onyx.db.notification import create_notification
from onyx.db.notification import get_notifications
from onyx.db.search_settings import get_current_search_settings
from onyx.db.search_settings import get_secondary_search_settings
from onyx.db.swap_index import check_and_perform_index_swap
@@ -105,6 +109,9 @@ from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.redis.redis_utils import is_fence
from onyx.server.metrics.connector_health_metrics import on_connector_error_state_change
from onyx.server.metrics.connector_health_metrics import on_connector_indexing_success
from onyx.server.metrics.connector_health_metrics import on_index_attempt_status_change
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.utils.logger import setup_logger
from onyx.utils.middleware import make_randomized_onyx_request_id
@@ -400,7 +407,6 @@ def check_indexing_completion(
tenant_id: str,
task: Task,
) -> None:
logger.info(
f"Checking for indexing completion: attempt={index_attempt_id} tenant={tenant_id}"
)
@@ -521,13 +527,25 @@ def check_indexing_completion(
# Update CC pair status if successful
cc_pair = get_connector_credential_pair_from_id(
db_session, attempt.connector_credential_pair_id
db_session,
attempt.connector_credential_pair_id,
eager_load_connector=True,
)
if cc_pair is None:
raise RuntimeError(
f"CC pair {attempt.connector_credential_pair_id} not found in database"
)
source = cc_pair.connector.source.value
connector_name = cc_pair.connector.name or f"cc_pair_{cc_pair.id}"
on_index_attempt_status_change(
tenant_id=tenant_id,
source=source,
cc_pair_id=cc_pair.id,
connector_name=connector_name,
status=attempt.status.value,
)
if attempt.status.is_successful():
# NOTE: we define the last successful index time as the time the last successful
# attempt finished. This is distinct from the poll_range_end of the last successful
@@ -548,10 +566,41 @@ def check_indexing_completion(
event=MilestoneRecordType.CONNECTOR_SUCCEEDED,
)
on_connector_indexing_success(
tenant_id=tenant_id,
source=source,
cc_pair_id=cc_pair.id,
connector_name=connector_name,
docs_indexed=attempt.new_docs_indexed or 0,
success_timestamp=attempt.time_updated.timestamp(),
)
# Clear repeated error state on success
if cc_pair.in_repeated_error_state:
cc_pair.in_repeated_error_state = False
# Delete any existing error notification for this CC pair so a
# fresh one is created if the connector fails again later.
for notif in get_notifications(
user=None,
db_session=db_session,
notif_type=NotificationType.CONNECTOR_REPEATED_ERRORS,
include_dismissed=True,
):
if (
notif.additional_data
and notif.additional_data.get("cc_pair_id") == cc_pair.id
):
db_session.delete(notif)
db_session.commit()
on_connector_error_state_change(
tenant_id=tenant_id,
source=source,
cc_pair_id=cc_pair.id,
connector_name=connector_name,
in_error=False,
)
if attempt.status == IndexingStatus.SUCCESS:
logger.info(
@@ -608,6 +657,27 @@ def active_indexing_attempt(
return bool(active_indexing_attempt)
@dataclass
class _KickoffResult:
"""Tracks diagnostic counts from a _kickoff_indexing_tasks run."""
created: int = 0
skipped_active: int = 0
skipped_not_found: int = 0
skipped_not_indexable: int = 0
failed_to_create: int = 0
@property
def evaluated(self) -> int:
return (
self.created
+ self.skipped_active
+ self.skipped_not_found
+ self.skipped_not_indexable
+ self.failed_to_create
)
def _kickoff_indexing_tasks(
celery_app: Celery,
db_session: Session,
@@ -617,12 +687,12 @@ def _kickoff_indexing_tasks(
redis_client: Redis,
lock_beat: RedisLock,
tenant_id: str,
) -> int:
) -> _KickoffResult:
"""Kick off indexing tasks for the given cc_pair_ids and search_settings.
Returns the number of tasks successfully created.
Returns a _KickoffResult with diagnostic counts.
"""
tasks_created = 0
result = _KickoffResult()
for cc_pair_id in cc_pair_ids:
lock_beat.reacquire()
@@ -633,6 +703,7 @@ def _kickoff_indexing_tasks(
search_settings_id=search_settings.id,
db_session=db_session,
):
result.skipped_active += 1
continue
cc_pair = get_connector_credential_pair_from_id(
@@ -643,6 +714,7 @@ def _kickoff_indexing_tasks(
task_logger.warning(
f"_kickoff_indexing_tasks - CC pair not found: cc_pair={cc_pair_id}"
)
result.skipped_not_found += 1
continue
# Heavyweight check after fetching cc pair
@@ -657,6 +729,7 @@ def _kickoff_indexing_tasks(
f"search_settings={search_settings.id}, "
f"secondary_index_building={secondary_index_building}"
)
result.skipped_not_indexable += 1
continue
task_logger.debug(
@@ -696,13 +769,14 @@ def _kickoff_indexing_tasks(
task_logger.info(
f"Connector indexing queued: index_attempt={attempt_id} cc_pair={cc_pair.id} search_settings={search_settings.id}"
)
tasks_created += 1
result.created += 1
else:
task_logger.error(
f"Failed to create indexing task: cc_pair={cc_pair.id} search_settings={search_settings.id}"
)
result.failed_to_create += 1
return tasks_created
return result
@shared_task(
@@ -728,13 +802,15 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
task_logger.warning("check_for_indexing - Starting")
tasks_created = 0
primary_result = _KickoffResult()
secondary_result: _KickoffResult | None = None
locked = False
redis_client = get_redis_client()
redis_client_replica = get_redis_replica_client()
# we need to use celery's redis client to access its redis data
# (which lives on a different db number)
# redis_client_celery: Redis = self.app.broker_connection().channel().client # type: ignore
# redis_client_celery: Redis = self.app.broker_connection().channel().client
lock_beat: RedisLock = redis_client.lock(
OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK,
@@ -848,6 +924,43 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
cc_pair_id=cc_pair_id,
in_repeated_error_state=True,
)
error_connector_name = (
cc_pair.connector.name or f"cc_pair_{cc_pair.id}"
)
on_connector_error_state_change(
tenant_id=tenant_id,
source=cc_pair.connector.source.value,
cc_pair_id=cc_pair_id,
connector_name=error_connector_name,
in_error=True,
)
connector_name = (
cc_pair.name
or cc_pair.connector.name
or f"CC pair {cc_pair.id}"
)
source = cc_pair.connector.source.value
connector_url = f"/admin/connector/{cc_pair.id}"
create_notification(
user_id=None,
notif_type=NotificationType.CONNECTOR_REPEATED_ERRORS,
db_session=db_session,
title=f"Connector '{connector_name}' has entered repeated error state",
description=(
f"The {source} connector has failed repeatedly and "
f"has been flagged. View indexing history in the "
f"Advanced section: {connector_url}"
),
additional_data={"cc_pair_id": cc_pair.id},
)
task_logger.error(
f"Connector entered repeated error state: "
f"cc_pair={cc_pair.id} "
f"connector={cc_pair.connector.name} "
f"source={source}"
)
# When entering repeated error state, also pause the connector
# to prevent continued indexing retry attempts burning through embedding credits.
# NOTE: only for Cloud, since most self-hosted users use self-hosted embedding
@@ -863,7 +976,7 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
# Heavy check, should_index(), is called in _kickoff_indexing_tasks
with get_session_with_current_tenant() as db_session:
# Primary first
tasks_created += _kickoff_indexing_tasks(
primary_result = _kickoff_indexing_tasks(
celery_app=self.app,
db_session=db_session,
search_settings=current_search_settings,
@@ -873,6 +986,7 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
lock_beat=lock_beat,
tenant_id=tenant_id,
)
tasks_created += primary_result.created
# Secondary indexing (only if secondary search settings exist and switchover_type is not INSTANT)
if (
@@ -880,7 +994,7 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
and secondary_search_settings.switchover_type != SwitchoverType.INSTANT
and secondary_cc_pair_ids
):
tasks_created += _kickoff_indexing_tasks(
secondary_result = _kickoff_indexing_tasks(
celery_app=self.app,
db_session=db_session,
search_settings=secondary_search_settings,
@@ -890,6 +1004,7 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
lock_beat=lock_beat,
tenant_id=tenant_id,
)
tasks_created += secondary_result.created
elif (
secondary_search_settings
and secondary_search_settings.switchover_type == SwitchoverType.INSTANT
@@ -1002,7 +1117,26 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
redis_lock_dump(lock_beat, redis_client)
time_elapsed = time.monotonic() - time_start
task_logger.info(f"check_for_indexing finished: elapsed={time_elapsed:.2f}")
task_logger.info(
f"check_for_indexing finished: "
f"elapsed={time_elapsed:.2f}s "
f"primary=[evaluated={primary_result.evaluated} "
f"created={primary_result.created} "
f"skipped_active={primary_result.skipped_active} "
f"skipped_not_found={primary_result.skipped_not_found} "
f"skipped_not_indexable={primary_result.skipped_not_indexable} "
f"failed={primary_result.failed_to_create}]"
+ (
f" secondary=[evaluated={secondary_result.evaluated} "
f"created={secondary_result.created} "
f"skipped_active={secondary_result.skipped_active} "
f"skipped_not_found={secondary_result.skipped_not_found} "
f"skipped_not_indexable={secondary_result.skipped_not_indexable} "
f"failed={secondary_result.failed_to_create}]"
if secondary_result
else ""
)
)
return tasks_created

View File

@@ -172,6 +172,10 @@ def migrate_chunks_from_vespa_to_opensearch_task(
search_settings = get_current_search_settings(db_session)
indexing_setting = IndexingSetting.from_db_model(search_settings)
task_logger.debug(
"Verified tenant info, migration record, and search settings."
)
# 2.e. Build sanitized to original doc ID mapping to check for
# conflicts in the event we sanitize a doc ID to an
# already-existing doc ID.
@@ -325,6 +329,7 @@ def migrate_chunks_from_vespa_to_opensearch_task(
finally:
if lock.owned():
lock.release()
task_logger.debug("Released the OpenSearch migration lock.")
else:
task_logger.warning(
"The OpenSearch migration lock was not owned on completion of the migration task."

View File

@@ -38,6 +38,7 @@ from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.connectors.factory import instantiate_connector
from onyx.connectors.interfaces import BaseConnector
from onyx.connectors.models import InputType
from onyx.db.connector import mark_ccpair_as_pruned
from onyx.db.connector_credential_pair import get_connector_credential_pair
@@ -50,7 +51,6 @@ from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.hierarchy import delete_orphaned_hierarchy_nodes
from onyx.db.hierarchy import link_hierarchy_nodes_to_documents
from onyx.db.hierarchy import remove_stale_hierarchy_node_cc_pair_entries
from onyx.db.hierarchy import reparent_orphaned_hierarchy_nodes
from onyx.db.hierarchy import update_document_parent_hierarchy_nodes
@@ -525,6 +525,14 @@ def connector_pruning_generator_task(
return None
try:
# Session 1: pre-enumeration — load cc_pair and instantiate the connector.
# The session is closed before enumeration so the DB connection is not held
# open during the 1030+ minute connector crawl.
connector_source: DocumentSource | None = None
connector_type: str = ""
is_connector_public: bool = False
runnable_connector: BaseConnector | None = None
with get_session_with_current_tenant() as db_session:
cc_pair = get_connector_credential_pair(
db_session=db_session,
@@ -550,49 +558,51 @@ def connector_pruning_generator_task(
)
redis_connector.prune.set_fence(new_payload)
connector_source = cc_pair.connector.source
connector_type = connector_source.value
is_connector_public = cc_pair.access_type == AccessType.PUBLIC
task_logger.info(
f"Pruning generator running connector: cc_pair={cc_pair_id} connector_source={cc_pair.connector.source}"
f"Pruning generator running connector: cc_pair={cc_pair_id} connector_source={connector_source}"
)
runnable_connector = instantiate_connector(
db_session,
cc_pair.connector.source,
connector_source,
InputType.SLIM_RETRIEVAL,
cc_pair.connector.connector_specific_config,
cc_pair.credential,
)
# Session 1 closed here — connection released before enumeration.
callback = PruneCallback(
0,
redis_connector,
lock,
r,
timeout_seconds=JOB_TIMEOUT,
)
callback = PruneCallback(
0,
redis_connector,
lock,
r,
timeout_seconds=JOB_TIMEOUT,
)
# Extract docs and hierarchy nodes from the source
connector_type = cc_pair.connector.source.value
extraction_result = extract_ids_from_runnable_connector(
runnable_connector, callback, connector_type=connector_type
)
all_connector_doc_ids = extraction_result.raw_id_to_parent
# Extract docs and hierarchy nodes from the source (no DB session held).
extraction_result = extract_ids_from_runnable_connector(
runnable_connector, callback, connector_type=connector_type
)
all_connector_doc_ids = extraction_result.raw_id_to_parent
# Process hierarchy nodes (same as docfetching):
# upsert to Postgres and cache in Redis
source = cc_pair.connector.source
# Session 2: post-enumeration — hierarchy upserts, diff computation, task dispatch.
with get_session_with_current_tenant() as db_session:
source = connector_source
redis_client = get_redis_client(tenant_id=tenant_id)
ensure_source_node_exists(redis_client, db_session, source)
upserted_nodes: list[DBHierarchyNode] = []
if extraction_result.hierarchy_nodes:
is_connector_public = cc_pair.access_type == AccessType.PUBLIC
upserted_nodes = upsert_hierarchy_nodes_batch(
db_session=db_session,
nodes=extraction_result.hierarchy_nodes,
source=source,
commit=True,
commit=False,
is_connector_public=is_connector_public,
)
@@ -601,9 +611,13 @@ def connector_pruning_generator_task(
hierarchy_node_ids=[n.id for n in upserted_nodes],
connector_id=connector_id,
credential_id=credential_id,
commit=True,
commit=False,
)
# Single commit so the FK reference in the join table can never
# outrun the parent hierarchy_node insert.
db_session.commit()
cache_entries = [
HierarchyNodeCacheEntry.from_db_model(node)
for node in upserted_nodes
@@ -628,16 +642,6 @@ def connector_pruning_generator_task(
raw_id_to_parent=all_connector_doc_ids,
)
# Link hierarchy nodes to documents for sources where pages can be
# both hierarchy nodes AND documents (e.g. Notion, Confluence)
all_doc_id_list = list(all_connector_doc_ids.keys())
link_hierarchy_nodes_to_documents(
db_session=db_session,
document_ids=all_doc_id_list,
source=source,
commit=True,
)
diff_start = time.monotonic()
try:
# a list of docs in our local index
@@ -658,7 +662,7 @@ def connector_pruning_generator_task(
task_logger.info(
"Pruning set collected: "
f"cc_pair={cc_pair_id} "
f"connector_source={cc_pair.connector.source} "
f"connector_source={connector_source} "
f"docs_to_remove={len(doc_ids_to_remove)}"
)

View File

@@ -248,6 +248,7 @@ def document_by_cc_pair_cleanup_task(
),
)
mark_document_as_modified(document_id, db_session)
db_session.commit()
completion_status = (
OnyxCeleryTaskCompletionStatus.NON_RETRYABLE_EXCEPTION
)

View File

@@ -61,7 +61,9 @@ def load_checkpoint(
checkpoint_io = file_store.read_file(checkpoint_pointer, mode="rb")
checkpoint_data = checkpoint_io.read().decode("utf-8")
if isinstance(connector, CheckpointedConnector):
return connector.validate_checkpoint_json(checkpoint_data)
return connector.validate_checkpoint_json( # ty: ignore[invalid-return-type]
checkpoint_data
)
return ConnectorCheckpoint.model_validate_json(checkpoint_data)

View File

@@ -23,6 +23,8 @@ class IndexAttemptErrorPydantic(BaseModel):
index_attempt_id: int
error_type: str | None = None
@classmethod
def from_model(cls, model: IndexAttemptError) -> "IndexAttemptErrorPydantic":
return cls(
@@ -37,4 +39,5 @@ class IndexAttemptErrorPydantic(BaseModel):
is_resolved=model.is_resolved,
time_created=model.time_created,
index_attempt_id=model.index_attempt_id,
error_type=model.error_type,
)

View File

@@ -5,6 +5,7 @@ from datetime import datetime
from datetime import timedelta
from datetime import timezone
import sentry_sdk
from celery import Celery
from sqlalchemy.orm import Session
@@ -68,6 +69,7 @@ from onyx.redis.redis_pool import get_redis_client
from onyx.server.features.build.indexing.persistent_document_writer import (
get_persistent_document_writer,
)
from onyx.server.metrics.connector_health_metrics import on_index_attempt_status_change
from onyx.utils.logger import setup_logger
from onyx.utils.middleware import make_randomized_onyx_request_id
from onyx.utils.postgres_sanitization import sanitize_document_for_postgres
@@ -267,6 +269,14 @@ def run_docfetching_entrypoint(
)
credential_id = attempt.connector_credential_pair.credential_id
on_index_attempt_status_change(
tenant_id=tenant_id,
source=attempt.connector_credential_pair.connector.source.value,
cc_pair_id=connector_credential_pair_id,
connector_name=connector_name or f"cc_pair_{connector_credential_pair_id}",
status="in_progress",
)
logger.info(
f"Docfetching starting{tenant_str}: "
f"connector='{connector_name}' "
@@ -556,6 +566,27 @@ def connector_document_extraction(
# save record of any failures at the connector level
if failure is not None:
if failure.exception is not None:
with sentry_sdk.new_scope() as scope:
scope.set_tag("stage", "connector_fetch")
scope.set_tag("connector_source", db_connector.source.value)
scope.set_tag("cc_pair_id", str(cc_pair_id))
scope.set_tag("index_attempt_id", str(index_attempt_id))
scope.set_tag("tenant_id", tenant_id)
if failure.failed_document:
scope.set_tag(
"doc_id", failure.failed_document.document_id
)
if failure.failed_entity:
scope.set_tag(
"entity_id", failure.failed_entity.entity_id
)
scope.fingerprint = [
"connector-fetch-failure",
db_connector.source.value,
type(failure.exception).__name__,
]
sentry_sdk.capture_exception(failure.exception)
total_failures += 1
with get_session_with_current_tenant() as db_session:
create_index_attempt_error(

View File

@@ -364,7 +364,7 @@ def _get_or_extract_plaintext(
plaintext_io = file_store.read_file(plaintext_key, mode="b")
return plaintext_io.read().decode("utf-8")
except Exception:
logger.exception(f"Error when reading file, id={file_id}")
logger.info(f"Cache miss for file with id={file_id}")
# Cache miss — extract and store.
content_text = extract_fn()

View File

@@ -4,8 +4,6 @@ from collections.abc import Callable
from typing import Any
from typing import Literal
from sqlalchemy.orm import Session
from onyx.chat.chat_state import ChatStateContainer
from onyx.chat.chat_utils import create_tool_call_failure_messages
from onyx.chat.citation_processor import CitationMapping
@@ -635,7 +633,6 @@ def run_llm_loop(
user_memory_context: UserMemoryContext | None,
llm: LLM,
token_counter: Callable[[str], int],
db_session: Session,
forced_tool_id: int | None = None,
user_identity: LLMUserIdentity | None = None,
chat_session_id: str | None = None,
@@ -1020,20 +1017,16 @@ def run_llm_loop(
persisted_memory_id: int | None = None
if user_memory_context and user_memory_context.user_id:
if tool_response.rich_response.index_to_replace is not None:
memory = update_memory_at_index(
persisted_memory_id = update_memory_at_index(
user_id=user_memory_context.user_id,
index=tool_response.rich_response.index_to_replace,
new_text=tool_response.rich_response.memory_text,
db_session=db_session,
)
persisted_memory_id = memory.id if memory else None
else:
memory = add_memory(
persisted_memory_id = add_memory(
user_id=user_memory_context.user_id,
memory_text=tool_response.rich_response.memory_text,
db_session=db_session,
)
persisted_memory_id = memory.id
operation: Literal["add", "update"] = (
"update"
if tool_response.rich_response.index_to_replace is not None
@@ -1171,7 +1164,10 @@ def run_llm_loop(
emitter.emit(
Packet(
placement=Placement(turn_index=llm_cycle_count + reasoning_cycles),
placement=Placement(
turn_index=llm_cycle_count # ty: ignore[possibly-unresolved-reference]
+ reasoning_cycles
),
obj=OverallStop(type="stop"),
)
)

View File

@@ -67,7 +67,6 @@ from onyx.db.chat import get_chat_session_by_id
from onyx.db.chat import get_or_create_root_message
from onyx.db.chat import reserve_message_id
from onyx.db.chat import reserve_multi_model_message_ids
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.enums import HookPoint
from onyx.db.memory import get_memories
from onyx.db.models import ChatMessage
@@ -1006,93 +1005,86 @@ def _run_models(
model_llm = setup.llms[model_idx]
try:
# Each worker opens its own session — SQLAlchemy sessions are not thread-safe.
# Do NOT write to the outer db_session (or any shared DB state) from here;
# all DB writes in this thread must go through thread_db_session.
with get_session_with_current_tenant() as thread_db_session:
thread_tool_dict = construct_tools(
persona=setup.persona,
db_session=thread_db_session,
emitter=model_emitter,
user=user,
llm=model_llm,
search_tool_config=SearchToolConfig(
user_selected_filters=setup.new_msg_req.internal_search_filters,
project_id_filter=setup.search_params.project_id_filter,
persona_id_filter=setup.search_params.persona_id_filter,
bypass_acl=setup.bypass_acl,
slack_context=setup.slack_context,
enable_slack_search=_should_enable_slack_search(
setup.persona, setup.new_msg_req.internal_search_filters
),
# Each function opens short-lived DB sessions on demand.
# Do NOT pass a long-lived session here — it would hold a
# connection for the entire LLM loop (minutes), and cloud
# infrastructure may drop idle connections.
thread_tool_dict = construct_tools(
persona=setup.persona,
emitter=model_emitter,
user=user,
llm=model_llm,
search_tool_config=SearchToolConfig(
user_selected_filters=setup.new_msg_req.internal_search_filters,
project_id_filter=setup.search_params.project_id_filter,
persona_id_filter=setup.search_params.persona_id_filter,
bypass_acl=setup.bypass_acl,
slack_context=setup.slack_context,
enable_slack_search=_should_enable_slack_search(
setup.persona, setup.new_msg_req.internal_search_filters
),
custom_tool_config=CustomToolConfig(
chat_session_id=setup.chat_session.id,
message_id=setup.user_message.id,
additional_headers=setup.custom_tool_additional_headers,
mcp_headers=setup.mcp_headers,
),
file_reader_tool_config=FileReaderToolConfig(
user_file_ids=setup.available_files.user_file_ids,
chat_file_ids=setup.available_files.chat_file_ids,
),
allowed_tool_ids=setup.new_msg_req.allowed_tool_ids,
search_usage_forcing_setting=setup.search_params.search_usage,
),
custom_tool_config=CustomToolConfig(
chat_session_id=setup.chat_session.id,
message_id=setup.user_message.id,
additional_headers=setup.custom_tool_additional_headers,
mcp_headers=setup.mcp_headers,
),
file_reader_tool_config=FileReaderToolConfig(
user_file_ids=setup.available_files.user_file_ids,
chat_file_ids=setup.available_files.chat_file_ids,
),
allowed_tool_ids=setup.new_msg_req.allowed_tool_ids,
search_usage_forcing_setting=setup.search_params.search_usage,
)
model_tools = [
tool for tool_list in thread_tool_dict.values() for tool in tool_list
]
if setup.forced_tool_id and setup.forced_tool_id not in {
tool.id for tool in model_tools
}:
raise ValueError(
f"Forced tool {setup.forced_tool_id} not found in tools"
)
model_tools = [
tool
for tool_list in thread_tool_dict.values()
for tool in tool_list
]
if setup.forced_tool_id and setup.forced_tool_id not in {
tool.id for tool in model_tools
}:
raise ValueError(
f"Forced tool {setup.forced_tool_id} not found in tools"
)
# Per-thread copy: run_llm_loop mutates simple_chat_history in-place.
if n_models == 1 and setup.new_msg_req.deep_research:
if setup.chat_session.project_id:
raise RuntimeError(
"Deep research is not supported for projects"
)
run_deep_research_llm_loop(
emitter=model_emitter,
state_container=sc,
simple_chat_history=list(setup.simple_chat_history),
tools=model_tools,
custom_agent_prompt=setup.custom_agent_prompt,
llm=model_llm,
token_counter=get_llm_token_counter(model_llm),
db_session=thread_db_session,
skip_clarification=setup.skip_clarification,
user_identity=setup.user_identity,
chat_session_id=str(setup.chat_session.id),
all_injected_file_metadata=setup.all_injected_file_metadata,
)
else:
run_llm_loop(
emitter=model_emitter,
state_container=sc,
simple_chat_history=list(setup.simple_chat_history),
tools=model_tools,
custom_agent_prompt=setup.custom_agent_prompt,
context_files=setup.extracted_context_files,
persona=setup.persona,
user_memory_context=setup.user_memory_context,
llm=model_llm,
token_counter=get_llm_token_counter(model_llm),
db_session=thread_db_session,
forced_tool_id=setup.forced_tool_id,
user_identity=setup.user_identity,
chat_session_id=str(setup.chat_session.id),
chat_files=setup.chat_files_for_tools,
include_citations=setup.new_msg_req.include_citations,
all_injected_file_metadata=setup.all_injected_file_metadata,
inject_memories_in_prompt=user.use_memories,
)
# Per-thread copy: run_llm_loop mutates simple_chat_history in-place.
if n_models == 1 and setup.new_msg_req.deep_research:
if setup.chat_session.project_id:
raise RuntimeError("Deep research is not supported for projects")
run_deep_research_llm_loop(
emitter=model_emitter,
state_container=sc,
simple_chat_history=list(setup.simple_chat_history),
tools=model_tools,
custom_agent_prompt=setup.custom_agent_prompt,
llm=model_llm,
token_counter=get_llm_token_counter(model_llm),
skip_clarification=setup.skip_clarification,
user_identity=setup.user_identity,
chat_session_id=str(setup.chat_session.id),
all_injected_file_metadata=setup.all_injected_file_metadata,
)
else:
run_llm_loop(
emitter=model_emitter,
state_container=sc,
simple_chat_history=list(setup.simple_chat_history),
tools=model_tools,
custom_agent_prompt=setup.custom_agent_prompt,
context_files=setup.extracted_context_files,
persona=setup.persona,
user_memory_context=setup.user_memory_context,
llm=model_llm,
token_counter=get_llm_token_counter(model_llm),
forced_tool_id=setup.forced_tool_id,
user_identity=setup.user_identity,
chat_session_id=str(setup.chat_session.id),
chat_files=setup.chat_files_for_tools,
include_citations=setup.new_msg_req.include_citations,
all_injected_file_metadata=setup.all_injected_file_metadata,
inject_memories_in_prompt=user.use_memories,
)
model_succeeded[model_idx] = True

View File

@@ -1125,6 +1125,32 @@ DEFAULT_IMAGE_ANALYSIS_MAX_SIZE_MB = 20
# Number of pre-provisioned tenants to maintain
TARGET_AVAILABLE_TENANTS = int(os.environ.get("TARGET_AVAILABLE_TENANTS", "5"))
# Master switch for the tenant work-gating feature. Controls the `enabled`
# axis only — flipping this True puts the feature in shadow mode (compute
# the gate, log skip counts, but do not actually skip). The `enforce` axis
# is Redis-only with a hard-coded default of False, so this env flag alone
# cannot cause real tenants to be skipped. Default off.
ENABLE_TENANT_WORK_GATING = (
os.environ.get("ENABLE_TENANT_WORK_GATING", "").lower() == "true"
)
# Membership TTL for the `active_tenants` sorted set. Members older than this
# are treated as inactive by the gate read path. Must be > the full-fanout
# interval so self-healing re-adds a genuinely-working tenant before their
# membership expires. Default 30 min.
TENANT_WORK_GATING_TTL_SECONDS = int(
os.environ.get("TENANT_WORK_GATING_TTL_SECONDS", 30 * 60)
)
# Minimum wall-clock interval between full-fanout cycles. When this many
# seconds have elapsed since the last bypass, the generator ignores the gate
# on the next invocation and dispatches to every non-gated tenant, letting
# consumers re-populate the active set. Schedule-independent so beat drift
# or backlog can't make the self-heal bursty or sparse. Default 20 min.
TENANT_WORK_GATING_FULL_FANOUT_INTERVAL_SECONDS = int(
os.environ.get("TENANT_WORK_GATING_FULL_FANOUT_INTERVAL_SECONDS", 20 * 60)
)
# Image summarization configuration
IMAGE_SUMMARIZATION_SYSTEM_PROMPT = os.environ.get(

View File

@@ -283,6 +283,7 @@ class NotificationType(str, Enum):
RELEASE_NOTES = "release_notes"
ASSISTANT_FILES_READY = "assistant_files_ready"
FEATURE_ANNOUNCEMENT = "feature_announcement"
CONNECTOR_REPEATED_ERRORS = "connector_repeated_errors"
class BlobType(str, Enum):
@@ -638,9 +639,11 @@ REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPINTVL] = 15
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPCNT] = 3
if platform.system() == "Darwin":
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPALIVE] = 60 # type: ignore[attr-defined,unused-ignore]
REDIS_SOCKET_KEEPALIVE_OPTIONS[
socket.TCP_KEEPALIVE # ty: ignore[unresolved-attribute]
] = 60
else:
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPIDLE] = 60 # type: ignore[attr-defined,unused-ignore]
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPIDLE] = 60
class OnyxCallTypes(str, Enum):

View File

@@ -0,0 +1,48 @@
from typing import Any
from sentry_sdk.types import Event
from onyx.utils.logger import setup_logger
logger = setup_logger()
_instance_id_resolved = False
def _add_instance_tags(
event: Event,
hint: dict[str, Any], # noqa: ARG001
) -> Event | None:
"""Sentry before_send hook that lazily attaches instance identification tags.
On the first event, resolves the instance UUID from the KV store (requires DB)
and sets it as a global Sentry tag. Subsequent events pick it up automatically.
"""
global _instance_id_resolved
if _instance_id_resolved:
return event
try:
import sentry_sdk
from shared_configs.configs import MULTI_TENANT
if MULTI_TENANT:
instance_id = "multi-tenant-cloud"
else:
from onyx.utils.telemetry import get_or_generate_uuid
instance_id = get_or_generate_uuid()
sentry_sdk.set_tag("instance_id", instance_id)
# Also set on this event since set_tag won't retroactively apply
event.setdefault("tags", {})["instance_id"] = instance_id
# Only mark resolved after success — if DB wasn't ready, retry next event
_instance_id_resolved = True
except Exception:
logger.debug("Failed to resolve instance_id for Sentry tagging")
return event

View File

@@ -547,7 +547,7 @@ class AirtableConnector(LoadConnector):
for record in batch_records:
# Capture the current context so that the thread gets the current tenant ID
current_context = contextvars.copy_context()
future_to_record[
future_to_record[ # ty: ignore[invalid-assignment]
executor.submit(
current_context.run,
self._process_record,

View File

@@ -3,7 +3,7 @@ from collections.abc import Iterator
from datetime import datetime
from typing import Dict
import asana # type: ignore
import asana
from onyx.utils.logger import setup_logger

View File

@@ -327,7 +327,7 @@ class AxeroConnector(PollConnector):
)
all_axero_forums = _map_post_to_parent(
posts=forums_posts,
posts=forums_posts, # ty: ignore[invalid-argument-type]
api_key=self.axero_key,
axero_base_url=self.base_url,
)

View File

@@ -26,6 +26,10 @@ from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
process_onyx_metadata,
)
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
tabular_file_to_sections,
)
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
@@ -38,6 +42,7 @@ from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.extract_file_text import get_file_ext
@@ -71,7 +76,9 @@ class BlobStorageConnector(LoadConnector, PollConnector):
self.bucket_region: Optional[str] = None
self.european_residency: bool = european_residency
def set_allow_images(self, allow_images: bool) -> None:
def set_allow_images( # ty: ignore[invalid-method-override]
self, allow_images: bool
) -> None:
"""Set whether to process images in this connector."""
logger.info(f"Setting allow_images to {allow_images}.")
self._allow_images = allow_images
@@ -190,7 +197,9 @@ class BlobStorageConnector(LoadConnector, PollConnector):
method="sts-assume-role",
)
botocore_session = get_session()
botocore_session._credentials = refreshable # type: ignore[attr-defined]
botocore_session._credentials = ( # ty: ignore[unresolved-attribute]
refreshable
)
session = boto3.Session(botocore_session=botocore_session)
self.s3_client = session.client("s3")
elif authentication_method == "assume_role":
@@ -451,6 +460,40 @@ class BlobStorageConnector(LoadConnector, PollConnector):
logger.exception(f"Error processing image {key}")
continue
# Handle tabular files (xlsx, csv, tsv) — produce one
# TabularSection per sheet (or per file for csv/tsv)
# instead of a flat TextSection.
if is_tabular_file(file_name):
try:
downloaded_file = self._download_object(key)
if downloaded_file is None:
continue
tabular_sections = tabular_file_to_sections(
BytesIO(downloaded_file),
file_name=file_name,
link=link,
)
batch.append(
Document(
id=f"{self.bucket_type}:{self.bucket_name}:{key}",
sections=(
tabular_sections
if tabular_sections
else [TabularSection(link=link, text="")]
),
source=DocumentSource(self.bucket_type.value),
semantic_identifier=file_name,
doc_updated_at=last_modified,
metadata={},
)
)
if len(batch) == self.batch_size:
yield batch
batch = []
except Exception:
logger.exception(f"Error processing tabular file {key}")
continue
# Handle text and document files
try:
downloaded_file = self._download_object(key)

View File

@@ -27,16 +27,19 @@ _STATUS_TO_ERROR_CODE: dict[int, OnyxErrorCode] = {
401: OnyxErrorCode.CREDENTIAL_EXPIRED,
403: OnyxErrorCode.INSUFFICIENT_PERMISSIONS,
404: OnyxErrorCode.BAD_GATEWAY,
429: OnyxErrorCode.RATE_LIMITED,
}
def _error_code_for_status(status_code: int) -> OnyxErrorCode:
"""Map an HTTP status code to the appropriate OnyxErrorCode.
Expects a >= 400 status code. Known codes (401, 403, 404, 429) are
Expects a >= 400 status code. Known codes (401, 403, 404) are
mapped to specific error codes; all other codes (unrecognised 4xx
and 5xx) map to BAD_GATEWAY as unexpected upstream errors.
Note: 429 is intentionally omitted — the rl_requests wrapper
handles rate limits transparently at the HTTP layer, so 429
responses never reach this function.
"""
if status_code in _STATUS_TO_ERROR_CODE:
return _STATUS_TO_ERROR_CODE[status_code]

View File

@@ -1,10 +1,9 @@
from datetime import datetime
from datetime import timezone
from enum import StrEnum
from typing import Any
from typing import cast
from typing import Literal
from typing import NoReturn
from typing import TypeAlias
from pydantic import BaseModel
from retry import retry
@@ -25,8 +24,11 @@ from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import EntityFailure
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TextSection
from onyx.error_handling.exceptions import OnyxError
@@ -47,10 +49,6 @@ def _handle_canvas_api_error(e: OnyxError) -> NoReturn:
raise InsufficientPermissionsError(
"Canvas API token does not have sufficient permissions (HTTP 403)."
)
elif e.status_code == 429:
raise ConnectorValidationError(
"Canvas rate-limit exceeded (HTTP 429). Please try again later."
)
elif e.status_code >= 500:
raise UnexpectedValidationError(
f"Unexpected Canvas HTTP error (status={e.status_code}): {e}"
@@ -61,6 +59,60 @@ def _handle_canvas_api_error(e: OnyxError) -> NoReturn:
)
class CanvasStage(StrEnum):
PAGES = "pages"
ASSIGNMENTS = "assignments"
ANNOUNCEMENTS = "announcements"
_STAGE_CONFIG: dict[CanvasStage, dict[str, Any]] = {
CanvasStage.PAGES: {
"endpoint": "courses/{course_id}/pages",
"params": {
"per_page": "100",
"include[]": "body",
"published": "true",
"sort": "updated_at",
"order": "desc",
},
},
CanvasStage.ASSIGNMENTS: {
"endpoint": "courses/{course_id}/assignments",
"params": {"per_page": "100", "published": "true"},
},
CanvasStage.ANNOUNCEMENTS: {
"endpoint": "announcements",
"params": {
"per_page": "100",
"context_codes[]": "course_{course_id}",
"active_only": "true",
},
},
}
def _parse_canvas_dt(timestamp_str: str) -> datetime:
"""Parse a Canvas ISO-8601 timestamp (e.g. '2025-06-15T12:00:00Z')
into a timezone-aware UTC datetime.
Canvas returns timestamps with a trailing 'Z' instead of '+00:00',
so we normalise before parsing.
"""
return datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")).astimezone(
timezone.utc
)
def _unix_to_canvas_time(epoch: float) -> str:
"""Convert a Unix timestamp to Canvas ISO-8601 format (e.g. '2025-06-15T12:00:00Z')."""
return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _in_time_window(timestamp_str: str, start: float, end: float) -> bool:
"""Check whether a Canvas ISO-8601 timestamp falls within (start, end]."""
return start < _parse_canvas_dt(timestamp_str).timestamp() <= end
class CanvasCourse(BaseModel):
id: int
name: str | None = None
@@ -145,9 +197,6 @@ class CanvasAnnouncement(BaseModel):
)
CanvasStage: TypeAlias = Literal["pages", "assignments", "announcements"]
class CanvasConnectorCheckpoint(ConnectorCheckpoint):
"""Checkpoint state for resumable Canvas indexing.
@@ -165,15 +214,30 @@ class CanvasConnectorCheckpoint(ConnectorCheckpoint):
course_ids: list[int] = []
current_course_index: int = 0
stage: CanvasStage = "pages"
stage: CanvasStage = CanvasStage.PAGES
next_url: str | None = None
def advance_course(self) -> None:
"""Move to the next course and reset within-course state."""
self.current_course_index += 1
self.stage = "pages"
self.stage = CanvasStage.PAGES
self.next_url = None
def advance_stage(self) -> None:
"""Advance past the current stage.
Moves to the next stage within the same course, or to the next
course if the current stage is the last one. Resets next_url so
the next call starts fresh on the new stage.
"""
self.next_url = None
stages: list[CanvasStage] = list(CanvasStage)
next_idx = stages.index(self.stage) + 1
if next_idx < len(stages):
self.stage = stages[next_idx]
else:
self.advance_course()
class CanvasConnector(
CheckpointedConnectorWithPermSync[CanvasConnectorCheckpoint],
@@ -295,13 +359,7 @@ class CanvasConnector(
if body_text:
text_parts.append(body_text)
doc_updated_at = (
datetime.fromisoformat(page.updated_at.replace("Z", "+00:00")).astimezone(
timezone.utc
)
if page.updated_at
else None
)
doc_updated_at = _parse_canvas_dt(page.updated_at) if page.updated_at else None
document = self._build_document(
doc_id=f"canvas-page-{page.course_id}-{page.page_id}",
@@ -325,17 +383,11 @@ class CanvasConnector(
if desc_text:
text_parts.append(desc_text)
if assignment.due_at:
due_dt = datetime.fromisoformat(
assignment.due_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
due_dt = _parse_canvas_dt(assignment.due_at)
text_parts.append(f"Due: {due_dt.strftime('%B %d, %Y %H:%M UTC')}")
doc_updated_at = (
datetime.fromisoformat(
assignment.updated_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
if assignment.updated_at
else None
_parse_canvas_dt(assignment.updated_at) if assignment.updated_at else None
)
document = self._build_document(
@@ -361,11 +413,7 @@ class CanvasConnector(
text_parts.append(msg_text)
doc_updated_at = (
datetime.fromisoformat(
announcement.posted_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
if announcement.posted_at
else None
_parse_canvas_dt(announcement.posted_at) if announcement.posted_at else None
)
document = self._build_document(
@@ -400,6 +448,314 @@ class CanvasConnector(
self._canvas_client = client
return None
def _fetch_stage_page(
self,
next_url: str | None,
endpoint: str,
params: dict[str, Any],
) -> tuple[list[Any], str | None]:
"""Fetch one page of API results for the current stage.
Returns (items, next_url). All error handling is done by the
caller (_load_from_checkpoint).
"""
if next_url:
# Resuming mid-pagination: the next_url from Canvas's
# Link header already contains endpoint + query params.
response, result_next_url = self.canvas_client.get(full_url=next_url)
else:
# First request for this stage: build from endpoint + params.
response, result_next_url = self.canvas_client.get(
endpoint=endpoint, params=params
)
return response or [], result_next_url
def _process_items(
self,
response: list[Any],
stage: CanvasStage,
course_id: int,
start: float,
end: float,
include_permissions: bool,
) -> tuple[list[Document | ConnectorFailure], bool]:
"""Process a page of API results into documents.
Returns (docs, early_exit). early_exit is True when pages
(sorted desc by updated_at) hit an item older than start,
signaling that pagination should stop.
"""
results: list[Document | ConnectorFailure] = []
early_exit = False
for item in response:
try:
if stage == CanvasStage.PAGES:
page = CanvasPage.from_api(item, course_id=course_id)
if not page.updated_at:
continue
# Pages are sorted by updated_at desc — once we see
# an item at or before `start`, all remaining items
# on this and subsequent pages are older too.
if not _in_time_window(page.updated_at, start, end):
if _parse_canvas_dt(page.updated_at).timestamp() <= start:
early_exit = True
break
# ts > end: page is newer than our window, skip it
continue
doc = self._convert_page_to_document(page)
results.append(
self._maybe_attach_permissions(
doc, course_id, include_permissions
)
)
elif stage == CanvasStage.ASSIGNMENTS:
assignment = CanvasAssignment.from_api(item, course_id=course_id)
if not assignment.updated_at or not _in_time_window(
assignment.updated_at, start, end
):
continue
doc = self._convert_assignment_to_document(assignment)
results.append(
self._maybe_attach_permissions(
doc, course_id, include_permissions
)
)
elif stage == CanvasStage.ANNOUNCEMENTS:
announcement = CanvasAnnouncement.from_api(
item, course_id=course_id
)
if not announcement.posted_at:
logger.debug(
f"Skipping announcement {announcement.id} in "
f"course {course_id}: no posted_at"
)
continue
if not _in_time_window(announcement.posted_at, start, end):
continue
doc = self._convert_announcement_to_document(announcement)
results.append(
self._maybe_attach_permissions(
doc, course_id, include_permissions
)
)
except Exception as e:
item_id = item.get("id") or item.get("page_id", "unknown")
if stage == CanvasStage.PAGES:
doc_link = (
f"{self.canvas_base_url}/courses/{course_id}"
f"/pages/{item.get('url', '')}"
)
else:
doc_link = item.get("html_url", "")
results.append(
ConnectorFailure(
failed_document=DocumentFailure(
document_id=f"canvas-{stage.removesuffix('s')}-{course_id}-{item_id}",
document_link=doc_link,
),
failure_message=f"Failed to process {stage.removesuffix('s')}: {e}",
exception=e,
)
)
return results, early_exit
def _maybe_attach_permissions(
self,
document: Document,
course_id: int,
include_permissions: bool,
) -> Document:
if include_permissions:
document.external_access = self._get_course_permissions(course_id)
return document
def _load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
include_permissions: bool = False,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
"""Shared implementation for load_from_checkpoint and load_from_checkpoint_with_perm_sync."""
new_checkpoint = checkpoint.model_copy(deep=True)
# First call: materialize the list of course IDs.
# On failure, let the exception propagate so the framework fails the
# attempt cleanly. Swallowing errors here would leave the checkpoint
# state unchanged and cause an infinite retry loop.
if not new_checkpoint.course_ids:
try:
courses = self._list_courses()
except OnyxError as e:
if e.status_code in (401, 403):
_handle_canvas_api_error(e) # NoReturn — always raises
raise
new_checkpoint.course_ids = [c.id for c in courses]
logger.info(f"Found {len(courses)} Canvas courses to process")
new_checkpoint.has_more = len(new_checkpoint.course_ids) > 0
return new_checkpoint
# All courses done.
if new_checkpoint.current_course_index >= len(new_checkpoint.course_ids):
new_checkpoint.has_more = False
return new_checkpoint
course_id = new_checkpoint.course_ids[new_checkpoint.current_course_index]
try:
stage = CanvasStage(new_checkpoint.stage)
except ValueError as e:
raise ValueError(
f"Invalid checkpoint stage: {new_checkpoint.stage!r}. "
f"Valid stages: {[s.value for s in CanvasStage]}"
) from e
# Build endpoint + params from the static template.
config = _STAGE_CONFIG[stage]
endpoint = config["endpoint"].format(course_id=course_id)
params = {k: v.format(course_id=course_id) for k, v in config["params"].items()}
# Only the announcements API supports server-side date filtering
# (start_date/end_date). Pages support server-side sorting
# (sort=updated_at desc) enabling early exit, but not date
# filtering. Assignments support neither. Both are filtered
# client-side via _in_time_window after fetching.
if stage == CanvasStage.ANNOUNCEMENTS:
params["start_date"] = _unix_to_canvas_time(start)
params["end_date"] = _unix_to_canvas_time(end)
try:
response, result_next_url = self._fetch_stage_page(
next_url=new_checkpoint.next_url,
endpoint=endpoint,
params=params,
)
except OnyxError as oe:
# Security errors from _parse_next_link (host/scheme
# mismatch on pagination URLs) have no status code override
# and must not be silenced.
is_api_error = oe._status_code_override is not None
if not is_api_error:
raise
if oe.status_code in (401, 403):
_handle_canvas_api_error(oe) # NoReturn — always raises
# 404 means the course itself is gone or inaccessible. The
# other stages on this course will hit the same 404, so skip
# the whole course rather than burning API calls on each stage.
if oe.status_code == 404:
logger.warning(
f"Canvas course {course_id} not found while fetching "
f"{stage} (HTTP 404). Skipping course."
)
yield ConnectorFailure(
failed_entity=EntityFailure(
entity_id=f"canvas-course-{course_id}",
),
failure_message=(f"Canvas course {course_id} not found: {oe}"),
exception=oe,
)
new_checkpoint.advance_course()
else:
logger.warning(
f"Failed to fetch {stage} for course {course_id}: {oe}. "
f"Skipping remainder of this stage."
)
yield ConnectorFailure(
failed_entity=EntityFailure(
entity_id=f"canvas-{stage}-{course_id}",
),
failure_message=(
f"Failed to fetch {stage} for course {course_id}: {oe}"
),
exception=oe,
)
new_checkpoint.advance_stage()
new_checkpoint.has_more = new_checkpoint.current_course_index < len(
new_checkpoint.course_ids
)
return new_checkpoint
except Exception as e:
# Unknown error — skip the stage and try to continue.
logger.warning(
f"Failed to fetch {stage} for course {course_id}: {e}. "
f"Skipping remainder of this stage."
)
yield ConnectorFailure(
failed_entity=EntityFailure(
entity_id=f"canvas-{stage}-{course_id}",
),
failure_message=(
f"Failed to fetch {stage} for course {course_id}: {e}"
),
exception=e,
)
new_checkpoint.advance_stage()
new_checkpoint.has_more = new_checkpoint.current_course_index < len(
new_checkpoint.course_ids
)
return new_checkpoint
# Process fetched items
results, early_exit = self._process_items(
response, stage, course_id, start, end, include_permissions
)
for result in results:
yield result
# If we hit an item older than our window (pages sorted desc),
# skip remaining pagination and advance to the next stage.
if early_exit:
result_next_url = None
# If there are more pages, save the cursor and return
if result_next_url:
new_checkpoint.next_url = result_next_url
else:
# Stage complete — advance to next stage (or next course if last).
new_checkpoint.advance_stage()
new_checkpoint.has_more = new_checkpoint.current_course_index < len(
new_checkpoint.course_ids
)
return new_checkpoint
@override
def load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
return self._load_from_checkpoint(
start, end, checkpoint, include_permissions=False
)
@override
def load_from_checkpoint_with_perm_sync(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
"""Load documents from checkpoint with permission information included."""
return self._load_from_checkpoint(
start, end, checkpoint, include_permissions=True
)
@override
def build_dummy_checkpoint(self) -> CanvasConnectorCheckpoint:
return CanvasConnectorCheckpoint(has_more=True)
@override
def validate_checkpoint_json(
self, checkpoint_json: str
) -> CanvasConnectorCheckpoint:
return CanvasConnectorCheckpoint.model_validate_json(checkpoint_json)
@override
def validate_connector_settings(self) -> None:
"""Validate Canvas connector settings by testing API access."""
@@ -415,38 +771,6 @@ class CanvasConnector(
f"Unexpected error during Canvas settings validation: {exc}"
)
@override
def load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def load_from_checkpoint_with_perm_sync(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def build_dummy_checkpoint(self) -> CanvasConnectorCheckpoint:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def validate_checkpoint_json(
self, checkpoint_json: str
) -> CanvasConnectorCheckpoint:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def retrieve_all_slim_docs_perm_sync(
self,

View File

@@ -95,11 +95,13 @@ class ClickupConnector(LoadConnector, PollConnector):
params["date_updated_lt"] = end
if self.connector_type == "list":
params["list_ids[]"] = self.connector_ids
params["list_ids[]"] = self.connector_ids # ty: ignore[invalid-assignment]
elif self.connector_type == "folder":
params["project_ids[]"] = self.connector_ids
params["project_ids[]"] = ( # ty: ignore[invalid-assignment]
self.connector_ids
)
elif self.connector_type == "space":
params["space_ids[]"] = self.connector_ids
params["space_ids[]"] = self.connector_ids # ty: ignore[invalid-assignment]
url_endpoint = f"/team/{self.team_id}/task"
@@ -171,7 +173,10 @@ class ClickupConnector(LoadConnector, PollConnector):
document.metadata[extra_field] = task[extra_field]
if self.retrieve_task_comments:
document.sections.extend(self._get_task_comments(task["id"]))
document.sections = [
*document.sections,
*self._get_task_comments(task["id"]),
]
doc_batch.append(document)

View File

@@ -6,7 +6,7 @@ from datetime import timezone
from typing import Any
from urllib.parse import quote
from atlassian.errors import ApiError # type: ignore
from atlassian.errors import ApiError
from requests.exceptions import HTTPError
from typing_extensions import override

View File

@@ -26,7 +26,7 @@ from typing import TypeVar
from urllib.parse import quote
import bs4
from atlassian import Confluence # type:ignore
from atlassian import Confluence
from redis import Redis
from requests import HTTPError
@@ -61,6 +61,9 @@ _USER_NOT_FOUND = "Unknown Confluence User"
_USER_ID_TO_DISPLAY_NAME_CACHE: dict[str, str | None] = {}
_USER_EMAIL_CACHE: dict[str, str | None] = {}
_DEFAULT_PAGINATION_LIMIT = 1000
_MINIMUM_PAGINATION_LIMIT = 5
_SERVER_ERROR_CODES = {500, 502, 503, 504}
_CONFLUENCE_SPACES_API_V1 = "rest/api/space"
_CONFLUENCE_SPACES_API_V2 = "wiki/api/v2/spaces"
@@ -569,7 +572,8 @@ class OnyxConfluence:
if not limit:
limit = _DEFAULT_PAGINATION_LIMIT
url_suffix = update_param_in_path(url_suffix, "limit", str(limit))
current_limit = limit
url_suffix = update_param_in_path(url_suffix, "limit", str(current_limit))
while url_suffix:
logger.debug(f"Making confluence call to {url_suffix}")
@@ -609,40 +613,61 @@ class OnyxConfluence:
)
continue
# If we fail due to a 500, try one by one.
# NOTE: this iterative approach only works for server, since cloud uses cursor-based
# pagination
if raw_response.status_code == 500 and not self._is_cloud:
initial_start = get_start_param_from_url(url_suffix)
if initial_start is None:
# can't handle this if we don't have offset-based pagination
raise
if raw_response.status_code in _SERVER_ERROR_CODES:
# Try reducing the page size -- Confluence often times out
# on large result sets (especially Cloud 504s).
if current_limit > _MINIMUM_PAGINATION_LIMIT:
old_limit = current_limit
current_limit = max(
current_limit // 2, _MINIMUM_PAGINATION_LIMIT
)
logger.warning(
f"Confluence returned {raw_response.status_code}. "
f"Reducing limit from {old_limit} to {current_limit} "
f"and retrying."
)
url_suffix = update_param_in_path(
url_suffix, "limit", str(current_limit)
)
continue
# this will just yield the successful items from the batch
new_url_suffix = yield from self._try_one_by_one_for_paginated_url(
url_suffix,
initial_start=initial_start,
limit=limit,
)
# Limit reduction exhausted -- for Server, fall back to
# one-by-one offset pagination as a last resort.
if not self._is_cloud:
initial_start = get_start_param_from_url(url_suffix)
# this will just yield the successful items from the batch
new_url_suffix = (
yield from self._try_one_by_one_for_paginated_url(
url_suffix,
initial_start=initial_start,
limit=current_limit,
)
)
# this means we ran into an empty page
if new_url_suffix is None:
if next_page_callback:
next_page_callback("")
break
# this means we ran into an empty page
if new_url_suffix is None:
if next_page_callback:
next_page_callback("")
break
url_suffix = new_url_suffix
continue
url_suffix = new_url_suffix
continue
else:
logger.exception(
f"Error in confluence call to {url_suffix} \n"
f"Raw Response Text: {raw_response.text} \n"
f"Full Response: {raw_response.__dict__} \n"
f"Error: {e} \n"
f"Error in confluence call to {url_suffix} "
f"after reducing limit to {current_limit}.\n"
f"Raw Response Text: {raw_response.text}\n"
f"Error: {e}\n"
)
raise
logger.exception(
f"Error in confluence call to {url_suffix} \n"
f"Raw Response Text: {raw_response.text} \n"
f"Full Response: {raw_response.__dict__} \n"
f"Error: {e} \n"
)
raise
try:
next_response = raw_response.json()
except Exception as e:
@@ -680,6 +705,10 @@ class OnyxConfluence:
old_url_suffix = url_suffix
updated_start = get_start_param_from_url(old_url_suffix)
url_suffix = cast(str, next_response.get("_links", {}).get("next", ""))
if url_suffix and current_limit != limit:
url_suffix = update_param_in_path(
url_suffix, "limit", str(current_limit)
)
for i, result in enumerate(results):
updated_start += 1
if url_suffix and next_page_callback and i == len(results) - 1:
@@ -942,7 +971,7 @@ class OnyxConfluence:
:return: Returns the user details
"""
from atlassian.errors import ApiPermissionError # type:ignore
from atlassian.errors import ApiPermissionError
url = "rest/api/user/current"
params = {}

View File

@@ -165,7 +165,7 @@ class ConnectorRunner(Generic[CT]):
checkpoint_connector_generator = load_from_checkpoint(
start=self.time_range[0].timestamp(),
end=self.time_range[1].timestamp(),
checkpoint=checkpoint,
checkpoint=checkpoint, # ty: ignore[invalid-argument-type]
)
next_checkpoint: CT | None = None
# this is guaranteed to always run at least once with next_checkpoint being non-None
@@ -174,7 +174,9 @@ class ConnectorRunner(Generic[CT]):
hierarchy_node,
failure,
next_checkpoint,
) in CheckpointOutputWrapper[CT]()(checkpoint_connector_generator):
) in CheckpointOutputWrapper[CT]()(
checkpoint_connector_generator # ty: ignore[invalid-argument-type]
):
if document is not None:
self.doc_batch.append(document)

View File

@@ -83,7 +83,9 @@ class OnyxDBCredentialsProvider(
f"No credential found: credential={self._credential_id}"
)
credential.credential_json = credential_json # type: ignore[assignment]
credential.credential_json = ( # ty: ignore[invalid-assignment]
credential_json
)
db_session.commit()
except Exception:
db_session.rollback()

View File

@@ -0,0 +1,69 @@
import csv
import io
from typing import IO
from onyx.connectors.models import TabularSection
from onyx.file_processing.extract_file_text import file_io_to_text
from onyx.file_processing.extract_file_text import xlsx_sheet_extraction
from onyx.file_processing.file_types import OnyxFileExtensions
from onyx.utils.logger import setup_logger
logger = setup_logger()
def is_tabular_file(file_name: str) -> bool:
lowered = file_name.lower()
return any(lowered.endswith(ext) for ext in OnyxFileExtensions.TABULAR_EXTENSIONS)
def _tsv_to_csv(tsv_text: str) -> str:
"""Re-serialize tab-separated text as CSV so downstream parsers that
assume the default Excel dialect read the columns correctly."""
out = io.StringIO()
csv.writer(out, lineterminator="\n").writerows(
csv.reader(io.StringIO(tsv_text), dialect="excel-tab")
)
return out.getvalue().rstrip("\n")
def tabular_file_to_sections(
file: IO[bytes],
file_name: str,
link: str = "",
) -> list[TabularSection]:
"""Convert a tabular file into one or more TabularSections.
- .xlsx → one TabularSection per non-empty sheet.
- .csv / .tsv → a single TabularSection containing the full decoded
file.
Returns an empty list when the file yields no extractable content.
"""
lowered = file_name.lower()
if lowered.endswith(".xlsx"):
return [
TabularSection(
link=link or file_name,
text=csv_text,
heading=f"{file_name} :: {sheet_title}",
)
for csv_text, sheet_title in xlsx_sheet_extraction(
file, file_name=file_name
)
]
if not lowered.endswith((".csv", ".tsv")):
raise ValueError(f"{file_name!r} is not a tabular file")
try:
text = file_io_to_text(file).strip()
except Exception:
logger.exception(f"Failure decoding {file_name}")
raise
if not text:
return []
if lowered.endswith(".tsv"):
text = _tsv_to_csv(text)
return [TabularSection(link=link or file_name, text=text)]

View File

@@ -53,8 +53,10 @@ def _convert_message_to_document(
if isinstance(message.channel, TextChannel) and (
channel_name := message.channel.name
):
metadata["Channel"] = channel_name
semantic_substring += f" in Channel: #{channel_name}"
metadata["Channel"] = channel_name # ty: ignore[possibly-unresolved-reference]
semantic_substring += (
f" in Channel: #{channel_name}" # ty: ignore[possibly-unresolved-reference]
)
# Single messages dont have a title
title = ""

View File

@@ -2,10 +2,10 @@ from datetime import timezone
from io import BytesIO
from typing import Any
from dropbox import Dropbox # type: ignore[import-untyped]
from dropbox.exceptions import ApiError # type: ignore[import-untyped]
from dropbox import Dropbox
from dropbox.exceptions import ApiError
from dropbox.exceptions import AuthError
from dropbox.files import FileMetadata # type: ignore[import-untyped]
from dropbox.files import FileMetadata
from dropbox.files import FolderMetadata
from onyx.configs.app_configs import INDEX_BATCH_SIZE

View File

@@ -15,6 +15,10 @@ from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
)
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rate_limit_builder
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rl_requests
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
tabular_file_to_sections,
)
from onyx.connectors.drupal_wiki.models import DrupalWikiCheckpoint
from onyx.connectors.drupal_wiki.models import DrupalWikiPage
from onyx.connectors.drupal_wiki.models import DrupalWikiPageResponse
@@ -33,6 +37,7 @@ from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.extract_file_text import get_file_ext
@@ -213,7 +218,7 @@ class DrupalWikiConnector(
attachment: dict[str, Any],
page_id: int,
download_url: str,
) -> tuple[list[TextSection | ImageSection], str | None]:
) -> tuple[list[TextSection | ImageSection | TabularSection], str | None]:
"""
Process a single attachment and return generated sections.
@@ -226,7 +231,7 @@ class DrupalWikiConnector(
Tuple of (sections, error_message). If error_message is not None, the
sections list should be treated as invalid.
"""
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
try:
if not self._validate_attachment_filetype(attachment):
@@ -273,6 +278,25 @@ class DrupalWikiConnector(
return sections, None
# Tabular attachments (xlsx, csv, tsv) — produce
# TabularSections instead of a flat TextSection.
if is_tabular_file(file_name):
try:
sections.extend(
tabular_file_to_sections(
BytesIO(raw_bytes),
file_name=file_name,
link=download_url,
)
)
except Exception:
logger.exception(
f"Failed to extract tabular sections from {file_name}"
)
if not sections:
return [], f"No content extracted from tabular file {file_name}"
return sections, None
image_counter = 0
def _store_embedded_image(image_data: bytes, image_name: str) -> None:
@@ -497,7 +521,7 @@ class DrupalWikiConnector(
page_url = build_drupal_wiki_document_id(self.base_url, page.id)
# Create sections with just the page content
sections: list[TextSection | ImageSection] = [
sections: list[TextSection | ImageSection | TabularSection] = [
TextSection(text=text_content, link=page_url)
]

View File

@@ -2,6 +2,7 @@ import json
import os
from datetime import datetime
from datetime import timezone
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import IO
@@ -12,11 +13,16 @@ from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
process_onyx_metadata,
)
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
tabular_file_to_sections,
)
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.models import Document
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.extract_file_text import get_file_ext
@@ -179,8 +185,32 @@ def _process_file(
link = onyx_metadata.link or link
# Build sections: first the text as a single Section
sections: list[TextSection | ImageSection] = []
if extraction_result.text_content.strip():
sections: list[TextSection | ImageSection | TabularSection] = []
if is_tabular_file(file_name):
# Produce TabularSections
lowered_name = file_name.lower()
if lowered_name.endswith(".xlsx"):
file.seek(0)
tabular_source: IO[bytes] = file
else:
tabular_source = BytesIO(
extraction_result.text_content.encode("utf-8", errors="replace")
)
try:
sections.extend(
tabular_file_to_sections(
file=tabular_source,
file_name=file_name,
link=link or "",
)
)
except Exception as e:
logger.error(f"Failed to process tabular file {file_name}: {e}")
return []
if not sections:
logger.warning(f"No content extracted from tabular file {file_name}")
return []
elif extraction_result.text_content.strip():
logger.debug(f"Creating TextSection for {file_name} with link: {link}")
sections.append(
TextSection(link=link, text=extraction_result.text_content.strip())

View File

@@ -22,6 +22,7 @@ from typing_extensions import override
from onyx.access.models import ExternalAccess
from onyx.configs.app_configs import GITHUB_CONNECTOR_BASE_URL
from onyx.configs.constants import DocumentSource
from onyx.connectors.connector_runner import CheckpointOutputWrapper
from onyx.connectors.connector_runner import ConnectorRunner
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
@@ -35,10 +36,16 @@ from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import ConnectorCheckpoint
from onyx.connectors.interfaces import ConnectorFailure
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import IndexingHeartbeatInterface
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TextSection
from onyx.utils.logger import setup_logger
@@ -427,7 +434,11 @@ def make_cursor_url_callback(
return cursor_url_callback
class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoint]):
class GithubConnector(
CheckpointedConnectorWithPermSync[GithubConnectorCheckpoint],
SlimConnector,
SlimConnectorWithPermSync,
):
def __init__(
self,
repo_owner: str,
@@ -559,6 +570,7 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
start: datetime | None = None,
end: datetime | None = None,
include_permissions: bool = False,
is_slim: bool = False,
) -> Generator[Document | ConnectorFailure, None, GithubConnectorCheckpoint]:
if self.github_client is None:
raise ConnectorMissingCredentialError("GitHub")
@@ -614,36 +626,46 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
for pr in pr_batch:
num_prs += 1
# we iterate backwards in time, so at this point we stop processing prs
if (
start is not None
and pr.updated_at
and pr.updated_at.replace(tzinfo=timezone.utc) < start
):
done_with_prs = True
break
# Skip PRs updated after the end date
if (
end is not None
and pr.updated_at
and pr.updated_at.replace(tzinfo=timezone.utc) > end
):
continue
try:
yield _convert_pr_to_document(
cast(PullRequest, pr), repo_external_access
if is_slim:
yield Document(
id=pr.html_url,
sections=[],
external_access=repo_external_access,
source=DocumentSource.GITHUB,
semantic_identifier="",
metadata={},
)
except Exception as e:
error_msg = f"Error converting PR to document: {e}"
logger.exception(error_msg)
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=str(pr.id), document_link=pr.html_url
),
failure_message=error_msg,
exception=e,
)
continue
else:
# we iterate backwards in time, so at this point we stop processing prs
if (
start is not None
and pr.updated_at
and pr.updated_at.replace(tzinfo=timezone.utc) < start
):
done_with_prs = True
break
# Skip PRs updated after the end date
if (
end is not None
and pr.updated_at
and pr.updated_at.replace(tzinfo=timezone.utc) > end
):
continue
try:
yield _convert_pr_to_document(
cast(PullRequest, pr), repo_external_access
)
except Exception as e:
error_msg = f"Error converting PR to document: {e}"
logger.exception(error_msg)
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=str(pr.id), document_link=pr.html_url
),
failure_message=error_msg,
exception=e,
)
continue
# If we reach this point with a cursor url in the checkpoint, we were using
# the fallback cursor-based pagination strategy. That strategy tries to get all
@@ -689,38 +711,47 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
for issue in issue_batch:
num_issues += 1
issue = cast(Issue, issue)
# we iterate backwards in time, so at this point we stop processing prs
if (
start is not None
and issue.updated_at.replace(tzinfo=timezone.utc) < start
):
done_with_issues = True
break
# Skip PRs updated after the end date
if (
end is not None
and issue.updated_at.replace(tzinfo=timezone.utc) > end
):
continue
if issue.pull_request is not None:
# PRs are handled separately
continue
try:
yield _convert_issue_to_document(issue, repo_external_access)
except Exception as e:
error_msg = f"Error converting issue to document: {e}"
logger.exception(error_msg)
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=str(issue.id),
document_link=issue.html_url,
),
failure_message=error_msg,
exception=e,
if is_slim:
yield Document(
id=issue.html_url,
sections=[],
external_access=repo_external_access,
source=DocumentSource.GITHUB,
semantic_identifier="",
metadata={},
)
continue
else:
# we iterate backwards in time, so at this point we stop processing issues
if (
start is not None
and issue.updated_at.replace(tzinfo=timezone.utc) < start
):
done_with_issues = True
break
# Skip issues updated after the end date
if (
end is not None
and issue.updated_at.replace(tzinfo=timezone.utc) > end
):
continue
try:
yield _convert_issue_to_document(issue, repo_external_access)
except Exception as e:
error_msg = f"Error converting issue to document: {e}"
logger.exception(error_msg)
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=str(issue.id),
document_link=issue.html_url,
),
failure_message=error_msg,
exception=e,
)
continue
logger.info(f"Fetched {num_issues} issues for repo: {repo.name}")
# if we found any issues on the page, and we're not done, return the checkpoint.
@@ -803,6 +834,60 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
start, end, checkpoint, include_permissions=True
)
def _retrieve_slim_docs(
self,
include_permissions: bool,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
"""Iterate all PRs and issues across all configured repos as SlimDocuments.
Drives _fetch_from_github in a checkpoint loop — each call processes one
page and returns an updated checkpoint. CheckpointOutputWrapper handles
draining the generator and extracting the returned checkpoint. Rate
limiting and pagination are handled centrally by _fetch_from_github via
_get_batch_rate_limited.
"""
checkpoint = self.build_dummy_checkpoint()
while checkpoint.has_more:
batch: list[SlimDocument | HierarchyNode] = []
gen = self._fetch_from_github(
checkpoint, include_permissions=include_permissions, is_slim=True
)
wrapper: CheckpointOutputWrapper[GithubConnectorCheckpoint] = (
CheckpointOutputWrapper()
)
for document, _, _, next_checkpoint in wrapper(gen):
if document is not None:
batch.append(
SlimDocument(
id=document.id, external_access=document.external_access
)
)
if next_checkpoint is not None:
checkpoint = next_checkpoint
if batch:
yield batch
if callback and callback.should_stop():
raise RuntimeError("github_slim_docs: Stop signal detected")
@override
def retrieve_all_slim_docs(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
return self._retrieve_slim_docs(include_permissions=False, callback=callback)
@override
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
return self._retrieve_slim_docs(include_permissions=True, callback=callback)
def validate_connector_settings(self) -> None:
if self.github_client is None:
raise ConnectorMissingCredentialError("GitHub credentials not loaded.")

View File

@@ -7,7 +7,7 @@ from typing import Dict
from google.oauth2.credentials import Credentials as OAuthCredentials
from google.oauth2.service_account import Credentials as ServiceAccountCredentials
from googleapiclient.errors import HttpError # type: ignore
from googleapiclient.errors import HttpError
from onyx.access.models import ExternalAccess
from onyx.configs.app_configs import INDEX_BATCH_SIZE
@@ -296,7 +296,9 @@ def _full_thread_from_id(
try:
thread = next(
execute_single_retrieval(
retrieval_function=gmail_service.users().threads().get,
retrieval_function=gmail_service.users() # ty: ignore[unresolved-attribute]
.threads()
.get,
list_key=None,
userId=user_email,
fields=THREAD_FIELDS,
@@ -394,7 +396,7 @@ class GmailConnector(
admin_service = get_admin_service(self.creds, self.primary_admin_email)
emails = []
for user in execute_paginated_retrieval(
retrieval_function=admin_service.users().list,
retrieval_function=admin_service.users().list, # ty: ignore[unresolved-attribute]
list_key="users",
fields=USER_FIELDS,
domain=self.google_domain,
@@ -438,7 +440,9 @@ class GmailConnector(
try:
for thread in execute_paginated_retrieval_with_max_pages(
max_num_pages=PAGES_PER_CHECKPOINT,
retrieval_function=gmail_service.users().threads().list,
retrieval_function=gmail_service.users() # ty: ignore[unresolved-attribute]
.threads()
.list,
list_key="threads",
userId=user_email,
fields=THREAD_LIST_FIELDS,

View File

@@ -110,7 +110,7 @@ class GongConnector(LoadConnector, PollConnector):
# The batch_ids in the previous method appears to be batches of call_ids to process
# In this method, we will retrieve transcripts for them in batches.
transcripts: list[dict[str, Any]] = []
workspace_list = self.workspaces or [None] # type: ignore
workspace_list = self.workspaces or [None]
workspace_map = self._get_workspace_id_map() if self.workspaces else {}
for workspace in workspace_list:

Some files were not shown because too many files have changed in this diff Show More