Compare commits

...

1 Commits

Author SHA1 Message Date
SubashMohan
55040dc23c feat: Add AgentMessage component and related packet processing utilities 2026-01-13 11:11:17 +05:30
4 changed files with 1017 additions and 0 deletions

View File

@@ -0,0 +1,347 @@
import React, { useRef, useState, useCallback, RefObject } from "react";
import { Packet, StopReason } from "@/app/chat/services/streamingModels";
import { FullChatState } from "@/app/chat/message/messageComponents/interfaces";
import { FeedbackType } from "@/app/chat/interfaces";
import { useCurrentChatState } from "@/app/chat/stores/useChatSessionStore";
import { handleCopy } from "@/app/chat/message/copyingUtils";
import { BlinkingDot } from "@/app/chat/message/BlinkingDot";
import { isDisplayPacket, isToolPacket } from "@/app/chat/services/packetUtils";
import { useMessageSwitching } from "@/app/chat/message/messageComponents/hooks/useMessageSwitching";
import MultiToolRenderer from "@/app/chat/message/messageComponents/MultiToolRenderer";
import { RendererComponent } from "@/app/chat/message/messageComponents/renderMessageComponent";
import { usePacketProcessor } from "@/app/chat/message/messageComponents/usePacketProcessor";
import MessageToolbar from "@/app/chat/message/messageComponents/MessageToolbar";
import AgentAvatar from "@/refresh-components/avatars/AgentAvatar";
import { LlmDescriptor, LlmManager } from "@/lib/hooks";
import { Message } from "@/app/chat/interfaces";
import { useCreateModal } from "@/refresh-components/contexts/ModalContext";
import FeedbackModal, {
FeedbackModalProps,
} from "@/app/chat/components/modal/FeedbackModal";
import { usePopup } from "@/components/admin/connectors/Popup";
import { useFeedbackController } from "@/app/chat/hooks/useFeedbackController";
import Text from "@/refresh-components/texts/Text";
import { useTripleClickSelect } from "@/hooks/useTripleClickSelect";
// Type for the regeneration factory function passed from ChatUI
export type RegenerationFactory = (regenerationRequest: {
messageId: number;
parentMessage: Message;
forceSearch?: boolean;
}) => (modelOverride: LlmDescriptor) => Promise<void>;
export interface AgentMessageProps {
rawPackets: Packet[];
chatState: FullChatState;
nodeId: number;
messageId?: number;
currentFeedback?: FeedbackType | null;
llmManager: LlmManager | null;
otherMessagesCanSwitchTo?: number[];
onMessageSelection?: (nodeId: number) => void;
// Stable regeneration callback - takes (parentMessage) and returns a function that takes (modelOverride)
onRegenerate?: RegenerationFactory;
// Parent message needed to construct regeneration request
parentMessage?: Message | null;
}
// TODO: Consider more robust comparisons:
// - `rawPackets.length` assumes packets are append-only. Could compare the last
// packet or use a shallow comparison if packets can be modified in place.
// - `chatState.docs`, `chatState.citations`, and `otherMessagesCanSwitchTo` use
// reference equality. Shallow array/object comparison would be more robust if
// these are recreated with the same values.
function arePropsEqual(
prev: AgentMessageProps,
next: AgentMessageProps
): boolean {
return (
prev.nodeId === next.nodeId &&
prev.messageId === next.messageId &&
prev.currentFeedback === next.currentFeedback &&
prev.rawPackets.length === next.rawPackets.length &&
prev.chatState.assistant?.id === next.chatState.assistant?.id &&
prev.chatState.docs === next.chatState.docs &&
prev.chatState.citations === next.chatState.citations &&
prev.chatState.overriddenModel === next.chatState.overriddenModel &&
prev.chatState.researchType === next.chatState.researchType &&
prev.otherMessagesCanSwitchTo === next.otherMessagesCanSwitchTo &&
prev.onRegenerate === next.onRegenerate &&
prev.parentMessage?.messageId === next.parentMessage?.messageId &&
prev.llmManager?.isLoadingProviders === next.llmManager?.isLoadingProviders
// Skip: chatState.regenerate, chatState.setPresentingDocument,
// most of llmManager, onMessageSelection (function/object props)
);
}
const AgentMessage = React.memo(function AgentMessage({
rawPackets,
chatState,
nodeId,
messageId,
currentFeedback,
llmManager,
otherMessagesCanSwitchTo,
onMessageSelection,
onRegenerate,
parentMessage,
}: AgentMessageProps) {
const markdownRef = useRef<HTMLDivElement>(null);
const finalAnswerRef = useRef<HTMLDivElement>(null);
const handleTripleClick = useTripleClickSelect(markdownRef);
const { popup, setPopup } = usePopup();
const { handleFeedbackChange } = useFeedbackController({ setPopup });
// Get the global chat state to know if we're currently streaming
const globalChatState = useCurrentChatState();
const modal = useCreateModal();
const [feedbackModalProps, setFeedbackModalProps] =
useState<FeedbackModalProps | null>(null);
// Helper to check if feedback button should be in transient state
const isFeedbackTransient = useCallback(
(feedbackType: "like" | "dislike") => {
const hasCurrentFeedback = currentFeedback === feedbackType;
if (!modal.isOpen) return hasCurrentFeedback;
const isModalForThisFeedback =
feedbackModalProps?.feedbackType === feedbackType;
const isModalForThisMessage = feedbackModalProps?.messageId === messageId;
return (
hasCurrentFeedback || (isModalForThisFeedback && isModalForThisMessage)
);
},
[currentFeedback, modal, feedbackModalProps, messageId]
);
// Handler for feedback button clicks with toggle logic
const handleFeedbackClick = useCallback(
async (clickedFeedback: "like" | "dislike") => {
if (!messageId) {
console.error("Cannot provide feedback - message has no messageId");
return;
}
// Toggle logic
if (currentFeedback === clickedFeedback) {
// Clicking same button - remove feedback
await handleFeedbackChange(messageId, null);
}
// Clicking like (will automatically clear dislike if it was active).
// Check if we need modal for positive feedback.
else if (clickedFeedback === "like") {
const predefinedOptions =
process.env.NEXT_PUBLIC_POSITIVE_PREDEFINED_FEEDBACK_OPTIONS;
if (predefinedOptions && predefinedOptions.trim()) {
// Open modal for positive feedback
setFeedbackModalProps({
feedbackType: "like",
messageId,
});
modal.toggle(true);
} else {
// No modal needed - just submit like (this replaces any existing feedback)
await handleFeedbackChange(messageId, "like");
}
}
// Clicking dislike (will automatically clear like if it was active).
// Always open modal for dislike.
else {
setFeedbackModalProps({
feedbackType: "dislike",
messageId,
});
modal.toggle(true);
}
},
[messageId, currentFeedback, handleFeedbackChange, modal]
);
// Use the packet processor hook for all streaming packet processing
const {
citations,
citationMap,
documentMap,
groupedPackets,
finalAnswerComing,
stopPacketSeen,
stopReason,
expectedBranchesPerTurn,
displayComplete,
setDisplayComplete,
setFinalAnswerComingOverride,
} = usePacketProcessor(rawPackets, nodeId);
// Keep a ref to finalAnswerComing for use in callbacks (to read latest value)
const finalAnswerComingRef = useRef(finalAnswerComing);
finalAnswerComingRef.current = finalAnswerComing;
// Create a chatState that uses streaming citations for immediate rendering
// This merges the prop citations with streaming citations, preferring streaming ones
const effectiveChatState: FullChatState = {
...chatState,
citations: {
...chatState.citations,
...citationMap,
},
};
// Message switching logic
const {
currentMessageInd,
includeMessageSwitcher,
getPreviousMessage,
getNextMessage,
} = useMessageSwitching({
nodeId,
otherMessagesCanSwitchTo,
onMessageSelection,
});
// Return a list of rendered message components, one for each ind
return (
<>
{popup}
<modal.Provider>
<FeedbackModal {...feedbackModalProps!} />
</modal.Provider>
<div
// for e2e tests
data-testid={displayComplete ? "onyx-ai-message" : undefined}
className="flex items-start pb-5 md:pt-5"
>
<AgentAvatar agent={chatState.assistant} size={24} />
{/* w-full ensures the MultiToolRenderer non-expanded state takes up the full width */}
<div className="max-w-message-max break-words pl-4 w-full">
<div
ref={markdownRef}
className="overflow-x-visible max-w-content-max focus:outline-none select-text cursor-text"
onMouseDown={handleTripleClick}
onCopy={(e) => {
if (markdownRef.current) {
handleCopy(e, markdownRef as RefObject<HTMLDivElement>);
}
}}
>
{groupedPackets.length === 0 ? (
// Show blinking dot when no content yet, or stopped message if user cancelled
stopReason === StopReason.USER_CANCELLED ? (
<Text as="p" secondaryBody text04>
User has stopped generation
</Text>
) : (
<BlinkingDot addMargin />
)
) : (
(() => {
// Simple split: tools vs non-tools
const toolGroups = groupedPackets.filter(
(group) =>
group.packets[0] && isToolPacket(group.packets[0], false)
);
// Non-tools include messages AND image generation
const displayGroups =
finalAnswerComing || toolGroups.length === 0
? groupedPackets.filter(
(group) =>
group.packets[0] && isDisplayPacket(group.packets[0])
)
: [];
return (
<>
{/* Render tool groups in multi-tool renderer */}
{toolGroups.length > 0 && (
<MultiToolRenderer
packetGroups={toolGroups}
chatState={effectiveChatState}
isComplete={finalAnswerComing}
isFinalAnswerComing={finalAnswerComingRef.current}
stopPacketSeen={stopPacketSeen}
stopReason={stopReason}
isStreaming={globalChatState === "streaming"}
onAllToolsDisplayed={() =>
setFinalAnswerComingOverride(true)
}
expectedBranchesPerTurn={expectedBranchesPerTurn}
/>
)}
{/* Render all display groups (messages + image generation) in main area */}
<div ref={finalAnswerRef}>
{displayGroups.map((displayGroup, index) => (
<RendererComponent
key={`${displayGroup.turn_index}-${displayGroup.tab_index}`}
packets={displayGroup.packets}
chatState={effectiveChatState}
onComplete={() => {
// if we've reverted to final answer not coming, don't set display complete
// this happens when using claude and a tool calling packet comes after
// some message packets
// Only mark complete on the last display group
if (
finalAnswerComingRef.current &&
index === displayGroups.length - 1
) {
setDisplayComplete(true);
}
}}
animate={false}
stopPacketSeen={stopPacketSeen}
stopReason={stopReason}
>
{({ content }) => <div>{content}</div>}
</RendererComponent>
))}
{/* Show stopped message when user cancelled and no display content */}
{displayGroups.length === 0 &&
stopReason === StopReason.USER_CANCELLED && (
<Text as="p" secondaryBody text04>
User has stopped generation
</Text>
)}
</div>
</>
);
})()
)}
</div>
{/* Feedback buttons - only show when streaming is complete */}
{stopPacketSeen && displayComplete && (
<MessageToolbar
nodeId={nodeId}
messageId={messageId}
includeMessageSwitcher={includeMessageSwitcher}
currentMessageInd={currentMessageInd}
otherMessagesCanSwitchTo={otherMessagesCanSwitchTo}
getPreviousMessage={getPreviousMessage}
getNextMessage={getNextMessage}
onMessageSelection={onMessageSelection}
rawPackets={rawPackets}
finalAnswerRef={finalAnswerRef}
currentFeedback={currentFeedback}
onFeedbackClick={handleFeedbackClick}
isFeedbackTransient={isFeedbackTransient}
onRegenerate={onRegenerate}
parentMessage={parentMessage}
llmManager={llmManager}
currentModelName={chatState.overriddenModel}
citations={citations}
documentMap={documentMap}
/>
)}
</div>
</div>
</>
);
}, arePropsEqual);
export default AgentMessage;

