Compare commits

..

5 Commits

33 changed files with 958 additions and 176 deletions

View File

@@ -12,7 +12,7 @@ founders@onyx.app for more information. Please visit https://github.com/onyx-dot
ARG ENABLE_CRAFT=false
# DO_NOT_TRACK is used to disable telemetry for Unstructured
ENV DANSWER_RUNNING_IN_DOCKER="true" \
ENV ONYX_RUNNING_IN_DOCKER="true" \
DO_NOT_TRACK="true" \
PLAYWRIGHT_BROWSERS_PATH="/app/.cache/ms-playwright"

View File

@@ -1,7 +1,7 @@
# Base stage with dependencies
FROM python:3.11-slim-bookworm@sha256:9c6f90801e6b68e772b7c0ca74260cbf7af9f320acec894e26fccdaccfbe3b47 AS base
ENV DANSWER_RUNNING_IN_DOCKER="true" \
ENV ONYX_RUNNING_IN_DOCKER="true" \
HF_HOME=/app/.cache/huggingface
COPY --from=ghcr.io/astral-sh/uv:0.9.9 /uv /uvx /bin/

View File

@@ -5,8 +5,8 @@ from logging.handlers import RotatingFileHandler
import psutil
from onyx.utils.logger import is_running_in_container
from onyx.utils.logger import setup_logger
from onyx.utils.platform import is_running_in_container
# Regular application logger
logger = setup_logger()

View File

@@ -42,7 +42,7 @@ from onyx.db.models import UserGroup
from onyx.db.search_settings import get_active_search_settings_list
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.utils.logger import is_running_in_container
from onyx.utils.platform import is_running_in_container
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType
from shared_configs.configs import MULTI_TENANT

View File

@@ -51,7 +51,6 @@ from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.hierarchy import delete_orphaned_hierarchy_nodes
from onyx.db.hierarchy import link_hierarchy_nodes_to_documents
from onyx.db.hierarchy import remove_stale_hierarchy_node_cc_pair_entries
from onyx.db.hierarchy import reparent_orphaned_hierarchy_nodes
from onyx.db.hierarchy import update_document_parent_hierarchy_nodes
@@ -643,16 +642,6 @@ def connector_pruning_generator_task(
raw_id_to_parent=all_connector_doc_ids,
)
# Link hierarchy nodes to documents for sources where pages can be
# both hierarchy nodes AND documents (e.g. Notion, Confluence)
all_doc_id_list = list(all_connector_doc_ids.keys())
link_hierarchy_nodes_to_documents(
db_session=db_session,
document_ids=all_doc_id_list,
source=source,
commit=True,
)
diff_start = time.monotonic()
try:
# a list of docs in our local index

View File

@@ -93,7 +93,6 @@ from onyx.llm.factory import get_llm_for_persona
from onyx.llm.factory import get_llm_token_counter
from onyx.llm.interfaces import LLM
from onyx.llm.interfaces import LLMUserIdentity
from onyx.llm.multi_llm import LLMTimeoutError
from onyx.llm.override_models import LLMOverride
from onyx.llm.request_context import reset_llm_mock_response
from onyx.llm.request_context import set_llm_mock_response
@@ -1167,32 +1166,6 @@ def _run_models(
else:
if item is _MODEL_DONE:
models_remaining -= 1
elif isinstance(item, LLMTimeoutError):
model_llm = setup.llms[model_idx]
error_msg = (
"The LLM took too long to respond. "
"If you're running a local model, try increasing the "
"LLM_SOCKET_READ_TIMEOUT environment variable "
"(current default: 120 seconds)."
)
stack_trace = "".join(
traceback.format_exception(type(item), item, item.__traceback__)
)
if model_llm.config.api_key and len(model_llm.config.api_key) > 2:
stack_trace = stack_trace.replace(
model_llm.config.api_key, "[REDACTED_API_KEY]"
)
yield StreamingError(
error=error_msg,
stack_trace=stack_trace,
error_code="CONNECTION_ERROR",
is_retryable=True,
details={
"model": model_llm.config.model_name,
"provider": model_llm.config.model_provider,
"model_index": model_idx,
},
)
elif isinstance(item, Exception):
# Yield a tagged error for this model but keep the other models running.
# Do NOT decrement models_remaining — _run_model's finally always posts

View File

@@ -1125,6 +1125,32 @@ DEFAULT_IMAGE_ANALYSIS_MAX_SIZE_MB = 20
# Number of pre-provisioned tenants to maintain
TARGET_AVAILABLE_TENANTS = int(os.environ.get("TARGET_AVAILABLE_TENANTS", "5"))
# Master switch for the tenant work-gating feature. Controls the `enabled`
# axis only — flipping this True puts the feature in shadow mode (compute
# the gate, log skip counts, but do not actually skip). The `enforce` axis
# is Redis-only with a hard-coded default of False, so this env flag alone
# cannot cause real tenants to be skipped. Default off.
ENABLE_TENANT_WORK_GATING = (
os.environ.get("ENABLE_TENANT_WORK_GATING", "").lower() == "true"
)
# Membership TTL for the `active_tenants` sorted set. Members older than this
# are treated as inactive by the gate read path. Must be > the full-fanout
# interval so self-healing re-adds a genuinely-working tenant before their
# membership expires. Default 30 min.
TENANT_WORK_GATING_TTL_SECONDS = int(
os.environ.get("TENANT_WORK_GATING_TTL_SECONDS", 30 * 60)
)
# Minimum wall-clock interval between full-fanout cycles. When this many
# seconds have elapsed since the last bypass, the generator ignores the gate
# on the next invocation and dispatches to every non-gated tenant, letting
# consumers re-populate the active set. Schedule-independent so beat drift
# or backlog can't make the self-heal bursty or sparse. Default 20 min.
TENANT_WORK_GATING_FULL_FANOUT_INTERVAL_SECONDS = int(
os.environ.get("TENANT_WORK_GATING_FULL_FANOUT_INTERVAL_SECONDS", 20 * 60)
)
# Image summarization configuration
IMAGE_SUMMARIZATION_SYSTEM_PROMPT = os.environ.get(

View File

@@ -26,6 +26,10 @@ from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
process_onyx_metadata,
)
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
tabular_file_to_sections,
)
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
@@ -38,6 +42,7 @@ from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.extract_file_text import get_file_ext
@@ -451,6 +456,40 @@ class BlobStorageConnector(LoadConnector, PollConnector):
logger.exception(f"Error processing image {key}")
continue
# Handle tabular files (xlsx, csv, tsv) — produce one
# TabularSection per sheet (or per file for csv/tsv)
# instead of a flat TextSection.
if is_tabular_file(file_name):
try:
downloaded_file = self._download_object(key)
if downloaded_file is None:
continue
tabular_sections = tabular_file_to_sections(
BytesIO(downloaded_file),
file_name=file_name,
link=link,
)
batch.append(
Document(
id=f"{self.bucket_type}:{self.bucket_name}:{key}",
sections=(
tabular_sections
if tabular_sections
else [TabularSection(link=link, text="")]
),
source=DocumentSource(self.bucket_type.value),
semantic_identifier=file_name,
doc_updated_at=last_modified,
metadata={},
)
)
if len(batch) == self.batch_size:
yield batch
batch = []
except Exception:
logger.exception(f"Error processing tabular file {key}")
continue
# Handle text and document files
try:
downloaded_file = self._download_object(key)

View File

