Compare commits

...

2 Commits

Author SHA1 Message Date
rohoswagger
5e5f9f91bc attempt 2 2026-02-02 17:03:49 -08:00
rohoswagger
6dc055c5cd fix(craft): potential fix for craft freezing in kube by truncating large tool call writes 2026-02-02 13:56:20 -08:00
4 changed files with 154 additions and 4 deletions

View File

@@ -109,6 +109,15 @@ SANDBOX_FILE_SYNC_SERVICE_ACCOUNT = os.environ.get(
ENABLE_CRAFT = os.environ.get("ENABLE_CRAFT", "false").lower() == "true"
# ============================================================================
# SSE Streaming Configuration
# ============================================================================
# Maximum character length for tool call content in SSE events
# Content exceeding this length will be truncated to prevent large payloads
# that can cause timeouts or client disconnects during streaming
MAX_SSE_CONTENT_LENGTH = int(os.environ.get("MAX_SSE_CONTENT_LENGTH", "500"))
# ============================================================================
# Rate Limiting Configuration
# ============================================================================

View File

@@ -57,6 +57,19 @@ DEFAULT_CLIENT_INFO = {
"version": "1.0.0",
}
# SSE keepalive interval - yield keepalive if no events for this many seconds
SSE_KEEPALIVE_INTERVAL = 15.0
@dataclass
class SSEKeepalive:
    """Sentinel yielded by the ACP event stream when the connection sits idle.

    Produced after SSE_KEEPALIVE_INTERVAL seconds pass with no ACP event, so
    the SSE layer can emit a comment line and keep the HTTP connection open.
    Carries no data; it exists purely as a type marker for isinstance checks.
    """
# Union type for all possible events from send_message
ACPEvent = (
AgentMessageChunk
@@ -67,6 +80,7 @@ ACPEvent = (
| CurrentModeUpdate
| PromptResponse
| Error
| SSEKeepalive
)
@@ -429,6 +443,7 @@ class ACPExecClient:
request_id = self._send_request("session/prompt", params)
start_time = time.time()
last_event_time = time.time() # Track time since last event for keepalive
events_yielded = 0
while True:
@@ -446,7 +461,20 @@ class ACPExecClient:
try:
message_data = self._response_queue.get(timeout=min(remaining, 1.0))
last_event_time = time.time() # Reset keepalive timer on event
except Empty:
# Check if we need to send an SSE keepalive
idle_time = time.time() - last_event_time
if idle_time >= SSE_KEEPALIVE_INTERVAL:
packet_logger.log_raw(
"SSE-KEEPALIVE-YIELD",
{
"session_id": session_id,
"idle_seconds": idle_time,
},
)
yield SSEKeepalive()
last_event_time = time.time() # Reset after yielding keepalive
continue
# Check for response to our prompt request

View File

@@ -1499,10 +1499,33 @@ echo '{tar_b64}' | base64 -d | tar -xzf -
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
)
except Exception as e:
# Log failure
except GeneratorExit:
# Generator was closed by consumer (client disconnect, timeout, broken pipe)
# This is the most common failure mode for SSE streaming
packet_logger.log_session_end(
session_id, success=False, error=str(e), events_count=events_count
session_id,
success=False,
error="GeneratorExit: Client disconnected or stream closed by consumer",
events_count=events_count,
)
raise
except Exception as e:
# Log failure from normal exceptions
packet_logger.log_session_end(
session_id,
success=False,
error=f"Exception: {str(e)}",
events_count=events_count,
)
raise
except BaseException as e:
# Log failure from other base exceptions (SystemExit, KeyboardInterrupt, etc.)
exception_type = type(e).__name__
packet_logger.log_session_end(
session_id,
success=False,
error=f"{exception_type}: {str(e) if str(e) else 'System-level interruption'}",
events_count=events_count,
)
raise
finally:

View File

@@ -46,6 +46,7 @@ from onyx.server.features.build.api.packet_logger import log_separator
from onyx.server.features.build.api.packets import BuildPacket
from onyx.server.features.build.api.packets import ErrorPacket
from onyx.server.features.build.api.rate_limit import get_user_rate_limit_status
from onyx.server.features.build.configs import MAX_SSE_CONTENT_LENGTH
from onyx.server.features.build.configs import MAX_TOTAL_UPLOAD_SIZE_BYTES
from onyx.server.features.build.configs import MAX_UPLOAD_FILES_PER_SESSION
from onyx.server.features.build.configs import PERSISTENT_DOCUMENT_STORAGE_PATH
@@ -70,6 +71,9 @@ from onyx.server.features.build.db.sandbox import get_sandbox_by_session_id
from onyx.server.features.build.db.sandbox import get_sandbox_by_user_id
from onyx.server.features.build.db.sandbox import update_sandbox_heartbeat
from onyx.server.features.build.sandbox import get_sandbox_manager
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
SSEKeepalive,
)
from onyx.server.features.build.sandbox.models import LLMProviderConfig
from onyx.server.features.build.session.prompts import BUILD_NAMING_SYSTEM_PROMPT
from onyx.server.features.build.session.prompts import BUILD_NAMING_USER_PROMPT
@@ -1238,6 +1242,14 @@ class SessionManager:
for acp_event in self._sandbox_manager.send_message(
sandbox_id, session_id, user_message_content
):
# Handle SSE keepalive - send comment to keep connection alive
if isinstance(acp_event, SSEKeepalive):
# SSE comments start with : and are ignored by EventSource
# but keep the HTTP connection alive
packet_logger.log_sse_emit("keepalive", session_id)
yield ": keepalive\n\n"
continue
# Check if we need to finalize pending chunks before processing
event_type = self._get_event_type(acp_event)
if state.should_finalize_chunks(event_type):
@@ -1338,9 +1350,87 @@ class SessionManager:
db_session=self._db_session,
)
# Log full event to packet logger (can handle large payloads)
packet_logger.log("tool_call_progress", event_data)
packet_logger.log_sse_emit("tool_call_progress", session_id)
yield _serialize_acp_event(acp_event, "tool_call_progress")
# Create sanitized copy for SSE streaming to prevent massive JSON serialization
# that can block the generator and cause timeouts
sse_event_data = event_data.copy()
# Truncate rawInput.content if present and large
raw_input = sse_event_data.get("rawInput")
truncated_raw_input = False
if raw_input and isinstance(raw_input, dict):
content = raw_input.get("content")
if (
content
and isinstance(content, str)
and len(content) > MAX_SSE_CONTENT_LENGTH
):
truncated_raw_input = True
original_len = len(content)
truncated = content[:MAX_SSE_CONTENT_LENGTH] + "..."
sse_event_data["rawInput"] = {
**raw_input,
"content": truncated,
"_truncated": True,
"_originalLength": original_len,
}
packet_logger.log_raw(
"SSE-TRUNCATE-RAWINPUT",
{
"original_len": original_len,
"truncated_len": len(truncated),
"session_id": str(session_id),
},
)
# Truncate rawOutput.content if present and large
raw_output = sse_event_data.get("rawOutput")
truncated_raw_output = False
if raw_output and isinstance(raw_output, dict):
content = raw_output.get("content")
if (
content
and isinstance(content, str)
and len(content) > MAX_SSE_CONTENT_LENGTH
):
truncated_raw_output = True
original_len = len(content)
truncated = content[:MAX_SSE_CONTENT_LENGTH] + "..."
sse_event_data["rawOutput"] = {
**raw_output,
"content": truncated,
"_truncated": True,
"_originalLength": original_len,
}
packet_logger.log_raw(
"SSE-TRUNCATE-RAWOUTPUT",
{
"original_len": original_len,
"truncated_len": len(truncated),
"session_id": str(session_id),
},
)
# Yield sanitized event for SSE streaming
sse_event = ToolCallProgress.model_validate(sse_event_data)
sse_payload = _serialize_acp_event(sse_event, "tool_call_progress")
# Log final payload size to verify truncation worked
if truncated_raw_input or truncated_raw_output:
packet_logger.log_raw(
"SSE-YIELD-AFTER-TRUNCATE",
{
"payload_bytes": len(sse_payload),
"truncated_input": truncated_raw_input,
"truncated_output": truncated_raw_output,
"session_id": str(session_id),
},
)
yield sse_payload
elif isinstance(acp_event, AgentPlanUpdate):
event_data = acp_event.model_dump(