Compare commits

...

6 Commits

Author SHA1 Message Date
pablodanswer
23ad201ea6 add cohere default 2024-11-10 23:04:39 -08:00
pablodanswer
3ad98078f5 finalized keda 2024-11-10 18:40:08 -08:00
pablodanswer
0fb12b42f1 minor update 2024-11-10 17:46:33 -08:00
pablodanswer
158329a3cc finalize slackbot improvements 2024-11-10 17:45:13 -08:00
pablodanswer
7f1a50823b fix typing 2024-11-10 17:31:05 -08:00
pablodanswer
0e76bcef45 add improved cloud configuration 2024-11-10 17:28:43 -08:00
24 changed files with 14610 additions and 157 deletions

View File

@@ -6,6 +6,7 @@ from celery import signals
from celery import Task
from celery.signals import celeryd_init
from celery.signals import worker_init
from celery.signals import worker_process_init
from celery.signals import worker_ready
from celery.signals import worker_shutdown
@@ -81,6 +82,11 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
app_base.on_worker_shutdown(sender, **kwargs)
@worker_process_init.connect
def init_worker(**kwargs: Any) -> None:
SqlEngine.reset_engine()
@signals.setup_logging.connect
def on_setup_logging(
loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any

View File

@@ -1,96 +0,0 @@
from datetime import timedelta
from typing import Any
from celery.beat import PersistentScheduler # type: ignore
from celery.utils.log import get_task_logger
from danswer.db.engine import get_all_tenant_ids
from danswer.utils.variable_functionality import fetch_versioned_implementation
logger = get_task_logger(__name__)
class DynamicTenantScheduler(PersistentScheduler):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._reload_interval = timedelta(minutes=1)
self._last_reload = self.app.now() - self._reload_interval
def setup_schedule(self) -> None:
super().setup_schedule()
def tick(self) -> float:
retval = super().tick()
now = self.app.now()
if (
self._last_reload is None
or (now - self._last_reload) > self._reload_interval
):
logger.info("Reloading schedule to check for new tenants...")
self._update_tenant_tasks()
self._last_reload = now
return retval
def _update_tenant_tasks(self) -> None:
logger.info("Checking for tenant task updates...")
try:
tenant_ids = get_all_tenant_ids()
tasks_to_schedule = fetch_versioned_implementation(
"danswer.background.celery.tasks.beat_schedule", "get_tasks_to_schedule"
)
new_beat_schedule: dict[str, dict[str, Any]] = {}
current_schedule = getattr(self, "_store", {"entries": {}}).get(
"entries", {}
)
existing_tenants = set()
for task_name in current_schedule.keys():
if "-" in task_name:
existing_tenants.add(task_name.split("-")[-1])
for tenant_id in tenant_ids:
if tenant_id not in existing_tenants:
logger.info(f"Found new tenant: {tenant_id}")
for task in tasks_to_schedule():
task_name = f"{task['name']}-{tenant_id}"
new_task = {
"task": task["task"],
"schedule": task["schedule"],
"kwargs": {"tenant_id": tenant_id},
}
if options := task.get("options"):
new_task["options"] = options
new_beat_schedule[task_name] = new_task
if self._should_update_schedule(current_schedule, new_beat_schedule):
logger.info(
"Updating schedule",
extra={
"new_tasks": len(new_beat_schedule),
"current_tasks": len(current_schedule),
},
)
if not hasattr(self, "_store"):
self._store: dict[str, dict] = {"entries": {}}
self.update_from_dict(new_beat_schedule)
logger.info(f"New schedule: {new_beat_schedule}")
logger.info("Tenant tasks updated successfully")
else:
logger.debug("No schedule updates needed")
except (AttributeError, KeyError):
logger.exception("Failed to process task configuration")
except Exception:
logger.exception("Unexpected error updating tenant tasks")
def _should_update_schedule(
self, current_schedule: dict, new_schedule: dict
) -> bool:
"""Compare schedules to determine if an update is needed."""
current_tasks = set(current_schedule.keys())
new_tasks = set(new_schedule.keys())
return current_tasks != new_tasks

View File

@@ -8,7 +8,7 @@ tasks_to_schedule = [
{
"name": "check-for-vespa-sync",
"task": "check_for_vespa_sync_task",
"schedule": timedelta(seconds=5),
"schedule": timedelta(seconds=20),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
@@ -20,13 +20,13 @@ tasks_to_schedule = [
{
"name": "check-for-indexing",
"task": "check_for_indexing",
"schedule": timedelta(seconds=10),
"schedule": timedelta(seconds=15),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{
"name": "check-for-prune",
"task": "check_for_pruning",
"schedule": timedelta(seconds=10),
"schedule": timedelta(seconds=15),
"options": {"priority": DanswerCeleryPriority.HIGH},
},
{

View File

@@ -118,15 +118,13 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
embedding_model=embedding_model,
)
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
cc_pair_ids: list[int] = []
cc_pairs = fetch_connector_credential_pairs(db_session)
for cc_pair_entry in cc_pairs:
cc_pair_ids.append(cc_pair_entry.id)
for cc_pair_id in cc_pair_ids:
redis_connector = RedisConnector(tenant_id, cc_pair_id)
with get_session_with_tenant(tenant_id) as db_session:
for cc_pair_id in cc_pair_ids:
redis_connector = RedisConnector(tenant_id, cc_pair_id)
# Get the primary search settings
primary_search_settings = get_current_search_settings(db_session)
search_settings = [primary_search_settings]

View File

@@ -29,18 +29,26 @@ JobStatusType = (
def _initializer(
func: Callable, args: list | tuple, kwargs: dict[str, Any] | None = None
) -> Any:
"""Ensure the parent proc's database connections are not touched
in the new connection pool
"""Initialize the child process with a fresh SQLAlchemy Engine.
Based on the recommended approach in the SQLAlchemy docs found:
Based on SQLAlchemy's recommendations to handle multiprocessing:
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
"""
if kwargs is None:
kwargs = {}
logger.info("Initializing spawned worker child process.")
# Reset the engine in the child process
SqlEngine.reset_engine()
# Optionally set a custom app name for database logging purposes
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
# Initialize a new engine with desired parameters
SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
# Proceed with executing the target function
return func(*args, **kwargs)

View File

@@ -55,11 +55,11 @@ def validate_channel_names(
# Scaling configurations for multi-tenant Slack bot handling
TENANT_LOCK_EXPIRATION = 1800 # How long a pod can hold exclusive access to a tenant before other pods can acquire it
TENANT_HEARTBEAT_INTERVAL = (
60 # How often pods send heartbeats to indicate they are still processing a tenant
15 # How often pods send heartbeats to indicate they are still processing a tenant
)
TENANT_HEARTBEAT_EXPIRATION = 180 # How long before a tenant's heartbeat expires, allowing other pods to take over
TENANT_ACQUISITION_INTERVAL = (
60 # How often pods attempt to acquire unprocessed tenants
TENANT_HEARTBEAT_EXPIRATION = (
30 # How long before a tenant's heartbeat expires, allowing other pods to take over
)
TENANT_ACQUISITION_INTERVAL = 60 # How often pods attempt to acquire unprocessed tenants and checks for new tokens
MAX_TENANTS_PER_POD = int(os.getenv("MAX_TENANTS_PER_POD", 50))

View File

@@ -75,6 +75,7 @@ from danswer.search.retrieval.search_runner import download_nltk_data
from danswer.server.manage.models import SlackBotTokens
from danswer.utils.logger import setup_logger
from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
from shared_configs.configs import ALLOWED_SLACK_BOT_TENANT_LIST
from shared_configs.configs import MODEL_SERVER_HOST
from shared_configs.configs import MODEL_SERVER_PORT
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
@@ -164,9 +165,15 @@ class SlackbotHandler:
def acquire_tenants(self) -> None:
tenant_ids = get_all_tenant_ids()
logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres")
for tenant_id in tenant_ids:
if (
ALLOWED_SLACK_BOT_TENANT_LIST is not None
and tenant_id not in ALLOWED_SLACK_BOT_TENANT_LIST
):
logger.debug(f"Tenant {tenant_id} not in allowed list, skipping")
continue
if tenant_id in self.tenant_ids:
logger.debug(f"Tenant {tenant_id} already in self.tenant_ids")
continue
@@ -190,6 +197,9 @@ class SlackbotHandler:
continue
logger.debug(f"Acquired lock for tenant {tenant_id}")
self.tenant_ids.add(tenant_id)
for tenant_id in self.tenant_ids:
token = CURRENT_TENANT_ID_CONTEXTVAR.set(
tenant_id or POSTGRES_DEFAULT_SCHEMA
)
@@ -236,14 +246,14 @@ class SlackbotHandler:
self.slack_bot_tokens[tenant_id] = slack_bot_tokens
if tenant_id in self.socket_clients:
if self.socket_clients.get(tenant_id):
asyncio.run(self.socket_clients[tenant_id].close())
self.start_socket_client(tenant_id, slack_bot_tokens)
except KvKeyNotFoundError:
logger.debug(f"Missing Slack Bot tokens for tenant {tenant_id}")
if tenant_id in self.socket_clients:
if self.socket_clients.get(tenant_id):
asyncio.run(self.socket_clients[tenant_id].close())
del self.socket_clients[tenant_id]
del self.slack_bot_tokens[tenant_id]
@@ -277,14 +287,14 @@ class SlackbotHandler:
logger.info(f"Connecting socket client for tenant {tenant_id}")
socket_client.connect()
self.socket_clients[tenant_id] = socket_client
self.tenant_ids.add(tenant_id)
logger.info(f"Started SocketModeClient for tenant {tenant_id}")
def stop_socket_clients(self) -> None:
logger.info(f"Stopping {len(self.socket_clients)} socket clients")
for tenant_id, client in self.socket_clients.items():
asyncio.run(client.close())
logger.info(f"Stopped SocketModeClient for tenant {tenant_id}")
if client:
asyncio.run(client.close())
logger.info(f"Stopped SocketModeClient for tenant {tenant_id}")
def shutdown(self, signum: int | None, frame: FrameType | None) -> None:
if not self.running:
@@ -298,6 +308,16 @@ class SlackbotHandler:
logger.info(f"Stopping {len(self.socket_clients)} socket clients")
self.stop_socket_clients()
# Release locks for all tenants
logger.info(f"Releasing locks for {len(self.tenant_ids)} tenants")
for tenant_id in self.tenant_ids:
try:
redis_client = get_redis_client(tenant_id=tenant_id)
redis_client.delete(DanswerRedisLocks.SLACK_BOT_LOCK)
logger.info(f"Released lock for tenant {tenant_id}")
except Exception as e:
logger.error(f"Error releasing lock for tenant {tenant_id}: {e}")
# Wait for background threads to finish (with timeout)
logger.info("Waiting for background threads to finish...")
self.acquire_thread.join(timeout=5)

View File

@@ -38,6 +38,7 @@ from danswer.configs.app_configs import POSTGRES_USER
from danswer.configs.app_configs import USER_AUTH_SECRET
from danswer.configs.constants import POSTGRES_UNKNOWN_APP_NAME
from danswer.utils.logger import setup_logger
from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
from shared_configs.configs import TENANT_ID_PREFIX
@@ -189,6 +190,13 @@ class SqlEngine:
return ""
return cls._app_name
@classmethod
def reset_engine(cls) -> None:
with cls._lock:
if cls._engine:
cls._engine.dispose()
cls._engine = None
def get_all_tenant_ids() -> list[str] | list[None]:
if not MULTI_TENANT:
@@ -207,7 +215,14 @@ def get_all_tenant_ids() -> list[str] | list[None]:
valid_tenants = [
tenant
for tenant in tenant_ids
if tenant is None or tenant.startswith(TENANT_ID_PREFIX)
if tenant is None
or (
tenant.startswith(TENANT_ID_PREFIX)
and (
IGNORED_SYNCING_TENANT_LIST is None
or tenant not in IGNORED_SYNCING_TENANT_LIST
)
)
]
return valid_tenants

View File

@@ -63,6 +63,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
stmt = construct_document_select_for_connector_credential_pair_by_needs_sync(
cc_pair.connector_id, cc_pair.credential_id
)
for doc in db_session.scalars(stmt).yield_per(1):
current_time = time.monotonic()
if current_time - last_lock_time >= (

File diff suppressed because it is too large Load Diff

View File

@@ -91,7 +91,9 @@ def _create_indexable_chunks(
return list(ids_to_documents.values()), chunks
def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
def seed_initial_documents(
db_session: Session, tenant_id: str | None, cohere_enabled: bool = False
) -> None:
"""
Seed initial documents so users don't have an empty index to start
@@ -132,7 +134,9 @@ def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
return
search_settings = get_current_search_settings(db_session)
if search_settings.model_name != DEFAULT_DOCUMENT_ENCODER_MODEL:
if search_settings.model_name != DEFAULT_DOCUMENT_ENCODER_MODEL and not (
search_settings.model_name == "embed-english-v3.0" and cohere_enabled
):
logger.info("Embedding model has been updated, skipping")
return
@@ -174,8 +178,13 @@ def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
cc_pair_id = cast(int, result.data)
initial_docs_path = os.path.join(
os.getcwd(), "danswer", "seeding", "initial_docs.json"
os.getcwd(),
"danswer",
"seeding",
"initial_docs_cohere.json" if cohere_enabled else "initial_docs.json",
)
print(f"Loading docs from {initial_docs_path}")
processed_docs = json.load(open(initial_docs_path))
docs, chunks = _create_indexable_chunks(processed_docs, tenant_id)

View File

@@ -59,7 +59,9 @@ from shared_configs.model_server_models import SupportedEmbeddingModel
logger = setup_logger()
def setup_danswer(db_session: Session, tenant_id: str | None) -> None:
def setup_danswer(
db_session: Session, tenant_id: str | None, cohere_enabled: bool = False
) -> None:
"""
Setup Danswer for a particular tenant. In the Single Tenant case, it will set it up for the default schema
on server startup. In the MT case, it will be called when the tenant is created.
@@ -148,7 +150,7 @@ def setup_danswer(db_session: Session, tenant_id: str | None) -> None:
# update multipass indexing setting based on GPU availability
update_default_multipass_indexing(db_session)
seed_initial_documents(db_session, tenant_id)
seed_initial_documents(db_session, tenant_id, cohere_enabled)
def translate_saved_search_settings(db_session: Session) -> None:

View File

@@ -4,6 +4,7 @@ import uuid
import aiohttp # Async HTTP client
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session
from danswer.auth.users import exceptions
@@ -13,6 +14,8 @@ from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.llm import update_default_provider
from danswer.db.llm import upsert_cloud_embedding_provider
from danswer.db.llm import upsert_llm_provider
from danswer.db.models import IndexModelStatus
from danswer.db.models import SearchSettings
from danswer.db.models import UserTenantMapping
from danswer.llm.llm_provider_options import ANTHROPIC_MODEL_NAMES
from danswer.llm.llm_provider_options import ANTHROPIC_PROVIDER_NAME
@@ -102,9 +105,19 @@ async def provision_tenant(tenant_id: str, email: str) -> None:
await asyncio.to_thread(run_alembic_migrations, tenant_id)
with get_session_with_tenant(tenant_id) as db_session:
setup_danswer(db_session, tenant_id)
configure_default_api_keys(db_session)
current_search_settings = (
db_session.query(SearchSettings)
.filter_by(status=IndexModelStatus.FUTURE)
.first()
)
cohere_enabled = (
current_search_settings is not None
and current_search_settings.provider_type == EmbeddingProvider.COHERE
)
setup_danswer(db_session, tenant_id, cohere_enabled=cohere_enabled)
add_users_to_tenant([email], tenant_id)
except Exception as e:
@@ -200,11 +213,51 @@ def configure_default_api_keys(db_session: Session) -> None:
provider_type=EmbeddingProvider.COHERE,
api_key=COHERE_DEFAULT_API_KEY,
)
try:
logger.info("Attempting to upsert Cohere cloud embedding provider")
upsert_cloud_embedding_provider(db_session, cloud_embedding_provider)
except Exception as e:
logger.error(f"Failed to configure Cohere embedding provider: {e}")
logger.info("Successfully upserted Cohere cloud embedding provider")
logger.info("Updating search settings with Cohere embedding model details")
query = (
select(SearchSettings)
.where(SearchSettings.status == IndexModelStatus.FUTURE)
.order_by(SearchSettings.id.desc())
)
result = db_session.execute(query)
current_search_settings = result.scalars().first()
if current_search_settings:
current_search_settings.model_name = (
"embed-english-v3.0" # Cohere's latest model as of now
)
current_search_settings.model_dim = (
1024 # Cohere's embed-english-v3.0 dimension
)
current_search_settings.provider_type = EmbeddingProvider.COHERE
current_search_settings.index_name = (
"danswer_chunk_cohere_embed_english_v3_0"
)
current_search_settings.query_prefix = ""
current_search_settings.passage_prefix = ""
db_session.commit()
else:
raise RuntimeError(
"No search settings specified, DB is not in a valid state"
)
logger.info("Fetching updated search settings to verify changes")
updated_query = (
select(SearchSettings)
.where(SearchSettings.status == IndexModelStatus.PRESENT)
.order_by(SearchSettings.id.desc())
)
updated_result = db_session.execute(updated_query)
updated_result.scalars().first()
except Exception:
logger.exception("Failed to configure Cohere embedding provider")
else:
logger.error(
logger.info(
"COHERE_DEFAULT_API_KEY not set, skipping Cohere embedding provider configuration"
)

View File

@@ -142,6 +142,20 @@ async def async_return_default_schema(*args: Any, **kwargs: Any) -> str:
# Prefix used for all tenant ids
TENANT_ID_PREFIX = "tenant_"
ALLOWED_SLACK_BOT_TENANT_IDS = os.environ.get("ALLOWED_SLACK_BOT_TENANT_IDS")
ALLOWED_SLACK_BOT_TENANT_LIST = (
[tenant.strip() for tenant in ALLOWED_SLACK_BOT_TENANT_IDS.split(",")]
if ALLOWED_SLACK_BOT_TENANT_IDS
else None
)
IGNORED_SYNCING_TENANT_IDS = os.environ.get("IGNORED_SYNCING_TENANT_ID")
IGNORED_SYNCING_TENANT_LIST = (
[tenant.strip() for tenant in IGNORED_SYNCING_TENANT_IDS.split(",")]
if IGNORED_SYNCING_TENANT_IDS
else None
)
SUPPORTED_EMBEDDING_MODELS = [
# Cloud-based models
SupportedEmbeddingModel(

View File

@@ -9,12 +9,11 @@ spec:
scaleTargetRef:
name: celery-worker-indexing
minReplicaCount: 1
maxReplicaCount: 10
maxReplicaCount: 30
triggers:
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
port: "6379"
enableTLS: "true"
listName: connector_indexing
@@ -22,10 +21,10 @@ spec:
databaseIndex: "15"
authenticationRef:
name: celery-worker-auth
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
port: "6379"
enableTLS: "true"
listName: connector_indexing:2
@@ -36,7 +35,6 @@ spec:
- type: redis
metadata:
sslEnabled: "true"
host: "{host}"
port: "6379"
enableTLS: "true"
listName: connector_indexing:3
@@ -44,3 +42,12 @@ spec:
databaseIndex: "15"
authenticationRef:
name: celery-worker-auth
- type: cpu
metadata:
type: Utilization
value: "70"
- type: memory
metadata:
type: Utilization
value: "70"

View File

@@ -8,12 +8,11 @@ metadata:
spec:
scaleTargetRef:
name: celery-worker-light
minReplicaCount: 1
minReplicaCount: 5
maxReplicaCount: 20
triggers:
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync
@@ -23,7 +22,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync:2
@@ -33,7 +31,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: vespa_metadata_sync:3
@@ -43,7 +40,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: connector_deletion
@@ -53,7 +49,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: connector_deletion:2

View File

@@ -15,7 +15,6 @@ spec:
triggers:
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: celery
@@ -26,7 +25,6 @@ spec:
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: celery:1
@@ -36,7 +34,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: celery:2
@@ -46,7 +43,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: celery:3
@@ -56,7 +52,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: periodic_tasks
@@ -66,7 +61,6 @@ spec:
name: celery-worker-auth
- type: redis
metadata:
host: "{host}"
port: "6379"
enableTLS: "true"
listName: periodic_tasks:2

View File

@@ -0,0 +1,19 @@
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: indexing-model-server-scaledobject
namespace: danswer
labels:
app: indexing-model-server
spec:
scaleTargetRef:
name: indexing-model-server-deployment
pollingInterval: 15 # Check every 15 seconds
cooldownPeriod: 30 # Wait 30 seconds before scaling down
minReplicaCount: 1
maxReplicaCount: 14
triggers:
- type: cpu
metadata:
type: Utilization
value: "70"

View File

@@ -5,5 +5,5 @@ metadata:
namespace: danswer
type: Opaque
data:
host: { { base64-encoded-hostname } }
password: { { base64-encoded-password } }
host: { base64 encoded host here }
password: { base64 encoded password here }

View File

@@ -14,8 +14,8 @@ spec:
spec:
containers:
- name: celery-beat
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
imagePullPolicy: Always
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.10
imagePullPolicy: IfNotPresent
command:
[
"celery",

View File

@@ -14,8 +14,8 @@ spec:
spec:
containers:
- name: celery-worker-heavy
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
imagePullPolicy: Always
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.10
imagePullPolicy: IfNotPresent
command:
[
"celery",

View File

@@ -14,8 +14,8 @@ spec:
spec:
containers:
- name: celery-worker-indexing
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
imagePullPolicy: Always
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.10
imagePullPolicy: IfNotPresent
command:
[
"celery",
@@ -47,10 +47,10 @@ spec:
resources:
requests:
cpu: "500m"
memory: "1Gi"
memory: "4Gi"
limits:
cpu: "1000m"
memory: "2Gi"
memory: "8Gi"
volumes:
- name: vespa-certificates
secret:

View File

@@ -14,8 +14,8 @@ spec:
spec:
containers:
- name: celery-worker-light
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
imagePullPolicy: Always
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.10
imagePullPolicy: IfNotPresent
command:
[
"celery",

View File

@@ -14,8 +14,8 @@ spec:
spec:
containers:
- name: celery-worker-primary
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
imagePullPolicy: Always
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.10
imagePullPolicy: IfNotPresent
command:
[
"celery",