@@ -43,7 +43,11 @@ def tabular_file_to_sections(
if lowered.endswith(".xlsx"):
return [
TabularSection(link=f"{file_name} :: {sheet_title}", text=csv_text)
TabularSection(
link=link or file_name,
text=csv_text,
heading=f"{file_name} :: {sheet_title}",
)
for csv_text, sheet_title in xlsx_sheet_extraction(
file, file_name=file_name
)

View File

@@ -15,6 +15,10 @@ from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
)
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rate_limit_builder
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rl_requests
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
tabular_file_to_sections,
)
from onyx.connectors.drupal_wiki.models import DrupalWikiCheckpoint
from onyx.connectors.drupal_wiki.models import DrupalWikiPage
from onyx.connectors.drupal_wiki.models import DrupalWikiPageResponse
@@ -33,6 +37,7 @@ from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.extract_file_text import get_file_ext
@@ -213,7 +218,7 @@ class DrupalWikiConnector(
attachment: dict[str, Any],
page_id: int,
download_url: str,
) -> tuple[list[TextSection | ImageSection], str | None]:
) -> tuple[list[TextSection | ImageSection | TabularSection], str | None]:
"""
Process a single attachment and return generated sections.
@@ -226,7 +231,7 @@ class DrupalWikiConnector(
Tuple of (sections, error_message). If error_message is not None, the
sections list should be treated as invalid.
"""
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
try:
if not self._validate_attachment_filetype(attachment):
@@ -273,6 +278,25 @@ class DrupalWikiConnector(
return sections, None
# Tabular attachments (xlsx, csv, tsv) — produce
# TabularSections instead of a flat TextSection.
if is_tabular_file(file_name):
try:
sections.extend(
tabular_file_to_sections(
BytesIO(raw_bytes),
file_name=file_name,
link=download_url,
)
)
except Exception:
logger.exception(
f"Failed to extract tabular sections from {file_name}"
)
if not sections:
return [], f"No content extracted from tabular file {file_name}"
return sections, None
image_counter = 0
def _store_embedded_image(image_data: bytes, image_name: str) -> None:
@@ -497,7 +521,7 @@ class DrupalWikiConnector(
page_url = build_drupal_wiki_document_id(self.base_url, page.id)
# Create sections with just the page content
sections: list[TextSection | ImageSection] = [
sections: list[TextSection | ImageSection | TabularSection] = [
TextSection(text=text_content, link=page_url)
]

View File

@@ -2,6 +2,7 @@ import json
import os
from datetime import datetime
from datetime import timezone
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import IO
@@ -12,11 +13,16 @@ from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
process_onyx_metadata,
)
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
tabular_file_to_sections,
)
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.models import Document
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.extract_file_text import get_file_ext
@@ -179,8 +185,32 @@ def _process_file(
link = onyx_metadata.link or link
# Build sections: first the text as a single Section
sections: list[TextSection | ImageSection] = []
if extraction_result.text_content.strip():
sections: list[TextSection | ImageSection | TabularSection] = []
if is_tabular_file(file_name):
# Produce TabularSections
lowered_name = file_name.lower()
if lowered_name.endswith(".xlsx"):
file.seek(0)
tabular_source: IO[bytes] = file
else:
tabular_source = BytesIO(
extraction_result.text_content.encode("utf-8", errors="replace")
)
try:
sections.extend(
tabular_file_to_sections(
file=tabular_source,
file_name=file_name,
link=link or "",
)
)
except Exception as e:
logger.error(f"Failed to process tabular file {file_name}: {e}")
return []
if not sections:
logger.warning(f"No content extracted from tabular file {file_name}")
return []
elif extraction_result.text_content.strip():
logger.debug(f"Creating TextSection for {file_name} with link: {link}")
sections.append(
TextSection(link=link, text=extraction_result.text_content.strip())

View File

@@ -22,6 +22,7 @@ from typing_extensions import override
from onyx.access.models import ExternalAccess
from onyx.configs.app_configs import GITHUB_CONNECTOR_BASE_URL
from onyx.configs.constants import DocumentSource
from onyx.connectors.connector_runner import CheckpointOutputWrapper
from onyx.connectors.connector_runner import ConnectorRunner
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
@@ -35,10 +36,16 @@ from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import ConnectorCheckpoint
from onyx.connectors.interfaces import ConnectorFailure
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import IndexingHeartbeatInterface
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TextSection
from onyx.utils.logger import setup_logger
@@ -427,7 +434,11 @@ def make_cursor_url_callback(
return cursor_url_callback
class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoint]):
class GithubConnector(
CheckpointedConnectorWithPermSync[GithubConnectorCheckpoint],
SlimConnector,
SlimConnectorWithPermSync,
):
def __init__(
self,
repo_owner: str,
@@ -559,6 +570,7 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
start: datetime | None = None,
end: datetime | None = None,
include_permissions: bool = False,
is_slim: bool = False,
) -> Generator[Document | ConnectorFailure, None, GithubConnectorCheckpoint]:
if self.github_client is None:
raise ConnectorMissingCredentialError("GitHub")
@@ -614,36 +626,46 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
for pr in pr_batch:
num_prs += 1
# we iterate backwards in time, so at this point we stop processing prs
if (
start is not None
and pr.updated_at
and pr.updated_at.replace(tzinfo=timezone.utc) < start
):
done_with_prs = True
break
# Skip PRs updated after the end date
if (
end is not None
and pr.updated_at
and pr.updated_at.replace(tzinfo=timezone.utc) > end
):
continue
try:
yield _convert_pr_to_document(
cast(PullRequest, pr), repo_external_access
if is_slim:
yield Document(
id=pr.html_url,
sections=[],
external_access=repo_external_access,
source=DocumentSource.GITHUB,
semantic_identifier="",
metadata={},
)
except Exception as e:
error_msg = f"Error converting PR to document: {e}"
logger.exception(error_msg)
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=str(pr.id), document_link=pr.html_url
),
failure_message=error_msg,
exception=e,
)
continue
else:
# we iterate backwards in time, so at this point we stop processing prs
if (
start is not None
and pr.updated_at
and pr.updated_at.replace(tzinfo=timezone.utc) < start
):
done_with_prs = True
break
# Skip PRs updated after the end date
if (
end is not None
and pr.updated_at
and pr.updated_at.replace(tzinfo=timezone.utc) > end
):
continue
try:
yield _convert_pr_to_document(
cast(PullRequest, pr), repo_external_access
)
except Exception as e:
error_msg = f"Error converting PR to document: {e}"
logger.exception(error_msg)
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=str(pr.id), document_link=pr.html_url
),
failure_message=error_msg,
exception=e,
)
continue
# If we reach this point with a cursor url in the checkpoint, we were using
# the fallback cursor-based pagination strategy. That strategy tries to get all
@@ -689,38 +711,47 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
for issue in issue_batch:
num_issues += 1
issue = cast(Issue, issue)
# we iterate backwards in time, so at this point we stop processing prs
if (
start is not None
and issue.updated_at.replace(tzinfo=timezone.utc) < start
):
done_with_issues = True
break
# Skip PRs updated after the end date
if (
end is not None
and issue.updated_at.replace(tzinfo=timezone.utc) > end
):
continue
if issue.pull_request is not None:
# PRs are handled separately
continue
try:
yield _convert_issue_to_document(issue, repo_external_access)
except Exception as e:
error_msg = f"Error converting issue to document: {e}"
logger.exception(error_msg)
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=str(issue.id),
document_link=issue.html_url,
),
failure_message=error_msg,
exception=e,
if is_slim:
yield Document(
id=issue.html_url,
sections=[],
external_access=repo_external_access,
source=DocumentSource.GITHUB,
semantic_identifier="",
metadata={},
)
continue
else:
# we iterate backwards in time, so at this point we stop processing issues
if (
start is not None
and issue.updated_at.replace(tzinfo=timezone.utc) < start
):
done_with_issues = True
break
# Skip issues updated after the end date
if (
end is not None
and issue.updated_at.replace(tzinfo=timezone.utc) > end
):
continue
try:
yield _convert_issue_to_document(issue, repo_external_access)
except Exception as e:
error_msg = f"Error converting issue to document: {e}"
logger.exception(error_msg)
yield ConnectorFailure(
failed_document=DocumentFailure(
document_id=str(issue.id),
document_link=issue.html_url,
),
failure_message=error_msg,
exception=e,
)
continue
logger.info(f"Fetched {num_issues} issues for repo: {repo.name}")
# if we found any issues on the page, and we're not done, return the checkpoint.
@@ -803,6 +834,60 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
start, end, checkpoint, include_permissions=True
)
def _retrieve_slim_docs(
self,
include_permissions: bool,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
"""Iterate all PRs and issues across all configured repos as SlimDocuments.
Drives _fetch_from_github in a checkpoint loop — each call processes one
page and returns an updated checkpoint. CheckpointOutputWrapper handles
draining the generator and extracting the returned checkpoint. Rate
limiting and pagination are handled centrally by _fetch_from_github via
_get_batch_rate_limited.
"""
checkpoint = self.build_dummy_checkpoint()
while checkpoint.has_more:
batch: list[SlimDocument | HierarchyNode] = []
gen = self._fetch_from_github(
checkpoint, include_permissions=include_permissions, is_slim=True
)
wrapper: CheckpointOutputWrapper[GithubConnectorCheckpoint] = (
CheckpointOutputWrapper()
)
for document, _, _, next_checkpoint in wrapper(gen):
if document is not None:
batch.append(
SlimDocument(
id=document.id, external_access=document.external_access
)
)
if next_checkpoint is not None:
checkpoint = next_checkpoint
if batch:
yield batch
if callback and callback.should_stop():
raise RuntimeError("github_slim_docs: Stop signal detected")
@override
def retrieve_all_slim_docs(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
return self._retrieve_slim_docs(include_permissions=False, callback=callback)
@override
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
return self._retrieve_slim_docs(include_permissions=True, callback=callback)
def validate_connector_settings(self) -> None:
if self.github_client is None:
raise ConnectorMissingCredentialError("GitHub credentials not loaded.")

View File

@@ -13,6 +13,10 @@ from pydantic import BaseModel
from onyx.access.models import ExternalAccess
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
tabular_file_to_sections,
)
from onyx.connectors.google_drive.constants import DRIVE_FOLDER_TYPE
from onyx.connectors.google_drive.constants import DRIVE_SHORTCUT_TYPE
from onyx.connectors.google_drive.models import GDriveMimeType
@@ -28,15 +32,16 @@ from onyx.connectors.models import Document
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_processing.extract_file_text import get_file_ext
from onyx.file_processing.extract_file_text import pptx_to_text
from onyx.file_processing.extract_file_text import read_docx_file
from onyx.file_processing.extract_file_text import read_pdf_file
from onyx.file_processing.extract_file_text import xlsx_to_text
from onyx.file_processing.file_types import OnyxFileExtensions
from onyx.file_processing.file_types import OnyxMimeTypes
from onyx.file_processing.file_types import SPREADSHEET_MIME_TYPE
from onyx.file_processing.image_utils import store_image_and_create_section
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import (
@@ -289,7 +294,7 @@ def _download_and_extract_sections_basic(
service: GoogleDriveService,
allow_images: bool,
size_threshold: int,
) -> list[TextSection | ImageSection]:
) -> list[TextSection | ImageSection | TabularSection]:
"""Extract text and images from a Google Drive file."""
file_id = file["id"]
file_name = file["name"]
@@ -308,7 +313,7 @@ def _download_and_extract_sections_basic(
return []
# Store images for later processing
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
try:
section, embedded_id = store_image_and_create_section(
image_data=response_call(),
@@ -323,10 +328,9 @@ def _download_and_extract_sections_basic(
logger.error(f"Failed to process image {file_name}: {e}")
return sections
# For Google Docs, Sheets, and Slides, export as plain text
# For Google Docs, Sheets, and Slides, export via the Drive API
if mime_type in GOOGLE_MIME_TYPES_TO_EXPORT:
export_mime_type = GOOGLE_MIME_TYPES_TO_EXPORT[mime_type]
# Use the correct API call for exporting files
request = service.files().export_media(
fileId=file_id, mimeType=export_mime_type
)
@@ -335,6 +339,17 @@ def _download_and_extract_sections_basic(
logger.warning(f"Failed to export {file_name} as {export_mime_type}")
return []
if export_mime_type in OnyxMimeTypes.TABULAR_MIME_TYPES:
# Synthesize an extension on the filename
ext = ".xlsx" if export_mime_type == SPREADSHEET_MIME_TYPE else ".csv"
return list(
tabular_file_to_sections(
io.BytesIO(response),
file_name=f"{file_name}{ext}",
link=link,
)
)
text = response.decode("utf-8")
return [TextSection(link=link, text=text)]
@@ -356,9 +371,15 @@ def _download_and_extract_sections_basic(
elif (
mime_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
or is_tabular_file(file_name)
):
text = xlsx_to_text(io.BytesIO(response_call()), file_name=file_name)
return [TextSection(link=link, text=text)] if text else []
return list(
tabular_file_to_sections(
io.BytesIO(response_call()),
file_name=file_name,
link=link,
)
)
elif (
mime_type
@@ -369,7 +390,7 @@ def _download_and_extract_sections_basic(
elif mime_type == "application/pdf":
text, _pdf_meta, images = read_pdf_file(io.BytesIO(response_call()))
pdf_sections: list[TextSection | ImageSection] = [
pdf_sections: list[TextSection | ImageSection | TabularSection] = [
TextSection(link=link, text=text)
]
@@ -410,8 +431,9 @@ def _find_nth(haystack: str, needle: str, n: int, start: int = 0) -> int:
def align_basic_advanced(
basic_sections: list[TextSection | ImageSection], adv_sections: list[TextSection]
) -> list[TextSection | ImageSection]:
basic_sections: list[TextSection | ImageSection | TabularSection],
adv_sections: list[TextSection],
) -> list[TextSection | ImageSection | TabularSection]:
"""Align the basic sections with the advanced sections.
In particular, the basic sections contain all content of the file,
including smart chips like dates and doc links. The advanced sections
@@ -428,7 +450,7 @@ def align_basic_advanced(
basic_full_text = "".join(
[section.text for section in basic_sections if isinstance(section, TextSection)]
)
new_sections: list[TextSection | ImageSection] = []
new_sections: list[TextSection | ImageSection | TabularSection] = []
heading_start = 0
for adv_ind in range(1, len(adv_sections)):
heading = adv_sections[adv_ind].text.split(HEADING_DELIMITER)[0]
@@ -599,7 +621,7 @@ def _convert_drive_item_to_document(
"""
Main entry point for converting a Google Drive file => Document object.
"""
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
# Only construct these services when needed
def _get_drive_service() -> GoogleDriveService:
@@ -639,7 +661,9 @@ def _convert_drive_item_to_document(
doc_id=file.get("id", ""),
)
if doc_sections:
sections = cast(list[TextSection | ImageSection], doc_sections)
sections = cast(
list[TextSection | ImageSection | TabularSection], doc_sections
)
if any(SMART_CHIP_CHAR in section.text for section in doc_sections):
logger.debug(
f"found smart chips in {file.get('name')}, aligning with basic sections"

View File

@@ -50,6 +50,7 @@ class Section(BaseModel):
link: str | None = None
text: str | None = None
image_file_id: str | None = None
heading: str | None = None
class TextSection(Section):

View File

@@ -41,6 +41,10 @@ from onyx.configs.app_configs import REQUEST_TIMEOUT_SECONDS
from onyx.configs.app_configs import SHAREPOINT_CONNECTOR_SIZE_THRESHOLD
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
tabular_file_to_sections,
)
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
from onyx.connectors.interfaces import CheckpointOutput
@@ -60,6 +64,7 @@ from onyx.connectors.models import ExternalAccess
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.connectors.sharepoint.connector_utils import get_sharepoint_external_access
from onyx.db.enums import HierarchyNodeType
@@ -586,7 +591,7 @@ def _convert_driveitem_to_document_with_permissions(
driveitem, f"Failed to download via graph api: {e}", e
)
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
file_ext = get_file_ext(driveitem.name)
if not content_bytes:
@@ -602,6 +607,19 @@ def _convert_driveitem_to_document_with_permissions(
)
image_section.link = driveitem.web_url
sections.append(image_section)
elif is_tabular_file(driveitem.name):
try:
sections.extend(
tabular_file_to_sections(
file=io.BytesIO(content_bytes),
file_name=driveitem.name,
link=driveitem.web_url or "",
)
)
except Exception as e:
logger.warning(
f"Failed to extract tabular sections for '{driveitem.name}': {e}"
)
else:
def _store_embedded_image(img_data: bytes, img_name: str) -> None:

View File

@@ -253,7 +253,7 @@ class TabularChunker(SectionChunker):
payloads=payloads, accumulator=AccumulatorState()
)
sheet_header = section.link or ""
sheet_header = section.heading or ""
chunk_texts = parse_to_chunks(
rows=parsed_rows,
sheet_header=sheet_header,

View File

@@ -551,7 +551,7 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
processed_sections=[
Section(
type=section.type,
text=section.text if isinstance(section, TextSection) else "",
text="" if isinstance(section, ImageSection) else section.text,
link=section.link,
image_file_id=(
section.image_file_id
@@ -617,7 +617,7 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
processed_sections.append(processed_section)
# For TextSection, create a base Section with text and link
elif isinstance(section, TextSection):
else:
processed_section = Section(
type=section.type,
text=section.text or "", # Ensure text is always a string, not None

View File

@@ -290,11 +290,7 @@ def litellm_exception_to_error_msg(
error_code = "BUDGET_EXCEEDED"
is_retryable = False
elif isinstance(core_exception, Timeout):
error_msg = (
"The LLM took too long to respond. "
"If you're running a local model, try increasing the "
"LLM_SOCKET_READ_TIMEOUT environment variable (current default: 120 seconds)."
)
error_msg = "Request timed out: The operation took too long to complete. Please try again."
error_code = "CONNECTION_ERROR"
is_retryable = True
elif isinstance(core_exception, APIError):

View File

@@ -125,6 +125,11 @@ class TenantRedis(redis.Redis):
"sadd",
"srem",
"scard",
"zadd",
"zrangebyscore",
"zremrangebyscore",
"zscore",
"zcard",
"hexists",
"hset",
"hdel",

View File

@@ -0,0 +1,104 @@
"""Redis helpers for the tenant work-gating feature.
One sorted set `active_tenants` under the cloud Redis tenant tracks the last
time each tenant was observed doing work. The fanout generator reads the set
(filtered to entries within a TTL window) and skips tenants that haven't been
active recently.
All public functions no-op in single-tenant mode (`MULTI_TENANT=False`).
"""
import time
from typing import cast
from redis.client import Redis
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
from onyx.redis.redis_pool import get_redis_client
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
logger = setup_logger()
# Unprefixed key. `TenantRedis._prefixed` prepends `cloud:` at call time so
# the full rendered key is `cloud:active_tenants`.
_SET_KEY = "active_tenants"
def _now_ms() -> int:
return int(time.time() * 1000)
def _client() -> Redis:
return get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
def mark_tenant_active(tenant_id: str) -> None:
"""Record that `tenant_id` was just observed doing work (ZADD with the
current timestamp as the score). Best-effort — a Redis failure is logged
and swallowed so it never breaks a writer path.
Call sites:
- Top of each gated beat-task consumer when its "is there work?" query
returns a non-empty result.
- cc_pair create lifecycle hook.
"""
if not MULTI_TENANT:
return
try:
# `mapping={member: score}` syntax; ZADD overwrites the score on
# existing members, which is exactly the refresh semantics we want.
_client().zadd(_SET_KEY, mapping={tenant_id: _now_ms()})
except Exception:
logger.exception(f"mark_tenant_active failed: tenant_id={tenant_id}")
def get_active_tenants(ttl_seconds: int) -> set[str] | None:
    """Fetch tenants whose last-seen score is within `ttl_seconds` of now.

    Returns:
        - A (possibly empty) set when the Redis read succeeds; an empty set
          means no tenant is currently marked active, and enforcing callers
          should skip everything.
        - None when the read fails or in single-tenant mode, so callers can
          fail open (dispatch to every tenant) — distinguishing an outage
          from "genuinely empty" prevents a Redis failure from silently
          starving all tenants on every enforced cycle.
    """
    if not MULTI_TENANT:
        return None

    oldest_allowed = _now_ms() - ttl_seconds * 1000
    try:
        members = cast(
            list[bytes],
            _client().zrangebyscore(_SET_KEY, min=oldest_allowed, max="+inf"),
        )
    except Exception:
        logger.exception("get_active_tenants failed")
        return None

    active: set[str] = set()
    for member in members:
        active.add(member.decode() if isinstance(member, bytes) else member)
    return active
def cleanup_expired(ttl_seconds: int) -> int:
    """Drop members whose last-seen score is older than `ttl_seconds`.

    Pure memory hygiene: the read path already filters by score, so
    correctness never depends on this running — but without it the set
    grows without bound as stale tenants accumulate. Returns the number of
    members removed (0 on failure or in single-tenant mode).
    """
    if not MULTI_TENANT:
        return 0

    oldest_allowed = _now_ms() - ttl_seconds * 1000
    # "(" marks the max bound exclusive, so a member sitting exactly at the
    # cutoff is kept here and still returned by the inclusive read path.
    exclusive_max = f"({oldest_allowed}"
    try:
        removed = _client().zremrangebyscore(_SET_KEY, min="-inf", max=exclusive_max)
        return cast(int, removed)
    except Exception:
        logger.exception("cleanup_expired failed")
        return 0

View File

@@ -7,6 +7,9 @@ from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEF
from onyx.background.celery.tasks.beat_schedule import (
CLOUD_DOC_PERMISSION_SYNC_MULTIPLIER_DEFAULT,
)
from onyx.configs.app_configs import ENABLE_TENANT_WORK_GATING
from onyx.configs.app_configs import TENANT_WORK_GATING_FULL_FANOUT_INTERVAL_SECONDS
from onyx.configs.app_configs import TENANT_WORK_GATING_TTL_SECONDS
from onyx.configs.constants import CLOUD_BUILD_FENCE_LOOKUP_TABLE_INTERVAL_DEFAULT
from onyx.configs.constants import ONYX_CLOUD_REDIS_RUNTIME
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
@@ -139,6 +142,87 @@ class OnyxRuntime:
return value
@staticmethod
def _read_tenant_work_gating_flag(axis: str, default: bool) -> bool:
    """Resolve the Redis override `runtime:tenant_work_gating:{axis}`.

    `axis` is either `enabled` (compute the gate) or `enforce` (actually
    skip). A missing or unparseable value yields `default`.
    """
    replica = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)
    raw = replica.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:tenant_work_gating:{axis}")
    if raw is None:
        return default

    try:
        text = cast(bytes, raw).decode()
    except Exception:
        return default
    return text.strip().lower() == "true"
@staticmethod
def get_tenant_work_gating_enabled() -> bool:
    """Whether to *compute* the work gate (read the Redis set and log how
    many tenants would be skipped).

    The env var `ENABLE_TENANT_WORK_GATING` supplies the fallback default
    when no Redis override exists — it is the master switch that turns the
    feature on in shadow mode.
    """
    fallback = ENABLE_TENANT_WORK_GATING
    return OnyxRuntime._read_tenant_work_gating_flag("enabled", default=fallback)
@staticmethod
def get_tenant_work_gating_enforce() -> bool:
    """Whether to *actually skip* tenants absent from the active-work set.

    Deliberately Redis-only with a hard-coded False default: the env var
    `ENABLE_TENANT_WORK_GATING` only flips `enabled` (shadow mode), never
    enforcement. Skipping must be turned on by an explicit
    `runtime:tenant_work_gating:enforce=true` write, so flipping an env
    flag can never accidentally drop real tenant traffic. Only meaningful
    when `get_tenant_work_gating_enabled()` is also True.
    """
    return OnyxRuntime._read_tenant_work_gating_flag("enforce", default=False)
@staticmethod
def get_tenant_work_gating_ttl_seconds() -> int:
    """Membership TTL for the `active_tenants` sorted set.

    Members older than this are treated as "no recent work" by the gate
    read path. Must be > (full-fanout cadence × base task schedule) so
    self-healing has time to refresh memberships before they expire.
    Redis override key: `runtime:tenant_work_gating:ttl_seconds`;
    non-positive or unparseable values fall back to the env default.
    """
    default = TENANT_WORK_GATING_TTL_SECONDS
    r = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)
    raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:tenant_work_gating:ttl_seconds")
    if raw is None:
        return default
    # AttributeError covers a client configured with decode_responses=True
    # (raw is already str, so .decode() does not exist); ValueError covers
    # non-integer payloads. Matches the forgiving fallback behavior of
    # _read_tenant_work_gating_flag instead of crashing the beat generator.
    try:
        value = int(cast(bytes, raw).decode())
    except (AttributeError, ValueError):
        return default
    return value if value > 0 else default
@staticmethod
def get_tenant_work_gating_full_fanout_interval_seconds() -> int:
    """Minimum wall-clock interval between full-fanout cycles.

    When at least this many seconds have elapsed since the last bypass,
    the generator ignores the gate on its next invocation and dispatches
    to every non-gated tenant, letting consumers re-populate the active
    set. Schedule-independent so beat drift or backlog can't skew the
    self-heal cadence. Redis override key:
    `runtime:tenant_work_gating:full_fanout_interval_seconds`;
    non-positive or unparseable values fall back to the env default.
    """
    default = TENANT_WORK_GATING_FULL_FANOUT_INTERVAL_SECONDS
    r = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)
    raw = r.get(
        f"{ONYX_CLOUD_REDIS_RUNTIME}:tenant_work_gating:full_fanout_interval_seconds"
    )
    if raw is None:
        return default
    # AttributeError covers a decode_responses=True client (raw is str);
    # ValueError covers non-integer payloads — consistent with the
    # fallback behavior of _read_tenant_work_gating_flag.
    try:
        value = int(cast(bytes, raw).decode())
    except (AttributeError, ValueError):
        return default
    return value if value > 0 else default
@staticmethod
def get_build_fence_lookup_table_interval() -> int:
"""We maintain an active fence table to make lookups of existing fences efficient.

View File

@@ -34,6 +34,7 @@ from onyx.server.settings.models import UserSettings
from onyx.server.settings.store import load_settings
from onyx.server.settings.store import store_settings
from onyx.utils.logger import setup_logger
from onyx.utils.platform import is_running_in_container
from onyx.utils.variable_functionality import (
fetch_versioned_implementation_with_fallback,
)
@@ -111,6 +112,7 @@ def fetch_settings(
if DISABLE_VECTOR_DB
else DEFAULT_FILE_TOKEN_COUNT_THRESHOLD_K_VECTOR_DB
),
is_containerized=is_running_in_container(),
)

View File

@@ -133,3 +133,7 @@ class UserSettings(Settings):
else DEFAULT_FILE_TOKEN_COUNT_THRESHOLD_K_VECTOR_DB
)
)
# True when the backend is running inside a container (Docker/Podman).
# The frontend uses this to default local-service URLs (e.g. Ollama,
# LM Studio) to host.docker.internal instead of localhost.
is_containerized: bool = False

View File

@@ -5,6 +5,7 @@ from collections.abc import MutableMapping
from logging.handlers import RotatingFileHandler
from typing import Any
from onyx.utils.platform import is_running_in_container
from onyx.utils.tenant import get_tenant_id_short_string
from shared_configs.configs import DEV_LOGGING_ENABLED
from shared_configs.configs import LOG_FILE_NAME
@@ -169,13 +170,6 @@ def get_standard_formatter() -> ColoredFormatter:
)
DANSWER_DOCKER_ENV_STR = "DANSWER_RUNNING_IN_DOCKER"
def is_running_in_container() -> bool:
return os.getenv(DANSWER_DOCKER_ENV_STR) == "true"
def setup_logger(
name: str = __name__,
log_level: int = get_log_level_from_str(),

View File

@@ -0,0 +1,32 @@
import logging
import os

logger = logging.getLogger(__name__)

# Canonical env var set by the Dockerfiles to mark containerized runs.
_ONYX_DOCKER_ENV_STR = "ONYX_RUNNING_IN_DOCKER"
# Legacy spelling, still honored (with a deprecation warning) for upgrades.
_DANSWER_DOCKER_ENV_STR = "DANSWER_RUNNING_IN_DOCKER"


def _resolve_container_flag() -> bool:
    """Read the container-marker env vars, preferring the ONYX name.

    Values are compared case-insensitively with surrounding whitespace
    stripped (so "True" or " true " from a compose file count as set),
    matching how runtime flags are parsed elsewhere. Falls back to the
    deprecated DANSWER variable — warning when it is used — and finally
    to False when neither variable is present.
    """
    onyx_val = os.getenv(_ONYX_DOCKER_ENV_STR)
    if onyx_val is not None:
        return onyx_val.strip().lower() == "true"

    danswer_val = os.getenv(_DANSWER_DOCKER_ENV_STR)
    if danswer_val is not None:
        logger.warning(
            "%s is deprecated and will be ignored in a future release. "
            "Use %s instead.",
            _DANSWER_DOCKER_ENV_STR,
            _ONYX_DOCKER_ENV_STR,
        )
        return danswer_val.strip().lower() == "true"

    return False


# Resolved once at import time: the process environment does not change over
# the process lifetime, and callers may query this from hot paths.
_IS_RUNNING_IN_CONTAINER: bool = _resolve_container_flag()


def is_running_in_container() -> bool:
    """True when the process was started inside a container (Docker/Podman)."""
    return _IS_RUNNING_IN_CONTAINER

View File

@@ -0,0 +1,159 @@
"""Tests for the tenant work-gating Redis helpers.
Requires a running Redis instance. Run with::
python -m dotenv -f .vscode/.env run -- pytest \
backend/tests/external_dependency_unit/tenant_work_gating/test_tenant_work_gating.py
"""
import time
from collections.abc import Generator
from unittest.mock import patch
import pytest
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
from onyx.redis import redis_tenant_work_gating as twg
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_tenant_work_gating import _SET_KEY
from onyx.redis.redis_tenant_work_gating import cleanup_expired
from onyx.redis.redis_tenant_work_gating import get_active_tenants
from onyx.redis.redis_tenant_work_gating import mark_tenant_active
@pytest.fixture(autouse=True)
def _multi_tenant_true() -> Generator[None, None, None]:
"""Force MULTI_TENANT=True for the helper module so public functions are
not no-ops during tests."""
with patch.object(twg, "MULTI_TENANT", True):
yield
@pytest.fixture(autouse=True)
def _clean_set() -> Generator[None, None, None]:
"""Clear the active_tenants sorted set before and after each test."""
client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
client.delete(_SET_KEY)
yield
client.delete(_SET_KEY)
def test_mark_adds_tenant_to_set() -> None:
mark_tenant_active("tenant_a")
assert get_active_tenants(ttl_seconds=60) == {"tenant_a"}
def test_mark_refreshes_timestamp() -> None:
"""ZADD overwrites the score on existing members. Without a refresh,
reading with a TTL that excludes the first write should return empty;
after a second mark_tenant_active at a newer timestamp, the same TTL
read should include the tenant. Pins `_now_ms` so the test is
deterministic."""
base_ms = int(time.time() * 1000)
# First write at t=0.
with patch.object(twg, "_now_ms", return_value=base_ms):
mark_tenant_active("tenant_a")
# Read 5s later with a 1s TTL — first write is outside the window.
with patch.object(twg, "_now_ms", return_value=base_ms + 5000):
assert get_active_tenants(ttl_seconds=1) == set()
# Refresh at t=5s.
with patch.object(twg, "_now_ms", return_value=base_ms + 5000):
mark_tenant_active("tenant_a")
# Read at t=5s with a 1s TTL — refreshed write is inside the window.
with patch.object(twg, "_now_ms", return_value=base_ms + 5000):
assert get_active_tenants(ttl_seconds=1) == {"tenant_a"}
def test_get_active_tenants_filters_by_ttl() -> None:
"""Tenant marked in the past, read with a TTL short enough to exclude it."""
# Pin _now_ms so the write happens at t=0 and the read cutoff is
# well after that.
base_ms = int(time.time() * 1000)
with patch.object(twg, "_now_ms", return_value=base_ms):
mark_tenant_active("tenant_old")
# Read 5 seconds later with a 1-second TTL — tenant_old is outside.
with patch.object(twg, "_now_ms", return_value=base_ms + 5000):
assert get_active_tenants(ttl_seconds=1) == set()
# Read 5 seconds later with a 10-second TTL — tenant_old is inside.
with patch.object(twg, "_now_ms", return_value=base_ms + 5000):
assert get_active_tenants(ttl_seconds=10) == {"tenant_old"}
def test_get_active_tenants_multiple_members() -> None:
mark_tenant_active("tenant_a")
mark_tenant_active("tenant_b")
mark_tenant_active("tenant_c")
assert get_active_tenants(ttl_seconds=60) == {"tenant_a", "tenant_b", "tenant_c"}
def test_get_active_tenants_empty_set() -> None:
"""Genuinely-empty set returns an empty set (not None)."""
assert get_active_tenants(ttl_seconds=60) == set()
def test_get_active_tenants_returns_none_on_redis_error() -> None:
"""Callers need to distinguish Redis failure from "no tenants active" so
they can fail open. Simulate failure by patching the client to raise."""
from unittest.mock import MagicMock
failing_client = MagicMock()
failing_client.zrangebyscore.side_effect = RuntimeError("simulated outage")
with patch.object(twg, "_client", return_value=failing_client):
assert get_active_tenants(ttl_seconds=60) is None
def test_get_active_tenants_returns_none_in_single_tenant_mode() -> None:
"""Single-tenant mode returns None so callers can skip the gate entirely
(same fail-open handling as Redis unavailability)."""
with patch.object(twg, "MULTI_TENANT", False):
assert get_active_tenants(ttl_seconds=60) is None
def test_cleanup_expired_removes_only_stale_members() -> None:
"""Seed one stale and one fresh member directly; cleanup should drop only
the stale one."""
now_ms = int(time.time() * 1000)
client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
client.zadd(_SET_KEY, mapping={"tenant_old": now_ms - 10 * 60 * 1000})
client.zadd(_SET_KEY, mapping={"tenant_new": now_ms})
removed = cleanup_expired(ttl_seconds=60)
assert removed == 1
assert get_active_tenants(ttl_seconds=60 * 60) == {"tenant_new"}
def test_cleanup_expired_empty_set_noop() -> None:
assert cleanup_expired(ttl_seconds=60) == 0
def test_noop_when_multi_tenant_false() -> None:
with patch.object(twg, "MULTI_TENANT", False):
mark_tenant_active("tenant_a")
assert get_active_tenants(ttl_seconds=60) is None
assert cleanup_expired(ttl_seconds=60) == 0
# Verify nothing was written while MULTI_TENANT was False.
assert get_active_tenants(ttl_seconds=60) == set()
def test_rendered_key_is_cloud_prefixed() -> None:
"""Exercises TenantRedis auto-prefixing on sorted-set ops. The rendered
Redis key should be `cloud:active_tenants`, not bare `active_tenants`."""
mark_tenant_active("tenant_a")
from onyx.redis.redis_pool import RedisPool
raw = RedisPool().get_raw_client()
assert raw.zscore("cloud:active_tenants", "tenant_a") is not None
assert raw.zscore("active_tenants", "tenant_a") is None

View File

@@ -0,0 +1,172 @@
"""
Tests verifying that GithubConnector implements SlimConnector and SlimConnectorWithPermSync
correctly, and that pruning uses the cheap slim path (no lazy loading).
"""
from collections.abc import Generator
from unittest.mock import MagicMock
from unittest.mock import patch
from unittest.mock import PropertyMock
import pytest
from onyx.access.models import ExternalAccess
from onyx.background.celery.celery_utils import extract_ids_from_runnable_connector
from onyx.connectors.github.connector import GithubConnector
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import SlimDocument
def _make_pr(html_url: str) -> MagicMock:
pr = MagicMock()
pr.html_url = html_url
pr.pull_request = None
# commits and changed_files should never be accessed during slim retrieval
type(pr).commits = PropertyMock(side_effect=AssertionError("lazy load triggered"))
type(pr).changed_files = PropertyMock(
side_effect=AssertionError("lazy load triggered")
)
return pr
def _make_issue(html_url: str) -> MagicMock:
issue = MagicMock()
issue.html_url = html_url
issue.pull_request = None
return issue
def _make_connector(include_issues: bool = False) -> GithubConnector:
connector = GithubConnector(
repo_owner="test-org",
repositories="test-repo",
include_prs=True,
include_issues=include_issues,
)
connector.github_client = MagicMock()
return connector
@pytest.fixture(autouse=True)
def patch_deserialize_repository(mock_repo: MagicMock) -> Generator[None, None, None]:
with patch(
"onyx.connectors.github.connector.deserialize_repository",
return_value=mock_repo,
):
yield
@pytest.fixture
def mock_repo() -> MagicMock:
repo = MagicMock()
repo.name = "test-repo"
repo.id = 123
repo.raw_headers = {"x-github-request-id": "test"}
repo.raw_data = {"id": 123, "name": "test-repo", "full_name": "test-org/test-repo"}
prs = [
_make_pr(f"https://github.com/test-org/test-repo/pull/{i}") for i in range(1, 4)
]
mock_paginated = MagicMock()
mock_paginated.get_page.side_effect = lambda page: prs if page == 0 else []
repo.get_pulls.return_value = mock_paginated
return repo
def test_github_connector_implements_slim_connector() -> None:
connector = _make_connector()
assert isinstance(connector, SlimConnector)
def test_github_connector_implements_slim_connector_with_perm_sync() -> None:
connector = _make_connector()
assert isinstance(connector, SlimConnectorWithPermSync)
def test_retrieve_all_slim_docs_returns_pr_urls(mock_repo: MagicMock) -> None:
connector = _make_connector()
with patch.object(connector, "fetch_configured_repos", return_value=[mock_repo]):
batches = list(connector.retrieve_all_slim_docs())
all_docs = [doc for batch in batches for doc in batch]
assert len(all_docs) == 3
assert all(isinstance(doc, SlimDocument) for doc in all_docs)
assert {doc.id for doc in all_docs if isinstance(doc, SlimDocument)} == {
"https://github.com/test-org/test-repo/pull/1",
"https://github.com/test-org/test-repo/pull/2",
"https://github.com/test-org/test-repo/pull/3",
}
def test_retrieve_all_slim_docs_has_no_external_access(mock_repo: MagicMock) -> None:
"""Pruning does not need permissions — external_access should be None."""
connector = _make_connector()
with patch.object(connector, "fetch_configured_repos", return_value=[mock_repo]):
batches = list(connector.retrieve_all_slim_docs())
all_docs = [doc for batch in batches for doc in batch]
assert all(doc.external_access is None for doc in all_docs)
def test_retrieve_all_slim_docs_perm_sync_populates_external_access(
mock_repo: MagicMock,
) -> None:
connector = _make_connector()
mock_access = MagicMock(spec=ExternalAccess)
with patch.object(connector, "fetch_configured_repos", return_value=[mock_repo]):
with patch(
"onyx.connectors.github.connector.get_external_access_permission",
return_value=mock_access,
) as mock_perm:
batches = list(connector.retrieve_all_slim_docs_perm_sync())
# permission fetched at least once per repo (once per page in checkpoint-based flow)
mock_perm.assert_called_with(mock_repo, connector.github_client)
all_docs = [doc for batch in batches for doc in batch]
assert all(doc.external_access is mock_access for doc in all_docs)
def test_retrieve_all_slim_docs_skips_pr_issues(mock_repo: MagicMock) -> None:
"""Issues that are actually PRs should be skipped when include_issues=True."""
connector = _make_connector(include_issues=True)
pr_issue = MagicMock()
pr_issue.html_url = "https://github.com/test-org/test-repo/pull/99"
pr_issue.pull_request = MagicMock() # non-None means it's a PR
real_issue = _make_issue("https://github.com/test-org/test-repo/issues/1")
issues = [pr_issue, real_issue]
mock_issues_paginated = MagicMock()
mock_issues_paginated.get_page.side_effect = lambda page: (
issues if page == 0 else []
)
mock_repo.get_issues.return_value = mock_issues_paginated
with patch.object(connector, "fetch_configured_repos", return_value=[mock_repo]):
batches = list(connector.retrieve_all_slim_docs())
issue_ids = {
doc.id
for batch in batches
for doc in batch
if isinstance(doc, SlimDocument) and "issues" in doc.id
}
assert issue_ids == {"https://github.com/test-org/test-repo/issues/1"}
def test_pruning_routes_to_slim_connector_path(mock_repo: MagicMock) -> None:
"""extract_ids_from_runnable_connector must use SlimConnector, not CheckpointedConnector."""
connector = _make_connector()
with patch.object(connector, "fetch_configured_repos", return_value=[mock_repo]):
# If the CheckpointedConnector fallback were used instead, it would call
# load_from_checkpoint which hits _convert_pr_to_document and lazy loads.
# We verify the slim path is taken by checking load_from_checkpoint is NOT called.
with patch.object(connector, "load_from_checkpoint") as mock_load:
result = extract_ids_from_runnable_connector(connector)
mock_load.assert_not_called()
assert len(result.raw_id_to_parent) == 3
assert "https://github.com/test-org/test-repo/pull/1" in result.raw_id_to_parent

View File

@@ -33,15 +33,22 @@ def _make_chunker() -> TabularChunker:
return TabularChunker(tokenizer=CharTokenizer())
def _tabular_section(text: str, link: str = "sheet:Test") -> Section:
return TabularSection(text=text, link=link)
_DEFAULT_LINK = "https://example.com/doc"
def _tabular_section(
text: str,
link: str = _DEFAULT_LINK,
heading: str | None = "sheet:Test",
) -> Section:
return TabularSection(text=text, link=link, heading=heading)
class TestTabularChunkerChunkSection:
def test_simple_csv_all_rows_fit_one_chunk(self) -> None:
# --- INPUT -----------------------------------------------------
csv_text = "Name,Age,City\n" "Alice,30,NYC\n" "Bob,25,SF\n"
link = "sheet:People"
heading = "sheet:People"
content_token_limit = 500
# --- EXPECTED --------------------------------------------------
@@ -56,7 +63,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=content_token_limit,
)
@@ -64,7 +71,7 @@ class TestTabularChunkerChunkSection:
# --- ASSERT ----------------------------------------------------
assert [p.text for p in out.payloads] == expected_texts
assert [p.is_continuation for p in out.payloads] == [False]
assert all(p.links == {0: link} for p in out.payloads)
assert all(p.links == {0: _DEFAULT_LINK} for p in out.payloads)
assert out.accumulator.is_empty()
def test_overflow_splits_into_two_deterministic_chunks(self) -> None:
@@ -74,7 +81,7 @@ class TestTabularChunkerChunkSection:
# Each row "col=a, val=1" is 12 tokens; two rows + \n = 25 (fits),
# three rows + 2×\n = 38 (overflows) → split after 2 rows.
csv_text = "col,val\n" "a,1\n" "b,2\n" "c,3\n" "d,4\n"
link = "sheet:S"
heading = "sheet:S"
content_token_limit = 57
# --- EXPECTED --------------------------------------------------
@@ -85,7 +92,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=content_token_limit,
)
@@ -95,7 +102,7 @@ class TestTabularChunkerChunkSection:
# First chunk is fresh; subsequent chunks mark as continuations.
assert [p.is_continuation for p in out.payloads] == [False, True]
# Link carries through every chunk.
assert all(p.links == {0: link} for p in out.payloads)
assert all(p.links == {0: _DEFAULT_LINK} for p in out.payloads)
# Add back in shortly
# def test_header_only_csv_produces_single_prelude_chunk(self) -> None:
@@ -123,7 +130,7 @@ class TestTabularChunkerChunkSection:
# Alice's Age is empty; Bob's City is empty. Empty cells should
# not appear as `field=` pairs in the output.
csv_text = "Name,Age,City\n" "Alice,,NYC\n" "Bob,25,\n"
link = "sheet:P"
heading = "sheet:P"
# --- EXPECTED --------------------------------------------------
expected_texts = [
@@ -137,7 +144,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=500,
)
@@ -151,7 +158,7 @@ class TestTabularChunkerChunkSection:
# a single field. The surrounding quotes are stripped during
# decoding, so the chunk text carries the bare value.
csv_text = "Name,Notes\n" 'Alice,"Hello, world"\n'
link = "sheet:P"
heading = "sheet:P"
# --- EXPECTED --------------------------------------------------
expected_texts = [
@@ -160,7 +167,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=500,
)
@@ -173,7 +180,7 @@ class TestTabularChunkerChunkSection:
# Stray blank rows in the CSV (e.g. export artifacts) shouldn't
# produce ghost rows in the output.
csv_text = "A,B\n" "\n" "1,2\n" "\n" "\n" "3,4\n"
link = "sheet:S"
heading = "sheet:S"
# --- EXPECTED --------------------------------------------------
expected_texts = [
@@ -182,7 +189,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=500,
)
@@ -199,7 +206,7 @@ class TestTabularChunkerChunkSection:
pending_link = "prev-link"
csv_text = "a,b\n" "1,2\n"
link = "sheet:S"
heading = "sheet:S"
# --- EXPECTED --------------------------------------------------
expected_texts = [
@@ -209,7 +216,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(
text=pending_text,
link_offsets={0: pending_link},
@@ -222,7 +229,7 @@ class TestTabularChunkerChunkSection:
# Flushed chunk keeps the prior text's link; tabular chunk uses
# the tabular section's link.
assert out.payloads[0].links == {0: pending_link}
assert out.payloads[1].links == {0: link}
assert out.payloads[1].links == {0: _DEFAULT_LINK}
# Accumulator resets — tabular section is a structural boundary.
assert out.accumulator.is_empty()
@@ -234,7 +241,7 @@ class TestTabularChunkerChunkSection:
csv_text = (
"x\n" "aaaaaaaaaaaaaaaaaa\n" "bbbbbbbbbbbbbbbbbb\n" "cccccccccccccccccc\n"
)
link = "S"
heading = "S"
content_token_limit = 100
# --- EXPECTED --------------------------------------------------
@@ -252,7 +259,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=content_token_limit,
)
@@ -274,7 +281,7 @@ class TestTabularChunkerChunkSection:
# Every emitted chunk therefore carries its full prelude rather
# than dropping Columns at emit time.
csv_text = "x\n" "aa\n" "bb\n" "cc\n" "dd\n" "ee\n"
link = "S"
heading = "S"
content_token_limit = 30
# --- EXPECTED --------------------------------------------------
@@ -290,7 +297,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=content_token_limit,
)
@@ -311,7 +318,7 @@ class TestTabularChunkerChunkSection:
# pieces (they already consume the full budget). A 53-token row
# packs into 3 field-boundary pieces under a 20-token budget.
csv_text = "field 1,field 2,field 3,field 4,field 5\n" "1,2,3,4,5\n"
link = "S"
heading = "S"
content_token_limit = 20
# --- EXPECTED --------------------------------------------------
@@ -331,7 +338,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=content_token_limit,
)
@@ -359,7 +366,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section("", link="sheet:Empty"),
_tabular_section("", heading="sheet:Empty"),
AccumulatorState(
text=pending_text,
link_offsets=pending_link_offsets,
@@ -381,7 +388,7 @@ class TestTabularChunkerChunkSection:
# CSV has one column "x" with a 50-char value. Formatted pair =
# "x=" + 50 a's = 52 tokens. Budget = 10.
csv_text = "x\n" + ("a" * 50) + "\n"
link = "S"
heading = "S"
content_token_limit = 10
# --- EXPECTED --------------------------------------------------
@@ -404,7 +411,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=content_token_limit,
)
@@ -421,7 +428,7 @@ class TestTabularChunkerChunkSection:
# alias appended in parens on the `Columns:` line. Plain headers
# pass through untouched.
csv_text = "MTTR_hours,id,owner_name\n" "3,42,Alice\n"
link = "sheet:M"
heading = "sheet:M"
# --- EXPECTED --------------------------------------------------
expected_texts = [
@@ -434,7 +441,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=500,
)
@@ -455,7 +462,7 @@ class TestTabularChunkerChunkSection:
# populated (tiny). Row 2 is a "fat" row with all four columns
# populated.
csv_text = "a,b,c,d\n" "1,,,\n" "xxx,yyy,zzz,www\n" "2,,,\n"
link = "S"
heading = "S"
content_token_limit = 20
# --- EXPECTED --------------------------------------------------
@@ -481,7 +488,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=content_token_limit,
)
@@ -503,7 +510,7 @@ class TestTabularChunkerChunkSection:
# cols + row: 10+1+3 = 14 ≤ 15 ✓
# sheet + cols + row: 13+1+10+1+3 = 28 > 15 ✗
csv_text = "x\n" "y\n"
link = "LongSheetName"
heading = "LongSheetName"
content_token_limit = 15
# --- EXPECTED --------------------------------------------------
@@ -511,7 +518,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=content_token_limit,
)
@@ -534,7 +541,7 @@ class TestTabularChunkerChunkSection:
# cols + row: 17+1+12 = 30 > 20 ✗
# sheet + row: 1+1+12 = 14 ≤ 20 ✓
csv_text = "ABC,DEF\n" "1,2\n"
link = "S"
heading = "S"
content_token_limit = 20
# --- EXPECTED --------------------------------------------------
@@ -542,7 +549,7 @@ class TestTabularChunkerChunkSection:
# --- ACT -------------------------------------------------------
out = _make_chunker().chunk_section(
_tabular_section(csv_text, link=link),
_tabular_section(csv_text, heading=heading),
AccumulatorState(),
content_token_limit=content_token_limit,
)

