Compare commits

...

11 Commits

Author SHA1 Message Date
Wenxi Onyx
03e904be4a simplify useBuildSesssionStore and fetch artifacts after restoration 2026-02-10 12:52:21 -08:00
Wenxi Onyx
ecf3c34207 don't release lock on timeout 2026-02-10 12:52:21 -08:00
Wenxi Onyx
42a589e141 comment 2026-02-10 12:52:21 -08:00
Wenxi Onyx
dd0405998d slight increase to timeout 2026-02-10 12:52:21 -08:00
Wenxi Onyx
e55fbc47d4 feat: add timeout to session restoration 2026-02-10 12:52:21 -08:00
Wenxi Onyx
4321e2b922 fix(craft): messages load before session restore 2026-02-10 12:52:21 -08:00
Wenxi
1cab1f54b5 Merge branch 'main' into whuang/craft-file-sync-lock 2026-02-10 12:52:03 -08:00
Wenxi Onyx
8896af2303 make filesync lock acquisition more elegant 2026-02-10 09:41:13 -08:00
Wenxi Onyx
40005e0a09 fix rebase issue 2026-02-10 09:33:44 -08:00
Wenxi Onyx
03bfc14f1c improve file sync scripts 2026-02-10 09:31:19 -08:00
Wenxi Onyx
e7e4c96ccc feat(craft): narrow file sync to source, prevent concurrent syncs, and use --delete flag on incremental syncs 2026-02-10 09:31:16 -08:00
8 changed files with 248 additions and 118 deletions

View File

