Compare commits

...

1 Commits

Author SHA1 Message Date
Wenxi Onyx
aecc8db764 . 2026-02-17 15:06:26 -08:00
28 changed files with 2898 additions and 383 deletions

View File

@@ -0,0 +1,28 @@
"""Add opencode_session_id to build_session

Revision ID: 3fa4f9869b2d
Revises: 19c0ccb01687
Create Date: 2026-02-17 00:00:00.000000
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "3fa4f9869b2d"
down_revision = "19c0ccb01687"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the nullable opencode_session_id column to build_session."""
    # Nullable so existing rows need no backfill; sessions that have not
    # attached an OpenCode session yet simply leave it NULL.
    op.add_column(
        "build_session",
        sa.Column("opencode_session_id", sa.String(), nullable=True),
    )


def downgrade() -> None:
    """Drop the opencode_session_id column added in upgrade()."""
    op.drop_column("build_session", "opencode_session_id")

View File

@@ -4709,6 +4709,7 @@ class BuildSession(Base):
nullable=False,
)
nextjs_port: Mapped[int | None] = mapped_column(Integer, nullable=True)
opencode_session_id: Mapped[str | None] = mapped_column(String, nullable=True)
demo_data_enabled: Mapped[bool] = mapped_column(
Boolean, nullable=False, server_default=text("true")
)

View File

@@ -169,7 +169,7 @@ class MessageRequest(BaseModel):
class MessageResponse(BaseModel):
"""Response containing message details.
All message data is stored in message_metadata as JSON (the raw ACP packet).
All message data is stored in message_metadata as JSON (the raw streamed packet).
The turn_index groups all assistant responses under the user prompt they respond to.
Packet types in message_metadata:
@@ -177,7 +177,6 @@ class MessageResponse(BaseModel):
- agent_message: {type: "agent_message", content: {...}}
- agent_thought: {type: "agent_thought", content: {...}}
- tool_call_progress: {type: "tool_call_progress", status: "completed", ...}
- agent_plan_update: {type: "agent_plan_update", entries: [...]}
"""
id: str

View File

@@ -1,27 +1,23 @@
"""Build Mode packet types for streaming agent responses.
This module defines CUSTOM Onyx packet types that extend ACP (Agent Client Protocol).
ACP events are passed through directly from the agent - this module only contains
This module defines CUSTOM Onyx packet types for build-mode streaming.
Agent events are passed through directly from OpenCode - this module only contains
Onyx-specific extensions like artifacts and file operations.
All packets use SSE (Server-Sent Events) format with `event: message` and include
a `type` field to distinguish packet types.
ACP events (passed through directly from acp.schema):
Agent events (passed through directly):
- agent_message_chunk: Text/image content from agent
- agent_thought_chunk: Agent's internal reasoning
- tool_call_start: Tool invocation started
- tool_call_progress: Tool execution progress/result
- agent_plan_update: Agent's execution plan
- current_mode_update: Agent mode change
- prompt_response: Agent finished processing
- error: An error occurred
Custom Onyx packets (defined here):
- error: Onyx-specific errors (e.g., session not found)
Based on:
- Agent Client Protocol (ACP): https://agentclientprotocol.com
"""
from datetime import datetime

View File

