Compare commits

...

14 Commits

Author SHA1 Message Date
trial-danswer
8b610581be Generating answers to FE 2025-06-06 17:02:05 -07:00
trial-danswer
6b1c2368f6 Improved task differentiation and prompting 2025-06-06 15:33:50 -07:00
trial-danswer
77950e4c38 Updated working agent 2025-06-06 14:53:05 -07:00
trial-danswer
bccf033967 Fixed prompts 2025-06-06 13:22:32 -07:00
trial-danswer
8decd51862 Implemented deep planner 2025-06-06 12:37:13 -07:00
trial-danswer
86fc5edf91 Provided company context 2025-06-06 10:35:53 -07:00
trial-danswer
efe0d0172c Added error handling exception, longer research loop and more robust check 2025-06-06 10:05:54 -07:00
trial-danswer
c7533690b3 Updated 2025-06-05 16:42:28 -07:00
trial-danswer
ac9d0eb7fc Updated graph builder 2025-06-05 16:24:17 -07:00
trial-danswer
d20194051b Using Onyx search 2025-06-05 16:03:04 -07:00
trial-danswer
1acbac059b Updated graphbuilder 2025-06-05 15:23:24 -07:00
trial-danswer
f8cffc6009 Added main invoke 2025-06-05 13:38:47 -07:00
trial-danswer
3202ca6c96 Added basic deep research 2025-06-05 12:37:54 -07:00
trial-danswer
15dadd0d3b Basic deep research agent 2025-06-05 12:28:38 -07:00
8 changed files with 1277 additions and 2 deletions

View File

@@ -0,0 +1,71 @@
import os
from typing import Any
from typing import Optional
from langchain_core.runnables import RunnableConfig
from pydantic import BaseModel
from pydantic import Field
class DeepResearchConfiguration(BaseModel):
    """The configuration for the deep research agent.

    Each model field can be overridden per run either via an environment
    variable (upper-cased field name) or via the "configurable" mapping of
    a LangChain RunnableConfig; see from_runnable_config().
    """

    # NOTE(review): pydantic's Field() has no first-class `metadata=` kwarg;
    # presumably these descriptions end up in the JSON schema extras — confirm
    # against the pydantic version in use.
    # Which LLM to use for query generation: "primary" selects the primary
    # LLM, anything else selects the fast LLM (see generate_query).
    query_generator_model: str = Field(
        default="primary",
        metadata={
            "description": "The name of the language model to use for the agent's query generation."
        },
    )
    # Which LLM to use for the reflection step.
    reflection_model: str = Field(
        default="primary",
        metadata={
            "description": "The name of the language model to use for the agent's reflection."
        },
    )
    # Which LLM to use when composing the final answer.
    answer_model: str = Field(
        default="primary",
        metadata={
            "description": "The name of the language model to use for the agent's answer."
        },
    )
    # How many search queries to fan out on the first pass.
    number_of_initial_queries: int = Field(
        default=3,
        metadata={"description": "The number of initial search queries to generate."},
    )
    # Upper bound on reflection/research iterations before finalizing.
    max_research_loops: int = Field(
        default=2,
        metadata={"description": "The maximum number of research loops to perform."},
    )
    @classmethod
    def from_runnable_config(
        cls, config: Optional[RunnableConfig] = None
    ) -> "DeepResearchConfiguration":
        """Create a Configuration instance from a RunnableConfig.

        Precedence per field: environment variable (upper-cased field name)
        first, then the config's "configurable" entry, then the field default.
        """
        configurable = (
            config["configurable"] if config and "configurable" in config else {}
        )
        # Get raw values from environment or config
        raw_values: dict[str, Any] = {
            name: os.environ.get(name.upper(), configurable.get(name))
            for name in cls.model_fields.keys()
        }
        # Filter out None values so pydantic applies field defaults instead
        values = {k: v for k, v in raw_values.items() if v is not None}
        return cls(**values)
class DeepPlannerConfiguration(BaseModel):
    """The configuration for the deep planner agent."""

    # Hard cap on plan/execute/replan iterations for the planner graph.
    max_steps: int = Field(
        default=10,
        metadata={"description": "The maximum number of steps to perform."},
    )

View File

