Compare commits

...

2 Commits

Author SHA1 Message Date
Wenxi Onyx
6155589aa9 attempt 2 to solve hanging with keep alive ping 2026-02-01 11:34:19 -08:00
Wenxi
6740418808 fix(craft): attempt to solve hanging with explicit k8s_stream timeout (#8066) 2026-02-01 10:01:37 -08:00
2 changed files with 44 additions and 4 deletions

View File

@@ -89,7 +89,7 @@ SANDBOX_NAMESPACE = os.environ.get("SANDBOX_NAMESPACE", "onyx-sandboxes")
# Container image for sandbox pods
# Should include Next.js template and opencode CLI
SANDBOX_CONTAINER_IMAGE = os.environ.get(
"SANDBOX_CONTAINER_IMAGE", "onyxdotapp/sandbox:v0.1.0"
"SANDBOX_CONTAINER_IMAGE", "onyxdotapp/sandbox:v0.1.2"
)
# S3 bucket for sandbox file storage (snapshots, knowledge files, uploads)

View File

@@ -125,7 +125,9 @@ class ACPExecClient:
self._ws_client: WSClient | None = None
self._response_queue: Queue[dict[str, Any]] = Queue()
self._reader_thread: threading.Thread | None = None
self._keepalive_thread: threading.Thread | None = None
self._stop_reader = threading.Event()
self._write_lock = threading.Lock() # Protects write_stdin calls
self._k8s_client: client.CoreV1Api | None = None
def _get_k8s_client(self) -> client.CoreV1Api:
@@ -171,6 +173,7 @@ class ACPExecClient:
stderr=True,
tty=False,
_preload_content=False,
_request_timeout=900, # 15 minute timeout for long-running sessions
)
# Start reader thread
@@ -180,6 +183,12 @@ class ACPExecClient:
)
self._reader_thread.start()
# Start keepalive thread to prevent idle timeout
self._keepalive_thread = threading.Thread(
target=self._keepalive_loop, daemon=True
)
self._keepalive_thread.start()
# Give process a moment to start
time.sleep(0.5)
@@ -254,6 +263,30 @@ class ACPExecClient:
logger.debug(f"Reader error: {e}")
break
def _keepalive_loop(self) -> None:
"""Send periodic keepalive to prevent idle timeout on the k8s websocket."""
packet_logger = get_packet_logger()
while not self._stop_reader.is_set():
# Wait 30 seconds between keepalives
if self._stop_reader.wait(timeout=30.0):
break # Stop event was set
if self._ws_client is not None and self._ws_client.is_open():
try:
# Send empty line as keepalive - opencode ignores empty lines
with self._write_lock:
self._ws_client.write_stdin("\n")
packet_logger.log_raw(
"K8S-KEEPALIVE-SENT",
{"pod": self._pod_name, "namespace": self._namespace},
)
except Exception as e:
packet_logger.log_raw(
"K8S-KEEPALIVE-ERROR",
{"error": str(e), "pod": self._pod_name},
)
break
def stop(self) -> None:
"""Stop the exec session and clean up."""
self._stop_reader.set()
@@ -269,6 +302,10 @@ class ACPExecClient:
self._reader_thread.join(timeout=2.0)
self._reader_thread = None
if self._keepalive_thread is not None:
self._keepalive_thread.join(timeout=2.0)
self._keepalive_thread = None
self._state = ACPClientState()
def _get_next_id(self) -> int:
@@ -296,7 +333,8 @@ class ACPExecClient:
packet_logger.log_jsonrpc_request(method, request_id, params, context="k8s")
message = json.dumps(request) + "\n"
self._ws_client.write_stdin(message)
with self._write_lock:
self._ws_client.write_stdin(message)
return request_id
@@ -319,7 +357,8 @@ class ACPExecClient:
packet_logger.log_jsonrpc_request(method, None, params, context="k8s")
message = json.dumps(notification) + "\n"
self._ws_client.write_stdin(message)
with self._write_lock:
self._ws_client.write_stdin(message)
def _wait_for_response(
self, request_id: int, timeout: float = 30.0
@@ -637,7 +676,8 @@ class ACPExecClient:
"error": {"code": code, "message": message},
}
self._ws_client.write_stdin(json.dumps(response) + "\n")
with self._write_lock:
self._ws_client.write_stdin(json.dumps(response) + "\n")
def cancel(self) -> None:
"""Cancel the current operation."""