mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-03-29 19:42:41 +00:00
Compare commits
2 Commits
cli/v0.2.0
...
naomi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
82bdc012cf | ||
|
|
94282945f9 |
@@ -169,13 +169,20 @@ def process_individual_deep_search(
|
||||
|
||||
logger.debug("DivCon Step A2 - Object Source Research - completed for an object")
|
||||
|
||||
if state.research_object_results:
|
||||
research_object_results = state.research_object_results
|
||||
else:
|
||||
research_object_results = []
|
||||
|
||||
research_object_results.append(
|
||||
{
|
||||
"object": object.replace("::", ":: ").capitalize(),
|
||||
"results": object_research_results,
|
||||
}
|
||||
)
|
||||
|
||||
return ResearchObjectUpdate(
|
||||
research_object_results=[
|
||||
{
|
||||
"object": object.replace("::", ":: ").capitalize(),
|
||||
"results": object_research_results,
|
||||
}
|
||||
],
|
||||
research_object_results=research_object_results,
|
||||
log_messages=[
|
||||
get_langgraph_node_log_string(
|
||||
graph_component="main",
|
||||
|
||||
113
backend/onyx/agents/agent_search/naomi/README.md
Normal file
113
backend/onyx/agents/agent_search/naomi/README.md
Normal file
@@ -0,0 +1,113 @@
|
||||
# Naomi Orchestration Graph
|
||||
|
||||
The Naomi orchestration graph is designed to execute both basic and kb_search graphs with a decision node that determines which graph to run based on the current execution stage.
|
||||
|
||||
## Overview
|
||||
|
||||
The graph follows this flow:
|
||||
1. **Decision Node**: Determines which stage to execute (BASIC, KB_SEARCH, or COMPLETE)
|
||||
2. **Basic Graph Execution**: Runs the basic search graph and stores results
|
||||
3. **KB Search Graph Execution**: Runs the knowledge graph search and stores results
|
||||
4. **Finalization**: Combines results from both graphs into a final answer
|
||||
5. **Loop Back**: Returns to decision node to determine next steps
|
||||
|
||||
## Architecture
|
||||
|
||||
### State Management
|
||||
- `NaomiState`: Main state that tracks current stage and stores results from both graphs
|
||||
- `ExecutionStage`: Enum defining the three stages (BASIC, KB_SEARCH, COMPLETE)
|
||||
- Results from each graph are stored separately and combined at the end
|
||||
|
||||
### Nodes
|
||||
- `decision_node`: Determines which graph to execute next
|
||||
- `execute_basic_graph`: Runs the basic search graph
|
||||
- `execute_kb_search_graph`: Runs the knowledge graph search
|
||||
- `finalize_results`: Combines results and creates final answer
|
||||
|
||||
### Conditional Edges
|
||||
- `route_after_decision`: Routes to appropriate execution node based on current stage
|
||||
- `should_continue`: Determines if graph should continue or end
|
||||
|
||||
## Usage
|
||||
|
||||
### Basic Usage
|
||||
```python
|
||||
from onyx.agents.agent_search.naomi import naomi_graph_builder, NaomiInput
|
||||
|
||||
# Build the graph
|
||||
graph = naomi_graph_builder()
|
||||
compiled_graph = graph.compile()
|
||||
|
||||
# Create input
|
||||
input_data = NaomiInput()
|
||||
|
||||
# Execute with proper config
|
||||
result = compiled_graph.invoke(input_data, config={"metadata": {"config": config}})
|
||||
|
||||
# Access results
|
||||
final_answer = result.get("final_answer")
|
||||
basic_results = result.get("basic_results")
|
||||
kb_search_results = result.get("kb_search_results")
|
||||
```
|
||||
|
||||
### Testing
|
||||
Run the test script to see the graph in action:
|
||||
```bash
|
||||
cd backend/onyx/agents/agent_search/naomi
|
||||
python test_naomi.py
|
||||
```
|
||||
|
||||
## Execution Flow
|
||||
|
||||
1. **Start**: Graph begins with decision node
|
||||
2. **Stage Check**: Decision node checks current stage
|
||||
3. **Graph Execution**:
|
||||
- If BASIC stage: Execute basic graph
|
||||
- If KB_SEARCH stage: Execute kb_search graph
|
||||
- If COMPLETE stage: Finalize results
|
||||
4. **Result Storage**: Results are stored in state
|
||||
5. **Loop**: Return to decision node
|
||||
6. **Completion**: When COMPLETE stage has results, graph ends
|
||||
|
||||
## Customization
|
||||
|
||||
### Modifying Decision Logic
|
||||
Edit the `decision_node` function in `nodes.py` to implement custom decision logic based on:
|
||||
- Query complexity
|
||||
- Previous results quality
|
||||
- User preferences
|
||||
- Performance requirements
|
||||
|
||||
### Adding New Stages
|
||||
1. Add new stage to `ExecutionStage` enum in `states.py`
|
||||
2. Update decision logic in `nodes.py`
|
||||
3. Add routing logic in `conditional_edges.py`
|
||||
4. Update graph builder in `graph_builder.py`
|
||||
|
||||
### Config Integration
|
||||
The current implementation is simplified. In production, you'll need to:
|
||||
- Pass proper config with LLMs and database session
|
||||
- Handle authentication and permissions
|
||||
- Implement proper error handling and retry logic
|
||||
- Add monitoring and logging
|
||||
|
||||
## Dependencies
|
||||
|
||||
- `langgraph`: For graph construction and execution
|
||||
- `pydantic`: For state validation
|
||||
- `onyx.agents.agent_search.basic`: Basic search graph
|
||||
- `onyx.agents.agent_search.kb_search`: Knowledge graph search
|
||||
- `onyx.utils.logger`: For logging
|
||||
|
||||
## File Structure
|
||||
|
||||
```
|
||||
naomi/
|
||||
├── __init__.py # Package initialization
|
||||
├── states.py # State definitions
|
||||
├── nodes.py # Node functions
|
||||
├── conditional_edges.py # Conditional edge logic
|
||||
├── graph_builder.py # Main graph builder
|
||||
├── test_naomi.py # Test script
|
||||
└── README.md # This file
|
||||
```
|
||||
101
backend/onyx/agents/agent_search/naomi/basic_graph_builder.py
Normal file
101
backend/onyx/agents/agent_search/naomi/basic_graph_builder.py
Normal file
@@ -0,0 +1,101 @@
|
||||
from langgraph.graph import END
|
||||
from langgraph.graph import START
|
||||
from langgraph.graph import StateGraph
|
||||
|
||||
from onyx.agents.agent_search.basic.states import BasicInput
|
||||
from onyx.agents.agent_search.basic.states import BasicState
|
||||
from onyx.agents.agent_search.naomi.states import BasicShortOutput
|
||||
from onyx.agents.agent_search.orchestration.nodes.call_tool import call_tool
|
||||
from onyx.agents.agent_search.orchestration.nodes.choose_tool import choose_tool
|
||||
from onyx.agents.agent_search.orchestration.nodes.prepare_tool_input import (
|
||||
prepare_tool_input,
|
||||
)
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def basic_graph_builder() -> StateGraph:
    """Build the basic tool-calling graph.

    Flow: prepare_tool_input -> choose_tool -> (call_tool | END) -> return_output.
    `should_continue` returns END when no tool was chosen (the basic graph has
    already streamed the answer), so END must be a legal destination of the
    conditional edge.
    """
    graph = StateGraph(
        state_schema=BasicState, input=BasicInput, output=BasicShortOutput
    )

    ### Add nodes ###

    graph.add_node(
        node="prepare_tool_input",
        action=prepare_tool_input,
    )

    graph.add_node(
        node="choose_tool",
        action=choose_tool,
    )

    graph.add_node(
        node="call_tool",
        action=call_tool,
    )

    graph.add_node(
        node="return_output",
        action=return_output,
    )

    ### Add edges ###

    graph.add_edge(start_key=START, end_key="prepare_tool_input")

    graph.add_edge(start_key="prepare_tool_input", end_key="choose_tool")

    # BUG FIX: should_continue may return END (no tool call), but END was not
    # listed in the allowed destinations, so langgraph would raise an
    # unknown-branch error at runtime whenever no tool was chosen.
    graph.add_conditional_edges(
        "choose_tool", should_continue, ["call_tool", "return_output", END]
    )

    graph.add_edge(
        start_key="call_tool",
        end_key="return_output",
    )

    graph.add_edge(
        start_key="return_output",
        end_key=END,
    )

    return graph
|
||||
|
||||
|
||||
def should_continue(state: BasicState) -> str:
    """Route after tool selection: skip call_tool when no tool was chosen."""
    # No tool call means the basic graph has already streamed the answer,
    # so the run can terminate immediately.
    if state.tool_choice is None:
        return END
    return "call_tool"
|
||||
|
||||
|
||||
def return_output(state: BasicState) -> BasicShortOutput:
    """Package the tool call output and tool choice into the short output schema."""
    short_output = BasicShortOutput(
        tool_choice=state.tool_choice,
        tool_call_output=state.tool_call_output,
    )
    return short_output
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke test: run the compiled basic graph against a fixed query.
    from onyx.db.engine.sql_engine import get_session_with_current_tenant
    from onyx.context.search.models import SearchRequest
    from onyx.llm.factory import get_default_llms
    from onyx.agents.agent_search.shared_graph_utils.utils import get_test_config

    graph = basic_graph_builder()
    compiled_graph = graph.compile()
    # BUG FIX: renamed from `input`, which shadowed the builtin.
    basic_input = BasicInput(unused=True)
    primary_llm, fast_llm = get_default_llms()
    with get_session_with_current_tenant() as db_session:
        config, _ = get_test_config(
            db_session=db_session,
            primary_llm=primary_llm,
            fast_llm=fast_llm,
            search_request=SearchRequest(query="How does onyx use FastAPI?"),
        )
        compiled_graph.invoke(basic_input, config={"metadata": {"config": config}})
|
||||
16
backend/onyx/agents/agent_search/naomi/conditional_edges.py
Normal file
16
backend/onyx/agents/agent_search/naomi/conditional_edges.py
Normal file
@@ -0,0 +1,16 @@
|
||||
from onyx.agents.agent_search.naomi.states import ExecutionStage
|
||||
from onyx.agents.agent_search.naomi.states import NaomiState
|
||||
|
||||
|
||||
def route_after_decision(state: NaomiState) -> str:
    """
    Route to the appropriate node based on the current stage after decision node.
    """
    # Dispatch table: one stage maps to exactly one execution node.
    stage_to_node = {
        ExecutionStage.BASIC: "execute_basic_graph",
        ExecutionStage.KB_SEARCH: "execute_kb_search_graph",
        ExecutionStage.COMPLETE: "finalize_results",
    }
    if state.current_stage not in stage_to_node:
        raise ValueError(f"Invalid stage: {state.current_stage}")
    return stage_to_node[state.current_stage]
|
||||
113
backend/onyx/agents/agent_search/naomi/graph_builder.py
Normal file
113
backend/onyx/agents/agent_search/naomi/graph_builder.py
Normal file
@@ -0,0 +1,113 @@
|
||||
from langgraph.graph import END
|
||||
from langgraph.graph import START
|
||||
from langgraph.graph import StateGraph
|
||||
|
||||
from onyx.agents.agent_search.naomi.conditional_edges import (
|
||||
route_after_decision,
|
||||
)
|
||||
from onyx.agents.agent_search.naomi.nodes.nodes import decision_node
|
||||
from onyx.agents.agent_search.naomi.nodes.nodes import execute_basic_graph
|
||||
from onyx.agents.agent_search.naomi.nodes.nodes import execute_kb_search_graph
|
||||
from onyx.agents.agent_search.naomi.nodes.nodes import finalize_results
|
||||
from onyx.agents.agent_search.naomi.states import NaomiInput
|
||||
from onyx.agents.agent_search.naomi.states import NaomiState
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def naomi_graph_builder() -> StateGraph:
    """
    Build the Naomi orchestration graph.

    A decision node repeatedly routes between the basic graph, the kb_search
    graph, and a finalization step; both execution nodes loop back to the
    decision node, and finalization ends the run.
    """
    graph = StateGraph(state_schema=NaomiState, input=NaomiInput)

    ### Add nodes ###

    graph.add_node("decision_node", decision_node)
    graph.add_node("execute_basic_graph", execute_basic_graph)
    graph.add_node("execute_kb_search_graph", execute_kb_search_graph)
    graph.add_node("finalize_results", finalize_results)

    ### Add edges ###

    # Every iteration begins at the decision node.
    graph.add_edge(start_key=START, end_key="decision_node")

    # The decision node selects which node runs next.
    graph.add_conditional_edges(
        "decision_node",
        route_after_decision,
        ["execute_basic_graph", "execute_kb_search_graph", "finalize_results"],
    )

    # Both execution nodes return control to the decision node so it can
    # re-evaluate the stage.
    graph.add_edge(start_key="execute_basic_graph", end_key="decision_node")
    graph.add_edge(start_key="execute_kb_search_graph", end_key="decision_node")

    # Finalization terminates the graph.
    graph.add_edge(start_key="finalize_results", end_key=END)

    return graph
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke test: build, compile, and run the orchestration graph once.
    from onyx.db.engine.sql_engine import get_session_with_current_tenant
    from onyx.context.search.models import SearchRequest
    from onyx.llm.factory import get_default_llms
    from onyx.agents.agent_search.shared_graph_utils.utils import get_test_config

    compiled_graph = naomi_graph_builder().compile()

    # Create test input
    input_data = NaomiInput()

    # Get LLMs and config
    primary_llm, fast_llm = get_default_llms()

    with get_session_with_current_tenant() as db_session:
        config, _ = get_test_config(
            db_session=db_session,
            primary_llm=primary_llm,
            fast_llm=fast_llm,
            search_request=SearchRequest(query="How does onyx use FastAPI?"),
        )

        # Execute the graph
        result = compiled_graph.invoke(
            input_data, config={"metadata": {"config": config}}
        )

        print("Final Answer:", result.get("final_answer", ""))
        print("Basic Results:", result.get("basic_results", {}))
        print("KB Search Results:", result.get("kb_search_results", {}))
|
||||
131
backend/onyx/agents/agent_search/naomi/kb_graph_builder.py
Normal file
131
backend/onyx/agents/agent_search/naomi/kb_graph_builder.py
Normal file
@@ -0,0 +1,131 @@
|
||||
from langgraph.graph import END
|
||||
from langgraph.graph import START
|
||||
from langgraph.graph import StateGraph
|
||||
|
||||
from onyx.agents.agent_search.kb_search.conditional_edges import (
|
||||
research_individual_object,
|
||||
)
|
||||
from onyx.agents.agent_search.kb_search.conditional_edges import simple_vs_search
|
||||
from onyx.agents.agent_search.kb_search.nodes.a1_extract_ert import extract_ert
|
||||
from onyx.agents.agent_search.kb_search.nodes.a2_analyze import analyze
|
||||
from onyx.agents.agent_search.kb_search.nodes.a3_generate_simple_sql import (
|
||||
generate_simple_sql,
|
||||
)
|
||||
from onyx.agents.agent_search.kb_search.nodes.b1_construct_deep_search_filters import (
|
||||
construct_deep_search_filters,
|
||||
)
|
||||
from onyx.agents.agent_search.kb_search.nodes.b2p_process_individual_deep_search import (
|
||||
process_individual_deep_search,
|
||||
)
|
||||
from onyx.agents.agent_search.kb_search.nodes.b2s_filtered_search import filtered_search
|
||||
from onyx.agents.agent_search.kb_search.nodes.b3_consolidate_individual_deep_search import (
|
||||
consolidate_individual_deep_search,
|
||||
)
|
||||
from onyx.agents.agent_search.kb_search.nodes.c1_process_kg_only_answers import (
|
||||
process_kg_only_answers,
|
||||
)
|
||||
from onyx.agents.agent_search.kb_search.states import MainInput
|
||||
from onyx.agents.agent_search.kb_search.states import MainState
|
||||
from onyx.agents.agent_search.naomi.nodes.generate_answer import generate_answer
|
||||
from onyx.agents.agent_search.naomi.states import KBShortOutput
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def kb_graph_builder() -> StateGraph:
    """
    Build the knowledge-graph search graph.

    Entity/relationship extraction and analysis feed SQL generation; from
    there the flow either answers directly from KG-only results or fans out
    into per-object deep search / filtered search before converging on
    answer generation.
    """
    graph = StateGraph(
        state_schema=MainState,
        input=MainInput,
        output=KBShortOutput,
    )

    ### Add nodes ###

    graph.add_node("extract_ert", extract_ert)
    graph.add_node("generate_simple_sql", generate_simple_sql)
    graph.add_node("filtered_search", filtered_search)
    graph.add_node("analyze", analyze)
    graph.add_node("construct_deep_search_filters", construct_deep_search_filters)
    graph.add_node("process_individual_deep_search", process_individual_deep_search)
    graph.add_node(
        "consolidate_individual_deep_search", consolidate_individual_deep_search
    )
    graph.add_node("process_kg_only_answers", process_kg_only_answers)
    graph.add_node("generate_answer", generate_answer)

    ### Add edges ###

    graph.add_edge(start_key=START, end_key="extract_ert")
    graph.add_edge(start_key="extract_ert", end_key="analyze")
    graph.add_edge(start_key="analyze", end_key="generate_simple_sql")

    # Simple (KG-only) answer path vs. the deeper search paths.
    graph.add_conditional_edges("generate_simple_sql", simple_vs_search)
    graph.add_edge(start_key="process_kg_only_answers", end_key="generate_answer")

    # Deep path: per-object research or filtered search, then consolidation.
    graph.add_conditional_edges(
        source="construct_deep_search_filters",
        path=research_individual_object,
        path_map=["process_individual_deep_search", "filtered_search"],
    )
    graph.add_edge(
        start_key="process_individual_deep_search",
        end_key="consolidate_individual_deep_search",
    )
    graph.add_edge(
        start_key="consolidate_individual_deep_search", end_key="generate_answer"
    )
    graph.add_edge(start_key="filtered_search", end_key="generate_answer")

    graph.add_edge(start_key="generate_answer", end_key=END)

    return graph
|
||||
163
backend/onyx/agents/agent_search/naomi/nodes/generate_answer.py
Normal file
163
backend/onyx/agents/agent_search/naomi/nodes/generate_answer.py
Normal file
@@ -0,0 +1,163 @@
|
||||
from datetime import datetime
|
||||
from typing import cast
|
||||
|
||||
from langchain_core.runnables import RunnableConfig
|
||||
from langgraph.types import StreamWriter
|
||||
|
||||
from onyx.access.access import get_acl_for_user
|
||||
from onyx.agents.agent_search.kb_search.graph_utils import rename_entities_in_answer
|
||||
from onyx.agents.agent_search.kb_search.graph_utils import stream_write_close_steps
|
||||
from onyx.agents.agent_search.kb_search.ops import research
|
||||
from onyx.agents.agent_search.kb_search.states import MainOutput
|
||||
from onyx.agents.agent_search.kb_search.states import MainState
|
||||
from onyx.agents.agent_search.models import GraphConfig
|
||||
from onyx.agents.agent_search.naomi.states import KBShortOutput
|
||||
from onyx.agents.agent_search.shared_graph_utils.calculations import (
|
||||
get_answer_generation_documents,
|
||||
)
|
||||
from onyx.agents.agent_search.shared_graph_utils.utils import relevance_from_docs
|
||||
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
|
||||
from onyx.chat.models import ExtendedToolResponse
|
||||
from onyx.configs.kg_configs import KG_RESEARCH_NUM_RETRIEVED_DOCS
|
||||
from onyx.context.search.enums import SearchType
|
||||
from onyx.context.search.models import InferenceSection
|
||||
from onyx.db.engine.sql_engine import get_session_with_current_tenant
|
||||
from onyx.prompts.kg_prompts import OUTPUT_FORMAT_NO_EXAMPLES_PROMPT
|
||||
from onyx.prompts.kg_prompts import OUTPUT_FORMAT_NO_OVERALL_ANSWER_PROMPT
|
||||
from onyx.tools.tool_implementations.search.search_tool import IndexFilters
|
||||
from onyx.tools.tool_implementations.search.search_tool import SearchQueryInfo
|
||||
from onyx.tools.tool_implementations.search.search_tool import yield_search_responses
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def generate_answer(
    state: MainState, config: RunnableConfig, writer: StreamWriter = lambda _: None
) -> KBShortOutput:
    """
    Final KB-search node: stream the retrieved documents to the UI and
    produce the text answer for the orchestration layer.

    Raises:
        ValueError: if no user / search tool is configured, or if neither a
            simple-path answer nor deep-path research results are available.
    """
    # BUG FIX: removed a dead `datetime.now()` call whose result was discarded,
    # and corrected the return annotation (the function returns KBShortOutput,
    # not MainOutput).

    graph_config = cast(GraphConfig, config["metadata"]["config"])
    question = graph_config.inputs.prompt_builder.raw_user_query

    user = (
        graph_config.tooling.search_tool.user
        if graph_config.tooling.search_tool
        else None
    )

    if not user:
        raise ValueError("User is not set")

    search_tool = graph_config.tooling.search_tool
    if search_tool is None:
        raise ValueError("Search tool is not set")

    # Close out previous streams of steps

    # DECLARE STEPS DONE

    stream_write_close_steps(writer)

    ## MAIN ANSWER

    # identify whether documents have already been retrieved
    retrieved_docs: list[InferenceSection] = []
    for step_result in state.step_results:
        retrieved_docs += step_result.verified_reranked_documents

    # if still needed, get a search done and send the results to the UI
    if not retrieved_docs and state.source_document_results:
        assert graph_config.tooling.search_tool is not None
        retrieved_docs = cast(
            list[InferenceSection],
            research(
                question=question,
                kg_entities=[],
                kg_relationships=[],
                kg_sources=state.source_document_results[
                    :KG_RESEARCH_NUM_RETRIEVED_DOCS
                ],
                search_tool=graph_config.tooling.search_tool,
                kg_chunk_id_zero_only=True,
                inference_sections_only=True,
            ),
        )

    answer_generation_documents = get_answer_generation_documents(
        relevant_docs=retrieved_docs,
        context_documents=retrieved_docs,
        original_question_docs=retrieved_docs,
        max_docs=KG_RESEARCH_NUM_RETRIEVED_DOCS,
    )

    relevance_list = relevance_from_docs(
        answer_generation_documents.streaming_documents
    )

    assert graph_config.tooling.search_tool is not None

    with get_session_with_current_tenant() as graph_db_session:
        user_acl = list(get_acl_for_user(user, graph_db_session))

    for tool_response in yield_search_responses(
        query=question,
        get_retrieved_sections=lambda: answer_generation_documents.context_documents,
        get_final_context_sections=lambda: answer_generation_documents.context_documents,
        search_query_info=SearchQueryInfo(
            predicted_search=SearchType.KEYWORD,
            # acl here is empty, because the search already happened and
            # we are streaming out the results.
            final_filters=IndexFilters(access_control_list=user_acl),
            recency_bias_multiplier=1.0,
        ),
        get_section_relevance=lambda: relevance_list,
        search_tool=graph_config.tooling.search_tool,
    ):
        write_custom_event(
            "tool_response",
            ExtendedToolResponse(
                id=tool_response.id,
                response=tool_response.response,
                level=0,
                level_question_num=0,  # 0, 0 is the base question
            ),
            writer,
        )

    # if deep path was taken:
    consolidated_research_object_results_str = (
        state.consolidated_research_object_results_str
    )
    # reference_results_str = (
    #     state.reference_results_str
    # )  # will not be part of LLM. Manually added to the answer

    # if simple path was taken:
    introductory_answer = state.query_results_data_str  # from simple answer path only

    # BUG FIX: the original chain derived `research_results` from
    # `consolidated_research_object_results_str` (identical truthiness) and
    # then tested both, leaving two branches unreachable. The reachable
    # behavior collapses to: prefer the simple-path answer, otherwise fall
    # back to the deep-path research results.
    if introductory_answer:
        output_format_prompt = rename_entities_in_answer(introductory_answer)
    elif consolidated_research_object_results_str:
        output_format_prompt = rename_entities_in_answer(
            consolidated_research_object_results_str
        )
    else:
        raise ValueError("No research results or introductory answer provided")

    return KBShortOutput(
        output=output_format_prompt,
    )
|
||||
244
backend/onyx/agents/agent_search/naomi/nodes/nodes.py
Normal file
244
backend/onyx/agents/agent_search/naomi/nodes/nodes.py
Normal file
@@ -0,0 +1,244 @@
|
||||
import json
|
||||
from typing import cast
|
||||
|
||||
from langchain_core.runnables.config import RunnableConfig
|
||||
from langgraph.types import StreamWriter
|
||||
|
||||
from onyx.agents.agent_search.basic.states import BasicInput
|
||||
from onyx.agents.agent_search.basic.utils import process_llm_stream
|
||||
from onyx.agents.agent_search.kb_search.states import MainInput as KBMainInput
|
||||
from onyx.agents.agent_search.models import GraphConfig
|
||||
from onyx.agents.agent_search.naomi.basic_graph_builder import basic_graph_builder
|
||||
from onyx.agents.agent_search.naomi.kb_graph_builder import kb_graph_builder
|
||||
from onyx.agents.agent_search.naomi.states import ExecutionStage
|
||||
from onyx.agents.agent_search.naomi.states import NaomiState
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def decision_node(state: NaomiState, config: RunnableConfig) -> NaomiState:
    """
    Decide which graph to execute next (BASIC, KB_SEARCH, or COMPLETE) by
    asking the primary LLM, constrained to the stage enum via structured
    output. Updates `state.current_stage` in place and returns the state.
    """
    logger.info(f"Decision node: current stage is {state.current_stage}")

    # JSON-schema structured output restricting the LLM to the three stages.
    structured_output_format = {
        "type": "json_schema",
        "json_schema": {
            "name": "execution_stage_enum",
            "schema": {
                "type": "object",
                "properties": {
                    "execution_stage": {
                        "type": "string",
                        "enum": ["BASIC", "KB_SEARCH", "COMPLETE"],
                        "description": "The current execution stage of the process",
                    }
                },
                "required": ["execution_stage"],
                "additionalProperties": False,
            },
            "strict": True,
        },
    }

    # For now, we'll implement a simple flow: BASIC -> KB_SEARCH -> COMPLETE
    # This can be enhanced with more sophisticated logic later

    agent_config = cast(GraphConfig, config["metadata"]["config"])
    llm = agent_config.tooling.primary_llm

    prompt = f"""You are an agent with access to a basic search tool and a knowledge graph search tool. You are given a question and you need to decide whether you neeed to use a tool and if so, which tool to use. Both tools have access to the same base information (which is currently Linear tickets for the Onyx team). However, basic search performs information retrieval from these sources while using the knowledge graph takes the query and performs a relevant SQL query in a postgres database with tables of identified entities and their relationships. Select the tool you believe will be most effective for answering the question. If both tools could be effective, you should use the basic search tool. Return "BASIC" if you need to use the basic search tool, "KB_SEARCH" if you need to use the knowledge graph search tool, and "COMPLETE" if you don't need to use any tool. After each tool is used, you will see the results of from your previous tool calls. You should use the results of your previous tool calls to decide which tool to use next. "COMPLETE" should only be returned if you have already used at least one of the tools and have no more information to gather.

Here is the query: <START_OF_QUERY>{agent_config.inputs.prompt_builder.raw_user_query}<END_OF_QUERY>

This is the information you've already collected on previous steps:

Basic Search Results:

{[result["short_output"] for result in state.basic_results]}

----------------------------------

KB Search Results:

{state.kb_search_results}

----------------------------------

Please make your selection:
"""

    output = llm.invoke(
        prompt,
        structured_response_format=structured_output_format,
    )

    # BUG FIX: replaced stray debug print() calls with logger.debug.
    logger.debug("Decision prompt: %s", prompt)

    # When using structured_response_format, the content is a JSON string
    # that needs to be parsed
    parsed_content = json.loads(output.content)
    execution_stage = parsed_content["execution_stage"]
    logger.debug("Execution stage selected: %s", execution_stage)

    # NOTE(review): this compares the raw LLM string against ExecutionStage
    # members; it only matches if ExecutionStage is a str-based enum — confirm.
    if execution_stage == ExecutionStage.BASIC:
        state.current_stage = ExecutionStage.BASIC
        logger.info("Moving to BASIC stage")
    elif execution_stage == ExecutionStage.KB_SEARCH:
        state.current_stage = ExecutionStage.KB_SEARCH
        logger.info("Moving to KB_SEARCH stage")
    elif execution_stage == ExecutionStage.COMPLETE:
        state.current_stage = ExecutionStage.COMPLETE
        logger.info("Moving to COMPLETE stage")
    else:
        # BUG FIX: report the invalid LLM-selected stage, not the (valid)
        # previous stage stored on the state.
        raise ValueError(f"Invalid stage: {execution_stage}")

    return state
|
||||
|
||||
|
||||
def execute_basic_graph(state: NaomiState, config: RunnableConfig) -> NaomiState:
    """
    Run the compiled basic graph once and append its results to the state.

    Failures are recorded on the state (best-effort) rather than propagated.
    """
    logger.info("Executing basic graph")

    try:
        # Pull the shared GraphConfig out of the langgraph RunnableConfig.
        graph_config = cast(GraphConfig, config["metadata"]["config"])

        # Build and compile a fresh basic graph for this invocation.
        compiled_basic_graph = basic_graph_builder().compile()

        basic_input = BasicInput(unused=True)

        result = compiled_basic_graph.invoke(
            basic_input, config={"metadata": {"config": graph_config}}
        )

        tool_call_output = result["tool_call_output"]
        tool_call_responses = tool_call_output.tool_call_responses

        tool_choice = result["tool_choice"]
        if tool_choice is None:
            raise ValueError("Tool choice is None")
        tool = tool_choice.tool

        # Record both the full tool message content and the short summary.
        state.basic_results.append(
            {
                "output": tool.build_tool_message_content(*tool_call_responses),
                "short_output": tool_call_output.tool_call_summary,
                "status": "completed",
            }
        )

        logger.info("Basic graph execution completed")
        state.log_messages.append("Basic graph execution completed")

    except Exception as e:
        logger.error(f"Error executing basic graph: {e}")
        state.basic_results.append({"error": str(e), "status": "failed"})

    return state
|
||||
|
||||
|
||||
def execute_kb_search_graph(state: NaomiState, config: RunnableConfig) -> NaomiState:
    """
    Run the compiled kb_search graph once and append its results to the state.

    Failures are recorded on the state (best-effort) rather than propagated.
    """
    logger.info("Executing kb_search graph")

    try:
        # Pull the shared GraphConfig out of the langgraph RunnableConfig.
        graph_config = cast(GraphConfig, config["metadata"]["config"])

        # Build and compile a fresh kb_search graph for this invocation.
        compiled_kb_graph = kb_graph_builder().compile()

        kb_input = KBMainInput(log_messages=[])

        result = compiled_kb_graph.invoke(
            kb_input, config={"metadata": {"config": graph_config}}
        )

        # Record either the produced answer or a failure marker.
        if out := result["output"]:
            state.kb_search_results.append({"output": out, "status": "completed"})
        else:
            state.kb_search_results.append(
                {"error": "No results found", "status": "failed"}
            )

        logger.info("KB search graph execution completed")

    except Exception as e:
        logger.error(f"Error executing kb_search graph: {e}")
        state.kb_search_results.append({"error": str(e), "status": "failed"})

    return state
|
||||
|
||||
|
||||
def finalize_results(state: NaomiState, config: RunnableConfig, writer: StreamWriter = lambda _: None) -> NaomiState:
    """
    Combine results from both graphs and create the final answer.

    Collects the outputs of the successfully completed basic and kb_search
    runs, asks the primary LLM to synthesize them into an answer to the
    original user query, streams that answer to the frontend via ``writer``,
    and stores the merged text on ``state.final_answer``.

    Args:
        state: The shared naomi orchestration state (mutated in place).
        config: LangGraph runnable config carrying the GraphConfig.
        writer: Stream writer used to emit answer pieces to the frontend.

    Returns:
        The (mutated) state, per the LangGraph node contract.
    """
    logger.info("Finalizing results")

    # Keep only outputs from runs that completed successfully.
    basic_answers = [
        result.get("short_output", "")
        for result in state.basic_results
        if result.get("status") == "completed"
    ]
    kb_answers = [
        result.get("output", "")
        for result in state.kb_search_results
        if result.get("status") == "completed"
    ]

    agent_config = cast(GraphConfig, config["metadata"]["config"])
    llm = agent_config.tooling.primary_llm

    prompt = f"""You are an agent with access to a basic search tool and a knowledge graph search tool. You are given a question and you need to decide whether you neeed to use a tool and if so, which tool to use. Both tools have access to the same base information (which is currently Linear tickets for the Onyx team). However, basic search performs information retrieval from these sources while using the knowledge graph takes the query and performs a relevant SQL query in a postgres database with tables of identified entities and their relationships. The following is the information you've already collected on previous steps:

Basic Search Results:

{basic_answers}

----------------------------------

KB Search Results:

{kb_answers}

----------------------------------

Here is the original user query: <START_OF_QUERY>{agent_config.inputs.prompt_builder.raw_user_query}<END_OF_QUERY>

Please answer the question based on the information you've collected:
"""

    # Call the LLM exactly once. The original implementation called
    # llm.invoke(prompt) AND llm.stream(prompt), paying for two full
    # completions of the same prompt; here the streamed completion is the
    # only one, and its merged content becomes the final answer.
    # NOTE(review): assumes process_llm_stream returns the merged message
    # chunk — confirm against its definition.
    stream = llm.stream(prompt)
    merged_message = process_llm_stream(
        stream,
        True,
        writer,
    )
    state.final_answer = merged_message.content

    return state
|
||||
46
backend/onyx/agents/agent_search/naomi/states.py
Normal file
46
backend/onyx/agents/agent_search/naomi/states.py
Normal file
@@ -0,0 +1,46 @@
|
||||
from enum import Enum
|
||||
from operator import add
|
||||
from typing import Annotated
|
||||
from typing import Any
|
||||
from typing import TypedDict
|
||||
|
||||
from onyx.agents.agent_search.basic.states import BasicState
|
||||
from onyx.agents.agent_search.core_state import CoreState
|
||||
from onyx.agents.agent_search.kb_search.states import MainState as KBMainState
|
||||
from onyx.agents.agent_search.orchestration.states import ToolCallOutput
|
||||
from onyx.agents.agent_search.orchestration.states import ToolChoice
|
||||
|
||||
|
||||
class ExecutionStage(str, Enum):
    """The stage the orchestration graph is currently executing.

    Mixes in ``str`` so stage values serialize/compare as plain strings.
    """

    BASIC = "BASIC"          # run the basic search graph
    KB_SEARCH = "KB_SEARCH"  # run the knowledge-graph search graph
    COMPLETE = "COMPLETE"    # all sub-graphs done; finalize the answer
|
||||
|
||||
|
||||
class NaomiInput(CoreState):
    """Input state accepted by the naomi orchestration graph; adds nothing
    beyond the fields inherited from CoreState."""
|
||||
|
||||
|
||||
class NaomiOutput(TypedDict):
    """Final output payload of the naomi orchestration graph."""

    # Log lines accumulated while the graph ran.
    log_messages: list[str]
|
||||
|
||||
|
||||
class KBShortOutput(TypedDict):
    """Condensed result of a kb_search graph run."""

    # The textual output produced by the kb_search graph.
    output: str
|
||||
|
||||
|
||||
class BasicShortOutput(TypedDict, total=False):
    """Condensed result of a basic graph run.

    TypedDict fields cannot carry default values (PEP 589) — the original
    ``= None`` assignments were never applied at runtime and are rejected by
    type checkers. Both keys are instead made optional via ``total=False``,
    preserving the intended "may be absent / may be None" semantics.
    """

    tool_call_output: ToolCallOutput | None
    tool_choice: ToolChoice | None
|
||||
|
||||
|
||||
class NaomiState(NaomiInput, KBMainState, BasicState):
    """Main state for the naomi orchestration graph"""

    # Which stage the decision node should run next; execution starts with
    # the basic graph.
    current_stage: ExecutionStage = ExecutionStage.BASIC
    # Optional snapshot of the original graph input.
    input_state: NaomiInput | None = None
    # Result dicts appended by the basic-graph node; the `add` reducer in the
    # Annotated metadata tells LangGraph to concatenate concurrent updates.
    # NOTE(review): `[]` as a class-level default is shared across instances
    # unless the base state class copies defaults (e.g. pydantic) — confirm.
    basic_results: Annotated[list[dict[str, Any]], add] = []
    # Result dicts appended by the kb_search node; same reducer semantics.
    kb_search_results: Annotated[list[dict[str, Any]], add] = []
    # Synthesized answer set by finalize_results; None until finalization.
    final_answer: str | None = None
|
||||
@@ -21,6 +21,8 @@ from onyx.agents.agent_search.deep_search.main.states import (
|
||||
from onyx.agents.agent_search.kb_search.graph_builder import kb_graph_builder
|
||||
from onyx.agents.agent_search.kb_search.states import MainInput as KBMainInput
|
||||
from onyx.agents.agent_search.models import GraphConfig
|
||||
from onyx.agents.agent_search.naomi.graph_builder import naomi_graph_builder
|
||||
from onyx.agents.agent_search.naomi.states import NaomiInput
|
||||
from onyx.agents.agent_search.shared_graph_utils.utils import get_test_config
|
||||
from onyx.chat.models import AgentAnswerPiece
|
||||
from onyx.chat.models import AnswerPacket
|
||||
@@ -90,7 +92,7 @@ def _parse_agent_event(
|
||||
def manage_sync_streaming(
|
||||
compiled_graph: CompiledStateGraph,
|
||||
config: GraphConfig,
|
||||
graph_input: BasicInput | MainInput | DCMainInput | KBMainInput,
|
||||
graph_input: BasicInput | MainInput | DCMainInput | KBMainInput | NaomiInput,
|
||||
) -> Iterable[StreamEvent]:
|
||||
message_id = config.persistence.message_id if config.persistence else None
|
||||
for event in compiled_graph.stream(
|
||||
@@ -104,7 +106,7 @@ def manage_sync_streaming(
|
||||
def run_graph(
|
||||
compiled_graph: CompiledStateGraph,
|
||||
config: GraphConfig,
|
||||
input: BasicInput | MainInput | DCMainInput | KBMainInput,
|
||||
input: BasicInput | MainInput | DCMainInput | KBMainInput | NaomiInput,
|
||||
) -> AnswerStream:
|
||||
|
||||
for event in manage_sync_streaming(
|
||||
@@ -176,6 +178,25 @@ def run_dc_graph(
|
||||
return run_graph(compiled_graph, config, input)
|
||||
|
||||
|
||||
def run_naomi_graph(
    config: GraphConfig,
) -> AnswerStream:
    """Compile and run the naomi orchestration graph (basic + kb_search),
    yielding the resulting answer packets."""
    compiled = naomi_graph_builder().compile()
    graph_input = NaomiInput(log_messages=[])

    # Tell the frontend a tool invocation is starting before any graph output.
    kickoff = ToolCallKickoff(
        tool_name="naomi_orchestration",
        tool_args={"query": config.inputs.prompt_builder.raw_user_query},
    )
    yield kickoff

    yield from run_graph(compiled, config, graph_input)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
for _ in range(1):
|
||||
query_start_time = datetime.now()
|
||||
|
||||
@@ -12,7 +12,7 @@ from onyx.agents.agent_search.models import GraphTooling
|
||||
from onyx.agents.agent_search.run_graph import run_agent_search_graph
|
||||
from onyx.agents.agent_search.run_graph import run_basic_graph
|
||||
from onyx.agents.agent_search.run_graph import run_dc_graph
|
||||
from onyx.agents.agent_search.run_graph import run_kb_graph
|
||||
from onyx.agents.agent_search.run_graph import run_naomi_graph
|
||||
from onyx.chat.models import AgentAnswerPiece
|
||||
from onyx.chat.models import AnswerPacket
|
||||
from onyx.chat.models import AnswerStream
|
||||
@@ -69,6 +69,7 @@ class Answer:
|
||||
is_connected: Callable[[], bool] | None = None,
|
||||
use_agentic_search: bool = False,
|
||||
) -> None:
|
||||
|
||||
self.is_connected: Callable[[], bool] | None = is_connected
|
||||
self._processed_stream: list[AnswerPacket] | None = None
|
||||
self._is_cancelled = False
|
||||
@@ -143,9 +144,12 @@ class Answer:
|
||||
and self.graph_config.behavior.kg_config_settings.KG_ENABLED
|
||||
and self.graph_config.inputs.persona.name.startswith("KG Beta")
|
||||
):
|
||||
run_langgraph = run_kb_graph
|
||||
# run_langgraph = run_kb_graph
|
||||
run_langgraph = run_naomi_graph
|
||||
print("WE ARE RUNNING KB GRAPH!")
|
||||
elif self.graph_config.behavior.use_agentic_search:
|
||||
run_langgraph = run_agent_search_graph
|
||||
print("WE ARE RUNNING AGENT SEARCH GRAPH!")
|
||||
elif (
|
||||
self.graph_config.inputs.persona
|
||||
and USE_DIV_CON_AGENT
|
||||
@@ -154,8 +158,10 @@ class Answer:
|
||||
)
|
||||
):
|
||||
run_langgraph = run_dc_graph
|
||||
print("WE ARE RUNNING DIV CON GRAPH!")
|
||||
else:
|
||||
run_langgraph = run_basic_graph
|
||||
print("WE ARE RUNNING BASIC GRAPH!")
|
||||
|
||||
stream = run_langgraph(
|
||||
self.graph_config,
|
||||
|
||||
Reference in New Issue
Block a user