Compare commits


49 Commits

Author SHA1 Message Date
Weves
ca3db17b08 add restart 2025-12-17 12:48:46 -08:00
Weves
ffd13b1104 dump scripts 2025-12-17 12:48:46 -08:00
Wenxi
1caa860f8e fix(file upload): properly convert and process files uploaded directly to chat (#6815)
Co-authored-by: _htz_ <100520465+1htz2@users.noreply.github.com>
2025-12-17 12:38:14 -08:00
trial-danswer
7181cc41af feat: adding support for SearXNG as an option for web search. It operates a… (#6653)
Co-authored-by: Weves <chrisweaver101@gmail.com>
2025-12-17 12:27:19 -08:00
Chris Weaver
959b8c320d fix: don't leave redis ports exposed (#6814) 2025-12-17 12:06:10 -08:00
roshan
96fd0432ff fix(tool): default tool descriptions assistant -> agent (#6788)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2025-12-17 19:12:17 +00:00
Jamison Lahman
4c73a03f57 chore(fe): followups to 7f79e34aa (#6808) 2025-12-17 18:36:31 +00:00
Raunak Bhagat
e57713e376 fix: Clean up DocumentsSidebar (#6805) 2025-12-17 09:00:14 -08:00
Jamison Lahman
21ea320323 fix(style): standardize projects page layout (#6807) 2025-12-17 01:11:09 -08:00
Jamison Lahman
bac9c48e53 fix(style): "More Agents" page is responsive (#6806) 2025-12-17 01:01:13 -08:00
roshan
7f79e34aa4 fix(projects): add special logic for internal search tool when no connectors available (#6774)
Co-authored-by: Yuhong Sun <yuhongsun96@gmail.com>
2025-12-17 06:45:03 +00:00
Jamison Lahman
f1a81d45a1 chore(fe): popover component uses z-index.css (#6804) 2025-12-16 23:07:31 -08:00
Jamison Lahman
285755a540 chore(pre-commit): fix uv.lock after filelock "upgrade" (#6803) 2025-12-16 22:16:19 -08:00
Justin Tahara
89003ad2d8 chore(tf): Update VPC calling (#6798) 2025-12-17 05:38:50 +00:00
Yuhong Sun
9f93f97259 feat(vectordb): New Document Index Interface (#5700) 2025-12-17 03:28:02 +00:00
Yuhong Sun
f702eebbe7 chore: some readme updates (#6802) 2025-12-16 19:53:23 -08:00
Yuhong Sun
8487e1856b feat: Deep Research first couple stages (#6801) 2025-12-16 19:40:54 -08:00
acaprau
a36445f840 fix(devtools): restart_containers.sh should source venv before running alembic (#6795) 2025-12-17 02:33:21 +00:00
roshan
7f30293b0e chore: improved error handling and display for agent failure types (#6784) 2025-12-17 02:30:24 +00:00
acaprau
619d9528b4 fix(devtools): CLAUDE.md.template makes reference to a venv that does not exist (#6796) 2025-12-17 02:29:47 +00:00
Yuhong Sun
6f83c669e7 feat: enable skip clarification (#6797) 2025-12-16 18:25:15 -08:00
Chris Weaver
c3e5f48cb4 fix: horrible typo in README (#6793) 2025-12-16 17:05:57 -08:00
Justin Tahara
fdf8fe391c fix(ui): Search Settings Active Only (#6657) 2025-12-16 17:00:06 -08:00
Raunak Bhagat
f1d6bb9e02 refactor: Transfer all icons to @opal/icons (#6755) 2025-12-17 00:16:44 +00:00
Justin Tahara
9a64a717dc fix(users): User Groups Race Condition (#6710) 2025-12-17 00:11:07 +00:00
Raunak Bhagat
aa0f475e01 refactor: Add new z-indexing file (#6789) 2025-12-16 23:56:13 +00:00
Nikolas Garza
75238dc353 fix: attach user credentials to assistant requests (#6785) 2025-12-16 23:15:31 +00:00
Nikolas Garza
9e19803244 chore: bump fallback max token limit to 32k (#6787) 2025-12-16 23:09:47 +00:00
dependabot[bot]
5cabd32638 chore(deps): Bump filelock from 3.15.4 to 3.20.1 in /backend/requirements (#6781)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-16 22:36:09 +00:00
Justin Tahara
4ccd88c331 fix(confluence): Skip attachments gracefully (#6769) 2025-12-16 22:34:16 +00:00
Justin Tahara
5a80b98320 feat(cleanup): No Bastion Setup (#6562) 2025-12-16 14:51:05 -08:00
Jamison Lahman
ff109d9f5c chore(style): fix chat page scrollbar after padding change (#6780) 2025-12-16 22:08:12 +00:00
Justin Tahara
4cc276aca9 fix(helm): Add Update Strategy (#6782) 2025-12-16 14:19:20 -08:00
Jamison Lahman
29f0df2c93 fix(style): increase tooltip z-index (#6778) 2025-12-16 21:30:19 +00:00
Nikolas Garza
e2edcf0e0b fix: improve ux for fed slack config error handling (#6699) 2025-12-16 21:23:11 +00:00
Chris Weaver
9396fc547d fix: confluence params (#6773) 2025-12-16 20:53:39 +00:00
Jamison Lahman
c089903aad fix: chat page overflow on small screens (#6723) 2025-12-16 13:03:07 -08:00
Chris Weaver
95471f64e9 fix: main chat page w/ overridden app name (#6775) 2025-12-16 12:56:15 -08:00
Jamison Lahman
13c1619d01 fix(style): center-ish align chat icon on small screen (#6727) 2025-12-16 20:10:09 +00:00
Justin Tahara
ddb5068847 fix(helm): Redis Operator Name (#6770) 2025-12-16 20:07:00 +00:00
Nikolas Garza
81a4f654c2 fix: scrollable container height for popover.tsx (#6772) 2025-12-16 20:04:33 +00:00
Jamison Lahman
9393c56a21 fix: remove unnecessary chat display tabindex (#6722) 2025-12-16 20:00:01 +00:00
Nikolas Garza
1ee96ff99c fix(llm): fix custom provider detection and model filtering (#6766) 2025-12-16 19:14:38 +00:00
Jamison Lahman
6bb00d2c6b chore(gha): run connector tests when uv.lock changes (#6768) 2025-12-16 18:44:06 +00:00
Wenxi
d9cc923c6a fix(hubspot): api client and urllib conflict (#6765) 2025-12-16 18:35:24 +00:00
Evan Lohn
bfbba0f036 chore: gpt 5.2 model naming (#6754) 2025-12-16 10:38:29 -08:00
Wenxi
ccf6911f97 chore: alembic readme nit (#6767) 2025-12-16 10:20:50 -08:00
Wenxi
15c9c2ba8e fix(llms): only save model configs for active/usable LLMs (#6758) 2025-12-16 17:54:47 +00:00
Wenxi
8b3fedf480 fix(web search): clamp google pse max results to api max (#6764) 2025-12-16 09:47:56 -08:00
460 changed files with 7386 additions and 2160 deletions

View File

@@ -18,7 +18,6 @@ jobs:
determine-builds:
# NOTE: Github-hosted runners have about 20s faster queue times and are preferred here.
runs-on: ubuntu-slim
environment: deployment
timeout-minutes: 90
outputs:
build-web: ${{ steps.check.outputs.build-web }}

View File

@@ -5,8 +5,7 @@ concurrency:
on:
merge_group:
pull_request_target:
types: [opened, synchronize, reopened]
pull_request:
branches:
- main
- "release/**"
@@ -39,7 +38,6 @@ jobs:
discover-test-dirs:
# NOTE: Github-hosted runners have about 20s faster queue times and are preferred here.
runs-on: ubuntu-slim
environment: ci-protected
timeout-minutes: 45
outputs:
test-dirs: ${{ steps.set-matrix.outputs.test-dirs }}
@@ -47,7 +45,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
persist-credentials: false
- name: Discover test directories
@@ -73,14 +70,12 @@ jobs:
build-backend-image:
runs-on: [runs-on, runner=1cpu-linux-arm64, "run-id=${{ github.run_id }}-build-backend-image", "extras=ecr-cache"]
environment: ci-protected
timeout-minutes: 45
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
persist-credentials: false
- name: Format branch name for cache
@@ -130,14 +125,12 @@ jobs:
build-model-server-image:
runs-on: [runs-on, runner=1cpu-linux-arm64, "run-id=${{ github.run_id }}-build-model-server-image", "extras=ecr-cache"]
environment: ci-protected
timeout-minutes: 45
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
persist-credentials: false
- name: Format branch name for cache
@@ -186,14 +179,12 @@ jobs:
build-integration-image:
runs-on: [runs-on, runner=2cpu-linux-arm64, "run-id=${{ github.run_id }}-build-integration-image", "extras=ecr-cache"]
environment: ci-protected
timeout-minutes: 45
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
persist-credentials: false
- name: Set up Docker Buildx
@@ -270,7 +261,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
persist-credentials: false
# needed for pulling Vespa, Redis, Postgres, and Minio images
@@ -453,7 +443,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
persist-credentials: false
- name: Login to Docker Hub

View File

@@ -4,9 +4,6 @@ concurrency:
cancel-in-progress: true
on:
merge_group:
pull_request_target:
types: [opened, synchronize, reopened]
push:
permissions:
@@ -51,7 +48,6 @@ env:
jobs:
build-web-image:
runs-on: [runs-on, runner=4cpu-linux-arm64, "run-id=${{ github.run_id }}-build-web-image", "extras=ecr-cache"]
environment: ci-protected
timeout-minutes: 45
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
@@ -59,7 +55,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
persist-credentials: false
- name: Format branch name for cache
@@ -108,7 +103,6 @@ jobs:
build-backend-image:
runs-on: [runs-on, runner=1cpu-linux-arm64, "run-id=${{ github.run_id }}-build-backend-image", "extras=ecr-cache"]
environment: ci-protected
timeout-minutes: 45
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
@@ -116,7 +110,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
persist-credentials: false
- name: Format branch name for cache
@@ -165,7 +158,6 @@ jobs:
build-model-server-image:
runs-on: [runs-on, runner=1cpu-linux-arm64, "run-id=${{ github.run_id }}-build-model-server-image", "extras=ecr-cache"]
environment: ci-protected
timeout-minutes: 45
steps:
- uses: runs-on/action@cd2b598b0515d39d78c38a02d529db87d2196d1e # ratchet:runs-on/action@v2
@@ -173,7 +165,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
persist-credentials: false
- name: Format branch name for cache
@@ -240,7 +231,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
fetch-depth: 0
persist-credentials: false

View File

@@ -5,8 +5,7 @@ concurrency:
on:
merge_group:
pull_request_target:
types: [opened, synchronize, reopened]
pull_request:
branches: [main]
push:
tags:
@@ -130,7 +129,6 @@ jobs:
connectors-check:
# See https://runs-on.com/runners/linux/
runs-on: [runs-on, runner=8cpu-linux-x64, "run-id=${{ github.run_id }}-connectors-check", "extras=s3-cache"]
environment: ci-protected
timeout-minutes: 45
env:
@@ -143,7 +141,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # ratchet:actions/checkout@v6
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
persist-credentials: false
- name: Setup Python and Install Dependencies
@@ -164,14 +161,18 @@ jobs:
hubspot:
- 'backend/onyx/connectors/hubspot/**'
- 'backend/tests/daily/connectors/hubspot/**'
- 'uv.lock'
salesforce:
- 'backend/onyx/connectors/salesforce/**'
- 'backend/tests/daily/connectors/salesforce/**'
- 'uv.lock'
github:
- 'backend/onyx/connectors/github/**'
- 'backend/tests/daily/connectors/github/**'
- 'uv.lock'
file_processing:
- 'backend/onyx/file_processing/**'
- 'uv.lock'
- name: Run Tests (excluding HubSpot, Salesforce, GitHub, and Coda)
shell: script -q -e -c "bash --noprofile --norc -eo pipefail {0}"

View File

@@ -19,19 +19,19 @@ repos:
- id: uv-export
name: uv-export default.txt
args: ["--no-emit-project", "--no-default-groups", "--no-hashes", "--extra", "backend", "-o", "backend/requirements/default.txt"]
files: ^(pyproject\.toml|uv\.lock)$
files: ^(pyproject\.toml|uv\.lock|backend/requirements/.*\.txt)$
- id: uv-export
name: uv-export dev.txt
args: ["--no-emit-project", "--no-default-groups", "--no-hashes", "--extra", "dev", "-o", "backend/requirements/dev.txt"]
files: ^(pyproject\.toml|uv\.lock)$
files: ^(pyproject\.toml|uv\.lock|backend/requirements/.*\.txt)$
- id: uv-export
name: uv-export ee.txt
args: ["--no-emit-project", "--no-default-groups", "--no-hashes", "--extra", "ee", "-o", "backend/requirements/ee.txt"]
files: ^(pyproject\.toml|uv\.lock)$
files: ^(pyproject\.toml|uv\.lock|backend/requirements/.*\.txt)$
- id: uv-export
name: uv-export model_server.txt
args: ["--no-emit-project", "--no-default-groups", "--no-hashes", "--extra", "model_server", "-o", "backend/requirements/model_server.txt"]
files: ^(pyproject\.toml|uv\.lock)$
files: ^(pyproject\.toml|uv\.lock|backend/requirements/.*\.txt)$
# NOTE: This takes ~6s on a single, large module which is prohibitively slow.
# - id: uv-run
# name: mypy

View File

@@ -508,7 +508,6 @@
],
"cwd": "${workspaceFolder}",
"console": "integratedTerminal",
"stopOnEntry": true,
"presentation": {
"group": "3"
}

View File

@@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## KEY NOTES
- If you run into any missing python dependency errors, try running your command with `source backend/.venv/bin/activate` \
- If you run into any missing python dependency errors, try running your command with `source .venv/bin/activate` \
to assume the python venv.
- To make tests work, check the `.env` file at the root of the project to find an OpenAI key.
- If using `playwright` to explore the frontend, you can usually log in with username `a@test.com` and password

View File

@@ -7,8 +7,12 @@ Onyx migrations use a generic single-database configuration with an async dbapi.
## To generate new migrations:
run from onyx/backend:
`alembic revision --autogenerate -m <DESCRIPTION_OF_MIGRATION>`
From onyx/backend, run:
`alembic revision -m <DESCRIPTION_OF_MIGRATION>`
Note: you cannot use the `--autogenerate` flag as the automatic schema parsing does not work.
Manually populate the upgrade and downgrade in your new migration.
More info can be found here: https://alembic.sqlalchemy.org/en/latest/autogenerate.html

View File

@@ -0,0 +1,29 @@
"""add is_clarification to chat_message
Revision ID: 18b5b2524446
Revises: 87c52ec39f84
Create Date: 2025-01-16
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "18b5b2524446"
down_revision = "87c52ec39f84"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"chat_message",
sa.Column(
"is_clarification", sa.Boolean(), nullable=False, server_default="false"
),
)
def downgrade() -> None:
op.drop_column("chat_message", "is_clarification")

View File

@@ -0,0 +1,62 @@
"""update_default_tool_descriptions
Revision ID: a01bf2971c5d
Revises: 87c52ec39f84
Create Date: 2025-12-16 15:21:25.656375
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "a01bf2971c5d"
down_revision = "18b5b2524446"
branch_labels = None
depends_on = None
# new tool descriptions (12/2025)
TOOL_DESCRIPTIONS = {
"SearchTool": "The Search Action allows the agent to search through connected knowledge to help build an answer.",
"ImageGenerationTool": (
"The Image Generation Action allows the agent to use DALL-E 3 or GPT-IMAGE-1 to generate images. "
"The action will be used when the user asks the agent to generate an image."
),
"WebSearchTool": (
"The Web Search Action allows the agent "
"to perform internet searches for up-to-date information."
),
"KnowledgeGraphTool": (
"The Knowledge Graph Search Action allows the agent to search the "
"Knowledge Graph for information. This tool can (for now) only be active in the KG Beta Agent, "
"and it requires the Knowledge Graph to be enabled."
),
"OktaProfileTool": (
"The Okta Profile Action allows the agent to fetch the current user's information from Okta. "
"This may include the user's name, email, phone number, address, and other details such as their "
"manager and direct reports."
),
}
def upgrade() -> None:
conn = op.get_bind()
conn.execute(sa.text("BEGIN"))
try:
for tool_id, description in TOOL_DESCRIPTIONS.items():
conn.execute(
sa.text(
"UPDATE tool SET description = :description WHERE in_code_tool_id = :tool_id"
),
{"description": description, "tool_id": tool_id},
)
conn.execute(sa.text("COMMIT"))
except Exception as e:
conn.execute(sa.text("ROLLBACK"))
raise e
def downgrade() -> None:
pass

View File

@@ -8,6 +8,7 @@ from sqlalchemy import func
from sqlalchemy import Select
from sqlalchemy import select
from sqlalchemy import update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from ee.onyx.server.user_group.models import SetCuratorRequest
@@ -362,14 +363,29 @@ def _check_user_group_is_modifiable(user_group: UserGroup) -> None:
def _add_user__user_group_relationships__no_commit(
db_session: Session, user_group_id: int, user_ids: list[UUID]
) -> list[User__UserGroup]:
"""NOTE: does not commit the transaction."""
relationships = [
User__UserGroup(user_id=user_id, user_group_id=user_group_id)
for user_id in user_ids
]
db_session.add_all(relationships)
return relationships
) -> None:
"""NOTE: does not commit the transaction.
This function is idempotent - it will skip users who are already in the group
to avoid duplicate key violations during concurrent operations or re-syncs.
Uses ON CONFLICT DO NOTHING to keep inserts atomic under concurrency.
"""
if not user_ids:
return
insert_stmt = (
insert(User__UserGroup)
.values(
[
{"user_id": user_id, "user_group_id": user_group_id}
for user_id in user_ids
]
)
.on_conflict_do_nothing(
index_elements=[User__UserGroup.user_group_id, User__UserGroup.user_id]
)
)
db_session.execute(insert_stmt)
def _add_user_group__cc_pair_relationships__no_commit(
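For context on the race-condition fix above (#6710): a minimal, self-contained sketch of the same ON CONFLICT DO NOTHING pattern, using a toy table and placeholder column names rather than the actual Onyx models, to show why re-running the insert cannot raise a duplicate-key error.

```python
# Minimal sketch of the idempotent-insert pattern above (toy table, not the Onyx model).
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()
user__user_group = Table(
    "user__user_group",
    metadata,
    Column("user_id", Integer, primary_key=True),
    Column("user_group_id", Integer, primary_key=True),
)

stmt = (
    insert(user__user_group)
    .values([{"user_id": 1, "user_group_id": 7}, {"user_id": 2, "user_group_id": 7}])
    # Rows whose (user_group_id, user_id) pair already exists are silently skipped,
    # so concurrent syncs cannot trigger duplicate-key violations.
    .on_conflict_do_nothing(index_elements=["user_group_id", "user_id"])
)

# No database needed to inspect the emitted SQL:
print(stmt.compile(dialect=postgresql.dialect()))
# INSERT INTO user__user_group (user_id, user_group_id) VALUES (...)
#   ON CONFLICT (user_group_id, user_id) DO NOTHING
```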

View File

@@ -105,52 +105,49 @@ S, U1, TC, TR, R -- agent calls another tool -> S, U1, TC, TR, TC, TR, R, A1
- Reminder moved to the end
```
## Product considerations
Project files are important for the entire duration of the chat session. If the user has uploaded project files, they are likely very intent on working with
those files. The LLM is much better at referencing documents close to the end of the context window, so they are kept there for ease of access.
User uploaded files are considered relevant for that point in time; it is ok if the Agent forgets about them as the chat gets long. If every uploaded file were
constantly moved towards the end of the chat, it would degrade quality as these stack up. Even with a single file, there is some cost to pushing the previous
User Message further away. This tradeoff is accepted for Projects because of the intent of the feature.
Reminders are absolutely necessary to ensure 1-2 specific instructions get followed with a very high probability. They are less detailed than the system prompt
and should be very targeted so they work reliably and do not interfere with the last user message.
## Reasons / Experiments
Custom Agent instructions placed in the system prompt are poorly followed. They also degrade performance of the system, especially when the instructions
are orthogonal (or even possibly contradictory) to the system prompt. For weaker models, this causes strange artifacts in tool calls and final responses
that completely ruin the user experience. Empirically, this approach works better across a range of models, especially when the history gets longer.
Having the Custom Agent instructions not move means they fade more as the chat gets long, which is also not ok from a UX perspective.
Project files are important for the entire duration of the chat session. If the user has uploaded project files, they are likely very intent on working with
those files. The LLM is much better at referencing documents close to the end of the context window, so they are kept there for ease of access.
Reminders are absolutely necessary to ensure 1-2 specific instructions get followed with a very high probability. They are less detailed than the system prompt
and should be very targeted so they work reliably.
User uploaded files are considered relevant for that point in time; it is ok if the Agent forgets about them as the chat gets long. If every uploaded file were
constantly moved towards the end of the chat, it would degrade quality as these stack up. Even with a single file, there is some cost to pushing the previous
User Message further away. This tradeoff is accepted for Projects because of the intent of the feature.
## Other related pointers
- How messages, files, images are stored can be found in db/models.py
# Appendix (just random tidbits for those interested)
- Reminder messages are placed at the end of the prompt because all model fine tuning approaches cause the LLMs to attend very strongly to the tokens at the very
back of the context closest to generation. This is the only way to get the LLMs to not miss critical information and for the product to be reliable. Specifically
the built-in reminders are around citations and what tools it should call in certain situations.
- LLMs are able to handle changes in topic best at message boundaries. There are special tokens under the hood for this. We also use this property to slice up
the history in the way presented above.
- Different LLMs vary in this but some now have a section that cannot be set via the API layer called the "System Prompt" (OpenAI terminology) which contains
Different LLMs vary in this but some now have a section that cannot be set via the API layer called the "System Prompt" (OpenAI terminology) which contains
information like the model cutoff date, identity, and some other basic non-changing information. The System prompt described above is in that convention called
the "Developer Prompt". It seems the distribution of the System Prompt, by which I mean the style of wording and terms used can also affect the behavior. This
is different between different models and not necessarily scientific so the system prompt is built from an exploration across different models. It currently
starts with: "You are a highly capable, thoughtful, and precise assistant. Your goal is to deeply understand the user's intent..."
- The document json includes a field for the LLM to cite (it's a single number) to make citations reliable and avoid weird artifacts. It's called "document" so
LLMs are able to handle changes in topic best at message boundaries. There are special tokens under the hood for this. We also use this property to slice up
the history in the way presented above.
Reminder messages are placed at the end of the prompt because all model fine tuning approaches cause the LLMs to attend very strongly to the tokens at the very
back of the context closest to generation. This is the only way to get the LLMs to not miss critical information and for the product to be reliable. Specifically
the built-in reminders are around citations and what tools it should call in certain situations.
The document json includes a field for the LLM to cite (it's a single number) to make citations reliable and avoid weird artifacts. It's called "document" so
that the LLM does not create weird artifacts in reasoning like "I should reference citation_id: 5 for...". It is also strategically placed so that it is easy to
reference. It is followed by a couple short sections like the metadata and title before the long content section. It seems LLMs are still better at local
attention despite having global access.
- In a similar concept, LLM instructions in the system prompt are structured specifically so that there are coherent sections for the LLM to attend to. This is
In a similar concept, LLM instructions in the system prompt are structured specifically so that there are coherent sections for the LLM to attend to. This is
fairly surprising actually but if there is a line of instructions effectively saying "If you try to use some tools and find that you need more information or
need to call additional tools, you are encouraged to do this", having this in the Tool section of the System prompt makes all the LLMs follow it well but if it's
even just a paragraph away, like near the beginning of the prompt, it is often ignored. The difference is as drastic as a 30% follow rate versus a 90% follow
rate from moving the same statement just a few sentences.
- Custom Agent prompts are also completely separate from the system prompt. Having potentially orthogonal instructions in the system prompt (both the actual
instructions and the writing style) can greatly deteriorate the quality of the responses. There is also a product motivation to keep it close to the end of
generation so it's strongly followed.
## Other related pointers
- How messages, files, images are stored can be found in backend/onyx/db/models.py, there is also a README.md under that directory that may be helpful.
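To make the ordering described above concrete, here is a minimal sketch (the `Msg` container and function names are illustrative, not the actual Onyx helpers) of how a prompt could be assembled: system prompt first, chat history in the middle, the custom agent prompt and project files kept near the end, and the short reminder last.

```python
# Illustrative sketch of the message ordering described above (not the real Onyx code).
from dataclasses import dataclass


@dataclass
class Msg:
    role: str  # "system", "user", or "assistant"
    content: str


def build_prompt(
    system_prompt: str,
    history: list[Msg],
    custom_agent_prompt: str | None,
    project_file_texts: list[str],
    reminder: str | None,
) -> list[Msg]:
    messages = [Msg("system", system_prompt)]
    # Chat history (including earlier user uploads) stays in place and may fade over time.
    messages.extend(history)
    if custom_agent_prompt:
        # Kept separate from the system prompt and close to generation so it is followed.
        messages.append(Msg("user", custom_agent_prompt))
    for text in project_file_texts:
        # Project files stay near the end of the context for the whole session.
        messages.append(Msg("user", f"<project_file>\n{text}\n</project_file>"))
    if reminder:
        # 1-2 targeted instructions (e.g. citation format) go last, where attention is strongest.
        messages.append(Msg("user", reminder))
    return messages
```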

View File

@@ -26,6 +26,8 @@ class ChatStateContainer:
self.answer_tokens: str | None = None
# Store citation mapping for building citation_docs_info during partial saves
self.citation_to_doc: dict[int, SearchDoc] = {}
# True if this turn is a clarification question (deep research flow)
self.is_clarification: bool = False
def add_tool_call(self, tool_call: ToolCallInfo) -> None:
"""Add a tool call to the accumulated state."""
@@ -43,6 +45,10 @@ class ChatStateContainer:
"""Set the citation mapping from citation processor."""
self.citation_to_doc = citation_to_doc
def set_is_clarification(self, is_clarification: bool) -> None:
"""Set whether this turn is a clarification question."""
self.is_clarification = is_clarification
def run_chat_llm_with_state_containers(
func: Callable[..., None],

View File

@@ -477,7 +477,10 @@ def load_chat_file(
# Extract text content if it's a text file type (not an image)
content_text = None
file_type = file_descriptor["type"]
# `FileDescriptor` is often JSON-roundtripped (e.g. JSONB / API), so `type`
# may arrive as a raw string value instead of a `ChatFileType`.
file_type = ChatFileType(file_descriptor["type"])
if file_type.is_text_file():
try:
content_text = content.decode("utf-8")
@@ -708,3 +711,21 @@ def get_custom_agent_prompt(persona: Persona, chat_session: ChatSession) -> str
return chat_session.project.instructions
else:
return None
def is_last_assistant_message_clarification(chat_history: list[ChatMessage]) -> bool:
"""Check if the last assistant message in chat history was a clarification question.
This is used in the deep research flow to determine whether to skip the
clarification step when the user has already responded to a clarification.
Args:
chat_history: List of ChatMessage objects in chronological order
Returns:
True if the last assistant message has is_clarification=True, False otherwise
"""
for message in reversed(chat_history):
if message.message_type == MessageType.ASSISTANT:
return message.is_clarification
return False

View File

@@ -102,6 +102,11 @@ class MessageResponseIDInfo(BaseModel):
class StreamingError(BaseModel):
error: str
stack_trace: str | None = None
error_code: str | None = (
None # e.g., "RATE_LIMIT", "AUTH_ERROR", "TOOL_CALL_FAILED"
)
is_retryable: bool = True # Hint to frontend if retry might help
details: dict | None = None # Additional context (tool name, model name, etc.)
class OnyxAnswer(BaseModel):

View File

@@ -13,6 +13,7 @@ from onyx.chat.chat_state import run_chat_llm_with_state_containers
from onyx.chat.chat_utils import convert_chat_history
from onyx.chat.chat_utils import create_chat_history_chain
from onyx.chat.chat_utils import get_custom_agent_prompt
from onyx.chat.chat_utils import is_last_assistant_message_clarification
from onyx.chat.chat_utils import load_all_chat_files
from onyx.chat.emitter import get_default_emitter
from onyx.chat.llm_loop import run_llm_loop
@@ -63,10 +64,12 @@ from onyx.server.query_and_chat.streaming_models import AgentResponseStart
from onyx.server.query_and_chat.streaming_models import CitationInfo
from onyx.server.query_and_chat.streaming_models import Packet
from onyx.server.utils import get_json_line
from onyx.tools.constants import SEARCH_TOOL_ID
from onyx.tools.tool import Tool
from onyx.tools.tool_constructor import construct_tools
from onyx.tools.tool_constructor import CustomToolConfig
from onyx.tools.tool_constructor import SearchToolConfig
from onyx.tools.tool_constructor import SearchToolUsage
from onyx.utils.logger import setup_logger
from onyx.utils.long_term_log import LongTermLogger
from onyx.utils.timing import log_function_time
@@ -80,6 +83,10 @@ ERROR_TYPE_CANCELLED = "cancelled"
class ToolCallException(Exception):
"""Exception raised for errors during tool calls."""
def __init__(self, message: str, tool_name: str | None = None):
super().__init__(message)
self.tool_name = tool_name
def _extract_project_file_texts_and_images(
project_id: int | None,
@@ -207,6 +214,46 @@ def _extract_project_file_texts_and_images(
)
def _get_project_search_availability(
project_id: int | None,
persona_id: int | None,
has_project_file_texts: bool,
forced_tool_ids: list[int] | None,
search_tool_id: int | None,
) -> SearchToolUsage:
"""Determine search tool availability based on project context.
Args:
project_id: The project ID if the user is in a project
persona_id: The persona ID to check if it's the default persona
has_project_file_texts: Whether project files are loaded in context
forced_tool_ids: List of forced tool IDs (may be mutated to remove search tool)
search_tool_id: The search tool ID to check against
Returns:
SearchToolUsage setting indicating how search should be used
"""
# There are cases where the internal search tool should be disabled
# If the user is in a project, it should not use other sources / generic search
# If they are in a project but using a custom agent, it should use the agent setup
# (which means it can use search)
# However if in a project and there are more files than can fit in the context,
# it should use the search tool with the project filter on
# If no files are uploaded, search should remain enabled
search_usage_forcing_setting = SearchToolUsage.AUTO
if project_id:
if bool(persona_id is DEFAULT_PERSONA_ID and has_project_file_texts):
search_usage_forcing_setting = SearchToolUsage.DISABLED
# Remove search tool from forced_tool_ids if it's present
if forced_tool_ids and search_tool_id and search_tool_id in forced_tool_ids:
forced_tool_ids[:] = [
tool_id for tool_id in forced_tool_ids if tool_id != search_tool_id
]
elif forced_tool_ids and search_tool_id and search_tool_id in forced_tool_ids:
search_usage_forcing_setting = SearchToolUsage.ENABLED
return search_usage_forcing_setting
def _initialize_chat_session(
message_text: str,
files: list[FileDescriptor],
@@ -286,7 +333,7 @@ def stream_chat_message_objects(
tenant_id = get_current_tenant_id()
use_existing_user_message = new_msg_req.use_existing_user_message
llm: LLM
llm: LLM | None = None
try:
user_id = user.id if user is not None else None
@@ -400,17 +447,23 @@ def stream_chat_message_objects(
db_session=db_session,
)
# There are cases where the internal search tool should be disabled
# If the user is in a project, it should not use other sources / generic search
# If they are in a project but using a custom agent, it should use the agent setup
# (which means it can use search)
# However if in a project and there are more files than can fit in the context,
# it should use the search tool with the project filter on
# If no files are uploaded, search should remain enabled
disable_internal_search = bool(
chat_session.project_id
and persona.id is DEFAULT_PERSONA_ID
and extracted_project_files.project_file_texts
# Build a mapping of tool_id to tool_name for history reconstruction
all_tools = get_tools(db_session)
tool_id_to_name_map = {tool.id: tool.name for tool in all_tools}
search_tool_id = next(
(tool.id for tool in all_tools if tool.in_code_tool_id == SEARCH_TOOL_ID),
None,
)
# This may also mutate the new_msg_req.forced_tool_ids
# This logic is specifically for projects
search_usage_forcing_setting = _get_project_search_availability(
project_id=chat_session.project_id,
persona_id=persona.id,
has_project_file_texts=bool(extracted_project_files.project_file_texts),
forced_tool_ids=new_msg_req.forced_tool_ids,
search_tool_id=search_tool_id,
)
emitter = get_default_emitter()
@@ -439,7 +492,7 @@ def stream_chat_message_objects(
additional_headers=custom_tool_additional_headers,
),
allowed_tool_ids=new_msg_req.allowed_tool_ids,
disable_internal_search=disable_internal_search,
search_usage_forcing_setting=search_usage_forcing_setting,
)
tools: list[Tool] = []
for tool_list in tool_dict.values():
@@ -464,10 +517,6 @@ def stream_chat_message_objects(
reserved_assistant_message_id=assistant_response.id,
)
# Build a mapping of tool_id to tool_name for history reconstruction
all_tools = get_tools(db_session)
tool_id_to_name_map = {tool.id: tool.name for tool in all_tools}
# Convert the chat history into a simple format that is free of any DB objects
# and is easy to parse for the agent loop
simple_chat_history = convert_chat_history(
@@ -501,6 +550,10 @@ def stream_chat_message_objects(
if chat_session.project_id:
raise RuntimeError("Deep research is not supported for projects")
# Skip clarification if the last assistant message was a clarification
# (user has already responded to a clarification question)
skip_clarification = is_last_assistant_message_clarification(chat_history)
yield from run_chat_llm_with_state_containers(
run_deep_research_llm_loop,
is_connected=check_is_connected,
@@ -512,6 +565,7 @@ def stream_chat_message_objects(
llm=llm,
token_counter=token_counter,
db_session=db_session,
skip_clarification=skip_clarification,
user_identity=user_identity,
)
else:
@@ -579,13 +633,18 @@ def stream_chat_message_objects(
tool_calls=state_container.tool_calls,
db_session=db_session,
assistant_message=assistant_response,
is_clarification=state_container.is_clarification,
)
except ValueError as e:
logger.exception("Failed to process chat message.")
error_msg = str(e)
yield StreamingError(error=error_msg)
yield StreamingError(
error=error_msg,
error_code="VALIDATION_ERROR",
is_retryable=True,
)
db_session.rollback()
return
@@ -595,9 +654,17 @@ def stream_chat_message_objects(
stack_trace = traceback.format_exc()
if isinstance(e, ToolCallException):
yield StreamingError(error=error_msg, stack_trace=stack_trace)
yield StreamingError(
error=error_msg,
stack_trace=stack_trace,
error_code="TOOL_CALL_FAILED",
is_retryable=True,
details={"tool_name": e.tool_name} if e.tool_name else None,
)
elif llm:
client_error_msg = litellm_exception_to_error_msg(e, llm)
client_error_msg, error_code, is_retryable = litellm_exception_to_error_msg(
e, llm
)
if llm.config.api_key and len(llm.config.api_key) > 2:
client_error_msg = client_error_msg.replace(
llm.config.api_key, "[REDACTED_API_KEY]"
@@ -606,7 +673,24 @@ def stream_chat_message_objects(
llm.config.api_key, "[REDACTED_API_KEY]"
)
yield StreamingError(error=client_error_msg, stack_trace=stack_trace)
yield StreamingError(
error=client_error_msg,
stack_trace=stack_trace,
error_code=error_code,
is_retryable=is_retryable,
details={
"model": llm.config.model_name,
"provider": llm.config.model_provider,
},
)
else:
# LLM was never initialized - early failure
yield StreamingError(
error="Failed to initialize the chat. Please check your configuration and try again.",
stack_trace=stack_trace,
error_code="INIT_FAILED",
is_retryable=True,
)
db_session.rollback()
return

View File

@@ -148,6 +148,7 @@ def save_chat_turn(
citation_docs_info: list[CitationDocInfo],
db_session: Session,
assistant_message: ChatMessage,
is_clarification: bool = False,
) -> None:
"""
Save a chat turn by populating the assistant_message and creating related entities.
@@ -175,10 +176,12 @@ def save_chat_turn(
citation_docs_info: List of citation document information for building citations mapping
db_session: Database session for persistence
assistant_message: The ChatMessage object to populate (should already exist in DB)
is_clarification: Whether this assistant message is a clarification question (deep research flow)
"""
# 1. Update ChatMessage with message content, reasoning tokens, and token count
assistant_message.message = message_text
assistant_message.reasoning_tokens = reasoning_tokens
assistant_message.is_clarification = is_clarification
# Calculate token count using default tokenizer, when storing, this should not use the LLM
# specific one so we use a system default tokenizer here.

View File

@@ -65,9 +65,10 @@ GEN_AI_NUM_RESERVED_OUTPUT_TOKENS = int(
os.environ.get("GEN_AI_NUM_RESERVED_OUTPUT_TOKENS") or 1024
)
# Typically, GenAI models nowadays are at least 4K tokens
# Fallback token limit for models where the max context is unknown
# Set conservatively at 32K to handle most modern models
GEN_AI_MODEL_FALLBACK_MAX_TOKENS = int(
os.environ.get("GEN_AI_MODEL_FALLBACK_MAX_TOKENS") or 4096
os.environ.get("GEN_AI_MODEL_FALLBACK_MAX_TOKENS") or 32000
)
# This is used when computing how much context space is available for documents

View File

@@ -387,124 +387,162 @@ class ConfluenceConnector(
attachment_docs: list[Document] = []
page_url = ""
for attachment in self.confluence_client.paginated_cql_retrieval(
cql=attachment_query,
expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
):
media_type: str = attachment.get("metadata", {}).get("mediaType", "")
# TODO(rkuo): this check is partially redundant with validate_attachment_filetype
# and checks in convert_attachment_to_content/process_attachment
# but doing the check here avoids an unnecessary download. Due for refactoring.
if not self.allow_images:
if media_type.startswith("image/"):
logger.info(
f"Skipping attachment because allow images is False: {attachment['title']}"
)
continue
if not validate_attachment_filetype(
attachment,
try:
for attachment in self.confluence_client.paginated_cql_retrieval(
cql=attachment_query,
expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
):
logger.info(
f"Skipping attachment because it is not an accepted file type: {attachment['title']}"
)
continue
media_type: str = attachment.get("metadata", {}).get("mediaType", "")
logger.info(
f"Processing attachment: {attachment['title']} attached to page {page['title']}"
)
# Attachment document id: use the download URL for stable identity
try:
object_url = build_confluence_document_id(
self.wiki_base, attachment["_links"]["download"], self.is_cloud
)
except Exception as e:
logger.warning(
f"Invalid attachment url for id {attachment['id']}, skipping"
)
logger.debug(f"Error building attachment url: {e}")
continue
try:
response = convert_attachment_to_content(
confluence_client=self.confluence_client,
attachment=attachment,
page_id=page["id"],
allow_images=self.allow_images,
)
if response is None:
# TODO(rkuo): this check is partially redundant with validate_attachment_filetype
# and checks in convert_attachment_to_content/process_attachment
# but doing the check here avoids an unnecessary download. Due for refactoring.
if not self.allow_images:
if media_type.startswith("image/"):
logger.info(
f"Skipping attachment because allow images is False: {attachment['title']}"
)
continue
if not validate_attachment_filetype(
attachment,
):
logger.info(
f"Skipping attachment because it is not an accepted file type: {attachment['title']}"
)
continue
content_text, file_storage_name = response
logger.info(
f"Processing attachment: {attachment['title']} attached to page {page['title']}"
)
# Attachment document id: use the download URL for stable identity
try:
object_url = build_confluence_document_id(
self.wiki_base, attachment["_links"]["download"], self.is_cloud
)
except Exception as e:
logger.warning(
f"Invalid attachment url for id {attachment['id']}, skipping"
)
logger.debug(f"Error building attachment url: {e}")
continue
try:
response = convert_attachment_to_content(
confluence_client=self.confluence_client,
attachment=attachment,
page_id=page["id"],
allow_images=self.allow_images,
)
if response is None:
continue
sections: list[TextSection | ImageSection] = []
if content_text:
sections.append(TextSection(text=content_text, link=object_url))
elif file_storage_name:
sections.append(
ImageSection(link=object_url, image_file_id=file_storage_name)
content_text, file_storage_name = response
sections: list[TextSection | ImageSection] = []
if content_text:
sections.append(TextSection(text=content_text, link=object_url))
elif file_storage_name:
sections.append(
ImageSection(
link=object_url, image_file_id=file_storage_name
)
)
# Build attachment-specific metadata
attachment_metadata: dict[str, str | list[str]] = {}
if "space" in attachment:
attachment_metadata["space"] = attachment["space"].get(
"name", ""
)
labels: list[str] = []
if "metadata" in attachment and "labels" in attachment["metadata"]:
for label in attachment["metadata"]["labels"].get(
"results", []
):
labels.append(label.get("name", ""))
if labels:
attachment_metadata["labels"] = labels
page_url = page_url or build_confluence_document_id(
self.wiki_base, page["_links"]["webui"], self.is_cloud
)
attachment_metadata["parent_page_id"] = page_url
attachment_id = build_confluence_document_id(
self.wiki_base, attachment["_links"]["webui"], self.is_cloud
)
# Build attachment-specific metadata
attachment_metadata: dict[str, str | list[str]] = {}
if "space" in attachment:
attachment_metadata["space"] = attachment["space"].get("name", "")
labels: list[str] = []
if "metadata" in attachment and "labels" in attachment["metadata"]:
for label in attachment["metadata"]["labels"].get("results", []):
labels.append(label.get("name", ""))
if labels:
attachment_metadata["labels"] = labels
page_url = page_url or build_confluence_document_id(
self.wiki_base, page["_links"]["webui"], self.is_cloud
)
attachment_metadata["parent_page_id"] = page_url
attachment_id = build_confluence_document_id(
self.wiki_base, attachment["_links"]["webui"], self.is_cloud
)
primary_owners: list[BasicExpertInfo] | None = None
if "version" in attachment and "by" in attachment["version"]:
author = attachment["version"]["by"]
display_name = author.get("displayName", "Unknown")
email = author.get("email", "unknown@domain.invalid")
primary_owners = [
BasicExpertInfo(display_name=display_name, email=email)
]
primary_owners: list[BasicExpertInfo] | None = None
if "version" in attachment and "by" in attachment["version"]:
author = attachment["version"]["by"]
display_name = author.get("displayName", "Unknown")
email = author.get("email", "unknown@domain.invalid")
primary_owners = [
BasicExpertInfo(display_name=display_name, email=email)
]
attachment_doc = Document(
id=attachment_id,
sections=sections,
source=DocumentSource.CONFLUENCE,
semantic_identifier=attachment.get("title", object_url),
metadata=attachment_metadata,
doc_updated_at=(
datetime_from_string(attachment["version"]["when"])
if attachment.get("version")
and attachment["version"].get("when")
else None
),
primary_owners=primary_owners,
)
attachment_docs.append(attachment_doc)
except Exception as e:
logger.error(
f"Failed to extract/summarize attachment {attachment['title']}",
exc_info=e,
)
if is_atlassian_date_error(e):
# propagate error to be caught and retried
raise
attachment_failures.append(
ConnectorFailure(
failed_document=DocumentFailure(
document_id=object_url,
document_link=object_url,
),
failure_message=f"Failed to extract/summarize attachment {attachment['title']} for doc {object_url}",
exception=e,
)
)
except HTTPError as e:
# If we get a 403 after all retries, the user likely doesn't have permission
# to access attachments on this page. Log and skip rather than failing the whole job.
if e.response and e.response.status_code == 403:
page_title = page.get("title", "unknown")
page_id = page.get("id", "unknown")
logger.warning(
f"Permission denied (403) when fetching attachments for page '{page_title}' "
f"(ID: {page_id}). The user may not have permission to query attachments on this page. "
"Skipping attachments for this page."
)
# Build the page URL for the failure record
try:
page_url = build_confluence_document_id(
self.wiki_base, page["_links"]["webui"], self.is_cloud
)
except Exception:
page_url = f"page_id:{page_id}"
attachment_doc = Document(
id=attachment_id,
sections=sections,
source=DocumentSource.CONFLUENCE,
semantic_identifier=attachment.get("title", object_url),
metadata=attachment_metadata,
doc_updated_at=(
datetime_from_string(attachment["version"]["when"])
if attachment.get("version")
and attachment["version"].get("when")
else None
),
primary_owners=primary_owners,
)
attachment_docs.append(attachment_doc)
except Exception as e:
logger.error(
f"Failed to extract/summarize attachment {attachment['title']}",
exc_info=e,
)
if is_atlassian_date_error(e):
# propagate error to be caught and retried
raise
attachment_failures.append(
return [], [
ConnectorFailure(
failed_document=DocumentFailure(
document_id=object_url,
document_link=object_url,
document_id=page_id,
document_link=page_url,
),
failure_message=f"Failed to extract/summarize attachment {attachment['title']} for doc {object_url}",
failure_message=f"Permission denied (403) when fetching attachments for page '{page_title}'",
exception=e,
)
)
]
else:
raise
return attachment_docs, attachment_failures

View File

@@ -579,13 +579,18 @@ class OnyxConfluence:
while url_suffix:
logger.debug(f"Making confluence call to {url_suffix}")
try:
# Only pass params if they're not already in the URL to avoid duplicate
# params accumulating. Confluence's _links.next already includes these.
params = {}
if "body-format=" not in url_suffix:
params["body-format"] = "atlas_doc_format"
if "expand=" not in url_suffix:
params["expand"] = "body.atlas_doc_format"
raw_response = self.get(
path=url_suffix,
advanced_mode=True,
params={
"body-format": "atlas_doc_format",
"expand": "body.atlas_doc_format",
},
params=params,
)
except Exception as e:
logger.exception(f"Error in confluence call to {url_suffix}")

View File

@@ -26,7 +26,6 @@ from onyx.utils.logger import setup_logger
HUBSPOT_BASE_URL = "https://app.hubspot.com"
HUBSPOT_API_URL = "https://api.hubapi.com/integrations/v1/me"
# Available HubSpot object types
AVAILABLE_OBJECT_TYPES = {"tickets", "companies", "deals", "contacts"}
HUBSPOT_PAGE_SIZE = 100

View File

@@ -20,6 +20,11 @@ class OptionalSearchSetting(str, Enum):
AUTO = "auto"
class QueryType(str, Enum):
KEYWORD = "keyword"
SEMANTIC = "semantic"
class SearchType(str, Enum):
KEYWORD = "keyword"
SEMANTIC = "semantic"

View File

@@ -1,7 +1,7 @@
An explanation of how the history of messages, tool calls, and docs are stored in the database:
Messages are grouped by a chat session; a tree structure is used to allow edits and for the
user to switch between branches. Each ChatMessage is either a user message of an assistant message.
user to switch between branches. Each ChatMessage is either a user message or an assistant message.
It should always alternate between the two. System messages, custom agent prompt injections, and
reminder messages are injected dynamically after the chat session is loaded into memory. The user
and assistant messages are stored in pairs, though it is ok if the user message is stored and the

View File

@@ -2141,6 +2141,8 @@ class ChatMessage(Base):
time_sent: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
# True if this assistant message is a clarification question (deep research flow)
is_clarification: Mapped[bool] = mapped_column(Boolean, default=False)
# Relationships
chat_session: Mapped[ChatSession] = relationship("ChatSession")

View File

@@ -1,3 +1,7 @@
# TODO: Notes for potential extensions and future improvements:
# 1. Allow tools that aren't search specific tools
# 2. Use user provided custom prompts
from collections.abc import Callable
from typing import cast
@@ -15,16 +19,28 @@ from onyx.deep_research.dr_mock_tools import get_clarification_tool_definitions
from onyx.llm.interfaces import LLM
from onyx.llm.interfaces import LLMUserIdentity
from onyx.llm.models import ToolChoiceOptions
from onyx.llm.utils import model_is_reasoning_model
from onyx.prompts.deep_research.orchestration_layer import CLARIFICATION_PROMPT
from onyx.prompts.deep_research.orchestration_layer import ORCHESTRATOR_PROMPT
from onyx.prompts.deep_research.orchestration_layer import ORCHESTRATOR_PROMPT_REASONING
from onyx.prompts.deep_research.orchestration_layer import RESEARCH_PLAN_PROMPT
from onyx.prompts.prompt_utils import get_current_llm_day_time
from onyx.server.query_and_chat.streaming_models import AgentResponseDelta
from onyx.server.query_and_chat.streaming_models import AgentResponseStart
from onyx.server.query_and_chat.streaming_models import DeepResearchPlanDelta
from onyx.server.query_and_chat.streaming_models import DeepResearchPlanStart
from onyx.server.query_and_chat.streaming_models import OverallStop
from onyx.server.query_and_chat.streaming_models import Packet
from onyx.tools.tool import Tool
from onyx.tools.tool_implementations.open_url.open_url_tool import OpenURLTool
from onyx.tools.tool_implementations.search.search_tool import SearchTool
from onyx.tools.tool_implementations.web_search.web_search_tool import WebSearchTool
from onyx.utils.logger import setup_logger
logger = setup_logger()
MAX_MESSAGES_FOR_CLARIFICATION = 5
MAX_USER_MESSAGES_FOR_CONTEXT = 5
MAX_ORCHESTRATOR_CYCLES = 8
def run_deep_research_llm_loop(
@@ -42,6 +58,8 @@ def run_deep_research_llm_loop(
# Here for lazy load LiteLLM
from onyx.llm.litellm_singleton.config import initialize_litellm
# An approximate limit. In extreme cases it may still fail but this should allow deep research
# to work in most cases.
if llm.config.max_input_tokens < 25000:
raise RuntimeError(
"Cannot run Deep Research with an LLM that has less than 25,000 max input tokens"
@@ -50,13 +68,19 @@ def run_deep_research_llm_loop(
initialize_litellm()
available_tokens = llm.config.max_input_tokens
current_tool_call_index = 0
llm_step_result: LlmStepResult | None = None
# Filter tools to only allow web search, internal search, and open URL
allowed_tool_names = {SearchTool.NAME, WebSearchTool.NAME, OpenURLTool.NAME}
[tool for tool in tools if tool.name in allowed_tool_names]
#########################################################
# CLARIFICATION STEP (optional)
#########################################################
if not skip_clarification:
clarification_prompt = CLARIFICATION_PROMPT.format(
current_datetime=get_current_llm_day_time()
current_datetime=get_current_llm_day_time(full_sentence=False)
)
system_prompt = ChatMessageSimple(
message=clarification_prompt,
@@ -71,7 +95,7 @@ def run_deep_research_llm_loop(
reminder_message=None,
project_files=None,
available_tokens=available_tokens,
last_n_user_messages=MAX_MESSAGES_FOR_CLARIFICATION,
last_n_user_messages=MAX_USER_MESSAGES_FOR_CONTEXT,
)
step_generator = run_llm_step(
@@ -79,7 +103,7 @@ def run_deep_research_llm_loop(
tool_definitions=get_clarification_tool_definitions(),
tool_choice=ToolChoiceOptions.AUTO,
llm=llm,
turn_index=current_tool_call_index,
turn_index=0,
# No citations in this step, it should just pass through all
# tokens directly so initialized as an empty citation processor
citation_processor=DynamicCitationProcessor(),
@@ -94,16 +118,137 @@ def run_deep_research_llm_loop(
packet = next(step_generator)
emitter.emit(packet)
except StopIteration as e:
llm_step_result, current_tool_call_index = e.value
llm_step_result, _ = e.value
break
# Type narrowing: generator always returns a result, so this can't be None
llm_step_result = cast(LlmStepResult, llm_step_result)
if not llm_step_result.tool_calls:
emitter.emit(
Packet(turn_index=current_tool_call_index, obj=OverallStop(type="stop"))
)
# Mark this turn as a clarification question
state_container.set_is_clarification(True)
emitter.emit(Packet(turn_index=0, obj=OverallStop(type="stop")))
# If a clarification is asked, we need to end this turn and wait on user input
return
#########################################################
# RESEARCH PLAN STEP
#########################################################
system_prompt = ChatMessageSimple(
message=RESEARCH_PLAN_PROMPT.format(
current_datetime=get_current_llm_day_time(full_sentence=False)
),
token_count=300,
message_type=MessageType.SYSTEM,
)
truncated_message_history = construct_message_history(
system_prompt=system_prompt,
custom_agent_prompt=None,
simple_chat_history=simple_chat_history,
reminder_message=None,
project_files=None,
available_tokens=available_tokens,
last_n_user_messages=MAX_USER_MESSAGES_FOR_CONTEXT,
)
research_plan_generator = run_llm_step(
history=truncated_message_history,
tool_definitions=[],
tool_choice=ToolChoiceOptions.NONE,
llm=llm,
turn_index=0,
# No citations in this step, it should just pass through all
# tokens directly so initialized as an empty citation processor
citation_processor=DynamicCitationProcessor(),
state_container=state_container,
final_documents=None,
user_identity=user_identity,
)
while True:
try:
packet = next(research_plan_generator)
# Translate AgentResponseStart/Delta packets to DeepResearchPlanStart/Delta
if isinstance(packet.obj, AgentResponseStart):
emitter.emit(
Packet(
turn_index=packet.turn_index,
obj=DeepResearchPlanStart(),
)
)
elif isinstance(packet.obj, AgentResponseDelta):
emitter.emit(
Packet(
turn_index=packet.turn_index,
obj=DeepResearchPlanDelta(content=packet.obj.content),
)
)
else:
# Pass through other packet types (e.g., ReasoningStart, ReasoningDelta, etc.)
emitter.emit(packet)
except StopIteration as e:
llm_step_result, _ = e.value
break
llm_step_result = cast(LlmStepResult, llm_step_result)
research_plan = llm_step_result.answer
#########################################################
# RESEARCH EXECUTION STEP
#########################################################
is_reasoning_model = model_is_reasoning_model(
llm.config.model_name, llm.config.model_provider
)
orchestrator_prompt_template = (
ORCHESTRATOR_PROMPT if not is_reasoning_model else ORCHESTRATOR_PROMPT_REASONING
)
token_count_prompt = orchestrator_prompt_template.format(
current_datetime=get_current_llm_day_time(full_sentence=False),
current_cycle_count=1,
max_cycles=MAX_ORCHESTRATOR_CYCLES,
research_plan=research_plan,
)
orchestration_tokens = token_counter(token_count_prompt)
for cycle in range(MAX_ORCHESTRATOR_CYCLES):
orchestrator_prompt = orchestrator_prompt_template.format(
current_datetime=get_current_llm_day_time(full_sentence=False),
current_cycle_count=cycle,
max_cycles=MAX_ORCHESTRATOR_CYCLES,
research_plan=research_plan,
)
system_prompt = ChatMessageSimple(
message=orchestrator_prompt,
token_count=orchestration_tokens,
message_type=MessageType.SYSTEM,
)
truncated_message_history = construct_message_history(
system_prompt=system_prompt,
custom_agent_prompt=None,
simple_chat_history=simple_chat_history,
reminder_message=None,
project_files=None,
available_tokens=available_tokens,
last_n_user_messages=MAX_USER_MESSAGES_FOR_CONTEXT,
)
research_plan_generator = run_llm_step(
history=truncated_message_history,
tool_definitions=[],
tool_choice=ToolChoiceOptions.AUTO,
llm=llm,
turn_index=cycle,
# No citations in this step, it should just pass through all
# tokens directly so initialized as an empty citation processor
citation_processor=DynamicCitationProcessor(),
state_container=state_container,
final_documents=None,
user_identity=user_identity,
)

View File

@@ -0,0 +1,325 @@
import abc
from collections.abc import Iterator
from typing import Any
from pydantic import BaseModel
from onyx.access.models import DocumentAccess
from onyx.context.search.enums import QueryType
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import InferenceChunk
from onyx.db.enums import EmbeddingPrecision
from onyx.indexing.models import DocMetadataAwareIndexChunk
from shared_configs.model_server_models import Embedding
# NOTE: "Document" in the naming convention is used to refer to the entire document as represented in Onyx.
# What is actually stored in the index is the document chunks. By the terminology of most search engines / vector
# databases, the individual objects stored are called documents, but in this case it refers to a chunk.
# Outside of searching and update capabilities, the document index must also implement the ability to port all of
# the documents over to a secondary index. This allows for embedding models to be updated and for porting documents
# to happen in the background while the primary index still serves the main traffic.
__all__ = [
# Main interfaces - these are what you should inherit from
"DocumentIndex",
# Data models - used in method signatures
"DocumentInsertionRecord",
"DocumentSectionRequest",
"IndexingMetadata",
"MetadataUpdateRequest",
# Capability mixins - for custom compositions or type checking
"SchemaVerifiable",
"Indexable",
"Deletable",
"Updatable",
"IdRetrievalCapable",
"HybridCapable",
"RandomCapable",
]
class DocumentInsertionRecord(BaseModel):
"""
Result of indexing a document
"""
model_config = {"frozen": True}
document_id: str
already_existed: bool
class DocumentSectionRequest(BaseModel):
"""
Request for a document section or whole document
If no min_chunk_ind is provided it should start at the beginning of the document
If no max_chunk_ind is provided it should go to the end of the document
"""
model_config = {"frozen": True}
document_id: str
min_chunk_ind: int | None = None
max_chunk_ind: int | None = None
class IndexingMetadata(BaseModel):
"""
Information about chunk counts for efficient cleaning / updating of document chunks. A common pattern to ensure
that no chunks are left over is to delete all of the chunks for a document and then re-index the document. This
information allows us to only delete the extra "tail" chunks when the document has gotten shorter.
"""
# The tuple is (old_chunk_cnt, new_chunk_cnt)
doc_id_to_chunk_cnt_diff: dict[str, tuple[int, int]]
class MetadataUpdateRequest(BaseModel):
"""
Updates to the documents that can happen without there being an update to the contents of the document.
"""
document_ids: list[str]
# Passed in to help with potential optimizations of the implementation
doc_id_to_chunk_cnt: dict[str, int]
# For the ones that are None, there is no update required to that field
access: DocumentAccess | None = None
document_sets: set[str] | None = None
boost: float | None = None
hidden: bool | None = None
secondary_index_updated: bool | None = None
project_ids: set[int] | None = None
class SchemaVerifiable(abc.ABC):
"""
Class must implement document index schema verification. For example, verify that all of the
necessary attributes for indexing, querying, filtering, and fields to return from search are
all valid in the schema.
"""
def __init__(
self,
index_name: str,
tenant_id: int | None,
*args: Any,
**kwargs: Any,
) -> None:
super().__init__(*args, **kwargs)
self.index_name = index_name
self.tenant_id = tenant_id
@abc.abstractmethod
def verify_and_create_index_if_necessary(
self,
embedding_dim: int,
embedding_precision: EmbeddingPrecision,
) -> None:
"""
Verify that the document index exists and is consistent with the expectations in the code. For certain search
engines, the schema needs to be created before indexing can happen. This call should create the schema if it
does not exist.
Parameters:
- embedding_dim: Vector dimensionality for the vector similarity part of the search
- embedding_precision: Precision of the vector similarity part of the search
"""
raise NotImplementedError
class Indexable(abc.ABC):
"""
Class must implement the ability to index document chunks
"""
@abc.abstractmethod
def index(
self,
chunks: Iterator[DocMetadataAwareIndexChunk],
indexing_metadata: IndexingMetadata,
) -> set[DocumentInsertionRecord]:
"""
Takes a list of document chunks and indexes them in the document index. This is often a batch operation
including chunks from multiple documents.
NOTE: When a document is reindexed/updated here and has gotten shorter, it is important to delete the extra
chunks at the end to ensure there are no stale chunks in the index.
NOTE: The chunks of a document are never separated into separate index() calls. So there is
no worry of receiving the first 0 through n chunks in one index call and the next n through
m chunks of a document in the next index call.
Parameters:
- chunks: Document chunks with all of the information needed for indexing to the document index.
- indexing_metadata: Information about chunk counts for efficient cleaning / updating
Returns:
Set of DocumentInsertionRecords, one per unique document, used for deduping chunks
when updating. Each record also indicates whether the document was newly indexed or
already existed and was just updated.
"""
raise NotImplementedError
class Deletable(abc.ABC):
"""
Class must implement the ability to delete a document by a given unique document id. Note that the document id is
the unique identifier for the document as represented in Onyx, not in the document index.
"""
@abc.abstractmethod
def delete(
self,
db_doc_id: str,
*,
# Passed in in case it helps the efficiency of the delete implementation
chunk_count: int | None,
) -> int:
"""
Given a single document, hard delete all of the chunks for the document from the document index
Parameters:
- db_doc_id: document id as represented in Onyx
- chunk_count: number of chunks in the document
Returns:
number of chunks deleted
"""
raise NotImplementedError
class Updatable(abc.ABC):
"""
Class must implement the ability to update certain attributes of a document without needing to
update all of the fields. Specifically, needs to be able to update:
- Access Control List
- Document-set membership
- Boost value (learning from feedback mechanism)
- Whether the document is hidden or not; hidden documents are not returned from search
- Which Projects the document is a part of
"""
@abc.abstractmethod
def update(self, update_requests: list[MetadataUpdateRequest]) -> None:
"""
Updates some set of chunks. The document and fields to update are specified in the update
requests. Each update request in the list applies its changes to a list of document ids.
None values mean that the field does not need an update.
Parameters:
- update_requests: for a list of document ids in the update request, apply the same updates
to all of the documents with those ids. This is for bulk handling efficiency. Many
updates are done at the connector level which have many documents for the connector
"""
raise NotImplementedError
class IdRetrievalCapable(abc.ABC):
"""
Class must implement the ability to retrieve either:
- All of the chunks of a document, IN ORDER, given a document id. The caller assumes they are in order.
- A specific section (continuous set of chunks) for some document.
"""
@abc.abstractmethod
def id_based_retrieval(
self,
chunk_requests: list[DocumentSectionRequest],
) -> list[InferenceChunk]:
"""
Fetch chunk(s) based on document id
NOTE: This is used to reconstruct a full document or an extended (multi-chunk) section
of a document. Downstream currently assumes that the chunking does not introduce overlaps
between the chunks. If there are overlaps for the chunks, then the reconstructed document
or extended section will have duplicate segments.
NOTE: This should be used after a search call to get more context around returned chunks.
There are no filters here since the calling code should not be calling this on arbitrary
documents.
Parameters:
- chunk_requests: requests containing the document id and the chunk range to retrieve
Returns:
list of sections from the documents specified
"""
raise NotImplementedError
class HybridCapable(abc.ABC):
"""
Class must implement hybrid (keyword + vector) search functionality
"""
@abc.abstractmethod
def hybrid_retrieval(
self,
query: str,
query_embedding: Embedding,
final_keywords: list[str] | None,
query_type: QueryType,
filters: IndexFilters,
num_to_retrieve: int,
offset: int = 0,
) -> list[InferenceChunk]:
"""
Run hybrid search and return a list of inference chunks.
Parameters:
- query: unmodified user query. This may be needed for getting the matching highlighted
keywords or for logging purposes
- query_embedding: vector representation of the query, must be of the correct
dimensionality for the primary index
- final_keywords: Final keywords to be used from the query, defaults to query if not set
- query_type: Semantic or keyword type query, may use different scoring logic for each
- filters: Filters for things like permissions, source type, time, etc.
- num_to_retrieve: number of highest matching chunks to return
- offset: number of highest matching chunks to skip (kind of like pagination)
Returns:
Score ranked (highest first) list of highest matching chunks
"""
raise NotImplementedError
class RandomCapable(abc.ABC):
"""Class must implement random document retrieval capability.
This currently is just used for porting the documents to a secondary index."""
@abc.abstractmethod
def random_retrieval(
self,
filters: IndexFilters | None = None,
num_to_retrieve: int = 100,
dirty: bool | None = None,
) -> list[InferenceChunk]:
"""Retrieve random chunks matching the filters"""
raise NotImplementedError
class DocumentIndex(
SchemaVerifiable,
Indexable,
Updatable,
Deletable,
HybridCapable,
IdRetrievalCapable,
RandomCapable,
abc.ABC,
):
"""
A valid document index that can plug into all Onyx flows must implement all of these
functionalities.
As a high level summary, document indices need to be able to
- Verify the schema definition is valid
- Index new documents
- Update specific attributes of existing documents
- Delete documents
- Run hybrid search
- Retrieve document or sections of documents based on document id
- Retrieve sets of random documents
"""

View File

@@ -25,17 +25,17 @@ class SlackEntities(BaseModel):
# Direct message filtering
include_dm: bool = Field(
default=False,
default=True,
description="Include user direct messages in search results",
)
include_group_dm: bool = Field(
default=False,
default=True,
description="Include group direct messages (multi-person DMs) in search results",
)
# Private channel filtering
include_private_channels: bool = Field(
default=False,
default=True,
description="Include private channels in search results (user must have access)",
)

View File

@@ -298,17 +298,17 @@ def verify_user_files(
for file_descriptor in user_files:
# Check if this file descriptor has a user_file_id
if "user_file_id" in file_descriptor and file_descriptor["user_file_id"]:
if file_descriptor.get("user_file_id"):
try:
user_file_ids.append(UUID(file_descriptor["user_file_id"]))
except (ValueError, TypeError):
logger.warning(
f"Invalid user_file_id in file descriptor: {file_descriptor.get('user_file_id')}"
f"Invalid user_file_id in file descriptor: {file_descriptor['user_file_id']}"
)
continue
else:
# This is a project file - use the 'id' field which is the file_id
if "id" in file_descriptor and file_descriptor["id"]:
if file_descriptor.get("id"):
project_file_ids.append(file_descriptor["id"])
# Verify user files (existing logic)

View File

@@ -571,7 +571,7 @@ def fetch_model_configurations_for_provider(
"""Fetch model configurations for a static provider (OpenAI, Anthropic, Vertex AI).
Looks up max_input_tokens from LiteLLM's model_cost. If not found, stores None
and the runtime will use the fallback (4096).
and the runtime will use the fallback (32000).
Models in the curated visible lists (OPENAI_VISIBLE_MODEL_NAMES, etc.) are
marked as is_visible=True by default.

View File

@@ -2621,6 +2621,28 @@
"model_vendor": "openai",
"model_version": "2025-10-06"
},
"gpt-5.2-pro-2025-12-11": {
"display_name": "GPT-5.2 Pro",
"model_vendor": "openai",
"model_version": "2025-12-11"
},
"gpt-5.2-pro": {
"display_name": "GPT-5.2 Pro",
"model_vendor": "openai"
},
"gpt-5.2-chat-latest": {
"display_name": "GPT 5.2 Chat",
"model_vendor": "openai"
},
"gpt-5.2-2025-12-11": {
"display_name": "GPT 5.2",
"model_vendor": "openai",
"model_version": "2025-12-11"
},
"gpt-5.2": {
"display_name": "GPT 5.2",
"model_vendor": "openai"
},
"gpt-5.1": {
"display_name": "GPT 5.1",
"model_vendor": "openai"

View File

@@ -85,7 +85,15 @@ def litellm_exception_to_error_msg(
custom_error_msg_mappings: (
dict[str, str] | None
) = LITELLM_CUSTOM_ERROR_MESSAGE_MAPPINGS,
) -> str:
) -> tuple[str, str, bool]:
"""Convert a LiteLLM exception to a user-friendly error message with classification.
Returns:
tuple: (error_message, error_code, is_retryable)
- error_message: User-friendly error description
- error_code: Categorized error code for frontend display
- is_retryable: Whether the user should try again
"""
from litellm.exceptions import BadRequestError
from litellm.exceptions import AuthenticationError
from litellm.exceptions import PermissionDeniedError
@@ -102,25 +110,37 @@ def litellm_exception_to_error_msg(
core_exception = _unwrap_nested_exception(e)
error_msg = str(core_exception)
error_code = "UNKNOWN_ERROR"
is_retryable = True
if custom_error_msg_mappings:
for error_msg_pattern, custom_error_msg in custom_error_msg_mappings.items():
if error_msg_pattern in error_msg:
return custom_error_msg
return custom_error_msg, "CUSTOM_ERROR", True
if isinstance(core_exception, BadRequestError):
error_msg = "Bad request: The server couldn't process your request. Please check your input."
error_code = "BAD_REQUEST"
is_retryable = True
elif isinstance(core_exception, AuthenticationError):
error_msg = "Authentication failed: Please check your API key and credentials."
error_code = "AUTH_ERROR"
is_retryable = False
elif isinstance(core_exception, PermissionDeniedError):
error_msg = (
"Permission denied: You don't have the necessary permissions for this operation."
"Permission denied: You don't have the necessary permissions for this operation. "
"Ensure you have access to this model."
)
error_code = "PERMISSION_DENIED"
is_retryable = False
elif isinstance(core_exception, NotFoundError):
error_msg = "Resource not found: The requested resource doesn't exist."
error_code = "NOT_FOUND"
is_retryable = False
elif isinstance(core_exception, UnprocessableEntityError):
error_msg = "Unprocessable entity: The server couldn't process your request due to semantic errors."
error_code = "UNPROCESSABLE_ENTITY"
is_retryable = True
elif isinstance(core_exception, RateLimitError):
provider_name = (
llm.config.model_provider
@@ -151,6 +171,8 @@ def litellm_exception_to_error_msg(
if upstream_detail
else f"{provider_name} rate limit exceeded: Please slow down your requests and try again later."
)
error_code = "RATE_LIMIT"
is_retryable = True
elif isinstance(core_exception, ServiceUnavailableError):
provider_name = (
llm.config.model_provider
@@ -168,6 +190,8 @@ def litellm_exception_to_error_msg(
else:
# Generic 503 Service Unavailable
error_msg = f"{provider_name} service error: {str(core_exception)}"
error_code = "SERVICE_UNAVAILABLE"
is_retryable = True
elif isinstance(core_exception, ContextWindowExceededError):
error_msg = (
"Context window exceeded: Your input is too long for the model to process."
@@ -178,29 +202,44 @@ def litellm_exception_to_error_msg(
model_name=llm.config.model_name,
model_provider=llm.config.model_provider,
)
error_msg += f"Your invoked model ({llm.config.model_name}) has a maximum context size of {max_context}"
error_msg += f" Your invoked model ({llm.config.model_name}) has a maximum context size of {max_context}."
except Exception:
logger.warning(
"Unable to get maximum input token for LiteLLM excpetion handling"
"Unable to get maximum input token for LiteLLM exception handling"
)
error_code = "CONTEXT_TOO_LONG"
is_retryable = False
elif isinstance(core_exception, ContentPolicyViolationError):
error_msg = "Content policy violation: Your request violates the content policy. Please revise your input."
error_code = "CONTENT_POLICY"
is_retryable = False
elif isinstance(core_exception, APIConnectionError):
error_msg = "API connection error: Failed to connect to the API. Please check your internet connection."
error_code = "CONNECTION_ERROR"
is_retryable = True
elif isinstance(core_exception, BudgetExceededError):
error_msg = (
"Budget exceeded: You've exceeded your allocated budget for API usage."
)
error_code = "BUDGET_EXCEEDED"
is_retryable = False
elif isinstance(core_exception, Timeout):
error_msg = "Request timed out: The operation took too long to complete. Please try again."
error_code = "CONNECTION_ERROR"
is_retryable = True
elif isinstance(core_exception, APIError):
error_msg = (
"API error: An error occurred while communicating with the API. "
f"Details: {str(core_exception)}"
)
error_code = "API_ERROR"
is_retryable = True
elif not fallback_to_error_msg:
error_msg = "An unexpected error occurred while processing your request. Please try again later."
return error_msg
error_code = "UNKNOWN_ERROR"
is_retryable = True
return error_msg, error_code, is_retryable
def llm_response_to_string(message: ModelResponse) -> str:
@@ -514,11 +553,11 @@ def get_max_input_tokens_from_llm_provider(
1. Use max_input_tokens from model_configuration (populated from source APIs
like OpenRouter, Ollama, or our Bedrock mapping)
2. Look up in litellm.model_cost dictionary
3. Fall back to GEN_AI_MODEL_FALLBACK_MAX_TOKENS (4096)
3. Fall back to GEN_AI_MODEL_FALLBACK_MAX_TOKENS (32000)
Most dynamic providers (OpenRouter, Ollama) provide context_length via their
APIs. Bedrock doesn't expose this, so we parse from model ID suffix (:200k)
or use BEDROCK_MODEL_TOKEN_LIMITS mapping. The 4096 fallback is only hit for
or use BEDROCK_MODEL_TOKEN_LIMITS mapping. The 32000 fallback is only hit for
unknown models not in any of these sources.
"""
max_input_tokens = None
@@ -545,7 +584,7 @@ def get_bedrock_token_limit(model_id: str) -> int:
1. Parse from model ID suffix (e.g., ":200k" → 200000)
2. Check LiteLLM's model_cost dictionary
3. Fall back to our hardcoded BEDROCK_MODEL_TOKEN_LIMITS mapping
4. Default to 4096 if not found anywhere
4. Default to 32000 if not found anywhere
"""
from onyx.llm.constants import BEDROCK_MODEL_TOKEN_LIMITS

View File

@@ -149,7 +149,7 @@ def test_llm_configuration(
)
if error:
client_error_msg = litellm_exception_to_error_msg(
client_error_msg, _error_code, _is_retryable = litellm_exception_to_error_msg(
error, llm, fallback_to_error_msg=True
)
raise HTTPException(status_code=400, detail=client_error_msg)

View File

@@ -301,7 +301,7 @@ class BedrockModelsRequest(BaseModel):
class BedrockFinalModelResponse(BaseModel):
name: str # Model ID (e.g., "anthropic.claude-3-5-sonnet-20241022-v2:0")
display_name: str # Human-readable name from AWS (e.g., "Claude 3.5 Sonnet v2")
max_input_tokens: int # From LiteLLM, our mapping, or default 4096
max_input_tokens: int # From LiteLLM, our mapping, or default 32000
supports_image_input: bool

View File

@@ -162,36 +162,13 @@ def test_search_provider(
status_code=400, detail="Unable to build provider configuration."
)
# Actually test the API key by making a real search call
# Run the API client's test_connection method to ensure the connection is valid.
try:
test_results = provider.search("test")
if not test_results or not any(result.link for result in test_results):
raise HTTPException(
status_code=400,
detail="API key validation failed: search returned no results.",
)
return provider.test_connection()
except HTTPException:
raise
except Exception as e:
error_msg = str(e)
if (
"api" in error_msg.lower()
or "key" in error_msg.lower()
or "auth" in error_msg.lower()
):
raise HTTPException(
status_code=400,
detail=f"Invalid API key: {error_msg}",
) from e
raise HTTPException(
status_code=400,
detail=f"API key validation failed: {error_msg}",
) from e
logger.info(
f"Web search provider test succeeded for {request.provider_type.value}."
)
return {"status": "ok"}
raise HTTPException(status_code=400, detail=str(e)) from e
@admin_router.get("/content-providers", response_model=list[WebContentProviderView])

View File

@@ -34,6 +34,9 @@ class StreamingType(Enum):
REASONING_DONE = "reasoning_done"
CITATION_INFO = "citation_info"
DEEP_RESEARCH_PLAN_START = "deep_research_plan_start"
DEEP_RESEARCH_PLAN_DELTA = "deep_research_plan_delta"
class BaseObj(BaseModel):
type: str = ""
@@ -222,6 +225,20 @@ class CustomToolDelta(BaseObj):
file_ids: list[str] | None = None
class DeepResearchPlanStart(BaseObj):
type: Literal["deep_research_plan_start"] = (
StreamingType.DEEP_RESEARCH_PLAN_START.value
)
class DeepResearchPlanDelta(BaseObj):
type: Literal["deep_research_plan_delta"] = (
StreamingType.DEEP_RESEARCH_PLAN_DELTA.value
)
content: str
"""Packet"""
# Discriminated union of all possible packet object types
@@ -254,6 +271,9 @@ PacketObj = Union[
ReasoningDone,
# Citation Packets
CitationInfo,
# Deep Research Packets
DeepResearchPlanStart,
DeepResearchPlanDelta,
]
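
A hedged sketch of how a stream consumer might branch on the new deep research packet objects; the import path is an assumption and the handlers are illustrative:

```python
# Import path is an assumption; these models are defined in the streaming types file above.
from onyx.server.query_and_chat.streaming_models import (
    DeepResearchPlanDelta,
    DeepResearchPlanStart,
    PacketObj,
)


def handle_packet_obj(obj: PacketObj) -> None:
    # Branch on the discriminated union's concrete type.
    if isinstance(obj, DeepResearchPlanStart):
        print("-- research plan --")
    elif isinstance(obj, DeepResearchPlanDelta):
        print(obj.content, end="")
    # other packet types (reasoning, citations, ...) handled elsewhere
```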

View File

@@ -1,3 +1,4 @@
from enum import Enum
from typing import cast
from uuid import UUID
@@ -23,6 +24,7 @@ from onyx.db.models import Persona
from onyx.db.models import User
from onyx.db.oauth_config import get_oauth_config
from onyx.db.search_settings import get_current_search_settings
from onyx.db.tools import get_builtin_tool
from onyx.document_index.factory import get_default_document_index
from onyx.llm.interfaces import LLM
from onyx.llm.interfaces import LLMConfig
@@ -65,6 +67,12 @@ class CustomToolConfig(BaseModel):
additional_headers: dict[str, str] | None = None
class SearchToolUsage(str, Enum):
DISABLED = "disabled"
ENABLED = "enabled"
AUTO = "auto"
def _get_image_generation_config(llm: LLM, db_session: Session) -> LLMConfig:
"""Helper function to get image generation LLM config based on available providers"""
if llm and llm.config.api_key and llm.config.model_provider == "openai":
@@ -127,7 +135,7 @@ def construct_tools(
search_tool_config: SearchToolConfig | None = None,
custom_tool_config: CustomToolConfig | None = None,
allowed_tool_ids: list[int] | None = None,
disable_internal_search: bool = False,
search_usage_forcing_setting: SearchToolUsage = SearchToolUsage.AUTO,
) -> dict[int, list[Tool]]:
"""Constructs tools based on persona configuration and available APIs.
@@ -146,6 +154,7 @@ def construct_tools(
if user and user.oauth_accounts:
user_oauth_token = user.oauth_accounts[0].access_token
added_search_tool = False
for db_tool_model in persona.tools:
# If allowed_tool_ids is specified, skip tools not in the allowed list
if allowed_tool_ids is not None and db_tool_model.id not in allowed_tool_ids:
@@ -171,7 +180,8 @@ def construct_tools(
# Handle Internal Search Tool
if tool_cls.__name__ == SearchTool.__name__:
if disable_internal_search:
added_search_tool = True
if search_usage_forcing_setting == SearchToolUsage.DISABLED:
continue
if not search_tool_config:
@@ -180,7 +190,6 @@ def construct_tools(
search_settings = get_current_search_settings(db_session)
document_index = get_default_document_index(search_settings, None)
# TODO concerning passing the db_session here.
search_tool = SearchTool(
tool_id=db_tool_model.id,
db_session=db_session,
@@ -371,6 +380,36 @@ def construct_tools(
f"Tool '{expected_tool_name}' not found in MCP server '{mcp_server.name}'"
)
if (
not added_search_tool
and search_usage_forcing_setting == SearchToolUsage.ENABLED
):
# Get the database tool model for SearchTool
search_tool_db_model = get_builtin_tool(db_session, SearchTool)
# Use the passed-in config if available, otherwise create a new one
if not search_tool_config:
search_tool_config = SearchToolConfig()
search_settings = get_current_search_settings(db_session)
document_index = get_default_document_index(search_settings, None)
search_tool = SearchTool(
tool_id=search_tool_db_model.id,
db_session=db_session,
emitter=emitter,
user=user,
persona=persona,
llm=llm,
fast_llm=fast_llm,
document_index=document_index,
user_selected_filters=search_tool_config.user_selected_filters,
project_id=search_tool_config.project_id,
bypass_acl=search_tool_config.bypass_acl,
slack_context=search_tool_config.slack_context,
)
tool_dict[search_tool_db_model.id] = [search_tool]
tools: list[Tool] = []
for tool_list in tool_dict.values():
tools.extend(tool_list)
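
To summarize the new forcing behavior in one place, here is a small, self-contained restatement of the decision (a sketch that mirrors the logic above, not the actual helper):

```python
from enum import Enum


class SearchToolUsage(str, Enum):  # mirrors the enum added in this diff
    DISABLED = "disabled"
    ENABLED = "enabled"
    AUTO = "auto"


def should_attach_search_tool(
    setting: SearchToolUsage, persona_has_search_tool: bool
) -> bool:
    """Condensed restatement of the construct_tools behavior above."""
    if setting == SearchToolUsage.DISABLED:
        return False  # skip the internal search tool even if the persona includes it
    if setting == SearchToolUsage.ENABLED:
        return True  # attach it even when the persona did not include it
    return persona_has_search_tool  # AUTO: defer to the persona's tool list


assert should_attach_search_tool(SearchToolUsage.ENABLED, persona_has_search_tool=False)
assert not should_attach_search_tool(SearchToolUsage.AUTO, persona_has_search_tool=False)
```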

View File

@@ -2,6 +2,7 @@ from collections.abc import Sequence
from exa_py import Exa
from exa_py.api import HighlightsContentsOptions
from fastapi import HTTPException
from onyx.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from onyx.tools.tool_implementations.open_url.models import WebContent
@@ -12,8 +13,11 @@ from onyx.tools.tool_implementations.web_search.models import (
from onyx.tools.tool_implementations.web_search.models import (
WebSearchResult,
)
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import retry_builder
logger = setup_logger()
# TODO can probably break this up
class ExaClient(WebSearchProvider, WebContentProvider):
@@ -48,6 +52,35 @@ class ExaClient(WebSearchProvider, WebContentProvider):
for result in response.results
]
def test_connection(self) -> dict[str, str]:
try:
test_results = self.search("test")
if not test_results or not any(result.link for result in test_results):
raise HTTPException(
status_code=400,
detail="API key validation failed: search returned no results.",
)
except HTTPException:
raise
except Exception as e:
error_msg = str(e)
if (
"api" in error_msg.lower()
or "key" in error_msg.lower()
or "auth" in error_msg.lower()
):
raise HTTPException(
status_code=400,
detail=f"Invalid Exa API key: {error_msg}",
) from e
raise HTTPException(
status_code=400,
detail=f"Exa API key validation failed: {error_msg}",
) from e
logger.info("Web search provider test succeeded for Exa.")
return {"status": "ok"}
@retry_builder(tries=3, delay=1, backoff=2)
def contents(self, urls: Sequence[str]) -> list[WebContent]:
response = self.exa.get_contents(

View File

@@ -4,6 +4,7 @@ from datetime import datetime
from typing import Any
import requests
from fastapi import HTTPException
from onyx.tools.tool_implementations.web_search.models import (
WebSearchProvider,
@@ -28,7 +29,7 @@ class GooglePSEClient(WebSearchProvider):
) -> None:
self._api_key = api_key
self._search_engine_id = search_engine_id
self._num_results = num_results
self._num_results = min(num_results, 10) # Google API max is 10
self._timeout_seconds = timeout_seconds
@retry_builder(tries=3, delay=1, backoff=2)
@@ -119,3 +120,38 @@ class GooglePSEClient(WebSearchProvider):
)
return results
# TODO: I'm not really satisfied with how tailored this is to the particulars of Google PSE.
# In particular, I think this might flatten errors that are caused by the API key vs. ones caused
# by the search engine ID, or by other factors.
# I (David Edelstein) don't feel knowledgeable enough about the return behavior of the Google PSE API
# to ensure that we have nicely descriptive and actionable error messages. (Like, what's up with the
# thing where 200 status codes can have error messages in the response body?)
def test_connection(self) -> dict[str, str]:
try:
test_results = self.search("test")
if not test_results or not any(result.link for result in test_results):
raise HTTPException(
status_code=400,
detail="Google PSE validation failed: search returned no results.",
)
except HTTPException:
raise
except Exception as e:
error_msg = str(e)
if (
"api" in error_msg.lower()
or "key" in error_msg.lower()
or "auth" in error_msg.lower()
):
raise HTTPException(
status_code=400,
detail=f"Invalid Google PSE API key: {error_msg}",
) from e
raise HTTPException(
status_code=400,
detail=f"Google PSE validation failed: {error_msg}",
) from e
logger.info("Web search provider test succeeded for Google PSE.")
return {"status": "ok"}

View File

@@ -0,0 +1,137 @@
import requests
from fastapi import HTTPException
from onyx.tools.tool_implementations.web_search.models import (
WebSearchProvider,
)
from onyx.tools.tool_implementations.web_search.models import (
WebSearchResult,
)
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import retry_builder
logger = setup_logger()
class SearXNGClient(WebSearchProvider):
def __init__(
self,
searxng_base_url: str,
num_results: int = 10,
) -> None:
logger.debug(f"Initializing SearXNGClient with base URL: {searxng_base_url}")
self._searxng_base_url = searxng_base_url
self._num_results = num_results
@retry_builder(tries=3, delay=1, backoff=2)
def search(self, query: str) -> list[WebSearchResult]:
payload = {
"q": query,
"format": "json",
}
logger.debug(
f"Searching with payload: {payload} to {self._searxng_base_url}/search"
)
response = requests.post(
f"{self._searxng_base_url}/search",
data=payload,
)
response.raise_for_status()
results = response.json()
result_list = results.get("results", [])
# SearXNG doesn't support limiting results via API parameters,
# so we limit client-side after receiving the response
limited_results = result_list[: self._num_results]
return [
WebSearchResult(
title=result["title"],
link=result["url"],
snippet=result["content"],
)
for result in limited_results
]
def test_connection(self) -> dict[str, str]:
try:
logger.debug(f"Testing connection to {self._searxng_base_url}/config")
response = requests.get(f"{self._searxng_base_url}/config")
logger.debug(f"Response: {response.status_code}, text: {response.text}")
response.raise_for_status()
except requests.HTTPError as e:
status_code = e.response.status_code
logger.debug(
f"HTTPError: status_code={status_code}, e.response={e.response.status_code if e.response else None}, error={e}"
)
if status_code == 429:
raise HTTPException(
status_code=400,
detail=(
"This SearXNG instance does not allow API requests. "
"Use a private instance and configure it to allow bots."
),
) from e
elif status_code == 404:
raise HTTPException(
status_code=400,
detail="This SearXNG instance was not found. Please check the URL and try again.",
) from e
else:
raise HTTPException(
status_code=400,
detail=f"SearXNG connection failed (status {status_code}): {str(e)}",
) from e
# This is not a foolproof way to check that this is a SearXNG instance, as opposed to some other
# website that happens to have a /config endpoint containing a "brand" key with a "GIT_URL" key whose
# value is "https://github.com/searxng/searxng". I don't think that would happen by coincidence, so I
# think this is a good enough check for now. I'm open to suggestions for improvements.
config = response.json()
if (
config.get("brand", {}).get("GIT_URL")
!= "https://github.com/searxng/searxng"
):
raise HTTPException(
status_code=400,
detail="This does not appear to be a SearXNG instance. Please check the URL and try again.",
)
# Test that JSON mode is enabled by performing a simple search
self._test_json_mode()
logger.info("Web search provider test succeeded for SearXNG.")
return {"status": "ok"}
def _test_json_mode(self) -> None:
"""Test that JSON format is enabled in SearXNG settings.
SearXNG requires JSON format to be explicitly enabled in settings.yml.
If it's not enabled, the search endpoint returns a 403.
"""
try:
payload = {
"q": "test",
"format": "json",
}
response = requests.post(
f"{self._searxng_base_url}/search",
data=payload,
timeout=5,
)
response.raise_for_status()
except requests.HTTPError as e:
status_code = e.response.status_code if e.response is not None else None
if status_code == 403:
raise HTTPException(
status_code=400,
detail=(
"Got a 403 response when trying to reach SearXNG. This likely means that "
"JSON format is not enabled on this SearXNG instance. "
"Please enable JSON format in your SearXNG settings.yml file by adding "
"'json' to the 'search.formats' list."
),
) from e
raise HTTPException(
status_code=400,
detail=f"Failed to test search on SearXNG instance (status {status_code}): {str(e)}",
) from e
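
A hedged usage sketch for the new client; the base URL is a placeholder, and the import path matches the factory registration further below:

```python
from onyx.tools.tool_implementations.web_search.clients.searxng_client import (
    SearXNGClient,
)

# Placeholder URL: point at a private instance you control.
client = SearXNGClient(searxng_base_url="http://searxng.internal:8080", num_results=5)

client.test_connection()  # raises HTTPException with an actionable message on failure
for result in client.search("onyx vector search"):
    print(result.title, result.link)
```

Note that the instance must have JSON output enabled (add `json` to the `formats` list under `search` in settings.yml), which is exactly what `_test_json_mode` checks for.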

View File

@@ -3,6 +3,7 @@ from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
import requests
from fastapi import HTTPException
from onyx.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from onyx.tools.tool_implementations.open_url.models import WebContent
@@ -13,8 +14,11 @@ from onyx.tools.tool_implementations.web_search.models import (
from onyx.tools.tool_implementations.web_search.models import (
WebSearchResult,
)
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import retry_builder
logger = setup_logger()
SERPER_SEARCH_URL = "https://google.serper.dev/search"
SERPER_CONTENTS_URL = "https://scrape.serper.dev"
@@ -56,6 +60,35 @@ class SerperClient(WebSearchProvider, WebContentProvider):
for result in organic_results
]
def test_connection(self) -> dict[str, str]:
try:
test_results = self.search("test")
if not test_results or not any(result.link for result in test_results):
raise HTTPException(
status_code=400,
detail="API key validation failed: search returned no results.",
)
except HTTPException:
raise
except Exception as e:
error_msg = str(e)
if (
"api" in error_msg.lower()
or "key" in error_msg.lower()
or "auth" in error_msg.lower()
):
raise HTTPException(
status_code=400,
detail=f"Invalid Serper API key: {error_msg}",
) from e
raise HTTPException(
status_code=400,
detail=f"Serper API key validation failed: {error_msg}",
) from e
logger.info("Web search provider test succeeded for Serper.")
return {"status": "ok"}
def contents(self, urls: Sequence[str]) -> list[WebContent]:
if not urls:
return []

View File

@@ -41,6 +41,10 @@ class WebSearchProvider:
def search(self, query: str) -> Sequence[WebSearchResult]:
pass
@abstractmethod
def test_connection(self) -> dict[str, str]:
pass
class WebContentProviderConfig(BaseModel):
timeout_seconds: int | None = None

View File

@@ -13,6 +13,9 @@ from onyx.tools.tool_implementations.web_search.clients.exa_client import (
from onyx.tools.tool_implementations.web_search.clients.google_pse_client import (
GooglePSEClient,
)
from onyx.tools.tool_implementations.web_search.clients.searxng_client import (
SearXNGClient,
)
from onyx.tools.tool_implementations.web_search.clients.serper_client import (
SerperClient,
)
@@ -55,6 +58,14 @@ def build_search_provider_from_config(
num_results=num_results,
timeout_seconds=int(config.get("timeout_seconds") or 10),
)
if provider_type == WebSearchProviderType.SEARXNG:
searxng_base_url = config.get("searxng_base_url")
if not searxng_base_url:
raise ValueError("Please provide a URL for your private SearXNG instance.")
return SearXNGClient(
searxng_base_url,
num_results=num_results,
)
def _build_search_provider(provider_model: InternetSearchProvider) -> WebSearchProvider:

View File

@@ -260,7 +260,7 @@ fastmcp==2.13.3
# via onyx
fastuuid==0.14.0
# via litellm
filelock==3.15.4
filelock==3.20.1
# via
# huggingface-hub
# onyx
@@ -394,7 +394,7 @@ httpx-sse==0.4.3
# via
# cohere
# mcp
hubspot-api-client==8.1.0
hubspot-api-client==11.1.0
# via onyx
huggingface-hub==0.35.3
# via
@@ -898,6 +898,7 @@ requests==2.32.5
# google-cloud-bigquery
# google-cloud-storage
# google-genai
# hubspot-api-client
# huggingface-hub
# jira
# jsonschema-path
@@ -1141,7 +1142,7 @@ unstructured-client==0.25.4
# unstructured
uritemplate==4.2.0
# via google-api-python-client
urllib3==2.6.0
urllib3==2.6.1
# via
# asana
# botocore

View File

@@ -108,7 +108,7 @@ fastavro==1.12.1
# via cohere
fastuuid==0.14.0
# via litellm
filelock==3.15.4
filelock==3.20.1
# via
# huggingface-hub
# virtualenv
@@ -526,7 +526,7 @@ typing-inspection==0.4.2
# via pydantic
tzdata==2025.2
# via faker
urllib3==2.6.0
urllib3==2.6.1
# via
# botocore
# requests

View File

@@ -77,7 +77,7 @@ fastavro==1.12.1
# via cohere
fastuuid==0.14.0
# via litellm
filelock==3.15.4
filelock==3.20.1
# via huggingface-hub
frozenlist==1.8.0
# via
@@ -338,7 +338,7 @@ typing-extensions==4.15.0
# typing-inspection
typing-inspection==0.4.2
# via pydantic
urllib3==2.6.0
urllib3==2.6.1
# via
# botocore
# requests

View File

@@ -112,7 +112,7 @@ fastavro==1.12.1
# via cohere
fastuuid==0.14.0
# via litellm
filelock==3.15.4
filelock==3.20.1
# via
# datasets
# huggingface-hub
@@ -520,7 +520,7 @@ tzdata==2025.2
# via
# kombu
# pandas
urllib3==2.6.0
urllib3==2.6.1
# via
# botocore
# requests

View File

@@ -0,0 +1,230 @@
# Onyx Data Backup & Restore Scripts
Scripts for backing up and restoring PostgreSQL, Vespa, and MinIO data from an Onyx deployment.
## Overview
Two backup modes are supported:
| Mode | Description | Pros | Cons |
|------|-------------|------|------|
| **volume** | Exports Docker volumes directly | Fast, complete, preserves everything | Services must be stopped for consistency |
| **api** | Uses pg_dump and Vespa REST API | Services can stay running, more portable | Slower for large datasets |
## Quick Start
### Backup (from a running instance)
```bash
# Full backup using volume mode (recommended for complete backups)
# Note: For consistency, stop services first
docker compose -f deployment/docker_compose/docker-compose.yml stop
./scripts/dump_data.sh --mode volume --output ./backups
docker compose -f deployment/docker_compose/docker-compose.yml start
# Or use API mode (services can stay running)
./scripts/dump_data.sh --mode api --output ./backups
```
### Restore (to a local instance)
```bash
# Restore from latest backup
./scripts/restore_data.sh --input ./backups/latest
# Restore from specific backup
./scripts/restore_data.sh --input ./backups/20240115_120000
# Force restore without confirmation
./scripts/restore_data.sh --input ./backups/latest --force
```
## Detailed Usage
### dump_data.sh
```
Usage: ./scripts/dump_data.sh [OPTIONS]
Options:
--mode <volume|api> Backup mode (default: volume)
--output <dir> Output directory (default: ./onyx_backup)
--project <name> Docker Compose project name (default: onyx)
--postgres-only Only backup PostgreSQL
--vespa-only Only backup Vespa
--minio-only Only backup MinIO
--no-minio Skip MinIO backup
--help Show help message
```
**Examples:**
```bash
# Default volume backup
./scripts/dump_data.sh
# API-based backup
./scripts/dump_data.sh --mode api
# Only backup PostgreSQL
./scripts/dump_data.sh --postgres-only --mode api
# Custom output directory
./scripts/dump_data.sh --output /mnt/backups/onyx
# Different project name (if using custom docker compose project)
./scripts/dump_data.sh --project my-onyx-instance
```
### restore_data.sh
```
Usage: ./scripts/restore_data.sh [OPTIONS]
Options:
--input <dir> Backup directory (required)
--project <name> Docker Compose project name (default: onyx)
--postgres-only Only restore PostgreSQL
--vespa-only Only restore Vespa
--minio-only Only restore MinIO
--no-minio Skip MinIO restore
--force Skip confirmation prompts
--help Show help message
```
**Examples:**
```bash
# Restore all components
./scripts/restore_data.sh --input ./onyx_backup/latest
# Restore only PostgreSQL
./scripts/restore_data.sh --input ./onyx_backup/latest --postgres-only
# Non-interactive restore
./scripts/restore_data.sh --input ./onyx_backup/latest --force
```
## Backup Directory Structure
After running a backup, the output directory contains:
```
onyx_backup/
├── 20240115_120000/ # Timestamp-named backup
│ ├── metadata.json # Backup metadata
│ ├── postgres_volume.tar.gz # PostgreSQL data (volume mode)
│ ├── postgres_dump.backup # PostgreSQL dump (api mode)
│ ├── vespa_volume.tar.gz # Vespa data (volume mode)
│ ├── vespa_documents.jsonl # Vespa documents (api mode)
│ ├── minio_volume.tar.gz # MinIO data (volume mode)
│ └── minio_data.tar.gz # MinIO data (api mode)
└── latest -> 20240115_120000 # Symlink to latest backup
```
## Environment Variables
You can customize behavior with environment variables:
```bash
# PostgreSQL settings
export POSTGRES_USER=postgres
export POSTGRES_PASSWORD=password
export POSTGRES_DB=postgres
export POSTGRES_PORT=5432
# Vespa settings
export VESPA_HOST=localhost
export VESPA_PORT=8081
export VESPA_INDEX=danswer_index
```
## Typical Workflows
### Migrate to a new server
```bash
# On source server
./scripts/dump_data.sh --mode volume --output ./migration_backup
tar czf onyx_backup.tar.gz ./migration_backup/latest
# Transfer to new server
scp onyx_backup.tar.gz newserver:/opt/onyx/
# On new server
cd /opt/onyx
tar xzf onyx_backup.tar.gz
./scripts/restore_data.sh --input ./migration_backup/latest --force
docker compose up -d
```
### Create a development copy from production
```bash
# On production (use API mode to avoid downtime)
./scripts/dump_data.sh --mode api --output ./prod_backup
# Copy to dev machine
# -L dereferences the 'latest' symlink so the actual backup files land in onyx_backup/
rsync -avzL ./prod_backup/latest/ dev-machine:/home/dev/onyx_backup/
# On dev machine
./scripts/restore_data.sh --input /home/dev/onyx_backup --force
docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d
```
### Scheduled backups (cron)
```bash
# Add to crontab: crontab -e
# Daily backup at 2 AM
0 2 * * * cd /opt/onyx && ./scripts/dump_data.sh --mode api --output /backups/onyx >> /var/log/onyx-backup.log 2>&1
# Weekly cleanup (keep last 7 days)
0 3 * * 0 find /backups/onyx -mindepth 1 -maxdepth 1 -type d -mtime +7 -exec rm -rf {} \;
```
## Troubleshooting
### "Volume not found" error
Ensure the Docker Compose project name matches:
```bash
docker volume ls | grep db_volume
# If it shows "myproject_db_volume", use --project myproject
```
### "Container not running" error (API mode)
Start the required services:
```bash
cd deployment/docker_compose
docker compose up -d relational_db index minio
```
### Vespa restore fails with "not ready"
Vespa takes time to initialize. Wait and retry:
```bash
# Check Vespa health
curl http://localhost:8081/state/v1/health
```
### PostgreSQL restore shows warnings
`pg_restore` often shows warnings about objects that don't exist (when using `--clean`). These are usually safe to ignore if the restore completes.
## Alternative: Python Script
For more control, you can also use the existing Python script:
```bash
cd backend
# Save state
python -m scripts.save_load_state --save --checkpoint_dir ../onyx_checkpoint
# Load state
python -m scripts.save_load_state --load --checkpoint_dir ../onyx_checkpoint
```
See `backend/scripts/save_load_state.py` for the Python implementation.

backend/scripts/dump/dump_data.sh Executable file (478 lines)
View File

@@ -0,0 +1,478 @@
#!/bin/bash
# =============================================================================
# Onyx Data Dump Script
# =============================================================================
# This script creates a backup of PostgreSQL, Vespa, and MinIO data.
#
# Two modes available:
# - volume: Exports Docker volumes directly (faster, complete backup)
# - api: Uses pg_dump and Vespa API (more portable)
#
# Usage:
# ./dump_data.sh [OPTIONS]
#
# Options:
# --mode <volume|api> Backup mode (default: volume)
# --output <dir> Output directory (default: ./onyx_backup)
# --project <name> Docker Compose project name (default: onyx)
# --volume-prefix <name> Volume name prefix (default: same as project name)
# --compose-dir <dir> Docker Compose directory (for service management)
# --postgres-only Only backup PostgreSQL
# --vespa-only Only backup Vespa
# --minio-only Only backup MinIO
# --no-minio Skip MinIO backup
# --no-restart Don't restart services after backup (volume mode)
# --help Show this help message
#
# Examples:
# ./dump_data.sh # Full volume backup
# ./dump_data.sh --mode api # API-based backup
# ./dump_data.sh --output /tmp/backup # Custom output directory
# ./dump_data.sh --postgres-only --mode api # Only PostgreSQL via pg_dump
# ./dump_data.sh --volume-prefix myprefix # Use custom volume prefix
# =============================================================================
set -e
# Default configuration
MODE="volume"
OUTPUT_DIR="./onyx_backup"
PROJECT_NAME="onyx"
VOLUME_PREFIX="" # Will default to PROJECT_NAME if not set
COMPOSE_DIR="" # Docker Compose directory for service management
BACKUP_POSTGRES=true
BACKUP_VESPA=true
BACKUP_MINIO=true
NO_RESTART=false
# PostgreSQL defaults
POSTGRES_USER="${POSTGRES_USER:-postgres}"
POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-password}"
POSTGRES_DB="${POSTGRES_DB:-postgres}"
POSTGRES_PORT="${POSTGRES_PORT:-5432}"
# Vespa defaults
VESPA_HOST="${VESPA_HOST:-localhost}"
VESPA_PORT="${VESPA_PORT:-8081}"
VESPA_INDEX="${VESPA_INDEX:-danswer_index}"
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
log_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
log_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
show_help() {
head -35 "$0" | tail -32
exit 0
}
# Parse arguments
while [[ $# -gt 0 ]]; do
case $1 in
--mode)
MODE="$2"
shift 2
;;
--output)
OUTPUT_DIR="$2"
shift 2
;;
--project)
PROJECT_NAME="$2"
shift 2
;;
--volume-prefix)
VOLUME_PREFIX="$2"
shift 2
;;
--compose-dir)
COMPOSE_DIR="$2"
shift 2
;;
--no-restart)
NO_RESTART=true
shift
;;
--postgres-only)
BACKUP_POSTGRES=true
BACKUP_VESPA=false
BACKUP_MINIO=false
shift
;;
--vespa-only)
BACKUP_POSTGRES=false
BACKUP_VESPA=true
BACKUP_MINIO=false
shift
;;
--minio-only)
BACKUP_POSTGRES=false
BACKUP_VESPA=false
BACKUP_MINIO=true
shift
;;
--no-minio)
BACKUP_MINIO=false
shift
;;
--help)
show_help
;;
*)
log_error "Unknown option: $1"
exit 1
;;
esac
done
# Validate mode
if [[ "$MODE" != "volume" && "$MODE" != "api" ]]; then
log_error "Invalid mode: $MODE. Use 'volume' or 'api'"
exit 1
fi
# Set VOLUME_PREFIX to PROJECT_NAME if not specified
if [[ -z "$VOLUME_PREFIX" ]]; then
VOLUME_PREFIX="$PROJECT_NAME"
fi
# Create output directory with timestamp
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="${OUTPUT_DIR}/${TIMESTAMP}"
mkdir -p "$BACKUP_DIR"
log_info "Starting Onyx data backup..."
log_info "Mode: $MODE"
log_info "Output directory: $BACKUP_DIR"
log_info "Project name: $PROJECT_NAME"
log_info "Volume prefix: $VOLUME_PREFIX"
# Get container names
POSTGRES_CONTAINER="${PROJECT_NAME}-relational_db-1"
VESPA_CONTAINER="${PROJECT_NAME}-index-1"
MINIO_CONTAINER="${PROJECT_NAME}-minio-1"
# Track which services were stopped
STOPPED_SERVICES=()
# =============================================================================
# Service management functions
# =============================================================================
stop_service() {
local service=$1
local container="${PROJECT_NAME}-${service}-1"
if docker ps --format '{{.Names}}' | grep -q "^${container}$"; then
log_info "Stopping ${service}..."
if [[ -n "$COMPOSE_DIR" ]]; then
docker compose -p "$PROJECT_NAME" -f "${COMPOSE_DIR}/docker-compose.yml" stop "$service" 2>/dev/null || \
docker stop "$container"
else
docker stop "$container"
fi
STOPPED_SERVICES+=("$service")
fi
}
start_services() {
if [[ ${#STOPPED_SERVICES[@]} -eq 0 ]]; then
return
fi
log_info "Restarting services: ${STOPPED_SERVICES[*]}"
if [[ -n "$COMPOSE_DIR" ]]; then
docker compose -p "$PROJECT_NAME" -f "${COMPOSE_DIR}/docker-compose.yml" start "${STOPPED_SERVICES[@]}" 2>/dev/null || {
# Fallback to starting containers directly
for service in "${STOPPED_SERVICES[@]}"; do
docker start "${PROJECT_NAME}-${service}-1" 2>/dev/null || true
done
}
else
for service in "${STOPPED_SERVICES[@]}"; do
docker start "${PROJECT_NAME}-${service}-1" 2>/dev/null || true
done
fi
}
# =============================================================================
# Volume-based backup functions
# =============================================================================
backup_postgres_volume() {
log_info "Backing up PostgreSQL volume..."
local volume_name="${VOLUME_PREFIX}_db_volume"
# Check if volume exists
if ! docker volume inspect "$volume_name" &>/dev/null; then
log_error "PostgreSQL volume '$volume_name' not found"
return 1
fi
# Export volume to tar
docker run --rm \
-v "${volume_name}:/source:ro" \
-v "${BACKUP_DIR}:/backup" \
alpine tar czf /backup/postgres_volume.tar.gz -C /source .
log_success "PostgreSQL volume backed up to postgres_volume.tar.gz"
}
backup_vespa_volume() {
log_info "Backing up Vespa volume..."
local volume_name="${VOLUME_PREFIX}_vespa_volume"
# Check if volume exists
if ! docker volume inspect "$volume_name" &>/dev/null; then
log_error "Vespa volume '$volume_name' not found"
return 1
fi
# Export volume to tar
docker run --rm \
-v "${volume_name}:/source:ro" \
-v "${BACKUP_DIR}:/backup" \
alpine tar czf /backup/vespa_volume.tar.gz -C /source .
log_success "Vespa volume backed up to vespa_volume.tar.gz"
}
backup_minio_volume() {
log_info "Backing up MinIO volume..."
local volume_name="${VOLUME_PREFIX}_minio_data"
# Check if volume exists
if ! docker volume inspect "$volume_name" &>/dev/null; then
log_error "MinIO volume '$volume_name' not found"
return 1
fi
# Export volume to tar
docker run --rm \
-v "${volume_name}:/source:ro" \
-v "${BACKUP_DIR}:/backup" \
alpine tar czf /backup/minio_volume.tar.gz -C /source .
log_success "MinIO volume backed up to minio_volume.tar.gz"
}
# =============================================================================
# API-based backup functions
# =============================================================================
backup_postgres_api() {
log_info "Backing up PostgreSQL via pg_dump..."
# Check if container is running
if ! docker ps --format '{{.Names}}' | grep -q "^${POSTGRES_CONTAINER}$"; then
log_error "PostgreSQL container '$POSTGRES_CONTAINER' is not running"
return 1
fi
# Create dump using pg_dump inside container
docker exec "$POSTGRES_CONTAINER" \
pg_dump -U "$POSTGRES_USER" -F c -b -v "$POSTGRES_DB" \
> "${BACKUP_DIR}/postgres_dump.backup"
log_success "PostgreSQL backed up to postgres_dump.backup"
}
backup_vespa_api() {
log_info "Backing up Vespa via API..."
local endpoint="http://${VESPA_HOST}:${VESPA_PORT}/document/v1/default/${VESPA_INDEX}/docid"
local output_file="${BACKUP_DIR}/vespa_documents.jsonl"
local continuation=""
local total_docs=0
# Check if Vespa is accessible
if ! curl -s -o /dev/null -w "%{http_code}" "$endpoint" | grep -q "200\|404"; then
# Try via container if localhost doesn't work
if docker ps --format '{{.Names}}' | grep -q "^${VESPA_CONTAINER}$"; then
log_warning "Vespa not accessible on $VESPA_HOST:$VESPA_PORT, trying via container..."
endpoint="http://localhost:8081/document/v1/default/${VESPA_INDEX}/docid"
else
log_error "Cannot connect to Vespa at $endpoint"
return 1
fi
fi
# Clear output file
> "$output_file"
# Fetch documents with pagination
while true; do
local url="$endpoint"
if [[ -n "$continuation" ]]; then
url="${endpoint}?continuation=${continuation}"
fi
local response
response=$(curl -s "$url")
# Extract continuation token
continuation=$(echo "$response" | jq -r '.continuation // empty')
# Extract and save documents
local docs
docs=$(echo "$response" | jq -c '.documents[]? | {update: .id, create: true, fields: .fields}')
if [[ -n "$docs" ]]; then
echo "$docs" >> "$output_file"
local count
count=$(echo "$docs" | wc -l)
total_docs=$((total_docs + count))
log_info " Fetched $total_docs documents so far..."
fi
# Check if we're done
if [[ -z "$continuation" ]]; then
break
fi
done
log_success "Vespa backed up to vespa_documents.jsonl ($total_docs documents)"
}
backup_minio_api() {
log_info "Backing up MinIO data..."
local minio_dir="${BACKUP_DIR}/minio_data"
mkdir -p "$minio_dir"
# Check if mc (MinIO client) is available
if command -v mc &>/dev/null; then
# Configure mc alias for local minio
mc alias set onyx-backup http://localhost:9004 minioadmin minioadmin 2>/dev/null || true
# Mirror all buckets
mc mirror onyx-backup/ "$minio_dir/" 2>/dev/null || {
log_warning "mc mirror failed, falling back to volume backup"
backup_minio_volume
return
}
else
# Fallback: copy from container
if docker ps --format '{{.Names}}' | grep -q "^${MINIO_CONTAINER}$"; then
docker cp "${MINIO_CONTAINER}:/data/." "$minio_dir/"
else
log_warning "MinIO container not running and mc not available, using volume backup"
backup_minio_volume
return
fi
fi
# Compress the data
tar czf "${BACKUP_DIR}/minio_data.tar.gz" -C "$minio_dir" .
rm -rf "$minio_dir"
log_success "MinIO backed up to minio_data.tar.gz"
}
# =============================================================================
# Main backup logic
# =============================================================================
# Save metadata
cat > "${BACKUP_DIR}/metadata.json" << EOF
{
"timestamp": "$TIMESTAMP",
"mode": "$MODE",
"project_name": "$PROJECT_NAME",
"volume_prefix": "$VOLUME_PREFIX",
"postgres_db": "$POSTGRES_DB",
"vespa_index": "$VESPA_INDEX",
"components": {
"postgres": $BACKUP_POSTGRES,
"vespa": $BACKUP_VESPA,
"minio": $BACKUP_MINIO
}
}
EOF
# Run backups based on mode
if [[ "$MODE" == "volume" ]]; then
log_info "Using volume-based backup"
# Stop services for consistent backup
log_info "Stopping services for consistent backup..."
if $BACKUP_POSTGRES; then
stop_service "relational_db"
fi
if $BACKUP_VESPA; then
stop_service "index"
fi
if $BACKUP_MINIO; then
stop_service "minio"
fi
# Perform backups
if $BACKUP_POSTGRES; then
backup_postgres_volume || log_warning "PostgreSQL backup failed"
fi
if $BACKUP_VESPA; then
backup_vespa_volume || log_warning "Vespa backup failed"
fi
if $BACKUP_MINIO; then
backup_minio_volume || log_warning "MinIO backup failed"
fi
# Restart services unless --no-restart was specified
if [[ "$NO_RESTART" != true ]]; then
start_services
else
log_info "Skipping service restart (--no-restart specified)"
log_info "Stopped services: ${STOPPED_SERVICES[*]}"
fi
else
log_info "Using API-based backup (services must be running)"
if $BACKUP_POSTGRES; then
backup_postgres_api || log_warning "PostgreSQL backup failed"
fi
if $BACKUP_VESPA; then
backup_vespa_api || log_warning "Vespa backup failed"
fi
if $BACKUP_MINIO; then
backup_minio_api || log_warning "MinIO backup failed"
fi
fi
# Calculate total size
TOTAL_SIZE=$(du -sh "$BACKUP_DIR" | cut -f1)
log_success "==================================="
log_success "Backup completed!"
log_success "Location: $BACKUP_DIR"
log_success "Total size: $TOTAL_SIZE"
log_success "==================================="
# Create a symlink to latest backup
ln -sfn "$TIMESTAMP" "${OUTPUT_DIR}/latest"
log_info "Symlink created: ${OUTPUT_DIR}/latest -> $TIMESTAMP"

View File

@@ -0,0 +1,580 @@
#!/bin/bash
# =============================================================================
# Onyx Data Restore Script
# =============================================================================
# This script restores PostgreSQL, Vespa, and MinIO data from a backup.
#
# The script auto-detects the backup mode based on files present:
# - *_volume.tar.gz files -> volume restore
# - postgres_dump.backup / vespa_documents.jsonl -> api restore
#
# Usage:
# ./restore_data.sh [OPTIONS]
#
# Options:
# --input <dir> Backup directory (required, or use 'latest')
# --project <name> Docker Compose project name (default: onyx)
# --volume-prefix <name> Volume name prefix (default: same as project name)
# --compose-dir <dir> Docker Compose directory (for service management)
# --postgres-only Only restore PostgreSQL
# --vespa-only Only restore Vespa
# --minio-only Only restore MinIO
# --no-minio Skip MinIO restore
# --no-restart Don't restart services after restore (volume mode)
# --force Skip confirmation prompts
# --help Show this help message
#
# Examples:
# ./restore_data.sh --input ./onyx_backup/latest
# ./restore_data.sh --input ./onyx_backup/20240115_120000 --force
# ./restore_data.sh --input ./onyx_backup/latest --postgres-only
# ./restore_data.sh --input ./backup --volume-prefix myprefix
#
# WARNING: This will overwrite existing data in the target instance!
# =============================================================================
set -e
# Default configuration
INPUT_DIR=""
PROJECT_NAME="onyx"
VOLUME_PREFIX="" # Will default to PROJECT_NAME if not set
COMPOSE_DIR="" # Docker Compose directory for service management
RESTORE_POSTGRES=true
RESTORE_VESPA=true
RESTORE_MINIO=true
FORCE=false
NO_RESTART=false
# PostgreSQL defaults
POSTGRES_USER="${POSTGRES_USER:-postgres}"
POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-password}"
POSTGRES_DB="${POSTGRES_DB:-postgres}"
POSTGRES_PORT="${POSTGRES_PORT:-5432}"
# Vespa defaults
VESPA_HOST="${VESPA_HOST:-localhost}"
VESPA_PORT="${VESPA_PORT:-8081}"
VESPA_INDEX="${VESPA_INDEX:-danswer_index}"
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
log_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
log_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
show_help() {
head -36 "$0" | tail -33
exit 0
}
# Parse arguments
while [[ $# -gt 0 ]]; do
case $1 in
--input)
INPUT_DIR="$2"
shift 2
;;
--project)
PROJECT_NAME="$2"
shift 2
;;
--volume-prefix)
VOLUME_PREFIX="$2"
shift 2
;;
--compose-dir)
COMPOSE_DIR="$2"
shift 2
;;
--no-restart)
NO_RESTART=true
shift
;;
--postgres-only)
RESTORE_POSTGRES=true
RESTORE_VESPA=false
RESTORE_MINIO=false
shift
;;
--vespa-only)
RESTORE_POSTGRES=false
RESTORE_VESPA=true
RESTORE_MINIO=false
shift
;;
--minio-only)
RESTORE_POSTGRES=false
RESTORE_VESPA=false
RESTORE_MINIO=true
shift
;;
--no-minio)
RESTORE_MINIO=false
shift
;;
--force)
FORCE=true
shift
;;
--help)
show_help
;;
*)
log_error "Unknown option: $1"
exit 1
;;
esac
done
# Validate input directory
if [[ -z "$INPUT_DIR" ]]; then
log_error "Input directory is required. Use --input <dir>"
exit 1
fi
# Resolve symlinks (e.g., 'latest')
INPUT_DIR=$(cd "$INPUT_DIR" && pwd)
if [[ ! -d "$INPUT_DIR" ]]; then
log_error "Input directory not found: $INPUT_DIR"
exit 1
fi
# Load metadata if available
METADATA_FILE="${INPUT_DIR}/metadata.json"
if [[ -f "$METADATA_FILE" ]]; then
log_info "Loading backup metadata..."
BACKUP_MODE=$(jq -r '.mode // "unknown"' "$METADATA_FILE")
BACKUP_TIMESTAMP=$(jq -r '.timestamp // "unknown"' "$METADATA_FILE")
log_info " Backup timestamp: $BACKUP_TIMESTAMP"
log_info " Backup mode: $BACKUP_MODE"
fi
# Set VOLUME_PREFIX to PROJECT_NAME if not specified
if [[ -z "$VOLUME_PREFIX" ]]; then
VOLUME_PREFIX="$PROJECT_NAME"
fi
log_info "Volume prefix: $VOLUME_PREFIX"
# Track which services were stopped
STOPPED_SERVICES=()
# =============================================================================
# Service management functions
# =============================================================================
stop_service() {
local service=$1
local container="${PROJECT_NAME}-${service}-1"
if docker ps --format '{{.Names}}' | grep -q "^${container}$"; then
log_info "Stopping ${service}..."
if [[ -n "$COMPOSE_DIR" ]]; then
docker compose -p "$PROJECT_NAME" -f "${COMPOSE_DIR}/docker-compose.yml" stop "$service" 2>/dev/null || \
docker stop "$container"
else
docker stop "$container"
fi
STOPPED_SERVICES+=("$service")
fi
}
start_services() {
if [[ ${#STOPPED_SERVICES[@]} -eq 0 ]]; then
return
fi
log_info "Restarting services: ${STOPPED_SERVICES[*]}"
if [[ -n "$COMPOSE_DIR" ]]; then
docker compose -p "$PROJECT_NAME" -f "${COMPOSE_DIR}/docker-compose.yml" start "${STOPPED_SERVICES[@]}" 2>/dev/null || {
# Fallback to starting containers directly
for service in "${STOPPED_SERVICES[@]}"; do
docker start "${PROJECT_NAME}-${service}-1" 2>/dev/null || true
done
}
else
for service in "${STOPPED_SERVICES[@]}"; do
docker start "${PROJECT_NAME}-${service}-1" 2>/dev/null || true
done
fi
}
# Auto-detect backup mode based on files present
detect_backup_mode() {
if [[ -f "${INPUT_DIR}/postgres_volume.tar.gz" ]] || [[ -f "${INPUT_DIR}/vespa_volume.tar.gz" ]]; then
echo "volume"
elif [[ -f "${INPUT_DIR}/postgres_dump.backup" ]] || [[ -f "${INPUT_DIR}/vespa_documents.jsonl" ]]; then
echo "api"
else
echo "unknown"
fi
}
DETECTED_MODE=$(detect_backup_mode)
log_info "Detected backup mode: $DETECTED_MODE"
# Get container names
POSTGRES_CONTAINER="${PROJECT_NAME}-relational_db-1"
VESPA_CONTAINER="${PROJECT_NAME}-index-1"
MINIO_CONTAINER="${PROJECT_NAME}-minio-1"
# Confirmation prompt
if [[ "$FORCE" != true ]]; then
echo ""
log_warning "==================================="
log_warning "WARNING: This will overwrite existing data!"
log_warning "==================================="
echo ""
echo "Restore configuration:"
echo " Input directory: $INPUT_DIR"
echo " Project name: $PROJECT_NAME"
echo " Restore PostgreSQL: $RESTORE_POSTGRES"
echo " Restore Vespa: $RESTORE_VESPA"
echo " Restore MinIO: $RESTORE_MINIO"
echo ""
read -p "Are you sure you want to continue? (yes/no): " confirm
if [[ "$confirm" != "yes" ]]; then
log_info "Restore cancelled."
exit 0
fi
fi
# =============================================================================
# Volume-based restore functions
# =============================================================================
restore_postgres_volume() {
log_info "Restoring PostgreSQL from volume backup..."
local volume_name="${VOLUME_PREFIX}_db_volume"
local backup_file="${INPUT_DIR}/postgres_volume.tar.gz"
if [[ ! -f "$backup_file" ]]; then
log_error "PostgreSQL volume backup not found: $backup_file"
return 1
fi
# Remove existing volume and create new one
log_info "Recreating PostgreSQL volume..."
docker volume rm "$volume_name" 2>/dev/null || true
docker volume create "$volume_name"
# Restore volume from tar
docker run --rm \
-v "${volume_name}:/target" \
-v "${INPUT_DIR}:/backup:ro" \
alpine sh -c "cd /target && tar xzf /backup/postgres_volume.tar.gz"
log_success "PostgreSQL volume restored"
}
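# Optional sanity check (illustrative, not run by this script): inspect the
# restored volume's contents before services come back up, e.g.
#   docker run --rm -v "${VOLUME_PREFIX}_db_volume:/data:ro" alpine ls /data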
restore_vespa_volume() {
log_info "Restoring Vespa from volume backup..."
local volume_name="${VOLUME_PREFIX}_vespa_volume"
local backup_file="${INPUT_DIR}/vespa_volume.tar.gz"
if [[ ! -f "$backup_file" ]]; then
log_error "Vespa volume backup not found: $backup_file"
return 1
fi
# Remove existing volume and create new one
log_info "Recreating Vespa volume..."
docker volume rm "$volume_name" 2>/dev/null || true
docker volume create "$volume_name"
# Restore volume from tar
docker run --rm \
-v "${volume_name}:/target" \
-v "${INPUT_DIR}:/backup:ro" \
alpine sh -c "cd /target && tar xzf /backup/vespa_volume.tar.gz"
log_success "Vespa volume restored"
}
restore_minio_volume() {
log_info "Restoring MinIO from volume backup..."
local volume_name="${VOLUME_PREFIX}_minio_data"
local backup_file="${INPUT_DIR}/minio_volume.tar.gz"
if [[ ! -f "$backup_file" ]]; then
log_error "MinIO volume backup not found: $backup_file"
return 1
fi
# Remove existing volume and create new one
log_info "Recreating MinIO volume..."
docker volume rm "$volume_name" 2>/dev/null || true
docker volume create "$volume_name"
# Restore volume from tar
docker run --rm \
-v "${volume_name}:/target" \
-v "${INPUT_DIR}:/backup:ro" \
alpine sh -c "cd /target && tar xzf /backup/minio_volume.tar.gz"
log_success "MinIO volume restored"
}
# =============================================================================
# API-based restore functions
# =============================================================================
restore_postgres_api() {
log_info "Restoring PostgreSQL from pg_dump backup..."
local backup_file="${INPUT_DIR}/postgres_dump.backup"
if [[ ! -f "$backup_file" ]]; then
log_error "PostgreSQL dump not found: $backup_file"
return 1
fi
# Check if container is running
if ! docker ps --format '{{.Names}}' | grep -q "^${POSTGRES_CONTAINER}$"; then
log_error "PostgreSQL container '$POSTGRES_CONTAINER' is not running"
log_info "Please start the containers first: docker compose up -d relational_db"
return 1
fi
# Copy backup file to container
log_info "Copying backup file to container..."
docker cp "$backup_file" "${POSTGRES_CONTAINER}:/tmp/postgres_dump.backup"
# pg_restore --clean drops existing objects before recreating them
log_info "Restoring database..."
docker exec "$POSTGRES_CONTAINER" \
pg_restore -U "$POSTGRES_USER" -d "$POSTGRES_DB" \
--clean --if-exists --no-owner --no-privileges \
/tmp/postgres_dump.backup 2>&1 || {
# pg_restore may return non-zero even on success due to warnings
log_warning "pg_restore completed with warnings (this is often normal)"
}
# Cleanup
docker exec "$POSTGRES_CONTAINER" rm -f /tmp/postgres_dump.backup
log_success "PostgreSQL restored"
}
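# Optional verification (illustrative): confirm the restore produced tables, e.g.
#   docker exec "$POSTGRES_CONTAINER" psql -U "$POSTGRES_USER" -d "$POSTGRES_DB" -c '\dt'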
restore_vespa_api() {
log_info "Restoring Vespa from JSONL backup..."
local backup_file="${INPUT_DIR}/vespa_documents.jsonl"
if [[ ! -f "$backup_file" ]]; then
log_error "Vespa backup not found: $backup_file"
return 1
fi
local endpoint="http://${VESPA_HOST}:${VESPA_PORT}/document/v1/default/${VESPA_INDEX}/docid"
local total_docs=0
local failed_docs=0
# Check if Vespa is accessible
if ! curl -s -o /dev/null -w "%{http_code}" "http://${VESPA_HOST}:${VESPA_PORT}/state/v1/health" | grep -q "200"; then
log_error "Cannot connect to Vespa at ${VESPA_HOST}:${VESPA_PORT}"
log_info "Please ensure Vespa is running and accessible"
return 1
fi
# Wait for Vespa to be fully ready
log_info "Waiting for Vespa to be fully ready..."
local max_wait=60
local waited=0
while ! curl -s "http://${VESPA_HOST}:${VESPA_PORT}/state/v1/health" | grep -q '"status":{"code":"up"}'; do
if [[ $waited -ge $max_wait ]]; then
log_error "Vespa did not become ready within ${max_wait} seconds"
return 1
fi
sleep 2
waited=$((waited + 2))
done
# Restore documents
log_info "Restoring documents..."
while IFS= read -r line; do
if [[ -z "$line" ]]; then
continue
fi
# Extract document ID
local doc_id
doc_id=$(echo "$line" | jq -r '.update' | sed 's/.*:://')
# Post document
local response
response=$(curl -s -w "\n%{http_code}" -X POST \
-H "Content-Type: application/json" \
-d "$line" \
"${endpoint}/${doc_id}")
local http_code
http_code=$(echo "$response" | tail -1)
total_docs=$((total_docs + 1))
if [[ "$http_code" != "200" ]]; then
failed_docs=$((failed_docs + 1))
if [[ $failed_docs -le 5 ]]; then
log_warning "Failed to restore document $doc_id (HTTP $http_code)"
fi
fi
# Progress update
if [[ $((total_docs % 100)) -eq 0 ]]; then
log_info " Restored $total_docs documents..."
fi
done < "$backup_file"
if [[ $failed_docs -gt 0 ]]; then
log_warning "Vespa restored with $failed_docs failures out of $total_docs documents"
else
log_success "Vespa restored ($total_docs documents)"
fi
}
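# Note: the doc_id extraction above assumes each JSONL line resembles
#   {"update": "id:default:danswer_index::DOC_ID", "fields": {...}}
# i.e. the document ID is everything after the final "::" in the "update" field.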
restore_minio_api() {
log_info "Restoring MinIO data..."
local backup_file="${INPUT_DIR}/minio_data.tar.gz"
if [[ ! -f "$backup_file" ]]; then
log_warning "MinIO backup not found: $backup_file"
# Try volume backup as fallback
if [[ -f "${INPUT_DIR}/minio_volume.tar.gz" ]]; then
log_info "Found volume backup, using that instead"
restore_minio_volume
return
fi
return 1
fi
# Extract to temp directory
local temp_dir
temp_dir=$(mktemp -d)
tar xzf "$backup_file" -C "$temp_dir"
# Check if mc (MinIO client) is available
if command -v mc &>/dev/null; then
# Configure mc alias for local minio
mc alias set onyx-restore http://localhost:9004 minioadmin minioadmin 2>/dev/null || true
# Mirror data to minio
mc mirror "$temp_dir/" onyx-restore/ 2>/dev/null || {
log_warning "mc mirror failed"
}
else
# Fallback: copy to container
if docker ps --format '{{.Names}}' | grep -q "^${MINIO_CONTAINER}$"; then
docker cp "$temp_dir/." "${MINIO_CONTAINER}:/data/"
else
log_error "MinIO container not running and mc not available"
rm -rf "$temp_dir"
return 1
fi
fi
rm -rf "$temp_dir"
log_success "MinIO restored"
}
# =============================================================================
# Main restore logic
# =============================================================================
log_info "Starting Onyx data restore..."
log_info "Input directory: $INPUT_DIR"
log_info "Project name: $PROJECT_NAME"
# Run restores based on detected mode
if [[ "$DETECTED_MODE" == "volume" ]]; then
log_info "Using volume-based restore"
# Stop services before restore
log_info "Stopping services for restore..."
if $RESTORE_POSTGRES; then
stop_service "relational_db"
fi
if $RESTORE_VESPA; then
stop_service "index"
fi
if $RESTORE_MINIO; then
stop_service "minio"
fi
# Perform restores
if $RESTORE_POSTGRES; then
restore_postgres_volume || log_warning "PostgreSQL restore failed"
fi
if $RESTORE_VESPA; then
restore_vespa_volume || log_warning "Vespa restore failed"
fi
if $RESTORE_MINIO; then
restore_minio_volume || log_warning "MinIO restore failed"
fi
# Restart services unless --no-restart was specified
if [[ "$NO_RESTART" != true ]]; then
start_services
else
log_info "Skipping service restart (--no-restart specified)"
log_info "Stopped services: ${STOPPED_SERVICES[*]}"
fi
elif [[ "$DETECTED_MODE" == "api" ]]; then
log_info "Using API-based restore"
log_info "Services must be running for API restore"
if $RESTORE_POSTGRES; then
restore_postgres_api || log_warning "PostgreSQL restore failed"
fi
if $RESTORE_VESPA; then
restore_vespa_api || log_warning "Vespa restore failed"
fi
if $RESTORE_MINIO; then
restore_minio_api || log_warning "MinIO restore failed"
fi
else
log_error "Could not detect backup mode. Ensure backup files exist in $INPUT_DIR"
exit 1
fi
log_success "==================================="
log_success "Restore completed!"
log_success "==================================="
# Post-restore recommendations
echo ""
log_info "Post-restore steps:"
log_info " 1. Run database migrations if needed: docker compose -p $PROJECT_NAME exec api_server alembic upgrade head"
log_info " 2. Verify data integrity in the application"


@@ -55,7 +55,7 @@ else
docker run --detach --name onyx_minio --publish 9004:9000 --publish 9005:9001 -e MINIO_ROOT_USER=minioadmin -e MINIO_ROOT_PASSWORD=minioadmin minio/minio server /data --console-address ":9001"
fi
# Ensure alembic runs in the correct directory
# Ensure alembic runs in the correct directory (backend/)
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
PARENT_DIR="$(dirname "$SCRIPT_DIR")"
cd "$PARENT_DIR"
@@ -63,6 +63,13 @@ cd "$PARENT_DIR"
# Give Postgres a second to start
sleep 1
# Alembic should be configured in the virtualenv for this repo
if [[ -f "../.venv/bin/activate" ]]; then
source ../.venv/bin/activate
else
echo "Warning: Python virtual environment not found at .venv/bin/activate; alembic may not work."
fi
# Run Alembic upgrade
echo "Running Alembic migration..."
alembic upgrade head


@@ -0,0 +1,80 @@
# Quick Start: Tenant Cleanup Without Bastion
## TL;DR - The Commands You Need
```bash
# Navigate to backend directory
cd onyx/backend
# Step 1: Generate CSV of tenants to clean (5-10 min)
PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_analyze_tenants.py
# Step 2: Mark connectors for deletion (1-2 min)
PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_mark_connectors.py \
--csv gated_tenants_no_query_3mo_*.csv \
--force \
--concurrency 16
# ⏰ WAIT 6+ hours for background deletion to complete
# Step 3: Final cleanup (1-2 min)
PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_cleanup_tenants.py \
--csv gated_tenants_no_query_3mo_*.csv \
--force
```
## What Changed?
Instead of the original scripts that require bastion access:
- `analyze_current_tenants.py` → `no_bastion_analyze_tenants.py`
- `mark_connectors_for_deletion.py` → `no_bastion_mark_connectors.py`
- `cleanup_tenants.py` → `no_bastion_cleanup_tenants.py`
**No environment variables needed!** All queries run directly from pods.
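Under the hood, each no-bastion script copies a small helper script onto a running worker pod and executes it there with `kubectl`, roughly following the pattern below (pod selection and the output file name are illustrative):

```bash
# Find a running worker pod
POD=$(kubectl get po | grep celery-worker-user-file-processing | grep Running | head -1 | awk '{print $1}')

# Copy a helper script onto the pod and run it there
kubectl cp scripts/tenant_cleanup/on_pod_scripts/understand_tenants.py "$POD:/tmp/understand_tenants.py"
kubectl exec "$POD" -- python /tmp/understand_tenants.py > tenant_data.json
```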
## What You Need
✅ `kubectl` access to your cluster
✅ Running `celery-worker-user-file-processing` pods
✅ Permission to exec into pods
❌ No bastion host required
❌ No SSH keys required
❌ No environment variables required
## Test Your Setup
```bash
# Check if you can find worker pods
kubectl get po | grep celery-worker-user-file-processing | grep Running
# If you see pods, you're ready to go!
```
## Important Notes
1. **Step 2 triggers background deletion** - the actual document deletion happens asynchronously via Celery workers
2. **You MUST wait** between Step 2 and Step 3 for deletion to complete (can take 6+ hours)
3. **Monitor deletion progress** with: `kubectl logs -f <celery-worker-pod>` (see the example after this list)
4. **All scripts verify tenant status** - they'll refuse to process active (non-GATED_ACCESS) tenants
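For example, to follow one worker's logs while you wait (pod name pattern from the setup check above):

```bash
kubectl logs -f "$(kubectl get po | grep celery-worker-user-file-processing | grep Running | head -1 | awk '{print $1}')"
```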
## Files Generated
- `gated_tenants_no_query_3mo_YYYYMMDD_HHMMSS.csv` - List of tenants to clean
- `cleaned_tenants.csv` - Successfully cleaned tenants with timestamps
## Safety First
The scripts include multiple safety checks:
- ✅ Verifies tenant status before any operation
- ✅ Checks documents are deleted before dropping schemas
- ✅ Prompts for confirmation on dangerous operations (unless `--force`)
- ✅ Records all successful operations in real-time
## Need More Details?
See [NO_BASTION_README.md](./NO_BASTION_README.md) for:
- Detailed explanations of each step
- Troubleshooting guide
- How it works under the hood
- Performance characteristics


@@ -0,0 +1,284 @@
#!/usr/bin/env python3
"""
Verification script to check if your environment is ready for no-bastion tenant cleanup.
Usage:
python scripts/tenant_cleanup/check_no_bastion_setup.py
"""
import subprocess
import sys
def print_header(text: str) -> None:
"""Print a formatted header."""
print(f"\n{'=' * 80}")
print(f" {text}")
print(f"{'=' * 80}\n")
def check_kubectl_access() -> bool:
"""Check if kubectl is installed and can access the cluster."""
print("Checking kubectl access...")
try:
result = subprocess.run(
["kubectl", "version", "--client", "--short"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
print(f"✅ kubectl is installed: {result.stdout.strip()}")
# Try to access cluster
result = subprocess.run(
["kubectl", "get", "ns"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
print("✅ kubectl can access the cluster")
return True
else:
print("❌ kubectl cannot access the cluster")
print(f" Error: {result.stderr}")
return False
else:
print("❌ kubectl is not installed or not in PATH")
return False
except FileNotFoundError:
print("❌ kubectl is not installed")
return False
except subprocess.TimeoutExpired:
print("❌ kubectl command timed out")
return False
except Exception as e:
print(f"❌ Error checking kubectl: {e}")
return False
def check_worker_pods() -> tuple[bool, list[str]]:
"""Check if worker pods are running."""
print("\nChecking for worker pods...")
try:
result = subprocess.run(
["kubectl", "get", "po"],
capture_output=True,
text=True,
timeout=10,
check=True,
)
lines = result.stdout.strip().split("\n")
worker_pods = []
for line in lines[1:]: # Skip header
if "celery-worker-user-file-processing" in line and "Running" in line:
pod_name = line.split()[0]
worker_pods.append(pod_name)
if worker_pods:
print(f"✅ Found {len(worker_pods)} running worker pod(s):")
for pod in worker_pods[:3]: # Show first 3
print(f" - {pod}")
if len(worker_pods) > 3:
print(f" ... and {len(worker_pods) - 3} more")
return True, worker_pods
else:
print("❌ No running celery-worker-user-file-processing pods found")
print(" Available pods:")
for line in lines[1:6]: # Show first 5 pods
print(f" {line}")
return False, []
except subprocess.CalledProcessError as e:
print(f"❌ Error getting pods: {e}")
return False, []
except Exception as e:
print(f"❌ Error checking worker pods: {e}")
return False, []
def check_pod_exec_permission(pod_name: str) -> bool:
"""Check if we can exec into a pod."""
print("\nChecking pod exec permissions...")
try:
result = subprocess.run(
["kubectl", "exec", pod_name, "--", "echo", "test"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0 and "test" in result.stdout:
print(f"✅ Can exec into pod: {pod_name}")
return True
else:
print(f"❌ Cannot exec into pod: {pod_name}")
print(f" Error: {result.stderr}")
return False
except subprocess.TimeoutExpired:
print(f"❌ Exec command timed out for pod: {pod_name}")
return False
except Exception as e:
print(f"❌ Error checking exec permission: {e}")
return False
def check_pod_db_access(pod_name: str) -> dict:
"""Check if pod has database environment variables."""
print("\nChecking database access from pod...")
checks = {
"control_plane": False,
"data_plane": False,
}
try:
# Check for control plane DB env vars
result = subprocess.run(
["kubectl", "exec", pod_name, "--", "env"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
env_output = result.stdout
# Check control plane access
if any(
var in env_output
for var in [
"POSTGRES_CONTROL_URI",
"POSTGRES_CONTROL_HOST",
]
):
print("✅ Pod has control plane database environment variables")
checks["control_plane"] = True
else:
print(
"⚠️ Pod may not have control plane database environment variables"
)
print(" (This might be okay if they're dynamically loaded)")
# Check data plane access
if any(
var in env_output
for var in ["POSTGRES_URI", "POSTGRES_HOST", "DATABASE_URL"]
):
print("✅ Pod has data plane database environment variables")
checks["data_plane"] = True
else:
print("❌ Pod does not have data plane database environment variables")
return checks
except Exception as e:
print(f"❌ Error checking database access: {e}")
return checks
def check_required_scripts() -> bool:
"""Check if the required on_pod_scripts exist."""
print("\nChecking for required scripts...")
from pathlib import Path
script_dir = Path(__file__).parent
required_scripts = [
"on_pod_scripts/understand_tenants.py",
"on_pod_scripts/execute_connector_deletion.py",
"on_pod_scripts/check_documents_deleted.py",
"on_pod_scripts/cleanup_tenant_schema.py",
"on_pod_scripts/get_tenant_index_name.py",
"on_pod_scripts/get_tenant_users.py",
]
all_exist = True
for script in required_scripts:
script_path = script_dir / script
if script_path.exists():
print(f"{script}")
else:
print(f"{script} - NOT FOUND")
all_exist = False
return all_exist
def main() -> None:
print_header("No-Bastion Tenant Cleanup - Setup Verification")
all_checks_passed = True
# 1. Check kubectl access
if not check_kubectl_access():
all_checks_passed = False
# 2. Check for worker pods
has_pods, worker_pods = check_worker_pods()
if not has_pods:
all_checks_passed = False
print("\n⚠️ Cannot proceed without running worker pods")
print_header("SETUP VERIFICATION FAILED")
sys.exit(1)
# Use first worker pod for remaining checks
test_pod = worker_pods[0]
# 3. Check exec permissions
if not check_pod_exec_permission(test_pod):
all_checks_passed = False
# 4. Check database access
db_checks = check_pod_db_access(test_pod)
if not db_checks["data_plane"]:
all_checks_passed = False
# 5. Check required scripts
if not check_required_scripts():
all_checks_passed = False
# Summary
print_header("VERIFICATION SUMMARY")
if all_checks_passed and db_checks["control_plane"]:
print("✅ ALL CHECKS PASSED!")
print("\nYou're ready to run tenant cleanup without bastion access.")
print("\nNext steps:")
print("1. Read QUICK_START_NO_BASTION.md for commands")
print(
"2. Run: PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_analyze_tenants.py"
)
sys.exit(0)
elif all_checks_passed:
print("⚠️ MOSTLY READY (with warnings)")
print("\nYou can proceed, but control plane access may need verification.")
print("Try running Step 1 and see if it works.")
print("\nNext steps:")
print(
"1. Run: PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_analyze_tenants.py"
)
print("2. If it fails with DB errors, check pod environment variables")
sys.exit(0)
else:
print("❌ SETUP VERIFICATION FAILED")
print("\nPlease fix the issues above before proceeding.")
print("\nCommon fixes:")
print("- Install kubectl: https://kubernetes.io/docs/tasks/tools/")
print("- Configure cluster access: kubectl config use-context <context>")
print("- Check pod status: kubectl get po")
sys.exit(1)
if __name__ == "__main__":
main()


@@ -0,0 +1,519 @@
#!/usr/bin/env python3
"""
Tenant analysis script that works WITHOUT bastion access.
Control plane and data plane are in SEPARATE clusters.
Usage:
PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_analyze_tenants.py \
[--skip-cache] \
[--data-plane-context <context>] \
[--control-plane-context <context>]
"""
import argparse
import csv
import json
import subprocess
import sys
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from pathlib import Path
from typing import Any
from scripts.tenant_cleanup.no_bastion_cleanup_utils import find_background_pod
from scripts.tenant_cleanup.no_bastion_cleanup_utils import find_worker_pod
def collect_tenant_data(
pod_name: str, context: str | None = None
) -> list[dict[str, Any]]:
"""Run the understand_tenants script on the data plane pod."""
print(f"\nCollecting tenant data from data plane pod {pod_name}...")
# Get the path to the understand_tenants script
script_dir = Path(__file__).parent
understand_tenants_script = script_dir / "on_pod_scripts" / "understand_tenants.py"
if not understand_tenants_script.exists():
raise FileNotFoundError(
f"understand_tenants.py not found at {understand_tenants_script}"
)
# Copy script to pod
print("Copying script to pod...")
cmd_cp = [
"kubectl",
"cp",
str(understand_tenants_script),
f"{pod_name}:/tmp/understand_tenants.py",
]
if context:
cmd_cp.extend(["--context", context])
subprocess.run(cmd_cp, check=True, capture_output=True)
# Execute script on pod
print("Executing script on pod (this may take a while)...")
cmd_exec = ["kubectl", "exec", pod_name]
if context:
cmd_exec.extend(["--context", context])
cmd_exec.extend(["--", "python", "/tmp/understand_tenants.py"])
result = subprocess.run(cmd_exec, capture_output=True, text=True, check=True)
# Show progress messages from stderr
if result.stderr:
print(result.stderr, file=sys.stderr)
# Parse JSON from stdout
try:
tenant_data = json.loads(result.stdout)
print(f"Successfully collected data for {len(tenant_data)} tenants")
return tenant_data
except json.JSONDecodeError as e:
print(f"Failed to parse JSON output: {e}", file=sys.stderr)
print(f"stdout: {result.stdout[:500]}", file=sys.stderr)
raise
def collect_control_plane_data_from_pod(
pod_name: str, context: str | None = None
) -> list[dict[str, Any]]:
"""Collect control plane data by running a query on a control plane pod."""
print(f"\nCollecting control plane data from pod {pod_name}...")
# Create a script to query the control plane database
query_script = """
import json
import os
from sqlalchemy import create_engine, text
# Try to get database URL from various environment patterns
control_db_url = None
# Pattern 1: POSTGRES_CONTROL_* variables
if os.environ.get("POSTGRES_CONTROL_HOST"):
host = os.environ.get("POSTGRES_CONTROL_HOST")
port = os.environ.get("POSTGRES_CONTROL_PORT", "5432")
db = os.environ.get("POSTGRES_CONTROL_DB", "control")
user = os.environ.get("POSTGRES_CONTROL_USER", "postgres")
password = os.environ.get("POSTGRES_CONTROL_PASSWORD", "")
if password:
control_db_url = f"postgresql://{user}:{password}@{host}:{port}/{db}"
# Pattern 2: Standard POSTGRES_* variables (in control plane cluster)
if not control_db_url and os.environ.get("POSTGRES_HOST"):
host = os.environ.get("POSTGRES_HOST")
port = os.environ.get("POSTGRES_PORT", "5432")
db = os.environ.get("POSTGRES_DB", "danswer")
user = os.environ.get("POSTGRES_USER", "postgres")
password = os.environ.get("POSTGRES_PASSWORD", "")
if password:
control_db_url = f"postgresql://{user}:{password}@{host}:{port}/{db}"
if not control_db_url:
raise ValueError("Cannot determine control plane database connection")
engine = create_engine(control_db_url)
with engine.connect() as conn:
result = conn.execute(
text(
"SELECT tenant_id, stripe_customer_id, created_at, active_seats, "
"creator_email, referral_source, application_status FROM tenant"
)
)
rows = [dict(row._mapping) for row in result]
print(json.dumps(rows, default=str))
"""
# Write the script to a temp file
script_path = "/tmp/query_control_plane.py"
print(" Creating control plane query script on pod...")
cmd_write = ["kubectl", "exec", pod_name]
if context:
cmd_write.extend(["--context", context])
cmd_write.extend(
["--", "bash", "-c", f"cat > {script_path} << 'EOF'\n{query_script}\nEOF"]
)
subprocess.run(cmd_write, check=True, capture_output=True)
# Execute the script on the pod
print(" Executing control plane query on pod...")
cmd_exec = ["kubectl", "exec", pod_name]
if context:
cmd_exec.extend(["--context", context])
cmd_exec.extend(["--", "python", script_path])
result = subprocess.run(cmd_exec, capture_output=True, text=True, check=True)
# Parse JSON output
try:
control_plane_data = json.loads(result.stdout)
print(
f"✓ Successfully collected {len(control_plane_data)} tenant records from control plane"
)
return control_plane_data
except json.JSONDecodeError as e:
print(f"Failed to parse JSON output: {e}", file=sys.stderr)
print(f"stdout: {result.stdout[:500]}", file=sys.stderr)
raise
def analyze_tenants(
tenants: list[dict[str, Any]], control_plane_data: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Analyze tenant activity data and return gated tenants with no query in last 3 months."""
print(f"\n{'=' * 80}")
print(f"TENANT ANALYSIS REPORT - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"{'=' * 80}")
print(f"Total tenants analyzed: {len(tenants)}\n")
# Create a lookup dict for control plane data by tenant_id
control_plane_lookup = {}
for row in control_plane_data:
tenant_id = row.get("tenant_id")
tenant_status = row.get("application_status")
if tenant_id:
control_plane_lookup[tenant_id] = tenant_status
# Calculate cutoff dates
one_month_cutoff = datetime.now(timezone.utc) - timedelta(days=30)
three_month_cutoff = datetime.now(timezone.utc) - timedelta(days=90)
# Categorize tenants into 4 groups
gated_no_query_3_months = [] # GATED_ACCESS + no query in last 3 months
gated_query_1_3_months = [] # GATED_ACCESS + query between 1-3 months
gated_query_1_month = [] # GATED_ACCESS + query in last 1 month
everyone_else = [] # All other tenants
for tenant in tenants:
tenant_id = tenant.get("tenant_id")
last_query_time = tenant.get("last_query_time")
tenant_status = control_plane_lookup.get(tenant_id, "UNKNOWN")
is_gated = tenant_status == "GATED_ACCESS"
# Parse last query time
if last_query_time:
query_time = datetime.fromisoformat(last_query_time.replace("Z", "+00:00"))
else:
query_time = None
# Categorize
if is_gated:
if query_time is None or query_time <= three_month_cutoff:
gated_no_query_3_months.append(tenant)
elif query_time <= one_month_cutoff:
gated_query_1_3_months.append(tenant)
else: # query_time > one_month_cutoff
gated_query_1_month.append(tenant)
else:
everyone_else.append(tenant)
# Calculate document counts for each group
gated_no_query_docs = sum(
t.get("num_documents", 0) for t in gated_no_query_3_months
)
gated_1_3_month_docs = sum(
t.get("num_documents", 0) for t in gated_query_1_3_months
)
gated_1_month_docs = sum(t.get("num_documents", 0) for t in gated_query_1_month)
everyone_else_docs = sum(t.get("num_documents", 0) for t in everyone_else)
print("=" * 80)
print("TENANT CATEGORIZATION BY GATED ACCESS STATUS AND ACTIVITY")
print("=" * 80)
print("\n1. GATED_ACCESS + No query in last 3 months:")
print(f" Count: {len(gated_no_query_3_months):,}")
print(f" Total documents: {gated_no_query_docs:,}")
print(
f" Avg documents per tenant: {gated_no_query_docs / len(gated_no_query_3_months) if gated_no_query_3_months else 0:.2f}"
)
print("\n2. GATED_ACCESS + Query between 1-3 months ago:")
print(f" Count: {len(gated_query_1_3_months):,}")
print(f" Total documents: {gated_1_3_month_docs:,}")
print(
f" Avg documents per tenant: {gated_1_3_month_docs / len(gated_query_1_3_months) if gated_query_1_3_months else 0:.2f}"
)
print("\n3. GATED_ACCESS + Query in last 1 month:")
print(f" Count: {len(gated_query_1_month):,}")
print(f" Total documents: {gated_1_month_docs:,}")
print(
f" Avg documents per tenant: {gated_1_month_docs / len(gated_query_1_month) if gated_query_1_month else 0:.2f}"
)
print("\n4. Everyone else (non-GATED_ACCESS):")
print(f" Count: {len(everyone_else):,}")
print(f" Total documents: {everyone_else_docs:,}")
print(
f" Avg documents per tenant: {everyone_else_docs / len(everyone_else) if everyone_else else 0:.2f}"
)
total_docs = (
gated_no_query_docs
+ gated_1_3_month_docs
+ gated_1_month_docs
+ everyone_else_docs
)
print(f"\nTotal documents across all tenants: {total_docs:,}")
# Top 100 tenants by document count
print("\n" + "=" * 80)
print("TOP 100 TENANTS BY DOCUMENT COUNT")
print("=" * 80)
# Sort all tenants by document count
sorted_tenants = sorted(
tenants, key=lambda t: t.get("num_documents", 0), reverse=True
)
top_100 = sorted_tenants[:100]
print(
f"\n{'Rank':<6} {'Tenant ID':<45} {'Documents':>12} {'Users':>8} {'Last Query':<12} {'Group'}"
)
print("-" * 130)
for idx, tenant in enumerate(top_100, 1):
tenant_id = tenant.get("tenant_id", "Unknown")
num_docs = tenant.get("num_documents", 0)
num_users = tenant.get("num_users", 0)
last_query = tenant.get("last_query_time", "Never")
tenant_status = control_plane_lookup.get(tenant_id, "UNKNOWN")
# Format the last query time
if last_query and last_query != "Never":
try:
query_dt = datetime.fromisoformat(last_query.replace("Z", "+00:00"))
last_query_str = query_dt.strftime("%Y-%m-%d")
except Exception:
last_query_str = last_query[:10] if len(last_query) > 10 else last_query
else:
last_query_str = "Never"
# Determine group
if tenant_status == "GATED_ACCESS":
if last_query and last_query != "Never":
query_time = datetime.fromisoformat(last_query.replace("Z", "+00:00"))
if query_time <= three_month_cutoff:
group = "Gated - No query (3mo)"
elif query_time <= one_month_cutoff:
group = "Gated - Query (1-3mo)"
else:
group = "Gated - Query (1mo)"
else:
group = "Gated - No query (3mo)"
else:
group = f"Other ({tenant_status})"
print(
f"{idx:<6} {tenant_id:<45} {num_docs:>12,} {num_users:>8} {last_query_str:<12} {group}"
)
# Summary stats for top 100
top_100_docs = sum(t.get("num_documents", 0) for t in top_100)
print("\n" + "-" * 110)
print(f"Top 100 total documents: {top_100_docs:,}")
print(
f"Percentage of all documents: {(top_100_docs / total_docs * 100) if total_docs > 0 else 0:.2f}%"
)
# Additional insights
print("\n" + "=" * 80)
print("ADDITIONAL INSIGHTS")
print("=" * 80)
# Tenants with no documents
no_docs = [t for t in tenants if t.get("num_documents", 0) == 0]
print(
f"\nTenants with 0 documents: {len(no_docs):,} ({len(no_docs) / len(tenants) * 100:.2f}%)"
)
# Tenants with no users
no_users = [t for t in tenants if t.get("num_users", 0) == 0]
print(
f"Tenants with 0 users: {len(no_users):,} ({len(no_users) / len(tenants) * 100:.2f}%)"
)
# Document distribution quartiles
doc_counts = sorted([t.get("num_documents", 0) for t in tenants])
if doc_counts:
print("\nDocument count distribution:")
print(f" Median: {doc_counts[len(doc_counts) // 2]:,}")
print(f" 75th percentile: {doc_counts[int(len(doc_counts) * 0.75)]:,}")
print(f" 90th percentile: {doc_counts[int(len(doc_counts) * 0.90)]:,}")
print(f" 95th percentile: {doc_counts[int(len(doc_counts) * 0.95)]:,}")
print(f" 99th percentile: {doc_counts[int(len(doc_counts) * 0.99)]:,}")
print(f" Max: {doc_counts[-1]:,}")
return gated_no_query_3_months
def find_recent_tenant_data() -> tuple[list[dict[str, Any]] | None, str | None]:
"""Find the most recent tenant data file if it's less than 7 days old."""
current_dir = Path.cwd()
tenant_data_files = list(current_dir.glob("tenant_data_*.json"))
if not tenant_data_files:
return None, None
# Sort by modification time, most recent first
tenant_data_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
most_recent = tenant_data_files[0]
# Check if file is less than 7 days old
file_age = datetime.now().timestamp() - most_recent.stat().st_mtime
seven_days_in_seconds = 7 * 24 * 60 * 60
if file_age < seven_days_in_seconds:
file_age_days = file_age / (24 * 60 * 60)
print(
f"\n✓ Found recent tenant data: {most_recent.name} (age: {file_age_days:.1f} days)"
)
with open(most_recent, "r") as f:
tenant_data = json.load(f)
return tenant_data, str(most_recent)
return None, None
def main() -> None:
# Parse command-line arguments
parser = argparse.ArgumentParser(
description="Analyze tenant data WITHOUT bastion access - control plane and data plane are separate clusters"
)
parser.add_argument(
"--skip-cache",
action="store_true",
help="Skip cached tenant data and collect fresh data from pod",
)
parser.add_argument(
"--data-plane-context",
type=str,
help="Kubectl context for data plane cluster (optional)",
)
parser.add_argument(
"--control-plane-context",
type=str,
help="Kubectl context for control plane cluster (optional)",
)
args = parser.parse_args()
try:
# Step 1: Check for recent tenant data (< 7 days old) unless --skip-cache is set
tenant_data = None
cached_file = None
if not args.skip_cache:
tenant_data, cached_file = find_recent_tenant_data()
if tenant_data:
print(f"Using cached tenant data from: {cached_file}")
print(f"Total tenants in cache: {len(tenant_data)}")
else:
if args.skip_cache:
print("\n⚠ Skipping cache (--skip-cache flag set)")
# Find data plane worker pod
print("\n" + "=" * 80)
print("CONNECTING TO DATA PLANE CLUSTER")
print("=" * 80)
data_plane_pod = find_worker_pod(args.data_plane_context)
# Collect tenant data from data plane
tenant_data = collect_tenant_data(data_plane_pod, args.data_plane_context)
# Save raw data to file with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"tenant_data_{timestamp}.json"
with open(output_file, "w") as f:
json.dump(tenant_data, f, indent=2, default=str)
print(f"\n✓ Raw data saved to: {output_file}")
# Step 2: Collect control plane data from control plane cluster
print("\n" + "=" * 80)
print("CONNECTING TO CONTROL PLANE CLUSTER")
print("=" * 80)
control_plane_pod = find_background_pod(args.control_plane_context)
control_plane_data = collect_control_plane_data_from_pod(
control_plane_pod, args.control_plane_context
)
# Step 3: Analyze the data and get gated tenants without recent queries
gated_no_query_3_months = analyze_tenants(tenant_data, control_plane_data)
# Step 4: Export to CSV (sorted by num_documents descending)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
csv_file = f"gated_tenants_no_query_3mo_{timestamp}.csv"
# Sort by num_documents in descending order
sorted_tenants = sorted(
gated_no_query_3_months,
key=lambda t: t.get("num_documents", 0),
reverse=True,
)
with open(csv_file, "w", newline="", encoding="utf-8") as csvfile:
fieldnames = [
"tenant_id",
"num_documents",
"num_users",
"last_query_time",
"days_since_last_query",
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
now = datetime.now(timezone.utc)
for tenant in sorted_tenants:
# Calculate days since last query
last_query_time = tenant.get("last_query_time")
if last_query_time:
try:
query_dt = datetime.fromisoformat(
last_query_time.replace("Z", "+00:00")
)
days_since = str((now - query_dt).days)
except Exception:
days_since = "N/A"
else:
days_since = "Never"
writer.writerow(
{
"tenant_id": tenant.get("tenant_id", ""),
"num_documents": tenant.get("num_documents", 0),
"num_users": tenant.get("num_users", 0),
"last_query_time": last_query_time or "Never",
"days_since_last_query": days_since,
}
)
print(f"\n✓ CSV exported to: {csv_file}")
print(
f" Total gated tenants with no query in last 3 months: {len(gated_no_query_3_months)}"
)
except subprocess.CalledProcessError as e:
print(f"Error running command: {e}", file=sys.stderr)
if e.stderr:
print(f"stderr: {e.stderr}", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()


@@ -0,0 +1,870 @@
#!/usr/bin/env python3
"""
Tenant cleanup script that works WITHOUT bastion access.
All queries run directly from pods.
Supports two-cluster architecture (data plane and control plane in separate clusters).
Usage:
PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_cleanup_tenants.py <tenant_id> [--force]
PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_cleanup_tenants.py --csv <csv_file_path> [--force]
With explicit contexts:
PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_cleanup_tenants.py --csv <csv_file_path> \
--data-plane-context <context> --control-plane-context <context> [--force]
"""
import csv
import json
import signal
import subprocess
import sys
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from threading import Lock
from scripts.tenant_cleanup.no_bastion_cleanup_utils import confirm_step
from scripts.tenant_cleanup.no_bastion_cleanup_utils import execute_control_plane_delete
from scripts.tenant_cleanup.no_bastion_cleanup_utils import find_background_pod
from scripts.tenant_cleanup.no_bastion_cleanup_utils import find_worker_pod
from scripts.tenant_cleanup.no_bastion_cleanup_utils import get_tenant_status
from scripts.tenant_cleanup.no_bastion_cleanup_utils import read_tenant_ids_from_csv
# Global lock for thread-safe operations
_print_lock: Lock = Lock()
_csv_lock: Lock = Lock()
def signal_handler(signum: int, frame: object) -> None:
"""Handle termination signals by killing active subprocess."""
sys.exit(1)
def setup_scripts_on_pod(pod_name: str, context: str | None = None) -> None:
"""Copy all required scripts to the pod once at the beginning.
Args:
pod_name: Pod to copy scripts to
context: Optional kubectl context
"""
print("Setting up scripts on pod (one-time operation)...")
script_dir = Path(__file__).parent
scripts_to_copy = [
(
"on_pod_scripts/check_documents_deleted.py",
"/tmp/check_documents_deleted.py",
),
("on_pod_scripts/cleanup_tenant_schema.py", "/tmp/cleanup_tenant_schema.py"),
("on_pod_scripts/get_tenant_users.py", "/tmp/get_tenant_users.py"),
("on_pod_scripts/get_tenant_index_name.py", "/tmp/get_tenant_index_name.py"),
]
for local_path, remote_path in scripts_to_copy:
local_file = script_dir / local_path
if not local_file.exists():
raise FileNotFoundError(f"Script not found: {local_file}")
cmd_cp = ["kubectl", "cp"]
if context:
cmd_cp.extend(["--context", context])
cmd_cp.extend([str(local_file), f"{pod_name}:{remote_path}"])
subprocess.run(cmd_cp, check=True, capture_output=True)
print("✓ All scripts copied to pod")
def get_tenant_index_name(
pod_name: str, tenant_id: str, context: str | None = None
) -> str:
"""Get the default index name for the given tenant by running script on pod.
Args:
pod_name: Data plane pod to execute on
tenant_id: Tenant ID to process
context: Optional kubectl context for data plane cluster
"""
print(f"Getting default index name for tenant: {tenant_id}")
# Get the path to the script
script_dir = Path(__file__).parent
index_name_script = script_dir / "on_pod_scripts" / "get_tenant_index_name.py"
if not index_name_script.exists():
raise FileNotFoundError(
f"get_tenant_index_name.py not found at {index_name_script}"
)
try:
# Copy script to pod
print(" Copying script to pod...")
cmd_cp = ["kubectl", "cp"]
if context:
cmd_cp.extend(["--context", context])
cmd_cp.extend(
[
str(index_name_script),
f"{pod_name}:/tmp/get_tenant_index_name.py",
]
)
subprocess.run(
cmd_cp,
check=True,
capture_output=True,
)
# Execute script on pod
print(" Executing script on pod...")
cmd_exec = ["kubectl", "exec"]
if context:
cmd_exec.extend(["--context", context])
cmd_exec.extend(
[
pod_name,
"--",
"python",
"/tmp/get_tenant_index_name.py",
tenant_id,
]
)
result = subprocess.run(
cmd_exec,
capture_output=True,
text=True,
check=True,
)
# Show progress messages from stderr
if result.stderr:
print(f" {result.stderr}", end="")
# Parse JSON result from stdout
result_data = json.loads(result.stdout)
status = result_data.get("status")
if status == "success":
index_name = result_data.get("index_name")
print(f"✓ Found index name: {index_name}")
return index_name
else:
message = result_data.get("message", "Unknown error")
raise RuntimeError(f"Failed to get index name: {message}")
except subprocess.CalledProcessError as e:
print(
f"✗ Failed to get index name for tenant {tenant_id}: {e}", file=sys.stderr
)
if e.stderr:
print(f" Error details: {e.stderr}", file=sys.stderr)
raise
except Exception as e:
print(
f"✗ Failed to get index name for tenant {tenant_id}: {e}", file=sys.stderr
)
raise
def get_tenant_users(
pod_name: str, tenant_id: str, context: str | None = None
) -> list[str]:
"""Get list of user emails from the tenant's data plane schema.
Args:
pod_name: Data plane pod to execute on
tenant_id: Tenant ID to process
context: Optional kubectl context for data plane cluster
"""
# Script is already on pod from setup_scripts_on_pod()
try:
# Execute script on pod
cmd_exec = ["kubectl", "exec"]
if context:
cmd_exec.extend(["--context", context])
cmd_exec.extend(
[
pod_name,
"--",
"python",
"/tmp/get_tenant_users.py",
tenant_id,
]
)
result = subprocess.run(
cmd_exec,
capture_output=True,
text=True,
check=True,
)
# Show progress messages from stderr
if result.stderr:
print(f" {result.stderr}", end="")
# Parse JSON result from stdout
result_data = json.loads(result.stdout)
status = result_data.get("status")
if status == "success":
users = result_data.get("users", [])
if users:
print(f"✓ Found {len(users)} user(s):")
for email in users:
print(f" - {email}")
else:
print(" No users found in tenant")
return users
else:
message = result_data.get("message", "Unknown error")
print(f"⚠ Could not fetch users: {message}")
return []
except subprocess.CalledProcessError as e:
print(f"⚠ Failed to get users for tenant {tenant_id}: {e}")
if e.stderr:
print(f" Error details: {e.stderr}")
return []
except Exception as e:
print(f"⚠ Failed to get users for tenant {tenant_id}: {e}")
return []
def check_documents_deleted(
pod_name: str, tenant_id: str, context: str | None = None
) -> None:
"""Check if all documents and connector credential pairs have been deleted.
Args:
pod_name: Data plane pod to execute on
tenant_id: Tenant ID to process
context: Optional kubectl context for data plane cluster
"""
# Script is already on pod from setup_scripts_on_pod()
try:
# Execute script on pod
cmd_exec = ["kubectl", "exec"]
if context:
cmd_exec.extend(["--context", context])
cmd_exec.extend(
[
pod_name,
"--",
"python",
"/tmp/check_documents_deleted.py",
tenant_id,
]
)
result = subprocess.run(
cmd_exec,
capture_output=True,
text=True,
check=True,
)
# Show progress messages from stderr
if result.stderr:
print(f" {result.stderr}", end="")
# Parse JSON result from stdout
result_data = json.loads(result.stdout)
status = result_data.get("status")
if status == "success":
message = result_data.get("message")
print(f"{message}")
elif status == "not_found":
message = result_data.get("message", "Schema not found")
print(f"{message}")
else:
message = result_data.get("message", "Unknown error")
cc_count = result_data.get("connector_credential_pair_count", 0)
doc_count = result_data.get("document_count", 0)
error_details = f"{message}"
if cc_count > 0 or doc_count > 0:
error_details += f"\n ConnectorCredentialPairs: {cc_count}\n Documents: {doc_count}"
raise RuntimeError(error_details)
except subprocess.CalledProcessError as e:
print(
f"✗ Failed to check documents for tenant {tenant_id}: {e}",
file=sys.stderr,
)
if e.stderr:
print(f" Error details: {e.stderr}", file=sys.stderr)
raise
except Exception as e:
print(
f"✗ Failed to check documents for tenant {tenant_id}: {e}",
file=sys.stderr,
)
raise
def drop_data_plane_schema(
pod_name: str, tenant_id: str, context: str | None = None
) -> None:
"""Drop the PostgreSQL schema for the given tenant by running script on pod.
Args:
pod_name: Data plane pod to execute on
tenant_id: Tenant ID to process
context: Optional kubectl context for data plane cluster
"""
# Script is already on pod from setup_scripts_on_pod()
try:
# Execute script on pod
cmd_exec = ["kubectl", "exec"]
if context:
cmd_exec.extend(["--context", context])
cmd_exec.extend(
[
pod_name,
"--",
"python",
"/tmp/cleanup_tenant_schema.py",
tenant_id,
]
)
result = subprocess.run(
cmd_exec,
capture_output=True,
text=True,
check=True,
)
# Show progress messages from stderr
if result.stderr:
print(f" {result.stderr}", end="")
# Parse JSON result from stdout
result_data = json.loads(result.stdout)
status = result_data.get("status")
message = result_data.get("message")
if status == "success":
print(f"{message}")
elif status == "not_found":
print(f"{message}")
else:
print(f"{message}", file=sys.stderr)
raise RuntimeError(message)
except subprocess.CalledProcessError as e:
print(f"✗ Failed to drop schema for tenant {tenant_id}: {e}", file=sys.stderr)
if e.stderr:
print(f" Error details: {e.stderr}", file=sys.stderr)
raise
except Exception as e:
print(f"✗ Failed to drop schema for tenant {tenant_id}: {e}", file=sys.stderr)
raise
def cleanup_control_plane(
pod_name: str, tenant_id: str, context: str | None = None, force: bool = False
) -> None:
"""Clean up control plane data via pod queries.
Args:
pod_name: Control plane pod to execute on
tenant_id: Tenant ID to process
context: Optional kubectl context for control plane cluster
force: Skip confirmations if True
"""
print(f"Cleaning up control plane data for tenant: {tenant_id}")
# Delete in order respecting foreign key constraints
delete_queries = [
(
"tenant_notification",
f"DELETE FROM tenant_notification WHERE tenant_id = '{tenant_id}'",
),
("tenant_config", f"DELETE FROM tenant_config WHERE tenant_id = '{tenant_id}'"),
("subscription", f"DELETE FROM subscription WHERE tenant_id = '{tenant_id}'"),
("tenant", f"DELETE FROM tenant WHERE tenant_id = '{tenant_id}'"),
]
try:
for table_name, query in delete_queries:
print(f" Deleting from {table_name}...")
if not confirm_step(f"Delete from {table_name}?", force):
print(f" Skipping deletion from {table_name}")
continue
execute_control_plane_delete(pod_name, query, context)
print(f"✓ Successfully cleaned up control plane data for tenant: {tenant_id}")
except Exception as e:
print(
f"✗ Failed to clean up control plane for tenant {tenant_id}: {e}",
file=sys.stderr,
)
raise
def cleanup_tenant(
tenant_id: str,
data_plane_pod: str,
control_plane_pod: str,
data_plane_context: str | None = None,
control_plane_context: str | None = None,
force: bool = False,
) -> bool:
"""Main cleanup function that orchestrates all cleanup steps.
Args:
tenant_id: Tenant ID to process
data_plane_pod: Data plane pod for schema operations
control_plane_pod: Control plane pod for tenant record operations
data_plane_context: Optional kubectl context for data plane cluster
control_plane_context: Optional kubectl context for control plane cluster
force: Skip confirmations if True
"""
print(f"Starting cleanup for tenant: {tenant_id}")
# Check tenant status first (from control plane)
print(f"\n{'=' * 80}")
try:
tenant_status = get_tenant_status(
control_plane_pod, tenant_id, control_plane_context
)
# If tenant is not GATED_ACCESS, require explicit confirmation even in force mode
if tenant_status and tenant_status != "GATED_ACCESS":
print(
f"\n⚠️ WARNING: Tenant status is '{tenant_status}', not 'GATED_ACCESS'!"
)
print(
"This tenant may be active and should not be deleted without careful review."
)
print(f"{'=' * 80}\n")
if force:
print(f"Skipping cleanup for tenant {tenant_id} in force mode")
return False
# Always ask for confirmation if not gated
response = input(
"Are you ABSOLUTELY SURE you want to proceed? Type 'yes' to confirm: "
)
if response.lower() != "yes":
print("Cleanup aborted - tenant is not GATED_ACCESS")
return False
elif tenant_status == "GATED_ACCESS":
print("✓ Tenant status is GATED_ACCESS - safe to proceed with cleanup")
elif tenant_status is None:
print("⚠️ WARNING: Could not determine tenant status!")
if force:
print(f"Skipping cleanup for tenant {tenant_id} in force mode")
return False
response = input("Continue anyway? Type 'yes' to confirm: ")
if response.lower() != "yes":
print("Cleanup aborted - could not verify tenant status")
return False
except Exception as e:
print(f"⚠️ WARNING: Failed to check tenant status: {e}")
if force:
print(f"Skipping cleanup for tenant {tenant_id} in force mode")
return False
response = input("Continue anyway? Type 'yes' to confirm: ")
if response.lower() != "yes":
print("Cleanup aborted - could not verify tenant status")
return False
print(f"{'=' * 80}\n")
# Fetch tenant users for informational purposes (non-blocking) from data plane
if not force:
print(f"\n{'=' * 80}")
try:
get_tenant_users(data_plane_pod, tenant_id, data_plane_context)
except Exception as e:
print(f"⚠ Could not fetch tenant users: {e}")
print(f"{'=' * 80}\n")
# Step 1: Make sure all documents are deleted (data plane)
print(f"\n{'=' * 80}")
print("Step 1/3: Checking for remaining ConnectorCredentialPairs and Documents")
print(f"{'=' * 80}")
try:
check_documents_deleted(data_plane_pod, tenant_id, data_plane_context)
except Exception as e:
print(f"✗ Document check failed: {e}", file=sys.stderr)
print(
"\nPlease ensure all ConnectorCredentialPairs and Documents are deleted before running cleanup."
)
print(
"You may need to mark connectors for deletion and wait for cleanup to complete."
)
return False
print(f"{'=' * 80}\n")
# Step 2: Drop data plane schema
if confirm_step(
f"Step 2/3: Drop data plane schema '{tenant_id}' (CASCADE - will delete all tables, functions, etc.)",
force,
):
try:
drop_data_plane_schema(data_plane_pod, tenant_id, data_plane_context)
except Exception as e:
print(f"✗ Failed at schema cleanup step: {e}", file=sys.stderr)
if not force:
response = input("Continue with control plane cleanup? (y/n): ")
if response.lower() != "y":
print("Cleanup aborted by user")
return False
else:
print("[FORCE MODE] Continuing despite schema cleanup failure")
else:
print("Step 2 skipped by user")
# Step 3: Clean up control plane
if confirm_step(
"Step 3/3: Delete control plane records (tenant_notification, tenant_config, subscription, tenant)",
force,
):
try:
cleanup_control_plane(
control_plane_pod, tenant_id, control_plane_context, force
)
except Exception as e:
print(f"✗ Failed at control plane cleanup step: {e}", file=sys.stderr)
if not force:
print("Control plane cleanup failed")
else:
print("[FORCE MODE] Control plane cleanup failed but continuing")
else:
print("Step 3 skipped by user")
return False
print(f"\n{'=' * 80}")
print(f"✓ Cleanup completed for tenant: {tenant_id}")
print(f"{'=' * 80}")
return True
def main() -> None:
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if len(sys.argv) < 2:
print(
"Usage: PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_cleanup_tenants.py <tenant_id> [--force]"
)
print(
" PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_cleanup_tenants.py --csv <csv_file_path> [--force]"
)
print("\nTwo-cluster architecture (with explicit contexts):")
print(
" PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_cleanup_tenants.py --csv <csv_file_path> \\"
)
print(
" --data-plane-context <context> --control-plane-context <context> [--force]"
)
print("\nThis version runs ALL operations from pods (no bastion required)")
print("\nArguments:")
print(
" tenant_id The tenant ID to clean up (required if not using --csv)"
)
print(
" --csv PATH Path to CSV file containing tenant IDs to clean up"
)
print(" --force Skip all confirmation prompts (optional)")
print(
" --concurrency N Process N tenants concurrently (default: 1)"
)
print(
" --data-plane-context CTX Kubectl context for data plane cluster (optional)"
)
print(
" --control-plane-context CTX Kubectl context for control plane cluster (optional)"
)
sys.exit(1)
# Parse arguments
force = "--force" in sys.argv
tenant_ids = []
# Parse concurrency
concurrency: int = 1
if "--concurrency" in sys.argv:
try:
concurrency_index = sys.argv.index("--concurrency")
if concurrency_index + 1 >= len(sys.argv):
print("Error: --concurrency flag requires a number", file=sys.stderr)
sys.exit(1)
concurrency = int(sys.argv[concurrency_index + 1])
if concurrency < 1:
print("Error: concurrency must be at least 1", file=sys.stderr)
sys.exit(1)
except ValueError:
print("Error: --concurrency value must be an integer", file=sys.stderr)
sys.exit(1)
# Validate: concurrency > 1 requires --force
if concurrency > 1 and not force:
print(
"Error: --concurrency > 1 requires --force flag (interactive mode not supported with parallel processing)",
file=sys.stderr,
)
sys.exit(1)
# Parse contexts
data_plane_context: str | None = None
control_plane_context: str | None = None
if "--data-plane-context" in sys.argv:
try:
idx = sys.argv.index("--data-plane-context")
if idx + 1 >= len(sys.argv):
print(
"Error: --data-plane-context requires a context name",
file=sys.stderr,
)
sys.exit(1)
data_plane_context = sys.argv[idx + 1]
except ValueError:
pass
if "--control-plane-context" in sys.argv:
try:
idx = sys.argv.index("--control-plane-context")
if idx + 1 >= len(sys.argv):
print(
"Error: --control-plane-context requires a context name",
file=sys.stderr,
)
sys.exit(1)
control_plane_context = sys.argv[idx + 1]
except ValueError:
pass
# Check for CSV mode
if "--csv" in sys.argv:
try:
csv_index = sys.argv.index("--csv")
if csv_index + 1 >= len(sys.argv):
print("Error: --csv flag requires a file path", file=sys.stderr)
sys.exit(1)
csv_path = sys.argv[csv_index + 1]
tenant_ids = read_tenant_ids_from_csv(csv_path)
if not tenant_ids:
print("Error: No tenant IDs found in CSV file", file=sys.stderr)
sys.exit(1)
print(f"Found {len(tenant_ids)} tenant(s) in CSV file: {csv_path}")
except Exception as e:
print(f"Error reading CSV file: {e}", file=sys.stderr)
sys.exit(1)
else:
# Single tenant mode
tenant_ids = [sys.argv[1]]
# Initial confirmation (unless --force is used)
if not force:
print(f"\n{'=' * 80}")
print("TENANT CLEANUP - NO BASTION VERSION")
print(f"{'=' * 80}")
if len(tenant_ids) == 1:
print(f"Tenant ID: {tenant_ids[0]}")
else:
print(f"Number of tenants: {len(tenant_ids)}")
print(f"Tenant IDs: {', '.join(tenant_ids[:5])}")
if len(tenant_ids) > 5:
print(f" ... and {len(tenant_ids) - 5} more")
print("\nThis will:")
print(" 1. Check for remaining documents and connector credential pairs")
print(" 2. Drop the data plane PostgreSQL schema (CASCADE)")
print(" 3. Clean up control plane data (all via pod queries)")
print(f"\n{'=' * 80}")
print("WARNING: This operation is IRREVERSIBLE!")
print(f"{'=' * 80}\n")
response = input("Are you sure you want to proceed? Type 'yes' to confirm: ")
if response.lower() != "yes":
print("Cleanup aborted by user")
sys.exit(0)
else:
print(
f"⚠ FORCE MODE: Running cleanup for {len(tenant_ids)} tenant(s) without confirmations"
)
# Find pods in both clusters before processing
try:
print("Finding data plane worker pod...")
data_plane_pod = find_worker_pod(data_plane_context)
print(f"✓ Using data plane worker pod: {data_plane_pod}")
print("Finding control plane pod...")
control_plane_pod = find_background_pod(control_plane_context)
print(f"✓ Using control plane pod: {control_plane_pod}\n")
# Copy all scripts to data plane pod once
setup_scripts_on_pod(data_plane_pod, data_plane_context)
print()
except Exception as e:
print(f"✗ Failed to find required pods or setup scripts: {e}", file=sys.stderr)
print("Cannot proceed with cleanup")
sys.exit(1)
# Run cleanup for each tenant
failed_tenants = []
successful_tenants = []
skipped_tenants = []
# Open CSV file for writing successful cleanups in real-time
csv_output_path = "cleaned_tenants.csv"
with open(csv_output_path, "w", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["tenant_id", "cleaned_at"])
csv_file.flush()
print(f"Writing successful cleanups to: {csv_output_path}\n")
if concurrency == 1:
# Sequential processing
for idx, tenant_id in enumerate(tenant_ids, 1):
if len(tenant_ids) > 1:
print(f"\n{'=' * 80}")
print(f"Processing tenant {idx}/{len(tenant_ids)}: {tenant_id}")
print(f"{'=' * 80}")
try:
was_cleaned = cleanup_tenant(
tenant_id,
data_plane_pod,
control_plane_pod,
data_plane_context,
control_plane_context,
force,
)
if was_cleaned:
successful_tenants.append(tenant_id)
# Write to CSV immediately after successful cleanup
timestamp = datetime.utcnow().isoformat()
csv_writer.writerow([tenant_id, timestamp])
csv_file.flush()
print(f"✓ Recorded cleanup in {csv_output_path}")
else:
skipped_tenants.append(tenant_id)
print(f"⚠ Tenant {tenant_id} was skipped (not recorded in CSV)")
except Exception as e:
print(
f"✗ Cleanup failed for tenant {tenant_id}: {e}", file=sys.stderr
)
failed_tenants.append((tenant_id, str(e)))
# If not in force mode and there are more tenants, ask if we should continue
if not force and idx < len(tenant_ids):
response = input(
f"\nContinue with remaining {len(tenant_ids) - idx} tenant(s)? (y/n): "
)
if response.lower() != "y":
print("Cleanup aborted by user")
break
else:
# Parallel processing
print(
f"Processing {len(tenant_ids)} tenant(s) with concurrency={concurrency}\n"
)
def process_tenant(tenant_id: str) -> tuple[str, bool, str | None]:
"""Process a single tenant. Returns (tenant_id, was_cleaned, error_message)."""
try:
was_cleaned = cleanup_tenant(
tenant_id,
data_plane_pod,
control_plane_pod,
data_plane_context,
control_plane_context,
force,
)
return (tenant_id, was_cleaned, None)
except Exception as e:
return (tenant_id, False, str(e))
with ThreadPoolExecutor(max_workers=concurrency) as executor:
# Submit all tasks
future_to_tenant = {
executor.submit(process_tenant, tenant_id): tenant_id
for tenant_id in tenant_ids
}
# Process results as they complete
completed = 0
for future in as_completed(future_to_tenant):
completed += 1
tenant_id, was_cleaned, error = future.result()
if error:
with _print_lock:
print(
f"[{completed}/{len(tenant_ids)}] ✗ Failed: {tenant_id}: {error}",
file=sys.stderr,
)
failed_tenants.append((tenant_id, error))
elif was_cleaned:
with _csv_lock:
timestamp = datetime.utcnow().isoformat()
csv_writer.writerow([tenant_id, timestamp])
csv_file.flush()
successful_tenants.append(tenant_id)
with _print_lock:
print(
f"[{completed}/{len(tenant_ids)}] ✓ Cleaned: {tenant_id}"
)
else:
skipped_tenants.append(tenant_id)
with _print_lock:
print(
f"[{completed}/{len(tenant_ids)}] ⊘ Skipped: {tenant_id}"
)
# Print summary
if len(tenant_ids) > 1:
print(f"\n{'=' * 80}")
print("CLEANUP SUMMARY")
print(f"{'=' * 80}")
print(f"Total tenants: {len(tenant_ids)}")
print(f"Successful: {len(successful_tenants)}")
print(f"Skipped: {len(skipped_tenants)}")
print(f"Failed: {len(failed_tenants)}")
print(f"\nSuccessfully cleaned tenants written to: {csv_output_path}")
if skipped_tenants:
print(f"\nSkipped tenants ({len(skipped_tenants)}):")
for tenant_id in skipped_tenants:
print(f" - {tenant_id}")
if failed_tenants:
print(f"\nFailed tenants ({len(failed_tenants)}):")
for tenant_id, error in failed_tenants:
print(f" - {tenant_id}: {error}")
print(f"{'=' * 80}")
if failed_tenants:
sys.exit(1)
if __name__ == "__main__":
main()
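
The run writes each successfully cleaned tenant to `cleaned_tenants.csv` as it goes, so an interrupted or partially failed batch can be resumed by diffing that file against the original input. A minimal sketch of building a retry CSV (the `all_tenants.csv` and `remaining_tenants.csv` names are hypothetical; only the `tenant_id` column and the `cleaned_tenants.csv` output come from the script above):

```python
import csv

# Tenant IDs already recorded as cleaned by the script above.
with open("cleaned_tenants.csv", newline="") as f:
    cleaned = {row["tenant_id"] for row in csv.DictReader(f)}

# Original input CSV (hypothetical name); it must contain a "tenant_id" column.
with open("all_tenants.csv", newline="") as f:
    remaining = [
        row["tenant_id"]
        for row in csv.DictReader(f)
        if row["tenant_id"].strip() and row["tenant_id"] not in cleaned
    ]

# Feed this file back to the script via --csv for a retry run.
with open("remaining_tenants.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["tenant_id"])
    writer.writerows([t] for t in remaining)
```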

View File

@@ -0,0 +1,318 @@
"""
Cleanup utilities that work WITHOUT bastion access.
Control plane and data plane are in SEPARATE clusters.
"""
import csv
import json
import subprocess
import sys
from pathlib import Path
def find_worker_pod(context: str | None = None) -> str:
"""Find a user file processing worker pod using kubectl.
Args:
context: Optional kubectl context to use
"""
print(
f"Finding user file processing worker pod{f' in context {context}' if context else ''}..."
)
cmd = ["kubectl", "get", "po"]
if context:
cmd.extend(["--context", context])
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Parse output and find user file processing worker pod
lines = result.stdout.strip().split("\n")
lines = lines[1:] # Skip header
import random
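    # Randomize pod order so repeated runs don't always land on the same worker pod.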
random.shuffle(lines)
for line in lines:
if "celery-worker-user-file-processing" in line and "Running" in line:
pod_name = line.split()[0]
print(f"Found pod: {pod_name}")
return pod_name
raise RuntimeError("No running user file processing worker pod found")
def find_background_pod(context: str | None = None) -> str:
"""Find a background/api-server pod for control plane operations.
Args:
context: Optional kubectl context to use
"""
print(f"Finding background/api pod{f' in context {context}' if context else ''}...")
cmd = ["kubectl", "get", "po"]
if context:
cmd.extend(["--context", context])
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Parse output and find suitable pod
lines = result.stdout.strip().split("\n")
lines = lines[1:] # Skip header
import random
random.shuffle(lines)
# Try to find api-server, background worker, or any celery worker
for line in lines:
if (
any(
name in line
for name in [
"api-server",
"celery-worker-light",
"celery-worker-primary",
"background",
]
)
and "Running" in line
):
pod_name = line.split()[0]
print(f"Found pod: {pod_name}")
return pod_name
raise RuntimeError("No suitable background pod found for control plane operations")
def confirm_step(message: str, force: bool = False) -> bool:
"""Ask for confirmation before executing a step.
Args:
message: The confirmation message to display
force: If True, skip confirmation and return True
Returns:
True if user confirms or force is True, False otherwise
"""
if force:
print(f"[FORCE MODE] Skipping confirmation: {message}")
return True
print(f"\n{message}")
response = input("Proceed? (y/n): ")
return response.lower() == "y"
def execute_control_plane_query_from_pod(
pod_name: str, query: str, context: str | None = None
) -> dict:
"""Execute a SQL query against control plane database from within a pod.
Args:
pod_name: The Kubernetes pod name to execute from
query: The SQL query to execute
context: Optional kubectl context for control plane cluster
Returns:
Dict with 'success' bool, 'stdout' str, and optional 'error' str
"""
# Create a Python script to run the query
# This script tries multiple environment variable patterns
query_script = f'''
import os
from sqlalchemy import create_engine, text
# Try to get control plane database URL from various environment patterns
control_db_url = None
# Pattern 1: POSTGRES_CONTROL_* variables
if os.environ.get("POSTGRES_CONTROL_HOST"):
host = os.environ.get("POSTGRES_CONTROL_HOST")
port = os.environ.get("POSTGRES_CONTROL_PORT", "5432")
db = os.environ.get("POSTGRES_CONTROL_DB", "control")
user = os.environ.get("POSTGRES_CONTROL_USER", "postgres")
password = os.environ.get("POSTGRES_CONTROL_PASSWORD", "")
if password:
control_db_url = f"postgresql://{{user}}:{{password}}@{{host}}:{{port}}/{{db}}"
# Pattern 2: Standard POSTGRES_* variables (might point to control plane in this cluster)
if not control_db_url and os.environ.get("POSTGRES_HOST"):
host = os.environ.get("POSTGRES_HOST")
port = os.environ.get("POSTGRES_PORT", "5432")
db = os.environ.get("POSTGRES_DB", "danswer")
user = os.environ.get("POSTGRES_USER", "postgres")
password = os.environ.get("POSTGRES_PASSWORD", "")
if password:
control_db_url = f"postgresql://{{user}}:{{password}}@{{host}}:{{port}}/{{db}}"
# Pattern 3: Direct URI
if not control_db_url:
control_db_url = os.environ.get("DATABASE_URL") or os.environ.get("POSTGRES_URI")
if not control_db_url:
raise ValueError("Cannot determine control plane database connection. No suitable environment variables found.")
engine = create_engine(control_db_url)
with engine.connect() as conn:
result = conn.execute(text("""{query}"""))
# Check if this is a SELECT query
if result.returns_rows:
rows = [dict(row._mapping) for row in result]
import json
print(json.dumps(rows, default=str))
else:
# For INSERT/UPDATE/DELETE, print rowcount
print(f"{{result.rowcount}} rows affected")
conn.commit()
'''
# Write the script to a temp file on the pod
script_path = "/tmp/control_plane_query.py"
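    # The quoted 'EOFQUERY' heredoc delimiter below keeps the pod's shell from
    # expanding anything inside the embedded Python script.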
try:
cmd_write = ["kubectl", "exec", pod_name]
if context:
cmd_write.extend(["--context", context])
cmd_write.extend(
[
"--",
"bash",
"-c",
f"cat > {script_path} << 'EOFQUERY'\n{query_script}\nEOFQUERY",
]
)
subprocess.run(
cmd_write,
check=True,
capture_output=True,
)
# Execute the script
cmd_exec = ["kubectl", "exec", pod_name]
if context:
cmd_exec.extend(["--context", context])
cmd_exec.extend(["--", "python", script_path])
result = subprocess.run(
cmd_exec,
capture_output=True,
text=True,
check=True,
)
return {
"success": True,
"stdout": result.stdout.strip(),
"stderr": result.stderr.strip() if result.stderr else "",
}
except subprocess.CalledProcessError as e:
return {
"success": False,
"stdout": e.stdout if e.stdout else "",
"error": e.stderr if e.stderr else str(e),
}
def get_tenant_status(
pod_name: str, tenant_id: str, context: str | None = None
) -> str | None:
"""
Get tenant status from control plane database via pod.
Args:
pod_name: The pod to execute the query from
tenant_id: The tenant ID to look up
context: Optional kubectl context for control plane cluster
Returns:
Tenant status string (e.g., 'GATED_ACCESS', 'ACTIVE') or None if not found
"""
print(f"Fetching tenant status for tenant: {tenant_id}")
query = f"SELECT application_status FROM tenant WHERE tenant_id = '{tenant_id}'"
result = execute_control_plane_query_from_pod(pod_name, query, context)
if not result["success"]:
print(
f"✗ Failed to get tenant status for {tenant_id}: {result.get('error', 'Unknown error')}",
file=sys.stderr,
)
return None
try:
# Parse JSON output
rows = json.loads(result["stdout"])
if rows and len(rows) > 0:
status = rows[0].get("application_status")
if status:
print(f"✓ Tenant status: {status}")
return status
print("⚠ Tenant not found in control plane")
return None
except (json.JSONDecodeError, KeyError, IndexError) as e:
print(f"✗ Failed to parse tenant status: {e}", file=sys.stderr)
return None
def execute_control_plane_delete(
pod_name: str, query: str, context: str | None = None
) -> bool:
"""Execute a DELETE query against control plane database from pod.
Args:
pod_name: The pod to execute the query from
query: The DELETE query to execute
context: Optional kubectl context for control plane cluster
Returns:
True if successful, False otherwise
"""
result = execute_control_plane_query_from_pod(pod_name, query, context)
if result["success"]:
print(f" {result['stdout']}")
return True
else:
print(f" Error: {result.get('error', 'Unknown error')}", file=sys.stderr)
return False
def read_tenant_ids_from_csv(csv_path: str) -> list[str]:
"""Read tenant IDs from CSV file.
Args:
csv_path: Path to CSV file
Returns:
List of tenant IDs
"""
if not Path(csv_path).exists():
raise FileNotFoundError(f"CSV file not found: {csv_path}")
tenant_ids = []
with open(csv_path, "r", newline="", encoding="utf-8") as csvfile:
reader = csv.DictReader(csvfile)
# Check if tenant_id column exists
if not reader.fieldnames or "tenant_id" not in reader.fieldnames:
raise ValueError(
f"CSV file must have a 'tenant_id' column. Found columns: {reader.fieldnames}"
)
for row in reader:
tenant_id = row.get("tenant_id", "").strip()
if tenant_id:
tenant_ids.append(tenant_id)
return tenant_ids
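
For reference, `read_tenant_ids_from_csv` only requires a `tenant_id` column; extra columns are ignored and rows with blank IDs are skipped. A minimal sketch of an input file and a call against the helper above (the file name and IDs are made up):

```python
import csv

# Build a small input file in the shape the helper expects.
with open("tenants_to_clean.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["tenant_id", "notes"])
    writer.writeheader()
    writer.writerow({"tenant_id": "tenant_abc123", "notes": "gated since Q3"})
    writer.writerow({"tenant_id": "", "notes": "blank rows are skipped"})
    writer.writerow({"tenant_id": "tenant_def456", "notes": ""})

assert read_tenant_ids_from_csv("tenants_to_clean.csv") == [
    "tenant_abc123",
    "tenant_def456",
]
```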

View File

@@ -0,0 +1,485 @@
#!/usr/bin/env python3
"""
Mark connectors for deletion script that works WITHOUT bastion access.
All queries run directly from pods.
Supports two-cluster architecture (data plane and control plane in separate clusters).
Usage:
PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_mark_connectors.py <tenant_id> [--force]
PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_mark_connectors.py --csv <csv_file_path> [--force] [--concurrency N]
With explicit contexts:
PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_mark_connectors.py --csv <csv_file_path> \
--data-plane-context <context> --control-plane-context <context> [--force] [--concurrency N]
"""
import subprocess
import sys
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from threading import Lock
from typing import Any
from scripts.tenant_cleanup.no_bastion_cleanup_utils import confirm_step
from scripts.tenant_cleanup.no_bastion_cleanup_utils import find_background_pod
from scripts.tenant_cleanup.no_bastion_cleanup_utils import find_worker_pod
from scripts.tenant_cleanup.no_bastion_cleanup_utils import get_tenant_status
from scripts.tenant_cleanup.no_bastion_cleanup_utils import read_tenant_ids_from_csv
# Global lock for thread-safe printing
_print_lock: Lock = Lock()
def safe_print(*args: Any, **kwargs: Any) -> None:
"""Thread-safe print function."""
with _print_lock:
print(*args, **kwargs)
def run_connector_deletion(
pod_name: str, tenant_id: str, context: str | None = None
) -> None:
"""Mark all connector credential pairs for deletion.
Args:
pod_name: Data plane pod to execute deletion on
tenant_id: Tenant ID to process
context: Optional kubectl context for data plane cluster
"""
safe_print(" Marking all connector credential pairs for deletion...")
# Get the path to the script
script_dir = Path(__file__).parent
mark_deletion_script = (
script_dir / "on_pod_scripts" / "execute_connector_deletion.py"
)
if not mark_deletion_script.exists():
raise FileNotFoundError(
f"execute_connector_deletion.py not found at {mark_deletion_script}"
)
try:
# Copy script to pod
cmd_cp = ["kubectl", "cp"]
if context:
cmd_cp.extend(["--context", context])
cmd_cp.extend(
[
str(mark_deletion_script),
f"{pod_name}:/tmp/execute_connector_deletion.py",
]
)
subprocess.run(
cmd_cp,
check=True,
capture_output=True,
)
# Execute script on pod
cmd_exec = ["kubectl", "exec"]
if context:
cmd_exec.extend(["--context", context])
cmd_exec.extend(
[
pod_name,
"--",
"python",
"/tmp/execute_connector_deletion.py",
tenant_id,
"--all",
]
)
        result = subprocess.run(cmd_exec)
        if result.returncode != 0:
            # Output is not captured here, so result.stderr is always None; report the exit code.
            raise RuntimeError(f"execute_connector_deletion.py exited with code {result.returncode}")
except subprocess.CalledProcessError as e:
safe_print(
f" ✗ Failed to mark all connector credential pairs for deletion: {e}",
file=sys.stderr,
)
if e.stderr:
safe_print(f" Error details: {e.stderr}", file=sys.stderr)
raise
except Exception as e:
safe_print(
f" ✗ Failed to mark all connector credential pairs for deletion: {e}",
file=sys.stderr,
)
raise
def mark_tenant_connectors_for_deletion(
tenant_id: str,
data_plane_pod: str,
control_plane_pod: str,
data_plane_context: str | None = None,
control_plane_context: str | None = None,
force: bool = False,
) -> None:
"""Main function to mark all connectors for a tenant for deletion.
Args:
tenant_id: Tenant ID to process
data_plane_pod: Data plane pod for connector operations
control_plane_pod: Control plane pod for status checks
data_plane_context: Optional kubectl context for data plane cluster
control_plane_context: Optional kubectl context for control plane cluster
force: Skip confirmations if True
"""
safe_print(f"Processing connectors for tenant: {tenant_id}")
# Check tenant status first (from control plane)
safe_print(f"\n{'=' * 80}")
try:
tenant_status = get_tenant_status(
control_plane_pod, tenant_id, control_plane_context
)
        # If tenant is not GATED_ACCESS, interactive runs must explicitly confirm
        # and force-mode runs abort rather than proceeding
if tenant_status and tenant_status != "GATED_ACCESS":
safe_print(
f"\n⚠️ WARNING: Tenant status is '{tenant_status}', not 'GATED_ACCESS'!"
)
safe_print(
"This tenant may be active and should not have connectors deleted without careful review."
)
safe_print(f"{'=' * 80}\n")
            # Interactive runs must type 'yes'; force-mode runs abort instead of proceeding
if not force:
response = input(
"Are you ABSOLUTELY SURE you want to proceed? Type 'yes' to confirm: "
)
if response.lower() != "yes":
safe_print("Operation aborted - tenant is not GATED_ACCESS")
raise RuntimeError(f"Tenant {tenant_id} is not GATED_ACCESS")
else:
raise RuntimeError(f"Tenant {tenant_id} is not GATED_ACCESS")
elif tenant_status == "GATED_ACCESS":
safe_print("✓ Tenant status is GATED_ACCESS - safe to proceed")
elif tenant_status is None:
safe_print("⚠️ WARNING: Could not determine tenant status!")
if not force:
response = input("Continue anyway? Type 'yes' to confirm: ")
if response.lower() != "yes":
safe_print("Operation aborted - could not verify tenant status")
raise RuntimeError(
f"Could not verify tenant status for {tenant_id}"
)
else:
raise RuntimeError(f"Could not verify tenant status for {tenant_id}")
except Exception as e:
safe_print(f"⚠️ WARNING: Failed to check tenant status: {e}")
if not force:
response = input("Continue anyway? Type 'yes' to confirm: ")
if response.lower() != "yes":
safe_print("Operation aborted - could not verify tenant status")
raise
else:
raise RuntimeError(f"Failed to check tenant status for {tenant_id}")
safe_print(f"{'=' * 80}\n")
# Confirm before proceeding (only in non-force mode)
if not confirm_step(
f"Mark all connector credential pairs for deletion for tenant {tenant_id}?",
force,
):
safe_print("Operation cancelled by user")
raise ValueError("Operation cancelled by user")
run_connector_deletion(data_plane_pod, tenant_id, data_plane_context)
# Print summary
safe_print(
f"✓ Marked all connector credential pairs for deletion for tenant {tenant_id}"
)
def main() -> None:
if len(sys.argv) < 2:
print(
"Usage: PYTHONPATH=. python scripts/tenant_cleanup/"
"no_bastion_mark_connectors.py <tenant_id> [--force] [--concurrency N]"
)
print(
" PYTHONPATH=. python scripts/tenant_cleanup/"
"no_bastion_mark_connectors.py --csv <csv_file_path> "
"[--force] [--concurrency N]"
)
print("\nTwo-cluster architecture (with explicit contexts):")
print(
" PYTHONPATH=. python scripts/tenant_cleanup/no_bastion_mark_connectors.py --csv <csv_file_path> \\"
)
print(
" --data-plane-context <context> --control-plane-context <context> [--force] [--concurrency N]"
)
print("\nThis version runs ALL operations from pods (no bastion required)")
print("\nArguments:")
print(
" tenant_id The tenant ID to process (required if not using --csv)"
)
print(
" --csv PATH Path to CSV file containing tenant IDs to process"
)
print(" --force Skip all confirmation prompts (optional)")
print(
" --concurrency N Process N tenants concurrently (default: 1)"
)
print(
" --data-plane-context CTX Kubectl context for data plane cluster (optional)"
)
print(
" --control-plane-context CTX Kubectl context for control plane cluster (optional)"
)
sys.exit(1)
# Parse arguments
force = "--force" in sys.argv
tenant_ids: list[str] = []
# Parse contexts
data_plane_context: str | None = None
control_plane_context: str | None = None
if "--data-plane-context" in sys.argv:
try:
idx = sys.argv.index("--data-plane-context")
if idx + 1 >= len(sys.argv):
print(
"Error: --data-plane-context requires a context name",
file=sys.stderr,
)
sys.exit(1)
data_plane_context = sys.argv[idx + 1]
except ValueError:
pass
if "--control-plane-context" in sys.argv:
try:
idx = sys.argv.index("--control-plane-context")
if idx + 1 >= len(sys.argv):
print(
"Error: --control-plane-context requires a context name",
file=sys.stderr,
)
sys.exit(1)
control_plane_context = sys.argv[idx + 1]
except ValueError:
pass
# Parse concurrency
concurrency: int = 1
if "--concurrency" in sys.argv:
try:
concurrency_index = sys.argv.index("--concurrency")
if concurrency_index + 1 >= len(sys.argv):
print("Error: --concurrency flag requires a number", file=sys.stderr)
sys.exit(1)
concurrency = int(sys.argv[concurrency_index + 1])
if concurrency < 1:
print("Error: concurrency must be at least 1", file=sys.stderr)
sys.exit(1)
except ValueError:
print("Error: --concurrency value must be an integer", file=sys.stderr)
sys.exit(1)
# Validate: concurrency > 1 requires --force
if concurrency > 1 and not force:
print(
"Error: --concurrency > 1 requires --force flag (interactive mode not supported with parallel processing)",
file=sys.stderr,
)
sys.exit(1)
# Check for CSV mode
if "--csv" in sys.argv:
try:
csv_index: int = sys.argv.index("--csv")
if csv_index + 1 >= len(sys.argv):
print("Error: --csv flag requires a file path", file=sys.stderr)
sys.exit(1)
csv_path: str = sys.argv[csv_index + 1]
tenant_ids = read_tenant_ids_from_csv(csv_path)
if not tenant_ids:
print("Error: No tenant IDs found in CSV file", file=sys.stderr)
sys.exit(1)
print(f"Found {len(tenant_ids)} tenant(s) in CSV file: {csv_path}")
except Exception as e:
print(f"Error reading CSV file: {e}", file=sys.stderr)
sys.exit(1)
else:
# Single tenant mode
tenant_ids = [sys.argv[1]]
# Find pods in both clusters before processing
try:
print("Finding data plane worker pod...")
data_plane_pod: str = find_worker_pod(data_plane_context)
print(f"✓ Using data plane worker pod: {data_plane_pod}")
print("Finding control plane pod...")
control_plane_pod: str = find_background_pod(control_plane_context)
print(f"✓ Using control plane pod: {control_plane_pod}")
except Exception as e:
print(f"✗ Failed to find required pods: {e}", file=sys.stderr)
print("Cannot proceed with marking connectors for deletion")
sys.exit(1)
# Initial confirmation (unless --force is used)
if not force:
print(f"\n{'=' * 80}")
print("MARK CONNECTORS FOR DELETION - NO BASTION VERSION")
print(f"{'=' * 80}")
if len(tenant_ids) == 1:
print(f"Tenant ID: {tenant_ids[0]}")
else:
print(f"Number of tenants: {len(tenant_ids)}")
print(f"Tenant IDs: {', '.join(tenant_ids[:5])}")
if len(tenant_ids) > 5:
print(f" ... and {len(tenant_ids) - 5} more")
print(
f"Mode: {'FORCE (no confirmations)' if force else 'Interactive (will ask for confirmation at each step)'}"
)
print(f"Concurrency: {concurrency} tenant(s) at a time")
print("\nThis will:")
print(" 1. Fetch all connector credential pairs for each tenant")
print(" 2. Cancel any scheduled indexing attempts for each connector")
print(" 3. Mark each connector credential pair status as DELETING")
print(" 4. Trigger the connector deletion task")
print(f"\n{'=' * 80}")
print("WARNING: This will mark connectors for deletion!")
print("The actual deletion will be performed by the background celery worker.")
print(f"{'=' * 80}\n")
response = input("Are you sure you want to proceed? Type 'yes' to confirm: ")
if response.lower() != "yes":
print("Operation aborted by user")
sys.exit(0)
else:
if len(tenant_ids) == 1:
print(
f"⚠ FORCE MODE: Marking connectors for deletion for {tenant_ids[0]} without confirmations"
)
else:
print(
f"⚠ FORCE MODE: Marking connectors for deletion for {len(tenant_ids)} tenants "
f"(concurrency: {concurrency}) without confirmations"
)
# Process tenants (in parallel if concurrency > 1)
failed_tenants: list[tuple[str, str]] = []
successful_tenants: list[str] = []
if concurrency == 1:
# Sequential processing
for idx, tenant_id in enumerate(tenant_ids, 1):
if len(tenant_ids) > 1:
print(f"\n{'=' * 80}")
print(f"Processing tenant {idx}/{len(tenant_ids)}: {tenant_id}")
print(f"{'=' * 80}")
try:
mark_tenant_connectors_for_deletion(
tenant_id,
data_plane_pod,
control_plane_pod,
data_plane_context,
control_plane_context,
force,
)
successful_tenants.append(tenant_id)
except Exception as e:
print(
f"✗ Failed to process tenant {tenant_id}: {e}",
file=sys.stderr,
)
failed_tenants.append((tenant_id, str(e)))
# If not in force mode and there are more tenants, ask if we should continue
if not force and idx < len(tenant_ids):
response = input(
f"\nContinue with remaining {len(tenant_ids) - idx} tenant(s)? (y/n): "
)
if response.lower() != "y":
print("Operation aborted by user")
break
else:
# Parallel processing
print(
f"\nProcessing {len(tenant_ids)} tenant(s) with concurrency={concurrency}"
)
def process_tenant(tenant_id: str) -> tuple[str, bool, str | None]:
"""Process a single tenant. Returns (tenant_id, success, error_message)."""
try:
mark_tenant_connectors_for_deletion(
tenant_id,
data_plane_pod,
control_plane_pod,
data_plane_context,
control_plane_context,
force,
)
return (tenant_id, True, None)
except Exception as e:
return (tenant_id, False, str(e))
with ThreadPoolExecutor(max_workers=concurrency) as executor:
# Submit all tasks
future_to_tenant = {
executor.submit(process_tenant, tenant_id): tenant_id
for tenant_id in tenant_ids
}
# Process results as they complete
completed: int = 0
for future in as_completed(future_to_tenant):
completed += 1
tenant_id, success, error = future.result()
if success:
successful_tenants.append(tenant_id)
safe_print(
f"[{completed}/{len(tenant_ids)}] ✓ Successfully processed {tenant_id}"
)
else:
failed_tenants.append((tenant_id, error or "Unknown error"))
safe_print(
f"[{completed}/{len(tenant_ids)}] ✗ Failed to process {tenant_id}: {error}",
file=sys.stderr,
)
# Print summary if multiple tenants
if len(tenant_ids) > 1:
print(f"\n{'=' * 80}")
print("OPERATION SUMMARY")
print(f"{'=' * 80}")
print(f"Total tenants: {len(tenant_ids)}")
print(f"Successful: {len(successful_tenants)}")
print(f"Failed: {len(failed_tenants)}")
if failed_tenants:
print("\nFailed tenants:")
for tenant_id, error in failed_tenants:
print(f" - {tenant_id}: {error}")
print(f"{'=' * 80}")
if failed_tenants:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -25,6 +25,7 @@ class WebSearchProviderType(str, Enum):
GOOGLE_PSE = "google_pse"
SERPER = "serper"
EXA = "exa"
SEARXNG = "searxng"
class WebContentProviderType(str, Enum):

View File

@@ -229,9 +229,9 @@ class TestSlackEntitiesValidation:
assert entities.search_all_channels is True
assert entities.channels is None
assert entities.exclude_channels is None
assert entities.include_dm is False
assert entities.include_group_dm is False
assert entities.include_private_channels is False
assert entities.include_dm is True
assert entities.include_group_dm is True
assert entities.include_private_channels is True
assert entities.default_search_days == 30
def test_search_all_channels_true(self) -> None:
@@ -322,17 +322,17 @@ class TestSlackEntitiesValidation:
def test_direct_message_filtering(self) -> None:
"""Test DM filtering options"""
# Test 1:1 DMs
entities_dm = SlackEntities(include_dm=True)
assert entities_dm.include_dm is True
assert entities_dm.include_group_dm is False
# Test disabling 1:1 DMs
entities_no_dm = SlackEntities(include_dm=False)
assert entities_no_dm.include_dm is False
assert entities_no_dm.include_group_dm is True # Default is True
# Test group DMs
entities_group_dm = SlackEntities(include_group_dm=True)
assert entities_group_dm.include_dm is False
assert entities_group_dm.include_group_dm is True
# Test disabling group DMs
entities_no_group_dm = SlackEntities(include_group_dm=False)
assert entities_no_group_dm.include_dm is True # Default is True
assert entities_no_group_dm.include_group_dm is False
# Test both
# Test both enabled (defaults)
entities_both = SlackEntities(include_dm=True, include_group_dm=True)
assert entities_both.include_dm is True
assert entities_both.include_group_dm is True

View File

@@ -94,8 +94,8 @@ class TestGetBedrockTokenLimit:
"""Test default fallback for unknown models."""
with patch("onyx.llm.utils.get_model_map", return_value={}):
result = get_bedrock_token_limit("unknown.model-v1:0")
# Should fall back to GEN_AI_MODEL_FALLBACK_MAX_TOKENS (4096)
assert result == 4096
# Should fall back to GEN_AI_MODEL_FALLBACK_MAX_TOKENS (32000)
assert result == 32000
def test_cross_region_model_id(self) -> None:
"""Test cross-region model ID (us.anthropic.claude-...)."""

View File

@@ -298,8 +298,6 @@ services:
cache:
image: redis:7.4-alpine
restart: unless-stopped
ports:
- "6379:6379"
# docker silently mounts /data even without an explicit volume mount, which enables
# persistence. explicitly setting save and appendonly forces ephemeral behavior.
command: redis-server --save "" --appendonly no

View File

@@ -304,8 +304,6 @@ services:
cache:
image: redis:7.4-alpine
restart: unless-stopped
ports:
- "6379:6379"
# docker silently mounts /data even without an explicit volume mount, which enables
# persistence. explicitly setting save and appendonly forces ephemeral behavior.
command: redis-server --save "" --appendonly no

View File

@@ -334,8 +334,6 @@ services:
cache:
image: redis:7.4-alpine
restart: unless-stopped
ports:
- "6379:6379"
# docker silently mounts /data even without an explicit volume mount, which enables
# persistence. explicitly setting save and appendonly forces ephemeral behavior.
command: redis-server --save "" --appendonly no

View File

@@ -5,7 +5,7 @@ home: https://www.onyx.app/
sources:
- "https://github.com/onyx-dot-app/onyx"
type: application
version: 0.4.15
version: 0.4.17
appVersion: latest
annotations:
category: Productivity

View File

@@ -13,7 +13,7 @@ data:
VESPA_HOST: {{ .Values.vespa.name }}.{{ .Values.vespa.service.name }}.{{ .Release.Namespace }}.svc.cluster.local
{{- end }}
{{- if .Values.redis.enabled }}
REDIS_HOST: {{ .Release.Name }}-redis-master
REDIS_HOST: {{ .Values.redis.redisStandalone.name | default .Release.Name }}-master
{{- end }}
MODEL_SERVER_HOST: "{{ include "onyx.fullname" . }}-inference-model-service"
INDEXING_MODEL_SERVER_HOST: "{{ include "onyx.fullname" . }}-indexing-model-service"

View File

@@ -10,6 +10,10 @@ metadata:
{{- end }}
spec:
replicas: {{ .Values.indexCapability.replicaCount }}
{{- with .Values.indexCapability.strategy }}
strategy:
{{- toYaml . | nindent 4 }}
{{- end }}
selector:
matchLabels:
{{- include "onyx.selectorLabels" . | nindent 6 }}

View File

@@ -9,6 +9,10 @@ metadata:
{{- end }}
spec:
replicas: {{ .Values.inferenceCapability.replicaCount }}
{{- with .Values.inferenceCapability.strategy }}
strategy:
{{- toYaml . | nindent 4 }}
{{- end }}
selector:
matchLabels:
{{- range .Values.inferenceCapability.labels }}

View File

@@ -105,6 +105,15 @@ inferenceCapability:
nodeSelector: {}
tolerations: []
affinity: {}
# Deployment strategy - use Recreate or RollingUpdate with maxSurge: 0 to terminate old pod first
# This prevents pending pods when cluster resources are constrained
strategy: {}
# Example for RollingUpdate that terminates old pod first:
# strategy:
# type: RollingUpdate
# rollingUpdate:
# maxSurge: 0
# maxUnavailable: 1
indexCapability:
service:
@@ -141,6 +150,15 @@ indexCapability:
nodeSelector: {}
tolerations: []
affinity: {}
# Deployment strategy - use Recreate or RollingUpdate with maxSurge: 0 to terminate old pod first
# This prevents pending pods when cluster resources are constrained
strategy: {}
# Example for RollingUpdate that terminates old pod first:
# strategy:
# type: RollingUpdate
# rollingUpdate:
# maxSurge: 0
# maxUnavailable: 1
config:
envConfigMapName: env-configmap

View File

@@ -7,7 +7,7 @@ This directory contains Terraform modules to provision the core AWS infrastructu
- `eks`: Provisions an Amazon EKS cluster, essential addons (EBS CSI, metrics server, cluster autoscaler), and optional IRSA for S3 access
- `postgres`: Creates an Amazon RDS for PostgreSQL instance and returns a connection URL
- `redis`: Creates an ElastiCache for Redis replication group
- `s3`: Creates an S3 bucket (and VPC endpoint) for file storage
- `s3`: Creates an S3 bucket and locks access to a provided S3 VPC endpoint
- `onyx`: A higher-level composition that wires the above modules together for a complete, opinionated stack
Use the `onyx` module if you want a working EKS + Postgres + Redis + S3 stack with sane defaults. Use the individual modules if you need more granular control.
@@ -93,7 +93,7 @@ terraform apply
```
### Using an existing VPC
If you already have a VPC and subnets, disable VPC creation and provide IDs and CIDR:
If you already have a VPC and subnets, disable VPC creation and provide IDs, CIDR, and the ID of the existing S3 gateway endpoint in that VPC:
```hcl
module "onyx" {
@@ -109,6 +109,7 @@ module "onyx" {
private_subnets = ["subnet-aaaa", "subnet-bbbb", "subnet-cccc"]
public_subnets = ["subnet-dddd", "subnet-eeee", "subnet-ffff"]
vpc_cidr_block = "10.0.0.0/16"
s3_vpc_endpoint_id = "vpce-xxxxxxxxxxxxxxxxx"
}
```
@@ -125,11 +126,11 @@ module "onyx" {
Inputs (common):
- `name` (default `onyx`), `region` (default `us-west-2`), `tags`
- `postgres_username`, `postgres_password`
- `create_vpc` (default true) or existing VPC details
- `create_vpc` (default true) or existing VPC details and `s3_vpc_endpoint_id`
### `vpc`
- Builds a VPC sized for EKS with multiple private and public subnets
- Outputs: `vpc_id`, `private_subnets`, `public_subnets`, `vpc_cidr_block`
- Outputs: `vpc_id`, `private_subnets`, `public_subnets`, `vpc_cidr_block`, `s3_vpc_endpoint_id`
### `eks`
- Creates the EKS cluster and node groups
@@ -155,7 +156,7 @@ Key inputs include:
- Outputs endpoint, port, and whether SSL is enabled
### `s3`
- Creates an S3 bucket for file storage and a gateway VPC endpoint for private access
- Creates an S3 bucket for file storage and scopes access to the provided S3 gateway VPC endpoint
## Installing the Onyx Helm chart (after Terraform)
Once the cluster is active, deploy application workloads via Helm. You can use the chart in `deployment/helm/charts/onyx`.

View File

@@ -56,11 +56,10 @@ module "postgres" {
}
module "s3" {
source = "../s3"
bucket_name = local.bucket_name
region = var.region
vpc_id = local.vpc_id
tags = local.merged_tags
source = "../s3"
bucket_name = local.bucket_name
tags = local.merged_tags
s3_vpc_endpoint_id = var.create_vpc ? module.vpc[0].s3_vpc_endpoint_id : var.s3_vpc_endpoint_id
}
module "eks" {

View File

@@ -40,6 +40,17 @@ variable "vpc_cidr_block" {
default = null
}
variable "s3_vpc_endpoint_id" {
type = string
description = "ID of an existing S3 gateway VPC endpoint when reusing an existing VPC"
default = null
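  # Required when create_vpc is false: the bucket policy in the s3 module scopes
  # access to this existing S3 gateway endpoint.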
validation {
condition = var.create_vpc || var.s3_vpc_endpoint_id != null
error_message = "s3_vpc_endpoint_id must be provided when create_vpc is false."
}
}
variable "tags" {
type = map(string)
description = "Base tags applied to all AWS resources"

View File

@@ -3,21 +3,6 @@ resource "aws_s3_bucket" "bucket" {
tags = var.tags
}
data "aws_route_tables" "vpc" {
filter {
name = "vpc-id"
values = [var.vpc_id]
}
}
resource "aws_vpc_endpoint" "s3" {
vpc_id = var.vpc_id
service_name = "com.amazonaws.${var.region}.s3"
vpc_endpoint_type = "Gateway"
route_table_ids = data.aws_route_tables.vpc.ids
tags = var.tags
}
resource "aws_s3_bucket_policy" "bucket_policy" {
bucket = aws_s3_bucket.bucket.id
@@ -38,7 +23,7 @@ resource "aws_s3_bucket_policy" "bucket_policy" {
],
Condition = {
StringEquals = {
"aws:SourceVpce" = aws_vpc_endpoint.s3.id
"aws:SourceVpce" = var.s3_vpc_endpoint_id
}
}
}

View File

@@ -3,18 +3,13 @@ variable "bucket_name" {
description = "Name of the S3 bucket"
}
variable "region" {
type = string
description = "AWS region"
}
variable "vpc_id" {
type = string
description = "VPC ID where your EKS cluster runs"
}
variable "tags" {
type = map(string)
description = "Tags to apply to S3 resources and VPC endpoint"
description = "Tags to apply to S3 resources"
default = {}
}
variable "s3_vpc_endpoint_id" {
type = string
description = "ID of the S3 gateway VPC endpoint allowed to access this bucket"
}

View File

@@ -6,6 +6,8 @@ data "aws_availability_zones" "available" {
}
}
data "aws_region" "current" {}
module "vpc" {
source = "terraform-aws-modules/vpc/aws"
version = "5.0.0"
@@ -33,3 +35,20 @@ module "vpc" {
tags = var.tags
}
data "aws_route_tables" "this" {
filter {
name = "vpc-id"
values = [module.vpc.vpc_id]
}
depends_on = [module.vpc]
}
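# Gateway-type S3 endpoint attached to every route table in the VPC so workloads
# in private subnets can reach S3 without going through a NAT gateway.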
resource "aws_vpc_endpoint" "s3" {
vpc_id = module.vpc.vpc_id
service_name = "com.amazonaws.${data.aws_region.current.name}.s3"
vpc_endpoint_type = "Gateway"
route_table_ids = data.aws_route_tables.this.ids
tags = var.tags
}

View File

@@ -13,3 +13,8 @@ output "public_subnets" {
output "vpc_cidr_block" {
value = module.vpc.vpc_cidr_block
}
output "s3_vpc_endpoint_id" {
description = "ID of the S3 gateway VPC endpoint created for this VPC"
value = aws_vpc_endpoint.s3.id
}

View File

@@ -47,7 +47,7 @@ backend = [
"fastapi-users-db-sqlalchemy==5.0.0",
"fastapi-limiter==0.1.6",
"fastmcp==2.13.3",
"filelock==3.15.4",
"filelock==3.20.1",
"google-api-python-client==2.86.0",
"google-auth-httplib2==0.1.0",
"google-auth-oauthlib==1.0.0",
@@ -112,12 +112,12 @@ backend = [
"unstructured==0.15.1",
"unstructured-client==0.25.4",
"zulip==0.8.2",
"hubspot-api-client==8.1.0",
"hubspot-api-client==11.1.0",
"asana==5.0.8",
"dropbox==12.0.2",
"shapely==2.0.6",
"stripe==10.12.0",
"urllib3==2.6.0",
"urllib3==2.6.1",
"mistune==0.8.4",
"sendgrid==6.12.5",
"exa_py==1.15.4",

uv.lock (generated)
View File

@@ -1505,11 +1505,11 @@ wheels = [
[[package]]
name = "filelock"
version = "3.15.4"
version = "3.20.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/08/dd/49e06f09b6645156550fb9aee9cc1e59aba7efbc972d665a1bd6ae0435d4/filelock-3.15.4.tar.gz", hash = "sha256:2207938cbc1844345cb01a5a95524dae30f0ce089eba5b00378295a17e3e90cb", size = 18007, upload-time = "2024-06-22T15:59:14.749Z" }
sdist = { url = "https://files.pythonhosted.org/packages/a7/23/ce7a1126827cedeb958fc043d61745754464eb56c5937c35bbf2b8e26f34/filelock-3.20.1.tar.gz", hash = "sha256:b8360948b351b80f420878d8516519a2204b07aefcdcfd24912a5d33127f188c", size = 19476, upload-time = "2025-12-15T23:54:28.027Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ae/f0/48285f0262fe47103a4a45972ed2f9b93e4c80b8fd609fa98da78b2a5706/filelock-3.15.4-py3-none-any.whl", hash = "sha256:6ca1fffae96225dab4c6eaf1c4f4f28cd2568d3ec2a44e15a08520504de468e7", size = 16159, upload-time = "2024-06-22T15:59:12.695Z" },
{ url = "https://files.pythonhosted.org/packages/e3/7f/a1a97644e39e7316d850784c642093c99df1290a460df4ede27659056834/filelock-3.20.1-py3-none-any.whl", hash = "sha256:15d9e9a67306188a44baa72f569d2bfd803076269365fdea0934385da4dc361a", size = 16666, upload-time = "2025-12-15T23:54:26.874Z" },
]
[[package]]
@@ -2074,17 +2074,18 @@ wheels = [
[[package]]
name = "hubspot-api-client"
version = "8.1.0"
version = "11.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "python-dateutil" },
{ name = "requests" },
{ name = "six" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ad/f7/7318da1458f044ac32ade38af29c897d17f5efe47a84d3b40f43fe67b303/hubspot-api-client-8.1.0.tar.gz", hash = "sha256:e8c25c2a12ab3801a6ab051ad7b25766a2e743928ed6ce9a309672c53b973044", size = 1499140, upload-time = "2023-08-07T12:25:11.342Z" }
sdist = { url = "https://files.pythonhosted.org/packages/08/52/11b0ecd4fd6e175813a0017ab4bc4697bfde058cac9b98938fa28460028a/hubspot_api_client-11.1.0.tar.gz", hash = "sha256:93ed914f1cd4dad67bacf26b8a1ec8506847483fa801cb2bddd4b2d887d0f825", size = 1732887, upload-time = "2024-12-13T08:20:44.38Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bc/85/36eed8faa45617eb6bede804ac6e55bd3842a910f38291494678228b4db7/hubspot_api_client-8.1.0-py3-none-any.whl", hash = "sha256:2b255b6330770b263995baa07a831940f8bd6a692e8462a3a83771f65e718736", size = 3379938, upload-time = "2023-08-07T12:25:09.061Z" },
{ url = "https://files.pythonhosted.org/packages/a1/d3/a2dbdea1f491d5bf362d74d4b7342ece6c043a43c8ee997f8fb8b776239d/hubspot_api_client-11.1.0-py3-none-any.whl", hash = "sha256:0c49e2c2f511a56d249c6890d2dfdd62afd04f66edc69a591e8348e28804634b", size = 3945382, upload-time = "2024-12-13T08:20:42.129Z" },
]
[[package]]
@@ -3562,7 +3563,7 @@ requires-dist = [
{ name = "fastapi-users", marker = "extra == 'backend'", specifier = "==14.0.1" },
{ name = "fastapi-users-db-sqlalchemy", marker = "extra == 'backend'", specifier = "==5.0.0" },
{ name = "fastmcp", marker = "extra == 'backend'", specifier = "==2.13.3" },
{ name = "filelock", marker = "extra == 'backend'", specifier = "==3.15.4" },
{ name = "filelock", marker = "extra == 'backend'", specifier = "==3.20.1" },
{ name = "google-api-python-client", marker = "extra == 'backend'", specifier = "==2.86.0" },
{ name = "google-auth-httplib2", marker = "extra == 'backend'", specifier = "==0.1.0" },
{ name = "google-auth-oauthlib", marker = "extra == 'backend'", specifier = "==1.0.0" },
@@ -3572,7 +3573,7 @@ requires-dist = [
{ name = "httpcore", marker = "extra == 'backend'", specifier = "==1.0.9" },
{ name = "httpx", extras = ["http2"], marker = "extra == 'backend'", specifier = "==0.28.1" },
{ name = "httpx-oauth", marker = "extra == 'backend'", specifier = "==0.15.1" },
{ name = "hubspot-api-client", marker = "extra == 'backend'", specifier = "==8.1.0" },
{ name = "hubspot-api-client", marker = "extra == 'backend'", specifier = "==11.1.0" },
{ name = "huggingface-hub", marker = "extra == 'backend'", specifier = "==0.35.3" },
{ name = "inflection", marker = "extra == 'backend'", specifier = "==0.5.1" },
{ name = "ipykernel", marker = "extra == 'dev'", specifier = "==6.29.5" },
@@ -3680,7 +3681,7 @@ requires-dist = [
{ name = "types-setuptools", marker = "extra == 'dev'", specifier = "==68.0.0.3" },
{ name = "unstructured", marker = "extra == 'backend'", specifier = "==0.15.1" },
{ name = "unstructured-client", marker = "extra == 'backend'", specifier = "==0.25.4" },
{ name = "urllib3", marker = "extra == 'backend'", specifier = "==2.6.0" },
{ name = "urllib3", marker = "extra == 'backend'", specifier = "==2.6.1" },
{ name = "uvicorn", specifier = "==0.35.0" },
{ name = "voyageai", specifier = "==0.2.3" },
{ name = "xmlsec", marker = "extra == 'backend'", specifier = "==1.3.14" },
@@ -6328,11 +6329,11 @@ wheels = [
[[package]]
name = "urllib3"
version = "2.6.0"
version = "2.6.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/1c/43/554c2569b62f49350597348fc3ac70f786e3c32e7f19d266e19817812dd3/urllib3-2.6.0.tar.gz", hash = "sha256:cb9bcef5a4b345d5da5d145dc3e30834f58e8018828cbc724d30b4cb7d4d49f1", size = 432585, upload-time = "2025-12-05T15:08:47.885Z" }
sdist = { url = "https://files.pythonhosted.org/packages/5e/1d/0f3a93cca1ac5e8287842ed4eebbd0f7a991315089b1a0b01c7788aa7b63/urllib3-2.6.1.tar.gz", hash = "sha256:5379eb6e1aba4088bae84f8242960017ec8d8e3decf30480b3a1abdaa9671a3f", size = 432678, upload-time = "2025-12-08T15:25:26.773Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/56/1a/9ffe814d317c5224166b23e7c47f606d6e473712a2fad0f704ea9b99f246/urllib3-2.6.0-py3-none-any.whl", hash = "sha256:c90f7a39f716c572c4e3e58509581ebd83f9b59cced005b7db7ad2d22b0db99f", size = 131083, upload-time = "2025-12-05T15:08:45.983Z" },
{ url = "https://files.pythonhosted.org/packages/bc/56/190ceb8cb10511b730b564fb1e0293fa468363dbad26145c34928a60cb0c/urllib3-2.6.1-py3-none-any.whl", hash = "sha256:e67d06fe947c36a7ca39f4994b08d73922d40e6cca949907be05efa6fd75110b", size = 131138, upload-time = "2025-12-08T15:25:25.51Z" },
]
[[package]]

web/.gitignore (vendored)
View File

@@ -1,7 +1,8 @@
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.
.env.sentry-build-plugin
# dependencies
/node_modules
node_modules
/.pnp
.pnp.js

View File

@@ -32,6 +32,8 @@ const sharedConfig = {
// Path aliases (must come after specific mocks)
"^@/(.*)$": "<rootDir>/src/$1",
"^@tests/(.*)$": "<rootDir>/tests/$1",
"^@opal$": "<rootDir>/lib/opal/src/index.ts",
"^@opal/(.*)$": "<rootDir>/lib/opal/src/$1",
},
testPathIgnorePatterns: ["/node_modules/", "/tests/e2e/", "/.next/"],

View File

@@ -5,7 +5,7 @@ A Typescript component library for Onyx.
## Usage
```tsx
import { Button } from "@onyx/opal";
import { Button } from "@opal/components";
function MyComponent() {
return <Button onClick={() => console.log("Clicked!")}>Click me</Button>;

View File

@@ -1,11 +1,14 @@
{
"name": "@onyx/opal",
"version": "0.0.1",
"main": "src/index.ts",
"types": "src/index.ts",
"scripts": {
"types:check": "tsc --noEmit",
"format": "prettier --write \"src/**/*.{ts,tsx,js,jsx,json,css,md}\"",
"format:check": "prettier --check \"src/**/*.{ts,tsx,js,jsx,json,css,md}\""
"exports": {
"./icons": {
"types": "./src/icons/index.ts",
"default": "./src/icons/index.ts"
},
"./types": {
"types": "./src/types.ts",
"default": "./src/types.ts"
}
}
}

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgActions = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgActivity = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgAddLines = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgAlertCircle = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgAlertTriangle = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,3 @@
import * as React from "react";
import type { SVGProps } from "react";
const SvgArrowDownDot = (props: SVGProps<SVGSVGElement>) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgArrowExchange = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,3 @@
import * as React from "react";
import type { SVGProps } from "react";
const SvgArrowLeftDot = (props: SVGProps<SVGSVGElement>) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgArrowLeft = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgArrowRightCircle = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,3 @@
import * as React from "react";
import type { SVGProps } from "react";
const SvgArrowRightDot = (props: SVGProps<SVGSVGElement>) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgArrowRight = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,3 @@
import * as React from "react";
import type { SVGProps } from "react";
const SvgArrowUpDot = (props: SVGProps<SVGSVGElement>) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgArrowUpRight = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgArrowUp = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgArrowWallRight = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgAws = (props: IconProps) => {
const { className, size = 24, ...rest } = props;

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgBarChart = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,4 @@
import { IconProps } from "@/icons";
import type { IconProps } from "@opal/types";
const SvgBell = ({ size, ...props }: IconProps) => (
<svg

View File

@@ -1,4 +1,3 @@
import * as React from "react";
import type { SVGProps } from "react";
const SvgBracketCurly = (props: SVGProps<SVGSVGElement>) => (

Some files were not shown because too many files have changed in this diff.