Compare commits

...

1 Commits

Author SHA1 Message Date
Wenxi Onyx
6fca48d9cb refactor(craft): chad s5cmd > aws cli (mem overhead + speed) 2026-02-04 13:50:55 -08:00
4 changed files with 17 additions and 39 deletions

View File

@@ -269,7 +269,7 @@ class S3PersistentDocumentWriter:
s3://{bucket}/{tenant_id}/knowledge/{user_id}/{source}/{hierarchy}/document.json
This matches the location that KubernetesSandboxManager reads from when
provisioning sandboxes (via the init container's aws s3 sync command).
provisioning sandboxes (via the sidecar container's s5cmd sync command).
"""
def __init__(self, tenant_id: str, user_id: str):
@@ -338,7 +338,7 @@ class S3PersistentDocumentWriter:
{tenant_id}/knowledge/{user_id}/{source}/{hierarchy}/
This matches the path that KubernetesSandboxManager syncs from:
aws s3 sync "s3://{bucket}/{tenant_id}/knowledge/{user_id}/" /workspace/files/
s5cmd sync "s3://{bucket}/{tenant_id}/knowledge/{user_id}/*" /workspace/files/
"""
# Tenant and user segregation (matches K8s sandbox init container path)
parts = [self.tenant_id, "knowledge", self.user_id]

View File

@@ -417,7 +417,7 @@ class SandboxManager(ABC):
) -> bool:
"""Sync files from S3 to the sandbox's /workspace/files directory.
For Kubernetes backend: Executes `aws s3 sync` in the file-sync sidecar container.
For Kubernetes backend: Executes `s5cmd sync` in the file-sync sidecar container.
For Local backend: No-op since files are directly accessible via symlink.
This is idempotent - only downloads changed files.

View File

@@ -352,13 +352,8 @@ class KubernetesSandboxManager(SandboxManager):
# via kubectl exec after new documents are indexed
file_sync_container = client.V1Container(
name="file-sync",
image="amazon/aws-cli:latest",
env=_get_local_aws_credential_env_vars()
+ [
# Set HOME to a writable directory so AWS CLI can create .aws config dir
# Without this, AWS CLI tries to access /.aws which fails with permission denied
client.V1EnvVar(name="HOME", value="/tmp"),
],
image="peakcom/s5cmd:v2.3.0",
env=_get_local_aws_credential_env_vars(),
command=["/bin/sh", "-c"],
args=[
f"""
@@ -369,28 +364,12 @@ trap 'echo "Received SIGTERM, exiting"; exit 0' TERM
echo "Starting initial file sync for tenant: {tenant_id} / user: {user_id}"
echo "S3 source: s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/"
# Capture both stdout and stderr, track exit code
# aws s3 sync returns exit code 1 even on success if there are warnings
sync_exit_code=0
sync_stderr=$(mktemp)
aws s3 sync "s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/" /workspace/files/ 2>"$sync_stderr" || sync_exit_code=$?
# Always show stderr if there was any output (for debugging)
if [ -s "$sync_stderr" ]; then
echo "=== S3 sync stderr output ==="
cat "$sync_stderr"
echo "=== End stderr output ==="
fi
rm -f "$sync_stderr"
# Report outcome
echo "S3 sync finished with exit code: $sync_exit_code"
# Exit codes 0 and 1 are both considered success
# (exit code 1 = success with warnings, e.g., metadata/timestamp issues)
if [ $sync_exit_code -eq 0 ] || [ $sync_exit_code -eq 1 ]; then
# s5cmd sync: high-performance parallel S3 sync (default 256 workers)
# Note: s5cmd requires wildcard (*) at end of source path for sync
if s5cmd sync "s3://{self._s3_bucket}/{tenant_id}/knowledge/{user_id}/*" /workspace/files/; then
echo "Initial sync complete, staying alive for incremental syncs"
else
sync_exit_code=$?
echo "ERROR: Initial sync failed with exit code: $sync_exit_code"
exit $sync_exit_code
fi
@@ -1894,10 +1873,10 @@ echo '{tar_b64}' | base64 -d | tar -xzf -
) -> bool:
"""Sync files from S3 to the running pod via the file-sync sidecar.
Executes `aws s3 sync` in the file-sync sidecar container to download
Executes `s5cmd sync` in the file-sync sidecar container to download
any new or changed files from S3 to /workspace/files/.
This is safe to call multiple times - aws s3 sync is idempotent.
This is safe to call multiple times - s5cmd sync is idempotent.
Args:
sandbox_id: The sandbox UUID
@@ -1909,14 +1888,13 @@ echo '{tar_b64}' | base64 -d | tar -xzf -
"""
pod_name = self._get_pod_name(str(sandbox_id))
# Configure AWS CLI for higher concurrency (default is 10) then run sync
# max_concurrent_requests controls parallel S3 API calls for faster transfers
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/"
# s5cmd sync: high-performance parallel S3 sync (default 256 workers)
# Note: s5cmd requires wildcard (*) at end of source path for sync
s3_path = f"s3://{self._s3_bucket}/{tenant_id}/knowledge/{str(user_id)}/*"
sync_command = [
"/bin/sh",
"-c",
f"aws configure set default.s3.max_concurrent_requests 200 && "
f'aws s3 sync "{s3_path}" /workspace/files/',
f's5cmd sync "{s3_path}" /workspace/files/',
]
resp = k8s_stream(
self._stream_core_api.connect_get_namespaced_pod_exec,

View File

@@ -247,10 +247,10 @@ def sync_sandbox_files(self: Task, *, user_id: str, tenant_id: str) -> bool:
"""Sync files from S3 to a user's running sandbox.
This task is triggered after documents are written to S3 during indexing.
It executes `aws s3 sync` in the file-sync sidecar container to download
It executes `s5cmd sync` in the file-sync sidecar container to download
any new or changed files.
This is safe to call multiple times - aws s3 sync is idempotent.
This is safe to call multiple times - s5cmd sync is idempotent.
Args:
user_id: The user ID whose sandbox should be synced