mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-02-25 11:45:47 +00:00
Compare commits
3 Commits
ci_python_
...
v2.11.0-cl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f97549936 | ||
|
|
5bb60ccc71 | ||
|
|
6740418808 |
@@ -265,6 +265,14 @@ class RateLimitResponse(BaseModel):
|
||||
reset_timestamp: str | None = None
|
||||
|
||||
|
||||
# ===== Pre-Provisioned Session Check Models =====
|
||||
class PreProvisionedCheckResponse(BaseModel):
|
||||
"""Response for checking if a pre-provisioned session is still valid (empty)."""
|
||||
|
||||
valid: bool # True if session exists and has no messages
|
||||
session_id: str | None = None # Session ID if valid, None otherwise
|
||||
|
||||
|
||||
# ===== Build Connector Models =====
|
||||
class BuildConnectorStatus(str, Enum):
|
||||
"""Status of a build connector."""
|
||||
|
||||
@@ -8,11 +8,13 @@ from fastapi import File
|
||||
from fastapi import HTTPException
|
||||
from fastapi import Response
|
||||
from fastapi import UploadFile
|
||||
from sqlalchemy import exists
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.auth.users import current_user
|
||||
from onyx.db.engine.sql_engine import get_session
|
||||
from onyx.db.enums import SandboxStatus
|
||||
from onyx.db.models import BuildMessage
|
||||
from onyx.db.models import User
|
||||
from onyx.redis.redis_pool import get_redis_client
|
||||
from onyx.server.features.build.api.models import ArtifactResponse
|
||||
@@ -20,6 +22,7 @@ from onyx.server.features.build.api.models import DetailedSessionResponse
|
||||
from onyx.server.features.build.api.models import DirectoryListing
|
||||
from onyx.server.features.build.api.models import GenerateSuggestionsRequest
|
||||
from onyx.server.features.build.api.models import GenerateSuggestionsResponse
|
||||
from onyx.server.features.build.api.models import PreProvisionedCheckResponse
|
||||
from onyx.server.features.build.api.models import SessionCreateRequest
|
||||
from onyx.server.features.build.api.models import SessionListResponse
|
||||
from onyx.server.features.build.api.models import SessionNameGenerateResponse
|
||||
@@ -161,6 +164,41 @@ def get_session_details(
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/{session_id}/pre-provisioned-check", response_model=PreProvisionedCheckResponse
|
||||
)
|
||||
def check_pre_provisioned_session(
|
||||
session_id: UUID,
|
||||
user: User = Depends(current_user),
|
||||
db_session: Session = Depends(get_session),
|
||||
) -> PreProvisionedCheckResponse:
|
||||
"""
|
||||
Check if a pre-provisioned session is still valid (empty).
|
||||
|
||||
Used by the frontend to poll and detect when another tab has used
|
||||
the session. A session is considered valid if it has no messages yet.
|
||||
|
||||
Returns:
|
||||
- valid=True, session_id=<id> if the session is still empty
|
||||
- valid=False, session_id=None if the session has messages or doesn't exist
|
||||
"""
|
||||
session = get_build_session(session_id, user.id, db_session)
|
||||
|
||||
if session is None:
|
||||
return PreProvisionedCheckResponse(valid=False, session_id=None)
|
||||
|
||||
# Check if session is still empty (no messages = pre-provisioned)
|
||||
has_messages = db_session.query(
|
||||
exists().where(BuildMessage.session_id == session_id)
|
||||
).scalar()
|
||||
|
||||
if not has_messages:
|
||||
return PreProvisionedCheckResponse(valid=True, session_id=str(session_id))
|
||||
|
||||
# Session has messages - it's no longer a valid pre-provisioned session
|
||||
return PreProvisionedCheckResponse(valid=False, session_id=None)
|
||||
|
||||
|
||||
@router.post("/{session_id}/generate-name", response_model=SessionNameGenerateResponse)
|
||||
def generate_session_name(
|
||||
session_id: UUID,
|
||||
|
||||
@@ -110,6 +110,21 @@ SANDBOX_FILE_SYNC_SERVICE_ACCOUNT = os.environ.get(
|
||||
|
||||
ENABLE_CRAFT = os.environ.get("ENABLE_CRAFT", "false").lower() == "true"
|
||||
|
||||
# ============================================================================
|
||||
# SSE Streaming Configuration
|
||||
# ============================================================================
|
||||
|
||||
# SSE keepalive interval in seconds - send keepalive comment if no events
|
||||
SSE_KEEPALIVE_INTERVAL = float(os.environ.get("SSE_KEEPALIVE_INTERVAL", "15.0"))
|
||||
|
||||
# ============================================================================
|
||||
# ACP (Agent Communication Protocol) Configuration
|
||||
# ============================================================================
|
||||
|
||||
# Timeout for ACP message processing in seconds
|
||||
# This is the maximum time to wait for a complete response from the agent
|
||||
ACP_MESSAGE_TIMEOUT = float(os.environ.get("ACP_MESSAGE_TIMEOUT", "900.0"))
|
||||
|
||||
# ============================================================================
|
||||
# Rate Limiting Configuration
|
||||
# ============================================================================
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
"""Database operations for Build Mode sessions."""
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
@@ -81,15 +80,19 @@ def get_user_build_sessions(
|
||||
db_session: Session,
|
||||
limit: int = 100,
|
||||
) -> list[BuildSession]:
|
||||
"""Get all build sessions for a user that have at least 1 message.
|
||||
"""Get all build sessions for a user that have at least one message.
|
||||
|
||||
Excludes empty (pre-provisioned) sessions from the listing.
|
||||
"""
|
||||
# Subquery to check if session has any messages
|
||||
has_messages = exists().where(BuildMessage.session_id == BuildSession.id)
|
||||
|
||||
return (
|
||||
db_session.query(BuildSession)
|
||||
.join(BuildMessage) # Inner join excludes empty sessions
|
||||
.filter(BuildSession.user_id == user_id)
|
||||
.group_by(BuildSession.id)
|
||||
.filter(
|
||||
BuildSession.user_id == user_id,
|
||||
has_messages, # Only sessions with messages
|
||||
)
|
||||
.order_by(desc(BuildSession.created_at))
|
||||
.limit(limit)
|
||||
.all()
|
||||
@@ -99,27 +102,24 @@ def get_user_build_sessions(
|
||||
def get_empty_session_for_user(
|
||||
user_id: UUID,
|
||||
db_session: Session,
|
||||
max_age_minutes: int = 30,
|
||||
demo_data_enabled: bool | None = None,
|
||||
) -> BuildSession | None:
|
||||
"""Get the user's empty session (0 messages) if one exists and is recent.
|
||||
"""Get an empty (pre-provisioned) session for the user if one exists.
|
||||
|
||||
Returns a session with no messages, or None if all sessions have messages.
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
db_session: Database session
|
||||
max_age_minutes: Maximum age of session to consider (default 30)
|
||||
demo_data_enabled: Match sessions with this demo_data setting.
|
||||
This ensures pre-provisioned sessions match the user's current
|
||||
preferences. If None, matches any session regardless of setting.
|
||||
Note: None is only used internally for operations that need to
|
||||
match any session (e.g., deletion).
|
||||
If None, matches any session regardless of setting.
|
||||
"""
|
||||
cutoff = datetime.utcnow() - timedelta(minutes=max_age_minutes)
|
||||
# Subquery to check if session has any messages
|
||||
has_messages = exists().where(BuildMessage.session_id == BuildSession.id)
|
||||
|
||||
query = db_session.query(BuildSession).filter(
|
||||
BuildSession.user_id == user_id,
|
||||
BuildSession.created_at > cutoff,
|
||||
~exists().where(BuildMessage.session_id == BuildSession.id),
|
||||
~has_messages, # Sessions with no messages only
|
||||
)
|
||||
|
||||
if demo_data_enabled is not None:
|
||||
|
||||
@@ -43,6 +43,8 @@ from kubernetes.stream.ws_client import WSClient # type: ignore
|
||||
from pydantic import ValidationError
|
||||
|
||||
from onyx.server.features.build.api.packet_logger import get_packet_logger
|
||||
from onyx.server.features.build.configs import ACP_MESSAGE_TIMEOUT
|
||||
from onyx.server.features.build.configs import SSE_KEEPALIVE_INTERVAL
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
@@ -57,6 +59,20 @@ DEFAULT_CLIENT_INFO = {
|
||||
"version": "1.0.0",
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class SSEKeepalive:
|
||||
"""Marker event to signal that an SSE keepalive should be sent.
|
||||
|
||||
This is yielded when no ACP events have been received for SSE_KEEPALIVE_INTERVAL
|
||||
seconds, allowing the SSE stream to send a comment to keep the connection alive.
|
||||
|
||||
Note: This is an internal event type - it's consumed by session/manager.py and
|
||||
converted to an SSE comment before leaving that layer. It should not be exposed
|
||||
to external consumers.
|
||||
"""
|
||||
|
||||
|
||||
# Union type for all possible events from send_message
|
||||
ACPEvent = (
|
||||
AgentMessageChunk
|
||||
@@ -67,6 +83,7 @@ ACPEvent = (
|
||||
| CurrentModeUpdate
|
||||
| PromptResponse
|
||||
| Error
|
||||
| SSEKeepalive
|
||||
)
|
||||
|
||||
|
||||
@@ -171,6 +188,7 @@ class ACPExecClient:
|
||||
stderr=True,
|
||||
tty=False,
|
||||
_preload_content=False,
|
||||
_request_timeout=900, # 15 minute timeout for long-running sessions
|
||||
)
|
||||
|
||||
# Start reader thread
|
||||
@@ -389,13 +407,13 @@ class ACPExecClient:
|
||||
def send_message(
|
||||
self,
|
||||
message: str,
|
||||
timeout: float = 300.0,
|
||||
timeout: float = ACP_MESSAGE_TIMEOUT,
|
||||
) -> Generator[ACPEvent, None, None]:
|
||||
"""Send a message and stream response events.
|
||||
|
||||
Args:
|
||||
message: The message content to send
|
||||
timeout: Maximum time to wait for complete response
|
||||
timeout: Maximum time to wait for complete response (defaults to ACP_MESSAGE_TIMEOUT env var)
|
||||
|
||||
Yields:
|
||||
Typed ACP schema event objects
|
||||
@@ -428,6 +446,7 @@ class ACPExecClient:
|
||||
|
||||
request_id = self._send_request("session/prompt", params)
|
||||
start_time = time.time()
|
||||
last_event_time = time.time() # Track time since last event for keepalive
|
||||
events_yielded = 0
|
||||
|
||||
while True:
|
||||
@@ -445,7 +464,20 @@ class ACPExecClient:
|
||||
|
||||
try:
|
||||
message_data = self._response_queue.get(timeout=min(remaining, 1.0))
|
||||
last_event_time = time.time() # Reset keepalive timer on event
|
||||
except Empty:
|
||||
# Check if we need to send an SSE keepalive
|
||||
idle_time = time.time() - last_event_time
|
||||
if idle_time >= SSE_KEEPALIVE_INTERVAL:
|
||||
packet_logger.log_raw(
|
||||
"SSE-KEEPALIVE-YIELD",
|
||||
{
|
||||
"session_id": session_id,
|
||||
"idle_seconds": idle_time,
|
||||
},
|
||||
)
|
||||
yield SSEKeepalive()
|
||||
last_event_time = time.time() # Reset after yielding keepalive
|
||||
continue
|
||||
|
||||
# Check for response to our prompt request
|
||||
@@ -538,6 +570,8 @@ class ACPExecClient:
|
||||
return "prompt_response"
|
||||
elif isinstance(event, Error):
|
||||
return "error"
|
||||
elif isinstance(event, SSEKeepalive):
|
||||
return "sse_keepalive"
|
||||
return "unknown"
|
||||
|
||||
def _process_session_update(
|
||||
|
||||
@@ -1499,10 +1499,33 @@ echo '{tar_b64}' | base64 -d | tar -xzf -
|
||||
packet_logger.log_session_end(
|
||||
session_id, success=True, events_count=events_count
|
||||
)
|
||||
except Exception as e:
|
||||
# Log failure
|
||||
except GeneratorExit:
|
||||
# Generator was closed by consumer (client disconnect, timeout, broken pipe)
|
||||
# This is the most common failure mode for SSE streaming
|
||||
packet_logger.log_session_end(
|
||||
session_id, success=False, error=str(e), events_count=events_count
|
||||
session_id,
|
||||
success=False,
|
||||
error="GeneratorExit: Client disconnected or stream closed by consumer",
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
except Exception as e:
|
||||
# Log failure from normal exceptions
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error=f"Exception: {str(e)}",
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
except BaseException as e:
|
||||
# Log failure from other base exceptions (SystemExit, KeyboardInterrupt, etc.)
|
||||
exception_type = type(e).__name__
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error=f"{exception_type}: {str(e) if str(e) else 'System-level interruption'}",
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
|
||||
@@ -70,6 +70,9 @@ from onyx.server.features.build.db.sandbox import get_sandbox_by_session_id
|
||||
from onyx.server.features.build.db.sandbox import get_sandbox_by_user_id
|
||||
from onyx.server.features.build.db.sandbox import update_sandbox_heartbeat
|
||||
from onyx.server.features.build.sandbox import get_sandbox_manager
|
||||
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
|
||||
SSEKeepalive,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.models import LLMProviderConfig
|
||||
from onyx.server.features.build.session.prompts import BUILD_NAMING_SYSTEM_PROMPT
|
||||
from onyx.server.features.build.session.prompts import BUILD_NAMING_USER_PROMPT
|
||||
@@ -1238,6 +1241,14 @@ class SessionManager:
|
||||
for acp_event in self._sandbox_manager.send_message(
|
||||
sandbox_id, session_id, user_message_content
|
||||
):
|
||||
# Handle SSE keepalive - send comment to keep connection alive
|
||||
if isinstance(acp_event, SSEKeepalive):
|
||||
# SSE comments start with : and are ignored by EventSource
|
||||
# but keep the HTTP connection alive
|
||||
packet_logger.log_sse_emit("keepalive", session_id)
|
||||
yield ": keepalive\n\n"
|
||||
continue
|
||||
|
||||
# Check if we need to finalize pending chunks before processing
|
||||
event_type = self._get_event_type(acp_event)
|
||||
if state.should_finalize_chunks(event_type):
|
||||
@@ -1338,6 +1349,7 @@ class SessionManager:
|
||||
db_session=self._db_session,
|
||||
)
|
||||
|
||||
# Log full event to packet logger (can handle large payloads)
|
||||
packet_logger.log("tool_call_progress", event_data)
|
||||
packet_logger.log_sse_emit("tool_call_progress", session_id)
|
||||
yield _serialize_acp_event(acp_event, "tool_call_progress")
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
import { useEffect, useRef, useCallback } from "react";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { useBuildSessionStore } from "@/app/craft/hooks/useBuildSessionStore";
|
||||
import { usePreProvisionPolling } from "@/app/craft/hooks/usePreProvisionPolling";
|
||||
import { CRAFT_SEARCH_PARAM_NAMES } from "@/app/craft/services/searchParams";
|
||||
import { CRAFT_PATH } from "@/app/craft/v1/constants";
|
||||
import { getBuildUserPersona } from "@/app/craft/onboarding/constants";
|
||||
@@ -102,10 +103,12 @@ export function useBuildSessionController({
|
||||
setCurrentSession(null);
|
||||
}
|
||||
|
||||
// Reset trigger state when transitioning FROM a session TO new build
|
||||
// This ensures we can re-provision when returning to the new build page
|
||||
// Reset state when transitioning FROM a session TO new build
|
||||
// This ensures we fetch fresh pre-provisioned status from backend
|
||||
if (prevExistingSessionId !== null) {
|
||||
setControllerTriggered(null);
|
||||
// Clear pre-provisioned state to force a fresh check from backend
|
||||
useBuildSessionStore.setState({ preProvisioning: { status: "idle" } });
|
||||
}
|
||||
|
||||
// Trigger pre-provisioning if conditions are met
|
||||
@@ -281,6 +284,10 @@ export function useBuildSessionController({
|
||||
router.push(CRAFT_PATH);
|
||||
}, [router]);
|
||||
|
||||
// Poll to verify pre-provisioned session is still valid (multi-tab support)
|
||||
// Only poll on welcome page (existingSessionId === null) - no point polling on session pages
|
||||
usePreProvisionPolling({ enabled: existingSessionId === null });
|
||||
|
||||
return {
|
||||
currentSessionId,
|
||||
isLoading,
|
||||
|
||||
89
web/src/app/craft/hooks/usePreProvisionPolling.ts
Normal file
89
web/src/app/craft/hooks/usePreProvisionPolling.ts
Normal file
@@ -0,0 +1,89 @@
|
||||
"use client";
|
||||
|
||||
import { useEffect, useRef } from "react";
|
||||
import { useBuildSessionStore } from "./useBuildSessionStore";
|
||||
import { checkPreProvisionedSession } from "../services/apiServices";
|
||||
|
||||
/** Polling interval in milliseconds (5 seconds) */
|
||||
const POLLING_INTERVAL_MS = 5000;
|
||||
|
||||
interface UsePreProvisionPollingOptions {
|
||||
/** Only poll when enabled (should be true only on welcome page) */
|
||||
enabled: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook that polls to verify the pre-provisioned session is still valid.
|
||||
*
|
||||
* When multiple browser tabs have the same pre-provisioned session,
|
||||
* one tab may claim it by sending a message. This hook detects when
|
||||
* that happens and triggers re-provisioning so the current tab gets
|
||||
* a fresh session.
|
||||
*
|
||||
* Usage: Call this hook on the welcome page where pre-provisioned
|
||||
* sessions are used. Pass enabled=true only on the welcome page.
|
||||
*/
|
||||
export function usePreProvisionPolling({
|
||||
enabled,
|
||||
}: UsePreProvisionPollingOptions) {
|
||||
const preProvisioning = useBuildSessionStore(
|
||||
(state) => state.preProvisioning
|
||||
);
|
||||
const ensurePreProvisionedSession = useBuildSessionStore(
|
||||
(state) => state.ensurePreProvisionedSession
|
||||
);
|
||||
|
||||
// Extract sessionId only when status is "ready" (handles discriminated union)
|
||||
const sessionId =
|
||||
preProvisioning.status === "ready" ? preProvisioning.sessionId : null;
|
||||
|
||||
// Use ref to track if we're currently checking (prevents overlapping requests)
|
||||
const isCheckingRef = useRef(false);
|
||||
|
||||
useEffect(() => {
|
||||
// Only poll when enabled (welcome page) and we have a ready session
|
||||
if (!enabled || !sessionId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const checkValidity = async () => {
|
||||
if (isCheckingRef.current) return;
|
||||
isCheckingRef.current = true;
|
||||
|
||||
try {
|
||||
const result = await checkPreProvisionedSession(sessionId);
|
||||
|
||||
if (!result.valid) {
|
||||
console.log(
|
||||
`[PreProvisionPolling] Session ${sessionId.slice(
|
||||
0,
|
||||
8
|
||||
)} was used, re-provisioning...`
|
||||
);
|
||||
// Session was used by another tab - reset state and re-provision.
|
||||
// Zustand setState is synchronous, so ensurePreProvisionedSession
|
||||
// will immediately see the idle status (no setTimeout needed).
|
||||
useBuildSessionStore.setState({
|
||||
preProvisioning: { status: "idle" },
|
||||
});
|
||||
ensurePreProvisionedSession();
|
||||
}
|
||||
} catch (err) {
|
||||
console.error("[PreProvisionPolling] Failed to check session:", err);
|
||||
// On error, don't re-provision - might be a network issue
|
||||
} finally {
|
||||
isCheckingRef.current = false;
|
||||
}
|
||||
};
|
||||
|
||||
// Start polling
|
||||
const intervalId = setInterval(checkValidity, POLLING_INTERVAL_MS);
|
||||
|
||||
// Also check immediately on mount (in case session was used while tab was inactive)
|
||||
checkValidity();
|
||||
|
||||
return () => {
|
||||
clearInterval(intervalId);
|
||||
};
|
||||
}, [enabled, sessionId, ensurePreProvisionedSession]);
|
||||
}
|
||||
@@ -236,6 +236,28 @@ export async function restoreSession(
|
||||
return res.json();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a pre-provisioned session is still valid (empty).
|
||||
* Used for polling to detect when another tab has used the session.
|
||||
*
|
||||
* @returns { valid: true, session_id: string } if session is still empty
|
||||
* @returns { valid: false, session_id: null } if session has messages or doesn't exist
|
||||
*/
|
||||
export async function checkPreProvisionedSession(
|
||||
sessionId: string
|
||||
): Promise<{ valid: boolean; session_id: string | null }> {
|
||||
const res = await fetch(
|
||||
`${API_BASE}/sessions/${sessionId}/pre-provisioned-check`
|
||||
);
|
||||
|
||||
if (!res.ok) {
|
||||
// Treat errors as invalid session
|
||||
return { valid: false, session_id: null };
|
||||
}
|
||||
|
||||
return res.json();
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Messages API
|
||||
// =============================================================================
|
||||
|
||||
Reference in New Issue
Block a user