@@ -66,6 +66,11 @@ SANDBOX_SNAPSHOTS_BUCKET = os.environ.get(
# Next.js preview server port range
SANDBOX_NEXTJS_PORT_START = int(os.environ.get("SANDBOX_NEXTJS_PORT_START", "3010"))
SANDBOX_NEXTJS_PORT_END = int(os.environ.get("SANDBOX_NEXTJS_PORT_END", "3100"))
# Per-session OpenCode server port is derived from nextjs_port + this offset.
# Example: nextjs 3010 -> opencode 13010
SANDBOX_OPENCODE_PORT_OFFSET = int(
os.environ.get("SANDBOX_OPENCODE_PORT_OFFSET", "10000")
)
# File upload configuration
MAX_UPLOAD_FILE_SIZE_MB = int(os.environ.get("BUILD_MAX_UPLOAD_FILE_SIZE_MB", "50"))
@@ -117,12 +122,25 @@ ENABLE_CRAFT = os.environ.get("ENABLE_CRAFT", "false").lower() == "true"
SSE_KEEPALIVE_INTERVAL = float(os.environ.get("SSE_KEEPALIVE_INTERVAL", "15.0"))
# ============================================================================
# ACP (Agent Communication Protocol) Configuration
# OpenCode Message Streaming Configuration
# ============================================================================
# Timeout for ACP message processing in seconds
# This is the maximum time to wait for a complete response from the agent
ACP_MESSAGE_TIMEOUT = float(os.environ.get("ACP_MESSAGE_TIMEOUT", "900.0"))
# Timeout for OpenCode message processing in seconds.
# This is the maximum time to wait for a complete response from the agent.
OPENCODE_MESSAGE_TIMEOUT = float(
os.environ.get(
"OPENCODE_MESSAGE_TIMEOUT",
os.environ.get("ACP_MESSAGE_TIMEOUT", "900.0"),
)
)
# Backwards-compatible alias for older ACP-named env usage.
ACP_MESSAGE_TIMEOUT = OPENCODE_MESSAGE_TIMEOUT
# Maximum time to wait for an OpenCode server to become healthy.
OPENCODE_SERVER_STARTUP_TIMEOUT_SECONDS = float(
os.environ.get("OPENCODE_SERVER_STARTUP_TIMEOUT_SECONDS", "30.0")
)
# ============================================================================
# Rate Limiting Configuration

View File

@@ -159,6 +159,32 @@ def update_session_status(
logger.info(f"Updated build session {session_id} status to {status}")
def update_opencode_session_id(
    session_id: UUID,
    opencode_session_id: str | None,
    db_session: Session,
) -> None:
    """Persist the OpenCode session ID associated with a build session.

    Args:
        session_id: Build session UUID to update.
        opencode_session_id: New OpenCode session ID, or None to clear it.
        db_session: Active database session; committed only when the value
            actually changes.
    """
    session = (
        db_session.query(BuildSession)
        .filter(BuildSession.id == session_id)
        .one_or_none()
    )
    if session is None:
        # Previously this dropped the update silently; log it so a
        # concurrently-deleted session doesn't hide the lost write.
        logger.warning(
            "Build session %s not found; cannot update opencode_session_id",
            session_id,
        )
        return
    if session.opencode_session_id == opencode_session_id:
        # No change — skip the needless commit.
        return
    session.opencode_session_id = opencode_session_id
    db_session.commit()
    logger.info(
        "Updated build session %s opencode_session_id to %s",
        session_id,
        opencode_session_id,
    )
def delete_build_session__no_commit(
session_id: UUID,
user_id: UUID,
@@ -284,7 +310,7 @@ def create_message(
session_id: Session UUID
message_type: Type of message (USER, ASSISTANT, SYSTEM)
turn_index: 0-indexed user message number this message belongs to
message_metadata: Required structured data (the raw ACP packet JSON)
message_metadata: Required structured data (the raw streamed packet JSON)
db_session: Database session
"""
message = BuildMessage(

View File

@@ -18,23 +18,27 @@ import threading
from abc import ABC
from abc import abstractmethod
from collections.abc import Generator
from typing import Any
from uuid import UUID
from onyx.server.features.build.configs import SANDBOX_BACKEND
from onyx.server.features.build.configs import SANDBOX_OPENCODE_PORT_OFFSET
from onyx.server.features.build.configs import SandboxBackend
from onyx.server.features.build.sandbox.models import FilesystemEntry
from onyx.server.features.build.sandbox.models import LLMProviderConfig
from onyx.server.features.build.sandbox.models import SandboxInfo
from onyx.server.features.build.sandbox.models import SnapshotResult
from onyx.server.features.build.sandbox.opencode import OpenCodeEvent
from onyx.utils.logger import setup_logger
logger = setup_logger()
# ACPEvent is a union type defined in both local and kubernetes modules
# Using Any here to avoid circular imports - the actual type checking
# happens in the implementation modules
ACPEvent = Any
# Backward compatibility for legacy imports/tests.
ACPEvent = OpenCodeEvent
def get_opencode_server_port(nextjs_port: int) -> int:
    """Derive per-session OpenCode server port from the session Next.js port."""
    # The configured offset keeps OpenCode ports in a range disjoint from
    # Next.js ports, e.g. nextjs 3010 -> opencode 13010 with offset 10000.
    return nextjs_port + SANDBOX_OPENCODE_PORT_OFFSET
class SandboxManager(ABC):
@@ -273,14 +277,33 @@ class SandboxManager(ABC):
"""
...
@abstractmethod
def get_opencode_server_url(
self,
sandbox_id: UUID,
nextjs_port: int,
) -> str:
"""Get URL for the session's OpenCode server.
Args:
sandbox_id: The sandbox ID
nextjs_port: The session's Next.js port
Returns:
URL for the session-scoped OpenCode server.
"""
...
@abstractmethod
def send_message(
self,
sandbox_id: UUID,
session_id: UUID,
nextjs_port: int,
message: str,
) -> Generator[ACPEvent, None, None]:
"""Send a message to the CLI agent and stream typed ACP events.
opencode_session_id: str | None = None,
) -> Generator[OpenCodeEvent, None, None]:
"""Send a message to OpenCode and stream typed events.
The agent runs in the session-specific workspace:
sessions/$session_id/
@@ -288,10 +311,12 @@ class SandboxManager(ABC):
Args:
sandbox_id: The sandbox ID
session_id: The session ID (determines workspace directory)
nextjs_port: The session's allocated Next.js port
message: The message content to send
opencode_session_id: Existing OpenCode session ID to resume
Yields:
Typed ACP schema event objects
Typed OpenCode event objects
Raises:
RuntimeError: If agent communication fails

View File

@@ -3,10 +3,10 @@
These modules are implementation details and should only be used by KubernetesSandboxManager.
"""
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
ACPEvent,
from onyx.server.features.build.sandbox.kubernetes.internal.opencode_exec_client import (
OpenCodeExecClient,
)
__all__ = [
"ACPEvent",
"OpenCodeExecClient",
]

View File

@@ -0,0 +1,272 @@
"""Run OpenCode client commands inside a sandbox pod via Kubernetes exec."""
import json
import time
from collections.abc import Generator
from kubernetes import client # type: ignore
from kubernetes.stream import stream as k8s_stream # type: ignore
from kubernetes.stream.ws_client import WSClient # type: ignore
from onyx.server.features.build.api.packet_logger import get_packet_logger
from onyx.server.features.build.configs import OPENCODE_MESSAGE_TIMEOUT
from onyx.server.features.build.configs import SSE_KEEPALIVE_INTERVAL
from onyx.server.features.build.sandbox.opencode.events import OpenCodeError
from onyx.server.features.build.sandbox.opencode.events import OpenCodeEvent
from onyx.server.features.build.sandbox.opencode.events import OpenCodePromptResponse
from onyx.server.features.build.sandbox.opencode.events import OpenCodeSSEKeepalive
from onyx.server.features.build.sandbox.opencode.parser import (
looks_like_session_not_found,
)
from onyx.server.features.build.sandbox.opencode.parser import OpenCodeEventParser
from onyx.server.features.build.sandbox.opencode.run_client import (
OpenCodeSessionNotFoundError,
)
from onyx.utils.logger import setup_logger
logger = setup_logger()
STATUS_CHANNEL = 3
def _drain_lines(buffer: str) -> tuple[list[str], str]:
"""Split buffered stream data into complete stripped lines."""
if "\n" not in buffer:
return [], buffer
parts = buffer.split("\n")
lines = [line.strip() for line in parts[:-1] if line.strip()]
return lines, parts[-1]
def _parse_exec_exit_code(status_payload: str) -> int | None:
"""Parse Kubernetes exec status channel payload for exit code."""
if not status_payload:
return None
payload = status_payload.strip().splitlines()[-1]
try:
status_obj = json.loads(payload)
except json.JSONDecodeError:
return None
causes = status_obj.get("details", {}).get("causes", [])
if not isinstance(causes, list):
return None
for cause in causes:
if not isinstance(cause, dict):
continue
if cause.get("reason") == "ExitCode":
message = cause.get("message")
if isinstance(message, str) and message.isdigit():
return int(message)
return None
class OpenCodeExecClient:
    """Run `opencode run --attach` in a pod and stream normalized events.

    Executes the OpenCode CLI inside the sandbox container over the
    Kubernetes exec websocket, reads its JSON-lines stdout, and converts
    each line into typed OpenCode events via OpenCodeEventParser.
    """

    def __init__(
        self,
        core_api: client.CoreV1Api,
        pod_name: str,
        namespace: str,
        container: str = "sandbox",
        timeout: float = OPENCODE_MESSAGE_TIMEOUT,
        keepalive_interval: float = SSE_KEEPALIVE_INTERVAL,
    ) -> None:
        # Kubernetes API handle plus the exec target (pod/namespace/container).
        self._core_api = core_api
        self._pod_name = pod_name
        self._namespace = namespace
        self._container = container
        # Overall per-message deadline and idle gap between keepalive events.
        self._timeout = timeout
        self._keepalive_interval = keepalive_interval

    @staticmethod
    def _read_channel(ws_client: WSClient, channel: int) -> str:
        # Best-effort read of an auxiliary exec channel (e.g. the status
        # channel); returns "" when there is no data or the read fails.
        try:
            return ws_client.read_channel(channel)
        except Exception:
            return ""

    def send_message(
        self,
        server_url: str,
        message: str,
        cwd: str,
        session_id: str | None = None,
    ) -> Generator[OpenCodeEvent, None, None]:
        """Send one user message via pod exec and stream normalized OpenCode events.

        Args:
            server_url: URL of the session's OpenCode server to attach to.
            message: User message passed to `opencode run`.
            cwd: Working directory inside the pod for the run.
            session_id: Existing OpenCode session ID to resume, if any.

        Yields:
            OpenCodeEvent objects parsed from the CLI's stdout, keepalives
            while the stream is idle, and a synthesized error or
            prompt_response at the end of the run when needed.

        Raises:
            OpenCodeSessionNotFoundError: If stderr indicates the requested
                OpenCode session no longer exists (caller may retry fresh).
        """
        # The sh trampoline `cd "$1" && shift && exec "$@"` changes into
        # `cwd` and then execs opencode with the message passed as a plain
        # argv element, avoiding shell-quoting the message text.
        command = [
            "/bin/sh",
            "-c",
            'cd "$1" && shift && exec "$@"',
            "sh",
            cwd,
            "opencode",
            "run",
            "--attach",
            server_url,
            "--format",
            "json",
            *(["--session", session_id] if session_id else []),
            message,
        ]
        packet_logger = get_packet_logger()
        packet_logger.log_raw(
            "OPENCODE-K8S-RUN-START",
            {
                "pod_name": self._pod_name,
                "namespace": self._namespace,
                "container": self._container,
                "cwd": cwd,
                "server_url": server_url,
                "session_id": session_id,
                "command": command,
            },
        )
        ws_client: WSClient | None = None
        # Parser tracks the effective OpenCode session ID and whether a
        # prompt_response was seen, so one can be synthesized if missing.
        parser = OpenCodeEventParser(session_id=session_id)
        stdout_buffer = ""
        stderr_buffer = ""
        stderr_lines: list[str] = []
        status_payload = ""
        start_time = time.monotonic()
        last_event_time = start_time
        try:
            ws_client = k8s_stream(
                self._core_api.connect_get_namespaced_pod_exec,
                name=self._pod_name,
                namespace=self._namespace,
                container=self._container,
                command=command,
                stdin=False,
                stdout=True,
                stderr=True,
                tty=False,
                _preload_content=False,
                # Give the websocket layer headroom beyond our own timeout.
                _request_timeout=int(self._timeout) + 30,
            )
            while True:
                elapsed = time.monotonic() - start_time
                if elapsed > self._timeout:
                    # Hard deadline: surface a terminal error event and stop.
                    yield OpenCodeError(
                        opencode_session_id=parser.session_id,
                        code=-1,
                        message=(
                            f"Timeout waiting for OpenCode response after {self._timeout:.1f}s"
                        ),
                    )
                    return
                if ws_client.is_open():
                    ws_client.update(timeout=0.5)
                # Reads are best-effort: a closed or errored channel yields "".
                try:
                    stdout_chunk = ws_client.read_stdout(timeout=0.1)
                except Exception:
                    stdout_chunk = ""
                try:
                    stderr_chunk = ws_client.read_stderr(timeout=0.1)
                except Exception:
                    stderr_chunk = ""
                status_chunk = self._read_channel(ws_client, STATUS_CHANNEL)
                if status_chunk:
                    status_payload += status_chunk
                if stdout_chunk:
                    stdout_buffer += stdout_chunk
                    lines, stdout_buffer = _drain_lines(stdout_buffer)
                    for line in lines:
                        try:
                            raw_event = json.loads(line)
                        except json.JSONDecodeError:
                            # Non-JSON stdout noise; log and keep streaming.
                            packet_logger.log_raw(
                                "OPENCODE-K8S-RUN-PARSE-ERROR",
                                {"line": line[:500]},
                            )
                            continue
                        for event in parser.parse_raw_event(raw_event):
                            last_event_time = time.monotonic()
                            yield event
                if stderr_chunk:
                    stderr_buffer += stderr_chunk
                    lines, stderr_buffer = _drain_lines(stderr_buffer)
                    stderr_lines.extend(lines)
                # Emit a keepalive when the stream has been idle long enough.
                if (
                    not stdout_chunk
                    and not stderr_chunk
                    and (time.monotonic() - last_event_time) >= self._keepalive_interval
                ):
                    yield OpenCodeSSEKeepalive()
                    last_event_time = time.monotonic()
                # Stop once the websocket is closed and fully drained.
                if not ws_client.is_open() and not stdout_chunk and not stderr_chunk:
                    break
            # Flush any trailing partial lines
            trailing_stdout = stdout_buffer.strip()
            if trailing_stdout:
                try:
                    raw_event = json.loads(trailing_stdout)
                    for event in parser.parse_raw_event(raw_event):
                        yield event
                except json.JSONDecodeError:
                    packet_logger.log_raw(
                        "OPENCODE-K8S-RUN-PARSE-ERROR",
                        {"line": trailing_stdout[:500]},
                    )
            trailing_stderr = stderr_buffer.strip()
            if trailing_stderr:
                stderr_lines.append(trailing_stderr)
            stderr_text = "\n".join(stderr_lines).strip()
            exit_code = _parse_exec_exit_code(status_payload)
            packet_logger.log_raw(
                "OPENCODE-K8S-RUN-END",
                {
                    "pod_name": self._pod_name,
                    "namespace": self._namespace,
                    "container": self._container,
                    "cwd": cwd,
                    "server_url": server_url,
                    "session_id": parser.session_id,
                    "stderr_preview": stderr_text[:500] if stderr_text else None,
                    "exit_code": exit_code,
                },
            )
            # Raise (not yield) a stale-session failure so callers can retry
            # with a fresh OpenCode session.
            if stderr_text and looks_like_session_not_found(stderr_text):
                raise OpenCodeSessionNotFoundError(stderr_text)
            # NOTE(review): any stderr output is treated as failure here,
            # even with exit code 0 — confirm opencode never writes benign
            # warnings to stderr.
            if (exit_code not in (None, 0)) or stderr_text:
                yield OpenCodeError(
                    opencode_session_id=parser.session_id,
                    code=exit_code,
                    message=stderr_text
                    or f"OpenCode run failed with exit code {exit_code}",
                )
                return
            if not parser.saw_prompt_response:
                # The CLI exited cleanly without a terminal event; synthesize
                # one so consumers always observe a completion marker.
                yield OpenCodePromptResponse(
                    opencode_session_id=parser.session_id,
                    stop_reason="completed",
                )
        finally:
            if ws_client is not None:
                try:
                    ws_client.close()
                except Exception as e:
                    logger.debug(f"Failed to close OpenCode exec websocket: {e}")

View File

@@ -45,6 +45,8 @@ import shlex
import tarfile
import threading
import time
import urllib.error
import urllib.request
from collections.abc import Generator
from pathlib import Path
from uuid import UUID
@@ -65,17 +67,15 @@ from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_END
from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_START
from onyx.server.features.build.configs import SANDBOX_S3_BUCKET
from onyx.server.features.build.configs import SANDBOX_SERVICE_ACCOUNT_NAME
from onyx.server.features.build.sandbox.base import get_opencode_server_port
from onyx.server.features.build.sandbox.base import SandboxManager
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
ACPEvent,
)
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
ACPExecClient,
)
from onyx.server.features.build.sandbox.models import FilesystemEntry
from onyx.server.features.build.sandbox.models import LLMProviderConfig
from onyx.server.features.build.sandbox.models import SandboxInfo
from onyx.server.features.build.sandbox.models import SnapshotResult
from onyx.server.features.build.sandbox.opencode import OpenCodeEvent
from onyx.server.features.build.sandbox.opencode import OpenCodeHttpClient
from onyx.server.features.build.sandbox.opencode import OpenCodeSessionNotFoundError
from onyx.server.features.build.sandbox.util.agent_instructions import (
ATTACHMENTS_SECTION_CONTENT,
)
@@ -148,6 +148,79 @@ echo $NEXTJS_PID > {session_path}/nextjs.pid
"""
def _build_opencode_server_start_script(
    session_path: str,
    nextjs_port: int,
    startup_timeout_seconds: float = 30.0,
) -> str:
    """Build shell script to ensure the session's OpenCode server is running.

    Args:
        session_path: Absolute session workspace path inside the pod.
        nextjs_port: Session Next.js port; the OpenCode port is derived from it.
        startup_timeout_seconds: How long the script waits for the server's
            /config endpoint to respond before failing. Previously a
            hard-coded 30s; parameterized so callers can align it with
            OPENCODE_SERVER_STARTUP_TIMEOUT_SECONDS. Default preserves the
            old behavior.

    Returns:
        POSIX shell script text that reuses a healthy server, restarts a
        stale one, or starts a fresh one, then waits for readiness.
    """
    opencode_port = get_opencode_server_port(nextjs_port)
    opencode_url = f"http://127.0.0.1:{opencode_port}/config"
    pid_path = f"{session_path}/opencode_server.pid"
    log_path = f"{session_path}/opencode_server.log"
    # NOTE(review): the heredocs invoke `python` — confirm the sandbox image
    # provides a `python` executable (not only `python3`).
    return f"""
is_opencode_healthy() {{
    python - <<'PY'
import sys
import urllib.request

try:
    urllib.request.urlopen("{opencode_url}", timeout=1)
except Exception:
    sys.exit(1)
sys.exit(0)
PY
}}

wait_for_opencode() {{
    python - <<'PY'
import sys
import time
import urllib.request

deadline = time.time() + {startup_timeout_seconds}
while time.time() < deadline:
    try:
        urllib.request.urlopen("{opencode_url}", timeout=1)
        sys.exit(0)
    except Exception:
        time.sleep(0.25)
sys.exit(1)
PY
}}

OPENCODE_NEEDS_START=1
if [ -f {pid_path} ]; then
    OPENCODE_PID=$(cat {pid_path} 2>/dev/null || true)
    if [ -n "$OPENCODE_PID" ] && kill -0 "$OPENCODE_PID" 2>/dev/null; then
        if is_opencode_healthy; then
            OPENCODE_NEEDS_START=0
            echo "OpenCode server already running (PID: $OPENCODE_PID)"
        else
            echo "Stopping stale OpenCode server (PID: $OPENCODE_PID)"
            kill "$OPENCODE_PID" 2>/dev/null || true
        fi
    fi
fi

if [ "$OPENCODE_NEEDS_START" -eq 1 ] && is_opencode_healthy; then
    OPENCODE_NEEDS_START=0
    echo "OpenCode server already reachable on port {opencode_port}"
fi

if [ "$OPENCODE_NEEDS_START" -eq 1 ]; then
    echo "Starting OpenCode server on port {opencode_port}"
    nohup opencode serve --hostname 127.0.0.1 --port {opencode_port} > {log_path} 2>&1 &
    NEW_OPENCODE_PID=$!
    echo "$NEW_OPENCODE_PID" > {pid_path}
fi

if ! wait_for_opencode; then
    echo "ERROR: OpenCode server failed to become healthy at {opencode_url}" >&2
    exit 1
fi

echo "OpenCode server ready on port {opencode_port}"
"""
def _get_local_aws_credential_env_vars() -> list[client.V1EnvVar]:
"""Get AWS credential environment variables from local environment.
@@ -379,6 +452,59 @@ class KubernetesSandboxManager(SandboxManager):
service_name = self._get_service_name(sandbox_id)
return f"http://{service_name}.{self._namespace}.svc.cluster.local:{port}"
def _get_pod_ip(self, sandbox_id: UUID) -> str | None:
    """Fetch current sandbox pod IP for direct HTTP access.

    Returns None (after logging a warning) when the pod cannot be read or
    has no IP assigned yet.
    """
    pod_name = self._get_pod_name(str(sandbox_id))
    try:
        pod = self._core_api.read_namespaced_pod(
            name=pod_name,
            namespace=self._namespace,
        )
    except ApiException as e:
        logger.warning("Failed to fetch pod %s for IP lookup: %s", pod_name, e)
        return None
    except Exception as e:
        logger.warning(
            "Unexpected error fetching pod %s for IP lookup: %s", pod_name, e
        )
        return None
    # Defensive attribute access: status/pod_ip may be absent on a pod that
    # has not been scheduled yet.
    status = getattr(pod, "status", None)
    ip = getattr(status, "pod_ip", None) if status else None
    if isinstance(ip, str) and ip:
        return ip
    return None
@staticmethod
def _is_opencode_http_reachable(server_url: str) -> bool:
"""Check if OpenCode server is reachable over HTTP from this process."""
try:
with urllib.request.urlopen(f"{server_url}/config", timeout=2):
return True
except (urllib.error.URLError, TimeoutError, Exception):
return False
def _ensure_opencode_server(
    self,
    pod_name: str,
    session_path: str,
    nextjs_port: int,
) -> None:
    """Ensure the session's OpenCode server is running and healthy.

    Execs a shell script in the sandbox container that reuses or starts
    the per-session OpenCode server and waits for it to become ready.
    """
    start_script = _build_opencode_server_start_script(
        session_path=session_path, nextjs_port=nextjs_port
    )
    # `set -e` makes the pod-side script abort on the first failing step.
    script = f"""
set -e
{start_script}
"""
    k8s_stream(
        self._stream_core_api.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=self._namespace,
        container="sandbox",
        command=["/bin/sh", "-c", script],
        stdin=False,
        stdout=True,
        stderr=True,
        tty=False,
    )
def _load_agent_instructions(
self,
files_path: Path | None = None,
@@ -1374,6 +1500,13 @@ echo "Session workspace setup complete"
)
logger.debug(f"Session setup output: {exec_response}")
# Start and verify per-session OpenCode server.
self._ensure_opencode_server(
pod_name=pod_name,
session_path=session_path,
nextjs_port=nextjs_port,
)
logger.info(
f"Set up session workspace {session_id} in sandbox {sandbox_id}"
)
@@ -1416,6 +1549,13 @@ if [ -f {session_path}/nextjs.pid ]; then
kill $NEXTJS_PID 2>/dev/null || true
fi
# Kill OpenCode server if running
if [ -f {session_path}/opencode_server.pid ]; then
OPENCODE_PID=$(cat {session_path}/opencode_server.pid)
echo "Stopping OpenCode server (PID: $OPENCODE_PID)"
kill $OPENCODE_PID 2>/dev/null || true
fi
echo "Removing session directory: {session_path}"
rm -rf {session_path}
echo "Session cleanup complete"
@@ -1699,6 +1839,13 @@ echo "SNAPSHOT_RESTORED"
stdout=True,
tty=False,
)
# Start and verify per-session OpenCode server after wake/restore.
self._ensure_opencode_server(
pod_name=pod_name,
session_path=session_path,
nextjs_port=nextjs_port,
)
except ApiException as e:
raise RuntimeError(f"Failed to restore snapshot: {e}") from e
@@ -1800,94 +1947,125 @@ echo "Session config regeneration complete"
True if sandbox is healthy, False otherwise
"""
pod_name = self._get_pod_name(str(sandbox_id))
exec_client = ACPExecClient(
pod_name=pod_name,
namespace=self._namespace,
container="sandbox",
)
return exec_client.health_check(timeout=timeout)
start_time = time.time()
while time.time() - start_time < timeout:
try:
resp = k8s_stream(
self._stream_core_api.connect_get_namespaced_pod_exec,
name=pod_name,
namespace=self._namespace,
container="sandbox",
command=["/bin/sh", "-c", "echo HEALTHY"],
stderr=True,
stdin=False,
stdout=True,
tty=False,
)
return "HEALTHY" in resp
except ApiException as e:
if e.status == 404:
return False
logger.debug(f"Health check exec failed for pod {pod_name}: {e}")
time.sleep(1)
except Exception as e:
logger.debug(f"Health check error for pod {pod_name}: {e}")
time.sleep(1)
return False
def send_message(
self,
sandbox_id: UUID,
session_id: UUID,
nextjs_port: int,
message: str,
) -> Generator[ACPEvent, None, None]:
"""Send a message to the CLI agent and stream ACP events.
opencode_session_id: str | None = None,
) -> Generator[OpenCodeEvent, None, None]:
"""Send a message to OpenCode and stream typed events.
Runs `opencode acp` via kubectl exec in the sandbox pod.
The agent runs in the session-specific workspace.
Uses OpenCode HTTP APIs (`/event` + `prompt_async`) over pod networking
for true incremental SSE chunks.
Args:
sandbox_id: The sandbox ID
session_id: The session ID (determines workspace directory)
nextjs_port: Allocated Next.js port (used to derive OpenCode port)
message: The message content to send
opencode_session_id: Existing OpenCode session ID to resume
Yields:
Typed ACP schema event objects
Typed OpenCode event objects
"""
packet_logger = get_packet_logger()
pod_name = self._get_pod_name(str(sandbox_id))
session_path = f"/workspace/sessions/{session_id}"
http_server_url = self.get_opencode_server_url(sandbox_id, nextjs_port)
# Log ACP client creation
packet_logger.log_acp_client_start(
sandbox_id, session_id, session_path, context="k8s"
)
exec_client = ACPExecClient(
# Ensure per-session OpenCode server is running before attaching a client.
self._ensure_opencode_server(
pod_name=pod_name,
namespace=self._namespace,
container="sandbox",
session_path=session_path,
nextjs_port=nextjs_port,
)
# Log OpenCode client creation
packet_logger.log_acp_client_start(
sandbox_id, session_id, session_path, context="k8s-opencode"
)
# Log the send_message call at sandbox manager level
packet_logger.log_session_start(session_id, sandbox_id, message)
events_count = 0
try:
exec_client.start(cwd=session_path)
for event in exec_client.send_message(message):
events_count += 1
yield event
success = False
# Log successful completion
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
if not self._is_opencode_http_reachable(http_server_url):
raise RuntimeError(
"OpenCode HTTP SSE endpoint is not reachable "
f"at {http_server_url} for sandbox {sandbox_id}"
)
except GeneratorExit:
# Generator was closed by consumer (client disconnect, timeout, broken pipe)
# This is the most common failure mode for SSE streaming
packet_logger.log_session_end(
session_id,
success=False,
error="GeneratorExit: Client disconnected or stream closed by consumer",
events_count=events_count,
def _stream_via_http(
session_id_override: str | None,
) -> Generator[OpenCodeEvent, None, None]:
http_client = OpenCodeHttpClient(
server_url=http_server_url,
session_id=session_id_override,
cwd=session_path,
)
raise
yield from http_client.send_message(message)
try:
try:
for event in _stream_via_http(opencode_session_id):
events_count += 1
yield event
except OpenCodeSessionNotFoundError:
logger.warning(
"OpenCode session %s not found for build session %s in pod %s. "
"Retrying with a fresh OpenCode session.",
opencode_session_id,
session_id,
pod_name,
)
for event in _stream_via_http(None):
events_count += 1
yield event
success = True
except Exception as e:
# Log failure from normal exceptions
packet_logger.log_session_end(
session_id,
success=False,
error=f"Exception: {str(e)}",
events_count=events_count,
)
raise
except BaseException as e:
# Log failure from other base exceptions (SystemExit, KeyboardInterrupt, etc.)
exception_type = type(e).__name__
packet_logger.log_session_end(
session_id,
success=False,
error=f"{exception_type}: {str(e) if str(e) else 'System-level interruption'}",
error=str(e),
events_count=events_count,
)
raise
finally:
exec_client.stop()
# Log client stop
packet_logger.log_acp_client_stop(sandbox_id, session_id, context="k8s")
if success:
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
)
packet_logger.log_acp_client_stop(
sandbox_id, session_id, context="k8s-opencode"
)
def list_directory(
self, sandbox_id: UUID, session_id: UUID, path: str
@@ -2100,6 +2278,24 @@ echo "Session config regeneration complete"
"""
return self._get_nextjs_url(str(sandbox_id), port)
def get_opencode_server_url(
    self,
    sandbox_id: UUID,
    nextjs_port: int,
) -> str:
    """Get the OpenCode server URL reachable from the API server process.

    Uses pod IP networking for direct HTTP access to per-session OpenCode
    servers.

    Raises:
        RuntimeError: If the sandbox pod has no IP address available.
    """
    pod_ip = self._get_pod_ip(sandbox_id)
    if not pod_ip:
        raise RuntimeError(
            "Sandbox pod IP unavailable; cannot construct OpenCode HTTP URL"
        )
    return f"http://{pod_ip}:{get_opencode_server_port(nextjs_port)}"
def generate_pptx_preview(
self,
sandbox_id: UUID,

View File

@@ -1,19 +1,11 @@
"""Local filesystem-based sandbox implementation.
"""Local filesystem-based sandbox implementation."""
This module provides the LocalSandboxManager for development and single-node
deployments that run sandboxes as directories on the local filesystem.
"""
from onyx.server.features.build.sandbox.local.agent_client import ACPAgentClient
from onyx.server.features.build.sandbox.local.agent_client import ACPEvent
from onyx.server.features.build.sandbox.local.local_sandbox_manager import (
LocalSandboxManager,
)
from onyx.server.features.build.sandbox.local.process_manager import ProcessManager
__all__ = [
"ACPAgentClient",
"ACPEvent",
"LocalSandboxManager",
"ProcessManager",
]

View File

@@ -11,20 +11,24 @@ import mimetypes
import re
import subprocess
import threading
import time
import urllib.error
import urllib.request
from collections.abc import Generator
from pathlib import Path
from uuid import UUID
from onyx.db.enums import SandboxStatus
from onyx.file_store.file_store import get_default_file_store
from onyx.server.features.build.api.packet_logger import get_packet_logger
from onyx.server.features.build.configs import DEMO_DATA_PATH
from onyx.server.features.build.configs import OPENCODE_DISABLED_TOOLS
from onyx.server.features.build.configs import OPENCODE_SERVER_STARTUP_TIMEOUT_SECONDS
from onyx.server.features.build.configs import OUTPUTS_TEMPLATE_PATH
from onyx.server.features.build.configs import SANDBOX_BASE_PATH
from onyx.server.features.build.configs import VENV_TEMPLATE_PATH
from onyx.server.features.build.sandbox.base import get_opencode_server_port
from onyx.server.features.build.sandbox.base import SandboxManager
from onyx.server.features.build.sandbox.local.agent_client import ACPAgentClient
from onyx.server.features.build.sandbox.local.agent_client import ACPEvent
from onyx.server.features.build.sandbox.local.process_manager import ProcessManager
from onyx.server.features.build.sandbox.manager.directory_manager import (
DirectoryManager,
@@ -34,6 +38,9 @@ from onyx.server.features.build.sandbox.models import FilesystemEntry
from onyx.server.features.build.sandbox.models import LLMProviderConfig
from onyx.server.features.build.sandbox.models import SandboxInfo
from onyx.server.features.build.sandbox.models import SnapshotResult
from onyx.server.features.build.sandbox.opencode import OpenCodeEvent
from onyx.server.features.build.sandbox.opencode import OpenCodeHttpClient
from onyx.server.features.build.sandbox.opencode import OpenCodeSessionNotFoundError
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -84,9 +91,10 @@ class LocalSandboxManager(SandboxManager):
self._process_manager = ProcessManager()
self._snapshot_manager = SnapshotManager(get_default_file_store())
# Track ACP clients in memory - keyed by (sandbox_id, session_id) tuple
# Each session within a sandbox has its own ACP client
self._acp_clients: dict[tuple[UUID, UUID], ACPAgentClient] = {}
# Track per-session OpenCode server processes.
# Keyed by (sandbox_id, session_id) because each Craft session has its
# own OpenCode server working directory and session state.
self._opencode_servers: dict[tuple[UUID, UUID], subprocess.Popen[bytes]] = {}
# Track Next.js processes - keyed by (sandbox_id, session_id) tuple
# Used for clean shutdown when sessions are deleted
@@ -152,6 +160,93 @@ class LocalSandboxManager(SandboxManager):
"""
return self._get_sandbox_path(sandbox_id) / "sessions" / str(session_id)
def _get_opencode_server_url_for_nextjs_port(self, nextjs_port: int) -> str:
    """Build local OpenCode server URL from session Next.js port."""
    # Local servers always bind loopback; only the derived port varies.
    port = get_opencode_server_port(nextjs_port)
    return f"http://127.0.0.1:{port}"
def _wait_for_opencode_server(
    self,
    url: str,
    process: subprocess.Popen[bytes],
    timeout: float = OPENCODE_SERVER_STARTUP_TIMEOUT_SECONDS,
) -> bool:
    """Poll OpenCode server readiness via /config endpoint.

    Args:
        url: Base server URL (no trailing slash).
        process: The server process; an early exit aborts the wait.
        timeout: Maximum seconds to keep polling.

    Returns:
        True once GET {url}/config responds; False if the process dies or
        the timeout elapses first.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        if process.poll() is not None:
            # Server process already exited — no point in polling further.
            return False
        try:
            with urllib.request.urlopen(f"{url}/config", timeout=2):
                return True
        except Exception:
            # Not accepting connections yet; retry shortly. The original
            # except-tuple listed URLError/TimeoutError alongside Exception,
            # which already subsumes them — the extra entries were dead code.
            time.sleep(0.25)
    return False
    def _start_opencode_server(
        self,
        sandbox_id: UUID,
        session_id: UUID,
        session_path: Path,
        nextjs_port: int,
    ) -> subprocess.Popen[bytes]:
        """Start a per-session OpenCode server process.

        Any previously tracked server for this (sandbox, session) pair is
        stopped first, so at most one server runs per session.

        Args:
            sandbox_id: Sandbox that owns the session.
            session_id: Build session whose workspace hosts the server.
            session_path: Working directory for the server process.
            nextjs_port: Next.js port from which the OpenCode port is derived.

        Returns:
            The started server process (also tracked in self._opencode_servers).

        Raises:
            RuntimeError: If the server does not answer /config in time.
        """
        key = (sandbox_id, session_id)
        # Replace any stale server for this session before starting a new one.
        existing = self._opencode_servers.pop(key, None)
        if existing is not None:
            self._stop_opencode_server_process(existing, session_id)
        server_url = self._get_opencode_server_url_for_nextjs_port(nextjs_port)
        # Append server output to a per-session log file. Closing our handle
        # after Popen is safe: the child keeps the inherited file descriptor.
        server_log = session_path / "opencode_server.log"
        with server_log.open("ab") as log_file:
            process = subprocess.Popen(
                [
                    "opencode",
                    "serve",
                    "--hostname",
                    "127.0.0.1",
                    "--port",
                    str(get_opencode_server_port(nextjs_port)),
                ],
                cwd=session_path,
                stdout=log_file,
                stderr=log_file,
            )
        if not self._wait_for_opencode_server(server_url, process):
            # Server never became ready -- kill it and surface the failure.
            self._stop_opencode_server_process(process, session_id)
            raise RuntimeError(
                "OpenCode server failed to start "
                f"for session {session_id} at {server_url}"
            )
        self._opencode_servers[key] = process
        logger.info(
            "Started OpenCode server for session %s on %s", session_id, server_url
        )
        return process
def _stop_opencode_server_process(
self,
process: subprocess.Popen[bytes],
session_id: UUID,
) -> None:
"""Stop an OpenCode server process."""
if process.poll() is not None:
return
try:
self._process_manager.terminate_process(process.pid)
logger.debug(
"Stopped OpenCode server (PID %s) for session %s",
process.pid,
session_id,
)
except Exception as e:
logger.warning(
f"Failed to stop OpenCode server for session {session_id}: {e}"
)
def _setup_filtered_files(
self,
session_path: Path,
@@ -316,7 +411,7 @@ class LocalSandboxManager(SandboxManager):
"""Terminate a sandbox and clean up all resources.
1. Stop all Next.js processes for this sandbox
2. Stop all ACP clients for this sandbox (terminates agent subprocesses)
2. Stop all OpenCode servers for this sandbox
3. Cleanup sandbox directory
Args:
@@ -342,19 +437,19 @@ class LocalSandboxManager(SandboxManager):
f"session {session_id}: {e}"
)
# Stop all ACP clients for this sandbox (keyed by (sandbox_id, session_id))
clients_to_stop = [
(key, client)
for key, client in self._acp_clients.items()
# Stop all OpenCode servers for this sandbox.
servers_to_stop = [
(key, process)
for key, process in self._opencode_servers.items()
if key[0] == sandbox_id
]
for key, client in clients_to_stop:
for key, process in servers_to_stop:
try:
client.stop()
del self._acp_clients[key]
self._stop_opencode_server_process(process, key[1])
del self._opencode_servers[key]
except Exception as e:
logger.warning(
f"Failed to stop ACP client for sandbox {sandbox_id}, "
f"Failed to stop OpenCode server for sandbox {sandbox_id}, "
f"session {key[1]}: {e}"
)
@@ -538,6 +633,14 @@ class LocalSandboxManager(SandboxManager):
)
logger.debug("Agent instructions ready")
# Start per-session OpenCode server.
self._start_opencode_server(
sandbox_id=sandbox_id,
session_id=session_id,
session_path=session_path,
nextjs_port=nextjs_port,
)
logger.info(f"Set up session workspace {session_id} at {session_path}")
except Exception as e:
@@ -563,7 +666,7 @@ class LocalSandboxManager(SandboxManager):
"""Clean up a session workspace (on session delete).
1. Stop Next.js dev server if running
2. Stop ACP client for this session
2. Stop OpenCode server for this session
3. Remove session directory
Does NOT terminate the sandbox - other sessions may still be using it.
@@ -582,16 +685,16 @@ class LocalSandboxManager(SandboxManager):
# Fallback: find by port (e.g., if server was restarted)
self._stop_nextjs_server_on_port(nextjs_port, session_id)
# Stop ACP client for this session
client_key = (sandbox_id, session_id)
client = self._acp_clients.pop(client_key, None)
if client:
# Stop OpenCode server for this session.
server_key = (sandbox_id, session_id)
server_process = self._opencode_servers.pop(server_key, None)
if server_process:
try:
client.stop()
logger.debug(f"Stopped ACP client for session {session_id}")
self._stop_opencode_server_process(server_process, session_id)
logger.debug(f"Stopped OpenCode server for session {session_id}")
except Exception as e:
logger.warning(
f"Failed to stop ACP client for session {session_id}: {e}"
f"Failed to stop OpenCode server for session {session_id}: {e}"
)
# Cleanup session directory
@@ -806,77 +909,100 @@ class LocalSandboxManager(SandboxManager):
return False
return True
def get_opencode_server_url(
self, sandbox_id: UUID, nextjs_port: int # noqa: ARG002
) -> str:
"""Get the OpenCode server URL for a session in local mode."""
return self._get_opencode_server_url_for_nextjs_port(nextjs_port)
def send_message(
self,
sandbox_id: UUID,
session_id: UUID,
nextjs_port: int,
message: str,
) -> Generator[ACPEvent, None, None]:
"""Send a message to the CLI agent and stream typed ACP events.
The agent runs in the session-specific workspace:
sessions/$session_id/
Yields ACPEvent objects:
- AgentMessageChunk: Text/image content from agent
- AgentThoughtChunk: Agent's internal reasoning
- ToolCallStart: Tool invocation started
- ToolCallProgress: Tool execution progress/result
- AgentPlanUpdate: Agent's execution plan
- CurrentModeUpdate: Agent mode change
- PromptResponse: Agent finished (has stop_reason)
- Error: An error occurred
opencode_session_id: str | None = None,
) -> Generator[OpenCodeEvent, None, None]:
"""Send a message to OpenCode and stream normalized events.
Args:
sandbox_id: The sandbox ID
session_id: The session ID (determines workspace directory)
nextjs_port: Allocated Next.js port (used to derive OpenCode port)
message: The message content to send
Yields:
Typed ACP schema event objects
opencode_session_id: Existing OpenCode session ID to resume
"""
from onyx.server.features.build.api.packet_logger import get_packet_logger
packet_logger = get_packet_logger()
# Get or create ACP client for this session
client_key = (sandbox_id, session_id)
client = self._acp_clients.get(client_key)
if client is None or not client.is_running:
session_path = self._get_session_path(sandbox_id, session_id)
# Log client creation
packet_logger.log_acp_client_start(
sandbox_id, session_id, str(session_path), context="local"
)
logger.info(
f"Creating new ACP client for sandbox {sandbox_id}, session {session_id}"
session_path = self._get_session_path(sandbox_id, session_id)
if not session_path.exists():
raise RuntimeError(
f"Session workspace not found for sandbox={sandbox_id} session={session_id}"
)
# Create and start ACP client for this session
client = ACPAgentClient(cwd=str(session_path))
self._acp_clients[client_key] = client
server_key = (sandbox_id, session_id)
server_process = self._opencode_servers.get(server_key)
if server_process is None or server_process.poll() is not None:
self._start_opencode_server(
sandbox_id=sandbox_id,
session_id=session_id,
session_path=session_path,
nextjs_port=nextjs_port,
)
server_url = self.get_opencode_server_url(sandbox_id, nextjs_port)
packet_logger.log_acp_client_start(
sandbox_id,
session_id,
str(session_path),
context="local-opencode",
)
# Log the send_message call at sandbox manager level
packet_logger.log_session_start(session_id, sandbox_id, message)
events_count = 0
success = False
try:
for event in client.send_message(message):
events_count += 1
yield event
# Log successful completion
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
http_client = OpenCodeHttpClient(
server_url=server_url,
session_id=opencode_session_id,
cwd=str(session_path),
)
try:
for event in http_client.send_message(message):
events_count += 1
yield event
except OpenCodeSessionNotFoundError:
# Server lost this session (e.g. restarted). Retry once without session id.
logger.warning(
"OpenCode session %s not found for build session %s. "
"Retrying with a fresh OpenCode session.",
opencode_session_id,
session_id,
)
http_client = OpenCodeHttpClient(
server_url=server_url,
session_id=None,
cwd=str(session_path),
)
for event in http_client.send_message(message):
events_count += 1
yield event
success = True
except Exception as e:
# Log failure
packet_logger.log_session_end(
session_id, success=False, error=str(e), events_count=events_count
)
raise
finally:
if success:
packet_logger.log_session_end(
session_id, success=True, events_count=events_count
)
packet_logger.log_acp_client_stop(
sandbox_id, session_id, context="local-opencode"
)
def _sanitize_path(self, path: str) -> str:
"""Sanitize a user-provided path to prevent path traversal attacks.

View File

@@ -15,8 +15,6 @@ from uuid import UUID
from uuid import uuid4
import pytest
from acp.schema import PromptResponse
from acp.schema import ToolCallStart
from sqlalchemy.orm import Session
from onyx.db.engine.sql_engine import get_session_with_current_tenant
@@ -32,10 +30,12 @@ from onyx.server.features.build.configs import SANDBOX_BASE_PATH
from onyx.server.features.build.db.build_session import allocate_nextjs_port
from onyx.server.features.build.sandbox import get_sandbox_manager
from onyx.server.features.build.sandbox.local import LocalSandboxManager
from onyx.server.features.build.sandbox.local.agent_client import ACPEvent
from onyx.server.features.build.sandbox.models import FilesystemEntry
from onyx.server.features.build.sandbox.models import LLMProviderConfig
from onyx.server.features.build.sandbox.models import SnapshotResult
from onyx.server.features.build.sandbox.opencode import OpenCodeEvent
from onyx.server.features.build.sandbox.opencode import OpenCodePromptResponse
from onyx.server.features.build.sandbox.opencode import OpenCodeToolCallStart
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
@@ -190,8 +190,8 @@ def session_workspace(
sandbox_record: Sandbox,
build_session_record: BuildSession,
db_session: Session,
) -> Generator[tuple[Sandbox, UUID], None, None]:
"""Set up a session workspace within the sandbox and return (sandbox, session_id)."""
) -> Generator[tuple[Sandbox, UUID, int], None, None]:
"""Set up a session workspace and return (sandbox, session_id, nextjs_port)."""
session_id = build_session_record.id
# Use setup_session_workspace to create the session directory structure
@@ -218,7 +218,7 @@ def session_workspace(
file_system_path=SANDBOX_BASE_PATH,
)
yield sandbox_record, session_id
yield sandbox_record, session_id, nextjs_port
# Cleanup session workspace
sandbox_manager.cleanup_session_workspace(
@@ -263,7 +263,7 @@ class TestCreateSnapshot:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
file_store_initialized: None, # noqa: ARG002
) -> None:
@@ -271,7 +271,7 @@ class TestCreateSnapshot:
Note: Caller is responsible for creating DB record from the SnapshotResult.
"""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
sandbox_path = Path(SANDBOX_BASE_PATH) / str(sandbox.id)
outputs_dir = sandbox_path / "sessions" / str(session_id) / "outputs"
(outputs_dir / "app.py").write_text("print('hello')")
@@ -309,11 +309,11 @@ class TestListDirectory:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test that list_directory returns filesystem entries."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
sandbox_path = Path(SANDBOX_BASE_PATH) / str(sandbox.id)
outputs_dir = sandbox_path / "sessions" / str(session_id)
(outputs_dir / "file.txt").write_text("content")
@@ -334,11 +334,11 @@ class TestReadFile:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test that read_file returns file contents as bytes."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
sandbox_path = Path(SANDBOX_BASE_PATH) / str(sandbox.id)
outputs_dir = sandbox_path / "sessions" / str(session_id) / "outputs"
(outputs_dir / "test.txt").write_bytes(b"Hello, World!")
@@ -355,19 +355,19 @@ class TestSendMessage:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test that send_message streams ACPEvent objects and ends with PromptResponse.
"""Test that send_message streams OpenCode events and ends with prompt_response.
Note: Heartbeat update is now handled by the caller (SessionManager),
not by the SandboxManager itself.
"""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
events: list[ACPEvent] = []
events: list[OpenCodeEvent] = []
for event in sandbox_manager.send_message(
sandbox.id, session_id, "What is 2 + 2?"
sandbox.id, session_id, nextjs_port, "What is 2 + 2?"
):
events.append(event)
@@ -376,30 +376,31 @@ class TestSendMessage:
# Last event should be PromptResponse (success) or contain results
last_event = events[-1]
assert isinstance(last_event, PromptResponse)
assert isinstance(last_event, OpenCodePromptResponse)
def test_send_message_write_file(
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test that send_message can write files and emits edit tool calls."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
sandbox_path = Path(SANDBOX_BASE_PATH) / str(sandbox.id)
session_path = sandbox_path / "sessions" / str(session_id)
events: list[ACPEvent] = []
events: list[OpenCodeEvent] = []
for event in sandbox_manager.send_message(
sandbox.id,
session_id,
nextjs_port,
"Create a file called hello.txt with the content 'Hello, World!'",
):
events.append(event)
# Should have at least one ToolCallStart with kind='edit'
tool_calls = [e for e in events if isinstance(e, ToolCallStart)]
tool_calls = [e for e in events if isinstance(e, OpenCodeToolCallStart)]
edit_tool_calls = [tc for tc in tool_calls if tc.kind == "edit"]
assert len(edit_tool_calls) >= 1, (
f"Expected at least one edit tool call, got {len(edit_tool_calls)}. "
@@ -408,7 +409,7 @@ class TestSendMessage:
# Last event should be PromptResponse
last_event = events[-1]
assert isinstance(last_event, PromptResponse)
assert isinstance(last_event, OpenCodePromptResponse)
# Verify the file was actually created (agent writes relative to session root)
created_file = session_path / "hello.txt"
@@ -419,11 +420,11 @@ class TestSendMessage:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test that send_message can read files and emits read tool calls."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
sandbox_path = Path(SANDBOX_BASE_PATH) / str(sandbox.id)
session_path = sandbox_path / "sessions" / str(session_id)
@@ -431,16 +432,17 @@ class TestSendMessage:
test_file = session_path / "secret.txt"
test_file.write_text("The secret code is 12345")
events: list[ACPEvent] = []
events: list[OpenCodeEvent] = []
for event in sandbox_manager.send_message(
sandbox.id,
session_id,
nextjs_port,
"Read the file secret.txt and tell me what the secret code is",
):
events.append(event)
# Should have at least one ToolCallStart with kind='read'
tool_calls = [e for e in events if isinstance(e, ToolCallStart)]
tool_calls = [e for e in events if isinstance(e, OpenCodeToolCallStart)]
read_tool_calls = [tc for tc in tool_calls if tc.kind == "read"]
assert len(read_tool_calls) >= 1, (
f"Expected at least one read tool call, got {len(read_tool_calls)}. "
@@ -449,7 +451,7 @@ class TestSendMessage:
# Last event should be PromptResponse
last_event = events[-1]
assert isinstance(last_event, PromptResponse)
assert isinstance(last_event, OpenCodePromptResponse)
class TestUploadFile:
@@ -459,11 +461,11 @@ class TestUploadFile:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test that upload_file creates a file in the attachments directory."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
content = b"Hello, World!"
result = sandbox_manager.upload_file(
@@ -484,11 +486,11 @@ class TestUploadFile:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test that upload_file renames files on collision."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
# Upload first file
sandbox_manager.upload_file(sandbox.id, session_id, "test.txt", b"first")
@@ -508,11 +510,11 @@ class TestDeleteFile:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test that delete_file removes a file."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
# Upload a file first
sandbox_manager.upload_file(sandbox.id, session_id, "test.txt", b"content")
@@ -535,11 +537,11 @@ class TestDeleteFile:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test that delete_file returns False for non-existent file."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
result = sandbox_manager.delete_file(
sandbox.id, session_id, "attachments/nonexistent.txt"
@@ -551,11 +553,11 @@ class TestDeleteFile:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test that delete_file rejects path traversal attempts."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
with pytest.raises(ValueError, match="path traversal"):
sandbox_manager.delete_file(sandbox.id, session_id, "../../../etc/passwd")
@@ -568,11 +570,11 @@ class TestGetUploadStats:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test get_upload_stats returns zeros for empty directory."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
file_count, total_size = sandbox_manager.get_upload_stats(
sandbox.id, session_id
@@ -585,11 +587,11 @@ class TestGetUploadStats:
self,
sandbox_manager: LocalSandboxManager,
db_session: Session, # noqa: ARG002
session_workspace: tuple[Sandbox, UUID],
session_workspace: tuple[Sandbox, UUID, int],
tenant_context: None, # noqa: ARG002
) -> None:
"""Test get_upload_stats returns correct count and size."""
sandbox, session_id = session_workspace
sandbox, session_id, nextjs_port = session_workspace
# Upload some files
sandbox_manager.upload_file(

View File

@@ -0,0 +1,33 @@
"""OpenCode server/client primitives for Craft sandbox execution."""
from onyx.server.features.build.sandbox.opencode.events import OpenCodeAgentMessageChunk
from onyx.server.features.build.sandbox.opencode.events import OpenCodeAgentThoughtChunk
from onyx.server.features.build.sandbox.opencode.events import OpenCodeError
from onyx.server.features.build.sandbox.opencode.events import OpenCodeEvent
from onyx.server.features.build.sandbox.opencode.events import OpenCodePromptResponse
from onyx.server.features.build.sandbox.opencode.events import (
OpenCodeSessionEstablished,
)
from onyx.server.features.build.sandbox.opencode.events import OpenCodeSSEKeepalive
from onyx.server.features.build.sandbox.opencode.events import OpenCodeToolCallProgress
from onyx.server.features.build.sandbox.opencode.events import OpenCodeToolCallStart
from onyx.server.features.build.sandbox.opencode.http_client import OpenCodeHttpClient
from onyx.server.features.build.sandbox.opencode.run_client import OpenCodeRunClient
from onyx.server.features.build.sandbox.opencode.run_client import (
OpenCodeSessionNotFoundError,
)
__all__ = [
"OpenCodeEvent",
"OpenCodeSSEKeepalive",
"OpenCodeAgentMessageChunk",
"OpenCodeAgentThoughtChunk",
"OpenCodeToolCallStart",
"OpenCodeToolCallProgress",
"OpenCodePromptResponse",
"OpenCodeError",
"OpenCodeSessionEstablished",
"OpenCodeHttpClient",
"OpenCodeRunClient",
"OpenCodeSessionNotFoundError",
]

View File

@@ -0,0 +1,125 @@
"""Typed event models for OpenCode streaming in Craft."""
from dataclasses import dataclass
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import Literal
from pydantic import BaseModel
from pydantic import Field
def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    now = datetime.now(tz=timezone.utc)
    return now.isoformat()
def timestamp_ms_to_iso(timestamp_ms: int | float | None) -> str:
"""Convert millisecond epoch timestamp to UTC ISO string."""
if timestamp_ms is None:
return utc_now_iso()
try:
return datetime.fromtimestamp(
float(timestamp_ms) / 1000.0, tz=timezone.utc
).isoformat()
except (TypeError, ValueError, OSError):
return utc_now_iso()
class OpenCodeTextContent(BaseModel):
    """Text content block used by streamed text/reasoning packets."""

    # Discriminator; always "text" for this content block.
    type: Literal["text"] = "text"
    # The text payload of the chunk.
    text: str
class OpenCodeAgentMessageChunk(BaseModel):
    """Assistant text chunk event."""

    type: Literal["agent_message_chunk"] = "agent_message_chunk"
    # OpenCode session this chunk belongs to (None if not yet established).
    opencode_session_id: str | None = None
    # The text content of this chunk.
    content: OpenCodeTextContent
    # ISO-8601 UTC timestamp; defaults to packet-creation time.
    timestamp: str = Field(default_factory=utc_now_iso)
class OpenCodeAgentThoughtChunk(BaseModel):
    """Assistant reasoning/thought chunk event."""

    type: Literal["agent_thought_chunk"] = "agent_thought_chunk"
    # OpenCode session this chunk belongs to (None if not yet established).
    opencode_session_id: str | None = None
    # The reasoning text of this chunk.
    content: OpenCodeTextContent
    # ISO-8601 UTC timestamp; defaults to packet-creation time.
    timestamp: str = Field(default_factory=utc_now_iso)
class OpenCodeToolCallBase(BaseModel):
    """Shared fields for synthesized tool call packets."""

    opencode_session_id: str | None = None
    # Identifier correlating the start/progress packets of one tool call.
    tool_call_id: str
    tool_name: str
    # Tool category (e.g. "edit", "read"); None when not classified.
    kind: str | None = None
    title: str | None = None
    content: list[dict[str, Any]] | None = None
    locations: list[str] | None = None
    raw_input: dict[str, Any] | None = None
    raw_output: dict[str, Any] | None = None
    # Execution status, if known (e.g. "completed") -- semantics set upstream.
    status: str | None = None
    # ISO-8601 UTC timestamp; defaults to packet-creation time.
    timestamp: str = Field(default_factory=utc_now_iso)
class OpenCodeToolCallStart(OpenCodeToolCallBase):
    """Tool call start event (first packet for a tool_call_id)."""

    type: Literal["tool_call_start"] = "tool_call_start"
class OpenCodeToolCallProgress(OpenCodeToolCallBase):
    """Tool call update event (subsequent packets for a tool_call_id)."""

    type: Literal["tool_call_progress"] = "tool_call_progress"
class OpenCodePromptResponse(BaseModel):
    """Turn completion event -- the agent has finished responding."""

    type: Literal["prompt_response"] = "prompt_response"
    opencode_session_id: str | None = None
    # Why the turn ended, when reported; None if unknown.
    stop_reason: str | None = None
    # ISO-8601 UTC timestamp; defaults to packet-creation time.
    timestamp: str = Field(default_factory=utc_now_iso)
class OpenCodeError(BaseModel):
    """Error event from OpenCode client/server interaction."""

    type: Literal["error"] = "error"
    opencode_session_id: str | None = None
    # Numeric or string error code; -1 is used for client-side timeouts.
    code: int | str | None = None
    message: str
    # Optional structured error details passed through from the server.
    data: dict[str, Any] | None = None
    # ISO-8601 UTC timestamp; defaults to packet-creation time.
    timestamp: str = Field(default_factory=utc_now_iso)
class OpenCodeSessionEstablished(BaseModel):
    """Internal event emitted when a session ID is discovered/created.

    Lets callers persist the OpenCode session id for later resumption.
    """

    type: Literal["opencode_session_established"] = "opencode_session_established"
    opencode_session_id: str
    # ISO-8601 UTC timestamp; defaults to packet-creation time.
    timestamp: str = Field(default_factory=utc_now_iso)
@dataclass(frozen=True)
class OpenCodeSSEKeepalive:
    """Marker used by SessionManager to emit SSE keepalive comments."""

    # Intentionally fieldless: the instance itself is the signal.
# Union of every packet the OpenCode streaming layer can yield. All members
# are Pydantic models except OpenCodeSSEKeepalive, a plain dataclass marker
# with no payload.
OpenCodeEvent = (
    OpenCodeAgentMessageChunk
    | OpenCodeAgentThoughtChunk
    | OpenCodeToolCallStart
    | OpenCodeToolCallProgress
    | OpenCodePromptResponse
    | OpenCodeError
    | OpenCodeSessionEstablished
    | OpenCodeSSEKeepalive
)

View File

@@ -0,0 +1,571 @@
"""HTTP streaming client for OpenCode server event and prompt APIs."""
import json
import socket
import time
from collections.abc import Generator
from http.client import HTTPResponse
from typing import Any
from urllib.error import HTTPError
from urllib.parse import urlencode
from urllib.request import Request
from urllib.request import urlopen
from onyx.server.features.build.api.packet_logger import get_packet_logger
from onyx.server.features.build.configs import OPENCODE_MESSAGE_TIMEOUT
from onyx.server.features.build.configs import SSE_KEEPALIVE_INTERVAL
from onyx.server.features.build.sandbox.opencode.events import OpenCodeError
from onyx.server.features.build.sandbox.opencode.events import OpenCodeEvent
from onyx.server.features.build.sandbox.opencode.events import OpenCodePromptResponse
from onyx.server.features.build.sandbox.opencode.events import OpenCodeSSEKeepalive
from onyx.server.features.build.sandbox.opencode.parser import (
looks_like_session_not_found,
)
from onyx.server.features.build.sandbox.opencode.parser import OpenCodeEventParser
from onyx.server.features.build.sandbox.opencode.run_client import (
OpenCodeSessionNotFoundError,
)
READ_TIMEOUT_SECONDS = 1.0
def _as_dict(value: Any) -> dict[str, Any] | None:
return value if isinstance(value, dict) else None
def _extract_timestamp_ms(value: dict[str, Any]) -> int | float | None:
"""Extract a millisecond timestamp from `time` metadata, if present."""
time_obj = _as_dict(value.get("time")) or {}
for key in ("end", "completed", "start", "created", "updated"):
ts = time_obj.get(key)
if isinstance(ts, (int, float)):
return ts
return None
def _extract_session_error_message(error_payload: dict[str, Any] | None) -> str:
if not error_payload:
return "OpenCode session failed"
message = error_payload.get("message")
if isinstance(message, str) and message:
return message
data = _as_dict(error_payload.get("data"))
if data:
nested = data.get("message")
if isinstance(nested, str) and nested:
return nested
return json.dumps(error_payload, default=str)
def _build_raw_event_from_part(
    part: dict[str, Any],
    delta: str | None,
) -> dict[str, Any] | None:
    """Translate OpenCode SSE `message.part.updated` payload into run-style events.

    Returns None for malformed payloads or part types with no mapping.
    """
    part_type = part.get("type")
    session_id = part.get("sessionID")
    if not isinstance(part_type, str) or not isinstance(session_id, str):
        return None

    timestamp = _extract_timestamp_ms(part)

    def _envelope(event_type: str, body: dict[str, Any]) -> dict[str, Any]:
        # All run-style events share this outer shape.
        return {
            "type": event_type,
            "sessionID": session_id,
            "timestamp": timestamp,
            "part": body,
        }

    if part_type in ("text", "reasoning"):
        # Prefer the streamed delta; otherwise use the accumulated text.
        text_value = delta if isinstance(delta, str) else part.get("text")
        if not isinstance(text_value, str) or not text_value:
            return None
        return _envelope(part_type, {"text": text_value})

    if part_type == "tool":
        call_id = part.get("callID")
        tool_name = part.get("tool")
        if not isinstance(call_id, str) or not isinstance(tool_name, str):
            return None
        state = _as_dict(part.get("state")) or {}
        return _envelope(
            "tool_use",
            {"callID": call_id, "tool": tool_name, "state": state},
        )

    if part_type == "step-finish":
        reason = part.get("reason")
        if not isinstance(reason, str):
            reason = "completed"
        return _envelope("step_finish", {"reason": reason})

    if part_type == "error":
        message = part.get("message")
        if not isinstance(message, str):
            return None
        return _envelope(
            "error",
            {
                "message": message,
                "code": part.get("code"),
                "data": _as_dict(part.get("data")),
            },
        )

    return None
class OpenCodeHttpClient:
"""Executes OpenCode turns using `/event` SSE and `/prompt_async` HTTP APIs."""
    def __init__(
        self,
        server_url: str,
        session_id: str | None = None,
        cwd: str | None = None,
        timeout: float = OPENCODE_MESSAGE_TIMEOUT,
        keepalive_interval: float = SSE_KEEPALIVE_INTERVAL,
    ) -> None:
        """Create a client bound to one OpenCode server.

        Args:
            server_url: Base URL of the server; a trailing slash is stripped.
            session_id: Existing OpenCode session to resume, or None to have
                one created lazily on the first message.
            cwd: Working directory sent as the `directory` query parameter.
            timeout: Maximum seconds to wait for a full turn in send_message.
            keepalive_interval: Seconds of stream silence before a keepalive
                marker is emitted.
        """
        self._server_url = server_url.rstrip("/")
        self._cwd = cwd
        self._timeout = timeout
        self._keepalive_interval = keepalive_interval
        # The parser owns the session id; _create_session_if_needed may
        # assign it later when none was supplied.
        self._parser = OpenCodeEventParser(session_id=session_id)
    @property
    def session_id(self) -> str | None:
        """The current OpenCode session id, or None if not yet established."""
        return self._parser.session_id
def _build_url(self, path: str) -> str:
if self._cwd:
query = urlencode({"directory": self._cwd})
return f"{self._server_url}{path}?{query}"
return f"{self._server_url}{path}"
    def _create_session_if_needed(self) -> None:
        """POST /session to create a new OpenCode session when none is set.

        No-op if the parser already holds a session id. On success stores
        the new id on the parser.

        Raises:
            RuntimeError: If the /session response carries no session id.
        """
        if self._parser.session_id:
            return
        request = Request(
            self._build_url("/session"),
            data=b"{}",
            method="POST",
            headers={
                "content-type": "application/json",
                "accept": "application/json",
            },
        )
        with urlopen(request, timeout=15.0) as response:
            body = response.read().decode("utf-8")
        payload = json.loads(body) if body else {}
        session_id = payload.get("id") if isinstance(payload, dict) else None
        if not isinstance(session_id, str) or not session_id:
            raise RuntimeError("OpenCode /session response missing session id")
        self._parser.session_id = session_id
    def _open_event_stream(self) -> HTTPResponse:
        """Open the server's `/event` SSE stream.

        The short READ_TIMEOUT_SECONDS read timeout makes readline() in
        send_message wake up periodically so keepalives can be emitted.
        """
        request = Request(
            self._build_url("/event"),
            method="GET",
            headers={"accept": "text/event-stream"},
        )
        return urlopen(request, timeout=READ_TIMEOUT_SECONDS)
def _send_prompt_async(self, message: str) -> None:
session_id = self._parser.session_id
if not session_id:
raise RuntimeError("OpenCode session id missing before prompt_async")
payload = {"parts": [{"type": "text", "text": message}]}
request = Request(
self._build_url(f"/session/{session_id}/prompt_async"),
data=json.dumps(payload).encode("utf-8"),
method="POST",
headers={
"content-type": "application/json",
"accept": "application/json",
},
)
try:
with urlopen(request, timeout=15.0):
return
except HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
if exc.code == 404 and looks_like_session_not_found(body):
raise OpenCodeSessionNotFoundError(body)
if exc.code == 404:
raise OpenCodeSessionNotFoundError(
body or f"Session {session_id} not found"
)
raise RuntimeError(
f"OpenCode prompt_async failed ({exc.code}): {body[:500]}"
)
    def send_message(self, message: str) -> Generator[OpenCodeEvent, None, None]:
        """Send one message through OpenCode HTTP APIs and stream normalized events.

        Subscribes to the server-wide SSE ``/event`` feed, fires the prompt
        asynchronously, and forwards only events for this client's primary
        session; events from other sessions (e.g. subagents) are parsed with
        their own per-session parsers. Emits keepalives while the stream is
        quiet and terminates on a prompt response, a session error, an
        idle-after-busy status transition, or the overall timeout.
        """
        packet_logger = get_packet_logger()
        packet_logger.log_raw(
            "OPENCODE-HTTP-START",
            {
                "server_url": self._server_url,
                "session_id": self._parser.session_id,
                "cwd": self._cwd,
            },
        )
        # message id -> role ("user"/"assistant"), learned from message.updated.
        message_roles: dict[str, str] = {}
        # message id -> parent message id (assistant messages point at the prompt).
        message_parent_ids: dict[str, str] = {}
        # First user message / matching assistant message observed per session.
        user_message_ids_by_session: dict[str, str] = {}
        assistant_message_ids_by_session: dict[str, str] = {}
        saw_target_busy = False
        saw_target_idle = False
        # "status" and "error_preview" exist only for the END log line below.
        status = "started"
        error_preview: str | None = None
        try:
            self._create_session_if_needed()
            primary_session_id = self._parser.session_id
            if not primary_session_id:
                raise RuntimeError(
                    "OpenCode session id missing after /session creation"
                )
            # One parser per session id so subagent chunk/tool state never
            # interleaves with the primary session's parser state.
            session_parsers: dict[str, OpenCodeEventParser] = {
                primary_session_id: self._parser
            }

            def _get_parser_for_session(session_id: str) -> OpenCodeEventParser:
                # Lazily create parsers for sessions seen mid-stream.
                parser = session_parsers.get(session_id)
                if parser is not None:
                    return parser
                parser = OpenCodeEventParser(session_id=session_id)
                session_parsers[session_id] = parser
                return parser

            with self._open_event_stream() as stream:
                # Subscribe first, then prompt, so early events are not missed.
                self._send_prompt_async(message)
                start_time = time.monotonic()
                last_event_time = start_time
                # Accumulates the "data:" lines of the current SSE event.
                data_lines: list[str] = []
                while True:
                    elapsed = time.monotonic() - start_time
                    if elapsed > self._timeout:
                        status = "timeout"
                        yield OpenCodeError(
                            opencode_session_id=primary_session_id,
                            code=-1,
                            message=(
                                "Timeout waiting for OpenCode response "
                                f"after {self._timeout:.1f}s"
                            ),
                        )
                        return
                    try:
                        raw_line = stream.readline()
                    except (TimeoutError, socket.timeout):
                        # Socket read timed out: treat as "no data yet".
                        raw_line = None
                    except OSError as exc:
                        # Some platforms surface read timeouts as bare OSError.
                        if "timed out" in str(exc).lower():
                            raw_line = None
                        else:
                            raise
                    if raw_line is None:
                        # Quiet stream: keep the downstream SSE connection warm.
                        if (
                            time.monotonic() - last_event_time
                        ) >= self._keepalive_interval:
                            yield OpenCodeSSEKeepalive()
                            last_event_time = time.monotonic()
                        continue
                    # Stream closed by server.
                    if raw_line == b"":
                        break
                    line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
                    if not line:
                        # Blank line ends one SSE event; parse the buffered data.
                        if not data_lines:
                            continue
                        payload_str = "\n".join(data_lines)
                        data_lines.clear()
                        try:
                            payload = json.loads(payload_str)
                        except json.JSONDecodeError:
                            packet_logger.log_raw(
                                "OPENCODE-HTTP-PARSE-ERROR",
                                {"line": payload_str[:500]},
                            )
                            continue
                        if not isinstance(payload, dict):
                            continue
                        payload_type = payload.get("type")
                        properties = _as_dict(payload.get("properties")) or {}
                        if payload_type == "session.status":
                            status_session_id = properties.get("sessionID")
                            if (
                                isinstance(status_session_id, str)
                                and status_session_id == primary_session_id
                            ):
                                status_obj = _as_dict(properties.get("status")) or {}
                                status_type = status_obj.get("type")
                                if status_type == "busy":
                                    saw_target_busy = True
                                elif status_type == "idle":
                                    saw_target_idle = True
                                    # Idle after busy means the turn finished
                                    # even without an explicit step-finish.
                                    if saw_target_busy:
                                        if not self._parser.saw_prompt_response:
                                            yield OpenCodePromptResponse(
                                                opencode_session_id=primary_session_id,
                                                stop_reason="completed",
                                            )
                                        status = "completed"
                                        return
                            continue
                        if payload_type == "session.error":
                            status_session_id = properties.get("sessionID")
                            if (
                                isinstance(status_session_id, str)
                                and status_session_id == primary_session_id
                            ):
                                error_payload = _as_dict(properties.get("error"))
                                status = "failed"
                                yield OpenCodeError(
                                    opencode_session_id=primary_session_id,
                                    code=(
                                        error_payload.get("name")
                                        if error_payload
                                        else None
                                    ),
                                    message=_extract_session_error_message(
                                        error_payload
                                    ),
                                    data=error_payload,
                                )
                                return
                            continue
                        if payload_type == "message.updated":
                            info = _as_dict(properties.get("info")) or {}
                            info_session_id = info.get("sessionID")
                            if not isinstance(info_session_id, str):
                                continue
                            parser_for_session = _get_parser_for_session(
                                info_session_id
                            )
                            # A "noop" event still lets the parser emit its
                            # session-established packet for new sessions.
                            raw_info_event = {
                                "type": "noop",
                                "sessionID": info_session_id,
                                "timestamp": _extract_timestamp_ms(info),
                                "part": {},
                            }
                            for parsed_event in parser_for_session.parse_raw_event(
                                raw_info_event
                            ):
                                last_event_time = time.monotonic()
                                yield parsed_event
                            # Record role/parent links so that part updates can
                            # later be attributed to the right assistant reply.
                            message_id = info.get("id")
                            role = info.get("role")
                            if isinstance(message_id, str) and isinstance(role, str):
                                message_roles[message_id] = role
                                parent_id = info.get("parentID")
                                if isinstance(parent_id, str):
                                    message_parent_ids[message_id] = parent_id
                                if (
                                    role == "user"
                                    and info_session_id
                                    not in user_message_ids_by_session
                                ):
                                    user_message_ids_by_session[info_session_id] = (
                                        message_id
                                    )
                                elif role == "assistant":
                                    user_message_id = user_message_ids_by_session.get(
                                        info_session_id
                                    )
                                    if (
                                        isinstance(parent_id, str)
                                        and parent_id == user_message_id
                                    ):
                                        assistant_message_ids_by_session[
                                            info_session_id
                                        ] = message_id
                            continue
                        if payload_type != "message.part.updated":
                            continue
                        part = _as_dict(properties.get("part")) or {}
                        part_session_id = part.get("sessionID")
                        if not isinstance(part_session_id, str):
                            continue
                        message_id = part.get("messageID")
                        if not isinstance(message_id, str):
                            continue
                        role = message_roles.get(message_id)
                        if role != "assistant":
                            continue
                        # Only stream parts from the assistant message that is
                        # replying to *our* prompt in this session.
                        expected_assistant_id = assistant_message_ids_by_session.get(
                            part_session_id
                        )
                        if (
                            expected_assistant_id
                            and message_id != expected_assistant_id
                        ):
                            continue
                        if not expected_assistant_id:
                            user_message_id = user_message_ids_by_session.get(
                                part_session_id
                            )
                            parent_id = message_parent_ids.get(message_id)
                            if (
                                user_message_id
                                and isinstance(parent_id, str)
                                and parent_id != user_message_id
                            ):
                                continue
                            # Adopt this assistant message as the expected one.
                            assistant_message_ids_by_session[part_session_id] = (
                                message_id
                            )
                        raw_part_event = _build_raw_event_from_part(
                            part=part,
                            delta=(
                                properties.get("delta")
                                if isinstance(properties.get("delta"), str)
                                else None
                            ),
                        )
                        if raw_part_event is None:
                            continue
                        parser_for_session = _get_parser_for_session(part_session_id)
                        for parsed_event in parser_for_session.parse_raw_event(
                            raw_part_event
                        ):
                            last_event_time = time.monotonic()
                            yield parsed_event
                            if (
                                isinstance(parsed_event, OpenCodePromptResponse)
                                and part_session_id == primary_session_id
                            ):
                                status = "completed"
                                return
                        continue
                    if line.startswith("data:"):
                        data_lines.append(line[5:].lstrip())
                # Flush trailing buffered SSE data if stream closed mid-event.
                if data_lines:
                    payload_str = "\n".join(data_lines)
                    try:
                        payload = json.loads(payload_str)
                    except json.JSONDecodeError:
                        payload = None
                    if isinstance(payload, dict):
                        properties = _as_dict(payload.get("properties")) or {}
                        part = _as_dict(properties.get("part")) or {}
                        part_session_id = part.get("sessionID")
                        if not isinstance(part_session_id, str):
                            part_session_id = primary_session_id
                        raw_part_event = _build_raw_event_from_part(
                            part=part,
                            delta=(
                                properties.get("delta")
                                if isinstance(properties.get("delta"), str)
                                else None
                            ),
                        )
                        if raw_part_event:
                            parser_for_session = _get_parser_for_session(
                                part_session_id
                            )
                            for parsed_event in parser_for_session.parse_raw_event(
                                raw_part_event
                            ):
                                yield parsed_event
                                if (
                                    isinstance(parsed_event, OpenCodePromptResponse)
                                    and part_session_id == primary_session_id
                                ):
                                    status = "completed"
                                    return
                if not self._parser.saw_prompt_response:
                    # Session may have completed without explicit step-finish event.
                    stop_reason = "completed"
                    if saw_target_idle and not saw_target_busy:
                        stop_reason = "idle"
                    yield OpenCodePromptResponse(
                        opencode_session_id=primary_session_id,
                        stop_reason=stop_reason,
                    )
                status = "completed"
        except OpenCodeSessionNotFoundError:
            # Propagate so the caller can recreate the session and retry.
            status = "session_not_found"
            raise
        except Exception as exc:
            status = "failed"
            error_preview = str(exc)
            yield OpenCodeError(
                opencode_session_id=self._parser.session_id,
                message=error_preview,
            )
        finally:
            packet_logger.log_raw(
                "OPENCODE-HTTP-END",
                {
                    "server_url": self._server_url,
                    "session_id": self._parser.session_id,
                    "cwd": self._cwd,
                    "status": status,
                    "saw_prompt_response": self._parser.saw_prompt_response,
                    "error_preview": (error_preview[:500] if error_preview else None),
                },
            )

View File

@@ -0,0 +1,257 @@
"""Shared OpenCode raw-event parser used by local and Kubernetes clients."""
import json
from collections.abc import Generator
from typing import Any
from onyx.server.features.build.sandbox.opencode.events import (
OpenCodeAgentMessageChunk,
)
from onyx.server.features.build.sandbox.opencode.events import (
OpenCodeAgentThoughtChunk,
)
from onyx.server.features.build.sandbox.opencode.events import OpenCodeError
from onyx.server.features.build.sandbox.opencode.events import OpenCodeEvent
from onyx.server.features.build.sandbox.opencode.events import OpenCodePromptResponse
from onyx.server.features.build.sandbox.opencode.events import (
OpenCodeSessionEstablished,
)
from onyx.server.features.build.sandbox.opencode.events import OpenCodeTextContent
from onyx.server.features.build.sandbox.opencode.events import OpenCodeToolCallProgress
from onyx.server.features.build.sandbox.opencode.events import OpenCodeToolCallStart
from onyx.server.features.build.sandbox.opencode.events import timestamp_ms_to_iso
def _normalize_status(status: Any) -> str:
if not isinstance(status, str):
return "pending"
normalized = status.strip().lower()
alias_map = {
"running": "in_progress",
"in-progress": "in_progress",
"done": "completed",
"success": "completed",
"error": "failed",
}
normalized = alias_map.get(normalized, normalized)
if normalized in {"pending", "in_progress", "completed", "failed", "cancelled"}:
return normalized
return "pending"
def _map_tool_kind(tool_name: str, raw_input: dict[str, Any] | None) -> str:
"""Map OpenCode tool name into the UI's existing tool kind bucket."""
lower_name = tool_name.lower()
if lower_name in {"glob", "grep", "websearch"}:
return "search"
if lower_name in {"read"}:
return "read"
if lower_name in {"bash"}:
return "execute"
if lower_name in {"task"}:
return "task"
if lower_name in {"apply_patch", "edit", "write"}:
return "edit"
if raw_input:
if isinstance(raw_input.get("command"), str):
return "execute"
if isinstance(raw_input.get("patchText"), str):
return "edit"
if raw_input.get("subagent_type") or raw_input.get("subagentType"):
return "task"
return "other"
def _as_dict(value: Any) -> dict[str, Any] | None:
return value if isinstance(value, dict) else None
def _build_tool_content(
tool_name: str,
raw_input: dict[str, Any] | None,
raw_output: dict[str, Any] | None,
) -> list[dict[str, Any]] | None:
"""Build tool content blocks used by frontend diff/read parsing logic."""
if tool_name.lower() == "apply_patch":
metadata = _as_dict((raw_output or {}).get("metadata"))
files = metadata.get("files") if metadata else None
if isinstance(files, list):
blocks: list[dict[str, Any]] = []
for file_entry in files:
if not isinstance(file_entry, dict):
continue
path = file_entry.get("relativePath") or file_entry.get("filePath")
blocks.append(
{
"type": "diff",
"path": path,
"oldText": file_entry.get("before") or "",
"newText": file_entry.get("after") or "",
}
)
if blocks:
return blocks
if raw_input and isinstance(raw_input.get("path"), str):
return [{"type": "diff", "path": raw_input["path"]}]
if tool_name.lower() == "read" and raw_output:
output_text = raw_output.get("output")
if isinstance(output_text, str):
return [
{
"type": "content",
"content": {"type": "text", "text": output_text},
}
]
return None
def looks_like_session_not_found(error_text: str) -> bool:
    """Heuristically detect a "session not found" failure in server/CLI output."""
    haystack = error_text.lower()
    for marker in (
        "session not found",
        "unknown session",
        "invalid session",
        "session does not exist",
    ):
        if marker in haystack:
            return True
    return False
class OpenCodeEventParser:
    """Stateful parser that converts raw OpenCode JSON lines into UI packets.

    One instance tracks one OpenCode session: the session id, whether a
    terminal prompt response has been seen, and which tool call ids have
    already had a "start" packet emitted.
    """

    def __init__(self, session_id: str | None = None) -> None:
        # Session id may be known up front (reattach) or learned from events.
        self.session_id = session_id
        # Last session id for which a SessionEstablished packet was emitted.
        self._emitted_session_id: str | None = None
        # True once a step_finish with a non-"tool-calls" reason was parsed.
        self.saw_prompt_response = False
        # Tool call ids that already produced an OpenCodeToolCallStart.
        self._seen_tool_call_ids: set[str] = set()

    def parse_raw_event(
        self, raw_event: dict[str, Any]
    ) -> Generator[OpenCodeEvent, None, None]:
        """Translate one raw OpenCode event dict into zero or more UI packets.

        Emits OpenCodeSessionEstablished the first time each session id is
        observed, then maps "text" / "reasoning" / "tool_use" /
        "step_finish" / "error" events onto their packet types. Unknown
        event types yield nothing.
        """
        timestamp = timestamp_ms_to_iso(raw_event.get("timestamp"))
        # Accept either key casing for the session id.
        session_id = raw_event.get("sessionID") or raw_event.get("sessionId")
        if isinstance(session_id, str):
            if self.session_id != session_id:
                self.session_id = session_id
            if self._emitted_session_id != session_id:
                # Emit exactly one "established" packet per session id.
                self._emitted_session_id = session_id
                yield OpenCodeSessionEstablished(
                    opencode_session_id=session_id,
                    timestamp=timestamp,
                )
        raw_type = raw_event.get("type")
        part = _as_dict(raw_event.get("part")) or {}
        if raw_type == "text":
            text = part.get("text")
            if isinstance(text, str) and text:
                yield OpenCodeAgentMessageChunk(
                    opencode_session_id=self.session_id,
                    content=OpenCodeTextContent(text=text),
                    timestamp=timestamp,
                )
            return
        if raw_type == "reasoning":
            text = part.get("text")
            if isinstance(text, str) and text:
                yield OpenCodeAgentThoughtChunk(
                    opencode_session_id=self.session_id,
                    content=OpenCodeTextContent(text=text),
                    timestamp=timestamp,
                )
            return
        if raw_type == "tool_use":
            call_id = part.get("callID")
            tool_name = part.get("tool")
            state = _as_dict(part.get("state")) or {}
            raw_input = _as_dict(state.get("input"))
            metadata = _as_dict(state.get("metadata")) or {}
            output = state.get("output")
            title = state.get("title")
            status = _normalize_status(state.get("status"))
            if not isinstance(call_id, str) or not isinstance(tool_name, str):
                # Malformed tool event; nothing useful to emit.
                return
            kind = _map_tool_kind(tool_name, raw_input)
            raw_output: dict[str, Any] = {
                # Non-string outputs are serialized so the packet stays JSON-safe.
                "output": (
                    output
                    if isinstance(output, str)
                    else json.dumps(output, default=str)
                ),
                "metadata": metadata,
            }
            if tool_name.lower() == "read" and isinstance(output, str):
                # The frontend reads file contents from the "content" key.
                raw_output["content"] = output
            if tool_name.lower() == "task":
                # Surface the subagent's session id (any key casing) so the
                # caller can route subagent packets under this tool call.
                task_session_id = metadata.get("sessionId") or metadata.get("sessionID")
                if not isinstance(task_session_id, str):
                    task_session_id = metadata.get("session_id")
                if isinstance(task_session_id, str):
                    raw_output["sessionId"] = task_session_id
            content = _build_tool_content(tool_name, raw_input, raw_output)
            if call_id not in self._seen_tool_call_ids:
                # First sighting of this call: synthesize a "start" packet
                # before the progress packet.
                self._seen_tool_call_ids.add(call_id)
                yield OpenCodeToolCallStart(
                    opencode_session_id=self.session_id,
                    tool_call_id=call_id,
                    tool_name=tool_name,
                    kind=kind,
                    title=title if isinstance(title, str) else None,
                    content=content,
                    raw_input=raw_input,
                    raw_output=None,
                    status="pending",
                    timestamp=timestamp,
                )
            yield OpenCodeToolCallProgress(
                opencode_session_id=self.session_id,
                tool_call_id=call_id,
                tool_name=tool_name,
                kind=kind,
                title=title if isinstance(title, str) else None,
                content=content,
                raw_input=raw_input,
                raw_output=raw_output,
                status=status,
                timestamp=timestamp,
            )
            return
        if raw_type == "step_finish":
            reason = part.get("reason")
            # "tool-calls" steps are intermediate; any other reason ends the turn.
            if isinstance(reason, str) and reason.lower() != "tool-calls":
                self.saw_prompt_response = True
                yield OpenCodePromptResponse(
                    opencode_session_id=self.session_id,
                    stop_reason=reason,
                    timestamp=timestamp,
                )
            return
        if raw_type == "error":
            message = part.get("message")
            if isinstance(message, str):
                yield OpenCodeError(
                    opencode_session_id=self.session_id,
                    message=message,
                    code=part.get("code"),
                    data=_as_dict(part.get("data")),
                    timestamp=timestamp,
                )

View File

@@ -0,0 +1,214 @@
"""OpenCode run client for one-shot message execution against a running server."""
import json
import select
import subprocess
import time
from collections.abc import Generator
from onyx.server.features.build.api.packet_logger import get_packet_logger
from onyx.server.features.build.configs import OPENCODE_MESSAGE_TIMEOUT
from onyx.server.features.build.configs import SSE_KEEPALIVE_INTERVAL
from onyx.server.features.build.sandbox.opencode.events import OpenCodeError
from onyx.server.features.build.sandbox.opencode.events import OpenCodeEvent
from onyx.server.features.build.sandbox.opencode.events import OpenCodePromptResponse
from onyx.server.features.build.sandbox.opencode.events import OpenCodeSSEKeepalive
from onyx.server.features.build.sandbox.opencode.parser import (
looks_like_session_not_found,
)
from onyx.server.features.build.sandbox.opencode.parser import OpenCodeEventParser
class OpenCodeSessionNotFoundError(RuntimeError):
    """Raised when a requested OpenCode session ID is not available on the server.

    Raised both from HTTP 404 responses on session endpoints and from CLI
    stderr that matches ``looks_like_session_not_found``.
    """
class OpenCodeRunClient:
    """Executes `opencode run --attach` and streams normalized packet events."""

    def __init__(
        self,
        server_url: str,
        session_id: str | None = None,
        cwd: str | None = None,
        timeout: float = OPENCODE_MESSAGE_TIMEOUT,
        keepalive_interval: float = SSE_KEEPALIVE_INTERVAL,
    ) -> None:
        # Server the CLI attaches to (it does not spawn its own server).
        self._server_url = server_url
        # Working directory for the subprocess; None inherits the current one.
        self._cwd = cwd
        # Overall per-message deadline in seconds.
        self._timeout = timeout
        # How long the stream may stay silent before a keepalive is emitted.
        self._keepalive_interval = keepalive_interval
        # The parser owns the session id so later messages reuse the session.
        self._parser = OpenCodeEventParser(session_id=session_id)

    @property
    def session_id(self) -> str | None:
        # May be None until the CLI reports the session id in its events.
        return self._parser.session_id

    def _build_command(self, message: str) -> list[str]:
        """Build the `opencode run` argv for one message (JSON event output)."""
        command = [
            "opencode",
            "run",
            "--attach",
            self._server_url,
            "--format",
            "json",
        ]
        if self._parser.session_id:
            # Reattach to the existing session instead of creating a new one.
            command.extend(["--session", self._parser.session_id])
        command.append(message)
        return command

    @staticmethod
    def _terminate_process(process: subprocess.Popen[str]) -> None:
        """Terminate the subprocess, escalating to kill after a 3s grace period."""
        if process.poll() is not None:
            return
        process.terminate()
        try:
            process.wait(timeout=3)
        except subprocess.TimeoutExpired:
            process.kill()

    def send_message(self, message: str) -> Generator[OpenCodeEvent, None, None]:
        """Send a message and stream normalized events.

        Spawns the CLI, multiplexes its stdout (JSON event lines) and stderr
        (diagnostics) with ``select``, emits keepalives while idle, enforces
        the overall timeout, and feeds each stdout JSON line through the
        shared parser. After the process exits, a nonzero return code is
        surfaced as an error packet — or raised as
        OpenCodeSessionNotFoundError when stderr indicates a stale session.
        """
        command = self._build_command(message)
        packet_logger = get_packet_logger()
        packet_logger.log_raw(
            "OPENCODE-RUN-START",
            {
                "server_url": self._server_url,
                "session_id": self._parser.session_id,
                "cwd": self._cwd,
                "command": command,
            },
        )
        process = subprocess.Popen(
            command,
            cwd=self._cwd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,  # Line-buffered so events arrive promptly.
        )
        if process.stdout is None or process.stderr is None:
            self._terminate_process(process)
            yield OpenCodeError(
                opencode_session_id=self._parser.session_id,
                message="Failed to open opencode subprocess pipes",
            )
            return
        start_time = time.monotonic()
        last_event_time = start_time
        stderr_lines: list[str] = []
        stdout_fd = process.stdout.fileno()
        stderr_fd = process.stderr.fileno()
        # fds we still expect data from; removed once a stream hits EOF.
        active_fds = {stdout_fd, stderr_fd}
        fd_to_stream = {
            stdout_fd: process.stdout,
            stderr_fd: process.stderr,
        }
        try:
            while active_fds:
                elapsed = time.monotonic() - start_time
                if elapsed > self._timeout:
                    self._terminate_process(process)
                    yield OpenCodeError(
                        opencode_session_id=self._parser.session_id,
                        code=-1,
                        message=(
                            f"Timeout waiting for OpenCode response after {self._timeout:.1f}s"
                        ),
                    )
                    return
                # 1s poll so the timeout and keepalive checks stay responsive.
                ready, _, _ = select.select(list(active_fds), [], [], 1.0)
                if not ready:
                    if (time.monotonic() - last_event_time) >= self._keepalive_interval:
                        yield OpenCodeSSEKeepalive()
                        last_event_time = time.monotonic()
                    # Process may have exited while stdout/stderr fds are still open.
                    if process.poll() is not None:
                        for fd in list(active_fds):
                            stream = fd_to_stream[fd]
                            remaining = stream.readline()
                            if not remaining:
                                active_fds.discard(fd)
                            elif fd == stderr_fd:
                                stderr_lines.append(remaining.strip())
                    if process.poll() is not None and not active_fds:
                        break
                    continue
                for fd in ready:
                    stream = fd_to_stream[fd]
                    line = stream.readline()
                    if line == "":
                        # EOF on this stream.
                        active_fds.discard(fd)
                        continue
                    line = line.strip()
                    if not line:
                        continue
                    if fd == stderr_fd:
                        # Collect diagnostics for the post-exit error report.
                        stderr_lines.append(line)
                        continue
                    try:
                        raw_event = json.loads(line)
                    except json.JSONDecodeError:
                        packet_logger.log_raw(
                            "OPENCODE-RUN-PARSE-ERROR",
                            {"line": line[:500]},
                        )
                        continue
                    for event in self._parser.parse_raw_event(raw_event):
                        last_event_time = time.monotonic()
                        yield event
        finally:
            # Don't leave the CLI running if the consumer abandons the generator.
            if process.poll() is None:
                self._terminate_process(process)
        return_code = process.returncode
        if return_code is None:
            try:
                return_code = process.wait(timeout=3)
            except subprocess.TimeoutExpired:
                self._terminate_process(process)
                return_code = process.returncode
        stderr_text = "\n".join(stderr_lines).strip()
        packet_logger.log_raw(
            "OPENCODE-RUN-END",
            {
                "server_url": self._server_url,
                "session_id": self._parser.session_id,
                "cwd": self._cwd,
                "return_code": return_code,
                "stderr_preview": stderr_text[:500] if stderr_text else None,
            },
        )
        if return_code not in (0, None):
            if stderr_text and looks_like_session_not_found(stderr_text):
                # Caller drops the stale session id, recreates, and retries.
                raise OpenCodeSessionNotFoundError(stderr_text)
            yield OpenCodeError(
                opencode_session_id=self._parser.session_id,
                code=return_code,
                message=stderr_text or f"OpenCode run exited with code {return_code}",
            )
            return
        if not self._parser.saw_prompt_response:
            # CLI exited cleanly without a step-finish; synthesize completion.
            yield OpenCodePromptResponse(
                opencode_session_id=self._parser.session_id,
                stop_reason="completed",
            )

View File

@@ -15,14 +15,6 @@ from pathlib import Path
from typing import Any
from uuid import UUID
from acp.schema import AgentMessageChunk
from acp.schema import AgentPlanUpdate
from acp.schema import AgentThoughtChunk
from acp.schema import CurrentModeUpdate
from acp.schema import Error as ACPError
from acp.schema import PromptResponse
from acp.schema import ToolCallProgress
from acp.schema import ToolCallStart
from sqlalchemy.orm import Session as DBSession
from onyx.configs.app_configs import WEB_DOMAIN
@@ -62,8 +54,8 @@ from onyx.server.features.build.db.build_session import get_build_session
from onyx.server.features.build.db.build_session import get_empty_session_for_user
from onyx.server.features.build.db.build_session import get_session_messages
from onyx.server.features.build.db.build_session import get_user_build_sessions
from onyx.server.features.build.db.build_session import update_opencode_session_id
from onyx.server.features.build.db.build_session import update_session_activity
from onyx.server.features.build.db.build_session import upsert_agent_plan
from onyx.server.features.build.db.sandbox import create_sandbox__no_commit
from onyx.server.features.build.db.sandbox import get_running_sandbox_count_by_tenant
from onyx.server.features.build.db.sandbox import get_sandbox_by_session_id
@@ -71,10 +63,15 @@ from onyx.server.features.build.db.sandbox import get_sandbox_by_user_id
from onyx.server.features.build.db.sandbox import update_sandbox_heartbeat
from onyx.server.features.build.db.sandbox import update_sandbox_status__no_commit
from onyx.server.features.build.sandbox import get_sandbox_manager
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
SSEKeepalive,
)
from onyx.server.features.build.sandbox.models import LLMProviderConfig
from onyx.server.features.build.sandbox.opencode import OpenCodeAgentMessageChunk
from onyx.server.features.build.sandbox.opencode import OpenCodeAgentThoughtChunk
from onyx.server.features.build.sandbox.opencode import OpenCodeError
from onyx.server.features.build.sandbox.opencode import OpenCodePromptResponse
from onyx.server.features.build.sandbox.opencode import OpenCodeSessionEstablished
from onyx.server.features.build.sandbox.opencode import OpenCodeSSEKeepalive
from onyx.server.features.build.sandbox.opencode import OpenCodeToolCallProgress
from onyx.server.features.build.sandbox.opencode import OpenCodeToolCallStart
from onyx.server.features.build.sandbox.tasks.tasks import (
_get_disabled_user_library_paths,
)
@@ -99,9 +96,9 @@ class UploadLimitExceededError(ValueError):
class BuildStreamingState:
"""Container for accumulating state during ACP streaming.
"""Container for accumulating state during OpenCode streaming.
Similar to ChatStateContainer but adapted for ACP packet types.
Similar to ChatStateContainer but adapted for build-mode packet types.
Accumulates chunks and tracks pending tool calls until completion.
Usage:
@@ -130,9 +127,6 @@ class BuildStreamingState:
self.message_chunks: list[str] = []
self.thought_chunks: list[str] = []
# For upserting agent_plan_update - track ID so we can update in place
self.plan_message_id: UUID | None = None
# Track what type of chunk we were last receiving
self._last_chunk_type: str | None = None
@@ -1103,18 +1097,17 @@ class SessionManager:
- agent_thought_chunk: Accumulated, saved as one synthetic packet at end/type change
- tool_call_start: Streamed to frontend only, not saved
- tool_call_progress: Only saved when status="completed"
- agent_plan_update: Upserted (only latest plan kept per turn)
"""
def _serialize_acp_event(event: Any, event_type: str) -> str:
"""Serialize an ACP event to SSE format, preserving ALL ACP data."""
def _serialize_event(event: Any, event_type: str) -> str:
"""Serialize an event to SSE format, preserving all event data."""
if hasattr(event, "model_dump"):
data = event.model_dump(mode="json", by_alias=True, exclude_none=False)
else:
data = {"raw": str(event)}
data["type"] = event_type
data["timestamp"] = datetime.now(tz=timezone.utc).isoformat()
data.setdefault("timestamp", datetime.now(tz=timezone.utc).isoformat())
return f"event: message\ndata: {json.dumps(data)}\n\n"
@@ -1123,7 +1116,7 @@ class SessionManager:
return f"event: message\ndata: {packet.model_dump_json(by_alias=True)}\n\n"
def _extract_text_from_content(content: Any) -> str:
"""Extract text from ACP content structure."""
"""Extract text from event content structures."""
if content is None:
return ""
if hasattr(content, "type") and content.type == "text":
@@ -1263,81 +1256,321 @@ class SessionManager:
},
)
# Stream ACP events directly to frontend
for acp_event in self._sandbox_manager.send_message(
sandbox_id, session_id, user_message_content
if session.nextjs_port is None:
error_packet = ErrorPacket(
message="Session is missing a Next.js port allocation."
)
packet_logger.log("error", error_packet.model_dump())
yield _format_packet_event(error_packet)
return
primary_opencode_session_id: str | None = session.opencode_session_id
saw_primary_opencode_session = False
task_call_to_subagent_session: dict[str, str] = {}
subagent_session_to_task_call: dict[str, str] = {}
buffered_subagent_packets_by_session: dict[str, list[dict[str, Any]]] = {}
def _extract_task_subagent_session_id(
event_data: dict[str, Any],
) -> str | None:
raw_output_obj = (
event_data.get("raw_output") or event_data.get("rawOutput") or {}
)
raw_output = raw_output_obj if isinstance(raw_output_obj, dict) else {}
metadata_obj = raw_output.get("metadata")
metadata = metadata_obj if isinstance(metadata_obj, dict) else {}
direct = (
raw_output.get("sessionId")
or raw_output.get("sessionID")
or raw_output.get("session_id")
)
if isinstance(direct, str) and direct:
return direct
nested = (
metadata.get("sessionId")
or metadata.get("sessionID")
or metadata.get("session_id")
)
if isinstance(nested, str) and nested:
return nested
return None
def _serialize_subagent_packet(
parent_tool_call_id: str,
subagent_session_id: str,
packet: dict[str, Any],
) -> str:
payload = {
"type": "subagent_packet",
"parent_tool_call_id": parent_tool_call_id,
"subagent_session_id": subagent_session_id,
"packet": packet,
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
}
return f"event: message\ndata: {json.dumps(payload)}\n\n"
def _emit_subagent_packets(
parent_tool_call_id: str,
subagent_session_id: str,
packets: list[dict[str, Any]],
) -> Generator[str, None, None]:
nonlocal events_emitted
for packet in packets:
subagent_packet = {
"type": "subagent_packet",
"parent_tool_call_id": parent_tool_call_id,
"subagent_session_id": subagent_session_id,
"packet": packet,
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
}
create_message(
session_id=session_id,
message_type=MessageType.ASSISTANT,
turn_index=state.turn_index,
message_metadata=subagent_packet,
db_session=self._db_session,
)
packet_logger.log("subagent_packet", subagent_packet)
packet_logger.log_sse_emit("subagent_packet", session_id)
events_emitted += 1
yield _serialize_subagent_packet(
parent_tool_call_id=parent_tool_call_id,
subagent_session_id=subagent_session_id,
packet=packet,
)
def _event_to_packet_dict(event: Any) -> dict[str, Any] | None:
if isinstance(
event, (OpenCodeSSEKeepalive, OpenCodeSessionEstablished)
):
return None
event_type = self._get_event_type(event)
if not event_type or event_type == "unknown":
return None
if hasattr(event, "model_dump"):
data = event.model_dump(
mode="json", by_alias=True, exclude_none=False
)
else:
data = {"raw": str(event)}
data["type"] = event_type
data.setdefault("timestamp", datetime.now(tz=timezone.utc).isoformat())
return data
def _flush_buffered_subagent_packets(
subagent_session_id: str,
) -> Generator[str, None, None]:
parent_tool_call_id = subagent_session_to_task_call.get(
subagent_session_id
)
if not parent_tool_call_id:
return
buffered_packets = buffered_subagent_packets_by_session.pop(
subagent_session_id, []
)
if buffered_packets:
yield from _emit_subagent_packets(
parent_tool_call_id=parent_tool_call_id,
subagent_session_id=subagent_session_id,
packets=buffered_packets,
)
def _persist_primary_opencode_session(
opencode_session_id: str,
) -> None:
nonlocal primary_opencode_session_id
if primary_opencode_session_id == opencode_session_id:
return
primary_opencode_session_id = opencode_session_id
if session.opencode_session_id != opencode_session_id:
update_opencode_session_id(
session_id=session_id,
opencode_session_id=opencode_session_id,
db_session=self._db_session,
)
session.opencode_session_id = opencode_session_id
# Stream OpenCode events directly to frontend.
for opencode_event in self._sandbox_manager.send_message(
sandbox_id=sandbox_id,
session_id=session_id,
nextjs_port=session.nextjs_port,
message=user_message_content,
opencode_session_id=session.opencode_session_id,
):
# Handle SSE keepalive - send comment to keep connection alive
if isinstance(acp_event, SSEKeepalive):
# SSE comments start with : and are ignored by EventSource
# but keep the HTTP connection alive
# Handle SSE keepalive - send comment to keep connection alive.
if isinstance(opencode_event, OpenCodeSSEKeepalive):
packet_logger.log_sse_emit("keepalive", session_id)
yield ": keepalive\n\n"
continue
# Check if we need to finalize pending chunks before processing
event_type = self._get_event_type(acp_event)
# Internal event used for OpenCode session-id persistence.
if isinstance(opencode_event, OpenCodeSessionEstablished):
if not saw_primary_opencode_session:
saw_primary_opencode_session = True
_persist_primary_opencode_session(
opencode_event.opencode_session_id
)
packet_logger.log(
"opencode_session_established",
opencode_event.model_dump(
mode="json", by_alias=True, exclude_none=False
),
)
continue
event_session_id = getattr(opencode_event, "opencode_session_id", None)
if (
not saw_primary_opencode_session
and isinstance(event_session_id, str)
and event_session_id
):
saw_primary_opencode_session = True
_persist_primary_opencode_session(event_session_id)
is_primary_event = (
not isinstance(event_session_id, str)
or not event_session_id
or event_session_id == primary_opencode_session_id
)
if not is_primary_event:
packet = _event_to_packet_dict(opencode_event)
if packet is None:
continue
parent_tool_call_id = subagent_session_to_task_call.get(
event_session_id
)
if parent_tool_call_id:
yield from _emit_subagent_packets(
parent_tool_call_id=parent_tool_call_id,
subagent_session_id=event_session_id,
packets=[packet],
)
else:
buffered_subagent_packets_by_session.setdefault(
event_session_id, []
).append(packet)
continue
# Check if we need to finalize pending chunks before processing.
event_type = self._get_event_type(opencode_event)
if state.should_finalize_chunks(event_type):
_save_pending_chunks(state)
events_emitted += 1
# Pass through ACP events with snake_case type names
if isinstance(acp_event, AgentMessageChunk):
text = _extract_text_from_content(acp_event.content)
if isinstance(opencode_event, OpenCodeAgentMessageChunk):
text = _extract_text_from_content(opencode_event.content)
if text:
state.add_message_chunk(text)
event_data = acp_event.model_dump(
event_data = opencode_event.model_dump(
mode="json", by_alias=True, exclude_none=False
)
event_data["type"] = "agent_message_chunk"
packet_logger.log("agent_message_chunk", event_data)
packet_logger.log_sse_emit("agent_message_chunk", session_id)
yield _serialize_acp_event(acp_event, "agent_message_chunk")
yield _serialize_event(opencode_event, "agent_message_chunk")
elif isinstance(acp_event, AgentThoughtChunk):
text = _extract_text_from_content(acp_event.content)
elif isinstance(opencode_event, OpenCodeAgentThoughtChunk):
text = _extract_text_from_content(opencode_event.content)
if text:
state.add_thought_chunk(text)
packet_logger.log(
"agent_thought_chunk",
acp_event.model_dump(mode="json", by_alias=True),
opencode_event.model_dump(mode="json", by_alias=True),
)
packet_logger.log_sse_emit("agent_thought_chunk", session_id)
yield _serialize_acp_event(acp_event, "agent_thought_chunk")
yield _serialize_event(opencode_event, "agent_thought_chunk")
elif isinstance(acp_event, ToolCallStart):
# Stream to frontend but don't save - wait for completion
elif isinstance(opencode_event, OpenCodeToolCallStart):
# Stream to frontend but don't save - wait for completion.
packet_logger.log(
"tool_call_start",
acp_event.model_dump(mode="json", by_alias=True),
opencode_event.model_dump(mode="json", by_alias=True),
)
packet_logger.log_sse_emit("tool_call_start", session_id)
yield _serialize_acp_event(acp_event, "tool_call_start")
yield _serialize_event(opencode_event, "tool_call_start")
elif isinstance(acp_event, ToolCallProgress):
event_data = acp_event.model_dump(
elif isinstance(opencode_event, OpenCodeToolCallProgress):
event_data = opencode_event.model_dump(
mode="json", by_alias=True, exclude_none=False
)
event_data["type"] = "tool_call_progress"
event_data["timestamp"] = datetime.now(tz=timezone.utc).isoformat()
event_data.setdefault(
"timestamp", datetime.now(tz=timezone.utc).isoformat()
)
# Check if this is a TodoWrite tool call
tool_name = (event_data.get("title") or "").lower()
is_todo_write = tool_name in ("todowrite", "todo_write")
# Check if this is a TodoWrite tool call.
tool_name = (
event_data.get("tool_name")
or event_data.get("toolName")
or event_data.get("title")
or ""
)
tool_name_lower = str(tool_name).lower()
is_todo_write = tool_name_lower in ("todowrite", "todo_write")
# Check if this is a Task (subagent) tool call
raw_input = event_data.get("rawInput") or {}
# Check if this is a Task (subagent) tool call.
raw_input_obj = (
event_data.get("raw_input") or event_data.get("rawInput") or {}
)
raw_input = raw_input_obj if isinstance(raw_input_obj, dict) else {}
is_task_tool = (
tool_name == "task"
tool_name_lower == "task"
or raw_input.get("subagent_type") is not None
or raw_input.get("subagentType") is not None
)
tool_call_id = (
event_data.get("tool_call_id")
or event_data.get("toolCallId")
or opencode_event.tool_call_id
)
tool_call_id_str = (
tool_call_id if isinstance(tool_call_id, str) else None
)
if is_task_tool and tool_call_id_str:
subagent_session_id = _extract_task_subagent_session_id(
event_data
)
if subagent_session_id:
existing_mapped = task_call_to_subagent_session.get(
tool_call_id_str
)
if (
existing_mapped is not None
and existing_mapped != subagent_session_id
):
subagent_session_to_task_call.pop(existing_mapped, None)
task_call_to_subagent_session[tool_call_id_str] = (
subagent_session_id
)
subagent_session_to_task_call[subagent_session_id] = (
tool_call_id_str
)
yield from _flush_buffered_subagent_packets(
subagent_session_id
)
# Save to DB:
# - For TodoWrite: Save every progress update (todos change frequently)
# - For other tools: Only save when status="completed"
if is_todo_write or acp_event.status == "completed":
if is_todo_write or opencode_event.status == "completed":
create_message(
session_id=session_id,
message_type=MessageType.ASSISTANT,
@@ -1346,100 +1579,56 @@ class SessionManager:
db_session=self._db_session,
)
# For completed Task tools, also save the output as an agent_message
# This allows the task output to be rendered as assistant text on reload
if is_task_tool and acp_event.status == "completed":
raw_output = event_data.get("rawOutput") or {}
task_output = raw_output.get("output")
if task_output and isinstance(task_output, str):
# Strip task_metadata from the output
metadata_idx = task_output.find("<task_metadata>")
if metadata_idx >= 0:
task_output = task_output[:metadata_idx].strip()
if task_output:
# Create agent_message packet for the task output
task_output_packet = {
"type": "agent_message",
"content": {"type": "text", "text": task_output},
"source": "task_output",
"timestamp": datetime.now(
tz=timezone.utc
).isoformat(),
}
create_message(
session_id=session_id,
message_type=MessageType.ASSISTANT,
turn_index=state.turn_index,
message_metadata=task_output_packet,
db_session=self._db_session,
)
# Log full event to packet logger (can handle large payloads)
packet_logger.log("tool_call_progress", event_data)
packet_logger.log_sse_emit("tool_call_progress", session_id)
yield _serialize_acp_event(acp_event, "tool_call_progress")
yield _serialize_event(opencode_event, "tool_call_progress")
elif isinstance(acp_event, AgentPlanUpdate):
event_data = acp_event.model_dump(
mode="json", by_alias=True, exclude_none=False
)
event_data["type"] = "agent_plan_update"
event_data["timestamp"] = datetime.now(tz=timezone.utc).isoformat()
# Upsert plan immediately
plan_msg = upsert_agent_plan(
session_id=session_id,
turn_index=state.turn_index,
plan_metadata=event_data,
db_session=self._db_session,
existing_plan_id=state.plan_message_id,
)
state.plan_message_id = plan_msg.id
packet_logger.log("agent_plan_update", event_data)
packet_logger.log_sse_emit("agent_plan_update", session_id)
yield _serialize_acp_event(acp_event, "agent_plan_update")
elif isinstance(acp_event, CurrentModeUpdate):
event_data = acp_event.model_dump(
mode="json", by_alias=True, exclude_none=False
)
event_data["type"] = "current_mode_update"
packet_logger.log("current_mode_update", event_data)
packet_logger.log_sse_emit("current_mode_update", session_id)
yield _serialize_acp_event(acp_event, "current_mode_update")
elif isinstance(acp_event, PromptResponse):
event_data = acp_event.model_dump(
elif isinstance(opencode_event, OpenCodePromptResponse):
event_data = opencode_event.model_dump(
mode="json", by_alias=True, exclude_none=False
)
event_data["type"] = "prompt_response"
packet_logger.log("prompt_response", event_data)
packet_logger.log_sse_emit("prompt_response", session_id)
yield _serialize_acp_event(acp_event, "prompt_response")
yield _serialize_event(opencode_event, "prompt_response")
elif isinstance(acp_event, ACPError):
event_data = acp_event.model_dump(
elif isinstance(opencode_event, OpenCodeError):
event_data = opencode_event.model_dump(
mode="json", by_alias=True, exclude_none=False
)
event_data["type"] = "error"
packet_logger.log("error", event_data)
packet_logger.log_sse_emit("error", session_id)
yield _serialize_acp_event(acp_event, "error")
yield _serialize_event(opencode_event, "error")
else:
# Unrecognized packet type - log it but don't stream to frontend
event_type_name = type(acp_event).__name__
event_data = acp_event.model_dump(
mode="json", by_alias=True, exclude_none=False
)
event_type_name = type(opencode_event).__name__
if hasattr(opencode_event, "model_dump"):
event_data = opencode_event.model_dump(
mode="json", by_alias=True, exclude_none=False
)
else:
event_data = {"raw": str(opencode_event)}
event_data["type"] = f"unrecognized_{event_type_name.lower()}"
packet_logger.log(
f"unrecognized_{event_type_name.lower()}", event_data
)
# Save all accumulated state at end of streaming
if buffered_subagent_packets_by_session:
packet_logger.log_raw(
"DROPPED-UNMAPPED-SUBAGENT-PACKETS",
{
"session_id": str(session_id),
"unmapped_subagent_session_ids": list(
buffered_subagent_packets_by_session.keys()
),
"packet_counts": {
sid: len(packets)
for sid, packets in buffered_subagent_packets_by_session.items()
},
},
)
_save_build_turn(state)
# Log streaming completion
@@ -1498,24 +1687,22 @@ class SessionManager:
logger.exception("Unexpected error in build message streaming")
yield _format_packet_event(error_packet)
def _get_event_type(self, acp_event: Any) -> str:
"""Get the event type string for an ACP event."""
if isinstance(acp_event, AgentMessageChunk):
def _get_event_type(self, event: Any) -> str:
"""Get the event type string for an OpenCode event."""
if isinstance(event, OpenCodeAgentMessageChunk):
return "agent_message_chunk"
elif isinstance(acp_event, AgentThoughtChunk):
elif isinstance(event, OpenCodeAgentThoughtChunk):
return "agent_thought_chunk"
elif isinstance(acp_event, ToolCallStart):
elif isinstance(event, OpenCodeToolCallStart):
return "tool_call_start"
elif isinstance(acp_event, ToolCallProgress):
elif isinstance(event, OpenCodeToolCallProgress):
return "tool_call_progress"
elif isinstance(acp_event, AgentPlanUpdate):
return "agent_plan_update"
elif isinstance(acp_event, CurrentModeUpdate):
return "current_mode_update"
elif isinstance(acp_event, PromptResponse):
elif isinstance(event, OpenCodePromptResponse):
return "prompt_response"
elif isinstance(acp_event, ACPError):
elif isinstance(event, OpenCodeError):
return "error"
elif isinstance(event, OpenCodeSessionEstablished):
return "opencode_session_established"
return "unknown"
# =========================================================================

View File

@@ -27,11 +27,14 @@ from onyx.server.features.build.configs import SANDBOX_BACKEND
from onyx.server.features.build.configs import SANDBOX_NAMESPACE
from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_START
from onyx.server.features.build.configs import SandboxBackend
from onyx.server.features.build.sandbox.base import ACPEvent
from onyx.server.features.build.sandbox.kubernetes.kubernetes_sandbox_manager import (
KubernetesSandboxManager,
)
from onyx.server.features.build.sandbox.models import LLMProviderConfig
from onyx.server.features.build.sandbox.opencode import OpenCodeAgentMessageChunk
from onyx.server.features.build.sandbox.opencode import OpenCodeError
from onyx.server.features.build.sandbox.opencode import OpenCodeEvent
from onyx.server.features.build.sandbox.opencode import OpenCodePromptResponse
from onyx.utils.logger import setup_logger
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
@@ -309,12 +312,9 @@ def test_kubernetes_sandbox_send_message() -> None:
This test:
1. Creates a sandbox pod
2. Sends a simple message via send_message()
3. Verifies we receive ACP events back (agent responses)
3. Verifies we receive streamed OpenCode events back (agent responses)
4. Cleans up by terminating the sandbox
"""
from acp.schema import AgentMessageChunk
from acp.schema import Error
from acp.schema import PromptResponse
_is_kubernetes_available()
@@ -365,8 +365,13 @@ def test_kubernetes_sandbox_send_message() -> None:
)
# Send a simple message
events: list[ACPEvent] = []
for event in manager.send_message(sandbox_id, session_id, "What is 2 + 2?"):
events: list[OpenCodeEvent] = []
for event in manager.send_message(
sandbox_id,
session_id,
SANDBOX_NEXTJS_PORT_START,
"What is 2 + 2?",
):
events.append(event)
# Verify we received events
@@ -376,16 +381,16 @@ def test_kubernetes_sandbox_send_message() -> None:
print(f"Recieved event: {event}")
# Check for errors
errors = [e for e in events if isinstance(e, Error)]
errors = [e for e in events if isinstance(e, OpenCodeError)]
assert len(errors) == 0, f"Should not receive errors: {errors}"
# Verify we received some agent message content or a final response
message_chunks = [e for e in events if isinstance(e, AgentMessageChunk)]
prompt_responses = [e for e in events if isinstance(e, PromptResponse)]
message_chunks = [e for e in events if isinstance(e, OpenCodeAgentMessageChunk)]
prompt_responses = [e for e in events if isinstance(e, OpenCodePromptResponse)]
assert (
len(message_chunks) > 0 or len(prompt_responses) > 0
), "Should receive either AgentMessageChunk or PromptResponse events"
), "Should receive either agent message chunks or prompt response events"
# If we got a PromptResponse, verify it completed successfully
if prompt_responses:

1
hello_test_opencode.txt Normal file
View File

@@ -0,0 +1 @@
hello

View File

@@ -19,7 +19,17 @@ import {
} from "@opal/icons";
import RawOutputBlock from "@/app/craft/components/RawOutputBlock";
import DiffView from "@/app/craft/components/DiffView";
import { ToolCallState, ToolCallKind } from "@/app/craft/types/displayTypes";
import TextChunk from "@/app/craft/components/TextChunk";
import ThinkingCard from "@/app/craft/components/ThinkingCard";
import TodoListCard from "@/app/craft/components/TodoListCard";
import WorkingPill from "@/app/craft/components/WorkingPill";
import {
ToolCallState,
ToolCallKind,
StreamItem,
GroupedStreamItem,
} from "@/app/craft/types/displayTypes";
import { isWorkingToolCall } from "@/app/craft/utils/streamItemHelpers";
interface ToolCallPillProps {
toolCall: ToolCallState;
@@ -113,6 +123,35 @@ function getLanguageHint(toolCall: ToolCallState): string | undefined {
}
}
/**
 * Group a subagent's stream items so that consecutive "working" tool calls
 * collapse into a single working_group entry; every other item passes
 * through unchanged and in its original order.
 */
function groupSubagentStreamItems(items: StreamItem[]): GroupedStreamItem[] {
  const result: GroupedStreamItem[] = [];
  let pendingWorking: ToolCallState[] = [];

  // Emit the accumulated run of working tool calls (if any) as one group,
  // keyed off the first call's id, then start a fresh accumulator.
  const emitPendingWorking = () => {
    const first = pendingWorking[0];
    if (!first) return;
    result.push({
      type: "working_group",
      id: `subagent-working-${first.id}`,
      toolCalls: pendingWorking,
    });
    pendingWorking = [];
  };

  for (const item of items) {
    if (item.type === "tool_call" && isWorkingToolCall(item.toolCall)) {
      pendingWorking.push(item.toolCall);
      continue;
    }
    emitPendingWorking();
    result.push(item as GroupedStreamItem);
  }
  emitPendingWorking();
  return result;
}
/**
* ToolCallPill - Expandable pill for tool calls
*
@@ -131,6 +170,12 @@ export default function ToolCallPill({ toolCall }: ToolCallPillProps) {
const Icon = getToolIcon(toolCall.kind);
const statusDisplay = getStatusDisplay(toolCall.status);
const StatusIcon = statusDisplay.icon;
const groupedSubagentItems = toolCall.subagentStreamItems
? groupSubagentStreamItems(toolCall.subagentStreamItems)
: [];
const lastSubagentWorkingGroupIndex = groupedSubagentItems.findLastIndex(
(item) => item.type === "working_group"
);
return (
<Collapsible open={isOpen} onOpenChange={setIsOpen}>
@@ -198,10 +243,75 @@ export default function ToolCallPill({ toolCall }: ToolCallPillProps) {
<CollapsibleContent>
<div className="px-3 pb-3 pt-0">
{/* Show diff view for edit operations (not new files) */}
{toolCall.title === "Editing file" &&
toolCall.oldContent !== undefined &&
toolCall.newContent !== undefined ? (
{toolCall.kind === "task" && groupedSubagentItems.length > 0 ? (
<div className="flex flex-col gap-3">
<div className="text-xs font-medium text-text-03 uppercase tracking-wide">
Subagent Activity
</div>
<div className="flex flex-col gap-2">
{groupedSubagentItems.map((item, index) => {
switch (item.type) {
case "text":
return (
<TextChunk key={item.id} content={item.content} />
);
case "thinking":
return (
<ThinkingCard
key={item.id}
content={item.content}
isStreaming={false}
/>
);
case "todo_list":
return (
<TodoListCard
key={item.id}
todoList={item.todoList}
defaultOpen={false}
/>
);
case "working_group":
return (
<WorkingPill
key={item.id}
toolCalls={item.toolCalls}
isLatest={index === lastSubagentWorkingGroupIndex}
/>
);
case "tool_call":
return (
<div
key={item.id}
className="rounded-md border border-border-01 bg-background-neutral-01 p-2"
>
<div className="text-sm font-medium text-text-04">
{item.toolCall.title}
</div>
{item.toolCall.description && (
<div className="text-sm text-text-03">
{item.toolCall.description}
</div>
)}
{item.toolCall.rawOutput && (
<div className="pt-2">
<RawOutputBlock
content={item.toolCall.rawOutput}
maxHeight="220px"
/>
</div>
)}
</div>
);
default:
return null;
}
})}
</div>
</div>
) : toolCall.title === "Editing file" &&
toolCall.oldContent !== undefined &&
toolCall.newContent !== undefined ? (
<DiffView
oldContent={toolCall.oldContent}
newContent={toolCall.newContent}

View File

@@ -40,6 +40,7 @@ import {
import { genId } from "@/app/craft/utils/streamItemHelpers";
import { parsePacket } from "@/app/craft/utils/parsePacket";
import { convertSubagentPacketDataToStreamItems } from "@/app/craft/utils/subagentStreamItems";
/**
* Convert loaded messages (with message_metadata) to StreamItem[] format.
@@ -49,12 +50,17 @@ import { parsePacket } from "@/app/craft/utils/parsePacket";
* - agent_message: {type: "agent_message", content: {type: "text", text: "..."}}
* - agent_thought: {type: "agent_thought", content: {type: "text", text: "..."}}
* - tool_call_progress: Full tool call data with status="completed"
* - subagent_packet: Incremental packet routed to a parent task tool call
* - agent_plan_update: Plan entries (not rendered as stream items)
*
* This function converts assistant messages to StreamItem[] for rendering.
*/
function convertMessagesToStreamItems(messages: BuildMessage[]): StreamItem[] {
const items: StreamItem[] = [];
const pendingSubagentPacketsByToolCall = new Map<
string,
Record<string, unknown>[]
>();
for (const message of messages) {
if (message.type === "user") continue;
@@ -116,6 +122,16 @@ function convertMessagesToStreamItems(messages: BuildMessage[]): StreamItem[] {
});
}
} else {
const pendingSubagentPacketData =
pendingSubagentPacketsByToolCall.get(packet.toolCallId) ?? [];
const subagentPacketData =
packet.subagentPacketData.length > 0
? packet.subagentPacketData
: pendingSubagentPacketData;
if (subagentPacketData.length > 0) {
pendingSubagentPacketsByToolCall.delete(packet.toolCallId);
}
items.push({
type: "tool_call",
id: packet.toolCallId,
@@ -128,6 +144,13 @@ function convertMessagesToStreamItems(messages: BuildMessage[]): StreamItem[] {
status: packet.status,
rawOutput: packet.rawOutput,
subagentType: packet.subagentType ?? undefined,
subagentSessionId: packet.subagentSessionId ?? undefined,
subagentPacketData:
subagentPacketData.length > 0 ? subagentPacketData : undefined,
subagentStreamItems:
subagentPacketData.length > 0
? convertSubagentPacketDataToStreamItems(subagentPacketData)
: undefined,
isNewFile: packet.isNewFile,
oldContent: packet.oldContent,
newContent: packet.newContent,
@@ -136,6 +159,46 @@ function convertMessagesToStreamItems(messages: BuildMessage[]): StreamItem[] {
}
break;
case "subagent_packet":
if (!packet.parentToolCallId || !packet.packetData) break;
// Update existing task tool card if present.
const taskToolIndex = items.findIndex(
(item) =>
item.type === "tool_call" &&
item.toolCall.id === packet.parentToolCallId
);
if (taskToolIndex >= 0) {
const existingItem = items[taskToolIndex];
if (existingItem && existingItem.type === "tool_call") {
const existingPacketData =
existingItem.toolCall.subagentPacketData ?? [];
const nextPacketData = [...existingPacketData, packet.packetData];
items[taskToolIndex] = {
...existingItem,
toolCall: {
...existingItem.toolCall,
subagentSessionId:
packet.subagentSessionId ??
existingItem.toolCall.subagentSessionId,
subagentPacketData: nextPacketData,
subagentStreamItems:
convertSubagentPacketDataToStreamItems(nextPacketData),
},
};
}
break;
}
// If the task pill hasn't been created yet, cache until tool_call_progress arrives.
const existingPending =
pendingSubagentPacketsByToolCall.get(packet.parentToolCallId) ?? [];
pendingSubagentPacketsByToolCall.set(packet.parentToolCallId, [
...existingPending,
packet.packetData,
]);
break;
// agent_plan_update and other packet types are not rendered as stream items
default:
break;

View File

@@ -21,6 +21,7 @@ import { StreamItem } from "@/app/craft/types/displayTypes";
import { genId } from "@/app/craft/utils/streamItemHelpers";
import { parsePacket } from "@/app/craft/utils/parsePacket";
import { convertSubagentPacketDataToStreamItems } from "@/app/craft/utils/subagentStreamItems";
/**
* Hook for handling message streaming in build sessions.
@@ -239,6 +240,8 @@ export function useBuildStreaming() {
command: "",
rawOutput: "",
subagentType: undefined,
subagentPacketData: undefined,
subagentStreamItems: undefined,
isNewFile: true,
oldContent: "",
newContent: "",
@@ -266,6 +269,13 @@ export function useBuildStreaming() {
command: parsed.command,
rawOutput: parsed.rawOutput,
subagentType: parsed.subagentType ?? undefined,
subagentSessionId: parsed.subagentSessionId ?? undefined,
...(parsed.subagentPacketData.length > 0 && {
subagentPacketData: parsed.subagentPacketData,
subagentStreamItems: convertSubagentPacketDataToStreamItems(
parsed.subagentPacketData
),
}),
...(parsed.kind === "edit" && {
isNewFile: parsed.isNewFile,
oldContent: parsed.oldContent,
@@ -282,18 +292,37 @@ export function useBuildStreaming() {
}
}
}
break;
}
// Task completion → emit text StreamItem
if (parsed.taskOutput) {
appendStreamItem(sessionId, {
type: "text",
id: genId("task-output"),
content: parsed.taskOutput,
isStreaming: false,
});
lastItemType = "text";
accumulatedText = "";
}
// Subagent packet - append to the parent task card stream
case "subagent_packet": {
if (!parsed.parentToolCallId || !parsed.packetData) break;
const liveSession = useBuildSessionStore
.getState()
.sessions.get(sessionId);
if (!liveSession) break;
const taskToolItem = liveSession.streamItems.find(
(item) =>
item.type === "tool_call" &&
item.toolCall.id === parsed.parentToolCallId
);
if (!taskToolItem || taskToolItem.type !== "tool_call") break;
const existingPacketData =
taskToolItem.toolCall.subagentPacketData ?? [];
const nextPacketData = [...existingPacketData, parsed.packetData];
updateToolCallStreamItem(sessionId, parsed.parentToolCallId, {
subagentSessionId:
parsed.subagentSessionId ??
taskToolItem.toolCall.subagentSessionId,
subagentPacketData: nextPacketData,
subagentStreamItems:
convertSubagentPacketDataToStreamItems(nextPacketData),
});
break;
}

View File

@@ -53,6 +53,12 @@ export interface ToolCallState {
rawOutput: string; // Full output for expanded view
/** For task tool calls: the subagent type (e.g., "explore", "plan") */
subagentType?: string;
/** For task tool calls: OpenCode session id used by the subagent */
subagentSessionId?: string;
/** For task tool calls: raw packet history used to build subagentStreamItems */
subagentPacketData?: Record<string, unknown>[];
/** For task tool calls: live stream items from the subagent session */
subagentStreamItems?: StreamItem[];
/** For edit operations: whether this is a new file (write) or edit of existing */
isNewFile?: boolean;
/** For edit operations: the old content before the edit (empty for new files) */

View File

@@ -1,7 +1,7 @@
/**
* Packet Types
*
* Type definitions for raw and parsed ACP packets.
* Type definitions for raw and parsed streamed packets.
* Centralizes all snake_case / camelCase field resolution.
* Defines the ParsedPacket discriminated union consumed by both
* useBuildStreaming (live SSE) and useBuildSessionStore (DB reload).
@@ -99,14 +99,21 @@ export interface ParsedToolCallProgress {
rawOutput: string;
filePath: string; // Session-relative
subagentType: string | null;
subagentSessionId: string | null;
subagentPacketData: Record<string, unknown>[];
// Edit-specific
isNewFile: boolean;
oldContent: string;
newContent: string;
// Todo-specific
todos: TodoItem[];
// Task-specific
taskOutput: string | null;
}
export interface ParsedSubagentPacket {
type: "subagent_packet";
parentToolCallId: string;
subagentSessionId: string | null;
packetData: Record<string, unknown> | null;
}
export interface ParsedPromptResponse {
@@ -138,6 +145,7 @@ export type ParsedPacket =
| ParsedThinkingChunk
| ParsedToolCallStart
| ParsedToolCallProgress
| ParsedSubagentPacket
| ParsedPromptResponse
| ParsedArtifact
| ParsedError

View File

@@ -1,7 +1,7 @@
/**
* Parse Packet
*
* Single entry point for converting raw ACP packets into strongly-typed
* Single entry point for converting raw streamed packets into strongly-typed
* ParsedPacket values. All field resolution, tool detection, and path
* sanitization happen here. Consumers never touch Record<string, unknown>.
*/
@@ -15,6 +15,7 @@ import {
type ParsedPacket,
type ParsedToolCallStart,
type ParsedToolCallProgress,
type ParsedSubagentPacket,
type ParsedArtifact,
type ToolName,
type ToolKind,
@@ -42,6 +43,9 @@ export function parsePacket(raw: unknown): ParsedPacket {
case "tool_call_progress":
return parseToolCallProgress(p);
case "subagent_packet":
return parseSubagentPacket(p);
case "prompt_response":
return { type: "prompt_response" };
@@ -131,7 +135,7 @@ function resolveKind(toolName: ToolName, rawKind: string | null): ToolKind {
// ─── Shared Helpers ───────────────────────────────────────────────
/** Extract text from ACP content structure (string, {type,text}, or array) */
/** Extract text from content structure (string, {type,text}, or array) */
function extractText(content: unknown): string {
if (!content) return "";
if (typeof content === "string") return content;
@@ -398,16 +402,39 @@ function normalizeTodoStatus(status: unknown): TodoStatus {
return "pending";
}
// ─── Task Output Extraction ──────────────────────────────────────
function extractTaskOutput(ro: Record<string, unknown> | null): string | null {
if (!ro?.output || typeof ro.output !== "string") return null;
return (
ro.output.replace(/<task_metadata>[\s\S]*?<\/task_metadata>/g, "").trim() ||
null
// ─── Task Subagent Packet Extraction ─────────────────────────────
/**
 * Pull the subagent packet list out of a tool call's rawOutput, accepting
 * either the snake_case or camelCase key. Non-array values and non-object
 * entries are dropped; always returns a (possibly empty) array.
 */
function extractSubagentPacketData(
  ro: Record<string, unknown> | null
): Record<string, unknown>[] {
  const candidate = ro?.subagent_packets ?? ro?.subagentPackets ?? null;
  if (!Array.isArray(candidate)) {
    return [];
  }
  const packets: Record<string, unknown>[] = [];
  for (const entry of candidate) {
    // Keep only truthy object entries (filters null/undefined/primitives).
    if (entry && typeof entry === "object") {
      packets.push(entry as Record<string, unknown>);
    }
  }
  return packets;
}
/**
 * Parse a subagent_packet envelope: a packet produced inside a subagent
 * session and routed to its parent task tool call. Accepts snake_case or
 * camelCase field names; missing/invalid fields degrade to ""/null.
 */
function parseSubagentPacket(p: Record<string, unknown>): ParsedSubagentPacket {
  const parentIdValue = (p.parent_tool_call_id ?? p.parentToolCallId ?? "") as
    | string
    | undefined;
  const sessionIdValue = (p.subagent_session_id ?? p.subagentSessionId ?? null) as
    | string
    | null;
  const maybePacket = p.packet;
  // Only plain object payloads are forwarded; anything else becomes null.
  const packetData =
    maybePacket && typeof maybePacket === "object"
      ? (maybePacket as Record<string, unknown>)
      : null;
  return {
    type: "subagent_packet",
    parentToolCallId: parentIdValue ? parentIdValue : "",
    subagentSessionId: sessionIdValue,
    packetData,
  };
}
// ─── Artifact Parsing ─────────────────────────────────────────────
function parseArtifact(p: Record<string, unknown>): ParsedArtifact {
@@ -505,10 +532,16 @@ function parseToolCallProgress(
const subagentType = (ri?.subagent_type ?? ri?.subagentType ?? null) as
| string
| null;
const taskOutput =
toolName === "task" && status === "completed"
? extractTaskOutput(ro)
: null;
const rawMetadata = (ro?.metadata ?? null) as Record<string, unknown> | null;
const subagentSessionId = (ro?.sessionId ??
ro?.sessionID ??
ro?.session_id ??
rawMetadata?.sessionId ??
rawMetadata?.sessionID ??
rawMetadata?.session_id ??
null) as string | null;
const subagentPacketData =
toolName === "task" ? extractSubagentPacketData(ro) : [];
return {
type: "tool_call_progress",
@@ -523,6 +556,8 @@ function parseToolCallProgress(
rawOutput,
filePath,
subagentType,
subagentSessionId,
subagentPacketData,
isNewFile:
diffData.oldText || diffData.newText
? diffData.isNewFile
@@ -530,6 +565,5 @@ function parseToolCallProgress(
oldContent: diffData.oldText,
newContent: diffData.newText,
todos,
taskOutput,
};
}

View File

@@ -0,0 +1,191 @@
"use client";
import {
StreamItem,
ToolCallState,
TodoListState,
} from "../types/displayTypes";
import { parsePacket } from "./parsePacket";
import { genId } from "./streamItemHelpers";
const MAX_SUBAGENT_DEPTH = 3;
/**
 * Insert a tool-call stream item keyed by `toolCallId`, or merge `updates`
 * into the existing one. Mutates `items` in place. Newly created entries
 * fall back to neutral defaults for any field missing from `updates`.
 */
function upsertToolCall(
  items: StreamItem[],
  toolCallId: string,
  updates: Partial<ToolCallState>
) {
  const idx = items.findIndex(
    (entry) => entry.type === "tool_call" && entry.toolCall.id === toolCallId
  );
  if (idx >= 0) {
    const current = items[idx];
    if (current && current.type === "tool_call") {
      // Shallow-merge updates over the existing tool call state.
      items[idx] = {
        ...current,
        toolCall: { ...current.toolCall, ...updates },
      };
    }
    return;
  }
  const created: ToolCallState = {
    id: toolCallId,
    kind: updates.kind || "other",
    title: updates.title || "Running tool",
    description: updates.description || "",
    command: updates.command || "",
    status: updates.status || "pending",
    rawOutput: updates.rawOutput || "",
    subagentType: updates.subagentType,
    subagentSessionId: updates.subagentSessionId,
    subagentStreamItems: updates.subagentStreamItems,
    isNewFile: updates.isNewFile ?? true,
    oldContent: updates.oldContent || "",
    newContent: updates.newContent || "",
  };
  items.push({ type: "tool_call", id: toolCallId, toolCall: created });
}
/**
 * Insert a todo-list stream item keyed by `todoListId`, or merge the new
 * `todoList` state into the existing one. Mutates `items` in place.
 */
function upsertTodoList(
  items: StreamItem[],
  todoListId: string,
  todoList: TodoListState
) {
  const idx = items.findIndex(
    (entry) => entry.type === "todo_list" && entry.todoList.id === todoListId
  );
  if (idx < 0) {
    items.push({ type: "todo_list", id: todoListId, todoList });
    return;
  }
  const current = items[idx];
  if (current && current.type === "todo_list") {
    // Shallow-merge so later progress updates replace earlier todo state.
    items[idx] = {
      ...current,
      todoList: { ...current.todoList, ...todoList },
    };
  }
}
/**
 * Convert a subagent session's raw packet history into renderable StreamItems.
 *
 * Each raw packet is replayed through parsePacket and folded into a compact
 * item list: consecutive text/thinking chunks are coalesced into single
 * items, tool calls are upserted by id (start then progress), and todo tool
 * progress becomes a collapsed todo_list card. Nested task tool calls
 * recurse with depth + 1, capped at MAX_SUBAGENT_DEPTH to bound render
 * depth and guard against cyclic packet data.
 *
 * @param packetData - Raw packets captured from the subagent session,
 *   in arrival order (ordering matters for chunk coalescing).
 * @param depth - Current nesting depth; callers omit it (defaults to 0).
 * @returns Stream items for rendering; empty when the depth cap is hit.
 */
export function convertSubagentPacketDataToStreamItems(
  packetData: Record<string, unknown>[],
  depth = 0
): StreamItem[] {
  // Depth cap: deeper nesting is dropped entirely rather than truncated.
  if (depth >= MAX_SUBAGENT_DEPTH) {
    return [];
  }
  const items: StreamItem[] = [];
  for (const rawPacket of packetData) {
    const parsed = parsePacket(rawPacket);
    switch (parsed.type) {
      case "text_chunk": {
        if (!parsed.text) break;
        // Append to a trailing text item so adjacent chunks render as one.
        const lastItem = items[items.length - 1];
        if (lastItem && lastItem.type === "text") {
          lastItem.content += parsed.text;
        } else {
          items.push({
            type: "text",
            id: genId("subagent-text"),
            content: parsed.text,
            isStreaming: false,
          });
        }
        break;
      }
      case "thinking_chunk": {
        if (!parsed.text) break;
        // Same coalescing as text chunks, but for thinking content.
        const lastItem = items[items.length - 1];
        if (lastItem && lastItem.type === "thinking") {
          lastItem.content += parsed.text;
        } else {
          items.push({
            type: "thinking",
            id: genId("subagent-thinking"),
            content: parsed.text,
            isStreaming: false,
          });
        }
        break;
      }
      case "tool_call_start": {
        // Todo tools are rendered only from progress packets (see below).
        if (parsed.isTodo) break;
        // Seed a pending placeholder; the progress packet fills in details.
        upsertToolCall(items, parsed.toolCallId, {
          id: parsed.toolCallId,
          kind: parsed.kind,
          status: "pending",
          title: "",
          description: "",
          command: "",
          rawOutput: "",
        });
        break;
      }
      case "tool_call_progress": {
        if (parsed.isTodo) {
          // Todo progress becomes a (collapsed) todo-list card keyed by
          // the tool call id, replacing earlier todo state.
          upsertTodoList(items, parsed.toolCallId, {
            id: parsed.toolCallId,
            todos: parsed.todos,
            isOpen: false,
          });
          break;
        }
        upsertToolCall(items, parsed.toolCallId, {
          id: parsed.toolCallId,
          kind: parsed.kind,
          status: parsed.status,
          title: parsed.title,
          description: parsed.description,
          command: parsed.command,
          rawOutput: parsed.rawOutput,
          subagentType: parsed.subagentType ?? undefined,
          subagentSessionId: parsed.subagentSessionId ?? undefined,
          // Nested task tool: recurse over its own packet history.
          subagentStreamItems:
            parsed.subagentPacketData.length > 0
              ? convertSubagentPacketDataToStreamItems(
                  parsed.subagentPacketData,
                  depth + 1
                )
              : undefined,
          isNewFile: parsed.isNewFile,
          oldContent: parsed.oldContent,
          newContent: parsed.newContent,
        });
        break;
      }
      // Packet types with no subagent rendering are ignored.
      case "subagent_packet":
      case "prompt_response":
      case "artifact_created":
      case "error":
      case "unknown":
      default:
        break;
    }
  }
  return items;
}