mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-02-25 11:45:47 +00:00
Compare commits
21 Commits
ci_python_
...
experiment
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4143dfbb5d | ||
|
|
5ec8aff704 | ||
|
|
625b472df4 | ||
|
|
1878f97444 | ||
|
|
2690918386 | ||
|
|
d8bc7ae1dc | ||
|
|
2d5b829e63 | ||
|
|
495e2f7c52 | ||
|
|
189cc5bc3c | ||
|
|
23ec38662e | ||
|
|
57d741c5b3 | ||
|
|
021af74739 | ||
|
|
adaca6a353 | ||
|
|
4cd07d7bbc | ||
|
|
bf3c98142d | ||
|
|
f023599618 | ||
|
|
c55cb899f7 | ||
|
|
9b8a6e60b7 | ||
|
|
dd9d201b51 | ||
|
|
c545819aa6 | ||
|
|
960ee228bf |
@@ -4,8 +4,9 @@ This client runs `opencode acp` directly in the sandbox pod via kubernetes exec,
|
||||
using stdin/stdout for JSON-RPC communication. This bypasses the HTTP server
|
||||
and uses the native ACP subprocess protocol.
|
||||
|
||||
This module includes comprehensive logging for debugging ACP communication.
|
||||
Enable logging by setting LOG_LEVEL=DEBUG or BUILD_PACKET_LOGGING=true.
|
||||
Each message creates an ephemeral client (start → resume_or_create_session →
|
||||
send_message → stop) to prevent concurrent processes from corrupting
|
||||
opencode's flat file session storage.
|
||||
|
||||
Usage:
|
||||
client = ACPExecClient(
|
||||
@@ -13,7 +14,8 @@ Usage:
|
||||
namespace="onyx-sandboxes",
|
||||
)
|
||||
client.start(cwd="/workspace")
|
||||
for event in client.send_message("What files are here?"):
|
||||
session_id = client.resume_or_create_session(cwd="/workspace/sessions/abc")
|
||||
for event in client.send_message("What files are here?", session_id=session_id):
|
||||
print(event)
|
||||
client.stop()
|
||||
"""
|
||||
@@ -100,7 +102,7 @@ class ACPClientState:
|
||||
"""Internal state for the ACP client."""
|
||||
|
||||
initialized: bool = False
|
||||
current_session: ACPSession | None = None
|
||||
sessions: dict[str, ACPSession] = field(default_factory=dict)
|
||||
next_request_id: int = 0
|
||||
agent_capabilities: dict[str, Any] = field(default_factory=dict)
|
||||
agent_info: dict[str, Any] = field(default_factory=dict)
|
||||
@@ -155,16 +157,16 @@ class ACPExecClient:
|
||||
self._k8s_client = client.CoreV1Api()
|
||||
return self._k8s_client
|
||||
|
||||
def start(self, cwd: str = "/workspace", timeout: float = 30.0) -> str:
|
||||
"""Start the agent process via exec and initialize a session.
|
||||
def start(self, cwd: str = "/workspace", timeout: float = 30.0) -> None:
|
||||
"""Start the agent process via exec and initialize the ACP connection.
|
||||
|
||||
Only performs the ACP `initialize` handshake. Sessions are created
|
||||
separately via `resume_or_create_session()`.
|
||||
|
||||
Args:
|
||||
cwd: Working directory for the agent
|
||||
cwd: Working directory for the `opencode acp` process
|
||||
timeout: Timeout for initialization
|
||||
|
||||
Returns:
|
||||
The session ID
|
||||
|
||||
Raises:
|
||||
RuntimeError: If startup fails
|
||||
"""
|
||||
@@ -176,6 +178,8 @@ class ACPExecClient:
|
||||
# Start opencode acp via exec
|
||||
exec_command = ["opencode", "acp", "--cwd", cwd]
|
||||
|
||||
logger.info(f"[ACP] Starting client: pod={self._pod_name} cwd={cwd}")
|
||||
|
||||
try:
|
||||
self._ws_client = k8s_stream(
|
||||
k8s.connect_get_namespaced_pod_exec,
|
||||
@@ -201,15 +205,13 @@ class ACPExecClient:
|
||||
# Give process a moment to start
|
||||
time.sleep(0.5)
|
||||
|
||||
# Initialize ACP connection
|
||||
# Initialize ACP connection (no session creation)
|
||||
self._initialize(timeout=timeout)
|
||||
|
||||
# Create session
|
||||
session_id = self._create_session(cwd=cwd, timeout=timeout)
|
||||
|
||||
return session_id
|
||||
logger.info(f"[ACP] Client started: pod={self._pod_name}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[ACP] Client start failed: pod={self._pod_name} error={e}")
|
||||
self.stop()
|
||||
raise RuntimeError(f"Failed to start ACP exec client: {e}") from e
|
||||
|
||||
@@ -218,62 +220,61 @@ class ACPExecClient:
|
||||
buffer = ""
|
||||
packet_logger = get_packet_logger()
|
||||
|
||||
while not self._stop_reader.is_set():
|
||||
if self._ws_client is None:
|
||||
break
|
||||
|
||||
try:
|
||||
if self._ws_client.is_open():
|
||||
# Read available data
|
||||
self._ws_client.update(timeout=0.1)
|
||||
|
||||
# Read stdout (channel 1)
|
||||
data = self._ws_client.read_stdout(timeout=0.1)
|
||||
if data:
|
||||
buffer += data
|
||||
|
||||
# Process complete lines
|
||||
while "\n" in buffer:
|
||||
line, buffer = buffer.split("\n", 1)
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
message = json.loads(line)
|
||||
# Log the raw incoming message
|
||||
packet_logger.log_jsonrpc_raw_message(
|
||||
"IN", message, context="k8s"
|
||||
)
|
||||
self._response_queue.put(message)
|
||||
except json.JSONDecodeError:
|
||||
packet_logger.log_raw(
|
||||
"JSONRPC-PARSE-ERROR-K8S",
|
||||
{
|
||||
"raw_line": line[:500],
|
||||
"error": "JSON decode failed",
|
||||
},
|
||||
)
|
||||
logger.warning(
|
||||
f"Invalid JSON from agent: {line[:100]}"
|
||||
)
|
||||
|
||||
else:
|
||||
packet_logger.log_raw(
|
||||
"K8S-WEBSOCKET-CLOSED",
|
||||
{"pod": self._pod_name, "namespace": self._namespace},
|
||||
)
|
||||
try:
|
||||
while not self._stop_reader.is_set():
|
||||
if self._ws_client is None:
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
if not self._stop_reader.is_set():
|
||||
packet_logger.log_raw(
|
||||
"K8S-READER-ERROR",
|
||||
{"error": str(e), "pod": self._pod_name},
|
||||
)
|
||||
logger.debug(f"Reader error: {e}")
|
||||
break
|
||||
try:
|
||||
if self._ws_client.is_open():
|
||||
self._ws_client.update(timeout=0.1)
|
||||
|
||||
# Read stderr - log any agent errors
|
||||
stderr_data = self._ws_client.read_stderr(timeout=0.01)
|
||||
if stderr_data:
|
||||
logger.warning(
|
||||
f"[ACP] stderr pod={self._pod_name}: "
|
||||
f"{stderr_data.strip()[:500]}"
|
||||
)
|
||||
|
||||
# Read stdout
|
||||
data = self._ws_client.read_stdout(timeout=0.1)
|
||||
if data:
|
||||
buffer += data
|
||||
|
||||
while "\n" in buffer:
|
||||
line, buffer = buffer.split("\n", 1)
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
message = json.loads(line)
|
||||
packet_logger.log_jsonrpc_raw_message(
|
||||
"IN", message, context="k8s"
|
||||
)
|
||||
self._response_queue.put(message)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(
|
||||
f"[ACP] Invalid JSON from agent: "
|
||||
f"{line[:100]}"
|
||||
)
|
||||
|
||||
else:
|
||||
logger.warning(f"[ACP] WebSocket closed: pod={self._pod_name}")
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
if not self._stop_reader.is_set():
|
||||
logger.warning(f"[ACP] Reader error: {e}, pod={self._pod_name}")
|
||||
break
|
||||
finally:
|
||||
pass
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the exec session and clean up."""
|
||||
session_ids = list(self._state.sessions.keys())
|
||||
logger.info(
|
||||
f"[ACP] Stopping client: pod={self._pod_name} " f"sessions={session_ids}"
|
||||
)
|
||||
self._stop_reader.set()
|
||||
|
||||
if self._ws_client is not None:
|
||||
@@ -400,42 +401,159 @@ class ACPExecClient:
|
||||
if not session_id:
|
||||
raise RuntimeError("No session ID returned from session/new")
|
||||
|
||||
self._state.current_session = ACPSession(session_id=session_id, cwd=cwd)
|
||||
self._state.sessions[session_id] = ACPSession(session_id=session_id, cwd=cwd)
|
||||
logger.info(f"[ACP] Created session: acp_session={session_id} cwd={cwd}")
|
||||
|
||||
return session_id
|
||||
|
||||
def _list_sessions(self, cwd: str, timeout: float = 10.0) -> list[dict[str, Any]]:
|
||||
"""List available ACP sessions, filtered by working directory.
|
||||
|
||||
Returns:
|
||||
List of session info dicts with keys like 'sessionId', 'cwd', 'title'.
|
||||
Empty list if session/list is not supported or fails.
|
||||
"""
|
||||
try:
|
||||
request_id = self._send_request("session/list", {"cwd": cwd})
|
||||
result = self._wait_for_response(request_id, timeout)
|
||||
sessions = result.get("sessions", [])
|
||||
logger.info(f"[ACP] session/list: {len(sessions)} sessions for cwd={cwd}")
|
||||
return sessions
|
||||
except Exception as e:
|
||||
logger.info(f"[ACP] session/list unavailable: {e}")
|
||||
return []
|
||||
|
||||
def _resume_session(self, session_id: str, cwd: str, timeout: float = 30.0) -> str:
|
||||
"""Resume an existing ACP session.
|
||||
|
||||
Args:
|
||||
session_id: The ACP session ID to resume
|
||||
cwd: Working directory for the session
|
||||
timeout: Timeout for the resume request
|
||||
|
||||
Returns:
|
||||
The session ID
|
||||
|
||||
Raises:
|
||||
RuntimeError: If resume fails
|
||||
"""
|
||||
params = {
|
||||
"sessionId": session_id,
|
||||
"cwd": cwd,
|
||||
"mcpServers": [],
|
||||
}
|
||||
|
||||
request_id = self._send_request("session/resume", params)
|
||||
result = self._wait_for_response(request_id, timeout)
|
||||
|
||||
# The response should contain the session ID
|
||||
resumed_id = result.get("sessionId", session_id)
|
||||
self._state.sessions[resumed_id] = ACPSession(session_id=resumed_id, cwd=cwd)
|
||||
|
||||
logger.info(f"[ACP] Resumed session: acp_session={resumed_id} cwd={cwd}")
|
||||
return resumed_id
|
||||
|
||||
def _try_resume_existing_session(self, cwd: str, timeout: float) -> str | None:
|
||||
"""Try to find and resume an existing session for this workspace.
|
||||
|
||||
When multiple API server replicas connect to the same sandbox pod,
|
||||
a previous replica may have already created an ACP session for this
|
||||
workspace. This method discovers and resumes that session so the
|
||||
agent retains conversation context.
|
||||
|
||||
Args:
|
||||
cwd: Working directory to search for sessions
|
||||
timeout: Timeout for ACP requests
|
||||
|
||||
Returns:
|
||||
The resumed session ID, or None if no session could be resumed
|
||||
"""
|
||||
# Check if the agent supports session/list + session/resume
|
||||
session_caps = self._state.agent_capabilities.get("sessionCapabilities", {})
|
||||
supports_list = session_caps.get("list") is not None
|
||||
supports_resume = session_caps.get("resume") is not None
|
||||
|
||||
if not supports_list or not supports_resume:
|
||||
logger.debug("[ACP] Agent does not support session resume")
|
||||
return None
|
||||
|
||||
# List sessions for this workspace directory
|
||||
sessions = self._list_sessions(cwd, timeout=min(timeout, 10.0))
|
||||
if not sessions:
|
||||
return None
|
||||
|
||||
# Pick the most recent session (first in list, assuming sorted)
|
||||
target = sessions[0]
|
||||
target_id = target.get("sessionId")
|
||||
if not target_id:
|
||||
logger.warning("[ACP] session/list returned session without sessionId")
|
||||
return None
|
||||
|
||||
logger.info(
|
||||
f"[ACP] Resuming existing session: acp_session={target_id} "
|
||||
f"(found {len(sessions)})"
|
||||
)
|
||||
|
||||
try:
|
||||
return self._resume_session(target_id, cwd, timeout)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[ACP] session/resume failed for {target_id}: {e}, "
|
||||
f"falling back to session/new"
|
||||
)
|
||||
return None
|
||||
|
||||
def resume_or_create_session(self, cwd: str, timeout: float = 30.0) -> str:
|
||||
"""Resume a session from opencode's on-disk storage, or create a new one.
|
||||
|
||||
With ephemeral clients (one process per message), this always hits disk.
|
||||
Tries resume first to preserve conversation context, falls back to new.
|
||||
|
||||
Args:
|
||||
cwd: Working directory for the session
|
||||
timeout: Timeout for ACP requests
|
||||
|
||||
Returns:
|
||||
The ACP session ID
|
||||
"""
|
||||
if not self._state.initialized:
|
||||
raise RuntimeError("Client not initialized. Call start() first.")
|
||||
|
||||
# Try to resume from opencode's persisted storage
|
||||
resumed_id = self._try_resume_existing_session(cwd, timeout)
|
||||
if resumed_id:
|
||||
return resumed_id
|
||||
|
||||
# Create a new session
|
||||
return self._create_session(cwd=cwd, timeout=timeout)
|
||||
|
||||
def send_message(
|
||||
self,
|
||||
message: str,
|
||||
session_id: str,
|
||||
timeout: float = ACP_MESSAGE_TIMEOUT,
|
||||
) -> Generator[ACPEvent, None, None]:
|
||||
"""Send a message and stream response events.
|
||||
"""Send a message to a specific session and stream response events.
|
||||
|
||||
Args:
|
||||
message: The message content to send
|
||||
session_id: The ACP session ID to send the message to
|
||||
timeout: Maximum time to wait for complete response (defaults to ACP_MESSAGE_TIMEOUT env var)
|
||||
|
||||
Yields:
|
||||
Typed ACP schema event objects
|
||||
"""
|
||||
if self._state.current_session is None:
|
||||
raise RuntimeError("No active session. Call start() first.")
|
||||
|
||||
session_id = self._state.current_session.session_id
|
||||
if session_id not in self._state.sessions:
|
||||
raise RuntimeError(
|
||||
f"Unknown session {session_id}. "
|
||||
f"Known sessions: {list(self._state.sessions.keys())}"
|
||||
)
|
||||
packet_logger = get_packet_logger()
|
||||
|
||||
# Log the start of message processing
|
||||
packet_logger.log_raw(
|
||||
"ACP-SEND-MESSAGE-START-K8S",
|
||||
{
|
||||
"session_id": session_id,
|
||||
"pod": self._pod_name,
|
||||
"namespace": self._namespace,
|
||||
"message_preview": (
|
||||
message[:200] + "..." if len(message) > 200 else message
|
||||
),
|
||||
"timeout": timeout,
|
||||
},
|
||||
logger.info(
|
||||
f"[ACP] Sending prompt: "
|
||||
f"acp_session={session_id} pod={self._pod_name} "
|
||||
f"queue_backlog={self._response_queue.qsize()}"
|
||||
)
|
||||
|
||||
prompt_content = [{"type": "text", "text": message}]
|
||||
@@ -446,44 +564,88 @@ class ACPExecClient:
|
||||
|
||||
request_id = self._send_request("session/prompt", params)
|
||||
start_time = time.time()
|
||||
last_event_time = time.time() # Track time since last event for keepalive
|
||||
last_event_time = time.time()
|
||||
events_yielded = 0
|
||||
keepalive_count = 0
|
||||
completion_reason = "unknown"
|
||||
|
||||
while True:
|
||||
remaining = timeout - (time.time() - start_time)
|
||||
if remaining <= 0:
|
||||
packet_logger.log_raw(
|
||||
"ACP-TIMEOUT-K8S",
|
||||
{
|
||||
"session_id": session_id,
|
||||
"elapsed_ms": (time.time() - start_time) * 1000,
|
||||
},
|
||||
completion_reason = "timeout"
|
||||
logger.warning(
|
||||
f"[ACP] Prompt timeout: "
|
||||
f"acp_session={session_id} events={events_yielded}, "
|
||||
f"sending session/cancel"
|
||||
)
|
||||
try:
|
||||
self.cancel(session_id=session_id)
|
||||
except Exception as cancel_err:
|
||||
logger.warning(
|
||||
f"[ACP] session/cancel failed on timeout: {cancel_err}"
|
||||
)
|
||||
yield Error(code=-1, message="Timeout waiting for response")
|
||||
break
|
||||
|
||||
try:
|
||||
message_data = self._response_queue.get(timeout=min(remaining, 1.0))
|
||||
last_event_time = time.time() # Reset keepalive timer on event
|
||||
last_event_time = time.time()
|
||||
except Empty:
|
||||
# Check if we need to send an SSE keepalive
|
||||
# Check if reader thread is still alive
|
||||
if (
|
||||
self._reader_thread is not None
|
||||
and not self._reader_thread.is_alive()
|
||||
):
|
||||
completion_reason = "reader_thread_dead"
|
||||
# Drain any final messages the reader flushed before dying
|
||||
while not self._response_queue.empty():
|
||||
try:
|
||||
final_msg = self._response_queue.get_nowait()
|
||||
if final_msg.get("id") == request_id:
|
||||
if "error" in final_msg:
|
||||
error_data = final_msg["error"]
|
||||
yield Error(
|
||||
code=error_data.get("code", -1),
|
||||
message=error_data.get(
|
||||
"message", "Unknown error"
|
||||
),
|
||||
)
|
||||
else:
|
||||
result = final_msg.get("result", {})
|
||||
try:
|
||||
yield PromptResponse.model_validate(result)
|
||||
except ValidationError:
|
||||
pass
|
||||
break
|
||||
except Empty:
|
||||
break
|
||||
|
||||
logger.warning(
|
||||
f"[ACP] Reader thread dead: "
|
||||
f"acp_session={session_id} events={events_yielded}"
|
||||
)
|
||||
break
|
||||
|
||||
# Send SSE keepalive if idle
|
||||
idle_time = time.time() - last_event_time
|
||||
if idle_time >= SSE_KEEPALIVE_INTERVAL:
|
||||
packet_logger.log_raw(
|
||||
"SSE-KEEPALIVE-YIELD",
|
||||
{
|
||||
"session_id": session_id,
|
||||
"idle_seconds": idle_time,
|
||||
},
|
||||
)
|
||||
keepalive_count += 1
|
||||
yield SSEKeepalive()
|
||||
last_event_time = time.time() # Reset after yielding keepalive
|
||||
last_event_time = time.time()
|
||||
continue
|
||||
|
||||
# Check for response to our prompt request
|
||||
if message_data.get("id") == request_id:
|
||||
# Check for JSON-RPC response to our prompt request.
|
||||
msg_id = message_data.get("id")
|
||||
is_response = "method" not in message_data and (
|
||||
msg_id == request_id
|
||||
or (msg_id is not None and str(msg_id) == str(request_id))
|
||||
)
|
||||
if is_response:
|
||||
completion_reason = "jsonrpc_response"
|
||||
if "error" in message_data:
|
||||
error_data = message_data["error"]
|
||||
completion_reason = "jsonrpc_error"
|
||||
logger.warning(f"[ACP] Prompt error: {error_data}")
|
||||
packet_logger.log_jsonrpc_response(
|
||||
request_id, error=error_data, context="k8s"
|
||||
)
|
||||
@@ -498,26 +660,16 @@ class ACPExecClient:
|
||||
)
|
||||
try:
|
||||
prompt_response = PromptResponse.model_validate(result)
|
||||
packet_logger.log_acp_event_yielded(
|
||||
"prompt_response", prompt_response
|
||||
)
|
||||
events_yielded += 1
|
||||
yield prompt_response
|
||||
except ValidationError as e:
|
||||
packet_logger.log_raw(
|
||||
"ACP-VALIDATION-ERROR-K8S",
|
||||
{"type": "prompt_response", "error": str(e)},
|
||||
)
|
||||
logger.error(f"[ACP] PromptResponse validation failed: {e}")
|
||||
|
||||
# Log completion summary
|
||||
elapsed_ms = (time.time() - start_time) * 1000
|
||||
packet_logger.log_raw(
|
||||
"ACP-SEND-MESSAGE-COMPLETE-K8S",
|
||||
{
|
||||
"session_id": session_id,
|
||||
"events_yielded": events_yielded,
|
||||
"elapsed_ms": elapsed_ms,
|
||||
},
|
||||
logger.info(
|
||||
f"[ACP] Prompt complete: "
|
||||
f"reason={completion_reason} acp_session={session_id} "
|
||||
f"events={events_yielded} elapsed={elapsed_ms:.0f}ms"
|
||||
)
|
||||
break
|
||||
|
||||
@@ -526,25 +678,29 @@ class ACPExecClient:
|
||||
params_data = message_data.get("params", {})
|
||||
update = params_data.get("update", {})
|
||||
|
||||
# Log the notification
|
||||
packet_logger.log_jsonrpc_notification(
|
||||
"session/update",
|
||||
{"update_type": update.get("sessionUpdate")},
|
||||
context="k8s",
|
||||
)
|
||||
|
||||
prompt_complete = False
|
||||
for event in self._process_session_update(update):
|
||||
events_yielded += 1
|
||||
# Log each yielded event
|
||||
event_type = self._get_event_type_name(event)
|
||||
packet_logger.log_acp_event_yielded(event_type, event)
|
||||
yield event
|
||||
if isinstance(event, PromptResponse):
|
||||
prompt_complete = True
|
||||
break
|
||||
|
||||
if prompt_complete:
|
||||
completion_reason = "prompt_response_via_notification"
|
||||
elapsed_ms = (time.time() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[ACP] Prompt complete: "
|
||||
f"reason={completion_reason} acp_session={session_id} "
|
||||
f"events={events_yielded} elapsed={elapsed_ms:.0f}ms"
|
||||
)
|
||||
break
|
||||
|
||||
# Handle requests from agent - send error response
|
||||
elif "method" in message_data and "id" in message_data:
|
||||
packet_logger.log_raw(
|
||||
"ACP-UNSUPPORTED-REQUEST-K8S",
|
||||
{"method": message_data["method"], "id": message_data["id"]},
|
||||
logger.debug(
|
||||
f"[ACP] Unsupported agent request: "
|
||||
f"method={message_data['method']}"
|
||||
)
|
||||
self._send_error_response(
|
||||
message_data["id"],
|
||||
@@ -552,113 +708,47 @@ class ACPExecClient:
|
||||
f"Method not supported: {message_data['method']}",
|
||||
)
|
||||
|
||||
def _get_event_type_name(self, event: ACPEvent) -> str:
|
||||
"""Get the type name for an ACP event."""
|
||||
if isinstance(event, AgentMessageChunk):
|
||||
return "agent_message_chunk"
|
||||
elif isinstance(event, AgentThoughtChunk):
|
||||
return "agent_thought_chunk"
|
||||
elif isinstance(event, ToolCallStart):
|
||||
return "tool_call_start"
|
||||
elif isinstance(event, ToolCallProgress):
|
||||
return "tool_call_progress"
|
||||
elif isinstance(event, AgentPlanUpdate):
|
||||
return "agent_plan_update"
|
||||
elif isinstance(event, CurrentModeUpdate):
|
||||
return "current_mode_update"
|
||||
elif isinstance(event, PromptResponse):
|
||||
return "prompt_response"
|
||||
elif isinstance(event, Error):
|
||||
return "error"
|
||||
elif isinstance(event, SSEKeepalive):
|
||||
return "sse_keepalive"
|
||||
return "unknown"
|
||||
else:
|
||||
logger.warning(
|
||||
f"[ACP] Unhandled message: "
|
||||
f"id={message_data.get('id')} "
|
||||
f"method={message_data.get('method')} "
|
||||
f"keys={list(message_data.keys())}"
|
||||
)
|
||||
|
||||
def _process_session_update(
|
||||
self, update: dict[str, Any]
|
||||
) -> Generator[ACPEvent, None, None]:
|
||||
"""Process a session/update notification and yield typed ACP schema objects."""
|
||||
update_type = update.get("sessionUpdate")
|
||||
packet_logger = get_packet_logger()
|
||||
|
||||
if update_type == "agent_message_chunk":
|
||||
# Map update types to their ACP schema classes.
|
||||
# Note: prompt_response is included because ACP sometimes sends it as a
|
||||
# notification WITHOUT a corresponding JSON-RPC response. We accept
|
||||
# either signal as turn completion (first one wins).
|
||||
type_map: dict[str, type] = {
|
||||
"agent_message_chunk": AgentMessageChunk,
|
||||
"agent_thought_chunk": AgentThoughtChunk,
|
||||
"tool_call": ToolCallStart,
|
||||
"tool_call_update": ToolCallProgress,
|
||||
"plan": AgentPlanUpdate,
|
||||
"current_mode_update": CurrentModeUpdate,
|
||||
"prompt_response": PromptResponse,
|
||||
}
|
||||
|
||||
model_class = type_map.get(update_type) # type: ignore[arg-type]
|
||||
if model_class is not None:
|
||||
try:
|
||||
yield AgentMessageChunk.model_validate(update)
|
||||
yield model_class.model_validate(update)
|
||||
except ValidationError as e:
|
||||
packet_logger.log_raw(
|
||||
"ACP-VALIDATION-ERROR-K8S",
|
||||
{"update_type": update_type, "error": str(e), "update": update},
|
||||
)
|
||||
|
||||
elif update_type == "agent_thought_chunk":
|
||||
try:
|
||||
yield AgentThoughtChunk.model_validate(update)
|
||||
except ValidationError as e:
|
||||
packet_logger.log_raw(
|
||||
"ACP-VALIDATION-ERROR-K8S",
|
||||
{"update_type": update_type, "error": str(e), "update": update},
|
||||
)
|
||||
|
||||
elif update_type == "user_message_chunk":
|
||||
# Echo of user message - skip but log
|
||||
packet_logger.log_raw(
|
||||
"ACP-SKIPPED-UPDATE-K8S", {"type": "user_message_chunk"}
|
||||
)
|
||||
|
||||
elif update_type == "tool_call":
|
||||
try:
|
||||
yield ToolCallStart.model_validate(update)
|
||||
except ValidationError as e:
|
||||
packet_logger.log_raw(
|
||||
"ACP-VALIDATION-ERROR-K8S",
|
||||
{"update_type": update_type, "error": str(e), "update": update},
|
||||
)
|
||||
|
||||
elif update_type == "tool_call_update":
|
||||
try:
|
||||
yield ToolCallProgress.model_validate(update)
|
||||
except ValidationError as e:
|
||||
packet_logger.log_raw(
|
||||
"ACP-VALIDATION-ERROR-K8S",
|
||||
{"update_type": update_type, "error": str(e), "update": update},
|
||||
)
|
||||
|
||||
elif update_type == "plan":
|
||||
try:
|
||||
yield AgentPlanUpdate.model_validate(update)
|
||||
except ValidationError as e:
|
||||
packet_logger.log_raw(
|
||||
"ACP-VALIDATION-ERROR-K8S",
|
||||
{"update_type": update_type, "error": str(e), "update": update},
|
||||
)
|
||||
|
||||
elif update_type == "current_mode_update":
|
||||
try:
|
||||
yield CurrentModeUpdate.model_validate(update)
|
||||
except ValidationError as e:
|
||||
packet_logger.log_raw(
|
||||
"ACP-VALIDATION-ERROR-K8S",
|
||||
{"update_type": update_type, "error": str(e), "update": update},
|
||||
)
|
||||
|
||||
elif update_type == "available_commands_update":
|
||||
# Skip command updates
|
||||
packet_logger.log_raw(
|
||||
"ACP-SKIPPED-UPDATE-K8S", {"type": "available_commands_update"}
|
||||
)
|
||||
|
||||
elif update_type == "session_info_update":
|
||||
# Skip session info updates
|
||||
packet_logger.log_raw(
|
||||
"ACP-SKIPPED-UPDATE-K8S", {"type": "session_info_update"}
|
||||
)
|
||||
|
||||
else:
|
||||
# Unknown update types are logged
|
||||
packet_logger.log_raw(
|
||||
"ACP-UNKNOWN-UPDATE-TYPE-K8S",
|
||||
{"update_type": update_type, "update": update},
|
||||
)
|
||||
logger.warning(f"[ACP] Validation error for {update_type}: {e}")
|
||||
elif update_type not in (
|
||||
"user_message_chunk",
|
||||
"available_commands_update",
|
||||
"session_info_update",
|
||||
"usage_update",
|
||||
):
|
||||
logger.debug(f"[ACP] Unknown update type: {update_type}")
|
||||
|
||||
def _send_error_response(self, request_id: int, code: int, message: str) -> None:
|
||||
"""Send an error response to an agent request."""
|
||||
@@ -673,15 +763,24 @@ class ACPExecClient:
|
||||
|
||||
self._ws_client.write_stdin(json.dumps(response) + "\n")
|
||||
|
||||
def cancel(self) -> None:
|
||||
"""Cancel the current operation."""
|
||||
if self._state.current_session is None:
|
||||
return
|
||||
def cancel(self, session_id: str | None = None) -> None:
|
||||
"""Cancel the current operation on a session.
|
||||
|
||||
self._send_notification(
|
||||
"session/cancel",
|
||||
{"sessionId": self._state.current_session.session_id},
|
||||
)
|
||||
Args:
|
||||
session_id: The ACP session ID to cancel. If None (or empty), cancels every session tracked by this client; an ID this client is not tracking is ignored.
|
||||
"""
|
||||
if session_id:
|
||||
if session_id in self._state.sessions:
|
||||
self._send_notification(
|
||||
"session/cancel",
|
||||
{"sessionId": session_id},
|
||||
)
|
||||
else:
|
||||
for sid in self._state.sessions:
|
||||
self._send_notification(
|
||||
"session/cancel",
|
||||
{"sessionId": sid},
|
||||
)
|
||||
|
||||
def health_check(self, timeout: float = 5.0) -> bool: # noqa: ARG002
|
||||
"""Check if we can exec into the pod."""
|
||||
@@ -707,13 +806,6 @@ class ACPExecClient:
|
||||
"""Check if the exec session is running."""
|
||||
return self._ws_client is not None and self._ws_client.is_open()
|
||||
|
||||
@property
|
||||
def session_id(self) -> str | None:
|
||||
"""Get the current session ID, if any."""
|
||||
if self._state.current_session:
|
||||
return self._state.current_session.session_id
|
||||
return None
|
||||
|
||||
def __enter__(self) -> "ACPExecClient":
|
||||
"""Context manager entry."""
|
||||
return self
|
||||
|
||||
@@ -50,6 +50,7 @@ from pathlib import Path
|
||||
from uuid import UUID
|
||||
from uuid import uuid4
|
||||
|
||||
from acp.schema import PromptResponse
|
||||
from kubernetes import client # type: ignore
|
||||
from kubernetes import config
|
||||
from kubernetes.client.rest import ApiException # type: ignore
|
||||
@@ -97,6 +98,10 @@ from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
# API server pod hostname — used to identify which replica is handling a request.
|
||||
# In K8s, HOSTNAME is set to the pod name (e.g., "api-server-dpgg7").
|
||||
_API_SERVER_HOSTNAME = os.environ.get("HOSTNAME", "unknown")
|
||||
|
||||
# Constants for pod configuration
|
||||
# Note: Next.js ports are dynamically allocated from SANDBOX_NEXTJS_PORT_START to
|
||||
# SANDBOX_NEXTJS_PORT_END range, with one port per session.
|
||||
@@ -1156,7 +1161,9 @@ done
|
||||
def terminate(self, sandbox_id: UUID) -> None:
|
||||
"""Terminate a sandbox and clean up Kubernetes resources.
|
||||
|
||||
Deletes the Service and Pod for the sandbox.
|
||||
Removes session mappings for this sandbox, then deletes the
|
||||
Service and Pod. ACP clients are ephemeral (created per message),
|
||||
so there's nothing to stop here.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID to terminate
|
||||
@@ -1395,7 +1402,8 @@ echo "Session workspace setup complete"
|
||||
) -> None:
|
||||
"""Clean up a session workspace (on session delete).
|
||||
|
||||
Executes kubectl exec to remove the session directory.
|
||||
Removes the ACP session mapping and executes kubectl exec to remove
|
||||
the session directory. ACP clients are ephemeral (created per message), so there is no long-lived client to stop here.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
@@ -1807,6 +1815,36 @@ echo "Session config regeneration complete"
|
||||
)
|
||||
return exec_client.health_check(timeout=timeout)
|
||||
|
||||
def _create_ephemeral_acp_client(self, sandbox_id: UUID) -> ACPExecClient:
|
||||
"""Create a new ephemeral ACP client for a single message exchange.
|
||||
|
||||
Each call starts a fresh `opencode acp` process in the sandbox pod.
|
||||
The process is short-lived — stopped after the message completes.
|
||||
This prevents the bug where multiple long-lived processes (one per
|
||||
API replica) operate on the same session's flat file storage
|
||||
concurrently, causing the JSON-RPC response to be silently lost.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
|
||||
Returns:
|
||||
A running ACPExecClient (caller must stop it when done)
|
||||
"""
|
||||
pod_name = self._get_pod_name(str(sandbox_id))
|
||||
acp_client = ACPExecClient(
|
||||
pod_name=pod_name,
|
||||
namespace=self._namespace,
|
||||
container="sandbox",
|
||||
)
|
||||
acp_client.start(cwd="/workspace")
|
||||
|
||||
logger.info(
|
||||
f"[SANDBOX-ACP] Created ephemeral ACP client: "
|
||||
f"sandbox={sandbox_id} pod={pod_name} "
|
||||
f"api_pod={_API_SERVER_HOSTNAME}"
|
||||
)
|
||||
return acp_client
|
||||
|
||||
def send_message(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
@@ -1815,8 +1853,12 @@ echo "Session config regeneration complete"
|
||||
) -> Generator[ACPEvent, None, None]:
|
||||
"""Send a message to the CLI agent and stream ACP events.
|
||||
|
||||
Runs `opencode acp` via kubectl exec in the sandbox pod.
|
||||
The agent runs in the session-specific workspace.
|
||||
Creates an ephemeral `opencode acp` process for each message.
|
||||
The process resumes the session from opencode's on-disk storage,
|
||||
handles the prompt, then is stopped. This ensures only one process
|
||||
operates on a session's flat files at a time, preventing the bug
|
||||
where multiple long-lived processes (one per API replica) corrupt
|
||||
each other's in-memory state.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
@@ -1827,67 +1869,103 @@ echo "Session config regeneration complete"
|
||||
Typed ACP schema event objects
|
||||
"""
|
||||
packet_logger = get_packet_logger()
|
||||
pod_name = self._get_pod_name(str(sandbox_id))
|
||||
session_path = f"/workspace/sessions/{session_id}"
|
||||
|
||||
# Log ACP client creation
|
||||
packet_logger.log_acp_client_start(
|
||||
sandbox_id, session_id, session_path, context="k8s"
|
||||
)
|
||||
# Create an ephemeral ACP client for this message
|
||||
acp_client = self._create_ephemeral_acp_client(sandbox_id)
|
||||
|
||||
exec_client = ACPExecClient(
|
||||
pod_name=pod_name,
|
||||
namespace=self._namespace,
|
||||
container="sandbox",
|
||||
)
|
||||
|
||||
# Log the send_message call at sandbox manager level
|
||||
packet_logger.log_session_start(session_id, sandbox_id, message)
|
||||
|
||||
events_count = 0
|
||||
try:
|
||||
exec_client.start(cwd=session_path)
|
||||
for event in exec_client.send_message(message):
|
||||
events_count += 1
|
||||
yield event
|
||||
# Resume (or create) the ACP session from opencode's on-disk storage
|
||||
session_path = f"/workspace/sessions/{session_id}"
|
||||
acp_session_id = acp_client.resume_or_create_session(cwd=session_path)
|
||||
|
||||
# Log successful completion
|
||||
packet_logger.log_session_end(
|
||||
session_id, success=True, events_count=events_count
|
||||
logger.info(
|
||||
f"[SANDBOX-ACP] Sending message: "
|
||||
f"session={session_id} acp_session={acp_session_id} "
|
||||
f"api_pod={_API_SERVER_HOSTNAME}"
|
||||
)
|
||||
except GeneratorExit:
|
||||
# Generator was closed by consumer (client disconnect, timeout, broken pipe)
|
||||
# This is the most common failure mode for SSE streaming
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error="GeneratorExit: Client disconnected or stream closed by consumer",
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
except Exception as e:
|
||||
# Log failure from normal exceptions
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error=f"Exception: {str(e)}",
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
except BaseException as e:
|
||||
# Log failure from other base exceptions (SystemExit, KeyboardInterrupt, etc.)
|
||||
exception_type = type(e).__name__
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error=f"{exception_type}: {str(e) if str(e) else 'System-level interruption'}",
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
|
||||
# Log the send_message call at sandbox manager level
|
||||
packet_logger.log_session_start(session_id, sandbox_id, message)
|
||||
|
||||
events_count = 0
|
||||
got_prompt_response = False
|
||||
try:
|
||||
for event in acp_client.send_message(
|
||||
message, session_id=acp_session_id
|
||||
):
|
||||
events_count += 1
|
||||
if isinstance(event, PromptResponse):
|
||||
got_prompt_response = True
|
||||
yield event
|
||||
|
||||
logger.info(
|
||||
f"[SANDBOX-ACP] send_message completed: "
|
||||
f"session={session_id} events={events_count} "
|
||||
f"got_prompt_response={got_prompt_response}"
|
||||
)
|
||||
packet_logger.log_session_end(
|
||||
session_id, success=True, events_count=events_count
|
||||
)
|
||||
except GeneratorExit:
|
||||
logger.warning(
|
||||
f"[SANDBOX-ACP] GeneratorExit: session={session_id} "
|
||||
f"events={events_count}, sending session/cancel"
|
||||
)
|
||||
try:
|
||||
acp_client.cancel(session_id=acp_session_id)
|
||||
except Exception as cancel_err:
|
||||
logger.warning(
|
||||
f"[SANDBOX-ACP] session/cancel failed on GeneratorExit: "
|
||||
f"{cancel_err}"
|
||||
)
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error="GeneratorExit: Client disconnected or stream closed by consumer",
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[SANDBOX-ACP] Exception: session={session_id} "
|
||||
f"events={events_count} error={e}, sending session/cancel"
|
||||
)
|
||||
try:
|
||||
acp_client.cancel(session_id=acp_session_id)
|
||||
except Exception as cancel_err:
|
||||
logger.warning(
|
||||
f"[SANDBOX-ACP] session/cancel failed on Exception: "
|
||||
f"{cancel_err}"
|
||||
)
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error=f"Exception: {str(e)}",
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
except BaseException as e:
|
||||
logger.error(
|
||||
f"[SANDBOX-ACP] {type(e).__name__}: session={session_id} "
|
||||
f"error={e}"
|
||||
)
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error=f"{type(e).__name__}: {str(e) if str(e) else 'System-level interruption'}",
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
exec_client.stop()
|
||||
# Log client stop
|
||||
packet_logger.log_acp_client_stop(sandbox_id, session_id, context="k8s")
|
||||
# Always stop the ephemeral ACP client to kill the opencode process.
|
||||
# This ensures no stale processes linger in the sandbox container.
|
||||
try:
|
||||
acp_client.stop()
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[SANDBOX-ACP] Failed to stop ephemeral ACP client: "
|
||||
f"session={session_id} error={e}"
|
||||
)
|
||||
|
||||
def list_directory(
|
||||
self, sandbox_id: UUID, session_id: UUID, path: str
|
||||
|
||||
Reference in New Issue
Block a user