Compare commits

...

4 Commits

Author SHA1 Message Date
rohoswagger
9b8a6e60b7 fix(craft): resume ACP sessions across API replicas for follow-up messages
Multiple API server replicas each maintain independent in-memory ACP client
caches. When a follow-up message is routed to a different replica, it creates
a new opencode session with no conversation context.

Fix: After initializing a new opencode ACP process, try session/list (filtered
by workspace cwd) to discover existing sessions from previous processes, then
session/resume to restore conversation context. Falls back to session/new if
the agent doesn't support these methods or no sessions exist.

Also adds api_pod hostname to SANDBOX-ACP log lines for replica identification.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 17:20:43 -08:00
rohoswagger
dd9d201b51 fix(craft): add extensive diagnostic logging for ACP follow-up messages
- Add [ACP-LIFECYCLE] logs for client start/stop with session IDs
- Add [ACP-READER] logs for every message read from WebSocket with
  update_type, queue size, and ACP session ID
- Add [ACP-SEND] logs for every dequeued message with prompt number,
  completion reason tracking, and queue state
- Add [SANDBOX-ACP] logs for cache hit/miss decisions and PromptResponse
  tracking in the sandbox manager
- Add stderr reading in reader thread to catch opencode errors
- Add queue drain at start of each send_message() to clear stale messages
- Track prompt_count per client to identify 1st vs 2nd+ prompts
- Log completion_reason (jsonrpc_response, notification, timeout, etc.)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 16:49:57 -08:00
rohoswagger
c545819aa6 fix(craft): harden ACP response matching for cached sessions
The cached ACPExecClient could emit keepalives forever on follow-up
messages because the PromptResponse was never matched. This adds:

- ID type-mismatch tolerance (str fallback for int/str id comparison)
- Guard against agent request ID collision (require no "method" field)
- PromptResponse via session/update notification handler
- Reader thread health check (detect dead WebSocket, stop keepalives)
- Buffer flush on reader thread exit (catch trailing PromptResponse)
- Diagnostic logging for every dequeued message and dropped messages

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 15:45:39 -08:00
rohoswagger
960ee228bf fix(craft): cache ACPExecClient in K8s sandbox to fix follow-up messages 2026-02-15 11:14:10 -08:00
7 changed files with 1013 additions and 79 deletions

View File

