Compare commits

...

2 Commits

Author SHA1 Message Date
rohoswagger
86945782cb reorganizing 2026-02-06 10:26:12 -08:00
rohoswagger
052f6bebc8 fix(craft): snapshot restore 2026-02-06 10:09:20 -08:00
4 changed files with 241 additions and 169 deletions

View File

@@ -32,6 +32,8 @@ from onyx.server.features.build.api.models import SuggestionBubble
from onyx.server.features.build.api.models import SuggestionTheme
from onyx.server.features.build.api.models import UploadResponse
from onyx.server.features.build.api.models import WebappInfo
from onyx.server.features.build.configs import SANDBOX_BACKEND
from onyx.server.features.build.configs import SandboxBackend
from onyx.server.features.build.db.build_session import allocate_nextjs_port
from onyx.server.features.build.db.build_session import get_build_session
from onyx.server.features.build.db.sandbox import get_latest_snapshot_for_session
@@ -430,8 +432,11 @@ def restore_session(
# 2. Check if session workspace needs to be loaded
if sandbox.status == SandboxStatus.RUNNING:
if not sandbox_manager.session_workspace_exists(sandbox.id, session_id):
# Get latest snapshot and restore it
snapshot = get_latest_snapshot_for_session(db_session, session_id)
# Only Kubernetes backend supports snapshot restoration
snapshot = None
if SANDBOX_BACKEND == SandboxBackend.KUBERNETES:
snapshot = get_latest_snapshot_for_session(db_session, session_id)
if snapshot:
# Allocate a new port for the restored session
new_port = allocate_nextjs_port(db_session)
@@ -443,6 +448,9 @@ def restore_session(
f"from {snapshot.storage_path} with port {new_port}"
)
# Get LLM config for regenerating session config files
llm_config = session_manager._get_llm_config(None, None)
try:
sandbox_manager.restore_snapshot(
sandbox_id=sandbox.id,
@@ -450,6 +458,8 @@ def restore_session(
snapshot_storage_path=snapshot.storage_path,
tenant_id=tenant_id,
nextjs_port=new_port,
llm_config=llm_config,
use_demo_data=session.demo_data_enabled,
)
except Exception as e:
# Clear the port allocation on failure so it can be reused
@@ -461,7 +471,7 @@ def restore_session(
db_session.commit()
raise
else:
# No snapshot - set up fresh workspace
# No snapshot or local backend - set up fresh workspace
logger.info(
f"No snapshot found for session {session_id}, "
f"setting up fresh workspace"

View File

@@ -183,13 +183,14 @@ class SandboxManager(ABC):
session_id: UUID,
tenant_id: str,
) -> SnapshotResult | None:
"""Create a snapshot of a session's outputs directory.
"""Create a snapshot of a session's outputs and attachments directories.
Captures only the session-specific outputs:
sessions/$session_id/outputs/
Captures session-specific user data:
- sessions/$session_id/outputs/ (generated artifacts, web apps)
- sessions/$session_id/attachments/ (user uploaded files)
Does NOT include: venv, skills, AGENTS.md, opencode.json, attachments
Does NOT include: shared files/ directory
Does NOT include: venv, skills, AGENTS.md, opencode.json, files symlink
(these are regenerated during restore)
Args:
sandbox_id: The sandbox ID
@@ -197,14 +198,45 @@ class SandboxManager(ABC):
tenant_id: Tenant identifier for storage path
Returns:
SnapshotResult with storage path and size, or None if
snapshots are disabled for this backend
SnapshotResult with storage path and size, or None if:
- Snapshots are disabled for this backend
- No outputs or attachments exist to snapshot
Raises:
RuntimeError: If snapshot creation fails
"""
...
@abstractmethod
def restore_snapshot(
self,
sandbox_id: UUID,
session_id: UUID,
snapshot_storage_path: str,
tenant_id: str,
nextjs_port: int,
llm_config: LLMProviderConfig,
use_demo_data: bool = False,
) -> None:
"""Restore a session workspace from a snapshot.
For Kubernetes: Downloads and extracts the snapshot, regenerates config files.
For Local: No-op since workspaces persist on disk (no snapshots).
Args:
sandbox_id: The sandbox ID
session_id: The session ID to restore
snapshot_storage_path: Path to the snapshot in storage
tenant_id: Tenant identifier for storage access
nextjs_port: Port number for the NextJS dev server
llm_config: LLM provider configuration for opencode.json
use_demo_data: If True, symlink files/ to demo data
Raises:
RuntimeError: If snapshot restoration fails
"""
...
@abstractmethod
def session_workspace_exists(
self,
@@ -225,36 +257,6 @@ class SandboxManager(ABC):
"""
...
@abstractmethod
def restore_snapshot(
self,
sandbox_id: UUID,
session_id: UUID,
snapshot_storage_path: str,
tenant_id: str,
nextjs_port: int,
) -> None:
"""Restore a snapshot into a session's workspace directory.
Downloads the snapshot from storage, extracts it into
sessions/$session_id/outputs/, and starts the NextJS server.
For Kubernetes backend, this downloads from S3 and streams
into the pod via kubectl exec (since the pod has no S3 access).
Args:
sandbox_id: The sandbox ID
session_id: The session ID to restore
snapshot_storage_path: Path to the snapshot in storage
tenant_id: Tenant identifier for storage access
nextjs_port: Port number for the NextJS dev server
Raises:
RuntimeError: If snapshot restoration fails
FileNotFoundError: If snapshot does not exist
"""
...
@abstractmethod
def health_check(self, sandbox_id: UUID, timeout: float = 60.0) -> bool:
"""Check if the sandbox is healthy.

View File

@@ -65,7 +65,6 @@ from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_END
from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_START
from onyx.server.features.build.configs import SANDBOX_S3_BUCKET
from onyx.server.features.build.configs import SANDBOX_SERVICE_ACCOUNT_NAME
from onyx.server.features.build.s3.s3_client import build_s3_client
from onyx.server.features.build.sandbox.base import SandboxManager
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
ACPEvent,
@@ -409,6 +408,10 @@ done
],
volume_mounts=[
client.V1VolumeMount(name="files", mount_path="/workspace/files"),
# Mount sessions directory so file-sync can create snapshots
client.V1VolumeMount(
name="workspace", mount_path="/workspace/sessions"
),
],
resources=client.V1ResourceRequirements(
# Reduced resources since sidecar is mostly idle (sleeping)
@@ -442,6 +445,10 @@ done
client.V1VolumeMount(
name="files", mount_path="/workspace/files", read_only=True
),
# Mount sessions directory (shared with file-sync for snapshots)
client.V1VolumeMount(
name="workspace", mount_path="/workspace/sessions"
),
],
resources=client.V1ResourceRequirements(
requests={"cpu": "500m", "memory": "1Gi"},
@@ -1335,10 +1342,12 @@ echo "Session cleanup complete"
session_id: UUID,
tenant_id: str,
) -> SnapshotResult | None:
"""Create a snapshot of a session's outputs directory.
"""Create a snapshot of a session's outputs and attachments directories.
For Kubernetes backend, we exec into the pod to create the snapshot.
Only captures sessions/$session_id/outputs/
For Kubernetes backend, we exec into the file-sync container to create
the snapshot and upload to S3. Captures:
- sessions/$session_id/outputs/ (generated artifacts, web apps)
- sessions/$session_id/attachments/ (user uploaded files)
Args:
sandbox_id: The sandbox ID
@@ -1346,7 +1355,7 @@ echo "Session cleanup complete"
tenant_id: Tenant identifier for storage path
Returns:
SnapshotResult with storage path and size
SnapshotResult with storage path and size, or None if nothing to snapshot
Raises:
RuntimeError: If snapshot creation fails
@@ -1356,26 +1365,42 @@ echo "Session cleanup complete"
pod_name = self._get_pod_name(sandbox_id_str)
snapshot_id = str(uuid4())
session_path = f"/workspace/sessions/{session_id_str}"
# Use shlex.quote for safety (UUIDs are safe but good practice)
safe_session_path = shlex.quote(f"/workspace/sessions/{session_id_str}")
s3_path = (
f"s3://{self._s3_bucket}/{tenant_id}/snapshots/"
f"{session_id_str}/{snapshot_id}.tar.gz"
)
# Exec into pod to create and upload snapshot (session outputs only)
# Exec into pod to create and upload snapshot (outputs + attachments)
# Uses s5cmd pipe to stream tar.gz directly to S3
# Note: attachments/ may not exist if user never uploaded files, so we
# use a shell script to only include directories that exist
exec_command = [
"/bin/sh",
"-c",
f'tar -czf - -C {session_path} outputs | aws s3 cp - {s3_path} --tagging "Type=snapshot"',
f"""
set -eo pipefail
cd {safe_session_path}
dirs=""
[ -d outputs ] && dirs="$dirs outputs"
[ -d attachments ] && dirs="$dirs attachments"
if [ -z "$dirs" ]; then
echo "EMPTY_SNAPSHOT"
exit 0
fi
tar -czf - $dirs | /s5cmd pipe {s3_path}
echo "SNAPSHOT_CREATED"
""",
]
try:
# Use exec to run snapshot command in sandbox container
# Use exec to run snapshot command in file-sync container (has s5cmd)
resp = k8s_stream(
self._stream_core_api.connect_get_namespaced_pod_exec,
name=pod_name,
namespace=self._namespace,
container="sandbox",
container="file-sync",
command=exec_command,
stderr=True,
stdin=False,
@@ -1385,6 +1410,17 @@ echo "Session cleanup complete"
logger.debug(f"Snapshot exec output: {resp}")
# Check if nothing was snapshotted
if "EMPTY_SNAPSHOT" in resp:
logger.info(
f"No outputs or attachments to snapshot for session {session_id}"
)
return None
# Verify upload succeeded
if "SNAPSHOT_CREATED" not in resp:
raise RuntimeError(f"Snapshot upload may have failed. Output: {resp}")
except ApiException as e:
raise RuntimeError(f"Failed to create snapshot: {e}") from e
@@ -1392,9 +1428,8 @@ echo "Session cleanup complete"
# In production, you might want to query S3 for the actual size
size_bytes = 0
storage_path = (
f"sandbox-snapshots/{tenant_id}/{session_id_str}/{snapshot_id}.tar.gz"
)
# Storage path must match the S3 upload path (without s3://bucket/ prefix)
storage_path = f"{tenant_id}/snapshots/{session_id_str}/{snapshot_id}.tar.gz"
logger.info(f"Created snapshot for session {session_id}")
@@ -1457,14 +1492,21 @@ echo "Session cleanup complete"
snapshot_storage_path: str,
tenant_id: str, # noqa: ARG002
nextjs_port: int,
llm_config: LLMProviderConfig,
use_demo_data: bool = False,
) -> None:
"""Download snapshot from S3, extract into session workspace, and start NextJS.
"""Download snapshot from S3 via s5cmd, extract, regenerate config, and start NextJS.
Since the sandbox pod doesn't have S3 access, this method:
1. Downloads snapshot from S3 (using boto3 directly)
2. Creates the session directory structure in pod
3. Streams the tar.gz into the pod via kubectl exec
4. Starts the NextJS dev server
Uses the file-sync sidecar container (which has s5cmd + S3 credentials
via IRSA) to stream the snapshot directly from S3 into the session
directory. This avoids downloading to the backend server and the
base64 encoding overhead of piping through kubectl exec.
Steps:
1. Exec s5cmd cat in file-sync container to stream snapshot from S3
2. Pipe directly to tar for extraction in the shared workspace volume
3. Regenerate configuration files (AGENTS.md, opencode.json, files symlink)
4. Start the NextJS dev server
Args:
sandbox_id: The sandbox ID
@@ -1472,79 +1514,40 @@ echo "Session cleanup complete"
snapshot_storage_path: Path to the snapshot in S3 (relative path)
tenant_id: Tenant identifier for storage access
nextjs_port: Port number for the NextJS dev server
llm_config: LLM provider configuration for opencode.json
use_demo_data: If True, symlink files/ to demo data; else to user files
Raises:
RuntimeError: If snapshot restoration fails
FileNotFoundError: If snapshot does not exist
"""
import tempfile
pod_name = self._get_pod_name(str(sandbox_id))
session_path = f"/workspace/sessions/{session_id}"
safe_session_path = shlex.quote(session_path)
# Build full S3 path
s3_key = snapshot_storage_path
s3_path = f"s3://{self._s3_bucket}/{snapshot_storage_path}"
logger.info(f"Restoring snapshot for session {session_id} from {s3_key}")
logger.info(f"Restoring snapshot for session {session_id} from {s3_path}")
# Download snapshot from S3 - uses IAM roles (IRSA)
s3_client = build_s3_client()
tmp_path: str | None = None
# Stream snapshot directly from S3 via s5cmd in file-sync container.
# Mirrors the upload pattern: upload uses `tar | s5cmd pipe`,
# restore uses `s5cmd cat | tar`. Both run in file-sync container
# which has s5cmd and S3 credentials (IRSA). The shared workspace
# volume makes extracted files immediately visible to the sandbox
# container.
restore_script = f"""
set -eo pipefail
mkdir -p {safe_session_path}
/s5cmd cat {s3_path} | tar -xzf - -C {safe_session_path}
echo "SNAPSHOT_RESTORED"
"""
try:
with tempfile.NamedTemporaryFile(
suffix=".tar.gz", delete=False
) as tmp_file:
tmp_path = tmp_file.name
try:
s3_client.download_file(self._s3_bucket, s3_key, tmp_path)
except s3_client.exceptions.NoSuchKey:
raise FileNotFoundError(
f"Snapshot not found: s3://{self._s3_bucket}/{s3_key}"
)
# Create session directory structure in pod
# Use shlex.quote to prevent shell injection
safe_session_path = shlex.quote(session_path)
setup_script = f"""
set -e
mkdir -p {safe_session_path}/outputs
"""
k8s_stream(
self._stream_core_api.connect_get_namespaced_pod_exec,
name=pod_name,
namespace=self._namespace,
container="sandbox",
command=["/bin/sh", "-c", setup_script],
stderr=True,
stdin=False,
stdout=True,
tty=False,
)
# Stream tar.gz into pod and extract
# We use kubectl exec with stdin to pipe the tar file
with open(tmp_path, "rb") as tar_file:
tar_data = tar_file.read()
# Use base64 encoding to safely transfer binary data
import base64
tar_b64 = base64.b64encode(tar_data).decode("ascii")
# Extract in the session directory (tar was created with outputs/ as root)
extract_script = f"""
set -e
cd {safe_session_path}
echo '{tar_b64}' | base64 -d | tar -xzf -
"""
resp = k8s_stream(
self._stream_core_api.connect_get_namespaced_pod_exec,
name=pod_name,
namespace=self._namespace,
container="sandbox",
command=["/bin/sh", "-c", extract_script],
container="file-sync",
command=["/bin/sh", "-c", restore_script],
stderr=True,
stdin=False,
stdout=True,
@@ -1552,8 +1555,22 @@ echo '{tar_b64}' | base64 -d | tar -xzf -
)
logger.debug(f"Snapshot restore output: {resp}")
if "SNAPSHOT_RESTORED" not in resp:
raise RuntimeError(f"Snapshot restore may have failed. Output: {resp}")
logger.info(f"Restored snapshot for session {session_id}")
# Regenerate configuration files that aren't in the snapshot
# These are regenerated to ensure they match the current system state
self._regenerate_session_config(
pod_name=pod_name,
session_path=safe_session_path,
llm_config=llm_config,
nextjs_port=nextjs_port,
use_demo_data=use_demo_data,
)
# Start NextJS dev server (check node_modules since restoring from snapshot)
start_script = _build_nextjs_start_script(
safe_session_path, nextjs_port, check_node_modules=True
@@ -1575,17 +1592,93 @@ echo '{tar_b64}' | base64 -d | tar -xzf -
except ApiException as e:
raise RuntimeError(f"Failed to restore snapshot: {e}") from e
finally:
# Cleanup temp file
if tmp_path:
try:
import os
os.unlink(tmp_path)
except Exception as cleanup_error:
logger.warning(
f"Failed to cleanup temp file {tmp_path}: {cleanup_error}"
)
def _regenerate_session_config(
self,
pod_name: str,
session_path: str,
llm_config: LLMProviderConfig,
nextjs_port: int,
use_demo_data: bool,
) -> None:
"""Regenerate session configuration files after snapshot restore.
Creates:
- AGENTS.md (agent instructions)
- opencode.json (LLM configuration)
- files symlink (to demo data or user files)
Args:
pod_name: The pod name to exec into
session_path: Path to the session directory (already shlex.quoted)
llm_config: LLM provider configuration
nextjs_port: Port for NextJS (used in AGENTS.md)
use_demo_data: Whether to use demo data or user files
"""
# Generate AGENTS.md content
agent_instructions = self._load_agent_instructions(
files_path=None, # Container script handles this at runtime
provider=llm_config.provider,
model_name=llm_config.model_name,
nextjs_port=nextjs_port,
disabled_tools=OPENCODE_DISABLED_TOOLS,
user_name=None, # Not stored, regenerate without personalization
user_role=None,
use_demo_data=use_demo_data,
include_org_info=False, # Don't include org_info for restored sessions
)
# Generate opencode.json
opencode_config = build_opencode_config(
provider=llm_config.provider,
model_name=llm_config.model_name,
api_key=llm_config.api_key if llm_config.api_key else None,
api_base=llm_config.api_base,
disabled_tools=OPENCODE_DISABLED_TOOLS,
)
opencode_json = json.dumps(opencode_config)
# Escape for shell (single quotes)
opencode_json_escaped = opencode_json.replace("'", "'\\''")
agent_instructions_escaped = agent_instructions.replace("'", "'\\''")
# Build files symlink setup
if use_demo_data:
symlink_target = "/workspace/demo_data"
else:
symlink_target = "/workspace/files"
config_script = f"""
set -e
# Create files symlink
echo "Creating files symlink to {symlink_target}"
ln -sf {symlink_target} {session_path}/files
# Write agent instructions
echo "Writing AGENTS.md"
printf '%s' '{agent_instructions_escaped}' > {session_path}/AGENTS.md
# Write opencode config
echo "Writing opencode.json"
printf '%s' '{opencode_json_escaped}' > {session_path}/opencode.json
echo "Session config regeneration complete"
"""
logger.info("Regenerating session configuration files")
k8s_stream(
self._stream_core_api.connect_get_namespaced_pod_exec,
name=pod_name,
namespace=self._namespace,
container="sandbox",
command=["/bin/sh", "-c", config_script],
stderr=True,
stdin=False,
stdout=True,
tty=False,
)
logger.info("Session configuration files regenerated")
def health_check(self, sandbox_id: UUID, timeout: float = 60.0) -> bool:
"""Check if the sandbox pod is healthy (can exec into it).

View File

@@ -663,53 +663,20 @@ class LocalSandboxManager(SandboxManager):
snapshot_storage_path: str,
tenant_id: str, # noqa: ARG002
nextjs_port: int,
llm_config: LLMProviderConfig,
use_demo_data: bool = False,
) -> None:
"""Restore a snapshot into a session's workspace directory and start NextJS.
"""Not implemented for local backend - workspaces persist on disk.
Args:
sandbox_id: The sandbox ID
session_id: The session ID to restore
snapshot_storage_path: Path to the snapshot in storage
tenant_id: Tenant identifier for storage access
nextjs_port: Port number for the NextJS dev server
Raises:
RuntimeError: If snapshot restoration fails
FileNotFoundError: If snapshot does not exist
Local sandboxes don't use snapshots since the filesystem persists.
This should never be called for local backend.
"""
session_path = self._get_session_path(sandbox_id, session_id)
# Ensure session directory exists
session_path.mkdir(parents=True, exist_ok=True)
# Use SnapshotManager to restore
self._snapshot_manager.restore_snapshot(
storage_path=snapshot_storage_path,
target_path=session_path,
raise NotImplementedError(
"restore_snapshot is not supported for local backend. "
"Local sandboxes persist on disk and don't use snapshots."
)
logger.info(f"Restored snapshot for session {session_id}")
# Start NextJS dev server
web_dir = session_path / "outputs" / "web"
if web_dir.exists():
logger.info(f"Starting Next.js server at {web_dir} on port {nextjs_port}")
nextjs_process = self._process_manager.start_nextjs_server(
web_dir, nextjs_port
)
# Store process for clean shutdown on session delete
self._nextjs_processes[(sandbox_id, session_id)] = nextjs_process
logger.info(
f"Started NextJS server for session {session_id} on port {nextjs_port}"
)
else:
logger.warning(
f"Web directory not found at {web_dir}, skipping NextJS startup"
)
def health_check(
self, sandbox_id: UUID, timeout: float = 60.0 # noqa: ARG002
) -> bool:
def health_check(self, sandbox_id: UUID, timeout: float = 60.0) -> bool:
"""Check if the sandbox is healthy (folder exists).
Args: