mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-03-01 05:35:46 +00:00
Compare commits
11 Commits
embed_imag
...
experiment
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
03e904be4a | ||
|
|
ecf3c34207 | ||
|
|
42a589e141 | ||
|
|
dd0405998d | ||
|
|
e55fbc47d4 | ||
|
|
4321e2b922 | ||
|
|
1cab1f54b5 | ||
|
|
8896af2303 | ||
|
|
40005e0a09 | ||
|
|
03bfc14f1c | ||
|
|
e7e4c96ccc |
@@ -817,17 +817,19 @@ def connector_document_extraction(
|
||||
if processing_mode == ProcessingMode.FILE_SYSTEM:
|
||||
creator_id = index_attempt.connector_credential_pair.creator_id
|
||||
if creator_id:
|
||||
source_value = db_connector.source.value
|
||||
app.send_task(
|
||||
OnyxCeleryTask.SANDBOX_FILE_SYNC,
|
||||
kwargs={
|
||||
"user_id": str(creator_id),
|
||||
"tenant_id": tenant_id,
|
||||
"source": source_value,
|
||||
},
|
||||
queue=OnyxCeleryQueues.SANDBOX,
|
||||
)
|
||||
logger.info(
|
||||
f"Triggered sandbox file sync for user {creator_id} "
|
||||
f"after indexing complete"
|
||||
f"source={source_value} after indexing complete"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -160,6 +160,8 @@ CELERY_USER_FILE_PROCESSING_LOCK_TIMEOUT = 30 * 60 # 30 minutes (in seconds)
|
||||
|
||||
CELERY_USER_FILE_PROJECT_SYNC_LOCK_TIMEOUT = 5 * 60 # 5 minutes (in seconds)
|
||||
|
||||
CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT = 5 * 60 # 5 minutes (in seconds)
|
||||
|
||||
DANSWER_REDIS_FUNCTION_LOCK_PREFIX = "da_function_lock:"
|
||||
|
||||
TMP_DRALPHA_PERSONA_NAME = "KG Beta"
|
||||
@@ -447,6 +449,9 @@ class OnyxRedisLocks:
|
||||
CLEANUP_IDLE_SANDBOXES_BEAT_LOCK = "da_lock:cleanup_idle_sandboxes_beat"
|
||||
CLEANUP_OLD_SNAPSHOTS_BEAT_LOCK = "da_lock:cleanup_old_snapshots_beat"
|
||||
|
||||
# Sandbox file sync
|
||||
SANDBOX_FILE_SYNC_LOCK_PREFIX = "da_lock:sandbox_file_sync"
|
||||
|
||||
|
||||
class OnyxRedisSignals:
|
||||
BLOCK_VALIDATE_INDEXING_FENCES = "signal:block_validate_indexing_fences"
|
||||
|
||||
@@ -47,6 +47,7 @@ from onyx.server.features.build.session.manager import UploadLimitExceededError
|
||||
from onyx.server.features.build.utils import sanitize_filename
|
||||
from onyx.server.features.build.utils import validate_file
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.threadpool_concurrency import run_with_timeout
|
||||
from shared_configs.contextvars import get_current_tenant_id
|
||||
|
||||
logger = setup_logger()
|
||||
@@ -328,6 +329,9 @@ def delete_session(
|
||||
|
||||
# Lock timeout should be longer than max restore time (5 minutes)
|
||||
RESTORE_LOCK_TIMEOUT_SECONDS = 300
|
||||
# Per-operation timeout (provision, snapshot restore, etc.)
|
||||
# If more than this, probably failed to restore.
|
||||
RESTORE_TIMEOUT_SECONDS = 120
|
||||
|
||||
|
||||
@router.post("/{session_id}/restore", response_model=DetailedSessionResponse)
|
||||
@@ -418,7 +422,9 @@ def restore_session(
|
||||
)
|
||||
db_session.commit()
|
||||
|
||||
sandbox_manager.provision(
|
||||
run_with_timeout(
|
||||
RESTORE_TIMEOUT_SECONDS,
|
||||
sandbox_manager.provision,
|
||||
sandbox_id=sandbox.id,
|
||||
user_id=user.id,
|
||||
tenant_id=tenant_id,
|
||||
@@ -451,7 +457,9 @@ def restore_session(
|
||||
|
||||
if snapshot:
|
||||
try:
|
||||
sandbox_manager.restore_snapshot(
|
||||
run_with_timeout(
|
||||
RESTORE_TIMEOUT_SECONDS,
|
||||
sandbox_manager.restore_snapshot,
|
||||
sandbox_id=sandbox.id,
|
||||
session_id=session_id,
|
||||
snapshot_storage_path=snapshot.storage_path,
|
||||
@@ -462,6 +470,8 @@ def restore_session(
|
||||
)
|
||||
session.status = BuildSessionStatus.ACTIVE
|
||||
db_session.commit()
|
||||
except TimeoutError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Snapshot restore failed for session {session_id}: {e}"
|
||||
@@ -471,7 +481,9 @@ def restore_session(
|
||||
raise
|
||||
else:
|
||||
# No snapshot - set up fresh workspace
|
||||
sandbox_manager.setup_session_workspace(
|
||||
run_with_timeout(
|
||||
RESTORE_TIMEOUT_SECONDS,
|
||||
sandbox_manager.setup_session_workspace,
|
||||
sandbox_id=sandbox.id,
|
||||
session_id=session_id,
|
||||
llm_config=llm_config,
|
||||
@@ -485,17 +497,29 @@ def restore_session(
|
||||
f"re-provision, expected RUNNING"
|
||||
)
|
||||
|
||||
except TimeoutError as e:
|
||||
# Do NOT release the Redis lock here. The timed-out operation is
|
||||
# still running in a background thread. Releasing the lock would
|
||||
# allow a concurrent restore to start on the same sandbox. The
|
||||
# lock's TTL (RESTORE_LOCK_TIMEOUT_SECONDS) will expire it once
|
||||
# the orphaned operation has had time to finish.
|
||||
logger.error(f"Restore timed out for session {session_id}: {e}")
|
||||
raise HTTPException(
|
||||
status_code=504,
|
||||
detail="Sandbox restore timed out",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to restore session {session_id}: {e}", exc_info=True)
|
||||
if lock.owned():
|
||||
lock.release()
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Failed to restore session: {e}",
|
||||
)
|
||||
finally:
|
||||
if lock.owned():
|
||||
lock.release()
|
||||
|
||||
# Update heartbeat to mark sandbox as active after successful restore
|
||||
# Success - release the lock and update heartbeat
|
||||
if lock.owned():
|
||||
lock.release()
|
||||
update_sandbox_heartbeat(db_session, sandbox.id)
|
||||
|
||||
base_response = SessionResponse.from_model(session, sandbox)
|
||||
|
||||
@@ -416,6 +416,7 @@ class SandboxManager(ABC):
|
||||
sandbox_id: UUID,
|
||||
user_id: UUID,
|
||||
tenant_id: str,
|
||||
source: str | None = None,
|
||||
) -> bool:
|
||||
"""Sync files from S3 to the sandbox's /workspace/files directory.
|
||||
|
||||
@@ -428,6 +429,9 @@ class SandboxManager(ABC):
|
||||
sandbox_id: The sandbox UUID
|
||||
user_id: The user ID (for S3 path construction)
|
||||
tenant_id: The tenant ID (for S3 path construction)
|
||||
source: Optional source type (e.g., "gmail", "google_drive").
|
||||
If None, syncs all sources. If specified, only syncs
|
||||
that source's directory.
|
||||
|
||||
Returns:
|
||||
True if sync was successful, False otherwise.
|
||||
|
||||
@@ -361,45 +361,33 @@ class KubernetesSandboxManager(SandboxManager):
|
||||
command=["/bin/sh", "-c"],
|
||||
args=[
|
||||
f"""
|
||||
# Handle SIGTERM for fast container termination
|
||||
trap 'echo "Received SIGTERM, exiting"; exit 0' TERM
|
||||
# Handle signals for graceful container termination
|
||||
trap 'echo "Shutting down"; exit 0' TERM INT
|
||||
|
||||
# Initial sync on startup - sync knowledge files for this user/tenant
|
||||
echo "Starting initial file sync for tenant: {tenant_id} / user: {user_id}"
|
||||
echo "S3 source: s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/"
|
||||
echo "Starting initial file sync"
|
||||
echo "S3: s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/*"
|
||||
echo "Local: /workspace/files/"
|
||||
|
||||
# s5cmd sync: high-performance parallel S3 sync (default 256 workers)
|
||||
# Capture both stdout and stderr to see all messages including errors
|
||||
# s5cmd sync (default 256 workers)
|
||||
# Exit codes: 0=success, 1=success with warnings
|
||||
sync_exit_code=0
|
||||
sync_output=$(mktemp)
|
||||
/s5cmd --log debug --stat sync \
|
||||
/s5cmd --stat sync \
|
||||
"s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/*" \
|
||||
/workspace/files/ 2>&1 | tee "$sync_output" || sync_exit_code=$?
|
||||
/workspace/files/ 2>&1 || sync_exit_code=$?
|
||||
|
||||
echo "=== S3 sync finished with exit code: $sync_exit_code ==="
|
||||
echo "=== Initial sync finished (exit code: $sync_exit_code) ==="
|
||||
|
||||
# Count files synced
|
||||
file_count=$(find /workspace/files -type f | wc -l)
|
||||
echo "Total files in /workspace/files: $file_count"
|
||||
|
||||
# Show summary of any errors from the output
|
||||
if [ $sync_exit_code -ne 0 ]; then
|
||||
echo "=== Errors/warnings from sync ==="
|
||||
grep -iE "error|warn|fail" "$sync_output" || echo "No errors found"
|
||||
echo "=========================="
|
||||
fi
|
||||
rm -f "$sync_output"
|
||||
|
||||
# Exit codes 0 and 1 are considered success (1 = success with warnings)
|
||||
# Handle result
|
||||
if [ $sync_exit_code -eq 0 ] || [ $sync_exit_code -eq 1 ]; then
|
||||
echo "Sync complete (exit $sync_exit_code), staying alive for incremental syncs"
|
||||
file_count=$(find /workspace/files -type f 2>/dev/null | wc -l)
|
||||
echo "Files synced: $file_count"
|
||||
echo "Sidecar ready for incremental syncs"
|
||||
else
|
||||
echo "ERROR: Sync failed with exit code: $sync_exit_code"
|
||||
echo "ERROR: Initial sync failed (exit code: $sync_exit_code)"
|
||||
exit $sync_exit_code
|
||||
fi
|
||||
|
||||
# Stay alive - incremental sync commands will be executed via kubectl exec
|
||||
# Use 'wait' so shell can respond to signals while sleeping
|
||||
# Stay alive for incremental syncs via kubectl exec
|
||||
while true; do
|
||||
sleep 30 &
|
||||
wait $!
|
||||
@@ -2011,6 +1999,7 @@ echo "Session config regeneration complete"
|
||||
sandbox_id: UUID,
|
||||
user_id: UUID,
|
||||
tenant_id: str,
|
||||
source: str | None = None,
|
||||
) -> bool:
|
||||
"""Sync files from S3 to the running pod via the file-sync sidecar.
|
||||
|
||||
@@ -2023,21 +2012,66 @@ echo "Session config regeneration complete"
|
||||
sandbox_id: The sandbox UUID
|
||||
user_id: The user ID (for S3 path construction)
|
||||
tenant_id: The tenant ID (for S3 path construction)
|
||||
source: Optional source type (e.g., "gmail", "google_drive").
|
||||
If None, syncs all sources. If specified, only syncs
|
||||
that source's directory.
|
||||
|
||||
Returns:
|
||||
True if sync was successful, False otherwise.
|
||||
"""
|
||||
pod_name = self._get_pod_name(str(sandbox_id))
|
||||
|
||||
# s5cmd sync: high-performance parallel S3 sync (default 256 workers)
|
||||
# --stat shows transfer statistics for monitoring
|
||||
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/*"
|
||||
sync_command = [
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
f'/s5cmd --log debug --stat sync "{s3_path}" /workspace/files/; '
|
||||
f'echo "Files in workspace: $(find /workspace/files -type f | wc -l)"',
|
||||
]
|
||||
# Build S3 path based on whether source is specified
|
||||
if source:
|
||||
# Sync only the specific source directory
|
||||
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/{source}/*"
|
||||
local_path = f"/workspace/files/{source}/"
|
||||
else:
|
||||
# Sync all sources (original behavior)
|
||||
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/*"
|
||||
local_path = "/workspace/files/"
|
||||
|
||||
# s5cmd sync: high-performance parallel S3 sync
|
||||
# --delete: mirror S3 to local (remove files that no longer exist in source)
|
||||
# timeout: prevent zombie processes from kubectl exec disconnections
|
||||
# trap: kill child processes on exit/disconnect
|
||||
source_info = f" (source={source})" if source else ""
|
||||
sync_script = f"""
|
||||
# Kill child processes on exit/disconnect to prevent zombie s5cmd workers
|
||||
cleanup() {{ pkill -P $$ 2>/dev/null || true; }}
|
||||
trap cleanup EXIT INT TERM
|
||||
|
||||
echo "Starting incremental file sync{source_info}"
|
||||
echo "S3: {s3_path}"
|
||||
echo "Local: {local_path}"
|
||||
|
||||
# Ensure destination exists (needed for source-specific syncs)
|
||||
mkdir -p "{local_path}"
|
||||
|
||||
# Run s5cmd with 5-minute timeout (SIGKILL after 10s if SIGTERM ignored)
|
||||
# Exit codes: 0=success, 1=success with warnings, 124=timeout
|
||||
sync_exit_code=0
|
||||
timeout --signal=TERM --kill-after=10s 5m \
|
||||
/s5cmd --stat sync --delete "{s3_path}" "{local_path}" 2>&1 || sync_exit_code=$?
|
||||
|
||||
echo "=== Sync finished (exit code: $sync_exit_code) ==="
|
||||
|
||||
# Handle result
|
||||
if [ $sync_exit_code -eq 0 ] || [ $sync_exit_code -eq 1 ]; then
|
||||
file_count=$(find "{local_path}" -type f 2>/dev/null | wc -l)
|
||||
echo "Files in {local_path}: $file_count"
|
||||
echo "SYNC_SUCCESS"
|
||||
elif [ $sync_exit_code -eq 124 ]; then
|
||||
echo "ERROR: Sync timed out after 5 minutes"
|
||||
echo "SYNC_FAILED"
|
||||
exit 1
|
||||
else
|
||||
echo "ERROR: Sync failed (exit code: $sync_exit_code)"
|
||||
echo "SYNC_FAILED"
|
||||
exit $sync_exit_code
|
||||
fi
|
||||
"""
|
||||
sync_command = ["/bin/sh", "-c", sync_script]
|
||||
resp = k8s_stream(
|
||||
self._stream_core_api.connect_get_namespaced_pod_exec,
|
||||
pod_name,
|
||||
@@ -2050,6 +2084,11 @@ echo "Session config regeneration complete"
|
||||
tty=False,
|
||||
)
|
||||
logger.debug(f"File sync response: {resp}")
|
||||
|
||||
# Check if sync succeeded based on output markers
|
||||
if "SYNC_FAILED" in resp:
|
||||
logger.warning(f"File sync failed for sandbox {sandbox_id}")
|
||||
return False
|
||||
return True
|
||||
|
||||
def _ensure_agents_md_attachments_section(
|
||||
|
||||
@@ -1056,6 +1056,7 @@ class LocalSandboxManager(SandboxManager):
|
||||
sandbox_id: UUID,
|
||||
user_id: UUID, # noqa: ARG002
|
||||
tenant_id: str, # noqa: ARG002
|
||||
source: str | None = None, # noqa: ARG002
|
||||
) -> bool:
|
||||
"""No-op for local mode - files are directly accessible via symlink.
|
||||
|
||||
@@ -1066,9 +1067,13 @@ class LocalSandboxManager(SandboxManager):
|
||||
sandbox_id: The sandbox UUID (unused)
|
||||
user_id: The user ID (unused)
|
||||
tenant_id: The tenant ID (unused)
|
||||
source: The source type (unused in local mode)
|
||||
|
||||
Returns:
|
||||
True (always succeeds since no sync is needed)
|
||||
"""
|
||||
logger.debug(f"sync_files called for local sandbox {sandbox_id} - no-op")
|
||||
source_info = f" source={source}" if source else ""
|
||||
logger.debug(
|
||||
f"sync_files called for local sandbox {sandbox_id}{source_info} - no-op"
|
||||
)
|
||||
return True
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
"""Celery tasks for sandbox operations (cleanup, file sync, etc.)."""
|
||||
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from uuid import UUID
|
||||
|
||||
from celery import shared_task
|
||||
@@ -7,6 +9,7 @@ from celery import Task
|
||||
from redis.lock import Lock as RedisLock
|
||||
|
||||
from onyx.background.celery.apps.app_base import task_logger
|
||||
from onyx.configs.constants import CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT
|
||||
from onyx.configs.constants import OnyxCeleryTask
|
||||
from onyx.configs.constants import OnyxRedisLocks
|
||||
from onyx.db.engine.sql_engine import get_session_with_current_tenant
|
||||
@@ -247,6 +250,19 @@ def _list_session_directories(
|
||||
return []
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _acquire_sandbox_file_sync_lock(lock: RedisLock) -> Iterator[bool]:
|
||||
"""Acquire the sandbox file-sync lock with blocking timeout; release on exit."""
|
||||
acquired = lock.acquire(
|
||||
blocking_timeout=CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT,
|
||||
)
|
||||
try:
|
||||
yield acquired
|
||||
finally:
|
||||
if lock.owned():
|
||||
lock.release()
|
||||
|
||||
|
||||
@shared_task(
|
||||
name=OnyxCeleryTask.SANDBOX_FILE_SYNC,
|
||||
soft_time_limit=TIMEOUT_SECONDS,
|
||||
@@ -254,7 +270,11 @@ def _list_session_directories(
|
||||
ignore_result=True,
|
||||
)
|
||||
def sync_sandbox_files(
|
||||
self: Task, *, user_id: str, tenant_id: str # noqa: ARG001
|
||||
self: Task, # noqa: ARG001
|
||||
*,
|
||||
user_id: str,
|
||||
tenant_id: str,
|
||||
source: str | None = None,
|
||||
) -> bool:
|
||||
"""Sync files from S3 to a user's running sandbox.
|
||||
|
||||
@@ -262,46 +282,63 @@ def sync_sandbox_files(
|
||||
It executes `s5cmd sync` in the file-sync sidecar container to download
|
||||
any new or changed files.
|
||||
|
||||
This is safe to call multiple times - s5cmd sync is idempotent.
|
||||
Per-user locking ensures only one sync runs at a time for a given user.
|
||||
If a sync is already in progress, this task will wait until it completes.
|
||||
|
||||
Args:
|
||||
user_id: The user ID whose sandbox should be synced
|
||||
tenant_id: The tenant ID for S3 path construction
|
||||
source: Optional source type (e.g., "gmail", "google_drive").
|
||||
If None, syncs all sources.
|
||||
|
||||
Returns:
|
||||
True if sync was successful, False if skipped or failed
|
||||
"""
|
||||
source_info = f" source={source}" if source else " (all sources)"
|
||||
task_logger.info(
|
||||
f"sync_sandbox_files starting for user {user_id} in tenant {tenant_id}"
|
||||
f"{source_info}"
|
||||
)
|
||||
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
sandbox = get_sandbox_by_user_id(db_session, UUID(user_id))
|
||||
lock_timeout = CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT
|
||||
redis_client = get_redis_client(tenant_id=tenant_id)
|
||||
lock = redis_client.lock(
|
||||
f"{OnyxRedisLocks.SANDBOX_FILE_SYNC_LOCK_PREFIX}:{user_id}",
|
||||
timeout=lock_timeout,
|
||||
)
|
||||
|
||||
if sandbox is None:
|
||||
task_logger.debug(f"No sandbox found for user {user_id}, skipping sync")
|
||||
return False
|
||||
|
||||
if sandbox.status != SandboxStatus.RUNNING:
|
||||
task_logger.debug(
|
||||
f"Sandbox {sandbox.id} not running (status={sandbox.status}), "
|
||||
f"skipping sync"
|
||||
with _acquire_sandbox_file_sync_lock(lock) as acquired:
|
||||
if not acquired:
|
||||
task_logger.warning(
|
||||
f"sync_sandbox_files - failed to acquire lock for user {user_id} "
|
||||
f"after {lock_timeout}s, skipping"
|
||||
)
|
||||
return False
|
||||
|
||||
sandbox_manager = get_sandbox_manager()
|
||||
result = sandbox_manager.sync_files(
|
||||
sandbox_id=sandbox.id,
|
||||
user_id=UUID(user_id),
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
sandbox = get_sandbox_by_user_id(db_session, UUID(user_id))
|
||||
if sandbox is None:
|
||||
task_logger.debug(f"No sandbox found for user {user_id}, skipping sync")
|
||||
return False
|
||||
if sandbox.status != SandboxStatus.RUNNING:
|
||||
task_logger.debug(
|
||||
f"Sandbox {sandbox.id} not running (status={sandbox.status}), "
|
||||
f"skipping sync"
|
||||
)
|
||||
return False
|
||||
|
||||
if result:
|
||||
task_logger.info(f"File sync completed for user {user_id}")
|
||||
else:
|
||||
task_logger.warning(f"File sync failed for user {user_id}")
|
||||
|
||||
return result
|
||||
sandbox_manager = get_sandbox_manager()
|
||||
result = sandbox_manager.sync_files(
|
||||
sandbox_id=sandbox.id,
|
||||
user_id=UUID(user_id),
|
||||
tenant_id=tenant_id,
|
||||
source=source,
|
||||
)
|
||||
if result:
|
||||
task_logger.info(f"File sync completed for user {user_id}{source_info}")
|
||||
else:
|
||||
task_logger.warning(f"File sync failed for user {user_id}{source_info}")
|
||||
return result
|
||||
|
||||
|
||||
# NOTE: in the future, may need to add this. For now, will do manual cleanup.
|
||||
|
||||
@@ -1221,27 +1221,29 @@ export const useBuildSessionStore = create<BuildSessionStore>()((set, get) => ({
|
||||
!sessionData.session_loaded_in_sandbox);
|
||||
|
||||
if (needsRestore) {
|
||||
// Update UI: show sandbox as "restoring" and session as loading
|
||||
// Show sandbox as "restoring" while we load messages + restore
|
||||
updateSessionData(sessionId, {
|
||||
status: "creating",
|
||||
sandbox: sessionData.sandbox
|
||||
? { ...sessionData.sandbox, status: "restoring" }
|
||||
: null,
|
||||
});
|
||||
|
||||
// Call restore endpoint (blocks until complete)
|
||||
sessionData = await restoreSession(sessionId);
|
||||
|
||||
// Clear the "creating" loading indicator so subsequent logic
|
||||
// doesn't mistake this for an active streaming session.
|
||||
updateSessionData(sessionId, { status: "idle" });
|
||||
}
|
||||
|
||||
// Now fetch messages and artifacts
|
||||
const [messages, artifacts] = await Promise.all([
|
||||
fetchMessages(sessionId),
|
||||
fetchArtifacts(sessionId),
|
||||
]);
|
||||
// Fetch messages from DB first - they don't depend on the sandbox
|
||||
// being running. This ensures message history is visible immediately,
|
||||
// even while a sandbox restore is in progress.
|
||||
// NOTE: artifacts require sandbox filesystem access, so only fetch
|
||||
// them when the sandbox is already running.
|
||||
const messages = await fetchMessages(sessionId);
|
||||
const artifacts = needsRestore ? [] : await fetchArtifacts(sessionId);
|
||||
|
||||
// If session is already streaming (e.g. pre-provisioned flow),
|
||||
// preserve its current messages and status. Otherwise use DB messages.
|
||||
const currentSession = get().sessions.get(sessionId);
|
||||
const isStreaming =
|
||||
currentSession?.status === "running" ||
|
||||
currentSession?.status === "creating";
|
||||
|
||||
// Construct webapp URL if sandbox has a Next.js port and there's a webapp artifact
|
||||
let webappUrl: string | null = null;
|
||||
@@ -1252,50 +1254,62 @@ export const useBuildSessionStore = create<BuildSessionStore>()((set, get) => ({
|
||||
webappUrl = `http://localhost:${sessionData.sandbox.nextjs_port}`;
|
||||
}
|
||||
|
||||
// Re-fetch existing session to check for optimistic messages
|
||||
const currentSession = get().sessions.get(sessionId);
|
||||
const hasOptimisticMessages = (currentSession?.messages.length ?? 0) > 0;
|
||||
const isCurrentlyStreaming =
|
||||
currentSession?.status === "running" ||
|
||||
currentSession?.status === "creating";
|
||||
|
||||
// Consolidate messages into proper conversation turns
|
||||
// Each assistant turn becomes a single message with streamItems in metadata
|
||||
// If there are optimistic messages (active streaming), preserve current state
|
||||
const messagesToUse = hasOptimisticMessages
|
||||
const status = isStreaming
|
||||
? currentSession!.status
|
||||
: needsRestore
|
||||
? "creating"
|
||||
: sessionData.status === "active"
|
||||
? "active"
|
||||
: "idle";
|
||||
const resolvedMessages = isStreaming
|
||||
? currentSession!.messages
|
||||
: consolidateMessagesIntoTurns(messages);
|
||||
// Session-level streamItems are only for current streaming response
|
||||
// When loading from history, they should be empty (each message has its own streamItems)
|
||||
const streamItemsToUse = hasOptimisticMessages
|
||||
? currentSession!.streamItems
|
||||
: [];
|
||||
// Preserve streaming status if currently streaming, otherwise use backend status
|
||||
const statusToUse = isCurrentlyStreaming
|
||||
? currentSession!.status
|
||||
: sessionData.status === "active"
|
||||
? "active"
|
||||
: "idle";
|
||||
const streamItems = isStreaming ? currentSession!.streamItems : [];
|
||||
const sandbox =
|
||||
needsRestore && sessionData.sandbox
|
||||
? { ...sessionData.sandbox, status: "restoring" as const }
|
||||
: sessionData.sandbox;
|
||||
|
||||
updateSessionData(sessionId, {
|
||||
status: statusToUse,
|
||||
// Preserve optimistic messages if they exist (e.g., from pre-provisioned flow)
|
||||
messages: messagesToUse,
|
||||
streamItems: streamItemsToUse,
|
||||
status,
|
||||
messages: resolvedMessages,
|
||||
streamItems,
|
||||
artifacts,
|
||||
webappUrl,
|
||||
sandbox: sessionData.sandbox,
|
||||
sandbox,
|
||||
error: null,
|
||||
isLoaded: true,
|
||||
// After restore, bump webappNeedsRefresh so OutputPanel's SWR refetches
|
||||
// webapp-info. Done here (not earlier) so all session data is set atomically.
|
||||
...(needsRestore
|
||||
? {
|
||||
webappNeedsRefresh:
|
||||
(get().sessions.get(sessionId)?.webappNeedsRefresh || 0) + 1,
|
||||
}
|
||||
: {}),
|
||||
});
|
||||
|
||||
// Now restore the sandbox if needed (messages are already visible).
|
||||
// The backend enforces a timeout and returns an error if restore
|
||||
// takes too long, so no frontend timeout needed here.
|
||||
if (needsRestore) {
|
||||
try {
|
||||
sessionData = await restoreSession(sessionId);
|
||||
|
||||
// Sandbox is now running - fetch artifacts
|
||||
const restoredArtifacts = await fetchArtifacts(sessionId);
|
||||
|
||||
updateSessionData(sessionId, {
|
||||
status: sessionData.status === "active" ? "active" : "idle",
|
||||
artifacts: restoredArtifacts,
|
||||
sandbox: sessionData.sandbox,
|
||||
// Bump so OutputPanel's SWR refetches webapp-info (which
|
||||
// derives the actual webappUrl from the backend).
|
||||
webappNeedsRefresh:
|
||||
(get().sessions.get(sessionId)?.webappNeedsRefresh || 0) + 1,
|
||||
});
|
||||
} catch (restoreErr) {
|
||||
console.error("Sandbox restore failed:", restoreErr);
|
||||
updateSessionData(sessionId, {
|
||||
status: "idle",
|
||||
sandbox: sessionData.sandbox
|
||||
? { ...sessionData.sandbox, status: "failed" }
|
||||
: null,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error("Failed to load session:", err);
|
||||
updateSessionData(sessionId, {
|
||||
|
||||
Reference in New Issue
Block a user