mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-02-28 13:15:44 +00:00
Compare commits
1 Commits
main
...
experiment
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
21aa89badc |
@@ -124,3 +124,26 @@ def send_message(
|
||||
"X-Accel-Buffering": "no", # Disable nginx buffering
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.post("/sessions/{session_id}/cancel-message", tags=PUBLIC_API_TAGS)
|
||||
def cancel_message(
|
||||
session_id: UUID,
|
||||
user: User = Depends(current_user),
|
||||
db_session: Session = Depends(get_session),
|
||||
) -> dict[str, bool]:
|
||||
"""
|
||||
Cancel the current message/prompt operation for a session.
|
||||
|
||||
Sends a session/cancel notification to the ACP agent to stop the
|
||||
currently running operation. This follows the ACP protocol specification
|
||||
for cancellation.
|
||||
|
||||
Returns:
|
||||
{"cancelled": true} if cancel was sent successfully
|
||||
{"cancelled": false} if no active operation to cancel
|
||||
"""
|
||||
session_manager = SessionManager(db_session)
|
||||
cancelled = session_manager.cancel_message(session_id, user.id)
|
||||
|
||||
return {"cancelled": cancelled}
|
||||
|
||||
@@ -491,6 +491,27 @@ class SandboxManager(ABC):
|
||||
nextjs_port: The port the Next.js server should be listening on
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def cancel_message(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
session_id: UUID,
|
||||
) -> bool:
|
||||
"""Cancel the current message/prompt operation for a session.
|
||||
|
||||
Sends a session/cancel notification to the ACP agent to stop the
|
||||
currently running operation. This is a non-blocking operation that
|
||||
signals the agent to stop but does not wait for confirmation.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
session_id: The session ID whose operation should be cancelled
|
||||
|
||||
Returns:
|
||||
True if cancel was sent, False if no active session/client found
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
# Singleton instance cache for the factory
|
||||
_sandbox_manager_instance: SandboxManager | None = None
|
||||
|
||||
@@ -358,6 +358,15 @@ class KubernetesSandboxManager(SandboxManager):
|
||||
self._agent_instructions_template_path = build_dir / "AGENTS.template.md"
|
||||
self._skills_path = Path(__file__).parent / "docker" / "skills"
|
||||
|
||||
# Track active ephemeral ACP clients for cancellation support.
|
||||
# Only populated during an active send_message() call — entries are
|
||||
# added at the start of send_message() and removed in its finally block.
|
||||
# Keyed by (sandbox_id, session_id) tuple.
|
||||
# Lock guards dict access and client lifecycle between send_message()
|
||||
# (streaming thread) and cancel_message() (cancel request thread).
|
||||
self._active_acp_clients: dict[tuple[UUID, UUID], ACPExecClient] = {}
|
||||
self._active_acp_clients_lock = threading.Lock()
|
||||
|
||||
logger.info(
|
||||
f"KubernetesSandboxManager initialized: "
|
||||
f"namespace={self._namespace}, image={self._image}"
|
||||
@@ -1405,8 +1414,8 @@ echo "Session workspace setup complete"
|
||||
) -> None:
|
||||
"""Clean up a session workspace (on session delete).
|
||||
|
||||
Removes the ACP session mapping and executes kubectl exec to remove
|
||||
the session directory. The shared ACP client persists for other sessions.
|
||||
Executes kubectl exec to remove the session directory. ACP clients are
|
||||
ephemeral (created per message), so there's nothing to stop here.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
@@ -1889,6 +1898,12 @@ echo "Session config regeneration complete"
|
||||
# Create an ephemeral ACP client for this message
|
||||
acp_client = self._create_ephemeral_acp_client(sandbox_id, session_path)
|
||||
|
||||
# Register as the active client for this session so cancel_message()
|
||||
# can find and cancel it from another thread.
|
||||
client_key = (sandbox_id, session_id)
|
||||
with self._active_acp_clients_lock:
|
||||
self._active_acp_clients[client_key] = acp_client
|
||||
|
||||
try:
|
||||
# Resume (or create) the ACP session from opencode's on-disk storage
|
||||
acp_session_id = acp_client.resume_or_create_session(cwd=session_path)
|
||||
@@ -1972,8 +1987,9 @@ echo "Session config regeneration complete"
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
# Always stop the ephemeral ACP client to kill the opencode process.
|
||||
# This ensures no stale processes linger in the sandbox container.
|
||||
# Always deregister and stop the ephemeral ACP client.
|
||||
with self._active_acp_clients_lock:
|
||||
self._active_acp_clients.pop(client_key, None)
|
||||
try:
|
||||
acp_client.stop()
|
||||
except Exception as e:
|
||||
@@ -2714,3 +2730,43 @@ fi
|
||||
except ApiException as e:
|
||||
logger.warning(f"Failed to get upload stats: {e}")
|
||||
return 0, 0
|
||||
|
||||
def cancel_message(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
session_id: UUID,
|
||||
) -> bool:
|
||||
"""Cancel the current message/prompt operation for a session.
|
||||
|
||||
Sends a session/cancel notification to the ACP agent to stop the
|
||||
currently running operation.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
session_id: The session ID whose operation should be cancelled
|
||||
|
||||
Returns:
|
||||
True if cancel was sent, False if no active session/client found
|
||||
"""
|
||||
client_key = (sandbox_id, session_id)
|
||||
with self._active_acp_clients_lock:
|
||||
exec_client = self._active_acp_clients.get(client_key)
|
||||
|
||||
if exec_client is None or not exec_client.is_running:
|
||||
logger.debug(
|
||||
f"No active ACP client for sandbox {sandbox_id}, session {session_id}"
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
exec_client.cancel()
|
||||
logger.info(
|
||||
f"Sent cancel notification for sandbox {sandbox_id}, session {session_id}"
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to cancel operation for sandbox {sandbox_id}, "
|
||||
f"session {session_id}: {e}"
|
||||
)
|
||||
return False
|
||||
|
||||
@@ -1047,23 +1047,35 @@ class LocalSandboxManager(SandboxManager):
|
||||
if not self._is_path_allowed(session_path, target_path):
|
||||
raise ValueError("Path traversal not allowed")
|
||||
|
||||
# If directory doesn't exist, return empty list (no files yet)
|
||||
if not target_path.exists():
|
||||
return []
|
||||
|
||||
if not target_path.is_dir():
|
||||
raise ValueError(f"Not a directory: {path}")
|
||||
|
||||
entries = []
|
||||
for item in target_path.iterdir():
|
||||
stat = item.stat()
|
||||
is_file = item.is_file()
|
||||
mime_type = mimetypes.guess_type(str(item))[0] if is_file else None
|
||||
entries.append(
|
||||
FilesystemEntry(
|
||||
name=item.name,
|
||||
path=str(item.relative_to(session_path)),
|
||||
is_directory=item.is_dir(),
|
||||
size=stat.st_size if is_file else None,
|
||||
mime_type=mime_type,
|
||||
)
|
||||
)
|
||||
try:
|
||||
for item in target_path.iterdir():
|
||||
try:
|
||||
stat = item.stat()
|
||||
is_file = item.is_file()
|
||||
mime_type = mimetypes.guess_type(str(item))[0] if is_file else None
|
||||
entries.append(
|
||||
FilesystemEntry(
|
||||
name=item.name,
|
||||
path=str(item.relative_to(session_path)),
|
||||
is_directory=item.is_dir(),
|
||||
size=stat.st_size if is_file else None,
|
||||
mime_type=mime_type,
|
||||
)
|
||||
)
|
||||
except (FileNotFoundError, OSError):
|
||||
# Skip files that were deleted during iteration (race condition)
|
||||
continue
|
||||
except FileNotFoundError:
|
||||
# Directory was deleted during iteration
|
||||
return []
|
||||
|
||||
return sorted(entries, key=lambda e: (not e.is_directory, e.name.lower()))
|
||||
|
||||
@@ -1411,3 +1423,42 @@ class LocalSandboxManager(SandboxManager):
|
||||
f"sync_files called for local sandbox {sandbox_id}{source_info} - no-op"
|
||||
)
|
||||
return True
|
||||
|
||||
def cancel_message(
|
||||
self,
|
||||
sandbox_id: UUID,
|
||||
session_id: UUID,
|
||||
) -> bool:
|
||||
"""Cancel the current message/prompt operation for a session.
|
||||
|
||||
Sends a session/cancel notification to the ACP agent to stop the
|
||||
currently running operation.
|
||||
|
||||
Args:
|
||||
sandbox_id: The sandbox ID
|
||||
session_id: The session ID whose operation should be cancelled
|
||||
|
||||
Returns:
|
||||
True if cancel was sent, False if no active session/client found
|
||||
"""
|
||||
client_key = (sandbox_id, session_id)
|
||||
client = self._acp_clients.get(client_key)
|
||||
|
||||
if client is None or not client.is_running:
|
||||
logger.debug(
|
||||
f"No active ACP client for sandbox {sandbox_id}, session {session_id}"
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
client.cancel()
|
||||
logger.info(
|
||||
f"Sent cancel notification for sandbox {sandbox_id}, session {session_id}"
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to cancel operation for sandbox {sandbox_id}, "
|
||||
f"session {session_id}: {e}"
|
||||
)
|
||||
return False
|
||||
|
||||
@@ -1550,6 +1550,46 @@ class SessionManager:
|
||||
return "error"
|
||||
return "unknown"
|
||||
|
||||
def cancel_message(
|
||||
self,
|
||||
session_id: UUID,
|
||||
user_id: UUID,
|
||||
) -> bool:
|
||||
"""Cancel the current message/prompt operation for a session.
|
||||
|
||||
Sends a session/cancel notification to the ACP agent to stop the
|
||||
currently running operation. This follows the ACP protocol:
|
||||
https://agentclientprotocol.com
|
||||
|
||||
Args:
|
||||
session_id: The session UUID
|
||||
user_id: The user ID (for ownership verification)
|
||||
|
||||
Returns:
|
||||
True if cancel was sent, False if session/sandbox not found or cancel failed
|
||||
"""
|
||||
# Verify session ownership
|
||||
session = get_build_session(session_id, user_id, self._db_session)
|
||||
if session is None:
|
||||
logger.warning(f"Cancel request for non-existent session {session_id}")
|
||||
return False
|
||||
|
||||
# Get the user's sandbox
|
||||
sandbox = get_sandbox_by_user_id(self._db_session, user_id)
|
||||
if sandbox is None:
|
||||
logger.warning(f"Cancel request but no sandbox for user {user_id}")
|
||||
return False
|
||||
|
||||
# Send cancel to the sandbox manager
|
||||
result = self._sandbox_manager.cancel_message(sandbox.id, session_id)
|
||||
|
||||
if result:
|
||||
logger.info(f"Cancelled message operation for session {session_id}")
|
||||
else:
|
||||
logger.debug(f"No active operation to cancel for session {session_id}")
|
||||
|
||||
return result
|
||||
|
||||
# =========================================================================
|
||||
# Artifact Operations
|
||||
# =========================================================================
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import { useRef, useEffect } from "react";
|
||||
import Logo from "@/refresh-components/Logo";
|
||||
import Text from "@/refresh-components/texts/Text";
|
||||
import TextChunk from "@/app/craft/components/TextChunk";
|
||||
import ThinkingCard from "@/app/craft/components/ThinkingCard";
|
||||
import ToolCallPill from "@/app/craft/components/ToolCallPill";
|
||||
@@ -68,6 +69,8 @@ interface BuildMessageListProps {
|
||||
messages: BuildMessage[];
|
||||
streamItems: StreamItem[];
|
||||
isStreaming?: boolean;
|
||||
/** Whether the user cancelled the current generation */
|
||||
userCancelled?: boolean;
|
||||
/** Whether auto-scroll is enabled (user is at bottom) */
|
||||
autoScrollEnabled?: boolean;
|
||||
/** Ref to the end marker div for scroll detection */
|
||||
@@ -85,6 +88,7 @@ export default function BuildMessageList({
|
||||
messages,
|
||||
streamItems,
|
||||
isStreaming = false,
|
||||
userCancelled = false,
|
||||
autoScrollEnabled = true,
|
||||
messagesEndRef: externalMessagesEndRef,
|
||||
}: BuildMessageListProps) {
|
||||
@@ -104,8 +108,11 @@ export default function BuildMessageList({
|
||||
const lastMessage = messages[messages.length - 1];
|
||||
const lastMessageIsUser = lastMessage?.type === "user";
|
||||
// Show streaming area if we have stream items OR if we're waiting for a response to the latest user message
|
||||
// Also show if user cancelled (to display the cancellation message)
|
||||
const showStreamingArea =
|
||||
hasStreamItems || (isStreaming && lastMessageIsUser);
|
||||
hasStreamItems ||
|
||||
(isStreaming && lastMessageIsUser) ||
|
||||
(userCancelled && lastMessageIsUser);
|
||||
|
||||
// Check for active tools (for "Working..." state)
|
||||
const hasActiveTools = streamItems.some(
|
||||
@@ -206,16 +213,28 @@ export default function BuildMessageList({
|
||||
</div>
|
||||
<div className="flex-1 flex flex-col gap-3 min-w-0">
|
||||
{!hasStreamItems ? (
|
||||
// Loading state - no content yet, show blinking dot like main chat
|
||||
<BlinkingDot />
|
||||
// Loading state or cancelled - show blinking dot or cancelled message
|
||||
userCancelled ? (
|
||||
<Text as="p" secondaryBody text04>
|
||||
User has stopped generation
|
||||
</Text>
|
||||
) : (
|
||||
<BlinkingDot />
|
||||
)
|
||||
) : (
|
||||
<>
|
||||
{/* Render stream items in FIFO order */}
|
||||
{renderStreamItems(streamItems, true)}
|
||||
|
||||
{/* Streaming indicator when actively streaming text */}
|
||||
{isStreaming && hasStreamItems && !hasActiveTools && (
|
||||
<BlinkingDot />
|
||||
{/* Show cancelled message or streaming indicator */}
|
||||
{userCancelled ? (
|
||||
<Text as="p" secondaryBody text04>
|
||||
User has stopped generation
|
||||
</Text>
|
||||
) : (
|
||||
isStreaming &&
|
||||
hasStreamItems &&
|
||||
!hasActiveTools && <BlinkingDot />
|
||||
)}
|
||||
</>
|
||||
)}
|
||||
|
||||
@@ -112,7 +112,7 @@ export default function BuildChatPanel({
|
||||
const nameBuildSession = useBuildSessionStore(
|
||||
(state) => state.nameBuildSession
|
||||
);
|
||||
const { streamMessage } = useBuildStreaming();
|
||||
const { streamMessage, abortStream } = useBuildStreaming();
|
||||
const isPreProvisioning = useIsPreProvisioning();
|
||||
const isPreProvisioningFailed = useIsPreProvisioningFailed();
|
||||
const preProvisionedSessionId = usePreProvisionedSessionId();
|
||||
@@ -431,6 +431,7 @@ export default function BuildChatPanel({
|
||||
messages={session?.messages ?? []}
|
||||
streamItems={session?.streamItems ?? []}
|
||||
isStreaming={isRunning}
|
||||
userCancelled={session?.status === "cancelled"}
|
||||
autoScrollEnabled={isAtBottom}
|
||||
/>
|
||||
)}
|
||||
@@ -483,6 +484,7 @@ export default function BuildChatPanel({
|
||||
<InputBar
|
||||
ref={inputBarRef}
|
||||
onSubmit={handleSubmit}
|
||||
onStop={abortStream}
|
||||
isRunning={isRunning}
|
||||
placeholder="Continue the conversation..."
|
||||
/>
|
||||
|
||||
@@ -36,6 +36,7 @@ import {
|
||||
SvgPaperclip,
|
||||
SvgOrganization,
|
||||
SvgAlertCircle,
|
||||
SvgStop,
|
||||
} from "@opal/icons";
|
||||
|
||||
const MAX_INPUT_HEIGHT = 200;
|
||||
@@ -52,6 +53,8 @@ export interface InputBarProps {
|
||||
files: BuildFile[],
|
||||
demoDataEnabled: boolean
|
||||
) => void;
|
||||
/** Callback to stop the current generation. If provided and isRunning is true, shows a stop button. */
|
||||
onStop?: () => void;
|
||||
isRunning: boolean;
|
||||
disabled?: boolean;
|
||||
placeholder?: string;
|
||||
@@ -153,6 +156,7 @@ const InputBar = memo(
|
||||
(
|
||||
{
|
||||
onSubmit,
|
||||
onStop,
|
||||
isRunning,
|
||||
disabled = false,
|
||||
placeholder = "Describe your task...",
|
||||
@@ -405,18 +409,26 @@ const InputBar = memo(
|
||||
|
||||
{/* Bottom right controls */}
|
||||
<div className="flex flex-row items-center gap-1">
|
||||
{/* Submit button */}
|
||||
<IconButton
|
||||
icon={sandboxInitializing ? SvgLoader : SvgArrowUp}
|
||||
onClick={handleSubmit}
|
||||
disabled={!canSubmit}
|
||||
tooltip={
|
||||
sandboxInitializing ? "Initializing sandbox..." : "Send"
|
||||
}
|
||||
iconClassName={
|
||||
sandboxInitializing ? "animate-spin" : undefined
|
||||
}
|
||||
/>
|
||||
{/* Submit button - shows Stop when running, Send otherwise */}
|
||||
{isRunning && onStop ? (
|
||||
<IconButton
|
||||
icon={SvgStop}
|
||||
onClick={() => onStop()}
|
||||
tooltip="Stop generation"
|
||||
/>
|
||||
) : (
|
||||
<IconButton
|
||||
icon={sandboxInitializing ? SvgLoader : SvgArrowUp}
|
||||
onClick={handleSubmit}
|
||||
disabled={!canSubmit}
|
||||
tooltip={
|
||||
sandboxInitializing ? "Initializing sandbox..." : "Send"
|
||||
}
|
||||
iconClassName={
|
||||
sandboxInitializing ? "animate-spin" : undefined
|
||||
}
|
||||
/>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
fetchSession,
|
||||
generateFollowupSuggestions,
|
||||
RateLimitError,
|
||||
cancelMessage,
|
||||
} from "@/app/craft/services/apiServices";
|
||||
|
||||
import { useBuildSessionStore } from "@/app/craft/hooks/useBuildSessionStore";
|
||||
@@ -40,9 +41,7 @@ export function useBuildStreaming() {
|
||||
const setAbortController = useBuildSessionStore(
|
||||
(state) => state.setAbortController
|
||||
);
|
||||
const abortCurrentSession = useBuildSessionStore(
|
||||
(state) => state.abortCurrentSession
|
||||
);
|
||||
const abortSession = useBuildSessionStore((state) => state.abortSession);
|
||||
const updateSessionData = useBuildSessionStore(
|
||||
(state) => state.updateSessionData
|
||||
);
|
||||
@@ -449,11 +448,45 @@ export function useBuildStreaming() {
|
||||
]
|
||||
);
|
||||
|
||||
/**
|
||||
* Abort the current streaming operation.
|
||||
* This both:
|
||||
* 1. Aborts the HTTP request (via AbortController)
|
||||
* 2. Sends a cancel notification to the ACP agent (via backend API)
|
||||
* 3. Updates session status to "cancelled" to update UI state
|
||||
*/
|
||||
const abortStream = useCallback(
|
||||
async (sessionId?: string) => {
|
||||
const targetSessionId =
|
||||
sessionId ?? useBuildSessionStore.getState().currentSessionId;
|
||||
|
||||
if (!targetSessionId) {
|
||||
return;
|
||||
}
|
||||
|
||||
// First, send cancel to the backend to stop the agent
|
||||
// This sends session/cancel notification per ACP protocol
|
||||
try {
|
||||
await cancelMessage(targetSessionId);
|
||||
} catch (err) {
|
||||
console.error("[Streaming] Failed to cancel agent operation:", err);
|
||||
}
|
||||
|
||||
// Update status first so the UI transitions atomically (stop button
|
||||
// disappears and cancellation message appears before the stream closes)
|
||||
updateSessionData(targetSessionId, { status: "cancelled" });
|
||||
|
||||
// Abort the HTTP request to stop receiving events
|
||||
abortSession(targetSessionId);
|
||||
},
|
||||
[abortSession, updateSessionData]
|
||||
);
|
||||
|
||||
return useMemo(
|
||||
() => ({
|
||||
streamMessage,
|
||||
abortStream: abortCurrentSession,
|
||||
abortStream,
|
||||
}),
|
||||
[streamMessage, abortCurrentSession]
|
||||
[streamMessage, abortStream]
|
||||
);
|
||||
}
|
||||
|
||||
@@ -358,6 +358,30 @@ export async function sendMessageStream(
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel the current message/prompt operation for a session.
|
||||
* Sends a session/cancel notification to the ACP agent to stop
|
||||
* the currently running operation.
|
||||
*
|
||||
* This follows the ACP (Agent Client Protocol) specification for cancellation.
|
||||
*
|
||||
* @returns true if cancel was sent, false if no active operation
|
||||
*/
|
||||
export async function cancelMessage(sessionId: string): Promise<boolean> {
|
||||
const res = await fetch(`${API_BASE}/sessions/${sessionId}/cancel-message`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
});
|
||||
|
||||
if (!res.ok) {
|
||||
console.error(`Failed to cancel message: ${res.status}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
const data = await res.json();
|
||||
return data.cancelled ?? false;
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Artifacts API
|
||||
// =============================================================================
|
||||
|
||||
@@ -125,7 +125,8 @@ export type SessionStatus =
|
||||
| "creating"
|
||||
| "running"
|
||||
| "active"
|
||||
| "failed";
|
||||
| "failed"
|
||||
| "cancelled";
|
||||
|
||||
export interface Session {
|
||||
id: string | null;
|
||||
|
||||
Reference in New Issue
Block a user