mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-03-21 07:32:41 +00:00
Compare commits
27 Commits
bo/query_p
...
v0.29.0-cl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3be4d3a8b8 | ||
|
|
5f7bac82ea | ||
|
|
1fd5a20088 | ||
|
|
d8b05eccd6 | ||
|
|
9eab1cd009 | ||
|
|
d01c4ea8bb | ||
|
|
91aae54d52 | ||
|
|
2a370e8b08 | ||
|
|
d0688d5508 | ||
|
|
f8c28537ce | ||
|
|
089e6b015d | ||
|
|
359518e3fa | ||
|
|
69364c9c5a | ||
|
|
4478857011 | ||
|
|
ff25452e39 | ||
|
|
159277ceb0 | ||
|
|
2ce3e8f1b6 | ||
|
|
983fca5bed | ||
|
|
43a63267b6 | ||
|
|
869d96a8ee | ||
|
|
de41ff3061 | ||
|
|
a2c19e63b4 | ||
|
|
5672527686 | ||
|
|
131c7614ee | ||
|
|
4f7f2dcc48 | ||
|
|
c4ee8d64cf | ||
|
|
54f0ad0cd2 |
65
.github/workflows/pr-file-store-tests.yml
vendored
Normal file
65
.github/workflows/pr-file-store-tests.yml
vendored
Normal file
@@ -0,0 +1,65 @@
|
||||
name: File Store Tests

on:
  merge_group:
  pull_request:
    branches: [main]

env:
  # AWS
  S3_AWS_ACCESS_KEY_ID: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
  S3_AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}

  # MinIO
  S3_ENDPOINT_URL: "http://localhost:9004"

jobs:
  file-store-tests:
    # See https://runs-on.com/runners/linux/
    runs-on: [runs-on, runner=8cpu-linux-x64, "run-id=${{ github.run_id }}"]

    env:
      PYTHONPATH: ./backend

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"
          cache-dependency-path: |
            backend/requirements/default.txt
            backend/requirements/dev.txt

      - name: Install Dependencies
        run: |
          python -m pip install --upgrade pip
          pip install --retries 5 --timeout 30 -r backend/requirements/default.txt
          pip install --retries 5 --timeout 30 -r backend/requirements/dev.txt
          playwright install chromium
          playwright install-deps chromium

      - name: Set up MinIO and Postgres
        run: |
          cd deployment/docker_compose
          docker compose -f docker-compose.dev.yml -p onyx-stack up -d minio relational_db

      - name: Run migrations
        run: |
          cd backend
          alembic upgrade head

      - name: Run Tests
        shell: script -q -e -c "bash --noprofile --norc -eo pipefail {0}"
        run: |
          py.test \
            -n 8 \
            --dist loadfile \
            --durations=8 \
            -o junit_family=xunit2 \
            -xv \
            --ff \
            backend/tests/daily/file_store
295
backend/alembic/versions/c9e2cd766c29_add_s3_file_store_table.py
Normal file
295
backend/alembic/versions/c9e2cd766c29_add_s3_file_store_table.py
Normal file
@@ -0,0 +1,295 @@
|
||||
"""modify_file_store_for_external_storage
|
||||
|
||||
Revision ID: c9e2cd766c29
|
||||
Revises: 03bf8be6b53a
|
||||
Create Date: 2025-06-13 14:02:09.867679
|
||||
|
||||
"""
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import text
|
||||
from typing import cast, Any
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from onyx.db._deprecated.pg_file_store import delete_lobj_by_id, read_lobj
|
||||
from onyx.file_store.file_store import get_s3_file_store
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "c9e2cd766c29"
|
||||
down_revision = "03bf8be6b53a"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Convert the legacy ``file_store`` table into ``file_record`` with
    external (S3/MinIO) storage support, then copy file contents out of
    PostgreSQL large objects into the external store.

    Designed to be re-runnable: if a previous run already applied (some of)
    the schema changes, the resulting "does not exist" errors are tolerated
    and the data migration is attempted again.
    """
    try:
        # Rename the table and loosen/extend its schema for external storage.
        op.rename_table("file_store", "file_record")

        # lobj_oid becomes optional: rows backed by external storage have none.
        op.alter_column("file_record", "lobj_oid", nullable=True)

        # Generic pointers into the external object store.
        op.add_column(
            "file_record", sa.Column("bucket_name", sa.String(), nullable=True)
        )
        op.add_column(
            "file_record", sa.Column("object_key", sa.String(), nullable=True)
        )

        # Audit timestamps, defaulted server-side so existing rows get values.
        for ts_column in ("created_at", "updated_at"):
            op.add_column(
                "file_record",
                sa.Column(
                    ts_column,
                    sa.DateTime(timezone=True),
                    server_default=sa.func.now(),
                    nullable=False,
                ),
            )

        op.alter_column("file_record", "file_name", new_column_name="file_id")
    except Exception as e:
        err_text = str(e)
        # A "does not exist" error means a prior partial run already applied
        # these schema changes — safe to keep going with the data migration.
        if (
            "does not exist" in err_text
            or 'relation "file_store" does not exist' in err_text
        ):
            print(
                f"Ran into error - {e}. Likely means we had a partial success in the past, continuing..."
            )
        else:
            raise

    print(
        "External storage configured - migrating files from PostgreSQL to external storage..."
    )
    # A mid-run failure leaves a partial migration; re-running the migration
    # picks up where it left off.
    _migrate_files_to_external_storage()
    print("File migration completed successfully!")

    # All content now lives externally — the large-object pointer can go.
    op.drop_column("file_record", "lobj_oid")
||||
|
||||
def downgrade() -> None:
    """Revert schema changes and migrate files from external storage back to PostgreSQL large objects."""

    print(
        "Reverting to PostgreSQL-backed file store – migrating files from external storage …"
    )

    # Step 1: re-introduce `lobj_oid` (nullable for now — rows are filled in
    # by the data migration below).
    op.add_column("file_record", sa.Column("lobj_oid", sa.Integer(), nullable=True))

    # Step 2: copy content from external storage back into PostgreSQL large
    # objects. The table is still named `file_record` at this point, so
    # application code keeps working while the copy runs.
    try:
        _migrate_files_to_postgres()
    except Exception:
        print("Error during downgrade migration, rolling back …")
        op.drop_column("file_record", "lobj_oid")
        raise

    # Step 3: every row now has a large object — enforce NOT NULL.
    op.alter_column("file_record", "lobj_oid", nullable=False)

    # Step 4: drop the columns that only make sense for external storage.
    for column_name in ("updated_at", "created_at", "object_key", "bucket_name"):
        op.drop_column("file_record", column_name)

    # Step 5: restore the legacy column name.
    op.alter_column("file_record", "file_id", new_column_name="file_name")

    # Step 6: restore the legacy table name expected by the old codebase.
    op.rename_table("file_record", "file_store")

    print(
        "Downgrade migration completed – files are now stored inside PostgreSQL again."
    )
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Helper: migrate from external storage (S3/MinIO) back into PostgreSQL large objects
|
||||
|
||||
|
||||
def _migrate_files_to_postgres() -> None:
    """Move any files whose content lives in external S3-compatible storage back into PostgreSQL.

    The logic mirrors *inverse* of `_migrate_files_to_external_storage` used on upgrade.

    Changes are flushed but NOT committed — the surrounding Alembic migration
    commits once the entire downgrade succeeds, keeping the downgrade atomic.
    """

    # Obtain DB session from Alembic context
    bind = op.get_bind()
    session = Session(bind=bind)

    # Fetch rows that have external storage pointers (bucket/object_key not NULL)
    result = session.execute(
        text(
            "SELECT file_id, bucket_name, object_key FROM file_record "
            "WHERE bucket_name IS NOT NULL AND object_key IS NOT NULL"
        )
    )

    files_to_migrate = [row[0] for row in result.fetchall()]
    total_files = len(files_to_migrate)

    if total_files == 0:
        print("No files found in external storage to migrate back to PostgreSQL.")
        return

    print(f"Found {total_files} files to migrate back to PostgreSQL large objects.")

    migrated_count = 0

    # only create external store if we have files to migrate. This line
    # makes it so we need to have S3/MinIO configured to run this migration.
    external_store = get_s3_file_store(db_session=session)

    for i, file_id in enumerate(files_to_migrate, 1):
        print(f"Migrating file {i}/{total_files}: {file_id}")

        # Read file content from external storage (always binary)
        try:
            file_io = external_store.read_file(
                file_id=file_id, mode="b", use_tempfile=True
            )
            file_io.seek(0)

            # Import lazily to avoid circular deps at Alembic runtime
            from onyx.db._deprecated.pg_file_store import (
                create_populate_lobj,
            )  # noqa: E402

            # Create new Postgres large object and populate it
            lobj_oid = create_populate_lobj(content=file_io, db_session=session)

            # Update DB row: set lobj_oid, clear bucket/object_key
            session.execute(
                text(
                    "UPDATE file_record SET lobj_oid = :lobj_oid, bucket_name = NULL, "
                    "object_key = NULL WHERE file_id = :file_id"
                ),
                {"lobj_oid": lobj_oid, "file_id": file_id},
            )
        except ClientError as e:
            if "NoSuchKey" in str(e):
                print(
                    f"File {file_id} not found in external storage. Deleting from database."
                )
                session.execute(
                    text("DELETE FROM file_record WHERE file_id = :file_id"),
                    {"file_id": file_id},
                )
                # BUG FIX: a deleted (missing) file was previously counted and
                # reported as "Successfully migrated" — skip it instead.
                continue
            raise

        migrated_count += 1
        print(f"✓ Successfully migrated file {i}/{total_files}: {file_id}")

    # Flush the SQLAlchemy session so statements are sent to the DB, but **do not**
    # commit the transaction. The surrounding Alembic migration will commit once
    # the *entire* downgrade succeeds. This keeps the whole downgrade atomic and
    # avoids leaving the database in a partially-migrated state if a later schema
    # operation fails.
    session.flush()

    print(
        f"Migration back to PostgreSQL completed: {migrated_count} files staged for commit."
    )
|
||||
|
||||
def _migrate_files_to_external_storage() -> None:
    """Migrate files from PostgreSQL large objects to external storage.

    For each row still backed by a large object (``lobj_oid`` set, no external
    pointer), the content is copied into S3/MinIO via ``external_store.save_file``
    (which updates the row and commits), then the large object is deleted.
    """
    # Get database session
    bind = op.get_bind()
    session = Session(bind=bind)
    external_store = get_s3_file_store(db_session=session)

    # Find all files currently stored in PostgreSQL (lobj_oid is not null)
    result = session.execute(
        text(
            "SELECT file_id FROM file_record WHERE lobj_oid IS NOT NULL "
            "AND bucket_name IS NULL AND object_key IS NULL"
        )
    )

    files_to_migrate = [row[0] for row in result.fetchall()]
    total_files = len(files_to_migrate)

    if total_files == 0:
        print("No files found in PostgreSQL storage to migrate.")
        return

    print(f"Found {total_files} files to migrate from PostgreSQL to external storage.")

    migrated_count = 0

    for i, file_id in enumerate(files_to_migrate, 1):
        print(f"Migrating file {i}/{total_files}: {file_id}")

        # Read file record to get metadata
        file_record = session.execute(
            text("SELECT * FROM file_record WHERE file_id = :file_id"),
            {"file_id": file_id},
        ).fetchone()

        if file_record is None:
            print(f"File {file_id} not found in PostgreSQL storage.")
            continue

        lobj_id = cast(int, file_record.lobj_oid)  # type: ignore
        raw_metadata = cast(Any, file_record.file_metadata)  # type: ignore

        # Read file content from PostgreSQL
        file_content = read_lobj(
            lobj_id, db_session=session, mode="b", use_tempfile=True
        )

        # Handle file_metadata type conversion.
        # BUG FIX: the metadata value was previously reset to None right before
        # the conversion, so every migrated file silently lost its metadata.
        if raw_metadata is None or isinstance(raw_metadata, dict):
            file_metadata = raw_metadata
        else:
            # Convert other types to dict if possible, otherwise None
            try:
                file_metadata = dict(raw_metadata)
            except (TypeError, ValueError):
                file_metadata = None

        # Save to external storage (this will handle the database record update and cleanup)
        # NOTE: this WILL .commit() the transaction.
        external_store.save_file(
            file_id=file_id,
            content=file_content,
            display_name=file_record.display_name,
            file_origin=file_record.file_origin,
            file_type=file_record.file_type,
            file_metadata=file_metadata,
        )
        delete_lobj_by_id(lobj_id, db_session=session)

        migrated_count += 1
        print(f"✓ Successfully migrated file {i}/{total_files}: {file_id}")

    # See note above – flush but do **not** commit so the outer Alembic transaction
    # controls atomicity.
    session.flush()

    print(
        f"Migration completed: {migrated_count} files staged for commit to external storage."
    )
|
||||
@@ -86,7 +86,6 @@ def export_query_history_task(
|
||||
try:
|
||||
stream.seek(0)
|
||||
get_default_file_store(db_session).save_file(
|
||||
file_name=report_name,
|
||||
content=stream,
|
||||
display_name=report_name,
|
||||
file_origin=FileOrigin.QUERY_HISTORY_CSV,
|
||||
@@ -96,6 +95,7 @@ def export_query_history_task(
|
||||
"end": end.isoformat(),
|
||||
"start_time": start_time.isoformat(),
|
||||
},
|
||||
file_id=report_name,
|
||||
)
|
||||
|
||||
delete_task_with_id(
|
||||
|
||||
@@ -115,11 +115,24 @@ def get_all_usage_reports(db_session: Session) -> list[UsageReportMetadata]:
|
||||
|
||||
def get_usage_report_data(
|
||||
db_session: Session,
|
||||
report_name: str,
|
||||
report_display_name: str,
|
||||
) -> IO:
|
||||
"""
|
||||
Get the usage report data from the file store.
|
||||
|
||||
Args:
|
||||
db_session: The database session.
|
||||
report_display_name: The display name of the usage report. Also assumes
|
||||
that the file is stored with this as the ID in the file store.
|
||||
|
||||
Returns:
|
||||
The usage report data.
|
||||
"""
|
||||
file_store = get_default_file_store(db_session)
|
||||
# usage report may be very large, so don't load it all into memory
|
||||
return file_store.read_file(file_name=report_name, mode="b", use_tempfile=True)
|
||||
return file_store.read_file(
|
||||
file_id=report_display_name, mode="b", use_tempfile=True
|
||||
)
|
||||
|
||||
|
||||
def write_usage_report(
|
||||
|
||||
@@ -28,7 +28,7 @@ from onyx.auth.users import get_user_manager
|
||||
from onyx.auth.users import UserManager
|
||||
from onyx.db.engine import get_session
|
||||
from onyx.db.models import User
|
||||
from onyx.file_store.file_store import PostgresBackedFileStore
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.server.utils import BasicAuthenticationError
|
||||
from onyx.utils.logger import setup_logger
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
@@ -142,11 +142,12 @@ def put_logo(
|
||||
|
||||
def fetch_logo_helper(db_session: Session) -> Response:
|
||||
try:
|
||||
file_store = PostgresBackedFileStore(db_session)
|
||||
file_store = get_default_file_store(db_session)
|
||||
onyx_file = file_store.get_file_with_mime_type(get_logo_filename())
|
||||
if not onyx_file:
|
||||
raise ValueError("get_onyx_file returned None!")
|
||||
except Exception:
|
||||
logger.exception("Faield to fetch logo file")
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail="No logo file found",
|
||||
@@ -157,7 +158,7 @@ def fetch_logo_helper(db_session: Session) -> Response:
|
||||
|
||||
def fetch_logotype_helper(db_session: Session) -> Response:
|
||||
try:
|
||||
file_store = PostgresBackedFileStore(db_session)
|
||||
file_store = get_default_file_store(db_session)
|
||||
onyx_file = file_store.get_file_with_mime_type(get_logotype_filename())
|
||||
if not onyx_file:
|
||||
raise ValueError("get_onyx_file returned None!")
|
||||
|
||||
@@ -131,11 +131,11 @@ def upload_logo(
|
||||
|
||||
file_store = get_default_file_store(db_session)
|
||||
file_store.save_file(
|
||||
file_name=_LOGOTYPE_FILENAME if is_logotype else _LOGO_FILENAME,
|
||||
content=content,
|
||||
display_name=display_name,
|
||||
file_origin=FileOrigin.OTHER,
|
||||
file_type=file_type,
|
||||
file_id=_LOGOTYPE_FILENAME if is_logotype else _LOGO_FILENAME,
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
@@ -39,9 +39,9 @@ from onyx.db.chat import get_chat_session_by_id
|
||||
from onyx.db.chat import get_chat_sessions_by_user
|
||||
from onyx.db.engine import get_session
|
||||
from onyx.db.enums import TaskStatus
|
||||
from onyx.db.file_record import get_query_history_export_files
|
||||
from onyx.db.models import ChatSession
|
||||
from onyx.db.models import User
|
||||
from onyx.db.pg_file_store import get_query_history_export_files
|
||||
from onyx.db.tasks import get_task_with_id
|
||||
from onyx.db.tasks import register_task
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
@@ -360,7 +360,7 @@ def get_query_history_export_status(
|
||||
|
||||
report_name = construct_query_history_report_name(request_id)
|
||||
has_file = file_store.has_file(
|
||||
file_name=report_name,
|
||||
file_id=report_name,
|
||||
file_origin=FileOrigin.QUERY_HISTORY_CSV,
|
||||
file_type=FileType.CSV,
|
||||
)
|
||||
@@ -385,7 +385,7 @@ def download_query_history_csv(
|
||||
report_name = construct_query_history_report_name(request_id)
|
||||
file_store = get_default_file_store(db_session)
|
||||
has_file = file_store.has_file(
|
||||
file_name=report_name,
|
||||
file_id=report_name,
|
||||
file_origin=FileOrigin.QUERY_HISTORY_CSV,
|
||||
file_type=FileType.CSV,
|
||||
)
|
||||
|
||||
@@ -12,7 +12,7 @@ from onyx.configs.constants import SessionType
|
||||
from onyx.db.enums import TaskStatus
|
||||
from onyx.db.models import ChatMessage
|
||||
from onyx.db.models import ChatSession
|
||||
from onyx.db.models import PGFileStore
|
||||
from onyx.db.models import FileRecord
|
||||
from onyx.db.models import TaskQueueState
|
||||
|
||||
|
||||
@@ -254,7 +254,7 @@ class QueryHistoryExport(BaseModel):
|
||||
@classmethod
|
||||
def from_file(
|
||||
cls,
|
||||
file: PGFileStore,
|
||||
file: FileRecord,
|
||||
) -> "QueryHistoryExport":
|
||||
if not file.file_metadata or not isinstance(file.file_metadata, dict):
|
||||
raise RuntimeError(
|
||||
@@ -262,7 +262,7 @@ class QueryHistoryExport(BaseModel):
|
||||
)
|
||||
|
||||
metadata = QueryHistoryFileMetadata.model_validate(dict(file.file_metadata))
|
||||
task_id = extract_task_id_from_query_history_report_name(file.file_name)
|
||||
task_id = extract_task_id_from_query_history_report_name(file.file_id)
|
||||
|
||||
return cls(
|
||||
task_id=task_id,
|
||||
|
||||
@@ -62,17 +62,16 @@ def generate_chat_messages_report(
|
||||
]
|
||||
)
|
||||
|
||||
# after writing seek to begining of buffer
|
||||
# after writing seek to beginning of buffer
|
||||
temp_file.seek(0)
|
||||
file_store.save_file(
|
||||
file_name=file_name,
|
||||
file_id = file_store.save_file(
|
||||
content=temp_file,
|
||||
display_name=file_name,
|
||||
file_origin=FileOrigin.OTHER,
|
||||
file_type="text/csv",
|
||||
)
|
||||
|
||||
return file_name
|
||||
return file_id
|
||||
|
||||
|
||||
def generate_user_report(
|
||||
@@ -97,15 +96,14 @@ def generate_user_report(
|
||||
csvwriter.writerow([user_skeleton.user_id, user_skeleton.is_active])
|
||||
|
||||
temp_file.seek(0)
|
||||
file_store.save_file(
|
||||
file_name=file_name,
|
||||
file_id = file_store.save_file(
|
||||
content=temp_file,
|
||||
display_name=file_name,
|
||||
file_origin=FileOrigin.OTHER,
|
||||
file_type="text/csv",
|
||||
)
|
||||
|
||||
return file_name
|
||||
return file_id
|
||||
|
||||
|
||||
def create_new_usage_report(
|
||||
@@ -116,16 +114,16 @@ def create_new_usage_report(
|
||||
report_id = str(uuid.uuid4())
|
||||
file_store = get_default_file_store(db_session)
|
||||
|
||||
messages_filename = generate_chat_messages_report(
|
||||
messages_file_id = generate_chat_messages_report(
|
||||
db_session, file_store, report_id, period
|
||||
)
|
||||
users_filename = generate_user_report(db_session, file_store, report_id)
|
||||
users_file_id = generate_user_report(db_session, file_store, report_id)
|
||||
|
||||
with tempfile.SpooledTemporaryFile(max_size=MAX_IN_MEMORY_SIZE) as zip_buffer:
|
||||
with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED) as zip_file:
|
||||
# write messages
|
||||
chat_messages_tmpfile = file_store.read_file(
|
||||
messages_filename, mode="b", use_tempfile=True
|
||||
messages_file_id, mode="b", use_tempfile=True
|
||||
)
|
||||
zip_file.writestr(
|
||||
"chat_messages.csv",
|
||||
@@ -134,7 +132,7 @@ def create_new_usage_report(
|
||||
|
||||
# write users
|
||||
users_tmpfile = file_store.read_file(
|
||||
users_filename, mode="b", use_tempfile=True
|
||||
users_file_id, mode="b", use_tempfile=True
|
||||
)
|
||||
zip_file.writestr("users.csv", users_tmpfile.read())
|
||||
|
||||
@@ -146,11 +144,11 @@ def create_new_usage_report(
|
||||
f"_{report_id}_usage_report.zip"
|
||||
)
|
||||
file_store.save_file(
|
||||
file_name=report_name,
|
||||
content=zip_buffer,
|
||||
display_name=report_name,
|
||||
file_origin=FileOrigin.GENERATED_REPORT,
|
||||
file_type="application/zip",
|
||||
file_id=report_name,
|
||||
)
|
||||
|
||||
# add report after zip file is written
|
||||
|
||||
@@ -35,11 +35,11 @@ def save_checkpoint(
|
||||
|
||||
file_store = get_default_file_store(db_session)
|
||||
file_store.save_file(
|
||||
file_name=checkpoint_pointer,
|
||||
content=BytesIO(checkpoint.model_dump_json().encode()),
|
||||
display_name=checkpoint_pointer,
|
||||
file_origin=FileOrigin.INDEXING_CHECKPOINT,
|
||||
file_type="application/json",
|
||||
file_id=checkpoint_pointer,
|
||||
)
|
||||
|
||||
index_attempt = get_index_attempt(db_session, index_attempt_id)
|
||||
|
||||
@@ -781,3 +781,16 @@ DB_READONLY_USER: str = os.environ.get("DB_READONLY_USER", "db_readonly_user")
|
||||
DB_READONLY_PASSWORD: str = urllib.parse.quote_plus(
|
||||
os.environ.get("DB_READONLY_PASSWORD") or "password"
|
||||
)
|
||||
|
||||
# File Store Configuration
|
||||
S3_FILE_STORE_BUCKET_NAME = (
|
||||
os.environ.get("S3_FILE_STORE_BUCKET_NAME") or "onyx-file-store-bucket"
|
||||
)
|
||||
S3_FILE_STORE_PREFIX = os.environ.get("S3_FILE_STORE_PREFIX") or "onyx-files"
|
||||
# S3_ENDPOINT_URL is for MinIO and other S3-compatible storage. Leave blank for AWS S3.
|
||||
S3_ENDPOINT_URL = os.environ.get("S3_ENDPOINT_URL")
|
||||
S3_VERIFY_SSL = os.environ.get("S3_VERIFY_SSL", "").lower() == "true"
|
||||
|
||||
# S3/MinIO Access Keys
|
||||
S3_AWS_ACCESS_KEY_ID = os.environ.get("S3_AWS_ACCESS_KEY_ID")
|
||||
S3_AWS_SECRET_ACCESS_KEY = os.environ.get("S3_AWS_SECRET_ACCESS_KEY")
|
||||
|
||||
@@ -281,7 +281,7 @@ class BlobStorageConnector(LoadConnector, PollConnector):
|
||||
image_section, _ = store_image_and_create_section(
|
||||
db_session=db_session,
|
||||
image_data=downloaded_file,
|
||||
file_name=f"{self.bucket_type}_{self.bucket_name}_{key.replace('/', '_')}",
|
||||
file_id=f"{self.bucket_type}_{self.bucket_name}_{key.replace('/', '_')}",
|
||||
display_name=file_name,
|
||||
link=link,
|
||||
file_origin=FileOrigin.CONNECTOR,
|
||||
@@ -449,10 +449,8 @@ if __name__ == "__main__":
|
||||
print(f" - Link: {section.link}")
|
||||
if isinstance(section, TextSection) and section.text is not None:
|
||||
print(f" - Text: {section.text[:100]}...")
|
||||
elif (
|
||||
hasattr(section, "image_file_name") and section.image_file_name
|
||||
):
|
||||
print(f" - Image: {section.image_file_name}")
|
||||
elif hasattr(section, "image_file_id") and section.image_file_id:
|
||||
print(f" - Image: {section.image_file_id}")
|
||||
else:
|
||||
print("Error: Unknown section type")
|
||||
print("---")
|
||||
|
||||
@@ -325,7 +325,7 @@ class ConfluenceConnector(
|
||||
# Create an ImageSection for image attachments
|
||||
image_section = ImageSection(
|
||||
link=f"{page_url}#attachment-{attachment['id']}",
|
||||
image_file_name=result.file_name,
|
||||
image_file_id=result.file_name,
|
||||
)
|
||||
sections.append(image_section)
|
||||
else:
|
||||
@@ -440,7 +440,7 @@ class ConfluenceConnector(
|
||||
doc.sections.append(
|
||||
ImageSection(
|
||||
link=object_url,
|
||||
image_file_name=file_storage_name,
|
||||
image_file_id=file_storage_name,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import io
|
||||
import math
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
@@ -18,30 +17,23 @@ from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.app_configs import (
|
||||
CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD,
|
||||
)
|
||||
from onyx.configs.app_configs import CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.db.engine import get_session_with_current_tenant
|
||||
from onyx.file_processing.extract_file_text import extract_file_text
|
||||
from onyx.file_processing.extract_file_text import is_accepted_file_ext
|
||||
from onyx.file_processing.extract_file_text import OnyxExtensionType
|
||||
from onyx.file_processing.file_validation import is_valid_image_type
|
||||
from onyx.file_processing.image_utils import store_image_and_create_section
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from onyx.connectors.confluence.onyx_confluence import OnyxConfluence
|
||||
|
||||
from onyx.db.engine import get_session_with_current_tenant
|
||||
from onyx.db.models import PGFileStore
|
||||
from onyx.db.pg_file_store import create_populate_lobj
|
||||
from onyx.db.pg_file_store import save_bytes_to_pgfilestore
|
||||
from onyx.db.pg_file_store import upsert_pgfilestore
|
||||
from onyx.file_processing.extract_file_text import (
|
||||
OnyxExtensionType,
|
||||
extract_file_text,
|
||||
is_accepted_file_ext,
|
||||
)
|
||||
from onyx.file_processing.file_validation import is_valid_image_type
|
||||
from onyx.file_processing.image_utils import store_image_and_create_section
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
@@ -80,7 +72,7 @@ class AttachmentProcessingResult(BaseModel):
|
||||
"""
|
||||
A container for results after processing a Confluence attachment.
|
||||
'text' is the textual content of the attachment.
|
||||
'file_name' is the final file name used in PGFileStore to store the content.
|
||||
'file_name' is the final file name used in FileStore to store the content.
|
||||
'error' holds an exception or string if something failed.
|
||||
"""
|
||||
|
||||
@@ -236,7 +228,7 @@ def _process_image_attachment(
|
||||
section, file_name = store_image_and_create_section(
|
||||
db_session=db_session,
|
||||
image_data=raw_bytes,
|
||||
file_name=Path(attachment["id"]).name,
|
||||
file_id=Path(attachment["id"]).name,
|
||||
display_name=attachment["title"],
|
||||
media_type=media_type,
|
||||
file_origin=FileOrigin.CONNECTOR,
|
||||
@@ -251,59 +243,6 @@ def _process_image_attachment(
|
||||
return AttachmentProcessingResult(text=None, file_name=None, error=msg)
|
||||
|
||||
|
||||
def _process_text_attachment(
|
||||
attachment: dict[str, Any],
|
||||
raw_bytes: bytes,
|
||||
media_type: str,
|
||||
) -> AttachmentProcessingResult:
|
||||
"""Process a text-based attachment by extracting its content."""
|
||||
try:
|
||||
extracted_text = extract_file_text(
|
||||
io.BytesIO(raw_bytes),
|
||||
file_name=attachment["title"],
|
||||
break_on_unprocessable=False,
|
||||
)
|
||||
except Exception as e:
|
||||
msg = f"Failed to extract text for '{attachment['title']}': {e}"
|
||||
logger.error(msg, exc_info=e)
|
||||
return AttachmentProcessingResult(text=None, file_name=None, error=msg)
|
||||
|
||||
# Check length constraints
|
||||
if extracted_text is None or len(extracted_text) == 0:
|
||||
msg = f"No text extracted for {attachment['title']}"
|
||||
logger.warning(msg)
|
||||
return AttachmentProcessingResult(text=None, file_name=None, error=msg)
|
||||
|
||||
if len(extracted_text) > CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD:
|
||||
msg = (
|
||||
f"Skipping attachment {attachment['title']} due to char count "
|
||||
f"({len(extracted_text)} > {CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD})"
|
||||
)
|
||||
logger.warning(msg)
|
||||
return AttachmentProcessingResult(text=None, file_name=None, error=msg)
|
||||
|
||||
# Save the attachment
|
||||
try:
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
saved_record = save_bytes_to_pgfilestore(
|
||||
db_session=db_session,
|
||||
raw_bytes=raw_bytes,
|
||||
media_type=media_type,
|
||||
identifier=attachment["id"],
|
||||
display_name=attachment["title"],
|
||||
)
|
||||
except Exception as e:
|
||||
msg = f"Failed to save attachment '{attachment['title']}' to PG: {e}"
|
||||
logger.error(msg, exc_info=e)
|
||||
return AttachmentProcessingResult(
|
||||
text=extracted_text, file_name=None, error=msg
|
||||
)
|
||||
|
||||
return AttachmentProcessingResult(
|
||||
text=extracted_text, file_name=saved_record.file_name, error=None
|
||||
)
|
||||
|
||||
|
||||
def convert_attachment_to_content(
|
||||
confluence_client: "OnyxConfluence",
|
||||
attachment: dict[str, Any],
|
||||
@@ -540,41 +479,3 @@ def update_param_in_path(path: str, param: str, value: str) -> str:
|
||||
+ "?"
|
||||
+ "&".join(f"{k}={quote(v[0])}" for k, v in query_params.items())
|
||||
)
|
||||
|
||||
|
||||
def attachment_to_file_record(
|
||||
confluence_client: "OnyxConfluence",
|
||||
attachment: dict[str, Any],
|
||||
db_session: Session,
|
||||
) -> tuple[PGFileStore, bytes]:
|
||||
"""Save an attachment to the file store and return the file record."""
|
||||
download_link = _attachment_to_download_link(confluence_client, attachment)
|
||||
image_data = confluence_client.get(
|
||||
download_link, absolute=True, not_json_response=True
|
||||
)
|
||||
|
||||
file_type = attachment.get("metadata", {}).get(
|
||||
"mediaType", "application/octet-stream"
|
||||
)
|
||||
|
||||
# Save image to file store
|
||||
file_name = f"confluence_attachment_{attachment['id']}"
|
||||
lobj_oid = create_populate_lobj(BytesIO(image_data), db_session)
|
||||
pgfilestore = upsert_pgfilestore(
|
||||
file_name=file_name,
|
||||
display_name=attachment["title"],
|
||||
file_origin=FileOrigin.OTHER,
|
||||
file_type=file_type,
|
||||
lobj_oid=lobj_oid,
|
||||
db_session=db_session,
|
||||
commit=True,
|
||||
)
|
||||
|
||||
return pgfilestore, image_data
|
||||
|
||||
|
||||
def _attachment_to_download_link(
|
||||
confluence_client: "OnyxConfluence", attachment: dict[str, Any]
|
||||
) -> str:
|
||||
"""Extracts the download link to images."""
|
||||
return confluence_client.url + attachment["_links"]["download"]
|
||||
|
||||
@@ -18,7 +18,6 @@ from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.db.engine import get_session_with_current_tenant
|
||||
from onyx.db.pg_file_store import get_pgfilestore_by_file_name
|
||||
from onyx.file_processing.extract_file_text import extract_text_and_images
|
||||
from onyx.file_processing.extract_file_text import get_file_ext
|
||||
from onyx.file_processing.extract_file_text import is_accepted_file_ext
|
||||
@@ -30,24 +29,6 @@ from onyx.utils.logger import setup_logger
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def _read_file_from_filestore(
|
||||
file_name: str,
|
||||
db_session: Session,
|
||||
) -> IO | None:
|
||||
"""
|
||||
Gets the content of a file from Postgres.
|
||||
"""
|
||||
extension = get_file_ext(file_name)
|
||||
|
||||
# Read file from Postgres store
|
||||
file_content = get_default_file_store(db_session).read_file(file_name, mode="b")
|
||||
|
||||
if is_accepted_file_ext(extension, OnyxExtensionType.All):
|
||||
return file_content
|
||||
logger.warning(f"Skipping file '{file_name}' with extension '{extension}'")
|
||||
return None
|
||||
|
||||
|
||||
def _create_image_section(
|
||||
image_data: bytes,
|
||||
db_session: Session,
|
||||
@@ -58,7 +39,7 @@ def _create_image_section(
|
||||
) -> tuple[ImageSection, str | None]:
|
||||
"""
|
||||
Creates an ImageSection for an image file or embedded image.
|
||||
Stores the image in PGFileStore but does not generate a summary.
|
||||
Stores the image in FileStore but does not generate a summary.
|
||||
|
||||
Args:
|
||||
image_data: Raw image bytes
|
||||
@@ -71,14 +52,14 @@ def _create_image_section(
|
||||
Tuple of (ImageSection, stored_file_name or None)
|
||||
"""
|
||||
# Create a unique identifier for the image
|
||||
file_name = f"{parent_file_name}_embedded_{idx}" if idx > 0 else parent_file_name
|
||||
file_id = f"{parent_file_name}_embedded_{idx}" if idx > 0 else parent_file_name
|
||||
|
||||
# Store the image and create a section
|
||||
try:
|
||||
section, stored_file_name = store_image_and_create_section(
|
||||
db_session=db_session,
|
||||
image_data=image_data,
|
||||
file_name=file_name,
|
||||
file_id=file_id,
|
||||
display_name=display_name,
|
||||
link=link,
|
||||
file_origin=FileOrigin.CONNECTOR,
|
||||
@@ -90,6 +71,7 @@ def _create_image_section(
|
||||
|
||||
|
||||
def _process_file(
|
||||
file_id: str,
|
||||
file_name: str,
|
||||
file: IO[Any],
|
||||
metadata: dict[str, Any] | None,
|
||||
@@ -107,12 +89,6 @@ def _process_file(
|
||||
# Get file extension and determine file type
|
||||
extension = get_file_ext(file_name)
|
||||
|
||||
# Fetch the DB record so we know the ID for internal URL
|
||||
pg_record = get_pgfilestore_by_file_name(file_name=file_name, db_session=db_session)
|
||||
if not pg_record:
|
||||
logger.warning(f"No file record found for '{file_name}' in PG; skipping.")
|
||||
return []
|
||||
|
||||
if not is_accepted_file_ext(extension, OnyxExtensionType.All):
|
||||
logger.warning(
|
||||
f"Skipping file '{file_name}' with unrecognized extension '{extension}'"
|
||||
@@ -151,7 +127,6 @@ def _process_file(
|
||||
for k, v in metadata.items()
|
||||
if k
|
||||
not in [
|
||||
"document_id",
|
||||
"time_updated",
|
||||
"doc_updated_at",
|
||||
"link",
|
||||
@@ -171,7 +146,7 @@ def _process_file(
|
||||
DocumentSource(source_type_str) if source_type_str else DocumentSource.FILE
|
||||
)
|
||||
|
||||
doc_id = metadata.get("document_id") or f"FILE_CONNECTOR__{file_name}"
|
||||
doc_id = f"FILE_CONNECTOR__{file_id}"
|
||||
title = metadata.get("title") or file_display_name
|
||||
|
||||
# 1) If the file itself is an image, handle that scenario quickly
|
||||
@@ -187,7 +162,7 @@ def _process_file(
|
||||
section, _ = _create_image_section(
|
||||
image_data=image_data,
|
||||
db_session=db_session,
|
||||
parent_file_name=pg_record.file_name,
|
||||
parent_file_name=file_id,
|
||||
display_name=title,
|
||||
)
|
||||
|
||||
@@ -234,21 +209,25 @@ def _process_file(
|
||||
TextSection(link=link_in_meta, text=extraction_result.text_content.strip())
|
||||
)
|
||||
|
||||
# Then any extracted images from docx, etc.
|
||||
# Then any extracted images from docx, PDFs, etc.
|
||||
for idx, (img_data, img_name) in enumerate(
|
||||
extraction_result.embedded_images, start=1
|
||||
):
|
||||
# Store each embedded image as a separate file in PGFileStore
|
||||
# Store each embedded image as a separate file in FileStore
|
||||
# and create a section with the image reference
|
||||
try:
|
||||
image_section, _ = _create_image_section(
|
||||
image_section, stored_file_name = _create_image_section(
|
||||
image_data=img_data,
|
||||
db_session=db_session,
|
||||
parent_file_name=pg_record.file_name,
|
||||
parent_file_name=file_id,
|
||||
display_name=f"{title} - image {idx}",
|
||||
idx=idx,
|
||||
)
|
||||
sections.append(image_section)
|
||||
logger.debug(
|
||||
f"Created ImageSection for embedded image {idx} "
|
||||
f"in {file_name}, stored as: {stored_file_name}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to process embedded image {idx} in {file_name}: {e}"
|
||||
@@ -304,23 +283,27 @@ class LocalFileConnector(LoadConnector):
|
||||
documents: list[Document] = []
|
||||
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
for file_path in self.file_locations:
|
||||
for file_id in self.file_locations:
|
||||
current_datetime = datetime.now(timezone.utc)
|
||||
|
||||
file_io = _read_file_from_filestore(
|
||||
file_name=file_path,
|
||||
db_session=db_session,
|
||||
)
|
||||
if not file_io:
|
||||
file_store = get_default_file_store(db_session)
|
||||
file_record = file_store.read_file_record(file_id=file_id)
|
||||
if not file_record:
|
||||
# typically an unsupported extension
|
||||
logger.warning(
|
||||
f"No file record found for '{file_id}' in PG; skipping."
|
||||
)
|
||||
continue
|
||||
|
||||
metadata = self._get_file_metadata(file_path)
|
||||
file_io = file_store.read_file(file_id=file_id, mode="b")
|
||||
|
||||
metadata = self._get_file_metadata(file_record.display_name)
|
||||
metadata["time_updated"] = metadata.get(
|
||||
"time_updated", current_datetime
|
||||
)
|
||||
new_docs = _process_file(
|
||||
file_name=file_path,
|
||||
file_id=file_id,
|
||||
file_name=file_record.display_name,
|
||||
file=file_io,
|
||||
metadata=metadata,
|
||||
pdf_pass=self.pdf_pass,
|
||||
|
||||
@@ -187,7 +187,7 @@ def _download_and_extract_sections_basic(
|
||||
section, embedded_id = store_image_and_create_section(
|
||||
db_session=db_session,
|
||||
image_data=response_call(),
|
||||
file_name=file_id,
|
||||
file_id=file_id,
|
||||
display_name=file_name,
|
||||
media_type=mime_type,
|
||||
file_origin=FileOrigin.CONNECTOR,
|
||||
@@ -211,7 +211,7 @@ def _download_and_extract_sections_basic(
|
||||
section, embedded_id = store_image_and_create_section(
|
||||
db_session=db_session,
|
||||
image_data=img_data,
|
||||
file_name=f"{file_id}_img_{idx}",
|
||||
file_id=f"{file_id}_img_{idx}",
|
||||
display_name=img_name or f"{file_name} - image {idx}",
|
||||
file_origin=FileOrigin.CONNECTOR,
|
||||
)
|
||||
|
||||
@@ -34,7 +34,7 @@ class Section(BaseModel):
|
||||
|
||||
link: str | None = None
|
||||
text: str | None = None
|
||||
image_file_name: str | None = None
|
||||
image_file_id: str | None = None
|
||||
|
||||
|
||||
class TextSection(Section):
|
||||
@@ -49,10 +49,10 @@ class TextSection(Section):
|
||||
class ImageSection(Section):
|
||||
"""Section containing an image reference"""
|
||||
|
||||
image_file_name: str
|
||||
image_file_id: str
|
||||
|
||||
def __sizeof__(self) -> int:
|
||||
return sys.getsizeof(self.image_file_name) + sys.getsizeof(self.link)
|
||||
return sys.getsizeof(self.image_file_id) + sys.getsizeof(self.link)
|
||||
|
||||
|
||||
class BasicExpertInfo(BaseModel):
|
||||
|
||||
@@ -56,7 +56,7 @@ def update_image_sections_with_query(
|
||||
chunks_with_images = []
|
||||
for section in sections:
|
||||
for chunk in section.chunks:
|
||||
if chunk.image_file_name:
|
||||
if chunk.image_file_id:
|
||||
chunks_with_images.append(chunk)
|
||||
|
||||
if not chunks_with_images:
|
||||
@@ -68,19 +68,19 @@ def update_image_sections_with_query(
|
||||
def process_image_chunk(chunk: InferenceChunk) -> tuple[str, str]:
|
||||
try:
|
||||
logger.debug(
|
||||
f"Processing image chunk with ID: {chunk.unique_id}, image: {chunk.image_file_name}"
|
||||
f"Processing image chunk with ID: {chunk.unique_id}, image: {chunk.image_file_id}"
|
||||
)
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
file_record = get_default_file_store(db_session).read_file(
|
||||
cast(str, chunk.image_file_name), mode="b"
|
||||
cast(str, chunk.image_file_id), mode="b"
|
||||
)
|
||||
if not file_record:
|
||||
logger.error(f"Image file not found: {chunk.image_file_name}")
|
||||
logger.error(f"Image file not found: {chunk.image_file_id}")
|
||||
raise Exception("File not found")
|
||||
file_content = file_record.read()
|
||||
image_base64 = base64.b64encode(file_content).decode()
|
||||
logger.debug(
|
||||
f"Successfully loaded image data for {chunk.image_file_name}"
|
||||
f"Successfully loaded image data for {chunk.image_file_id}"
|
||||
)
|
||||
|
||||
messages: list[BaseMessage] = [
|
||||
@@ -114,7 +114,7 @@ def update_image_sections_with_query(
|
||||
|
||||
except Exception:
|
||||
logger.exception(
|
||||
f"Error updating image section with query source image url: {chunk.image_file_name}"
|
||||
f"Error updating image section with query source image url: {chunk.image_file_id}"
|
||||
)
|
||||
return chunk.unique_id, "Error analyzing image."
|
||||
|
||||
|
||||
142
backend/onyx/db/_deprecated/pg_file_store.py
Normal file
142
backend/onyx/db/_deprecated/pg_file_store.py
Normal file
@@ -0,0 +1,142 @@
|
||||
"""Kept around since it's used in the migration to move to S3/MinIO"""
|
||||
|
||||
import tempfile
|
||||
from io import BytesIO
|
||||
from typing import IO
|
||||
|
||||
from psycopg2.extensions import connection
|
||||
from sqlalchemy import text # NEW: for SQL large-object helpers
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.file_store.constants import MAX_IN_MEMORY_SIZE
|
||||
from onyx.file_store.constants import STANDARD_CHUNK_SIZE
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def get_pg_conn_from_session(db_session: Session) -> connection:
|
||||
return db_session.connection().connection.connection # type: ignore
|
||||
|
||||
|
||||
def create_populate_lobj(
|
||||
content: IO,
|
||||
db_session: Session,
|
||||
) -> int:
|
||||
"""Create a PostgreSQL large object from *content* and return its OID.
|
||||
|
||||
Preferred approach is to use the psycopg2 ``lobject`` API, but if that is
|
||||
unavailable (e.g. when the underlying connection is an asyncpg adapter)
|
||||
we fall back to PostgreSQL helper functions such as ``lo_from_bytea``.
|
||||
|
||||
NOTE: this function intentionally *does not* commit the surrounding
|
||||
transaction – that is handled by the caller so all work stays atomic.
|
||||
"""
|
||||
|
||||
pg_conn = None
|
||||
try:
|
||||
pg_conn = get_pg_conn_from_session(db_session)
|
||||
# ``AsyncAdapt_asyncpg_connection`` (asyncpg) has no ``lobject``
|
||||
if not hasattr(pg_conn, "lobject"):
|
||||
raise AttributeError # will be handled by fallback below
|
||||
|
||||
large_object = pg_conn.lobject()
|
||||
|
||||
# write in multiple chunks to avoid loading the whole file into memory
|
||||
while True:
|
||||
chunk = content.read(STANDARD_CHUNK_SIZE)
|
||||
if not chunk:
|
||||
break
|
||||
large_object.write(chunk)
|
||||
|
||||
large_object.close()
|
||||
|
||||
return large_object.oid
|
||||
|
||||
except AttributeError:
|
||||
# Fall back to SQL helper functions – read the full content into memory
|
||||
# (acceptable for the limited number and size of files handled during
|
||||
# migrations). ``lo_from_bytea`` returns the new OID.
|
||||
byte_data = content.read()
|
||||
result = db_session.execute(
|
||||
text("SELECT lo_from_bytea(0, :data) AS oid"),
|
||||
{"data": byte_data},
|
||||
)
|
||||
# ``scalar_one`` is 2.0-style; ``scalar`` works on both 1.4/2.0.
|
||||
lobj_oid = result.scalar()
|
||||
if lobj_oid is None:
|
||||
raise RuntimeError("Failed to create large object")
|
||||
return int(lobj_oid)
|
||||
|
||||
|
||||
def read_lobj(
|
||||
lobj_oid: int,
|
||||
db_session: Session,
|
||||
mode: str | None = None,
|
||||
use_tempfile: bool = False,
|
||||
) -> IO:
|
||||
"""Read a PostgreSQL large object identified by *lobj_oid*.
|
||||
|
||||
Attempts to use the native ``lobject`` API first; if unavailable falls back
|
||||
to ``lo_get`` which returns the large object's contents as *bytea*.
|
||||
"""
|
||||
|
||||
pg_conn = None
|
||||
try:
|
||||
pg_conn = get_pg_conn_from_session(db_session)
|
||||
if not hasattr(pg_conn, "lobject"):
|
||||
raise AttributeError
|
||||
|
||||
# Ensure binary mode by default
|
||||
if mode is None:
|
||||
mode = "rb"
|
||||
large_object = (
|
||||
pg_conn.lobject(lobj_oid, mode=mode) if mode else pg_conn.lobject(lobj_oid)
|
||||
)
|
||||
|
||||
if use_tempfile:
|
||||
temp_file = tempfile.SpooledTemporaryFile(max_size=MAX_IN_MEMORY_SIZE)
|
||||
while True:
|
||||
chunk = large_object.read(STANDARD_CHUNK_SIZE)
|
||||
if not chunk:
|
||||
break
|
||||
temp_file.write(chunk)
|
||||
temp_file.seek(0)
|
||||
return temp_file
|
||||
else:
|
||||
return BytesIO(large_object.read())
|
||||
|
||||
except AttributeError:
|
||||
# Fallback path using ``lo_get``
|
||||
result = db_session.execute(
|
||||
text("SELECT lo_get(:oid) AS data"),
|
||||
{"oid": lobj_oid},
|
||||
)
|
||||
byte_data = result.scalar()
|
||||
if byte_data is None:
|
||||
raise RuntimeError("Failed to read large object")
|
||||
|
||||
if use_tempfile:
|
||||
temp_file = tempfile.SpooledTemporaryFile(max_size=MAX_IN_MEMORY_SIZE)
|
||||
temp_file.write(byte_data)
|
||||
temp_file.seek(0)
|
||||
return temp_file
|
||||
return BytesIO(byte_data)
|
||||
|
||||
|
||||
def delete_lobj_by_id(
|
||||
lobj_oid: int,
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
"""Remove a large object by OID, regardless of driver implementation."""
|
||||
|
||||
try:
|
||||
pg_conn = get_pg_conn_from_session(db_session)
|
||||
if hasattr(pg_conn, "lobject"):
|
||||
pg_conn.lobject(lobj_oid).unlink()
|
||||
return
|
||||
raise AttributeError
|
||||
except AttributeError:
|
||||
# Fallback for drivers without ``lobject`` support
|
||||
db_session.execute(text("SELECT lo_unlink(:oid)"), {"oid": lobj_oid})
|
||||
# No explicit result expected
|
||||
@@ -47,7 +47,7 @@ from onyx.db.models import ToolCall
|
||||
from onyx.db.models import User
|
||||
from onyx.db.models import UserFile
|
||||
from onyx.db.persona import get_best_persona_id_for_user
|
||||
from onyx.db.pg_file_store import delete_lobj_by_name
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.file_store.models import FileDescriptor
|
||||
from onyx.file_store.models import InMemoryChatFile
|
||||
from onyx.llm.override_models import LLMOverride
|
||||
@@ -228,11 +228,10 @@ def delete_messages_and_files_from_chat_session(
|
||||
for id, files in messages_with_files:
|
||||
delete_tool_call_for_message_id(message_id=id, db_session=db_session)
|
||||
delete_search_doc_message_relationship(message_id=id, db_session=db_session)
|
||||
for file_info in files or {}:
|
||||
lobj_name = file_info.get("id")
|
||||
if lobj_name:
|
||||
logger.info(f"Deleting file with name: {lobj_name}")
|
||||
delete_lobj_by_name(lobj_name, db_session)
|
||||
|
||||
file_store = get_default_file_store(db_session)
|
||||
for file_info in files or []:
|
||||
file_store.delete_file(file_id=file_info.get("id"))
|
||||
|
||||
# Delete ChatMessage records - CASCADE constraints will automatically handle:
|
||||
# - AgentSubQuery records (via AgentSubQuestion)
|
||||
|
||||
85
backend/onyx/db/file_record.py
Normal file
85
backend/onyx/db/file_record.py
Normal file
@@ -0,0 +1,85 @@
|
||||
from sqlalchemy import and_
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.background.task_utils import QUERY_REPORT_NAME_PREFIX
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.configs.constants import FileType
|
||||
from onyx.db.models import FileRecord
|
||||
|
||||
|
||||
def get_query_history_export_files(
|
||||
db_session: Session,
|
||||
) -> list[FileRecord]:
|
||||
return list(
|
||||
db_session.scalars(
|
||||
select(FileRecord).where(
|
||||
and_(
|
||||
FileRecord.file_id.like(f"{QUERY_REPORT_NAME_PREFIX}-%"),
|
||||
FileRecord.file_type == FileType.CSV,
|
||||
FileRecord.file_origin == FileOrigin.QUERY_HISTORY_CSV,
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def get_filerecord_by_file_id_optional(
|
||||
file_id: str,
|
||||
db_session: Session,
|
||||
) -> FileRecord | None:
|
||||
return db_session.query(FileRecord).filter_by(file_id=file_id).first()
|
||||
|
||||
|
||||
def get_filerecord_by_file_id(
|
||||
file_id: str,
|
||||
db_session: Session,
|
||||
) -> FileRecord:
|
||||
filestore = db_session.query(FileRecord).filter_by(file_id=file_id).first()
|
||||
|
||||
if not filestore:
|
||||
raise RuntimeError(f"File by id {file_id} does not exist or was deleted")
|
||||
|
||||
return filestore
|
||||
|
||||
|
||||
def delete_filerecord_by_file_id(
|
||||
file_id: str,
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
db_session.query(FileRecord).filter_by(file_id=file_id).delete()
|
||||
|
||||
|
||||
def upsert_filerecord(
|
||||
file_id: str,
|
||||
display_name: str,
|
||||
file_origin: FileOrigin,
|
||||
file_type: str,
|
||||
bucket_name: str,
|
||||
object_key: str,
|
||||
db_session: Session,
|
||||
file_metadata: dict | None = None,
|
||||
) -> FileRecord:
|
||||
"""Create or update a file store record for external storage (S3, MinIO, etc.)"""
|
||||
filestore = db_session.query(FileRecord).filter_by(file_id=file_id).first()
|
||||
|
||||
if filestore:
|
||||
filestore.display_name = display_name
|
||||
filestore.file_origin = file_origin
|
||||
filestore.file_type = file_type
|
||||
filestore.file_metadata = file_metadata
|
||||
filestore.bucket_name = bucket_name
|
||||
filestore.object_key = object_key
|
||||
else:
|
||||
filestore = FileRecord(
|
||||
file_id=file_id,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_metadata=file_metadata,
|
||||
bucket_name=bucket_name,
|
||||
object_key=object_key,
|
||||
)
|
||||
db_session.add(filestore)
|
||||
|
||||
return filestore
|
||||
@@ -2689,15 +2689,28 @@ class KVStore(Base):
|
||||
encrypted_value: Mapped[JSON_ro] = mapped_column(EncryptedJson(), nullable=True)
|
||||
|
||||
|
||||
class PGFileStore(Base):
|
||||
__tablename__ = "file_store"
|
||||
class FileRecord(Base):
|
||||
__tablename__ = "file_record"
|
||||
|
||||
# Internal file ID, must be unique across all files.
|
||||
file_id: Mapped[str] = mapped_column(String, primary_key=True)
|
||||
|
||||
file_name: Mapped[str] = mapped_column(String, primary_key=True)
|
||||
display_name: Mapped[str] = mapped_column(String, nullable=True)
|
||||
file_origin: Mapped[FileOrigin] = mapped_column(Enum(FileOrigin, native_enum=False))
|
||||
file_type: Mapped[str] = mapped_column(String, default="text/plain")
|
||||
file_metadata: Mapped[JSON_ro] = mapped_column(postgresql.JSONB(), nullable=True)
|
||||
lobj_oid: Mapped[int] = mapped_column(Integer, nullable=False)
|
||||
|
||||
# External storage support (S3, MinIO, Azure Blob, etc.)
|
||||
bucket_name: Mapped[str] = mapped_column(String)
|
||||
object_key: Mapped[str] = mapped_column(String)
|
||||
|
||||
# Timestamps for external storage
|
||||
created_at: Mapped[datetime.datetime] = mapped_column(
|
||||
DateTime(timezone=True), server_default=func.now()
|
||||
)
|
||||
updated_at: Mapped[datetime.datetime] = mapped_column(
|
||||
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
|
||||
)
|
||||
|
||||
|
||||
class AgentSearchMetrics(Base):
|
||||
@@ -3022,13 +3035,13 @@ class PublicExternalUserGroup(Base):
|
||||
class UsageReport(Base):
|
||||
"""This stores metadata about usage reports generated by admin including user who generated
|
||||
them as well las the period they cover. The actual zip file of the report is stored as a lo
|
||||
using the PGFileStore
|
||||
using the FileRecord
|
||||
"""
|
||||
|
||||
__tablename__ = "usage_reports"
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
report_name: Mapped[str] = mapped_column(ForeignKey("file_store.file_name"))
|
||||
report_name: Mapped[str] = mapped_column(ForeignKey("file_record.file_id"))
|
||||
|
||||
# if None, report was auto-generated
|
||||
requestor_user_id: Mapped[UUID | None] = mapped_column(
|
||||
@@ -3043,7 +3056,7 @@ class UsageReport(Base):
|
||||
period_to: Mapped[datetime.datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
|
||||
requestor = relationship("User")
|
||||
file = relationship("PGFileStore")
|
||||
file = relationship("FileRecord")
|
||||
|
||||
|
||||
class InputPrompt(Base):
|
||||
|
||||
@@ -1,206 +0,0 @@
|
||||
import tempfile
|
||||
from io import BytesIO
|
||||
from typing import IO
|
||||
|
||||
from psycopg2.extensions import connection
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.sql import and_
|
||||
from sqlalchemy.sql import select
|
||||
|
||||
from onyx.background.task_utils import QUERY_REPORT_NAME_PREFIX
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.configs.constants import FileType
|
||||
from onyx.db.models import PGFileStore
|
||||
from onyx.file_store.constants import MAX_IN_MEMORY_SIZE
|
||||
from onyx.file_store.constants import STANDARD_CHUNK_SIZE
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def get_pg_conn_from_session(db_session: Session) -> connection:
|
||||
return db_session.connection().connection.connection # type: ignore
|
||||
|
||||
|
||||
def get_pgfilestore_by_file_name_optional(
|
||||
file_name: str,
|
||||
db_session: Session,
|
||||
) -> PGFileStore | None:
|
||||
return db_session.query(PGFileStore).filter_by(file_name=file_name).first()
|
||||
|
||||
|
||||
def get_pgfilestore_by_file_name(
|
||||
file_name: str,
|
||||
db_session: Session,
|
||||
) -> PGFileStore:
|
||||
pgfilestore = db_session.query(PGFileStore).filter_by(file_name=file_name).first()
|
||||
|
||||
if not pgfilestore:
|
||||
raise RuntimeError(f"File by name {file_name} does not exist or was deleted")
|
||||
|
||||
return pgfilestore
|
||||
|
||||
|
||||
def delete_pgfilestore_by_file_name(
|
||||
file_name: str,
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
db_session.query(PGFileStore).filter_by(file_name=file_name).delete()
|
||||
|
||||
|
||||
def create_populate_lobj(
|
||||
content: IO,
|
||||
db_session: Session,
|
||||
) -> int:
|
||||
"""Note, this does not commit the changes to the DB
|
||||
This is because the commit should happen with the PGFileStore row creation
|
||||
That step finalizes both the Large Object and the table tracking it
|
||||
"""
|
||||
pg_conn = get_pg_conn_from_session(db_session)
|
||||
large_object = pg_conn.lobject()
|
||||
|
||||
# write in multiple chunks to avoid loading the whole file into memory
|
||||
while True:
|
||||
chunk = content.read(STANDARD_CHUNK_SIZE)
|
||||
if not chunk:
|
||||
break
|
||||
large_object.write(chunk)
|
||||
|
||||
large_object.close()
|
||||
|
||||
return large_object.oid
|
||||
|
||||
|
||||
def read_lobj(
|
||||
lobj_oid: int,
|
||||
db_session: Session,
|
||||
mode: str | None = None,
|
||||
use_tempfile: bool = False,
|
||||
) -> IO:
|
||||
pg_conn = get_pg_conn_from_session(db_session)
|
||||
# Ensure we're using binary mode by default for large objects
|
||||
if mode is None:
|
||||
mode = "rb"
|
||||
large_object = (
|
||||
pg_conn.lobject(lobj_oid, mode=mode) if mode else pg_conn.lobject(lobj_oid)
|
||||
)
|
||||
|
||||
if use_tempfile:
|
||||
temp_file = tempfile.SpooledTemporaryFile(max_size=MAX_IN_MEMORY_SIZE)
|
||||
while True:
|
||||
chunk = large_object.read(STANDARD_CHUNK_SIZE)
|
||||
if not chunk:
|
||||
break
|
||||
temp_file.write(chunk)
|
||||
temp_file.seek(0)
|
||||
return temp_file
|
||||
else:
|
||||
# Ensure we're getting raw bytes without text decoding
|
||||
return BytesIO(large_object.read())
|
||||
|
||||
|
||||
def delete_lobj_by_id(
|
||||
lobj_oid: int,
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
pg_conn = get_pg_conn_from_session(db_session)
|
||||
pg_conn.lobject(lobj_oid).unlink()
|
||||
|
||||
|
||||
def delete_lobj_by_name(
|
||||
lobj_name: str,
|
||||
db_session: Session,
|
||||
) -> None:
|
||||
try:
|
||||
pgfilestore = get_pgfilestore_by_file_name(lobj_name, db_session)
|
||||
except RuntimeError:
|
||||
logger.info(f"no file with name {lobj_name} found")
|
||||
return
|
||||
|
||||
pg_conn = get_pg_conn_from_session(db_session)
|
||||
pg_conn.lobject(pgfilestore.lobj_oid).unlink()
|
||||
|
||||
delete_pgfilestore_by_file_name(lobj_name, db_session)
|
||||
db_session.commit()
|
||||
|
||||
|
||||
def upsert_pgfilestore(
|
||||
file_name: str,
|
||||
display_name: str | None,
|
||||
file_origin: FileOrigin,
|
||||
file_type: str,
|
||||
lobj_oid: int,
|
||||
db_session: Session,
|
||||
commit: bool = False,
|
||||
file_metadata: dict | None = None,
|
||||
) -> PGFileStore:
|
||||
pgfilestore = db_session.query(PGFileStore).filter_by(file_name=file_name).first()
|
||||
|
||||
if pgfilestore:
|
||||
try:
|
||||
# This should not happen in normal execution
|
||||
delete_lobj_by_id(lobj_oid=pgfilestore.lobj_oid, db_session=db_session)
|
||||
except Exception:
|
||||
# If the delete fails as well, the large object doesn't exist anyway and even if it
|
||||
# fails to delete, it's not too terrible as most files sizes are insignificant
|
||||
logger.error(
|
||||
f"Failed to delete large object with oid {pgfilestore.lobj_oid}"
|
||||
)
|
||||
|
||||
pgfilestore.lobj_oid = lobj_oid
|
||||
else:
|
||||
pgfilestore = PGFileStore(
|
||||
file_name=file_name,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_metadata=file_metadata,
|
||||
lobj_oid=lobj_oid,
|
||||
)
|
||||
db_session.add(pgfilestore)
|
||||
|
||||
if commit:
|
||||
db_session.commit()
|
||||
|
||||
return pgfilestore
|
||||
|
||||
|
||||
def save_bytes_to_pgfilestore(
|
||||
db_session: Session,
|
||||
raw_bytes: bytes,
|
||||
media_type: str,
|
||||
identifier: str,
|
||||
display_name: str,
|
||||
file_origin: FileOrigin = FileOrigin.OTHER,
|
||||
) -> PGFileStore:
|
||||
"""
|
||||
Saves raw bytes to PGFileStore and returns the resulting record.
|
||||
"""
|
||||
file_name = f"{file_origin.name.lower()}_{identifier}"
|
||||
lobj_oid = create_populate_lobj(BytesIO(raw_bytes), db_session)
|
||||
pgfilestore = upsert_pgfilestore(
|
||||
file_name=file_name,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=media_type,
|
||||
lobj_oid=lobj_oid,
|
||||
db_session=db_session,
|
||||
commit=True,
|
||||
)
|
||||
return pgfilestore
|
||||
|
||||
|
||||
def get_query_history_export_files(
|
||||
db_session: Session,
|
||||
) -> list[PGFileStore]:
|
||||
return list(
|
||||
db_session.scalars(
|
||||
select(PGFileStore).where(
|
||||
and_(
|
||||
PGFileStore.file_name.like(f"{QUERY_REPORT_NAME_PREFIX}-%"),
|
||||
PGFileStore.file_type == FileType.CSV,
|
||||
PGFileStore.file_origin == FileOrigin.QUERY_HISTORY_CSV,
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
@@ -136,7 +136,8 @@ def _vespa_hit_to_inference_chunk(
|
||||
section_continuation=fields[SECTION_CONTINUATION],
|
||||
document_id=fields[DOCUMENT_ID],
|
||||
source_type=fields[SOURCE_TYPE],
|
||||
image_file_name=fields.get(IMAGE_FILE_NAME),
|
||||
# still called `image_file_name` in Vespa for backwards compatibility
|
||||
image_file_id=fields.get(IMAGE_FILE_NAME),
|
||||
title=fields.get(TITLE),
|
||||
semantic_identifier=fields[SEMANTIC_IDENTIFIER],
|
||||
boost=fields.get(BOOST, 1),
|
||||
|
||||
@@ -206,7 +206,8 @@ def _index_vespa_chunk(
|
||||
# which only calls VespaIndex.update
|
||||
ACCESS_CONTROL_LIST: {acl_entry: 1 for acl_entry in chunk.access.to_acl()},
|
||||
DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
|
||||
IMAGE_FILE_NAME: chunk.image_file_name,
|
||||
# still called `image_file_name` in Vespa for backwards compatibility
|
||||
IMAGE_FILE_NAME: chunk.image_file_id,
|
||||
USER_FILE: chunk.user_file if chunk.user_file is not None else None,
|
||||
USER_FOLDER: chunk.user_folder if chunk.user_folder is not None else None,
|
||||
BOOST: chunk.boost,
|
||||
|
||||
@@ -68,6 +68,7 @@ PRIMARY_OWNERS = "primary_owners"
|
||||
SECONDARY_OWNERS = "secondary_owners"
|
||||
RECENCY_BIAS = "recency_bias"
|
||||
HIDDEN = "hidden"
|
||||
# for legacy reasons, called `name` in Vespa despite it really being an ID
|
||||
IMAGE_FILE_NAME = "image_file_name"
|
||||
|
||||
# Specific to Vespa, needed for highlighting matching keywords / section
|
||||
|
||||
@@ -2,7 +2,6 @@ import io
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
import zipfile
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Iterator
|
||||
@@ -613,15 +612,13 @@ def convert_docx_to_txt(file: UploadFile, file_store: FileStore) -> str:
|
||||
all_paras = [p.text for p in doc.paragraphs]
|
||||
text_content = "\n".join(all_paras)
|
||||
|
||||
text_file_name = docx_to_txt_filename(file.filename or f"docx_{uuid.uuid4()}")
|
||||
file_store.save_file(
|
||||
file_name=text_file_name,
|
||||
file_id = file_store.save_file(
|
||||
content=BytesIO(text_content.encode("utf-8")),
|
||||
display_name=file.filename,
|
||||
file_origin=FileOrigin.CONNECTOR,
|
||||
file_type="text/plain",
|
||||
)
|
||||
return text_file_name
|
||||
return file_id
|
||||
|
||||
|
||||
def docx_to_txt_filename(file_path: str) -> str:
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
from io import BytesIO
|
||||
from typing import Tuple
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.db.pg_file_store import save_bytes_to_pgfilestore
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
@@ -13,19 +14,19 @@ logger = setup_logger()
|
||||
def store_image_and_create_section(
|
||||
db_session: Session,
|
||||
image_data: bytes,
|
||||
file_name: str,
|
||||
file_id: str,
|
||||
display_name: str,
|
||||
link: str | None = None,
|
||||
media_type: str = "application/octet-stream",
|
||||
file_origin: FileOrigin = FileOrigin.OTHER,
|
||||
) -> Tuple[ImageSection, str | None]:
|
||||
"""
|
||||
Stores an image in PGFileStore and creates an ImageSection object without summarization.
|
||||
Stores an image in FileStore and creates an ImageSection object without summarization.
|
||||
|
||||
Args:
|
||||
db_session: Database session
|
||||
image_data: Raw image bytes
|
||||
file_name: Base identifier for the file
|
||||
file_id: Base identifier for the file
|
||||
display_name: Human-readable name for the image
|
||||
media_type: MIME type of the image
|
||||
file_origin: Origin of the file (e.g., CONFLUENCE, GOOGLE_DRIVE, etc.)
|
||||
@@ -33,26 +34,24 @@ def store_image_and_create_section(
|
||||
Returns:
|
||||
Tuple containing:
|
||||
- ImageSection object with image reference
|
||||
- The file_name in PGFileStore or None if storage failed
|
||||
- The file_id in FileStore or None if storage failed
|
||||
"""
|
||||
# Storage logic
|
||||
stored_file_name = None
|
||||
try:
|
||||
pgfilestore = save_bytes_to_pgfilestore(
|
||||
db_session=db_session,
|
||||
raw_bytes=image_data,
|
||||
media_type=media_type,
|
||||
identifier=file_name,
|
||||
file_store = get_default_file_store(db_session)
|
||||
file_id = file_store.save_file(
|
||||
content=BytesIO(image_data),
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=media_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
stored_file_name = pgfilestore.file_name
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to store image: {e}")
|
||||
raise e
|
||||
|
||||
# Create an ImageSection with empty text (will be filled by LLM later in the pipeline)
|
||||
return (
|
||||
ImageSection(image_file_name=stored_file_name, link=link),
|
||||
stored_file_name,
|
||||
ImageSection(image_file_id=file_id, link=link),
|
||||
file_id,
|
||||
)
|
||||
|
||||
161
backend/onyx/file_store/README.md
Normal file
161
backend/onyx/file_store/README.md
Normal file
@@ -0,0 +1,161 @@
|
||||
# Onyx File Store
|
||||
|
||||
The Onyx file store provides a unified interface for storing files and large binary objects in S3-compatible storage systems. It supports AWS S3, MinIO, Azure Blob Storage, Digital Ocean Spaces, and other S3-compatible services.
|
||||
|
||||
## Architecture
|
||||
|
||||
The file store uses a single database table (`file_record`) to store file metadata while the actual file content is stored in external S3-compatible storage. This approach provides scalability, cost-effectiveness, and decouples file storage from the database.
|
||||
|
||||
### Database Schema
|
||||
|
||||
The `file_record` table contains the following columns:
|
||||
|
||||
- `file_id` (primary key): Unique identifier for the file
|
||||
- `display_name`: Human-readable name for the file
|
||||
- `file_origin`: Origin/source of the file (enum)
|
||||
- `file_type`: MIME type of the file
|
||||
- `file_metadata`: Additional metadata as JSON
|
||||
- `bucket_name`: External storage bucket/container name
|
||||
- `object_key`: External storage object key/path
|
||||
- `created_at`: Timestamp when the file was created
|
||||
- `updated_at`: Timestamp when the file was last updated
|
||||
|
||||
## Storage Backend
|
||||
|
||||
### S3-Compatible Storage
|
||||
|
||||
Stores files in external S3-compatible storage systems while keeping metadata in the database.
|
||||
|
||||
**Pros:**
|
||||
- Scalable storage
|
||||
- Cost-effective for large files
|
||||
- CDN integration possible
|
||||
- Decoupled from database
|
||||
- Wide ecosystem support
|
||||
|
||||
**Cons:**
|
||||
- Additional infrastructure required
|
||||
- Network dependency
|
||||
- Eventual consistency considerations
|
||||
|
||||
## Configuration
|
||||
|
||||
All configuration is handled via environment variables. The system requires S3-compatible storage to be configured.
|
||||
|
||||
### AWS S3
|
||||
|
||||
```bash
|
||||
S3_FILE_STORE_BUCKET_NAME=your-bucket-name # Defaults to 'onyx-file-store-bucket'
|
||||
S3_FILE_STORE_PREFIX=onyx-files # Optional, defaults to 'onyx-files'
|
||||
|
||||
# AWS credentials (use one of these methods):
|
||||
# 1. Environment variables
|
||||
S3_AWS_ACCESS_KEY_ID=your-access-key
|
||||
S3_AWS_SECRET_ACCESS_KEY=your-secret-key
|
||||
AWS_REGION_NAME=us-east-2 # Optional, defaults to 'us-east-2'
|
||||
|
||||
# 2. IAM roles (recommended for EC2/ECS deployments)
|
||||
# No additional configuration needed if using IAM roles
|
||||
```
|
||||
|
||||
### MinIO
|
||||
|
||||
```bash
|
||||
S3_FILE_STORE_BUCKET_NAME=your-bucket-name
|
||||
S3_ENDPOINT_URL=http://localhost:9000 # MinIO endpoint
|
||||
S3_AWS_ACCESS_KEY_ID=minioadmin
|
||||
S3_AWS_SECRET_ACCESS_KEY=minioadmin
|
||||
AWS_REGION_NAME=us-east-1 # Any region name
|
||||
S3_VERIFY_SSL=false # Optional, defaults to false
|
||||
```
|
||||
|
||||
### Digital Ocean Spaces
|
||||
|
||||
```bash
|
||||
S3_FILE_STORE_BUCKET_NAME=your-space-name
|
||||
S3_ENDPOINT_URL=https://nyc3.digitaloceanspaces.com
|
||||
S3_AWS_ACCESS_KEY_ID=your-spaces-key
|
||||
S3_AWS_SECRET_ACCESS_KEY=your-spaces-secret
|
||||
AWS_REGION_NAME=nyc3
|
||||
```
|
||||
|
||||
### Other S3-Compatible Services
|
||||
|
||||
The file store works with any S3-compatible service. Simply configure:
|
||||
- `S3_FILE_STORE_BUCKET_NAME`: Your bucket/container name
|
||||
- `S3_ENDPOINT_URL`: The service endpoint URL
|
||||
- `S3_AWS_ACCESS_KEY_ID` and `S3_AWS_SECRET_ACCESS_KEY`: Your credentials
|
||||
- `AWS_REGION_NAME`: The region (any valid region name)
|
||||
|
||||
## Implementation
|
||||
|
||||
The system uses the `S3BackedFileStore` class that implements the abstract `FileStore` interface. The database uses generic column names (`bucket_name`, `object_key`) to maintain compatibility with different S3-compatible services.
|
||||
|
||||
### File Store Interface
|
||||
|
||||
The `FileStore` abstract base class defines the following methods:
|
||||
|
||||
- `initialize()`: Initialize the storage backend (create bucket if needed)
|
||||
- `has_file(file_id, file_origin, file_type)`: Check if a file exists
|
||||
- `save_file(content, display_name, file_origin, file_type, file_metadata, file_id)`: Save a file
|
||||
- `read_file(file_id, mode, use_tempfile)`: Read file content
|
||||
- `read_file_record(file_id)`: Get file metadata from database
|
||||
- `delete_file(file_id)`: Delete a file and its metadata
|
||||
- `get_file_with_mime_type(filename)`: Get file with parsed MIME type
|
||||
|
||||
## Usage Example
|
||||
|
||||
```python
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.configs.constants import FileOrigin
|
||||
|
||||
# Get the configured file store
|
||||
file_store = get_default_file_store(db_session)
|
||||
|
||||
# Initialize the storage backend (creates bucket if needed)
|
||||
file_store.initialize()
|
||||
|
||||
# Save a file
|
||||
with open("example.pdf", "rb") as f:
|
||||
file_id = file_store.save_file(
|
||||
content=f,
|
||||
display_name="Important Document.pdf",
|
||||
file_origin=FileOrigin.OTHER,
|
||||
file_type="application/pdf",
|
||||
file_metadata={"department": "engineering", "version": "1.0"}
|
||||
)
|
||||
|
||||
# Check if a file exists
|
||||
exists = file_store.has_file(
|
||||
file_id=file_id,
|
||||
file_origin=FileOrigin.OTHER,
|
||||
file_type="application/pdf"
|
||||
)
|
||||
|
||||
# Read a file
|
||||
file_content = file_store.read_file(file_id)
|
||||
|
||||
# Read file with temporary file (for large files)
|
||||
file_content = file_store.read_file(file_id, use_tempfile=True)
|
||||
|
||||
# Get file metadata
|
||||
file_record = file_store.read_file_record(file_id)
|
||||
|
||||
# Get file with MIME type detection
|
||||
file_with_mime = file_store.get_file_with_mime_type(file_id)
|
||||
|
||||
# Delete a file
|
||||
file_store.delete_file(file_id)
|
||||
```
|
||||
|
||||
## Initialization
|
||||
|
||||
When deploying the application, ensure that:
|
||||
|
||||
1. The S3-compatible storage service is accessible
|
||||
2. Credentials are properly configured
|
||||
3. The bucket specified in `S3_FILE_STORE_BUCKET_NAME` exists or the service account has permissions to create it
|
||||
4. Call `file_store.initialize()` during application startup to ensure the bucket exists
|
||||
|
||||
The file store will automatically create the bucket if it doesn't exist and the credentials have sufficient permissions.
|
||||
|
||||
@@ -1,21 +1,37 @@
|
||||
import tempfile
|
||||
import uuid
|
||||
from abc import ABC
|
||||
from abc import abstractmethod
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
from typing import IO
|
||||
|
||||
import boto3
|
||||
import puremagic
|
||||
from botocore.config import Config
|
||||
from botocore.exceptions import ClientError
|
||||
from mypy_boto3_s3 import S3Client
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.app_configs import AWS_REGION_NAME
|
||||
from onyx.configs.app_configs import S3_AWS_ACCESS_KEY_ID
|
||||
from onyx.configs.app_configs import S3_AWS_SECRET_ACCESS_KEY
|
||||
from onyx.configs.app_configs import S3_ENDPOINT_URL
|
||||
from onyx.configs.app_configs import S3_FILE_STORE_BUCKET_NAME
|
||||
from onyx.configs.app_configs import S3_FILE_STORE_PREFIX
|
||||
from onyx.configs.app_configs import S3_VERIFY_SSL
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.db.models import PGFileStore
|
||||
from onyx.db.pg_file_store import create_populate_lobj
|
||||
from onyx.db.pg_file_store import delete_lobj_by_id
|
||||
from onyx.db.pg_file_store import delete_pgfilestore_by_file_name
|
||||
from onyx.db.pg_file_store import get_pgfilestore_by_file_name
|
||||
from onyx.db.pg_file_store import get_pgfilestore_by_file_name_optional
|
||||
from onyx.db.pg_file_store import read_lobj
|
||||
from onyx.db.pg_file_store import upsert_pgfilestore
|
||||
from onyx.db.file_record import delete_filerecord_by_file_id
|
||||
from onyx.db.file_record import get_filerecord_by_file_id
|
||||
from onyx.db.file_record import get_filerecord_by_file_id_optional
|
||||
from onyx.db.file_record import upsert_filerecord
|
||||
from onyx.db.models import FileRecord as FileStoreModel
|
||||
from onyx.utils.file import FileWithMimeType
|
||||
from onyx.utils.logger import setup_logger
|
||||
from shared_configs.contextvars import get_current_tenant_id
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
class FileStore(ABC):
|
||||
@@ -23,20 +39,25 @@ class FileStore(ABC):
|
||||
An abstraction for storing files and large binary objects.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def initialize(self) -> None:
|
||||
"""
|
||||
Should generally be called once before any other methods are called.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def has_file(
|
||||
self,
|
||||
file_name: str,
|
||||
file_id: str,
|
||||
file_origin: FileOrigin,
|
||||
file_type: str,
|
||||
display_name: str | None = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Check if a file exists in the blob store
|
||||
|
||||
Parameters:
|
||||
- file_name: Name of the file to save
|
||||
- display_name: Display name of the file
|
||||
- file_id: Unique ID of the file to check for
|
||||
- file_origin: Origin of the file
|
||||
- file_type: Type of the file
|
||||
"""
|
||||
@@ -45,38 +66,39 @@ class FileStore(ABC):
|
||||
@abstractmethod
|
||||
def save_file(
|
||||
self,
|
||||
file_name: str,
|
||||
content: IO,
|
||||
display_name: str | None,
|
||||
file_origin: FileOrigin,
|
||||
file_type: str,
|
||||
file_metadata: dict | None = None,
|
||||
commit: bool = True,
|
||||
) -> None:
|
||||
file_metadata: dict[str, Any] | None = None,
|
||||
file_id: str | None = None,
|
||||
) -> str:
|
||||
"""
|
||||
Save a file to the blob store
|
||||
|
||||
Parameters:
|
||||
- connector_name: Name of the CC-Pair (as specified by the user in the UI)
|
||||
- file_name: Name of the file to save
|
||||
- content: Contents of the file
|
||||
- display_name: Display name of the file
|
||||
- display_name: Display name of the file to save
|
||||
- file_origin: Origin of the file
|
||||
- file_type: Type of the file
|
||||
- file_metadata: Additional metadata for the file
|
||||
- commit: Whether to commit the transaction after saving the file
|
||||
- file_id: Unique ID of the file to save. If not provided, a random UUID will be generated.
|
||||
It is generally NOT recommended to provide this.
|
||||
|
||||
Returns:
|
||||
The unique ID of the file that was saved.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def read_file(
|
||||
self, file_name: str, mode: str | None = None, use_tempfile: bool = False
|
||||
) -> IO:
|
||||
self, file_id: str, mode: str | None = None, use_tempfile: bool = False
|
||||
) -> IO[bytes]:
|
||||
"""
|
||||
Read the content of a given file by the name
|
||||
Read the content of a given file by the ID
|
||||
|
||||
Parameters:
|
||||
- file_name: Name of file to read
|
||||
- file_id: Unique ID of file to read
|
||||
- mode: Mode to open the file (e.g. 'b' for binary)
|
||||
- use_tempfile: Whether to use a temporary file to store the contents
|
||||
in order to avoid loading the entire file into memory
|
||||
@@ -86,34 +108,160 @@ class FileStore(ABC):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def read_file_record(self, file_name: str) -> PGFileStore:
|
||||
def read_file_record(self, file_id: str) -> FileStoreModel:
|
||||
"""
|
||||
Read the file record by the name
|
||||
Read the file record by the ID
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def delete_file(self, file_name: str) -> None:
|
||||
def delete_file(self, file_id: str) -> None:
|
||||
"""
|
||||
Delete a file by its name.
|
||||
Delete a file by its ID.
|
||||
|
||||
Parameters:
|
||||
- file_name: Name of file to delete
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def get_file_with_mime_type(self, filename: str) -> FileWithMimeType | None:
|
||||
"""
|
||||
Get the file + parse out the mime type.
|
||||
"""
|
||||
|
||||
class PostgresBackedFileStore(FileStore):
|
||||
def __init__(self, db_session: Session):
|
||||
|
||||
class S3BackedFileStore(FileStore):
|
||||
"""Isn't necessarily S3, but is any S3-compatible storage (e.g. MinIO)"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
db_session: Session,
|
||||
bucket_name: str,
|
||||
aws_access_key_id: str | None = None,
|
||||
aws_secret_access_key: str | None = None,
|
||||
aws_region_name: str | None = None,
|
||||
s3_endpoint_url: str | None = None,
|
||||
s3_prefix: str | None = None,
|
||||
s3_verify_ssl: bool = True,
|
||||
) -> None:
|
||||
self.db_session = db_session
|
||||
self._s3_client: S3Client | None = None
|
||||
self._bucket_name = bucket_name
|
||||
self._aws_access_key_id = aws_access_key_id
|
||||
self._aws_secret_access_key = aws_secret_access_key
|
||||
self._aws_region_name = aws_region_name or "us-east-2"
|
||||
self._s3_endpoint_url = s3_endpoint_url
|
||||
self._s3_prefix = s3_prefix or "onyx-files"
|
||||
self._s3_verify_ssl = s3_verify_ssl
|
||||
|
||||
def _get_s3_client(self) -> S3Client:
|
||||
"""Initialize S3 client if not already done"""
|
||||
if self._s3_client is None:
|
||||
try:
|
||||
client_kwargs: dict[str, Any] = {
|
||||
"service_name": "s3",
|
||||
"region_name": self._aws_region_name,
|
||||
}
|
||||
|
||||
# Add endpoint URL if specified (for MinIO, etc.)
|
||||
if self._s3_endpoint_url:
|
||||
client_kwargs["endpoint_url"] = self._s3_endpoint_url
|
||||
client_kwargs["config"] = Config(
|
||||
signature_version="s3v4",
|
||||
s3={"addressing_style": "path"}, # Required for MinIO
|
||||
)
|
||||
# Disable SSL verification if requested (for local development)
|
||||
if not self._s3_verify_ssl:
|
||||
import urllib3
|
||||
|
||||
urllib3.disable_warnings(
|
||||
urllib3.exceptions.InsecureRequestWarning
|
||||
)
|
||||
client_kwargs["verify"] = False
|
||||
|
||||
if self._aws_access_key_id and self._aws_secret_access_key:
|
||||
# Use explicit credentials
|
||||
client_kwargs.update(
|
||||
{
|
||||
"aws_access_key_id": self._aws_access_key_id,
|
||||
"aws_secret_access_key": self._aws_secret_access_key,
|
||||
}
|
||||
)
|
||||
self._s3_client = boto3.client(**client_kwargs)
|
||||
else:
|
||||
# Use IAM role or default credentials (not typically used with MinIO)
|
||||
self._s3_client = boto3.client(**client_kwargs)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize S3 client: {e}")
|
||||
raise RuntimeError(f"Failed to initialize S3 client: {e}")
|
||||
|
||||
return self._s3_client
|
||||
|
||||
def _get_bucket_name(self) -> str:
|
||||
"""Get S3 bucket name from configuration"""
|
||||
if not self._bucket_name:
|
||||
raise RuntimeError("S3 bucket name is required for S3 file store")
|
||||
return self._bucket_name
|
||||
|
||||
def _get_s3_key(self, file_name: str) -> str:
|
||||
"""Generate S3 key from file name with tenant ID prefix"""
|
||||
tenant_id = get_current_tenant_id()
|
||||
return f"{self._s3_prefix}/{tenant_id}/{file_name}"
|
||||
|
||||
def initialize(self) -> None:
|
||||
"""Initialize the S3 file store by ensuring the bucket exists"""
|
||||
s3_client = self._get_s3_client()
|
||||
bucket_name = self._get_bucket_name()
|
||||
|
||||
# Check if bucket exists
|
||||
try:
|
||||
s3_client.head_bucket(Bucket=bucket_name)
|
||||
logger.info(f"S3 bucket '{bucket_name}' already exists")
|
||||
except ClientError as e:
|
||||
error_code = e.response["Error"]["Code"]
|
||||
if error_code == "404":
|
||||
# Bucket doesn't exist, create it
|
||||
logger.info(f"Creating S3 bucket '{bucket_name}'")
|
||||
|
||||
# For AWS S3, we need to handle region-specific bucket creation
|
||||
region = (
|
||||
s3_client._client_config.region_name
|
||||
if hasattr(s3_client, "_client_config")
|
||||
else None
|
||||
)
|
||||
|
||||
if region and region != "us-east-1":
|
||||
# For regions other than us-east-1, we need to specify LocationConstraint
|
||||
s3_client.create_bucket(
|
||||
Bucket=bucket_name,
|
||||
CreateBucketConfiguration={"LocationConstraint": region},
|
||||
)
|
||||
else:
|
||||
# For us-east-1 or MinIO/other S3-compatible services
|
||||
s3_client.create_bucket(Bucket=bucket_name)
|
||||
|
||||
logger.info(f"Successfully created S3 bucket '{bucket_name}'")
|
||||
elif error_code == "403":
|
||||
# Bucket exists but we don't have permission to access it
|
||||
logger.warning(
|
||||
f"S3 bucket '{bucket_name}' exists but access is forbidden"
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"Access denied to S3 bucket '{bucket_name}'. Check credentials and permissions."
|
||||
)
|
||||
else:
|
||||
# Some other error occurred
|
||||
logger.error(f"Failed to check S3 bucket '{bucket_name}': {e}")
|
||||
raise RuntimeError(f"Failed to check S3 bucket '{bucket_name}': {e}")
|
||||
|
||||
def has_file(
|
||||
self,
|
||||
file_name: str,
|
||||
file_id: str,
|
||||
file_origin: FileOrigin,
|
||||
file_type: str,
|
||||
display_name: str | None = None,
|
||||
) -> bool:
|
||||
file_record = get_pgfilestore_by_file_name_optional(
|
||||
file_name=display_name or file_name, db_session=self.db_session
|
||||
file_record = get_filerecord_by_file_id_optional(
|
||||
file_id=file_id, db_session=self.db_session
|
||||
)
|
||||
return (
|
||||
file_record is not None
|
||||
@@ -123,63 +271,101 @@ class PostgresBackedFileStore(FileStore):
|
||||
|
||||
def save_file(
|
||||
self,
|
||||
file_name: str,
|
||||
content: IO,
|
||||
display_name: str | None,
|
||||
file_origin: FileOrigin,
|
||||
file_type: str,
|
||||
file_metadata: dict | None = None,
|
||||
commit: bool = True,
|
||||
) -> None:
|
||||
try:
|
||||
# The large objects in postgres are saved as special objects can be listed with
|
||||
# SELECT * FROM pg_largeobject_metadata;
|
||||
obj_id = create_populate_lobj(content=content, db_session=self.db_session)
|
||||
upsert_pgfilestore(
|
||||
file_name=file_name,
|
||||
display_name=display_name or file_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
lobj_oid=obj_id,
|
||||
db_session=self.db_session,
|
||||
file_metadata=file_metadata,
|
||||
)
|
||||
if commit:
|
||||
self.db_session.commit()
|
||||
except Exception:
|
||||
self.db_session.rollback()
|
||||
raise
|
||||
file_metadata: dict[str, Any] | None = None,
|
||||
file_id: str | None = None,
|
||||
) -> str:
|
||||
if file_id is None:
|
||||
file_id = str(uuid.uuid4())
|
||||
|
||||
s3_client = self._get_s3_client()
|
||||
bucket_name = self._get_bucket_name()
|
||||
s3_key = self._get_s3_key(file_id)
|
||||
|
||||
# Read content from IO object
|
||||
if hasattr(content, "read"):
|
||||
file_content = content.read()
|
||||
if hasattr(content, "seek"):
|
||||
content.seek(0) # Reset position for potential re-reads
|
||||
else:
|
||||
file_content = content
|
||||
|
||||
# Upload to S3
|
||||
s3_client.put_object(
|
||||
Bucket=bucket_name,
|
||||
Key=s3_key,
|
||||
Body=file_content,
|
||||
ContentType=file_type,
|
||||
)
|
||||
|
||||
# Save metadata to database
|
||||
upsert_filerecord(
|
||||
file_id=file_id,
|
||||
display_name=display_name or file_id,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
bucket_name=bucket_name,
|
||||
object_key=s3_key,
|
||||
db_session=self.db_session,
|
||||
file_metadata=file_metadata,
|
||||
)
|
||||
self.db_session.commit()
|
||||
|
||||
return file_id
|
||||
|
||||
def read_file(
|
||||
self, file_name: str, mode: str | None = None, use_tempfile: bool = False
|
||||
) -> IO:
|
||||
file_record = get_pgfilestore_by_file_name(
|
||||
file_name=file_name, db_session=self.db_session
|
||||
)
|
||||
return read_lobj(
|
||||
lobj_oid=file_record.lobj_oid,
|
||||
db_session=self.db_session,
|
||||
mode=mode,
|
||||
use_tempfile=use_tempfile,
|
||||
self, file_id: str, mode: str | None = None, use_tempfile: bool = False
|
||||
) -> IO[bytes]:
|
||||
file_record = get_filerecord_by_file_id(
|
||||
file_id=file_id, db_session=self.db_session
|
||||
)
|
||||
|
||||
def read_file_record(self, file_name: str) -> PGFileStore:
|
||||
file_record = get_pgfilestore_by_file_name(
|
||||
file_name=file_name, db_session=self.db_session
|
||||
)
|
||||
s3_client = self._get_s3_client()
|
||||
try:
|
||||
response = s3_client.get_object(
|
||||
Bucket=file_record.bucket_name, Key=file_record.object_key
|
||||
)
|
||||
except ClientError:
|
||||
logger.error(f"Failed to read file {file_id} from S3")
|
||||
raise
|
||||
|
||||
file_content = response["Body"].read()
|
||||
|
||||
if use_tempfile:
|
||||
# Always open in binary mode for temp files since we're writing bytes
|
||||
temp_file = tempfile.NamedTemporaryFile(mode="w+b", delete=False)
|
||||
temp_file.write(file_content)
|
||||
temp_file.seek(0)
|
||||
return temp_file
|
||||
else:
|
||||
return BytesIO(file_content)
|
||||
|
||||
def read_file_record(self, file_id: str) -> FileStoreModel:
|
||||
file_record = get_filerecord_by_file_id(
|
||||
file_id=file_id, db_session=self.db_session
|
||||
)
|
||||
return file_record
|
||||
|
||||
def delete_file(self, file_name: str) -> None:
|
||||
def delete_file(self, file_id: str) -> None:
|
||||
try:
|
||||
file_record = get_pgfilestore_by_file_name(
|
||||
file_name=file_name, db_session=self.db_session
|
||||
file_record = get_filerecord_by_file_id(
|
||||
file_id=file_id, db_session=self.db_session
|
||||
)
|
||||
delete_lobj_by_id(file_record.lobj_oid, db_session=self.db_session)
|
||||
delete_pgfilestore_by_file_name(
|
||||
file_name=file_name, db_session=self.db_session
|
||||
|
||||
# Delete from external storage
|
||||
s3_client = self._get_s3_client()
|
||||
s3_client.delete_object(
|
||||
Bucket=file_record.bucket_name, Key=file_record.object_key
|
||||
)
|
||||
|
||||
# Delete metadata from database
|
||||
delete_filerecord_by_file_id(file_id=file_id, db_session=self.db_session)
|
||||
|
||||
self.db_session.commit()
|
||||
|
||||
except Exception:
|
||||
self.db_session.rollback()
|
||||
raise
|
||||
@@ -197,6 +383,53 @@ class PostgresBackedFileStore(FileStore):
|
||||
return None
|
||||
|
||||
|
||||
def get_s3_file_store(db_session: Session) -> S3BackedFileStore:
|
||||
"""
|
||||
Returns the S3 file store implementation.
|
||||
"""
|
||||
|
||||
# Get bucket name - this is required
|
||||
bucket_name = S3_FILE_STORE_BUCKET_NAME
|
||||
if not bucket_name:
|
||||
raise RuntimeError(
|
||||
"S3_FILE_STORE_BUCKET_NAME configuration is required for S3 file store"
|
||||
)
|
||||
|
||||
return S3BackedFileStore(
|
||||
db_session=db_session,
|
||||
bucket_name=bucket_name,
|
||||
aws_access_key_id=S3_AWS_ACCESS_KEY_ID,
|
||||
aws_secret_access_key=S3_AWS_SECRET_ACCESS_KEY,
|
||||
aws_region_name=AWS_REGION_NAME,
|
||||
s3_endpoint_url=S3_ENDPOINT_URL,
|
||||
s3_prefix=S3_FILE_STORE_PREFIX,
|
||||
s3_verify_ssl=S3_VERIFY_SSL,
|
||||
)
|
||||
|
||||
|
||||
def get_default_file_store(db_session: Session) -> FileStore:
|
||||
# The only supported file store now is the Postgres File Store
|
||||
return PostgresBackedFileStore(db_session=db_session)
|
||||
"""
|
||||
Returns the configured file store implementation.
|
||||
|
||||
Supports AWS S3, MinIO, and other S3-compatible storage.
|
||||
|
||||
Configuration is handled via environment variables defined in app_configs.py:
|
||||
|
||||
AWS S3:
|
||||
- S3_FILE_STORE_BUCKET_NAME=<bucket-name>
|
||||
- S3_FILE_STORE_PREFIX=<prefix> (optional, defaults to 'onyx-files')
|
||||
- AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (or use IAM roles)
|
||||
- AWS_REGION_NAME=<region> (optional, defaults to 'us-east-2')
|
||||
|
||||
MinIO:
|
||||
- S3_FILE_STORE_BUCKET_NAME=<bucket-name>
|
||||
- S3_ENDPOINT_URL=<minio-endpoint> (e.g., http://localhost:9000)
|
||||
- MINIO_ACCESS_KEY=<minio-access-key> (falls back to AWS_ACCESS_KEY_ID)
|
||||
- MINIO_SECRET_KEY=<minio-secret-key> (falls back to AWS_SECRET_ACCESS_KEY)
|
||||
- AWS_REGION_NAME=<any-region> (optional, defaults to 'us-east-2')
|
||||
- S3_VERIFY_SSL=false (optional, for local development)
|
||||
|
||||
Other S3-compatible storage (Digital Ocean, Linode, etc.):
|
||||
- Same as MinIO, but set appropriate S3_ENDPOINT_URL
|
||||
"""
|
||||
return get_s3_file_store(db_session)
|
||||
|
||||
@@ -3,7 +3,6 @@ from collections.abc import Callable
|
||||
from io import BytesIO
|
||||
from typing import cast
|
||||
from uuid import UUID
|
||||
from uuid import uuid4
|
||||
|
||||
import requests
|
||||
from sqlalchemy.orm import Session
|
||||
@@ -30,16 +29,13 @@ def user_file_id_to_plaintext_file_name(user_file_id: int) -> str:
|
||||
return f"plaintext_{user_file_id}"
|
||||
|
||||
|
||||
def store_user_file_plaintext(
|
||||
user_file_id: int, plaintext_content: str, db_session: Session
|
||||
) -> bool:
|
||||
def store_user_file_plaintext(user_file_id: int, plaintext_content: str) -> bool:
|
||||
"""
|
||||
Store plaintext content for a user file in the file store.
|
||||
|
||||
Args:
|
||||
user_file_id: The ID of the user file
|
||||
plaintext_content: The plaintext content to store
|
||||
db_session: The database session
|
||||
|
||||
Returns:
|
||||
bool: True if storage was successful, False otherwise
|
||||
@@ -51,19 +47,19 @@ def store_user_file_plaintext(
|
||||
# Get plaintext file name
|
||||
plaintext_file_name = user_file_id_to_plaintext_file_name(user_file_id)
|
||||
|
||||
# Store the plaintext in the file store
|
||||
file_store = get_default_file_store(db_session)
|
||||
file_content = BytesIO(plaintext_content.encode("utf-8"))
|
||||
# Use a separate session to avoid committing the caller's transaction
|
||||
try:
|
||||
file_store.save_file(
|
||||
file_name=plaintext_file_name,
|
||||
content=file_content,
|
||||
display_name=f"Plaintext for user file {user_file_id}",
|
||||
file_origin=FileOrigin.PLAINTEXT_CACHE,
|
||||
file_type="text/plain",
|
||||
commit=False,
|
||||
)
|
||||
return True
|
||||
with get_session_with_current_tenant() as file_store_session:
|
||||
file_store = get_default_file_store(file_store_session)
|
||||
file_content = BytesIO(plaintext_content.encode("utf-8"))
|
||||
file_store.save_file(
|
||||
content=file_content,
|
||||
display_name=f"Plaintext for user file {user_file_id}",
|
||||
file_origin=FileOrigin.PLAINTEXT_CACHE,
|
||||
file_type="text/plain",
|
||||
file_id=plaintext_file_name,
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to store plaintext for user file {user_file_id}: {e}")
|
||||
return False
|
||||
@@ -274,34 +270,27 @@ def save_file_from_url(url: str) -> str:
|
||||
response = requests.get(url)
|
||||
response.raise_for_status()
|
||||
|
||||
unique_id = str(uuid4())
|
||||
|
||||
file_io = BytesIO(response.content)
|
||||
file_store = get_default_file_store(db_session)
|
||||
file_store.save_file(
|
||||
file_name=unique_id,
|
||||
file_id = file_store.save_file(
|
||||
content=file_io,
|
||||
display_name="GeneratedImage",
|
||||
file_origin=FileOrigin.CHAT_IMAGE_GEN,
|
||||
file_type="image/png;base64",
|
||||
commit=True,
|
||||
)
|
||||
return unique_id
|
||||
return file_id
|
||||
|
||||
|
||||
def save_file_from_base64(base64_string: str) -> str:
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
unique_id = str(uuid4())
|
||||
file_store = get_default_file_store(db_session)
|
||||
file_store.save_file(
|
||||
file_name=unique_id,
|
||||
file_id = file_store.save_file(
|
||||
content=BytesIO(base64.b64decode(base64_string)),
|
||||
display_name="GeneratedImage",
|
||||
file_origin=FileOrigin.CHAT_IMAGE_GEN,
|
||||
file_type=get_image_type(base64_string),
|
||||
commit=True,
|
||||
)
|
||||
return unique_id
|
||||
return file_id
|
||||
|
||||
|
||||
def save_file(
|
||||
|
||||
@@ -82,7 +82,7 @@ def _combine_chunks(chunks: list[DocAwareChunk], large_chunk_id: int) -> DocAwar
|
||||
blurb=chunks[0].blurb,
|
||||
content=chunks[0].content,
|
||||
source_links=chunks[0].source_links or {},
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
section_continuation=(chunks[0].chunk_id > 0),
|
||||
title_prefix=chunks[0].title_prefix,
|
||||
metadata_suffix_semantic=chunks[0].metadata_suffix_semantic,
|
||||
@@ -233,7 +233,7 @@ class Chunker:
|
||||
title_prefix: str = "",
|
||||
metadata_suffix_semantic: str = "",
|
||||
metadata_suffix_keyword: str = "",
|
||||
image_file_name: str | None = None,
|
||||
image_file_id: str | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Helper to create a new DocAwareChunk, append it to chunks_list.
|
||||
@@ -244,7 +244,7 @@ class Chunker:
|
||||
blurb=self._extract_blurb(text),
|
||||
content=text,
|
||||
source_links=links or {0: ""},
|
||||
image_file_name=image_file_name,
|
||||
image_file_id=image_file_id,
|
||||
section_continuation=is_continuation,
|
||||
title_prefix=title_prefix,
|
||||
metadata_suffix_semantic=metadata_suffix_semantic,
|
||||
@@ -278,7 +278,7 @@ class Chunker:
|
||||
# Get section text and other attributes
|
||||
section_text = clean_text(str(section.text or ""))
|
||||
section_link_text = section.link or ""
|
||||
image_url = section.image_file_name
|
||||
image_url = section.image_file_id
|
||||
|
||||
# If there is no useful content, skip
|
||||
if not section_text and (not document.title or section_idx > 0):
|
||||
@@ -312,7 +312,7 @@ class Chunker:
|
||||
chunks,
|
||||
section_text,
|
||||
links={0: section_link_text} if section_link_text else {},
|
||||
image_file_name=image_url,
|
||||
image_file_id=image_url,
|
||||
title_prefix=title_prefix,
|
||||
metadata_suffix_semantic=metadata_suffix_semantic,
|
||||
metadata_suffix_keyword=metadata_suffix_keyword,
|
||||
|
||||
@@ -44,8 +44,6 @@ from onyx.db.document_set import fetch_document_sets_for_documents
|
||||
from onyx.db.engine import get_session_with_current_tenant
|
||||
from onyx.db.models import Document as DBDocument
|
||||
from onyx.db.models import IndexModelStatus
|
||||
from onyx.db.pg_file_store import get_pgfilestore_by_file_name
|
||||
from onyx.db.pg_file_store import read_lobj
|
||||
from onyx.db.search_settings import get_active_search_settings
|
||||
from onyx.db.tag import create_or_add_document_tag
|
||||
from onyx.db.tag import create_or_add_document_tag_list
|
||||
@@ -59,6 +57,7 @@ from onyx.document_index.interfaces import DocumentIndex
|
||||
from onyx.document_index.interfaces import DocumentMetadata
|
||||
from onyx.document_index.interfaces import IndexBatchParams
|
||||
from onyx.file_processing.image_summarization import summarize_image_with_error_handling
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.file_store.utils import store_user_file_plaintext
|
||||
from onyx.indexing.chunker import Chunker
|
||||
from onyx.indexing.embedder import embed_chunks_with_failure_handling
|
||||
@@ -463,13 +462,13 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
|
||||
# Even without LLM, we still convert to IndexingDocument with base Sections
|
||||
return [
|
||||
IndexingDocument(
|
||||
**document.dict(),
|
||||
**document.model_dump(),
|
||||
processed_sections=[
|
||||
Section(
|
||||
text=section.text if isinstance(section, TextSection) else "",
|
||||
link=section.link,
|
||||
image_file_name=(
|
||||
section.image_file_name
|
||||
image_file_id=(
|
||||
section.image_file_id
|
||||
if isinstance(section, ImageSection)
|
||||
else None
|
||||
),
|
||||
@@ -486,38 +485,39 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
|
||||
processed_sections: list[Section] = []
|
||||
|
||||
for section in document.sections:
|
||||
# For ImageSection, process and create base Section with both text and image_file_name
|
||||
# For ImageSection, process and create base Section with both text and image_file_id
|
||||
if isinstance(section, ImageSection):
|
||||
# Default section with image path preserved - ensure text is always a string
|
||||
processed_section = Section(
|
||||
link=section.link,
|
||||
image_file_name=section.image_file_name,
|
||||
image_file_id=section.image_file_id,
|
||||
text="", # Initialize with empty string
|
||||
)
|
||||
|
||||
# Try to get image summary
|
||||
try:
|
||||
with get_session_with_current_tenant() as db_session:
|
||||
pgfilestore = get_pgfilestore_by_file_name(
|
||||
file_name=section.image_file_name, db_session=db_session
|
||||
)
|
||||
file_store = get_default_file_store(db_session)
|
||||
|
||||
if not pgfilestore:
|
||||
file_record = file_store.read_file_record(
|
||||
file_id=section.image_file_id
|
||||
)
|
||||
if not file_record:
|
||||
logger.warning(
|
||||
f"Image file {section.image_file_name} not found in PGFileStore"
|
||||
f"Image file {section.image_file_id} not found in FileStore"
|
||||
)
|
||||
|
||||
processed_section.text = "[Image could not be processed]"
|
||||
else:
|
||||
# Get the image data
|
||||
image_data_io = read_lobj(
|
||||
pgfilestore.lobj_oid, db_session, mode="rb"
|
||||
image_data_io = file_store.read_file(
|
||||
file_id=section.image_file_id
|
||||
)
|
||||
pgfilestore_data = image_data_io.read()
|
||||
image_data = image_data_io.read()
|
||||
summary = summarize_image_with_error_handling(
|
||||
llm=llm,
|
||||
image_data=pgfilestore_data,
|
||||
context_name=pgfilestore.display_name or "Image",
|
||||
image_data=image_data,
|
||||
context_name=file_record.display_name or "Image",
|
||||
)
|
||||
|
||||
if summary:
|
||||
@@ -537,23 +537,13 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
|
||||
processed_section = Section(
|
||||
text=section.text or "", # Ensure text is always a string, not None
|
||||
link=section.link,
|
||||
image_file_name=None,
|
||||
)
|
||||
processed_sections.append(processed_section)
|
||||
|
||||
# If it's already a base Section (unlikely), just append it with text validation
|
||||
else:
|
||||
# Ensure text is always a string
|
||||
processed_section = Section(
|
||||
text=section.text if section.text is not None else "",
|
||||
link=section.link,
|
||||
image_file_name=section.image_file_name,
|
||||
image_file_id=None,
|
||||
)
|
||||
processed_sections.append(processed_section)
|
||||
|
||||
# Create IndexingDocument with original sections and processed_sections
|
||||
indexed_document = IndexingDocument(
|
||||
**document.dict(), processed_sections=processed_sections
|
||||
**document.model_dump(), processed_sections=processed_sections
|
||||
)
|
||||
indexed_documents.append(indexed_document)
|
||||
|
||||
@@ -988,6 +978,15 @@ def index_doc_batch(
|
||||
continue
|
||||
ids_to_new_updated_at[doc.id] = doc.doc_updated_at
|
||||
|
||||
# Store the plaintext in the file store for faster retrieval
|
||||
# NOTE: this creates its own session to avoid committing the overall
|
||||
# transaction.
|
||||
for user_file_id, raw_text in user_file_id_to_raw_text.items():
|
||||
store_user_file_plaintext(
|
||||
user_file_id=user_file_id,
|
||||
plaintext_content=raw_text,
|
||||
)
|
||||
|
||||
update_docs_updated_at__no_commit(
|
||||
ids_to_new_updated_at=ids_to_new_updated_at, db_session=db_session
|
||||
)
|
||||
@@ -1018,14 +1017,6 @@ def index_doc_batch(
|
||||
document_ids=[doc.id for doc in filtered_documents],
|
||||
db_session=db_session,
|
||||
)
|
||||
# Store the plaintext in the file store for faster retrieval
|
||||
for user_file_id, raw_text in user_file_id_to_raw_text.items():
|
||||
# Use the dedicated function to store plaintext
|
||||
store_user_file_plaintext(
|
||||
user_file_id=user_file_id,
|
||||
plaintext_content=raw_text,
|
||||
db_session=db_session,
|
||||
)
|
||||
|
||||
# save the chunk boost components to postgres
|
||||
update_chunk_boost_components__no_commit(
|
||||
@@ -1033,6 +1024,7 @@ def index_doc_batch(
|
||||
)
|
||||
|
||||
# Pause user file ccpairs
|
||||
# TODO: investigate why nothing is done here?
|
||||
|
||||
db_session.commit()
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ class BaseChunk(BaseModel):
|
||||
content: str
|
||||
# Holds the link and the offsets into the raw Chunk text
|
||||
source_links: dict[int, str] | None
|
||||
image_file_name: str | None
|
||||
image_file_id: str | None
|
||||
# True if this Chunk's start is not at the start of a Section
|
||||
section_continuation: bool
|
||||
|
||||
|
||||
@@ -51,6 +51,7 @@ from onyx.configs.constants import POSTGRES_WEB_APP_NAME
|
||||
from onyx.db.engine import get_session_context_manager
|
||||
from onyx.db.engine import SqlEngine
|
||||
from onyx.db.engine import warm_up_connections
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.server.api_key.api import router as api_key_router
|
||||
from onyx.server.auth_check import check_router_auth
|
||||
from onyx.server.documents.cc_pair import router as cc_pair_router
|
||||
@@ -255,6 +256,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
# If we are multi-tenant, we need to only set up initial public tables
|
||||
with get_session_context_manager() as db_session:
|
||||
setup_onyx(db_session, POSTGRES_DEFAULT_SCHEMA)
|
||||
# set up the file store (e.g. create bucket if needed). On multi-tenant,
|
||||
# this is done via IaC
|
||||
get_default_file_store(db_session).initialize()
|
||||
else:
|
||||
setup_multitenant_onyx()
|
||||
|
||||
|
||||
@@ -58,7 +58,7 @@ def _create_indexable_chunks(
|
||||
TextSection(
|
||||
text=preprocessed_doc["content"],
|
||||
link=preprocessed_doc["url"],
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
)
|
||||
],
|
||||
source=DocumentSource.WEB,
|
||||
@@ -102,7 +102,7 @@ def _create_indexable_chunks(
|
||||
user_folder=None,
|
||||
boost=DEFAULT_BOOST,
|
||||
large_chunk_id=None,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
aggregated_chunk_boost_factor=1.0,
|
||||
)
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import json
|
||||
import mimetypes
|
||||
import os
|
||||
import uuid
|
||||
import zipfile
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
@@ -449,40 +448,36 @@ def upload_files(files: list[UploadFile], db_session: Session) -> FileUploadResp
|
||||
continue
|
||||
|
||||
sub_file_bytes = zf.read(file_info)
|
||||
sub_file_name = os.path.join(str(uuid.uuid4()), file_info)
|
||||
deduped_file_paths.append(sub_file_name)
|
||||
|
||||
mime_type, __ = mimetypes.guess_type(file_info)
|
||||
if mime_type is None:
|
||||
mime_type = "application/octet-stream"
|
||||
|
||||
file_store.save_file(
|
||||
file_name=sub_file_name,
|
||||
file_id = file_store.save_file(
|
||||
content=BytesIO(sub_file_bytes),
|
||||
display_name=os.path.basename(file_info),
|
||||
file_origin=FileOrigin.CONNECTOR,
|
||||
file_type=mime_type,
|
||||
)
|
||||
deduped_file_paths.append(file_id)
|
||||
continue
|
||||
|
||||
# Special handling for docx files - only store the plaintext version
|
||||
if file.content_type and file.content_type.startswith(
|
||||
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
|
||||
):
|
||||
file_path = convert_docx_to_txt(file, file_store)
|
||||
deduped_file_paths.append(file_path)
|
||||
docx_file_id = convert_docx_to_txt(file, file_store)
|
||||
deduped_file_paths.append(docx_file_id)
|
||||
continue
|
||||
|
||||
# Default handling for all other file types
|
||||
file_path = os.path.join(str(uuid.uuid4()), cast(str, file.filename))
|
||||
deduped_file_paths.append(file_path)
|
||||
file_store.save_file(
|
||||
file_name=file_path,
|
||||
file_id = file_store.save_file(
|
||||
content=file.file,
|
||||
display_name=file.filename,
|
||||
file_origin=FileOrigin.CONNECTOR,
|
||||
file_type=file.content_type or "text/plain",
|
||||
)
|
||||
deduped_file_paths.append(file_id)
|
||||
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import uuid
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter
|
||||
@@ -182,9 +181,7 @@ def upload_file(
|
||||
) -> dict[str, str]:
|
||||
file_store = get_default_file_store(db_session)
|
||||
file_type = ChatFileType.IMAGE
|
||||
file_id = str(uuid.uuid4())
|
||||
file_store.save_file(
|
||||
file_name=file_id,
|
||||
file_id = file_store.save_file(
|
||||
content=file.file,
|
||||
display_name=file.filename,
|
||||
file_origin=FileOrigin.CHAT_UPLOAD,
|
||||
|
||||
@@ -4,7 +4,6 @@ import io
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Generator
|
||||
from datetime import timedelta
|
||||
@@ -739,9 +738,7 @@ def upload_files_for_chat(
|
||||
new_content_type = file.content_type
|
||||
|
||||
# Store the file normally
|
||||
file_id = str(uuid.uuid4())
|
||||
file_store.save_file(
|
||||
file_name=file_id,
|
||||
file_id = file_store.save_file(
|
||||
content=file_content_io,
|
||||
display_name=file.filename,
|
||||
file_origin=FileOrigin.CHAT_UPLOAD,
|
||||
@@ -756,10 +753,8 @@ def upload_files_for_chat(
|
||||
file=extracted_text_io, # use the bytes we already read
|
||||
file_name=file.filename or "",
|
||||
)
|
||||
text_file_id = str(uuid.uuid4())
|
||||
|
||||
file_store.save_file(
|
||||
file_name=text_file_id,
|
||||
text_file_id = file_store.save_file(
|
||||
content=io.BytesIO(extracted_text.encode()),
|
||||
display_name=file.filename,
|
||||
file_origin=FileOrigin.CHAT_UPLOAD,
|
||||
|
||||
@@ -12,7 +12,7 @@ from onyx.configs.constants import ONYX_CLOUD_REDIS_RUNTIME
|
||||
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
|
||||
from onyx.configs.constants import ONYX_EMAILABLE_LOGO_MAX_DIM
|
||||
from onyx.db.engine import get_session_with_shared_schema
|
||||
from onyx.file_store.file_store import PostgresBackedFileStore
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.redis.redis_pool import get_redis_replica_client
|
||||
from onyx.utils.file import FileWithMimeType
|
||||
from onyx.utils.file import OnyxStaticFileManager
|
||||
@@ -41,7 +41,7 @@ class OnyxRuntime:
|
||||
|
||||
if db_filename:
|
||||
with get_session_with_shared_schema() as db_session:
|
||||
file_store = PostgresBackedFileStore(db_session)
|
||||
file_store = get_default_file_store(db_session)
|
||||
onyx_file = file_store.get_file_with_mime_type(db_filename)
|
||||
|
||||
if not onyx_file:
|
||||
|
||||
@@ -217,7 +217,7 @@ class CustomTool(BaseTool):
|
||||
content = BytesIO(file_content)
|
||||
|
||||
file_store.save_file(
|
||||
file_name=file_id,
|
||||
file_id=file_id,
|
||||
content=content,
|
||||
display_name=file_id,
|
||||
file_origin=FileOrigin.CHAT_UPLOAD,
|
||||
|
||||
@@ -75,7 +75,7 @@ def generate_dummy_chunk(
|
||||
title_embedding=generate_random_embedding(embedding_dim),
|
||||
large_chunk_id=None,
|
||||
large_chunk_reference_ids=[],
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
)
|
||||
|
||||
document_set_names = []
|
||||
|
||||
912
backend/tests/daily/file_store/test_file_store_non_mocked.py
Normal file
912
backend/tests/daily/file_store/test_file_store_non_mocked.py
Normal file
@@ -0,0 +1,912 @@
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
from collections.abc import Generator
|
||||
from concurrent.futures import as_completed
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Tuple
|
||||
from typing import TypedDict
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.db.engine import get_session_context_manager
|
||||
from onyx.db.engine import SqlEngine
|
||||
from onyx.file_store.file_store import S3BackedFileStore
|
||||
from onyx.utils.logger import setup_logger
|
||||
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
TEST_BUCKET_NAME: str = "onyx-file-store-tests"
|
||||
TEST_FILE_PREFIX: str = "test-files"
|
||||
TEST_TENANT_ID: str = "public"
|
||||
|
||||
|
||||
# Type definitions for test data
|
||||
class BackendConfig(TypedDict):
|
||||
endpoint_url: str | None
|
||||
access_key: str
|
||||
secret_key: str
|
||||
region: str
|
||||
verify_ssl: bool
|
||||
backend_name: str
|
||||
|
||||
|
||||
class FileTestData(TypedDict):
|
||||
name: str
|
||||
display_name: str
|
||||
content: str
|
||||
type: str
|
||||
origin: FileOrigin
|
||||
|
||||
|
||||
class WorkerResult(TypedDict):
|
||||
worker_id: int
|
||||
file_name: str
|
||||
content: str
|
||||
|
||||
|
||||
def _get_all_backend_configs() -> List[BackendConfig]:
|
||||
"""Get configurations for all available backends"""
|
||||
from onyx.configs.app_configs import (
|
||||
S3_ENDPOINT_URL,
|
||||
S3_AWS_ACCESS_KEY_ID,
|
||||
S3_AWS_SECRET_ACCESS_KEY,
|
||||
AWS_REGION_NAME,
|
||||
)
|
||||
|
||||
configs: List[BackendConfig] = []
|
||||
|
||||
# MinIO configuration (if endpoint is configured)
|
||||
if S3_ENDPOINT_URL:
|
||||
minio_access_key = "minioadmin"
|
||||
minio_secret_key = "minioadmin"
|
||||
if minio_access_key and minio_secret_key:
|
||||
configs.append(
|
||||
{
|
||||
"endpoint_url": S3_ENDPOINT_URL,
|
||||
"access_key": minio_access_key,
|
||||
"secret_key": minio_secret_key,
|
||||
"region": "us-east-1",
|
||||
"verify_ssl": False,
|
||||
"backend_name": "MinIO",
|
||||
}
|
||||
)
|
||||
|
||||
# AWS S3 configuration (if credentials are available)
|
||||
if S3_AWS_ACCESS_KEY_ID and S3_AWS_SECRET_ACCESS_KEY:
|
||||
configs.append(
|
||||
{
|
||||
"endpoint_url": None,
|
||||
"access_key": S3_AWS_ACCESS_KEY_ID,
|
||||
"secret_key": S3_AWS_SECRET_ACCESS_KEY,
|
||||
"region": AWS_REGION_NAME or "us-east-2",
|
||||
"verify_ssl": True,
|
||||
"backend_name": "AWS S3",
|
||||
}
|
||||
)
|
||||
|
||||
if not configs:
|
||||
pytest.skip(
|
||||
"No backend configurations available - set MinIO or AWS S3 credentials"
|
||||
)
|
||||
|
||||
return configs
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def db_session() -> Generator[Session, None, None]:
|
||||
"""Create a database session for testing using the actual PostgreSQL database"""
|
||||
# Make sure that the db engine is initialized before any tests are run
|
||||
SqlEngine.init_engine(
|
||||
pool_size=10,
|
||||
max_overflow=5,
|
||||
)
|
||||
with get_session_context_manager() as session:
|
||||
yield session
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def tenant_context() -> Generator[None, None, None]:
|
||||
"""Set up tenant context for testing"""
|
||||
# Set the tenant context for the test
|
||||
token = CURRENT_TENANT_ID_CONTEXTVAR.set(TEST_TENANT_ID)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
# Reset the tenant context after the test
|
||||
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope="function",
|
||||
params=_get_all_backend_configs(),
|
||||
ids=lambda config: config["backend_name"],
|
||||
)
|
||||
def file_store(
|
||||
request: pytest.FixtureRequest, db_session: Session, tenant_context: None
|
||||
) -> Generator[S3BackedFileStore, None, None]:
|
||||
"""Create an S3BackedFileStore instance for testing with parametrized backend"""
|
||||
backend_config: BackendConfig = request.param
|
||||
|
||||
# Create S3BackedFileStore with backend-specific configuration
|
||||
store = S3BackedFileStore(
|
||||
db_session=db_session,
|
||||
bucket_name=TEST_BUCKET_NAME,
|
||||
aws_access_key_id=backend_config["access_key"],
|
||||
aws_secret_access_key=backend_config["secret_key"],
|
||||
aws_region_name=backend_config["region"],
|
||||
s3_endpoint_url=backend_config["endpoint_url"],
|
||||
s3_prefix=f"{TEST_FILE_PREFIX}-{uuid.uuid4()}",
|
||||
s3_verify_ssl=backend_config["verify_ssl"],
|
||||
)
|
||||
|
||||
# Initialize the store and ensure bucket exists
|
||||
store.initialize()
|
||||
logger.info(
|
||||
f"Successfully initialized {backend_config['backend_name']} file store with bucket {TEST_BUCKET_NAME}"
|
||||
)
|
||||
|
||||
yield store
|
||||
|
||||
# Cleanup: Remove all test files from the bucket (including tenant-prefixed files)
|
||||
try:
|
||||
s3_client = store._get_s3_client()
|
||||
actual_bucket_name = store._get_bucket_name()
|
||||
|
||||
# List and delete all objects in the test prefix (including tenant subdirectories)
|
||||
response = s3_client.list_objects_v2(
|
||||
Bucket=actual_bucket_name, Prefix=f"{store._s3_prefix}/"
|
||||
)
|
||||
|
||||
if "Contents" in response:
|
||||
objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]]
|
||||
s3_client.delete_objects(
|
||||
Bucket=actual_bucket_name, Delete={"Objects": objects_to_delete} # type: ignore[typeddict-item]
|
||||
)
|
||||
logger.info(
|
||||
f"Cleaned up {len(objects_to_delete)} test objects from {backend_config['backend_name']}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to cleanup test objects: {e}")
|
||||
|
||||
|
||||
class TestS3BackedFileStore:
|
||||
"""Test suite for S3BackedFileStore using real S3-compatible storage (MinIO or AWS S3)"""
|
||||
|
||||
def test_store_initialization(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test that the file store initializes properly"""
|
||||
# The fixture already calls initialize(), so we just verify it worked
|
||||
bucket_name = file_store._get_bucket_name()
|
||||
assert bucket_name.startswith(TEST_BUCKET_NAME) # Should be backend-specific
|
||||
|
||||
# Verify bucket exists by trying to list objects
|
||||
s3_client = file_store._get_s3_client()
|
||||
|
||||
# This should not raise an exception
|
||||
s3_client.list_objects_v2(Bucket=bucket_name, MaxKeys=1)
|
||||
|
||||
def test_save_and_read_text_file(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test saving and reading a text file"""
|
||||
file_id = f"{uuid.uuid4()}.txt"
|
||||
display_name = "Test Text File"
|
||||
content = "This is a test text file content.\nWith multiple lines."
|
||||
file_type = "text/plain"
|
||||
file_origin = FileOrigin.OTHER
|
||||
|
||||
# Save the file
|
||||
content_io = BytesIO(content.encode("utf-8"))
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Read the file back
|
||||
read_content_io = file_store.read_file(file_id)
|
||||
read_content = read_content_io.read().decode("utf-8")
|
||||
|
||||
assert read_content == content
|
||||
|
||||
# Verify file record in database
|
||||
file_record = file_store.read_file_record(file_id)
|
||||
assert file_record.file_id == file_id
|
||||
assert file_record.display_name == display_name
|
||||
assert file_record.file_origin == file_origin
|
||||
assert file_record.file_type == file_type
|
||||
assert (
|
||||
file_record.bucket_name == file_store._get_bucket_name()
|
||||
) # Use actual bucket name
|
||||
# The object key should include the tenant ID
|
||||
expected_object_key = f"{file_store._s3_prefix}/{TEST_TENANT_ID}/{file_id}"
|
||||
assert file_record.object_key == expected_object_key
|
||||
|
||||
def test_save_and_read_binary_file(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test saving and reading a binary file"""
|
||||
file_id = f"{uuid.uuid4()}.bin"
|
||||
display_name = "Test Binary File"
|
||||
# Create some binary content
|
||||
content = bytes(range(256)) # 0-255 bytes
|
||||
file_type = "application/octet-stream"
|
||||
file_origin = FileOrigin.CONNECTOR
|
||||
|
||||
# Save the file
|
||||
content_io = BytesIO(content)
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Read the file back
|
||||
read_content_io = file_store.read_file(file_id)
|
||||
read_content = read_content_io.read()
|
||||
|
||||
assert read_content == content
|
||||
|
||||
def test_save_with_metadata(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test saving a file with metadata"""
|
||||
file_id = f"{uuid.uuid4()}.json"
|
||||
display_name = "Test Metadata File"
|
||||
content = '{"key": "value", "number": 42}'
|
||||
file_type = "application/json"
|
||||
file_origin = FileOrigin.CHAT_UPLOAD
|
||||
metadata: Dict[str, Any] = {
|
||||
"source": "test_suite",
|
||||
"version": "1.0",
|
||||
"tags": ["test", "json"],
|
||||
"size": len(content),
|
||||
}
|
||||
|
||||
# Save the file with metadata
|
||||
content_io = BytesIO(content.encode("utf-8"))
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_metadata=metadata,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Verify metadata is stored in database
|
||||
file_record = file_store.read_file_record(file_id)
|
||||
assert file_record.file_metadata == metadata
|
||||
|
||||
def test_has_file(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test the has_file method"""
|
||||
file_id = f"{uuid.uuid4()}.txt"
|
||||
display_name = "Test Has File"
|
||||
content = "Content for has_file test"
|
||||
file_type = "text/plain"
|
||||
file_origin = FileOrigin.OTHER
|
||||
|
||||
# Initially, file should not exist
|
||||
assert not file_store.has_file(
|
||||
file_id=file_id,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
)
|
||||
|
||||
# Save the file
|
||||
content_io = BytesIO(content.encode("utf-8"))
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Now file should exist
|
||||
assert file_store.has_file(
|
||||
file_id=file_id,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
)
|
||||
|
||||
# Test with wrong parameters
|
||||
assert not file_store.has_file(
|
||||
file_id=file_id,
|
||||
file_origin=FileOrigin.CONNECTOR, # Wrong origin
|
||||
file_type=file_type,
|
||||
)
|
||||
|
||||
assert not file_store.has_file(
|
||||
file_id=file_id,
|
||||
file_origin=file_origin,
|
||||
file_type="application/pdf", # Wrong type
|
||||
)
|
||||
|
||||
def test_read_file_with_tempfile(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test reading a file using temporary file"""
|
||||
file_id = f"{uuid.uuid4()}.txt"
|
||||
display_name = "Test Temp File"
|
||||
content = "Content for temporary file test"
|
||||
file_type = "text/plain"
|
||||
file_origin = FileOrigin.OTHER
|
||||
|
||||
# Save the file
|
||||
content_io = BytesIO(content.encode("utf-8"))
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Read using temporary file
|
||||
temp_file = file_store.read_file(file_id, use_tempfile=True)
|
||||
|
||||
# Read content from temp file
|
||||
temp_file.seek(0)
|
||||
read_content_bytes = temp_file.read()
|
||||
if isinstance(read_content_bytes, bytes):
|
||||
read_content_str = read_content_bytes.decode("utf-8")
|
||||
else:
|
||||
read_content_str = str(read_content_bytes)
|
||||
|
||||
assert read_content_str == content
|
||||
|
||||
# Clean up the temp file
|
||||
temp_file.close()
|
||||
if hasattr(temp_file, "name"):
|
||||
try:
|
||||
os.unlink(temp_file.name)
|
||||
except (OSError, AttributeError):
|
||||
pass
|
||||
|
||||
def test_delete_file(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test deleting a file"""
|
||||
file_id = f"{uuid.uuid4()}.txt"
|
||||
display_name = "Test Delete File"
|
||||
content = "Content for delete test"
|
||||
file_type = "text/plain"
|
||||
file_origin = FileOrigin.OTHER
|
||||
|
||||
# Save the file
|
||||
content_io = BytesIO(content.encode("utf-8"))
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Verify file exists
|
||||
assert file_store.has_file(
|
||||
file_id=file_id,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
)
|
||||
|
||||
# Delete the file
|
||||
file_store.delete_file(file_id)
|
||||
|
||||
# Verify file no longer exists
|
||||
assert not file_store.has_file(
|
||||
file_id=file_id,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
)
|
||||
|
||||
# Verify trying to read deleted file raises exception
|
||||
with pytest.raises(RuntimeError, match="does not exist or was deleted"):
|
||||
file_store.read_file(file_id)
|
||||
|
||||
def test_get_file_with_mime_type(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test getting file with mime type detection"""
|
||||
file_id = f"{uuid.uuid4()}.txt"
|
||||
display_name = "Test MIME Type"
|
||||
content = "This is a plain text file"
|
||||
file_type = "text/plain"
|
||||
file_origin = FileOrigin.OTHER
|
||||
|
||||
# Save the file
|
||||
content_io = BytesIO(content.encode("utf-8"))
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Get file with mime type
|
||||
file_with_mime = file_store.get_file_with_mime_type(file_id)
|
||||
|
||||
assert file_with_mime is not None
|
||||
assert file_with_mime.data.decode("utf-8") == content
|
||||
# The detected mime type might be different from what we stored
|
||||
assert file_with_mime.mime_type is not None
|
||||
|
||||
def test_file_overwrite(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test overwriting an existing file"""
|
||||
file_id = f"{uuid.uuid4()}.txt"
|
||||
display_name = "Test Overwrite"
|
||||
original_content = "Original content"
|
||||
new_content = "New content after overwrite"
|
||||
file_type = "text/plain"
|
||||
file_origin = FileOrigin.OTHER
|
||||
|
||||
# Save original file
|
||||
content_io = BytesIO(original_content.encode("utf-8"))
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Verify original content
|
||||
read_content_io = file_store.read_file(file_id)
|
||||
assert read_content_io.read().decode("utf-8") == original_content
|
||||
|
||||
# Overwrite with new content
|
||||
new_content_io = BytesIO(new_content.encode("utf-8"))
|
||||
returned_file_id_2 = file_store.save_file(
|
||||
content=new_content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id_2 == file_id
|
||||
|
||||
# Verify new content
|
||||
read_content_io = file_store.read_file(file_id)
|
||||
assert read_content_io.read().decode("utf-8") == new_content
|
||||
|
||||
def test_large_file_handling(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test handling of larger files"""
|
||||
file_id = f"{uuid.uuid4()}.bin"
|
||||
display_name = "Test Large File"
|
||||
# Create a 1MB file
|
||||
content_size = 1024 * 1024 # 1MB
|
||||
content = b"A" * content_size
|
||||
file_type = "application/octet-stream"
|
||||
file_origin = FileOrigin.CONNECTOR
|
||||
|
||||
# Save the large file
|
||||
content_io = BytesIO(content)
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Read the file back
|
||||
read_content_io = file_store.read_file(file_id)
|
||||
read_content = read_content_io.read()
|
||||
|
||||
assert len(read_content) == content_size
|
||||
assert read_content == content
|
||||
|
||||
def test_error_handling_nonexistent_file(
|
||||
self, file_store: S3BackedFileStore
|
||||
) -> None:
|
||||
"""Test error handling when trying to read a non-existent file"""
|
||||
nonexistent_file_id = f"{uuid.uuid4()}.txt"
|
||||
|
||||
with pytest.raises(RuntimeError, match="does not exist or was deleted"):
|
||||
file_store.read_file(nonexistent_file_id)
|
||||
|
||||
with pytest.raises(RuntimeError, match="does not exist or was deleted"):
|
||||
file_store.read_file_record(nonexistent_file_id)
|
||||
|
||||
# get_file_with_mime_type should return None for non-existent files
|
||||
result = file_store.get_file_with_mime_type(nonexistent_file_id)
|
||||
assert result is None
|
||||
|
||||
def test_error_handling_delete_nonexistent_file(
|
||||
self, file_store: S3BackedFileStore
|
||||
) -> None:
|
||||
"""Test error handling when trying to delete a non-existent file"""
|
||||
nonexistent_file_id = f"{uuid.uuid4()}.txt"
|
||||
|
||||
# Should raise an exception when trying to delete non-existent file
|
||||
with pytest.raises(RuntimeError, match="does not exist or was deleted"):
|
||||
file_store.delete_file(nonexistent_file_id)
|
||||
|
||||
def test_multiple_files_different_origins(
|
||||
self, file_store: S3BackedFileStore
|
||||
) -> None:
|
||||
"""Test storing multiple files with different origins and types"""
|
||||
files_data: List[FileTestData] = [
|
||||
{
|
||||
"name": f"{uuid.uuid4()}.txt",
|
||||
"display_name": "Chat Upload File",
|
||||
"content": "Content from chat upload",
|
||||
"type": "text/plain",
|
||||
"origin": FileOrigin.CHAT_UPLOAD,
|
||||
},
|
||||
{
|
||||
"name": f"{uuid.uuid4()}.json",
|
||||
"display_name": "Connector File",
|
||||
"content": '{"from": "connector"}',
|
||||
"type": "application/json",
|
||||
"origin": FileOrigin.CONNECTOR,
|
||||
},
|
||||
{
|
||||
"name": f"{uuid.uuid4()}.csv",
|
||||
"display_name": "Generated Report",
|
||||
"content": "col1,col2\nval1,val2",
|
||||
"type": "text/csv",
|
||||
"origin": FileOrigin.GENERATED_REPORT,
|
||||
},
|
||||
]
|
||||
|
||||
# Save all files
|
||||
for file_data in files_data:
|
||||
content_io = BytesIO(file_data["content"].encode("utf-8"))
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=file_data["display_name"],
|
||||
file_origin=file_data["origin"],
|
||||
file_type=file_data["type"],
|
||||
file_id=file_data["name"],
|
||||
)
|
||||
assert returned_file_id == file_data["name"]
|
||||
|
||||
# Verify all files exist and have correct properties
|
||||
for file_data in files_data:
|
||||
assert file_store.has_file(
|
||||
file_id=file_data["name"],
|
||||
file_origin=file_data["origin"],
|
||||
file_type=file_data["type"],
|
||||
)
|
||||
|
||||
# Read and verify content
|
||||
read_content_io = file_store.read_file(file_data["name"])
|
||||
read_content = read_content_io.read().decode("utf-8")
|
||||
assert read_content == file_data["content"]
|
||||
|
||||
# Verify record
|
||||
file_record = file_store.read_file_record(file_data["name"])
|
||||
assert file_record.file_origin == file_data["origin"]
|
||||
assert file_record.file_type == file_data["type"]
|
||||
|
||||
def test_special_characters_in_filenames(
|
||||
self, file_store: S3BackedFileStore
|
||||
) -> None:
|
||||
"""Test handling of special characters in filenames"""
|
||||
# Note: S3 keys have some restrictions, so we test reasonable special characters
|
||||
special_files: List[str] = [
|
||||
f"{uuid.uuid4()} with spaces.txt",
|
||||
f"{uuid.uuid4()}-with-dashes.txt",
|
||||
f"{uuid.uuid4()}_with_underscores.txt",
|
||||
f"{uuid.uuid4()}.with.dots.txt",
|
||||
f"{uuid.uuid4()}(with)parentheses.txt",
|
||||
]
|
||||
|
||||
for file_id in special_files:
|
||||
content = f"Content for {file_id}"
|
||||
content_io = BytesIO(content.encode("utf-8"))
|
||||
|
||||
# Save the file
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=f"Display: {file_id}",
|
||||
file_origin=FileOrigin.OTHER,
|
||||
file_type="text/plain",
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Read and verify
|
||||
read_content_io = file_store.read_file(file_id)
|
||||
read_content = read_content_io.read().decode("utf-8")
|
||||
assert read_content == content
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not os.environ.get("TEST_S3_NETWORK_ERRORS"),
|
||||
reason="Network error tests require TEST_S3_NETWORK_ERRORS environment variable",
|
||||
)
|
||||
def test_network_error_handling(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test handling of network errors (requires special setup)"""
|
||||
# This test requires specific network configuration to simulate failures
|
||||
# It's marked as skip by default and only runs when explicitly enabled
|
||||
|
||||
# Mock a network error during file operations
|
||||
with patch.object(file_store, "_get_s3_client") as mock_client:
|
||||
mock_s3 = mock_client.return_value
|
||||
mock_s3.put_object.side_effect = ClientError(
|
||||
error_response={
|
||||
"Error": {
|
||||
"Code": "NetworkingError",
|
||||
"Message": "Connection timeout",
|
||||
}
|
||||
},
|
||||
operation_name="PutObject",
|
||||
)
|
||||
|
||||
content_io = BytesIO(b"test content")
|
||||
|
||||
with pytest.raises(ClientError):
|
||||
file_store.save_file(
|
||||
content=content_io,
|
||||
display_name="Network Error Test",
|
||||
file_origin=FileOrigin.OTHER,
|
||||
file_type="text/plain",
|
||||
file_id=f"{uuid.uuid4()}.txt",
|
||||
)
|
||||
|
||||
def test_database_transaction_rollback(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test database transaction rollback behavior with PostgreSQL"""
|
||||
file_id = f"{uuid.uuid4()}.txt"
|
||||
display_name = "Test Rollback"
|
||||
content = "Content for rollback test"
|
||||
file_type = "text/plain"
|
||||
file_origin = FileOrigin.OTHER
|
||||
|
||||
# Mock S3 to fail after database write but before commit
|
||||
with patch.object(file_store, "_get_s3_client") as mock_client:
|
||||
mock_s3 = mock_client.return_value
|
||||
mock_s3.put_object.side_effect = ClientError(
|
||||
error_response={
|
||||
"Error": {"Code": "InternalError", "Message": "S3 internal error"}
|
||||
},
|
||||
operation_name="PutObject",
|
||||
)
|
||||
|
||||
content_io = BytesIO(content.encode("utf-8"))
|
||||
|
||||
# This should fail and rollback the database transaction
|
||||
with pytest.raises(ClientError):
|
||||
file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
# Verify that the database record was not created due to rollback
|
||||
with pytest.raises(RuntimeError, match="does not exist or was deleted"):
|
||||
file_store.read_file_record(file_id)
|
||||
|
||||
def test_complex_jsonb_metadata(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test PostgreSQL JSONB metadata handling with complex data structures"""
|
||||
file_id = f"{uuid.uuid4()}.json"
|
||||
display_name = "Test Complex Metadata"
|
||||
content = '{"data": "test"}'
|
||||
file_type = "application/json"
|
||||
file_origin = FileOrigin.CONNECTOR
|
||||
|
||||
# Complex metadata that tests PostgreSQL JSONB capabilities
|
||||
complex_metadata: Dict[str, Any] = {
|
||||
"nested": {
|
||||
"array": [1, 2, 3, {"inner": "value"}],
|
||||
"boolean": True,
|
||||
"null_value": None,
|
||||
"number": 42.5,
|
||||
},
|
||||
"unicode": "测试数据 🚀",
|
||||
"special_chars": "Line 1\nLine 2\t\r\nSpecial: !@#$%^&*()",
|
||||
"large_text": "x" * 1000, # Test large text in JSONB
|
||||
"timestamps": {
|
||||
"created": "2024-01-01T00:00:00Z",
|
||||
"updated": "2024-01-02T12:30:45Z",
|
||||
},
|
||||
}
|
||||
|
||||
# Save file with complex metadata
|
||||
content_io = BytesIO(content.encode("utf-8"))
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_metadata=complex_metadata,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Retrieve and verify the metadata was stored correctly
|
||||
file_record = file_store.read_file_record(file_id)
|
||||
stored_metadata = file_record.file_metadata
|
||||
|
||||
# Verify all metadata fields were preserved
|
||||
assert stored_metadata == complex_metadata
|
||||
|
||||
# Type casting for complex metadata access
|
||||
stored_metadata_dict = cast(Dict[str, Any], stored_metadata)
|
||||
nested_data = cast(Dict[str, Any], stored_metadata_dict["nested"])
|
||||
array_data = cast(List[Any], nested_data["array"])
|
||||
inner_obj = cast(Dict[str, Any], array_data[3])
|
||||
|
||||
assert inner_obj["inner"] == "value"
|
||||
assert stored_metadata_dict["unicode"] == "测试数据 🚀"
|
||||
assert nested_data["boolean"] is True
|
||||
assert nested_data["null_value"] is None
|
||||
assert len(cast(str, stored_metadata_dict["large_text"])) == 1000
|
||||
|
||||
def test_database_consistency_after_s3_failure(
|
||||
self, file_store: S3BackedFileStore
|
||||
) -> None:
|
||||
"""Test that database stays consistent when S3 operations fail"""
|
||||
file_id = f"{uuid.uuid4()}.txt"
|
||||
display_name = "Test Consistency"
|
||||
content = "Initial content"
|
||||
file_type = "text/plain"
|
||||
file_origin = FileOrigin.OTHER
|
||||
|
||||
# First, save a file successfully
|
||||
content_io = BytesIO(content.encode("utf-8"))
|
||||
returned_file_id = file_store.save_file(
|
||||
content=content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
assert returned_file_id == file_id
|
||||
|
||||
# Verify initial state
|
||||
assert file_store.has_file(file_id, file_origin, file_type)
|
||||
initial_record = file_store.read_file_record(file_id)
|
||||
|
||||
# Now try to update but fail on S3 side
|
||||
with patch.object(file_store, "_get_s3_client") as mock_client:
|
||||
mock_s3 = mock_client.return_value
|
||||
# Let the first call (for reading/checking) succeed, but fail on put_object
|
||||
mock_s3.put_object.side_effect = ClientError(
|
||||
error_response={
|
||||
"Error": {
|
||||
"Code": "ServiceUnavailable",
|
||||
"Message": "Service temporarily unavailable",
|
||||
}
|
||||
},
|
||||
operation_name="PutObject",
|
||||
)
|
||||
|
||||
new_content = "Updated content that should fail"
|
||||
new_content_io = BytesIO(new_content.encode("utf-8"))
|
||||
|
||||
# This should fail and rollback
|
||||
with pytest.raises(ClientError):
|
||||
file_store.save_file(
|
||||
content=new_content_io,
|
||||
display_name=display_name,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
file_id=file_id,
|
||||
)
|
||||
|
||||
# Verify the database record is unchanged (not updated)
|
||||
current_record = file_store.read_file_record(file_id)
|
||||
assert current_record.file_id == initial_record.file_id
|
||||
assert current_record.display_name == initial_record.display_name
|
||||
assert current_record.bucket_name == initial_record.bucket_name
|
||||
assert current_record.object_key == initial_record.object_key
|
||||
|
||||
# Verify we can still read the original file content
|
||||
read_content_io = file_store.read_file(file_id)
|
||||
read_content = read_content_io.read().decode("utf-8")
|
||||
assert read_content == content # Original content, not the failed update
|
||||
|
||||
def test_concurrent_file_operations(self, file_store: S3BackedFileStore) -> None:
|
||||
"""Test handling of concurrent file operations on the same file"""
|
||||
base_file_name: str = str(uuid.uuid4())
|
||||
file_type: str = "text/plain"
|
||||
file_origin: FileOrigin = FileOrigin.OTHER
|
||||
|
||||
# Get current file store configuration to replicate in workers
|
||||
current_bucket_name = file_store._get_bucket_name()
|
||||
current_access_key = file_store._aws_access_key_id
|
||||
current_secret_key = file_store._aws_secret_access_key
|
||||
current_region = file_store._aws_region_name
|
||||
current_endpoint_url = file_store._s3_endpoint_url
|
||||
current_verify_ssl = file_store._s3_verify_ssl
|
||||
|
||||
results: List[Tuple[str, str]] = []
|
||||
errors: List[Tuple[int, str]] = []
|
||||
|
||||
def save_file_worker(worker_id: int) -> bool:
|
||||
"""Worker function to save a file with its own database session"""
|
||||
try:
|
||||
# Set up tenant context for this worker
|
||||
token = CURRENT_TENANT_ID_CONTEXTVAR.set(TEST_TENANT_ID)
|
||||
try:
|
||||
# Create a new database session for each worker to avoid conflicts
|
||||
with get_session_context_manager() as worker_session:
|
||||
worker_file_store = S3BackedFileStore(
|
||||
db_session=worker_session,
|
||||
bucket_name=current_bucket_name,
|
||||
aws_access_key_id=current_access_key,
|
||||
aws_secret_access_key=current_secret_key,
|
||||
aws_region_name=current_region,
|
||||
s3_endpoint_url=current_endpoint_url,
|
||||
s3_prefix=TEST_FILE_PREFIX,
|
||||
s3_verify_ssl=current_verify_ssl,
|
||||
)
|
||||
|
||||
file_name: str = f"{base_file_name}_{worker_id}.txt"
|
||||
content: str = (
|
||||
f"Content from worker {worker_id} at {time.time()}"
|
||||
)
|
||||
content_io: BytesIO = BytesIO(content.encode("utf-8"))
|
||||
|
||||
worker_file_store.save_file(
|
||||
file_id=file_name,
|
||||
content=content_io,
|
||||
display_name=f"Worker {worker_id} File",
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
)
|
||||
results.append((file_name, content))
|
||||
return True
|
||||
finally:
|
||||
# Reset the tenant context after the worker completes
|
||||
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
|
||||
except Exception as e:
|
||||
errors.append((worker_id, str(e)))
|
||||
return False
|
||||
|
||||
# Run multiple concurrent file save operations
|
||||
with ThreadPoolExecutor(max_workers=5) as executor:
|
||||
futures = [executor.submit(save_file_worker, i) for i in range(10)]
|
||||
|
||||
for future in as_completed(futures):
|
||||
future.result() # Wait for completion
|
||||
|
||||
# Verify all operations completed successfully
|
||||
assert len(errors) == 0, f"Concurrent operations had errors: {errors}"
|
||||
assert (
|
||||
len(results) == 10
|
||||
), f"Expected 10 successful operations, got {len(results)}"
|
||||
|
||||
# Verify all files were saved correctly
|
||||
for file_id, expected_content in results:
|
||||
# Check file exists
|
||||
assert file_store.has_file(
|
||||
file_id=file_id,
|
||||
file_origin=file_origin,
|
||||
file_type=file_type,
|
||||
)
|
||||
|
||||
# Check content is correct
|
||||
read_content_io = file_store.read_file(file_id)
|
||||
actual_content: str = read_content_io.read().decode("utf-8")
|
||||
assert actual_content == expected_content
|
||||
@@ -233,10 +233,11 @@ class DocumentManager:
|
||||
for doc_dict in retrieved_docs_dict:
|
||||
doc_id = doc_dict["fields"]["document_id"]
|
||||
doc_content = doc_dict["fields"]["content"]
|
||||
image_file_name = doc_dict["fields"].get("image_file_name", None)
|
||||
# still called `image_file_name` in Vespa for backwards compatibility
|
||||
image_file_id = doc_dict["fields"].get("image_file_name", None)
|
||||
final_docs.append(
|
||||
SimpleTestDocument(
|
||||
id=doc_id, content=doc_content, image_file_name=image_file_name
|
||||
id=doc_id, content=doc_content, image_file_id=image_file_id
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ from typing import Tuple
|
||||
import requests
|
||||
|
||||
from onyx.file_store.models import FileDescriptor
|
||||
from onyx.server.documents.models import FileUploadResponse
|
||||
from tests.integration.common_utils.constants import API_SERVER_URL
|
||||
from tests.integration.common_utils.constants import GENERAL_HEADERS
|
||||
from tests.integration.common_utils.test_models import DATestUser
|
||||
@@ -70,7 +71,7 @@ class FileManager:
|
||||
file_name: str,
|
||||
user_performing_action: DATestUser,
|
||||
content_type: str = "application/octet-stream",
|
||||
) -> dict:
|
||||
) -> FileUploadResponse:
|
||||
# Read the file content
|
||||
with open(file_path, "rb") as f:
|
||||
file_content = f.read()
|
||||
@@ -105,4 +106,4 @@ class FileManager:
|
||||
)
|
||||
|
||||
response_json = response.json()
|
||||
return response_json
|
||||
return FileUploadResponse(**response_json)
|
||||
|
||||
@@ -76,7 +76,7 @@ class DATestConnector(BaseModel):
|
||||
class SimpleTestDocument(BaseModel):
|
||||
id: str
|
||||
content: str
|
||||
image_file_name: str | None = None
|
||||
image_file_id: str | None = None
|
||||
|
||||
|
||||
class DATestCCPair(BaseModel):
|
||||
|
||||
@@ -15,7 +15,6 @@ from tests.integration.common_utils.managers.document import DocumentManager
|
||||
from tests.integration.common_utils.managers.file import FileManager
|
||||
from tests.integration.common_utils.managers.llm_provider import LLMProviderManager
|
||||
from tests.integration.common_utils.managers.settings import SettingsManager
|
||||
from tests.integration.common_utils.managers.user import UserManager
|
||||
from tests.integration.common_utils.test_models import DATestSettings
|
||||
from tests.integration.common_utils.test_models import DATestUser
|
||||
from tests.integration.common_utils.vespa import vespa_fixture
|
||||
@@ -26,13 +25,9 @@ FILE_PATH = "tests/integration/common_utils/test_files"
|
||||
|
||||
def test_image_indexing(
|
||||
reset: None,
|
||||
admin_user: DATestUser,
|
||||
vespa_client: vespa_fixture,
|
||||
) -> None:
|
||||
# Creating an admin user (first user created is automatically an admin)
|
||||
admin_user: DATestUser = UserManager.create(
|
||||
email="admin@onyx-test.com",
|
||||
)
|
||||
|
||||
os.makedirs(FILE_PATH, exist_ok=True)
|
||||
test_file_path = os.path.join(FILE_PATH, FILE_NAME)
|
||||
|
||||
@@ -54,7 +49,7 @@ def test_image_indexing(
|
||||
user_performing_action=admin_user,
|
||||
)
|
||||
|
||||
file_paths = upload_response.get("file_paths", [])
|
||||
file_paths = upload_response.file_paths
|
||||
|
||||
if not file_paths:
|
||||
pytest.fail("File upload failed - no file paths returned")
|
||||
@@ -95,23 +90,23 @@ def test_image_indexing(
|
||||
CCPairManager.wait_for_indexing_completion(
|
||||
cc_pair=cc_pair,
|
||||
after=datetime.now(timezone.utc),
|
||||
timeout=180,
|
||||
user_performing_action=admin_user,
|
||||
)
|
||||
|
||||
with get_session_context_manager() as db_session:
|
||||
# really gets the chunks from Vespa, which is why there are two;
|
||||
# one for the raw text and one for the summarized image.
|
||||
documents = DocumentManager.fetch_documents_for_cc_pair(
|
||||
cc_pair_id=cc_pair.id,
|
||||
db_session=db_session,
|
||||
vespa_client=vespa_client,
|
||||
)
|
||||
|
||||
# Ensure we indexed an image from the sample.pdf file
|
||||
has_sample_pdf_image = False
|
||||
for doc in documents:
|
||||
if doc.image_file_name and FILE_NAME in doc.image_file_name:
|
||||
has_sample_pdf_image = True
|
||||
|
||||
# Assert that at least one document has an image file name containing "sample.pdf"
|
||||
assert (
|
||||
has_sample_pdf_image
|
||||
), "No document found with an image file name containing 'sample.pdf'"
|
||||
assert len(documents) == 2
|
||||
for document in documents:
|
||||
if "These are Johns dogs" in document.content:
|
||||
assert document.image_file_id is None
|
||||
else:
|
||||
assert document.image_file_id is not None
|
||||
assert file_paths[0] in document.image_file_id
|
||||
|
||||
@@ -53,10 +53,10 @@ def test_zip_metadata_handling(
|
||||
content_type="application/zip",
|
||||
)
|
||||
|
||||
file_paths = upload_response.get("file_paths", [])
|
||||
file_paths = upload_response.file_paths
|
||||
assert file_paths, "File upload failed - no file paths returned"
|
||||
if has_metadata:
|
||||
metadata = upload_response.get("zip_metadata", {})
|
||||
metadata = upload_response.zip_metadata
|
||||
assert metadata, "Metadata should be present"
|
||||
else:
|
||||
metadata = {}
|
||||
|
||||
@@ -84,13 +84,13 @@ def test_mock_connector_basic_flow(
|
||||
|
||||
# Verify results
|
||||
with get_session_context_manager() as db_session:
|
||||
documents = DocumentManager.fetch_documents_for_cc_pair(
|
||||
chunks = DocumentManager.fetch_documents_for_cc_pair(
|
||||
cc_pair_id=cc_pair.id,
|
||||
db_session=db_session,
|
||||
vespa_client=vespa_client,
|
||||
)
|
||||
assert len(documents) == 1
|
||||
assert documents[0].id == test_doc.id
|
||||
assert len(chunks) == 1
|
||||
assert chunks[0].id == test_doc.id
|
||||
|
||||
errors = IndexAttemptManager.get_index_attempt_errors_for_cc_pair(
|
||||
cc_pair_id=cc_pair.id,
|
||||
|
||||
@@ -34,7 +34,7 @@ def create_test_chunk(
|
||||
metadata={},
|
||||
match_highlights=[],
|
||||
updated_at=datetime.now(),
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
doc_summary="",
|
||||
chunk_context="",
|
||||
)
|
||||
|
||||
328
backend/tests/unit/file_store/test_file_store.py
Normal file
328
backend/tests/unit/file_store/test_file_store.py
Normal file
@@ -0,0 +1,328 @@
|
||||
import datetime
|
||||
from collections.abc import Generator
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import Mock
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import DateTime
|
||||
from sqlalchemy import Enum
|
||||
from sqlalchemy import String
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import Mapped
|
||||
from sqlalchemy.orm import mapped_column
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.sql import func
|
||||
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.file_store.file_store import get_default_file_store
|
||||
from onyx.file_store.file_store import S3BackedFileStore
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db_session() -> Generator[Session, None, None]:
|
||||
"""Create an in-memory SQLite database for testing"""
|
||||
# Create test-specific base and model that matches the actual FileRecord structure
|
||||
# but uses SQLite-compatible types
|
||||
TestDBBase = declarative_base()
|
||||
|
||||
class FileRecord(TestDBBase):
|
||||
__tablename__: str = "file_record"
|
||||
|
||||
# Internal file ID, must be unique across all files
|
||||
file_id: Mapped[str] = mapped_column(String, primary_key=True)
|
||||
|
||||
display_name: Mapped[str | None] = mapped_column(String, nullable=True)
|
||||
file_origin: Mapped[FileOrigin] = mapped_column(
|
||||
Enum(FileOrigin, native_enum=False), nullable=False
|
||||
)
|
||||
file_type: Mapped[str] = mapped_column(String, default="text/plain")
|
||||
|
||||
# External storage support (S3, MinIO, Azure Blob, etc.)
|
||||
bucket_name: Mapped[str] = mapped_column(String, nullable=False)
|
||||
object_key: Mapped[str] = mapped_column(String, nullable=False)
|
||||
|
||||
# Timestamps for external storage
|
||||
created_at: Mapped[datetime.datetime] = mapped_column(
|
||||
DateTime(timezone=True), server_default=func.now(), nullable=False
|
||||
)
|
||||
updated_at: Mapped[datetime.datetime] = mapped_column(
|
||||
DateTime(timezone=True), server_default=func.now(), nullable=False
|
||||
)
|
||||
|
||||
engine = create_engine("sqlite:///:memory:")
|
||||
TestDBBase.metadata.create_all(engine)
|
||||
SessionLocal = sessionmaker(bind=engine)
|
||||
session = SessionLocal()
|
||||
yield session
|
||||
session.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_content() -> bytes:
|
||||
"""Sample file content for testing"""
|
||||
return b"This is a test file content"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_file_io(sample_content: bytes) -> BytesIO:
|
||||
"""Sample file IO object for testing"""
|
||||
return BytesIO(sample_content)
|
||||
|
||||
|
||||
class TestExternalStorageFileStore:
|
||||
"""Test external storage file store functionality (S3-compatible)"""
|
||||
|
||||
def test_get_default_file_store_s3(self, db_session: Session) -> None:
|
||||
"""Test that external storage file store is returned"""
|
||||
file_store = get_default_file_store(db_session)
|
||||
assert isinstance(file_store, S3BackedFileStore)
|
||||
|
||||
def test_s3_client_initialization_with_credentials(
|
||||
self, db_session: Session
|
||||
) -> None:
|
||||
"""Test S3 client initialization with explicit credentials"""
|
||||
with patch("boto3.client") as mock_boto3:
|
||||
file_store = S3BackedFileStore(
|
||||
db_session,
|
||||
bucket_name="test-bucket",
|
||||
aws_access_key_id="test-key",
|
||||
aws_secret_access_key="test-secret",
|
||||
aws_region_name="us-west-2",
|
||||
s3_endpoint_url=None,
|
||||
)
|
||||
file_store._get_s3_client()
|
||||
|
||||
# Verify boto3 client was called with the expected arguments
|
||||
mock_boto3.assert_called_once()
|
||||
call_kwargs: dict[str, Any] = mock_boto3.call_args[1]
|
||||
|
||||
assert call_kwargs["service_name"] == "s3"
|
||||
assert call_kwargs["aws_access_key_id"] == "test-key"
|
||||
assert call_kwargs["aws_secret_access_key"] == "test-secret"
|
||||
assert call_kwargs["region_name"] == "us-west-2"
|
||||
|
||||
def test_s3_client_initialization_with_iam_role(self, db_session: Session) -> None:
|
||||
"""Test S3 client initialization with IAM role (no explicit credentials)"""
|
||||
with patch("boto3.client") as mock_boto3:
|
||||
file_store = S3BackedFileStore(
|
||||
db_session,
|
||||
bucket_name="test-bucket",
|
||||
aws_access_key_id=None,
|
||||
aws_secret_access_key=None,
|
||||
aws_region_name="us-west-2",
|
||||
s3_endpoint_url=None,
|
||||
)
|
||||
file_store._get_s3_client()
|
||||
|
||||
# Verify boto3 client was called with the expected arguments
|
||||
mock_boto3.assert_called_once()
|
||||
call_kwargs: dict[str, Any] = mock_boto3.call_args[1]
|
||||
|
||||
assert call_kwargs["service_name"] == "s3"
|
||||
assert call_kwargs["region_name"] == "us-west-2"
|
||||
# Should not have explicit credentials
|
||||
assert "aws_access_key_id" not in call_kwargs
|
||||
assert "aws_secret_access_key" not in call_kwargs
|
||||
|
||||
def test_s3_bucket_name_configuration(self, db_session: Session) -> None:
|
||||
"""Test S3 bucket name configuration"""
|
||||
with patch(
|
||||
"onyx.file_store.file_store.S3_FILE_STORE_BUCKET_NAME", "my-test-bucket"
|
||||
):
|
||||
file_store = S3BackedFileStore(db_session, bucket_name="my-test-bucket")
|
||||
bucket_name: str = file_store._get_bucket_name()
|
||||
assert bucket_name == "my-test-bucket"
|
||||
|
||||
def test_s3_key_generation_default_prefix(self, db_session: Session) -> None:
|
||||
"""Test S3 key generation with default prefix"""
|
||||
with (
|
||||
patch("onyx.file_store.file_store.S3_FILE_STORE_PREFIX", "onyx-files"),
|
||||
patch(
|
||||
"onyx.file_store.file_store.get_current_tenant_id",
|
||||
return_value="test-tenant",
|
||||
),
|
||||
):
|
||||
file_store = S3BackedFileStore(db_session, bucket_name="test-bucket")
|
||||
s3_key: str = file_store._get_s3_key("test-file.txt")
|
||||
assert s3_key == "onyx-files/test-tenant/test-file.txt"
|
||||
|
||||
def test_s3_key_generation_custom_prefix(self, db_session: Session) -> None:
|
||||
"""Test S3 key generation with custom prefix"""
|
||||
with (
|
||||
patch("onyx.file_store.file_store.S3_FILE_STORE_PREFIX", "custom-prefix"),
|
||||
patch(
|
||||
"onyx.file_store.file_store.get_current_tenant_id",
|
||||
return_value="test-tenant",
|
||||
),
|
||||
):
|
||||
file_store = S3BackedFileStore(
|
||||
db_session, bucket_name="test-bucket", s3_prefix="custom-prefix"
|
||||
)
|
||||
s3_key: str = file_store._get_s3_key("test-file.txt")
|
||||
assert s3_key == "custom-prefix/test-tenant/test-file.txt"
|
||||
|
||||
def test_s3_key_generation_with_different_tenant_ids(
|
||||
self, db_session: Session
|
||||
) -> None:
|
||||
"""Test S3 key generation with different tenant IDs"""
|
||||
with patch("onyx.file_store.file_store.S3_FILE_STORE_PREFIX", "onyx-files"):
|
||||
file_store = S3BackedFileStore(db_session, bucket_name="test-bucket")
|
||||
|
||||
# Test with tenant ID "tenant-1"
|
||||
with patch(
|
||||
"onyx.file_store.file_store.get_current_tenant_id",
|
||||
return_value="tenant-1",
|
||||
):
|
||||
s3_key = file_store._get_s3_key("document.pdf")
|
||||
assert s3_key == "onyx-files/tenant-1/document.pdf"
|
||||
|
||||
# Test with tenant ID "tenant-2"
|
||||
with patch(
|
||||
"onyx.file_store.file_store.get_current_tenant_id",
|
||||
return_value="tenant-2",
|
||||
):
|
||||
s3_key = file_store._get_s3_key("document.pdf")
|
||||
assert s3_key == "onyx-files/tenant-2/document.pdf"
|
||||
|
||||
# Test with default tenant (public)
|
||||
with patch(
|
||||
"onyx.file_store.file_store.get_current_tenant_id",
|
||||
return_value="public",
|
||||
):
|
||||
s3_key = file_store._get_s3_key("document.pdf")
|
||||
assert s3_key == "onyx-files/public/document.pdf"
|
||||
|
||||
@patch("boto3.client")
|
||||
def test_s3_save_file_mock(
|
||||
self, mock_boto3: MagicMock, db_session: Session, sample_file_io: BytesIO
|
||||
) -> None:
|
||||
"""Test S3 file saving with mocked S3 client"""
|
||||
# Setup S3 mock
|
||||
mock_s3_client: Mock = Mock()
|
||||
mock_boto3.return_value = mock_s3_client
|
||||
|
||||
# Create a mock database session
|
||||
mock_db_session: Mock = Mock()
|
||||
mock_db_session.commit = Mock()
|
||||
mock_db_session.rollback = Mock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"onyx.file_store.file_store.S3_FILE_STORE_BUCKET_NAME", "test-bucket"
|
||||
),
|
||||
patch("onyx.file_store.file_store.S3_FILE_STORE_PREFIX", "onyx-files"),
|
||||
patch("onyx.file_store.file_store.S3_AWS_ACCESS_KEY_ID", "test-key"),
|
||||
patch("onyx.file_store.file_store.S3_AWS_SECRET_ACCESS_KEY", "test-secret"),
|
||||
):
|
||||
|
||||
# Mock the database operation to avoid SQLAlchemy issues
|
||||
with patch("onyx.db.file_record.upsert_filerecord") as mock_upsert:
|
||||
mock_upsert.return_value = Mock()
|
||||
|
||||
file_store = S3BackedFileStore(
|
||||
mock_db_session, bucket_name="test-bucket"
|
||||
)
|
||||
|
||||
# This should not raise an exception
|
||||
file_store.save_file(
|
||||
file_id="test-file.txt",
|
||||
content=sample_file_io,
|
||||
display_name="Test File",
|
||||
file_origin=FileOrigin.OTHER,
|
||||
file_type="text/plain",
|
||||
)
|
||||
|
||||
# Verify S3 client was called correctly
|
||||
mock_s3_client.put_object.assert_called_once()
|
||||
call_args = mock_s3_client.put_object.call_args
|
||||
assert call_args[1]["Bucket"] == "test-bucket"
|
||||
assert call_args[1]["Key"] == "onyx-files/public/test-file.txt"
|
||||
assert call_args[1]["ContentType"] == "text/plain"
|
||||
|
||||
def test_minio_client_initialization(self, db_session: Session) -> None:
|
||||
"""Test S3 client initialization with MinIO endpoint"""
|
||||
with (
|
||||
patch("boto3.client") as mock_boto3,
|
||||
patch("urllib3.disable_warnings"),
|
||||
):
|
||||
file_store = S3BackedFileStore(
|
||||
db_session,
|
||||
bucket_name="test-bucket",
|
||||
aws_access_key_id="minioadmin",
|
||||
aws_secret_access_key="minioadmin",
|
||||
aws_region_name="us-east-1",
|
||||
s3_endpoint_url="http://localhost:9000",
|
||||
s3_verify_ssl=False,
|
||||
)
|
||||
file_store._get_s3_client()
|
||||
|
||||
# Verify boto3 client was called with MinIO-specific settings
|
||||
mock_boto3.assert_called_once()
|
||||
call_kwargs: dict[str, Any] = mock_boto3.call_args[1]
|
||||
|
||||
assert call_kwargs["service_name"] == "s3"
|
||||
assert call_kwargs["endpoint_url"] == "http://localhost:9000"
|
||||
assert call_kwargs["aws_access_key_id"] == "minioadmin"
|
||||
assert call_kwargs["aws_secret_access_key"] == "minioadmin"
|
||||
assert call_kwargs["region_name"] == "us-east-1"
|
||||
assert call_kwargs["verify"] is False
|
||||
|
||||
# Verify S3 configuration for MinIO
|
||||
config = call_kwargs["config"]
|
||||
assert config.signature_version == "s3v4"
|
||||
assert config.s3["addressing_style"] == "path"
|
||||
|
||||
def test_minio_ssl_verification_enabled(self, db_session: Session) -> None:
|
||||
"""Test MinIO with SSL verification enabled"""
|
||||
with patch("boto3.client") as mock_boto3:
|
||||
file_store = S3BackedFileStore(
|
||||
db_session,
|
||||
bucket_name="test-bucket",
|
||||
aws_access_key_id="test-key",
|
||||
aws_secret_access_key="test-secret",
|
||||
s3_endpoint_url="https://minio.example.com",
|
||||
s3_verify_ssl=True,
|
||||
)
|
||||
file_store._get_s3_client()
|
||||
|
||||
call_kwargs: dict[str, Any] = mock_boto3.call_args[1]
|
||||
# When SSL verification is enabled, verify should not be in kwargs (defaults to True)
|
||||
assert "verify" not in call_kwargs or call_kwargs.get("verify") is not False
|
||||
assert call_kwargs["endpoint_url"] == "https://minio.example.com"
|
||||
|
||||
def test_aws_s3_without_endpoint_url(self, db_session: Session) -> None:
|
||||
"""Test that regular AWS S3 doesn't include endpoint URL or custom config"""
|
||||
with patch("boto3.client") as mock_boto3:
|
||||
file_store = S3BackedFileStore(
|
||||
db_session,
|
||||
bucket_name="test-bucket",
|
||||
aws_access_key_id="test-key",
|
||||
aws_secret_access_key="test-secret",
|
||||
aws_region_name="us-west-2",
|
||||
s3_endpoint_url=None,
|
||||
)
|
||||
file_store._get_s3_client()
|
||||
|
||||
call_kwargs: dict[str, Any] = mock_boto3.call_args[1]
|
||||
|
||||
# For regular AWS S3, endpoint_url should not be present
|
||||
assert "endpoint_url" not in call_kwargs
|
||||
assert call_kwargs["service_name"] == "s3"
|
||||
assert call_kwargs["region_name"] == "us-west-2"
|
||||
# config should not be present for regular AWS S3
|
||||
assert "config" not in call_kwargs
|
||||
|
||||
|
||||
class TestFileStoreInterface:
|
||||
"""Test the general file store interface"""
|
||||
|
||||
def test_file_store_always_external_storage(self, db_session: Session) -> None:
|
||||
"""Test that external storage file store is always returned"""
|
||||
# File store should always be S3BackedFileStore regardless of environment
|
||||
file_store = get_default_file_store(db_session)
|
||||
assert isinstance(file_store, S3BackedFileStore)
|
||||
@@ -65,7 +65,7 @@ def create_test_inference_chunk(
|
||||
section_continuation=False,
|
||||
document_id=document_id,
|
||||
source_type=DocumentSource.FILE,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
title=title,
|
||||
semantic_identifier=semantic_identifier,
|
||||
boost=1,
|
||||
|
||||
@@ -85,7 +85,7 @@ def mock_inference_sections() -> list[InferenceSection]:
|
||||
updated_at=datetime(2023, 1, 1),
|
||||
source_links={0: "https://example.com/doc1"},
|
||||
match_highlights=[],
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
doc_summary="",
|
||||
chunk_context="",
|
||||
),
|
||||
@@ -110,7 +110,7 @@ def mock_inference_sections() -> list[InferenceSection]:
|
||||
updated_at=datetime(2023, 1, 2),
|
||||
source_links={0: "https://example.com/doc2"},
|
||||
match_highlights=[],
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
doc_summary="",
|
||||
chunk_context="",
|
||||
),
|
||||
|
||||
@@ -150,7 +150,7 @@ def test_fuzzy_match_quotes_to_docs() -> None:
|
||||
metadata={},
|
||||
match_highlights=[],
|
||||
updated_at=None,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
doc_summary="",
|
||||
chunk_context="",
|
||||
)
|
||||
@@ -171,7 +171,7 @@ def test_fuzzy_match_quotes_to_docs() -> None:
|
||||
metadata={},
|
||||
match_highlights=[],
|
||||
updated_at=None,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
doc_summary="",
|
||||
chunk_context="",
|
||||
)
|
||||
|
||||
@@ -38,7 +38,7 @@ def create_inference_chunk(
|
||||
metadata={},
|
||||
match_highlights=[],
|
||||
updated_at=None,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
doc_summary="",
|
||||
chunk_context="",
|
||||
)
|
||||
|
||||
@@ -38,7 +38,7 @@ class TestPostQueryChunkCensoring:
|
||||
doc_summary="doc1 summary",
|
||||
chunk_context="doc1 context",
|
||||
updated_at=None,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
source_links={},
|
||||
section_continuation=False,
|
||||
blurb="chunk1",
|
||||
@@ -59,7 +59,7 @@ class TestPostQueryChunkCensoring:
|
||||
doc_summary="doc2 summary",
|
||||
chunk_context="doc2 context",
|
||||
updated_at=None,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
source_links={},
|
||||
section_continuation=False,
|
||||
blurb="chunk2",
|
||||
@@ -80,7 +80,7 @@ class TestPostQueryChunkCensoring:
|
||||
doc_summary="doc3 summary",
|
||||
chunk_context="doc3 context",
|
||||
updated_at=None,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
source_links={},
|
||||
section_continuation=False,
|
||||
blurb="chunk3",
|
||||
@@ -101,7 +101,7 @@ class TestPostQueryChunkCensoring:
|
||||
doc_summary="doc4 summary",
|
||||
chunk_context="doc4 context",
|
||||
updated_at=None,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
source_links={},
|
||||
section_continuation=False,
|
||||
blurb="chunk4",
|
||||
|
||||
@@ -68,7 +68,7 @@ def test_default_indexing_embedder_embed_chunks(
|
||||
mini_chunk_texts=None,
|
||||
large_chunk_reference_ids=[],
|
||||
large_chunk_id=None,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
chunk_context=chunk_context,
|
||||
doc_summary=doc_summary,
|
||||
contextual_rag_reserved_tokens=200,
|
||||
|
||||
@@ -172,7 +172,7 @@ def create_test_chunk(
|
||||
large_chunk_reference_ids=[],
|
||||
embeddings=ChunkEmbedding(full_embedding=[], mini_chunk_embeddings=[]),
|
||||
title_embedding=None,
|
||||
image_file_name=None,
|
||||
image_file_id=None,
|
||||
chunk_context="",
|
||||
doc_summary="",
|
||||
contextual_rag_reserved_tokens=200,
|
||||
|
||||
@@ -71,6 +71,11 @@ services:
|
||||
- VESPA_HOST=index
|
||||
- REDIS_HOST=cache
|
||||
- WEB_DOMAIN=${WEB_DOMAIN:-} # For frontend redirect auth purpose
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
|
||||
# Don't change the NLP model configs unless you know what you're doing
|
||||
- EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-}
|
||||
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
|
||||
@@ -191,6 +196,11 @@ services:
|
||||
- VESPA_HOST=index
|
||||
- REDIS_HOST=cache
|
||||
- WEB_DOMAIN=${WEB_DOMAIN:-} # For frontend redirect auth purpose for OAuth2 connectors
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
|
||||
# Don't change the NLP model configs unless you know what you're doing
|
||||
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
|
||||
- DOC_EMBEDDING_DIM=${DOC_EMBEDDING_DIM:-}
|
||||
@@ -448,9 +458,29 @@ services:
|
||||
# persistence. explicitly setting save and appendonly forces ephemeral behavior.
|
||||
command: redis-server --save "" --appendonly no
|
||||
|
||||
minio:
|
||||
image: minio/minio:latest
|
||||
restart: always
|
||||
ports:
|
||||
- "9004:9000"
|
||||
- "9005:9001"
|
||||
environment:
|
||||
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
|
||||
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
|
||||
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
|
||||
volumes:
|
||||
- minio_data:/data
|
||||
command: server /data --console-address ":9001"
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
|
||||
interval: 30s
|
||||
timeout: 20s
|
||||
retries: 3
|
||||
|
||||
volumes:
|
||||
db_volume:
|
||||
vespa_volume: # Created by the container itself
|
||||
minio_data:
|
||||
|
||||
model_cache_huggingface:
|
||||
indexing_huggingface_model_cache:
|
||||
|
||||
@@ -58,6 +58,11 @@ services:
|
||||
- VESPA_HOST=index
|
||||
- REDIS_HOST=cache
|
||||
- WEB_DOMAIN=${WEB_DOMAIN:-} # For frontend redirect auth purpose
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
|
||||
# Don't change the NLP model configs unless you know what you're doing
|
||||
- EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-}
|
||||
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
|
||||
@@ -154,6 +159,11 @@ services:
|
||||
- VESPA_HOST=index
|
||||
- REDIS_HOST=cache
|
||||
- WEB_DOMAIN=${WEB_DOMAIN:-} # For frontend redirect auth purpose for OAuth2 connectors
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
|
||||
# Don't change the NLP model configs unless you know what you're doing
|
||||
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
|
||||
- DOC_EMBEDDING_DIM=${DOC_EMBEDDING_DIM:-}
|
||||
@@ -385,6 +395,25 @@ services:
|
||||
/bin/sh -c "dos2unix /etc/nginx/conf.d/run-nginx.sh
|
||||
&& /etc/nginx/conf.d/run-nginx.sh app.conf.template.dev"
|
||||
|
||||
minio:
|
||||
image: minio/minio:latest
|
||||
restart: always
|
||||
ports:
|
||||
- "9004:9000"
|
||||
- "9005:9001"
|
||||
environment:
|
||||
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
|
||||
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
|
||||
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
|
||||
volumes:
|
||||
- minio_data:/data
|
||||
command: server /data --console-address ":9001"
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
|
||||
interval: 30s
|
||||
timeout: 20s
|
||||
retries: 3
|
||||
|
||||
cache:
|
||||
image: redis:7.4-alpine
|
||||
restart: always
|
||||
@@ -397,6 +426,7 @@ services:
|
||||
volumes:
|
||||
db_volume:
|
||||
vespa_volume:
|
||||
minio_data:
|
||||
# Created by the container itself
|
||||
model_cache_huggingface:
|
||||
indexing_huggingface_model_cache:
|
||||
|
||||
@@ -70,6 +70,11 @@ services:
|
||||
- VESPA_HOST=index
|
||||
- REDIS_HOST=cache
|
||||
- WEB_DOMAIN=${WEB_DOMAIN:-}
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
|
||||
# Don't change the NLP model configs unless you know what you're doing
|
||||
- EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-}
|
||||
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
|
||||
@@ -173,6 +178,11 @@ services:
|
||||
- VESPA_HOST=index
|
||||
- REDIS_HOST=cache
|
||||
- WEB_DOMAIN=${WEB_DOMAIN:-}
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
|
||||
# Don't change the NLP model configs unless you know what you're doing
|
||||
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
|
||||
- DOC_EMBEDDING_DIM=${DOC_EMBEDDING_DIM:-}
|
||||
@@ -412,6 +422,25 @@ services:
|
||||
/bin/sh -c "dos2unix /etc/nginx/conf.d/run-nginx.sh
|
||||
&& /etc/nginx/conf.d/run-nginx.sh app.conf.template.dev"
|
||||
|
||||
minio:
|
||||
image: minio/minio:latest
|
||||
restart: always
|
||||
ports:
|
||||
- "9004:9000"
|
||||
- "9005:9001"
|
||||
environment:
|
||||
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
|
||||
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
|
||||
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
|
||||
volumes:
|
||||
- minio_data:/data
|
||||
command: server /data --console-address ":9001"
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
|
||||
interval: 30s
|
||||
timeout: 20s
|
||||
retries: 3
|
||||
|
||||
cache:
|
||||
image: redis:7.4-alpine
|
||||
restart: always
|
||||
@@ -424,6 +453,7 @@ services:
|
||||
volumes:
|
||||
db_volume:
|
||||
vespa_volume: # Created by the container itself
|
||||
minio_data:
|
||||
|
||||
model_cache_huggingface:
|
||||
indexing_huggingface_model_cache:
|
||||
|
||||
@@ -22,6 +22,10 @@ services:
|
||||
- VESPA_HOST=index
|
||||
- REDIS_HOST=cache
|
||||
- MODEL_SERVER_HOST=${MODEL_SERVER_HOST:-inference_model_server}
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
logging:
|
||||
@@ -52,6 +56,10 @@ services:
|
||||
- REDIS_HOST=cache
|
||||
- MODEL_SERVER_HOST=${MODEL_SERVER_HOST:-inference_model_server}
|
||||
- INDEXING_MODEL_SERVER_HOST=${INDEXING_MODEL_SERVER_HOST:-indexing_model_server}
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
logging:
|
||||
@@ -221,6 +229,22 @@ services:
|
||||
max-file: "6"
|
||||
entrypoint: "/bin/sh -c 'trap exit TERM; while :; do certbot renew; sleep 12h & wait $${!}; done;'"
|
||||
|
||||
minio:
|
||||
image: minio/minio:latest
|
||||
restart: always
|
||||
environment:
|
||||
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
|
||||
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
|
||||
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
|
||||
volumes:
|
||||
- minio_data:/data
|
||||
command: server /data --console-address ":9001"
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
|
||||
interval: 30s
|
||||
timeout: 20s
|
||||
retries: 3
|
||||
|
||||
cache:
|
||||
image: redis:7.4-alpine
|
||||
restart: always
|
||||
@@ -233,6 +257,7 @@ services:
|
||||
volumes:
|
||||
db_volume:
|
||||
vespa_volume:
|
||||
minio_data:
|
||||
# Created by the container itself
|
||||
model_cache_huggingface:
|
||||
indexing_huggingface_model_cache:
|
||||
|
||||
@@ -26,6 +26,10 @@ services:
|
||||
- AWS_REGION_NAME=${AWS_REGION_NAME-}
|
||||
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID-}
|
||||
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY-}
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
# Uncomment the line below to use if IAM_AUTH is true and you are using iam auth for postgres
|
||||
# volumes:
|
||||
# - ./bundle.pem:/app/bundle.pem:ro
|
||||
@@ -67,6 +71,10 @@ services:
|
||||
- AWS_REGION_NAME=${AWS_REGION_NAME-}
|
||||
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID-}
|
||||
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY-}
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
# Uncomment the line below to use if IAM_AUTH is true and you are using iam auth for postgres
|
||||
# volumes:
|
||||
# - ./bundle.pem:/app/bundle.pem:ro
|
||||
@@ -227,6 +235,22 @@ services:
|
||||
env_file:
|
||||
- .env.nginx
|
||||
|
||||
minio:
|
||||
image: minio/minio:latest
|
||||
restart: always
|
||||
environment:
|
||||
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
|
||||
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
|
||||
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
|
||||
volumes:
|
||||
- minio_data:/data
|
||||
command: server /data --console-address ":9001"
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
|
||||
interval: 30s
|
||||
timeout: 20s
|
||||
retries: 3
|
||||
|
||||
cache:
|
||||
image: redis:7.4-alpine
|
||||
restart: always
|
||||
@@ -239,6 +263,7 @@ services:
|
||||
volumes:
|
||||
db_volume:
|
||||
vespa_volume:
|
||||
minio_data:
|
||||
# Created by the container itself
|
||||
model_cache_huggingface:
|
||||
indexing_huggingface_model_cache:
|
||||
|
||||
@@ -27,6 +27,10 @@ services:
|
||||
- AWS_REGION_NAME=${AWS_REGION_NAME-}
|
||||
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID-}
|
||||
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY-}
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
# Uncomment the line below to use if IAM_AUTH is true and you are using iam auth for postgres
|
||||
# volumes:
|
||||
# - ./bundle.pem:/app/bundle.pem:ro
|
||||
@@ -71,6 +75,10 @@ services:
|
||||
- AWS_REGION_NAME=${AWS_REGION_NAME-}
|
||||
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID-}
|
||||
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY-}
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
# Uncomment the line below to use if IAM_AUTH is true and you are using iam auth for postgres
|
||||
# volumes:
|
||||
# - ./bundle.pem:/app/bundle.pem:ro
|
||||
@@ -257,6 +265,22 @@ services:
|
||||
max-file: "6"
|
||||
entrypoint: "/bin/sh -c 'trap exit TERM; while :; do certbot renew; sleep 12h & wait $${!}; done;'"
|
||||
|
||||
minio:
|
||||
image: minio/minio:latest
|
||||
restart: always
|
||||
environment:
|
||||
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
|
||||
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
|
||||
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
|
||||
volumes:
|
||||
- minio_data:/data
|
||||
command: server /data --console-address ":9001"
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
|
||||
interval: 30s
|
||||
timeout: 20s
|
||||
retries: 3
|
||||
|
||||
cache:
|
||||
image: redis:7.4-alpine
|
||||
restart: always
|
||||
@@ -269,6 +293,7 @@ services:
|
||||
volumes:
|
||||
db_volume:
|
||||
vespa_volume:
|
||||
minio_data:
|
||||
# Created by the container itself
|
||||
model_cache_huggingface:
|
||||
indexing_huggingface_model_cache:
|
||||
|
||||
@@ -74,3 +74,13 @@ services:
|
||||
# reservations:
|
||||
# cpus: ${POSTGRES_CPU_RESERVATION}
|
||||
# memory: ${POSTGRES_MEM_RESERVATION}
|
||||
|
||||
# minio:
|
||||
# deploy:
|
||||
# resources:
|
||||
# limits:
|
||||
# cpus: ${MINIO_CPU_LIMIT:-1}
|
||||
# memory: ${MINIO_MEM_LIMIT:-1g}
|
||||
# reservations:
|
||||
# cpus: ${MINIO_CPU_RESERVATION}
|
||||
# memory: ${MINIO_MEM_RESERVATION}
|
||||
|
||||
@@ -26,6 +26,11 @@ services:
|
||||
- MODEL_SERVER_PORT=${MODEL_SERVER_PORT:-}
|
||||
- ENV_SEED_CONFIGURATION=${ENV_SEED_CONFIGURATION:-}
|
||||
- ENABLE_PAID_ENTERPRISE_EDITION_FEATURES=True
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
|
||||
# To enable the LLM for testing, update the value below
|
||||
# NOTE: this is disabled by default since this is a high volume eval that can be costly
|
||||
- DISABLE_GENERATIVE_AI=${DISABLE_GENERATIVE_AI:-true}
|
||||
@@ -60,6 +65,11 @@ services:
|
||||
- INDEXING_MODEL_SERVER_HOST=${INDEXING_MODEL_SERVER_HOST:-indexing_model_server}
|
||||
- ENV_SEED_CONFIGURATION=${ENV_SEED_CONFIGURATION:-}
|
||||
- ENABLE_PAID_ENTERPRISE_EDITION_FEATURES=True
|
||||
# MinIO configuration
|
||||
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
|
||||
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
volumes:
|
||||
@@ -206,6 +216,25 @@ services:
|
||||
/bin/sh -c "dos2unix /etc/nginx/conf.d/run-nginx.sh
|
||||
&& /etc/nginx/conf.d/run-nginx.sh app.conf.template.dev"
|
||||
|
||||
minio:
|
||||
image: minio/minio:latest
|
||||
restart: always
|
||||
ports:
|
||||
- "9004:9000"
|
||||
- "9005:9001"
|
||||
environment:
|
||||
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
|
||||
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
|
||||
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
|
||||
volumes:
|
||||
- minio_data:/data
|
||||
command: server /data --console-address ":9001"
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
|
||||
interval: 30s
|
||||
timeout: 20s
|
||||
retries: 3
|
||||
|
||||
cache:
|
||||
image: redis:7.4-alpine
|
||||
restart: always
|
||||
@@ -229,3 +258,4 @@ volumes:
|
||||
o: bind
|
||||
device: ${DANSWER_VESPA_DATA_DIR:-./vespa_data}
|
||||
log_store: # for logs that we don't want to lose on container restarts
|
||||
minio_data:
|
||||
|
||||
@@ -11,5 +11,8 @@ dependencies:
|
||||
- name: redis
|
||||
repository: https://charts.bitnami.com/bitnami
|
||||
version: 20.1.0
|
||||
digest: sha256:28a9f2bb1c85822679a6583e03eb5885a9a0dfb11d03d969b7d783436604c43d
|
||||
generated: "2025-06-03T15:21:29.377034-07:00"
|
||||
- name: minio
|
||||
repository: oci://registry-1.docker.io/bitnamicharts
|
||||
version: 17.0.4
|
||||
digest: sha256:4c938cf9138e4ff6f5ecac5c044324d508ef2b0e1a23ba3f2bc089015cb40ff6
|
||||
generated: "2025-06-16T18:53:19.63168-07:00"
|
||||
|
||||
@@ -34,3 +34,7 @@ dependencies:
|
||||
version: 20.1.0
|
||||
repository: https://charts.bitnami.com/bitnami
|
||||
condition: redis.enabled
|
||||
- name: minio
|
||||
version: 17.0.4
|
||||
repository: oci://registry-1.docker.io/bitnamicharts
|
||||
condition: minio.enabled
|
||||
|
||||
@@ -14,3 +14,6 @@ data:
|
||||
{{- range $key, $value := .Values.configMap }}
|
||||
{{ $key }}: "{{ $value }}"
|
||||
{{- end }}
|
||||
{{- if .Values.minio.enabled }}
|
||||
S3_ENDPOINT_URL: "http://{{ .Release.Name }}-minio:{{ .Values.minio.service.ports.api | default 9000 }}"
|
||||
{{- end }}
|
||||
|
||||
@@ -563,6 +563,26 @@ redis:
|
||||
existingSecret: onyx-secrets
|
||||
existingSecretPasswordKey: redis_password
|
||||
|
||||
minio:
|
||||
enabled: true
|
||||
auth:
|
||||
existingSecret: onyx-secrets
|
||||
rootUserSecretKey: s3_aws_access_key_id
|
||||
rootPasswordSecretKey: s3_aws_secret_access_key
|
||||
defaultBuckets: "onyx-file-store-bucket"
|
||||
persistence:
|
||||
enabled: true
|
||||
size: 30Gi
|
||||
service:
|
||||
type: ClusterIP
|
||||
ports:
|
||||
api: 9000
|
||||
console: 9001
|
||||
consoleService:
|
||||
type: ClusterIP
|
||||
ports:
|
||||
http: 9001
|
||||
|
||||
ingress:
|
||||
enabled: false
|
||||
className: ""
|
||||
@@ -589,6 +609,8 @@ auth:
|
||||
oauth_client_secret: ""
|
||||
oauth_cookie_secret: ""
|
||||
redis_password: "redis_password"
|
||||
s3_aws_access_key_id: "s3_aws_access_key_id"
|
||||
s3_aws_secret_access_key: "s3_aws_secret_access_key"
|
||||
# will be overridden by the existingSecret if set
|
||||
secretName: "onyx-secrets"
|
||||
# set values as strings, they will be base64 encoded
|
||||
@@ -600,6 +622,8 @@ auth:
|
||||
oauth_client_secret: ""
|
||||
oauth_cookie_secret: ""
|
||||
redis_password: "password"
|
||||
s3_aws_access_key_id: "minioadmin"
|
||||
s3_aws_secret_access_key: "minioadmin"
|
||||
|
||||
configMap:
|
||||
# Change this for production uses unless Onyx is only accessible behind VPN
|
||||
@@ -618,6 +642,9 @@ configMap:
|
||||
# SMTP_PASS: ""
|
||||
# 'your-email@company.com' SMTP_USER missing used instead
|
||||
EMAIL_FROM: ""
|
||||
# MinIO/S3 Configuration override
|
||||
S3_ENDPOINT_URL: "" # only used if minio is not enabled
|
||||
S3_FILE_STORE_BUCKET_NAME: ""
|
||||
# Gen AI Settings
|
||||
GEN_AI_MAX_TOKENS: ""
|
||||
QA_TIMEOUT: "60"
|
||||
|
||||
Reference in New Issue
Block a user