Compare commits

...

1 Commits

Author SHA1 Message Date
rohoswagger
6dc055c5cd fix(craft): potential fix for craft freezing in kube by truncating large tool call writes 2026-02-02 13:56:20 -08:00
3 changed files with 85 additions and 4 deletions

View File

@@ -109,6 +109,15 @@ SANDBOX_FILE_SYNC_SERVICE_ACCOUNT = os.environ.get(
ENABLE_CRAFT = os.environ.get("ENABLE_CRAFT", "false").lower() == "true"
# ============================================================================
# SSE Streaming Configuration
# ============================================================================
# Maximum character length for tool call content in SSE events
# Content exceeding this length will be truncated to prevent large payloads
# that can cause timeouts or client disconnects during streaming
MAX_SSE_CONTENT_LENGTH = int(os.environ.get("MAX_SSE_CONTENT_LENGTH", "500"))
# ============================================================================
# Rate Limiting Configuration
# ============================================================================

View File

@@ -1499,10 +1499,33 @@ echo '{tar_b64}' | base64 -d | tar -xzf -
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
)
except Exception as e:
# Log failure
except GeneratorExit:
# Generator was closed by consumer (client disconnect, timeout, broken pipe)
# This is the most common failure mode for SSE streaming
packet_logger.log_session_end(
session_id, success=False, error=str(e), events_count=events_count
session_id,
success=False,
error="GeneratorExit: Client disconnected or stream closed by consumer",
events_count=events_count,
)
raise
except Exception as e:
# Log failure from normal exceptions
packet_logger.log_session_end(
session_id,
success=False,
error=f"Exception: {str(e)}",
events_count=events_count,
)
raise
except BaseException as e:
# Log failure from other base exceptions (SystemExit, KeyboardInterrupt, etc.)
exception_type = type(e).__name__
packet_logger.log_session_end(
session_id,
success=False,
error=f"{exception_type}: {str(e) if str(e) else 'System-level interruption'}",
events_count=events_count,
)
raise
finally:

View File

@@ -46,6 +46,7 @@ from onyx.server.features.build.api.packet_logger import log_separator
from onyx.server.features.build.api.packets import BuildPacket
from onyx.server.features.build.api.packets import ErrorPacket
from onyx.server.features.build.api.rate_limit import get_user_rate_limit_status
from onyx.server.features.build.configs import MAX_SSE_CONTENT_LENGTH
from onyx.server.features.build.configs import MAX_TOTAL_UPLOAD_SIZE_BYTES
from onyx.server.features.build.configs import MAX_UPLOAD_FILES_PER_SESSION
from onyx.server.features.build.configs import PERSISTENT_DOCUMENT_STORAGE_PATH
@@ -1338,9 +1339,57 @@ class SessionManager:
db_session=self._db_session,
)
# Log full event to packet logger (can handle large payloads)
packet_logger.log("tool_call_progress", event_data)
packet_logger.log_sse_emit("tool_call_progress", session_id)
yield _serialize_acp_event(acp_event, "tool_call_progress")
# Create sanitized copy for SSE streaming to prevent massive JSON serialization
# that can block the generator and cause timeouts
sse_event_data = event_data.copy()
# Truncate rawInput.content if present and large
raw_input = sse_event_data.get("rawInput")
if raw_input and isinstance(raw_input, dict):
content = raw_input.get("content")
if (
content
and isinstance(content, str)
and len(content) > MAX_SSE_CONTENT_LENGTH
):
logger.debug(
f"Truncating tool_call_progress content from {len(content)} to {MAX_SSE_CONTENT_LENGTH} chars"
)
truncated = content[:MAX_SSE_CONTENT_LENGTH] + "..."
sse_event_data["rawInput"] = {
**raw_input,
"content": truncated,
"_truncated": True,
"_originalLength": len(content),
}
# Truncate rawOutput.content if present and large
raw_output = sse_event_data.get("rawOutput")
if raw_output and isinstance(raw_output, dict):
content = raw_output.get("content")
if (
content
and isinstance(content, str)
and len(content) > MAX_SSE_CONTENT_LENGTH
):
logger.debug(
f"Truncating tool_call_progress content from {len(content)} to {MAX_SSE_CONTENT_LENGTH} chars"
)
truncated = content[:MAX_SSE_CONTENT_LENGTH] + "..."
sse_event_data["rawOutput"] = {
**raw_output,
"content": truncated,
"_truncated": True,
"_originalLength": len(content),
}
# Yield sanitized event for SSE streaming
sse_event = ToolCallProgress.model_validate(sse_event_data)
yield _serialize_acp_event(sse_event, "tool_call_progress")
elif isinstance(acp_event, AgentPlanUpdate):
event_data = acp_event.model_dump(