Compare commits

...

21 Commits

Author SHA1 Message Date
rohoswagger
663abf87bb shell script edits 2026-02-10 16:30:40 -08:00
rohoswagger
59f69d0274 debug logs 2026-02-10 16:20:27 -08:00
rohoswagger
d4392e8068 no more deleting 2026-02-10 15:59:10 -08:00
rohoswagger
b37cb9eda6 dont delete files from sandbox 2026-02-10 15:46:29 -08:00
rohoswagger
6d1e4ac374 bug fix 2026-02-10 15:26:14 -08:00
rohoswagger
a6af453f4a prayge 2026-02-10 15:16:06 -08:00
rohoswagger
bd7da750a7 nits 2026-02-10 13:32:56 -08:00
rohoswagger
69590b3eee feat: file/folder connector 2026-02-10 13:32:56 -08:00
rohoswagger
2ead8a2c45 new connector setup 2026-02-10 13:32:56 -08:00
Wenxi Onyx
fe6b70625b fix restore session message retrieval 2026-02-10 13:31:03 -08:00
Wenxi Onyx
03e904be4a simplify useBuildSesssionStore and fetch artifacts after restoration 2026-02-10 12:52:21 -08:00
Wenxi Onyx
ecf3c34207 don't release lock on timeout 2026-02-10 12:52:21 -08:00
Wenxi Onyx
42a589e141 comment 2026-02-10 12:52:21 -08:00
Wenxi Onyx
dd0405998d slight increase to timeout 2026-02-10 12:52:21 -08:00
Wenxi Onyx
e55fbc47d4 feat: add timeout to session restoration 2026-02-10 12:52:21 -08:00
Wenxi Onyx
4321e2b922 fix(craft): messages load before session restore 2026-02-10 12:52:21 -08:00
Wenxi
1cab1f54b5 Merge branch 'main' into whuang/craft-file-sync-lock 2026-02-10 12:52:03 -08:00
Wenxi Onyx
8896af2303 make filesync lock acquisition more elegant 2026-02-10 09:41:13 -08:00
Wenxi Onyx
40005e0a09 fix rebase issue 2026-02-10 09:33:44 -08:00
Wenxi Onyx
03bfc14f1c improve file sync scripts 2026-02-10 09:31:19 -08:00
Wenxi Onyx
e7e4c96ccc feat(craft): narrow file sync to source, prevent concurrent syncs, and use --delete flag on incremental syncs 2026-02-10 09:31:16 -08:00
36 changed files with 2993 additions and 178 deletions

View File