@@ -817,17 +817,19 @@ def connector_document_extraction(
if processing_mode == ProcessingMode.FILE_SYSTEM:
creator_id = index_attempt.connector_credential_pair.creator_id
if creator_id:
source_value = db_connector.source.value
app.send_task(
OnyxCeleryTask.SANDBOX_FILE_SYNC,
kwargs={
"user_id": str(creator_id),
"tenant_id": tenant_id,
"source": source_value,
},
queue=OnyxCeleryQueues.SANDBOX,
)
logger.info(
f"Triggered sandbox file sync for user {creator_id} "
f"after indexing complete"
f"source={source_value} after indexing complete"
)
except Exception as e:

View File

@@ -160,6 +160,8 @@ CELERY_USER_FILE_PROCESSING_LOCK_TIMEOUT = 30 * 60 # 30 minutes (in seconds)
CELERY_USER_FILE_PROJECT_SYNC_LOCK_TIMEOUT = 5 * 60 # 5 minutes (in seconds)
CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT = 5 * 60 # 5 minutes (in seconds)
DANSWER_REDIS_FUNCTION_LOCK_PREFIX = "da_function_lock:"
TMP_DRALPHA_PERSONA_NAME = "KG Beta"
@@ -447,6 +449,9 @@ class OnyxRedisLocks:
CLEANUP_IDLE_SANDBOXES_BEAT_LOCK = "da_lock:cleanup_idle_sandboxes_beat"
CLEANUP_OLD_SNAPSHOTS_BEAT_LOCK = "da_lock:cleanup_old_snapshots_beat"
# Sandbox file sync
SANDBOX_FILE_SYNC_LOCK_PREFIX = "da_lock:sandbox_file_sync"
class OnyxRedisSignals:
BLOCK_VALIDATE_INDEXING_FENCES = "signal:block_validate_indexing_fences"

View File

@@ -47,6 +47,7 @@ from onyx.server.features.build.session.manager import UploadLimitExceededError
from onyx.server.features.build.utils import sanitize_filename
from onyx.server.features.build.utils import validate_file
from onyx.utils.logger import setup_logger
from onyx.utils.threadpool_concurrency import run_with_timeout
from shared_configs.contextvars import get_current_tenant_id
logger = setup_logger()
@@ -328,6 +329,9 @@ def delete_session(
# Lock timeout should be longer than max restore time (5 minutes)
RESTORE_LOCK_TIMEOUT_SECONDS = 300
# Per-operation timeout (provision, snapshot restore, etc.)
# If more than this, probably failed to restore.
RESTORE_TIMEOUT_SECONDS = 120
@router.post("/{session_id}/restore", response_model=DetailedSessionResponse)
@@ -418,7 +422,9 @@ def restore_session(
)
db_session.commit()
sandbox_manager.provision(
run_with_timeout(
RESTORE_TIMEOUT_SECONDS,
sandbox_manager.provision,
sandbox_id=sandbox.id,
user_id=user.id,
tenant_id=tenant_id,
@@ -451,7 +457,9 @@ def restore_session(
if snapshot:
try:
sandbox_manager.restore_snapshot(
run_with_timeout(
RESTORE_TIMEOUT_SECONDS,
sandbox_manager.restore_snapshot,
sandbox_id=sandbox.id,
session_id=session_id,
snapshot_storage_path=snapshot.storage_path,
@@ -462,6 +470,8 @@ def restore_session(
)
session.status = BuildSessionStatus.ACTIVE
db_session.commit()
except TimeoutError:
raise
except Exception as e:
logger.error(
f"Snapshot restore failed for session {session_id}: {e}"
@@ -471,7 +481,9 @@ def restore_session(
raise
else:
# No snapshot - set up fresh workspace
sandbox_manager.setup_session_workspace(
run_with_timeout(
RESTORE_TIMEOUT_SECONDS,
sandbox_manager.setup_session_workspace,
sandbox_id=sandbox.id,
session_id=session_id,
llm_config=llm_config,
@@ -485,17 +497,29 @@ def restore_session(
f"re-provision, expected RUNNING"
)
except TimeoutError as e:
# Do NOT release the Redis lock here. The timed-out operation is
# still running in a background thread. Releasing the lock would
# allow a concurrent restore to start on the same sandbox. The
# lock's TTL (RESTORE_LOCK_TIMEOUT_SECONDS) will expire it once
# the orphaned operation has had time to finish.
logger.error(f"Restore timed out for session {session_id}: {e}")
raise HTTPException(
status_code=504,
detail="Sandbox restore timed out",
)
except Exception as e:
logger.error(f"Failed to restore session {session_id}: {e}", exc_info=True)
if lock.owned():
lock.release()
raise HTTPException(
status_code=500,
detail=f"Failed to restore session: {e}",
)
finally:
if lock.owned():
lock.release()
# Update heartbeat to mark sandbox as active after successful restore
# Success - release the lock and update heartbeat
if lock.owned():
lock.release()
update_sandbox_heartbeat(db_session, sandbox.id)
base_response = SessionResponse.from_model(session, sandbox)

View File

@@ -416,6 +416,7 @@ class SandboxManager(ABC):
sandbox_id: UUID,
user_id: UUID,
tenant_id: str,
source: str | None = None,
) -> bool:
"""Sync files from S3 to the sandbox's /workspace/files directory.
@@ -428,6 +429,9 @@ class SandboxManager(ABC):
sandbox_id: The sandbox UUID
user_id: The user ID (for S3 path construction)
tenant_id: The tenant ID (for S3 path construction)
source: Optional source type (e.g., "gmail", "google_drive").
If None, syncs all sources. If specified, only syncs
that source's directory.
Returns:
True if sync was successful, False otherwise.

View File

@@ -361,45 +361,33 @@ class KubernetesSandboxManager(SandboxManager):
command=["/bin/sh", "-c"],
args=[
f"""
# Handle SIGTERM for fast container termination
trap 'echo "Received SIGTERM, exiting"; exit 0' TERM
# Handle signals for graceful container termination
trap 'echo "Shutting down"; exit 0' TERM INT
# Initial sync on startup - sync knowledge files for this user/tenant
echo "Starting initial file sync for tenant: {tenant_id} / user: {user_id}"
echo "S3 source: s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/"
echo "Starting initial file sync"
echo "S3: s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/*"
echo "Local: /workspace/files/"
# s5cmd sync: high-performance parallel S3 sync (default 256 workers)
# Capture both stdout and stderr to see all messages including errors
# s5cmd sync (default 256 workers)
# Exit codes: 0=success, 1=success with warnings
sync_exit_code=0
sync_output=$(mktemp)
/s5cmd --log debug --stat sync \
/s5cmd --stat sync \
"s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/*" \
/workspace/files/ 2>&1 | tee "$sync_output" || sync_exit_code=$?
/workspace/files/ 2>&1 || sync_exit_code=$?
echo "=== S3 sync finished with exit code: $sync_exit_code ==="
echo "=== Initial sync finished (exit code: $sync_exit_code) ==="
# Count files synced
file_count=$(find /workspace/files -type f | wc -l)
echo "Total files in /workspace/files: $file_count"
# Show summary of any errors from the output
if [ $sync_exit_code -ne 0 ]; then
echo "=== Errors/warnings from sync ==="
grep -iE "error|warn|fail" "$sync_output" || echo "No errors found"
echo "=========================="
fi
rm -f "$sync_output"
# Exit codes 0 and 1 are considered success (1 = success with warnings)
# Handle result
if [ $sync_exit_code -eq 0 ] || [ $sync_exit_code -eq 1 ]; then
echo "Sync complete (exit $sync_exit_code), staying alive for incremental syncs"
file_count=$(find /workspace/files -type f 2>/dev/null | wc -l)
echo "Files synced: $file_count"
echo "Sidecar ready for incremental syncs"
else
echo "ERROR: Sync failed with exit code: $sync_exit_code"
echo "ERROR: Initial sync failed (exit code: $sync_exit_code)"
exit $sync_exit_code
fi
# Stay alive - incremental sync commands will be executed via kubectl exec
# Use 'wait' so shell can respond to signals while sleeping
# Stay alive for incremental syncs via kubectl exec
while true; do
sleep 30 &
wait $!
@@ -2011,6 +1999,7 @@ echo "Session config regeneration complete"
sandbox_id: UUID,
user_id: UUID,
tenant_id: str,
source: str | None = None,
) -> bool:
"""Sync files from S3 to the running pod via the file-sync sidecar.
@@ -2023,21 +2012,66 @@ echo "Session config regeneration complete"
sandbox_id: The sandbox UUID
user_id: The user ID (for S3 path construction)
tenant_id: The tenant ID (for S3 path construction)
source: Optional source type (e.g., "gmail", "google_drive").
If None, syncs all sources. If specified, only syncs
that source's directory.
Returns:
True if sync was successful, False otherwise.
"""
pod_name = self._get_pod_name(str(sandbox_id))
# s5cmd sync: high-performance parallel S3 sync (default 256 workers)
# --stat shows transfer statistics for monitoring
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/*"
sync_command = [
"/bin/sh",
"-c",
f'/s5cmd --log debug --stat sync "{s3_path}" /workspace/files/; '
f'echo "Files in workspace: $(find /workspace/files -type f | wc -l)"',
]
# Build S3 path based on whether source is specified
if source:
# Sync only the specific source directory
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/{source}/*"
local_path = f"/workspace/files/{source}/"
else:
# Sync all sources (original behavior)
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/*"
local_path = "/workspace/files/"
# s5cmd sync: high-performance parallel S3 sync
# --delete: mirror S3 to local (remove files that no longer exist in source)
# timeout: prevent zombie processes from kubectl exec disconnections
# trap: kill child processes on exit/disconnect
source_info = f" (source={source})" if source else ""
sync_script = f"""
# Kill child processes on exit/disconnect to prevent zombie s5cmd workers
cleanup() {{ pkill -P $$ 2>/dev/null || true; }}
trap cleanup EXIT INT TERM
echo "Starting incremental file sync{source_info}"
echo "S3: {s3_path}"
echo "Local: {local_path}"
# Ensure destination exists (needed for source-specific syncs)
mkdir -p "{local_path}"
# Run s5cmd with 5-minute timeout (SIGKILL after 10s if SIGTERM ignored)
# Exit codes: 0=success, 1=success with warnings, 124=timeout
sync_exit_code=0
timeout --signal=TERM --kill-after=10s 5m \
/s5cmd --stat sync --delete "{s3_path}" "{local_path}" 2>&1 || sync_exit_code=$?
echo "=== Sync finished (exit code: $sync_exit_code) ==="
# Handle result
if [ $sync_exit_code -eq 0 ] || [ $sync_exit_code -eq 1 ]; then
file_count=$(find "{local_path}" -type f 2>/dev/null | wc -l)
echo "Files in {local_path}: $file_count"
echo "SYNC_SUCCESS"
elif [ $sync_exit_code -eq 124 ]; then
echo "ERROR: Sync timed out after 5 minutes"
echo "SYNC_FAILED"
exit 1
else
echo "ERROR: Sync failed (exit code: $sync_exit_code)"
echo "SYNC_FAILED"
exit $sync_exit_code
fi
"""
sync_command = ["/bin/sh", "-c", sync_script]
resp = k8s_stream(
self._stream_core_api.connect_get_namespaced_pod_exec,
pod_name,
@@ -2050,6 +2084,11 @@ echo "Session config regeneration complete"
tty=False,
)
logger.debug(f"File sync response: {resp}")
# Check if sync succeeded based on output markers
if "SYNC_FAILED" in resp:
logger.warning(f"File sync failed for sandbox {sandbox_id}")
return False
return True
def _ensure_agents_md_attachments_section(

View File

@@ -1056,6 +1056,7 @@ class LocalSandboxManager(SandboxManager):
sandbox_id: UUID,
user_id: UUID, # noqa: ARG002
tenant_id: str, # noqa: ARG002
source: str | None = None, # noqa: ARG002
) -> bool:
"""No-op for local mode - files are directly accessible via symlink.
@@ -1066,9 +1067,13 @@ class LocalSandboxManager(SandboxManager):
sandbox_id: The sandbox UUID (unused)
user_id: The user ID (unused)
tenant_id: The tenant ID (unused)
source: The source type (unused in local mode)
Returns:
True (always succeeds since no sync is needed)
"""
logger.debug(f"sync_files called for local sandbox {sandbox_id} - no-op")
source_info = f" source={source}" if source else ""
logger.debug(
f"sync_files called for local sandbox {sandbox_id}{source_info} - no-op"
)
return True

View File

@@ -1,5 +1,7 @@
"""Celery tasks for sandbox operations (cleanup, file sync, etc.)."""
from collections.abc import Iterator
from contextlib import contextmanager
from uuid import UUID
from celery import shared_task
@@ -7,6 +9,7 @@ from celery import Task
from redis.lock import Lock as RedisLock
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.constants import CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine.sql_engine import get_session_with_current_tenant
@@ -247,6 +250,19 @@ def _list_session_directories(
return []
@contextmanager
def _acquire_sandbox_file_sync_lock(lock: RedisLock) -> Iterator[bool]:
    """Acquire the sandbox file-sync lock with blocking timeout; release on exit.

    Blocks for up to CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT seconds waiting
    for the lock, then yields whether acquisition succeeded. The caller is
    responsible for checking the yielded bool before doing any work — this
    context manager does NOT raise on failure to acquire.

    On exit the lock is released only if this process still owns it: owned()
    is False both when acquisition failed and when the lock's Redis TTL has
    already expired, so neither case attempts an invalid release.
    """
    acquired = lock.acquire(
        blocking_timeout=CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT,
    )
    try:
        yield acquired
    finally:
        # Guard with owned(): releasing a lock we never acquired (or whose
        # TTL expired and was re-acquired elsewhere) would raise LockError.
        if lock.owned():
            lock.release()
@shared_task(
name=OnyxCeleryTask.SANDBOX_FILE_SYNC,
soft_time_limit=TIMEOUT_SECONDS,
@@ -254,7 +270,11 @@ def _list_session_directories(
ignore_result=True,
)
def sync_sandbox_files(
self: Task, *, user_id: str, tenant_id: str # noqa: ARG001
self: Task, # noqa: ARG001
*,
user_id: str,
tenant_id: str,
source: str | None = None,
) -> bool:
"""Sync files from S3 to a user's running sandbox.
@@ -262,46 +282,63 @@ def sync_sandbox_files(
It executes `s5cmd sync` in the file-sync sidecar container to download
any new or changed files.
This is safe to call multiple times - s5cmd sync is idempotent.
Per-user locking ensures only one sync runs at a time for a given user.
If a sync is already in progress, this task will wait until it completes.
Args:
user_id: The user ID whose sandbox should be synced
tenant_id: The tenant ID for S3 path construction
source: Optional source type (e.g., "gmail", "google_drive").
If None, syncs all sources.
Returns:
True if sync was successful, False if skipped or failed
"""
source_info = f" source={source}" if source else " (all sources)"
task_logger.info(
f"sync_sandbox_files starting for user {user_id} in tenant {tenant_id}"
f"{source_info}"
)
with get_session_with_current_tenant() as db_session:
sandbox = get_sandbox_by_user_id(db_session, UUID(user_id))
lock_timeout = CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT
redis_client = get_redis_client(tenant_id=tenant_id)
lock = redis_client.lock(
f"{OnyxRedisLocks.SANDBOX_FILE_SYNC_LOCK_PREFIX}:{user_id}",
timeout=lock_timeout,
)
if sandbox is None:
task_logger.debug(f"No sandbox found for user {user_id}, skipping sync")
return False
if sandbox.status != SandboxStatus.RUNNING:
task_logger.debug(
f"Sandbox {sandbox.id} not running (status={sandbox.status}), "
f"skipping sync"
with _acquire_sandbox_file_sync_lock(lock) as acquired:
if not acquired:
task_logger.warning(
f"sync_sandbox_files - failed to acquire lock for user {user_id} "
f"after {lock_timeout}s, skipping"
)
return False
sandbox_manager = get_sandbox_manager()
result = sandbox_manager.sync_files(
sandbox_id=sandbox.id,
user_id=UUID(user_id),
tenant_id=tenant_id,
)
with get_session_with_current_tenant() as db_session:
sandbox = get_sandbox_by_user_id(db_session, UUID(user_id))
if sandbox is None:
task_logger.debug(f"No sandbox found for user {user_id}, skipping sync")
return False
if sandbox.status != SandboxStatus.RUNNING:
task_logger.debug(
f"Sandbox {sandbox.id} not running (status={sandbox.status}), "
f"skipping sync"
)
return False
if result:
task_logger.info(f"File sync completed for user {user_id}")
else:
task_logger.warning(f"File sync failed for user {user_id}")
return result
sandbox_manager = get_sandbox_manager()
result = sandbox_manager.sync_files(
sandbox_id=sandbox.id,
user_id=UUID(user_id),
tenant_id=tenant_id,
source=source,
)
if result:
task_logger.info(f"File sync completed for user {user_id}{source_info}")
else:
task_logger.warning(f"File sync failed for user {user_id}{source_info}")
return result
# NOTE: in the future, may need to add this. For now, will do manual cleanup.

View File

@@ -1221,27 +1221,29 @@ export const useBuildSessionStore = create<BuildSessionStore>()((set, get) => ({
!sessionData.session_loaded_in_sandbox);
if (needsRestore) {
// Update UI: show sandbox as "restoring" and session as loading
// Show sandbox as "restoring" while we load messages + restore
updateSessionData(sessionId, {
status: "creating",
sandbox: sessionData.sandbox
? { ...sessionData.sandbox, status: "restoring" }
: null,
});
// Call restore endpoint (blocks until complete)
sessionData = await restoreSession(sessionId);
// Clear the "creating" loading indicator so subsequent logic
// doesn't mistake this for an active streaming session.
updateSessionData(sessionId, { status: "idle" });
}
// Now fetch messages and artifacts
const [messages, artifacts] = await Promise.all([
fetchMessages(sessionId),
fetchArtifacts(sessionId),
]);
// Fetch messages from DB first - they don't depend on the sandbox
// being running. This ensures message history is visible immediately,
// even while a sandbox restore is in progress.
// NOTE: artifacts require sandbox filesystem access, so only fetch
// them when the sandbox is already running.
const messages = await fetchMessages(sessionId);
const artifacts = needsRestore ? [] : await fetchArtifacts(sessionId);
// If session is already streaming (e.g. pre-provisioned flow),
// preserve its current messages and status. Otherwise use DB messages.
const currentSession = get().sessions.get(sessionId);
const isStreaming =
currentSession?.status === "running" ||
currentSession?.status === "creating";
// Construct webapp URL if sandbox has a Next.js port and there's a webapp artifact
let webappUrl: string | null = null;
@@ -1252,50 +1254,62 @@ export const useBuildSessionStore = create<BuildSessionStore>()((set, get) => ({
webappUrl = `http://localhost:${sessionData.sandbox.nextjs_port}`;
}
// Re-fetch existing session to check for optimistic messages
const currentSession = get().sessions.get(sessionId);
const hasOptimisticMessages = (currentSession?.messages.length ?? 0) > 0;
const isCurrentlyStreaming =
currentSession?.status === "running" ||
currentSession?.status === "creating";
// Consolidate messages into proper conversation turns
// Each assistant turn becomes a single message with streamItems in metadata
// If there are optimistic messages (active streaming), preserve current state
const messagesToUse = hasOptimisticMessages
const status = isStreaming
? currentSession!.status
: needsRestore
? "creating"
: sessionData.status === "active"
? "active"
: "idle";
const resolvedMessages = isStreaming
? currentSession!.messages
: consolidateMessagesIntoTurns(messages);
// Session-level streamItems are only for current streaming response
// When loading from history, they should be empty (each message has its own streamItems)
const streamItemsToUse = hasOptimisticMessages
? currentSession!.streamItems
: [];
// Preserve streaming status if currently streaming, otherwise use backend status
const statusToUse = isCurrentlyStreaming
? currentSession!.status
: sessionData.status === "active"
? "active"
: "idle";
const streamItems = isStreaming ? currentSession!.streamItems : [];
const sandbox =
needsRestore && sessionData.sandbox
? { ...sessionData.sandbox, status: "restoring" as const }
: sessionData.sandbox;
updateSessionData(sessionId, {
status: statusToUse,
// Preserve optimistic messages if they exist (e.g., from pre-provisioned flow)
messages: messagesToUse,
streamItems: streamItemsToUse,
status,
messages: resolvedMessages,
streamItems,
artifacts,
webappUrl,
sandbox: sessionData.sandbox,
sandbox,
error: null,
isLoaded: true,
// After restore, bump webappNeedsRefresh so OutputPanel's SWR refetches
// webapp-info. Done here (not earlier) so all session data is set atomically.
...(needsRestore
? {
webappNeedsRefresh:
(get().sessions.get(sessionId)?.webappNeedsRefresh || 0) + 1,
}
: {}),
});
// Now restore the sandbox if needed (messages are already visible).
// The backend enforces a timeout and returns an error if restore
// takes too long, so no frontend timeout needed here.
if (needsRestore) {
try {
sessionData = await restoreSession(sessionId);
// Sandbox is now running - fetch artifacts
const restoredArtifacts = await fetchArtifacts(sessionId);
updateSessionData(sessionId, {
status: sessionData.status === "active" ? "active" : "idle",
artifacts: restoredArtifacts,
sandbox: sessionData.sandbox,
// Bump so OutputPanel's SWR refetches webapp-info (which
// derives the actual webappUrl from the backend).
webappNeedsRefresh:
(get().sessions.get(sessionId)?.webappNeedsRefresh || 0) + 1,
});
} catch (restoreErr) {
console.error("Sandbox restore failed:", restoreErr);
updateSessionData(sessionId, {
status: "idle",
sandbox: sessionData.sandbox
? { ...sessionData.sandbox, status: "failed" }
: null,
});
}
}
} catch (err) {
console.error("Failed to load session:", err);
updateSessionData(sessionId, {