Compare commits

...

2 Commits

Author SHA1 Message Date
trial-danswer
82bdc012cf Got results showing in UI 2025-07-01 18:49:44 -07:00
trial-danswer
94282945f9 Seeing outputs from multiple sources. 2025-07-01 15:27:32 -07:00
11 changed files with 971 additions and 10 deletions

View File

@@ -169,13 +169,20 @@ def process_individual_deep_search(
logger.debug("DivCon Step A2 - Object Source Research - completed for an object")
if state.research_object_results:
research_object_results = state.research_object_results
else:
research_object_results = []
research_object_results.append(
{
"object": object.replace("::", ":: ").capitalize(),
"results": object_research_results,
}
)
return ResearchObjectUpdate(
research_object_results=[
{
"object": object.replace("::", ":: ").capitalize(),
"results": object_research_results,
}
],
research_object_results=research_object_results,
log_messages=[
get_langgraph_node_log_string(
graph_component="main",

View File

@@ -0,0 +1,113 @@
# Naomi Orchestration Graph
The Naomi orchestration graph is designed to execute both basic and kb_search graphs with a decision node that determines which graph to run based on the current execution stage.
## Overview
The graph follows this flow:
1. **Decision Node**: Determines which stage to execute (BASIC, KB_SEARCH, or COMPLETE)
2. **Basic Graph Execution**: Runs the basic search graph and stores results
3. **KB Search Graph Execution**: Runs the knowledge graph search and stores results
4. **Finalization**: Combines results from both graphs into a final answer
5. **Loop Back**: Returns to decision node to determine next steps
## Architecture
### State Management
- `NaomiState`: Main state that tracks current stage and stores results from both graphs
- `ExecutionStage`: Enum defining the three stages (BASIC, KB_SEARCH, COMPLETE)
- Results from each graph are stored separately and combined at the end
### Nodes
- `decision_node`: Determines which graph to execute next
- `execute_basic_graph`: Runs the basic search graph
- `execute_kb_search_graph`: Runs the knowledge graph search
- `finalize_results`: Combines results and creates final answer
### Conditional Edges
- `route_after_decision`: Routes to appropriate execution node based on current stage
- `should_continue`: Determines if graph should continue or end
## Usage
### Basic Usage
```python
from onyx.agents.agent_search.naomi import naomi_graph_builder, NaomiInput
# Build the graph
graph = naomi_graph_builder()
compiled_graph = graph.compile()
# Create input
input_data = NaomiInput()
# Execute with proper config
result = compiled_graph.invoke(input_data, config={"metadata": {"config": config}})
# Access results
final_answer = result.get("final_answer")
basic_results = result.get("basic_results")
kb_search_results = result.get("kb_search_results")
```
### Testing
Run the test script to see the graph in action:
```bash
cd backend/onyx/agents/agent_search/naomi
python test_naomi.py
```
## Execution Flow
1. **Start**: Graph begins with decision node
2. **Stage Check**: Decision node checks current stage
3. **Graph Execution**:
- If BASIC stage: Execute basic graph
- If KB_SEARCH stage: Execute kb_search graph
- If COMPLETE stage: Finalize results
4. **Result Storage**: Results are stored in state
5. **Loop**: Return to decision node
6. **Completion**: When COMPLETE stage has results, graph ends
## Customization
### Modifying Decision Logic
Edit the `decision_node` function in `nodes.py` to implement custom decision logic based on:
- Query complexity
- Previous results quality
- User preferences
- Performance requirements
### Adding New Stages
1. Add new stage to `ExecutionStage` enum in `states.py`
2. Update decision logic in `nodes.py`
3. Add routing logic in `conditional_edges.py`
4. Update graph builder in `graph_builder.py`
### Config Integration
The current implementation is simplified. In production, you'll need to:
- Pass proper config with LLMs and database session
- Handle authentication and permissions
- Implement proper error handling and retry logic
- Add monitoring and logging
## Dependencies
- `langgraph`: For graph construction and execution
- `pydantic`: For state validation
- `onyx.agents.agent_search.basic`: Basic search graph
- `onyx.agents.agent_search.kb_search`: Knowledge graph search
- `onyx.utils.logger`: For logging
## File Structure
```
naomi/
├── __init__.py # Package initialization
├── states.py # State definitions
├── nodes.py # Node functions
├── conditional_edges.py # Conditional edge logic
├── graph_builder.py # Main graph builder
├── test_naomi.py # Test script
└── README.md # This file
```

View File

@@ -0,0 +1,101 @@
from langgraph.graph import END
from langgraph.graph import START
from langgraph.graph import StateGraph
from onyx.agents.agent_search.basic.states import BasicInput
from onyx.agents.agent_search.basic.states import BasicState
from onyx.agents.agent_search.naomi.states import BasicShortOutput
from onyx.agents.agent_search.orchestration.nodes.call_tool import call_tool
from onyx.agents.agent_search.orchestration.nodes.choose_tool import choose_tool
from onyx.agents.agent_search.orchestration.nodes.prepare_tool_input import (
prepare_tool_input,
)
from onyx.utils.logger import setup_logger
logger = setup_logger()
def basic_graph_builder() -> StateGraph:
    """Build the basic tool-calling graph with a trimmed BasicShortOutput schema.

    Flow: prepare_tool_input -> choose_tool -> (call_tool | return_output)
    -> return_output -> END.
    """
    builder = StateGraph(
        state_schema=BasicState, input=BasicInput, output=BasicShortOutput
    )

    ### Add nodes ###
    # Register the four pipeline nodes.
    for node_name, node_action in (
        ("prepare_tool_input", prepare_tool_input),
        ("choose_tool", choose_tool),
        ("call_tool", call_tool),
        ("return_output", return_output),
    ):
        builder.add_node(node=node_name, action=node_action)

    ### Add edges ###
    # Linear prefix up to the tool decision.
    builder.add_edge(start_key=START, end_key="prepare_tool_input")
    builder.add_edge(start_key="prepare_tool_input", end_key="choose_tool")
    # choose_tool either invokes the selected tool or skips straight to output.
    builder.add_conditional_edges(
        "choose_tool", should_continue, ["call_tool", "return_output"]
    )
    builder.add_edge(start_key="call_tool", end_key="return_output")
    builder.add_edge(start_key="return_output", end_key=END)
    return builder
def should_continue(state: BasicState) -> str:
    """Route after choose_tool: run the tool, or go straight to output.

    Returns "call_tool" when a tool was selected, otherwise "return_output".

    Fix: the original returned END when no tool was chosen, but END is not
    among the destinations declared for this conditional edge
    (["call_tool", "return_output"]), and ending early would leave the
    BasicShortOutput keys unpopulated. Routing through return_output keeps
    the graph's output schema satisfied (the tool fields are simply None;
    the basic graph has already streamed its answer in that case).
    """
    return "return_output" if state.tool_choice is None else "call_tool"
def return_output(state: BasicState) -> BasicShortOutput:
    """Project the full BasicState down to the fields BasicShortOutput keeps."""
    short_output = BasicShortOutput(
        tool_choice=state.tool_choice,
        tool_call_output=state.tool_call_output,
    )
    return short_output
if __name__ == "__main__":
    # Manual smoke test: build the basic graph and run it once against a
    # sample query using the shared test config helper.
    from onyx.db.engine.sql_engine import get_session_with_current_tenant
    from onyx.context.search.models import SearchRequest
    from onyx.llm.factory import get_default_llms
    from onyx.agents.agent_search.shared_graph_utils.utils import get_test_config

    graph = basic_graph_builder()
    compiled_graph = graph.compile()
    input = BasicInput(unused=True)
    primary_llm, fast_llm = get_default_llms()
    with get_session_with_current_tenant() as db_session:
        config, _ = get_test_config(
            db_session=db_session,
            primary_llm=primary_llm,
            fast_llm=fast_llm,
            search_request=SearchRequest(query="How does onyx use FastAPI?"),
        )
        # Config is passed through LangGraph's metadata channel; nodes
        # retrieve it via config["metadata"]["config"].
        compiled_graph.invoke(input, config={"metadata": {"config": config}})

View File

@@ -0,0 +1,16 @@
from onyx.agents.agent_search.naomi.states import ExecutionStage
from onyx.agents.agent_search.naomi.states import NaomiState
def route_after_decision(state: NaomiState) -> str:
    """Map the stage chosen by the decision node to the next graph node.

    Raises:
        ValueError: if the state carries a stage outside ExecutionStage.
    """
    stage = state.current_stage
    if stage == ExecutionStage.BASIC:
        return "execute_basic_graph"
    elif stage == ExecutionStage.KB_SEARCH:
        return "execute_kb_search_graph"
    elif stage == ExecutionStage.COMPLETE:
        return "finalize_results"
    raise ValueError(f"Invalid stage: {state.current_stage}")

View File

@@ -0,0 +1,113 @@
from langgraph.graph import END
from langgraph.graph import START
from langgraph.graph import StateGraph
from onyx.agents.agent_search.naomi.conditional_edges import (
route_after_decision,
)
from onyx.agents.agent_search.naomi.nodes.nodes import decision_node
from onyx.agents.agent_search.naomi.nodes.nodes import execute_basic_graph
from onyx.agents.agent_search.naomi.nodes.nodes import execute_kb_search_graph
from onyx.agents.agent_search.naomi.nodes.nodes import finalize_results
from onyx.agents.agent_search.naomi.states import NaomiInput
from onyx.agents.agent_search.naomi.states import NaomiState
from onyx.utils.logger import setup_logger
logger = setup_logger()
def naomi_graph_builder() -> StateGraph:
    """Build the naomi orchestration graph.

    A decision node repeatedly chooses between the basic graph, the
    kb_search graph, and finalization; both executors loop back to the
    decision node, and finalize_results terminates the run.
    """
    builder = StateGraph(
        state_schema=NaomiState,
        input=NaomiInput,
    )

    ### Add nodes ###
    builder.add_node("decision_node", decision_node)
    builder.add_node("execute_basic_graph", execute_basic_graph)
    builder.add_node("execute_kb_search_graph", execute_kb_search_graph)
    builder.add_node("finalize_results", finalize_results)

    ### Add edges ###
    # Every run begins at the decision node.
    builder.add_edge(start_key=START, end_key="decision_node")
    # The decision node routes to whichever node handles the chosen stage.
    builder.add_conditional_edges(
        "decision_node",
        route_after_decision,
        ["execute_basic_graph", "execute_kb_search_graph", "finalize_results"],
    )
    # Both executors return control to the decision node for the next step.
    builder.add_edge(start_key="execute_basic_graph", end_key="decision_node")
    builder.add_edge(start_key="execute_kb_search_graph", end_key="decision_node")
    # Finalization ends the graph.
    builder.add_edge(start_key="finalize_results", end_key=END)
    return builder
if __name__ == "__main__":
    # Manual smoke test: build and run the orchestration graph end-to-end
    # against a sample query.
    from onyx.db.engine.sql_engine import get_session_with_current_tenant
    from onyx.context.search.models import SearchRequest
    from onyx.llm.factory import get_default_llms
    from onyx.agents.agent_search.shared_graph_utils.utils import get_test_config

    graph = naomi_graph_builder()
    compiled_graph = graph.compile()

    # Create test input
    input_data = NaomiInput()

    # Get LLMs and config
    primary_llm, fast_llm = get_default_llms()
    with get_session_with_current_tenant() as db_session:
        config, _ = get_test_config(
            db_session=db_session,
            primary_llm=primary_llm,
            fast_llm=fast_llm,
            search_request=SearchRequest(query="How does onyx use FastAPI?"),
        )
        # Execute the graph; nodes read the config from the metadata channel.
        result = compiled_graph.invoke(
            input_data, config={"metadata": {"config": config}}
        )
        print("Final Answer:", result.get("final_answer", ""))
        print("Basic Results:", result.get("basic_results", {}))
        print("KB Search Results:", result.get("kb_search_results", {}))

View File

@@ -0,0 +1,131 @@
from langgraph.graph import END
from langgraph.graph import START
from langgraph.graph import StateGraph
from onyx.agents.agent_search.kb_search.conditional_edges import (
research_individual_object,
)
from onyx.agents.agent_search.kb_search.conditional_edges import simple_vs_search
from onyx.agents.agent_search.kb_search.nodes.a1_extract_ert import extract_ert
from onyx.agents.agent_search.kb_search.nodes.a2_analyze import analyze
from onyx.agents.agent_search.kb_search.nodes.a3_generate_simple_sql import (
generate_simple_sql,
)
from onyx.agents.agent_search.kb_search.nodes.b1_construct_deep_search_filters import (
construct_deep_search_filters,
)
from onyx.agents.agent_search.kb_search.nodes.b2p_process_individual_deep_search import (
process_individual_deep_search,
)
from onyx.agents.agent_search.kb_search.nodes.b2s_filtered_search import filtered_search
from onyx.agents.agent_search.kb_search.nodes.b3_consolidate_individual_deep_search import (
consolidate_individual_deep_search,
)
from onyx.agents.agent_search.kb_search.nodes.c1_process_kg_only_answers import (
process_kg_only_answers,
)
from onyx.agents.agent_search.kb_search.states import MainInput
from onyx.agents.agent_search.kb_search.states import MainState
from onyx.agents.agent_search.naomi.nodes.generate_answer import generate_answer
from onyx.agents.agent_search.naomi.states import KBShortOutput
from onyx.utils.logger import setup_logger
logger = setup_logger()
def kb_graph_builder() -> StateGraph:
    """Build the knowledge-graph search graph with a trimmed KBShortOutput.

    Mirrors the standard kb_search graph but ends in this package's
    generate_answer node, which returns only the final answer string.
    """
    builder = StateGraph(
        state_schema=MainState,
        input=MainInput,
        output=KBShortOutput,
    )

    ### Add nodes ###
    # Register every node up front; wiring follows below.
    for node_name, node_action in (
        ("extract_ert", extract_ert),
        ("generate_simple_sql", generate_simple_sql),
        ("filtered_search", filtered_search),
        ("analyze", analyze),
        ("construct_deep_search_filters", construct_deep_search_filters),
        ("process_individual_deep_search", process_individual_deep_search),
        ("consolidate_individual_deep_search", consolidate_individual_deep_search),
        ("process_kg_only_answers", process_kg_only_answers),
        ("generate_answer", generate_answer),
    ):
        builder.add_node(node_name, node_action)

    ### Add edges ###
    # Linear prefix: extraction -> analysis -> SQL generation.
    builder.add_edge(start_key=START, end_key="extract_ert")
    builder.add_edge(start_key="extract_ert", end_key="analyze")
    builder.add_edge(start_key="analyze", end_key="generate_simple_sql")
    # Either answer from the SQL result alone or continue into search.
    builder.add_conditional_edges("generate_simple_sql", simple_vs_search)
    builder.add_edge(start_key="process_kg_only_answers", end_key="generate_answer")
    # Deep-search fork: per-object research vs. one filtered search.
    builder.add_conditional_edges(
        source="construct_deep_search_filters",
        path=research_individual_object,
        path_map=["process_individual_deep_search", "filtered_search"],
    )
    builder.add_edge(
        start_key="process_individual_deep_search",
        end_key="consolidate_individual_deep_search",
    )
    builder.add_edge(
        start_key="consolidate_individual_deep_search", end_key="generate_answer"
    )
    builder.add_edge(start_key="filtered_search", end_key="generate_answer")
    builder.add_edge(start_key="generate_answer", end_key=END)
    return builder

View File

@@ -0,0 +1,163 @@
from datetime import datetime
from typing import cast
from langchain_core.runnables import RunnableConfig
from langgraph.types import StreamWriter
from onyx.access.access import get_acl_for_user
from onyx.agents.agent_search.kb_search.graph_utils import rename_entities_in_answer
from onyx.agents.agent_search.kb_search.graph_utils import stream_write_close_steps
from onyx.agents.agent_search.kb_search.ops import research
from onyx.agents.agent_search.kb_search.states import MainOutput
from onyx.agents.agent_search.kb_search.states import MainState
from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.naomi.states import KBShortOutput
from onyx.agents.agent_search.shared_graph_utils.calculations import (
get_answer_generation_documents,
)
from onyx.agents.agent_search.shared_graph_utils.utils import relevance_from_docs
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.chat.models import ExtendedToolResponse
from onyx.configs.kg_configs import KG_RESEARCH_NUM_RETRIEVED_DOCS
from onyx.context.search.enums import SearchType
from onyx.context.search.models import InferenceSection
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.prompts.kg_prompts import OUTPUT_FORMAT_NO_EXAMPLES_PROMPT
from onyx.prompts.kg_prompts import OUTPUT_FORMAT_NO_OVERALL_ANSWER_PROMPT
from onyx.tools.tool_implementations.search.search_tool import IndexFilters
from onyx.tools.tool_implementations.search.search_tool import SearchQueryInfo
from onyx.tools.tool_implementations.search.search_tool import yield_search_responses
from onyx.utils.logger import setup_logger
logger = setup_logger()
def generate_answer(
    state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> KBShortOutput:
    """
    LangGraph node that streams retrieved documents to the UI and builds the
    final KB answer string.

    Fixes vs. original: return annotation corrected to KBShortOutput (the
    function returns KBShortOutput, not MainOutput), a discarded
    `datetime.now()` call removed, and an unreachable elif ladder collapsed
    (research_results is nonempty iff the consolidated string is, so only
    two outcomes were reachable).

    Raises:
        ValueError: if no user or search tool is configured, or neither the
            simple-SQL path nor the deep-research path produced an answer.
    """
    graph_config = cast(GraphConfig, config["metadata"]["config"])
    question = graph_config.inputs.prompt_builder.raw_user_query

    user = (
        graph_config.tooling.search_tool.user
        if graph_config.tooling.search_tool
        else None
    )
    if not user:
        raise ValueError("User is not set")
    search_tool = graph_config.tooling.search_tool
    if search_tool is None:
        raise ValueError("Search tool is not set")

    # Close out previous streams of steps before emitting the main answer.
    # DECLARE STEPS DONE
    stream_write_close_steps(writer)

    ## MAIN ANSWER
    # Identify whether documents have already been retrieved by earlier steps.
    retrieved_docs: list[InferenceSection] = []
    for step_result in state.step_results:
        retrieved_docs += step_result.verified_reranked_documents

    # If still needed, run a search now so results can be sent to the UI.
    if not retrieved_docs and state.source_document_results:
        retrieved_docs = cast(
            list[InferenceSection],
            research(
                question=question,
                kg_entities=[],
                kg_relationships=[],
                kg_sources=state.source_document_results[
                    :KG_RESEARCH_NUM_RETRIEVED_DOCS
                ],
                search_tool=search_tool,
                kg_chunk_id_zero_only=True,
                inference_sections_only=True,
            ),
        )

    answer_generation_documents = get_answer_generation_documents(
        relevant_docs=retrieved_docs,
        context_documents=retrieved_docs,
        original_question_docs=retrieved_docs,
        max_docs=KG_RESEARCH_NUM_RETRIEVED_DOCS,
    )
    relevance_list = relevance_from_docs(
        answer_generation_documents.streaming_documents
    )

    with get_session_with_current_tenant() as graph_db_session:
        user_acl = list(get_acl_for_user(user, graph_db_session))
        for tool_response in yield_search_responses(
            query=question,
            get_retrieved_sections=lambda: answer_generation_documents.context_documents,
            get_final_context_sections=lambda: answer_generation_documents.context_documents,
            search_query_info=SearchQueryInfo(
                predicted_search=SearchType.KEYWORD,
                # The ACL filter only tags the stream; the search already
                # happened and we are just streaming out the results.
                final_filters=IndexFilters(access_control_list=user_acl),
                recency_bias_multiplier=1.0,
            ),
            get_section_relevance=lambda: relevance_list,
            search_tool=search_tool,
        ):
            write_custom_event(
                "tool_response",
                ExtendedToolResponse(
                    id=tool_response.id,
                    response=tool_response.response,
                    level=0,
                    level_question_num=0,  # 0, 0 is the base question
                ),
                writer,
            )

    # Deep-path result; empty string when the deep path was not taken.
    research_results = state.consolidated_research_object_results_str or ""
    # Simple-path result; set only when the simple-SQL path ran.
    introductory_answer = state.query_results_data_str

    # Prefer the simple-path answer, then fall back to the deep-path results.
    if introductory_answer:
        output_format_prompt = rename_entities_in_answer(introductory_answer)
    elif research_results:
        output_format_prompt = rename_entities_in_answer(research_results)
    else:
        raise ValueError("No research results or introductory answer provided")

    return KBShortOutput(
        output=output_format_prompt,
    )

View File

@@ -0,0 +1,244 @@
import json
from typing import cast
from langchain_core.runnables.config import RunnableConfig
from langgraph.types import StreamWriter
from onyx.agents.agent_search.basic.states import BasicInput
from onyx.agents.agent_search.basic.utils import process_llm_stream
from onyx.agents.agent_search.kb_search.states import MainInput as KBMainInput
from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.naomi.basic_graph_builder import basic_graph_builder
from onyx.agents.agent_search.naomi.kb_graph_builder import kb_graph_builder
from onyx.agents.agent_search.naomi.states import ExecutionStage
from onyx.agents.agent_search.naomi.states import NaomiState
from onyx.utils.logger import setup_logger
logger = setup_logger()
def decision_node(state: NaomiState, config: RunnableConfig) -> NaomiState:
    """
    Decide which graph to execute next.

    Asks the primary LLM — constrained by a strict JSON-schema response
    format — to pick BASIC, KB_SEARCH, or COMPLETE based on the user query
    and results gathered so far, then records the choice on the state.

    Fixes vs. original: leftover debug `print`s replaced with logger.debug,
    and the ValueError now reports the invalid value the LLM returned
    (the original raised with `state.current_stage`, hiding it).

    Raises:
        ValueError: if the LLM returns a stage outside the schema's enum.
    """
    logger.info(f"Decision node: current stage is {state.current_stage}")

    # JSON-schema response format forcing the LLM to return exactly one of
    # the three execution stages.
    structured_output_format = {
        "type": "json_schema",
        "json_schema": {
            "name": "execution_stage_enum",
            "schema": {
                "type": "object",
                "properties": {
                    "execution_stage": {
                        "type": "string",
                        "enum": ["BASIC", "KB_SEARCH", "COMPLETE"],
                        "description": "The current execution stage of the process",
                    }
                },
                "required": ["execution_stage"],
                "additionalProperties": False,
            },
            "strict": True,
        },
    }

    agent_config = cast(GraphConfig, config["metadata"]["config"])
    llm = agent_config.tooling.primary_llm
    prompt = f"""You are an agent with access to a basic search tool and a knowledge graph search tool. You are given a question and you need to decide whether you neeed to use a tool and if so, which tool to use. Both tools have access to the same base information (which is currently Linear tickets for the Onyx team). However, basic search performs information retrieval from these sources while using the knowledge graph takes the query and performs a relevant SQL query in a postgres database with tables of identified entities and their relationships. Select the tool you believe will be most effective for answering the question. If both tools could be effective, you should use the basic search tool. Return "BASIC" if you need to use the basic search tool, "KB_SEARCH" if you need to use the knowledge graph search tool, and "COMPLETE" if you don't need to use any tool. After each tool is used, you will see the results of from your previous tool calls. You should use the results of your previous tool calls to decide which tool to use next. "COMPLETE" should only be returned if you have already used at least one of the tools and have no more information to gather.
Here is the query: <START_OF_QUERY>{agent_config.inputs.prompt_builder.raw_user_query}<END_OF_QUERY>
This is the information you've already collected on previous steps:
Basic Search Results:
{[result["short_output"] for result in state.basic_results]}
----------------------------------
KB Search Results:
{state.kb_search_results}
----------------------------------
Please make your selection:
"""
    output = llm.invoke(
        prompt,
        structured_response_format=structured_output_format,
    )
    logger.debug(prompt)

    # With structured_response_format, the content is a JSON string that
    # must be parsed before use.
    parsed_content = json.loads(output.content)
    execution_stage = parsed_content["execution_stage"]
    logger.debug(f"Execution stage selected: {execution_stage}")

    # ExecutionStage is a str-Enum, so comparing the raw string works.
    if execution_stage == ExecutionStage.BASIC:
        state.current_stage = ExecutionStage.BASIC
        logger.info("Moving to BASIC stage")
    elif execution_stage == ExecutionStage.KB_SEARCH:
        state.current_stage = ExecutionStage.KB_SEARCH
        logger.info("Moving to KB_SEARCH stage")
    elif execution_stage == ExecutionStage.COMPLETE:
        state.current_stage = ExecutionStage.COMPLETE
        logger.info("Moving to COMPLETE stage")
    else:
        # Report the value the LLM actually returned, not the old state.
        raise ValueError(f"Invalid stage: {execution_stage}")
    return state
def execute_basic_graph(state: NaomiState, config: RunnableConfig) -> NaomiState:
    """
    Run the compiled basic graph and append its outcome to state.basic_results.

    A successful run records the rendered tool output plus a short summary;
    any failure records an {"error": ..., "status": "failed"} entry instead.
    """
    logger.info("Executing basic graph")
    try:
        # Extract GraphConfig from the RunnableConfig metadata channel.
        graph_config = cast(GraphConfig, config["metadata"]["config"])

        # Build, compile, and invoke the basic graph directly.
        compiled = basic_graph_builder().compile()
        result = compiled.invoke(
            BasicInput(unused=True),
            config={"metadata": {"config": graph_config}},
        )

        tool_call_output = result["tool_call_output"]
        responses = tool_call_output.tool_call_responses
        tool_choice = result["tool_choice"]
        if tool_choice is None:
            raise ValueError("Tool choice is None")

        # Record both the full rendered tool output and its short summary.
        state.basic_results.append(
            {
                "output": tool_choice.tool.build_tool_message_content(*responses),
                "short_output": tool_call_output.tool_call_summary,
                "status": "completed",
            }
        )
        logger.info("Basic graph execution completed")
        state.log_messages.append("Basic graph execution completed")
    except Exception as e:
        logger.error(f"Error executing basic graph: {e}")
        state.basic_results.append({"error": str(e), "status": "failed"})
    return state
def execute_kb_search_graph(state: NaomiState, config: RunnableConfig) -> NaomiState:
    """
    Run the compiled kb_search graph and append its outcome to
    state.kb_search_results: a completed entry holding the output string, or
    a failed entry when the run errors or returns nothing.
    """
    logger.info("Executing kb_search graph")
    try:
        # Extract GraphConfig from the RunnableConfig metadata channel.
        graph_config = cast(GraphConfig, config["metadata"]["config"])

        # Build, compile, and invoke the kb_search graph directly.
        compiled = kb_graph_builder().compile()
        kb_input = KBMainInput(log_messages=[])
        result = compiled.invoke(
            kb_input, config={"metadata": {"config": graph_config}}
        )

        # Record the outcome on the state.
        output = result["output"]
        if output:
            state.kb_search_results.append({"output": output, "status": "completed"})
        else:
            state.kb_search_results.append(
                {"error": "No results found", "status": "failed"}
            )
        logger.info("KB search graph execution completed")
    except Exception as e:
        logger.error(f"Error executing kb_search graph: {e}")
        state.kb_search_results.append({"error": str(e), "status": "failed"})
    return state
def finalize_results(
    state: NaomiState,
    config: RunnableConfig,
    writer: StreamWriter = lambda _: None,
) -> NaomiState:
    """
    Combine the completed results from both graphs into a final answer.

    Streams the answer to the UI and stores the full text on
    state.final_answer.

    Fix vs. original: the original called the LLM twice with the same prompt
    (a blocking `invoke` for final_answer, then a `stream` for the UI),
    doubling cost and latency. This version streams once and keeps the
    merged stream content as the final answer (process_llm_stream returns
    the merged message — NOTE(review): confirm its return type exposes
    `.content` like the invoke result did).
    """
    logger.info("Finalizing results")

    # Collect only the successful results from each graph.
    basic_answers = [
        result.get("short_output", "")
        for result in state.basic_results
        if result.get("status") == "completed"
    ]
    kb_answers = [
        result.get("output", "")
        for result in state.kb_search_results
        if result.get("status") == "completed"
    ]

    agent_config = cast(GraphConfig, config["metadata"]["config"])
    llm = agent_config.tooling.primary_llm
    prompt = f"""You are an agent with access to a basic search tool and a knowledge graph search tool. You are given a question and you need to decide whether you neeed to use a tool and if so, which tool to use. Both tools have access to the same base information (which is currently Linear tickets for the Onyx team). However, basic search performs information retrieval from these sources while using the knowledge graph takes the query and performs a relevant SQL query in a postgres database with tables of identified entities and their relationships. The following is the information you've already collected on previous steps:
Basic Search Results:
{basic_answers}
----------------------------------
KB Search Results:
{kb_answers}
----------------------------------
Here is the original user query: <START_OF_QUERY>{agent_config.inputs.prompt_builder.raw_user_query}<END_OF_QUERY>
Please answer the question based on the information you've collected:
"""
    # Stream once: the tokens go to the writer for the UI, and the merged
    # message is kept as the final answer.
    merged_message = process_llm_stream(
        llm.stream(prompt),
        True,
        writer,
    )
    state.final_answer = merged_message.content
    return state

View File

@@ -0,0 +1,46 @@
from enum import Enum
from operator import add
from typing import Annotated
from typing import Any
from typing import TypedDict
from onyx.agents.agent_search.basic.states import BasicState
from onyx.agents.agent_search.core_state import CoreState
from onyx.agents.agent_search.kb_search.states import MainState as KBMainState
from onyx.agents.agent_search.orchestration.states import ToolCallOutput
from onyx.agents.agent_search.orchestration.states import ToolChoice
class ExecutionStage(str, Enum):
    """Stages of execution for the orchestration graph.

    Inherits from str so members compare equal to their raw string values
    (e.g. ExecutionStage.BASIC == "BASIC"), which lets callers match the
    LLM's JSON output directly against the enum.
    """

    BASIC = "BASIC"
    KB_SEARCH = "KB_SEARCH"
    COMPLETE = "COMPLETE"
class NaomiInput(CoreState):
    """Input state for the naomi orchestration graph"""

    # No fields of its own — inherits everything (e.g. log_messages)
    # from CoreState.
class NaomiOutput(TypedDict):
    """Output schema for the naomi graph: accumulated log lines."""

    log_messages: list[str]
class KBShortOutput(TypedDict):
    """Trimmed kb_search graph output: just the final answer text."""

    output: str
class BasicShortOutput(TypedDict):
    """Trimmed basic-graph output: the chosen tool and its call output.

    Fix: TypedDict does not support default values (PEP 589) — the original
    `= None` assignments were invalid. Both keys are always populated by
    return_output, possibly with None, so they stay required.
    """

    tool_call_output: ToolCallOutput | None
    tool_choice: ToolChoice | None
class NaomiState(NaomiInput, KBMainState, BasicState):
    """Main state for the naomi orchestration graph"""

    # Stage selected by the decision node; the run starts with the basic graph.
    current_stage: ExecutionStage = ExecutionStage.BASIC
    # Optional stash of the original input (not written by any visible node).
    input_state: NaomiInput | None = None
    # Result dicts from each sub-graph; the `add` reducer lets LangGraph
    # merge appended entries across node updates.
    # NOTE(review): the `= []` defaults assume the base states are pydantic
    # models that copy defaults per instance — confirm BasicState/KBMainState
    # are pydantic, otherwise these lists would be shared between instances.
    basic_results: Annotated[list[dict[str, Any]], add] = []
    kb_search_results: Annotated[list[dict[str, Any]], add] = []
    # Combined answer written by finalize_results.
    final_answer: str | None = None

View File

@@ -21,6 +21,8 @@ from onyx.agents.agent_search.deep_search.main.states import (
from onyx.agents.agent_search.kb_search.graph_builder import kb_graph_builder
from onyx.agents.agent_search.kb_search.states import MainInput as KBMainInput
from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.naomi.graph_builder import naomi_graph_builder
from onyx.agents.agent_search.naomi.states import NaomiInput
from onyx.agents.agent_search.shared_graph_utils.utils import get_test_config
from onyx.chat.models import AgentAnswerPiece
from onyx.chat.models import AnswerPacket
@@ -90,7 +92,7 @@ def _parse_agent_event(
def manage_sync_streaming(
compiled_graph: CompiledStateGraph,
config: GraphConfig,
graph_input: BasicInput | MainInput | DCMainInput | KBMainInput,
graph_input: BasicInput | MainInput | DCMainInput | KBMainInput | NaomiInput,
) -> Iterable[StreamEvent]:
message_id = config.persistence.message_id if config.persistence else None
for event in compiled_graph.stream(
@@ -104,7 +106,7 @@ def manage_sync_streaming(
def run_graph(
compiled_graph: CompiledStateGraph,
config: GraphConfig,
input: BasicInput | MainInput | DCMainInput | KBMainInput,
input: BasicInput | MainInput | DCMainInput | KBMainInput | NaomiInput,
) -> AnswerStream:
for event in manage_sync_streaming(
@@ -176,6 +178,25 @@ def run_dc_graph(
return run_graph(compiled_graph, config, input)
def run_naomi_graph(
    config: GraphConfig,
) -> AnswerStream:
    """
    Run the naomi orchestration graph that executes both basic and kb_search graphs.

    Yields a ToolCallKickoff first so the frontend can show a tool
    invocation, then delegates all graph events to run_graph.
    """
    graph = naomi_graph_builder()
    compiled_graph = graph.compile()
    input = NaomiInput(log_messages=[])
    # Send tool call kickoff for frontend
    yield ToolCallKickoff(
        tool_name="naomi_orchestration",
        tool_args={"query": config.inputs.prompt_builder.raw_user_query},
    )
    yield from run_graph(compiled_graph, config, input)
if __name__ == "__main__":
for _ in range(1):
query_start_time = datetime.now()

View File

@@ -12,7 +12,7 @@ from onyx.agents.agent_search.models import GraphTooling
from onyx.agents.agent_search.run_graph import run_agent_search_graph
from onyx.agents.agent_search.run_graph import run_basic_graph
from onyx.agents.agent_search.run_graph import run_dc_graph
from onyx.agents.agent_search.run_graph import run_kb_graph
from onyx.agents.agent_search.run_graph import run_naomi_graph
from onyx.chat.models import AgentAnswerPiece
from onyx.chat.models import AnswerPacket
from onyx.chat.models import AnswerStream
@@ -69,6 +69,7 @@ class Answer:
is_connected: Callable[[], bool] | None = None,
use_agentic_search: bool = False,
) -> None:
self.is_connected: Callable[[], bool] | None = is_connected
self._processed_stream: list[AnswerPacket] | None = None
self._is_cancelled = False
@@ -143,9 +144,12 @@ class Answer:
and self.graph_config.behavior.kg_config_settings.KG_ENABLED
and self.graph_config.inputs.persona.name.startswith("KG Beta")
):
run_langgraph = run_kb_graph
# run_langgraph = run_kb_graph
run_langgraph = run_naomi_graph
print("WE ARE RUNNING KB GRAPH!")
elif self.graph_config.behavior.use_agentic_search:
run_langgraph = run_agent_search_graph
print("WE ARE RUNNING AGENT SEARCH GRAPH!")
elif (
self.graph_config.inputs.persona
and USE_DIV_CON_AGENT
@@ -154,8 +158,10 @@ class Answer:
)
):
run_langgraph = run_dc_graph
print("WE ARE RUNNING DIV CON GRAPH!")
else:
run_langgraph = run_basic_graph
print("WE ARE RUNNING BASIC GRAPH!")
stream = run_langgraph(
self.graph_config,