Compare commits

..

17 Commits

Author SHA1 Message Date
Jean Caillé
569fef22b4 style: align test conventions with existing codebase
- Prefix unused mock params with _ and type as Any
- Extract shared db session mock setup into helper
- Remove unused pytest import
- Use module path constant to reduce repetition
2026-03-31 18:21:42 +02:00
Jean Caille
517d1035e0 Merge branch 'main' into fix/ttl-skip-missing-file-records 2026-03-31 18:13:12 +02:00
Jean Caillé
ad108dd573 test: add unit tests for TTL deletion resilience
- test_chat_deletion: verifies delete_messages_and_files_from_chat_session
  continues when a file record is missing, and handles messages with no files
- test_ttl_task: verifies perform_ttl_management_task continues deleting
  remaining sessions after one fails, and reports correct success status

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-31 18:11:45 +02:00
Ben Wu
adf5691b5f feat(canvas 2/4): Canvas Connector data fetching (#9386) 2026-03-31 03:07:05 +00:00
Nikolas Garza
c1a8a5bd83 fix(tenants): run migrations on pool tenants before assigning to new users (#9788) 2026-03-31 01:24:01 +00:00
Justin Tahara
8fd486da99 feat(rds): Add Freeable Memory alert (#9787) 2026-03-31 00:59:30 +00:00
Raunak Bhagat
4bda4d3637 refactor: migrate away from cards/Select (#9771) 2026-03-31 00:27:01 +00:00
Justin Tahara
13c25eadad feat(rds): Adding CPU Alerts (#9784) 2026-03-31 00:22:15 +00:00
Justin Tahara
1f244e6388 feat(eks): Adding Cloudwatch logging (#9783) 2026-03-30 23:52:44 +00:00
Nikolas Garza
18b0416d30 feat(sentry): enable frontend source map uploads in cloud CI (#9775) 2026-03-30 23:42:57 +00:00
Nikolas Garza
4bc0bc1efb feat(helm): add Grafana dashboard provisioning (#9725) 2026-03-30 23:42:32 +00:00
Justin Tahara
1555217061 feat(rds): Adding RDS Snapshosts (#9779) 2026-03-30 23:17:08 +00:00
Nikolas Garza
d177a833f0 feat(sentry): add release tracking to backend and frontend (#9773) 2026-03-30 22:35:38 +00:00
Jamison Lahman
086997d3c5 chore(types): fix IconButton size props (#9772) 2026-03-30 21:40:25 +00:00
dependabot[bot]
dccec78397 chore(deps): bump helm/chart-testing-action from b5eebdd9998021f29756c53432f48dab66394810 to 2e2940618cb426dce2999631d543b53cdcfc8527 (#9764)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-30 14:41:01 -07:00
Jamison Lahman
0123133621 chore(fe): polish Query History table (#9767) 2026-03-30 21:30:13 +00:00
Jean Caillé
2a5810a44a fix: TTL management task no longer crashes on missing file records
The TTL cleanup task would crash with a RuntimeError when trying to
delete a file record that no longer exists, blocking cleanup of all
remaining sessions. Two fixes:

1. Wrap file deletion in delete_messages_and_files_from_chat_session
   with try/except — a missing file is already the desired state, so
   log a warning and continue.

2. Add per-session error handling in the TTL task loop. The existing
   comment said "one session per delete so that we don't blow up" but
   the outer try/except still aborted on the first failure. Now each
   session deletion is individually wrapped so failures don't block
   cleanup of subsequent sessions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-30 19:25:36 +02:00
62 changed files with 4523 additions and 2772 deletions

View File

@@ -704,6 +704,9 @@ jobs:
NEXT_PUBLIC_FORGOT_PASSWORD_ENABLED=true
NEXT_PUBLIC_INCLUDE_ERROR_POPUP_SUPPORT_LINK=true
NODE_OPTIONS=--max-old-space-size=8192
SENTRY_RELEASE=${{ github.sha }}
secrets: |
sentry_auth_token=${{ secrets.SENTRY_AUTH_TOKEN }}
cache-from: |
type=registry,ref=${{ env.RUNS_ON_ECR_CACHE }}:cloudweb-cache-amd64
type=registry,ref=${{ env.REGISTRY_IMAGE }}:latest
@@ -786,6 +789,9 @@ jobs:
NEXT_PUBLIC_FORGOT_PASSWORD_ENABLED=true
NEXT_PUBLIC_INCLUDE_ERROR_POPUP_SUPPORT_LINK=true
NODE_OPTIONS=--max-old-space-size=8192
SENTRY_RELEASE=${{ github.sha }}
secrets: |
sentry_auth_token=${{ secrets.SENTRY_AUTH_TOKEN }}
cache-from: |
type=registry,ref=${{ env.RUNS_ON_ECR_CACHE }}:cloudweb-cache-arm64
type=registry,ref=${{ env.REGISTRY_IMAGE }}:latest

View File

@@ -41,7 +41,7 @@ jobs:
version: v3.19.0
- name: Set up chart-testing
uses: helm/chart-testing-action@b5eebdd9998021f29756c53432f48dab66394810
uses: helm/chart-testing-action@2e2940618cb426dce2999631d543b53cdcfc8527
with:
uv_version: "0.9.9"

View File

@@ -13,6 +13,7 @@ from redis.lock import Lock as RedisLock
from ee.onyx.server.tenants.provisioning import setup_tenant
from ee.onyx.server.tenants.schema_management import create_schema_if_not_exists
from ee.onyx.server.tenants.schema_management import get_current_alembic_version
from ee.onyx.server.tenants.schema_management import run_alembic_migrations
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.app_configs import TARGET_AVAILABLE_TENANTS
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
@@ -29,9 +30,10 @@ from shared_configs.configs import TENANT_ID_PREFIX
# Each tenant takes ~80s (alembic migrations), so 5 tenants ≈ 7 minutes.
_MAX_TENANTS_PER_RUN = 5
# Time limits sized for worst-case batch: _MAX_TENANTS_PER_RUN × ~90s + buffer.
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 10 # 10 minutes
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 15 # 15 minutes
# Time limits sized for worst-case: provisioning up to _MAX_TENANTS_PER_RUN new tenants
# (~90s each) plus migrating up to TARGET_AVAILABLE_TENANTS pool tenants (~90s each).
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 20 # 20 minutes
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 25 # 25 minutes
@shared_task(
@@ -91,8 +93,7 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
batch_size = min(tenants_to_provision, _MAX_TENANTS_PER_RUN)
if batch_size < tenants_to_provision:
task_logger.info(
f"Capping batch to {batch_size} "
f"(need {tenants_to_provision}, will catch up next cycle)"
f"Capping batch to {batch_size} (need {tenants_to_provision}, will catch up next cycle)"
)
provisioned = 0
@@ -103,12 +104,14 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
provisioned += 1
except Exception:
task_logger.exception(
f"Failed to provision tenant {i + 1}/{batch_size}, "
"continuing with remaining tenants"
f"Failed to provision tenant {i + 1}/{batch_size}, continuing with remaining tenants"
)
task_logger.info(f"Provisioning complete: {provisioned}/{batch_size} succeeded")
# Migrate any pool tenants that were provisioned before a new migration was deployed
_migrate_stale_pool_tenants()
except Exception:
task_logger.exception("Error in check_available_tenants task")
@@ -121,6 +124,46 @@ def check_available_tenants(self: Task) -> None: # noqa: ARG001
)
def _migrate_stale_pool_tenants() -> None:
"""
Run alembic upgrade head on all pool tenants. Since alembic upgrade head is
idempotent, tenants already at head are a fast no-op. This ensures pool
tenants are always current so that signup doesn't hit schema mismatches
(e.g. missing columns added after the tenant was pre-provisioned).
"""
with get_session_with_shared_schema() as db_session:
pool_tenants = db_session.query(AvailableTenant).all()
tenant_ids = [t.tenant_id for t in pool_tenants]
if not tenant_ids:
return
task_logger.info(
f"Checking {len(tenant_ids)} pool tenant(s) for pending migrations"
)
for tenant_id in tenant_ids:
try:
run_alembic_migrations(tenant_id)
new_version = get_current_alembic_version(tenant_id)
with get_session_with_shared_schema() as db_session:
tenant = (
db_session.query(AvailableTenant)
.filter_by(tenant_id=tenant_id)
.first()
)
if tenant and tenant.alembic_version != new_version:
task_logger.info(
f"Migrated pool tenant {tenant_id}: {tenant.alembic_version} -> {new_version}"
)
tenant.alembic_version = new_version
db_session.commit()
except Exception:
task_logger.exception(
f"Failed to migrate pool tenant {tenant_id}, skipping"
)
def pre_provision_tenant() -> bool:
"""
Pre-provision a new tenant and store it in the NewAvailableTenant table.

View File

@@ -54,27 +54,35 @@ def perform_ttl_management_task(
retention_limit_days, db_session
)
failures = 0
for user_id, session_id in old_chat_sessions:
# one session per delete so that we don't blow up if a deletion fails.
with get_session_with_current_tenant() as db_session:
delete_chat_session(
user_id,
session_id,
db_session,
include_deleted=True,
hard_delete=True,
try:
with get_session_with_current_tenant() as db_session:
delete_chat_session(
user_id,
session_id,
db_session,
include_deleted=True,
hard_delete=True,
)
except Exception:
failures += 1
logger.exception(
"Failed to delete chat session "
f"user_id={user_id} session_id={session_id}, "
"continuing with remaining sessions"
)
with get_session_with_current_tenant() as db_session:
mark_task_as_finished_with_id(
db_session=db_session,
task_id=task_id,
success=True,
success=failures == 0,
)
except Exception:
logger.exception(
f"delete_chat_session exceptioned. user_id={user_id} session_id={session_id}"
f"TTL management task failed. user_id={user_id} session_id={session_id}"
)
with get_session_with_current_tenant() as db_session:
mark_task_as_finished_with_id(

View File

@@ -99,6 +99,26 @@ async def get_or_provision_tenant(
tenant_id = await get_available_tenant()
if tenant_id:
# Run migrations to ensure the pre-provisioned tenant schema is current.
# Pool tenants may have been created before a new migration was deployed.
# Capture as a non-optional local so mypy can type the lambda correctly.
_tenant_id: str = tenant_id
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(
None, lambda: run_alembic_migrations(_tenant_id)
)
except Exception:
# The tenant was already dequeued from the pool — roll it back so
# it doesn't end up orphaned (schema exists, but not assigned to anyone).
logger.exception(
f"Migration failed for pre-provisioned tenant {_tenant_id}; rolling back"
)
try:
await rollback_tenant_provisioning(_tenant_id)
except Exception:
logger.exception(f"Failed to rollback orphaned tenant {_tenant_id}")
raise
# If we have a pre-provisioned tenant, assign it to the user
await assign_tenant_to_user(tenant_id, email, referral_source)
logger.info(f"Assigned pre-provisioned tenant {tenant_id} to user {email}")

View File

@@ -100,6 +100,7 @@ def get_model_app() -> FastAPI:
dsn=SENTRY_DSN,
integrations=[StarletteIntegration(), FastApiIntegration()],
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:

View File

@@ -20,6 +20,7 @@ from sentry_sdk.integrations.celery import CeleryIntegration
from sqlalchemy import text
from sqlalchemy.orm import Session
from onyx import __version__
from onyx.background.celery.apps.task_formatters import CeleryTaskColoredFormatter
from onyx.background.celery.apps.task_formatters import CeleryTaskPlainFormatter
from onyx.background.celery.celery_utils import celery_is_worker_primary
@@ -65,6 +66,7 @@ if SENTRY_DSN:
dsn=SENTRY_DSN,
integrations=[CeleryIntegration()],
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:
@@ -515,7 +517,8 @@ def reset_tenant_id(
def wait_for_vespa_or_shutdown(
sender: Any, **kwargs: Any # noqa: ARG001
sender: Any, # noqa: ARG001
**kwargs: Any, # noqa: ARG001
) -> None: # noqa: ARG001
"""Waits for Vespa to become ready subject to a timeout.
Raises WorkerShutdown if the timeout is reached."""

View File

@@ -9,6 +9,7 @@ from celery import Celery
from celery import shared_task
from celery import Task
from onyx import __version__
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.memory_monitoring import emit_process_memory
from onyx.background.celery.tasks.docprocessing.heartbeat import start_heartbeat
@@ -137,6 +138,7 @@ def _docfetching_task(
sentry_sdk.init(
dsn=SENTRY_DSN,
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:

View File

@@ -1,8 +1,19 @@
import threading
import time
from collections.abc import Callable
from collections.abc import Generator
from queue import Empty
from onyx.chat.citation_processor import CitationMapping
from onyx.chat.emitter import Emitter
from onyx.context.search.models import SearchDoc
from onyx.server.query_and_chat.placement import Placement
from onyx.server.query_and_chat.streaming_models import OverallStop
from onyx.server.query_and_chat.streaming_models import Packet
from onyx.server.query_and_chat.streaming_models import PacketException
from onyx.tools.models import ToolCallInfo
from onyx.utils.threadpool_concurrency import run_in_background
from onyx.utils.threadpool_concurrency import wait_on_background
# Type alias for search doc deduplication key
# Simple key: just document_id (str)
@@ -148,3 +159,114 @@ class ChatStateContainer:
"""Thread-safe getter for emitted citations (returns a copy)."""
with self._lock:
return self._emitted_citations.copy()
def run_chat_loop_with_state_containers(
chat_loop_func: Callable[[Emitter, ChatStateContainer], None],
completion_callback: Callable[[ChatStateContainer], None],
is_connected: Callable[[], bool],
emitter: Emitter,
state_container: ChatStateContainer,
) -> Generator[Packet, None]:
"""
Explicit wrapper function that runs a function in a background thread
with event streaming capabilities.
The wrapped function should accept emitter as first arg and use it to emit
Packet objects. This wrapper polls every 300ms to check if stop signal is set.
Args:
func: The function to wrap (should accept emitter and state_container as first and second args)
completion_callback: Callback function to call when the function completes
emitter: Emitter instance for sending packets
state_container: ChatStateContainer instance for accumulating state
is_connected: Callable that returns False when stop signal is set
Usage:
packets = run_chat_loop_with_state_containers(
my_func,
completion_callback=completion_callback,
emitter=emitter,
state_container=state_container,
is_connected=check_func,
)
for packet in packets:
# Process packets
pass
"""
def run_with_exception_capture() -> None:
try:
chat_loop_func(emitter, state_container)
except Exception as e:
# If execution fails, emit an exception packet
emitter.emit(
Packet(
placement=Placement(turn_index=0),
obj=PacketException(type="error", exception=e),
)
)
# Run the function in a background thread
thread = run_in_background(run_with_exception_capture)
pkt: Packet | None = None
last_turn_index = 0 # Track the highest turn_index seen for stop packet
last_cancel_check = time.monotonic()
cancel_check_interval = 0.3 # Check for cancellation every 300ms
try:
while True:
# Poll queue with 300ms timeout for natural stop signal checking
# the 300ms timeout is to avoid busy-waiting and to allow the stop signal to be checked regularly
try:
pkt = emitter.bus.get(timeout=0.3)
except Empty:
if not is_connected():
# Stop signal detected
yield Packet(
placement=Placement(turn_index=last_turn_index + 1),
obj=OverallStop(type="stop", stop_reason="user_cancelled"),
)
break
last_cancel_check = time.monotonic()
continue
if pkt is not None:
# Track the highest turn_index for the stop packet
if pkt.placement and pkt.placement.turn_index > last_turn_index:
last_turn_index = pkt.placement.turn_index
if isinstance(pkt.obj, OverallStop):
yield pkt
break
elif isinstance(pkt.obj, PacketException):
raise pkt.obj.exception
else:
yield pkt
# Check for cancellation periodically even when packets are flowing
# This ensures stop signal is checked during active streaming
current_time = time.monotonic()
if current_time - last_cancel_check >= cancel_check_interval:
if not is_connected():
# Stop signal detected during streaming
yield Packet(
placement=Placement(turn_index=last_turn_index + 1),
obj=OverallStop(type="stop", stop_reason="user_cancelled"),
)
break
last_cancel_check = current_time
finally:
# Wait for thread to complete on normal exit to propagate exceptions and ensure cleanup.
# Skip waiting if user disconnected to exit quickly.
if is_connected():
wait_on_background(thread)
try:
completion_callback(state_container)
except Exception as e:
emitter.emit(
Packet(
placement=Placement(turn_index=last_turn_index + 1),
obj=PacketException(type="error", exception=e),
)
)

View File

@@ -1,40 +1,19 @@
import threading
from queue import Queue
from onyx.server.query_and_chat.placement import Placement
from onyx.server.query_and_chat.streaming_models import Packet
class Emitter:
"""Routes packets from LLM/tool execution to the ``_run_models`` drain loop.
"""Use this inside tools to emit arbitrary UI progress."""
Tags every packet with ``model_index`` and places it on ``merged_queue``
as a ``(model_idx, packet)`` tuple for ordered consumption downstream.
Args:
merged_queue: Shared queue owned by ``_run_models``.
model_idx: Index embedded in packet placements (``0`` for N=1 runs).
drain_done: Optional event set by ``_run_models`` when the drain loop
exits early (e.g. HTTP disconnect). When set, ``emit`` returns
immediately so worker threads can exit fast.
"""
def __init__(
self,
merged_queue: Queue[tuple[int, Packet | Exception | object]],
model_idx: int = 0,
drain_done: threading.Event | None = None,
) -> None:
self._model_idx = model_idx
self._merged_queue = merged_queue
self._drain_done = drain_done
def __init__(self, bus: Queue):
self.bus = bus
def emit(self, packet: Packet) -> None:
if self._drain_done is not None and self._drain_done.is_set():
return
base = packet.placement or Placement(turn_index=0)
tagged = Packet(
placement=base.model_copy(update={"model_index": self._model_idx}),
obj=packet.obj,
)
self._merged_queue.put((self._model_idx, tagged))
self.bus.put(packet) # Thread-safe
def get_default_emitter() -> Emitter:
bus: Queue[Packet] = Queue()
emitter = Emitter(bus)
return emitter

File diff suppressed because it is too large Load Diff

View File

@@ -212,6 +212,7 @@ class DocumentSource(str, Enum):
PRODUCTBOARD = "productboard"
FILE = "file"
CODA = "coda"
CANVAS = "canvas"
NOTION = "notion"
ZULIP = "zulip"
LINEAR = "linear"
@@ -672,6 +673,7 @@ DocumentSourceDescription: dict[DocumentSource, str] = {
DocumentSource.SLAB: "slab data",
DocumentSource.PRODUCTBOARD: "productboard data (boards, etc.)",
DocumentSource.FILE: "files",
DocumentSource.CANVAS: "canvas lms - courses, pages, assignments, and announcements",
DocumentSource.CODA: "coda - team workspace with docs, tables, and pages",
DocumentSource.NOTION: "notion data - a workspace that combines note-taking, \
project management, and collaboration tools into a single, customizable platform",

View File

@@ -0,0 +1,32 @@
"""
Permissioning / AccessControl logic for Canvas courses.
CE stub — returns None (no permissions). The EE implementation is loaded
at runtime via ``fetch_versioned_implementation``.
"""
from collections.abc import Callable
from typing import cast
from onyx.access.models import ExternalAccess
from onyx.connectors.canvas.client import CanvasApiClient
from onyx.utils.variable_functionality import fetch_versioned_implementation
from onyx.utils.variable_functionality import global_version
def get_course_permissions(
canvas_client: CanvasApiClient,
course_id: int,
) -> ExternalAccess | None:
if not global_version.is_ee_version():
return None
ee_get_course_permissions = cast(
Callable[[CanvasApiClient, int], ExternalAccess | None],
fetch_versioned_implementation(
"onyx.external_permissions.canvas.access",
"get_course_permissions",
),
)
return ee_get_course_permissions(canvas_client, course_id)

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import logging
import re
from collections.abc import Iterator
from typing import Any
from urllib.parse import urlparse
@@ -190,3 +191,22 @@ class CanvasApiClient:
if clean_endpoint:
final_url += "/" + clean_endpoint
return final_url
def paginate(
self,
endpoint: str,
params: dict[str, Any] | None = None,
) -> Iterator[list[Any]]:
"""Yield each page of results, following Link-header pagination.
Makes the first request with endpoint + params, then follows
next_url from Link headers for subsequent pages.
"""
response, next_url = self.get(endpoint, params=params)
while True:
if not response:
break
yield response
if not next_url:
break
response, next_url = self.get(full_url=next_url)

View File

@@ -1,17 +1,82 @@
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import cast
from typing import Literal
from typing import NoReturn
from typing import TypeAlias
from pydantic import BaseModel
from retry import retry
from typing_extensions import override
from onyx.access.models import ExternalAccess
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.canvas.access import get_course_permissions
from onyx.connectors.canvas.client import CanvasApiClient
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedValidationError
from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TextSection
from onyx.error_handling.exceptions import OnyxError
from onyx.file_processing.html_utils import parse_html_page_basic
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
logger = setup_logger()
def _handle_canvas_api_error(e: OnyxError) -> NoReturn:
"""Map Canvas API errors to connector framework exceptions."""
if e.status_code == 401:
raise CredentialExpiredError(
"Canvas API token is invalid or expired (HTTP 401)."
)
elif e.status_code == 403:
raise InsufficientPermissionsError(
"Canvas API token does not have sufficient permissions (HTTP 403)."
)
elif e.status_code == 429:
raise ConnectorValidationError(
"Canvas rate-limit exceeded (HTTP 429). Please try again later."
)
elif e.status_code >= 500:
raise UnexpectedValidationError(
f"Unexpected Canvas HTTP error (status={e.status_code}): {e}"
)
else:
raise ConnectorValidationError(
f"Canvas API error (status={e.status_code}): {e}"
)
class CanvasCourse(BaseModel):
id: int
name: str
course_code: str
created_at: str
workflow_state: str
name: str | None = None
course_code: str | None = None
created_at: str | None = None
workflow_state: str | None = None
@classmethod
def from_api(cls, payload: dict[str, Any]) -> "CanvasCourse":
return cls(
id=payload["id"],
name=payload.get("name"),
course_code=payload.get("course_code"),
created_at=payload.get("created_at"),
workflow_state=payload.get("workflow_state"),
)
class CanvasPage(BaseModel):
@@ -19,10 +84,22 @@ class CanvasPage(BaseModel):
url: str
title: str
body: str | None = None
created_at: str
updated_at: str
created_at: str | None = None
updated_at: str | None = None
course_id: int
@classmethod
def from_api(cls, payload: dict[str, Any], course_id: int) -> "CanvasPage":
return cls(
page_id=payload["page_id"],
url=payload["url"],
title=payload["title"],
body=payload.get("body"),
created_at=payload.get("created_at"),
updated_at=payload.get("updated_at"),
course_id=course_id,
)
class CanvasAssignment(BaseModel):
id: int
@@ -30,10 +107,23 @@ class CanvasAssignment(BaseModel):
description: str | None = None
html_url: str
course_id: int
created_at: str
updated_at: str
created_at: str | None = None
updated_at: str | None = None
due_at: str | None = None
@classmethod
def from_api(cls, payload: dict[str, Any], course_id: int) -> "CanvasAssignment":
return cls(
id=payload["id"],
name=payload["name"],
description=payload.get("description"),
html_url=payload["html_url"],
course_id=course_id,
created_at=payload.get("created_at"),
updated_at=payload.get("updated_at"),
due_at=payload.get("due_at"),
)
class CanvasAnnouncement(BaseModel):
id: int
@@ -43,6 +133,17 @@ class CanvasAnnouncement(BaseModel):
posted_at: str | None = None
course_id: int
@classmethod
def from_api(cls, payload: dict[str, Any], course_id: int) -> "CanvasAnnouncement":
return cls(
id=payload["id"],
title=payload["title"],
message=payload.get("message"),
html_url=payload["html_url"],
posted_at=payload.get("posted_at"),
course_id=course_id,
)
CanvasStage: TypeAlias = Literal["pages", "assignments", "announcements"]
@@ -72,3 +173,286 @@ class CanvasConnectorCheckpoint(ConnectorCheckpoint):
self.current_course_index += 1
self.stage = "pages"
self.next_url = None
class CanvasConnector(
CheckpointedConnectorWithPermSync[CanvasConnectorCheckpoint],
SlimConnectorWithPermSync,
):
def __init__(
self,
canvas_base_url: str,
batch_size: int = INDEX_BATCH_SIZE,
) -> None:
self.canvas_base_url = canvas_base_url.rstrip("/").removesuffix("/api/v1")
self.batch_size = batch_size
self._canvas_client: CanvasApiClient | None = None
self._course_permissions_cache: dict[int, ExternalAccess | None] = {}
@property
def canvas_client(self) -> CanvasApiClient:
if self._canvas_client is None:
raise ConnectorMissingCredentialError("Canvas")
return self._canvas_client
def _get_course_permissions(self, course_id: int) -> ExternalAccess | None:
"""Get course permissions with caching."""
if course_id not in self._course_permissions_cache:
self._course_permissions_cache[course_id] = get_course_permissions(
canvas_client=self.canvas_client,
course_id=course_id,
)
return self._course_permissions_cache[course_id]
@retry(tries=3, delay=1, backoff=2)
def _list_courses(self) -> list[CanvasCourse]:
"""Fetch all courses accessible to the authenticated user."""
logger.debug("Fetching Canvas courses")
courses: list[CanvasCourse] = []
for page in self.canvas_client.paginate(
"courses", params={"per_page": "100", "state[]": "available"}
):
courses.extend(CanvasCourse.from_api(c) for c in page)
return courses
@retry(tries=3, delay=1, backoff=2)
def _list_pages(self, course_id: int) -> list[CanvasPage]:
"""Fetch all pages for a given course."""
logger.debug(f"Fetching pages for course {course_id}")
pages: list[CanvasPage] = []
for page in self.canvas_client.paginate(
f"courses/{course_id}/pages",
params={"per_page": "100", "include[]": "body", "published": "true"},
):
pages.extend(CanvasPage.from_api(p, course_id=course_id) for p in page)
return pages
@retry(tries=3, delay=1, backoff=2)
def _list_assignments(self, course_id: int) -> list[CanvasAssignment]:
"""Fetch all assignments for a given course."""
logger.debug(f"Fetching assignments for course {course_id}")
assignments: list[CanvasAssignment] = []
for page in self.canvas_client.paginate(
f"courses/{course_id}/assignments",
params={"per_page": "100", "published": "true"},
):
assignments.extend(
CanvasAssignment.from_api(a, course_id=course_id) for a in page
)
return assignments
@retry(tries=3, delay=1, backoff=2)
def _list_announcements(self, course_id: int) -> list[CanvasAnnouncement]:
"""Fetch all announcements for a given course."""
logger.debug(f"Fetching announcements for course {course_id}")
announcements: list[CanvasAnnouncement] = []
for page in self.canvas_client.paginate(
"announcements",
params={
"per_page": "100",
"context_codes[]": f"course_{course_id}",
"active_only": "true",
},
):
announcements.extend(
CanvasAnnouncement.from_api(a, course_id=course_id) for a in page
)
return announcements
def _build_document(
self,
doc_id: str,
link: str,
text: str,
semantic_identifier: str,
doc_updated_at: datetime | None,
course_id: int,
doc_type: str,
) -> Document:
"""Build a Document with standard Canvas fields."""
return Document(
id=doc_id,
sections=cast(
list[TextSection | ImageSection],
[TextSection(link=link, text=text)],
),
source=DocumentSource.CANVAS,
semantic_identifier=semantic_identifier,
doc_updated_at=doc_updated_at,
metadata={"course_id": str(course_id), "type": doc_type},
)
def _convert_page_to_document(self, page: CanvasPage) -> Document:
"""Convert a Canvas page to a Document."""
link = f"{self.canvas_base_url}/courses/{page.course_id}/pages/{page.url}"
text_parts = [page.title]
body_text = parse_html_page_basic(page.body) if page.body else ""
if body_text:
text_parts.append(body_text)
doc_updated_at = (
datetime.fromisoformat(page.updated_at.replace("Z", "+00:00")).astimezone(
timezone.utc
)
if page.updated_at
else None
)
document = self._build_document(
doc_id=f"canvas-page-{page.course_id}-{page.page_id}",
link=link,
text="\n\n".join(text_parts),
semantic_identifier=page.title or f"Page {page.page_id}",
doc_updated_at=doc_updated_at,
course_id=page.course_id,
doc_type="page",
)
return document
def _convert_assignment_to_document(self, assignment: CanvasAssignment) -> Document:
"""Convert a Canvas assignment to a Document."""
text_parts = [assignment.name]
desc_text = (
parse_html_page_basic(assignment.description)
if assignment.description
else ""
)
if desc_text:
text_parts.append(desc_text)
if assignment.due_at:
due_dt = datetime.fromisoformat(
assignment.due_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
text_parts.append(f"Due: {due_dt.strftime('%B %d, %Y %H:%M UTC')}")
doc_updated_at = (
datetime.fromisoformat(
assignment.updated_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
if assignment.updated_at
else None
)
document = self._build_document(
doc_id=f"canvas-assignment-{assignment.course_id}-{assignment.id}",
link=assignment.html_url,
text="\n\n".join(text_parts),
semantic_identifier=assignment.name or f"Assignment {assignment.id}",
doc_updated_at=doc_updated_at,
course_id=assignment.course_id,
doc_type="assignment",
)
return document
def _convert_announcement_to_document(
self, announcement: CanvasAnnouncement
) -> Document:
"""Convert a Canvas announcement to a Document."""
text_parts = [announcement.title]
msg_text = (
parse_html_page_basic(announcement.message) if announcement.message else ""
)
if msg_text:
text_parts.append(msg_text)
doc_updated_at = (
datetime.fromisoformat(
announcement.posted_at.replace("Z", "+00:00")
).astimezone(timezone.utc)
if announcement.posted_at
else None
)
document = self._build_document(
doc_id=f"canvas-announcement-{announcement.course_id}-{announcement.id}",
link=announcement.html_url,
text="\n\n".join(text_parts),
semantic_identifier=announcement.title or f"Announcement {announcement.id}",
doc_updated_at=doc_updated_at,
course_id=announcement.course_id,
doc_type="announcement",
)
return document
@override
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
"""Load and validate Canvas credentials."""
access_token = credentials.get("canvas_access_token")
if not access_token:
raise ConnectorMissingCredentialError("Canvas")
try:
client = CanvasApiClient(
bearer_token=access_token,
canvas_base_url=self.canvas_base_url,
)
client.get("courses", params={"per_page": "1"})
except ValueError as e:
raise ConnectorValidationError(f"Invalid Canvas base URL: {e}")
except OnyxError as e:
_handle_canvas_api_error(e)
self._canvas_client = client
return None
@override
def validate_connector_settings(self) -> None:
"""Validate Canvas connector settings by testing API access."""
try:
self.canvas_client.get("courses", params={"per_page": "1"})
logger.info("Canvas connector settings validated successfully")
except OnyxError as e:
_handle_canvas_api_error(e)
except ConnectorMissingCredentialError:
raise
except Exception as exc:
raise UnexpectedValidationError(
f"Unexpected error during Canvas settings validation: {exc}"
)
@override
def load_from_checkpoint(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def load_from_checkpoint_with_perm_sync(
self,
start: SecondsSinceUnixEpoch,
end: SecondsSinceUnixEpoch,
checkpoint: CanvasConnectorCheckpoint,
) -> CheckpointOutput[CanvasConnectorCheckpoint]:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def build_dummy_checkpoint(self) -> CanvasConnectorCheckpoint:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def validate_checkpoint_json(
self, checkpoint_json: str
) -> CanvasConnectorCheckpoint:
# TODO(benwu408): implemented in PR3 (checkpoint)
raise NotImplementedError
@override
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
# TODO(benwu408): implemented in PR4 (perm sync)
raise NotImplementedError

View File

@@ -72,6 +72,10 @@ CONNECTOR_CLASS_MAP = {
module_path="onyx.connectors.coda.connector",
class_name="CodaConnector",
),
DocumentSource.CANVAS: ConnectorMapping(
module_path="onyx.connectors.canvas.connector",
class_name="CanvasConnector",
),
DocumentSource.NOTION: ConnectorMapping(
module_path="onyx.connectors.notion.connector",
class_name="NotionConnector",

View File

@@ -185,7 +185,15 @@ def delete_messages_and_files_from_chat_session(
for _, files in messages_with_files:
file_store = get_default_file_store()
for file_info in files or []:
file_store.delete_file(file_id=file_info.get("id"))
try:
file_store.delete_file(file_id=file_info.get("id"))
except Exception:
logger.warning(
"Failed to delete file %s from file store during "
"chat session cleanup, skipping.",
file_info.get("id"),
)
continue
# Delete ChatMessage records - CASCADE constraints will automatically handle:
# - ChatMessage__StandardAnswer relationship records
@@ -617,92 +625,6 @@ def reserve_message_id(
return empty_message
def reserve_multi_model_message_ids(
db_session: Session,
chat_session_id: UUID,
parent_message_id: int,
model_display_names: list[str],
) -> list[ChatMessage]:
"""Reserve N assistant message placeholders for multi-model parallel streaming.
All messages share the same parent (the user message). The parent's
latest_child_message_id points to the LAST reserved message so that the
default history-chain walker picks it up.
"""
reserved: list[ChatMessage] = []
for display_name in model_display_names:
msg = ChatMessage(
chat_session_id=chat_session_id,
parent_message_id=parent_message_id,
latest_child_message_id=None,
message="Response was terminated prior to completion, try regenerating.",
token_count=15, # placeholder; updated on completion by llm_loop_completion_handle
message_type=MessageType.ASSISTANT,
model_display_name=display_name,
)
db_session.add(msg)
reserved.append(msg)
# Flush to assign IDs without committing yet
db_session.flush()
# Point parent's latest_child to the last reserved message
parent = (
db_session.query(ChatMessage)
.filter(ChatMessage.id == parent_message_id)
.first()
)
if parent:
parent.latest_child_message_id = reserved[-1].id
db_session.commit()
return reserved
def set_preferred_response(
db_session: Session,
user_message_id: int,
preferred_assistant_message_id: int,
) -> None:
"""Mark one assistant response as the user's preferred choice in a multi-model turn.
Also advances ``latest_child_message_id`` so the preferred response becomes
the active branch for any subsequent messages in the conversation.
Args:
db_session: Active database session.
user_message_id: Primary key of the ``USER``-type ``ChatMessage`` whose
preferred response is being set.
preferred_assistant_message_id: Primary key of the ``ASSISTANT``-type
``ChatMessage`` to prefer. Must be a direct child of ``user_message_id``.
Raises:
ValueError: If either message is not found, if ``user_message_id`` does not
refer to a USER message, or if the assistant message is not a direct child
of the user message.
"""
user_msg = db_session.get(ChatMessage, user_message_id)
if user_msg is None:
raise ValueError(f"User message {user_message_id} not found")
if user_msg.message_type != MessageType.USER:
raise ValueError(f"Message {user_message_id} is not a user message")
assistant_msg = db_session.get(ChatMessage, preferred_assistant_message_id)
if assistant_msg is None:
raise ValueError(
f"Assistant message {preferred_assistant_message_id} not found"
)
if assistant_msg.parent_message_id != user_message_id:
raise ValueError(
f"Assistant message {preferred_assistant_message_id} is not a child "
f"of user message {user_message_id}"
)
user_msg.preferred_response_id = preferred_assistant_message_id
user_msg.latest_child_message_id = preferred_assistant_message_id
db_session.commit()
def create_new_chat_message(
chat_session_id: UUID,
parent_message: ChatMessage,
@@ -925,8 +847,6 @@ def translate_db_message_to_chat_message_detail(
error=chat_message.error,
current_feedback=current_feedback,
processing_duration_seconds=chat_message.processing_duration_seconds,
preferred_response_id=chat_message.preferred_response_id,
model_display_name=chat_message.model_display_name,
)
return chat_msg_detail

View File

@@ -8,24 +8,6 @@ from pydantic import BaseModel
class LLMOverride(BaseModel):
"""Per-request LLM settings that override persona defaults.
All fields are optional — only the fields that differ from the persona's
configured LLM need to be supplied. Used both over the wire (API requests)
and for multi-model comparison, where one override is supplied per model.
Attributes:
model_provider: LLM provider slug (e.g. ``"openai"``, ``"anthropic"``).
When ``None``, the persona's default provider is used.
model_version: Specific model version string (e.g. ``"gpt-4o"``).
When ``None``, the persona's default model is used.
temperature: Sampling temperature in ``[0, 2]``. When ``None``, the
persona's default temperature is used.
display_name: Human-readable label shown in the UI for this model,
e.g. ``"GPT-4 Turbo"``. Optional; falls back to ``model_version``
when not set.
"""
model_provider: str | None = None
model_version: str | None = None
temperature: float | None = None

View File

@@ -439,6 +439,7 @@ def get_application(lifespan_override: Lifespan | None = None) -> FastAPI:
dsn=SENTRY_DSN,
integrations=[StarletteIntegration(), FastApiIntegration()],
traces_sample_rate=0.1,
release=__version__,
)
logger.info("Sentry initialized")
else:

View File

@@ -28,7 +28,6 @@ from onyx.chat.chat_utils import extract_headers
from onyx.chat.models import ChatFullResponse
from onyx.chat.models import CreateChatSessionID
from onyx.chat.process_message import gather_stream_full
from onyx.chat.process_message import handle_multi_model_stream
from onyx.chat.process_message import handle_stream_message_objects
from onyx.chat.prompt_utils import get_default_base_system_prompt
from onyx.chat.stop_signal_checker import set_fence
@@ -47,7 +46,6 @@ from onyx.db.chat import get_chat_messages_by_session
from onyx.db.chat import get_chat_session_by_id
from onyx.db.chat import get_chat_sessions_by_user
from onyx.db.chat import set_as_latest_chat_message
from onyx.db.chat import set_preferred_response
from onyx.db.chat import translate_db_message_to_chat_message_detail
from onyx.db.chat import update_chat_session
from onyx.db.chat_search import search_chat_sessions
@@ -62,8 +60,6 @@ from onyx.db.persona import get_persona_by_id
from onyx.db.usage import increment_usage
from onyx.db.usage import UsageType
from onyx.db.user_file import get_file_id_by_user_file_id
from onyx.error_handling.error_codes import OnyxErrorCode
from onyx.error_handling.exceptions import OnyxError
from onyx.file_store.file_store import get_default_file_store
from onyx.llm.constants import LlmProviderNames
from onyx.llm.factory import get_default_llm
@@ -85,7 +81,6 @@ from onyx.server.query_and_chat.models import ChatSessionUpdateRequest
from onyx.server.query_and_chat.models import MessageOrigin
from onyx.server.query_and_chat.models import RenameChatSessionResponse
from onyx.server.query_and_chat.models import SendMessageRequest
from onyx.server.query_and_chat.models import SetPreferredResponseRequest
from onyx.server.query_and_chat.models import UpdateChatSessionTemperatureRequest
from onyx.server.query_and_chat.models import UpdateChatSessionThreadRequest
from onyx.server.query_and_chat.session_loading import (
@@ -575,46 +570,6 @@ def handle_send_chat_message(
if get_hashed_api_key_from_request(request) or get_hashed_pat_from_request(request):
chat_message_req.origin = MessageOrigin.API
# Multi-model streaming path: 2-3 LLMs in parallel (streaming only)
is_multi_model = (
chat_message_req.llm_overrides is not None
and len(chat_message_req.llm_overrides) > 1
)
if is_multi_model and chat_message_req.stream:
# Narrowed here; is_multi_model already checked llm_overrides is not None
llm_overrides = chat_message_req.llm_overrides or []
def multi_model_stream_generator() -> Generator[str, None, None]:
try:
with get_session_with_current_tenant() as db_session:
for obj in handle_multi_model_stream(
new_msg_req=chat_message_req,
user=user,
db_session=db_session,
llm_overrides=llm_overrides,
litellm_additional_headers=extract_headers(
request.headers, LITELLM_PASS_THROUGH_HEADERS
),
custom_tool_additional_headers=get_custom_tool_additional_request_headers(
request.headers
),
mcp_headers=chat_message_req.mcp_headers,
):
yield get_json_line(obj.model_dump())
except Exception as e:
logger.exception("Error in multi-model streaming")
yield json.dumps({"error": str(e)})
return StreamingResponse(
multi_model_stream_generator(), media_type="text/event-stream"
)
if is_multi_model and not chat_message_req.stream:
raise OnyxError(
OnyxErrorCode.INVALID_INPUT,
"Multi-model mode (llm_overrides with >1 entry) requires stream=True.",
)
# Non-streaming path: consume all packets and return complete response
if not chat_message_req.stream:
with get_session_with_current_tenant() as db_session:
@@ -705,30 +660,6 @@ def set_message_as_latest(
)
@router.put("/set-preferred-response")
def set_preferred_response_endpoint(
request_body: SetPreferredResponseRequest,
user: User | None = Depends(current_user),
db_session: Session = Depends(get_session),
) -> None:
"""Set the preferred assistant response for a multi-model turn."""
try:
# Ownership check: get_chat_message raises ValueError if the message
# doesn't belong to this user, preventing cross-user mutation.
get_chat_message(
chat_message_id=request_body.user_message_id,
user_id=user.id if user else None,
db_session=db_session,
)
set_preferred_response(
db_session=db_session,
user_message_id=request_body.user_message_id,
preferred_assistant_message_id=request_body.preferred_response_id,
)
except ValueError as e:
raise OnyxError(OnyxErrorCode.INVALID_INPUT, str(e))
@router.post("/create-chat-message-feedback")
def create_chat_feedback(
feedback: ChatFeedbackRequest,

View File

@@ -2,25 +2,11 @@ from pydantic import BaseModel
class Placement(BaseModel):
"""Coordinates that identify where a streaming packet belongs in the UI.
The frontend uses these fields to route each packet to the correct turn,
tool tab, agent sub-turn, and (in multi-model mode) response column.
Attributes:
turn_index: Monotonically increasing index of the iterative reasoning block
(e.g. tool call round) within this chat message. Lower values happened first.
tab_index: Disambiguates parallel tool calls within the same turn so each
tool's output can be displayed in its own tab.
sub_turn_index: Nesting level for tools that invoke other tools. ``None`` for
top-level packets; an integer for tool-within-tool output.
model_index: Which model this packet belongs to. ``0`` for single-model
responses; ``0``, ``1``, or ``2`` for multi-model comparison. ``None``
for pre-LLM setup packets (e.g. message ID info) that are yielded
before any Emitter runs.
"""
# Which iterative block in the UI is this part of, these are ordered and smaller ones happened first
turn_index: int
# For parallel tool calls to preserve order of execution
tab_index: int = 0
# Used for tools/agents that call other tools, this currently doesn't support nested agents but can be added later
sub_turn_index: int | None = None
# For multi-model streaming: identifies which model (0, 1, 2) this packet belongs to.
model_index: int | None = None

View File

@@ -1,4 +1,3 @@
import queue
import time
from collections.abc import Callable
from typing import Any
@@ -709,6 +708,7 @@ def run_research_agent_calls(
if __name__ == "__main__":
from queue import Queue
from uuid import uuid4
from onyx.chat.chat_state import ChatStateContainer
@@ -744,8 +744,8 @@ if __name__ == "__main__":
if user is None:
raise ValueError("No users found in database. Please create a user first.")
emitter_queue: queue.Queue = queue.Queue()
emitter = Emitter(merged_queue=emitter_queue)
bus: Queue[Packet] = Queue()
emitter = Emitter(bus)
state_container = ChatStateContainer()
tool_dict = construct_tools(
@@ -792,4 +792,4 @@ if __name__ == "__main__":
print(result.intermediate_report)
print("=" * 80)
print(f"Citations: {result.citation_mapping}")
print(f"Total packets emitted: {emitter_queue.qsize()}")
print(f"Total packets emitted: {bus.qsize()}")

View File

@@ -1,6 +1,5 @@
import csv
import json
import queue
import uuid
from io import BytesIO
from io import StringIO
@@ -12,6 +11,7 @@ import requests
from requests import JSONDecodeError
from onyx.chat.emitter import Emitter
from onyx.chat.emitter import get_default_emitter
from onyx.configs.constants import FileOrigin
from onyx.file_store.file_store import get_default_file_store
from onyx.server.query_and_chat.placement import Placement
@@ -296,9 +296,9 @@ def build_custom_tools_from_openapi_schema_and_headers(
url = openapi_to_url(openapi_schema)
method_specs = openapi_to_method_specs(openapi_schema)
# Use a discard emitter if none provided (packets go nowhere)
# Use default emitter if none provided
if emitter is None:
emitter = Emitter(merged_queue=queue.Queue())
emitter = get_default_emitter()
return [
CustomTool(
@@ -367,7 +367,7 @@ if __name__ == "__main__":
tools = build_custom_tools_from_openapi_schema_and_headers(
tool_id=0, # dummy tool id
openapi_schema=openapi_schema,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
dynamic_schema_info=None,
)

View File

@@ -27,13 +27,11 @@ def create_placement(
turn_index: int,
tab_index: int = 0,
sub_turn_index: int | None = None,
model_index: int | None = 0,
) -> Placement:
return Placement(
turn_index=turn_index,
tab_index=tab_index,
sub_turn_index=sub_turn_index,
model_index=model_index,
)

View File

@@ -13,7 +13,6 @@ This test:
All external HTTP calls are mocked, but Postgres and Redis are running.
"""
import queue
from typing import Any
from unittest.mock import patch
from uuid import uuid4
@@ -21,7 +20,7 @@ from uuid import uuid4
import pytest
from sqlalchemy.orm import Session
from onyx.chat.emitter import Emitter
from onyx.chat.emitter import get_default_emitter
from onyx.db.enums import MCPAuthenticationPerformer
from onyx.db.enums import MCPAuthenticationType
from onyx.db.enums import MCPTransport
@@ -138,7 +137,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
search_tool_config=search_tool_config,
@@ -201,7 +200,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
search_tool_config=SearchToolConfig(),
@@ -276,7 +275,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
search_tool_config=SearchToolConfig(),
@@ -351,7 +350,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
search_tool_config=SearchToolConfig(),
@@ -459,7 +458,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
search_tool_config=SearchToolConfig(),
@@ -542,7 +541,7 @@ class TestMCPPassThroughOAuth:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
search_tool_config=SearchToolConfig(),

View File

@@ -8,7 +8,6 @@ Tests the priority logic for OAuth tokens when constructing custom tools:
All external HTTP calls are mocked, but Postgres and Redis are running.
"""
import queue
from typing import Any
from unittest.mock import Mock
from unittest.mock import patch
@@ -17,7 +16,7 @@ from uuid import uuid4
import pytest
from sqlalchemy.orm import Session
from onyx.chat.emitter import Emitter
from onyx.chat.emitter import get_default_emitter
from onyx.db.models import OAuthAccount
from onyx.db.models import OAuthConfig
from onyx.db.models import Persona
@@ -175,7 +174,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
search_tool_config=search_tool_config,
@@ -233,7 +232,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
)
@@ -285,7 +284,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
)
@@ -346,7 +345,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
)
@@ -417,7 +416,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
)
@@ -484,7 +483,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
)
@@ -537,7 +536,7 @@ class TestOAuthToolIntegrationPriority:
tool_dict = construct_tools(
persona=persona,
db_session=db_session,
emitter=Emitter(merged_queue=queue.Queue()),
emitter=get_default_emitter(),
user=user,
llm=llm,
)

View File

@@ -0,0 +1,109 @@
"""Tests for TTL management task resilience."""
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
from uuid import uuid4
_TASK_MODULE = "ee.onyx.background.celery.tasks.ttl_management.tasks"
def _setup_db_session_mock(mock_get_db_session: MagicMock) -> None:
mock_db_session = MagicMock()
mock_get_db_session.return_value.__enter__ = MagicMock(
return_value=mock_db_session
)
mock_get_db_session.return_value.__exit__ = MagicMock(return_value=False)
@patch(f"{_TASK_MODULE}.get_session_with_current_tenant")
@patch(f"{_TASK_MODULE}.delete_chat_session")
@patch(f"{_TASK_MODULE}.get_chat_sessions_older_than")
@patch(f"{_TASK_MODULE}.mark_task_as_finished_with_id")
@patch(f"{_TASK_MODULE}.register_task")
def test_ttl_task_continues_after_session_delete_failure(
_mock_register: Any,
mock_mark_finished: MagicMock,
mock_get_old_sessions: MagicMock,
mock_delete_session: MagicMock,
mock_get_db_session: MagicMock,
) -> None:
"""One failing session should not prevent cleanup of remaining sessions."""
from ee.onyx.background.celery.tasks.ttl_management.tasks import (
perform_ttl_management_task,
)
user1, session1 = uuid4(), uuid4()
user2, session2 = uuid4(), uuid4()
user3, session3 = uuid4(), uuid4()
mock_get_old_sessions.return_value = [
(user1, session1),
(user2, session2),
(user3, session3),
]
# Second session fails
mock_delete_session.side_effect = [
None,
RuntimeError("File does not exist"),
None,
]
_setup_db_session_mock(mock_get_db_session)
mock_task = MagicMock()
mock_task.request.id = "test-task-id"
# Call the underlying function directly, bypassing Celery decorator
perform_ttl_management_task.__wrapped__(
mock_task, retention_limit_days=30, tenant_id="test"
)
# All three sessions should have been attempted
assert mock_delete_session.call_count == 3
# Task marked as finished with success=False (due to the one failure)
mock_mark_finished.assert_called()
finish_call_kwargs = mock_mark_finished.call_args[1]
assert finish_call_kwargs["success"] is False
@patch(f"{_TASK_MODULE}.get_session_with_current_tenant")
@patch(f"{_TASK_MODULE}.delete_chat_session")
@patch(f"{_TASK_MODULE}.get_chat_sessions_older_than")
@patch(f"{_TASK_MODULE}.mark_task_as_finished_with_id")
@patch(f"{_TASK_MODULE}.register_task")
def test_ttl_task_reports_success_when_all_deletions_pass(
_mock_register: Any,
mock_mark_finished: MagicMock,
mock_get_old_sessions: MagicMock,
mock_delete_session: MagicMock,
mock_get_db_session: MagicMock,
) -> None:
"""Task should report success when all sessions are deleted."""
from ee.onyx.background.celery.tasks.ttl_management.tasks import (
perform_ttl_management_task,
)
mock_get_old_sessions.return_value = [
(uuid4(), uuid4()),
(uuid4(), uuid4()),
]
mock_delete_session.side_effect = None
_setup_db_session_mock(mock_get_db_session)
mock_task = MagicMock()
mock_task.request.id = "test-task-id"
perform_ttl_management_task.__wrapped__(
mock_task, retention_limit_days=30, tenant_id="test"
)
assert mock_delete_session.call_count == 2
mock_mark_finished.assert_called()
finish_call_kwargs = mock_mark_finished.call_args[1]
assert finish_call_kwargs["success"] is True

View File

@@ -1,173 +0,0 @@
"""Unit tests for the Emitter class.
All tests use the streaming mode (merged_queue required). Emitter has a single
code path — no standalone bus.
"""
import queue
from onyx.chat.emitter import Emitter
from onyx.server.query_and_chat.placement import Placement
from onyx.server.query_and_chat.streaming_models import OverallStop
from onyx.server.query_and_chat.streaming_models import Packet
from onyx.server.query_and_chat.streaming_models import ReasoningStart
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _placement(
turn_index: int = 0,
tab_index: int = 0,
sub_turn_index: int | None = None,
) -> Placement:
return Placement(
turn_index=turn_index,
tab_index=tab_index,
sub_turn_index=sub_turn_index,
)
def _packet(
turn_index: int = 0,
tab_index: int = 0,
sub_turn_index: int | None = None,
) -> Packet:
"""Build a minimal valid packet with an OverallStop payload."""
return Packet(
placement=_placement(turn_index, tab_index, sub_turn_index),
obj=OverallStop(stop_reason="test"),
)
def _make_emitter(model_idx: int = 0) -> tuple["Emitter", "queue.Queue"]:
"""Return (emitter, queue) wired together."""
mq: queue.Queue = queue.Queue()
return Emitter(merged_queue=mq, model_idx=model_idx), mq
# ---------------------------------------------------------------------------
# Queue routing
# ---------------------------------------------------------------------------
class TestEmitterQueueRouting:
def test_emit_lands_on_merged_queue(self) -> None:
emitter, mq = _make_emitter()
emitter.emit(_packet())
assert not mq.empty()
def test_queue_item_is_tuple_of_key_and_packet(self) -> None:
emitter, mq = _make_emitter(model_idx=1)
emitter.emit(_packet())
item = mq.get_nowait()
assert isinstance(item, tuple)
assert len(item) == 2
def test_multiple_packets_delivered_fifo(self) -> None:
emitter, mq = _make_emitter()
p1 = _packet(turn_index=0)
p2 = _packet(turn_index=1)
emitter.emit(p1)
emitter.emit(p2)
_, t1 = mq.get_nowait()
_, t2 = mq.get_nowait()
assert t1.placement.turn_index == 0
assert t2.placement.turn_index == 1
# ---------------------------------------------------------------------------
# model_index tagging
# ---------------------------------------------------------------------------
class TestEmitterModelIndexTagging:
def test_n1_default_model_idx_tags_model_index_zero(self) -> None:
"""N=1: default model_idx=0, so packet gets model_index=0."""
emitter, mq = _make_emitter(model_idx=0)
emitter.emit(_packet())
_key, tagged = mq.get_nowait()
assert tagged.placement.model_index == 0
def test_model_idx_one_tags_packet(self) -> None:
emitter, mq = _make_emitter(model_idx=1)
emitter.emit(_packet())
_key, tagged = mq.get_nowait()
assert tagged.placement.model_index == 1
def test_model_idx_two_tags_packet(self) -> None:
"""Boundary: third model in a 3-model run."""
emitter, mq = _make_emitter(model_idx=2)
emitter.emit(_packet())
_key, tagged = mq.get_nowait()
assert tagged.placement.model_index == 2
# ---------------------------------------------------------------------------
# Queue key
# ---------------------------------------------------------------------------
class TestEmitterQueueKey:
def test_key_equals_model_idx(self) -> None:
"""Drain loop uses the key to route packets; it must match model_idx."""
emitter, mq = _make_emitter(model_idx=2)
emitter.emit(_packet())
key, _ = mq.get_nowait()
assert key == 2
def test_n1_key_is_zero(self) -> None:
emitter, mq = _make_emitter(model_idx=0)
emitter.emit(_packet())
key, _ = mq.get_nowait()
assert key == 0
# ---------------------------------------------------------------------------
# Placement field preservation
# ---------------------------------------------------------------------------
class TestEmitterPlacementPreservation:
def test_turn_index_is_preserved(self) -> None:
emitter, mq = _make_emitter()
emitter.emit(_packet(turn_index=5))
_, tagged = mq.get_nowait()
assert tagged.placement.turn_index == 5
def test_tab_index_is_preserved(self) -> None:
emitter, mq = _make_emitter()
emitter.emit(_packet(tab_index=3))
_, tagged = mq.get_nowait()
assert tagged.placement.tab_index == 3
def test_sub_turn_index_is_preserved(self) -> None:
emitter, mq = _make_emitter()
emitter.emit(_packet(sub_turn_index=2))
_, tagged = mq.get_nowait()
assert tagged.placement.sub_turn_index == 2
def test_sub_turn_index_none_is_preserved(self) -> None:
emitter, mq = _make_emitter()
emitter.emit(_packet(sub_turn_index=None))
_, tagged = mq.get_nowait()
assert tagged.placement.sub_turn_index is None
def test_packet_obj_is_not_modified(self) -> None:
"""The payload object must survive tagging untouched."""
emitter, mq = _make_emitter()
original_obj = OverallStop(stop_reason="sentinel")
pkt = Packet(placement=_placement(), obj=original_obj)
emitter.emit(pkt)
_, tagged = mq.get_nowait()
assert tagged.obj is original_obj
def test_different_obj_types_are_handled(self) -> None:
"""Any valid PacketObj type passes through correctly."""
emitter, mq = _make_emitter()
pkt = Packet(placement=_placement(), obj=ReasoningStart())
emitter.emit(pkt)
_, tagged = mq.get_nowait()
assert isinstance(tagged.obj, ReasoningStart)

View File

@@ -1,768 +0,0 @@
"""Unit tests for multi-model streaming validation and DB helpers.
These are pure unit tests — no real database or LLM calls required.
The validation logic in handle_multi_model_stream fires before any external
calls, so we can trigger it with lightweight mocks.
"""
import time
from collections.abc import Generator
from typing import Any
from typing import cast
from unittest.mock import MagicMock
from unittest.mock import patch
from uuid import uuid4
import pytest
from onyx.chat.models import StreamingError
from onyx.configs.constants import MessageType
from onyx.db.chat import set_preferred_response
from onyx.llm.override_models import LLMOverride
from onyx.server.query_and_chat.models import SendMessageRequest
from onyx.server.query_and_chat.placement import Placement
from onyx.server.query_and_chat.streaming_models import OverallStop
from onyx.server.query_and_chat.streaming_models import Packet
from onyx.server.query_and_chat.streaming_models import ReasoningStart
from onyx.utils.variable_functionality import global_version
@pytest.fixture(autouse=True)
def _restore_ee_version() -> Generator[None, None, None]:
"""Reset EE global state after each test.
Importing onyx.chat.process_message triggers set_is_ee_based_on_env_variable()
(via the celery client import chain). Without this fixture, the EE flag stays
True for the rest of the session and breaks unrelated tests that mock Confluence
or other connectors and assume EE is disabled.
"""
original = global_version._is_ee
yield
global_version._is_ee = original
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_request(**kwargs: Any) -> SendMessageRequest:
defaults: dict[str, Any] = {
"message": "hello",
"chat_session_id": uuid4(),
}
defaults.update(kwargs)
return SendMessageRequest(**defaults)
def _make_override(provider: str = "openai", version: str = "gpt-4") -> LLMOverride:
return LLMOverride(model_provider=provider, model_version=version)
def _first_from_stream(req: SendMessageRequest, overrides: list[LLMOverride]) -> Any:
"""Return the first item yielded by handle_multi_model_stream."""
from onyx.chat.process_message import handle_multi_model_stream
user = MagicMock()
user.is_anonymous = False
user.email = "test@example.com"
db = MagicMock()
gen = handle_multi_model_stream(req, user, db, overrides)
return next(gen)
# ---------------------------------------------------------------------------
# handle_multi_model_stream — validation
# ---------------------------------------------------------------------------
class TestRunMultiModelStreamValidation:
def test_single_override_yields_error(self) -> None:
"""Exactly 1 override is not multi-model — yields StreamingError."""
req = _make_request()
result = _first_from_stream(req, [_make_override()])
assert isinstance(result, StreamingError)
assert "2-3" in result.error
def test_four_overrides_yields_error(self) -> None:
"""4 overrides exceeds maximum — yields StreamingError."""
req = _make_request()
result = _first_from_stream(
req,
[
_make_override("openai", "gpt-4"),
_make_override("anthropic", "claude-3"),
_make_override("google", "gemini-pro"),
_make_override("cohere", "command-r"),
],
)
assert isinstance(result, StreamingError)
assert "2-3" in result.error
def test_zero_overrides_yields_error(self) -> None:
"""Empty override list yields StreamingError."""
req = _make_request()
result = _first_from_stream(req, [])
assert isinstance(result, StreamingError)
assert "2-3" in result.error
def test_deep_research_yields_error(self) -> None:
"""deep_research=True is incompatible with multi-model — yields StreamingError."""
req = _make_request(deep_research=True)
result = _first_from_stream(
req, [_make_override(), _make_override("anthropic", "claude-3")]
)
assert isinstance(result, StreamingError)
assert "not supported" in result.error
def test_exactly_two_overrides_is_minimum(self) -> None:
"""Boundary: 1 override yields error, 2 overrides passes validation."""
req = _make_request()
# 1 override must yield a StreamingError
result = _first_from_stream(req, [_make_override()])
assert isinstance(
result, StreamingError
), "1 override should yield StreamingError"
# 2 overrides must NOT yield a validation StreamingError (may raise later due to
# missing session, that's OK — validation itself passed)
try:
result2 = _first_from_stream(
req, [_make_override(), _make_override("anthropic", "claude-3")]
)
if isinstance(result2, StreamingError) and "2-3" in result2.error:
pytest.fail(
f"2 overrides should pass validation, got StreamingError: {result2.error}"
)
except Exception:
pass # Any non-validation error means validation passed
# ---------------------------------------------------------------------------
# set_preferred_response — validation (mocked db)
# ---------------------------------------------------------------------------
class TestSetPreferredResponseValidation:
def test_user_message_not_found(self) -> None:
db = MagicMock()
db.get.return_value = None
with pytest.raises(ValueError, match="not found"):
set_preferred_response(
db, user_message_id=999, preferred_assistant_message_id=1
)
def test_wrong_message_type(self) -> None:
"""Cannot set preferred response on a non-USER message."""
db = MagicMock()
user_msg = MagicMock()
user_msg.message_type = MessageType.ASSISTANT # wrong type
db.get.return_value = user_msg
with pytest.raises(ValueError, match="not a user message"):
set_preferred_response(
db, user_message_id=1, preferred_assistant_message_id=2
)
def test_assistant_message_not_found(self) -> None:
db = MagicMock()
user_msg = MagicMock()
user_msg.message_type = MessageType.USER
# First call returns user_msg, second call (for assistant) returns None
db.get.side_effect = [user_msg, None]
with pytest.raises(ValueError, match="not found"):
set_preferred_response(
db, user_message_id=1, preferred_assistant_message_id=2
)
def test_assistant_not_child_of_user(self) -> None:
db = MagicMock()
user_msg = MagicMock()
user_msg.message_type = MessageType.USER
assistant_msg = MagicMock()
assistant_msg.parent_message_id = 999 # different parent
db.get.side_effect = [user_msg, assistant_msg]
with pytest.raises(ValueError, match="not a child"):
set_preferred_response(
db, user_message_id=1, preferred_assistant_message_id=2
)
def test_valid_call_sets_preferred_response_id(self) -> None:
db = MagicMock()
user_msg = MagicMock()
user_msg.message_type = MessageType.USER
assistant_msg = MagicMock()
assistant_msg.parent_message_id = 1 # correct parent
db.get.side_effect = [user_msg, assistant_msg]
set_preferred_response(db, user_message_id=1, preferred_assistant_message_id=2)
assert user_msg.preferred_response_id == 2
assert user_msg.latest_child_message_id == 2
# ---------------------------------------------------------------------------
# LLMOverride — display_name field
# ---------------------------------------------------------------------------
class TestLLMOverrideDisplayName:
def test_display_name_defaults_none(self) -> None:
override = LLMOverride(model_provider="openai", model_version="gpt-4")
assert override.display_name is None
def test_display_name_set(self) -> None:
override = LLMOverride(
model_provider="openai",
model_version="gpt-4",
display_name="GPT-4 Turbo",
)
assert override.display_name == "GPT-4 Turbo"
def test_display_name_serializes(self) -> None:
override = LLMOverride(
model_provider="anthropic",
model_version="claude-opus-4-6",
display_name="Claude Opus",
)
d = override.model_dump()
assert d["display_name"] == "Claude Opus"
# ---------------------------------------------------------------------------
# _run_models — drain loop behaviour
# ---------------------------------------------------------------------------
def _make_setup(n_models: int = 1) -> MagicMock:
"""Minimal ChatTurnSetup mock whose fields pass Pydantic validation in _run_model."""
setup = MagicMock()
setup.llms = [MagicMock() for _ in range(n_models)]
setup.model_display_names = [f"model-{i}" for i in range(n_models)]
setup.check_is_connected = MagicMock(return_value=True)
setup.reserved_messages = [MagicMock() for _ in range(n_models)]
setup.reserved_token_count = 100
# Fields consumed by SearchToolConfig / CustomToolConfig / FileReaderToolConfig
# constructors inside _run_model — must be typed correctly for Pydantic.
setup.new_msg_req.deep_research = False
setup.new_msg_req.internal_search_filters = None
setup.new_msg_req.allowed_tool_ids = None
setup.new_msg_req.include_citations = True
setup.search_params.project_id_filter = None
setup.search_params.persona_id_filter = None
setup.bypass_acl = False
setup.slack_context = None
setup.available_files.user_file_ids = []
setup.available_files.chat_file_ids = []
setup.forced_tool_id = None
setup.simple_chat_history = []
setup.chat_session.id = uuid4()
setup.user_message.id = None
setup.custom_tool_additional_headers = None
setup.mcp_headers = None
return setup
def _run_models_collect(setup: MagicMock) -> list:
"""Drive _run_models to completion and return all yielded items."""
from onyx.chat.process_message import _run_models
return list(_run_models(setup, MagicMock(), MagicMock()))
class TestRunModels:
"""Tests for the _run_models worker-thread drain loop.
All external dependencies (LLM, DB, tools) are patched out. Worker threads
still run but return immediately since run_llm_loop is mocked.
"""
def test_n1_overall_stop_from_llm_loop_passes_through(self) -> None:
"""OverallStop emitted by run_llm_loop is passed through the drain loop unchanged."""
def emit_stop(**kwargs: Any) -> None:
kwargs["emitter"].emit(
Packet(
placement=Placement(turn_index=0),
obj=OverallStop(stop_reason="complete"),
)
)
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=emit_stop),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(_make_setup(n_models=1))
stops = [
p
for p in packets
if isinstance(p, Packet) and isinstance(p.obj, OverallStop)
]
assert len(stops) == 1
stop_obj = stops[0].obj
assert isinstance(stop_obj, OverallStop)
assert stop_obj.stop_reason == "complete"
def test_n1_emitted_packet_has_model_index_zero(self) -> None:
"""Single-model path: model_index is 0 (Emitter defaults model_idx=0)."""
def emit_one(**kwargs: Any) -> None:
kwargs["emitter"].emit(
Packet(placement=Placement(turn_index=0), obj=ReasoningStart())
)
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=emit_one),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(_make_setup(n_models=1))
reasoning = [
p
for p in packets
if isinstance(p, Packet) and isinstance(p.obj, ReasoningStart)
]
assert len(reasoning) == 1
assert reasoning[0].placement.model_index == 0
def test_n2_each_model_packet_tagged_with_its_index(self) -> None:
"""Multi-model path: packets from model 0 get index=0, model 1 gets index=1."""
def emit_one(**kwargs: Any) -> None:
# _model_idx is set by _run_model based on position in setup.llms
emitter = kwargs["emitter"]
emitter.emit(
Packet(placement=Placement(turn_index=0), obj=ReasoningStart())
)
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=emit_one),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(_make_setup(n_models=2))
reasoning = [
p
for p in packets
if isinstance(p, Packet) and isinstance(p.obj, ReasoningStart)
]
assert len(reasoning) == 2
indices = {p.placement.model_index for p in reasoning}
assert indices == {0, 1}
def test_model_error_yields_streaming_error(self) -> None:
"""An exception inside a worker thread is surfaced as a StreamingError."""
def always_fail(**_kwargs: Any) -> None:
raise RuntimeError("intentional test failure")
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=always_fail),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(_make_setup(n_models=1))
errors = [p for p in packets if isinstance(p, StreamingError)]
assert len(errors) == 1
assert errors[0].error_code == "MODEL_ERROR"
assert "intentional test failure" in errors[0].error
def test_one_model_error_does_not_stop_other_models(self) -> None:
"""A failing model yields StreamingError; the surviving model's packets still arrive."""
setup = _make_setup(n_models=2)
def fail_model_0_succeed_model_1(**kwargs: Any) -> None:
if kwargs["llm"] is setup.llms[0]:
raise RuntimeError("model 0 failed")
kwargs["emitter"].emit(
Packet(placement=Placement(turn_index=0), obj=ReasoningStart())
)
with (
patch(
"onyx.chat.process_message.run_llm_loop",
side_effect=fail_model_0_succeed_model_1,
),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(setup)
errors = [p for p in packets if isinstance(p, StreamingError)]
assert len(errors) == 1
reasoning = [
p
for p in packets
if isinstance(p, Packet) and isinstance(p.obj, ReasoningStart)
]
assert len(reasoning) == 1
assert reasoning[0].placement.model_index == 1
def test_cancellation_yields_user_cancelled_stop(self) -> None:
"""If check_is_connected returns False, drain loop emits user_cancelled."""
def slow_llm(**_kwargs: Any) -> None:
time.sleep(0.3) # Outlasts the 50 ms queue-poll interval
setup = _make_setup(n_models=1)
setup.check_is_connected = MagicMock(return_value=False)
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=slow_llm),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
packets = _run_models_collect(setup)
stops = [
p
for p in packets
if isinstance(p, Packet) and isinstance(p.obj, OverallStop)
]
assert any(
isinstance(s.obj, OverallStop) and s.obj.stop_reason == "user_cancelled"
for s in stops
)
def test_stop_button_calls_completion_for_all_models(self) -> None:
"""llm_loop_completion_handle must be called for all models when the stop button fires.
Regression test for the disconnect-cleanup bug: the old
run_chat_loop_with_state_containers always called completion_callback in
its finally block (even on disconnect) so the DB message was updated from
the TERMINATED placeholder to a partial answer. The new _run_models must
replicate this — otherwise the integration test
test_send_message_disconnect_and_cleanup fails because the message stays
as "Response was terminated prior to completion, try regenerating."
"""
def slow_llm(**_kwargs: Any) -> None:
time.sleep(0.3)
setup = _make_setup(n_models=2)
setup.check_is_connected = MagicMock(return_value=False)
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=slow_llm),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle"
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
_run_models_collect(setup)
# Must be called once per model, not zero times
assert mock_handle.call_count == 2
def test_completion_handle_called_for_each_successful_model(self) -> None:
"""llm_loop_completion_handle must be called once per model that succeeded."""
setup = _make_setup(n_models=2)
with (
patch("onyx.chat.process_message.run_llm_loop"),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle"
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
_run_models_collect(setup)
assert mock_handle.call_count == 2
def test_completion_handle_not_called_for_failed_model(self) -> None:
"""llm_loop_completion_handle must be skipped for a model that raised."""
def always_fail(**_kwargs: Any) -> None:
raise RuntimeError("fail")
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=always_fail),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle"
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
_run_models_collect(_make_setup(n_models=1))
mock_handle.assert_not_called()
def test_http_disconnect_completion_via_generator_exit(self) -> None:
"""GeneratorExit from HTTP disconnect triggers worker self-completion.
When the HTTP client closes the connection, Starlette throws GeneratorExit
into the stream generator. The finally block sets drain_done (signalling
emitters to stop blocking) and calls executor.shutdown(wait=False) so the
server thread is never blocked. Worker threads detect drain_done.is_set()
after run_llm_loop completes and self-persist the result via
llm_loop_completion_handle using their own DB session.
This is the primary regression for test_send_message_disconnect_and_cleanup:
the integration test disconnects mid-stream and expects the DB message to be
updated from the TERMINATED placeholder to the real response.
"""
import threading
# Signals the worker to unblock from run_llm_loop after gen.close() returns.
# This guarantees drain_done is set BEFORE the worker returns from run_llm_loop,
# so the self-completion path (drain_done.is_set() check) is always taken.
disconnect_received = threading.Event()
# Set by the llm_loop_completion_handle mock when called.
completion_called = threading.Event()
def emit_then_complete(**kwargs: Any) -> None:
"""Emit one packet (to give the drain loop a yield point), then block
until the main thread signals that gen.close() has been called. This
ensures drain_done is set before we return so model_succeeded is checked
against a set drain_done — no race condition.
"""
emitter = kwargs["emitter"]
emitter.emit(
Packet(placement=Placement(turn_index=0), obj=ReasoningStart())
)
disconnect_received.wait(timeout=5)
setup = _make_setup(n_models=1)
# is_connected() always True — HTTP disconnect does NOT set the Redis stop fence.
setup.check_is_connected = MagicMock(return_value=True)
with (
patch(
"onyx.chat.process_message.run_llm_loop",
side_effect=emit_then_complete,
),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle",
side_effect=lambda *_, **__: completion_called.set(),
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
from onyx.chat.process_message import _run_models
# cast to Generator so .close() is available; _run_models returns
# AnswerStream (= Iterator) but the actual object is always a generator.
gen = cast(Generator, _run_models(setup, MagicMock(), MagicMock()))
# Advance to the first yielded packet — generator suspends at `yield item`.
first = next(gen)
assert isinstance(first, Packet)
# Simulate Starlette closing the stream on HTTP client disconnect.
# GeneratorExit is thrown at the `yield item` suspension point.
gen.close()
# Unblock the worker now that drain_done has been set by gen.close().
disconnect_received.set()
# Worker self-completes asynchronously (executor.shutdown(wait=False)).
# Wait here, inside the patch context, so that get_session_with_current_tenant
# and llm_loop_completion_handle mocks are still active when the worker calls them.
assert completion_called.wait(
timeout=5
), "worker must self-complete via drain_done within 5 seconds"
assert (
mock_handle.call_count == 1
), "completion handle must be called once for the successful model"
def test_b1_race_disconnect_handler_completes_already_finished_model(self) -> None:
"""B1 regression: model finishes BEFORE GeneratorExit fires.
The worker exits _run_model with drain_done.is_set()=False and skips
self-completion. When gen.close() fires afterward, the finally else-branch
must detect model_succeeded=True and call llm_loop_completion_handle itself.
Contrast with test_http_disconnect_completion_via_generator_exit, which
tests the opposite ordering (worker finishes AFTER disconnect).
"""
import threading
import time
completion_called = threading.Event()
def emit_and_return_immediately(**kwargs: Any) -> None:
# Emit one packet so the drain loop has something to yield, then return
# immediately — no blocking. The worker will be done in microseconds.
kwargs["emitter"].emit(
Packet(placement=Placement(turn_index=0), obj=ReasoningStart())
)
setup = _make_setup(n_models=1)
setup.check_is_connected = MagicMock(return_value=True)
with (
patch(
"onyx.chat.process_message.run_llm_loop",
side_effect=emit_and_return_immediately,
),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle",
side_effect=lambda *_, **__: completion_called.set(),
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
from onyx.chat.process_message import _run_models
gen = cast(Generator, _run_models(setup, MagicMock(), MagicMock()))
first = next(gen)
assert isinstance(first, Packet)
# Give the worker thread time to finish completely (emit + return +
# finally + self-completion check). It does almost no work, so 100 ms
# is far more than enough while still keeping the test fast.
time.sleep(0.1)
# Now close — worker is already done, so else-branch handles completion.
gen.close()
assert completion_called.wait(
timeout=5
), "disconnect handler must call completion for a model that already finished"
assert mock_handle.call_count == 1, "completion must be called exactly once"
def test_stop_button_does_not_call_completion_for_errored_model(self) -> None:
"""B2 regression: stop-button must NOT call completion for an errored model.
When model 0 raises an exception, its reserved ChatMessage must not be
saved with 'stopped by user' — that message is wrong for a model that
errored. llm_loop_completion_handle must only be called for non-errored
models when the stop button fires.
"""
def fail_model_0(**kwargs: Any) -> None:
if kwargs["llm"] is setup.llms[0]:
raise RuntimeError("model 0 errored")
# Model 1: run forever (stop button fires before it finishes)
time.sleep(10)
setup = _make_setup(n_models=2)
# Return False immediately so the stop-button path fires while model 1
# is still sleeping (model 0 has already errored by then).
setup.check_is_connected = lambda: False
with (
patch("onyx.chat.process_message.run_llm_loop", side_effect=fail_model_0),
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch(
"onyx.chat.process_message.llm_loop_completion_handle"
) as mock_handle,
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
_run_models_collect(setup)
# Completion must NOT be called for model 0 (it errored).
# It MAY be called for model 1 (still in-flight when stop fired).
for call in mock_handle.call_args_list:
assert (
call.kwargs.get("llm") is not setup.llms[0]
), "llm_loop_completion_handle must not be called for the errored model"
def test_external_state_container_used_for_model_zero(self) -> None:
"""When provided, external_state_container is used as state_containers[0]."""
from onyx.chat.chat_state import ChatStateContainer
from onyx.chat.process_message import _run_models
external = ChatStateContainer()
setup = _make_setup(n_models=1)
with (
patch("onyx.chat.process_message.run_llm_loop") as mock_llm,
patch("onyx.chat.process_message.run_deep_research_llm_loop"),
patch("onyx.chat.process_message.construct_tools", return_value={}),
patch("onyx.chat.process_message.get_session_with_current_tenant"),
patch("onyx.chat.process_message.llm_loop_completion_handle"),
patch(
"onyx.chat.process_message.get_llm_token_counter",
return_value=lambda _: 0,
),
):
list(
_run_models(
setup, MagicMock(), MagicMock(), external_state_container=external
)
)
# The state_container kwarg passed to run_llm_loop must be the external one
call_kwargs = mock_llm.call_args.kwargs
assert call_kwargs["state_container"] is external

View File

@@ -1,15 +1,23 @@
"""Tests for Canvas connector — client (PR1)."""
"""Tests for Canvas connector — client, credentials, conversion."""
from datetime import datetime
from datetime import timezone
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from onyx.configs.constants import DocumentSource
from onyx.connectors.canvas.client import CanvasApiClient
from onyx.connectors.canvas.connector import CanvasConnector
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedValidationError
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.error_handling.exceptions import OnyxError
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@@ -18,6 +26,77 @@ FAKE_BASE_URL = "https://myschool.instructure.com"
FAKE_TOKEN = "fake-canvas-token"
def _mock_course(
course_id: int = 1,
name: str = "Intro to CS",
course_code: str = "CS101",
) -> dict[str, Any]:
return {
"id": course_id,
"name": name,
"course_code": course_code,
"created_at": "2025-01-01T00:00:00Z",
"workflow_state": "available",
}
def _build_connector(base_url: str = FAKE_BASE_URL) -> CanvasConnector:
"""Build a connector with mocked credential validation."""
with patch("onyx.connectors.canvas.client.rl_requests") as mock_req:
mock_req.get.return_value = _mock_response(json_data=[_mock_course()])
connector = CanvasConnector(canvas_base_url=base_url)
connector.load_credentials({"canvas_access_token": FAKE_TOKEN})
return connector
def _mock_page(
page_id: int = 10,
title: str = "Syllabus",
updated_at: str = "2025-06-01T12:00:00Z",
) -> dict[str, Any]:
return {
"page_id": page_id,
"url": "syllabus",
"title": title,
"body": "<p>Welcome to the course</p>",
"created_at": "2025-01-15T00:00:00Z",
"updated_at": updated_at,
}
def _mock_assignment(
assignment_id: int = 20,
name: str = "Homework 1",
course_id: int = 1,
updated_at: str = "2025-06-01T12:00:00Z",
) -> dict[str, Any]:
return {
"id": assignment_id,
"name": name,
"description": "<p>Solve these problems</p>",
"html_url": f"{FAKE_BASE_URL}/courses/{course_id}/assignments/{assignment_id}",
"course_id": course_id,
"created_at": "2025-01-20T00:00:00Z",
"updated_at": updated_at,
"due_at": "2025-02-01T23:59:00Z",
}
def _mock_announcement(
announcement_id: int = 30,
title: str = "Class Cancelled",
course_id: int = 1,
posted_at: str = "2025-06-01T12:00:00Z",
) -> dict[str, Any]:
return {
"id": announcement_id,
"title": title,
"message": "<p>No class today</p>",
"html_url": f"{FAKE_BASE_URL}/courses/{course_id}/discussion_topics/{announcement_id}",
"posted_at": posted_at,
}
def _mock_response(
status_code: int = 200,
json_data: Any = None,
@@ -325,6 +404,57 @@ class TestGet:
assert result == expected
# ---------------------------------------------------------------------------
# CanvasApiClient.paginate tests
# ---------------------------------------------------------------------------
class TestPaginate:
@patch("onyx.connectors.canvas.client.rl_requests")
def test_single_page(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(
json_data=[{"id": 1}, {"id": 2}]
)
client = CanvasApiClient(
bearer_token=FAKE_TOKEN,
canvas_base_url=FAKE_BASE_URL,
)
pages = list(client.paginate("courses"))
assert len(pages) == 1
assert pages[0] == [{"id": 1}, {"id": 2}]
@patch("onyx.connectors.canvas.client.rl_requests")
def test_two_pages(self, mock_requests: MagicMock) -> None:
next_link = f'<{FAKE_BASE_URL}/api/v1/courses?page=2>; rel="next"'
page1 = _mock_response(json_data=[{"id": 1}], link_header=next_link)
page2 = _mock_response(json_data=[{"id": 2}])
mock_requests.get.side_effect = [page1, page2]
client = CanvasApiClient(
bearer_token=FAKE_TOKEN,
canvas_base_url=FAKE_BASE_URL,
)
pages = list(client.paginate("courses"))
assert len(pages) == 2
assert pages[0] == [{"id": 1}]
assert pages[1] == [{"id": 2}]
@patch("onyx.connectors.canvas.client.rl_requests")
def test_empty_response(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[])
client = CanvasApiClient(
bearer_token=FAKE_TOKEN,
canvas_base_url=FAKE_BASE_URL,
)
pages = list(client.paginate("courses"))
assert pages == []
# ---------------------------------------------------------------------------
# CanvasApiClient._parse_next_link tests
# ---------------------------------------------------------------------------
@@ -379,3 +509,368 @@ class TestParseNextLink:
with pytest.raises(OnyxError, match="must use https"):
self.client._parse_next_link(header)
# ---------------------------------------------------------------------------
# CanvasConnector — credential loading
# ---------------------------------------------------------------------------
class TestLoadCredentials:
def _assert_load_credentials_raises(
self,
status_code: int,
expected_error: type[Exception],
mock_requests: MagicMock,
) -> None:
"""Helper: assert load_credentials raises expected_error for a given status."""
mock_requests.get.return_value = _mock_response(status_code, {})
connector = CanvasConnector(canvas_base_url=FAKE_BASE_URL)
with pytest.raises(expected_error):
connector.load_credentials({"canvas_access_token": FAKE_TOKEN})
@patch("onyx.connectors.canvas.client.rl_requests")
def test_load_credentials_success(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[_mock_course()])
connector = CanvasConnector(canvas_base_url=FAKE_BASE_URL)
result = connector.load_credentials({"canvas_access_token": FAKE_TOKEN})
assert result is None
assert connector._canvas_client is not None
def test_canvas_client_raises_without_credentials(self) -> None:
connector = CanvasConnector(canvas_base_url=FAKE_BASE_URL)
with pytest.raises(ConnectorMissingCredentialError):
_ = connector.canvas_client
@patch("onyx.connectors.canvas.client.rl_requests")
def test_load_credentials_invalid_token(self, mock_requests: MagicMock) -> None:
self._assert_load_credentials_raises(401, CredentialExpiredError, mock_requests)
@patch("onyx.connectors.canvas.client.rl_requests")
def test_load_credentials_insufficient_permissions(
self, mock_requests: MagicMock
) -> None:
self._assert_load_credentials_raises(
403, InsufficientPermissionsError, mock_requests
)
# ---------------------------------------------------------------------------
# CanvasConnector — URL normalization
# ---------------------------------------------------------------------------
class TestConnectorUrlNormalization:
def test_strips_api_v1_suffix(self) -> None:
connector = _build_connector(base_url=f"{FAKE_BASE_URL}/api/v1")
result = connector.canvas_base_url
expected = FAKE_BASE_URL
assert result == expected
def test_strips_trailing_slash(self) -> None:
connector = _build_connector(base_url=f"{FAKE_BASE_URL}/")
result = connector.canvas_base_url
expected = FAKE_BASE_URL
assert result == expected
def test_no_change_for_clean_url(self) -> None:
connector = _build_connector(base_url=FAKE_BASE_URL)
result = connector.canvas_base_url
expected = FAKE_BASE_URL
assert result == expected
# ---------------------------------------------------------------------------
# CanvasConnector — document conversion
# ---------------------------------------------------------------------------
class TestDocumentConversion:
def setup_method(self) -> None:
self.connector = _build_connector()
def test_convert_page_to_document(self) -> None:
from onyx.connectors.canvas.connector import CanvasPage
page = CanvasPage(
page_id=10,
url="syllabus",
title="Syllabus",
body="<p>Welcome</p>",
created_at="2025-01-15T00:00:00Z",
updated_at="2025-06-01T12:00:00Z",
course_id=1,
)
doc = self.connector._convert_page_to_document(page)
expected_id = "canvas-page-1-10"
expected_metadata = {"course_id": "1", "type": "page"}
expected_updated_at = datetime(2025, 6, 1, 12, 0, tzinfo=timezone.utc)
assert doc.id == expected_id
assert doc.source == DocumentSource.CANVAS
assert doc.semantic_identifier == "Syllabus"
assert doc.metadata == expected_metadata
assert doc.sections[0].link is not None
assert f"{FAKE_BASE_URL}/courses/1/pages/syllabus" in doc.sections[0].link
assert doc.doc_updated_at == expected_updated_at
def test_convert_page_without_body(self) -> None:
from onyx.connectors.canvas.connector import CanvasPage
page = CanvasPage(
page_id=11,
url="empty-page",
title="Empty Page",
body=None,
created_at="2025-01-15T00:00:00Z",
updated_at="2025-06-01T12:00:00Z",
course_id=1,
)
doc = self.connector._convert_page_to_document(page)
section_text = doc.sections[0].text
assert section_text is not None
assert "Empty Page" in section_text
assert "<p>" not in section_text
def test_convert_assignment_to_document(self) -> None:
from onyx.connectors.canvas.connector import CanvasAssignment
assignment = CanvasAssignment(
id=20,
name="Homework 1",
description="<p>Solve these</p>",
html_url=f"{FAKE_BASE_URL}/courses/1/assignments/20",
course_id=1,
created_at="2025-01-20T00:00:00Z",
updated_at="2025-06-01T12:00:00Z",
due_at="2025-02-01T23:59:00Z",
)
doc = self.connector._convert_assignment_to_document(assignment)
expected_id = "canvas-assignment-1-20"
expected_due_text = "Due: February 01, 2025 23:59 UTC"
assert doc.id == expected_id
assert doc.source == DocumentSource.CANVAS
assert doc.semantic_identifier == "Homework 1"
assert doc.sections[0].text is not None
assert expected_due_text in doc.sections[0].text
def test_convert_assignment_without_description(self) -> None:
from onyx.connectors.canvas.connector import CanvasAssignment
assignment = CanvasAssignment(
id=21,
name="Quiz 1",
description=None,
html_url=f"{FAKE_BASE_URL}/courses/1/assignments/21",
course_id=1,
created_at="2025-01-20T00:00:00Z",
updated_at="2025-06-01T12:00:00Z",
due_at=None,
)
doc = self.connector._convert_assignment_to_document(assignment)
section_text = doc.sections[0].text
assert section_text is not None
assert "Quiz 1" in section_text
assert "Due:" not in section_text
def test_convert_announcement_to_document(self) -> None:
from onyx.connectors.canvas.connector import CanvasAnnouncement
announcement = CanvasAnnouncement(
id=30,
title="Class Cancelled",
message="<p>No class today</p>",
html_url=f"{FAKE_BASE_URL}/courses/1/discussion_topics/30",
posted_at="2025-06-01T12:00:00Z",
course_id=1,
)
doc = self.connector._convert_announcement_to_document(announcement)
expected_id = "canvas-announcement-1-30"
expected_updated_at = datetime(2025, 6, 1, 12, 0, tzinfo=timezone.utc)
assert doc.id == expected_id
assert doc.source == DocumentSource.CANVAS
assert doc.semantic_identifier == "Class Cancelled"
assert doc.doc_updated_at == expected_updated_at
def test_convert_announcement_without_posted_at(self) -> None:
from onyx.connectors.canvas.connector import CanvasAnnouncement
announcement = CanvasAnnouncement(
id=31,
title="TBD Announcement",
message=None,
html_url=f"{FAKE_BASE_URL}/courses/1/discussion_topics/31",
posted_at=None,
course_id=1,
)
doc = self.connector._convert_announcement_to_document(announcement)
assert doc.doc_updated_at is None
# ---------------------------------------------------------------------------
# CanvasConnector — validate_connector_settings
# ---------------------------------------------------------------------------
class TestValidateConnectorSettings:
def _assert_validate_raises(
self,
status_code: int,
expected_error: type[Exception],
mock_requests: MagicMock,
) -> None:
"""Helper: assert validate_connector_settings raises expected_error."""
success_resp = _mock_response(json_data=[_mock_course()])
fail_resp = _mock_response(status_code, {})
mock_requests.get.side_effect = [success_resp, fail_resp]
connector = CanvasConnector(canvas_base_url=FAKE_BASE_URL)
connector.load_credentials({"canvas_access_token": FAKE_TOKEN})
with pytest.raises(expected_error):
connector.validate_connector_settings()
@patch("onyx.connectors.canvas.client.rl_requests")
def test_validate_success(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[_mock_course()])
connector = _build_connector()
connector.validate_connector_settings() # should not raise
@patch("onyx.connectors.canvas.client.rl_requests")
def test_validate_expired_credential(self, mock_requests: MagicMock) -> None:
self._assert_validate_raises(401, CredentialExpiredError, mock_requests)
@patch("onyx.connectors.canvas.client.rl_requests")
def test_validate_insufficient_permissions(self, mock_requests: MagicMock) -> None:
self._assert_validate_raises(403, InsufficientPermissionsError, mock_requests)
@patch("onyx.connectors.canvas.client.rl_requests")
def test_validate_rate_limited(self, mock_requests: MagicMock) -> None:
self._assert_validate_raises(429, ConnectorValidationError, mock_requests)
@patch("onyx.connectors.canvas.client.rl_requests")
def test_validate_unexpected_error(self, mock_requests: MagicMock) -> None:
self._assert_validate_raises(500, UnexpectedValidationError, mock_requests)
# ---------------------------------------------------------------------------
# _list_* pagination tests
# ---------------------------------------------------------------------------
class TestListCourses:
@patch("onyx.connectors.canvas.client.rl_requests")
def test_single_page(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(
json_data=[_mock_course(1), _mock_course(2, "CS201", "Data Structures")]
)
connector = _build_connector()
result = connector._list_courses()
assert len(result) == 2
assert result[0].id == 1
assert result[1].id == 2
@patch("onyx.connectors.canvas.client.rl_requests")
def test_empty_response(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[])
connector = _build_connector()
result = connector._list_courses()
assert result == []
class TestListPages:
@patch("onyx.connectors.canvas.client.rl_requests")
def test_single_page(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(
json_data=[_mock_page(10), _mock_page(11, "Notes")]
)
connector = _build_connector()
result = connector._list_pages(course_id=1)
assert len(result) == 2
assert result[0].page_id == 10
assert result[1].page_id == 11
@patch("onyx.connectors.canvas.client.rl_requests")
def test_empty_response(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[])
connector = _build_connector()
result = connector._list_pages(course_id=1)
assert result == []
class TestListAssignments:
@patch("onyx.connectors.canvas.client.rl_requests")
def test_single_page(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(
json_data=[_mock_assignment(20), _mock_assignment(21, "Quiz 1")]
)
connector = _build_connector()
result = connector._list_assignments(course_id=1)
assert len(result) == 2
assert result[0].id == 20
assert result[1].id == 21
@patch("onyx.connectors.canvas.client.rl_requests")
def test_empty_response(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[])
connector = _build_connector()
result = connector._list_assignments(course_id=1)
assert result == []
class TestListAnnouncements:
@patch("onyx.connectors.canvas.client.rl_requests")
def test_single_page(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(
json_data=[_mock_announcement(30), _mock_announcement(31, "Update")]
)
connector = _build_connector()
result = connector._list_announcements(course_id=1)
assert len(result) == 2
assert result[0].id == 30
assert result[1].id == 31
@patch("onyx.connectors.canvas.client.rl_requests")
def test_empty_response(self, mock_requests: MagicMock) -> None:
mock_requests.get.return_value = _mock_response(json_data=[])
connector = _build_connector()
result = connector._list_announcements(course_id=1)
assert result == []

View File

@@ -0,0 +1,58 @@
"""Tests for chat session and message deletion resilience."""
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
from uuid import uuid4
from onyx.db.chat import delete_messages_and_files_from_chat_session
@patch("onyx.db.chat.delete_orphaned_search_docs")
@patch("onyx.db.chat.get_default_file_store")
def test_delete_messages_skips_missing_files(
mock_get_file_store: MagicMock,
_mock_delete_orphaned: Any,
) -> None:
"""Deletion should continue when a referenced file record no longer exists."""
session_id = uuid4()
file_store = MagicMock()
file_store.delete_file.side_effect = [
None, # first file deletes fine
RuntimeError("File by id abc does not exist or was deleted"),
None, # third file deletes fine
]
mock_get_file_store.return_value = file_store
mock_db_session = MagicMock()
mock_db_session.execute.return_value.fetchall.return_value = [
(1, [{"id": "file-ok-1"}, {"id": "file-missing"}, {"id": "file-ok-2"}]),
]
delete_messages_and_files_from_chat_session(session_id, mock_db_session)
assert file_store.delete_file.call_count == 3
mock_db_session.execute.assert_called()
mock_db_session.commit.assert_called()
@patch("onyx.db.chat.delete_orphaned_search_docs")
@patch("onyx.db.chat.get_default_file_store")
def test_delete_messages_succeeds_with_no_files(
mock_get_file_store: MagicMock,
_mock_delete_orphaned: Any,
) -> None:
"""Deletion works when messages have no attached files."""
session_id = uuid4()
mock_db_session = MagicMock()
mock_db_session.execute.return_value.fetchall.return_value = [
(1, None),
(2, []),
]
delete_messages_and_files_from_chat_session(session_id, mock_db_session)
mock_get_file_store.return_value.delete_file.assert_not_called()
mock_db_session.commit.assert_called()

View File

@@ -1,6 +1,6 @@
"""Tests for memory tool streaming packet emissions."""
import queue
from queue import Queue
from unittest.mock import MagicMock
from unittest.mock import patch
@@ -18,13 +18,9 @@ from onyx.tools.tool_implementations.memory.models import MemoryToolResponse
@pytest.fixture
def emitter_queue() -> queue.Queue:
return queue.Queue()
@pytest.fixture
def emitter(emitter_queue: queue.Queue) -> Emitter:
return Emitter(merged_queue=emitter_queue)
def emitter() -> Emitter:
bus: Queue = Queue()
return Emitter(bus)
@pytest.fixture
@@ -57,27 +53,24 @@ class TestMemoryToolEmitStart:
def test_emit_start_emits_memory_tool_start_packet(
self,
memory_tool: MemoryTool,
emitter_queue: queue.Queue,
emitter: Emitter,
placement: Placement,
) -> None:
memory_tool.emit_start(placement)
_key, packet = emitter_queue.get_nowait()
packet = emitter.bus.get_nowait()
assert isinstance(packet.obj, MemoryToolStart)
assert packet.placement is not None
assert packet.placement.turn_index == placement.turn_index
assert packet.placement.tab_index == placement.tab_index
assert packet.placement.model_index == 0 # emitter stamps model_index=0
assert packet.placement == placement
def test_emit_start_with_different_placement(
self,
memory_tool: MemoryTool,
emitter_queue: queue.Queue,
emitter: Emitter,
) -> None:
placement = Placement(turn_index=2, tab_index=1)
memory_tool.emit_start(placement)
_key, packet = emitter_queue.get_nowait()
packet = emitter.bus.get_nowait()
assert packet.placement.turn_index == 2
assert packet.placement.tab_index == 1
@@ -88,7 +81,7 @@ class TestMemoryToolRun:
self,
mock_process: MagicMock,
memory_tool: MemoryTool,
emitter_queue: queue.Queue,
emitter: Emitter,
placement: Placement,
override_kwargs: MemoryToolOverrideKwargs,
) -> None:
@@ -100,19 +93,21 @@ class TestMemoryToolRun:
memory="User prefers Python",
)
_key, packet = emitter_queue.get_nowait()
# The delta packet should be in the queue
packet = emitter.bus.get_nowait()
assert isinstance(packet.obj, MemoryToolDelta)
assert packet.obj.memory_text == "User prefers Python"
assert packet.obj.operation == "add"
assert packet.obj.memory_id is None
assert packet.obj.index is None
assert packet.placement == placement
@patch("onyx.tools.tool_implementations.memory.memory_tool.process_memory_update")
def test_run_emits_delta_for_update_operation(
self,
mock_process: MagicMock,
memory_tool: MemoryTool,
emitter_queue: queue.Queue,
emitter: Emitter,
placement: Placement,
override_kwargs: MemoryToolOverrideKwargs,
) -> None:
@@ -124,7 +119,7 @@ class TestMemoryToolRun:
memory="User prefers light mode",
)
_key, packet = emitter_queue.get_nowait()
packet = emitter.bus.get_nowait()
assert isinstance(packet.obj, MemoryToolDelta)
assert packet.obj.memory_text == "User prefers light mode"
assert packet.obj.operation == "update"

View File

@@ -5,7 +5,7 @@ home: https://www.onyx.app/
sources:
- "https://github.com/onyx-dot-app/onyx"
type: application
version: 0.4.38
version: 0.4.39
appVersion: latest
annotations:
category: Productivity

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,77 @@
{{- if and .Values.monitoring.serviceMonitors.enabled .Values.vectorDB.enabled }}
{{- if gt (int .Values.celery_worker_monitoring.replicaCount) 0 }}
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ include "onyx.fullname" . }}-celery-worker-monitoring
labels:
{{- include "onyx.labels" . | nindent 4 }}
{{- with .Values.monitoring.serviceMonitors.labels }}
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
namespaceSelector:
matchNames:
- {{ .Release.Namespace }}
selector:
matchLabels:
app: {{ .Values.celery_worker_monitoring.deploymentLabels.app }}
metrics: "true"
endpoints:
- port: metrics
path: /metrics
interval: 30s
scrapeTimeout: 10s
{{- end }}
{{- if gt (int .Values.celery_worker_docfetching.replicaCount) 0 }}
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ include "onyx.fullname" . }}-celery-worker-docfetching
labels:
{{- include "onyx.labels" . | nindent 4 }}
{{- with .Values.monitoring.serviceMonitors.labels }}
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
namespaceSelector:
matchNames:
- {{ .Release.Namespace }}
selector:
matchLabels:
app: {{ .Values.celery_worker_docfetching.deploymentLabels.app }}
metrics: "true"
endpoints:
- port: metrics
path: /metrics
interval: 30s
scrapeTimeout: 10s
{{- end }}
{{- if gt (int .Values.celery_worker_docprocessing.replicaCount) 0 }}
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ include "onyx.fullname" . }}-celery-worker-docprocessing
labels:
{{- include "onyx.labels" . | nindent 4 }}
{{- with .Values.monitoring.serviceMonitors.labels }}
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
namespaceSelector:
matchNames:
- {{ .Release.Namespace }}
selector:
matchLabels:
app: {{ .Values.celery_worker_docprocessing.deploymentLabels.app }}
metrics: "true"
endpoints:
- port: metrics
path: /metrics
interval: 30s
scrapeTimeout: 10s
{{- end }}
{{- end }}

View File

@@ -0,0 +1,15 @@
{{- if .Values.monitoring.grafana.dashboards.enabled }}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "onyx.fullname" . }}-indexing-pipeline-dashboard
labels:
{{- include "onyx.labels" . | nindent 4 }}
grafana_dashboard: "1"
annotations:
grafana_folder: "Onyx"
data:
onyx-indexing-pipeline.json: |
{{- .Files.Get "dashboards/indexing-pipeline.json" | nindent 4 }}
{{- end }}

View File

@@ -256,6 +256,20 @@ tooling:
# -- Which client binary to call; change if your image uses a non-default path.
psqlBinary: psql
monitoring:
grafana:
dashboards:
# -- Set to true to deploy Grafana dashboard ConfigMaps for the Onyx indexing pipeline.
# Requires kube-prometheus-stack (or equivalent) with the Grafana sidecar enabled and watching this namespace.
# The sidecar must be configured with label selector: grafana_dashboard=1
enabled: false
serviceMonitors:
# -- Set to true to deploy ServiceMonitor resources for Celery worker metrics endpoints.
# Requires the Prometheus Operator CRDs (included in kube-prometheus-stack).
# Use `labels` to match your Prometheus CR's serviceMonitorSelector (e.g. release: onyx-monitoring).
enabled: false
labels: {}
serviceAccount:
# Specifies whether a service account should be created
create: false

View File

@@ -19,6 +19,10 @@ module "eks" {
cluster_endpoint_public_access_cidrs = var.cluster_endpoint_public_access_cidrs
enable_cluster_creator_admin_permissions = true
# Control plane logging
cluster_enabled_log_types = var.cluster_enabled_log_types
cloudwatch_log_group_retention_in_days = var.cloudwatch_log_group_retention_in_days
eks_managed_node_group_defaults = {
ami_type = "AL2023_x86_64_STANDARD"
}

View File

@@ -161,3 +161,25 @@ variable "rds_db_connect_arn" {
description = "Full rds-db:connect ARN to allow (required when enable_rds_iam_for_service_account is true)"
default = null
}
variable "cluster_enabled_log_types" {
type = list(string)
description = "EKS control plane log types to enable (valid: api, audit, authenticator, controllerManager, scheduler)"
default = ["api", "audit", "authenticator", "controllerManager", "scheduler"]
validation {
condition = alltrue([for t in var.cluster_enabled_log_types : contains(["api", "audit", "authenticator", "controllerManager", "scheduler"], t)])
error_message = "Each entry must be one of: api, audit, authenticator, controllerManager, scheduler."
}
}
variable "cloudwatch_log_group_retention_in_days" {
type = number
description = "Number of days to retain EKS control plane logs in CloudWatch (0 = never expire)"
default = 30
validation {
condition = contains([0, 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557, 2922, 3288, 3653], var.cloudwatch_log_group_retention_in_days)
error_message = "Must be a valid CloudWatch retention value (0, 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557, 2922, 3288, 3653)."
}
}

View File

@@ -54,6 +54,9 @@ module "postgres" {
password = var.postgres_password
tags = local.merged_tags
enable_rds_iam_auth = var.enable_iam_auth
backup_retention_period = var.postgres_backup_retention_period
backup_window = var.postgres_backup_window
}
module "s3" {
@@ -80,6 +83,10 @@ module "eks" {
public_cluster_enabled = var.public_cluster_enabled
private_cluster_enabled = var.private_cluster_enabled
cluster_endpoint_public_access_cidrs = var.cluster_endpoint_public_access_cidrs
# Control plane logging
cluster_enabled_log_types = var.eks_cluster_enabled_log_types
cloudwatch_log_group_retention_in_days = var.eks_cloudwatch_log_group_retention_in_days
}
module "waf" {

View File

@@ -250,3 +250,34 @@ variable "opensearch_subnet_ids" {
description = "Subnet IDs for OpenSearch. If empty, uses first 3 private subnets."
default = []
}
# RDS Backup Configuration
variable "postgres_backup_retention_period" {
type = number
description = "Number of days to retain automated RDS backups (0 to disable)"
default = 7
}
variable "postgres_backup_window" {
type = string
description = "Preferred UTC time window for automated RDS backups (hh24:mi-hh24:mi)"
default = "03:00-04:00"
}
# EKS Control Plane Logging
variable "eks_cluster_enabled_log_types" {
type = list(string)
description = "EKS control plane log types to enable (valid: api, audit, authenticator, controllerManager, scheduler)"
default = ["api", "audit", "authenticator", "controllerManager", "scheduler"]
}
variable "eks_cloudwatch_log_group_retention_in_days" {
type = number
description = "Number of days to retain EKS control plane logs in CloudWatch (0 = never expire)"
default = 30
validation {
condition = contains([0, 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557, 2922, 3288, 3653], var.eks_cloudwatch_log_group_retention_in_days)
error_message = "Must be a valid CloudWatch retention value (0, 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557, 2922, 3288, 3653)."
}
}

View File

@@ -44,5 +44,56 @@ resource "aws_db_instance" "this" {
publicly_accessible = false
deletion_protection = true
storage_encrypted = true
tags = var.tags
# Automated backups
backup_retention_period = var.backup_retention_period
backup_window = var.backup_window
tags = var.tags
}
# CloudWatch alarm for CPU utilization monitoring
resource "aws_cloudwatch_metric_alarm" "cpu_utilization" {
alarm_name = "${var.identifier}-cpu-utilization"
alarm_description = "RDS CPU utilization for ${var.identifier}"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = var.cpu_alarm_evaluation_periods
metric_name = "CPUUtilization"
namespace = "AWS/RDS"
period = var.cpu_alarm_period
statistic = "Average"
threshold = var.cpu_alarm_threshold
treat_missing_data = "missing"
alarm_actions = var.alarm_actions
ok_actions = var.alarm_actions
dimensions = {
DBInstanceIdentifier = aws_db_instance.this.identifier
}
tags = var.tags
}
# CloudWatch alarm for freeable memory monitoring
resource "aws_cloudwatch_metric_alarm" "freeable_memory" {
alarm_name = "${var.identifier}-freeable-memory"
alarm_description = "RDS freeable memory for ${var.identifier}"
comparison_operator = "LessThanThreshold"
evaluation_periods = var.memory_alarm_evaluation_periods
metric_name = "FreeableMemory"
namespace = "AWS/RDS"
period = var.memory_alarm_period
statistic = "Average"
threshold = var.memory_alarm_threshold
treat_missing_data = "missing"
alarm_actions = var.alarm_actions
ok_actions = var.alarm_actions
dimensions = {
DBInstanceIdentifier = aws_db_instance.this.identifier
}
tags = var.tags
}

View File

@@ -67,3 +67,98 @@ variable "enable_rds_iam_auth" {
description = "Enable AWS IAM database authentication for this RDS instance"
default = false
}
variable "backup_retention_period" {
type = number
description = "Number of days to retain automated backups (0 to disable)"
default = 7
validation {
condition = var.backup_retention_period >= 0 && var.backup_retention_period <= 35
error_message = "backup_retention_period must be between 0 and 35 (AWS RDS limit)."
}
}
variable "backup_window" {
type = string
description = "Preferred UTC time window for automated backups (hh24:mi-hh24:mi)"
default = "03:00-04:00"
validation {
condition = can(regex("^([01]\\d|2[0-3]):[0-5]\\d-([01]\\d|2[0-3]):[0-5]\\d$", var.backup_window))
error_message = "backup_window must be in hh24:mi-hh24:mi format (e.g. \"03:00-04:00\")."
}
}
# CloudWatch CPU alarm configuration
variable "cpu_alarm_threshold" {
type = number
description = "CPU utilization percentage threshold for the CloudWatch alarm"
default = 80
validation {
condition = var.cpu_alarm_threshold >= 0 && var.cpu_alarm_threshold <= 100
error_message = "cpu_alarm_threshold must be between 0 and 100 (percentage)."
}
}
variable "cpu_alarm_evaluation_periods" {
type = number
description = "Number of consecutive periods the threshold must be breached before alarming"
default = 3
validation {
condition = var.cpu_alarm_evaluation_periods >= 1
error_message = "cpu_alarm_evaluation_periods must be at least 1."
}
}
variable "cpu_alarm_period" {
type = number
description = "Period in seconds over which the CPU metric is evaluated"
default = 300
validation {
condition = var.cpu_alarm_period >= 60 && var.cpu_alarm_period % 60 == 0
error_message = "cpu_alarm_period must be a multiple of 60 seconds and at least 60 (CloudWatch requirement)."
}
}
variable "memory_alarm_threshold" {
type = number
description = "Freeable memory threshold in bytes. Alarm fires when memory drops below this value."
default = 256000000 # 256 MB
validation {
condition = var.memory_alarm_threshold > 0
error_message = "memory_alarm_threshold must be greater than 0."
}
}
variable "memory_alarm_evaluation_periods" {
type = number
description = "Number of consecutive periods the threshold must be breached before alarming"
default = 3
validation {
condition = var.memory_alarm_evaluation_periods >= 1
error_message = "memory_alarm_evaluation_periods must be at least 1."
}
}
variable "memory_alarm_period" {
type = number
description = "Period in seconds over which the freeable memory metric is evaluated"
default = 300
validation {
condition = var.memory_alarm_period >= 60 && var.memory_alarm_period % 60 == 0
error_message = "memory_alarm_period must be a multiple of 60 seconds and at least 60 (CloudWatch requirement)."
}
}
variable "alarm_actions" {
type = list(string)
description = "List of ARNs to notify when the alarm transitions state (e.g. SNS topic ARNs)"
default = []
}

View File

@@ -73,11 +73,17 @@ ENV NEXT_PUBLIC_STRIPE_PUBLISHABLE_KEY=${NEXT_PUBLIC_STRIPE_PUBLISHABLE_KEY}
ARG NEXT_PUBLIC_RECAPTCHA_SITE_KEY
ENV NEXT_PUBLIC_RECAPTCHA_SITE_KEY=${NEXT_PUBLIC_RECAPTCHA_SITE_KEY}
ARG SENTRY_RELEASE
ENV SENTRY_RELEASE=${SENTRY_RELEASE}
# Add NODE_OPTIONS argument
ARG NODE_OPTIONS
# SENTRY_AUTH_TOKEN is injected via BuildKit secret mount so it is never written
# to any image layer, build cache, or registry manifest.
# Use NODE_OPTIONS in the build command
RUN NODE_OPTIONS="${NODE_OPTIONS}" npx next build
RUN --mount=type=secret,id=sentry_auth_token,env=SENTRY_AUTH_TOKEN \
NODE_OPTIONS="${NODE_OPTIONS}" npx next build
# Step 2. Production image, copy all the files and run next
FROM base AS runner
@@ -150,6 +156,9 @@ ENV NEXT_PUBLIC_STRIPE_PUBLISHABLE_KEY=${NEXT_PUBLIC_STRIPE_PUBLISHABLE_KEY}
ARG NEXT_PUBLIC_RECAPTCHA_SITE_KEY
ENV NEXT_PUBLIC_RECAPTCHA_SITE_KEY=${NEXT_PUBLIC_RECAPTCHA_SITE_KEY}
ARG SENTRY_RELEASE
ENV SENTRY_RELEASE=${SENTRY_RELEASE}
# Default ONYX_VERSION, typically overriden during builds by GitHub Actions.
ARG ONYX_VERSION=0.0.0-dev
ENV ONYX_VERSION=${ONYX_VERSION}

View File

@@ -8,6 +8,7 @@ import * as Sentry from "@sentry/nextjs";
if (process.env.NEXT_PUBLIC_SENTRY_DSN) {
Sentry.init({
dsn: process.env.NEXT_PUBLIC_SENTRY_DSN,
release: process.env.SENTRY_RELEASE,
// Only capture unhandled exceptions
tracesSampleRate: 0,
debug: false,

View File

@@ -7,6 +7,7 @@ import * as Sentry from "@sentry/nextjs";
if (process.env.NEXT_PUBLIC_SENTRY_DSN) {
Sentry.init({
dsn: process.env.NEXT_PUBLIC_SENTRY_DSN,
release: process.env.SENTRY_RELEASE,
// Setting this option to true will print useful information to the console while you're setting up Sentry.
debug: false,

View File

@@ -2,7 +2,7 @@
import { useState, useMemo, useEffect } from "react";
import useSWR from "swr";
import { Select } from "@/refresh-components/cards";
import ProviderCard from "@/sections/cards/ProviderCard";
import { useCreateModal } from "@/refresh-components/contexts/ModalContext";
import { toast } from "@/hooks/useToast";
import { Section } from "@/layouts/general-layouts";
@@ -228,11 +228,11 @@ export default function ImageGenerationContent() {
</Text>
<div className="flex flex-col gap-2">
{group.providers.map((provider) => (
<Select
<ProviderCard
key={provider.image_provider_id}
aria-label={`image-gen-provider-${provider.image_provider_id}`}
icon={() => (
<ProviderIcon provider={provider.provider_name} size={18} />
<ProviderIcon provider={provider.provider_name} size={16} />
)}
title={provider.title}
description={provider.description}

View File

@@ -4,7 +4,7 @@ import Image from "next/image";
import { useEffect, useMemo, useState, useReducer } from "react";
import { InfoIcon } from "@/components/icons/icons";
import Text from "@/refresh-components/texts/Text";
import { Select } from "@/refresh-components/cards";
import ProviderCard from "@/sections/cards/ProviderCard";
import { Section } from "@/layouts/general-layouts";
import * as SettingsLayouts from "@/layouts/settings-layouts";
import { Content } from "@opal/layouts";
@@ -1091,7 +1091,7 @@ export default function Page() {
: "connected";
return (
<Select
<ProviderCard
key={`${key}-${providerType}`}
icon={() =>
logoSrc ? (
@@ -1207,7 +1207,7 @@ export default function Page() {
CONTENT_PROVIDER_DETAILS[provider.provider_type]?.logoSrc;
return (
<Select
<ProviderCard
key={`${provider.provider_type}-${provider.id}`}
icon={() =>
contentLogoSrc ? (

View File

@@ -182,8 +182,7 @@ export async function* sendMessage({
});
if (!response.ok) {
const data = await response.json().catch(() => ({}));
throw new Error(data.detail ?? `HTTP error! status: ${response.status}`);
throw new Error(`HTTP error! status: ${response.status}`);
}
yield* handleSSEStream<PacketType>(response, signal);

View File

@@ -11,6 +11,7 @@ import Text from "@/refresh-components/texts/Text";
import InputSelect from "@/refresh-components/inputs/InputSelect";
import { ThreeDotsLoader } from "@/components/Loading";
import { ChatSessionMinimal } from "@/app/ee/admin/performance/usage/types";
import { Section } from "@/layouts/general-layouts";
import { timestampToReadableDate } from "@/lib/dateUtils";
import { Dispatch, SetStateAction, useCallback, useState } from "react";
import { Feedback, TaskStatus } from "@/lib/types";
@@ -101,34 +102,32 @@ function SelectFeedbackType({
onValueChange: (value: Feedback | "all") => void;
}) {
return (
<div>
<Text as="p" className="my-auto mr-2 font-medium mb-1">
<Section alignItems="start" gap={0.25}>
<Text as="p" className="font-medium">
Feedback Type
</Text>
<div className="max-w-sm space-y-6">
<InputSelect
value={value}
onValueChange={onValueChange as (value: string) => void}
>
<InputSelect.Trigger />
<InputSelect
value={value}
onValueChange={onValueChange as (value: string) => void}
>
<InputSelect.Trigger />
<InputSelect.Content>
<InputSelect.Item value="all" icon={SvgMinusCircle}>
Any
</InputSelect.Item>
<InputSelect.Item value="like" icon={SvgThumbsUp}>
Like
</InputSelect.Item>
<InputSelect.Item value="dislike" icon={SvgThumbsDown}>
Dislike
</InputSelect.Item>
<InputSelect.Item value="mixed" icon={SvgMinus}>
Mixed
</InputSelect.Item>
</InputSelect.Content>
</InputSelect>
</div>
</div>
<InputSelect.Content>
<InputSelect.Item value="all" icon={SvgMinusCircle}>
Any
</InputSelect.Item>
<InputSelect.Item value="like" icon={SvgThumbsUp}>
Like
</InputSelect.Item>
<InputSelect.Item value="dislike" icon={SvgThumbsDown}>
Dislike
</InputSelect.Item>
<InputSelect.Item value="mixed" icon={SvgMinus}>
Mixed
</InputSelect.Item>
</InputSelect.Content>
</InputSelect>
</Section>
);
}
@@ -185,60 +184,61 @@ function PreviousQueryHistoryExportsModal({
onClose={() => setShowModal(false)}
/>
<Modal.Body>
<div className="flex flex-col w-full">
<div className="flex flex-1">
<Table>
<TableHeader>
<TableRow>
<TableHead>Generated At</TableHead>
<TableHead>Start Range</TableHead>
<TableHead>End Range</TableHead>
<TableHead>Status</TableHead>
<TableHead>Download</TableHead>
</TableRow>
</TableHeader>
<TableBody>
{paginatedTasks.map((task, index) => (
<TableRow key={index}>
<TableCell>
{humanReadableFormatWithTime(task.startTime)}
</TableCell>
<TableCell>{task.start.toDateString()}</TableCell>
<TableCell>{task.end.toDateString()}</TableCell>
<TableCell>
<ExportBadge status={task.status} />
</TableCell>
<TableCell>
{task.status === "SUCCESS" ? (
<a
className="flex justify-center"
href={withRequestId(
<Table>
<TableHeader>
<TableRow>
<TableHead>Generated At</TableHead>
<TableHead>Start Range</TableHead>
<TableHead>End Range</TableHead>
<TableHead>Status</TableHead>
<TableHead>Download</TableHead>
</TableRow>
</TableHeader>
<TableBody>
{paginatedTasks.map((task, index) => (
<TableRow key={index}>
<TableCell>
{humanReadableFormatWithTime(task.startTime)}
</TableCell>
<TableCell>{task.start.toDateString()}</TableCell>
<TableCell>{task.end.toDateString()}</TableCell>
<TableCell>
<ExportBadge status={task.status} />
</TableCell>
<TableCell>
<Button
variant="default"
prominence="tertiary"
icon={SvgDownloadCloud}
size="sm"
disabled={task.status !== "SUCCESS"}
tooltip={
task.status !== "SUCCESS"
? "Export is not yet ready"
: undefined
}
href={
task.status === "SUCCESS"
? withRequestId(
DOWNLOAD_QUERY_HISTORY_URL,
task.taskId
)}
>
<SvgDownloadCloud className="h-4 w-4 text-action-link-05" />
</a>
) : (
<SvgDownloadCloud className="h-4 w-4 text-action-link-05 opacity-20" />
)}
</TableCell>
</TableRow>
))}
</TableBody>
</Table>
</div>
)
: undefined
}
/>
</TableCell>
</TableRow>
))}
</TableBody>
</Table>
<div className="flex mt-3">
<div className="mx-auto">
<PageSelector
currentPage={taskPage}
totalPages={totalTaskPages}
onPageChange={setTaskPage}
/>
</div>
</div>
</div>
<Section>
<PageSelector
currentPage={taskPage}
totalPages={totalTaskPages}
onPageChange={setTaskPage}
/>
</Section>
</Modal.Body>
</Modal.Content>
</Modal>
@@ -330,48 +330,48 @@ export function QueryHistoryTable() {
</div>
</div>
<Separator />
<Table className="mt-5">
<TableHeader>
<TableRow>
<TableHead>First User Message</TableHead>
<TableHead>First AI Response</TableHead>
<TableHead>Feedback</TableHead>
<TableHead>User</TableHead>
<TableHead>Persona</TableHead>
<TableHead>Date</TableHead>
</TableRow>
</TableHeader>
{isLoading ? (
<TableBody>
<Section>
<Table className="mt-5">
<TableHeader>
<TableRow>
<TableCell colSpan={6} className="text-center">
<ThreeDotsLoader />
</TableCell>
<TableHead>First User Message</TableHead>
<TableHead>First AI Response</TableHead>
<TableHead>Feedback</TableHead>
<TableHead>User</TableHead>
<TableHead>Persona</TableHead>
<TableHead>Date</TableHead>
</TableRow>
</TableBody>
) : (
<TableBody>
{chatSessionData?.map((chatSessionMinimal) => (
<QueryHistoryTableRow
key={chatSessionMinimal.id}
chatSessionMinimal={chatSessionMinimal}
/>
))}
</TableBody>
)}
</Table>
</TableHeader>
{isLoading ? (
<TableBody>
<TableRow>
<TableCell colSpan={6} className="text-center">
<ThreeDotsLoader />
</TableCell>
</TableRow>
</TableBody>
) : (
<TableBody>
{chatSessionData?.map((chatSessionMinimal) => (
<QueryHistoryTableRow
key={chatSessionMinimal.id}
chatSessionMinimal={chatSessionMinimal}
/>
))}
</TableBody>
)}
</Table>
{chatSessionData && (
<div className="mt-3 flex">
<div className="mx-auto">
{chatSessionData && (
<Section>
<PageSelector
totalPages={totalPages}
currentPage={currentPage}
onPageChange={goToPage}
/>
</div>
</div>
)}
</Section>
)}
</Section>
</CardSection>
{showModal && (

View File

@@ -901,11 +901,6 @@ export default function useChatController({
});
}
}
// Surface FIFO errors (e.g. 429 before any packets arrive) so the
// catch block replaces the thinking placeholder with an error message.
if (stack.error) {
throw new Error(stack.error);
}
} catch (e: any) {
console.log("Error:", e);
const errorMsg = e.message;

View File

@@ -7,6 +7,7 @@ import * as Sentry from "@sentry/nextjs";
if (process.env.NEXT_PUBLIC_SENTRY_DSN) {
Sentry.init({
dsn: process.env.NEXT_PUBLIC_SENTRY_DSN,
release: process.env.SENTRY_RELEASE,
// Setting this option to true will print useful information to the console while you're setting up Sentry.
debug: false,

View File

@@ -292,10 +292,9 @@ export interface IconButtonProps
secondary?: boolean;
tertiary?: boolean;
internal?: boolean;
small?: boolean;
// Button size
large?: boolean;
small?: boolean;
// Button states
transient?: boolean;

View File

@@ -1,129 +0,0 @@
import React from "react";
import type { Meta, StoryObj } from "@storybook/react";
import Select from "./Select";
import { SvgSettings, SvgFolder, SvgSearch } from "@opal/icons";
import * as TooltipPrimitive from "@radix-ui/react-tooltip";
const meta: Meta<typeof Select> = {
title: "refresh-components/cards/Select",
component: Select,
tags: ["autodocs"],
decorators: [
(Story) => (
<TooltipPrimitive.Provider>
<div style={{ maxWidth: 500 }}>
<Story />
</div>
</TooltipPrimitive.Provider>
),
],
};
export default meta;
type Story = StoryObj<typeof Select>;
export const Disconnected: Story = {
args: {
icon: SvgFolder,
title: "Google Drive",
description: "Connect to sync your files",
status: "disconnected",
onConnect: () => {},
},
};
export const Connected: Story = {
args: {
icon: SvgFolder,
title: "Google Drive",
description: "Connected and syncing",
status: "connected",
onSelect: () => {},
onEdit: () => {},
},
};
export const Selected: Story = {
args: {
icon: SvgFolder,
title: "Google Drive",
description: "Currently the default source",
status: "selected",
onDeselect: () => {},
onEdit: () => {},
},
};
export const DisabledState: Story = {
args: {
icon: SvgFolder,
title: "Google Drive",
description: "Not available on this plan",
status: "disconnected",
disabled: true,
onConnect: () => {},
},
};
export const MediumSize: Story = {
args: {
icon: SvgSearch,
title: "Elastic Search",
description: "Search engine connector",
status: "connected",
medium: true,
onSelect: () => {},
},
};
export const CustomLabels: Story = {
args: {
icon: SvgSettings,
title: "Custom LLM",
description: "Your custom model endpoint",
status: "connected",
connectLabel: "Link",
selectLabel: "Make Primary",
selectedLabel: "Primary Model",
onSelect: () => {},
onEdit: () => {},
},
};
export const AllStates: Story = {
render: () => (
<div style={{ display: "flex", flexDirection: "column", gap: 12 }}>
<Select
icon={SvgFolder}
title="Google Drive"
description="Connect to sync your files"
status="disconnected"
onConnect={() => {}}
/>
<Select
icon={SvgSearch}
title="Confluence"
description="Connected and syncing"
status="connected"
onSelect={() => {}}
onEdit={() => {}}
/>
<Select
icon={SvgSettings}
title="Notion"
description="Currently the default source"
status="selected"
onDeselect={() => {}}
onEdit={() => {}}
/>
<Select
icon={SvgFolder}
title="Sharepoint"
description="Not available"
status="disconnected"
disabled
onConnect={() => {}}
/>
</div>
),
};

View File

@@ -1,229 +0,0 @@
"use client";
import React, { useState } from "react";
import type { IconProps } from "@opal/types";
import { cn, noProp } from "@/lib/utils";
import { Disabled } from "@opal/core";
import Text from "@/refresh-components/texts/Text";
import { Button } from "@opal/components";
import SelectButton from "@/refresh-components/buttons/SelectButton";
import {
SvgArrowExchange,
SvgArrowRightCircle,
SvgCheckSquare,
SvgSettings,
SvgUnplug,
} from "@opal/icons";
const containerClasses = {
selected: "border-action-link-05 bg-action-link-01",
connected: "border-border-01 bg-background-tint-00 hover:shadow-00",
disconnected: "border-border-01 bg-background-neutral-01 hover:shadow-00",
} as const;
export interface SelectProps
extends Omit<React.ComponentPropsWithoutRef<"div">, "title"> {
// Content
icon: React.FunctionComponent<IconProps>;
title: string;
description: string;
// State
status: "disconnected" | "connected" | "selected";
// Actions
onConnect?: () => void;
onSelect?: () => void;
onDeselect?: () => void;
onEdit?: () => void;
onDisconnect?: () => void;
// Labels (customizable)
connectLabel?: string;
selectLabel?: string;
selectedLabel?: string;
// Size
large?: boolean;
medium?: boolean;
// Optional
className?: string;
disabled?: boolean;
}
export default function Select({
icon: Icon,
title,
description,
status,
onConnect,
onSelect,
onDeselect,
onEdit,
onDisconnect,
connectLabel = "Connect",
selectLabel = "Set as Default",
selectedLabel = "Current Default",
large = true,
medium,
className,
disabled,
...rest
}: SelectProps) {
const sizeClass = medium ? "h-[3.75rem]" : "min-h-[3.75rem] max-h-[5.25rem]";
const containerClass = containerClasses[status];
const [isHovered, setIsHovered] = useState(false);
const isSelected = status === "selected";
const isConnected = status === "connected";
const isDisconnected = status === "disconnected";
const isCardClickable = isDisconnected && onConnect && !disabled;
const handleCardClick = () => {
if (isCardClickable) {
onConnect?.();
}
};
return (
<Disabled disabled={disabled} allowClick>
<div
{...rest}
onMouseEnter={() => setIsHovered(true)}
onMouseLeave={() => setIsHovered(false)}
onClick={isCardClickable ? handleCardClick : undefined}
className={cn(
"flex items-start justify-between gap-3 rounded-16 border p-2 min-w-[17.5rem]",
sizeClass,
containerClass,
isCardClickable &&
"cursor-pointer hover:bg-background-tint-01 transition-colors",
className
)}
>
{/* Left section - Icon, Title, Description */}
<div className="flex flex-1 items-start gap-1 p-1">
<div className="flex size-5 items-center justify-center px-0.5 shrink-0">
<Icon
className={cn(
"size-4",
isSelected ? "text-action-text-link-05" : "text-text-02"
)}
/>
</div>
<div className="flex flex-col gap-0.5">
<Text mainUiAction text05>
{title}
</Text>
<Text secondaryBody text03>
{description}
</Text>
</div>
</div>
{/* Right section - Actions */}
<div className="flex flex-col h-full items-end justify-between gap-1">
{/* Disconnected: Show Connect button */}
{isDisconnected && (
<Disabled disabled={disabled || !onConnect}>
<Button
prominence="tertiary"
onClick={noProp(onConnect)}
rightIcon={SvgArrowExchange}
>
{connectLabel}
</Button>
</Disabled>
)}
{/* Connected: Show select icon + settings icon */}
{isConnected && (
<>
<Disabled disabled={disabled || !onSelect}>
<SelectButton
action
folded
transient={isHovered}
onClick={onSelect}
rightIcon={SvgArrowRightCircle}
>
{selectLabel}
</SelectButton>
</Disabled>
<div className="flex px-1 gap-1">
{onDisconnect && (
<Disabled disabled={disabled}>
<Button
icon={SvgUnplug}
tooltip="Disconnect"
prominence="tertiary"
size="sm"
onClick={noProp(onDisconnect)}
aria-label={`Disconnect ${title}`}
/>
</Disabled>
)}
{onEdit && (
<Disabled disabled={disabled}>
<Button
icon={SvgSettings}
tooltip="Edit"
prominence="tertiary"
size="sm"
onClick={noProp(onEdit)}
aria-label={`Edit ${title}`}
/>
</Disabled>
)}
</div>
</>
)}
{/* Selected: Show "Current Default" label + settings icon */}
{isSelected && (
<>
<Disabled disabled={disabled}>
<SelectButton
action
engaged
onClick={onDeselect}
leftIcon={SvgCheckSquare}
>
{selectedLabel}
</SelectButton>
</Disabled>
<div className="flex px-1 gap-1">
{onDisconnect && (
<Disabled disabled={disabled}>
<Button
icon={SvgUnplug}
tooltip="Disconnect"
prominence="tertiary"
size="sm"
onClick={noProp(onDisconnect)}
aria-label={`Disconnect ${title}`}
/>
</Disabled>
)}
{onEdit && (
<Disabled disabled={disabled}>
<Button
icon={SvgSettings}
tooltip="Edit"
prominence="tertiary"
size="sm"
onClick={noProp(onEdit)}
aria-label={`Edit ${title}`}
/>
</Disabled>
)}
</div>
</>
)}
</div>
</div>
</Disabled>
);
}

View File

@@ -1,4 +1,2 @@
export { default as Card } from "./Card";
export type { CardProps } from "./Card";
export { default as Select } from "./Select";
export type { SelectProps } from "./Select";

View File

@@ -7,7 +7,7 @@ import {
IconProps,
OpenAIIcon,
} from "@/components/icons/icons";
import { Select } from "@/refresh-components/cards";
import ProviderCard from "@/sections/cards/ProviderCard";
import Message from "@/refresh-components/messages/Message";
import * as SettingsLayouts from "@/layouts/settings-layouts";
import { FetchError } from "@/lib/fetcher";
@@ -488,7 +488,7 @@ export default function VoiceConfigurationPage() {
const Icon = getProviderIcon(model.providerType);
return (
<Select
<ProviderCard
key={`${mode}-${model.id}`}
aria-label={`voice-${mode}-${model.id}`}
icon={Icon}

View File

@@ -0,0 +1,138 @@
"use client";
import type { IconFunctionComponent } from "@opal/types";
import { Button, SelectCard } from "@opal/components";
import { Content, CardHeaderLayout } from "@opal/layouts";
import {
SvgArrowExchange,
SvgArrowRightCircle,
SvgCheckSquare,
SvgSettings,
SvgUnplug,
} from "@opal/icons";
type ProviderStatus = "disconnected" | "connected" | "selected";
interface ProviderCardProps {
icon: IconFunctionComponent;
title: string;
description: string;
status: ProviderStatus;
onConnect?: () => void;
onSelect?: () => void;
onDeselect?: () => void;
onEdit?: () => void;
onDisconnect?: () => void;
selectedLabel?: string;
"aria-label"?: string;
}
const STATUS_TO_STATE = {
disconnected: "empty",
connected: "filled",
selected: "selected",
} as const;
export default function ProviderCard({
icon,
title,
description,
status,
onConnect,
onSelect,
onDeselect,
onEdit,
onDisconnect,
selectedLabel = "Current Default",
"aria-label": ariaLabel,
}: ProviderCardProps) {
const isDisconnected = status === "disconnected";
const isConnected = status === "connected";
const isSelected = status === "selected";
return (
<SelectCard
variant="select-card"
state={STATUS_TO_STATE[status]}
sizeVariant="lg"
aria-label={ariaLabel}
onClick={isDisconnected && onConnect ? onConnect : undefined}
>
<CardHeaderLayout
sizePreset="main-ui"
variant="section"
icon={icon}
title={title}
description={description}
rightChildren={
isDisconnected && onConnect ? (
<Button
prominence="tertiary"
rightIcon={SvgArrowExchange}
onClick={(e) => {
e.stopPropagation();
onConnect();
}}
>
Connect
</Button>
) : isConnected && onSelect ? (
<Button
prominence="tertiary"
rightIcon={SvgArrowRightCircle}
onClick={(e) => {
e.stopPropagation();
onSelect();
}}
>
Set as Default
</Button>
) : isSelected ? (
<div className="p-2">
<Content
title={selectedLabel}
sizePreset="main-ui"
variant="section"
icon={SvgCheckSquare}
/>
</div>
) : undefined
}
bottomRightChildren={
!isDisconnected ? (
<div className="flex flex-row px-1 pb-1">
{onDisconnect && (
<Button
icon={SvgUnplug}
tooltip="Disconnect"
aria-label={`Disconnect ${title}`}
prominence="tertiary"
onClick={(e) => {
e.stopPropagation();
onDisconnect();
}}
size="md"
/>
)}
{onEdit && (
<Button
icon={SvgSettings}
tooltip="Edit"
aria-label={`Edit ${title}`}
prominence="tertiary"
onClick={(e) => {
e.stopPropagation();
onEdit();
}}
size="md"
/>
)}
</div>
) : undefined
}
/>
</SelectCard>
);
}
export type { ProviderCardProps, ProviderStatus };