@@ -677,8 +677,19 @@ def connector_document_extraction(
logger.debug(f"Indexing batch of documents: {batch_description}")
memory_tracer.increment_and_maybe_trace()
# cc4a
if processing_mode == ProcessingMode.FILE_SYSTEM:
if processing_mode == ProcessingMode.RAW_BINARY:
# RAW_BINARY mode - files are uploaded via API directly to S3
# CraftFileConnector yields no documents (load_from_state returns empty)
# This code path should not be reached since the connector
# doesn't produce document batches. Log a warning if we get here.
logger.warning(
f"Unexpected RAW_BINARY document batch received: "
f"docs={len(doc_batch_cleaned)} attempt={index_attempt_id}. "
f"CraftFileConnector should not yield documents."
)
continue
elif processing_mode == ProcessingMode.FILE_SYSTEM:
# File system only - write directly to persistent storage,
# skip chunking/embedding/Vespa but still track documents in DB
@@ -812,22 +823,25 @@ def connector_document_extraction(
total_batches=batch_num,
)
# Trigger file sync to user's sandbox (if running) - only for FILE_SYSTEM mode
# This syncs the newly written documents from S3 to any running sandbox pod
if processing_mode == ProcessingMode.FILE_SYSTEM:
# Trigger file sync to user's sandbox (if running)
# This syncs files from S3 to any running sandbox pod
# Applies to both FILE_SYSTEM (connector JSON documents) and RAW_BINARY (user uploads)
if processing_mode in (ProcessingMode.FILE_SYSTEM, ProcessingMode.RAW_BINARY):
creator_id = index_attempt.connector_credential_pair.creator_id
if creator_id:
source_value = db_connector.source.value
app.send_task(
OnyxCeleryTask.SANDBOX_FILE_SYNC,
kwargs={
"user_id": str(creator_id),
"tenant_id": tenant_id,
"source": source_value,
},
queue=OnyxCeleryQueues.SANDBOX,
)
logger.info(
f"Triggered sandbox file sync for user {creator_id} "
f"after indexing complete"
f"source={source_value} after indexing complete"
)
except Exception as e:

View File

@@ -160,6 +160,8 @@ CELERY_USER_FILE_PROCESSING_LOCK_TIMEOUT = 30 * 60 # 30 minutes (in seconds)
CELERY_USER_FILE_PROJECT_SYNC_LOCK_TIMEOUT = 5 * 60 # 5 minutes (in seconds)
CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT = 5 * 60 # 5 minutes (in seconds)
DANSWER_REDIS_FUNCTION_LOCK_PREFIX = "da_function_lock:"
TMP_DRALPHA_PERSONA_NAME = "KG Beta"
@@ -226,6 +228,9 @@ class DocumentSource(str, Enum):
MOCK_CONNECTOR = "mock_connector"
# Special case for user files
USER_FILE = "user_file"
# Raw files for Craft sandbox access (xlsx, pptx, docx, etc.)
# Uses RAW_BINARY processing mode - no text extraction
CRAFT_FILE = "craft_file"
class FederatedConnectorSource(str, Enum):
@@ -447,6 +452,9 @@ class OnyxRedisLocks:
CLEANUP_IDLE_SANDBOXES_BEAT_LOCK = "da_lock:cleanup_idle_sandboxes_beat"
CLEANUP_OLD_SNAPSHOTS_BEAT_LOCK = "da_lock:cleanup_old_snapshots_beat"
# Sandbox file sync
SANDBOX_FILE_SYNC_LOCK_PREFIX = "da_lock:sandbox_file_sync"
class OnyxRedisSignals:
BLOCK_VALIDATE_INDEXING_FENCES = "signal:block_validate_indexing_fences"

View File

@@ -0,0 +1,2 @@
# Craft File Connector for Onyx Craft
# Handles raw binary file uploads (xlsx, pptx, docx, etc.) without text extraction

View File

@@ -0,0 +1,81 @@
"""
Craft File Connector for Onyx Craft.
This connector handles raw binary file uploads (xlsx, pptx, docx, csv, etc.)
that are stored directly in S3 WITHOUT text extraction.
Key differences from LocalFileConnector:
- Does NOT extract text from files
- Stores raw binary files directly to S3
- Uses RAW_BINARY processing mode
- Agent reads files with Python libraries (openpyxl, python-pptx, etc.)
Files are stored at:
s3://{bucket}/{tenant_id}/knowledge/{user_id}/user_library/{path}
And synced to sandbox at:
/workspace/files/user_library/{path}
Note: Sync enable/disable is managed via document metadata (sync_disabled field),
not via connector config. The _get_disabled_user_library_paths() function in
tasks.py queries the database for disabled files during sandbox sync.
"""
from typing import Any
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.utils.logger import setup_logger
logger = setup_logger()
class CraftFileConnector(LoadConnector):
    """
    Connector for raw binary files in Onyx Craft.

    Unlike LocalFileConnector, which runs text extraction, this connector
    keeps files exactly as uploaded so the sandbox agent can open them
    directly (openpyxl, python-pptx, etc.).

    Uploads happen through dedicated API endpoints rather than the normal
    connector indexing flow; this class exists so that:
      1. Document table rows have a valid connector to reference.
      2. The indexing pipeline can run in RAW_BINARY processing mode.
    """

    def __init__(
        self,
        batch_size: int = INDEX_BATCH_SIZE,
        **kwargs: Any,  # noqa: ARG002
    ) -> None:
        """Store the batch size (unused for raw files) and ignore extra config.

        Args:
            batch_size: Documents per yielded batch; kept only for interface
                compatibility with other connectors.
            **kwargs: Accepted and discarded so arbitrary connector-specific
                config does not break construction.
        """
        self.batch_size = batch_size

    def load_credentials(
        self, credentials: dict[str, Any]  # noqa: ARG002
    ) -> dict[str, Any] | None:
        """Return None: craft files need no external credentials since
        uploads are performed by already-authenticated users."""
        return None

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Yield no documents.

        Files reach S3 (and the document table) via the API endpoints,
        never through connector indexing, so this generator is empty.
        """
        # The unreachable `yield` makes this function a generator,
        # satisfying the GenerateDocumentsOutput contract.
        return
        yield

View File

@@ -217,4 +217,10 @@ CONNECTOR_CLASS_MAP = {
module_path="onyx.connectors.mock_connector.connector",
class_name="MockConnector",
),
# Craft files - raw binary uploads (xlsx, pptx, docx, etc.)
# Uses RAW_BINARY processing mode - no text extraction
DocumentSource.CRAFT_FILE: ConnectorMapping(
module_path="onyx.connectors.craft_file.connector",
class_name="CraftFileConnector",
),
}

View File

@@ -6,6 +6,8 @@ from collections.abc import Sequence
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any
from uuid import UUID
from sqlalchemy import and_
from sqlalchemy import delete
@@ -226,6 +228,50 @@ def get_documents_by_ids(
return list(documents)
def get_documents_by_source(
    db_session: Session,
    source: DocumentSource,
    creator_id: UUID | None = None,
) -> list[DbDocument]:
    """Fetch every document indexed by connectors of the given source type.

    Walks Document -> DocumentByConnectorCredentialPair ->
    ConnectorCredentialPair -> Connector and filters on the connector's
    source. Results are de-duplicated since one document may be attached
    to multiple cc-pairs.

    Args:
        db_session: Active SQLAlchemy session.
        source: The document source type to filter by.
        creator_id: When given, restrict to documents whose cc-pair was
            created by this user.
    """
    query = select(DbDocument)
    query = query.join(
        DocumentByConnectorCredentialPair,
        DbDocument.id == DocumentByConnectorCredentialPair.id,
    )
    query = query.join(
        ConnectorCredentialPair,
        and_(
            DocumentByConnectorCredentialPair.connector_id
            == ConnectorCredentialPair.connector_id,
            DocumentByConnectorCredentialPair.credential_id
            == ConnectorCredentialPair.credential_id,
        ),
    )
    query = query.join(
        Connector,
        ConnectorCredentialPair.connector_id == Connector.id,
    )
    query = query.where(Connector.source == source)

    if creator_id is not None:
        query = query.where(ConnectorCredentialPair.creator_id == creator_id)

    rows = db_session.execute(query.distinct()).scalars().all()
    return list(rows)
def _apply_last_updated_cursor_filter(
stmt: Select,
cursor_last_modified: datetime | None,
@@ -1527,3 +1573,40 @@ def get_document_kg_entities_and_relationships(
def get_num_chunks_for_document(db_session: Session, document_id: str) -> int:
stmt = select(DbDocument.chunk_count).where(DbDocument.id == document_id)
return db_session.execute(stmt).scalar_one_or_none() or 0
def update_document_metadata__no_commit(
    db_session: Session,
    document_id: str,
    doc_metadata: dict[str, Any],
) -> None:
    """Overwrite a document's doc_metadata field.

    Does NOT commit; the caller owns the transaction boundary.

    Args:
        db_session: Active SQLAlchemy session.
        document_id: ID of the document row to update.
        doc_metadata: Replacement metadata dict (full overwrite, not a merge).
    """
    db_session.execute(
        update(DbDocument)
        .where(DbDocument.id == document_id)
        .values(doc_metadata=doc_metadata)
    )
def delete_document_by_id__no_commit(
    db_session: Session,
    document_id: str,
) -> None:
    """Remove a single document along with all dependent rows.

    Thin wrapper over delete_documents_complete__no_commit, which handles
    every foreign-key relationship (KG entities, relationships, chunk
    stats, cc-pair associations, feedback, tags). No commit is issued
    here -- callers must commit.
    """
    delete_documents_complete__no_commit(db_session, [document_id])

View File

@@ -60,7 +60,8 @@ class ProcessingMode(str, PyEnum):
"""Determines how documents are processed after fetching."""
REGULAR = "REGULAR" # Full pipeline: chunk → embed → Vespa
FILE_SYSTEM = "FILE_SYSTEM" # Write to file system only
FILE_SYSTEM = "FILE_SYSTEM" # Write to file system only (JSON documents)
RAW_BINARY = "RAW_BINARY" # Write raw binary to S3 (no text extraction)
class SyncType(str, PyEnum):

View File

@@ -27,6 +27,7 @@ from onyx.server.features.build.api.models import BuildConnectorStatus
from onyx.server.features.build.api.models import RateLimitResponse
from onyx.server.features.build.api.rate_limit import get_user_rate_limit_status
from onyx.server.features.build.api.sessions_api import router as sessions_router
from onyx.server.features.build.api.user_library import router as user_library_router
from onyx.server.features.build.db.sandbox import get_sandbox_by_user_id
from onyx.server.features.build.sandbox import get_sandbox_manager
from onyx.server.features.build.session.manager import SessionManager
@@ -51,9 +52,10 @@ def require_onyx_craft_enabled(user: User = Depends(current_user)) -> User:
router = APIRouter(prefix="/build", dependencies=[Depends(require_onyx_craft_enabled)])
# Include sub-routers for sessions and messages
# Include sub-routers for sessions, messages, and user library
router.include_router(sessions_router, tags=["build"])
router.include_router(messages_router, tags=["build"])
router.include_router(user_library_router, tags=["build"])
# -----------------------------------------------------------------------------
@@ -86,14 +88,24 @@ def get_build_connectors(
On the build configure page, all users (including admins) only see connectors
they own/created. Users can create new connectors if they don't have one of a type.
"""
cc_pairs = get_connector_credential_pairs_for_user(
# Fetch both FILE_SYSTEM (standard connectors) and RAW_BINARY (User Library) connectors
file_system_cc_pairs = get_connector_credential_pairs_for_user(
db_session=db_session,
user=user,
get_editable=False,
eager_load_connector=True,
eager_load_credential=True,
processing_mode=ProcessingMode.FILE_SYSTEM, # Only show FILE_SYSTEM connectors
processing_mode=ProcessingMode.FILE_SYSTEM,
)
raw_binary_cc_pairs = get_connector_credential_pairs_for_user(
db_session=db_session,
user=user,
get_editable=False,
eager_load_connector=True,
eager_load_credential=True,
processing_mode=ProcessingMode.RAW_BINARY,
)
cc_pairs = file_system_cc_pairs + raw_binary_cc_pairs
# Filter to only show connectors created by the current user
# All users (including admins) must create their own connectors on the build configure page

View File

@@ -47,6 +47,7 @@ from onyx.server.features.build.session.manager import UploadLimitExceededError
from onyx.server.features.build.utils import sanitize_filename
from onyx.server.features.build.utils import validate_file
from onyx.utils.logger import setup_logger
from onyx.utils.threadpool_concurrency import run_with_timeout
from shared_configs.contextvars import get_current_tenant_id
logger = setup_logger()
@@ -328,6 +329,9 @@ def delete_session(
# Lock timeout should be longer than max restore time (5 minutes)
RESTORE_LOCK_TIMEOUT_SECONDS = 300
# Per-operation timeout (provision, snapshot restore, etc.)
# If more than this, probably failed to restore.
RESTORE_TIMEOUT_SECONDS = 120
@router.post("/{session_id}/restore", response_model=DetailedSessionResponse)
@@ -418,7 +422,9 @@ def restore_session(
)
db_session.commit()
sandbox_manager.provision(
run_with_timeout(
RESTORE_TIMEOUT_SECONDS,
sandbox_manager.provision,
sandbox_id=sandbox.id,
user_id=user.id,
tenant_id=tenant_id,
@@ -451,7 +457,9 @@ def restore_session(
if snapshot:
try:
sandbox_manager.restore_snapshot(
run_with_timeout(
RESTORE_TIMEOUT_SECONDS,
sandbox_manager.restore_snapshot,
sandbox_id=sandbox.id,
session_id=session_id,
snapshot_storage_path=snapshot.storage_path,
@@ -462,6 +470,8 @@ def restore_session(
)
session.status = BuildSessionStatus.ACTIVE
db_session.commit()
except TimeoutError:
raise
except Exception as e:
logger.error(
f"Snapshot restore failed for session {session_id}: {e}"
@@ -471,7 +481,9 @@ def restore_session(
raise
else:
# No snapshot - set up fresh workspace
sandbox_manager.setup_session_workspace(
run_with_timeout(
RESTORE_TIMEOUT_SECONDS,
sandbox_manager.setup_session_workspace,
sandbox_id=sandbox.id,
session_id=session_id,
llm_config=llm_config,
@@ -485,17 +497,29 @@ def restore_session(
f"re-provision, expected RUNNING"
)
except TimeoutError as e:
# Do NOT release the Redis lock here. The timed-out operation is
# still running in a background thread. Releasing the lock would
# allow a concurrent restore to start on the same sandbox. The
# lock's TTL (RESTORE_LOCK_TIMEOUT_SECONDS) will expire it once
# the orphaned operation has had time to finish.
logger.error(f"Restore timed out for session {session_id}: {e}")
raise HTTPException(
status_code=504,
detail="Sandbox restore timed out",
)
except Exception as e:
logger.error(f"Failed to restore session {session_id}: {e}", exc_info=True)
if lock.owned():
lock.release()
raise HTTPException(
status_code=500,
detail=f"Failed to restore session: {e}",
)
finally:
if lock.owned():
lock.release()
# Update heartbeat to mark sandbox as active after successful restore
# Success - release the lock and update heartbeat
if lock.owned():
lock.release()
update_sandbox_heartbeat(db_session, sandbox.id)
base_response = SessionResponse.from_model(session, sandbox)

View File

@@ -0,0 +1,794 @@
"""API endpoints for User Library file management in Craft.
This module provides endpoints for uploading and managing raw binary files
(xlsx, pptx, docx, csv, etc.) that are stored directly in S3 for sandbox access.
Files are stored at:
s3://{bucket}/{tenant_id}/knowledge/{user_id}/user_library/{path}
And synced to sandbox at:
/workspace/files/user_library/{path}
"""
import mimetypes
import zipfile
from datetime import datetime
from datetime import timezone
from io import BytesIO
from typing import Any
from uuid import UUID
from fastapi import APIRouter
from fastapi import Depends
from fastapi import File
from fastapi import Form
from fastapi import HTTPException
from fastapi import Query
from fastapi import UploadFile
from pydantic import BaseModel
from sqlalchemy.orm import Session
from onyx.auth.users import current_user
from onyx.background.celery.versioned_apps.client import app as celery_app
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.connector_credential_pair import update_connector_credential_pair
from onyx.db.document import upsert_document_by_connector_credential_pair
from onyx.db.document import upsert_documents
from onyx.db.engine.sql_engine import get_session
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.models import User
from onyx.document_index.interfaces import DocumentMetadata
from onyx.server.features.build.configs import USER_LIBRARY_MAX_FILE_SIZE_BYTES
from onyx.server.features.build.configs import USER_LIBRARY_MAX_FILES_PER_UPLOAD
from onyx.server.features.build.configs import USER_LIBRARY_MAX_TOTAL_SIZE_BYTES
from onyx.server.features.build.indexing.persistent_document_writer import (
get_persistent_document_writer,
)
from onyx.server.features.build.indexing.persistent_document_writer import (
S3PersistentDocumentWriter,
)
from onyx.server.features.build.utils import sanitize_filename as api_sanitize_filename
from onyx.utils.logger import setup_logger
from shared_configs.contextvars import get_current_tenant_id
logger = setup_logger()
router = APIRouter(prefix="/user-library")
# =============================================================================
# Pydantic Models
# =============================================================================
class LibraryEntryResponse(BaseModel):
    """Response for a single library entry (file or directory)."""

    id: str  # document_id of the backing document row
    name: str  # display name (last path segment)
    path: str  # full library path, e.g. "/reports/q1.xlsx"
    is_directory: bool  # True for virtual directories (no S3 object exists)
    file_size: int | None  # size in bytes; None for directories
    mime_type: str | None  # content type; None for directories
    sync_enabled: bool  # False when sync_disabled is set in doc_metadata
    created_at: datetime
    children: list["LibraryEntryResponse"] | None = None  # optional nested entries
class CreateDirectoryRequest(BaseModel):
    """Request to create a virtual directory."""

    name: str  # directory name; sanitized server-side before use
    parent_path: str = "/"  # parent library path; defaults to the root
class UploadResponse(BaseModel):
    """Response after successful file upload."""

    entries: list[LibraryEntryResponse]  # one entry per stored file
    total_uploaded: int  # number of files persisted in this request
    total_size_bytes: int  # cumulative bytes across this upload batch
# =============================================================================
# Helper Functions
# =============================================================================
def _sanitize_path(path: str) -> str:
"""Sanitize a file path, removing traversal attempts and normalizing.
Removes '..' and '.' segments and ensures the path starts with '/'.
Only allows alphanumeric characters, hyphens, underscores, dots, and
forward slashes in the final path segments.
"""
parts = path.split("/")
sanitized_parts = [p for p in parts if p and p != ".." and p != "."]
result = "/" + "/".join(sanitized_parts)
return result
def _build_document_id(user_id: str, path: str) -> str:
"""Build a document ID for a craft file.
Deterministic: re-uploading the same file to the same path will produce the
same document ID, allowing upsert to overwrite the previous record.
"""
sanitized_path = path.replace("/", "_").strip("_")
return f"CRAFT_FILE__{user_id}__{sanitized_path}"
def _trigger_sandbox_sync(
    user_id: str, tenant_id: str, source: str | None = None
) -> None:
    """Enqueue the sandbox file-sync Celery task for a user.

    Args:
        user_id: Owner of the sandbox to sync.
        tenant_id: Tenant used for S3 path construction.
        source: Optional source type (e.g., "user_library"). If specified,
            only syncs that source's directory with --delete flag.
    """
    task_kwargs = {
        "user_id": user_id,
        "tenant_id": tenant_id,
        "source": source,
    }
    celery_app.send_task(
        OnyxCeleryTask.SANDBOX_FILE_SYNC,
        kwargs=task_kwargs,
        queue=OnyxCeleryQueues.SANDBOX,
    )
def _get_user_storage_bytes(db_session: Session, user_id: UUID) -> int:
    """Sum the stored byte size of a user's library files.

    Reads file_size out of doc_metadata for every CRAFT_FILE document
    created by this user; directory entries contribute nothing.
    """
    from onyx.db.document import get_documents_by_source

    documents = get_documents_by_source(
        db_session=db_session,
        source=DocumentSource.CRAFT_FILE,
        creator_id=user_id,
    )
    return sum(
        (doc.doc_metadata or {}).get("file_size", 0)
        for doc in documents
        if not (doc.doc_metadata or {}).get("is_directory")
    )
def _get_or_create_craft_connector(db_session: Session, user: User) -> tuple[int, int]:
    """Get or create the CRAFT_FILE connector for a user.

    Returns:
        Tuple of (connector_id, credential_id)

    Note: We need to create a credential even though CRAFT_FILE doesn't require
    authentication. This is because Onyx's connector-credential pair system
    requires a credential for all connectors. The credential is empty ({}).

    This function handles recovery from partial creation failures by detecting
    orphaned connectors (connectors without cc_pairs) and completing their setup.
    """
    # Function-local imports -- presumably to avoid circular imports at
    # module load time; TODO confirm.
    from onyx.connectors.models import InputType
    from onyx.db.connector import create_connector
    from onyx.db.connector import fetch_connectors
    from onyx.db.connector_credential_pair import add_credential_to_connector
    from onyx.db.connector_credential_pair import (
        get_connector_credential_pairs_for_user,
    )
    from onyx.db.credentials import create_credential
    from onyx.db.credentials import fetch_credentials_for_user
    from onyx.db.enums import AccessType
    from onyx.db.enums import ProcessingMode
    from onyx.server.documents.models import ConnectorBase
    from onyx.server.documents.models import CredentialBase

    # Fast path: the user already has a complete CRAFT_FILE cc_pair.
    cc_pairs = get_connector_credential_pairs_for_user(
        db_session=db_session,
        user=user,
        get_editable=False,
        eager_load_connector=True,
        eager_load_credential=True,
        processing_mode=ProcessingMode.RAW_BINARY,
    )
    for cc_pair in cc_pairs:
        if cc_pair.connector.source == DocumentSource.CRAFT_FILE:
            return cc_pair.connector.id, cc_pair.credential.id

    # Check for orphaned connector (created but cc_pair creation failed previously)
    # NOTE(review): fetch_connectors is not filtered by creator here; if
    # "User Library" connector names are not unique per user this could
    # adopt another user's orphaned connector -- verify fetch_connectors
    # scoping before relying on this recovery path.
    existing_connectors = fetch_connectors(
        db_session, sources=[DocumentSource.CRAFT_FILE]
    )
    orphaned_connector = None
    for conn in existing_connectors:
        if conn.name == "User Library":
            orphaned_connector = conn
            break

    if orphaned_connector:
        connector_id = orphaned_connector.id
        logger.info(
            f"Found orphaned User Library connector {connector_id}, completing setup"
        )
    else:
        # Create new connector
        connector_data = ConnectorBase(
            name="User Library",
            source=DocumentSource.CRAFT_FILE,
            input_type=InputType.LOAD_STATE,
            connector_specific_config={"disabled_paths": []},
            refresh_freq=None,  # never refreshed by the scheduler
            prune_freq=None,  # never pruned
        )
        connector_response = create_connector(
            db_session=db_session,
            connector_data=connector_data,
        )
        connector_id = connector_response.id

    # Try to reuse an existing User Library credential for this user
    existing_credentials = fetch_credentials_for_user(
        db_session=db_session,
        user=user,
    )
    credential = None
    for cred in existing_credentials:
        if (
            cred.source == DocumentSource.CRAFT_FILE
            and cred.name == "User Library Credential"
        ):
            credential = cred
            break
    if credential is None:
        # Create credential (empty - no auth needed, but required by the system)
        credential_data = CredentialBase(
            credential_json={},
            admin_public=False,
            source=DocumentSource.CRAFT_FILE,
            name="User Library Credential",
        )
        credential = create_credential(
            credential_data=credential_data,
            user=user,
            db_session=db_session,
        )

    # Link them with RAW_BINARY processing mode
    add_credential_to_connector(
        db_session=db_session,
        connector_id=connector_id,
        credential_id=credential.id,
        user=user,
        cc_pair_name="User Library",
        access_type=AccessType.PRIVATE,
        groups=None,
        processing_mode=ProcessingMode.RAW_BINARY,
    )
    db_session.commit()
    return connector_id, credential.id
# =============================================================================
# API Endpoints
# =============================================================================
@router.get("/tree")
def get_library_tree(
    user: User = Depends(current_user),
    db_session: Session = Depends(get_session),
) -> list[LibraryEntryResponse]:
    """Return the user's library entries.

    Despite the "/tree" route name, this returns a FLAT list of all
    CRAFT_FILE documents for the user: the `children` field of each entry
    is never populated here, so any hierarchy assembly presumably happens
    on the client -- TODO confirm.
    """
    from onyx.db.document import get_documents_by_source

    # Get CRAFT_FILE documents for this user (filtered at SQL level)
    user_docs = get_documents_by_source(
        db_session=db_session,
        source=DocumentSource.CRAFT_FILE,
        creator_id=user.id,
    )

    # Map each document row to a response entry (flat, not nested).
    entries: list[LibraryEntryResponse] = []
    now = datetime.now(timezone.utc)
    for doc in user_docs:
        doc_metadata = doc.doc_metadata or {}
        entries.append(
            LibraryEntryResponse(
                id=doc.id,
                # Last path segment serves as the display name.
                name=doc.semantic_id.split("/")[-1] if doc.semantic_id else "unknown",
                path=doc.semantic_id or "",
                is_directory=doc_metadata.get("is_directory", False),
                file_size=doc_metadata.get("file_size"),
                mime_type=doc_metadata.get("mime_type"),
                # sync_disabled lives in metadata; absence means enabled.
                sync_enabled=not doc_metadata.get("sync_disabled", False),
                # Fall back to "now" when last_modified is unset.
                created_at=doc.last_modified or now,
            )
        )
    return entries
@router.post("/upload")
async def upload_files(
    files: list[UploadFile] = File(...),
    path: str = Form("/"),
    user: User = Depends(current_user),
    db_session: Session = Depends(get_session),
) -> UploadResponse:
    """Upload files directly to S3 and track in PostgreSQL.

    Files are stored as raw binary (no text extraction) for access by
    the sandbox agent using Python libraries like openpyxl, python-pptx, etc.

    Raises:
        HTTPException 400: too many files, a file over the per-file limit,
            or the batch would push the user past the total storage quota.
        HTTPException 500: tenant could not be resolved.

    NOTE(review): if a later file in the batch fails validation, files
    already written earlier in the loop remain in S3 and the document
    table (partial upload) -- confirm this is intended.
    """
    tenant_id = get_current_tenant_id()
    if tenant_id is None:
        raise HTTPException(status_code=500, detail="Tenant ID not found")

    # Validate file count
    if len(files) > USER_LIBRARY_MAX_FILES_PER_UPLOAD:
        raise HTTPException(
            status_code=400,
            detail=f"Too many files. Maximum is {USER_LIBRARY_MAX_FILES_PER_UPLOAD} per upload.",
        )

    # Check cumulative storage usage
    existing_usage = _get_user_storage_bytes(db_session, user.id)

    # Get or create connector
    connector_id, credential_id = _get_or_create_craft_connector(db_session, user)

    # Get the persistent document writer
    writer = get_persistent_document_writer(
        user_id=str(user.id),
        tenant_id=tenant_id,
    )

    uploaded_entries: list[LibraryEntryResponse] = []
    total_size = 0
    now = datetime.now(timezone.utc)

    # Sanitize the base path
    base_path = _sanitize_path(path)

    for file in files:
        # Read content (the whole file is buffered in memory)
        content = await file.read()
        file_size = len(content)

        # Validate individual file size
        if file_size > USER_LIBRARY_MAX_FILE_SIZE_BYTES:
            raise HTTPException(
                status_code=400,
                detail=f"File '{file.filename}' exceeds maximum size of {USER_LIBRARY_MAX_FILE_SIZE_BYTES // (1024*1024)}MB",
            )

        # Validate cumulative storage (existing + this upload batch)
        total_size += file_size
        if existing_usage + total_size > USER_LIBRARY_MAX_TOTAL_SIZE_BYTES:
            raise HTTPException(
                status_code=400,
                detail=f"Total storage would exceed maximum of {USER_LIBRARY_MAX_TOTAL_SIZE_BYTES // (1024*1024*1024)}GB",
            )

        # Sanitize filename
        safe_filename = api_sanitize_filename(file.filename or "unnamed")
        file_path = f"{base_path}/{safe_filename}".replace("//", "/")

        # Write raw binary to storage
        storage_key = writer.write_raw_file(
            path=file_path,
            content=content,
            content_type=file.content_type,
        )

        # Track in document table; the deterministic doc_id means a
        # re-upload to the same path overwrites the prior record.
        doc_id = _build_document_id(str(user.id), file_path)
        doc_metadata = DocumentMetadata(
            connector_id=connector_id,
            credential_id=credential_id,
            document_id=doc_id,
            semantic_identifier=f"user_library{file_path}",
            first_link=storage_key,
            doc_metadata={
                "storage_key": storage_key,
                "file_path": file_path,
                "file_size": file_size,
                "mime_type": file.content_type,
                "is_directory": False,
            },
        )
        upsert_documents(db_session, [doc_metadata])
        upsert_document_by_connector_credential_pair(
            db_session, connector_id, credential_id, [doc_id]
        )

        uploaded_entries.append(
            LibraryEntryResponse(
                id=doc_id,
                name=safe_filename,
                path=file_path,
                is_directory=False,
                file_size=file_size,
                mime_type=file.content_type,
                sync_enabled=True,
                created_at=now,
            )
        )

    # Mark connector as having succeeded (sets last_successful_index_time)
    # This allows the demo data toggle to be disabled
    # NOTE(review): no explicit db_session.commit() in this endpoint --
    # presumably update_connector_credential_pair or session teardown
    # commits; verify.
    update_connector_credential_pair(
        db_session=db_session,
        connector_id=connector_id,
        credential_id=credential_id,
        status=ConnectorCredentialPairStatus.ACTIVE,
        net_docs=len(uploaded_entries),
        run_dt=now,
    )

    # Trigger sandbox sync for user_library source only
    _trigger_sandbox_sync(str(user.id), tenant_id, source="user_library")

    logger.info(
        f"Uploaded {len(uploaded_entries)} files ({total_size} bytes) for user {user.id}"
    )

    return UploadResponse(
        entries=uploaded_entries,
        total_uploaded=len(uploaded_entries),
        total_size_bytes=total_size,
    )
@router.post("/upload-zip")
async def upload_zip(
    file: UploadFile = File(...),
    path: str = Form("/"),
    user: User = Depends(current_user),
    db_session: Session = Depends(get_session),
) -> UploadResponse:
    """Upload and extract a zip file, storing each extracted file to S3.

    Preserves the directory structure from the zip file. Unlike /upload,
    members over the per-file size limit are SKIPPED with a warning
    rather than failing the whole request.

    Raises:
        HTTPException 400: invalid zip, too many member files, archive
            larger than the total-storage limit, or quota exceeded
            mid-extraction.
        HTTPException 500: tenant could not be resolved.

    NOTE(review): the archive and each member are fully decompressed in
    memory, and member sizes are only checked after extraction -- a
    crafted zip (zip bomb) could consume significant memory; confirm any
    upstream request-size limits.
    """
    tenant_id = get_current_tenant_id()
    if tenant_id is None:
        raise HTTPException(status_code=500, detail="Tenant ID not found")

    # Read zip content (the entire archive is buffered in memory)
    content = await file.read()
    if len(content) > USER_LIBRARY_MAX_TOTAL_SIZE_BYTES:
        raise HTTPException(
            status_code=400,
            detail=f"Zip file exceeds maximum size of {USER_LIBRARY_MAX_TOTAL_SIZE_BYTES // (1024*1024*1024)}GB",
        )

    # Check cumulative storage usage
    existing_usage = _get_user_storage_bytes(db_session, user.id)

    # Get or create connector
    connector_id, credential_id = _get_or_create_craft_connector(db_session, user)

    # Get the persistent document writer
    writer = get_persistent_document_writer(
        user_id=str(user.id),
        tenant_id=tenant_id,
    )

    uploaded_entries: list[LibraryEntryResponse] = []
    total_size = 0
    base_path = _sanitize_path(path)
    now = datetime.now(timezone.utc)

    try:
        with zipfile.ZipFile(BytesIO(content), "r") as zip_file:
            # Check file count
            if len(zip_file.namelist()) > USER_LIBRARY_MAX_FILES_PER_UPLOAD:
                raise HTTPException(
                    status_code=400,
                    detail=f"Zip contains too many files. Maximum is {USER_LIBRARY_MAX_FILES_PER_UPLOAD}.",
                )

            for zip_info in zip_file.infolist():
                # Skip directories
                if zip_info.is_dir():
                    continue

                # Skip hidden files and __MACOSX
                if (
                    zip_info.filename.startswith("__MACOSX")
                    or "/." in zip_info.filename
                ):
                    continue

                # Read file content (decompressed fully into memory)
                file_content = zip_file.read(zip_info.filename)
                file_size = len(file_content)

                # Validate individual file size; oversized members are
                # skipped, not fatal (unlike the /upload endpoint).
                if file_size > USER_LIBRARY_MAX_FILE_SIZE_BYTES:
                    logger.warning(f"Skipping '{zip_info.filename}' - exceeds max size")
                    continue

                total_size += file_size
                # Validate cumulative storage
                if existing_usage + total_size > USER_LIBRARY_MAX_TOTAL_SIZE_BYTES:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Total storage would exceed maximum of {USER_LIBRARY_MAX_TOTAL_SIZE_BYTES // (1024*1024*1024)}GB",
                    )

                # Build path preserving zip structure
                sanitized_zip_path = _sanitize_path(zip_info.filename)
                file_path = f"{base_path}{sanitized_zip_path}".replace("//", "/")
                file_name = file_path.split("/")[-1]

                # Guess content type
                content_type, _ = mimetypes.guess_type(file_name)

                # Write raw binary to storage
                storage_key = writer.write_raw_file(
                    path=file_path,
                    content=file_content,
                    content_type=content_type,
                )

                # Track in document table (deterministic id => upsert
                # overwrites any prior record at the same path).
                doc_id = _build_document_id(str(user.id), file_path)
                doc_metadata = DocumentMetadata(
                    connector_id=connector_id,
                    credential_id=credential_id,
                    document_id=doc_id,
                    semantic_identifier=f"user_library{file_path}",
                    first_link=storage_key,
                    doc_metadata={
                        "storage_key": storage_key,
                        "file_path": file_path,
                        "file_size": file_size,
                        "mime_type": content_type,
                        "is_directory": False,
                    },
                )
                upsert_documents(db_session, [doc_metadata])
                upsert_document_by_connector_credential_pair(
                    db_session, connector_id, credential_id, [doc_id]
                )

                uploaded_entries.append(
                    LibraryEntryResponse(
                        id=doc_id,
                        name=file_name,
                        path=file_path,
                        is_directory=False,
                        file_size=file_size,
                        mime_type=content_type,
                        sync_enabled=True,
                        created_at=now,
                    )
                )
    except zipfile.BadZipFile:
        raise HTTPException(status_code=400, detail="Invalid zip file")

    # Mark connector as having succeeded (sets last_successful_index_time)
    # This allows the demo data toggle to be disabled
    update_connector_credential_pair(
        db_session=db_session,
        connector_id=connector_id,
        credential_id=credential_id,
        status=ConnectorCredentialPairStatus.ACTIVE,
        net_docs=len(uploaded_entries),
        run_dt=now,
    )

    # Trigger sandbox sync for user_library source only
    _trigger_sandbox_sync(str(user.id), tenant_id, source="user_library")

    logger.info(
        f"Extracted {len(uploaded_entries)} files ({total_size} bytes) from zip for user {user.id}"
    )

    return UploadResponse(
        entries=uploaded_entries,
        total_uploaded=len(uploaded_entries),
        total_size_bytes=total_size,
    )
@router.post("/directories")
def create_directory(
    request: CreateDirectoryRequest,
    user: User = Depends(current_user),
    db_session: Session = Depends(get_session),
) -> LibraryEntryResponse:
    """Create a virtual directory entry in the user's library.

    Directories exist only as rows in the document table (is_directory=True);
    no storage object is written, since S3 has no real directory concept.
    """
    # Ensure the user's craft connector/credential pair exists.
    conn_id, cred_id = _get_or_create_craft_connector(db_session, user)

    # Compose the directory path from the sanitized parent and name.
    clean_parent = _sanitize_path(request.parent_path)
    clean_name = api_sanitize_filename(request.name)
    directory_path = f"{clean_parent}/{clean_name}".replace("//", "/")

    # Record the directory in the document table so listings can find it.
    doc_id = _build_document_id(str(user.id), directory_path)
    directory_doc = DocumentMetadata(
        connector_id=conn_id,
        credential_id=cred_id,
        document_id=doc_id,
        semantic_identifier=f"user_library{directory_path}",
        first_link="",
        doc_metadata={
            "is_directory": True,
        },
    )
    upsert_documents(db_session, [directory_doc])
    upsert_document_by_connector_credential_pair(
        db_session, conn_id, cred_id, [doc_id]
    )
    db_session.commit()

    return LibraryEntryResponse(
        id=doc_id,
        name=clean_name,
        path=directory_path,
        is_directory=True,
        file_size=None,
        mime_type=None,
        sync_enabled=True,
        created_at=datetime.now(timezone.utc),
    )
@router.patch("/files/{document_id}/toggle")
def toggle_file_sync(
    document_id: str,
    enabled: bool = Query(...),
    user: User = Depends(current_user),
    db_session: Session = Depends(get_session),
) -> dict[str, Any]:
    """Enable/disable syncing a file to sandboxes.

    When sync is disabled, the file's metadata is updated with sync_disabled=True.
    The sandbox sync task will exclude these files when syncing to the sandbox.
    If the item is a directory, all children are also toggled.

    Args:
        document_id: Document id; must carry this user's CRAFT_FILE prefix.
        enabled: True to enable syncing, False to disable.
        user: Authenticated caller (ownership is checked against the id prefix).
        db_session: Database session; all metadata updates share one commit.

    Returns:
        {"success": True, "sync_enabled": <enabled>} on success.

    Raises:
        HTTPException: 500 if tenant is missing, 403 if the document does not
            belong to the caller, 404 if the document does not exist.
    """
    # Local imports keep these document helpers off the module import path.
    from onyx.db.document import get_document
    from onyx.db.document import get_documents_by_source
    from onyx.db.document import update_document_metadata__no_commit

    tenant_id = get_current_tenant_id()
    if tenant_id is None:
        raise HTTPException(status_code=500, detail="Tenant ID not found")

    # Verify ownership: document ids embed the owning user's id, so a simple
    # prefix check prevents cross-user modification.
    user_prefix = f"CRAFT_FILE__{user.id}__"
    if not document_id.startswith(user_prefix):
        raise HTTPException(
            status_code=403, detail="Not authorized to modify this file"
        )

    # Get document
    doc = get_document(document_id, db_session)
    if doc is None:
        raise HTTPException(status_code=404, detail="File not found")

    # Update metadata for this document. The flag is stored inverted
    # (sync_disabled) so absence of the key means "sync enabled".
    new_metadata = dict(doc.doc_metadata or {})
    new_metadata["sync_disabled"] = not enabled
    update_document_metadata__no_commit(db_session, document_id, new_metadata)

    # If this is a directory, also toggle all children
    doc_metadata = doc.doc_metadata or {}
    if doc_metadata.get("is_directory"):
        folder_path = doc.semantic_id
        if folder_path:
            # Get CRAFT_FILE documents for this user (filtered at SQL level)
            all_docs = get_documents_by_source(
                db_session=db_session,
                source=DocumentSource.CRAFT_FILE,
                creator_id=user.id,
            )
            # Find children of this folder: any document whose semantic_id
            # lives under "<folder_path>/". The trailing slash avoids matching
            # sibling folders that merely share the prefix.
            for child_doc in all_docs:
                if child_doc.semantic_id and child_doc.semantic_id.startswith(
                    folder_path + "/"
                ):
                    # Update metadata (no commit yet — batched below)
                    child_metadata = dict(child_doc.doc_metadata or {})
                    child_metadata["sync_disabled"] = not enabled
                    update_document_metadata__no_commit(
                        db_session, child_doc.id, child_metadata
                    )

    # Single commit covers the parent and all child updates atomically.
    db_session.commit()

    return {"success": True, "sync_enabled": enabled}
@router.delete("/files/{document_id}")
def delete_file(
    document_id: str,
    user: User = Depends(current_user),
    db_session: Session = Depends(get_session),
) -> dict[str, Any]:
    """Delete a file from both S3 and the document table.

    Storage deletion is best-effort (failures are logged, not raised) so a
    missing or already-deleted object never blocks removal of the document
    row. A sandbox sync is triggered afterwards so the file disappears from
    any running sandbox.

    Raises:
        HTTPException: 500 if tenant is missing, 403 if the document does not
            belong to the caller, 404 if the document does not exist.
    """
    from onyx.db.document import delete_document_by_id__no_commit
    from onyx.db.document import get_document

    tenant_id = get_current_tenant_id()
    if tenant_id is None:
        raise HTTPException(status_code=500, detail="Tenant ID not found")

    # Verify ownership via the user-id prefix embedded in the document id.
    user_prefix = f"CRAFT_FILE__{user.id}__"
    if not document_id.startswith(user_prefix):
        raise HTTPException(
            status_code=403, detail="Not authorized to delete this file"
        )

    # Get document
    doc = get_document(document_id, db_session)
    if doc is None:
        raise HTTPException(status_code=404, detail="File not found")

    # Delete from storage if it's a file (not directory). Directories have no
    # backing storage object, so only the document row is removed for them.
    doc_metadata = doc.doc_metadata or {}
    if not doc_metadata.get("is_directory"):
        file_path = doc_metadata.get("file_path")
        if file_path:
            writer = get_persistent_document_writer(
                user_id=str(user.id),
                tenant_id=tenant_id,
            )
            try:
                # S3 writers translate the relative path to a full key;
                # the local writer takes the relative path directly.
                if isinstance(writer, S3PersistentDocumentWriter):
                    writer.delete_raw_file_by_path(file_path)
                else:
                    writer.delete_raw_file(file_path)
            except Exception as e:
                logger.warning(f"Failed to delete file at path {file_path}: {e}")
        else:
            # Fallback for documents created before file_path was stored:
            # older rows only recorded the raw storage key.
            storage_key = doc_metadata.get("storage_key") or doc_metadata.get("s3_key")
            if storage_key:
                writer = get_persistent_document_writer(
                    user_id=str(user.id),
                    tenant_id=tenant_id,
                )
                try:
                    if isinstance(writer, S3PersistentDocumentWriter):
                        writer.delete_raw_file(storage_key)
                    else:
                        # Local writer needs a relative path, not a key —
                        # nothing safe to do here, so just log it.
                        logger.warning(
                            f"Cannot delete file in local mode without file_path: {document_id}"
                        )
                except Exception as e:
                    logger.warning(
                        f"Failed to delete storage object {storage_key}: {e}"
                    )

    # Delete from document table
    delete_document_by_id__no_commit(db_session, document_id)
    db_session.commit()

    # Trigger sync to apply changes in any running sandbox
    _trigger_sandbox_sync(str(user.id), tenant_id, source="user_library")

    return {"success": True, "deleted": document_id}

View File

@@ -132,3 +132,25 @@ ACP_MESSAGE_TIMEOUT = float(os.environ.get("ACP_MESSAGE_TIMEOUT", "900.0"))
# Free users always get 5 messages total (not configurable)
# Per-user overrides are managed via PostHog feature flag "craft-has-usage-limits"
CRAFT_PAID_USER_RATE_LIMIT = int(os.environ.get("CRAFT_PAID_USER_RATE_LIMIT", "25"))
# ============================================================================
# User Library Configuration
# For user-uploaded raw files (xlsx, pptx, docx, etc.) in Craft
# ============================================================================
# Maximum size per file in MB (default 500MB)
USER_LIBRARY_MAX_FILE_SIZE_MB = int(
    os.environ.get("USER_LIBRARY_MAX_FILE_SIZE_MB", "500")
)
# Derived byte value used for upload-size checks.
USER_LIBRARY_MAX_FILE_SIZE_BYTES = USER_LIBRARY_MAX_FILE_SIZE_MB * 1024 * 1024
# Maximum total storage per user in GB (default 10GB)
USER_LIBRARY_MAX_TOTAL_SIZE_GB = int(
    os.environ.get("USER_LIBRARY_MAX_TOTAL_SIZE_GB", "10")
)
# Derived byte value used for per-user quota checks.
USER_LIBRARY_MAX_TOTAL_SIZE_BYTES = USER_LIBRARY_MAX_TOTAL_SIZE_GB * 1024 * 1024 * 1024
# Maximum files per single upload request (default 100)
USER_LIBRARY_MAX_FILES_PER_UPLOAD = int(
    os.environ.get("USER_LIBRARY_MAX_FILES_PER_UPLOAD", "100")
)

View File

@@ -272,6 +272,70 @@ class PersistentDocumentWriter:
logger.debug(f"Wrote document to {path}")
def write_raw_file(
    self,
    path: str,
    content: bytes,
    content_type: str | None = None,  # noqa: ARG002
) -> str:
    """Persist raw binary content under the user's local library directory.

    In contrast to write_documents (which JSON-serializes Document objects),
    the bytes are written verbatim — used for user uploads such as xlsx/pptx.

    Args:
        path: Path relative to the user's library root
            (e.g., "/project-data/financials.xlsx")
        content: Raw bytes to persist
        content_type: MIME type; accepted for interface parity but unused
            by the local backend

    Returns:
        Absolute filesystem path of the written file as a string
    """
    # Layout: {base_path}/{tenant}/knowledge/{user}/user_library/{path}
    relative = path.lstrip("/")
    destination = (
        self.base_path
        / self.tenant_id
        / "knowledge"
        / self.user_id
        / "user_library"
        / relative
    )

    # Make sure the containing directory tree exists before writing.
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(content)

    logger.debug(f"Wrote raw file to {destination}")
    return str(destination)
def delete_raw_file(self, path: str) -> None:
    """Remove a raw file from the local library, if it exists.

    Args:
        path: Path relative to the user's library root
            (e.g., "/project-data/financials.xlsx")
    """
    # Mirror write_raw_file's path layout.
    target = (
        self.base_path
        / self.tenant_id
        / "knowledge"
        / self.user_id
        / "user_library"
        / path.lstrip("/")
    )

    # Missing files are logged rather than raised — deletion is best-effort.
    if not target.exists():
        logger.warning(f"File not found for deletion: {target}")
        return

    target.unlink()
    logger.debug(f"Deleted raw file at {target}")
class S3PersistentDocumentWriter:
"""Writes indexed documents to S3 with hierarchical structure.
@@ -375,6 +439,73 @@ class S3PersistentDocumentWriter:
logger.error(f"Failed to write to S3: {e}")
raise
def write_raw_file(
    self,
    path: str,
    content: bytes,
    content_type: str | None = None,
) -> str:
    """Upload raw binary content to S3 under the user's library prefix.

    In contrast to write_documents (which JSON-serializes Document objects),
    the bytes are uploaded verbatim — used for user uploads such as xlsx/pptx.

    Args:
        path: Path relative to the user's library root
            (e.g., "/project-data/financials.xlsx")
        content: Raw bytes to upload
        content_type: MIME type stored on the object; defaults to
            application/octet-stream when not provided

    Returns:
        The S3 key the object was written to

    Raises:
        ClientError: Propagated when the S3 upload fails (after logging).
    """
    # Key layout: {tenant}/knowledge/{user}/user_library/{path}
    relative = path.lstrip("/")
    s3_key = f"{self.tenant_id}/knowledge/{self.user_id}/user_library/{relative}"

    client = self._get_s3_client()
    try:
        client.put_object(
            Bucket=self.bucket,
            Key=s3_key,
            Body=content,
            ContentType=content_type or "application/octet-stream",
        )
    except ClientError as e:
        logger.error(f"Failed to write raw file to S3: {e}")
        raise

    logger.debug(f"Wrote raw file to s3://{self.bucket}/{s3_key}")
    return s3_key
def delete_raw_file(self, s3_key: str) -> None:
    """Delete an object from S3 by its full key.

    Args:
        s3_key: Complete S3 key of the object to remove

    Raises:
        ClientError: Propagated when the S3 delete call fails (after logging).
    """
    client = self._get_s3_client()
    try:
        client.delete_object(Bucket=self.bucket, Key=s3_key)
    except ClientError as e:
        logger.error(f"Failed to delete raw file from S3: {e}")
        raise
    logger.debug(f"Deleted raw file at s3://{self.bucket}/{s3_key}")
def delete_raw_file_by_path(self, path: str) -> None:
    """Delete a library object addressed by its relative path.

    Translates the user-facing relative path into the full S3 key using the
    same layout as write_raw_file, then delegates to delete_raw_file.

    Args:
        path: Path relative to the user's library root
            (e.g., "/project-data/financials.xlsx")
    """
    key = (
        f"{self.tenant_id}/knowledge/{self.user_id}"
        f"/user_library/{path.lstrip('/')}"
    )
    self.delete_raw_file(key)
def get_persistent_document_writer(
user_id: str,

View File

@@ -125,6 +125,7 @@ class SandboxManager(ABC):
user_work_area: str | None = None,
user_level: str | None = None,
use_demo_data: bool = False,
excluded_user_library_paths: list[str] | None = None,
) -> None:
"""Set up a session workspace within an existing sandbox.
@@ -149,6 +150,9 @@ class SandboxManager(ABC):
user_work_area: User's work area for demo persona (e.g., "engineering")
user_level: User's level for demo persona (e.g., "ic", "manager")
use_demo_data: If True, symlink files/ to demo data; else to user files
excluded_user_library_paths: List of paths within user_library to exclude
from the sandbox (e.g., ["/data/file.xlsx"]). Only applies when
use_demo_data=False. Files at these paths won't be accessible.
Raises:
RuntimeError: If workspace setup fails
@@ -416,6 +420,7 @@ class SandboxManager(ABC):
sandbox_id: UUID,
user_id: UUID,
tenant_id: str,
source: str | None = None,
) -> bool:
"""Sync files from S3 to the sandbox's /workspace/files directory.
@@ -428,6 +433,12 @@ class SandboxManager(ABC):
sandbox_id: The sandbox UUID
user_id: The user ID (for S3 path construction)
tenant_id: The tenant ID (for S3 path construction)
source: Optional source type (e.g., "gmail", "google_drive").
If None, syncs all sources. If specified, only syncs
that source's directory.
exclude_paths: Optional list of relative paths to exclude from sync
(e.g., ["/data/file.xlsx"]). Files matching these paths
will be skipped during sync and deleted from sandbox if present.
Returns:
True if sync was successful, False otherwise.

View File

@@ -361,45 +361,33 @@ class KubernetesSandboxManager(SandboxManager):
command=["/bin/sh", "-c"],
args=[
f"""
# Handle SIGTERM for fast container termination
trap 'echo "Received SIGTERM, exiting"; exit 0' TERM
# Handle signals for graceful container termination
trap 'echo "Shutting down"; exit 0' TERM INT
# Initial sync on startup - sync knowledge files for this user/tenant
echo "Starting initial file sync for tenant: {tenant_id} / user: {user_id}"
echo "S3 source: s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/"
echo "Starting initial file sync"
echo "S3: s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/*"
echo "Local: /workspace/files/"
# s5cmd sync: high-performance parallel S3 sync (default 256 workers)
# Capture both stdout and stderr to see all messages including errors
# s5cmd sync (default 256 workers)
# Exit codes: 0=success, 1=success with warnings
sync_exit_code=0
sync_output=$(mktemp)
/s5cmd --log debug --stat sync \
/s5cmd --stat sync \
"s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/*" \
/workspace/files/ 2>&1 | tee "$sync_output" || sync_exit_code=$?
/workspace/files/ 2>&1 || sync_exit_code=$?
echo "=== S3 sync finished with exit code: $sync_exit_code ==="
echo "=== Initial sync finished (exit code: $sync_exit_code) ==="
# Count files synced
file_count=$(find /workspace/files -type f | wc -l)
echo "Total files in /workspace/files: $file_count"
# Show summary of any errors from the output
if [ $sync_exit_code -ne 0 ]; then
echo "=== Errors/warnings from sync ==="
grep -iE "error|warn|fail" "$sync_output" || echo "No errors found"
echo "=========================="
fi
rm -f "$sync_output"
# Exit codes 0 and 1 are considered success (1 = success with warnings)
# Handle result
if [ $sync_exit_code -eq 0 ] || [ $sync_exit_code -eq 1 ]; then
echo "Sync complete (exit $sync_exit_code), staying alive for incremental syncs"
file_count=$(find /workspace/files -type f 2>/dev/null | wc -l)
echo "Files synced: $file_count"
echo "Sidecar ready for incremental syncs"
else
echo "ERROR: Sync failed with exit code: $sync_exit_code"
echo "ERROR: Initial sync failed (exit code: $sync_exit_code)"
exit $sync_exit_code
fi
# Stay alive - incremental sync commands will be executed via kubectl exec
# Use 'wait' so shell can respond to signals while sleeping
# Stay alive for incremental syncs via kubectl exec
while true; do
sleep 30 &
wait $!
@@ -1099,6 +1087,7 @@ done
user_work_area: str | None = None,
user_level: str | None = None,
use_demo_data: bool = False,
excluded_user_library_paths: list[str] | None = None,
) -> None:
"""Set up a session workspace within an existing sandbox pod.
@@ -1127,6 +1116,8 @@ done
user_level: User's level for demo persona (e.g., "ic", "manager")
use_demo_data: If True, symlink files/ to /workspace/demo_data;
else to /workspace/files (S3-synced user files)
excluded_user_library_paths: List of paths within user_library/ to exclude
(e.g., ["/data/file.xlsx"]). These files won't be accessible in the session.
Raises:
RuntimeError: If workspace setup fails
@@ -1206,6 +1197,82 @@ printf '%s' '{org_structure_escaped}' > {session_path}/org_info/organization_str
# Create files symlink to demo data (baked into image)
echo "Creating files symlink to demo data: {symlink_target}"
ln -sf {symlink_target} {session_path}/files
"""
elif excluded_user_library_paths:
# User files with exclusions: create filtered symlink structure
# Instead of symlinking to /workspace/files directly, create a directory
# with symlinks to each top-level item, then filter user_library contents
excluded_paths_str = " ".join(
p.lstrip("/") for p in excluded_user_library_paths
)
files_symlink_setup = f"""
# Create filtered files directory with exclusions
echo "Creating filtered files structure with exclusions"
mkdir -p {session_path}/files
# Symlink all top-level directories except user_library
for item in /workspace/files/*; do
[ -e "$item" ] || continue
name=$(basename "$item")
if [ "$name" != "user_library" ]; then
ln -sf "$item" {session_path}/files/"$name"
fi
done
# Excluded paths
EXCLUDED_PATHS="{excluded_paths_str}"
# Recursively create symlinks for non-excluded files
# Using a simple approach: iterate and check each path against exclusions
create_filtered_symlinks() {{
src_dir="$1"
dst_dir="$2"
rel_base="$3"
for item in "$src_dir"/*; do
[ -e "$item" ] || continue
name=$(basename "$item")
if [ -n "$rel_base" ]; then
rel_path="$rel_base/$name"
else
rel_path="$name"
fi
# Check if this path is excluded
excluded=0
for excl in $EXCLUDED_PATHS; do
if [ "$rel_path" = "$excl" ]; then
excluded=1
break
fi
# Check if rel_path starts with excl/ (is a child of excluded dir)
case "$rel_path" in
"$excl"/*) excluded=1; break ;;
esac
done
if [ $excluded -eq 1 ]; then
echo "Excluding: $rel_path"
continue
fi
if [ -d "$item" ]; then
mkdir -p "$dst_dir/$name"
create_filtered_symlinks "$item" "$dst_dir/$name" "$rel_path"
# Remove empty directories
rmdir "$dst_dir/$name" 2>/dev/null || true
else
ln -sf "$item" "$dst_dir/$name"
fi
done
}}
if [ -d "/workspace/files/user_library" ]; then
mkdir -p {session_path}/files/user_library
create_filtered_symlinks /workspace/files/user_library {session_path}/files/user_library ""
# Remove user_library if empty
rmdir {session_path}/files/user_library 2>/dev/null || true
fi
"""
else:
# Normal mode: symlink to user's S3-synced knowledge files
@@ -2011,6 +2078,7 @@ echo "Session config regeneration complete"
sandbox_id: UUID,
user_id: UUID,
tenant_id: str,
source: str | None = None,
) -> bool:
"""Sync files from S3 to the running pod via the file-sync sidecar.
@@ -2019,25 +2087,87 @@ echo "Session config regeneration complete"
This is safe to call multiple times - s5cmd sync is idempotent.
IMPORTANT: All files are synced to /workspace/files/ regardless of
exclude_paths. Session visibility is controlled via filtered symlinks
in setup_session_workspace(), not during sync.
Args:
sandbox_id: The sandbox UUID
user_id: The user ID (for S3 path construction)
tenant_id: The tenant ID (for S3 path construction)
source: Optional source type (e.g., "gmail", "google_drive").
If None, syncs all sources. If specified, only syncs
that source's directory.
exclude_paths: DEPRECATED - ignored. Kept for API compatibility.
Session visibility is controlled via symlinks.
Returns:
True if sync was successful, False otherwise.
"""
pod_name = self._get_pod_name(str(sandbox_id))
# s5cmd sync: high-performance parallel S3 sync (default 256 workers)
# --stat shows transfer statistics for monitoring
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/*"
sync_command = [
"/bin/sh",
"-c",
f'/s5cmd --log debug --stat sync "{s3_path}" /workspace/files/; '
f'echo "Files in workspace: $(find /workspace/files -type f | wc -l)"',
]
# Build S3 path based on whether source is specified
if source:
# Sync only the specific source directory
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/{source}/*"
local_path = f"/workspace/files/{source}/"
else:
# Sync all sources (original behavior)
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/*"
local_path = "/workspace/files/"
# s5cmd sync: high-performance parallel S3 sync
# --delete: mirror S3 to local (remove files that no longer exist in source)
# Use --delete for external connectors (gmail, google_drive) where
# files can be removed from the source.
# Do NOT use --delete for user_library - files are only added by user
# uploads, and deletions are handled explicitly by the delete_file endpoint.
# timeout: prevent zombie processes from kubectl exec disconnections
# trap: kill child processes on exit/disconnect
source_info = f" (source={source})" if source else ""
# Only use --delete for external connectors where the source of truth is external.
# For user_library, we only add files - deletions are handled explicitly.
# When source is None (sync all), we also skip --delete to be safe.
use_delete = source is not None and source != "user_library"
delete_flag = " --delete" if use_delete else ""
sync_script = f"""
# Kill child processes on exit/disconnect to prevent zombie s5cmd workers
cleanup() {{ pkill -P $$ 2>/dev/null || true; }}
trap cleanup EXIT INT TERM
echo "Starting incremental file sync{source_info}"
echo "S3: {s3_path}"
echo "Local: {local_path}"
# Ensure destination exists (needed for source-specific syncs)
mkdir -p "{local_path}"
# Run s5cmd with 5-minute timeout (SIGKILL after 10s if SIGTERM ignored)
# Exit codes: 0=success, 1=success with warnings, 124=timeout
sync_exit_code=0
timeout --signal=TERM --kill-after=10s 5m \
/s5cmd --stat sync{delete_flag} "{s3_path}" "{local_path}" 2>&1 || sync_exit_code=$?
echo "=== Sync finished (exit code: $sync_exit_code) ==="
# Handle result
if [ $sync_exit_code -eq 0 ] || [ $sync_exit_code -eq 1 ]; then
file_count=$(find "{local_path}" -type f 2>/dev/null | wc -l)
echo "Files in {local_path}: $file_count"
echo "SYNC_SUCCESS"
elif [ $sync_exit_code -eq 124 ]; then
echo "ERROR: Sync timed out after 5 minutes"
echo "SYNC_FAILED"
exit 1
else
echo "ERROR: Sync failed (exit code: $sync_exit_code)"
echo "SYNC_FAILED"
exit $sync_exit_code
fi
"""
sync_command = ["/bin/sh", "-c", sync_script]
resp = k8s_stream(
self._stream_core_api.connect_get_namespaced_pod_exec,
pod_name,
@@ -2050,6 +2180,11 @@ echo "Session config regeneration complete"
tty=False,
)
logger.debug(f"File sync response: {resp}")
# Check if sync succeeded based on output markers
if "SYNC_FAILED" in resp:
logger.warning(f"File sync failed for sandbox {sandbox_id}")
return False
return True
def _ensure_agents_md_attachments_section(

View File

@@ -152,6 +152,117 @@ class LocalSandboxManager(SandboxManager):
"""
return self._get_sandbox_path(sandbox_id) / "sessions" / str(session_id)
def _setup_filtered_files(
    self,
    session_path: Path,
    source_path: Path,
    excluded_paths: list[str],
) -> None:
    """Populate the session's files/ directory with filtered symlinks.

    Top-level entries other than user_library are symlinked as-is;
    user_library/ is materialized as a real directory tree containing
    symlinks only to the files that are not excluded.

    Args:
        session_path: Session directory that receives the files/ dir
        source_path: The user's knowledge root
            (e.g., /storage/tenant/knowledge/user/)
        excluded_paths: user_library-relative paths to hide
            (e.g., ["/data/file.xlsx", "/reports/old.pdf"])
    """
    files_dir = session_path / "files"
    files_dir.mkdir(parents=True, exist_ok=True)

    # Exclusions are compared against slash-stripped relative paths.
    excluded_set = {p.lstrip("/") for p in excluded_paths}

    if not source_path.exists():
        logger.warning(f"Source path does not exist: {source_path}")
        return

    for entry in source_path.iterdir():
        link = files_dir / entry.name
        if entry.name == "user_library":
            # user_library gets per-file filtering instead of one symlink.
            self._setup_filtered_user_library(
                target_dir=link,
                source_dir=entry,
                excluded_set=excluded_set,
                base_path="",
            )
        elif not link.exists():
            # Everything else is exposed wholesale via a single symlink.
            link.symlink_to(entry, target_is_directory=entry.is_dir())
def _setup_filtered_user_library(
    self,
    target_dir: Path,
    source_dir: Path,
    excluded_set: set[str],
    base_path: str,
) -> bool:
    """Recursively set up user_library with filtered symlinks.

    Creates directory structure and symlinks only non-excluded files.
    Only creates directories if they will contain at least one enabled file
    (directories are created lazily, just before the first symlink, and an
    excluded directory prunes its entire subtree).

    Args:
        target_dir: Where to create the filtered structure
        source_dir: Source user_library directory
        excluded_set: Set of excluded relative paths (e.g., {"data/file.xlsx"})
        base_path: Current path relative to user_library root (for recursion)

    Returns:
        True if any content was created (files or non-empty subdirectories)
    """
    if not source_dir.exists():
        return False

    has_content = False

    for item in source_dir.iterdir():
        # Build relative path for exclusion check ("" base means top level,
        # so the leading slash must not be introduced there).
        rel_path = (
            f"{base_path}/{item.name}".lstrip("/") if base_path else item.name
        )
        target_link = target_dir / item.name

        if item.is_dir():
            # Check if entire directory is excluded — skips the whole subtree.
            if rel_path in excluded_set:
                logger.debug(f"Excluding directory: user_library/{rel_path}")
                continue

            # Recurse into directory - only create if it has content
            subdir_has_content = self._setup_filtered_user_library(
                target_dir=target_link,
                source_dir=item,
                excluded_set=excluded_set,
                base_path=rel_path,
            )
            if subdir_has_content:
                has_content = True
        else:
            # Check if file is excluded
            if rel_path in excluded_set:
                logger.debug(f"Excluding file: user_library/{rel_path}")
                continue

            # Create parent directory if needed (lazy creation — avoids
            # leaving behind empty directories when everything is excluded)
            if not target_dir.exists():
                target_dir.mkdir(parents=True, exist_ok=True)

            # Create symlink to file; skip if already present from a prior run
            if not target_link.exists():
                target_link.symlink_to(item)
            has_content = True

    return has_content
def provision(
self,
sandbox_id: UUID,
@@ -271,6 +382,7 @@ class LocalSandboxManager(SandboxManager):
user_work_area: str | None = None,
user_level: str | None = None,
use_demo_data: bool = False,
excluded_user_library_paths: list[str] | None = None,
) -> None:
"""Set up a session workspace within an existing sandbox.
@@ -280,7 +392,7 @@ class LocalSandboxManager(SandboxManager):
3. .venv/ (from template)
4. AGENTS.md
5. .agent/skills/
6. files/ (symlink to demo data OR user's file_system_path)
6. files/ (symlink to demo data OR filtered user files)
7. opencode.json
8. org_info/ (if demo_data is enabled, the org structure and user identity for the user's demo persona)
9. attachments/
@@ -297,6 +409,8 @@ class LocalSandboxManager(SandboxManager):
user_work_area: User's work area for demo persona (e.g., "engineering")
user_level: User's level for demo persona (e.g., "ic", "manager")
use_demo_data: If True, symlink files/ to demo data; else to user files
excluded_user_library_paths: List of paths within user_library/ to exclude
(e.g., ["/data/file.xlsx"]). These files won't be linked in the sandbox.
Raises:
RuntimeError: If workspace setup fails
@@ -320,7 +434,7 @@ class LocalSandboxManager(SandboxManager):
logger.debug(f"Session directory created at {session_path}")
try:
# Setup files symlink - choose between demo data or user files
# Setup files access - choose between demo data or user files
if use_demo_data:
# Demo mode: symlink to demo data directory
symlink_target = Path(DEMO_DATA_PATH)
@@ -329,17 +443,33 @@ class LocalSandboxManager(SandboxManager):
f"Demo data directory does not exist: {symlink_target}"
)
logger.info(f"Setting up files symlink to demo data: {symlink_target}")
elif file_system_path:
# Normal mode: symlink to user's knowledge files
symlink_target = Path(file_system_path)
logger.debug(
f"Setting up files symlink to user files: {symlink_target}"
self._directory_manager.setup_files_symlink(
session_path, symlink_target
)
elif file_system_path:
source_path = Path(file_system_path)
# Check if we have exclusions for user_library
if excluded_user_library_paths:
# Create filtered file structure with symlinks to enabled files only
logger.debug(
f"Setting up filtered files with {len(excluded_user_library_paths)} exclusions"
)
self._setup_filtered_files(
session_path=session_path,
source_path=source_path,
excluded_paths=excluded_user_library_paths,
)
else:
# No exclusions: simple symlink to entire directory
logger.debug(
f"Setting up files symlink to user files: {source_path}"
)
self._directory_manager.setup_files_symlink(
session_path, source_path
)
else:
raise ValueError("No files symlink target provided")
self._directory_manager.setup_files_symlink(session_path, symlink_target)
logger.debug("Files symlink ready")
logger.debug("Files ready")
# Setup org_info directory with user identity (at session root)
if user_work_area:
@@ -1056,19 +1186,28 @@ class LocalSandboxManager(SandboxManager):
sandbox_id: UUID,
user_id: UUID, # noqa: ARG002
tenant_id: str, # noqa: ARG002
source: str | None = None, # noqa: ARG002
) -> bool:
"""No-op for local mode - files are directly accessible via symlink.
In local mode, the sandbox's files/ directory is a symlink to the
local persistent document storage, so no sync is needed.
Note: exclude_paths is not implemented for local mode since files are
accessed via symlink. All files in the storage directory are visible.
Args:
sandbox_id: The sandbox UUID (unused)
user_id: The user ID (unused)
tenant_id: The tenant ID (unused)
source: The source type (unused in local mode)
exclude_paths: Paths to exclude (not implemented for local mode)
Returns:
True (always succeeds since no sync is needed)
"""
logger.debug(f"sync_files called for local sandbox {sandbox_id} - no-op")
source_info = f" source={source}" if source else ""
logger.debug(
f"sync_files called for local sandbox {sandbox_id}{source_info} - no-op"
)
return True

View File

@@ -1,12 +1,19 @@
"""Celery tasks for sandbox operations (cleanup, file sync, etc.)."""
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING
from uuid import UUID
from celery import shared_task
from celery import Task
from redis.lock import Lock as RedisLock
if TYPE_CHECKING:
from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.constants import CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine.sql_engine import get_session_with_current_tenant
@@ -247,6 +254,66 @@ def _list_session_directories(
return []
@contextmanager
def _acquire_sandbox_file_sync_lock(lock: RedisLock) -> Iterator[bool]:
    """Context manager around the per-user sandbox file-sync Redis lock.

    Blocks up to CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT seconds trying to
    acquire, yields whether the lock was obtained, and releases on exit —
    but only if this process still owns it (the lock may have expired and
    been taken by another worker in the meantime).
    """
    got_lock = lock.acquire(
        blocking_timeout=CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT,
    )
    try:
        yield got_lock
    finally:
        # Never release a lock we no longer own (e.g. after timeout expiry).
        if lock.owned():
            lock.release()
def _get_disabled_user_library_paths(db_session: "Session", user_id: str) -> list[str]:
    """Collect relative paths of user-library files whose sync is disabled.

    Looks up the user's CRAFT_FILE documents and returns the
    user_library-relative path of every non-directory document whose
    metadata carries sync_disabled=True. These paths are excluded when
    syncing files into the sandbox.

    Args:
        db_session: Database session
        user_id: The user ID to filter documents

    Returns:
        Relative paths to exclude (e.g., ["/data/file.xlsx", "/old/report.pdf"])
    """
    from uuid import UUID

    from onyx.configs.constants import DocumentSource
    from onyx.db.document import get_documents_by_source

    # Get CRAFT_FILE documents for this user (filtered at SQL level)
    documents = get_documents_by_source(
        db_session=db_session,
        source=DocumentSource.CRAFT_FILE,
        creator_id=UUID(user_id),
    )

    excluded: list[str] = []
    for document in documents:
        metadata = document.doc_metadata or {}
        # Only actual files with sync explicitly disabled are excluded;
        # directory entries carry no synced content themselves.
        if not metadata.get("sync_disabled") or metadata.get("is_directory"):
            continue

        # semantic_id format: "user_library/path/to/file.xlsx" — keep the
        # portion after the "user_library" prefix (leading slash included).
        semantic_id = document.semantic_id or ""
        if semantic_id.startswith("user_library"):
            relative = semantic_id[len("user_library") :]
            if relative:
                excluded.append(relative)

    return excluded
@shared_task(
name=OnyxCeleryTask.SANDBOX_FILE_SYNC,
soft_time_limit=TIMEOUT_SECONDS,
@@ -254,7 +321,11 @@ def _list_session_directories(
ignore_result=True,
)
def sync_sandbox_files(
self: Task, *, user_id: str, tenant_id: str # noqa: ARG001
self: Task, # noqa: ARG001
*,
user_id: str,
tenant_id: str,
source: str | None = None,
) -> bool:
"""Sync files from S3 to a user's running sandbox.
@@ -262,46 +333,75 @@ def sync_sandbox_files(
It executes `s5cmd sync` in the file-sync sidecar container to download
any new or changed files.
This is safe to call multiple times - s5cmd sync is idempotent.
Per-user locking ensures only one sync runs at a time for a given user.
If a sync is already in progress, this task will wait until it completes.
For user_library source, files with sync_disabled=True in their metadata
are excluded from the sync and deleted from the sandbox if present.
Args:
user_id: The user ID whose sandbox should be synced
tenant_id: The tenant ID for S3 path construction
source: Optional source type (e.g., "gmail", "google_drive", "user_library").
If None, syncs all sources.
Returns:
True if sync was successful, False if skipped or failed
"""
source_info = f" source={source}" if source else " (all sources)"
task_logger.info(
f"sync_sandbox_files starting for user {user_id} in tenant {tenant_id}"
f"{source_info}"
)
with get_session_with_current_tenant() as db_session:
sandbox = get_sandbox_by_user_id(db_session, UUID(user_id))
lock_timeout = CELERY_SANDBOX_FILE_SYNC_LOCK_TIMEOUT
redis_client = get_redis_client(tenant_id=tenant_id)
lock = redis_client.lock(
f"{OnyxRedisLocks.SANDBOX_FILE_SYNC_LOCK_PREFIX}:{user_id}",
timeout=lock_timeout,
)
if sandbox is None:
task_logger.debug(f"No sandbox found for user {user_id}, skipping sync")
return False
if sandbox.status != SandboxStatus.RUNNING:
task_logger.debug(
f"Sandbox {sandbox.id} not running (status={sandbox.status}), "
f"skipping sync"
with _acquire_sandbox_file_sync_lock(lock) as acquired:
if not acquired:
task_logger.warning(
f"sync_sandbox_files - failed to acquire lock for user {user_id} "
f"after {lock_timeout}s, skipping"
)
return False
sandbox_manager = get_sandbox_manager()
result = sandbox_manager.sync_files(
sandbox_id=sandbox.id,
user_id=UUID(user_id),
tenant_id=tenant_id,
)
with get_session_with_current_tenant() as db_session:
sandbox = get_sandbox_by_user_id(db_session, UUID(user_id))
if sandbox is None:
task_logger.debug(f"No sandbox found for user {user_id}, skipping sync")
return False
if sandbox.status != SandboxStatus.RUNNING:
task_logger.debug(
f"Sandbox {sandbox.id} not running (status={sandbox.status}), "
f"skipping sync"
)
return False
if result:
task_logger.info(f"File sync completed for user {user_id}")
else:
task_logger.warning(f"File sync failed for user {user_id}")
# For user_library source, get list of disabled files to exclude
exclude_paths: list[str] | None = None
if source == "user_library":
exclude_paths = _get_disabled_user_library_paths(db_session, user_id)
if exclude_paths:
task_logger.info(
f"Excluding {len(exclude_paths)} disabled files from user_library sync"
)
return result
sandbox_manager = get_sandbox_manager()
result = sandbox_manager.sync_files(
sandbox_id=sandbox.id,
user_id=UUID(user_id),
tenant_id=tenant_id,
source=source,
)
if result:
task_logger.info(f"File sync completed for user {user_id}{source_info}")
else:
task_logger.warning(f"File sync failed for user {user_id}{source_info}")
return result
# NOTE: in the future, may need to add this. For now, will do manual cleanup.

View File

@@ -563,6 +563,22 @@ class SessionManager:
user_name = user.personal_name if user else None
user_role = user.personal_role if user else None
# Get excluded user library paths (files with sync_disabled=True)
# Only query if not using demo data (user library only applies to user files)
excluded_user_library_paths: list[str] | None = None
if not demo_data_enabled:
from onyx.server.features.build.sandbox.tasks.tasks import (
_get_disabled_user_library_paths,
)
excluded_user_library_paths = _get_disabled_user_library_paths(
self._db_session, str(user_id)
)
if excluded_user_library_paths:
logger.debug(
f"Excluding {len(excluded_user_library_paths)} disabled user library paths"
)
self._sandbox_manager.setup_session_workspace(
sandbox_id=sandbox.id,
session_id=build_session.id,
@@ -575,7 +591,9 @@ class SessionManager:
user_work_area=user_work_area,
user_level=user_level,
use_demo_data=demo_data_enabled,
excluded_user_library_paths=excluded_user_library_paths,
)
sandbox_id = sandbox.id
logger.info(
f"Successfully created session {session_id} with workspace in sandbox {sandbox.id}"

View File

@@ -0,0 +1,57 @@
---
name: excel-files
description: Read and process Excel spreadsheets (.xlsx, .xls) using openpyxl or pandas
---
# Working with Excel Files
User-uploaded Excel files are in `files/user_library/`. Use openpyxl or pandas to read them.
## Reading with openpyxl (Best for formulas & formatting)
```python
from openpyxl import load_workbook
# Load workbook (read_only=True for large files, data_only=True for calculated values)
wb = load_workbook('files/user_library/data.xlsx', read_only=True, data_only=True)
# List sheets
print(wb.sheetnames)
# Read specific sheet
sheet = wb['Revenue']
for row in sheet.iter_rows(min_row=1, max_row=10, values_only=True):
print(row)
```
## Reading with pandas (Best for data analysis)
```python
import pandas as pd
# Read specific sheet into DataFrame
df = pd.read_excel('files/user_library/data.xlsx', sheet_name='Revenue')
print(df.head())
# Read all sheets
all_sheets = pd.read_excel('files/user_library/data.xlsx', sheet_name=None)
for name, df in all_sheets.items():
print(f"Sheet: {name}, Rows: {len(df)}")
```
## Writing Modified Data
Files in `files/` are READ-ONLY. To modify:
```python
# Read, modify, write to outputs/
df = pd.read_excel('files/user_library/data.xlsx')
df['new_column'] = df['Amount'] * 1.1
df.to_excel('outputs/modified_data.xlsx', index=False)
```
## Best Practices
- Use `read_only=True` for files >10MB
- Use `data_only=True` to get calculated values (not formulas)
- For data analysis, pandas is faster; for formatting, use openpyxl

View File

@@ -0,0 +1,78 @@
---
name: powerpoint-files
description: Read and create PowerPoint presentations (.pptx) using python-pptx
---
# Working with PowerPoint Files
User-uploaded PowerPoint files are in `files/user_library/`. Use python-pptx to read them.
## Reading Presentations
```python
from pptx import Presentation
prs = Presentation('files/user_library/deck.pptx')
# Get slide count and titles
print(f"Total slides: {len(prs.slides)}")
for i, slide in enumerate(prs.slides, 1):
print(f"=== Slide {i} ===")
# Extract text from shapes
for shape in slide.shapes:
if hasattr(shape, "text"):
print(shape.text)
# Handle tables
if shape.has_table:
for row in shape.table.rows:
print([cell.text for cell in row.cells])
# Speaker notes
if slide.has_notes_slide:
print(f"Notes: {slide.notes_slide.notes_text_frame.text}")
```
## Creating New Presentations
Write to `outputs/`:
```python
from pptx import Presentation
from pptx.util import Inches, Pt
prs = Presentation()
slide = prs.slides.add_slide(prs.slide_layouts[1]) # Title and Content
slide.shapes.title.text = "My Title"
slide.placeholders[1].text = "Bullet point content"
prs.save('outputs/new_deck.pptx')
```
## Adding Images to Slides
```python
from pptx import Presentation
from pptx.util import Inches
prs = Presentation()
slide = prs.slides.add_slide(prs.slide_layouts[6]) # Blank slide
# Add image
slide.shapes.add_picture(
'outputs/chart.png',
left=Inches(1),
top=Inches(1),
width=Inches(6)
)
prs.save('outputs/presentation_with_image.pptx')
```
## Best Practices
- Files in `files/` are READ-ONLY; write to `outputs/`
- Use `slide.shapes.title` for title shapes
- Access placeholders by index for content areas
- Check `hasattr(shape, "text")` before reading text

View File

@@ -0,0 +1,102 @@
---
name: word-documents
description: Read and create Word documents (.docx) using python-docx
---
# Working with Word Documents
User-uploaded Word documents are in `files/user_library/`. Use python-docx to read them.
## Reading Documents
```python
from docx import Document
doc = Document('files/user_library/report.docx')
# Read all paragraphs
for para in doc.paragraphs:
print(para.text)
# Read with style information
for para in doc.paragraphs:
print(f"Style: {para.style.name}")
print(f"Text: {para.text}")
```
## Reading Tables
```python
from docx import Document
doc = Document('files/user_library/report.docx')
for table in doc.tables:
for row in table.rows:
row_data = [cell.text for cell in row.cells]
print(row_data)
```
## Creating New Documents
Write to `outputs/`:
```python
from docx import Document
from docx.shared import Inches, Pt
doc = Document()
# Add heading
doc.add_heading('Document Title', level=0)
# Add paragraphs
doc.add_paragraph('This is a paragraph.')
doc.add_paragraph('Another paragraph with some text.')
# Add bullet points
doc.add_paragraph('First item', style='List Bullet')
doc.add_paragraph('Second item', style='List Bullet')
doc.save('outputs/new_report.docx')
```
## Adding Tables
```python
from docx import Document
doc = Document()
# Create table with 3 rows and 3 columns
table = doc.add_table(rows=3, cols=3)
table.style = 'Table Grid'
# Fill in data
for i, row in enumerate(table.rows):
for j, cell in enumerate(row.cells):
cell.text = f"Row {i+1}, Col {j+1}"
doc.save('outputs/table_doc.docx')
```
## Adding Images
```python
from docx import Document
from docx.shared import Inches
doc = Document()
doc.add_heading('Report with Image', level=0)
doc.add_picture('outputs/chart.png', width=Inches(5))
doc.add_paragraph('Figure 1: Chart showing data analysis')
doc.save('outputs/report_with_image.docx')
```
## Best Practices
- Files in `files/` are READ-ONLY; write to `outputs/`
- Use `doc.paragraphs` for text content
- Use `doc.tables` for tabular data
- Check document styles with `para.style.name`

View File

@@ -0,0 +1 @@
# Integration tests for Craft/Build features

View File

@@ -0,0 +1,205 @@
"""
Integration tests for the User Library API.
Tests the CRUD operations for user-uploaded raw files in Craft.
"""
import io
import zipfile
import requests
from tests.integration.common_utils.managers.user import UserManager
from tests.integration.common_utils.test_models import DATestUser
API_SERVER_URL = "http://localhost:3000"
def _get_user_library_tree(user: DATestUser) -> list[dict]:
    """Fetch the user's library tree; the server returns a flat entry list."""
    url = f"{API_SERVER_URL}/api/build/user-library/tree"
    resp = requests.get(url, headers=user.headers)
    resp.raise_for_status()
    return resp.json()
def _upload_files(user: DATestUser, path: str, files: list[tuple[str, bytes]]) -> dict:
    """Upload files to the user library."""
    # requests multipart format: repeated ("files", (filename, content)) parts
    payload = [("files", (filename, content)) for filename, content in files]
    resp = requests.post(
        f"{API_SERVER_URL}/api/build/user-library/upload",
        headers=user.headers,
        data={"path": path},
        files=payload,
    )
    resp.raise_for_status()
    return resp.json()
def _upload_zip(user: DATestUser, path: str, zip_data: bytes) -> dict:
    """Upload and extract a zip file."""
    multipart = {"file": ("archive.zip", zip_data)}
    resp = requests.post(
        f"{API_SERVER_URL}/api/build/user-library/upload-zip",
        headers=user.headers,
        data={"path": path},
        files=multipart,
    )
    resp.raise_for_status()
    return resp.json()
def _create_directory(user: DATestUser, name: str, parent_path: str = "/") -> dict:
    """Create a directory in the user library."""
    body = {"name": name, "parent_path": parent_path}
    resp = requests.post(
        f"{API_SERVER_URL}/api/build/user-library/directories",
        headers=user.headers,
        json=body,
    )
    resp.raise_for_status()
    return resp.json()
def _toggle_sync(user: DATestUser, document_id: str, enabled: bool) -> None:
    """Toggle sync status for a file."""
    # Query param is lowercase "true"/"false", matching the API's boolean parsing
    query = {"enabled": str(enabled).lower()}
    resp = requests.patch(
        f"{API_SERVER_URL}/api/build/user-library/files/{document_id}/toggle",
        headers=user.headers,
        params=query,
    )
    resp.raise_for_status()
def _delete_file(user: DATestUser, document_id: str) -> None:
    """Delete a file from the user library."""
    url = f"{API_SERVER_URL}/api/build/user-library/files/{document_id}"
    resp = requests.delete(url, headers=user.headers)
    resp.raise_for_status()
def test_user_library_upload_file(reset: None) -> None: # noqa: ARG001
    """Test uploading a single file to the user library.

    Verifies the upload response echoes the entry (name, sync enabled by
    default) and that the file then shows up in the library tree.
    """
    admin_user: DATestUser = UserManager.create(name="admin_user")
    # Upload a simple CSV file
    csv_content = b"name,value\nfoo,1\nbar,2"
    result = _upload_files(admin_user, "/", [("data.csv", csv_content)])
    assert "entries" in result
    assert len(result["entries"]) == 1
    assert result["entries"][0]["name"] == "data.csv"
    # New uploads default to sync_enabled=True
    assert result["entries"][0]["sync_enabled"] is True
    # Verify it appears in the tree
    tree = _get_user_library_tree(admin_user)
    assert any(entry["name"] == "data.csv" for entry in tree)
def test_user_library_upload_to_directory(reset: None) -> None: # noqa: ARG001
    """Test uploading files to a specific directory path.

    The returned entry path should be prefixed with the target directory.
    """
    admin_user: DATestUser = UserManager.create(name="admin_user")
    # Create a directory first
    _create_directory(admin_user, "reports")
    # Upload to that directory
    xlsx_content = b"fake xlsx content"
    result = _upload_files(admin_user, "/reports", [("quarterly.xlsx", xlsx_content)])
    assert len(result["entries"]) == 1
    assert result["entries"][0]["path"] == "/reports/quarterly.xlsx"
def test_user_library_upload_zip(reset: None) -> None: # noqa: ARG001
    """Test uploading and extracting a zip file."""
    admin_user: DATestUser = UserManager.create(name="admin_user")
    # Create an in-memory zip file
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("folder/file1.txt", "content1")
        zf.writestr("folder/file2.txt", "content2")
        zf.writestr("root.txt", "root content")
    zip_buffer.seek(0)
    result = _upload_zip(admin_user, "/", zip_buffer.read())
    # At least the 3 files should be extracted; the server may also return
    # directory entries (e.g. for "folder/"), hence >= rather than ==.
    assert "entries" in result
    assert len(result["entries"]) >= 3
def test_user_library_create_directory(reset: None) -> None: # noqa: ARG001
    """Test creating a directory and seeing it in the library tree."""
    admin_user: DATestUser = UserManager.create(name="admin_user")
    # Created at the library root (default parent_path="/")
    result = _create_directory(admin_user, "my-data")
    assert result["is_directory"] is True
    assert result["name"] == "my-data"
    # Verify in tree
    tree = _get_user_library_tree(admin_user)
    assert any(entry["name"] == "my-data" for entry in tree)
def test_user_library_toggle_sync(reset: None) -> None: # noqa: ARG001
    """Test toggling sync status for a file.

    Uploads a file (sync enabled by default), disables sync, and confirms
    the tree reflects the new state.
    """
    admin_user: DATestUser = UserManager.create(name="admin_user")
    # Upload a file
    result = _upload_files(admin_user, "/", [("test.txt", b"hello")])
    document_id = result["entries"][0]["id"]
    # Initially sync is enabled
    assert result["entries"][0]["sync_enabled"] is True
    # Disable sync
    _toggle_sync(admin_user, document_id, False)
    # Verify sync is disabled
    tree = _get_user_library_tree(admin_user)
    entry = next((e for e in tree if e["id"] == document_id), None)
    assert entry is not None
    assert entry["sync_enabled"] is False
def test_user_library_delete_file(reset: None) -> None: # noqa: ARG001
    """Test deleting a file removes it from the library tree."""
    admin_user: DATestUser = UserManager.create(name="admin_user")
    # Upload a file
    result = _upload_files(admin_user, "/", [("delete-me.txt", b"temp")])
    document_id = result["entries"][0]["id"]
    # Delete it
    _delete_file(admin_user, document_id)
    # Verify it's gone
    tree = _get_user_library_tree(admin_user)
    assert not any(entry["id"] == document_id for entry in tree)
def test_user_library_isolation_between_users(reset: None) -> None: # noqa: ARG001
    """Test that users can only see their own files.

    Each user uploads a file; each user's tree must contain their own file
    and must not contain the other user's file.
    """
    admin_user: DATestUser = UserManager.create(name="admin_user")
    other_user: DATestUser = UserManager.create(name="other_user")
    # Admin uploads a file
    _upload_files(admin_user, "/", [("admin-file.txt", b"admin data")])
    # Other user uploads a file
    _upload_files(other_user, "/", [("other-file.txt", b"other data")])
    # Admin should only see their file
    admin_tree = _get_user_library_tree(admin_user)
    assert any(e["name"] == "admin-file.txt" for e in admin_tree)
    assert not any(e["name"] == "other-file.txt" for e in admin_tree)
    # Other user should only see their file
    other_tree = _get_user_library_tree(other_user)
    assert any(e["name"] == "other-file.txt" for e in other_tree)
    assert not any(e["name"] == "admin-file.txt" for e in other_tree)

View File

@@ -165,9 +165,10 @@ export function useBuildSessionController({
const currentSessionData = currentState.currentSessionId
? currentState.sessions.get(currentState.currentSessionId)
: null;
const isCurrentlyStreaming =
currentSessionData?.status === "running" ||
currentSessionData?.status === "creating";
// Only block loading during active LLM streaming ("running").
// "creating" means sandbox restore, which should not prevent
// navigating to and loading a different session.
const isCurrentlyStreaming = currentSessionData?.status === "running";
if (
controllerState.loadedSessionId !== existingSessionId &&

View File

@@ -1221,29 +1221,28 @@ export const useBuildSessionStore = create<BuildSessionStore>()((set, get) => ({
!sessionData.session_loaded_in_sandbox);
if (needsRestore) {
// Update UI: show sandbox as "restoring" and session as loading
// Show sandbox as "restoring" while we load messages + restore
updateSessionData(sessionId, {
status: "creating",
sandbox: sessionData.sandbox
? { ...sessionData.sandbox, status: "restoring" }
: null,
});
// Call restore endpoint (blocks until complete)
sessionData = await restoreSession(sessionId);
// Clear the "creating" loading indicator so subsequent logic
// doesn't mistake this for an active streaming session.
updateSessionData(sessionId, { status: "idle" });
}
// Now fetch messages and artifacts
const [messages, artifacts] = await Promise.all([
fetchMessages(sessionId),
fetchArtifacts(sessionId),
]);
// Messages come from DB and don't need the sandbox running.
// Artifacts need sandbox filesystem, so skip during restore.
const messages = await fetchMessages(sessionId);
const artifacts = needsRestore ? [] : await fetchArtifacts(sessionId);
// Construct webapp URL if sandbox has a Next.js port and there's a webapp artifact
// Preserve optimistic messages if actively streaming (pre-provisioned flow).
const currentSession = get().sessions.get(sessionId);
const isStreaming =
(currentSession?.messages?.length ?? 0) > 0 &&
(currentSession?.status === "running" ||
currentSession?.status === "creating");
// Construct webapp URL
let webappUrl: string | null = null;
const hasWebapp = artifacts.some(
(a) => a.type === "nextjs_app" || a.type === "web_app"
@@ -1252,50 +1251,62 @@ export const useBuildSessionStore = create<BuildSessionStore>()((set, get) => ({
webappUrl = `http://localhost:${sessionData.sandbox.nextjs_port}`;
}
// Re-fetch existing session to check for optimistic messages
const currentSession = get().sessions.get(sessionId);
const hasOptimisticMessages = (currentSession?.messages.length ?? 0) > 0;
const isCurrentlyStreaming =
currentSession?.status === "running" ||
currentSession?.status === "creating";
// Consolidate messages into proper conversation turns
// Each assistant turn becomes a single message with streamItems in metadata
// If there are optimistic messages (active streaming), preserve current state
const messagesToUse = hasOptimisticMessages
const status = isStreaming
? currentSession!.status
: needsRestore
? "creating"
: sessionData.status === "active"
? "active"
: "idle";
const resolvedMessages = isStreaming
? currentSession!.messages
: consolidateMessagesIntoTurns(messages);
// Session-level streamItems are only for current streaming response
// When loading from history, they should be empty (each message has its own streamItems)
const streamItemsToUse = hasOptimisticMessages
? currentSession!.streamItems
: [];
// Preserve streaming status if currently streaming, otherwise use backend status
const statusToUse = isCurrentlyStreaming
? currentSession!.status
: sessionData.status === "active"
? "active"
: "idle";
const streamItems = isStreaming ? currentSession!.streamItems : [];
const sandbox =
needsRestore && sessionData.sandbox
? { ...sessionData.sandbox, status: "restoring" as const }
: sessionData.sandbox;
updateSessionData(sessionId, {
status: statusToUse,
// Preserve optimistic messages if they exist (e.g., from pre-provisioned flow)
messages: messagesToUse,
streamItems: streamItemsToUse,
status,
messages: resolvedMessages,
streamItems,
artifacts,
webappUrl,
sandbox: sessionData.sandbox,
sandbox,
error: null,
isLoaded: true,
// After restore, bump webappNeedsRefresh so OutputPanel's SWR refetches
// webapp-info. Done here (not earlier) so all session data is set atomically.
...(needsRestore
? {
webappNeedsRefresh:
(get().sessions.get(sessionId)?.webappNeedsRefresh || 0) + 1,
}
: {}),
});
// Now restore the sandbox if needed (messages are already visible).
// The backend enforces a timeout and returns an error if restore
// takes too long, so no frontend timeout needed here.
if (needsRestore) {
try {
sessionData = await restoreSession(sessionId);
// Sandbox is now running - fetch artifacts
const restoredArtifacts = await fetchArtifacts(sessionId);
updateSessionData(sessionId, {
status: sessionData.status === "active" ? "active" : "idle",
artifacts: restoredArtifacts,
sandbox: sessionData.sandbox,
// Bump so OutputPanel's SWR refetches webapp-info (which
// derives the actual webappUrl from the backend).
webappNeedsRefresh:
(get().sessions.get(sessionId)?.webappNeedsRefresh || 0) + 1,
});
} catch (restoreErr) {
console.error("Sandbox restore failed:", restoreErr);
updateSessionData(sessionId, {
status: "idle",
sandbox: sessionData.sandbox
? { ...sessionData.sandbox, status: "failed" }
: null,
});
}
}
} catch (err) {
console.error("Failed to load session:", err);
updateSessionData(sessionId, {

View File

@@ -623,3 +623,141 @@ export async function deleteConnector(
);
}
}
// =============================================================================
// User Library API
// =============================================================================
import {
LibraryEntry,
CreateDirectoryRequest,
UploadResponse,
} from "@/app/craft/types/user-library";
const USER_LIBRARY_BASE = `${API_BASE}/user-library`;
/**
 * Fetch the user's library tree (uploaded files).
 *
 * @returns Flat list of library entries; callers build any hierarchy.
 * @throws Error when the server responds with a non-2xx status.
 */
export async function fetchLibraryTree(): Promise<LibraryEntry[]> {
  const response = await fetch(`${USER_LIBRARY_BASE}/tree`);
  if (!response.ok) {
    throw new Error(`Failed to fetch library tree: ${response.status}`);
  }
  return response.json();
}
/**
 * Upload files to the user library.
 *
 * @param path - Target directory path within the library (e.g. "/").
 * @param files - Browser File objects to upload as multipart form data.
 * @throws Error with the server-provided detail message when the upload fails.
 */
export async function uploadLibraryFiles(
  path: string,
  files: File[]
): Promise<UploadResponse> {
  const body = new FormData();
  body.append("path", path);
  // One "files" part per file, matching the backend's multipart contract
  files.forEach((file) => body.append("files", file));

  const response = await fetch(`${USER_LIBRARY_BASE}/upload`, {
    method: "POST",
    body,
  });
  if (!response.ok) {
    const errorData = await response.json().catch(() => ({}));
    throw new Error(
      errorData.detail || `Failed to upload files: ${response.status}`
    );
  }
  return response.json();
}
/**
 * Upload and extract a zip file to the user library.
 *
 * @param path - Target directory path within the library.
 * @param file - The zip archive to upload; the server extracts it.
 * @throws Error with the server-provided detail message on failure.
 */
export async function uploadLibraryZip(
  path: string,
  file: File
): Promise<UploadResponse> {
  const body = new FormData();
  body.append("path", path);
  body.append("file", file);

  const response = await fetch(`${USER_LIBRARY_BASE}/upload-zip`, {
    method: "POST",
    body,
  });
  if (!response.ok) {
    const errorData = await response.json().catch(() => ({}));
    throw new Error(errorData.detail || `Failed to upload zip: ${response.status}`);
  }
  return response.json();
}
/**
 * Create a directory in the user library.
 *
 * @param request - Directory name and parent path.
 * @throws Error with the server-provided detail message on failure.
 */
export async function createLibraryDirectory(
  request: CreateDirectoryRequest
): Promise<LibraryEntry> {
  const response = await fetch(`${USER_LIBRARY_BASE}/directories`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(request),
  });
  if (response.ok) {
    return response.json();
  }
  const errorData = await response.json().catch(() => ({}));
  throw new Error(
    errorData.detail || `Failed to create directory: ${response.status}`
  );
}
/**
 * Toggle sync status for a file/directory in the user library.
 *
 * @param documentId - Backend document id of the entry.
 * @param enabled - Desired sync state, sent as a query parameter.
 * @throws Error with the server-provided detail message on failure.
 */
export async function toggleLibraryFileSync(
  documentId: string,
  enabled: boolean
): Promise<void> {
  const url = `${USER_LIBRARY_BASE}/files/${encodeURIComponent(
    documentId
  )}/toggle?enabled=${enabled}`;
  const response = await fetch(url, { method: "PATCH" });
  if (!response.ok) {
    const errorData = await response.json().catch(() => ({}));
    throw new Error(errorData.detail || `Failed to toggle sync: ${response.status}`);
  }
}
/**
 * Delete a file/directory from the user library.
 *
 * @param documentId - Backend document id of the entry to delete.
 * @throws Error with the server-provided detail message on failure.
 */
export async function deleteLibraryFile(documentId: string): Promise<void> {
  const url = `${USER_LIBRARY_BASE}/files/${encodeURIComponent(documentId)}`;
  const response = await fetch(url, { method: "DELETE" });
  if (!response.ok) {
    const errorData = await response.json().catch(() => ({}));
    throw new Error(errorData.detail || `Failed to delete file: ${response.status}`);
  }
}

View File

@@ -0,0 +1,26 @@
/**
* Types for User Library - raw binary file uploads in Craft.
*/
/** A single file or directory in the user's library. */
export interface LibraryEntry {
  id: string; // document_id
  name: string; // display name (file or folder name)
  path: string; // full path, e.g. "user_library/test/file.pdf" — confirm prefix with backend
  is_directory: boolean;
  file_size: number | null; // presumably bytes; null for directories — confirm
  mime_type: string | null; // null for directories
  sync_enabled: boolean; // false when the entry is excluded from sandbox sync
  created_at: string; // timestamp string; assumed ISO 8601 — confirm with backend
  children?: LibraryEntry[]; // populated client-side when building the tree
}
/** Payload for creating a directory in the user library. */
export interface CreateDirectoryRequest {
  name: string; // new directory name
  parent_path: string; // parent directory path; "/" for the library root
}
/** Server response after uploading files (or extracting a zip). */
export interface UploadResponse {
  entries: LibraryEntry[]; // entries created by this upload
  total_uploaded: number; // count of uploaded items
  total_size_bytes: number; // combined size of uploaded content
}

View File

@@ -131,10 +131,6 @@ export default function ConfigureConnectorModal({
setStep("credential");
};
const handleConnectorSuccess = () => {
onSuccess();
};
// Dynamic title and description based on flow type
const getStepTitle = () => {
if (isSingleStep) {
@@ -192,14 +188,14 @@ export default function ConfigureConnectorModal({
onOAuthRedirect={handleOAuthRedirect}
refresh={refreshCredentials}
isSingleStep={isSingleStep}
onConnectorSuccess={handleConnectorSuccess}
onConnectorSuccess={onSuccess}
setPopup={setPopup}
/>
) : selectedCredential ? (
<ConnectorConfigStep
connectorType={connectorType}
credential={selectedCredential}
onSuccess={handleConnectorSuccess}
onSuccess={onSuccess}
onBack={handleBack}
setPopup={setPopup}
/>

View File

@@ -29,10 +29,14 @@ export function ConnectorInfoOverlay({ visible }: ConnectorInfoOverlayProps) {
interface ReprovisionWarningOverlayProps {
visible: boolean;
onUpdate?: () => void;
isUpdating?: boolean;
}
export function ReprovisionWarningOverlay({
visible,
onUpdate,
isUpdating,
}: ReprovisionWarningOverlayProps) {
return (
<div
@@ -48,6 +52,8 @@ export function ReprovisionWarningOverlay({
text="Click Update to apply your changes"
description="Your sandbox will be recreated with your new settings. Previously running sessions will not be affected by your changes."
close={false}
actions={isUpdating ? "Updating..." : "Update"}
onAction={isUpdating ? undefined : onUpdate}
/>
</div>
);

View File

@@ -105,13 +105,21 @@ export default function ConnectorCard({
const sourceMetadata = getSourceMetadata(connectorType);
const status: ConnectorStatus = config?.status || "not_connected";
const isConnected = status !== "not_connected" && status !== "deleting";
const isDeleting = status === "deleting";
// Check if this connector type is always available (doesn't need connection setup)
const isAlwaysConnected = sourceMetadata.alwaysConnected ?? false;
const customDescription = sourceMetadata.customDescription;
const handleCardClick = () => {
if (isDeleting) {
return; // No action while deleting
}
// Always-connected connectors always go to onConfigure
if (isAlwaysConnected) {
onConfigure();
return;
}
if (isConnected) {
setPopoverOpen(true);
} else {
@@ -119,7 +127,11 @@ export default function ConnectorCard({
}
};
const rightContent = isDeleting ? null : isConnected ? (
// Always-connected connectors show a settings icon
// Regular connectors show popover menu when connected, plug icon when not
const rightContent = isDeleting ? null : isAlwaysConnected ? (
<IconButton icon={SvgSettings} internal />
) : isConnected ? (
<Popover open={popoverOpen} onOpenChange={setPopoverOpen}>
<Popover.Trigger asChild>
<IconButton
@@ -164,21 +176,32 @@ export default function ConnectorCard({
<IconButton icon={SvgPlug} internal />
);
// Always-connected connectors show as "primary" variant
const cardVariant =
isAlwaysConnected || isConnected ? "primary" : "secondary";
// Use custom description if provided, otherwise show status
const descriptionContent = customDescription ? (
<Text secondaryBody text03>
{customDescription}
</Text>
) : (
<StatusDescription
status={status}
docsIndexed={config?.docs_indexed || 0}
/>
);
return (
<div
className={cn(!isDeleting && "cursor-pointer")}
onClick={handleCardClick}
>
<Card variant={isConnected ? "primary" : "secondary"}>
<Card variant={cardVariant}>
<LineItemLayout
icon={sourceMetadata.icon}
title={sourceMetadata.displayName}
description={
<StatusDescription
status={status}
docsIndexed={config?.docs_indexed || 0}
/>
}
description={descriptionContent}
rightChildren={rightContent}
center
/>

View File

@@ -0,0 +1,529 @@
"use client";
import { useState, useCallback, useRef, useMemo } from "react";
import useSWR from "swr";
import {
fetchLibraryTree,
uploadLibraryFiles,
uploadLibraryZip,
createLibraryDirectory,
toggleLibraryFileSync,
deleteLibraryFile,
} from "@/app/craft/services/apiServices";
import { LibraryEntry } from "@/app/craft/types/user-library";
import { cn } from "@/lib/utils";
import Text from "@/refresh-components/texts/Text";
import Button from "@/refresh-components/buttons/Button";
import Modal from "@/refresh-components/Modal";
import { Section } from "@/layouts/general-layouts";
import {
SvgFolder,
SvgFolderOpen,
SvgChevronRight,
SvgUploadCloud,
SvgPlus,
SvgTrash,
SvgFileText,
} from "@opal/icons";
import Switch from "@/refresh-components/inputs/Switch";
import InputTypeIn from "@/refresh-components/inputs/InputTypeIn";
import SimpleTooltip from "@/refresh-components/SimpleTooltip";
import { ConfirmEntityModal } from "@/components/modals/ConfirmEntityModal";
/**
 * Build a hierarchical tree from a flat list of library entries.
 * Entries have paths like "user_library/test" or "user_library/test/file.pdf";
 * parent/child links are derived purely from each entry's path.
 */
function buildTreeFromFlatList(flatList: LibraryEntry[]): LibraryEntry[] {
  // Clone every entry with an empty children array, keyed by its full path.
  const byPath = new Map<string, LibraryEntry>(
    flatList.map((entry) => [entry.path, { ...entry, children: [] }])
  );

  const roots: LibraryEntry[] = [];
  for (const entry of flatList) {
    const node = byPath.get(entry.path)!;
    // The parent's path is the entry's path minus its final segment.
    const parentPath = entry.path.split("/").slice(0, -1).join("/");
    const parent = byPath.get(parentPath);
    if (parent?.children) {
      parent.children.push(node);
    } else {
      // No known parent — treat as a root-level entry.
      roots.push(node);
    }
  }
  return roots;
}
/** Props for the user-library management modal. */
interface UserLibraryModalProps {
  open: boolean; // whether the modal is shown (also gates tree fetching)
  onClose: () => void; // invoked when the modal should be dismissed
  onChanges?: () => void; // Called when files are uploaded, deleted, or sync toggled
}
/**
 * Modal for managing the user's uploaded file library ("Your Files").
 *
 * Supports:
 * - Uploading files to any folder. A single .zip upload goes through the
 *   dedicated zip endpoint; any other selection is sent as individual files.
 * - Creating root-level folders.
 * - Toggling per-entry sandbox sync and deleting entries (with confirmation).
 *
 * The library tree is fetched with SWR only while the modal is open, and
 * `onChanges` notifies the parent whenever library contents are mutated.
 */
export default function UserLibraryModal({
  open,
  onClose,
  onChanges,
}: UserLibraryModalProps) {
  // Folder paths currently expanded in the tree view.
  const [expandedPaths, setExpandedPaths] = useState<Set<string>>(new Set());
  const [isUploading, setIsUploading] = useState(false);
  const [uploadError, setUploadError] = useState<string | null>(null);
  // Entry awaiting delete confirmation; non-null renders the confirm modal.
  const [entryToDelete, setEntryToDelete] = useState<LibraryEntry | null>(null);
  const [showNewFolderModal, setShowNewFolderModal] = useState(false);
  const [newFolderName, setNewFolderName] = useState("");
  const fileInputRef = useRef<HTMLInputElement>(null);
  // Destination folder for the next upload. Set just before the hidden file
  // input is clicked; reset to root ("/") after each upload attempt.
  const uploadTargetPathRef = useRef<string>("/");
  // Fetch library tree (SWR key is null while closed, which disables fetching)
  const {
    data: tree,
    error,
    isLoading,
    mutate,
  } = useSWR(open ? "/api/build/user-library/tree" : null, fetchLibraryTree, {
    revalidateOnFocus: false,
  });
  // Build hierarchical tree from flat list
  const hierarchicalTree = useMemo(() => {
    if (!tree) return [];
    return buildTreeFromFlatList(tree);
  }, [tree]);
  // Expand/collapse a folder path in the tree view.
  const toggleFolder = useCallback((path: string) => {
    setExpandedPaths((prev) => {
      const newSet = new Set(prev);
      if (newSet.has(path)) {
        newSet.delete(path);
      } else {
        newSet.add(path);
      }
      return newSet;
    });
  }, []);
  // Handle selection from the hidden <input type="file">: upload either a
  // single zip or a batch of files to the folder in uploadTargetPathRef.
  const handleFileUpload = useCallback(
    async (event: React.ChangeEvent<HTMLInputElement>) => {
      const files = event.target.files;
      if (!files || files.length === 0) return;
      setIsUploading(true);
      setUploadError(null);
      const targetPath = uploadTargetPathRef.current;
      try {
        const fileArray = Array.from(files);
        // Check if it's a single zip file
        const firstFile = fileArray[0];
        if (
          fileArray.length === 1 &&
          firstFile &&
          firstFile.name.endsWith(".zip")
        ) {
          await uploadLibraryZip(targetPath, firstFile);
        } else {
          await uploadLibraryFiles(targetPath, fileArray);
        }
        mutate();
        onChanges?.(); // Notify parent that changes were made
      } catch (err) {
        setUploadError(err instanceof Error ? err.message : "Upload failed");
      } finally {
        setIsUploading(false);
        uploadTargetPathRef.current = "/";
        // Reset input
        event.target.value = "";
      }
    },
    [mutate, onChanges]
  );
  // Remember the destination folder, then open the OS file picker.
  const handleUploadToFolder = useCallback((folderPath: string) => {
    uploadTargetPathRef.current = folderPath;
    fileInputRef.current?.click();
  }, []);
  // Enable/disable sandbox sync for a single entry.
  const handleToggleSync = useCallback(
    async (entry: LibraryEntry, enabled: boolean) => {
      try {
        await toggleLibraryFileSync(entry.id, enabled);
        mutate();
        onChanges?.(); // Notify parent that changes were made
      } catch (err) {
        console.error("Failed to toggle sync:", err);
      }
    },
    [mutate, onChanges]
  );
  // Delete the entry pending confirmation, then dismiss the confirm modal.
  const handleDeleteConfirm = useCallback(async () => {
    if (!entryToDelete) return;
    try {
      await deleteLibraryFile(entryToDelete.id);
      mutate();
      onChanges?.(); // Notify parent that changes were made
    } catch (err) {
      console.error("Failed to delete:", err);
    } finally {
      setEntryToDelete(null);
    }
  }, [entryToDelete, mutate, onChanges]);
  // Create a new root-level folder from the "New Folder" modal input.
  // NOTE(review): unlike upload/delete/toggle above, this does not call
  // onChanges?.() — confirm whether folder creation should also count as a
  // library change.
  const handleCreateDirectory = useCallback(async () => {
    const name = newFolderName.trim();
    if (!name) return;
    try {
      await createLibraryDirectory({ name, parent_path: "/" });
      mutate();
    } catch (err) {
      console.error("Failed to create directory:", err);
      setUploadError(
        err instanceof Error ? err.message : "Failed to create folder"
      );
    } finally {
      setShowNewFolderModal(false);
      setNewFolderName("");
    }
  }, [mutate, newFolderName]);
  // Human-readable size (B / KB / MB); empty string when size is null
  // (directories have no size).
  const formatFileSize = (bytes: number | null): string => {
    if (bytes === null) return "";
    if (bytes < 1024) return `${bytes} B`;
    if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
    return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
  };
  // Count of root-level entries (files AND folders); only used for the
  // empty-state check below.
  const fileCount = hierarchicalTree.length;
  return (
    <>
      <Modal open={open} onOpenChange={(isOpen) => !isOpen && onClose()}>
        <Modal.Content width="md" height="fit">
          <Modal.Header
            icon={SvgFileText}
            title="Your Files"
            description="Upload files for your agent to read (Excel, Word, PowerPoint, etc.)"
            onClose={onClose}
          />
          <Modal.Body>
            <Section flexDirection="column" gap={1} alignItems="stretch">
              {/* Action buttons */}
              <Section flexDirection="row" justifyContent="end" gap={0.5}>
                <Button
                  secondary
                  onClick={() => setShowNewFolderModal(true)}
                  leftIcon={SvgPlus}
                >
                  New Folder
                </Button>
                {/* Hidden input driven by handleUploadToFolder() */}
                <input
                  ref={fileInputRef}
                  type="file"
                  multiple
                  className="hidden"
                  onChange={handleFileUpload}
                  disabled={isUploading}
                  accept=".xlsx,.xls,.docx,.doc,.pptx,.ppt,.csv,.json,.txt,.pdf,.zip"
                />
                <Button
                  secondary
                  onClick={() => handleUploadToFolder("/")}
                  leftIcon={SvgUploadCloud}
                  disabled={isUploading}
                >
                  {isUploading ? "Uploading..." : "Upload"}
                </Button>
              </Section>
              {/* Upload error */}
              {uploadError && (
                <div className="p-2 bg-status-error-01 rounded-08">
                  <Text secondaryBody>{uploadError}</Text>
                </div>
              )}
              {/* File list */}
              <div className="w-full border border-border-01 rounded-08 min-h-[200px] max-h-[400px] overflow-auto">
                {isLoading ? (
                  <div className="flex flex-col items-center justify-center p-8">
                    <Text secondaryBody text03>
                      Loading files...
                    </Text>
                  </div>
                ) : error ? (
                  <div className="flex flex-col items-center justify-center p-8">
                    <Text secondaryBody text03>
                      Failed to load files
                    </Text>
                  </div>
                ) : fileCount === 0 ? (
                  <div className="flex flex-col items-center justify-center p-8">
                    <SvgFileText size={32} className="mb-3 stroke-text-02" />
                    <Text secondaryBody text03>
                      No files uploaded yet
                    </Text>
                    <Text secondaryBody text02 className="mt-1 text-center">
                      Upload Excel, Word, PowerPoint, or other files for your
                      agent to work with
                    </Text>
                  </div>
                ) : (
                  <div className="w-full p-2">
                    <LibraryTreeView
                      entries={hierarchicalTree}
                      expandedPaths={expandedPaths}
                      onToggleFolder={toggleFolder}
                      onToggleSync={handleToggleSync}
                      onDelete={setEntryToDelete}
                      onUploadToFolder={handleUploadToFolder}
                      formatFileSize={formatFileSize}
                    />
                  </div>
                )}
              </div>
            </Section>
          </Modal.Body>
          <Modal.Footer>
            <Button onClick={onClose}>Done</Button>
          </Modal.Footer>
        </Modal.Content>
      </Modal>
      {/* Delete confirmation modal */}
      {entryToDelete && (
        <ConfirmEntityModal
          danger
          entityType={entryToDelete.is_directory ? "folder" : "file"}
          entityName={entryToDelete.name}
          action="delete"
          actionButtonText="Delete"
          additionalDetails={
            entryToDelete.is_directory
              ? "This will delete the folder and all its contents."
              : "This file will be removed from your library."
          }
          onClose={() => setEntryToDelete(null)}
          onSubmit={handleDeleteConfirm}
        />
      )}
      {/* New folder modal */}
      <Modal
        open={showNewFolderModal}
        onOpenChange={(isOpen) => {
          if (!isOpen) {
            setShowNewFolderModal(false);
            setNewFolderName("");
          }
        }}
      >
        <Modal.Content width="sm" height="fit">
          <Modal.Header
            icon={SvgFolder}
            title="New Folder"
            onClose={() => {
              setShowNewFolderModal(false);
              setNewFolderName("");
            }}
          />
          <Modal.Body>
            <Section flexDirection="column" gap={0.5} alignItems="stretch">
              <Text secondaryBody text03>
                Folder name
              </Text>
              <InputTypeIn
                value={newFolderName}
                onChange={(e) => setNewFolderName(e.target.value)}
                placeholder="Enter folder name"
                onKeyDown={(e) => {
                  if (e.key === "Enter" && newFolderName.trim()) {
                    handleCreateDirectory();
                  }
                }}
                autoFocus
              />
            </Section>
          </Modal.Body>
          <Modal.Footer>
            <Button
              secondary
              onClick={() => {
                setShowNewFolderModal(false);
                setNewFolderName("");
              }}
            >
              Cancel
            </Button>
            <Button
              onClick={handleCreateDirectory}
              disabled={!newFolderName.trim()}
            >
              Create
            </Button>
          </Modal.Footer>
        </Modal.Content>
      </Modal>
    </>
  );
}
/** Props for {@link LibraryTreeView} (recursive tree renderer). */
interface LibraryTreeViewProps {
  // Entries to render at this level of the tree.
  entries: LibraryEntry[];
  // Folder paths that are currently expanded.
  expandedPaths: Set<string>;
  // Expand/collapse a folder by path.
  onToggleFolder: (path: string) => void;
  // Enable/disable sandbox sync for an entry.
  onToggleSync: (entry: LibraryEntry, enabled: boolean) => void;
  // Request deletion of an entry (opens the confirm modal upstream).
  onDelete: (entry: LibraryEntry) => void;
  // Open the file picker targeting the given folder path.
  onUploadToFolder: (folderPath: string) => void;
  // Formats a byte count for display; "" for null sizes.
  formatFileSize: (bytes: number | null) => string;
  // Nesting depth, used for indentation; defaults to 0 at the root.
  depth?: number;
}
/**
 * Recursive tree renderer for library entries.
 *
 * Each level is sorted (directories first, then alphabetical by name) and
 * rendered as a row with expand/collapse control, icon, name, size,
 * hover-only actions (upload-to-folder for directories, delete), and an
 * always-visible sync toggle. Expanded directories render their children
 * at depth + 1.
 */
function LibraryTreeView({
  entries,
  expandedPaths,
  onToggleFolder,
  onToggleSync,
  onDelete,
  onUploadToFolder,
  formatFileSize,
  depth = 0,
}: LibraryTreeViewProps) {
  // Sort entries: directories first, then alphabetically
  // (copy with spread so the caller's array is not mutated)
  const sortedEntries = [...entries].sort((a, b) => {
    if (a.is_directory && !b.is_directory) return -1;
    if (!a.is_directory && b.is_directory) return 1;
    return a.name.localeCompare(b.name);
  });
  return (
    <>
      {sortedEntries.map((entry) => {
        const isExpanded = expandedPaths.has(entry.path);
        return (
          <div key={entry.id}>
            <div
              className={cn(
                "flex items-center py-2 px-3 hover:bg-background-tint-02 rounded-08 transition-colors",
                "group"
              )}
              style={{ paddingLeft: `${depth * 20 + 12}px` }}
            >
              {/* Expand/collapse for directories */}
              {entry.is_directory ? (
                <button
                  onClick={() => onToggleFolder(entry.path)}
                  className="p-0.5 rounded hover:bg-background-tint-03"
                >
                  <SvgChevronRight
                    size={14}
                    className={cn(
                      "stroke-text-03 transition-transform",
                      isExpanded && "rotate-90"
                    )}
                  />
                </button>
              ) : (
                <span className="w-5" />
              )}
              {/* Icon */}
              {entry.is_directory ? (
                isExpanded ? (
                  <SvgFolderOpen size={16} className="stroke-text-03 mx-1.5" />
                ) : (
                  <SvgFolder size={16} className="stroke-text-03 mx-1.5" />
                )
              ) : (
                <SvgFileText size={16} className="stroke-text-03 mx-1.5" />
              )}
              {/* Name */}
              <Text secondaryBody text04 className="flex-1 truncate">
                {entry.name}
              </Text>
              {/* File size */}
              {!entry.is_directory && entry.file_size !== null && (
                <Text secondaryBody text02 className="mx-2 flex-shrink-0">
                  {formatFileSize(entry.file_size)}
                </Text>
              )}
              {/* Actions - show on hover */}
              <div className="flex items-center gap-2 opacity-0 group-hover:opacity-100 transition-opacity">
                {/* Upload to folder button - only for directories */}
                {entry.is_directory && (
                  <SimpleTooltip tooltip="Upload to this folder">
                    <button
                      onClick={(e) => {
                        e.stopPropagation();
                        {/* Strip "user_library" prefix from path for upload API */}
                        const uploadPath =
                          entry.path.replace(/^user_library/, "") || "/";
                        onUploadToFolder(uploadPath);
                      }}
                      className="p-1 rounded hover:bg-background-tint-03 transition-colors"
                    >
                      <SvgUploadCloud size={14} className="stroke-text-03" />
                    </button>
                  </SimpleTooltip>
                )}
                <button
                  onClick={() => onDelete(entry)}
                  className="p-1 rounded hover:bg-status-error-01 transition-colors"
                >
                  <SvgTrash size={14} className="stroke-text-03" />
                </button>
              </div>
              {/* Sync toggle - always visible, rightmost */}
              <SimpleTooltip
                tooltip={
                  entry.sync_enabled
                    ? "Synced to sandbox - click to disable"
                    : "Not synced - click to enable"
                }
              >
                <Switch
                  checked={entry.sync_enabled}
                  onCheckedChange={(checked) => onToggleSync(entry, checked)}
                />
              </SimpleTooltip>
            </div>
            {/* Children */}
            {entry.is_directory && isExpanded && entry.children && (
              <LibraryTreeView
                entries={entry.children}
                expandedPaths={expandedPaths}
                onToggleFolder={onToggleFolder}
                onToggleSync={onToggleSync}
                onDelete={onDelete}
                onUploadToFolder={onUploadToFolder}
                formatFileSize={formatFileSize}
                depth={depth + 1}
              />
            )}
          </div>
        );
      })}
    </>
  );
}

View File

@@ -14,8 +14,12 @@ import { useBuildConnectors } from "@/app/craft/hooks/useBuildConnectors";
import { BuildLLMPopover } from "@/app/craft/components/BuildLLMPopover";
import Text from "@/refresh-components/texts/Text";
import Card from "@/refresh-components/cards/Card";
import { SvgPlug, SvgSettings, SvgChevronDown } from "@opal/icons";
import { FiInfo } from "react-icons/fi";
import {
SvgPlug,
SvgSettings,
SvgChevronDown,
SvgInfoSmall,
} from "@opal/icons";
import { ValidSources } from "@/lib/types";
import ConnectorCard, {
BuildConnectorConfig,
@@ -23,6 +27,7 @@ import ConnectorCard, {
import ConfigureConnectorModal from "@/app/craft/v1/configure/components/ConfigureConnectorModal";
import ComingSoonConnectors from "@/app/craft/v1/configure/components/ComingSoonConnectors";
import DemoDataConfirmModal from "@/app/craft/v1/configure/components/DemoDataConfirmModal";
import UserLibraryModal from "@/app/craft/v1/configure/components/UserLibraryModal";
import {
ConnectorInfoOverlay,
ReprovisionWarningOverlay,
@@ -63,6 +68,7 @@ const BUILD_CONNECTORS: ValidSources[] = [
ValidSources.Linear,
ValidSources.Fireflies,
ValidSources.Hubspot,
ValidSources.CraftFile, // User's uploaded files
];
interface SelectedConnectorState {
@@ -87,6 +93,7 @@ export default function BuildConfigPage() {
const [showNotAllowedModal, setShowNotAllowedModal] = useState(false);
const [showDemoDataConfirmModal, setShowDemoDataConfirmModal] =
useState(false);
const [showUserLibraryModal, setShowUserLibraryModal] = useState(false);
const [pendingDemoDataEnabled, setPendingDemoDataEnabled] = useState<
boolean | null
>(null);
@@ -95,6 +102,7 @@ export default function BuildConfigPage() {
const [pendingLlmSelection, setPendingLlmSelection] =
useState<BuildLlmSelection | null>(null);
const [pendingDemoData, setPendingDemoData] = useState<boolean | null>(null);
const [userLibraryChanged, setUserLibraryChanged] = useState(false);
const [isUpdating, setIsUpdating] = useState(false);
// Track original values (set on mount and after Update)
@@ -152,12 +160,13 @@ export default function BuildConfigPage() {
originalDemoData !== null &&
pendingDemoData !== originalDemoData;
return llmChanged || demoDataChanged;
return llmChanged || demoDataChanged || userLibraryChanged;
}, [
pendingLlmSelection,
pendingDemoData,
originalLlmSelection,
originalDemoData,
userLibraryChanged,
]);
// Compute display name for the pending LLM selection
@@ -208,9 +217,12 @@ export default function BuildConfigPage() {
}, [pendingDemoDataEnabled]);
// Restore changes - revert pending state to original values
// Note: User Library changes cannot be reverted (files already uploaded/deleted/toggled)
// so we just reset the flag - user needs to manually undo file changes if desired
const handleRestoreChanges = useCallback(() => {
setPendingLlmSelection(originalLlmSelection);
setPendingDemoData(originalDemoData);
setUserLibraryChanged(false);
}, [originalLlmSelection, originalDemoData]);
// Update - apply pending changes and re-provision sandbox
@@ -236,6 +248,9 @@ export default function BuildConfigPage() {
// 3. Start provisioning a new session with updated settings
ensurePreProvisionedSession();
// 4. Reset User Library change flag (sandbox now has the updated files)
setUserLibraryChanged(false);
} catch (error) {
console.error("Failed to update settings:", error);
} finally {
@@ -518,7 +533,10 @@ export default function BuildConfigPage() {
<div className="flex items-center gap-2">
<SimpleTooltip tooltip="The demo dataset contains 1000 files across various connectors">
<span className="inline-flex items-center cursor-help">
<FiInfo size={16} className="text-text-03" />
<SvgInfoSmall
size={16}
className="text-text-03"
/>
</span>
</SimpleTooltip>
<Text mainUiAction>Use Demo Dataset</Text>
@@ -541,24 +559,32 @@ export default function BuildConfigPage() {
</div>
</div>
<div className="w-full grid grid-cols-1 md:grid-cols-2 gap-2 pt-2">
{connectorStates.map(({ type, config }) => (
<ConnectorCard
key={type}
connectorType={type}
config={config}
onConfigure={() => {
// Only open modal for unconfigured connectors
if (!config) {
if (isBasicUser) {
setShowNotAllowedModal(true);
} else {
setSelectedConnector({ type, config });
{connectorStates.map(({ type, config }) => {
const metadata = getSourceMetadata(type);
return (
<ConnectorCard
key={type}
connectorType={type}
config={config}
onConfigure={() => {
// Connectors marked as alwaysConnected open their custom modal
if (metadata.alwaysConnected) {
setShowUserLibraryModal(true);
return;
}
}
}}
onDelete={() => config && setConnectorToDelete(config)}
/>
))}
// Only open modal for unconfigured connectors
if (!config) {
if (isBasicUser) {
setShowNotAllowedModal(true);
} else {
setSelectedConnector({ type, config });
}
}
}}
onDelete={() => config && setConnectorToDelete(config)}
/>
);
})}
</div>
<ComingSoonConnectors />
</Section>
@@ -567,7 +593,11 @@ export default function BuildConfigPage() {
{/* Sticky overlay for reprovision warning */}
<div className="sticky z-toast bottom-10 w-fit mx-auto">
<ReprovisionWarningOverlay visible={hasChanges && !isLoading} />
<ReprovisionWarningOverlay
visible={hasChanges && !isLoading}
onUpdate={handleUpdate}
isUpdating={isUpdating || isPreProvisioning}
/>
</div>
{/* Fixed overlay for connector info - centered on screen like the modal */}
@@ -615,6 +645,12 @@ export default function BuildConfigPage() {
pendingDemoDataEnabled={pendingDemoDataEnabled}
onConfirm={handleDemoDataConfirm}
/>
<UserLibraryModal
open={showUserLibraryModal}
onClose={() => setShowUserLibraryModal(false)}
onChanges={() => setUserLibraryChanged(true)}
/>
</SettingsLayouts.Root>
</div>
);

View File

@@ -459,6 +459,7 @@ export const credentialTemplates: Record<ValidSources, any> = {
google_sites: null,
file: null,
user_file: null,
craft_file: null, // User Library - managed through dedicated UI
wikipedia: null,
mediawiki: null,
web: null,

View File

@@ -142,6 +142,11 @@ export interface SourceMetadata {
uniqueKey?: string;
// For federated connectors, this stores the base source type for the icon
baseSourceType?: ValidSources;
// For connectors that are always available (don't need connection setup)
// e.g., User Library (CraftFile) where users just upload files
alwaysConnected?: boolean;
// Custom description to show instead of status (e.g., "Manage your uploaded files")
customDescription?: string;
}
export interface SearchDefaultOverrides {

View File

@@ -67,6 +67,11 @@ interface PartialSourceMetadata {
// federated connectors store the base source type if it's a source
// that has both indexed connectors and federated connectors
baseSourceType?: ValidSources;
// For connectors that are always available (don't need connection setup)
// e.g., User Library (CraftFile) where users just upload files
alwaysConnected?: boolean;
// Custom description to show instead of status (e.g., "Manage your uploaded files")
customDescription?: string;
}
type SourceMap = {
@@ -422,6 +427,16 @@ export const SOURCE_METADATA_MAP: SourceMap = {
category: SourceCategory.Other,
},
// Craft-specific sources
craft_file: {
icon: SvgFileText,
displayName: "Your Files",
category: SourceCategory.Other,
isPopular: false, // Hidden from standard Add Connector page
alwaysConnected: true, // No setup required, just upload files
customDescription: "Manage your uploaded files",
},
// Placeholder (non-null default)
not_applicable: {
icon: SvgGlobe,

View File

@@ -509,6 +509,9 @@ export enum ValidSources {
Bitbucket = "bitbucket",
TestRail = "testrail",
// Craft-specific sources
CraftFile = "craft_file",
// Federated Connectors
FederatedSlack = "federated_slack",
}
@@ -543,6 +546,7 @@ export type ConfigurableSources = Exclude<
| ValidSources.IngestionApi
| ValidSources.FederatedSlack // is part of ValidSources.Slack
| ValidSources.UserFile
| ValidSources.CraftFile // User Library - managed through dedicated UI
>;
export const oauthSupportedSources: ConfigurableSources[] = [

View File

@@ -331,7 +331,7 @@ function MessageInner(
{/* Actions */}
{actions && (
<div className="flex items-start justify-end shrink-0">
<div className="flex items-center justify-end shrink-0 self-center pr-2">
<Button
secondary
onClick={onAction}