Compare commits

...

27 Commits

Author SHA1 Message Date
Weves
3be4d3a8b8 fix test 2025-06-19 18:33:29 -07:00
Weves
5f7bac82ea Fix test 2025-06-19 18:33:29 -07:00
Weves
1fd5a20088 Fix mypy 2025-06-19 18:33:29 -07:00
Weves
d8b05eccd6 Handle multi-tenant case 2025-06-19 18:33:11 -07:00
Weves
9eab1cd009 Rebase 2025-06-19 18:33:11 -07:00
Weves
d01c4ea8bb Fix it 2025-06-19 18:33:11 -07:00
Weves
91aae54d52 Address more greptile comments 2025-06-19 18:33:11 -07:00
Weves
2a370e8b08 Fix it 2025-06-19 18:33:11 -07:00
Weves
d0688d5508 Fix README 2025-06-19 18:33:11 -07:00
Weves
f8c28537ce Harden migration 2025-06-19 18:33:11 -07:00
Weves
089e6b015d Address greptile comments 2025-06-19 18:33:11 -07:00
Weves
359518e3fa Fix test 2025-06-19 18:33:11 -07:00
Weves
69364c9c5a Working helm w/ minio 2025-06-19 18:33:11 -07:00
Weves
4478857011 Add MinIO to other compose files 2025-06-19 18:33:11 -07:00
Weves
ff25452e39 Fix default values 2025-06-19 18:33:11 -07:00
Weves
159277ceb0 Fix mypy 2025-06-19 18:33:11 -07:00
Weves
2ce3e8f1b6 Fix file store tests again 2025-06-19 18:33:11 -07:00
Weves
983fca5bed Fix file store tests again 2025-06-19 18:33:11 -07:00
Weves
43a63267b6 fix file store tests 2025-06-19 18:33:11 -07:00
Weves
869d96a8ee Improve migration to handle downgrades 2025-06-19 18:33:11 -07:00
Weves
de41ff3061 Small fixes 2025-06-19 18:33:11 -07:00
Weves
a2c19e63b4 Fix mypy 2025-06-19 18:33:11 -07:00
Weves
5672527686 Refactor 2025-06-19 18:33:11 -07:00
Weves
131c7614ee Improve migration / add auto-running tests 2025-06-19 18:33:11 -07:00
Weves
4f7f2dcc48 Add S3 tests 2025-06-19 18:33:11 -07:00
Weves
c4ee8d64cf Add non-mocked test 2025-06-19 18:33:11 -07:00
Weves
54f0ad0cd2 Move to an S3-like file store 2025-06-19 18:33:11 -07:00
71 changed files with 2798 additions and 654 deletions

View File

@@ -0,0 +1,65 @@
# CI workflow exercising the S3/MinIO-backed file store tests.
name: File Store Tests

on:
  merge_group:
  pull_request:
    branches: [main]

env:
  # AWS — real credentials from repo secrets (used when hitting actual S3)
  S3_AWS_ACCESS_KEY_ID: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
  S3_AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
  # MinIO — points the S3 client at the local MinIO container started below.
  # NOTE(review): assumes the compose file maps MinIO to host port 9004 — confirm.
  S3_ENDPOINT_URL: "http://localhost:9004"

jobs:
  file-store-tests:
    # See https://runs-on.com/runners/linux/
    runs-on: [runs-on, runner=8cpu-linux-x64, "run-id=${{ github.run_id }}"]
    env:
      # Makes `onyx.*` importable when pytest runs from the repo root.
      PYTHONPATH: ./backend
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"
          # Cache is keyed on both requirements files so either changing busts it.
          cache-dependency-path: |
            backend/requirements/default.txt
            backend/requirements/dev.txt

      - name: Install Dependencies
        run: |
          python -m pip install --upgrade pip
          pip install --retries 5 --timeout 30 -r backend/requirements/default.txt
          pip install --retries 5 --timeout 30 -r backend/requirements/dev.txt
          playwright install chromium
          playwright install-deps chromium

      # Only the storage-relevant services are started, not the full stack.
      - name: Set up MinIO and Postgres
        run: |
          cd deployment/docker_compose
          docker compose -f docker-compose.dev.yml -p onyx-stack up -d minio relational_db

      - name: Run migrations
        run: |
          cd backend
          alembic upgrade head

      - name: Run Tests
        # Wrap in `script` to get a TTY; bash -eo pipefail makes step fail fast.
        shell: script -q -e -c "bash --noprofile --norc -eo pipefail {0}"
        run: |
          py.test \
            -n 8 \
            --dist loadfile \
            --durations=8 \
            -o junit_family=xunit2 \
            -xv \
            --ff \
            backend/tests/daily/file_store

View File

@@ -0,0 +1,295 @@
"""modify_file_store_for_external_storage
Revision ID: c9e2cd766c29
Revises: 03bf8be6b53a
Create Date: 2025-06-13 14:02:09.867679
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.orm import Session
from sqlalchemy import text
from typing import cast, Any
from botocore.exceptions import ClientError
from onyx.db._deprecated.pg_file_store import delete_lobj_by_id, read_lobj
from onyx.file_store.file_store import get_s3_file_store
# revision identifiers, used by Alembic.
revision = "c9e2cd766c29"
down_revision = "03bf8be6b53a"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Reshape `file_store` into `file_record` and move file content to external storage.

    Schema changes are wrapped in a try/except so that re-running the migration
    after a partial failure can continue past already-applied steps. File content
    is then copied out of PostgreSQL large objects into S3/MinIO before the
    `lobj_oid` column is dropped.
    """
    try:
        # Modify existing file_store table to support external storage
        op.rename_table("file_store", "file_record")
        # Make lobj_oid nullable (for external storage files)
        op.alter_column("file_record", "lobj_oid", nullable=True)
        # Add external storage columns with generic names
        op.add_column(
            "file_record", sa.Column("bucket_name", sa.String(), nullable=True)
        )
        op.add_column(
            "file_record", sa.Column("object_key", sa.String(), nullable=True)
        )
        # Add timestamps for tracking
        op.add_column(
            "file_record",
            sa.Column(
                "created_at",
                sa.DateTime(timezone=True),
                server_default=sa.func.now(),
                nullable=False,
            ),
        )
        op.add_column(
            "file_record",
            sa.Column(
                "updated_at",
                sa.DateTime(timezone=True),
                server_default=sa.func.now(),
                nullable=False,
            ),
        )
        # The generic file identifier replaces the old name-based key.
        op.alter_column("file_record", "file_name", new_column_name="file_id")
    except Exception as e:
        # A previous partial run may have already applied some of the DDL above;
        # "does not exist" errors are treated as evidence of that and skipped.
        # NOTE(review): a failed DDL statement may abort the surrounding
        # transaction on PostgreSQL — confirm re-run behavior.
        if "does not exist" in str(e) or 'relation "file_store" does not exist' in str(
            e
        ):
            print(
                f"Ran into error - {e}. Likely means we had a partial success in the past, continuing..."
            )
        else:
            raise
    print(
        "External storage configured - migrating files from PostgreSQL to external storage..."
    )
    # if we fail midway through this, we'll have a partial success. Running the migration
    # again should allow us to continue.
    _migrate_files_to_external_storage()
    print("File migration completed successfully!")
    # Remove lobj_oid column
    op.drop_column("file_record", "lobj_oid")
def downgrade() -> None:
    """Revert schema changes and migrate files from external storage back to PostgreSQL large objects."""
    print(
        "Reverting to PostgreSQL-backed file store migrating files from external storage …"
    )
    # 1. Ensure `lobj_oid` exists on the current `file_record` table (nullable for now).
    op.add_column("file_record", sa.Column("lobj_oid", sa.Integer(), nullable=True))
    # 2. Move content from external storage back into PostgreSQL large objects (table is still
    #    called `file_record` so application code continues to work during the copy).
    try:
        _migrate_files_to_postgres()
    except Exception:
        # Undo step 1 so a failed downgrade leaves the schema as it found it.
        print("Error during downgrade migration, rolling back …")
        op.drop_column("file_record", "lobj_oid")
        raise
    # 3. After migration every row should now have `lobj_oid` populated — mark NOT NULL.
    op.alter_column("file_record", "lobj_oid", nullable=False)
    # 4. Remove columns that are only relevant to external storage.
    op.drop_column("file_record", "updated_at")
    op.drop_column("file_record", "created_at")
    op.drop_column("file_record", "object_key")
    op.drop_column("file_record", "bucket_name")
    # 5. Rename `file_id` back to `file_name` (still on `file_record`).
    op.alter_column("file_record", "file_id", new_column_name="file_name")
    # 6. Finally, rename the table back to its original name expected by the legacy codebase.
    op.rename_table("file_record", "file_store")
    print(
        "Downgrade migration completed files are now stored inside PostgreSQL again."
    )
# -----------------------------------------------------------------------------
# Helper: migrate from external storage (S3/MinIO) back into PostgreSQL large objects
def _migrate_files_to_postgres() -> None:
    """Move any files whose content lives in external S3-compatible storage back into PostgreSQL.

    The logic is the *inverse* of `_migrate_files_to_external_storage` used on upgrade.
    Rows whose object is missing from external storage (NoSuchKey) are deleted
    rather than migrated. Statements are flushed but NOT committed here — the
    surrounding Alembic downgrade commits once everything succeeds, keeping the
    whole downgrade atomic.
    """
    # Obtain DB session from Alembic context
    bind = op.get_bind()
    session = Session(bind=bind)

    # Fetch rows that have external storage pointers (bucket/object_key not NULL).
    # Only file_id is needed downstream, so only file_id is selected.
    result = session.execute(
        text(
            "SELECT file_id FROM file_record "
            "WHERE bucket_name IS NOT NULL AND object_key IS NOT NULL"
        )
    )
    files_to_migrate = [row[0] for row in result.fetchall()]
    total_files = len(files_to_migrate)
    if total_files == 0:
        print("No files found in external storage to migrate back to PostgreSQL.")
        return
    print(f"Found {total_files} files to migrate back to PostgreSQL large objects.")

    # only create external store if we have files to migrate. This line
    # makes it so we need to have S3/MinIO configured to run this migration.
    external_store = get_s3_file_store(db_session=session)

    # Import lazily to avoid circular deps at Alembic runtime; hoisted out of the
    # loop so it executes once.
    from onyx.db._deprecated.pg_file_store import (
        create_populate_lobj,
    )  # noqa: E402

    migrated_count = 0
    deleted_count = 0
    for i, file_id in enumerate(files_to_migrate, 1):
        print(f"Migrating file {i}/{total_files}: {file_id}")
        try:
            # Read file content from external storage (always binary)
            file_io = external_store.read_file(
                file_id=file_id, mode="b", use_tempfile=True
            )
            file_io.seek(0)
            # Create new Postgres large object and populate it
            lobj_oid = create_populate_lobj(content=file_io, db_session=session)
            # Update DB row: set lobj_oid, clear bucket/object_key
            session.execute(
                text(
                    "UPDATE file_record SET lobj_oid = :lobj_oid, bucket_name = NULL, "
                    "object_key = NULL WHERE file_id = :file_id"
                ),
                {"lobj_oid": lobj_oid, "file_id": file_id},
            )
        except ClientError as e:
            if "NoSuchKey" not in str(e):
                raise
            # Object is gone from external storage — drop the orphaned row.
            # Previously this fell through and was counted/reported as a
            # successful migration; now it is tracked separately.
            print(
                f"File {file_id} not found in external storage. Deleting from database."
            )
            session.execute(
                text("DELETE FROM file_record WHERE file_id = :file_id"),
                {"file_id": file_id},
            )
            deleted_count += 1
            continue
        migrated_count += 1
        print(f"✓ Successfully migrated file {i}/{total_files}: {file_id}")

    # Flush the SQLAlchemy session so statements are sent to the DB, but **do not**
    # commit the transaction. The surrounding Alembic migration will commit once
    # the *entire* downgrade succeeds. This keeps the whole downgrade atomic and
    # avoids leaving the database in a partially-migrated state if a later schema
    # operation fails.
    session.flush()
    print(
        f"Migration back to PostgreSQL completed: {migrated_count} files staged for "
        f"commit, {deleted_count} orphaned rows deleted."
    )
def _migrate_files_to_external_storage() -> None:
    """Migrate files from PostgreSQL large objects to external storage.

    Selects every row that still has a large object (`lobj_oid`) and no external
    pointers, copies its content into S3/MinIO via `external_store.save_file`
    (which commits), then deletes the source large object. Statements are flushed
    but not committed at the end so the outer Alembic transaction controls
    atomicity.
    """
    # Get database session
    bind = op.get_bind()
    session = Session(bind=bind)
    external_store = get_s3_file_store(db_session=session)

    # Find all files currently stored in PostgreSQL (lobj_oid is not null)
    result = session.execute(
        text(
            "SELECT file_id FROM file_record WHERE lobj_oid IS NOT NULL "
            "AND bucket_name IS NULL AND object_key IS NULL"
        )
    )
    files_to_migrate = [row[0] for row in result.fetchall()]
    total_files = len(files_to_migrate)
    if total_files == 0:
        print("No files found in PostgreSQL storage to migrate.")
        return
    print(f"Found {total_files} files to migrate from PostgreSQL to external storage.")

    migrated_count = 0
    for i, file_id in enumerate(files_to_migrate, 1):
        print(f"Migrating file {i}/{total_files}: {file_id}")
        # Read file record to get metadata
        file_record = session.execute(
            text("SELECT * FROM file_record WHERE file_id = :file_id"),
            {"file_id": file_id},
        ).fetchone()
        if file_record is None:
            print(f"File {file_id} not found in PostgreSQL storage.")
            continue

        lobj_id = cast(int, file_record.lobj_oid)  # type: ignore

        # Read file content from PostgreSQL (binary; spooled to a tempfile so
        # very large objects don't need to fit in memory).
        file_content = read_lobj(
            lobj_id, db_session=session, mode="b", use_tempfile=True
        )

        # Normalize file_metadata to a plain dict (or None).
        # BUGFIX: the previous version reassigned `file_metadata = None`
        # unconditionally *after* reading it, which made the conversion branch
        # dead code and silently dropped metadata for every migrated file.
        raw_metadata = cast(Any, file_record.file_metadata)  # type: ignore
        file_metadata: dict | None = None
        if raw_metadata is not None:
            if isinstance(raw_metadata, dict):
                file_metadata = raw_metadata
            else:
                # Convert other types to dict if possible, otherwise None
                try:
                    file_metadata = dict(raw_metadata)
                except (TypeError, ValueError):
                    file_metadata = None

        # Save to external storage (this will handle the database record update and cleanup)
        # NOTE: this WILL .commit() the transaction.
        external_store.save_file(
            file_id=file_id,
            content=file_content,
            display_name=file_record.display_name,
            file_origin=file_record.file_origin,
            file_type=file_record.file_type,
            file_metadata=file_metadata,
        )
        # The large object is no longer referenced; free it.
        delete_lobj_by_id(lobj_id, db_session=session)

        migrated_count += 1
        print(f"✓ Successfully migrated file {i}/{total_files}: {file_id}")

    # Flush but do **not** commit so the outer Alembic transaction controls
    # atomicity.
    session.flush()
    print(
        f"Migration completed: {migrated_count} files staged for commit to external storage."
    )

View File

@@ -86,7 +86,6 @@ def export_query_history_task(
try:
stream.seek(0)
get_default_file_store(db_session).save_file(
file_name=report_name,
content=stream,
display_name=report_name,
file_origin=FileOrigin.QUERY_HISTORY_CSV,
@@ -96,6 +95,7 @@ def export_query_history_task(
"end": end.isoformat(),
"start_time": start_time.isoformat(),
},
file_id=report_name,
)
delete_task_with_id(

View File

@@ -115,11 +115,24 @@ def get_all_usage_reports(db_session: Session) -> list[UsageReportMetadata]:
def get_usage_report_data(
db_session: Session,
report_name: str,
report_display_name: str,
) -> IO:
"""
Get the usage report data from the file store.
Args:
db_session: The database session.
report_display_name: The display name of the usage report. Also assumes
that the file is stored with this as the ID in the file store.
Returns:
The usage report data.
"""
file_store = get_default_file_store(db_session)
# usage report may be very large, so don't load it all into memory
return file_store.read_file(file_name=report_name, mode="b", use_tempfile=True)
return file_store.read_file(
file_id=report_display_name, mode="b", use_tempfile=True
)
def write_usage_report(

View File

@@ -28,7 +28,7 @@ from onyx.auth.users import get_user_manager
from onyx.auth.users import UserManager
from onyx.db.engine import get_session
from onyx.db.models import User
from onyx.file_store.file_store import PostgresBackedFileStore
from onyx.file_store.file_store import get_default_file_store
from onyx.server.utils import BasicAuthenticationError
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
@@ -142,11 +142,12 @@ def put_logo(
def fetch_logo_helper(db_session: Session) -> Response:
try:
file_store = PostgresBackedFileStore(db_session)
file_store = get_default_file_store(db_session)
onyx_file = file_store.get_file_with_mime_type(get_logo_filename())
if not onyx_file:
raise ValueError("get_onyx_file returned None!")
except Exception:
logger.exception("Faield to fetch logo file")
raise HTTPException(
status_code=404,
detail="No logo file found",
@@ -157,7 +158,7 @@ def fetch_logo_helper(db_session: Session) -> Response:
def fetch_logotype_helper(db_session: Session) -> Response:
try:
file_store = PostgresBackedFileStore(db_session)
file_store = get_default_file_store(db_session)
onyx_file = file_store.get_file_with_mime_type(get_logotype_filename())
if not onyx_file:
raise ValueError("get_onyx_file returned None!")

View File

@@ -131,11 +131,11 @@ def upload_logo(
file_store = get_default_file_store(db_session)
file_store.save_file(
file_name=_LOGOTYPE_FILENAME if is_logotype else _LOGO_FILENAME,
content=content,
display_name=display_name,
file_origin=FileOrigin.OTHER,
file_type=file_type,
file_id=_LOGOTYPE_FILENAME if is_logotype else _LOGO_FILENAME,
)
return True

View File

@@ -39,9 +39,9 @@ from onyx.db.chat import get_chat_session_by_id
from onyx.db.chat import get_chat_sessions_by_user
from onyx.db.engine import get_session
from onyx.db.enums import TaskStatus
from onyx.db.file_record import get_query_history_export_files
from onyx.db.models import ChatSession
from onyx.db.models import User
from onyx.db.pg_file_store import get_query_history_export_files
from onyx.db.tasks import get_task_with_id
from onyx.db.tasks import register_task
from onyx.file_store.file_store import get_default_file_store
@@ -360,7 +360,7 @@ def get_query_history_export_status(
report_name = construct_query_history_report_name(request_id)
has_file = file_store.has_file(
file_name=report_name,
file_id=report_name,
file_origin=FileOrigin.QUERY_HISTORY_CSV,
file_type=FileType.CSV,
)
@@ -385,7 +385,7 @@ def download_query_history_csv(
report_name = construct_query_history_report_name(request_id)
file_store = get_default_file_store(db_session)
has_file = file_store.has_file(
file_name=report_name,
file_id=report_name,
file_origin=FileOrigin.QUERY_HISTORY_CSV,
file_type=FileType.CSV,
)

View File

@@ -12,7 +12,7 @@ from onyx.configs.constants import SessionType
from onyx.db.enums import TaskStatus
from onyx.db.models import ChatMessage
from onyx.db.models import ChatSession
from onyx.db.models import PGFileStore
from onyx.db.models import FileRecord
from onyx.db.models import TaskQueueState
@@ -254,7 +254,7 @@ class QueryHistoryExport(BaseModel):
@classmethod
def from_file(
cls,
file: PGFileStore,
file: FileRecord,
) -> "QueryHistoryExport":
if not file.file_metadata or not isinstance(file.file_metadata, dict):
raise RuntimeError(
@@ -262,7 +262,7 @@ class QueryHistoryExport(BaseModel):
)
metadata = QueryHistoryFileMetadata.model_validate(dict(file.file_metadata))
task_id = extract_task_id_from_query_history_report_name(file.file_name)
task_id = extract_task_id_from_query_history_report_name(file.file_id)
return cls(
task_id=task_id,

View File

@@ -62,17 +62,16 @@ def generate_chat_messages_report(
]
)
# after writing seek to begining of buffer
# after writing seek to beginning of buffer
temp_file.seek(0)
file_store.save_file(
file_name=file_name,
file_id = file_store.save_file(
content=temp_file,
display_name=file_name,
file_origin=FileOrigin.OTHER,
file_type="text/csv",
)
return file_name
return file_id
def generate_user_report(
@@ -97,15 +96,14 @@ def generate_user_report(
csvwriter.writerow([user_skeleton.user_id, user_skeleton.is_active])
temp_file.seek(0)
file_store.save_file(
file_name=file_name,
file_id = file_store.save_file(
content=temp_file,
display_name=file_name,
file_origin=FileOrigin.OTHER,
file_type="text/csv",
)
return file_name
return file_id
def create_new_usage_report(
@@ -116,16 +114,16 @@ def create_new_usage_report(
report_id = str(uuid.uuid4())
file_store = get_default_file_store(db_session)
messages_filename = generate_chat_messages_report(
messages_file_id = generate_chat_messages_report(
db_session, file_store, report_id, period
)
users_filename = generate_user_report(db_session, file_store, report_id)
users_file_id = generate_user_report(db_session, file_store, report_id)
with tempfile.SpooledTemporaryFile(max_size=MAX_IN_MEMORY_SIZE) as zip_buffer:
with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED) as zip_file:
# write messages
chat_messages_tmpfile = file_store.read_file(
messages_filename, mode="b", use_tempfile=True
messages_file_id, mode="b", use_tempfile=True
)
zip_file.writestr(
"chat_messages.csv",
@@ -134,7 +132,7 @@ def create_new_usage_report(
# write users
users_tmpfile = file_store.read_file(
users_filename, mode="b", use_tempfile=True
users_file_id, mode="b", use_tempfile=True
)
zip_file.writestr("users.csv", users_tmpfile.read())
@@ -146,11 +144,11 @@ def create_new_usage_report(
f"_{report_id}_usage_report.zip"
)
file_store.save_file(
file_name=report_name,
content=zip_buffer,
display_name=report_name,
file_origin=FileOrigin.GENERATED_REPORT,
file_type="application/zip",
file_id=report_name,
)
# add report after zip file is written

View File

@@ -35,11 +35,11 @@ def save_checkpoint(
file_store = get_default_file_store(db_session)
file_store.save_file(
file_name=checkpoint_pointer,
content=BytesIO(checkpoint.model_dump_json().encode()),
display_name=checkpoint_pointer,
file_origin=FileOrigin.INDEXING_CHECKPOINT,
file_type="application/json",
file_id=checkpoint_pointer,
)
index_attempt = get_index_attempt(db_session, index_attempt_id)

View File

@@ -781,3 +781,16 @@ DB_READONLY_USER: str = os.environ.get("DB_READONLY_USER", "db_readonly_user")
DB_READONLY_PASSWORD: str = urllib.parse.quote_plus(
os.environ.get("DB_READONLY_PASSWORD") or "password"
)
# File Store Configuration
S3_FILE_STORE_BUCKET_NAME = (
os.environ.get("S3_FILE_STORE_BUCKET_NAME") or "onyx-file-store-bucket"
)
S3_FILE_STORE_PREFIX = os.environ.get("S3_FILE_STORE_PREFIX") or "onyx-files"
# S3_ENDPOINT_URL is for MinIO and other S3-compatible storage. Leave blank for AWS S3.
S3_ENDPOINT_URL = os.environ.get("S3_ENDPOINT_URL")
S3_VERIFY_SSL = os.environ.get("S3_VERIFY_SSL", "").lower() == "true"
# S3/MinIO Access Keys
S3_AWS_ACCESS_KEY_ID = os.environ.get("S3_AWS_ACCESS_KEY_ID")
S3_AWS_SECRET_ACCESS_KEY = os.environ.get("S3_AWS_SECRET_ACCESS_KEY")

View File

@@ -281,7 +281,7 @@ class BlobStorageConnector(LoadConnector, PollConnector):
image_section, _ = store_image_and_create_section(
db_session=db_session,
image_data=downloaded_file,
file_name=f"{self.bucket_type}_{self.bucket_name}_{key.replace('/', '_')}",
file_id=f"{self.bucket_type}_{self.bucket_name}_{key.replace('/', '_')}",
display_name=file_name,
link=link,
file_origin=FileOrigin.CONNECTOR,
@@ -449,10 +449,8 @@ if __name__ == "__main__":
print(f" - Link: {section.link}")
if isinstance(section, TextSection) and section.text is not None:
print(f" - Text: {section.text[:100]}...")
elif (
hasattr(section, "image_file_name") and section.image_file_name
):
print(f" - Image: {section.image_file_name}")
elif hasattr(section, "image_file_id") and section.image_file_id:
print(f" - Image: {section.image_file_id}")
else:
print("Error: Unknown section type")
print("---")

View File

@@ -325,7 +325,7 @@ class ConfluenceConnector(
# Create an ImageSection for image attachments
image_section = ImageSection(
link=f"{page_url}#attachment-{attachment['id']}",
image_file_name=result.file_name,
image_file_id=result.file_name,
)
sections.append(image_section)
else:
@@ -440,7 +440,7 @@ class ConfluenceConnector(
doc.sections.append(
ImageSection(
link=object_url,
image_file_name=file_storage_name,
image_file_id=file_storage_name,
)
)
except Exception as e:

View File

@@ -1,4 +1,3 @@
import io
import math
import time
from collections.abc import Callable
@@ -18,30 +17,23 @@ from urllib.parse import urlparse
import requests
from pydantic import BaseModel
from sqlalchemy.orm import Session
from onyx.configs.app_configs import (
CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD,
)
from onyx.configs.app_configs import CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD
from onyx.configs.constants import FileOrigin
from onyx.db.engine import get_session_with_current_tenant
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_processing.extract_file_text import is_accepted_file_ext
from onyx.file_processing.extract_file_text import OnyxExtensionType
from onyx.file_processing.file_validation import is_valid_image_type
from onyx.file_processing.image_utils import store_image_and_create_section
from onyx.utils.logger import setup_logger
if TYPE_CHECKING:
from onyx.connectors.confluence.onyx_confluence import OnyxConfluence
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.models import PGFileStore
from onyx.db.pg_file_store import create_populate_lobj
from onyx.db.pg_file_store import save_bytes_to_pgfilestore
from onyx.db.pg_file_store import upsert_pgfilestore
from onyx.file_processing.extract_file_text import (
OnyxExtensionType,
extract_file_text,
is_accepted_file_ext,
)
from onyx.file_processing.file_validation import is_valid_image_type
from onyx.file_processing.image_utils import store_image_and_create_section
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -80,7 +72,7 @@ class AttachmentProcessingResult(BaseModel):
"""
A container for results after processing a Confluence attachment.
'text' is the textual content of the attachment.
'file_name' is the final file name used in PGFileStore to store the content.
'file_name' is the final file name used in FileStore to store the content.
'error' holds an exception or string if something failed.
"""
@@ -236,7 +228,7 @@ def _process_image_attachment(
section, file_name = store_image_and_create_section(
db_session=db_session,
image_data=raw_bytes,
file_name=Path(attachment["id"]).name,
file_id=Path(attachment["id"]).name,
display_name=attachment["title"],
media_type=media_type,
file_origin=FileOrigin.CONNECTOR,
@@ -251,59 +243,6 @@ def _process_image_attachment(
return AttachmentProcessingResult(text=None, file_name=None, error=msg)
def _process_text_attachment(
attachment: dict[str, Any],
raw_bytes: bytes,
media_type: str,
) -> AttachmentProcessingResult:
"""Process a text-based attachment by extracting its content."""
try:
extracted_text = extract_file_text(
io.BytesIO(raw_bytes),
file_name=attachment["title"],
break_on_unprocessable=False,
)
except Exception as e:
msg = f"Failed to extract text for '{attachment['title']}': {e}"
logger.error(msg, exc_info=e)
return AttachmentProcessingResult(text=None, file_name=None, error=msg)
# Check length constraints
if extracted_text is None or len(extracted_text) == 0:
msg = f"No text extracted for {attachment['title']}"
logger.warning(msg)
return AttachmentProcessingResult(text=None, file_name=None, error=msg)
if len(extracted_text) > CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD:
msg = (
f"Skipping attachment {attachment['title']} due to char count "
f"({len(extracted_text)} > {CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD})"
)
logger.warning(msg)
return AttachmentProcessingResult(text=None, file_name=None, error=msg)
# Save the attachment
try:
with get_session_with_current_tenant() as db_session:
saved_record = save_bytes_to_pgfilestore(
db_session=db_session,
raw_bytes=raw_bytes,
media_type=media_type,
identifier=attachment["id"],
display_name=attachment["title"],
)
except Exception as e:
msg = f"Failed to save attachment '{attachment['title']}' to PG: {e}"
logger.error(msg, exc_info=e)
return AttachmentProcessingResult(
text=extracted_text, file_name=None, error=msg
)
return AttachmentProcessingResult(
text=extracted_text, file_name=saved_record.file_name, error=None
)
def convert_attachment_to_content(
confluence_client: "OnyxConfluence",
attachment: dict[str, Any],
@@ -540,41 +479,3 @@ def update_param_in_path(path: str, param: str, value: str) -> str:
+ "?"
+ "&".join(f"{k}={quote(v[0])}" for k, v in query_params.items())
)
def attachment_to_file_record(
confluence_client: "OnyxConfluence",
attachment: dict[str, Any],
db_session: Session,
) -> tuple[PGFileStore, bytes]:
"""Save an attachment to the file store and return the file record."""
download_link = _attachment_to_download_link(confluence_client, attachment)
image_data = confluence_client.get(
download_link, absolute=True, not_json_response=True
)
file_type = attachment.get("metadata", {}).get(
"mediaType", "application/octet-stream"
)
# Save image to file store
file_name = f"confluence_attachment_{attachment['id']}"
lobj_oid = create_populate_lobj(BytesIO(image_data), db_session)
pgfilestore = upsert_pgfilestore(
file_name=file_name,
display_name=attachment["title"],
file_origin=FileOrigin.OTHER,
file_type=file_type,
lobj_oid=lobj_oid,
db_session=db_session,
commit=True,
)
return pgfilestore, image_data
def _attachment_to_download_link(
confluence_client: "OnyxConfluence", attachment: dict[str, Any]
) -> str:
"""Extracts the download link to images."""
return confluence_client.url + attachment["_links"]["download"]

View File

@@ -18,7 +18,6 @@ from onyx.connectors.models import Document
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TextSection
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.pg_file_store import get_pgfilestore_by_file_name
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.extract_file_text import get_file_ext
from onyx.file_processing.extract_file_text import is_accepted_file_ext
@@ -30,24 +29,6 @@ from onyx.utils.logger import setup_logger
logger = setup_logger()
def _read_file_from_filestore(
file_name: str,
db_session: Session,
) -> IO | None:
"""
Gets the content of a file from Postgres.
"""
extension = get_file_ext(file_name)
# Read file from Postgres store
file_content = get_default_file_store(db_session).read_file(file_name, mode="b")
if is_accepted_file_ext(extension, OnyxExtensionType.All):
return file_content
logger.warning(f"Skipping file '{file_name}' with extension '{extension}'")
return None
def _create_image_section(
image_data: bytes,
db_session: Session,
@@ -58,7 +39,7 @@ def _create_image_section(
) -> tuple[ImageSection, str | None]:
"""
Creates an ImageSection for an image file or embedded image.
Stores the image in PGFileStore but does not generate a summary.
Stores the image in FileStore but does not generate a summary.
Args:
image_data: Raw image bytes
@@ -71,14 +52,14 @@ def _create_image_section(
Tuple of (ImageSection, stored_file_name or None)
"""
# Create a unique identifier for the image
file_name = f"{parent_file_name}_embedded_{idx}" if idx > 0 else parent_file_name
file_id = f"{parent_file_name}_embedded_{idx}" if idx > 0 else parent_file_name
# Store the image and create a section
try:
section, stored_file_name = store_image_and_create_section(
db_session=db_session,
image_data=image_data,
file_name=file_name,
file_id=file_id,
display_name=display_name,
link=link,
file_origin=FileOrigin.CONNECTOR,
@@ -90,6 +71,7 @@ def _create_image_section(
def _process_file(
file_id: str,
file_name: str,
file: IO[Any],
metadata: dict[str, Any] | None,
@@ -107,12 +89,6 @@ def _process_file(
# Get file extension and determine file type
extension = get_file_ext(file_name)
# Fetch the DB record so we know the ID for internal URL
pg_record = get_pgfilestore_by_file_name(file_name=file_name, db_session=db_session)
if not pg_record:
logger.warning(f"No file record found for '{file_name}' in PG; skipping.")
return []
if not is_accepted_file_ext(extension, OnyxExtensionType.All):
logger.warning(
f"Skipping file '{file_name}' with unrecognized extension '{extension}'"
@@ -151,7 +127,6 @@ def _process_file(
for k, v in metadata.items()
if k
not in [
"document_id",
"time_updated",
"doc_updated_at",
"link",
@@ -171,7 +146,7 @@ def _process_file(
DocumentSource(source_type_str) if source_type_str else DocumentSource.FILE
)
doc_id = metadata.get("document_id") or f"FILE_CONNECTOR__{file_name}"
doc_id = f"FILE_CONNECTOR__{file_id}"
title = metadata.get("title") or file_display_name
# 1) If the file itself is an image, handle that scenario quickly
@@ -187,7 +162,7 @@ def _process_file(
section, _ = _create_image_section(
image_data=image_data,
db_session=db_session,
parent_file_name=pg_record.file_name,
parent_file_name=file_id,
display_name=title,
)
@@ -234,21 +209,25 @@ def _process_file(
TextSection(link=link_in_meta, text=extraction_result.text_content.strip())
)
# Then any extracted images from docx, etc.
# Then any extracted images from docx, PDFs, etc.
for idx, (img_data, img_name) in enumerate(
extraction_result.embedded_images, start=1
):
# Store each embedded image as a separate file in PGFileStore
# Store each embedded image as a separate file in FileStore
# and create a section with the image reference
try:
image_section, _ = _create_image_section(
image_section, stored_file_name = _create_image_section(
image_data=img_data,
db_session=db_session,
parent_file_name=pg_record.file_name,
parent_file_name=file_id,
display_name=f"{title} - image {idx}",
idx=idx,
)
sections.append(image_section)
logger.debug(
f"Created ImageSection for embedded image {idx} "
f"in {file_name}, stored as: {stored_file_name}"
)
except Exception as e:
logger.warning(
f"Failed to process embedded image {idx} in {file_name}: {e}"
@@ -304,23 +283,27 @@ class LocalFileConnector(LoadConnector):
documents: list[Document] = []
with get_session_with_current_tenant() as db_session:
for file_path in self.file_locations:
for file_id in self.file_locations:
current_datetime = datetime.now(timezone.utc)
file_io = _read_file_from_filestore(
file_name=file_path,
db_session=db_session,
)
if not file_io:
file_store = get_default_file_store(db_session)
file_record = file_store.read_file_record(file_id=file_id)
if not file_record:
# typically an unsupported extension
logger.warning(
f"No file record found for '{file_id}' in PG; skipping."
)
continue
metadata = self._get_file_metadata(file_path)
file_io = file_store.read_file(file_id=file_id, mode="b")
metadata = self._get_file_metadata(file_record.display_name)
metadata["time_updated"] = metadata.get(
"time_updated", current_datetime
)
new_docs = _process_file(
file_name=file_path,
file_id=file_id,
file_name=file_record.display_name,
file=file_io,
metadata=metadata,
pdf_pass=self.pdf_pass,

View File

@@ -187,7 +187,7 @@ def _download_and_extract_sections_basic(
section, embedded_id = store_image_and_create_section(
db_session=db_session,
image_data=response_call(),
file_name=file_id,
file_id=file_id,
display_name=file_name,
media_type=mime_type,
file_origin=FileOrigin.CONNECTOR,
@@ -211,7 +211,7 @@ def _download_and_extract_sections_basic(
section, embedded_id = store_image_and_create_section(
db_session=db_session,
image_data=img_data,
file_name=f"{file_id}_img_{idx}",
file_id=f"{file_id}_img_{idx}",
display_name=img_name or f"{file_name} - image {idx}",
file_origin=FileOrigin.CONNECTOR,
)

View File

@@ -34,7 +34,7 @@ class Section(BaseModel):
link: str | None = None
text: str | None = None
image_file_name: str | None = None
image_file_id: str | None = None
class TextSection(Section):
@@ -49,10 +49,10 @@ class TextSection(Section):
class ImageSection(Section):
"""Section containing an image reference"""
image_file_name: str
image_file_id: str
def __sizeof__(self) -> int:
return sys.getsizeof(self.image_file_name) + sys.getsizeof(self.link)
return sys.getsizeof(self.image_file_id) + sys.getsizeof(self.link)
class BasicExpertInfo(BaseModel):

View File

@@ -56,7 +56,7 @@ def update_image_sections_with_query(
chunks_with_images = []
for section in sections:
for chunk in section.chunks:
if chunk.image_file_name:
if chunk.image_file_id:
chunks_with_images.append(chunk)
if not chunks_with_images:
@@ -68,19 +68,19 @@ def update_image_sections_with_query(
def process_image_chunk(chunk: InferenceChunk) -> tuple[str, str]:
try:
logger.debug(
f"Processing image chunk with ID: {chunk.unique_id}, image: {chunk.image_file_name}"
f"Processing image chunk with ID: {chunk.unique_id}, image: {chunk.image_file_id}"
)
with get_session_with_current_tenant() as db_session:
file_record = get_default_file_store(db_session).read_file(
cast(str, chunk.image_file_name), mode="b"
cast(str, chunk.image_file_id), mode="b"
)
if not file_record:
logger.error(f"Image file not found: {chunk.image_file_name}")
logger.error(f"Image file not found: {chunk.image_file_id}")
raise Exception("File not found")
file_content = file_record.read()
image_base64 = base64.b64encode(file_content).decode()
logger.debug(
f"Successfully loaded image data for {chunk.image_file_name}"
f"Successfully loaded image data for {chunk.image_file_id}"
)
messages: list[BaseMessage] = [
@@ -114,7 +114,7 @@ def update_image_sections_with_query(
except Exception:
logger.exception(
f"Error updating image section with query source image url: {chunk.image_file_name}"
f"Error updating image section with query source image url: {chunk.image_file_id}"
)
return chunk.unique_id, "Error analyzing image."

View File

@@ -0,0 +1,142 @@
"""Kept around since it's used in the migration to move to S3/MinIO"""
import tempfile
from io import BytesIO
from typing import IO
from psycopg2.extensions import connection
from sqlalchemy import text # NEW: for SQL large-object helpers
from sqlalchemy.orm import Session
from onyx.file_store.constants import MAX_IN_MEMORY_SIZE
from onyx.file_store.constants import STANDARD_CHUNK_SIZE
from onyx.utils.logger import setup_logger
logger = setup_logger()
def get_pg_conn_from_session(db_session: Session) -> connection:
    # Unwrap the raw driver connection from the SQLAlchemy session:
    # Session -> SQLAlchemy Connection -> DBAPI/pool wrapper -> psycopg2 connection.
    # NOTE(review): for non-psycopg2 drivers (e.g. asyncpg adapters) the object
    # returned here may not be a real psycopg2 ``connection`` — callers probe
    # for ``lobject`` support before using it.
    return db_session.connection().connection.connection  # type: ignore
def create_populate_lobj(
    content: IO,
    db_session: Session,
) -> int:
    """Create a PostgreSQL large object from *content* and return its OID.

    Preferred approach is the psycopg2 ``lobject`` API; if the underlying
    driver does not expose it (e.g. when the connection is an asyncpg
    adapter) we fall back to the server-side ``lo_from_bytea`` helper, which
    requires reading the full payload into memory (acceptable for the limited
    number and size of files handled during migrations).

    NOTE: this function intentionally *does not* commit — the surrounding
    transaction is handled by the caller so all work stays atomic.
    """
    # Decide on the code path up front instead of wrapping the whole write
    # loop in ``except AttributeError``: an unrelated AttributeError raised
    # while streaming chunks must propagate, not silently reroute to the
    # in-memory fallback. Unwrapping the connection itself may still raise
    # AttributeError for exotic drivers, so only that probe is guarded.
    try:
        pg_conn = get_pg_conn_from_session(db_session)
        has_lobject = hasattr(pg_conn, "lobject")
    except AttributeError:
        has_lobject = False

    if has_lobject:
        large_object = pg_conn.lobject()
        # write in multiple chunks to avoid loading the whole file into memory
        while True:
            chunk = content.read(STANDARD_CHUNK_SIZE)
            if not chunk:
                break
            large_object.write(chunk)
        large_object.close()
        return large_object.oid

    # Fallback: ``lo_from_bytea(0, data)`` creates a new large object on the
    # server and returns its OID. This reads the full content into memory.
    byte_data = content.read()
    result = db_session.execute(
        text("SELECT lo_from_bytea(0, :data) AS oid"),
        {"data": byte_data},
    )
    # ``scalar`` (rather than 2.0-only ``scalar_one``) works on both 1.4/2.0.
    lobj_oid = result.scalar()
    if lobj_oid is None:
        raise RuntimeError("Failed to create large object")
    return int(lobj_oid)
def read_lobj(
    lobj_oid: int,
    db_session: Session,
    mode: str | None = None,
    use_tempfile: bool = False,
) -> IO:
    """Read a PostgreSQL large object identified by *lobj_oid*.

    Uses the native psycopg2 ``lobject`` API when available; otherwise falls
    back to the server-side ``lo_get`` helper, which returns the large
    object's contents as *bytea*.

    Parameters:
    - lobj_oid: OID of the large object to read
    - mode: open mode for the large object (defaults to binary, ``"rb"``)
    - use_tempfile: spool the contents through a temporary file instead of
      holding everything in a single in-memory buffer
    """
    # Ensure binary mode by default.
    if mode is None:
        mode = "rb"

    # Probe for ``lobject`` support explicitly rather than catching
    # AttributeError around the whole read: an unrelated AttributeError from
    # the chunked copy loop must not be misinterpreted as "driver lacks
    # lobject support".
    try:
        pg_conn = get_pg_conn_from_session(db_session)
        has_lobject = hasattr(pg_conn, "lobject")
    except AttributeError:
        has_lobject = False

    if has_lobject:
        # ``mode`` is always set at this point, so the former
        # ``... if mode else ...`` branch was dead code.
        large_object = pg_conn.lobject(lobj_oid, mode=mode)
        if use_tempfile:
            temp_file = tempfile.SpooledTemporaryFile(max_size=MAX_IN_MEMORY_SIZE)
            while True:
                chunk = large_object.read(STANDARD_CHUNK_SIZE)
                if not chunk:
                    break
                temp_file.write(chunk)
            temp_file.seek(0)
            return temp_file
        # Raw bytes, no text decoding.
        return BytesIO(large_object.read())

    # Fallback path using ``lo_get`` (reads the whole object server-side).
    result = db_session.execute(
        text("SELECT lo_get(:oid) AS data"),
        {"oid": lobj_oid},
    )
    byte_data = result.scalar()
    if byte_data is None:
        raise RuntimeError("Failed to read large object")
    if use_tempfile:
        temp_file = tempfile.SpooledTemporaryFile(max_size=MAX_IN_MEMORY_SIZE)
        temp_file.write(byte_data)
        temp_file.seek(0)
        return temp_file
    return BytesIO(byte_data)
def delete_lobj_by_id(
    lobj_oid: int,
    db_session: Session,
) -> None:
    """Remove a large object by OID, regardless of driver implementation.

    NOTE: does not commit; the caller owns the transaction.
    """
    # Probe for ``lobject`` support instead of using ``raise AttributeError``
    # as a goto: with the old structure, an AttributeError raised by
    # ``unlink()`` itself would have incorrectly fallen through to the SQL
    # fallback and attempted a second delete.
    try:
        pg_conn = get_pg_conn_from_session(db_session)
        has_lobject = hasattr(pg_conn, "lobject")
    except AttributeError:
        has_lobject = False

    if has_lobject:
        pg_conn.lobject(lobj_oid).unlink()
        return

    # Fallback for drivers without ``lobject`` support; ``lo_unlink`` deletes
    # the large object server-side. No explicit result expected.
    db_session.execute(text("SELECT lo_unlink(:oid)"), {"oid": lobj_oid})

View File

@@ -47,7 +47,7 @@ from onyx.db.models import ToolCall
from onyx.db.models import User
from onyx.db.models import UserFile
from onyx.db.persona import get_best_persona_id_for_user
from onyx.db.pg_file_store import delete_lobj_by_name
from onyx.file_store.file_store import get_default_file_store
from onyx.file_store.models import FileDescriptor
from onyx.file_store.models import InMemoryChatFile
from onyx.llm.override_models import LLMOverride
@@ -228,11 +228,10 @@ def delete_messages_and_files_from_chat_session(
for id, files in messages_with_files:
delete_tool_call_for_message_id(message_id=id, db_session=db_session)
delete_search_doc_message_relationship(message_id=id, db_session=db_session)
for file_info in files or {}:
lobj_name = file_info.get("id")
if lobj_name:
logger.info(f"Deleting file with name: {lobj_name}")
delete_lobj_by_name(lobj_name, db_session)
file_store = get_default_file_store(db_session)
for file_info in files or []:
file_store.delete_file(file_id=file_info.get("id"))
# Delete ChatMessage records - CASCADE constraints will automatically handle:
# - AgentSubQuery records (via AgentSubQuestion)

View File

@@ -0,0 +1,85 @@
from sqlalchemy import and_
from sqlalchemy import select
from sqlalchemy.orm import Session
from onyx.background.task_utils import QUERY_REPORT_NAME_PREFIX
from onyx.configs.constants import FileOrigin
from onyx.configs.constants import FileType
from onyx.db.models import FileRecord
def get_query_history_export_files(
    db_session: Session,
) -> list[FileRecord]:
    """Return every file record for an exported query-history CSV report."""
    # A query-history export is identified by all three of: the report-name
    # prefix on its ID, the CSV file type, and the query-history origin.
    export_filter = and_(
        FileRecord.file_id.like(f"{QUERY_REPORT_NAME_PREFIX}-%"),
        FileRecord.file_type == FileType.CSV,
        FileRecord.file_origin == FileOrigin.QUERY_HISTORY_CSV,
    )
    stmt = select(FileRecord).where(export_filter)
    return list(db_session.scalars(stmt))
def get_filerecord_by_file_id_optional(
    file_id: str,
    db_session: Session,
) -> FileRecord | None:
    """Look up a file record by its ID, returning ``None`` when absent."""
    lookup = db_session.query(FileRecord).filter_by(file_id=file_id)
    return lookup.first()
def get_filerecord_by_file_id(
    file_id: str,
    db_session: Session,
) -> FileRecord:
    """Fetch a file record by its ID.

    Raises:
        RuntimeError: if no record with *file_id* exists (or it was deleted).
    """
    # Reuse the optional lookup so both accessors share a single query
    # definition instead of duplicating it.
    filestore = get_filerecord_by_file_id_optional(file_id, db_session)
    if not filestore:
        raise RuntimeError(f"File by id {file_id} does not exist or was deleted")
    return filestore
def delete_filerecord_by_file_id(
    file_id: str,
    db_session: Session,
) -> None:
    """Delete the DB row for *file_id* (no-op if no such record exists).

    NOTE: does not commit — the caller owns the transaction. Only the
    metadata row is removed here; this function makes no call to external
    storage, so the backing object (S3/MinIO) is left untouched.
    """
    db_session.query(FileRecord).filter_by(file_id=file_id).delete()
def upsert_filerecord(
    file_id: str,
    display_name: str,
    file_origin: FileOrigin,
    file_type: str,
    bucket_name: str,
    object_key: str,
    db_session: Session,
    file_metadata: dict | None = None,
) -> FileRecord:
    """Create or update a file store record for external storage (S3, MinIO, etc.)"""
    existing = db_session.query(FileRecord).filter_by(file_id=file_id).first()

    if existing is None:
        # No record yet - insert a fresh one.
        record = FileRecord(
            file_id=file_id,
            display_name=display_name,
            file_origin=file_origin,
            file_type=file_type,
            file_metadata=file_metadata,
            bucket_name=bucket_name,
            object_key=object_key,
        )
        db_session.add(record)
        return record

    # Record exists - overwrite its mutable fields in place.
    existing.display_name = display_name
    existing.file_origin = file_origin
    existing.file_type = file_type
    existing.file_metadata = file_metadata
    existing.bucket_name = bucket_name
    existing.object_key = object_key
    return existing

View File

@@ -2689,15 +2689,28 @@ class KVStore(Base):
encrypted_value: Mapped[JSON_ro] = mapped_column(EncryptedJson(), nullable=True)
class PGFileStore(Base):
__tablename__ = "file_store"
class FileRecord(Base):
__tablename__ = "file_record"
# Internal file ID, must be unique across all files.
file_id: Mapped[str] = mapped_column(String, primary_key=True)
file_name: Mapped[str] = mapped_column(String, primary_key=True)
display_name: Mapped[str] = mapped_column(String, nullable=True)
file_origin: Mapped[FileOrigin] = mapped_column(Enum(FileOrigin, native_enum=False))
file_type: Mapped[str] = mapped_column(String, default="text/plain")
file_metadata: Mapped[JSON_ro] = mapped_column(postgresql.JSONB(), nullable=True)
lobj_oid: Mapped[int] = mapped_column(Integer, nullable=False)
# External storage support (S3, MinIO, Azure Blob, etc.)
bucket_name: Mapped[str] = mapped_column(String)
object_key: Mapped[str] = mapped_column(String)
# Timestamps for external storage
created_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
class AgentSearchMetrics(Base):
@@ -3022,13 +3035,13 @@ class PublicExternalUserGroup(Base):
class UsageReport(Base):
"""This stores metadata about usage reports generated by admin including user who generated
them as well as the period they cover. The actual zip file of the report is stored as a lo
using the PGFileStore
using the FileRecord
"""
__tablename__ = "usage_reports"
id: Mapped[int] = mapped_column(primary_key=True)
report_name: Mapped[str] = mapped_column(ForeignKey("file_store.file_name"))
report_name: Mapped[str] = mapped_column(ForeignKey("file_record.file_id"))
# if None, report was auto-generated
requestor_user_id: Mapped[UUID | None] = mapped_column(
@@ -3043,7 +3056,7 @@ class UsageReport(Base):
period_to: Mapped[datetime.datetime | None] = mapped_column(DateTime(timezone=True))
requestor = relationship("User")
file = relationship("PGFileStore")
file = relationship("FileRecord")
class InputPrompt(Base):

View File

@@ -1,206 +0,0 @@
import tempfile
from io import BytesIO
from typing import IO
from psycopg2.extensions import connection
from sqlalchemy.orm import Session
from sqlalchemy.sql import and_
from sqlalchemy.sql import select
from onyx.background.task_utils import QUERY_REPORT_NAME_PREFIX
from onyx.configs.constants import FileOrigin
from onyx.configs.constants import FileType
from onyx.db.models import PGFileStore
from onyx.file_store.constants import MAX_IN_MEMORY_SIZE
from onyx.file_store.constants import STANDARD_CHUNK_SIZE
from onyx.utils.logger import setup_logger
logger = setup_logger()
def get_pg_conn_from_session(db_session: Session) -> connection:
return db_session.connection().connection.connection # type: ignore
def get_pgfilestore_by_file_name_optional(
file_name: str,
db_session: Session,
) -> PGFileStore | None:
return db_session.query(PGFileStore).filter_by(file_name=file_name).first()
def get_pgfilestore_by_file_name(
file_name: str,
db_session: Session,
) -> PGFileStore:
pgfilestore = db_session.query(PGFileStore).filter_by(file_name=file_name).first()
if not pgfilestore:
raise RuntimeError(f"File by name {file_name} does not exist or was deleted")
return pgfilestore
def delete_pgfilestore_by_file_name(
file_name: str,
db_session: Session,
) -> None:
db_session.query(PGFileStore).filter_by(file_name=file_name).delete()
def create_populate_lobj(
content: IO,
db_session: Session,
) -> int:
"""Note, this does not commit the changes to the DB
This is because the commit should happen with the PGFileStore row creation
That step finalizes both the Large Object and the table tracking it
"""
pg_conn = get_pg_conn_from_session(db_session)
large_object = pg_conn.lobject()
# write in multiple chunks to avoid loading the whole file into memory
while True:
chunk = content.read(STANDARD_CHUNK_SIZE)
if not chunk:
break
large_object.write(chunk)
large_object.close()
return large_object.oid
def read_lobj(
lobj_oid: int,
db_session: Session,
mode: str | None = None,
use_tempfile: bool = False,
) -> IO:
pg_conn = get_pg_conn_from_session(db_session)
# Ensure we're using binary mode by default for large objects
if mode is None:
mode = "rb"
large_object = (
pg_conn.lobject(lobj_oid, mode=mode) if mode else pg_conn.lobject(lobj_oid)
)
if use_tempfile:
temp_file = tempfile.SpooledTemporaryFile(max_size=MAX_IN_MEMORY_SIZE)
while True:
chunk = large_object.read(STANDARD_CHUNK_SIZE)
if not chunk:
break
temp_file.write(chunk)
temp_file.seek(0)
return temp_file
else:
# Ensure we're getting raw bytes without text decoding
return BytesIO(large_object.read())
def delete_lobj_by_id(
lobj_oid: int,
db_session: Session,
) -> None:
pg_conn = get_pg_conn_from_session(db_session)
pg_conn.lobject(lobj_oid).unlink()
def delete_lobj_by_name(
lobj_name: str,
db_session: Session,
) -> None:
try:
pgfilestore = get_pgfilestore_by_file_name(lobj_name, db_session)
except RuntimeError:
logger.info(f"no file with name {lobj_name} found")
return
pg_conn = get_pg_conn_from_session(db_session)
pg_conn.lobject(pgfilestore.lobj_oid).unlink()
delete_pgfilestore_by_file_name(lobj_name, db_session)
db_session.commit()
def upsert_pgfilestore(
file_name: str,
display_name: str | None,
file_origin: FileOrigin,
file_type: str,
lobj_oid: int,
db_session: Session,
commit: bool = False,
file_metadata: dict | None = None,
) -> PGFileStore:
pgfilestore = db_session.query(PGFileStore).filter_by(file_name=file_name).first()
if pgfilestore:
try:
# This should not happen in normal execution
delete_lobj_by_id(lobj_oid=pgfilestore.lobj_oid, db_session=db_session)
except Exception:
# If the delete fails as well, the large object doesn't exist anyway and even if it
# fails to delete, it's not too terrible as most files sizes are insignificant
logger.error(
f"Failed to delete large object with oid {pgfilestore.lobj_oid}"
)
pgfilestore.lobj_oid = lobj_oid
else:
pgfilestore = PGFileStore(
file_name=file_name,
display_name=display_name,
file_origin=file_origin,
file_type=file_type,
file_metadata=file_metadata,
lobj_oid=lobj_oid,
)
db_session.add(pgfilestore)
if commit:
db_session.commit()
return pgfilestore
def save_bytes_to_pgfilestore(
db_session: Session,
raw_bytes: bytes,
media_type: str,
identifier: str,
display_name: str,
file_origin: FileOrigin = FileOrigin.OTHER,
) -> PGFileStore:
"""
Saves raw bytes to PGFileStore and returns the resulting record.
"""
file_name = f"{file_origin.name.lower()}_{identifier}"
lobj_oid = create_populate_lobj(BytesIO(raw_bytes), db_session)
pgfilestore = upsert_pgfilestore(
file_name=file_name,
display_name=display_name,
file_origin=file_origin,
file_type=media_type,
lobj_oid=lobj_oid,
db_session=db_session,
commit=True,
)
return pgfilestore
def get_query_history_export_files(
db_session: Session,
) -> list[PGFileStore]:
return list(
db_session.scalars(
select(PGFileStore).where(
and_(
PGFileStore.file_name.like(f"{QUERY_REPORT_NAME_PREFIX}-%"),
PGFileStore.file_type == FileType.CSV,
PGFileStore.file_origin == FileOrigin.QUERY_HISTORY_CSV,
)
)
)
)

View File

@@ -136,7 +136,8 @@ def _vespa_hit_to_inference_chunk(
section_continuation=fields[SECTION_CONTINUATION],
document_id=fields[DOCUMENT_ID],
source_type=fields[SOURCE_TYPE],
image_file_name=fields.get(IMAGE_FILE_NAME),
# still called `image_file_name` in Vespa for backwards compatibility
image_file_id=fields.get(IMAGE_FILE_NAME),
title=fields.get(TITLE),
semantic_identifier=fields[SEMANTIC_IDENTIFIER],
boost=fields.get(BOOST, 1),

View File

@@ -206,7 +206,8 @@ def _index_vespa_chunk(
# which only calls VespaIndex.update
ACCESS_CONTROL_LIST: {acl_entry: 1 for acl_entry in chunk.access.to_acl()},
DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
IMAGE_FILE_NAME: chunk.image_file_name,
# still called `image_file_name` in Vespa for backwards compatibility
IMAGE_FILE_NAME: chunk.image_file_id,
USER_FILE: chunk.user_file if chunk.user_file is not None else None,
USER_FOLDER: chunk.user_folder if chunk.user_folder is not None else None,
BOOST: chunk.boost,

View File

@@ -68,6 +68,7 @@ PRIMARY_OWNERS = "primary_owners"
SECONDARY_OWNERS = "secondary_owners"
RECENCY_BIAS = "recency_bias"
HIDDEN = "hidden"
# for legacy reasons, called `name` in Vespa despite it really being an ID
IMAGE_FILE_NAME = "image_file_name"
# Specific to Vespa, needed for highlighting matching keywords / section

View File

@@ -2,7 +2,6 @@ import io
import json
import os
import re
import uuid
import zipfile
from collections.abc import Callable
from collections.abc import Iterator
@@ -613,15 +612,13 @@ def convert_docx_to_txt(file: UploadFile, file_store: FileStore) -> str:
all_paras = [p.text for p in doc.paragraphs]
text_content = "\n".join(all_paras)
text_file_name = docx_to_txt_filename(file.filename or f"docx_{uuid.uuid4()}")
file_store.save_file(
file_name=text_file_name,
file_id = file_store.save_file(
content=BytesIO(text_content.encode("utf-8")),
display_name=file.filename,
file_origin=FileOrigin.CONNECTOR,
file_type="text/plain",
)
return text_file_name
return file_id
def docx_to_txt_filename(file_path: str) -> str:

View File

@@ -1,10 +1,11 @@
from io import BytesIO
from typing import Tuple
from sqlalchemy.orm import Session
from onyx.configs.constants import FileOrigin
from onyx.connectors.models import ImageSection
from onyx.db.pg_file_store import save_bytes_to_pgfilestore
from onyx.file_store.file_store import get_default_file_store
from onyx.utils.logger import setup_logger
logger = setup_logger()
@@ -13,19 +14,19 @@ logger = setup_logger()
def store_image_and_create_section(
db_session: Session,
image_data: bytes,
file_name: str,
file_id: str,
display_name: str,
link: str | None = None,
media_type: str = "application/octet-stream",
file_origin: FileOrigin = FileOrigin.OTHER,
) -> Tuple[ImageSection, str | None]:
"""
Stores an image in PGFileStore and creates an ImageSection object without summarization.
Stores an image in FileStore and creates an ImageSection object without summarization.
Args:
db_session: Database session
image_data: Raw image bytes
file_name: Base identifier for the file
file_id: Base identifier for the file
display_name: Human-readable name for the image
media_type: MIME type of the image
file_origin: Origin of the file (e.g., CONFLUENCE, GOOGLE_DRIVE, etc.)
@@ -33,26 +34,24 @@ def store_image_and_create_section(
Returns:
Tuple containing:
- ImageSection object with image reference
- The file_name in PGFileStore or None if storage failed
- The file_id in FileStore or None if storage failed
"""
# Storage logic
stored_file_name = None
try:
pgfilestore = save_bytes_to_pgfilestore(
db_session=db_session,
raw_bytes=image_data,
media_type=media_type,
identifier=file_name,
file_store = get_default_file_store(db_session)
file_id = file_store.save_file(
content=BytesIO(image_data),
display_name=display_name,
file_origin=file_origin,
file_type=media_type,
file_id=file_id,
)
stored_file_name = pgfilestore.file_name
except Exception as e:
logger.error(f"Failed to store image: {e}")
raise e
# Create an ImageSection with empty text (will be filled by LLM later in the pipeline)
return (
ImageSection(image_file_name=stored_file_name, link=link),
stored_file_name,
ImageSection(image_file_id=file_id, link=link),
file_id,
)

View File

@@ -0,0 +1,161 @@
# Onyx File Store
The Onyx file store provides a unified interface for storing files and large binary objects in S3-compatible storage systems. It supports AWS S3, MinIO, Azure Blob Storage, Digital Ocean Spaces, and other S3-compatible services.
## Architecture
The file store uses a single database table (`file_record`) to store file metadata while the actual file content is stored in external S3-compatible storage. This approach provides scalability, cost-effectiveness, and decouples file storage from the database.
### Database Schema
The `file_record` table contains the following columns:
- `file_id` (primary key): Unique identifier for the file
- `display_name`: Human-readable name for the file
- `file_origin`: Origin/source of the file (enum)
- `file_type`: MIME type of the file
- `file_metadata`: Additional metadata as JSON
- `bucket_name`: External storage bucket/container name
- `object_key`: External storage object key/path
- `created_at`: Timestamp when the file was created
- `updated_at`: Timestamp when the file was last updated
## Storage Backend
### S3-Compatible Storage
Stores files in external S3-compatible storage systems while keeping metadata in the database.
**Pros:**
- Scalable storage
- Cost-effective for large files
- CDN integration possible
- Decoupled from database
- Wide ecosystem support
**Cons:**
- Additional infrastructure required
- Network dependency
- Eventual consistency considerations
## Configuration
All configuration is handled via environment variables. The system requires S3-compatible storage to be configured.
### AWS S3
```bash
S3_FILE_STORE_BUCKET_NAME=your-bucket-name # Defaults to 'onyx-file-store-bucket'
S3_FILE_STORE_PREFIX=onyx-files # Optional, defaults to 'onyx-files'
# AWS credentials (use one of these methods):
# 1. Environment variables
S3_AWS_ACCESS_KEY_ID=your-access-key
S3_AWS_SECRET_ACCESS_KEY=your-secret-key
AWS_REGION_NAME=us-east-2 # Optional, defaults to 'us-east-2'
# 2. IAM roles (recommended for EC2/ECS deployments)
# No additional configuration needed if using IAM roles
```
### MinIO
```bash
S3_FILE_STORE_BUCKET_NAME=your-bucket-name
S3_ENDPOINT_URL=http://localhost:9000 # MinIO endpoint
S3_AWS_ACCESS_KEY_ID=minioadmin
S3_AWS_SECRET_ACCESS_KEY=minioadmin
AWS_REGION_NAME=us-east-1 # Any region name
S3_VERIFY_SSL=false # Optional, defaults to false
```
### Digital Ocean Spaces
```bash
S3_FILE_STORE_BUCKET_NAME=your-space-name
S3_ENDPOINT_URL=https://nyc3.digitaloceanspaces.com
S3_AWS_ACCESS_KEY_ID=your-spaces-key
S3_AWS_SECRET_ACCESS_KEY=your-spaces-secret
AWS_REGION_NAME=nyc3
```
### Other S3-Compatible Services
The file store works with any S3-compatible service. Simply configure:
- `S3_FILE_STORE_BUCKET_NAME`: Your bucket/container name
- `S3_ENDPOINT_URL`: The service endpoint URL
- `S3_AWS_ACCESS_KEY_ID` and `S3_AWS_SECRET_ACCESS_KEY`: Your credentials
- `AWS_REGION_NAME`: The region (any valid region name)
## Implementation
The system uses the `S3BackedFileStore` class that implements the abstract `FileStore` interface. The database uses generic column names (`bucket_name`, `object_key`) to maintain compatibility with different S3-compatible services.
### File Store Interface
The `FileStore` abstract base class defines the following methods:
- `initialize()`: Initialize the storage backend (create bucket if needed)
- `has_file(file_id, file_origin, file_type)`: Check if a file exists
- `save_file(content, display_name, file_origin, file_type, file_metadata, file_id)`: Save a file
- `read_file(file_id, mode, use_tempfile)`: Read file content
- `read_file_record(file_id)`: Get file metadata from database
- `delete_file(file_id)`: Delete a file and its metadata
- `get_file_with_mime_type(filename)`: Get file with parsed MIME type
## Usage Example
```python
from onyx.file_store.file_store import get_default_file_store
from onyx.configs.constants import FileOrigin
# Get the configured file store
file_store = get_default_file_store(db_session)
# Initialize the storage backend (creates bucket if needed)
file_store.initialize()
# Save a file
with open("example.pdf", "rb") as f:
file_id = file_store.save_file(
content=f,
display_name="Important Document.pdf",
file_origin=FileOrigin.OTHER,
file_type="application/pdf",
file_metadata={"department": "engineering", "version": "1.0"}
)
# Check if a file exists
exists = file_store.has_file(
file_id=file_id,
file_origin=FileOrigin.OTHER,
file_type="application/pdf"
)
# Read a file
file_content = file_store.read_file(file_id)
# Read file with temporary file (for large files)
file_content = file_store.read_file(file_id, use_tempfile=True)
# Get file metadata
file_record = file_store.read_file_record(file_id)
# Get file with MIME type detection
file_with_mime = file_store.get_file_with_mime_type(file_id)
# Delete a file
file_store.delete_file(file_id)
```
## Initialization
When deploying the application, ensure that:
1. The S3-compatible storage service is accessible
2. Credentials are properly configured
3. The bucket specified in `S3_FILE_STORE_BUCKET_NAME` exists or the service account has permissions to create it
4. Call `file_store.initialize()` during application startup to ensure the bucket exists
The file store will automatically create the bucket if it doesn't exist and the credentials have sufficient permissions.

View File

@@ -1,21 +1,37 @@
import tempfile
import uuid
from abc import ABC
from abc import abstractmethod
from io import BytesIO
from typing import Any
from typing import cast
from typing import IO
import boto3
import puremagic
from botocore.config import Config
from botocore.exceptions import ClientError
from mypy_boto3_s3 import S3Client
from sqlalchemy.orm import Session
from onyx.configs.app_configs import AWS_REGION_NAME
from onyx.configs.app_configs import S3_AWS_ACCESS_KEY_ID
from onyx.configs.app_configs import S3_AWS_SECRET_ACCESS_KEY
from onyx.configs.app_configs import S3_ENDPOINT_URL
from onyx.configs.app_configs import S3_FILE_STORE_BUCKET_NAME
from onyx.configs.app_configs import S3_FILE_STORE_PREFIX
from onyx.configs.app_configs import S3_VERIFY_SSL
from onyx.configs.constants import FileOrigin
from onyx.db.models import PGFileStore
from onyx.db.pg_file_store import create_populate_lobj
from onyx.db.pg_file_store import delete_lobj_by_id
from onyx.db.pg_file_store import delete_pgfilestore_by_file_name
from onyx.db.pg_file_store import get_pgfilestore_by_file_name
from onyx.db.pg_file_store import get_pgfilestore_by_file_name_optional
from onyx.db.pg_file_store import read_lobj
from onyx.db.pg_file_store import upsert_pgfilestore
from onyx.db.file_record import delete_filerecord_by_file_id
from onyx.db.file_record import get_filerecord_by_file_id
from onyx.db.file_record import get_filerecord_by_file_id_optional
from onyx.db.file_record import upsert_filerecord
from onyx.db.models import FileRecord as FileStoreModel
from onyx.utils.file import FileWithMimeType
from onyx.utils.logger import setup_logger
from shared_configs.contextvars import get_current_tenant_id
logger = setup_logger()
class FileStore(ABC):
@@ -23,20 +39,25 @@ class FileStore(ABC):
An abstraction for storing files and large binary objects.
"""
@abstractmethod
def initialize(self) -> None:
"""
Should generally be called once before any other methods are called.
"""
raise NotImplementedError
@abstractmethod
def has_file(
self,
file_name: str,
file_id: str,
file_origin: FileOrigin,
file_type: str,
display_name: str | None = None,
) -> bool:
"""
Check if a file exists in the blob store
Parameters:
- file_name: Name of the file to save
- display_name: Display name of the file
- file_id: Unique ID of the file to check for
- file_origin: Origin of the file
- file_type: Type of the file
"""
@@ -45,38 +66,39 @@ class FileStore(ABC):
@abstractmethod
def save_file(
self,
file_name: str,
content: IO,
display_name: str | None,
file_origin: FileOrigin,
file_type: str,
file_metadata: dict | None = None,
commit: bool = True,
) -> None:
file_metadata: dict[str, Any] | None = None,
file_id: str | None = None,
) -> str:
"""
Save a file to the blob store
Parameters:
- connector_name: Name of the CC-Pair (as specified by the user in the UI)
- file_name: Name of the file to save
- content: Contents of the file
- display_name: Display name of the file
- display_name: Display name of the file to save
- file_origin: Origin of the file
- file_type: Type of the file
- file_metadata: Additional metadata for the file
- commit: Whether to commit the transaction after saving the file
- file_id: Unique ID of the file to save. If not provided, a random UUID will be generated.
It is generally NOT recommended to provide this.
Returns:
The unique ID of the file that was saved.
"""
raise NotImplementedError
@abstractmethod
def read_file(
self, file_name: str, mode: str | None = None, use_tempfile: bool = False
) -> IO:
self, file_id: str, mode: str | None = None, use_tempfile: bool = False
) -> IO[bytes]:
"""
Read the content of a given file by the name
Read the content of a given file by the ID
Parameters:
- file_name: Name of file to read
- file_id: Unique ID of file to read
- mode: Mode to open the file (e.g. 'b' for binary)
- use_tempfile: Whether to use a temporary file to store the contents
in order to avoid loading the entire file into memory
@@ -86,34 +108,160 @@ class FileStore(ABC):
"""
@abstractmethod
def read_file_record(self, file_name: str) -> PGFileStore:
def read_file_record(self, file_id: str) -> FileStoreModel:
"""
Read the file record by the name
Read the file record by the ID
"""
@abstractmethod
def delete_file(self, file_name: str) -> None:
def delete_file(self, file_id: str) -> None:
"""
Delete a file by its name.
Delete a file by its ID.
Parameters:
- file_id: Unique ID of file to delete
"""
@abstractmethod
def get_file_with_mime_type(self, filename: str) -> FileWithMimeType | None:
"""
Get the file + parse out the mime type.
"""
class PostgresBackedFileStore(FileStore):
def __init__(self, db_session: Session):
class S3BackedFileStore(FileStore):
"""Isn't necessarily S3, but is any S3-compatible storage (e.g. MinIO)"""
def __init__(
self,
db_session: Session,
bucket_name: str,
aws_access_key_id: str | None = None,
aws_secret_access_key: str | None = None,
aws_region_name: str | None = None,
s3_endpoint_url: str | None = None,
s3_prefix: str | None = None,
s3_verify_ssl: bool = True,
) -> None:
self.db_session = db_session
self._s3_client: S3Client | None = None
self._bucket_name = bucket_name
self._aws_access_key_id = aws_access_key_id
self._aws_secret_access_key = aws_secret_access_key
self._aws_region_name = aws_region_name or "us-east-2"
self._s3_endpoint_url = s3_endpoint_url
self._s3_prefix = s3_prefix or "onyx-files"
self._s3_verify_ssl = s3_verify_ssl
def _get_s3_client(self) -> S3Client:
    """Lazily build (and cache) the boto3 S3 client for this store.

    Works against real AWS S3 and S3-compatible services (e.g. MinIO)
    via an optional custom endpoint URL.

    Returns:
        The cached S3 client, creating it on first use.

    Raises:
        RuntimeError: if the boto3 client cannot be constructed.
    """
    if self._s3_client is not None:
        return self._s3_client

    try:
        client_kwargs: dict[str, Any] = {
            "service_name": "s3",
            "region_name": self._aws_region_name,
        }

        # Add endpoint URL if specified (for MinIO, etc.)
        if self._s3_endpoint_url:
            client_kwargs["endpoint_url"] = self._s3_endpoint_url
            client_kwargs["config"] = Config(
                signature_version="s3v4",
                s3={"addressing_style": "path"},  # Required for MinIO
            )

        # Disable SSL verification if requested (for local development)
        if not self._s3_verify_ssl:
            import urllib3

            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
            client_kwargs["verify"] = False

        # With explicit credentials, pass them through; otherwise boto3
        # falls back to IAM roles / its default credential chain. Either
        # way the client is constructed the same way.
        if self._aws_access_key_id and self._aws_secret_access_key:
            client_kwargs["aws_access_key_id"] = self._aws_access_key_id
            client_kwargs["aws_secret_access_key"] = self._aws_secret_access_key

        self._s3_client = boto3.client(**client_kwargs)
    except Exception as e:
        logger.error(f"Failed to initialize S3 client: {e}")
        # Chain the original exception so the root cause is preserved
        raise RuntimeError(f"Failed to initialize S3 client: {e}") from e

    return self._s3_client
def _get_bucket_name(self) -> str:
"""Get S3 bucket name from configuration"""
if not self._bucket_name:
raise RuntimeError("S3 bucket name is required for S3 file store")
return self._bucket_name
def _get_s3_key(self, file_name: str) -> str:
    """Build the tenant-scoped S3 object key for the given file name."""
    # Keys are laid out as <prefix>/<tenant>/<file name>
    return "/".join((self._s3_prefix, get_current_tenant_id(), file_name))
def initialize(self) -> None:
    """Ensure the configured S3 bucket exists, creating it if necessary.

    Raises:
        RuntimeError: if the bucket exists but access is forbidden, or if
            the existence check fails for any other reason.
    """
    s3_client = self._get_s3_client()
    bucket_name = self._get_bucket_name()

    # Check if bucket exists
    try:
        s3_client.head_bucket(Bucket=bucket_name)
        logger.info(f"S3 bucket '{bucket_name}' already exists")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        # Some S3-compatible services report "NoSuchBucket" instead of "404"
        if error_code in ("404", "NoSuchBucket"):
            # Bucket doesn't exist, create it
            logger.info(f"Creating S3 bucket '{bucket_name}'")

            # Use the region this store was configured with rather than
            # reaching into boto3's private client config attribute.
            region = self._aws_region_name

            if region and region != "us-east-1":
                # For regions other than us-east-1, we need to specify LocationConstraint
                s3_client.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={"LocationConstraint": region},
                )
            else:
                # For us-east-1 or MinIO/other S3-compatible services
                s3_client.create_bucket(Bucket=bucket_name)

            logger.info(f"Successfully created S3 bucket '{bucket_name}'")
        elif error_code == "403":
            # Bucket exists but we don't have permission to access it
            logger.warning(
                f"S3 bucket '{bucket_name}' exists but access is forbidden"
            )
            raise RuntimeError(
                f"Access denied to S3 bucket '{bucket_name}'. Check credentials and permissions."
            ) from e
        else:
            # Some other error occurred
            logger.error(f"Failed to check S3 bucket '{bucket_name}': {e}")
            raise RuntimeError(
                f"Failed to check S3 bucket '{bucket_name}': {e}"
            ) from e
def has_file(
self,
file_name: str,
file_id: str,
file_origin: FileOrigin,
file_type: str,
display_name: str | None = None,
) -> bool:
file_record = get_pgfilestore_by_file_name_optional(
file_name=display_name or file_name, db_session=self.db_session
file_record = get_filerecord_by_file_id_optional(
file_id=file_id, db_session=self.db_session
)
return (
file_record is not None
@@ -123,63 +271,101 @@ class PostgresBackedFileStore(FileStore):
def save_file(
self,
file_name: str,
content: IO,
display_name: str | None,
file_origin: FileOrigin,
file_type: str,
file_metadata: dict | None = None,
commit: bool = True,
) -> None:
try:
# The large objects in postgres are saved as special objects can be listed with
# SELECT * FROM pg_largeobject_metadata;
obj_id = create_populate_lobj(content=content, db_session=self.db_session)
upsert_pgfilestore(
file_name=file_name,
display_name=display_name or file_name,
file_origin=file_origin,
file_type=file_type,
lobj_oid=obj_id,
db_session=self.db_session,
file_metadata=file_metadata,
)
if commit:
self.db_session.commit()
except Exception:
self.db_session.rollback()
raise
file_metadata: dict[str, Any] | None = None,
file_id: str | None = None,
) -> str:
if file_id is None:
file_id = str(uuid.uuid4())
s3_client = self._get_s3_client()
bucket_name = self._get_bucket_name()
s3_key = self._get_s3_key(file_id)
# Read content from IO object
if hasattr(content, "read"):
file_content = content.read()
if hasattr(content, "seek"):
content.seek(0) # Reset position for potential re-reads
else:
file_content = content
# Upload to S3
s3_client.put_object(
Bucket=bucket_name,
Key=s3_key,
Body=file_content,
ContentType=file_type,
)
# Save metadata to database
upsert_filerecord(
file_id=file_id,
display_name=display_name or file_id,
file_origin=file_origin,
file_type=file_type,
bucket_name=bucket_name,
object_key=s3_key,
db_session=self.db_session,
file_metadata=file_metadata,
)
self.db_session.commit()
return file_id
def read_file(
self, file_name: str, mode: str | None = None, use_tempfile: bool = False
) -> IO:
file_record = get_pgfilestore_by_file_name(
file_name=file_name, db_session=self.db_session
)
return read_lobj(
lobj_oid=file_record.lobj_oid,
db_session=self.db_session,
mode=mode,
use_tempfile=use_tempfile,
self, file_id: str, mode: str | None = None, use_tempfile: bool = False
) -> IO[bytes]:
file_record = get_filerecord_by_file_id(
file_id=file_id, db_session=self.db_session
)
def read_file_record(self, file_name: str) -> PGFileStore:
file_record = get_pgfilestore_by_file_name(
file_name=file_name, db_session=self.db_session
)
s3_client = self._get_s3_client()
try:
response = s3_client.get_object(
Bucket=file_record.bucket_name, Key=file_record.object_key
)
except ClientError:
logger.error(f"Failed to read file {file_id} from S3")
raise
file_content = response["Body"].read()
if use_tempfile:
# Always open in binary mode for temp files since we're writing bytes
temp_file = tempfile.NamedTemporaryFile(mode="w+b", delete=False)
temp_file.write(file_content)
temp_file.seek(0)
return temp_file
else:
return BytesIO(file_content)
def read_file_record(self, file_id: str) -> FileStoreModel:
    """Fetch the database record for the given file ID."""
    return get_filerecord_by_file_id(file_id=file_id, db_session=self.db_session)
def delete_file(self, file_name: str) -> None:
def delete_file(self, file_id: str) -> None:
try:
file_record = get_pgfilestore_by_file_name(
file_name=file_name, db_session=self.db_session
file_record = get_filerecord_by_file_id(
file_id=file_id, db_session=self.db_session
)
delete_lobj_by_id(file_record.lobj_oid, db_session=self.db_session)
delete_pgfilestore_by_file_name(
file_name=file_name, db_session=self.db_session
# Delete from external storage
s3_client = self._get_s3_client()
s3_client.delete_object(
Bucket=file_record.bucket_name, Key=file_record.object_key
)
# Delete metadata from database
delete_filerecord_by_file_id(file_id=file_id, db_session=self.db_session)
self.db_session.commit()
except Exception:
self.db_session.rollback()
raise
@@ -197,6 +383,53 @@ class PostgresBackedFileStore(FileStore):
return None
def get_s3_file_store(db_session: Session) -> S3BackedFileStore:
    """
    Build the S3 file store implementation from app configuration.

    Raises:
        RuntimeError: if no bucket name is configured.
    """
    # The bucket name is the one piece of configuration that is mandatory
    if not S3_FILE_STORE_BUCKET_NAME:
        raise RuntimeError(
            "S3_FILE_STORE_BUCKET_NAME configuration is required for S3 file store"
        )

    return S3BackedFileStore(
        db_session=db_session,
        bucket_name=S3_FILE_STORE_BUCKET_NAME,
        aws_access_key_id=S3_AWS_ACCESS_KEY_ID,
        aws_secret_access_key=S3_AWS_SECRET_ACCESS_KEY,
        aws_region_name=AWS_REGION_NAME,
        s3_endpoint_url=S3_ENDPOINT_URL,
        s3_prefix=S3_FILE_STORE_PREFIX,
        s3_verify_ssl=S3_VERIFY_SSL,
    )
def get_default_file_store(db_session: Session) -> FileStore:
# The only supported file store now is the Postgres File Store
return PostgresBackedFileStore(db_session=db_session)
"""
Returns the configured file store implementation.
Supports AWS S3, MinIO, and other S3-compatible storage.
Configuration is handled via environment variables defined in app_configs.py:
AWS S3:
- S3_FILE_STORE_BUCKET_NAME=<bucket-name>
- S3_FILE_STORE_PREFIX=<prefix> (optional, defaults to 'onyx-files')
- AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (or use IAM roles)
- AWS_REGION_NAME=<region> (optional, defaults to 'us-east-2')
MinIO:
- S3_FILE_STORE_BUCKET_NAME=<bucket-name>
- S3_ENDPOINT_URL=<minio-endpoint> (e.g., http://localhost:9000)
- MINIO_ACCESS_KEY=<minio-access-key> (falls back to AWS_ACCESS_KEY_ID)
- MINIO_SECRET_KEY=<minio-secret-key> (falls back to AWS_SECRET_ACCESS_KEY)
- AWS_REGION_NAME=<any-region> (optional, defaults to 'us-east-2')
- S3_VERIFY_SSL=false (optional, for local development)
Other S3-compatible storage (Digital Ocean, Linode, etc.):
- Same as MinIO, but set appropriate S3_ENDPOINT_URL
"""
return get_s3_file_store(db_session)

View File

@@ -3,7 +3,6 @@ from collections.abc import Callable
from io import BytesIO
from typing import cast
from uuid import UUID
from uuid import uuid4
import requests
from sqlalchemy.orm import Session
@@ -30,16 +29,13 @@ def user_file_id_to_plaintext_file_name(user_file_id: int) -> str:
return f"plaintext_{user_file_id}"
def store_user_file_plaintext(
user_file_id: int, plaintext_content: str, db_session: Session
) -> bool:
def store_user_file_plaintext(user_file_id: int, plaintext_content: str) -> bool:
"""
Store plaintext content for a user file in the file store.
Args:
user_file_id: The ID of the user file
plaintext_content: The plaintext content to store
db_session: The database session
Returns:
bool: True if storage was successful, False otherwise
@@ -51,19 +47,19 @@ def store_user_file_plaintext(
# Get plaintext file name
plaintext_file_name = user_file_id_to_plaintext_file_name(user_file_id)
# Store the plaintext in the file store
file_store = get_default_file_store(db_session)
file_content = BytesIO(plaintext_content.encode("utf-8"))
# Use a separate session to avoid committing the caller's transaction
try:
file_store.save_file(
file_name=plaintext_file_name,
content=file_content,
display_name=f"Plaintext for user file {user_file_id}",
file_origin=FileOrigin.PLAINTEXT_CACHE,
file_type="text/plain",
commit=False,
)
return True
with get_session_with_current_tenant() as file_store_session:
file_store = get_default_file_store(file_store_session)
file_content = BytesIO(plaintext_content.encode("utf-8"))
file_store.save_file(
content=file_content,
display_name=f"Plaintext for user file {user_file_id}",
file_origin=FileOrigin.PLAINTEXT_CACHE,
file_type="text/plain",
file_id=plaintext_file_name,
)
return True
except Exception as e:
logger.warning(f"Failed to store plaintext for user file {user_file_id}: {e}")
return False
@@ -274,34 +270,27 @@ def save_file_from_url(url: str) -> str:
response = requests.get(url)
response.raise_for_status()
unique_id = str(uuid4())
file_io = BytesIO(response.content)
file_store = get_default_file_store(db_session)
file_store.save_file(
file_name=unique_id,
file_id = file_store.save_file(
content=file_io,
display_name="GeneratedImage",
file_origin=FileOrigin.CHAT_IMAGE_GEN,
file_type="image/png;base64",
commit=True,
)
return unique_id
return file_id
def save_file_from_base64(base64_string: str) -> str:
with get_session_with_current_tenant() as db_session:
unique_id = str(uuid4())
file_store = get_default_file_store(db_session)
file_store.save_file(
file_name=unique_id,
file_id = file_store.save_file(
content=BytesIO(base64.b64decode(base64_string)),
display_name="GeneratedImage",
file_origin=FileOrigin.CHAT_IMAGE_GEN,
file_type=get_image_type(base64_string),
commit=True,
)
return unique_id
return file_id
def save_file(

View File

@@ -82,7 +82,7 @@ def _combine_chunks(chunks: list[DocAwareChunk], large_chunk_id: int) -> DocAwar
blurb=chunks[0].blurb,
content=chunks[0].content,
source_links=chunks[0].source_links or {},
image_file_name=None,
image_file_id=None,
section_continuation=(chunks[0].chunk_id > 0),
title_prefix=chunks[0].title_prefix,
metadata_suffix_semantic=chunks[0].metadata_suffix_semantic,
@@ -233,7 +233,7 @@ class Chunker:
title_prefix: str = "",
metadata_suffix_semantic: str = "",
metadata_suffix_keyword: str = "",
image_file_name: str | None = None,
image_file_id: str | None = None,
) -> None:
"""
Helper to create a new DocAwareChunk, append it to chunks_list.
@@ -244,7 +244,7 @@ class Chunker:
blurb=self._extract_blurb(text),
content=text,
source_links=links or {0: ""},
image_file_name=image_file_name,
image_file_id=image_file_id,
section_continuation=is_continuation,
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
@@ -278,7 +278,7 @@ class Chunker:
# Get section text and other attributes
section_text = clean_text(str(section.text or ""))
section_link_text = section.link or ""
image_url = section.image_file_name
image_url = section.image_file_id
# If there is no useful content, skip
if not section_text and (not document.title or section_idx > 0):
@@ -312,7 +312,7 @@ class Chunker:
chunks,
section_text,
links={0: section_link_text} if section_link_text else {},
image_file_name=image_url,
image_file_id=image_url,
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,

View File

@@ -44,8 +44,6 @@ from onyx.db.document_set import fetch_document_sets_for_documents
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.models import Document as DBDocument
from onyx.db.models import IndexModelStatus
from onyx.db.pg_file_store import get_pgfilestore_by_file_name
from onyx.db.pg_file_store import read_lobj
from onyx.db.search_settings import get_active_search_settings
from onyx.db.tag import create_or_add_document_tag
from onyx.db.tag import create_or_add_document_tag_list
@@ -59,6 +57,7 @@ from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.interfaces import DocumentMetadata
from onyx.document_index.interfaces import IndexBatchParams
from onyx.file_processing.image_summarization import summarize_image_with_error_handling
from onyx.file_store.file_store import get_default_file_store
from onyx.file_store.utils import store_user_file_plaintext
from onyx.indexing.chunker import Chunker
from onyx.indexing.embedder import embed_chunks_with_failure_handling
@@ -463,13 +462,13 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
# Even without LLM, we still convert to IndexingDocument with base Sections
return [
IndexingDocument(
**document.dict(),
**document.model_dump(),
processed_sections=[
Section(
text=section.text if isinstance(section, TextSection) else "",
link=section.link,
image_file_name=(
section.image_file_name
image_file_id=(
section.image_file_id
if isinstance(section, ImageSection)
else None
),
@@ -486,38 +485,39 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
processed_sections: list[Section] = []
for section in document.sections:
# For ImageSection, process and create base Section with both text and image_file_name
# For ImageSection, process and create base Section with both text and image_file_id
if isinstance(section, ImageSection):
# Default section with image path preserved - ensure text is always a string
processed_section = Section(
link=section.link,
image_file_name=section.image_file_name,
image_file_id=section.image_file_id,
text="", # Initialize with empty string
)
# Try to get image summary
try:
with get_session_with_current_tenant() as db_session:
pgfilestore = get_pgfilestore_by_file_name(
file_name=section.image_file_name, db_session=db_session
)
file_store = get_default_file_store(db_session)
if not pgfilestore:
file_record = file_store.read_file_record(
file_id=section.image_file_id
)
if not file_record:
logger.warning(
f"Image file {section.image_file_name} not found in PGFileStore"
f"Image file {section.image_file_id} not found in FileStore"
)
processed_section.text = "[Image could not be processed]"
else:
# Get the image data
image_data_io = read_lobj(
pgfilestore.lobj_oid, db_session, mode="rb"
image_data_io = file_store.read_file(
file_id=section.image_file_id
)
pgfilestore_data = image_data_io.read()
image_data = image_data_io.read()
summary = summarize_image_with_error_handling(
llm=llm,
image_data=pgfilestore_data,
context_name=pgfilestore.display_name or "Image",
image_data=image_data,
context_name=file_record.display_name or "Image",
)
if summary:
@@ -537,23 +537,13 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
processed_section = Section(
text=section.text or "", # Ensure text is always a string, not None
link=section.link,
image_file_name=None,
)
processed_sections.append(processed_section)
# If it's already a base Section (unlikely), just append it with text validation
else:
# Ensure text is always a string
processed_section = Section(
text=section.text if section.text is not None else "",
link=section.link,
image_file_name=section.image_file_name,
image_file_id=None,
)
processed_sections.append(processed_section)
# Create IndexingDocument with original sections and processed_sections
indexed_document = IndexingDocument(
**document.dict(), processed_sections=processed_sections
**document.model_dump(), processed_sections=processed_sections
)
indexed_documents.append(indexed_document)
@@ -988,6 +978,15 @@ def index_doc_batch(
continue
ids_to_new_updated_at[doc.id] = doc.doc_updated_at
# Store the plaintext in the file store for faster retrieval
# NOTE: this creates its own session to avoid committing the overall
# transaction.
for user_file_id, raw_text in user_file_id_to_raw_text.items():
store_user_file_plaintext(
user_file_id=user_file_id,
plaintext_content=raw_text,
)
update_docs_updated_at__no_commit(
ids_to_new_updated_at=ids_to_new_updated_at, db_session=db_session
)
@@ -1018,14 +1017,6 @@ def index_doc_batch(
document_ids=[doc.id for doc in filtered_documents],
db_session=db_session,
)
# Store the plaintext in the file store for faster retrieval
for user_file_id, raw_text in user_file_id_to_raw_text.items():
# Use the dedicated function to store plaintext
store_user_file_plaintext(
user_file_id=user_file_id,
plaintext_content=raw_text,
db_session=db_session,
)
# save the chunk boost components to postgres
update_chunk_boost_components__no_commit(
@@ -1033,6 +1024,7 @@ def index_doc_batch(
)
# Pause user file ccpairs
# TODO: investigate why nothing is done here?
db_session.commit()

View File

@@ -29,7 +29,7 @@ class BaseChunk(BaseModel):
content: str
# Holds the link and the offsets into the raw Chunk text
source_links: dict[int, str] | None
image_file_name: str | None
image_file_id: str | None
# True if this Chunk's start is not at the start of a Section
section_continuation: bool

View File

@@ -51,6 +51,7 @@ from onyx.configs.constants import POSTGRES_WEB_APP_NAME
from onyx.db.engine import get_session_context_manager
from onyx.db.engine import SqlEngine
from onyx.db.engine import warm_up_connections
from onyx.file_store.file_store import get_default_file_store
from onyx.server.api_key.api import router as api_key_router
from onyx.server.auth_check import check_router_auth
from onyx.server.documents.cc_pair import router as cc_pair_router
@@ -255,6 +256,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
# If we are multi-tenant, we need to only set up initial public tables
with get_session_context_manager() as db_session:
setup_onyx(db_session, POSTGRES_DEFAULT_SCHEMA)
# set up the file store (e.g. create bucket if needed). On multi-tenant,
# this is done via IaC
get_default_file_store(db_session).initialize()
else:
setup_multitenant_onyx()

View File

@@ -58,7 +58,7 @@ def _create_indexable_chunks(
TextSection(
text=preprocessed_doc["content"],
link=preprocessed_doc["url"],
image_file_name=None,
image_file_id=None,
)
],
source=DocumentSource.WEB,
@@ -102,7 +102,7 @@ def _create_indexable_chunks(
user_folder=None,
boost=DEFAULT_BOOST,
large_chunk_id=None,
image_file_name=None,
image_file_id=None,
aggregated_chunk_boost_factor=1.0,
)

View File

@@ -1,7 +1,6 @@
import json
import mimetypes
import os
import uuid
import zipfile
from io import BytesIO
from typing import Any
@@ -449,40 +448,36 @@ def upload_files(files: list[UploadFile], db_session: Session) -> FileUploadResp
continue
sub_file_bytes = zf.read(file_info)
sub_file_name = os.path.join(str(uuid.uuid4()), file_info)
deduped_file_paths.append(sub_file_name)
mime_type, __ = mimetypes.guess_type(file_info)
if mime_type is None:
mime_type = "application/octet-stream"
file_store.save_file(
file_name=sub_file_name,
file_id = file_store.save_file(
content=BytesIO(sub_file_bytes),
display_name=os.path.basename(file_info),
file_origin=FileOrigin.CONNECTOR,
file_type=mime_type,
)
deduped_file_paths.append(file_id)
continue
# Special handling for docx files - only store the plaintext version
if file.content_type and file.content_type.startswith(
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
):
file_path = convert_docx_to_txt(file, file_store)
deduped_file_paths.append(file_path)
docx_file_id = convert_docx_to_txt(file, file_store)
deduped_file_paths.append(docx_file_id)
continue
# Default handling for all other file types
file_path = os.path.join(str(uuid.uuid4()), cast(str, file.filename))
deduped_file_paths.append(file_path)
file_store.save_file(
file_name=file_path,
file_id = file_store.save_file(
content=file.file,
display_name=file.filename,
file_origin=FileOrigin.CONNECTOR,
file_type=file.content_type or "text/plain",
)
deduped_file_paths.append(file_id)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))

View File

@@ -1,4 +1,3 @@
import uuid
from uuid import UUID
from fastapi import APIRouter
@@ -182,9 +181,7 @@ def upload_file(
) -> dict[str, str]:
file_store = get_default_file_store(db_session)
file_type = ChatFileType.IMAGE
file_id = str(uuid.uuid4())
file_store.save_file(
file_name=file_id,
file_id = file_store.save_file(
content=file.file,
display_name=file.filename,
file_origin=FileOrigin.CHAT_UPLOAD,

View File

@@ -4,7 +4,6 @@ import io
import json
import os
import time
import uuid
from collections.abc import Callable
from collections.abc import Generator
from datetime import timedelta
@@ -739,9 +738,7 @@ def upload_files_for_chat(
new_content_type = file.content_type
# Store the file normally
file_id = str(uuid.uuid4())
file_store.save_file(
file_name=file_id,
file_id = file_store.save_file(
content=file_content_io,
display_name=file.filename,
file_origin=FileOrigin.CHAT_UPLOAD,
@@ -756,10 +753,8 @@ def upload_files_for_chat(
file=extracted_text_io, # use the bytes we already read
file_name=file.filename or "",
)
text_file_id = str(uuid.uuid4())
file_store.save_file(
file_name=text_file_id,
text_file_id = file_store.save_file(
content=io.BytesIO(extracted_text.encode()),
display_name=file.filename,
file_origin=FileOrigin.CHAT_UPLOAD,

View File

@@ -12,7 +12,7 @@ from onyx.configs.constants import ONYX_CLOUD_REDIS_RUNTIME
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
from onyx.configs.constants import ONYX_EMAILABLE_LOGO_MAX_DIM
from onyx.db.engine import get_session_with_shared_schema
from onyx.file_store.file_store import PostgresBackedFileStore
from onyx.file_store.file_store import get_default_file_store
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.utils.file import FileWithMimeType
from onyx.utils.file import OnyxStaticFileManager
@@ -41,7 +41,7 @@ class OnyxRuntime:
if db_filename:
with get_session_with_shared_schema() as db_session:
file_store = PostgresBackedFileStore(db_session)
file_store = get_default_file_store(db_session)
onyx_file = file_store.get_file_with_mime_type(db_filename)
if not onyx_file:

View File

@@ -217,7 +217,7 @@ class CustomTool(BaseTool):
content = BytesIO(file_content)
file_store.save_file(
file_name=file_id,
file_id=file_id,
content=content,
display_name=file_id,
file_origin=FileOrigin.CHAT_UPLOAD,

View File

@@ -75,7 +75,7 @@ def generate_dummy_chunk(
title_embedding=generate_random_embedding(embedding_dim),
large_chunk_id=None,
large_chunk_reference_ids=[],
image_file_name=None,
image_file_id=None,
)
document_set_names = []

View File

@@ -0,0 +1,912 @@
import os
import time
import uuid
from collections.abc import Generator
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from typing import Any
from typing import cast
from typing import Dict
from typing import List
from typing import Tuple
from typing import TypedDict
from unittest.mock import patch
import pytest
from botocore.exceptions import ClientError
from sqlalchemy.orm import Session
from onyx.configs.constants import FileOrigin
from onyx.db.engine import get_session_context_manager
from onyx.db.engine import SqlEngine
from onyx.file_store.file_store import S3BackedFileStore
from onyx.utils.logger import setup_logger
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
logger = setup_logger()
# Bucket shared by all file-store tests; per-run isolation comes from a
# unique prefix generated in the file_store fixture
TEST_BUCKET_NAME: str = "onyx-file-store-tests"
# Base prefix for test objects; a UUID suffix is appended per fixture instance
TEST_FILE_PREFIX: str = "test-files"
# Tenant ID set on the tenant contextvar during tests
TEST_TENANT_ID: str = "public"
# Type definitions for test data
class BackendConfig(TypedDict):
    """Connection settings for one S3-compatible backend under test."""

    # None => real AWS S3; a URL => S3-compatible service such as MinIO
    endpoint_url: str | None
    access_key: str
    secret_key: str
    region: str
    # False for local endpoints (e.g. MinIO) with self-signed certificates
    verify_ssl: bool
    # Human-readable label; also used as the pytest parametrize ID
    backend_name: str
class FileTestData(TypedDict):
    """One test file's identity, payload, and classification.

    NOTE(review): not referenced in this part of the file — presumably used
    by tests further down (e.g. bulk/concurrency tests); confirm.
    """

    name: str
    display_name: str
    content: str
    type: str
    origin: FileOrigin
class WorkerResult(TypedDict):
    """Result record produced by one concurrent test worker.

    NOTE(review): not referenced in this part of the file — presumably used
    by ThreadPoolExecutor-based tests further down; confirm.
    """

    worker_id: int
    file_name: str
    content: str
def _get_all_backend_configs() -> list[BackendConfig]:
    """Collect configurations for every available storage backend.

    MinIO is included when an S3 endpoint URL is configured; real AWS S3
    is included when explicit credentials are present. Skips the test via
    pytest.skip when neither backend is configured.
    """
    from onyx.configs.app_configs import (
        S3_ENDPOINT_URL,
        S3_AWS_ACCESS_KEY_ID,
        S3_AWS_SECRET_ACCESS_KEY,
        AWS_REGION_NAME,
    )

    configs: list[BackendConfig] = []

    # MinIO configuration (if endpoint is configured). The default MinIO
    # root credentials are used for local test instances, so there is no
    # need to guard on them being non-empty.
    if S3_ENDPOINT_URL:
        configs.append(
            {
                "endpoint_url": S3_ENDPOINT_URL,
                "access_key": "minioadmin",
                "secret_key": "minioadmin",
                "region": "us-east-1",
                "verify_ssl": False,
                "backend_name": "MinIO",
            }
        )

    # AWS S3 configuration (if credentials are available)
    if S3_AWS_ACCESS_KEY_ID and S3_AWS_SECRET_ACCESS_KEY:
        configs.append(
            {
                "endpoint_url": None,
                "access_key": S3_AWS_ACCESS_KEY_ID,
                "secret_key": S3_AWS_SECRET_ACCESS_KEY,
                "region": AWS_REGION_NAME or "us-east-2",
                "verify_ssl": True,
                "backend_name": "AWS S3",
            }
        )

    if not configs:
        pytest.skip(
            "No backend configurations available - set MinIO or AWS S3 credentials"
        )

    return configs
@pytest.fixture(scope="function")
def db_session() -> Generator[Session, None, None]:
    """Yield a session against the real PostgreSQL test database."""
    # The engine must be initialized before any session can be opened
    SqlEngine.init_engine(pool_size=10, max_overflow=5)

    with get_session_context_manager() as session:
        yield session
@pytest.fixture(scope="function")
def tenant_context() -> Generator[None, None, None]:
    """Pin the tenant contextvar to the test tenant for the test's duration."""
    reset_token = CURRENT_TENANT_ID_CONTEXTVAR.set(TEST_TENANT_ID)
    try:
        yield
    finally:
        # Always restore the previous tenant, even when the test fails
        CURRENT_TENANT_ID_CONTEXTVAR.reset(reset_token)
@pytest.fixture(
    scope="function",
    params=_get_all_backend_configs(),
    ids=lambda config: config["backend_name"],
)
def file_store(
    request: pytest.FixtureRequest, db_session: Session, tenant_context: None
) -> Generator[S3BackedFileStore, None, None]:
    """Create an S3BackedFileStore for the parametrized backend.

    Initializes the bucket up front and, after the test, deletes every
    object written under this fixture's unique prefix.
    """
    backend_config: BackendConfig = request.param

    # Unique prefix per fixture instance so concurrent/leftover objects
    # from other runs can never collide with (or be deleted by) this one
    store = S3BackedFileStore(
        db_session=db_session,
        bucket_name=TEST_BUCKET_NAME,
        aws_access_key_id=backend_config["access_key"],
        aws_secret_access_key=backend_config["secret_key"],
        aws_region_name=backend_config["region"],
        s3_endpoint_url=backend_config["endpoint_url"],
        s3_prefix=f"{TEST_FILE_PREFIX}-{uuid.uuid4()}",
        s3_verify_ssl=backend_config["verify_ssl"],
    )

    # Initialize the store and ensure bucket exists
    store.initialize()

    logger.info(
        f"Successfully initialized {backend_config['backend_name']} file store with bucket {TEST_BUCKET_NAME}"
    )

    yield store

    # Cleanup: remove all test files from the bucket (including
    # tenant-prefixed files). Use a paginator so runs that wrote more than
    # 1000 objects are fully cleaned up — a single list_objects_v2 call
    # returns at most 1000 keys.
    try:
        s3_client = store._get_s3_client()
        actual_bucket_name = store._get_bucket_name()

        total_deleted = 0
        paginator = s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(
            Bucket=actual_bucket_name, Prefix=f"{store._s3_prefix}/"
        ):
            objects_to_delete = [
                {"Key": obj["Key"]} for obj in page.get("Contents", [])
            ]
            if objects_to_delete:
                s3_client.delete_objects(
                    Bucket=actual_bucket_name, Delete={"Objects": objects_to_delete}  # type: ignore[typeddict-item]
                )
                total_deleted += len(objects_to_delete)

        logger.info(
            f"Cleaned up {total_deleted} test objects from {backend_config['backend_name']}"
        )
    except Exception as e:
        logger.warning(f"Failed to cleanup test objects: {e}")
class TestS3BackedFileStore:
"""Test suite for S3BackedFileStore using real S3-compatible storage (MinIO or AWS S3)"""
def test_store_initialization(self, file_store: S3BackedFileStore) -> None:
    """Verify the fixture-initialized store has a reachable bucket."""
    # The fixture already ran initialize(); confirm the bucket name and
    # that the bucket is actually reachable
    bucket = file_store._get_bucket_name()
    assert bucket.startswith(TEST_BUCKET_NAME)  # Should be backend-specific

    # Listing must succeed without raising if the bucket exists
    file_store._get_s3_client().list_objects_v2(Bucket=bucket, MaxKeys=1)
def test_save_and_read_text_file(self, file_store: S3BackedFileStore) -> None:
    """Round-trip a UTF-8 text file and verify its database record."""
    text_body = "This is a test text file content.\nWith multiple lines."
    name = f"{uuid.uuid4()}.txt"
    label = "Test Text File"
    mime = "text/plain"
    origin = FileOrigin.OTHER

    # Save the file
    saved_id = file_store.save_file(
        content=BytesIO(text_body.encode("utf-8")),
        display_name=label,
        file_origin=origin,
        file_type=mime,
        file_id=name,
    )
    assert saved_id == name

    # Read it back and compare the decoded content
    assert file_store.read_file(name).read().decode("utf-8") == text_body

    # Verify the database-side record
    record = file_store.read_file_record(name)
    assert record.file_id == name
    assert record.display_name == label
    assert record.file_origin == origin
    assert record.file_type == mime
    # Bucket may be backend-specific, so compare to the store's own value
    assert record.bucket_name == file_store._get_bucket_name()
    # Object keys are tenant-scoped under the store's prefix
    assert record.object_key == f"{file_store._s3_prefix}/{TEST_TENANT_ID}/{name}"
def test_save_and_read_binary_file(self, file_store: S3BackedFileStore) -> None:
    """Round-trip an arbitrary binary payload unchanged."""
    payload = bytes(range(256))  # every possible byte value, 0-255
    name = f"{uuid.uuid4()}.bin"

    # Save the file
    saved_id = file_store.save_file(
        content=BytesIO(payload),
        display_name="Test Binary File",
        file_origin=FileOrigin.CONNECTOR,
        file_type="application/octet-stream",
        file_id=name,
    )
    assert saved_id == name

    # Read it back and compare byte-for-byte
    assert file_store.read_file(name).read() == payload
def test_save_with_metadata(self, file_store: S3BackedFileStore) -> None:
    """Metadata passed at save time must be persisted on the DB record."""
    json_file_id = f"{uuid.uuid4()}.json"
    body = '{"key": "value", "number": 42}'
    extra_metadata: Dict[str, Any] = {
        "source": "test_suite",
        "version": "1.0",
        "tags": ["test", "json"],
        "size": len(body),
    }

    saved_id = file_store.save_file(
        content=BytesIO(body.encode("utf-8")),
        display_name="Test Metadata File",
        file_origin=FileOrigin.CHAT_UPLOAD,
        file_type="application/json",
        file_metadata=extra_metadata,
        file_id=json_file_id,
    )
    assert saved_id == json_file_id

    # The stored record carries the metadata unchanged
    assert file_store.read_file_record(json_file_id).file_metadata == extra_metadata
def test_has_file(self, file_store: S3BackedFileStore) -> None:
    """has_file matches only on the exact (id, origin, type) triple."""
    target_id = f"{uuid.uuid4()}.txt"
    origin = FileOrigin.OTHER
    mime = "text/plain"

    # Nothing saved yet -> must report absent
    assert not file_store.has_file(
        file_id=target_id,
        file_origin=origin,
        file_type=mime,
    )

    saved_id = file_store.save_file(
        content=BytesIO(b"Content for has_file test"),
        display_name="Test Has File",
        file_origin=origin,
        file_type=mime,
        file_id=target_id,
    )
    assert saved_id == target_id

    # Exact match -> present
    assert file_store.has_file(
        file_id=target_id,
        file_origin=origin,
        file_type=mime,
    )
    # Mismatched origin -> treated as absent
    assert not file_store.has_file(
        file_id=target_id,
        file_origin=FileOrigin.CONNECTOR,
        file_type=mime,
    )
    # Mismatched type -> treated as absent
    assert not file_store.has_file(
        file_id=target_id,
        file_origin=origin,
        file_type="application/pdf",
    )
def test_read_file_with_tempfile(self, file_store: S3BackedFileStore) -> None:
    """read_file(use_tempfile=True) spills to a temp file with identical content."""
    tmp_test_id = f"{uuid.uuid4()}.txt"
    body = "Content for temporary file test"

    saved_id = file_store.save_file(
        content=BytesIO(body.encode("utf-8")),
        display_name="Test Temp File",
        file_origin=FileOrigin.OTHER,
        file_type="text/plain",
        file_id=tmp_test_id,
    )
    assert saved_id == tmp_test_id

    # Read via the temp-file path and rewind before consuming
    spilled = file_store.read_file(tmp_test_id, use_tempfile=True)
    spilled.seek(0)
    raw = spilled.read()
    # The spilled handle may yield bytes or str depending on its mode
    decoded = raw.decode("utf-8") if isinstance(raw, bytes) else str(raw)
    assert decoded == body

    # Best-effort cleanup of the on-disk temp file
    spilled.close()
    if hasattr(spilled, "name"):
        try:
            os.unlink(spilled.name)
        except (OSError, AttributeError):
            pass
def test_delete_file(self, file_store: S3BackedFileStore) -> None:
    """Deleting a file removes it and makes subsequent reads fail loudly."""
    doomed_id = f"{uuid.uuid4()}.txt"
    origin = FileOrigin.OTHER
    mime = "text/plain"

    saved_id = file_store.save_file(
        content=BytesIO(b"Content for delete test"),
        display_name="Test Delete File",
        file_origin=origin,
        file_type=mime,
        file_id=doomed_id,
    )
    assert saved_id == doomed_id
    # Sanity check: present before deletion
    assert file_store.has_file(
        file_id=doomed_id,
        file_origin=origin,
        file_type=mime,
    )

    file_store.delete_file(doomed_id)

    # Gone from existence checks...
    assert not file_store.has_file(
        file_id=doomed_id,
        file_origin=origin,
        file_type=mime,
    )
    # ...and reads must raise rather than return stale data
    with pytest.raises(RuntimeError, match="does not exist or was deleted"):
        file_store.read_file(doomed_id)
def test_get_file_with_mime_type(self, file_store: S3BackedFileStore) -> None:
    """get_file_with_mime_type returns the data plus a detected mime type."""
    mime_test_id = f"{uuid.uuid4()}.txt"
    body = "This is a plain text file"

    saved_id = file_store.save_file(
        content=BytesIO(body.encode("utf-8")),
        display_name="Test MIME Type",
        file_origin=FileOrigin.OTHER,
        file_type="text/plain",
        file_id=mime_test_id,
    )
    assert saved_id == mime_test_id

    result = file_store.get_file_with_mime_type(mime_test_id)
    assert result is not None
    assert result.data.decode("utf-8") == body
    # Detection may disagree with the stored type, but must yield something
    assert result.mime_type is not None
def test_file_overwrite(self, file_store: S3BackedFileStore) -> None:
    """Saving under an existing id replaces the stored content."""
    shared_id = f"{uuid.uuid4()}.txt"
    first_body = "Original content"
    second_body = "New content after overwrite"
    # Identical save parameters for both writes, differing only in content
    common_kwargs: Dict[str, Any] = {
        "display_name": "Test Overwrite",
        "file_origin": FileOrigin.OTHER,
        "file_type": "text/plain",
        "file_id": shared_id,
    }

    # First write
    assert (
        file_store.save_file(
            content=BytesIO(first_body.encode("utf-8")), **common_kwargs
        )
        == shared_id
    )
    assert file_store.read_file(shared_id).read().decode("utf-8") == first_body

    # Second write with the same id overwrites the first
    assert (
        file_store.save_file(
            content=BytesIO(second_body.encode("utf-8")), **common_kwargs
        )
        == shared_id
    )
    assert file_store.read_file(shared_id).read().decode("utf-8") == second_body
def test_large_file_handling(self, file_store: S3BackedFileStore) -> None:
    """A 1MB payload must survive the round trip intact."""
    big_id = f"{uuid.uuid4()}.bin"
    one_mb = 1024 * 1024
    payload = b"A" * one_mb

    assert (
        file_store.save_file(
            content=BytesIO(payload),
            display_name="Test Large File",
            file_origin=FileOrigin.CONNECTOR,
            file_type="application/octet-stream",
            file_id=big_id,
        )
        == big_id
    )

    round_tripped = file_store.read_file(big_id).read()
    # Check size first for a clearer failure, then the full content
    assert len(round_tripped) == one_mb
    assert round_tripped == payload
def test_error_handling_nonexistent_file(
    self, file_store: S3BackedFileStore
) -> None:
    """Reads of unknown ids raise; get_file_with_mime_type returns None."""
    missing_id = f"{uuid.uuid4()}.txt"

    # Both read paths must raise the same descriptive error
    for reader in (file_store.read_file, file_store.read_file_record):
        with pytest.raises(RuntimeError, match="does not exist or was deleted"):
            reader(missing_id)

    # The mime-type helper is the lenient variant: None instead of raising
    assert file_store.get_file_with_mime_type(missing_id) is None
def test_error_handling_delete_nonexistent_file(
    self, file_store: S3BackedFileStore
) -> None:
    """Deleting an unknown id must raise rather than silently succeed."""
    with pytest.raises(RuntimeError, match="does not exist or was deleted"):
        file_store.delete_file(f"{uuid.uuid4()}.txt")
def test_multiple_files_different_origins(
    self, file_store: S3BackedFileStore
) -> None:
    """Files with distinct origins/types coexist and keep their properties."""
    fixtures: List[FileTestData] = [
        {
            "name": f"{uuid.uuid4()}.txt",
            "display_name": "Chat Upload File",
            "content": "Content from chat upload",
            "type": "text/plain",
            "origin": FileOrigin.CHAT_UPLOAD,
        },
        {
            "name": f"{uuid.uuid4()}.json",
            "display_name": "Connector File",
            "content": '{"from": "connector"}',
            "type": "application/json",
            "origin": FileOrigin.CONNECTOR,
        },
        {
            "name": f"{uuid.uuid4()}.csv",
            "display_name": "Generated Report",
            "content": "col1,col2\nval1,val2",
            "type": "text/csv",
            "origin": FileOrigin.GENERATED_REPORT,
        },
    ]

    # Persist every fixture
    for fixture in fixtures:
        assert (
            file_store.save_file(
                content=BytesIO(fixture["content"].encode("utf-8")),
                display_name=fixture["display_name"],
                file_origin=fixture["origin"],
                file_type=fixture["type"],
                file_id=fixture["name"],
            )
            == fixture["name"]
        )

    # Each one must be retrievable with the right content and record fields
    for fixture in fixtures:
        assert file_store.has_file(
            file_id=fixture["name"],
            file_origin=fixture["origin"],
            file_type=fixture["type"],
        )
        assert (
            file_store.read_file(fixture["name"]).read().decode("utf-8")
            == fixture["content"]
        )
        record = file_store.read_file_record(fixture["name"])
        assert record.file_origin == fixture["origin"]
        assert record.file_type == fixture["type"]
def test_special_characters_in_filenames(
    self, file_store: S3BackedFileStore
) -> None:
    """Filenames with S3-safe special characters round-trip correctly."""
    # S3 keys restrict some characters, so only reasonable ones are covered
    tricky_names: List[str] = [
        f"{uuid.uuid4()} with spaces.txt",
        f"{uuid.uuid4()}-with-dashes.txt",
        f"{uuid.uuid4()}_with_underscores.txt",
        f"{uuid.uuid4()}.with.dots.txt",
        f"{uuid.uuid4()}(with)parentheses.txt",
    ]

    for tricky_name in tricky_names:
        expected_body = f"Content for {tricky_name}"

        # Save under the tricky name
        assert (
            file_store.save_file(
                content=BytesIO(expected_body.encode("utf-8")),
                display_name=f"Display: {tricky_name}",
                file_origin=FileOrigin.OTHER,
                file_type="text/plain",
                file_id=tricky_name,
            )
            == tricky_name
        )
        # And read it back unchanged
        assert (
            file_store.read_file(tricky_name).read().decode("utf-8")
            == expected_body
        )
@pytest.mark.skipif(
    not os.environ.get("TEST_S3_NETWORK_ERRORS"),
    reason="Network error tests require TEST_S3_NETWORK_ERRORS environment variable",
)
def test_network_error_handling(self, file_store: S3BackedFileStore) -> None:
    """Simulated S3 networking failures must surface as ClientError."""
    # Opt-in only: needs network-fault simulation configured, hence the
    # env-var guard on the skipif decorator above.
    with patch.object(file_store, "_get_s3_client") as patched_client:
        # Make every upload attempt blow up with a networking error
        patched_client.return_value.put_object.side_effect = ClientError(
            error_response={
                "Error": {
                    "Code": "NetworkingError",
                    "Message": "Connection timeout",
                }
            },
            operation_name="PutObject",
        )

        with pytest.raises(ClientError):
            file_store.save_file(
                content=BytesIO(b"test content"),
                display_name="Network Error Test",
                file_origin=FileOrigin.OTHER,
                file_type="text/plain",
                file_id=f"{uuid.uuid4()}.txt",
            )
def test_database_transaction_rollback(self, file_store: S3BackedFileStore) -> None:
    """An S3 failure during save must roll back the pending DB record."""
    rollback_id = f"{uuid.uuid4()}.txt"

    with patch.object(file_store, "_get_s3_client") as patched_client:
        # Force the S3 upload to fail after the DB write has been staged
        patched_client.return_value.put_object.side_effect = ClientError(
            error_response={
                "Error": {"Code": "InternalError", "Message": "S3 internal error"}
            },
            operation_name="PutObject",
        )

        with pytest.raises(ClientError):
            file_store.save_file(
                content=BytesIO(b"Content for rollback test"),
                display_name="Test Rollback",
                file_origin=FileOrigin.OTHER,
                file_type="text/plain",
                file_id=rollback_id,
            )

    # The rollback must have discarded the staged DB record
    with pytest.raises(RuntimeError, match="does not exist or was deleted"):
        file_store.read_file_record(rollback_id)
def test_complex_jsonb_metadata(self, file_store: S3BackedFileStore) -> None:
    """Deeply nested, mixed-type metadata must survive JSONB storage unchanged."""
    jsonb_file_id = f"{uuid.uuid4()}.json"
    # Exercises nesting, arrays with objects, booleans, nulls, floats,
    # unicode/emoji, control characters, and a large embedded string
    nested_metadata: Dict[str, Any] = {
        "nested": {
            "array": [1, 2, 3, {"inner": "value"}],
            "boolean": True,
            "null_value": None,
            "number": 42.5,
        },
        "unicode": "测试数据 🚀",
        "special_chars": "Line 1\nLine 2\t\r\nSpecial: !@#$%^&*()",
        "large_text": "x" * 1000,  # large string inside JSONB
        "timestamps": {
            "created": "2024-01-01T00:00:00Z",
            "updated": "2024-01-02T12:30:45Z",
        },
    }

    assert (
        file_store.save_file(
            content=BytesIO(b'{"data": "test"}'),
            display_name="Test Complex Metadata",
            file_origin=FileOrigin.CONNECTOR,
            file_type="application/json",
            file_metadata=nested_metadata,
            file_id=jsonb_file_id,
        )
        == jsonb_file_id
    )

    stored = file_store.read_file_record(jsonb_file_id).file_metadata
    # Whole structure preserved exactly
    assert stored == nested_metadata

    # Spot-check individual fields through typed casts
    stored_dict = cast(Dict[str, Any], stored)
    nested = cast(Dict[str, Any], stored_dict["nested"])
    array_items = cast(List[Any], nested["array"])
    assert cast(Dict[str, Any], array_items[3])["inner"] == "value"
    assert stored_dict["unicode"] == "测试数据 🚀"
    assert nested["boolean"] is True
    assert nested["null_value"] is None
    assert len(cast(str, stored_dict["large_text"])) == 1000
def test_database_consistency_after_s3_failure(
    self, file_store: S3BackedFileStore
) -> None:
    """A failed overwrite must leave the previous record and content intact."""
    stable_id = f"{uuid.uuid4()}.txt"
    original_body = "Initial content"
    origin = FileOrigin.OTHER
    mime = "text/plain"
    display = "Test Consistency"

    # Seed a file successfully first
    assert (
        file_store.save_file(
            content=BytesIO(original_body.encode("utf-8")),
            display_name=display,
            file_origin=origin,
            file_type=mime,
            file_id=stable_id,
        )
        == stable_id
    )
    assert file_store.has_file(stable_id, origin, mime)
    baseline_record = file_store.read_file_record(stable_id)

    # Attempt an overwrite that fails at the S3 layer
    with patch.object(file_store, "_get_s3_client") as patched_client:
        patched_client.return_value.put_object.side_effect = ClientError(
            error_response={
                "Error": {
                    "Code": "ServiceUnavailable",
                    "Message": "Service temporarily unavailable",
                }
            },
            operation_name="PutObject",
        )

        with pytest.raises(ClientError):
            file_store.save_file(
                content=BytesIO(b"Updated content that should fail"),
                display_name=display,
                file_origin=origin,
                file_type=mime,
                file_id=stable_id,
            )

    # The DB record must be untouched by the failed update
    after_record = file_store.read_file_record(stable_id)
    assert after_record.file_id == baseline_record.file_id
    assert after_record.display_name == baseline_record.display_name
    assert after_record.bucket_name == baseline_record.bucket_name
    assert after_record.object_key == baseline_record.object_key

    # And the original content — not the failed update — is still readable
    assert file_store.read_file(stable_id).read().decode("utf-8") == original_body
def test_concurrent_file_operations(self, file_store: S3BackedFileStore) -> None:
    """Test handling of concurrent file operations on the same file.

    Spawns 10 workers on a 5-thread pool; each worker opens its own DB
    session and builds its own S3BackedFileStore (configured identically
    to the fixture's store) and saves a uniquely named file. All saves
    must succeed, and each file must be readable afterwards with exactly
    the content its worker wrote.
    """
    base_file_name: str = str(uuid.uuid4())
    file_type: str = "text/plain"
    file_origin: FileOrigin = FileOrigin.OTHER
    # Get current file store configuration to replicate in workers
    current_bucket_name = file_store._get_bucket_name()
    current_access_key = file_store._aws_access_key_id
    current_secret_key = file_store._aws_secret_access_key
    current_region = file_store._aws_region_name
    current_endpoint_url = file_store._s3_endpoint_url
    current_verify_ssl = file_store._s3_verify_ssl
    # Shared accumulators; list.append is atomic in CPython, so no extra
    # locking is needed for these cross-thread appends.
    results: List[Tuple[str, str]] = []
    errors: List[Tuple[int, str]] = []

    def save_file_worker(worker_id: int) -> bool:
        """Worker function to save a file with its own database session."""
        try:
            # Set up tenant context for this worker — contextvars are
            # per-thread, so each worker must set (and later reset) its own
            token = CURRENT_TENANT_ID_CONTEXTVAR.set(TEST_TENANT_ID)
            try:
                # Create a new database session for each worker to avoid conflicts
                with get_session_context_manager() as worker_session:
                    worker_file_store = S3BackedFileStore(
                        db_session=worker_session,
                        bucket_name=current_bucket_name,
                        aws_access_key_id=current_access_key,
                        aws_secret_access_key=current_secret_key,
                        aws_region_name=current_region,
                        s3_endpoint_url=current_endpoint_url,
                        s3_prefix=TEST_FILE_PREFIX,
                        s3_verify_ssl=current_verify_ssl,
                    )
                    file_name: str = f"{base_file_name}_{worker_id}.txt"
                    content: str = (
                        f"Content from worker {worker_id} at {time.time()}"
                    )
                    content_io: BytesIO = BytesIO(content.encode("utf-8"))
                    worker_file_store.save_file(
                        file_id=file_name,
                        content=content_io,
                        display_name=f"Worker {worker_id} File",
                        file_origin=file_origin,
                        file_type=file_type,
                    )
                    results.append((file_name, content))
                    return True
            finally:
                # Reset the tenant context after the worker completes
                CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
        except Exception as e:
            # Record failures instead of raising so other workers finish
            errors.append((worker_id, str(e)))
            return False

    # Run multiple concurrent file save operations
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(save_file_worker, i) for i in range(10)]
        for future in as_completed(futures):
            future.result()  # Wait for completion (re-raises worker crashes)
    # Verify all operations completed successfully
    assert len(errors) == 0, f"Concurrent operations had errors: {errors}"
    assert (
        len(results) == 10
    ), f"Expected 10 successful operations, got {len(results)}"
    # Verify all files were saved correctly
    for file_id, expected_content in results:
        # Check file exists
        assert file_store.has_file(
            file_id=file_id,
            file_origin=file_origin,
            file_type=file_type,
        )
        # Check content is correct
        read_content_io = file_store.read_file(file_id)
        actual_content: str = read_content_io.read().decode("utf-8")
        assert actual_content == expected_content

View File

@@ -233,10 +233,11 @@ class DocumentManager:
for doc_dict in retrieved_docs_dict:
doc_id = doc_dict["fields"]["document_id"]
doc_content = doc_dict["fields"]["content"]
image_file_name = doc_dict["fields"].get("image_file_name", None)
# still called `image_file_name` in Vespa for backwards compatibility
image_file_id = doc_dict["fields"].get("image_file_name", None)
final_docs.append(
SimpleTestDocument(
id=doc_id, content=doc_content, image_file_name=image_file_name
id=doc_id, content=doc_content, image_file_id=image_file_id
)
)

View File

@@ -8,6 +8,7 @@ from typing import Tuple
import requests
from onyx.file_store.models import FileDescriptor
from onyx.server.documents.models import FileUploadResponse
from tests.integration.common_utils.constants import API_SERVER_URL
from tests.integration.common_utils.constants import GENERAL_HEADERS
from tests.integration.common_utils.test_models import DATestUser
@@ -70,7 +71,7 @@ class FileManager:
file_name: str,
user_performing_action: DATestUser,
content_type: str = "application/octet-stream",
) -> dict:
) -> FileUploadResponse:
# Read the file content
with open(file_path, "rb") as f:
file_content = f.read()
@@ -105,4 +106,4 @@ class FileManager:
)
response_json = response.json()
return response_json
return FileUploadResponse(**response_json)

View File

@@ -76,7 +76,7 @@ class DATestConnector(BaseModel):
class SimpleTestDocument(BaseModel):
id: str
content: str
image_file_name: str | None = None
image_file_id: str | None = None
class DATestCCPair(BaseModel):

View File

@@ -15,7 +15,6 @@ from tests.integration.common_utils.managers.document import DocumentManager
from tests.integration.common_utils.managers.file import FileManager
from tests.integration.common_utils.managers.llm_provider import LLMProviderManager
from tests.integration.common_utils.managers.settings import SettingsManager
from tests.integration.common_utils.managers.user import UserManager
from tests.integration.common_utils.test_models import DATestSettings
from tests.integration.common_utils.test_models import DATestUser
from tests.integration.common_utils.vespa import vespa_fixture
@@ -26,13 +25,9 @@ FILE_PATH = "tests/integration/common_utils/test_files"
def test_image_indexing(
reset: None,
admin_user: DATestUser,
vespa_client: vespa_fixture,
) -> None:
# Creating an admin user (first user created is automatically an admin)
admin_user: DATestUser = UserManager.create(
email="admin@onyx-test.com",
)
os.makedirs(FILE_PATH, exist_ok=True)
test_file_path = os.path.join(FILE_PATH, FILE_NAME)
@@ -54,7 +49,7 @@ def test_image_indexing(
user_performing_action=admin_user,
)
file_paths = upload_response.get("file_paths", [])
file_paths = upload_response.file_paths
if not file_paths:
pytest.fail("File upload failed - no file paths returned")
@@ -95,23 +90,23 @@ def test_image_indexing(
CCPairManager.wait_for_indexing_completion(
cc_pair=cc_pair,
after=datetime.now(timezone.utc),
timeout=180,
user_performing_action=admin_user,
)
with get_session_context_manager() as db_session:
# really gets the chunks from Vespa, which is why there are two;
# one for the raw text and one for the summarized image.
documents = DocumentManager.fetch_documents_for_cc_pair(
cc_pair_id=cc_pair.id,
db_session=db_session,
vespa_client=vespa_client,
)
# Ensure we indexed an image from the sample.pdf file
has_sample_pdf_image = False
for doc in documents:
if doc.image_file_name and FILE_NAME in doc.image_file_name:
has_sample_pdf_image = True
# Assert that at least one document has an image file name containing "sample.pdf"
assert (
has_sample_pdf_image
), "No document found with an image file name containing 'sample.pdf'"
assert len(documents) == 2
for document in documents:
if "These are Johns dogs" in document.content:
assert document.image_file_id is None
else:
assert document.image_file_id is not None
assert file_paths[0] in document.image_file_id

View File

@@ -53,10 +53,10 @@ def test_zip_metadata_handling(
content_type="application/zip",
)
file_paths = upload_response.get("file_paths", [])
file_paths = upload_response.file_paths
assert file_paths, "File upload failed - no file paths returned"
if has_metadata:
metadata = upload_response.get("zip_metadata", {})
metadata = upload_response.zip_metadata
assert metadata, "Metadata should be present"
else:
metadata = {}

View File

@@ -84,13 +84,13 @@ def test_mock_connector_basic_flow(
# Verify results
with get_session_context_manager() as db_session:
documents = DocumentManager.fetch_documents_for_cc_pair(
chunks = DocumentManager.fetch_documents_for_cc_pair(
cc_pair_id=cc_pair.id,
db_session=db_session,
vespa_client=vespa_client,
)
assert len(documents) == 1
assert documents[0].id == test_doc.id
assert len(chunks) == 1
assert chunks[0].id == test_doc.id
errors = IndexAttemptManager.get_index_attempt_errors_for_cc_pair(
cc_pair_id=cc_pair.id,

View File

@@ -34,7 +34,7 @@ def create_test_chunk(
metadata={},
match_highlights=[],
updated_at=datetime.now(),
image_file_name=None,
image_file_id=None,
doc_summary="",
chunk_context="",
)

View File

@@ -0,0 +1,328 @@
import datetime
from collections.abc import Generator
from io import BytesIO
from typing import Any
from unittest.mock import MagicMock
from unittest.mock import Mock
from unittest.mock import patch
import pytest
from sqlalchemy import create_engine
from sqlalchemy import DateTime
from sqlalchemy import Enum
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import func
from onyx.configs.constants import FileOrigin
from onyx.file_store.file_store import get_default_file_store
from onyx.file_store.file_store import S3BackedFileStore
@pytest.fixture
def db_session() -> Generator[Session, None, None]:
"""Create an in-memory SQLite database for testing"""
# Create test-specific base and model that matches the actual FileRecord structure
# but uses SQLite-compatible types
TestDBBase = declarative_base()
class FileRecord(TestDBBase):
__tablename__: str = "file_record"
# Internal file ID, must be unique across all files
file_id: Mapped[str] = mapped_column(String, primary_key=True)
display_name: Mapped[str | None] = mapped_column(String, nullable=True)
file_origin: Mapped[FileOrigin] = mapped_column(
Enum(FileOrigin, native_enum=False), nullable=False
)
file_type: Mapped[str] = mapped_column(String, default="text/plain")
# External storage support (S3, MinIO, Azure Blob, etc.)
bucket_name: Mapped[str] = mapped_column(String, nullable=False)
object_key: Mapped[str] = mapped_column(String, nullable=False)
# Timestamps for external storage
created_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
updated_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
engine = create_engine("sqlite:///:memory:")
TestDBBase.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
yield session
session.close()
@pytest.fixture
def sample_content() -> bytes:
"""Sample file content for testing"""
return b"This is a test file content"
@pytest.fixture
def sample_file_io(sample_content: bytes) -> BytesIO:
"""Sample file IO object for testing"""
return BytesIO(sample_content)
class TestExternalStorageFileStore:
"""Test external storage file store functionality (S3-compatible)"""
def test_get_default_file_store_s3(self, db_session: Session) -> None:
"""Test that external storage file store is returned"""
file_store = get_default_file_store(db_session)
assert isinstance(file_store, S3BackedFileStore)
def test_s3_client_initialization_with_credentials(
self, db_session: Session
) -> None:
"""Test S3 client initialization with explicit credentials"""
with patch("boto3.client") as mock_boto3:
file_store = S3BackedFileStore(
db_session,
bucket_name="test-bucket",
aws_access_key_id="test-key",
aws_secret_access_key="test-secret",
aws_region_name="us-west-2",
s3_endpoint_url=None,
)
file_store._get_s3_client()
# Verify boto3 client was called with the expected arguments
mock_boto3.assert_called_once()
call_kwargs: dict[str, Any] = mock_boto3.call_args[1]
assert call_kwargs["service_name"] == "s3"
assert call_kwargs["aws_access_key_id"] == "test-key"
assert call_kwargs["aws_secret_access_key"] == "test-secret"
assert call_kwargs["region_name"] == "us-west-2"
def test_s3_client_initialization_with_iam_role(self, db_session: Session) -> None:
"""Test S3 client initialization with IAM role (no explicit credentials)"""
with patch("boto3.client") as mock_boto3:
file_store = S3BackedFileStore(
db_session,
bucket_name="test-bucket",
aws_access_key_id=None,
aws_secret_access_key=None,
aws_region_name="us-west-2",
s3_endpoint_url=None,
)
file_store._get_s3_client()
# Verify boto3 client was called with the expected arguments
mock_boto3.assert_called_once()
call_kwargs: dict[str, Any] = mock_boto3.call_args[1]
assert call_kwargs["service_name"] == "s3"
assert call_kwargs["region_name"] == "us-west-2"
# Should not have explicit credentials
assert "aws_access_key_id" not in call_kwargs
assert "aws_secret_access_key" not in call_kwargs
def test_s3_bucket_name_configuration(self, db_session: Session) -> None:
"""Test S3 bucket name configuration"""
with patch(
"onyx.file_store.file_store.S3_FILE_STORE_BUCKET_NAME", "my-test-bucket"
):
file_store = S3BackedFileStore(db_session, bucket_name="my-test-bucket")
bucket_name: str = file_store._get_bucket_name()
assert bucket_name == "my-test-bucket"
def test_s3_key_generation_default_prefix(self, db_session: Session) -> None:
"""Test S3 key generation with default prefix"""
with (
patch("onyx.file_store.file_store.S3_FILE_STORE_PREFIX", "onyx-files"),
patch(
"onyx.file_store.file_store.get_current_tenant_id",
return_value="test-tenant",
),
):
file_store = S3BackedFileStore(db_session, bucket_name="test-bucket")
s3_key: str = file_store._get_s3_key("test-file.txt")
assert s3_key == "onyx-files/test-tenant/test-file.txt"
def test_s3_key_generation_custom_prefix(self, db_session: Session) -> None:
"""Test S3 key generation with custom prefix"""
with (
patch("onyx.file_store.file_store.S3_FILE_STORE_PREFIX", "custom-prefix"),
patch(
"onyx.file_store.file_store.get_current_tenant_id",
return_value="test-tenant",
),
):
file_store = S3BackedFileStore(
db_session, bucket_name="test-bucket", s3_prefix="custom-prefix"
)
s3_key: str = file_store._get_s3_key("test-file.txt")
assert s3_key == "custom-prefix/test-tenant/test-file.txt"
def test_s3_key_generation_with_different_tenant_ids(
self, db_session: Session
) -> None:
"""Test S3 key generation with different tenant IDs"""
with patch("onyx.file_store.file_store.S3_FILE_STORE_PREFIX", "onyx-files"):
file_store = S3BackedFileStore(db_session, bucket_name="test-bucket")
# Test with tenant ID "tenant-1"
with patch(
"onyx.file_store.file_store.get_current_tenant_id",
return_value="tenant-1",
):
s3_key = file_store._get_s3_key("document.pdf")
assert s3_key == "onyx-files/tenant-1/document.pdf"
# Test with tenant ID "tenant-2"
with patch(
"onyx.file_store.file_store.get_current_tenant_id",
return_value="tenant-2",
):
s3_key = file_store._get_s3_key("document.pdf")
assert s3_key == "onyx-files/tenant-2/document.pdf"
# Test with default tenant (public)
with patch(
"onyx.file_store.file_store.get_current_tenant_id",
return_value="public",
):
s3_key = file_store._get_s3_key("document.pdf")
assert s3_key == "onyx-files/public/document.pdf"
@patch("boto3.client")
def test_s3_save_file_mock(
self, mock_boto3: MagicMock, db_session: Session, sample_file_io: BytesIO
) -> None:
"""Test S3 file saving with mocked S3 client"""
# Setup S3 mock
mock_s3_client: Mock = Mock()
mock_boto3.return_value = mock_s3_client
# Create a mock database session
mock_db_session: Mock = Mock()
mock_db_session.commit = Mock()
mock_db_session.rollback = Mock()
with (
patch(
"onyx.file_store.file_store.S3_FILE_STORE_BUCKET_NAME", "test-bucket"
),
patch("onyx.file_store.file_store.S3_FILE_STORE_PREFIX", "onyx-files"),
patch("onyx.file_store.file_store.S3_AWS_ACCESS_KEY_ID", "test-key"),
patch("onyx.file_store.file_store.S3_AWS_SECRET_ACCESS_KEY", "test-secret"),
):
# Mock the database operation to avoid SQLAlchemy issues
with patch("onyx.db.file_record.upsert_filerecord") as mock_upsert:
mock_upsert.return_value = Mock()
file_store = S3BackedFileStore(
mock_db_session, bucket_name="test-bucket"
)
# This should not raise an exception
file_store.save_file(
file_id="test-file.txt",
content=sample_file_io,
display_name="Test File",
file_origin=FileOrigin.OTHER,
file_type="text/plain",
)
# Verify S3 client was called correctly
mock_s3_client.put_object.assert_called_once()
call_args = mock_s3_client.put_object.call_args
assert call_args[1]["Bucket"] == "test-bucket"
assert call_args[1]["Key"] == "onyx-files/public/test-file.txt"
assert call_args[1]["ContentType"] == "text/plain"
def test_minio_client_initialization(self, db_session: Session) -> None:
    """S3 client init must pass the MinIO-specific settings through to boto3."""
    with patch("boto3.client") as boto3_mock, patch("urllib3.disable_warnings"):
        store = S3BackedFileStore(
            db_session,
            bucket_name="test-bucket",
            aws_access_key_id="minioadmin",
            aws_secret_access_key="minioadmin",
            aws_region_name="us-east-1",
            s3_endpoint_url="http://localhost:9000",
            s3_verify_ssl=False,
        )
        store._get_s3_client()

        # boto3.client must be invoked once with the MinIO settings.
        boto3_mock.assert_called_once()
        kwargs: dict[str, Any] = boto3_mock.call_args[1]
        assert kwargs["service_name"] == "s3"
        assert kwargs["endpoint_url"] == "http://localhost:9000"
        assert kwargs["aws_access_key_id"] == "minioadmin"
        assert kwargs["aws_secret_access_key"] == "minioadmin"
        assert kwargs["region_name"] == "us-east-1"
        assert kwargs["verify"] is False

        # MinIO requires sigv4 signing and path-style addressing.
        cfg = kwargs["config"]
        assert cfg.signature_version == "s3v4"
        assert cfg.s3["addressing_style"] == "path"
def test_minio_ssl_verification_enabled(self, db_session: Session) -> None:
    """With SSL verification on, boto3 must never receive an explicit verify=False."""
    with patch("boto3.client") as boto3_mock:
        store = S3BackedFileStore(
            db_session,
            bucket_name="test-bucket",
            aws_access_key_id="test-key",
            aws_secret_access_key="test-secret",
            s3_endpoint_url="https://minio.example.com",
            s3_verify_ssl=True,
        )
        store._get_s3_client()

        kwargs: dict[str, Any] = boto3_mock.call_args[1]
        # "verify" may be omitted entirely (boto3 defaults to True), but it
        # must never be explicitly disabled.
        assert kwargs.get("verify", True) is not False
        assert kwargs["endpoint_url"] == "https://minio.example.com"
def test_aws_s3_without_endpoint_url(self, db_session: Session) -> None:
    """Plain AWS S3 must not receive a custom endpoint URL or Config object."""
    with patch("boto3.client") as boto3_mock:
        store = S3BackedFileStore(
            db_session,
            bucket_name="test-bucket",
            aws_access_key_id="test-key",
            aws_secret_access_key="test-secret",
            aws_region_name="us-west-2",
            s3_endpoint_url=None,
        )
        store._get_s3_client()

        kwargs: dict[str, Any] = boto3_mock.call_args[1]
        # No MinIO-style endpoint override for vanilla AWS.
        assert "endpoint_url" not in kwargs
        assert kwargs["service_name"] == "s3"
        assert kwargs["region_name"] == "us-west-2"
        # Likewise, no custom botocore Config is needed.
        assert "config" not in kwargs
class TestFileStoreInterface:
    """Tests for the generic file store factory/interface."""

    def test_file_store_always_external_storage(self, db_session: Session) -> None:
        """get_default_file_store must always hand back an S3-backed store."""
        # Regardless of environment, the default store is external (S3-compatible).
        store = get_default_file_store(db_session)
        assert isinstance(store, S3BackedFileStore)

View File

@@ -65,7 +65,7 @@ def create_test_inference_chunk(
section_continuation=False,
document_id=document_id,
source_type=DocumentSource.FILE,
image_file_name=None,
image_file_id=None,
title=title,
semantic_identifier=semantic_identifier,
boost=1,

View File

@@ -85,7 +85,7 @@ def mock_inference_sections() -> list[InferenceSection]:
updated_at=datetime(2023, 1, 1),
source_links={0: "https://example.com/doc1"},
match_highlights=[],
image_file_name=None,
image_file_id=None,
doc_summary="",
chunk_context="",
),
@@ -110,7 +110,7 @@ def mock_inference_sections() -> list[InferenceSection]:
updated_at=datetime(2023, 1, 2),
source_links={0: "https://example.com/doc2"},
match_highlights=[],
image_file_name=None,
image_file_id=None,
doc_summary="",
chunk_context="",
),

View File

@@ -150,7 +150,7 @@ def test_fuzzy_match_quotes_to_docs() -> None:
metadata={},
match_highlights=[],
updated_at=None,
image_file_name=None,
image_file_id=None,
doc_summary="",
chunk_context="",
)
@@ -171,7 +171,7 @@ def test_fuzzy_match_quotes_to_docs() -> None:
metadata={},
match_highlights=[],
updated_at=None,
image_file_name=None,
image_file_id=None,
doc_summary="",
chunk_context="",
)

View File

@@ -38,7 +38,7 @@ def create_inference_chunk(
metadata={},
match_highlights=[],
updated_at=None,
image_file_name=None,
image_file_id=None,
doc_summary="",
chunk_context="",
)

View File

@@ -38,7 +38,7 @@ class TestPostQueryChunkCensoring:
doc_summary="doc1 summary",
chunk_context="doc1 context",
updated_at=None,
image_file_name=None,
image_file_id=None,
source_links={},
section_continuation=False,
blurb="chunk1",
@@ -59,7 +59,7 @@ class TestPostQueryChunkCensoring:
doc_summary="doc2 summary",
chunk_context="doc2 context",
updated_at=None,
image_file_name=None,
image_file_id=None,
source_links={},
section_continuation=False,
blurb="chunk2",
@@ -80,7 +80,7 @@ class TestPostQueryChunkCensoring:
doc_summary="doc3 summary",
chunk_context="doc3 context",
updated_at=None,
image_file_name=None,
image_file_id=None,
source_links={},
section_continuation=False,
blurb="chunk3",
@@ -101,7 +101,7 @@ class TestPostQueryChunkCensoring:
doc_summary="doc4 summary",
chunk_context="doc4 context",
updated_at=None,
image_file_name=None,
image_file_id=None,
source_links={},
section_continuation=False,
blurb="chunk4",

View File

@@ -68,7 +68,7 @@ def test_default_indexing_embedder_embed_chunks(
mini_chunk_texts=None,
large_chunk_reference_ids=[],
large_chunk_id=None,
image_file_name=None,
image_file_id=None,
chunk_context=chunk_context,
doc_summary=doc_summary,
contextual_rag_reserved_tokens=200,

View File

@@ -172,7 +172,7 @@ def create_test_chunk(
large_chunk_reference_ids=[],
embeddings=ChunkEmbedding(full_embedding=[], mini_chunk_embeddings=[]),
title_embedding=None,
image_file_name=None,
image_file_id=None,
chunk_context="",
doc_summary="",
contextual_rag_reserved_tokens=200,

View File

@@ -71,6 +71,11 @@ services:
- VESPA_HOST=index
- REDIS_HOST=cache
- WEB_DOMAIN=${WEB_DOMAIN:-} # For frontend redirect auth purpose
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
# Don't change the NLP model configs unless you know what you're doing
- EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-}
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
@@ -191,6 +196,11 @@ services:
- VESPA_HOST=index
- REDIS_HOST=cache
- WEB_DOMAIN=${WEB_DOMAIN:-} # For frontend redirect auth purpose for OAuth2 connectors
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
# Don't change the NLP model configs unless you know what you're doing
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
- DOC_EMBEDDING_DIM=${DOC_EMBEDDING_DIM:-}
@@ -448,9 +458,29 @@ services:
# persistence. explicitly setting save and appendonly forces ephemeral behavior.
command: redis-server --save "" --appendonly no
minio:
image: minio/minio:latest
restart: always
ports:
- "9004:9000"
- "9005:9001"
environment:
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
volumes:
- minio_data:/data
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
volumes:
db_volume:
vespa_volume: # Created by the container itself
minio_data:
model_cache_huggingface:
indexing_huggingface_model_cache:

View File

@@ -58,6 +58,11 @@ services:
- VESPA_HOST=index
- REDIS_HOST=cache
- WEB_DOMAIN=${WEB_DOMAIN:-} # For frontend redirect auth purpose
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
# Don't change the NLP model configs unless you know what you're doing
- EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-}
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
@@ -154,6 +159,11 @@ services:
- VESPA_HOST=index
- REDIS_HOST=cache
- WEB_DOMAIN=${WEB_DOMAIN:-} # For frontend redirect auth purpose for OAuth2 connectors
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
# Don't change the NLP model configs unless you know what you're doing
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
- DOC_EMBEDDING_DIM=${DOC_EMBEDDING_DIM:-}
@@ -385,6 +395,25 @@ services:
/bin/sh -c "dos2unix /etc/nginx/conf.d/run-nginx.sh
&& /etc/nginx/conf.d/run-nginx.sh app.conf.template.dev"
minio:
image: minio/minio:latest
restart: always
ports:
- "9004:9000"
- "9005:9001"
environment:
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
volumes:
- minio_data:/data
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
cache:
image: redis:7.4-alpine
restart: always
@@ -397,6 +426,7 @@ services:
volumes:
db_volume:
vespa_volume:
minio_data:
# Created by the container itself
model_cache_huggingface:
indexing_huggingface_model_cache:

View File

@@ -70,6 +70,11 @@ services:
- VESPA_HOST=index
- REDIS_HOST=cache
- WEB_DOMAIN=${WEB_DOMAIN:-}
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
# Don't change the NLP model configs unless you know what you're doing
- EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-}
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
@@ -173,6 +178,11 @@ services:
- VESPA_HOST=index
- REDIS_HOST=cache
- WEB_DOMAIN=${WEB_DOMAIN:-}
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
# Don't change the NLP model configs unless you know what you're doing
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
- DOC_EMBEDDING_DIM=${DOC_EMBEDDING_DIM:-}
@@ -412,6 +422,25 @@ services:
/bin/sh -c "dos2unix /etc/nginx/conf.d/run-nginx.sh
&& /etc/nginx/conf.d/run-nginx.sh app.conf.template.dev"
minio:
image: minio/minio:latest
restart: always
ports:
- "9004:9000"
- "9005:9001"
environment:
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
volumes:
- minio_data:/data
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
cache:
image: redis:7.4-alpine
restart: always
@@ -424,6 +453,7 @@ services:
volumes:
db_volume:
vespa_volume: # Created by the container itself
minio_data:
model_cache_huggingface:
indexing_huggingface_model_cache:

View File

@@ -22,6 +22,10 @@ services:
- VESPA_HOST=index
- REDIS_HOST=cache
- MODEL_SERVER_HOST=${MODEL_SERVER_HOST:-inference_model_server}
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
extra_hosts:
- "host.docker.internal:host-gateway"
logging:
@@ -52,6 +56,10 @@ services:
- REDIS_HOST=cache
- MODEL_SERVER_HOST=${MODEL_SERVER_HOST:-inference_model_server}
- INDEXING_MODEL_SERVER_HOST=${INDEXING_MODEL_SERVER_HOST:-indexing_model_server}
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
extra_hosts:
- "host.docker.internal:host-gateway"
logging:
@@ -221,6 +229,22 @@ services:
max-file: "6"
entrypoint: "/bin/sh -c 'trap exit TERM; while :; do certbot renew; sleep 12h & wait $${!}; done;'"
minio:
image: minio/minio:latest
restart: always
environment:
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
volumes:
- minio_data:/data
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
cache:
image: redis:7.4-alpine
restart: always
@@ -233,6 +257,7 @@ services:
volumes:
db_volume:
vespa_volume:
minio_data:
# Created by the container itself
model_cache_huggingface:
indexing_huggingface_model_cache:

View File

@@ -26,6 +26,10 @@ services:
- AWS_REGION_NAME=${AWS_REGION_NAME-}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID-}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY-}
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
# Uncomment the line below to use if IAM_AUTH is true and you are using iam auth for postgres
# volumes:
# - ./bundle.pem:/app/bundle.pem:ro
@@ -67,6 +71,10 @@ services:
- AWS_REGION_NAME=${AWS_REGION_NAME-}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID-}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY-}
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
# Uncomment the line below to use if IAM_AUTH is true and you are using iam auth for postgres
# volumes:
# - ./bundle.pem:/app/bundle.pem:ro
@@ -227,6 +235,22 @@ services:
env_file:
- .env.nginx
minio:
image: minio/minio:latest
restart: always
environment:
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
volumes:
- minio_data:/data
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
cache:
image: redis:7.4-alpine
restart: always
@@ -239,6 +263,7 @@ services:
volumes:
db_volume:
vespa_volume:
minio_data:
# Created by the container itself
model_cache_huggingface:
indexing_huggingface_model_cache:

View File

@@ -27,6 +27,10 @@ services:
- AWS_REGION_NAME=${AWS_REGION_NAME-}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID-}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY-}
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
# Uncomment the line below to use if IAM_AUTH is true and you are using iam auth for postgres
# volumes:
# - ./bundle.pem:/app/bundle.pem:ro
@@ -71,6 +75,10 @@ services:
- AWS_REGION_NAME=${AWS_REGION_NAME-}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID-}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY-}
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
# Uncomment the line below to use if IAM_AUTH is true and you are using iam auth for postgres
# volumes:
# - ./bundle.pem:/app/bundle.pem:ro
@@ -257,6 +265,22 @@ services:
max-file: "6"
entrypoint: "/bin/sh -c 'trap exit TERM; while :; do certbot renew; sleep 12h & wait $${!}; done;'"
minio:
image: minio/minio:latest
restart: always
environment:
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
volumes:
- minio_data:/data
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
cache:
image: redis:7.4-alpine
restart: always
@@ -269,6 +293,7 @@ services:
volumes:
db_volume:
vespa_volume:
minio_data:
# Created by the container itself
model_cache_huggingface:
indexing_huggingface_model_cache:

View File

@@ -74,3 +74,13 @@ services:
# reservations:
# cpus: ${POSTGRES_CPU_RESERVATION}
# memory: ${POSTGRES_MEM_RESERVATION}
# minio:
# deploy:
# resources:
# limits:
# cpus: ${MINIO_CPU_LIMIT:-1}
# memory: ${MINIO_MEM_LIMIT:-1g}
# reservations:
# cpus: ${MINIO_CPU_RESERVATION}
# memory: ${MINIO_MEM_RESERVATION}

View File

@@ -26,6 +26,11 @@ services:
- MODEL_SERVER_PORT=${MODEL_SERVER_PORT:-}
- ENV_SEED_CONFIGURATION=${ENV_SEED_CONFIGURATION:-}
- ENABLE_PAID_ENTERPRISE_EDITION_FEATURES=True
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
# To enable the LLM for testing, update the value below
# NOTE: this is disabled by default since this is a high volume eval that can be costly
- DISABLE_GENERATIVE_AI=${DISABLE_GENERATIVE_AI:-true}
@@ -60,6 +65,11 @@ services:
- INDEXING_MODEL_SERVER_HOST=${INDEXING_MODEL_SERVER_HOST:-indexing_model_server}
- ENV_SEED_CONFIGURATION=${ENV_SEED_CONFIGURATION:-}
- ENABLE_PAID_ENTERPRISE_EDITION_FEATURES=True
# MinIO configuration
- S3_ENDPOINT_URL=${S3_ENDPOINT_URL:-http://minio:9000}
- S3_AWS_ACCESS_KEY_ID=${S3_AWS_ACCESS_KEY_ID:-minioadmin}
- S3_AWS_SECRET_ACCESS_KEY=${S3_AWS_SECRET_ACCESS_KEY:-minioadmin}
- S3_FILE_STORE_BUCKET_NAME=${S3_FILE_STORE_BUCKET_NAME:-}
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
@@ -206,6 +216,25 @@ services:
/bin/sh -c "dos2unix /etc/nginx/conf.d/run-nginx.sh
&& /etc/nginx/conf.d/run-nginx.sh app.conf.template.dev"
minio:
image: minio/minio:latest
restart: always
ports:
- "9004:9000"
- "9005:9001"
environment:
MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin}
MINIO_DEFAULT_BUCKETS: ${S3_FILE_STORE_BUCKET_NAME:-onyx-file-store-bucket}
volumes:
- minio_data:/data
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
cache:
image: redis:7.4-alpine
restart: always
@@ -229,3 +258,4 @@ volumes:
o: bind
device: ${DANSWER_VESPA_DATA_DIR:-./vespa_data}
log_store: # for logs that we don't want to lose on container restarts
minio_data:

View File

@@ -11,5 +11,8 @@ dependencies:
- name: redis
repository: https://charts.bitnami.com/bitnami
version: 20.1.0
digest: sha256:28a9f2bb1c85822679a6583e03eb5885a9a0dfb11d03d969b7d783436604c43d
generated: "2025-06-03T15:21:29.377034-07:00"
- name: minio
repository: oci://registry-1.docker.io/bitnamicharts
version: 17.0.4
digest: sha256:4c938cf9138e4ff6f5ecac5c044324d508ef2b0e1a23ba3f2bc089015cb40ff6
generated: "2025-06-16T18:53:19.63168-07:00"

View File

@@ -34,3 +34,7 @@ dependencies:
version: 20.1.0
repository: https://charts.bitnami.com/bitnami
condition: redis.enabled
- name: minio
version: 17.0.4
repository: oci://registry-1.docker.io/bitnamicharts
condition: minio.enabled

View File

@@ -14,3 +14,6 @@ data:
{{- range $key, $value := .Values.configMap }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- if .Values.minio.enabled }}
S3_ENDPOINT_URL: "http://{{ .Release.Name }}-minio:{{ .Values.minio.service.ports.api | default 9000 }}"
{{- end }}

View File

@@ -563,6 +563,26 @@ redis:
existingSecret: onyx-secrets
existingSecretPasswordKey: redis_password
minio:
enabled: true
auth:
existingSecret: onyx-secrets
rootUserSecretKey: s3_aws_access_key_id
rootPasswordSecretKey: s3_aws_secret_access_key
defaultBuckets: "onyx-file-store-bucket"
persistence:
enabled: true
size: 30Gi
service:
type: ClusterIP
ports:
api: 9000
console: 9001
consoleService:
type: ClusterIP
ports:
http: 9001
ingress:
enabled: false
className: ""
@@ -589,6 +609,8 @@ auth:
oauth_client_secret: ""
oauth_cookie_secret: ""
redis_password: "redis_password"
s3_aws_access_key_id: "s3_aws_access_key_id"
s3_aws_secret_access_key: "s3_aws_secret_access_key"
# will be overridden by the existingSecret if set
secretName: "onyx-secrets"
# set values as strings, they will be base64 encoded
@@ -600,6 +622,8 @@ auth:
oauth_client_secret: ""
oauth_cookie_secret: ""
redis_password: "password"
s3_aws_access_key_id: "minioadmin"
s3_aws_secret_access_key: "minioadmin"
configMap:
# Change this for production uses unless Onyx is only accessible behind VPN
@@ -618,6 +642,9 @@ configMap:
# SMTP_PASS: ""
# 'your-email@company.com' SMTP_USER missing used instead
EMAIL_FROM: ""
# MinIO/S3 Configuration override
S3_ENDPOINT_URL: "" # only used if minio is not enabled
S3_FILE_STORE_BUCKET_NAME: ""
# Gen AI Settings
GEN_AI_MAX_TOKENS: ""
QA_TIMEOUT: "60"