Compare commits

...

6 Commits

Author SHA1 Message Date
pablodanswer
b733cbc755 nit 2025-01-17 13:25:04 -08:00
pablodanswer
c6f919cde2 bot -> app 2025-01-17 13:22:21 -08:00
pablodanswer
c15d88954c nit 2025-01-17 13:21:11 -08:00
pablodanswer
52952cabcb nit 2025-01-17 13:21:01 -08:00
pablodanswer
dfbdd1db81 update 2025-01-17 13:16:38 -08:00
pablodanswer
43f4c84280 initial pass 2025-01-17 13:07:03 -08:00

View File

@@ -159,10 +159,15 @@ class SlackbotHandler:
while not self._shutdown_event.is_set():
try:
self.acquire_tenants()
# After we finish acquiring and managing Slack bots,
# set the gauge to the number of active tenants (those with Slack bots).
active_tenants_gauge.labels(namespace=POD_NAMESPACE, pod=POD_NAME).set(
len(self.tenant_ids)
)
logger.debug(f"Current active tenants: {len(self.tenant_ids)}")
logger.debug(
f"Current active tenants with Slack bots: {len(self.tenant_ids)}"
)
except Exception as e:
logger.exception(f"Error in Slack acquisition: {e}")
self._shutdown_event.wait(timeout=TENANT_ACQUISITION_INTERVAL)
@@ -171,7 +176,9 @@ class SlackbotHandler:
while not self._shutdown_event.is_set():
try:
self.send_heartbeats()
logger.debug(f"Sent heartbeats for {len(self.tenant_ids)} tenants")
logger.debug(
f"Sent heartbeats for {len(self.tenant_ids)} active tenants"
)
except Exception as e:
logger.exception(f"Error in heartbeat loop: {e}")
self._shutdown_event.wait(timeout=TENANT_HEARTBEAT_INTERVAL)
@@ -185,11 +192,10 @@ class SlackbotHandler:
)
tenant_bot_pair = (tenant_id, bot.id)
# If the tokens are not set, we need to close the socket client and delete the tokens
# for the tenant and app
# If the tokens are missing or empty, close the socket client and remove them.
if not slack_bot_tokens:
logger.debug(
f"No Slack bot token found for tenant {tenant_id}, bot {bot.id}"
f"No Slack bot tokens found for tenant={tenant_id}, bot {bot.id}"
)
if tenant_bot_pair in self.socket_clients:
asyncio.run(self.socket_clients[tenant_bot_pair].close())
@@ -204,9 +210,10 @@ class SlackbotHandler:
if not tokens_exist or tokens_changed:
if tokens_exist:
logger.info(
f"Slack Bot tokens have changed for tenant {tenant_id}, bot {bot.id} - reconnecting"
f"Slack Bot tokens changed for tenant={tenant_id}, bot {bot.id}; reconnecting"
)
else:
# Warm up the model if needed
search_settings = get_current_search_settings(db_session)
embedding_model = EmbeddingModel.from_db_model(
search_settings=search_settings,
@@ -217,29 +224,38 @@ class SlackbotHandler:
self.slack_bot_tokens[tenant_bot_pair] = slack_bot_tokens
# Close any existing connection first
if tenant_bot_pair in self.socket_clients:
asyncio.run(self.socket_clients[tenant_bot_pair].close())
self.start_socket_client(bot.id, tenant_id, slack_bot_tokens)
def acquire_tenants(self) -> None:
tenant_ids = get_all_tenant_ids()
"""
- Attempt to acquire a lock for each tenant.
- If acquired, check if that tenant actually has Slack bots.
- If yes, store them in self.tenant_ids and manage the socket connections.
- If a tenant in self.tenant_ids no longer has Slack bots, remove it and release lock.
"""
all_tenants = get_all_tenant_ids()
for tenant_id in tenant_ids:
# 1) Try to acquire locks for new tenants
for tenant_id in all_tenants:
if (
DISALLOWED_SLACK_BOT_TENANT_LIST is not None
and tenant_id in DISALLOWED_SLACK_BOT_TENANT_LIST
):
logger.debug(f"Tenant {tenant_id} is in the disallowed list, skipping")
logger.debug(f"Tenant {tenant_id} is disallowed; skipping.")
continue
# Already acquired in a previous loop iteration?
if tenant_id in self.tenant_ids:
logger.debug(f"Tenant {tenant_id} already in self.tenant_ids")
continue
# Respect max tenant limit per pod
if len(self.tenant_ids) >= MAX_TENANTS_PER_POD:
logger.info(
f"Max tenants per pod reached ({MAX_TENANTS_PER_POD}) Not acquiring any more tenants"
f"Max tenants per pod reached ({MAX_TENANTS_PER_POD}); not acquiring more."
)
break
@@ -252,42 +268,111 @@ class SlackbotHandler:
ex=TENANT_LOCK_EXPIRATION,
)
if not acquired and not DEV_MODE:
logger.debug(f"Another pod holds the lock for tenant {tenant_id}")
logger.debug(
f"Another pod holds the lock for tenant {tenant_id}, skipping."
)
continue
logger.debug(f"Acquired lock for tenant {tenant_id}")
logger.debug(f"Acquired lock for tenant {tenant_id}.")
self.tenant_ids.add(tenant_id)
for tenant_id in self.tenant_ids:
# Now check if this tenant actually has Slack bots before adding to self.tenant_ids
token = CURRENT_TENANT_ID_CONTEXTVAR.set(
tenant_id or POSTGRES_DEFAULT_SCHEMA
)
try:
with get_session_with_tenant(tenant_id) as db_session:
bots: list[SlackBot] = []
try:
bots = fetch_slack_bots(db_session=db_session)
bots = list(fetch_slack_bots(db_session=db_session))
except KvKeyNotFoundError:
# No Slackbot tokens, pass
pass
except Exception as e:
logger.exception(
f"Error fetching Slack bots for tenant {tenant_id}: {e}"
)
if bots:
# Mark as active tenant
self.tenant_ids.add(tenant_id)
for bot in bots:
self._manage_clients_per_tenant(
db_session=db_session,
tenant_id=tenant_id,
bot=bot,
)
except KvKeyNotFoundError:
logger.debug(f"Missing Slack Bot tokens for tenant {tenant_id}")
if (tenant_id, bot.id) in self.socket_clients:
asyncio.run(self.socket_clients[tenant_id, bot.id].close())
del self.socket_clients[tenant_id, bot.id]
del self.slack_bot_tokens[tenant_id, bot.id]
except Exception as e:
logger.exception(f"Error handling tenant {tenant_id}: {e}")
elif not DEV_MODE:
# If no Slack bots, release lock immediately
redis_client.delete(OnyxRedisLocks.SLACK_BOT_LOCK)
logger.debug(
f"No Slack bots for tenant {tenant_id}; lock released."
)
finally:
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
# 2) Make sure tenants we're handling still have Slack bots
for tenant_id in list(self.tenant_ids):
token = CURRENT_TENANT_ID_CONTEXTVAR.set(
tenant_id or POSTGRES_DEFAULT_SCHEMA
)
try:
with get_session_with_tenant(tenant_id) as db_session:
# Attempt to fetch Slack bots
try:
bots = list(fetch_slack_bots(db_session=db_session))
except KvKeyNotFoundError:
# No Slackbot tokens, pass (and remove below)
bots = []
except Exception as e:
logger.exception(f"Error handling tenant {tenant_id}: {e}")
bots = []
if not bots:
logger.info(
f"Tenant {tenant_id} no longer has Slack bots. Removing."
)
self._remove_tenant(tenant_id)
else:
# Manage or reconnect Slack bot sockets
for bot in bots:
self._manage_clients_per_tenant(
db_session=db_session,
tenant_id=tenant_id,
bot=bot,
)
finally:
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
def _remove_tenant(self, tenant_id: str | None) -> None:
"""
Helper to remove a tenant from `self.tenant_ids`, close any socket clients,
and release the lock (if not in DEV_MODE).
"""
if tenant_id not in self.tenant_ids:
return
# Close all socket clients for this tenant
for (tenant_id, slack_bot_id), client in list(self.socket_clients.items()):
if tenant_id == tenant_id:
asyncio.run(client.close())
del self.socket_clients[(tenant_id, slack_bot_id)]
del self.slack_bot_tokens[(tenant_id, slack_bot_id)]
logger.info(
f"Stopped SocketModeClient for tenant: {tenant_id}, app: {slack_bot_id}"
)
# Remove from active set
self.tenant_ids.remove(tenant_id)
# Release the lock if not in dev mode
if not DEV_MODE:
redis_client = get_redis_client(tenant_id=tenant_id)
redis_client.delete(OnyxRedisLocks.SLACK_BOT_LOCK)
logger.info(f"Released lock for tenant {tenant_id}")
def send_heartbeats(self) -> None:
current_time = int(time.time())
logger.debug(f"Sending heartbeats for {len(self.tenant_ids)} tenants")
logger.debug(f"Sending heartbeats for {len(self.tenant_ids)} active tenants")
for tenant_id in self.tenant_ids:
redis_client = get_redis_client(tenant_id=tenant_id)
heartbeat_key = f"{OnyxRedisLocks.SLACK_BOT_HEARTBEAT_PREFIX}:{self.pod_id}"
@@ -315,6 +400,7 @@ class SlackbotHandler:
)
socket_client.connect()
self.socket_clients[tenant_id, slack_bot_id] = socket_client
# Ensure tenant is tracked as active
self.tenant_ids.add(tenant_id)
logger.info(
f"Started SocketModeClient for tenant: {tenant_id}, app: {slack_bot_id}"
@@ -322,7 +408,7 @@ class SlackbotHandler:
def stop_socket_clients(self) -> None:
logger.info(f"Stopping {len(self.socket_clients)} socket clients")
for (tenant_id, slack_bot_id), client in self.socket_clients.items():
for (tenant_id, slack_bot_id), client in list(self.socket_clients.items()):
asyncio.run(client.close())
logger.info(
f"Stopped SocketModeClient for tenant: {tenant_id}, app: {slack_bot_id}"
@@ -350,7 +436,7 @@ class SlackbotHandler:
except Exception as e:
logger.error(f"Error releasing lock for tenant {tenant_id}: {e}")
# Wait for background threads to finish (with timeout)
# Wait for background threads to finish (with a timeout)
logger.info("Waiting for background threads to finish...")
self.acquire_thread.join(timeout=5)
self.heartbeat_thread.join(timeout=5)