mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-05 06:52:42 +00:00
Compare commits
16 Commits
cli/v0.2.1
...
enable-cud
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c84b1923a0 | ||
|
|
684abec372 | ||
|
|
6174b6620a | ||
|
|
13c783eeb7 | ||
|
|
b150655902 | ||
|
|
6b37a8a428 | ||
|
|
ff4781e352 | ||
|
|
2e70e760b8 | ||
|
|
0d0ba2c23a | ||
|
|
20575064bc | ||
|
|
49ba3cb41e | ||
|
|
ef228e6be3 | ||
|
|
6e20c7c514 | ||
|
|
64d6c1fde6 | ||
|
|
ea60558a09 | ||
|
|
cffcceac72 |
@@ -1,4 +1,4 @@
|
||||
FROM python:3.11.4-slim-bookworm
|
||||
FROM nvidia/cuda:12.2.2-devel-ubuntu22.04
|
||||
|
||||
# Install system dependencies
|
||||
RUN apt-get update && \
|
||||
@@ -8,6 +8,18 @@ RUN apt-get update && \
|
||||
rm -rf /var/lib/apt/lists/* && \
|
||||
apt-get clean
|
||||
|
||||
# RUN apt-get update && \
|
||||
# apt-get install -y build-essential checkinstall \
|
||||
# libreadline-gplv2-dev libncursesw5-dev libssl-dev \
|
||||
# libsqlite3-dev tk-dev libgdbm-dev libc6-dev libbz2-dev
|
||||
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y tzdata
|
||||
RUN apt-get update && \
|
||||
apt-get install software-properties-common -y && \
|
||||
add-apt-repository ppa:deadsnakes/ppa && \
|
||||
apt-get install python3.11-dev python3.11-distutils -y && \
|
||||
curl -sS https://bootstrap.pypa.io/get-pip.py | python3.11
|
||||
|
||||
|
||||
# Install Python dependencies
|
||||
# Remove py which is pulled in by retry, py is not needed and is a CVE
|
||||
COPY ./requirements/default.txt /tmp/requirements.txt
|
||||
@@ -21,22 +33,22 @@ RUN pip install --no-cache-dir --upgrade -r /tmp/requirements.txt && \
|
||||
# https://nodejs.org/en/download/package-manager#debian-and-ubuntu-based-linux-distributions
|
||||
# this is temporarily needed until playwright updates their packaged node version to
|
||||
# 20.5.1+
|
||||
RUN mkdir -p /etc/apt/keyrings && \
|
||||
curl -fsSL https://deb.nodesource.com/gpgkey/nodesource-repo.gpg.key | gpg --dearmor -o /etc/apt/keyrings/nodesource.gpg && \
|
||||
echo "deb [signed-by=/etc/apt/keyrings/nodesource.gpg] https://deb.nodesource.com/node_20.x nodistro main" | tee /etc/apt/sources.list.d/nodesource.list && \
|
||||
apt-get update && \
|
||||
apt-get install -y nodejs && \
|
||||
cp /usr/bin/node /usr/local/lib/python3.11/site-packages/playwright/driver/node && \
|
||||
apt-get remove -y nodejs
|
||||
# RUN mkdir -p /etc/apt/keyrings && \
|
||||
# curl -fsSL https://deb.nodesource.com/gpgkey/nodesource-repo.gpg.key | gpg --dearmor -o /etc/apt/keyrings/nodesource.gpg && \
|
||||
# echo "deb [signed-by=/etc/apt/keyrings/nodesource.gpg] https://deb.nodesource.com/node_20.x nodistro main" | tee /etc/apt/sources.list.d/nodesource.list && \
|
||||
# apt-get update && \
|
||||
# apt-get install -y nodejs && \
|
||||
# cp /usr/bin/node /usr/local/lib/python3.11/site-packages/playwright/driver/node && \
|
||||
# apt-get remove -y nodejs
|
||||
|
||||
# Cleanup for CVEs and size reduction
|
||||
# Remove tornado test key to placate vulnerability scanners
|
||||
# More details can be found here:
|
||||
# https://github.com/tornadoweb/tornado/issues/3107
|
||||
RUN apt-get remove -y linux-libc-dev && \
|
||||
apt-get autoremove -y && \
|
||||
rm -rf /var/lib/apt/lists/* && \
|
||||
rm /usr/local/lib/python3.11/site-packages/tornado/test/test.key
|
||||
# RUN apt-get remove -y linux-libc-dev && \
|
||||
# apt-get autoremove -y && \
|
||||
# rm -rf /var/lib/apt/lists/* && \
|
||||
# rm /usr/local/lib/python3.11/site-packages/tornado/test/test.key
|
||||
|
||||
# Set up application files
|
||||
WORKDIR /app
|
||||
@@ -55,6 +67,9 @@ COPY ./scripts/migrate_vespa_to_acl.py /app/migrate_vespa_to_acl.py
|
||||
|
||||
ENV PYTHONPATH /app
|
||||
|
||||
RUN ln -s /usr/bin/python3.11 /usr/bin/python & \
|
||||
ln -s /usr/bin/pip3.11 /usr/bin/pip
|
||||
|
||||
# Default command which does nothing
|
||||
# This container is used by api server and background which specify their own CMD
|
||||
CMD ["tail", "-f", "/dev/null"]
|
||||
|
||||
@@ -220,8 +220,8 @@ celery_app.conf.beat_schedule = {
|
||||
"task": "check_for_document_sets_sync_task",
|
||||
"schedule": timedelta(seconds=5),
|
||||
},
|
||||
"clean-old-temp-files": {
|
||||
"task": "clean_old_temp_files_task",
|
||||
"schedule": timedelta(minutes=30),
|
||||
},
|
||||
# "clean-old-temp-files": {
|
||||
# "task": "clean_old_temp_files_task",
|
||||
# "schedule": timedelta(minutes=30),
|
||||
# },
|
||||
}
|
||||
|
||||
@@ -4,12 +4,13 @@ not follow the expected behavior, etc.
|
||||
|
||||
NOTE: cannot use Celery directly due to
|
||||
https://github.com/celery/celery/issues/7007#issuecomment-1740139367"""
|
||||
import multiprocessing
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from typing import Literal
|
||||
|
||||
from torch import multiprocessing as mp
|
||||
|
||||
from danswer.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
@@ -29,7 +30,7 @@ class SimpleJob:
|
||||
"""Drop in replacement for `dask.distributed.Future`"""
|
||||
|
||||
id: int
|
||||
process: multiprocessing.Process | None = None
|
||||
process: mp.Process | None = None
|
||||
|
||||
def cancel(self) -> bool:
|
||||
return self.release()
|
||||
@@ -94,7 +95,7 @@ class SimpleJobClient:
|
||||
job_id = self.job_id_counter
|
||||
self.job_id_counter += 1
|
||||
|
||||
process = multiprocessing.Process(target=func, args=args)
|
||||
process = mp.Process(target=func, args=args)
|
||||
job = SimpleJob(id=job_id, process=process)
|
||||
process.start()
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from dask.distributed import Client
|
||||
from dask.distributed import Future
|
||||
from distributed import LocalCluster
|
||||
from sqlalchemy.orm import Session
|
||||
from torch import multiprocessing as mp
|
||||
|
||||
from danswer.background.indexing.job_client import SimpleJob
|
||||
from danswer.background.indexing.job_client import SimpleJobClient
|
||||
@@ -299,22 +300,41 @@ def _run_indexing(
|
||||
run_dt=run_dt,
|
||||
)
|
||||
|
||||
net_doc_change = 0
|
||||
document_count = 0
|
||||
chunk_count = 0
|
||||
try:
|
||||
net_doc_change = 0
|
||||
document_count = 0
|
||||
chunk_count = 0
|
||||
curr_time = time.time()
|
||||
for doc_batch in doc_batch_generator:
|
||||
logger.info(
|
||||
f"Building the next doc batch took {time.time() - curr_time} seconds"
|
||||
)
|
||||
logger.debug(
|
||||
f"Indexing batch of documents: {[doc.to_short_descriptor() for doc in doc_batch]}"
|
||||
)
|
||||
|
||||
new_docs, total_batch_chunks = indexing_pipeline(
|
||||
documents=doc_batch,
|
||||
index_attempt_metadata=IndexAttemptMetadata(
|
||||
connector_id=db_connector.id,
|
||||
credential_id=db_credential.id,
|
||||
),
|
||||
)
|
||||
success = False
|
||||
for _ in range(4):
|
||||
try:
|
||||
new_docs, total_batch_chunks = indexing_pipeline(
|
||||
documents=doc_batch,
|
||||
index_attempt_metadata=IndexAttemptMetadata(
|
||||
connector_id=db_connector.id,
|
||||
credential_id=db_credential.id,
|
||||
),
|
||||
)
|
||||
success = True
|
||||
break
|
||||
except Exception:
|
||||
logger.exception("Failed to run indexing pipeline")
|
||||
time.sleep(5)
|
||||
|
||||
if not success:
|
||||
logger.error(
|
||||
"Failed to run indexing pipeline 4 times, skipping batch"
|
||||
)
|
||||
continue
|
||||
|
||||
net_doc_change += new_docs
|
||||
chunk_count += total_batch_chunks
|
||||
document_count += len(doc_batch)
|
||||
@@ -340,6 +360,8 @@ def _run_indexing(
|
||||
# let the `except` block handle this
|
||||
raise RuntimeError("Connector was disabled mid run")
|
||||
|
||||
curr_time = time.time()
|
||||
|
||||
mark_attempt_succeeded(attempt, db_session)
|
||||
update_connector_credential_pair(
|
||||
db_session=db_session,
|
||||
@@ -519,6 +541,11 @@ def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> Non
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# required to spin up new processes from within a process
|
||||
# when using pytorch based models on GPU
|
||||
if torch.cuda.is_available():
|
||||
mp.set_start_method("spawn")
|
||||
|
||||
logger.info("Warming up Embedding Model(s)")
|
||||
warm_up_models(indexer_only=True)
|
||||
logger.info("Starting Indexing Loop")
|
||||
|
||||
@@ -93,7 +93,7 @@ VESPA_DEPLOYMENT_ZIP = (
|
||||
os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip"
|
||||
)
|
||||
# Number of documents in a batch during indexing (further batching done by chunks before passing to bi-encoder)
|
||||
INDEX_BATCH_SIZE = 16
|
||||
INDEX_BATCH_SIZE = 128
|
||||
|
||||
# Below are intended to match the env variables names used by the official postgres docker image
|
||||
# https://hub.docker.com/_/postgres
|
||||
|
||||
@@ -70,7 +70,7 @@ DOCUMENT_ID_ENDPOINT = (
|
||||
f"{VESPA_APP_CONTAINER_URL}/document/v1/default/danswer_chunk/docid"
|
||||
)
|
||||
SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/"
|
||||
_BATCH_SIZE = 100 # Specific to Vespa
|
||||
_BATCH_SIZE = 128 # Specific to Vespa
|
||||
_NUM_THREADS = (
|
||||
16 # since Vespa doesn't allow batching of inserts / updates, we use threads
|
||||
)
|
||||
@@ -412,7 +412,7 @@ def _vespa_hit_to_inference_chunk(hit: dict[str, Any]) -> InferenceChunk:
|
||||
|
||||
return InferenceChunk(
|
||||
chunk_id=fields[CHUNK_ID],
|
||||
blurb=fields[BLURB],
|
||||
blurb=fields.get(BLURB, ""),
|
||||
content=fields[CONTENT],
|
||||
source_links=source_links_dict,
|
||||
section_continuation=fields[SECTION_CONTINUATION],
|
||||
|
||||
@@ -156,6 +156,32 @@ def split_chunk_text_into_mini_chunks(
|
||||
return sentence_aware_splitter.split_text(chunk_text)
|
||||
|
||||
|
||||
def split_string(s: str, chunk_size: int = 1024) -> str:
|
||||
return [s[i : i + chunk_size] for i in range(0, len(s), chunk_size)]
|
||||
|
||||
|
||||
def chunk_document_FAST(
|
||||
document: Document,
|
||||
chunk_tok_size: int = CHUNK_SIZE,
|
||||
subsection_overlap: int = CHUNK_OVERLAP,
|
||||
blurb_size: int = BLURB_SIZE,
|
||||
) -> list[DocAwareChunk]:
|
||||
chunks: list[DocAwareChunk] = []
|
||||
for section in document.sections:
|
||||
for ind, text in enumerate(split_string(section.text, 1024)):
|
||||
chunks.append(
|
||||
DocAwareChunk(
|
||||
source_document=document,
|
||||
chunk_id=len(chunks),
|
||||
blurb="",
|
||||
content=text,
|
||||
source_links={0: section.link},
|
||||
section_continuation=ind == 0,
|
||||
)
|
||||
)
|
||||
return chunks
|
||||
|
||||
|
||||
class Chunker:
|
||||
@abc.abstractmethod
|
||||
def chunk(self, document: Document) -> list[DocAwareChunk]:
|
||||
@@ -164,4 +190,4 @@ class Chunker:
|
||||
|
||||
class DefaultChunker(Chunker):
|
||||
def chunk(self, document: Document) -> list[DocAwareChunk]:
|
||||
return chunk_document(document)
|
||||
return chunk_document_FAST(document)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import time
|
||||
from functools import partial
|
||||
from itertools import chain
|
||||
from typing import Protocol
|
||||
@@ -21,6 +22,7 @@ from danswer.indexing.models import DocAwareChunk
|
||||
from danswer.indexing.models import DocMetadataAwareIndexChunk
|
||||
from danswer.search.models import Embedder
|
||||
from danswer.utils.logger import setup_logger
|
||||
from danswer.utils.timing import log_function_time
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
@@ -74,29 +76,36 @@ def _indexing_pipeline(
|
||||
|
||||
with Session(get_sqlalchemy_engine()) as db_session:
|
||||
# acquires a lock on the documents so that no other process can modify them
|
||||
prepare_to_modify_documents(db_session=db_session, document_ids=document_ids)
|
||||
log_function_time()(prepare_to_modify_documents)(
|
||||
db_session=db_session, document_ids=document_ids
|
||||
)
|
||||
|
||||
# create records in the source of truth about these documents
|
||||
_upsert_documents(
|
||||
log_function_time()(_upsert_documents)(
|
||||
documents=documents,
|
||||
index_attempt_metadata=index_attempt_metadata,
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
chunks: list[DocAwareChunk] = list(
|
||||
chain(*[chunker.chunk(document=document) for document in documents])
|
||||
)
|
||||
logger.info(f"Chunking took {time.time() - start_time} seconds")
|
||||
|
||||
logger.debug(
|
||||
f"Indexing the following chunks: {[chunk.to_short_descriptor() for chunk in chunks]}"
|
||||
)
|
||||
chunks_with_embeddings = embedder.embed(chunks=chunks)
|
||||
chunks_with_embeddings = log_function_time()(embedder.embed)(chunks=chunks)
|
||||
|
||||
# Attach the latest status from Postgres (source of truth for access) to each
|
||||
# chunk. This access status will be attached to each chunk in the document index
|
||||
# TODO: attach document sets to the chunk based on the status of Postgres as well
|
||||
document_id_to_access_info = get_access_for_documents(
|
||||
document_id_to_access_info = log_function_time()(get_access_for_documents)(
|
||||
document_ids=document_ids, db_session=db_session
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
document_id_to_document_set = {
|
||||
document_id: document_sets
|
||||
for document_id, document_sets in fetch_document_sets_for_documents(
|
||||
@@ -113,11 +122,12 @@ def _indexing_pipeline(
|
||||
)
|
||||
for chunk in chunks_with_embeddings
|
||||
]
|
||||
logger.info(f"Getting access info took {time.time() - start_time} seconds")
|
||||
|
||||
# A document will not be spread across different batches, so all the
|
||||
# documents with chunks in this set, are fully represented by the chunks
|
||||
# in this set
|
||||
insertion_records = document_index.index(
|
||||
insertion_records = log_function_time()(document_index.index)(
|
||||
chunks=access_aware_chunks,
|
||||
)
|
||||
|
||||
|
||||
@@ -310,7 +310,10 @@ def search_chunks(
|
||||
)
|
||||
|
||||
# Keyword Search should never do reranking, no transformers involved in this flow
|
||||
if query.search_type == SearchType.KEYWORD:
|
||||
if (
|
||||
query.search_type == SearchType.KEYWORD
|
||||
or query.search_type == SearchType.HYBRID
|
||||
):
|
||||
_log_top_chunk_links(query.search_type.value, top_chunks)
|
||||
return top_chunks, None
|
||||
|
||||
|
||||
93
backend/scripts/benchmark_search.py
Normal file
93
backend/scripts/benchmark_search.py
Normal file
@@ -0,0 +1,93 @@
|
||||
import random
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
|
||||
from danswer.search.models import SearchType
|
||||
|
||||
question_bank = [
|
||||
"Who was the first president of the United States?",
|
||||
"What is photosynthesis?",
|
||||
"How long is the Great Wall of China?",
|
||||
"When was the Eiffel Tower constructed?",
|
||||
"Who wrote 'Pride and Prejudice'?",
|
||||
"What's the difference between mitosis and meiosis?",
|
||||
"What is the capital of Brazil?",
|
||||
"Who discovered penicillin?",
|
||||
"What causes the Aurora Borealis?",
|
||||
"When did the Titanic sink?",
|
||||
"How does a combustion engine work?",
|
||||
"Who is the author of 'The Odyssey'?",
|
||||
"What is quantum physics?",
|
||||
"When was the Mona Lisa painted?",
|
||||
"What's the difference between a meteor and a meteorite?",
|
||||
"Who founded the city of Rome?",
|
||||
"What is the boiling point of water at sea level?",
|
||||
"Who won the Nobel Prize in Literature in 1953?",
|
||||
"How do honeybees produce honey?",
|
||||
"What is the deepest part of the ocean?",
|
||||
"When did the first humans arrive in the Americas?",
|
||||
"What is the Fibonacci sequence?",
|
||||
"How was the Grand Canyon formed?",
|
||||
"Who composed the Moonlight Sonata?",
|
||||
"What are the primary colors of light?",
|
||||
"When did the Roman Empire fall?",
|
||||
"How does photosynthesis contribute to the carbon cycle?",
|
||||
"Who was the first woman in space?",
|
||||
"What is the Pythagorean theorem?",
|
||||
"Which planet is known as the 'Red Planet'?",
|
||||
"Who is the father of modern physics?",
|
||||
"What is the primary purpose of the United Nations?",
|
||||
"How old is the Earth?",
|
||||
"Who wrote 'Don Quixote'?",
|
||||
"What is the structure of DNA?",
|
||||
"When was the Declaration of Independence signed?",
|
||||
"What causes a solar eclipse?",
|
||||
"Who was the longest-reigning British monarch?",
|
||||
"How do tornadoes form?",
|
||||
"Who developed the theory of relativity?",
|
||||
"What's the tallest mountain on Earth when measured from base to peak?",
|
||||
"How many bones are there in the adult human body?",
|
||||
"When was the Internet invented?",
|
||||
"Who was the ancient Egyptian queen known for her relationship with Roman leaders?",
|
||||
"What is the Krebs cycle?",
|
||||
"Which country has the largest land area?",
|
||||
"Who painted the Starry Night?",
|
||||
"What's the difference between an alligator and a crocodile?",
|
||||
"Who discovered the circulation of blood?",
|
||||
"How many planets are there in our solar system?",
|
||||
]
|
||||
|
||||
|
||||
def _measure_hybrid_search_latency(filters: dict = {}):
|
||||
start = time.monotonic()
|
||||
response = requests.post(
|
||||
"http://localhost:8080/document-search",
|
||||
json={
|
||||
"query": random.choice(question_bank),
|
||||
"collection": DOCUMENT_INDEX_NAME,
|
||||
"filters": filters,
|
||||
"enable_auto_detect_filters": False,
|
||||
"search_type": SearchType.HYBRID.value,
|
||||
},
|
||||
)
|
||||
if not response.ok:
|
||||
raise Exception(f"Failed to search: {response.text}")
|
||||
return time.monotonic() - start
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
latencies: list[float] = []
|
||||
for _ in range(50):
|
||||
latencies.append(_measure_hybrid_search_latency())
|
||||
print("Latency", latencies[-1])
|
||||
print(f"Average latency: {sum(latencies) / len(latencies)}")
|
||||
|
||||
print("Testing with filters")
|
||||
for _ in range(50):
|
||||
latencies.append(
|
||||
_measure_hybrid_search_latency(filters={"source_type": ["file"]})
|
||||
)
|
||||
print("Latency", latencies[-1])
|
||||
print(f"Average latency: {sum(latencies) / len(latencies)}")
|
||||
@@ -18,7 +18,7 @@ server {
|
||||
listen 80;
|
||||
server_name ${DOMAIN};
|
||||
|
||||
client_max_body_size 500M; # Maximum upload size
|
||||
client_max_body_size 0; # Maximum upload size
|
||||
|
||||
location ~ ^/api(.*)$ {
|
||||
rewrite ^/api(/.*)$ $1 break;
|
||||
|
||||
@@ -99,6 +99,15 @@ services:
|
||||
- model_cache_torch:/root/.cache/torch/
|
||||
- model_cache_nltk:/root/nltk_data/
|
||||
- model_cache_huggingface:/root/.cache/huggingface/
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
devices:
|
||||
- driver: nvidia
|
||||
capabilities:
|
||||
- gpu
|
||||
- utility # nvidia-smi
|
||||
- compute # CUDA
|
||||
web_server:
|
||||
image: danswer/danswer-web-server:latest
|
||||
build:
|
||||
|
||||
Reference in New Issue
Block a user