Compare commits


24 Commits

Author SHA1 Message Date
rohoswagger
83cae3ebc0 feat(craft): persist opencode session data across sandbox sleep/wake
Move opencode's session storage from container-local ~/.local/share/ to
the shared workspace volume via XDG_DATA_HOME. This ensures session data
is captured in snapshots and restored on wake, allowing ephemeral ACP
clients to resume conversation context after sleep/wake cycles.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 13:56:44 -08:00
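The mechanism here is plain environment redirection at process launch: point `XDG_DATA_HOME` into the session's workspace directory before exec'ing the agent. A minimal sketch of building that launch command (the `.opencode-data` path matches the diff below; the helper name itself is illustrative):

```python
import shlex

def build_acp_command(cwd: str) -> list[str]:
    # Point XDG_DATA_HOME at the shared workspace volume so opencode's
    # session storage lands in snapshots and survives sleep/wake cycles.
    data_dir = shlex.quote(f"{cwd}/.opencode-data")
    safe_cwd = shlex.quote(cwd)
    return [
        "/bin/sh",
        "-c",
        f"XDG_DATA_HOME={data_dir} exec opencode acp --cwd {safe_cwd}",
    ]

print(build_acp_command("/workspace/sessions/abc")[2])
```

`shlex.quote` keeps paths with spaces or shell metacharacters safe inside the `-c` string; for plain paths it is a no-op.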
rohoswagger
de31483ac5 more nits 2026-02-16 19:28:23 -08:00
rohoswagger
79cf0bb7d4 clean up 2026-02-16 19:18:38 -08:00
rohoswagger
4143dfbb5d refactor(craft): simplify ACP session management — single resume_or_create_session entry point
Remove redundant two-layer session management that was left over from the
long-lived client pattern. With ephemeral clients, the in-memory session
cache and public wrapper methods were dead code.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 17:46:40 -08:00
rohoswagger
5ec8aff704 delete useless tests 2026-02-16 17:29:29 -08:00
rohoswagger
625b472df4 chore(craft): remove stale buffer cycle logic from ACP reader
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 17:13:05 -08:00
rohoswagger
1878f97444 chore(craft): remove ACP diagnostic logging
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 17:11:32 -08:00
rohoswagger
2690918386 fix(craft): use ephemeral ACP clients to prevent multi-replica session corruption
Root cause: each API server replica created a long-lived `opencode acp`
process per sandbox via k8s exec. With multiple replicas, multiple
processes ran inside the same container operating on the same session's
flat file storage (~/.local/share/opencode/storage/). This caused the
second process's in-memory state to become stale after the first process
modified the session files, resulting in the JSON-RPC completion response
being silently lost — hanging follow-up messages.

Fix: make ACP clients ephemeral — create a fresh `opencode acp` process
for each message, stop it in a finally block when done. Each message now
gets a clean process that reads session state fresh from disk, with no
stale in-memory state from a previous replica's modifications.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 16:50:37 -08:00
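The start, send, stop-in-finally lifecycle described above can be sketched as a context manager (FakeClient is a stand-in for the real ACPExecClient; this is a skeleton of the pattern, not the actual implementation):

```python
from collections.abc import Iterator
from contextlib import contextmanager

class FakeClient:
    """Stand-in for ACPExecClient: tracks whether its process is running."""
    def __init__(self) -> None:
        self.running = False
    def start(self) -> None:
        self.running = True
    def stop(self) -> None:
        self.running = False

@contextmanager
def ephemeral_client() -> Iterator[FakeClient]:
    # One fresh process per message: session state is read from disk on
    # start, and stop() in finally guarantees no long-lived process
    # lingers to hold stale in-memory state across replicas.
    client = FakeClient()
    client.start()
    try:
        yield client
    finally:
        client.stop()

with ephemeral_client() as c:
    assert c.running  # process alive only for the duration of one message
```

The finally-based teardown is the load-bearing part: even a timeout or mid-stream exception leaves no second process competing for the flat-file storage.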
rohoswagger
d8bc7ae1dc fix(craft): add comprehensive reader thread diagnostics for ACP hang debugging
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 16:05:41 -08:00
rohoswagger
2d5b829e63 fix(craft): accept either completion signal + add ACP debug logging
ACP behavior is inconsistent — some prompts get both a prompt_response
notification AND a JSON-RPC response, others get only the notification.
Accept whichever arrives first. Add per-message dequeue logging to
understand the exact packet sequence ACP sends for each prompt.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:45:31 -08:00
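The "first signal wins" check reduces to a predicate like the following (field names follow the ACP JSON-RPC framing shown in the diff; the helper itself is illustrative):

```python
from typing import Any

def is_turn_complete(message: dict[str, Any], request_id: int) -> bool:
    # Signal 1: the JSON-RPC response to our session/prompt request.
    # Responses never carry a "method" field, so its absence plus a
    # matching id identifies the reply.
    if "method" not in message and message.get("id") == request_id:
        return True
    # Signal 2: a prompt_response delivered as a session/update
    # notification, which some prompts get instead of (or as well as)
    # the JSON-RPC response.
    update = message.get("params", {}).get("update", {})
    return update.get("sessionUpdate") == "prompt_response"
```

Accepting either signal means neither variant of agent behavior can hang the send loop.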
rohoswagger
495e2f7c52 fix(craft): use JSON-RPC response as sole turn completion signal
Stop breaking out of the send_message loop on `prompt_response`
session/update notifications. Per the ACP spec (and Zed's impl), turn
completion is determined by the JSON-RPC response to session/prompt —
not by any notification. Breaking early on the notification left the
actual JSON-RPC response orphaned in the queue, which caused stale
message buildup and premature termination of follow-up prompts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:36:00 -08:00
rohoswagger
189cc5bc3c fix(craft): drain stale queue messages + usage_update end-of-turn guard
Drain leftover messages from the response queue before sending a new
prompt to prevent stale usage_update packets from prematurely terminating
follow-up messages. Additionally, only treat usage_update as an
end-of-turn signal when real content events have already been received.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:25:27 -08:00
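Draining is a non-blocking sweep of the response queue before issuing the next session/prompt, along these lines:

```python
import queue
from typing import Any

def drain_stale(q: "queue.Queue[Any]") -> list[Any]:
    # Pull everything already enqueued so a leftover usage_update from
    # the previous turn cannot be mistaken for the new turn's completion.
    stale = []
    while True:
        try:
            stale.append(q.get_nowait())
        except queue.Empty:
            return stale
```

`get_nowait()` raises `queue.Empty` the moment the queue is exhausted, so the sweep never blocks the new prompt.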
rohoswagger
23ec38662e chore: update .gitignore to remove unnecessary entries
Removed entries for .claude and .worktrees from .gitignore to streamline ignored files. This change helps maintain a cleaner project structure.
2026-02-16 14:50:05 -08:00
rohoswagger
57d741c5b3 chore(craft): clean up diagnostic logging in ACP exec client
Remove verbose debug logging added during prompt #2 freeze investigation.
Keep operational logs (prompt start/complete, errors, connection health).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 14:48:01 -08:00
rohoswagger
021af74739 fix(craft): remove queue drain + treat usage_update as end-of-turn signal
- Remove pre-prompt queue drain that discarded messages between prompts.
  Stale messages now flow through the main loop naturally.
- Treat usage_update as implicit end-of-turn by synthesizing a
  PromptResponse, fixing prompt #2+ hangs where opencode doesn't send
  a JSON-RPC PromptResponse.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 14:36:35 -08:00
rohoswagger
adaca6a353 fix(craft): add diagnostic logging for prompt #2 freeze debugging
- Log sessionUpdate type for prompt #2+ events to identify what's flowing
- Respond to stale agent JSON-RPC requests during queue drain (potential hang cause)
- Log every keepalive instead of every 3rd for better observability

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 13:14:10 -08:00
rohoswagger
4cd07d7bbc fix(sandbox): send session/cancel when prompt is abandoned or times out
When a user sends a follow-up message, the SSE stream consumer disconnects,
triggering GeneratorExit on the in-flight prompt generator. Previously,
opencode continued processing the old prompt because session/cancel was
never sent, blocking subsequent prompts on the same process.

Now sends session/cancel in three scenarios:
- GeneratorExit (user sends new message / disconnects)
- Exception during prompt streaming
- Timeout (ACP_MESSAGE_TIMEOUT exceeded)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 12:30:07 -08:00
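The three cancel scenarios map directly onto a generator's exception surface: `GeneratorExit` when the SSE consumer disconnects, any other exception mid-stream, and a timeout surfaced as an exception. A hedged sketch (the `cancel` callable stands in for sending session/cancel):

```python
from collections.abc import Callable, Iterable, Iterator

def stream_prompt(events: Iterable[str], cancel: Callable[[], None]) -> Iterator[str]:
    try:
        for event in events:
            yield event
    except GeneratorExit:
        cancel()  # consumer disconnected: user sent a follow-up message
        raise
    except Exception:
        cancel()  # error (or timeout raised as an exception) mid-stream
        raise

sent: list[str] = []
gen = stream_prompt(iter(["chunk1", "chunk2"]), lambda: sent.append("session/cancel"))
next(gen)     # consume one event...
gen.close()   # ...then abandon the stream, raising GeneratorExit inside it
```

Re-raising after `cancel()` matters: swallowing `GeneratorExit` would make `close()` raise `RuntimeError`.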
rohoswagger
bf3c98142d refactor(craft): Zed-style ACP session management — one process per sandbox
Instead of creating a fresh `opencode acp` process per prompt (or caching
one per session), use one long-lived process per sandbox with multiple ACP
sessions. This mirrors how Zed editor manages ACP connections.

Key changes:
- ACPExecClient: start() only does initialize (no session creation),
  new public create_session/resume_session/get_or_create_session methods,
  send_message accepts explicit session_id parameter, tracks multiple
  sessions in a dict
- KubernetesSandboxManager: _acp_clients keyed by sandbox_id (not
  session), _acp_session_ids maps (sandbox_id, session_id) → ACP session
  ID, switching sessions is implicit via sessionId in each prompt
- Tests rewritten for new architecture (8 passing)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 12:04:56 -08:00
rohoswagger
f023599618 fix(craft): add diagnostic logging for hanging prompt debug + silence usage_update
Adds targeted logging to identify why Prompt #2 hangs after usage_update:
- Reader thread: logs buffer state when unterminated data detected
- Reader thread: periodic idle heartbeat every ~5s
- send_message: logs wait state every 3rd keepalive
- Silences usage_update (token stats) in _process_session_update

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 10:49:55 -08:00
rohoswagger
c55cb899f7 fix(craft): handle unterminated ACP messages + clean up diagnostic logging
The reader thread only processed JSON messages terminated with \n. If the
agent's final response lacked a trailing newline, it sat in the buffer
forever causing send_message to hang with keepalives. Added stale buffer
detection that parses unterminated content after ~0.3s of no new data.

Also cleaned up verbose diagnostic logging ([ACP-LIFECYCLE], [ACP-READER],
[ACP-SEND] prefixes) added during debugging — moved per-message noise to
debug level, kept important lifecycle events at info.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 21:46:00 -08:00
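A simplified reader loop with that stale-buffer flush might look like this (here `read_chunk` returns `""` when idle and `None` when the stream closes; the 0.3s threshold follows the commit, everything else is illustrative):

```python
import json
import time
from collections.abc import Callable, Iterator
from typing import Any

def read_messages(
    read_chunk: Callable[[], "str | None"],
    stale_after: float = 0.3,
) -> Iterator[dict[str, Any]]:
    buffer = ""
    last_data = time.monotonic()
    while True:
        chunk = read_chunk()
        if chunk:
            buffer += chunk
            last_data = time.monotonic()
        # Normal path: newline-terminated JSON messages.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if line.strip():
                yield json.loads(line)
        if chunk is None:  # stream closed: flush any trailing message
            if buffer.strip():
                yield json.loads(buffer)
            return
        # Stale path: a final response with no trailing newline would
        # otherwise sit in the buffer forever while keepalives tick.
        if buffer.strip() and time.monotonic() - last_data >= stale_after:
            yield json.loads(buffer)
            buffer = ""
```

A real implementation would also tolerate a stale flush that fails to parse (the data may simply be an incomplete line still arriving); this sketch omits that guard.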
rohoswagger
9b8a6e60b7 fix(craft): resume ACP sessions across API replicas for follow-up messages
Multiple API server replicas each maintain independent in-memory ACP client
caches. When a follow-up message is routed to a different replica, it creates
a new opencode session with no conversation context.

Fix: After initializing a new opencode ACP process, try session/list (filtered
by workspace cwd) to discover existing sessions from previous processes, then
session/resume to restore conversation context. Falls back to session/new if
the agent doesn't support these methods or no sessions exist.

Also adds api_pod hostname to SANDBOX-ACP log lines for replica identification.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 17:20:43 -08:00
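The discover-then-resume flow is, schematically, a list, a resume attempt, and a fallback (the callables stand in for the session/list, session/resume, and session/new requests; the real logic lives on ACPExecClient):

```python
from collections.abc import Callable
from typing import Any

def resume_or_create(
    list_sessions: Callable[[str], list[dict[str, Any]]],
    resume: Callable[[str], str],
    create: Callable[[str], str],
    cwd: str,
) -> str:
    sessions = list_sessions(cwd)   # sessions persisted by earlier processes
    if sessions:
        session_id = sessions[0].get("sessionId")  # assume most recent first
        if session_id:
            try:
                return resume(session_id)   # restore conversation context
            except Exception:
                pass   # agent may not support session/resume
    return create(cwd)                      # fall back to a fresh session
```

Filtering by `cwd` is what scopes discovery to one workspace, so replicas serving different sessions never resume each other's conversations.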
rohoswagger
dd9d201b51 fix(craft): add extensive diagnostic logging for ACP follow-up messages
- Add [ACP-LIFECYCLE] logs for client start/stop with session IDs
- Add [ACP-READER] logs for every message read from WebSocket with
  update_type, queue size, and ACP session ID
- Add [ACP-SEND] logs for every dequeued message with prompt number,
  completion reason tracking, and queue state
- Add [SANDBOX-ACP] logs for cache hit/miss decisions and PromptResponse
  tracking in the sandbox manager
- Add stderr reading in reader thread to catch opencode errors
- Add queue drain at start of each send_message() to clear stale messages
- Track prompt_count per client to identify 1st vs 2nd+ prompts
- Log completion_reason (jsonrpc_response, notification, timeout, etc.)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 16:49:57 -08:00
rohoswagger
c545819aa6 fix(craft): harden ACP response matching for cached sessions
The cached ACPExecClient could emit keepalives forever on follow-up
messages because the PromptResponse was never matched. This adds:

- ID type-mismatch tolerance (str fallback for int/str id comparison)
- Guard against agent request ID collision (require no "method" field)
- PromptResponse via session/update notification handler
- Reader thread health check (detect dead WebSocket, stop keepalives)
- Buffer flush on reader thread exit (catch trailing PromptResponse)
- Diagnostic logging for every dequeued message and dropped messages

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 15:45:39 -08:00
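Two of those guards, the id type tolerance and the agent-request collision check, condense to a predicate like this (illustrative form of the matching logic in the diff below):

```python
from typing import Any

def matches_request(message: dict[str, Any], request_id: int) -> bool:
    # Agent-initiated requests carry "method" and may reuse our id space,
    # so they can never be treated as the response to our prompt.
    if "method" in message:
        return False
    msg_id = message.get("id")
    # Tolerate agents that echo an int id back as a string (or vice versa).
    return msg_id == request_id or (
        msg_id is not None and str(msg_id) == str(request_id)
    )
```

Without the `"method"` guard, an agent request that happened to reuse the same numeric id would be consumed as the prompt's completion, orphaning the real response.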
rohoswagger
960ee228bf fix(craft): cache ACPExecClient in K8s sandbox to fix follow-up messages 2026-02-15 11:14:10 -08:00
2 changed files with 419 additions and 278 deletions

View File

@@ -4,8 +4,9 @@ This client runs `opencode acp` directly in the sandbox pod via kubernetes exec,
using stdin/stdout for JSON-RPC communication. This bypasses the HTTP server
and uses the native ACP subprocess protocol.
This module includes comprehensive logging for debugging ACP communication.
Enable logging by setting LOG_LEVEL=DEBUG or BUILD_PACKET_LOGGING=true.
Each message creates an ephemeral client (start → resume_or_create_session →
send_message → stop) to prevent concurrent processes from corrupting
opencode's flat file session storage.
Usage:
client = ACPExecClient(
@@ -13,12 +14,14 @@ Usage:
namespace="onyx-sandboxes",
)
client.start(cwd="/workspace")
for event in client.send_message("What files are here?"):
session_id = client.resume_or_create_session(cwd="/workspace/sessions/abc")
for event in client.send_message("What files are here?", session_id=session_id):
print(event)
client.stop()
"""
import json
import shlex
import threading
import time
from collections.abc import Generator
@@ -100,7 +103,7 @@ class ACPClientState:
"""Internal state for the ACP client."""
initialized: bool = False
current_session: ACPSession | None = None
sessions: dict[str, ACPSession] = field(default_factory=dict)
next_request_id: int = 0
agent_capabilities: dict[str, Any] = field(default_factory=dict)
agent_info: dict[str, Any] = field(default_factory=dict)
@@ -155,16 +158,16 @@ class ACPExecClient:
self._k8s_client = client.CoreV1Api()
return self._k8s_client
def start(self, cwd: str = "/workspace", timeout: float = 30.0) -> str:
"""Start the agent process via exec and initialize a session.
def start(self, cwd: str = "/workspace", timeout: float = 30.0) -> None:
"""Start the agent process via exec and initialize the ACP connection.
Only performs the ACP `initialize` handshake. Sessions are created
separately via `resume_or_create_session()`.
Args:
cwd: Working directory for the agent
cwd: Working directory for the `opencode acp` process
timeout: Timeout for initialization
Returns:
The session ID
Raises:
RuntimeError: If startup fails
"""
@@ -173,8 +176,19 @@ class ACPExecClient:
k8s = self._get_k8s_client()
# Start opencode acp via exec
exec_command = ["opencode", "acp", "--cwd", cwd]
# Start opencode acp via exec.
# Set XDG_DATA_HOME so opencode stores session data on the shared
# workspace volume (accessible from file-sync container for snapshots)
# instead of the container-local ~/.local/share/ filesystem.
data_dir = shlex.quote(f"{cwd}/.opencode-data")
safe_cwd = shlex.quote(cwd)
exec_command = [
"/bin/sh",
"-c",
f"XDG_DATA_HOME={data_dir} exec opencode acp --cwd {safe_cwd}",
]
logger.info(f"[ACP] Starting client: pod={self._pod_name} cwd={cwd}")
try:
self._ws_client = k8s_stream(
@@ -201,15 +215,12 @@ class ACPExecClient:
# Give process a moment to start
time.sleep(0.5)
# Initialize ACP connection
# Initialize ACP connection (no session creation)
self._initialize(timeout=timeout)
# Create session
session_id = self._create_session(cwd=cwd, timeout=timeout)
return session_id
logger.info(f"[ACP] Client started: pod={self._pod_name}")
except Exception as e:
logger.error(f"[ACP] Client start failed: pod={self._pod_name} error={e}")
self.stop()
raise RuntimeError(f"Failed to start ACP exec client: {e}") from e
@@ -224,56 +235,52 @@ class ACPExecClient:
try:
if self._ws_client.is_open():
# Read available data
self._ws_client.update(timeout=0.1)
# Read stdout (channel 1)
# Read stderr - log any agent errors
stderr_data = self._ws_client.read_stderr(timeout=0.01)
if stderr_data:
logger.warning(
f"[ACP] stderr pod={self._pod_name}: "
f"{stderr_data.strip()[:500]}"
)
# Read stdout
data = self._ws_client.read_stdout(timeout=0.1)
if data:
buffer += data
# Process complete lines
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if line:
try:
message = json.loads(line)
# Log the raw incoming message
packet_logger.log_jsonrpc_raw_message(
"IN", message, context="k8s"
)
self._response_queue.put(message)
except json.JSONDecodeError:
packet_logger.log_raw(
"JSONRPC-PARSE-ERROR-K8S",
{
"raw_line": line[:500],
"error": "JSON decode failed",
},
)
logger.warning(
f"Invalid JSON from agent: {line[:100]}"
f"[ACP] Invalid JSON from agent: "
f"{line[:100]}"
)
else:
packet_logger.log_raw(
"K8S-WEBSOCKET-CLOSED",
{"pod": self._pod_name, "namespace": self._namespace},
)
logger.warning(f"[ACP] WebSocket closed: pod={self._pod_name}")
break
except Exception as e:
if not self._stop_reader.is_set():
packet_logger.log_raw(
"K8S-READER-ERROR",
{"error": str(e), "pod": self._pod_name},
)
logger.debug(f"Reader error: {e}")
logger.warning(f"[ACP] Reader error: {e}, pod={self._pod_name}")
break
def stop(self) -> None:
"""Stop the exec session and clean up."""
session_ids = list(self._state.sessions.keys())
logger.info(
f"[ACP] Stopping client: pod={self._pod_name} " f"sessions={session_ids}"
)
self._stop_reader.set()
if self._ws_client is not None:
@@ -400,42 +407,150 @@ class ACPExecClient:
if not session_id:
raise RuntimeError("No session ID returned from session/new")
self._state.current_session = ACPSession(session_id=session_id, cwd=cwd)
self._state.sessions[session_id] = ACPSession(session_id=session_id, cwd=cwd)
logger.info(f"[ACP] Created session: acp_session={session_id} cwd={cwd}")
return session_id
def _list_sessions(self, cwd: str, timeout: float = 10.0) -> list[dict[str, Any]]:
"""List available ACP sessions, filtered by working directory.
Returns:
List of session info dicts with keys like 'sessionId', 'cwd', 'title'.
Empty list if session/list is not supported or fails.
"""
try:
request_id = self._send_request("session/list", {"cwd": cwd})
result = self._wait_for_response(request_id, timeout)
sessions = result.get("sessions", [])
logger.info(f"[ACP] session/list: {len(sessions)} sessions for cwd={cwd}")
return sessions
except Exception as e:
logger.info(f"[ACP] session/list unavailable: {e}")
return []
def _resume_session(self, session_id: str, cwd: str, timeout: float = 30.0) -> str:
"""Resume an existing ACP session.
Args:
session_id: The ACP session ID to resume
cwd: Working directory for the session
timeout: Timeout for the resume request
Returns:
The session ID
Raises:
RuntimeError: If resume fails
"""
params = {
"sessionId": session_id,
"cwd": cwd,
"mcpServers": [],
}
request_id = self._send_request("session/resume", params)
result = self._wait_for_response(request_id, timeout)
# The response should contain the session ID
resumed_id = result.get("sessionId", session_id)
self._state.sessions[resumed_id] = ACPSession(session_id=resumed_id, cwd=cwd)
logger.info(f"[ACP] Resumed session: acp_session={resumed_id} cwd={cwd}")
return resumed_id
def _try_resume_existing_session(self, cwd: str, timeout: float) -> str | None:
"""Try to find and resume an existing session for this workspace.
When multiple API server replicas connect to the same sandbox pod,
a previous replica may have already created an ACP session for this
workspace. This method discovers and resumes that session so the
agent retains conversation context.
Args:
cwd: Working directory to search for sessions
timeout: Timeout for ACP requests
Returns:
The resumed session ID, or None if no session could be resumed
"""
# List sessions for this workspace directory
sessions = self._list_sessions(cwd, timeout=min(timeout, 10.0))
if not sessions:
return None
# Pick the most recent session (first in list, assuming sorted)
target = sessions[0]
target_id = target.get("sessionId")
if not target_id:
logger.warning("[ACP] session/list returned session without sessionId")
return None
logger.info(
f"[ACP] Resuming existing session: acp_session={target_id} "
f"(found {len(sessions)})"
)
try:
return self._resume_session(target_id, cwd, timeout)
except Exception as e:
logger.warning(
f"[ACP] session/resume failed for {target_id}: {e}, "
f"falling back to session/new"
)
return None
def resume_or_create_session(self, cwd: str, timeout: float = 30.0) -> str:
"""Resume a session from opencode's on-disk storage, or create a new one.
With ephemeral clients (one process per message), this always hits disk.
Tries resume first to preserve conversation context, falls back to new.
Args:
cwd: Working directory for the session
timeout: Timeout for ACP requests
Returns:
The ACP session ID
"""
if not self._state.initialized:
raise RuntimeError("Client not initialized. Call start() first.")
# Try to resume from opencode's persisted storage
resumed_id = self._try_resume_existing_session(cwd, timeout)
if resumed_id:
return resumed_id
# Create a new session
return self._create_session(cwd=cwd, timeout=timeout)
def send_message(
self,
message: str,
session_id: str,
timeout: float = ACP_MESSAGE_TIMEOUT,
) -> Generator[ACPEvent, None, None]:
"""Send a message and stream response events.
"""Send a message to a specific session and stream response events.
Args:
message: The message content to send
session_id: The ACP session ID to send the message to
timeout: Maximum time to wait for complete response (defaults to ACP_MESSAGE_TIMEOUT env var)
Yields:
Typed ACP schema event objects
"""
if self._state.current_session is None:
raise RuntimeError("No active session. Call start() first.")
session_id = self._state.current_session.session_id
if session_id not in self._state.sessions:
raise RuntimeError(
f"Unknown session {session_id}. "
f"Known sessions: {list(self._state.sessions.keys())}"
)
packet_logger = get_packet_logger()
# Log the start of message processing
packet_logger.log_raw(
"ACP-SEND-MESSAGE-START-K8S",
{
"session_id": session_id,
"pod": self._pod_name,
"namespace": self._namespace,
"message_preview": (
message[:200] + "..." if len(message) > 200 else message
),
"timeout": timeout,
},
logger.info(
f"[ACP] Sending prompt: "
f"acp_session={session_id} pod={self._pod_name} "
f"queue_backlog={self._response_queue.qsize()}"
)
prompt_content = [{"type": "text", "text": message}]
@@ -446,44 +561,53 @@ class ACPExecClient:
request_id = self._send_request("session/prompt", params)
start_time = time.time()
last_event_time = time.time() # Track time since last event for keepalive
last_event_time = time.time()
events_yielded = 0
keepalive_count = 0
completion_reason = "unknown"
while True:
remaining = timeout - (time.time() - start_time)
if remaining <= 0:
packet_logger.log_raw(
"ACP-TIMEOUT-K8S",
{
"session_id": session_id,
"elapsed_ms": (time.time() - start_time) * 1000,
},
completion_reason = "timeout"
logger.warning(
f"[ACP] Prompt timeout: "
f"acp_session={session_id} events={events_yielded}, "
f"sending session/cancel"
)
try:
self.cancel(session_id=session_id)
except Exception as cancel_err:
logger.warning(
f"[ACP] session/cancel failed on timeout: {cancel_err}"
)
yield Error(code=-1, message="Timeout waiting for response")
break
try:
message_data = self._response_queue.get(timeout=min(remaining, 1.0))
last_event_time = time.time() # Reset keepalive timer on event
last_event_time = time.time()
except Empty:
# Check if we need to send an SSE keepalive
# Send SSE keepalive if idle
idle_time = time.time() - last_event_time
if idle_time >= SSE_KEEPALIVE_INTERVAL:
packet_logger.log_raw(
"SSE-KEEPALIVE-YIELD",
{
"session_id": session_id,
"idle_seconds": idle_time,
},
)
keepalive_count += 1
yield SSEKeepalive()
last_event_time = time.time() # Reset after yielding keepalive
last_event_time = time.time()
continue
# Check for response to our prompt request
if message_data.get("id") == request_id:
# Check for JSON-RPC response to our prompt request.
msg_id = message_data.get("id")
is_response = "method" not in message_data and (
msg_id == request_id
or (msg_id is not None and str(msg_id) == str(request_id))
)
if is_response:
completion_reason = "jsonrpc_response"
if "error" in message_data:
error_data = message_data["error"]
completion_reason = "jsonrpc_error"
logger.warning(f"[ACP] Prompt error: {error_data}")
packet_logger.log_jsonrpc_response(
request_id, error=error_data, context="k8s"
)
@@ -498,26 +622,16 @@ class ACPExecClient:
)
try:
prompt_response = PromptResponse.model_validate(result)
packet_logger.log_acp_event_yielded(
"prompt_response", prompt_response
)
events_yielded += 1
yield prompt_response
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"type": "prompt_response", "error": str(e)},
)
logger.error(f"[ACP] PromptResponse validation failed: {e}")
# Log completion summary
elapsed_ms = (time.time() - start_time) * 1000
packet_logger.log_raw(
"ACP-SEND-MESSAGE-COMPLETE-K8S",
{
"session_id": session_id,
"events_yielded": events_yielded,
"elapsed_ms": elapsed_ms,
},
logger.info(
f"[ACP] Prompt complete: "
f"reason={completion_reason} acp_session={session_id} "
f"events={events_yielded} elapsed={elapsed_ms:.0f}ms"
)
break
@@ -526,25 +640,29 @@ class ACPExecClient:
params_data = message_data.get("params", {})
update = params_data.get("update", {})
# Log the notification
packet_logger.log_jsonrpc_notification(
"session/update",
{"update_type": update.get("sessionUpdate")},
context="k8s",
)
prompt_complete = False
for event in self._process_session_update(update):
events_yielded += 1
# Log each yielded event
event_type = self._get_event_type_name(event)
packet_logger.log_acp_event_yielded(event_type, event)
yield event
if isinstance(event, PromptResponse):
prompt_complete = True
break
if prompt_complete:
completion_reason = "prompt_response_via_notification"
elapsed_ms = (time.time() - start_time) * 1000
logger.info(
f"[ACP] Prompt complete: "
f"reason={completion_reason} acp_session={session_id} "
f"events={events_yielded} elapsed={elapsed_ms:.0f}ms"
)
break
# Handle requests from agent - send error response
elif "method" in message_data and "id" in message_data:
packet_logger.log_raw(
"ACP-UNSUPPORTED-REQUEST-K8S",
{"method": message_data["method"], "id": message_data["id"]},
logger.debug(
f"[ACP] Unsupported agent request: "
f"method={message_data['method']}"
)
self._send_error_response(
message_data["id"],
@@ -552,113 +670,47 @@ class ACPExecClient:
f"Method not supported: {message_data['method']}",
)
def _get_event_type_name(self, event: ACPEvent) -> str:
"""Get the type name for an ACP event."""
if isinstance(event, AgentMessageChunk):
return "agent_message_chunk"
elif isinstance(event, AgentThoughtChunk):
return "agent_thought_chunk"
elif isinstance(event, ToolCallStart):
return "tool_call_start"
elif isinstance(event, ToolCallProgress):
return "tool_call_progress"
elif isinstance(event, AgentPlanUpdate):
return "agent_plan_update"
elif isinstance(event, CurrentModeUpdate):
return "current_mode_update"
elif isinstance(event, PromptResponse):
return "prompt_response"
elif isinstance(event, Error):
return "error"
elif isinstance(event, SSEKeepalive):
return "sse_keepalive"
return "unknown"
else:
logger.warning(
f"[ACP] Unhandled message: "
f"id={message_data.get('id')} "
f"method={message_data.get('method')} "
f"keys={list(message_data.keys())}"
)
def _process_session_update(
self, update: dict[str, Any]
) -> Generator[ACPEvent, None, None]:
"""Process a session/update notification and yield typed ACP schema objects."""
update_type = update.get("sessionUpdate")
packet_logger = get_packet_logger()
if update_type == "agent_message_chunk":
# Map update types to their ACP schema classes.
# Note: prompt_response is included because ACP sometimes sends it as a
# notification WITHOUT a corresponding JSON-RPC response. We accept
# either signal as turn completion (first one wins).
type_map: dict[str, type] = {
"agent_message_chunk": AgentMessageChunk,
"agent_thought_chunk": AgentThoughtChunk,
"tool_call": ToolCallStart,
"tool_call_update": ToolCallProgress,
"plan": AgentPlanUpdate,
"current_mode_update": CurrentModeUpdate,
"prompt_response": PromptResponse,
}
model_class = type_map.get(update_type) # type: ignore[arg-type]
if model_class is not None:
try:
yield AgentMessageChunk.model_validate(update)
yield model_class.model_validate(update)
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"update_type": update_type, "error": str(e), "update": update},
)
elif update_type == "agent_thought_chunk":
try:
yield AgentThoughtChunk.model_validate(update)
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"update_type": update_type, "error": str(e), "update": update},
)
elif update_type == "user_message_chunk":
# Echo of user message - skip but log
packet_logger.log_raw(
"ACP-SKIPPED-UPDATE-K8S", {"type": "user_message_chunk"}
)
elif update_type == "tool_call":
try:
yield ToolCallStart.model_validate(update)
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"update_type": update_type, "error": str(e), "update": update},
)
elif update_type == "tool_call_update":
try:
yield ToolCallProgress.model_validate(update)
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"update_type": update_type, "error": str(e), "update": update},
)
elif update_type == "plan":
try:
yield AgentPlanUpdate.model_validate(update)
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"update_type": update_type, "error": str(e), "update": update},
)
elif update_type == "current_mode_update":
try:
yield CurrentModeUpdate.model_validate(update)
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"update_type": update_type, "error": str(e), "update": update},
)
elif update_type == "available_commands_update":
# Skip command updates
packet_logger.log_raw(
"ACP-SKIPPED-UPDATE-K8S", {"type": "available_commands_update"}
)
elif update_type == "session_info_update":
# Skip session info updates
packet_logger.log_raw(
"ACP-SKIPPED-UPDATE-K8S", {"type": "session_info_update"}
)
else:
# Unknown update types are logged
packet_logger.log_raw(
"ACP-UNKNOWN-UPDATE-TYPE-K8S",
{"update_type": update_type, "update": update},
)
logger.warning(f"[ACP] Validation error for {update_type}: {e}")
elif update_type not in (
"user_message_chunk",
"available_commands_update",
"session_info_update",
"usage_update",
):
logger.debug(f"[ACP] Unknown update type: {update_type}")
def _send_error_response(self, request_id: int, code: int, message: str) -> None:
"""Send an error response to an agent request."""
@@ -673,15 +725,24 @@ class ACPExecClient:
self._ws_client.write_stdin(json.dumps(response) + "\n")
def cancel(self) -> None:
"""Cancel the current operation."""
if self._state.current_session is None:
return
def cancel(self, session_id: str | None = None) -> None:
"""Cancel the current operation on a session.
self._send_notification(
"session/cancel",
{"sessionId": self._state.current_session.session_id},
)
Args:
session_id: The ACP session ID to cancel. If None, cancels all sessions.
"""
if session_id:
if session_id in self._state.sessions:
self._send_notification(
"session/cancel",
{"sessionId": session_id},
)
else:
for sid in self._state.sessions:
self._send_notification(
"session/cancel",
{"sessionId": sid},
)
def health_check(self, timeout: float = 5.0) -> bool: # noqa: ARG002
"""Check if we can exec into the pod."""
@@ -707,13 +768,6 @@ class ACPExecClient:
"""Check if the exec session is running."""
return self._ws_client is not None and self._ws_client.is_open()
@property
def session_id(self) -> str | None:
"""Get the current session ID, if any."""
if self._state.current_session:
return self._state.current_session.session_id
return None
def __enter__(self) -> "ACPExecClient":
"""Context manager entry."""
return self

View File

@@ -50,6 +50,7 @@ from pathlib import Path
from uuid import UUID
from uuid import uuid4
from acp.schema import PromptResponse
from kubernetes import client # type: ignore
from kubernetes import config
from kubernetes.client.rest import ApiException # type: ignore
@@ -97,6 +98,10 @@ from onyx.utils.logger import setup_logger
logger = setup_logger()
# API server pod hostname — used to identify which replica is handling a request.
# In K8s, HOSTNAME is set to the pod name (e.g., "api-server-dpgg7").
_API_SERVER_HOSTNAME = os.environ.get("HOSTNAME", "unknown")
# Constants for pod configuration
# Note: Next.js ports are dynamically allocated from SANDBOX_NEXTJS_PORT_START to
# SANDBOX_NEXTJS_PORT_END range, with one port per session.
@@ -1156,7 +1161,9 @@ done
def terminate(self, sandbox_id: UUID) -> None:
"""Terminate a sandbox and clean up Kubernetes resources.
Deletes the Service and Pod for the sandbox.
Removes session mappings for this sandbox, then deletes the
Service and Pod. ACP clients are ephemeral (created per message),
so there's nothing to stop here.
Args:
sandbox_id: The sandbox ID to terminate
@@ -1395,7 +1402,8 @@ echo "Session workspace setup complete"
) -> None:
"""Clean up a session workspace (on session delete).
Executes kubectl exec to remove the session directory.
Removes the ACP session mapping and executes kubectl exec to remove
the session directory. The shared ACP client persists for other sessions.
Args:
sandbox_id: The sandbox ID
@@ -1464,6 +1472,7 @@ echo "Session cleanup complete"
the snapshot and upload to S3. Captures:
- sessions/$session_id/outputs/ (generated artifacts, web apps)
- sessions/$session_id/attachments/ (user uploaded files)
- sessions/$session_id/.opencode-data/ (opencode session data for resumption)
Args:
sandbox_id: The sandbox ID
@@ -1488,9 +1497,10 @@ echo "Session cleanup complete"
f"{session_id_str}/{snapshot_id}.tar.gz"
)
# Exec into pod to create and upload snapshot (outputs + attachments)
# Uses s5cmd pipe to stream tar.gz directly to S3
# Only snapshot if outputs/ exists. Include attachments/ only if non-empty.
# Create tar and upload to S3 via file-sync container.
# .opencode-data/ is already on the shared workspace volume because we set
# XDG_DATA_HOME to the session directory when starting opencode (see
# ACPExecClient.start()). No cross-container copy needed.
exec_command = [
"/bin/sh",
"-c",
@@ -1503,6 +1513,7 @@ if [ ! -d outputs ]; then
fi
dirs="outputs"
[ -d attachments ] && [ "$(ls -A attachments 2>/dev/null)" ] && dirs="$dirs attachments"
[ -d .opencode-data ] && [ "$(ls -A .opencode-data 2>/dev/null)" ] && dirs="$dirs .opencode-data"
tar -czf - $dirs | /s5cmd pipe {s3_path}
echo "SNAPSHOT_CREATED"
""",
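The reason `.opencode-data/` needs no cross-container copy is the XDG base-directory convention: an app writes under `$XDG_DATA_HOME/<app>/`, so pointing the variable into the session workspace relocates opencode's storage into the snapshotted tree. A minimal sketch of that path mapping, assuming opencode follows the convention and that `XDG_DATA_HOME` is set to `<session>/.opencode-data` (the exact value used by `ACPExecClient.start()` is not shown in this diff):

```python
# Hedged sketch: where opencode's session data lands once XDG_DATA_HOME
# points into the session workspace. The ".opencode-data" value and the
# "opencode" subdirectory are assumptions based on the XDG convention.
def opencode_data_dir(session_path: str) -> str:
    """Storage dir opencode would use under the session workspace."""
    # With XDG_DATA_HOME=<session>/.opencode-data, opencode writes under
    # $XDG_DATA_HOME/opencode/... instead of ~/.local/share/opencode/...
    return f"{session_path}/.opencode-data/opencode"

print(opencode_data_dir("/workspace/sessions/abc123"))
# -> /workspace/sessions/abc123/.opencode-data/opencode
```

Because that directory sits inside `sessions/$session_id/`, the snapshot tar picks it up with the same `[ -d ... ]` guard used for `attachments/`.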
@@ -1624,6 +1635,7 @@ echo "SNAPSHOT_CREATED"
Steps:
1. Exec s5cmd cat in file-sync container to stream snapshot from S3
2. Pipe directly to tar for extraction in the shared workspace volume
(.opencode-data/ is restored automatically since XDG_DATA_HOME points here)
3. Regenerate configuration files (AGENTS.md, opencode.json, files symlink)
4. Start the NextJS dev server
@@ -1807,6 +1819,41 @@ echo "Session config regeneration complete"
)
return exec_client.health_check(timeout=timeout)
def _create_ephemeral_acp_client(
self, sandbox_id: UUID, session_path: str
) -> ACPExecClient:
"""Create a new ephemeral ACP client for a single message exchange.
Each call starts a fresh `opencode acp` process in the sandbox pod.
The process is short-lived — stopped after the message completes.
This prevents the bug where multiple long-lived processes (one per
API replica) operate on the same session's flat file storage
concurrently, causing the JSON-RPC response to be silently lost.
Args:
sandbox_id: The sandbox ID
session_path: Working directory for the session (e.g. /workspace/sessions/{id}).
XDG_DATA_HOME is set relative to this so opencode's session data
lives inside the snapshot directory.
Returns:
A running ACPExecClient (caller must stop it when done)
"""
pod_name = self._get_pod_name(str(sandbox_id))
acp_client = ACPExecClient(
pod_name=pod_name,
namespace=self._namespace,
container="sandbox",
)
acp_client.start(cwd=session_path)
logger.info(
f"[SANDBOX-ACP] Created ephemeral ACP client: "
f"sandbox={sandbox_id} pod={pod_name} "
f"api_pod={_API_SERVER_HOSTNAME}"
)
return acp_client
def send_message(
self,
sandbox_id: UUID,
@@ -1815,8 +1862,12 @@ echo "Session config regeneration complete"
) -> Generator[ACPEvent, None, None]:
"""Send a message to the CLI agent and stream ACP events.
Runs `opencode acp` via kubectl exec in the sandbox pod.
The agent runs in the session-specific workspace.
Creates an ephemeral `opencode acp` process for each message.
The process resumes the session from opencode's on-disk storage,
handles the prompt, then is stopped. This ensures only one process
operates on a session's flat files at a time, preventing the bug
where multiple long-lived processes (one per API replica) corrupt
each other's in-memory state.
Args:
sandbox_id: The sandbox ID
@@ -1827,67 +1878,103 @@ echo "Session config regeneration complete"
Typed ACP schema event objects
"""
packet_logger = get_packet_logger()
pod_name = self._get_pod_name(str(sandbox_id))
session_path = f"/workspace/sessions/{session_id}"
# Log ACP client creation
packet_logger.log_acp_client_start(
sandbox_id, session_id, session_path, context="k8s"
)
# Create an ephemeral ACP client for this message
acp_client = self._create_ephemeral_acp_client(sandbox_id, session_path)
exec_client = ACPExecClient(
pod_name=pod_name,
namespace=self._namespace,
container="sandbox",
)
# Log the send_message call at sandbox manager level
packet_logger.log_session_start(session_id, sandbox_id, message)
events_count = 0
try:
exec_client.start(cwd=session_path)
for event in exec_client.send_message(message):
events_count += 1
yield event
# Resume (or create) the ACP session from opencode's on-disk storage
acp_session_id = acp_client.resume_or_create_session(cwd=session_path)
# Log successful completion
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
logger.info(
f"[SANDBOX-ACP] Sending message: "
f"session={session_id} acp_session={acp_session_id} "
f"api_pod={_API_SERVER_HOSTNAME}"
)
except GeneratorExit:
# Generator was closed by consumer (client disconnect, timeout, broken pipe)
# This is the most common failure mode for SSE streaming
packet_logger.log_session_end(
session_id,
success=False,
error="GeneratorExit: Client disconnected or stream closed by consumer",
events_count=events_count,
)
raise
except Exception as e:
# Log failure from normal exceptions
packet_logger.log_session_end(
session_id,
success=False,
error=f"Exception: {str(e)}",
events_count=events_count,
)
raise
except BaseException as e:
# Log failure from other base exceptions (SystemExit, KeyboardInterrupt, etc.)
exception_type = type(e).__name__
packet_logger.log_session_end(
session_id,
success=False,
error=f"{exception_type}: {str(e) if str(e) else 'System-level interruption'}",
events_count=events_count,
)
raise
# Log the send_message call at sandbox manager level
packet_logger.log_session_start(session_id, sandbox_id, message)
events_count = 0
got_prompt_response = False
try:
for event in acp_client.send_message(
message, session_id=acp_session_id
):
events_count += 1
if isinstance(event, PromptResponse):
got_prompt_response = True
yield event
logger.info(
f"[SANDBOX-ACP] send_message completed: "
f"session={session_id} events={events_count} "
f"got_prompt_response={got_prompt_response}"
)
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
)
except GeneratorExit:
logger.warning(
f"[SANDBOX-ACP] GeneratorExit: session={session_id} "
f"events={events_count}, sending session/cancel"
)
try:
acp_client.cancel(session_id=acp_session_id)
except Exception as cancel_err:
logger.warning(
f"[SANDBOX-ACP] session/cancel failed on GeneratorExit: "
f"{cancel_err}"
)
packet_logger.log_session_end(
session_id,
success=False,
error="GeneratorExit: Client disconnected or stream closed by consumer",
events_count=events_count,
)
raise
except Exception as e:
logger.error(
f"[SANDBOX-ACP] Exception: session={session_id} "
f"events={events_count} error={e}, sending session/cancel"
)
try:
acp_client.cancel(session_id=acp_session_id)
except Exception as cancel_err:
logger.warning(
f"[SANDBOX-ACP] session/cancel failed on Exception: "
f"{cancel_err}"
)
packet_logger.log_session_end(
session_id,
success=False,
error=f"Exception: {str(e)}",
events_count=events_count,
)
raise
except BaseException as e:
logger.error(
f"[SANDBOX-ACP] {type(e).__name__}: session={session_id} "
f"error={e}"
)
packet_logger.log_session_end(
session_id,
success=False,
error=f"{type(e).__name__}: {str(e) if str(e) else 'System-level interruption'}",
events_count=events_count,
)
raise
finally:
exec_client.stop()
# Log client stop
packet_logger.log_acp_client_stop(sandbox_id, session_id, context="k8s")
# Always stop the ephemeral ACP client to kill the opencode process.
# This ensures no stale processes linger in the sandbox container.
try:
acp_client.stop()
except Exception as e:
logger.warning(
f"[SANDBOX-ACP] Failed to stop ephemeral ACP client: "
f"session={session_id} error={e}"
)
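From the caller's side, the `GeneratorExit` path above is reached by closing the generator early, e.g. when the SSE consumer disconnects. A hedged usage sketch (the `relay_events` helper and the exact keyword arguments of `send_message` are assumptions; only the generator-close behavior is taken from the code above):

```python
# Hedged sketch: consuming send_message as a stream and guaranteeing
# cleanup. Closing the generator raises GeneratorExit inside
# send_message, which sends session/cancel and stops the ephemeral
# ACP client in its finally block.
def relay_events(manager, sandbox_id, session_id, message):
    stream = manager.send_message(sandbox_id, session_id=session_id, message=message)
    try:
        for event in stream:
            yield event  # forward each typed ACP event to the response
    finally:
        # Explicit close is a no-op after normal exhaustion, but on an
        # early exit it triggers the GeneratorExit handling above.
        stream.close()
```

This keeps the "one opencode process per session at a time" invariant even when the HTTP layer abandons the stream mid-message.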
def list_directory(
self, sandbox_id: UUID, session_id: UUID, path: str