@@ -0,0 +1,750 @@
import random
from datetime import datetime
from json import JSONDecodeError
from pprint import pprint
from typing import cast
from langchain.globals import set_debug
from langchain.globals import set_verbose
from langchain_core.messages import AIMessage
from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END
from langgraph.graph import START
from langgraph.graph import StateGraph
from langgraph.types import Send
from langgraph.types import StreamWriter
from onyx.agents.agent_search.deep_research.configuration import (
DeepPlannerConfiguration,
)
from onyx.agents.agent_search.deep_research.configuration import (
DeepResearchConfiguration,
)
from onyx.agents.agent_search.deep_research.prompts import answer_instructions
from onyx.agents.agent_search.deep_research.prompts import COMPANY_CONTEXT
from onyx.agents.agent_search.deep_research.prompts import COMPANY_NAME
from onyx.agents.agent_search.deep_research.prompts import get_current_date
from onyx.agents.agent_search.deep_research.prompts import planner_prompt
from onyx.agents.agent_search.deep_research.prompts import query_writer_instructions
from onyx.agents.agent_search.deep_research.prompts import reflection_instructions
from onyx.agents.agent_search.deep_research.prompts import replanner_prompt
from onyx.agents.agent_search.deep_research.prompts import task_completion_prompt
from onyx.agents.agent_search.deep_research.prompts import task_to_query_prompt
from onyx.agents.agent_search.deep_research.states import OnyxSearchState
from onyx.agents.agent_search.deep_research.states import OverallState
from onyx.agents.agent_search.deep_research.states import PlanExecute
from onyx.agents.agent_search.deep_research.states import QueryGenerationState
from onyx.agents.agent_search.deep_research.states import ReflectionState
from onyx.agents.agent_search.deep_research.tools_and_schemas import Act
from onyx.agents.agent_search.deep_research.tools_and_schemas import json_to_pydantic
from onyx.agents.agent_search.deep_research.tools_and_schemas import Plan
from onyx.agents.agent_search.deep_research.tools_and_schemas import Reflection
from onyx.agents.agent_search.deep_research.tools_and_schemas import Response
from onyx.agents.agent_search.deep_research.tools_and_schemas import SearchQueryList
from onyx.agents.agent_search.deep_research.utils import collate_messages
from onyx.agents.agent_search.deep_research.utils import get_research_topic
from onyx.agents.agent_search.models import GraphConfig
from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event
from onyx.chat.models import AgentAnswerPiece
from onyx.chat.models import AnswerStyleConfig
from onyx.chat.models import CitationConfig
from onyx.chat.models import DocumentPruningConfig
from onyx.chat.models import PromptConfig
from onyx.context.search.enums import LLMEvaluationType
from onyx.context.search.models import InferenceSection
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.models import Persona
from onyx.llm.factory import get_default_llms
from onyx.natural_language_processing.utils import get_tokenizer
from onyx.natural_language_processing.utils import tokenizer_trim_content
from onyx.tools.models import SearchToolOverrideKwargs
from onyx.tools.tool_implementations.search.search_tool import (
SEARCH_RESPONSE_SUMMARY_ID,
)
from onyx.tools.tool_implementations.search.search_tool import SearchResponseSummary
from onyx.tools.tool_implementations.search.search_tool import SearchTool
from onyx.utils.logger import setup_logger
logger = setup_logger()
test_mode = False
IS_DEBUG = False
IS_VERBOSE = False
MAX_RETRIEVED_DOCS = 10
def mock_do_onyx_search(query: str) -> dict[str, str]:
    """Return a canned search result for offline testing.

    Mirrors the return shape of do_onyx_search ({"text": ...}) without
    touching the database or search infrastructure. The original signature
    was annotated ``-> str`` although a dict is returned; the annotation is
    corrected here.

    Args:
        query: Ignored; present to match the real search interface.
    Returns:
        Dictionary with a single "text" key holding a random canned answer.
    """
    random_answers = [
        "Onyx is a startup founded by Yuhong Sun and Chris Weaver.",
        "Chris Weaver was born in the country of Wakanda",
        "Yuhong Sun is the CEO of Onyx",
        "Yuhong Sun was born in the country of Valhalla",
    ]
    return {"text": random.choice(random_answers)}
def do_onyx_search(query: str) -> dict[str, str]:
    """
    Perform a search using the SearchTool and return the results.
    Args:
        query: The search query string
    Returns:
        Dictionary containing the search results text; on any failure the
        "text" value is a fixed error sentinel rather than raising.
    """
    retrieved_docs: list[InferenceSection] = []
    primary_llm, fast_llm = get_default_llms()
    try:
        with get_session_with_current_tenant() as db_session:
            # Create a default persona with basic settings
            default_persona = Persona(
                name="default",
                chunks_above=2,
                chunks_below=2,
                description="Default persona for search",
            )
            search_tool = SearchTool(
                db_session=db_session,
                user=None,
                persona=default_persona,
                retrieval_options=None,
                prompt_config=PromptConfig(
                    system_prompt="You are a helpful assistant.",
                    task_prompt="Answer the user's question based on the provided context.",
                    datetime_aware=True,
                    include_citations=True,
                ),
                llm=primary_llm,
                fast_llm=fast_llm,
                pruning_config=DocumentPruningConfig(),
                answer_style_config=AnswerStyleConfig(
                    citation_config=CitationConfig(
                        include_citations=True, citation_style="inline"
                    )
                ),
                # Skip LLM relevance evaluation: we only want raw sections here.
                evaluation_type=LLMEvaluationType.SKIP,
            )
            for tool_response in search_tool.run(
                query=query,
                override_kwargs=SearchToolOverrideKwargs(
                    force_no_rerank=True,
                    alternate_db_session=db_session,
                    retrieved_sections_callback=None,
                    skip_query_analysis=False,
                ),
            ):
                # Only the summary response carries the retrieved sections;
                # stop iterating once it has been seen.
                if tool_response.id == SEARCH_RESPONSE_SUMMARY_ID:
                    response = cast(SearchResponseSummary, tool_response.response)
                    retrieved_docs = response.top_sections
                    break
            # Combine the retrieved documents into a single text, capped at
            # MAX_RETRIEVED_DOCS sections to bound prompt size.
            combined_text = "\n\n".join(
                [doc.combined_content for doc in retrieved_docs[:MAX_RETRIEVED_DOCS]]
            )
            return {"text": combined_text}
    except Exception as e:
        # Best-effort: downstream nodes expect a dict, so swallow and report.
        logger.error(f"Error in do_onyx_search: {e}")
        return {"text": "Error in search, no results returned"}
def generate_query(state: OverallState, config: RunnableConfig) -> QueryGenerationState:
    """
    LangGraph node that generates search queries based on the User's question.
    Uses an LLM to create an optimized search query for onyx research based on
    the User's question.
    Args:
        state: Current graph state containing the User's question
        config: Configuration for the runnable
    Returns:
        Dictionary with state update, including query_list key containing the
        generated queries (falls back to the raw LLM text if JSON parsing fails)
    """
    configurable = DeepResearchConfiguration.from_runnable_config(config)
    # check for custom initial search query count
    if state.get("initial_search_query_count") is None:
        state["initial_search_query_count"] = configurable.number_of_initial_queries
    primary_llm, fast_llm = get_default_llms()
    # "primary" selects the primary LLM; any other value selects the fast LLM.
    llm = primary_llm if configurable.query_generator_model == "primary" else fast_llm
    # Format the prompt
    current_date = get_current_date()
    formatted_prompt = query_writer_instructions.format(
        current_date=current_date,
        research_topic=get_research_topic(state["messages"]),
        number_queries=state["initial_search_query_count"],
        company_name=COMPANY_NAME,
        company_context=COMPANY_CONTEXT,
        user_context=collate_messages(state["messages"]),
    )
    # Get the LLM response and extract its content
    llm_response = llm.invoke(formatted_prompt)
    try:
        result = json_to_pydantic(llm_response.content, SearchQueryList)
        return {"query_list": result.query}
    except JSONDecodeError:
        # Fallback: treat the whole (non-JSON) response as a single query.
        return {"query_list": [llm_response.content]}
