Compare commits

...

11 Commits

44 changed files with 2078 additions and 946 deletions

View File

@@ -462,7 +462,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -472,7 +472,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./web
file: ./web/Dockerfile
@@ -536,7 +536,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -546,7 +546,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./web
file: ./web/Dockerfile
@@ -597,7 +597,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -676,7 +676,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -686,7 +686,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./web
file: ./web/Dockerfile
@@ -761,7 +761,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -771,7 +771,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./web
file: ./web/Dockerfile
@@ -833,7 +833,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -908,7 +908,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -918,7 +918,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -981,7 +981,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -991,7 +991,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -1041,7 +1041,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -1119,7 +1119,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -1129,7 +1129,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -1192,7 +1192,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -1202,7 +1202,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -1253,7 +1253,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -1329,7 +1329,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
with:
buildkitd-flags: ${{ vars.DOCKER_DEBUG == 'true' && '--debug' || '' }}
@@ -1341,7 +1341,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
env:
DEBUG: ${{ vars.DOCKER_DEBUG == 'true' && 1 || 0 }}
with:
@@ -1409,7 +1409,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
with:
buildkitd-flags: ${{ vars.DOCKER_DEBUG == 'true' && '--debug' || '' }}
@@ -1421,7 +1421,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
env:
DEBUG: ${{ vars.DOCKER_DEBUG == 'true' && 1 || 0 }}
with:
@@ -1475,7 +1475,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3

View File

@@ -21,7 +21,7 @@ jobs:
timeout-minutes: 45
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3

View File

@@ -21,7 +21,7 @@ jobs:
timeout-minutes: 45
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3

View File

@@ -115,7 +115,7 @@ jobs:
echo "cache-suffix=${CACHE_SUFFIX}" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling Vespa, Redis, Postgres, and Minio images
# otherwise, we hit the "Unauthenticated users" limit
@@ -127,7 +127,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Backend Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -175,7 +175,7 @@ jobs:
echo "cache-suffix=${CACHE_SUFFIX}" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling Vespa, Redis, Postgres, and Minio images
# otherwise, we hit the "Unauthenticated users" limit
@@ -187,7 +187,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Model Server Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile.model_server
@@ -220,7 +220,7 @@ jobs:
persist-credentials: false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling openapitools/openapi-generator-cli
# otherwise, we hit the "Unauthenticated users" limit

View File

@@ -94,7 +94,7 @@ jobs:
echo "cache-suffix=${CACHE_SUFFIX}" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling external images otherwise, we hit the "Unauthenticated users" limit
# https://docs.docker.com/docker-hub/usage/
@@ -105,7 +105,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Web Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./web
file: ./web/Dockerfile
@@ -155,7 +155,7 @@ jobs:
echo "cache-suffix=${CACHE_SUFFIX}" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling external images otherwise, we hit the "Unauthenticated users" limit
# https://docs.docker.com/docker-hub/usage/
@@ -166,7 +166,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Backend Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile
@@ -216,7 +216,7 @@ jobs:
echo "cache-suffix=${CACHE_SUFFIX}" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
# needed for pulling external images otherwise, we hit the "Unauthenticated users" limit
# https://docs.docker.com/docker-hub/usage/
@@ -227,7 +227,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Build and push Model Server Docker image
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend
file: ./backend/Dockerfile.model_server

View File

@@ -69,7 +69,7 @@ jobs:
password: ${{ secrets.DOCKER_TOKEN }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Build and load
uses: docker/bake-action@82490499d2e5613fcead7e128237ef0b0ea210f7 # ratchet:docker/bake-action@v7.0.0

View File

@@ -132,7 +132,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -142,7 +142,7 @@ jobs:
- name: Build and push AMD64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend/onyx/server/features/build/sandbox/kubernetes/docker
file: ./backend/onyx/server/features/build/sandbox/kubernetes/docker/Dockerfile
@@ -202,7 +202,7 @@ jobs:
latest=false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3
@@ -212,7 +212,7 @@ jobs:
- name: Build and push ARM64
id: build
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # ratchet:docker/build-push-action@v6
uses: docker/build-push-action@bcafcacb16a39f128d818304e6c9c0c18556b85f # ratchet:docker/build-push-action@v7
with:
context: ./backend/onyx/server/features/build/sandbox/kubernetes/docker
file: ./backend/onyx/server/features/build/sandbox/kubernetes/docker/Dockerfile
@@ -258,7 +258,7 @@ jobs:
parse-json-secrets: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # ratchet:docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd # ratchet:docker/setup-buildx-action@v4
- name: Login to Docker Hub
uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # ratchet:docker/login-action@v3

View File

@@ -0,0 +1,27 @@
"""Add file_id to documents
Revision ID: 91d150c361f6
Revises: d129f37b3d87
Create Date: 2026-04-16 15:43:30.314823
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "91d150c361f6"
down_revision = "d129f37b3d87"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"document",
sa.Column("file_id", sa.String(), nullable=True),
)
def downgrade() -> None:
op.drop_column("document", "file_id")

View File

@@ -166,16 +166,21 @@ def check_for_connector_deletion_task(self: Task, *, tenant_id: str) -> bool | N
r.set(OnyxRedisSignals.BLOCK_VALIDATE_CONNECTOR_DELETION_FENCES, 1, ex=300)
# collect cc_pair_ids
# collect cc_pair_ids and note whether any are in DELETING status
cc_pair_ids: list[int] = []
has_deleting_cc_pair = False
with get_session_with_current_tenant() as db_session:
cc_pairs = get_connector_credential_pairs(db_session)
for cc_pair in cc_pairs:
cc_pair_ids.append(cc_pair.id)
if cc_pair.status == ConnectorCredentialPairStatus.DELETING:
has_deleting_cc_pair = True
# Tenant-work-gating hook: any cc_pair means deletion could have
# cleanup work to do for this tenant on some cycle.
if cc_pair_ids:
# Tenant-work-gating hook: mark only when at least one cc_pair is in
# DELETING status. Marking on bare cc_pair existence would keep
# nearly every tenant in the active set since most have cc_pairs
# but almost none are actively being deleted on any given cycle.
if has_deleting_cc_pair:
maybe_mark_tenant_active(tenant_id)
# try running cleanup on the cc_pair_ids

View File

@@ -897,11 +897,6 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
secondary_cc_pair_ids = standard_cc_pair_ids
# Tenant-work-gating hook: refresh this tenant's active-set membership
# whenever indexing actually has work to dispatch.
if primary_cc_pair_ids or secondary_cc_pair_ids:
maybe_mark_tenant_active(tenant_id)
# Flag CC pairs in repeated error state for primary/current search settings
with get_session_with_current_tenant() as db_session:
for cc_pair_id in primary_cc_pair_ids:
@@ -1019,6 +1014,14 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
f"Skipping secondary indexing: switchover_type=INSTANT for search_settings={secondary_search_settings.id}"
)
# Tenant-work-gating hook: refresh membership only when indexing
# actually dispatched at least one docfetching task. `_kickoff_indexing_tasks`
# internally calls `should_index()` to decide per-cc_pair; using
# `tasks_created > 0` here gives us a "real work was done" signal
# rather than just "tenant has a cc_pair somewhere."
if tasks_created > 0:
maybe_mark_tenant_active(tenant_id)
# 2/3: VALIDATE
# Check for inconsistent index attempts - active attempts without task IDs
# This can happen if attempt creation fails partway through

View File

@@ -229,11 +229,7 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
for cc_pair_entry in cc_pairs:
cc_pair_ids.append(cc_pair_entry.id)
# Tenant-work-gating hook: any cc_pair means pruning could have
# work to do for this tenant on some cycle.
if cc_pair_ids:
maybe_mark_tenant_active(tenant_id)
prune_dispatched = False
for cc_pair_id in cc_pair_ids:
lock_beat.reacquire()
with get_session_with_current_tenant() as db_session:
@@ -256,9 +252,18 @@ def check_for_pruning(self: Task, *, tenant_id: str) -> bool | None:
logger.info(f"Pruning not created: {cc_pair_id}")
continue
prune_dispatched = True
task_logger.info(
f"Pruning queued: cc_pair={cc_pair.id} id={payload_id}"
)
# Tenant-work-gating hook: mark only when at least one cc_pair
# was actually due for pruning AND a prune task was dispatched.
# Marking on bare cc_pair existence over-counts the population
# since most tenants have cc_pairs but almost none are due on
# any given cycle.
if prune_dispatched:
maybe_mark_tenant_active(tenant_id)
r.set(OnyxRedisSignals.BLOCK_PRUNING, 1, ex=_get_pruning_block_expiration())
# we want to run this less frequently than the overall task

View File

@@ -826,6 +826,12 @@ def translate_history_to_llm_format(
base64_data = img_file.to_base64()
image_url = f"data:{image_type};base64,{base64_data}"
content_parts.append(
TextContentPart(
type="text",
text=f"[attached image — file_id: {img_file.file_id}]",
)
)
image_part = ImageContentPart(
type="image_url",
image_url=ImageUrlDetail(

View File

@@ -1,4 +1,5 @@
import base64
import copy
import time
from collections.abc import Generator
from datetime import datetime
@@ -8,27 +9,58 @@ from typing import Any
from typing import cast
import requests
from pydantic import BaseModel
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from onyx.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
from onyx.configs.app_configs import GONG_CONNECTOR_START_TIME
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import CheckpointedConnector
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import TextSection
from onyx.utils.logger import setup_logger
logger = setup_logger()
class GongConnector(LoadConnector, PollConnector):
class GongConnectorCheckpoint(ConnectorCheckpoint):
# Resolved workspace IDs to iterate through.
# None means "not yet resolved" — first checkpoint call resolves them.
# Inner None means "no workspace filter" (fetch all).
workspace_ids: list[str | None] | None = None
# Index into workspace_ids for current workspace
workspace_index: int = 0
# Gong API cursor for current workspace's transcript pagination
cursor: str | None = None
# Cached time range — computed once, reused across checkpoint calls
time_range: tuple[str, str] | None = None
class _TranscriptPage(BaseModel):
"""One page of transcripts from /v2/calls/transcript."""
transcripts: list[dict[str, Any]]
next_cursor: str | None = None
class _CursorExpiredError(Exception):
"""Raised when Gong rejects a pagination cursor as expired.
Gong pagination cursors TTL is ~1 hour from the first request in a
pagination sequence, not from the last cursor fetch. Since checkpointed
connector runs can pause between invocations, a resumed run may encounter
an expired cursor and must restart the current workspace from scratch.
See https://visioneers.gong.io/integrations-77/pagination-cursor-expires-after-1-hours-even-for-a-new-cursor-1382
"""
class GongConnector(CheckpointedConnector[GongConnectorCheckpoint]):
BASE_URL = "https://api.gong.io"
MAX_CALL_DETAILS_ATTEMPTS = 6
CALL_DETAILS_DELAY = 30 # in seconds
@@ -38,13 +70,9 @@ class GongConnector(LoadConnector, PollConnector):
def __init__(
self,
workspaces: list[str] | None = None,
batch_size: int = INDEX_BATCH_SIZE,
continue_on_fail: bool = CONTINUE_ON_CONNECTOR_FAILURE,
hide_user_info: bool = False,
) -> None:
self.workspaces = workspaces
self.batch_size: int = batch_size
self.continue_on_fail = continue_on_fail
self.auth_token_basic: str | None = None
self.hide_user_info = hide_user_info
self._last_request_time: float = 0.0
@@ -98,67 +126,50 @@ class GongConnector(LoadConnector, PollConnector):
# Then the user input is treated as the name
return {**id_id_map, **name_id_map}
def _get_transcript_batches(
self, start_datetime: str | None = None, end_datetime: str | None = None
) -> Generator[list[dict[str, Any]], None, None]:
body: dict[str, dict] = {"filter": {}}
def _fetch_transcript_page(
self,
start_datetime: str | None,
end_datetime: str | None,
workspace_id: str | None,
cursor: str | None,
) -> _TranscriptPage:
"""Fetch one page of transcripts from the Gong API.
Raises _CursorExpiredError if Gong reports the pagination cursor
expired (TTL is ~1 hour from first request in the pagination sequence).
"""
body: dict[str, Any] = {"filter": {}}
if start_datetime:
body["filter"]["fromDateTime"] = start_datetime
if end_datetime:
body["filter"]["toDateTime"] = end_datetime
if workspace_id:
body["filter"]["workspaceId"] = workspace_id
if cursor:
body["cursor"] = cursor
# The batch_ids in the previous method appears to be batches of call_ids to process
# In this method, we will retrieve transcripts for them in batches.
transcripts: list[dict[str, Any]] = []
workspace_list = self.workspaces or [None]
workspace_map = self._get_workspace_id_map() if self.workspaces else {}
response = self._throttled_request(
"POST", GongConnector.make_url("/v2/calls/transcript"), json=body
)
# If no calls in the range, return empty
if response.status_code == 404:
return _TranscriptPage(transcripts=[])
for workspace in workspace_list:
if workspace:
logger.info(f"Updating Gong workspace: {workspace}")
workspace_id = workspace_map.get(workspace)
if not workspace_id:
logger.error(f"Invalid Gong workspace: {workspace}")
if not self.continue_on_fail:
raise ValueError(f"Invalid workspace: {workspace}")
continue
body["filter"]["workspaceId"] = workspace_id
else:
if "workspaceId" in body["filter"]:
del body["filter"]["workspaceId"]
if not response.ok:
# Cursor expiration comes back as a 4xx with this error message —
# detect it before raise_for_status so callers can restart the workspace.
if cursor and "cursor has expired" in response.text.lower():
raise _CursorExpiredError(response.text)
logger.error(f"Error fetching transcripts: {response.text}")
response.raise_for_status()
while True:
response = self._throttled_request(
"POST", GongConnector.make_url("/v2/calls/transcript"), json=body
)
# If no calls in the range, just break out
if response.status_code == 404:
break
data = response.json()
return _TranscriptPage(
transcripts=data.get("callTranscripts", []),
next_cursor=data.get("records", {}).get("cursor"),
)
try:
response.raise_for_status()
except Exception:
logger.error(f"Error fetching transcripts: {response.text}")
raise
data = response.json()
call_transcripts = data.get("callTranscripts", [])
transcripts.extend(call_transcripts)
while len(transcripts) >= self.batch_size:
yield transcripts[: self.batch_size]
transcripts = transcripts[self.batch_size :]
cursor = data.get("records", {}).get("cursor")
if cursor:
body["cursor"] = cursor
else:
break
if transcripts:
yield transcripts
def _get_call_details_by_ids(self, call_ids: list[str]) -> dict:
def _get_call_details_by_ids(self, call_ids: list[str]) -> dict[str, Any]:
body = {
"filter": {"callIds": call_ids},
"contentSelector": {"exposedFields": {"parties": True}},
@@ -176,6 +187,50 @@ class GongConnector(LoadConnector, PollConnector):
return call_to_metadata
def _fetch_call_details_with_retry(self, call_ids: list[str]) -> dict[str, Any]:
"""Fetch call details with retry for the Gong API race condition.
The Gong API has a known race where transcript call IDs don't immediately
appear in /v2/calls/extensive. Retries with exponential backoff, only
re-requesting the missing IDs on each attempt.
"""
call_details_map = self._get_call_details_by_ids(call_ids)
if set(call_ids) == set(call_details_map.keys()):
return call_details_map
for attempt in range(2, self.MAX_CALL_DETAILS_ATTEMPTS + 1):
missing_ids = list(set(call_ids) - set(call_details_map.keys()))
logger.warning(
f"_get_call_details_by_ids is missing call id's: current_attempt={attempt - 1} missing_call_ids={missing_ids}"
)
wait_seconds = self.CALL_DETAILS_DELAY * pow(2, attempt - 2)
logger.warning(
f"_get_call_details_by_ids waiting to retry: "
f"wait={wait_seconds}s "
f"current_attempt={attempt - 1} "
f"next_attempt={attempt} "
f"max_attempts={self.MAX_CALL_DETAILS_ATTEMPTS}"
)
time.sleep(wait_seconds)
# Only re-fetch the missing IDs, merge into existing results
new_details = self._get_call_details_by_ids(missing_ids)
call_details_map.update(new_details)
if set(call_ids) == set(call_details_map.keys()):
return call_details_map
missing_ids = list(set(call_ids) - set(call_details_map.keys()))
logger.error(
f"Giving up on missing call id's after "
f"{self.MAX_CALL_DETAILS_ATTEMPTS} attempts: "
f"missing_call_ids={missing_ids}"
f"proceeding with {len(call_details_map)} of "
f"{len(call_ids)} calls"
)
return call_details_map
@staticmethod
def _parse_parties(parties: list[dict]) -> dict[str, str]:
id_mapping = {}
@@ -196,186 +251,46 @@ class GongConnector(LoadConnector, PollConnector):
return id_mapping
def _fetch_calls(
self, start_datetime: str | None = None, end_datetime: str | None = None
) -> GenerateDocumentsOutput:
num_calls = 0
def _resolve_workspace_ids(self) -> list[str | None]:
"""Resolve configured workspace names/IDs to actual workspace IDs.
for transcript_batch in self._get_transcript_batches(
start_datetime, end_datetime
):
doc_batch: list[Document | HierarchyNode] = []
Returns a list of workspace IDs. If no workspaces are configured,
returns [None] to indicate "fetch all workspaces".
transcript_call_ids = cast(
list[str],
[t.get("callId") for t in transcript_batch if t.get("callId")],
Raises ValueError if workspaces are configured but none resolve —
we never silently widen scope to "fetch all" on misconfiguration,
because that could ingest an entire Gong account by mistake.
"""
if not self.workspaces:
return [None]
workspace_map = self._get_workspace_id_map()
resolved: list[str | None] = []
for workspace in self.workspaces:
workspace_id = workspace_map.get(workspace)
if not workspace_id:
logger.error(f"Invalid Gong workspace: {workspace}")
continue
resolved.append(workspace_id)
if not resolved:
raise ValueError(
f"No valid Gong workspaces found — check workspace names/IDs in connector config. Configured: {self.workspaces}"
)
call_details_map: dict[str, Any] = {}
return resolved
# There's a likely race condition in the API where a transcript will have a
# call id but the call to v2/calls/extensive will not return all of the id's
# retry with exponential backoff has been observed to mitigate this
# in ~2 minutes. After max attempts, proceed with whatever we have —
# the per-call loop below will skip missing IDs gracefully.
current_attempt = 0
while True:
current_attempt += 1
call_details_map = self._get_call_details_by_ids(transcript_call_ids)
if set(transcript_call_ids) == set(call_details_map.keys()):
# we got all the id's we were expecting ... break and continue
break
# we are missing some id's. Log and retry with exponential backoff
missing_call_ids = set(transcript_call_ids) - set(
call_details_map.keys()
)
logger.warning(
f"_get_call_details_by_ids is missing call id's: "
f"current_attempt={current_attempt} "
f"missing_call_ids={missing_call_ids}"
)
if current_attempt >= self.MAX_CALL_DETAILS_ATTEMPTS:
logger.error(
f"Giving up on missing call id's after "
f"{self.MAX_CALL_DETAILS_ATTEMPTS} attempts: "
f"missing_call_ids={missing_call_ids}"
f"proceeding with {len(call_details_map)} of "
f"{len(transcript_call_ids)} calls"
)
break
wait_seconds = self.CALL_DETAILS_DELAY * pow(2, current_attempt - 1)
logger.warning(
f"_get_call_details_by_ids waiting to retry: "
f"wait={wait_seconds}s "
f"current_attempt={current_attempt} "
f"next_attempt={current_attempt + 1} "
f"max_attempts={self.MAX_CALL_DETAILS_ATTEMPTS}"
)
time.sleep(wait_seconds)
# now we can iterate per call/transcript
for transcript in transcript_batch:
call_id = transcript.get("callId")
if not call_id or call_id not in call_details_map:
# NOTE(rkuo): seeing odd behavior where call_ids from the transcript
# don't have call details. adding error debugging logs to trace.
logger.error(
f"Couldn't get call information for Call ID: {call_id}"
)
if call_id:
logger.error(
f"Call debug info: call_id={call_id} "
f"call_ids={transcript_call_ids} "
f"call_details_map={call_details_map.keys()}"
)
if not self.continue_on_fail:
raise RuntimeError(
f"Couldn't get call information for Call ID: {call_id}"
)
continue
call_details = call_details_map[call_id]
call_metadata = call_details["metaData"]
call_time_str = call_metadata["started"]
call_title = call_metadata["title"]
logger.info(
f"{num_calls + 1}: Indexing Gong call id {call_id} from {call_time_str.split('T', 1)[0]}: {call_title}"
)
call_parties = cast(list[dict] | None, call_details.get("parties"))
if call_parties is None:
logger.error(f"Couldn't get parties for Call ID: {call_id}")
call_parties = []
id_to_name_map = self._parse_parties(call_parties)
# Keeping a separate dict here in case the parties info is incomplete
speaker_to_name: dict[str, str] = {}
transcript_text = ""
call_purpose = call_metadata["purpose"]
if call_purpose:
transcript_text += f"Call Description: {call_purpose}\n\n"
contents = transcript["transcript"]
for segment in contents:
speaker_id = segment.get("speakerId", "")
if speaker_id not in speaker_to_name:
if self.hide_user_info:
speaker_to_name[speaker_id] = (
f"User {len(speaker_to_name) + 1}"
)
else:
speaker_to_name[speaker_id] = id_to_name_map.get(
speaker_id, "Unknown"
)
speaker_name = speaker_to_name[speaker_id]
sentences = segment.get("sentences", {})
monolog = " ".join(
[sentence.get("text", "") for sentence in sentences]
)
transcript_text += f"{speaker_name}: {monolog}\n\n"
metadata = {}
if call_metadata.get("system"):
metadata["client"] = call_metadata.get("system")
# TODO calls have a clientUniqueId field, can pull that in later
doc_batch.append(
Document(
id=call_id,
sections=[
TextSection(link=call_metadata["url"], text=transcript_text)
],
source=DocumentSource.GONG,
# Should not ever be Untitled as a call cannot be made without a Title
semantic_identifier=call_title or "Untitled",
doc_updated_at=datetime.fromisoformat(call_time_str).astimezone(
timezone.utc
),
metadata={"client": call_metadata.get("system")},
)
)
num_calls += 1
yield doc_batch
logger.info(f"_fetch_calls finished: num_calls={num_calls}")
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
combined = (
f"{credentials['gong_access_key']}:{credentials['gong_access_key_secret']}"
)
self.auth_token_basic = base64.b64encode(combined.encode("utf-8")).decode(
"utf-8"
)
if self.auth_token_basic is None:
raise ConnectorMissingCredentialError("Gong")
self._session.headers.update(
{"Authorization": f"Basic {self.auth_token_basic}"}
)
return None
def load_from_state(self) -> GenerateDocumentsOutput:
return self._fetch_calls()
def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
@staticmethod
def _compute_time_range(
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
) -> tuple[str, str]:
"""Compute the start/end datetime strings for the Gong API filter,
applying GONG_CONNECTOR_START_TIME and the 1-day offset."""
end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)
# if this env variable is set, don't start from a timestamp before the specified
# start time
# TODO: remove this once this is globally available
if GONG_CONNECTOR_START_TIME:
special_start_datetime = datetime.fromisoformat(GONG_CONNECTOR_START_TIME)
special_start_datetime = special_start_datetime.replace(tzinfo=timezone.utc)
@@ -394,11 +309,186 @@ class GongConnector(LoadConnector, PollConnector):
# so adding a 1 day buffer and fetching by default till current time
start_one_day_offset = start_datetime - timedelta(days=1)
start_time = start_one_day_offset.isoformat()
end_time = end_datetime.isoformat()
end_time = datetime.fromtimestamp(end, tz=timezone.utc).isoformat()
return start_time, end_time
logger.info(f"Fetching Gong calls between {start_time} and {end_time}")
return self._fetch_calls(start_time, end_time)
def _process_transcripts(
self,
transcripts: list[dict[str, Any]],
) -> Generator[Document | ConnectorFailure, None, None]:
"""Process a batch of transcripts into Documents or ConnectorFailures."""
transcript_call_ids = cast(
list[str],
[t.get("callId") for t in transcripts if t.get("callId")],
)
call_details_map = self._fetch_call_details_with_retry(transcript_call_ids)
for transcript in transcripts:
call_id = transcript.get("callId")
if not call_id or call_id not in call_details_map:
logger.error(f"Couldn't get call information for Call ID: {call_id}")
if call_id:
logger.error(
f"Call debug info: call_id={call_id} "
f"call_ids={transcript_call_ids} "
f"call_details_map={call_details_map.keys()}"
)
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=call_id or "unknown",
),
failure_message=f"Couldn't get call information for Call ID: {call_id}",
)
continue
call_details = call_details_map[call_id]
call_metadata = call_details["metaData"]
call_time_str = call_metadata["started"]
call_title = call_metadata["title"]
logger.info(
f"Indexing Gong call id {call_id} from {call_time_str.split('T', 1)[0]}: {call_title}"
)
call_parties = cast(list[dict] | None, call_details.get("parties"))
if call_parties is None:
logger.error(f"Couldn't get parties for Call ID: {call_id}")
call_parties = []
id_to_name_map = self._parse_parties(call_parties)
speaker_to_name: dict[str, str] = {}
transcript_text = ""
call_purpose = call_metadata["purpose"]
if call_purpose:
transcript_text += f"Call Description: {call_purpose}\n\n"
contents = transcript["transcript"]
for segment in contents:
speaker_id = segment.get("speakerId", "")
if speaker_id not in speaker_to_name:
if self.hide_user_info:
speaker_to_name[speaker_id] = f"User {len(speaker_to_name) + 1}"
else:
speaker_to_name[speaker_id] = id_to_name_map.get(
speaker_id, "Unknown"
)
speaker_name = speaker_to_name[speaker_id]
sentences = segment.get("sentences", {})
monolog = " ".join([sentence.get("text", "") for sentence in sentences])
transcript_text += f"{speaker_name}: {monolog}\n\n"
yield Document(
id=call_id,
sections=[TextSection(link=call_metadata["url"], text=transcript_text)],
source=DocumentSource.GONG,
semantic_identifier=call_title or "Untitled",
doc_updated_at=datetime.fromisoformat(call_time_str).astimezone(
timezone.utc
),
metadata={"client": call_metadata.get("system")},
)
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
combined = (
f"{credentials['gong_access_key']}:{credentials['gong_access_key_secret']}"
)
self.auth_token_basic = base64.b64encode(combined.encode("utf-8")).decode(
"utf-8"
)
if self.auth_token_basic is None:
raise ConnectorMissingCredentialError("Gong")
self._session.headers.update(
{"Authorization": f"Basic {self.auth_token_basic}"}
)
return None
def build_dummy_checkpoint(self) -> GongConnectorCheckpoint:
return GongConnectorCheckpoint(has_more=True)
def validate_checkpoint_json(self, checkpoint_json: str) -> GongConnectorCheckpoint:
return GongConnectorCheckpoint.model_validate_json(checkpoint_json)
def load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: GongConnectorCheckpoint,
) -> CheckpointOutput[GongConnectorCheckpoint]:
checkpoint = copy.deepcopy(checkpoint)
# Step 1: Resolve workspace IDs on first call
if checkpoint.workspace_ids is None:
checkpoint.workspace_ids = self._resolve_workspace_ids()
checkpoint.time_range = self._compute_time_range(start, end)
checkpoint.has_more = True
return checkpoint
workspace_ids = checkpoint.workspace_ids
# If we've exhausted all workspaces, we're done
if checkpoint.workspace_index >= len(workspace_ids):
checkpoint.has_more = False
return checkpoint
# Use cached time range, falling back to computation if not cached
start_time, end_time = checkpoint.time_range or self._compute_time_range(
start, end
)
logger.info(
f"Fetching Gong calls between {start_time} and {end_time} "
f"(workspace {checkpoint.workspace_index + 1}/{len(workspace_ids)})"
)
workspace_id = workspace_ids[checkpoint.workspace_index]
# Step 2: Fetch one page of transcripts
try:
page = self._fetch_transcript_page(
start_datetime=start_time,
end_datetime=end_time,
workspace_id=workspace_id,
cursor=checkpoint.cursor,
)
except _CursorExpiredError:
# Gong cursors TTL ~1h from first request in the sequence. If the
# checkpoint paused long enough for the cursor to expire, restart
# the current workspace from the beginning of the time range.
# Document upserts are idempotent (keyed by call_id) so
# reprocessing is safe.
logger.warning(
f"Gong pagination cursor expired for workspace "
f"{checkpoint.workspace_index + 1}/{len(workspace_ids)}; "
f"restarting workspace from beginning of time range."
)
checkpoint.cursor = None
checkpoint.has_more = True
return checkpoint
# Step 3: Process transcripts into documents
if page.transcripts:
yield from self._process_transcripts(page.transcripts)
# Step 4: Update checkpoint state
if page.next_cursor:
# More pages in this workspace
checkpoint.cursor = page.next_cursor
checkpoint.has_more = True
else:
# This workspace is exhausted — advance to next
checkpoint.workspace_index += 1
checkpoint.cursor = None
checkpoint.has_more = checkpoint.workspace_index < len(workspace_ids)
return checkpoint
if __name__ == "__main__":
@@ -412,5 +502,13 @@ if __name__ == "__main__":
}
)
latest_docs = connector.load_from_state()
print(next(latest_docs))
checkpoint = connector.build_dummy_checkpoint()
while checkpoint.has_more:
doc_generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(doc_generator)
print(item)
except StopIteration as e:
checkpoint = e.value
print(f"Checkpoint: {checkpoint}")

View File

@@ -952,6 +952,7 @@ class Document(Base):
semantic_id: Mapped[str] = mapped_column(NullFilteredString)
# First Section's link
link: Mapped[str | None] = mapped_column(NullFilteredString, nullable=True)
file_id: Mapped[str | None] = mapped_column(String, nullable=True)
# The updated time is also used as a measure of the last successful state of the doc
# pulled from the source (to help skip reindexing already updated docs in case of

View File

@@ -0,0 +1,112 @@
from collections import Counter
from datetime import date
from itertools import zip_longest
from dateutil.parser import parse as parse_dt
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from onyx.utils.csv_utils import ParsedRow
CATEGORICAL_DISTINCT_THRESHOLD = 20
ID_NAME_TOKENS = {"id", "uuid", "uid", "guid", "key"}
class SheetAnalysis(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
row_count: int
num_cols: int
numeric_cols: list[int] = Field(default_factory=list)
categorical_cols: list[int] = Field(default_factory=list)
numeric_values: dict[int, list[float]] = Field(default_factory=dict)
categorical_counts: dict[int, Counter[str]] = Field(default_factory=dict)
id_col: int | None = None
date_min: date | None = None
date_max: date | None = None
@property
def categorical_values(self) -> dict[int, list[str]]:
return {ci: list(c.keys()) for ci, c in self.categorical_counts.items()}
def analyze_sheet(headers: list[str], parsed_rows: list[ParsedRow]) -> SheetAnalysis:
a = SheetAnalysis(row_count=len(parsed_rows), num_cols=len(headers))
columns = zip_longest(*(pr.row for pr in parsed_rows), fillvalue="")
for idx, (header, raw_values) in enumerate(zip(headers, columns)):
values = [v.strip() for v in raw_values if v.strip()]
if not values:
continue
# Identifier: id-named column whose values are all unique. Detected
# before classification so a numeric `id` column still gets flagged.
distinct = set(values)
if a.id_col is None and len(distinct) == len(values) and _is_id_name(header):
a.id_col = idx
# Numeric: every value parses as a number.
nums = _try_all_numeric(values)
if nums is not None:
a.numeric_cols.append(idx)
a.numeric_values[idx] = nums
continue
# Date: every value parses as a date — fold into the sheet-wide range.
dates = _try_all_dates(values)
if dates:
dmin = min(dates)
dmax = max(dates)
a.date_min = dmin if a.date_min is None else min(a.date_min, dmin)
a.date_max = dmax if a.date_max is None else max(a.date_max, dmax)
continue
# Categorical: low-cardinality column — keep counts for samples + top values.
if len(distinct) <= max(CATEGORICAL_DISTINCT_THRESHOLD, len(values) // 2):
a.categorical_cols.append(idx)
a.categorical_counts[idx] = Counter(values)
return a
def _try_all_numeric(values: list[str]) -> list[float] | None:
parsed: list[float] = []
for v in values:
n = _parse_num(v)
if n is None:
return None
parsed.append(n)
return parsed
def _parse_num(value: str) -> float | None:
try:
return float(value.replace(",", ""))
except ValueError:
return None
def _try_all_dates(values: list[str]) -> list[date] | None:
parsed: list[date] = []
for v in values:
d = _try_date(v)
if d is None:
return None
parsed.append(d)
return parsed
def _try_date(value: str) -> date | None:
if len(value) < 4 or not any(c in value for c in "-/T"):
return None
try:
return parse_dt(value).date()
except (ValueError, OverflowError, TypeError):
return None
def _is_id_name(name: str) -> bool:
lowered = name.lower().strip().replace("-", "_")
return lowered in ID_NAME_TOKENS or any(
lowered.endswith(f"_{t}") for t in ID_NAME_TOKENS
)

View File

@@ -1,51 +1,29 @@
"""Per-section sheet descriptor chunk builder."""
from datetime import date
from itertools import zip_longest
from dateutil.parser import parse as parse_dt
from pydantic import BaseModel
from pydantic import Field
from onyx.connectors.models import Section
from onyx.indexing.chunking.tabular_section_chunker.analysis import SheetAnalysis
from onyx.indexing.chunking.tabular_section_chunker.util import label
from onyx.indexing.chunking.tabular_section_chunker.util import pack_lines
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.natural_language_processing.utils import count_tokens
from onyx.utils.csv_utils import parse_csv_string
from onyx.utils.csv_utils import ParsedRow
from onyx.utils.csv_utils import read_csv_header
MAX_NUMERIC_COLS = 12
MAX_CATEGORICAL_COLS = 6
MAX_CATEGORICAL_WITH_SAMPLES = 4
MAX_DISTINCT_SAMPLES = 8
CATEGORICAL_DISTINCT_THRESHOLD = 20
ID_NAME_TOKENS = {"id", "uuid", "uid", "guid", "key"}
class SheetAnalysis(BaseModel):
row_count: int
num_cols: int
numeric_cols: list[int] = Field(default_factory=list)
categorical_cols: list[int] = Field(default_factory=list)
categorical_values: dict[int, list[str]] = Field(default_factory=dict)
id_col: int | None = None
date_min: date | None = None
date_max: date | None = None
def build_sheet_descriptor_chunks(
section: Section,
headers: list[str],
analysis: SheetAnalysis,
heading: str,
tokenizer: BaseTokenizer,
max_tokens: int,
) -> list[str]:
"""Build sheet descriptor chunk(s) from a parsed CSV section.
"""Build sheet descriptor chunk(s) from a pre-parsed sheet.
Output (lines joined by "\\n"; lines that overflow ``max_tokens`` on
their own are skipped; ``section.heading`` is prepended to every
emitted chunk so retrieval keeps sheet context after a split):
their own are skipped; ``heading`` is prepended to every emitted
chunk so retrieval keeps sheet context after a split):
{section.heading} # optional
{heading} # optional
Sheet overview.
This sheet has {N} rows and {M} columns.
Columns: {col1}, {col2}, ...
@@ -55,25 +33,21 @@ def build_sheet_descriptor_chunks(
Identifier column: {col}. # optional
Values seen in {col}: {v1}, {v2}, ... # optional, repeated
"""
text = section.text or ""
parsed_rows = list(parse_csv_string(text))
headers = parsed_rows[0].header if parsed_rows else read_csv_header(text)
if not headers:
return []
a = _analyze(headers, parsed_rows)
lines = [
_overview_line(a),
_overview_line(analysis),
_columns_line(headers),
_time_range_line(a),
_numeric_cols_line(headers, a),
_categorical_cols_line(headers, a),
_id_col_line(headers, a),
_values_seen_line(headers, a),
_time_range_line(analysis),
_numeric_cols_line(headers, analysis),
_categorical_cols_line(headers, analysis),
_id_col_line(headers, analysis),
_values_seen_line(headers, analysis),
]
return _pack_lines(
return pack_lines(
[line for line in lines if line],
prefix=section.heading or "",
prefix=heading,
tokenizer=tokenizer,
max_tokens=max_tokens,
)
@@ -87,7 +61,7 @@ def _overview_line(a: SheetAnalysis) -> str:
def _columns_line(headers: list[str]) -> str:
return "Columns: " + ", ".join(_label(h) for h in headers)
return "Columns: " + ", ".join(label(h) for h in headers)
def _time_range_line(a: SheetAnalysis) -> str:
@@ -99,7 +73,7 @@ def _time_range_line(a: SheetAnalysis) -> str:
def _numeric_cols_line(headers: list[str], a: SheetAnalysis) -> str:
if not a.numeric_cols:
return ""
names = ", ".join(_label(headers[i]) for i in a.numeric_cols[:MAX_NUMERIC_COLS])
names = ", ".join(label(headers[i]) for i in a.numeric_cols[:MAX_NUMERIC_COLS])
return f"Numeric columns (aggregatable by sum, average, min, max): {names}"
@@ -107,7 +81,7 @@ def _categorical_cols_line(headers: list[str], a: SheetAnalysis) -> str:
if not a.categorical_cols:
return ""
names = ", ".join(
_label(headers[i]) for i in a.categorical_cols[:MAX_CATEGORICAL_COLS]
label(headers[i]) for i in a.categorical_cols[:MAX_CATEGORICAL_COLS]
)
return f"Categorical columns (groupable, can be counted by value): {names}"
@@ -115,7 +89,7 @@ def _categorical_cols_line(headers: list[str], a: SheetAnalysis) -> str:
def _id_col_line(headers: list[str], a: SheetAnalysis) -> str:
if a.id_col is None:
return ""
return f"Identifier column: {_label(headers[a.id_col])}."
return f"Identifier column: {label(headers[a.id_col])}."
def _values_seen_line(headers: list[str], a: SheetAnalysis) -> str:
@@ -123,106 +97,5 @@ def _values_seen_line(headers: list[str], a: SheetAnalysis) -> str:
for ci in a.categorical_cols[:MAX_CATEGORICAL_WITH_SAMPLES]:
sample = sorted(a.categorical_values.get(ci, []))[:MAX_DISTINCT_SAMPLES]
if sample:
rows.append(f"Values seen in {_label(headers[ci])}: " + ", ".join(sample))
rows.append(f"Values seen in {label(headers[ci])}: " + ", ".join(sample))
return "\n".join(rows)
def _label(name: str) -> str:
return f"{name} ({name.replace('_', ' ')})" if "_" in name else name
def _is_numeric(value: str) -> bool:
try:
float(value.replace(",", ""))
return True
except ValueError:
return False
def _try_date(value: str) -> date | None:
if len(value) < 4 or not any(c in value for c in "-/T"):
return None
try:
return parse_dt(value).date()
except (ValueError, OverflowError, TypeError):
return None
def _is_id_name(name: str) -> bool:
lowered = name.lower().strip().replace("-", "_")
return lowered in ID_NAME_TOKENS or any(
lowered.endswith(f"_{t}") for t in ID_NAME_TOKENS
)
def _analyze(headers: list[str], parsed_rows: list[ParsedRow]) -> SheetAnalysis:
a = SheetAnalysis(row_count=len(parsed_rows), num_cols=len(headers))
columns = zip_longest(*(pr.row for pr in parsed_rows), fillvalue="")
for idx, (header, raw_values) in enumerate(zip(headers, columns)):
# Pull the column's non-empty values; skip if the column is blank.
values = [v.strip() for v in raw_values if v.strip()]
if not values:
continue
# Identifier: id-named column whose values are all unique. Detected
# before classification so a numeric `id` column still gets flagged.
distinct = set(values)
if a.id_col is None and len(distinct) == len(values) and _is_id_name(header):
a.id_col = idx
# Numeric: every value parses as a number.
if all(_is_numeric(v) for v in values):
a.numeric_cols.append(idx)
continue
# Date: every value parses as a date — fold into the sheet-wide range.
dates = [_try_date(v) for v in values]
if all(d is not None for d in dates):
dmin = min(filter(None, dates))
dmax = max(filter(None, dates))
a.date_min = dmin if a.date_min is None else min(a.date_min, dmin)
a.date_max = dmax if a.date_max is None else max(a.date_max, dmax)
continue
# Categorical: low-cardinality column — keep distinct values for samples.
if len(distinct) <= max(CATEGORICAL_DISTINCT_THRESHOLD, len(values) // 2):
a.categorical_cols.append(idx)
a.categorical_values[idx] = list(distinct)
return a
def _pack_lines(
lines: list[str],
prefix: str,
tokenizer: BaseTokenizer,
max_tokens: int,
) -> list[str]:
"""Greedily pack lines into chunks ≤ max_tokens. Lines that on
their own exceed max_tokens (after accounting for the prefix) are
skipped. ``prefix`` is prepended to every emitted chunk."""
prefix_tokens = count_tokens(prefix, tokenizer) + 1 if prefix else 0
budget = max_tokens - prefix_tokens
chunks: list[str] = []
current: list[str] = []
current_tokens = 0
for line in lines:
line_tokens = count_tokens(line, tokenizer)
if line_tokens > budget:
continue
sep = 1 if current else 0
if current_tokens + sep + line_tokens > budget:
chunks.append(_join_with_prefix(current, prefix))
current = [line]
current_tokens = line_tokens
else:
current.append(line)
current_tokens += sep + line_tokens
if current:
chunks.append(_join_with_prefix(current, prefix))
return chunks
def _join_with_prefix(lines: list[str], prefix: str) -> str:
body = "\n".join(lines)
return f"{prefix}\n{body}" if prefix else body

View File

@@ -7,14 +7,19 @@ from onyx.indexing.chunking.section_chunker import AccumulatorState
from onyx.indexing.chunking.section_chunker import ChunkPayload
from onyx.indexing.chunking.section_chunker import SectionChunker
from onyx.indexing.chunking.section_chunker import SectionChunkerOutput
from onyx.indexing.chunking.tabular_section_chunker.analysis import analyze_sheet
from onyx.indexing.chunking.tabular_section_chunker.sheet_descriptor import (
build_sheet_descriptor_chunks,
)
from onyx.indexing.chunking.tabular_section_chunker.total_descriptor import (
build_total_descriptor_chunks,
)
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.natural_language_processing.utils import count_tokens
from onyx.natural_language_processing.utils import split_text_by_tokens
from onyx.utils.csv_utils import parse_csv_string
from onyx.utils.csv_utils import ParsedRow
from onyx.utils.csv_utils import read_csv_header
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -230,24 +235,38 @@ class TabularChunker(SectionChunker):
) -> SectionChunkerOutput:
payloads = accumulator.flush_to_list()
parsed_rows = list(parse_csv_string(section.text or ""))
sheet_header = section.heading or ""
text = section.text or ""
parsed_rows = list(parse_csv_string(text))
headers = parsed_rows[0].header if parsed_rows else read_csv_header(text)
heading = section.heading or ""
chunk_texts: list[str] = []
if parsed_rows:
chunk_texts.extend(
parse_to_chunks(
rows=parsed_rows,
sheet_header=sheet_header,
sheet_header=heading,
tokenizer=self.tokenizer,
max_tokens=content_token_limit,
)
)
if not self.ignore_metadata_chunks:
if not self.ignore_metadata_chunks and headers:
analysis = analyze_sheet(headers, parsed_rows)
chunk_texts.extend(
build_sheet_descriptor_chunks(
section=section,
headers=headers,
analysis=analysis,
heading=heading,
tokenizer=self.tokenizer,
max_tokens=content_token_limit,
)
)
chunk_texts.extend(
build_total_descriptor_chunks(
headers=headers,
analysis=analysis,
heading=heading,
tokenizer=self.tokenizer,
max_tokens=content_token_limit,
)

View File

@@ -0,0 +1,70 @@
from collections import Counter
from onyx.indexing.chunking.tabular_section_chunker.analysis import SheetAnalysis
from onyx.indexing.chunking.tabular_section_chunker.util import label
from onyx.indexing.chunking.tabular_section_chunker.util import pack_lines
from onyx.natural_language_processing.utils import BaseTokenizer
TOTALS_HEADER = (
"Totals and overall aggregates across all rows. This sheet can answer "
"whole-dataset questions about total, overall, grand total, sum across "
"all, average, combined, mean, minimum, maximum, and count of values."
)
def build_total_descriptor_chunks(
headers: list[str],
analysis: SheetAnalysis,
heading: str,
tokenizer: BaseTokenizer,
max_tokens: int,
) -> list[str]:
if analysis.row_count == 0:
return []
lines: list[str] = []
for idx in analysis.numeric_cols:
lines.append(_numeric_totals_line(headers[idx], analysis.numeric_values[idx]))
for idx in analysis.categorical_cols:
line = _categorical_top_line(headers[idx], analysis.categorical_counts[idx])
if line:
lines.append(line)
# No meaningful information - leave early
if not lines:
return []
lines.append(f"Total row count: {analysis.row_count}.")
prefix = (f"{heading}\n" if heading else "") + TOTALS_HEADER
return pack_lines(
lines=lines,
prefix=prefix,
tokenizer=tokenizer,
max_tokens=max_tokens,
)
def _numeric_totals_line(name: str, values: list[float]) -> str:
total = sum(values)
avg = total / len(values)
return (
f"Column {label(name)}: total (sum across all rows) = {_fmt(total)}, "
f"average = {_fmt(avg)}, minimum = {_fmt(min(values))}, "
f"maximum = {_fmt(max(values))}, count = {len(values)}."
)
def _categorical_top_line(name: str, counts: Counter[str]) -> str:
top = counts.most_common(1)
if not top:
return ""
val, n = top[0]
return f"Column {label(name)} most frequent value: {val} ({n} occurrences)."
def _fmt(num: float) -> str:
if abs(num) < 1e15 and num == int(num):
return str(int(num))
return f"{num:.6g}"

View File

@@ -0,0 +1,48 @@
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.natural_language_processing.utils import count_tokens
def label(name: str) -> str:
"""Render a column name with a space-substituted friendly alias in
parens for underscored headers so retrieval matches either surface
form (e.g. ``MTTR_hours`` → ``MTTR_hours (MTTR hours)``)."""
return f"{name} ({name.replace('_', ' ')})" if "_" in name else name
def pack_lines(
lines: list[str],
prefix: str,
tokenizer: BaseTokenizer,
max_tokens: int,
) -> list[str]:
"""Greedily pack ``lines`` into chunks ≤ ``max_tokens``, prepending
``prefix`` (verbatim) to every emitted chunk. Lines whose own token
count exceeds the post-prefix budget are skipped. Callers assemble
the full prefix (heading, header text, etc.) before calling.
"""
prefix_tokens = count_tokens(prefix, tokenizer) + 1 if prefix else 0
budget = max_tokens - prefix_tokens
chunks: list[str] = []
current: list[str] = []
current_tokens = 0
for line in lines:
line_tokens = count_tokens(line, tokenizer)
if line_tokens > budget:
continue
sep = 1 if current else 0
if current_tokens + sep + line_tokens > budget:
chunks.append(_join_with_prefix(current, prefix))
current = [line]
current_tokens = line_tokens
else:
current.append(line)
current_tokens += sep + line_tokens
if current:
chunks.append(_join_with_prefix(current, prefix))
return chunks
def _join_with_prefix(lines: list[str], prefix: str) -> str:
body = "\n".join(lines)
return f"{prefix}\n{body}" if prefix else body

View File

@@ -1516,6 +1516,10 @@
"display_name": "Claude Opus 4.6",
"model_vendor": "anthropic"
},
"claude-opus-4-7": {
"display_name": "Claude Opus 4.7",
"model_vendor": "anthropic"
},
"claude-opus-4-5-20251101": {
"display_name": "Claude Opus 4.5",
"model_vendor": "anthropic",

View File

@@ -46,6 +46,15 @@ ANTHROPIC_REASONING_EFFORT_BUDGET: dict[ReasoningEffort, int] = {
ReasoningEffort.HIGH: 4096,
}
# Newer Anthropic models (Claude Opus 4.7+) use adaptive thinking with
# output_config.effort instead of thinking.type.enabled + budget_tokens.
ANTHROPIC_ADAPTIVE_REASONING_EFFORT: dict[ReasoningEffort, str] = {
ReasoningEffort.AUTO: "medium",
ReasoningEffort.LOW: "low",
ReasoningEffort.MEDIUM: "medium",
ReasoningEffort.HIGH: "high",
}
# Content part structures for multimodal messages
# The classes in this mirror the OpenAI Chat Completions message types and work well with routers like LiteLLM

View File

@@ -23,6 +23,7 @@ from onyx.llm.interfaces import ToolChoiceOptions
from onyx.llm.model_response import ModelResponse
from onyx.llm.model_response import ModelResponseStream
from onyx.llm.model_response import Usage
from onyx.llm.models import ANTHROPIC_ADAPTIVE_REASONING_EFFORT
from onyx.llm.models import ANTHROPIC_REASONING_EFFORT_BUDGET
from onyx.llm.models import OPENAI_REASONING_EFFORT
from onyx.llm.request_context import get_llm_mock_response
@@ -67,8 +68,13 @@ STANDARD_MAX_TOKENS_KWARG = "max_completion_tokens"
_VERTEX_ANTHROPIC_MODELS_REJECTING_OUTPUT_CONFIG = (
"claude-opus-4-5",
"claude-opus-4-6",
"claude-opus-4-7",
)
# Anthropic models that require the adaptive thinking API (thinking.type.adaptive
# + output_config.effort) instead of the legacy thinking.type.enabled + budget_tokens.
_ANTHROPIC_ADAPTIVE_THINKING_MODELS = ("claude-opus-4-7",)
class LLMTimeoutError(Exception):
"""
@@ -230,6 +236,14 @@ def _is_vertex_model_rejecting_output_config(model_name: str) -> bool:
)
def _anthropic_uses_adaptive_thinking(model_name: str) -> bool:
normalized_model_name = model_name.lower()
return any(
adaptive_model in normalized_model_name
for adaptive_model in _ANTHROPIC_ADAPTIVE_THINKING_MODELS
)
class LitellmLLM(LLM):
"""Uses Litellm library to allow easy configuration to use a multitude of LLMs
See https://python.langchain.com/docs/integrations/chat/litellm"""
@@ -509,10 +523,6 @@ class LitellmLLM(LLM):
}
elif is_claude_model:
budget_tokens: int | None = ANTHROPIC_REASONING_EFFORT_BUDGET.get(
reasoning_effort
)
# Anthropic requires every assistant message with tool_use
# blocks to start with a thinking block that carries a
# cryptographic signature. We don't preserve those blocks
@@ -520,24 +530,35 @@ class LitellmLLM(LLM):
# contains tool-calling assistant messages. LiteLLM's
# modify_params workaround doesn't cover all providers
# (notably Bedrock).
can_enable_thinking = (
budget_tokens is not None
and not _prompt_contains_tool_call_history(prompt)
)
has_tool_call_history = _prompt_contains_tool_call_history(prompt)
if can_enable_thinking:
assert budget_tokens is not None # mypy
if max_tokens is not None:
# Anthropic has a weird rule where max token has to be at least as much as budget tokens if set
# and the minimum budget tokens is 1024
# Will note that overwriting a developer set max tokens is not ideal but is the best we can do for now
# It is better to allow the LLM to output more reasoning tokens even if it results in a fairly small tool
# call as compared to reducing the budget for reasoning.
max_tokens = max(budget_tokens + 1, max_tokens)
optional_kwargs["thinking"] = {
"type": "enabled",
"budget_tokens": budget_tokens,
}
if _anthropic_uses_adaptive_thinking(self.config.model_name):
# Newer Anthropic models (Claude Opus 4.7+) reject
# thinking.type.enabled — they require the adaptive
# thinking config with output_config.effort.
if not has_tool_call_history:
optional_kwargs["thinking"] = {"type": "adaptive"}
optional_kwargs["output_config"] = {
"effort": ANTHROPIC_ADAPTIVE_REASONING_EFFORT[
reasoning_effort
],
}
else:
budget_tokens: int | None = ANTHROPIC_REASONING_EFFORT_BUDGET.get(
reasoning_effort
)
if budget_tokens is not None and not has_tool_call_history:
if max_tokens is not None:
# Anthropic has a weird rule where max token has to be at least as much as budget tokens if set
# and the minimum budget tokens is 1024
# Will note that overwriting a developer set max tokens is not ideal but is the best we can do for now
# It is better to allow the LLM to output more reasoning tokens even if it results in a fairly small tool
# call as compared to reducing the budget for reasoning.
max_tokens = max(budget_tokens + 1, max_tokens)
optional_kwargs["thinking"] = {
"type": "enabled",
"budget_tokens": budget_tokens,
}
# LiteLLM just does some mapping like this anyway but is incomplete for Anthropic
optional_kwargs.pop("reasoning_effort", None)

View File

@@ -1,6 +1,6 @@
{
"version": "1.1",
"updated_at": "2026-03-05T00:00:00Z",
"version": "1.2",
"updated_at": "2026-04-16T00:00:00Z",
"providers": {
"openai": {
"default_model": { "name": "gpt-5.4" },
@@ -10,8 +10,12 @@
]
},
"anthropic": {
"default_model": "claude-opus-4-6",
"default_model": "claude-opus-4-7",
"additional_visible_models": [
{
"name": "claude-opus-4-7",
"display_name": "Claude Opus 4.7"
},
{
"name": "claude-opus-4-6",
"display_name": "Claude Opus 4.6"

View File

@@ -65,8 +65,9 @@ IMPORTANT: each call to this tool is independent. Variables from previous calls
GENERATE_IMAGE_GUIDANCE = """
## generate_image
NEVER use generate_image unless the user specifically requests an image.
For edits/variations of a previously generated image, pass `reference_image_file_ids` with
the `file_id` values returned by earlier `generate_image` tool results.
To edit, restyle, or vary an existing image, pass its file_id in `reference_image_file_ids`. \
File IDs come from `[attached image — file_id: <id>]` tags on user-attached images or from prior `generate_image` tool results — never invent one. \
Leave `reference_image_file_ids` unset for a fresh generation.
""".lstrip()
MEMORY_GUIDANCE = """

View File

@@ -126,6 +126,8 @@ class TenantRedis(redis.Redis):
"srem",
"scard",
"zadd",
"zrange",
"zrevrange",
"zrangebyscore",
"zremrangebyscore",
"zscore",

View File

@@ -208,12 +208,6 @@ class PythonToolOverrideKwargs(BaseModel):
chat_files: list[ChatFile] = []
class ImageGenerationToolOverrideKwargs(BaseModel):
"""Override kwargs for image generation tool calls."""
recent_generated_image_file_ids: list[str] = []
class SearchToolRunContext(BaseModel):
emitter: Emitter

View File

@@ -26,7 +26,6 @@ from onyx.server.query_and_chat.streaming_models import ImageGenerationToolHeart
from onyx.server.query_and_chat.streaming_models import ImageGenerationToolStart
from onyx.server.query_and_chat.streaming_models import Packet
from onyx.tools.interface import Tool
from onyx.tools.models import ImageGenerationToolOverrideKwargs
from onyx.tools.models import ToolCallException
from onyx.tools.models import ToolExecutionException
from onyx.tools.models import ToolResponse
@@ -48,7 +47,7 @@ PROMPT_FIELD = "prompt"
REFERENCE_IMAGE_FILE_IDS_FIELD = "reference_image_file_ids"
class ImageGenerationTool(Tool[ImageGenerationToolOverrideKwargs | None]):
class ImageGenerationTool(Tool[None]):
NAME = "generate_image"
DESCRIPTION = "Generate an image based on a prompt. Do not use unless the user specifically requests an image."
DISPLAY_NAME = "Image Generation"
@@ -142,8 +141,11 @@ class ImageGenerationTool(Tool[ImageGenerationToolOverrideKwargs | None]):
REFERENCE_IMAGE_FILE_IDS_FIELD: {
"type": "array",
"description": (
"Optional image file IDs to use as reference context for edits/variations. "
"Use the file_id values returned by previous generate_image calls."
"Optional file_ids of existing images to edit or use as reference;"
" the first is the primary edit source."
" Get file_ids from `[attached image — file_id: <id>]` tags on"
" user-attached images or from prior generate_image tool responses."
" Omit for a fresh, unrelated generation."
),
"items": {
"type": "string",
@@ -254,41 +256,31 @@ class ImageGenerationTool(Tool[ImageGenerationToolOverrideKwargs | None]):
def _resolve_reference_image_file_ids(
self,
llm_kwargs: dict[str, Any],
override_kwargs: ImageGenerationToolOverrideKwargs | None,
) -> list[str]:
raw_reference_ids = llm_kwargs.get(REFERENCE_IMAGE_FILE_IDS_FIELD)
if raw_reference_ids is not None:
if not isinstance(raw_reference_ids, list) or not all(
isinstance(file_id, str) for file_id in raw_reference_ids
):
raise ToolCallException(
message=(
f"Invalid {REFERENCE_IMAGE_FILE_IDS_FIELD}: expected array of strings, got {type(raw_reference_ids)}"
),
llm_facing_message=(
f"The '{REFERENCE_IMAGE_FILE_IDS_FIELD}' field must be an array of file_id strings."
),
)
reference_image_file_ids = [
file_id.strip() for file_id in raw_reference_ids if file_id.strip()
]
elif (
override_kwargs
and override_kwargs.recent_generated_image_file_ids
and self.img_provider.supports_reference_images
):
# If no explicit reference was provided, default to the most recently generated image.
reference_image_file_ids = [
override_kwargs.recent_generated_image_file_ids[-1]
]
else:
reference_image_file_ids = []
if raw_reference_ids is None:
# No references requested — plain generation.
return []
# Deduplicate while preserving order.
if not isinstance(raw_reference_ids, list) or not all(
isinstance(file_id, str) for file_id in raw_reference_ids
):
raise ToolCallException(
message=(
f"Invalid {REFERENCE_IMAGE_FILE_IDS_FIELD}: expected array of strings, got {type(raw_reference_ids)}"
),
llm_facing_message=(
f"The '{REFERENCE_IMAGE_FILE_IDS_FIELD}' field must be an array of file_id strings."
),
)
# Deduplicate while preserving order (first occurrence wins, so the
# LLM's intended "primary edit source" stays at index 0).
deduped_reference_image_ids: list[str] = []
seen_ids: set[str] = set()
for file_id in reference_image_file_ids:
if file_id in seen_ids:
for file_id in raw_reference_ids:
file_id = file_id.strip()
if not file_id or file_id in seen_ids:
continue
seen_ids.add(file_id)
deduped_reference_image_ids.append(file_id)
@@ -302,14 +294,14 @@ class ImageGenerationTool(Tool[ImageGenerationToolOverrideKwargs | None]):
f"Reference images requested but provider '{self.provider}' does not support image-editing context."
),
llm_facing_message=(
"This image provider does not support editing from previous image context. "
"This image provider does not support editing from existing images. "
"Try text-only generation, or switch to a provider/model that supports image edits."
),
)
max_reference_images = self.img_provider.max_reference_images
if max_reference_images > 0:
return deduped_reference_image_ids[-max_reference_images:]
return deduped_reference_image_ids[:max_reference_images]
return deduped_reference_image_ids
def _load_reference_images(
@@ -358,7 +350,7 @@ class ImageGenerationTool(Tool[ImageGenerationToolOverrideKwargs | None]):
def run(
self,
placement: Placement,
override_kwargs: ImageGenerationToolOverrideKwargs | None = None,
override_kwargs: None = None, # noqa: ARG002
**llm_kwargs: Any,
) -> ToolResponse:
if PROMPT_FIELD not in llm_kwargs:
@@ -373,7 +365,6 @@ class ImageGenerationTool(Tool[ImageGenerationToolOverrideKwargs | None]):
shape = ImageShape(llm_kwargs.get("shape", ImageShape.SQUARE.value))
reference_image_file_ids = self._resolve_reference_image_file_ids(
llm_kwargs=llm_kwargs,
override_kwargs=override_kwargs,
)
reference_images = self._load_reference_images(reference_image_file_ids)

View File

@@ -1,4 +1,3 @@
import json
import traceback
from collections import defaultdict
from typing import Any
@@ -14,7 +13,6 @@ from onyx.server.query_and_chat.streaming_models import SectionEnd
from onyx.tools.interface import Tool
from onyx.tools.models import ChatFile
from onyx.tools.models import ChatMinimalTextMessage
from onyx.tools.models import ImageGenerationToolOverrideKwargs
from onyx.tools.models import OpenURLToolOverrideKwargs
from onyx.tools.models import ParallelToolCallResponse
from onyx.tools.models import PythonToolOverrideKwargs
@@ -24,9 +22,6 @@ from onyx.tools.models import ToolCallKickoff
from onyx.tools.models import ToolExecutionException
from onyx.tools.models import ToolResponse
from onyx.tools.models import WebSearchToolOverrideKwargs
from onyx.tools.tool_implementations.images.image_generation_tool import (
ImageGenerationTool,
)
from onyx.tools.tool_implementations.memory.memory_tool import MemoryTool
from onyx.tools.tool_implementations.memory.memory_tool import MemoryToolOverrideKwargs
from onyx.tools.tool_implementations.open_url.open_url_tool import OpenURLTool
@@ -110,63 +105,6 @@ def _merge_tool_calls(tool_calls: list[ToolCallKickoff]) -> list[ToolCallKickoff
return merged_calls
def _extract_image_file_ids_from_tool_response_message(
message: str,
) -> list[str]:
try:
parsed_message = json.loads(message)
except json.JSONDecodeError:
return []
parsed_items: list[Any] = (
parsed_message if isinstance(parsed_message, list) else [parsed_message]
)
file_ids: list[str] = []
for item in parsed_items:
if not isinstance(item, dict):
continue
file_id = item.get("file_id")
if isinstance(file_id, str):
file_ids.append(file_id)
return file_ids
def _extract_recent_generated_image_file_ids(
message_history: list[ChatMessageSimple],
) -> list[str]:
tool_name_by_tool_call_id: dict[str, str] = {}
recent_image_file_ids: list[str] = []
seen_file_ids: set[str] = set()
for message in message_history:
if message.message_type == MessageType.ASSISTANT and message.tool_calls:
for tool_call in message.tool_calls:
tool_name_by_tool_call_id[tool_call.tool_call_id] = tool_call.tool_name
continue
if (
message.message_type != MessageType.TOOL_CALL_RESPONSE
or not message.tool_call_id
):
continue
tool_name = tool_name_by_tool_call_id.get(message.tool_call_id)
if tool_name != ImageGenerationTool.NAME:
continue
for file_id in _extract_image_file_ids_from_tool_response_message(
message.message
):
if file_id in seen_file_ids:
continue
seen_file_ids.add(file_id)
recent_image_file_ids.append(file_id)
return recent_image_file_ids
def _safe_run_single_tool(
tool: Tool,
tool_call: ToolCallKickoff,
@@ -386,9 +324,6 @@ def run_tool_calls(
url_to_citation: dict[str, int] = {
url: citation_num for citation_num, url in citation_mapping.items()
}
recent_generated_image_file_ids = _extract_recent_generated_image_file_ids(
message_history
)
# Prepare all tool calls with their override_kwargs
# Each tool gets a unique starting citation number to avoid conflicts when running in parallel
@@ -405,7 +340,6 @@ def run_tool_calls(
| WebSearchToolOverrideKwargs
| OpenURLToolOverrideKwargs
| PythonToolOverrideKwargs
| ImageGenerationToolOverrideKwargs
| MemoryToolOverrideKwargs
| None
) = None
@@ -454,10 +388,6 @@ def run_tool_calls(
override_kwargs = PythonToolOverrideKwargs(
chat_files=chat_files or [],
)
elif isinstance(tool, ImageGenerationTool):
override_kwargs = ImageGenerationToolOverrideKwargs(
recent_generated_image_file_ids=recent_generated_image_file_ids
)
elif isinstance(tool, MemoryTool):
override_kwargs = MemoryToolOverrideKwargs(
user_name=(

View File

@@ -7,7 +7,6 @@ import pytest
from onyx.connectors.gong.connector import GongConnector
from onyx.connectors.models import Document
from onyx.connectors.models import HierarchyNode
@pytest.fixture
@@ -32,18 +31,20 @@ def test_gong_basic(
mock_get_api_key: MagicMock, # noqa: ARG001
gong_connector: GongConnector,
) -> None:
doc_batch_generator = gong_connector.poll_source(0, time.time())
doc_batch = next(doc_batch_generator)
with pytest.raises(StopIteration):
next(doc_batch_generator)
assert len(doc_batch) == 2
checkpoint = gong_connector.build_dummy_checkpoint()
docs: list[Document] = []
for doc in doc_batch:
if not isinstance(doc, HierarchyNode):
docs.append(doc)
while checkpoint.has_more:
generator = gong_connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(generator)
if isinstance(item, Document):
docs.append(item)
except StopIteration as e:
checkpoint = e.value
assert len(docs) == 2
assert docs[0].semantic_identifier == "test with chris"
assert docs[1].semantic_identifier == "Testing Gong"

View File

@@ -0,0 +1,483 @@
import time
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from onyx.connectors.gong.connector import GongConnector
from onyx.connectors.gong.connector import GongConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import Document
def _make_transcript(call_id: str) -> dict[str, Any]:
return {
"callId": call_id,
"transcript": [
{
"speakerId": "speaker1",
"sentences": [{"text": "Hello world"}],
}
],
}
def _make_call_detail(call_id: str, title: str) -> dict[str, Any]:
return {
"metaData": {
"id": call_id,
"started": "2026-01-15T10:00:00Z",
"title": title,
"purpose": "Test call",
"url": f"https://app.gong.io/call?id={call_id}",
"system": "test-system",
},
"parties": [
{
"speakerId": "speaker1",
"name": "Alice",
"emailAddress": "alice@test.com",
}
],
}
@pytest.fixture
def connector() -> GongConnector:
connector = GongConnector()
connector.load_credentials(
{
"gong_access_key": "test-key",
"gong_access_key_secret": "test-secret",
}
)
return connector
class TestGongConnectorCheckpoint:
def test_build_dummy_checkpoint(self, connector: GongConnector) -> None:
checkpoint = connector.build_dummy_checkpoint()
assert checkpoint.has_more is True
assert checkpoint.workspace_ids is None
assert checkpoint.workspace_index == 0
assert checkpoint.cursor is None
def test_validate_checkpoint_json(self, connector: GongConnector) -> None:
original = GongConnectorCheckpoint(
has_more=True,
workspace_ids=["ws1", None],
workspace_index=1,
cursor="abc123",
)
json_str = original.model_dump_json()
restored = connector.validate_checkpoint_json(json_str)
assert restored == original
@patch.object(GongConnector, "_throttled_request")
def test_first_call_resolves_workspaces(
self,
mock_request: MagicMock,
connector: GongConnector,
) -> None:
"""First checkpoint call should resolve workspaces and return without fetching."""
# No workspaces configured — should resolve to [None]
checkpoint = connector.build_dummy_checkpoint()
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
# Should return immediately (no yields)
with pytest.raises(StopIteration) as exc_info:
next(generator)
new_checkpoint = exc_info.value.value
assert new_checkpoint.workspace_ids == [None]
assert new_checkpoint.has_more is True
assert new_checkpoint.workspace_index == 0
# No API calls should have been made for workspace resolution
# when no workspaces are configured
mock_request.assert_not_called()
@patch.object(GongConnector, "_throttled_request")
def test_single_page_no_cursor(
self,
mock_request: MagicMock,
connector: GongConnector,
) -> None:
"""Single page of transcripts with no pagination cursor."""
transcript_response = MagicMock()
transcript_response.status_code = 200
transcript_response.json.return_value = {
"callTranscripts": [_make_transcript("call1")],
"records": {},
}
details_response = MagicMock()
details_response.status_code = 200
details_response.json.return_value = {
"calls": [_make_call_detail("call1", "Test Call")]
}
mock_request.side_effect = [transcript_response, details_response]
# Start from a checkpoint that already has workspaces resolved
checkpoint = GongConnectorCheckpoint(
has_more=True,
workspace_ids=[None],
workspace_index=0,
)
docs: list[Document] = []
failures: list[ConnectorFailure] = []
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(generator)
if isinstance(item, Document):
docs.append(item)
elif isinstance(item, ConnectorFailure):
failures.append(item)
except StopIteration as e:
checkpoint = e.value
assert len(docs) == 1
assert docs[0].semantic_identifier == "Test Call"
assert len(failures) == 0
assert checkpoint.has_more is False
assert checkpoint.workspace_index == 1
@patch.object(GongConnector, "_throttled_request")
def test_multi_page_with_cursor(
self,
mock_request: MagicMock,
connector: GongConnector,
) -> None:
"""Two pages of transcripts — cursor advances between checkpoint calls."""
# Page 1: returns cursor
page1_response = MagicMock()
page1_response.status_code = 200
page1_response.json.return_value = {
"callTranscripts": [_make_transcript("call1")],
"records": {"cursor": "page2cursor"},
}
details1_response = MagicMock()
details1_response.status_code = 200
details1_response.json.return_value = {
"calls": [_make_call_detail("call1", "Call One")]
}
# Page 2: no cursor (done)
page2_response = MagicMock()
page2_response.status_code = 200
page2_response.json.return_value = {
"callTranscripts": [_make_transcript("call2")],
"records": {},
}
details2_response = MagicMock()
details2_response.status_code = 200
details2_response.json.return_value = {
"calls": [_make_call_detail("call2", "Call Two")]
}
mock_request.side_effect = [
page1_response,
details1_response,
page2_response,
details2_response,
]
checkpoint = GongConnectorCheckpoint(
has_more=True,
workspace_ids=[None],
workspace_index=0,
)
all_docs: list[Document] = []
# First checkpoint call — page 1
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(generator)
if isinstance(item, Document):
all_docs.append(item)
except StopIteration as e:
checkpoint = e.value
assert len(all_docs) == 1
assert checkpoint.cursor == "page2cursor"
assert checkpoint.has_more is True
# Second checkpoint call — page 2
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(generator)
if isinstance(item, Document):
all_docs.append(item)
except StopIteration as e:
checkpoint = e.value
assert len(all_docs) == 2
assert all_docs[0].semantic_identifier == "Call One"
assert all_docs[1].semantic_identifier == "Call Two"
assert checkpoint.has_more is False
@patch.object(GongConnector, "_throttled_request")
def test_missing_call_details_yields_failure(
self,
mock_request: MagicMock,
connector: GongConnector,
) -> None:
"""When call details are missing after retries, yield ConnectorFailure."""
transcript_response = MagicMock()
transcript_response.status_code = 200
transcript_response.json.return_value = {
"callTranscripts": [_make_transcript("call1")],
"records": {},
}
# Return empty call details every time (simulating the race condition)
empty_details = MagicMock()
empty_details.status_code = 200
empty_details.json.return_value = {"calls": []}
mock_request.side_effect = [transcript_response] + [
empty_details
] * GongConnector.MAX_CALL_DETAILS_ATTEMPTS
checkpoint = GongConnectorCheckpoint(
has_more=True,
workspace_ids=[None],
workspace_index=0,
)
failures: list[ConnectorFailure] = []
docs: list[Document] = []
with patch("onyx.connectors.gong.connector.time.sleep"):
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(generator)
if isinstance(item, ConnectorFailure):
failures.append(item)
elif isinstance(item, Document):
docs.append(item)
except StopIteration as e:
checkpoint = e.value
assert len(docs) == 0
assert len(failures) == 1
assert failures[0].failed_document is not None
assert failures[0].failed_document.document_id == "call1"
assert checkpoint.has_more is False
@patch.object(GongConnector, "_throttled_request")
def test_multi_workspace_iteration(
self,
mock_request: MagicMock,
connector: GongConnector,
) -> None:
"""Checkpoint iterates through multiple workspaces."""
# Workspace 1: one call
ws1_transcript = MagicMock()
ws1_transcript.status_code = 200
ws1_transcript.json.return_value = {
"callTranscripts": [_make_transcript("call_ws1")],
"records": {},
}
ws1_details = MagicMock()
ws1_details.status_code = 200
ws1_details.json.return_value = {
"calls": [_make_call_detail("call_ws1", "WS1 Call")]
}
# Workspace 2: one call
ws2_transcript = MagicMock()
ws2_transcript.status_code = 200
ws2_transcript.json.return_value = {
"callTranscripts": [_make_transcript("call_ws2")],
"records": {},
}
ws2_details = MagicMock()
ws2_details.status_code = 200
ws2_details.json.return_value = {
"calls": [_make_call_detail("call_ws2", "WS2 Call")]
}
mock_request.side_effect = [
ws1_transcript,
ws1_details,
ws2_transcript,
ws2_details,
]
checkpoint = GongConnectorCheckpoint(
has_more=True,
workspace_ids=["ws1_id", "ws2_id"],
workspace_index=0,
)
all_docs: list[Document] = []
# Checkpoint call 1 — workspace 1
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(generator)
if isinstance(item, Document):
all_docs.append(item)
except StopIteration as e:
checkpoint = e.value
assert checkpoint.workspace_index == 1
assert checkpoint.has_more is True
# Checkpoint call 2 — workspace 2
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(generator)
if isinstance(item, Document):
all_docs.append(item)
except StopIteration as e:
checkpoint = e.value
assert len(all_docs) == 2
assert all_docs[0].semantic_identifier == "WS1 Call"
assert all_docs[1].semantic_identifier == "WS2 Call"
assert checkpoint.has_more is False
assert checkpoint.workspace_index == 2
@patch.object(GongConnector, "_throttled_request")
def test_empty_workspace_404(
self,
mock_request: MagicMock,
connector: GongConnector,
) -> None:
"""404 from transcript API means no calls — workspace exhausted."""
response_404 = MagicMock()
response_404.status_code = 404
mock_request.return_value = response_404
checkpoint = GongConnectorCheckpoint(
has_more=True,
workspace_ids=[None],
workspace_index=0,
)
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
next(generator)
except StopIteration as e:
checkpoint = e.value
assert checkpoint.has_more is False
assert checkpoint.workspace_index == 1
@patch.object(GongConnector, "_throttled_request")
def test_retry_only_fetches_missing_ids(
self,
mock_request: MagicMock,
connector: GongConnector,
) -> None:
"""Retry for missing call details should only re-request the missing IDs."""
transcript_response = MagicMock()
transcript_response.status_code = 200
transcript_response.json.return_value = {
"callTranscripts": [
_make_transcript("call1"),
_make_transcript("call2"),
],
"records": {},
}
# First fetch: returns call1 but not call2
partial_details = MagicMock()
partial_details.status_code = 200
partial_details.json.return_value = {
"calls": [_make_call_detail("call1", "Call One")]
}
# Second fetch (retry): returns call2
missing_details = MagicMock()
missing_details.status_code = 200
missing_details.json.return_value = {
"calls": [_make_call_detail("call2", "Call Two")]
}
mock_request.side_effect = [
transcript_response,
partial_details,
missing_details,
]
checkpoint = GongConnectorCheckpoint(
has_more=True,
workspace_ids=[None],
workspace_index=0,
)
docs: list[Document] = []
with patch("onyx.connectors.gong.connector.time.sleep"):
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(generator)
if isinstance(item, Document):
docs.append(item)
except StopIteration:
pass
assert len(docs) == 2
assert docs[0].semantic_identifier == "Call One"
assert docs[1].semantic_identifier == "Call Two"
# Verify: 3 API calls total (1 transcript + 1 full details + 1 retry for missing only)
assert mock_request.call_count == 3
# The retry call should only request call2, not both
retry_call_body = mock_request.call_args_list[2][1]["json"]
assert retry_call_body["filter"]["callIds"] == ["call2"]
@patch.object(GongConnector, "_throttled_request")
def test_expired_cursor_restarts_workspace(
self,
mock_request: MagicMock,
connector: GongConnector,
) -> None:
"""Expired pagination cursor resets checkpoint to restart the workspace."""
expired_response = MagicMock()
expired_response.status_code = 400
expired_response.ok = False
expired_response.text = '{"requestId":"abc","errors":["cursor has expired"]}'
mock_request.return_value = expired_response
# Checkpoint mid-pagination with a (now-expired) cursor
checkpoint = GongConnectorCheckpoint(
has_more=True,
workspace_ids=[None],
workspace_index=0,
cursor="stale-cursor",
)
docs: list[Document] = []
generator = connector.load_from_checkpoint(0, time.time(), checkpoint)
try:
while True:
item = next(generator)
if isinstance(item, Document):
docs.append(item)
except StopIteration as e:
checkpoint = e.value
assert len(docs) == 0
# Cursor reset so next call restarts the workspace from scratch
assert checkpoint.cursor is None
assert checkpoint.workspace_index == 0
assert checkpoint.has_more is True

View File

@@ -15,10 +15,19 @@ from onyx.connectors.models import Section
from onyx.connectors.models import TabularSection
from onyx.indexing.chunking.section_chunker import AccumulatorState
from onyx.indexing.chunking.tabular_section_chunker import TabularChunker
from onyx.indexing.chunking.tabular_section_chunker.analysis import analyze_sheet
from onyx.indexing.chunking.tabular_section_chunker.sheet_descriptor import (
build_sheet_descriptor_chunks,
)
from onyx.indexing.chunking.tabular_section_chunker.total_descriptor import (
build_total_descriptor_chunks,
)
from onyx.indexing.chunking.tabular_section_chunker.total_descriptor import (
TOTALS_HEADER,
)
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.utils.csv_utils import parse_csv_string
from onyx.utils.csv_utils import read_csv_header
class CharTokenizer(BaseTokenizer):
@@ -587,7 +596,7 @@ class TestTabularChunkerChunkSection:
content_chunk = (
"sheet:T\n" "Columns: Name, Age\n" "Name=Alice, Age=30\n" "Name=Bob, Age=25"
)
metadata_chunk = (
descriptor_chunk = (
"sheet:T\n"
"Sheet overview.\n"
"This sheet has 2 rows and 2 columns.\n"
@@ -596,7 +605,17 @@ class TestTabularChunkerChunkSection:
"Categorical columns (groupable, can be counted by value): Name\n"
"Values seen in Name: Alice, Bob"
)
expected_texts = [content_chunk, metadata_chunk]
totals_chunk = (
"sheet:T\n"
"Totals and overall aggregates across all rows. This sheet can answer "
"whole-dataset questions about total, overall, grand total, sum across "
"all, average, combined, mean, minimum, maximum, and count of values.\n"
"Column Age: total (sum across all rows) = 55, average = 27.5, "
"minimum = 25, maximum = 30, count = 2.\n"
"Column Name most frequent value: Alice (1 occurrences).\n"
"Total row count: 2."
)
expected_texts = [content_chunk, descriptor_chunk, totals_chunk]
# --- ACT -------------------------------------------------------
out = _make_chunker_with_metadata().chunk_section(
@@ -607,8 +626,8 @@ class TestTabularChunkerChunkSection:
# --- ASSERT ----------------------------------------------------
assert [p.text for p in out.payloads] == expected_texts
# Content first, metadata second — only the first chunk is fresh.
assert [p.is_continuation for p in out.payloads] == [False, True]
# Content first, metadata chunks follow as continuations.
assert [p.is_continuation for p in out.payloads] == [False, True, True]
class TestBuildSheetDescriptorChunks:
@@ -627,9 +646,14 @@ class TestBuildSheetDescriptorChunks:
heading: str | None = "sheet:T",
max_tokens: int = 500,
) -> list[str]:
section = TabularSection(text=csv_text, link=_DEFAULT_LINK, heading=heading)
parsed_rows = list(parse_csv_string(csv_text))
headers = parsed_rows[0].header if parsed_rows else read_csv_header(csv_text)
if not headers:
return []
return build_sheet_descriptor_chunks(
section=section,
headers=headers,
analysis=analyze_sheet(headers, parsed_rows),
heading=heading or "",
tokenizer=CharTokenizer(),
max_tokens=max_tokens,
)
@@ -837,3 +861,174 @@ class TestBuildSheetDescriptorChunks:
# --- ACT / ASSERT ---------------------------------------------
assert self._build(csv_text, heading="", max_tokens=30) == expected
class TestBuildTotalDescriptorChunks:
"""Direct tests of `build_total_descriptor_chunks` — emits the totals
chunk that names aggregate vocabulary (total/sum/average/min/max/
count/most frequent) plus per-column aggregates so whole-dataset
questions retrieve a chunk whose text actually contains the answer.
"""
@staticmethod
def _build(
csv_text: str,
heading: str | None = "sheet:T",
max_tokens: int = 1000,
) -> list[str]:
parsed_rows = list(parse_csv_string(csv_text))
headers = parsed_rows[0].header if parsed_rows else read_csv_header(csv_text)
if not headers:
return []
return build_total_descriptor_chunks(
headers=headers,
analysis=analyze_sheet(headers, parsed_rows),
heading=heading or "",
tokenizer=CharTokenizer(),
max_tokens=max_tokens,
)
def test_numeric_and_categorical_columns_emit_every_line(self) -> None:
# --- INPUT -----------------------------------------------------
# amount → numeric (total=600, avg=200, min=100, max=300, count=3)
# region → categorical (US appears twice, EU once → top=US (2))
csv_text = "amount,region\n100,US\n200,EU\n300,US\n"
# --- EXPECTED --------------------------------------------------
expected = [
"sheet:T\n"
f"{TOTALS_HEADER}\n"
"Column amount: total (sum across all rows) = 600, average = 200, "
"minimum = 100, maximum = 300, count = 3.\n"
"Column region most frequent value: US (2 occurrences).\n"
"Total row count: 3."
]
# --- ACT / ASSERT ---------------------------------------------
assert self._build(csv_text) == expected
def test_numeric_only_sheet_has_no_categorical_line(self) -> None:
# --- INPUT -----------------------------------------------------
# Both columns are all-numeric → no "most frequent value" lines.
csv_text = "x,y\n1,2\n3,4\n"
# --- EXPECTED --------------------------------------------------
expected = [
"sheet:T\n"
f"{TOTALS_HEADER}\n"
"Column x: total (sum across all rows) = 4, average = 2, "
"minimum = 1, maximum = 3, count = 2.\n"
"Column y: total (sum across all rows) = 6, average = 3, "
"minimum = 2, maximum = 4, count = 2.\n"
"Total row count: 2."
]
# --- ACT / ASSERT ---------------------------------------------
assert self._build(csv_text) == expected
def test_categorical_only_sheet_has_no_numeric_line(self) -> None:
# --- INPUT -----------------------------------------------------
# Non-numeric low-cardinality column → categorical only. "red"
# wins over "blue" 2-to-1.
csv_text = "color\nred\nblue\nred\n"
# --- EXPECTED --------------------------------------------------
expected = [
"sheet:T\n"
f"{TOTALS_HEADER}\n"
"Column color most frequent value: red (2 occurrences).\n"
"Total row count: 3."
]
# --- ACT / ASSERT ---------------------------------------------
assert self._build(csv_text) == expected
def test_underscored_column_names_get_friendly_alias(self) -> None:
# --- INPUT -----------------------------------------------------
# Underscored headers get the same `name (name with spaces)` alias
# used elsewhere so retrieval matches either surface form.
csv_text = "total_cost\n100\n200\n"
# --- EXPECTED --------------------------------------------------
expected = [
"sheet:T\n"
f"{TOTALS_HEADER}\n"
"Column total_cost (total cost): total (sum across all rows) = 300, "
"average = 150, minimum = 100, maximum = 200, count = 2.\n"
"Total row count: 2."
]
# --- ACT / ASSERT ---------------------------------------------
assert self._build(csv_text) == expected
def test_non_integer_averages_format_with_decimals(self) -> None:
# --- INPUT -----------------------------------------------------
# Whole-number inputs but a fractional average. `_fmt` drops the
# ".0" when the value is integral and falls back to `:.6g` when
# it isn't — verify both surfaces on the same line.
csv_text = "rate\n1\n2\n"
# --- EXPECTED --------------------------------------------------
# total=3 (int), avg=1.5 (fractional), min=1, max=2, count=2.
expected = [
"sheet:T\n"
f"{TOTALS_HEADER}\n"
"Column rate: total (sum across all rows) = 3, average = 1.5, "
"minimum = 1, maximum = 2, count = 2.\n"
"Total row count: 2."
]
# --- ACT / ASSERT ---------------------------------------------
assert self._build(csv_text) == expected
def test_empty_section_returns_no_chunks(self) -> None:
# No parsed rows → no totals to report; builder bails out early.
assert self._build("") == []
def test_header_only_csv_returns_no_chunks(self) -> None:
# Header-only CSV yields zero data rows → `parse_csv_string`
# returns nothing, so the builder returns an empty list.
assert self._build("col1,col2\n") == []
def test_no_heading_omits_prefix_line(self) -> None:
# --- INPUT -----------------------------------------------------
# heading=None → prefix is just TOTALS_HEADER, no leading heading
# line in the emitted chunk.
csv_text = "n\n5\n"
# --- EXPECTED --------------------------------------------------
expected = [
f"{TOTALS_HEADER}\n"
"Column n: total (sum across all rows) = 5, average = 5, "
"minimum = 5, maximum = 5, count = 1.\n"
"Total row count: 1."
]
# --- ACT / ASSERT ---------------------------------------------
assert self._build(csv_text, heading=None) == expected
def test_tight_budget_splits_into_multiple_chunks_each_with_header(self) -> None:
# --- INPUT -----------------------------------------------------
# Three numeric columns under a tight budget force pack_lines to
# split across multiple chunks. Every emitted chunk must still
# start with `heading + TOTALS_HEADER` so retrieval keeps context
# on whichever chunk wins.
csv_text = "a,b,c\n1,2,3\n4,5,6\n"
# --- ACT -------------------------------------------------------
# Budget chosen so the three aggregate lines can't all fit under
# TOTALS_HEADER in a single chunk.
out = self._build(csv_text, heading="S", max_tokens=len(TOTALS_HEADER) + 120)
# --- ASSERT ----------------------------------------------------
# Split actually happened.
assert len(out) > 1
# Each chunk carries the full prefix (heading + totals header).
assert all(c.startswith(f"S\n{TOTALS_HEADER}\n") for c in out)
# Collectively, every per-column aggregate and the row count line
# must appear somewhere in the output.
body = "\n".join(out)
assert "Column a: total (sum across all rows) = 5" in body
assert "Column b: total (sum across all rows) = 7" in body
assert "Column c: total (sum across all rows) = 9" in body
assert "Total row count: 2." in body

View File

@@ -29,6 +29,7 @@ from onyx.llm.utils import get_max_input_tokens
VERTEX_OPUS_MODELS_REJECTING_OUTPUT_CONFIG = [
"claude-opus-4-5@20251101",
"claude-opus-4-6",
"claude-opus-4-7",
]

View File

@@ -0,0 +1,110 @@
"""Tests for ``ImageGenerationTool._resolve_reference_image_file_ids``.
The resolver turns the LLM's ``reference_image_file_ids`` argument into a
cleaned list of file IDs to hand to ``_load_reference_images``. It trusts
the LLM's picks — the LLM can only see file IDs that actually appear in
the conversation (via ``[attached image — file_id: <id>]`` tags on user
messages and the JSON returned by prior generate_image calls), so we
don't re-validate against an allow-list in the tool itself.
"""
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from onyx.tools.models import ToolCallException
from onyx.tools.tool_implementations.images.image_generation_tool import (
ImageGenerationTool,
)
from onyx.tools.tool_implementations.images.image_generation_tool import (
REFERENCE_IMAGE_FILE_IDS_FIELD,
)
def _make_tool(
supports_reference_images: bool = True,
max_reference_images: int = 16,
) -> ImageGenerationTool:
"""Construct a tool with a mock provider so no credentials/network are needed."""
with patch(
"onyx.tools.tool_implementations.images.image_generation_tool.get_image_generation_provider"
) as mock_get_provider:
mock_provider = MagicMock()
mock_provider.supports_reference_images = supports_reference_images
mock_provider.max_reference_images = max_reference_images
mock_get_provider.return_value = mock_provider
return ImageGenerationTool(
image_generation_credentials=MagicMock(),
tool_id=1,
emitter=MagicMock(),
model="gpt-image-1",
provider="openai",
)
class TestResolveReferenceImageFileIds:
def test_unset_returns_empty_plain_generation(self) -> None:
tool = _make_tool()
assert tool._resolve_reference_image_file_ids(llm_kwargs={}) == []
def test_empty_list_is_treated_like_unset(self) -> None:
tool = _make_tool()
result = tool._resolve_reference_image_file_ids(
llm_kwargs={REFERENCE_IMAGE_FILE_IDS_FIELD: []},
)
assert result == []
def test_passes_llm_supplied_ids_through(self) -> None:
tool = _make_tool()
result = tool._resolve_reference_image_file_ids(
llm_kwargs={REFERENCE_IMAGE_FILE_IDS_FIELD: ["upload-1", "gen-1"]},
)
# Order preserved — first entry is the primary edit source.
assert result == ["upload-1", "gen-1"]
def test_invalid_shape_raises(self) -> None:
tool = _make_tool()
with pytest.raises(ToolCallException):
tool._resolve_reference_image_file_ids(
llm_kwargs={REFERENCE_IMAGE_FILE_IDS_FIELD: "not-a-list"},
)
def test_non_string_element_raises(self) -> None:
tool = _make_tool()
with pytest.raises(ToolCallException):
tool._resolve_reference_image_file_ids(
llm_kwargs={REFERENCE_IMAGE_FILE_IDS_FIELD: ["ok", 123]},
)
def test_deduplicates_preserving_first_occurrence(self) -> None:
tool = _make_tool()
result = tool._resolve_reference_image_file_ids(
llm_kwargs={REFERENCE_IMAGE_FILE_IDS_FIELD: ["gen-1", "gen-2", "gen-1"]},
)
assert result == ["gen-1", "gen-2"]
def test_strips_whitespace_and_skips_empty_strings(self) -> None:
tool = _make_tool()
result = tool._resolve_reference_image_file_ids(
llm_kwargs={REFERENCE_IMAGE_FILE_IDS_FIELD: [" gen-1 ", "", " "]},
)
assert result == ["gen-1"]
def test_provider_without_reference_support_raises(self) -> None:
tool = _make_tool(supports_reference_images=False)
with pytest.raises(ToolCallException):
tool._resolve_reference_image_file_ids(
llm_kwargs={REFERENCE_IMAGE_FILE_IDS_FIELD: ["gen-1"]},
)
def test_truncates_to_provider_max_preserving_head(self) -> None:
"""When the LLM lists more images than the provider allows, keep the
HEAD of the list (the primary edit source + earliest extras) rather
than the tail, since the LLM put the most important one first."""
tool = _make_tool(max_reference_images=2)
result = tool._resolve_reference_image_file_ids(
llm_kwargs={REFERENCE_IMAGE_FILE_IDS_FIELD: ["a", "b", "c", "d"]},
)
assert result == ["a", "b"]

View File

@@ -1,10 +1,5 @@
from onyx.chat.models import ChatMessageSimple
from onyx.chat.models import ToolCallSimple
from onyx.configs.constants import MessageType
from onyx.server.query_and_chat.placement import Placement
from onyx.tools.models import ToolCallKickoff
from onyx.tools.tool_runner import _extract_image_file_ids_from_tool_response_message
from onyx.tools.tool_runner import _extract_recent_generated_image_file_ids
from onyx.tools.tool_runner import _merge_tool_calls
@@ -312,62 +307,3 @@ class TestMergeToolCalls:
assert len(result) == 1
# String should be converted to list item
assert result[0].tool_args["queries"] == ["single_query", "q2"]
class TestImageHistoryExtraction:
def test_extracts_image_file_ids_from_json_response(self) -> None:
msg = '[{"file_id":"img-1","revised_prompt":"v1"},{"file_id":"img-2","revised_prompt":"v2"}]'
assert _extract_image_file_ids_from_tool_response_message(msg) == [
"img-1",
"img-2",
]
def test_extracts_recent_generated_image_ids_from_history(self) -> None:
history = [
ChatMessageSimple(
message="",
token_count=1,
message_type=MessageType.ASSISTANT,
tool_calls=[
ToolCallSimple(
tool_call_id="call_1",
tool_name="generate_image",
tool_arguments={"prompt": "test"},
token_count=1,
)
],
),
ChatMessageSimple(
message='[{"file_id":"img-1","revised_prompt":"r1"}]',
token_count=1,
message_type=MessageType.TOOL_CALL_RESPONSE,
tool_call_id="call_1",
),
]
assert _extract_recent_generated_image_file_ids(history) == ["img-1"]
def test_ignores_non_image_tool_responses(self) -> None:
history = [
ChatMessageSimple(
message="",
token_count=1,
message_type=MessageType.ASSISTANT,
tool_calls=[
ToolCallSimple(
tool_call_id="call_1",
tool_name="web_search",
tool_arguments={"queries": ["q"]},
token_count=1,
)
],
),
ChatMessageSimple(
message='[{"file_id":"img-1","revised_prompt":"r1"}]',
token_count=1,
message_type=MessageType.TOOL_CALL_RESPONSE,
tool_call_id="call_1",
),
]
assert _extract_recent_generated_image_file_ids(history) == []

View File

@@ -26,7 +26,6 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": " This chart shows how long it takes for Onyx to crawl each source connector and collect the current list of documents. The Y axis represents duration in seconds (bucketed), and each band shows how many enumerations completed within that time range.",
"fieldConfig": {
"defaults": {
"custom": {
@@ -48,6 +47,407 @@
"x": 0,
"y": 0
},
"id": 12,
"options": {
"calculate": false,
"cellGap": 1,
"color": {
"exponent": 0.5,
"fill": "dark-orange",
"mode": "scheme",
"reverse": false,
"scale": "exponential",
"scheme": "Oranges",
"steps": 64
},
"exemplars": {
"color": "rgba(255,0,255,0.7)"
},
"filterValues": {
"le": 1e-9
},
"legend": {
"show": true
},
"rowsFrame": {
"layout": "auto"
},
"tooltip": {
"mode": "single",
"showColorScale": false,
"yHistogram": false
},
"yAxis": {
"axisPlacement": "left",
"reverse": false,
"unit": "s"
}
},
"pluginVersion": "10.4.1",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "sum(increase(onyx_celery_task_queue_wait_seconds_bucket{queue=\"connector_pruning\"}[30m])) by (le)",
"format": "heatmap",
"fullMetaSearch": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "Pruning Task Queue Waiting Time",
"type": "heatmap"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Shows the 95th percentile execution duration of pruning tasks. A rising p95 indicates pruning jobs are taking longer over time, potentially approaching the 6-hour timeout limit. Sustained values near 21600s (6 hours) indicate connectors with too many documents to prune within the allowed window.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"id": 5,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.95, sum(rate(onyx_celery_task_duration_seconds_bucket{task_name=~\"connector_pruning.*\"}[1h])) by (le, task_name))",
"instant": false,
"legendFormat": "{{task_name}}",
"range": true,
"refId": "A"
}
],
"title": "Pruning Task Duration p95",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Shows the rate of pruning task failures and revocations per hour. Failures indicate crashed tasks (DB errors, timeouts). Revocations indicate cancelled tasks, typically from worker restarts or deployments. Both result in orphaned fences that block future pruning attempts for affected connectors.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"id": 9,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum(increase(onyx_celery_task_revoked_total{task_name=~\"connector_pruning.*\"}[1h])) by (task_name)",
"hide": false,
"instant": false,
"legendFormat": "revoked",
"range": true,
"refId": "B"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum(increase(onyx_celery_task_completed_total{task_name=~\"connector_pruning.*\", outcome=\"failure\"}[1h])) by (task_name)",
"hide": false,
"instant": false,
"legendFormat": "failure",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum(increase(onyx_celery_task_completed_total{task_name=~\"connector_pruning.*\", outcome=\"success\"}[1h])) by (task_name)",
"hide": false,
"instant": false,
"legendFormat": "success",
"range": true,
"refId": "C"
}
],
"title": "Pruning Task Success & Failures & Revocations",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Shows the ratio of successfully completed pruning tasks to total completed tasks. A value of 1.0 (100%) means all pruning jobs are completing cleanly. A drop indicates tasks are crashing or timing out, which leads to orphaned fences and connectors being blocked from future pruning attempts.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 8
},
"id": 8,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": " sum(rate(onyx_celery_task_completed_total{task_name=~\"connector_pruning.*\", outcome=\"success\"}[1h]))\n /\n sum(rate(onyx_celery_task_completed_total{task_name=~\"connector_pruning.*\"}[1h]))",
"instant": false,
"legendFormat": "Success Rate",
"range": true,
"refId": "A"
}
],
"title": "Pruning Task Success Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": " This chart shows how long it takes for Onyx to crawl each source connector and collect the current list of documents. The Y axis represents duration in seconds (bucketed), and each band shows how many enumerations completed within that time range.",
"fieldConfig": {
"defaults": {
"custom": {
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"scaleDistribution": {
"type": "linear"
}
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 16
},
"id": 1,
"options": {
"calculate": false,
@@ -166,7 +566,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 0
"y": 16
},
"id": 7,
"options": {
@@ -262,7 +662,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 8
"y": 24
},
"id": 6,
"options": {
@@ -294,223 +694,6 @@
"title": "Pruning Enumeration Count",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Shows the 95th percentile execution duration of pruning tasks. A rising p95 indicates pruning jobs are taking longer over time, potentially approaching the 6-hour timeout limit. Sustained values near 21600s (6 hours) indicate connectors with too many documents to prune within the allowed window.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 8
},
"id": 5,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.95, sum(rate(onyx_celery_task_duration_seconds_bucket{task_name=~\"connector_pruning.*\"}[1h])) by (le, task_name))",
"instant": false,
"legendFormat": "{{task_name}}",
"range": true,
"refId": "A"
}
],
"title": "Pruning Task Duration p95",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Shows the number of currently executing pruning tasks on the heavy worker, broken down by task type. A value of 0 means no pruning is actively running. A sustained high count may indicate workers are saturated and new pruning jobs are queuing up.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"__systemRef": "hideSeriesFrom",
"matcher": {
"id": "byNames",
"options": {
"mode": "exclude",
"names": [
"connector_pruning_generator_task"
],
"prefix": "All except:",
"readOnly": true
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": false,
"tooltip": false,
"viz": true
}
}
]
}
]
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 16
},
"id": 4,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum(onyx_celery_tasks_active{queue=~\"connector_pruning.*|connector_doc_permissions.*|connector_external_group.*|csv_generation|sandbox\"}) by (task_name)",
"instant": false,
"legendFormat": "{{task_name}}",
"range": true,
"refId": "A"
}
],
"title": "Heavy Worker - Active Tasks",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
@@ -575,7 +758,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 16
"y": 24
},
"id": 3,
"options": {
@@ -612,7 +795,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Shows the rate of pruning task failures and revocations per hour. Failures indicate crashed tasks (DB errors, timeouts). Revocations indicate cancelled tasks, typically from worker restarts or deployments. Both result in orphaned fences that block future pruning attempts for affected connectors.",
"description": "Depth of queues that go heavy worker",
"fieldConfig": {
"defaults": {
"color": {
@@ -671,9 +854,9 @@
"h": 8,
"w": 12,
"x": 0,
"y": 24
"y": 32
},
"id": 9,
"id": 10,
"options": {
"legend": {
"calcs": [],
@@ -693,41 +876,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum(increase(onyx_celery_task_revoked_total{task_name=~\"connector_pruning.*\"}[1h])) by (task_name)",
"hide": false,
"expr": "sum by (queue) (onyx_queue_depth{queue=~\"connector_pruning|external_group_sync|permissions_sync\"})",
"instant": false,
"legendFormat": "revoked",
"range": true,
"refId": "B"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum(increase(onyx_celery_task_completed_total{task_name=~\"connector_pruning.*\", outcome=\"failure\"}[1h])) by (task_name)",
"hide": false,
"instant": false,
"legendFormat": "failure",
"legendFormat": "{{queue}}",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum(increase(onyx_celery_task_completed_total{task_name=~\"connector_pruning.*\", outcome=\"success\"}[1h])) by (task_name)",
"hide": false,
"instant": false,
"legendFormat": "success",
"range": true,
"refId": "C"
}
],
"title": "Heavy Worker - Pruning Task Success & Failures & Revocations",
"title": "Heavy Worker Queues Depth",
"type": "timeseries"
},
{
@@ -735,7 +891,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Shows the ratio of successfully completed pruning tasks to total completed tasks. A value of 1.0 (100%) means all pruning jobs are completing cleanly. A drop indicates tasks are crashing or timing out, which leads to orphaned fences and connectors being blocked from future pruning attempts.",
"description": "Shows the number of currently executing pruning tasks on the heavy worker, broken down by task type. A value of 0 means no pruning is actively running. A sustained high count may indicate workers are saturated and new pruning jobs are queuing up.",
"fieldConfig": {
"defaults": {
"color": {
@@ -794,9 +950,9 @@
"h": 8,
"w": 12,
"x": 12,
"y": 24
"y": 32
},
"id": 8,
"id": 4,
"options": {
"legend": {
"calcs": [],
@@ -816,14 +972,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": " sum(rate(onyx_celery_task_completed_total{task_name=~\"connector_pruning.*\", outcome=\"success\"}[1h]))\n /\n sum(rate(onyx_celery_task_completed_total{task_name=~\"connector_pruning.*\"}[1h]))",
"expr": "sum(onyx_celery_tasks_active{queue=~\"connector_pruning.*|connector_doc_permissions.*|connector_external_group.*|csv_generation|sandbox\"}) by (task_name)",
"instant": false,
"legendFormat": "Success Rate",
"legendFormat": "{{task_name}}",
"range": true,
"refId": "A"
}
],
"title": "Heavy Worker - Pruning Task Success Rate",
"title": "Heavy Worker - Active Tasks",
"type": "timeseries"
},
{
@@ -851,7 +1007,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 32
"y": 40
},
"id": 2,
"options": {
@@ -915,13 +1071,13 @@
"list": []
},
"time": {
"from": "now-6h",
"from": "now-24h",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "Indexing - Pruning",
"uid": "onyx-indexing-pruning",
"version": 10,
"version": 14,
"weekStart": ""
}

View File

@@ -184,6 +184,7 @@ export { default as SvgUserSpeaker } from "@opal/icons/user-speaker";
export { default as SvgUserSync } from "@opal/icons/user-sync";
export { default as SvgUserX } from "@opal/icons/user-x";
export { default as SvgUsers } from "@opal/icons/users";
export { default as SvgVector } from "@opal/icons/vector";
export { default as SvgVolume } from "@opal/icons/volume";
export { default as SvgVolumeOff } from "@opal/icons/volume-off";
export { default as SvgWallet } from "@opal/icons/wallet";

View File

@@ -0,0 +1,20 @@
import type { IconProps } from "@opal/types";
const SvgVector = ({ size, ...props }: IconProps) => (
<svg
width={size}
height={size}
viewBox="0 0 16 16"
fill="none"
xmlns="http://www.w3.org/2000/svg"
stroke="currentColor"
{...props}
>
<path
d="M8 2L6 4M8 2L8 9M8 2L10 4M8 9L14.0622 12.5M8 9L1.93782 12.5M14.0622 12.5L11.3301 13.232M14.0622 12.5L13.3301 9.76794M1.93782 12.5L4.66987 13.2321M1.93782 12.5L2.66987 9.76795"
strokeWidth={1.5}
strokeLinecap="round"
strokeLinejoin="round"
/>
</svg>
);
export default SvgVector;

View File

@@ -34,7 +34,8 @@ export const PROVIDERS: ProviderConfig[] = [
providerName: LLMProviderName.ANTHROPIC,
recommended: true,
models: [
{ name: "claude-opus-4-6", label: "Claude Opus 4.6", recommended: true },
{ name: "claude-opus-4-7", label: "Claude Opus 4.7", recommended: true },
{ name: "claude-opus-4-6", label: "Claude Opus 4.6" },
{ name: "claude-sonnet-4-6", label: "Claude Sonnet 4.6" },
],
apiKeyPlaceholder: "sk-ant-...",

View File

@@ -5,12 +5,12 @@
export interface BuildLlmSelection {
providerName: string; // e.g., "build-mode-anthropic" (LLMProviderDescriptor.name)
provider: string; // e.g., "anthropic"
modelName: string; // e.g., "claude-opus-4-6"
modelName: string; // e.g., "claude-opus-4-7"
}
// Priority order for smart default LLM selection
const LLM_SELECTION_PRIORITY = [
{ provider: "anthropic", modelName: "claude-opus-4-6" },
{ provider: "anthropic", modelName: "claude-opus-4-7" },
{ provider: "openai", modelName: "gpt-5.2" },
{ provider: "openrouter", modelName: "minimax/minimax-m2.1" },
] as const;
@@ -63,10 +63,11 @@ export function getDefaultLlmSelection(
export const RECOMMENDED_BUILD_MODELS = {
preferred: {
provider: "anthropic",
modelName: "claude-opus-4-6",
displayName: "Claude Opus 4.6",
modelName: "claude-opus-4-7",
displayName: "Claude Opus 4.7",
},
alternatives: [
{ provider: "anthropic", modelName: "claude-opus-4-6" },
{ provider: "anthropic", modelName: "claude-sonnet-4-6" },
{ provider: "openai", modelName: "gpt-5.2" },
{ provider: "openai", modelName: "gpt-5.1-codex" },
@@ -148,7 +149,8 @@ export const BUILD_MODE_PROVIDERS: BuildModeProvider[] = [
providerName: "anthropic",
recommended: true,
models: [
{ name: "claude-opus-4-6", label: "Claude Opus 4.6", recommended: true },
{ name: "claude-opus-4-7", label: "Claude Opus 4.7", recommended: true },
{ name: "claude-opus-4-6", label: "Claude Opus 4.6" },
{ name: "claude-sonnet-4-6", label: "Claude Sonnet 4.6" },
],
apiKeyPlaceholder: "sk-ant-...",

View File

@@ -3,7 +3,7 @@
import { ValidSources } from "@/lib/types";
import { SourceIcon } from "./SourceIcon";
import { useState } from "react";
import { OnyxIcon } from "./icons/icons";
import { GithubIcon, OnyxIcon } from "./icons/icons";
export function WebResultIcon({
url,
@@ -23,6 +23,8 @@ export function WebResultIcon({
<>
{hostname.includes("onyx.app") ? (
<OnyxIcon size={size} className="dark:text-[#fff] text-[#000]" />
) : hostname === "github.com" || hostname.endsWith(".github.com") ? (
<GithubIcon size={size} />
) : !error ? (
<img
className="my-0 rounded-full py-0"

View File

@@ -46,6 +46,7 @@ import freshdeskIcon from "@public/Freshdesk.png";
import geminiSVG from "@public/Gemini.svg";
import gitbookDarkIcon from "@public/GitBookDark.png";
import gitbookLightIcon from "@public/GitBookLight.png";
import githubDarkIcon from "@public/GithubDarkMode.png";
import githubLightIcon from "@public/Github.png";
import gongIcon from "@public/Gong.png";
import googleIcon from "@public/Google.png";
@@ -855,7 +856,7 @@ export const GitbookIcon = createLogoIcon(gitbookDarkIcon, {
darkSrc: gitbookLightIcon,
});
export const GithubIcon = createLogoIcon(githubLightIcon, {
monochromatic: true,
darkSrc: githubDarkIcon,
});
export const GitlabIcon = createLogoIcon(gitlabIcon);
export const GmailIcon = createLogoIcon(gmailIcon);

View File

@@ -35,7 +35,6 @@ for (const theme of THEMES) {
await expectScreenshot(page, {
name: `welcome-${theme}-full-page`,
hide: ['[data-testid="onyx-logo"]'], // greeting text is random, hide to prevent size variation
});
});

View File

@@ -31,6 +31,7 @@ const DEFAULT_MASK_SELECTORS: string[] = [
*/
const DEFAULT_HIDE_SELECTORS: string[] = [
'[data-testid="toast-container"]',
'[data-testid="onyx-logo"] p', // greeting text is random, hide to prevent size variation
// TODO: Remove once it loads consistently.
'[data-testid="actions-container"]',
];