Compare commits

18 Commits

Author SHA1 Message Date
Richard Kuo (Danswer)
a13dea160a use redis completion signal to double check exit code 2024-12-12 10:56:11 -08:00
pablonyx
ca172f3306 Merge pull request #3442 from onyx-dot-app/vespa_seeding_fix
Update initial seeding for latency requirements
2024-12-12 09:59:50 -08:00
pablodanswer
e5d0587efa pre-commit 2024-12-12 09:12:08 -08:00
pablonyx
a9516202fe update conditional (#3446) 2024-12-12 17:07:30 +00:00
Richard Kuo
d23fca96c4 reverse commit (fix later) 2024-12-11 22:19:10 -08:00
pablodanswer
a45724c899 run black 2024-12-11 19:18:06 -08:00
pablodanswer
34e250407a k 2024-12-11 19:14:10 -08:00
pablodanswer
046c0fbe3e update indexing 2024-12-11 19:08:05 -08:00
pablonyx
76595facef Merge pull request #3432 from onyx-dot-app/vercel_preview
Enable Vercel Preview
2024-12-11 18:55:14 -08:00
pablodanswer
af2d548766 k 2024-12-11 18:52:47 -08:00
pablonyx
0770a587f1 remove slack workspace (#3394)
* remove slack workspace

* update client tokens

* fix up

* clean up docs

* fix up tests
2024-12-12 01:01:43 +00:00
hagen-danswer
748b79b0ef Added text for empty table and cascade delete for slack bot deletion (#3390)
* fixed fk issue for slack bot deletion

* Added text for empty table and cascade delete for slack bot deletion
2024-12-12 01:00:32 +00:00
pablonyx
9cacb373ef let users specify resourcing caps (#3403)
* let users specify resourcing caps

* functional resource limits

* improve defaults

* k

* update

* update comment + refer to proper resource

* self nit

* update var names
2024-12-12 00:59:41 +00:00
pablodanswer
f5d638161b k 2024-12-11 15:35:44 -08:00
pablodanswer
0b5013b47d k 2024-12-11 15:34:26 -08:00
pablodanswer
1b846fbf06 update config 2024-12-11 15:17:11 -08:00
pablodanswer
d7f8cf8f18 testing 2024-12-11 13:36:10 -08:00
pablodanswer
5d810d373e k 2024-12-11 13:32:09 -08:00
18 changed files with 176 additions and 216 deletions

View File

@@ -640,18 +640,41 @@ def connector_indexing_proxy_task(
                 continue

             if job.status == "error":
+                ignore_exitcode = False
+
                 exit_code: int | None = None
                 if job.process:
                     exit_code = job.process.exitcode
-                task_logger.error(
-                    "Indexing watchdog - spawned task exceptioned: "
-                    f"attempt={index_attempt_id} "
-                    f"tenant={tenant_id} "
-                    f"cc_pair={cc_pair_id} "
-                    f"search_settings={search_settings_id} "
-                    f"exit_code={exit_code} "
-                    f"error={job.exception()}"
-                )
+
+                # seeing non-deterministic behavior where spawned tasks occasionally return exit code 1
+                # even though logging clearly indicates that they completed successfully
+                # to work around this, we ignore the job error state if the completion signal is OK
+                status_int = redis_connector_index.get_completion()
+                if status_int:
+                    status_enum = HTTPStatus(status_int)
+                    if status_enum == HTTPStatus.OK:
+                        ignore_exitcode = True
+
+                if ignore_exitcode:
+                    task_logger.warning(
+                        "Indexing watchdog - spawned task has non-zero exit code "
+                        "but completion signal is OK. Continuing...: "
+                        f"attempt={index_attempt_id} "
+                        f"tenant={tenant_id} "
+                        f"cc_pair={cc_pair_id} "
+                        f"search_settings={search_settings_id} "
+                        f"exit_code={exit_code}"
+                    )
+                else:
+                    task_logger.error(
+                        "Indexing watchdog - spawned task exceptioned: "
+                        f"attempt={index_attempt_id} "
+                        f"tenant={tenant_id} "
+                        f"cc_pair={cc_pair_id} "
+                        f"search_settings={search_settings_id} "
+                        f"exit_code={exit_code} "
+                        f"error={job.exception()}"
+                    )

             job.release()
             break
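
The fix above treats a completion signal written to Redis as the source of truth, consulting it before trusting the process exit code. Below is a minimal sketch of that pattern in plain redis-py; the key name, expiry, and client wiring are illustrative assumptions, not Danswer's actual redis_connector_index internals.

from http import HTTPStatus

import redis

r = redis.Redis(host="localhost", port=6379, db=0)

COMPLETION_KEY = "connectorindexing:completion:42"  # hypothetical key name


def set_completion(status: int = HTTPStatus.OK) -> None:
    # spawned-task side: record the true outcome before the process exits
    r.set(COMPLETION_KEY, int(status), ex=3600)


def get_completion() -> int | None:
    raw = r.get(COMPLETION_KEY)
    return int(raw) if raw is not None else None


# watchdog side: an OK signal in Redis overrides a flaky non-zero exit code
exit_code = 1  # the spurious failure mode described in the comment above
status_int = get_completion()
ignore_exitcode = status_int is not None and HTTPStatus(status_int) == HTTPStatus.OK

Storing an HTTP-style status rather than a bare flag leaves room to distinguish failure modes later, which matches the HTTPStatus(status_int) decoding in the diff.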

View File

@@ -41,7 +41,6 @@ from danswer.connectors.salesforce.connector import SalesforceConnector
 from danswer.connectors.sharepoint.connector import SharepointConnector
 from danswer.connectors.slab.connector import SlabConnector
 from danswer.connectors.slack.connector import SlackPollConnector
-from danswer.connectors.slack.load_connector import SlackLoadConnector
 from danswer.connectors.teams.connector import TeamsConnector
 from danswer.connectors.web.connector import WebConnector
 from danswer.connectors.wikipedia.connector import WikipediaConnector
@@ -64,7 +63,6 @@ def identify_connector_class(
         DocumentSource.WEB: WebConnector,
         DocumentSource.FILE: LocalFileConnector,
         DocumentSource.SLACK: {
-            InputType.LOAD_STATE: SlackLoadConnector,
             InputType.POLL: SlackPollConnector,
             InputType.SLIM_RETRIEVAL: SlackPollConnector,
         },

View File

@@ -134,7 +134,6 @@ def get_latest_message_time(thread: ThreadType) -> datetime:
 def thread_to_doc(
-    workspace: str,
     channel: ChannelType,
     thread: ThreadType,
     slack_cleaner: SlackTextCleaner,
@@ -179,9 +178,7 @@ def thread_to_doc(
         id=f"{channel_id}__{thread[0]['ts']}",
         sections=[
             Section(
-                link=get_message_link(
-                    event=m, workspace=workspace, channel_id=channel_id
-                ),
+                link=get_message_link(event=m, client=client, channel_id=channel_id),
                 text=slack_cleaner.index_clean(cast(str, m["text"])),
             )
             for m in thread
@@ -265,7 +262,6 @@ def filter_channels(
 def _get_all_docs(
     client: WebClient,
-    workspace: str,
     channels: list[str] | None = None,
     channel_name_regex_enabled: bool = False,
     oldest: str | None = None,
@@ -312,7 +308,6 @@ def _get_all_docs(
             if filtered_thread:
                 channel_docs += 1
                 yield thread_to_doc(
-                    workspace=workspace,
                     channel=channel,
                     thread=filtered_thread,
                     slack_cleaner=slack_cleaner,
@@ -375,14 +370,12 @@ def _get_all_doc_ids(
 class SlackPollConnector(PollConnector, SlimConnector):
     def __init__(
         self,
-        workspace: str,
         channels: list[str] | None = None,
         # if specified, will treat the specified channel strings as
        # regexes, and will only index channels that fully match the regexes
         channel_regex_enabled: bool = False,
         batch_size: int = INDEX_BATCH_SIZE,
     ) -> None:
-        self.workspace = workspace
         self.channels = channels
         self.channel_regex_enabled = channel_regex_enabled
         self.batch_size = batch_size
@@ -416,7 +409,6 @@ class SlackPollConnector(PollConnector, SlimConnector):
         documents: list[Document] = []
         for document in _get_all_docs(
             client=self.client,
-            workspace=self.workspace,
             channels=self.channels,
             channel_name_regex_enabled=self.channel_regex_enabled,
             # NOTE: need to impute to `None` instead of using 0.0, since Slack will
@@ -440,7 +432,6 @@ if __name__ == "__main__":
     slack_channel = os.environ.get("SLACK_CHANNEL")
     connector = SlackPollConnector(
-        workspace=os.environ["SLACK_WORKSPACE"],
         channels=[slack_channel] if slack_channel else None,
     )
     connector.load_credentials({"slack_bot_token": os.environ["SLACK_BOT_TOKEN"]})

View File

@@ -1,140 +0,0 @@
-import json
-import os
-from datetime import datetime
-from datetime import timezone
-from pathlib import Path
-from typing import Any
-from typing import cast
-
-from danswer.configs.app_configs import INDEX_BATCH_SIZE
-from danswer.configs.constants import DocumentSource
-from danswer.connectors.interfaces import GenerateDocumentsOutput
-from danswer.connectors.interfaces import LoadConnector
-from danswer.connectors.models import Document
-from danswer.connectors.models import Section
-from danswer.connectors.slack.connector import filter_channels
-from danswer.connectors.slack.utils import get_message_link
-from danswer.utils.logger import setup_logger
-
-logger = setup_logger()
-
-
-def get_event_time(event: dict[str, Any]) -> datetime | None:
-    ts = event.get("ts")
-    if not ts:
-        return None
-    return datetime.fromtimestamp(float(ts), tz=timezone.utc)
-
-
-class SlackLoadConnector(LoadConnector):
-    # WARNING: DEPRECATED, DO NOT USE
-    def __init__(
-        self,
-        workspace: str,
-        export_path_str: str,
-        channels: list[str] | None = None,
-        # if specified, will treat the specified channel strings as
-        # regexes, and will only index channels that fully match the regexes
-        channel_regex_enabled: bool = False,
-        batch_size: int = INDEX_BATCH_SIZE,
-    ) -> None:
-        self.workspace = workspace
-        self.channels = channels
-        self.channel_regex_enabled = channel_regex_enabled
-        self.export_path_str = export_path_str
-        self.batch_size = batch_size
-
-    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
-        if credentials:
-            logger.warning("Unexpected credentials provided for Slack Load Connector")
-        return None
-
-    @staticmethod
-    def _process_batch_event(
-        slack_event: dict[str, Any],
-        channel: dict[str, Any],
-        matching_doc: Document | None,
-        workspace: str,
-    ) -> Document | None:
-        if (
-            slack_event["type"] == "message"
-            and slack_event.get("subtype") != "channel_join"
-        ):
-            if matching_doc:
-                return Document(
-                    id=matching_doc.id,
-                    sections=matching_doc.sections
-                    + [
-                        Section(
-                            link=get_message_link(
-                                event=slack_event,
-                                workspace=workspace,
-                                channel_id=channel["id"],
-                            ),
-                            text=slack_event["text"],
-                        )
-                    ],
-                    source=matching_doc.source,
-                    semantic_identifier=matching_doc.semantic_identifier,
-                    title="", # slack docs don't really have a "title"
-                    doc_updated_at=get_event_time(slack_event),
-                    metadata=matching_doc.metadata,
-                )
-
-            return Document(
-                id=slack_event["ts"],
-                sections=[
-                    Section(
-                        link=get_message_link(
-                            event=slack_event,
-                            workspace=workspace,
-                            channel_id=channel["id"],
-                        ),
-                        text=slack_event["text"],
-                    )
-                ],
-                source=DocumentSource.SLACK,
-                semantic_identifier=channel["name"],
-                title="", # slack docs don't really have a "title"
-                doc_updated_at=get_event_time(slack_event),
-                metadata={},
-            )
-
-        return None
-
-    def load_from_state(self) -> GenerateDocumentsOutput:
-        export_path = Path(self.export_path_str)
-
-        with open(export_path / "channels.json") as f:
-            all_channels = json.load(f)
-
-        filtered_channels = filter_channels(
-            all_channels, self.channels, self.channel_regex_enabled
-        )
-
-        document_batch: dict[str, Document] = {}
-        for channel_info in filtered_channels:
-            channel_dir_path = export_path / cast(str, channel_info["name"])
-            channel_file_paths = [
-                channel_dir_path / file_name
-                for file_name in os.listdir(channel_dir_path)
-            ]
-            for path in channel_file_paths:
-                with open(path) as f:
-                    events = cast(list[dict[str, Any]], json.load(f))
-                for slack_event in events:
-                    doc = self._process_batch_event(
-                        slack_event=slack_event,
-                        channel=channel_info,
-                        matching_doc=document_batch.get(
-                            slack_event.get("thread_ts", "")
-                        ),
-                        workspace=self.workspace,
-                    )
-                    if doc:
-                        document_batch[doc.id] = doc
-                        if len(document_batch) >= self.batch_size:
-                            yield list(document_batch.values())
-
-        yield list(document_batch.values())

View File

@@ -2,6 +2,7 @@ import re
 import time
 from collections.abc import Callable
 from collections.abc import Generator
+from functools import lru_cache
 from functools import wraps
 from typing import Any
 from typing import cast
@@ -21,19 +22,21 @@ basic_retry_wrapper = retry_builder()
 _SLACK_LIMIT = 900


+@lru_cache()
+def get_base_url(token: str) -> str:
+    """Retrieve and cache the base URL of the Slack workspace based on the client token."""
+    client = WebClient(token=token)
+    return client.auth_test()["url"]
+
+
 def get_message_link(
-    event: dict[str, Any], workspace: str, channel_id: str | None = None
+    event: dict[str, Any], client: WebClient, channel_id: str | None = None
 ) -> str:
-    channel_id = channel_id or cast(
-        str, event["channel"]
-    ) # channel must either be present in the event or passed in
-    message_ts = cast(str, event["ts"])
-    message_ts_without_dot = message_ts.replace(".", "")
-    thread_ts = cast(str | None, event.get("thread_ts"))
-    return (
-        f"https://{workspace}.slack.com/archives/{channel_id}/p{message_ts_without_dot}"
-        + (f"?thread_ts={thread_ts}" if thread_ts else "")
-    )
+    channel_id = channel_id or event["channel"]
+    message_ts = event["ts"]
+    response = client.chat_getPermalink(channel=channel_id, message_ts=message_ts)
+    permalink = response["permalink"]
+    return permalink


 def _make_slack_api_call_logged(
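
The rewritten helper delegates link construction to Slack rather than templating a URL from the workspace name. A short usage sketch, assuming a slack_sdk WebClient and a placeholder event payload; chat_getPermalink is the real Slack Web API method the new code calls.

import os

from slack_sdk import WebClient

client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
event = {"channel": "C0123456789", "ts": "1712345678.000200"}  # hypothetical message

# equivalent to the new get_message_link: ask Slack for the canonical permalink
# instead of hand-building https://{workspace}.slack.com/archives/...
response = client.chat_getPermalink(channel=event["channel"], message_ts=event["ts"])
print(response["permalink"])

Because the API returns the canonical link, the connector no longer needs to know the workspace subdomain at all, which is why the workspace parameter disappears from the connector, the tests, and the admin config schema in the diffs below.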

View File

@@ -1521,6 +1521,7 @@ class SlackBot(Base):
     slack_channel_configs: Mapped[list[SlackChannelConfig]] = relationship(
         "SlackChannelConfig",
         back_populates="slack_bot",
+        cascade="all, delete-orphan",
     )
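
The single added cascade line is what lets a SlackBot row be deleted without foreign-key errors from its child SlackChannelConfig rows. A self-contained SQLAlchemy sketch of the behavior, using stand-in tables rather than Danswer's real schema:

from sqlalchemy import ForeignKey, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, relationship


class Base(DeclarativeBase):
    pass


class SlackBot(Base):
    __tablename__ = "slack_bot"
    id: Mapped[int] = mapped_column(primary_key=True)
    slack_channel_configs: Mapped[list["SlackChannelConfig"]] = relationship(
        back_populates="slack_bot",
        cascade="all, delete-orphan",  # children are deleted along with the bot
    )


class SlackChannelConfig(Base):
    __tablename__ = "slack_channel_config"
    id: Mapped[int] = mapped_column(primary_key=True)
    slack_bot_id: Mapped[int] = mapped_column(ForeignKey("slack_bot.id"))
    slack_bot: Mapped[SlackBot] = relationship(back_populates="slack_channel_configs")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    bot = SlackBot(slack_channel_configs=[SlackChannelConfig()])
    session.add(bot)
    session.commit()
    session.delete(bot)  # without the cascade, the orphaned config would block this
    session.commit()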

View File

@@ -148,6 +148,7 @@ class Indexable(abc.ABC):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
+        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """
         Takes a list of document chunks and indexes them in the document index
@@ -165,9 +166,14 @@ def index(
         only needs to index chunks into the PRIMARY index. Do not update the secondary index here,
         it is done automatically outside of this code.

+        NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously
+        indexed for the given index/tenant. This can be used to optimize the indexing process for
+        new or empty indices.
+
         Parameters:
         - chunks: Document chunks with all of the information needed for indexing to the document
         index.
+        - fresh_index: Boolean indicating whether this is a fresh index with no existing documents.

         Returns:
         List of document ids which map to unique documents and are used for deduping chunks
View File

@@ -306,6 +306,7 @@ class VespaIndex(DocumentIndex):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
+        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """Receive a list of chunks from a batch of documents and index the chunks into Vespa along
         with updating the associated permissions. Assumes that a document will not be split into
@@ -322,26 +323,29 @@
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
-            # Check for existing documents, existing documents need to have all of their chunks deleted
-            # prior to indexing as the document size (num chunks) may have shrunk
-            first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
-            for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
-                existing_docs.update(
-                    get_existing_documents_from_chunks(
-                        chunks=chunk_batch,
-                        index_name=self.index_name,
-                        http_client=http_client,
-                        executor=executor,
-                    )
-                )
-            for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
-                delete_vespa_docs(
-                    document_ids=doc_id_batch,
-                    index_name=self.index_name,
-                    http_client=http_client,
-                    executor=executor,
-                )
+            if not fresh_index:
+                # Check for existing documents, existing documents need to have all of their chunks deleted
+                # prior to indexing as the document size (num chunks) may have shrunk
+                first_chunks = [
+                    chunk for chunk in cleaned_chunks if chunk.chunk_id == 0
+                ]
+                for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
+                    existing_docs.update(
+                        get_existing_documents_from_chunks(
+                            chunks=chunk_batch,
+                            index_name=self.index_name,
+                            http_client=http_client,
+                            executor=executor,
+                        )
+                    )
+                for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
+                    delete_vespa_docs(
+                        document_ids=doc_id_batch,
+                        index_name=self.index_name,
+                        http_client=http_client,
+                        executor=executor,
+                    )

             for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
                 batch_index_vespa_chunks(

View File

@@ -216,7 +216,7 @@ def seed_initial_documents(
     # as we just sent over the Vespa schema and there is a slight delay
     index_with_retries = retry_builder()(document_index.index)
-    index_with_retries(chunks=chunks)
+    index_with_retries(chunks=chunks, fresh_index=True)

     # Mock a run for the UI even though it did not actually call out to anything
     mock_successful_index_attempt(
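
The retry wrapper exists because Vespa may briefly reject requests right after the schema is pushed, as the comment notes. Below is a hedged sketch of the decorator-factory pattern, built on the tenacity library purely as an assumption; Danswer's actual retry_builder may use different machinery and defaults.

from tenacity import retry, stop_after_attempt, wait_exponential


def retry_builder(tries: int = 5):
    # returns a decorator, so retry_builder()(fn) yields a retrying fn
    return retry(
        stop=stop_after_attempt(tries),
        wait=wait_exponential(multiplier=1, min=1, max=30),
        reraise=True,
    )


def flaky_index(chunks: list[str], fresh_index: bool = False) -> int:
    # stand-in for document_index.index during seeding
    return len(chunks)


index_with_retries = retry_builder()(flaky_index)
index_with_retries(chunks=["chunk-1", "chunk-2"], fresh_index=True)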

View File

@@ -39,6 +39,7 @@ from danswer.key_value_store.interface import KvKeyNotFoundError
 from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
 from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
 from danswer.natural_language_processing.search_nlp_models import warm_up_cross_encoder
+from danswer.seeding.load_docs import seed_initial_documents
 from danswer.seeding.load_yamls import load_chat_yamls
 from danswer.server.manage.llm.models import LLMProviderUpsertRequest
 from danswer.server.settings.store import load_settings
@@ -150,7 +151,7 @@ def setup_danswer(
     # update multipass indexing setting based on GPU availability
     update_default_multipass_indexing(db_session)

-    # seed_initial_documents(db_session, tenant_id, cohere_enabled)
+    seed_initial_documents(db_session, tenant_id, cohere_enabled)


 def translate_saved_search_settings(db_session: Session) -> None:

View File

@@ -67,7 +67,6 @@ def test_slack_permission_sync(
         input_type=InputType.POLL,
         source=DocumentSource.SLACK,
         connector_specific_config={
-            "workspace": "onyx-test-workspace",
             "channels": [public_channel["name"], private_channel["name"]],
         },
         access_type=AccessType.SYNC,
@@ -281,7 +280,6 @@ def test_slack_group_permission_sync(
         input_type=InputType.POLL,
         source=DocumentSource.SLACK,
         connector_specific_config={
-            "workspace": "onyx-test-workspace",
             "channels": [private_channel["name"]],
         },
         access_type=AccessType.SYNC,

View File

@@ -61,7 +61,6 @@ def test_slack_prune(
         input_type=InputType.POLL,
         source=DocumentSource.SLACK,
         connector_specific_config={
-            "workspace": "onyx-test-workspace",
             "channels": [public_channel["name"], private_channel["name"]],
         },
         access_type=AccessType.PUBLIC,

View File

@@ -0,0 +1,74 @@
+# Docker service resource limits. Most are commented out by default.
+# 'background' service has preset (override-able) limits due to variable resource needs.
+# Uncomment and set env vars for specific service limits.
+# See: https://docs.danswer.dev/deployment/resource-sizing for details.
+
+services:
+  background:
+    deploy:
+      resources:
+        limits:
+          cpus: ${BACKGROUND_CPU_LIMIT:-4}
+          memory: ${BACKGROUND_MEM_LIMIT:-4g}
+        # reservations:
+        #   cpus: ${BACKGROUND_CPU_RESERVATION}
+        #   memory: ${BACKGROUND_MEM_RESERVATION}
+
+  # nginx:
+  #   deploy:
+  #     resources:
+  #       limits:
+  #         cpus: ${NGINX_CPU_LIMIT}
+  #         memory: ${NGINX_MEM_LIMIT}
+  #       reservations:
+  #         cpus: ${NGINX_CPU_RESERVATION}
+  #         memory: ${NGINX_MEM_RESERVATION}
+
+  # api_server:
+  #   deploy:
+  #     resources:
+  #       limits:
+  #         cpus: ${API_SERVER_CPU_LIMIT}
+  #         memory: ${API_SERVER_MEM_LIMIT}
+  #       reservations:
+  #         cpus: ${API_SERVER_CPU_RESERVATION}
+  #         memory: ${API_SERVER_MEM_RESERVATION}
+
+  # index:
+  #   deploy:
+  #     resources:
+  #       limits:
+  #         cpus: ${VESPA_CPU_LIMIT}
+  #         memory: ${VESPA_MEM_LIMIT}
+  #       reservations:
+  #         cpus: ${VESPA_CPU_RESERVATION}
+  #         memory: ${VESPA_MEM_RESERVATION}
+
+  # inference_model_server:
+  #   deploy:
+  #     resources:
+  #       limits:
+  #         cpus: ${INFERENCE_CPU_LIMIT}
+  #         memory: ${INFERENCE_MEM_LIMIT}
+  #       reservations:
+  #         cpus: ${INFERENCE_CPU_RESERVATION}
+  #         memory: ${INFERENCE_MEM_RESERVATION}
+
+  # indexing_model_server:
+  #   deploy:
+  #     resources:
+  #       limits:
+  #         cpus: ${INDEXING_CPU_LIMIT}
+  #         memory: ${INDEXING_MEM_LIMIT}
+  #       reservations:
+  #         cpus: ${INDEXING_CPU_RESERVATION}
+  #         memory: ${INDEXING_MEM_RESERVATION}
+
+  # relational_db:
+  #   deploy:
+  #     resources:
+  #       limits:
+  #         cpus: ${POSTGRES_CPU_LIMIT}
+  #         memory: ${POSTGRES_MEM_LIMIT}
+  #       reservations:
+  #         cpus: ${POSTGRES_CPU_RESERVATION}
+  #         memory: ${POSTGRES_MEM_RESERVATION}
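
As shipped, only the background service is capped (4 CPUs and 4g of memory unless BACKGROUND_CPU_LIMIT and BACKGROUND_MEM_LIMIT are exported); every other service runs uncapped until its block is uncommented and the matching environment variables are set.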

View File

@@ -96,6 +96,16 @@ export function SlackBotTable({ slackBots }: { slackBots: SlackBot[] }) {
               </ClickableTableRow>
             );
           })}
+          {slackBots.length === 0 && (
+            <TableRow>
+              <TableCell
+                colSpan={4}
+                className="text-center text-muted-foreground"
+              >
+                Please add a New Slack Bot to begin chatting with Danswer!
+              </TableCell>
+            </TableRow>
+          )}
         </TableBody>
       </Table>
       {slackBots.length > NUM_IN_PAGE && (

View File

@@ -61,7 +61,12 @@ export async function OPTIONS(
 }

 async function handleRequest(request: NextRequest, path: string[]) {
-  if (process.env.NODE_ENV !== "development") {
+  if (
+    process.env.NODE_ENV !== "development" &&
+    // NOTE: Set this environment variable to 'true' for preview environments
+    // Where you want finer-grained control over API access
+    process.env.OVERRIDE_API_PRODUCTION !== "true"
+  ) {
     return NextResponse.json(
       {
         message:

View File

@@ -145,7 +145,6 @@ export function TextFormField({
   min,
   onChange,
   width,
-  vertical,
 }: {
   value?: string;
   name: string;
@@ -171,7 +170,6 @@ export function TextFormField({
   min?: number;
   onChange?: (e: React.ChangeEvent<HTMLInputElement>) => void;
   width?: string;
-  vertical?: boolean;
 }) {
   let heightString = defaultHeight || "";
   if (isTextArea && !heightString) {
@@ -211,16 +209,14 @@ export function TextFormField({
   return (
     <div className={`w-full ${width}`}>
-      <div className={`flex ${vertical ? "flex-col" : "flex-row"} items-start`}>
-        <div className="flex gap-x-2 items-center">
-          {!removeLabel && (
-            <Label className={sizeClass.label} small={small}>
-              {label}
-            </Label>
-          )}
-          {optional ? <span>(optional) </span> : ""}
-          {tooltip && <ToolTipDetails>{tooltip}</ToolTipDetails>}
-        </div>
+      <div className="flex flex-col gap-x-2 items-start">
+        {!removeLabel && (
+          <Label className={sizeClass.label} small={small}>
+            {label}
+          </Label>
+        )}
+        {optional ? <span>(optional) </span> : ""}
+        {tooltip && <ToolTipDetails>{tooltip}</ToolTipDetails>}
       {error ? (
         <ManualErrorMessage>{error}</ManualErrorMessage>
       ) : (

View File

@@ -44,7 +44,6 @@ const EditPropertyModal = ({
         </h2>

         <TextFormField
-          vertical
           label={propertyDetails || ""}
           name="propertyValue"
           placeholder="Property value"

View File

@@ -546,15 +546,7 @@ Hint: Use the singular form of the object name (e.g., 'Opportunity' instead of '
   },
   slack: {
     description: "Configure Slack connector",
-    values: [
-      {
-        type: "text",
-        query: "Enter the Slack workspace:",
-        label: "Workspace",
-        name: "workspace",
-        optional: false,
-      },
-    ],
+    values: [],
     advanced_values: [
       {
         type: "list",