Compare commits

...

17 Commits

Author SHA1 Message Date
pablodanswer cd806356d2 update config 2024-11-11 19:33:15 -08:00
pablodanswer 05cb9f7fe4 ensure we properly expose name(space) for slackbot 2024-11-11 19:26:59 -08:00
pablodanswer 7128b43458 update configs 2024-11-11 18:50:46 -08:00
pablodanswer a1711a3e24 update 2024-11-11 15:56:03 -08:00
pablodanswer 3409e768f6 update 2024-11-11 15:56:03 -08:00
pablodanswer 028789b83f minor improvement 2024-11-11 15:56:03 -08:00
pablodanswer f2144f7453 finalize 2024-11-11 15:56:03 -08:00
pablodanswer c614ca9828 add cohere default 2024-11-11 15:56:03 -08:00
pablodanswer 379d569c61 include reset engine! 2024-11-11 15:55:32 -08:00
pablodanswer 53f9d94ceb revert 2024-11-11 14:39:23 -08:00
pablodanswer 5058d898b8 update some configs 2024-11-11 14:38:10 -08:00
pablodanswer bc7de4ec1b moderate slackbot switch 2024-11-11 11:58:17 -08:00
pablodanswer 3ad98078f5 finalized keda 2024-11-10 18:40:08 -08:00
pablodanswer 0fb12b42f1 minor update 2024-11-10 17:46:33 -08:00
pablodanswer 158329a3cc finalize slackbot improvements 2024-11-10 17:45:13 -08:00
pablodanswer 7f1a50823b fix typing 2024-11-10 17:31:05 -08:00
pablodanswer 0e76bcef45 add improved cloud configuration 2024-11-10 17:28:43 -08:00
28 changed files with 3969 additions and 10982 deletions

View File

@@ -12,6 +12,7 @@ from danswer.db.engine import get_all_tenant_ids
from danswer.db.engine import SqlEngine
from danswer.utils.logger import setup_logger
from danswer.utils.variable_functionality import fetch_versioned_implementation
from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST
from shared_configs.configs import MULTI_TENANT
logger = setup_logger(__name__)
@@ -72,6 +73,15 @@ class DynamicTenantScheduler(PersistentScheduler):
logger.info(f"Found {len(existing_tenants)} existing tenants in schedule")
for tenant_id in tenant_ids:
if (
IGNORED_SYNCING_TENANT_LIST
and tenant_id in IGNORED_SYNCING_TENANT_LIST
):
logger.info(
f"Skipping tenant {tenant_id} as it is in the ignored syncing list"
)
continue
if tenant_id not in existing_tenants:
logger.info(f"Processing new tenant: {tenant_id}")

View File

@@ -6,6 +6,7 @@ from celery import signals
from celery import Task
from celery.signals import celeryd_init
from celery.signals import worker_init
from celery.signals import worker_process_init
from celery.signals import worker_ready
from celery.signals import worker_shutdown
@@ -81,6 +82,11 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
app_base.on_worker_shutdown(sender, **kwargs)
@worker_process_init.connect
def init_worker(**kwargs: Any) -> None:
SqlEngine.reset_engine()
@signals.setup_logging.connect
def on_setup_logging(
loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any
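
The new worker_process_init hook disposes the SQLAlchemy engine inherited across the fork so every Celery worker process opens its own connections. A minimal sketch of the same signal wiring with a plain module-level engine instead of Danswer's SqlEngine (broker URL and DSN are assumptions):

from typing import Any

from celery import Celery
from celery.signals import worker_process_init
from sqlalchemy import create_engine

app = Celery("example", broker="redis://localhost:6379/0")  # assumed broker

# Engine created in the parent process before workers fork.
engine = create_engine("postgresql+psycopg2://localhost/danswer")  # assumed DSN


@worker_process_init.connect
def init_worker(**kwargs: Any) -> None:
    # Drop inherited pooled connections so this child builds fresh ones.
    global engine
    engine.dispose(close=False)
    engine = create_engine("postgresql+psycopg2://localhost/danswer")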

View File

@@ -1,96 +0,0 @@
from datetime import timedelta
from typing import Any
from celery.beat import PersistentScheduler # type: ignore
from celery.utils.log import get_task_logger
from danswer.db.engine import get_all_tenant_ids
from danswer.utils.variable_functionality import fetch_versioned_implementation
logger = get_task_logger(__name__)
class DynamicTenantScheduler(PersistentScheduler):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._reload_interval = timedelta(minutes=1)
self._last_reload = self.app.now() - self._reload_interval
def setup_schedule(self) -> None:
super().setup_schedule()
def tick(self) -> float:
retval = super().tick()
now = self.app.now()
if (
self._last_reload is None
or (now - self._last_reload) > self._reload_interval
):
logger.info("Reloading schedule to check for new tenants...")
self._update_tenant_tasks()
self._last_reload = now
return retval
def _update_tenant_tasks(self) -> None:
logger.info("Checking for tenant task updates...")
try:
tenant_ids = get_all_tenant_ids()
tasks_to_schedule = fetch_versioned_implementation(
"danswer.background.celery.tasks.beat_schedule", "get_tasks_to_schedule"
)
new_beat_schedule: dict[str, dict[str, Any]] = {}
current_schedule = getattr(self, "_store", {"entries": {}}).get(
"entries", {}
)
existing_tenants = set()
for task_name in current_schedule.keys():
if "-" in task_name:
existing_tenants.add(task_name.split("-")[-1])
for tenant_id in tenant_ids:
if tenant_id not in existing_tenants:
logger.info(f"Found new tenant: {tenant_id}")
for task in tasks_to_schedule():
task_name = f"{task['name']}-{tenant_id}"
new_task = {
"task": task["task"],
"schedule": task["schedule"],
"kwargs": {"tenant_id": tenant_id},
}
if options := task.get("options"):
new_task["options"] = options
new_beat_schedule[task_name] = new_task
if self._should_update_schedule(current_schedule, new_beat_schedule):
logger.info(
"Updating schedule",
extra={
"new_tasks": len(new_beat_schedule),
"current_tasks": len(current_schedule),
},
)
if not hasattr(self, "_store"):
self._store: dict[str, dict] = {"entries": {}}
self.update_from_dict(new_beat_schedule)
logger.info(f"New schedule: {new_beat_schedule}")
logger.info("Tenant tasks updated successfully")
else:
logger.debug("No schedule updates needed")
except (AttributeError, KeyError):
logger.exception("Failed to process task configuration")
except Exception:
logger.exception("Unexpected error updating tenant tasks")
def _should_update_schedule(
self, current_schedule: dict, new_schedule: dict
) -> bool:
"""Compare schedules to determine if an update is needed."""
current_tasks = set(current_schedule.keys())
new_tasks = set(new_schedule.keys())
return current_tasks != new_tasks

View File

@@ -8,7 +8,7 @@ tasks_to_schedule = [
{
"name": "check-for-vespa-sync",
"task": "check_for_vespa_sync_task",
"schedule": timedelta(seconds=5),
"schedule": timedelta(seconds=20),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
@@ -20,13 +20,13 @@ tasks_to_schedule = [
{
"name": "check-for-indexing",
"task": "check_for_indexing",
"schedule": timedelta(seconds=10),
"schedule": timedelta(seconds=15),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-prune",
"task": "check_for_pruning",
"schedule": timedelta(seconds=10),
"schedule": timedelta(seconds=15),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
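
Each entry in tasks_to_schedule expands into a per-tenant Celery beat entry; the interval bumps above trade a little freshness for less broker and database churn. A rough sketch of how such a list maps onto a beat schedule, with a single illustrative task and a hypothetical tenant:

from datetime import timedelta
from typing import Any

tasks_to_schedule = [
    {
        "name": "check-for-indexing",
        "task": "check_for_indexing",
        "schedule": timedelta(seconds=15),
        "options": {"priority": 0},  # stand-in for DanswerCeleryPriority.HIGH
    },
]

beat_schedule: dict[str, dict[str, Any]] = {}
for tenant_id in ["tenant_a"]:  # hypothetical tenant
    for task in tasks_to_schedule:
        entry: dict[str, Any] = {"task": task["task"], "schedule": task["schedule"]}
        if options := task.get("options"):
            entry["options"] = options
        beat_schedule[f"{task['name']}-{tenant_id}"] = entry

print(beat_schedule)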

View File

@@ -29,18 +29,26 @@ JobStatusType = (
def _initializer(
func: Callable, args: list | tuple, kwargs: dict[str, Any] | None = None
) -> Any:
"""Ensure the parent proc's database connections are not touched
in the new connection pool
"""Initialize the child process with a fresh SQLAlchemy Engine.
Based on the recommended approach in the SQLAlchemy docs found:
Based on SQLAlchemy's recommendations to handle multiprocessing:
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
"""
if kwargs is None:
kwargs = {}
logger.info("Initializing spawned worker child process.")
# Reset the engine in the child process
SqlEngine.reset_engine()
# Optionally set a custom app name for database logging purposes
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
# Initialize a new engine with desired parameters
SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
# Proceed with executing the target function
return func(*args, **kwargs)
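
This follows the SQLAlchemy multiprocessing guidance linked in the docstring: a forked child must not reuse pooled connections created by the parent. A minimal sketch of the pattern with a plain Engine (the pool parameters mirror the diff; the DSN is an assumption):

from typing import Any, Callable

from sqlalchemy import create_engine

# Parent-process engine; its pooled connections must not be reused after a fork.
engine = create_engine("postgresql+psycopg2://localhost/danswer")  # assumed DSN


def _initializer(
    func: Callable, args: list | tuple, kwargs: dict[str, Any] | None = None
) -> Any:
    """Run func in a child process after discarding inherited connections."""
    global engine
    if kwargs is None:
        kwargs = {}
    # Per the SQLAlchemy docs: drop inherited connections without closing the
    # parent's sockets, then build a fresh pool in this process.
    engine.dispose(close=False)
    engine = create_engine(
        "postgresql+psycopg2://localhost/danswer",
        pool_size=4,
        max_overflow=12,
        pool_recycle=60,
    )
    return func(*args, **kwargs)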

View File

@@ -503,3 +503,7 @@ _API_KEY_HASH_ROUNDS_RAW = os.environ.get("API_KEY_HASH_ROUNDS")
API_KEY_HASH_ROUNDS = (
int(_API_KEY_HASH_ROUNDS_RAW) if _API_KEY_HASH_ROUNDS_RAW else None
)
POD_NAME = os.environ.get("POD_NAME")
POD_NAMESPACE = os.environ.get("POD_NAMESPACE")

View File

@@ -55,11 +55,11 @@ def validate_channel_names(
# Scaling configurations for multi-tenant Slack bot handling
TENANT_LOCK_EXPIRATION = 1800 # How long a pod can hold exclusive access to a tenant before other pods can acquire it
TENANT_HEARTBEAT_INTERVAL = (
60 # How often pods send heartbeats to indicate they are still processing a tenant
15 # How often pods send heartbeats to indicate they are still processing a tenant
)
TENANT_HEARTBEAT_EXPIRATION = 180 # How long before a tenant's heartbeat expires, allowing other pods to take over
TENANT_ACQUISITION_INTERVAL = (
60 # How often pods attempt to acquire unprocessed tenants
TENANT_HEARTBEAT_EXPIRATION = (
30 # How long before a tenant's heartbeat expires, allowing other pods to take over
)
TENANT_ACQUISITION_INTERVAL = 60 # How often pods attempt to acquire unprocessed tenants and check for new tokens
MAX_TENANTS_PER_POD = int(os.getenv("MAX_TENANTS_PER_POD", 50))
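
The tightened values favor faster failover: a pod that stops heartbeating now loses a tenant after 30 seconds instead of 180. A rough sketch of how the heartbeat half of such a scheme could look with Redis (key naming and client setup are assumptions, not Danswer's actual implementation):

import time

import redis

TENANT_HEARTBEAT_INTERVAL = 15
TENANT_HEARTBEAT_EXPIRATION = 30

r = redis.Redis(host="localhost", port=6379)  # assumed connection


def heartbeat_loop(tenant_id: str, pod_name: str) -> None:
    # Refresh the key well before it expires; other pods only take over
    # once this pod has been silent for TENANT_HEARTBEAT_EXPIRATION seconds.
    while True:
        r.set(
            f"slackbot:heartbeat:{tenant_id}",
            pod_name,
            ex=TENANT_HEARTBEAT_EXPIRATION,
        )
        time.sleep(TENANT_HEARTBEAT_INTERVAL)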

View File

@@ -17,6 +17,8 @@ from slack_sdk import WebClient
from slack_sdk.socket_mode.request import SocketModeRequest
from slack_sdk.socket_mode.response import SocketModeResponse
from danswer.configs.app_configs import POD_NAME
from danswer.configs.app_configs import POD_NAMESPACE
from danswer.configs.constants import DanswerRedisLocks
from danswer.configs.constants import MessageType
from danswer.configs.danswerbot_configs import DANSWER_BOT_REPHRASE_MESSAGE
@@ -75,6 +77,7 @@ from danswer.search.retrieval.search_runner import download_nltk_data
from danswer.server.manage.models import SlackBotTokens
from danswer.utils.logger import setup_logger
from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
from shared_configs.configs import DISALLOWED_SLACK_BOT_TENANT_LIST
from shared_configs.configs import MODEL_SERVER_HOST
from shared_configs.configs import MODEL_SERVER_PORT
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
@@ -84,7 +87,9 @@ logger = setup_logger()
# Prometheus metric for HPA
active_tenants_gauge = Gauge(
"active_tenants", "Number of active tenants handled by this pod"
"active_tenants",
"Number of active tenants handled by this pod",
["namespace", "pod"],
)
# In rare cases, some users have been experiencing a massive amount of trivial messages coming through
@@ -147,7 +152,9 @@ class SlackbotHandler:
while not self._shutdown_event.is_set():
try:
self.acquire_tenants()
active_tenants_gauge.set(len(self.tenant_ids))
active_tenants_gauge.labels(namespace=POD_NAMESPACE, pod=POD_NAME).set(
len(self.tenant_ids)
)
logger.debug(f"Current active tenants: {len(self.tenant_ids)}")
except Exception as e:
logger.exception(f"Error in Slack acquisition: {e}")
@@ -164,9 +171,15 @@ class SlackbotHandler:
def acquire_tenants(self) -> None:
tenant_ids = get_all_tenant_ids()
logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres")
for tenant_id in tenant_ids:
if (
DISALLOWED_SLACK_BOT_TENANT_LIST is not None
and tenant_id in DISALLOWED_SLACK_BOT_TENANT_LIST
):
logger.debug(f"Tenant {tenant_id} is in the disallowed list, skipping")
continue
if tenant_id in self.tenant_ids:
logger.debug(f"Tenant {tenant_id} already in self.tenant_ids")
continue
@@ -190,6 +203,9 @@ class SlackbotHandler:
continue
logger.debug(f"Acquired lock for tenant {tenant_id}")
self.tenant_ids.add(tenant_id)
for tenant_id in self.tenant_ids:
token = CURRENT_TENANT_ID_CONTEXTVAR.set(
tenant_id or POSTGRES_DEFAULT_SCHEMA
)
@@ -236,14 +252,14 @@ class SlackbotHandler:
self.slack_bot_tokens[tenant_id] = slack_bot_tokens
if tenant_id in self.socket_clients:
if self.socket_clients.get(tenant_id):
asyncio.run(self.socket_clients[tenant_id].close())
self.start_socket_client(tenant_id, slack_bot_tokens)
except KvKeyNotFoundError:
logger.debug(f"Missing Slack Bot tokens for tenant {tenant_id}")
if tenant_id in self.socket_clients:
if self.socket_clients.get(tenant_id):
asyncio.run(self.socket_clients[tenant_id].close())
del self.socket_clients[tenant_id]
del self.slack_bot_tokens[tenant_id]
@@ -277,14 +293,14 @@ class SlackbotHandler:
logger.info(f"Connecting socket client for tenant {tenant_id}")
socket_client.connect()
self.socket_clients[tenant_id] = socket_client
self.tenant_ids.add(tenant_id)
logger.info(f"Started SocketModeClient for tenant {tenant_id}")
def stop_socket_clients(self) -> None:
logger.info(f"Stopping {len(self.socket_clients)} socket clients")
for tenant_id, client in self.socket_clients.items():
asyncio.run(client.close())
logger.info(f"Stopped SocketModeClient for tenant {tenant_id}")
if client:
asyncio.run(client.close())
logger.info(f"Stopped SocketModeClient for tenant {tenant_id}")
def shutdown(self, signum: int | None, frame: FrameType | None) -> None:
if not self.running:
@@ -298,6 +314,16 @@ class SlackbotHandler:
logger.info(f"Stopping {len(self.socket_clients)} socket clients")
self.stop_socket_clients()
# Release locks for all tenants
logger.info(f"Releasing locks for {len(self.tenant_ids)} tenants")
for tenant_id in self.tenant_ids:
try:
redis_client = get_redis_client(tenant_id=tenant_id)
redis_client.delete(DanswerRedisLocks.SLACK_BOT_LOCK)
logger.info(f"Released lock for tenant {tenant_id}")
except Exception as e:
logger.error(f"Error releasing lock for tenant {tenant_id}: {e}")
# Wait for background threads to finish (with timeout)
logger.info("Waiting for background threads to finish...")
self.acquire_thread.join(timeout=5)
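
Labeling the gauge with namespace and pod lets each replica report its own tenant count, which is what a Prometheus-driven autoscaler needs in order to tell pods apart. A small sketch of the labeled-gauge pattern; POD_NAME and POD_NAMESPACE are expected to be injected into the environment (for example via the Kubernetes downward API), and the metrics port is an assumption:

import os

from prometheus_client import Gauge, start_http_server

active_tenants_gauge = Gauge(
    "active_tenants",
    "Number of active tenants handled by this pod",
    ["namespace", "pod"],
)

pod_name = os.environ.get("POD_NAME", "local")
pod_namespace = os.environ.get("POD_NAMESPACE", "default")

start_http_server(9090)  # assumed metrics port
active_tenants_gauge.labels(namespace=pod_namespace, pod=pod_name).set(3)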

View File

@@ -189,6 +189,13 @@ class SqlEngine:
return ""
return cls._app_name
@classmethod
def reset_engine(cls) -> None:
with cls._lock:
if cls._engine:
cls._engine.dispose()
cls._engine = None
def get_all_tenant_ids() -> list[str] | list[None]:
if not MULTI_TENANT:

View File

@@ -63,6 +63,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
stmt = construct_document_select_for_connector_credential_pair_by_needs_sync(
cc_pair.connector_id, cc_pair.credential_id
)
for doc in db_session.scalars(stmt).yield_per(1):
current_time = time.monotonic()
if current_time - last_lock_time >= (
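
Iterating with yield_per(1) streams rows incrementally instead of materializing the whole result set, which keeps memory flat when a connector-credential pair has many documents to sync. A generic sketch of the pattern (the model and DSN are illustrative, not Danswer's):

from sqlalchemy import create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Document(Base):  # illustrative model
    __tablename__ = "document"
    id: Mapped[int] = mapped_column(primary_key=True)


engine = create_engine("postgresql+psycopg2://localhost/danswer")  # assumed DSN

with Session(engine) as db_session:
    stmt = select(Document)
    # yield_per(1) fetches rows as they are consumed rather than all at once.
    for doc in db_session.scalars(stmt).yield_per(1):
        print(doc.id)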

File diff suppressed because it is too large

View File

@@ -0,0 +1,44 @@
[
{
"url": "https://docs.danswer.dev/more/use_cases/overview",
"title": "Use Cases Overview",
"content": "How to leverage Danswer in your organization\n\nDanswer Overview\nDanswer is the AI Assistant connected to your organization's docs, apps, and people. Danswer makes Generative AI more versatile for work by enabling new types of questions like \"What is the most common feature request we've heard from customers this month\". Whereas other AI systems have no context of your team and are generally unhelpful with work related questions, Danswer makes it possible to ask these questions in natural language and get back answers in seconds.\n\nDanswer can connect to +30 different tools and the use cases are not limited to the ones in the following pages. The highlighted use cases are for inspiration and come from feedback gathered from our users and customers.\n\n\nCommon Getting Started Questions:\n\nWhy are these docs connected in my Danswer deployment?\nAnswer: This is just an example of how connectors work in Danswer. You can connect up your own team's knowledge and you will be able to ask questions unique to your organization. Danswer will keep all of the knowledge up to date and in sync with your connected applications.\n\nIs my data being sent anywhere when I connect it up to Danswer?\nAnswer: No! Danswer is built with data security as our highest priority. We open sourced it so our users can know exactly what is going on with their data. By default all of the document processing happens within Danswer. The only time it is sent outward is for the GenAI call to generate answers.\n\nWhere is the feature for auto sync-ing document level access permissions from all connected sources?\nAnswer: This falls under the Enterprise Edition set of Danswer features built on top of the MIT/community edition. If you are on Danswer Cloud, you have access to them by default. If you're running it yourself, reach out to the Danswer team to receive access.",
"chunk_ind": 0
},
{
"url": "https://docs.danswer.dev/more/use_cases/enterprise_search",
"title": "Enterprise Search",
"content": "Value of Enterprise Search with Danswer\n\nWhat is Enterprise Search and why is it Important?\nAn Enterprise Search system gives team members a single place to access all of the disparate knowledge of an organization. Critical information is saved across a host of channels like call transcripts with prospects, engineering design docs, IT runbooks, customer support email exchanges, project management tickets, and more. As fast moving teams scale up, information gets spread out and more disorganized.\n\nSince it quickly becomes infeasible to check across every source, decisions get made on incomplete information, employee satisfaction decreases, and the most valuable members of your team are tied up with constant distractions as junior teammates are unable to unblock themselves. Danswer solves this problem by letting anyone on the team access all of the knowledge across your organization in a permissioned and secure way. Users can ask questions in natural language and get back answers and documents across all of the connected sources instantly.\n\nWhat's the real cost?\nA typical knowledge worker spends over 2 hours a week on search, but more than that, the cost of incomplete or incorrect information can be extremely high. Customer support/success that isn't able to find the reference to similar cases could cause hours or even days of delay leading to lower customer satisfaction or in the worst case - churn. An account exec not realizing that a prospect had previously mentioned a specific need could lead to lost deals. An engineer not realizing a similar feature had previously been built could result in weeks of wasted development time and tech debt with duplicate implementation. With a lack of knowledge, your whole organization is navigating in the dark - inefficient and mistake prone.",
"chunk_ind": 0
},
{
"url": "https://docs.danswer.dev/more/use_cases/enterprise_search",
"title": "Enterprise Search",
"content": "More than Search\nWhen analyzing the entire corpus of knowledge within your company is as easy as asking a question in a search bar, your entire team can stay informed and up to date. Danswer also makes it trivial to identify where knowledge is well documented and where it is lacking. Team members who are centers of knowledge can begin to effectively document their expertise since it is no longer being thrown into a black hole. All of this allows the organization to achieve higher efficiency and drive business outcomes.\n\nWith Generative AI, the entire user experience has evolved as well. For example, instead of just finding similar cases for your customer support team to reference, Danswer breaks down the issue and explains it so that even the most junior members can understand it. This in turn lets them give the most holistic and technically accurate response possible to your customers. On the other end, even the super stars of your sales team will not be able to review 10 hours of transcripts before hopping on that critical call, but Danswer can easily parse through it in mere seconds and give crucial context to help your team close.",
"chunk_ind": 0
},
{
"url": "https://docs.danswer.dev/more/use_cases/ai_platform",
"title": "AI Platform",
"content": "Build AI Agents powered by the knowledge and workflows specific to your organization.\n\nBeyond Answers\nAgents enabled by generative AI and reasoning capable models are helping teams to automate their work. Danswer is helping teams make it happen. Danswer provides out of the box user chat sessions, attaching custom tools, handling LLM reasoning, code execution, data analysis, referencing internal knowledge, and much more.\n\nDanswer as a platform is not a no-code agent builder. We are made by developers for developers and this gives your team the full flexibility and power to create agents not constrained by blocks and simple logic paths.\n\nFlexibility and Extensibility\nDanswer is open source and completely whitebox. This not only gives transparency to what happens within the system but also means that your team can directly modify the source code to suit your unique needs.",
"chunk_ind": 0
},
{
"url": "https://docs.danswer.dev/more/use_cases/customer_support",
"title": "Customer Support",
"content": "Help your customer support team instantly answer any question across your entire product.\n\nAI Enabled Support\nCustomer support agents have one of the highest breadth jobs. They field requests that cover the entire surface area of the product and need to help your users find success on extremely short timelines. Because they're not the same people who designed or built the system, they often lack the depth of understanding needed - resulting in delays and escalations to other teams. Modern teams are leveraging AI to help their CS team optimize the speed and quality of these critical customer-facing interactions.\n\nThe Importance of Context\nThere are two critical components of AI copilots for customer support. The first is that the AI system needs to be connected with as much information as possible (not just support tools like Zendesk or Intercom) and that the knowledge needs to be as fresh as possible. Sometimes a fix might even be in places rarely checked by CS such as pull requests in a code repository. The second critical component is the ability of the AI system to break down difficult concepts and convoluted processes into more digestible descriptions and for your team members to be able to chat back and forth with the system to build a better understanding.\n\nDanswer takes care of both of these. The system connects up to over 30+ different applications and the knowledge is pulled in constantly so that the information access is always up to date.",
"chunk_ind": 0
},
{
"url": "https://docs.danswer.dev/more/use_cases/sales",
"title": "Sales",
"content": "Keep your team up to date on every conversation and update so they can close.\n\nRecall Every Detail\nBeing able to instantly revisit every detail of any call without reading transcripts is helping Sales teams provide more tailored pitches, build stronger relationships, and close more deals. Instead of searching and reading through hours of transcripts in preparation for a call, your team can now ask Danswer \"What specific features was ACME interested in seeing for the demo\". Since your team doesn't have time to read every transcript prior to a call, Danswer provides a more thorough summary because it can instantly parse hundreds of pages and distill out the relevant information. Even for fast lookups it becomes much more convenient - for example to brush up on connection building topics by asking \"What rapport building topic did we chat about in the last call with ACME\".\n\nKnow Every Product Update\nIt is impossible for Sales teams to keep up with every product update. Because of this, when a prospect has a question that the Sales team does not know, they have no choice but to rely on the Product and Engineering orgs to get an authoritative answer. Not only is this distracting to the other teams, it also slows down the time to respond to the prospect (and as we know, time is the biggest killer of deals). With Danswer, it is even possible to get answers live on call because of how fast accessing information becomes. A question like \"Have we shipped the Microsoft AD integration yet?\" can now be answered in seconds meaning that prospects can get answers while on the call instead of asynchronously and sales cycles are reduced as a result.",
"chunk_ind": 0
},
{
"url": "https://docs.danswer.dev/more/use_cases/operations",
"title": "Operations",
"content": "Double the productivity of your Ops teams like IT, HR, etc.\n\nAutomatically Resolve Tickets\nModern teams are leveraging AI to auto-resolve up to 50% of tickets. Whether it is an employee asking about benefits details or how to set up the VPN for remote work, Danswer can help your team help themselves. This frees up your team to do the real impactful work of landing star candidates or improving your internal processes.\n\nAI Aided Onboarding\nOne of the periods where your team needs the most help is when they're just ramping up. Instead of feeling lost in dozens of new tools, Danswer gives them a single place where they can ask about anything in natural language. Whether it's how to set up their work environment or what their onboarding goals are, Danswer can walk them through every step with the help of Generative AI. This lets your team feel more empowered and gives time back to the more seasoned members of your team to focus on moving the needle.",
"chunk_ind": 0
}
]

View File

@@ -32,7 +32,7 @@ from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.server.documents.models import ConnectorBase
from danswer.utils.logger import setup_logger
from danswer.utils.retry_wrapper import retry_builder
from danswer.utils.variable_functionality import fetch_versioned_implementation
logger = setup_logger()
@@ -91,7 +91,21 @@ def _create_indexable_chunks(
return list(ids_to_documents.values()), chunks
def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
# Cohere is used in EE version
def load_processed_docs(cohere_enabled: bool) -> list[dict]:
initial_docs_path = os.path.join(
os.getcwd(),
"danswer",
"seeding",
"initial_docs.json",
)
processed_docs = json.load(open(initial_docs_path))
return processed_docs
def seed_initial_documents(
db_session: Session, tenant_id: str | None, cohere_enabled: bool = False
) -> None:
"""
Seed initial documents so users don't have an empty index to start
@@ -132,7 +146,9 @@ def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
return
search_settings = get_current_search_settings(db_session)
if search_settings.model_name != DEFAULT_DOCUMENT_ENCODER_MODEL:
if search_settings.model_name != DEFAULT_DOCUMENT_ENCODER_MODEL and not (
search_settings.model_name == "embed-english-v3.0" and cohere_enabled
):
logger.info("Embedding model has been updated, skipping")
return
@@ -172,11 +188,10 @@ def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
last_successful_index_time=last_index_time,
)
cc_pair_id = cast(int, result.data)
initial_docs_path = os.path.join(
os.getcwd(), "danswer", "seeding", "initial_docs.json"
)
processed_docs = json.load(open(initial_docs_path))
processed_docs = fetch_versioned_implementation(
"danswer.seeding.load_docs",
"load_processed_docs",
)(cohere_enabled)
docs, chunks = _create_indexable_chunks(processed_docs, tenant_id)
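
The seeding path now resolves load_processed_docs through fetch_versioned_implementation, so the EE build can substitute the Cohere-aware loader while the MIT build keeps the plain JSON one. A rough sketch of that resolve-by-name idea using importlib (the real helper is Danswer-specific and also handles the ee-module preference):

import importlib
from typing import Any, Callable


def fetch_implementation(module_name: str, attr_name: str) -> Callable[..., Any]:
    # Stand-in for danswer.utils.variable_functionality.fetch_versioned_implementation.
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Usage mirroring the diff above:
# load_docs = fetch_implementation("danswer.seeding.load_docs", "load_processed_docs")
# processed_docs = load_docs(cohere_enabled)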

View File

@@ -59,7 +59,9 @@ from shared_configs.model_server_models import SupportedEmbeddingModel
logger = setup_logger()
def setup_danswer(db_session: Session, tenant_id: str | None) -> None:
def setup_danswer(
db_session: Session, tenant_id: str | None, cohere_enabled: bool = False
) -> None:
"""
Setup Danswer for a particular tenant. In the Single Tenant case, it will set it up for the default schema
on server startup. In the MT case, it will be called when the tenant is created.
@@ -148,7 +150,7 @@ def setup_danswer(db_session: Session, tenant_id: str | None) -> None:
# update multipass indexing setting based on GPU availability
update_default_multipass_indexing(db_session)
seed_initial_documents(db_session, tenant_id)
seed_initial_documents(db_session, tenant_id, cohere_enabled)
def translate_saved_search_settings(db_session: Session) -> None:

View File

@@ -0,0 +1,45 @@
import json
import os
from typing import cast
from typing import List
from cohere import Client
from ee.danswer.configs.app_configs import COHERE_DEFAULT_API_KEY
Embedding = List[float]
def load_processed_docs(cohere_enabled: bool) -> list[dict]:
base_path = os.path.join(os.getcwd(), "danswer", "seeding")
if cohere_enabled and COHERE_DEFAULT_API_KEY:
initial_docs_path = os.path.join(base_path, "initial_docs_cohere.json")
processed_docs = json.load(open(initial_docs_path))
cohere_client = Client(api_key=COHERE_DEFAULT_API_KEY)
embed_model = "embed-english-v3.0"
for doc in processed_docs:
title_embed_response = cohere_client.embed(
texts=[doc["title"]],
model=embed_model,
input_type="search_document",
)
content_embed_response = cohere_client.embed(
texts=[doc["content"]],
model=embed_model,
input_type="search_document",
)
doc["title_embedding"] = cast(
List[Embedding], title_embed_response.embeddings
)[0]
doc["content_embedding"] = cast(
List[Embedding], content_embed_response.embeddings
)[0]
else:
initial_docs_path = os.path.join(base_path, "initial_docs.json")
processed_docs = json.load(open(initial_docs_path))
return processed_docs
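
For context, embed-english-v3.0 requires an input_type, and documents are embedded differently from queries. A minimal sketch of the call shape used above, assuming the API key is available in the environment:

import os

from cohere import Client

client = Client(api_key=os.environ["COHERE_DEFAULT_API_KEY"])  # assumed env var

doc_response = client.embed(
    texts=["Use Cases Overview"],
    model="embed-english-v3.0",
    input_type="search_document",  # documents at index time
)
query_response = client.embed(
    texts=["what can danswer do"],
    model="embed-english-v3.0",
    input_type="search_query",  # queries at search time
)

print(len(doc_response.embeddings[0]))  # 1024 dimensions for this model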

View File

@@ -4,6 +4,7 @@ import uuid
import aiohttp # Async HTTP client
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session
from danswer.auth.users import exceptions
@@ -13,6 +14,8 @@ from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.llm import update_default_provider
from danswer.db.llm import upsert_cloud_embedding_provider
from danswer.db.llm import upsert_llm_provider
from danswer.db.models import IndexModelStatus
from danswer.db.models import SearchSettings
from danswer.db.models import UserTenantMapping
from danswer.llm.llm_provider_options import ANTHROPIC_MODEL_NAMES
from danswer.llm.llm_provider_options import ANTHROPIC_PROVIDER_NAME
@@ -102,9 +105,19 @@ async def provision_tenant(tenant_id: str, email: str) -> None:
await asyncio.to_thread(run_alembic_migrations, tenant_id)
with get_session_with_tenant(tenant_id) as db_session:
setup_danswer(db_session, tenant_id)
configure_default_api_keys(db_session)
current_search_settings = (
db_session.query(SearchSettings)
.filter_by(status=IndexModelStatus.FUTURE)
.first()
)
cohere_enabled = (
current_search_settings is not None
and current_search_settings.provider_type == EmbeddingProvider.COHERE
)
setup_danswer(db_session, tenant_id, cohere_enabled=cohere_enabled)
add_users_to_tenant([email], tenant_id)
except Exception as e:
@@ -200,11 +213,51 @@ def configure_default_api_keys(db_session: Session) -> None:
provider_type=EmbeddingProvider.COHERE,
api_key=COHERE_DEFAULT_API_KEY,
)
try:
logger.info("Attempting to upsert Cohere cloud embedding provider")
upsert_cloud_embedding_provider(db_session, cloud_embedding_provider)
except Exception as e:
logger.error(f"Failed to configure Cohere embedding provider: {e}")
logger.info("Successfully upserted Cohere cloud embedding provider")
logger.info("Updating search settings with Cohere embedding model details")
query = (
select(SearchSettings)
.where(SearchSettings.status == IndexModelStatus.FUTURE)
.order_by(SearchSettings.id.desc())
)
result = db_session.execute(query)
current_search_settings = result.scalars().first()
if current_search_settings:
current_search_settings.model_name = (
"embed-english-v3.0" # Cohere's latest model as of now
)
current_search_settings.model_dim = (
1024 # Cohere's embed-english-v3.0 dimension
)
current_search_settings.provider_type = EmbeddingProvider.COHERE
current_search_settings.index_name = (
"danswer_chunk_cohere_embed_english_v3_0"
)
current_search_settings.query_prefix = ""
current_search_settings.passage_prefix = ""
db_session.commit()
else:
raise RuntimeError(
"No search settings specified, DB is not in a valid state"
)
logger.info("Fetching updated search settings to verify changes")
updated_query = (
select(SearchSettings)
.where(SearchSettings.status == IndexModelStatus.PRESENT)
.order_by(SearchSettings.id.desc())
)
updated_result = db_session.execute(updated_query)
updated_result.scalars().first()
except Exception:
logger.exception("Failed to configure Cohere embedding provider")
else:
logger.error(
logger.info(
"COHERE_DEFAULT_API_KEY not set, skipping Cohere embedding provider configuration"
)

View File

@@ -26,4 +26,5 @@ lxml==5.3.0
lxml_html_clean==0.2.2
boto3-stubs[s3]==1.34.133
pandas==2.2.3
pandas-stubs==2.2.3.241009
pandas-stubs==2.2.3.241009
cohere==5.6.1

View File

@@ -142,6 +142,20 @@ async def async_return_default_schema(*args: Any, **kwargs: Any) -> str:
# Prefix used for all tenant ids
TENANT_ID_PREFIX = "tenant_"
DISALLOWED_SLACK_BOT_TENANT_IDS = os.environ.get("DISALLOWED_SLACK_BOT_TENANT_IDS")
DISALLOWED_SLACK_BOT_TENANT_LIST = (
[tenant.strip() for tenant in DISALLOWED_SLACK_BOT_TENANT_IDS.split(",")]
if DISALLOWED_SLACK_BOT_TENANT_IDS
else None
)
IGNORED_SYNCING_TENANT_IDS = os.environ.get("IGNORED_SYNCING_TENANT_IDS")
IGNORED_SYNCING_TENANT_LIST = (
[tenant.strip() for tenant in IGNORED_SYNCING_TENANT_IDS.split(",")]
if IGNORED_SYNCING_TENANT_IDS
else None
)
SUPPORTED_EMBEDDING_MODELS = [
# Cloud-based models
SupportedEmbeddingModel(

View File

@@ -9,12 +9,11 @@ spec:
scaleTargetRef:
name: celery-worker-indexing
minReplicaCount: 1
maxReplicaCount: 10
maxReplicaCount: 30
triggers:
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
port: "6379"
enableTLS: "true"
listName: connector_indexing
@@ -22,10 +21,10 @@ spec:
databaseIndex: "15"
authenticationRef:
name: celery-worker-auth
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
port: "6379"
enableTLS: "true"
listName: connector_indexing:2
@@ -36,7 +35,6 @@ spec:
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
port: "6379"
enableTLS: "true"
listName: connector_indexing:3
@@ -44,3 +42,12 @@ spec:
databaseIndex: "15"
authenticationRef:
name: celery-worker-auth
- type: cpu
metadata:
type: Utilization
value: "70"
- type: memory
metadata:
type: Utilization
value: "70"

View File

@@ -8,12 +8,11 @@ metadata:
spec:
scaleTargetRef:
name: celery-worker-light
minReplicaCount: 1
minReplicaCount: 5
maxReplicaCount: 20
triggers:
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync
@@ -23,7 +22,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync:2
@@ -33,7 +31,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync:3
@@ -43,7 +40,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: connector_deletion
@@ -53,7 +49,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: connector_deletion:2

View File

@@ -15,7 +15,6 @@ spec:
triggers:
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: celery
@@ -26,7 +25,6 @@ spec:
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: celery:1
@@ -36,7 +34,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: celery:2
@@ -46,7 +43,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: celery:3
@@ -56,7 +52,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: periodic_tasks
@@ -66,7 +61,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: periodic_tasks:2

View File

@@ -0,0 +1,19 @@
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: indexing-model-server-scaledobject
namespace: danswer
labels:
app: indexing-model-server
spec:
scaleTargetRef:
name: indexing-model-server-deployment
pollingInterval: 15 # Check every 15 seconds
cooldownPeriod: 30 # Wait 30 seconds before scaling down
minReplicaCount: 1
maxReplicaCount: 14
triggers:
- type: cpu
metadata:
type: Utilization
value: "70"

View File

@@ -5,5 +5,5 @@ metadata:
namespace: danswer
type: Opaque
data:
host: { { base64-encoded-hostname } }
password: { { base64-encoded-password } }
host: { base64 encoded host here }
password: { base64 encoded password here }
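
The placeholder values still need to be base64 encoded before being pasted into the Secret. A quick way to produce them, sketched in Python with made-up credentials:

import base64

# Replace with the real Redis hostname and password before applying the manifest.
host = "redis.example.internal"
password = "change-me"

print(base64.b64encode(host.encode()).decode())
print(base64.b64encode(password.encode()).decode())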

View File

@@ -14,8 +14,8 @@ spec:
spec:
containers:
- name: celery-beat
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
imagePullPolicy: Always
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.12
imagePullPolicy: IfNotPresent
command:
[
"celery",

View File

@@ -14,8 +14,8 @@ spec:
spec:
containers:
- name: celery-worker-heavy
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
imagePullPolicy: Always
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.12
imagePullPolicy: IfNotPresent
command:
[
"celery",

View File

@@ -14,8 +14,8 @@ spec:
spec:
containers:
- name: celery-worker-indexing
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
imagePullPolicy: Always
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.12
imagePullPolicy: IfNotPresent
command:
[
"celery",
@@ -47,10 +47,10 @@ spec:
resources:
requests:
cpu: "500m"
memory: "1Gi"
memory: "4Gi"
limits:
cpu: "1000m"
memory: "2Gi"
memory: "8Gi"
volumes:
- name: vespa-certificates
secret:

View File

@@ -14,8 +14,8 @@ spec:
spec:
containers:
- name: celery-worker-light
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
imagePullPolicy: Always
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.12
imagePullPolicy: IfNotPresent
command:
[
"celery",

View File

@@ -14,8 +14,8 @@ spec:
spec:
containers:
- name: celery-worker-primary
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
imagePullPolicy: Always
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.12
imagePullPolicy: IfNotPresent
command:
[
"celery",