Mirror of https://github.com/onyx-dot-app/onyx.git (synced 2026-02-16 23:35:46 +00:00)

Compare commits: v1.0.0-clo ... v0.12.0-cl (17 commits)
| SHA1 |
|---|
| cd806356d2 |
| 05cb9f7fe4 |
| 7128b43458 |
| a1711a3e24 |
| 3409e768f6 |
| 028789b83f |
| f2144f7453 |
| c614ca9828 |
| 379d569c61 |
| 53f9d94ceb |
| 5058d898b8 |
| bc7de4ec1b |
| 3ad98078f5 |
| 0fb12b42f1 |
| 158329a3cc |
| 7f1a50823b |
| 0e76bcef45 |
@@ -12,6 +12,7 @@ from danswer.db.engine import get_all_tenant_ids
 from danswer.db.engine import SqlEngine
 from danswer.utils.logger import setup_logger
 from danswer.utils.variable_functionality import fetch_versioned_implementation
+from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST
 from shared_configs.configs import MULTI_TENANT

 logger = setup_logger(__name__)
@@ -72,6 +73,15 @@ class DynamicTenantScheduler(PersistentScheduler):
         logger.info(f"Found {len(existing_tenants)} existing tenants in schedule")

         for tenant_id in tenant_ids:
+            if (
+                IGNORED_SYNCING_TENANT_LIST
+                and tenant_id in IGNORED_SYNCING_TENANT_LIST
+            ):
+                logger.info(
+                    f"Skipping tenant {tenant_id} as it is in the ignored syncing list"
+                )
+                continue
+
             if tenant_id not in existing_tenants:
                 logger.info(f"Processing new tenant: {tenant_id}")
@@ -6,6 +6,7 @@ from celery import signals
 from celery import Task
 from celery.signals import celeryd_init
 from celery.signals import worker_init
+from celery.signals import worker_process_init
 from celery.signals import worker_ready
 from celery.signals import worker_shutdown
@@ -81,6 +82,11 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
     app_base.on_worker_shutdown(sender, **kwargs)


+@worker_process_init.connect
+def init_worker(**kwargs: Any) -> None:
+    SqlEngine.reset_engine()
+
+
 @signals.setup_logging.connect
 def on_setup_logging(
     loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any
@@ -1,96 +0,0 @@
-from datetime import timedelta
-from typing import Any
-
-from celery.beat import PersistentScheduler  # type: ignore
-from celery.utils.log import get_task_logger
-
-from danswer.db.engine import get_all_tenant_ids
-from danswer.utils.variable_functionality import fetch_versioned_implementation
-
-logger = get_task_logger(__name__)
-
-
-class DynamicTenantScheduler(PersistentScheduler):
-    def __init__(self, *args: Any, **kwargs: Any) -> None:
-        super().__init__(*args, **kwargs)
-        self._reload_interval = timedelta(minutes=1)
-        self._last_reload = self.app.now() - self._reload_interval
-
-    def setup_schedule(self) -> None:
-        super().setup_schedule()
-
-    def tick(self) -> float:
-        retval = super().tick()
-        now = self.app.now()
-        if (
-            self._last_reload is None
-            or (now - self._last_reload) > self._reload_interval
-        ):
-            logger.info("Reloading schedule to check for new tenants...")
-            self._update_tenant_tasks()
-            self._last_reload = now
-        return retval
-
-    def _update_tenant_tasks(self) -> None:
-        logger.info("Checking for tenant task updates...")
-        try:
-            tenant_ids = get_all_tenant_ids()
-            tasks_to_schedule = fetch_versioned_implementation(
-                "danswer.background.celery.tasks.beat_schedule", "get_tasks_to_schedule"
-            )
-
-            new_beat_schedule: dict[str, dict[str, Any]] = {}
-
-            current_schedule = getattr(self, "_store", {"entries": {}}).get(
-                "entries", {}
-            )
-
-            existing_tenants = set()
-            for task_name in current_schedule.keys():
-                if "-" in task_name:
-                    existing_tenants.add(task_name.split("-")[-1])
-
-            for tenant_id in tenant_ids:
-                if tenant_id not in existing_tenants:
-                    logger.info(f"Found new tenant: {tenant_id}")
-
-                for task in tasks_to_schedule():
-                    task_name = f"{task['name']}-{tenant_id}"
-                    new_task = {
-                        "task": task["task"],
-                        "schedule": task["schedule"],
-                        "kwargs": {"tenant_id": tenant_id},
-                    }
-                    if options := task.get("options"):
-                        new_task["options"] = options
-                    new_beat_schedule[task_name] = new_task
-
-            if self._should_update_schedule(current_schedule, new_beat_schedule):
-                logger.info(
-                    "Updating schedule",
-                    extra={
-                        "new_tasks": len(new_beat_schedule),
-                        "current_tasks": len(current_schedule),
-                    },
-                )
-                if not hasattr(self, "_store"):
-                    self._store: dict[str, dict] = {"entries": {}}
-                self.update_from_dict(new_beat_schedule)
-                logger.info(f"New schedule: {new_beat_schedule}")
-
-                logger.info("Tenant tasks updated successfully")
-            else:
-                logger.debug("No schedule updates needed")
-
-        except (AttributeError, KeyError):
-            logger.exception("Failed to process task configuration")
-        except Exception:
-            logger.exception("Unexpected error updating tenant tasks")
-
-    def _should_update_schedule(
-        self, current_schedule: dict, new_schedule: dict
-    ) -> bool:
-        """Compare schedules to determine if an update is needed."""
-        current_tasks = set(current_schedule.keys())
-        new_tasks = set(new_schedule.keys())
-        return current_tasks != new_tasks
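The deleted module above implemented per-tenant beat entries by subclassing Celery's `PersistentScheduler` and rebuilding the schedule inside `tick()`. For orientation, a minimal hedged sketch of how such a scheduler class is typically plugged into a Celery app; the dotted path below is hypothetical, not taken from this diff:

```python
# Sketch only: the module path is an assumed location, not from the diff.
from celery import Celery

app = Celery("danswer")

# beat_scheduler is a standard Celery setting; beat instantiates this class
# on startup and repeatedly calls its tick() method, sleeping between calls
# for the number of seconds tick() returns.
app.conf.beat_scheduler = "myproject.scheduler:DynamicTenantScheduler"
```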
@@ -8,7 +8,7 @@ tasks_to_schedule = [
     {
         "name": "check-for-vespa-sync",
         "task": "check_for_vespa_sync_task",
-        "schedule": timedelta(seconds=5),
+        "schedule": timedelta(seconds=20),
        "options": {"priority": DanswerCeleryPriority.HIGH},
     },
     {
@@ -20,13 +20,13 @@ tasks_to_schedule = [
     {
         "name": "check-for-indexing",
         "task": "check_for_indexing",
-        "schedule": timedelta(seconds=10),
+        "schedule": timedelta(seconds=15),
         "options": {"priority": DanswerCeleryPriority.HIGH},
     },
     {
         "name": "check-for-prune",
         "task": "check_for_pruning",
-        "schedule": timedelta(seconds=10),
+        "schedule": timedelta(seconds=15),
         "options": {"priority": DanswerCeleryPriority.HIGH},
     },
     {
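These entries are consumed by Celery beat. As context, a hedged sketch of a static `beat_schedule` declaration with the same shape (the task names come from the diff; the numeric priority value is an assumption, since `DanswerCeleryPriority.HIGH` is not defined in the hunks shown):

```python
from datetime import timedelta

from celery import Celery

app = Celery("danswer")

# Each entry tells beat to enqueue the named task at the given interval;
# "options" is passed through to apply_async (e.g. broker priority).
app.conf.beat_schedule = {
    "check-for-vespa-sync": {
        "task": "check_for_vespa_sync_task",
        "schedule": timedelta(seconds=20),
        "options": {"priority": 0},  # stand-in for DanswerCeleryPriority.HIGH
    },
    "check-for-indexing": {
        "task": "check_for_indexing",
        "schedule": timedelta(seconds=15),
    },
}
```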
@@ -29,18 +29,26 @@ JobStatusType = (
 def _initializer(
     func: Callable, args: list | tuple, kwargs: dict[str, Any] | None = None
 ) -> Any:
-    """Ensure the parent proc's database connections are not touched
-    in the new connection pool
+    """Initialize the child process with a fresh SQLAlchemy Engine.

-    Based on the recommended approach in the SQLAlchemy docs found:
+    Based on SQLAlchemy's recommendations to handle multiprocessing:
     https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
     """
     if kwargs is None:
         kwargs = {}

+    logger.info("Initializing spawned worker child process.")
+
+    # Reset the engine in the child process
+    SqlEngine.reset_engine()
+
+    # Optionally set a custom app name for database logging purposes
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
+
+    # Initialize a new engine with desired parameters
     SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
+
+    # Proceed with executing the target function
     return func(*args, **kwargs)
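The docstring links SQLAlchemy's multiprocessing guidance: pooled connections must never be shared across a fork, so the child disposes the inherited engine and connects fresh. A minimal self-contained sketch of that documented pattern, independent of Danswer's `SqlEngine` wrapper (the database URL is a placeholder):

```python
import multiprocessing

from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///example.db")  # placeholder URL


def run_in_child() -> None:
    # Drop pooled connections inherited from the parent without closing them
    # (the parent still owns the underlying sockets), then connect fresh.
    engine.dispose(close=False)
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))


if __name__ == "__main__":
    p = multiprocessing.Process(target=run_in_child)
    p.start()
    p.join()
```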
@@ -503,3 +503,7 @@ _API_KEY_HASH_ROUNDS_RAW = os.environ.get("API_KEY_HASH_ROUNDS")
 API_KEY_HASH_ROUNDS = (
     int(_API_KEY_HASH_ROUNDS_RAW) if _API_KEY_HASH_ROUNDS_RAW else None
 )
+
+
+POD_NAME = os.environ.get("POD_NAME")
+POD_NAMESPACE = os.environ.get("POD_NAMESPACE")
@@ -55,11 +55,11 @@ def validate_channel_names(
 # Scaling configurations for multi-tenant Slack bot handling
 TENANT_LOCK_EXPIRATION = 1800  # How long a pod can hold exclusive access to a tenant before other pods can acquire it
 TENANT_HEARTBEAT_INTERVAL = (
-    60  # How often pods send heartbeats to indicate they are still processing a tenant
+    15  # How often pods send heartbeats to indicate they are still processing a tenant
 )
-TENANT_HEARTBEAT_EXPIRATION = 180  # How long before a tenant's heartbeat expires, allowing other pods to take over
-TENANT_ACQUISITION_INTERVAL = (
-    60  # How often pods attempt to acquire unprocessed tenants
+TENANT_HEARTBEAT_EXPIRATION = (
+    30  # How long before a tenant's heartbeat expires, allowing other pods to take over
 )
+TENANT_ACQUISITION_INTERVAL = 60  # How often pods attempt to acquire unprocessed tenants and checks for new tokens

 MAX_TENANTS_PER_POD = int(os.getenv("MAX_TENANTS_PER_POD", 50))
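Note the timing relationship: the heartbeat interval (15s) is kept well under the heartbeat expiration (30s), so a healthy pod always refreshes before another pod is allowed to take over. A hedged sketch of the acquire-and-heartbeat pattern these constants drive, using plain redis-py (the key name and pod id are illustrative, not Danswer's actual keys):

```python
import time

import redis

r = redis.Redis()  # connection details are illustrative

LOCK_KEY = "slack_bot_tenant_lock:tenant_x"  # hypothetical key name
TENANT_LOCK_EXPIRATION = 1800
TENANT_HEARTBEAT_INTERVAL = 15
TENANT_HEARTBEAT_EXPIRATION = 30


def acquire_tenant(pod_id: str) -> bool:
    # SET NX EX: only the first pod creates the key, and the key expires on
    # its own if the holder dies without releasing it.
    return bool(r.set(LOCK_KEY, pod_id, nx=True, ex=TENANT_LOCK_EXPIRATION))


def heartbeat(pod_id: str) -> None:
    # Refresh a short-lived heartbeat; once it lapses for 30s, other pods
    # may treat the tenant as orphaned and take over.
    r.set(f"{LOCK_KEY}:heartbeat", pod_id, ex=TENANT_HEARTBEAT_EXPIRATION)


if acquire_tenant("pod-a"):
    for _ in range(3):  # bounded here; a real worker loops until shutdown
        heartbeat("pod-a")
        time.sleep(TENANT_HEARTBEAT_INTERVAL)
```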
@@ -17,6 +17,8 @@ from slack_sdk import WebClient
 from slack_sdk.socket_mode.request import SocketModeRequest
 from slack_sdk.socket_mode.response import SocketModeResponse

+from danswer.configs.app_configs import POD_NAME
+from danswer.configs.app_configs import POD_NAMESPACE
 from danswer.configs.constants import DanswerRedisLocks
 from danswer.configs.constants import MessageType
 from danswer.configs.danswerbot_configs import DANSWER_BOT_REPHRASE_MESSAGE
@@ -75,6 +77,7 @@ from danswer.search.retrieval.search_runner import download_nltk_data
 from danswer.server.manage.models import SlackBotTokens
 from danswer.utils.logger import setup_logger
 from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
+from shared_configs.configs import DISALLOWED_SLACK_BOT_TENANT_LIST
 from shared_configs.configs import MODEL_SERVER_HOST
 from shared_configs.configs import MODEL_SERVER_PORT
 from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
@@ -84,7 +87,9 @@ logger = setup_logger()

 # Prometheus metric for HPA
 active_tenants_gauge = Gauge(
-    "active_tenants", "Number of active tenants handled by this pod"
+    "active_tenants",
+    "Number of active tenants handled by this pod",
+    ["namespace", "pod"],
 )

 # In rare cases, some users have been experiencing a massive amount of trivial messages coming through
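Adding label dimensions means each pod exports its own time series, which is what lets an autoscaler aggregate per-pod tenant counts. A small sketch with `prometheus_client` (the label values here are illustrative):

```python
from prometheus_client import Gauge

active_tenants_gauge = Gauge(
    "active_tenants",
    "Number of active tenants handled by this pod",
    ["namespace", "pod"],
)

# Each distinct label combination becomes its own series, e.g.
# active_tenants{namespace="danswer",pod="slackbot-0"} 3
active_tenants_gauge.labels(namespace="danswer", pod="slackbot-0").set(3)
```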
@@ -147,7 +152,9 @@ class SlackbotHandler:
         while not self._shutdown_event.is_set():
             try:
                 self.acquire_tenants()
-                active_tenants_gauge.set(len(self.tenant_ids))
+                active_tenants_gauge.labels(namespace=POD_NAMESPACE, pod=POD_NAME).set(
+                    len(self.tenant_ids)
+                )
                 logger.debug(f"Current active tenants: {len(self.tenant_ids)}")
             except Exception as e:
                 logger.exception(f"Error in Slack acquisition: {e}")
@@ -164,9 +171,15 @@ class SlackbotHandler:

     def acquire_tenants(self) -> None:
         tenant_ids = get_all_tenant_ids()
+        logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres")

         for tenant_id in tenant_ids:
+            if (
+                DISALLOWED_SLACK_BOT_TENANT_LIST is not None
+                and tenant_id in DISALLOWED_SLACK_BOT_TENANT_LIST
+            ):
+                logger.debug(f"Tenant {tenant_id} is in the disallowed list, skipping")
+                continue
+
             if tenant_id in self.tenant_ids:
                 logger.debug(f"Tenant {tenant_id} already in self.tenant_ids")
                 continue
@@ -190,6 +203,9 @@ class SlackbotHandler:
                 continue

             logger.debug(f"Acquired lock for tenant {tenant_id}")
+            self.tenant_ids.add(tenant_id)
+
+        for tenant_id in self.tenant_ids:
             token = CURRENT_TENANT_ID_CONTEXTVAR.set(
                 tenant_id or POSTGRES_DEFAULT_SCHEMA
             )
@@ -236,14 +252,14 @@ class SlackbotHandler:

             self.slack_bot_tokens[tenant_id] = slack_bot_tokens

-            if tenant_id in self.socket_clients:
+            if self.socket_clients.get(tenant_id):
                 asyncio.run(self.socket_clients[tenant_id].close())

             self.start_socket_client(tenant_id, slack_bot_tokens)

         except KvKeyNotFoundError:
             logger.debug(f"Missing Slack Bot tokens for tenant {tenant_id}")
-            if tenant_id in self.socket_clients:
+            if self.socket_clients.get(tenant_id):
                 asyncio.run(self.socket_clients[tenant_id].close())
                 del self.socket_clients[tenant_id]
                 del self.slack_bot_tokens[tenant_id]
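The switch from `tenant_id in self.socket_clients` to `self.socket_clients.get(tenant_id)` also guards against a key that is present but maps to a falsy value such as None; a two-line illustration:

```python
clients: dict[str, object | None] = {"tenant_a": None}

print("tenant_a" in clients)          # True: would try to close a None client
print(bool(clients.get("tenant_a")))  # False: skips the close safely
```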
@@ -277,14 +293,14 @@ class SlackbotHandler:
         logger.info(f"Connecting socket client for tenant {tenant_id}")
         socket_client.connect()
         self.socket_clients[tenant_id] = socket_client
-        self.tenant_ids.add(tenant_id)
         logger.info(f"Started SocketModeClient for tenant {tenant_id}")

     def stop_socket_clients(self) -> None:
         logger.info(f"Stopping {len(self.socket_clients)} socket clients")
         for tenant_id, client in self.socket_clients.items():
-            asyncio.run(client.close())
-            logger.info(f"Stopped SocketModeClient for tenant {tenant_id}")
+            if client:
+                asyncio.run(client.close())
+                logger.info(f"Stopped SocketModeClient for tenant {tenant_id}")

     def shutdown(self, signum: int | None, frame: FrameType | None) -> None:
         if not self.running:
@@ -298,6 +314,16 @@ class SlackbotHandler:
         logger.info(f"Stopping {len(self.socket_clients)} socket clients")
         self.stop_socket_clients()

+        # Release locks for all tenants
+        logger.info(f"Releasing locks for {len(self.tenant_ids)} tenants")
+        for tenant_id in self.tenant_ids:
+            try:
+                redis_client = get_redis_client(tenant_id=tenant_id)
+                redis_client.delete(DanswerRedisLocks.SLACK_BOT_LOCK)
+                logger.info(f"Released lock for tenant {tenant_id}")
+            except Exception as e:
+                logger.error(f"Error releasing lock for tenant {tenant_id}: {e}")
+
         # Wait for background threads to finish (with timeout)
         logger.info("Waiting for background threads to finish...")
         self.acquire_thread.join(timeout=5)
@@ -189,6 +189,13 @@ class SqlEngine:
             return ""
         return cls._app_name

+    @classmethod
+    def reset_engine(cls) -> None:
+        with cls._lock:
+            if cls._engine:
+                cls._engine.dispose()
+                cls._engine = None
+

 def get_all_tenant_ids() -> list[str] | list[None]:
     if not MULTI_TENANT:
@@ -63,6 +63,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
         stmt = construct_document_select_for_connector_credential_pair_by_needs_sync(
             cc_pair.connector_id, cc_pair.credential_id
         )

         for doc in db_session.scalars(stmt).yield_per(1):
+            current_time = time.monotonic()
             if current_time - last_lock_time >= (
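`time.monotonic()` is the right clock for periodically re-extending a lock inside a long iteration, because it never jumps with wall-clock adjustments. A minimal hedged sketch of the refresh pattern this hunk is part of (the threshold and lock object are illustrative; `reacquire()` here stands in for something like redis-py's `Lock.reacquire()`, which resets the TTL):

```python
import time

LOCK_REFRESH_INTERVAL = 30  # illustrative threshold in seconds


def process_docs(docs, lock) -> None:
    last_lock_time = time.monotonic()
    for doc in docs:
        current_time = time.monotonic()
        if current_time - last_lock_time >= LOCK_REFRESH_INTERVAL:
            lock.reacquire()  # extend the lock's TTL so it cannot lapse mid-run
            last_lock_time = current_time
        ...  # handle doc
```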
(File diff suppressed because it is too large.)

backend/danswer/seeding/initial_docs_cohere.json (new file, 44 lines)
@@ -0,0 +1,44 @@
+[
+    {
+        "url": "https://docs.danswer.dev/more/use_cases/overview",
+        "title": "Use Cases Overview",
+        "content": "How to leverage Danswer in your organization\n\nDanswer Overview\nDanswer is the AI Assistant connected to your organization's docs, apps, and people. Danswer makes Generative AI more versatile for work by enabling new types of questions like \"What is the most common feature request we've heard from customers this month\". Whereas other AI systems have no context of your team and are generally unhelpful with work related questions, Danswer makes it possible to ask these questions in natural language and get back answers in seconds.\n\nDanswer can connect to +30 different tools and the use cases are not limited to the ones in the following pages. The highlighted use cases are for inspiration and come from feedback gathered from our users and customers.\n\n\nCommon Getting Started Questions:\n\nWhy are these docs connected in my Danswer deployment?\nAnswer: This is just an example of how connectors work in Danswer. You can connect up your own team's knowledge and you will be able to ask questions unique to your organization. Danswer will keep all of the knowledge up to date and in sync with your connected applications.\n\nIs my data being sent anywhere when I connect it up to Danswer?\nAnswer: No! Danswer is built with data security as our highest priority. We open sourced it so our users can know exactly what is going on with their data. By default all of the document processing happens within Danswer. The only time it is sent outward is for the GenAI call to generate answers.\n\nWhere is the feature for auto sync-ing document level access permissions from all connected sources?\nAnswer: This falls under the Enterprise Edition set of Danswer features built on top of the MIT/community edition. If you are on Danswer Cloud, you have access to them by default. If you're running it yourself, reach out to the Danswer team to receive access.",
+        "chunk_ind": 0
+    },
+    {
+        "url": "https://docs.danswer.dev/more/use_cases/enterprise_search",
+        "title": "Enterprise Search",
+        "content": "Value of Enterprise Search with Danswer\n\nWhat is Enterprise Search and why is it Important?\nAn Enterprise Search system gives team members a single place to access all of the disparate knowledge of an organization. Critical information is saved across a host of channels like call transcripts with prospects, engineering design docs, IT runbooks, customer support email exchanges, project management tickets, and more. As fast moving teams scale up, information gets spread out and more disorganized.\n\nSince it quickly becomes infeasible to check across every source, decisions get made on incomplete information, employee satisfaction decreases, and the most valuable members of your team are tied up with constant distractions as junior teammates are unable to unblock themselves. Danswer solves this problem by letting anyone on the team access all of the knowledge across your organization in a permissioned and secure way. Users can ask questions in natural language and get back answers and documents across all of the connected sources instantly.\n\nWhat's the real cost?\nA typical knowledge worker spends over 2 hours a week on search, but more than that, the cost of incomplete or incorrect information can be extremely high. Customer support/success that isn't able to find the reference to similar cases could cause hours or even days of delay leading to lower customer satisfaction or in the worst case - churn. An account exec not realizing that a prospect had previously mentioned a specific need could lead to lost deals. An engineer not realizing a similar feature had previously been built could result in weeks of wasted development time and tech debt with duplicate implementation. With a lack of knowledge, your whole organization is navigating in the dark - inefficient and mistake prone.",
+        "chunk_ind": 0
+    },
+    {
+        "url": "https://docs.danswer.dev/more/use_cases/enterprise_search",
+        "title": "Enterprise Search",
+        "content": "More than Search\nWhen analyzing the entire corpus of knowledge within your company is as easy as asking a question in a search bar, your entire team can stay informed and up to date. Danswer also makes it trivial to identify where knowledge is well documented and where it is lacking. Team members who are centers of knowledge can begin to effectively document their expertise since it is no longer being thrown into a black hole. All of this allows the organization to achieve higher efficiency and drive business outcomes.\n\nWith Generative AI, the entire user experience has evolved as well. For example, instead of just finding similar cases for your customer support team to reference, Danswer breaks down the issue and explains it so that even the most junior members can understand it. This in turn lets them give the most holistic and technically accurate response possible to your customers. On the other end, even the super stars of your sales team will not be able to review 10 hours of transcripts before hopping on that critical call, but Danswer can easily parse through it in mere seconds and give crucial context to help your team close.",
+        "chunk_ind": 0
+    },
+    {
+        "url": "https://docs.danswer.dev/more/use_cases/ai_platform",
+        "title": "AI Platform",
+        "content": "Build AI Agents powered by the knowledge and workflows specific to your organization.\n\nBeyond Answers\nAgents enabled by generative AI and reasoning capable models are helping teams to automate their work. Danswer is helping teams make it happen. Danswer provides out of the box user chat sessions, attaching custom tools, handling LLM reasoning, code execution, data analysis, referencing internal knowledge, and much more.\n\nDanswer as a platform is not a no-code agent builder. We are made by developers for developers and this gives your team the full flexibility and power to create agents not constrained by blocks and simple logic paths.\n\nFlexibility and Extensibility\nDanswer is open source and completely whitebox. This not only gives transparency to what happens within the system but also means that your team can directly modify the source code to suit your unique needs.",
+        "chunk_ind": 0
+    },
+    {
+        "url": "https://docs.danswer.dev/more/use_cases/customer_support",
+        "title": "Customer Support",
+        "content": "Help your customer support team instantly answer any question across your entire product.\n\nAI Enabled Support\nCustomer support agents have one of the highest breadth jobs. They field requests that cover the entire surface area of the product and need to help your users find success on extremely short timelines. Because they're not the same people who designed or built the system, they often lack the depth of understanding needed - resulting in delays and escalations to other teams. Modern teams are leveraging AI to help their CS team optimize the speed and quality of these critical customer-facing interactions.\n\nThe Importance of Context\nThere are two critical components of AI copilots for customer support. The first is that the AI system needs to be connected with as much information as possible (not just support tools like Zendesk or Intercom) and that the knowledge needs to be as fresh as possible. Sometimes a fix might even be in places rarely checked by CS such as pull requests in a code repository. The second critical component is the ability of the AI system to break down difficult concepts and convoluted processes into more digestible descriptions and for your team members to be able to chat back and forth with the system to build a better understanding.\n\nDanswer takes care of both of these. The system connects up to over 30+ different applications and the knowledge is pulled in constantly so that the information access is always up to date.",
+        "chunk_ind": 0
+    },
+    {
+        "url": "https://docs.danswer.dev/more/use_cases/sales",
+        "title": "Sales",
+        "content": "Keep your team up to date on every conversation and update so they can close.\n\nRecall Every Detail\nBeing able to instantly revisit every detail of any call without reading transcripts is helping Sales teams provide more tailored pitches, build stronger relationships, and close more deals. Instead of searching and reading through hours of transcripts in preparation for a call, your team can now ask Danswer \"What specific features was ACME interested in seeing for the demo\". Since your team doesn't have time to read every transcript prior to a call, Danswer provides a more thorough summary because it can instantly parse hundreds of pages and distill out the relevant information. Even for fast lookups it becomes much more convenient - for example to brush up on connection building topics by asking \"What rapport building topic did we chat about in the last call with ACME\".\n\nKnow Every Product Update\nIt is impossible for Sales teams to keep up with every product update. Because of this, when a prospect has a question that the Sales team does not know, they have no choice but to rely on the Product and Engineering orgs to get an authoritative answer. Not only is this distracting to the other teams, it also slows down the time to respond to the prospect (and as we know, time is the biggest killer of deals). With Danswer, it is even possible to get answers live on call because of how fast accessing information becomes. A question like \"Have we shipped the Microsoft AD integration yet?\" can now be answered in seconds meaning that prospects can get answers while on the call instead of asynchronously and sales cycles are reduced as a result.",
+        "chunk_ind": 0
+    },
+    {
+        "url": "https://docs.danswer.dev/more/use_cases/operations",
+        "title": "Operations",
+        "content": "Double the productivity of your Ops teams like IT, HR, etc.\n\nAutomatically Resolve Tickets\nModern teams are leveraging AI to auto-resolve up to 50% of tickets. Whether it is an employee asking about benefits details or how to set up the VPN for remote work, Danswer can help your team help themselves. This frees up your team to do the real impactful work of landing star candidates or improving your internal processes.\n\nAI Aided Onboarding\nOne of the periods where your team needs the most help is when they're just ramping up. Instead of feeling lost in dozens of new tools, Danswer gives them a single place where they can ask about anything in natural language. Whether it's how to set up their work environment or what their onboarding goals are, Danswer can walk them through every step with the help of Generative AI. This lets your team feel more empowered and gives time back to the more seasoned members of your team to focus on moving the needle.",
+        "chunk_ind": 0
+    }
+]
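Each seed record carries the fields the indexer needs. A quick hedged sanity check of the shape (the path is relative to the repo root):

```python
import json

with open("backend/danswer/seeding/initial_docs_cohere.json") as f:
    docs = json.load(f)

# Every record provides a source URL, a title, the chunk text, and its index.
assert all({"url", "title", "content", "chunk_ind"} <= set(d) for d in docs)
print(f"{len(docs)} seed chunks")  # 7 chunks in this 44-line file
```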
@@ -32,7 +32,7 @@ from danswer.key_value_store.interface import KvKeyNotFoundError
 from danswer.server.documents.models import ConnectorBase
 from danswer.utils.logger import setup_logger
 from danswer.utils.retry_wrapper import retry_builder
+from danswer.utils.variable_functionality import fetch_versioned_implementation

 logger = setup_logger()
@@ -91,7 +91,21 @@ def _create_indexable_chunks(
     return list(ids_to_documents.values()), chunks


-def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
+# Cohere is used in EE version
+def load_processed_docs(cohere_enabled: bool) -> list[dict]:
+    initial_docs_path = os.path.join(
+        os.getcwd(),
+        "danswer",
+        "seeding",
+        "initial_docs.json",
+    )
+    processed_docs = json.load(open(initial_docs_path))
+    return processed_docs
+
+
+def seed_initial_documents(
+    db_session: Session, tenant_id: str | None, cohere_enabled: bool = False
+) -> None:
     """
     Seed initial documents so users don't have an empty index to start
@@ -132,7 +146,9 @@ def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
         return

     search_settings = get_current_search_settings(db_session)
-    if search_settings.model_name != DEFAULT_DOCUMENT_ENCODER_MODEL:
+    if search_settings.model_name != DEFAULT_DOCUMENT_ENCODER_MODEL and not (
+        search_settings.model_name == "embed-english-v3.0" and cohere_enabled
+    ):
         logger.info("Embedding model has been updated, skipping")
         return
@@ -172,11 +188,10 @@ def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
         last_successful_index_time=last_index_time,
     )
     cc_pair_id = cast(int, result.data)

-    initial_docs_path = os.path.join(
-        os.getcwd(), "danswer", "seeding", "initial_docs.json"
-    )
-    processed_docs = json.load(open(initial_docs_path))
+    processed_docs = fetch_versioned_implementation(
+        "danswer.seeding.load_docs",
+        "load_processed_docs",
+    )(cohere_enabled)

    docs, chunks = _create_indexable_chunks(processed_docs, tenant_id)
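`fetch_versioned_implementation` is what lets the MIT build resolve `danswer.seeding.load_docs` while an EE deployment resolves the same name to the new `backend/ee/danswer/seeding/load_docs.py` below. Its exact logic is not shown in this diff; a minimal sketch of the import-based dispatch pattern, under the assumption that EE modules shadow community ones by an `ee.` prefix:

```python
import importlib


def fetch_versioned_implementation(module: str, attribute: str):
    """Sketch only: prefer the EE override ("ee." + module) when present,
    otherwise fall back to the community implementation."""
    try:
        mod = importlib.import_module("ee." + module)
    except ModuleNotFoundError:
        mod = importlib.import_module(module)
    return getattr(mod, attribute)


# Hypothetical usage mirroring the hunk above:
# load_docs = fetch_versioned_implementation(
#     "danswer.seeding.load_docs", "load_processed_docs"
# )
# processed_docs = load_docs(cohere_enabled)
```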
@@ -59,7 +59,9 @@ from shared_configs.model_server_models import SupportedEmbeddingModel
 logger = setup_logger()


-def setup_danswer(db_session: Session, tenant_id: str | None) -> None:
+def setup_danswer(
+    db_session: Session, tenant_id: str | None, cohere_enabled: bool = False
+) -> None:
     """
     Setup Danswer for a particular tenant. In the Single Tenant case, it will set it up for the default schema
     on server startup. In the MT case, it will be called when the tenant is created.
@@ -148,7 +150,7 @@ def setup_danswer(db_session: Session, tenant_id: str | None) -> None:
     # update multipass indexing setting based on GPU availability
     update_default_multipass_indexing(db_session)

-    seed_initial_documents(db_session, tenant_id)
+    seed_initial_documents(db_session, tenant_id, cohere_enabled)


 def translate_saved_search_settings(db_session: Session) -> None:
backend/ee/danswer/seeding/load_docs.py (new file, 45 lines)
@@ -0,0 +1,45 @@
+import json
+import os
+from typing import cast
+from typing import List
+
+from cohere import Client
+
+from ee.danswer.configs.app_configs import COHERE_DEFAULT_API_KEY
+
+Embedding = List[float]
+
+
+def load_processed_docs(cohere_enabled: bool) -> list[dict]:
+    base_path = os.path.join(os.getcwd(), "danswer", "seeding")
+
+    if cohere_enabled and COHERE_DEFAULT_API_KEY:
+        initial_docs_path = os.path.join(base_path, "initial_docs_cohere.json")
+        processed_docs = json.load(open(initial_docs_path))
+
+        cohere_client = Client(api_key=COHERE_DEFAULT_API_KEY)
+        embed_model = "embed-english-v3.0"
+
+        for doc in processed_docs:
+            title_embed_response = cohere_client.embed(
+                texts=[doc["title"]],
+                model=embed_model,
+                input_type="search_document",
+            )
+            content_embed_response = cohere_client.embed(
+                texts=[doc["content"]],
+                model=embed_model,
+                input_type="search_document",
+            )
+
+            doc["title_embedding"] = cast(
+                List[Embedding], title_embed_response.embeddings
+            )[0]
+            doc["content_embedding"] = cast(
+                List[Embedding], content_embed_response.embeddings
+            )[0]
+    else:
+        initial_docs_path = os.path.join(base_path, "initial_docs.json")
+        processed_docs = json.load(open(initial_docs_path))
+
+    return processed_docs
@@ -4,6 +4,7 @@ import uuid

 import aiohttp  # Async HTTP client
 from fastapi import HTTPException
+from sqlalchemy import select
 from sqlalchemy.orm import Session

 from danswer.auth.users import exceptions
@@ -13,6 +14,8 @@ from danswer.db.engine import get_sqlalchemy_engine
 from danswer.db.llm import update_default_provider
 from danswer.db.llm import upsert_cloud_embedding_provider
 from danswer.db.llm import upsert_llm_provider
+from danswer.db.models import IndexModelStatus
+from danswer.db.models import SearchSettings
 from danswer.db.models import UserTenantMapping
 from danswer.llm.llm_provider_options import ANTHROPIC_MODEL_NAMES
 from danswer.llm.llm_provider_options import ANTHROPIC_PROVIDER_NAME
@@ -102,9 +105,19 @@ async def provision_tenant(tenant_id: str, email: str) -> None:
         await asyncio.to_thread(run_alembic_migrations, tenant_id)

         with get_session_with_tenant(tenant_id) as db_session:
-            setup_danswer(db_session, tenant_id)
             configure_default_api_keys(db_session)

+            current_search_settings = (
+                db_session.query(SearchSettings)
+                .filter_by(status=IndexModelStatus.FUTURE)
+                .first()
+            )
+            cohere_enabled = (
+                current_search_settings is not None
+                and current_search_settings.provider_type == EmbeddingProvider.COHERE
+            )
+            setup_danswer(db_session, tenant_id, cohere_enabled=cohere_enabled)

         add_users_to_tenant([email], tenant_id)

     except Exception as e:
@@ -200,11 +213,51 @@ def configure_default_api_keys(db_session: Session) -> None:
             provider_type=EmbeddingProvider.COHERE,
             api_key=COHERE_DEFAULT_API_KEY,
         )

         try:
+            logger.info("Attempting to upsert Cohere cloud embedding provider")
             upsert_cloud_embedding_provider(db_session, cloud_embedding_provider)
-        except Exception as e:
-            logger.error(f"Failed to configure Cohere embedding provider: {e}")
+            logger.info("Successfully upserted Cohere cloud embedding provider")
+
+            logger.info("Updating search settings with Cohere embedding model details")
+            query = (
+                select(SearchSettings)
+                .where(SearchSettings.status == IndexModelStatus.FUTURE)
+                .order_by(SearchSettings.id.desc())
+            )
+            result = db_session.execute(query)
+            current_search_settings = result.scalars().first()
+
+            if current_search_settings:
+                current_search_settings.model_name = (
+                    "embed-english-v3.0"  # Cohere's latest model as of now
+                )
+                current_search_settings.model_dim = (
+                    1024  # Cohere's embed-english-v3.0 dimension
+                )
+                current_search_settings.provider_type = EmbeddingProvider.COHERE
+                current_search_settings.index_name = (
+                    "danswer_chunk_cohere_embed_english_v3_0"
+                )
+                current_search_settings.query_prefix = ""
+                current_search_settings.passage_prefix = ""
+                db_session.commit()
+            else:
+                raise RuntimeError(
+                    "No search settings specified, DB is not in a valid state"
+                )
+            logger.info("Fetching updated search settings to verify changes")
+            updated_query = (
+                select(SearchSettings)
+                .where(SearchSettings.status == IndexModelStatus.PRESENT)
+                .order_by(SearchSettings.id.desc())
+            )
+            updated_result = db_session.execute(updated_query)
+            updated_result.scalars().first()
+
+        except Exception:
+            logger.exception("Failed to configure Cohere embedding provider")
     else:
-        logger.error(
+        logger.info(
             "COHERE_DEFAULT_API_KEY not set, skipping Cohere embedding provider configuration"
         )
@@ -26,4 +26,5 @@ lxml==5.3.0
 lxml_html_clean==0.2.2
 boto3-stubs[s3]==1.34.133
 pandas==2.2.3
 pandas-stubs==2.2.3.241009
+cohere==5.6.1
@@ -142,6 +142,20 @@ async def async_return_default_schema(*args: Any, **kwargs: Any) -> str:
 # Prefix used for all tenant ids
 TENANT_ID_PREFIX = "tenant_"

+DISALLOWED_SLACK_BOT_TENANT_IDS = os.environ.get("DISALLOWED_SLACK_BOT_TENANT_IDS")
+DISALLOWED_SLACK_BOT_TENANT_LIST = (
+    [tenant.strip() for tenant in DISALLOWED_SLACK_BOT_TENANT_IDS.split(",")]
+    if DISALLOWED_SLACK_BOT_TENANT_IDS
+    else None
+)
+
+IGNORED_SYNCING_TENANT_IDS = os.environ.get("IGNORED_SYNCING_TENANT_IDS")
+IGNORED_SYNCING_TENANT_LIST = (
+    [tenant.strip() for tenant in IGNORED_SYNCING_TENANT_IDS.split(",")]
+    if IGNORED_SYNCING_TENANT_IDS
+    else None
+)
+
 SUPPORTED_EMBEDDING_MODELS = [
     # Cloud-based models
     SupportedEmbeddingModel(
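Both settings follow the same convention: an unset variable yields None (feature disabled) rather than an empty list. For example, with a hypothetical value:

```python
import os

os.environ["IGNORED_SYNCING_TENANT_IDS"] = "tenant_a, tenant_b"  # hypothetical

raw = os.environ.get("IGNORED_SYNCING_TENANT_IDS")
parsed = [t.strip() for t in raw.split(",")] if raw else None
print(parsed)  # ['tenant_a', 'tenant_b']; None when the variable is unset
```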
@@ -9,12 +9,11 @@ spec:
   scaleTargetRef:
     name: celery-worker-indexing
   minReplicaCount: 1
-  maxReplicaCount: 10
+  maxReplicaCount: 30
   triggers:
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: connector_indexing
@@ -22,10 +21,10 @@ spec:
         databaseIndex: "15"
       authenticationRef:
         name: celery-worker-auth

     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: connector_indexing:2
@@ -36,7 +35,6 @@ spec:
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: connector_indexing:3
@@ -44,3 +42,12 @@ spec:
         databaseIndex: "15"
       authenticationRef:
         name: celery-worker-auth
+    - type: cpu
+      metadata:
+        type: Utilization
+        value: "70"
+
+    - type: memory
+      metadata:
+        type: Utilization
+        value: "70"
@@ -8,12 +8,11 @@ metadata:
 spec:
   scaleTargetRef:
     name: celery-worker-light
-  minReplicaCount: 1
+  minReplicaCount: 5
   maxReplicaCount: 20
   triggers:
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: vespa_metadata_sync
@@ -23,7 +22,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: vespa_metadata_sync:2
@@ -33,7 +31,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: vespa_metadata_sync:3
@@ -43,7 +40,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: connector_deletion
@@ -53,7 +49,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: connector_deletion:2
@@ -15,7 +15,6 @@ spec:
   triggers:
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: celery
@@ -26,7 +25,6 @@ spec:

     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: celery:1
@@ -36,7 +34,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: celery:2
@@ -46,7 +43,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: celery:3
@@ -56,7 +52,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: periodic_tasks
@@ -66,7 +61,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        sslEnabled: "true"
         host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: periodic_tasks:2
@@ -0,0 +1,19 @@
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: indexing-model-server-scaledobject
+  namespace: danswer
+  labels:
+    app: indexing-model-server
+spec:
+  scaleTargetRef:
+    name: indexing-model-server-deployment
+  pollingInterval: 15  # Check every 15 seconds
+  cooldownPeriod: 30  # Wait 30 seconds before scaling down
+  minReplicaCount: 1
+  maxReplicaCount: 14
+  triggers:
+    - type: cpu
+      metadata:
+        type: Utilization
+        value: "70"
@@ -5,5 +5,5 @@ metadata:
   namespace: danswer
 type: Opaque
 data:
-  host: {{ base64-encoded-hostname }}
-  password: {{ base64-encoded-password }}
+  host: { base64 encoded host here }
+  password: { base64 encoded password here }
@@ -14,8 +14,8 @@ spec:
     spec:
       containers:
         - name: celery-beat
-          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
-          imagePullPolicy: Always
+          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.12
+          imagePullPolicy: IfNotPresent
           command:
             [
               "celery",
@@ -14,8 +14,8 @@ spec:
     spec:
       containers:
         - name: celery-worker-heavy
-          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
-          imagePullPolicy: Always
+          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.12
+          imagePullPolicy: IfNotPresent
           command:
             [
               "celery",
@@ -14,8 +14,8 @@ spec:
     spec:
       containers:
         - name: celery-worker-indexing
-          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
-          imagePullPolicy: Always
+          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.12
+          imagePullPolicy: IfNotPresent
           command:
             [
               "celery",
@@ -47,10 +47,10 @@ spec:
           resources:
             requests:
               cpu: "500m"
-              memory: "1Gi"
+              memory: "4Gi"
             limits:
               cpu: "1000m"
-              memory: "2Gi"
+              memory: "8Gi"
       volumes:
         - name: vespa-certificates
           secret:
@@ -14,8 +14,8 @@ spec:
     spec:
      containers:
         - name: celery-worker-light
-          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
-          imagePullPolicy: Always
+          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.12
+          imagePullPolicy: IfNotPresent
           command:
             [
               "celery",
@@ -14,8 +14,8 @@ spec:
     spec:
       containers:
         - name: celery-worker-primary
-          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
-          imagePullPolicy: Always
+          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.12
+          imagePullPolicy: IfNotPresent
           command:
             [
               "celery",