def continue_to_onyx_research(state: QueryGenerationState) -> list[Send]:
    """
    LangGraph routing function that sends the search queries to the
    onyx_research node.

    This is used to spawn n parallel onyx_research nodes, one per search
    query. The original annotation said ``-> OverallState`` but the function
    returns a list of langgraph ``Send`` directives; the annotation is
    corrected here (behavior unchanged).

    Args:
        state: Query-generation state containing query_list.
    Returns:
        One Send per query, each carrying the query text and its index as id.
    """
    return [
        Send("onyx_research", {"search_query": search_query, "id": int(idx)})
        for idx, search_query in enumerate(state["query_list"])
    ]
def onyx_research(state: OnyxSearchState, config: RunnableConfig) -> OverallState:
    """LangGraph node that performs onyx research using onyx search interface.
    Executes an onyx search in combination with an llm.
    Args:
        state: Current graph state containing the search query and research loop count
        config: Configuration for the runnable, including any search API settings or llm settings
    Returns:
        Dictionary with state update, including sources_gathered, search_query,
        and onyx_research_result (each a single-element list so the state's
        operator.add reducers can merge parallel branches)
    """
    # TODO: think about whether we should use any filtered returned results in addition to the final text answer
    response = do_onyx_search(state["search_query"])
    text = response["text"]
    # NOTE: sources are not currently tracked; always contributes an empty list.
    sources_gathered = []
    return {
        "sources_gathered": sources_gathered,
        "search_query": [state["search_query"]],
        "onyx_research_result": [text],
    }
def get_combined_summaries(state: OverallState, llm=None) -> str:
    """Concatenate all research results, trimmed to the token budget.

    Args:
        state: Graph state whose onyx_research_result entries are joined.
        llm: LLM whose tokenizer config is used for trimming; defaults to the
            fast LLM from get_default_llms().
    Returns:
        The joined summaries, trimmed to at most 10000 tokens.
    """
    if llm is None:
        _, llm = get_default_llms()
    # Calculate tokens and trim if needed
    tokenizer = get_tokenizer(
        provider_type=llm.config.model_provider, model_name=llm.config.model_name
    )
    # Combine summaries and check token count
    combined_summaries = "\n\n---\n\n".join(state["onyx_research_result"])
    combined_summaries = tokenizer_trim_content(
        content=combined_summaries, desired_length=10000, tokenizer=tokenizer
    )
    return combined_summaries
def reflection(state: OverallState, config: RunnableConfig) -> ReflectionState:
    """LangGraph node that identifies knowledge gaps and generates potential follow-up queries.
    Analyzes the current summary to identify areas for further research and generates
    potential follow-up queries. Uses structured output to extract
    the follow-up query in JSON format.
    Args:
        state: Current graph state containing the running summary and research topic
        config: Configuration for the runnable, including LLM settings
    Returns:
        Dictionary with state update, including is_sufficient, knowledge_gap,
        follow_up_queries, research_loop_count, and number_of_ran_queries
    """
    configurable = DeepResearchConfiguration.from_runnable_config(config)
    # Increment the research loop count and get the reasoning model
    state["research_loop_count"] = state.get("research_loop_count", 0) + 1
    # Get the LLM to use for token counting
    primary_llm, fast_llm = get_default_llms()
    llm = primary_llm if configurable.reflection_model == "primary" else fast_llm
    combined_summaries = get_combined_summaries(state, llm)
    # Format the prompt
    # First, collate the messages to give a historical context of the current conversation
    # Then, produce a concatenation of the onyx research results
    # Then, pass this to the reflection instructions
    # Then, the LLM will produce a JSON response with the following fields:
    # - is_sufficient: boolean indicating if the research is sufficient
    # - knowledge_gap: string describing the knowledge gap
    # - follow_up_queries: list of follow-up queries
    current_date = get_current_date()
    formatted_prompt = reflection_instructions.format(
        current_date=current_date,
        research_topic=get_research_topic(state["messages"]),
        summaries=combined_summaries,
        company_name=COMPANY_NAME,
        company_context=COMPANY_CONTEXT,
    )
    # Get result from LLM; json_to_pydantic parses into a Reflection model.
    # NOTE: unlike generate_query, a JSONDecodeError here is not caught and
    # will propagate — TODO confirm this is intended.
    result = json_to_pydantic(llm.invoke(formatted_prompt).content, Reflection)
    # TODO: convert to pydantic here
    return {
        "is_sufficient": result.is_sufficient,
        "knowledge_gap": result.knowledge_gap,
        "follow_up_queries": result.follow_up_queries,
        "research_loop_count": state["research_loop_count"],
        "number_of_ran_queries": len(state["search_query"]),
    }
def strtobool(val):
    """Convert a string representation of truth to true (1) or false (0).

    Booleans are passed through unchanged. True values are 'y', 'yes', 't',
    'true', 'on', and '1'; false values are 'n', 'no', 'f', 'false', 'off',
    and '0'. Raises ValueError if 'val' is anything else.
    """
    if isinstance(val, bool):
        # Already a boolean (e.g. parsed JSON) — nothing to convert.
        return val
    normalized = val.lower()
    truthy = {"y", "yes", "t", "true", "on", "1"}
    falsy = {"n", "no", "f", "false", "off", "0"}
    if normalized in truthy:
        return 1
    if normalized in falsy:
        return 0
    raise ValueError("invalid truth value %r" % (val,))
def evaluate_research(
    state: ReflectionState,
    config: RunnableConfig,
) -> OverallState:
    """LangGraph routing function that determines the next step in the research flow.
    Controls the research loop by deciding whether to continue gathering information
    or to finalize the summary based on the configured maximum number of research loops.
    Args:
        state: Current graph state containing the research loop count
        config: Configuration for the runnable, including max_research_loops setting
    Returns:
        Either the string "finalize_answer", or a list of Send directives that
        fan out one onyx_research branch per follow-up query
    """
    configurable = DeepResearchConfiguration.from_runnable_config(config)
    # Per-run override from state wins over the configured default.
    max_research_loops = (
        state.get("max_research_loops")
        if state.get("max_research_loops") is not None
        else configurable.max_research_loops
    )
    # strtobool guards against the LLM returning "true"/"false" as a string
    # instead of a JSON boolean.
    if (
        strtobool(state["is_sufficient"]) is True
        or state["research_loop_count"] >= max_research_loops
    ):
        return "finalize_answer"
    else:
        return [
            Send(
                "onyx_research",
                {
                    "search_query": follow_up_query,
                    # Offset ids past the queries already run so branch ids stay unique.
                    "id": state["number_of_ran_queries"] + int(idx),
                },
            )
            for idx, follow_up_query in enumerate(state["follow_up_queries"])
        ]
def finalize_answer(state: OverallState, config: RunnableConfig):
    """LangGraph node that finalizes the research summary.
    Prepares the final result based on the onyx research results.
    Args:
        state: Current graph state containing the running summary and sources gathered
        config: Configuration for the runnable, including answer model settings
    Returns:
        Dictionary with state update: the final AIMessage appended to messages
        and (currently always empty) sources_gathered
    """
    configurable = DeepResearchConfiguration.from_runnable_config(config)
    # Per-run override from state wins over the configured default.
    answer_model = state.get("answer_model") or configurable.answer_model
    # get the LLM to generate the final answer
    primary_llm, fast_llm = get_default_llms()
    llm = primary_llm if answer_model == "primary" else fast_llm
    combined_summaries = get_combined_summaries(state, llm)
    # Format the prompt
    current_date = get_current_date()
    formatted_prompt = answer_instructions.format(
        current_date=current_date,
        research_topic=get_research_topic(state["messages"]),
        summaries=combined_summaries,
        company_name=COMPANY_NAME,
        company_context=COMPANY_CONTEXT,
        user_context=collate_messages(state["messages"]),
    )
    result = llm.invoke(formatted_prompt)
    # NOTE: source deduplication is not implemented yet; always empty.
    unique_sources = []
    return {
        "messages": [AIMessage(content=result.content)],
        "sources_gathered": unique_sources,
    }
def deep_research_graph_builder(test_mode: bool = False) -> StateGraph:
    """
    LangGraph graph builder for deep research process.

    Topology: START -> generate_query -> (fan-out) onyx_research ->
    reflection -> either more onyx_research branches or finalize_answer -> END.

    Args:
        test_mode: Currently unused — TODO wire up or remove.
    Returns:
        The uncompiled StateGraph; callers must .compile() it.
    """
    graph = StateGraph(
        OverallState,
        config_schema=DeepResearchConfiguration,
    )
    ### Add nodes ###
    graph.add_node("generate_query", generate_query)
    graph.add_node("onyx_research", onyx_research)
    graph.add_node("reflection", reflection)
    graph.add_node("finalize_answer", finalize_answer)
    # Set the entrypoint as `generate_query`
    graph.add_edge(START, "generate_query")
    # Add conditional edge to continue with search queries in a parallel branch
    graph.add_conditional_edges(
        "generate_query", continue_to_onyx_research, ["onyx_research"]
    )
    # Reflect on the onyx research
    graph.add_edge("onyx_research", "reflection")
    # Evaluate the research
    graph.add_conditional_edges(
        "reflection", evaluate_research, ["onyx_research", "finalize_answer"]
    )
    # Finalize the answer
    graph.add_edge("finalize_answer", END)
    return graph
def translate_task_to_query(
    task: str,
    context=None,
    company_name=COMPANY_NAME,
    company_context=COMPANY_CONTEXT,
    initial_question=None,
) -> str:
    """
    Translate a planner task into a single search query using the fast LLM.

    Args:
        task: The planner step to convert into a query.
        context: Prior (task, query, answer) tuples for grounding; may be None.
        company_name: Company name substituted into the prompt.
        company_context: Company background substituted into the prompt.
        initial_question: The user's original question, for context.
    Returns:
        The raw query text produced by the fast LLM.
    """
    _, fast_llm = get_default_llms()
    formatted_prompt = task_to_query_prompt.format(
        initial_question=initial_question,
        task=task,
        context=context,
        company_name=company_name,
        company_context=company_context,
    )
    return fast_llm.invoke(formatted_prompt).content
def is_search_query(query: str) -> bool:
    """Return True if the task text looks like a search/information-gathering task.

    Simple keyword heuristic: the task is a search task if it contains any
    search-related verb (case-insensitive substring match).

    The original keyword list contained triplicated entries ("find trends",
    "find insights") and many phrases fully subsumed by a shorter substring
    ("find out", "gather data", ... all contain "find"/"gather"); matching is
    identical with the deduplicated list below.

    Args:
        query: The planner task text to classify.
    Returns:
        True if any search keyword occurs in the lowered text, else False.
    """
    terms = (
        "search",
        "query",
        "find",
        "look up",
        "look for",
        "gather",
    )
    lowered = query.lower()
    return any(term in lowered for term in terms)
def execute_step(
    state: PlanExecute, config: RunnableConfig, writer: StreamWriter = lambda _: None
):
    """
    LangGraph node that executes the first step of the current plan.

    If the step looks like a search task, it is translated into a query and
    run through the full deep research subgraph; otherwise the primary LLM
    completes the task directly from the accumulated context.

    Args:
        state: Plan/execute state with the current plan and past steps.
        config: Runnable config whose metadata carries the GraphConfig.
        writer: Stream writer for progress events (no-op by default).
    Returns:
        Dictionary appending a (task, query-or-label, answer) tuple to
        past_steps and the incremented step_count.
    """
    graph_config = cast(GraphConfig, config["metadata"]["config"])
    question = graph_config.inputs.prompt_builder.raw_user_query
    plan = state["plan"]
    # Always execute the first remaining step; the replanner trims done steps.
    task = plan[0]
    step_count = state.get("step_count", 0) + 1
    if is_search_query(task):
        query = translate_task_to_query(
            plan[0], context=state["past_steps"], initial_question=question
        )
        write_custom_event(
            "refined_agent_answer",
            AgentAnswerPiece(
                answer_piece=" executing a search query with Onyx...",
                level=0,
                level_question_num=0,
                answer_type="agent_level_answer",
            ),
            writer,
        )
        # Run the nested deep research graph for this single query.
        graph = deep_research_graph_builder()
        compiled_graph = graph.compile(debug=IS_DEBUG)
        # TODO: use this input_state for the deep research graph
        # input_state = DeepResearchInput(log_messages=[])
        initial_state = {
            "messages": [HumanMessage(content=query)],
            "search_query": [],
            "onyx_research_result": [],
            "sources_gathered": [],
            "initial_search_query_count": 3,  # Default value from Configuration
            "max_research_loops": 2,  # State does not seem to pick up this value
            "research_loop_count": 0,
            "reasoning_model": "primary",
        }
        result = compiled_graph.invoke(initial_state)
        # Record the subgraph's final message as this step's answer.
        return {
            "past_steps": [(task, query, result["messages"][-1].content)],
            "step_count": step_count,
        }
    else:
        primary_llm, _ = get_default_llms()
        formatted_prompt = task_completion_prompt.format(
            task=task,
            plan=state["plan"],
            past_steps=state["past_steps"],
            company_name=COMPANY_NAME,
            company_context=COMPANY_CONTEXT,
        )
        write_custom_event(
            "refined_agent_answer",
            AgentAnswerPiece(
                answer_piece=" accomplishing the planned task...",
                level=0,
                level_question_num=0,
                answer_type="agent_level_answer",
            ),
            writer,
        )
        response = primary_llm.invoke(formatted_prompt).content
        return {
            "past_steps": [(task, "task: " + task, response)],
            "step_count": step_count,
        }
