mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-07 16:02:45 +00:00
Compare commits
1 Commits
cli/v0.2.1
...
whuang/cra
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aecc8db764 |
@@ -0,0 +1,28 @@
|
||||
"""Add opencode_session_id to build_session
|
||||
|
||||
Revision ID: 3fa4f9869b2d
|
||||
Revises: 19c0ccb01687
|
||||
Create Date: 2026-02-17 00:00:00.000000
|
||||
|
||||
"""
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "3fa4f9869b2d"
|
||||
down_revision = "19c0ccb01687"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.add_column(
|
||||
"build_session",
|
||||
sa.Column("opencode_session_id", sa.String(), nullable=True),
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_column("build_session", "opencode_session_id")
|
||||
@@ -4709,6 +4709,7 @@ class BuildSession(Base):
|
||||
nullable=False,
|
||||
)
|
||||
nextjs_port: Mapped[int | None] = mapped_column(Integer, nullable=True)
|
||||
opencode_session_id: Mapped[str | None] = mapped_column(String, nullable=True)
|
||||
demo_data_enabled: Mapped[bool] = mapped_column(
|
||||
Boolean, nullable=False, server_default=text("true")
|
||||
)
|
||||
|
||||
@@ -169,7 +169,7 @@ class MessageRequest(BaseModel):
|
||||
class MessageResponse(BaseModel):
|
||||
"""Response containing message details.
|
||||
|
||||
All message data is stored in message_metadata as JSON (the raw ACP packet).
|
||||
All message data is stored in message_metadata as JSON (the raw streamed packet).
|
||||
The turn_index groups all assistant responses under the user prompt they respond to.
|
||||
|
||||
Packet types in message_metadata:
|
||||
@@ -177,7 +177,6 @@ class MessageResponse(BaseModel):
|
||||
- agent_message: {type: "agent_message", content: {...}}
|
||||
- agent_thought: {type: "agent_thought", content: {...}}
|
||||
- tool_call_progress: {type: "tool_call_progress", status: "completed", ...}
|
||||
- agent_plan_update: {type: "agent_plan_update", entries: [...]}
|
||||
"""
|
||||
|
||||
id: str
|
||||
|
||||
@@ -1,27 +1,23 @@
|
||||
"""Build Mode packet types for streaming agent responses.
|
||||
|
||||
This module defines CUSTOM Onyx packet types that extend ACP (Agent Client Protocol).
|
||||
ACP events are passed through directly from the agent - this module only contains
|
||||
This module defines CUSTOM Onyx packet types for build-mode streaming.
|
||||
Agent events are passed through directly from OpenCode - this module only contains
|
||||
Onyx-specific extensions like artifacts and file operations.
|
||||
|
||||
All packets use SSE (Server-Sent Events) format with `event: message` and include
|
||||
a `type` field to distinguish packet types.
|
||||
|
||||
ACP events (passed through directly from acp.schema):
|
||||
Agent events (passed through directly):
|
||||
- agent_message_chunk: Text/image content from agent
|
||||
- agent_thought_chunk: Agent's internal reasoning
|
||||
- tool_call_start: Tool invocation started
|
||||
- tool_call_progress: Tool execution progress/result
|
||||
- agent_plan_update: Agent's execution plan
|
||||
- current_mode_update: Agent mode change
|
||||
- prompt_response: Agent finished processing
|
||||
- error: An error occurred
|
||||
|
||||
Custom Onyx packets (defined here):
|
||||
- error: Onyx-specific errors (e.g., session not found)
|
||||
|
||||
Based on:
|
||||
- Agent Client Protocol (ACP): https://agentclientprotocol.com
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
@@ -66,6 +66,11 @@ SANDBOX_SNAPSHOTS_BUCKET = os.environ.get(
|
||||
# Next.js preview server port range
|
||||
SANDBOX_NEXTJS_PORT_START = int(os.environ.get("SANDBOX_NEXTJS_PORT_START", "3010"))
|
||||
SANDBOX_NEXTJS_PORT_END = int(os.environ.get("SANDBOX_NEXTJS_PORT_END", "3100"))
|
||||
# Per-session OpenCode server port is derived from nextjs_port + this offset.
|
||||
# Example: nextjs 3010 -> opencode 13010
|
||||
SANDBOX_OPENCODE_PORT_OFFSET = int(
|
||||
os.environ.get("SANDBOX_OPENCODE_PORT_OFFSET", "10000")
|
||||
)
|
||||
|
||||
# File upload configuration
|
||||
MAX_UPLOAD_FILE_SIZE_MB = int(os.environ.get("BUILD_MAX_UPLOAD_FILE_SIZE_MB", "50"))
|
||||
@@ -117,12 +122,25 @@ ENABLE_CRAFT = os.environ.get("ENABLE_CRAFT", "false").lower() == "true"
|
||||
SSE_KEEPALIVE_INTERVAL = float(os.environ.get("SSE_KEEPALIVE_INTERVAL", "15.0"))
|
||||
|
||||
# ============================================================================
|
||||
# ACP (Agent Communication Protocol) Configuration
|
||||
# OpenCode Message Streaming Configuration
|
||||
# ============================================================================
|
||||
|
||||
# Timeout for ACP message processing in seconds
|
||||
# This is the maximum time to wait for a complete response from the agent
|
||||
ACP_MESSAGE_TIMEOUT = float(os.environ.get("ACP_MESSAGE_TIMEOUT", "900.0"))
|
||||
# Timeout for OpenCode message processing in seconds.
|
||||
# This is the maximum time to wait for a complete response from the agent.
|
||||
OPENCODE_MESSAGE_TIMEOUT = float(
|
||||
os.environ.get(
|
||||
"OPENCODE_MESSAGE_TIMEOUT",
|
||||
os.environ.get("ACP_MESSAGE_TIMEOUT", "900.0"),
|
||||
)
|
||||
)
|
||||
|
||||
# Backwards-compatible alias for older ACP-named env usage.
|
||||
ACP_MESSAGE_TIMEOUT = OPENCODE_MESSAGE_TIMEOUT
|
||||
|
||||
# Maximum time to wait for an OpenCode server to become healthy.
|
||||
OPENCODE_SERVER_STARTUP_TIMEOUT_SECONDS = float(
|
||||
os.environ.get("OPENCODE_SERVER_STARTUP_TIMEOUT_SECONDS", "30.0")
|
||||
)
|
||||
|
||||
# ============================================================================
|
||||
# Rate Limiting Configuration
|
||||
|
||||
@@ -159,6 +159,32 @@ def update_session_status(
|
||||
logger.info(f"Updated build session {session_id} status to {status}")
|
||||
|
||||
|
||||
def update_opencode_session_id(
|
||||
session_id: UUID,
|
||||
opencode_session_id: str | None,
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
"""Persist the OpenCode session ID associated with a build session."""
|
||||
session = (
|
||||
db_session.query(BuildSession)
|
||||
.filter(BuildSession.id == session_id)
|
||||
.one_or_none()
|
||||
)
|
||||
if not session:
|
||||
return
|
||||
|
||||
if session.opencode_session_id == opencode_session_id:
|
||||
return
|
||||
|
||||
session.opencode_session_id = opencode_session_id
|
||||
db_session.commit()
|
||||
logger.info(
|
||||
"Updated build session %s opencode_session_id to %s",
|
||||
session_id,
|
||||
opencode_session_id,
|
||||
)
|
||||
|
||||
|
||||
def delete_build_session__no_commit(
|
||||
session_id: UUID,
|
||||
user_id: UUID,
|
||||
@@ -284,7 +310,7 @@ def create_message(
|
||||
session_id: Session UUID
|
||||
message_type: Type of message (USER, ASSISTANT, SYSTEM)
|
||||
turn_index: 0-indexed user message number this message belongs to
|
||||
message_metadata: Required structured data (the raw ACP packet JSON)
|
||||
message_metadata: Required structured data (the raw streamed packet JSON)
|
||||
db_session: Database session
|
||||
"""
|
||||
message = BuildMessage(
|
||||
|
||||
@@ -18,23 +18,27 @@ import threading
|
||||
from abc import ABC
|
||||
from abc import abstractmethod
|
||||
from collections.abc import Generator
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
from onyx.server.features.build.configs import SANDBOX_BACKEND
|
||||
from onyx.server.features.build.configs import SANDBOX_OPENCODE_PORT_OFFSET
|
||||
from onyx.server.features.build.configs import SandboxBackend
|
||||
from onyx.server.features.build.sandbox.models import FilesystemEntry
|
||||
from onyx.server.features.build.sandbox.models import LLMProviderConfig
|
||||
from onyx.server.features.build.sandbox.models import SandboxInfo
|
||||
from onyx.server.features.build.sandbox.models import SnapshotResult
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeEvent
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
# ACPEvent is a union type defined in both local and kubernetes modules
|
||||
# Using Any here to avoid circular imports - the actual type checking
|
||||
# happens in the implementation modules
|
||||
ACPEvent = Any
|
||||
# Backward compatibility for legacy imports/tests.
|
||||
ACPEvent = OpenCodeEvent
|
||||
|
||||
|
||||
def get_opencode_server_port(nextjs_port: int) -> int:
|
||||
"""Derive per-session OpenCode server port from the session Next.js port."""
|
||||
return nextjs_port + SANDBOX_OPENCODE_PORT_OFFSET
|
||||
|
||||
|
||||
class SandboxManager(ABC):
|
||||
@@ -273,14 +277,33 @@ class SandboxManager(ABC):
|
||||
"""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def get_opencode_server_url(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
nextjs_port: int,
|
||||
) -> str:
|
||||
"""Get URL for the session's OpenCode server.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
nextjs_port: The session's Next.js port
|
||||
|
||||
Returns:
|
||||
URL for the session-scoped OpenCode server.
|
||||
"""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def send_message(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
session_id: UUID,
|
||||
nextjs_port: int,
|
||||
message: str,
|
||||
) -> Generator[ACPEvent, None, None]:
|
||||
"""Send a message to the CLI agent and stream typed ACP events.
|
||||
opencode_session_id: str | None = None,
|
||||
) -> Generator[OpenCodeEvent, None, None]:
|
||||
"""Send a message to OpenCode and stream typed events.
|
||||
|
||||
The agent runs in the session-specific workspace:
|
||||
sessions/$session_id/
|
||||
@@ -288,10 +311,12 @@ class SandboxManager(ABC):
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
session_id: The session ID (determines workspace directory)
|
||||
nextjs_port: The session's allocated Next.js port
|
||||
message: The message content to send
|
||||
opencode_session_id: Existing OpenCode session ID to resume
|
||||
|
||||
Yields:
|
||||
Typed ACP schema event objects
|
||||
Typed OpenCode event objects
|
||||
|
||||
Raises:
|
||||
RuntimeError: If agent communication fails
|
||||
|
||||
@@ -3,10 +3,10 @@
|
||||
These modules are implementation details and should only be used by KubernetesSandboxManager.
|
||||
"""
|
||||
|
||||
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
|
||||
ACPEvent,
|
||||
from onyx.server.features.build.sandbox.kubernetes.internal.opencode_exec_client import (
|
||||
OpenCodeExecClient,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"ACPEvent",
|
||||
"OpenCodeExecClient",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,272 @@
|
||||
"""Run OpenCode client commands inside a sandbox pod via Kubernetes exec."""
|
||||
|
||||
import json
|
||||
import time
|
||||
from collections.abc import Generator
|
||||
|
||||
from kubernetes import client # type: ignore
|
||||
from kubernetes.stream import stream as k8s_stream # type: ignore
|
||||
from kubernetes.stream.ws_client import WSClient # type: ignore
|
||||
|
||||
from onyx.server.features.build.api.packet_logger import get_packet_logger
|
||||
from onyx.server.features.build.configs import OPENCODE_MESSAGE_TIMEOUT
|
||||
from onyx.server.features.build.configs import SSE_KEEPALIVE_INTERVAL
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeError
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeEvent
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodePromptResponse
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeSSEKeepalive
|
||||
from onyx.server.features.build.sandbox.opencode.parser import (
|
||||
looks_like_session_not_found,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.opencode.parser import OpenCodeEventParser
|
||||
from onyx.server.features.build.sandbox.opencode.run_client import (
|
||||
OpenCodeSessionNotFoundError,
|
||||
)
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
STATUS_CHANNEL = 3
|
||||
|
||||
|
||||
def _drain_lines(buffer: str) -> tuple[list[str], str]:
|
||||
"""Split buffered stream data into complete stripped lines."""
|
||||
if "\n" not in buffer:
|
||||
return [], buffer
|
||||
|
||||
parts = buffer.split("\n")
|
||||
lines = [line.strip() for line in parts[:-1] if line.strip()]
|
||||
return lines, parts[-1]
|
||||
|
||||
|
||||
def _parse_exec_exit_code(status_payload: str) -> int | None:
|
||||
"""Parse Kubernetes exec status channel payload for exit code."""
|
||||
if not status_payload:
|
||||
return None
|
||||
|
||||
payload = status_payload.strip().splitlines()[-1]
|
||||
|
||||
try:
|
||||
status_obj = json.loads(payload)
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
|
||||
causes = status_obj.get("details", {}).get("causes", [])
|
||||
if not isinstance(causes, list):
|
||||
return None
|
||||
|
||||
for cause in causes:
|
||||
if not isinstance(cause, dict):
|
||||
continue
|
||||
if cause.get("reason") == "ExitCode":
|
||||
message = cause.get("message")
|
||||
if isinstance(message, str) and message.isdigit():
|
||||
return int(message)
|
||||
return None
|
||||
|
||||
|
||||
class OpenCodeExecClient:
|
||||
"""Run `opencode run --attach` in a pod and stream normalized events."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
core_api: client.CoreV1Api,
|
||||
pod_name: str,
|
||||
namespace: str,
|
||||
container: str = "sandbox",
|
||||
timeout: float = OPENCODE_MESSAGE_TIMEOUT,
|
||||
keepalive_interval: float = SSE_KEEPALIVE_INTERVAL,
|
||||
) -> None:
|
||||
self._core_api = core_api
|
||||
self._pod_name = pod_name
|
||||
self._namespace = namespace
|
||||
self._container = container
|
||||
self._timeout = timeout
|
||||
self._keepalive_interval = keepalive_interval
|
||||
|
||||
@staticmethod
|
||||
def _read_channel(ws_client: WSClient, channel: int) -> str:
|
||||
try:
|
||||
return ws_client.read_channel(channel)
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def send_message(
|
||||
self,
|
||||
server_url: str,
|
||||
message: str,
|
||||
cwd: str,
|
||||
session_id: str | None = None,
|
||||
) -> Generator[OpenCodeEvent, None, None]:
|
||||
"""Send one user message via pod exec and stream normalized OpenCode events."""
|
||||
command = [
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
'cd "$1" && shift && exec "$@"',
|
||||
"sh",
|
||||
cwd,
|
||||
"opencode",
|
||||
"run",
|
||||
"--attach",
|
||||
server_url,
|
||||
"--format",
|
||||
"json",
|
||||
*(["--session", session_id] if session_id else []),
|
||||
message,
|
||||
]
|
||||
|
||||
packet_logger = get_packet_logger()
|
||||
packet_logger.log_raw(
|
||||
"OPENCODE-K8S-RUN-START",
|
||||
{
|
||||
"pod_name": self._pod_name,
|
||||
"namespace": self._namespace,
|
||||
"container": self._container,
|
||||
"cwd": cwd,
|
||||
"server_url": server_url,
|
||||
"session_id": session_id,
|
||||
"command": command,
|
||||
},
|
||||
)
|
||||
|
||||
ws_client: WSClient | None = None
|
||||
parser = OpenCodeEventParser(session_id=session_id)
|
||||
stdout_buffer = ""
|
||||
stderr_buffer = ""
|
||||
stderr_lines: list[str] = []
|
||||
status_payload = ""
|
||||
start_time = time.monotonic()
|
||||
last_event_time = start_time
|
||||
|
||||
try:
|
||||
ws_client = k8s_stream(
|
||||
self._core_api.connect_get_namespaced_pod_exec,
|
||||
name=self._pod_name,
|
||||
namespace=self._namespace,
|
||||
container=self._container,
|
||||
command=command,
|
||||
stdin=False,
|
||||
stdout=True,
|
||||
stderr=True,
|
||||
tty=False,
|
||||
_preload_content=False,
|
||||
_request_timeout=int(self._timeout) + 30,
|
||||
)
|
||||
|
||||
while True:
|
||||
elapsed = time.monotonic() - start_time
|
||||
if elapsed > self._timeout:
|
||||
yield OpenCodeError(
|
||||
opencode_session_id=parser.session_id,
|
||||
code=-1,
|
||||
message=(
|
||||
f"Timeout waiting for OpenCode response after {self._timeout:.1f}s"
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
if ws_client.is_open():
|
||||
ws_client.update(timeout=0.5)
|
||||
|
||||
try:
|
||||
stdout_chunk = ws_client.read_stdout(timeout=0.1)
|
||||
except Exception:
|
||||
stdout_chunk = ""
|
||||
try:
|
||||
stderr_chunk = ws_client.read_stderr(timeout=0.1)
|
||||
except Exception:
|
||||
stderr_chunk = ""
|
||||
status_chunk = self._read_channel(ws_client, STATUS_CHANNEL)
|
||||
if status_chunk:
|
||||
status_payload += status_chunk
|
||||
|
||||
if stdout_chunk:
|
||||
stdout_buffer += stdout_chunk
|
||||
lines, stdout_buffer = _drain_lines(stdout_buffer)
|
||||
for line in lines:
|
||||
try:
|
||||
raw_event = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
packet_logger.log_raw(
|
||||
"OPENCODE-K8S-RUN-PARSE-ERROR",
|
||||
{"line": line[:500]},
|
||||
)
|
||||
continue
|
||||
|
||||
for event in parser.parse_raw_event(raw_event):
|
||||
last_event_time = time.monotonic()
|
||||
yield event
|
||||
|
||||
if stderr_chunk:
|
||||
stderr_buffer += stderr_chunk
|
||||
lines, stderr_buffer = _drain_lines(stderr_buffer)
|
||||
stderr_lines.extend(lines)
|
||||
|
||||
if (
|
||||
not stdout_chunk
|
||||
and not stderr_chunk
|
||||
and (time.monotonic() - last_event_time) >= self._keepalive_interval
|
||||
):
|
||||
yield OpenCodeSSEKeepalive()
|
||||
last_event_time = time.monotonic()
|
||||
|
||||
if not ws_client.is_open() and not stdout_chunk and not stderr_chunk:
|
||||
break
|
||||
|
||||
# Flush any trailing partial lines
|
||||
trailing_stdout = stdout_buffer.strip()
|
||||
if trailing_stdout:
|
||||
try:
|
||||
raw_event = json.loads(trailing_stdout)
|
||||
for event in parser.parse_raw_event(raw_event):
|
||||
yield event
|
||||
except json.JSONDecodeError:
|
||||
packet_logger.log_raw(
|
||||
"OPENCODE-K8S-RUN-PARSE-ERROR",
|
||||
{"line": trailing_stdout[:500]},
|
||||
)
|
||||
|
||||
trailing_stderr = stderr_buffer.strip()
|
||||
if trailing_stderr:
|
||||
stderr_lines.append(trailing_stderr)
|
||||
|
||||
stderr_text = "\n".join(stderr_lines).strip()
|
||||
exit_code = _parse_exec_exit_code(status_payload)
|
||||
|
||||
packet_logger.log_raw(
|
||||
"OPENCODE-K8S-RUN-END",
|
||||
{
|
||||
"pod_name": self._pod_name,
|
||||
"namespace": self._namespace,
|
||||
"container": self._container,
|
||||
"cwd": cwd,
|
||||
"server_url": server_url,
|
||||
"session_id": parser.session_id,
|
||||
"stderr_preview": stderr_text[:500] if stderr_text else None,
|
||||
"exit_code": exit_code,
|
||||
},
|
||||
)
|
||||
|
||||
if stderr_text and looks_like_session_not_found(stderr_text):
|
||||
raise OpenCodeSessionNotFoundError(stderr_text)
|
||||
|
||||
if (exit_code not in (None, 0)) or stderr_text:
|
||||
yield OpenCodeError(
|
||||
opencode_session_id=parser.session_id,
|
||||
code=exit_code,
|
||||
message=stderr_text
|
||||
or f"OpenCode run failed with exit code {exit_code}",
|
||||
)
|
||||
return
|
||||
|
||||
if not parser.saw_prompt_response:
|
||||
yield OpenCodePromptResponse(
|
||||
opencode_session_id=parser.session_id,
|
||||
stop_reason="completed",
|
||||
)
|
||||
finally:
|
||||
if ws_client is not None:
|
||||
try:
|
||||
ws_client.close()
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to close OpenCode exec websocket: {e}")
|
||||
@@ -45,6 +45,8 @@ import shlex
|
||||
import tarfile
|
||||
import threading
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
from uuid import UUID
|
||||
@@ -65,17 +67,15 @@ from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_END
|
||||
from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_START
|
||||
from onyx.server.features.build.configs import SANDBOX_S3_BUCKET
|
||||
from onyx.server.features.build.configs import SANDBOX_SERVICE_ACCOUNT_NAME
|
||||
from onyx.server.features.build.sandbox.base import get_opencode_server_port
|
||||
from onyx.server.features.build.sandbox.base import SandboxManager
|
||||
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
|
||||
ACPEvent,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
|
||||
ACPExecClient,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.models import FilesystemEntry
|
||||
from onyx.server.features.build.sandbox.models import LLMProviderConfig
|
||||
from onyx.server.features.build.sandbox.models import SandboxInfo
|
||||
from onyx.server.features.build.sandbox.models import SnapshotResult
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeEvent
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeHttpClient
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeSessionNotFoundError
|
||||
from onyx.server.features.build.sandbox.util.agent_instructions import (
|
||||
ATTACHMENTS_SECTION_CONTENT,
|
||||
)
|
||||
@@ -148,6 +148,79 @@ echo $NEXTJS_PID > {session_path}/nextjs.pid
|
||||
"""
|
||||
|
||||
|
||||
def _build_opencode_server_start_script(session_path: str, nextjs_port: int) -> str:
|
||||
"""Build shell script to ensure the session's OpenCode server is running."""
|
||||
opencode_port = get_opencode_server_port(nextjs_port)
|
||||
opencode_url = f"http://127.0.0.1:{opencode_port}/config"
|
||||
pid_path = f"{session_path}/opencode_server.pid"
|
||||
log_path = f"{session_path}/opencode_server.log"
|
||||
|
||||
return f"""
|
||||
is_opencode_healthy() {{
|
||||
python - <<'PY'
|
||||
import sys
|
||||
import urllib.request
|
||||
|
||||
try:
|
||||
urllib.request.urlopen("{opencode_url}", timeout=1)
|
||||
except Exception:
|
||||
sys.exit(1)
|
||||
sys.exit(0)
|
||||
PY
|
||||
}}
|
||||
|
||||
wait_for_opencode() {{
|
||||
python - <<'PY'
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
|
||||
deadline = time.time() + 30
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
urllib.request.urlopen("{opencode_url}", timeout=1)
|
||||
sys.exit(0)
|
||||
except Exception:
|
||||
time.sleep(0.25)
|
||||
sys.exit(1)
|
||||
PY
|
||||
}}
|
||||
|
||||
OPENCODE_NEEDS_START=1
|
||||
if [ -f {pid_path} ]; then
|
||||
OPENCODE_PID=$(cat {pid_path} 2>/dev/null || true)
|
||||
if [ -n "$OPENCODE_PID" ] && kill -0 "$OPENCODE_PID" 2>/dev/null; then
|
||||
if is_opencode_healthy; then
|
||||
OPENCODE_NEEDS_START=0
|
||||
echo "OpenCode server already running (PID: $OPENCODE_PID)"
|
||||
else
|
||||
echo "Stopping stale OpenCode server (PID: $OPENCODE_PID)"
|
||||
kill "$OPENCODE_PID" 2>/dev/null || true
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ "$OPENCODE_NEEDS_START" -eq 1 ] && is_opencode_healthy; then
|
||||
OPENCODE_NEEDS_START=0
|
||||
echo "OpenCode server already reachable on port {opencode_port}"
|
||||
fi
|
||||
|
||||
if [ "$OPENCODE_NEEDS_START" -eq 1 ]; then
|
||||
echo "Starting OpenCode server on port {opencode_port}"
|
||||
nohup opencode serve --hostname 127.0.0.1 --port {opencode_port} > {log_path} 2>&1 &
|
||||
NEW_OPENCODE_PID=$!
|
||||
echo "$NEW_OPENCODE_PID" > {pid_path}
|
||||
fi
|
||||
|
||||
if ! wait_for_opencode; then
|
||||
echo "ERROR: OpenCode server failed to become healthy at {opencode_url}" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "OpenCode server ready on port {opencode_port}"
|
||||
"""
|
||||
|
||||
|
||||
def _get_local_aws_credential_env_vars() -> list[client.V1EnvVar]:
|
||||
"""Get AWS credential environment variables from local environment.
|
||||
|
||||
@@ -379,6 +452,59 @@ class KubernetesSandboxManager(SandboxManager):
|
||||
service_name = self._get_service_name(sandbox_id)
|
||||
return f"http://{service_name}.{self._namespace}.svc.cluster.local:{port}"
|
||||
|
||||
def _get_pod_ip(self, sandbox_id: UUID) -> str | None:
|
||||
"""Fetch current sandbox pod IP for direct HTTP access."""
|
||||
pod_name = self._get_pod_name(str(sandbox_id))
|
||||
try:
|
||||
pod = self._core_api.read_namespaced_pod(
|
||||
name=pod_name,
|
||||
namespace=self._namespace,
|
||||
)
|
||||
except ApiException as e:
|
||||
logger.warning("Failed to fetch pod %s for IP lookup: %s", pod_name, e)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Unexpected error fetching pod %s for IP lookup: %s", pod_name, e
|
||||
)
|
||||
return None
|
||||
|
||||
pod_status = getattr(pod, "status", None)
|
||||
pod_ip = getattr(pod_status, "pod_ip", None) if pod_status else None
|
||||
return pod_ip if isinstance(pod_ip, str) and pod_ip else None
|
||||
|
||||
@staticmethod
|
||||
def _is_opencode_http_reachable(server_url: str) -> bool:
|
||||
"""Check if OpenCode server is reachable over HTTP from this process."""
|
||||
try:
|
||||
with urllib.request.urlopen(f"{server_url}/config", timeout=2):
|
||||
return True
|
||||
except (urllib.error.URLError, TimeoutError, Exception):
|
||||
return False
|
||||
|
||||
def _ensure_opencode_server(
|
||||
self,
|
||||
pod_name: str,
|
||||
session_path: str,
|
||||
nextjs_port: int,
|
||||
) -> None:
|
||||
"""Ensure the session's OpenCode server is running and healthy."""
|
||||
script = f"""
|
||||
set -e
|
||||
{_build_opencode_server_start_script(session_path=session_path, nextjs_port=nextjs_port)}
|
||||
"""
|
||||
k8s_stream(
|
||||
self._stream_core_api.connect_get_namespaced_pod_exec,
|
||||
name=pod_name,
|
||||
namespace=self._namespace,
|
||||
container="sandbox",
|
||||
command=["/bin/sh", "-c", script],
|
||||
stderr=True,
|
||||
stdin=False,
|
||||
stdout=True,
|
||||
tty=False,
|
||||
)
|
||||
|
||||
def _load_agent_instructions(
|
||||
self,
|
||||
files_path: Path | None = None,
|
||||
@@ -1374,6 +1500,13 @@ echo "Session workspace setup complete"
|
||||
)
|
||||
|
||||
logger.debug(f"Session setup output: {exec_response}")
|
||||
|
||||
# Start and verify per-session OpenCode server.
|
||||
self._ensure_opencode_server(
|
||||
pod_name=pod_name,
|
||||
session_path=session_path,
|
||||
nextjs_port=nextjs_port,
|
||||
)
|
||||
logger.info(
|
||||
f"Set up session workspace {session_id} in sandbox {sandbox_id}"
|
||||
)
|
||||
@@ -1416,6 +1549,13 @@ if [ -f {session_path}/nextjs.pid ]; then
|
||||
kill $NEXTJS_PID 2>/dev/null || true
|
||||
fi
|
||||
|
||||
# Kill OpenCode server if running
|
||||
if [ -f {session_path}/opencode_server.pid ]; then
|
||||
OPENCODE_PID=$(cat {session_path}/opencode_server.pid)
|
||||
echo "Stopping OpenCode server (PID: $OPENCODE_PID)"
|
||||
kill $OPENCODE_PID 2>/dev/null || true
|
||||
fi
|
||||
|
||||
echo "Removing session directory: {session_path}"
|
||||
rm -rf {session_path}
|
||||
echo "Session cleanup complete"
|
||||
@@ -1699,6 +1839,13 @@ echo "SNAPSHOT_RESTORED"
|
||||
stdout=True,
|
||||
tty=False,
|
||||
)
|
||||
|
||||
# Start and verify per-session OpenCode server after wake/restore.
|
||||
self._ensure_opencode_server(
|
||||
pod_name=pod_name,
|
||||
session_path=session_path,
|
||||
nextjs_port=nextjs_port,
|
||||
)
|
||||
except ApiException as e:
|
||||
raise RuntimeError(f"Failed to restore snapshot: {e}") from e
|
||||
|
||||
@@ -1800,94 +1947,125 @@ echo "Session config regeneration complete"
|
||||
True if sandbox is healthy, False otherwise
|
||||
"""
|
||||
pod_name = self._get_pod_name(str(sandbox_id))
|
||||
exec_client = ACPExecClient(
|
||||
pod_name=pod_name,
|
||||
namespace=self._namespace,
|
||||
container="sandbox",
|
||||
)
|
||||
return exec_client.health_check(timeout=timeout)
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout:
|
||||
try:
|
||||
resp = k8s_stream(
|
||||
self._stream_core_api.connect_get_namespaced_pod_exec,
|
||||
name=pod_name,
|
||||
namespace=self._namespace,
|
||||
container="sandbox",
|
||||
command=["/bin/sh", "-c", "echo HEALTHY"],
|
||||
stderr=True,
|
||||
stdin=False,
|
||||
stdout=True,
|
||||
tty=False,
|
||||
)
|
||||
return "HEALTHY" in resp
|
||||
except ApiException as e:
|
||||
if e.status == 404:
|
||||
return False
|
||||
logger.debug(f"Health check exec failed for pod {pod_name}: {e}")
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
logger.debug(f"Health check error for pod {pod_name}: {e}")
|
||||
time.sleep(1)
|
||||
return False
|
||||
|
||||
def send_message(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
session_id: UUID,
|
||||
nextjs_port: int,
|
||||
message: str,
|
||||
) -> Generator[ACPEvent, None, None]:
|
||||
"""Send a message to the CLI agent and stream ACP events.
|
||||
opencode_session_id: str | None = None,
|
||||
) -> Generator[OpenCodeEvent, None, None]:
|
||||
"""Send a message to OpenCode and stream typed events.
|
||||
|
||||
Runs `opencode acp` via kubectl exec in the sandbox pod.
|
||||
The agent runs in the session-specific workspace.
|
||||
Uses OpenCode HTTP APIs (`/event` + `prompt_async`) over pod networking
|
||||
for true incremental SSE chunks.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
session_id: The session ID (determines workspace directory)
|
||||
nextjs_port: Allocated Next.js port (used to derive OpenCode port)
|
||||
message: The message content to send
|
||||
opencode_session_id: Existing OpenCode session ID to resume
|
||||
|
||||
Yields:
|
||||
Typed ACP schema event objects
|
||||
Typed OpenCode event objects
|
||||
"""
|
||||
packet_logger = get_packet_logger()
|
||||
pod_name = self._get_pod_name(str(sandbox_id))
|
||||
session_path = f"/workspace/sessions/{session_id}"
|
||||
http_server_url = self.get_opencode_server_url(sandbox_id, nextjs_port)
|
||||
|
||||
# Log ACP client creation
|
||||
packet_logger.log_acp_client_start(
|
||||
sandbox_id, session_id, session_path, context="k8s"
|
||||
)
|
||||
|
||||
exec_client = ACPExecClient(
|
||||
# Ensure per-session OpenCode server is running before attaching a client.
|
||||
self._ensure_opencode_server(
|
||||
pod_name=pod_name,
|
||||
namespace=self._namespace,
|
||||
container="sandbox",
|
||||
session_path=session_path,
|
||||
nextjs_port=nextjs_port,
|
||||
)
|
||||
|
||||
# Log OpenCode client creation
|
||||
packet_logger.log_acp_client_start(
|
||||
sandbox_id, session_id, session_path, context="k8s-opencode"
|
||||
)
|
||||
|
||||
# Log the send_message call at sandbox manager level
|
||||
packet_logger.log_session_start(session_id, sandbox_id, message)
|
||||
|
||||
events_count = 0
|
||||
try:
|
||||
exec_client.start(cwd=session_path)
|
||||
for event in exec_client.send_message(message):
|
||||
events_count += 1
|
||||
yield event
|
||||
success = False
|
||||
|
||||
# Log successful completion
|
||||
packet_logger.log_session_end(
|
||||
session_id, success=True, events_count=events_count
|
||||
if not self._is_opencode_http_reachable(http_server_url):
|
||||
raise RuntimeError(
|
||||
"OpenCode HTTP SSE endpoint is not reachable "
|
||||
f"at {http_server_url} for sandbox {sandbox_id}"
|
||||
)
|
||||
except GeneratorExit:
|
||||
# Generator was closed by consumer (client disconnect, timeout, broken pipe)
|
||||
# This is the most common failure mode for SSE streaming
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error="GeneratorExit: Client disconnected or stream closed by consumer",
|
||||
events_count=events_count,
|
||||
|
||||
def _stream_via_http(
|
||||
session_id_override: str | None,
|
||||
) -> Generator[OpenCodeEvent, None, None]:
|
||||
http_client = OpenCodeHttpClient(
|
||||
server_url=http_server_url,
|
||||
session_id=session_id_override,
|
||||
cwd=session_path,
|
||||
)
|
||||
raise
|
||||
yield from http_client.send_message(message)
|
||||
|
||||
try:
|
||||
try:
|
||||
for event in _stream_via_http(opencode_session_id):
|
||||
events_count += 1
|
||||
yield event
|
||||
except OpenCodeSessionNotFoundError:
|
||||
logger.warning(
|
||||
"OpenCode session %s not found for build session %s in pod %s. "
|
||||
"Retrying with a fresh OpenCode session.",
|
||||
opencode_session_id,
|
||||
session_id,
|
||||
pod_name,
|
||||
)
|
||||
for event in _stream_via_http(None):
|
||||
events_count += 1
|
||||
yield event
|
||||
|
||||
success = True
|
||||
except Exception as e:
|
||||
# Log failure from normal exceptions
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error=f"Exception: {str(e)}",
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
except BaseException as e:
|
||||
# Log failure from other base exceptions (SystemExit, KeyboardInterrupt, etc.)
|
||||
exception_type = type(e).__name__
|
||||
packet_logger.log_session_end(
|
||||
session_id,
|
||||
success=False,
|
||||
error=f"{exception_type}: {str(e) if str(e) else 'System-level interruption'}",
|
||||
error=str(e),
|
||||
events_count=events_count,
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
exec_client.stop()
|
||||
# Log client stop
|
||||
packet_logger.log_acp_client_stop(sandbox_id, session_id, context="k8s")
|
||||
if success:
|
||||
packet_logger.log_session_end(
|
||||
session_id, success=True, events_count=events_count
|
||||
)
|
||||
packet_logger.log_acp_client_stop(
|
||||
sandbox_id, session_id, context="k8s-opencode"
|
||||
)
|
||||
|
||||
def list_directory(
|
||||
self, sandbox_id: UUID, session_id: UUID, path: str
|
||||
@@ -2100,6 +2278,24 @@ echo "Session config regeneration complete"
|
||||
"""
|
||||
return self._get_nextjs_url(str(sandbox_id), port)
|
||||
|
||||
def get_opencode_server_url(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
nextjs_port: int,
|
||||
) -> str:
|
||||
"""Get the OpenCode server URL reachable from the API server process.
|
||||
|
||||
Uses pod IP networking for direct HTTP access to per-session OpenCode
|
||||
servers.
|
||||
"""
|
||||
pod_ip = self._get_pod_ip(sandbox_id)
|
||||
opencode_port = get_opencode_server_port(nextjs_port)
|
||||
if not pod_ip:
|
||||
raise RuntimeError(
|
||||
"Sandbox pod IP unavailable; cannot construct OpenCode HTTP URL"
|
||||
)
|
||||
return f"http://{pod_ip}:{opencode_port}"
|
||||
|
||||
def generate_pptx_preview(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
|
||||
@@ -1,19 +1,11 @@
|
||||
"""Local filesystem-based sandbox implementation.
|
||||
"""Local filesystem-based sandbox implementation."""
|
||||
|
||||
This module provides the LocalSandboxManager for development and single-node
|
||||
deployments that run sandboxes as directories on the local filesystem.
|
||||
"""
|
||||
|
||||
from onyx.server.features.build.sandbox.local.agent_client import ACPAgentClient
|
||||
from onyx.server.features.build.sandbox.local.agent_client import ACPEvent
|
||||
from onyx.server.features.build.sandbox.local.local_sandbox_manager import (
|
||||
LocalSandboxManager,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.local.process_manager import ProcessManager
|
||||
|
||||
__all__ = [
|
||||
"ACPAgentClient",
|
||||
"ACPEvent",
|
||||
"LocalSandboxManager",
|
||||
"ProcessManager",
|
||||
]
|
||||
|
||||
@@ -11,20 +11,24 @@ import mimetypes
|
||||
import re
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
from uuid import UUID
|
||||
|
||||
from onyx.db.enums import SandboxStatus
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.server.features.build.api.packet_logger import get_packet_logger
|
||||
from onyx.server.features.build.configs import DEMO_DATA_PATH
|
||||
from onyx.server.features.build.configs import OPENCODE_DISABLED_TOOLS
|
||||
from onyx.server.features.build.configs import OPENCODE_SERVER_STARTUP_TIMEOUT_SECONDS
|
||||
from onyx.server.features.build.configs import OUTPUTS_TEMPLATE_PATH
|
||||
from onyx.server.features.build.configs import SANDBOX_BASE_PATH
|
||||
from onyx.server.features.build.configs import VENV_TEMPLATE_PATH
|
||||
from onyx.server.features.build.sandbox.base import get_opencode_server_port
|
||||
from onyx.server.features.build.sandbox.base import SandboxManager
|
||||
from onyx.server.features.build.sandbox.local.agent_client import ACPAgentClient
|
||||
from onyx.server.features.build.sandbox.local.agent_client import ACPEvent
|
||||
from onyx.server.features.build.sandbox.local.process_manager import ProcessManager
|
||||
from onyx.server.features.build.sandbox.manager.directory_manager import (
|
||||
DirectoryManager,
|
||||
@@ -34,6 +38,9 @@ from onyx.server.features.build.sandbox.models import FilesystemEntry
|
||||
from onyx.server.features.build.sandbox.models import LLMProviderConfig
|
||||
from onyx.server.features.build.sandbox.models import SandboxInfo
|
||||
from onyx.server.features.build.sandbox.models import SnapshotResult
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeEvent
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeHttpClient
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeSessionNotFoundError
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
@@ -84,9 +91,10 @@ class LocalSandboxManager(SandboxManager):
|
||||
self._process_manager = ProcessManager()
|
||||
self._snapshot_manager = SnapshotManager(get_default_file_store())
|
||||
|
||||
# Track ACP clients in memory - keyed by (sandbox_id, session_id) tuple
|
||||
# Each session within a sandbox has its own ACP client
|
||||
self._acp_clients: dict[tuple[UUID, UUID], ACPAgentClient] = {}
|
||||
# Track per-session OpenCode server processes.
|
||||
# Keyed by (sandbox_id, session_id) because each Craft session has its
|
||||
# own OpenCode server working directory and session state.
|
||||
self._opencode_servers: dict[tuple[UUID, UUID], subprocess.Popen[bytes]] = {}
|
||||
|
||||
# Track Next.js processes - keyed by (sandbox_id, session_id) tuple
|
||||
# Used for clean shutdown when sessions are deleted
|
||||
@@ -152,6 +160,93 @@ class LocalSandboxManager(SandboxManager):
|
||||
"""
|
||||
return self._get_sandbox_path(sandbox_id) / "sessions" / str(session_id)
|
||||
|
||||
def _get_opencode_server_url_for_nextjs_port(self, nextjs_port: int) -> str:
|
||||
"""Build local OpenCode server URL from session Next.js port."""
|
||||
return f"http://127.0.0.1:{get_opencode_server_port(nextjs_port)}"
|
||||
|
||||
def _wait_for_opencode_server(
|
||||
self,
|
||||
url: str,
|
||||
process: subprocess.Popen[bytes],
|
||||
timeout: float = OPENCODE_SERVER_STARTUP_TIMEOUT_SECONDS,
|
||||
) -> bool:
|
||||
"""Poll OpenCode server readiness via /config endpoint."""
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout:
|
||||
if process.poll() is not None:
|
||||
return False
|
||||
try:
|
||||
with urllib.request.urlopen(f"{url}/config", timeout=2):
|
||||
return True
|
||||
except (urllib.error.URLError, TimeoutError, Exception):
|
||||
time.sleep(0.25)
|
||||
return False
|
||||
|
||||
def _start_opencode_server(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
session_id: UUID,
|
||||
session_path: Path,
|
||||
nextjs_port: int,
|
||||
) -> subprocess.Popen[bytes]:
|
||||
"""Start a per-session OpenCode server process."""
|
||||
key = (sandbox_id, session_id)
|
||||
existing = self._opencode_servers.pop(key, None)
|
||||
if existing is not None:
|
||||
self._stop_opencode_server_process(existing, session_id)
|
||||
|
||||
server_url = self._get_opencode_server_url_for_nextjs_port(nextjs_port)
|
||||
server_log = session_path / "opencode_server.log"
|
||||
|
||||
with server_log.open("ab") as log_file:
|
||||
process = subprocess.Popen(
|
||||
[
|
||||
"opencode",
|
||||
"serve",
|
||||
"--hostname",
|
||||
"127.0.0.1",
|
||||
"--port",
|
||||
str(get_opencode_server_port(nextjs_port)),
|
||||
],
|
||||
cwd=session_path,
|
||||
stdout=log_file,
|
||||
stderr=log_file,
|
||||
)
|
||||
|
||||
if not self._wait_for_opencode_server(server_url, process):
|
||||
self._stop_opencode_server_process(process, session_id)
|
||||
raise RuntimeError(
|
||||
"OpenCode server failed to start "
|
||||
f"for session {session_id} at {server_url}"
|
||||
)
|
||||
|
||||
self._opencode_servers[key] = process
|
||||
logger.info(
|
||||
"Started OpenCode server for session %s on %s", session_id, server_url
|
||||
)
|
||||
return process
|
||||
|
||||
def _stop_opencode_server_process(
|
||||
self,
|
||||
process: subprocess.Popen[bytes],
|
||||
session_id: UUID,
|
||||
) -> None:
|
||||
"""Stop an OpenCode server process."""
|
||||
if process.poll() is not None:
|
||||
return
|
||||
|
||||
try:
|
||||
self._process_manager.terminate_process(process.pid)
|
||||
logger.debug(
|
||||
"Stopped OpenCode server (PID %s) for session %s",
|
||||
process.pid,
|
||||
session_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to stop OpenCode server for session {session_id}: {e}"
|
||||
)
|
||||
|
||||
def _setup_filtered_files(
|
||||
self,
|
||||
session_path: Path,
|
||||
@@ -316,7 +411,7 @@ class LocalSandboxManager(SandboxManager):
|
||||
"""Terminate a sandbox and clean up all resources.
|
||||
|
||||
1. Stop all Next.js processes for this sandbox
|
||||
2. Stop all ACP clients for this sandbox (terminates agent subprocesses)
|
||||
2. Stop all OpenCode servers for this sandbox
|
||||
3. Cleanup sandbox directory
|
||||
|
||||
Args:
|
||||
@@ -342,19 +437,19 @@ class LocalSandboxManager(SandboxManager):
|
||||
f"session {session_id}: {e}"
|
||||
)
|
||||
|
||||
# Stop all ACP clients for this sandbox (keyed by (sandbox_id, session_id))
|
||||
clients_to_stop = [
|
||||
(key, client)
|
||||
for key, client in self._acp_clients.items()
|
||||
# Stop all OpenCode servers for this sandbox.
|
||||
servers_to_stop = [
|
||||
(key, process)
|
||||
for key, process in self._opencode_servers.items()
|
||||
if key[0] == sandbox_id
|
||||
]
|
||||
for key, client in clients_to_stop:
|
||||
for key, process in servers_to_stop:
|
||||
try:
|
||||
client.stop()
|
||||
del self._acp_clients[key]
|
||||
self._stop_opencode_server_process(process, key[1])
|
||||
del self._opencode_servers[key]
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to stop ACP client for sandbox {sandbox_id}, "
|
||||
f"Failed to stop OpenCode server for sandbox {sandbox_id}, "
|
||||
f"session {key[1]}: {e}"
|
||||
)
|
||||
|
||||
@@ -538,6 +633,14 @@ class LocalSandboxManager(SandboxManager):
|
||||
)
|
||||
logger.debug("Agent instructions ready")
|
||||
|
||||
# Start per-session OpenCode server.
|
||||
self._start_opencode_server(
|
||||
sandbox_id=sandbox_id,
|
||||
session_id=session_id,
|
||||
session_path=session_path,
|
||||
nextjs_port=nextjs_port,
|
||||
)
|
||||
|
||||
logger.info(f"Set up session workspace {session_id} at {session_path}")
|
||||
|
||||
except Exception as e:
|
||||
@@ -563,7 +666,7 @@ class LocalSandboxManager(SandboxManager):
|
||||
"""Clean up a session workspace (on session delete).
|
||||
|
||||
1. Stop Next.js dev server if running
|
||||
2. Stop ACP client for this session
|
||||
2. Stop OpenCode server for this session
|
||||
3. Remove session directory
|
||||
|
||||
Does NOT terminate the sandbox - other sessions may still be using it.
|
||||
@@ -582,16 +685,16 @@ class LocalSandboxManager(SandboxManager):
|
||||
# Fallback: find by port (e.g., if server was restarted)
|
||||
self._stop_nextjs_server_on_port(nextjs_port, session_id)
|
||||
|
||||
# Stop ACP client for this session
|
||||
client_key = (sandbox_id, session_id)
|
||||
client = self._acp_clients.pop(client_key, None)
|
||||
if client:
|
||||
# Stop OpenCode server for this session.
|
||||
server_key = (sandbox_id, session_id)
|
||||
server_process = self._opencode_servers.pop(server_key, None)
|
||||
if server_process:
|
||||
try:
|
||||
client.stop()
|
||||
logger.debug(f"Stopped ACP client for session {session_id}")
|
||||
self._stop_opencode_server_process(server_process, session_id)
|
||||
logger.debug(f"Stopped OpenCode server for session {session_id}")
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to stop ACP client for session {session_id}: {e}"
|
||||
f"Failed to stop OpenCode server for session {session_id}: {e}"
|
||||
)
|
||||
|
||||
# Cleanup session directory
|
||||
@@ -806,77 +909,100 @@ class LocalSandboxManager(SandboxManager):
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_opencode_server_url(
|
||||
self, sandbox_id: UUID, nextjs_port: int # noqa: ARG002
|
||||
) -> str:
|
||||
"""Get the OpenCode server URL for a session in local mode."""
|
||||
return self._get_opencode_server_url_for_nextjs_port(nextjs_port)
|
||||
|
||||
def send_message(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
session_id: UUID,
|
||||
nextjs_port: int,
|
||||
message: str,
|
||||
) -> Generator[ACPEvent, None, None]:
|
||||
"""Send a message to the CLI agent and stream typed ACP events.
|
||||
|
||||
The agent runs in the session-specific workspace:
|
||||
sessions/$session_id/
|
||||
|
||||
Yields ACPEvent objects:
|
||||
- AgentMessageChunk: Text/image content from agent
|
||||
- AgentThoughtChunk: Agent's internal reasoning
|
||||
- ToolCallStart: Tool invocation started
|
||||
- ToolCallProgress: Tool execution progress/result
|
||||
- AgentPlanUpdate: Agent's execution plan
|
||||
- CurrentModeUpdate: Agent mode change
|
||||
- PromptResponse: Agent finished (has stop_reason)
|
||||
- Error: An error occurred
|
||||
opencode_session_id: str | None = None,
|
||||
) -> Generator[OpenCodeEvent, None, None]:
|
||||
"""Send a message to OpenCode and stream normalized events.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
session_id: The session ID (determines workspace directory)
|
||||
nextjs_port: Allocated Next.js port (used to derive OpenCode port)
|
||||
message: The message content to send
|
||||
|
||||
Yields:
|
||||
Typed ACP schema event objects
|
||||
opencode_session_id: Existing OpenCode session ID to resume
|
||||
"""
|
||||
from onyx.server.features.build.api.packet_logger import get_packet_logger
|
||||
|
||||
packet_logger = get_packet_logger()
|
||||
|
||||
# Get or create ACP client for this session
|
||||
client_key = (sandbox_id, session_id)
|
||||
client = self._acp_clients.get(client_key)
|
||||
|
||||
if client is None or not client.is_running:
|
||||
session_path = self._get_session_path(sandbox_id, session_id)
|
||||
|
||||
# Log client creation
|
||||
packet_logger.log_acp_client_start(
|
||||
sandbox_id, session_id, str(session_path), context="local"
|
||||
)
|
||||
logger.info(
|
||||
f"Creating new ACP client for sandbox {sandbox_id}, session {session_id}"
|
||||
session_path = self._get_session_path(sandbox_id, session_id)
|
||||
if not session_path.exists():
|
||||
raise RuntimeError(
|
||||
f"Session workspace not found for sandbox={sandbox_id} session={session_id}"
|
||||
)
|
||||
|
||||
# Create and start ACP client for this session
|
||||
client = ACPAgentClient(cwd=str(session_path))
|
||||
self._acp_clients[client_key] = client
|
||||
server_key = (sandbox_id, session_id)
|
||||
server_process = self._opencode_servers.get(server_key)
|
||||
if server_process is None or server_process.poll() is not None:
|
||||
self._start_opencode_server(
|
||||
sandbox_id=sandbox_id,
|
||||
session_id=session_id,
|
||||
session_path=session_path,
|
||||
nextjs_port=nextjs_port,
|
||||
)
|
||||
|
||||
server_url = self.get_opencode_server_url(sandbox_id, nextjs_port)
|
||||
packet_logger.log_acp_client_start(
|
||||
sandbox_id,
|
||||
session_id,
|
||||
str(session_path),
|
||||
context="local-opencode",
|
||||
)
|
||||
|
||||
# Log the send_message call at sandbox manager level
|
||||
packet_logger.log_session_start(session_id, sandbox_id, message)
|
||||
|
||||
events_count = 0
|
||||
success = False
|
||||
try:
|
||||
for event in client.send_message(message):
|
||||
events_count += 1
|
||||
yield event
|
||||
|
||||
# Log successful completion
|
||||
packet_logger.log_session_end(
|
||||
session_id, success=True, events_count=events_count
|
||||
http_client = OpenCodeHttpClient(
|
||||
server_url=server_url,
|
||||
session_id=opencode_session_id,
|
||||
cwd=str(session_path),
|
||||
)
|
||||
try:
|
||||
for event in http_client.send_message(message):
|
||||
events_count += 1
|
||||
yield event
|
||||
except OpenCodeSessionNotFoundError:
|
||||
# Server lost this session (e.g. restarted). Retry once without session id.
|
||||
logger.warning(
|
||||
"OpenCode session %s not found for build session %s. "
|
||||
"Retrying with a fresh OpenCode session.",
|
||||
opencode_session_id,
|
||||
session_id,
|
||||
)
|
||||
http_client = OpenCodeHttpClient(
|
||||
server_url=server_url,
|
||||
session_id=None,
|
||||
cwd=str(session_path),
|
||||
)
|
||||
for event in http_client.send_message(message):
|
||||
events_count += 1
|
||||
yield event
|
||||
success = True
|
||||
except Exception as e:
|
||||
# Log failure
|
||||
packet_logger.log_session_end(
|
||||
session_id, success=False, error=str(e), events_count=events_count
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
if success:
|
||||
packet_logger.log_session_end(
|
||||
session_id, success=True, events_count=events_count
|
||||
)
|
||||
packet_logger.log_acp_client_stop(
|
||||
sandbox_id, session_id, context="local-opencode"
|
||||
)
|
||||
|
||||
def _sanitize_path(self, path: str) -> str:
|
||||
"""Sanitize a user-provided path to prevent path traversal attacks.
|
||||
|
||||
@@ -15,8 +15,6 @@ from uuid import UUID
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from acp.schema import PromptResponse
|
||||
from acp.schema import ToolCallStart
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.db.engine.sql_engine import get_session_with_current_tenant
|
||||
@@ -32,10 +30,12 @@ from onyx.server.features.build.configs import SANDBOX_BASE_PATH
|
||||
from onyx.server.features.build.db.build_session import allocate_nextjs_port
|
||||
from onyx.server.features.build.sandbox import get_sandbox_manager
|
||||
from onyx.server.features.build.sandbox.local import LocalSandboxManager
|
||||
from onyx.server.features.build.sandbox.local.agent_client import ACPEvent
|
||||
from onyx.server.features.build.sandbox.models import FilesystemEntry
|
||||
from onyx.server.features.build.sandbox.models import LLMProviderConfig
|
||||
from onyx.server.features.build.sandbox.models import SnapshotResult
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeEvent
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodePromptResponse
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeToolCallStart
|
||||
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
|
||||
|
||||
|
||||
@@ -190,8 +190,8 @@ def session_workspace(
|
||||
sandbox_record: Sandbox,
|
||||
build_session_record: BuildSession,
|
||||
db_session: Session,
|
||||
) -> Generator[tuple[Sandbox, UUID], None, None]:
|
||||
"""Set up a session workspace within the sandbox and return (sandbox, session_id)."""
|
||||
) -> Generator[tuple[Sandbox, UUID, int], None, None]:
|
||||
"""Set up a session workspace and return (sandbox, session_id, nextjs_port)."""
|
||||
session_id = build_session_record.id
|
||||
|
||||
# Use setup_session_workspace to create the session directory structure
|
||||
@@ -218,7 +218,7 @@ def session_workspace(
|
||||
file_system_path=SANDBOX_BASE_PATH,
|
||||
)
|
||||
|
||||
yield sandbox_record, session_id
|
||||
yield sandbox_record, session_id, nextjs_port
|
||||
|
||||
# Cleanup session workspace
|
||||
sandbox_manager.cleanup_session_workspace(
|
||||
@@ -263,7 +263,7 @@ class TestCreateSnapshot:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
file_store_initialized: None, # noqa: ARG002
|
||||
) -> None:
|
||||
@@ -271,7 +271,7 @@ class TestCreateSnapshot:
|
||||
|
||||
Note: Caller is responsible for creating DB record from the SnapshotResult.
|
||||
"""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
sandbox_path = Path(SANDBOX_BASE_PATH) / str(sandbox.id)
|
||||
outputs_dir = sandbox_path / "sessions" / str(session_id) / "outputs"
|
||||
(outputs_dir / "app.py").write_text("print('hello')")
|
||||
@@ -309,11 +309,11 @@ class TestListDirectory:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test that list_directory returns filesystem entries."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
sandbox_path = Path(SANDBOX_BASE_PATH) / str(sandbox.id)
|
||||
outputs_dir = sandbox_path / "sessions" / str(session_id)
|
||||
(outputs_dir / "file.txt").write_text("content")
|
||||
@@ -334,11 +334,11 @@ class TestReadFile:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test that read_file returns file contents as bytes."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
sandbox_path = Path(SANDBOX_BASE_PATH) / str(sandbox.id)
|
||||
outputs_dir = sandbox_path / "sessions" / str(session_id) / "outputs"
|
||||
(outputs_dir / "test.txt").write_bytes(b"Hello, World!")
|
||||
@@ -355,19 +355,19 @@ class TestSendMessage:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test that send_message streams ACPEvent objects and ends with PromptResponse.
|
||||
"""Test that send_message streams OpenCode events and ends with prompt_response.
|
||||
|
||||
Note: Heartbeat update is now handled by the caller (SessionManager),
|
||||
not by the SandboxManager itself.
|
||||
"""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
|
||||
events: list[ACPEvent] = []
|
||||
events: list[OpenCodeEvent] = []
|
||||
for event in sandbox_manager.send_message(
|
||||
sandbox.id, session_id, "What is 2 + 2?"
|
||||
sandbox.id, session_id, nextjs_port, "What is 2 + 2?"
|
||||
):
|
||||
events.append(event)
|
||||
|
||||
@@ -376,30 +376,31 @@ class TestSendMessage:
|
||||
|
||||
# Last event should be PromptResponse (success) or contain results
|
||||
last_event = events[-1]
|
||||
assert isinstance(last_event, PromptResponse)
|
||||
assert isinstance(last_event, OpenCodePromptResponse)
|
||||
|
||||
def test_send_message_write_file(
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test that send_message can write files and emits edit tool calls."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
sandbox_path = Path(SANDBOX_BASE_PATH) / str(sandbox.id)
|
||||
session_path = sandbox_path / "sessions" / str(session_id)
|
||||
|
||||
events: list[ACPEvent] = []
|
||||
events: list[OpenCodeEvent] = []
|
||||
for event in sandbox_manager.send_message(
|
||||
sandbox.id,
|
||||
session_id,
|
||||
nextjs_port,
|
||||
"Create a file called hello.txt with the content 'Hello, World!'",
|
||||
):
|
||||
events.append(event)
|
||||
|
||||
# Should have at least one ToolCallStart with kind='edit'
|
||||
tool_calls = [e for e in events if isinstance(e, ToolCallStart)]
|
||||
tool_calls = [e for e in events if isinstance(e, OpenCodeToolCallStart)]
|
||||
edit_tool_calls = [tc for tc in tool_calls if tc.kind == "edit"]
|
||||
assert len(edit_tool_calls) >= 1, (
|
||||
f"Expected at least one edit tool call, got {len(edit_tool_calls)}. "
|
||||
@@ -408,7 +409,7 @@ class TestSendMessage:
|
||||
|
||||
# Last event should be PromptResponse
|
||||
last_event = events[-1]
|
||||
assert isinstance(last_event, PromptResponse)
|
||||
assert isinstance(last_event, OpenCodePromptResponse)
|
||||
|
||||
# Verify the file was actually created (agent writes relative to session root)
|
||||
created_file = session_path / "hello.txt"
|
||||
@@ -419,11 +420,11 @@ class TestSendMessage:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test that send_message can read files and emits read tool calls."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
sandbox_path = Path(SANDBOX_BASE_PATH) / str(sandbox.id)
|
||||
session_path = sandbox_path / "sessions" / str(session_id)
|
||||
|
||||
@@ -431,16 +432,17 @@ class TestSendMessage:
|
||||
test_file = session_path / "secret.txt"
|
||||
test_file.write_text("The secret code is 12345")
|
||||
|
||||
events: list[ACPEvent] = []
|
||||
events: list[OpenCodeEvent] = []
|
||||
for event in sandbox_manager.send_message(
|
||||
sandbox.id,
|
||||
session_id,
|
||||
nextjs_port,
|
||||
"Read the file secret.txt and tell me what the secret code is",
|
||||
):
|
||||
events.append(event)
|
||||
|
||||
# Should have at least one ToolCallStart with kind='read'
|
||||
tool_calls = [e for e in events if isinstance(e, ToolCallStart)]
|
||||
tool_calls = [e for e in events if isinstance(e, OpenCodeToolCallStart)]
|
||||
read_tool_calls = [tc for tc in tool_calls if tc.kind == "read"]
|
||||
assert len(read_tool_calls) >= 1, (
|
||||
f"Expected at least one read tool call, got {len(read_tool_calls)}. "
|
||||
@@ -449,7 +451,7 @@ class TestSendMessage:
|
||||
|
||||
# Last event should be PromptResponse
|
||||
last_event = events[-1]
|
||||
assert isinstance(last_event, PromptResponse)
|
||||
assert isinstance(last_event, OpenCodePromptResponse)
|
||||
|
||||
|
||||
class TestUploadFile:
|
||||
@@ -459,11 +461,11 @@ class TestUploadFile:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test that upload_file creates a file in the attachments directory."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
content = b"Hello, World!"
|
||||
|
||||
result = sandbox_manager.upload_file(
|
||||
@@ -484,11 +486,11 @@ class TestUploadFile:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test that upload_file renames files on collision."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
|
||||
# Upload first file
|
||||
sandbox_manager.upload_file(sandbox.id, session_id, "test.txt", b"first")
|
||||
@@ -508,11 +510,11 @@ class TestDeleteFile:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test that delete_file removes a file."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
|
||||
# Upload a file first
|
||||
sandbox_manager.upload_file(sandbox.id, session_id, "test.txt", b"content")
|
||||
@@ -535,11 +537,11 @@ class TestDeleteFile:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test that delete_file returns False for non-existent file."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
|
||||
result = sandbox_manager.delete_file(
|
||||
sandbox.id, session_id, "attachments/nonexistent.txt"
|
||||
@@ -551,11 +553,11 @@ class TestDeleteFile:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test that delete_file rejects path traversal attempts."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
|
||||
with pytest.raises(ValueError, match="path traversal"):
|
||||
sandbox_manager.delete_file(sandbox.id, session_id, "../../../etc/passwd")
|
||||
@@ -568,11 +570,11 @@ class TestGetUploadStats:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test get_upload_stats returns zeros for empty directory."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
|
||||
file_count, total_size = sandbox_manager.get_upload_stats(
|
||||
sandbox.id, session_id
|
||||
@@ -585,11 +587,11 @@ class TestGetUploadStats:
|
||||
self,
|
||||
sandbox_manager: LocalSandboxManager,
|
||||
db_session: Session, # noqa: ARG002
|
||||
session_workspace: tuple[Sandbox, UUID],
|
||||
session_workspace: tuple[Sandbox, UUID, int],
|
||||
tenant_context: None, # noqa: ARG002
|
||||
) -> None:
|
||||
"""Test get_upload_stats returns correct count and size."""
|
||||
sandbox, session_id = session_workspace
|
||||
sandbox, session_id, nextjs_port = session_workspace
|
||||
|
||||
# Upload some files
|
||||
sandbox_manager.upload_file(
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
"""OpenCode server/client primitives for Craft sandbox execution."""
|
||||
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeAgentMessageChunk
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeAgentThoughtChunk
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeError
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeEvent
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodePromptResponse
|
||||
from onyx.server.features.build.sandbox.opencode.events import (
|
||||
OpenCodeSessionEstablished,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeSSEKeepalive
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeToolCallProgress
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeToolCallStart
|
||||
from onyx.server.features.build.sandbox.opencode.http_client import OpenCodeHttpClient
|
||||
from onyx.server.features.build.sandbox.opencode.run_client import OpenCodeRunClient
|
||||
from onyx.server.features.build.sandbox.opencode.run_client import (
|
||||
OpenCodeSessionNotFoundError,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"OpenCodeEvent",
|
||||
"OpenCodeSSEKeepalive",
|
||||
"OpenCodeAgentMessageChunk",
|
||||
"OpenCodeAgentThoughtChunk",
|
||||
"OpenCodeToolCallStart",
|
||||
"OpenCodeToolCallProgress",
|
||||
"OpenCodePromptResponse",
|
||||
"OpenCodeError",
|
||||
"OpenCodeSessionEstablished",
|
||||
"OpenCodeHttpClient",
|
||||
"OpenCodeRunClient",
|
||||
"OpenCodeSessionNotFoundError",
|
||||
]
|
||||
125
backend/onyx/server/features/build/sandbox/opencode/events.py
Normal file
125
backend/onyx/server/features/build/sandbox/opencode/events.py
Normal file
@@ -0,0 +1,125 @@
|
||||
"""Typed event models for OpenCode streaming in Craft."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from typing import Any
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import Field
|
||||
|
||||
|
||||
def utc_now_iso() -> str:
|
||||
"""Return current UTC time in ISO format."""
|
||||
return datetime.now(tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
def timestamp_ms_to_iso(timestamp_ms: int | float | None) -> str:
|
||||
"""Convert millisecond epoch timestamp to UTC ISO string."""
|
||||
if timestamp_ms is None:
|
||||
return utc_now_iso()
|
||||
try:
|
||||
return datetime.fromtimestamp(
|
||||
float(timestamp_ms) / 1000.0, tz=timezone.utc
|
||||
).isoformat()
|
||||
except (TypeError, ValueError, OSError):
|
||||
return utc_now_iso()
|
||||
|
||||
|
||||
class OpenCodeTextContent(BaseModel):
|
||||
"""Text content block used by streamed text/reasoning packets."""
|
||||
|
||||
type: Literal["text"] = "text"
|
||||
text: str
|
||||
|
||||
|
||||
class OpenCodeAgentMessageChunk(BaseModel):
|
||||
"""Assistant text chunk event."""
|
||||
|
||||
type: Literal["agent_message_chunk"] = "agent_message_chunk"
|
||||
opencode_session_id: str | None = None
|
||||
content: OpenCodeTextContent
|
||||
timestamp: str = Field(default_factory=utc_now_iso)
|
||||
|
||||
|
||||
class OpenCodeAgentThoughtChunk(BaseModel):
|
||||
"""Assistant reasoning/thought chunk event."""
|
||||
|
||||
type: Literal["agent_thought_chunk"] = "agent_thought_chunk"
|
||||
opencode_session_id: str | None = None
|
||||
content: OpenCodeTextContent
|
||||
timestamp: str = Field(default_factory=utc_now_iso)
|
||||
|
||||
|
||||
class OpenCodeToolCallBase(BaseModel):
|
||||
"""Shared fields for synthesized tool call packets."""
|
||||
|
||||
opencode_session_id: str | None = None
|
||||
tool_call_id: str
|
||||
tool_name: str
|
||||
kind: str | None = None
|
||||
title: str | None = None
|
||||
content: list[dict[str, Any]] | None = None
|
||||
locations: list[str] | None = None
|
||||
raw_input: dict[str, Any] | None = None
|
||||
raw_output: dict[str, Any] | None = None
|
||||
status: str | None = None
|
||||
timestamp: str = Field(default_factory=utc_now_iso)
|
||||
|
||||
|
||||
class OpenCodeToolCallStart(OpenCodeToolCallBase):
|
||||
"""Tool call start event."""
|
||||
|
||||
type: Literal["tool_call_start"] = "tool_call_start"
|
||||
|
||||
|
||||
class OpenCodeToolCallProgress(OpenCodeToolCallBase):
|
||||
"""Tool call update event."""
|
||||
|
||||
type: Literal["tool_call_progress"] = "tool_call_progress"
|
||||
|
||||
|
||||
class OpenCodePromptResponse(BaseModel):
|
||||
"""Turn completion event."""
|
||||
|
||||
type: Literal["prompt_response"] = "prompt_response"
|
||||
opencode_session_id: str | None = None
|
||||
stop_reason: str | None = None
|
||||
timestamp: str = Field(default_factory=utc_now_iso)
|
||||
|
||||
|
||||
class OpenCodeError(BaseModel):
|
||||
"""Error event from OpenCode client/server interaction."""
|
||||
|
||||
type: Literal["error"] = "error"
|
||||
opencode_session_id: str | None = None
|
||||
code: int | str | None = None
|
||||
message: str
|
||||
data: dict[str, Any] | None = None
|
||||
timestamp: str = Field(default_factory=utc_now_iso)
|
||||
|
||||
|
||||
class OpenCodeSessionEstablished(BaseModel):
|
||||
"""Internal event emitted when a session ID is discovered/created."""
|
||||
|
||||
type: Literal["opencode_session_established"] = "opencode_session_established"
|
||||
opencode_session_id: str
|
||||
timestamp: str = Field(default_factory=utc_now_iso)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class OpenCodeSSEKeepalive:
|
||||
"""Marker used by SessionManager to emit SSE keepalive comments."""
|
||||
|
||||
|
||||
OpenCodeEvent = (
|
||||
OpenCodeAgentMessageChunk
|
||||
| OpenCodeAgentThoughtChunk
|
||||
| OpenCodeToolCallStart
|
||||
| OpenCodeToolCallProgress
|
||||
| OpenCodePromptResponse
|
||||
| OpenCodeError
|
||||
| OpenCodeSessionEstablished
|
||||
| OpenCodeSSEKeepalive
|
||||
)
|
||||
@@ -0,0 +1,571 @@
|
||||
"""HTTP streaming client for OpenCode server event and prompt APIs."""
|
||||
|
||||
import json
|
||||
import socket
|
||||
import time
|
||||
from collections.abc import Generator
|
||||
from http.client import HTTPResponse
|
||||
from typing import Any
|
||||
from urllib.error import HTTPError
|
||||
from urllib.parse import urlencode
|
||||
from urllib.request import Request
|
||||
from urllib.request import urlopen
|
||||
|
||||
from onyx.server.features.build.api.packet_logger import get_packet_logger
|
||||
from onyx.server.features.build.configs import OPENCODE_MESSAGE_TIMEOUT
|
||||
from onyx.server.features.build.configs import SSE_KEEPALIVE_INTERVAL
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeError
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeEvent
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodePromptResponse
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeSSEKeepalive
|
||||
from onyx.server.features.build.sandbox.opencode.parser import (
|
||||
looks_like_session_not_found,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.opencode.parser import OpenCodeEventParser
|
||||
from onyx.server.features.build.sandbox.opencode.run_client import (
|
||||
OpenCodeSessionNotFoundError,
|
||||
)
|
||||
|
||||
|
||||
READ_TIMEOUT_SECONDS = 1.0
|
||||
|
||||
|
||||
def _as_dict(value: Any) -> dict[str, Any] | None:
|
||||
return value if isinstance(value, dict) else None
|
||||
|
||||
|
||||
def _extract_timestamp_ms(value: dict[str, Any]) -> int | float | None:
|
||||
"""Extract a millisecond timestamp from `time` metadata, if present."""
|
||||
time_obj = _as_dict(value.get("time")) or {}
|
||||
for key in ("end", "completed", "start", "created", "updated"):
|
||||
ts = time_obj.get(key)
|
||||
if isinstance(ts, (int, float)):
|
||||
return ts
|
||||
return None
|
||||
|
||||
|
||||
def _extract_session_error_message(error_payload: dict[str, Any] | None) -> str:
|
||||
if not error_payload:
|
||||
return "OpenCode session failed"
|
||||
|
||||
message = error_payload.get("message")
|
||||
if isinstance(message, str) and message:
|
||||
return message
|
||||
|
||||
data = _as_dict(error_payload.get("data"))
|
||||
if data:
|
||||
nested = data.get("message")
|
||||
if isinstance(nested, str) and nested:
|
||||
return nested
|
||||
|
||||
return json.dumps(error_payload, default=str)
|
||||
|
||||
|
||||
def _build_raw_event_from_part(
|
||||
part: dict[str, Any],
|
||||
delta: str | None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Translate OpenCode SSE `message.part.updated` payload into run-style events."""
|
||||
part_type = part.get("type")
|
||||
session_id = part.get("sessionID")
|
||||
if not isinstance(part_type, str) or not isinstance(session_id, str):
|
||||
return None
|
||||
|
||||
timestamp = _extract_timestamp_ms(part)
|
||||
|
||||
if part_type == "text":
|
||||
text_value = delta if isinstance(delta, str) else part.get("text")
|
||||
if not isinstance(text_value, str) or not text_value:
|
||||
return None
|
||||
return {
|
||||
"type": "text",
|
||||
"sessionID": session_id,
|
||||
"timestamp": timestamp,
|
||||
"part": {"text": text_value},
|
||||
}
|
||||
|
||||
if part_type == "reasoning":
|
||||
text_value = delta if isinstance(delta, str) else part.get("text")
|
||||
if not isinstance(text_value, str) or not text_value:
|
||||
return None
|
||||
return {
|
||||
"type": "reasoning",
|
||||
"sessionID": session_id,
|
||||
"timestamp": timestamp,
|
||||
"part": {"text": text_value},
|
||||
}
|
||||
|
||||
if part_type == "tool":
|
||||
call_id = part.get("callID")
|
||||
tool_name = part.get("tool")
|
||||
state = _as_dict(part.get("state")) or {}
|
||||
if not isinstance(call_id, str) or not isinstance(tool_name, str):
|
||||
return None
|
||||
return {
|
||||
"type": "tool_use",
|
||||
"sessionID": session_id,
|
||||
"timestamp": timestamp,
|
||||
"part": {
|
||||
"callID": call_id,
|
||||
"tool": tool_name,
|
||||
"state": state,
|
||||
},
|
||||
}
|
||||
|
||||
if part_type == "step-finish":
|
||||
reason = part.get("reason")
|
||||
if not isinstance(reason, str):
|
||||
reason = "completed"
|
||||
return {
|
||||
"type": "step_finish",
|
||||
"sessionID": session_id,
|
||||
"timestamp": timestamp,
|
||||
"part": {"reason": reason},
|
||||
}
|
||||
|
||||
if part_type == "error":
|
||||
message = part.get("message")
|
||||
if not isinstance(message, str):
|
||||
return None
|
||||
return {
|
||||
"type": "error",
|
||||
"sessionID": session_id,
|
||||
"timestamp": timestamp,
|
||||
"part": {
|
||||
"message": message,
|
||||
"code": part.get("code"),
|
||||
"data": _as_dict(part.get("data")),
|
||||
},
|
||||
}
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class OpenCodeHttpClient:
|
||||
"""Executes OpenCode turns using `/event` SSE and `/prompt_async` HTTP APIs."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
server_url: str,
|
||||
session_id: str | None = None,
|
||||
cwd: str | None = None,
|
||||
timeout: float = OPENCODE_MESSAGE_TIMEOUT,
|
||||
keepalive_interval: float = SSE_KEEPALIVE_INTERVAL,
|
||||
) -> None:
|
||||
self._server_url = server_url.rstrip("/")
|
||||
self._cwd = cwd
|
||||
self._timeout = timeout
|
||||
self._keepalive_interval = keepalive_interval
|
||||
self._parser = OpenCodeEventParser(session_id=session_id)
|
||||
|
||||
@property
|
||||
def session_id(self) -> str | None:
|
||||
return self._parser.session_id
|
||||
|
||||
def _build_url(self, path: str) -> str:
|
||||
if self._cwd:
|
||||
query = urlencode({"directory": self._cwd})
|
||||
return f"{self._server_url}{path}?{query}"
|
||||
return f"{self._server_url}{path}"
|
||||
|
||||
def _create_session_if_needed(self) -> None:
|
||||
if self._parser.session_id:
|
||||
return
|
||||
|
||||
request = Request(
|
||||
self._build_url("/session"),
|
||||
data=b"{}",
|
||||
method="POST",
|
||||
headers={
|
||||
"content-type": "application/json",
|
||||
"accept": "application/json",
|
||||
},
|
||||
)
|
||||
with urlopen(request, timeout=15.0) as response:
|
||||
body = response.read().decode("utf-8")
|
||||
|
||||
payload = json.loads(body) if body else {}
|
||||
session_id = payload.get("id") if isinstance(payload, dict) else None
|
||||
if not isinstance(session_id, str) or not session_id:
|
||||
raise RuntimeError("OpenCode /session response missing session id")
|
||||
|
||||
self._parser.session_id = session_id
|
||||
|
||||
def _open_event_stream(self) -> HTTPResponse:
|
||||
request = Request(
|
||||
self._build_url("/event"),
|
||||
method="GET",
|
||||
headers={"accept": "text/event-stream"},
|
||||
)
|
||||
return urlopen(request, timeout=READ_TIMEOUT_SECONDS)
|
||||
|
||||
def _send_prompt_async(self, message: str) -> None:
|
||||
session_id = self._parser.session_id
|
||||
if not session_id:
|
||||
raise RuntimeError("OpenCode session id missing before prompt_async")
|
||||
|
||||
payload = {"parts": [{"type": "text", "text": message}]}
|
||||
request = Request(
|
||||
self._build_url(f"/session/{session_id}/prompt_async"),
|
||||
data=json.dumps(payload).encode("utf-8"),
|
||||
method="POST",
|
||||
headers={
|
||||
"content-type": "application/json",
|
||||
"accept": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
with urlopen(request, timeout=15.0):
|
||||
return
|
||||
except HTTPError as exc:
|
||||
body = exc.read().decode("utf-8", errors="replace")
|
||||
if exc.code == 404 and looks_like_session_not_found(body):
|
||||
raise OpenCodeSessionNotFoundError(body)
|
||||
if exc.code == 404:
|
||||
raise OpenCodeSessionNotFoundError(
|
||||
body or f"Session {session_id} not found"
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"OpenCode prompt_async failed ({exc.code}): {body[:500]}"
|
||||
)
|
||||
|
||||
def send_message(self, message: str) -> Generator[OpenCodeEvent, None, None]:
|
||||
"""Send one message through OpenCode HTTP APIs and stream normalized events."""
|
||||
packet_logger = get_packet_logger()
|
||||
packet_logger.log_raw(
|
||||
"OPENCODE-HTTP-START",
|
||||
{
|
||||
"server_url": self._server_url,
|
||||
"session_id": self._parser.session_id,
|
||||
"cwd": self._cwd,
|
||||
},
|
||||
)
|
||||
|
||||
message_roles: dict[str, str] = {}
|
||||
message_parent_ids: dict[str, str] = {}
|
||||
user_message_ids_by_session: dict[str, str] = {}
|
||||
assistant_message_ids_by_session: dict[str, str] = {}
|
||||
saw_target_busy = False
|
||||
saw_target_idle = False
|
||||
status = "started"
|
||||
error_preview: str | None = None
|
||||
|
||||
try:
|
||||
self._create_session_if_needed()
|
||||
primary_session_id = self._parser.session_id
|
||||
if not primary_session_id:
|
||||
raise RuntimeError(
|
||||
"OpenCode session id missing after /session creation"
|
||||
)
|
||||
session_parsers: dict[str, OpenCodeEventParser] = {
|
||||
primary_session_id: self._parser
|
||||
}
|
||||
|
||||
def _get_parser_for_session(session_id: str) -> OpenCodeEventParser:
|
||||
parser = session_parsers.get(session_id)
|
||||
if parser is not None:
|
||||
return parser
|
||||
parser = OpenCodeEventParser(session_id=session_id)
|
||||
session_parsers[session_id] = parser
|
||||
return parser
|
||||
|
||||
with self._open_event_stream() as stream:
|
||||
self._send_prompt_async(message)
|
||||
|
||||
start_time = time.monotonic()
|
||||
last_event_time = start_time
|
||||
data_lines: list[str] = []
|
||||
|
||||
while True:
|
||||
elapsed = time.monotonic() - start_time
|
||||
if elapsed > self._timeout:
|
||||
status = "timeout"
|
||||
yield OpenCodeError(
|
||||
opencode_session_id=primary_session_id,
|
||||
code=-1,
|
||||
message=(
|
||||
"Timeout waiting for OpenCode response "
|
||||
f"after {self._timeout:.1f}s"
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
raw_line = stream.readline()
|
||||
except (TimeoutError, socket.timeout):
|
||||
raw_line = None
|
||||
except OSError as exc:
|
||||
if "timed out" in str(exc).lower():
|
||||
raw_line = None
|
||||
else:
|
||||
raise
|
||||
|
||||
if raw_line is None:
|
||||
if (
|
||||
time.monotonic() - last_event_time
|
||||
) >= self._keepalive_interval:
|
||||
yield OpenCodeSSEKeepalive()
|
||||
last_event_time = time.monotonic()
|
||||
continue
|
||||
|
||||
# Stream closed by server.
|
||||
if raw_line == b"":
|
||||
break
|
||||
|
||||
line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
|
||||
|
||||
if not line:
|
||||
if not data_lines:
|
||||
continue
|
||||
|
||||
payload_str = "\n".join(data_lines)
|
||||
data_lines.clear()
|
||||
try:
|
||||
payload = json.loads(payload_str)
|
||||
except json.JSONDecodeError:
|
||||
packet_logger.log_raw(
|
||||
"OPENCODE-HTTP-PARSE-ERROR",
|
||||
{"line": payload_str[:500]},
|
||||
)
|
||||
continue
|
||||
|
||||
if not isinstance(payload, dict):
|
||||
continue
|
||||
|
||||
payload_type = payload.get("type")
|
||||
properties = _as_dict(payload.get("properties")) or {}
|
||||
|
||||
if payload_type == "session.status":
|
||||
status_session_id = properties.get("sessionID")
|
||||
if (
|
||||
isinstance(status_session_id, str)
|
||||
and status_session_id == primary_session_id
|
||||
):
|
||||
status_obj = _as_dict(properties.get("status")) or {}
|
||||
status_type = status_obj.get("type")
|
||||
if status_type == "busy":
|
||||
saw_target_busy = True
|
||||
elif status_type == "idle":
|
||||
saw_target_idle = True
|
||||
if saw_target_busy:
|
||||
if not self._parser.saw_prompt_response:
|
||||
yield OpenCodePromptResponse(
|
||||
opencode_session_id=primary_session_id,
|
||||
stop_reason="completed",
|
||||
)
|
||||
status = "completed"
|
||||
return
|
||||
continue
|
||||
|
||||
if payload_type == "session.error":
|
||||
status_session_id = properties.get("sessionID")
|
||||
if (
|
||||
isinstance(status_session_id, str)
|
||||
and status_session_id == primary_session_id
|
||||
):
|
||||
error_payload = _as_dict(properties.get("error"))
|
||||
status = "failed"
|
||||
yield OpenCodeError(
|
||||
opencode_session_id=primary_session_id,
|
||||
code=(
|
||||
error_payload.get("name")
|
||||
if error_payload
|
||||
else None
|
||||
),
|
||||
message=_extract_session_error_message(
|
||||
error_payload
|
||||
),
|
||||
data=error_payload,
|
||||
)
|
||||
return
|
||||
continue
|
||||
|
||||
if payload_type == "message.updated":
|
||||
info = _as_dict(properties.get("info")) or {}
|
||||
info_session_id = info.get("sessionID")
|
||||
if not isinstance(info_session_id, str):
|
||||
continue
|
||||
|
||||
parser_for_session = _get_parser_for_session(
|
||||
info_session_id
|
||||
)
|
||||
raw_info_event = {
|
||||
"type": "noop",
|
||||
"sessionID": info_session_id,
|
||||
"timestamp": _extract_timestamp_ms(info),
|
||||
"part": {},
|
||||
}
|
||||
for parsed_event in parser_for_session.parse_raw_event(
|
||||
raw_info_event
|
||||
):
|
||||
last_event_time = time.monotonic()
|
||||
yield parsed_event
|
||||
|
||||
message_id = info.get("id")
|
||||
role = info.get("role")
|
||||
if isinstance(message_id, str) and isinstance(role, str):
|
||||
message_roles[message_id] = role
|
||||
parent_id = info.get("parentID")
|
||||
if isinstance(parent_id, str):
|
||||
message_parent_ids[message_id] = parent_id
|
||||
|
||||
if (
|
||||
role == "user"
|
||||
and info_session_id
|
||||
not in user_message_ids_by_session
|
||||
):
|
||||
user_message_ids_by_session[info_session_id] = (
|
||||
message_id
|
||||
)
|
||||
elif role == "assistant":
|
||||
user_message_id = user_message_ids_by_session.get(
|
||||
info_session_id
|
||||
)
|
||||
if (
|
||||
isinstance(parent_id, str)
|
||||
and parent_id == user_message_id
|
||||
):
|
||||
assistant_message_ids_by_session[
|
||||
info_session_id
|
||||
] = message_id
|
||||
continue
|
||||
|
||||
if payload_type != "message.part.updated":
|
||||
continue
|
||||
|
||||
part = _as_dict(properties.get("part")) or {}
|
||||
part_session_id = part.get("sessionID")
|
||||
if not isinstance(part_session_id, str):
|
||||
continue
|
||||
|
||||
message_id = part.get("messageID")
|
||||
if not isinstance(message_id, str):
|
||||
continue
|
||||
|
||||
role = message_roles.get(message_id)
|
||||
if role != "assistant":
|
||||
continue
|
||||
|
||||
expected_assistant_id = assistant_message_ids_by_session.get(
|
||||
part_session_id
|
||||
)
|
||||
if (
|
||||
expected_assistant_id
|
||||
and message_id != expected_assistant_id
|
||||
):
|
||||
continue
|
||||
|
||||
if not expected_assistant_id:
|
||||
user_message_id = user_message_ids_by_session.get(
|
||||
part_session_id
|
||||
)
|
||||
parent_id = message_parent_ids.get(message_id)
|
||||
if (
|
||||
user_message_id
|
||||
and isinstance(parent_id, str)
|
||||
and parent_id != user_message_id
|
||||
):
|
||||
continue
|
||||
assistant_message_ids_by_session[part_session_id] = (
|
||||
message_id
|
||||
)
|
||||
|
||||
raw_part_event = _build_raw_event_from_part(
|
||||
part=part,
|
||||
delta=(
|
||||
properties.get("delta")
|
||||
if isinstance(properties.get("delta"), str)
|
||||
else None
|
||||
),
|
||||
)
|
||||
if raw_part_event is None:
|
||||
continue
|
||||
|
||||
parser_for_session = _get_parser_for_session(part_session_id)
|
||||
for parsed_event in parser_for_session.parse_raw_event(
|
||||
raw_part_event
|
||||
):
|
||||
last_event_time = time.monotonic()
|
||||
yield parsed_event
|
||||
if (
|
||||
isinstance(parsed_event, OpenCodePromptResponse)
|
||||
and part_session_id == primary_session_id
|
||||
):
|
||||
status = "completed"
|
||||
return
|
||||
|
||||
continue
|
||||
|
||||
if line.startswith("data:"):
|
||||
data_lines.append(line[5:].lstrip())
|
||||
|
||||
# Flush trailing buffered SSE data if stream closed mid-event.
|
||||
if data_lines:
|
||||
payload_str = "\n".join(data_lines)
|
||||
try:
|
||||
payload = json.loads(payload_str)
|
||||
except json.JSONDecodeError:
|
||||
payload = None
|
||||
if isinstance(payload, dict):
|
||||
properties = _as_dict(payload.get("properties")) or {}
|
||||
part = _as_dict(properties.get("part")) or {}
|
||||
part_session_id = part.get("sessionID")
|
||||
if not isinstance(part_session_id, str):
|
||||
part_session_id = primary_session_id
|
||||
raw_part_event = _build_raw_event_from_part(
|
||||
part=part,
|
||||
delta=(
|
||||
properties.get("delta")
|
||||
if isinstance(properties.get("delta"), str)
|
||||
else None
|
||||
),
|
||||
)
|
||||
if raw_part_event:
|
||||
parser_for_session = _get_parser_for_session(
|
||||
part_session_id
|
||||
)
|
||||
for parsed_event in parser_for_session.parse_raw_event(
|
||||
raw_part_event
|
||||
):
|
||||
yield parsed_event
|
||||
if (
|
||||
isinstance(parsed_event, OpenCodePromptResponse)
|
||||
and part_session_id == primary_session_id
|
||||
):
|
||||
status = "completed"
|
||||
return
|
||||
|
||||
if not self._parser.saw_prompt_response:
|
||||
# Session may have completed without explicit step-finish event.
|
||||
stop_reason = "completed"
|
||||
if saw_target_idle and not saw_target_busy:
|
||||
stop_reason = "idle"
|
||||
yield OpenCodePromptResponse(
|
||||
opencode_session_id=primary_session_id,
|
||||
stop_reason=stop_reason,
|
||||
)
|
||||
|
||||
status = "completed"
|
||||
except OpenCodeSessionNotFoundError:
|
||||
status = "session_not_found"
|
||||
raise
|
||||
except Exception as exc:
|
||||
status = "failed"
|
||||
error_preview = str(exc)
|
||||
yield OpenCodeError(
|
||||
opencode_session_id=self._parser.session_id,
|
||||
message=error_preview,
|
||||
)
|
||||
finally:
|
||||
packet_logger.log_raw(
|
||||
"OPENCODE-HTTP-END",
|
||||
{
|
||||
"server_url": self._server_url,
|
||||
"session_id": self._parser.session_id,
|
||||
"cwd": self._cwd,
|
||||
"status": status,
|
||||
"saw_prompt_response": self._parser.saw_prompt_response,
|
||||
"error_preview": (error_preview[:500] if error_preview else None),
|
||||
},
|
||||
)
|
||||
257
backend/onyx/server/features/build/sandbox/opencode/parser.py
Normal file
257
backend/onyx/server/features/build/sandbox/opencode/parser.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""Shared OpenCode raw-event parser used by local and Kubernetes clients."""
|
||||
|
||||
import json
|
||||
from collections.abc import Generator
|
||||
from typing import Any
|
||||
|
||||
from onyx.server.features.build.sandbox.opencode.events import (
|
||||
OpenCodeAgentMessageChunk,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.opencode.events import (
|
||||
OpenCodeAgentThoughtChunk,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeError
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeEvent
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodePromptResponse
|
||||
from onyx.server.features.build.sandbox.opencode.events import (
|
||||
OpenCodeSessionEstablished,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeTextContent
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeToolCallProgress
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeToolCallStart
|
||||
from onyx.server.features.build.sandbox.opencode.events import timestamp_ms_to_iso
|
||||
|
||||
|
||||
def _normalize_status(status: Any) -> str:
|
||||
if not isinstance(status, str):
|
||||
return "pending"
|
||||
normalized = status.strip().lower()
|
||||
alias_map = {
|
||||
"running": "in_progress",
|
||||
"in-progress": "in_progress",
|
||||
"done": "completed",
|
||||
"success": "completed",
|
||||
"error": "failed",
|
||||
}
|
||||
normalized = alias_map.get(normalized, normalized)
|
||||
if normalized in {"pending", "in_progress", "completed", "failed", "cancelled"}:
|
||||
return normalized
|
||||
return "pending"
|
||||
|
||||
|
||||
def _map_tool_kind(tool_name: str, raw_input: dict[str, Any] | None) -> str:
|
||||
"""Map OpenCode tool name into the UI's existing tool kind bucket."""
|
||||
lower_name = tool_name.lower()
|
||||
|
||||
if lower_name in {"glob", "grep", "websearch"}:
|
||||
return "search"
|
||||
if lower_name in {"read"}:
|
||||
return "read"
|
||||
if lower_name in {"bash"}:
|
||||
return "execute"
|
||||
if lower_name in {"task"}:
|
||||
return "task"
|
||||
if lower_name in {"apply_patch", "edit", "write"}:
|
||||
return "edit"
|
||||
|
||||
if raw_input:
|
||||
if isinstance(raw_input.get("command"), str):
|
||||
return "execute"
|
||||
if isinstance(raw_input.get("patchText"), str):
|
||||
return "edit"
|
||||
if raw_input.get("subagent_type") or raw_input.get("subagentType"):
|
||||
return "task"
|
||||
|
||||
return "other"
|
||||
|
||||
|
||||
def _as_dict(value: Any) -> dict[str, Any] | None:
|
||||
return value if isinstance(value, dict) else None
|
||||
|
||||
|
||||
def _build_tool_content(
|
||||
tool_name: str,
|
||||
raw_input: dict[str, Any] | None,
|
||||
raw_output: dict[str, Any] | None,
|
||||
) -> list[dict[str, Any]] | None:
|
||||
"""Build tool content blocks used by frontend diff/read parsing logic."""
|
||||
if tool_name.lower() == "apply_patch":
|
||||
metadata = _as_dict((raw_output or {}).get("metadata"))
|
||||
files = metadata.get("files") if metadata else None
|
||||
if isinstance(files, list):
|
||||
blocks: list[dict[str, Any]] = []
|
||||
for file_entry in files:
|
||||
if not isinstance(file_entry, dict):
|
||||
continue
|
||||
path = file_entry.get("relativePath") or file_entry.get("filePath")
|
||||
blocks.append(
|
||||
{
|
||||
"type": "diff",
|
||||
"path": path,
|
||||
"oldText": file_entry.get("before") or "",
|
||||
"newText": file_entry.get("after") or "",
|
||||
}
|
||||
)
|
||||
if blocks:
|
||||
return blocks
|
||||
|
||||
if raw_input and isinstance(raw_input.get("path"), str):
|
||||
return [{"type": "diff", "path": raw_input["path"]}]
|
||||
|
||||
if tool_name.lower() == "read" and raw_output:
|
||||
output_text = raw_output.get("output")
|
||||
if isinstance(output_text, str):
|
||||
return [
|
||||
{
|
||||
"type": "content",
|
||||
"content": {"type": "text", "text": output_text},
|
||||
}
|
||||
]
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def looks_like_session_not_found(error_text: str) -> bool:
|
||||
lower = error_text.lower()
|
||||
candidates = (
|
||||
"session not found",
|
||||
"unknown session",
|
||||
"invalid session",
|
||||
"session does not exist",
|
||||
)
|
||||
return any(token in lower for token in candidates)
|
||||
|
||||
|
||||
class OpenCodeEventParser:
|
||||
"""Stateful parser that converts raw OpenCode JSON lines into UI packets."""
|
||||
|
||||
def __init__(self, session_id: str | None = None) -> None:
|
||||
self.session_id = session_id
|
||||
self._emitted_session_id: str | None = None
|
||||
self.saw_prompt_response = False
|
||||
self._seen_tool_call_ids: set[str] = set()
|
||||
|
||||
def parse_raw_event(
|
||||
self, raw_event: dict[str, Any]
|
||||
) -> Generator[OpenCodeEvent, None, None]:
|
||||
timestamp = timestamp_ms_to_iso(raw_event.get("timestamp"))
|
||||
|
||||
session_id = raw_event.get("sessionID") or raw_event.get("sessionId")
|
||||
if isinstance(session_id, str):
|
||||
if self.session_id != session_id:
|
||||
self.session_id = session_id
|
||||
if self._emitted_session_id != session_id:
|
||||
self._emitted_session_id = session_id
|
||||
yield OpenCodeSessionEstablished(
|
||||
opencode_session_id=session_id,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
|
||||
raw_type = raw_event.get("type")
|
||||
part = _as_dict(raw_event.get("part")) or {}
|
||||
|
||||
if raw_type == "text":
|
||||
text = part.get("text")
|
||||
if isinstance(text, str) and text:
|
||||
yield OpenCodeAgentMessageChunk(
|
||||
opencode_session_id=self.session_id,
|
||||
content=OpenCodeTextContent(text=text),
|
||||
timestamp=timestamp,
|
||||
)
|
||||
return
|
||||
|
||||
if raw_type == "reasoning":
|
||||
text = part.get("text")
|
||||
if isinstance(text, str) and text:
|
||||
yield OpenCodeAgentThoughtChunk(
|
||||
opencode_session_id=self.session_id,
|
||||
content=OpenCodeTextContent(text=text),
|
||||
timestamp=timestamp,
|
||||
)
|
||||
return
|
||||
|
||||
if raw_type == "tool_use":
|
||||
call_id = part.get("callID")
|
||||
tool_name = part.get("tool")
|
||||
state = _as_dict(part.get("state")) or {}
|
||||
raw_input = _as_dict(state.get("input"))
|
||||
metadata = _as_dict(state.get("metadata")) or {}
|
||||
output = state.get("output")
|
||||
title = state.get("title")
|
||||
status = _normalize_status(state.get("status"))
|
||||
|
||||
if not isinstance(call_id, str) or not isinstance(tool_name, str):
|
||||
return
|
||||
|
||||
kind = _map_tool_kind(tool_name, raw_input)
|
||||
raw_output: dict[str, Any] = {
|
||||
"output": (
|
||||
output
|
||||
if isinstance(output, str)
|
||||
else json.dumps(output, default=str)
|
||||
),
|
||||
"metadata": metadata,
|
||||
}
|
||||
|
||||
if tool_name.lower() == "read" and isinstance(output, str):
|
||||
raw_output["content"] = output
|
||||
|
||||
if tool_name.lower() == "task":
|
||||
task_session_id = metadata.get("sessionId") or metadata.get("sessionID")
|
||||
if not isinstance(task_session_id, str):
|
||||
task_session_id = metadata.get("session_id")
|
||||
if isinstance(task_session_id, str):
|
||||
raw_output["sessionId"] = task_session_id
|
||||
|
||||
content = _build_tool_content(tool_name, raw_input, raw_output)
|
||||
|
||||
if call_id not in self._seen_tool_call_ids:
|
||||
self._seen_tool_call_ids.add(call_id)
|
||||
yield OpenCodeToolCallStart(
|
||||
opencode_session_id=self.session_id,
|
||||
tool_call_id=call_id,
|
||||
tool_name=tool_name,
|
||||
kind=kind,
|
||||
title=title if isinstance(title, str) else None,
|
||||
content=content,
|
||||
raw_input=raw_input,
|
||||
raw_output=None,
|
||||
status="pending",
|
||||
timestamp=timestamp,
|
||||
)
|
||||
|
||||
yield OpenCodeToolCallProgress(
|
||||
opencode_session_id=self.session_id,
|
||||
tool_call_id=call_id,
|
||||
tool_name=tool_name,
|
||||
kind=kind,
|
||||
title=title if isinstance(title, str) else None,
|
||||
content=content,
|
||||
raw_input=raw_input,
|
||||
raw_output=raw_output,
|
||||
status=status,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
return
|
||||
|
||||
if raw_type == "step_finish":
|
||||
reason = part.get("reason")
|
||||
if isinstance(reason, str) and reason.lower() != "tool-calls":
|
||||
self.saw_prompt_response = True
|
||||
yield OpenCodePromptResponse(
|
||||
opencode_session_id=self.session_id,
|
||||
stop_reason=reason,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
return
|
||||
|
||||
if raw_type == "error":
|
||||
message = part.get("message")
|
||||
if isinstance(message, str):
|
||||
yield OpenCodeError(
|
||||
opencode_session_id=self.session_id,
|
||||
message=message,
|
||||
code=part.get("code"),
|
||||
data=_as_dict(part.get("data")),
|
||||
timestamp=timestamp,
|
||||
)
|
||||
@@ -0,0 +1,214 @@
|
||||
"""OpenCode run client for one-shot message execution against a running server."""
|
||||
|
||||
import json
|
||||
import select
|
||||
import subprocess
|
||||
import time
|
||||
from collections.abc import Generator
|
||||
|
||||
from onyx.server.features.build.api.packet_logger import get_packet_logger
|
||||
from onyx.server.features.build.configs import OPENCODE_MESSAGE_TIMEOUT
|
||||
from onyx.server.features.build.configs import SSE_KEEPALIVE_INTERVAL
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeError
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeEvent
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodePromptResponse
|
||||
from onyx.server.features.build.sandbox.opencode.events import OpenCodeSSEKeepalive
|
||||
from onyx.server.features.build.sandbox.opencode.parser import (
|
||||
looks_like_session_not_found,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.opencode.parser import OpenCodeEventParser
|
||||
|
||||
|
||||
class OpenCodeSessionNotFoundError(RuntimeError):
|
||||
"""Raised when a requested OpenCode session ID is not available on the server."""
|
||||
|
||||
|
||||
class OpenCodeRunClient:
|
||||
"""Executes `opencode run --attach` and streams normalized packet events."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
server_url: str,
|
||||
session_id: str | None = None,
|
||||
cwd: str | None = None,
|
||||
timeout: float = OPENCODE_MESSAGE_TIMEOUT,
|
||||
keepalive_interval: float = SSE_KEEPALIVE_INTERVAL,
|
||||
) -> None:
|
||||
self._server_url = server_url
|
||||
self._cwd = cwd
|
||||
self._timeout = timeout
|
||||
self._keepalive_interval = keepalive_interval
|
||||
self._parser = OpenCodeEventParser(session_id=session_id)
|
||||
|
||||
@property
|
||||
def session_id(self) -> str | None:
|
||||
return self._parser.session_id
|
||||
|
||||
def _build_command(self, message: str) -> list[str]:
|
||||
command = [
|
||||
"opencode",
|
||||
"run",
|
||||
"--attach",
|
||||
self._server_url,
|
||||
"--format",
|
||||
"json",
|
||||
]
|
||||
if self._parser.session_id:
|
||||
command.extend(["--session", self._parser.session_id])
|
||||
command.append(message)
|
||||
return command
|
||||
|
||||
@staticmethod
|
||||
def _terminate_process(process: subprocess.Popen[str]) -> None:
|
||||
if process.poll() is not None:
|
||||
return
|
||||
process.terminate()
|
||||
try:
|
||||
process.wait(timeout=3)
|
||||
except subprocess.TimeoutExpired:
|
||||
process.kill()
|
||||
|
||||
def send_message(self, message: str) -> Generator[OpenCodeEvent, None, None]:
|
||||
"""Send a message and stream normalized events."""
|
||||
command = self._build_command(message)
|
||||
packet_logger = get_packet_logger()
|
||||
packet_logger.log_raw(
|
||||
"OPENCODE-RUN-START",
|
||||
{
|
||||
"server_url": self._server_url,
|
||||
"session_id": self._parser.session_id,
|
||||
"cwd": self._cwd,
|
||||
"command": command,
|
||||
},
|
||||
)
|
||||
|
||||
process = subprocess.Popen(
|
||||
command,
|
||||
cwd=self._cwd,
|
||||
stdin=subprocess.DEVNULL,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
)
|
||||
|
||||
if process.stdout is None or process.stderr is None:
|
||||
self._terminate_process(process)
|
||||
yield OpenCodeError(
|
||||
opencode_session_id=self._parser.session_id,
|
||||
message="Failed to open opencode subprocess pipes",
|
||||
)
|
||||
return
|
||||
|
||||
start_time = time.monotonic()
|
||||
last_event_time = start_time
|
||||
stderr_lines: list[str] = []
|
||||
|
||||
stdout_fd = process.stdout.fileno()
|
||||
stderr_fd = process.stderr.fileno()
|
||||
active_fds = {stdout_fd, stderr_fd}
|
||||
fd_to_stream = {
|
||||
stdout_fd: process.stdout,
|
||||
stderr_fd: process.stderr,
|
||||
}
|
||||
|
||||
try:
|
||||
while active_fds:
|
||||
elapsed = time.monotonic() - start_time
|
||||
if elapsed > self._timeout:
|
||||
self._terminate_process(process)
|
||||
yield OpenCodeError(
|
||||
opencode_session_id=self._parser.session_id,
|
||||
code=-1,
|
||||
message=(
|
||||
f"Timeout waiting for OpenCode response after {self._timeout:.1f}s"
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
ready, _, _ = select.select(list(active_fds), [], [], 1.0)
|
||||
if not ready:
|
||||
if (time.monotonic() - last_event_time) >= self._keepalive_interval:
|
||||
yield OpenCodeSSEKeepalive()
|
||||
last_event_time = time.monotonic()
|
||||
|
||||
# Process may have exited while stdout/stderr fds are still open.
|
||||
if process.poll() is not None:
|
||||
for fd in list(active_fds):
|
||||
stream = fd_to_stream[fd]
|
||||
remaining = stream.readline()
|
||||
if not remaining:
|
||||
active_fds.discard(fd)
|
||||
elif fd == stderr_fd:
|
||||
stderr_lines.append(remaining.strip())
|
||||
if process.poll() is not None and not active_fds:
|
||||
break
|
||||
continue
|
||||
|
||||
for fd in ready:
|
||||
stream = fd_to_stream[fd]
|
||||
line = stream.readline()
|
||||
if line == "":
|
||||
active_fds.discard(fd)
|
||||
continue
|
||||
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if fd == stderr_fd:
|
||||
stderr_lines.append(line)
|
||||
continue
|
||||
|
||||
try:
|
||||
raw_event = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
packet_logger.log_raw(
|
||||
"OPENCODE-RUN-PARSE-ERROR",
|
||||
{"line": line[:500]},
|
||||
)
|
||||
continue
|
||||
|
||||
for event in self._parser.parse_raw_event(raw_event):
|
||||
last_event_time = time.monotonic()
|
||||
yield event
|
||||
finally:
|
||||
if process.poll() is None:
|
||||
self._terminate_process(process)
|
||||
|
||||
return_code = process.returncode
|
||||
if return_code is None:
|
||||
try:
|
||||
return_code = process.wait(timeout=3)
|
||||
except subprocess.TimeoutExpired:
|
||||
self._terminate_process(process)
|
||||
return_code = process.returncode
|
||||
|
||||
stderr_text = "\n".join(stderr_lines).strip()
|
||||
|
||||
packet_logger.log_raw(
|
||||
"OPENCODE-RUN-END",
|
||||
{
|
||||
"server_url": self._server_url,
|
||||
"session_id": self._parser.session_id,
|
||||
"cwd": self._cwd,
|
||||
"return_code": return_code,
|
||||
"stderr_preview": stderr_text[:500] if stderr_text else None,
|
||||
},
|
||||
)
|
||||
|
||||
if return_code not in (0, None):
|
||||
if stderr_text and looks_like_session_not_found(stderr_text):
|
||||
raise OpenCodeSessionNotFoundError(stderr_text)
|
||||
yield OpenCodeError(
|
||||
opencode_session_id=self._parser.session_id,
|
||||
code=return_code,
|
||||
message=stderr_text or f"OpenCode run exited with code {return_code}",
|
||||
)
|
||||
return
|
||||
|
||||
if not self._parser.saw_prompt_response:
|
||||
yield OpenCodePromptResponse(
|
||||
opencode_session_id=self._parser.session_id,
|
||||
stop_reason="completed",
|
||||
)
|
||||
@@ -15,14 +15,6 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
from acp.schema import AgentMessageChunk
|
||||
from acp.schema import AgentPlanUpdate
|
||||
from acp.schema import AgentThoughtChunk
|
||||
from acp.schema import CurrentModeUpdate
|
||||
from acp.schema import Error as ACPError
|
||||
from acp.schema import PromptResponse
|
||||
from acp.schema import ToolCallProgress
|
||||
from acp.schema import ToolCallStart
|
||||
from sqlalchemy.orm import Session as DBSession
|
||||
|
||||
from onyx.configs.app_configs import WEB_DOMAIN
|
||||
@@ -62,8 +54,8 @@ from onyx.server.features.build.db.build_session import get_build_session
|
||||
from onyx.server.features.build.db.build_session import get_empty_session_for_user
|
||||
from onyx.server.features.build.db.build_session import get_session_messages
|
||||
from onyx.server.features.build.db.build_session import get_user_build_sessions
|
||||
from onyx.server.features.build.db.build_session import update_opencode_session_id
|
||||
from onyx.server.features.build.db.build_session import update_session_activity
|
||||
from onyx.server.features.build.db.build_session import upsert_agent_plan
|
||||
from onyx.server.features.build.db.sandbox import create_sandbox__no_commit
|
||||
from onyx.server.features.build.db.sandbox import get_running_sandbox_count_by_tenant
|
||||
from onyx.server.features.build.db.sandbox import get_sandbox_by_session_id
|
||||
@@ -71,10 +63,15 @@ from onyx.server.features.build.db.sandbox import get_sandbox_by_user_id
|
||||
from onyx.server.features.build.db.sandbox import update_sandbox_heartbeat
|
||||
from onyx.server.features.build.db.sandbox import update_sandbox_status__no_commit
|
||||
from onyx.server.features.build.sandbox import get_sandbox_manager
|
||||
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
|
||||
SSEKeepalive,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.models import LLMProviderConfig
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeAgentMessageChunk
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeAgentThoughtChunk
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeError
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodePromptResponse
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeSessionEstablished
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeSSEKeepalive
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeToolCallProgress
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeToolCallStart
|
||||
from onyx.server.features.build.sandbox.tasks.tasks import (
|
||||
_get_disabled_user_library_paths,
|
||||
)
|
||||
@@ -99,9 +96,9 @@ class UploadLimitExceededError(ValueError):
|
||||
|
||||
|
||||
class BuildStreamingState:
|
||||
"""Container for accumulating state during ACP streaming.
|
||||
"""Container for accumulating state during OpenCode streaming.
|
||||
|
||||
Similar to ChatStateContainer but adapted for ACP packet types.
|
||||
Similar to ChatStateContainer but adapted for build-mode packet types.
|
||||
Accumulates chunks and tracks pending tool calls until completion.
|
||||
|
||||
Usage:
|
||||
@@ -130,9 +127,6 @@ class BuildStreamingState:
|
||||
self.message_chunks: list[str] = []
|
||||
self.thought_chunks: list[str] = []
|
||||
|
||||
# For upserting agent_plan_update - track ID so we can update in place
|
||||
self.plan_message_id: UUID | None = None
|
||||
|
||||
# Track what type of chunk we were last receiving
|
||||
self._last_chunk_type: str | None = None
|
||||
|
||||
@@ -1103,18 +1097,17 @@ class SessionManager:
|
||||
- agent_thought_chunk: Accumulated, saved as one synthetic packet at end/type change
|
||||
- tool_call_start: Streamed to frontend only, not saved
|
||||
- tool_call_progress: Only saved when status="completed"
|
||||
- agent_plan_update: Upserted (only latest plan kept per turn)
|
||||
"""
|
||||
|
||||
def _serialize_acp_event(event: Any, event_type: str) -> str:
|
||||
"""Serialize an ACP event to SSE format, preserving ALL ACP data."""
|
||||
def _serialize_event(event: Any, event_type: str) -> str:
|
||||
"""Serialize an event to SSE format, preserving all event data."""
|
||||
if hasattr(event, "model_dump"):
|
||||
data = event.model_dump(mode="json", by_alias=True, exclude_none=False)
|
||||
else:
|
||||
data = {"raw": str(event)}
|
||||
|
||||
data["type"] = event_type
|
||||
data["timestamp"] = datetime.now(tz=timezone.utc).isoformat()
|
||||
data.setdefault("timestamp", datetime.now(tz=timezone.utc).isoformat())
|
||||
|
||||
return f"event: message\ndata: {json.dumps(data)}\n\n"
|
||||
|
||||
@@ -1123,7 +1116,7 @@ class SessionManager:
|
||||
return f"event: message\ndata: {packet.model_dump_json(by_alias=True)}\n\n"
|
||||
|
||||
def _extract_text_from_content(content: Any) -> str:
|
||||
"""Extract text from ACP content structure."""
|
||||
"""Extract text from event content structures."""
|
||||
if content is None:
|
||||
return ""
|
||||
if hasattr(content, "type") and content.type == "text":
|
||||
@@ -1263,81 +1256,321 @@ class SessionManager:
|
||||
},
|
||||
)
|
||||
|
||||
# Stream ACP events directly to frontend
|
||||
for acp_event in self._sandbox_manager.send_message(
|
||||
sandbox_id, session_id, user_message_content
|
||||
if session.nextjs_port is None:
|
||||
error_packet = ErrorPacket(
|
||||
message="Session is missing a Next.js port allocation."
|
||||
)
|
||||
packet_logger.log("error", error_packet.model_dump())
|
||||
yield _format_packet_event(error_packet)
|
||||
return
|
||||
|
||||
primary_opencode_session_id: str | None = session.opencode_session_id
|
||||
saw_primary_opencode_session = False
|
||||
task_call_to_subagent_session: dict[str, str] = {}
|
||||
subagent_session_to_task_call: dict[str, str] = {}
|
||||
buffered_subagent_packets_by_session: dict[str, list[dict[str, Any]]] = {}
|
||||
|
||||
def _extract_task_subagent_session_id(
|
||||
event_data: dict[str, Any],
|
||||
) -> str | None:
|
||||
raw_output_obj = (
|
||||
event_data.get("raw_output") or event_data.get("rawOutput") or {}
|
||||
)
|
||||
raw_output = raw_output_obj if isinstance(raw_output_obj, dict) else {}
|
||||
metadata_obj = raw_output.get("metadata")
|
||||
metadata = metadata_obj if isinstance(metadata_obj, dict) else {}
|
||||
|
||||
direct = (
|
||||
raw_output.get("sessionId")
|
||||
or raw_output.get("sessionID")
|
||||
or raw_output.get("session_id")
|
||||
)
|
||||
if isinstance(direct, str) and direct:
|
||||
return direct
|
||||
|
||||
nested = (
|
||||
metadata.get("sessionId")
|
||||
or metadata.get("sessionID")
|
||||
or metadata.get("session_id")
|
||||
)
|
||||
if isinstance(nested, str) and nested:
|
||||
return nested
|
||||
|
||||
return None
|
||||
|
||||
def _serialize_subagent_packet(
|
||||
parent_tool_call_id: str,
|
||||
subagent_session_id: str,
|
||||
packet: dict[str, Any],
|
||||
) -> str:
|
||||
payload = {
|
||||
"type": "subagent_packet",
|
||||
"parent_tool_call_id": parent_tool_call_id,
|
||||
"subagent_session_id": subagent_session_id,
|
||||
"packet": packet,
|
||||
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
|
||||
}
|
||||
return f"event: message\ndata: {json.dumps(payload)}\n\n"
|
||||
|
||||
def _emit_subagent_packets(
|
||||
parent_tool_call_id: str,
|
||||
subagent_session_id: str,
|
||||
packets: list[dict[str, Any]],
|
||||
) -> Generator[str, None, None]:
|
||||
nonlocal events_emitted
|
||||
|
||||
for packet in packets:
|
||||
subagent_packet = {
|
||||
"type": "subagent_packet",
|
||||
"parent_tool_call_id": parent_tool_call_id,
|
||||
"subagent_session_id": subagent_session_id,
|
||||
"packet": packet,
|
||||
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
|
||||
}
|
||||
|
||||
create_message(
|
||||
session_id=session_id,
|
||||
message_type=MessageType.ASSISTANT,
|
||||
turn_index=state.turn_index,
|
||||
message_metadata=subagent_packet,
|
||||
db_session=self._db_session,
|
||||
)
|
||||
|
||||
packet_logger.log("subagent_packet", subagent_packet)
|
||||
packet_logger.log_sse_emit("subagent_packet", session_id)
|
||||
events_emitted += 1
|
||||
yield _serialize_subagent_packet(
|
||||
parent_tool_call_id=parent_tool_call_id,
|
||||
subagent_session_id=subagent_session_id,
|
||||
packet=packet,
|
||||
)
|
||||
|
||||
def _event_to_packet_dict(event: Any) -> dict[str, Any] | None:
|
||||
if isinstance(
|
||||
event, (OpenCodeSSEKeepalive, OpenCodeSessionEstablished)
|
||||
):
|
||||
return None
|
||||
|
||||
event_type = self._get_event_type(event)
|
||||
if not event_type or event_type == "unknown":
|
||||
return None
|
||||
|
||||
if hasattr(event, "model_dump"):
|
||||
data = event.model_dump(
|
||||
mode="json", by_alias=True, exclude_none=False
|
||||
)
|
||||
else:
|
||||
data = {"raw": str(event)}
|
||||
|
||||
data["type"] = event_type
|
||||
data.setdefault("timestamp", datetime.now(tz=timezone.utc).isoformat())
|
||||
return data
|
||||
|
||||
def _flush_buffered_subagent_packets(
|
||||
subagent_session_id: str,
|
||||
) -> Generator[str, None, None]:
|
||||
parent_tool_call_id = subagent_session_to_task_call.get(
|
||||
subagent_session_id
|
||||
)
|
||||
if not parent_tool_call_id:
|
||||
return
|
||||
|
||||
buffered_packets = buffered_subagent_packets_by_session.pop(
|
||||
subagent_session_id, []
|
||||
)
|
||||
if buffered_packets:
|
||||
yield from _emit_subagent_packets(
|
||||
parent_tool_call_id=parent_tool_call_id,
|
||||
subagent_session_id=subagent_session_id,
|
||||
packets=buffered_packets,
|
||||
)
|
||||
|
||||
def _persist_primary_opencode_session(
|
||||
opencode_session_id: str,
|
||||
) -> None:
|
||||
nonlocal primary_opencode_session_id
|
||||
|
||||
if primary_opencode_session_id == opencode_session_id:
|
||||
return
|
||||
|
||||
primary_opencode_session_id = opencode_session_id
|
||||
if session.opencode_session_id != opencode_session_id:
|
||||
update_opencode_session_id(
|
||||
session_id=session_id,
|
||||
opencode_session_id=opencode_session_id,
|
||||
db_session=self._db_session,
|
||||
)
|
||||
session.opencode_session_id = opencode_session_id
|
||||
|
||||
# Stream OpenCode events directly to frontend.
|
||||
for opencode_event in self._sandbox_manager.send_message(
|
||||
sandbox_id=sandbox_id,
|
||||
session_id=session_id,
|
||||
nextjs_port=session.nextjs_port,
|
||||
message=user_message_content,
|
||||
opencode_session_id=session.opencode_session_id,
|
||||
):
|
||||
# Handle SSE keepalive - send comment to keep connection alive
|
||||
if isinstance(acp_event, SSEKeepalive):
|
||||
# SSE comments start with : and are ignored by EventSource
|
||||
# but keep the HTTP connection alive
|
||||
# Handle SSE keepalive - send comment to keep connection alive.
|
||||
if isinstance(opencode_event, OpenCodeSSEKeepalive):
|
||||
packet_logger.log_sse_emit("keepalive", session_id)
|
||||
yield ": keepalive\n\n"
|
||||
continue
|
||||
|
||||
# Check if we need to finalize pending chunks before processing
|
||||
event_type = self._get_event_type(acp_event)
|
||||
# Internal event used for OpenCode session-id persistence.
|
||||
if isinstance(opencode_event, OpenCodeSessionEstablished):
|
||||
if not saw_primary_opencode_session:
|
||||
saw_primary_opencode_session = True
|
||||
_persist_primary_opencode_session(
|
||||
opencode_event.opencode_session_id
|
||||
)
|
||||
packet_logger.log(
|
||||
"opencode_session_established",
|
||||
opencode_event.model_dump(
|
||||
mode="json", by_alias=True, exclude_none=False
|
||||
),
|
||||
)
|
||||
continue
|
||||
|
||||
event_session_id = getattr(opencode_event, "opencode_session_id", None)
|
||||
if (
|
||||
not saw_primary_opencode_session
|
||||
and isinstance(event_session_id, str)
|
||||
and event_session_id
|
||||
):
|
||||
saw_primary_opencode_session = True
|
||||
_persist_primary_opencode_session(event_session_id)
|
||||
|
||||
is_primary_event = (
|
||||
not isinstance(event_session_id, str)
|
||||
or not event_session_id
|
||||
or event_session_id == primary_opencode_session_id
|
||||
)
|
||||
|
||||
if not is_primary_event:
|
||||
packet = _event_to_packet_dict(opencode_event)
|
||||
if packet is None:
|
||||
continue
|
||||
|
||||
parent_tool_call_id = subagent_session_to_task_call.get(
|
||||
event_session_id
|
||||
)
|
||||
if parent_tool_call_id:
|
||||
yield from _emit_subagent_packets(
|
||||
parent_tool_call_id=parent_tool_call_id,
|
||||
subagent_session_id=event_session_id,
|
||||
packets=[packet],
|
||||
)
|
||||
else:
|
||||
buffered_subagent_packets_by_session.setdefault(
|
||||
event_session_id, []
|
||||
).append(packet)
|
||||
continue
|
||||
|
||||
# Check if we need to finalize pending chunks before processing.
|
||||
event_type = self._get_event_type(opencode_event)
|
||||
if state.should_finalize_chunks(event_type):
|
||||
_save_pending_chunks(state)
|
||||
|
||||
events_emitted += 1
|
||||
|
||||
# Pass through ACP events with snake_case type names
|
||||
if isinstance(acp_event, AgentMessageChunk):
|
||||
text = _extract_text_from_content(acp_event.content)
|
||||
if isinstance(opencode_event, OpenCodeAgentMessageChunk):
|
||||
text = _extract_text_from_content(opencode_event.content)
|
||||
if text:
|
||||
state.add_message_chunk(text)
|
||||
event_data = acp_event.model_dump(
|
||||
event_data = opencode_event.model_dump(
|
||||
mode="json", by_alias=True, exclude_none=False
|
||||
)
|
||||
event_data["type"] = "agent_message_chunk"
|
||||
packet_logger.log("agent_message_chunk", event_data)
|
||||
packet_logger.log_sse_emit("agent_message_chunk", session_id)
|
||||
yield _serialize_acp_event(acp_event, "agent_message_chunk")
|
||||
yield _serialize_event(opencode_event, "agent_message_chunk")
|
||||
|
||||
elif isinstance(acp_event, AgentThoughtChunk):
|
||||
text = _extract_text_from_content(acp_event.content)
|
||||
elif isinstance(opencode_event, OpenCodeAgentThoughtChunk):
|
||||
text = _extract_text_from_content(opencode_event.content)
|
||||
if text:
|
||||
state.add_thought_chunk(text)
|
||||
packet_logger.log(
|
||||
"agent_thought_chunk",
|
||||
acp_event.model_dump(mode="json", by_alias=True),
|
||||
opencode_event.model_dump(mode="json", by_alias=True),
|
||||
)
|
||||
packet_logger.log_sse_emit("agent_thought_chunk", session_id)
|
||||
yield _serialize_acp_event(acp_event, "agent_thought_chunk")
|
||||
yield _serialize_event(opencode_event, "agent_thought_chunk")
|
||||
|
||||
elif isinstance(acp_event, ToolCallStart):
|
||||
# Stream to frontend but don't save - wait for completion
|
||||
elif isinstance(opencode_event, OpenCodeToolCallStart):
|
||||
# Stream to frontend but don't save - wait for completion.
|
||||
packet_logger.log(
|
||||
"tool_call_start",
|
||||
acp_event.model_dump(mode="json", by_alias=True),
|
||||
opencode_event.model_dump(mode="json", by_alias=True),
|
||||
)
|
||||
packet_logger.log_sse_emit("tool_call_start", session_id)
|
||||
yield _serialize_acp_event(acp_event, "tool_call_start")
|
||||
yield _serialize_event(opencode_event, "tool_call_start")
|
||||
|
||||
elif isinstance(acp_event, ToolCallProgress):
|
||||
event_data = acp_event.model_dump(
|
||||
elif isinstance(opencode_event, OpenCodeToolCallProgress):
|
||||
event_data = opencode_event.model_dump(
|
||||
mode="json", by_alias=True, exclude_none=False
|
||||
)
|
||||
event_data["type"] = "tool_call_progress"
|
||||
event_data["timestamp"] = datetime.now(tz=timezone.utc).isoformat()
|
||||
event_data.setdefault(
|
||||
"timestamp", datetime.now(tz=timezone.utc).isoformat()
|
||||
)
|
||||
|
||||
# Check if this is a TodoWrite tool call
|
||||
tool_name = (event_data.get("title") or "").lower()
|
||||
is_todo_write = tool_name in ("todowrite", "todo_write")
|
||||
# Check if this is a TodoWrite tool call.
|
||||
tool_name = (
|
||||
event_data.get("tool_name")
|
||||
or event_data.get("toolName")
|
||||
or event_data.get("title")
|
||||
or ""
|
||||
)
|
||||
tool_name_lower = str(tool_name).lower()
|
||||
is_todo_write = tool_name_lower in ("todowrite", "todo_write")
|
||||
|
||||
# Check if this is a Task (subagent) tool call
|
||||
raw_input = event_data.get("rawInput") or {}
|
||||
# Check if this is a Task (subagent) tool call.
|
||||
raw_input_obj = (
|
||||
event_data.get("raw_input") or event_data.get("rawInput") or {}
|
||||
)
|
||||
raw_input = raw_input_obj if isinstance(raw_input_obj, dict) else {}
|
||||
is_task_tool = (
|
||||
tool_name == "task"
|
||||
tool_name_lower == "task"
|
||||
or raw_input.get("subagent_type") is not None
|
||||
or raw_input.get("subagentType") is not None
|
||||
)
|
||||
|
||||
tool_call_id = (
|
||||
event_data.get("tool_call_id")
|
||||
or event_data.get("toolCallId")
|
||||
or opencode_event.tool_call_id
|
||||
)
|
||||
tool_call_id_str = (
|
||||
tool_call_id if isinstance(tool_call_id, str) else None
|
||||
)
|
||||
if is_task_tool and tool_call_id_str:
|
||||
subagent_session_id = _extract_task_subagent_session_id(
|
||||
event_data
|
||||
)
|
||||
if subagent_session_id:
|
||||
existing_mapped = task_call_to_subagent_session.get(
|
||||
tool_call_id_str
|
||||
)
|
||||
if (
|
||||
existing_mapped is not None
|
||||
and existing_mapped != subagent_session_id
|
||||
):
|
||||
subagent_session_to_task_call.pop(existing_mapped, None)
|
||||
|
||||
task_call_to_subagent_session[tool_call_id_str] = (
|
||||
subagent_session_id
|
||||
)
|
||||
subagent_session_to_task_call[subagent_session_id] = (
|
||||
tool_call_id_str
|
||||
)
|
||||
yield from _flush_buffered_subagent_packets(
|
||||
subagent_session_id
|
||||
)
|
||||
|
||||
# Save to DB:
|
||||
# - For TodoWrite: Save every progress update (todos change frequently)
|
||||
# - For other tools: Only save when status="completed"
|
||||
if is_todo_write or acp_event.status == "completed":
|
||||
if is_todo_write or opencode_event.status == "completed":
|
||||
create_message(
|
||||
session_id=session_id,
|
||||
message_type=MessageType.ASSISTANT,
|
||||
@@ -1346,100 +1579,56 @@ class SessionManager:
|
||||
db_session=self._db_session,
|
||||
)
|
||||
|
||||
# For completed Task tools, also save the output as an agent_message
|
||||
# This allows the task output to be rendered as assistant text on reload
|
||||
if is_task_tool and acp_event.status == "completed":
|
||||
raw_output = event_data.get("rawOutput") or {}
|
||||
task_output = raw_output.get("output")
|
||||
if task_output and isinstance(task_output, str):
|
||||
# Strip task_metadata from the output
|
||||
metadata_idx = task_output.find("<task_metadata>")
|
||||
if metadata_idx >= 0:
|
||||
task_output = task_output[:metadata_idx].strip()
|
||||
|
||||
if task_output:
|
||||
# Create agent_message packet for the task output
|
||||
task_output_packet = {
|
||||
"type": "agent_message",
|
||||
"content": {"type": "text", "text": task_output},
|
||||
"source": "task_output",
|
||||
"timestamp": datetime.now(
|
||||
tz=timezone.utc
|
||||
).isoformat(),
|
||||
}
|
||||
create_message(
|
||||
session_id=session_id,
|
||||
message_type=MessageType.ASSISTANT,
|
||||
turn_index=state.turn_index,
|
||||
message_metadata=task_output_packet,
|
||||
db_session=self._db_session,
|
||||
)
|
||||
|
||||
# Log full event to packet logger (can handle large payloads)
|
||||
packet_logger.log("tool_call_progress", event_data)
|
||||
packet_logger.log_sse_emit("tool_call_progress", session_id)
|
||||
yield _serialize_acp_event(acp_event, "tool_call_progress")
|
||||
yield _serialize_event(opencode_event, "tool_call_progress")
|
||||
|
||||
elif isinstance(acp_event, AgentPlanUpdate):
|
||||
event_data = acp_event.model_dump(
|
||||
mode="json", by_alias=True, exclude_none=False
|
||||
)
|
||||
event_data["type"] = "agent_plan_update"
|
||||
event_data["timestamp"] = datetime.now(tz=timezone.utc).isoformat()
|
||||
|
||||
# Upsert plan immediately
|
||||
plan_msg = upsert_agent_plan(
|
||||
session_id=session_id,
|
||||
turn_index=state.turn_index,
|
||||
plan_metadata=event_data,
|
||||
db_session=self._db_session,
|
||||
existing_plan_id=state.plan_message_id,
|
||||
)
|
||||
state.plan_message_id = plan_msg.id
|
||||
|
||||
packet_logger.log("agent_plan_update", event_data)
|
||||
packet_logger.log_sse_emit("agent_plan_update", session_id)
|
||||
yield _serialize_acp_event(acp_event, "agent_plan_update")
|
||||
|
||||
elif isinstance(acp_event, CurrentModeUpdate):
|
||||
event_data = acp_event.model_dump(
|
||||
mode="json", by_alias=True, exclude_none=False
|
||||
)
|
||||
event_data["type"] = "current_mode_update"
|
||||
packet_logger.log("current_mode_update", event_data)
|
||||
packet_logger.log_sse_emit("current_mode_update", session_id)
|
||||
yield _serialize_acp_event(acp_event, "current_mode_update")
|
||||
|
||||
elif isinstance(acp_event, PromptResponse):
|
||||
event_data = acp_event.model_dump(
|
||||
elif isinstance(opencode_event, OpenCodePromptResponse):
|
||||
event_data = opencode_event.model_dump(
|
||||
mode="json", by_alias=True, exclude_none=False
|
||||
)
|
||||
event_data["type"] = "prompt_response"
|
||||
packet_logger.log("prompt_response", event_data)
|
||||
packet_logger.log_sse_emit("prompt_response", session_id)
|
||||
yield _serialize_acp_event(acp_event, "prompt_response")
|
||||
yield _serialize_event(opencode_event, "prompt_response")
|
||||
|
||||
elif isinstance(acp_event, ACPError):
|
||||
event_data = acp_event.model_dump(
|
||||
elif isinstance(opencode_event, OpenCodeError):
|
||||
event_data = opencode_event.model_dump(
|
||||
mode="json", by_alias=True, exclude_none=False
|
||||
)
|
||||
event_data["type"] = "error"
|
||||
packet_logger.log("error", event_data)
|
||||
packet_logger.log_sse_emit("error", session_id)
|
||||
yield _serialize_acp_event(acp_event, "error")
|
||||
yield _serialize_event(opencode_event, "error")
|
||||
|
||||
else:
|
||||
# Unrecognized packet type - log it but don't stream to frontend
|
||||
event_type_name = type(acp_event).__name__
|
||||
event_data = acp_event.model_dump(
|
||||
mode="json", by_alias=True, exclude_none=False
|
||||
)
|
||||
event_type_name = type(opencode_event).__name__
|
||||
if hasattr(opencode_event, "model_dump"):
|
||||
event_data = opencode_event.model_dump(
|
||||
mode="json", by_alias=True, exclude_none=False
|
||||
)
|
||||
else:
|
||||
event_data = {"raw": str(opencode_event)}
|
||||
event_data["type"] = f"unrecognized_{event_type_name.lower()}"
|
||||
packet_logger.log(
|
||||
f"unrecognized_{event_type_name.lower()}", event_data
|
||||
)
|
||||
|
||||
# Save all accumulated state at end of streaming
|
||||
if buffered_subagent_packets_by_session:
|
||||
packet_logger.log_raw(
|
||||
"DROPPED-UNMAPPED-SUBAGENT-PACKETS",
|
||||
{
|
||||
"session_id": str(session_id),
|
||||
"unmapped_subagent_session_ids": list(
|
||||
buffered_subagent_packets_by_session.keys()
|
||||
),
|
||||
"packet_counts": {
|
||||
sid: len(packets)
|
||||
for sid, packets in buffered_subagent_packets_by_session.items()
|
||||
},
|
||||
},
|
||||
)
|
||||
_save_build_turn(state)
|
||||
|
||||
# Log streaming completion
|
||||
@@ -1498,24 +1687,22 @@ class SessionManager:
|
||||
logger.exception("Unexpected error in build message streaming")
|
||||
yield _format_packet_event(error_packet)
|
||||
|
||||
def _get_event_type(self, acp_event: Any) -> str:
|
||||
"""Get the event type string for an ACP event."""
|
||||
if isinstance(acp_event, AgentMessageChunk):
|
||||
def _get_event_type(self, event: Any) -> str:
|
||||
"""Get the event type string for an OpenCode event."""
|
||||
if isinstance(event, OpenCodeAgentMessageChunk):
|
||||
return "agent_message_chunk"
|
||||
elif isinstance(acp_event, AgentThoughtChunk):
|
||||
elif isinstance(event, OpenCodeAgentThoughtChunk):
|
||||
return "agent_thought_chunk"
|
||||
elif isinstance(acp_event, ToolCallStart):
|
||||
elif isinstance(event, OpenCodeToolCallStart):
|
||||
return "tool_call_start"
|
||||
elif isinstance(acp_event, ToolCallProgress):
|
||||
elif isinstance(event, OpenCodeToolCallProgress):
|
||||
return "tool_call_progress"
|
||||
elif isinstance(acp_event, AgentPlanUpdate):
|
||||
return "agent_plan_update"
|
||||
elif isinstance(acp_event, CurrentModeUpdate):
|
||||
return "current_mode_update"
|
||||
elif isinstance(acp_event, PromptResponse):
|
||||
elif isinstance(event, OpenCodePromptResponse):
|
||||
return "prompt_response"
|
||||
elif isinstance(acp_event, ACPError):
|
||||
elif isinstance(event, OpenCodeError):
|
||||
return "error"
|
||||
elif isinstance(event, OpenCodeSessionEstablished):
|
||||
return "opencode_session_established"
|
||||
return "unknown"
|
||||
|
||||
# =========================================================================
|
||||
|
||||
@@ -27,11 +27,14 @@ from onyx.server.features.build.configs import SANDBOX_BACKEND
|
||||
from onyx.server.features.build.configs import SANDBOX_NAMESPACE
|
||||
from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_START
|
||||
from onyx.server.features.build.configs import SandboxBackend
|
||||
from onyx.server.features.build.sandbox.base import ACPEvent
|
||||
from onyx.server.features.build.sandbox.kubernetes.kubernetes_sandbox_manager import (
|
||||
KubernetesSandboxManager,
|
||||
)
|
||||
from onyx.server.features.build.sandbox.models import LLMProviderConfig
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeAgentMessageChunk
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeError
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodeEvent
|
||||
from onyx.server.features.build.sandbox.opencode import OpenCodePromptResponse
|
||||
from onyx.utils.logger import setup_logger
|
||||
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
|
||||
|
||||
@@ -309,12 +312,9 @@ def test_kubernetes_sandbox_send_message() -> None:
|
||||
This test:
|
||||
1. Creates a sandbox pod
|
||||
2. Sends a simple message via send_message()
|
||||
3. Verifies we receive ACP events back (agent responses)
|
||||
3. Verifies we receive streamed OpenCode events back (agent responses)
|
||||
4. Cleans up by terminating the sandbox
|
||||
"""
|
||||
from acp.schema import AgentMessageChunk
|
||||
from acp.schema import Error
|
||||
from acp.schema import PromptResponse
|
||||
|
||||
_is_kubernetes_available()
|
||||
|
||||
@@ -365,8 +365,13 @@ def test_kubernetes_sandbox_send_message() -> None:
|
||||
)
|
||||
|
||||
# Send a simple message
|
||||
events: list[ACPEvent] = []
|
||||
for event in manager.send_message(sandbox_id, session_id, "What is 2 + 2?"):
|
||||
events: list[OpenCodeEvent] = []
|
||||
for event in manager.send_message(
|
||||
sandbox_id,
|
||||
session_id,
|
||||
SANDBOX_NEXTJS_PORT_START,
|
||||
"What is 2 + 2?",
|
||||
):
|
||||
events.append(event)
|
||||
|
||||
# Verify we received events
|
||||
@@ -376,16 +381,16 @@ def test_kubernetes_sandbox_send_message() -> None:
|
||||
print(f"Recieved event: {event}")
|
||||
|
||||
# Check for errors
|
||||
errors = [e for e in events if isinstance(e, Error)]
|
||||
errors = [e for e in events if isinstance(e, OpenCodeError)]
|
||||
assert len(errors) == 0, f"Should not receive errors: {errors}"
|
||||
|
||||
# Verify we received some agent message content or a final response
|
||||
message_chunks = [e for e in events if isinstance(e, AgentMessageChunk)]
|
||||
prompt_responses = [e for e in events if isinstance(e, PromptResponse)]
|
||||
message_chunks = [e for e in events if isinstance(e, OpenCodeAgentMessageChunk)]
|
||||
prompt_responses = [e for e in events if isinstance(e, OpenCodePromptResponse)]
|
||||
|
||||
assert (
|
||||
len(message_chunks) > 0 or len(prompt_responses) > 0
|
||||
), "Should receive either AgentMessageChunk or PromptResponse events"
|
||||
), "Should receive either agent message chunks or prompt response events"
|
||||
|
||||
# If we got a PromptResponse, verify it completed successfully
|
||||
if prompt_responses:
|
||||
|
||||
1
hello_test_opencode.txt
Normal file
1
hello_test_opencode.txt
Normal file
@@ -0,0 +1 @@
|
||||
hello
|
||||
@@ -19,7 +19,17 @@ import {
|
||||
} from "@opal/icons";
|
||||
import RawOutputBlock from "@/app/craft/components/RawOutputBlock";
|
||||
import DiffView from "@/app/craft/components/DiffView";
|
||||
import { ToolCallState, ToolCallKind } from "@/app/craft/types/displayTypes";
|
||||
import TextChunk from "@/app/craft/components/TextChunk";
|
||||
import ThinkingCard from "@/app/craft/components/ThinkingCard";
|
||||
import TodoListCard from "@/app/craft/components/TodoListCard";
|
||||
import WorkingPill from "@/app/craft/components/WorkingPill";
|
||||
import {
|
||||
ToolCallState,
|
||||
ToolCallKind,
|
||||
StreamItem,
|
||||
GroupedStreamItem,
|
||||
} from "@/app/craft/types/displayTypes";
|
||||
import { isWorkingToolCall } from "@/app/craft/utils/streamItemHelpers";
|
||||
|
||||
interface ToolCallPillProps {
|
||||
toolCall: ToolCallState;
|
||||
@@ -113,6 +123,35 @@ function getLanguageHint(toolCall: ToolCallState): string | undefined {
|
||||
}
|
||||
}
|
||||
|
||||
function groupSubagentStreamItems(items: StreamItem[]): GroupedStreamItem[] {
|
||||
const grouped: GroupedStreamItem[] = [];
|
||||
let currentWorkingGroup: ToolCallState[] = [];
|
||||
|
||||
const flushWorkingGroup = () => {
|
||||
const firstToolCall = currentWorkingGroup[0];
|
||||
if (firstToolCall) {
|
||||
grouped.push({
|
||||
type: "working_group",
|
||||
id: `subagent-working-${firstToolCall.id}`,
|
||||
toolCalls: [...currentWorkingGroup],
|
||||
});
|
||||
currentWorkingGroup = [];
|
||||
}
|
||||
};
|
||||
|
||||
for (const item of items) {
|
||||
if (item.type === "tool_call" && isWorkingToolCall(item.toolCall)) {
|
||||
currentWorkingGroup.push(item.toolCall);
|
||||
} else {
|
||||
flushWorkingGroup();
|
||||
grouped.push(item as GroupedStreamItem);
|
||||
}
|
||||
}
|
||||
|
||||
flushWorkingGroup();
|
||||
return grouped;
|
||||
}
|
||||
|
||||
/**
|
||||
* ToolCallPill - Expandable pill for tool calls
|
||||
*
|
||||
@@ -131,6 +170,12 @@ export default function ToolCallPill({ toolCall }: ToolCallPillProps) {
|
||||
const Icon = getToolIcon(toolCall.kind);
|
||||
const statusDisplay = getStatusDisplay(toolCall.status);
|
||||
const StatusIcon = statusDisplay.icon;
|
||||
const groupedSubagentItems = toolCall.subagentStreamItems
|
||||
? groupSubagentStreamItems(toolCall.subagentStreamItems)
|
||||
: [];
|
||||
const lastSubagentWorkingGroupIndex = groupedSubagentItems.findLastIndex(
|
||||
(item) => item.type === "working_group"
|
||||
);
|
||||
|
||||
return (
|
||||
<Collapsible open={isOpen} onOpenChange={setIsOpen}>
|
||||
@@ -198,10 +243,75 @@ export default function ToolCallPill({ toolCall }: ToolCallPillProps) {
|
||||
|
||||
<CollapsibleContent>
|
||||
<div className="px-3 pb-3 pt-0">
|
||||
{/* Show diff view for edit operations (not new files) */}
|
||||
{toolCall.title === "Editing file" &&
|
||||
toolCall.oldContent !== undefined &&
|
||||
toolCall.newContent !== undefined ? (
|
||||
{toolCall.kind === "task" && groupedSubagentItems.length > 0 ? (
|
||||
<div className="flex flex-col gap-3">
|
||||
<div className="text-xs font-medium text-text-03 uppercase tracking-wide">
|
||||
Subagent Activity
|
||||
</div>
|
||||
<div className="flex flex-col gap-2">
|
||||
{groupedSubagentItems.map((item, index) => {
|
||||
switch (item.type) {
|
||||
case "text":
|
||||
return (
|
||||
<TextChunk key={item.id} content={item.content} />
|
||||
);
|
||||
case "thinking":
|
||||
return (
|
||||
<ThinkingCard
|
||||
key={item.id}
|
||||
content={item.content}
|
||||
isStreaming={false}
|
||||
/>
|
||||
);
|
||||
case "todo_list":
|
||||
return (
|
||||
<TodoListCard
|
||||
key={item.id}
|
||||
todoList={item.todoList}
|
||||
defaultOpen={false}
|
||||
/>
|
||||
);
|
||||
case "working_group":
|
||||
return (
|
||||
<WorkingPill
|
||||
key={item.id}
|
||||
toolCalls={item.toolCalls}
|
||||
isLatest={index === lastSubagentWorkingGroupIndex}
|
||||
/>
|
||||
);
|
||||
case "tool_call":
|
||||
return (
|
||||
<div
|
||||
key={item.id}
|
||||
className="rounded-md border border-border-01 bg-background-neutral-01 p-2"
|
||||
>
|
||||
<div className="text-sm font-medium text-text-04">
|
||||
{item.toolCall.title}
|
||||
</div>
|
||||
{item.toolCall.description && (
|
||||
<div className="text-sm text-text-03">
|
||||
{item.toolCall.description}
|
||||
</div>
|
||||
)}
|
||||
{item.toolCall.rawOutput && (
|
||||
<div className="pt-2">
|
||||
<RawOutputBlock
|
||||
content={item.toolCall.rawOutput}
|
||||
maxHeight="220px"
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
})}
|
||||
</div>
|
||||
</div>
|
||||
) : toolCall.title === "Editing file" &&
|
||||
toolCall.oldContent !== undefined &&
|
||||
toolCall.newContent !== undefined ? (
|
||||
<DiffView
|
||||
oldContent={toolCall.oldContent}
|
||||
newContent={toolCall.newContent}
|
||||
|
||||
@@ -40,6 +40,7 @@ import {
|
||||
|
||||
import { genId } from "@/app/craft/utils/streamItemHelpers";
|
||||
import { parsePacket } from "@/app/craft/utils/parsePacket";
|
||||
import { convertSubagentPacketDataToStreamItems } from "@/app/craft/utils/subagentStreamItems";
|
||||
|
||||
/**
|
||||
* Convert loaded messages (with message_metadata) to StreamItem[] format.
|
||||
@@ -49,12 +50,17 @@ import { parsePacket } from "@/app/craft/utils/parsePacket";
|
||||
* - agent_message: {type: "agent_message", content: {type: "text", text: "..."}}
|
||||
* - agent_thought: {type: "agent_thought", content: {type: "text", text: "..."}}
|
||||
* - tool_call_progress: Full tool call data with status="completed"
|
||||
* - subagent_packet: Incremental packet routed to a parent task tool call
|
||||
* - agent_plan_update: Plan entries (not rendered as stream items)
|
||||
*
|
||||
* This function converts assistant messages to StreamItem[] for rendering.
|
||||
*/
|
||||
function convertMessagesToStreamItems(messages: BuildMessage[]): StreamItem[] {
|
||||
const items: StreamItem[] = [];
|
||||
const pendingSubagentPacketsByToolCall = new Map<
|
||||
string,
|
||||
Record<string, unknown>[]
|
||||
>();
|
||||
|
||||
for (const message of messages) {
|
||||
if (message.type === "user") continue;
|
||||
@@ -116,6 +122,16 @@ function convertMessagesToStreamItems(messages: BuildMessage[]): StreamItem[] {
|
||||
});
|
||||
}
|
||||
} else {
|
||||
const pendingSubagentPacketData =
|
||||
pendingSubagentPacketsByToolCall.get(packet.toolCallId) ?? [];
|
||||
const subagentPacketData =
|
||||
packet.subagentPacketData.length > 0
|
||||
? packet.subagentPacketData
|
||||
: pendingSubagentPacketData;
|
||||
if (subagentPacketData.length > 0) {
|
||||
pendingSubagentPacketsByToolCall.delete(packet.toolCallId);
|
||||
}
|
||||
|
||||
items.push({
|
||||
type: "tool_call",
|
||||
id: packet.toolCallId,
|
||||
@@ -128,6 +144,13 @@ function convertMessagesToStreamItems(messages: BuildMessage[]): StreamItem[] {
|
||||
status: packet.status,
|
||||
rawOutput: packet.rawOutput,
|
||||
subagentType: packet.subagentType ?? undefined,
|
||||
subagentSessionId: packet.subagentSessionId ?? undefined,
|
||||
subagentPacketData:
|
||||
subagentPacketData.length > 0 ? subagentPacketData : undefined,
|
||||
subagentStreamItems:
|
||||
subagentPacketData.length > 0
|
||||
? convertSubagentPacketDataToStreamItems(subagentPacketData)
|
||||
: undefined,
|
||||
isNewFile: packet.isNewFile,
|
||||
oldContent: packet.oldContent,
|
||||
newContent: packet.newContent,
|
||||
@@ -136,6 +159,46 @@ function convertMessagesToStreamItems(messages: BuildMessage[]): StreamItem[] {
|
||||
}
|
||||
break;
|
||||
|
||||
case "subagent_packet":
|
||||
if (!packet.parentToolCallId || !packet.packetData) break;
|
||||
|
||||
// Update existing task tool card if present.
|
||||
const taskToolIndex = items.findIndex(
|
||||
(item) =>
|
||||
item.type === "tool_call" &&
|
||||
item.toolCall.id === packet.parentToolCallId
|
||||
);
|
||||
if (taskToolIndex >= 0) {
|
||||
const existingItem = items[taskToolIndex];
|
||||
if (existingItem && existingItem.type === "tool_call") {
|
||||
const existingPacketData =
|
||||
existingItem.toolCall.subagentPacketData ?? [];
|
||||
const nextPacketData = [...existingPacketData, packet.packetData];
|
||||
items[taskToolIndex] = {
|
||||
...existingItem,
|
||||
toolCall: {
|
||||
...existingItem.toolCall,
|
||||
subagentSessionId:
|
||||
packet.subagentSessionId ??
|
||||
existingItem.toolCall.subagentSessionId,
|
||||
subagentPacketData: nextPacketData,
|
||||
subagentStreamItems:
|
||||
convertSubagentPacketDataToStreamItems(nextPacketData),
|
||||
},
|
||||
};
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// If the task pill hasn't been created yet, cache until tool_call_progress arrives.
|
||||
const existingPending =
|
||||
pendingSubagentPacketsByToolCall.get(packet.parentToolCallId) ?? [];
|
||||
pendingSubagentPacketsByToolCall.set(packet.parentToolCallId, [
|
||||
...existingPending,
|
||||
packet.packetData,
|
||||
]);
|
||||
break;
|
||||
|
||||
// agent_plan_update and other packet types are not rendered as stream items
|
||||
default:
|
||||
break;
|
||||
|
||||
@@ -21,6 +21,7 @@ import { StreamItem } from "@/app/craft/types/displayTypes";
|
||||
|
||||
import { genId } from "@/app/craft/utils/streamItemHelpers";
|
||||
import { parsePacket } from "@/app/craft/utils/parsePacket";
|
||||
import { convertSubagentPacketDataToStreamItems } from "@/app/craft/utils/subagentStreamItems";
|
||||
|
||||
/**
|
||||
* Hook for handling message streaming in build sessions.
|
||||
@@ -239,6 +240,8 @@ export function useBuildStreaming() {
|
||||
command: "",
|
||||
rawOutput: "",
|
||||
subagentType: undefined,
|
||||
subagentPacketData: undefined,
|
||||
subagentStreamItems: undefined,
|
||||
isNewFile: true,
|
||||
oldContent: "",
|
||||
newContent: "",
|
||||
@@ -266,6 +269,13 @@ export function useBuildStreaming() {
|
||||
command: parsed.command,
|
||||
rawOutput: parsed.rawOutput,
|
||||
subagentType: parsed.subagentType ?? undefined,
|
||||
subagentSessionId: parsed.subagentSessionId ?? undefined,
|
||||
...(parsed.subagentPacketData.length > 0 && {
|
||||
subagentPacketData: parsed.subagentPacketData,
|
||||
subagentStreamItems: convertSubagentPacketDataToStreamItems(
|
||||
parsed.subagentPacketData
|
||||
),
|
||||
}),
|
||||
...(parsed.kind === "edit" && {
|
||||
isNewFile: parsed.isNewFile,
|
||||
oldContent: parsed.oldContent,
|
||||
@@ -282,18 +292,37 @@ export function useBuildStreaming() {
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// Task completion → emit text StreamItem
|
||||
if (parsed.taskOutput) {
|
||||
appendStreamItem(sessionId, {
|
||||
type: "text",
|
||||
id: genId("task-output"),
|
||||
content: parsed.taskOutput,
|
||||
isStreaming: false,
|
||||
});
|
||||
lastItemType = "text";
|
||||
accumulatedText = "";
|
||||
}
|
||||
// Subagent packet - append to the parent task card stream
|
||||
case "subagent_packet": {
|
||||
if (!parsed.parentToolCallId || !parsed.packetData) break;
|
||||
|
||||
const liveSession = useBuildSessionStore
|
||||
.getState()
|
||||
.sessions.get(sessionId);
|
||||
if (!liveSession) break;
|
||||
|
||||
const taskToolItem = liveSession.streamItems.find(
|
||||
(item) =>
|
||||
item.type === "tool_call" &&
|
||||
item.toolCall.id === parsed.parentToolCallId
|
||||
);
|
||||
if (!taskToolItem || taskToolItem.type !== "tool_call") break;
|
||||
|
||||
const existingPacketData =
|
||||
taskToolItem.toolCall.subagentPacketData ?? [];
|
||||
const nextPacketData = [...existingPacketData, parsed.packetData];
|
||||
|
||||
updateToolCallStreamItem(sessionId, parsed.parentToolCallId, {
|
||||
subagentSessionId:
|
||||
parsed.subagentSessionId ??
|
||||
taskToolItem.toolCall.subagentSessionId,
|
||||
subagentPacketData: nextPacketData,
|
||||
subagentStreamItems:
|
||||
convertSubagentPacketDataToStreamItems(nextPacketData),
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
@@ -53,6 +53,12 @@ export interface ToolCallState {
|
||||
rawOutput: string; // Full output for expanded view
|
||||
/** For task tool calls: the subagent type (e.g., "explore", "plan") */
|
||||
subagentType?: string;
|
||||
/** For task tool calls: OpenCode session id used by the subagent */
|
||||
subagentSessionId?: string;
|
||||
/** For task tool calls: raw packet history used to build subagentStreamItems */
|
||||
subagentPacketData?: Record<string, unknown>[];
|
||||
/** For task tool calls: live stream items from the subagent session */
|
||||
subagentStreamItems?: StreamItem[];
|
||||
/** For edit operations: whether this is a new file (write) or edit of existing */
|
||||
isNewFile?: boolean;
|
||||
/** For edit operations: the old content before the edit (empty for new files) */
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
/**
|
||||
* Packet Types
|
||||
*
|
||||
* Type definitions for raw and parsed ACP packets.
|
||||
* Type definitions for raw and parsed streamed packets.
|
||||
* Centralizes all snake_case / camelCase field resolution.
|
||||
* Defines the ParsedPacket discriminated union consumed by both
|
||||
* useBuildStreaming (live SSE) and useBuildSessionStore (DB reload).
|
||||
@@ -99,14 +99,21 @@ export interface ParsedToolCallProgress {
|
||||
rawOutput: string;
|
||||
filePath: string; // Session-relative
|
||||
subagentType: string | null;
|
||||
subagentSessionId: string | null;
|
||||
subagentPacketData: Record<string, unknown>[];
|
||||
// Edit-specific
|
||||
isNewFile: boolean;
|
||||
oldContent: string;
|
||||
newContent: string;
|
||||
// Todo-specific
|
||||
todos: TodoItem[];
|
||||
// Task-specific
|
||||
taskOutput: string | null;
|
||||
}
|
||||
|
||||
export interface ParsedSubagentPacket {
|
||||
type: "subagent_packet";
|
||||
parentToolCallId: string;
|
||||
subagentSessionId: string | null;
|
||||
packetData: Record<string, unknown> | null;
|
||||
}
|
||||
|
||||
export interface ParsedPromptResponse {
|
||||
@@ -138,6 +145,7 @@ export type ParsedPacket =
|
||||
| ParsedThinkingChunk
|
||||
| ParsedToolCallStart
|
||||
| ParsedToolCallProgress
|
||||
| ParsedSubagentPacket
|
||||
| ParsedPromptResponse
|
||||
| ParsedArtifact
|
||||
| ParsedError
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
/**
|
||||
* Parse Packet
|
||||
*
|
||||
* Single entry point for converting raw ACP packets into strongly-typed
|
||||
* Single entry point for converting raw streamed packets into strongly-typed
|
||||
* ParsedPacket values. All field resolution, tool detection, and path
|
||||
* sanitization happen here. Consumers never touch Record<string, unknown>.
|
||||
*/
|
||||
@@ -15,6 +15,7 @@ import {
|
||||
type ParsedPacket,
|
||||
type ParsedToolCallStart,
|
||||
type ParsedToolCallProgress,
|
||||
type ParsedSubagentPacket,
|
||||
type ParsedArtifact,
|
||||
type ToolName,
|
||||
type ToolKind,
|
||||
@@ -42,6 +43,9 @@ export function parsePacket(raw: unknown): ParsedPacket {
|
||||
case "tool_call_progress":
|
||||
return parseToolCallProgress(p);
|
||||
|
||||
case "subagent_packet":
|
||||
return parseSubagentPacket(p);
|
||||
|
||||
case "prompt_response":
|
||||
return { type: "prompt_response" };
|
||||
|
||||
@@ -131,7 +135,7 @@ function resolveKind(toolName: ToolName, rawKind: string | null): ToolKind {
|
||||
|
||||
// ─── Shared Helpers ───────────────────────────────────────────────
|
||||
|
||||
/** Extract text from ACP content structure (string, {type,text}, or array) */
|
||||
/** Extract text from content structure (string, {type,text}, or array) */
|
||||
function extractText(content: unknown): string {
|
||||
if (!content) return "";
|
||||
if (typeof content === "string") return content;
|
||||
@@ -398,16 +402,39 @@ function normalizeTodoStatus(status: unknown): TodoStatus {
|
||||
return "pending";
|
||||
}
|
||||
|
||||
// ─── Task Output Extraction ──────────────────────────────────────
|
||||
|
||||
function extractTaskOutput(ro: Record<string, unknown> | null): string | null {
|
||||
if (!ro?.output || typeof ro.output !== "string") return null;
|
||||
return (
|
||||
ro.output.replace(/<task_metadata>[\s\S]*?<\/task_metadata>/g, "").trim() ||
|
||||
null
|
||||
// ─── Task Subagent Packet Extraction ─────────────────────────────
|
||||
function extractSubagentPacketData(
|
||||
ro: Record<string, unknown> | null
|
||||
): Record<string, unknown>[] {
|
||||
const raw = ro?.subagent_packets ?? ro?.subagentPackets ?? null;
|
||||
if (!Array.isArray(raw)) return [];
|
||||
return raw.filter(
|
||||
(item): item is Record<string, unknown> =>
|
||||
!!item && typeof item === "object"
|
||||
);
|
||||
}
|
||||
|
||||
function parseSubagentPacket(p: Record<string, unknown>): ParsedSubagentPacket {
|
||||
const parentToolCallId = (p.parent_tool_call_id ??
|
||||
p.parentToolCallId ??
|
||||
"") as string | undefined;
|
||||
const subagentSessionId = (p.subagent_session_id ??
|
||||
p.subagentSessionId ??
|
||||
null) as string | null;
|
||||
const packetRaw = p.packet;
|
||||
const packetData =
|
||||
packetRaw && typeof packetRaw === "object"
|
||||
? (packetRaw as Record<string, unknown>)
|
||||
: null;
|
||||
|
||||
return {
|
||||
type: "subagent_packet",
|
||||
parentToolCallId: parentToolCallId || "",
|
||||
subagentSessionId,
|
||||
packetData,
|
||||
};
|
||||
}
|
||||
|
||||
// ─── Artifact Parsing ─────────────────────────────────────────────
|
||||
|
||||
function parseArtifact(p: Record<string, unknown>): ParsedArtifact {
|
||||
@@ -505,10 +532,16 @@ function parseToolCallProgress(
|
||||
const subagentType = (ri?.subagent_type ?? ri?.subagentType ?? null) as
|
||||
| string
|
||||
| null;
|
||||
const taskOutput =
|
||||
toolName === "task" && status === "completed"
|
||||
? extractTaskOutput(ro)
|
||||
: null;
|
||||
const rawMetadata = (ro?.metadata ?? null) as Record<string, unknown> | null;
|
||||
const subagentSessionId = (ro?.sessionId ??
|
||||
ro?.sessionID ??
|
||||
ro?.session_id ??
|
||||
rawMetadata?.sessionId ??
|
||||
rawMetadata?.sessionID ??
|
||||
rawMetadata?.session_id ??
|
||||
null) as string | null;
|
||||
const subagentPacketData =
|
||||
toolName === "task" ? extractSubagentPacketData(ro) : [];
|
||||
|
||||
return {
|
||||
type: "tool_call_progress",
|
||||
@@ -523,6 +556,8 @@ function parseToolCallProgress(
|
||||
rawOutput,
|
||||
filePath,
|
||||
subagentType,
|
||||
subagentSessionId,
|
||||
subagentPacketData,
|
||||
isNewFile:
|
||||
diffData.oldText || diffData.newText
|
||||
? diffData.isNewFile
|
||||
@@ -530,6 +565,5 @@ function parseToolCallProgress(
|
||||
oldContent: diffData.oldText,
|
||||
newContent: diffData.newText,
|
||||
todos,
|
||||
taskOutput,
|
||||
};
|
||||
}
|
||||
|
||||
191
web/src/app/craft/utils/subagentStreamItems.ts
Normal file
191
web/src/app/craft/utils/subagentStreamItems.ts
Normal file
@@ -0,0 +1,191 @@
|
||||
"use client";
|
||||
|
||||
import {
|
||||
StreamItem,
|
||||
ToolCallState,
|
||||
TodoListState,
|
||||
} from "../types/displayTypes";
|
||||
import { parsePacket } from "./parsePacket";
|
||||
import { genId } from "./streamItemHelpers";
|
||||
|
||||
const MAX_SUBAGENT_DEPTH = 3;
|
||||
|
||||
function upsertToolCall(
|
||||
items: StreamItem[],
|
||||
toolCallId: string,
|
||||
updates: Partial<ToolCallState>
|
||||
) {
|
||||
const existingIndex = items.findIndex(
|
||||
(item) => item.type === "tool_call" && item.toolCall.id === toolCallId
|
||||
);
|
||||
|
||||
if (existingIndex >= 0) {
|
||||
const existing = items[existingIndex];
|
||||
if (existing && existing.type === "tool_call") {
|
||||
items[existingIndex] = {
|
||||
...existing,
|
||||
toolCall: {
|
||||
...existing.toolCall,
|
||||
...updates,
|
||||
},
|
||||
};
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
items.push({
|
||||
type: "tool_call",
|
||||
id: toolCallId,
|
||||
toolCall: {
|
||||
id: toolCallId,
|
||||
kind: updates.kind || "other",
|
||||
title: updates.title || "Running tool",
|
||||
description: updates.description || "",
|
||||
command: updates.command || "",
|
||||
status: updates.status || "pending",
|
||||
rawOutput: updates.rawOutput || "",
|
||||
subagentType: updates.subagentType,
|
||||
subagentSessionId: updates.subagentSessionId,
|
||||
subagentStreamItems: updates.subagentStreamItems,
|
||||
isNewFile: updates.isNewFile ?? true,
|
||||
oldContent: updates.oldContent || "",
|
||||
newContent: updates.newContent || "",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function upsertTodoList(
|
||||
items: StreamItem[],
|
||||
todoListId: string,
|
||||
todoList: TodoListState
|
||||
) {
|
||||
const existingIndex = items.findIndex(
|
||||
(item) => item.type === "todo_list" && item.todoList.id === todoListId
|
||||
);
|
||||
if (existingIndex >= 0) {
|
||||
const existing = items[existingIndex];
|
||||
if (existing && existing.type === "todo_list") {
|
||||
items[existingIndex] = {
|
||||
...existing,
|
||||
todoList: {
|
||||
...existing.todoList,
|
||||
...todoList,
|
||||
},
|
||||
};
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
items.push({
|
||||
type: "todo_list",
|
||||
id: todoListId,
|
||||
todoList,
|
||||
});
|
||||
}
|
||||
|
||||
export function convertSubagentPacketDataToStreamItems(
|
||||
packetData: Record<string, unknown>[],
|
||||
depth = 0
|
||||
): StreamItem[] {
|
||||
if (depth >= MAX_SUBAGENT_DEPTH) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const items: StreamItem[] = [];
|
||||
|
||||
for (const rawPacket of packetData) {
|
||||
const parsed = parsePacket(rawPacket);
|
||||
|
||||
switch (parsed.type) {
|
||||
case "text_chunk": {
|
||||
if (!parsed.text) break;
|
||||
const lastItem = items[items.length - 1];
|
||||
if (lastItem && lastItem.type === "text") {
|
||||
lastItem.content += parsed.text;
|
||||
} else {
|
||||
items.push({
|
||||
type: "text",
|
||||
id: genId("subagent-text"),
|
||||
content: parsed.text,
|
||||
isStreaming: false,
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case "thinking_chunk": {
|
||||
if (!parsed.text) break;
|
||||
const lastItem = items[items.length - 1];
|
||||
if (lastItem && lastItem.type === "thinking") {
|
||||
lastItem.content += parsed.text;
|
||||
} else {
|
||||
items.push({
|
||||
type: "thinking",
|
||||
id: genId("subagent-thinking"),
|
||||
content: parsed.text,
|
||||
isStreaming: false,
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case "tool_call_start": {
|
||||
if (parsed.isTodo) break;
|
||||
upsertToolCall(items, parsed.toolCallId, {
|
||||
id: parsed.toolCallId,
|
||||
kind: parsed.kind,
|
||||
status: "pending",
|
||||
title: "",
|
||||
description: "",
|
||||
command: "",
|
||||
rawOutput: "",
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
case "tool_call_progress": {
|
||||
if (parsed.isTodo) {
|
||||
upsertTodoList(items, parsed.toolCallId, {
|
||||
id: parsed.toolCallId,
|
||||
todos: parsed.todos,
|
||||
isOpen: false,
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
upsertToolCall(items, parsed.toolCallId, {
|
||||
id: parsed.toolCallId,
|
||||
kind: parsed.kind,
|
||||
status: parsed.status,
|
||||
title: parsed.title,
|
||||
description: parsed.description,
|
||||
command: parsed.command,
|
||||
rawOutput: parsed.rawOutput,
|
||||
subagentType: parsed.subagentType ?? undefined,
|
||||
subagentSessionId: parsed.subagentSessionId ?? undefined,
|
||||
subagentStreamItems:
|
||||
parsed.subagentPacketData.length > 0
|
||||
? convertSubagentPacketDataToStreamItems(
|
||||
parsed.subagentPacketData,
|
||||
depth + 1
|
||||
)
|
||||
: undefined,
|
||||
isNewFile: parsed.isNewFile,
|
||||
oldContent: parsed.oldContent,
|
||||
newContent: parsed.newContent,
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
case "subagent_packet":
|
||||
case "prompt_response":
|
||||
case "artifact_created":
|
||||
case "error":
|
||||
case "unknown":
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return items;
|
||||
}
|
||||
Reference in New Issue
Block a user