Compare commits

...

16 Commits

Author SHA1 Message Date
Weves
c84b1923a0 Fix 2023-11-01 22:13:54 -07:00
Weves
684abec372 Use spawn-based mp 2023-11-01 22:08:28 -07:00
Weves
6174b6620a change 2023-11-01 22:00:59 -07:00
Weves
13c783eeb7 Update benchmark 2023-11-01 22:00:59 -07:00
Weves
b150655902 Disable re-ranking for hybrid 2023-11-01 22:00:59 -07:00
Weves
6b37a8a428 Enable empty blurb 2023-11-01 22:00:59 -07:00
Weves
ff4781e352 script 2023-11-01 22:00:59 -07:00
Weves
2e70e760b8 prevent marking job as failed 2023-11-01 22:00:59 -07:00
Weves
0d0ba2c23a Add retries for indexing pipeline 2023-11-01 22:00:59 -07:00
Weves
20575064bc Change # of threads 2023-11-01 22:00:59 -07:00
Weves
49ba3cb41e remove file cleanup job 2023-11-01 22:00:59 -07:00
Weves
ef228e6be3 tweak 2023-11-01 22:00:59 -07:00
Weves
6e20c7c514 Increase # of threads when indexing in Vespa 2023-11-01 22:00:59 -07:00
Weves
64d6c1fde6 Make chunking FAST 2023-11-01 22:00:59 -07:00
Weves
ea60558a09 Adding timing info everywhere 2023-11-01 22:00:59 -07:00
Weves
cffcceac72 Testing 2023-11-01 22:00:59 -07:00
12 changed files with 224 additions and 40 deletions

View File