def plan_step(
    state: PlanExecute, config: RunnableConfig, writer: StreamWriter = lambda _: None
):
    """
    LangGraph node that creates the initial step-by-step plan for the
    user's question.

    Args:
        state: Plan/execute state (unused here; the question comes from config).
        config: Runnable config whose metadata carries the GraphConfig.
        writer: Stream writer for progress events (no-op by default).
    Returns:
        Dictionary with the parsed plan's step list under "plan".
    """
    graph_config = cast(GraphConfig, config["metadata"]["config"])
    question = graph_config.inputs.prompt_builder.raw_user_query
    formatted_prompt = planner_prompt.format(
        input=question, company_name=COMPANY_NAME, company_context=COMPANY_CONTEXT
    )
    primary_llm, _ = get_default_llms()
    response = primary_llm.invoke(formatted_prompt).content
    # Parse the LLM's JSON response into a Plan model.
    plan = json_to_pydantic(response, Plan)
    write_custom_event(
        "refined_agent_answer",
        AgentAnswerPiece(
            answer_piece="Generating a plan to answer the user's question... ",
            level=0,
            level_question_num=0,
            answer_type="agent_level_answer",
        ),
        writer,
    )
    return {"plan": plan.steps}
def replan_step(
    state: PlanExecute, config: RunnableConfig, writer: StreamWriter = lambda _: None
):
    """
    LangGraph node that replans after each executed step: either produces a
    final response or an updated plan of the remaining steps.

    Args:
        state: Plan/execute state with the current plan and past steps.
        config: Runnable config whose metadata carries the GraphConfig.
        writer: Stream writer for progress events (no-op by default).
    Returns:
        Dictionary with either "response" (terminates the loop via should_end)
        or a refreshed "plan".
    """
    graph_config = cast(GraphConfig, config["metadata"]["config"])
    question = graph_config.inputs.prompt_builder.raw_user_query
    formatted_prompt = replanner_prompt.format(
        input=question,
        plan=state["plan"],
        past_steps=state["past_steps"],
        company_name=COMPANY_NAME,
        company_context=COMPANY_CONTEXT,
    )
    primary_llm, _ = get_default_llms()
    response = primary_llm.invoke(formatted_prompt).content
    # The Act union is either a Response (done) or a Plan (keep going).
    output = json_to_pydantic(response, Act)
    # TODO: add a check for time limit too
    if isinstance(output.action, Response):
        # Check for canned response, if so, return the answer from the last step
        # (the LLM sometimes echoes the prompt's placeholder text verbatim).
        if output.action.response == "The final answer to the user's question":
            return {
                "response": state["past_steps"][-1][2],
            }
        else:
            return {"response": output.action.response}
    elif state["step_count"] >= state.get("max_steps", 5):
        # Step budget exhausted: answer with the last step's result.
        return {
            "response": f"I've reached the maximum number of step, my best guess is {state['past_steps'][-1][2]}."
        }
    else:
        write_custom_event(
            "refined_agent_answer",
            AgentAnswerPiece(
                answer_piece=" moving on to the next step...",
                level=0,
                level_question_num=0,
                answer_type="agent_level_answer",
            ),
            writer,
        )
        return {"plan": output.action.steps}
def should_end(
    state: PlanExecute, config: RunnableConfig, writer: StreamWriter = lambda _: None
):
    """
    LangGraph routing function: finish when a response has been produced,
    otherwise loop back to the execution agent.

    Also streams the final response to the client before terminating.

    Args:
        state: Plan/execute state; a non-empty "response" signals completion.
        config: Runnable config (unused).
        writer: Stream writer for the final answer event.
    Returns:
        END when done, otherwise the "agent" node name.
    """
    if "response" in state and state["response"]:
        write_custom_event(
            "refined_agent_answer",
            AgentAnswerPiece(
                answer_piece=state["response"],
                level=0,
                level_question_num=0,
                answer_type="agent_level_answer",
            ),
            writer,
        )
        return END
    else:
        return "agent"
def deep_planner_graph_builder(test_mode: bool = False) -> StateGraph:
    """
    LangGraph graph builder for deep planner process.

    Topology: START -> planner -> agent -> replan -> (agent | END), i.e. a
    plan-and-execute loop that keeps executing/replanning until should_end
    sees a response.

    Args:
        test_mode: Currently unused — TODO wire up or remove.
    Returns:
        The uncompiled StateGraph; callers must .compile() it.
    """
    workflow = StateGraph(PlanExecute, config_schema=DeepPlannerConfiguration)
    # Add the plan node
    workflow.add_node("planner", plan_step)
    # Add the execution step
    workflow.add_node("agent", execute_step)
    # Add a replan node
    workflow.add_node("replan", replan_step)
    workflow.add_edge(START, "planner")
    # From plan we go to agent
    workflow.add_edge("planner", "agent")
    # From agent, we replan
    workflow.add_edge("agent", "replan")
    workflow.add_conditional_edges(
        "replan",
        should_end,
        ["agent", END],
    )
    return workflow