View File

@@ -0,0 +1,193 @@
import React, { RefObject } from "react";
import { Packet, StreamingCitation } from "@/app/chat/services/streamingModels";
import { FeedbackType } from "@/app/chat/interfaces";
import { OnyxDocument } from "@/lib/search/interfaces";
import { TooltipGroup } from "@/components/tooltip/CustomTooltip";
import {
useChatSessionStore,
useDocumentSidebarVisible,
useSelectedNodeForDocDisplay,
} from "@/app/chat/stores/useChatSessionStore";
import {
handleCopy,
convertMarkdownTablesToTsv,
} from "@/app/chat/message/copyingUtils";
import { getTextContent } from "@/app/chat/services/packetUtils";
import { removeThinkingTokens } from "@/app/chat/services/thinkingTokens";
import MessageSwitcher from "@/app/chat/message/MessageSwitcher";
import CitedSourcesToggle from "@/app/chat/message/messageComponents/CitedSourcesToggle";
import IconButton from "@/refresh-components/buttons/IconButton";
import CopyIconButton from "@/refresh-components/buttons/CopyIconButton";
import LLMPopover from "@/refresh-components/popovers/LLMPopover";
import { parseLlmDescriptor } from "@/lib/llm/utils";
import { LlmDescriptor, LlmManager } from "@/lib/hooks";
import { Message } from "@/app/chat/interfaces";
import { SvgThumbsDown, SvgThumbsUp } from "@opal/icons";
import { RegenerationFactory } from "./AgentMessage";
export interface MessageToolbarProps {
// Message identification
nodeId: number;
messageId?: number;
// Message switching
includeMessageSwitcher: boolean;
currentMessageInd: number | null | undefined;
otherMessagesCanSwitchTo?: number[];
getPreviousMessage: () => number | undefined;
getNextMessage: () => number | undefined;
onMessageSelection?: (nodeId: number) => void;
// Copy functionality
rawPackets: Packet[];
finalAnswerRef: RefObject<HTMLDivElement | null>;
// Feedback
currentFeedback?: FeedbackType | null;
onFeedbackClick: (feedbackType: "like" | "dislike") => void;
isFeedbackTransient: (feedbackType: "like" | "dislike") => boolean;
// Regeneration
onRegenerate?: RegenerationFactory;
parentMessage?: Message | null;
llmManager: LlmManager | null;
currentModelName?: string;
// Citations
citations: StreamingCitation[];
documentMap: Map<string, OnyxDocument>;
}
export default function MessageToolbar({
nodeId,
messageId,
includeMessageSwitcher,
currentMessageInd,
otherMessagesCanSwitchTo,
getPreviousMessage,
getNextMessage,
onMessageSelection,
rawPackets,
finalAnswerRef,
currentFeedback,
onFeedbackClick,
isFeedbackTransient,
onRegenerate,
parentMessage,
llmManager,
currentModelName,
citations,
documentMap,
}: MessageToolbarProps) {
// Document sidebar state - managed internally to reduce prop drilling
const documentSidebarVisible = useDocumentSidebarVisible();
const selectedMessageForDocDisplay = useSelectedNodeForDocDisplay();
const updateCurrentDocumentSidebarVisible = useChatSessionStore(
(state) => state.updateCurrentDocumentSidebarVisible
);
const updateCurrentSelectedNodeForDocDisplay = useChatSessionStore(
(state) => state.updateCurrentSelectedNodeForDocDisplay
);
return (
<div className="flex md:flex-row justify-between items-center w-full mt-1 transition-transform duration-300 ease-in-out transform opacity-100">
<TooltipGroup>
<div className="flex items-center gap-x-0.5">
{includeMessageSwitcher && (
<div className="-mx-1">
<MessageSwitcher
currentPage={(currentMessageInd ?? 0) + 1}
totalPages={otherMessagesCanSwitchTo?.length || 0}
handlePrevious={() => {
const prevMessage = getPreviousMessage();
if (prevMessage !== undefined && onMessageSelection) {
onMessageSelection(prevMessage);
}
}}
handleNext={() => {
const nextMessage = getNextMessage();
if (nextMessage !== undefined && onMessageSelection) {
onMessageSelection(nextMessage);
}
}}
/>
</div>
)}
<CopyIconButton
getCopyText={() =>
convertMarkdownTablesToTsv(
removeThinkingTokens(getTextContent(rawPackets)) as string
)
}
getHtmlContent={() => finalAnswerRef.current?.innerHTML || ""}
tertiary
data-testid="AgentMessage/copy-button"
/>
<IconButton
icon={SvgThumbsUp}
onClick={() => onFeedbackClick("like")}
tertiary
transient={isFeedbackTransient("like")}
tooltip={
currentFeedback === "like" ? "Remove Like" : "Good Response"
}
data-testid="AgentMessage/like-button"
/>
<IconButton
icon={SvgThumbsDown}
onClick={() => onFeedbackClick("dislike")}
tertiary
transient={isFeedbackTransient("dislike")}
tooltip={
currentFeedback === "dislike" ? "Remove Dislike" : "Bad Response"
}
data-testid="AgentMessage/dislike-button"
/>
{onRegenerate &&
messageId !== undefined &&
parentMessage &&
llmManager && (
<div data-testid="AgentMessage/regenerate">
<LLMPopover
llmManager={llmManager}
currentModelName={currentModelName}
onSelect={(modelName) => {
const llmDescriptor = parseLlmDescriptor(modelName);
const regenerator = onRegenerate({
messageId,
parentMessage,
});
regenerator(llmDescriptor);
}}
folded
/>
</div>
)}
{nodeId && (citations.length > 0 || documentMap.size > 0) && (
<CitedSourcesToggle
citations={citations}
documentMap={documentMap}
nodeId={nodeId}
onToggle={(toggledNodeId) => {
// Toggle sidebar if clicking on the same message
if (
selectedMessageForDocDisplay === toggledNodeId &&
documentSidebarVisible
) {
updateCurrentDocumentSidebarVisible(false);
updateCurrentSelectedNodeForDocDisplay(null);
} else {
updateCurrentSelectedNodeForDocDisplay(toggledNodeId);
updateCurrentDocumentSidebarVisible(true);
}
}}
/>
)}
</div>
</TooltipGroup>
</div>
);
}

