Compare commits

...

9 Commits

Author SHA1 Message Date
pablonyx
86cc29ae43 k 2025-03-06 16:43:51 -08:00
pablonyx
99f2c3e87f k 2025-03-06 16:43:07 -08:00
pablonyx
05cb7bfd51 nit 2025-03-06 16:39:39 -08:00
pablonyx
ee89591060 quick destructive nit 2025-03-06 15:14:34 -08:00
pablonyx
44cf923ab7 k 2025-03-06 14:47:55 -08:00
pablonyx
3ca97648f5 quick nit 2025-03-06 14:47:55 -08:00
pablonyx
b8b20585e1 cleaner 2025-03-06 14:47:53 -08:00
pablonyx
55b0b02068 k 2025-03-06 14:47:24 -08:00
pablonyx
2b1ec9c454 k 2025-03-06 14:47:24 -08:00
11 changed files with 414 additions and 74 deletions

View File

@@ -0,0 +1,33 @@
"""add new available tenant table
Revision ID: 3b45e0018bf1
Revises: 34e3630c7f32
Create Date: 2025-03-06 09:55:18.229910
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "3b45e0018bf1"
down_revision = "34e3630c7f32"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create new_available_tenant table
op.create_table(
"available_tenant",
sa.Column("tenant_id", sa.String(), nullable=False),
sa.Column("alembic_version", sa.String(), nullable=False),
sa.Column("date_created", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("tenant_id"),
)
def downgrade() -> None:
# Drop new_available_tenant table
op.drop_table("available_tenant")

View File

@@ -31,6 +31,7 @@ from onyx.db.engine import get_sqlalchemy_engine
from onyx.db.llm import update_default_provider
from onyx.db.llm import upsert_cloud_embedding_provider
from onyx.db.llm import upsert_llm_provider
from onyx.db.models import AvailableTenant
from onyx.db.models import IndexModelStatus
from onyx.db.models import SearchSettings
from onyx.db.models import UserTenantMapping
@@ -66,12 +67,28 @@ async def get_or_provision_tenant(
if referral_source and request:
await submit_to_hubspot(email, referral_source, request)
tenant_id: str | None = None
try:
# First, check if the user already has a tenant
tenant_id = get_tenant_id_for_email(email)
return tenant_id
except exceptions.UserNotExists:
# If tenant does not exist and in Multi tenant mode, provision a new tenant
# User doesn't exist, so we need to create a new tenant or assign an existing one
try:
tenant_id = await create_tenant(email, referral_source)
# Try to get a pre-provisioned tenant
tenant_id = await get_available_tenant()
if tenant_id:
# If we have a pre-provisioned tenant, assign it to the user
await assign_tenant_to_user(tenant_id, email, referral_source)
logger.info(
f"Assigned pre-provisioned tenant {tenant_id} to user {email}"
)
else:
# If no pre-provisioned tenant is available, create a new one on-demand
tenant_id = await create_tenant(email, referral_source)
except Exception as e:
logger.error(f"Tenant provisioning failed: {e}")
raise HTTPException(status_code=500, detail="Failed to provision tenant.")
@@ -85,13 +102,19 @@ async def get_or_provision_tenant(
async def create_tenant(email: str, referral_source: str | None = None) -> str:
"""
Create a new tenant on-demand when no pre-provisioned tenants are available.
This is the fallback method when we can't use a pre-provisioned tenant.
"""
tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4())
try:
# Provision tenant on data plane
await provision_tenant(tenant_id, email)
# Notify control plane
if not DEV_MODE:
# Notify control plane if not already done in provision_tenant
if not DEV_MODE and referral_source:
await notify_control_plane(tenant_id, email, referral_source)
except Exception as e:
logger.error(f"Tenant provisioning failed: {e}")
await rollback_tenant_provisioning(tenant_id)
@@ -109,54 +132,25 @@ async def provision_tenant(tenant_id: str, email: str) -> None:
)
logger.debug(f"Provisioning tenant {tenant_id} for user {email}")
token = None
try:
# Create the schema for the tenant
if not create_schema_if_not_exists(tenant_id):
logger.debug(f"Created schema for tenant {tenant_id}")
else:
logger.debug(f"Schema already exists for tenant {tenant_id}")
token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)
# Set up the tenant with all necessary configurations
await setup_tenant(tenant_id)
# Await the Alembic migrations
await asyncio.to_thread(run_alembic_migrations, tenant_id)
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
configure_default_api_keys(db_session)
current_search_settings = (
db_session.query(SearchSettings)
.filter_by(status=IndexModelStatus.FUTURE)
.first()
)
cohere_enabled = (
current_search_settings is not None
and current_search_settings.provider_type == EmbeddingProvider.COHERE
)
setup_onyx(db_session, tenant_id, cohere_enabled=cohere_enabled)
add_users_to_tenant([email], tenant_id)
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
create_milestone_and_report(
user=None,
distinct_id=tenant_id,
event_type=MilestoneRecordType.TENANT_CREATED,
properties={
"email": email,
},
db_session=db_session,
)
# Assign the tenant to the user
await assign_tenant_to_user(tenant_id, email)
except Exception as e:
logger.exception(f"Failed to create tenant {tenant_id}")
raise HTTPException(
status_code=500, detail=f"Failed to create tenant: {str(e)}"
)
finally:
if token is not None:
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
async def notify_control_plane(
@@ -353,3 +347,97 @@ async def delete_user_from_control_plane(tenant_id: str, email: str) -> None:
raise Exception(
f"Failed to delete tenant on control plane: {error_text}"
)
async def get_available_tenant() -> str | None:
"""
Get an available pre-provisioned tenant from the NewAvailableTenant table.
Returns the tenant_id if one is available, None otherwise.
"""
if not MULTI_TENANT:
return None
try:
with Session(get_sqlalchemy_engine()) as db_session:
# Get the oldest available tenant
available_tenant = (
db_session.query(AvailableTenant)
.order_by(AvailableTenant.date_created)
.first()
)
if available_tenant:
tenant_id = available_tenant.tenant_id
# Remove the tenant from the available tenants table
db_session.delete(available_tenant)
db_session.commit()
logger.info(f"Using pre-provisioned tenant {tenant_id}")
return tenant_id
except Exception as e:
logger.error(f"Error getting available tenant: {e}")
return None
async def setup_tenant(tenant_id: str) -> None:
"""
Set up a tenant with all necessary configurations.
This is a centralized function that handles all tenant setup logic.
"""
token = None
try:
token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)
# Run Alembic migrations
await asyncio.to_thread(run_alembic_migrations, tenant_id)
# Configure the tenant with default settings
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
# Configure default API keys
configure_default_api_keys(db_session)
# Set up Onyx with appropriate settings
current_search_settings = (
db_session.query(SearchSettings)
.filter_by(status=IndexModelStatus.FUTURE)
.first()
)
cohere_enabled = (
current_search_settings is not None
and current_search_settings.provider_type == EmbeddingProvider.COHERE
)
setup_onyx(db_session, tenant_id, cohere_enabled=cohere_enabled)
except Exception as e:
logger.exception(f"Failed to set up tenant {tenant_id}")
raise e
finally:
if token is not None:
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
async def assign_tenant_to_user(
tenant_id: str, email: str, referral_source: str | None = None
) -> None:
"""
Assign a tenant to a user and perform necessary operations.
"""
# Add the user to the tenant
add_users_to_tenant([email], tenant_id)
# Create milestone record
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
create_milestone_and_report(
user=None,
distinct_id=tenant_id,
event_type=MilestoneRecordType.TENANT_CREATED,
properties={
"email": email,
},
db_session=db_session,
)
# Notify control plane
if not DEV_MODE:
await notify_control_plane(tenant_id, email, referral_source)

View File

@@ -74,3 +74,21 @@ def drop_schema(tenant_id: str) -> None:
text("DROP SCHEMA IF EXISTS %(schema_name)s CASCADE"),
{"schema_name": tenant_id},
)
def get_current_alembic_version(tenant_id: str) -> str:
"""Get the current Alembic version for a tenant."""
from alembic.runtime.migration import MigrationContext
from sqlalchemy import text
engine = get_sqlalchemy_engine()
# Set the search path to the tenant's schema
with engine.connect() as connection:
connection.execute(text(f'SET search_path TO "{tenant_id}"'))
# Get the current version from the alembic_version table
context = MigrationContext.configure(connection)
current_rev = context.get_current_revision()
return current_rev or "head"

View File

@@ -111,5 +111,6 @@ celery_app.autodiscover_tasks(
"onyx.background.celery.tasks.vespa",
"onyx.background.celery.tasks.connector_deletion",
"onyx.background.celery.tasks.doc_permission_syncing",
"onyx.background.celery.tasks.tenant_provisioning",
]
)

View File

@@ -20,7 +20,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds)
# hack to slow down task dispatch in the cloud until
# we have a better implementation (backpressure, etc)
# Note that DynamicTenantScheduler can adjust the runtime value for this via Redis
CLOUD_BEAT_MULTIPLIER_DEFAULT = 8.0
CLOUD_BEAT_MULTIPLIER_DEFAULT = 8
# tasks that run in either self-hosted on cloud
beat_task_templates: list[dict] = []
@@ -167,6 +167,16 @@ beat_cloud_tasks: list[dict] = [
"expires": BEAT_EXPIRES_DEFAULT,
},
},
{
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-available-tenants",
"task": OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
"schedule": timedelta(minutes=10),
"options": {
"queue": OnyxCeleryQueues.MONITORING,
"priority": OnyxCeleryPriority.HIGH,
"expires": BEAT_EXPIRES_DEFAULT,
},
},
]
# tasks that only run self hosted

View File

@@ -0,0 +1,176 @@
"""
Periodic tasks for tenant pre-provisioning.
"""
import asyncio
import datetime
import uuid
from celery import shared_task
from celery import Task
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from ee.onyx.server.tenants.provisioning import setup_tenant
from ee.onyx.server.tenants.schema_management import create_schema_if_not_exists
from ee.onyx.server.tenants.schema_management import get_current_alembic_version
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.app_configs import TARGET_AVAILABLE_TENANTS
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine import get_sqlalchemy_engine
from onyx.db.models import AvailableTenant
from onyx.redis.redis_pool import get_redis_client
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import TENANT_ID_PREFIX
# Default number of pre-provisioned tenants to maintain
DEFAULT_TARGET_AVAILABLE_TENANTS = 5
# Soft time limit for tenant pre-provisioning tasks (in seconds)
_TENANT_PROVISIONING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes
# Hard time limit for tenant pre-provisioning tasks (in seconds)
_TENANT_PROVISIONING_TIME_LIMIT = 60 * 10 # 10 minutes
@shared_task(
name=OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
ignore_result=True,
soft_time_limit=JOB_TIMEOUT,
trail=False,
bind=True,
)
def check_available_tenants(self: Task) -> None:
"""
Check if we have enough pre-provisioned tenants available.
If not, trigger the pre-provisioning of new tenants.
"""
task_logger.info("STARTING CHECK_AVAILABLE_TENANTS")
if not MULTI_TENANT:
task_logger.info(
"Multi-tenancy is not enabled, skipping tenant pre-provisioning"
)
return
r = get_redis_client()
lock_check: RedisLock = r.lock(
OnyxRedisLocks.CHECK_AVAILABLE_TENANTS_LOCK,
timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
)
# These tasks should never overlap
if not lock_check.acquire(blocking=False):
task_logger.info(
"Skipping check_available_tenants task because it is already running"
)
return
try:
# Get the current count of available tenants
with Session(get_sqlalchemy_engine()) as db_session:
available_tenants_count = db_session.query(AvailableTenant).count()
# Get the target number of available tenants
target_available_tenants = getattr(
TARGET_AVAILABLE_TENANTS, "value", DEFAULT_TARGET_AVAILABLE_TENANTS
)
# Calculate how many new tenants we need to provision
tenants_to_provision = max(
0, target_available_tenants - available_tenants_count
)
task_logger.info(
f"Available tenants: {available_tenants_count}, "
f"Target: {target_available_tenants}, "
f"To provision: {tenants_to_provision}"
)
# Trigger pre-provisioning tasks for each tenant needed
for _ in range(tenants_to_provision):
pre_provision_tenant.apply_async(
priority=OnyxCeleryPriority.LOW,
)
except Exception as e:
task_logger.exception(f"Error in check_available_tenants task: {e}")
finally:
lock_check.release()
@shared_task(
name=OnyxCeleryTask.PRE_PROVISION_TENANT,
ignore_result=True,
soft_time_limit=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
time_limit=_TENANT_PROVISIONING_TIME_LIMIT,
queue=OnyxCeleryQueues.PRIMARY,
bind=True,
)
def pre_provision_tenant(self: Task) -> None:
"""
Pre-provision a new tenant and store it in the NewAvailableTenant table.
This function fully sets up the tenant with all necessary configurations,
so it's ready to be assigned to a user immediately.
"""
task_logger.info("STARTING PRE_PROVISION_TENANT")
if not MULTI_TENANT:
task_logger.info(
"Multi-tenancy is not enabled, skipping tenant pre-provisioning"
)
return
r = get_redis_client()
lock_provision: RedisLock = r.lock(
OnyxRedisLocks.PRE_PROVISION_TENANT_LOCK,
timeout=_TENANT_PROVISIONING_SOFT_TIME_LIMIT,
)
# Allow multiple pre-provisioning tasks to run, but ensure they don't overlap
if not lock_provision.acquire(blocking=False):
task_logger.info(
"Skipping pre_provision_tenant task because it is already running"
)
return
try:
# Generate a new tenant ID
tenant_id = TENANT_ID_PREFIX + str(uuid.uuid4())
task_logger.info(f"Starting pre-provisioning for tenant {tenant_id}")
# Create the schema for the new tenant
schema_created = create_schema_if_not_exists(tenant_id)
if schema_created:
task_logger.info(f"Created schema for tenant '{tenant_id}'")
else:
task_logger.info(f"Schema already exists for tenant '{tenant_id}'")
# Set up the tenant with all necessary configurations
task_logger.info(f"Setting up tenant configuration for '{tenant_id}'")
asyncio.run(setup_tenant(tenant_id))
task_logger.info(f"Tenant configuration completed for '{tenant_id}'")
# Get the current Alembic version
alembic_version = get_current_alembic_version(tenant_id)
task_logger.info(
f"Tenant '{tenant_id}' using Alembic version: {alembic_version}"
)
# Store the pre-provisioned tenant in the database
task_logger.info(f"Storing pre-provisioned tenant '{tenant_id}' in database")
with Session(get_sqlalchemy_engine()) as db_session:
new_tenant = AvailableTenant(
tenant_id=tenant_id,
alembic_version=alembic_version,
date_created=datetime.datetime.now(),
)
db_session.add(new_tenant)
db_session.commit()
task_logger.info(f"Successfully pre-provisioned tenant {tenant_id}")
except Exception as e:
task_logger.exception(f"Error in pre_provision_tenant task: {e}")
finally:
lock_provision.release()

View File

@@ -643,3 +643,6 @@ MOCK_LLM_RESPONSE = (
DEFAULT_IMAGE_ANALYSIS_MAX_SIZE_MB = 20
# Number of pre-provisioned tenants to maintain
TARGET_AVAILABLE_TENANTS = int(os.environ.get("TARGET_AVAILABLE_TENANTS", "5"))

View File

@@ -321,6 +321,8 @@ class OnyxRedisLocks:
"da_lock:check_connector_external_group_sync_beat"
)
MONITOR_BACKGROUND_PROCESSES_LOCK = "da_lock:monitor_background_processes"
CHECK_AVAILABLE_TENANTS_LOCK = "da_lock:check_available_tenants"
PRE_PROVISION_TENANT_LOCK = "da_lock:pre_provision_tenant"
CONNECTOR_DOC_PERMISSIONS_SYNC_LOCK_PREFIX = (
"da_lock:connector_doc_permissions_sync"
@@ -383,6 +385,7 @@ class OnyxCeleryTask:
CLOUD_MONITOR_CELERY_QUEUES = (
f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor_celery_queues"
)
CHECK_AVAILABLE_TENANTS = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_available_tenants"
CHECK_FOR_CONNECTOR_DELETION = "check_for_connector_deletion_task"
CHECK_FOR_VESPA_SYNC_TASK = "check_for_vespa_sync_task"
@@ -399,6 +402,9 @@ class OnyxCeleryTask:
MONITOR_BACKGROUND_PROCESSES = "monitor_background_processes"
MONITOR_CELERY_QUEUES = "monitor_celery_queues"
# Tenant pre-provisioning
PRE_PROVISION_TENANT = "pre_provision_tenant"
KOMBU_MESSAGE_CLEANUP_TASK = "kombu_message_cleanup_task"
CONNECTOR_PERMISSION_SYNC_GENERATOR_TASK = (
"connector_permission_sync_generator_task"

View File

@@ -2310,6 +2310,17 @@ class UserTenantMapping(Base):
return value.lower() if value else value
class AvailableTenant(Base):
__tablename__ = "available_tenant"
"""
These entries will only exist ephemerally and are meant to be picked up by new users on registration.
"""
tenant_id: Mapped[str] = mapped_column(String, primary_key=True, nullable=False)
alembic_version: Mapped[str] = mapped_column(String, nullable=False)
date_created: Mapped[datetime.datetime] = mapped_column(DateTime, nullable=False)
# This is a mapping from tenant IDs to anonymous user paths
class TenantAnonymousUserPath(Base):
__tablename__ = "tenant_anonymous_user_path"

View File

@@ -1259,21 +1259,18 @@ export function createConnectorInitialValues(
name: "",
groups: [],
access_type: "public",
...configuration.values.reduce(
(acc, field) => {
if (field.type === "select") {
acc[field.name] = null;
} else if (field.type === "list") {
acc[field.name] = field.default || [];
} else if (field.type === "checkbox") {
acc[field.name] = field.default || false;
} else if (field.default !== undefined) {
acc[field.name] = field.default;
}
return acc;
},
{} as { [record: string]: any }
),
...configuration.values.reduce((acc, field) => {
if (field.type === "select") {
acc[field.name] = null;
} else if (field.type === "list") {
acc[field.name] = field.default || [];
} else if (field.type === "checkbox") {
acc[field.name] = field.default || false;
} else if (field.default !== undefined) {
acc[field.name] = field.default;
}
return acc;
}, {} as { [record: string]: any }),
};
}
@@ -1285,28 +1282,25 @@ export function createConnectorValidationSchema(
return Yup.object().shape({
access_type: Yup.string().required("Access Type is required"),
name: Yup.string().required("Connector Name is required"),
...configuration.values.reduce(
(acc, field) => {
let schema: any =
field.type === "select"
? Yup.string()
: field.type === "list"
? Yup.array().of(Yup.string())
: field.type === "checkbox"
? Yup.boolean()
: field.type === "file"
? Yup.mixed()
: Yup.string();
...configuration.values.reduce((acc, field) => {
let schema: any =
field.type === "select"
? Yup.string()
: field.type === "list"
? Yup.array().of(Yup.string())
: field.type === "checkbox"
? Yup.boolean()
: field.type === "file"
? Yup.mixed()
: Yup.string();
if (!field.optional) {
schema = schema.required(`${field.label} is required`);
}
if (!field.optional) {
schema = schema.required(`${field.label} is required`);
}
acc[field.name] = schema;
return acc;
},
{} as Record<string, any>
),
acc[field.name] = schema;
return acc;
}, {} as Record<string, any>),
// These are advanced settings
indexingStart: Yup.string().nullable(),
pruneFreq: Yup.number().min(0, "Prune frequency must be non-negative"),