1
0
forked from github/onyx

Compare commits

...

3 Commits

Author SHA1 Message Date
pablodanswer
50c186c8c6 seed no docs 2024-12-04 08:57:03 -08:00
pablodanswer
e965a602ad k 2024-12-03 11:32:12 -08:00
pablodanswer
7fa3c9c82d update document 2024-12-03 10:21:11 -08:00
9 changed files with 38 additions and 2 deletions

View File

@@ -135,6 +135,7 @@ class SearchPipeline:
"""Retrieval and Postprocessing"""
@log_function_time(print_only=True)
def _get_chunks(self) -> list[InferenceChunk]:
if self._retrieved_chunks is not None:
return self._retrieved_chunks
@@ -306,6 +307,7 @@ class SearchPipeline:
return expanded_inference_sections
@property
@log_function_time(print_only=True)
def reranked_sections(self) -> list[InferenceSection]:
"""Reranking is always done at the chunk level since section merging could create arbitrarily
long sections which could be:
@@ -331,6 +333,7 @@ class SearchPipeline:
return self._reranked_sections
@property
@log_function_time(print_only=True)
def final_context_sections(self) -> list[InferenceSection]:
if self._final_context_sections is not None:
return self._final_context_sections
@@ -339,6 +342,7 @@ class SearchPipeline:
return self._final_context_sections
@property
@log_function_time(print_only=True)
def section_relevance(self) -> list[SectionRelevancePiece] | None:
if self._section_relevance is not None:
return self._section_relevance
@@ -393,6 +397,7 @@ class SearchPipeline:
return self._section_relevance
@property
@log_function_time(print_only=True)
def section_relevance_list(self) -> list[bool]:
llm_indices = relevant_sections_to_indices(
relevance_sections=self.section_relevance,

View File

@@ -42,6 +42,7 @@ def _log_top_section_links(search_flow: str, sections: list[InferenceSection]) -
logger.debug(f"Top links from {search_flow} search: {', '.join(top_links)}")
@log_function_time(print_only=True)
def cleanup_chunks(chunks: list[InferenceChunkUncleaned]) -> list[InferenceChunk]:
def _remove_title(chunk: InferenceChunkUncleaned) -> str:
if not chunk.title or not chunk.content:
@@ -244,6 +245,7 @@ def filter_sections(
]
@log_function_time(print_only=True)
def search_postprocessing(
search_query: SearchQuery,
retrieved_sections: list[InferenceSection],

View File

@@ -1,4 +1,5 @@
import string
import time
from collections.abc import Callable
import nltk # type:ignore
@@ -85,6 +86,7 @@ def remove_stop_words_and_punctuation(keywords: list[str]) -> list[str]:
return keywords
@log_function_time(print_only=True)
def combine_retrieval_results(
chunk_sets: list[list[InferenceChunk]],
) -> list[InferenceChunk]:
@@ -256,7 +258,13 @@ def retrieve_chunks(
(q_copy, document_index, db_session),
)
)
start_time = time.time()
parallel_search_results = run_functions_tuples_in_parallel(run_queries)
end_time = time.time()
logger.info(
f"Parallel search execution took {end_time - start_time:.2f} seconds"
)
top_chunks = combine_retrieval_results(parallel_search_results)
if not top_chunks:

View File

@@ -8,6 +8,7 @@ from danswer.context.search.models import SavedSearchDoc
from danswer.context.search.models import SavedSearchDocWithContent
from danswer.context.search.models import SearchDoc
from danswer.db.models import SearchDoc as DBSearchDoc
from danswer.utils.timing import log_function_time
T = TypeVar(
@@ -88,6 +89,7 @@ def drop_llm_indices(
return [i for i, val in enumerate(llm_bools) if val]
@log_function_time(print_only=True)
def inference_section_from_chunks(
center_chunk: InferenceChunk,
chunks: list[InferenceChunk],

View File

@@ -48,6 +48,7 @@ from danswer.document_index.vespa_constants import TITLE
from danswer.document_index.vespa_constants import YQL_BASE
from danswer.utils.logger import setup_logger
from danswer.utils.threadpool_concurrency import run_functions_tuples_in_parallel
from danswer.utils.timing import log_function_time
logger = setup_logger()
@@ -146,6 +147,7 @@ def _vespa_hit_to_inference_chunk(
)
@log_function_time(print_only=True)
def _get_chunks_via_visit_api(
chunk_request: VespaChunkRequest,
index_name: str,
@@ -232,6 +234,7 @@ def _get_chunks_via_visit_api(
@retry(tries=10, delay=1, backoff=2)
@log_function_time(print_only=True)
def get_all_vespa_ids_for_document_id(
document_id: str,
index_name: str,
@@ -248,6 +251,7 @@ def get_all_vespa_ids_for_document_id(
return [chunk["id"].split("::", 1)[-1] for chunk in document_chunks]
@log_function_time(print_only=True)
def parallel_visit_api_retrieval(
index_name: str,
chunk_requests: list[VespaChunkRequest],
@@ -262,9 +266,12 @@ def parallel_visit_api_retrieval(
for chunk_request in chunk_requests
]
start_time = datetime.now()
parallel_results = run_functions_tuples_in_parallel(
functions_with_args, allow_failures=True
)
duration = datetime.now() - start_time
print(f"Parallel visit API retrieval took {duration.total_seconds():.2f} seconds")
# Any failures to retrieve would give a None, drop the Nones and empty lists
vespa_chunk_sets = [res for res in parallel_results if res]
@@ -282,9 +289,11 @@ def parallel_visit_api_retrieval(
@retry(tries=3, delay=1, backoff=2)
@log_function_time(print_only=True)
def query_vespa(
query_params: Mapping[str, str | int | float]
) -> list[InferenceChunkUncleaned]:
print(f"query_params: {query_params}")
if "query" in query_params and not cast(str, query_params["query"]).strip():
raise ValueError("No/empty query received")
@@ -340,6 +349,7 @@ def query_vespa(
return inference_chunks
@log_function_time(print_only=True)
def _get_chunks_via_batch_search(
index_name: str,
chunk_requests: list[VespaChunkRequest],
@@ -374,6 +384,7 @@ def _get_chunks_via_batch_search(
return inference_chunks
@log_function_time(print_only=True)
def batch_search_api_retrieval(
index_name: str,
chunk_requests: list[VespaChunkRequest],

View File

@@ -72,6 +72,7 @@ from danswer.indexing.models import DocMetadataAwareIndexChunk
from danswer.key_value_store.factory import get_kv_store
from danswer.utils.batching import batch_generator
from danswer.utils.logger import setup_logger
from danswer.utils.timing import log_function_time
from shared_configs.configs import MULTI_TENANT
from shared_configs.model_server_models import Embedding
@@ -660,6 +661,7 @@ class VespaIndex(DocumentIndex):
return total_chunks_deleted
@log_function_time(print_only=True)
def id_based_retrieval(
self,
chunk_requests: list[VespaChunkRequest],
@@ -681,6 +683,7 @@ class VespaIndex(DocumentIndex):
get_large_chunks=get_large_chunks,
)
@log_function_time(print_only=True)
def hybrid_retrieval(
self,
query: str,

View File

@@ -21,6 +21,7 @@ from danswer.natural_language_processing.utils import tokenizer_trim_content
from danswer.prompts.prompt_utils import build_doc_context_str
from danswer.tools.tool_implementations.search.search_utils import section_to_dict
from danswer.utils.logger import setup_logger
from danswer.utils.timing import log_function_time
logger = setup_logger()
@@ -43,6 +44,7 @@ class ChunkRange(BaseModel):
end: int
@log_function_time(print_only=True)
def merge_chunk_intervals(chunk_ranges: list[ChunkRange]) -> list[ChunkRange]:
"""
This acts on a single document to merge the overlapping ranges of chunks
@@ -300,6 +302,7 @@ def prune_sections(
)
@log_function_time(print_only=True)
def _merge_doc_chunks(chunks: list[InferenceChunk]) -> InferenceSection:
# Assuming there are no duplicates by this point
sorted_chunks = sorted(chunks, key=lambda x: x.chunk_id)
@@ -327,6 +330,7 @@ def _merge_doc_chunks(chunks: list[InferenceChunk]) -> InferenceSection:
)
@log_function_time(print_only=True)
def _merge_sections(sections: list[InferenceSection]) -> list[InferenceSection]:
docs_map: dict[str, dict[int, InferenceChunk]] = defaultdict(dict)
doc_order: dict[str, int] = {}

