Compare commits

..

63 Commits

Author SHA1 Message Date
Bo-Onyx
19e4f9b377 fix(pruning): WEB connector pruning timeout by implementing SlimConnector 2026-04-17 18:13:43 -07:00
Raunak Bhagat
146d8522df refactor: replace raw Card+Content message patterns with @opal/components.MessageCard (#10342) 2026-04-17 22:48:45 +00:00
acaprau
ac5bae3631 feat(opensearch): Allow optional disabling of SSL to OpenSearch via env var (#10339) 2026-04-17 22:45:52 +00:00
Jamison Lahman
b5434b2391 chore(lint): run shellcheck in pre-commit (#10043) 2026-04-17 21:21:14 +00:00
aserafin-mtt
28e13b503b Fix: mcp langfuse tracing and Jira adf parsing (#10314) 2026-04-17 14:24:16 -07:00
Jamison Lahman
99a90ec196 chore(devtools): ods web installs node_modules on init (#10046) 2026-04-17 21:03:26 +00:00
Jamison Lahman
8ffd7fbb56 chore(gha): skip ty pre-commit in CI (#10337) 2026-04-17 20:56:09 +00:00
Justin Tahara
f9e88e3c72 feat(mt): Tenant work-gating gate + metrics (3/3) (#10281) 2026-04-17 20:44:58 +00:00
Bo-Onyx
97efdbbbc3 fix(pruning): Fix SharePoint sp_tenant_domain not resolved for client secret auth with site pages (#10326) 2026-04-17 20:39:22 +00:00
Justin Tahara
b91a3aed53 fix(xlsx): Additional fixes for mime types (#10331) 2026-04-17 20:26:14 +00:00
Justin Tahara
51480e1099 fix(metrics): Adding in hostname (#10335) 2026-04-17 20:22:20 +00:00
acaprau
70efbef95e feat(opensearch): Add option to disable Vespa and run Onyx with only OpenSearch (#10330) 2026-04-17 13:27:03 -07:00
Wenxi
f3936e2669 fix: s3 test assertion (#10329) 2026-04-17 20:17:16 +00:00
Wenxi
c933c71b59 feat: add document sets to mcp server options (#10322) 2026-04-17 19:52:35 +00:00
Raunak Bhagat
e0d9e109b5 refactor: Rename /admin/configuration/llm to /admin/configuration/language-models (#10327) 2026-04-17 19:47:50 +00:00
Jamison Lahman
66c361bd37 fix(deps): install transitive vertexai dependency (#10328) 2026-04-17 12:14:46 -07:00
Wenxi
01cbea8c4b fix: zuplip temp dir init (#10324) 2026-04-17 18:37:30 +00:00
Raunak Bhagat
2dc2b0da84 refactor: Update toast notifications look and feel (#10320) 2026-04-17 17:54:56 +00:00
Raunak Bhagat
4b58c9cda6 fix: chat preferences page spacing and layout cleanup (#10317) 2026-04-17 09:42:36 -07:00
Justin Tahara
7eb945f060 fix(web): Sentry Token Check (#10310) 2026-04-17 15:44:24 +00:00
Raunak Bhagat
e29f948f29 refactor: migrate ActionsLayouts to @opal/layouts's CardLayout.Header (#10301) 2026-04-17 15:39:32 +00:00
Jamison Lahman
7a18b896aa fix(fe): LineItem can disable icon stroke (#10289) 2026-04-17 15:21:23 +00:00
Bo-Onyx
53e00c7989 fix(pruning): Google Drive Connecto - Skip redundant folder metadata calls for orphaned folders (#10304) 2026-04-17 06:06:30 +00:00
Raunak Bhagat
50df53727a refactor: migrate EmptyMessage to @opal/components.EmptyMessageCard (#10302) 2026-04-17 03:41:21 +00:00
Raunak Bhagat
e629574580 feat(opal): opalify ExpandableCard (#10276) 2026-04-17 03:03:55 +00:00
Justin Tahara
8d539cdf3f fix(image): Cap Uploaded File Image Count (#10298) 2026-04-17 02:27:10 +00:00
dependabot[bot]
52524cbe57 chore(deps): bump authlib from 1.6.9 to 1.6.11 (#10293)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-16 17:53:49 -07:00
Jamison Lahman
c64def6a9e fix(llm-selector): show each provider instance as its own group (#10292)
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 00:20:00 +00:00
Wenxi
2628fe1b93 fix: gmail datetime parsing on unexpected values (#10290) 2026-04-16 23:48:45 +00:00
Raunak Bhagat
96bf344f9c feat(opal): add LinkButton component (#10275) 2026-04-16 23:23:03 +00:00
dependabot[bot]
b92d3a307d chore(deps): bump mako from 1.2.4 to 1.3.11 in /backend/requirements (#10286)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-16 15:57:29 -07:00
dependabot[bot]
c55207eeba chore(deps): bump pypdf from 6.10.0 to 6.10.2 in /backend/requirements (#10287)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-16 15:56:09 -07:00
Bo-Onyx
2de56cd65f fix(pruning): Add pruning dashboard panels and reorder layout (#10279) 2026-04-16 22:09:33 +00:00
Jamison Lahman
92bc13f920 fix(logos): github logo displays correctly in dark mode (#10269) 2026-04-16 22:03:49 +00:00
Nikolas Garza
3ddcf101bf feat(connectors): convert Gong connector from poll to checkpointed (#10258) 2026-04-16 21:33:44 +00:00
Jamison Lahman
9f764ee55f feat(anthropic): include Opus 4.7 in recommended models (#10273) 2026-04-16 21:26:43 +00:00
Danelegend
4d059b5e0f feat(indexing): Total descriptor chunk emitted during Tabular Indexing (#10259) 2026-04-16 20:49:36 +00:00
Raunak Bhagat
57e78cf4c9 feat(opal): add SvgVector icon (#10274) 2026-04-16 13:52:02 -07:00
Danelegend
48e74ad3ef feat(img): Editing User Uploaded Images (#10264) 2026-04-16 20:39:18 +00:00
Nikolas Garza
ca10520190 chore(ci): bump docker/setup-buildx-action to v4 and docker/build-push-action to v7 (#10270) 2026-04-16 20:36:03 +00:00
Justin Tahara
d128508838 fix(mt): Tighten tenant work-gating writer hooks (2.5/3) (#10268) 2026-04-16 20:05:16 +00:00
Jamison Lahman
f64cd1dd63 chore(playwright): always hide the text under the onyx-logo (#10267) 2026-04-16 19:49:12 +00:00
Jamison Lahman
210d11aa5d chore(devcontainer): pre-seed known_hosts, install build-essentials (#10266) 2026-04-16 12:29:37 -07:00
Justin Tahara
f9458c86ec feat(mt): Writer Hooks for tenant work gating (2/3) (#10246) 2026-04-16 18:17:50 +00:00
Danelegend
369306a0f3 feat(indexing): Sheet descriptor chunk for tabular indexing (#10245) 2026-04-16 18:08:39 +00:00
acaprau
8af6ee9c9b chore(opensearch): Add option to conditionally disable migration task (#10260) 2026-04-16 10:59:44 -07:00
Jamison Lahman
f5f953cc28 chore(python): fix ty warnings (#10265) 2026-04-16 17:52:40 +00:00
Justin Tahara
3f360e462f fix(openpyxl): Adding support for xlsm (#10261) 2026-04-16 17:33:15 +00:00
Nikolas Garza
0602353b2b fix(metrics): move in_progress emission from subprocess to watchdog thread (#10249) 2026-04-16 17:24:12 +00:00
acaprau
78288867b7 chore(search): Search API load test script (#10248) 2026-04-16 10:32:44 -07:00
Jamison Lahman
0e7b99f960 chore(pre-commit): add ty hook (#10263) 2026-04-16 10:27:04 -07:00
Nikolas Garza
3f2d0a0567 fix(helm): increase proxy timeouts to prevent HTTP/2 stream resets (#10247) 2026-04-16 17:20:55 +00:00
Justin Tahara
e0897265e3 feat(celery): Monitoring for Primary Worker (#10257) 2026-04-16 17:14:59 +00:00
dependabot[bot]
bc9c03ab76 chore(deps): bump dompurify from 3.3.2 to 3.4.0 in /widget (#10253)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-16 17:05:56 +00:00
Jamison Lahman
dfc3886683 chore(python): replace mypy with ty (#10256) 2026-04-16 09:32:27 -07:00
dependabot[bot]
a3cb45e56d chore(deps): bump hono from 4.12.12 to 4.12.14 in /backend/onyx/server/features/build/sandbox/kubernetes/docker/templates/outputs/web (#10252)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-15 20:06:35 -07:00
dependabot[bot]
6fd07f44e1 chore(deps): bump langsmith from 0.3.45 to 0.7.31 in /backend/requirements (#10250)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jamison Lahman <jamison@lahman.dev>
2026-04-15 19:59:31 -07:00
Jamison Lahman
2a3b487fad chore(tests): remove defunct test_litellm_embedding (#10244) 2026-04-16 00:11:06 +00:00
Nikolas Garza
a14dc4e632 chore(helm): update Grafana dashboard for new push-based metric names (#10238) 2026-04-16 00:06:23 +00:00
jaffar keikei
b6467e8e3e fix: invert already_existed logic in ingestion API response (#9999) 2026-04-15 23:18:56 +00:00
Nikolas Garza
546da624a1 feat(metrics): add connector_name label to push-based connector metrics (#10237) 2026-04-15 22:58:49 +00:00
Nikolas Garza
1a88dea760 fix(model-server): add missing onyx/configs to Dockerfile for sentry support (#10236) 2026-04-15 22:42:00 +00:00
Justin Tahara
53d2d647c5 fix(deletion): Commit Session in per-doc cleanup (#10193) 2026-04-15 22:37:00 +00:00
487 changed files with 9320 additions and 3726 deletions

View File

@@ -1,6 +1,7 @@
FROM ubuntu:26.04@sha256:cc925e589b7543b910fea57a240468940003fbfc0515245a495dd0ad8fe7cef1
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
curl \
default-jre \
fd-find \
@@ -61,3 +62,11 @@ RUN chsh -s /bin/zsh root && \
echo '[ -f /workspace/.devcontainer/zshrc ] && . /workspace/.devcontainer/zshrc' >> "$rc"; \
done && \
chown dev:dev /home/dev/.zshrc
# Pre-seed GitHub's SSH host keys so git-over-SSH never prompts. Keys are
# pinned in-repo (verified against the fingerprints GitHub publishes at
# https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/githubs-ssh-key-fingerprints)
# rather than fetched at build time, so a compromised build-time network can't
# inject a rogue key.
COPY github_known_hosts /etc/ssh/ssh_known_hosts
RUN chmod 644 /etc/ssh/ssh_known_hosts

View File

@@ -1,7 +1,11 @@
{
"name": "Onyx Dev Sandbox",
"image": "onyxdotapp/onyx-devcontainer@sha256:0f02d9299928849c7b15f3b348dcfdcdcb64411ff7a4580cbc026a6ee7aa1554",
"runArgs": ["--cap-add=NET_ADMIN", "--cap-add=NET_RAW", "--network=onyx_default"],
"image": "onyxdotapp/onyx-devcontainer@sha256:4986c9252289b660ce772b45f0488b938fe425d8114245e96ef64b273b3fcee4",
"runArgs": [
"--cap-add=NET_ADMIN",
"--cap-add=NET_RAW",
"--network=onyx_default"
],
"mounts": [
"source=${localEnv:HOME}/.claude,target=/home/dev/.claude,type=bind",
"source=${localEnv:HOME}/.claude.json,target=/home/dev/.claude.json,type=bind",

View File

@@ -0,0 +1,3 @@
github.com ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCj7ndNxQowgcQnjshcLrqPEiiphnt+VTTvDP6mHBL9j1aNUkY4Ue1gvwnGLVlOhGeYrnZaMgRK6+PKCUXaDbC7qtbW8gIkhL7aGCsOr/C56SJMy/BCZfxd1nWzAOxSDPgVsmerOBYfNqltV9/hWCqBywINIR+5dIg6JTJ72pcEpEjcYgXkE2YEFXV1JHnsKgbLWNlhScqb2UmyRkQyytRLtL+38TGxkxCflmO+5Z8CSSNY7GidjMIZ7Q4zMjA2n1nGrlTDkzwDCsw+wqFPGQA179cnfGWOWRVruj16z6XyvxvjJwbz0wQZ75XK5tKSb7FNyeIEs4TT4jk+S4dhPeAUC5y+bDYirYgM4GC7uEnztnZyaVWQ7B381AK4Qdrwt51ZqExKbQpTUNn+EjqoTwvqNj4kqx5QUCI0ThS/YkOxJCXmPUWZbhjpCg56i+2aB6CmK2JGhn57K5mj0MNdBXA4/WnwH6XoPWJzK5Nyu2zB3nAZp+S5hpQs+p1vN1/wsjk=
github.com ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEmKSENjQEezOmxkZMy7opKgwFB9nkt5YRrYMjNuG5N87uRgg6CLrbo5wAdT/y6v0mKV0U2w0WZ2YB/++Tpockg=
github.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIOMqqnkVzrm0SdG6UOoqKLsabgH5C9okWi0dh2l9GKJl

View File

@@ -45,7 +45,7 @@ if [ "$ACTIVE_HOME" != "$MOUNT_HOME" ]; then
[ -d "$MOUNT_HOME/$item" ] || continue
if [ -e "$ACTIVE_HOME/$item" ] && [ ! -L "$ACTIVE_HOME/$item" ]; then
echo "warning: replacing $ACTIVE_HOME/$item with symlink to $MOUNT_HOME/$item" >&2
rm -rf "$ACTIVE_HOME/$item"
rm -rf "${ACTIVE_HOME:?}/$item"
fi
ln -sfn "$MOUNT_HOME/$item" "$ACTIVE_HOME/$item"
done

View File

@@ -4,6 +4,17 @@ set -euo pipefail
echo "Setting up firewall..."
# Reset default policies to ACCEPT before flushing rules. On re-runs the
# previous invocation's DROP policies are still in effect; flushing rules while
# the default is DROP would block the DNS lookups below. Register a trap so
# that if the script exits before the DROP policies are re-applied at the end,
# we fail closed instead of leaving the container with an unrestricted
# firewall.
trap 'iptables -P INPUT DROP; iptables -P OUTPUT DROP; iptables -P FORWARD DROP' EXIT
iptables -P INPUT ACCEPT
iptables -P OUTPUT ACCEPT
iptables -P FORWARD ACCEPT
# Only flush the filter table. The nat and mangle tables are managed by Docker
# (DNS DNAT to 127.0.0.11, container networking, etc.) and must not be touched —
# flushing them breaks Docker's embedded DNS resolver.
@@ -34,8 +45,16 @@ ALLOWED_DOMAINS=(
"pypi.org"
"files.pythonhosted.org"
"go.dev"
"proxy.golang.org"
"sum.golang.org"
"storage.googleapis.com"
"dl.google.com"
"static.rust-lang.org"
"index.crates.io"
"static.crates.io"
"archive.ubuntu.com"
"security.ubuntu.com"
"deb.nodesource.com"
)
for domain in "${ALLOWED_DOMAINS[@]}"; do

View File

@@ -462,7 +462,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -472,7 +472,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./web
file: ./web/Dockerfile
@@ -536,7 +536,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -546,7 +546,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./web
file: ./web/Dockerfile
@@ -597,7 +597,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -676,7 +676,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -686,7 +686,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./web
file: ./web/Dockerfile
@@ -761,7 +761,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -771,7 +771,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./web
file: ./web/Dockerfile
@@ -833,7 +833,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -908,7 +908,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -918,7 +918,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -981,7 +981,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -991,7 +991,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -1041,7 +1041,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -1119,7 +1119,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -1129,7 +1129,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -1192,7 +1192,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -1202,7 +1202,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -1253,7 +1253,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -1329,7 +1329,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
with:
buildkitd-flags: ${{ vars.DOCKER_DEBUG == 'true' && '--debug' || '' }}
@@ -1341,7 +1341,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
env:
DEBUG: ${{ vars.DOCKER_DEBUG == 'true' && 1 || 0 }}
with:
@@ -1409,7 +1409,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
with:
buildkitd-flags: ${{ vars.DOCKER_DEBUG == 'true' && '--debug' || '' }}
@@ -1421,7 +1421,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
env:
DEBUG: ${{ vars.DOCKER_DEBUG == 'true' && 1 || 0 }}
with:
@@ -1475,7 +1475,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3

View File

@@ -21,7 +21,7 @@ jobs:
timeout-minutes: 45
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3

View File

@@ -21,7 +21,7 @@ jobs:
timeout-minutes: 45
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3

View File

@@ -115,7 +115,7 @@ jobs:
echo "cache-suffix=${CACHE_SUFFIX}" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling Vespa, Redis, Postgres, and Minio images
# otherwise, we hit the "Unauthenticated users" limit
@@ -127,7 +127,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Backend Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -175,7 +175,7 @@ jobs:
echo "cache-suffix=${CACHE_SUFFIX}" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling Vespa, Redis, Postgres, and Minio images
# otherwise, we hit the "Unauthenticated users" limit
@@ -187,7 +187,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Model Server Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile.model_server
@@ -220,7 +220,7 @@ jobs:
persist-credentials: false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling openapitools/openapi-generator-cli
# otherwise, we hit the "Unauthenticated users" limit

View File

@@ -94,7 +94,7 @@ jobs:
echo "cache-suffix=${CACHE_SUFFIX}" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling external images otherwise, we hit the "Unauthenticated users" limit
# https://docs.docker.com/docker-hub/usage/
@@ -105,7 +105,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Web Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./web
file: ./web/Dockerfile
@@ -155,7 +155,7 @@ jobs:
echo "cache-suffix=${CACHE_SUFFIX}" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling external images otherwise, we hit the "Unauthenticated users" limit
# https://docs.docker.com/docker-hub/usage/
@@ -166,7 +166,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Backend Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -216,7 +216,7 @@ jobs:
echo "cache-suffix=${CACHE_SUFFIX}" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling external images otherwise, we hit the "Unauthenticated users" limit
# https://docs.docker.com/docker-hub/usage/
@@ -227,7 +227,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Model Server Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile.model_server

View File

@@ -19,16 +19,16 @@ permissions:
jobs:
mypy-check:
# See https://runs-on.com/runners/linux/
# Note: Mypy seems quite optimized for x64 compared to arm64.
# Similarly, mypy is single-threaded and incremental, so 2cpu is sufficient.
# NOTE: This job is named mypy-check for branch protection compatibility,
# but it actually runs ty (astral-sh's Rust type checker).
runs-on:
[
runs-on,
runner=2cpu-linux-x64,
runner=2cpu-linux-arm64,
"run-id=${{ github.run_id }}-mypy-check",
"extras=s3-cache",
]
timeout-minutes: 45
timeout-minutes: 15
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
@@ -46,26 +46,7 @@ jobs:
backend/requirements/model_server.txt
backend/requirements/ee.txt
- name: Generate OpenAPI schema and Python client
shell: bash
# TODO(Nik): https://linear.app/onyx-app/issue/ENG-1/update-test-infra-to-use-test-license
- name: Run ty
env:
LICENSE_ENFORCEMENT_ENABLED: "false"
run: |
ods openapi all
- name: Cache mypy cache
if: ${{ vars.DISABLE_MYPY_CACHE != 'true' }}
uses: runs-on/cache@a5f51d6f3fece787d03b7b4e981c82538a0654ed # ratchet:runs-on/cache@v4
with:
path: .mypy_cache
key: mypy-${{ runner.os }}-${{ github.base_ref || github.event.merge_group.base_ref || 'main' }}-${{ hashFiles('**/*.py', '**/*.pyi', 'pyproject.toml') }}
restore-keys: |
mypy-${{ runner.os }}-${{ github.base_ref || github.event.merge_group.base_ref || 'main' }}-
mypy-${{ runner.os }}-
- name: Run MyPy
env:
MYPY_FORCE_COLOR: 1
TERM: xterm-256color
run: mypy .
run: ty check --output-format github

View File

@@ -17,8 +17,6 @@ env:
# API keys for testing
COHERE_API_KEY: ${{ secrets.COHERE_API_KEY }}
LITELLM_API_KEY: ${{ secrets.LITELLM_API_KEY }}
LITELLM_API_URL: ${{ secrets.LITELLM_API_URL }}
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
AZURE_API_KEY: ${{ secrets.AZURE_API_KEY }}
AZURE_API_URL: ${{ vars.AZURE_API_URL }}
@@ -71,7 +69,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Build and load
uses: docker/bake-action@82490499d2e5613fcead7e128237ef0b0ea210f7 # ratchet:docker/bake-action@v7.0.0

View File

@@ -39,6 +39,8 @@ jobs:
working-directory: ./web
run: npm ci
- uses: j178/prek-action@cbc2f23eb5539cf20d82d1aabd0d0ecbcc56f4e3
env:
SKIP: ty
with:
prek-version: '0.3.4'
extra-args: ${{ github.event_name == 'pull_request' && format('--from-ref {0} --to-ref {1}', github.event.pull_request.base.sha, github.event.pull_request.head.sha) || github.event_name == 'merge_group' && format('--from-ref {0} --to-ref {1}', github.event.merge_group.base_sha, github.event.merge_group.head_sha) || github.ref_name == 'main' && '--all-files' || '' }}

View File

@@ -132,7 +132,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -142,7 +142,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend/onyx/server/features/build/sandbox/kubernetes/docker
file: ./backend/onyx/server/features/build/sandbox/kubernetes/docker/Dockerfile
@@ -202,7 +202,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -212,7 +212,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend/onyx/server/features/build/sandbox/kubernetes/docker
file: ./backend/onyx/server/features/build/sandbox/kubernetes/docker/Dockerfile
@@ -258,7 +258,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3

View File

@@ -67,12 +67,12 @@ repos:
args: ["--active", "--with=onyx-devtools", "ods", "check-lazy-imports"]
pass_filenames: true
files: ^backend/(?!\.venv/|scripts/).*\.py$
# NOTE: This takes ~6s on a single, large module which is prohibitively slow.
# - id: uv-run
# name: mypy
# args: ["--all-extras", "mypy"]
# pass_filenames: true
# files: ^backend/.*\.py$
- id: uv-run
alias: ty
name: ty
args: ["ty", "check"]
pass_filenames: true
types_or: [python]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: 3e8a8703264a2f4a69428a0aa4dcb512790b2c8c # frozen: v6.0.0
@@ -86,6 +86,17 @@ repos:
hooks:
- id: actionlint
- repo: https://github.com/shellcheck-py/shellcheck-py
rev: 745eface02aef23e168a8afb6b5737818efbea95 # frozen: v0.11.0.1
hooks:
- id: shellcheck
exclude: >-
(?x)^(
backend/scripts/setup_craft_templates\.sh|
deployment/docker_compose/init-letsencrypt\.sh|
deployment/docker_compose/install\.sh
)$
- repo: https://github.com/psf/black
rev: 8a737e727ac5ab2f1d4cf5876720ed276dc8dc4b # frozen: 25.1.0
hooks:
@@ -142,6 +153,7 @@ repos:
hooks:
- id: ripsecrets
args:
- --strict-ignore
- --additional-pattern
- ^sk-[A-Za-z0-9_\-]{20,}$

1
.secretsignore Normal file
View File

@@ -0,0 +1 @@
.devcontainer/github_known_hosts

View File

@@ -63,11 +63,13 @@ Your features must pass all tests and all comments must be addressed prior to me
### Implicit agreements
If we approve an issue, we are promising you the following:
- Your work will receive timely attention and we will put aside other important items to ensure you are not blocked.
- You will receive necessary coaching on eng quality, system design, etc. to ensure the feature is completed well.
- The Onyx team will pull resources and bandwidth from design, PM, and engineering to ensure that you have all the resources to build the feature to the quality required for merging.
Because this is a large investment from our team, we ask that you:
- Thoroughly read all the requirements of the design docs, engineering best practices, and try to minimize overhead for the Onyx team.
- Complete the feature in a timely manner to reduce context switching and an ongoing resource pull from the Onyx team.
@@ -149,10 +151,10 @@ Set up pre-commit hooks (black / reorder-python-imports):
uv run pre-commit install
```
We also use `mypy` for static type checking. Onyx is fully type-annotated, and we want to keep it that way! To run the mypy checks manually:
We also use `ty` for static type checking. Onyx is fully type-annotated, and we want to keep it that way! To run the ty checks manually:
```bash
uv run mypy . # from onyx/backend
uv run ty check
```
#### Frontend
@@ -192,6 +194,7 @@ Before starting, make sure the Docker Daemon is running.
> **Note:** "Clear and Restart External Volumes and Containers" will reset your Postgres and OpenSearch (relational-db and index). Only run this if you are okay with wiping your data.
**Features:**
- Hot reload is enabled for the web server and API servers
- Python debugging is configured with debugpy
- Environment variables are loaded from `.vscode/.env`
@@ -344,13 +347,16 @@ sudo xattr -r -d com.apple.quarantine ~/.cache/pre-commit
### Style and Maintainability
#### Comments and readability
Add clear comments:
- At logical boundaries (e.g., interfaces) so the reader doesn't need to dig 10 layers deeper.
- Wherever assumptions are made or something non-obvious/unexpected is done.
- For complicated flows/functions.
- Wherever it saves time (e.g., nontrivial regex patterns).
#### Errors and exceptions
- **Fail loudly** rather than silently skipping work.
- Example: raise and let exceptions propagate instead of silently dropping a document.
- **Don't overuse `try/except`.**
@@ -358,6 +364,7 @@ Add clear comments:
- Do not mask exceptions unless it is clearly appropriate.
#### Typing
- Everything should be **as strictly typed as possible**.
- Use `cast` for annoying/loose-typed interfaces (e.g., results of `run_functions_tuples_in_parallel`).
- Only `cast` when the type checker sees `Any` or types are too loose.
@@ -368,6 +375,7 @@ Add clear comments:
- `dict[EmbeddingModel, list[EmbeddingVector]]`
#### State, objects, and boundaries
- Keep **clear logical boundaries** for state containers and objects.
- A **config** object should never contain things like a `db_session`.
- Avoid state containers that are overly nested, or huge + flat (use judgment).
@@ -380,6 +388,7 @@ Add clear comments:
- Prefer **hash maps (dicts)** over tree structures unless there's a strong reason.
#### Naming
- Name variables carefully and intentionally.
- Prefer long, explicit names when undecided.
- Avoid single-character variables except for small, self-contained utilities (or not at all).
@@ -390,6 +399,7 @@ Add clear comments:
- IntelliSense can miss call sites; search works best with unique names.
#### Correctness by construction
- Prefer self-contained correctness — don't rely on callers to "use it right" if you can make misuse hard.
- Avoid redundancies: if a function takes an arg, it shouldn't also take a state object that contains that same arg.
- No dead code (unless there's a very good reason).
@@ -417,29 +427,35 @@ Add clear comments:
### Repository Conventions
#### Where code lives
- Pydantic + data models: `models.py` files.
- DB interface functions (excluding lazy loading): `db/` directory.
- LLM prompts: `prompts/` directory, roughly mirroring the code layout that uses them.
- API routes: `server/` directory.
#### Pydantic and modeling
- Prefer **Pydantic** over dataclasses.
- If absolutely required, use `allow_arbitrary_types`.
#### Data conventions
- Prefer explicit `None` over sentinel empty strings (usually; depends on intent).
- Prefer explicit identifiers: use string enums instead of integer codes.
- Avoid magic numbers (co-location is good when necessary). **Always avoid magic strings.**
#### Logging
- Log messages where they are created.
- Don't propagate log messages around just to log them elsewhere.
#### Encapsulation
- Don't use private attributes/methods/properties from other classes/modules.
- "Private" is private — respect that boundary.
#### SQLAlchemy guidance
- Lazy loading is often bad at scale, especially across multiple list relationships.
- Be careful when accessing SQLAlchemy object attributes:
- It can help avoid redundant DB queries,
@@ -448,6 +464,7 @@ Add clear comments:
- Reference: https://www.reddit.com/r/SQLAlchemy/comments/138f248/joinedload_vs_selectinload/
#### Trunk-based development and feature flags
- **PRs should contain no more than 500 lines of real change.**
- **Merge to main frequently.** Avoid long-lived feature branches — they create merge conflicts and integration pain.
- **Use feature flags for incremental rollout.**
@@ -458,6 +475,7 @@ Add clear comments:
- **Test both flag states.** Ensure the codebase works correctly with the flag on and off.
#### Miscellaneous
- Any TODOs you add in the code must be accompanied by either the name/username of the owner of that TODO, or an issue number for an issue referencing that piece of work.
- Avoid module-level logic that runs on import, which leads to import-time side effects. Essentially every piece of meaningful logic should exist within some function that has to be explicitly invoked. Acceptable exceptions may include loading environment variables or setting up loggers.
- If you find yourself needing something like this, you may want that logic to exist in a file dedicated for manual execution (contains `if __name__ == "__main__":`) which should not be imported by anything else.

View File

@@ -12,7 +12,7 @@ founders@onyx.app for more information. Please visit https://github.com/onyx-dot
ARG ENABLE_CRAFT=false
# DO_NOT_TRACK is used to disable telemetry for Unstructured
ENV ONYX_RUNNING_IN_DOCKER="true" \
ENV DANSWER_RUNNING_IN_DOCKER="true" \
DO_NOT_TRACK="true" \
PLAYWRIGHT_BROWSERS_PATH="/app/.cache/ms-playwright"

View File

@@ -1,7 +1,7 @@
# Base stage with dependencies
FROM python:3.11-slim-bookworm@sha256:9c6f90801e6b68e772b7c0ca74260cbf7af9f320acec894e26fccdaccfbe3b47 AS base
ENV ONYX_RUNNING_IN_DOCKER="true" \
ENV DANSWER_RUNNING_IN_DOCKER="true" \
HF_HOME=/app/.cache/huggingface
COPY --from=ghcr.io/astral-sh/uv:0.9.9 /uv /uvx /bin/
@@ -50,6 +50,10 @@ COPY ./onyx/utils/logger.py /app/onyx/utils/logger.py
COPY ./onyx/utils/middleware.py /app/onyx/utils/middleware.py
COPY ./onyx/utils/tenant.py /app/onyx/utils/tenant.py
# Sentry configuration (used when SENTRY_DSN is set)
COPY ./onyx/configs/__init__.py /app/onyx/configs/__init__.py
COPY ./onyx/configs/sentry.py /app/onyx/configs/sentry.py
# Place to fetch version information
COPY ./onyx/__init__.py /app/onyx/__init__.py

View File

@@ -26,7 +26,9 @@ from shared_configs.configs import (
TENANT_ID_PREFIX,
)
from onyx.db.models import Base
from celery.backends.database.session import ResultModelBase # type: ignore
from celery.backends.database.session import ( # ty: ignore[unresolved-import]
ResultModelBase,
)
from onyx.db.engine.sql_engine import SqlEngine
# Make sure in alembic.ini [logger_root] level=INFO is set or most logging will be

View File

@@ -49,7 +49,7 @@ def upgrade() -> None:
"time_updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
server_onupdate=sa.text("now()"), # type: ignore
server_onupdate=sa.text("now()"), # ty: ignore[invalid-argument-type]
nullable=True,
),
sa.Column(

View File

@@ -68,7 +68,7 @@ def upgrade() -> None:
sa.text("SELECT id FROM tool WHERE in_code_tool_id = :in_code_tool_id"),
{"in_code_tool_id": OPEN_URL_TOOL["in_code_tool_id"]},
).fetchone()
tool_id = result[0] # type: ignore
tool_id = result[0] # ty: ignore[not-subscriptable]
# Associate the tool with all existing personas
# Get all persona IDs

View File

@@ -52,7 +52,7 @@ def upgrade() -> None:
sa.Column(
"created_at",
sa.DateTime(),
default=datetime.datetime.utcnow,
default=lambda: datetime.datetime.now(datetime.timezone.utc),
),
sa.Column(
"cc_pair_id",

View File

@@ -10,7 +10,7 @@ from alembic import op
import sqlalchemy as sa
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import cast, Any
from typing import cast
from botocore.exceptions import ClientError
@@ -255,7 +255,7 @@ def _migrate_files_to_external_storage() -> None:
continue
lobj_id = cast(int, file_record.lobj_oid)
file_metadata = cast(Any, file_record.file_metadata)
file_metadata = file_record.file_metadata
# Read file content from PostgreSQL
try:

View File

@@ -112,7 +112,7 @@ def _get_access_for_documents(
access_map[document_id] = DocumentAccess.build(
user_emails=list(non_ee_access.user_emails),
user_groups=user_group_info.get(document_id, []),
is_public=is_public_anywhere,
is_public=is_public_anywhere, # ty: ignore[invalid-argument-type]
external_user_emails=list(ext_u_emails),
external_user_group_ids=list(ext_u_groups),
)

View File

@@ -1,5 +1,6 @@
import os
from datetime import datetime
from datetime import timezone
import jwt
from fastapi import Depends
@@ -58,7 +59,7 @@ def generate_anonymous_user_jwt_token(tenant_id: str) -> str:
payload = {
"tenant_id": tenant_id,
# Token does not expire
"iat": datetime.utcnow(), # Issued at time
"iat": datetime.now(timezone.utc), # Issued at time
}
return jwt.encode(payload, USER_AUTH_SECRET, algorithm="HS256")

View File

@@ -1,8 +1,10 @@
import time
from typing import cast
from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from redis.client import Redis
from redis.lock import Lock as RedisLock
from ee.onyx.server.tenants.product_gating import get_gated_tenants
@@ -16,9 +18,56 @@ from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine.tenant_utils import get_all_tenant_ids
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_tenant_work_gating import cleanup_expired
from onyx.redis.redis_tenant_work_gating import get_active_tenants
from onyx.redis.redis_tenant_work_gating import observe_active_set_size
from onyx.redis.redis_tenant_work_gating import record_full_fanout_cycle
from onyx.redis.redis_tenant_work_gating import record_gate_decision
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST
_FULL_FANOUT_TIMESTAMP_KEY_PREFIX = "tenant_work_gating_last_full_fanout_ms"
def _should_bypass_gate_for_full_fanout(
redis_client: Redis, task_name: str, interval_seconds: int
) -> bool:
"""True if at least `interval_seconds` have elapsed since the last
full-fanout bypass for this task. On True, updates the stored timestamp
atomically-enough (it's a best-effort counter, not a lock)."""
key = f"{_FULL_FANOUT_TIMESTAMP_KEY_PREFIX}:{task_name}"
now_ms = int(time.time() * 1000)
threshold_ms = now_ms - (interval_seconds * 1000)
try:
raw = cast(bytes | None, redis_client.get(key))
except Exception:
task_logger.exception(f"full-fanout timestamp read failed: task={task_name}")
# Fail open: treat as "interval elapsed" so we don't skip every
# tenant during a Redis hiccup.
return True
if raw is None:
# First invocation — bypass so the set seeds cleanly.
elapsed = True
else:
try:
last_ms = int(raw.decode())
elapsed = last_ms <= threshold_ms
except ValueError:
elapsed = True
if elapsed:
try:
redis_client.set(key, str(now_ms))
except Exception:
task_logger.exception(
f"full-fanout timestamp write failed: task={task_name}"
)
return elapsed
@shared_task(
name=OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR,
ignore_result=True,
@@ -32,6 +81,7 @@ def cloud_beat_task_generator(
priority: int = OnyxCeleryPriority.MEDIUM,
expires: int = BEAT_EXPIRES_DEFAULT,
skip_gated: bool = True,
work_gated: bool = False,
) -> bool | None:
"""a lightweight task used to kick off individual beat tasks per tenant."""
time_start = time.monotonic()
@@ -51,8 +101,56 @@ def cloud_beat_task_generator(
tenant_ids: list[str] = []
num_processed_tenants = 0
num_skipped_gated = 0
num_would_skip_work_gate = 0
num_skipped_work_gate = 0
# Tenant-work-gating read path. Resolve once per invocation.
gate_enabled = False
gate_enforce = False
full_fanout_cycle = False
active_tenants: set[str] | None = None
try:
# Gating setup is inside the try block so any exception still
# reaches the finally that releases the beat lock.
if work_gated:
try:
gate_enabled = OnyxRuntime.get_tenant_work_gating_enabled()
gate_enforce = OnyxRuntime.get_tenant_work_gating_enforce()
except Exception:
task_logger.exception("tenant work gating: runtime flag read failed")
gate_enabled = False
if gate_enabled:
redis_failed = False
interval_s = (
OnyxRuntime.get_tenant_work_gating_full_fanout_interval_seconds()
)
full_fanout_cycle = _should_bypass_gate_for_full_fanout(
redis_client, task_name, interval_s
)
if full_fanout_cycle:
record_full_fanout_cycle(task_name)
try:
ttl_s = OnyxRuntime.get_tenant_work_gating_ttl_seconds()
cleanup_expired(ttl_s)
except Exception:
task_logger.exception(
"tenant work gating: cleanup_expired failed"
)
else:
ttl_s = OnyxRuntime.get_tenant_work_gating_ttl_seconds()
active_tenants = get_active_tenants(ttl_s)
if active_tenants is None:
full_fanout_cycle = True
record_full_fanout_cycle(task_name)
redis_failed = True
# Only refresh the gauge when Redis is known-reachable —
# skip the ZCARD if we just failed open due to a Redis error.
if not redis_failed:
observe_active_set_size()
tenant_ids = get_all_tenant_ids()
# Per-task control over whether gated tenants are included. Most periodic tasks
@@ -76,6 +174,21 @@ def cloud_beat_task_generator(
if IGNORED_SYNCING_TENANT_LIST and tenant_id in IGNORED_SYNCING_TENANT_LIST:
continue
# Tenant work gate: if the feature is on, check membership. Skip
# unmarked tenants when enforce=True AND we're not in a full-
# fanout cycle. Always log/emit the shadow counter.
if work_gated and gate_enabled and not full_fanout_cycle:
would_skip = (
active_tenants is not None and tenant_id not in active_tenants
)
if would_skip:
num_would_skip_work_gate += 1
if gate_enforce:
num_skipped_work_gate += 1
record_gate_decision(task_name, skipped=True)
continue
record_gate_decision(task_name, skipped=False)
self.app.send_task(
task_name,
kwargs=dict(
@@ -109,6 +222,12 @@ def cloud_beat_task_generator(
f"task={task_name} "
f"num_processed_tenants={num_processed_tenants} "
f"num_skipped_gated={num_skipped_gated} "
f"num_would_skip_work_gate={num_would_skip_work_gate} "
f"num_skipped_work_gate={num_skipped_work_gate} "
f"full_fanout_cycle={full_fanout_cycle} "
f"work_gated={work_gated} "
f"gate_enabled={gate_enabled} "
f"gate_enforce={gate_enforce} "
f"num_tenants={len(tenant_ids)} "
f"elapsed={time_elapsed:.2f}"
)

View File

@@ -80,6 +80,7 @@ from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyn
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_tenant_work_gating import maybe_mark_tenant_active
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.server.utils import make_short_id
from onyx.utils.logger import doc_permission_sync_ctx
@@ -208,6 +209,11 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str) -> bool | None
if _is_external_doc_permissions_sync_due(cc_pair):
cc_pair_ids_to_sync.append(cc_pair.id)
# Tenant-work-gating hook: refresh this tenant's active-set membership
# whenever doc-permission sync has any due cc_pairs to dispatch.
if cc_pair_ids_to_sync:
maybe_mark_tenant_active(tenant_id)
lock_beat.reacquire()
for cc_pair_id in cc_pair_ids_to_sync:
payload_id = try_creating_permissions_sync_task(

View File

@@ -69,6 +69,7 @@ from onyx.redis.redis_connector_ext_group_sync import (
)
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_tenant_work_gating import maybe_mark_tenant_active
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.server.utils import make_short_id
from onyx.utils.logger import format_error_for_logging
@@ -202,6 +203,11 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str) -> bool | None:
if _is_external_group_sync_due(cc_pair):
cc_pair_ids_to_sync.append(cc_pair.id)
# Tenant-work-gating hook: refresh this tenant's active-set membership
# whenever external-group sync has any due cc_pairs to dispatch.
if cc_pair_ids_to_sync:
maybe_mark_tenant_active(tenant_id)
lock_beat.reacquire()
for cc_pair_id in cc_pair_ids_to_sync:
payload_id = try_creating_external_group_sync_task(

View File

@@ -53,7 +53,7 @@ def fetch_query_analytics(
.order_by(cast(ChatMessage.time_sent, Date))
)
return db_session.execute(stmt).all() # type: ignore
return db_session.execute(stmt).all() # ty: ignore[invalid-return-type]
def fetch_per_user_query_analytics(
@@ -92,7 +92,7 @@ def fetch_per_user_query_analytics(
.order_by(cast(ChatMessage.time_sent, Date), ChatSession.user_id)
)
return db_session.execute(stmt).all() # type: ignore
return db_session.execute(stmt).all() # ty: ignore[invalid-return-type]
def fetch_onyxbot_analytics(

View File

@@ -9,7 +9,7 @@ logger = setup_logger()
def fetch_sources_with_connectors(db_session: Session) -> list[DocumentSource]:
sources = db_session.query(distinct(Connector.source)).all() # type: ignore
sources = db_session.query(distinct(Connector.source)).all()
document_sources = [source[0] for source in sources]

View File

@@ -128,9 +128,9 @@ def get_used_seats(tenant_id: str | None = None) -> int:
select(func.count())
.select_from(User)
.where(
User.is_active == True, # type: ignore # noqa: E712
User.is_active == True, # noqa: E712
User.role != UserRole.EXT_PERM_USER,
User.email != ANONYMOUS_USER_EMAIL, # type: ignore
User.email != ANONYMOUS_USER_EMAIL,
User.account_type != AccountType.SERVICE_ACCOUNT,
)
)

View File

@@ -121,7 +121,7 @@ class ScimDAL(DAL):
"""Update the last_used_at timestamp for a token."""
token = self._session.get(ScimToken, token_id)
if token:
token.last_used_at = func.now() # type: ignore[assignment]
token.last_used_at = func.now()
# ------------------------------------------------------------------
# User mapping operations
@@ -229,7 +229,7 @@ class ScimDAL(DAL):
def get_user(self, user_id: UUID) -> User | None:
"""Fetch a user by ID."""
return self._session.scalar(
select(User).where(User.id == user_id) # type: ignore[arg-type]
select(User).where(User.id == user_id) # ty: ignore[invalid-argument-type]
)
def get_user_by_email(self, email: str) -> User | None:
@@ -293,16 +293,22 @@ class ScimDAL(DAL):
if attr == "username":
# arg-type: fastapi-users types User.email as str, not a column expression
# assignment: union return type widens but query is still Select[tuple[User]]
query = _apply_scim_string_op(query, User.email, scim_filter) # type: ignore[arg-type, assignment]
query = _apply_scim_string_op(
query, User.email, scim_filter # ty: ignore[invalid-argument-type]
)
elif attr == "active":
query = query.where(
User.is_active.is_(scim_filter.value.lower() == "true") # type: ignore[attr-defined]
User.is_active.is_( # ty: ignore[unresolved-attribute]
scim_filter.value.lower() == "true"
)
)
elif attr == "externalid":
mapping = self.get_user_mapping_by_external_id(scim_filter.value)
if not mapping:
return [], 0
query = query.where(User.id == mapping.user_id) # type: ignore[arg-type]
query = query.where(
User.id == mapping.user_id # ty: ignore[invalid-argument-type]
)
else:
raise ValueError(
f"Unsupported filter attribute: {scim_filter.attribute}"
@@ -318,7 +324,9 @@ class ScimDAL(DAL):
offset = max(start_index - 1, 0)
users = list(
self._session.scalars(
query.order_by(User.id).offset(offset).limit(count) # type: ignore[arg-type]
query.order_by(User.id) # ty: ignore[invalid-argument-type]
.offset(offset)
.limit(count)
)
.unique()
.all()
@@ -577,7 +585,7 @@ class ScimDAL(DAL):
attr = scim_filter.attribute.lower()
if attr == "displayname":
# assignment: union return type widens but query is still Select[tuple[UserGroup]]
query = _apply_scim_string_op(query, UserGroup.name, scim_filter) # type: ignore[assignment]
query = _apply_scim_string_op(query, UserGroup.name, scim_filter)
elif attr == "externalid":
mapping = self.get_group_mapping_by_external_id(scim_filter.value)
if not mapping:
@@ -615,7 +623,9 @@ class ScimDAL(DAL):
users = (
self._session.scalars(
select(User).where(User.id.in_(user_ids)) # type: ignore[attr-defined]
select(User).where(
User.id.in_(user_ids) # ty: ignore[unresolved-attribute]
)
)
.unique()
.all()
@@ -640,7 +650,9 @@ class ScimDAL(DAL):
return []
existing_users = (
self._session.scalars(
select(User).where(User.id.in_(uuids)) # type: ignore[attr-defined]
select(User).where(
User.id.in_(uuids) # ty: ignore[unresolved-attribute]
)
)
.unique()
.all()

View File

@@ -300,8 +300,11 @@ def fetch_user_groups_for_user(
stmt = (
select(UserGroup)
.join(User__UserGroup, User__UserGroup.user_group_id == UserGroup.id)
.join(User, User.id == User__UserGroup.user_id) # type: ignore
.where(User.id == user_id) # type: ignore
.join(
User,
User.id == User__UserGroup.user_id, # ty: ignore[invalid-argument-type]
)
.where(User.id == user_id) # ty: ignore[invalid-argument-type]
)
if only_curator_groups:
stmt = stmt.where(User__UserGroup.is_curator == True) # noqa: E712
@@ -430,7 +433,7 @@ def fetch_user_groups_for_documents(
.group_by(Document.id)
)
return db_session.execute(stmt).all() # type: ignore
return db_session.execute(stmt).all() # ty: ignore[invalid-return-type]
def _check_user_group_is_modifiable(user_group: UserGroup) -> None:
@@ -804,7 +807,9 @@ def update_user_group(
db_user_group.is_up_to_date = False
removed_users = db_session.scalars(
select(User).where(User.id.in_(removed_user_ids)) # type: ignore
select(User).where(
User.id.in_(removed_user_ids) # ty: ignore[unresolved-attribute]
)
).unique()
# Filter out admin and global curator users before validating curator status

View File

@@ -1,6 +1,6 @@
from collections.abc import Iterator
from googleapiclient.discovery import Resource # type: ignore
from googleapiclient.discovery import Resource
from ee.onyx.external_permissions.google_drive.models import GoogleDrivePermission
from ee.onyx.external_permissions.google_drive.permission_retrieval import (
@@ -38,7 +38,7 @@ def get_folder_permissions_by_ids(
A list of permissions matching the provided permission IDs
"""
return get_permissions_by_ids(
drive_service=service,
drive_service=service, # ty: ignore[invalid-argument-type]
doc_id=folder_id,
permission_ids=permission_ids,
)
@@ -68,7 +68,7 @@ def get_modified_folders(
# Retrieve and yield folders
for folder in execute_paginated_retrieval(
retrieval_function=service.files().list,
retrieval_function=service.files().list, # ty: ignore[unresolved-attribute]
list_key="files",
continue_on_404_or_403=True,
corpora="allDrives",

View File

@@ -1,6 +1,6 @@
from collections.abc import Generator
from googleapiclient.errors import HttpError # type: ignore
from googleapiclient.errors import HttpError
from pydantic import BaseModel
from ee.onyx.db.external_perm import ExternalUserGroup
@@ -183,7 +183,7 @@ def _get_drive_members(
)
admin_user_info = (
admin_service.users()
admin_service.users() # ty: ignore[unresolved-attribute]
.get(userKey=google_drive_connector.primary_admin_email)
.execute()
)
@@ -197,7 +197,7 @@ def _get_drive_members(
try:
for permission in execute_paginated_retrieval(
drive_service.permissions().list,
drive_service.permissions().list, # ty: ignore[unresolved-attribute]
list_key="permissions",
fileId=drive_id,
fields="permissions(emailAddress, type),nextPageToken",
@@ -256,7 +256,7 @@ def _get_all_google_groups(
"""
group_emails: set[str] = set()
for group in execute_paginated_retrieval(
admin_service.groups().list,
admin_service.groups().list, # ty: ignore[unresolved-attribute]
list_key="groups",
domain=google_domain,
fields="groups(email),nextPageToken",
@@ -274,7 +274,7 @@ def _google_group_to_onyx_group(
"""
group_member_emails: set[str] = set()
for member in execute_paginated_retrieval(
admin_service.members().list,
admin_service.members().list, # ty: ignore[unresolved-attribute]
list_key="members",
groupKey=group_email,
fields="members(email),nextPageToken",
@@ -298,7 +298,7 @@ def _map_group_email_to_member_emails(
for group_email in group_emails:
group_member_emails: set[str] = set()
for member in execute_paginated_retrieval(
admin_service.members().list,
admin_service.members().list, # ty: ignore[unresolved-attribute]
list_key="members",
groupKey=group_email,
fields="members(email),nextPageToken",

View File

@@ -33,7 +33,7 @@ def get_permissions_by_ids(
# Fetch all permissions for the document
fetched_permissions = execute_paginated_retrieval(
retrieval_function=drive_service.permissions().list,
retrieval_function=drive_service.permissions().list, # ty: ignore[unresolved-attribute]
list_key="permissions",
fileId=doc_id,
fields="permissions(id, emailAddress, type, domain, allowFileDiscovery, permissionDetails),nextPageToken",

View File

@@ -68,7 +68,7 @@ def _build_holder_map(permissions: list[dict]) -> dict[str, list[Holder]]:
logger.warning(f"Expected a 'raw' field, but none was found: {raw_perm=}")
continue
permission = Permission(**raw_perm.raw)
permission = Permission(**raw_perm.raw) # ty: ignore[invalid-argument-type]
# We only care about ability to browse through projects + issues (not other permissions such as read/write).
if permission.permission != "BROWSE_PROJECTS":

View File

@@ -1,6 +1,6 @@
from collections.abc import Generator
from office365.sharepoint.client_context import ClientContext # type: ignore[import-untyped]
from office365.sharepoint.client_context import ClientContext
from ee.onyx.db.external_perm import ExternalUserGroup
from ee.onyx.external_permissions.sharepoint.permission_utils import (

View File

@@ -7,11 +7,11 @@ from typing import Any
from urllib.parse import urlparse
import requests as _requests
from office365.graph_client import GraphClient # type: ignore[import-untyped]
from office365.onedrive.driveitems.driveItem import DriveItem # type: ignore[import-untyped]
from office365.runtime.client_request import ClientRequestException # type: ignore
from office365.sharepoint.client_context import ClientContext # type: ignore[import-untyped]
from office365.sharepoint.permissions.securable_object import RoleAssignmentCollection # type: ignore[import-untyped]
from office365.graph_client import GraphClient
from office365.onedrive.driveitems.driveItem import DriveItem
from office365.runtime.client_request import ClientRequestException
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.permissions.securable_object import RoleAssignmentCollection
from pydantic import BaseModel
from ee.onyx.db.external_perm import ExternalUserGroup

View File

@@ -46,9 +46,10 @@ def get_query_analytics(
daily_query_usage_info = fetch_query_analytics(
start=start
or (
datetime.datetime.utcnow() - datetime.timedelta(days=_DEFAULT_LOOKBACK_DAYS)
datetime.datetime.now(tz=datetime.timezone.utc)
- datetime.timedelta(days=_DEFAULT_LOOKBACK_DAYS)
), # default is 30d lookback
end=end or datetime.datetime.utcnow(),
end=end or datetime.datetime.now(tz=datetime.timezone.utc),
db_session=db_session,
)
return [
@@ -77,9 +78,10 @@ def get_user_analytics(
daily_query_usage_info_per_user = fetch_per_user_query_analytics(
start=start
or (
datetime.datetime.utcnow() - datetime.timedelta(days=_DEFAULT_LOOKBACK_DAYS)
datetime.datetime.now(tz=datetime.timezone.utc)
- datetime.timedelta(days=_DEFAULT_LOOKBACK_DAYS)
), # default is 30d lookback
end=end or datetime.datetime.utcnow(),
end=end or datetime.datetime.now(tz=datetime.timezone.utc),
db_session=db_session,
)
@@ -111,9 +113,10 @@ def get_onyxbot_analytics(
daily_onyxbot_info = fetch_onyxbot_analytics(
start=start
or (
datetime.datetime.utcnow() - datetime.timedelta(days=_DEFAULT_LOOKBACK_DAYS)
datetime.datetime.now(tz=datetime.timezone.utc)
- datetime.timedelta(days=_DEFAULT_LOOKBACK_DAYS)
), # default is 30d lookback
end=end or datetime.datetime.utcnow(),
end=end or datetime.datetime.now(tz=datetime.timezone.utc),
db_session=db_session,
)
@@ -146,9 +149,10 @@ def get_persona_messages(
) -> list[PersonaMessageAnalyticsResponse]:
"""Fetch daily message counts for a single persona within the given time range."""
start = start or (
datetime.datetime.utcnow() - datetime.timedelta(days=_DEFAULT_LOOKBACK_DAYS)
datetime.datetime.now(tz=datetime.timezone.utc)
- datetime.timedelta(days=_DEFAULT_LOOKBACK_DAYS)
)
end = end or datetime.datetime.utcnow()
end = end or datetime.datetime.now(tz=datetime.timezone.utc)
persona_message_counts = []
for count, date in fetch_persona_message_analytics(
@@ -226,9 +230,10 @@ def get_assistant_stats(
along with the overall total messages and total distinct users.
"""
start = start or (
datetime.datetime.utcnow() - datetime.timedelta(days=_DEFAULT_LOOKBACK_DAYS)
datetime.datetime.now(tz=datetime.timezone.utc)
- datetime.timedelta(days=_DEFAULT_LOOKBACK_DAYS)
)
end = end or datetime.datetime.utcnow()
end = end or datetime.datetime.now(tz=datetime.timezone.utc)
if not user_can_view_assistant_stats(db_session, user, assistant_id):
raise HTTPException(

View File

@@ -287,8 +287,10 @@ def update_hook(
validated_is_reachable: bool | None = None
if endpoint_url_changing or api_key_changing or timeout_changing:
existing = _get_hook_or_404(db_session, hook_id)
effective_url: str = (
req.endpoint_url if endpoint_url_changing else existing.endpoint_url # type: ignore[assignment] # endpoint_url is required on create and cannot be cleared on update
effective_url: str = ( # ty: ignore[invalid-assignment]
req.endpoint_url
if endpoint_url_changing
else existing.endpoint_url # endpoint_url is required on create and cannot be cleared on update
)
effective_api_key: str | None = (
(api_key if not isinstance(api_key, UnsetType) else None)
@@ -299,8 +301,10 @@ def update_hook(
else None
)
)
effective_timeout: float = (
req.timeout_seconds if timeout_changing else existing.timeout_seconds # type: ignore[assignment] # req.timeout_seconds is non-None when timeout_changing (validated by HookUpdateRequest)
effective_timeout: float = ( # ty: ignore[invalid-assignment]
req.timeout_seconds
if timeout_changing
else existing.timeout_seconds # req.timeout_seconds is non-None when timeout_changing (validated by HookUpdateRequest)
)
validation = _validate_endpoint(
endpoint_url=effective_url,

View File

@@ -97,7 +97,7 @@ def fetch_and_process_chat_session_history(
break
paged_snapshots = parallel_yield(
[
[ # ty: ignore[invalid-argument-type]
yield_snapshot_from_chat_session(
db_session=db_session,
chat_session=chat_session,

View File

@@ -1,5 +1,6 @@
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import jwt
from fastapi import HTTPException
@@ -19,8 +20,8 @@ def generate_data_plane_token() -> str:
payload = {
"iss": "data_plane",
"exp": datetime.utcnow() + timedelta(minutes=5),
"iat": datetime.utcnow(),
"exp": datetime.now(tz=timezone.utc) + timedelta(minutes=5),
"iat": datetime.now(tz=timezone.utc),
"scope": "api_access",
}

View File

@@ -55,8 +55,10 @@ def run_alembic_migrations(schema_name: str) -> None:
alembic_cfg.attributes["configure_logger"] = False
# Mimic command-line options by adding 'cmd_opts' to the config
alembic_cfg.cmd_opts = SimpleNamespace() # type: ignore
alembic_cfg.cmd_opts.x = [f"schemas={schema_name}"] # type: ignore
alembic_cfg.cmd_opts = SimpleNamespace() # ty: ignore[invalid-assignment]
alembic_cfg.cmd_opts.x = [ # ty: ignore[invalid-assignment]
f"schemas={schema_name}"
]
# Run migrations programmatically
command.upgrade(alembic_cfg, "head")

View File

@@ -349,8 +349,9 @@ def get_tenant_count(tenant_id: str) -> int:
user_count = (
db_session.query(User)
.filter(
User.email.in_(emails), # type: ignore
User.is_active == True, # type: ignore # noqa: E712
User.email.in_(emails), # ty: ignore[unresolved-attribute]
User.is_active # noqa: E712 # ty: ignore[invalid-argument-type]
== True,
)
.count()
)

View File

@@ -73,7 +73,7 @@ def capture_and_sync_with_alternate_posthog(
cloud_props.pop("onyx_cloud_user_id", None)
posthog.identify(
distinct_id=cloud_user_id,
distinct_id=cloud_user_id, # ty: ignore[possibly-unresolved-reference]
properties=cloud_props,
)
except Exception as e:
@@ -105,7 +105,7 @@ def get_anon_id_from_request(request: Any) -> str | None:
if (cookie_value := request.cookies.get(cookie_name)) and (
parsed := parse_posthog_cookie(cookie_value)
):
return parsed.get("distinct_id")
return parsed.get("distinct_id") # ty: ignore[possibly-unresolved-reference]
return None

View File

@@ -23,7 +23,7 @@
# from shared_configs.model_server_models import IntentResponse
# if TYPE_CHECKING:
# from setfit import SetFitModel # type: ignore[import-untyped]
# from setfit import SetFitModel
# from transformers import PreTrainedTokenizer, BatchEncoding
@@ -423,7 +423,7 @@
# def map_keywords(
# input_ids: torch.Tensor, tokenizer: "PreTrainedTokenizer", is_keyword: list[bool]
# ) -> list[str]:
# tokens = tokenizer.convert_ids_to_tokens(input_ids) # type: ignore
# tokens = tokenizer.convert_ids_to_tokens(input_ids)
# if not len(tokens) == len(is_keyword):
# raise ValueError("Length of tokens and keyword predictions must match")

View File

@@ -18,7 +18,7 @@
# super().__init__()
# config = DistilBertConfig()
# self.distilbert = DistilBertModel(config)
# config = self.distilbert.config # type: ignore
# config = self.distilbert.config
# # Keyword tokenwise binary classification layer
# self.keyword_classifier = nn.Linear(config.dim, 2)
@@ -85,7 +85,7 @@
# self.config = config
# self.distilbert = DistilBertModel(config)
# config = self.distilbert.config # type: ignore
# config = self.distilbert.config
# self.connector_global_classifier = nn.Linear(config.dim, 1)
# self.connector_match_classifier = nn.Linear(config.dim, 1)
# self.tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

View File

@@ -7,8 +7,8 @@ from email.mime.text import MIMEText
from email.utils import formatdate
from email.utils import make_msgid
import sendgrid # type: ignore
from sendgrid.helpers.mail import Attachment # type: ignore
import sendgrid
from sendgrid.helpers.mail import Attachment
from sendgrid.helpers.mail import Content
from sendgrid.helpers.mail import ContentId
from sendgrid.helpers.mail import Disposition

View File

@@ -10,7 +10,7 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from jwt import decode as jwt_decode
from jwt import InvalidTokenError
from jwt import PyJWTError
from jwt.algorithms import RSAAlgorithm
from jwt.algorithms import RSAAlgorithm # ty: ignore[possibly-missing-import]
from onyx.configs.app_configs import JWT_PUBLIC_KEY_URL
from onyx.utils.logger import setup_logger

View File

@@ -46,8 +46,10 @@ async def _test_expire_oauth_token(
updated_data: Dict[str, Any] = {"expires_at": new_expires_at}
await user_manager.user_db.update_oauth_account(
user, cast(Any, oauth_account), updated_data
await user_manager.user_db.update_oauth_account( # ty: ignore[invalid-argument-type]
user, # ty: ignore[invalid-argument-type]
cast(Any, oauth_account),
updated_data,
)
return True
@@ -132,8 +134,10 @@ async def refresh_oauth_token(
)
# Update the OAuth account
await user_manager.user_db.update_oauth_account(
user, cast(Any, oauth_account), updated_data
await user_manager.user_db.update_oauth_account( # ty: ignore[invalid-argument-type]
user, # ty: ignore[invalid-argument-type]
cast(Any, oauth_account),
updated_data,
)
logger.info(f"Successfully refreshed OAuth token for {user.email}")

View File

@@ -191,7 +191,7 @@ class OAuthTokenManager:
@staticmethod
def _unwrap_sensitive_str(value: SensitiveValue[str] | str) -> str:
if isinstance(value, SensitiveValue):
return value.get_value(apply_mask=False)
return value.get_value(apply_mask=False) # ty: ignore[invalid-return-type]
return value
@staticmethod
@@ -199,5 +199,7 @@ class OAuthTokenManager:
token_data: SensitiveValue[dict[str, Any]] | dict[str, Any],
) -> dict[str, Any]:
if isinstance(token_data, SensitiveValue):
return token_data.get_value(apply_mask=False)
return token_data.get_value( # ty: ignore[invalid-return-type]
apply_mask=False
)
return token_data

View File

@@ -121,5 +121,7 @@ def require_permission(
return user
dependency._is_require_permission = True # type: ignore[attr-defined] # sentinel for auth_check detection
dependency._is_require_permission = ( # ty: ignore[unresolved-attribute]
True # sentinel for auth_check detection
)
return dependency

View File

@@ -45,7 +45,9 @@ from fastapi_users import UUIDIDMixin
from fastapi_users.authentication import AuthenticationBackend
from fastapi_users.authentication import CookieTransport
from fastapi_users.authentication import JWTStrategy
from fastapi_users.authentication import RedisStrategy
from fastapi_users.authentication import (
RedisStrategy, # ty: ignore[possibly-missing-import]
)
from fastapi_users.authentication import Strategy
from fastapi_users.authentication.strategy.db import AccessTokenDatabase
from fastapi_users.authentication.strategy.db import DatabaseStrategy
@@ -462,14 +464,16 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
self.user_db = tenant_user_db
if hasattr(user_create, "role"):
user_create.role = UserRole.BASIC
user_create.role = UserRole.BASIC # ty: ignore[invalid-assignment]
user_count = await get_user_count()
if (
user_count == 0
or user_create.email in get_default_admin_user_emails()
):
user_create.role = UserRole.ADMIN
user_create.role = ( # ty: ignore[invalid-assignment]
UserRole.ADMIN
)
# Check seat availability for new users (single-tenant only)
with get_session_with_current_tenant() as sync_db:
@@ -516,7 +520,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# Expire so the async session re-fetches the row updated by
# the sync session above.
self.user_db.session.expire(user)
user = await self.user_db.get(user_id) # type: ignore[assignment]
user = await self.user_db.get( # ty: ignore[invalid-assignment]
user_id
)
except exceptions.UserAlreadyExists:
user = await self.get_by_email(user_create.email)
@@ -544,7 +550,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# Expire so the async session re-fetches the row updated by
# the sync session above.
self.user_db.session.expire(user)
user = await self.user_db.get(user_id) # type: ignore[assignment]
user = await self.user_db.get( # ty: ignore[invalid-assignment]
user_id
)
if user_created:
await self._assign_default_pinned_assistants(user, db_session)
remove_user_from_invited_users(user_create.email)
@@ -592,7 +600,11 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
update nor the group assignment is visible without the other.
"""
with get_session_with_current_tenant() as sync_db:
sync_user = sync_db.query(User).filter(User.id == user_id).first() # type: ignore[arg-type]
sync_user = (
sync_db.query(User)
.filter(User.id == user_id) # ty: ignore[invalid-argument-type]
.first()
)
if sync_user:
sync_user.hashed_password = self.password_helper.hash(
user_create.password
@@ -613,7 +625,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
user_id,
)
async def validate_password(self, password: str, _: schemas.UC | models.UP) -> None:
async def validate_password( # ty: ignore[invalid-method-override]
self, password: str, _: schemas.UC | models.UP
) -> None:
# Validate password according to configurable security policy (defined via environment variables)
if len(password) < PASSWORD_MIN_LENGTH:
raise exceptions.InvalidPasswordException(
@@ -644,7 +658,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
return
@log_function_time(print_only=True)
async def oauth_callback(
async def oauth_callback( # ty: ignore[invalid-method-override]
self,
oauth_name: str,
access_token: str,
@@ -754,7 +768,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
user,
# NOTE: OAuthAccount DOES implement the OAuthAccountProtocol
# but the type checker doesn't know that :(
existing_oauth_account, # type: ignore
existing_oauth_account, # ty: ignore[invalid-argument-type]
oauth_account_dict,
)
@@ -788,7 +802,11 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# transaction so neither change is visible without the other.
was_inactive = not user.is_active
with get_session_with_current_tenant() as sync_db:
sync_user = sync_db.query(User).filter(User.id == user.id).first() # type: ignore[arg-type]
sync_user = (
sync_db.query(User)
.filter(User.id == user.id) # ty: ignore[invalid-argument-type]
.first()
)
if sync_user:
sync_user.is_verified = is_verified_by_default
sync_user.role = UserRole.BASIC
@@ -808,7 +826,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# otherwise, the oidc expiry will always be old, and the user will never be able to login
if user.oidc_expiry is not None and not TRACK_EXTERNAL_IDP_EXPIRY:
await self.user_db.update(user, {"oidc_expiry": None})
user.oidc_expiry = None # type: ignore
user.oidc_expiry = None # ty: ignore[invalid-assignment]
remove_user_from_invited_users(user.email)
if token:
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
@@ -925,7 +943,11 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
and (marketing_cookie_value := request.cookies.get(marketing_cookie_name))
and (parsed_cookie := parse_posthog_cookie(marketing_cookie_value))
):
marketing_anonymous_id = parsed_cookie["distinct_id"]
marketing_anonymous_id = (
parsed_cookie[ # ty: ignore[possibly-unresolved-reference]
"distinct_id"
]
)
# Technically, USER_SIGNED_UP is only fired from the cloud site when
# it is the first user in a tenant. However, it is semantically correct
@@ -942,7 +964,10 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
}
# Add all other values from the marketing cookie (featureFlags, etc.)
for key, value in parsed_cookie.items():
for (
key,
value,
) in parsed_cookie.items(): # ty: ignore[possibly-unresolved-reference]
if key != "distinct_id":
properties.setdefault(key, value)
@@ -1504,7 +1529,7 @@ async def _sync_jwt_oidc_expiry(
if user.oidc_expiry is not None:
await user_manager.user_db.update(user, {"oidc_expiry": None})
user.oidc_expiry = None # type: ignore
user.oidc_expiry = None # ty: ignore[invalid-assignment]
async def _get_or_create_user_from_jwt(
@@ -2232,7 +2257,7 @@ def get_oauth_router(
# Proceed to authenticate or create the user
try:
user = await user_manager.oauth_callback(
user = await user_manager.oauth_callback( # ty: ignore[invalid-argument-type]
oauth_client.name,
token["access_token"],
account_id,

View File

@@ -6,16 +6,16 @@ from typing import Any
from typing import cast
import sentry_sdk
from celery import bootsteps # type: ignore
from celery import bootsteps # ty: ignore[unresolved-import]
from celery import Task
from celery.app import trace
from celery.app import trace # ty: ignore[unresolved-import]
from celery.exceptions import WorkerShutdown
from celery.signals import before_task_publish
from celery.signals import task_postrun
from celery.signals import task_prerun
from celery.states import READY_STATES
from celery.utils.log import get_task_logger
from celery.worker import strategy # type: ignore
from celery.worker import strategy # ty: ignore[unresolved-import]
from redis.lock import Lock as RedisLock
from sentry_sdk.integrations.celery import CeleryIntegration
from sqlalchemy import text
@@ -30,6 +30,7 @@ from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_PREFI
from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_TASKSET_KEY
from onyx.configs.app_configs import DISABLE_VECTOR_DB
from onyx.configs.app_configs import ENABLE_OPENSEARCH_INDEXING_FOR_ONYX
from onyx.configs.app_configs import ONYX_DISABLE_VESPA
from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine.sql_engine import get_sqlalchemy_engine
@@ -531,23 +532,26 @@ def reset_tenant_id(
CURRENT_TENANT_ID_CONTEXTVAR.set(POSTGRES_DEFAULT_SCHEMA)
def wait_for_vespa_or_shutdown(
sender: Any, # noqa: ARG001
**kwargs: Any, # noqa: ARG001
) -> None: # noqa: ARG001
"""Waits for Vespa to become ready subject to a timeout.
Raises WorkerShutdown if the timeout is reached."""
def wait_for_document_index_or_shutdown() -> None:
"""
Waits for all configured document indices to become ready subject to a
timeout.
Raises WorkerShutdown if the timeout is reached.
"""
if DISABLE_VECTOR_DB:
logger.info(
"DISABLE_VECTOR_DB is set — skipping Vespa/OpenSearch readiness check."
)
return
if not wait_for_vespa_with_timeout():
msg = "[Vespa] Readiness probe did not succeed within the timeout. Exiting..."
logger.error(msg)
raise WorkerShutdown(msg)
if not ONYX_DISABLE_VESPA:
if not wait_for_vespa_with_timeout():
msg = (
"[Vespa] Readiness probe did not succeed within the timeout. Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)
if ENABLE_OPENSEARCH_INDEXING_FOR_ONYX:
if not wait_for_opensearch_with_timeout():

View File

@@ -3,7 +3,7 @@ from typing import Any
from celery import Celery
from celery import signals
from celery.beat import PersistentScheduler # type: ignore
from celery.beat import PersistentScheduler # ty: ignore[unresolved-import]
from celery.signals import beat_init
from celery.utils.log import get_task_logger

View File

@@ -4,4 +4,4 @@ import onyx.background.celery.apps.app_base as app_base
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.client")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]

View File

@@ -29,7 +29,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.docfetching")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -100,12 +100,12 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_DOCFETCHING_APP_NAME)
pool_size = cast(int, sender.concurrency) # type: ignore
pool_size = cast(int, sender.concurrency) # ty: ignore[unresolved-attribute]
SqlEngine.init_engine(pool_size=pool_size, max_overflow=8)
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa_or_shutdown(sender, **kwargs)
app_base.wait_for_document_index_or_shutdown()
# Less startup checks in multi-tenant case
if MULTI_TENANT:

View File

@@ -30,7 +30,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.docprocessing")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -106,12 +106,12 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
# "SSL connection has been closed unexpectedly"
# actually setting the spawn method in the cloud fixes 95% of these.
# setting pre ping might help even more, but not worrying about that yet
pool_size = cast(int, sender.concurrency) # type: ignore
pool_size = cast(int, sender.concurrency) # ty: ignore[unresolved-attribute]
SqlEngine.init_engine(pool_size=pool_size, max_overflow=8)
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa_or_shutdown(sender, **kwargs)
app_base.wait_for_document_index_or_shutdown()
# Less startup checks in multi-tenant case
if MULTI_TENANT:

View File

@@ -27,7 +27,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.heavy")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -92,12 +92,12 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
pool_size = cast(int, sender.concurrency) # type: ignore
pool_size = cast(int, sender.concurrency) # ty: ignore[unresolved-attribute]
SqlEngine.init_engine(pool_size=pool_size, max_overflow=8)
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa_or_shutdown(sender, **kwargs)
app_base.wait_for_document_index_or_shutdown()
# Less startup checks in multi-tenant case
if MULTI_TENANT:

View File

@@ -29,7 +29,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.light")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -95,23 +95,30 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
logger.info(f"Concurrency: {sender.concurrency}") # type: ignore
logger.info(
f"Concurrency: {sender.concurrency}" # ty: ignore[unresolved-attribute]
)
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=EXTRA_CONCURRENCY) # type: ignore
SqlEngine.init_engine(
pool_size=sender.concurrency, # ty: ignore[unresolved-attribute]
max_overflow=EXTRA_CONCURRENCY,
)
if MANAGED_VESPA:
httpx_init_vespa_pool(
sender.concurrency + EXTRA_CONCURRENCY, # type: ignore
sender.concurrency + EXTRA_CONCURRENCY, # ty: ignore[unresolved-attribute]
ssl_cert=VESPA_CLOUD_CERT_PATH,
ssl_key=VESPA_CLOUD_KEY_PATH,
)
else:
httpx_init_vespa_pool(sender.concurrency + EXTRA_CONCURRENCY) # type: ignore
httpx_init_vespa_pool(
sender.concurrency + EXTRA_CONCURRENCY # ty: ignore[unresolved-attribute]
)
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa_or_shutdown(sender, **kwargs)
app_base.wait_for_document_index_or_shutdown()
# Less startup checks in multi-tenant case
if MULTI_TENANT:

View File

@@ -20,7 +20,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.monitoring")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect

View File

@@ -3,7 +3,7 @@ import os
from typing import Any
from typing import cast
from celery import bootsteps # type: ignore
from celery import bootsteps # ty: ignore[unresolved-import]
from celery import Celery
from celery import signals
from celery import Task
@@ -38,6 +38,12 @@ from onyx.redis.redis_connector_stop import RedisConnectorStop
from onyx.redis.redis_document_set import RedisDocumentSet
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_usergroup import RedisUserGroup
from onyx.server.metrics.celery_task_metrics import on_celery_task_postrun
from onyx.server.metrics.celery_task_metrics import on_celery_task_prerun
from onyx.server.metrics.celery_task_metrics import on_celery_task_rejected
from onyx.server.metrics.celery_task_metrics import on_celery_task_retry
from onyx.server.metrics.celery_task_metrics import on_celery_task_revoked
from onyx.server.metrics.metrics_server import start_metrics_server
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
@@ -46,7 +52,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.primary")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -59,6 +65,7 @@ def on_task_prerun(
**kwds: Any,
) -> None:
app_base.on_task_prerun(sender, task_id, task, args, kwargs, **kwds)
on_celery_task_prerun(task_id, task)
@signals.task_postrun.connect
@@ -73,6 +80,31 @@ def on_task_postrun(
**kwds: Any,
) -> None:
app_base.on_task_postrun(sender, task_id, task, args, kwargs, retval, state, **kwds)
on_celery_task_postrun(task_id, task, state)
@signals.task_retry.connect
def on_task_retry(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
task_id = getattr(getattr(sender, "request", None), "id", None)
on_celery_task_retry(task_id, sender)
@signals.task_revoked.connect
def on_task_revoked(sender: Any | None = None, **kwargs: Any) -> None:
task_name = getattr(sender, "name", None) or str(sender)
on_celery_task_revoked(kwargs.get("task_id"), task_name)
@signals.task_rejected.connect
def on_task_rejected(sender: Any | None = None, **kwargs: Any) -> None: # noqa: ARG001
message = kwargs.get("message")
task_name: str | None = None
if message is not None:
headers = getattr(message, "headers", None) or {}
task_name = headers.get("task")
if task_name is None:
task_name = "unknown"
on_celery_task_rejected(None, task_name)
@celeryd_init.connect
@@ -85,14 +117,14 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME)
pool_size = cast(int, sender.concurrency) # type: ignore
pool_size = cast(int, sender.concurrency) # ty: ignore[unresolved-attribute]
SqlEngine.init_engine(
pool_size=pool_size, max_overflow=CELERY_WORKER_PRIMARY_POOL_OVERFLOW
)
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa_or_shutdown(sender, **kwargs)
app_base.wait_for_document_index_or_shutdown()
logger.info(f"Running as the primary celery worker: pid={os.getpid()}")
@@ -145,7 +177,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
raise WorkerShutdown("Primary worker lock could not be acquired!")
# tacking on our own user data to the sender
sender.primary_worker_lock = lock # type: ignore
sender.primary_worker_lock = lock # ty: ignore[unresolved-attribute]
# As currently designed, when this worker starts as "primary", we reinitialize redis
# to a clean state (for our purposes, anyway)
@@ -212,6 +244,7 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
start_metrics_server("primary")
app_base.on_worker_ready(sender, **kwargs)

View File

@@ -22,7 +22,7 @@ logger = setup_logger()
celery_app = Celery(__name__)
celery_app.config_from_object("onyx.background.celery.configs.user_file_processing")
celery_app.Task = app_base.TenantAwareTask # type: ignore [misc]
celery_app.Task = app_base.TenantAwareTask # ty: ignore[invalid-assignment]
@signals.task_prerun.connect
@@ -66,12 +66,12 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None:
# "SSL connection has been closed unexpectedly"
# actually setting the spawn method in the cloud fixes 95% of these.
# setting pre ping might help even more, but not worrying about that yet
pool_size = cast(int, sender.concurrency) # type: ignore
pool_size = cast(int, sender.concurrency) # ty: ignore[unresolved-attribute]
SqlEngine.init_engine(pool_size=pool_size, max_overflow=8)
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa_or_shutdown(sender, **kwargs)
app_base.wait_for_document_index_or_shutdown()
# Less startup checks in multi-tenant case
if MULTI_TENANT:

View File

@@ -179,7 +179,7 @@ def celery_inspect_get_workers(name_filter: str | None, app: Celery) -> list[str
# filter for and create an indexing specific inspect object
inspect = app.control.inspect()
workers: dict[str, Any] = inspect.ping() # type: ignore
workers: dict[str, Any] = inspect.ping() # ty: ignore[invalid-assignment]
if workers:
for worker_name in list(workers.keys()):
# if the name filter not set, return all worker names
@@ -208,7 +208,9 @@ def celery_inspect_get_reserved(worker_names: list[str], app: Celery) -> set[str
inspect = app.control.inspect(destination=worker_names)
# get the list of reserved tasks
reserved_tasks: dict[str, list] | None = inspect.reserved() # type: ignore
reserved_tasks: dict[str, list] | None = ( # ty: ignore[invalid-assignment]
inspect.reserved()
)
if reserved_tasks:
for _, task_list in reserved_tasks.items():
for task in task_list:
@@ -229,7 +231,9 @@ def celery_inspect_get_active(worker_names: list[str], app: Celery) -> set[str]:
inspect = app.control.inspect(destination=worker_names)
# get the list of reserved tasks
active_tasks: dict[str, list] | None = inspect.active() # type: ignore
active_tasks: dict[str, list] | None = ( # ty: ignore[invalid-assignment]
inspect.active()
)
if active_tasks:
for _, task_list in active_tasks.items():
for task in task_list:

View File

@@ -5,8 +5,8 @@ from logging.handlers import RotatingFileHandler
import psutil
from onyx.utils.logger import is_running_in_container
from onyx.utils.logger import setup_logger
from onyx.utils.platform import is_running_in_container
# Regular application logger
logger = setup_logger()

View File

@@ -6,9 +6,11 @@ from celery.schedules import crontab
from onyx.configs.app_configs import AUTO_LLM_CONFIG_URL
from onyx.configs.app_configs import AUTO_LLM_UPDATE_INTERVAL_SECONDS
from onyx.configs.app_configs import DISABLE_OPENSEARCH_MIGRATION_TASK
from onyx.configs.app_configs import DISABLE_VECTOR_DB
from onyx.configs.app_configs import ENABLE_OPENSEARCH_INDEXING_FOR_ONYX
from onyx.configs.app_configs import ENTERPRISE_EDITION_ENABLED
from onyx.configs.app_configs import ONYX_DISABLE_VESPA
from onyx.configs.app_configs import SCHEDULED_EVAL_DATASET_NAMES
from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX
from onyx.configs.constants import OnyxCeleryPriority
@@ -66,6 +68,7 @@ beat_task_templates: list[dict] = [
"options": {
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
"work_gated": True,
},
},
{
@@ -99,6 +102,7 @@ beat_task_templates: list[dict] = [
"expires": BEAT_EXPIRES_DEFAULT,
# Gated tenants may still have connectors awaiting deletion.
"skip_gated": False,
"work_gated": True,
},
},
{
@@ -108,6 +112,7 @@ beat_task_templates: list[dict] = [
"options": {
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
"work_gated": True,
},
},
{
@@ -117,6 +122,7 @@ beat_task_templates: list[dict] = [
"options": {
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
"work_gated": True,
},
},
{
@@ -154,6 +160,7 @@ beat_task_templates: list[dict] = [
"priority": OnyxCeleryPriority.LOW,
"expires": BEAT_EXPIRES_DEFAULT,
"queue": OnyxCeleryQueues.SANDBOX,
"work_gated": True,
},
},
{
@@ -178,6 +185,7 @@ if ENTERPRISE_EDITION_ENABLED:
"options": {
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
"work_gated": True,
},
},
{
@@ -187,6 +195,7 @@ if ENTERPRISE_EDITION_ENABLED:
"options": {
"priority": OnyxCeleryPriority.MEDIUM,
"expires": BEAT_EXPIRES_DEFAULT,
"work_gated": True,
},
},
]
@@ -226,7 +235,11 @@ if SCHEDULED_EVAL_DATASET_NAMES:
)
# Add OpenSearch migration task if enabled.
if ENABLE_OPENSEARCH_INDEXING_FOR_ONYX:
if (
ENABLE_OPENSEARCH_INDEXING_FOR_ONYX
and not DISABLE_OPENSEARCH_MIGRATION_TASK
and not ONYX_DISABLE_VESPA
):
beat_task_templates.append(
{
"name": "migrate-chunks-from-vespa-to-opensearch",
@@ -279,7 +292,7 @@ def make_cloud_generator_task(task: dict[str, Any]) -> dict[str, Any]:
cloud_task["kwargs"] = {}
cloud_task["kwargs"]["task_name"] = task["task"]
optional_fields = ["queue", "priority", "expires", "skip_gated"]
optional_fields = ["queue", "priority", "expires", "skip_gated", "work_gated"]
for field in optional_fields:
if field in task["options"]:
cloud_task["kwargs"][field] = task["options"][field]
@@ -372,12 +385,14 @@ if not MULTI_TENANT:
]
)
# `skip_gated` is a cloud-only hint consumed by `cloud_beat_task_generator`. Strip
# it before extending the self-hosted schedule so it doesn't leak into apply_async
# as an unrecognised option on every fired task message.
# `skip_gated` and `work_gated` are cloud-only hints consumed by
# `cloud_beat_task_generator`. Strip them before extending the self-hosted
# schedule so they don't leak into apply_async as unrecognised options on
# every fired task message.
for _template in beat_task_templates:
_self_hosted_template = copy.deepcopy(_template)
_self_hosted_template["options"].pop("skip_gated", None)
_self_hosted_template["options"].pop("work_gated", None)
tasks_to_schedule.append(_self_hosted_template)

View File

@@ -59,6 +59,7 @@ from onyx.redis.redis_connector_delete import RedisConnectorDelete
from onyx.redis.redis_connector_delete import RedisConnectorDeletePayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_tenant_work_gating import maybe_mark_tenant_active
from onyx.server.metrics.deletion_metrics import inc_deletion_blocked
from onyx.server.metrics.deletion_metrics import inc_deletion_completed
from onyx.server.metrics.deletion_metrics import inc_deletion_fence_reset
@@ -165,12 +166,22 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str) -> bool | N
r.set(OnyxRedisSignals.BLOCK_VALIDATE_CONNECTOR_DELETION_FENCES, 1, ex=300)
# collect cc_pair_ids
# collect cc_pair_ids and note whether any are in DELETING status
cc_pair_ids: list[int] = []
has_deleting_cc_pair = False
with get_session_with_current_tenant() as db_session:
cc_pairs = get_connector_credential_pairs(db_session)
for cc_pair in cc_pairs:
cc_pair_ids.append(cc_pair.id)
if cc_pair.status == ConnectorCredentialPairStatus.DELETING:
has_deleting_cc_pair = True
# Tenant-work-gating hook: mark only when at least one cc_pair is in
# DELETING status. Marking on bare cc_pair existence would keep
# nearly every tenant in the active set since most have cc_pairs
# but almost none are actively being deleted on any given cycle.
if has_deleting_cc_pair:
maybe_mark_tenant_active(tenant_id)
# try running cleanup on the cc_pair_ids
for cc_pair_id in cc_pair_ids:

View File

@@ -34,6 +34,7 @@ from onyx.db.index_attempt import mark_attempt_canceled
from onyx.db.index_attempt import mark_attempt_failed
from onyx.db.indexing_coordination import IndexingCoordination
from onyx.redis.redis_connector import RedisConnector
from onyx.server.metrics.connector_health_metrics import on_index_attempt_status_change
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import global_version
from shared_configs.configs import SENTRY_DSN
@@ -470,6 +471,15 @@ def docfetching_proxy_task(
index_attempt.connector_credential_pair.connector.source.value
)
cc_pair = index_attempt.connector_credential_pair
on_index_attempt_status_change(
tenant_id=tenant_id,
source=result.connector_source,
cc_pair_id=cc_pair_id,
connector_name=cc_pair.connector.name or f"cc_pair_{cc_pair_id}",
status="in_progress",
)
while True:
sleep(5)

View File

@@ -108,6 +108,7 @@ from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.redis.redis_tenant_work_gating import maybe_mark_tenant_active
from onyx.redis.redis_utils import is_fence
from onyx.server.metrics.connector_health_metrics import on_connector_error_state_change
from onyx.server.metrics.connector_health_metrics import on_connector_indexing_success
@@ -537,10 +538,12 @@ def check_indexing_completion(
)
source = cc_pair.connector.source.value
connector_name = cc_pair.connector.name or f"cc_pair_{cc_pair.id}"
on_index_attempt_status_change(
tenant_id=tenant_id,
source=source,
cc_pair_id=cc_pair.id,
connector_name=connector_name,
status=attempt.status.value,
)
@@ -568,6 +571,7 @@ def check_indexing_completion(
tenant_id=tenant_id,
source=source,
cc_pair_id=cc_pair.id,
connector_name=connector_name,
docs_indexed=attempt.new_docs_indexed or 0,
success_timestamp=attempt.time_updated.timestamp(),
)
@@ -595,6 +599,7 @@ def check_indexing_completion(
tenant_id=tenant_id,
source=source,
cc_pair_id=cc_pair.id,
connector_name=connector_name,
in_error=False,
)
@@ -806,7 +811,7 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
# we need to use celery's redis client to access its redis data
# (which lives on a different db number)
# redis_client_celery: Redis = self.app.broker_connection().channel().client # type: ignore
# redis_client_celery: Redis = self.app.broker_connection().channel().client
lock_beat: RedisLock = redis_client.lock(
OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK,
@@ -920,10 +925,14 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
cc_pair_id=cc_pair_id,
in_repeated_error_state=True,
)
error_connector_name = (
cc_pair.connector.name or f"cc_pair_{cc_pair.id}"
)
on_connector_error_state_change(
tenant_id=tenant_id,
source=cc_pair.connector.source.value,
cc_pair_id=cc_pair_id,
connector_name=error_connector_name,
in_error=True,
)
@@ -1005,6 +1014,14 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
f"Skipping secondary indexing: switchover_type=INSTANT for search_settings={secondary_search_settings.id}"
)
# Tenant-work-gating hook: refresh membership only when indexing
# actually dispatched at least one docfetching task. `_kickoff_indexing_tasks`
# internally calls `should_index()` to decide per-cc_pair; using
# `tasks_created > 0` here gives us a "real work was done" signal
# rather than just "tenant has a cc_pair somewhere."
if tasks_created > 0:
maybe_mark_tenant_active(tenant_id)
# 2/3: VALIDATE
# Check for inconsistent index attempts - active attempts without task IDs
# This can happen if attempt creation fails partway through

View File

@@ -42,7 +42,7 @@ from onyx.db.models import UserGroup
from onyx.db.search_settings import get_active_search_settings_list
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.utils.platform import is_running_in_container
from onyx.utils.logger import is_running_in_container
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType
from shared_configs.configs import MULTI_TENANT

View File

@@ -72,6 +72,7 @@ from onyx.redis.redis_hierarchy import get_source_node_id_from_cache
from onyx.redis.redis_hierarchy import HierarchyNodeCacheEntry
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_tenant_work_gating import maybe_mark_tenant_active
from onyx.server.metrics.pruning_metrics import observe_pruning_diff_duration
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.server.utils import make_short_id
@@ -228,6 +229,7 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
for cc_pair_entry in cc_pairs:
cc_pair_ids.append(cc_pair_entry.id)
prune_dispatched = False
for cc_pair_id in cc_pair_ids:
lock_beat.reacquire()
with get_session_with_current_tenant() as db_session:
@@ -250,9 +252,18 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
logger.info(f"Pruning not created: {cc_pair_id}")
continue
prune_dispatched = True
task_logger.info(
f"Pruning queued: cc_pair={cc_pair.id} id={payload_id}"
)
# Tenant-work-gating hook: mark only when at least one cc_pair
# was actually due for pruning AND a prune task was dispatched.
# Marking on bare cc_pair existence over-counts the population
# since most tenants have cc_pairs but almost none are due on
# any given cycle.
if prune_dispatched:
maybe_mark_tenant_active(tenant_id)
r.set(OnyxRedisSignals.BLOCK_PRUNING, 1, ex=_get_pruning_block_expiration())
# we want to run this less frequently than the overall task

View File

@@ -248,6 +248,7 @@ def document_by_cc_pair_cleanup_task(
),
)
mark_document_as_modified(document_id, db_session)
db_session.commit()
completion_status = (
OnyxCeleryTaskCompletionStatus.NON_RETRYABLE_EXCEPTION
)

View File

@@ -15,6 +15,7 @@ from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.db.document import construct_document_id_select_by_needs_sync
from onyx.db.document import count_documents_by_needs_sync
from onyx.redis.redis_tenant_work_gating import maybe_mark_tenant_active
from onyx.utils.logger import setup_logger
# Redis keys for document sync tracking
@@ -150,6 +151,10 @@ def try_generate_stale_document_sync_tasks(
logger.info("No stale documents found. Skipping sync tasks generation.")
return None
# Tenant-work-gating hook: refresh this tenant's active-set membership
# whenever vespa sync actually has stale docs to dispatch.
maybe_mark_tenant_active(tenant_id)
logger.info(
f"Stale documents found (at least {stale_doc_count}). Generating sync tasks in one batch."
)

View File

@@ -61,7 +61,9 @@ def load_checkpoint(
checkpoint_io = file_store.read_file(checkpoint_pointer, mode="rb")
checkpoint_data = checkpoint_io.read().decode("utf-8")
if isinstance(connector, CheckpointedConnector):
return connector.validate_checkpoint_json(checkpoint_data)
return connector.validate_checkpoint_json( # ty: ignore[invalid-return-type]
checkpoint_data
)
return ConnectorCheckpoint.model_validate_json(checkpoint_data)

View File

@@ -69,7 +69,6 @@ from onyx.redis.redis_pool import get_redis_client
from onyx.server.features.build.indexing.persistent_document_writer import (
get_persistent_document_writer,
)
from onyx.server.metrics.connector_health_metrics import on_index_attempt_status_change
from onyx.utils.logger import setup_logger
from onyx.utils.middleware import make_randomized_onyx_request_id
from onyx.utils.postgres_sanitization import sanitize_document_for_postgres
@@ -269,13 +268,6 @@ def run_docfetching_entrypoint(
)
credential_id = attempt.connector_credential_pair.credential_id
on_index_attempt_status_change(
tenant_id=tenant_id,
source=attempt.connector_credential_pair.connector.source.value,
cc_pair_id=connector_credential_pair_id,
status="in_progress",
)
logger.info(
f"Docfetching starting{tenant_str}: "
f"connector='{connector_name}' "

View File

@@ -1164,7 +1164,10 @@ def run_llm_loop(
emitter.emit(
Packet(
placement=Placement(turn_index=llm_cycle_count + reasoning_cycles),
placement=Placement(
turn_index=llm_cycle_count # ty: ignore[possibly-unresolved-reference]
+ reasoning_cycles
),
obj=OverallStop(type="stop"),
)
)

View File

@@ -826,6 +826,12 @@ def translate_history_to_llm_format(
base64_data = img_file.to_base64()
image_url = f"data:{image_type};base64,{base64_data}"
content_parts.append(
TextContentPart(
type="text",
text=f"[attached image — file_id: {img_file.file_id}]",
)
)
image_part = ImageContentPart(
type="image_url",
image_url=ImageUrlDetail(

View File

@@ -282,6 +282,7 @@ OPENSEARCH_ADMIN_USERNAME = os.environ.get("OPENSEARCH_ADMIN_USERNAME", "admin")
OPENSEARCH_ADMIN_PASSWORD = os.environ.get(
"OPENSEARCH_ADMIN_PASSWORD", "StrongPassword123!"
)
OPENSEARCH_USE_SSL = os.environ.get("OPENSEARCH_USE_SSL", "true").lower() == "true"
USING_AWS_MANAGED_OPENSEARCH = (
os.environ.get("USING_AWS_MANAGED_OPENSEARCH", "").lower() == "true"
)
@@ -324,6 +325,10 @@ ENABLE_OPENSEARCH_RETRIEVAL_FOR_ONYX = (
ENABLE_OPENSEARCH_INDEXING_FOR_ONYX
and os.environ.get("ENABLE_OPENSEARCH_RETRIEVAL_FOR_ONYX", "").lower() == "true"
)
DISABLE_OPENSEARCH_MIGRATION_TASK = (
os.environ.get("DISABLE_OPENSEARCH_MIGRATION_TASK", "").lower() == "true"
)
ONYX_DISABLE_VESPA = os.environ.get("ONYX_DISABLE_VESPA", "").lower() == "true"
# Whether we should check for and create an index if necessary every time we
# instantiate an OpenSearchDocumentIndex on multitenant cloud. Defaults to True.
VERIFY_CREATE_OPENSEARCH_INDEX_ON_INIT_MT = (
@@ -840,6 +845,29 @@ MAX_FILE_SIZE_BYTES = int(
os.environ.get("MAX_FILE_SIZE_BYTES") or 2 * 1024 * 1024 * 1024
) # 2GB in bytes
# Maximum embedded images allowed in a single file. PDFs (and other formats)
# with thousands of embedded images can OOM the user-file-processing worker
# because every image is decoded with PIL and then sent to the vision LLM.
# Enforced both at upload time (rejects the file) and during extraction
# (defense-in-depth: caps the number of images materialized).
#
# Clamped to >= 0; a negative env value would turn upload validation into
# always-fail and extraction into always-stop, which is never desired. 0
# disables image extraction entirely, which is a valid (if aggressive) setting.
MAX_EMBEDDED_IMAGES_PER_FILE = max(
0, int(os.environ.get("MAX_EMBEDDED_IMAGES_PER_FILE") or 500)
)
# Maximum embedded images allowed across all files in a single upload batch.
# Protects against the scenario where a user uploads many files that each
# fall under MAX_EMBEDDED_IMAGES_PER_FILE but aggregate to enough work
# (serial-ish celery fan-out plus per-image vision-LLM calls) to OOM the
# worker under concurrency or run up surprise latency/cost. Also clamped
# to >= 0.
MAX_EMBEDDED_IMAGES_PER_UPLOAD = max(
0, int(os.environ.get("MAX_EMBEDDED_IMAGES_PER_UPLOAD") or 1000)
)
# Use document summary for contextual rag
USE_DOCUMENT_SUMMARY = os.environ.get("USE_DOCUMENT_SUMMARY", "true").lower() == "true"
# Use chunk summary for contextual rag

View File

@@ -639,9 +639,11 @@ REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPINTVL] = 15
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPCNT] = 3
if platform.system() == "Darwin":
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPALIVE] = 60 # type: ignore[attr-defined,unused-ignore]
REDIS_SOCKET_KEEPALIVE_OPTIONS[
socket.TCP_KEEPALIVE # ty: ignore[unresolved-attribute]
] = 60
else:
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPIDLE] = 60 # type: ignore[attr-defined,unused-ignore]
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPIDLE] = 60
class OnyxCallTypes(str, Enum):

View File

@@ -547,7 +547,7 @@ class AirtableConnector(LoadConnector):
for record in batch_records:
# Capture the current context so that the thread gets the current tenant ID
current_context = contextvars.copy_context()
future_to_record[
future_to_record[ # ty: ignore[invalid-assignment]
executor.submit(
current_context.run,
self._process_record,

View File

@@ -3,7 +3,7 @@ from collections.abc import Iterator
from datetime import datetime
from typing import Dict
import asana # type: ignore
import asana
from onyx.utils.logger import setup_logger

View File

@@ -290,8 +290,8 @@ class AxeroConnector(PollConnector):
if not self.axero_key or not self.base_url:
raise ConnectorMissingCredentialError("Axero")
start_datetime = datetime.utcfromtimestamp(start).replace(tzinfo=timezone.utc)
end_datetime = datetime.utcfromtimestamp(end).replace(tzinfo=timezone.utc)
start_datetime = datetime.fromtimestamp(start, tz=timezone.utc)
end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)
entity_types = []
if self.include_article:
@@ -327,7 +327,7 @@ class AxeroConnector(PollConnector):
)
all_axero_forums = _map_post_to_parent(
posts=forums_posts,
posts=forums_posts, # ty: ignore[invalid-argument-type]
api_key=self.axero_key,
axero_base_url=self.base_url,
)

View File

@@ -76,7 +76,9 @@ class BlobStorageConnector(LoadConnector, PollConnector):
self.bucket_region: Optional[str] = None
self.european_residency: bool = european_residency
def set_allow_images(self, allow_images: bool) -> None:
def set_allow_images( # ty: ignore[invalid-method-override]
self, allow_images: bool
) -> None:
"""Set whether to process images in this connector."""
logger.info(f"Setting allow_images to {allow_images}.")
self._allow_images = allow_images
@@ -195,7 +197,9 @@ class BlobStorageConnector(LoadConnector, PollConnector):
method="sts-assume-role",
)
botocore_session = get_session()
botocore_session._credentials = refreshable # type: ignore[attr-defined]
botocore_session._credentials = ( # ty: ignore[unresolved-attribute]
refreshable
)
session = boto3.Session(botocore_session=botocore_session)
self.s3_client = session.client("s3")
elif authentication_method == "assume_role":

View File

@@ -2,6 +2,7 @@ import html
import time
from collections.abc import Callable
from datetime import datetime
from datetime import timezone
from typing import Any
from onyx.configs.app_configs import INDEX_BATCH_SIZE
@@ -56,14 +57,14 @@ class BookstackConnector(LoadConnector, PollConnector):
}
if start:
params["filter[updated_at:gte]"] = datetime.utcfromtimestamp(
start
params["filter[updated_at:gte]"] = datetime.fromtimestamp(
start, tz=timezone.utc
).strftime("%Y-%m-%d")
if end:
params["filter[updated_at:lte]"] = datetime.utcfromtimestamp(end).strftime(
"%Y-%m-%d"
)
params["filter[updated_at:lte]"] = datetime.fromtimestamp(
end, tz=timezone.utc
).strftime("%Y-%m-%d")
batch = bookstack_client.get(endpoint, params=params).get("data", [])
doc_batch: list[Document | HierarchyNode] = [

View File

@@ -95,11 +95,13 @@ class ClickupConnector(LoadConnector, PollConnector):
params["date_updated_lt"] = end
if self.connector_type == "list":
params["list_ids[]"] = self.connector_ids
params["list_ids[]"] = self.connector_ids # ty: ignore[invalid-assignment]
elif self.connector_type == "folder":
params["project_ids[]"] = self.connector_ids
params["project_ids[]"] = ( # ty: ignore[invalid-assignment]
self.connector_ids
)
elif self.connector_type == "space":
params["space_ids[]"] = self.connector_ids
params["space_ids[]"] = self.connector_ids # ty: ignore[invalid-assignment]
url_endpoint = f"/team/{self.team_id}/task"

View File

@@ -6,7 +6,7 @@ from datetime import timezone
from typing import Any
from urllib.parse import quote
from atlassian.errors import ApiError # type: ignore
from atlassian.errors import ApiError
from requests.exceptions import HTTPError
from typing_extensions import override

View File

@@ -26,7 +26,7 @@ from typing import TypeVar
from urllib.parse import quote
import bs4
from atlassian import Confluence # type:ignore
from atlassian import Confluence
from redis import Redis
from requests import HTTPError
@@ -971,7 +971,7 @@ class OnyxConfluence:
:return: Returns the user details
"""
from atlassian.errors import ApiPermissionError # type:ignore
from atlassian.errors import ApiPermissionError
url = "rest/api/user/current"
params = {}

View File

@@ -165,7 +165,7 @@ class ConnectorRunner(Generic[CT]):
checkpoint_connector_generator = load_from_checkpoint(
start=self.time_range[0].timestamp(),
end=self.time_range[1].timestamp(),
checkpoint=checkpoint,
checkpoint=checkpoint, # ty: ignore[invalid-argument-type]
)
next_checkpoint: CT | None = None
# this is guaranteed to always run at least once with next_checkpoint being non-None
@@ -174,7 +174,9 @@ class ConnectorRunner(Generic[CT]):
hierarchy_node,
failure,
next_checkpoint,
) in CheckpointOutputWrapper[CT]()(checkpoint_connector_generator):
) in CheckpointOutputWrapper[CT]()(
checkpoint_connector_generator # ty: ignore[invalid-argument-type]
):
if document is not None:
self.doc_batch.append(document)

View File

@@ -83,7 +83,9 @@ class OnyxDBCredentialsProvider(
f"No credential found: credential={self._credential_id}"
)
credential.credential_json = credential_json # type: ignore[assignment]
credential.credential_json = ( # ty: ignore[invalid-assignment]
credential_json
)
db_session.commit()
except Exception:
db_session.rollback()

View File

@@ -3,6 +3,7 @@ from collections.abc import Callable
from collections.abc import Iterator
from datetime import datetime
from datetime import timezone
from email.utils import parsedate_to_datetime
from typing import Any
from typing import TypeVar
from urllib.parse import urljoin
@@ -10,7 +11,6 @@ from urllib.parse import urlparse
import requests
from dateutil.parser import parse
from dateutil.parser import ParserError
from onyx.configs.app_configs import CONNECTOR_LOCALHOST_OVERRIDE
from onyx.configs.constants import DocumentSource
@@ -56,18 +56,16 @@ def time_str_to_utc(datetime_str: str) -> datetime:
if fixed not in candidates:
candidates.append(fixed)
last_exception: Exception | None = None
for candidate in candidates:
try:
dt = parse(candidate)
return datetime_to_utc(dt)
except (ValueError, ParserError) as exc:
last_exception = exc
# dateutil is the primary; the stdlib RFC 2822 parser is a fallback for
# inputs dateutil rejects (e.g. headers concatenated without a CRLF —
# TZ may be dropped, datetime_to_utc then assumes UTC).
for parser in (parse, parsedate_to_datetime):
for candidate in candidates:
try:
return datetime_to_utc(parser(candidate))
except (TypeError, ValueError, OverflowError):
continue
if last_exception is not None:
raise last_exception
# Fallback in case parsing failed without raising (should not happen)
raise ValueError(f"Unable to parse datetime string: {datetime_str}")

View File

@@ -41,7 +41,7 @@ def tabular_file_to_sections(
"""
lowered = file_name.lower()
if lowered.endswith(".xlsx"):
if lowered.endswith(tuple(OnyxFileExtensions.SPREADSHEET_EXTENSIONS)):
return [
TabularSection(
link=link or file_name,

View File

@@ -53,8 +53,10 @@ def _convert_message_to_document(
if isinstance(message.channel, TextChannel) and (
channel_name := message.channel.name
):
metadata["Channel"] = channel_name
semantic_substring += f" in Channel: #{channel_name}"
metadata["Channel"] = channel_name # ty: ignore[possibly-unresolved-reference]
semantic_substring += (
f" in Channel: #{channel_name}" # ty: ignore[possibly-unresolved-reference]
)
# Single messages dont have a title
title = ""

View File

@@ -221,8 +221,8 @@ class DiscourseConnector(PollConnector):
if self.permissions is None:
raise ConnectorMissingCredentialError("Discourse")
start_datetime = datetime.utcfromtimestamp(start).replace(tzinfo=timezone.utc)
end_datetime = datetime.utcfromtimestamp(end).replace(tzinfo=timezone.utc)
start_datetime = datetime.fromtimestamp(start, tz=timezone.utc)
end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)
self._get_categories_map()

View File

@@ -2,10 +2,10 @@ from datetime import timezone
from io import BytesIO
from typing import Any
from dropbox import Dropbox # type: ignore[import-untyped]
from dropbox.exceptions import ApiError # type: ignore[import-untyped]
from dropbox import Dropbox
from dropbox.exceptions import ApiError
from dropbox.exceptions import AuthError
from dropbox.files import FileMetadata # type: ignore[import-untyped]
from dropbox.files import FileMetadata
from dropbox.files import FolderMetadata
from onyx.configs.app_configs import INDEX_BATCH_SIZE

View File

@@ -189,7 +189,7 @@ def _process_file(
if is_tabular_file(file_name):
# Produce TabularSections
lowered_name = file_name.lower()
if lowered_name.endswith(".xlsx"):
if lowered_name.endswith(tuple(OnyxFileExtensions.SPREADSHEET_EXTENSIONS)):
file.seek(0)
tabular_source: IO[bytes] = file
else:

View File

@@ -7,7 +7,7 @@ from typing import Dict
from google.oauth2.credentials import Credentials as OAuthCredentials
from google.oauth2.service_account import Credentials as ServiceAccountCredentials
from googleapiclient.errors import HttpError # type: ignore
from googleapiclient.errors import HttpError
from onyx.access.models import ExternalAccess
from onyx.configs.app_configs import INDEX_BATCH_SIZE
@@ -253,7 +253,17 @@ def thread_to_document(
updated_at_datetime = None
if updated_at:
updated_at_datetime = time_str_to_utc(updated_at)
try:
updated_at_datetime = time_str_to_utc(updated_at)
except (ValueError, OverflowError) as e:
# Old mailboxes contain RFC-violating Date headers. Drop the
# timestamp instead of aborting the indexing run.
logger.warning(
"Skipping unparseable Gmail Date header on thread %s: %r (%s)",
full_thread.get("id"),
updated_at,
e,
)
id = full_thread.get("id")
if not id:
@@ -296,7 +306,9 @@ def _full_thread_from_id(
try:
thread = next(
execute_single_retrieval(
retrieval_function=gmail_service.users().threads().get,
retrieval_function=gmail_service.users() # ty: ignore[unresolved-attribute]
.threads()
.get,
list_key=None,
userId=user_email,
fields=THREAD_FIELDS,
@@ -394,7 +406,7 @@ class GmailConnector(
admin_service = get_admin_service(self.creds, self.primary_admin_email)
emails = []
for user in execute_paginated_retrieval(
retrieval_function=admin_service.users().list,
retrieval_function=admin_service.users().list, # ty: ignore[unresolved-attribute]
list_key="users",
fields=USER_FIELDS,
domain=self.google_domain,
@@ -438,7 +450,9 @@ class GmailConnector(
try:
for thread in execute_paginated_retrieval_with_max_pages(
max_num_pages=PAGES_PER_CHECKPOINT,
retrieval_function=gmail_service.users().threads().list,
retrieval_function=gmail_service.users() # ty: ignore[unresolved-attribute]
.threads()
.list,
list_key="threads",
userId=user_email,
fields=THREAD_LIST_FIELDS,

View File

@@ -1,4 +1,5 @@
import base64
import copy
import time
from collections.abc import Generator
from datetime import datetime
@@ -8,27 +9,58 @@ from typing import Any
from typing import cast
import requests
from pydantic import BaseModel
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from onyx.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
from onyx.configs.app_configs import GONG_CONNECTOR_START_TIME
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import CheckpointedConnector
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import TextSection
from onyx.utils.logger import setup_logger
logger = setup_logger()
class GongConnector(LoadConnector, PollConnector):
class GongConnectorCheckpoint(ConnectorCheckpoint):
# Resolved workspace IDs to iterate through.
# None means "not yet resolved" — first checkpoint call resolves them.
# Inner None means "no workspace filter" (fetch all).
workspace_ids: list[str | None] | None = None
# Index into workspace_ids for current workspace
workspace_index: int = 0
# Gong API cursor for current workspace's transcript pagination
cursor: str | None = None
# Cached time range — computed once, reused across checkpoint calls
time_range: tuple[str, str] | None = None
class _TranscriptPage(BaseModel):
"""One page of transcripts from /v2/calls/transcript."""
transcripts: list[dict[str, Any]]
next_cursor: str | None = None
class _CursorExpiredError(Exception):
"""Raised when Gong rejects a pagination cursor as expired.
Gong pagination cursors TTL is ~1 hour from the first request in a
pagination sequence, not from the last cursor fetch. Since checkpointed
connector runs can pause between invocations, a resumed run may encounter
an expired cursor and must restart the current workspace from scratch.
See https://visioneers.gong.io/integrations-77/pagination-cursor-expires-after-1-hours-even-for-a-new-cursor-1382
"""
class GongConnector(CheckpointedConnector[GongConnectorCheckpoint]):
BASE_URL = "https://api.gong.io"
MAX_CALL_DETAILS_ATTEMPTS = 6
CALL_DETAILS_DELAY = 30 # in seconds
@@ -38,13 +70,9 @@ class GongConnector(LoadConnector, PollConnector):
def __init__(
self,
workspaces: list[str] | None = None,
batch_size: int = INDEX_BATCH_SIZE,
continue_on_fail: bool = CONTINUE_ON_CONNECTOR_FAILURE,
hide_user_info: bool = False,
) -> None:
self.workspaces = workspaces
self.batch_size: int = batch_size
self.continue_on_fail = continue_on_fail
self.auth_token_basic: str | None = None
self.hide_user_info = hide_user_info
self._last_request_time: float = 0.0
@@ -98,67 +126,50 @@ class GongConnector(LoadConnector, PollConnector):
# Then the user input is treated as the name
return {**id_id_map, **name_id_map}
def _get_transcript_batches(
self, start_datetime: str | None = None, end_datetime: str | None = None
) -> Generator[list[dict[str, Any]], None, None]:
body: dict[str, dict] = {"filter": {}}
def _fetch_transcript_page(
self,
start_datetime: str | None,
end_datetime: str | None,
workspace_id: str | None,
cursor: str | None,
) -> _TranscriptPage:
"""Fetch one page of transcripts from the Gong API.
Raises _CursorExpiredError if Gong reports the pagination cursor
expired (TTL is ~1 hour from first request in the pagination sequence).
"""
body: dict[str, Any] = {"filter": {}}
if start_datetime:
body["filter"]["fromDateTime"] = start_datetime
if end_datetime:
body["filter"]["toDateTime"] = end_datetime
if workspace_id:
body["filter"]["workspaceId"] = workspace_id
if cursor:
body["cursor"] = cursor
# The batch_ids in the previous method appears to be batches of call_ids to process
# In this method, we will retrieve transcripts for them in batches.
transcripts: list[dict[str, Any]] = []
workspace_list = self.workspaces or [None] # type: ignore
workspace_map = self._get_workspace_id_map() if self.workspaces else {}
response = self._throttled_request(
"POST", GongConnector.make_url("/v2/calls/transcript"), json=body
)
# If no calls in the range, return empty
if response.status_code == 404:
return _TranscriptPage(transcripts=[])
for workspace in workspace_list:
if workspace:
logger.info(f"Updating Gong workspace: {workspace}")
workspace_id = workspace_map.get(workspace)
if not workspace_id:
logger.error(f"Invalid Gong workspace: {workspace}")
if not self.continue_on_fail:
raise ValueError(f"Invalid workspace: {workspace}")
continue
body["filter"]["workspaceId"] = workspace_id
else:
if "workspaceId" in body["filter"]:
del body["filter"]["workspaceId"]
if not response.ok:
# Cursor expiration comes back as a 4xx with this error message —
# detect it before raise_for_status so callers can restart the workspace.
if cursor and "cursor has expired" in response.text.lower():
raise _CursorExpiredError(response.text)
logger.error(f"Error fetching transcripts: {response.text}")
response.raise_for_status()
while True:
response = self._throttled_request(
"POST", GongConnector.make_url("/v2/calls/transcript"), json=body
)
# If no calls in the range, just break out
if response.status_code == 404:
break
data = response.json()
return _TranscriptPage(
transcripts=data.get("callTranscripts", []),
next_cursor=data.get("records", {}).get("cursor"),
)
try:
response.raise_for_status()
except Exception:
logger.error(f"Error fetching transcripts: {response.text}")
raise
data = response.json()
call_transcripts = data.get("callTranscripts", [])
transcripts.extend(call_transcripts)
while len(transcripts) >= self.batch_size:
yield transcripts[: self.batch_size]
transcripts = transcripts[self.batch_size :]
cursor = data.get("records", {}).get("cursor")
if cursor:
body["cursor"] = cursor
else:
break
if transcripts:
yield transcripts
def _get_call_details_by_ids(self, call_ids: list[str]) -> dict:
def _get_call_details_by_ids(self, call_ids: list[str]) -> dict[str, Any]:
body = {
"filter": {"callIds": call_ids},
"contentSelector": {"exposedFields": {"parties": True}},
@@ -176,6 +187,50 @@ class GongConnector(LoadConnector, PollConnector):
return call_to_metadata
def _fetch_call_details_with_retry(self, call_ids: list[str]) -> dict[str, Any]:
"""Fetch call details with retry for the Gong API race condition.
The Gong API has a known race where transcript call IDs don't immediately
appear in /v2/calls/extensive. Retries with exponential backoff, only
re-requesting the missing IDs on each attempt.
"""
call_details_map = self._get_call_details_by_ids(call_ids)
if set(call_ids) == set(call_details_map.keys()):
return call_details_map
for attempt in range(2, self.MAX_CALL_DETAILS_ATTEMPTS + 1):
missing_ids = list(set(call_ids) - set(call_details_map.keys()))
logger.warning(
f"_get_call_details_by_ids is missing call id's: current_attempt={attempt - 1} missing_call_ids={missing_ids}"
)
wait_seconds = self.CALL_DETAILS_DELAY * pow(2, attempt - 2)
logger.warning(
f"_get_call_details_by_ids waiting to retry: "
f"wait={wait_seconds}s "
f"current_attempt={attempt - 1} "
f"next_attempt={attempt} "
f"max_attempts={self.MAX_CALL_DETAILS_ATTEMPTS}"
)
time.sleep(wait_seconds)
# Only re-fetch the missing IDs, merge into existing results
new_details = self._get_call_details_by_ids(missing_ids)
call_details_map.update(new_details)
if set(call_ids) == set(call_details_map.keys()):
return call_details_map
missing_ids = list(set(call_ids) - set(call_details_map.keys()))
logger.error(
f"Giving up on missing call id's after "
f"{self.MAX_CALL_DETAILS_ATTEMPTS} attempts: "
f"missing_call_ids={missing_ids}"
f"proceeding with {len(call_details_map)} of "
f"{len(call_ids)} calls"
)
return call_details_map
@staticmethod
def _parse_parties(parties: list[dict]) -> dict[str, str]:
id_mapping = {}
@@ -196,186 +251,46 @@ class GongConnector(LoadConnector, PollConnector):
return id_mapping
def _fetch_calls(
self, start_datetime: str | None = None, end_datetime: str | None = None
) -> GenerateDocumentsOutput:
num_calls = 0
def _resolve_workspace_ids(self) -> list[str | None]:
"""Resolve configured workspace names/IDs to actual workspace IDs.
for transcript_batch in self._get_transcript_batches(
start_datetime, end_datetime
):
doc_batch: list[Document | HierarchyNode] = []
Returns a list of workspace IDs. If no workspaces are configured,
returns [None] to indicate "fetch all workspaces".
transcript_call_ids = cast(
list[str],
[t.get("callId") for t in transcript_batch if t.get("callId")],
Raises ValueError if workspaces are configured but none resolve —
we never silently widen scope to "fetch all" on misconfiguration,
because that could ingest an entire Gong account by mistake.
"""
if not self.workspaces:
return [None]
workspace_map = self._get_workspace_id_map()
resolved: list[str | None] = []
for workspace in self.workspaces:
workspace_id = workspace_map.get(workspace)
if not workspace_id:
logger.error(f"Invalid Gong workspace: {workspace}")
continue
resolved.append(workspace_id)
if not resolved:
raise ValueError(
f"No valid Gong workspaces found — check workspace names/IDs in connector config. Configured: {self.workspaces}"
)
call_details_map: dict[str, Any] = {}
return resolved
# There's a likely race condition in the API where a transcript will have a
# call id but the call to v2/calls/extensive will not return all of the id's
# retry with exponential backoff has been observed to mitigate this
# in ~2 minutes. After max attempts, proceed with whatever we have —
# the per-call loop below will skip missing IDs gracefully.
current_attempt = 0
while True:
current_attempt += 1
call_details_map = self._get_call_details_by_ids(transcript_call_ids)
if set(transcript_call_ids) == set(call_details_map.keys()):
# we got all the id's we were expecting ... break and continue
break
# we are missing some id's. Log and retry with exponential backoff
missing_call_ids = set(transcript_call_ids) - set(
call_details_map.keys()
)
logger.warning(
f"_get_call_details_by_ids is missing call id's: "
f"current_attempt={current_attempt} "
f"missing_call_ids={missing_call_ids}"
)
if current_attempt >= self.MAX_CALL_DETAILS_ATTEMPTS:
logger.error(
f"Giving up on missing call id's after "
f"{self.MAX_CALL_DETAILS_ATTEMPTS} attempts: "
f"missing_call_ids={missing_call_ids}"
f"proceeding with {len(call_details_map)} of "
f"{len(transcript_call_ids)} calls"
)
break
wait_seconds = self.CALL_DETAILS_DELAY * pow(2, current_attempt - 1)
logger.warning(
f"_get_call_details_by_ids waiting to retry: "
f"wait={wait_seconds}s "
f"current_attempt={current_attempt} "
f"next_attempt={current_attempt + 1} "
f"max_attempts={self.MAX_CALL_DETAILS_ATTEMPTS}"
)
time.sleep(wait_seconds)
# now we can iterate per call/transcript
for transcript in transcript_batch:
call_id = transcript.get("callId")
if not call_id or call_id not in call_details_map:
# NOTE(rkuo): seeing odd behavior where call_ids from the transcript
# don't have call details. adding error debugging logs to trace.
logger.error(
f"Couldn't get call information for Call ID: {call_id}"
)
if call_id:
logger.error(
f"Call debug info: call_id={call_id} "
f"call_ids={transcript_call_ids} "
f"call_details_map={call_details_map.keys()}"
)
if not self.continue_on_fail:
raise RuntimeError(
f"Couldn't get call information for Call ID: {call_id}"
)
continue
call_details = call_details_map[call_id]
call_metadata = call_details["metaData"]
call_time_str = call_metadata["started"]
call_title = call_metadata["title"]
logger.info(
f"{num_calls + 1}: Indexing Gong call id {call_id} from {call_time_str.split('T', 1)[0]}: {call_title}"
)
call_parties = cast(list[dict] | None, call_details.get("parties"))
if call_parties is None:
logger.error(f"Couldn't get parties for Call ID: {call_id}")
call_parties = []
id_to_name_map = self._parse_parties(call_parties)
# Keeping a separate dict here in case the parties info is incomplete
speaker_to_name: dict[str, str] = {}
transcript_text = ""
call_purpose = call_metadata["purpose"]
if call_purpose:
transcript_text += f"Call Description: {call_purpose}\n\n"
contents = transcript["transcript"]
for segment in contents:
speaker_id = segment.get("speakerId", "")
if speaker_id not in speaker_to_name:
if self.hide_user_info:
speaker_to_name[speaker_id] = (
f"User {len(speaker_to_name) + 1}"
)
else:
speaker_to_name[speaker_id] = id_to_name_map.get(
speaker_id, "Unknown"
)
speaker_name = speaker_to_name[speaker_id]
sentences = segment.get("sentences", {})
monolog = " ".join(
[sentence.get("text", "") for sentence in sentences]
)
transcript_text += f"{speaker_name}: {monolog}\n\n"
metadata = {}
if call_metadata.get("system"):
metadata["client"] = call_metadata.get("system")
# TODO calls have a clientUniqueId field, can pull that in later
doc_batch.append(
Document(
id=call_id,
sections=[
TextSection(link=call_metadata["url"], text=transcript_text)
],
source=DocumentSource.GONG,
# Should not ever be Untitled as a call cannot be made without a Title
semantic_identifier=call_title or "Untitled",
doc_updated_at=datetime.fromisoformat(call_time_str).astimezone(
timezone.utc
),
metadata={"client": call_metadata.get("system")},
)
)
num_calls += 1
yield doc_batch
logger.info(f"_fetch_calls finished: num_calls={num_calls}")
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
combined = (
f"{credentials['gong_access_key']}:{credentials['gong_access_key_secret']}"
)
self.auth_token_basic = base64.b64encode(combined.encode("utf-8")).decode(
"utf-8"
)
if self.auth_token_basic is None:
raise ConnectorMissingCredentialError("Gong")
self._session.headers.update(
{"Authorization": f"Basic {self.auth_token_basic}"}
)
return None
def load_from_state(self) -> GenerateDocumentsOutput:
return self._fetch_calls()
def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
@staticmethod
def _compute_time_range(
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
) -> tuple[str, str]:
"""Compute the start/end datetime strings for the Gong API filter,
applying GONG_CONNECTOR_START_TIME and the 1-day offset."""
end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)
# if this env variable is set, don't start from a timestamp before the specified
# start time
# TODO: remove this once this is globally available
if GONG_CONNECTOR_START_TIME:
special_start_datetime = datetime.fromisoformat(GONG_CONNECTOR_START_TIME)
special_start_datetime = special_start_datetime.replace(tzinfo=timezone.utc)
@@ -394,11 +309,186 @@ class GongConnector(LoadConnector, PollConnector):
# so adding a 1 day buffer and fetching by default till current time
start_one_day_offset = start_datetime - timedelta(days=1)
start_time = start_one_day_offset.isoformat()
end_time = end_datetime.isoformat()
end_time = datetime.fromtimestamp(end, tz=timezone.utc).isoformat()
return start_time, end_time
logger.info(f"Fetching Gong calls between {start_time} and {end_time}")
return self._fetch_calls(start_time, end_time)
def _process_transcripts(
self,
transcripts: list[dict[str, Any]],
) -> Generator[Document | ConnectorFailure, None, None]:
"""Process a batch of transcripts into Documents or ConnectorFailures."""
transcript_call_ids = cast(
list[str],
[t.get("callId") for t in transcripts if t.get("callId")],
)
call_details_map = self._fetch_call_details_with_retry(transcript_call_ids)
for transcript in transcripts:
call_id = transcript.get("callId")
if not call_id or call_id not in call_details_map:
logger.error(f"Couldn't get call information for Call ID: {call_id}")
if call_id:
logger.error(
f"Call debug info: call_id={call_id} "
f"call_ids={transcript_call_ids} "
f"call_details_map={call_details_map.keys()}"
)
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=call_id or "unknown",
),
failure_message=f"Couldn't get call information for Call ID: {call_id}",
)
continue
call_details = call_details_map[call_id]
call_metadata = call_details["metaData"]
call_time_str = call_metadata["started"]
call_title = call_metadata["title"]
logger.info(
f"Indexing Gong call id {call_id} from {call_time_str.split('T', 1)[0]}: {call_title}"
)
call_parties = cast(list[dict] | None, call_details.get("parties"))
if call_parties is None:
logger.error(f"Couldn't get parties for Call ID: {call_id}")
call_parties = []
id_to_name_map = self._parse_parties(call_parties)
speaker_to_name: dict[str, str] = {}
transcript_text = ""
call_purpose = call_metadata["purpose"]
if call_purpose:
transcript_text += f"Call Description: {call_purpose}\n\n"
contents = transcript["transcript"]
for segment in contents:
speaker_id = segment.get("speakerId", "")
if speaker_id not in speaker_to_name:
if self.hide_user_info:
speaker_to_name[speaker_id] = f"User {len(speaker_to_name) + 1}"
else:
speaker_to_name[speaker_id] = id_to_name_map.get(
speaker_id, "Unknown"
)
speaker_name = speaker_to_name[speaker_id]
sentences = segment.get("sentences", {})
monolog = " ".join([sentence.get("text", "") for sentence in sentences])
transcript_text += f"{speaker_name}: {monolog}\n\n"
yield Document(
id=call_id,
sections=[TextSection(link=call_metadata["url"], text=transcript_text)],
source=DocumentSource.GONG,
semantic_identifier=call_title or "Untitled",
doc_updated_at=datetime.fromisoformat(call_time_str).astimezone(
timezone.utc
),
metadata={"client": call_metadata.get("system")},
)
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
combined = (
f"{credentials['gong_access_key']}:{credentials['gong_access_key_secret']}"
)
self.auth_token_basic = base64.b64encode(combined.encode("utf-8")).decode(
"utf-8"
)
if self.auth_token_basic is None:
raise ConnectorMissingCredentialError("Gong")
self._session.headers.update(
{"Authorization": f"Basic {self.auth_token_basic}"}
)
return None
def build_dummy_checkpoint(self) -> GongConnectorCheckpoint:
return GongConnectorCheckpoint(has_more=True)
def validate_checkpoint_json(self, checkpoint_json: str) -> GongConnectorCheckpoint:
return GongConnectorCheckpoint.model_validate_json(checkpoint_json)
def load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: GongConnectorCheckpoint,
) -> CheckpointOutput[GongConnectorCheckpoint]:
checkpoint = copy.deepcopy(checkpoint)
# Step 1: Resolve workspace IDs on first call
if checkpoint.workspace_ids is None:
checkpoint.workspace_ids = self._resolve_workspace_ids()
checkpoint.time_range = self._compute_time_range(start, end)
checkpoint.has_more = True
return checkpoint
workspace_ids = checkpoint.workspace_ids
# If we've exhausted all workspaces, we're done
if checkpoint.workspace_index >= len(workspace_ids):
checkpoint.has_more = False
return checkpoint
# Use cached time range, falling back to computation if not cached
start_time, end_time = checkpoint.time_range or self._compute_time_range(
start, end
)
logger.info(
f"Fetching Gong calls between {start_time} and {end_time} "
f"(workspace {checkpoint.workspace_index + 1}/{len(workspace_ids)})"
)
workspace_id = workspace_ids[checkpoint.workspace_index]
# Step 2: Fetch one page of transcripts
try:
page = self._fetch_transcript_page(
start_datetime=start_time,
end_datetime=end_time,
workspace_id=workspace_id,
cursor=checkpoint.cursor,
)
except _CursorExpiredError:
# Gong cursors TTL ~1h from first request in the sequence. If the
# checkpoint paused long enough for the cursor to expire, restart
# the current workspace from the beginning of the time range.
# Document upserts are idempotent (keyed by call_id) so
# reprocessing is safe.
logger.warning(
f"Gong pagination cursor expired for workspace "
f"{checkpoint.workspace_index + 1}/{len(workspace_ids)}; "
f"restarting workspace from beginning of time range."
)
checkpoint.cursor = None
checkpoint.has_more = True
return checkpoint
# Step 3: Process transcripts into documents
if page.transcripts:
yield from self._process_transcripts(page.transcripts)
# Step 4: Update checkpoint state
if page.next_cursor:
# More pages in this workspace
checkpoint.cursor = page.next_cursor
checkpoint.has_more = True
else:
# This workspace is exhausted — advance to next
checkpoint.workspace_index += 1
checkpoint.cursor = None
checkpoint.has_more = checkpoint.workspace_index < len(workspace_ids)
return checkpoint
if __name__ == "__main__":
@@ -412,5 +502,13 @@ if __name__ == "__main__":
}
)
latest_docs = connector.load_from_state()
print(next(latest_docs))
checkpoint = connector.build_dummy_checkpoint()
while checkpoint.has_more:
doc_generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(doc_generator)
print(item)
except StopIteration as e:
checkpoint = e.value
print(f"Checkpoint: {checkpoint}")

View File

@@ -18,7 +18,7 @@ from urllib.parse import urlunparse
from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials as OAuthCredentials
from google.oauth2.service_account import Credentials as ServiceAccountCredentials
from googleapiclient.errors import HttpError # type: ignore
from googleapiclient.errors import HttpError
from typing_extensions import override
from onyx.access.models import ExternalAccess
@@ -434,7 +434,7 @@ class GoogleDriveConnector(
for is_admin in [True, False]:
query = "isAdmin=true" if is_admin else "isAdmin=false"
for user in execute_paginated_retrieval(
retrieval_function=admin_service.users().list,
retrieval_function=admin_service.users().list, # ty: ignore[unresolved-attribute]
list_key="users",
fields=USER_FIELDS,
domain=self.google_domain,
@@ -502,6 +502,9 @@ class GoogleDriveConnector(
files: list[RetrievedDriveFile],
seen_hierarchy_node_raw_ids: ThreadSafeSet[str],
fully_walked_hierarchy_node_raw_ids: ThreadSafeSet[str],
failed_folder_ids_by_email: (
ThreadSafeDict[str, ThreadSafeSet[str]] | None
) = None,
permission_sync_context: PermissionSyncContext | None = None,
add_prefix: bool = False,
) -> list[HierarchyNode]:
@@ -525,6 +528,9 @@ class GoogleDriveConnector(
seen_hierarchy_node_raw_ids: Set of already-yielded node IDs (modified in place)
fully_walked_hierarchy_node_raw_ids: Set of node IDs where the walk to root
succeeded (modified in place)
failed_folder_ids_by_email: Map of email → folder IDs where that email
previously confirmed no accessible parent. Skips the API call if the same
(folder, email) is encountered again (modified in place).
permission_sync_context: If provided, permissions will be fetched for hierarchy nodes.
Contains google_domain and primary_admin_email needed for permission syncing.
add_prefix: When True, prefix group IDs with source type (for indexing path).
@@ -569,7 +575,7 @@ class GoogleDriveConnector(
# Fetch folder metadata
folder = self._get_folder_metadata(
current_id, file.user_email, field_type
current_id, file.user_email, field_type, failed_folder_ids_by_email
)
if not folder:
# Can't access this folder - stop climbing
@@ -653,7 +659,13 @@ class GoogleDriveConnector(
return new_nodes
def _get_folder_metadata(
self, folder_id: str, retriever_email: str, field_type: DriveFileFieldType
self,
folder_id: str,
retriever_email: str,
field_type: DriveFileFieldType,
failed_folder_ids_by_email: (
ThreadSafeDict[str, ThreadSafeSet[str]] | None
) = None,
) -> GoogleDriveFileType | None:
"""
Fetch metadata for a folder by ID.
@@ -667,6 +679,17 @@ class GoogleDriveConnector(
# Use a set to deduplicate if retriever_email == primary_admin_email
for email in {retriever_email, self.primary_admin_email}:
failed_ids = (
failed_folder_ids_by_email.get(email)
if failed_folder_ids_by_email
else None
)
if failed_ids and folder_id in failed_ids:
logger.debug(
f"Skipping folder {folder_id} using {email} (previously confirmed no parents)"
)
continue
service = get_drive_service(self.creds, email)
folder = get_folder_metadata(service, folder_id, field_type)
@@ -682,6 +705,10 @@ class GoogleDriveConnector(
# Folder has no parents - could be a root OR user lacks access to parent
# Keep this as a fallback but try admin to see if they can see parents
if failed_folder_ids_by_email is not None:
failed_folder_ids_by_email.setdefault(email, ThreadSafeSet()).add(
folder_id
)
if best_folder is None:
best_folder = folder
logger.debug(
@@ -719,7 +746,7 @@ class GoogleDriveConnector(
)
all_drive_ids: set[str] = set()
for drive in execute_paginated_retrieval(
retrieval_function=drive_service.drives().list,
retrieval_function=drive_service.drives().list, # ty: ignore[unresolved-attribute]
list_key="drives",
useDomainAdminAccess=is_service_account,
fields="drives(id),nextPageToken",
@@ -907,7 +934,9 @@ class GoogleDriveConnector(
# resume from a checkpoint
if resuming and (drive_id := curr_stage.current_folder_or_drive_id):
resume_start = curr_stage.completed_until
for file_or_token in _yield_from_drive(drive_id, resume_start):
for file_or_token in _yield_from_drive(
drive_id, resume_start # ty: ignore[possibly-unresolved-reference]
):
if isinstance(file_or_token, str):
checkpoint.completion_map[user_email].next_page_token = (
file_or_token
@@ -1088,6 +1117,13 @@ class GoogleDriveConnector(
]
yield from parallel_yield(user_retrieval_gens, max_workers=MAX_DRIVE_WORKERS)
# Free per-user cache entries now that this batch is done.
# Skip the admin email — it is shared across all user batches and must
# persist for the duration of the run.
for email in non_completed_org_emails:
if email != self.primary_admin_email:
checkpoint.failed_folder_ids_by_email.pop(email, None)
# if there are more emails to process, don't mark as complete
if not email_batch_takes_us_to_completion:
return
@@ -1302,7 +1338,9 @@ class GoogleDriveConnector(
resume_start = checkpoint.completion_map[
self.primary_admin_email
].completed_until
yield from _yield_from_folder_crawl(folder_id, resume_start)
yield from _yield_from_folder_crawl(
folder_id, resume_start # ty: ignore[possibly-unresolved-reference]
)
# the times stored in the completion_map aren't used due to the crawling behavior
# instead, the traversed_parent_ids are used to determine what we have left to retrieve
@@ -1542,6 +1580,7 @@ class GoogleDriveConnector(
files=files_batch,
seen_hierarchy_node_raw_ids=checkpoint.seen_hierarchy_node_raw_ids,
fully_walked_hierarchy_node_raw_ids=checkpoint.fully_walked_hierarchy_node_raw_ids,
failed_folder_ids_by_email=checkpoint.failed_folder_ids_by_email,
permission_sync_context=permission_sync_context,
add_prefix=True,
)
@@ -1778,6 +1817,7 @@ class GoogleDriveConnector(
files=files_batch,
seen_hierarchy_node_raw_ids=checkpoint.seen_hierarchy_node_raw_ids,
fully_walked_hierarchy_node_raw_ids=checkpoint.fully_walked_hierarchy_node_raw_ids,
failed_folder_ids_by_email=checkpoint.failed_folder_ids_by_email,
permission_sync_context=permission_sync_context,
)
@@ -1883,7 +1923,9 @@ class GoogleDriveConnector(
try:
drive_service = get_drive_service(self._creds, self._primary_admin_email)
drive_service.files().list(pageSize=1, fields="files(id)").execute()
drive_service.files().list( # ty: ignore[unresolved-attribute]
pageSize=1, fields="files(id)"
).execute()
if isinstance(self._creds, ServiceAccountCredentials):
# default is ~17mins of retries, don't do that here since this is called from

Some files were not shown because too many files have changed in this diff Show More