View File

@@ -172,7 +172,7 @@ LOG_ONYX_MODEL_INTERACTIONS=False
## Gen AI Settings
# GEN_AI_MAX_TOKENS=
LLM_SOCKET_READ_TIMEOUT=120
# LLM_SOCKET_READ_TIMEOUT=
# MAX_CHUNKS_FED_TO_CHAT=
# DISABLE_LITELLM_STREAMING=
# LITELLM_EXTRA_HEADERS=

View File

@@ -1259,7 +1259,7 @@ configMap:
S3_FILE_STORE_BUCKET_NAME: ""
# Gen AI Settings
GEN_AI_MAX_TOKENS: ""
LLM_SOCKET_READ_TIMEOUT: "120"
LLM_SOCKET_READ_TIMEOUT: "60"
MAX_CHUNKS_FED_TO_CHAT: ""
# Query Options
DOC_TIME_DECAY: ""

View File

@@ -76,6 +76,10 @@ export interface Settings {
// Factory defaults for the restore button.
default_user_file_max_upload_size_mb?: number;
default_file_token_count_threshold_k?: number;
// True when the backend runs inside a container (Docker/Podman).
// Used to default local-service URLs to host.docker.internal.
is_containerized?: boolean;
}
export enum NotificationType {

View File

@@ -27,8 +27,7 @@ import {
import { fetchModels } from "@/lib/llmConfig/svc";
import { toast } from "@/hooks/useToast";
import { refreshLlmProviderCaches } from "@/lib/llmConfig/cache";
const DEFAULT_API_BASE = "http://localhost:1234";
import { useSettingsContext } from "@/providers/SettingsProvider";
interface LMStudioModalValues extends BaseLLMModalValues {
api_base: string;
@@ -116,6 +115,10 @@ export default function LMStudioModal({
}: LLMProviderFormProps) {
const isOnboarding = variant === "onboarding";
const { mutate } = useSWRConfig();
const { settings } = useSettingsContext();
const defaultApiBase = settings.is_containerized
? "http://host.docker.internal:1234"
: "http://localhost:1234";
const onClose = () => onOpenChange?.(false);
@@ -125,7 +128,7 @@ export default function LMStudioModal({
LLMProviderName.LM_STUDIO,
existingLlmProvider
),
api_base: existingLlmProvider?.api_base ?? DEFAULT_API_BASE,
api_base: existingLlmProvider?.api_base ?? defaultApiBase,
custom_config: {
LM_STUDIO_API_KEY: existingLlmProvider?.custom_config?.LM_STUDIO_API_KEY,
},

View File

@@ -31,8 +31,7 @@ import { Card } from "@opal/components";
import { toast } from "@/hooks/useToast";
import { refreshLlmProviderCaches } from "@/lib/llmConfig/cache";
import InputTypeInField from "@/refresh-components/form/InputTypeInField";
const DEFAULT_API_BASE = "http://127.0.0.1:11434";
import { useSettingsContext } from "@/providers/SettingsProvider";
const CLOUD_API_BASE = "https://ollama.com";
enum Tab {
@@ -163,6 +162,10 @@ export default function OllamaModal({
}: LLMProviderFormProps) {
const isOnboarding = variant === "onboarding";
const { mutate } = useSWRConfig();
const { settings } = useSettingsContext();
const defaultApiBase = settings.is_containerized
? "http://host.docker.internal:11434"
: "http://127.0.0.1:11434";
const apiKey = existingLlmProvider?.custom_config?.OLLAMA_API_KEY;
const defaultTab =
existingLlmProvider && !!apiKey ? Tab.TAB_CLOUD : Tab.TAB_SELF_HOSTED;
@@ -176,7 +179,7 @@ export default function OllamaModal({
LLMProviderName.OLLAMA_CHAT,
existingLlmProvider
),
api_base: existingLlmProvider?.api_base ?? DEFAULT_API_BASE,
api_base: existingLlmProvider?.api_base ?? defaultApiBase,
custom_config: {
OLLAMA_API_KEY: apiKey,
},