View File

@@ -0,0 +1,377 @@
import {
Packet,
PacketType,
StreamingCitation,
StopReason,
CitationInfo,
SearchToolDocumentsDelta,
FetchToolDocuments,
TopLevelBranching,
Stop,
} from "@/app/chat/services/streamingModels";
import { CitationMap } from "@/app/chat/interfaces";
import { OnyxDocument } from "@/lib/search/interfaces";
import { isActualToolCallPacket } from "@/app/chat/services/packetUtils";
import { parseToolKey } from "@/app/chat/message/messageComponents/toolDisplayHelpers";
// Re-export parseToolKey for consumers that import from this module
export { parseToolKey };
// ============================================================================
// Types
// ============================================================================
export interface ProcessorState {
nodeId: number;
lastProcessedIndex: number;
// Citations
citations: StreamingCitation[];
seenCitationDocIds: Set<string>;
citationMap: CitationMap;
// Documents
documentMap: Map<string, OnyxDocument>;
// Packet grouping
groupedPacketsMap: Map<string, Packet[]>;
seenGroupKeys: Set<string>;
groupKeysWithSectionEnd: Set<string>;
expectedBranches: Map<number, number>;
// Streaming status
finalAnswerComing: boolean;
stopPacketSeen: boolean;
stopReason: StopReason | undefined;
}
export interface GroupedPacket {
turn_index: number;
tab_index: number;
packets: Packet[];
}
export interface ProcessorResult {
citations: StreamingCitation[];
citationMap: CitationMap;
documentMap: Map<string, OnyxDocument>;
groupedPackets: GroupedPacket[];
finalAnswerComing: boolean;
stopPacketSeen: boolean;
stopReason: StopReason | undefined;
expectedBranchesPerTurn: Map<number, number>;
}
// ============================================================================
// State Creation
// ============================================================================
export function createInitialState(nodeId: number): ProcessorState {
return {
nodeId,
lastProcessedIndex: 0,
citations: [],
seenCitationDocIds: new Set(),
citationMap: {},
documentMap: new Map(),
groupedPacketsMap: new Map(),
seenGroupKeys: new Set(),
groupKeysWithSectionEnd: new Set(),
expectedBranches: new Map(),
finalAnswerComing: false,
stopPacketSeen: false,
stopReason: undefined,
};
}
// ============================================================================
// Helper Functions
// ============================================================================
function getGroupKey(packet: Packet): string {
const turnIndex = packet.placement.turn_index;
const tabIndex = packet.placement.tab_index ?? 0;
return `${turnIndex}-${tabIndex}`;
}
function injectSectionEnd(state: ProcessorState, groupKey: string): void {
if (state.groupKeysWithSectionEnd.has(groupKey)) {
return; // Already has SECTION_END
}
const { turn_index, tab_index } = parseToolKey(groupKey);
const syntheticPacket: Packet = {
placement: { turn_index, tab_index },
obj: { type: PacketType.SECTION_END },
};
const existingGroup = state.groupedPacketsMap.get(groupKey);
if (existingGroup) {
existingGroup.push(syntheticPacket);
}
state.groupKeysWithSectionEnd.add(groupKey);
}
/**
* Content packet types that indicate a group has meaningful content to display
*/
const CONTENT_PACKET_TYPES = [
PacketType.MESSAGE_START,
PacketType.SEARCH_TOOL_START,
PacketType.IMAGE_GENERATION_TOOL_START,
PacketType.PYTHON_TOOL_START,
PacketType.CUSTOM_TOOL_START,
PacketType.FETCH_TOOL_START,
PacketType.REASONING_START,
PacketType.DEEP_RESEARCH_PLAN_START,
PacketType.RESEARCH_AGENT_START,
];
function hasContentPackets(packets: Packet[]): boolean {
return packets.some((packet) =>
CONTENT_PACKET_TYPES.includes(packet.obj.type as PacketType)
);
}
/**
* Packet types that indicate final answer content is coming
*/
const FINAL_ANSWER_PACKET_TYPES = [
PacketType.MESSAGE_START,
PacketType.MESSAGE_DELTA,
PacketType.IMAGE_GENERATION_TOOL_START,
PacketType.IMAGE_GENERATION_TOOL_DELTA,
PacketType.PYTHON_TOOL_START,
PacketType.PYTHON_TOOL_DELTA,
];
// ============================================================================
// Packet Handlers
// ============================================================================
function handleTopLevelBranching(state: ProcessorState, packet: Packet): void {
const branchingPacket = packet.obj as TopLevelBranching;
state.expectedBranches.set(
packet.placement.turn_index,
branchingPacket.num_parallel_branches
);
}
function handleTurnTransition(state: ProcessorState, packet: Packet): void {
const currentTurnIndex = packet.placement.turn_index;
// Get all previous turn indices from seen group keys
const previousTurnIndices = new Set(
Array.from(state.seenGroupKeys).map((key) => parseToolKey(key).turn_index)
);
const isNewTurnIndex = !previousTurnIndices.has(currentTurnIndex);
// If we see a new turn_index (not just tab_index), inject SECTION_END for previous groups
if (isNewTurnIndex && state.seenGroupKeys.size > 0) {
state.seenGroupKeys.forEach((prevGroupKey) => {
if (!state.groupKeysWithSectionEnd.has(prevGroupKey)) {
injectSectionEnd(state, prevGroupKey);
}
});
}
}
function handleCitationPacket(state: ProcessorState, packet: Packet): void {
if (packet.obj.type !== PacketType.CITATION_INFO) {
return;
}
const citationInfo = packet.obj as CitationInfo;
// Add to citation map immediately for rendering
state.citationMap[citationInfo.citation_number] = citationInfo.document_id;
// Also add to citations array for CitedSourcesToggle (deduplicated)
if (!state.seenCitationDocIds.has(citationInfo.document_id)) {
state.seenCitationDocIds.add(citationInfo.document_id);
state.citations.push({
citation_num: citationInfo.citation_number,
document_id: citationInfo.document_id,
});
}
}
function handleDocumentPacket(state: ProcessorState, packet: Packet): void {
if (packet.obj.type === PacketType.SEARCH_TOOL_DOCUMENTS_DELTA) {
const docDelta = packet.obj as SearchToolDocumentsDelta;
if (docDelta.documents) {
for (const doc of docDelta.documents) {
if (doc.document_id) {
state.documentMap.set(doc.document_id, doc);
}
}
}
} else if (packet.obj.type === PacketType.FETCH_TOOL_DOCUMENTS) {
const fetchDocuments = packet.obj as FetchToolDocuments;
if (fetchDocuments.documents) {
for (const doc of fetchDocuments.documents) {
if (doc.document_id) {
state.documentMap.set(doc.document_id, doc);
}
}
}
}
}
function handleStreamingStatusPacket(
state: ProcessorState,
packet: Packet
): void {
// Check if final answer is coming
if (FINAL_ANSWER_PACKET_TYPES.includes(packet.obj.type as PacketType)) {
state.finalAnswerComing = true;
}
}
function handleStopPacket(state: ProcessorState, packet: Packet): void {
if (packet.obj.type !== PacketType.STOP || state.stopPacketSeen) {
return;
}
state.stopPacketSeen = true;
// Extract and store the stop reason
const stopPacket = packet.obj as Stop;
state.stopReason = stopPacket.stop_reason;
// Inject SECTION_END for all group keys that don't have one
state.seenGroupKeys.forEach((groupKey) => {
if (!state.groupKeysWithSectionEnd.has(groupKey)) {
injectSectionEnd(state, groupKey);
}
});
}
function handleToolAfterMessagePacket(
state: ProcessorState,
packet: Packet
): void {
// Handles case where we get a Message packet from Claude, and then tool
// calling packets. We use isActualToolCallPacket instead of isToolPacket
// to exclude reasoning packets - reasoning is just the model thinking,
// not an actual tool call that would produce new content.
if (
state.finalAnswerComing &&
!state.stopPacketSeen &&
isActualToolCallPacket(packet)
) {
state.finalAnswerComing = false;
}
}
function addPacketToGroup(
state: ProcessorState,
packet: Packet,
groupKey: string
): void {
const existingGroup = state.groupedPacketsMap.get(groupKey);
if (existingGroup) {
existingGroup.push(packet);
} else {
state.groupedPacketsMap.set(groupKey, [packet]);
}
}
// ============================================================================
// Main Processing Function
// ============================================================================
function processPacket(state: ProcessorState, packet: Packet): void {
if (!packet) return;
// Handle TopLevelBranching packets - these tell us how many parallel branches to expect
if (packet.obj.type === PacketType.TOP_LEVEL_BRANCHING) {
handleTopLevelBranching(state, packet);
// Don't add this packet to any group, it's just metadata
return;
}
// Handle turn transitions (inject SECTION_END for previous groups)
handleTurnTransition(state, packet);
// Track group key
const groupKey = getGroupKey(packet);
state.seenGroupKeys.add(groupKey);
// Track SECTION_END and ERROR packets (both indicate completion)
if (
packet.obj.type === PacketType.SECTION_END ||
packet.obj.type === PacketType.ERROR
) {
state.groupKeysWithSectionEnd.add(groupKey);
}
// Add packet to group
addPacketToGroup(state, packet, groupKey);
// Handle specific packet types
handleCitationPacket(state, packet);
handleDocumentPacket(state, packet);
handleStreamingStatusPacket(state, packet);
handleStopPacket(state, packet);
handleToolAfterMessagePacket(state, packet);
}
export function processPackets(
state: ProcessorState,
rawPackets: Packet[]
): ProcessorState {
// Handle reset (packets array shrunk - upstream replaced with shorter list)
if (state.lastProcessedIndex > rawPackets.length) {
state = createInitialState(state.nodeId);
}
// Process only new packets
for (let i = state.lastProcessedIndex; i < rawPackets.length; i++) {
const packet = rawPackets[i];
if (packet) {
processPacket(state, packet);
}
}
state.lastProcessedIndex = rawPackets.length;
return state;
}
// ============================================================================
// Result Derivation
// ============================================================================
export function getResult(state: ProcessorState): ProcessorResult {
// Build sorted, filtered grouped packets array
// Clone packet arrays to ensure referential changes so downstream memo hooks update
const groupedPackets = Array.from(state.groupedPacketsMap.entries())
.map(([key, packets]) => {
const { turn_index, tab_index } = parseToolKey(key);
return {
turn_index,
tab_index,
packets: [...packets],
};
})
.filter(({ packets }) => hasContentPackets(packets))
.sort((a, b) => {
if (a.turn_index !== b.turn_index) {
return a.turn_index - b.turn_index;
}
return a.tab_index - b.tab_index;
});
return {
citations: state.citations,
citationMap: state.citationMap,
documentMap: state.documentMap,
groupedPackets,
finalAnswerComing: state.finalAnswerComing,
stopPacketSeen: state.stopPacketSeen,
stopReason: state.stopReason,
expectedBranchesPerTurn: state.expectedBranches,
};
}

