mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-09 00:42:47 +00:00
Compare commits
1 Commits
v3.1.2
...
refactor/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
55040dc23c |
347
web/src/app/chat/message/messageComponents/AgentMessage.tsx
Normal file
347
web/src/app/chat/message/messageComponents/AgentMessage.tsx
Normal file
@@ -0,0 +1,347 @@
|
||||
import React, { useRef, useState, useCallback, RefObject } from "react";
|
||||
import { Packet, StopReason } from "@/app/chat/services/streamingModels";
|
||||
import { FullChatState } from "@/app/chat/message/messageComponents/interfaces";
|
||||
import { FeedbackType } from "@/app/chat/interfaces";
|
||||
import { useCurrentChatState } from "@/app/chat/stores/useChatSessionStore";
|
||||
import { handleCopy } from "@/app/chat/message/copyingUtils";
|
||||
import { BlinkingDot } from "@/app/chat/message/BlinkingDot";
|
||||
import { isDisplayPacket, isToolPacket } from "@/app/chat/services/packetUtils";
|
||||
import { useMessageSwitching } from "@/app/chat/message/messageComponents/hooks/useMessageSwitching";
|
||||
import MultiToolRenderer from "@/app/chat/message/messageComponents/MultiToolRenderer";
|
||||
import { RendererComponent } from "@/app/chat/message/messageComponents/renderMessageComponent";
|
||||
import { usePacketProcessor } from "@/app/chat/message/messageComponents/usePacketProcessor";
|
||||
import MessageToolbar from "@/app/chat/message/messageComponents/MessageToolbar";
|
||||
import AgentAvatar from "@/refresh-components/avatars/AgentAvatar";
|
||||
import { LlmDescriptor, LlmManager } from "@/lib/hooks";
|
||||
import { Message } from "@/app/chat/interfaces";
|
||||
import { useCreateModal } from "@/refresh-components/contexts/ModalContext";
|
||||
import FeedbackModal, {
|
||||
FeedbackModalProps,
|
||||
} from "@/app/chat/components/modal/FeedbackModal";
|
||||
import { usePopup } from "@/components/admin/connectors/Popup";
|
||||
import { useFeedbackController } from "@/app/chat/hooks/useFeedbackController";
|
||||
import Text from "@/refresh-components/texts/Text";
|
||||
import { useTripleClickSelect } from "@/hooks/useTripleClickSelect";
|
||||
|
||||
// Type for the regeneration factory function passed from ChatUI
|
||||
export type RegenerationFactory = (regenerationRequest: {
|
||||
messageId: number;
|
||||
parentMessage: Message;
|
||||
forceSearch?: boolean;
|
||||
}) => (modelOverride: LlmDescriptor) => Promise<void>;
|
||||
|
||||
export interface AgentMessageProps {
|
||||
rawPackets: Packet[];
|
||||
chatState: FullChatState;
|
||||
nodeId: number;
|
||||
messageId?: number;
|
||||
currentFeedback?: FeedbackType | null;
|
||||
llmManager: LlmManager | null;
|
||||
otherMessagesCanSwitchTo?: number[];
|
||||
onMessageSelection?: (nodeId: number) => void;
|
||||
// Stable regeneration callback - takes (parentMessage) and returns a function that takes (modelOverride)
|
||||
onRegenerate?: RegenerationFactory;
|
||||
// Parent message needed to construct regeneration request
|
||||
parentMessage?: Message | null;
|
||||
}
|
||||
|
||||
// TODO: Consider more robust comparisons:
|
||||
// - `rawPackets.length` assumes packets are append-only. Could compare the last
|
||||
// packet or use a shallow comparison if packets can be modified in place.
|
||||
// - `chatState.docs`, `chatState.citations`, and `otherMessagesCanSwitchTo` use
|
||||
// reference equality. Shallow array/object comparison would be more robust if
|
||||
// these are recreated with the same values.
|
||||
function arePropsEqual(
|
||||
prev: AgentMessageProps,
|
||||
next: AgentMessageProps
|
||||
): boolean {
|
||||
return (
|
||||
prev.nodeId === next.nodeId &&
|
||||
prev.messageId === next.messageId &&
|
||||
prev.currentFeedback === next.currentFeedback &&
|
||||
prev.rawPackets.length === next.rawPackets.length &&
|
||||
prev.chatState.assistant?.id === next.chatState.assistant?.id &&
|
||||
prev.chatState.docs === next.chatState.docs &&
|
||||
prev.chatState.citations === next.chatState.citations &&
|
||||
prev.chatState.overriddenModel === next.chatState.overriddenModel &&
|
||||
prev.chatState.researchType === next.chatState.researchType &&
|
||||
prev.otherMessagesCanSwitchTo === next.otherMessagesCanSwitchTo &&
|
||||
prev.onRegenerate === next.onRegenerate &&
|
||||
prev.parentMessage?.messageId === next.parentMessage?.messageId &&
|
||||
prev.llmManager?.isLoadingProviders === next.llmManager?.isLoadingProviders
|
||||
// Skip: chatState.regenerate, chatState.setPresentingDocument,
|
||||
// most of llmManager, onMessageSelection (function/object props)
|
||||
);
|
||||
}
|
||||
|
||||
const AgentMessage = React.memo(function AgentMessage({
|
||||
rawPackets,
|
||||
chatState,
|
||||
nodeId,
|
||||
messageId,
|
||||
currentFeedback,
|
||||
llmManager,
|
||||
otherMessagesCanSwitchTo,
|
||||
onMessageSelection,
|
||||
onRegenerate,
|
||||
parentMessage,
|
||||
}: AgentMessageProps) {
|
||||
const markdownRef = useRef<HTMLDivElement>(null);
|
||||
const finalAnswerRef = useRef<HTMLDivElement>(null);
|
||||
const handleTripleClick = useTripleClickSelect(markdownRef);
|
||||
const { popup, setPopup } = usePopup();
|
||||
const { handleFeedbackChange } = useFeedbackController({ setPopup });
|
||||
|
||||
// Get the global chat state to know if we're currently streaming
|
||||
const globalChatState = useCurrentChatState();
|
||||
|
||||
const modal = useCreateModal();
|
||||
const [feedbackModalProps, setFeedbackModalProps] =
|
||||
useState<FeedbackModalProps | null>(null);
|
||||
|
||||
// Helper to check if feedback button should be in transient state
|
||||
const isFeedbackTransient = useCallback(
|
||||
(feedbackType: "like" | "dislike") => {
|
||||
const hasCurrentFeedback = currentFeedback === feedbackType;
|
||||
if (!modal.isOpen) return hasCurrentFeedback;
|
||||
|
||||
const isModalForThisFeedback =
|
||||
feedbackModalProps?.feedbackType === feedbackType;
|
||||
const isModalForThisMessage = feedbackModalProps?.messageId === messageId;
|
||||
|
||||
return (
|
||||
hasCurrentFeedback || (isModalForThisFeedback && isModalForThisMessage)
|
||||
);
|
||||
},
|
||||
[currentFeedback, modal, feedbackModalProps, messageId]
|
||||
);
|
||||
|
||||
// Handler for feedback button clicks with toggle logic
|
||||
const handleFeedbackClick = useCallback(
|
||||
async (clickedFeedback: "like" | "dislike") => {
|
||||
if (!messageId) {
|
||||
console.error("Cannot provide feedback - message has no messageId");
|
||||
return;
|
||||
}
|
||||
|
||||
// Toggle logic
|
||||
if (currentFeedback === clickedFeedback) {
|
||||
// Clicking same button - remove feedback
|
||||
await handleFeedbackChange(messageId, null);
|
||||
}
|
||||
|
||||
// Clicking like (will automatically clear dislike if it was active).
|
||||
// Check if we need modal for positive feedback.
|
||||
else if (clickedFeedback === "like") {
|
||||
const predefinedOptions =
|
||||
process.env.NEXT_PUBLIC_POSITIVE_PREDEFINED_FEEDBACK_OPTIONS;
|
||||
if (predefinedOptions && predefinedOptions.trim()) {
|
||||
// Open modal for positive feedback
|
||||
setFeedbackModalProps({
|
||||
feedbackType: "like",
|
||||
messageId,
|
||||
});
|
||||
modal.toggle(true);
|
||||
} else {
|
||||
// No modal needed - just submit like (this replaces any existing feedback)
|
||||
await handleFeedbackChange(messageId, "like");
|
||||
}
|
||||
}
|
||||
|
||||
// Clicking dislike (will automatically clear like if it was active).
|
||||
// Always open modal for dislike.
|
||||
else {
|
||||
setFeedbackModalProps({
|
||||
feedbackType: "dislike",
|
||||
messageId,
|
||||
});
|
||||
modal.toggle(true);
|
||||
}
|
||||
},
|
||||
[messageId, currentFeedback, handleFeedbackChange, modal]
|
||||
);
|
||||
|
||||
// Use the packet processor hook for all streaming packet processing
|
||||
const {
|
||||
citations,
|
||||
citationMap,
|
||||
documentMap,
|
||||
groupedPackets,
|
||||
finalAnswerComing,
|
||||
stopPacketSeen,
|
||||
stopReason,
|
||||
expectedBranchesPerTurn,
|
||||
displayComplete,
|
||||
setDisplayComplete,
|
||||
setFinalAnswerComingOverride,
|
||||
} = usePacketProcessor(rawPackets, nodeId);
|
||||
|
||||
// Keep a ref to finalAnswerComing for use in callbacks (to read latest value)
|
||||
const finalAnswerComingRef = useRef(finalAnswerComing);
|
||||
finalAnswerComingRef.current = finalAnswerComing;
|
||||
|
||||
// Create a chatState that uses streaming citations for immediate rendering
|
||||
// This merges the prop citations with streaming citations, preferring streaming ones
|
||||
const effectiveChatState: FullChatState = {
|
||||
...chatState,
|
||||
citations: {
|
||||
...chatState.citations,
|
||||
...citationMap,
|
||||
},
|
||||
};
|
||||
|
||||
// Message switching logic
|
||||
const {
|
||||
currentMessageInd,
|
||||
includeMessageSwitcher,
|
||||
getPreviousMessage,
|
||||
getNextMessage,
|
||||
} = useMessageSwitching({
|
||||
nodeId,
|
||||
otherMessagesCanSwitchTo,
|
||||
onMessageSelection,
|
||||
});
|
||||
|
||||
// Return a list of rendered message components, one for each ind
|
||||
return (
|
||||
<>
|
||||
{popup}
|
||||
|
||||
<modal.Provider>
|
||||
<FeedbackModal {...feedbackModalProps!} />
|
||||
</modal.Provider>
|
||||
|
||||
<div
|
||||
// for e2e tests
|
||||
data-testid={displayComplete ? "onyx-ai-message" : undefined}
|
||||
className="flex items-start pb-5 md:pt-5"
|
||||
>
|
||||
<AgentAvatar agent={chatState.assistant} size={24} />
|
||||
{/* w-full ensures the MultiToolRenderer non-expanded state takes up the full width */}
|
||||
<div className="max-w-message-max break-words pl-4 w-full">
|
||||
<div
|
||||
ref={markdownRef}
|
||||
className="overflow-x-visible max-w-content-max focus:outline-none select-text cursor-text"
|
||||
onMouseDown={handleTripleClick}
|
||||
onCopy={(e) => {
|
||||
if (markdownRef.current) {
|
||||
handleCopy(e, markdownRef as RefObject<HTMLDivElement>);
|
||||
}
|
||||
}}
|
||||
>
|
||||
{groupedPackets.length === 0 ? (
|
||||
// Show blinking dot when no content yet, or stopped message if user cancelled
|
||||
stopReason === StopReason.USER_CANCELLED ? (
|
||||
<Text as="p" secondaryBody text04>
|
||||
User has stopped generation
|
||||
</Text>
|
||||
) : (
|
||||
<BlinkingDot addMargin />
|
||||
)
|
||||
) : (
|
||||
(() => {
|
||||
// Simple split: tools vs non-tools
|
||||
const toolGroups = groupedPackets.filter(
|
||||
(group) =>
|
||||
group.packets[0] && isToolPacket(group.packets[0], false)
|
||||
);
|
||||
|
||||
// Non-tools include messages AND image generation
|
||||
const displayGroups =
|
||||
finalAnswerComing || toolGroups.length === 0
|
||||
? groupedPackets.filter(
|
||||
(group) =>
|
||||
group.packets[0] && isDisplayPacket(group.packets[0])
|
||||
)
|
||||
: [];
|
||||
|
||||
return (
|
||||
<>
|
||||
{/* Render tool groups in multi-tool renderer */}
|
||||
{toolGroups.length > 0 && (
|
||||
<MultiToolRenderer
|
||||
packetGroups={toolGroups}
|
||||
chatState={effectiveChatState}
|
||||
isComplete={finalAnswerComing}
|
||||
isFinalAnswerComing={finalAnswerComingRef.current}
|
||||
stopPacketSeen={stopPacketSeen}
|
||||
stopReason={stopReason}
|
||||
isStreaming={globalChatState === "streaming"}
|
||||
onAllToolsDisplayed={() =>
|
||||
setFinalAnswerComingOverride(true)
|
||||
}
|
||||
expectedBranchesPerTurn={expectedBranchesPerTurn}
|
||||
/>
|
||||
)}
|
||||
|
||||
{/* Render all display groups (messages + image generation) in main area */}
|
||||
<div ref={finalAnswerRef}>
|
||||
{displayGroups.map((displayGroup, index) => (
|
||||
<RendererComponent
|
||||
key={`${displayGroup.turn_index}-${displayGroup.tab_index}`}
|
||||
packets={displayGroup.packets}
|
||||
chatState={effectiveChatState}
|
||||
onComplete={() => {
|
||||
// if we've reverted to final answer not coming, don't set display complete
|
||||
// this happens when using claude and a tool calling packet comes after
|
||||
// some message packets
|
||||
// Only mark complete on the last display group
|
||||
if (
|
||||
finalAnswerComingRef.current &&
|
||||
index === displayGroups.length - 1
|
||||
) {
|
||||
setDisplayComplete(true);
|
||||
}
|
||||
}}
|
||||
animate={false}
|
||||
stopPacketSeen={stopPacketSeen}
|
||||
stopReason={stopReason}
|
||||
>
|
||||
{({ content }) => <div>{content}</div>}
|
||||
</RendererComponent>
|
||||
))}
|
||||
{/* Show stopped message when user cancelled and no display content */}
|
||||
{displayGroups.length === 0 &&
|
||||
stopReason === StopReason.USER_CANCELLED && (
|
||||
<Text as="p" secondaryBody text04>
|
||||
User has stopped generation
|
||||
</Text>
|
||||
)}
|
||||
</div>
|
||||
</>
|
||||
);
|
||||
})()
|
||||
)}
|
||||
</div>
|
||||
|
||||
{/* Feedback buttons - only show when streaming is complete */}
|
||||
{stopPacketSeen && displayComplete && (
|
||||
<MessageToolbar
|
||||
nodeId={nodeId}
|
||||
messageId={messageId}
|
||||
includeMessageSwitcher={includeMessageSwitcher}
|
||||
currentMessageInd={currentMessageInd}
|
||||
otherMessagesCanSwitchTo={otherMessagesCanSwitchTo}
|
||||
getPreviousMessage={getPreviousMessage}
|
||||
getNextMessage={getNextMessage}
|
||||
onMessageSelection={onMessageSelection}
|
||||
rawPackets={rawPackets}
|
||||
finalAnswerRef={finalAnswerRef}
|
||||
currentFeedback={currentFeedback}
|
||||
onFeedbackClick={handleFeedbackClick}
|
||||
isFeedbackTransient={isFeedbackTransient}
|
||||
onRegenerate={onRegenerate}
|
||||
parentMessage={parentMessage}
|
||||
llmManager={llmManager}
|
||||
currentModelName={chatState.overriddenModel}
|
||||
citations={citations}
|
||||
documentMap={documentMap}
|
||||
/>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
</>
|
||||
);
|
||||
}, arePropsEqual);
|
||||
|
||||
export default AgentMessage;
|
||||
193
web/src/app/chat/message/messageComponents/MessageToolbar.tsx
Normal file
193
web/src/app/chat/message/messageComponents/MessageToolbar.tsx
Normal file
@@ -0,0 +1,193 @@
|
||||
import React, { RefObject } from "react";
|
||||
import { Packet, StreamingCitation } from "@/app/chat/services/streamingModels";
|
||||
import { FeedbackType } from "@/app/chat/interfaces";
|
||||
import { OnyxDocument } from "@/lib/search/interfaces";
|
||||
import { TooltipGroup } from "@/components/tooltip/CustomTooltip";
|
||||
import {
|
||||
useChatSessionStore,
|
||||
useDocumentSidebarVisible,
|
||||
useSelectedNodeForDocDisplay,
|
||||
} from "@/app/chat/stores/useChatSessionStore";
|
||||
import {
|
||||
handleCopy,
|
||||
convertMarkdownTablesToTsv,
|
||||
} from "@/app/chat/message/copyingUtils";
|
||||
import { getTextContent } from "@/app/chat/services/packetUtils";
|
||||
import { removeThinkingTokens } from "@/app/chat/services/thinkingTokens";
|
||||
import MessageSwitcher from "@/app/chat/message/MessageSwitcher";
|
||||
import CitedSourcesToggle from "@/app/chat/message/messageComponents/CitedSourcesToggle";
|
||||
import IconButton from "@/refresh-components/buttons/IconButton";
|
||||
import CopyIconButton from "@/refresh-components/buttons/CopyIconButton";
|
||||
import LLMPopover from "@/refresh-components/popovers/LLMPopover";
|
||||
import { parseLlmDescriptor } from "@/lib/llm/utils";
|
||||
import { LlmDescriptor, LlmManager } from "@/lib/hooks";
|
||||
import { Message } from "@/app/chat/interfaces";
|
||||
import { SvgThumbsDown, SvgThumbsUp } from "@opal/icons";
|
||||
import { RegenerationFactory } from "./AgentMessage";
|
||||
|
||||
export interface MessageToolbarProps {
|
||||
// Message identification
|
||||
nodeId: number;
|
||||
messageId?: number;
|
||||
|
||||
// Message switching
|
||||
includeMessageSwitcher: boolean;
|
||||
currentMessageInd: number | null | undefined;
|
||||
otherMessagesCanSwitchTo?: number[];
|
||||
getPreviousMessage: () => number | undefined;
|
||||
getNextMessage: () => number | undefined;
|
||||
onMessageSelection?: (nodeId: number) => void;
|
||||
|
||||
// Copy functionality
|
||||
rawPackets: Packet[];
|
||||
finalAnswerRef: RefObject<HTMLDivElement | null>;
|
||||
|
||||
// Feedback
|
||||
currentFeedback?: FeedbackType | null;
|
||||
onFeedbackClick: (feedbackType: "like" | "dislike") => void;
|
||||
isFeedbackTransient: (feedbackType: "like" | "dislike") => boolean;
|
||||
|
||||
// Regeneration
|
||||
onRegenerate?: RegenerationFactory;
|
||||
parentMessage?: Message | null;
|
||||
llmManager: LlmManager | null;
|
||||
currentModelName?: string;
|
||||
|
||||
// Citations
|
||||
citations: StreamingCitation[];
|
||||
documentMap: Map<string, OnyxDocument>;
|
||||
}
|
||||
|
||||
export default function MessageToolbar({
|
||||
nodeId,
|
||||
messageId,
|
||||
includeMessageSwitcher,
|
||||
currentMessageInd,
|
||||
otherMessagesCanSwitchTo,
|
||||
getPreviousMessage,
|
||||
getNextMessage,
|
||||
onMessageSelection,
|
||||
rawPackets,
|
||||
finalAnswerRef,
|
||||
currentFeedback,
|
||||
onFeedbackClick,
|
||||
isFeedbackTransient,
|
||||
onRegenerate,
|
||||
parentMessage,
|
||||
llmManager,
|
||||
currentModelName,
|
||||
citations,
|
||||
documentMap,
|
||||
}: MessageToolbarProps) {
|
||||
// Document sidebar state - managed internally to reduce prop drilling
|
||||
const documentSidebarVisible = useDocumentSidebarVisible();
|
||||
const selectedMessageForDocDisplay = useSelectedNodeForDocDisplay();
|
||||
const updateCurrentDocumentSidebarVisible = useChatSessionStore(
|
||||
(state) => state.updateCurrentDocumentSidebarVisible
|
||||
);
|
||||
const updateCurrentSelectedNodeForDocDisplay = useChatSessionStore(
|
||||
(state) => state.updateCurrentSelectedNodeForDocDisplay
|
||||
);
|
||||
|
||||
return (
|
||||
<div className="flex md:flex-row justify-between items-center w-full mt-1 transition-transform duration-300 ease-in-out transform opacity-100">
|
||||
<TooltipGroup>
|
||||
<div className="flex items-center gap-x-0.5">
|
||||
{includeMessageSwitcher && (
|
||||
<div className="-mx-1">
|
||||
<MessageSwitcher
|
||||
currentPage={(currentMessageInd ?? 0) + 1}
|
||||
totalPages={otherMessagesCanSwitchTo?.length || 0}
|
||||
handlePrevious={() => {
|
||||
const prevMessage = getPreviousMessage();
|
||||
if (prevMessage !== undefined && onMessageSelection) {
|
||||
onMessageSelection(prevMessage);
|
||||
}
|
||||
}}
|
||||
handleNext={() => {
|
||||
const nextMessage = getNextMessage();
|
||||
if (nextMessage !== undefined && onMessageSelection) {
|
||||
onMessageSelection(nextMessage);
|
||||
}
|
||||
}}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
|
||||
<CopyIconButton
|
||||
getCopyText={() =>
|
||||
convertMarkdownTablesToTsv(
|
||||
removeThinkingTokens(getTextContent(rawPackets)) as string
|
||||
)
|
||||
}
|
||||
getHtmlContent={() => finalAnswerRef.current?.innerHTML || ""}
|
||||
tertiary
|
||||
data-testid="AgentMessage/copy-button"
|
||||
/>
|
||||
<IconButton
|
||||
icon={SvgThumbsUp}
|
||||
onClick={() => onFeedbackClick("like")}
|
||||
tertiary
|
||||
transient={isFeedbackTransient("like")}
|
||||
tooltip={
|
||||
currentFeedback === "like" ? "Remove Like" : "Good Response"
|
||||
}
|
||||
data-testid="AgentMessage/like-button"
|
||||
/>
|
||||
<IconButton
|
||||
icon={SvgThumbsDown}
|
||||
onClick={() => onFeedbackClick("dislike")}
|
||||
tertiary
|
||||
transient={isFeedbackTransient("dislike")}
|
||||
tooltip={
|
||||
currentFeedback === "dislike" ? "Remove Dislike" : "Bad Response"
|
||||
}
|
||||
data-testid="AgentMessage/dislike-button"
|
||||
/>
|
||||
|
||||
{onRegenerate &&
|
||||
messageId !== undefined &&
|
||||
parentMessage &&
|
||||
llmManager && (
|
||||
<div data-testid="AgentMessage/regenerate">
|
||||
<LLMPopover
|
||||
llmManager={llmManager}
|
||||
currentModelName={currentModelName}
|
||||
onSelect={(modelName) => {
|
||||
const llmDescriptor = parseLlmDescriptor(modelName);
|
||||
const regenerator = onRegenerate({
|
||||
messageId,
|
||||
parentMessage,
|
||||
});
|
||||
regenerator(llmDescriptor);
|
||||
}}
|
||||
folded
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{nodeId && (citations.length > 0 || documentMap.size > 0) && (
|
||||
<CitedSourcesToggle
|
||||
citations={citations}
|
||||
documentMap={documentMap}
|
||||
nodeId={nodeId}
|
||||
onToggle={(toggledNodeId) => {
|
||||
// Toggle sidebar if clicking on the same message
|
||||
if (
|
||||
selectedMessageForDocDisplay === toggledNodeId &&
|
||||
documentSidebarVisible
|
||||
) {
|
||||
updateCurrentDocumentSidebarVisible(false);
|
||||
updateCurrentSelectedNodeForDocDisplay(null);
|
||||
} else {
|
||||
updateCurrentSelectedNodeForDocDisplay(toggledNodeId);
|
||||
updateCurrentDocumentSidebarVisible(true);
|
||||
}
|
||||
}}
|
||||
/>
|
||||
)}
|
||||
</div>
|
||||
</TooltipGroup>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
377
web/src/app/chat/message/messageComponents/packetProcessor.ts
Normal file
377
web/src/app/chat/message/messageComponents/packetProcessor.ts
Normal file
@@ -0,0 +1,377 @@
|
||||
import {
|
||||
Packet,
|
||||
PacketType,
|
||||
StreamingCitation,
|
||||
StopReason,
|
||||
CitationInfo,
|
||||
SearchToolDocumentsDelta,
|
||||
FetchToolDocuments,
|
||||
TopLevelBranching,
|
||||
Stop,
|
||||
} from "@/app/chat/services/streamingModels";
|
||||
import { CitationMap } from "@/app/chat/interfaces";
|
||||
import { OnyxDocument } from "@/lib/search/interfaces";
|
||||
import { isActualToolCallPacket } from "@/app/chat/services/packetUtils";
|
||||
import { parseToolKey } from "@/app/chat/message/messageComponents/toolDisplayHelpers";
|
||||
|
||||
// Re-export parseToolKey for consumers that import from this module
|
||||
export { parseToolKey };
|
||||
|
||||
// ============================================================================
|
||||
// Types
|
||||
// ============================================================================
|
||||
|
||||
export interface ProcessorState {
|
||||
nodeId: number;
|
||||
lastProcessedIndex: number;
|
||||
|
||||
// Citations
|
||||
citations: StreamingCitation[];
|
||||
seenCitationDocIds: Set<string>;
|
||||
citationMap: CitationMap;
|
||||
|
||||
// Documents
|
||||
documentMap: Map<string, OnyxDocument>;
|
||||
|
||||
// Packet grouping
|
||||
groupedPacketsMap: Map<string, Packet[]>;
|
||||
seenGroupKeys: Set<string>;
|
||||
groupKeysWithSectionEnd: Set<string>;
|
||||
expectedBranches: Map<number, number>;
|
||||
|
||||
// Streaming status
|
||||
finalAnswerComing: boolean;
|
||||
stopPacketSeen: boolean;
|
||||
stopReason: StopReason | undefined;
|
||||
}
|
||||
|
||||
export interface GroupedPacket {
|
||||
turn_index: number;
|
||||
tab_index: number;
|
||||
packets: Packet[];
|
||||
}
|
||||
|
||||
export interface ProcessorResult {
|
||||
citations: StreamingCitation[];
|
||||
citationMap: CitationMap;
|
||||
documentMap: Map<string, OnyxDocument>;
|
||||
groupedPackets: GroupedPacket[];
|
||||
finalAnswerComing: boolean;
|
||||
stopPacketSeen: boolean;
|
||||
stopReason: StopReason | undefined;
|
||||
expectedBranchesPerTurn: Map<number, number>;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// State Creation
|
||||
// ============================================================================
|
||||
|
||||
export function createInitialState(nodeId: number): ProcessorState {
|
||||
return {
|
||||
nodeId,
|
||||
lastProcessedIndex: 0,
|
||||
citations: [],
|
||||
seenCitationDocIds: new Set(),
|
||||
citationMap: {},
|
||||
documentMap: new Map(),
|
||||
groupedPacketsMap: new Map(),
|
||||
seenGroupKeys: new Set(),
|
||||
groupKeysWithSectionEnd: new Set(),
|
||||
expectedBranches: new Map(),
|
||||
finalAnswerComing: false,
|
||||
stopPacketSeen: false,
|
||||
stopReason: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Helper Functions
|
||||
// ============================================================================
|
||||
|
||||
function getGroupKey(packet: Packet): string {
|
||||
const turnIndex = packet.placement.turn_index;
|
||||
const tabIndex = packet.placement.tab_index ?? 0;
|
||||
return `${turnIndex}-${tabIndex}`;
|
||||
}
|
||||
|
||||
function injectSectionEnd(state: ProcessorState, groupKey: string): void {
|
||||
if (state.groupKeysWithSectionEnd.has(groupKey)) {
|
||||
return; // Already has SECTION_END
|
||||
}
|
||||
|
||||
const { turn_index, tab_index } = parseToolKey(groupKey);
|
||||
|
||||
const syntheticPacket: Packet = {
|
||||
placement: { turn_index, tab_index },
|
||||
obj: { type: PacketType.SECTION_END },
|
||||
};
|
||||
|
||||
const existingGroup = state.groupedPacketsMap.get(groupKey);
|
||||
if (existingGroup) {
|
||||
existingGroup.push(syntheticPacket);
|
||||
}
|
||||
state.groupKeysWithSectionEnd.add(groupKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Content packet types that indicate a group has meaningful content to display
|
||||
*/
|
||||
const CONTENT_PACKET_TYPES = [
|
||||
PacketType.MESSAGE_START,
|
||||
PacketType.SEARCH_TOOL_START,
|
||||
PacketType.IMAGE_GENERATION_TOOL_START,
|
||||
PacketType.PYTHON_TOOL_START,
|
||||
PacketType.CUSTOM_TOOL_START,
|
||||
PacketType.FETCH_TOOL_START,
|
||||
PacketType.REASONING_START,
|
||||
PacketType.DEEP_RESEARCH_PLAN_START,
|
||||
PacketType.RESEARCH_AGENT_START,
|
||||
];
|
||||
|
||||
function hasContentPackets(packets: Packet[]): boolean {
|
||||
return packets.some((packet) =>
|
||||
CONTENT_PACKET_TYPES.includes(packet.obj.type as PacketType)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Packet types that indicate final answer content is coming
|
||||
*/
|
||||
const FINAL_ANSWER_PACKET_TYPES = [
|
||||
PacketType.MESSAGE_START,
|
||||
PacketType.MESSAGE_DELTA,
|
||||
PacketType.IMAGE_GENERATION_TOOL_START,
|
||||
PacketType.IMAGE_GENERATION_TOOL_DELTA,
|
||||
PacketType.PYTHON_TOOL_START,
|
||||
PacketType.PYTHON_TOOL_DELTA,
|
||||
];
|
||||
|
||||
// ============================================================================
|
||||
// Packet Handlers
|
||||
// ============================================================================
|
||||
|
||||
function handleTopLevelBranching(state: ProcessorState, packet: Packet): void {
|
||||
const branchingPacket = packet.obj as TopLevelBranching;
|
||||
state.expectedBranches.set(
|
||||
packet.placement.turn_index,
|
||||
branchingPacket.num_parallel_branches
|
||||
);
|
||||
}
|
||||
|
||||
function handleTurnTransition(state: ProcessorState, packet: Packet): void {
|
||||
const currentTurnIndex = packet.placement.turn_index;
|
||||
|
||||
// Get all previous turn indices from seen group keys
|
||||
const previousTurnIndices = new Set(
|
||||
Array.from(state.seenGroupKeys).map((key) => parseToolKey(key).turn_index)
|
||||
);
|
||||
|
||||
const isNewTurnIndex = !previousTurnIndices.has(currentTurnIndex);
|
||||
|
||||
// If we see a new turn_index (not just tab_index), inject SECTION_END for previous groups
|
||||
if (isNewTurnIndex && state.seenGroupKeys.size > 0) {
|
||||
state.seenGroupKeys.forEach((prevGroupKey) => {
|
||||
if (!state.groupKeysWithSectionEnd.has(prevGroupKey)) {
|
||||
injectSectionEnd(state, prevGroupKey);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function handleCitationPacket(state: ProcessorState, packet: Packet): void {
|
||||
if (packet.obj.type !== PacketType.CITATION_INFO) {
|
||||
return;
|
||||
}
|
||||
|
||||
const citationInfo = packet.obj as CitationInfo;
|
||||
|
||||
// Add to citation map immediately for rendering
|
||||
state.citationMap[citationInfo.citation_number] = citationInfo.document_id;
|
||||
|
||||
// Also add to citations array for CitedSourcesToggle (deduplicated)
|
||||
if (!state.seenCitationDocIds.has(citationInfo.document_id)) {
|
||||
state.seenCitationDocIds.add(citationInfo.document_id);
|
||||
state.citations.push({
|
||||
citation_num: citationInfo.citation_number,
|
||||
document_id: citationInfo.document_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function handleDocumentPacket(state: ProcessorState, packet: Packet): void {
|
||||
if (packet.obj.type === PacketType.SEARCH_TOOL_DOCUMENTS_DELTA) {
|
||||
const docDelta = packet.obj as SearchToolDocumentsDelta;
|
||||
if (docDelta.documents) {
|
||||
for (const doc of docDelta.documents) {
|
||||
if (doc.document_id) {
|
||||
state.documentMap.set(doc.document_id, doc);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (packet.obj.type === PacketType.FETCH_TOOL_DOCUMENTS) {
|
||||
const fetchDocuments = packet.obj as FetchToolDocuments;
|
||||
if (fetchDocuments.documents) {
|
||||
for (const doc of fetchDocuments.documents) {
|
||||
if (doc.document_id) {
|
||||
state.documentMap.set(doc.document_id, doc);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function handleStreamingStatusPacket(
|
||||
state: ProcessorState,
|
||||
packet: Packet
|
||||
): void {
|
||||
// Check if final answer is coming
|
||||
if (FINAL_ANSWER_PACKET_TYPES.includes(packet.obj.type as PacketType)) {
|
||||
state.finalAnswerComing = true;
|
||||
}
|
||||
}
|
||||
|
||||
function handleStopPacket(state: ProcessorState, packet: Packet): void {
|
||||
if (packet.obj.type !== PacketType.STOP || state.stopPacketSeen) {
|
||||
return;
|
||||
}
|
||||
|
||||
state.stopPacketSeen = true;
|
||||
|
||||
// Extract and store the stop reason
|
||||
const stopPacket = packet.obj as Stop;
|
||||
state.stopReason = stopPacket.stop_reason;
|
||||
|
||||
// Inject SECTION_END for all group keys that don't have one
|
||||
state.seenGroupKeys.forEach((groupKey) => {
|
||||
if (!state.groupKeysWithSectionEnd.has(groupKey)) {
|
||||
injectSectionEnd(state, groupKey);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function handleToolAfterMessagePacket(
|
||||
state: ProcessorState,
|
||||
packet: Packet
|
||||
): void {
|
||||
// Handles case where we get a Message packet from Claude, and then tool
|
||||
// calling packets. We use isActualToolCallPacket instead of isToolPacket
|
||||
// to exclude reasoning packets - reasoning is just the model thinking,
|
||||
// not an actual tool call that would produce new content.
|
||||
if (
|
||||
state.finalAnswerComing &&
|
||||
!state.stopPacketSeen &&
|
||||
isActualToolCallPacket(packet)
|
||||
) {
|
||||
state.finalAnswerComing = false;
|
||||
}
|
||||
}
|
||||
|
||||
function addPacketToGroup(
|
||||
state: ProcessorState,
|
||||
packet: Packet,
|
||||
groupKey: string
|
||||
): void {
|
||||
const existingGroup = state.groupedPacketsMap.get(groupKey);
|
||||
if (existingGroup) {
|
||||
existingGroup.push(packet);
|
||||
} else {
|
||||
state.groupedPacketsMap.set(groupKey, [packet]);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Main Processing Function
|
||||
// ============================================================================
|
||||
|
||||
function processPacket(state: ProcessorState, packet: Packet): void {
|
||||
if (!packet) return;
|
||||
|
||||
// Handle TopLevelBranching packets - these tell us how many parallel branches to expect
|
||||
if (packet.obj.type === PacketType.TOP_LEVEL_BRANCHING) {
|
||||
handleTopLevelBranching(state, packet);
|
||||
// Don't add this packet to any group, it's just metadata
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle turn transitions (inject SECTION_END for previous groups)
|
||||
handleTurnTransition(state, packet);
|
||||
|
||||
// Track group key
|
||||
const groupKey = getGroupKey(packet);
|
||||
state.seenGroupKeys.add(groupKey);
|
||||
|
||||
// Track SECTION_END and ERROR packets (both indicate completion)
|
||||
if (
|
||||
packet.obj.type === PacketType.SECTION_END ||
|
||||
packet.obj.type === PacketType.ERROR
|
||||
) {
|
||||
state.groupKeysWithSectionEnd.add(groupKey);
|
||||
}
|
||||
|
||||
// Add packet to group
|
||||
addPacketToGroup(state, packet, groupKey);
|
||||
|
||||
// Handle specific packet types
|
||||
handleCitationPacket(state, packet);
|
||||
handleDocumentPacket(state, packet);
|
||||
handleStreamingStatusPacket(state, packet);
|
||||
handleStopPacket(state, packet);
|
||||
handleToolAfterMessagePacket(state, packet);
|
||||
}
|
||||
|
||||
export function processPackets(
|
||||
state: ProcessorState,
|
||||
rawPackets: Packet[]
|
||||
): ProcessorState {
|
||||
// Handle reset (packets array shrunk - upstream replaced with shorter list)
|
||||
if (state.lastProcessedIndex > rawPackets.length) {
|
||||
state = createInitialState(state.nodeId);
|
||||
}
|
||||
|
||||
// Process only new packets
|
||||
for (let i = state.lastProcessedIndex; i < rawPackets.length; i++) {
|
||||
const packet = rawPackets[i];
|
||||
if (packet) {
|
||||
processPacket(state, packet);
|
||||
}
|
||||
}
|
||||
|
||||
state.lastProcessedIndex = rawPackets.length;
|
||||
return state;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Result Derivation
|
||||
// ============================================================================
|
||||
|
||||
export function getResult(state: ProcessorState): ProcessorResult {
|
||||
// Build sorted, filtered grouped packets array
|
||||
// Clone packet arrays to ensure referential changes so downstream memo hooks update
|
||||
const groupedPackets = Array.from(state.groupedPacketsMap.entries())
|
||||
.map(([key, packets]) => {
|
||||
const { turn_index, tab_index } = parseToolKey(key);
|
||||
return {
|
||||
turn_index,
|
||||
tab_index,
|
||||
packets: [...packets],
|
||||
};
|
||||
})
|
||||
.filter(({ packets }) => hasContentPackets(packets))
|
||||
.sort((a, b) => {
|
||||
if (a.turn_index !== b.turn_index) {
|
||||
return a.turn_index - b.turn_index;
|
||||
}
|
||||
return a.tab_index - b.tab_index;
|
||||
});
|
||||
|
||||
return {
|
||||
citations: state.citations,
|
||||
citationMap: state.citationMap,
|
||||
documentMap: state.documentMap,
|
||||
groupedPackets,
|
||||
finalAnswerComing: state.finalAnswerComing,
|
||||
stopPacketSeen: state.stopPacketSeen,
|
||||
stopReason: state.stopReason,
|
||||
expectedBranchesPerTurn: state.expectedBranches,
|
||||
};
|
||||
}
|
||||
100
web/src/app/chat/message/messageComponents/usePacketProcessor.ts
Normal file
100
web/src/app/chat/message/messageComponents/usePacketProcessor.ts
Normal file
@@ -0,0 +1,100 @@
|
||||
import { useRef, useState } from "react";
|
||||
import { Packet } from "@/app/chat/services/streamingModels";
|
||||
import {
|
||||
ProcessorState,
|
||||
ProcessorResult,
|
||||
createInitialState,
|
||||
processPackets,
|
||||
getResult,
|
||||
} from "./packetProcessor";
|
||||
|
||||
export interface UsePacketProcessorResult extends ProcessorResult {
|
||||
displayComplete: boolean;
|
||||
setDisplayComplete: (value: boolean) => void;
|
||||
// UI override for finalAnswerComing - used when all tools have been displayed
|
||||
// and we want to show the message content regardless of packet state
|
||||
setFinalAnswerComingOverride: (value: boolean) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Custom hook for processing streaming packets in AgentMessage.
|
||||
*
|
||||
* This hook encapsulates all packet processing logic:
|
||||
* - Incremental processing (only processes new packets)
|
||||
* - Citation extraction and deduplication
|
||||
* - Document accumulation
|
||||
* - Packet grouping by turn_index and tab_index
|
||||
* - Streaming status tracking (finalAnswerComing, stopPacketSeen, stopReason)
|
||||
* - Synthetic SECTION_END injection for graceful tool completion
|
||||
*
|
||||
* The hook uses a ref to store the processor state, which allows for:
|
||||
* - Synchronous access during render
|
||||
* - Persistence across renders without triggering re-renders
|
||||
*
|
||||
* Re-renders are triggered by:
|
||||
* - Parent updating rawPackets prop (most common)
|
||||
* - Child calling setDisplayComplete (for animation completion)
|
||||
*
|
||||
* @param rawPackets - Array of packets from the streaming response
|
||||
* @param nodeId - Unique identifier for the message node (used for reset detection)
|
||||
* @returns ProcessorResult with derived values plus displayComplete state
|
||||
*/
|
||||
export function usePacketProcessor(
|
||||
rawPackets: Packet[],
|
||||
nodeId: number
|
||||
): UsePacketProcessorResult {
|
||||
const stateRef = useRef<ProcessorState>(createInitialState(nodeId));
|
||||
|
||||
// displayComplete needs state because it's set from child callback (onComplete)
|
||||
// which needs to trigger a re-render to show feedback buttons
|
||||
const [displayComplete, setDisplayComplete] = useState(false);
|
||||
|
||||
// UI override for finalAnswerComing - when all tools have been displayed,
|
||||
// we want to show the message content regardless of packet-derived state
|
||||
const [finalAnswerComingOverride, setFinalAnswerComingOverride] =
|
||||
useState(false);
|
||||
|
||||
// Reset on nodeId change
|
||||
if (stateRef.current.nodeId !== nodeId) {
|
||||
stateRef.current = createInitialState(nodeId);
|
||||
setDisplayComplete(false);
|
||||
setFinalAnswerComingOverride(false);
|
||||
}
|
||||
|
||||
// Track previous state to detect transitions
|
||||
const prevLastProcessed = stateRef.current.lastProcessedIndex;
|
||||
const prevFinalAnswerComing = stateRef.current.finalAnswerComing;
|
||||
|
||||
// Process packets (incremental - only processes new packets)
|
||||
stateRef.current = processPackets(stateRef.current, rawPackets);
|
||||
|
||||
// Detect tool-after-message scenario: if finalAnswerComing went from true to false,
|
||||
// it means a tool packet arrived after message packets. Reset displayComplete to
|
||||
// prevent showing feedback buttons while tools are still executing.
|
||||
if (prevFinalAnswerComing && !stateRef.current.finalAnswerComing) {
|
||||
setDisplayComplete(false);
|
||||
}
|
||||
|
||||
// Detect stream reset (packets array shrunk) - processPackets resets state internally,
|
||||
// but we also need to reset React state that lives outside the processor
|
||||
if (prevLastProcessed > rawPackets.length) {
|
||||
setDisplayComplete(false);
|
||||
setFinalAnswerComingOverride(false);
|
||||
}
|
||||
|
||||
// Get derived result
|
||||
const result = getResult(stateRef.current);
|
||||
|
||||
// Combine packet-derived finalAnswerComing with UI override
|
||||
// UI override is set when all tools have been displayed
|
||||
const finalAnswerComing =
|
||||
result.finalAnswerComing || finalAnswerComingOverride;
|
||||
|
||||
return {
|
||||
...result,
|
||||
finalAnswerComing,
|
||||
displayComplete,
|
||||
setDisplayComplete,
|
||||
setFinalAnswerComingOverride,
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user