Compare commits


8 Commits

Author SHA1 Message Date
pablonyx 02322b8567 k 2025-03-11 16:03:02 -07:00
pablonyx fef08ffdc6 k 2025-03-11 16:02:10 -07:00
pablonyx 168d77a3d7 redis -> pg advisory lock (https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS) 2025-03-11 15:59:17 -07:00
pablonyx b077de1449 improved safety 2025-03-11 15:49:31 -07:00
pablonyx 307c07b12e k 2025-03-06 09:39:18 -08:00
pablonyx cec05c5ee9 Revert "k" (This reverts commit 687122911d.) 2025-03-06 09:38:31 -08:00
Richard Kuo (Danswer) eaf054ef06 oauth router went missing? 2025-03-05 15:50:23 -08:00
pablonyx a7a1a24658 minor nit 2025-03-05 15:35:02 -08:00
7 changed files with 339 additions and 219 deletions
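
The headline change in this compare is the swap from a Redis-based migration lock to a PostgreSQL session-level advisory lock (commit 168d77a3d7). A minimal standalone sketch of that pattern, assuming a SQLAlchemy engine and an illustrative DSN:

import hashlib

from sqlalchemy import create_engine, text

LOCK_KEY = int(hashlib.md5(b"alembic_migration_lock").hexdigest()[:15], 16)

engine = create_engine("postgresql://localhost/onyx")  # hypothetical DSN

with engine.connect() as conn:
    # pg_try_advisory_lock returns immediately: True if acquired, False if
    # another session already holds the key
    acquired = conn.execute(
        text("SELECT pg_try_advisory_lock(:k)").bindparams(k=LOCK_KEY)
    ).scalar()
    if not acquired:
        raise RuntimeError("another migration holds the lock")
    try:
        ...  # migration work, possibly spanning several transactions
    finally:
        conn.execute(text("SELECT pg_advisory_unlock(:k)").bindparams(k=LOCK_KEY))

The lock is session-scoped rather than transaction-scoped, so it survives the COMMIT/BEGIN dance the migration below does around CREATE INDEX CONCURRENTLY; an ordinary table or row lock would be released at the first COMMIT.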

View File

@@ -7,14 +7,25 @@ Create Date: 2025-02-26 13:07:56.217791
"""
from alembic import op
import time
import hashlib
from sqlalchemy import text
# Remove Redis import as we're not using it anymore
# from onyx.redis.redis_pool import get_redis_client
# from onyx.configs.app_configs import ALEMBIC_MIGRATION_LOCK_KEY
# revision identifiers, used by Alembic.
revision = "3bd4c84fe72f"
down_revision = "8f43500ee275"
branch_labels = None
depends_on = None
# Define a constant for our advisory lock
# Converting a string to a bigint for advisory lock
ALEMBIC_MIGRATION_LOCK_KEY = int(
hashlib.md5("alembic_migration_lock".encode()).hexdigest()[:15], 16
)
# NOTE:
# This migration addresses issues with the previous migration (8f43500ee275) which caused
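
A note on the [:15] slice above: PostgreSQL advisory-lock keys are signed 64-bit integers, and 15 hex digits is only 60 bits, so the derived key always fits. A quick sanity check (illustrative, not part of the diff):

import hashlib

# 15 hex digits = 60 bits, safely below the signed-bigint max of 2**63 - 1
key = int(hashlib.md5("alembic_migration_lock".encode()).hexdigest()[:15], 16)
assert key < 2**63
print(key)  # deterministic: every process derives the same lock key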
@@ -28,24 +39,63 @@ depends_on = None
# 4. Adds indexes to both chat_message and chat_session tables for comprehensive search
def upgrade() -> None:
    # Use PostgreSQL advisory locks to ensure only one migration runs at a time
    connection = op.get_bind()

    # Try to acquire an advisory lock (exclusive, session level)
    lock_acquired = connection.execute(
        text("SELECT pg_try_advisory_lock(:lock_key)").bindparams(
            lock_key=ALEMBIC_MIGRATION_LOCK_KEY
        )
    ).scalar()

    if not lock_acquired:
        raise Exception(
            "Migration already in progress by another process. Try again later."
        )

    try:
        # --- PART 1: chat_message table ---
        # Step 1: Add nullable column (quick, minimal locking)
        op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv_gen")
        op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv")
        op.execute("DROP TRIGGER IF EXISTS chat_message_tsv_trigger ON chat_message")
        op.execute("DROP FUNCTION IF EXISTS update_chat_message_tsv()")
        op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv")

        # Drop chat_session tsv trigger if it exists
        op.execute("DROP TRIGGER IF EXISTS chat_session_tsv_trigger ON chat_session")
        op.execute("DROP FUNCTION IF EXISTS update_chat_session_tsv()")
        op.execute("ALTER TABLE chat_session DROP COLUMN IF EXISTS title_tsv")

        # Drop all indexes that will be created later (using CONCURRENTLY to avoid locking)
        op.execute("COMMIT")  # Required for CONCURRENTLY
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_chat_message_tsv")
        op.execute("COMMIT")
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_chat_message_tsv_gen")
        op.execute("COMMIT")
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_chat_session_desc_tsv")
        op.execute("COMMIT")
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_chat_session_desc_tsv_gen")
        op.execute("COMMIT")
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_chat_message_message_lower")
        op.execute("COMMIT")

        # Drop any column on chat_session that will be created
        op.execute("ALTER TABLE chat_session DROP COLUMN IF EXISTS description_tsv")
        op.execute("ALTER TABLE chat_session DROP COLUMN IF EXISTS description_tsv_gen")

        # Begin a new transaction before continuing
        op.execute("BEGIN")

        time.time()
        op.execute(
            "ALTER TABLE chat_message ADD COLUMN IF NOT EXISTS message_tsv tsvector"
        )

        # Step 2: Create function and trigger for new/updated rows
        op.execute(
            """
            CREATE OR REPLACE FUNCTION update_chat_message_tsv()
            RETURNS TRIGGER AS $$
            BEGIN
@@ -54,42 +104,42 @@ def upgrade():
            END;
            $$ LANGUAGE plpgsql
            """
        )

        # Create trigger in a separate execute call
        op.execute(
            """
            CREATE TRIGGER chat_message_tsv_trigger
            BEFORE INSERT OR UPDATE ON chat_message
            FOR EACH ROW EXECUTE FUNCTION update_chat_message_tsv()
            """
        )

        # Step 3: Update existing rows in batches using Python
        time.time()

        # Get connection and count total rows
        connection = op.get_bind()
        total_count_result = connection.execute(
            text("SELECT COUNT(*) FROM chat_message")
        ).scalar()
        total_count = total_count_result if total_count_result is not None else 0
        batch_size = 5000
        batches = 0

        # Calculate total batches needed
        total_batches = (
            (total_count + batch_size - 1) // batch_size if total_count > 0 else 0
        )

        # Process in batches - properly handling UUIDs by using OFFSET/LIMIT approach
        for batch_num in range(total_batches):
            offset = batch_num * batch_size

            # Execute update for this batch using OFFSET/LIMIT which works with UUIDs
            connection.execute(
                text(
                    """
                    UPDATE chat_message
                    SET message_tsv = to_tsvector('english', message)
                    WHERE id IN (
@@ -99,124 +149,124 @@ def upgrade():
                    LIMIT :batch_size OFFSET :offset
                    )
                    """
                ).bindparams(batch_size=batch_size, offset=offset)
            )

            # Commit each batch
            connection.execute(text("COMMIT"))
            # Start a new transaction
            connection.execute(text("BEGIN"))

            batches += 1

        # Final check for any remaining NULL values
        connection.execute(
            text(
                """
                UPDATE chat_message SET message_tsv = to_tsvector('english', message)
                WHERE message_tsv IS NULL
                """
            )
        )

        # Create GIN index concurrently
        connection.execute(text("COMMIT"))

        time.time()
        connection.execute(
            text(
                """
                CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv
                ON chat_message USING GIN (message_tsv)
                """
            )
        )

        # First drop the trigger as it won't be needed anymore
        connection.execute(
            text(
                """
                DROP TRIGGER IF EXISTS chat_message_tsv_trigger ON chat_message;
                """
            )
        )
        connection.execute(
            text(
                """
                DROP FUNCTION IF EXISTS update_chat_message_tsv();
                """
            )
        )

        # Add new generated column
        time.time()
        connection.execute(
            text(
                """
                ALTER TABLE chat_message
                ADD COLUMN message_tsv_gen tsvector
                GENERATED ALWAYS AS (to_tsvector('english', message)) STORED;
                """
            )
        )
        connection.execute(text("COMMIT"))

        time.time()
        connection.execute(
            text(
                """
                CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv_gen
                ON chat_message USING GIN (message_tsv_gen)
                """
            )
        )

        # Drop old index and column
        connection.execute(text("COMMIT"))
        connection.execute(
            text(
                """
                DROP INDEX CONCURRENTLY IF EXISTS idx_chat_message_tsv;
                """
            )
        )
        connection.execute(text("COMMIT"))
        connection.execute(
            text(
                """
                ALTER TABLE chat_message DROP COLUMN message_tsv;
                """
            )
        )

        # Rename new column to old name
        connection.execute(
            text(
                """
                ALTER TABLE chat_message RENAME COLUMN message_tsv_gen TO message_tsv;
                """
            )
        )

        # --- PART 2: chat_session table ---

        # Step 1: Add nullable column (quick, minimal locking)
        time.time()
        connection.execute(
            text(
                "ALTER TABLE chat_session ADD COLUMN IF NOT EXISTS description_tsv tsvector"
            )
        )

        # Step 2: Create function and trigger for new/updated rows - SPLIT INTO SEPARATE CALLS
        connection.execute(
            text(
                """
                CREATE OR REPLACE FUNCTION update_chat_session_tsv()
                RETURNS TRIGGER AS $$
                BEGIN
@@ -225,161 +275,177 @@ def upgrade():
                END;
                $$ LANGUAGE plpgsql
                """
            )
        )

        # Create trigger in a separate execute call
        connection.execute(
            text(
                """
                CREATE TRIGGER chat_session_tsv_trigger
                BEFORE INSERT OR UPDATE ON chat_session
                FOR EACH ROW EXECUTE FUNCTION update_chat_session_tsv()
                """
            )
        )

        # Step 3: Update existing rows in batches using Python
        time.time()

        # Get the maximum ID to determine batch count
        # Cast id to text for MAX function since it's a UUID
        max_id_result = connection.execute(
            text("SELECT COALESCE(MAX(id::text), '0') FROM chat_session")
        ).scalar()
        max_id_result if max_id_result is not None else "0"
        batch_size = 5000
        batches = 0

        # Get all IDs ordered to process in batches
        rows = connection.execute(
            text("SELECT id FROM chat_session ORDER BY id")
        ).fetchall()
        total_rows = len(rows)

        # Process in batches
        for batch_num, batch_start in enumerate(range(0, total_rows, batch_size)):
            batch_end = min(batch_start + batch_size, total_rows)
            batch_ids = [row[0] for row in rows[batch_start:batch_end]]

            if not batch_ids:
                continue

            # Use IN clause instead of BETWEEN for UUIDs
            placeholders = ", ".join([f":id{i}" for i in range(len(batch_ids))])
            params = {f"id{i}": id_val for i, id_val in enumerate(batch_ids)}

            # Execute update for this batch
            connection.execute(
                text(
                    f"""
                    UPDATE chat_session
                    SET description_tsv = to_tsvector('english', COALESCE(description, ''))
                    WHERE id IN ({placeholders})
                    AND description_tsv IS NULL
                    """
                ).bindparams(**params)
            )

            # Commit each batch
            connection.execute(text("COMMIT"))
            # Start a new transaction
            connection.execute(text("BEGIN"))

            batches += 1

        # Final check for any remaining NULL values
        connection.execute(
            text(
                """
                UPDATE chat_session SET description_tsv = to_tsvector('english', COALESCE(description, ''))
                WHERE description_tsv IS NULL
                """
            )
        )

        # Create GIN index concurrently
        connection.execute(text("COMMIT"))

        time.time()
        connection.execute(
            text(
                """
                CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv
                ON chat_session USING GIN (description_tsv)
                """
            )
        )

        # After Final check for chat_session
        # First drop the trigger as it won't be needed anymore
        connection.execute(
            text(
                """
                DROP TRIGGER IF EXISTS chat_session_tsv_trigger ON chat_session;
                """
            )
        )
        connection.execute(
            text(
                """
                DROP FUNCTION IF EXISTS update_chat_session_tsv();
                """
            )
        )

        # Add new generated column
        time.time()
        connection.execute(
            text(
                """
                ALTER TABLE chat_session
                ADD COLUMN description_tsv_gen tsvector
                GENERATED ALWAYS AS (to_tsvector('english', COALESCE(description, ''))) STORED;
                """
            )
        )

        # Create new index on generated column
        connection.execute(text("COMMIT"))

        time.time()
        connection.execute(
            text(
                """
                CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv_gen
                ON chat_session USING GIN (description_tsv_gen)
                """
            )
        )

        # Drop old index and column
        connection.execute(text("COMMIT"))
        connection.execute(
            text(
                """
                DROP INDEX CONCURRENTLY IF EXISTS idx_chat_session_desc_tsv;
                """
            )
        )
        connection.execute(text("COMMIT"))
        connection.execute(
            text(
                """
                ALTER TABLE chat_session DROP COLUMN description_tsv;
                """
            )
        )

        # Rename new column to old name
        connection.execute(
            text(
                """
                ALTER TABLE chat_session RENAME COLUMN description_tsv_gen TO description_tsv;
                """
            )
        )
    except Exception as e:
        # Make sure to release the lock in case of error
        connection.execute(
            text("SELECT pg_advisory_unlock(:lock_key)").bindparams(
                lock_key=ALEMBIC_MIGRATION_LOCK_KEY
            )
        )
        raise e
    finally:
        # Release the advisory lock when done
        connection.execute(
            text("SELECT pg_advisory_unlock(:lock_key)").bindparams(
                lock_key=ALEMBIC_MIGRATION_LOCK_KEY
            )
        )
def downgrade() -> None:
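
Two patterns in this migration are worth restating in isolation: the per-batch COMMIT during the backfill, and committing Alembic's implicit transaction before CREATE INDEX CONCURRENTLY, which PostgreSQL refuses to run inside a transaction block. A sketch with illustrative table and index names, not from the repo:

from alembic import op
from sqlalchemy import text


def upgrade() -> None:
    connection = op.get_bind()

    # Backfill in batches, committing as we go so no single transaction
    # holds row locks on the whole table
    batch_size = 5000
    total = connection.execute(text("SELECT COUNT(*) FROM example_table")).scalar() or 0
    for batch_num in range((total + batch_size - 1) // batch_size):
        connection.execute(
            text(
                """
                UPDATE example_table
                SET example_tsv = to_tsvector('english', body)
                WHERE id IN (
                    SELECT id FROM example_table ORDER BY id
                    LIMIT :limit OFFSET :offset
                )
                """
            ).bindparams(limit=batch_size, offset=batch_num * batch_size)
        )
        connection.execute(text("COMMIT"))
        connection.execute(text("BEGIN"))

    # The index build must happen outside the migration's transaction
    op.execute("COMMIT")
    op.execute(
        "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_example_tsv "
        "ON example_table USING GIN (example_tsv)"
    )
    op.execute("BEGIN")  # re-open a transaction for any remaining DDL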

View File

backend/asdf.py Normal file

@@ -0,0 +1,45 @@
#!/usr/bin/env python
"""
Simple script that keeps trying to run 'alembic upgrade head' until it succeeds.
"""
import subprocess
import sys
import time

# Path to alembic.ini (change this if needed)
ALEMBIC_CONFIG = "alembic.ini"

# Time to wait between attempts (in seconds)
WAIT_TIME = 0

print("Starting continuous alembic upgrade attempts")
print(f"Using config: {ALEMBIC_CONFIG}")
print(f"Will retry every {WAIT_TIME} seconds until successful")

attempt = 1
while True:
    print(f"\nAttempt #{attempt} to run alembic upgrade head")
    try:
        # Run the alembic upgrade head command
        result = subprocess.run(
            ["alembic", "-c", ALEMBIC_CONFIG, "upgrade", "head"],
            check=True,
            capture_output=True,
            text=True,
        )
        # If we get here, the command was successful
        print("SUCCESS! Alembic upgrade completed successfully.")
        print(f"Output: {result.stdout}")
        sys.exit(0)
    except subprocess.CalledProcessError as e:
        # Command failed, print error and try again
        print(f"FAILED with return code {e.returncode}")
        print(f"Error output: {e.stderr}")
        print(f"Waiting {WAIT_TIME} seconds before next attempt...")
        time.sleep(WAIT_TIME)
        attempt += 1
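
With WAIT_TIME at 0 the loop retries immediately, so running several copies of this script at once (hypothetically, `python asdf.py` from the backend directory in multiple shells) is a crude way to confirm that only one process gets past pg_try_advisory_lock while the others keep failing and retrying.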

View File

@@ -15,7 +15,7 @@ from ee.onyx.server.enterprise_settings.api import (
)
from ee.onyx.server.manage.standard_answer import router as standard_answer_router
from ee.onyx.server.middleware.tenant_tracking import add_tenant_id_middleware
from ee.onyx.server.oauth.api import router as ee_oauth_router
from ee.onyx.server.query_and_chat.chat_backend import (
    router as chat_router,
)
@@ -128,7 +128,7 @@ def get_application() -> FastAPI:
    include_router_with_global_prefix_prepended(application, query_router)
    include_router_with_global_prefix_prepended(application, chat_router)
    include_router_with_global_prefix_prepended(application, standard_answer_router)
    include_router_with_global_prefix_prepended(application, ee_oauth_router)
    # Enterprise-only global settings
    include_router_with_global_prefix_prepended(
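
The rename matters because the MIT app below starts registering its own OAuth router, also exported as `router`; aliased imports keep the two from shadowing each other. A stand-in illustration (routers and prefixes are hypothetical, not the repo's wiring):

from fastapi import APIRouter, FastAPI

# Both home modules export the name `router`; aliases disambiguate them
ee_oauth_router = APIRouter(prefix="/ee/oauth")  # stand-in for ee.onyx.server.oauth.api
standard_oauth_router = APIRouter(prefix="/oauth")  # stand-in for onyx.server.documents.standard_oauth

app = FastAPI()
app.include_router(ee_oauth_router)
app.include_router(standard_oauth_router)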

View File

@@ -643,3 +643,7 @@ MOCK_LLM_RESPONSE = (
DEFAULT_IMAGE_ANALYSIS_MAX_SIZE_MB = 20
ALEMBIC_MIGRATION_LOCK_KEY = os.environ.get(
    "ALEMBIC_MIGRATION_LOCK_KEY", "alembic_migration_lock"
)

View File

@@ -51,6 +51,7 @@ from onyx.server.documents.cc_pair import router as cc_pair_router
from onyx.server.documents.connector import router as connector_router
from onyx.server.documents.credential import router as credential_router
from onyx.server.documents.document import router as document_router
from onyx.server.documents.standard_oauth import router as standard_oauth_router
from onyx.server.features.document_set.api import router as document_set_router
from onyx.server.features.folder.api import router as folder_router
from onyx.server.features.input_prompt.api import (
@@ -322,6 +323,7 @@ def get_application() -> FastAPI:
    )
    include_router_with_global_prefix_prepended(application, long_term_logs_router)
    include_router_with_global_prefix_prepended(application, api_key_router)
    include_router_with_global_prefix_prepended(application, standard_oauth_router)

    if AUTH_TYPE == AuthType.DISABLED:
        # Server logs this during auth setup verification step

View File

@@ -290,21 +290,24 @@ export function SettingsForm() {
id="chatRetentionInput"
placeholder="Infinite Retention"
/>
<Button
onClick={handleSetChatRetention}
variant="submit"
size="sm"
className="mr-3"
>
Set Retention Limit
</Button>
<Button
onClick={handleClearChatRetention}
variant="default"
size="sm"
>
Retain All
</Button>
<div className="mr-auto flex gap-2">
<Button
onClick={handleSetChatRetention}
variant="submit"
size="sm"
className="mr-auto"
>
Set Retention Limit
</Button>
<Button
onClick={handleClearChatRetention}
variant="default"
size="sm"
className="mr-auto"
>
Retain All
</Button>
</div>
</>
)}
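
Design note: grouping the two buttons in a single flex wrapper with gap-2 lets the container own the spacing and left-alignment, replacing the old per-button mr-3 margin, so the pair stays together if more controls are added to the row.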