Compare commits

...

3 Commits

Author SHA1 Message Date
rohoswagger
c1621e004f fix(craft): preserve assistant message on stop generation
Instead of hard-aborting the HTTP connection when the user stops
generation, only send the cancel signal to the ACP agent and let
the stream finish gracefully. This allows the backend to save the
partial message to the DB and the frontend to consolidate it into
session state via the normal prompt_response handler.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 17:38:08 -08:00
rohoswagger
5bb8f16d6c fix(craft): show grayed-out send button instead of stop during sandbox restore
During sandbox restoration, the session status is "creating" which
caused useIsRunning to return true, incorrectly showing the stop button.
Added useIsStreaming hook (only true for status "running") to control
the stop button, and included sandbox restoring state in sandboxNotReady.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 16:42:00 -08:00
rohoswagger
21aa89badc feat(craft): stop generation 2026-02-26 15:03:34 -08:00
12 changed files with 359 additions and 44 deletions

View File

@@ -124,3 +124,26 @@ def send_message(
"X-Accel-Buffering": "no", # Disable nginx buffering
},
)
@router.post("/sessions/{session_id}/cancel-message", tags=PUBLIC_API_TAGS)
def cancel_message(
session_id: UUID,
user: User = Depends(current_user),
db_session: Session = Depends(get_session),
) -> dict[str, bool]:
"""
Cancel the current message/prompt operation for a session.
Sends a session/cancel notification to the ACP agent to stop the
currently running operation. This follows the ACP protocol specification
for cancellation.
Returns:
{"cancelled": true} if cancel was sent successfully
{"cancelled": false} if no active operation to cancel
"""
session_manager = SessionManager(db_session)
cancelled = session_manager.cancel_message(session_id, user.id)
return {"cancelled": cancelled}

View File

@@ -491,6 +491,27 @@ class SandboxManager(ABC):
nextjs_port: The port the Next.js server should be listening on
"""
@abstractmethod
def cancel_message(
self,
sandbox_id: UUID,
session_id: UUID,
) -> bool:
"""Cancel the current message/prompt operation for a session.
Sends a session/cancel notification to the ACP agent to stop the
currently running operation. This is a non-blocking operation that
signals the agent to stop but does not wait for confirmation.
Args:
sandbox_id: The sandbox ID
session_id: The session ID whose operation should be cancelled
Returns:
True if cancel was sent, False if no active session/client found
"""
...
# Singleton instance cache for the factory
_sandbox_manager_instance: SandboxManager | None = None

View File

@@ -358,6 +358,15 @@ class KubernetesSandboxManager(SandboxManager):
self._agent_instructions_template_path = build_dir / "AGENTS.template.md"
self._skills_path = Path(__file__).parent / "docker" / "skills"
# Track active ephemeral ACP clients for cancellation support.
# Only populated during an active send_message() call — entries are
# added at the start of send_message() and removed in its finally block.
# Keyed by (sandbox_id, session_id) tuple.
# Lock guards dict access and client lifecycle between send_message()
# (streaming thread) and cancel_message() (cancel request thread).
self._active_acp_clients: dict[tuple[UUID, UUID], ACPExecClient] = {}
self._active_acp_clients_lock = threading.Lock()
logger.info(
f"KubernetesSandboxManager initialized: "
f"namespace={self._namespace}, image={self._image}"
@@ -1405,8 +1414,8 @@ echo "Session workspace setup complete"
) -> None:
"""Clean up a session workspace (on session delete).
Removes the ACP session mapping and executes kubectl exec to remove
the session directory. The shared ACP client persists for other sessions.
Executes kubectl exec to remove the session directory. ACP clients are
ephemeral (created per message), so there's nothing to stop here.
Args:
sandbox_id: The sandbox ID
@@ -1889,6 +1898,12 @@ echo "Session config regeneration complete"
# Create an ephemeral ACP client for this message
acp_client = self._create_ephemeral_acp_client(sandbox_id, session_path)
# Register as the active client for this session so cancel_message()
# can find and cancel it from another thread.
client_key = (sandbox_id, session_id)
with self._active_acp_clients_lock:
self._active_acp_clients[client_key] = acp_client
try:
# Resume (or create) the ACP session from opencode's on-disk storage
acp_session_id = acp_client.resume_or_create_session(cwd=session_path)
@@ -1972,8 +1987,9 @@ echo "Session config regeneration complete"
)
raise
finally:
# Always stop the ephemeral ACP client to kill the opencode process.
# This ensures no stale processes linger in the sandbox container.
# Always deregister and stop the ephemeral ACP client.
with self._active_acp_clients_lock:
self._active_acp_clients.pop(client_key, None)
try:
acp_client.stop()
except Exception as e:
@@ -2714,3 +2730,43 @@ fi
except ApiException as e:
logger.warning(f"Failed to get upload stats: {e}")
return 0, 0
def cancel_message(
self,
sandbox_id: UUID,
session_id: UUID,
) -> bool:
"""Cancel the current message/prompt operation for a session.
Sends a session/cancel notification to the ACP agent to stop the
currently running operation.
Args:
sandbox_id: The sandbox ID
session_id: The session ID whose operation should be cancelled
Returns:
True if cancel was sent, False if no active session/client found
"""
client_key = (sandbox_id, session_id)
with self._active_acp_clients_lock:
exec_client = self._active_acp_clients.get(client_key)
if exec_client is None or not exec_client.is_running:
logger.debug(
f"No active ACP client for sandbox {sandbox_id}, session {session_id}"
)
return False
try:
exec_client.cancel()
logger.info(
f"Sent cancel notification for sandbox {sandbox_id}, session {session_id}"
)
return True
except Exception as e:
logger.warning(
f"Failed to cancel operation for sandbox {sandbox_id}, "
f"session {session_id}: {e}"
)
return False

View File

@@ -1047,23 +1047,35 @@ class LocalSandboxManager(SandboxManager):
if not self._is_path_allowed(session_path, target_path):
raise ValueError("Path traversal not allowed")
# If directory doesn't exist, return empty list (no files yet)
if not target_path.exists():
return []
if not target_path.is_dir():
raise ValueError(f"Not a directory: {path}")
entries = []
for item in target_path.iterdir():
stat = item.stat()
is_file = item.is_file()
mime_type = mimetypes.guess_type(str(item))[0] if is_file else None
entries.append(
FilesystemEntry(
name=item.name,
path=str(item.relative_to(session_path)),
is_directory=item.is_dir(),
size=stat.st_size if is_file else None,
mime_type=mime_type,
)
)
try:
for item in target_path.iterdir():
try:
stat = item.stat()
is_file = item.is_file()
mime_type = mimetypes.guess_type(str(item))[0] if is_file else None
entries.append(
FilesystemEntry(
name=item.name,
path=str(item.relative_to(session_path)),
is_directory=item.is_dir(),
size=stat.st_size if is_file else None,
mime_type=mime_type,
)
)
except (FileNotFoundError, OSError):
# Skip files that were deleted during iteration (race condition)
continue
except FileNotFoundError:
# Directory was deleted during iteration
return []
return sorted(entries, key=lambda e: (not e.is_directory, e.name.lower()))
@@ -1411,3 +1423,42 @@ class LocalSandboxManager(SandboxManager):
f"sync_files called for local sandbox {sandbox_id}{source_info} - no-op"
)
return True
def cancel_message(
self,
sandbox_id: UUID,
session_id: UUID,
) -> bool:
"""Cancel the current message/prompt operation for a session.
Sends a session/cancel notification to the ACP agent to stop the
currently running operation.
Args:
sandbox_id: The sandbox ID
session_id: The session ID whose operation should be cancelled
Returns:
True if cancel was sent, False if no active session/client found
"""
client_key = (sandbox_id, session_id)
client = self._acp_clients.get(client_key)
if client is None or not client.is_running:
logger.debug(
f"No active ACP client for sandbox {sandbox_id}, session {session_id}"
)
return False
try:
client.cancel()
logger.info(
f"Sent cancel notification for sandbox {sandbox_id}, session {session_id}"
)
return True
except Exception as e:
logger.warning(
f"Failed to cancel operation for sandbox {sandbox_id}, "
f"session {session_id}: {e}"
)
return False

