Compare commits

..

4 Commits

Author SHA1 Message Date
Evan Lohn
d8b672bb8e fix: eager load chat session persona 2026-03-23 16:17:10 -07:00
Evan Lohn
f3e38a7ef7 refactor: filter fields 2026-03-23 16:17:09 -07:00
Evan Lohn
a4c9926eb1 pr comments 2026-03-23 16:16:02 -07:00
Evan Lohn
8c63831fff chore: use efficient persona id query path 2026-03-23 14:01:11 -07:00
148 changed files with 1380 additions and 4471 deletions

View File

@@ -44,7 +44,7 @@ jobs:
fetch-tags: true
- name: Setup uv
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
with:
version: "0.9.9"
enable-cache: false
@@ -165,7 +165,7 @@ jobs:
fetch-depth: 0
- name: Setup uv
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
with:
version: "0.9.9"
# NOTE: This isn't caching much and zizmor suggests this could be poisoned, so disable.
@@ -307,7 +307,7 @@ jobs:
xdg-utils
- name: setup node
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v6.3.0
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v6.2.0
with:
node-version: 24
package-manager-cache: false

View File

@@ -114,7 +114,7 @@ jobs:
ref: main
- name: Install the latest version of uv
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"

View File

@@ -50,7 +50,7 @@ jobs:
persist-credentials: false
- name: Setup node
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238
with:
node-version: 24
cache: "npm" # zizmor: ignore[cache-poisoning]

View File

@@ -28,7 +28,7 @@ jobs:
persist-credentials: false
- name: Setup node
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v4
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm" # zizmor: ignore[cache-poisoning] test-only workflow; no deploy artifacts

View File

@@ -272,7 +272,7 @@ jobs:
- name: Setup node
# zizmor: ignore[cache-poisoning] ephemeral runners; no release artifacts
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v4
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm" # zizmor: ignore[cache-poisoning]
@@ -471,7 +471,7 @@ jobs:
- name: Install the latest version of uv
if: always()
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"
@@ -614,7 +614,7 @@ jobs:
- name: Setup node
# zizmor: ignore[cache-poisoning] ephemeral runners; no release artifacts
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v4
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm" # zizmor: ignore[cache-poisoning]

View File

@@ -73,7 +73,7 @@ jobs:
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f
- name: Build and load
uses: docker/bake-action@82490499d2e5613fcead7e128237ef0b0ea210f7 # ratchet:docker/bake-action@v7.0.0
uses: docker/bake-action@5be5f02ff8819ecd3092ea6b2e6261c31774f2b4 # ratchet:docker/bake-action@v6
env:
TAG: model-server-${{ github.run_id }}
with:

View File

@@ -30,7 +30,7 @@ jobs:
- name: Setup Terraform
uses: hashicorp/setup-terraform@5e8dbf3c6d9deaf4193ca7a8fb23f2ac83bb6c85 # ratchet:hashicorp/setup-terraform@v4.0.0
- name: Setup node
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v6
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v6
with: # zizmor: ignore[cache-poisoning]
node-version: 22
cache: "npm"

View File

@@ -22,7 +22,7 @@ jobs:
persist-credentials: false
- name: Setup node
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v4
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm"

View File

@@ -26,7 +26,7 @@ jobs:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
- uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"

View File

@@ -26,7 +26,7 @@ jobs:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
- uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"

View File

@@ -32,7 +32,7 @@ jobs:
persist-credentials: false
- name: Setup node
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # ratchet:actions/setup-node@v4
uses: actions/setup-node@6044e13b5dc448c55e2357c09f80417699197238 # ratchet:actions/setup-node@v4
with:
node-version: 22
cache: "npm"

View File

@@ -24,7 +24,7 @@ jobs:
persist-credentials: false
- name: Install the latest version of uv
uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # ratchet:astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@5a095e7a2014a4212f075830d4f7277575a9d098 # ratchet:astral-sh/setup-uv@v7
with:
enable-cache: false
version: "0.9.9"

View File

@@ -1,26 +0,0 @@
"""rename persona is_visible to is_listed and featured to is_featured
Revision ID: b728689f45b1
Revises: 689433b0d8de
Create Date: 2026-03-23 12:36:26.607305
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "b728689f45b1"
down_revision = "689433b0d8de"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.alter_column("persona", "is_visible", new_column_name="is_listed")
op.alter_column("persona", "featured", new_column_name="is_featured")
def downgrade() -> None:
op.alter_column("persona", "is_listed", new_column_name="is_visible")
op.alter_column("persona", "is_featured", new_column_name="featured")

View File

@@ -36,56 +36,6 @@ TABLES_WITH_USER_ID = [
]
def _dedupe_null_notifications(connection: sa.Connection) -> None:
# Multiple NULL-owned notifications can exist because the unique index treats
# NULL user_id values as distinct. Before migrating them to the anonymous
# user, collapse duplicates and remove rows that would conflict with an
# already-existing anonymous notification.
result = connection.execute(
sa.text(
"""
WITH ranked_null_notifications AS (
SELECT
id,
ROW_NUMBER() OVER (
PARTITION BY notif_type, COALESCE(additional_data, '{}'::jsonb)
ORDER BY first_shown DESC, last_shown DESC, id DESC
) AS row_num
FROM notification
WHERE user_id IS NULL
)
DELETE FROM notification
WHERE id IN (
SELECT id
FROM ranked_null_notifications
WHERE row_num > 1
)
"""
)
)
if result.rowcount > 0:
print(f"Deleted {result.rowcount} duplicate NULL-owned notifications")
result = connection.execute(
sa.text(
"""
DELETE FROM notification AS null_owned
USING notification AS anonymous_owned
WHERE null_owned.user_id IS NULL
AND anonymous_owned.user_id = :user_id
AND null_owned.notif_type = anonymous_owned.notif_type
AND COALESCE(null_owned.additional_data, '{}'::jsonb) =
COALESCE(anonymous_owned.additional_data, '{}'::jsonb)
"""
),
{"user_id": ANONYMOUS_USER_UUID},
)
if result.rowcount > 0:
print(
f"Deleted {result.rowcount} NULL-owned notifications that conflict with existing anonymous-owned notifications"
)
def upgrade() -> None:
"""
Create the anonymous user for anonymous access feature.
@@ -115,12 +65,7 @@ def upgrade() -> None:
# Migrate any remaining user_id=NULL records to anonymous user
for table in TABLES_WITH_USER_ID:
# Dedup notifications outside the savepoint so deletions persist
# even if the subsequent UPDATE rolls back
if table == "notification":
_dedupe_null_notifications(connection)
with connection.begin_nested():
try:
# Exclude public credential (id=0) which must remain user_id=NULL
# Exclude builtin tools (in_code_tool_id IS NOT NULL) which must remain user_id=NULL
# Exclude builtin personas (builtin_persona=True) which must remain user_id=NULL
@@ -135,7 +80,6 @@ def upgrade() -> None:
condition = "user_id IS NULL AND is_public = false"
else:
condition = "user_id IS NULL"
result = connection.execute(
sa.text(
f"""
@@ -148,19 +92,19 @@ def upgrade() -> None:
)
if result.rowcount > 0:
print(f"Updated {result.rowcount} rows in {table} to anonymous user")
except Exception as e:
print(f"Skipping {table}: {e}")
def downgrade() -> None:
"""
Set anonymous user's records back to NULL and delete the anonymous user.
Note: Duplicate NULL-owned notifications removed during upgrade are not restored.
"""
connection = op.get_bind()
# Set records back to NULL
for table in TABLES_WITH_USER_ID:
with connection.begin_nested():
try:
connection.execute(
sa.text(
f"""
@@ -171,6 +115,8 @@ def downgrade() -> None:
),
{"user_id": ANONYMOUS_USER_UUID},
)
except Exception:
pass
# Delete the anonymous user
connection.execute(

View File

@@ -157,11 +157,7 @@ def fetch_logo_helper(db_session: Session) -> Response: # noqa: ARG001
detail="No logo file found",
)
else:
return Response(
content=onyx_file.data,
media_type=onyx_file.mime_type,
headers={"Cache-Control": "no-cache"},
)
return Response(content=onyx_file.data, media_type=onyx_file.mime_type)
def fetch_logotype_helper(db_session: Session) -> Response: # noqa: ARG001

View File

@@ -178,7 +178,7 @@ def _seed_personas(db_session: Session, personas: list[PersonaUpsertRequest]) ->
system_prompt=persona.system_prompt,
task_prompt=persona.task_prompt,
datetime_aware=persona.datetime_aware,
is_featured=persona.is_featured,
featured=persona.featured,
commit=False,
)
db_session.commit()

View File

@@ -80,45 +80,15 @@ def capture_and_sync_with_alternate_posthog(
logger.error(f"Error identifying cloud posthog user: {e}")
def alias_user(distinct_id: str, anonymous_id: str) -> None:
"""Link an anonymous distinct_id to an identified user, merging person profiles.
No-ops when the IDs match (e.g. returning users whose PostHog cookie
already contains their identified user ID).
"""
if not posthog or anonymous_id == distinct_id:
return
try:
posthog.alias(previous_id=anonymous_id, distinct_id=distinct_id)
posthog.flush()
except Exception as e:
logger.error(f"Error aliasing PostHog user: {e}")
def get_anon_id_from_request(request: Any) -> str | None:
"""Extract the anonymous distinct_id from the app PostHog cookie on a request."""
if not POSTHOG_API_KEY:
return None
cookie_name = f"ph_{POSTHOG_API_KEY}_posthog"
if (cookie_value := request.cookies.get(cookie_name)) and (
parsed := parse_posthog_cookie(cookie_value)
):
return parsed.get("distinct_id")
return None
def get_marketing_posthog_cookie_name() -> str | None:
if not MARKETING_POSTHOG_API_KEY:
return None
return f"onyx_custom_ph_{MARKETING_POSTHOG_API_KEY}_posthog"
def parse_posthog_cookie(cookie_value: str) -> dict[str, Any] | None:
def parse_marketing_cookie(cookie_value: str) -> dict[str, Any] | None:
"""
Parse a URL-encoded JSON PostHog cookie
Parse the URL-encoded JSON marketing cookie.
Expected format (URL-encoded):
{"distinct_id":"...", "featureFlags":{"landing_page_variant":"..."}, ...}
@@ -132,7 +102,7 @@ def parse_posthog_cookie(cookie_value: str) -> dict[str, Any] | None:
cookie_data = json.loads(decoded_cookie)
distinct_id = cookie_data.get("distinct_id")
if not distinct_id or not isinstance(distinct_id, str):
if not distinct_id:
return None
return cookie_data

View File

@@ -135,8 +135,6 @@ from onyx.redis.redis_pool import retrieve_ws_token_data
from onyx.server.settings.store import load_settings
from onyx.server.utils import BasicAuthenticationError
from onyx.utils.logger import setup_logger
from onyx.utils.telemetry import mt_cloud_alias
from onyx.utils.telemetry import mt_cloud_get_anon_id
from onyx.utils.telemetry import mt_cloud_identify
from onyx.utils.telemetry import mt_cloud_telemetry
from onyx.utils.telemetry import optional_telemetry
@@ -253,12 +251,18 @@ def verify_email_is_invited(email: str) -> None:
whitelist = get_invited_users()
if not email:
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email must be specified")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": "Email must be specified"},
)
try:
email_info = validate_email(email, check_deliverability=False)
except EmailUndeliverableError:
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email is not valid")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": "Email is not valid"},
)
for email_whitelist in whitelist:
try:
@@ -275,9 +279,12 @@ def verify_email_is_invited(email: str) -> None:
if email_info.normalized.lower() == email_info_whitelist.normalized.lower():
return
raise OnyxError(
OnyxErrorCode.UNAUTHORIZED,
"This workspace is invite-only. Please ask your admin to invite you.",
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"code": REGISTER_INVITE_ONLY_CODE,
"reason": "This workspace is invite-only. Please ask your admin to invite you.",
},
)
@@ -287,47 +294,48 @@ def verify_email_in_whitelist(email: str, tenant_id: str) -> None:
verify_email_is_invited(email)
def verify_email_domain(email: str, *, is_registration: bool = False) -> None:
def verify_email_domain(email: str) -> None:
if email.count("@") != 1:
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email is not valid")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email is not valid",
)
local_part, domain = email.split("@")
domain = domain.lower()
local_part = local_part.lower()
if AUTH_TYPE == AuthType.CLOUD:
# Normalize googlemail.com to gmail.com (they deliver to the same inbox)
if domain == "googlemail.com":
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Please use @gmail.com instead of @googlemail.com.",
)
# Only block dotted Gmail on new signups — existing users must still be
# able to sign in with the address they originally registered with.
if is_registration and domain == "gmail.com" and "." in local_part:
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Gmail addresses with '.' are not allowed. Please use your base email address.",
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": "Please use @gmail.com instead of @googlemail.com."},
)
if "+" in local_part and domain != "onyx.app":
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Email addresses with '+' are not allowed. Please use your base email address.",
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"reason": "Email addresses with '+' are not allowed. Please use your base email address."
},
)
# Check if email uses a disposable/temporary domain
if is_disposable_email(email):
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Disposable email addresses are not allowed. Please use a permanent email address.",
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"reason": "Disposable email addresses are not allowed. Please use a permanent email address."
},
)
# Check domain whitelist if configured
if VALID_EMAIL_DOMAINS:
if domain not in VALID_EMAIL_DOMAINS:
raise OnyxError(OnyxErrorCode.INVALID_INPUT, "Email domain is not valid")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email domain is not valid",
)
def enforce_seat_limit(db_session: Session, seats_needed: int = 1) -> None:
@@ -343,7 +351,7 @@ def enforce_seat_limit(db_session: Session, seats_needed: int = 1) -> None:
)(db_session, seats_needed=seats_needed)
if result is not None and not result.available:
raise OnyxError(OnyxErrorCode.SEAT_LIMIT_EXCEEDED, result.error_message)
raise HTTPException(status_code=402, detail=result.error_message)
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
@@ -396,7 +404,10 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
captcha_token or "", expected_action="signup"
)
except CaptchaVerificationError as e:
raise OnyxError(OnyxErrorCode.INVALID_INPUT, str(e))
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"reason": str(e)},
)
# We verify the password here to make sure it's valid before we proceed
await self.validate_password(
@@ -406,10 +417,13 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
# Check for disposable emails BEFORE provisioning tenant
# This prevents creating tenants for throwaway email addresses
try:
verify_email_domain(user_create.email, is_registration=True)
except OnyxError as e:
verify_email_domain(user_create.email)
except HTTPException as e:
# Log blocked disposable email attempts
if "Disposable email" in e.detail:
if (
e.status_code == status.HTTP_400_BAD_REQUEST
and "Disposable email" in str(e.detail)
):
domain = (
user_create.email.split("@")[-1]
if "@" in user_create.email
@@ -553,9 +567,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
result = await db_session.execute(
select(Persona.id)
.where(
Persona.is_featured.is_(True),
Persona.featured.is_(True),
Persona.is_public.is_(True),
Persona.is_listed.is_(True),
Persona.is_visible.is_(True),
Persona.deleted.is_(False),
)
.order_by(
@@ -683,8 +697,6 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
raise exceptions.UserNotExists()
except exceptions.UserNotExists:
verify_email_domain(account_email, is_registration=True)
# Check seat availability before creating (single-tenant only)
with get_session_with_current_tenant() as sync_db:
enforce_seat_limit(sync_db)
@@ -783,12 +795,6 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
logger.exception("Error deleting anonymous user cookie")
tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get()
# Link the anonymous PostHog session to the identified user so that
# pre-login session recordings and events merge into one person profile.
if anon_id := mt_cloud_get_anon_id(request):
mt_cloud_alias(distinct_id=str(user.id), anonymous_id=anon_id)
mt_cloud_identify(
distinct_id=str(user.id),
properties={"email": user.email, "tenant_id": tenant_id},
@@ -812,11 +818,6 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
user_count = await get_user_count()
logger.debug(f"Current tenant user count: {user_count}")
# Link the anonymous PostHog session to the identified user so
# that pre-signup session recordings merge into one person profile.
if anon_id := mt_cloud_get_anon_id(request):
mt_cloud_alias(distinct_id=str(user.id), anonymous_id=anon_id)
# Ensure a PostHog person profile exists for this user.
mt_cloud_identify(
distinct_id=str(user.id),
@@ -845,9 +846,9 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
attribute="get_marketing_posthog_cookie_name",
noop_return_value=None,
)
parse_posthog_cookie = fetch_ee_implementation_or_noop(
parse_marketing_cookie = fetch_ee_implementation_or_noop(
module="onyx.utils.posthog_client",
attribute="parse_posthog_cookie",
attribute="parse_marketing_cookie",
noop_return_value=None,
)
capture_and_sync_with_alternate_posthog = fetch_ee_implementation_or_noop(
@@ -861,7 +862,7 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
and user_count is not None
and (marketing_cookie_name := get_marketing_posthog_cookie_name())
and (marketing_cookie_value := request.cookies.get(marketing_cookie_name))
and (parsed_cookie := parse_posthog_cookie(marketing_cookie_value))
and (parsed_cookie := parse_marketing_cookie(marketing_cookie_value))
):
marketing_anonymous_id = parsed_cookie["distinct_id"]

View File

@@ -474,11 +474,18 @@ def handle_stream_message_objects(
db_session=db_session,
)
yield CreateChatSessionID(chat_session_id=chat_session.id)
chat_session = get_chat_session_by_id(
chat_session_id=chat_session.id,
user_id=user_id,
db_session=db_session,
eager_load_persona=True,
)
else:
chat_session = get_chat_session_by_id(
chat_session_id=new_msg_req.chat_session_id,
user_id=user_id,
db_session=db_session,
eager_load_persona=True,
)
persona = chat_session.persona

View File

@@ -787,10 +787,6 @@ MINI_CHUNK_SIZE = 150
# This is the number of regular chunks per large chunk
LARGE_CHUNK_RATIO = 4
# The maximum number of chunks that can be held for 1 document processing batch
# The purpose of this is to set an upper bound on memory usage
MAX_CHUNKS_PER_DOC_BATCH = int(os.environ.get("MAX_CHUNKS_PER_DOC_BATCH") or 1000)
# Include the document level metadata in each chunk. If the metadata is too long, then it is thrown out
# We don't want the metadata to overwhelm the actual contents of the chunk
SKIP_METADATA_IN_CHUNK = os.environ.get("SKIP_METADATA_IN_CHUNK", "").lower() == "true"

View File

@@ -88,9 +88,8 @@ WEB_CONNECTOR_MAX_SCROLL_ATTEMPTS = 20
IFRAME_TEXT_LENGTH_THRESHOLD = 700
# Message indicating JavaScript is disabled, which often appears when scraping fails
JAVASCRIPT_DISABLED_MESSAGE = "You have JavaScript disabled in your browser"
# Grace period after page navigation to allow bot-detection challenges
# and SPA content rendering to complete
PAGE_RENDER_TIMEOUT_MS = 5000
# Grace period after page navigation to allow bot-detection challenges to complete
BOT_DETECTION_GRACE_PERIOD_MS = 5000
# Define common headers that mimic a real browser
DEFAULT_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
@@ -548,15 +547,7 @@ class WebConnector(LoadConnector):
)
# Give the page a moment to start rendering after navigation commits.
# Allows CloudFlare and other bot-detection challenges to complete.
page.wait_for_timeout(PAGE_RENDER_TIMEOUT_MS)
# Wait for network activity to settle so SPAs that fetch content
# asynchronously after the initial JS bundle have time to render.
try:
# A bit of extra time to account for long-polling, websockets, etc.
page.wait_for_load_state("networkidle", timeout=PAGE_RENDER_TIMEOUT_MS)
except TimeoutError:
pass
page.wait_for_timeout(BOT_DETECTION_GRACE_PERIOD_MS)
last_modified = (
page_response.header_value("Last-Modified") if page_response else None
@@ -585,7 +576,7 @@ class WebConnector(LoadConnector):
# (e.g., CloudFlare protection keeps making requests)
try:
page.wait_for_load_state(
"networkidle", timeout=PAGE_RENDER_TIMEOUT_MS
"networkidle", timeout=BOT_DETECTION_GRACE_PERIOD_MS
)
except TimeoutError:
# If networkidle times out, just give it a moment for content to render

View File

@@ -28,6 +28,7 @@ from onyx.db.models import ChatMessage
from onyx.db.models import ChatMessage__SearchDoc
from onyx.db.models import ChatSession
from onyx.db.models import ChatSessionSharedStatus
from onyx.db.models import Persona
from onyx.db.models import SearchDoc as DBSearchDoc
from onyx.db.models import ToolCall
from onyx.db.models import User
@@ -53,9 +54,17 @@ def get_chat_session_by_id(
db_session: Session,
include_deleted: bool = False,
is_shared: bool = False,
eager_load_persona: bool = False,
) -> ChatSession:
stmt = select(ChatSession).where(ChatSession.id == chat_session_id)
if eager_load_persona:
stmt = stmt.options(
selectinload(ChatSession.persona).selectinload(Persona.tools),
selectinload(ChatSession.persona).selectinload(Persona.user_files),
selectinload(ChatSession.project),
)
if is_shared:
stmt = stmt.where(ChatSession.shared_status == ChatSessionSharedStatus.PUBLIC)
else:

View File

@@ -583,67 +583,6 @@ def get_latest_index_attempt_for_cc_pair_id(
return db_session.execute(stmt).scalar_one_or_none()
def get_latest_successful_index_attempt_for_cc_pair_id(
db_session: Session,
connector_credential_pair_id: int,
secondary_index: bool = False,
) -> IndexAttempt | None:
"""Returns the most recent successful index attempt for the given cc pair,
filtered to the current (or future) search settings.
Uses MAX(id) semantics to match get_latest_index_attempts_by_status."""
status = IndexModelStatus.FUTURE if secondary_index else IndexModelStatus.PRESENT
stmt = (
select(IndexAttempt)
.where(
IndexAttempt.connector_credential_pair_id == connector_credential_pair_id,
IndexAttempt.status.in_(
[IndexingStatus.SUCCESS, IndexingStatus.COMPLETED_WITH_ERRORS]
),
)
.join(SearchSettings)
.where(SearchSettings.status == status)
.order_by(desc(IndexAttempt.id))
.limit(1)
)
return db_session.execute(stmt).scalar_one_or_none()
def get_latest_successful_index_attempts_parallel(
secondary_index: bool = False,
) -> Sequence[IndexAttempt]:
"""Batch version: returns the latest successful index attempt per cc pair.
Covers both SUCCESS and COMPLETED_WITH_ERRORS (matching is_successful())."""
model_status = (
IndexModelStatus.FUTURE if secondary_index else IndexModelStatus.PRESENT
)
with get_session_with_current_tenant() as db_session:
latest_ids = (
select(
IndexAttempt.connector_credential_pair_id,
func.max(IndexAttempt.id).label("max_id"),
)
.join(SearchSettings, IndexAttempt.search_settings_id == SearchSettings.id)
.where(
SearchSettings.status == model_status,
IndexAttempt.status.in_(
[IndexingStatus.SUCCESS, IndexingStatus.COMPLETED_WITH_ERRORS]
),
)
.group_by(IndexAttempt.connector_credential_pair_id)
.subquery()
)
stmt = select(IndexAttempt).join(
latest_ids,
(
IndexAttempt.connector_credential_pair_id
== latest_ids.c.connector_credential_pair_id
)
& (IndexAttempt.id == latest_ids.c.max_id),
)
return db_session.execute(stmt).scalars().all()
def count_index_attempts_for_cc_pair(
db_session: Session,
cc_pair_id: int,

View File

@@ -3467,9 +3467,9 @@ class Persona(Base):
builtin_persona: Mapped[bool] = mapped_column(Boolean, default=False)
# Featured personas are highlighted in the UI
is_featured: Mapped[bool] = mapped_column(Boolean, default=False)
# controls whether the persona is listed in user-facing agent lists
is_listed: Mapped[bool] = mapped_column(Boolean, default=True)
featured: Mapped[bool] = mapped_column(Boolean, default=False)
# controls whether the persona is available to be selected by users
is_visible: Mapped[bool] = mapped_column(Boolean, default=True)
# controls the ordering of personas in the UI
# higher priority personas are displayed first, ties are resolved by the ID,
# where lower value IDs (e.g. created earlier) are displayed first

View File

@@ -126,7 +126,7 @@ def _add_user_filters(
else:
# Group the public persona conditions
public_condition = (Persona.is_public == True) & ( # noqa: E712
Persona.is_listed == True # noqa: E712
Persona.is_visible == True # noqa: E712
)
where_clause |= public_condition
@@ -260,7 +260,7 @@ def create_update_persona(
try:
# Featured persona validation
if create_persona_request.is_featured:
if create_persona_request.featured:
# Curators can edit featured personas, but not make them
# TODO this will be reworked soon with RBAC permissions feature
if user.role == UserRole.CURATOR or user.role == UserRole.GLOBAL_CURATOR:
@@ -300,7 +300,7 @@ def create_update_persona(
remove_image=create_persona_request.remove_image,
search_start_date=create_persona_request.search_start_date,
label_ids=create_persona_request.label_ids,
is_featured=create_persona_request.is_featured,
featured=create_persona_request.featured,
user_file_ids=converted_user_file_ids,
commit=False,
hierarchy_node_ids=create_persona_request.hierarchy_node_ids,
@@ -910,11 +910,11 @@ def upsert_persona(
uploaded_image_id: str | None = None,
icon_name: str | None = None,
display_priority: int | None = None,
is_listed: bool = True,
is_visible: bool = True,
remove_image: bool | None = None,
search_start_date: datetime | None = None,
builtin_persona: bool = False,
is_featured: bool | None = None,
featured: bool | None = None,
label_ids: list[int] | None = None,
user_file_ids: list[UUID] | None = None,
hierarchy_node_ids: list[int] | None = None,
@@ -1037,13 +1037,13 @@ def upsert_persona(
if remove_image or uploaded_image_id:
existing_persona.uploaded_image_id = uploaded_image_id
existing_persona.icon_name = icon_name
existing_persona.is_listed = is_listed
existing_persona.is_visible = is_visible
existing_persona.search_start_date = search_start_date
if label_ids is not None:
existing_persona.labels.clear()
existing_persona.labels = labels or []
existing_persona.is_featured = (
is_featured if is_featured is not None else existing_persona.is_featured
existing_persona.featured = (
featured if featured is not None else existing_persona.featured
)
# Update embedded prompt fields if provided
if system_prompt is not None:
@@ -1109,9 +1109,9 @@ def upsert_persona(
uploaded_image_id=uploaded_image_id,
icon_name=icon_name,
display_priority=display_priority,
is_listed=is_listed,
is_visible=is_visible,
search_start_date=search_start_date,
is_featured=(is_featured if is_featured is not None else False),
featured=(featured if featured is not None else False),
user_files=user_files or [],
labels=labels or [],
hierarchy_nodes=hierarchy_nodes or [],
@@ -1158,7 +1158,7 @@ def delete_old_default_personas(
def update_persona_featured(
persona_id: int,
is_featured: bool,
featured: bool,
db_session: Session,
user: User,
) -> None:
@@ -1166,13 +1166,13 @@ def update_persona_featured(
db_session=db_session, persona_id=persona_id, user=user, get_editable=True
)
persona.is_featured = is_featured
persona.featured = featured
db_session.commit()
def update_persona_visibility(
persona_id: int,
is_listed: bool,
is_visible: bool,
db_session: Session,
user: User,
) -> None:
@@ -1180,7 +1180,7 @@ def update_persona_visibility(
db_session=db_session, persona_id=persona_id, user=user, get_editable=True
)
persona.is_listed = is_listed
persona.is_visible = is_visible
db_session.commit()

View File

@@ -75,7 +75,7 @@ def create_slack_channel_persona(
llm_model_version_override=None,
starter_messages=None,
is_public=True,
is_featured=False,
featured=False,
db_session=db_session,
commit=False,
)

View File

@@ -5,7 +5,6 @@ accidentally reaches the vector DB layer will fail loudly instead of timing
out against a nonexistent Vespa/OpenSearch instance.
"""
from collections.abc import Iterable
from typing import Any
from onyx.context.search.models import IndexFilters
@@ -67,7 +66,7 @@ class DisabledDocumentIndex(DocumentIndex):
# ------------------------------------------------------------------
def index(
self,
chunks: Iterable[DocMetadataAwareIndexChunk], # noqa: ARG002
chunks: list[DocMetadataAwareIndexChunk], # noqa: ARG002
index_batch_params: IndexBatchParams, # noqa: ARG002
) -> set[DocumentInsertionRecord]:
raise RuntimeError(VECTOR_DB_DISABLED_ERROR)

View File

@@ -1,5 +1,4 @@
import abc
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime
from typing import Any
@@ -207,7 +206,7 @@ class Indexable(abc.ABC):
@abc.abstractmethod
def index(
self,
chunks: Iterable[DocMetadataAwareIndexChunk],
chunks: list[DocMetadataAwareIndexChunk],
index_batch_params: IndexBatchParams,
) -> set[DocumentInsertionRecord]:
"""
@@ -227,8 +226,8 @@ class Indexable(abc.ABC):
it is done automatically outside of this code.
Parameters:
- chunks: Document chunks with all of the information needed for
indexing to the document index.
- chunks: Document chunks with all of the information needed for indexing to the document
index.
- tenant_id: The tenant id of the user whose chunks are being indexed
- large_chunks_enabled: Whether large chunks are enabled

View File

@@ -1,5 +1,4 @@
import abc
from collections.abc import Iterable
from typing import Self
from pydantic import BaseModel
@@ -210,10 +209,10 @@ class Indexable(abc.ABC):
@abc.abstractmethod
def index(
self,
chunks: Iterable[DocMetadataAwareIndexChunk],
chunks: list[DocMetadataAwareIndexChunk],
indexing_metadata: IndexingMetadata,
) -> list[DocumentInsertionRecord]:
"""Indexes an iterable of document chunks into the document index.
"""Indexes a list of document chunks into the document index.
This is often a batch operation including chunks from multiple
documents.

View File

@@ -1,12 +1,11 @@
import json
from collections.abc import Iterable
from collections import defaultdict
from typing import Any
import httpx
from opensearchpy import NotFoundError
from onyx.access.models import DocumentAccess
from onyx.configs.app_configs import MAX_CHUNKS_PER_DOC_BATCH
from onyx.configs.app_configs import VERIFY_CREATE_OPENSEARCH_INDEX_ON_INIT_MT
from onyx.configs.chat_configs import NUM_RETURNED_HITS
from onyx.configs.chat_configs import TITLE_CONTENT_RATIO
@@ -351,7 +350,7 @@ class OpenSearchOldDocumentIndex(OldDocumentIndex):
def index(
self,
chunks: Iterable[DocMetadataAwareIndexChunk],
chunks: list[DocMetadataAwareIndexChunk],
index_batch_params: IndexBatchParams,
) -> set[OldDocumentInsertionRecord]:
"""
@@ -647,10 +646,10 @@ class OpenSearchDocumentIndex(DocumentIndex):
def index(
self,
chunks: Iterable[DocMetadataAwareIndexChunk],
indexing_metadata: IndexingMetadata,
chunks: list[DocMetadataAwareIndexChunk],
indexing_metadata: IndexingMetadata, # noqa: ARG002
) -> list[DocumentInsertionRecord]:
"""Indexes an iterable of document chunks into the document index.
"""Indexes a list of document chunks into the document index.
Groups chunks by document ID and for each document, deletes existing
chunks and indexes the new chunks in bulk.
@@ -673,34 +672,29 @@ class OpenSearchDocumentIndex(DocumentIndex):
document is newly indexed or had already existed and was just
updated.
"""
total_chunks = sum(
cc.new_chunk_cnt
for cc in indexing_metadata.doc_id_to_chunk_cnt_diff.values()
# Group chunks by document ID.
doc_id_to_chunks: dict[str, list[DocMetadataAwareIndexChunk]] = defaultdict(
list
)
for chunk in chunks:
doc_id_to_chunks[chunk.source_document.id].append(chunk)
logger.debug(
f"[OpenSearchDocumentIndex] Indexing {total_chunks} chunks from {len(indexing_metadata.doc_id_to_chunk_cnt_diff)} "
f"[OpenSearchDocumentIndex] Indexing {len(chunks)} chunks from {len(doc_id_to_chunks)} "
f"documents for index {self._index_name}."
)
document_indexing_results: list[DocumentInsertionRecord] = []
deleted_doc_ids: set[str] = set()
# Buffer chunks per document as they arrive from the iterable.
# When the document ID changes flush the buffered chunks.
current_doc_id: str | None = None
current_chunks: list[DocMetadataAwareIndexChunk] = []
def _flush_chunks(doc_chunks: list[DocMetadataAwareIndexChunk]) -> None:
assert len(doc_chunks) > 0, "doc_chunks is empty"
# Try to index per-document.
for _, chunks in doc_id_to_chunks.items():
# Create a batch of OpenSearch-formatted chunks for bulk insertion.
# Since we are doing this in batches, an error occurring midway
# can result in a state where chunks are deleted and not all the
# new chunks have been indexed.
# Do this before deleting existing chunks to reduce the amount of
# time the document index has no content for a given document, and
# to reduce the chance of entering a state where we delete chunks,
# then some error happens, and never successfully index new chunks.
chunk_batch: list[DocumentChunk] = [
_convert_onyx_chunk_to_opensearch_document(chunk)
for chunk in doc_chunks
_convert_onyx_chunk_to_opensearch_document(chunk) for chunk in chunks
]
onyx_document: Document = doc_chunks[0].source_document
onyx_document: Document = chunks[0].source_document
# First delete the doc's chunks from the index. This is so that
# there are no dangling chunks in the index, in the event that the
# new document's content contains fewer chunks than the previous
@@ -709,43 +703,22 @@ class OpenSearchDocumentIndex(DocumentIndex):
# if the chunk count has actually decreased. This assumes that
# overlapping chunks are perfectly overwritten. If we can't
# guarantee that then we need the code as-is.
if onyx_document.id not in deleted_doc_ids:
num_chunks_deleted = self.delete(
onyx_document.id, onyx_document.chunk_count
)
deleted_doc_ids.add(onyx_document.id)
# If we see that chunks were deleted we assume the doc already
# existed. We record the result before bulk_index_documents
# runs. If indexing raises, this entire result list is discarded
# by the caller's retry logic, so early recording is safe.
document_indexing_results.append(
DocumentInsertionRecord(
document_id=onyx_document.id,
already_existed=num_chunks_deleted > 0,
)
)
num_chunks_deleted = self.delete(
onyx_document.id, onyx_document.chunk_count
)
# If we see that chunks were deleted we assume the doc already
# existed.
document_insertion_record = DocumentInsertionRecord(
document_id=onyx_document.id,
already_existed=num_chunks_deleted > 0,
)
# Now index. This will raise if a chunk of the same ID exists, which
# we do not expect because we should have deleted all chunks.
self._client.bulk_index_documents(
documents=chunk_batch,
tenant_state=self._tenant_state,
)
for chunk in chunks:
doc_id = chunk.source_document.id
if doc_id != current_doc_id:
if current_chunks:
_flush_chunks(current_chunks)
current_doc_id = doc_id
current_chunks = [chunk]
elif len(current_chunks) >= MAX_CHUNKS_PER_DOC_BATCH:
_flush_chunks(current_chunks)
current_chunks = [chunk]
else:
current_chunks.append(chunk)
if current_chunks:
_flush_chunks(current_chunks)
document_indexing_results.append(document_insertion_record)
return document_indexing_results

View File

@@ -6,7 +6,6 @@ import re
import time
import urllib
import zipfile
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
@@ -462,7 +461,7 @@ class VespaIndex(DocumentIndex):
def index(
self,
chunks: Iterable[DocMetadataAwareIndexChunk],
chunks: list[DocMetadataAwareIndexChunk],
index_batch_params: IndexBatchParams,
) -> set[OldDocumentInsertionRecord]:
"""

View File

@@ -1,8 +1,6 @@
import concurrent.futures
import logging
import random
from collections.abc import Generator
from collections.abc import Iterable
from typing import Any
from uuid import UUID
@@ -10,7 +8,6 @@ import httpx
from pydantic import BaseModel
from retry import retry
from onyx.configs.app_configs import MAX_CHUNKS_PER_DOC_BATCH
from onyx.configs.app_configs import RECENCY_BIAS_MULTIPLIER
from onyx.configs.app_configs import RERANK_COUNT
from onyx.configs.chat_configs import DOC_TIME_DECAY
@@ -321,7 +318,7 @@ class VespaDocumentIndex(DocumentIndex):
def index(
self,
chunks: Iterable[DocMetadataAwareIndexChunk],
chunks: list[DocMetadataAwareIndexChunk],
indexing_metadata: IndexingMetadata,
) -> list[DocumentInsertionRecord]:
doc_id_to_chunk_cnt_diff = indexing_metadata.doc_id_to_chunk_cnt_diff
@@ -341,31 +338,22 @@ class VespaDocumentIndex(DocumentIndex):
# Vespa has restrictions on valid characters, yet document IDs come from
# external w.r.t. this class. We need to sanitize them.
#
# Instead of materializing all cleaned chunks upfront, we stream them
# through a generator that cleans IDs and builds the original-ID mapping
# incrementally as chunks flow into Vespa.
def _clean_and_track(
chunks_iter: Iterable[DocMetadataAwareIndexChunk],
id_map: dict[str, str],
seen_ids: set[str],
) -> Generator[DocMetadataAwareIndexChunk, None, None]:
"""Cleans chunk IDs and builds the original-ID mapping
incrementally as chunks flow through, avoiding a separate
materialization pass."""
for chunk in chunks_iter:
original_id = chunk.source_document.id
cleaned = clean_chunk_id_copy(chunk)
cleaned_id = cleaned.source_document.id
# Needed so the final DocumentInsertionRecord returned can have
# the original document ID. cleaned_chunks might not contain IDs
# exactly as callers supplied them.
id_map[cleaned_id] = original_id
seen_ids.add(cleaned_id)
yield cleaned
cleaned_chunks: list[DocMetadataAwareIndexChunk] = [
clean_chunk_id_copy(chunk) for chunk in chunks
]
assert len(cleaned_chunks) == len(
chunks
), "Bug: Cleaned chunks and input chunks have different lengths."
new_document_id_to_original_document_id: dict[str, str] = {}
all_cleaned_doc_ids: set[str] = set()
# Needed so the final DocumentInsertionRecord returned can have the
# original document ID. cleaned_chunks might not contain IDs exactly as
# callers supplied them.
new_document_id_to_original_document_id: dict[str, str] = dict()
for i, cleaned_chunk in enumerate(cleaned_chunks):
old_chunk = chunks[i]
new_document_id_to_original_document_id[
cleaned_chunk.source_document.id
] = old_chunk.source_document.id
existing_docs: set[str] = set()
@@ -421,16 +409,8 @@ class VespaDocumentIndex(DocumentIndex):
executor=executor,
)
# Insert new Vespa documents, streaming through the cleaning
# pipeline so chunks are never fully materialized.
cleaned_chunks = _clean_and_track(
chunks,
new_document_id_to_original_document_id,
all_cleaned_doc_ids,
)
for chunk_batch in batch_generator(
cleaned_chunks, min(BATCH_SIZE, MAX_CHUNKS_PER_DOC_BATCH)
):
# Insert new Vespa documents.
for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
batch_index_vespa_chunks(
chunks=chunk_batch,
index_name=self._index_name,
@@ -439,6 +419,10 @@ class VespaDocumentIndex(DocumentIndex):
executor=executor,
)
all_cleaned_doc_ids: set[str] = {
chunk.source_document.id for chunk in cleaned_chunks
}
return [
DocumentInsertionRecord(
document_id=new_document_id_to_original_document_id[cleaned_doc_id],

View File

@@ -1,5 +1,3 @@
from __future__ import annotations
import contextlib
from collections.abc import Generator
@@ -21,8 +19,7 @@ from onyx.db.document import update_docs_updated_at__no_commit
from onyx.db.document_set import fetch_document_sets_for_documents
from onyx.indexing.indexing_pipeline import DocumentBatchPrepareContext
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
from onyx.indexing.models import ChunkEnrichmentContext
from onyx.indexing.models import DocAwareChunk
from onyx.indexing.models import BuildMetadataAwareChunksResult
from onyx.indexing.models import DocMetadataAwareIndexChunk
from onyx.indexing.models import IndexChunk
from onyx.indexing.models import UpdatableChunkData
@@ -88,21 +85,14 @@ class DocumentIndexingBatchAdapter:
) as transaction:
yield transaction
def prepare_enrichment(
def build_metadata_aware_chunks(
self,
context: DocumentBatchPrepareContext,
chunks_with_embeddings: list[IndexChunk],
chunk_content_scores: list[float],
tenant_id: str,
chunks: list[DocAwareChunk],
) -> DocumentChunkEnricher:
"""Do all DB lookups once and return a per-chunk enricher."""
updatable_ids = [doc.id for doc in context.updatable_docs]
doc_id_to_new_chunk_cnt: dict[str, int] = {
doc_id: 0 for doc_id in updatable_ids
}
for chunk in chunks:
if chunk.source_document.id in doc_id_to_new_chunk_cnt:
doc_id_to_new_chunk_cnt[chunk.source_document.id] += 1
context: DocumentBatchPrepareContext,
) -> BuildMetadataAwareChunksResult:
"""Enrich chunks with access, document sets, boosts, token counts, and hierarchy."""
no_access = DocumentAccess.build(
user_emails=[],
@@ -112,30 +102,67 @@ class DocumentIndexingBatchAdapter:
is_public=False,
)
return DocumentChunkEnricher(
doc_id_to_access_info=get_access_for_documents(
updatable_ids = [doc.id for doc in context.updatable_docs]
doc_id_to_access_info = get_access_for_documents(
document_ids=updatable_ids, db_session=self.db_session
)
doc_id_to_document_set = {
document_id: document_sets
for document_id, document_sets in fetch_document_sets_for_documents(
document_ids=updatable_ids, db_session=self.db_session
),
doc_id_to_document_set={
document_id: document_sets
for document_id, document_sets in fetch_document_sets_for_documents(
document_ids=updatable_ids, db_session=self.db_session
)
},
doc_id_to_ancestor_ids=self._get_ancestor_ids_for_documents(
context.updatable_docs, tenant_id
),
id_to_boost_map=context.id_to_boost_map,
doc_id_to_previous_chunk_cnt={
document_id: chunk_count
for document_id, chunk_count in fetch_chunk_counts_for_documents(
document_ids=updatable_ids,
db_session=self.db_session,
)
},
doc_id_to_new_chunk_cnt=dict(doc_id_to_new_chunk_cnt),
no_access=no_access,
tenant_id=tenant_id,
)
}
doc_id_to_previous_chunk_cnt: dict[str, int] = {
document_id: chunk_count
for document_id, chunk_count in fetch_chunk_counts_for_documents(
document_ids=updatable_ids,
db_session=self.db_session,
)
}
doc_id_to_new_chunk_cnt: dict[str, int] = {
doc_id: 0 for doc_id in updatable_ids
}
for chunk in chunks_with_embeddings:
if chunk.source_document.id in doc_id_to_new_chunk_cnt:
doc_id_to_new_chunk_cnt[chunk.source_document.id] += 1
# Get ancestor hierarchy node IDs for each document
doc_id_to_ancestor_ids = self._get_ancestor_ids_for_documents(
context.updatable_docs, tenant_id
)
access_aware_chunks = [
DocMetadataAwareIndexChunk.from_index_chunk(
index_chunk=chunk,
access=doc_id_to_access_info.get(chunk.source_document.id, no_access),
document_sets=set(
doc_id_to_document_set.get(chunk.source_document.id, [])
),
user_project=[],
personas=[],
boost=(
context.id_to_boost_map[chunk.source_document.id]
if chunk.source_document.id in context.id_to_boost_map
else DEFAULT_BOOST
),
tenant_id=tenant_id,
aggregated_chunk_boost_factor=chunk_content_scores[chunk_num],
ancestor_hierarchy_node_ids=doc_id_to_ancestor_ids[
chunk.source_document.id
],
)
for chunk_num, chunk in enumerate(chunks_with_embeddings)
]
return BuildMetadataAwareChunksResult(
chunks=access_aware_chunks,
doc_id_to_previous_chunk_cnt=doc_id_to_previous_chunk_cnt,
doc_id_to_new_chunk_cnt=doc_id_to_new_chunk_cnt,
user_file_id_to_raw_text={},
user_file_id_to_token_count={},
)
def _get_ancestor_ids_for_documents(
@@ -176,7 +203,7 @@ class DocumentIndexingBatchAdapter:
context: DocumentBatchPrepareContext,
updatable_chunk_data: list[UpdatableChunkData],
filtered_documents: list[Document],
enrichment: ChunkEnrichmentContext,
result: BuildMetadataAwareChunksResult,
) -> None:
"""Finalize DB updates, store plaintext, and mark docs as indexed."""
updatable_ids = [doc.id for doc in context.updatable_docs]
@@ -200,7 +227,7 @@ class DocumentIndexingBatchAdapter:
update_docs_chunk_count__no_commit(
document_ids=updatable_ids,
doc_id_to_chunk_count=enrichment.doc_id_to_new_chunk_cnt,
doc_id_to_chunk_count=result.doc_id_to_new_chunk_cnt,
db_session=self.db_session,
)
@@ -222,52 +249,3 @@ class DocumentIndexingBatchAdapter:
)
self.db_session.commit()
class DocumentChunkEnricher:
"""Pre-computed metadata for per-chunk enrichment of connector documents."""
def __init__(
self,
doc_id_to_access_info: dict[str, DocumentAccess],
doc_id_to_document_set: dict[str, list[str]],
doc_id_to_ancestor_ids: dict[str, list[int]],
id_to_boost_map: dict[str, int],
doc_id_to_previous_chunk_cnt: dict[str, int],
doc_id_to_new_chunk_cnt: dict[str, int],
no_access: DocumentAccess,
tenant_id: str,
) -> None:
self._doc_id_to_access_info = doc_id_to_access_info
self._doc_id_to_document_set = doc_id_to_document_set
self._doc_id_to_ancestor_ids = doc_id_to_ancestor_ids
self._id_to_boost_map = id_to_boost_map
self._no_access = no_access
self._tenant_id = tenant_id
self.doc_id_to_previous_chunk_cnt = doc_id_to_previous_chunk_cnt
self.doc_id_to_new_chunk_cnt = doc_id_to_new_chunk_cnt
def enrich_chunk(
self, chunk: IndexChunk, score: float
) -> DocMetadataAwareIndexChunk:
return DocMetadataAwareIndexChunk.from_index_chunk(
index_chunk=chunk,
access=self._doc_id_to_access_info.get(
chunk.source_document.id, self._no_access
),
document_sets=set(
self._doc_id_to_document_set.get(chunk.source_document.id, [])
),
user_project=[],
personas=[],
boost=(
self._id_to_boost_map[chunk.source_document.id]
if chunk.source_document.id in self._id_to_boost_map
else DEFAULT_BOOST
),
tenant_id=self._tenant_id,
aggregated_chunk_boost_factor=score,
ancestor_hierarchy_node_ids=self._doc_id_to_ancestor_ids[
chunk.source_document.id
],
)

View File

@@ -1,9 +1,6 @@
from __future__ import annotations
import contextlib
import datetime
import time
from collections import defaultdict
from collections.abc import Generator
from uuid import UUID
@@ -27,8 +24,7 @@ from onyx.db.user_file import fetch_persona_ids_for_user_files
from onyx.db.user_file import fetch_user_project_ids_for_user_files
from onyx.file_store.utils import store_user_file_plaintext
from onyx.indexing.indexing_pipeline import DocumentBatchPrepareContext
from onyx.indexing.models import ChunkEnrichmentContext
from onyx.indexing.models import DocAwareChunk
from onyx.indexing.models import BuildMetadataAwareChunksResult
from onyx.indexing.models import DocMetadataAwareIndexChunk
from onyx.indexing.models import IndexChunk
from onyx.indexing.models import UpdatableChunkData
@@ -105,20 +101,13 @@ class UserFileIndexingAdapter:
f"Failed to acquire locks after {_NUM_LOCK_ATTEMPTS} attempts for user files: {[doc.id for doc in documents]}"
)
def prepare_enrichment(
def build_metadata_aware_chunks(
self,
context: DocumentBatchPrepareContext,
chunks_with_embeddings: list[IndexChunk],
chunk_content_scores: list[float],
tenant_id: str,
chunks: list[DocAwareChunk],
) -> UserFileChunkEnricher:
"""Do all DB lookups and pre-compute file metadata from chunks."""
updatable_ids = [doc.id for doc in context.updatable_docs]
doc_id_to_new_chunk_cnt: dict[str, int] = defaultdict(int)
content_by_file: dict[str, list[str]] = defaultdict(list)
for chunk in chunks:
doc_id_to_new_chunk_cnt[chunk.source_document.id] += 1
content_by_file[chunk.source_document.id].append(chunk.content)
context: DocumentBatchPrepareContext,
) -> BuildMetadataAwareChunksResult:
no_access = DocumentAccess.build(
user_emails=[],
@@ -128,6 +117,7 @@ class UserFileIndexingAdapter:
is_public=False,
)
updatable_ids = [doc.id for doc in context.updatable_docs]
user_file_id_to_project_ids = fetch_user_project_ids_for_user_files(
user_file_ids=updatable_ids,
db_session=self.db_session,
@@ -148,6 +138,17 @@ class UserFileIndexingAdapter:
)
}
user_file_id_to_new_chunk_cnt: dict[str, int] = {
user_file_id: len(
[
chunk
for chunk in chunks_with_embeddings
if chunk.source_document.id == user_file_id
]
)
for user_file_id in updatable_ids
}
# Initialize tokenizer used for token count calculation
try:
llm = get_default_llm()
@@ -162,9 +163,15 @@ class UserFileIndexingAdapter:
user_file_id_to_raw_text: dict[str, str] = {}
user_file_id_to_token_count: dict[str, int | None] = {}
for user_file_id in updatable_ids:
contents = content_by_file.get(user_file_id)
if contents:
combined_content = " ".join(contents)
user_file_chunks = [
chunk
for chunk in chunks_with_embeddings
if chunk.source_document.id == user_file_id
]
if user_file_chunks:
combined_content = " ".join(
[chunk.content for chunk in user_file_chunks]
)
user_file_id_to_raw_text[str(user_file_id)] = combined_content
token_count = (
len(llm_tokenizer.encode(combined_content)) if llm_tokenizer else 0
@@ -174,16 +181,28 @@ class UserFileIndexingAdapter:
user_file_id_to_raw_text[str(user_file_id)] = ""
user_file_id_to_token_count[str(user_file_id)] = None
return UserFileChunkEnricher(
user_file_id_to_access=user_file_id_to_access,
user_file_id_to_project_ids=user_file_id_to_project_ids,
user_file_id_to_persona_ids=user_file_id_to_persona_ids,
access_aware_chunks = [
DocMetadataAwareIndexChunk.from_index_chunk(
index_chunk=chunk,
access=user_file_id_to_access.get(chunk.source_document.id, no_access),
document_sets=set(),
user_project=user_file_id_to_project_ids.get(
chunk.source_document.id, []
),
personas=user_file_id_to_persona_ids.get(chunk.source_document.id, []),
boost=DEFAULT_BOOST,
tenant_id=tenant_id,
aggregated_chunk_boost_factor=chunk_content_scores[chunk_num],
)
for chunk_num, chunk in enumerate(chunks_with_embeddings)
]
return BuildMetadataAwareChunksResult(
chunks=access_aware_chunks,
doc_id_to_previous_chunk_cnt=user_file_id_to_previous_chunk_cnt,
doc_id_to_new_chunk_cnt=dict(doc_id_to_new_chunk_cnt),
doc_id_to_new_chunk_cnt=user_file_id_to_new_chunk_cnt,
user_file_id_to_raw_text=user_file_id_to_raw_text,
user_file_id_to_token_count=user_file_id_to_token_count,
no_access=no_access,
tenant_id=tenant_id,
)
def _notify_assistant_owners_if_files_ready(
@@ -227,9 +246,8 @@ class UserFileIndexingAdapter:
context: DocumentBatchPrepareContext,
updatable_chunk_data: list[UpdatableChunkData], # noqa: ARG002
filtered_documents: list[Document], # noqa: ARG002
enrichment: ChunkEnrichmentContext,
result: BuildMetadataAwareChunksResult,
) -> None:
assert isinstance(enrichment, UserFileChunkEnricher)
user_file_ids = [doc.id for doc in context.updatable_docs]
user_files = (
@@ -245,10 +263,8 @@ class UserFileIndexingAdapter:
user_file.last_project_sync_at = datetime.datetime.now(
datetime.timezone.utc
)
user_file.chunk_count = enrichment.doc_id_to_new_chunk_cnt.get(
str(user_file.id), 0
)
user_file.token_count = enrichment.user_file_id_to_token_count[
user_file.chunk_count = result.doc_id_to_new_chunk_cnt[str(user_file.id)]
user_file.token_count = result.user_file_id_to_token_count[
str(user_file.id)
]
@@ -260,54 +276,8 @@ class UserFileIndexingAdapter:
# Store the plaintext in the file store for faster retrieval
# NOTE: this creates its own session to avoid committing the overall
# transaction.
for user_file_id, raw_text in enrichment.user_file_id_to_raw_text.items():
for user_file_id, raw_text in result.user_file_id_to_raw_text.items():
store_user_file_plaintext(
user_file_id=UUID(user_file_id),
plaintext_content=raw_text,
)
class UserFileChunkEnricher:
"""Pre-computed metadata for per-chunk enrichment of user-uploaded files."""
def __init__(
self,
user_file_id_to_access: dict[str, DocumentAccess],
user_file_id_to_project_ids: dict[str, list[int]],
user_file_id_to_persona_ids: dict[str, list[int]],
doc_id_to_previous_chunk_cnt: dict[str, int],
doc_id_to_new_chunk_cnt: dict[str, int],
user_file_id_to_raw_text: dict[str, str],
user_file_id_to_token_count: dict[str, int | None],
no_access: DocumentAccess,
tenant_id: str,
) -> None:
self._user_file_id_to_access = user_file_id_to_access
self._user_file_id_to_project_ids = user_file_id_to_project_ids
self._user_file_id_to_persona_ids = user_file_id_to_persona_ids
self._no_access = no_access
self._tenant_id = tenant_id
self.doc_id_to_previous_chunk_cnt = doc_id_to_previous_chunk_cnt
self.doc_id_to_new_chunk_cnt = doc_id_to_new_chunk_cnt
self.user_file_id_to_raw_text = user_file_id_to_raw_text
self.user_file_id_to_token_count = user_file_id_to_token_count
def enrich_chunk(
self, chunk: IndexChunk, score: float
) -> DocMetadataAwareIndexChunk:
return DocMetadataAwareIndexChunk.from_index_chunk(
index_chunk=chunk,
access=self._user_file_id_to_access.get(
chunk.source_document.id, self._no_access
),
document_sets=set(),
user_project=self._user_file_id_to_project_ids.get(
chunk.source_document.id, []
),
personas=self._user_file_id_to_persona_ids.get(
chunk.source_document.id, []
),
boost=DEFAULT_BOOST,
tenant_id=self._tenant_id,
aggregated_chunk_boost_factor=score,
)

View File

@@ -1,11 +1,5 @@
import pickle
import tempfile
from collections import defaultdict
from collections.abc import Callable
from collections.abc import Generator
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import Protocol
from pydantic import BaseModel
@@ -15,7 +9,6 @@ from sqlalchemy.orm import Session
from onyx.configs.app_configs import DEFAULT_CONTEXTUAL_RAG_LLM_NAME
from onyx.configs.app_configs import DEFAULT_CONTEXTUAL_RAG_LLM_PROVIDER
from onyx.configs.app_configs import ENABLE_CONTEXTUAL_RAG
from onyx.configs.app_configs import MAX_CHUNKS_PER_DOC_BATCH
from onyx.configs.app_configs import MAX_DOCUMENT_CHARS
from onyx.configs.app_configs import MAX_TOKENS_FOR_FULL_INCLUSION
from onyx.configs.app_configs import USE_CHUNK_SUMMARY
@@ -54,8 +47,6 @@ from onyx.indexing.chunker import Chunker
from onyx.indexing.embedder import embed_chunks_with_failure_handling
from onyx.indexing.embedder import IndexingEmbedder
from onyx.indexing.models import DocAwareChunk
from onyx.indexing.models import DocMetadataAwareIndexChunk
from onyx.indexing.models import IndexChunk
from onyx.indexing.models import IndexingBatchAdapter
from onyx.indexing.models import UpdatableChunkData
from onyx.indexing.vector_db_insertion import write_chunks_to_vector_db_with_backoff
@@ -72,7 +63,6 @@ from onyx.natural_language_processing.utils import tokenizer_trim_middle
from onyx.prompts.contextual_retrieval import CONTEXTUAL_RAG_PROMPT1
from onyx.prompts.contextual_retrieval import CONTEXTUAL_RAG_PROMPT2
from onyx.prompts.contextual_retrieval import DOCUMENT_SUMMARY_PROMPT
from onyx.utils.batching import batch_generator
from onyx.utils.logger import setup_logger
from onyx.utils.postgres_sanitization import sanitize_documents_for_postgres
from onyx.utils.threadpool_concurrency import run_functions_tuples_in_parallel
@@ -101,21 +91,6 @@ class IndexingPipelineResult(BaseModel):
failures: list[ConnectorFailure]
@classmethod
def empty(cls, total_docs: int) -> "IndexingPipelineResult":
return cls(
new_docs=0,
total_docs=total_docs,
total_chunks=0,
failures=[],
)
class ChunkEmbeddingResult(BaseModel):
embedding_path: Path
successful_chunk_ids: list[tuple[int, str]] # (chunk_id, document_id)
connector_failures: list[ConnectorFailure]
class IndexingPipelineProtocol(Protocol):
def __call__(
@@ -164,105 +139,6 @@ def _upsert_documents_in_db(
)
def embed_chunks_in_batches(
chunks: list[DocAwareChunk],
embedder: IndexingEmbedder,
tenant_id: str,
request_id: str | None,
) -> ChunkEmbeddingResult:
"""Embeds chunks in batches of MAX_CHUNKS_PER_DOC_BATCH, spilling each batch to disk.
For each batch:
1. Embed the chunks via embed_chunks_with_failure_handling
2. Pickle the resulting IndexChunks to a temp file
3. Clear the batch from memory
Returns:
- Path to the temp directory containing one pickle file per batch
- Accumulated embedding failures across all batches
"""
tmpdir = Path(tempfile.mkdtemp(prefix="onyx_embeddings_"))
successful_chunk_ids: list[tuple[int, str]] = []
all_embedding_failures: list[ConnectorFailure] = []
for batch_idx, chunk_batch in enumerate(
batch_generator(chunks, MAX_CHUNKS_PER_DOC_BATCH)
):
logger.debug(f"Embedding batch {batch_idx}: {len(chunk_batch)} chunks")
chunks_with_embeddings, embedding_failures = embed_chunks_with_failure_handling(
chunks=chunk_batch,
embedder=embedder,
tenant_id=tenant_id,
request_id=request_id,
)
all_embedding_failures.extend(embedding_failures)
# Track which chunks succeeded by excluding failed doc IDs
failed_doc_ids = {
f.failed_document.document_id
for f in embedding_failures
if f.failed_document
}
successful_chunk_ids.extend(
(c.chunk_id, c.source_document.id)
for c in chunk_batch
if c.source_document.id not in failed_doc_ids
)
# Spill embeddings to disk
batch_file = tmpdir / f"batch_{batch_idx}.pkl"
with open(batch_file, "wb") as f:
pickle.dump(chunks_with_embeddings, f)
# Free memory
del chunks_with_embeddings
return ChunkEmbeddingResult(
embedding_path=tmpdir,
successful_chunk_ids=successful_chunk_ids,
connector_failures=all_embedding_failures,
)
class EmbedStream:
def __init__(self, tmpdir: Path) -> None:
self._tmpdir = tmpdir
def stream(self) -> Iterator[IndexChunk]:
for batch_file in sorted(
self._tmpdir.glob("batch_*.pkl"),
key=lambda p: int(p.stem.removeprefix("batch_")),
):
with open(batch_file, "rb") as f:
batch: list[IndexChunk] = pickle.load(f)
yield from batch
@contextmanager
def use_embed_stream(
tmpdir: Path,
) -> Generator[EmbedStream, None, None]:
"""Context manager that provides a factory for creating chunk iterators.
Each call to stream() returns a fresh generator over the embedded chunks
on disk, so the data can be iterated multiple times (e.g. once per
document_index). Files are cleaned up when the context manager exits.
Usage:
with use_embed_stream(embedding_path) as embed_stream:
for document_index in document_indices:
for chunk in embed_stream.stream():
...
"""
try:
yield EmbedStream(tmpdir)
finally:
for batch_file in tmpdir.glob("batch_*.pkl"):
batch_file.unlink(missing_ok=True)
tmpdir.rmdir()
def get_doc_ids_to_update(
documents: list[Document], db_docs: list[DBDocument]
) -> list[Document]:
@@ -761,29 +637,6 @@ def add_contextual_summaries(
return chunks
def _verify_indexing_completeness(
insertion_records: list[DocumentInsertionRecord],
write_failures: list[ConnectorFailure],
embedding_failed_doc_ids: set[str],
updatable_ids: list[str],
document_index_name: str,
) -> None:
"""Verify that every updatable document was either indexed or reported as failed."""
all_returned_doc_ids = (
{r.document_id for r in insertion_records}
| {f.failed_document.document_id for f in write_failures if f.failed_document}
| embedding_failed_doc_ids
)
if all_returned_doc_ids != set(updatable_ids):
raise RuntimeError(
f"Some documents were not successfully indexed. "
f"Updatable IDs: {updatable_ids}, "
f"Returned IDs: {all_returned_doc_ids}. "
f"This should never happen. "
f"This occured for document index {document_index_name}"
)
@log_function_time(debug_only=True)
def index_doc_batch(
*,
@@ -819,7 +672,12 @@ def index_doc_batch(
filtered_documents = filter_fnc(document_batch)
context = adapter.prepare(filtered_documents, ignore_time_skip)
if not context:
return IndexingPipelineResult.empty(len(filtered_documents))
return IndexingPipelineResult(
new_docs=0,
total_docs=len(filtered_documents),
total_chunks=0,
failures=[],
)
# Convert documents to IndexingDocument objects with processed section
# logger.debug("Processing image sections")
@@ -858,98 +716,117 @@ def index_doc_batch(
)
logger.debug("Starting embedding")
embedding_result = embed_chunks_in_batches(
chunks=chunks,
embedder=embedder,
tenant_id=tenant_id,
request_id=request_id,
chunks_with_embeddings, embedding_failures = (
embed_chunks_with_failure_handling(
chunks=chunks,
embedder=embedder,
tenant_id=tenant_id,
request_id=request_id,
)
if chunks
else ([], [])
)
chunk_content_scores = [1.0] * len(chunks_with_embeddings)
updatable_ids = [doc.id for doc in context.updatable_docs]
updatable_chunk_data = [
UpdatableChunkData(
chunk_id=chunk_id,
document_id=document_id,
boost_score=1.0,
chunk_id=chunk.chunk_id,
document_id=chunk.source_document.id,
boost_score=score,
)
for chunk_id, document_id in embedding_result.successful_chunk_ids
for chunk, score in zip(chunks_with_embeddings, chunk_content_scores)
]
# Acquires a lock on the documents so that no other process can modify them
# NOTE: don't need to acquire till here, since this is when the actual race condition
# with Vespa can occur.
with (
adapter.lock_context(context.updatable_docs),
use_embed_stream(embedding_result.embedding_path) as embed_stream,
):
enricher = adapter.prepare_enrichment(
with adapter.lock_context(context.updatable_docs):
# we're concerned about race conditions where multiple simultaneous indexings might result
# in one set of metadata overwriting another one in vespa.
# we still write data here for the immediate and most likely correct sync, but
# to resolve this, an update of the last modified field at the end of this loop
# always triggers a final metadata sync via the celery queue
result = adapter.build_metadata_aware_chunks(
chunks_with_embeddings=chunks_with_embeddings,
chunk_content_scores=chunk_content_scores,
tenant_id=tenant_id,
context=context,
tenant_id=tenant_id,
chunks=chunks,
)
index_batch_params = IndexBatchParams(
doc_id_to_previous_chunk_cnt=enricher.doc_id_to_previous_chunk_cnt,
doc_id_to_new_chunk_cnt=enricher.doc_id_to_new_chunk_cnt,
tenant_id=tenant_id,
large_chunks_enabled=chunker.enable_large_chunks,
)
embedding_failed_doc_ids = {
f.failed_document.document_id
for f in embedding_result.connector_failures
if f.failed_document
}
short_descriptor_list = [chunk.to_short_descriptor() for chunk in result.chunks]
short_descriptor_log = str(short_descriptor_list)[:1024]
logger.debug(f"Indexing the following chunks: {short_descriptor_log}")
primary_doc_idx_insertion_records: list[DocumentInsertionRecord] | None = None
primary_doc_idx_vector_db_write_failures: list[ConnectorFailure] | None = None
for document_index in document_indices:
# A document will not be spread across different batches, so all the
# documents with chunks in this set, are fully represented by the chunks
# in this set
def _enriched_stream() -> Iterator[DocMetadataAwareIndexChunk]:
for chunk in embed_stream.stream():
yield enricher.enrich_chunk(chunk, 1.0)
insertion_records, write_failures = write_chunks_to_vector_db_with_backoff(
(
insertion_records,
vector_db_write_failures,
) = write_chunks_to_vector_db_with_backoff(
document_index=document_index,
chunks=_enriched_stream(),
index_batch_params=index_batch_params,
chunks=result.chunks,
index_batch_params=IndexBatchParams(
doc_id_to_previous_chunk_cnt=result.doc_id_to_previous_chunk_cnt,
doc_id_to_new_chunk_cnt=result.doc_id_to_new_chunk_cnt,
tenant_id=tenant_id,
large_chunks_enabled=chunker.enable_large_chunks,
),
)
_verify_indexing_completeness(
insertion_records=insertion_records,
write_failures=write_failures,
embedding_failed_doc_ids=embedding_failed_doc_ids,
updatable_ids=updatable_ids,
document_index_name=document_index.__class__.__name__,
all_returned_doc_ids: set[str] = (
{record.document_id for record in insertion_records}
.union(
{
record.failed_document.document_id
for record in vector_db_write_failures
if record.failed_document
}
)
.union(
{
record.failed_document.document_id
for record in embedding_failures
if record.failed_document
}
)
)
if all_returned_doc_ids != set(updatable_ids):
raise RuntimeError(
f"Some documents were not successfully indexed. "
f"Updatable IDs: {updatable_ids}, "
f"Returned IDs: {all_returned_doc_ids}. "
"This should never happen."
f"This occured for document index {document_index.__class__.__name__}"
)
# We treat the first document index we got as the primary one used
# for reporting the state of indexing.
if primary_doc_idx_insertion_records is None:
primary_doc_idx_insertion_records = insertion_records
if primary_doc_idx_vector_db_write_failures is None:
primary_doc_idx_vector_db_write_failures = write_failures
primary_doc_idx_vector_db_write_failures = vector_db_write_failures
adapter.post_index(
context=context,
updatable_chunk_data=updatable_chunk_data,
filtered_documents=filtered_documents,
enrichment=enricher,
result=result,
)
assert primary_doc_idx_insertion_records is not None
assert primary_doc_idx_vector_db_write_failures is not None
return IndexingPipelineResult(
new_docs=sum(
1 for r in primary_doc_idx_insertion_records if not r.already_existed
new_docs=len(
[r for r in primary_doc_idx_insertion_records if not r.already_existed]
),
total_docs=len(filtered_documents),
total_chunks=len(embedding_result.successful_chunk_ids),
failures=primary_doc_idx_vector_db_write_failures
+ embedding_result.connector_failures,
total_chunks=len(chunks_with_embeddings),
failures=primary_doc_idx_vector_db_write_failures + embedding_failures,
)

View File

@@ -235,16 +235,12 @@ class UpdatableChunkData(BaseModel):
boost_score: float
class ChunkEnrichmentContext(Protocol):
"""Returned by prepare_enrichment. Holds pre-computed metadata lookups
and provides per-chunk enrichment."""
class BuildMetadataAwareChunksResult(BaseModel):
chunks: list[DocMetadataAwareIndexChunk]
doc_id_to_previous_chunk_cnt: dict[str, int]
doc_id_to_new_chunk_cnt: dict[str, int]
def enrich_chunk(
self, chunk: IndexChunk, score: float
) -> DocMetadataAwareIndexChunk: ...
user_file_id_to_raw_text: dict[str, str]
user_file_id_to_token_count: dict[str, int | None]
class IndexingBatchAdapter(Protocol):
@@ -258,24 +254,18 @@ class IndexingBatchAdapter(Protocol):
) -> Generator[TransactionalContext, None, None]:
"""Provide a transaction/row-lock context for critical updates."""
def prepare_enrichment(
def build_metadata_aware_chunks(
self,
context: "DocumentBatchPrepareContext",
chunks_with_embeddings: list[IndexChunk],
chunk_content_scores: list[float],
tenant_id: str,
chunks: list[DocAwareChunk],
) -> ChunkEnrichmentContext:
"""Prepare per-chunk enrichment data (access, document sets, boost, etc.).
Precondition: ``chunks`` have already been through the embedding step
(i.e. they are ``IndexChunk`` instances with populated embeddings,
passed here as the base ``DocAwareChunk`` type).
"""
...
context: "DocumentBatchPrepareContext",
) -> BuildMetadataAwareChunksResult: ...
def post_index(
self,
context: "DocumentBatchPrepareContext",
updatable_chunk_data: list[UpdatableChunkData],
filtered_documents: list[Document],
enrichment: ChunkEnrichmentContext,
result: BuildMetadataAwareChunksResult,
) -> None: ...

View File

@@ -43,9 +43,6 @@ from onyx.db.index_attempt import count_index_attempt_errors_for_cc_pair
from onyx.db.index_attempt import count_index_attempts_for_cc_pair
from onyx.db.index_attempt import get_index_attempt_errors_for_cc_pair
from onyx.db.index_attempt import get_latest_index_attempt_for_cc_pair_id
from onyx.db.index_attempt import (
get_latest_successful_index_attempt_for_cc_pair_id,
)
from onyx.db.index_attempt import get_paginated_index_attempts_for_cc_pair_id
from onyx.db.indexing_coordination import IndexingCoordination
from onyx.db.models import IndexAttempt
@@ -193,11 +190,6 @@ def get_cc_pair_full_info(
only_finished=False,
)
latest_successful_attempt = get_latest_successful_index_attempt_for_cc_pair_id(
db_session=db_session,
connector_credential_pair_id=cc_pair_id,
)
# Get latest permission sync attempt for status
latest_permission_sync_attempt = None
if cc_pair.access_type == AccessType.SYNC:
@@ -215,11 +207,6 @@ def get_cc_pair_full_info(
cc_pair_id=cc_pair_id,
),
last_index_attempt=latest_attempt,
last_successful_index_time=(
latest_successful_attempt.time_started
if latest_successful_attempt
else None
),
latest_deletion_attempt=get_deletion_attempt_snapshot(
connector_id=cc_pair.connector_id,
credential_id=cc_pair.credential_id,

View File

@@ -3,7 +3,6 @@ import math
import mimetypes
import os
import zipfile
from datetime import datetime
from io import BytesIO
from typing import Any
from typing import cast
@@ -110,9 +109,6 @@ from onyx.db.federated import fetch_all_federated_connectors_parallel
from onyx.db.index_attempt import get_index_attempts_for_cc_pair
from onyx.db.index_attempt import get_latest_index_attempts_by_status
from onyx.db.index_attempt import get_latest_index_attempts_parallel
from onyx.db.index_attempt import (
get_latest_successful_index_attempts_parallel,
)
from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import FederatedConnector
from onyx.db.models import IndexAttempt
@@ -1162,26 +1158,21 @@ def get_connector_indexing_status(
),
(),
),
# Get most recent successful index attempts
(
lambda: get_latest_successful_index_attempts_parallel(
request.secondary_index,
),
(),
),
]
if user and user.role == UserRole.ADMIN:
# For Admin users, we already got all the cc pair in editable_cc_pairs
# its not needed to get them again
(
editable_cc_pairs,
federated_connectors,
latest_index_attempts,
latest_finished_index_attempts,
latest_successful_index_attempts,
) = run_functions_tuples_in_parallel(parallel_functions)
non_editable_cc_pairs = []
else:
parallel_functions.append(
# Get non-editable connector/credential pairs
(
lambda: get_connector_credential_pairs_for_user_parallel(
user, False, None, True, True, False, True, request.source
@@ -1195,7 +1186,6 @@ def get_connector_indexing_status(
federated_connectors,
latest_index_attempts,
latest_finished_index_attempts,
latest_successful_index_attempts,
non_editable_cc_pairs,
) = run_functions_tuples_in_parallel(parallel_functions)
@@ -1207,9 +1197,6 @@ def get_connector_indexing_status(
latest_finished_index_attempts = cast(
list[IndexAttempt], latest_finished_index_attempts
)
latest_successful_index_attempts = cast(
list[IndexAttempt], latest_successful_index_attempts
)
document_count_info = get_document_counts_for_all_cc_pairs(db_session)
@@ -1219,48 +1206,42 @@ def get_connector_indexing_status(
for connector_id, credential_id, cnt in document_count_info
}
def _attempt_lookup(
attempts: list[IndexAttempt],
) -> dict[int, IndexAttempt]:
return {attempt.connector_credential_pair_id: attempt for attempt in attempts}
cc_pair_to_latest_index_attempt: dict[tuple[int, int], IndexAttempt] = {
(
attempt.connector_credential_pair.connector_id,
attempt.connector_credential_pair.credential_id,
): attempt
for attempt in latest_index_attempts
}
cc_pair_to_latest_index_attempt = _attempt_lookup(latest_index_attempts)
cc_pair_to_latest_finished_index_attempt = _attempt_lookup(
latest_finished_index_attempts
)
cc_pair_to_latest_successful_index_attempt = _attempt_lookup(
latest_successful_index_attempts
)
cc_pair_to_latest_finished_index_attempt: dict[tuple[int, int], IndexAttempt] = {
(
attempt.connector_credential_pair.connector_id,
attempt.connector_credential_pair.credential_id,
): attempt
for attempt in latest_finished_index_attempts
}
def build_connector_indexing_status(
cc_pair: ConnectorCredentialPair,
is_editable: bool,
) -> ConnectorIndexingStatusLite | None:
# TODO remove this to enable ingestion API
if cc_pair.name == "DefaultCCPair":
return None
latest_attempt = cc_pair_to_latest_index_attempt.get(cc_pair.id)
latest_finished_attempt = cc_pair_to_latest_finished_index_attempt.get(
cc_pair.id
latest_attempt = cc_pair_to_latest_index_attempt.get(
(cc_pair.connector_id, cc_pair.credential_id)
)
latest_successful_attempt = cc_pair_to_latest_successful_index_attempt.get(
cc_pair.id
latest_finished_attempt = cc_pair_to_latest_finished_index_attempt.get(
(cc_pair.connector_id, cc_pair.credential_id)
)
doc_count = cc_pair_to_document_cnt.get(
(cc_pair.connector_id, cc_pair.credential_id), 0
)
return _get_connector_indexing_status_lite(
cc_pair,
latest_attempt,
latest_finished_attempt,
(
latest_successful_attempt.time_started
if latest_successful_attempt
else None
),
is_editable,
doc_count,
cc_pair, latest_attempt, latest_finished_attempt, is_editable, doc_count
)
# Process editable cc_pairs
@@ -1421,7 +1402,6 @@ def _get_connector_indexing_status_lite(
cc_pair: ConnectorCredentialPair,
latest_index_attempt: IndexAttempt | None,
latest_finished_index_attempt: IndexAttempt | None,
last_successful_index_time: datetime | None,
is_editable: bool,
document_cnt: int,
) -> ConnectorIndexingStatusLite | None:
@@ -1455,7 +1435,7 @@ def _get_connector_indexing_status_lite(
else None
),
last_status=latest_index_attempt.status if latest_index_attempt else None,
last_success=last_successful_index_time,
last_success=cc_pair.last_successful_index_time,
docs_indexed=document_cnt,
latest_index_attempt_docs_indexed=(
latest_index_attempt.total_docs_indexed if latest_index_attempt else None

View File

@@ -330,7 +330,6 @@ class CCPairFullInfo(BaseModel):
num_docs_indexed: int, # not ideal, but this must be computed separately
is_editable_for_current_user: bool,
indexing: bool,
last_successful_index_time: datetime | None = None,
last_permission_sync_attempt_status: PermissionSyncStatus | None = None,
permission_syncing: bool = False,
last_permission_sync_attempt_finished: datetime | None = None,
@@ -383,7 +382,9 @@ class CCPairFullInfo(BaseModel):
creator_email=(
cc_pair_model.creator.email if cc_pair_model.creator else None
),
last_indexed=last_successful_index_time,
last_indexed=(
last_index_attempt.time_started if last_index_attempt else None
),
last_pruned=cc_pair_model.last_pruned,
last_full_permission_sync=cls._get_last_full_permission_sync(cc_pair_model),
overall_indexing_speed=overall_indexing_speed,

View File

@@ -6978,9 +6978,9 @@
}
},
"node_modules/flatted": {
"version": "3.4.2",
"resolved": "https://registry.npmjs.org/flatted/-/flatted-3.4.2.tgz",
"integrity": "sha512-PjDse7RzhcPkIJwy5t7KPWQSZ9cAbzQXcafsetQoD7sOJRQlGikNbx7yZp2OotDnJyrDcbyRq3Ttb18iYOqkxA==",
"version": "3.3.3",
"resolved": "https://registry.npmjs.org/flatted/-/flatted-3.3.3.tgz",
"integrity": "sha512-GX+ysw4PBCz0PzosHDepZGANEuFCMLrnRTiEy9McGjmkCQYwRq4A/X786G/fjM/+OjsWSU1ZrY5qyARZmO/uwg==",
"dev": true,
"license": "ISC"
},

View File

@@ -119,8 +119,8 @@ admin_agents_router = APIRouter(prefix=ADMIN_AGENTS_RESOURCE)
agents_router = APIRouter(prefix=AGENTS_RESOURCE)
class IsListedRequest(BaseModel):
is_listed: bool
class IsVisibleRequest(BaseModel):
is_visible: bool
class IsPublicRequest(BaseModel):
@@ -128,19 +128,19 @@ class IsPublicRequest(BaseModel):
class IsFeaturedRequest(BaseModel):
is_featured: bool
featured: bool
@admin_router.patch("/{persona_id}/listed")
@admin_router.patch("/{persona_id}/visible")
def patch_persona_visibility(
persona_id: int,
is_listed_request: IsListedRequest,
is_visible_request: IsVisibleRequest,
user: User = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
) -> None:
update_persona_visibility(
persona_id=persona_id,
is_listed=is_listed_request.is_listed,
is_visible=is_visible_request.is_visible,
db_session=db_session,
user=user,
)
@@ -175,7 +175,7 @@ def patch_persona_featured_status(
try:
update_persona_featured(
persona_id=persona_id,
is_featured=is_featured_request.is_featured,
featured=is_featured_request.featured,
db_session=db_session,
user=user,
)

View File

@@ -123,7 +123,7 @@ class PersonaUpsertRequest(BaseModel):
)
search_start_date: datetime | None = None
label_ids: list[int] | None = None
is_featured: bool = False
featured: bool = False
display_priority: int | None = None
# Accept string UUIDs from frontend
user_file_ids: list[str] | None = None
@@ -165,9 +165,9 @@ class MinimalPersonaSnapshot(BaseModel):
icon_name: str | None
is_public: bool
is_listed: bool
is_visible: bool
display_priority: int | None
is_featured: bool
featured: bool
builtin_persona: bool
# Used for filtering
@@ -218,9 +218,9 @@ class MinimalPersonaSnapshot(BaseModel):
uploaded_image_id=persona.uploaded_image_id,
icon_name=persona.icon_name,
is_public=persona.is_public,
is_listed=persona.is_listed,
is_visible=persona.is_visible,
display_priority=persona.display_priority,
is_featured=persona.is_featured,
featured=persona.featured,
builtin_persona=persona.builtin_persona,
labels=[PersonaLabelSnapshot.from_model(label) for label in persona.labels],
owner=(
@@ -236,13 +236,13 @@ class PersonaSnapshot(BaseModel):
name: str
description: str
is_public: bool
is_listed: bool
is_visible: bool
uploaded_image_id: str | None
icon_name: str | None
# Return string UUIDs to frontend for consistency
user_file_ids: list[str]
display_priority: int | None
is_featured: bool
featured: bool
builtin_persona: bool
starter_messages: list[StarterMessage] | None
tools: list[ToolSnapshot]
@@ -271,12 +271,12 @@ class PersonaSnapshot(BaseModel):
name=persona.name,
description=persona.description,
is_public=persona.is_public,
is_listed=persona.is_listed,
is_visible=persona.is_visible,
uploaded_image_id=persona.uploaded_image_id,
icon_name=persona.icon_name,
user_file_ids=[str(file.id) for file in persona.user_files],
display_priority=persona.display_priority,
is_featured=persona.is_featured,
featured=persona.featured,
builtin_persona=persona.builtin_persona,
starter_messages=persona.starter_messages,
tools=[
@@ -337,12 +337,12 @@ class FullPersonaSnapshot(PersonaSnapshot):
name=persona.name,
description=persona.description,
is_public=persona.is_public,
is_listed=persona.is_listed,
is_visible=persona.is_visible,
uploaded_image_id=persona.uploaded_image_id,
icon_name=persona.icon_name,
user_file_ids=[str(file.id) for file in persona.user_files],
display_priority=persona.display_priority,
is_featured=persona.is_featured,
featured=persona.featured,
builtin_persona=persona.builtin_persona,
starter_messages=persona.starter_messages,
users=[

View File

@@ -351,7 +351,7 @@ def upsert_project_instructions(
class ProjectPayload(BaseModel):
project: UserProjectSnapshot
files: list[UserFileSnapshot] | None = None
persona_id_to_is_featured: dict[int, bool] | None = None
persona_id_to_featured: dict[int, bool] | None = None
@router.get(
@@ -370,13 +370,11 @@ def get_project_details(
if session.persona_id is not None
]
personas = get_personas_by_ids(persona_ids, db_session)
persona_id_to_is_featured = {
persona.id: persona.is_featured for persona in personas
}
persona_id_to_featured = {persona.id: persona.featured for persona in personas}
return ProjectPayload(
project=project,
files=files,
persona_id_to_is_featured=persona_id_to_is_featured,
persona_id_to_featured=persona_id_to_featured,
)

View File

@@ -142,7 +142,7 @@ def enable_or_disable_kg(
users=[user.id],
groups=[],
label_ids=[],
is_featured=False,
featured=False,
display_priority=0,
user_file_ids=[],
)

View File

@@ -5,7 +5,6 @@ from fastapi import Depends
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from onyx import __version__ as onyx_version
from onyx.auth.users import current_admin_user
from onyx.auth.users import current_user
from onyx.auth.users import is_user_admin
@@ -80,7 +79,6 @@ def fetch_settings(
needs_reindexing=needs_reindexing,
onyx_craft_enabled=onyx_craft_enabled_for_user,
vector_db_enabled=not DISABLE_VECTOR_DB,
version=onyx_version,
)

View File

@@ -104,5 +104,3 @@ class UserSettings(Settings):
# False when DISABLE_VECTOR_DB is set — connectors, RAG search, and
# document sets are unavailable.
vector_db_enabled: bool = True
# Application version, read from the ONYX_VERSION env var at startup.
version: str | None = None

View File

@@ -400,7 +400,6 @@ def construct_tools(
tool_definition=saved_tool.mcp_input_schema or {},
connection_config=connection_config,
user_email=user_email,
user_id=str(user.id),
user_oauth_token=mcp_user_oauth_token,
additional_headers=additional_mcp_headers,
)

View File

@@ -1,8 +1,6 @@
import json
from typing import Any
from mcp.client.auth import OAuthClientProvider
from onyx.chat.emitter import Emitter
from onyx.db.enums import MCPAuthenticationType
from onyx.db.enums import MCPTransport
@@ -49,7 +47,6 @@ class MCPTool(Tool[None]):
tool_definition: dict[str, Any],
connection_config: MCPConnectionConfig | None = None,
user_email: str = "",
user_id: str = "",
user_oauth_token: str | None = None,
additional_headers: dict[str, str] | None = None,
) -> None:
@@ -59,7 +56,6 @@ class MCPTool(Tool[None]):
self.mcp_server = mcp_server
self.connection_config = connection_config
self.user_email = user_email
self._user_id = user_id
self._user_oauth_token = user_oauth_token
self._additional_headers = additional_headers or {}
@@ -202,42 +198,12 @@ class MCPTool(Tool[None]):
llm_facing_response=llm_facing_response,
)
# For OAuth servers, construct OAuthClientProvider so the MCP SDK
# can refresh expired tokens automatically
auth: OAuthClientProvider | None = None
if (
self.mcp_server.auth_type == MCPAuthenticationType.OAUTH
and self.connection_config is not None
and self._user_id
):
if self.mcp_server.transport == MCPTransport.SSE:
logger.warning(
f"MCP tool '{self._name}': OAuth token refresh is not supported "
f"for SSE transport — auth provider will be ignored. "
f"Re-authentication may be required after token expiry."
)
else:
from onyx.server.features.mcp.api import UNUSED_RETURN_PATH
from onyx.server.features.mcp.api import make_oauth_provider
# user_id is the requesting user's UUID; safe here because
# UNUSED_RETURN_PATH ensures redirect_handler raises immediately
# and user_id is never consulted for Redis state lookups.
auth = make_oauth_provider(
self.mcp_server,
self._user_id,
UNUSED_RETURN_PATH,
self.connection_config.id,
None,
)
tool_result = call_mcp_tool(
self.mcp_server.server_url,
self._name,
llm_kwargs,
connection_headers=headers,
transport=self.mcp_server.transport or MCPTransport.STREAMABLE_HTTP,
auth=auth,
)
logger.info(f"MCP tool '{self._name}' executed successfully")
@@ -282,7 +248,6 @@ class MCPTool(Tool[None]):
"invalid token",
"invalid api key",
"invalid credentials",
"please reconnect to the server",
]
is_auth_error = any(

View File

@@ -189,30 +189,3 @@ def mt_cloud_identify(
attribute="identify_user",
fallback=noop_fallback,
)(distinct_id, properties)
def mt_cloud_alias(
distinct_id: str,
anonymous_id: str,
) -> None:
"""Link an anonymous distinct_id to an identified user (Cloud only)."""
if not MULTI_TENANT:
return
fetch_versioned_implementation_with_fallback(
module="onyx.utils.posthog_client",
attribute="alias_user",
fallback=noop_fallback,
)(distinct_id, anonymous_id)
def mt_cloud_get_anon_id(request: Any) -> str | None:
"""Extract the anonymous distinct_id from the app PostHog cookie (Cloud only)."""
if not MULTI_TENANT or not request:
return None
return fetch_versioned_implementation_with_fallback(
module="onyx.utils.posthog_client",
attribute="get_anon_id_from_request",
fallback=noop_fallback,
)(request)

View File

@@ -1,170 +0,0 @@
#!/usr/bin/env python3
"""Benchmarks OpenSearchDocumentIndex latency.
Requires Onyx to be running as it reads search settings from the database.
Usage:
source .venv/bin/activate
python backend/scripts/debugging/opensearch/benchmark_retrieval.py --help
"""
import argparse
import statistics
import time
from onyx.configs.chat_configs import NUM_RETURNED_HITS
from onyx.context.search.enums import QueryType
from onyx.context.search.models import IndexFilters
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.engine.sql_engine import SqlEngine
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.interfaces_new import TenantState
from onyx.document_index.opensearch.opensearch_document_index import (
OpenSearchDocumentIndex,
)
from onyx.indexing.models import IndexingSetting
from scripts.debugging.opensearch.constants import DEV_TENANT_ID
from scripts.debugging.opensearch.embedding_io import load_query_embedding_from_file
from shared_configs.configs import MULTI_TENANT
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
from shared_configs.contextvars import get_current_tenant_id
DEFAULT_N = 50
def main() -> None:
def add_query_embedding_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"-e",
"--embedding-file-path",
type=str,
required=True,
help="Path to the query embedding file.",
)
def add_query_string_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"-q",
"--query",
type=str,
required=True,
help="Query string.",
)
parser = argparse.ArgumentParser(
description="A benchmarking tool to measure OpenSearch retrieval latency."
)
parser.add_argument(
"-n",
type=int,
default=DEFAULT_N,
help=f"Number of samples to take (default: {DEFAULT_N}).",
)
subparsers = parser.add_subparsers(
dest="query_type",
help="Query type to benchmark.",
required=True,
)
hybrid_parser = subparsers.add_parser(
"hybrid", help="Benchmark hybrid retrieval latency."
)
add_query_embedding_argument(hybrid_parser)
add_query_string_argument(hybrid_parser)
keyword_parser = subparsers.add_parser(
"keyword", help="Benchmark keyword retrieval latency."
)
add_query_string_argument(keyword_parser)
semantic_parser = subparsers.add_parser(
"semantic", help="Benchmark semantic retrieval latency."
)
add_query_embedding_argument(semantic_parser)
args = parser.parse_args()
if args.n < 1:
parser.error("Number of samples (-n) must be at least 1.")
if MULTI_TENANT:
CURRENT_TENANT_ID_CONTEXTVAR.set(DEV_TENANT_ID)
SqlEngine.init_engine(pool_size=1, max_overflow=0)
with get_session_with_current_tenant() as session:
search_settings = get_current_search_settings(session)
indexing_setting = IndexingSetting.from_db_model(search_settings)
tenant_state = TenantState(
tenant_id=get_current_tenant_id(), multitenant=MULTI_TENANT
)
index = OpenSearchDocumentIndex(
tenant_state=tenant_state,
index_name=search_settings.index_name,
embedding_dim=indexing_setting.final_embedding_dim,
embedding_precision=indexing_setting.embedding_precision,
)
filters = IndexFilters(
access_control_list=[],
tenant_id=get_current_tenant_id(),
)
if args.query_type == "hybrid":
embedding = load_query_embedding_from_file(args.embedding_file_path)
search_callable = lambda: index.hybrid_retrieval( # noqa: E731
query=args.query,
query_embedding=embedding,
final_keywords=None,
# This arg doesn't do anything right now.
query_type=QueryType.KEYWORD,
filters=filters,
num_to_retrieve=NUM_RETURNED_HITS,
)
elif args.query_type == "keyword":
search_callable = lambda: index.keyword_retrieval( # noqa: E731
query=args.query,
filters=filters,
num_to_retrieve=NUM_RETURNED_HITS,
)
elif args.query_type == "semantic":
embedding = load_query_embedding_from_file(args.embedding_file_path)
search_callable = lambda: index.semantic_retrieval( # noqa: E731
query_embedding=embedding,
filters=filters,
num_to_retrieve=NUM_RETURNED_HITS,
)
else:
raise ValueError(f"Invalid query type: {args.query_type}")
print(f"Running {args.n} invocations of {args.query_type} retrieval...")
latencies: list[float] = []
for i in range(args.n):
start = time.perf_counter()
results = search_callable()
elapsed_ms = (time.perf_counter() - start) * 1000
latencies.append(elapsed_ms)
# Print the current iteration and its elapsed time on the same line.
print(
f" [{i:>{len(str(args.n))}}] {elapsed_ms:7.1f} ms ({len(results)} results) (top result doc ID, chunk idx: {results[0].document_id if results else 'N/A'}, {results[0].chunk_id if results else 'N/A'})",
end="\r",
flush=True,
)
print()
print(f"Results over {args.n} invocations:")
print(f" mean: {statistics.mean(latencies):7.1f} ms")
print(
f" stdev: {statistics.stdev(latencies):7.1f} ms"
if args.n > 1
else " stdev: N/A (only 1 sample)"
)
print(f" max: {max(latencies):7.1f} ms (i: {latencies.index(max(latencies))})")
print(f" min: {min(latencies):7.1f} ms (i: {latencies.index(min(latencies))})")
if args.n >= 20:
print(f" p50: {statistics.median(latencies):7.1f} ms")
print(f" p95: {statistics.quantiles(latencies, n=20)[-1]:7.1f} ms")
if __name__ == "__main__":
main()

View File

@@ -1 +0,0 @@
DEV_TENANT_ID = "tenant_dev"

View File

@@ -1,64 +0,0 @@
#!/usr/bin/env python3
"""Embeds a query and saves the embedding to a file.
Requires Onyx to be running as it reads search settings from the database.
Usage:
source .venv/bin/activate
python backend/scripts/debugging/opensearch/embed_and_save.py --help
"""
import argparse
import time
from onyx.context.search.utils import get_query_embedding
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.engine.sql_engine import SqlEngine
from scripts.debugging.opensearch.constants import DEV_TENANT_ID
from scripts.debugging.opensearch.embedding_io import save_query_embedding_to_file
from shared_configs.configs import MULTI_TENANT
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
def main() -> None:
parser = argparse.ArgumentParser(
description="A tool to embed a query and save the embedding to a file."
)
parser.add_argument(
"-q",
"--query",
type=str,
required=True,
help="Query string to embed.",
)
parser.add_argument(
"-f",
"--file-path",
type=str,
required=True,
help="Path to the output file to save the embedding to.",
)
args = parser.parse_args()
if MULTI_TENANT:
CURRENT_TENANT_ID_CONTEXTVAR.set(DEV_TENANT_ID)
SqlEngine.init_engine(pool_size=1, max_overflow=0)
with get_session_with_current_tenant() as session:
start = time.perf_counter()
query_embedding = get_query_embedding(
query=args.query,
db_session=session,
embedding_model=None,
)
elapsed_ms = (time.perf_counter() - start) * 1000
save_query_embedding_to_file(query_embedding, args.file_path)
print(
f"Query embedding of dimension {len(query_embedding)} generated in {elapsed_ms:.1f} ms and saved to {args.file_path}."
)
if __name__ == "__main__":
main()

View File

@@ -1,43 +0,0 @@
from shared_configs.model_server_models import Embedding
def load_query_embedding_from_file(file_path: str) -> Embedding:
"""Returns an embedding vector read from a file.
The file should be formatted as follows:
- The first line should contain an integer representing the embedding
dimension.
- Every subsequent line should contain a float value representing a
component of the embedding vector.
- The size and embedding content should all be delimited by a newline.
Args:
file_path: Path to the file containing the embedding vector.
Returns:
Embedding: The embedding vector.
"""
with open(file_path, "r") as f:
dimension = int(f.readline().strip())
embedding = [float(line.strip()) for line in f.readlines()]
assert len(embedding) == dimension, "Embedding dimension mismatch."
return embedding
def save_query_embedding_to_file(embedding: Embedding, file_path: str) -> None:
"""Saves an embedding vector to a file.
The file will be formatted as follows:
- The first line will contain the embedding dimension.
- Every subsequent line will contain a float value representing a
component of the embedding vector.
- The size and embedding content will all be delimited by a newline.
Args:
embedding: The embedding vector to save.
file_path: Path to the file to save the embedding vector to.
"""
with open(file_path, "w") as f:
f.write(f"{len(embedding)}\n")
for component in embedding:
f.write(f"{component}\n")

View File

@@ -2,10 +2,9 @@
"""A utility to interact with OpenSearch.
Usage:
source .venv/bin/activate
python backend/scripts/debugging/opensearch/opensearch_debug.py --help
python backend/scripts/debugging/opensearch/opensearch_debug.py list
python backend/scripts/debugging/opensearch/opensearch_debug.py delete <index_name>
python3 opensearch_debug.py --help
python3 opensearch_debug.py list
python3 opensearch_debug.py delete <index_name>
Environment Variables:
OPENSEARCH_HOST: OpenSearch host
@@ -108,15 +107,16 @@ def main() -> None:
parser = argparse.ArgumentParser(
description="A utility to interact with OpenSearch."
)
add_standard_arguments(parser)
subparsers = parser.add_subparsers(
dest="command", help="Command to execute.", required=True
)
subparsers.add_parser("list", help="List all indices with info.")
list_parser = subparsers.add_parser("list", help="List all indices with info.")
add_standard_arguments(list_parser)
delete_parser = subparsers.add_parser("delete", help="Delete an index.")
delete_parser.add_argument("index", help="Index name.", type=str)
add_standard_arguments(delete_parser)
args = parser.parse_args()

View File

@@ -83,7 +83,7 @@ def test_stream_chat_message_objects_without_web_search(
db_session=db_session,
tool_ids=[], # Explicitly no tools
document_set_ids=None,
is_listed=True,
is_visible=True,
)
# Create a chat session with our test persona

View File

@@ -91,7 +91,7 @@ def _create_test_persona(
document_sets=[],
users=[user],
groups=[],
is_listed=True,
is_visible=True,
is_public=True,
display_priority=None,
starter_messages=None,

View File

@@ -1,7 +1,7 @@
"""
External dependency unit tests for UserFileIndexingAdapter metadata writing.
Validates that prepare_enrichment produces DocMetadataAwareIndexChunk
Validates that build_metadata_aware_chunks produces DocMetadataAwareIndexChunk
objects with both `user_project` and `personas` fields populated correctly
based on actual DB associations.
@@ -63,7 +63,7 @@ def _create_persona(db_session: Session, user: User) -> Persona:
document_sets=[],
users=[user],
groups=[],
is_listed=True,
is_visible=True,
is_public=True,
display_priority=None,
starter_messages=None,
@@ -127,7 +127,7 @@ def _make_index_chunk(user_file: UserFile) -> IndexChunk:
class TestAdapterWritesBothMetadataFields:
"""prepare_enrichment must populate user_project AND personas."""
"""build_metadata_aware_chunks must populate user_project AND personas."""
@patch(
"onyx.indexing.adapters.user_file_indexing_adapter.get_default_llm",
@@ -153,13 +153,15 @@ class TestAdapterWritesBothMetadataFields:
doc = chunk.source_document
context = DocumentBatchPrepareContext(updatable_docs=[doc], id_to_boost_map={})
enricher = adapter.prepare_enrichment(
context=context,
result = adapter.build_metadata_aware_chunks(
chunks_with_embeddings=[chunk],
chunk_content_scores=[1.0],
tenant_id=TEST_TENANT_ID,
chunks=[chunk],
context=context,
)
aware_chunk = enricher.enrich_chunk(chunk, 1.0)
assert len(result.chunks) == 1
aware_chunk = result.chunks[0]
assert persona.id in aware_chunk.personas
assert aware_chunk.user_project == []
@@ -188,13 +190,15 @@ class TestAdapterWritesBothMetadataFields:
updatable_docs=[chunk.source_document], id_to_boost_map={}
)
enricher = adapter.prepare_enrichment(
context=context,
result = adapter.build_metadata_aware_chunks(
chunks_with_embeddings=[chunk],
chunk_content_scores=[1.0],
tenant_id=TEST_TENANT_ID,
chunks=[chunk],
context=context,
)
aware_chunk = enricher.enrich_chunk(chunk, 1.0)
assert len(result.chunks) == 1
aware_chunk = result.chunks[0]
assert project.id in aware_chunk.user_project
assert aware_chunk.personas == []
@@ -225,13 +229,14 @@ class TestAdapterWritesBothMetadataFields:
updatable_docs=[chunk.source_document], id_to_boost_map={}
)
enricher = adapter.prepare_enrichment(
context=context,
result = adapter.build_metadata_aware_chunks(
chunks_with_embeddings=[chunk],
chunk_content_scores=[1.0],
tenant_id=TEST_TENANT_ID,
chunks=[chunk],
context=context,
)
aware_chunk = enricher.enrich_chunk(chunk, 1.0)
aware_chunk = result.chunks[0]
assert persona.id in aware_chunk.personas
assert project.id in aware_chunk.user_project
@@ -256,13 +261,14 @@ class TestAdapterWritesBothMetadataFields:
updatable_docs=[chunk.source_document], id_to_boost_map={}
)
enricher = adapter.prepare_enrichment(
context=context,
result = adapter.build_metadata_aware_chunks(
chunks_with_embeddings=[chunk],
chunk_content_scores=[1.0],
tenant_id=TEST_TENANT_ID,
chunks=[chunk],
context=context,
)
aware_chunk = enricher.enrich_chunk(chunk, 1.0)
aware_chunk = result.chunks[0]
assert aware_chunk.personas == []
assert aware_chunk.user_project == []
@@ -294,11 +300,12 @@ class TestAdapterWritesBothMetadataFields:
updatable_docs=[chunk.source_document], id_to_boost_map={}
)
enricher = adapter.prepare_enrichment(
context=context,
result = adapter.build_metadata_aware_chunks(
chunks_with_embeddings=[chunk],
chunk_content_scores=[1.0],
tenant_id=TEST_TENANT_ID,
chunks=[chunk],
context=context,
)
aware_chunk = enricher.enrich_chunk(chunk, 1.0)
aware_chunk = result.chunks[0]
assert set(aware_chunk.personas) == {persona_a.id, persona_b.id}

View File

@@ -0,0 +1,37 @@
from sqlalchemy import inspect
from sqlalchemy.orm import Session
from onyx.db.chat import create_chat_session
from onyx.db.chat import get_chat_session_by_id
from onyx.db.models import Persona
def test_eager_load_persona_loads_relationships(db_session: Session) -> None:
"""Verify that eager_load_persona pre-loads persona, its collections, and project."""
persona = Persona(name="eager-load-test", description="test")
db_session.add(persona)
db_session.flush()
chat_session = create_chat_session(
db_session=db_session,
description="test",
user_id=None,
persona_id=persona.id,
)
loaded = get_chat_session_by_id(
chat_session_id=chat_session.id,
user_id=None,
db_session=db_session,
eager_load_persona=True,
)
unloaded = inspect(loaded).unloaded
assert "persona" not in unloaded
assert "project" not in unloaded
persona_unloaded = inspect(loaded.persona).unloaded
assert "tools" not in persona_unloaded
assert "user_files" not in persona_unloaded
db_session.rollback()

View File

@@ -1,248 +0,0 @@
"""Shared fixtures for document_index external dependency tests.
Provides Vespa and OpenSearch index setup, tenant context, and chunk helpers.
"""
import os
import time
import uuid
from collections.abc import Generator
from unittest.mock import patch
import httpx
import pytest
from onyx.access.models import DocumentAccess
from onyx.configs.constants import DocumentSource
from onyx.connectors.models import Document
from onyx.db.enums import EmbeddingPrecision
from onyx.document_index.interfaces_new import IndexingMetadata
from onyx.document_index.opensearch.client import wait_for_opensearch_with_timeout
from onyx.document_index.opensearch.opensearch_document_index import (
OpenSearchOldDocumentIndex,
)
from onyx.document_index.vespa.index import VespaIndex
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout
from onyx.indexing.models import ChunkEmbedding
from onyx.indexing.models import DocMetadataAwareIndexChunk
from shared_configs.configs import MULTI_TENANT
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
from shared_configs.contextvars import get_current_tenant_id
from tests.external_dependency_unit.constants import TEST_TENANT_ID
EMBEDDING_DIM = 128
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_chunk(
doc_id: str,
chunk_id: int = 0,
content: str = "test content",
) -> DocMetadataAwareIndexChunk:
"""Create a chunk suitable for external dependency testing (128-dim embeddings)."""
tenant_id = get_current_tenant_id()
access = DocumentAccess.build(
user_emails=[],
user_groups=[],
external_user_emails=[],
external_user_group_ids=[],
is_public=True,
)
embeddings = ChunkEmbedding(
full_embedding=[1.0] + [0.0] * (EMBEDDING_DIM - 1),
mini_chunk_embeddings=[],
)
source_document = Document(
id=doc_id,
semantic_identifier="test_doc",
source=DocumentSource.FILE,
sections=[],
metadata={},
title="test title",
)
return DocMetadataAwareIndexChunk(
tenant_id=tenant_id,
access=access,
document_sets=set(),
user_project=[],
personas=[],
boost=0,
aggregated_chunk_boost_factor=0,
ancestor_hierarchy_node_ids=[],
embeddings=embeddings,
title_embedding=[1.0] + [0.0] * (EMBEDDING_DIM - 1),
source_document=source_document,
title_prefix="",
metadata_suffix_keyword="",
metadata_suffix_semantic="",
contextual_rag_reserved_tokens=0,
doc_summary="",
chunk_context="",
mini_chunk_texts=None,
large_chunk_id=None,
chunk_id=chunk_id,
blurb=content[:50],
content=content,
source_links={0: ""},
image_file_id=None,
section_continuation=False,
)
def make_indexing_metadata(
doc_ids: list[str],
old_counts: list[int],
new_counts: list[int],
) -> IndexingMetadata:
return IndexingMetadata(
doc_id_to_chunk_cnt_diff={
doc_id: IndexingMetadata.ChunkCounts(
old_chunk_cnt=old,
new_chunk_cnt=new,
)
for doc_id, old, new in zip(doc_ids, old_counts, new_counts)
}
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def tenant_context() -> Generator[None, None, None]:
"""Sets up tenant context for testing."""
token = CURRENT_TENANT_ID_CONTEXTVAR.set(TEST_TENANT_ID)
try:
yield
finally:
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
@pytest.fixture(scope="module")
def test_index_name() -> Generator[str, None, None]:
yield f"test_index_{uuid.uuid4().hex[:8]}"
@pytest.fixture(scope="module")
def httpx_client() -> Generator[httpx.Client, None, None]:
client = get_vespa_http_client()
try:
yield client
finally:
client.close()
@pytest.fixture(scope="module")
def vespa_index(
httpx_client: httpx.Client,
tenant_context: None, # noqa: ARG001
test_index_name: str,
) -> Generator[VespaIndex, None, None]:
"""Create a Vespa index, wait for schema readiness, and yield it."""
vespa_idx = VespaIndex(
index_name=test_index_name,
secondary_index_name=None,
large_chunks_enabled=False,
secondary_large_chunks_enabled=None,
multitenant=MULTI_TENANT,
httpx_client=httpx_client,
)
backend_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "..")
)
with patch("os.getcwd", return_value=backend_dir):
vespa_idx.ensure_indices_exist(
primary_embedding_dim=EMBEDDING_DIM,
primary_embedding_precision=EmbeddingPrecision.FLOAT,
secondary_index_embedding_dim=None,
secondary_index_embedding_precision=None,
)
if not wait_for_vespa_with_timeout(wait_limit=90):
pytest.fail("Vespa is not available.")
# Wait until the schema is actually ready for writes on content nodes. We
# probe by attempting a PUT; 200 means the schema is live, 400 means not
# yet. This is only temporary until we entirely move off of Vespa.
probe_doc = {
"fields": {
"document_id": "__probe__",
"chunk_id": 0,
"blurb": "",
"title": "",
"skip_title": True,
"content": "",
"content_summary": "",
"source_type": "file",
"source_links": "null",
"semantic_identifier": "",
"section_continuation": False,
"large_chunk_reference_ids": [],
"metadata": "{}",
"metadata_list": [],
"metadata_suffix": "",
"chunk_context": "",
"doc_summary": "",
"embeddings": {"full_chunk": [1.0] + [0.0] * (EMBEDDING_DIM - 1)},
"access_control_list": {},
"document_sets": {},
"image_file_name": None,
"user_project": [],
"personas": [],
"boost": 0.0,
"aggregated_chunk_boost_factor": 0.0,
"primary_owners": [],
"secondary_owners": [],
}
}
probe_url = (
f"http://localhost:8081/document/v1/default/{test_index_name}/docid/__probe__"
)
schema_ready = False
for _ in range(60):
resp = httpx_client.post(probe_url, json=probe_doc)
if resp.status_code == 200:
schema_ready = True
httpx_client.delete(probe_url)
break
time.sleep(1)
if not schema_ready:
pytest.fail(f"Vespa schema '{test_index_name}' did not become ready in time.")
yield vespa_idx
@pytest.fixture(scope="module")
def opensearch_old_index(
tenant_context: None, # noqa: ARG001
test_index_name: str,
) -> Generator[OpenSearchOldDocumentIndex, None, None]:
"""Create an OpenSearch index via the old adapter and yield it."""
if not wait_for_opensearch_with_timeout():
pytest.fail("OpenSearch is not available.")
opensearch_idx = OpenSearchOldDocumentIndex(
index_name=test_index_name,
embedding_dim=EMBEDDING_DIM,
embedding_precision=EmbeddingPrecision.FLOAT,
secondary_index_name=None,
secondary_embedding_dim=None,
secondary_embedding_precision=None,
large_chunks_enabled=False,
secondary_large_chunks_enabled=None,
multitenant=MULTI_TENANT,
)
opensearch_idx.ensure_indices_exist(
primary_embedding_dim=EMBEDDING_DIM,
primary_embedding_precision=EmbeddingPrecision.FLOAT,
secondary_index_embedding_dim=None,
secondary_index_embedding_precision=None,
)
yield opensearch_idx

View File

@@ -1,227 +0,0 @@
"""External dependency tests for the new DocumentIndex interface.
These tests assume Vespa and OpenSearch are running.
"""
import time
import uuid
from collections.abc import Generator
from collections.abc import Iterator
import httpx
import pytest
from onyx.db.enums import EmbeddingPrecision
from onyx.document_index.interfaces_new import DocumentIndex as DocumentIndexNew
from onyx.document_index.interfaces_new import TenantState
from onyx.document_index.opensearch.opensearch_document_index import (
OpenSearchDocumentIndex,
)
from onyx.document_index.opensearch.opensearch_document_index import (
OpenSearchOldDocumentIndex,
)
from onyx.document_index.vespa.index import VespaIndex
from onyx.document_index.vespa.vespa_document_index import VespaDocumentIndex
from onyx.indexing.models import DocMetadataAwareIndexChunk
from tests.external_dependency_unit.constants import TEST_TENANT_ID
from tests.external_dependency_unit.document_index.conftest import EMBEDDING_DIM
from tests.external_dependency_unit.document_index.conftest import make_chunk
from tests.external_dependency_unit.document_index.conftest import (
make_indexing_metadata,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def vespa_document_index(
vespa_index: VespaIndex, # noqa: ARG001 — ensures schema exists
httpx_client: httpx.Client,
test_index_name: str,
) -> Generator[VespaDocumentIndex, None, None]:
yield VespaDocumentIndex(
index_name=test_index_name,
tenant_state=TenantState(tenant_id=TEST_TENANT_ID, multitenant=False),
large_chunks_enabled=False,
httpx_client=httpx_client,
)
@pytest.fixture(scope="module")
def opensearch_document_index(
opensearch_old_index: OpenSearchOldDocumentIndex, # noqa: ARG001 — ensures index exists
test_index_name: str,
) -> Generator[OpenSearchDocumentIndex, None, None]:
yield OpenSearchDocumentIndex(
tenant_state=TenantState(tenant_id=TEST_TENANT_ID, multitenant=False),
index_name=test_index_name,
embedding_dim=EMBEDDING_DIM,
embedding_precision=EmbeddingPrecision.FLOAT,
)
@pytest.fixture(scope="module")
def document_indices(
vespa_document_index: VespaDocumentIndex,
opensearch_document_index: OpenSearchDocumentIndex,
) -> Generator[list[DocumentIndexNew], None, None]:
yield [opensearch_document_index, vespa_document_index]
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestDocumentIndexNew:
"""Tests the new DocumentIndex interface against real Vespa and OpenSearch."""
def test_index_single_new_doc(
self,
document_indices: list[DocumentIndexNew],
tenant_context: None, # noqa: ARG002
) -> None:
"""Indexing a single new document returns one record with already_existed=False."""
for document_index in document_indices:
doc_id = f"test_single_new_{uuid.uuid4().hex[:8]}"
chunk = make_chunk(doc_id)
metadata = make_indexing_metadata([doc_id], old_counts=[0], new_counts=[1])
results = document_index.index(chunks=[chunk], indexing_metadata=metadata)
assert len(results) == 1
assert results[0].document_id == doc_id
assert results[0].already_existed is False
def test_index_existing_doc_already_existed_true(
self,
document_indices: list[DocumentIndexNew],
tenant_context: None, # noqa: ARG002
) -> None:
"""Re-indexing a doc with previous chunks returns already_existed=True."""
for document_index in document_indices:
doc_id = f"test_existing_{uuid.uuid4().hex[:8]}"
chunk = make_chunk(doc_id)
# First index — brand new document.
metadata_first = make_indexing_metadata(
[doc_id], old_counts=[0], new_counts=[1]
)
document_index.index(chunks=[chunk], indexing_metadata=metadata_first)
# Allow near-real-time indexing to settle (needed for Vespa).
time.sleep(1)
# Re-index — old_chunk_cnt=1 signals the document already existed.
metadata_second = make_indexing_metadata(
[doc_id], old_counts=[1], new_counts=[1]
)
results = document_index.index(
chunks=[chunk], indexing_metadata=metadata_second
)
assert len(results) == 1
assert results[0].already_existed is True
def test_index_multiple_docs(
self,
document_indices: list[DocumentIndexNew],
tenant_context: None, # noqa: ARG002
) -> None:
"""Indexing multiple documents returns one record per unique document."""
for document_index in document_indices:
doc1 = f"test_multi_1_{uuid.uuid4().hex[:8]}"
doc2 = f"test_multi_2_{uuid.uuid4().hex[:8]}"
chunks = [
make_chunk(doc1, chunk_id=0),
make_chunk(doc1, chunk_id=1),
make_chunk(doc2, chunk_id=0),
]
metadata = make_indexing_metadata(
[doc1, doc2], old_counts=[0, 0], new_counts=[2, 1]
)
results = document_index.index(chunks=chunks, indexing_metadata=metadata)
result_map = {r.document_id: r.already_existed for r in results}
assert len(result_map) == 2
assert result_map[doc1] is False
assert result_map[doc2] is False
def test_index_deduplicates_doc_ids_in_results(
self,
document_indices: list[DocumentIndexNew],
tenant_context: None, # noqa: ARG002
) -> None:
"""Multiple chunks from the same document produce only one
DocumentInsertionRecord."""
for document_index in document_indices:
doc_id = f"test_dedup_{uuid.uuid4().hex[:8]}"
chunks = [make_chunk(doc_id, chunk_id=i) for i in range(5)]
metadata = make_indexing_metadata([doc_id], old_counts=[0], new_counts=[5])
results = document_index.index(chunks=chunks, indexing_metadata=metadata)
assert len(results) == 1
assert results[0].document_id == doc_id
def test_index_mixed_new_and_existing_docs(
self,
document_indices: list[DocumentIndexNew],
tenant_context: None, # noqa: ARG002
) -> None:
"""A batch with both new and existing documents returns the correct
already_existed flag for each."""
for document_index in document_indices:
existing_doc = f"test_mixed_exist_{uuid.uuid4().hex[:8]}"
new_doc = f"test_mixed_new_{uuid.uuid4().hex[:8]}"
# Pre-index the existing document.
pre_chunk = make_chunk(existing_doc)
pre_metadata = make_indexing_metadata(
[existing_doc], old_counts=[0], new_counts=[1]
)
document_index.index(chunks=[pre_chunk], indexing_metadata=pre_metadata)
time.sleep(1)
# Now index a batch with the existing doc and a new doc.
chunks = [
make_chunk(existing_doc, chunk_id=0),
make_chunk(new_doc, chunk_id=0),
]
metadata = make_indexing_metadata(
[existing_doc, new_doc], old_counts=[1, 0], new_counts=[1, 1]
)
results = document_index.index(chunks=chunks, indexing_metadata=metadata)
result_map = {r.document_id: r.already_existed for r in results}
assert len(result_map) == 2
assert result_map[existing_doc] is True
assert result_map[new_doc] is False
def test_index_accepts_generator(
self,
document_indices: list[DocumentIndexNew],
tenant_context: None, # noqa: ARG002
) -> None:
"""index() accepts a generator (any iterable), not just a list."""
for document_index in document_indices:
doc_id = f"test_gen_{uuid.uuid4().hex[:8]}"
metadata = make_indexing_metadata([doc_id], old_counts=[0], new_counts=[3])
def chunk_gen() -> Iterator[DocMetadataAwareIndexChunk]:
for i in range(3):
yield make_chunk(doc_id, chunk_id=i)
results = document_index.index(
chunks=chunk_gen(), indexing_metadata=metadata
)
assert len(results) == 1
assert results[0].document_id == doc_id
assert results[0].already_existed is False

View File

@@ -1,42 +1,275 @@
"""External dependency tests for the old DocumentIndex interface.
These tests assume Vespa and OpenSearch are running.
TODO(ENG-3764)(andrei): Consolidate some of these test fixtures.
"""
import os
import time
import uuid
from collections.abc import Generator
from collections.abc import Iterator
from unittest.mock import patch
import httpx
import pytest
from onyx.access.models import DocumentAccess
from onyx.configs.constants import DocumentSource
from onyx.connectors.models import Document
from onyx.context.search.models import IndexFilters
from onyx.db.enums import EmbeddingPrecision
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.interfaces import IndexBatchParams
from onyx.document_index.interfaces import VespaChunkRequest
from onyx.document_index.interfaces import VespaDocumentUserFields
from onyx.document_index.opensearch.client import wait_for_opensearch_with_timeout
from onyx.document_index.opensearch.opensearch_document_index import (
OpenSearchOldDocumentIndex,
)
from onyx.document_index.vespa.index import VespaIndex
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout
from onyx.indexing.models import ChunkEmbedding
from onyx.indexing.models import DocMetadataAwareIndexChunk
from shared_configs.configs import MULTI_TENANT
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
from shared_configs.contextvars import get_current_tenant_id
from tests.external_dependency_unit.document_index.conftest import make_chunk
from tests.external_dependency_unit.constants import TEST_TENANT_ID
@pytest.fixture(scope="module")
def opensearch_available() -> Generator[None, None, None]:
"""Verifies OpenSearch is running, fails the test if not."""
if not wait_for_opensearch_with_timeout():
pytest.fail("OpenSearch is not available.")
yield # Test runs here.
@pytest.fixture(scope="module")
def test_index_name() -> Generator[str, None, None]:
yield f"test_index_{uuid.uuid4().hex[:8]}" # Test runs here.
@pytest.fixture(scope="module")
def tenant_context() -> Generator[None, None, None]:
"""Sets up tenant context for testing."""
token = CURRENT_TENANT_ID_CONTEXTVAR.set(TEST_TENANT_ID)
try:
yield # Test runs here.
finally:
# Reset the tenant context after the test
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
@pytest.fixture(scope="module")
def httpx_client() -> Generator[httpx.Client, None, None]:
client = get_vespa_http_client()
try:
yield client
finally:
client.close()
@pytest.fixture(scope="module")
def vespa_document_index(
httpx_client: httpx.Client,
tenant_context: None, # noqa: ARG001
test_index_name: str,
) -> Generator[VespaIndex, None, None]:
vespa_index = VespaIndex(
index_name=test_index_name,
secondary_index_name=None,
large_chunks_enabled=False,
secondary_large_chunks_enabled=None,
multitenant=MULTI_TENANT,
httpx_client=httpx_client,
)
backend_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "..")
)
with patch("os.getcwd", return_value=backend_dir):
vespa_index.ensure_indices_exist(
primary_embedding_dim=128,
primary_embedding_precision=EmbeddingPrecision.FLOAT,
secondary_index_embedding_dim=None,
secondary_index_embedding_precision=None,
)
# Verify Vespa is running, fails the test if not. Try 90 seconds for testing
# in CI. We have to do this here because this endpoint only becomes live
# once we create an index.
if not wait_for_vespa_with_timeout(wait_limit=90):
pytest.fail("Vespa is not available.")
# Wait until the schema is actually ready for writes on content nodes. We
# probe by attempting a PUT; 200 means the schema is live, 400 means not
# yet. This is so scuffed but running the test is really flakey otherwise;
# this is only temporary until we entirely move off of Vespa.
probe_doc = {
"fields": {
"document_id": "__probe__",
"chunk_id": 0,
"blurb": "",
"title": "",
"skip_title": True,
"content": "",
"content_summary": "",
"source_type": "file",
"source_links": "null",
"semantic_identifier": "",
"section_continuation": False,
"large_chunk_reference_ids": [],
"metadata": "{}",
"metadata_list": [],
"metadata_suffix": "",
"chunk_context": "",
"doc_summary": "",
"embeddings": {"full_chunk": [1.0] + [0.0] * 127},
"access_control_list": {},
"document_sets": {},
"image_file_name": None,
"user_project": [],
"personas": [],
"boost": 0.0,
"aggregated_chunk_boost_factor": 0.0,
"primary_owners": [],
"secondary_owners": [],
}
}
schema_ready = False
probe_url = (
f"http://localhost:8081/document/v1/default/{test_index_name}/docid/__probe__"
)
for _ in range(60):
resp = httpx_client.post(probe_url, json=probe_doc)
if resp.status_code == 200:
schema_ready = True
# Clean up the probe document.
httpx_client.delete(probe_url)
break
time.sleep(1)
if not schema_ready:
pytest.fail(f"Vespa schema '{test_index_name}' did not become ready in time.")
yield vespa_index # Test runs here.
# TODO(ENG-3765)(andrei): Explicitly cleanup index. Not immediately
# pressing; in CI we should be using fresh instances of dependencies each
# time anyway.
@pytest.fixture(scope="module")
def opensearch_document_index(
opensearch_available: None, # noqa: ARG001
tenant_context: None, # noqa: ARG001
test_index_name: str,
) -> Generator[OpenSearchOldDocumentIndex, None, None]:
opensearch_index = OpenSearchOldDocumentIndex(
index_name=test_index_name,
embedding_dim=128,
embedding_precision=EmbeddingPrecision.FLOAT,
secondary_index_name=None,
secondary_embedding_dim=None,
secondary_embedding_precision=None,
large_chunks_enabled=False,
secondary_large_chunks_enabled=None,
multitenant=MULTI_TENANT,
)
opensearch_index.ensure_indices_exist(
primary_embedding_dim=128,
primary_embedding_precision=EmbeddingPrecision.FLOAT,
secondary_index_embedding_dim=None,
secondary_index_embedding_precision=None,
)
yield opensearch_index # Test runs here.
# TODO(ENG-3765)(andrei): Explicitly cleanup index. Not immediately
# pressing; in CI we should be using fresh instances of dependencies each
# time anyway.
@pytest.fixture(scope="module")
def document_indices(
vespa_index: VespaIndex,
opensearch_old_index: OpenSearchOldDocumentIndex,
vespa_document_index: VespaIndex,
opensearch_document_index: OpenSearchOldDocumentIndex,
) -> Generator[list[DocumentIndex], None, None]:
# Ideally these are parametrized; doing so with pytest fixtures is tricky.
yield [opensearch_old_index, vespa_index]
yield [opensearch_document_index, vespa_document_index] # Test runs here.
@pytest.fixture(scope="function")
def chunks(
tenant_context: None, # noqa: ARG001
) -> Generator[list[DocMetadataAwareIndexChunk], None, None]:
yield [make_chunk("test_doc", chunk_id=i) for i in range(5)]
result = []
chunk_count = 5
doc_id = "test_doc"
tenant_id = get_current_tenant_id()
access = DocumentAccess.build(
user_emails=[],
user_groups=[],
external_user_emails=[],
external_user_group_ids=[],
is_public=True,
)
document_sets: set[str] = set()
user_project: list[int] = list()
personas: list[int] = list()
boost = 0
blurb = "blurb"
content = "content"
title_prefix = ""
doc_summary = ""
chunk_context = ""
title_embedding = [1.0] + [0] * 127
# Full 0 vectors are not supported for cos similarity.
embeddings = ChunkEmbedding(
full_embedding=[1.0] + [0] * 127, mini_chunk_embeddings=[]
)
source_document = Document(
id=doc_id,
semantic_identifier="semantic identifier",
source=DocumentSource.FILE,
sections=[],
metadata={},
title="title",
)
metadata_suffix_keyword = ""
image_file_id = None
source_links: dict[int, str] = {0: ""}
ancestor_hierarchy_node_ids: list[int] = []
for i in range(chunk_count):
result.append(
DocMetadataAwareIndexChunk(
tenant_id=tenant_id,
access=access,
document_sets=document_sets,
user_project=user_project,
personas=personas,
boost=boost,
aggregated_chunk_boost_factor=0,
ancestor_hierarchy_node_ids=ancestor_hierarchy_node_ids,
embeddings=embeddings,
title_embedding=title_embedding,
source_document=source_document,
title_prefix=title_prefix,
metadata_suffix_keyword=metadata_suffix_keyword,
metadata_suffix_semantic="",
contextual_rag_reserved_tokens=0,
doc_summary=doc_summary,
chunk_context=chunk_context,
mini_chunk_texts=None,
large_chunk_id=None,
chunk_id=i,
blurb=blurb,
content=content,
source_links=source_links,
image_file_id=image_file_id,
section_continuation=False,
)
)
yield result # Test runs here.
@pytest.fixture(scope="function")
@@ -103,8 +336,8 @@ class TestDocumentIndexOld:
project_persona_filters = IndexFilters(
access_control_list=None,
tenant_id=tenant_id,
project_id_filter=1,
persona_id_filter=2,
project_id=1,
persona_id=2,
# We need this even though none of the chunks belong to a
# document set because project_id and persona_id are only
# additive filters in the event the agent has knowledge scope;
@@ -167,29 +400,3 @@ class TestDocumentIndexOld:
batch_retrieval=True,
)
assert len(inference_chunks) == 0
def test_index_accepts_generator(
self,
document_indices: list[DocumentIndex],
tenant_context: None, # noqa: ARG002
) -> None:
"""index() accepts a generator (any iterable), not just a list."""
for document_index in document_indices:
def chunk_gen() -> Iterator[DocMetadataAwareIndexChunk]:
for i in range(3):
yield make_chunk("test_doc_gen", chunk_id=i)
index_batch_params = IndexBatchParams(
doc_id_to_previous_chunk_cnt={"test_doc_gen": 0},
doc_id_to_new_chunk_cnt={"test_doc_gen": 3},
tenant_id=get_current_tenant_id(),
large_chunks_enabled=False,
)
results = document_index.index(chunk_gen(), index_batch_params)
assert len(results) == 1
record = results.pop()
assert record.document_id == "test_doc_gen"
assert record.already_existed is False

View File

@@ -52,7 +52,7 @@ def _create_test_persona_with_mcp_tool(
document_sets=[],
users=[user],
groups=[],
is_listed=True,
is_visible=True,
is_public=True,
display_priority=None,
starter_messages=None,
@@ -368,10 +368,9 @@ class TestMCPPassThroughOAuth:
def mock_call_mcp_tool(
server_url: str, # noqa: ARG001
tool_name: str, # noqa: ARG001
arguments: dict[str, Any], # noqa: ARG001
kwargs: dict[str, Any], # noqa: ARG001
connection_headers: dict[str, str],
transport: MCPTransport, # noqa: ARG001
auth: Any = None, # noqa: ARG001
) -> dict[str, Any]:
captured_headers.update(connection_headers)
return mocked_response

View File

@@ -62,7 +62,7 @@ def _create_test_persona(db_session: Session, user: User, tools: list[Tool]) ->
document_sets=[],
users=[user],
groups=[],
is_listed=True,
is_visible=True,
is_public=True,
display_priority=None,
starter_messages=None,

View File

@@ -53,7 +53,7 @@ class PersonaManager:
label_ids=label_ids or [],
user_file_ids=user_file_ids or [],
display_priority=display_priority,
is_featured=featured,
featured=featured,
)
response = requests.post(
@@ -79,7 +79,7 @@ class PersonaManager:
users=users or [],
groups=groups or [],
label_ids=label_ids or [],
is_featured=featured,
featured=featured,
)
@staticmethod
@@ -122,7 +122,7 @@ class PersonaManager:
users=[UUID(user) for user in (users or persona.users)],
groups=groups or persona.groups,
label_ids=label_ids or persona.label_ids,
is_featured=featured if featured is not None else persona.is_featured,
featured=featured if featured is not None else persona.featured,
)
response = requests.patch(
@@ -152,7 +152,7 @@ class PersonaManager:
users=[user["email"] for user in updated_persona_data["users"]],
groups=updated_persona_data["groups"],
label_ids=[label["id"] for label in updated_persona_data["labels"]],
is_featured=updated_persona_data["is_featured"],
featured=updated_persona_data["featured"],
)
@staticmethod
@@ -205,13 +205,9 @@ class PersonaManager:
mismatches.append(
("is_public", persona.is_public, fetched_persona.is_public)
)
if fetched_persona.is_featured != persona.is_featured:
if fetched_persona.featured != persona.featured:
mismatches.append(
(
"is_featured",
persona.is_featured,
fetched_persona.is_featured,
)
("featured", persona.featured, fetched_persona.featured)
)
if (
fetched_persona.llm_model_provider_override

View File

@@ -169,7 +169,7 @@ class DATestPersona(BaseModel):
users: list[str]
groups: list[int]
label_ids: list[int]
is_featured: bool = False
featured: bool = False
# Embedded prompt fields (no longer separate prompt_ids)
system_prompt: str | None = None

View File

@@ -1,237 +0,0 @@
"""
Integration tests for the "Last Indexed" time displayed on both the
per-connector detail page and the all-connectors listing page.
Expected behavior: "Last Indexed" = time_started of the most recent
successful index attempt for the cc pair, regardless of pagination.
Edge cases:
1. First page of index attempts is entirely errors — last_indexed should
still reflect the older successful attempt beyond page 1.
2. Credential swap — successful attempts, then failures after a
"credential change"; last_indexed should reflect the most recent
successful attempt.
3. Mix of statuses — only the most recent successful attempt matters.
4. COMPLETED_WITH_ERRORS counts as a success for last_indexed purposes.
"""
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from onyx.db.models import IndexingStatus
from onyx.server.documents.models import CCPairFullInfo
from onyx.server.documents.models import ConnectorIndexingStatusLite
from tests.integration.common_utils.managers.cc_pair import CCPairManager
from tests.integration.common_utils.managers.connector import ConnectorManager
from tests.integration.common_utils.managers.credential import CredentialManager
from tests.integration.common_utils.managers.index_attempt import IndexAttemptManager
from tests.integration.common_utils.managers.user import UserManager
from tests.integration.common_utils.test_models import DATestCCPair
from tests.integration.common_utils.test_models import DATestUser
def _wait_for_real_success(
cc_pair: DATestCCPair,
admin: DATestUser,
) -> None:
"""Wait for the initial index attempt to complete successfully."""
CCPairManager.wait_for_indexing_completion(
cc_pair,
after=datetime(2000, 1, 1, tzinfo=timezone.utc),
user_performing_action=admin,
timeout=120,
)
def _get_detail(cc_pair_id: int, admin: DATestUser) -> CCPairFullInfo:
result = CCPairManager.get_single(cc_pair_id, admin)
assert result is not None
return result
def _get_listing(cc_pair_id: int, admin: DATestUser) -> ConnectorIndexingStatusLite:
result = CCPairManager.get_indexing_status_by_id(cc_pair_id, admin)
assert result is not None
return result
def test_last_indexed_first_page_all_errors(reset: None) -> None:  # noqa: ARG001
    """When the first page of index attempts is entirely errors but an
    older successful attempt exists, both the detail page and the listing
    page should still show the time of that successful attempt.

    The detail page UI uses page size 8. We insert 10 failed attempts
    more recent than the initial success to push the success off page 1.
    """
    admin_user = UserManager.create(name="admin_first_page_errors")
    pair = CCPairManager.create_from_scratch(user_performing_action=admin_user)
    _wait_for_real_success(pair, admin_user)

    # Sanity check: the initial successful run populated last_success.
    baseline = _get_listing(pair.id, admin_user)
    assert baseline.last_success is not None

    # Inject 10 recent failures so the success falls off page 1 (page size 8).
    IndexAttemptManager.create_test_index_attempts(
        num_attempts=10,
        cc_pair_id=pair.id,
        status=IndexingStatus.FAILED,
        error_msg="simulated failure",
        base_time=datetime.now(tz=timezone.utc),
    )

    detail_view = _get_detail(pair.id, admin_user)
    listing_view = _get_listing(pair.id, admin_user)

    assert (
        detail_view.last_indexed is not None
    ), "Detail page last_indexed is None even though a successful attempt exists"
    assert (
        listing_view.last_success is not None
    ), "Listing page last_success is None even though a successful attempt exists"
    # Both surfaces must agree
    assert detail_view.last_indexed == listing_view.last_success, (
        f"Detail last_indexed={detail_view.last_indexed} != "
        f"listing last_success={listing_view.last_success}"
    )
def test_last_indexed_credential_swap_scenario(reset: None) -> None:  # noqa: ARG001
    """Perform an actual credential swap: create connector + cred1 (cc_pair_1),
    wait for success, then associate a new cred2 with the same connector
    (cc_pair_2), wait for that to succeed, and inject failures on cc_pair_2.

    cc_pair_2's last_indexed must reflect cc_pair_2's own success, not
    cc_pair_1's older one. Both the detail page and listing page must agree.
    """
    admin_user = UserManager.create(name="admin_cred_swap")

    # First pairing: one connector + its original credential, indexed OK.
    shared_connector = ConnectorManager.create(user_performing_action=admin_user)
    original_cred = CredentialManager.create(user_performing_action=admin_user)
    first_pair = CCPairManager.create(
        connector_id=shared_connector.id,
        credential_id=original_cred.id,
        user_performing_action=admin_user,
    )
    _wait_for_real_success(first_pair, admin_user)

    # Second pairing: same connector, replacement credential (the "swap").
    replacement_cred = CredentialManager.create(
        user_performing_action=admin_user, name="swapped-cred"
    )
    second_pair = CCPairManager.create(
        connector_id=shared_connector.id,
        credential_id=replacement_cred.id,
        user_performing_action=admin_user,
    )
    _wait_for_real_success(second_pair, admin_user)

    assert _get_listing(second_pair.id, admin_user).last_success is not None

    # Simulate the replacement credential going bad after its initial success.
    IndexAttemptManager.create_test_index_attempts(
        num_attempts=10,
        cc_pair_id=second_pair.id,
        status=IndexingStatus.FAILED,
        error_msg="credential expired",
        base_time=datetime.now(tz=timezone.utc),
    )

    detail_view = _get_detail(second_pair.id, admin_user)
    listing_view = _get_listing(second_pair.id, admin_user)
    assert detail_view.last_indexed is not None
    assert listing_view.last_success is not None
    assert detail_view.last_indexed == listing_view.last_success, (
        f"Detail last_indexed={detail_view.last_indexed} != "
        f"listing last_success={listing_view.last_success}"
    )
def test_last_indexed_mixed_statuses(reset: None) -> None:  # noqa: ARG001
    """Mix of in_progress, failed, and successful attempts. Only the most
    recent successful attempt's time matters."""
    admin_user = UserManager.create(name="admin_mixed")
    pair = CCPairManager.create_from_scratch(user_performing_action=admin_user)
    _wait_for_real_success(pair, admin_user)

    reference = datetime.now(tz=timezone.utc)

    # An older success (5 hours back) that should be superseded.
    IndexAttemptManager.create_test_index_attempts(
        num_attempts=1,
        cc_pair_id=pair.id,
        status=IndexingStatus.SUCCESS,
        base_time=reference - timedelta(hours=5),
    )
    # A cluster of failures more recent than that success (3 hours back).
    IndexAttemptManager.create_test_index_attempts(
        num_attempts=3,
        cc_pair_id=pair.id,
        status=IndexingStatus.FAILED,
        error_msg="transient failure",
        base_time=reference - timedelta(hours=3),
    )
    # A still-running attempt (1 hour back) — must not count as a success.
    IndexAttemptManager.create_test_index_attempts(
        num_attempts=1,
        cc_pair_id=pair.id,
        status=IndexingStatus.IN_PROGRESS,
        base_time=reference - timedelta(hours=1),
    )

    detail_view = _get_detail(pair.id, admin_user)
    listing_view = _get_listing(pair.id, admin_user)
    assert detail_view.last_indexed is not None
    assert listing_view.last_success is not None
    assert detail_view.last_indexed == listing_view.last_success, (
        f"Detail last_indexed={detail_view.last_indexed} != "
        f"listing last_success={listing_view.last_success}"
    )
def test_last_indexed_completed_with_errors(reset: None) -> None:  # noqa: ARG001
    """COMPLETED_WITH_ERRORS is treated as a successful attempt (matching
    IndexingStatus.is_successful()). When it is the most recent "success"
    and later attempts all failed, both surfaces should reflect its time."""
    admin_user = UserManager.create(name="admin_completed_errors")
    pair = CCPairManager.create_from_scratch(user_performing_action=admin_user)
    _wait_for_real_success(pair, admin_user)

    reference = datetime.now(tz=timezone.utc)

    # A partially-successful run 2 hours back.
    # NOTE(review): this timestamp predates the initial real success above, so
    # the COMPLETED_WITH_ERRORS attempt may not actually be the *most recent*
    # success — TODO confirm against create_test_index_attempts semantics.
    IndexAttemptManager.create_test_index_attempts(
        num_attempts=1,
        cc_pair_id=pair.id,
        status=IndexingStatus.COMPLETED_WITH_ERRORS,
        base_time=reference - timedelta(hours=2),
    )
    # 10 failures after — push everything else off page 1.
    IndexAttemptManager.create_test_index_attempts(
        num_attempts=10,
        cc_pair_id=pair.id,
        status=IndexingStatus.FAILED,
        error_msg="post-partial failure",
        base_time=reference,
    )

    detail_view = _get_detail(pair.id, admin_user)
    listing_view = _get_listing(pair.id, admin_user)
    assert (
        detail_view.last_indexed is not None
    ), "COMPLETED_WITH_ERRORS should count as a success for last_indexed"
    assert (
        listing_view.last_success is not None
    ), "COMPLETED_WITH_ERRORS should count as a success for last_success"
    assert detail_view.last_indexed == listing_view.last_success, (
        f"Detail last_indexed={detail_view.last_indexed} != "
        f"listing last_success={listing_view.last_success}"
    )

View File

@@ -35,8 +35,8 @@ def _create_test_persona(db_session: Session, persona_id: int, name: str) -> Per
id=persona_id,
name=name,
description="Test persona for Discord bot tests",
is_listed=True,
is_featured=False,
is_visible=True,
featured=False,
deleted=False,
builtin_persona=False,
)

View File

@@ -25,7 +25,7 @@ def test_cold_startup_default_assistant() -> None:
result = db_session.execute(
text(
"""
SELECT id, name, builtin_persona, is_featured, deleted
SELECT id, name, builtin_persona, featured, deleted
FROM persona
WHERE builtin_persona = true
ORDER BY id
@@ -40,7 +40,7 @@ def test_cold_startup_default_assistant() -> None:
assert default[0] == 0, "Default assistant should have ID 0"
assert default[1] == "Assistant", "Should be named 'Assistant'"
assert default[2] is True, "Should be builtin"
assert default[3] is True, "Should be is_featured"
assert default[3] is True, "Should be featured"
assert default[4] is False, "Should not be deleted"
# Check tools are properly associated

View File

@@ -7,7 +7,6 @@ import json
import pytest
from sqlalchemy import text
from onyx.configs.constants import ANONYMOUS_USER_UUID
from onyx.configs.constants import DEFAULT_BOOST
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from tests.integration.common_utils.reset import downgrade_postgres
@@ -238,6 +237,7 @@ def test_jira_connector_migration() -> None:
upgrade_postgres(
database="postgres", config_name="alembic", revision="da42808081e3"
)
# Verify the upgrade was applied correctly
with get_session_with_current_tenant() as db_session:
results = db_session.execute(
@@ -322,165 +322,3 @@ def test_jira_connector_migration() -> None:
== "https://example.atlassian.net/projects/TEST"
)
assert config_2["batch_size"] == 50
def test_anonymous_user_migration_dedupes_null_notifications() -> None:
downgrade_postgres(
database="postgres", config_name="alembic", revision="base", clear_data=True
)
upgrade_postgres(
database="postgres",
config_name="alembic",
revision="f7ca3e2f45d9",
)
with get_session_with_current_tenant() as db_session:
db_session.execute(
text(
"""
INSERT INTO notification (
id,
notif_type,
user_id,
dismissed,
last_shown,
first_shown,
title,
description,
additional_data
)
VALUES
(
1,
'RELEASE_NOTES',
NULL,
FALSE,
NOW(),
NOW(),
'Onyx v2.10.0 is available!',
'Check out what''s new in v2.10.0',
'{"version":"v2.10.0","link":"https://docs.onyx.app/changelog#v2-10-0"}'::jsonb
),
(
2,
'RELEASE_NOTES',
NULL,
FALSE,
NOW(),
NOW(),
'Onyx v2.10.0 is available!',
'Check out what''s new in v2.10.0',
'{"version":"v2.10.0","link":"https://docs.onyx.app/changelog#v2-10-0"}'::jsonb
)
"""
)
)
db_session.commit()
upgrade_postgres(
database="postgres", config_name="alembic", revision="e7f8a9b0c1d2"
)
with get_session_with_current_tenant() as db_session:
notifications = db_session.execute(
text(
"""
SELECT id, user_id
FROM notification
ORDER BY id
"""
)
).fetchall()
anonymous_user = db_session.execute(
text(
"""
SELECT id, email, role
FROM "user"
WHERE id = :user_id
"""
),
{"user_id": ANONYMOUS_USER_UUID},
).fetchone()
assert len(notifications) == 1
assert notifications[0].id == 2 # Higher id wins when timestamps are equal
assert str(notifications[0].user_id) == ANONYMOUS_USER_UUID
assert anonymous_user is not None
assert anonymous_user.email == "anonymous@onyx.app"
assert anonymous_user.role == "LIMITED"
def test_anonymous_user_migration_collision_with_existing_anonymous_notification() -> (
None
):
"""Test that a NULL-owned notification that collides with an already-existing
anonymous-owned notification is removed during migration."""
downgrade_postgres(
database="postgres", config_name="alembic", revision="base", clear_data=True
)
upgrade_postgres(
database="postgres",
config_name="alembic",
revision="f7ca3e2f45d9",
)
with get_session_with_current_tenant() as db_session:
# Create the anonymous user early so we can insert a notification owned by it
db_session.execute(
text(
"""
INSERT INTO "user" (id, email, hashed_password, is_active, is_superuser, is_verified, role)
VALUES (:id, 'anonymous@onyx.app', '', TRUE, FALSE, TRUE, 'LIMITED')
ON CONFLICT (id) DO NOTHING
"""
),
{"id": ANONYMOUS_USER_UUID},
)
# Insert an anonymous-owned notification (already migrated in a prior partial run)
db_session.execute(
text(
"""
INSERT INTO notification (
id, notif_type, user_id, dismissed, last_shown, first_shown,
title, description, additional_data
)
VALUES
(
1, 'RELEASE_NOTES', :user_id, FALSE, NOW(), NOW(),
'Onyx v2.10.0 is available!',
'Check out what''s new in v2.10.0',
'{"version":"v2.10.0","link":"https://docs.onyx.app/changelog#v2-10-0"}'::jsonb
),
(
2, 'RELEASE_NOTES', NULL, FALSE, NOW(), NOW(),
'Onyx v2.10.0 is available!',
'Check out what''s new in v2.10.0',
'{"version":"v2.10.0","link":"https://docs.onyx.app/changelog#v2-10-0"}'::jsonb
)
"""
),
{"user_id": ANONYMOUS_USER_UUID},
)
db_session.commit()
upgrade_postgres(
database="postgres", config_name="alembic", revision="e7f8a9b0c1d2"
)
with get_session_with_current_tenant() as db_session:
notifications = db_session.execute(
text(
"""
SELECT id, user_id
FROM notification
ORDER BY id
"""
)
).fetchall()
# Only the original anonymous-owned notification should remain;
# the NULL-owned duplicate should have been deleted
assert len(notifications) == 1
assert notifications[0].id == 1
assert str(notifications[0].user_id) == ANONYMOUS_USER_UUID

View File

@@ -33,8 +33,8 @@ def test_unified_assistant(
"search, web browsing, and image generation"
in unified_assistant.description.lower()
)
assert unified_assistant.is_featured is True
assert unified_assistant.is_listed is True
assert unified_assistant.featured is True
assert unified_assistant.is_visible is True
# Verify tools
tools = unified_assistant.tools

View File

@@ -86,7 +86,7 @@ async def test_get_or_create_user_skips_inactive(
"""Inactive users should not be re-authenticated via JWT."""
monkeypatch.setattr(users_module, "TRACK_EXTERNAL_IDP_EXPIRY", True)
monkeypatch.setattr(users_module, "verify_email_is_invited", lambda _: None)
monkeypatch.setattr(users_module, "verify_email_domain", lambda *_a, **_kw: None)
monkeypatch.setattr(users_module, "verify_email_domain", lambda _: None)
email = "inactive@example.com"
payload: dict[str, Any] = {"email": email}
@@ -126,7 +126,7 @@ async def test_get_or_create_user_handles_race_conditions(
"""If provisioning races, newly inactive users should still be blocked."""
monkeypatch.setattr(users_module, "TRACK_EXTERNAL_IDP_EXPIRY", True)
monkeypatch.setattr(users_module, "verify_email_is_invited", lambda _: None)
monkeypatch.setattr(users_module, "verify_email_domain", lambda *_a, **_kw: None)
monkeypatch.setattr(users_module, "verify_email_domain", lambda _: None)
email = "race@example.com"
payload: dict[str, Any] = {"email": email}
@@ -182,7 +182,7 @@ async def test_get_or_create_user_provisions_new_user(
monkeypatch.setattr(users_module, "TRACK_EXTERNAL_IDP_EXPIRY", False)
monkeypatch.setattr(users_module, "generate_password", lambda: "TempPass123!")
monkeypatch.setattr(users_module, "verify_email_is_invited", lambda _: None)
monkeypatch.setattr(users_module, "verify_email_domain", lambda *_a, **_kw: None)
monkeypatch.setattr(users_module, "verify_email_domain", lambda _: None)
recorded: dict[str, Any] = {}

View File

@@ -15,11 +15,11 @@ from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from fastapi import HTTPException
from onyx.auth.schemas import UserCreate
from onyx.auth.users import UserManager
from onyx.configs.constants import AuthType
from onyx.error_handling.exceptions import OnyxError
# Note: Only async test methods are marked with @pytest.mark.asyncio individually
# to avoid warnings on synchronous tests
@@ -89,11 +89,11 @@ class TestDisposableEmailValidation:
user_manager = UserManager(MagicMock())
# Execute & Assert
with pytest.raises(OnyxError) as exc:
with pytest.raises(HTTPException) as exc:
await user_manager.create(mock_user_create)
assert exc.value.status_code == 400
assert "Disposable email" in exc.value.detail
assert "Disposable email" in str(exc.value.detail)
# Verify we never got to tenant provisioning
mock_fetch_ee.assert_not_called()
@@ -138,9 +138,7 @@ class TestDisposableEmailValidation:
pass # We just want to verify domain check passed
# Verify domain validation was called
mock_verify_domain.assert_called_once_with(
mock_user_create.email, is_registration=True
)
mock_verify_domain.assert_called_once_with(mock_user_create.email)
class TestMultiTenantInviteLogic:
@@ -333,7 +331,7 @@ class TestSAMLOIDCBehavior:
mock_get_invited.return_value = ["allowed@example.com"]
# Execute & Assert
with pytest.raises(OnyxError) as exc:
with pytest.raises(HTTPException) as exc:
verify_email_is_invited("newuser@example.com")
assert exc.value.status_code == 403
@@ -387,7 +385,7 @@ class TestWhitelistBehavior:
mock_get_invited.return_value = ["allowed@example.com"]
# Execute & Assert
with pytest.raises(OnyxError) as exc:
with pytest.raises(HTTPException) as exc:
verify_email_is_invited("notallowed@example.com")
assert exc.value.status_code == 403
@@ -422,7 +420,7 @@ class TestSeatLimitEnforcement:
"onyx.auth.users.fetch_ee_implementation_or_noop",
return_value=lambda *_a, **_kw: seat_result,
):
with pytest.raises(OnyxError) as exc:
with pytest.raises(HTTPException) as exc:
enforce_seat_limit(MagicMock())
assert exc.value.status_code == 402
@@ -492,9 +490,7 @@ class TestCaseInsensitiveEmailMatching:
pass
# Verify flow
mock_verify_domain.assert_called_once_with(
user_create.email, is_registration=True
)
mock_verify_domain.assert_called_once_with(user_create.email)
@patch("onyx.auth.users.is_disposable_email")
@patch("onyx.auth.users.verify_email_domain")
@@ -544,7 +540,5 @@ class TestCaseInsensitiveEmailMatching:
pass
# Verify flow
mock_verify_domain.assert_called_once_with(
mock_user_create.email, is_registration=True
)
mock_verify_domain.assert_called_once_with(mock_user_create.email)
mock_verify_invited.assert_called_once() # Existing tenant = invite needed

View File

@@ -1,9 +1,9 @@
import pytest
from fastapi import HTTPException
import onyx.auth.users as users
from onyx.auth.users import verify_email_domain
from onyx.configs.constants import AuthType
from onyx.error_handling.exceptions import OnyxError
def test_verify_email_domain_allows_case_insensitive_match(
@@ -21,7 +21,7 @@ def test_verify_email_domain_rejects_non_whitelisted_domain(
) -> None:
monkeypatch.setattr(users, "VALID_EMAIL_DOMAINS", ["example.com"], raising=False)
with pytest.raises(OnyxError) as exc:
with pytest.raises(HTTPException) as exc:
verify_email_domain("user@another.com")
assert exc.value.status_code == 400
assert "Email domain is not valid" in exc.value.detail
@@ -32,7 +32,7 @@ def test_verify_email_domain_invalid_email_format(
) -> None:
monkeypatch.setattr(users, "VALID_EMAIL_DOMAINS", ["example.com"], raising=False)
with pytest.raises(OnyxError) as exc:
with pytest.raises(HTTPException) as exc:
verify_email_domain("userexample.com") # missing '@'
assert exc.value.status_code == 400
assert "Email is not valid" in exc.value.detail
@@ -44,10 +44,10 @@ def test_verify_email_domain_rejects_plus_addressing(
monkeypatch.setattr(users, "VALID_EMAIL_DOMAINS", [], raising=False)
monkeypatch.setattr(users, "AUTH_TYPE", AuthType.CLOUD, raising=False)
with pytest.raises(OnyxError) as exc:
with pytest.raises(HTTPException) as exc:
verify_email_domain("user+tag@gmail.com")
assert exc.value.status_code == 400
assert "'+'" in exc.value.detail
assert "'+'" in str(exc.value.detail)
def test_verify_email_domain_allows_plus_for_onyx_app(
@@ -60,53 +60,13 @@ def test_verify_email_domain_allows_plus_for_onyx_app(
verify_email_domain("user+tag@onyx.app")
def test_verify_email_domain_rejects_dotted_gmail_on_registration(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(users, "VALID_EMAIL_DOMAINS", [], raising=False)
monkeypatch.setattr(users, "AUTH_TYPE", AuthType.CLOUD, raising=False)
with pytest.raises(OnyxError) as exc:
verify_email_domain("first.last@gmail.com", is_registration=True)
assert exc.value.status_code == 400
assert "'.'" in exc.value.detail
def test_verify_email_domain_dotted_gmail_allowed_when_not_registration(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(users, "VALID_EMAIL_DOMAINS", [], raising=False)
monkeypatch.setattr(users, "AUTH_TYPE", AuthType.CLOUD, raising=False)
# Existing user signing in — should not be blocked
verify_email_domain("first.last@gmail.com", is_registration=False)
def test_verify_email_domain_allows_dotted_non_gmail_on_registration(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(users, "VALID_EMAIL_DOMAINS", [], raising=False)
monkeypatch.setattr(users, "AUTH_TYPE", AuthType.CLOUD, raising=False)
verify_email_domain("first.last@example.com", is_registration=True)
def test_verify_email_domain_dotted_gmail_allowed_when_not_cloud(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(users, "VALID_EMAIL_DOMAINS", [], raising=False)
monkeypatch.setattr(users, "AUTH_TYPE", AuthType.BASIC, raising=False)
verify_email_domain("first.last@gmail.com", is_registration=True)
def test_verify_email_domain_rejects_googlemail(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(users, "VALID_EMAIL_DOMAINS", [], raising=False)
monkeypatch.setattr(users, "AUTH_TYPE", AuthType.CLOUD, raising=False)
with pytest.raises(OnyxError) as exc:
with pytest.raises(HTTPException) as exc:
verify_email_domain("user@googlemail.com")
assert exc.value.status_code == 400
assert "gmail.com" in exc.value.detail
assert "gmail.com" in str(exc.value.detail)

View File

@@ -1,9 +1,9 @@
import pytest
from fastapi import HTTPException
import onyx.auth.users as users
from onyx.auth.users import verify_email_is_invited
from onyx.configs.constants import AuthType
from onyx.error_handling.exceptions import OnyxError
@pytest.mark.parametrize("auth_type", [AuthType.SAML, AuthType.OIDC])
@@ -35,7 +35,7 @@ def test_verify_email_is_invited_enforced_for_basic_auth(
raising=False,
)
with pytest.raises(OnyxError) as exc:
with pytest.raises(HTTPException) as exc:
verify_email_is_invited("newuser@example.com")
assert exc.value.status_code == 403

View File

@@ -324,7 +324,7 @@ class TestExtractContextFiles:
class TestSearchFilterDetermination:
"""Verify that determine_search_params correctly resolves
project_id_filter, persona_id_filter, and search_usage based on
search_project_id, search_persona_id, and search_usage based on
the extraction result and the precedence rule.
"""
@@ -353,8 +353,8 @@ class TestSearchFilterDetermination:
uncapped_token_count=100,
),
)
assert result.project_id_filter is None
assert result.persona_id_filter is None
assert result.search_project_id is None
assert result.search_persona_id is None
assert result.search_usage == SearchToolUsage.AUTO
def test_custom_persona_files_overflow_persona_filter(self) -> None:
@@ -364,8 +364,8 @@ class TestSearchFilterDetermination:
project_id=99,
extracted_context_files=self._make_context(use_as_search_filter=True),
)
assert result.persona_id_filter == 42
assert result.project_id_filter is None
assert result.search_persona_id == 42
assert result.search_project_id is None
assert result.search_usage == SearchToolUsage.AUTO
def test_custom_persona_no_files_no_project_leak(self) -> None:
@@ -375,8 +375,8 @@ class TestSearchFilterDetermination:
project_id=99,
extracted_context_files=self._make_context(),
)
assert result.project_id_filter is None
assert result.persona_id_filter is None
assert result.search_project_id is None
assert result.search_persona_id is None
assert result.search_usage == SearchToolUsage.AUTO
def test_default_persona_project_files_fit_disables_search(self) -> None:
@@ -389,7 +389,7 @@ class TestSearchFilterDetermination:
uncapped_token_count=100,
),
)
assert result.project_id_filter is None
assert result.search_project_id is None
assert result.search_usage == SearchToolUsage.DISABLED
def test_default_persona_project_files_overflow_enables_search(self) -> None:
@@ -402,8 +402,8 @@ class TestSearchFilterDetermination:
uncapped_token_count=7000,
),
)
assert result.project_id_filter == 99
assert result.persona_id_filter is None
assert result.search_project_id == 99
assert result.search_persona_id is None
assert result.search_usage == SearchToolUsage.ENABLED
def test_default_persona_no_project_auto(self) -> None:
@@ -413,7 +413,7 @@ class TestSearchFilterDetermination:
project_id=None,
extracted_context_files=self._make_context(),
)
assert result.project_id_filter is None
assert result.search_project_id is None
assert result.search_usage == SearchToolUsage.AUTO
def test_default_persona_project_no_files_disables_search(self) -> None:

View File

@@ -1,226 +0,0 @@
from unittest.mock import MagicMock
from unittest.mock import patch
from onyx.access.models import DocumentAccess
from onyx.configs.constants import DocumentSource
from onyx.connectors.models import Document
from onyx.connectors.models import TextSection
from onyx.document_index.interfaces_new import IndexingMetadata
from onyx.document_index.interfaces_new import TenantState
from onyx.document_index.opensearch.opensearch_document_index import (
OpenSearchDocumentIndex,
)
from onyx.indexing.models import DocMetadataAwareIndexChunk
def _make_chunk(
doc_id: str,
chunk_id: int,
) -> DocMetadataAwareIndexChunk:
"""Creates a minimal DocMetadataAwareIndexChunk for testing."""
doc = Document(
id=doc_id,
sections=[TextSection(text="test", link="http://test.com")],
source=DocumentSource.FILE,
semantic_identifier="test_doc",
metadata={},
)
access = DocumentAccess.build(
user_emails=[],
user_groups=[],
external_user_emails=[],
external_user_group_ids=[],
is_public=True,
)
return DocMetadataAwareIndexChunk(
chunk_id=chunk_id,
blurb="test",
content="test content",
source_links={0: "http://test.com"},
image_file_id=None,
section_continuation=False,
source_document=doc,
title_prefix="",
metadata_suffix_semantic="",
metadata_suffix_keyword="",
mini_chunk_texts=None,
large_chunk_id=None,
doc_summary="",
chunk_context="",
contextual_rag_reserved_tokens=0,
embeddings={"full_embedding": [0.1] * 10, "mini_chunk_embeddings": []},
title_embedding=[0.1] * 10,
tenant_id="test_tenant",
access=access,
document_sets=set(),
user_project=[],
personas=[],
boost=0,
aggregated_chunk_boost_factor=1.0,
ancestor_hierarchy_node_ids=[],
)
def _make_index() -> OpenSearchDocumentIndex:
    """Creates an OpenSearchDocumentIndex with a mocked client.

    Returns an index whose private attributes are assigned directly, so the
    tests exercise indexing logic without any real OpenSearch connection.
    """
    mock_client = MagicMock()
    mock_client.bulk_index_documents = MagicMock()
    tenant_state = TenantState(tenant_id="test_tenant", multitenant=False)
    # __new__ bypasses __init__ — presumably __init__ sets up a real client;
    # TODO confirm nothing else it initializes is read by index().
    index = OpenSearchDocumentIndex.__new__(OpenSearchDocumentIndex)
    index._index_name = "test_index"
    index._client = mock_client
    index._tenant_state = tenant_state
    return index
def _make_metadata(doc_id: str, chunk_count: int) -> IndexingMetadata:
    """Builds IndexingMetadata for a single brand-new document:
    0 chunks before indexing, `chunk_count` chunks after."""
    return IndexingMetadata(
        doc_id_to_chunk_cnt_diff={
            doc_id: IndexingMetadata.ChunkCounts(
                old_chunk_cnt=0,
                new_chunk_cnt=chunk_count,
            ),
        },
    )
@patch(
"onyx.document_index.opensearch.opensearch_document_index.MAX_CHUNKS_PER_DOC_BATCH",
100,
)
def test_single_doc_under_batch_limit_flushes_once() -> None:
"""A document with fewer chunks than MAX_CHUNKS_PER_DOC_BATCH should flush once."""
index = _make_index()
doc_id = "doc_1"
num_chunks = 50
chunks = [_make_chunk(doc_id, i) for i in range(num_chunks)]
metadata = _make_metadata(doc_id, num_chunks)
with patch.object(index, "delete", return_value=0):
index.index(chunks, metadata)
assert index._client.bulk_index_documents.call_count == 1
batch_arg = index._client.bulk_index_documents.call_args_list[0]
assert len(batch_arg.kwargs["documents"]) == num_chunks
@patch(
"onyx.document_index.opensearch.opensearch_document_index.MAX_CHUNKS_PER_DOC_BATCH",
100,
)
def test_single_doc_over_batch_limit_flushes_multiple_times() -> None:
"""A document with more chunks than MAX_CHUNKS_PER_DOC_BATCH should flush multiple times."""
index = _make_index()
doc_id = "doc_1"
num_chunks = 250
chunks = [_make_chunk(doc_id, i) for i in range(num_chunks)]
metadata = _make_metadata(doc_id, num_chunks)
with patch.object(index, "delete", return_value=0):
index.index(chunks, metadata)
# 250 chunks / 100 per batch = 3 flushes (100 + 100 + 50)
assert index._client.bulk_index_documents.call_count == 3
batch_sizes = [
len(call.kwargs["documents"])
for call in index._client.bulk_index_documents.call_args_list
]
assert batch_sizes == [100, 100, 50]
@patch(
"onyx.document_index.opensearch.opensearch_document_index.MAX_CHUNKS_PER_DOC_BATCH",
100,
)
def test_single_doc_exactly_at_batch_limit() -> None:
"""A document with exactly MAX_CHUNKS_PER_DOC_BATCH chunks should flush once
(the flush happens on the next chunk, not at the boundary)."""
index = _make_index()
doc_id = "doc_1"
num_chunks = 100
chunks = [_make_chunk(doc_id, i) for i in range(num_chunks)]
metadata = _make_metadata(doc_id, num_chunks)
with patch.object(index, "delete", return_value=0):
index.index(chunks, metadata)
# 100 chunks hit the >= check on chunk 101 which doesn't exist,
# so final flush handles all 100
# Actually: the elif fires when len(current_chunks) >= 100, which happens
# when current_chunks has 100 items and the 101st chunk arrives.
# With exactly 100 chunks, the 100th chunk makes len == 99, then appended -> 100.
# No 101st chunk arrives, so the final flush handles all 100.
assert index._client.bulk_index_documents.call_count == 1
@patch(
"onyx.document_index.opensearch.opensearch_document_index.MAX_CHUNKS_PER_DOC_BATCH",
100,
)
def test_single_doc_one_over_batch_limit() -> None:
"""101 chunks for one doc: first 100 flushed when the 101st arrives, then
the 101st is flushed at the end."""
index = _make_index()
doc_id = "doc_1"
num_chunks = 101
chunks = [_make_chunk(doc_id, i) for i in range(num_chunks)]
metadata = _make_metadata(doc_id, num_chunks)
with patch.object(index, "delete", return_value=0):
index.index(chunks, metadata)
assert index._client.bulk_index_documents.call_count == 2
batch_sizes = [
len(call.kwargs["documents"])
for call in index._client.bulk_index_documents.call_args_list
]
assert batch_sizes == [100, 1]
@patch(
"onyx.document_index.opensearch.opensearch_document_index.MAX_CHUNKS_PER_DOC_BATCH",
100,
)
def test_multiple_docs_each_under_limit_flush_per_doc() -> None:
"""Multiple documents each under the batch limit should flush once per document."""
index = _make_index()
chunks = []
for doc_idx in range(3):
doc_id = f"doc_{doc_idx}"
for chunk_idx in range(50):
chunks.append(_make_chunk(doc_id, chunk_idx))
metadata = IndexingMetadata(
doc_id_to_chunk_cnt_diff={
f"doc_{i}": IndexingMetadata.ChunkCounts(old_chunk_cnt=0, new_chunk_cnt=50)
for i in range(3)
},
)
with patch.object(index, "delete", return_value=0):
index.index(chunks, metadata)
# 3 documents = 3 flushes (one per doc boundary + final)
assert index._client.bulk_index_documents.call_count == 3
@patch(
"onyx.document_index.opensearch.opensearch_document_index.MAX_CHUNKS_PER_DOC_BATCH",
100,
)
def test_delete_called_once_per_document() -> None:
"""Even with multiple flushes for a single document, delete should only be
called once per document."""
index = _make_index()
doc_id = "doc_1"
num_chunks = 250
chunks = [_make_chunk(doc_id, i) for i in range(num_chunks)]
metadata = _make_metadata(doc_id, num_chunks)
with patch.object(index, "delete", return_value=0) as mock_delete:
index.index(chunks, metadata)
mock_delete.assert_called_once_with(doc_id, None)

View File

@@ -1,152 +0,0 @@
"""Unit tests for VespaDocumentIndex.index().
These tests mock all external I/O (HTTP calls, thread pools) and verify
the streaming logic, ID cleaning/mapping, and DocumentInsertionRecord
construction.
"""
from unittest.mock import MagicMock
from unittest.mock import patch
from onyx.access.models import DocumentAccess
from onyx.configs.constants import DocumentSource
from onyx.connectors.models import Document
from onyx.connectors.models import TextSection
from onyx.document_index.interfaces import EnrichedDocumentIndexingInfo
from onyx.document_index.interfaces_new import IndexingMetadata
from onyx.document_index.interfaces_new import TenantState
from onyx.document_index.vespa.vespa_document_index import VespaDocumentIndex
from onyx.indexing.models import ChunkEmbedding
from onyx.indexing.models import DocMetadataAwareIndexChunk
from onyx.indexing.models import IndexChunk
def _make_chunk(
doc_id: str,
chunk_id: int = 0,
content: str = "test content",
) -> DocMetadataAwareIndexChunk:
doc = Document(
id=doc_id,
semantic_identifier="test_doc",
sections=[TextSection(text=content, link=None)],
source=DocumentSource.NOT_APPLICABLE,
metadata={},
)
index_chunk = IndexChunk(
chunk_id=chunk_id,
blurb=content[:50],
content=content,
source_links=None,
image_file_id=None,
section_continuation=False,
source_document=doc,
title_prefix="",
metadata_suffix_semantic="",
metadata_suffix_keyword="",
contextual_rag_reserved_tokens=0,
doc_summary="",
chunk_context="",
mini_chunk_texts=None,
large_chunk_id=None,
embeddings=ChunkEmbedding(
full_embedding=[0.1] * 10,
mini_chunk_embeddings=[],
),
title_embedding=None,
)
access = DocumentAccess.build(
user_emails=[],
user_groups=[],
external_user_emails=[],
external_user_group_ids=[],
is_public=True,
)
return DocMetadataAwareIndexChunk.from_index_chunk(
index_chunk=index_chunk,
access=access,
document_sets=set(),
user_project=[],
personas=[],
boost=0,
aggregated_chunk_boost_factor=1.0,
tenant_id="test_tenant",
)
def _make_indexing_metadata(
    doc_ids: list[str],
    old_counts: list[int],
    new_counts: list[int],
) -> IndexingMetadata:
    """Builds IndexingMetadata mapping each doc id to its old/new chunk counts.

    The three lists are zipped positionally, so they are expected to be the
    same length; any extra entries are silently dropped by zip().
    """
    return IndexingMetadata(
        doc_id_to_chunk_cnt_diff={
            doc_id: IndexingMetadata.ChunkCounts(
                old_chunk_cnt=old,
                new_chunk_cnt=new,
            )
            for doc_id, old, new in zip(doc_ids, old_counts, new_counts)
        }
    )
def _stub_enrich(
doc_id: str,
old_chunk_cnt: int,
) -> EnrichedDocumentIndexingInfo:
"""Build an EnrichedDocumentIndexingInfo that says 'no chunks to delete'
when old_chunk_cnt == 0, or 'has existing chunks' otherwise."""
return EnrichedDocumentIndexingInfo(
doc_id=doc_id,
chunk_start_index=0,
old_version=False,
chunk_end_index=old_chunk_cnt,
)
# NOTE: @patch decorators apply bottom-up, so the mocks are injected into the
# test function in reverse order of this list: _enrich_basic_chunk_info first,
# then get_document_chunk_ids, delete_vespa_chunks, batch_index_vespa_chunks.
# The BATCH_SIZE patch replaces the attribute with a plain value (3) and
# therefore injects no argument.
@patch("onyx.document_index.vespa.vespa_document_index.batch_index_vespa_chunks")
@patch("onyx.document_index.vespa.vespa_document_index.delete_vespa_chunks")
@patch(
    "onyx.document_index.vespa.vespa_document_index.get_document_chunk_ids",
    return_value=[],
)
@patch("onyx.document_index.vespa.vespa_document_index._enrich_basic_chunk_info")
@patch(
    "onyx.document_index.vespa.vespa_document_index.BATCH_SIZE",
    3,
)
def test_index_respects_batch_size(
    mock_enrich: MagicMock,
    mock_get_chunk_ids: MagicMock,  # noqa: ARG001
    mock_delete: MagicMock,  # noqa: ARG001
    mock_batch_index: MagicMock,
) -> None:
    """When chunks exceed BATCH_SIZE, batch_index_vespa_chunks is called
    multiple times with correctly sized batches."""
    # old_chunk_cnt=0 makes the stub report no pre-existing chunks for doc1.
    mock_enrich.return_value = _stub_enrich("doc1", old_chunk_cnt=0)
    index = VespaDocumentIndex(
        index_name="test_index",
        tenant_state=TenantState(tenant_id="test_tenant", multitenant=False),
        large_chunks_enabled=False,
        httpx_client=MagicMock(),
    )
    # 7 chunks for one document; metadata reports 0 old chunks and 7 new ones.
    chunks = [_make_chunk("doc1", chunk_id=i) for i in range(7)]
    metadata = _make_indexing_metadata(["doc1"], old_counts=[0], new_counts=[7])
    results = index.index(chunks=chunks, indexing_metadata=metadata)
    # One document indexed -> one result entry.
    assert len(results) == 1
    # With BATCH_SIZE=3 and 7 chunks: batches of 3, 3, 1
    assert mock_batch_index.call_count == 3
    batch_sizes = [len(c.kwargs["chunks"]) for c in mock_batch_index.call_args_list]
    assert batch_sizes == [3, 3, 1]
    # Verify all chunks are accounted for and in order
    all_indexed = [
        chunk for c in mock_batch_index.call_args_list for chunk in c.kwargs["chunks"]
    ]
    assert len(all_indexed) == 7
    assert [c.chunk_id for c in all_indexed] == list(range(7))

View File

@@ -116,7 +116,7 @@ def _run_adapter_build(
project_ids_map: dict[str, list[int]],
persona_ids_map: dict[str, list[int]],
) -> list[DocMetadataAwareIndexChunk]:
"""Helper that runs UserFileIndexingAdapter.prepare_enrichment + enrich_chunk
"""Helper that runs UserFileIndexingAdapter.build_metadata_aware_chunks
with all external dependencies mocked."""
from onyx.indexing.adapters.user_file_indexing_adapter import (
UserFileIndexingAdapter,
@@ -155,16 +155,18 @@ def _run_adapter_build(
side_effect=Exception("no LLM in tests"),
),
):
enricher = adapter.prepare_enrichment(
context=context,
result = adapter.build_metadata_aware_chunks(
chunks_with_embeddings=[chunk],
chunk_content_scores=[1.0],
tenant_id="test_tenant",
chunks=[chunk],
context=context,
)
return [enricher.enrich_chunk(chunk, 1.0)]
return result.chunks
def test_prepare_enrichment_includes_persona_ids() -> None:
"""UserFileIndexingAdapter.prepare_enrichment writes persona IDs
def test_build_metadata_aware_chunks_includes_persona_ids() -> None:
"""UserFileIndexingAdapter.build_metadata_aware_chunks writes persona IDs
fetched from the DB into each chunk's metadata."""
file_id = str(uuid4())
persona_ids = [5, 12]
@@ -181,7 +183,7 @@ def test_prepare_enrichment_includes_persona_ids() -> None:
assert chunks[0].user_project == project_ids
def test_prepare_enrichment_missing_file_defaults_to_empty() -> None:
def test_build_metadata_aware_chunks_missing_file_defaults_to_empty() -> None:
"""When a file has no persona or project associations in the DB, the
adapter should default to empty lists (not KeyError or None)."""
file_id = str(uuid4())

View File

@@ -8,7 +8,6 @@ This directory contains Terraform modules to provision the core AWS infrastructu
- `postgres`: Creates an Amazon RDS for PostgreSQL instance and returns a connection URL
- `redis`: Creates an ElastiCache for Redis replication group
- `s3`: Creates an S3 bucket and locks access to a provided S3 VPC endpoint
- `opensearch`: Creates an Amazon OpenSearch domain for managed search workloads
- `onyx`: A higher-level composition that wires the above modules together for a complete, opinionated stack
Use the `onyx` module if you want a working EKS + Postgres + Redis + S3 stack with sane defaults. Use the individual modules if you need more granular control.
@@ -129,7 +128,6 @@ Inputs (common):
- `postgres_username`, `postgres_password`
- `create_vpc` (default true) or existing VPC details and `s3_vpc_endpoint_id`
- WAF controls such as `waf_allowed_ip_cidrs`, `waf_common_rule_set_count_rules`, rate limits, geo restrictions, and logging retention
- Optional OpenSearch controls such as `enable_opensearch`, sizing, credentials, and log retention
### `vpc`
- Builds a VPC sized for EKS with multiple private and public subnets
@@ -161,11 +159,6 @@ Key inputs include:
### `s3`
- Creates an S3 bucket for file storage and scopes access to the provided S3 gateway VPC endpoint
### `opensearch`
- Creates an Amazon OpenSearch domain inside the VPC
- Supports custom subnets, security groups, fine-grained access control, encryption, and CloudWatch log publishing
- Outputs domain endpoints, ARN, and the managed security group ID when it creates one
## Installing the Onyx Helm chart (after Terraform)
Once the cluster is active, deploy application workloads via Helm. You can use the chart in `deployment/helm/charts/onyx`.

View File

@@ -1,13 +1,12 @@
locals {
workspace = terraform.workspace
name = var.name
merged_tags = merge(var.tags, { tenant = local.name, environment = local.workspace })
vpc_name = "${var.name}-vpc-${local.workspace}"
cluster_name = "${var.name}-${local.workspace}"
bucket_name = "${var.name}-file-store-${local.workspace}"
redis_name = "${var.name}-redis-${local.workspace}"
postgres_name = "${var.name}-postgres-${local.workspace}"
opensearch_name = var.opensearch_domain_name != null ? var.opensearch_domain_name : "${var.name}-opensearch-${local.workspace}"
workspace = terraform.workspace
name = var.name
merged_tags = merge(var.tags, { tenant = local.name, environment = local.workspace })
vpc_name = "${var.name}-vpc-${local.workspace}"
cluster_name = "${var.name}-${local.workspace}"
bucket_name = "${var.name}-file-store-${local.workspace}"
redis_name = "${var.name}-redis-${local.workspace}"
postgres_name = "${var.name}-postgres-${local.workspace}"
vpc_id = var.create_vpc ? module.vpc[0].vpc_id : var.vpc_id
private_subnets = var.create_vpc ? module.vpc[0].private_subnets : var.private_subnets
@@ -97,38 +96,3 @@ module "waf" {
enable_logging = var.waf_enable_logging
log_retention_days = var.waf_log_retention_days
}
module "opensearch" {
source = "../opensearch"
count = var.enable_opensearch ? 1 : 0
name = local.opensearch_name
vpc_id = local.vpc_id
# Prefer setting subnet_ids explicitly if the state of private_subnets is
# unclear.
subnet_ids = length(var.opensearch_subnet_ids) > 0 ? var.opensearch_subnet_ids : slice(local.private_subnets, 0, 3)
ingress_cidrs = [local.vpc_cidr_block]
tags = local.merged_tags
# Reuse EKS security groups
security_group_ids = [module.eks.node_security_group_id, module.eks.cluster_security_group_id]
# Configuration
engine_version = var.opensearch_engine_version
instance_type = var.opensearch_instance_type
instance_count = var.opensearch_instance_count
dedicated_master_enabled = var.opensearch_dedicated_master_enabled
dedicated_master_type = var.opensearch_dedicated_master_type
multi_az_with_standby_enabled = var.opensearch_multi_az_with_standby_enabled
ebs_volume_size = var.opensearch_ebs_volume_size
ebs_throughput = var.opensearch_ebs_throughput
# Authentication
internal_user_database_enabled = var.opensearch_internal_user_database_enabled
master_user_name = var.opensearch_master_user_name
master_user_password = var.opensearch_master_user_password
# Logging
enable_logging = var.opensearch_enable_logging
log_retention_days = var.opensearch_log_retention_days
}

View File

@@ -32,18 +32,3 @@ output "postgres_dbi_resource_id" {
description = "RDS DB instance resource id"
value = module.postgres.dbi_resource_id
}
output "opensearch_endpoint" {
description = "OpenSearch domain endpoint"
value = var.enable_opensearch ? module.opensearch[0].domain_endpoint : null
}
output "opensearch_dashboard_endpoint" {
description = "OpenSearch Dashboards endpoint"
value = var.enable_opensearch ? module.opensearch[0].kibana_endpoint : null
}
output "opensearch_domain_arn" {
description = "OpenSearch domain ARN"
value = var.enable_opensearch ? module.opensearch[0].domain_arn : null
}

View File

@@ -152,101 +152,3 @@ variable "waf_log_retention_days" {
description = "Number of days to retain WAF logs"
default = 90
}
# OpenSearch Configuration Variables
variable "enable_opensearch" {
type = bool
description = "Whether to create an OpenSearch domain"
default = false
}
variable "opensearch_engine_version" {
type = string
description = "OpenSearch engine version"
default = "3.3"
}
variable "opensearch_instance_type" {
type = string
description = "Instance type for OpenSearch data nodes"
default = "r8g.large.search"
}
variable "opensearch_instance_count" {
type = number
description = "Number of OpenSearch data nodes"
default = 3
}
variable "opensearch_dedicated_master_enabled" {
type = bool
description = "Whether to enable dedicated master nodes for OpenSearch"
default = true
}
variable "opensearch_dedicated_master_type" {
type = string
description = "Instance type for dedicated master nodes"
default = "m7g.large.search"
}
variable "opensearch_multi_az_with_standby_enabled" {
type = bool
description = "Whether to enable Multi-AZ with Standby deployment"
default = true
}
variable "opensearch_ebs_volume_size" {
type = number
description = "EBS volume size in GiB per OpenSearch node"
default = 512
}
variable "opensearch_ebs_throughput" {
type = number
description = "Throughput in MiB/s for gp3 volumes"
default = 256
}
variable "opensearch_internal_user_database_enabled" {
type = bool
description = "Whether to enable the internal user database for fine-grained access control"
default = true
}
variable "opensearch_master_user_name" {
type = string
description = "Master user name for OpenSearch internal user database"
default = null
sensitive = true
}
variable "opensearch_master_user_password" {
type = string
description = "Master user password for OpenSearch internal user database"
default = null
sensitive = true
}
variable "opensearch_domain_name" {
type = string
description = "Override the OpenSearch domain name. If null, defaults to {name}-opensearch-{workspace}."
default = null
}
variable "opensearch_enable_logging" {
type = bool
default = false
}
variable "opensearch_log_retention_days" {
type = number
description = "Number of days to retain OpenSearch CloudWatch logs (0 = never expire)"
default = 0
}
variable "opensearch_subnet_ids" {
type = list(string)
description = "Subnet IDs for OpenSearch. If empty, uses first 3 private subnets."
default = []
}

View File

@@ -1,229 +0,0 @@
# OpenSearch domain security group
resource "aws_security_group" "opensearch_sg" {
count = length(var.security_group_ids) > 0 ? 0 : 1
name = "${var.name}-sg"
description = "Allow inbound traffic to OpenSearch from VPC"
vpc_id = var.vpc_id
tags = var.tags
ingress {
from_port = 443
to_port = 443
protocol = "tcp"
cidr_blocks = var.ingress_cidrs
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
# Service-linked role for OpenSearch (required for VPC deployment)
# This may already exist in your account - if so, import it or set create_service_linked_role = false
resource "aws_iam_service_linked_role" "opensearch" {
count = var.create_service_linked_role ? 1 : 0
aws_service_name = "opensearchservice.amazonaws.com"
}
# IAM policy for OpenSearch access
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
# KMS key lookup for encryption at rest
data "aws_kms_key" "opensearch" {
key_id = "alias/aws/es"
}
# Access policy - allows all principals within the VPC (secured by VPC + security groups)
data "aws_iam_policy_document" "opensearch_access" {
statement {
effect = "Allow"
principals {
type = "AWS"
identifiers = ["*"]
}
actions = ["es:*"]
resources = [
"arn:aws:es:${data.aws_region.current.id}:${data.aws_caller_identity.current.account_id}:domain/${var.name}/*"
]
}
}
# OpenSearch domain
resource "aws_opensearch_domain" "main" {
domain_name = var.name
engine_version = "OpenSearch_${var.engine_version}"
cluster_config {
instance_type = var.instance_type
instance_count = var.instance_count
zone_awareness_enabled = var.zone_awareness_enabled
dedicated_master_enabled = var.dedicated_master_enabled
dedicated_master_type = var.dedicated_master_enabled ? var.dedicated_master_type : null
dedicated_master_count = var.dedicated_master_enabled ? var.dedicated_master_count : null
multi_az_with_standby_enabled = var.multi_az_with_standby_enabled
warm_enabled = var.warm_enabled
warm_type = var.warm_enabled ? var.warm_type : null
warm_count = var.warm_enabled ? var.warm_count : null
dynamic "zone_awareness_config" {
for_each = var.zone_awareness_enabled ? [1] : []
content {
availability_zone_count = var.availability_zone_count
}
}
dynamic "cold_storage_options" {
for_each = var.cold_storage_enabled ? [1] : []
content {
enabled = true
}
}
}
ebs_options {
ebs_enabled = true
volume_type = var.ebs_volume_type
volume_size = var.ebs_volume_size
iops = var.ebs_volume_type == "gp3" || var.ebs_volume_type == "io1" ? var.ebs_iops : null
throughput = var.ebs_volume_type == "gp3" ? var.ebs_throughput : null
}
vpc_options {
subnet_ids = var.subnet_ids
security_group_ids = length(var.security_group_ids) > 0 ? var.security_group_ids : [aws_security_group.opensearch_sg[0].id]
}
encrypt_at_rest {
enabled = true
kms_key_id = var.kms_key_id != null ? var.kms_key_id : data.aws_kms_key.opensearch.arn
}
node_to_node_encryption {
enabled = true
}
domain_endpoint_options {
enforce_https = true
tls_security_policy = var.tls_security_policy
}
advanced_security_options {
enabled = true
anonymous_auth_enabled = false
internal_user_database_enabled = var.internal_user_database_enabled
dynamic "master_user_options" {
for_each = var.internal_user_database_enabled ? [1] : []
content {
master_user_name = var.master_user_name
master_user_password = var.master_user_password
}
}
dynamic "master_user_options" {
for_each = var.internal_user_database_enabled ? [] : [1]
content {
master_user_arn = var.master_user_arn
}
}
}
advanced_options = var.advanced_options
access_policies = data.aws_iam_policy_document.opensearch_access.json
auto_tune_options {
desired_state = var.auto_tune_enabled ? "ENABLED" : "DISABLED"
rollback_on_disable = var.auto_tune_rollback_on_disable
}
off_peak_window_options {
enabled = var.off_peak_window_enabled
dynamic "off_peak_window" {
for_each = var.off_peak_window_enabled ? [1] : []
content {
window_start_time {
hours = var.off_peak_window_start_hours
minutes = var.off_peak_window_start_minutes
}
}
}
}
software_update_options {
auto_software_update_enabled = var.auto_software_update_enabled
}
dynamic "log_publishing_options" {
for_each = var.enable_logging ? ["INDEX_SLOW_LOGS", "SEARCH_SLOW_LOGS", "ES_APPLICATION_LOGS"] : []
content {
cloudwatch_log_group_arn = "arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:log-group:${local.log_group_name}"
log_type = log_publishing_options.value
}
}
tags = var.tags
depends_on = [
aws_iam_service_linked_role.opensearch,
aws_cloudwatch_log_resource_policy.opensearch
]
lifecycle {
precondition {
condition = !var.internal_user_database_enabled || var.master_user_name != null
error_message = "master_user_name is required when internal_user_database_enabled is true."
}
precondition {
condition = !var.internal_user_database_enabled || var.master_user_password != null
error_message = "master_user_password is required when internal_user_database_enabled is true."
}
}
}
# CloudWatch log group for OpenSearch
locals {
log_group_name = var.log_group_name != null ? var.log_group_name : "/aws/OpenSearchService/domains/${var.name}/search-logs"
}
resource "aws_cloudwatch_log_group" "opensearch" {
count = var.enable_logging ? 1 : 0
name = local.log_group_name
retention_in_days = var.log_retention_days
tags = var.tags
}
# CloudWatch log resource policy for OpenSearch
data "aws_iam_policy_document" "opensearch_log_policy" {
count = var.enable_logging ? 1 : 0
statement {
effect = "Allow"
principals {
type = "Service"
identifiers = ["es.amazonaws.com"]
}
actions = [
"logs:PutLogEvents",
"logs:CreateLogStream",
]
resources = ["arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:log-group:${local.log_group_name}:*"]
}
}
resource "aws_cloudwatch_log_resource_policy" "opensearch" {
count = var.enable_logging ? 1 : 0
policy_name = "OpenSearchService-${var.name}-Search-logs"
policy_document = data.aws_iam_policy_document.opensearch_log_policy[0].json
}

View File

@@ -1,29 +0,0 @@
output "domain_endpoint" {
description = "The endpoint of the OpenSearch domain"
value = aws_opensearch_domain.main.endpoint
}
output "domain_arn" {
description = "The ARN of the OpenSearch domain"
value = aws_opensearch_domain.main.arn
}
output "domain_id" {
description = "The unique identifier for the OpenSearch domain"
value = aws_opensearch_domain.main.domain_id
}
output "domain_name" {
description = "The name of the OpenSearch domain"
value = aws_opensearch_domain.main.domain_name
}
output "kibana_endpoint" {
description = "The OpenSearch Dashboards endpoint"
value = aws_opensearch_domain.main.dashboard_endpoint
}
output "security_group_id" {
description = "The ID of the OpenSearch security group"
value = length(aws_security_group.opensearch_sg) > 0 ? aws_security_group.opensearch_sg[0].id : null
}

View File

@@ -1,242 +0,0 @@
variable "name" {
description = "Name of the OpenSearch domain"
type = string
}
variable "vpc_id" {
description = "ID of the VPC to deploy the OpenSearch domain into"
type = string
}
variable "subnet_ids" {
description = "List of subnet IDs for the OpenSearch domain"
type = list(string)
}
variable "ingress_cidrs" {
description = "CIDR blocks allowed to access OpenSearch"
type = list(string)
}
variable "engine_version" {
description = "OpenSearch engine version (e.g., 2.17, 3.3)"
type = string
default = "3.3"
}
variable "instance_type" {
description = "Instance type for data nodes"
type = string
default = "r8g.large.search"
}
variable "instance_count" {
description = "Number of data nodes"
type = number
default = 3
}
variable "zone_awareness_enabled" {
description = "Whether to enable zone awareness for the cluster"
type = bool
default = true
}
variable "availability_zone_count" {
description = "Number of availability zones (2 or 3)"
type = number
default = 3
}
variable "dedicated_master_enabled" {
description = "Whether to enable dedicated master nodes"
type = bool
default = true
}
variable "dedicated_master_type" {
description = "Instance type for dedicated master nodes"
type = string
default = "m7g.large.search"
}
variable "dedicated_master_count" {
description = "Number of dedicated master nodes (must be 3 or 5)"
type = number
default = 3
}
variable "multi_az_with_standby_enabled" {
description = "Whether to enable Multi-AZ with Standby deployment"
type = bool
default = true
}
variable "warm_enabled" {
description = "Whether to enable warm storage"
type = bool
default = false
}
variable "warm_type" {
description = "Instance type for warm nodes"
type = string
default = "ultrawarm1.medium.search"
}
variable "warm_count" {
description = "Number of warm nodes"
type = number
default = 2
}
variable "cold_storage_enabled" {
description = "Whether to enable cold storage"
type = bool
default = false
}
variable "ebs_volume_type" {
description = "EBS volume type (gp3, gp2, io1)"
type = string
default = "gp3"
}
variable "ebs_volume_size" {
description = "EBS volume size in GB per node"
type = number
default = 512
}
variable "ebs_iops" {
description = "IOPS for gp3/io1 volumes"
type = number
default = 3000
}
variable "ebs_throughput" {
description = "Throughput in MiB/s for gp3 volumes"
type = number
default = 256
}
variable "kms_key_id" {
description = "KMS key ID for encryption at rest (uses AWS managed key if not specified)"
type = string
default = null
}
variable "tls_security_policy" {
description = "TLS security policy for HTTPS endpoints"
type = string
default = "Policy-Min-TLS-1-2-2019-07"
}
variable "internal_user_database_enabled" {
description = "Whether to enable the internal user database for fine-grained access control"
type = bool
default = true
}
variable "master_user_name" {
description = "Master user name for internal user database"
type = string
default = null
sensitive = true
}
variable "master_user_password" {
description = "Master user password for internal user database"
type = string
default = null
sensitive = true
}
variable "master_user_arn" {
description = "IAM ARN for the master user (used when internal_user_database_enabled is false)"
type = string
default = null
}
variable "advanced_options" {
description = "Advanced options for OpenSearch"
type = map(string)
default = {
"indices.fielddata.cache.size" = "20"
"indices.query.bool.max_clause_count" = "1024"
"override_main_response_version" = "false"
"rest.action.multi.allow_explicit_index" = "true"
}
}
variable "auto_tune_enabled" {
description = "Whether to enable Auto-Tune"
type = bool
default = true
}
variable "auto_tune_rollback_on_disable" {
description = "Whether to roll back Auto-Tune changes when disabled"
type = string
default = "NO_ROLLBACK"
}
variable "off_peak_window_enabled" {
description = "Whether to enable off-peak window for maintenance"
type = bool
default = true
}
variable "off_peak_window_start_hours" {
description = "Hour (UTC) when off-peak window starts (0-23)"
type = number
default = 6
}
variable "off_peak_window_start_minutes" {
description = "Minutes when off-peak window starts (0-59)"
type = number
default = 0
}
variable "auto_software_update_enabled" {
description = "Whether to enable automatic software updates"
type = bool
default = false
}
variable "enable_logging" {
description = "Whether to enable CloudWatch logging"
type = bool
default = false
}
variable "create_service_linked_role" {
description = "Whether to create the OpenSearch service-linked role (set to false if it already exists)"
type = bool
default = false
}
variable "log_retention_days" {
description = "Number of days to retain CloudWatch logs"
type = number
default = 30
}
variable "security_group_ids" {
description = "Existing security group IDs to attach. If empty, a new SG is created."
type = list(string)
default = []
}
variable "log_group_name" {
description = "CloudWatch log group name. Defaults to AWS console convention."
type = string
default = null
}
variable "tags" {
description = "Tags to apply to OpenSearch resources"
type = map(string)
default = {}
}

View File

@@ -144,7 +144,6 @@ module.exports = {
"**/src/app/**/services/*.test.ts",
"**/src/app/**/utils/*.test.ts",
"**/src/app/**/hooks/*.test.ts", // Pure packet processor tests
"**/src/hooks/**/*.test.ts",
"**/src/refresh-components/**/*.test.ts",
"**/src/refresh-pages/**/*.test.ts",
"**/src/sections/**/*.test.ts",

View File

@@ -1,5 +1,6 @@
"use client";
import { cn } from "@opal/utils";
import { useTableSize } from "@opal/components/table/TableSizeContext";
interface ActionsContainerProps {
@@ -24,7 +25,14 @@ export default function ActionsContainer({
data-size={size}
onClick={onClick}
>
<div className="flex h-full items-center justify-end">{children}</div>
<div
className={cn(
"flex h-full items-center",
type === "cell" ? "justify-end" : "justify-center"
)}
>
{children}
</div>
</Tag>
);
}

View File

@@ -61,7 +61,6 @@ function DragOverlayRowInner<TData>({
imageSrc={qualifierColumn.getImageSrc?.(row.original)}
imageAlt={qualifierColumn.getImageAlt?.(row.original)}
background={qualifierColumn.background}
iconSize={qualifierColumn.iconSize}
selectable={isSelectable}
selected={isSelectable && row.getIsSelected()}
/>

View File

@@ -47,7 +47,7 @@ function Table({
<table
ref={ref}
className={cn("border-separate border-spacing-0", !width && "min-w-full")}
style={{ width }}
style={{ tableLayout: "fixed", width }}
data-size={size}
data-variant={variant}
data-selection={selectionBehavior}

View File

@@ -92,7 +92,9 @@ export default function TableHead({
data-size={resolvedSize}
data-bottom-border={bottomBorder || undefined}
>
<div className="flex items-center gap-1">
<div
className={cn("flex items-center gap-1", alignmentFlexClass[alignment])}
>
<div className="table-head-label">
<Text
mainUiAction={!isSmall}

View File

@@ -26,13 +26,11 @@ interface TableQualifierProps {
imageAlt?: string;
/** Show a tinted background container behind the content. */
background?: boolean;
/** Icon size preset. `"lg"` = 28/24, `"md"` = 20/16. @default "md" */
iconSize?: "lg" | "md";
}
const iconSizesMap = {
lg: { lg: 28, md: 24 },
md: { lg: 20, md: 16 },
const iconSizes = {
lg: 28,
md: 24,
} as const;
function getOverlayStyles(selected: boolean, disabled: boolean) {
@@ -55,10 +53,9 @@ function TableQualifier({
imageSrc,
imageAlt = "",
background = false,
iconSize: iconSizePreset = "md",
}: TableQualifierProps) {
const resolvedSize = useTableSize();
const iconSize = iconSizesMap[iconSizePreset][resolvedSize];
const iconSize = iconSizes[resolvedSize];
const overlayStyles = getOverlayStyles(selected, disabled);
function renderContent() {

View File

@@ -33,8 +33,6 @@ interface QualifierConfig<TData> {
getImageAlt?: (row: TData) => string;
/** Show a tinted background container behind the content. @default false */
background?: boolean;
/** Icon size preset. `"lg"` = 28/24, `"md"` = 20/16. @default "md" */
iconSize?: "lg" | "md";
}
// ---------------------------------------------------------------------------
@@ -162,7 +160,6 @@ export function createTableColumns<TData>(): TableColumnsBuilder<TData> {
getImageSrc: config?.getImageSrc,
getImageAlt: config?.getImageAlt,
background: config?.background,
iconSize: config?.iconSize,
};
},

View File

@@ -544,7 +544,6 @@ export function Table<TData>(props: DataTableProps<TData>) {
imageSrc={qDef.getImageSrc?.(row.original)}
imageAlt={qDef.getImageAlt?.(row.original)}
background={qDef.background}
iconSize={qDef.iconSize}
selectable={showQualifierCheckbox}
selected={
showQualifierCheckbox && row.getIsSelected()

View File

@@ -59,8 +59,6 @@ export interface OnyxQualifierColumn<TData> extends OnyxColumnBase<TData> {
getImageAlt?: (row: TData) => string;
/** Show a tinted background container behind the content. @default false */
background?: boolean;
/** Icon size preset. Use `"lg"` for avatars, `"md"` for regular icons. @default "md" */
iconSize?: "lg" | "md";
}
/** Data column — accessor-based column with sorting/resizing. */

View File

@@ -159,7 +159,6 @@ export { default as SvgSort } from "@opal/icons/sort";
export { default as SvgSortOrder } from "@opal/icons/sort-order";
export { default as SvgSparkle } from "@opal/icons/sparkle";
export { default as SvgStar } from "@opal/icons/star";
export { default as SvgStarOff } from "@opal/icons/star-off";
export { default as SvgStep1 } from "@opal/icons/step1";
export { default as SvgStep2 } from "@opal/icons/step2";
export { default as SvgStep3 } from "@opal/icons/step3";

View File

@@ -1,22 +0,0 @@
import type { IconProps } from "@opal/types";
const SvgStarOff = ({ size, ...props }: IconProps) => (
<svg
width={size}
height={size}
viewBox="0 0 16 16"
fill="none"
xmlns="http://www.w3.org/2000/svg"
stroke="currentColor"
{...props}
>
<path
d="M1 1L5.56196 5.56196M15 15L5.56196 5.56196M5.56196 5.56196L1.33333 6.18004L4.66666 9.42671L3.88 14.0134L8 11.8467L12.12 14.0134L11.7267 11.72M12.1405 8.64051L14.6667 6.18004L10.06 5.50671L8 1.33337L6.95349 3.45349"
strokeWidth={1.5}
strokeLinecap="round"
strokeLinejoin="round"
/>
</svg>
);
export default SvgStarOff;

View File

@@ -26,7 +26,7 @@ function PersonaTypeDisplay({ persona }: { persona: Persona }) {
return <Text as="p">Built-In</Text>;
}
if (persona.is_featured) {
if (persona.featured) {
return <Text as="p">Featured</Text>;
}
@@ -153,7 +153,7 @@ export function PersonasTable({
if (personaToToggleDefault) {
const response = await togglePersonaFeatured(
personaToToggleDefault.id,
personaToToggleDefault.is_featured
personaToToggleDefault.featured
);
if (response.ok) {
refreshPersonas();
@@ -179,7 +179,7 @@ export function PersonasTable({
{defaultModalOpen &&
personaToToggleDefault &&
(() => {
const isDefault = personaToToggleDefault.is_featured;
const isDefault = personaToToggleDefault.featured;
const title = isDefault
? "Remove Featured Agent"
@@ -260,20 +260,20 @@ export function PersonasTable({
`}
>
<div className="my-auto flex-none w-22">
{!persona.is_featured ? (
{!persona.featured ? (
<div className="text-error">Not Featured</div>
) : (
"Featured"
)}
</div>
<Checkbox checked={persona.is_featured} />
<Checkbox checked={persona.featured} />
</div>,
<div
key="is_visible"
onClick={async () => {
const response = await togglePersonaVisibility(
persona.id,
persona.is_listed
persona.is_visible
);
if (response.ok) {
refreshPersonas();
@@ -288,13 +288,13 @@ export function PersonasTable({
`}
>
<div className="my-auto w-fit">
{!persona.is_listed ? (
{!persona.is_visible ? (
<div className="text-error">Hidden</div>
) : (
"Visible"
)}
</div>
<Checkbox checked={persona.is_listed} />
<Checkbox checked={persona.is_visible} />
</div>,
<div key="edit" className="flex">
<div className="mr-auto my-auto">

View File

@@ -51,9 +51,9 @@ export interface MinimalPersonaSnapshot {
icon_name?: string;
is_public: boolean;
is_listed: boolean;
is_visible: boolean;
display_priority: number | null;
is_featured: boolean;
featured: boolean;
builtin_persona: boolean;
labels?: PersonaLabel[];

View File

@@ -22,7 +22,7 @@ interface PersonaUpsertRequest {
uploaded_image_id: string | null;
icon_name: string | null;
search_start_date: Date | null;
is_featured: boolean;
featured: boolean;
display_priority: number | null;
label_ids: number[] | null;
user_file_ids: string[] | null;
@@ -52,7 +52,7 @@ export interface PersonaUpsertParameters {
search_start_date: Date | null;
uploaded_image_id: string | null;
icon_name: string | null;
is_featured: boolean;
featured: boolean;
label_ids: number[] | null;
user_file_ids: string[];
// Hierarchy nodes (folders, spaces, channels) for scoped search
@@ -79,7 +79,7 @@ function buildPersonaUpsertRequest({
document_ids,
icon_name,
uploaded_image_id,
is_featured,
featured,
llm_model_provider_override,
llm_model_version_override,
starter_messages,
@@ -101,7 +101,7 @@ function buildPersonaUpsertRequest({
remove_image,
search_start_date,
datetime_aware,
is_featured: is_featured ?? false,
featured: featured ?? false,
llm_model_provider_override: llm_model_provider_override ?? null,
llm_model_version_override: llm_model_version_override ?? null,
starter_messages: starter_messages ?? null,
@@ -224,7 +224,7 @@ export async function togglePersonaFeatured(
"Content-Type": "application/json",
},
body: JSON.stringify({
is_featured: !featured,
featured: !featured,
}),
credentials: "include",
});
@@ -235,13 +235,13 @@ export async function togglePersonaVisibility(
personaId: number,
isVisible: boolean
) {
const response = await fetch(`/api/admin/persona/${personaId}/listed`, {
const response = await fetch(`/api/admin/persona/${personaId}/visible`, {
method: "PATCH",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
is_listed: !isVisible,
is_visible: !isVisible,
}),
credentials: "include",
});

View File

@@ -626,7 +626,10 @@ function Main({ ccPairId }: { ccPairId: number }) {
<div className="w-[200px]">
<div className="text-sm font-medium mb-1">Last Indexed</div>
<div className="text-sm text-text-default">
{timeAgo(ccPair?.last_indexed) ?? "-"}
{timeAgo(
indexAttempts?.find((attempt) => attempt.status === "success")
?.time_started
) ?? "-"}
</div>
</div>

Some files were not shown because too many files have changed in this diff Show More