Compare commits

...

5 Commits

Author SHA1 Message Date
Yuhong Sun
0fdec08e60 k 2025-11-06 18:10:34 -08:00
Yuhong Sun
ab5d00b53a Reintroduce Doc Search 2025-11-06 17:26:02 -08:00
Yuhong Sun
27612a61b1 mypy 2025-11-06 16:03:17 -08:00
Yuhong Sun
5d6fabd077 checkpoint 2025-11-06 15:58:20 -08:00
Yuhong Sun
1eef4903c0 not-tested 2025-11-06 14:52:35 -08:00
16 changed files with 425 additions and 307 deletions

1
.gitignore vendored
View File

@@ -44,3 +44,4 @@ CLAUDE.md
# Local .terraform.lock.hcl file
.terraform.lock.hcl
node_modules

View File

@@ -10,9 +10,10 @@ from onyx.chat.models import PersonaOverrideConfig
from onyx.chat.models import QADocsResponse
from onyx.chat.models import ThreadMessage
from onyx.configs.constants import DocumentSource
from onyx.context.search.enums import LLMEvaluationType
from onyx.context.search.enums import SearchType
from onyx.context.search.models import BaseFilters
from onyx.context.search.models import BasicChunkRequest
from onyx.context.search.models import ChunkContext
from onyx.context.search.models import InferenceChunk
from onyx.context.search.models import RerankingDetails
from onyx.context.search.models import RetrievalDetails
from onyx.server.manage.models import StandardAnswer
@@ -29,14 +30,12 @@ class StandardAnswerResponse(BaseModel):
standard_answers: list[StandardAnswer] = Field(default_factory=list)
class DocumentSearchRequest(ChunkContext):
message: str
search_type: SearchType
retrieval_options: RetrievalDetails
recency_bias_multiplier: float = 1.0
evaluation_type: LLMEvaluationType
# None to use system defaults for reranking
rerank_settings: RerankingDetails | None = None
class DocumentSearchRequest(BasicChunkRequest):
user_selected_filters: BaseFilters | None = None
class DocumentSearchResponse(BaseModel):
top_documents: list[InferenceChunk]
class BasicCreateChatMessageRequest(ChunkContext):

View File

@@ -25,16 +25,15 @@ from onyx.chat.models import QADocsResponse
from onyx.chat.process_message import gather_stream
from onyx.chat.process_message import stream_chat_message_objects
from onyx.configs.onyxbot_configs import MAX_THREAD_CONTEXT_PERCENTAGE
from onyx.context.search.models import SavedSearchDocWithContent
from onyx.context.search.models import SearchRequest
from onyx.context.search.pipeline import SearchPipeline
from onyx.context.search.utils import dedupe_documents
from onyx.context.search.utils import drop_llm_indices
from onyx.context.search.utils import relevant_sections_to_indices
from onyx.context.search.models import ChunkSearchRequest
from onyx.context.search.models import InferenceChunk
from onyx.context.search.pipeline import search_pipeline
from onyx.db.engine.sql_engine import get_session
from onyx.db.models import Persona
from onyx.db.models import User
from onyx.db.persona import get_persona_by_id
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.factory import get_default_document_index
from onyx.llm.factory import get_default_llms
from onyx.llm.factory import get_llms_for_persona
from onyx.llm.factory import get_main_llm_from_tuple
@@ -49,8 +48,22 @@ basic_router = APIRouter(prefix="/query")
class DocumentSearchResponse(BaseModel):
top_documents: list[SavedSearchDocWithContent]
llm_indices: list[int]
top_chunks: list[InferenceChunk]
def _translate_search_request(
    search_request: DocumentSearchRequest,
) -> ChunkSearchRequest:
    """Map the public document-search API request onto the internal
    chunk-search request.

    `bypass_acl` is intentionally left at its default (False): this endpoint
    must never skip access control.
    """
    req = search_request
    return ChunkSearchRequest(
        query=req.query,
        hybrid_alpha=req.hybrid_alpha,
        recency_bias_multiplier=req.recency_bias_multiplier,
        query_keywords=req.query_keywords,
        limit=req.limit,
        offset=req.offset,
        user_selected_filters=req.user_selected_filters,
    )