if __name__ == "__main__":
    # Manual smoke-test entry point: runs the deep planner graph end-to-end
    # against a set of sample questions. Requires a live database/LLM setup.
    # Initialize the SQLAlchemy engine first
    from onyx.db.engine import SqlEngine
    SqlEngine.init_engine(
        pool_size=5,  # You can adjust these values based on your needs
        max_overflow=10,
        app_name="graph_builder",
    )
    # Set the debug and verbose flags for Langchain/Langgraph
    set_debug(IS_DEBUG)
    set_verbose(IS_VERBOSE)
    # Compile the graph
    query_start_time = datetime.now()
    logger.debug(f"Start at {query_start_time}")
    graph = deep_planner_graph_builder()
    compiled_graph = graph.compile(debug=IS_DEBUG)
    query_end_time = datetime.now()
    logger.debug(f"Graph compiled in {query_end_time - query_start_time} seconds")
    # Full sample set (not all run below); hard_queries is the subset actually
    # exercised.
    queries = [
        "What is the capital of France?",
        "What is Onyx?",
        "Who are the founders of Onyx?",
        "Who is the CEO of Onyx?",
        "Where was the CEO of Onyx born?",
        "What is the highest contract value for last month?",
        "What is the most expensive component of our technical pipeline so far?",
        "Who are top 5 competitors who are not US based?",
        "What companies should we focus on to maximize our revenue?",
        "What are some of the biggest problems for our customers and their potential solutions?",
    ]
    hard_queries = [
        "Where was the CEO of Onyx born?",
        "Who are top 5 competitors who are not US based?",
        "What companies should we focus on to maximize our revenue?",
        "What are some of the biggest problems for our customers and their potential solutions?",
    ]
    for query in hard_queries:
        # Create the initial state with all required fields
        initial_state = {
            "input": [HumanMessage(content=query)],
            "plan": [],
            "past_steps": [],
            "response": "",
            "max_steps": 10,
            "step_count": 0,
        }
        result = compiled_graph.invoke(initial_state)
        print("Max planning loops: ", result["max_steps"])
        print("Steps: ", result["step_count"])
        print("Past steps: ")
        pprint(result["past_steps"], indent=4)
        print("Question: ", query)
        print("Answer: ", result["response"])
        print("--------------------------------")
        # from pdb import set_trace
        # set_trace()

View File

@@ -0,0 +1,228 @@
from datetime import datetime
# Get current date in a readable format
def get_current_date():
    """Return today's date as a human-readable string, e.g. "June 06, 2025"."""
    today = datetime.now()
    return today.strftime("%B %d, %Y")
