Compare commits

...

3 Commits

10 changed files with 270 additions and 22 deletions

View File

@@ -265,6 +265,14 @@ class RateLimitResponse(BaseModel):
reset_timestamp: str | None = None
# ===== Pre-Provisioned Session Check Models =====
class PreProvisionedCheckResponse(BaseModel):
"""Response for checking if a pre-provisioned session is still valid (empty)."""
valid: bool # True if session exists and has no messages
session_id: str | None = None # Session ID if valid, None otherwise
# ===== Build Connector Models =====
class BuildConnectorStatus(str, Enum):
"""Status of a build connector."""

View File

@@ -8,11 +8,13 @@ from fastapi import File
from fastapi import HTTPException
from fastapi import Response
from fastapi import UploadFile
from sqlalchemy import exists
from sqlalchemy.orm import Session
from onyx.auth.users import current_user
from onyx.db.engine.sql_engine import get_session
from onyx.db.enums import SandboxStatus
from onyx.db.models import BuildMessage
from onyx.db.models import User
from onyx.redis.redis_pool import get_redis_client
from onyx.server.features.build.api.models import ArtifactResponse
@@ -20,6 +22,7 @@ from onyx.server.features.build.api.models import DetailedSessionResponse
from onyx.server.features.build.api.models import DirectoryListing
from onyx.server.features.build.api.models import GenerateSuggestionsRequest
from onyx.server.features.build.api.models import GenerateSuggestionsResponse
from onyx.server.features.build.api.models import PreProvisionedCheckResponse
from onyx.server.features.build.api.models import SessionCreateRequest
from onyx.server.features.build.api.models import SessionListResponse
from onyx.server.features.build.api.models import SessionNameGenerateResponse
@@ -161,6 +164,41 @@ def get_session_details(
)
@router.get(
"/{session_id}/pre-provisioned-check", response_model=PreProvisionedCheckResponse
)
def check_pre_provisioned_session(
session_id: UUID,
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> PreProvisionedCheckResponse:
"""
Check if a pre-provisioned session is still valid (empty).
Used by the frontend to poll and detect when another tab has used
the session. A session is considered valid if it has no messages yet.
Returns:
- valid=True, session_id=<id> if the session is still empty
- valid=False, session_id=None if the session has messages or doesn't exist
"""
session = get_build_session(session_id, user.id, db_session)
if session is None:
return PreProvisionedCheckResponse(valid=False, session_id=None)
# Check if session is still empty (no messages = pre-provisioned)
has_messages = db_session.query(
exists().where(BuildMessage.session_id == session_id)
).scalar()
if not has_messages:
return PreProvisionedCheckResponse(valid=True, session_id=str(session_id))
# Session has messages - it's no longer a valid pre-provisioned session
return PreProvisionedCheckResponse(valid=False, session_id=None)
@router.post("/{session_id}/generate-name", response_model=SessionNameGenerateResponse)
def generate_session_name(
session_id: UUID,

View File

@@ -110,6 +110,21 @@ SANDBOX_FILE_SYNC_SERVICE_ACCOUNT = os.environ.get(
ENABLE_CRAFT = os.environ.get("ENABLE_CRAFT", "false").lower() == "true"
# ============================================================================
# SSE Streaming Configuration
# ============================================================================
# SSE keepalive interval in seconds - send keepalive comment if no events
SSE_KEEPALIVE_INTERVAL = float(os.environ.get("SSE_KEEPALIVE_INTERVAL", "15.0"))
# ============================================================================
# ACP (Agent Communication Protocol) Configuration
# ============================================================================
# Timeout for ACP message processing in seconds
# This is the maximum time to wait for a complete response from the agent
ACP_MESSAGE_TIMEOUT = float(os.environ.get("ACP_MESSAGE_TIMEOUT", "900.0"))
# ============================================================================
# Rate Limiting Configuration
# ============================================================================

View File

@@ -1,7 +1,6 @@
"""Database operations for Build Mode sessions."""
from datetime import datetime
from datetime import timedelta
from typing import Any
from uuid import UUID
@@ -81,15 +80,19 @@ def get_user_build_sessions(
db_session: Session,
limit: int = 100,
) -> list[BuildSession]:
"""Get all build sessions for a user that have at least 1 message.
"""Get all build sessions for a user that have at least one message.
Excludes empty (pre-provisioned) sessions from the listing.
"""
# Subquery to check if session has any messages
has_messages = exists().where(BuildMessage.session_id == BuildSession.id)
return (
db_session.query(BuildSession)
.join(BuildMessage) # Inner join excludes empty sessions
.filter(BuildSession.user_id == user_id)
.group_by(BuildSession.id)
.filter(
BuildSession.user_id == user_id,
has_messages, # Only sessions with messages
)
.order_by(desc(BuildSession.created_at))
.limit(limit)
.all()
@@ -99,27 +102,24 @@ def get_user_build_sessions(
def get_empty_session_for_user(
user_id: UUID,
db_session: Session,
max_age_minutes: int = 30,
demo_data_enabled: bool | None = None,
) -> BuildSession | None:
"""Get the user's empty session (0 messages) if one exists and is recent.
"""Get an empty (pre-provisioned) session for the user if one exists.
Returns a session with no messages, or None if all sessions have messages.
Args:
user_id: The user ID
db_session: Database session
max_age_minutes: Maximum age of session to consider (default 30)
demo_data_enabled: Match sessions with this demo_data setting.
This ensures pre-provisioned sessions match the user's current
preferences. If None, matches any session regardless of setting.
Note: None is only used internally for operations that need to
match any session (e.g., deletion).
If None, matches any session regardless of setting.
"""
cutoff = datetime.utcnow() - timedelta(minutes=max_age_minutes)
# Subquery to check if session has any messages
has_messages = exists().where(BuildMessage.session_id == BuildSession.id)
query = db_session.query(BuildSession).filter(
BuildSession.user_id == user_id,
BuildSession.created_at > cutoff,
~exists().where(BuildMessage.session_id == BuildSession.id),
~has_messages, # Sessions with no messages only
)
if demo_data_enabled is not None:

View File

@@ -43,6 +43,8 @@ from kubernetes.stream.ws_client import WSClient # type: ignore
from pydantic import ValidationError
from onyx.server.features.build.api.packet_logger import get_packet_logger
from onyx.server.features.build.configs import ACP_MESSAGE_TIMEOUT
from onyx.server.features.build.configs import SSE_KEEPALIVE_INTERVAL
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -57,6 +59,20 @@ DEFAULT_CLIENT_INFO = {
"version": "1.0.0",
}
@dataclass
class SSEKeepalive:
"""Marker event to signal that an SSE keepalive should be sent.
This is yielded when no ACP events have been received for SSE_KEEPALIVE_INTERVAL
seconds, allowing the SSE stream to send a comment to keep the connection alive.
Note: This is an internal event type - it's consumed by session/manager.py and
converted to an SSE comment before leaving that layer. It should not be exposed
to external consumers.
"""
# Union type for all possible events from send_message
ACPEvent = (
AgentMessageChunk
@@ -67,6 +83,7 @@ ACPEvent = (
| CurrentModeUpdate
| PromptResponse
| Error
| SSEKeepalive
)
@@ -171,6 +188,7 @@ class ACPExecClient:
stderr=True,
tty=False,
_preload_content=False,
_request_timeout=900, # 15 minute timeout for long-running sessions
)
# Start reader thread
@@ -389,13 +407,13 @@ class ACPExecClient:
def send_message(
self,
message: str,
timeout: float = 300.0,
timeout: float = ACP_MESSAGE_TIMEOUT,
) -> Generator[ACPEvent, None, None]:
"""Send a message and stream response events.
Args:
message: The message content to send
timeout: Maximum time to wait for complete response
timeout: Maximum time to wait for complete response (defaults to ACP_MESSAGE_TIMEOUT env var)
Yields:
Typed ACP schema event objects
@@ -428,6 +446,7 @@ class ACPExecClient:
request_id = self._send_request("session/prompt", params)
start_time = time.time()
last_event_time = time.time() # Track time since last event for keepalive
events_yielded = 0
while True:
@@ -445,7 +464,20 @@ class ACPExecClient:
try:
message_data = self._response_queue.get(timeout=min(remaining, 1.0))
last_event_time = time.time() # Reset keepalive timer on event
except Empty:
# Check if we need to send an SSE keepalive
idle_time = time.time() - last_event_time
if idle_time >= SSE_KEEPALIVE_INTERVAL:
packet_logger.log_raw(
"SSE-KEEPALIVE-YIELD",
{
"session_id": session_id,
"idle_seconds": idle_time,
},
)
yield SSEKeepalive()
last_event_time = time.time() # Reset after yielding keepalive
continue
# Check for response to our prompt request
@@ -538,6 +570,8 @@ class ACPExecClient:
return "prompt_response"
elif isinstance(event, Error):
return "error"
elif isinstance(event, SSEKeepalive):
return "sse_keepalive"
return "unknown"
def _process_session_update(

View File

@@ -1499,10 +1499,33 @@ echo '{tar_b64}' | base64 -d | tar -xzf -
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
)
except Exception as e:
# Log failure
except GeneratorExit:
# Generator was closed by consumer (client disconnect, timeout, broken pipe)
# This is the most common failure mode for SSE streaming
packet_logger.log_session_end(
session_id, success=False, error=str(e), events_count=events_count
session_id,
success=False,
error="GeneratorExit: Client disconnected or stream closed by consumer",
events_count=events_count,
)
raise
except Exception as e:
# Log failure from normal exceptions
packet_logger.log_session_end(
session_id,
success=False,
error=f"Exception: {str(e)}",
events_count=events_count,
)
raise
except BaseException as e:
# Log failure from other base exceptions (SystemExit, KeyboardInterrupt, etc.)
exception_type = type(e).__name__
packet_logger.log_session_end(
session_id,
success=False,
error=f"{exception_type}: {str(e) if str(e) else 'System-level interruption'}",
events_count=events_count,
)
raise
finally:

View File

@@ -70,6 +70,9 @@ from onyx.server.features.build.db.sandbox import get_sandbox_by_session_id
from onyx.server.features.build.db.sandbox import get_sandbox_by_user_id
from onyx.server.features.build.db.sandbox import update_sandbox_heartbeat
from onyx.server.features.build.sandbox import get_sandbox_manager
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
SSEKeepalive,
)
from onyx.server.features.build.sandbox.models import LLMProviderConfig
from onyx.server.features.build.session.prompts import BUILD_NAMING_SYSTEM_PROMPT
from onyx.server.features.build.session.prompts import BUILD_NAMING_USER_PROMPT
@@ -1238,6 +1241,14 @@ class SessionManager:
for acp_event in self._sandbox_manager.send_message(
sandbox_id, session_id, user_message_content
):
# Handle SSE keepalive - send comment to keep connection alive
if isinstance(acp_event, SSEKeepalive):
# SSE comments start with : and are ignored by EventSource
# but keep the HTTP connection alive
packet_logger.log_sse_emit("keepalive", session_id)
yield ": keepalive\n\n"
continue
# Check if we need to finalize pending chunks before processing
event_type = self._get_event_type(acp_event)
if state.should_finalize_chunks(event_type):
@@ -1338,6 +1349,7 @@ class SessionManager:
db_session=self._db_session,
)
# Log full event to packet logger (can handle large payloads)
packet_logger.log("tool_call_progress", event_data)
packet_logger.log_sse_emit("tool_call_progress", session_id)
yield _serialize_acp_event(acp_event, "tool_call_progress")

View File

@@ -3,6 +3,7 @@
import { useEffect, useRef, useCallback } from "react";
import { useRouter } from "next/navigation";
import { useBuildSessionStore } from "@/app/craft/hooks/useBuildSessionStore";
import { usePreProvisionPolling } from "@/app/craft/hooks/usePreProvisionPolling";
import { CRAFT_SEARCH_PARAM_NAMES } from "@/app/craft/services/searchParams";
import { CRAFT_PATH } from "@/app/craft/v1/constants";
import { getBuildUserPersona } from "@/app/craft/onboarding/constants";
@@ -102,10 +103,12 @@ export function useBuildSessionController({
setCurrentSession(null);
}
// Reset trigger state when transitioning FROM a session TO new build
// This ensures we can re-provision when returning to the new build page
// Reset state when transitioning FROM a session TO new build
// This ensures we fetch fresh pre-provisioned status from backend
if (prevExistingSessionId !== null) {
setControllerTriggered(null);
// Clear pre-provisioned state to force a fresh check from backend
useBuildSessionStore.setState({ preProvisioning: { status: "idle" } });
}
// Trigger pre-provisioning if conditions are met
@@ -281,6 +284,10 @@ export function useBuildSessionController({
router.push(CRAFT_PATH);
}, [router]);
// Poll to verify pre-provisioned session is still valid (multi-tab support)
// Only poll on welcome page (existingSessionId === null) - no point polling on session pages
usePreProvisionPolling({ enabled: existingSessionId === null });
return {
currentSessionId,
isLoading,

View File

@@ -0,0 +1,89 @@
"use client";
import { useEffect, useRef } from "react";
import { useBuildSessionStore } from "./useBuildSessionStore";
import { checkPreProvisionedSession } from "../services/apiServices";
/** Polling interval in milliseconds (5 seconds) */
const POLLING_INTERVAL_MS = 5000;
interface UsePreProvisionPollingOptions {
/** Only poll when enabled (should be true only on welcome page) */
enabled: boolean;
}
/**
* Hook that polls to verify the pre-provisioned session is still valid.
*
* When multiple browser tabs have the same pre-provisioned session,
* one tab may claim it by sending a message. This hook detects when
* that happens and triggers re-provisioning so the current tab gets
* a fresh session.
*
* Usage: Call this hook on the welcome page where pre-provisioned
* sessions are used. Pass enabled=true only on the welcome page.
*/
export function usePreProvisionPolling({
enabled,
}: UsePreProvisionPollingOptions) {
const preProvisioning = useBuildSessionStore(
(state) => state.preProvisioning
);
const ensurePreProvisionedSession = useBuildSessionStore(
(state) => state.ensurePreProvisionedSession
);
// Extract sessionId only when status is "ready" (handles discriminated union)
const sessionId =
preProvisioning.status === "ready" ? preProvisioning.sessionId : null;
// Use ref to track if we're currently checking (prevents overlapping requests)
const isCheckingRef = useRef(false);
useEffect(() => {
// Only poll when enabled (welcome page) and we have a ready session
if (!enabled || !sessionId) {
return;
}
const checkValidity = async () => {
if (isCheckingRef.current) return;
isCheckingRef.current = true;
try {
const result = await checkPreProvisionedSession(sessionId);
if (!result.valid) {
console.log(
`[PreProvisionPolling] Session ${sessionId.slice(
0,
8
)} was used, re-provisioning...`
);
// Session was used by another tab - reset state and re-provision.
// Zustand setState is synchronous, so ensurePreProvisionedSession
// will immediately see the idle status (no setTimeout needed).
useBuildSessionStore.setState({
preProvisioning: { status: "idle" },
});
ensurePreProvisionedSession();
}
} catch (err) {
console.error("[PreProvisionPolling] Failed to check session:", err);
// On error, don't re-provision - might be a network issue
} finally {
isCheckingRef.current = false;
}
};
// Start polling
const intervalId = setInterval(checkValidity, POLLING_INTERVAL_MS);
// Also check immediately on mount (in case session was used while tab was inactive)
checkValidity();
return () => {
clearInterval(intervalId);
};
}, [enabled, sessionId, ensurePreProvisionedSession]);
}

View File

@@ -236,6 +236,28 @@ export async function restoreSession(
return res.json();
}
/**
* Check if a pre-provisioned session is still valid (empty).
* Used for polling to detect when another tab has used the session.
*
* @returns { valid: true, session_id: string } if session is still empty
* @returns { valid: false, session_id: null } if session has messages or doesn't exist
*/
export async function checkPreProvisionedSession(
sessionId: string
): Promise<{ valid: boolean; session_id: string | null }> {
const res = await fetch(
`${API_BASE}/sessions/${sessionId}/pre-provisioned-check`
);
if (!res.ok) {
// Treat errors as invalid session
return { valid: false, session_id: null };
}
return res.json();
}
// =============================================================================
// Messages API
// =============================================================================