Compare commits

...

20 Commits

Author SHA1 Message Date
hagen-danswer
34aa054c5d added chunk_ids and stats to QueryResult 2024-12-19 08:48:05 -08:00
hagen-danswer
cebe237705 renamed PrimaryState to CoreState 2024-12-19 08:47:39 -08:00
hagen-danswer
c759fb5709 Merge pull request #3505 from onyx-dot-app/fix-the-states
Fix the states
2024-12-18 15:48:20 -08:00
hagen-danswer
ffc81f6e45 seperate edge for initial retrieval 2024-12-18 13:10:46 -08:00
hagen-danswer
2d6f746259 made query expansion explicit 2024-12-18 13:03:28 -08:00
hagen-danswer
bca02ebec6 figured it out 2024-12-18 12:44:28 -08:00
hagen-danswer
0c75ca0579 renames 2024-12-18 11:08:43 -08:00
hagen-danswer
9d3220fcfc explicitly ingest state from retrieval 2024-12-18 10:17:07 -08:00
hagen-danswer
50a216f554 naming and comments 2024-12-18 09:56:34 -08:00
hagen-danswer
8399d2ee0a mypy fixed 2024-12-18 09:27:47 -08:00
hagen-danswer
fd694bea8f query->question 2024-12-18 08:47:43 -08:00
hagen-danswer
e76cbec53c main graph works 2024-12-18 08:43:54 -08:00
hagen-danswer
d66180fe13 Cleanup 2024-12-18 07:33:40 -08:00
hagen-danswer
442c94727e got answer subgraph working 2024-12-17 15:16:36 -08:00
hagen-danswer
2f2b9a862a fixed expanded retrieval subgraph 2024-12-17 15:11:54 -08:00
hagen-danswer
1f88b60abd Now using result objects 2024-12-17 14:05:51 -08:00
hagen-danswer
ff03d717f3 brough over joachim changes 2024-12-17 12:36:28 -08:00
hagen-danswer
82914ad365 fixed key issue 2024-12-16 13:26:09 -08:00
hagen-danswer
11ce2a62ab fix: update staged changes 2024-12-16 12:24:17 -08:00
joachim-danswer
6311b70cc6 initial onyx changes 2024-12-16 11:23:01 -08:00
40 changed files with 1968 additions and 4 deletions

View File

@@ -0,0 +1,17 @@
from collections.abc import Hashable
from langgraph.types import Send
from onyx.agent_search.answer_question.states import AnswerQuestionInput
from onyx.agent_search.core_state import extract_core_fields
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalInput
def send_to_expanded_retrieval(state: AnswerQuestionInput) -> Send | Hashable:
return Send(
"decomped_expanded_retrieval",
ExpandedRetrievalInput(
**extract_core_fields(state),
question=state["question"],
),
)

View File

@@ -0,0 +1,106 @@
from langgraph.graph import END
from langgraph.graph import START
from langgraph.graph import StateGraph
from onyx.agent_search.answer_question.edges import send_to_expanded_retrieval
from onyx.agent_search.answer_question.nodes.answer_check import answer_check
from onyx.agent_search.answer_question.nodes.answer_generation import answer_generation
from onyx.agent_search.answer_question.nodes.format_answer import format_answer
from onyx.agent_search.answer_question.nodes.ingest_retrieval import ingest_retrieval
from onyx.agent_search.answer_question.states import AnswerQuestionInput
from onyx.agent_search.answer_question.states import AnswerQuestionOutput
from onyx.agent_search.answer_question.states import AnswerQuestionState
from onyx.agent_search.expanded_retrieval.graph_builder import (
expanded_retrieval_graph_builder,
)
def answer_query_graph_builder() -> StateGraph:
graph = StateGraph(
state_schema=AnswerQuestionState,
input=AnswerQuestionInput,
output=AnswerQuestionOutput,
)
### Add nodes ###
expanded_retrieval = expanded_retrieval_graph_builder().compile()
graph.add_node(
node="decomped_expanded_retrieval",
action=expanded_retrieval,
)
graph.add_node(
node="answer_check",
action=answer_check,
)
graph.add_node(
node="answer_generation",
action=answer_generation,
)
graph.add_node(
node="format_answer",
action=format_answer,
)
graph.add_node(
node="ingest_retrieval",
action=ingest_retrieval,
)
### Add edges ###
graph.add_conditional_edges(
source=START,
path=send_to_expanded_retrieval,
path_map=["decomped_expanded_retrieval"],
)
graph.add_edge(
start_key="decomped_expanded_retrieval",
end_key="ingest_retrieval",
)
graph.add_edge(
start_key="ingest_retrieval",
end_key="answer_generation",
)
graph.add_edge(
start_key="answer_generation",
end_key="answer_check",
)
graph.add_edge(
start_key="answer_check",
end_key="format_answer",
)
graph.add_edge(
start_key="format_answer",
end_key=END,
)
return graph
if __name__ == "__main__":
from onyx.db.engine import get_session_context_manager
from onyx.llm.factory import get_default_llms
from onyx.context.search.models import SearchRequest
graph = answer_query_graph_builder()
compiled_graph = graph.compile()
primary_llm, fast_llm = get_default_llms()
search_request = SearchRequest(
query="what can you do with onyx or danswer?",
)
with get_session_context_manager() as db_session:
inputs = AnswerQuestionInput(
search_request=search_request,
primary_llm=primary_llm,
fast_llm=fast_llm,
db_session=db_session,
question="what can you do with onyx?",
)
for thing in compiled_graph.stream(
input=inputs,
# debug=True,
# subgraphs=True,
):
print(thing)
# output = compiled_graph.invoke(inputs)
# print(output)

View File

@@ -0,0 +1,30 @@
from langchain_core.messages import HumanMessage
from langchain_core.messages import merge_message_runs
from onyx.agent_search.answer_question.states import AnswerQuestionState
from onyx.agent_search.answer_question.states import QACheckUpdate
from onyx.agent_search.shared_graph_utils.prompts import SUB_CHECK_PROMPT
def answer_check(state: AnswerQuestionState) -> QACheckUpdate:
msg = [
HumanMessage(
content=SUB_CHECK_PROMPT.format(
question=state["question"],
base_answer=state["answer"],
)
)
]
fast_llm = state["fast_llm"]
response = list(
fast_llm.stream(
prompt=msg,
)
)
quality_str = merge_message_runs(response, chunk_separator="")[0].content
return QACheckUpdate(
answer_quality=quality_str,
)

View File

@@ -0,0 +1,32 @@
from langchain_core.messages import HumanMessage
from langchain_core.messages import merge_message_runs
from onyx.agent_search.answer_question.states import AnswerQuestionState
from onyx.agent_search.answer_question.states import QAGenerationUpdate
from onyx.agent_search.shared_graph_utils.prompts import BASE_RAG_PROMPT
from onyx.agent_search.shared_graph_utils.utils import format_docs
def answer_generation(state: AnswerQuestionState) -> QAGenerationUpdate:
question = state["question"]
docs = state["documents"]
print(f"Number of verified retrieval docs: {len(docs)}")
msg = [
HumanMessage(
content=BASE_RAG_PROMPT.format(question=question, context=format_docs(docs))
)
]
fast_llm = state["fast_llm"]
response = list(
fast_llm.stream(
prompt=msg,
)
)
answer_str = merge_message_runs(response, chunk_separator="")[0].content
return QAGenerationUpdate(
answer=answer_str,
)

View File

@@ -0,0 +1,17 @@
from onyx.agent_search.answer_question.states import AnswerQuestionOutput
from onyx.agent_search.answer_question.states import AnswerQuestionState
from onyx.agent_search.answer_question.states import QuestionAnswerResults
def format_answer(state: AnswerQuestionState) -> AnswerQuestionOutput:
return AnswerQuestionOutput(
answer_results=[
QuestionAnswerResults(
question=state["question"],
quality=state["answer_quality"],
answer=state["answer"],
expanded_retrieval_results=state["expanded_retrieval_results"],
documents=state["documents"],
)
],
)

View File

@@ -0,0 +1,11 @@
from onyx.agent_search.answer_question.states import RetrievalIngestionUpdate
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalOutput
def ingest_retrieval(state: ExpandedRetrievalOutput) -> RetrievalIngestionUpdate:
return RetrievalIngestionUpdate(
expanded_retrieval_results=state[
"expanded_retrieval_result"
].expanded_queries_results,
documents=state["expanded_retrieval_result"].all_documents,
)

View File

@@ -0,0 +1,71 @@
from operator import add
from typing import Annotated
from typing import TypedDict
from pydantic import BaseModel
from onyx.agent_search.core_state import CoreState
from onyx.agent_search.expanded_retrieval.states import QueryResult
from onyx.agent_search.shared_graph_utils.operators import dedup_inference_sections
from onyx.context.search.models import InferenceSection
### Models ###
class QuestionAnswerResults(BaseModel):
question: str
answer: str
quality: str
expanded_retrieval_results: list[QueryResult]
documents: list[InferenceSection]
### States ###
## Update States
class QACheckUpdate(TypedDict):
answer_quality: str
class QAGenerationUpdate(TypedDict):
answer: str
class RetrievalIngestionUpdate(TypedDict):
expanded_retrieval_results: list[QueryResult]
documents: Annotated[list[InferenceSection], dedup_inference_sections]
## Graph Input State
class AnswerQuestionInput(CoreState):
question: str
## Graph State
class AnswerQuestionState(
AnswerQuestionInput,
QAGenerationUpdate,
QACheckUpdate,
RetrievalIngestionUpdate,
):
pass
## Graph Output State
class AnswerQuestionOutput(TypedDict):
"""
This is a list of results even though each call of this subgraph returns only one result.
If the answer-query subgraph is parallelized, the branch results arrive together as a list,
so the add operator is used to merge them.
"""
answer_results: Annotated[list[QuestionAnswerResults], add]
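
For context, the pattern behind answer_results above is LangGraph's map-reduce idiom: a conditional edge fans out one Send per item, each parallel branch returns a one-element list, and the reducer (operator.add) concatenates the results. A minimal, self-contained sketch of that idiom, with toy names standing in for the Onyx classes:

from operator import add
from typing import Annotated, TypedDict

from langgraph.graph import END, START, StateGraph
from langgraph.types import Send


class FanState(TypedDict):
    questions: list[str]
    # Each parallel branch contributes a one-element list; add concatenates.
    answers: Annotated[list[str], add]


def answer_one(payload: dict) -> dict:
    # Stands in for the answer_question subgraph.
    return {"answers": [f"answer to: {payload['question']}"]}


def fan_out(state: FanState) -> list[Send]:
    return [Send("answer_one", {"question": q}) for q in state["questions"]]


builder = StateGraph(FanState)
builder.add_node("answer_one", answer_one)
builder.add_conditional_edges(START, fan_out, ["answer_one"])
builder.add_edge("answer_one", END)

result = builder.compile().invoke({"questions": ["a", "b"]})
print(result["answers"])  # both branch outputs, merged by the reducer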

View File

@@ -0,0 +1,29 @@
from typing import TypedDict
from typing import TypeVar
from sqlalchemy.orm import Session
from onyx.context.search.models import SearchRequest
from onyx.llm.interfaces import LLM
class CoreState(TypedDict, total=False):
"""
This is the core state that is shared across all subgraphs.
"""
search_request: SearchRequest
primary_llm: LLM
fast_llm: LLM
# a single session for the entire agent search
# is fine if we are only reading
db_session: Session
# This ensures that the state passed in extends the CoreState
T = TypeVar("T", bound=CoreState)
def extract_core_fields(state: T) -> CoreState:
filtered_dict = {k: v for k, v in state.items() if k in CoreState.__annotations__}
return CoreState(**dict(filtered_dict)) # type: ignore

View File

@@ -0,0 +1,114 @@
from typing import Any
from langchain_core.messages import HumanMessage
from onyx.agent_search.main.states import MainState
from onyx.agent_search.shared_graph_utils.prompts import COMBINED_CONTEXT
from onyx.agent_search.shared_graph_utils.prompts import MODIFIED_RAG_PROMPT
from onyx.agent_search.shared_graph_utils.utils import format_docs
from onyx.agent_search.shared_graph_utils.utils import normalize_whitespace
# aggregate sub questions and answers
def deep_answer_generation(state: MainState) -> dict[str, Any]:
"""
Generate the deep answer by combining the sub-question answers with the retrieved documents.
Args:
state (messages): The current state
Returns:
dict: The updated state with the deep answer
"""
print("---DEEP GENERATE---")
question = state["original_question"]
docs = state["deduped_retrieval_docs"]
deep_answer_context = state["core_answer_dynamic_context"]
print(f"Number of verified retrieval docs - deep: {len(docs)}")
combined_context = normalize_whitespace(
COMBINED_CONTEXT.format(
deep_answer_context=deep_answer_context, formated_docs=format_docs(docs)
)
)
msg = [
HumanMessage(
content=MODIFIED_RAG_PROMPT.format(
question=question, combined_context=combined_context
)
)
]
# Generate the deep answer with the fast LLM
model = state["fast_llm"]
response = model.invoke(msg)
return {
"deep_answer": response.content,
}
def final_stuff(state: MainState) -> dict[str, Any]:
"""
Prints the message log, the base answer, the answered sub-questions, and, if one was
generated, the deep answer. Purely for inspection; no state updates are made.
Args:
state (messages): The current state
Returns:
dict: An empty update
"""
print("---FINAL---")
messages = state["log_messages"]
time_ordered_messages = [x.pretty_repr() for x in messages]
time_ordered_messages.sort()
print("Message Log:")
print("\n".join(time_ordered_messages))
initial_sub_qas = state["initial_sub_qas"]
initial_sub_qa_list = []
for initial_sub_qa in initial_sub_qas:
if initial_sub_qa["sub_answer_check"] == "yes":
initial_sub_qa_list.append(
f' Question:\n {initial_sub_qa["sub_question"]}\n --\n Answer:\n {initial_sub_qa["sub_answer"]}\n -----'
)
initial_sub_qa_context = "\n".join(initial_sub_qa_list)
base_answer = state["base_answer"]
print(f"Final Base Answer:\n{base_answer}")
print("--------------------------------")
print(f"Initial Answered Sub Questions:\n{initial_sub_qa_context}")
print("--------------------------------")
if not state.get("deep_answer"):
print("No Deep Answer was required")
return {}
deep_answer = state["deep_answer"]
sub_qas = state["sub_qas"]
sub_qa_list = []
for sub_qa in sub_qas:
if sub_qa["sub_answer_check"] == "yes":
sub_qa_list.append(
f' Question:\n {sub_qa["sub_question"]}\n --\n Answer:\n {sub_qa["sub_answer"]}\n -----'
)
sub_qa_context = "\n".join(sub_qa_list)
print(f"Final Base Answer:\n{base_answer}")
print("--------------------------------")
print(f"Final Deep Answer:\n{deep_answer}")
print("--------------------------------")
print("Sub Questions and Answers:")
print(sub_qa_context)
return {}

View File

@@ -0,0 +1,78 @@
import json
import re
from datetime import datetime
from typing import Any
from langchain_core.messages import HumanMessage
from onyx.agent_search.main.states import MainState
from onyx.agent_search.shared_graph_utils.prompts import DEEP_DECOMPOSE_PROMPT
from onyx.agent_search.shared_graph_utils.utils import format_entity_term_extraction
from onyx.agent_search.shared_graph_utils.utils import generate_log_message
def decompose(state: MainState) -> dict[str, Any]:
""" """
node_start_time = datetime.now()
question = state["original_question"]
base_answer = state["base_answer"]
# get the entity term extraction dict and properly format it
entity_term_extraction_dict = state["retrieved_entities_relationships"][
"retrieved_entities_relationships"
]
entity_term_extraction_str = format_entity_term_extraction(
entity_term_extraction_dict
)
initial_question_answers = state["initial_sub_qas"]
addressed_question_list = [
x["sub_question"]
for x in initial_question_answers
if x["sub_answer_check"] == "yes"
]
failed_question_list = [
x["sub_question"]
for x in initial_question_answers
if x["sub_answer_check"] == "no"
]
msg = [
HumanMessage(
content=DEEP_DECOMPOSE_PROMPT.format(
question=question,
entity_term_extraction_str=entity_term_extraction_str,
base_answer=base_answer,
answered_sub_questions="\n - ".join(addressed_question_list),
failed_sub_questions="\n - ".join(failed_question_list),
),
)
]
# Run the decomposition with the fast LLM
model = state["fast_llm"]
response = model.invoke(msg)
cleaned_response = re.sub(r"```json\n|\n```", "", response.pretty_repr())
parsed_response = json.loads(cleaned_response)
sub_questions_dict = {}
for sub_question_nr, sub_question_dict in enumerate(
parsed_response["sub_questions"]
):
sub_question_dict["answered"] = False
sub_question_dict["verified"] = False
sub_questions_dict[sub_question_nr] = sub_question_dict
return {
"decomposed_sub_questions_dict": sub_questions_dict,
"log_messages": generate_log_message(
message="deep - decompose",
node_start_time=node_start_time,
graph_start_time=state["graph_start_time"],
),
}

View File

@@ -0,0 +1,40 @@
import json
import re
from typing import Any
from langchain_core.messages import HumanMessage
from langchain_core.messages import merge_message_runs
from onyx.agent_search.main.states import MainState
from onyx.agent_search.shared_graph_utils.prompts import ENTITY_TERM_PROMPT
from onyx.agent_search.shared_graph_utils.utils import format_docs
def entity_term_extraction(state: MainState) -> dict[str, Any]:
"""Extract entities and terms from the question and context"""
question = state["original_question"]
docs = state["deduped_retrieval_docs"]
doc_context = format_docs(docs)
msg = [
HumanMessage(
content=ENTITY_TERM_PROMPT.format(question=question, context=doc_context),
)
]
fast_llm = state["fast_llm"]
# Extract entities and terms with the fast LLM
llm_response_list = list(
fast_llm.stream(
prompt=msg,
)
)
llm_response = merge_message_runs(llm_response_list, chunk_separator="")[0].content
cleaned_response = re.sub(r"```json\n|\n```", "", llm_response)
parsed_response = json.loads(cleaned_response)
return {
"retrieved_entities_relationships": parsed_response,
}

View File

@@ -0,0 +1,30 @@
from typing import Any
from onyx.agent_search.main.states import MainState
# aggregate sub questions and answers
def sub_qa_level_aggregator(state: MainState) -> dict[str, Any]:
sub_qas = state["sub_qas"]
dynamic_context_list = [
"Below you will find useful information to answer the original question:"
]
checked_sub_qas = []
for core_answer_sub_qa in sub_qas:
question = core_answer_sub_qa["sub_question"]
answer = core_answer_sub_qa["sub_answer"]
verified = core_answer_sub_qa["sub_answer_check"]
if verified == "yes":
dynamic_context_list.append(
f"Question:\n{question}\n\nAnswer:\n{answer}\n\n---\n\n"
)
checked_sub_qas.append({"sub_question": question, "sub_answer": answer})
dynamic_context = "\n".join(dynamic_context_list)
return {
"core_answer_dynamic_context": dynamic_context,
"checked_sub_qas": checked_sub_qas,
}

View File

@@ -0,0 +1,19 @@
from typing import Any
from onyx.agent_search.main.states import MainState
def sub_qa_manager(state: MainState) -> dict[str, Any]:
""" """
sub_questions_dict = state["decomposed_sub_questions_dict"]
sub_questions = {}
for sub_question_nr, sub_question_dict in sub_questions_dict.items():
sub_questions[sub_question_nr] = sub_question_dict["sub_question"]
return {
"sub_questions": sub_questions,
"num_new_question_iterations": 0,
}

View File

@@ -0,0 +1,20 @@
from collections.abc import Hashable
from langgraph.types import Send
from onyx.agent_search.core_state import extract_core_fields
from onyx.agent_search.expanded_retrieval.nodes.doc_retrieval import RetrievalInput
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalState
def parallel_retrieval_edge(state: ExpandedRetrievalState) -> list[Send | Hashable]:
return [
Send(
"doc_retrieval",
RetrievalInput(
query_to_retrieve=query,
**extract_core_fields(state),
),
)
for query in state["expanded_queries"]
]

View File

@@ -0,0 +1,111 @@
from langgraph.graph import END
from langgraph.graph import START
from langgraph.graph import StateGraph
from onyx.agent_search.expanded_retrieval.edges import parallel_retrieval_edge
from onyx.agent_search.expanded_retrieval.nodes.doc_reranking import doc_reranking
from onyx.agent_search.expanded_retrieval.nodes.doc_retrieval import doc_retrieval
from onyx.agent_search.expanded_retrieval.nodes.doc_verification import (
doc_verification,
)
from onyx.agent_search.expanded_retrieval.nodes.expand_queries import expand_queries
from onyx.agent_search.expanded_retrieval.nodes.format_results import format_results
from onyx.agent_search.expanded_retrieval.nodes.verification_kickoff import (
verification_kickoff,
)
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalInput
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalOutput
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalState
def expanded_retrieval_graph_builder() -> StateGraph:
graph = StateGraph(
state_schema=ExpandedRetrievalState,
input=ExpandedRetrievalInput,
output=ExpandedRetrievalOutput,
)
### Add nodes ###
graph.add_node(
node="expand_queries",
action=expand_queries,
)
graph.add_node(
node="doc_retrieval",
action=doc_retrieval,
)
graph.add_node(
node="verification_kickoff",
action=verification_kickoff,
)
graph.add_node(
node="doc_verification",
action=doc_verification,
)
graph.add_node(
node="doc_reranking",
action=doc_reranking,
)
graph.add_node(
node="format_results",
action=format_results,
)
### Add edges ###
graph.add_edge(
start_key=START,
end_key="expand_queries",
)
graph.add_conditional_edges(
source="expand_queries",
path=parallel_retrieval_edge,
path_map=["doc_retrieval"],
)
graph.add_edge(
start_key="doc_retrieval",
end_key="verification_kickoff",
)
graph.add_edge(
start_key="doc_verification",
end_key="doc_reranking",
)
graph.add_edge(
start_key="doc_reranking",
end_key="format_results",
)
graph.add_edge(
start_key="format_results",
end_key=END,
)
return graph
if __name__ == "__main__":
from onyx.db.engine import get_session_context_manager
from onyx.llm.factory import get_default_llms
from onyx.context.search.models import SearchRequest
graph = expanded_retrieval_graph_builder()
compiled_graph = graph.compile()
primary_llm, fast_llm = get_default_llms()
search_request = SearchRequest(
query="what can you do with onyx or danswer?",
)
with get_session_context_manager() as db_session:
inputs = ExpandedRetrievalInput(
search_request=search_request,
primary_llm=primary_llm,
fast_llm=fast_llm,
db_session=db_session,
question="what can you do with onyx?",
)
for thing in compiled_graph.stream(
input=inputs,
# debug=True,
subgraphs=True,
):
print(thing)

View File

@@ -0,0 +1,9 @@
from onyx.agent_search.expanded_retrieval.states import DocRerankingUpdate
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalState
def doc_reranking(state: ExpandedRetrievalState) -> DocRerankingUpdate:
verified_documents = state["verified_documents"]
reranked_documents = verified_documents
return DocRerankingUpdate(reranked_documents=reranked_documents)

View File

@@ -0,0 +1,44 @@
from onyx.agent_search.expanded_retrieval.states import DocRetrievalUpdate
from onyx.agent_search.expanded_retrieval.states import QueryResult
from onyx.agent_search.expanded_retrieval.states import RetrievalInput
from onyx.context.search.models import InferenceSection
from onyx.context.search.models import SearchRequest
from onyx.context.search.pipeline import SearchPipeline
def doc_retrieval(state: RetrievalInput) -> DocRetrievalUpdate:
"""
Retrieve documents
Args:
state (RetrievalInput): Primary state + the query to retrieve
Updates:
expanded_retrieval_results: list[QueryResult]
retrieved_documents: list[InferenceSection]
"""
llm = state["primary_llm"]
fast_llm = state["fast_llm"]
query_to_retrieve = state["query_to_retrieve"]
documents: list[InferenceSection] = SearchPipeline(
search_request=SearchRequest(
query=query_to_retrieve,
),
user=None,
llm=llm,
fast_llm=fast_llm,
db_session=state["db_session"],
).reranked_sections
expanded_retrieval_result = QueryResult(
query=query_to_retrieve,
documents_for_query=documents[:4],
chunk_ids=[],
stats={},
)
return DocRetrievalUpdate(
expanded_retrieval_results=[expanded_retrieval_result],
retrieved_documents=documents[:4],
)

View File

@@ -0,0 +1,51 @@
from langchain_core.messages import HumanMessage
from langchain_core.messages import merge_message_runs
from onyx.agent_search.expanded_retrieval.states import DocVerificationInput
from onyx.agent_search.expanded_retrieval.states import DocVerificationUpdate
from onyx.agent_search.shared_graph_utils.models import BinaryDecision
from onyx.agent_search.shared_graph_utils.prompts import VERIFIER_PROMPT
def doc_verification(state: DocVerificationInput) -> DocVerificationUpdate:
"""
Check whether the document is relevant for the original user question
Args:
state (DocVerificationInput): The current state
Updates:
verified_documents: list[InferenceSection]
"""
original_query = state["search_request"].query
doc_to_verify = state["doc_to_verify"]
document_content = doc_to_verify.combined_content
msg = [
HumanMessage(
content=VERIFIER_PROMPT.format(
question=original_query, document_content=document_content
)
)
]
fast_llm = state["fast_llm"]
response = list(
fast_llm.stream(
prompt=msg,
)
)
response_string = merge_message_runs(response, chunk_separator="")[0].content
# Convert string response to proper dictionary format
decision_dict = {"decision": response_string.lower()}
formatted_response = BinaryDecision.model_validate(decision_dict)
verified_documents = []
if formatted_response.decision == "yes":
verified_documents.append(doc_to_verify)
return DocVerificationUpdate(
verified_documents=verified_documents,
)
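
Note that BinaryDecision (defined in shared_graph_utils/models.py further down) constrains decision to the literals "yes" and "no", so any other response, e.g. one with trailing punctuation, raises a pydantic ValidationError rather than being treated as "no". A tiny illustration of that behavior:

from typing import Literal

from pydantic import BaseModel, ValidationError


class BinaryDecision(BaseModel):
    decision: Literal["yes", "no"]


print(BinaryDecision.model_validate({"decision": "Yes".lower()}).decision)  # yes
try:
    BinaryDecision.model_validate({"decision": "yes."})
except ValidationError:
    print("anything other than exactly 'yes' or 'no' fails validation")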

View File

@@ -0,0 +1,30 @@
from langchain_core.messages import HumanMessage
from langchain_core.messages import merge_message_runs
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalInput
from onyx.agent_search.expanded_retrieval.states import QueryExpansionUpdate
from onyx.agent_search.shared_graph_utils.prompts import REWRITE_PROMPT_MULTI_ORIGINAL
from onyx.llm.interfaces import LLM
def expand_queries(state: ExpandedRetrievalInput) -> QueryExpansionUpdate:
question = state.get("question")
llm: LLM = state["fast_llm"]
msg = [
HumanMessage(
content=REWRITE_PROMPT_MULTI_ORIGINAL.format(question=question),
)
]
llm_response_list = list(
llm.stream(
prompt=msg,
)
)
llm_response = merge_message_runs(llm_response_list, chunk_separator="")[0].content
rewritten_queries = llm_response.split("--")
return QueryExpansionUpdate(
expanded_queries=rewritten_queries,
)

View File

@@ -0,0 +1,12 @@
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalOutput
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalResult
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalState
def format_results(state: ExpandedRetrievalState) -> ExpandedRetrievalOutput:
return ExpandedRetrievalOutput(
expanded_retrieval_result=ExpandedRetrievalResult(
expanded_queries_results=state["expanded_retrieval_results"],
all_documents=state["reranked_documents"],
),
)

View File

@@ -0,0 +1,31 @@
from typing import Literal
from langgraph.types import Command
from langgraph.types import Send
from onyx.agent_search.core_state import extract_core_fields
from onyx.agent_search.expanded_retrieval.nodes.doc_verification import (
DocVerificationInput,
)
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalState
def verification_kickoff(
state: ExpandedRetrievalState,
) -> Command[Literal["doc_verification"]]:
print(f"verification_kickoff state: {state.keys()}")
documents = state["retrieved_documents"]
return Command(
update={},
goto=[
Send(
node="doc_verification",
arg=DocVerificationInput(
doc_to_verify=doc,
**extract_core_fields(state),
),
)
for doc in documents
],
)
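
verification_kickoff combines a (here empty) state update with dynamic routing: the Command's goto carries one Send per retrieved document, so each doc_verification call runs as its own parallel branch. A hedged, self-contained sketch of that Command-plus-Send pattern, with a toy relevance check in place of the LLM call:

from operator import add
from typing import Annotated, Literal, TypedDict

from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, Send


class State(TypedDict):
    docs: list[str]
    verified: Annotated[list[str], add]


def kickoff(state: State) -> Command[Literal["verify"]]:
    # No state change here; the Command only fans out to verify.
    return Command(
        update={},
        goto=[Send("verify", {"doc": d}) for d in state["docs"]],
    )


def verify(payload: dict) -> dict:
    # Toy stand-in for the LLM relevance check.
    keep = "onyx" in payload["doc"]
    return {"verified": [payload["doc"]] if keep else []}


builder = StateGraph(State)
builder.add_node("kickoff", kickoff)
builder.add_node("verify", verify)
builder.add_edge(START, "kickoff")
builder.add_edge("verify", END)

print(builder.compile().invoke({"docs": ["onyx intro", "unrelated"]})["verified"])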

View File

@@ -0,0 +1,85 @@
from operator import add
from typing import Annotated
from typing import Any
from typing import TypedDict
from pydantic import BaseModel
from onyx.agent_search.core_state import CoreState
from onyx.agent_search.shared_graph_utils.operators import dedup_inference_sections
from onyx.context.search.models import InferenceSection
### Models ###
class QueryResult(BaseModel):
query: str
documents_for_query: list[InferenceSection]
chunk_ids: list[str]
stats: dict[str, Any]
class ExpandedRetrievalResult(BaseModel):
expanded_queries_results: list[QueryResult]
all_documents: list[InferenceSection]
### States ###
## Update States
class DocVerificationUpdate(TypedDict):
verified_documents: Annotated[list[InferenceSection], dedup_inference_sections]
class DocRerankingUpdate(TypedDict):
reranked_documents: Annotated[list[InferenceSection], dedup_inference_sections]
class QueryExpansionUpdate(TypedDict):
expanded_queries: list[str]
class DocRetrievalUpdate(TypedDict):
expanded_retrieval_results: Annotated[list[QueryResult], add]
retrieved_documents: Annotated[list[InferenceSection], dedup_inference_sections]
## Graph Input State
class ExpandedRetrievalInput(CoreState):
question: str
## Graph State
class ExpandedRetrievalState(
# This includes the core state
ExpandedRetrievalInput,
DocRetrievalUpdate,
DocVerificationUpdate,
DocRerankingUpdate,
QueryExpansionUpdate,
):
pass
## Graph Output State
class ExpandedRetrievalOutput(TypedDict):
expanded_retrieval_result: ExpandedRetrievalResult
## Conditional Input States
class DocVerificationInput(CoreState):
doc_to_verify: InferenceSection
class RetrievalInput(CoreState):
query_to_retrieve: str

View File

@@ -0,0 +1,76 @@
from collections.abc import Hashable
from langgraph.types import Send
from onyx.agent_search.answer_question.states import AnswerQuestionInput
from onyx.agent_search.core_state import extract_core_fields
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalInput
from onyx.agent_search.main.states import MainInput
from onyx.agent_search.main.states import MainState
def parallelize_decompozed_answer_queries(state: MainState) -> list[Send | Hashable]:
return [
Send(
"answer_query",
AnswerQuestionInput(
**extract_core_fields(state),
question=question,
),
)
for question in state["initial_decomp_questions"]
]
def send_to_initial_retrieval(state: MainInput) -> list[Send | Hashable]:
return [
Send(
"initial_retrieval",
ExpandedRetrievalInput(
**extract_core_fields(state),
question=state["search_request"].query,
),
)
]
# def continue_to_answer_sub_questions(state: QAState) -> Union[Hashable, list[Hashable]]:
# # Routes re-written queries to the (parallel) retrieval steps
# # Notice the 'Send()' API that takes care of the parallelization
# return [
# Send(
# "sub_answers_graph",
# ResearchQAState(
# sub_question=sub_question["sub_question_str"],
# sub_question_nr=sub_question["sub_question_nr"],
# graph_start_time=state["graph_start_time"],
# primary_llm=state["primary_llm"],
# fast_llm=state["fast_llm"],
# ),
# )
# for sub_question in state["sub_questions"]
# ]
# def continue_to_deep_answer(state: QAState) -> Union[Hashable, list[Hashable]]:
# print("---GO TO DEEP ANSWER OR END---")
# base_answer = state["base_answer"]
# question = state["original_question"]
# BASE_CHECK_MESSAGE = [
# HumanMessage(
# content=BASE_CHECK_PROMPT.format(question=question, base_answer=base_answer)
# )
# ]
# model = state["fast_llm"]
# response = model.invoke(BASE_CHECK_MESSAGE)
# print(f"CAN WE CONTINUE W/O GENERATING A DEEP ANSWER? - {response.pretty_repr()}")
# if response.pretty_repr() == "no":
# return "decompose"
# else:
# return "end"

View File

@@ -0,0 +1,121 @@
from langgraph.graph import END
from langgraph.graph import START
from langgraph.graph import StateGraph
from onyx.agent_search.answer_question.graph_builder import answer_query_graph_builder
from onyx.agent_search.expanded_retrieval.graph_builder import (
expanded_retrieval_graph_builder,
)
from onyx.agent_search.main.edges import parallelize_decompozed_answer_queries
from onyx.agent_search.main.edges import send_to_initial_retrieval
from onyx.agent_search.main.nodes.base_decomp import main_decomp_base
from onyx.agent_search.main.nodes.generate_initial_answer import (
generate_initial_answer,
)
from onyx.agent_search.main.nodes.ingest_answers import ingest_answers
from onyx.agent_search.main.nodes.ingest_initial_retrieval import (
ingest_initial_retrieval,
)
from onyx.agent_search.main.states import MainInput
from onyx.agent_search.main.states import MainState
def main_graph_builder() -> StateGraph:
graph = StateGraph(
state_schema=MainState,
input=MainInput,
)
### Add nodes ###
graph.add_node(
node="base_decomp",
action=main_decomp_base,
)
answer_query_subgraph = answer_query_graph_builder().compile()
graph.add_node(
node="answer_query",
action=answer_query_subgraph,
)
expanded_retrieval_subgraph = expanded_retrieval_graph_builder().compile()
graph.add_node(
node="initial_retrieval",
action=expanded_retrieval_subgraph,
)
graph.add_node(
node="ingest_initial_retrieval",
action=ingest_initial_retrieval,
)
graph.add_node(
node="ingest_answers",
action=ingest_answers,
)
graph.add_node(
node="generate_initial_answer",
action=generate_initial_answer,
)
### Add edges ###
graph.add_conditional_edges(
source=START,
path=send_to_initial_retrieval,
path_map=["initial_retrieval"],
)
graph.add_edge(
start_key="initial_retrieval",
end_key="ingest_initial_retrieval",
)
graph.add_edge(
start_key=START,
end_key="base_decomp",
)
graph.add_conditional_edges(
source="base_decomp",
path=parallelize_decompozed_answer_queries,
path_map=["answer_query"],
)
graph.add_edge(
start_key="answer_query",
end_key="ingest_answers",
)
graph.add_edge(
start_key=["ingest_answers", "ingest_initial_retrieval"],
end_key="generate_initial_answer",
)
graph.add_edge(
start_key="generate_initial_answer",
end_key=END,
)
return graph
if __name__ == "__main__":
from onyx.db.engine import get_session_context_manager
from onyx.llm.factory import get_default_llms
from onyx.context.search.models import SearchRequest
graph = main_graph_builder()
compiled_graph = graph.compile()
primary_llm, fast_llm = get_default_llms()
search_request = SearchRequest(
query="what can you do with onyx or danswer?",
)
with get_session_context_manager() as db_session:
inputs = MainInput(
search_request=search_request,
primary_llm=primary_llm,
fast_llm=fast_llm,
db_session=db_session,
)
for thing in compiled_graph.stream(
input=inputs,
# stream_mode="debug",
# debug=True,
subgraphs=True,
):
# print(thing)
print()

View File

@@ -0,0 +1,31 @@
from langchain_core.messages import HumanMessage
from onyx.agent_search.main.states import BaseDecompUpdate
from onyx.agent_search.main.states import MainState
from onyx.agent_search.shared_graph_utils.prompts import INITIAL_DECOMPOSITION_PROMPT
from onyx.agent_search.shared_graph_utils.utils import clean_and_parse_list_string
def main_decomp_base(state: MainState) -> BaseDecompUpdate:
question = state["search_request"].query
msg = [
HumanMessage(
content=INITIAL_DECOMPOSITION_PROMPT.format(question=question),
)
]
# Get the decomposed sub-questions in a defined format
model = state["fast_llm"]
response = model.invoke(msg)
content = response.pretty_repr()
list_of_subquestions = clean_and_parse_list_string(content)
decomp_list: list[str] = [
sub_question["sub_question"].strip() for sub_question in list_of_subquestions
]
return BaseDecompUpdate(
initial_decomp_questions=decomp_list,
)

View File

@@ -0,0 +1,55 @@
from langchain_core.messages import HumanMessage
from onyx.agent_search.main.states import InitialAnswerUpdate
from onyx.agent_search.main.states import MainState
from onyx.agent_search.shared_graph_utils.prompts import INITIAL_RAG_PROMPT
from onyx.agent_search.shared_graph_utils.utils import format_docs
def generate_initial_answer(state: MainState) -> InitialAnswerUpdate:
print("---GENERATE INITIAL---")
question = state["search_request"].query
docs = state["documents"]
all_original_question_documents = state["all_original_question_documents"]
combined_docs = docs + all_original_question_documents
decomp_answer_results = state["decomp_answer_results"]
good_qa_list: list[str] = []
_SUB_QUESTION_ANSWER_TEMPLATE = """
Sub-Question:\n - {sub_question}\n --\nAnswer:\n - {sub_answer}\n\n
"""
for decomp_answer_result in decomp_answer_results:
if (
decomp_answer_result.quality.lower() == "yes"
and len(decomp_answer_result.answer) > 0
and decomp_answer_result.answer != "I don't know"
):
good_qa_list.append(
_SUB_QUESTION_ANSWER_TEMPLATE.format(
sub_question=decomp_answer_result.question,
sub_answer=decomp_answer_result.answer,
)
)
sub_question_answer_str = "\n\n------\n\n".join(good_qa_list)
msg = [
HumanMessage(
content=INITIAL_RAG_PROMPT.format(
question=question,
context=format_docs(combined_docs),
answered_sub_questions=sub_question_answer_str,
)
)
]
# Generate the initial answer with the fast LLM
model = state["fast_llm"]
response = model.invoke(msg)
answer = response.pretty_repr()
print(answer)
return InitialAnswerUpdate(initial_answer=answer)

View File

@@ -0,0 +1,15 @@
from onyx.agent_search.answer_question.states import AnswerQuestionOutput
from onyx.agent_search.main.states import DecompAnswersUpdate
from onyx.agent_search.shared_graph_utils.operators import dedup_inference_sections
def ingest_answers(state: AnswerQuestionOutput) -> DecompAnswersUpdate:
documents = []
for answer_result in state["answer_results"]:
documents.extend(answer_result.documents)
return DecompAnswersUpdate(
# Deduping is done by the documents operator for the main graph
# so we might not need to dedup here
documents=dedup_inference_sections(documents, []),
decomp_answer_results=state["answer_results"],
)

View File

@@ -0,0 +1,13 @@
from onyx.agent_search.expanded_retrieval.states import ExpandedRetrievalOutput
from onyx.agent_search.main.states import ExpandedRetrievalUpdate
def ingest_initial_retrieval(state: ExpandedRetrievalOutput) -> ExpandedRetrievalUpdate:
return ExpandedRetrievalUpdate(
original_question_retrieval_results=state[
"expanded_retrieval_result"
].expanded_queries_results,
all_original_question_documents=state[
"expanded_retrieval_result"
].all_documents,
)

View File

@@ -0,0 +1,64 @@
from operator import add
from typing import Annotated
from typing import TypedDict
from onyx.agent_search.answer_question.states import QuestionAnswerResults
from onyx.agent_search.core_state import CoreState
from onyx.agent_search.expanded_retrieval.states import QueryResult
from onyx.agent_search.shared_graph_utils.operators import dedup_inference_sections
from onyx.context.search.models import InferenceSection
### States ###
## Update States
class BaseDecompUpdate(TypedDict):
initial_decomp_questions: list[str]
class InitialAnswerUpdate(TypedDict):
initial_answer: str
class DecompAnswersUpdate(TypedDict):
documents: Annotated[list[InferenceSection], dedup_inference_sections]
decomp_answer_results: Annotated[list[QuestionAnswerResults], add]
class ExpandedRetrievalUpdate(TypedDict):
all_original_question_documents: Annotated[
list[InferenceSection], dedup_inference_sections
]
original_question_retrieval_results: list[QueryResult]
## Graph Input State
class MainInput(CoreState):
pass
## Graph State
class MainState(
# This includes the core state
MainInput,
BaseDecompUpdate,
InitialAnswerUpdate,
DecompAnswersUpdate,
ExpandedRetrievalUpdate,
):
pass
## Graph Output State
class MainOutput(TypedDict):
"""
This is not used because defining an output schema only matters for filtering the result of
a .invoke() call, but since we are streaming we simply yield the entire state.
"""

View File

@@ -0,0 +1,27 @@
from onyx.agent_search.main.graph_builder import main_graph_builder
from onyx.chat.answer import AnswerStream
from onyx.llm.interfaces import LLM
from onyx.tools.tool import Tool
def run_graph(
query: str,
llm: LLM,
tools: list[Tool],
) -> AnswerStream:
graph = main_graph_builder()
inputs = {
"original_query": query,
"messages": [],
"tools": tools,
"llm": llm,
}
compiled_graph = graph.compile()
output = compiled_graph.invoke(input=inputs)
yield from output
if __name__ == "__main__":
pass
# run_graph("What is the capital of France?", llm, [])

View File

@@ -0,0 +1,12 @@
from typing import Literal
from pydantic import BaseModel
# Pydantic models for structured outputs
class RewrittenQueries(BaseModel):
rewritten_queries: list[str]
class BinaryDecision(BaseModel):
decision: Literal["yes", "no"]

View File

@@ -0,0 +1,9 @@
from onyx.chat.prune_and_merge import _merge_sections
from onyx.context.search.models import InferenceSection
def dedup_inference_sections(
list1: list[InferenceSection], list2: list[InferenceSection]
) -> list[InferenceSection]:
deduped = _merge_sections(list1 + list2)
return deduped

View File

@@ -0,0 +1,443 @@
REWRITE_PROMPT_MULTI_ORIGINAL = """ \n
Please convert an initial user question into 2-3 more appropriate short and pointed search queries for retrieval from a
document store. In particular, try to resolve ambiguities and make the search queries more specific,
so that together they enable the system to search more broadly.
Also, try to make the search queries not redundant, i.e. not too similar! \n\n
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
Formulate the queries separated by '--' (Do not say 'Query 1: ...', just write the query text): """
REWRITE_PROMPT_MULTI = """ \n
Please create a list of 2-3 sample documents that could answer an original question. Each document
should be about as long as the original question. \n
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
Formulate the sample documents separated by '--' (Do not say 'Document 1: ...', just write the text): """
BASE_RAG_PROMPT = """ \n
You are an assistant for question-answering tasks. Use the context provided below - and only the
provided context - to answer the question. If you don't know the answer or if the provided context is
empty, just say "I don't know". Do not use your internal knowledge!
Again, only use the provided context and do not use your internal knowledge! If you cannot answer the
question based on the context, say "I don't know". It is a matter of life and death that you do NOT
use your internal knowledge, just the provided information!
Use three sentences maximum and keep the answer concise.
\nQuestion:\n {question} \nContext:\n {context} \n\n
\n\n
Answer:"""
SUB_CHECK_PROMPT = """ \n
Your task is to see whether a given answer addresses a given question.
Please do not use any internal knowledge you may have - just focus on whether the answer
as given seems to address the question as given.
Here is the question:
\n ------- \n
{question}
\n ------- \n
Here is the suggested answer:
\n ------- \n
{base_answer}
\n ------- \n
Please answer with yes or no:"""
BASE_CHECK_PROMPT = """ \n
Please check whether 1) the suggested answer seems to fully address the original question AND 2) the
original question requests a simple, factual answer, and there are no ambiguities, judgements,
aggregations, or any other complications that may require extra context. (I.e., if the question is
somewhat addressed, but the answer would benefit from more context, then answer with 'no'.)
Please only answer with 'yes' or 'no' \n
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
Here is the proposed answer:
\n ------- \n
{base_answer}
\n ------- \n
Please answer with yes or no:"""
VERIFIER_PROMPT = """ \n
Please check whether the document provided below seems to be relevant
for answering the provided question. Please
only answer with 'yes' or 'no' \n
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
Here is the document text:
\n ------- \n
{document_content}
\n ------- \n
Please answer with yes or no:"""
INITIAL_DECOMPOSITION_PROMPT_BASIC = """ \n
Please decompose an initial user question into not more than 4 appropriate sub-questions that help to
answer the original question. The purpose for this decomposition is to isolate individual entities
(i.e., 'compare sales of company A and company B' -> 'what are sales for company A' + 'what are sales
for company B'), split ambiguous terms (i.e., 'what is our success with company A' -> 'what are our
sales with company A' + 'what is our market share with company A' + 'is company A a reference customer
for us'), etc. Each sub-question should realistically be answerable by a good RAG system. \n
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
Please formulate your answer as a list of sub-questions:
Answer:
"""
REWRITE_PROMPT_SINGLE = """ \n
Please convert an initial user question into a more appropriate search query for retrieval from a
document store. \n
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
Formulate the query: """
MODIFIED_RAG_PROMPT = """You are an assistant for question-answering tasks. Use the context provided below
- and only this context - to answer the question. If you don't know the answer, just say "I don't know".
Use three sentences maximum and keep the answer concise.
Also pay particular attention to the sub-questions and their answers, as they may enrich the answer.
Again, only use the provided context and do not use your internal knowledge! If you cannot answer the
question based on the context, say "I don't know". It is a matter of life and death that you do NOT
use your internal knowledge, just the provided information!
\nQuestion: {question}
\nContext: {combined_context} \n
Answer:"""
ORIG_DEEP_DECOMPOSE_PROMPT = """ \n
An initial user question needs to be answered. An initial answer has been provided but it wasn't quite
good enough. Also, some sub-questions had been answered and this information has been used to provide
the initial answer. Some other sub-questions may have been suggested based on little knowledge, but they
were not directly answerable. Also, some entities, relationships and terms are given to you so that
you have an idea of what the available data looks like.
Your role is to generate 3-5 new sub-questions that would help to answer the initial question,
considering:
1) The initial question
2) The initial answer that was found to be unsatisfactory
3) The sub-questions that were answered
4) The sub-questions that were suggested but not answered
5) The entities, relationships and terms that were extracted from the context
The individual questions should be answerable by a good RAG system.
So a good idea would be to use the sub-questions to resolve ambiguities and/or to separate the
question for different entities that may be involved in the original question, but in a way that does
not duplicate questions that were already tried.
Additional Guidelines:
- The sub-questions should be specific to the question and provide richer context for the question,
resolve ambiguities, or address shortcomings of the initial answer
- Each sub-question - when answered - should be relevant for the answer to the original question
- The sub-questions should be free from comparisons, ambiguities, judgements, aggregations, or any
other complications that may require extra context.
- The sub-questions MUST have the full context of the original question so that they can be executed by
a RAG system independently without the original question available
(Example:
- initial question: "What is the capital of France?"
- bad sub-question: "What is the name of the river there?"
- good sub-question: "What is the name of the river that flows through Paris?"
- For each sub-question, please provide a short explanation for why it is a good sub-question. So
generate a list of dictionaries with the following format:
[{{"sub_question": <sub-question>, "explanation": <explanation>, "search_term": <rewrite the
sub-question using as a search phrase for the document store>}}, ...]
\n\n
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
Here is the initial sub-optimal answer:
\n ------- \n
{base_answer}
\n ------- \n
Here are the sub-questions that were answered:
\n ------- \n
{answered_sub_questions}
\n ------- \n
Here are the sub-questions that were suggested but not answered:
\n ------- \n
{failed_sub_questions}
\n ------- \n
And here are the entities, relationships and terms extracted from the context:
\n ------- \n
{entity_term_extraction_str}
\n ------- \n
Please generate the list of good, fully contextualized sub-questions that would help to address the
main question. Again, please find questions that are NOT overlapping too much with the already answered
sub-questions or those that already were suggested and failed.
In other words - what can we try in addition to what has been tried so far?
Please think through it step by step and then generate the list of json dictionaries with the following
format:
{{"sub_questions": [{{"sub_question": <sub-question>,
"explanation": <explanation>,
"search_term": <rewrite the sub-question using as a search phrase for the document store>}},
...]}} """
DEEP_DECOMPOSE_PROMPT = """ \n
An initial user question needs to be answered. An initial answer has been provided but it wasn't quite
good enough. Also, some sub-questions had been answered and this information has been used to provide
the initial answer. Some other sub-questions may have been suggested based on little knowledge, but they
were not directly answerable. Also, some entities, relationships and terms are given to you so that
you have an idea of what the available data looks like.
Your role is to generate 4-6 new sub-questions that would help to answer the initial question,
considering:
1) The initial question
2) The initial answer that was found to be unsatisfactory
3) The sub-questions that were answered
4) The sub-questions that were suggested but not answered
5) The entities, relationships and terms that were extracted from the context
The individual questions should be answerable by a good RAG system.
So a good idea would be to use the sub-questions to resolve ambiguities and/or to separate the
question for different entities that may be involved in the original question, but in a way that does
not duplicate questions that were already tried.
Additional Guidelines:
- The sub-questions should be specific to the question and provide richer context for the question,
resolve ambiguities, or address shortcomings of the initial answer
- Each sub-question - when answered - should be relevant for the answer to the original question
- The sub-questions should be free from comparisons, ambiguities, judgements, aggregations, or any
other complications that may require extra context.
- The sub-questions MUST have the full context of the original question so that they can be executed by
a RAG system independently without the original question available
(Example:
- initial question: "What is the capital of France?"
- bad sub-question: "What is the name of the river there?"
- good sub-question: "What is the name of the river that flows through Paris?"
- For each sub-question, please also provide a search term that can be used to retrieve relevant
documents from a document store.
\n\n
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
Here is the initial sub-optimal answer:
\n ------- \n
{base_answer}
\n ------- \n
Here are the sub-questions that were answered:
\n ------- \n
{answered_sub_questions}
\n ------- \n
Here are the sub-questions that were suggested but not answered:
\n ------- \n
{failed_sub_questions}
\n ------- \n
And here are the entities, relationships and terms extracted from the context:
\n ------- \n
{entity_term_extraction_str}
\n ------- \n
Please generate the list of good, fully contextualized sub-questions that would help to address the
main question. Again, please find questions that are NOT overlapping too much with the already answered
sub-questions or those that already were suggested and failed.
In other words - what can we try in addition to what has been tried so far?
Generate the list of json dictionaries with the following format:
{{"sub_questions": [{{"sub_question": <sub-question>,
"search_term": <rewrite the sub-question using as a search phrase for the document store>}},
...]}} """
DECOMPOSE_PROMPT = """ \n
For an initial user question, please generate 5-10 individual sub-questions whose answers would help
\n to answer the initial question. The individual questions should be answerable by a good RAG system.
So a good idea would be to \n use the sub-questions to resolve ambiguities and/or to separate the
question for different entities that may be involved in the original question.
In order to arrive at meaningful sub-questions, please also consider the context retrieved from the
document store, expressed as entities, relationships and terms. You can also think about the types
mentioned in brackets.
Guidelines:
- The sub-questions should be specific to the question and provide richer context for the question,
and/or resolve ambiguities
- Each sub-question - when answered - should be relevant for the answer to the original question
- The sub-questions should be free from comparisons, ambiguities, judgements, aggregations, or any
other complications that may require extra context.
- The sub-questions MUST have the full context of the original question so that they can be executed by
a RAG system independently without the original question available
(Example:
- initial question: "What is the capital of France?"
- bad sub-question: "What is the name of the river there?"
- good sub-question: "What is the name of the river that flows through Paris?"
- For each sub-question, please provide a short explanation for why it is a good sub-question. So
generate a list of dictionaries with the following format:
[{{"sub_question": <sub-question>, "explanation": <explanation>, "search_term": <rewrite the
sub-question using as a search phrase for the document store>}}, ...]
\n\n
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
And here are the entities, relationships and terms extracted from the context:
\n ------- \n
{entity_term_extraction_str}
\n ------- \n
Please generate the list of good, fully contextualized sub-questions that would help to address the
main question. Don't be too specific unless the original question is specific.
Please think through it step by step and then generate the list of json dictionaries with the following
format:
{{"sub_questions": [{{"sub_question": <sub-question>,
"explanation": <explanation>,
"search_term": <rewrite the sub-question using as a search phrase for the document store>}},
...]}} """
#### Consolidations
COMBINED_CONTEXT = """-------
Below you will find useful information to answer the original question. First, you see a number of
sub-questions with their answers. This information should be considered to be more focused and
somewhat more specific to the original question, as it tries to contextualize facts.
After that you will see the documents that were considered to be relevant to answer the original question.
Here are the sub-questions and their answers:
\n\n {deep_answer_context} \n\n
\n\n Here are the documents that were considered to be relevant to answer the original question:
\n\n {formated_docs} \n\n
----------------
"""
SUB_QUESTION_EXPLANATION_RANKER_PROMPT = """-------
Below you will find a question that we ultimately want to answer (the original question) and a list of
motivations in arbitrary order for generated sub-questions that are supposed to help us answer the
original question. The motivations are formatted as <motivation number>: <motivation explanation>.
(Again, the numbering is arbitrary and does not necessarily mean that 1 is the most relevant
motivation and 2 is less relevant.)
Please rank the motivations in order of relevance for answering the original question. Also, try to
ensure that the top questions do not duplicate too much, i.e. that they are not too similar.
Ultimately, create a list of the motivation numbers in which the number of the most relevant
motivation comes first.
Here is the original question:
\n\n {original_question} \n\n
\n\n Here is the list of sub-question motivations:
\n\n {sub_question_explanations} \n\n
----------------
Please think step by step and then generate the ranked list of motivations.
Please format your answer as a json object in the following format:
{{"reasonning": <explain your reasoning for the ranking>,
"ranked_motivations": <ranked list of motivation numbers>}}
"""
INITIAL_DECOMPOSITION_PROMPT = """ \n
Please decompose an initial user question into 2 or 3 appropriate sub-questions that help to
answer the original question. The purpose for this decomposition is to isolate individual entities
(i.e., 'compare sales of company A and company B' -> 'what are sales for company A' + 'what are sales
for company B'), split ambiguous terms (i.e., 'what is our success with company A' -> 'what are our
sales with company A' + 'what is our market share with company A' + 'is company A a reference customer
for us'), etc. Each sub-question should realistically be answerable by a good RAG system. \n
For each sub-question, please also create one search term that can be used to retrieve relevant
documents from a document store.
Here is the initial question:
\n ------- \n
{question}
\n ------- \n
Please formulate your answer as a list of json objects with the following format:
[{{"sub_question": <sub-question>, "search_term": <search term>}}, ...]
Answer:
"""
INITIAL_RAG_PROMPT = """ \n
You are an assistant for question-answering tasks. Use the information provided below - and only the
provided information - to answer the provided question.
The information provided below consists of:
1) a number of answered sub-questions - these are very important(!) and definitely should be
considered to answer the question.
2) a number of documents that were also deemed relevant for the question.
If you don't know the answer or if the provided information is empty or insufficient, just say
"I don't know". Do not use your internal knowledge!
Again, only use the provided information and do not use your internal knowledge! It is a matter of life
and death that you do NOT use your internal knowledge, just the provided information!
Try to keep your answer concise.
And here is the question and the provided information:
\n
\nQuestion:\n {question}
\nAnswered Sub-questions:\n {answered_sub_questions}
\nContext:\n {context} \n\n
\n\n
Answer:"""
ENTITY_TERM_PROMPT = """ \n
Based on the original question and the context retrieved from a dataset, please generate a list of
entities (e.g. companies, organizations, industries, products, locations, etc.), terms and concepts
(e.g. sales, revenue, etc.) that are relevant for the question, plus their relations to each other.
\n\n
Here is the original question:
\n ------- \n
{question}
\n ------- \n
And here is the context retrieved:
\n ------- \n
{context}
\n ------- \n
Please format your answer as a json object in the following format:
{{"retrieved_entities_relationships": {{
"entities": [{{
"entity_name": <assign a name for the entity>,
"entity_type": <specify a short type name for the entity, such as 'company', 'location',...>
}}],
"relationships": [{{
"name": <assign a name for the relationship>,
"type": <specify a short type name for the relationship, such as 'sales_to', 'is_location_of',...>,
"entities": [<related entity name 1>, <related entity name 2>]
}}],
"terms": [{{
"term_name": <assign a name for the term>,
"term_type": <specify a short type name for the term, such as 'revenue', 'market_share',...>,
"similar_to": <list terms that are similar to this term>
}}]
}}
}}
"""

View File

@@ -0,0 +1,101 @@
import ast
import json
import re
from collections.abc import Sequence
from datetime import datetime
from datetime import timedelta
from typing import Any
from onyx.context.search.models import InferenceSection
def normalize_whitespace(text: str) -> str:
"""Normalize whitespace in text to single spaces and strip leading/trailing whitespace."""
return re.sub(r"\s+", " ", text.strip())
# Post-processing
def format_docs(docs: Sequence[InferenceSection]) -> str:
return "\n\n".join(doc.combined_content for doc in docs)
def clean_and_parse_list_string(json_string: str) -> list[dict]:
# Remove any prefixes/labels before the actual JSON content
json_string = re.sub(r"^.*?(?=\[)", "", json_string, flags=re.DOTALL)
# Remove markdown code block markers and any newline prefixes
cleaned_string = re.sub(r"```json\n|\n```", "", json_string)
cleaned_string = cleaned_string.replace("\\n", " ").replace("\n", " ")
cleaned_string = " ".join(cleaned_string.split())
# Try parsing with json.loads first, fall back to ast.literal_eval
try:
return json.loads(cleaned_string)
except json.JSONDecodeError:
try:
return ast.literal_eval(cleaned_string)
except (ValueError, SyntaxError) as e:
raise ValueError(f"Failed to parse JSON string: {cleaned_string}") from e
def clean_and_parse_json_string(json_string: str) -> dict[str, Any]:
# Remove markdown code block markers and any newline prefixes
cleaned_string = re.sub(r"```json\n|\n```", "", json_string)
cleaned_string = cleaned_string.replace("\\n", " ").replace("\n", " ")
cleaned_string = " ".join(cleaned_string.split())
# Parse the cleaned string into a Python dictionary
return json.loads(cleaned_string)
def format_entity_term_extraction(entity_term_extraction_dict: dict[str, Any]) -> str:
entities = entity_term_extraction_dict["entities"]
terms = entity_term_extraction_dict["terms"]
relationships = entity_term_extraction_dict["relationships"]
entity_strs = ["\nEntities:\n"]
for entity in entities:
entity_str = f"{entity['entity_name']} ({entity['entity_type']})"
entity_strs.append(entity_str)
entity_str = "\n - ".join(entity_strs)
relationship_strs = ["\n\nRelationships:\n"]
for relationship in relationships:
relationship_str = f"{relationship['name']} ({relationship['type']}): {relationship['entities']}"
relationship_strs.append(relationship_str)
relationship_str = "\n - ".join(relationship_strs)
term_strs = ["\n\nTerms:\n"]
for term in terms:
term_str = f"{term['term_name']} ({term['term_type']}): similar to {term['similar_to']}"
term_strs.append(term_str)
term_str = "\n - ".join(term_strs)
return "\n".join(entity_strs + relationship_strs + term_strs)
def _format_time_delta(time: timedelta) -> str:
seconds_from_start = f"{((time).seconds):03d}"
microseconds_from_start = f"{((time).microseconds):06d}"
return f"{seconds_from_start}.{microseconds_from_start}"
def generate_log_message(
message: str,
node_start_time: datetime,
graph_start_time: datetime | None = None,
) -> str:
current_time = datetime.now()
if graph_start_time is not None:
graph_time_str = _format_time_delta(current_time - graph_start_time)
else:
graph_time_str = "N/A"
node_time_str = _format_time_delta(current_time - node_start_time)
return f"{graph_time_str} ({node_time_str} s): {message}"

View File

@@ -25,6 +25,11 @@ class ToolCallSummary(BaseModel__v1):
tool_call_request: AIMessage
tool_call_result: ToolMessage
# This is a workaround to allow arbitrary types in the model
# TODO: Remove this once we have a better solution
class Config:
arbitrary_types_allowed = True
def tool_call_tokens(
tool_call_summary: ToolCallSummary, llm_tokenizer: BaseTokenizer

View File

@@ -26,10 +26,15 @@ huggingface-hub==0.20.1
jira==3.5.1
jsonref==1.1.0
trafilatura==1.12.2
langchain==0.1.17
langchain-core==0.1.50
langchain-text-splitters==0.0.1
litellm==1.54.1
langchain==0.3.7
langchain-core==0.3.24
langchain-openai==0.2.9
langchain-text-splitters==0.3.2
langchainhub==0.1.21
langgraph==0.2.59
langgraph-checkpoint==2.0.5
langgraph-sdk==0.1.44
litellm==1.53.1
lxml==5.3.0
lxml_html_clean==0.2.2
llama-index==0.9.45