COMPANY_NAME = "Onyx"
# Short company profile substituted into every prompt so the LLM answers with
# company-specific context. (Typo fix: "LLMss" -> "LLMs".)
COMPANY_CONTEXT = """
Our company is Onyx, a startup founded by Yuhong Sun and Chris Weaver. Onyx is a startup that provides a platform for
automated research and analysis using AI and LLMs. The current CEO of Onyx is Yuhong Sun. The company is based in San
Francisco, CA.
"""  # noqa: W291
# Prompt for the query-generation node. Expects an LLM response that is a JSON
# object with "rationale" and "query" keys. (Fix: the original said "ALL three
# of these exact keys" while only two keys are listed.)
query_writer_instructions = """Your goal is to generate sophisticated and diverse search queries for an internal search
tool. These queries are intended for an advanced automated research tool capable of analyzing complex results and synthesizing
information. The search tool has access to internal documents and information for {company_name}.
Here is some context about our company:
{company_context}
Instructions:
- Always prefer a single search query, only add another query if the original question requests multiple aspects or elements
and one query is not enough.
- Each query should focus on one specific aspect of the original question.
- Don't produce more than {number_queries} queries.
- Queries should be diverse, if the topic is broad, generate more than 1 query.
- Don't generate multiple similar queries, 1 is enough.
- Query should ensure that the most current information is gathered. The current date is {current_date}.
- Query should be concise and contain relevant information to the company, the task, and the context.
- Unless the task is general, the query should be specific to the company, the task, and the context.
Format:
- Format your response as a JSON object with BOTH of these exact keys:
   - "rationale": Brief explanation of why these queries are relevant
   - "query": A list of search queries
Example:
Topic: What revenue grew more last year apple stock or the number of people buying an iphone
```json
{{
    "rationale": "To answer this comparative growth question accurately, we need specific data points on Apple's stock
    performance and iPhone sales metrics. These queries target the precise financial information needed: company revenue
    trends, product-specific unit sales figures, and stock price movement over the same fiscal period for direct comparison.",
    "query": [
        "Apple total revenue growth fiscal year 2024",
        "iPhone unit sales growth fiscal year 2024",
        "Apple stock price growth fiscal year 2024",
    ],
}}
```
Context: {user_context}"""  # noqa: W291
reflection_instructions = """You are an expert research assistant analyzing summaries about "{research_topic}" for {company_name}.
Here is some context about our company:
{company_context}
Instructions:
- Identify knowledge gaps or areas that need deeper exploration and generate a follow-up query. (1 or multiple).
- If provided summaries are sufficient to answer the user's question, don't generate a follow-up query.
- If there is a knowledge gap, generate a follow-up query that would help expand your understanding.
- Focus on technical details, implementation specifics, or emerging trends that weren't fully covered.
Requirements:
- Ensure the follow-up query is self-contained and includes necessary context for onyx search, an internal search tool, that
has access to internal documents and information.
Output Format:
- Format your response as a JSON object with these exact keys:
- "is_sufficient": true or false
- "knowledge_gap": Describe what information is missing or needs clarification
- "follow_up_queries": Write a specific question to address this gap
Example:
```json
{{
"is_sufficient": true, // or false
"knowledge_gap": "The summary lacks information about performance benchmarks", // "" if is_sufficient is true
"follow_up_queries":
["What are typical performance benchmarks used to evaluate [specific technology]?"] // [] if is_sufficient is true
}}
```
Reflect carefully on the Summaries to identify knowledge gaps and produce a follow-up query.
Then, produce your output following this JSON format:
Summaries:
{summaries}
"""
# Prompt for the finalize_answer node: composes the final answer from the
# accumulated research summaries.
answer_instructions = """You are an expert research assistant analyzing summaries about "{research_topic}" for {company_name}.
Here is some context about our company:
{company_context}
Generate a high-quality answer to the user's question based on the provided summaries.
Instructions:
- The current date is {current_date}.
- You are the final step of a multi-step research process, don't mention that you are the final step.
- You have access to all the information gathered from the previous steps.
- You have access to the user's question.
- Generate a high-quality answer to the user's question based on the provided summaries and the user's question.
- You MUST include all the citations from the summaries in the answer correctly.
- Only use the information from the summaries to answer the question, do not make up information or speculate.
User Context:
- {user_context}
Summaries:
{summaries}"""
planner_prompt = """You are an expert research assistant for {company_name}.
Here is some context about our company:
{company_context}
For this given objective, come up with a simple step by step plan.
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps.
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.
Example:
Input: What's the population of the city or town where the CEO of Onyx was born?
```json
{{
"steps": ["Step 1: Search for the CEO of Onyx", "Step 2: Search for the birthplace of the CEO", "Step 3: Find the population of that town or city"]
}}
```
Your objective was this:
{input}
""" # noqa: E501
# Prompt for replan_step: the LLM returns an Act — either a new Plan
# ({"steps": [...]}) or a final Response ({"response": ...}).
# Fixes vs original: missing closing quote in the '"Step 3]' example, and
# "follow steps" -> "following steps".
replanner_prompt = """You are an expert research assistant for {company_name}.
Here is some context about our company:
{company_context}
For the given objective, come up with a simple step by step plan.
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps.
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.
Your objective was this:
{input}
Your original plan was this:
{plan}
You have currently done the following steps:
{past_steps}
Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan.
Plan structure:
{{
    "steps": ["Step 1", "Step 2", "Step 3"]
}}
Response structure:
{{
    "response": "The final answer to the user's question"
}}
Example Output structure with the plan:
{{
    "action": {{
        "steps": ["Step 1", "Step 2", "Step 3"]
    }}
}}
Example Output structure with the response:
{{
    "action": {{
        "response": "The final answer to the user's question" // This should be the answer from the last step
    }}
}}
Output:
{{
    "action": object // "Response" | "Plan"
}}
"""  # noqa: E501
# Prompt for translate_task_to_query: converts a planner task into a single
# concise search query. (Fixes: "and and" -> "and"; "contain" -> "contains".)
task_to_query_prompt = """
You are an expert research assistant for {company_name}.
Here is some context about our company:
{company_context}
Given the task and the context, generate a query to search for the information to answer the task. Return a single query text,
make sure that the query is concise and contains relevant information to the company, the task, and the context.
Initial question: {initial_question}
Task: {task}
Information that we have also gathered so far: {context}
Query:
"""
# Prompt for the non-search branch of execute_step: the LLM completes the
# task directly from the accumulated context. (Grammar fix: "accomplish it
# in a thorough and return well formed answer" -> below.)
task_completion_prompt = """You are an expert research assistant for {company_name}.
Here is some context about our company:
{company_context}
For the given task, try to accomplish it thoroughly and return a well-formed answer.
Your current task is this:
{task}
Your original plan was this:
{plan}
Here is the context of the tasks:
{past_steps}
Answer:
"""  # noqa: E501

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
import operator
from dataclasses import dataclass
from dataclasses import field
from typing import Annotated
from typing import List
from typing import Tuple
from typing import TypedDict
from langgraph.graph import add_messages
from onyx.agents.agent_search.core_state import CoreState
class OverallState(TypedDict):
    """Top-level LangGraph state shared by the deep research agent's nodes.

    Fields annotated with a reducer (``add_messages`` / ``operator.add``)
    are accumulated across node updates rather than overwritten.
    """

    messages: Annotated[list, add_messages]  # conversation history, merged via add_messages
    search_query: Annotated[list, operator.add]  # queries issued so far (appended)
    onyx_research_result: Annotated[list, operator.add]  # search results gathered (appended)
    sources_gathered: Annotated[list, operator.add]  # source references collected (appended)
    initial_search_query_count: int  # number of queries for the first research pass
    max_research_loops: int  # upper bound on research iterations
    research_loop_count: int  # iterations completed so far
    reasoning_model: str  # model name used for reasoning steps
class ReflectionState(TypedDict):
    """State emitted by the reflection step of the research loop."""

    is_sufficient: bool  # whether the gathered information answers the question
    knowledge_gap: str  # description of what is still missing
    follow_up_queries: Annotated[list, operator.add]  # follow-up queries (appended)
    research_loop_count: int  # loop iteration this reflection belongs to
    number_of_ran_queries: int  # total queries executed so far
class Query(TypedDict):
    """A single search query together with the reason it was generated."""

    query: str  # the query text
    rationale: str  # why this query is relevant
class QueryGenerationState(TypedDict):
    """State holding the batch of queries produced by the generation step."""

    query_list: list[Query]
class OnyxSearchState(TypedDict):
    """Per-invocation state for a single Onyx search."""

    search_query: str  # query to run against Onyx
    id: str  # identifier for this search task
class PlanExecute(TypedDict):
    """State for the plan-and-execute (deep planner) agent."""

    input: str  # the user's original objective
    plan: List[str]  # steps that still need to be executed
    past_steps: Annotated[List[Tuple], operator.add]  # completed steps (appended); presumably (step, result) pairs — TODO confirm
    response: str  # final answer once planning concludes
    max_steps: int  # upper bound on executed steps
    step_count: int  # steps executed so far
@dataclass(kw_only=True)
class SearchStateOutput:
running_summary: str = field(default=None) # Final report
class DeepResearchInput(CoreState):
    """Input state for the deep research graph; adds nothing beyond CoreState."""

    pass

View File

@@ -0,0 +1,94 @@
import json
from typing import List
from typing import Type
from typing import TypeVar
from typing import Union
from pydantic import BaseModel
from pydantic import Field
from pydantic import ValidationError
T = TypeVar("T", bound=BaseModel)
class SearchQueryList(BaseModel):
    """Structured output schema for the query-generation step."""

    query: List[str] = Field(
        description="A list of search queries to be used for Onyx research."
    )
    rationale: str = Field(
        description="A brief explanation of why these queries are relevant to the research topic."
    )
class Reflection(BaseModel):
    """Structured output schema for the reflection step of the research loop."""

    is_sufficient: bool = Field(
        description="Whether the provided summaries are sufficient to answer the user's question."
    )
    knowledge_gap: str = Field(
        description="A description of what information is missing or needs clarification."
    )
    follow_up_queries: List[str] = Field(
        description="A list of follow-up queries to address the knowledge gap."
    )
class Plan(BaseModel):
    """Plan to follow in future."""

    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )
class Response(BaseModel):
    """Response to user."""

    # Final answer text returned to the user.
    response: str
class Act(BaseModel):
    """Action to perform."""

    # Either a terminal Response or a further Plan; validated against each
    # union member in turn.
    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."
    )
def json_to_pydantic(json_string: str, pydantic_class: Type[T]) -> T:
    """
    Convert a JSON string to a Pydantic model instance.

    The input may be wrapped in a markdown code fence (```json ... ```),
    as LLM outputs often are; the fence is stripped before parsing.

    Args:
        json_string: JSON string to parse (optionally markdown-fenced)
        pydantic_class: Pydantic model class to instantiate

    Returns:
        Instance of the pydantic_class

    Raises:
        json.JSONDecodeError: If json_string is invalid JSON
        ValidationError: If JSON data doesn't match the Pydantic model schema
        TypeError: If pydantic_class is not a Pydantic model
    """
    # Validate that the class is a Pydantic model
    if not (isinstance(pydantic_class, type) and issubclass(pydantic_class, BaseModel)):
        raise TypeError(
            f"pydantic_class must be a Pydantic BaseModel subclass, got {type(pydantic_class)}"
        )

    # Strip only a *surrounding* markdown fence. The previous implementation
    # deleted every occurrence of "json" and "```" anywhere in the string,
    # which corrupted payloads whose data legitimately contained those
    # substrings.
    cleaned = json_string.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        if cleaned.startswith("json"):
            cleaned = cleaned[4:]
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    cleaned = cleaned.strip()

    # Parse JSON string to dictionary
    try:
        data = json.loads(cleaned)
    except json.JSONDecodeError as e:
        raise json.JSONDecodeError(f"Invalid JSON string: {e.msg}", e.doc, e.pos) from e

    # Create and validate Pydantic model instance
    try:
        return pydantic_class.model_validate(data)
    except ValidationError as e:
        print(f"JSON data doesn't match {pydantic_class.__name__} schema: {e}")
        raise e

View File

@@ -0,0 +1,40 @@
from typing import List
from langchain_core.messages import AIMessage
from langchain_core.messages import AnyMessage
from langchain_core.messages import HumanMessage
from onyx.llm.factory import get_default_llms
def collate_messages(messages: List[AnyMessage]) -> str:
    """
    Collate the messages into a single string.

    A single-message history is returned verbatim; longer histories are
    rendered as alternating "User:"/"Assistant:" lines.
    """
    if len(messages) == 1:
        return messages[-1].content

    rendered_lines = []
    for msg in messages:
        if isinstance(msg, HumanMessage):
            rendered_lines.append(f"User: {msg.content}\n")
        elif isinstance(msg, AIMessage):
            rendered_lines.append(f"Assistant: {msg.content}\n")
    return "".join(rendered_lines)
def get_research_topic(messages: list[AnyMessage]) -> str:
    """
    Get the research topic from the messages.

    Summarizes the collated conversation history into a single research
    topic using the fast LLM.
    """
    summarize_template = """You are a helpful assistant that summarizes the conversation history.
The conversation history is as follows:
{messages}
Please summarize the conversation history in a single research topic.
"""
    _, fast_llm = get_default_llms()
    history = collate_messages(messages)
    return fast_llm.invoke(summarize_template.format(messages=history)).content

View File

@@ -12,6 +12,13 @@ from onyx.agents.agent_search.dc_search_analysis.graph_builder import (
divide_and_conquer_graph_builder,
)
from onyx.agents.agent_search.dc_search_analysis.states import MainInput as DCMainInput
from onyx.agents.agent_search.deep_research.graph_builder import (
deep_planner_graph_builder,
)
from onyx.agents.agent_search.deep_research.graph_builder import (
deep_research_graph_builder,
)
from onyx.agents.agent_search.deep_research.states import DeepResearchInput
from onyx.agents.agent_search.deep_search.main.graph_builder import (
agent_search_graph_builder as agent_search_graph_builder,
)
@@ -159,6 +166,27 @@ def run_dc_graph(
return run_graph(compiled_graph, config, input)
def run_deepresearch_graph(
    config: GraphConfig,
) -> AnswerStream:
    """Compile the deep research graph and stream its answer for the configured query."""
    # Normalize the raw user query before handing it to the graph.
    stripped_query = config.inputs.prompt_builder.raw_user_query.strip()
    config.inputs.prompt_builder.raw_user_query = stripped_query
    compiled_graph = deep_research_graph_builder().compile()
    return run_graph(compiled_graph, config, DeepResearchInput(log_messages=[]))
def run_deepplanner_graph(
    config: GraphConfig,
) -> AnswerStream:
    """Compile the deep planner graph and stream its answer."""
    compiled_graph = deep_planner_graph_builder().compile()
    return run_graph(compiled_graph, config, DeepResearchInput(log_messages=[]))
if __name__ == "__main__":
for _ in range(1):
query_start_time = datetime.now()

View File

@@ -11,7 +11,7 @@ from onyx.agents.agent_search.models import GraphSearchConfig
from onyx.agents.agent_search.models import GraphTooling
from onyx.agents.agent_search.run_graph import run_agent_search_graph
from onyx.agents.agent_search.run_graph import run_basic_graph
from onyx.agents.agent_search.run_graph import run_dc_graph
from onyx.agents.agent_search.run_graph import run_deepplanner_graph
from onyx.chat.models import AgentAnswerPiece
from onyx.chat.models import AnswerPacket
from onyx.chat.models import AnswerStream
@@ -142,7 +142,7 @@ class Answer:
"DivCon Beta Agent"
)
):
run_langgraph = run_dc_graph
run_langgraph = run_deepplanner_graph
else:
run_langgraph = run_basic_graph