View File

@@ -1550,6 +1550,46 @@ class SessionManager:
return "error"
return "unknown"
def cancel_message(
self,
session_id: UUID,
user_id: UUID,
) -> bool:
"""Cancel the current message/prompt operation for a session.
Sends a session/cancel notification to the ACP agent to stop the
currently running operation. This follows the ACP protocol:
https://agentclientprotocol.com
Args:
session_id: The session UUID
user_id: The user ID (for ownership verification)
Returns:
True if cancel was sent, False if session/sandbox not found or cancel failed
"""
# Verify session ownership
session = get_build_session(session_id, user_id, self._db_session)
if session is None:
logger.warning(f"Cancel request for non-existent session {session_id}")
return False
# Get the user's sandbox
sandbox = get_sandbox_by_user_id(self._db_session, user_id)
if sandbox is None:
logger.warning(f"Cancel request but no sandbox for user {user_id}")
return False
# Send cancel to the sandbox manager
result = self._sandbox_manager.cancel_message(sandbox.id, session_id)
if result:
logger.info(f"Cancelled message operation for session {session_id}")
else:
logger.debug(f"No active operation to cancel for session {session_id}")
return result
# =========================================================================
# Artifact Operations
# =========================================================================

View File

@@ -2,6 +2,7 @@
import { useRef, useEffect } from "react";
import Logo from "@/refresh-components/Logo";
import Text from "@/refresh-components/texts/Text";
import TextChunk from "@/app/craft/components/TextChunk";
import ThinkingCard from "@/app/craft/components/ThinkingCard";
import ToolCallPill from "@/app/craft/components/ToolCallPill";
@@ -68,6 +69,8 @@ interface BuildMessageListProps {
messages: BuildMessage[];
streamItems: StreamItem[];
isStreaming?: boolean;
/** Whether the user cancelled the current generation */
userCancelled?: boolean;
/** Whether auto-scroll is enabled (user is at bottom) */
autoScrollEnabled?: boolean;
/** Ref to the end marker div for scroll detection */
@@ -85,6 +88,7 @@ export default function BuildMessageList({
messages,
streamItems,
isStreaming = false,
userCancelled = false,
autoScrollEnabled = true,
messagesEndRef: externalMessagesEndRef,
}: BuildMessageListProps) {
@@ -104,8 +108,11 @@ export default function BuildMessageList({
const lastMessage = messages[messages.length - 1];
const lastMessageIsUser = lastMessage?.type === "user";
// Show streaming area if we have stream items OR if we're waiting for a response to the latest user message
// Also show if user cancelled (to display the cancellation message)
const showStreamingArea =
hasStreamItems || (isStreaming && lastMessageIsUser);
hasStreamItems ||
(isStreaming && lastMessageIsUser) ||
(userCancelled && lastMessageIsUser);
// Check for active tools (for "Working..." state)
const hasActiveTools = streamItems.some(
@@ -206,16 +213,28 @@ export default function BuildMessageList({
</div>
<div className="flex-1 flex flex-col gap-3 min-w-0">
{!hasStreamItems ? (
// Loading state - no content yet, show blinking dot like main chat
<BlinkingDot />
// Loading state or cancelled - show blinking dot or cancelled message
userCancelled ? (
<Text as="p" secondaryBody text04>
User has stopped generation
</Text>
) : (
<BlinkingDot />
)
) : (
<>
{/* Render stream items in FIFO order */}
{renderStreamItems(streamItems, true)}
{/* Streaming indicator when actively streaming text */}
{isStreaming && hasStreamItems && !hasActiveTools && (
<BlinkingDot />
{/* Show cancelled message or streaming indicator */}
{userCancelled ? (
<Text as="p" secondaryBody text04>
User has stopped generation
</Text>
) : (
isStreaming &&
hasStreamItems &&
!hasActiveTools && <BlinkingDot />
)}
</>
)}

View File

@@ -8,6 +8,7 @@ import {
useSessionId,
useHasSession,
useIsRunning,
useIsStreaming,
useOutputPanelOpen,
useToggleOutputPanel,
useBuildSessionStore,
@@ -67,6 +68,7 @@ export default function BuildChatPanel({
const sessionId = useSessionId();
const hasSession = useHasSession();
const isRunning = useIsRunning();
const isStreaming = useIsStreaming();
const { setLeftSidebarFolded, leftSidebarFolded } = useBuildContext();
const { isMobile } = useScreenSize();
const toggleOutputPanel = useToggleOutputPanel();
@@ -112,13 +114,16 @@ export default function BuildChatPanel({
const nameBuildSession = useBuildSessionStore(
(state) => state.nameBuildSession
);
const { streamMessage } = useBuildStreaming();
const { streamMessage, abortStream } = useBuildStreaming();
const isPreProvisioning = useIsPreProvisioning();
const isPreProvisioningFailed = useIsPreProvisioningFailed();
const preProvisionedSessionId = usePreProvisionedSessionId();
// Disable input when pre-provisioning is in progress or failed (waiting for retry)
const sandboxNotReady = isPreProvisioning || isPreProvisioningFailed;
// Disable input when pre-provisioning is in progress, failed, or sandbox is restoring
const sandboxNotReady =
isPreProvisioning ||
isPreProvisioningFailed ||
session?.sandbox?.status === "restoring";
const { currentMessageFiles, hasUploadingFiles, setActiveSession } =
useUploadFilesContext();
const followupSuggestions = useFollowupSuggestions();
@@ -431,6 +436,7 @@ export default function BuildChatPanel({
messages={session?.messages ?? []}
streamItems={session?.streamItems ?? []}
isStreaming={isRunning}
userCancelled={session?.status === "cancelled"}
autoScrollEnabled={isAtBottom}
/>
)}
@@ -483,7 +489,10 @@ export default function BuildChatPanel({
<InputBar
ref={inputBarRef}
onSubmit={handleSubmit}
onStop={abortStream}
isRunning={isRunning}
isStreaming={isStreaming}
sandboxInitializing={sandboxNotReady}
placeholder="Continue the conversation..."
/>
</div>

View File

@@ -36,6 +36,7 @@ import {
SvgPaperclip,
SvgOrganization,
SvgAlertCircle,
SvgStop,
} from "@opal/icons";
const MAX_INPUT_HEIGHT = 200;
@@ -52,7 +53,11 @@ export interface InputBarProps {
files: BuildFile[],
demoDataEnabled: boolean
) => void;
/** Callback to stop the current generation. If provided and isStreaming is true, shows a stop button. */
onStop?: () => void;
isRunning: boolean;
/** True only when the LLM is actively streaming. Controls whether the stop button is shown. */
isStreaming?: boolean;
disabled?: boolean;
placeholder?: string;
/** When true, shows spinner on send button with "Initializing sandbox..." tooltip */
@@ -153,7 +158,9 @@ const InputBar = memo(
(
{
onSubmit,
onStop,
isRunning,
isStreaming = false,
disabled = false,
placeholder = "Describe your task...",
sandboxInitializing = false,
@@ -405,18 +412,26 @@ const InputBar = memo(
{/* Bottom right controls */}
<div className="flex flex-row items-center gap-1">
{/* Submit button */}
<IconButton
icon={sandboxInitializing ? SvgLoader : SvgArrowUp}
onClick={handleSubmit}
disabled={!canSubmit}
tooltip={
sandboxInitializing ? "Initializing sandbox..." : "Send"
}
iconClassName={
sandboxInitializing ? "animate-spin" : undefined
}
/>
{/* Submit button - shows Stop when running, Send otherwise */}
{isStreaming && onStop ? (
<IconButton
icon={SvgStop}
onClick={() => onStop()}
tooltip="Stop generation"
/>
) : (
<IconButton
icon={sandboxInitializing ? SvgLoader : SvgArrowUp}
onClick={handleSubmit}
disabled={!canSubmit}
tooltip={
sandboxInitializing ? "Initializing sandbox..." : "Send"
}
iconClassName={
sandboxInitializing ? "animate-spin" : undefined
}
/>
)}
</div>
</div>
</div>

View File

@@ -2068,6 +2068,15 @@ export const useIsRunning = () =>
return session?.status === "running" || session?.status === "creating";
});
/** True only when the LLM is actively streaming a response. */
export const useIsStreaming = () =>
useBuildSessionStore((state) => {
const { currentSessionId, sessions } = state;
if (!currentSessionId) return false;
const session = sessions.get(currentSessionId);
return session?.status === "running";
});
export const useMessages = () =>
useBuildSessionStore((state) => {
const { currentSessionId, sessions } = state;

View File

@@ -14,6 +14,7 @@ import {
fetchSession,
generateFollowupSuggestions,
RateLimitError,
cancelMessage,
} from "@/app/craft/services/apiServices";
import { useBuildSessionStore } from "@/app/craft/hooks/useBuildSessionStore";
@@ -40,9 +41,7 @@ export function useBuildStreaming() {
const setAbortController = useBuildSessionStore(
(state) => state.setAbortController
);
const abortCurrentSession = useBuildSessionStore(
(state) => state.abortCurrentSession
);
const abortSession = useBuildSessionStore((state) => state.abortSession);
const updateSessionData = useBuildSessionStore(
(state) => state.updateSessionData
);
@@ -449,11 +448,59 @@ export function useBuildStreaming() {
]
);
/**
* Abort the current streaming operation.
*
* Sends a cancel notification to the ACP agent so it stops gracefully.
* The stream stays open so we receive the final `prompt_response` event,
* which triggers the normal message-saving path (both backend DB persist
* and frontend state consolidation).
*
* Previous approach (kept for reference): we used to also abort the HTTP
* request via `abortSession()`, but that killed the connection before the
* backend could save the partial message to the DB, causing the assistant
* message to disappear when the user sent a follow-up.
*/
const abortStream = useCallback(
async (sessionId?: string) => {
const targetSessionId =
sessionId ?? useBuildSessionStore.getState().currentSessionId;
if (!targetSessionId) {
return;
}
// Send cancel to the backend to stop the agent.
// This sends session/cancel notification per ACP protocol.
// The stream stays open so the agent can finish up and send
// a prompt_response, allowing normal save logic to run.
try {
await cancelMessage(targetSessionId);
} catch (err) {
console.error("[Streaming] Failed to cancel agent operation:", err);
// If cancel fails, fall back to hard abort so the UI doesn't hang
updateSessionData(targetSessionId, { status: "cancelled" });
abortSession(targetSessionId);
}
// NOTE: We intentionally do NOT call abortSession() here.
// The stream continues until the agent sends prompt_response,
// at which point the normal prompt_response handler (above)
// saves the message and transitions status to "active".
//
// Previous hard-abort approach (kept in case graceful cancel
// doesn't work reliably):
// updateSessionData(targetSessionId, { status: "cancelled" });
// abortSession(targetSessionId);
},
[abortSession, updateSessionData]
);
return useMemo(
() => ({
streamMessage,
abortStream: abortCurrentSession,
abortStream,
}),
[streamMessage, abortCurrentSession]
[streamMessage, abortStream]
);
}

View File

@@ -358,6 +358,30 @@ export async function sendMessageStream(
return res;
}
/**
* Cancel the current message/prompt operation for a session.
* Sends a session/cancel notification to the ACP agent to stop
* the currently running operation.
*
* This follows the ACP (Agent Client Protocol) specification for cancellation.
*
* @returns true if cancel was sent, false if no active operation
*/
export async function cancelMessage(sessionId: string): Promise<boolean> {
const res = await fetch(`${API_BASE}/sessions/${sessionId}/cancel-message`, {
method: "POST",
headers: { "Content-Type": "application/json" },
});
if (!res.ok) {
console.error(`Failed to cancel message: ${res.status}`);
return false;
}
const data = await res.json();
return data.cancelled ?? false;
}
// =============================================================================
// Artifacts API
// =============================================================================

View File

@@ -125,7 +125,8 @@ export type SessionStatus =
| "creating"
| "running"
| "active"
| "failed";
| "failed"
| "cancelled";
export interface Session {
id: string | null;