mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-02-20 01:05:46 +00:00
Compare commits
4 Commits
batch_proc
...
prevent_sp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
252cd06fcf | ||
|
|
cec05c5ee9 | ||
|
|
eaf054ef06 | ||
|
|
a7a1a24658 |
@@ -6,8 +6,7 @@ Create Date: 2025-02-26 13:07:56.217791
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import time
|
||||
from sqlalchemy import text
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "3bd4c84fe72f"
|
||||
@@ -28,357 +27,45 @@ depends_on = None
|
||||
# 4. Adds indexes to both chat_message and chat_session tables for comprehensive search
|
||||
|
||||
|
||||
def upgrade():
|
||||
# --- PART 1: chat_message table ---
|
||||
# Step 1: Add nullable column (quick, minimal locking)
|
||||
# op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv")
|
||||
# op.execute("DROP TRIGGER IF EXISTS chat_message_tsv_trigger ON chat_message")
|
||||
# op.execute("DROP FUNCTION IF EXISTS update_chat_message_tsv()")
|
||||
# op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv")
|
||||
# # Drop chat_session tsv trigger if it exists
|
||||
# op.execute("DROP TRIGGER IF EXISTS chat_session_tsv_trigger ON chat_session")
|
||||
# op.execute("DROP FUNCTION IF EXISTS update_chat_session_tsv()")
|
||||
# op.execute("ALTER TABLE chat_session DROP COLUMN IF EXISTS title_tsv")
|
||||
# raise Exception("Stop here")
|
||||
time.time()
|
||||
op.execute("ALTER TABLE chat_message ADD COLUMN IF NOT EXISTS message_tsv tsvector")
|
||||
|
||||
# Step 2: Create function and trigger for new/updated rows
|
||||
def upgrade() -> None:
|
||||
# Create a GIN index for full-text search on chat_message.message
|
||||
op.execute(
|
||||
"""
|
||||
CREATE OR REPLACE FUNCTION update_chat_message_tsv()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
NEW.message_tsv = to_tsvector('english', NEW.message);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql
|
||||
"""
|
||||
ALTER TABLE chat_message
|
||||
ADD COLUMN message_tsv tsvector
|
||||
GENERATED ALWAYS AS (to_tsvector('english', message)) STORED;
|
||||
"""
|
||||
)
|
||||
|
||||
# Create trigger in a separate execute call
|
||||
# Commit the current transaction before creating concurrent indexes
|
||||
op.execute("COMMIT")
|
||||
|
||||
op.execute(
|
||||
"""
|
||||
CREATE TRIGGER chat_message_tsv_trigger
|
||||
BEFORE INSERT OR UPDATE ON chat_message
|
||||
FOR EACH ROW EXECUTE FUNCTION update_chat_message_tsv()
|
||||
"""
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv
|
||||
ON chat_message
|
||||
USING GIN (message_tsv)
|
||||
"""
|
||||
)
|
||||
|
||||
# Step 3: Update existing rows in batches using Python
|
||||
time.time()
|
||||
|
||||
# Get connection and count total rows
|
||||
connection = op.get_bind()
|
||||
total_count_result = connection.execute(
|
||||
text("SELECT COUNT(*) FROM chat_message")
|
||||
).scalar()
|
||||
total_count = total_count_result if total_count_result is not None else 0
|
||||
batch_size = 5000
|
||||
batches = 0
|
||||
|
||||
# Calculate total batches needed
|
||||
total_batches = (
|
||||
(total_count + batch_size - 1) // batch_size if total_count > 0 else 0
|
||||
# Also add a stored tsvector column for chat_session.description
|
||||
op.execute(
|
||||
"""
|
||||
ALTER TABLE chat_session
|
||||
ADD COLUMN description_tsv tsvector
|
||||
GENERATED ALWAYS AS (to_tsvector('english', coalesce(description, ''))) STORED;
|
||||
"""
|
||||
)
|
||||
|
||||
# Process in batches - properly handling UUIDs by using OFFSET/LIMIT approach
|
||||
for batch_num in range(total_batches):
|
||||
offset = batch_num * batch_size
|
||||
# Commit again before creating the second concurrent index
|
||||
op.execute("COMMIT")
|
||||
|
||||
# Execute update for this batch using OFFSET/LIMIT which works with UUIDs
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
UPDATE chat_message
|
||||
SET message_tsv = to_tsvector('english', message)
|
||||
WHERE id IN (
|
||||
SELECT id FROM chat_message
|
||||
WHERE message_tsv IS NULL
|
||||
ORDER BY id
|
||||
LIMIT :batch_size OFFSET :offset
|
||||
)
|
||||
"""
|
||||
).bindparams(batch_size=batch_size, offset=offset)
|
||||
)
|
||||
|
||||
# Commit each batch
|
||||
connection.execute(text("COMMIT"))
|
||||
# Start a new transaction
|
||||
connection.execute(text("BEGIN"))
|
||||
|
||||
batches += 1
|
||||
|
||||
# Final check for any remaining NULL values
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
UPDATE chat_message SET message_tsv = to_tsvector('english', message)
|
||||
WHERE message_tsv IS NULL
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# Create GIN index concurrently
|
||||
connection.execute(text("COMMIT"))
|
||||
|
||||
time.time()
|
||||
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv
|
||||
ON chat_message USING GIN (message_tsv)
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# First drop the trigger as it won't be needed anymore
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
DROP TRIGGER IF EXISTS chat_message_tsv_trigger ON chat_message;
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
DROP FUNCTION IF EXISTS update_chat_message_tsv();
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# Add new generated column
|
||||
time.time()
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
ALTER TABLE chat_message
|
||||
ADD COLUMN message_tsv_gen tsvector
|
||||
GENERATED ALWAYS AS (to_tsvector('english', message)) STORED;
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
connection.execute(text("COMMIT"))
|
||||
|
||||
time.time()
|
||||
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv_gen
|
||||
ON chat_message USING GIN (message_tsv_gen)
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# Drop old index and column
|
||||
connection.execute(text("COMMIT"))
|
||||
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
DROP INDEX CONCURRENTLY IF EXISTS idx_chat_message_tsv;
|
||||
"""
|
||||
)
|
||||
)
|
||||
connection.execute(text("COMMIT"))
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
ALTER TABLE chat_message DROP COLUMN message_tsv;
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# Rename new column to old name
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
ALTER TABLE chat_message RENAME COLUMN message_tsv_gen TO message_tsv;
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# --- PART 2: chat_session table ---
|
||||
|
||||
# Step 1: Add nullable column (quick, minimal locking)
|
||||
time.time()
|
||||
connection.execute(
|
||||
text(
|
||||
"ALTER TABLE chat_session ADD COLUMN IF NOT EXISTS description_tsv tsvector"
|
||||
)
|
||||
)
|
||||
|
||||
# Step 2: Create function and trigger for new/updated rows - SPLIT INTO SEPARATE CALLS
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
CREATE OR REPLACE FUNCTION update_chat_session_tsv()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
NEW.description_tsv = to_tsvector('english', COALESCE(NEW.description, ''));
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# Create trigger in a separate execute call
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
CREATE TRIGGER chat_session_tsv_trigger
|
||||
BEFORE INSERT OR UPDATE ON chat_session
|
||||
FOR EACH ROW EXECUTE FUNCTION update_chat_session_tsv()
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# Step 3: Update existing rows in batches using Python
|
||||
time.time()
|
||||
|
||||
# Get the maximum ID to determine batch count
|
||||
# Cast id to text for MAX function since it's a UUID
|
||||
max_id_result = connection.execute(
|
||||
text("SELECT COALESCE(MAX(id::text), '0') FROM chat_session")
|
||||
).scalar()
|
||||
max_id_result if max_id_result is not None else "0"
|
||||
batch_size = 5000
|
||||
batches = 0
|
||||
|
||||
# Get all IDs ordered to process in batches
|
||||
rows = connection.execute(
|
||||
text("SELECT id FROM chat_session ORDER BY id")
|
||||
).fetchall()
|
||||
total_rows = len(rows)
|
||||
|
||||
# Process in batches
|
||||
for batch_num, batch_start in enumerate(range(0, total_rows, batch_size)):
|
||||
batch_end = min(batch_start + batch_size, total_rows)
|
||||
batch_ids = [row[0] for row in rows[batch_start:batch_end]]
|
||||
|
||||
if not batch_ids:
|
||||
continue
|
||||
|
||||
# Use IN clause instead of BETWEEN for UUIDs
|
||||
placeholders = ", ".join([f":id{i}" for i in range(len(batch_ids))])
|
||||
params = {f"id{i}": id_val for i, id_val in enumerate(batch_ids)}
|
||||
|
||||
# Execute update for this batch
|
||||
connection.execute(
|
||||
text(
|
||||
f"""
|
||||
UPDATE chat_session
|
||||
SET description_tsv = to_tsvector('english', COALESCE(description, ''))
|
||||
WHERE id IN ({placeholders})
|
||||
AND description_tsv IS NULL
|
||||
"""
|
||||
).bindparams(**params)
|
||||
)
|
||||
|
||||
# Commit each batch
|
||||
connection.execute(text("COMMIT"))
|
||||
# Start a new transaction
|
||||
connection.execute(text("BEGIN"))
|
||||
|
||||
batches += 1
|
||||
|
||||
# Final check for any remaining NULL values
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
UPDATE chat_session SET description_tsv = to_tsvector('english', COALESCE(description, ''))
|
||||
WHERE description_tsv IS NULL
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# Create GIN index concurrently
|
||||
connection.execute(text("COMMIT"))
|
||||
|
||||
time.time()
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv
|
||||
ON chat_session USING GIN (description_tsv)
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# After Final check for chat_session
|
||||
# First drop the trigger as it won't be needed anymore
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
DROP TRIGGER IF EXISTS chat_session_tsv_trigger ON chat_session;
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
DROP FUNCTION IF EXISTS update_chat_session_tsv();
|
||||
"""
|
||||
)
|
||||
)
|
||||
# Add new generated column
|
||||
time.time()
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
ALTER TABLE chat_session
|
||||
ADD COLUMN description_tsv_gen tsvector
|
||||
GENERATED ALWAYS AS (to_tsvector('english', COALESCE(description, ''))) STORED;
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# Create new index on generated column
|
||||
connection.execute(text("COMMIT"))
|
||||
|
||||
time.time()
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv_gen
|
||||
ON chat_session USING GIN (description_tsv_gen)
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# Drop old index and column
|
||||
connection.execute(text("COMMIT"))
|
||||
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
DROP INDEX CONCURRENTLY IF EXISTS idx_chat_session_desc_tsv;
|
||||
"""
|
||||
)
|
||||
)
|
||||
connection.execute(text("COMMIT"))
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
ALTER TABLE chat_session DROP COLUMN description_tsv;
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
# Rename new column to old name
|
||||
connection.execute(
|
||||
text(
|
||||
"""
|
||||
ALTER TABLE chat_session RENAME COLUMN description_tsv_gen TO description_tsv;
|
||||
"""
|
||||
)
|
||||
op.execute(
|
||||
"""
|
||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv
|
||||
ON chat_session
|
||||
USING GIN (description_tsv)
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ from ee.onyx.server.enterprise_settings.api import (
|
||||
)
|
||||
from ee.onyx.server.manage.standard_answer import router as standard_answer_router
|
||||
from ee.onyx.server.middleware.tenant_tracking import add_tenant_id_middleware
|
||||
from ee.onyx.server.oauth.api import router as oauth_router
|
||||
from ee.onyx.server.oauth.api import router as ee_oauth_router
|
||||
from ee.onyx.server.query_and_chat.chat_backend import (
|
||||
router as chat_router,
|
||||
)
|
||||
@@ -128,7 +128,7 @@ def get_application() -> FastAPI:
|
||||
include_router_with_global_prefix_prepended(application, query_router)
|
||||
include_router_with_global_prefix_prepended(application, chat_router)
|
||||
include_router_with_global_prefix_prepended(application, standard_answer_router)
|
||||
include_router_with_global_prefix_prepended(application, oauth_router)
|
||||
include_router_with_global_prefix_prepended(application, ee_oauth_router)
|
||||
|
||||
# Enterprise-only global settings
|
||||
include_router_with_global_prefix_prepended(
|
||||
|
||||
@@ -51,6 +51,7 @@ from onyx.server.documents.cc_pair import router as cc_pair_router
|
||||
from onyx.server.documents.connector import router as connector_router
|
||||
from onyx.server.documents.credential import router as credential_router
|
||||
from onyx.server.documents.document import router as document_router
|
||||
from onyx.server.documents.standard_oauth import router as standard_oauth_router
|
||||
from onyx.server.features.document_set.api import router as document_set_router
|
||||
from onyx.server.features.folder.api import router as folder_router
|
||||
from onyx.server.features.input_prompt.api import (
|
||||
@@ -322,6 +323,7 @@ def get_application() -> FastAPI:
|
||||
)
|
||||
include_router_with_global_prefix_prepended(application, long_term_logs_router)
|
||||
include_router_with_global_prefix_prepended(application, api_key_router)
|
||||
include_router_with_global_prefix_prepended(application, standard_oauth_router)
|
||||
|
||||
if AUTH_TYPE == AuthType.DISABLED:
|
||||
# Server logs this during auth setup verification step
|
||||
|
||||
@@ -290,21 +290,24 @@ export function SettingsForm() {
|
||||
id="chatRetentionInput"
|
||||
placeholder="Infinite Retention"
|
||||
/>
|
||||
<Button
|
||||
onClick={handleSetChatRetention}
|
||||
variant="submit"
|
||||
size="sm"
|
||||
className="mr-3"
|
||||
>
|
||||
Set Retention Limit
|
||||
</Button>
|
||||
<Button
|
||||
onClick={handleClearChatRetention}
|
||||
variant="default"
|
||||
size="sm"
|
||||
>
|
||||
Retain All
|
||||
</Button>
|
||||
<div className="mr-auto flex gap-2">
|
||||
<Button
|
||||
onClick={handleSetChatRetention}
|
||||
variant="submit"
|
||||
size="sm"
|
||||
className="mr-auto"
|
||||
>
|
||||
Set Retention Limit
|
||||
</Button>
|
||||
<Button
|
||||
onClick={handleClearChatRetention}
|
||||
variant="default"
|
||||
size="sm"
|
||||
className="mr-auto"
|
||||
>
|
||||
Retain All
|
||||
</Button>
|
||||
</div>
|
||||
</>
|
||||
)}
|
||||
|
||||
|
||||
@@ -191,6 +191,7 @@ export const FolderDropdown = forwardRef<HTMLDivElement, FolderDropdownProps>(
|
||||
onChange={(e) => setNewFolderName(e.target.value)}
|
||||
className="text-sm font-medium bg-transparent outline-none w-full pb-1 border-b border-background-500 transition-colors duration-200"
|
||||
onKeyDown={(e) => {
|
||||
e.stopPropagation();
|
||||
if (e.key === "Enter") {
|
||||
handleEdit();
|
||||
}
|
||||
|
||||
@@ -303,7 +303,6 @@ const FolderItem = ({
|
||||
key={chatSession.id}
|
||||
chatSession={chatSession}
|
||||
isSelected={chatSession.id === currentChatId}
|
||||
skipGradient={isDragOver}
|
||||
showShareModal={showShareModal}
|
||||
showDeleteModal={showDeleteModal}
|
||||
/>
|
||||
|
||||
@@ -32,21 +32,17 @@ export function ChatSessionDisplay({
|
||||
chatSession,
|
||||
search,
|
||||
isSelected,
|
||||
skipGradient,
|
||||
closeSidebar,
|
||||
showShareModal,
|
||||
showDeleteModal,
|
||||
foldersExisting,
|
||||
isDragging,
|
||||
}: {
|
||||
chatSession: ChatSession;
|
||||
isSelected: boolean;
|
||||
search?: boolean;
|
||||
skipGradient?: boolean;
|
||||
closeSidebar?: () => void;
|
||||
showShareModal?: (chatSession: ChatSession) => void;
|
||||
showDeleteModal?: (chatSession: ChatSession) => void;
|
||||
foldersExisting?: boolean;
|
||||
isDragging?: boolean;
|
||||
}) {
|
||||
const router = useRouter();
|
||||
@@ -238,8 +234,12 @@ export function ChatSessionDisplay({
|
||||
e.preventDefault();
|
||||
e.stopPropagation();
|
||||
}}
|
||||
onChange={(e) => setChatName(e.target.value)}
|
||||
onChange={(e) => {
|
||||
setChatName(e.target.value);
|
||||
}}
|
||||
onKeyDown={(event) => {
|
||||
event.stopPropagation();
|
||||
|
||||
if (event.key === "Enter") {
|
||||
onRename();
|
||||
event.preventDefault();
|
||||
|
||||
@@ -264,7 +264,6 @@ export function PagesTab({
|
||||
>
|
||||
<ChatSessionDisplay
|
||||
chatSession={chat}
|
||||
foldersExisting={foldersExisting}
|
||||
isSelected={currentChatId === chat.id}
|
||||
showShareModal={showShareModal}
|
||||
showDeleteModal={showDeleteModal}
|
||||
|
||||
Reference in New Issue
Block a user