@basic_router.post("/document-search")
@@ -60,81 +73,28 @@ def handle_search_request(
db_session: Session = Depends(get_session),
) -> DocumentSearchResponse:
"""Simple search endpoint, does not create a new message or records in the DB"""
query = search_request.message
query = search_request.query
logger.notice(f"Received document search query: {query}")
llm, fast_llm = get_default_llms()
llm, _ = get_default_llms()
search_pipeline = SearchPipeline(
search_request=SearchRequest(
query=query,
search_type=search_request.search_type,
human_selected_filters=search_request.retrieval_options.filters,
enable_auto_detect_filters=search_request.retrieval_options.enable_auto_detect_filters,
persona=None, # For simplicity, default settings should be good for this search
offset=search_request.retrieval_options.offset,
limit=search_request.retrieval_options.limit,
rerank_settings=search_request.rerank_settings,
evaluation_type=search_request.evaluation_type,
chunks_above=search_request.chunks_above,
chunks_below=search_request.chunks_below,
full_doc=search_request.full_doc,
),
search_settings = get_current_search_settings(db_session)
document_index = get_default_document_index(
search_settings=search_settings,
secondary_search_settings=None,
)
retrieved_chunks = search_pipeline(
chunk_search_request=_translate_search_request(search_request),
document_index=document_index,
user=user,
llm=llm,
fast_llm=fast_llm,
skip_query_analysis=False,
persona=None,
db_session=db_session,
bypass_acl=False,
)
top_sections = search_pipeline.reranked_sections
relevance_sections = search_pipeline.section_relevance
top_docs = [
SavedSearchDocWithContent(
document_id=section.center_chunk.document_id,
chunk_ind=section.center_chunk.chunk_id,
content=section.center_chunk.content,
semantic_identifier=section.center_chunk.semantic_identifier or "Unknown",
link=(
section.center_chunk.source_links.get(0)
if section.center_chunk.source_links
else None
),
blurb=section.center_chunk.blurb,
source_type=section.center_chunk.source_type,
boost=section.center_chunk.boost,
hidden=section.center_chunk.hidden,
metadata=section.center_chunk.metadata,
score=section.center_chunk.score or 0.0,
match_highlights=section.center_chunk.match_highlights,
updated_at=section.center_chunk.updated_at,
primary_owners=section.center_chunk.primary_owners,
secondary_owners=section.center_chunk.secondary_owners,
is_internet=False,
db_doc_id=0,
)
for section in top_sections
]
# Deduping happens at the last step to avoid harming quality by dropping content early on
deduped_docs = top_docs
dropped_inds = None
if search_request.retrieval_options.dedupe_docs:
deduped_docs, dropped_inds = dedupe_documents(top_docs)
llm_indices = relevant_sections_to_indices(
relevance_sections=relevance_sections, items=deduped_docs
auto_detect_filters=False,
llm=llm,
)
if dropped_inds:
llm_indices = drop_llm_indices(
llm_indices=llm_indices,
search_docs=deduped_docs,
dropped_indices=dropped_inds,
)
return DocumentSearchResponse(top_documents=deduped_docs, llm_indices=llm_indices)
return DocumentSearchResponse(top_chunks=retrieved_chunks)
def get_answer_stream(

View File

@@ -15,8 +15,8 @@ from onyx.configs.model_configs import DOC_EMBEDDING_CONTEXT_SIZE
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import TextSection
from onyx.context.search.federated.models import SlackMessage
from onyx.context.search.models import ChunkIndexRequest
from onyx.context.search.models import InferenceChunk
from onyx.context.search.models import SearchQuery
from onyx.db.document import DocumentSource
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.document_index_utils import (
@@ -77,7 +77,7 @@ def _should_skip_channel(
return False
def build_slack_queries(query: SearchQuery, llm: LLM) -> list[str]:
def build_slack_queries(query: ChunkIndexRequest, llm: LLM) -> list[str]:
# get time filter
time_filter = ""
time_cutoff = query.filters.time_cutoff
@@ -86,8 +86,8 @@ def build_slack_queries(query: SearchQuery, llm: LLM) -> list[str]:
time_cutoff = time_cutoff - timedelta(days=1)
time_filter = f" after:{time_cutoff.strftime('%Y-%m-%d')}"
# use llm to generate slack queries (use original query to use same keywords as the user)
prompt = SLACK_QUERY_EXPANSION_PROMPT.format(query=query.original_query)
# use llm to generate slack queries
prompt = SLACK_QUERY_EXPANSION_PROMPT.format(query=query)
try:
msg = HumanMessage(content=prompt)
response = llm.invoke([msg])
@@ -123,7 +123,7 @@ def _is_public_channel(channel_info: dict[str, Any]) -> bool:
def query_slack(
query_string: str,
original_query: SearchQuery,
original_query: ChunkIndexRequest,
access_token: str,
limit: int | None = None,
allowed_private_channel: str | None = None,
@@ -376,7 +376,7 @@ def convert_slack_score(slack_score: float) -> float:
@log_function_time(print_only=True)
def slack_retrieval(
query: SearchQuery,
query: ChunkIndexRequest,
access_token: str,
db_session: Session,
limit: int | None = None,

View File

@@ -152,6 +152,39 @@ class ChunkContext(BaseModel):
return value
class BasicChunkRequest(BaseModel):
    """Core parameters shared by every chunk-level search request."""

    query: str
    # In case the caller wants to override the weighting between semantic and keyword search.
    hybrid_alpha: float | None = None
    # In case some queries favor recency more than other queries.
    recency_bias_multiplier: float = 1.0
    # Sometimes we may want to extract specific keywords from a more semantic query for
    # a better keyword search.
    query_keywords: list[str] | None = None
    # TODO: Currently this is never set
    limit: int | None = None
    offset: int | None = None


class ChunkSearchRequest(BasicChunkRequest):
    """Search request carrying user-facing filter selections; the final
    index-level filters are derived from these later in the pipeline."""

    # Final filters are calculated from these
    user_selected_filters: BaseFilters | None = None
    # Use with caution!
    bypass_acl: bool = False


# From the Chat Session we know what project (if any) this search should include
# From the user uploads and persona uploaded files, we know which of those to include
class ChunkIndexRequest(BasicChunkRequest):
    """Fully-resolved request sent to the document index: `filters` here are
    the final, already-computed IndexFilters (ACLs included)."""

    # Calculated final filters
    filters: IndexFilters
class SearchRequest(ChunkContext):
query: str

View File

@@ -1,7 +1,9 @@
from collections import defaultdict
from collections.abc import Callable
from collections.abc import Iterator
from datetime import datetime
from typing import cast
from uuid import UUID
from sqlalchemy.orm import Session
@@ -16,6 +18,9 @@ from onyx.configs.chat_configs import DISABLE_LLM_DOC_RELEVANCE
from onyx.context.search.enums import LLMEvaluationType
from onyx.context.search.enums import QueryFlow
from onyx.context.search.enums import SearchType
from onyx.context.search.models import BaseFilters
from onyx.context.search.models import ChunkIndexRequest
from onyx.context.search.models import ChunkSearchRequest
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import InferenceChunk
from onyx.context.search.models import InferenceSection
@@ -23,30 +28,181 @@ from onyx.context.search.models import RerankMetricsContainer
from onyx.context.search.models import RetrievalMetricsContainer
from onyx.context.search.models import SearchQuery
from onyx.context.search.models import SearchRequest
from onyx.context.search.postprocessing.postprocessing import cleanup_chunks
from onyx.context.search.postprocessing.postprocessing import search_postprocessing
from onyx.context.search.preprocessing.preprocessing import retrieval_preprocessing
from onyx.context.search.retrieval.search_runner import (
retrieve_chunks,
from onyx.context.search.preprocessing.access_filters import (
build_access_filters_for_user,
)
from onyx.context.search.preprocessing.preprocessing import retrieval_preprocessing
from onyx.context.search.retrieval.search_runner import retrieve_chunks
from onyx.context.search.retrieval.search_runner import search_chunks
from onyx.context.search.utils import inference_section_from_chunks
from onyx.context.search.utils import relevant_sections_to_indices
from onyx.db.models import Persona
from onyx.db.models import User
from onyx.db.search_settings import get_current_search_settings
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.interfaces import VespaChunkRequest
from onyx.llm.interfaces import LLM
from onyx.onyxbot.slack.models import SlackContext
from onyx.secondary_llm_flows.agentic_evaluation import evaluate_inference_section
from onyx.secondary_llm_flows.source_filter import extract_source_filter
from onyx.secondary_llm_flows.time_filter import extract_time_filter
from onyx.utils.logger import setup_logger
from onyx.utils.threadpool_concurrency import FunctionCall
from onyx.utils.threadpool_concurrency import run_functions_in_parallel
from onyx.utils.timing import log_function_time
from onyx.utils.variable_functionality import fetch_ee_implementation_or_noop
from shared_configs.configs import MULTI_TENANT
from shared_configs.contextvars import get_current_tenant_id
logger = setup_logger()
@log_function_time(print_only=True)
def _build_index_filters(
    user_provided_filters: BaseFilters | None,
    user: User | None,  # Used for ACLs
    project_id: int | None,
    user_file_ids: list[UUID] | None,
    persona_document_sets: list[str] | None,
    persona_time_cutoff: datetime | None,
    db_session: Session,
    auto_detect_filters: bool = False,
    query: str | None = None,
    llm: LLM | None = None,
    bypass_acl: bool = False,
) -> IndexFilters:
    """Combine user-selected, persona-derived, and (optionally) LLM-detected
    filters into the final IndexFilters used for retrieval.

    Precedence: user-selected values win over persona defaults; auto-detected
    values only fill in or narrow what the user did not pin down.

    Raises:
        RuntimeError: if auto_detect_filters is requested without both an LLM
            and a query to run detection against.
    """
    if auto_detect_filters and (llm is None or query is None):
        raise RuntimeError("LLM and query are required for auto detect filters")

    base_filters = user_provided_filters or BaseFilters()

    # User-selected document sets win; otherwise fall back to the persona's.
    # Fix: previously the final filters always used persona_document_sets,
    # silently discarding a user-provided document_set (and the merge into
    # base_filters both mutated the caller's object and was dead code).
    document_set_filter = (
        base_filters.document_set
        if base_filters.document_set is not None
        else persona_document_sets
    )

    time_filter = base_filters.time_cutoff or persona_time_cutoff
    source_filter = base_filters.source_type

    detected_time_filter = None
    detected_source_filter = None
    if auto_detect_filters:
        time_filter_fnc = FunctionCall(extract_time_filter, (query, llm), {})
        # Only spend an LLM call on source detection if the user didn't pick one
        if not source_filter:
            source_filter_fnc = FunctionCall(
                extract_source_filter, (query, llm, db_session), {}
            )
        else:
            source_filter_fnc = None

        functions_to_run = [fn for fn in [time_filter_fnc, source_filter_fnc] if fn]
        parallel_results = run_functions_in_parallel(functions_to_run)

        # Detected favor recent is not used for now
        detected_time_filter, _detected_favor_recent = parallel_results[
            time_filter_fnc.result_id
        ]
        if source_filter_fnc:
            detected_source_filter = parallel_results[source_filter_fnc.result_id]

    # Use the detected time filter when there is no explicit one, or when the
    # detected one is more recent (i.e. narrower) than the explicit one.
    # Fix: previously a detected time filter was dropped whenever no explicit
    # time filter existed, making detection a no-op in the common case.
    # NOTE(review): assumes both datetimes share tz-awareness — confirm upstream.
    if detected_time_filter and (
        time_filter is None or detected_time_filter > time_filter
    ):
        time_filter = detected_time_filter

    # If the user has explicitly set a source filter, use that one
    if not source_filter and detected_source_filter:
        source_filter = detected_source_filter

    user_acl_filters = (
        None if bypass_acl else build_access_filters_for_user(user, db_session)
    )

    return IndexFilters(
        user_file_ids=user_file_ids,
        project_id=project_id,
        source_type=source_filter,
        document_set=document_set_filter,
        time_cutoff=time_filter,
        tags=base_filters.tags,
        access_control_list=user_acl_filters,
        tenant_id=get_current_tenant_id() if MULTI_TENANT else None,
        kg_entities=base_filters.kg_entities,
        kg_relationships=base_filters.kg_relationships,
        kg_terms=base_filters.kg_terms,
        kg_sources=base_filters.kg_sources,
        kg_chunk_id_zero_only=base_filters.kg_chunk_id_zero_only,
    )
def search_pipeline(
    # Query and settings
    chunk_search_request: ChunkSearchRequest,
    # Document index to search over
    # Note that federated sources will also be used (not related to this arg)
    document_index: DocumentIndex,
    # Used for ACLs and federated search
    user: User | None,
    # Used for default filters and settings
    persona: Persona | None,
    db_session: Session,
    auto_detect_filters: bool = False,
    llm: LLM | None = None,
    # Needed for federated Slack search
    slack_context: SlackContext | None = None,
    # If a project ID is provided, it will be exclusively scoped to that project
    project_id: int | None = None,
) -> list[InferenceChunk]:
    """Run a chunk search end to end: resolve the final index filters
    (user + persona + optional auto-detection), then query the document index
    and any federated sources in parallel.

    Returns the retrieved chunks; no reranking/postprocessing happens here.
    """
    # Persona-derived defaults; all None when no persona is supplied
    user_uploaded_persona_files: list[UUID] | None = (
        [user_file.id for user_file in persona.user_files] if persona else None
    )
    persona_document_sets: list[str] | None = (
        [persona_document_set.name for persona_document_set in persona.document_sets]
        if persona
        else None
    )
    persona_time_cutoff: datetime | None = (
        persona.search_start_date if persona else None
    )

    filters = _build_index_filters(
        user_provided_filters=chunk_search_request.user_selected_filters,
        user=user,
        project_id=project_id,
        user_file_ids=user_uploaded_persona_files,
        persona_document_sets=persona_document_sets,
        persona_time_cutoff=persona_time_cutoff,
        db_session=db_session,
        auto_detect_filters=auto_detect_filters,
        query=chunk_search_request.query,
        llm=llm,
        bypass_acl=chunk_search_request.bypass_acl,
    )

    query_request = ChunkIndexRequest(
        query=chunk_search_request.query,
        hybrid_alpha=chunk_search_request.hybrid_alpha,
        recency_bias_multiplier=chunk_search_request.recency_bias_multiplier,
        query_keywords=chunk_search_request.query_keywords,
        # Fix: propagate pagination — these were previously dropped, so a
        # caller-supplied limit/offset silently had no effect.
        limit=chunk_search_request.limit,
        offset=chunk_search_request.offset,
        filters=filters,
    )

    return search_chunks(
        query_request=query_request,
        # Needed for federated Slack search
        user_id=user.id if user else None,
        document_index=document_index,
        db_session=db_session,
        slack_context=slack_context,
    )
class SearchPipeline:
def __init__(
self,
@@ -223,11 +379,9 @@ class SearchPipeline:
)
inference_chunks.extend(
cleanup_chunks(
self.document_index.id_based_retrieval(
chunk_requests=chunk_requests,
filters=IndexFilters(access_control_list=None),
)
self.document_index.id_based_retrieval(
chunk_requests=chunk_requests,
filters=IndexFilters(access_control_list=None),
)
)
@@ -297,12 +451,10 @@ class SearchPipeline:
if chunk_requests:
inference_chunks.extend(
cleanup_chunks(
self.document_index.id_based_retrieval(
chunk_requests=chunk_requests,
filters=IndexFilters(access_control_list=None),
batch_retrieval=True,
)
self.document_index.id_based_retrieval(
chunk_requests=chunk_requests,
filters=IndexFilters(access_control_list=None),
batch_retrieval=True,
)
)

View File

@@ -9,17 +9,14 @@ from langchain_core.messages import HumanMessage
from langchain_core.messages import SystemMessage
from onyx.chat.models import SectionRelevancePiece
from onyx.configs.app_configs import BLURB_SIZE
from onyx.configs.app_configs import IMAGE_ANALYSIS_SYSTEM_PROMPT
from onyx.configs.chat_configs import DISABLE_LLM_DOC_RELEVANCE
from onyx.configs.constants import RETURN_SEPARATOR
from onyx.configs.llm_configs import get_search_time_image_analysis_enabled
from onyx.configs.model_configs import CROSS_ENCODER_RANGE_MAX
from onyx.configs.model_configs import CROSS_ENCODER_RANGE_MIN
from onyx.context.search.enums import LLMEvaluationType
from onyx.context.search.models import ChunkMetric
from onyx.context.search.models import InferenceChunk
from onyx.context.search.models import InferenceChunkUncleaned
from onyx.context.search.models import InferenceSection
from onyx.context.search.models import MAX_METRICS_CONTENT
from onyx.context.search.models import RerankingDetails
@@ -170,51 +167,6 @@ def _log_top_section_links(search_flow: str, sections: list[InferenceSection]) -
logger.debug(f"Top links from {search_flow} search: {', '.join(top_links)}")
def cleanup_chunks(chunks: list[InferenceChunkUncleaned]) -> list[InferenceChunk]:
    """Strip indexing-time artifacts (prepended title, metadata suffix,
    contextual-RAG summary/context) from each chunk's content, then convert
    to InferenceChunk.

    NOTE: mutates ``chunk.content`` on the input objects in place.
    """

    def _remove_title(chunk: InferenceChunkUncleaned) -> str:
        # The title (or its blurb-sized prefix) may have been prepended to the
        # content at indexing time; return the content with it stripped.
        if not chunk.title or not chunk.content:
            return chunk.content

        if chunk.content.startswith(chunk.title):
            return chunk.content[len(chunk.title) :].lstrip()

        # BLURB SIZE is by token instead of char but each token is at least 1 char
        # If this prefix matches the content, it's assumed the title was prepended
        if chunk.content.startswith(chunk.title[:BLURB_SIZE]):
            return (
                chunk.content.split(RETURN_SEPARATOR, 1)[-1]
                if RETURN_SEPARATOR in chunk.content
                else chunk.content
            )

        return chunk.content

    def _remove_metadata_suffix(chunk: InferenceChunkUncleaned) -> str:
        # Metadata appended for embedding quality is not wanted in final content
        if not chunk.metadata_suffix:
            return chunk.content
        return chunk.content.removesuffix(chunk.metadata_suffix).rstrip(
            RETURN_SEPARATOR
        )

    def _remove_contextual_rag(chunk: InferenceChunkUncleaned) -> str:
        # remove document summary
        if chunk.content.startswith(chunk.doc_summary):
            chunk.content = chunk.content[len(chunk.doc_summary) :].lstrip()
        # remove chunk context
        if chunk.content.endswith(chunk.chunk_context):
            chunk.content = chunk.content[
                : len(chunk.content) - len(chunk.chunk_context)
            ].rstrip()
        return chunk.content

    # Order matters: the title must be stripped against the original content
    # before the metadata suffix and contextual-RAG text are removed.
    for chunk in chunks:
        chunk.content = _remove_title(chunk)
        chunk.content = _remove_metadata_suffix(chunk)
        chunk.content = _remove_contextual_rag(chunk)

    return [chunk.to_inference_chunk() for chunk in chunks]
@log_function_time(print_only=True)
def semantic_reranking(
query_str: str,

View File

@@ -5,16 +5,16 @@ from uuid import UUID
from sqlalchemy.orm import Session
from onyx.agents.agent_search.shared_graph_utils.models import QueryExpansionType
from onyx.configs.chat_configs import NUM_RETURNED_HITS
from onyx.context.search.enums import SearchType
from onyx.context.search.models import ChunkIndexRequest
from onyx.context.search.models import ChunkMetric
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import InferenceChunk
from onyx.context.search.models import InferenceChunkUncleaned
from onyx.context.search.models import InferenceSection
from onyx.context.search.models import MAX_METRICS_CONTENT
from onyx.context.search.models import RetrievalMetricsContainer
from onyx.context.search.models import SearchQuery
from onyx.context.search.postprocessing.postprocessing import cleanup_chunks
from onyx.context.search.preprocessing.preprocessing import HYBRID_ALPHA
from onyx.context.search.preprocessing.preprocessing import HYBRID_ALPHA_KEYWORD
from onyx.context.search.utils import get_query_embedding
@@ -43,9 +43,9 @@ logger = setup_logger()
def _dedupe_chunks(
chunks: list[InferenceChunkUncleaned],
) -> list[InferenceChunkUncleaned]:
used_chunks: dict[tuple[str, int], InferenceChunkUncleaned] = {}
chunks: list[InferenceChunk],
) -> list[InferenceChunk]:
used_chunks: dict[tuple[str, int], InferenceChunk] = {}
for chunk in chunks:
key = (chunk.document_id, chunk.chunk_id)
if key not in used_chunks:
@@ -137,17 +137,15 @@ def doc_index_retrieval(
keyword_embeddings_thread: TimeoutThread[list[Embedding]] | None = None
semantic_embeddings_thread: TimeoutThread[list[Embedding]] | None = None
top_base_chunks_standard_ranking_thread: (
TimeoutThread[list[InferenceChunkUncleaned]] | None
TimeoutThread[list[InferenceChunk]] | None
) = None
top_semantic_chunks_thread: TimeoutThread[list[InferenceChunkUncleaned]] | None = (
None
)
top_semantic_chunks_thread: TimeoutThread[list[InferenceChunk]] | None = None
keyword_embeddings: list[Embedding] | None = None
semantic_embeddings: list[Embedding] | None = None
top_semantic_chunks: list[InferenceChunkUncleaned] | None = None
top_semantic_chunks: list[InferenceChunk] | None = None
# original retrieval method
top_base_chunks_standard_ranking_thread = run_in_background(
@@ -251,7 +249,7 @@ def doc_index_retrieval(
logger.info(f"Overall number of top initial retrieval chunks: {len(top_chunks)}")
retrieval_requests: list[VespaChunkRequest] = []
normal_chunks: list[InferenceChunkUncleaned] = []
normal_chunks: list[InferenceChunk] = []
referenced_chunk_scores: dict[tuple[str, int], float] = {}
for chunk in top_chunks:
if chunk.large_chunk_reference_ids:
@@ -274,7 +272,7 @@ def doc_index_retrieval(
# If there are no large chunks, just return the normal chunks
if not retrieval_requests:
return cleanup_chunks(normal_chunks)
return normal_chunks
# Retrieve and return the referenced normal chunks from the large chunks
retrieved_inference_chunks = document_index.id_based_retrieval(
@@ -298,7 +296,7 @@ def doc_index_retrieval(
for reference in referenced_chunk_scores.keys():
logger.error(f"Chunk {reference} not found in retrieved chunks")
unique_chunks: dict[tuple[str, int], InferenceChunkUncleaned] = {
unique_chunks: dict[tuple[str, int], InferenceChunk] = {
(chunk.document_id, chunk.chunk_id): chunk for chunk in normal_chunks
}
@@ -314,7 +312,7 @@ def doc_index_retrieval(
# Deduplicate the chunks
deduped_chunks = list(unique_chunks.values())
deduped_chunks.sort(key=lambda chunk: chunk.score or 0, reverse=True)
return cleanup_chunks(deduped_chunks)
return deduped_chunks
def _simplify_text(text: str) -> str:
@@ -323,6 +321,7 @@ def _simplify_text(text: str) -> str:
).lower()
# TODO delete this
def retrieve_chunks(
query: SearchQuery,
user_id: UUID | None,
@@ -346,7 +345,7 @@ def retrieve_chunks(
federated_retrieval_infos = get_federated_retrieval_functions(
db_session,
user_id,
query.filters.source_type,
set(query.filters.source_type) if query.filters.source_type else None,
query.filters.document_set,
slack_context,
)
@@ -426,6 +425,84 @@ def retrieve_chunks(
return top_chunks
def _embed_and_search(
    query_request: ChunkIndexRequest,
    document_index: DocumentIndex,
    db_session: Session,
) -> list[InferenceChunk]:
    """Embed the query, then run hybrid (semantic + keyword) retrieval
    against the document index using the request's final filters."""
    embedding = get_query_embedding(query_request.query, db_session)

    # Fall back to system defaults for any unset (or falsy) tuning knobs
    alpha = query_request.hybrid_alpha or HYBRID_ALPHA
    num_hits = query_request.limit or NUM_RETURNED_HITS
    start_offset = query_request.offset or 0

    return document_index.hybrid_retrieval(
        query=query_request.query,
        query_embedding=embedding,
        final_keywords=query_request.query_keywords,
        filters=query_request.filters,
        hybrid_alpha=alpha,
        time_decay_multiplier=query_request.recency_bias_multiplier,
        num_to_retrieve=num_hits,
        # Hardcoded to this for now
        ranking_profile_type=QueryExpansionType.SEMANTIC,
        offset=start_offset,
    )
def search_chunks(
    query_request: ChunkIndexRequest,
    user_id: UUID | None,
    document_index: DocumentIndex,
    db_session: Session,
    slack_context: SlackContext | None = None,
) -> list[InferenceChunk]:
    """Fan out retrieval across the document index and any applicable
    federated sources in parallel, then combine the results.

    The normal (index) hybrid search is skipped when every requested source
    type is covered by a federated connector.
    """
    run_queries: list[tuple[Callable, tuple]] = []

    source_filters = (
        set(query_request.filters.source_type)
        if query_request.filters.source_type
        else None
    )

    # Federated retrieval
    federated_retrieval_infos = get_federated_retrieval_functions(
        db_session=db_session,
        user_id=user_id,
        source_types=source_filters,
        document_set_names=query_request.filters.document_set,
        slack_context=slack_context,
    )
    federated_sources = set(
        federated_retrieval_info.source.to_non_federated_source()
        for federated_retrieval_info in federated_retrieval_infos
    )
    for federated_retrieval_info in federated_retrieval_infos:
        run_queries.append(
            (federated_retrieval_info.retrieval_function, (query_request,))
        )

    # Don't run normal hybrid search if there are no indexed sources to
    # search over. (source_filters is already a set — no need to re-copy it.)
    normal_search_enabled = (source_filters is None) or (
        len(source_filters - federated_sources) > 0
    )
    if normal_search_enabled:
        run_queries.append(
            (_embed_and_search, (query_request, document_index, db_session))
        )

    parallel_search_results = run_functions_tuples_in_parallel(run_queries)
    top_chunks = combine_retrieval_results(parallel_search_results)

    if not top_chunks:
        # Fix: the two f-string fragments previously concatenated without a
        # space, producing "...{query}with filters..." in the logs.
        logger.debug(
            f"Hybrid search returned no results for query: {query_request.query} "
            f"with filters: {query_request.filters}"
        )
        return []

    return top_chunks
def inference_sections_from_ids(
doc_identifiers: list[tuple[str, int]],
document_index: DocumentIndex,
@@ -445,13 +522,12 @@ def inference_sections_from_ids(
filters=filters,
)
cleaned_chunks = cleanup_chunks(retrieved_chunks)
if not cleaned_chunks:
if not retrieved_chunks:
return []
# Group chunks by document ID
chunks_by_doc_id: dict[str, list[InferenceChunk]] = {}
for chunk in cleaned_chunks:
for chunk in retrieved_chunks:
chunks_by_doc_id.setdefault(chunk.document_id, []).append(chunk)
inference_sections = [

View File

@@ -8,7 +8,7 @@ from onyx.access.models import ExternalAccess
from onyx.agents.agent_search.shared_graph_utils.models import QueryExpansionType
from onyx.configs.chat_configs import TITLE_CONTENT_RATIO
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import InferenceChunkUncleaned
from onyx.context.search.models import InferenceChunk
from onyx.db.enums import EmbeddingPrecision
from onyx.indexing.models import DocMetadataAwareIndexChunk
from shared_configs.model_server_models import Embedding
@@ -324,7 +324,7 @@ class IdRetrievalCapable(abc.ABC):
chunk_requests: list[VespaChunkRequest],
filters: IndexFilters,
batch_retrieval: bool = False,
) -> list[InferenceChunkUncleaned]:
) -> list[InferenceChunk]:
"""
Fetch chunk(s) based on document id
@@ -363,7 +363,7 @@ class HybridCapable(abc.ABC):
ranking_profile_type: QueryExpansionType,
offset: int = 0,
title_content_ratio: float | None = TITLE_CONTENT_RATIO,
) -> list[InferenceChunkUncleaned]:
) -> list[InferenceChunk]:
"""
Run hybrid search and return a list of inference chunks.
@@ -414,7 +414,7 @@ class AdminCapable(abc.ABC):
filters: IndexFilters,
num_to_retrieve: int,
offset: int = 0,
) -> list[InferenceChunkUncleaned]:
) -> list[InferenceChunk]:
"""
Run the special search for the admin document explorer page
@@ -438,7 +438,7 @@ class RandomCapable(abc.ABC):
self,
filters: IndexFilters,
num_to_retrieve: int = 10,
) -> list[InferenceChunkUncleaned]:
) -> list[InferenceChunk]:
"""Retrieve random chunks matching the filters"""
raise NotImplementedError

View File

@@ -22,12 +22,15 @@ from pydantic import BaseModel
from retry import retry
from onyx.agents.agent_search.shared_graph_utils.models import QueryExpansionType
from onyx.configs.app_configs import BLURB_SIZE
from onyx.configs.chat_configs import DOC_TIME_DECAY
from onyx.configs.chat_configs import NUM_RETURNED_HITS
from onyx.configs.chat_configs import TITLE_CONTENT_RATIO
from onyx.configs.chat_configs import VESPA_SEARCHER_THREADS
from onyx.configs.constants import KV_REINDEX_KEY
from onyx.configs.constants import RETURN_SEPARATOR
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import InferenceChunk
from onyx.context.search.models import InferenceChunkUncleaned
from onyx.db.enums import EmbeddingPrecision
from onyx.document_index.document_index_utils import get_document_chunk_ids
@@ -183,6 +186,51 @@ def add_ngrams_to_schema(schema_content: str) -> str:
return schema_content
def cleanup_chunks(chunks: list[InferenceChunkUncleaned]) -> list[InferenceChunk]:
    """Remove indexing-time text artifacts from each chunk (prepended title,
    metadata suffix, contextual-RAG document summary and chunk context) and
    convert the cleaned chunks to InferenceChunk.

    NOTE: mutates ``chunk.content`` on the input objects in place.
    """

    def _remove_title(chunk: InferenceChunkUncleaned) -> str:
        # Strip a title that was prepended to the content at indexing time.
        if not chunk.title or not chunk.content:
            return chunk.content

        if chunk.content.startswith(chunk.title):
            return chunk.content[len(chunk.title) :].lstrip()

        # BLURB SIZE is by token instead of char but each token is at least 1 char
        # If this prefix matches the content, it's assumed the title was prepended
        if chunk.content.startswith(chunk.title[:BLURB_SIZE]):
            return (
                chunk.content.split(RETURN_SEPARATOR, 1)[-1]
                if RETURN_SEPARATOR in chunk.content
                else chunk.content
            )

        return chunk.content

    def _remove_metadata_suffix(chunk: InferenceChunkUncleaned) -> str:
        # Drop the metadata block appended for embedding purposes.
        if not chunk.metadata_suffix:
            return chunk.content
        return chunk.content.removesuffix(chunk.metadata_suffix).rstrip(
            RETURN_SEPARATOR
        )

    def _remove_contextual_rag(chunk: InferenceChunkUncleaned) -> str:
        # remove document summary
        if chunk.content.startswith(chunk.doc_summary):
            chunk.content = chunk.content[len(chunk.doc_summary) :].lstrip()
        # remove chunk context
        if chunk.content.endswith(chunk.chunk_context):
            chunk.content = chunk.content[
                : len(chunk.content) - len(chunk.chunk_context)
            ].rstrip()
        return chunk.content

    # The three passes are order-sensitive: the title is matched against the
    # content before suffix/contextual text is removed.
    for chunk in chunks:
        chunk.content = _remove_title(chunk)
        chunk.content = _remove_metadata_suffix(chunk)
        chunk.content = _remove_contextual_rag(chunk)

    return [chunk.to_inference_chunk() for chunk in chunks]
class VespaIndex(DocumentIndex):
VESPA_SCHEMA_JINJA_FILENAME = "danswer_chunk.sd.jinja"
@@ -906,7 +954,7 @@ class VespaIndex(DocumentIndex):
filters: IndexFilters,
batch_retrieval: bool = False,
get_large_chunks: bool = False,
) -> list[InferenceChunkUncleaned]:
) -> list[InferenceChunk]:
# make sure to use the vespa-afied document IDs
chunk_requests = [
VespaChunkRequest(
@@ -920,17 +968,21 @@ class VespaIndex(DocumentIndex):
]
if batch_retrieval:
return batch_search_api_retrieval(
return cleanup_chunks(
batch_search_api_retrieval(
index_name=self.index_name,
chunk_requests=chunk_requests,
filters=filters,
get_large_chunks=get_large_chunks,
)
)
return cleanup_chunks(
parallel_visit_api_retrieval(
index_name=self.index_name,
chunk_requests=chunk_requests,
filters=filters,
get_large_chunks=get_large_chunks,
)
return parallel_visit_api_retrieval(
index_name=self.index_name,
chunk_requests=chunk_requests,
filters=filters,
get_large_chunks=get_large_chunks,
)
def hybrid_retrieval(
@@ -942,10 +994,10 @@ class VespaIndex(DocumentIndex):
hybrid_alpha: float,
time_decay_multiplier: float,
num_to_retrieve: int,
ranking_profile_type: QueryExpansionType,
ranking_profile_type: QueryExpansionType = QueryExpansionType.SEMANTIC,
offset: int = 0,
title_content_ratio: float | None = TITLE_CONTENT_RATIO,
) -> list[InferenceChunkUncleaned]:
) -> list[InferenceChunk]:
vespa_where_clauses = build_vespa_filters(filters)
# Needs to be at least as much as the value set in Vespa schema config
target_hits = max(10 * num_to_retrieve, 1000)
@@ -987,7 +1039,7 @@ class VespaIndex(DocumentIndex):
"timeout": VESPA_TIMEOUT,
}
return query_vespa(params)
return cleanup_chunks(query_vespa(params))
def admin_retrieval(
self,
@@ -995,7 +1047,7 @@ class VespaIndex(DocumentIndex):
filters: IndexFilters,
num_to_retrieve: int = NUM_RETURNED_HITS,
offset: int = 0,
) -> list[InferenceChunkUncleaned]:
) -> list[InferenceChunk]:
vespa_where_clauses = build_vespa_filters(filters, include_hidden=True)
yql = (
YQL_BASE.format(index_name=self.index_name)
@@ -1016,7 +1068,7 @@ class VespaIndex(DocumentIndex):
"timeout": VESPA_TIMEOUT,
}
return query_vespa(params)
return cleanup_chunks(query_vespa(params))
# Retrieves chunk information for a document:
# - Determines the last indexed chunk
@@ -1224,7 +1276,7 @@ class VespaIndex(DocumentIndex):
self,
filters: IndexFilters,
num_to_retrieve: int = 10,
) -> list[InferenceChunkUncleaned]:
) -> list[InferenceChunk]:
"""Retrieve random chunks matching the filters using Vespa's random ranking
This method is currently used for random chunk retrieval in the context of
@@ -1244,7 +1296,7 @@ class VespaIndex(DocumentIndex):
"ranking.properties.random.seed": random_seed,
}
return query_vespa(params)
return cleanup_chunks(query_vespa(params))
class _VespaDeleteRequest:

View File

@@ -9,8 +9,8 @@ from sqlalchemy.orm import Session
from onyx.configs.app_configs import MAX_FEDERATED_CHUNKS
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import FederatedConnectorSource
from onyx.context.search.models import ChunkIndexRequest
from onyx.context.search.models import InferenceChunk
from onyx.context.search.models import SearchQuery
from onyx.db.federated import (
get_federated_connector_document_set_mappings_by_document_set_names,
)
@@ -27,14 +27,14 @@ logger = setup_logger()
class FederatedRetrievalInfo(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
retrieval_function: Callable[[SearchQuery], list[InferenceChunk]]
retrieval_function: Callable[[ChunkIndexRequest], list[InferenceChunk]]
source: FederatedConnectorSource
def get_federated_retrieval_functions(
db_session: Session,
user_id: UUID | None,
source_types: list[DocumentSource] | None,
source_types: set[DocumentSource] | None,
document_set_names: list[str] | None,
slack_context: SlackContext | None = None,
) -> list[FederatedRetrievalInfo]:

View File

@@ -3,8 +3,8 @@ from abc import abstractmethod
from typing import Any
from typing import Dict
from onyx.context.search.models import ChunkIndexRequest
from onyx.context.search.models import InferenceChunk
from onyx.context.search.models import SearchQuery
from onyx.federated_connectors.models import CredentialField
from onyx.federated_connectors.models import EntityField
from onyx.federated_connectors.models import OAuthResult
@@ -83,7 +83,7 @@ class FederatedConnector(ABC):
@abstractmethod
def search(
self,
query: SearchQuery,
query: ChunkIndexRequest,
entities: dict[str, Any],
access_token: str,
limit: int | None = None,

View File

@@ -9,8 +9,8 @@ from pydantic import ValidationError
from typing_extensions import override
from onyx.context.search.federated.slack_search import slack_retrieval
from onyx.context.search.models import ChunkIndexRequest
from onyx.context.search.models import InferenceChunk
from onyx.context.search.models import SearchQuery
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.federated_connectors.interfaces import FederatedConnector
from onyx.federated_connectors.models import CredentialField
@@ -218,7 +218,7 @@ class SlackFederatedConnector(FederatedConnector):
@override
def search(
self,
query: SearchQuery,
query: ChunkIndexRequest,
entities: dict[str, Any],
access_token: str,
limit: int | None = None,

View File

@@ -88,7 +88,6 @@ from onyx.server.features.tool.api import admin_router as admin_tool_router
from onyx.server.features.tool.api import router as tool_router
from onyx.server.features.user_oauth_token.api import router as user_oauth_token_router
from onyx.server.federated.api import router as federated_router
from onyx.server.gpts.api import router as gpts_router
from onyx.server.kg.api import admin_router as kg_admin_router
from onyx.server.long_term_logs.long_term_logs_api import (
router as long_term_logs_router,
@@ -380,7 +379,6 @@ def get_application(lifespan_override: Lifespan | None = None) -> FastAPI:
include_router_with_global_prefix_prepended(application, user_oauth_token_router)
include_router_with_global_prefix_prepended(application, state_router)
include_router_with_global_prefix_prepended(application, onyx_api_router)
include_router_with_global_prefix_prepended(application, gpts_router)
include_router_with_global_prefix_prepended(application, settings_router)
include_router_with_global_prefix_prepended(application, settings_admin_router)
include_router_with_global_prefix_prepended(application, llm_admin_router)

View File

@@ -8,7 +8,6 @@ from sqlalchemy.orm import Session
from onyx.configs.chat_configs import NUM_PERSONA_PROMPT_GENERATION_CHUNKS
from onyx.context.search.models import IndexFilters
from onyx.context.search.models import InferenceChunk
from onyx.context.search.postprocessing.postprocessing import cleanup_chunks
from onyx.context.search.preprocessing.access_filters import (
build_access_filters_for_user,
)
@@ -45,7 +44,7 @@ def get_random_chunks_from_doc_sets(
chunks = document_index.random_retrieval(
filters=filters, num_to_retrieve=NUM_PERSONA_PROMPT_GENERATION_CHUNKS
)
return cleanup_chunks(chunks)
return chunks
def parse_categories(content: str) -> List[str | None]:

View File

@@ -1,104 +0,0 @@
import math
from datetime import datetime
from datetime import timezone
from fastapi import APIRouter
from fastapi import Depends
from pydantic import BaseModel
from sqlalchemy.orm import Session
from onyx.context.search.models import SearchRequest
from onyx.context.search.pipeline import SearchPipeline
from onyx.db.engine.sql_engine import get_session
from onyx.db.models import User
from onyx.llm.factory import get_default_llms
from onyx.server.onyx_api.ingestion import api_key_dep
from onyx.utils.logger import setup_logger
logger = setup_logger()
router = APIRouter(prefix="/gpts")
def time_ago(dt: datetime) -> str:
    """Return a rough, human-readable age for *dt* relative to now (UTC).

    Picks the largest sensible unit: minutes (< 1h), hours (< 1d),
    days (< 1w), weeks (< 4w), otherwise months (30-day months).
    """
    # Calculate time difference
    now = datetime.now(timezone.utc)
    diff = now - dt.astimezone(timezone.utc)

    # Convert difference to minutes
    minutes = diff.total_seconds() / 60

    # Determine the appropriate unit and calculate the age
    if minutes < 60:
        return f"~{math.floor(minutes)} minutes"

    hours = minutes / 60
    if hours < 24:
        return f"~{math.floor(hours)} hours"

    days = hours / 24
    if days < 7:
        return f"~{math.floor(days)} days"

    weeks = days / 7
    if weeks < 4:
        return f"~{math.floor(weeks)} weeks"

    # weeks >= 4 can still mean days < 30 (e.g. 28-29 days), which would
    # floor to "~0 months" — clamp to at least one month.
    months = days / 30
    return f"~{max(1, math.floor(months))} months"
class GptSearchRequest(BaseModel):
    """Request body for the GPT document-search endpoint."""

    # The raw user query to search documents with.
    query: str
class GptDocChunk(BaseModel):
    """A single matching document chunk, shaped for the GPT integration."""

    # Human-readable document identifier (semantic_identifier of the chunk).
    title: str
    # The chunk's text content.
    content: str
    # Origin connector/source of the document (string form of the source type).
    source_type: str
    # Link to the source document; empty string when no source link exists.
    link: str
    # Arbitrary document metadata as produced at indexing time.
    metadata: dict[str, str | list[str]]
    # Approximate age string (e.g. "~3 days"), or "Unknown" if no updated_at.
    document_age: str
class GptSearchResponse(BaseModel):
    """Response body for the GPT document-search endpoint."""

    # Top-ranked chunks matching the query, best first.
    matching_document_chunks: list[GptDocChunk]
@router.post("/gpt-document-search")
def gpt_search(
    search_request: GptSearchRequest,
    _: User | None = Depends(api_key_dep),
    db_session: Session = Depends(get_session),
) -> GptSearchResponse:
    """Run the document-search pipeline for a GPT query.

    Executes the standard search pipeline (query analysis skipped, no user
    scoping) and reshapes the reranked sections' center chunks into the
    GPT-friendly response model.
    """
    llm, fast_llm = get_default_llms()

    top_sections = SearchPipeline(
        search_request=SearchRequest(query=search_request.query),
        user=None,
        llm=llm,
        fast_llm=fast_llm,
        skip_query_analysis=True,
        db_session=db_session,
    ).reranked_sections

    doc_chunks: list[GptDocChunk] = []
    for section in top_sections:
        chunk = section.center_chunk
        link = chunk.source_links.get(0, "") if chunk.source_links else ""
        age = time_ago(chunk.updated_at) if chunk.updated_at else "Unknown"
        doc_chunks.append(
            GptDocChunk(
                title=chunk.semantic_identifier,
                content=chunk.content,
                source_type=chunk.source_type,
                link=link,
                metadata=chunk.metadata,
                document_age=age,
            )
        )

    return GptSearchResponse(matching_document_chunks=doc_chunks)