@@ -1,4 +1,4 @@
FROM python:3.11.4-slim-bookworm
FROM nvidia/cuda:12.2.2-devel-ubuntu22.04
# Install system dependencies
RUN apt-get update && \
@@ -8,6 +8,18 @@ RUN apt-get update && \
rm -rf /var/lib/apt/lists/* && \
apt-get clean
# RUN apt-get update && \
# apt-get install -y build-essential checkinstall \
# libreadline-gplv2-dev libncursesw5-dev libssl-dev \
# libsqlite3-dev tk-dev libgdbm-dev libc6-dev libbz2-dev
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y tzdata
RUN apt-get update && \
apt-get install software-properties-common -y && \
add-apt-repository ppa:deadsnakes/ppa && \
apt-get install python3.11-dev python3.11-distutils -y && \
curl -sS https://bootstrap.pypa.io/get-pip.py | python3.11
# Install Python dependencies
# Remove py which is pulled in by retry, py is not needed and is a CVE
COPY ./requirements/default.txt /tmp/requirements.txt
@@ -21,22 +33,22 @@ RUN pip install --no-cache-dir --upgrade -r /tmp/requirements.txt && \
# https://nodejs.org/en/download/package-manager#debian-and-ubuntu-based-linux-distributions
# this is temporarily needed until playwright updates their packaged node version to
# 20.5.1+
RUN mkdir -p /etc/apt/keyrings && \
curl -fsSL https://deb.nodesource.com/gpgkey/nodesource-repo.gpg.key | gpg --dearmor -o /etc/apt/keyrings/nodesource.gpg && \
echo "deb [signed-by=/etc/apt/keyrings/nodesource.gpg] https://deb.nodesource.com/node_20.x nodistro main" | tee /etc/apt/sources.list.d/nodesource.list && \
apt-get update && \
apt-get install -y nodejs && \
cp /usr/bin/node /usr/local/lib/python3.11/site-packages/playwright/driver/node && \
apt-get remove -y nodejs
# RUN mkdir -p /etc/apt/keyrings && \
# curl -fsSL https://deb.nodesource.com/gpgkey/nodesource-repo.gpg.key | gpg --dearmor -o /etc/apt/keyrings/nodesource.gpg && \
# echo "deb [signed-by=/etc/apt/keyrings/nodesource.gpg] https://deb.nodesource.com/node_20.x nodistro main" | tee /etc/apt/sources.list.d/nodesource.list && \
# apt-get update && \
# apt-get install -y nodejs && \
# cp /usr/bin/node /usr/local/lib/python3.11/site-packages/playwright/driver/node && \
# apt-get remove -y nodejs
# Cleanup for CVEs and size reduction
# Remove tornado test key to placate vulnerability scanners
# More details can be found here:
# https://github.com/tornadoweb/tornado/issues/3107
RUN apt-get remove -y linux-libc-dev && \
apt-get autoremove -y && \
rm -rf /var/lib/apt/lists/* && \
rm /usr/local/lib/python3.11/site-packages/tornado/test/test.key
# RUN apt-get remove -y linux-libc-dev && \
# apt-get autoremove -y && \
# rm -rf /var/lib/apt/lists/* && \
# rm /usr/local/lib/python3.11/site-packages/tornado/test/test.key
# Set up application files
WORKDIR /app
@@ -55,6 +67,9 @@ COPY ./scripts/migrate_vespa_to_acl.py /app/migrate_vespa_to_acl.py
ENV PYTHONPATH /app
RUN ln -s /usr/bin/python3.11 /usr/bin/python & \
ln -s /usr/bin/pip3.11 /usr/bin/pip
# Default command which does nothing
# This container is used by api server and background which specify their own CMD
CMD ["tail", "-f", "/dev/null"]

View File

@@ -220,8 +220,8 @@ celery_app.conf.beat_schedule = {
"task": "check_for_document_sets_sync_task",
"schedule": timedelta(seconds=5),
},
"clean-old-temp-files": {
"task": "clean_old_temp_files_task",
"schedule": timedelta(minutes=30),
},
# "clean-old-temp-files": {
# "task": "clean_old_temp_files_task",
# "schedule": timedelta(minutes=30),
# },
}

View File

@@ -4,12 +4,13 @@ not follow the expected behavior, etc.
NOTE: cannot use Celery directly due to
https://github.com/celery/celery/issues/7007#issuecomment-1740139367"""
import multiprocessing
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from typing import Literal
from torch import multiprocessing as mp
from danswer.utils.logger import setup_logger
logger = setup_logger()
@@ -29,7 +30,7 @@ class SimpleJob:
"""Drop in replacement for `dask.distributed.Future`"""
id: int
process: multiprocessing.Process | None = None
process: mp.Process | None = None
def cancel(self) -> bool:
return self.release()
@@ -94,7 +95,7 @@ class SimpleJobClient:
job_id = self.job_id_counter
self.job_id_counter += 1
process = multiprocessing.Process(target=func, args=args)
process = mp.Process(target=func, args=args)
job = SimpleJob(id=job_id, process=process)
process.start()

View File

@@ -9,6 +9,7 @@ from dask.distributed import Client
from dask.distributed import Future
from distributed import LocalCluster
from sqlalchemy.orm import Session
from torch import multiprocessing as mp
from danswer.background.indexing.job_client import SimpleJob
from danswer.background.indexing.job_client import SimpleJobClient
@@ -299,22 +300,41 @@ def _run_indexing(
run_dt=run_dt,
)
net_doc_change = 0
document_count = 0
chunk_count = 0
try:
net_doc_change = 0
document_count = 0
chunk_count = 0
curr_time = time.time()
for doc_batch in doc_batch_generator:
logger.info(
f"Building the next doc batch took {time.time() - curr_time} seconds"
)
logger.debug(
f"Indexing batch of documents: {[doc.to_short_descriptor() for doc in doc_batch]}"
)
new_docs, total_batch_chunks = indexing_pipeline(
documents=doc_batch,
index_attempt_metadata=IndexAttemptMetadata(
connector_id=db_connector.id,
credential_id=db_credential.id,
),
)
success = False
for _ in range(4):
try:
new_docs, total_batch_chunks = indexing_pipeline(
documents=doc_batch,
index_attempt_metadata=IndexAttemptMetadata(
connector_id=db_connector.id,
credential_id=db_credential.id,
),
)
success = True
break
except Exception:
logger.exception("Failed to run indexing pipeline")
time.sleep(5)
if not success:
logger.error(
"Failed to run indexing pipeline 4 times, skipping batch"
)
continue
net_doc_change += new_docs
chunk_count += total_batch_chunks
document_count += len(doc_batch)
@@ -340,6 +360,8 @@ def _run_indexing(
# let the `except` block handle this
raise RuntimeError("Connector was disabled mid run")
curr_time = time.time()
mark_attempt_succeeded(attempt, db_session)
update_connector_credential_pair(
db_session=db_session,
@@ -519,6 +541,11 @@ def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> Non
if __name__ == "__main__":
# required to spin up new processes from within a process
# when using pytorch based models on GPU
if torch.cuda.is_available():
mp.set_start_method("spawn")
logger.info("Warming up Embedding Model(s)")
warm_up_models(indexer_only=True)
logger.info("Starting Indexing Loop")

View File

@@ -93,7 +93,7 @@ VESPA_DEPLOYMENT_ZIP = (
os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip"
)
# Number of documents in a batch during indexing (further batching done by chunks before passing to bi-encoder)
INDEX_BATCH_SIZE = 16
INDEX_BATCH_SIZE = 128
# Below are intended to match the env variables names used by the official postgres docker image
# https://hub.docker.com/_/postgres

View File

@@ -70,7 +70,7 @@ DOCUMENT_ID_ENDPOINT = (
f"{VESPA_APP_CONTAINER_URL}/document/v1/default/danswer_chunk/docid"
)
SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/"
_BATCH_SIZE = 100 # Specific to Vespa
_BATCH_SIZE = 128 # Specific to Vespa
_NUM_THREADS = (
16 # since Vespa doesn't allow batching of inserts / updates, we use threads
)
@@ -412,7 +412,7 @@ def _vespa_hit_to_inference_chunk(hit: dict[str, Any]) -> InferenceChunk:
return InferenceChunk(
chunk_id=fields[CHUNK_ID],
blurb=fields[BLURB],
blurb=fields.get(BLURB, ""),
content=fields[CONTENT],
source_links=source_links_dict,
section_continuation=fields[SECTION_CONTINUATION],

View File

@@ -156,6 +156,32 @@ def split_chunk_text_into_mini_chunks(
return sentence_aware_splitter.split_text(chunk_text)
def split_string(s: str, chunk_size: int = 1024) -> list[str]:
    """Split *s* into consecutive substrings of at most *chunk_size* characters.

    Returns an empty list for an empty string. The original annotation said
    ``-> str`` but the comprehension clearly builds a ``list[str]``.
    """
    return [s[i : i + chunk_size] for i in range(0, len(s), chunk_size)]
def chunk_document_FAST(
    document: Document,
    chunk_tok_size: int = CHUNK_SIZE,
    subsection_overlap: int = CHUNK_OVERLAP,
    blurb_size: int = BLURB_SIZE,
) -> list[DocAwareChunk]:
    """Fast chunker: slices each section into fixed 1024-character pieces.

    Unlike a token-aware chunker, this does plain character slicing and emits
    empty blurbs, trading chunk quality for speed.

    NOTE(review): chunk_tok_size, subsection_overlap and blurb_size are
    accepted but never used in this body — presumably kept for signature
    parity with the token-aware chunker; confirm before relying on them.
    NOTE(review): section_continuation is set True only for the FIRST piece
    of a section (ind == 0); if the flag is meant to mark continuation
    pieces this looks inverted — confirm against the DocAwareChunk consumer.
    """
    chunks: list[DocAwareChunk] = []
    for section in document.sections:
        # Hard-coded 1024-char slices; ignores the chunk_tok_size parameter.
        for ind, text in enumerate(split_string(section.text, 1024)):
            chunks.append(
                DocAwareChunk(
                    source_document=document,
                    chunk_id=len(chunks),  # sequential id across all sections
                    blurb="",  # blurbs intentionally skipped for speed
                    content=text,
                    source_links={0: section.link},
                    section_continuation=ind == 0,
                )
            )
    return chunks
class Chunker:
@abc.abstractmethod
def chunk(self, document: Document) -> list[DocAwareChunk]:
@@ -164,4 +190,4 @@ class Chunker:
class DefaultChunker(Chunker):
def chunk(self, document: Document) -> list[DocAwareChunk]:
return chunk_document(document)
return chunk_document_FAST(document)

View File

@@ -1,3 +1,4 @@
import time
from functools import partial
from itertools import chain
from typing import Protocol
@@ -21,6 +22,7 @@ from danswer.indexing.models import DocAwareChunk
from danswer.indexing.models import DocMetadataAwareIndexChunk
from danswer.search.models import Embedder
from danswer.utils.logger import setup_logger
from danswer.utils.timing import log_function_time
logger = setup_logger()
@@ -74,29 +76,36 @@ def _indexing_pipeline(
with Session(get_sqlalchemy_engine()) as db_session:
# acquires a lock on the documents so that no other process can modify them
prepare_to_modify_documents(db_session=db_session, document_ids=document_ids)
log_function_time()(prepare_to_modify_documents)(
db_session=db_session, document_ids=document_ids
)
# create records in the source of truth about these documents
_upsert_documents(
log_function_time()(_upsert_documents)(
documents=documents,
index_attempt_metadata=index_attempt_metadata,
db_session=db_session,
)
start_time = time.time()
chunks: list[DocAwareChunk] = list(
chain(*[chunker.chunk(document=document) for document in documents])
)
logger.info(f"Chunking took {time.time() - start_time} seconds")
logger.debug(
f"Indexing the following chunks: {[chunk.to_short_descriptor() for chunk in chunks]}"
)
chunks_with_embeddings = embedder.embed(chunks=chunks)
chunks_with_embeddings = log_function_time()(embedder.embed)(chunks=chunks)
# Attach the latest status from Postgres (source of truth for access) to each
# chunk. This access status will be attached to each chunk in the document index
# TODO: attach document sets to the chunk based on the status of Postgres as well
document_id_to_access_info = get_access_for_documents(
document_id_to_access_info = log_function_time()(get_access_for_documents)(
document_ids=document_ids, db_session=db_session
)
start_time = time.time()
document_id_to_document_set = {
document_id: document_sets
for document_id, document_sets in fetch_document_sets_for_documents(
@@ -113,11 +122,12 @@ def _indexing_pipeline(
)
for chunk in chunks_with_embeddings
]
logger.info(f"Getting access info took {time.time() - start_time} seconds")
# A document will not be spread across different batches, so all the
# documents with chunks in this set, are fully represented by the chunks
# in this set
insertion_records = document_index.index(
insertion_records = log_function_time()(document_index.index)(
chunks=access_aware_chunks,
)

View File

@@ -310,7 +310,10 @@ def search_chunks(
)
# Keyword Search should never do reranking, no transformers involved in this flow
if query.search_type == SearchType.KEYWORD:
if (
query.search_type == SearchType.KEYWORD
or query.search_type == SearchType.HYBRID
):
_log_top_chunk_links(query.search_type.value, top_chunks)
return top_chunks, None

View File

@@ -0,0 +1,93 @@
import random
import time
import requests
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.search.models import SearchType
# Pool of generic trivia questions used as random queries when benchmarking
# hybrid-search latency (see _measure_hybrid_search_latency below).
question_bank = [
    "Who was the first president of the United States?",
    "What is photosynthesis?",
    "How long is the Great Wall of China?",
    "When was the Eiffel Tower constructed?",
    "Who wrote 'Pride and Prejudice'?",
    "What's the difference between mitosis and meiosis?",
    "What is the capital of Brazil?",
    "Who discovered penicillin?",
    "What causes the Aurora Borealis?",
    "When did the Titanic sink?",
    "How does a combustion engine work?",
    "Who is the author of 'The Odyssey'?",
    "What is quantum physics?",
    "When was the Mona Lisa painted?",
    "What's the difference between a meteor and a meteorite?",
    "Who founded the city of Rome?",
    "What is the boiling point of water at sea level?",
    "Who won the Nobel Prize in Literature in 1953?",
    "How do honeybees produce honey?",
    "What is the deepest part of the ocean?",
    "When did the first humans arrive in the Americas?",
    "What is the Fibonacci sequence?",
    "How was the Grand Canyon formed?",
    "Who composed the Moonlight Sonata?",
    "What are the primary colors of light?",
    "When did the Roman Empire fall?",
    "How does photosynthesis contribute to the carbon cycle?",
    "Who was the first woman in space?",
    "What is the Pythagorean theorem?",
    "Which planet is known as the 'Red Planet'?",
    "Who is the father of modern physics?",
    "What is the primary purpose of the United Nations?",
    "How old is the Earth?",
    "Who wrote 'Don Quixote'?",
    "What is the structure of DNA?",
    "When was the Declaration of Independence signed?",
    "What causes a solar eclipse?",
    "Who was the longest-reigning British monarch?",
    "How do tornadoes form?",
    "Who developed the theory of relativity?",
    "What's the tallest mountain on Earth when measured from base to peak?",
    "How many bones are there in the adult human body?",
    "When was the Internet invented?",
    "Who was the ancient Egyptian queen known for her relationship with Roman leaders?",
    "What is the Krebs cycle?",
    "Which country has the largest land area?",
    "Who painted the Starry Night?",
    "What's the difference between an alligator and a crocodile?",
    "Who discovered the circulation of blood?",
    "How many planets are there in our solar system?",
]
def _measure_hybrid_search_latency(filters: dict | None = None) -> float:
    """Issue one hybrid-search request with a random question and return
    the wall-clock latency in seconds.

    Args:
        filters: optional filter dict forwarded to the search endpoint;
            defaults to no filters. (The original used a mutable default
            argument ``filters: dict = {}`` — a classic Python pitfall.)

    Raises:
        Exception: if the HTTP response is not OK.
    """
    start = time.monotonic()
    response = requests.post(
        "http://localhost:8080/document-search",
        json={
            "query": random.choice(question_bank),
            "collection": DOCUMENT_INDEX_NAME,
            # Preserve the original payload: an empty dict when no filters given.
            "filters": filters if filters is not None else {},
            "enable_auto_detect_filters": False,
            "search_type": SearchType.HYBRID.value,
        },
    )
    if not response.ok:
        raise Exception(f"Failed to search: {response.text}")
    return time.monotonic() - start
def _run_benchmark(num_runs: int, filters: dict | None = None) -> None:
    """Run the latency probe `num_runs` times, printing each latency and the mean.

    Fix over the original: the second ("with filters") pass previously kept
    appending to the same list as the first pass, so its printed average was
    contaminated by the unfiltered runs. Each pass now gets a fresh list.
    """
    latencies: list[float] = []
    for _ in range(num_runs):
        if filters is None:
            latencies.append(_measure_hybrid_search_latency())
        else:
            latencies.append(_measure_hybrid_search_latency(filters=filters))
        print("Latency", latencies[-1])
    print(f"Average latency: {sum(latencies) / len(latencies)}")


if __name__ == "__main__":
    _run_benchmark(50)
    print("Testing with filters")
    _run_benchmark(50, filters={"source_type": ["file"]})

View File

@@ -18,7 +18,7 @@ server {
listen 80;
server_name ${DOMAIN};
client_max_body_size 500M; # Maximum upload size
client_max_body_size 0; # Maximum upload size
location ~ ^/api(.*)$ {
rewrite ^/api(/.*)$ $1 break;

View File

@@ -99,6 +99,15 @@ services:
- model_cache_torch:/root/.cache/torch/
- model_cache_nltk:/root/nltk_data/
- model_cache_huggingface:/root/.cache/huggingface/
deploy:
resources:
reservations:
devices:
- driver: nvidia
capabilities:
- gpu
- utility # nvidia-smi
- compute # CUDA
web_server:
image: danswer/danswer-web-server:latest
build: