Compare commits

...

3 Commits

Author SHA1 Message Date
Wenxi Onyx
faa4f76393 remove manual sandbox status updating to correctly update heartbeat 2026-02-03 13:54:26 -08:00
Wenxi Onyx
4714cf6085 flatten try 2026-02-03 13:24:27 -08:00
Wenxi Onyx
f6b9f0785f fix(craft): pod restoration race and unexpected state 2026-02-03 13:10:14 -08:00
4 changed files with 190 additions and 45 deletions

View File

@@ -74,6 +74,10 @@ def list_sessions(
)
# Lock timeout for session creation (should be longer than max provision time)
SESSION_CREATE_LOCK_TIMEOUT_SECONDS = 300
@router.post("", response_model=DetailedSessionResponse)
def create_session(
request: SessionCreateRequest,
@@ -88,12 +92,31 @@ def create_session(
This endpoint is atomic - if sandbox provisioning fails, no database
records are created (transaction is rolled back).
Uses Redis lock to prevent race conditions when multiple requests try to
create/provision a session for the same user concurrently.
"""
session_manager = SessionManager(db_session)
tenant_id = get_current_tenant_id()
redis_client = get_redis_client(tenant_id=tenant_id)
# Lock on user_id to prevent concurrent session creation for the same user
# This prevents race conditions where two requests both see sandbox as SLEEPING
# and both try to provision, with one deleting the other's work
lock_key = f"session_create:{user.id}"
lock = redis_client.lock(lock_key, timeout=SESSION_CREATE_LOCK_TIMEOUT_SECONDS)
# blocking=True means wait if another create is in progress
acquired = lock.acquire(
blocking=True, blocking_timeout=SESSION_CREATE_LOCK_TIMEOUT_SECONDS
)
if not acquired:
raise HTTPException(
status_code=503,
detail="Session creation timed out waiting for lock",
)
try:
# Only pass user_work_area and user_level if demo data is enabled
# This prevents org_info directory creation when demo data is disabled
session_manager = SessionManager(db_session)
build_session = session_manager.get_or_create_empty_session(
user.id,
user_work_area=(
@@ -105,27 +128,25 @@ def create_session(
demo_data_enabled=request.demo_data_enabled,
)
db_session.commit()
sandbox = get_sandbox_by_user_id(db_session, user.id)
base_response = SessionResponse.from_model(build_session, sandbox)
return DetailedSessionResponse.from_session_response(
base_response, session_loaded_in_sandbox=True
)
except ValueError as e:
# Max concurrent sandboxes reached or other validation error
logger.exception("Sandbox provisioning failed")
logger.exception("Session creation failed")
db_session.rollback()
raise HTTPException(status_code=429, detail=str(e))
except Exception as e:
# Sandbox provisioning failed - rollback to remove any uncommitted records
db_session.rollback()
logger.error(f"Sandbox provisioning failed: {e}")
raise HTTPException(
status_code=500,
detail=f"Sandbox provisioning failed: {e}",
)
# Get the user's sandbox to include in response
sandbox = get_sandbox_by_user_id(db_session, user.id)
base_response = SessionResponse.from_model(build_session, sandbox)
# Session was just created, so it's loaded in the sandbox
return DetailedSessionResponse.from_session_response(
base_response, session_loaded_in_sandbox=True
)
logger.error(f"Session creation failed: {e}")
raise HTTPException(status_code=500, detail=f"Session creation failed: {e}")
finally:
try:
lock.release()
except Exception:
pass
@router.get("/{session_id}", response_model=DetailedSessionResponse)

View File

@@ -88,7 +88,7 @@ SANDBOX_NAMESPACE = os.environ.get("SANDBOX_NAMESPACE", "onyx-sandboxes")
# Container image for sandbox pods
# Should include Next.js template, opencode CLI, and demo_data zip
SANDBOX_CONTAINER_IMAGE = os.environ.get(
"SANDBOX_CONTAINER_IMAGE", "onyxdotapp/sandbox:v0.1.1"
"SANDBOX_CONTAINER_IMAGE", "onyxdotapp/sandbox:v0.1.2"
)
# S3 bucket for sandbox file storage (snapshots, knowledge files, uploads)

View File

@@ -699,6 +699,39 @@ done
logger.warning(f"Timeout waiting for pod {pod_name} to become ready")
return False
def _pod_exists_and_healthy(self, pod_name: str) -> bool:
    """Return True when the named pod exists and is usable.

    "Usable" means the pod is Running with a Ready condition of "True",
    or still Pending (i.e. another request is mid-way through creating it,
    so it should be waited on rather than re-created).

    Args:
        pod_name: Name of the pod to check.

    Returns:
        True if the pod is Running-and-Ready or Pending; False if it is
        absent (404) or in any other phase (e.g. Failed, Succeeded).

    Raises:
        ApiException: For any Kubernetes API error other than 404.
    """
    try:
        pod = self._core_api.read_namespaced_pod(
            name=pod_name,
            namespace=self._namespace,
        )
    except ApiException as exc:
        # A missing pod simply means "not healthy"; anything else bubbles up.
        if exc.status == 404:
            return False
        raise

    phase = pod.status.phase
    if phase == "Pending":
        # Pod is still being created (possibly by a concurrent request).
        return True
    if phase == "Running":
        # Running counts only once the kubelet reports the Ready condition.
        return any(
            cond.type == "Ready" and cond.status == "True"
            for cond in (pod.status.conditions or [])
        )
    return False
def provision(
self,
sandbox_id: UUID,
@@ -708,6 +741,10 @@ done
) -> SandboxInfo:
"""Provision a new sandbox as a Kubernetes pod (user-level).
This method is idempotent - if a pod already exists and is healthy,
it will be reused. This prevents race conditions when multiple requests
try to provision the same sandbox concurrently.
Creates pod with:
1. Init container syncs files/ from S3
2. Creates sessions/ directory for per-session workspaces
@@ -734,6 +771,51 @@ done
)
pod_name = self._get_pod_name(str(sandbox_id))
service_name = self._get_service_name(str(sandbox_id))
# Check if pod already exists and is healthy (idempotency check)
if self._pod_exists_and_healthy(pod_name):
logger.info(
f"Pod {pod_name} already exists and is healthy, reusing existing pod"
)
# Ensure service exists too
try:
self._core_api.read_namespaced_service(
name=service_name,
namespace=self._namespace,
)
except ApiException as e:
if e.status == 404:
# Service doesn't exist, create it
logger.debug(f"Creating missing Service {service_name}")
service = self._create_sandbox_service(sandbox_id, tenant_id)
try:
self._core_api.create_namespaced_service(
namespace=self._namespace,
body=service,
)
except ApiException as svc_e:
if svc_e.status != 409: # Ignore AlreadyExists
raise
else:
raise
# Wait for pod to be ready if it's still pending
logger.info(f"Waiting for existing pod {pod_name} to become ready...")
if not self._wait_for_pod_ready(pod_name):
raise RuntimeError(
f"Timeout waiting for existing sandbox pod {pod_name} to become ready"
)
logger.info(
f"Reusing existing Kubernetes sandbox {sandbox_id}, pod: {pod_name}"
)
return SandboxInfo(
sandbox_id=sandbox_id,
directory_path=f"k8s://{self._namespace}/{pod_name}",
status=SandboxStatus.RUNNING,
last_heartbeat=None,
)
try:
# 1. Create Pod (user-level only, no session setup)
@@ -743,18 +825,46 @@ done
user_id=str(user_id),
tenant_id=tenant_id,
)
self._core_api.create_namespaced_pod(
namespace=self._namespace,
body=pod,
)
try:
self._core_api.create_namespaced_pod(
namespace=self._namespace,
body=pod,
)
except ApiException as e:
if e.status == 409:
# Pod was created by another concurrent request
# Check if it's healthy and reuse it
logger.info(
f"Pod {pod_name} already exists (409 conflict), "
"checking if it's healthy..."
)
if self._pod_exists_and_healthy(pod_name):
logger.info(
f"Pod {pod_name} created by concurrent request, reusing"
)
# Continue to ensure service exists and wait for ready
else:
# Pod exists but is not healthy - this shouldn't happen often
# but could occur if a previous provision failed mid-way
logger.warning(
f"Pod {pod_name} exists but is not healthy, "
"waiting for it to become ready or fail"
)
else:
raise
# 2. Create Service
logger.debug(f"Creating Service {self._get_service_name(str(sandbox_id))}")
# 2. Create Service (idempotent - ignore 409)
logger.debug(f"Creating Service {service_name}")
service = self._create_sandbox_service(sandbox_id, tenant_id)
self._core_api.create_namespaced_service(
namespace=self._namespace,
body=service,
)
try:
self._core_api.create_namespaced_service(
namespace=self._namespace,
body=service,
)
except ApiException as e:
if e.status != 409: # Ignore AlreadyExists
raise
logger.debug(f"Service {service_name} already exists, reusing")
# 3. Wait for pod to be ready
logger.info(f"Waiting for pod {pod_name} to become ready...")
@@ -776,12 +886,19 @@ done
)
except Exception as e:
# Cleanup on failure
logger.error(
f"Kubernetes sandbox provisioning failed for sandbox {sandbox_id}: {e}",
exc_info=True,
)
self._cleanup_kubernetes_resources(str(sandbox_id))
# Only cleanup if we're sure the pod is not being used by another request
# Check if pod is healthy - if so, don't clean up (another request may own it)
if self._pod_exists_and_healthy(pod_name):
logger.warning(
f"Kubernetes sandbox provisioning failed for sandbox {sandbox_id}: {e}, "
"but pod is healthy (likely owned by concurrent request), not cleaning up"
)
else:
logger.error(
f"Kubernetes sandbox provisioning failed for sandbox {sandbox_id}: {e}",
exc_info=True,
)
self._cleanup_kubernetes_resources(str(sandbox_id))
raise
def _wait_for_resource_deletion(

View File

@@ -69,6 +69,7 @@ from onyx.server.features.build.db.sandbox import get_running_sandbox_count_by_t
from onyx.server.features.build.db.sandbox import get_sandbox_by_session_id
from onyx.server.features.build.db.sandbox import get_sandbox_by_user_id
from onyx.server.features.build.db.sandbox import update_sandbox_heartbeat
from onyx.server.features.build.db.sandbox import update_sandbox_status__no_commit
from onyx.server.features.build.sandbox import get_sandbox_manager
from onyx.server.features.build.sandbox.kubernetes.internal.acp_exec_client import (
SSEKeepalive,
@@ -481,8 +482,10 @@ class SessionManager:
tenant_id=tenant_id,
llm_config=llm_config,
)
sandbox.status = sandbox_info.status
self._db_session.flush()
# Use update function to also set heartbeat when transitioning to RUNNING
update_sandbox_status__no_commit(
self._db_session, sandbox_id, sandbox_info.status
)
elif sandbox.status.is_active():
# Verify pod is healthy before reusing (use short timeout for quick check)
if not self._sandbox_manager.health_check(sandbox_id, timeout=5.0):
@@ -494,8 +497,9 @@ class SessionManager:
self._sandbox_manager.terminate(sandbox_id)
# Mark as terminated and re-provision
sandbox.status = SandboxStatus.TERMINATED
self._db_session.flush()
update_sandbox_status__no_commit(
self._db_session, sandbox_id, SandboxStatus.TERMINATED
)
logger.info(
f"Re-provisioning sandbox {sandbox_id} for user {user_id}"
@@ -506,8 +510,10 @@ class SessionManager:
tenant_id=tenant_id,
llm_config=llm_config,
)
sandbox.status = sandbox_info.status
self._db_session.flush()
# Use update function to also set heartbeat when transitioning to RUNNING
update_sandbox_status__no_commit(
self._db_session, sandbox_id, sandbox_info.status
)
else:
logger.info(
f"Reusing existing sandbox {sandbox_id} (status: {sandbox.status}) "
@@ -539,9 +545,10 @@ class SessionManager:
llm_config=llm_config,
)
# Update sandbox record with status from provisioning
sandbox.status = sandbox_info.status
self._db_session.flush()
# Update sandbox status (also refreshes heartbeat when transitioning to RUNNING)
update_sandbox_status__no_commit(
self._db_session, sandbox_id, sandbox_info.status
)
# Set up session workspace within the sandbox
logger.info(