View File

@@ -15,6 +15,7 @@ from danswer.prompts.miscellaneous_prompts import LANGUAGE_REPHRASE_PROMPT
from danswer.utils.logger import setup_logger
from danswer.utils.text_processing import count_punctuation
from danswer.utils.threadpool_concurrency import run_functions_tuples_in_parallel
from danswer.utils.timing import log_function_time
logger = setup_logger()
@@ -48,6 +49,7 @@ def llm_multilingual_query_expansion(query: str, language: str) -> str:
return model_output
@log_function_time(print_only=True)
def multilingual_query_expansion(
query: str,
expansion_languages: list[str],

View File

@@ -38,7 +38,6 @@ from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
from danswer.natural_language_processing.search_nlp_models import warm_up_cross_encoder
from danswer.seeding.load_docs import seed_initial_documents
from danswer.seeding.load_yamls import load_chat_yamls
from danswer.server.manage.llm.models import LLMProviderUpsertRequest
from danswer.server.settings.store import load_settings
@@ -150,7 +149,7 @@ def setup_danswer(
# update multipass indexing setting based on GPU availability
update_default_multipass_indexing(db_session)
seed_initial_documents(db_session, tenant_id, cohere_enabled)
# seed_initial_documents(db_session, tenant_id, cohere_enabled)
def translate_saved_search_settings(db_session: Session) -> None: