mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-02-25 11:45:47 +00:00
Compare commits
7 Commits
ci_python_
...
experiment
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f95b4637e9 | ||
|
|
b9f13db07f | ||
|
|
83b38bcc14 | ||
|
|
0eebb23099 | ||
|
|
5df0d0be24 | ||
|
|
86945782cb | ||
|
|
052f6bebc8 |
@@ -228,14 +228,13 @@ class BuildSessionStatus(str, PyEnum):
|
||||
class SandboxStatus(str, PyEnum):
|
||||
PROVISIONING = "provisioning"
|
||||
RUNNING = "running"
|
||||
IDLE = "idle"
|
||||
SLEEPING = "sleeping" # Pod terminated, snapshots saved to S3
|
||||
TERMINATED = "terminated"
|
||||
FAILED = "failed"
|
||||
|
||||
def is_active(self) -> bool:
|
||||
"""Check if sandbox is in an active state (running or idle)."""
|
||||
return self in (SandboxStatus.RUNNING, SandboxStatus.IDLE)
|
||||
return self == SandboxStatus.RUNNING
|
||||
|
||||
def is_terminal(self) -> bool:
|
||||
"""Check if sandbox is in a terminal state."""
|
||||
|
||||
@@ -243,6 +243,7 @@ class WebappInfo(BaseModel):
|
||||
has_webapp: bool # Whether a webapp exists in outputs/web
|
||||
webapp_url: str | None # URL to access the webapp (e.g., http://localhost:3015)
|
||||
status: str # Sandbox status (running, terminated, etc.)
|
||||
ready: bool # Whether the NextJS dev server is actually responding
|
||||
|
||||
|
||||
# ===== File Upload Models =====
|
||||
|
||||
@@ -13,6 +13,7 @@ from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.auth.users import current_user
|
||||
from onyx.db.engine.sql_engine import get_session
|
||||
from onyx.db.enums import BuildSessionStatus
|
||||
from onyx.db.enums import SandboxStatus
|
||||
from onyx.db.models import BuildMessage
|
||||
from onyx.db.models import User
|
||||
@@ -32,6 +33,8 @@ from onyx.server.features.build.api.models import SuggestionBubble
|
||||
from onyx.server.features.build.api.models import SuggestionTheme
|
||||
from onyx.server.features.build.api.models import UploadResponse
|
||||
from onyx.server.features.build.api.models import WebappInfo
|
||||
from onyx.server.features.build.configs import SANDBOX_BACKEND
|
||||
from onyx.server.features.build.configs import SandboxBackend
|
||||
from onyx.server.features.build.db.build_session import allocate_nextjs_port
|
||||
from onyx.server.features.build.db.build_session import get_build_session
|
||||
from onyx.server.features.build.db.sandbox import get_latest_snapshot_for_session
|
||||
@@ -345,14 +348,25 @@ def restore_session(
|
||||
Returns immediately if session workspace already exists in pod.
|
||||
Always returns session_loaded_in_sandbox=True on success.
|
||||
"""
|
||||
logger.info(
|
||||
f"[RESTORE] === Starting restore for session {session_id}, user {user.id} ==="
|
||||
)
|
||||
|
||||
session = get_build_session(session_id, user.id, db_session)
|
||||
if not session:
|
||||
logger.warning(f"[RESTORE] Session {session_id} not found for user {user.id}")
|
||||
raise HTTPException(status_code=404, detail="Session not found")
|
||||
|
||||
sandbox = get_sandbox_by_user_id(db_session, user.id)
|
||||
if not sandbox:
|
||||
logger.warning(f"[RESTORE] No sandbox found for user {user.id}")
|
||||
raise HTTPException(status_code=404, detail="Sandbox not found")
|
||||
|
||||
logger.info(
|
||||
f"[RESTORE] Session status={session.status}, "
|
||||
f"sandbox={sandbox.id} status={sandbox.status}"
|
||||
)
|
||||
|
||||
# If sandbox is already running, check if session workspace exists
|
||||
sandbox_manager = get_sandbox_manager()
|
||||
tenant_id = get_current_tenant_id()
|
||||
@@ -362,32 +376,41 @@ def restore_session(
|
||||
lock_key = f"sandbox_restore:{sandbox.id}"
|
||||
lock = redis_client.lock(lock_key, timeout=RESTORE_LOCK_TIMEOUT_SECONDS)
|
||||
|
||||
# blocking=True means wait if another restore is in progress
|
||||
acquired = lock.acquire(
|
||||
blocking=True, blocking_timeout=RESTORE_LOCK_TIMEOUT_SECONDS
|
||||
)
|
||||
# Non-blocking: if another restore is already running, return 409 immediately
|
||||
# instead of making the user wait. The frontend will retry.
|
||||
logger.info(f"[RESTORE] Acquiring Redis lock: {lock_key}")
|
||||
acquired = lock.acquire(blocking=False)
|
||||
if not acquired:
|
||||
logger.info(f"[RESTORE] Lock {lock_key} held by another request, returning 409")
|
||||
raise HTTPException(
|
||||
status_code=503,
|
||||
detail="Restore operation timed out waiting for lock",
|
||||
status_code=409,
|
||||
detail="Restore already in progress",
|
||||
)
|
||||
|
||||
logger.info("[RESTORE] Lock acquired, proceeding with restore")
|
||||
|
||||
try:
|
||||
# Re-fetch sandbox status (may have changed while waiting for lock)
|
||||
db_session.refresh(sandbox)
|
||||
logger.info(f"[RESTORE] After refresh: sandbox status={sandbox.status}")
|
||||
|
||||
# Also re-check if session workspace exists (another request may have
|
||||
# restored it while we were waiting)
|
||||
if sandbox.status == SandboxStatus.RUNNING:
|
||||
# Verify pod is healthy before proceeding
|
||||
logger.info("[RESTORE] Sandbox is RUNNING, checking health...")
|
||||
is_healthy = sandbox_manager.health_check(sandbox.id, timeout=10.0)
|
||||
logger.info(f"[RESTORE] Health check result: {is_healthy}")
|
||||
if is_healthy and sandbox_manager.session_workspace_exists(
|
||||
sandbox.id, session_id
|
||||
):
|
||||
logger.info(
|
||||
f"Session {session_id} workspace was restored by another request"
|
||||
f"[RESTORE] Session {session_id} workspace already exists, "
|
||||
f"returning early"
|
||||
)
|
||||
# Update heartbeat to mark sandbox as active
|
||||
# Ensure session is marked ACTIVE (may still be IDLE from sleep)
|
||||
if session.status != BuildSessionStatus.ACTIVE:
|
||||
session.status = BuildSessionStatus.ACTIVE
|
||||
update_sandbox_heartbeat(db_session, sandbox.id)
|
||||
base_response = SessionResponse.from_model(session, sandbox)
|
||||
return DetailedSessionResponse.from_session_response(
|
||||
@@ -413,14 +436,27 @@ def restore_session(
|
||||
|
||||
if sandbox.status in (SandboxStatus.SLEEPING, SandboxStatus.TERMINATED):
|
||||
# 1. Re-provision the pod
|
||||
logger.info(f"Re-provisioning {sandbox.status.value} sandbox {sandbox.id}")
|
||||
logger.info(
|
||||
f"[RESTORE] Sandbox is {sandbox.status.value}, re-provisioning..."
|
||||
)
|
||||
|
||||
# Mark as PROVISIONING before the long-running provision() call
|
||||
# so other requests know work is in progress
|
||||
update_sandbox_status__no_commit(
|
||||
db_session, sandbox.id, SandboxStatus.PROVISIONING
|
||||
)
|
||||
db_session.commit()
|
||||
db_session.refresh(sandbox)
|
||||
|
||||
llm_config = session_manager._get_llm_config(None, None)
|
||||
logger.info(f"[RESTORE] Calling provision() for sandbox {sandbox.id}")
|
||||
sandbox_manager.provision(
|
||||
sandbox_id=sandbox.id,
|
||||
user_id=user.id,
|
||||
tenant_id=tenant_id,
|
||||
llm_config=llm_config,
|
||||
)
|
||||
logger.info("[RESTORE] Provision complete, marking RUNNING")
|
||||
update_sandbox_status__no_commit(
|
||||
db_session, sandbox.id, SandboxStatus.RUNNING
|
||||
)
|
||||
@@ -429,17 +465,36 @@ def restore_session(
|
||||
|
||||
# 2. Check if session workspace needs to be loaded
|
||||
if sandbox.status == SandboxStatus.RUNNING:
|
||||
if not sandbox_manager.session_workspace_exists(sandbox.id, session_id):
|
||||
# Get latest snapshot and restore it
|
||||
snapshot = get_latest_snapshot_for_session(db_session, session_id)
|
||||
logger.info("[RESTORE] Checking if session workspace exists in pod...")
|
||||
workspace_exists = sandbox_manager.session_workspace_exists(
|
||||
sandbox.id, session_id
|
||||
)
|
||||
logger.info(f"[RESTORE] Workspace exists: {workspace_exists}")
|
||||
|
||||
if not workspace_exists:
|
||||
# Only Kubernetes backend supports snapshot restoration
|
||||
snapshot = None
|
||||
if SANDBOX_BACKEND == SandboxBackend.KUBERNETES:
|
||||
snapshot = get_latest_snapshot_for_session(db_session, session_id)
|
||||
logger.info(
|
||||
f"[RESTORE] Snapshot lookup: "
|
||||
f"{'found ' + snapshot.storage_path if snapshot else 'NONE'}"
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"[RESTORE] Backend is {SANDBOX_BACKEND}, skipping snapshot"
|
||||
)
|
||||
|
||||
llm_config = session_manager._get_llm_config(None, None)
|
||||
|
||||
if snapshot:
|
||||
# Allocate a new port for the restored session
|
||||
new_port = allocate_nextjs_port(db_session)
|
||||
session.nextjs_port = new_port
|
||||
# Commit port allocation before the long-running restore
|
||||
db_session.commit()
|
||||
|
||||
logger.info(
|
||||
f"Restoring snapshot for session {session_id} "
|
||||
f"[RESTORE] Restoring snapshot for session {session_id} "
|
||||
f"from {snapshot.storage_path} with port {new_port}"
|
||||
)
|
||||
|
||||
@@ -450,32 +505,48 @@ def restore_session(
|
||||
snapshot_storage_path=snapshot.storage_path,
|
||||
tenant_id=tenant_id,
|
||||
nextjs_port=new_port,
|
||||
llm_config=llm_config,
|
||||
use_demo_data=session.demo_data_enabled,
|
||||
)
|
||||
logger.info(
|
||||
f"[RESTORE] Snapshot restore succeeded for {session_id}"
|
||||
)
|
||||
session.status = BuildSessionStatus.ACTIVE
|
||||
db_session.commit()
|
||||
except Exception as e:
|
||||
# Clear the port allocation on failure so it can be reused
|
||||
logger.error(
|
||||
f"Failed to restore session {session_id}, "
|
||||
f"clearing port {new_port}: {e}"
|
||||
f"[RESTORE] Snapshot restore FAILED for {session_id}, "
|
||||
f"clearing port {new_port}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
session.nextjs_port = None
|
||||
db_session.commit()
|
||||
raise
|
||||
else:
|
||||
# No snapshot - set up fresh workspace
|
||||
# No snapshot or local backend - set up fresh workspace
|
||||
logger.info(
|
||||
f"No snapshot found for session {session_id}, "
|
||||
f"setting up fresh workspace"
|
||||
f"[RESTORE] No snapshot, setting up fresh workspace "
|
||||
f"for session {session_id}"
|
||||
)
|
||||
llm_config = session_manager._get_llm_config(None, None)
|
||||
if not session.nextjs_port:
|
||||
session.nextjs_port = allocate_nextjs_port(db_session)
|
||||
|
||||
sandbox_manager.setup_session_workspace(
|
||||
sandbox_id=sandbox.id,
|
||||
session_id=session_id,
|
||||
llm_config=llm_config,
|
||||
nextjs_port=session.nextjs_port or 3010,
|
||||
nextjs_port=session.nextjs_port,
|
||||
)
|
||||
session.status = BuildSessionStatus.ACTIVE
|
||||
db_session.commit()
|
||||
else:
|
||||
logger.warning(
|
||||
f"[RESTORE] Sandbox status is {sandbox.status} after "
|
||||
f"re-provision block, expected RUNNING"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to restore session {session_id}: {e}", exc_info=True)
|
||||
logger.error(f"[RESTORE] FAILED for session {session_id}: {e}", exc_info=True)
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Failed to restore session: {e}",
|
||||
@@ -483,10 +554,16 @@ def restore_session(
|
||||
finally:
|
||||
if lock.owned():
|
||||
lock.release()
|
||||
logger.info(f"[RESTORE] Released lock {lock_key}")
|
||||
|
||||
# Update heartbeat to mark sandbox as active after successful restore
|
||||
update_sandbox_heartbeat(db_session, sandbox.id)
|
||||
|
||||
logger.info(
|
||||
f"[RESTORE] === Restore complete for session {session_id}, "
|
||||
f"sandbox={sandbox.id}, status={sandbox.status} ==="
|
||||
)
|
||||
|
||||
base_response = SessionResponse.from_model(session, sandbox)
|
||||
return DetailedSessionResponse.from_session_response(
|
||||
base_response, session_loaded_in_sandbox=True
|
||||
|
||||
@@ -18,7 +18,6 @@ from onyx.db.models import BuildMessage
|
||||
from onyx.db.models import BuildSession
|
||||
from onyx.db.models import LLMProvider as LLMProviderModel
|
||||
from onyx.db.models import Sandbox
|
||||
from onyx.db.models import Snapshot
|
||||
from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_END
|
||||
from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_START
|
||||
from onyx.server.manage.llm.models import LLMProviderView
|
||||
@@ -269,27 +268,6 @@ def update_artifact(
|
||||
logger.info(f"Updated artifact {artifact_id}")
|
||||
|
||||
|
||||
# Snapshot operations
|
||||
def create_snapshot(
|
||||
session_id: UUID,
|
||||
storage_path: str,
|
||||
size_bytes: int,
|
||||
db_session: Session,
|
||||
) -> Snapshot:
|
||||
"""Create a new snapshot record."""
|
||||
snapshot = Snapshot(
|
||||
session_id=session_id,
|
||||
storage_path=storage_path,
|
||||
size_bytes=size_bytes,
|
||||
)
|
||||
db_session.add(snapshot)
|
||||
db_session.commit()
|
||||
db_session.refresh(snapshot)
|
||||
|
||||
logger.info(f"Created snapshot {snapshot.id} for session {session_id}")
|
||||
return snapshot
|
||||
|
||||
|
||||
# Message operations
|
||||
def create_message(
|
||||
session_id: UUID,
|
||||
@@ -501,6 +479,32 @@ def allocate_nextjs_port(db_session: Session) -> int:
|
||||
)
|
||||
|
||||
|
||||
def mark_user_sessions_idle__no_commit(db_session: Session, user_id: UUID) -> int:
    """Flip every ACTIVE build session owned by a user to IDLE.

    Meant to be called when a sandbox goes to sleep, so the frontend can
    tell that these sessions need restoration before they are usable again.

    The change is applied as one bulk UPDATE and the session is flushed,
    but nothing is committed here; the caller owns the transaction.

    Args:
        db_session: Database session
        user_id: The user whose sessions should be marked idle

    Returns:
        Number of sessions updated
    """
    # Select only this user's sessions that are currently ACTIVE.
    active_sessions_for_user = db_session.query(BuildSession).filter(
        BuildSession.user_id == user_id,
        BuildSession.status == BuildSessionStatus.ACTIVE,
    )

    # Query.update() returns the number of rows matched by the UPDATE.
    updated_count = active_sessions_for_user.update(
        {BuildSession.status: BuildSessionStatus.IDLE}
    )
    db_session.flush()

    logger.info(f"Marked {updated_count} sessions as IDLE for user {user_id}")
    return updated_count
|
||||
|
||||
|
||||
def clear_nextjs_ports_for_user(db_session: Session, user_id: UUID) -> int:
|
||||
"""Clear nextjs_port for all sessions belonging to a user.
|
||||
|
||||
|
||||
@@ -126,7 +126,7 @@ def get_idle_sandboxes(
|
||||
)
|
||||
|
||||
stmt = select(Sandbox).where(
|
||||
Sandbox.status.in_([SandboxStatus.RUNNING, SandboxStatus.IDLE]),
|
||||
Sandbox.status == SandboxStatus.RUNNING,
|
||||
or_(
|
||||
Sandbox.last_heartbeat < threshold_time,
|
||||
and_(
|
||||
@@ -147,27 +147,30 @@ def get_running_sandbox_count_by_tenant(
|
||||
since Sandbox model no longer has tenant_id. This function returns
|
||||
the count of all running sandboxes.
|
||||
"""
|
||||
stmt = select(func.count(Sandbox.id)).where(
|
||||
Sandbox.status.in_([SandboxStatus.RUNNING, SandboxStatus.IDLE])
|
||||
)
|
||||
stmt = select(func.count(Sandbox.id)).where(Sandbox.status == SandboxStatus.RUNNING)
|
||||
result = db_session.execute(stmt).scalar()
|
||||
return result or 0
|
||||
|
||||
|
||||
def create_snapshot(
|
||||
def create_snapshot__no_commit(
|
||||
db_session: Session,
|
||||
session_id: UUID,
|
||||
storage_path: str,
|
||||
size_bytes: int,
|
||||
) -> Snapshot:
|
||||
"""Create a snapshot record for a session."""
|
||||
"""Create a snapshot record for a session.
|
||||
|
||||
NOTE: Uses flush() instead of commit(). The caller (cleanup task) is
|
||||
responsible for committing after all snapshots + status updates are done,
|
||||
so the entire operation is atomic.
|
||||
"""
|
||||
snapshot = Snapshot(
|
||||
session_id=session_id,
|
||||
storage_path=storage_path,
|
||||
size_bytes=size_bytes,
|
||||
)
|
||||
db_session.add(snapshot)
|
||||
db_session.commit()
|
||||
db_session.flush()
|
||||
return snapshot
|
||||
|
||||
|
||||
|
||||
@@ -183,13 +183,14 @@ class SandboxManager(ABC):
|
||||
session_id: UUID,
|
||||
tenant_id: str,
|
||||
) -> SnapshotResult | None:
|
||||
"""Create a snapshot of a session's outputs directory.
|
||||
"""Create a snapshot of a session's outputs and attachments directories.
|
||||
|
||||
Captures only the session-specific outputs:
|
||||
sessions/$session_id/outputs/
|
||||
Captures session-specific user data:
|
||||
- sessions/$session_id/outputs/ (generated artifacts, web apps)
|
||||
- sessions/$session_id/attachments/ (user uploaded files)
|
||||
|
||||
Does NOT include: venv, skills, AGENTS.md, opencode.json, attachments
|
||||
Does NOT include: shared files/ directory
|
||||
Does NOT include: venv, skills, AGENTS.md, opencode.json, files symlink
|
||||
(these are regenerated during restore)
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
@@ -197,14 +198,45 @@ class SandboxManager(ABC):
|
||||
tenant_id: Tenant identifier for storage path
|
||||
|
||||
Returns:
|
||||
SnapshotResult with storage path and size, or None if
|
||||
snapshots are disabled for this backend
|
||||
SnapshotResult with storage path and size, or None if:
|
||||
- Snapshots are disabled for this backend
|
||||
- No outputs directory exists (nothing to snapshot)
|
||||
|
||||
Raises:
|
||||
RuntimeError: If snapshot creation fails
|
||||
"""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def restore_snapshot(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
session_id: UUID,
|
||||
snapshot_storage_path: str,
|
||||
tenant_id: str,
|
||||
nextjs_port: int,
|
||||
llm_config: LLMProviderConfig,
|
||||
use_demo_data: bool = False,
|
||||
) -> None:
|
||||
"""Restore a session workspace from a snapshot.
|
||||
|
||||
For Kubernetes: Downloads and extracts the snapshot, regenerates config files.
|
||||
For Local: No-op since workspaces persist on disk (no snapshots).
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
session_id: The session ID to restore
|
||||
snapshot_storage_path: Path to the snapshot in storage
|
||||
tenant_id: Tenant identifier for storage access
|
||||
nextjs_port: Port number for the NextJS dev server
|
||||
llm_config: LLM provider configuration for opencode.json
|
||||
use_demo_data: If True, symlink files/ to demo data
|
||||
|
||||
Raises:
|
||||
RuntimeError: If snapshot restoration fails
|
||||
"""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def session_workspace_exists(
|
||||
self,
|
||||
@@ -225,36 +257,6 @@ class SandboxManager(ABC):
|
||||
"""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def restore_snapshot(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
session_id: UUID,
|
||||
snapshot_storage_path: str,
|
||||
tenant_id: str,
|
||||
nextjs_port: int,
|
||||
) -> None:
|
||||
"""Restore a snapshot into a session's workspace directory.
|
||||
|
||||
Downloads the snapshot from storage, extracts it into
|
||||
sessions/$session_id/outputs/, and starts the NextJS server.
|
||||
|
||||
For Kubernetes backend, this downloads from S3 and streams
|
||||
into the pod via kubectl exec (since the pod has no S3 access).
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
session_id: The session ID to restore
|
||||
snapshot_storage_path: Path to the snapshot in storage
|
||||
tenant_id: Tenant identifier for storage access
|
||||
nextjs_port: Port number for the NextJS dev server
|
||||
|
||||
Raises:
|
||||
RuntimeError: If snapshot restoration fails
|
||||
FileNotFoundError: If snapshot does not exist
|
||||
"""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def health_check(self, sandbox_id: UUID, timeout: float = 60.0) -> bool:
|
||||
"""Check if the sandbox is healthy.
|
||||
|
||||
@@ -65,7 +65,6 @@ from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_END
|
||||
from onyx.server.features.build.configs import SANDBOX_NEXTJS_PORT_START
|
||||
from onyx.server.features.build.configs import SANDBOX_S3_BUCKET
|
||||
from onyx.server.features.build.configs import SANDBOX_SERVICE_ACCOUNT_NAME
|
||||
from onyx.server.features.build.s3.s3_client import build_s3_client
|
||||
from onyx.server.features.build.sandbox.base import SandboxManager
|
||||
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
|
||||
ACPEvent,
|
||||
@@ -409,6 +408,10 @@ done
|
||||
],
|
||||
volume_mounts=[
|
||||
client.V1VolumeMount(name="files", mount_path="/workspace/files"),
|
||||
# Mount sessions directory so file-sync can create snapshots
|
||||
client.V1VolumeMount(
|
||||
name="workspace", mount_path="/workspace/sessions"
|
||||
),
|
||||
],
|
||||
resources=client.V1ResourceRequirements(
|
||||
# Reduced resources since sidecar is mostly idle (sleeping)
|
||||
@@ -442,6 +445,10 @@ done
|
||||
client.V1VolumeMount(
|
||||
name="files", mount_path="/workspace/files", read_only=True
|
||||
),
|
||||
# Mount sessions directory (shared with file-sync for snapshots)
|
||||
client.V1VolumeMount(
|
||||
name="workspace", mount_path="/workspace/sessions"
|
||||
),
|
||||
],
|
||||
resources=client.V1ResourceRequirements(
|
||||
requests={"cpu": "500m", "memory": "1Gi"},
|
||||
@@ -583,6 +590,104 @@ done
|
||||
),
|
||||
)
|
||||
|
||||
def _ensure_service_exists(
    self,
    sandbox_id: UUID,
    tenant_id: str,
) -> None:
    """Ensure a ClusterIP service exists for the sandbox pod.

    Three cases are handled:

    - The service exists and is not being deleted: nothing to do.
    - The service exists but has a deletion_timestamp (Terminating):
      wait for the deletion to finish, then create a fresh service.
      This prevents a race where provision reuses an existing pod while
      the old service is still being deleted.
    - The service does not exist (404 on lookup): create it.

    Both creation cases tolerate a 409 (AlreadyExists) response, which
    means a concurrent request created the service first.

    Args:
        sandbox_id: The sandbox whose service should exist
        tenant_id: Tenant identifier passed through to the service spec

    Raises:
        ApiException: If the lookup fails with anything other than 404,
            or the creation fails with anything other than 409
    """
    service_name = self._get_service_name(str(sandbox_id))

    # Keep the try body limited to the lookup so that errors raised while
    # (re)creating the service are never mistaken for a failed lookup.
    try:
        existing = self._core_api.read_namespaced_service(
            name=service_name,
            namespace=self._namespace,
        )
    except ApiException as e:
        if e.status != 404:
            raise
        existing = None

    recreating = existing is not None

    if existing is None:
        logger.info(f"Creating missing Service {service_name}")
    elif existing.metadata.deletion_timestamp:
        logger.info(f"Service {service_name} is terminating, waiting for deletion")
        try:
            self._wait_for_resource_deletion("service", service_name)
        except ApiException as e:
            # A 404 while waiting means the service is already gone, which
            # is the state being waited for.
            # NOTE(review): whether the helper can let a 404 escape is not
            # visible here; kept so such a 404 still leads to creation.
            if e.status != 404:
                raise
    else:
        logger.debug(f"Service {service_name} already exists and is active")
        return

    service = self._create_sandbox_service(sandbox_id, tenant_id)
    try:
        self._core_api.create_namespaced_service(
            namespace=self._namespace,
            body=service,
        )
    except ApiException as e:
        if e.status != 409:  # Ignore AlreadyExists
            raise
        logger.debug(f"Service {service_name} was created by another request")
        return

    if recreating:
        logger.info(f"Recreated Service {service_name} after termination")
|
||||
|
||||
def _wait_for_nextjs_ready(
    self,
    pod_name: str,
    port: int,
    timeout_seconds: int = 30,
) -> None:
    """Best-effort wait for the NextJS dev server to answer on a port.

    Execs a shell loop in the pod's "sandbox" container. The loop makes up
    to ``timeout_seconds`` curl attempts against http://localhost:<port>/,
    each capped at one second and followed by a one-second sleep, so the
    wall-clock wait can approach twice ``timeout_seconds``.

    This method never raises. If the server does not respond, or the exec
    itself fails, a warning is logged and the caller continues; the
    frontend is expected to retry on its own.

    Args:
        pod_name: Name of the pod to exec into
        port: Port the NextJS dev server should be listening on
        timeout_seconds: Number of probe attempts before giving up
    """
    # The script prints NEXTJS_READY on the first successful curl and
    # NEXTJS_TIMEOUT once every attempt has been used up.
    probe_script = "".join(
        (
            f"for i in $(seq 1 {timeout_seconds}); do ",
            f" if curl -s -o /dev/null -w '' http://localhost:{port}/ --max-time 1 2>/dev/null; then ",
            ' echo "NEXTJS_READY"; exit 0; ',
            " fi; ",
            " sleep 1; ",
            "done; ",
            'echo "NEXTJS_TIMEOUT"',
        )
    )

    try:
        exec_output = k8s_stream(
            self._stream_core_api.connect_get_namespaced_pod_exec,
            name=pod_name,
            namespace=self._namespace,
            container="sandbox",
            command=["/bin/sh", "-c", probe_script],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
        )
        # The check stays inside the try so any failure while inspecting
        # the output is also downgraded to a warning.
        if "NEXTJS_READY" not in exec_output:
            logger.warning(
                f"[SNAPSHOT_RESTORE] NextJS not ready after {timeout_seconds}s "
                f"on port {port}, continuing anyway"
            )
        else:
            logger.info(f"[SNAPSHOT_RESTORE] NextJS ready on port {port}")
    except Exception as e:
        logger.warning(f"[SNAPSHOT_RESTORE] Failed to check NextJS readiness: {e}")
|
||||
|
||||
def _get_init_container_logs(self, pod_name: str, container_name: str) -> str:
|
||||
"""Get logs from an init container.
|
||||
|
||||
@@ -798,34 +903,15 @@ done
|
||||
)
|
||||
|
||||
pod_name = self._get_pod_name(str(sandbox_id))
|
||||
service_name = self._get_service_name(str(sandbox_id))
|
||||
self._get_service_name(str(sandbox_id))
|
||||
|
||||
# Check if pod already exists and is healthy (idempotency check)
|
||||
if self._pod_exists_and_healthy(pod_name):
|
||||
logger.info(
|
||||
f"Pod {pod_name} already exists and is healthy, reusing existing pod"
|
||||
)
|
||||
# Ensure service exists too
|
||||
try:
|
||||
self._core_api.read_namespaced_service(
|
||||
name=service_name,
|
||||
namespace=self._namespace,
|
||||
)
|
||||
except ApiException as e:
|
||||
if e.status == 404:
|
||||
# Service doesn't exist, create it
|
||||
logger.debug(f"Creating missing Service {service_name}")
|
||||
service = self._create_sandbox_service(sandbox_id, tenant_id)
|
||||
try:
|
||||
self._core_api.create_namespaced_service(
|
||||
namespace=self._namespace,
|
||||
body=service,
|
||||
)
|
||||
except ApiException as svc_e:
|
||||
if svc_e.status != 409: # Ignore AlreadyExists
|
||||
raise
|
||||
else:
|
||||
raise
|
||||
# Ensure service exists and is not terminating
|
||||
self._ensure_service_exists(sandbox_id, tenant_id)
|
||||
|
||||
# Wait for pod to be ready if it's still pending
|
||||
logger.info(f"Waiting for existing pod {pod_name} to become ready...")
|
||||
@@ -880,20 +966,8 @@ done
|
||||
else:
|
||||
raise
|
||||
|
||||
# 2. Create Service (idempotent - ignore 409)
|
||||
logger.debug(f"Creating Service {service_name}")
|
||||
service = self._create_sandbox_service(sandbox_id, tenant_id)
|
||||
try:
|
||||
self._core_api.create_namespaced_service(
|
||||
namespace=self._namespace,
|
||||
body=service,
|
||||
)
|
||||
except ApiException as e:
|
||||
if e.status != 409: # Ignore AlreadyExists
|
||||
raise
|
||||
logger.warning(
|
||||
f"During provisioning, discovered that service {service_name} already exists. Reusing"
|
||||
)
|
||||
# 2. Create Service (handles terminating services)
|
||||
self._ensure_service_exists(sandbox_id, tenant_id)
|
||||
|
||||
# 3. Wait for pod to be ready
|
||||
logger.info(f"Waiting for pod {pod_name} to become ready...")
|
||||
@@ -1335,10 +1409,12 @@ echo "Session cleanup complete"
|
||||
session_id: UUID,
|
||||
tenant_id: str,
|
||||
) -> SnapshotResult | None:
|
||||
"""Create a snapshot of a session's outputs directory.
|
||||
"""Create a snapshot of a session's outputs and attachments directories.
|
||||
|
||||
For Kubernetes backend, we exec into the pod to create the snapshot.
|
||||
Only captures sessions/$session_id/outputs/
|
||||
For Kubernetes backend, we exec into the file-sync container to create
|
||||
the snapshot and upload to S3. Captures:
|
||||
- sessions/$session_id/outputs/ (generated artifacts, web apps)
|
||||
- sessions/$session_id/attachments/ (user uploaded files)
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
@@ -1346,7 +1422,7 @@ echo "Session cleanup complete"
|
||||
tenant_id: Tenant identifier for storage path
|
||||
|
||||
Returns:
|
||||
SnapshotResult with storage path and size
|
||||
SnapshotResult with storage path and size, or None if nothing to snapshot
|
||||
|
||||
Raises:
|
||||
RuntimeError: If snapshot creation fails
|
||||
@@ -1356,26 +1432,40 @@ echo "Session cleanup complete"
|
||||
pod_name = self._get_pod_name(sandbox_id_str)
|
||||
snapshot_id = str(uuid4())
|
||||
|
||||
session_path = f"/workspace/sessions/{session_id_str}"
|
||||
# Use shlex.quote for safety (UUIDs are safe but good practice)
|
||||
safe_session_path = shlex.quote(f"/workspace/sessions/{session_id_str}")
|
||||
s3_path = (
|
||||
f"s3://{self._s3_bucket}/{tenant_id}/snapshots/"
|
||||
f"{session_id_str}/{snapshot_id}.tar.gz"
|
||||
)
|
||||
|
||||
# Exec into pod to create and upload snapshot (session outputs only)
|
||||
# Exec into pod to create and upload snapshot (outputs + attachments)
|
||||
# Uses s5cmd pipe to stream tar.gz directly to S3
|
||||
# Only snapshot if outputs/ exists. Include attachments/ only if non-empty.
|
||||
exec_command = [
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
f'tar -czf - -C {session_path} outputs | aws s3 cp - {s3_path} --tagging "Type=snapshot"',
|
||||
f"""
|
||||
set -eo pipefail
|
||||
cd {safe_session_path}
|
||||
if [ ! -d outputs ]; then
|
||||
echo "EMPTY_SNAPSHOT"
|
||||
exit 0
|
||||
fi
|
||||
dirs="outputs"
|
||||
[ -d attachments ] && [ "$(ls -A attachments 2>/dev/null)" ] && dirs="$dirs attachments"
|
||||
tar -czf - $dirs | /s5cmd pipe {s3_path}
|
||||
echo "SNAPSHOT_CREATED"
|
||||
""",
|
||||
]
|
||||
|
||||
try:
|
||||
# Use exec to run snapshot command in sandbox container
|
||||
# Use exec to run snapshot command in file-sync container (has s5cmd)
|
||||
resp = k8s_stream(
|
||||
self._stream_core_api.connect_get_namespaced_pod_exec,
|
||||
name=pod_name,
|
||||
namespace=self._namespace,
|
||||
container="sandbox",
|
||||
container="file-sync",
|
||||
command=exec_command,
|
||||
stderr=True,
|
||||
stdin=False,
|
||||
@@ -1385,6 +1475,17 @@ echo "Session cleanup complete"
|
||||
|
||||
logger.debug(f"Snapshot exec output: {resp}")
|
||||
|
||||
# Check if nothing was snapshotted
|
||||
if "EMPTY_SNAPSHOT" in resp:
|
||||
logger.info(
|
||||
f"No outputs or attachments to snapshot for session {session_id}"
|
||||
)
|
||||
return None
|
||||
|
||||
# Verify upload succeeded
|
||||
if "SNAPSHOT_CREATED" not in resp:
|
||||
raise RuntimeError(f"Snapshot upload may have failed. Output: {resp}")
|
||||
|
||||
except ApiException as e:
|
||||
raise RuntimeError(f"Failed to create snapshot: {e}") from e
|
||||
|
||||
@@ -1392,9 +1493,8 @@ echo "Session cleanup complete"
|
||||
# In production, you might want to query S3 for the actual size
|
||||
size_bytes = 0
|
||||
|
||||
storage_path = (
|
||||
f"sandbox-snapshots/{tenant_id}/{session_id_str}/{snapshot_id}.tar.gz"
|
||||
)
|
||||
# Storage path must match the S3 upload path (without s3://bucket/ prefix)
|
||||
storage_path = f"{tenant_id}/snapshots/{session_id_str}/{snapshot_id}.tar.gz"
|
||||
|
||||
logger.info(f"Created snapshot for session {session_id}")
|
||||
|
||||
@@ -1426,7 +1526,7 @@ echo "Session cleanup complete"
|
||||
exec_command = [
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
f'[ -d "{session_path}" ] && echo "EXISTS" || echo "NOT_EXISTS"',
|
||||
f'[ -d "{session_path}" ] && echo "WORKSPACE_FOUND" || echo "WORKSPACE_MISSING"',
|
||||
]
|
||||
|
||||
try:
|
||||
@@ -1442,7 +1542,12 @@ echo "Session cleanup complete"
|
||||
tty=False,
|
||||
)
|
||||
|
||||
return "EXISTS" in resp
|
||||
result = "WORKSPACE_FOUND" in resp
|
||||
logger.info(
|
||||
f"[WORKSPACE_CHECK] session={session_id}, "
|
||||
f"path={session_path}, raw_resp={resp!r}, result={result}"
|
||||
)
|
||||
return result
|
||||
|
||||
except ApiException as e:
|
||||
logger.warning(
|
||||
@@ -1457,14 +1562,21 @@ echo "Session cleanup complete"
|
||||
snapshot_storage_path: str,
|
||||
tenant_id: str, # noqa: ARG002
|
||||
nextjs_port: int,
|
||||
llm_config: LLMProviderConfig,
|
||||
use_demo_data: bool = False,
|
||||
) -> None:
|
||||
"""Download snapshot from S3, extract into session workspace, and start NextJS.
|
||||
"""Download snapshot from S3 via s5cmd, extract, regenerate config, and start NextJS.
|
||||
|
||||
Since the sandbox pod doesn't have S3 access, this method:
|
||||
1. Downloads snapshot from S3 (using boto3 directly)
|
||||
2. Creates the session directory structure in pod
|
||||
3. Streams the tar.gz into the pod via kubectl exec
|
||||
4. Starts the NextJS dev server
|
||||
Uses the file-sync sidecar container (which has s5cmd + S3 credentials
|
||||
via IRSA) to stream the snapshot directly from S3 into the session
|
||||
directory. This avoids downloading to the backend server and the
|
||||
base64 encoding overhead of piping through kubectl exec.
|
||||
|
||||
Steps:
|
||||
1. Exec s5cmd cat in file-sync container to stream snapshot from S3
|
||||
2. Pipe directly to tar for extraction in the shared workspace volume
|
||||
3. Regenerate configuration files (AGENTS.md, opencode.json, files symlink)
|
||||
4. Start the NextJS dev server
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
@@ -1472,93 +1584,82 @@ echo "Session cleanup complete"
|
||||
snapshot_storage_path: Path to the snapshot in S3 (relative path)
|
||||
tenant_id: Tenant identifier for storage access
|
||||
nextjs_port: Port number for the NextJS dev server
|
||||
llm_config: LLM provider configuration for opencode.json
|
||||
use_demo_data: If True, symlink files/ to demo data; else to user files
|
||||
|
||||
Raises:
|
||||
RuntimeError: If snapshot restoration fails
|
||||
FileNotFoundError: If snapshot does not exist
|
||||
"""
|
||||
import tempfile
|
||||
|
||||
pod_name = self._get_pod_name(str(sandbox_id))
|
||||
session_path = f"/workspace/sessions/{session_id}"
|
||||
safe_session_path = shlex.quote(session_path)
|
||||
|
||||
# Build full S3 path
|
||||
s3_key = snapshot_storage_path
|
||||
s3_path = f"s3://{self._s3_bucket}/{snapshot_storage_path}"
|
||||
|
||||
logger.info(f"Restoring snapshot for session {session_id} from {s3_key}")
|
||||
logger.info(
|
||||
f"[SNAPSHOT_RESTORE] pod={pod_name}, session={session_id}, "
|
||||
f"s3_path={s3_path}, bucket={self._s3_bucket}, "
|
||||
f"storage_path={snapshot_storage_path}"
|
||||
)
|
||||
|
||||
# Download snapshot from S3 - uses IAM roles (IRSA)
|
||||
s3_client = build_s3_client()
|
||||
tmp_path: str | None = None
|
||||
# Stream snapshot directly from S3 via s5cmd in file-sync container.
|
||||
# Mirrors the upload pattern: upload uses `tar | s5cmd pipe`,
|
||||
# restore uses `s5cmd cat | tar`. Both run in file-sync container
|
||||
# which has s5cmd and S3 credentials (IRSA). The shared workspace
|
||||
# volume makes extracted files immediately visible to the sandbox
|
||||
# container.
|
||||
restore_script = f"""
|
||||
set -eo pipefail
|
||||
echo "DEBUG: Starting restore to {safe_session_path}"
|
||||
echo "DEBUG: S3 path = {s3_path}"
|
||||
mkdir -p {safe_session_path}
|
||||
echo "DEBUG: mkdir done, running s5cmd cat..."
|
||||
/s5cmd cat {s3_path} | tar -xzf - -C {safe_session_path}
|
||||
echo "DEBUG: tar extraction done"
|
||||
echo "DEBUG: Contents of {safe_session_path}:"
|
||||
ls -la {safe_session_path}/
|
||||
echo "SNAPSHOT_RESTORED"
|
||||
"""
|
||||
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(
|
||||
suffix=".tar.gz", delete=False
|
||||
) as tmp_file:
|
||||
tmp_path = tmp_file.name
|
||||
|
||||
try:
|
||||
s3_client.download_file(self._s3_bucket, s3_key, tmp_path)
|
||||
except s3_client.exceptions.NoSuchKey:
|
||||
raise FileNotFoundError(
|
||||
f"Snapshot not found: s3://{self._s3_bucket}/{s3_key}"
|
||||
)
|
||||
|
||||
# Create session directory structure in pod
|
||||
# Use shlex.quote to prevent shell injection
|
||||
safe_session_path = shlex.quote(session_path)
|
||||
setup_script = f"""
|
||||
set -e
|
||||
mkdir -p {safe_session_path}/outputs
|
||||
"""
|
||||
k8s_stream(
|
||||
self._stream_core_api.connect_get_namespaced_pod_exec,
|
||||
name=pod_name,
|
||||
namespace=self._namespace,
|
||||
container="sandbox",
|
||||
command=["/bin/sh", "-c", setup_script],
|
||||
stderr=True,
|
||||
stdin=False,
|
||||
stdout=True,
|
||||
tty=False,
|
||||
logger.info(
|
||||
"[SNAPSHOT_RESTORE] Executing restore script in file-sync container"
|
||||
)
|
||||
|
||||
# Stream tar.gz into pod and extract
|
||||
# We use kubectl exec with stdin to pipe the tar file
|
||||
with open(tmp_path, "rb") as tar_file:
|
||||
tar_data = tar_file.read()
|
||||
|
||||
# Use base64 encoding to safely transfer binary data
|
||||
import base64
|
||||
|
||||
tar_b64 = base64.b64encode(tar_data).decode("ascii")
|
||||
|
||||
# Extract in the session directory (tar was created with outputs/ as root)
|
||||
extract_script = f"""
|
||||
set -e
|
||||
cd {safe_session_path}
|
||||
echo '{tar_b64}' | base64 -d | tar -xzf -
|
||||
"""
|
||||
resp = k8s_stream(
|
||||
self._stream_core_api.connect_get_namespaced_pod_exec,
|
||||
name=pod_name,
|
||||
namespace=self._namespace,
|
||||
container="sandbox",
|
||||
command=["/bin/sh", "-c", extract_script],
|
||||
container="file-sync",
|
||||
command=["/bin/sh", "-c", restore_script],
|
||||
stderr=True,
|
||||
stdin=False,
|
||||
stdout=True,
|
||||
tty=False,
|
||||
)
|
||||
|
||||
logger.debug(f"Snapshot restore output: {resp}")
|
||||
logger.info(f"Restored snapshot for session {session_id}")
|
||||
logger.info(f"[SNAPSHOT_RESTORE] Script output: {resp}")
|
||||
|
||||
if "SNAPSHOT_RESTORED" not in resp:
|
||||
raise RuntimeError(f"Snapshot restore may have failed. Output: {resp}")
|
||||
|
||||
logger.info("[SNAPSHOT_RESTORE] Extraction succeeded, regenerating config")
|
||||
|
||||
# Regenerate configuration files that aren't in the snapshot
|
||||
# These are regenerated to ensure they match the current system state
|
||||
self._regenerate_session_config(
|
||||
pod_name=pod_name,
|
||||
session_path=safe_session_path,
|
||||
llm_config=llm_config,
|
||||
nextjs_port=nextjs_port,
|
||||
use_demo_data=use_demo_data,
|
||||
)
|
||||
logger.info("[SNAPSHOT_RESTORE] Config regenerated, starting NextJS")
|
||||
|
||||
# Start NextJS dev server (check node_modules since restoring from snapshot)
|
||||
start_script = _build_nextjs_start_script(
|
||||
safe_session_path, nextjs_port, check_node_modules=True
|
||||
)
|
||||
k8s_stream(
|
||||
resp2 = k8s_stream(
|
||||
self._stream_core_api.connect_get_namespaced_pod_exec,
|
||||
name=pod_name,
|
||||
namespace=self._namespace,
|
||||
@@ -1569,23 +1670,102 @@ echo '{tar_b64}' | base64 -d | tar -xzf -
|
||||
stdout=True,
|
||||
tty=False,
|
||||
)
|
||||
logger.info(f"[SNAPSHOT_RESTORE] NextJS start output: {resp2}")
|
||||
|
||||
logger.info(
|
||||
f"Started NextJS server for session {session_id} on port {nextjs_port}"
|
||||
f"[SNAPSHOT_RESTORE] Done for session {session_id} on port {nextjs_port}"
|
||||
)
|
||||
|
||||
except ApiException as e:
|
||||
logger.error(f"[SNAPSHOT_RESTORE] ApiException: {e}", exc_info=True)
|
||||
raise RuntimeError(f"Failed to restore snapshot: {e}") from e
|
||||
finally:
|
||||
# Cleanup temp file
|
||||
if tmp_path:
|
||||
try:
|
||||
import os
|
||||
|
||||
os.unlink(tmp_path)
|
||||
except Exception as cleanup_error:
|
||||
logger.warning(
|
||||
f"Failed to cleanup temp file {tmp_path}: {cleanup_error}"
|
||||
)
|
||||
def _regenerate_session_config(
    self,
    pod_name: str,
    session_path: str,
    llm_config: LLMProviderConfig,
    nextjs_port: int,
    use_demo_data: bool,
) -> None:
    """Regenerate session configuration files after snapshot restore.

    Runs a single shell script in the pod's ``sandbox`` container that creates:
    - files symlink (to /workspace/demo_data or /workspace/files)
    - AGENTS.md (agent instructions)
    - opencode.json (LLM configuration)

    Args:
        pod_name: The pod name to exec into
        session_path: Path to the session directory (already shlex.quoted)
        llm_config: LLM provider configuration
        nextjs_port: Port for NextJS (used in AGENTS.md)
        use_demo_data: Whether to use demo data or user files

    Raises:
        RuntimeError: If the script output does not contain its completion
            marker, i.e. one of the steps failed and ``set -e`` aborted it.
            Exceptions raised by the exec call itself propagate unchanged.
    """
    # Generate AGENTS.md content
    agent_instructions = self._load_agent_instructions(
        files_path=None,  # Container script handles this at runtime
        provider=llm_config.provider,
        model_name=llm_config.model_name,
        nextjs_port=nextjs_port,
        disabled_tools=OPENCODE_DISABLED_TOOLS,
        user_name=None,  # Not stored, regenerate without personalization
        user_role=None,
        use_demo_data=use_demo_data,
        include_org_info=False,  # Don't include org_info for restored sessions
    )

    # Generate opencode.json
    opencode_config = build_opencode_config(
        provider=llm_config.provider,
        model_name=llm_config.model_name,
        api_key=llm_config.api_key or None,
        api_base=llm_config.api_base,
        disabled_tools=OPENCODE_DISABLED_TOOLS,
    )
    opencode_json = json.dumps(opencode_config)

    # Quote both payloads as single shell words. shlex.quote handles embedded
    # single quotes, so no hand-rolled escaping is needed.
    quoted_opencode_json = shlex.quote(opencode_json)
    quoted_agent_instructions = shlex.quote(agent_instructions)

    # Build files symlink setup
    symlink_target = "/workspace/demo_data" if use_demo_data else "/workspace/files"

    # Last line the script prints; its presence in the output is the only
    # success signal available, since a failed exec'd script does not raise.
    done_marker = "Session config regeneration complete"

    # `ln -sfn`: -n stops ln from following an existing `files` symlink and
    # creating the new link inside the directory it points to.
    config_script = f"""
set -e

# Create files symlink
echo "Creating files symlink to {symlink_target}"
ln -sfn {symlink_target} {session_path}/files

# Write agent instructions
echo "Writing AGENTS.md"
printf '%s' {quoted_agent_instructions} > {session_path}/AGENTS.md

# Write opencode config
echo "Writing opencode.json"
printf '%s' {quoted_opencode_json} > {session_path}/opencode.json

echo "{done_marker}"
"""

    logger.info("Regenerating session configuration files")
    resp = k8s_stream(
        self._stream_core_api.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=self._namespace,
        container="sandbox",
        command=["/bin/sh", "-c", config_script],
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
    )

    # Same verification style as the snapshot extraction step: without the
    # marker, `set -e` aborted the script part-way through.
    if done_marker not in resp:
        raise RuntimeError(
            f"Session config regeneration may have failed. Output: {resp}"
        )

    logger.info("Session configuration files regenerated")
|
||||
|
||||
def health_check(self, sandbox_id: UUID, timeout: float = 60.0) -> bool:
|
||||
"""Check if the sandbox pod is healthy (can exec into it).
|
||||
|
||||
@@ -608,34 +608,14 @@ class LocalSandboxManager(SandboxManager):
|
||||
session_id: UUID,
|
||||
tenant_id: str,
|
||||
) -> SnapshotResult | None:
|
||||
"""Create a snapshot of a session's outputs directory.
|
||||
"""Not implemented for local backend - workspaces persist on disk.
|
||||
|
||||
Returns None if snapshots are disabled (local backend).
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
session_id: The session ID to snapshot
|
||||
tenant_id: Tenant identifier for storage path
|
||||
|
||||
Returns:
|
||||
SnapshotResult with storage path and size, or None if
|
||||
snapshots are disabled for this backend
|
||||
Local sandboxes don't use snapshots since the filesystem persists.
|
||||
This should never be called for local backend.
|
||||
"""
|
||||
session_path = self._get_session_path(sandbox_id, session_id)
|
||||
# SnapshotManager expects string session_id for storage path
|
||||
_, storage_path, size_bytes = self._snapshot_manager.create_snapshot(
|
||||
session_path,
|
||||
str(session_id),
|
||||
tenant_id,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Created snapshot for session {session_id}, size: {size_bytes} bytes"
|
||||
)
|
||||
|
||||
return SnapshotResult(
|
||||
storage_path=storage_path,
|
||||
size_bytes=size_bytes,
|
||||
raise NotImplementedError(
|
||||
"create_snapshot is not supported for local backend. "
|
||||
"Local sandboxes persist on disk and don't use snapshots."
|
||||
)
|
||||
|
||||
def session_workspace_exists(
|
||||
@@ -663,53 +643,20 @@ class LocalSandboxManager(SandboxManager):
|
||||
snapshot_storage_path: str,
|
||||
tenant_id: str, # noqa: ARG002
|
||||
nextjs_port: int,
|
||||
llm_config: LLMProviderConfig,
|
||||
use_demo_data: bool = False,
|
||||
) -> None:
|
||||
"""Restore a snapshot into a session's workspace directory and start NextJS.
|
||||
"""Not implemented for local backend - workspaces persist on disk.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
session_id: The session ID to restore
|
||||
snapshot_storage_path: Path to the snapshot in storage
|
||||
tenant_id: Tenant identifier for storage access
|
||||
nextjs_port: Port number for the NextJS dev server
|
||||
|
||||
Raises:
|
||||
RuntimeError: If snapshot restoration fails
|
||||
FileNotFoundError: If snapshot does not exist
|
||||
Local sandboxes don't use snapshots since the filesystem persists.
|
||||
This should never be called for local backend.
|
||||
"""
|
||||
session_path = self._get_session_path(sandbox_id, session_id)
|
||||
|
||||
# Ensure session directory exists
|
||||
session_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Use SnapshotManager to restore
|
||||
self._snapshot_manager.restore_snapshot(
|
||||
storage_path=snapshot_storage_path,
|
||||
target_path=session_path,
|
||||
raise NotImplementedError(
|
||||
"restore_snapshot is not supported for local backend. "
|
||||
"Local sandboxes persist on disk and don't use snapshots."
|
||||
)
|
||||
|
||||
logger.info(f"Restored snapshot for session {session_id}")
|
||||
|
||||
# Start NextJS dev server
|
||||
web_dir = session_path / "outputs" / "web"
|
||||
if web_dir.exists():
|
||||
logger.info(f"Starting Next.js server at {web_dir} on port {nextjs_port}")
|
||||
nextjs_process = self._process_manager.start_nextjs_server(
|
||||
web_dir, nextjs_port
|
||||
)
|
||||
# Store process for clean shutdown on session delete
|
||||
self._nextjs_processes[(sandbox_id, session_id)] = nextjs_process
|
||||
logger.info(
|
||||
f"Started NextJS server for session {session_id} on port {nextjs_port}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Web directory not found at {web_dir}, skipping NextJS startup"
|
||||
)
|
||||
|
||||
def health_check(
|
||||
self, sandbox_id: UUID, timeout: float = 60.0 # noqa: ARG002
|
||||
) -> bool:
|
||||
def health_check(self, sandbox_id: UUID) -> bool:
|
||||
"""Check if the sandbox is healthy (folder exists).
|
||||
|
||||
Args:
|
||||
|
||||
@@ -16,6 +16,9 @@ from onyx.server.features.build.configs import SANDBOX_BACKEND
|
||||
from onyx.server.features.build.configs import SANDBOX_IDLE_TIMEOUT_SECONDS
|
||||
from onyx.server.features.build.configs import SandboxBackend
|
||||
from onyx.server.features.build.db.build_session import clear_nextjs_ports_for_user
|
||||
from onyx.server.features.build.db.build_session import (
|
||||
mark_user_sessions_idle__no_commit,
|
||||
)
|
||||
from onyx.server.features.build.db.sandbox import get_sandbox_by_user_id
|
||||
from onyx.server.features.build.sandbox.base import get_sandbox_manager
|
||||
from onyx.server.features.build.sandbox.kubernetes.kubernetes_sandbox_manager import (
|
||||
@@ -75,12 +78,11 @@ def cleanup_idle_sandboxes_task(self: Task, *, tenant_id: str) -> None: # noqa:
|
||||
try:
|
||||
# Import here to avoid circular imports
|
||||
from onyx.db.enums import SandboxStatus
|
||||
from onyx.server.features.build.db.sandbox import create_snapshot
|
||||
from onyx.server.features.build.db.sandbox import create_snapshot__no_commit
|
||||
from onyx.server.features.build.db.sandbox import get_idle_sandboxes
|
||||
from onyx.server.features.build.db.sandbox import (
|
||||
update_sandbox_status__no_commit,
|
||||
)
|
||||
from onyx.server.features.build.sandbox import get_sandbox_manager
|
||||
|
||||
sandbox_manager = get_sandbox_manager()
|
||||
|
||||
@@ -128,7 +130,7 @@ def cleanup_idle_sandboxes_task(self: Task, *, tenant_id: str) -> None: # noqa:
|
||||
)
|
||||
if snapshot_result:
|
||||
# Create DB record for the snapshot
|
||||
create_snapshot(
|
||||
create_snapshot__no_commit(
|
||||
db_session,
|
||||
session_id,
|
||||
snapshot_result.storage_path,
|
||||
@@ -154,7 +156,15 @@ def cleanup_idle_sandboxes_task(self: Task, *, tenant_id: str) -> None: # noqa:
|
||||
f"{sandbox.user_id}"
|
||||
)
|
||||
|
||||
# Mark sandbox as SLEEPING (not TERMINATED)
|
||||
# Mark all active sessions as IDLE
|
||||
idled = mark_user_sessions_idle__no_commit(
|
||||
db_session, sandbox.user_id
|
||||
)
|
||||
task_logger.debug(
|
||||
f"Marked {idled} sessions as IDLE for user "
|
||||
f"{sandbox.user_id}"
|
||||
)
|
||||
|
||||
update_sandbox_status__no_commit(
|
||||
db_session, sandbox_id, SandboxStatus.SLEEPING
|
||||
)
|
||||
@@ -272,7 +282,7 @@ def sync_sandbox_files(
|
||||
task_logger.debug(f"No sandbox found for user {user_id}, skipping sync")
|
||||
return False
|
||||
|
||||
if sandbox.status not in [SandboxStatus.RUNNING, SandboxStatus.IDLE]:
|
||||
if sandbox.status != SandboxStatus.RUNNING:
|
||||
task_logger.debug(
|
||||
f"Sandbox {sandbox.id} not running (status={sandbox.status}), "
|
||||
f"skipping sync"
|
||||
|
||||
@@ -1675,7 +1675,8 @@ class SessionManager:
|
||||
user_id: The user ID to verify ownership
|
||||
|
||||
Returns:
|
||||
Dict with has_webapp, webapp_url, and status, or None if session not found
|
||||
Dict with has_webapp, webapp_url, status, and ready,
|
||||
or None if session not found
|
||||
"""
|
||||
# Verify session ownership
|
||||
session = get_build_session(session_id, user_id, self._db_session)
|
||||
@@ -1684,20 +1685,51 @@ class SessionManager:
|
||||
|
||||
sandbox = get_sandbox_by_user_id(self._db_session, user_id)
|
||||
if sandbox is None:
|
||||
return {"has_webapp": False, "webapp_url": None, "status": "no_sandbox"}
|
||||
return {
|
||||
"has_webapp": False,
|
||||
"webapp_url": None,
|
||||
"status": "no_sandbox",
|
||||
"ready": False,
|
||||
}
|
||||
|
||||
# Return the proxy URL - the proxy handles routing to the correct sandbox
|
||||
# for both local and Kubernetes environments
|
||||
webapp_url = None
|
||||
ready = False
|
||||
if session.nextjs_port:
|
||||
webapp_url = f"{WEB_DOMAIN}/api/build/sessions/{session_id}/webapp"
|
||||
|
||||
# Quick health check: can the API server reach the NextJS dev server?
|
||||
ready = self._check_nextjs_ready(sandbox.id, session.nextjs_port)
|
||||
|
||||
return {
|
||||
"has_webapp": session.nextjs_port is not None,
|
||||
"webapp_url": webapp_url,
|
||||
"status": sandbox.status.value,
|
||||
"ready": ready,
|
||||
}
|
||||
|
||||
def _check_nextjs_ready(self, sandbox_id: UUID, port: int) -> bool:
    """Check if the NextJS dev server is responding.

    Does a quick HTTP GET to the sandbox's internal URL (as reported by the
    sandbox manager) with a 2 second timeout.

    Args:
        sandbox_id: The sandbox whose webapp should be probed
        port: The NextJS port assigned to the session

    Returns:
        True if the server answers with a status code below 500. False if it
        answers with a 5xx status, or if anything raises while resolving the
        URL or performing the request (timeout, connection error, ...).
    """
    import httpx

    from onyx.server.features.build.sandbox.base import get_sandbox_manager

    try:
        sandbox_manager = get_sandbox_manager()
        internal_url = sandbox_manager.get_webapp_url(sandbox_id, port)
        with httpx.Client(timeout=2.0) as client:
            resp = client.get(internal_url)
    except Exception:
        # Best-effort probe: httpx timeouts and connection errors are
        # subclasses of Exception, so one handler covers them together with
        # any failure to resolve the sandbox manager or URL.
        return False

    # A 5xx answer is reported as "not ready"; any other status means the
    # dev server is up and serving.
    # NOTE(review): the previous docstring claimed that any response (even
    # 500) counts as ready - confirm which behavior is intended.
    return resp.status_code < 500
|
||||
|
||||
def download_webapp_zip(
|
||||
self,
|
||||
session_id: UUID,
|
||||
|
||||
@@ -34,9 +34,7 @@ def _schema_exists(schema_name: str) -> bool:
|
||||
class TestTenantProvisioningRollback:
|
||||
"""Integration tests for provisioning failure and rollback."""
|
||||
|
||||
def test_failed_provisioning_cleans_up_schema(
|
||||
self, reset_multitenant: None
|
||||
) -> None:
|
||||
def test_failed_provisioning_cleans_up_schema(self) -> None:
|
||||
"""
|
||||
When setup_tenant fails after schema creation, rollback should
|
||||
clean up the orphaned schema.
|
||||
@@ -79,9 +77,7 @@ class TestTenantProvisioningRollback:
|
||||
created_tenant_id
|
||||
), f"Schema {created_tenant_id} should have been rolled back"
|
||||
|
||||
def test_drop_schema_works_with_uuid_tenant_id(
|
||||
self, reset_multitenant: None
|
||||
) -> None:
|
||||
def test_drop_schema_works_with_uuid_tenant_id(self) -> None:
|
||||
"""
|
||||
drop_schema should work with UUID-format tenant IDs.
|
||||
|
||||
|
||||
@@ -162,9 +162,26 @@ const BuildOutputPanel = memo(({ onClose, isOpen }: BuildOutputPanelProps) => {
|
||||
}
|
||||
}, [session?.id, cachedForSessionId]);
|
||||
|
||||
// Webapp refresh trigger from streaming
|
||||
// Webapp refresh trigger from streaming / restore
|
||||
const webappNeedsRefresh = useWebappNeedsRefresh();
|
||||
|
||||
// Track polling window: poll for up to 30s after a restore/refresh trigger
|
||||
const [pollingDeadline, setPollingDeadline] = useState<number | null>(null);
|
||||
const [isWebappReady, setIsWebappReady] = useState(false);
|
||||
|
||||
// When webappNeedsRefresh bumps (restore or file edit), start a 30s polling window
|
||||
// and reset readiness so we poll until the server is back up
|
||||
useEffect(() => {
|
||||
if (webappNeedsRefresh > 0) {
|
||||
setPollingDeadline(Date.now() + 30_000);
|
||||
setIsWebappReady(false);
|
||||
|
||||
// Force a re-render after 30s to stop polling even if server never responded
|
||||
const timer = setTimeout(() => setPollingDeadline(null), 30_000);
|
||||
return () => clearTimeout(timer);
|
||||
}
|
||||
}, [webappNeedsRefresh]);
|
||||
|
||||
// Fetch webapp info from dedicated endpoint
|
||||
// Only fetch for real sessions when panel is fully open
|
||||
const shouldFetchWebapp =
|
||||
@@ -173,16 +190,28 @@ const BuildOutputPanel = memo(({ onClose, isOpen }: BuildOutputPanelProps) => {
|
||||
!session.id.startsWith("temp-") &&
|
||||
session.status !== "creating";
|
||||
|
||||
// Poll every 2s while NextJS is starting up (capped at 30s), then stop
|
||||
const shouldPoll =
|
||||
!isWebappReady && pollingDeadline !== null && Date.now() < pollingDeadline;
|
||||
|
||||
const { data: webappInfo, mutate } = useSWR(
|
||||
shouldFetchWebapp ? `/api/build/sessions/${session.id}/webapp-info` : null,
|
||||
() => (session?.id ? fetchWebappInfo(session.id) : null),
|
||||
{
|
||||
refreshInterval: 0, // Disable polling, use event-based refresh
|
||||
refreshInterval: shouldPoll ? 2000 : 0,
|
||||
revalidateOnFocus: true,
|
||||
keepPreviousData: true, // Stale-while-revalidate
|
||||
keepPreviousData: true,
|
||||
}
|
||||
);
|
||||
|
||||
// Update readiness from SWR response and clear polling deadline
|
||||
useEffect(() => {
|
||||
if (webappInfo?.ready) {
|
||||
setIsWebappReady(true);
|
||||
setPollingDeadline(null);
|
||||
}
|
||||
}, [webappInfo?.ready]);
|
||||
|
||||
// Update cache when SWR returns data for current session
|
||||
useEffect(() => {
|
||||
if (webappInfo?.webapp_url && session?.id === cachedForSessionId) {
|
||||
@@ -190,9 +219,9 @@ const BuildOutputPanel = memo(({ onClose, isOpen }: BuildOutputPanelProps) => {
|
||||
}
|
||||
}, [webappInfo?.webapp_url, session?.id, cachedForSessionId]);
|
||||
|
||||
// Refresh when web/ file changes
|
||||
// webappNeedsRefresh is a counter that increments on each edit, ensuring
|
||||
// each edit triggers a new refresh even if the panel is already open
|
||||
// Refresh when web/ file changes or after restore
|
||||
// webappNeedsRefresh is a counter that increments on each edit/restore,
|
||||
// ensuring each triggers a new refresh even if the panel is already open
|
||||
useEffect(() => {
|
||||
if (webappNeedsRefresh > 0 && isFullyOpen && session?.id) {
|
||||
mutate();
|
||||
|
||||
@@ -28,6 +28,11 @@ const STATUS_CONFIG = {
|
||||
pulse: false,
|
||||
label: "Sandbox sleeping",
|
||||
},
|
||||
restoring: {
|
||||
color: "bg-status-warning-05",
|
||||
pulse: true,
|
||||
label: "Restoring sandbox...",
|
||||
},
|
||||
terminated: {
|
||||
color: "bg-status-error-05",
|
||||
pulse: false,
|
||||
|
||||
@@ -1222,14 +1222,21 @@ export const useBuildSessionStore = create<BuildSessionStore>()((set, get) => ({
|
||||
|
||||
if (needsRestore) {
|
||||
console.log(`Restoring session ${sessionId}...`);
|
||||
// Update UI to show restoring state
|
||||
// Update UI: show sandbox as "restoring" and session as loading
|
||||
updateSessionData(sessionId, {
|
||||
status: "creating", // Use "creating" to show loading indicator
|
||||
status: "creating",
|
||||
sandbox: sessionData.sandbox
|
||||
? { ...sessionData.sandbox, status: "restoring" }
|
||||
: null,
|
||||
});
|
||||
|
||||
// Call restore endpoint (blocks until complete)
|
||||
sessionData = await restoreSession(sessionId);
|
||||
console.log(`Session ${sessionId} restored successfully`);
|
||||
|
||||
// Clear the "creating" loading indicator so subsequent logic
|
||||
// doesn't mistake this for an active streaming session.
|
||||
updateSessionData(sessionId, { status: "idle" });
|
||||
}
|
||||
|
||||
// Now fetch messages and artifacts
|
||||
@@ -1269,7 +1276,7 @@ export const useBuildSessionStore = create<BuildSessionStore>()((set, get) => ({
|
||||
const statusToUse = isCurrentlyStreaming
|
||||
? currentSession!.status
|
||||
: sessionData.status === "active"
|
||||
? "completed"
|
||||
? "active"
|
||||
: "idle";
|
||||
|
||||
updateSessionData(sessionId, {
|
||||
@@ -1282,6 +1289,14 @@ export const useBuildSessionStore = create<BuildSessionStore>()((set, get) => ({
|
||||
sandbox: sessionData.sandbox,
|
||||
error: null,
|
||||
isLoaded: true,
|
||||
// After restore, bump webappNeedsRefresh so OutputPanel's SWR refetches
|
||||
// webapp-info. Done here (not earlier) so all session data is set atomically.
|
||||
...(needsRestore
|
||||
? {
|
||||
webappNeedsRefresh:
|
||||
(get().sessions.get(sessionId)?.webappNeedsRefresh || 0) + 1,
|
||||
}
|
||||
: {}),
|
||||
});
|
||||
} catch (err) {
|
||||
console.error("Failed to load session:", err);
|
||||
|
||||
@@ -393,7 +393,7 @@ export function useBuildStreaming() {
|
||||
}
|
||||
|
||||
updateSessionData(sessionId, {
|
||||
status: "completed",
|
||||
status: "active",
|
||||
streamItems: [],
|
||||
});
|
||||
break;
|
||||
@@ -418,7 +418,7 @@ export function useBuildStreaming() {
|
||||
} else if (err instanceof RateLimitError) {
|
||||
console.warn("[Streaming] Rate limit exceeded");
|
||||
updateSessionData(sessionId, {
|
||||
status: "completed",
|
||||
status: "active",
|
||||
error: SessionErrorCode.RATE_LIMIT_EXCEEDED,
|
||||
});
|
||||
} else {
|
||||
|
||||
@@ -118,7 +118,7 @@ export type SessionStatus =
|
||||
| "idle"
|
||||
| "creating"
|
||||
| "running"
|
||||
| "completed"
|
||||
| "active"
|
||||
| "failed";
|
||||
|
||||
export interface Session {
|
||||
@@ -148,7 +148,8 @@ export interface ApiSandboxResponse {
|
||||
| "idle"
|
||||
| "sleeping"
|
||||
| "terminated"
|
||||
| "failed";
|
||||
| "failed"
|
||||
| "restoring"; // Frontend-only: set during snapshot restore
|
||||
container_id: string | null;
|
||||
created_at: string;
|
||||
last_heartbeat: string | null;
|
||||
@@ -194,6 +195,7 @@ export interface ApiWebappInfoResponse {
|
||||
has_webapp: boolean;
|
||||
webapp_url: string | null;
|
||||
status: string;
|
||||
ready: boolean;
|
||||
}
|
||||
|
||||
export interface FileSystemEntry {
|
||||
|
||||
Reference in New Issue
Block a user