@@ -144,6 +144,7 @@ class ACPExecClient:
self._reader_thread: threading.Thread | None = None
self._stop_reader = threading.Event()
self._k8s_client: client.CoreV1Api | None = None
self._prompt_count: int = 0 # Track how many prompts sent on this client
def _get_k8s_client(self) -> client.CoreV1Api:
"""Get or create kubernetes client."""
@@ -176,6 +177,11 @@ class ACPExecClient:
# Start opencode acp via exec
exec_command = ["opencode", "acp", "--cwd", cwd]
logger.info(
f"[ACP-LIFECYCLE] Starting ACP client: pod={self._pod_name}, "
f"namespace={self._namespace}, cwd={cwd}"
)
try:
self._ws_client = k8s_stream(
k8s.connect_get_namespaced_pod_exec,
@@ -204,12 +210,29 @@ class ACPExecClient:
# Initialize ACP connection
self._initialize(timeout=timeout)
# Create session
session_id = self._create_session(cwd=cwd, timeout=timeout)
# Try to resume an existing session first (handles multi-replica).
# When multiple API server replicas connect to the same sandbox
# pod, a previous replica may have already created a session for
# this workspace. Resuming preserves conversation context.
session_id = self._try_resume_existing_session(cwd, timeout)
resumed = session_id is not None
if not session_id:
# No existing session found — create a new one
session_id = self._create_session(cwd=cwd, timeout=timeout)
logger.info(
f"[ACP-LIFECYCLE] ACP client started successfully: "
f"pod={self._pod_name}, acp_session_id={session_id}, "
f"cwd={cwd}, resumed={resumed}"
)
return session_id
except Exception as e:
logger.error(
f"[ACP-LIFECYCLE] ACP client start FAILED: "
f"pod={self._pod_name}, error={e}"
)
self.stop()
raise RuntimeError(f"Failed to start ACP exec client: {e}") from e
@@ -217,63 +240,141 @@ class ACPExecClient:
"""Background thread to read responses from the exec stream."""
buffer = ""
packet_logger = get_packet_logger()
messages_read = 0
while not self._stop_reader.is_set():
if self._ws_client is None:
break
logger.info(f"[ACP-READER] Reader thread started for pod={self._pod_name}")
try:
if self._ws_client.is_open():
# Read available data
self._ws_client.update(timeout=0.1)
# Read stdout (channel 1)
data = self._ws_client.read_stdout(timeout=0.1)
if data:
buffer += data
# Process complete lines
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if line:
try:
message = json.loads(line)
# Log the raw incoming message
packet_logger.log_jsonrpc_raw_message(
"IN", message, context="k8s"
)
self._response_queue.put(message)
except json.JSONDecodeError:
packet_logger.log_raw(
"JSONRPC-PARSE-ERROR-K8S",
{
"raw_line": line[:500],
"error": "JSON decode failed",
},
)
logger.warning(
f"Invalid JSON from agent: {line[:100]}"
)
else:
packet_logger.log_raw(
"K8S-WEBSOCKET-CLOSED",
{"pod": self._pod_name, "namespace": self._namespace},
)
try:
while not self._stop_reader.is_set():
if self._ws_client is None:
logger.warning("[ACP-READER] WebSocket client is None, exiting")
break
except Exception as e:
if not self._stop_reader.is_set():
packet_logger.log_raw(
"K8S-READER-ERROR",
{"error": str(e), "pod": self._pod_name},
try:
if self._ws_client.is_open():
# Read available data
self._ws_client.update(timeout=0.1)
# Read stderr (channel 2) - log any agent errors
stderr_data = self._ws_client.read_stderr(timeout=0.01)
if stderr_data:
logger.warning(
f"[ACP-STDERR] pod={self._pod_name}: "
f"{stderr_data.strip()[:500]}"
)
# Read stdout (channel 1)
data = self._ws_client.read_stdout(timeout=0.1)
if data:
buffer += data
# Process complete lines
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if line:
try:
message = json.loads(line)
messages_read += 1
# Log the raw incoming message
packet_logger.log_jsonrpc_raw_message(
"IN", message, context="k8s"
)
# Log key fields for every message
msg_id = message.get("id")
msg_method = message.get("method")
update_type = None
if msg_method == "session/update":
update = message.get("params", {}).get(
"update", {}
)
update_type = update.get("sessionUpdate")
acp_sid = (
self._state.current_session.session_id
if self._state.current_session
else "none"
)
logger.info(
f"[ACP-READER] #{messages_read} "
f"id={msg_id} method={msg_method} "
f"update_type={update_type} "
f"queue={self._response_queue.qsize()} "
f"acp_session={acp_sid}"
)
self._response_queue.put(message)
except json.JSONDecodeError:
packet_logger.log_raw(
"JSONRPC-PARSE-ERROR-K8S",
{
"raw_line": line[:500],
"error": "JSON decode failed",
},
)
logger.warning(
f"Invalid JSON from agent: {line[:100]}"
)
else:
logger.warning(
f"[ACP-READER] WebSocket closed: pod={self._pod_name}, "
f"total_messages_read={messages_read}"
)
packet_logger.log_raw(
"K8S-WEBSOCKET-CLOSED",
{"pod": self._pod_name, "namespace": self._namespace},
)
break
except Exception as e:
if not self._stop_reader.is_set():
logger.warning(
f"[ACP-READER] Error: {e}, pod={self._pod_name}, "
f"total_messages_read={messages_read}"
)
packet_logger.log_raw(
"K8S-READER-ERROR",
{"error": str(e), "pod": self._pod_name},
)
break
finally:
# Flush any remaining data in buffer (e.g., PromptResponse without
# trailing newline when the WebSocket closes)
remaining = buffer.strip()
if remaining:
try:
message = json.loads(remaining)
packet_logger.log_jsonrpc_raw_message(
"IN", message, context="k8s-flush"
)
logger.debug(f"Reader error: {e}")
break
self._response_queue.put(message)
logger.info(
f"[ACP-READER] Flushed remaining buffer: "
f"id={message.get('id')} method={message.get('method')}"
)
except json.JSONDecodeError:
packet_logger.log_raw(
"K8S-BUFFER-FLUSH-FAILED",
{"remaining": remaining[:500]},
)
logger.info(
f"[ACP-READER] Reader thread exiting: pod={self._pod_name}, "
f"total_messages_read={messages_read}, "
f"queue_size={self._response_queue.qsize()}"
)
def stop(self) -> None:
"""Stop the exec session and clean up."""
acp_session = (
self._state.current_session.session_id
if self._state.current_session
else "none"
)
logger.info(
f"[ACP-LIFECYCLE] Stopping ACP client: pod={self._pod_name} "
f"acp_session={acp_session} prompts_sent={self._prompt_count} "
f"queue_size={self._response_queue.qsize()}"
)
self._stop_reader.set()
if self._ws_client is not None:
@@ -288,6 +389,10 @@ class ACPExecClient:
self._reader_thread = None
self._state = ACPClientState()
logger.info(
f"[ACP-LIFECYCLE] ACP client stopped: pod={self._pod_name} "
f"acp_session={acp_session}"
)
def _get_next_id(self) -> int:
"""Get the next request ID."""
@@ -404,6 +509,126 @@ class ACPExecClient:
return session_id
def _list_sessions(self, cwd: str, timeout: float = 10.0) -> list[dict[str, Any]]:
    """Ask the agent for existing ACP sessions scoped to a working directory.

    Best-effort: ``session/list`` is an optional ACP method, so any failure
    (unsupported method, timeout, transport error) is logged and treated as
    "no sessions" instead of being raised to the caller.

    Args:
        cwd: Working directory used to filter the session listing.
        timeout: Seconds to wait for the session/list response.

    Returns:
        Session info dicts with keys like 'sessionId', 'cwd', 'title'.
        Empty list if session/list is not supported or fails.
    """
    try:
        req_id = self._send_request("session/list", {"cwd": cwd})
        response = self._wait_for_response(req_id, timeout)
        found = response.get("sessions", [])
        logger.info(
            f"[ACP-LIFECYCLE] session/list returned {len(found)} sessions "
            f"for cwd={cwd} pod={self._pod_name}"
        )
        return found
    except Exception as exc:
        # Swallow deliberately: absence of session/list support must not
        # prevent falling back to session/new in the caller.
        logger.info(
            f"[ACP-LIFECYCLE] session/list failed (may not be supported): "
            f"{exc} pod={self._pod_name}"
        )
        return []
def _resume_session(self, session_id: str, cwd: str, timeout: float = 30.0) -> str:
    """Resume an existing ACP session on the agent.

    Args:
        session_id: The ACP session ID to resume.
        cwd: Working directory for the session.
        timeout: Timeout for the resume request.

    Returns:
        The session ID (as reported by the agent, falling back to the
        requested ID when the response omits one).

    Raises:
        RuntimeError: If resume fails.
    """
    req_id = self._send_request(
        "session/resume",
        {
            "sessionId": session_id,
            "cwd": cwd,
            "mcpServers": [],
        },
    )
    response = self._wait_for_response(req_id, timeout)
    # Prefer the agent-reported ID; keep the requested one if absent.
    resumed_id = response.get("sessionId", session_id)
    self._state.current_session = ACPSession(session_id=resumed_id, cwd=cwd)
    logger.info(
        f"[ACP-LIFECYCLE] Resumed session: acp_session={resumed_id} "
        f"cwd={cwd} pod={self._pod_name}"
    )
    return resumed_id
def _try_resume_existing_session(self, cwd: str, timeout: float) -> str | None:
    """Discover and resume a pre-existing session for this workspace.

    When multiple API server replicas connect to the same sandbox pod,
    a previous replica may have already created an ACP session for this
    workspace. Resuming that session lets the agent keep its conversation
    context instead of starting fresh.

    Args:
        cwd: Working directory to search for sessions.
        timeout: Timeout budget for ACP requests.

    Returns:
        The resumed session ID, or None if no session could be resumed
        (unsupported agent, no sessions found, or resume failure).
    """
    # Both session/list and session/resume must be advertised by the agent.
    caps = self._state.agent_capabilities.get("sessionCapabilities", {})
    if caps.get("list") is None:
        logger.info(
            "[ACP-LIFECYCLE] Agent does not support session/list, "
            "skipping session resume"
        )
        return None
    if caps.get("resume") is None:
        logger.info(
            "[ACP-LIFECYCLE] Agent does not support session/resume, "
            "skipping session resume"
        )
        return None

    # Look for sessions tied to this workspace directory.
    candidates = self._list_sessions(cwd, timeout=min(timeout, 10.0))
    if not candidates:
        return None

    # Take the head of the list — assumed most-recent-first ordering.
    newest = candidates[0]
    newest_id = newest.get("sessionId")
    if not newest_id:
        logger.warning(
            "[ACP-LIFECYCLE] session/list returned session without sessionId"
        )
        return None

    logger.info(
        f"[ACP-LIFECYCLE] Found {len(candidates)} existing session(s), "
        f"attempting resume of acp_session={newest_id} "
        f"title={newest.get('title')} pod={self._pod_name}"
    )
    try:
        return self._resume_session(newest_id, cwd, timeout)
    except Exception as exc:
        # Resume failure is non-fatal: caller creates a new session instead.
        logger.warning(
            f"[ACP-LIFECYCLE] session/resume failed for "
            f"acp_session={newest_id}: {exc}, "
            f"falling back to session/new"
        )
        return None
def send_message(
self,
message: str,
@@ -423,6 +648,51 @@ class ACPExecClient:
session_id = self._state.current_session.session_id
packet_logger = get_packet_logger()
self._prompt_count += 1
prompt_num = self._prompt_count
# Check WebSocket and reader thread health before sending
ws_open = self._ws_client is not None and self._ws_client.is_open()
reader_alive = (
self._reader_thread is not None and self._reader_thread.is_alive()
)
queue_size_before = self._response_queue.qsize()
logger.info(
f"[ACP-SEND] === Prompt #{prompt_num} START === "
f"acp_session={session_id} pod={self._pod_name} "
f"ws_open={ws_open} reader_alive={reader_alive} "
f"queue_size_before_drain={queue_size_before}"
)
# Drain any leftover messages from the queue before sending prompt.
# These are messages that arrived between the previous prompt's
# completion and this prompt's start (e.g., session_info_update,
# available_commands_update).
drained_count = 0
while not self._response_queue.empty():
try:
stale_msg = self._response_queue.get_nowait()
drained_count += 1
stale_method = stale_msg.get("method")
stale_id = stale_msg.get("id")
stale_update_type = None
if stale_method == "session/update":
stale_update = stale_msg.get("params", {}).get("update", {})
stale_update_type = stale_update.get("sessionUpdate")
logger.info(
f"[ACP-SEND] Drained stale message #{drained_count}: "
f"id={stale_id} method={stale_method} "
f"update_type={stale_update_type}"
)
except Empty:
break
if drained_count > 0:
logger.info(
f"[ACP-SEND] Drained {drained_count} stale messages from queue "
f"before prompt #{prompt_num}"
)
# Log the start of message processing
packet_logger.log_raw(
@@ -431,10 +701,14 @@ class ACPExecClient:
"session_id": session_id,
"pod": self._pod_name,
"namespace": self._namespace,
"prompt_num": prompt_num,
"message_preview": (
message[:200] + "..." if len(message) > 200 else message
),
"timeout": timeout,
"queue_drained": drained_count,
"ws_open": ws_open,
"reader_alive": reader_alive,
},
)
@@ -445,18 +719,35 @@ class ACPExecClient:
}
request_id = self._send_request("session/prompt", params)
logger.info(
f"[ACP-SEND] Sent session/prompt: request_id={request_id} "
f"acp_session={session_id} prompt_num={prompt_num}"
)
start_time = time.time()
last_event_time = time.time() # Track time since last event for keepalive
last_event_time = time.time()
events_yielded = 0
messages_processed = 0
completion_reason = "unknown"
while True:
remaining = timeout - (time.time() - start_time)
if remaining <= 0:
completion_reason = "timeout"
logger.warning(
f"[ACP-SEND] TIMEOUT: prompt #{prompt_num} "
f"acp_session={session_id} request_id={request_id} "
f"elapsed_ms={(time.time() - start_time) * 1000:.0f} "
f"events_yielded={events_yielded} "
f"messages_processed={messages_processed}"
)
packet_logger.log_raw(
"ACP-TIMEOUT-K8S",
{
"session_id": session_id,
"prompt_num": prompt_num,
"elapsed_ms": (time.time() - start_time) * 1000,
"events_yielded": events_yielded,
"messages_processed": messages_processed,
},
)
yield Error(code=-1, message="Timeout waiting for response")
@@ -464,26 +755,124 @@ class ACPExecClient:
try:
message_data = self._response_queue.get(timeout=min(remaining, 1.0))
last_event_time = time.time() # Reset keepalive timer on event
last_event_time = time.time()
messages_processed += 1
# Log every dequeued message with comprehensive detail
msg_id = message_data.get("id")
msg_method = message_data.get("method")
update_type = None
if msg_method == "session/update":
update = message_data.get("params", {}).get("update", {})
update_type = update.get("sessionUpdate")
logger.info(
f"[ACP-SEND] Dequeued #{messages_processed}: "
f"id={msg_id}({type(msg_id).__name__}) "
f"method={msg_method} update_type={update_type} "
f"request_id={request_id} id_match={msg_id == request_id} "
f"acp_session={session_id} prompt_num={prompt_num} "
f"queue_remaining={self._response_queue.qsize()}"
)
except Empty:
# Check if reader thread is still alive
if (
self._reader_thread is not None
and not self._reader_thread.is_alive()
):
completion_reason = "reader_thread_dead"
# Drain any final messages the reader flushed before dying
found_response = False
while not self._response_queue.empty():
try:
final_msg = self._response_queue.get_nowait()
logger.info(
f"[ACP-SEND] Final drain: id={final_msg.get('id')} "
f"method={final_msg.get('method')}"
)
if final_msg.get("id") == request_id:
found_response = True
if "error" in final_msg:
error_data = final_msg["error"]
yield Error(
code=error_data.get("code", -1),
message=error_data.get(
"message", "Unknown error"
),
)
else:
result = final_msg.get("result", {})
try:
yield PromptResponse.model_validate(result)
except ValidationError:
pass
break
except Empty:
break
logger.warning(
f"[ACP-SEND] Reader thread DEAD: prompt #{prompt_num} "
f"acp_session={session_id} request_id={request_id} "
f"found_response={found_response} "
f"events_yielded={events_yielded} "
f"messages_processed={messages_processed}"
)
packet_logger.log_raw(
"ACP-CONNECTION-LOST-K8S",
{
"session_id": session_id,
"prompt_num": prompt_num,
"events_yielded": events_yielded,
"found_response_in_drain": found_response,
},
)
break
# Check if we need to send an SSE keepalive
idle_time = time.time() - last_event_time
if idle_time >= SSE_KEEPALIVE_INTERVAL:
logger.info(
f"[ACP-SEND] SSE keepalive: prompt #{prompt_num} "
f"acp_session={session_id} idle={idle_time:.1f}s "
f"elapsed={(time.time() - start_time):.1f}s "
f"events_yielded={events_yielded} "
f"messages_processed={messages_processed} "
f"ws_open={self._ws_client is not None and self._ws_client.is_open()} "
f"reader_alive={self._reader_thread is not None and self._reader_thread.is_alive()}"
)
packet_logger.log_raw(
"SSE-KEEPALIVE-YIELD",
{
"session_id": session_id,
"prompt_num": prompt_num,
"idle_seconds": idle_time,
},
)
yield SSEKeepalive()
last_event_time = time.time() # Reset after yielding keepalive
last_event_time = time.time()
continue
# Check for response to our prompt request
if message_data.get("id") == request_id:
# Check for JSON-RPC response to our prompt request.
msg_id = message_data.get("id")
is_response = "method" not in message_data and (
msg_id == request_id
or (msg_id is not None and str(msg_id) == str(request_id))
)
if is_response and msg_id != request_id:
logger.warning(
f"[ACP-SEND] ID type mismatch: "
f"got {type(msg_id).__name__}({msg_id}), "
f"expected {type(request_id).__name__}({request_id})"
)
if is_response:
completion_reason = "jsonrpc_response"
if "error" in message_data:
error_data = message_data["error"]
completion_reason = "jsonrpc_error"
logger.warning(
f"[ACP-SEND] JSON-RPC ERROR response: prompt #{prompt_num} "
f"acp_session={session_id} request_id={request_id} "
f"error={error_data}"
)
packet_logger.log_jsonrpc_response(
request_id, error=error_data, context="k8s"
)
@@ -493,6 +882,13 @@ class ACPExecClient:
)
else:
result = message_data.get("result", {})
logger.info(
f"[ACP-SEND] PromptResponse via JSON-RPC: "
f"prompt #{prompt_num} acp_session={session_id} "
f"request_id={request_id} "
f"stop_reason={result.get('stopReason')} "
f"result_keys={list(result.keys())}"
)
packet_logger.log_jsonrpc_response(
request_id, result=result, context="k8s"
)
@@ -504,19 +900,32 @@ class ACPExecClient:
events_yielded += 1
yield prompt_response
except ValidationError as e:
logger.error(
f"[ACP-SEND] PromptResponse VALIDATION FAILED: "
f"prompt #{prompt_num} error={e} result={result}"
)
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"type": "prompt_response", "error": str(e)},
)
# Log completion summary
# Log completion
elapsed_ms = (time.time() - start_time) * 1000
logger.info(
f"[ACP-SEND] === Prompt #{prompt_num} COMPLETE === "
f"reason={completion_reason} acp_session={session_id} "
f"request_id={request_id} events={events_yielded} "
f"messages={messages_processed} elapsed={elapsed_ms:.0f}ms"
)
packet_logger.log_raw(
"ACP-SEND-MESSAGE-COMPLETE-K8S",
{
"session_id": session_id,
"prompt_num": prompt_num,
"events_yielded": events_yielded,
"messages_processed": messages_processed,
"elapsed_ms": elapsed_ms,
"completion_reason": completion_reason,
},
)
break
@@ -525,26 +934,60 @@ class ACPExecClient:
if message_data.get("method") == "session/update":
params_data = message_data.get("params", {})
update = params_data.get("update", {})
update_type_val = update.get("sessionUpdate")
# Log the notification
packet_logger.log_jsonrpc_notification(
"session/update",
{"update_type": update.get("sessionUpdate")},
{"update_type": update_type_val, "prompt_num": prompt_num},
context="k8s",
)
prompt_complete = False
for event in self._process_session_update(update):
events_yielded += 1
# Log each yielded event
event_type = self._get_event_type_name(event)
packet_logger.log_acp_event_yielded(event_type, event)
yield event
if isinstance(event, PromptResponse):
prompt_complete = True
break
if prompt_complete:
completion_reason = "prompt_response_via_notification"
elapsed_ms = (time.time() - start_time) * 1000
logger.info(
f"[ACP-SEND] === Prompt #{prompt_num} COMPLETE === "
f"reason={completion_reason} acp_session={session_id} "
f"request_id={request_id} events={events_yielded} "
f"messages={messages_processed} elapsed={elapsed_ms:.0f}ms"
)
packet_logger.log_raw(
"ACP-SEND-MESSAGE-COMPLETE-K8S",
{
"session_id": session_id,
"prompt_num": prompt_num,
"events_yielded": events_yielded,
"messages_processed": messages_processed,
"elapsed_ms": elapsed_ms,
"completion_reason": completion_reason,
},
)
break
# Handle requests from agent - send error response
elif "method" in message_data and "id" in message_data:
logger.info(
f"[ACP-SEND] Agent request (unsupported): "
f"method={message_data['method']} id={message_data['id']} "
f"prompt_num={prompt_num} acp_session={session_id}"
)
packet_logger.log_raw(
"ACP-UNSUPPORTED-REQUEST-K8S",
{"method": message_data["method"], "id": message_data["id"]},
{
"method": message_data["method"],
"id": message_data["id"],
"prompt_num": prompt_num,
},
)
self._send_error_response(
message_data["id"],
@@ -552,6 +995,18 @@ class ACPExecClient:
f"Method not supported: {message_data['method']}",
)
else:
# Message didn't match any handler
logger.warning(
f"[ACP-SEND] UNHANDLED message: "
f"id={message_data.get('id')} "
f"method={message_data.get('method')} "
f"keys={list(message_data.keys())} "
f"request_id={request_id} prompt_num={prompt_num} "
f"acp_session={session_id} "
f"raw_preview={json.dumps(message_data)[:300]}"
)
def _get_event_type_name(self, event: ACPEvent) -> str:
"""Get the type name for an ACP event."""
if isinstance(event, AgentMessageChunk):
@@ -641,6 +1096,20 @@ class ACPExecClient:
{"update_type": update_type, "error": str(e), "update": update},
)
elif update_type == "prompt_response":
# Some ACP versions send PromptResponse as a session/update notification
# rather than (or in addition to) a JSON-RPC response.
logger.info(
"[ACP] Received prompt_response via session/update notification"
)
try:
yield PromptResponse.model_validate(update)
except ValidationError as e:
packet_logger.log_raw(
"ACP-VALIDATION-ERROR-K8S",
{"update_type": update_type, "error": str(e), "update": update},
)
elif update_type == "available_commands_update":
# Skip command updates
packet_logger.log_raw(

View File

@@ -50,6 +50,7 @@ from pathlib import Path
from uuid import UUID
from uuid import uuid4
from acp.schema import PromptResponse
from kubernetes import client # type: ignore
from kubernetes import config
from kubernetes.client.rest import ApiException # type: ignore
@@ -97,6 +98,10 @@ from onyx.utils.logger import setup_logger
logger = setup_logger()
# API server pod hostname — used to identify which replica is handling a request.
# In K8s, HOSTNAME is set to the pod name (e.g., "api-server-dpgg7").
_API_SERVER_HOSTNAME = os.environ.get("HOSTNAME", "unknown")
# Constants for pod configuration
# Note: Next.js ports are dynamically allocated from SANDBOX_NEXTJS_PORT_START to
# SANDBOX_NEXTJS_PORT_END range, with one port per session.
@@ -353,6 +358,10 @@ class KubernetesSandboxManager(SandboxManager):
self._agent_instructions_template_path = build_dir / "AGENTS.template.md"
self._skills_path = Path(__file__).parent / "docker" / "skills"
# Track ACP exec clients in memory - keyed by (sandbox_id, session_id) tuple
# Each session within a sandbox has its own ACP client (WebSocket connection)
self._acp_clients: dict[tuple[UUID, UUID], ACPExecClient] = {}
logger.info(
f"KubernetesSandboxManager initialized: "
f"namespace={self._namespace}, image={self._image}"
@@ -1161,6 +1170,20 @@ done
Args:
sandbox_id: The sandbox ID to terminate
"""
# Stop all ACP clients for this sandbox (keyed by (sandbox_id, session_id))
clients_to_stop = [
(key, cl) for key, cl in self._acp_clients.items() if key[0] == sandbox_id
]
for key, cl in clients_to_stop:
try:
cl.stop()
del self._acp_clients[key]
except Exception as e:
logger.warning(
f"Failed to stop ACP client for sandbox {sandbox_id}, "
f"session {key[1]}: {e}"
)
# Clean up Kubernetes resources (needs string for pod/service names)
self._cleanup_kubernetes_resources(str(sandbox_id))
@@ -1403,6 +1426,18 @@ echo "Session workspace setup complete"
nextjs_port: Optional port where Next.js server is running (unused in K8s,
we use PID file instead)
"""
# Stop ACP client for this session
client_key = (sandbox_id, session_id)
acp_client = self._acp_clients.pop(client_key, None)
if acp_client:
try:
acp_client.stop()
logger.debug(f"Stopped ACP client for session {session_id}")
except Exception as e:
logger.warning(
f"Failed to stop ACP client for session {session_id}: {e}"
)
pod_name = self._get_pod_name(str(sandbox_id))
session_path = f"/workspace/sessions/{session_id}"
@@ -1830,34 +1865,100 @@ echo "Session config regeneration complete"
pod_name = self._get_pod_name(str(sandbox_id))
session_path = f"/workspace/sessions/{session_id}"
# Log ACP client creation
packet_logger.log_acp_client_start(
sandbox_id, session_id, session_path, context="k8s"
)
# Get or create ACP client for this session
client_key = (sandbox_id, session_id)
client = self._acp_clients.get(client_key)
total_cached = len(self._acp_clients)
exec_client = ACPExecClient(
pod_name=pod_name,
namespace=self._namespace,
container="sandbox",
)
if client is None or not client.is_running:
cache_reason = "not_found" if client is None else "not_running"
# Clean up stale client if it exists but is no longer running
if client is not None:
logger.info(
f"[SANDBOX-ACP] Cleaning up stale client: "
f"sandbox={sandbox_id} session={session_id} "
f"is_running={client.is_running} "
f"acp_session={client.session_id} "
f"api_pod={_API_SERVER_HOSTNAME}"
)
try:
client.stop()
except Exception:
pass
# Log ACP client creation
packet_logger.log_acp_client_start(
sandbox_id, session_id, session_path, context="k8s"
)
logger.info(
f"[SANDBOX-ACP] Creating NEW ACP client: "
f"sandbox={sandbox_id} session={session_id} "
f"reason={cache_reason} pod={pod_name} cwd={session_path} "
f"total_cached_clients={total_cached} "
f"api_pod={_API_SERVER_HOSTNAME}"
)
# Create and start ACP client for this session.
# start() will try to resume an existing session from the pod
# (handles multi-replica: another API pod may have created
# the session earlier).
client = ACPExecClient(
pod_name=pod_name,
namespace=self._namespace,
container="sandbox",
)
client.start(cwd=session_path)
self._acp_clients[client_key] = client
logger.info(
f"[SANDBOX-ACP] ACP client created and cached: "
f"sandbox={sandbox_id} session={session_id} "
f"acp_session={client.session_id} pod={pod_name} "
f"total_cached_clients={len(self._acp_clients)} "
f"api_pod={_API_SERVER_HOSTNAME}"
)
else:
logger.info(
f"[SANDBOX-ACP] REUSING cached ACP client: "
f"sandbox={sandbox_id} session={session_id} "
f"acp_session={client.session_id} pod={pod_name} "
f"is_running={client.is_running} "
f"total_cached_clients={total_cached} "
f"api_pod={_API_SERVER_HOSTNAME}"
)
# Log the send_message call at sandbox manager level
packet_logger.log_session_start(session_id, sandbox_id, message)
events_count = 0
got_prompt_response = False
try:
exec_client.start(cwd=session_path)
for event in exec_client.send_message(message):
for event in client.send_message(message):
events_count += 1
if isinstance(event, PromptResponse):
got_prompt_response = True
yield event
# Log successful completion
logger.info(
f"[SANDBOX-ACP] send_message completed: "
f"sandbox={sandbox_id} session={session_id} "
f"acp_session={client.session_id} "
f"events_count={events_count} "
f"got_prompt_response={got_prompt_response}"
)
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
)
except GeneratorExit:
# Generator was closed by consumer (client disconnect, timeout, broken pipe)
# This is the most common failure mode for SSE streaming
logger.warning(
f"[SANDBOX-ACP] GeneratorExit during send_message: "
f"sandbox={sandbox_id} session={session_id} "
f"acp_session={client.session_id} "
f"events_count={events_count} "
f"got_prompt_response={got_prompt_response}"
)
packet_logger.log_session_end(
session_id,
success=False,
@@ -1866,7 +1967,12 @@ echo "Session config regeneration complete"
)
raise
except Exception as e:
# Log failure from normal exceptions
logger.error(
f"[SANDBOX-ACP] Exception during send_message: "
f"sandbox={sandbox_id} session={session_id} "
f"acp_session={client.session_id} "
f"events_count={events_count} error={e}"
)
packet_logger.log_session_end(
session_id,
success=False,
@@ -1875,8 +1981,11 @@ echo "Session config regeneration complete"
)
raise
except BaseException as e:
# Log failure from other base exceptions (SystemExit, KeyboardInterrupt, etc.)
exception_type = type(e).__name__
logger.error(
f"[SANDBOX-ACP] {exception_type} during send_message: "
f"sandbox={sandbox_id} session={session_id} error={e}"
)
packet_logger.log_session_end(
session_id,
success=False,
@@ -1884,10 +1993,6 @@ echo "Session config regeneration complete"
events_count=events_count,
)
raise
finally:
exec_client.stop()
# Log client stop
packet_logger.log_acp_client_stop(sandbox_id, session_id, context="k8s")
def list_directory(
self, sandbox_id: UUID, session_id: UUID, path: str

View File

@@ -0,0 +1,360 @@
"""Unit tests for ACPExecClient caching behavior in KubernetesSandboxManager.
These tests verify that the KubernetesSandboxManager correctly caches
ACPExecClient instances per (sandbox_id, session_id) pair, reuses them
across send_message calls, replaces dead clients, and cleans them up
on terminate/cleanup.
All external dependencies (K8s, WebSockets, packet logging) are mocked.
"""
from collections.abc import Generator
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import patch
from uuid import UUID
from uuid import uuid4
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# The fully-qualified path to the module under test, used for patching
_K8S_MODULE = "onyx.server.features.build.sandbox.kubernetes.kubernetes_sandbox_manager"
_ACP_CLIENT_CLASS = f"{_K8S_MODULE}.ACPExecClient"
_GET_PACKET_LOGGER = f"{_K8S_MODULE}.get_packet_logger"
def _make_mock_event() -> MagicMock:
"""Create a mock ACP event."""
return MagicMock(name="mock_acp_event")
def _make_mock_client(is_running: bool = True) -> MagicMock:
"""Create a mock ACPExecClient with configurable is_running property."""
mock_client = MagicMock()
type(mock_client).is_running = property(lambda _self: is_running)
mock_client.start.return_value = "mock-session-id"
mock_event = _make_mock_event()
mock_client.send_message.return_value = iter([mock_event])
mock_client.stop.return_value = None
return mock_client
def _drain_generator(gen: Generator[Any, None, None]) -> list[Any]:
"""Consume a generator and return all yielded values as a list."""
return list(gen)
# ---------------------------------------------------------------------------
# Fixture: fresh KubernetesSandboxManager instance
# ---------------------------------------------------------------------------
@pytest.fixture()
def manager() -> Generator[Any, None, None]:
    """Yield a fresh KubernetesSandboxManager with all externals mocked.

    Responsibilities:
    1. Reset the singleton ``_instance`` so each test gets a fresh manager.
    2. Mock kubernetes ``config``/``client`` to prevent real K8s calls.
    3. Mock ``get_packet_logger`` to prevent logging side effects.
    """
    with (
        patch(f"{_K8S_MODULE}.config") as fake_config,
        patch(f"{_K8S_MODULE}.client") as fake_k8s,
        patch(f"{_K8S_MODULE}.k8s_stream"),
        patch(_GET_PACKET_LOGGER) as fake_logger_factory,
    ):
        # Packet logging becomes an inert mock.
        fake_logger_factory.return_value = MagicMock()
        # In-cluster config loading is a no-op; its exception type still exists.
        fake_config.load_incluster_config.return_value = None
        fake_config.ConfigException = Exception
        # Every K8s API constructor hands back a mock.
        for api_name in ("ApiClient", "CoreV1Api", "BatchV1Api", "NetworkingV1Api"):
            getattr(fake_k8s, api_name).return_value = MagicMock()
        # Import late so the patches above are active when the class loads.
        from onyx.server.features.build.sandbox.kubernetes.kubernetes_sandbox_manager import (
            KubernetesSandboxManager,
        )

        KubernetesSandboxManager._instance = None
        mgr = KubernetesSandboxManager()
        # Defensive: guarantee the ACP client cache exists even if the
        # implementation under test did not initialize it.
        if not hasattr(mgr, "_acp_clients"):
            mgr._acp_clients = {}
        yield mgr
        # Reset the singleton so the next test starts clean.
        KubernetesSandboxManager._instance = None
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_send_message_creates_client_on_first_call(manager: Any) -> None:
    """First send_message() call constructs an ACPExecClient, starts it,
    caches it under (sandbox_id, session_id), and yields its events."""
    sandbox_id: UUID = uuid4()
    session_id: UUID = uuid4()
    prompt = "hello world"
    acp_event = _make_mock_event()
    client = _make_mock_client(is_running=True)
    client.send_message.return_value = iter([acp_event])
    with patch(_ACP_CLIENT_CLASS, return_value=client) as client_cls:
        yielded = _drain_generator(
            manager.send_message(sandbox_id, session_id, prompt)
        )
    # Exactly one client was instantiated.
    client_cls.assert_called_once()
    # start() received the session-scoped workspace path.
    client.start.assert_called_once_with(cwd=f"/workspace/sessions/{session_id}")
    # The prompt was forwarded verbatim.
    client.send_message.assert_called_once_with(prompt)
    # Our event came back (keepalives and friends may be interleaved).
    assert len(yielded) >= 1
    assert acp_event in yielded
    # The client is now cached under (sandbox_id, session_id).
    cache_key = (sandbox_id, session_id)
    assert cache_key in manager._acp_clients
    assert manager._acp_clients[cache_key] is client
def test_send_message_reuses_cached_client(manager: Any) -> None:
    """A second call with the same (sandbox_id, session_id) must reuse the
    cached client rather than constructing a new one."""
    sandbox_id: UUID = uuid4()
    session_id: UUID = uuid4()
    first_prompt, second_prompt = "first message", "second message"
    event_one = _make_mock_event()
    event_two = _make_mock_event()
    client = _make_mock_client(is_running=True)
    # One fresh iterator per call so each drain sees its own event.
    client.send_message.side_effect = [iter([event_one]), iter([event_two])]
    with patch(_ACP_CLIENT_CLASS, return_value=client) as client_cls:
        first_events = _drain_generator(
            manager.send_message(sandbox_id, session_id, first_prompt)
        )
        second_events = _drain_generator(
            manager.send_message(sandbox_id, session_id, second_prompt)
        )
    # Constructed and started exactly once, on the first call.
    client_cls.assert_called_once()
    client.start.assert_called_once()
    # Both prompts were routed through the same client.
    assert client.send_message.call_count == 2
    client.send_message.assert_any_call(first_prompt)
    client.send_message.assert_any_call(second_prompt)
    # Each call surfaced its own event.
    assert event_one in first_events
    assert event_two in second_events
def test_send_message_replaces_dead_client(manager: Any) -> None:
    """A cached client reporting is_running == False must be replaced by a
    freshly constructed, started, and cached client."""
    sandbox_id: UUID = uuid4()
    session_id: UUID = uuid4()
    prompt = "test message"
    cache_key = (sandbox_id, session_id)
    # Seed the cache with a client that reports itself dead.
    manager._acp_clients[cache_key] = _make_mock_client(is_running=False)
    # Prepare the replacement the patched constructor will return.
    replacement_event = _make_mock_event()
    replacement = _make_mock_client(is_running=True)
    replacement.send_message.return_value = iter([replacement_event])
    with patch(_ACP_CLIENT_CLASS, return_value=replacement) as client_cls:
        yielded = _drain_generator(
            manager.send_message(sandbox_id, session_id, prompt)
        )
    # The dead client triggered construction of a replacement.
    client_cls.assert_called_once()
    replacement.start.assert_called_once()
    replacement.send_message.assert_called_once_with(prompt)
    # The cache now points at the replacement.
    assert manager._acp_clients[cache_key] is replacement
    # Events from the replacement were yielded.
    assert replacement_event in yielded
def test_send_message_different_sessions_get_different_clients(
    manager: Any,
) -> None:
    """Distinct session_ids under the same sandbox must each get their own
    client, cached under separate keys."""
    sandbox_id: UUID = uuid4()
    first_session: UUID = uuid4()
    second_session: UUID = uuid4()
    prompt = "test"
    client_one = _make_mock_client(is_running=True)
    client_two = _make_mock_client(is_running=True)
    event_one = _make_mock_event()
    event_two = _make_mock_event()
    client_one.send_message.return_value = iter([event_one])
    client_two.send_message.return_value = iter([event_two])
    with patch(
        _ACP_CLIENT_CLASS, side_effect=[client_one, client_two]
    ) as client_cls:
        events_one = _drain_generator(
            manager.send_message(sandbox_id, first_session, prompt)
        )
        events_two = _drain_generator(
            manager.send_message(sandbox_id, second_session, prompt)
        )
    # One construction per session.
    assert client_cls.call_count == 2
    client_one.start.assert_called_once()
    client_two.start.assert_called_once()
    # Independent cache entries per (sandbox, session) key.
    assert manager._acp_clients[(sandbox_id, first_session)] is client_one
    assert manager._acp_clients[(sandbox_id, second_session)] is client_two
    # Events were routed back to the correct caller.
    assert event_one in events_one
    assert event_two in events_two
def test_terminate_stops_all_sandbox_clients(manager: Any) -> None:
    """terminate(sandbox_id) stops and evicts every cached client that
    belongs to the sandbox."""
    sandbox_id: UUID = uuid4()
    sessions = (uuid4(), uuid4())
    clients = tuple(_make_mock_client(is_running=True) for _ in sessions)
    for session, client in zip(sessions, clients):
        manager._acp_clients[(sandbox_id, session)] = client
    # Keep terminate() away from real K8s resources.
    with patch.object(manager, "_cleanup_kubernetes_resources"):
        manager.terminate(sandbox_id)
    # Every session client was stopped and evicted from the cache.
    for session, client in zip(sessions, clients):
        client.stop.assert_called_once()
        assert (sandbox_id, session) not in manager._acp_clients
def test_terminate_leaves_other_sandbox_clients(manager: Any) -> None:
    """Terminating sandbox A must not stop or evict sandbox B's clients."""
    sandbox_a, sandbox_b = uuid4(), uuid4()
    session_a, session_b = uuid4(), uuid4()
    client_a = _make_mock_client(is_running=True)
    client_b = _make_mock_client(is_running=True)
    manager._acp_clients[(sandbox_a, session_a)] = client_a
    manager._acp_clients[(sandbox_b, session_b)] = client_b
    # Terminate only sandbox A, with real K8s cleanup stubbed out.
    with patch.object(manager, "_cleanup_kubernetes_resources"):
        manager.terminate(sandbox_a)
    # Sandbox A's client was stopped and removed...
    client_a.stop.assert_called_once()
    assert (sandbox_a, session_a) not in manager._acp_clients
    # ...while sandbox B's client remains cached and untouched.
    client_b.stop.assert_not_called()
    assert manager._acp_clients.get((sandbox_b, session_b)) is client_b
def test_cleanup_session_stops_session_client(manager: Any) -> None:
    """cleanup_session_workspace() stops the session's cached client and
    removes its cache entry."""
    sandbox_id: UUID = uuid4()
    session_id: UUID = uuid4()
    cache_key = (sandbox_id, session_id)
    cached_client = _make_mock_client(is_running=True)
    manager._acp_clients[cache_key] = cached_client
    # Stub out the exec-into-pod plumbing behind the cleanup script.
    with patch.object(manager, "_stream_core_api") as stream_api:
        stream_api.connect_get_namespaced_pod_exec = MagicMock()
        with patch(f"{_K8S_MODULE}.k8s_stream", return_value="cleanup ok"):
            manager.cleanup_session_workspace(sandbox_id, session_id)
    # The client was stopped and its cache entry removed.
    cached_client.stop.assert_called_once()
    assert cache_key not in manager._acp_clients
def test_cleanup_session_handles_no_cached_client(manager: Any) -> None:
    """cleanup_session_workspace() must not raise when no client is cached
    for the (sandbox_id, session_id) pair."""
    sandbox_id: UUID = uuid4()
    session_id: UUID = uuid4()
    cache_key = (sandbox_id, session_id)
    # Precondition: nothing cached for this pair.
    assert cache_key not in manager._acp_clients
    # Stub out the exec-into-pod plumbing behind the cleanup script.
    with patch.object(manager, "_stream_core_api") as stream_api:
        stream_api.connect_get_namespaced_pod_exec = MagicMock()
        with patch(f"{_K8S_MODULE}.k8s_stream", return_value="cleanup ok"):
            # Must complete without raising despite the missing entry.
            manager.cleanup_session_workspace(sandbox_id, session_id)
    # Cache remains empty for this key.
    assert cache_key not in manager._acp_clients