Compare commits


1 Commit

Author         SHA1         Message   Date
pablodanswer   bf6efbe8e8   nit       2024-12-11 18:46:24 -08:00
18 changed files with 216 additions and 176 deletions

View File

@@ -640,41 +640,18 @@ def connector_indexing_proxy_task(
             continue
         if job.status == "error":
-            ignore_exitcode = False
             exit_code: int | None = None
             if job.process:
                 exit_code = job.process.exitcode
-            # seeing non-deterministic behavior where spawned tasks occasionally return exit code 1
-            # even though logging clearly indicates that they completed successfully
-            # to work around this, we ignore the job error state if the completion signal is OK
-            status_int = redis_connector_index.get_completion()
-            if status_int:
-                status_enum = HTTPStatus(status_int)
-                if status_enum == HTTPStatus.OK:
-                    ignore_exitcode = True
-            if ignore_exitcode:
-                task_logger.warning(
-                    "Indexing watchdog - spawned task has non-zero exit code "
-                    "but completion signal is OK. Continuing...: "
-                    f"attempt={index_attempt_id} "
-                    f"tenant={tenant_id} "
-                    f"cc_pair={cc_pair_id} "
-                    f"search_settings={search_settings_id} "
-                    f"exit_code={exit_code}"
-                )
-            else:
-                task_logger.error(
-                    "Indexing watchdog - spawned task exceptioned: "
-                    f"attempt={index_attempt_id} "
-                    f"tenant={tenant_id} "
-                    f"cc_pair={cc_pair_id} "
-                    f"search_settings={search_settings_id} "
-                    f"exit_code={exit_code} "
-                    f"error={job.exception()}"
-                )
+            task_logger.error(
+                "Indexing watchdog - spawned task exceptioned: "
+                f"attempt={index_attempt_id} "
+                f"tenant={tenant_id} "
+                f"cc_pair={cc_pair_id} "
+                f"search_settings={search_settings_id} "
+                f"exit_code={exit_code} "
+                f"error={job.exception()}"
+            )
         job.release()
         break
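Note: the deleted comment describes a workaround for spawned tasks that intermittently returned exit code 1 despite completing successfully. A minimal sketch of the check those removed lines performed (`should_ignore_exit_code` is a hypothetical helper name; `redis_connector_index` is the watchdog's Redis helper from the surrounding code):

```python
from http import HTTPStatus


def should_ignore_exit_code(redis_connector_index) -> bool:
    """Sketch of the removed workaround: trust the Redis completion signal
    over the spawned process's exit code when the signal reports success."""
    status_int = redis_connector_index.get_completion()  # int | None
    return bool(status_int) and HTTPStatus(status_int) == HTTPStatus.OK
```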

View File

@@ -41,6 +41,7 @@ from danswer.connectors.salesforce.connector import SalesforceConnector
 from danswer.connectors.sharepoint.connector import SharepointConnector
 from danswer.connectors.slab.connector import SlabConnector
 from danswer.connectors.slack.connector import SlackPollConnector
+from danswer.connectors.slack.load_connector import SlackLoadConnector
 from danswer.connectors.teams.connector import TeamsConnector
 from danswer.connectors.web.connector import WebConnector
 from danswer.connectors.wikipedia.connector import WikipediaConnector
@@ -63,6 +64,7 @@ def identify_connector_class(
         DocumentSource.WEB: WebConnector,
         DocumentSource.FILE: LocalFileConnector,
         DocumentSource.SLACK: {
+            InputType.LOAD_STATE: SlackLoadConnector,
            InputType.POLL: SlackPollConnector,
            InputType.SLIM_RETRIEVAL: SlackPollConnector,
        },
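This hunk re-registers the restored SlackLoadConnector for LOAD_STATE inputs. A minimal usage sketch, assuming `identify_connector_class` resolves the nested per-InputType dict shown above (the import module paths are assumptions, not confirmed by the diff):

```python
from danswer.configs.constants import DocumentSource
from danswer.connectors.factory import identify_connector_class  # assumed path
from danswer.connectors.models import InputType  # assumed path
from danswer.connectors.slack.load_connector import SlackLoadConnector

# The SLACK entry is a per-InputType dict, so LOAD_STATE now resolves to the
# export-file connector while POLL keeps the live-API connector.
connector_cls = identify_connector_class(
    source=DocumentSource.SLACK, input_type=InputType.LOAD_STATE
)
assert connector_cls is SlackLoadConnector
```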

View File

@@ -134,6 +134,7 @@ def get_latest_message_time(thread: ThreadType) -> datetime:
 def thread_to_doc(
+    workspace: str,
     channel: ChannelType,
     thread: ThreadType,
     slack_cleaner: SlackTextCleaner,
@@ -178,7 +179,9 @@ def thread_to_doc(
         id=f"{channel_id}__{thread[0]['ts']}",
         sections=[
             Section(
-                link=get_message_link(event=m, client=client, channel_id=channel_id),
+                link=get_message_link(
+                    event=m, workspace=workspace, channel_id=channel_id
+                ),
                 text=slack_cleaner.index_clean(cast(str, m["text"])),
             )
             for m in thread
@@ -262,6 +265,7 @@ def filter_channels(
 def _get_all_docs(
     client: WebClient,
+    workspace: str,
     channels: list[str] | None = None,
     channel_name_regex_enabled: bool = False,
     oldest: str | None = None,
@@ -308,6 +312,7 @@
             if filtered_thread:
                 channel_docs += 1
                 yield thread_to_doc(
+                    workspace=workspace,
                     channel=channel,
                     thread=filtered_thread,
                     slack_cleaner=slack_cleaner,
@@ -370,12 +375,14 @@ def _get_all_doc_ids(
 class SlackPollConnector(PollConnector, SlimConnector):
     def __init__(
         self,
+        workspace: str,
         channels: list[str] | None = None,
         # if specified, will treat the specified channel strings as
         # regexes, and will only index channels that fully match the regexes
         channel_regex_enabled: bool = False,
         batch_size: int = INDEX_BATCH_SIZE,
     ) -> None:
+        self.workspace = workspace
         self.channels = channels
         self.channel_regex_enabled = channel_regex_enabled
         self.batch_size = batch_size
@@ -409,6 +416,7 @@ class SlackPollConnector(PollConnector, SlimConnector):
         documents: list[Document] = []
         for document in _get_all_docs(
             client=self.client,
+            workspace=self.workspace,
             channels=self.channels,
             channel_name_regex_enabled=self.channel_regex_enabled,
             # NOTE: need to impute to `None` instead of using 0.0, since Slack will
@@ -432,6 +440,7 @@ if __name__ == "__main__":
     slack_channel = os.environ.get("SLACK_CHANNEL")
     connector = SlackPollConnector(
+        workspace=os.environ["SLACK_WORKSPACE"],
         channels=[slack_channel] if slack_channel else None,
     )
     connector.load_credentials({"slack_bot_token": os.environ["SLACK_BOT_TOKEN"]})

View File

@@ -0,0 +1,140 @@
+import json
+import os
+from datetime import datetime
+from datetime import timezone
+from pathlib import Path
+from typing import Any
+from typing import cast
+
+from danswer.configs.app_configs import INDEX_BATCH_SIZE
+from danswer.configs.constants import DocumentSource
+from danswer.connectors.interfaces import GenerateDocumentsOutput
+from danswer.connectors.interfaces import LoadConnector
+from danswer.connectors.models import Document
+from danswer.connectors.models import Section
+from danswer.connectors.slack.connector import filter_channels
+from danswer.connectors.slack.utils import get_message_link
+from danswer.utils.logger import setup_logger
+
+logger = setup_logger()
+
+
+def get_event_time(event: dict[str, Any]) -> datetime | None:
+    ts = event.get("ts")
+    if not ts:
+        return None
+    return datetime.fromtimestamp(float(ts), tz=timezone.utc)
+
+
+class SlackLoadConnector(LoadConnector):
+    # WARNING: DEPRECATED, DO NOT USE
+    def __init__(
+        self,
+        workspace: str,
+        export_path_str: str,
+        channels: list[str] | None = None,
+        # if specified, will treat the specified channel strings as
+        # regexes, and will only index channels that fully match the regexes
+        channel_regex_enabled: bool = False,
+        batch_size: int = INDEX_BATCH_SIZE,
+    ) -> None:
+        self.workspace = workspace
+        self.channels = channels
+        self.channel_regex_enabled = channel_regex_enabled
+        self.export_path_str = export_path_str
+        self.batch_size = batch_size
+
+    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
+        if credentials:
+            logger.warning("Unexpected credentials provided for Slack Load Connector")
+        return None
+
+    @staticmethod
+    def _process_batch_event(
+        slack_event: dict[str, Any],
+        channel: dict[str, Any],
+        matching_doc: Document | None,
+        workspace: str,
+    ) -> Document | None:
+        if (
+            slack_event["type"] == "message"
+            and slack_event.get("subtype") != "channel_join"
+        ):
+            if matching_doc:
+                return Document(
+                    id=matching_doc.id,
+                    sections=matching_doc.sections
+                    + [
+                        Section(
+                            link=get_message_link(
+                                event=slack_event,
+                                workspace=workspace,
+                                channel_id=channel["id"],
+                            ),
+                            text=slack_event["text"],
+                        )
+                    ],
+                    source=matching_doc.source,
+                    semantic_identifier=matching_doc.semantic_identifier,
+                    title="",  # slack docs don't really have a "title"
+                    doc_updated_at=get_event_time(slack_event),
+                    metadata=matching_doc.metadata,
+                )
+
+            return Document(
+                id=slack_event["ts"],
+                sections=[
+                    Section(
+                        link=get_message_link(
+                            event=slack_event,
+                            workspace=workspace,
+                            channel_id=channel["id"],
+                        ),
+                        text=slack_event["text"],
+                    )
+                ],
+                source=DocumentSource.SLACK,
+                semantic_identifier=channel["name"],
+                title="",  # slack docs don't really have a "title"
+                doc_updated_at=get_event_time(slack_event),
+                metadata={},
+            )
+
+        return None
+
+    def load_from_state(self) -> GenerateDocumentsOutput:
+        export_path = Path(self.export_path_str)
+
+        with open(export_path / "channels.json") as f:
+            all_channels = json.load(f)
+
+        filtered_channels = filter_channels(
+            all_channels, self.channels, self.channel_regex_enabled
+        )
+
+        document_batch: dict[str, Document] = {}
+        for channel_info in filtered_channels:
+            channel_dir_path = export_path / cast(str, channel_info["name"])
+            channel_file_paths = [
+                channel_dir_path / file_name
+                for file_name in os.listdir(channel_dir_path)
+            ]
+            for path in channel_file_paths:
+                with open(path) as f:
+                    events = cast(list[dict[str, Any]], json.load(f))
+                for slack_event in events:
+                    doc = self._process_batch_event(
+                        slack_event=slack_event,
+                        channel=channel_info,
+                        matching_doc=document_batch.get(
+                            slack_event.get("thread_ts", "")
+                        ),
+                        workspace=self.workspace,
+                    )
+                    if doc:
+                        document_batch[doc.id] = doc
+                        if len(document_batch) >= self.batch_size:
+                            yield list(document_batch.values())
+
+        yield list(document_batch.values())
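The restored file is driven entirely by a local export directory rather than the Slack API. A hypothetical usage sketch (the workspace name and export path are illustrative; the directory is an unzipped Slack export containing `channels.json` plus one JSON file per channel per day):

```python
# Hypothetical invocation of the restored (deprecated) load connector.
connector = SlackLoadConnector(
    workspace="acme",                     # <workspace>.slack.com subdomain
    export_path_str="/tmp/slack-export",  # unzipped Slack export directory
    channels=None,                        # None = index every exported channel
)
connector.load_credentials({})  # export files require no credentials

for batch in connector.load_from_state():
    print(f"got a batch of {len(batch)} documents")
```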

View File

@@ -2,7 +2,6 @@ import re
 import time
 from collections.abc import Callable
 from collections.abc import Generator
-from functools import lru_cache
 from functools import wraps
 from typing import Any
 from typing import cast
@@ -22,21 +21,19 @@ basic_retry_wrapper = retry_builder()
 _SLACK_LIMIT = 900
-@lru_cache()
-def get_base_url(token: str) -> str:
-    """Retrieve and cache the base URL of the Slack workspace based on the client token."""
-    client = WebClient(token=token)
-    return client.auth_test()["url"]
 def get_message_link(
-    event: dict[str, Any], client: WebClient, channel_id: str | None = None
+    event: dict[str, Any], workspace: str, channel_id: str | None = None
 ) -> str:
-    channel_id = channel_id or event["channel"]
-    message_ts = event["ts"]
-    response = client.chat_getPermalink(channel=channel_id, message_ts=message_ts)
-    permalink = response["permalink"]
-    return permalink
+    channel_id = channel_id or cast(
+        str, event["channel"]
+    )  # channel must either be present in the event or passed in
+    message_ts = cast(str, event["ts"])
+    message_ts_without_dot = message_ts.replace(".", "")
+    thread_ts = cast(str | None, event.get("thread_ts"))
+    return (
+        f"https://{workspace}.slack.com/archives/{channel_id}/p{message_ts_without_dot}"
+        + (f"?thread_ts={thread_ts}" if thread_ts else "")
+    )
 def _make_slack_api_call_logged(
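The rewritten helper builds the permalink from the workspace subdomain and message timestamp instead of calling Slack's `chat.getPermalink` API, removing one network round-trip per message (and the cached `auth_test` lookup that `get_base_url` performed). A worked example with illustrative values:

```python
# Illustrative inputs: a threaded message event and its channel ID.
event = {"ts": "1702345678.901234", "thread_ts": "1702345600.000100"}
link = get_message_link(event=event, workspace="acme", channel_id="C0123ABCD")
# -> "https://acme.slack.com/archives/C0123ABCD/p1702345678901234?thread_ts=1702345600.000100"
```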

View File

@@ -1521,7 +1521,6 @@ class SlackBot(Base):
     slack_channel_configs: Mapped[list[SlackChannelConfig]] = relationship(
         "SlackChannelConfig",
         back_populates="slack_bot",
-        cascade="all, delete-orphan",
     )

View File

@@ -148,7 +148,6 @@ class Indexable(abc.ABC):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
-        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """
         Takes a list of document chunks and indexes them in the document index
@@ -166,14 +165,9 @@
         only needs to index chunks into the PRIMARY index. Do not update the secondary index here,
         it is done automatically outside of this code.
-        NOTE: The fresh_index parameter, when set to True, assumes no documents have been previously
-        indexed for the given index/tenant. This can be used to optimize the indexing process for
-        new or empty indices.
         Parameters:
         - chunks: Document chunks with all of the information needed for indexing to the document
         index.
-        - fresh_index: Boolean indicating whether this is a fresh index with no existing documents.
         Returns:
             List of document ids which map to unique documents and are used for deduping chunks

View File

@@ -306,7 +306,6 @@ class VespaIndex(DocumentIndex):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
-        fresh_index: bool = False,
     ) -> set[DocumentInsertionRecord]:
         """Receive a list of chunks from a batch of documents and index the chunks into Vespa along
         with updating the associated permissions. Assumes that a document will not be split into
@@ -323,29 +322,26 @@
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
-            if not fresh_index:
-                # Check for existing documents, existing documents need to have all of their chunks deleted
-                # prior to indexing as the document size (num chunks) may have shrunk
-                first_chunks = [
-                    chunk for chunk in cleaned_chunks if chunk.chunk_id == 0
-                ]
-                for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
-                    existing_docs.update(
-                        get_existing_documents_from_chunks(
-                            chunks=chunk_batch,
-                            index_name=self.index_name,
-                            http_client=http_client,
-                            executor=executor,
-                        )
-                    )
-                for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
-                    delete_vespa_docs(
-                        document_ids=doc_id_batch,
+            # Check for existing documents, existing documents need to have all of their chunks deleted
+            # prior to indexing as the document size (num chunks) may have shrunk
+            first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
+            for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
+                existing_docs.update(
+                    get_existing_documents_from_chunks(
+                        chunks=chunk_batch,
+                        index_name=self.index_name,
+                        http_client=http_client,
+                        executor=executor,
+                    )
+                )
+            for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
+                delete_vespa_docs(
+                    document_ids=doc_id_batch,
                     index_name=self.index_name,
                     http_client=http_client,
                     executor=executor,
                 )
             for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
                 batch_index_vespa_chunks(
View File

@@ -216,7 +216,7 @@ def seed_initial_documents(
     # as we just sent over the Vespa schema and there is a slight delay
     index_with_retries = retry_builder()(document_index.index)
-    index_with_retries(chunks=chunks, fresh_index=True)
+    index_with_retries(chunks=chunks)
     # Mock a run for the UI even though it did not actually call out to anything
     mock_successful_index_attempt(

View File

@@ -39,7 +39,6 @@ from danswer.key_value_store.interface import KvKeyNotFoundError
 from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
 from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
 from danswer.natural_language_processing.search_nlp_models import warm_up_cross_encoder
-from danswer.seeding.load_docs import seed_initial_documents
 from danswer.seeding.load_yamls import load_chat_yamls
 from danswer.server.manage.llm.models import LLMProviderUpsertRequest
 from danswer.server.settings.store import load_settings
@@ -151,7 +150,7 @@ def setup_danswer(
         # update multipass indexing setting based on GPU availability
         update_default_multipass_indexing(db_session)
-        seed_initial_documents(db_session, tenant_id, cohere_enabled)
+        # seed_initial_documents(db_session, tenant_id, cohere_enabled)
 def translate_saved_search_settings(db_session: Session) -> None:

View File

@@ -67,6 +67,7 @@ def test_slack_permission_sync(
         input_type=InputType.POLL,
         source=DocumentSource.SLACK,
         connector_specific_config={
+            "workspace": "onyx-test-workspace",
             "channels": [public_channel["name"], private_channel["name"]],
         },
         access_type=AccessType.SYNC,
@@ -280,6 +281,7 @@ def test_slack_group_permission_sync(
         input_type=InputType.POLL,
         source=DocumentSource.SLACK,
         connector_specific_config={
+            "workspace": "onyx-test-workspace",
             "channels": [private_channel["name"]],
         },
         access_type=AccessType.SYNC,

View File

@@ -61,6 +61,7 @@ def test_slack_prune(
         input_type=InputType.POLL,
         source=DocumentSource.SLACK,
         connector_specific_config={
+            "workspace": "onyx-test-workspace",
            "channels": [public_channel["name"], private_channel["name"]],
        },
        access_type=AccessType.PUBLIC,

View File

@@ -1,74 +0,0 @@
# Docker service resource limits. Most are commented out by default.
# 'background' service has preset (override-able) limits due to variable resource needs.
# Uncomment and set env vars for specific service limits.
# See: https://docs.danswer.dev/deployment/resource-sizing for details.
services:
background:
deploy:
resources:
limits:
cpus: ${BACKGROUND_CPU_LIMIT:-4}
memory: ${BACKGROUND_MEM_LIMIT:-4g}
# reservations:
# cpus: ${BACKGROUND_CPU_RESERVATION}
# memory: ${BACKGROUND_MEM_RESERVATION}
# nginx:
# deploy:
# resources:
# limits:
# cpus: ${NGINX_CPU_LIMIT}
# memory: ${NGINX_MEM_LIMIT}
# reservations:
# cpus: ${NGINX_CPU_RESERVATION}
# memory: ${NGINX_MEM_RESERVATION}
# api_server:
# deploy:
# resources:
# limits:
# cpus: ${API_SERVER_CPU_LIMIT}
# memory: ${API_SERVER_MEM_LIMIT}
# reservations:
# cpus: ${API_SERVER_CPU_RESERVATION}
# memory: ${API_SERVER_MEM_RESERVATION}
# index:
# deploy:
# resources:
# limits:
# cpus: ${VESPA_CPU_LIMIT}
# memory: ${VESPA_MEM_LIMIT}
# reservations:
# cpus: ${VESPA_CPU_RESERVATION}
# memory: ${VESPA_MEM_RESERVATION}
# inference_model_server:
# deploy:
# resources:
# limits:
# cpus: ${INFERENCE_CPU_LIMIT}
# memory: ${INFERENCE_MEM_LIMIT}
# reservations:
# cpus: ${INFERENCE_CPU_RESERVATION}
# memory: ${INFERENCE_MEM_RESERVATION}
# indexing_model_server:
# deploy:
# resources:
# limits:
# cpus: ${INDEXING_CPU_LIMIT}
# memory: ${INDEXING_MEM_LIMIT}
# reservations:
# cpus: ${INDEXING_CPU_RESERVATION}
# memory: ${INDEXING_MEM_RESERVATION}
# relational_db:
# deploy:
# resources:
# limits:
# cpus: ${POSTGRES_CPU_LIMIT}
# memory: ${POSTGRES_MEM_LIMIT}
# reservations:
# cpus: ${POSTGRES_CPU_RESERVATION}
# memory: ${POSTGRES_MEM_RESERVATION}

View File

@@ -96,16 +96,6 @@ export function SlackBotTable({ slackBots }: { slackBots: SlackBot[] }) {
             </ClickableTableRow>
           );
         })}
-        {slackBots.length === 0 && (
-          <TableRow>
-            <TableCell
-              colSpan={4}
-              className="text-center text-muted-foreground"
-            >
-              Please add a New Slack Bot to begin chatting with Danswer!
-            </TableCell>
-          </TableRow>
-        )}
       </TableBody>
     </Table>
     {slackBots.length > NUM_IN_PAGE && (

View File

@@ -61,12 +61,7 @@ export async function OPTIONS(
 }
 async function handleRequest(request: NextRequest, path: string[]) {
-  if (
-    process.env.NODE_ENV !== "development" &&
-    // NOTE: Set this environment variable to 'true' for preview environments
-    // Where you want finer-grained control over API access
-    process.env.OVERRIDE_API_PRODUCTION !== "true"
-  ) {
+  if (process.env.NODE_ENV !== "development") {
     return NextResponse.json(
       {
         message:

View File

@@ -145,6 +145,7 @@ export function TextFormField({
   min,
   onChange,
   width,
+  vertical,
 }: {
   value?: string;
   name: string;
@@ -170,6 +171,7 @@
   min?: number;
   onChange?: (e: React.ChangeEvent<HTMLInputElement>) => void;
   width?: string;
+  vertical?: boolean;
 }) {
   let heightString = defaultHeight || "";
   if (isTextArea && !heightString) {
@@ -209,14 +211,16 @@
   return (
     <div className={`w-full ${width}`}>
-      <div className="flex flex-col gap-x-2 items-start">
-        {!removeLabel && (
-          <Label className={sizeClass.label} small={small}>
-            {label}
-          </Label>
-        )}
-        {optional ? <span>(optional) </span> : ""}
-        {tooltip && <ToolTipDetails>{tooltip}</ToolTipDetails>}
+      <div className={`flex ${vertical ? "flex-col" : "flex-row"} items-start`}>
+        <div className="flex gap-x-2 items-center">
+          {!removeLabel && (
+            <Label className={sizeClass.label} small={small}>
+              {label}
+            </Label>
+          )}
+          {optional ? <span>(optional) </span> : ""}
+          {tooltip && <ToolTipDetails>{tooltip}</ToolTipDetails>}
+        </div>
       {error ? (
         <ManualErrorMessage>{error}</ManualErrorMessage>
       ) : (

View File

@@ -44,6 +44,7 @@ const EditPropertyModal = ({
       </h2>
       <TextFormField
+        vertical
         label={propertyDetails || ""}
         name="propertyValue"
         placeholder="Property value"

View File

@@ -546,7 +546,15 @@ Hint: Use the singular form of the object name (e.g., 'Opportunity' instead of '
   },
   slack: {
     description: "Configure Slack connector",
-    values: [],
+    values: [
+      {
+        type: "text",
+        query: "Enter the Slack workspace:",
+        label: "Workspace",
+        name: "workspace",
+        optional: false,
+      },
+    ],
     advanced_values: [
       {
         type: "list",