Compare commits


17 Commits

Author SHA1 Message Date
rohoswagger
2690918386 fix(craft): use ephemeral ACP clients to prevent multi-replica session corruption
Root cause: each API server replica created a long-lived `opencode acp`
process per sandbox via k8s exec. With multiple replicas, multiple
processes ran inside the same container operating on the same session's
flat file storage (~/.local/share/opencode/storage/). This caused the
second process's in-memory state to become stale after the first process
modified the session files, resulting in the JSON-RPC completion response
being silently lost — hanging follow-up messages.

Fix: make ACP clients ephemeral — create a fresh `opencode acp` process
for each message, stop it in a finally block when done. Each message now
gets a clean process that reads session state fresh from disk, with no
stale in-memory state from a previous replica's modifications.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 16:50:37 -08:00
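The ephemeral-client pattern this commit describes can be sketched as follows. This is a minimal stand-alone model, not the craft codebase's actual API: `FakeACPClient` and `send_with_ephemeral_client` are illustrative stand-ins for the exec client and sandbox manager.

```python
class FakeACPClient:
    """Stand-in for an `opencode acp` exec client (illustrative only)."""

    started = 0
    stopped = 0

    def start(self):
        # A real client would spawn `opencode acp` via k8s exec here,
        # reading session state fresh from disk.
        FakeACPClient.started += 1

    def stop(self):
        FakeACPClient.stopped += 1

    def send(self, message):
        return f"echo:{message}"


def send_with_ephemeral_client(message):
    """One fresh process per message: no stale in-memory session state
    can survive from a previous replica's modifications."""
    client = FakeACPClient()
    client.start()
    try:
        return client.send(message)
    finally:
        # Always stop, even if send() raises mid-stream.
        client.stop()
```

The `finally` block is the load-bearing part: every message's process is torn down regardless of how the stream ends, so no replica ever holds a long-lived process with stale state.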
rohoswagger
d8bc7ae1dc fix(craft): add comprehensive reader thread diagnostics for ACP hang debugging
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 16:05:41 -08:00
rohoswagger
2d5b829e63 fix(craft): accept either completion signal + add ACP debug logging
ACP behavior is inconsistent — some prompts get both a prompt_response
notification AND a JSON-RPC response, others get only the notification.
Accept whichever arrives first. Add per-message dequeue logging to
understand the exact packet sequence ACP sends for each prompt.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:45:31 -08:00
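The accept-whichever-arrives-first logic can be sketched as a small dequeue loop. This is a simplified model under assumptions: `wait_for_completion` is a hypothetical helper, and the message shapes follow the JSON-RPC/ACP packet fields visible elsewhere on this page.

```python
import queue


def wait_for_completion(q, request_id, timeout=1.0):
    """Block until either turn-completion signal arrives; first one wins."""
    while True:
        msg = q.get(timeout=timeout)
        # Signal 1: the JSON-RPC response to our session/prompt request
        # (a response carries an id but no "method" field).
        if "method" not in msg and msg.get("id") == request_id:
            return "jsonrpc_response", msg
        # Signal 2: a prompt_response delivered as a session/update
        # notification, which some prompts send instead of (or before)
        # the JSON-RPC response.
        update = msg.get("params", {}).get("update", {})
        if update.get("sessionUpdate") == "prompt_response":
            return "notification", msg
        # Anything else (content chunks, tool calls) would be streamed
        # to the caller here.
```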
rohoswagger
495e2f7c52 fix(craft): use JSON-RPC response as sole turn completion signal
Stop breaking out of the send_message loop on `prompt_response`
session/update notifications. Per the ACP spec (and Zed's impl), turn
completion is determined by the JSON-RPC response to session/prompt —
not by any notification. Breaking early on the notification left the
actual JSON-RPC response orphaned in the queue, which caused stale
message buildup and premature termination of follow-up prompts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:36:00 -08:00
rohoswagger
189cc5bc3c fix(craft): drain stale queue messages + usage_update end-of-turn guard
Drain leftover messages from the response queue before sending a new
prompt to prevent stale usage_update packets from prematurely terminating
follow-up messages. Additionally, only treat usage_update as an
end-of-turn signal when real content events have already been received.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:25:27 -08:00
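The pre-prompt drain step amounts to emptying the response queue non-blockingly before each new prompt. A minimal sketch (`drain_stale_messages` is an illustrative name, not the actual helper):

```python
import queue


def drain_stale_messages(q):
    """Discard packets left over from a previous turn (e.g. stray
    usage_update events) before sending the next prompt."""
    drained = 0
    while True:
        try:
            q.get_nowait()
            drained += 1
        except queue.Empty:
            return drained
```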
rohoswagger
23ec38662e chore: update .gitignore to remove unnecessary entries
Remove the .claude and .worktrees entries from .gitignore; they no longer need to be ignored, keeping the ignore list minimal.
2026-02-16 14:50:05 -08:00
rohoswagger
57d741c5b3 chore(craft): clean up diagnostic logging in ACP exec client
Remove verbose debug logging added during prompt #2 freeze investigation.
Keep operational logs (prompt start/complete, errors, connection health).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 14:48:01 -08:00
rohoswagger
021af74739 fix(craft): remove queue drain + treat usage_update as end-of-turn signal
- Remove pre-prompt queue drain that discarded messages between prompts.
  Stale messages now flow through the main loop naturally.
- Treat usage_update as implicit end-of-turn by synthesizing a
  PromptResponse, fixing prompt #2+ hangs where opencode doesn't send
  a JSON-RPC PromptResponse.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 14:36:35 -08:00
rohoswagger
adaca6a353 fix(craft): add diagnostic logging for prompt #2 freeze debugging
- Log sessionUpdate type for prompt #2+ events to identify what's flowing
- Respond to stale agent JSON-RPC requests during queue drain (potential hang cause)
- Log every keepalive instead of every 3rd for better observability

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 13:14:10 -08:00
rohoswagger
4cd07d7bbc fix(sandbox): send session/cancel when prompt is abandoned or times out
When a user sends a follow-up message, the SSE stream consumer disconnects,
triggering GeneratorExit on the in-flight prompt generator. Previously,
opencode continued processing the old prompt because session/cancel was
never sent, blocking subsequent prompts on the same process.

Now sends session/cancel in three scenarios:
- GeneratorExit (user sends new message / disconnects)
- Exception during prompt streaming
- Timeout (ACP_MESSAGE_TIMEOUT exceeded)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 12:30:07 -08:00
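The three-scenario cancellation described above hinges on catching `GeneratorExit` (raised into a generator when its consumer disconnects) alongside ordinary exceptions. A minimal sketch of the pattern, with `stream_prompt` and `send_cancel` as illustrative stand-ins:

```python
def stream_prompt(events, send_cancel):
    """Stream prompt events to the SSE consumer; notify the agent with
    session/cancel if the stream is abandoned or fails."""
    try:
        for ev in events:
            yield ev
    except GeneratorExit:
        # Consumer disconnected: the user sent a follow-up message or
        # closed the stream. Without cancel, the old prompt keeps running.
        send_cancel()
        raise
    except Exception:
        # Error during streaming: also cancel before propagating.
        send_cancel()
        raise
```

Calling `.close()` on the generator (which is what happens when the SSE consumer goes away) triggers the `GeneratorExit` branch; a fully consumed stream never cancels.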
rohoswagger
bf3c98142d refactor(craft): Zed-style ACP session management — one process per sandbox
Instead of creating a fresh `opencode acp` process per prompt (or caching
one per session), use one long-lived process per sandbox with multiple ACP
sessions. This mirrors how Zed editor manages ACP connections.

Key changes:
- ACPExecClient: start() only does initialize (no session creation),
  new public create_session/resume_session/get_or_create_session methods,
  send_message accepts explicit session_id parameter, tracks multiple
  sessions in a dict
- KubernetesSandboxManager: _acp_clients keyed by sandbox_id (not
  session), _acp_session_ids maps (sandbox_id, session_id) → ACP session
  ID, switching sessions is implicit via sessionId in each prompt
- Tests rewritten for new architecture (8 passing)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 12:04:56 -08:00
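The keying scheme in this commit (clients per sandbox, ACP session IDs per sandbox/session pair) can be sketched as a small registry. `SandboxACPRegistry` is a hypothetical illustration of the two mappings described, not the actual `KubernetesSandboxManager`:

```python
class SandboxACPRegistry:
    """One long-lived client per sandbox; many ACP sessions multiplexed
    over it (illustrative model of the caches described above)."""

    def __init__(self, client_factory):
        self._client_factory = client_factory
        self._clients = {}          # sandbox_id -> client
        self._acp_session_ids = {}  # (sandbox_id, session_id) -> ACP session ID

    def client_for(self, sandbox_id):
        # Cache hit returns the existing long-lived client.
        if sandbox_id not in self._clients:
            self._clients[sandbox_id] = self._client_factory(sandbox_id)
        return self._clients[sandbox_id]

    def acp_session(self, sandbox_id, session_id, create):
        # Switching sessions is implicit: each prompt just carries the
        # ACP session ID looked up (or created) here.
        key = (sandbox_id, session_id)
        if key not in self._acp_session_ids:
            self._acp_session_ids[key] = create()
        return self._acp_session_ids[key]
```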
rohoswagger
f023599618 fix(craft): add diagnostic logging for hanging prompt debug + silence usage_update
Adds targeted logging to identify why Prompt #2 hangs after usage_update:
- Reader thread: logs buffer state when unterminated data detected
- Reader thread: periodic idle heartbeat every ~5s
- send_message: logs wait state every 3rd keepalive
- Silences usage_update (token stats) in _process_session_update

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 10:49:55 -08:00
rohoswagger
c55cb899f7 fix(craft): handle unterminated ACP messages + clean up diagnostic logging
The reader thread only processed JSON messages terminated with \n. If the
agent's final response lacked a trailing newline, it sat in the buffer
forever causing send_message to hang with keepalives. Added stale buffer
detection that parses unterminated content after ~0.3s of no new data.

Also cleaned up verbose diagnostic logging ([ACP-LIFECYCLE], [ACP-READER],
[ACP-SEND] prefixes) added during debugging — moved per-message noise to
debug level, kept important lifecycle events at info.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 21:46:00 -08:00
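The stale-buffer detection can be modeled as a pure function over the reader's accumulated buffer. A sketch under assumptions (`try_parse_stale_buffer` is an illustrative helper; the real reader counts ~0.1s read cycles, so three cycles approximates the ~0.3s threshold):

```python
import json


def try_parse_stale_buffer(buffer, stale_cycles, threshold=3):
    """After `threshold` consecutive read cycles with no new data, try to
    parse an unterminated (no trailing newline) JSON message.

    Returns (message, remaining_buffer): the parsed dict and an empty
    buffer on success, or (None, buffer) if parsing is not yet warranted
    or the content is still an incomplete JSON fragment."""
    if stale_cycles < threshold or not buffer.strip():
        return None, buffer
    try:
        msg = json.loads(buffer.strip())
        return msg, ""  # parsed: clear the buffer
    except json.JSONDecodeError:
        return None, buffer  # still incomplete: keep accumulating
```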
rohoswagger
9b8a6e60b7 fix(craft): resume ACP sessions across API replicas for follow-up messages
Multiple API server replicas each maintain independent in-memory ACP client
caches. When a follow-up message is routed to a different replica, it creates
a new opencode session with no conversation context.

Fix: After initializing a new opencode ACP process, try session/list (filtered
by workspace cwd) to discover existing sessions from previous processes, then
session/resume to restore conversation context. Falls back to session/new if
the agent doesn't support these methods or no sessions exist.

Also adds api_pod hostname to SANDBOX-ACP log lines for replica identification.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 17:20:43 -08:00
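The resume-then-fall-back chain can be sketched with the ACP calls abstracted as callables. `get_or_resume_session` and its parameters are stand-ins for the session/list, session/resume, and session/new requests described above:

```python
def get_or_resume_session(list_sessions, resume, create_new, cwd):
    """Resume the most recent prior session for this workspace if the
    agent supports it; otherwise fall back to creating a new session."""
    try:
        sessions = list_sessions(cwd)  # session/list, filtered by cwd
    except Exception:
        sessions = []  # agent does not support session/list
    for info in sessions:
        session_id = info.get("sessionId")
        if not session_id:
            continue
        try:
            return resume(session_id, cwd)  # session/resume
        except Exception:
            break  # resume failed: fall back to session/new
    return create_new(cwd)  # session/new
```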
rohoswagger
dd9d201b51 fix(craft): add extensive diagnostic logging for ACP follow-up messages
- Add [ACP-LIFECYCLE] logs for client start/stop with session IDs
- Add [ACP-READER] logs for every message read from WebSocket with
  update_type, queue size, and ACP session ID
- Add [ACP-SEND] logs for every dequeued message with prompt number,
  completion reason tracking, and queue state
- Add [SANDBOX-ACP] logs for cache hit/miss decisions and PromptResponse
  tracking in the sandbox manager
- Add stderr reading in reader thread to catch opencode errors
- Add queue drain at start of each send_message() to clear stale messages
- Track prompt_count per client to identify 1st vs 2nd+ prompts
- Log completion_reason (jsonrpc_response, notification, timeout, etc.)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 16:49:57 -08:00
rohoswagger
c545819aa6 fix(craft): harden ACP response matching for cached sessions
The cached ACPExecClient could emit keepalives forever on follow-up
messages because the PromptResponse was never matched. This adds:

- ID type-mismatch tolerance (str fallback for int/str id comparison)
- Guard against agent request ID collision (require no "method" field)
- PromptResponse via session/update notification handler
- Reader thread health check (detect dead WebSocket, stop keepalives)
- Buffer flush on reader thread exit (catch trailing PromptResponse)
- Diagnostic logging for every dequeued message and dropped messages

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 15:45:39 -08:00
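The ID type-mismatch tolerance and agent-request guard mirror the `is_response` check visible in the diff on this page. A self-contained sketch (`is_prompt_response` is an illustrative name):

```python
def is_prompt_response(message, request_id):
    """Match a JSON-RPC response to our request, tolerating str/int id
    mismatch and excluding agent-initiated requests that reuse ids."""
    # Agent requests also carry an "id"; a real response never has "method".
    if "method" in message:
        return False
    msg_id = message.get("id")
    return msg_id == request_id or (
        msg_id is not None and str(msg_id) == str(request_id)
    )
```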
rohoswagger
960ee228bf fix(craft): cache ACPExecClient in K8s sandbox to fix follow-up messages 2026-02-15 11:14:10 -08:00
7 changed files with 1087 additions and 297 deletions


@@ -4,8 +4,9 @@ This client runs `opencode acp` directly in the sandbox pod via kubernetes exec,
using stdin/stdout for JSON-RPC communication. This bypasses the HTTP server
and uses the native ACP subprocess protocol.
-This module includes comprehensive logging for debugging ACP communication.
-Enable logging by setting LOG_LEVEL=DEBUG or BUILD_PACKET_LOGGING=true.
+When multiple API server replicas share the same sandbox pod, this client
+uses ACP session resumption (session/list + session/resume) to maintain
+conversation context across replicas.
Usage:
client = ACPExecClient(
@@ -100,7 +101,7 @@ class ACPClientState:
"""Internal state for the ACP client."""
initialized: bool = False
-    current_session: ACPSession | None = None
+    sessions: dict[str, ACPSession] = field(default_factory=dict)
next_request_id: int = 0
agent_capabilities: dict[str, Any] = field(default_factory=dict)
agent_info: dict[str, Any] = field(default_factory=dict)
@@ -155,16 +156,16 @@ class ACPExecClient:
self._k8s_client = client.CoreV1Api()
return self._k8s_client
-    def start(self, cwd: str = "/workspace", timeout: float = 30.0) -> str:
-        """Start the agent process via exec and initialize a session.
+    def start(self, cwd: str = "/workspace", timeout: float = 30.0) -> None:
+        """Start the agent process via exec and initialize the ACP connection.
+
+        Only performs the ACP `initialize` handshake. Sessions are created
+        separately via `create_session()` or `resume_session()`.

         Args:
-            cwd: Working directory for the agent
+            cwd: Working directory for the `opencode acp` process
             timeout: Timeout for initialization

-        Returns:
-            The session ID
-
         Raises:
             RuntimeError: If startup fails
         """
@@ -176,6 +177,8 @@ class ACPExecClient:
# Start opencode acp via exec
exec_command = ["opencode", "acp", "--cwd", cwd]
logger.info(f"[ACP] Starting client: pod={self._pod_name} cwd={cwd}")
try:
self._ws_client = k8s_stream(
k8s.connect_get_namespaced_pod_exec,
@@ -201,15 +204,13 @@ class ACPExecClient:
# Give process a moment to start
time.sleep(0.5)
-            # Initialize ACP connection
+            # Initialize ACP connection (no session creation)
             self._initialize(timeout=timeout)

-            # Create session
-            session_id = self._create_session(cwd=cwd, timeout=timeout)
-
-            return session_id
+            logger.info(f"[ACP] Client started: pod={self._pod_name}")
         except Exception as e:
+            logger.error(f"[ACP] Client start failed: pod={self._pod_name} error={e}")
self.stop()
raise RuntimeError(f"Failed to start ACP exec client: {e}") from e
@@ -217,63 +218,190 @@ class ACPExecClient:
"""Background thread to read responses from the exec stream."""
buffer = ""
packet_logger = get_packet_logger()
messages_enqueued = 0
buffer_stale_cycles = 0
empty_reads = 0
while not self._stop_reader.is_set():
if self._ws_client is None:
break
logger.info(f"[ACP] Reader thread started: pod={self._pod_name}")
try:
if self._ws_client.is_open():
# Read available data
self._ws_client.update(timeout=0.1)
# Read stdout (channel 1)
data = self._ws_client.read_stdout(timeout=0.1)
if data:
buffer += data
# Process complete lines
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if line:
try:
message = json.loads(line)
# Log the raw incoming message
packet_logger.log_jsonrpc_raw_message(
"IN", message, context="k8s"
)
self._response_queue.put(message)
except json.JSONDecodeError:
packet_logger.log_raw(
"JSONRPC-PARSE-ERROR-K8S",
{
"raw_line": line[:500],
"error": "JSON decode failed",
},
)
logger.warning(
f"Invalid JSON from agent: {line[:100]}"
)
else:
packet_logger.log_raw(
"K8S-WEBSOCKET-CLOSED",
{"pod": self._pod_name, "namespace": self._namespace},
try:
while not self._stop_reader.is_set():
if self._ws_client is None:
logger.warning(
f"[ACP] Reader: ws_client is None, exiting: "
f"pod={self._pod_name}"
)
break
except Exception as e:
if not self._stop_reader.is_set():
packet_logger.log_raw(
"K8S-READER-ERROR",
{"error": str(e), "pod": self._pod_name},
try:
if self._ws_client.is_open():
self._ws_client.update(timeout=0.1)
# Read stderr
stderr_data = self._ws_client.read_stderr(timeout=0.01)
if stderr_data:
logger.warning(
f"[ACP] stderr pod={self._pod_name}: "
f"{stderr_data.strip()[:500]}"
)
# Read stdout
data = self._ws_client.read_stdout(timeout=0.1)
if data:
buffer += data
buffer_stale_cycles = 0
empty_reads = 0
# Log raw data chunks for debugging
newline_count = data.count("\n")
logger.info(
f"[ACP] Reader: received {len(data)} bytes, "
f"{newline_count} newlines, "
f"buffer_total={len(buffer)}b "
f"pod={self._pod_name}"
)
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if line:
try:
message = json.loads(line)
messages_enqueued += 1
# Identify the message for logging
msg_id = message.get("id")
method = message.get("method")
update_type = (
message.get("params", {})
.get("update", {})
.get("sessionUpdate", "")
)
has_result = "result" in message
has_error = "error" in message
logger.info(
f"[ACP] Reader enqueue #{messages_enqueued}: "
f"id={msg_id} method={method} "
f"update={update_type} "
f"has_result={has_result} "
f"has_error={has_error} "
f"queue_size={self._response_queue.qsize()} "
f"pod={self._pod_name}"
)
packet_logger.log_jsonrpc_raw_message(
"IN", message, context="k8s"
)
self._response_queue.put(message)
except json.JSONDecodeError:
logger.warning(
f"[ACP] Invalid JSON from agent: "
f"{line[:200]}"
)
# Log if there's partial data left in the buffer
if buffer.strip():
logger.info(
f"[ACP] Reader: partial buffer after parse: "
f"{len(buffer)}b "
f"preview={buffer.strip()[:200]} "
f"pod={self._pod_name}"
)
else:
empty_reads += 1
if buffer.strip():
buffer_stale_cycles += 1
if buffer_stale_cycles == 1:
logger.info(
f"[ACP] Reader: unterminated buffer: "
f"{len(buffer)}b "
f"preview={buffer.strip()[:200]} "
f"pod={self._pod_name}"
)
if buffer_stale_cycles >= 3:
try:
message = json.loads(buffer.strip())
messages_enqueued += 1
logger.info(
f"[ACP] Reader: parsed unterminated "
f"buffer as message "
f"#{messages_enqueued}: "
f"id={message.get('id')} "
f"method={message.get('method')} "
f"pod={self._pod_name}"
)
packet_logger.log_jsonrpc_raw_message(
"IN",
message,
context="k8s-unterminated",
)
self._response_queue.put(message)
buffer = ""
buffer_stale_cycles = 0
except json.JSONDecodeError:
pass
# Log every ~3s of silence
if empty_reads > 0 and empty_reads % 30 == 0:
logger.info(
f"[ACP] Reader idle: "
f"no data for {empty_reads} cycles "
f"(~{empty_reads * 0.1:.0f}s) "
f"ws_open={self._ws_client.is_open()} "
f"buffer={len(buffer)}b "
f"enqueued={messages_enqueued} "
f"queue_size={self._response_queue.qsize()} "
f"pod={self._pod_name}"
)
else:
logger.warning(
f"[ACP] WebSocket closed: pod={self._pod_name} "
f"enqueued={messages_enqueued}"
)
break
except Exception as e:
if not self._stop_reader.is_set():
logger.warning(
f"[ACP] Reader error: {e}, "
f"enqueued={messages_enqueued} "
f"pod={self._pod_name}"
)
break
finally:
remaining = buffer.strip()
if remaining:
logger.info(
f"[ACP] Reader: flushing buffer on exit: "
f"{len(remaining)}b preview={remaining[:200]} "
f"pod={self._pod_name}"
)
                try:
                    message = json.loads(remaining)
                    packet_logger.log_jsonrpc_raw_message(
                        "IN", message, context="k8s-flush"
                    )
                    self._response_queue.put(message)
                except json.JSONDecodeError:
                    logger.warning(
                        f"[ACP] Buffer flush failed (not JSON): {remaining[:200]}"
                    )
logger.info(
f"[ACP] Reader thread exiting: pod={self._pod_name} "
f"enqueued={messages_enqueued} "
f"queue_size={self._response_queue.qsize()}"
)
def stop(self) -> None:
"""Stop the exec session and clean up."""
session_ids = list(self._state.sessions.keys())
logger.info(
f"[ACP] Stopping client: pod={self._pod_name} " f"sessions={session_ids}"
)
self._stop_reader.set()
if self._ws_client is not None:
@@ -400,42 +528,198 @@ class ACPExecClient:
if not session_id:
raise RuntimeError("No session ID returned from session/new")
-        self._state.current_session = ACPSession(session_id=session_id, cwd=cwd)
+        self._state.sessions[session_id] = ACPSession(session_id=session_id, cwd=cwd)
logger.info(f"[ACP] Created session: acp_session={session_id} cwd={cwd}")
return session_id
def _list_sessions(self, cwd: str, timeout: float = 10.0) -> list[dict[str, Any]]:
"""List available ACP sessions, filtered by working directory.
Returns:
List of session info dicts with keys like 'sessionId', 'cwd', 'title'.
Empty list if session/list is not supported or fails.
"""
try:
request_id = self._send_request("session/list", {"cwd": cwd})
result = self._wait_for_response(request_id, timeout)
sessions = result.get("sessions", [])
logger.info(f"[ACP] session/list: {len(sessions)} sessions for cwd={cwd}")
return sessions
except Exception as e:
logger.info(f"[ACP] session/list unavailable: {e}")
return []
def _resume_session(self, session_id: str, cwd: str, timeout: float = 30.0) -> str:
"""Resume an existing ACP session.
Args:
session_id: The ACP session ID to resume
cwd: Working directory for the session
timeout: Timeout for the resume request
Returns:
The session ID
Raises:
RuntimeError: If resume fails
"""
params = {
"sessionId": session_id,
"cwd": cwd,
"mcpServers": [],
}
request_id = self._send_request("session/resume", params)
result = self._wait_for_response(request_id, timeout)
# The response should contain the session ID
resumed_id = result.get("sessionId", session_id)
self._state.sessions[resumed_id] = ACPSession(session_id=resumed_id, cwd=cwd)
logger.info(f"[ACP] Resumed session: acp_session={resumed_id} cwd={cwd}")
return resumed_id
def _try_resume_existing_session(self, cwd: str, timeout: float) -> str | None:
"""Try to find and resume an existing session for this workspace.
When multiple API server replicas connect to the same sandbox pod,
a previous replica may have already created an ACP session for this
workspace. This method discovers and resumes that session so the
agent retains conversation context.
Args:
cwd: Working directory to search for sessions
timeout: Timeout for ACP requests
Returns:
The resumed session ID, or None if no session could be resumed
"""
# Check if the agent supports session/list + session/resume
session_caps = self._state.agent_capabilities.get("sessionCapabilities", {})
supports_list = session_caps.get("list") is not None
supports_resume = session_caps.get("resume") is not None
if not supports_list or not supports_resume:
logger.debug("[ACP] Agent does not support session resume")
return None
# List sessions for this workspace directory
sessions = self._list_sessions(cwd, timeout=min(timeout, 10.0))
if not sessions:
return None
# Pick the most recent session (first in list, assuming sorted)
target = sessions[0]
target_id = target.get("sessionId")
if not target_id:
logger.warning("[ACP] session/list returned session without sessionId")
return None
logger.info(
f"[ACP] Resuming existing session: acp_session={target_id} "
f"(found {len(sessions)})"
)
try:
return self._resume_session(target_id, cwd, timeout)
except Exception as e:
logger.warning(
f"[ACP] session/resume failed for {target_id}: {e}, "
f"falling back to session/new"
)
return None
def create_session(self, cwd: str, timeout: float = 30.0) -> str:
"""Create a new ACP session on this connection.
Args:
cwd: Working directory for the session
timeout: Timeout for the request
Returns:
The ACP session ID
"""
if not self._state.initialized:
raise RuntimeError("Client not initialized. Call start() first.")
return self._create_session(cwd=cwd, timeout=timeout)
def resume_session(self, session_id: str, cwd: str, timeout: float = 30.0) -> str:
"""Resume an existing ACP session on this connection.
Args:
session_id: The ACP session ID to resume
cwd: Working directory for the session
timeout: Timeout for the request
Returns:
The ACP session ID
"""
if not self._state.initialized:
raise RuntimeError("Client not initialized. Call start() first.")
return self._resume_session(session_id=session_id, cwd=cwd, timeout=timeout)
def get_or_create_session(self, cwd: str, timeout: float = 30.0) -> str:
"""Get an existing session for this cwd, or create/resume one.
Tries in order:
1. Return an already-tracked session for this cwd
2. Resume an existing session from opencode's storage (multi-replica)
3. Create a new session
Args:
cwd: Working directory for the session
timeout: Timeout for ACP requests
Returns:
The ACP session ID
"""
if not self._state.initialized:
raise RuntimeError("Client not initialized. Call start() first.")
# Check if we already have a session for this cwd
for sid, session in self._state.sessions.items():
if session.cwd == cwd:
logger.info(
f"[ACP] Reusing existing session: " f"acp_session={sid} cwd={cwd}"
)
return sid
# Try to resume from opencode's persisted storage
resumed_id = self._try_resume_existing_session(cwd, timeout)
if resumed_id:
return resumed_id
# Create a new session
return self._create_session(cwd=cwd, timeout=timeout)
def send_message(
self,
message: str,
session_id: str,
timeout: float = ACP_MESSAGE_TIMEOUT,
) -> Generator[ACPEvent, None, None]:
-        """Send a message and stream response events.
+        """Send a message to a specific session and stream response events.
Args:
message: The message content to send
session_id: The ACP session ID to send the message to
timeout: Maximum time to wait for complete response (defaults to ACP_MESSAGE_TIMEOUT env var)
Yields:
Typed ACP schema event objects
"""
-        if self._state.current_session is None:
-            raise RuntimeError("No active session. Call start() first.")
-
-        session_id = self._state.current_session.session_id
+        if session_id not in self._state.sessions:
+            raise RuntimeError(
+                f"Unknown session {session_id}. "
+                f"Known sessions: {list(self._state.sessions.keys())}"
+            )
packet_logger = get_packet_logger()
-        # Log the start of message processing
-        packet_logger.log_raw(
-            "ACP-SEND-MESSAGE-START-K8S",
-            {
-                "session_id": session_id,
-                "pod": self._pod_name,
-                "namespace": self._namespace,
-                "message_preview": (
-                    message[:200] + "..." if len(message) > 200 else message
-                ),
-                "timeout": timeout,
-            },
-        )
+        logger.info(
+            f"[ACP] Sending prompt: "
+            f"acp_session={session_id} pod={self._pod_name} "
+            f"queue_backlog={self._response_queue.qsize()}"
+        )
prompt_content = [{"type": "text", "text": message}]
@@ -446,44 +730,105 @@ class ACPExecClient:
request_id = self._send_request("session/prompt", params)
start_time = time.time()
last_event_time = time.time() # Track time since last event for keepalive
last_event_time = time.time()
events_yielded = 0
keepalive_count = 0
completion_reason = "unknown"
while True:
remaining = timeout - (time.time() - start_time)
if remaining <= 0:
-                packet_logger.log_raw(
-                    "ACP-TIMEOUT-K8S",
-                    {
-                        "session_id": session_id,
-                        "elapsed_ms": (time.time() - start_time) * 1000,
-                    },
-                )
+                completion_reason = "timeout"
logger.warning(
f"[ACP] Prompt timeout: "
f"acp_session={session_id} events={events_yielded}, "
f"sending session/cancel"
)
try:
self.cancel(session_id=session_id)
except Exception as cancel_err:
logger.warning(
f"[ACP] session/cancel failed on timeout: {cancel_err}"
)
yield Error(code=-1, message="Timeout waiting for response")
break
try:
message_data = self._response_queue.get(timeout=min(remaining, 1.0))
last_event_time = time.time() # Reset keepalive timer on event
last_event_time = time.time()
# Log every dequeued message for debugging ACP behavior
update_type = (
message_data.get("params", {})
.get("update", {})
.get("sessionUpdate", "")
)
logger.info(
f"[ACP] Dequeued: "
f"id={message_data.get('id')} "
f"method={message_data.get('method')} "
f"update={update_type} "
f"has_result={'result' in message_data} "
f"has_error={'error' in message_data} "
f"request_id={request_id} "
f"elapsed={(time.time() - start_time) * 1000:.0f}ms"
)
except Empty:
-                # Check if we need to send an SSE keepalive
+                # Check if reader thread is still alive
if (
self._reader_thread is not None
and not self._reader_thread.is_alive()
):
completion_reason = "reader_thread_dead"
# Drain any final messages the reader flushed before dying
while not self._response_queue.empty():
try:
final_msg = self._response_queue.get_nowait()
if final_msg.get("id") == request_id:
if "error" in final_msg:
error_data = final_msg["error"]
yield Error(
code=error_data.get("code", -1),
message=error_data.get(
"message", "Unknown error"
),
)
else:
result = final_msg.get("result", {})
try:
yield PromptResponse.model_validate(result)
except ValidationError:
pass
break
except Empty:
break
logger.warning(
f"[ACP] Reader thread dead: "
f"acp_session={session_id} events={events_yielded}"
)
break
# Send SSE keepalive if idle
idle_time = time.time() - last_event_time
if idle_time >= SSE_KEEPALIVE_INTERVAL:
-                    packet_logger.log_raw(
-                        "SSE-KEEPALIVE-YIELD",
-                        {
-                            "session_id": session_id,
-                            "idle_seconds": idle_time,
-                        },
-                    )
+                    keepalive_count += 1
                     yield SSEKeepalive()
-                    last_event_time = time.time()  # Reset after yielding keepalive
+                    last_event_time = time.time()
continue
-            # Check for response to our prompt request
-            if message_data.get("id") == request_id:
+            # Check for JSON-RPC response to our prompt request.
+            msg_id = message_data.get("id")
+            is_response = "method" not in message_data and (
+                msg_id == request_id
+                or (msg_id is not None and str(msg_id) == str(request_id))
+            )
+            if is_response:
completion_reason = "jsonrpc_response"
if "error" in message_data:
error_data = message_data["error"]
completion_reason = "jsonrpc_error"
logger.warning(f"[ACP] Prompt error: {error_data}")
packet_logger.log_jsonrpc_response(
request_id, error=error_data, context="k8s"
)
@@ -498,26 +843,16 @@ class ACPExecClient:
)
try:
prompt_response = PromptResponse.model_validate(result)
packet_logger.log_acp_event_yielded(
"prompt_response", prompt_response
)
events_yielded += 1
yield prompt_response
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"type": "prompt_response", "error": str(e)},
)
logger.error(f"[ACP] PromptResponse validation failed: {e}")
-                # Log completion summary
                 elapsed_ms = (time.time() - start_time) * 1000
-                packet_logger.log_raw(
-                    "ACP-SEND-MESSAGE-COMPLETE-K8S",
-                    {
-                        "session_id": session_id,
-                        "events_yielded": events_yielded,
-                        "elapsed_ms": elapsed_ms,
-                    },
-                )
+                logger.info(
f"[ACP] Prompt complete: "
f"reason={completion_reason} acp_session={session_id} "
f"events={events_yielded} elapsed={elapsed_ms:.0f}ms"
)
break
@@ -526,25 +861,29 @@ class ACPExecClient:
params_data = message_data.get("params", {})
update = params_data.get("update", {})
-                # Log the notification
-                packet_logger.log_jsonrpc_notification(
-                    "session/update",
-                    {"update_type": update.get("sessionUpdate")},
-                    context="k8s",
-                )
+                prompt_complete = False
for event in self._process_session_update(update):
events_yielded += 1
-                    # Log each yielded event
-                    event_type = self._get_event_type_name(event)
-                    packet_logger.log_acp_event_yielded(event_type, event)
yield event
if isinstance(event, PromptResponse):
prompt_complete = True
break
if prompt_complete:
completion_reason = "prompt_response_via_notification"
elapsed_ms = (time.time() - start_time) * 1000
logger.info(
f"[ACP] Prompt complete: "
f"reason={completion_reason} acp_session={session_id} "
f"events={events_yielded} elapsed={elapsed_ms:.0f}ms"
)
break
# Handle requests from agent - send error response
elif "method" in message_data and "id" in message_data:
-                packet_logger.log_raw(
-                    "ACP-UNSUPPORTED-REQUEST-K8S",
-                    {"method": message_data["method"], "id": message_data["id"]},
-                )
+                logger.debug(
f"[ACP] Unsupported agent request: "
f"method={message_data['method']}"
)
self._send_error_response(
message_data["id"],
@@ -552,113 +891,47 @@ class ACPExecClient:
f"Method not supported: {message_data['method']}",
)
+            else:
+                logger.warning(
+                    f"[ACP] Unhandled message: "
+                    f"id={message_data.get('id')} "
+                    f"method={message_data.get('method')} "
+                    f"keys={list(message_data.keys())}"
+                )

-    def _get_event_type_name(self, event: ACPEvent) -> str:
-        """Get the type name for an ACP event."""
-        if isinstance(event, AgentMessageChunk):
-            return "agent_message_chunk"
-        elif isinstance(event, AgentThoughtChunk):
-            return "agent_thought_chunk"
-        elif isinstance(event, ToolCallStart):
-            return "tool_call_start"
-        elif isinstance(event, ToolCallProgress):
-            return "tool_call_progress"
-        elif isinstance(event, AgentPlanUpdate):
-            return "agent_plan_update"
-        elif isinstance(event, CurrentModeUpdate):
-            return "current_mode_update"
-        elif isinstance(event, PromptResponse):
-            return "prompt_response"
-        elif isinstance(event, Error):
-            return "error"
-        elif isinstance(event, SSEKeepalive):
-            return "sse_keepalive"
-        return "unknown"
def _process_session_update(
self, update: dict[str, Any]
) -> Generator[ACPEvent, None, None]:
"""Process a session/update notification and yield typed ACP schema objects."""
update_type = update.get("sessionUpdate")
packet_logger = get_packet_logger()
if update_type == "agent_message_chunk":
# Map update types to their ACP schema classes.
# Note: prompt_response is included because ACP sometimes sends it as a
# notification WITHOUT a corresponding JSON-RPC response. We accept
# either signal as turn completion (first one wins).
type_map: dict[str, type] = {
"agent_message_chunk": AgentMessageChunk,
"agent_thought_chunk": AgentThoughtChunk,
"tool_call": ToolCallStart,
"tool_call_update": ToolCallProgress,
"plan": AgentPlanUpdate,
"current_mode_update": CurrentModeUpdate,
"prompt_response": PromptResponse,
}
model_class = type_map.get(update_type) # type: ignore[arg-type]
if model_class is not None:
try:
yield AgentMessageChunk.model_validate(update)
yield model_class.model_validate(update)
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"update_type": update_type, "error": str(e), "update": update},
)
elif update_type == "agent_thought_chunk":
try:
yield AgentThoughtChunk.model_validate(update)
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"update_type": update_type, "error": str(e), "update": update},
)
elif update_type == "user_message_chunk":
# Echo of user message - skip but log
packet_logger.log_raw(
"ACP-SKIPPED-UPDATE-K8S", {"type": "user_message_chunk"}
)
elif update_type == "available_commands_update":
# Skip command updates
packet_logger.log_raw(
"ACP-SKIPPED-UPDATE-K8S", {"type": "available_commands_update"}
)
elif update_type == "session_info_update":
# Skip session info updates
packet_logger.log_raw(
"ACP-SKIPPED-UPDATE-K8S", {"type": "session_info_update"}
)
else:
# Unknown update types are logged
packet_logger.log_raw(
"ACP-UNKNOWN-UPDATE-TYPE-K8S",
{"update_type": update_type, "update": update},
)
def _send_error_response(self, request_id: int, code: int, message: str) -> None:
"""Send an error response to an agent request."""
@@ -673,15 +946,24 @@ class ACPExecClient:
self._ws_client.write_stdin(json.dumps(response) + "\n")
def cancel(self, session_id: str | None = None) -> None:
"""Cancel the current operation on a session.
Args:
session_id: The ACP session ID to cancel. If None, cancels all sessions.
"""
if session_id:
if session_id in self._state.sessions:
self._send_notification(
"session/cancel",
{"sessionId": session_id},
)
else:
for sid in self._state.sessions:
self._send_notification(
"session/cancel",
{"sessionId": sid},
)
def health_check(self, timeout: float = 5.0) -> bool: # noqa: ARG002
"""Check if we can exec into the pod."""
@@ -708,11 +990,9 @@ class ACPExecClient:
return self._ws_client is not None and self._ws_client.is_open()
@property
def session_ids(self) -> list[str]:
"""Get all tracked session IDs."""
return list(self._state.sessions.keys())
def __enter__(self) -> "ACPExecClient":
"""Context manager entry."""

View File

@@ -50,6 +50,7 @@ from pathlib import Path
from uuid import UUID
from uuid import uuid4
from acp.schema import PromptResponse
from kubernetes import client # type: ignore
from kubernetes import config
from kubernetes.client.rest import ApiException # type: ignore
@@ -97,6 +98,10 @@ from onyx.utils.logger import setup_logger
logger = setup_logger()
# API server pod hostname — used to identify which replica is handling a request.
# In K8s, HOSTNAME is set to the pod name (e.g., "api-server-dpgg7").
_API_SERVER_HOSTNAME = os.environ.get("HOSTNAME", "unknown")
# Constants for pod configuration
# Note: Next.js ports are dynamically allocated from SANDBOX_NEXTJS_PORT_START to
# SANDBOX_NEXTJS_PORT_END range, with one port per session.
@@ -348,6 +353,12 @@ class KubernetesSandboxManager(SandboxManager):
self._service_account = SANDBOX_SERVICE_ACCOUNT_NAME
self._file_sync_service_account = SANDBOX_FILE_SYNC_SERVICE_ACCOUNT
# Maps (sandbox_id, craft_session_id) → ACP session ID.
# Cached across messages so we can resume the same opencode session.
# The actual opencode session is persisted on disk in the sandbox pod,
# so this cache is just an optimization to skip session/list + resume.
self._acp_session_ids: dict[tuple[UUID, UUID], str] = {}
# Load AGENTS.md template path
build_dir = Path(__file__).parent.parent.parent # /onyx/server/features/build/
self._agent_instructions_template_path = build_dir / "AGENTS.template.md"
@@ -1156,11 +1167,18 @@ done
def terminate(self, sandbox_id: UUID) -> None:
"""Terminate a sandbox and clean up Kubernetes resources.
Removes session mappings for this sandbox, then deletes the
Service and Pod. ACP clients are ephemeral (created per message),
so there's nothing to stop here.
Args:
sandbox_id: The sandbox ID to terminate
"""
# Remove all session mappings for this sandbox
keys_to_remove = [key for key in self._acp_session_ids if key[0] == sandbox_id]
for key in keys_to_remove:
del self._acp_session_ids[key]
# Clean up Kubernetes resources (needs string for pod/service names)
self._cleanup_kubernetes_resources(str(sandbox_id))
@@ -1395,7 +1413,8 @@ echo "Session workspace setup complete"
) -> None:
"""Clean up a session workspace (on session delete).
Removes the ACP session mapping and executes kubectl exec to remove
the session directory. ACP clients are ephemeral (created per message), so there is no client to stop here.
Args:
sandbox_id: The sandbox ID
@@ -1403,6 +1422,15 @@ echo "Session workspace setup complete"
nextjs_port: Optional port where Next.js server is running (unused in K8s,
we use PID file instead)
"""
# Remove the ACP session mapping (the opencode session remains on disk)
session_key = (sandbox_id, session_id)
acp_session_id = self._acp_session_ids.pop(session_key, None)
if acp_session_id:
logger.info(
f"[SANDBOX-ACP] Removed ACP session mapping: "
f"session={session_id} acp_session={acp_session_id}"
)
pod_name = self._get_pod_name(str(sandbox_id))
session_path = f"/workspace/sessions/{session_id}"
@@ -1807,6 +1835,73 @@ echo "Session config regeneration complete"
)
return exec_client.health_check(timeout=timeout)
def _create_ephemeral_acp_client(self, sandbox_id: UUID) -> ACPExecClient:
"""Create a new ephemeral ACP client for a single message exchange.
Each call starts a fresh `opencode acp` process in the sandbox pod.
The process is short-lived — stopped after the message completes.
This prevents the bug where multiple long-lived processes (one per
API replica) operate on the same session's flat file storage
concurrently, causing the JSON-RPC response to be silently lost.
Args:
sandbox_id: The sandbox ID
Returns:
A running ACPExecClient (caller must stop it when done)
"""
pod_name = self._get_pod_name(str(sandbox_id))
acp_client = ACPExecClient(
pod_name=pod_name,
namespace=self._namespace,
container="sandbox",
)
acp_client.start(cwd="/workspace")
logger.info(
f"[SANDBOX-ACP] Created ephemeral ACP client: "
f"sandbox={sandbox_id} pod={pod_name} "
f"api_pod={_API_SERVER_HOSTNAME}"
)
return acp_client
def _get_or_create_acp_session(
self,
sandbox_id: UUID,
session_id: UUID,
acp_client: ACPExecClient,
) -> str:
"""Get the ACP session ID for a craft session, creating one if needed.
Uses the session mapping cache first, then falls back to
`get_or_create_session()` which handles resume from opencode's
persisted storage on disk.
Args:
sandbox_id: The sandbox ID
session_id: The craft session ID
acp_client: The ACP client for this message
Returns:
The ACP session ID
"""
session_key = (sandbox_id, session_id)
acp_session_id = self._acp_session_ids.get(session_key)
if acp_session_id and acp_session_id in acp_client.session_ids:
return acp_session_id
# Session not tracked or new process — get or create from disk
session_path = f"/workspace/sessions/{session_id}"
acp_session_id = acp_client.get_or_create_session(cwd=session_path)
self._acp_session_ids[session_key] = acp_session_id
logger.info(
f"[SANDBOX-ACP] Session mapped: "
f"craft_session={session_id} acp_session={acp_session_id}"
)
return acp_session_id
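In isolation, the cache-then-disk fallback above behaves like this sketch. The `create_on_disk` callback stands in for `get_or_create_session()`, and `live_session_ids` for the client's `session_ids` check; names are illustrative:

```python
from collections.abc import Callable
from uuid import UUID, uuid4

# In-memory mapping (sandbox_id, craft_session_id) -> ACP session ID.
# The on-disk opencode session is the source of truth; this cache only
# skips a session/list round-trip when it hits.
_acp_session_ids: dict[tuple[UUID, UUID], str] = {}

def get_or_create_acp_session(
    sandbox_id: UUID,
    session_id: UUID,
    live_session_ids: list[str],
    create_on_disk: Callable[[], str],
) -> str:
    key = (sandbox_id, session_id)
    cached = _acp_session_ids.get(key)
    # A cache hit only counts if the current process also tracks the
    # session; a fresh ephemeral process starts with none.
    if cached and cached in live_session_ids:
        return cached
    acp_id = create_on_disk()  # resumes or creates from persisted storage
    _acp_session_ids[key] = acp_id
    return acp_id
```

On the second message for the same craft session, the cached ID is returned without touching disk, provided the client still tracks it.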
def send_message(
self,
sandbox_id: UUID,
@@ -1815,8 +1910,12 @@ echo "Session config regeneration complete"
) -> Generator[ACPEvent, None, None]:
"""Send a message to the CLI agent and stream ACP events.
Creates an ephemeral `opencode acp` process for each message.
The process resumes the session from opencode's on-disk storage,
handles the prompt, then is stopped. This ensures only one process
operates on a session's flat files at a time, preventing the bug
where multiple long-lived processes (one per API replica) corrupt
each other's in-memory state.
Args:
sandbox_id: The sandbox ID
@@ -1827,67 +1926,104 @@ echo "Session config regeneration complete"
Typed ACP schema event objects
"""
packet_logger = get_packet_logger()
# Create an ephemeral ACP client for this message
acp_client = self._create_ephemeral_acp_client(sandbox_id)
# Get or create the ACP session for this craft session
acp_session_id = self._get_or_create_acp_session(
sandbox_id, session_id, acp_client
)
logger.info(
f"[SANDBOX-ACP] Sending message: "
f"session={session_id} acp_session={acp_session_id} "
f"api_pod={_API_SERVER_HOSTNAME}"
)
# Log the send_message call at sandbox manager level
packet_logger.log_session_start(session_id, sandbox_id, message)
events_count = 0
got_prompt_response = False
try:
for event in acp_client.send_message(
message, session_id=acp_session_id
):
events_count += 1
if isinstance(event, PromptResponse):
got_prompt_response = True
yield event
logger.info(
f"[SANDBOX-ACP] send_message completed: "
f"session={session_id} events={events_count} "
f"got_prompt_response={got_prompt_response}"
)
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
)
except GeneratorExit:
logger.warning(
f"[SANDBOX-ACP] GeneratorExit: session={session_id} "
f"events={events_count}, sending session/cancel"
)
try:
acp_client.cancel(session_id=acp_session_id)
except Exception as cancel_err:
logger.warning(
f"[SANDBOX-ACP] session/cancel failed on GeneratorExit: "
f"{cancel_err}"
)
packet_logger.log_session_end(
session_id,
success=False,
error="GeneratorExit: Client disconnected or stream closed by consumer",
events_count=events_count,
)
raise
except Exception as e:
logger.error(
f"[SANDBOX-ACP] Exception: session={session_id} "
f"events={events_count} error={e}, sending session/cancel"
)
try:
acp_client.cancel(session_id=acp_session_id)
except Exception as cancel_err:
logger.warning(
f"[SANDBOX-ACP] session/cancel failed on Exception: "
f"{cancel_err}"
)
packet_logger.log_session_end(
session_id,
success=False,
error=f"Exception: {str(e)}",
events_count=events_count,
)
raise
except BaseException as e:
logger.error(
f"[SANDBOX-ACP] {type(e).__name__}: session={session_id} "
f"error={e}"
)
packet_logger.log_session_end(
session_id,
success=False,
error=f"{type(e).__name__}: {str(e) if str(e) else 'System-level interruption'}",
events_count=events_count,
)
raise
finally:
# Always stop the ephemeral ACP client to kill the opencode process.
# This ensures no stale processes linger in the sandbox container.
try:
acp_client.stop()
except Exception as e:
logger.warning(
f"[SANDBOX-ACP] Failed to stop ephemeral ACP client: "
f"session={session_id} error={e}"
)
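The per-message lifecycle above (create a fresh process, resume from disk, stream the prompt, always stop) reduces to a context-manager sketch. `FakeACPClient` is a hypothetical stand-in for `ACPExecClient`, not the real class:

```python
from collections.abc import Iterator
from contextlib import contextmanager

class FakeACPClient:
    """Hypothetical stand-in for ACPExecClient."""
    def __init__(self) -> None:
        self.running = False
    def start(self, cwd: str) -> None:
        self.running = True  # real client spawns `opencode acp` here
    def stop(self) -> None:
        self.running = False  # real client kills the exec'd process
    def send_message(self, message: str) -> Iterator[str]:
        yield f"echo: {message}"

@contextmanager
def ephemeral_client(cwd: str = "/workspace") -> Iterator[FakeACPClient]:
    # One fresh process per message: session state is read from disk on
    # start, and stop() in the finally guarantees no stale in-memory
    # state survives into the next message, even if streaming raises.
    client = FakeACPClient()
    client.start(cwd=cwd)
    try:
        yield client
    finally:
        client.stop()
```

A caller would wrap each prompt in `with ephemeral_client() as c: ...`, mirroring the try/finally around `acp_client.stop()` in the diff above.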
def list_directory(
self, sandbox_id: UUID, session_id: UUID, path: str

View File

@@ -0,0 +1,374 @@
"""Unit tests for Zed-style ACP session management in KubernetesSandboxManager.
These tests verify that the KubernetesSandboxManager correctly:
- Maintains one shared ACPExecClient per sandbox
- Maps craft sessions to ACP sessions on the shared client
- Replaces dead clients and re-creates sessions
- Cleans up on terminate/cleanup
All external dependencies (K8s, WebSockets, packet logging) are mocked.
"""
from collections.abc import Generator
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
from uuid import UUID
from uuid import uuid4
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# The fully-qualified path to the module under test, used for patching
_K8S_MODULE = "onyx.server.features.build.sandbox.kubernetes.kubernetes_sandbox_manager"
_ACP_CLIENT_CLASS = f"{_K8S_MODULE}.ACPExecClient"
_GET_PACKET_LOGGER = f"{_K8S_MODULE}.get_packet_logger"
def _make_mock_event() -> MagicMock:
"""Create a mock ACP event."""
return MagicMock(name="mock_acp_event")
def _make_mock_client(
is_running: bool = True,
session_ids: list[str] | None = None,
) -> MagicMock:
"""Create a mock ACPExecClient with configurable state.
Args:
is_running: Whether the client appears running
session_ids: List of ACP session IDs the client tracks
"""
mock_client = MagicMock()
type(mock_client).is_running = property(lambda _self: is_running)
type(mock_client).session_ids = property(
lambda _self: session_ids if session_ids is not None else []
)
mock_client.start.return_value = None
mock_client.stop.return_value = None
# get_or_create_session returns a unique ACP session ID
mock_client.get_or_create_session.return_value = f"acp-session-{uuid4().hex[:8]}"
mock_event = _make_mock_event()
mock_client.send_message.return_value = iter([mock_event])
return mock_client
def _drain_generator(gen: Generator[Any, None, None]) -> list[Any]:
"""Consume a generator and return all yielded values as a list."""
return list(gen)
# ---------------------------------------------------------------------------
# Fixture: fresh KubernetesSandboxManager instance
# ---------------------------------------------------------------------------
@pytest.fixture()
def manager() -> Generator[Any, None, None]:
"""Create a fresh KubernetesSandboxManager instance with all externals mocked."""
with (
patch(f"{_K8S_MODULE}.config") as _mock_config,
patch(f"{_K8S_MODULE}.client") as _mock_k8s_client,
patch(f"{_K8S_MODULE}.k8s_stream"),
patch(_GET_PACKET_LOGGER) as mock_get_logger,
):
mock_packet_logger = MagicMock()
mock_get_logger.return_value = mock_packet_logger
_mock_config.load_incluster_config.return_value = None
_mock_config.ConfigException = Exception
_mock_k8s_client.ApiClient.return_value = MagicMock()
_mock_k8s_client.CoreV1Api.return_value = MagicMock()
_mock_k8s_client.BatchV1Api.return_value = MagicMock()
_mock_k8s_client.NetworkingV1Api.return_value = MagicMock()
from onyx.server.features.build.sandbox.kubernetes.kubernetes_sandbox_manager import (
KubernetesSandboxManager,
)
KubernetesSandboxManager._instance = None
mgr = KubernetesSandboxManager()
yield mgr
KubernetesSandboxManager._instance = None
# ---------------------------------------------------------------------------
# Tests: Shared client lifecycle
# ---------------------------------------------------------------------------
def test_send_message_creates_shared_client_on_first_call(manager: Any) -> None:
"""First call to send_message() should create one shared ACPExecClient
for the sandbox, create an ACP session, and yield events."""
sandbox_id: UUID = uuid4()
session_id: UUID = uuid4()
message = "hello world"
mock_event = _make_mock_event()
mock_client = _make_mock_client(is_running=True)
acp_session_id = "acp-session-abc"
mock_client.get_or_create_session.return_value = acp_session_id
# session_ids must include the created session for validation
type(mock_client).session_ids = property(lambda _: [acp_session_id])
mock_client.send_message.return_value = iter([mock_event])
with patch(_ACP_CLIENT_CLASS, return_value=mock_client) as MockClass:
events = _drain_generator(manager.send_message(sandbox_id, session_id, message))
# Verify shared client was constructed once
MockClass.assert_called_once()
# Verify start() was called with /workspace (not session-specific path)
mock_client.start.assert_called_once_with(cwd="/workspace")
# Verify get_or_create_session was called with the session path
expected_cwd = f"/workspace/sessions/{session_id}"
mock_client.get_or_create_session.assert_called_once_with(cwd=expected_cwd)
# Verify send_message was called with correct args
mock_client.send_message.assert_called_once_with(message, session_id=acp_session_id)
# Verify we got the event
assert mock_event in events
# Verify shared client is cached by sandbox_id
assert sandbox_id in manager._acp_clients
assert manager._acp_clients[sandbox_id] is mock_client
# Verify session mapping exists
assert (sandbox_id, session_id) in manager._acp_session_ids
assert manager._acp_session_ids[(sandbox_id, session_id)] == acp_session_id
def test_send_message_reuses_shared_client_for_same_session(manager: Any) -> None:
"""Second call with the same session should reuse the shared client
and the same ACP session ID."""
sandbox_id: UUID = uuid4()
session_id: UUID = uuid4()
mock_event_1 = _make_mock_event()
mock_event_2 = _make_mock_event()
mock_client = _make_mock_client(is_running=True)
acp_session_id = "acp-session-reuse"
mock_client.get_or_create_session.return_value = acp_session_id
type(mock_client).session_ids = property(lambda _: [acp_session_id])
mock_client.send_message.side_effect = [
iter([mock_event_1]),
iter([mock_event_2]),
]
with patch(_ACP_CLIENT_CLASS, return_value=mock_client) as MockClass:
events_1 = _drain_generator(
manager.send_message(sandbox_id, session_id, "first")
)
events_2 = _drain_generator(
manager.send_message(sandbox_id, session_id, "second")
)
# Constructor called only ONCE (shared client)
MockClass.assert_called_once()
# start() called only once
mock_client.start.assert_called_once()
# get_or_create_session called only once (second call uses cached mapping)
mock_client.get_or_create_session.assert_called_once()
# send_message called twice with same ACP session ID
assert mock_client.send_message.call_count == 2
assert mock_event_1 in events_1
assert mock_event_2 in events_2
def test_send_message_different_sessions_share_client(manager: Any) -> None:
"""Two different craft sessions on the same sandbox should share the
same ACPExecClient but have different ACP sessions."""
sandbox_id: UUID = uuid4()
session_id_a: UUID = uuid4()
session_id_b: UUID = uuid4()
mock_client = _make_mock_client(is_running=True)
acp_session_a = "acp-session-a"
acp_session_b = "acp-session-b"
mock_client.get_or_create_session.side_effect = [acp_session_a, acp_session_b]
type(mock_client).session_ids = property(lambda _: [acp_session_a, acp_session_b])
mock_event_a = _make_mock_event()
mock_event_b = _make_mock_event()
mock_client.send_message.side_effect = [
iter([mock_event_a]),
iter([mock_event_b]),
]
with patch(_ACP_CLIENT_CLASS, return_value=mock_client) as MockClass:
events_a = _drain_generator(
manager.send_message(sandbox_id, session_id_a, "msg a")
)
events_b = _drain_generator(
manager.send_message(sandbox_id, session_id_b, "msg b")
)
# Only ONE shared client was created
MockClass.assert_called_once()
# get_or_create_session called twice (once per craft session)
assert mock_client.get_or_create_session.call_count == 2
# send_message called with different ACP session IDs
mock_client.send_message.assert_any_call("msg a", session_id=acp_session_a)
mock_client.send_message.assert_any_call("msg b", session_id=acp_session_b)
# Both session mappings exist
assert manager._acp_session_ids[(sandbox_id, session_id_a)] == acp_session_a
assert manager._acp_session_ids[(sandbox_id, session_id_b)] == acp_session_b
assert mock_event_a in events_a
assert mock_event_b in events_b
def test_send_message_replaces_dead_client(manager: Any) -> None:
"""If the shared client has is_running == False, should replace it and
re-create sessions."""
sandbox_id: UUID = uuid4()
session_id: UUID = uuid4()
# Place a dead client in the cache
dead_client = _make_mock_client(is_running=False)
manager._acp_clients[sandbox_id] = dead_client
manager._acp_session_ids[(sandbox_id, session_id)] = "old-acp-session"
# Create the replacement client
new_event = _make_mock_event()
new_client = _make_mock_client(is_running=True)
new_acp_session = "new-acp-session"
new_client.get_or_create_session.return_value = new_acp_session
type(new_client).session_ids = property(lambda _: [new_acp_session])
new_client.send_message.return_value = iter([new_event])
with patch(_ACP_CLIENT_CLASS, return_value=new_client):
events = _drain_generator(manager.send_message(sandbox_id, session_id, "test"))
# Dead client was stopped during replacement
dead_client.stop.assert_called_once()
# New client was started
new_client.start.assert_called_once()
# Old session mapping was cleared, new one created
assert manager._acp_session_ids[(sandbox_id, session_id)] == new_acp_session
# Cache holds the new client
assert manager._acp_clients[sandbox_id] is new_client
assert new_event in events
# ---------------------------------------------------------------------------
# Tests: Cleanup
# ---------------------------------------------------------------------------
def test_terminate_stops_shared_client(manager: Any) -> None:
"""terminate(sandbox_id) should stop the shared client and clear
all session mappings for that sandbox."""
sandbox_id: UUID = uuid4()
session_id_1: UUID = uuid4()
session_id_2: UUID = uuid4()
mock_client = _make_mock_client(is_running=True)
manager._acp_clients[sandbox_id] = mock_client
manager._acp_session_ids[(sandbox_id, session_id_1)] = "acp-1"
manager._acp_session_ids[(sandbox_id, session_id_2)] = "acp-2"
with patch.object(manager, "_cleanup_kubernetes_resources"):
manager.terminate(sandbox_id)
# Shared client was stopped
mock_client.stop.assert_called_once()
# Client removed from cache
assert sandbox_id not in manager._acp_clients
# Session mappings removed
assert (sandbox_id, session_id_1) not in manager._acp_session_ids
assert (sandbox_id, session_id_2) not in manager._acp_session_ids
def test_terminate_leaves_other_sandbox_untouched(manager: Any) -> None:
"""terminate(sandbox_A) should NOT affect sandbox_B's client or sessions."""
sandbox_a: UUID = uuid4()
sandbox_b: UUID = uuid4()
session_a: UUID = uuid4()
session_b: UUID = uuid4()
client_a = _make_mock_client(is_running=True)
client_b = _make_mock_client(is_running=True)
manager._acp_clients[sandbox_a] = client_a
manager._acp_clients[sandbox_b] = client_b
manager._acp_session_ids[(sandbox_a, session_a)] = "acp-a"
manager._acp_session_ids[(sandbox_b, session_b)] = "acp-b"
with patch.object(manager, "_cleanup_kubernetes_resources"):
manager.terminate(sandbox_a)
# sandbox_a cleaned up
client_a.stop.assert_called_once()
assert sandbox_a not in manager._acp_clients
assert (sandbox_a, session_a) not in manager._acp_session_ids
# sandbox_b untouched
client_b.stop.assert_not_called()
assert sandbox_b in manager._acp_clients
assert manager._acp_session_ids[(sandbox_b, session_b)] == "acp-b"
def test_cleanup_session_removes_session_mapping(manager: Any) -> None:
"""cleanup_session_workspace() should remove the session mapping but
leave the shared client alive for other sessions."""
sandbox_id: UUID = uuid4()
session_id: UUID = uuid4()
mock_client = _make_mock_client(is_running=True)
manager._acp_clients[sandbox_id] = mock_client
manager._acp_session_ids[(sandbox_id, session_id)] = "acp-session-xyz"
with patch.object(manager, "_stream_core_api") as mock_stream_api:
mock_stream_api.connect_get_namespaced_pod_exec = MagicMock()
with patch(f"{_K8S_MODULE}.k8s_stream", return_value="cleanup ok"):
manager.cleanup_session_workspace(sandbox_id, session_id)
# Session mapping removed
assert (sandbox_id, session_id) not in manager._acp_session_ids
# Shared client is NOT stopped (other sessions may use it)
mock_client.stop.assert_not_called()
assert sandbox_id in manager._acp_clients
def test_cleanup_session_handles_no_mapping(manager: Any) -> None:
"""cleanup_session_workspace() should not error when there's no
session mapping."""
sandbox_id: UUID = uuid4()
session_id: UUID = uuid4()
assert (sandbox_id, session_id) not in manager._acp_session_ids
with patch.object(manager, "_stream_core_api") as mock_stream_api:
mock_stream_api.connect_get_namespaced_pod_exec = MagicMock()
with patch(f"{_K8S_MODULE}.k8s_stream", return_value="cleanup ok"):
manager.cleanup_session_workspace(sandbox_id, session_id)
assert (sandbox_id, session_id) not in manager._acp_session_ids