mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-09 17:02:48 +00:00
Compare commits
14 Commits
edge
...
experiment
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8b610581be | ||
|
|
6b1c2368f6 | ||
|
|
77950e4c38 | ||
|
|
bccf033967 | ||
|
|
8decd51862 | ||
|
|
86fc5edf91 | ||
|
|
efe0d0172c | ||
|
|
c7533690b3 | ||
|
|
ac9d0eb7fc | ||
|
|
d20194051b | ||
|
|
1acbac059b | ||
|
|
f8cffc6009 | ||
|
|
3202ca6c96 | ||
|
|
15dadd0d3b |
@@ -0,0 +1,71 @@
|
||||
import os
|
||||
from typing import Any
|
||||
from typing import Optional
|
||||
|
||||
from langchain_core.runnables import RunnableConfig
|
||||
from pydantic import BaseModel
|
||||
from pydantic import Field
|
||||
|
||||
|
||||
class DeepResearchConfiguration(BaseModel):
|
||||
"""The configuration for the deep research agent."""
|
||||
|
||||
query_generator_model: str = Field(
|
||||
default="primary",
|
||||
metadata={
|
||||
"description": "The name of the language model to use for the agent's query generation."
|
||||
},
|
||||
)
|
||||
|
||||
reflection_model: str = Field(
|
||||
default="primary",
|
||||
metadata={
|
||||
"description": "The name of the language model to use for the agent's reflection."
|
||||
},
|
||||
)
|
||||
|
||||
answer_model: str = Field(
|
||||
default="primary",
|
||||
metadata={
|
||||
"description": "The name of the language model to use for the agent's answer."
|
||||
},
|
||||
)
|
||||
|
||||
number_of_initial_queries: int = Field(
|
||||
default=3,
|
||||
metadata={"description": "The number of initial search queries to generate."},
|
||||
)
|
||||
|
||||
max_research_loops: int = Field(
|
||||
default=2,
|
||||
metadata={"description": "The maximum number of research loops to perform."},
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_runnable_config(
|
||||
cls, config: Optional[RunnableConfig] = None
|
||||
) -> "DeepResearchConfiguration":
|
||||
"""Create a Configuration instance from a RunnableConfig."""
|
||||
configurable = (
|
||||
config["configurable"] if config and "configurable" in config else {}
|
||||
)
|
||||
|
||||
# Get raw values from environment or config
|
||||
raw_values: dict[str, Any] = {
|
||||
name: os.environ.get(name.upper(), configurable.get(name))
|
||||
for name in cls.model_fields.keys()
|
||||
}
|
||||
|
||||
# Filter out None values
|
||||
values = {k: v for k, v in raw_values.items() if v is not None}
|
||||
|
||||
return cls(**values)
|
||||
|
||||
|
||||
class DeepPlannerConfiguration(BaseModel):
|
||||
"""The configuration for the deep planner agent."""
|
||||
|
||||
max_steps: int = Field(
|
||||
default=10,
|
||||
metadata={"description": "The maximum number of steps to perform."},
|
||||
)
|
||||
750
backend/onyx/agents/agent_search/deep_research/graph_builder.py
Normal file
750
backend/onyx/agents/agent_search/deep_research/graph_builder.py
Normal file
@@ -0,0 +1,750 @@
|
||||
import random
|
||||
from datetime import datetime
|
||||
from json import JSONDecodeError
|
||||
from pprint import pprint
|
||||
from typing import cast
|
||||
|
||||
from langchain.globals import set_debug
|
||||
from langchain.globals import set_verbose
|
||||
from langchain_core.messages import AIMessage
|
||||
from langchain_core.messages import HumanMessage
|
||||
from langchain_core.runnables import RunnableConfig
|
||||
from langgraph.graph import END
|
||||
from langgraph.graph import START
|
||||
from langgraph.graph import StateGraph
|
||||
from langgraph.types import Send
|
||||
from langgraph.types import StreamWriter
|
||||
|
||||
from onyx.agents.agent_search.deep_research.configuration import (
|
||||
DeepPlannerConfiguration,
|
||||
)
|
||||
from onyx.agents.agent_search.deep_research.configuration import (
|
||||
DeepResearchConfiguration,
|
||||
)
|
||||
from onyx.agents.agent_search.deep_research.prompts import answer_instructions
|
||||
from onyx.agents.agent_search.deep_research.prompts import COMPANY_CONTEXT
|
||||
from onyx.agents.agent_search.deep_research.prompts import COMPANY_NAME
|
||||
from onyx.agents.agent_search.deep_research.prompts import get_current_date
|
||||
from onyx.agents.agent_search.deep_research.prompts import planner_prompt
|
||||
from onyx.agents.agent_search.deep_research.prompts import query_writer_instructions
|
||||
from onyx.agents.agent_search.deep_research.prompts import reflection_instructions
|
||||
from onyx.agents.agent_search.deep_research.prompts import replanner_prompt
|
||||
from onyx.agents.agent_search.deep_research.prompts import task_completion_prompt
|
||||
from onyx.agents.agent_search.deep_research.prompts import task_to_query_prompt
|
||||
from onyx.agents.agent_search.deep_research.states import OnyxSearchState
|
||||
from onyx.agents.agent_search.deep_research.states import OverallState
|
||||
from onyx.agents.agent_search.deep_research.states import PlanExecute
|
||||
from onyx.agents.agent_search.deep_research.states import QueryGenerationState
|
||||
from onyx.agents.agent_search.deep_research.states import ReflectionState
|
||||
from onyx.agents.agent_search.deep_research.tools_and_schemas import Act
|
||||
from onyx.agents.agent_search.deep_research.tools_and_schemas import json_to_pydantic
|
||||
from onyx.agents.agent_search.deep_research.tools_and_schemas import Plan
|
||||
from onyx.agents.agent_search.deep_research.tools_and_schemas import Reflection
|
||||
from onyx.agents.agent_search.deep_research.tools_and_schemas import Response
|
||||
from onyx.agents.agent_search.deep_research.tools_and_schemas import SearchQueryList
|
||||
from onyx.agents.agent_search.deep_research.utils import collate_messages
|
||||
from onyx.agents.agent_search.deep_research.utils import get_research_topic
|
||||
from onyx.agents.agent_search.models import GraphConfig
|
||||
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
|
||||
from onyx.chat.models import AgentAnswerPiece
|
||||
from onyx.chat.models import AnswerStyleConfig
|
||||
from onyx.chat.models import CitationConfig
|
||||
from onyx.chat.models import DocumentPruningConfig
|
||||
from onyx.chat.models import PromptConfig
|
||||
from onyx.context.search.enums import LLMEvaluationType
|
||||
from onyx.context.search.models import InferenceSection
|
||||
from onyx.db.engine import get_session_with_current_tenant
|
||||
from onyx.db.models import Persona
|
||||
from onyx.llm.factory import get_default_llms
|
||||
from onyx.natural_language_processing.utils import get_tokenizer
|
||||
from onyx.natural_language_processing.utils import tokenizer_trim_content
|
||||
from onyx.tools.models import SearchToolOverrideKwargs
|
||||
from onyx.tools.tool_implementations.search.search_tool import (
|
||||
SEARCH_RESPONSE_SUMMARY_ID,
|
||||
)
|
||||
from onyx.tools.tool_implementations.search.search_tool import SearchResponseSummary
|
||||
from onyx.tools.tool_implementations.search.search_tool import SearchTool
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
test_mode = False
|
||||
IS_DEBUG = False
|
||||
IS_VERBOSE = False
|
||||
MAX_RETRIEVED_DOCS = 10
|
||||
|
||||
|
||||
def mock_do_onyx_search(query: str) -> str:
|
||||
random_answers = [
|
||||
"Onyx is a startup founded by Yuhong Sun and Chris Weaver.",
|
||||
"Chris Weaver was born in the country of Wakanda",
|
||||
"Yuhong Sun is the CEO of Onyx",
|
||||
"Yuhong Sun was born in the country of Valhalla",
|
||||
]
|
||||
return {"text": random.choice(random_answers)}
|
||||
|
||||
|
||||
def do_onyx_search(query: str) -> dict[str, str]:
|
||||
"""
|
||||
Perform a search using the SearchTool and return the results.
|
||||
|
||||
Args:
|
||||
query: The search query string
|
||||
|
||||
Returns:
|
||||
Dictionary containing the search results text
|
||||
"""
|
||||
retrieved_docs: list[InferenceSection] = []
|
||||
primary_llm, fast_llm = get_default_llms()
|
||||
try:
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
# Create a default persona with basic settings
|
||||
default_persona = Persona(
|
||||
name="default",
|
||||
chunks_above=2,
|
||||
chunks_below=2,
|
||||
description="Default persona for search",
|
||||
)
|
||||
|
||||
search_tool = SearchTool(
|
||||
db_session=db_session,
|
||||
user=None,
|
||||
persona=default_persona,
|
||||
retrieval_options=None,
|
||||
prompt_config=PromptConfig(
|
||||
system_prompt="You are a helpful assistant.",
|
||||
task_prompt="Answer the user's question based on the provided context.",
|
||||
datetime_aware=True,
|
||||
include_citations=True,
|
||||
),
|
||||
llm=primary_llm,
|
||||
fast_llm=fast_llm,
|
||||
pruning_config=DocumentPruningConfig(),
|
||||
answer_style_config=AnswerStyleConfig(
|
||||
citation_config=CitationConfig(
|
||||
include_citations=True, citation_style="inline"
|
||||
)
|
||||
),
|
||||
evaluation_type=LLMEvaluationType.SKIP,
|
||||
)
|
||||
|
||||
for tool_response in search_tool.run(
|
||||
query=query,
|
||||
override_kwargs=SearchToolOverrideKwargs(
|
||||
force_no_rerank=True,
|
||||
alternate_db_session=db_session,
|
||||
retrieved_sections_callback=None,
|
||||
skip_query_analysis=False,
|
||||
),
|
||||
):
|
||||
if tool_response.id == SEARCH_RESPONSE_SUMMARY_ID:
|
||||
response = cast(SearchResponseSummary, tool_response.response)
|
||||
retrieved_docs = response.top_sections
|
||||
break
|
||||
|
||||
# Combine the retrieved documents into a single text
|
||||
combined_text = "\n\n".join(
|
||||
[doc.combined_content for doc in retrieved_docs[:MAX_RETRIEVED_DOCS]]
|
||||
)
|
||||
return {"text": combined_text}
|
||||
except Exception as e:
|
||||
logger.error(f"Error in do_onyx_search: {e}")
|
||||
return {"text": "Error in search, no results returned"}
|
||||
|
||||
|
||||
def generate_query(state: OverallState, config: RunnableConfig) -> QueryGenerationState:
|
||||
"""
|
||||
LangGraph node that generates a search queries based on the User's question.
|
||||
|
||||
Uses an LLM to create an optimized search query for onyx research based on
|
||||
the User's question.
|
||||
|
||||
Args:
|
||||
state: Current graph state containing the User's question
|
||||
config: Configuration for the runnable
|
||||
|
||||
Returns:
|
||||
Dictionary with state update, including search_query key containing the generated query
|
||||
"""
|
||||
configurable = DeepResearchConfiguration.from_runnable_config(config)
|
||||
|
||||
# check for custom initial search query count
|
||||
if state.get("initial_search_query_count") is None:
|
||||
state["initial_search_query_count"] = configurable.number_of_initial_queries
|
||||
|
||||
primary_llm, fast_llm = get_default_llms()
|
||||
llm = primary_llm if configurable.query_generator_model == "primary" else fast_llm
|
||||
|
||||
# Format the prompt
|
||||
current_date = get_current_date()
|
||||
formatted_prompt = query_writer_instructions.format(
|
||||
current_date=current_date,
|
||||
research_topic=get_research_topic(state["messages"]),
|
||||
number_queries=state["initial_search_query_count"],
|
||||
company_name=COMPANY_NAME,
|
||||
company_context=COMPANY_CONTEXT,
|
||||
user_context=collate_messages(state["messages"]),
|
||||
)
|
||||
|
||||
# Get the LLM response and extract its content
|
||||
llm_response = llm.invoke(formatted_prompt)
|
||||
try:
|
||||
result = json_to_pydantic(llm_response.content, SearchQueryList)
|
||||
return {"query_list": result.query}
|
||||
except JSONDecodeError:
|
||||
return {"query_list": [llm_response.content]}
|
||||
|
||||
|
||||
def continue_to_onyx_research(state: QueryGenerationState) -> OverallState:
|
||||
"""
|
||||
LangGraph node that sends the search queries to the onyx research node.
|
||||
|
||||
This is used to spawn n number of onyx research nodes, one for each search query.
|
||||
"""
|
||||
return [
|
||||
Send("onyx_research", {"search_query": search_query, "id": int(idx)})
|
||||
for idx, search_query in enumerate(state["query_list"])
|
||||
]
|
||||
|
||||
|
||||
def onyx_research(state: OnyxSearchState, config: RunnableConfig) -> OverallState:
|
||||
"""LangGraph node that performs onyx research using onyx search interface.
|
||||
|
||||
Executes an onyx search in combination with an llm.
|
||||
|
||||
Args:
|
||||
state: Current graph state containing the search query and research loop count
|
||||
config: Configuration for the runnable, including any search API settings or llm settings
|
||||
|
||||
Returns:
|
||||
Dictionary with state update, including sources_gathered, research_loop_count, and web_research_results
|
||||
"""
|
||||
# TODO: think about whether we should use any filtered returned results in addition to the final text answer
|
||||
response = do_onyx_search(state["search_query"])
|
||||
|
||||
text = response["text"]
|
||||
sources_gathered = []
|
||||
|
||||
return {
|
||||
"sources_gathered": sources_gathered,
|
||||
"search_query": [state["search_query"]],
|
||||
"onyx_research_result": [text],
|
||||
}
|
||||
|
||||
|
||||
def get_combined_summaries(state: OverallState, llm=None) -> str:
|
||||
if llm is None:
|
||||
_, llm = get_default_llms()
|
||||
|
||||
# Calculate tokens and trim if needed
|
||||
tokenizer = get_tokenizer(
|
||||
provider_type=llm.config.model_provider, model_name=llm.config.model_name
|
||||
)
|
||||
|
||||
# Combine summaries and check token count
|
||||
combined_summaries = "\n\n---\n\n".join(state["onyx_research_result"])
|
||||
combined_summaries = tokenizer_trim_content(
|
||||
content=combined_summaries, desired_length=10000, tokenizer=tokenizer
|
||||
)
|
||||
return combined_summaries
|
||||
|
||||
|
||||
def reflection(state: OverallState, config: RunnableConfig) -> ReflectionState:
|
||||
"""LangGraph node that identifies knowledge gaps and generates potential follow-up queries.
|
||||
|
||||
Analyzes the current summary to identify areas for further research and generates
|
||||
potential follow-up queries. Uses structured output to extract
|
||||
the follow-up query in JSON format.
|
||||
|
||||
Args:
|
||||
state: Current graph state containing the running summary and research topic
|
||||
config: Configuration for the runnable, including LLM settings
|
||||
|
||||
Returns:
|
||||
Dictionary with state update, including search_query key containing the generated follow-up query
|
||||
"""
|
||||
configurable = DeepResearchConfiguration.from_runnable_config(config)
|
||||
# Increment the research loop count and get the reasoning model
|
||||
state["research_loop_count"] = state.get("research_loop_count", 0) + 1
|
||||
|
||||
# Get the LLM to use for token counting
|
||||
primary_llm, fast_llm = get_default_llms()
|
||||
llm = primary_llm if configurable.reflection_model == "primary" else fast_llm
|
||||
combined_summaries = get_combined_summaries(state, llm)
|
||||
|
||||
# Format the prompt
|
||||
# First, collate the messages to give a historical context of the current conversation
|
||||
# Then, produce a concatenation of the onyx research results
|
||||
# Then, pass this to the reflection instructions
|
||||
# Then, the LLM will produce a JSON response with the following fields:
|
||||
# - is_sufficient: boolean indicating if the research is sufficient
|
||||
# - knowledge_gap: string describing the knowledge gap
|
||||
# - follow_up_queries: list of follow-up queries
|
||||
current_date = get_current_date()
|
||||
formatted_prompt = reflection_instructions.format(
|
||||
current_date=current_date,
|
||||
research_topic=get_research_topic(state["messages"]),
|
||||
summaries=combined_summaries,
|
||||
company_name=COMPANY_NAME,
|
||||
company_context=COMPANY_CONTEXT,
|
||||
)
|
||||
|
||||
# Get result from LLM
|
||||
result = json_to_pydantic(llm.invoke(formatted_prompt).content, Reflection)
|
||||
|
||||
# TODO: convert to pydantic here
|
||||
return {
|
||||
"is_sufficient": result.is_sufficient,
|
||||
"knowledge_gap": result.knowledge_gap,
|
||||
"follow_up_queries": result.follow_up_queries,
|
||||
"research_loop_count": state["research_loop_count"],
|
||||
"number_of_ran_queries": len(state["search_query"]),
|
||||
}
|
||||
|
||||
|
||||
def strtobool(val):
|
||||
"""Convert a string representation of truth to true (1) or false (0).
|
||||
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
|
||||
are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
|
||||
'val' is anything else.
|
||||
"""
|
||||
if isinstance(val, bool):
|
||||
return val
|
||||
val = val.lower()
|
||||
if val in ("y", "yes", "t", "true", "on", "1"):
|
||||
return 1
|
||||
elif val in ("n", "no", "f", "false", "off", "0"):
|
||||
return 0
|
||||
else:
|
||||
raise ValueError("invalid truth value %r" % (val,))
|
||||
|
||||
|
||||
def evaluate_research(
|
||||
state: ReflectionState,
|
||||
config: RunnableConfig,
|
||||
) -> OverallState:
|
||||
"""LangGraph routing function that determines the next step in the research flow.
|
||||
|
||||
Controls the research loop by deciding whether to continue gathering information
|
||||
or to finalize the summary based on the configured maximum number of research loops.
|
||||
|
||||
Args:
|
||||
state: Current graph state containing the research loop count
|
||||
config: Configuration for the runnable, including max_research_loops setting
|
||||
|
||||
Returns:
|
||||
String literal indicating the next node to visit ("onyx_research" or "finalize_summary")
|
||||
"""
|
||||
configurable = DeepResearchConfiguration.from_runnable_config(config)
|
||||
max_research_loops = (
|
||||
state.get("max_research_loops")
|
||||
if state.get("max_research_loops") is not None
|
||||
else configurable.max_research_loops
|
||||
)
|
||||
if (
|
||||
strtobool(state["is_sufficient"]) is True
|
||||
or state["research_loop_count"] >= max_research_loops
|
||||
):
|
||||
return "finalize_answer"
|
||||
else:
|
||||
return [
|
||||
Send(
|
||||
"onyx_research",
|
||||
{
|
||||
"search_query": follow_up_query,
|
||||
"id": state["number_of_ran_queries"] + int(idx),
|
||||
},
|
||||
)
|
||||
for idx, follow_up_query in enumerate(state["follow_up_queries"])
|
||||
]
|
||||
|
||||
|
||||
def finalize_answer(state: OverallState, config: RunnableConfig):
|
||||
"""LangGraph node that finalizes the research summary.
|
||||
|
||||
Prepares the final result based on the onyx research results.
|
||||
|
||||
Args:
|
||||
state: Current graph state containing the running summary and sources gathered
|
||||
|
||||
Returns:
|
||||
Dictionary with state update, including running_summary key containing the formatted final summary with sources
|
||||
"""
|
||||
configurable = DeepResearchConfiguration.from_runnable_config(config)
|
||||
answer_model = state.get("answer_model") or configurable.answer_model
|
||||
|
||||
# get the LLM to generate the final answer
|
||||
primary_llm, fast_llm = get_default_llms()
|
||||
llm = primary_llm if answer_model == "primary" else fast_llm
|
||||
combined_summaries = get_combined_summaries(state, llm)
|
||||
|
||||
# Format the prompt
|
||||
current_date = get_current_date()
|
||||
formatted_prompt = answer_instructions.format(
|
||||
current_date=current_date,
|
||||
research_topic=get_research_topic(state["messages"]),
|
||||
summaries=combined_summaries,
|
||||
company_name=COMPANY_NAME,
|
||||
company_context=COMPANY_CONTEXT,
|
||||
user_context=collate_messages(state["messages"]),
|
||||
)
|
||||
|
||||
result = llm.invoke(formatted_prompt)
|
||||
unique_sources = []
|
||||
return {
|
||||
"messages": [AIMessage(content=result.content)],
|
||||
"sources_gathered": unique_sources,
|
||||
}
|
||||
|
||||
|
||||
def deep_research_graph_builder(test_mode: bool = False) -> StateGraph:
|
||||
"""
|
||||
LangGraph graph builder for deep research process.
|
||||
"""
|
||||
|
||||
graph = StateGraph(
|
||||
OverallState,
|
||||
config_schema=DeepResearchConfiguration,
|
||||
)
|
||||
|
||||
### Add nodes ###
|
||||
|
||||
graph.add_node("generate_query", generate_query)
|
||||
graph.add_node("onyx_research", onyx_research)
|
||||
graph.add_node("reflection", reflection)
|
||||
graph.add_node("finalize_answer", finalize_answer)
|
||||
|
||||
# Set the entrypoint as `generate_query`
|
||||
graph.add_edge(START, "generate_query")
|
||||
# Add conditional edge to continue with search queries in a parallel branch
|
||||
graph.add_conditional_edges(
|
||||
"generate_query", continue_to_onyx_research, ["onyx_research"]
|
||||
)
|
||||
# Reflect on the onyx research
|
||||
graph.add_edge("onyx_research", "reflection")
|
||||
# Evaluate the research
|
||||
graph.add_conditional_edges(
|
||||
"reflection", evaluate_research, ["onyx_research", "finalize_answer"]
|
||||
)
|
||||
# Finalize the answer
|
||||
graph.add_edge("finalize_answer", END)
|
||||
|
||||
return graph
|
||||
|
||||
|
||||
def translate_task_to_query(
|
||||
task: str,
|
||||
context=None,
|
||||
company_name=COMPANY_NAME,
|
||||
company_context=COMPANY_CONTEXT,
|
||||
initial_question=None,
|
||||
) -> str:
|
||||
"""
|
||||
LangGraph node that translates a task to a query.
|
||||
"""
|
||||
_, fast_llm = get_default_llms()
|
||||
|
||||
formatted_prompt = task_to_query_prompt.format(
|
||||
initial_question=initial_question,
|
||||
task=task,
|
||||
context=context,
|
||||
company_name=company_name,
|
||||
company_context=company_context,
|
||||
)
|
||||
return fast_llm.invoke(formatted_prompt).content
|
||||
|
||||
|
||||
def is_search_query(query: str) -> bool:
|
||||
terms = [
|
||||
"search",
|
||||
"query",
|
||||
"find",
|
||||
"look up",
|
||||
"look for",
|
||||
"find out",
|
||||
"find information",
|
||||
"find data",
|
||||
"find facts",
|
||||
"find statistics",
|
||||
"find trends",
|
||||
"find insights",
|
||||
"find trends",
|
||||
"find insights",
|
||||
"find trends",
|
||||
"find insights",
|
||||
"gather",
|
||||
"gather information",
|
||||
"gather data",
|
||||
"gather facts",
|
||||
"gather statistics",
|
||||
"gather trends",
|
||||
"gather insights",
|
||||
]
|
||||
query = query.lower()
|
||||
for term in terms:
|
||||
if term in query:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def execute_step(
|
||||
state: PlanExecute, config: RunnableConfig, writer: StreamWriter = lambda _: None
|
||||
):
|
||||
"""
|
||||
LangGraph node that plans the deep research process.
|
||||
"""
|
||||
graph_config = cast(GraphConfig, config["metadata"]["config"])
|
||||
question = graph_config.inputs.prompt_builder.raw_user_query
|
||||
plan = state["plan"]
|
||||
task = plan[0]
|
||||
step_count = state.get("step_count", 0) + 1
|
||||
if is_search_query(task):
|
||||
query = translate_task_to_query(
|
||||
plan[0], context=state["past_steps"], initial_question=question
|
||||
)
|
||||
write_custom_event(
|
||||
"refined_agent_answer",
|
||||
AgentAnswerPiece(
|
||||
answer_piece=" executing a search query with Onyx...",
|
||||
level=0,
|
||||
level_question_num=0,
|
||||
answer_type="agent_level_answer",
|
||||
),
|
||||
writer,
|
||||
)
|
||||
graph = deep_research_graph_builder()
|
||||
compiled_graph = graph.compile(debug=IS_DEBUG)
|
||||
# TODO: use this input_state for the deep research graph
|
||||
# input_state = DeepResearchInput(log_messages=[])
|
||||
|
||||
initial_state = {
|
||||
"messages": [HumanMessage(content=query)],
|
||||
"search_query": [],
|
||||
"onyx_research_result": [],
|
||||
"sources_gathered": [],
|
||||
"initial_search_query_count": 3, # Default value from Configuration
|
||||
"max_research_loops": 2, # State does not seem to pick up this value
|
||||
"research_loop_count": 0,
|
||||
"reasoning_model": "primary",
|
||||
}
|
||||
|
||||
result = compiled_graph.invoke(initial_state)
|
||||
|
||||
return {
|
||||
"past_steps": [(task, query, result["messages"][-1].content)],
|
||||
"step_count": step_count,
|
||||
}
|
||||
else:
|
||||
primary_llm, _ = get_default_llms()
|
||||
formatted_prompt = task_completion_prompt.format(
|
||||
task=task,
|
||||
plan=state["plan"],
|
||||
past_steps=state["past_steps"],
|
||||
company_name=COMPANY_NAME,
|
||||
company_context=COMPANY_CONTEXT,
|
||||
)
|
||||
write_custom_event(
|
||||
"refined_agent_answer",
|
||||
AgentAnswerPiece(
|
||||
answer_piece=" accomplishing the planned task...",
|
||||
level=0,
|
||||
level_question_num=0,
|
||||
answer_type="agent_level_answer",
|
||||
),
|
||||
writer,
|
||||
)
|
||||
response = primary_llm.invoke(formatted_prompt).content
|
||||
return {
|
||||
"past_steps": [(task, "task: " + task, response)],
|
||||
"step_count": step_count,
|
||||
}
|
||||
|
||||
|
||||
def plan_step(
|
||||
state: PlanExecute, config: RunnableConfig, writer: StreamWriter = lambda _: None
|
||||
):
|
||||
"""
|
||||
LangGraph node that replans the deep research process.
|
||||
"""
|
||||
graph_config = cast(GraphConfig, config["metadata"]["config"])
|
||||
question = graph_config.inputs.prompt_builder.raw_user_query
|
||||
|
||||
formatted_prompt = planner_prompt.format(
|
||||
input=question, company_name=COMPANY_NAME, company_context=COMPANY_CONTEXT
|
||||
)
|
||||
primary_llm, _ = get_default_llms()
|
||||
response = primary_llm.invoke(formatted_prompt).content
|
||||
plan = json_to_pydantic(response, Plan)
|
||||
write_custom_event(
|
||||
"refined_agent_answer",
|
||||
AgentAnswerPiece(
|
||||
answer_piece="Generating a plan to answer the user's question... ",
|
||||
level=0,
|
||||
level_question_num=0,
|
||||
answer_type="agent_level_answer",
|
||||
),
|
||||
writer,
|
||||
)
|
||||
return {"plan": plan.steps}
|
||||
|
||||
|
||||
def replan_step(
|
||||
state: PlanExecute, config: RunnableConfig, writer: StreamWriter = lambda _: None
|
||||
):
|
||||
"""
|
||||
LangGraph node that determines if the deep research process should end.
|
||||
"""
|
||||
graph_config = cast(GraphConfig, config["metadata"]["config"])
|
||||
question = graph_config.inputs.prompt_builder.raw_user_query
|
||||
|
||||
formatted_prompt = replanner_prompt.format(
|
||||
input=question,
|
||||
plan=state["plan"],
|
||||
past_steps=state["past_steps"],
|
||||
company_name=COMPANY_NAME,
|
||||
company_context=COMPANY_CONTEXT,
|
||||
)
|
||||
primary_llm, _ = get_default_llms()
|
||||
response = primary_llm.invoke(formatted_prompt).content
|
||||
output = json_to_pydantic(response, Act)
|
||||
# TODO: add a check for time limit too
|
||||
if isinstance(output.action, Response):
|
||||
# Check for canned response, if so, return the answer from the last step
|
||||
if output.action.response == "The final answer to the user's question":
|
||||
return {
|
||||
"response": state["past_steps"][-1][2],
|
||||
}
|
||||
else:
|
||||
return {"response": output.action.response}
|
||||
elif state["step_count"] >= state.get("max_steps", 5):
|
||||
return {
|
||||
"response": f"I've reached the maximum number of step, my best guess is {state['past_steps'][-1][2]}."
|
||||
}
|
||||
else:
|
||||
write_custom_event(
|
||||
"refined_agent_answer",
|
||||
AgentAnswerPiece(
|
||||
answer_piece=" moving on to the next step...",
|
||||
level=0,
|
||||
level_question_num=0,
|
||||
answer_type="agent_level_answer",
|
||||
),
|
||||
writer,
|
||||
)
|
||||
|
||||
return {"plan": output.action.steps}
|
||||
|
||||
|
||||
def should_end(
|
||||
state: PlanExecute, config: RunnableConfig, writer: StreamWriter = lambda _: None
|
||||
):
|
||||
if "response" in state and state["response"]:
|
||||
write_custom_event(
|
||||
"refined_agent_answer",
|
||||
AgentAnswerPiece(
|
||||
answer_piece=state["response"],
|
||||
level=0,
|
||||
level_question_num=0,
|
||||
answer_type="agent_level_answer",
|
||||
),
|
||||
writer,
|
||||
)
|
||||
return END
|
||||
else:
|
||||
return "agent"
|
||||
|
||||
|
||||
def deep_planner_graph_builder(test_mode: bool = False) -> StateGraph:
|
||||
"""
|
||||
LangGraph graph builder for deep planner process.
|
||||
"""
|
||||
workflow = StateGraph(PlanExecute, config_schema=DeepPlannerConfiguration)
|
||||
|
||||
# Add the plan node
|
||||
workflow.add_node("planner", plan_step)
|
||||
|
||||
# Add the execution step
|
||||
workflow.add_node("agent", execute_step)
|
||||
|
||||
# Add a replan node
|
||||
workflow.add_node("replan", replan_step)
|
||||
|
||||
workflow.add_edge(START, "planner")
|
||||
|
||||
# From plan we go to agent
|
||||
workflow.add_edge("planner", "agent")
|
||||
|
||||
# From agent, we replan
|
||||
workflow.add_edge("agent", "replan")
|
||||
|
||||
workflow.add_conditional_edges(
|
||||
"replan",
|
||||
should_end,
|
||||
["agent", END],
|
||||
)
|
||||
return workflow
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Initialize the SQLAlchemy engine first
|
||||
from onyx.db.engine import SqlEngine
|
||||
|
||||
SqlEngine.init_engine(
|
||||
pool_size=5, # You can adjust these values based on your needs
|
||||
max_overflow=10,
|
||||
app_name="graph_builder",
|
||||
)
|
||||
|
||||
# Set the debug and verbose flags for Langchain/Langgraph
|
||||
set_debug(IS_DEBUG)
|
||||
set_verbose(IS_VERBOSE)
|
||||
|
||||
# Compile the graph
|
||||
query_start_time = datetime.now()
|
||||
logger.debug(f"Start at {query_start_time}")
|
||||
graph = deep_planner_graph_builder()
|
||||
compiled_graph = graph.compile(debug=IS_DEBUG)
|
||||
query_end_time = datetime.now()
|
||||
logger.debug(f"Graph compiled in {query_end_time - query_start_time} seconds")
|
||||
|
||||
queries = [
|
||||
"What is the capital of France?",
|
||||
"What is Onyx?",
|
||||
"Who are the founders of Onyx?",
|
||||
"Who is the CEO of Onyx?",
|
||||
"Where was the CEO of Onyx born?",
|
||||
"What is the highest contract value for last month?",
|
||||
"What is the most expensive component of our technical pipeline so far?",
|
||||
"Who are top 5 competitors who are not US based?",
|
||||
"What companies should we focus on to maximize our revenue?",
|
||||
"What are some of the biggest problems for our customers and their potential solutions?",
|
||||
]
|
||||
|
||||
hard_queries = [
|
||||
"Where was the CEO of Onyx born?",
|
||||
"Who are top 5 competitors who are not US based?",
|
||||
"What companies should we focus on to maximize our revenue?",
|
||||
"What are some of the biggest problems for our customers and their potential solutions?",
|
||||
]
|
||||
|
||||
for query in hard_queries:
|
||||
# Create the initial state with all required fields
|
||||
initial_state = {
|
||||
"input": [HumanMessage(content=query)],
|
||||
"plan": [],
|
||||
"past_steps": [],
|
||||
"response": "",
|
||||
"max_steps": 10,
|
||||
"step_count": 0,
|
||||
}
|
||||
|
||||
result = compiled_graph.invoke(initial_state)
|
||||
print("Max planning loops: ", result["max_steps"])
|
||||
print("Steps: ", result["step_count"])
|
||||
print("Past steps: ")
|
||||
pprint(result["past_steps"], indent=4)
|
||||
print("Question: ", query)
|
||||
print("Answer: ", result["response"])
|
||||
print("--------------------------------")
|
||||
# from pdb import set_trace
|
||||
# set_trace()
|
||||
228
backend/onyx/agents/agent_search/deep_research/prompts.py
Normal file
228
backend/onyx/agents/agent_search/deep_research/prompts.py
Normal file
@@ -0,0 +1,228 @@
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
# Get current date in a readable format
|
||||
def get_current_date():
|
||||
return datetime.now().strftime("%B %d, %Y")
|
||||
|
||||
|
||||
COMPANY_NAME = "Onyx"
|
||||
|
||||
COMPANY_CONTEXT = """
|
||||
Our company is Onyx, a startup founded by Yuhong Sun and Chris Weaver. Onyx is a startup that provides a platform for
|
||||
automated research and analysis using AI and LLMss. The current CEO of Onyx is Yuhong Sun. The company is based in San
|
||||
Francisco, CA.
|
||||
""" # noqa: W291
|
||||
|
||||
query_writer_instructions = """Your goal is to generate sophisticated and diverse search queries for an internal search
|
||||
tool. These queries are intended for an advanced automated research tool capable of analyzing complex results and synthesizing
|
||||
information. The search tool has access to internal documents and information for {company_name}.
|
||||
|
||||
Here is some context about our company:
|
||||
{company_context}
|
||||
|
||||
Instructions:
|
||||
- Always prefer a single search query, only add another query if the original question requests multiple aspects or elements
|
||||
and one query is not enough.
|
||||
- Each query should focus on one specific aspect of the original question.
|
||||
- Don't produce more than {number_queries} queries.
|
||||
- Queries should be diverse, if the topic is broad, generate more than 1 query.
|
||||
- Don't generate multiple similar queries, 1 is enough.
|
||||
- Query should ensure that the most current information is gathered. The current date is {current_date}.
|
||||
- Query should be concise and contain relevant information to the company, the task, and the context.
|
||||
- Unless the task is general, the query should be specific to the company, the task, and the context.
|
||||
|
||||
Format:
|
||||
- Format your response as a JSON object with ALL three of these exact keys:
|
||||
- "rationale": Brief explanation of why these queries are relevant
|
||||
- "query": A list of search queries
|
||||
|
||||
Example:
|
||||
|
||||
Topic: What revenue grew more last year apple stock or the number of people buying an iphone
|
||||
```json
|
||||
{{
|
||||
"rationale": "To answer this comparative growth question accurately, we need specific data points on Apple's stock
|
||||
performance and iPhone sales metrics. These queries target the precise financial information needed: company revenue
|
||||
trends, product-specific unit sales figures, and stock price movement over the same fiscal period for direct comparison.",
|
||||
"query": [
|
||||
"Apple total revenue growth fiscal year 2024",
|
||||
"iPhone unit sales growth fiscal year 2024",
|
||||
"Apple stock price growth fiscal year 2024",
|
||||
],
|
||||
}}
|
||||
```
|
||||
|
||||
Context: {user_context}""" # noqa: W291
|
||||
|
||||
|
||||
reflection_instructions = """You are an expert research assistant analyzing summaries about "{research_topic}" for {company_name}.
|
||||
|
||||
Here is some context about our company:
|
||||
{company_context}
|
||||
|
||||
Instructions:
|
||||
- Identify knowledge gaps or areas that need deeper exploration and generate a follow-up query. (1 or multiple).
|
||||
- If provided summaries are sufficient to answer the user's question, don't generate a follow-up query.
|
||||
- If there is a knowledge gap, generate a follow-up query that would help expand your understanding.
|
||||
- Focus on technical details, implementation specifics, or emerging trends that weren't fully covered.
|
||||
|
||||
Requirements:
|
||||
- Ensure the follow-up query is self-contained and includes necessary context for onyx search, an internal search tool, that
|
||||
has access to internal documents and information.
|
||||
|
||||
Output Format:
|
||||
- Format your response as a JSON object with these exact keys:
|
||||
- "is_sufficient": true or false
|
||||
- "knowledge_gap": Describe what information is missing or needs clarification
|
||||
- "follow_up_queries": Write a specific question to address this gap
|
||||
|
||||
Example:
|
||||
```json
|
||||
{{
|
||||
"is_sufficient": true, // or false
|
||||
"knowledge_gap": "The summary lacks information about performance benchmarks", // "" if is_sufficient is true
|
||||
"follow_up_queries":
|
||||
["What are typical performance benchmarks used to evaluate [specific technology]?"] // [] if is_sufficient is true
|
||||
}}
|
||||
```
|
||||
|
||||
Reflect carefully on the Summaries to identify knowledge gaps and produce a follow-up query.
|
||||
Then, produce your output following this JSON format:
|
||||
|
||||
Summaries:
|
||||
{summaries}
|
||||
"""
|
||||
|
||||
answer_instructions = """You are an expert research assistant analyzing summaries about "{research_topic}" for {company_name}.
|
||||
|
||||
Here is some context about our company:
|
||||
{company_context}
|
||||
|
||||
Generate a high-quality answer to the user's question based on the provided summaries.
|
||||
|
||||
Instructions:
|
||||
- The current date is {current_date}.
|
||||
- You are the final step of a multi-step research process, don't mention that you are the final step.
|
||||
- You have access to all the information gathered from the previous steps.
|
||||
- You have access to the user's question.
|
||||
- Generate a high-quality answer to the user's question based on the provided summaries and the user's question.
|
||||
- You MUST include all the citations from the summaries in the answer correctly.
|
||||
- Only use the information from the summaries to answer the question, do not make up information or speculate.
|
||||
|
||||
User Context:
|
||||
- {user_context}
|
||||
|
||||
Summaries:
|
||||
{summaries}"""
|
||||
|
||||
planner_prompt = """You are an expert research assistant for {company_name}.
|
||||
|
||||
Here is some context about our company:
|
||||
{company_context}
|
||||
|
||||
For this given objective, come up with a simple step by step plan.
|
||||
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps.
|
||||
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.
|
||||
|
||||
Example:
|
||||
Input: What's the population of the city or town where the CEO of Onyx was born?
|
||||
```json
|
||||
{{
|
||||
"steps": ["Step 1: Search for the CEO of Onyx", "Step 2: Search for the birthplace of the CEO", "Step 3: Find the population of that town or city"]
|
||||
}}
|
||||
```
|
||||
|
||||
Your objective was this:
|
||||
{input}
|
||||
""" # noqa: E501
|
||||
|
||||
|
||||
replanner_prompt = """You are an expert research assistant for {company_name}.
|
||||
|
||||
Here is some context about our company:
|
||||
{company_context}
|
||||
|
||||
For the given objective, come up with a simple step by step plan.
|
||||
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps.
|
||||
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.
|
||||
|
||||
Your objective was this:
|
||||
{input}
|
||||
|
||||
Your original plan was this:
|
||||
{plan}
|
||||
|
||||
You have currently done the follow steps:
|
||||
{past_steps}
|
||||
|
||||
Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan.
|
||||
|
||||
Plan structure:
|
||||
{{
|
||||
"steps": ["Step 1", "Step 2", "Step 3]
|
||||
}}
|
||||
|
||||
Response structure:
|
||||
{{
|
||||
"response": "The final answer to the user's question"
|
||||
}}
|
||||
|
||||
Example Output structure with the plan:
|
||||
{{
|
||||
"action": {{
|
||||
"steps": ["Step 1", "Step 2", "Step 3"]
|
||||
}}
|
||||
}}
|
||||
|
||||
Example Output structure with the response:
|
||||
{{
|
||||
"action": {{
|
||||
"response": "The final answer to the user's question" // This should be the answer from the last step
|
||||
}}
|
||||
}}
|
||||
|
||||
Output:
|
||||
{{
|
||||
"action": object // "Response" | "Plan"
|
||||
}}
|
||||
""" # noqa: E501
|
||||
|
||||
|
||||
task_to_query_prompt = """
|
||||
You are an expert research assistant for {company_name}.
|
||||
|
||||
Here is some context about our company:
|
||||
{company_context}
|
||||
|
||||
Given the task and and the context, generate a query to search for the information to answer the task. Return a single query text,
|
||||
make sure that the query is concise and contain relevant information to the company, the task, and the context.
|
||||
|
||||
Initial question: {initial_question}
|
||||
|
||||
Task: {task}
|
||||
|
||||
Information that we have also gathered so far: {context}
|
||||
|
||||
Query:
|
||||
"""
|
||||
|
||||
|
||||
task_completion_prompt = """You are an expert research assistant for {company_name}.
|
||||
|
||||
Here is some context about our company:
|
||||
{company_context}
|
||||
|
||||
For the given task, try to accomplish it in a thorough and return well formed answer.
|
||||
|
||||
Your current task is this:
|
||||
{task}
|
||||
|
||||
Your original plan was this:
|
||||
{plan}
|
||||
|
||||
Here is the context of the tasks:
|
||||
{past_steps}
|
||||
|
||||
Answer:
|
||||
""" # noqa: E501
|
||||
64
backend/onyx/agents/agent_search/deep_research/states.py
Normal file
64
backend/onyx/agents/agent_search/deep_research/states.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import operator
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import field
|
||||
from typing import Annotated
|
||||
from typing import List
|
||||
from typing import Tuple
|
||||
from typing import TypedDict
|
||||
|
||||
from langgraph.graph import add_messages
|
||||
|
||||
from onyx.agents.agent_search.core_state import CoreState
|
||||
|
||||
|
||||
class OverallState(TypedDict):
|
||||
messages: Annotated[list, add_messages]
|
||||
search_query: Annotated[list, operator.add]
|
||||
onyx_research_result: Annotated[list, operator.add]
|
||||
sources_gathered: Annotated[list, operator.add]
|
||||
initial_search_query_count: int
|
||||
max_research_loops: int
|
||||
research_loop_count: int
|
||||
reasoning_model: str
|
||||
|
||||
|
||||
class ReflectionState(TypedDict):
|
||||
is_sufficient: bool
|
||||
knowledge_gap: str
|
||||
follow_up_queries: Annotated[list, operator.add]
|
||||
research_loop_count: int
|
||||
number_of_ran_queries: int
|
||||
|
||||
|
||||
class Query(TypedDict):
|
||||
query: str
|
||||
rationale: str
|
||||
|
||||
|
||||
class QueryGenerationState(TypedDict):
|
||||
query_list: list[Query]
|
||||
|
||||
|
||||
class OnyxSearchState(TypedDict):
|
||||
search_query: str
|
||||
id: str
|
||||
|
||||
|
||||
class PlanExecute(TypedDict):
|
||||
input: str
|
||||
plan: List[str]
|
||||
past_steps: Annotated[List[Tuple], operator.add]
|
||||
response: str
|
||||
max_steps: int
|
||||
step_count: int
|
||||
|
||||
|
||||
@dataclass(kw_only=True)
|
||||
class SearchStateOutput:
|
||||
running_summary: str = field(default=None) # Final report
|
||||
|
||||
|
||||
class DeepResearchInput(CoreState):
|
||||
pass
|
||||
@@ -0,0 +1,94 @@
|
||||
import json
|
||||
from typing import List
|
||||
from typing import Type
|
||||
from typing import TypeVar
|
||||
from typing import Union
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import Field
|
||||
from pydantic import ValidationError
|
||||
|
||||
T = TypeVar("T", bound=BaseModel)
|
||||
|
||||
|
||||
class SearchQueryList(BaseModel):
|
||||
query: List[str] = Field(
|
||||
description="A list of search queries to be used for Onyx research."
|
||||
)
|
||||
rationale: str = Field(
|
||||
description="A brief explanation of why these queries are relevant to the research topic."
|
||||
)
|
||||
|
||||
|
||||
class Reflection(BaseModel):
|
||||
is_sufficient: bool = Field(
|
||||
description="Whether the provided summaries are sufficient to answer the user's question."
|
||||
)
|
||||
knowledge_gap: str = Field(
|
||||
description="A description of what information is missing or needs clarification."
|
||||
)
|
||||
follow_up_queries: List[str] = Field(
|
||||
description="A list of follow-up queries to address the knowledge gap."
|
||||
)
|
||||
|
||||
|
||||
class Plan(BaseModel):
|
||||
"""Plan to follow in future"""
|
||||
|
||||
steps: List[str] = Field(
|
||||
description="different steps to follow, should be in sorted order"
|
||||
)
|
||||
|
||||
|
||||
class Response(BaseModel):
|
||||
"""Response to user."""
|
||||
|
||||
response: str
|
||||
|
||||
|
||||
class Act(BaseModel):
|
||||
"""Action to perform."""
|
||||
|
||||
action: Union[Response, Plan] = Field(
|
||||
description="Action to perform. If you want to respond to user, use Response. "
|
||||
"If you need to further use tools to get the answer, use Plan."
|
||||
)
|
||||
|
||||
|
||||
def json_to_pydantic(json_string: str, pydantic_class: Type[T]) -> T:
|
||||
"""
|
||||
Convert a JSON string to a Pydantic model instance.
|
||||
|
||||
Args:
|
||||
json_string: JSON string to parse
|
||||
pydantic_class: Pydantic model class to instantiate
|
||||
|
||||
Returns:
|
||||
Instance of the pydantic_class
|
||||
|
||||
Raises:
|
||||
json.JSONDecodeError: If json_string is invalid JSON
|
||||
ValidationError: If JSON data doesn't match the Pydantic model schema
|
||||
TypeError: If pydantic_class is not a Pydantic model
|
||||
"""
|
||||
# Validate that the class is a Pydantic model
|
||||
if not (isinstance(pydantic_class, type) and issubclass(pydantic_class, BaseModel)):
|
||||
raise TypeError(
|
||||
f"pydantic_class must be a Pydantic BaseModel subclass, got {type(pydantic_class)}"
|
||||
)
|
||||
|
||||
artifacts = ["json", "```"]
|
||||
json_string = json_string.replace(artifacts[0], "").replace(artifacts[1], "")
|
||||
|
||||
# Parse JSON string to dictionary
|
||||
try:
|
||||
data = json.loads(json_string)
|
||||
except json.JSONDecodeError as e:
|
||||
raise json.JSONDecodeError(f"Invalid JSON string: {e.msg}", e.doc, e.pos)
|
||||
|
||||
# Create and validate Pydantic model instance
|
||||
try:
|
||||
return pydantic_class.model_validate(data)
|
||||
except ValidationError as e:
|
||||
print(f"JSON data doesn't match {pydantic_class.__name__} schema: {e}")
|
||||
raise e
|
||||
40
backend/onyx/agents/agent_search/deep_research/utils.py
Normal file
40
backend/onyx/agents/agent_search/deep_research/utils.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from typing import List
|
||||
|
||||
from langchain_core.messages import AIMessage
|
||||
from langchain_core.messages import AnyMessage
|
||||
from langchain_core.messages import HumanMessage
|
||||
|
||||
from onyx.llm.factory import get_default_llms
|
||||
|
||||
|
||||
def collate_messages(messages: List[AnyMessage]) -> str:
|
||||
"""
|
||||
Collate the messages into a single string.
|
||||
"""
|
||||
# check if request has a history and combine the messages into a single string
|
||||
if len(messages) == 1:
|
||||
research_topic = messages[-1].content
|
||||
else:
|
||||
research_topic = ""
|
||||
for message in messages:
|
||||
if isinstance(message, HumanMessage):
|
||||
research_topic += f"User: {message.content}\n"
|
||||
elif isinstance(message, AIMessage):
|
||||
research_topic += f"Assistant: {message.content}\n"
|
||||
return research_topic
|
||||
|
||||
|
||||
def get_research_topic(messages: list[AnyMessage]) -> str:
|
||||
"""
|
||||
Get the research topic from the messages.
|
||||
"""
|
||||
_, fast_llm = get_default_llms()
|
||||
prompt = """You are a helpful assistant that summarizes the conversation history.
|
||||
The conversation history is as follows:
|
||||
{messages}
|
||||
|
||||
Please summarize the conversation history in a single research topic.
|
||||
"""
|
||||
collated_messages = collate_messages(messages)
|
||||
llm_response = fast_llm.invoke(prompt.format(messages=collated_messages))
|
||||
return llm_response.content
|
||||
@@ -12,6 +12,13 @@ from onyx.agents.agent_search.dc_search_analysis.graph_builder import (
|
||||
divide_and_conquer_graph_builder,
|
||||
)
|
||||
from onyx.agents.agent_search.dc_search_analysis.states import MainInput as DCMainInput
|
||||
from onyx.agents.agent_search.deep_research.graph_builder import (
|
||||
deep_planner_graph_builder,
|
||||
)
|
||||
from onyx.agents.agent_search.deep_research.graph_builder import (
|
||||
deep_research_graph_builder,
|
||||
)
|
||||
from onyx.agents.agent_search.deep_research.states import DeepResearchInput
|
||||
from onyx.agents.agent_search.deep_search.main.graph_builder import (
|
||||
agent_search_graph_builder as agent_search_graph_builder,
|
||||
)
|
||||
@@ -159,6 +166,27 @@ def run_dc_graph(
|
||||
return run_graph(compiled_graph, config, input)
|
||||
|
||||
|
||||
def run_deepresearch_graph(
|
||||
config: GraphConfig,
|
||||
) -> AnswerStream:
|
||||
graph = deep_research_graph_builder()
|
||||
compiled_graph = graph.compile()
|
||||
input = DeepResearchInput(log_messages=[])
|
||||
config.inputs.prompt_builder.raw_user_query = (
|
||||
config.inputs.prompt_builder.raw_user_query.strip()
|
||||
)
|
||||
return run_graph(compiled_graph, config, input)
|
||||
|
||||
|
||||
def run_deepplanner_graph(
|
||||
config: GraphConfig,
|
||||
) -> AnswerStream:
|
||||
graph = deep_planner_graph_builder()
|
||||
compiled_graph = graph.compile()
|
||||
input = DeepResearchInput(log_messages=[])
|
||||
return run_graph(compiled_graph, config, input)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
for _ in range(1):
|
||||
query_start_time = datetime.now()
|
||||
|
||||
@@ -11,7 +11,7 @@ from onyx.agents.agent_search.models import GraphSearchConfig
|
||||
from onyx.agents.agent_search.models import GraphTooling
|
||||
from onyx.agents.agent_search.run_graph import run_agent_search_graph
|
||||
from onyx.agents.agent_search.run_graph import run_basic_graph
|
||||
from onyx.agents.agent_search.run_graph import run_dc_graph
|
||||
from onyx.agents.agent_search.run_graph import run_deepplanner_graph
|
||||
from onyx.chat.models import AgentAnswerPiece
|
||||
from onyx.chat.models import AnswerPacket
|
||||
from onyx.chat.models import AnswerStream
|
||||
@@ -142,7 +142,7 @@ class Answer:
|
||||
"DivCon Beta Agent"
|
||||
)
|
||||
):
|
||||
run_langgraph = run_dc_graph
|
||||
run_langgraph = run_deepplanner_graph
|
||||
else:
|
||||
run_langgraph = run_basic_graph
|
||||
|
||||
|
||||
Reference in New Issue
Block a user