Compare commits

..

15 Commits

Author SHA1 Message Date
Richard Kuo (Onyx)
54e61611c5 prototype for surfacing docs without a query 2025-03-27 16:52:31 -07:00
rkuo-danswer
f08fa878a6 refactor file extension checking and add test for blob s3 (#4369)
* refactor file extension checking and add test for blob s3

* code review

* fix checking ext

---------

Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
2025-03-27 18:57:44 +00:00
pablonyx
d307534781 add some debug logging (#4328) 2025-03-27 11:49:32 -07:00
rkuo-danswer
6f54791910 adjust some vars in real time (#4365)
* adjust some vars in real time

* some sanity checking

---------

Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
2025-03-27 17:30:08 +00:00
pablonyx
0d5497bb6b Add multi-tenant user invitation flow test (#4360) 2025-03-27 09:53:15 -07:00
Chris Weaver
7648627503 Save all logs + add log persistence to most Onyx-owned containers (#4368)
* Save all logs + add log persistence to most Onyx-owned containers

* Separate volumes for each container

* Small fixes
2025-03-26 22:25:39 -07:00
pablonyx
927554d5ca slight robustification (#4367) 2025-03-27 03:23:36 +00:00
pablonyx
7dcec6caf5 Fix session touching (#4363)
* fix session touching

* Revert "fix session touching"

This reverts commit c473d5c9a2.

* Revert "Revert "fix session touching""

This reverts commit 26a71d40b6.

* update

* quick nit
2025-03-27 01:18:46 +00:00
rkuo-danswer
036648146d possible fix for confluence query filter (#4280)
* possible fix for confluence query filter

* nuke the attachment filter query ... it doesn't work!

---------

Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
2025-03-27 00:35:14 +00:00
rkuo-danswer
2aa4697ac8 permission sync runs so often that it starves out other tasks if run at high priority (#4364)
Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
2025-03-27 00:22:53 +00:00
rkuo-danswer
bc9b4e4f45 use slack's built in rate limit handler for the bot (#4362)
Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
2025-03-26 21:55:04 +00:00
evan-danswer
178a64f298 fix issue with drive connector service account indexing (#4356)
* fix issue with drive connector service account indexing

* correct checkpoint resumption

* final set of fixes

* nit

* fix typing

* logging and CW comments

* nit
2025-03-26 20:54:26 +00:00
pablonyx
c79f1edf1d add a flush (#4361) 2025-03-26 14:40:52 -07:00
pablonyx
7c8e23aa54 Fix saml conversion from ext_perm -> basic (#4343)
* fix saml conversion from ext_perm -> basic

* quick nit

* minor fix

* finalize

* update

* quick fix
2025-03-26 20:36:51 +00:00
pablonyx
d37b427d52 fix email flow (#4339) 2025-03-26 18:59:12 +00:00
45 changed files with 604 additions and 184 deletions

View File

@@ -9,6 +9,10 @@ on:
- cron: "0 16 * * *"
env:
# AWS
AWS_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS: ${{ secrets.AWS_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS }}
AWS_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS: ${{ secrets.AWS_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS }}
# Confluence
CONFLUENCE_TEST_SPACE_URL: ${{ secrets.CONFLUENCE_TEST_SPACE_URL }}
CONFLUENCE_TEST_SPACE: ${{ secrets.CONFLUENCE_TEST_SPACE }}

View File

@@ -102,6 +102,7 @@ COPY ./alembic /app/alembic
COPY ./alembic_tenants /app/alembic_tenants
COPY ./alembic.ini /app/alembic.ini
COPY supervisord.conf /usr/etc/supervisord.conf
COPY ./static /app/static
# Escape hatch scripts
COPY ./scripts/debugging /app/scripts/debugging

View File

@@ -28,6 +28,20 @@ depends_on = None
def upgrade() -> None:
# First, drop any existing indexes to avoid conflicts
op.execute("COMMIT")
op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_chat_message_tsv;")
op.execute("COMMIT")
op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_chat_session_desc_tsv;")
op.execute("COMMIT")
op.execute("DROP INDEX IF EXISTS idx_chat_message_message_lower;")
# Drop existing columns if they exist
op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv;")
op.execute("ALTER TABLE chat_session DROP COLUMN IF EXISTS description_tsv;")
# Create a GIN index for full-text search on chat_message.message
op.execute(
"""

View File

@@ -25,6 +25,10 @@ SAML_CONF_DIR = os.environ.get("SAML_CONF_DIR") or "/app/ee/onyx/configs/saml_co
#####
# Auto Permission Sync
#####
DEFAULT_PERMISSION_DOC_SYNC_FREQUENCY = int(
os.environ.get("DEFAULT_PERMISSION_DOC_SYNC_FREQUENCY") or 5 * 60
)
# In seconds, default is 5 minutes
CONFLUENCE_PERMISSION_GROUP_SYNC_FREQUENCY = int(
os.environ.get("CONFLUENCE_PERMISSION_GROUP_SYNC_FREQUENCY") or 5 * 60
@@ -39,6 +43,7 @@ CONFLUENCE_ANONYMOUS_ACCESS_IS_PUBLIC = (
CONFLUENCE_PERMISSION_DOC_SYNC_FREQUENCY = int(
os.environ.get("CONFLUENCE_PERMISSION_DOC_SYNC_FREQUENCY") or 5 * 60
)
NUM_PERMISSION_WORKERS = int(os.environ.get("NUM_PERMISSION_WORKERS") or 2)
@@ -72,6 +77,13 @@ OAUTH_GOOGLE_DRIVE_CLIENT_SECRET = os.environ.get(
"OAUTH_GOOGLE_DRIVE_CLIENT_SECRET", ""
)
GOOGLE_DRIVE_PERMISSION_GROUP_SYNC_FREQUENCY = int(
os.environ.get("GOOGLE_DRIVE_PERMISSION_GROUP_SYNC_FREQUENCY") or 5 * 60
)
SLACK_PERMISSION_DOC_SYNC_FREQUENCY = int(
os.environ.get("SLACK_PERMISSION_DOC_SYNC_FREQUENCY") or 5 * 60
)
# The posthog client does not accept empty API keys or hosts however it fails silently
# when the capture is called. These defaults prevent Posthog issues from breaking the Onyx app

View File

@@ -3,6 +3,8 @@ from collections.abc import Generator
from ee.onyx.configs.app_configs import CONFLUENCE_PERMISSION_DOC_SYNC_FREQUENCY
from ee.onyx.configs.app_configs import CONFLUENCE_PERMISSION_GROUP_SYNC_FREQUENCY
from ee.onyx.configs.app_configs import GOOGLE_DRIVE_PERMISSION_GROUP_SYNC_FREQUENCY
from ee.onyx.configs.app_configs import SLACK_PERMISSION_DOC_SYNC_FREQUENCY
from ee.onyx.db.external_perm import ExternalUserGroup
from ee.onyx.external_permissions.confluence.doc_sync import confluence_doc_sync
from ee.onyx.external_permissions.confluence.group_sync import confluence_group_sync
@@ -66,13 +68,13 @@ GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC: set[DocumentSource] = {
DOC_PERMISSION_SYNC_PERIODS: dict[DocumentSource, int] = {
# Polling is not supported so we fetch all doc permissions every 5 minutes
DocumentSource.CONFLUENCE: CONFLUENCE_PERMISSION_DOC_SYNC_FREQUENCY,
DocumentSource.SLACK: 5 * 60,
DocumentSource.SLACK: SLACK_PERMISSION_DOC_SYNC_FREQUENCY,
}
# If nothing is specified here, we run the doc_sync every time the celery beat runs
EXTERNAL_GROUP_SYNC_PERIODS: dict[DocumentSource, int] = {
# Polling is not supported so we fetch all group permissions on a fixed period (configurable; the defaults are 5 minutes)
DocumentSource.GOOGLE_DRIVE: 5 * 60,
DocumentSource.GOOGLE_DRIVE: GOOGLE_DRIVE_PERMISSION_GROUP_SYNC_FREQUENCY,
DocumentSource.CONFLUENCE: CONFLUENCE_PERMISSION_GROUP_SYNC_FREQUENCY,
}

View File

@@ -70,6 +70,7 @@ def add_users_to_tenant(emails: list[str], tenant_id: str) -> None:
"""
Add users to a tenant with proper transaction handling.
Checks if users already have a tenant mapping to avoid duplicates.
If a user already has an active mapping to any tenant, the new mapping will be added as inactive.
"""
with get_session_with_tenant(tenant_id=POSTGRES_DEFAULT_SCHEMA) as db_session:
try:
@@ -88,9 +89,25 @@ def add_users_to_tenant(emails: list[str], tenant_id: str) -> None:
.first()
)
# If user already has an active mapping, add this one as inactive
if not existing_mapping:
# Only add if mapping doesn't exist
db_session.add(UserTenantMapping(email=email, tenant_id=tenant_id))
# Check if the user already has an active mapping to any tenant
has_active_mapping = (
db_session.query(UserTenantMapping)
.filter(
UserTenantMapping.email == email,
UserTenantMapping.active == True, # noqa: E712
)
.first()
)
db_session.add(
UserTenantMapping(
email=email,
tenant_id=tenant_id,
active=False if has_active_mapping else True,
)
)
# Commit the transaction
db_session.commit()

View File

@@ -1,6 +1,5 @@
from datetime import timedelta
from typing import Any
from typing import cast
from celery import Celery
from celery import signals
@@ -10,12 +9,10 @@ from celery.utils.log import get_task_logger
import onyx.background.celery.apps.app_base as app_base
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
from onyx.configs.constants import ONYX_CLOUD_REDIS_RUNTIME
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
from onyx.configs.constants import POSTGRES_CELERY_BEAT_APP_NAME
from onyx.db.engine import get_all_tenant_ids
from onyx.db.engine import SqlEngine
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.utils.variable_functionality import fetch_versioned_implementation
from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST
from shared_configs.configs import MULTI_TENANT
@@ -141,8 +138,6 @@ class DynamicTenantScheduler(PersistentScheduler):
"""Only updates the actual beat schedule on the celery app when it changes"""
do_update = False
r = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)
task_logger.debug("_try_updating_schedule starting")
tenant_ids = get_all_tenant_ids()
@@ -152,16 +147,7 @@ class DynamicTenantScheduler(PersistentScheduler):
current_schedule = self.schedule.items()
# get potential new state
beat_multiplier = CLOUD_BEAT_MULTIPLIER_DEFAULT
beat_multiplier_raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:beat_multiplier")
if beat_multiplier_raw is not None:
try:
beat_multiplier_bytes = cast(bytes, beat_multiplier_raw)
beat_multiplier = float(beat_multiplier_bytes.decode())
except ValueError:
task_logger.error(
f"Invalid beat_multiplier value: {beat_multiplier_raw}"
)
beat_multiplier = OnyxRuntime.get_beat_multiplier()
new_schedule = self._generate_schedule(tenant_ids, beat_multiplier)

View File

@@ -14,7 +14,7 @@ logger = setup_logger()
# Only set up memory monitoring in container environment
if is_running_in_container():
# Set up a dedicated memory monitoring logger
MEMORY_LOG_DIR = "/var/log/persisted-logs/memory"
MEMORY_LOG_DIR = "/var/log/memory"
MEMORY_LOG_FILE = os.path.join(MEMORY_LOG_DIR, "memory_usage.log")
MEMORY_LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB
MEMORY_LOG_BACKUP_COUNT = 5 # Keep 5 backup files

View File

@@ -21,6 +21,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds)
# we have a better implementation (backpressure, etc)
# Note that DynamicTenantScheduler can adjust the runtime value for this via Redis
CLOUD_BEAT_MULTIPLIER_DEFAULT = 8.0
CLOUD_DOC_PERMISSION_SYNC_MULTIPLIER_DEFAULT = 1.0
# tasks that run in either self-hosted or cloud
beat_task_templates: list[dict] = []

View File

@@ -451,6 +451,8 @@ def monitor_connector_deletion_taskset(
credential_id=cc_pair.credential_id,
)
db_session.flush()
# finally, delete the cc-pair
delete_connector_credential_pair__no_commit(
db_session=db_session,

View File

@@ -17,6 +17,7 @@ from redis.exceptions import LockError
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from ee.onyx.configs.app_configs import DEFAULT_PERMISSION_DOC_SYNC_FREQUENCY
from ee.onyx.db.connector_credential_pair import get_all_auto_sync_cc_pairs
from ee.onyx.db.document import upsert_document_external_perms
from ee.onyx.external_permissions.sync_params import DOC_PERMISSION_SYNC_PERIODS
@@ -63,6 +64,7 @@ from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyn
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.server.utils import make_short_id
from onyx.utils.logger import doc_permission_sync_ctx
from onyx.utils.logger import format_error_for_logging
@@ -106,9 +108,10 @@ def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> b
source_sync_period = DOC_PERMISSION_SYNC_PERIODS.get(cc_pair.connector.source)
# If DOC_PERMISSION_SYNC_PERIODS has no entry for this source, fall back to the default sync period.
if not source_sync_period:
return True
source_sync_period = DEFAULT_PERMISSION_DOC_SYNC_FREQUENCY
source_sync_period *= int(OnyxRuntime.get_doc_permission_sync_multiplier())
# If the last sync is greater than the full fetch period, we run the sync
next_sync = last_perm_sync + timedelta(seconds=source_sync_period)
@@ -286,7 +289,7 @@ def try_creating_permissions_sync_task(
),
queue=OnyxCeleryQueues.CONNECTOR_DOC_PERMISSIONS_SYNC,
task_id=custom_task_id,
priority=OnyxCeleryPriority.HIGH,
priority=OnyxCeleryPriority.MEDIUM,
)
# fill in the celery task id

View File

@@ -271,7 +271,7 @@ def try_creating_external_group_sync_task(
),
queue=OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC,
task_id=custom_task_id,
priority=OnyxCeleryPriority.HIGH,
priority=OnyxCeleryPriority.MEDIUM,
)
payload.celery_task_id = result.id

View File

@@ -72,6 +72,7 @@ from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.redis.redis_utils import is_fence
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import global_version
from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
@@ -401,7 +402,11 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None:
logger.warning(f"Adding {key_bytes} to the lookup table.")
redis_client.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes)
redis_client.set(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE, 1, ex=300)
redis_client.set(
OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE,
1,
ex=OnyxRuntime.get_build_fence_lookup_table_interval(),
)
# 1/3: KICKOFF

View File

@@ -73,6 +73,7 @@ from onyx.db.chat import get_or_create_root_message
from onyx.db.chat import reserve_message_id
from onyx.db.chat import translate_db_message_to_chat_message_detail
from onyx.db.chat import translate_db_search_doc_to_server_search_doc
from onyx.db.chat import update_chat_session_updated_at_timestamp
from onyx.db.engine import get_session_context_manager
from onyx.db.milestone import check_multi_assistant_milestone
from onyx.db.milestone import create_milestone_if_not_exists
@@ -1069,6 +1070,8 @@ def stream_chat_message_objects(
prev_message = next_answer_message
logger.debug("Committing messages")
# Explicitly update the timestamp on the chat session
update_chat_session_updated_at_timestamp(chat_session_id, db_session)
db_session.commit() # actually save user / assistant message
yield AgenticMessageResponseIDInfo(agentic_message_ids=agentic_message_ids)

View File

@@ -382,6 +382,7 @@ ONYX_CLOUD_TENANT_ID = "cloud"
# the redis namespace for runtime variables
ONYX_CLOUD_REDIS_RUNTIME = "runtime"
CLOUD_BUILD_FENCE_LOOKUP_TABLE_INTERVAL_DEFAULT = 600
class OnyxCeleryTask:

View File

@@ -87,7 +87,7 @@ class BlobStorageConnector(LoadConnector, PollConnector):
credentials.get(key)
for key in ["aws_access_key_id", "aws_secret_access_key"]
):
raise ConnectorMissingCredentialError("Google Cloud Storage")
raise ConnectorMissingCredentialError("Amazon S3")
session = boto3.Session(
aws_access_key_id=credentials["aws_access_key_id"],

View File

@@ -65,20 +65,6 @@ _RESTRICTIONS_EXPANSION_FIELDS = [
_SLIM_DOC_BATCH_SIZE = 5000
_ATTACHMENT_EXTENSIONS_TO_FILTER_OUT = [
"gif",
"mp4",
"mov",
"mp3",
"wav",
]
_FULL_EXTENSION_FILTER_STRING = "".join(
[
f" and title!~'*.{extension}'"
for extension in _ATTACHMENT_EXTENSIONS_TO_FILTER_OUT
]
)
ONE_HOUR = 3600
@@ -209,7 +195,6 @@ class ConfluenceConnector(
def _construct_attachment_query(self, confluence_page_id: str) -> str:
attachment_query = f"type=attachment and container='{confluence_page_id}'"
attachment_query += self.cql_label_filter
attachment_query += _FULL_EXTENSION_FILTER_STRING
return attachment_query
def _get_comment_string_for_page_id(self, page_id: str) -> str:
@@ -374,11 +359,13 @@ class ConfluenceConnector(
if not validate_attachment_filetype(
attachment,
):
logger.info(f"Skipping attachment: {attachment['title']}")
continue
logger.info(f"Processing attachment: {attachment['title']}")
# Attempt to get textual content or image summarization:
try:
logger.info(f"Processing attachment: {attachment['title']}")
response = convert_attachment_to_content(
confluence_client=self.confluence_client,
attachment=attachment,

View File

@@ -28,8 +28,9 @@ from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import detect_encoding
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_processing.extract_file_text import get_file_ext
from onyx.file_processing.extract_file_text import is_accepted_file_ext
from onyx.file_processing.extract_file_text import is_text_file_extension
from onyx.file_processing.extract_file_text import is_valid_file_ext
from onyx.file_processing.extract_file_text import OnyxExtensionType
from onyx.file_processing.extract_file_text import read_text_file
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import request_with_retries
@@ -69,7 +70,9 @@ def _process_egnyte_file(
file_name = file_metadata["name"]
extension = get_file_ext(file_name)
if not is_valid_file_ext(extension):
if not is_accepted_file_ext(
extension, OnyxExtensionType.Plain | OnyxExtensionType.Document
):
logger.warning(f"Skipping file '{file_name}' with extension '{extension}'")
return None

View File

@@ -22,8 +22,9 @@ from onyx.db.engine import get_session_with_current_tenant
from onyx.db.pg_file_store import get_pgfilestore_by_file_name
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.extract_file_text import get_file_ext
from onyx.file_processing.extract_file_text import is_valid_file_ext
from onyx.file_processing.extract_file_text import is_accepted_file_ext
from onyx.file_processing.extract_file_text import load_files_from_zip
from onyx.file_processing.extract_file_text import OnyxExtensionType
from onyx.file_processing.image_utils import store_image_and_create_section
from onyx.file_store.file_store import get_default_file_store
from onyx.utils.logger import setup_logger
@@ -51,7 +52,7 @@ def _read_files_and_metadata(
file_content, ignore_dirs=True
):
yield os.path.join(directory_path, file_info.filename), subfile, metadata
elif is_valid_file_ext(extension):
elif is_accepted_file_ext(extension, OnyxExtensionType.All):
yield file_name, file_content, metadata
else:
logger.warning(f"Skipping file '{file_name}' with extension '{extension}'")
@@ -122,7 +123,7 @@ def _process_file(
logger.warning(f"No file record found for '{file_name}' in PG; skipping.")
return []
if not is_valid_file_ext(extension):
if not is_accepted_file_ext(extension, OnyxExtensionType.All):
logger.warning(
f"Skipping file '{file_name}' with unrecognized extension '{extension}'"
)

View File

@@ -2,9 +2,11 @@ import copy
import threading
from collections.abc import Callable
from collections.abc import Iterator
from datetime import datetime
from enum import Enum
from functools import partial
from typing import Any
from typing import cast
from typing import Protocol
from urllib.parse import urlparse
@@ -459,6 +461,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo
DriveRetrievalStage.MY_DRIVE_FILES,
)
curr_stage.stage = DriveRetrievalStage.SHARED_DRIVE_FILES
resuming = False # we are starting the next stage for the first time
if curr_stage.stage == DriveRetrievalStage.SHARED_DRIVE_FILES:
@@ -494,7 +497,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo
)
yield from _yield_from_drive(drive_id, start)
curr_stage.stage = DriveRetrievalStage.FOLDER_FILES
resuming = False # we are starting the next stage for the first time
if curr_stage.stage == DriveRetrievalStage.FOLDER_FILES:
def _yield_from_folder_crawl(
@@ -547,6 +550,16 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo
checkpoint, is_slim, DriveRetrievalStage.MY_DRIVE_FILES
)
# Setup initial completion map on first connector run
for email in all_org_emails:
# don't overwrite existing completion map on resuming runs
if email in checkpoint.completion_map:
continue
checkpoint.completion_map[email] = StageCompletion(
stage=DriveRetrievalStage.START,
completed_until=0,
)
# we've found all users and drives, now time to actually start
# fetching stuff
logger.info(f"Found {len(all_org_emails)} users to impersonate")
@@ -560,11 +573,6 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo
drive_ids_to_retrieve, checkpoint
)
for email in all_org_emails:
checkpoint.completion_map[email] = StageCompletion(
stage=DriveRetrievalStage.START,
completed_until=0,
)
user_retrieval_gens = [
self._impersonate_user_for_retrieval(
email,
@@ -795,10 +803,12 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo
return
for file in drive_files:
if file.error is not None:
if file.error is None:
checkpoint.completion_map[file.user_email].update(
stage=file.completion_stage,
completed_until=file.drive_file[GoogleFields.MODIFIED_TIME.value],
completed_until=datetime.fromisoformat(
file.drive_file[GoogleFields.MODIFIED_TIME.value]
).timestamp(),
completed_until_parent_id=file.parent_id,
)
yield file
@@ -900,10 +910,8 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo
checkpoint: GoogleDriveCheckpoint,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> Iterator[list[Document | ConnectorFailure]]:
) -> Iterator[Document | ConnectorFailure]:
try:
documents: list[Document | ConnectorFailure] = []
# Prepare a partial function with the credentials and admin email
convert_func = partial(
_convert_single_file,
@@ -912,15 +920,27 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo
self.allow_images,
self.size_threshold,
)
# Fetch files in batches
batches_complete = 0
files_batch: list[GoogleDriveFileType] = []
func_with_args: list[
tuple[
Callable[..., Document | ConnectorFailure | None], tuple[Any, ...]
]
] = []
def _yield_batch(
files_batch: list[GoogleDriveFileType],
) -> Iterator[Document | ConnectorFailure]:
nonlocal batches_complete
# Process the batch using run_functions_tuples_in_parallel
func_with_args = [(convert_func, (file,)) for file in files_batch]
results = cast(
list[Document | ConnectorFailure | None],
run_functions_tuples_in_parallel(func_with_args, max_workers=8),
)
docs_and_failures = [result for result in results if result is not None]
if docs_and_failures:
yield from docs_and_failures
batches_complete += 1
for retrieved_file in self._fetch_drive_items(
is_slim=False,
checkpoint=checkpoint,
@@ -938,44 +958,21 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo
)
failure_message += f"error: {retrieved_file.error}"
logger.error(failure_message)
yield [
ConnectorFailure(
failed_entity=EntityFailure(
entity_id=failure_stage,
),
failure_message=failure_message,
exception=retrieved_file.error,
)
]
yield ConnectorFailure(
failed_entity=EntityFailure(
entity_id=failure_stage,
),
failure_message=failure_message,
exception=retrieved_file.error,
)
continue
files_batch.append(retrieved_file.drive_file)
if len(files_batch) < self.batch_size:
continue
# Process the batch using run_functions_tuples_in_parallel
func_with_args = [(convert_func, (file,)) for file in files_batch]
results = run_functions_tuples_in_parallel(
func_with_args, max_workers=8
)
documents = []
for idx, result in enumerate(results):
if not result:
continue
if isinstance(result, ConnectorFailure):
logger.error(result.exception)
yield [result]
elif isinstance(result, Document):
documents.append(result)
else:
logger.warning(f"Unexpected result type: {type(result)}")
continue
if documents:
yield documents
batches_complete += 1
yield from _yield_batch(files_batch)
files_batch = []
if batches_complete > BATCHES_PER_CHECKPOINT:
@@ -984,27 +981,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo
# Process any remaining files
if files_batch:
func_with_args = [(convert_func, (file,)) for file in files_batch]
results = run_functions_tuples_in_parallel(
func_with_args, max_workers=8
)
documents = []
for idx, result in enumerate(results):
if not result:
continue
if isinstance(result, ConnectorFailure):
logger.error(result.exception)
yield [result]
elif isinstance(result, Document):
documents.append(result)
else:
logger.warning(f"Unexpected result type: {type(result)}")
continue
if documents:
yield documents
yield from _yield_batch(files_batch)
except Exception as e:
logger.exception(f"Error extracting documents from Google Drive: {e}")
raise e
@@ -1026,10 +1003,7 @@ class GoogleDriveConnector(SlimConnector, CheckpointConnector[GoogleDriveCheckpo
checkpoint = copy.deepcopy(checkpoint)
self._retrieved_ids = checkpoint.retrieved_folder_and_drive_ids
try:
for doc_list in self._extract_docs_from_google_drive(
checkpoint, start, end
):
yield from doc_list
yield from self._extract_docs_from_google_drive(checkpoint, start, end)
except Exception as e:
if MISSING_SCOPES_ERROR_STR in str(e):
raise PermissionError(ONYX_SCOPE_INSTRUCTIONS) from e

View File

@@ -123,7 +123,7 @@ def crawl_folders_for_files(
end=end,
):
found_files = True
logger.info(f"Found file: {file['name']}")
logger.info(f"Found file: {file['name']}, user email: {user_email}")
yield RetrievedDriveFile(
drive_file=file,
user_email=user_email,

View File

@@ -20,8 +20,8 @@ from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import ALL_ACCEPTED_FILE_EXTENSIONS
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_processing.extract_file_text import VALID_FILE_EXTENSIONS
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
@@ -298,7 +298,7 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
elif (
is_valid_format
and file_extension in VALID_FILE_EXTENSIONS
and file_extension in ALL_ACCEPTED_FILE_EXTENSIONS
and can_download
):
# For documents, try to get the text content

View File

@@ -1089,3 +1089,20 @@ def log_agent_sub_question_results(
db_session.commit()
return None
def update_chat_session_updated_at_timestamp(
    chat_session_id: UUID, db_session: Session
) -> None:
    """
    Set `time_updated` on a single chat session to `func.now()`.

    Only the `time_updated` column is written; the row is selected by
    `ChatSession.id == chat_session_id`. Intended to be called when messages
    are added to a chat session so that it reflects recent activity.
    """
    # Issue a direct UPDATE statement rather than going through a loaded
    # ChatSession object, so the session row never has to be fetched.
    touch_statement = (
        update(ChatSession)
        .where(ChatSession.id == chat_session_id)
        .values(time_updated=func.now())
    )
    db_session.execute(touch_statement)
    # Deliberately no commit: the caller owns the transaction and commits it.

View File

@@ -821,26 +821,30 @@ class VespaIndex(DocumentIndex):
num_to_retrieve: int = NUM_RETURNED_HITS,
offset: int = 0,
) -> list[InferenceChunkUncleaned]:
vespa_where_clauses = build_vespa_filters(filters, include_hidden=True)
yql = (
YQL_BASE.format(index_name=self.index_name)
+ vespa_where_clauses
+ '({grammar: "weakAnd"}userInput(@query) '
# `({defaultIndex: "content_summary"}userInput(@query))` section is
# needed for highlighting while the N-gram highlighting is broken /
# not working as desired
+ f'or ({{defaultIndex: "{CONTENT_SUMMARY}"}}userInput(@query)))'
vespa_where_clauses = build_vespa_filters(
filters, include_hidden=True, remove_trailing_and=True
)
yql = YQL_BASE.format(index_name=self.index_name) + vespa_where_clauses
params: dict[str, str | int] = {
"yql": yql,
"query": query,
"hits": num_to_retrieve,
"offset": 0,
"ranking.profile": "admin_search",
"timeout": VESPA_TIMEOUT,
}
if len(query.strip()) > 0:
yql += (
' and ({grammar: "weakAnd"}userInput(@query) '
# `({defaultIndex: "content_summary"}userInput(@query))` section is
# needed for highlighting while the N-gram highlighting is broken /
# not working as desired
+ f'or ({{defaultIndex: "{CONTENT_SUMMARY}"}}userInput(@query)))'
)
params["yql"] = yql
params["query"] = query
return query_vespa(params)
# Retrieves chunk information for a document:

View File

@@ -7,6 +7,8 @@ from collections.abc import Callable
from collections.abc import Iterator
from collections.abc import Sequence
from email.parser import Parser as EmailParser
from enum import auto
from enum import IntFlag
from io import BytesIO
from pathlib import Path
from typing import Any
@@ -35,7 +37,7 @@ logger = setup_logger()
TEXT_SECTION_SEPARATOR = "\n\n"
PLAIN_TEXT_FILE_EXTENSIONS = [
ACCEPTED_PLAIN_TEXT_FILE_EXTENSIONS = [
".txt",
".md",
".mdx",
@@ -49,7 +51,7 @@ PLAIN_TEXT_FILE_EXTENSIONS = [
".yaml",
]
VALID_FILE_EXTENSIONS = PLAIN_TEXT_FILE_EXTENSIONS + [
ACCEPTED_DOCUMENT_FILE_EXTENSIONS = [
".pdf",
".docx",
".pptx",
@@ -57,12 +59,21 @@ VALID_FILE_EXTENSIONS = PLAIN_TEXT_FILE_EXTENSIONS + [
".eml",
".epub",
".html",
]
ACCEPTED_IMAGE_FILE_EXTENSIONS = [
".png",
".jpg",
".jpeg",
".webp",
]
ALL_ACCEPTED_FILE_EXTENSIONS = (
ACCEPTED_PLAIN_TEXT_FILE_EXTENSIONS
+ ACCEPTED_DOCUMENT_FILE_EXTENSIONS
+ ACCEPTED_IMAGE_FILE_EXTENSIONS
)
IMAGE_MEDIA_TYPES = [
"image/png",
"image/jpeg",
@@ -70,8 +81,15 @@ IMAGE_MEDIA_TYPES = [
]
class OnyxExtensionType(IntFlag):
    """Bit flags selecting which groups of accepted file extensions apply."""

    # One bit per group so that callers can combine them with `|`.
    Plain = 1 << 0
    Document = 1 << 1
    Multimedia = 1 << 2
    # Convenience mask with every group enabled.
    All = Plain | Document | Multimedia
def is_text_file_extension(file_name: str) -> bool:
return any(file_name.endswith(ext) for ext in PLAIN_TEXT_FILE_EXTENSIONS)
return any(file_name.endswith(ext) for ext in ACCEPTED_PLAIN_TEXT_FILE_EXTENSIONS)
def get_file_ext(file_path_or_name: str | Path) -> str:
@@ -83,8 +101,20 @@ def is_valid_media_type(media_type: str) -> bool:
return media_type in IMAGE_MEDIA_TYPES
def is_valid_file_ext(ext: str) -> bool:
return ext in VALID_FILE_EXTENSIONS
def is_accepted_file_ext(ext: str, ext_type: OnyxExtensionType) -> bool:
    """
    Report whether `ext` is in any extension group enabled by `ext_type`.

    `ext_type` is a combination of OnyxExtensionType flags. Each flag that is
    set enables one accepted-extension list, and `ext` must match an entry of
    an enabled list exactly. Returns False when no enabled list contains it.
    """
    # Pair every flag with the extension list it switches on.
    groups_by_flag = (
        (OnyxExtensionType.Plain, ACCEPTED_PLAIN_TEXT_FILE_EXTENSIONS),
        (OnyxExtensionType.Document, ACCEPTED_DOCUMENT_FILE_EXTENSIONS),
        (OnyxExtensionType.Multimedia, ACCEPTED_IMAGE_FILE_EXTENSIONS),
    )
    for flag, accepted_extensions in groups_by_flag:
        if ext_type & flag and ext in accepted_extensions:
            return True
    return False
def is_text_file(file: IO[bytes]) -> bool:
@@ -382,6 +412,9 @@ def extract_file_text(
"""
Legacy function that returns *only text*, ignoring embedded images.
For backward-compatibility in code that only wants text.
NOTE: Ignoring seems to be defined as returning an empty string for files it can't
handle (such as images).
"""
extension_to_function: dict[str, Callable[[IO[Any]], str]] = {
".pdf": pdf_to_text,
@@ -405,7 +438,9 @@ def extract_file_text(
if extension is None:
extension = get_file_ext(file_name)
if is_valid_file_ext(extension):
if is_accepted_file_ext(
extension, OnyxExtensionType.Plain | OnyxExtensionType.Document
):
func = extension_to_function.get(extension, file_io_to_text)
file.seek(0)
return func(file)

View File

@@ -15,6 +15,7 @@ EXCLUDED_IMAGE_TYPES = [
"image/tiff",
"image/gif",
"image/svg+xml",
"image/avif",
]

View File

@@ -15,7 +15,6 @@ from onyx.configs.constants import MessageType
from onyx.configs.constants import SearchFeedbackType
from onyx.configs.onyxbot_configs import DANSWER_FOLLOWUP_EMOJI
from onyx.connectors.slack.utils import expert_info_from_slack_id
from onyx.connectors.slack.utils import make_slack_api_rate_limited
from onyx.context.search.models import SavedSearchDoc
from onyx.db.chat import get_chat_message
from onyx.db.chat import translate_db_message_to_chat_message_detail
@@ -553,8 +552,7 @@ def handle_followup_resolved_button(
# Delete the message with the option to mark resolved
if not immediate:
slack_call = make_slack_api_rate_limited(client.web_client.chat_delete)
response = slack_call(
response = client.web_client.chat_delete(
channel=channel_id,
ts=message_ts,
)

View File

@@ -18,6 +18,9 @@ from prometheus_client import start_http_server
from redis.lock import Lock
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry import ConnectionErrorRetryHandler
from slack_sdk.http_retry import RateLimitErrorRetryHandler
from slack_sdk.http_retry import RetryHandler
from slack_sdk.socket_mode.request import SocketModeRequest
from slack_sdk.socket_mode.response import SocketModeResponse
from sqlalchemy.orm import Session
@@ -944,10 +947,21 @@ def _get_socket_client(
) -> TenantSocketModeClient:
# For more info on how to set this up, check out the docs:
# https://docs.onyx.app/slack_bot_setup
# use the retry handlers built into the slack sdk
connection_error_retry_handler = ConnectionErrorRetryHandler()
rate_limit_error_retry_handler = RateLimitErrorRetryHandler(max_retry_count=7)
slack_retry_handlers: list[RetryHandler] = [
connection_error_retry_handler,
rate_limit_error_retry_handler,
]
return TenantSocketModeClient(
# This app-level token will be used only for establishing a connection
app_token=slack_bot_tokens.app_token,
web_client=WebClient(token=slack_bot_tokens.bot_token),
web_client=WebClient(
token=slack_bot_tokens.bot_token, retry_handlers=slack_retry_handlers
),
tenant_id=tenant_id,
slack_bot_id=slack_bot_id,
)

View File

@@ -30,7 +30,6 @@ from onyx.configs.onyxbot_configs import (
from onyx.configs.onyxbot_configs import (
DANSWER_BOT_RESPONSE_LIMIT_TIME_PERIOD_SECONDS,
)
from onyx.connectors.slack.utils import make_slack_api_rate_limited
from onyx.connectors.slack.utils import SlackTextCleaner
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.users import get_user_by_email
@@ -125,13 +124,18 @@ def update_emote_react(
)
return
func = client.reactions_remove if remove else client.reactions_add
slack_call = make_slack_api_rate_limited(func) # type: ignore
slack_call(
name=emoji,
channel=channel,
timestamp=message_ts,
)
if remove:
client.reactions_remove(
name=emoji,
channel=channel,
timestamp=message_ts,
)
else:
client.reactions_add(
name=emoji,
channel=channel,
timestamp=message_ts,
)
except SlackApiError as e:
if remove:
logger.error(f"Failed to remove Reaction due to: {e}")
@@ -200,9 +204,8 @@ def respond_in_thread_or_channel(
message_ids: list[str] = []
if not receiver_ids:
slack_call = make_slack_api_rate_limited(client.chat_postMessage)
try:
response = slack_call(
response = client.chat_postMessage(
channel=channel,
text=text,
blocks=blocks,
@@ -224,7 +227,7 @@ def respond_in_thread_or_channel(
blocks_without_urls.append(_build_error_block(str(e)))
# Try again without blocks containing url
response = slack_call(
response = client.chat_postMessage(
channel=channel,
text=text,
blocks=blocks_without_urls,
@@ -236,11 +239,9 @@ def respond_in_thread_or_channel(
message_ids.append(response["message_ts"])
else:
slack_call = make_slack_api_rate_limited(client.chat_postEphemeral)
for receiver in receiver_ids:
try:
response = slack_call(
response = client.chat_postEphemeral(
channel=channel,
user=receiver,
text=text,
@@ -263,7 +264,7 @@ def respond_in_thread_or_channel(
blocks_without_urls.append(_build_error_block(str(e)))
# Try again without blocks containing url
response = slack_call(
response = client.chat_postEphemeral(
channel=channel,
user=receiver,
text=text,
@@ -500,7 +501,7 @@ def fetch_user_semantic_id_from_id(
if not user_id:
return None
response = make_slack_api_rate_limited(client.users_info)(user=user_id)
response = client.users_info(user=user_id)
if not response["ok"]:
return None

View File

@@ -313,7 +313,7 @@ def bulk_invite_users(
detail=f"Invalid email address: {email} - {str(e)}",
)
if MULTI_TENANT and not DEV_MODE:
if MULTI_TENANT:
try:
fetch_ee_implementation_or_noop(
"onyx.server.tenants.provisioning", "add_users_to_tenant", None
@@ -335,7 +335,7 @@ def bulk_invite_users(
except Exception as e:
logger.error(f"Error sending email invite to invited users: {e}")
if not MULTI_TENANT:
if not MULTI_TENANT or DEV_MODE:
return number_of_invited_users
# for billing purposes, write to the control plane about the number of new users
@@ -376,7 +376,7 @@ def remove_invited_user(
number_of_invited_users = write_invited_users(remaining_users)
try:
if MULTI_TENANT:
if MULTI_TENANT and not DEV_MODE:
fetch_ee_implementation_or_noop(
"onyx.server.tenants.billing", "register_tenant_users", None
)(tenant_id, get_total_users_count(db_session))

View File

@@ -1,10 +1,19 @@
import io
from typing import cast
from PIL import Image
from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEFAULT
from onyx.background.celery.tasks.beat_schedule import (
CLOUD_DOC_PERMISSION_SYNC_MULTIPLIER_DEFAULT,
)
from onyx.configs.constants import CLOUD_BUILD_FENCE_LOOKUP_TABLE_INTERVAL_DEFAULT
from onyx.configs.constants import ONYX_CLOUD_REDIS_RUNTIME
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
from onyx.configs.constants import ONYX_EMAILABLE_LOGO_MAX_DIM
from onyx.db.engine import get_session_with_shared_schema
from onyx.file_store.file_store import PostgresBackedFileStore
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.utils.file import FileWithMimeType
from onyx.utils.file import OnyxStaticFileManager
from onyx.utils.variable_functionality import (
@@ -87,3 +96,72 @@ class OnyxRuntime:
)
return OnyxRuntime._get_with_static_fallback(db_filename, STATIC_FILENAME)
@staticmethod
def get_beat_multiplier() -> float:
    """Return the runtime-adjustable beat multiplier.

    The beat multiplier is used to scale up or down the frequency of certain
    beat tasks in the cloud. It has a significant effect on load and is useful
    to adjust in real time.

    The override is read from the ``{ONYX_CLOUD_REDIS_RUNTIME}:beat_multiplier``
    key using the client returned by ``get_redis_replica_client`` for
    ``ONYX_CLOUD_TENANT_ID``.

    Returns:
        The stored value when it parses as a finite, positive float.
        Otherwise ``CLOUD_BEAT_MULTIPLIER_DEFAULT`` is used (missing key or
        unparsable value), and if the resulting value is non-positive or not
        finite, ``1.0`` is returned.
    """
    # Function-scope import so this block does not depend on file-level imports.
    import math

    beat_multiplier: float = CLOUD_BEAT_MULTIPLIER_DEFAULT

    r = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)
    beat_multiplier_raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:beat_multiplier")
    if beat_multiplier_raw is not None:
        try:
            beat_multiplier_bytes = cast(bytes, beat_multiplier_raw)
            beat_multiplier = float(beat_multiplier_bytes.decode())
        except ValueError:
            # Deliberate best-effort: an unparsable override keeps the default.
            pass

    # float() happily parses "nan" and "inf". NaN compares False against
    # everything, so a plain `<= 0.0` check would let it through and hand
    # callers an unusable multiplier.
    if not math.isfinite(beat_multiplier) or beat_multiplier <= 0.0:
        return 1.0

    return beat_multiplier
@staticmethod
def get_doc_permission_sync_multiplier() -> float:
    """Return the runtime-adjustable multiplier for doc permission syncs.

    Permission syncs are a significant source of load / queueing in the cloud.

    The override is read from the
    ``{ONYX_CLOUD_REDIS_RUNTIME}:doc_permission_sync_multiplier`` key using the
    client returned by ``get_redis_replica_client`` for ``ONYX_CLOUD_TENANT_ID``.

    Returns:
        The stored value when it parses as a finite, positive float.
        Otherwise ``CLOUD_DOC_PERMISSION_SYNC_MULTIPLIER_DEFAULT`` is used
        (missing key or unparsable value), and if the resulting value is
        non-positive or not finite, ``1.0`` is returned.
    """
    # Function-scope import so this block does not depend on file-level imports.
    import math

    value: float = CLOUD_DOC_PERMISSION_SYNC_MULTIPLIER_DEFAULT

    r = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)
    value_raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:doc_permission_sync_multiplier")
    if value_raw is not None:
        try:
            value_bytes = cast(bytes, value_raw)
            value = float(value_bytes.decode())
        except ValueError:
            # Deliberate best-effort: an unparsable override keeps the default.
            pass

    # float() accepts "nan" / "inf"; NaN would slip past a bare `<= 0.0`
    # comparison, so reject non-finite values explicitly.
    if not math.isfinite(value) or value <= 0.0:
        return 1.0

    return value
@staticmethod
def get_build_fence_lookup_table_interval() -> int:
    """Return the interval used for rebuilding the fence lookup table.

    We maintain an active fence table to make lookups of existing fences
    efficient. However, reconstructing the table is expensive, so adjusting
    the interval in realtime is useful.

    The override is read from the
    ``{ONYX_CLOUD_REDIS_RUNTIME}:build_fence_lookup_table_interval`` key.

    Returns:
        The stored integer when it parses and is positive; otherwise
        ``CLOUD_BUILD_FENCE_LOOKUP_TABLE_INTERVAL_DEFAULT``.
    """
    fallback: int = CLOUD_BUILD_FENCE_LOOKUP_TABLE_INTERVAL_DEFAULT

    redis_client = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)
    stored = redis_client.get(
        f"{ONYX_CLOUD_REDIS_RUNTIME}:build_fence_lookup_table_interval"
    )

    parsed: int = fallback
    if stored is not None:
        try:
            parsed = int(cast(bytes, stored).decode())
        except ValueError:
            # Unparsable override: keep using the default.
            parsed = fallback

    # Non-positive intervals are treated as invalid.
    return parsed if parsed > 0 else fallback

View File

@@ -0,0 +1,77 @@
import os
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
from onyx.configs.constants import BlobType
from onyx.connectors.blob.connector import BlobStorageConnector
from onyx.connectors.models import Document
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import ACCEPTED_DOCUMENT_FILE_EXTENSIONS
from onyx.file_processing.extract_file_text import ACCEPTED_IMAGE_FILE_EXTENSIONS
from onyx.file_processing.extract_file_text import ACCEPTED_PLAIN_TEXT_FILE_EXTENSIONS
from onyx.file_processing.extract_file_text import get_file_ext
@pytest.fixture
def blob_connector(request: pytest.FixtureRequest) -> BlobStorageConnector:
    """Build an S3-backed BlobStorageConnector for the connector test bucket.

    Credentials are taken from the AWS_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS and
    AWS_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS environment variables; a missing
    variable raises KeyError.
    """
    s3_connector = BlobStorageConnector(
        bucket_type=BlobType.S3, bucket_name="onyx-connector-tests"
    )

    access_key_id = os.environ["AWS_ACCESS_KEY_ID_DAILY_CONNECTOR_TESTS"]
    secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY_DAILY_CONNECTOR_TESTS"]
    s3_connector.load_credentials(
        {
            "aws_access_key_id": access_key_id,
            "aws_secret_access_key": secret_access_key,
        }
    )
    return s3_connector
@patch(
    "onyx.file_processing.extract_file_text.get_unstructured_api_key",
    return_value=None,
)
def test_blob_s3_connector(
    mock_get_api_key: MagicMock, blob_connector: BlobStorageConnector
) -> None:
    """
    Plain and document file types should be fully indexed.

    Multimedia and unknown file types will be indexed by title only, with one
    empty section. This is intentional: it allows searching by title even when
    the file content cannot be indexed.
    """
    # Flatten every batch yielded by the connector into a single list.
    all_docs: list[Document] = [
        doc for doc_batch in blob_connector.load_from_state() for doc in doc_batch
    ]

    assert len(all_docs) == 19

    for doc in all_docs:
        first_section = doc.sections[0]
        assert isinstance(first_section, TextSection)

        ext = get_file_ext(doc.semantic_identifier)
        has_extractable_text = (
            ext in ACCEPTED_PLAIN_TEXT_FILE_EXTENSIONS
            or ext in ACCEPTED_DOCUMENT_FILE_EXTENSIONS
        )

        if has_extractable_text:
            assert len(first_section.text) > 0
        else:
            # image extensions and unknown extensions both yield an empty section
            assert len(first_section.text) == 0

View File

@@ -9,7 +9,9 @@ from requests import HTTPError
from onyx.auth.schemas import UserRole
from onyx.configs.constants import FASTAPI_USERS_AUTH_COOKIE_NAME
from onyx.server.documents.models import PaginatedReturn
from onyx.server.manage.models import UserInfo
from onyx.server.models import FullUserSnapshot
from onyx.server.models import InvitedUserSnapshot
from tests.integration.common_utils.constants import API_SERVER_URL
from tests.integration.common_utils.constants import GENERAL_HEADERS
from tests.integration.common_utils.test_models import DATestUser
@@ -245,3 +247,69 @@ class UserManager:
total_items=data["total_items"],
)
return paginated_result
@staticmethod
def invite_user(
    user_to_invite_email: str, user_performing_action: DATestUser
) -> None:
    """Invite a single user, by email, to join the organization.

    Sends a PUT to ``/manage/admin/users`` with the email wrapped in an
    ``emails`` list.

    Args:
        user_to_invite_email: Email of the user to invite
        user_performing_action: User whose headers authenticate the request
            (expected to have admin permissions)

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status
    """
    invite_payload = {"emails": [user_to_invite_email]}
    resp = requests.put(
        url=f"{API_SERVER_URL}/manage/admin/users",
        headers=user_performing_action.headers,
        json=invite_payload,
    )
    resp.raise_for_status()
@staticmethod
def accept_invitation(tenant_id: str, user_performing_action: DATestUser) -> None:
    """Accept a pending invitation to join an organization.

    Sends a POST to ``/tenants/users/invite/accept`` with the tenant id.

    Args:
        tenant_id: ID of the tenant/organization whose invitation is accepted
        user_performing_action: User accepting the invitation

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status
    """
    accept_payload = {"tenant_id": tenant_id}
    resp = requests.post(
        url=f"{API_SERVER_URL}/tenants/users/invite/accept",
        headers=user_performing_action.headers,
        json=accept_payload,
    )
    resp.raise_for_status()
@staticmethod
def get_invited_users(
    user_performing_action: DATestUser,
) -> list[InvitedUserSnapshot]:
    """Fetch every currently invited user.

    Sends a GET to ``/manage/users/invited`` and converts each element of the
    JSON response into an ``InvitedUserSnapshot``.

    Args:
        user_performing_action: User whose headers authenticate the request
            (expected to have admin permissions)

    Returns:
        List of invited user snapshots

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status
    """
    resp = requests.get(
        url=f"{API_SERVER_URL}/manage/users/invited",
        headers=user_performing_action.headers,
    )
    resp.raise_for_status()

    snapshots: list[InvitedUserSnapshot] = []
    for raw_user in resp.json():
        snapshots.append(InvitedUserSnapshot(**raw_user))
    return snapshots
@staticmethod
def get_user_info(user_performing_action: DATestUser) -> UserInfo:
    """Fetch the ``/me`` payload for the given user.

    Args:
        user_performing_action: User whose headers authenticate the request

    Returns:
        ``UserInfo`` built from the JSON response body

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status
    """
    resp = requests.get(
        url=f"{API_SERVER_URL}/me",
        headers=user_performing_action.headers,
    )
    resp.raise_for_status()

    me_payload = resp.json()
    return UserInfo(**me_payload)

View File

@@ -0,0 +1,70 @@
from onyx.db.models import UserRole
from tests.integration.common_utils.managers.user import UserManager
from tests.integration.common_utils.test_models import DATestUser
INVITED_BASIC_USER = "basic_user"
INVITED_BASIC_USER_EMAIL = "basic_user@test.com"
def test_user_invitation_flow(reset_multitenant: None) -> None:
    """End-to-end check of the multi-tenant invitation flow.

    Steps: an admin invites one already-registered user and one not-yet-registered
    email; the new email registers and is BASIC; the registered user sees the
    invitation, accepts it, disappears from the invited list and shows up as a
    BASIC user of the inviting organization.
    """
    # First registered user: the inviting admin
    admin_user: DATestUser = UserManager.create(name="admin")
    assert UserManager.is_role(admin_user, UserRole.ADMIN)

    # Second registered user; also reports ADMIN before being invited
    invited_user: DATestUser = UserManager.create(name="admin_invited")
    assert UserManager.is_role(invited_user, UserRole.ADMIN)

    # Invite both the already-registered user and a not-yet-registered email
    for email_to_invite in (invited_user.email, INVITED_BASIC_USER_EMAIL):
        UserManager.invite_user(email_to_invite, admin_user)

    # Registering with the invited email should produce a BASIC user
    invited_basic_user: DATestUser = UserManager.create(
        name=INVITED_BASIC_USER, email=INVITED_BASIC_USER_EMAIL
    )
    assert UserManager.is_role(invited_basic_user, UserRole.BASIC)

    # The registered user must appear in the pending invitations
    pending_emails = [
        pending.email for pending in UserManager.get_invited_users(admin_user)
    ]
    assert (
        invited_user.email in pending_emails
    ), f"User {invited_user.email} not found in invited users list"

    # Pull the invitation's tenant id out of the invited user's own info
    user_info = UserManager.get_user_info(invited_user)
    if user_info.tenant_info and user_info.tenant_info.invitation:
        invited_tenant_id = user_info.tenant_info.invitation.tenant_id
    else:
        invited_tenant_id = None
    assert invited_tenant_id is not None, "Expected to find an invitation tenant_id"

    UserManager.accept_invitation(invited_tenant_id, invited_user)

    # Re-read state after acceptance (user info first, then the invited list)
    updated_user_info = UserManager.get_user_info(invited_user)
    remaining_emails = [
        pending.email for pending in UserManager.get_invited_users(admin_user)
    ]

    # The invitation is consumed
    assert (
        invited_user.email not in remaining_emails
    ), f"User {invited_user.email} should not be in invited users list after accepting"

    # The user is now BASIC in the organization
    assert (
        updated_user_info.role == UserRole.BASIC
    ), f"Expected user to have BASIC role, but got {updated_user_info.role}"

    # The user is listed among the organization's BASIC users
    basic_user_page = UserManager.get_user_page(
        user_performing_action=admin_user, role_filter=[UserRole.BASIC]
    )
    invited_user_emails = [member.email for member in basic_user_page.items]
    assert invited_user.email in invited_user_emails, (
        f"User {invited_user.email} not found in the list of basic users "
        f"in the organization. Available users: {invited_user_emails}"
    )

View File

@@ -129,6 +129,9 @@ services:
options:
max-size: "50m"
max-file: "6"
# optional, only for debugging purposes
volumes:
- api_server_logs:/var/log
background:
image: onyxdotapp/onyx-backend:${IMAGE_TAG:-latest}
@@ -256,7 +259,7 @@ services:
- "host.docker.internal:host-gateway"
# optional, only for debugging purposes
volumes:
- log_store:/var/log/persisted-logs
- background_logs:/var/log
logging:
driver: json-file
options:
@@ -325,6 +328,8 @@ services:
volumes:
# Not necessary, this is just to reduce download time during startup
- model_cache_huggingface:/root/.cache/huggingface/
# optional, only for debugging purposes
- inference_model_server_logs:/var/log
logging:
driver: json-file
options:
@@ -357,6 +362,8 @@ services:
volumes:
# Not necessary, this is just to reduce download time during startup
- indexing_huggingface_model_cache:/root/.cache/huggingface/
# optional, only for debugging purposes
- indexing_model_server_logs:/var/log
logging:
driver: json-file
options:
@@ -434,4 +441,8 @@ volumes:
model_cache_huggingface:
indexing_huggingface_model_cache:
log_store: # for logs that we don't want to lose on container restarts
# for logs that we don't want to lose on container restarts
api_server_logs:
background_logs:
inference_model_server_logs:
indexing_model_server_logs:

View File

@@ -106,6 +106,9 @@ services:
options:
max-size: "50m"
max-file: "6"
volumes:
# optional, only for debugging purposes
- api_server_logs:/var/log
background:
image: onyxdotapp/onyx-backend:${IMAGE_TAG:-latest}
@@ -211,7 +214,7 @@ services:
- "host.docker.internal:host-gateway"
# optional, only for debugging purposes
volumes:
- log_store:/var/log/persisted-logs
- background_logs:/var/log
logging:
driver: json-file
options:
@@ -273,6 +276,8 @@ services:
volumes:
# Not necessary, this is just to reduce download time during startup
- model_cache_huggingface:/root/.cache/huggingface/
# optional, only for debugging purposes
- inference_model_server_logs:/var/log
logging:
driver: json-file
options:
@@ -310,6 +315,8 @@ services:
volumes:
# Not necessary, this is just to reduce download time during startup
- indexing_huggingface_model_cache:/root/.cache/huggingface/
# optional, only for debugging purposes
- indexing_model_server_logs:/var/log
logging:
driver: json-file
options:
@@ -387,4 +394,8 @@ volumes:
# Created by the container itself
model_cache_huggingface:
indexing_huggingface_model_cache:
log_store: # for logs that we don't want to lose on container restarts
# for logs that we don't want to lose on container restarts
api_server_logs:
background_logs:
inference_model_server_logs:
indexing_model_server_logs:

View File

@@ -244,8 +244,6 @@ services:
# - ./bundle.pem:/app/bundle.pem:ro
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- log_store:/var/log/persisted-logs
logging:
driver: json-file
options:
@@ -423,4 +421,3 @@ volumes:
model_cache_huggingface:
indexing_huggingface_model_cache:
log_store: # for logs that we don't want to lose on container restarts

View File

@@ -54,9 +54,6 @@ services:
- INDEXING_MODEL_SERVER_HOST=${INDEXING_MODEL_SERVER_HOST:-indexing_model_server}
extra_hosts:
- "host.docker.internal:host-gateway"
# optional, only for debugging purposes
volumes:
- log_store:/var/log/persisted-logs
logging:
driver: json-file
options:
@@ -236,4 +233,3 @@ volumes:
# Created by the container itself
model_cache_huggingface:
indexing_huggingface_model_cache:
log_store: # for logs that we don't want to lose on container restarts

View File

@@ -36,6 +36,10 @@ services:
options:
max-size: "50m"
max-file: "6"
volumes:
# optional, only for debugging purposes
- api_server_logs:/var/log
background:
image: onyxdotapp/onyx-backend:${IMAGE_TAG:-latest}
@@ -69,7 +73,7 @@ services:
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- log_store:/var/log/persisted-logs
- background_logs:/var/log
logging:
driver: json-file
options:
@@ -122,6 +126,8 @@ services:
volumes:
# Not necessary, this is just to reduce download time during startup
- model_cache_huggingface:/root/.cache/huggingface/
# optional, only for debugging purposes
- inference_model_server_logs:/var/log
logging:
driver: json-file
options:
@@ -150,6 +156,8 @@ services:
volumes:
# Not necessary, this is just to reduce download time during startup
- indexing_huggingface_model_cache:/root/.cache/huggingface/
# optional, only for debugging purposes
- indexing_model_server_logs:/var/log
logging:
driver: json-file
options:
@@ -231,4 +239,8 @@ volumes:
# Created by the container itself
model_cache_huggingface:
indexing_huggingface_model_cache:
log_store: # for logs that we don't want to lose on container restarts
# for logs that we don't want to lose on container restarts
api_server_logs:
background_logs:
inference_model_server_logs:
indexing_model_server_logs:

View File

@@ -32,13 +32,14 @@ services:
# - ./bundle.pem:/app/bundle.pem:ro
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- log_store:/var/log/persisted-logs
logging:
driver: json-file
options:
max-size: "50m"
max-file: "6"
volumes:
- api_server_logs:/var/log
background:
image: onyxdotapp/onyx-backend:${IMAGE_TAG:-latest}
build:
@@ -76,7 +77,7 @@ services:
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- log_store:/var/log/persisted-logs
- background_logs:/var/log
logging:
driver: json-file
options:
@@ -152,6 +153,8 @@ services:
volumes:
# Not necessary, this is just to reduce download time during startup
- model_cache_huggingface:/root/.cache/huggingface/
# optional, only for debugging purposes
- inference_model_server_logs:/var/log
logging:
driver: json-file
options:
@@ -180,6 +183,8 @@ services:
volumes:
# Not necessary, this is just to reduce download time during startup
- indexing_huggingface_model_cache:/root/.cache/huggingface/
# optional, only for debugging purposes
- indexing_model_server_logs:/var/log
logging:
driver: json-file
options:
@@ -264,4 +269,8 @@ volumes:
# Created by the container itself
model_cache_huggingface:
indexing_huggingface_model_cache:
log_store: # for logs that we don't want to lose on container restarts
# for logs that we don't want to lose on container restarts
api_server_logs:
background_logs:
inference_model_server_logs:
indexing_model_server_logs:

View File

@@ -63,7 +63,7 @@ services:
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- log_store:/var/log/persisted-logs
- log_store:/var/log
logging:
driver: json-file
options:

View File

@@ -148,7 +148,8 @@ export function Explorer({
clearTimeout(timeoutId);
}
if (query && query.trim() !== "") {
let doSearch = true;
if (doSearch) {
router.replace(
`/admin/documents/explorer?query=${encodeURIComponent(query)}`
);

View File

@@ -1384,6 +1384,7 @@ export function ChatPage({
if (!packet) {
continue;
}
console.log("Packet:", JSON.stringify(packet));
if (!initialFetchDetails) {
if (!Object.hasOwn(packet, "user_message_id")) {
@@ -1729,6 +1730,7 @@ export function ChatPage({
}
}
} catch (e: any) {
console.log("Error:", e);
const errorMsg = e.message;
upsertToCompleteMessageMap({
messages: [
@@ -1756,11 +1758,13 @@ export function ChatPage({
completeMessageMapOverride: currentMessageMap(completeMessageDetail),
});
}
console.log("Finished streaming");
setAgenticGenerating(false);
resetRegenerationState(currentSessionId());
updateChatState("input");
if (isNewSession) {
console.log("Setting up new session");
if (finalMessage) {
setSelectedMessageForDocDisplay(finalMessage.message_id);
}

View File

@@ -102,7 +102,7 @@ export function UserProvider({
};
// Use the custom token refresh hook
useTokenRefresh(upToDateUser, fetchUser);
// useTokenRefresh(upToDateUser, fetchUser);
const updateUserTemperatureOverrideEnabled = async (enabled: boolean) => {
try {