View File

@@ -0,0 +1,100 @@
import { useRef, useState } from "react";
import { Packet } from "@/app/chat/services/streamingModels";
import {
ProcessorState,
ProcessorResult,
createInitialState,
processPackets,
getResult,
} from "./packetProcessor";
export interface UsePacketProcessorResult extends ProcessorResult {
displayComplete: boolean;
setDisplayComplete: (value: boolean) => void;
// UI override for finalAnswerComing - used when all tools have been displayed
// and we want to show the message content regardless of packet state
setFinalAnswerComingOverride: (value: boolean) => void;
}
/**
* Custom hook for processing streaming packets in AgentMessage.
*
* This hook encapsulates all packet processing logic:
* - Incremental processing (only processes new packets)
* - Citation extraction and deduplication
* - Document accumulation
* - Packet grouping by turn_index and tab_index
* - Streaming status tracking (finalAnswerComing, stopPacketSeen, stopReason)
* - Synthetic SECTION_END injection for graceful tool completion
*
* The hook uses a ref to store the processor state, which allows for:
* - Synchronous access during render
* - Persistence across renders without triggering re-renders
*
* Re-renders are triggered by:
* - Parent updating rawPackets prop (most common)
* - Child calling setDisplayComplete (for animation completion)
*
* @param rawPackets - Array of packets from the streaming response
* @param nodeId - Unique identifier for the message node (used for reset detection)
* @returns ProcessorResult with derived values plus displayComplete state
*/
export function usePacketProcessor(
rawPackets: Packet[],
nodeId: number
): UsePacketProcessorResult {
const stateRef = useRef<ProcessorState>(createInitialState(nodeId));
// displayComplete needs state because it's set from child callback (onComplete)
// which needs to trigger a re-render to show feedback buttons
const [displayComplete, setDisplayComplete] = useState(false);
// UI override for finalAnswerComing - when all tools have been displayed,
// we want to show the message content regardless of packet-derived state
const [finalAnswerComingOverride, setFinalAnswerComingOverride] =
useState(false);
// Reset on nodeId change
if (stateRef.current.nodeId !== nodeId) {
stateRef.current = createInitialState(nodeId);
setDisplayComplete(false);
setFinalAnswerComingOverride(false);
}
// Track previous state to detect transitions
const prevLastProcessed = stateRef.current.lastProcessedIndex;
const prevFinalAnswerComing = stateRef.current.finalAnswerComing;
// Process packets (incremental - only processes new packets)
stateRef.current = processPackets(stateRef.current, rawPackets);
// Detect tool-after-message scenario: if finalAnswerComing went from true to false,
// it means a tool packet arrived after message packets. Reset displayComplete to
// prevent showing feedback buttons while tools are still executing.
if (prevFinalAnswerComing && !stateRef.current.finalAnswerComing) {
setDisplayComplete(false);
}
// Detect stream reset (packets array shrunk) - processPackets resets state internally,
// but we also need to reset React state that lives outside the processor
if (prevLastProcessed > rawPackets.length) {
setDisplayComplete(false);
setFinalAnswerComingOverride(false);
}
// Get derived result
const result = getResult(stateRef.current);
// Combine packet-derived finalAnswerComing with UI override
// UI override is set when all tools have been displayed
const finalAnswerComing =
result.finalAnswerComing || finalAnswerComingOverride;
return {
...result,
finalAnswerComing,
displayComplete,
setDisplayComplete,
setFinalAnswerComingOverride,
};
}