mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-16 15:06:45 +00:00
Compare commits
5 Commits
jamison/ti
...
jamison/co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1fb36093f9 | ||
|
|
560a8f7ab4 | ||
|
|
eaabb19c72 | ||
|
|
d3e5e16150 | ||
|
|
d3739611ba |
@@ -12,7 +12,7 @@ founders@onyx.app for more information. Please visit https://github.com/onyx-dot
|
||||
ARG ENABLE_CRAFT=false
|
||||
|
||||
# DO_NOT_TRACK is used to disable telemetry for Unstructured
|
||||
ENV DANSWER_RUNNING_IN_DOCKER="true" \
|
||||
ENV ONYX_RUNNING_IN_DOCKER="true" \
|
||||
DO_NOT_TRACK="true" \
|
||||
PLAYWRIGHT_BROWSERS_PATH="/app/.cache/ms-playwright"
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Base stage with dependencies
|
||||
FROM python:3.11-slim-bookworm@sha256:9c6f90801e6b68e772b7c0ca74260cbf7af9f320acec894e26fccdaccfbe3b47 AS base
|
||||
|
||||
ENV DANSWER_RUNNING_IN_DOCKER="true" \
|
||||
ENV ONYX_RUNNING_IN_DOCKER="true" \
|
||||
HF_HOME=/app/.cache/huggingface
|
||||
|
||||
COPY --from=ghcr.io/astral-sh/uv:0.9.9 /uv /uvx /bin/
|
||||
|
||||
@@ -5,8 +5,8 @@ from logging.handlers import RotatingFileHandler
|
||||
|
||||
import psutil
|
||||
|
||||
from onyx.utils.logger import is_running_in_container
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.platform import is_running_in_container
|
||||
|
||||
# Regular application logger
|
||||
logger = setup_logger()
|
||||
|
||||
@@ -42,7 +42,7 @@ from onyx.db.models import UserGroup
|
||||
from onyx.db.search_settings import get_active_search_settings_list
|
||||
from onyx.redis.redis_pool import get_redis_client
|
||||
from onyx.redis.redis_pool import redis_lock_dump
|
||||
from onyx.utils.logger import is_running_in_container
|
||||
from onyx.utils.platform import is_running_in_container
|
||||
from onyx.utils.telemetry import optional_telemetry
|
||||
from onyx.utils.telemetry import RecordType
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
|
||||
@@ -51,7 +51,6 @@ from onyx.db.enums import ConnectorCredentialPairStatus
|
||||
from onyx.db.enums import SyncStatus
|
||||
from onyx.db.enums import SyncType
|
||||
from onyx.db.hierarchy import delete_orphaned_hierarchy_nodes
|
||||
from onyx.db.hierarchy import link_hierarchy_nodes_to_documents
|
||||
from onyx.db.hierarchy import remove_stale_hierarchy_node_cc_pair_entries
|
||||
from onyx.db.hierarchy import reparent_orphaned_hierarchy_nodes
|
||||
from onyx.db.hierarchy import update_document_parent_hierarchy_nodes
|
||||
@@ -643,16 +642,6 @@ def connector_pruning_generator_task(
|
||||
raw_id_to_parent=all_connector_doc_ids,
|
||||
)
|
||||
|
||||
# Link hierarchy nodes to documents for sources where pages can be
|
||||
# both hierarchy nodes AND documents (e.g. Notion, Confluence)
|
||||
all_doc_id_list = list(all_connector_doc_ids.keys())
|
||||
link_hierarchy_nodes_to_documents(
|
||||
db_session=db_session,
|
||||
document_ids=all_doc_id_list,
|
||||
source=source,
|
||||
commit=True,
|
||||
)
|
||||
|
||||
diff_start = time.monotonic()
|
||||
try:
|
||||
# a list of docs in our local index
|
||||
|
||||
@@ -93,7 +93,6 @@ from onyx.llm.factory import get_llm_for_persona
|
||||
from onyx.llm.factory import get_llm_token_counter
|
||||
from onyx.llm.interfaces import LLM
|
||||
from onyx.llm.interfaces import LLMUserIdentity
|
||||
from onyx.llm.multi_llm import LLMTimeoutError
|
||||
from onyx.llm.override_models import LLMOverride
|
||||
from onyx.llm.request_context import reset_llm_mock_response
|
||||
from onyx.llm.request_context import set_llm_mock_response
|
||||
@@ -1167,32 +1166,6 @@ def _run_models(
|
||||
else:
|
||||
if item is _MODEL_DONE:
|
||||
models_remaining -= 1
|
||||
elif isinstance(item, LLMTimeoutError):
|
||||
model_llm = setup.llms[model_idx]
|
||||
error_msg = (
|
||||
"The LLM took too long to respond. "
|
||||
"If you're running a local model, try increasing the "
|
||||
"LLM_SOCKET_READ_TIMEOUT environment variable "
|
||||
"(current default: 120 seconds)."
|
||||
)
|
||||
stack_trace = "".join(
|
||||
traceback.format_exception(type(item), item, item.__traceback__)
|
||||
)
|
||||
if model_llm.config.api_key and len(model_llm.config.api_key) > 2:
|
||||
stack_trace = stack_trace.replace(
|
||||
model_llm.config.api_key, "[REDACTED_API_KEY]"
|
||||
)
|
||||
yield StreamingError(
|
||||
error=error_msg,
|
||||
stack_trace=stack_trace,
|
||||
error_code="CONNECTION_ERROR",
|
||||
is_retryable=True,
|
||||
details={
|
||||
"model": model_llm.config.model_name,
|
||||
"provider": model_llm.config.model_provider,
|
||||
"model_index": model_idx,
|
||||
},
|
||||
)
|
||||
elif isinstance(item, Exception):
|
||||
# Yield a tagged error for this model but keep the other models running.
|
||||
# Do NOT decrement models_remaining — _run_model's finally always posts
|
||||
|
||||
@@ -1125,6 +1125,32 @@ DEFAULT_IMAGE_ANALYSIS_MAX_SIZE_MB = 20
|
||||
# Number of pre-provisioned tenants to maintain
|
||||
TARGET_AVAILABLE_TENANTS = int(os.environ.get("TARGET_AVAILABLE_TENANTS", "5"))
|
||||
|
||||
# Master switch for the tenant work-gating feature. Controls the `enabled`
|
||||
# axis only — flipping this True puts the feature in shadow mode (compute
|
||||
# the gate, log skip counts, but do not actually skip). The `enforce` axis
|
||||
# is Redis-only with a hard-coded default of False, so this env flag alone
|
||||
# cannot cause real tenants to be skipped. Default off.
|
||||
ENABLE_TENANT_WORK_GATING = (
|
||||
os.environ.get("ENABLE_TENANT_WORK_GATING", "").lower() == "true"
|
||||
)
|
||||
|
||||
# Membership TTL for the `active_tenants` sorted set. Members older than this
|
||||
# are treated as inactive by the gate read path. Must be > the full-fanout
|
||||
# interval so self-healing re-adds a genuinely-working tenant before their
|
||||
# membership expires. Default 30 min.
|
||||
TENANT_WORK_GATING_TTL_SECONDS = int(
|
||||
os.environ.get("TENANT_WORK_GATING_TTL_SECONDS", 30 * 60)
|
||||
)
|
||||
|
||||
# Minimum wall-clock interval between full-fanout cycles. When this many
|
||||
# seconds have elapsed since the last bypass, the generator ignores the gate
|
||||
# on the next invocation and dispatches to every non-gated tenant, letting
|
||||
# consumers re-populate the active set. Schedule-independent so beat drift
|
||||
# or backlog can't make the self-heal bursty or sparse. Default 20 min.
|
||||
TENANT_WORK_GATING_FULL_FANOUT_INTERVAL_SECONDS = int(
|
||||
os.environ.get("TENANT_WORK_GATING_FULL_FANOUT_INTERVAL_SECONDS", 20 * 60)
|
||||
)
|
||||
|
||||
|
||||
# Image summarization configuration
|
||||
IMAGE_SUMMARIZATION_SYSTEM_PROMPT = os.environ.get(
|
||||
|
||||
@@ -26,6 +26,10 @@ from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
|
||||
process_onyx_metadata,
|
||||
)
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
tabular_file_to_sections,
|
||||
)
|
||||
from onyx.connectors.exceptions import ConnectorValidationError
|
||||
from onyx.connectors.exceptions import CredentialExpiredError
|
||||
from onyx.connectors.exceptions import InsufficientPermissionsError
|
||||
@@ -38,6 +42,7 @@ from onyx.connectors.models import ConnectorMissingCredentialError
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import HierarchyNode
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import TabularSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.file_processing.extract_file_text import extract_text_and_images
|
||||
from onyx.file_processing.extract_file_text import get_file_ext
|
||||
@@ -451,6 +456,40 @@ class BlobStorageConnector(LoadConnector, PollConnector):
|
||||
logger.exception(f"Error processing image {key}")
|
||||
continue
|
||||
|
||||
# Handle tabular files (xlsx, csv, tsv) — produce one
|
||||
# TabularSection per sheet (or per file for csv/tsv)
|
||||
# instead of a flat TextSection.
|
||||
if is_tabular_file(file_name):
|
||||
try:
|
||||
downloaded_file = self._download_object(key)
|
||||
if downloaded_file is None:
|
||||
continue
|
||||
tabular_sections = tabular_file_to_sections(
|
||||
BytesIO(downloaded_file),
|
||||
file_name=file_name,
|
||||
link=link,
|
||||
)
|
||||
batch.append(
|
||||
Document(
|
||||
id=f"{self.bucket_type}:{self.bucket_name}:{key}",
|
||||
sections=(
|
||||
tabular_sections
|
||||
if tabular_sections
|
||||
else [TabularSection(link=link, text="")]
|
||||
),
|
||||
source=DocumentSource(self.bucket_type.value),
|
||||
semantic_identifier=file_name,
|
||||
doc_updated_at=last_modified,
|
||||
metadata={},
|
||||
)
|
||||
)
|
||||
if len(batch) == self.batch_size:
|
||||
yield batch
|
||||
batch = []
|
||||
except Exception:
|
||||
logger.exception(f"Error processing tabular file {key}")
|
||||
continue
|
||||
|
||||
# Handle text and document files
|
||||
try:
|
||||
downloaded_file = self._download_object(key)
|
||||
|
||||
@@ -43,7 +43,11 @@ def tabular_file_to_sections(
|
||||
|
||||
if lowered.endswith(".xlsx"):
|
||||
return [
|
||||
TabularSection(link=f"{file_name} :: {sheet_title}", text=csv_text)
|
||||
TabularSection(
|
||||
link=link or file_name,
|
||||
text=csv_text,
|
||||
heading=f"{file_name} :: {sheet_title}",
|
||||
)
|
||||
for csv_text, sheet_title in xlsx_sheet_extraction(
|
||||
file, file_name=file_name
|
||||
)
|
||||
|
||||
@@ -15,6 +15,10 @@ from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
|
||||
)
|
||||
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rate_limit_builder
|
||||
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rl_requests
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
tabular_file_to_sections,
|
||||
)
|
||||
from onyx.connectors.drupal_wiki.models import DrupalWikiCheckpoint
|
||||
from onyx.connectors.drupal_wiki.models import DrupalWikiPage
|
||||
from onyx.connectors.drupal_wiki.models import DrupalWikiPageResponse
|
||||
@@ -33,6 +37,7 @@ from onyx.connectors.models import DocumentFailure
|
||||
from onyx.connectors.models import HierarchyNode
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.models import TabularSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.file_processing.extract_file_text import extract_text_and_images
|
||||
from onyx.file_processing.extract_file_text import get_file_ext
|
||||
@@ -213,7 +218,7 @@ class DrupalWikiConnector(
|
||||
attachment: dict[str, Any],
|
||||
page_id: int,
|
||||
download_url: str,
|
||||
) -> tuple[list[TextSection | ImageSection], str | None]:
|
||||
) -> tuple[list[TextSection | ImageSection | TabularSection], str | None]:
|
||||
"""
|
||||
Process a single attachment and return generated sections.
|
||||
|
||||
@@ -226,7 +231,7 @@ class DrupalWikiConnector(
|
||||
Tuple of (sections, error_message). If error_message is not None, the
|
||||
sections list should be treated as invalid.
|
||||
"""
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
|
||||
try:
|
||||
if not self._validate_attachment_filetype(attachment):
|
||||
@@ -273,6 +278,25 @@ class DrupalWikiConnector(
|
||||
|
||||
return sections, None
|
||||
|
||||
# Tabular attachments (xlsx, csv, tsv) — produce
|
||||
# TabularSections instead of a flat TextSection.
|
||||
if is_tabular_file(file_name):
|
||||
try:
|
||||
sections.extend(
|
||||
tabular_file_to_sections(
|
||||
BytesIO(raw_bytes),
|
||||
file_name=file_name,
|
||||
link=download_url,
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
f"Failed to extract tabular sections from {file_name}"
|
||||
)
|
||||
if not sections:
|
||||
return [], f"No content extracted from tabular file {file_name}"
|
||||
return sections, None
|
||||
|
||||
image_counter = 0
|
||||
|
||||
def _store_embedded_image(image_data: bytes, image_name: str) -> None:
|
||||
@@ -497,7 +521,7 @@ class DrupalWikiConnector(
|
||||
page_url = build_drupal_wiki_document_id(self.base_url, page.id)
|
||||
|
||||
# Create sections with just the page content
|
||||
sections: list[TextSection | ImageSection] = [
|
||||
sections: list[TextSection | ImageSection | TabularSection] = [
|
||||
TextSection(text=text_content, link=page_url)
|
||||
]
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import IO
|
||||
@@ -12,11 +13,16 @@ from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
|
||||
process_onyx_metadata,
|
||||
)
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
tabular_file_to_sections,
|
||||
)
|
||||
from onyx.connectors.interfaces import GenerateDocumentsOutput
|
||||
from onyx.connectors.interfaces import LoadConnector
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import HierarchyNode
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import TabularSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.file_processing.extract_file_text import extract_text_and_images
|
||||
from onyx.file_processing.extract_file_text import get_file_ext
|
||||
@@ -179,8 +185,32 @@ def _process_file(
|
||||
link = onyx_metadata.link or link
|
||||
|
||||
# Build sections: first the text as a single Section
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
if extraction_result.text_content.strip():
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
if is_tabular_file(file_name):
|
||||
# Produce TabularSections
|
||||
lowered_name = file_name.lower()
|
||||
if lowered_name.endswith(".xlsx"):
|
||||
file.seek(0)
|
||||
tabular_source: IO[bytes] = file
|
||||
else:
|
||||
tabular_source = BytesIO(
|
||||
extraction_result.text_content.encode("utf-8", errors="replace")
|
||||
)
|
||||
try:
|
||||
sections.extend(
|
||||
tabular_file_to_sections(
|
||||
file=tabular_source,
|
||||
file_name=file_name,
|
||||
link=link or "",
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to process tabular file {file_name}: {e}")
|
||||
return []
|
||||
if not sections:
|
||||
logger.warning(f"No content extracted from tabular file {file_name}")
|
||||
return []
|
||||
elif extraction_result.text_content.strip():
|
||||
logger.debug(f"Creating TextSection for {file_name} with link: {link}")
|
||||
sections.append(
|
||||
TextSection(link=link, text=extraction_result.text_content.strip())
|
||||
|
||||
@@ -22,6 +22,7 @@ from typing_extensions import override
|
||||
from onyx.access.models import ExternalAccess
|
||||
from onyx.configs.app_configs import GITHUB_CONNECTOR_BASE_URL
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.connectors.connector_runner import CheckpointOutputWrapper
|
||||
from onyx.connectors.connector_runner import ConnectorRunner
|
||||
from onyx.connectors.exceptions import ConnectorValidationError
|
||||
from onyx.connectors.exceptions import CredentialExpiredError
|
||||
@@ -35,10 +36,16 @@ from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
|
||||
from onyx.connectors.interfaces import CheckpointOutput
|
||||
from onyx.connectors.interfaces import ConnectorCheckpoint
|
||||
from onyx.connectors.interfaces import ConnectorFailure
|
||||
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
|
||||
from onyx.connectors.interfaces import IndexingHeartbeatInterface
|
||||
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
|
||||
from onyx.connectors.interfaces import SlimConnector
|
||||
from onyx.connectors.interfaces import SlimConnectorWithPermSync
|
||||
from onyx.connectors.models import ConnectorMissingCredentialError
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import DocumentFailure
|
||||
from onyx.connectors.models import HierarchyNode
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
@@ -427,7 +434,11 @@ def make_cursor_url_callback(
|
||||
return cursor_url_callback
|
||||
|
||||
|
||||
class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoint]):
|
||||
class GithubConnector(
|
||||
CheckpointedConnectorWithPermSync[GithubConnectorCheckpoint],
|
||||
SlimConnector,
|
||||
SlimConnectorWithPermSync,
|
||||
):
|
||||
def __init__(
|
||||
self,
|
||||
repo_owner: str,
|
||||
@@ -559,6 +570,7 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
|
||||
start: datetime | None = None,
|
||||
end: datetime | None = None,
|
||||
include_permissions: bool = False,
|
||||
is_slim: bool = False,
|
||||
) -> Generator[Document | ConnectorFailure, None, GithubConnectorCheckpoint]:
|
||||
if self.github_client is None:
|
||||
raise ConnectorMissingCredentialError("GitHub")
|
||||
@@ -614,36 +626,46 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
|
||||
for pr in pr_batch:
|
||||
num_prs += 1
|
||||
|
||||
# we iterate backwards in time, so at this point we stop processing prs
|
||||
if (
|
||||
start is not None
|
||||
and pr.updated_at
|
||||
and pr.updated_at.replace(tzinfo=timezone.utc) < start
|
||||
):
|
||||
done_with_prs = True
|
||||
break
|
||||
# Skip PRs updated after the end date
|
||||
if (
|
||||
end is not None
|
||||
and pr.updated_at
|
||||
and pr.updated_at.replace(tzinfo=timezone.utc) > end
|
||||
):
|
||||
continue
|
||||
try:
|
||||
yield _convert_pr_to_document(
|
||||
cast(PullRequest, pr), repo_external_access
|
||||
if is_slim:
|
||||
yield Document(
|
||||
id=pr.html_url,
|
||||
sections=[],
|
||||
external_access=repo_external_access,
|
||||
source=DocumentSource.GITHUB,
|
||||
semantic_identifier="",
|
||||
metadata={},
|
||||
)
|
||||
except Exception as e:
|
||||
error_msg = f"Error converting PR to document: {e}"
|
||||
logger.exception(error_msg)
|
||||
yield ConnectorFailure(
|
||||
failed_document=DocumentFailure(
|
||||
document_id=str(pr.id), document_link=pr.html_url
|
||||
),
|
||||
failure_message=error_msg,
|
||||
exception=e,
|
||||
)
|
||||
continue
|
||||
else:
|
||||
# we iterate backwards in time, so at this point we stop processing prs
|
||||
if (
|
||||
start is not None
|
||||
and pr.updated_at
|
||||
and pr.updated_at.replace(tzinfo=timezone.utc) < start
|
||||
):
|
||||
done_with_prs = True
|
||||
break
|
||||
# Skip PRs updated after the end date
|
||||
if (
|
||||
end is not None
|
||||
and pr.updated_at
|
||||
and pr.updated_at.replace(tzinfo=timezone.utc) > end
|
||||
):
|
||||
continue
|
||||
try:
|
||||
yield _convert_pr_to_document(
|
||||
cast(PullRequest, pr), repo_external_access
|
||||
)
|
||||
except Exception as e:
|
||||
error_msg = f"Error converting PR to document: {e}"
|
||||
logger.exception(error_msg)
|
||||
yield ConnectorFailure(
|
||||
failed_document=DocumentFailure(
|
||||
document_id=str(pr.id), document_link=pr.html_url
|
||||
),
|
||||
failure_message=error_msg,
|
||||
exception=e,
|
||||
)
|
||||
continue
|
||||
|
||||
# If we reach this point with a cursor url in the checkpoint, we were using
|
||||
# the fallback cursor-based pagination strategy. That strategy tries to get all
|
||||
@@ -689,38 +711,47 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
|
||||
for issue in issue_batch:
|
||||
num_issues += 1
|
||||
issue = cast(Issue, issue)
|
||||
# we iterate backwards in time, so at this point we stop processing prs
|
||||
if (
|
||||
start is not None
|
||||
and issue.updated_at.replace(tzinfo=timezone.utc) < start
|
||||
):
|
||||
done_with_issues = True
|
||||
break
|
||||
# Skip PRs updated after the end date
|
||||
if (
|
||||
end is not None
|
||||
and issue.updated_at.replace(tzinfo=timezone.utc) > end
|
||||
):
|
||||
continue
|
||||
|
||||
if issue.pull_request is not None:
|
||||
# PRs are handled separately
|
||||
continue
|
||||
|
||||
try:
|
||||
yield _convert_issue_to_document(issue, repo_external_access)
|
||||
except Exception as e:
|
||||
error_msg = f"Error converting issue to document: {e}"
|
||||
logger.exception(error_msg)
|
||||
yield ConnectorFailure(
|
||||
failed_document=DocumentFailure(
|
||||
document_id=str(issue.id),
|
||||
document_link=issue.html_url,
|
||||
),
|
||||
failure_message=error_msg,
|
||||
exception=e,
|
||||
if is_slim:
|
||||
yield Document(
|
||||
id=issue.html_url,
|
||||
sections=[],
|
||||
external_access=repo_external_access,
|
||||
source=DocumentSource.GITHUB,
|
||||
semantic_identifier="",
|
||||
metadata={},
|
||||
)
|
||||
continue
|
||||
else:
|
||||
# we iterate backwards in time, so at this point we stop processing issues
|
||||
if (
|
||||
start is not None
|
||||
and issue.updated_at.replace(tzinfo=timezone.utc) < start
|
||||
):
|
||||
done_with_issues = True
|
||||
break
|
||||
# Skip issues updated after the end date
|
||||
if (
|
||||
end is not None
|
||||
and issue.updated_at.replace(tzinfo=timezone.utc) > end
|
||||
):
|
||||
continue
|
||||
try:
|
||||
yield _convert_issue_to_document(issue, repo_external_access)
|
||||
except Exception as e:
|
||||
error_msg = f"Error converting issue to document: {e}"
|
||||
logger.exception(error_msg)
|
||||
yield ConnectorFailure(
|
||||
failed_document=DocumentFailure(
|
||||
document_id=str(issue.id),
|
||||
document_link=issue.html_url,
|
||||
),
|
||||
failure_message=error_msg,
|
||||
exception=e,
|
||||
)
|
||||
continue
|
||||
|
||||
logger.info(f"Fetched {num_issues} issues for repo: {repo.name}")
|
||||
# if we found any issues on the page, and we're not done, return the checkpoint.
|
||||
@@ -803,6 +834,60 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
|
||||
start, end, checkpoint, include_permissions=True
|
||||
)
|
||||
|
||||
def _retrieve_slim_docs(
|
||||
self,
|
||||
include_permissions: bool,
|
||||
callback: IndexingHeartbeatInterface | None = None,
|
||||
) -> GenerateSlimDocumentOutput:
|
||||
"""Iterate all PRs and issues across all configured repos as SlimDocuments.
|
||||
|
||||
Drives _fetch_from_github in a checkpoint loop — each call processes one
|
||||
page and returns an updated checkpoint. CheckpointOutputWrapper handles
|
||||
draining the generator and extracting the returned checkpoint. Rate
|
||||
limiting and pagination are handled centrally by _fetch_from_github via
|
||||
_get_batch_rate_limited.
|
||||
"""
|
||||
checkpoint = self.build_dummy_checkpoint()
|
||||
while checkpoint.has_more:
|
||||
batch: list[SlimDocument | HierarchyNode] = []
|
||||
gen = self._fetch_from_github(
|
||||
checkpoint, include_permissions=include_permissions, is_slim=True
|
||||
)
|
||||
wrapper: CheckpointOutputWrapper[GithubConnectorCheckpoint] = (
|
||||
CheckpointOutputWrapper()
|
||||
)
|
||||
for document, _, _, next_checkpoint in wrapper(gen):
|
||||
if document is not None:
|
||||
batch.append(
|
||||
SlimDocument(
|
||||
id=document.id, external_access=document.external_access
|
||||
)
|
||||
)
|
||||
if next_checkpoint is not None:
|
||||
checkpoint = next_checkpoint
|
||||
if batch:
|
||||
yield batch
|
||||
if callback and callback.should_stop():
|
||||
raise RuntimeError("github_slim_docs: Stop signal detected")
|
||||
|
||||
@override
|
||||
def retrieve_all_slim_docs(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch | None = None,
|
||||
end: SecondsSinceUnixEpoch | None = None,
|
||||
callback: IndexingHeartbeatInterface | None = None,
|
||||
) -> GenerateSlimDocumentOutput:
|
||||
return self._retrieve_slim_docs(include_permissions=False, callback=callback)
|
||||
|
||||
@override
|
||||
def retrieve_all_slim_docs_perm_sync(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch | None = None,
|
||||
end: SecondsSinceUnixEpoch | None = None,
|
||||
callback: IndexingHeartbeatInterface | None = None,
|
||||
) -> GenerateSlimDocumentOutput:
|
||||
return self._retrieve_slim_docs(include_permissions=True, callback=callback)
|
||||
|
||||
def validate_connector_settings(self) -> None:
|
||||
if self.github_client is None:
|
||||
raise ConnectorMissingCredentialError("GitHub credentials not loaded.")
|
||||
|
||||
@@ -13,6 +13,10 @@ from pydantic import BaseModel
|
||||
from onyx.access.models import ExternalAccess
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
tabular_file_to_sections,
|
||||
)
|
||||
from onyx.connectors.google_drive.constants import DRIVE_FOLDER_TYPE
|
||||
from onyx.connectors.google_drive.constants import DRIVE_SHORTCUT_TYPE
|
||||
from onyx.connectors.google_drive.models import GDriveMimeType
|
||||
@@ -28,15 +32,16 @@ from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import DocumentFailure
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.models import TabularSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.file_processing.extract_file_text import extract_file_text
|
||||
from onyx.file_processing.extract_file_text import get_file_ext
|
||||
from onyx.file_processing.extract_file_text import pptx_to_text
|
||||
from onyx.file_processing.extract_file_text import read_docx_file
|
||||
from onyx.file_processing.extract_file_text import read_pdf_file
|
||||
from onyx.file_processing.extract_file_text import xlsx_to_text
|
||||
from onyx.file_processing.file_types import OnyxFileExtensions
|
||||
from onyx.file_processing.file_types import OnyxMimeTypes
|
||||
from onyx.file_processing.file_types import SPREADSHEET_MIME_TYPE
|
||||
from onyx.file_processing.image_utils import store_image_and_create_section
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.variable_functionality import (
|
||||
@@ -289,7 +294,7 @@ def _download_and_extract_sections_basic(
|
||||
service: GoogleDriveService,
|
||||
allow_images: bool,
|
||||
size_threshold: int,
|
||||
) -> list[TextSection | ImageSection]:
|
||||
) -> list[TextSection | ImageSection | TabularSection]:
|
||||
"""Extract text and images from a Google Drive file."""
|
||||
file_id = file["id"]
|
||||
file_name = file["name"]
|
||||
@@ -308,7 +313,7 @@ def _download_and_extract_sections_basic(
|
||||
return []
|
||||
|
||||
# Store images for later processing
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
try:
|
||||
section, embedded_id = store_image_and_create_section(
|
||||
image_data=response_call(),
|
||||
@@ -323,10 +328,9 @@ def _download_and_extract_sections_basic(
|
||||
logger.error(f"Failed to process image {file_name}: {e}")
|
||||
return sections
|
||||
|
||||
# For Google Docs, Sheets, and Slides, export as plain text
|
||||
# For Google Docs, Sheets, and Slides, export via the Drive API
|
||||
if mime_type in GOOGLE_MIME_TYPES_TO_EXPORT:
|
||||
export_mime_type = GOOGLE_MIME_TYPES_TO_EXPORT[mime_type]
|
||||
# Use the correct API call for exporting files
|
||||
request = service.files().export_media(
|
||||
fileId=file_id, mimeType=export_mime_type
|
||||
)
|
||||
@@ -335,6 +339,17 @@ def _download_and_extract_sections_basic(
|
||||
logger.warning(f"Failed to export {file_name} as {export_mime_type}")
|
||||
return []
|
||||
|
||||
if export_mime_type in OnyxMimeTypes.TABULAR_MIME_TYPES:
|
||||
# Synthesize an extension on the filename
|
||||
ext = ".xlsx" if export_mime_type == SPREADSHEET_MIME_TYPE else ".csv"
|
||||
return list(
|
||||
tabular_file_to_sections(
|
||||
io.BytesIO(response),
|
||||
file_name=f"{file_name}{ext}",
|
||||
link=link,
|
||||
)
|
||||
)
|
||||
|
||||
text = response.decode("utf-8")
|
||||
return [TextSection(link=link, text=text)]
|
||||
|
||||
@@ -356,9 +371,15 @@ def _download_and_extract_sections_basic(
|
||||
|
||||
elif (
|
||||
mime_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
|
||||
or is_tabular_file(file_name)
|
||||
):
|
||||
text = xlsx_to_text(io.BytesIO(response_call()), file_name=file_name)
|
||||
return [TextSection(link=link, text=text)] if text else []
|
||||
return list(
|
||||
tabular_file_to_sections(
|
||||
io.BytesIO(response_call()),
|
||||
file_name=file_name,
|
||||
link=link,
|
||||
)
|
||||
)
|
||||
|
||||
elif (
|
||||
mime_type
|
||||
@@ -369,7 +390,7 @@ def _download_and_extract_sections_basic(
|
||||
|
||||
elif mime_type == "application/pdf":
|
||||
text, _pdf_meta, images = read_pdf_file(io.BytesIO(response_call()))
|
||||
pdf_sections: list[TextSection | ImageSection] = [
|
||||
pdf_sections: list[TextSection | ImageSection | TabularSection] = [
|
||||
TextSection(link=link, text=text)
|
||||
]
|
||||
|
||||
@@ -410,8 +431,9 @@ def _find_nth(haystack: str, needle: str, n: int, start: int = 0) -> int:
|
||||
|
||||
|
||||
def align_basic_advanced(
|
||||
basic_sections: list[TextSection | ImageSection], adv_sections: list[TextSection]
|
||||
) -> list[TextSection | ImageSection]:
|
||||
basic_sections: list[TextSection | ImageSection | TabularSection],
|
||||
adv_sections: list[TextSection],
|
||||
) -> list[TextSection | ImageSection | TabularSection]:
|
||||
"""Align the basic sections with the advanced sections.
|
||||
In particular, the basic sections contain all content of the file,
|
||||
including smart chips like dates and doc links. The advanced sections
|
||||
@@ -428,7 +450,7 @@ def align_basic_advanced(
|
||||
basic_full_text = "".join(
|
||||
[section.text for section in basic_sections if isinstance(section, TextSection)]
|
||||
)
|
||||
new_sections: list[TextSection | ImageSection] = []
|
||||
new_sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
heading_start = 0
|
||||
for adv_ind in range(1, len(adv_sections)):
|
||||
heading = adv_sections[adv_ind].text.split(HEADING_DELIMITER)[0]
|
||||
@@ -599,7 +621,7 @@ def _convert_drive_item_to_document(
|
||||
"""
|
||||
Main entry point for converting a Google Drive file => Document object.
|
||||
"""
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
|
||||
# Only construct these services when needed
|
||||
def _get_drive_service() -> GoogleDriveService:
|
||||
@@ -639,7 +661,9 @@ def _convert_drive_item_to_document(
|
||||
doc_id=file.get("id", ""),
|
||||
)
|
||||
if doc_sections:
|
||||
sections = cast(list[TextSection | ImageSection], doc_sections)
|
||||
sections = cast(
|
||||
list[TextSection | ImageSection | TabularSection], doc_sections
|
||||
)
|
||||
if any(SMART_CHIP_CHAR in section.text for section in doc_sections):
|
||||
logger.debug(
|
||||
f"found smart chips in {file.get('name')}, aligning with basic sections"
|
||||
|
||||
@@ -50,6 +50,7 @@ class Section(BaseModel):
|
||||
link: str | None = None
|
||||
text: str | None = None
|
||||
image_file_id: str | None = None
|
||||
heading: str | None = None
|
||||
|
||||
|
||||
class TextSection(Section):
|
||||
|
||||
@@ -41,6 +41,10 @@ from onyx.configs.app_configs import REQUEST_TIMEOUT_SECONDS
|
||||
from onyx.configs.app_configs import SHAREPOINT_CONNECTOR_SIZE_THRESHOLD
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import is_tabular_file
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
tabular_file_to_sections,
|
||||
)
|
||||
from onyx.connectors.exceptions import ConnectorValidationError
|
||||
from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
|
||||
from onyx.connectors.interfaces import CheckpointOutput
|
||||
@@ -60,6 +64,7 @@ from onyx.connectors.models import ExternalAccess
|
||||
from onyx.connectors.models import HierarchyNode
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.models import TabularSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.connectors.sharepoint.connector_utils import get_sharepoint_external_access
|
||||
from onyx.db.enums import HierarchyNodeType
|
||||
@@ -586,7 +591,7 @@ def _convert_driveitem_to_document_with_permissions(
|
||||
driveitem, f"Failed to download via graph api: {e}", e
|
||||
)
|
||||
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
file_ext = get_file_ext(driveitem.name)
|
||||
|
||||
if not content_bytes:
|
||||
@@ -602,6 +607,19 @@ def _convert_driveitem_to_document_with_permissions(
|
||||
)
|
||||
image_section.link = driveitem.web_url
|
||||
sections.append(image_section)
|
||||
elif is_tabular_file(driveitem.name):
|
||||
try:
|
||||
sections.extend(
|
||||
tabular_file_to_sections(
|
||||
file=io.BytesIO(content_bytes),
|
||||
file_name=driveitem.name,
|
||||
link=driveitem.web_url or "",
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to extract tabular sections for '{driveitem.name}': {e}"
|
||||
)
|
||||
else:
|
||||
|
||||
def _store_embedded_image(img_data: bytes, img_name: str) -> None:
|
||||
|
||||
@@ -253,7 +253,7 @@ class TabularChunker(SectionChunker):
|
||||
payloads=payloads, accumulator=AccumulatorState()
|
||||
)
|
||||
|
||||
sheet_header = section.link or ""
|
||||
sheet_header = section.heading or ""
|
||||
chunk_texts = parse_to_chunks(
|
||||
rows=parsed_rows,
|
||||
sheet_header=sheet_header,
|
||||
|
||||
@@ -551,7 +551,7 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
|
||||
processed_sections=[
|
||||
Section(
|
||||
type=section.type,
|
||||
text=section.text if isinstance(section, TextSection) else "",
|
||||
text="" if isinstance(section, ImageSection) else section.text,
|
||||
link=section.link,
|
||||
image_file_id=(
|
||||
section.image_file_id
|
||||
@@ -617,7 +617,7 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
|
||||
processed_sections.append(processed_section)
|
||||
|
||||
# For TextSection, create a base Section with text and link
|
||||
elif isinstance(section, TextSection):
|
||||
else:
|
||||
processed_section = Section(
|
||||
type=section.type,
|
||||
text=section.text or "", # Ensure text is always a string, not None
|
||||
|
||||
@@ -290,11 +290,7 @@ def litellm_exception_to_error_msg(
|
||||
error_code = "BUDGET_EXCEEDED"
|
||||
is_retryable = False
|
||||
elif isinstance(core_exception, Timeout):
|
||||
error_msg = (
|
||||
"The LLM took too long to respond. "
|
||||
"If you're running a local model, try increasing the "
|
||||
"LLM_SOCKET_READ_TIMEOUT environment variable (current default: 120 seconds)."
|
||||
)
|
||||
error_msg = "Request timed out: The operation took too long to complete. Please try again."
|
||||
error_code = "CONNECTION_ERROR"
|
||||
is_retryable = True
|
||||
elif isinstance(core_exception, APIError):
|
||||
|
||||
@@ -125,6 +125,11 @@ class TenantRedis(redis.Redis):
|
||||
"sadd",
|
||||
"srem",
|
||||
"scard",
|
||||
"zadd",
|
||||
"zrangebyscore",
|
||||
"zremrangebyscore",
|
||||
"zscore",
|
||||
"zcard",
|
||||
"hexists",
|
||||
"hset",
|
||||
"hdel",
|
||||
|
||||
104
backend/onyx/redis/redis_tenant_work_gating.py
Normal file
104
backend/onyx/redis/redis_tenant_work_gating.py
Normal file
@@ -0,0 +1,104 @@
|
||||
"""Redis helpers for the tenant work-gating feature.
|
||||
|
||||
One sorted set `active_tenants` under the cloud Redis tenant tracks the last
|
||||
time each tenant was observed doing work. The fanout generator reads the set
|
||||
(filtered to entries within a TTL window) and skips tenants that haven't been
|
||||
active recently.
|
||||
|
||||
All public functions no-op in single-tenant mode (`MULTI_TENANT=False`).
|
||||
"""
|
||||
|
||||
import time
|
||||
from typing import cast
|
||||
|
||||
from redis.client import Redis
|
||||
|
||||
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
|
||||
from onyx.redis.redis_pool import get_redis_client
|
||||
from onyx.utils.logger import setup_logger
|
||||
from shared_configs.configs import MULTI_TENANT
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
# Unprefixed key. `TenantRedis._prefixed` prepends `cloud:` at call time so
|
||||
# the full rendered key is `cloud:active_tenants`.
|
||||
_SET_KEY = "active_tenants"
|
||||
|
||||
|
||||
def _now_ms() -> int:
|
||||
return int(time.time() * 1000)
|
||||
|
||||
|
||||
def _client() -> Redis:
|
||||
return get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
|
||||
|
||||
|
||||
def mark_tenant_active(tenant_id: str) -> None:
|
||||
"""Record that `tenant_id` was just observed doing work (ZADD with the
|
||||
current timestamp as the score). Best-effort — a Redis failure is logged
|
||||
and swallowed so it never breaks a writer path.
|
||||
|
||||
Call sites:
|
||||
- Top of each gated beat-task consumer when its "is there work?" query
|
||||
returns a non-empty result.
|
||||
- cc_pair create lifecycle hook.
|
||||
"""
|
||||
if not MULTI_TENANT:
|
||||
return
|
||||
|
||||
try:
|
||||
# `mapping={member: score}` syntax; ZADD overwrites the score on
|
||||
# existing members, which is exactly the refresh semantics we want.
|
||||
_client().zadd(_SET_KEY, mapping={tenant_id: _now_ms()})
|
||||
except Exception:
|
||||
logger.exception(f"mark_tenant_active failed: tenant_id={tenant_id}")
|
||||
|
||||
|
||||
def get_active_tenants(ttl_seconds: int) -> set[str] | None:
|
||||
"""Return tenants whose last-seen timestamp is within `ttl_seconds` of
|
||||
now.
|
||||
|
||||
Return values:
|
||||
- `set[str]` (possibly empty) — Redis read succeeded. Empty set means
|
||||
no tenants are currently marked active; callers should *skip* all
|
||||
tenants if the gate is enforcing.
|
||||
- `None` — Redis read failed *or* we are in single-tenant mode. Callers
|
||||
should fail open (dispatch to every tenant this cycle). Distinguishing
|
||||
failure from "genuinely empty" prevents a Redis outage from silently
|
||||
starving every tenant on every enforced cycle.
|
||||
"""
|
||||
if not MULTI_TENANT:
|
||||
return None
|
||||
|
||||
cutoff_ms = _now_ms() - (ttl_seconds * 1000)
|
||||
try:
|
||||
raw = cast(
|
||||
list[bytes],
|
||||
_client().zrangebyscore(_SET_KEY, min=cutoff_ms, max="+inf"),
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("get_active_tenants failed")
|
||||
return None
|
||||
|
||||
return {m.decode() if isinstance(m, bytes) else m for m in raw}
|
||||
|
||||
|
||||
def cleanup_expired(ttl_seconds: int) -> int:
|
||||
"""Remove members older than `ttl_seconds` from the set. Optional
|
||||
memory-hygiene helper — correctness does not depend on calling this, but
|
||||
without it the set grows unboundedly as old tenants accumulate. Returns
|
||||
the number of members removed."""
|
||||
if not MULTI_TENANT:
|
||||
return 0
|
||||
|
||||
cutoff_ms = _now_ms() - (ttl_seconds * 1000)
|
||||
try:
|
||||
removed = cast(
|
||||
int,
|
||||
_client().zremrangebyscore(_SET_KEY, min="-inf", max=f"({cutoff_ms}"),
|
||||
)
|
||||
return removed
|
||||
except Exception:
|
||||
logger.exception("cleanup_expired failed")
|
||||
return 0
|
||||
@@ -7,6 +7,9 @@ from onyx.background.celery.tasks.beat_schedule import CLOUD_BEAT_MULTIPLIER_DEF
|
||||
from onyx.background.celery.tasks.beat_schedule import (
|
||||
CLOUD_DOC_PERMISSION_SYNC_MULTIPLIER_DEFAULT,
|
||||
)
|
||||
from onyx.configs.app_configs import ENABLE_TENANT_WORK_GATING
|
||||
from onyx.configs.app_configs import TENANT_WORK_GATING_FULL_FANOUT_INTERVAL_SECONDS
|
||||
from onyx.configs.app_configs import TENANT_WORK_GATING_TTL_SECONDS
|
||||
from onyx.configs.constants import CLOUD_BUILD_FENCE_LOOKUP_TABLE_INTERVAL_DEFAULT
|
||||
from onyx.configs.constants import ONYX_CLOUD_REDIS_RUNTIME
|
||||
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
|
||||
@@ -139,6 +142,87 @@ class OnyxRuntime:
|
||||
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def _read_tenant_work_gating_flag(axis: str, default: bool) -> bool:
|
||||
"""Read `runtime:tenant_work_gating:{axis}` from Redis and interpret
|
||||
it as a bool. Returns `default` if the key is absent or unparseable.
|
||||
`axis` is either `enabled` (compute the gate) or `enforce` (actually
|
||||
skip)."""
|
||||
r = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)
|
||||
raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:tenant_work_gating:{axis}")
|
||||
if raw is None:
|
||||
return default
|
||||
|
||||
try:
|
||||
return cast(bytes, raw).decode().strip().lower() == "true"
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
@staticmethod
|
||||
def get_tenant_work_gating_enabled() -> bool:
|
||||
"""Should we *compute* the work gate? (read the Redis set, log how
|
||||
many tenants would be skipped). Env-var `ENABLE_TENANT_WORK_GATING`
|
||||
is the fallback default when no Redis override is set — it acts as
|
||||
the master switch that turns the feature on in shadow mode."""
|
||||
return OnyxRuntime._read_tenant_work_gating_flag(
|
||||
"enabled", default=ENABLE_TENANT_WORK_GATING
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_tenant_work_gating_enforce() -> bool:
|
||||
"""Should we *actually skip* tenants not in the work set?
|
||||
|
||||
Deliberately Redis-only with a hard-coded default of False: the env
|
||||
var `ENABLE_TENANT_WORK_GATING` only flips `enabled` (shadow mode),
|
||||
never `enforce`. Enforcement has to be turned on by an explicit
|
||||
`runtime:tenant_work_gating:enforce=true` write so ops can't
|
||||
accidentally skip real tenant traffic by flipping an env flag. Only
|
||||
meaningful when `get_tenant_work_gating_enabled()` is also True.
|
||||
"""
|
||||
return OnyxRuntime._read_tenant_work_gating_flag("enforce", default=False)
|
||||
|
||||
@staticmethod
|
||||
def get_tenant_work_gating_ttl_seconds() -> int:
|
||||
"""Membership TTL for the `active_tenants` sorted set. Members older
|
||||
than this are treated as "no recent work" by the gate read path.
|
||||
Must be > (full-fanout cadence × base task schedule) so self-healing
|
||||
has time to refresh memberships before they expire."""
|
||||
default = TENANT_WORK_GATING_TTL_SECONDS
|
||||
|
||||
r = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)
|
||||
raw = r.get(f"{ONYX_CLOUD_REDIS_RUNTIME}:tenant_work_gating:ttl_seconds")
|
||||
if raw is None:
|
||||
return default
|
||||
|
||||
try:
|
||||
value = int(cast(bytes, raw).decode())
|
||||
return value if value > 0 else default
|
||||
except ValueError:
|
||||
return default
|
||||
|
||||
@staticmethod
|
||||
def get_tenant_work_gating_full_fanout_interval_seconds() -> int:
|
||||
"""Minimum wall-clock interval between full-fanout cycles. When at
|
||||
least this many seconds have elapsed since the last bypass, the
|
||||
generator ignores the gate on its next invocation and dispatches to
|
||||
every non-gated tenant, letting consumers re-populate the active
|
||||
set. Schedule-independent so beat drift or backlog can't skew the
|
||||
self-heal cadence."""
|
||||
default = TENANT_WORK_GATING_FULL_FANOUT_INTERVAL_SECONDS
|
||||
|
||||
r = get_redis_replica_client(tenant_id=ONYX_CLOUD_TENANT_ID)
|
||||
raw = r.get(
|
||||
f"{ONYX_CLOUD_REDIS_RUNTIME}:tenant_work_gating:full_fanout_interval_seconds"
|
||||
)
|
||||
if raw is None:
|
||||
return default
|
||||
|
||||
try:
|
||||
value = int(cast(bytes, raw).decode())
|
||||
return value if value > 0 else default
|
||||
except ValueError:
|
||||
return default
|
||||
|
||||
@staticmethod
|
||||
def get_build_fence_lookup_table_interval() -> int:
|
||||
"""We maintain an active fence table to make lookups of existing fences efficient.
|
||||
|
||||
@@ -34,6 +34,7 @@ from onyx.server.settings.models import UserSettings
|
||||
from onyx.server.settings.store import load_settings
|
||||
from onyx.server.settings.store import store_settings
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.platform import is_running_in_container
|
||||
from onyx.utils.variable_functionality import (
|
||||
fetch_versioned_implementation_with_fallback,
|
||||
)
|
||||
@@ -111,6 +112,7 @@ def fetch_settings(
|
||||
if DISABLE_VECTOR_DB
|
||||
else DEFAULT_FILE_TOKEN_COUNT_THRESHOLD_K_VECTOR_DB
|
||||
),
|
||||
is_containerized=is_running_in_container(),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -133,3 +133,7 @@ class UserSettings(Settings):
|
||||
else DEFAULT_FILE_TOKEN_COUNT_THRESHOLD_K_VECTOR_DB
|
||||
)
|
||||
)
|
||||
# True when the backend is running inside a container (Docker/Podman).
|
||||
# The frontend uses this to default local-service URLs (e.g. Ollama,
|
||||
# LM Studio) to host.docker.internal instead of localhost.
|
||||
is_containerized: bool = False
|
||||
|
||||
@@ -5,6 +5,7 @@ from collections.abc import MutableMapping
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from typing import Any
|
||||
|
||||
from onyx.utils.platform import is_running_in_container
|
||||
from onyx.utils.tenant import get_tenant_id_short_string
|
||||
from shared_configs.configs import DEV_LOGGING_ENABLED
|
||||
from shared_configs.configs import LOG_FILE_NAME
|
||||
@@ -169,13 +170,6 @@ def get_standard_formatter() -> ColoredFormatter:
|
||||
)
|
||||
|
||||
|
||||
DANSWER_DOCKER_ENV_STR = "DANSWER_RUNNING_IN_DOCKER"
|
||||
|
||||
|
||||
def is_running_in_container() -> bool:
|
||||
return os.getenv(DANSWER_DOCKER_ENV_STR) == "true"
|
||||
|
||||
|
||||
def setup_logger(
|
||||
name: str = __name__,
|
||||
log_level: int = get_log_level_from_str(),
|
||||
|
||||
32
backend/onyx/utils/platform.py
Normal file
32
backend/onyx/utils/platform.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_ONYX_DOCKER_ENV_STR = "ONYX_RUNNING_IN_DOCKER"
|
||||
_DANSWER_DOCKER_ENV_STR = "DANSWER_RUNNING_IN_DOCKER"
|
||||
|
||||
|
||||
def _resolve_container_flag() -> bool:
|
||||
onyx_val = os.getenv(_ONYX_DOCKER_ENV_STR)
|
||||
if onyx_val is not None:
|
||||
return onyx_val.lower() == "true"
|
||||
|
||||
danswer_val = os.getenv(_DANSWER_DOCKER_ENV_STR)
|
||||
if danswer_val is not None:
|
||||
logger.warning(
|
||||
"%s is deprecated and will be ignored in a future release. "
|
||||
"Use %s instead.",
|
||||
_DANSWER_DOCKER_ENV_STR,
|
||||
_ONYX_DOCKER_ENV_STR,
|
||||
)
|
||||
return danswer_val.lower() == "true"
|
||||
|
||||
return False
|
||||
|
||||
|
||||
_IS_RUNNING_IN_CONTAINER: bool = _resolve_container_flag()
|
||||
|
||||
|
||||
def is_running_in_container() -> bool:
|
||||
return _IS_RUNNING_IN_CONTAINER
|
||||
@@ -0,0 +1,159 @@
|
||||
"""Tests for the tenant work-gating Redis helpers.
|
||||
|
||||
Requires a running Redis instance. Run with::
|
||||
|
||||
python -m dotenv -f .vscode/.env run -- pytest \
|
||||
backend/tests/external_dependency_unit/tenant_work_gating/test_tenant_work_gating.py
|
||||
"""
|
||||
|
||||
import time
|
||||
from collections.abc import Generator
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
|
||||
from onyx.redis import redis_tenant_work_gating as twg
|
||||
from onyx.redis.redis_pool import get_redis_client
|
||||
from onyx.redis.redis_tenant_work_gating import _SET_KEY
|
||||
from onyx.redis.redis_tenant_work_gating import cleanup_expired
|
||||
from onyx.redis.redis_tenant_work_gating import get_active_tenants
|
||||
from onyx.redis.redis_tenant_work_gating import mark_tenant_active
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _multi_tenant_true() -> Generator[None, None, None]:
|
||||
"""Force MULTI_TENANT=True for the helper module so public functions are
|
||||
not no-ops during tests."""
|
||||
with patch.object(twg, "MULTI_TENANT", True):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clean_set() -> Generator[None, None, None]:
|
||||
"""Clear the active_tenants sorted set before and after each test."""
|
||||
client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
|
||||
client.delete(_SET_KEY)
|
||||
yield
|
||||
client.delete(_SET_KEY)
|
||||
|
||||
|
||||
def test_mark_adds_tenant_to_set() -> None:
|
||||
mark_tenant_active("tenant_a")
|
||||
|
||||
assert get_active_tenants(ttl_seconds=60) == {"tenant_a"}
|
||||
|
||||
|
||||
def test_mark_refreshes_timestamp() -> None:
|
||||
"""ZADD overwrites the score on existing members. Without a refresh,
|
||||
reading with a TTL that excludes the first write should return empty;
|
||||
after a second mark_tenant_active at a newer timestamp, the same TTL
|
||||
read should include the tenant. Pins `_now_ms` so the test is
|
||||
deterministic."""
|
||||
base_ms = int(time.time() * 1000)
|
||||
|
||||
# First write at t=0.
|
||||
with patch.object(twg, "_now_ms", return_value=base_ms):
|
||||
mark_tenant_active("tenant_a")
|
||||
|
||||
# Read 5s later with a 1s TTL — first write is outside the window.
|
||||
with patch.object(twg, "_now_ms", return_value=base_ms + 5000):
|
||||
assert get_active_tenants(ttl_seconds=1) == set()
|
||||
|
||||
# Refresh at t=5s.
|
||||
with patch.object(twg, "_now_ms", return_value=base_ms + 5000):
|
||||
mark_tenant_active("tenant_a")
|
||||
|
||||
# Read at t=5s with a 1s TTL — refreshed write is inside the window.
|
||||
with patch.object(twg, "_now_ms", return_value=base_ms + 5000):
|
||||
assert get_active_tenants(ttl_seconds=1) == {"tenant_a"}
|
||||
|
||||
|
||||
def test_get_active_tenants_filters_by_ttl() -> None:
|
||||
"""Tenant marked in the past, read with a TTL short enough to exclude it."""
|
||||
# Pin _now_ms so the write happens at t=0 and the read cutoff is
|
||||
# well after that.
|
||||
base_ms = int(time.time() * 1000)
|
||||
with patch.object(twg, "_now_ms", return_value=base_ms):
|
||||
mark_tenant_active("tenant_old")
|
||||
|
||||
# Read 5 seconds later with a 1-second TTL — tenant_old is outside.
|
||||
with patch.object(twg, "_now_ms", return_value=base_ms + 5000):
|
||||
assert get_active_tenants(ttl_seconds=1) == set()
|
||||
|
||||
# Read 5 seconds later with a 10-second TTL — tenant_old is inside.
|
||||
with patch.object(twg, "_now_ms", return_value=base_ms + 5000):
|
||||
assert get_active_tenants(ttl_seconds=10) == {"tenant_old"}
|
||||
|
||||
|
||||
def test_get_active_tenants_multiple_members() -> None:
|
||||
mark_tenant_active("tenant_a")
|
||||
mark_tenant_active("tenant_b")
|
||||
mark_tenant_active("tenant_c")
|
||||
|
||||
assert get_active_tenants(ttl_seconds=60) == {"tenant_a", "tenant_b", "tenant_c"}
|
||||
|
||||
|
||||
def test_get_active_tenants_empty_set() -> None:
|
||||
"""Genuinely-empty set returns an empty set (not None)."""
|
||||
assert get_active_tenants(ttl_seconds=60) == set()
|
||||
|
||||
|
||||
def test_get_active_tenants_returns_none_on_redis_error() -> None:
|
||||
"""Callers need to distinguish Redis failure from "no tenants active" so
|
||||
they can fail open. Simulate failure by patching the client to raise."""
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
failing_client = MagicMock()
|
||||
failing_client.zrangebyscore.side_effect = RuntimeError("simulated outage")
|
||||
|
||||
with patch.object(twg, "_client", return_value=failing_client):
|
||||
assert get_active_tenants(ttl_seconds=60) is None
|
||||
|
||||
|
||||
def test_get_active_tenants_returns_none_in_single_tenant_mode() -> None:
|
||||
"""Single-tenant mode returns None so callers can skip the gate entirely
|
||||
(same fail-open handling as Redis unavailability)."""
|
||||
with patch.object(twg, "MULTI_TENANT", False):
|
||||
assert get_active_tenants(ttl_seconds=60) is None
|
||||
|
||||
|
||||
def test_cleanup_expired_removes_only_stale_members() -> None:
|
||||
"""Seed one stale and one fresh member directly; cleanup should drop only
|
||||
the stale one."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
|
||||
client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
|
||||
client.zadd(_SET_KEY, mapping={"tenant_old": now_ms - 10 * 60 * 1000})
|
||||
client.zadd(_SET_KEY, mapping={"tenant_new": now_ms})
|
||||
|
||||
removed = cleanup_expired(ttl_seconds=60)
|
||||
|
||||
assert removed == 1
|
||||
assert get_active_tenants(ttl_seconds=60 * 60) == {"tenant_new"}
|
||||
|
||||
|
||||
def test_cleanup_expired_empty_set_noop() -> None:
|
||||
assert cleanup_expired(ttl_seconds=60) == 0
|
||||
|
||||
|
||||
def test_noop_when_multi_tenant_false() -> None:
|
||||
with patch.object(twg, "MULTI_TENANT", False):
|
||||
mark_tenant_active("tenant_a")
|
||||
assert get_active_tenants(ttl_seconds=60) is None
|
||||
assert cleanup_expired(ttl_seconds=60) == 0
|
||||
|
||||
# Verify nothing was written while MULTI_TENANT was False.
|
||||
assert get_active_tenants(ttl_seconds=60) == set()
|
||||
|
||||
|
||||
def test_rendered_key_is_cloud_prefixed() -> None:
|
||||
"""Exercises TenantRedis auto-prefixing on sorted-set ops. The rendered
|
||||
Redis key should be `cloud:active_tenants`, not bare `active_tenants`."""
|
||||
mark_tenant_active("tenant_a")
|
||||
|
||||
from onyx.redis.redis_pool import RedisPool
|
||||
|
||||
raw = RedisPool().get_raw_client()
|
||||
assert raw.zscore("cloud:active_tenants", "tenant_a") is not None
|
||||
assert raw.zscore("active_tenants", "tenant_a") is None
|
||||
@@ -0,0 +1,172 @@
|
||||
"""
|
||||
Tests verifying that GithubConnector implements SlimConnector and SlimConnectorWithPermSync
|
||||
correctly, and that pruning uses the cheap slim path (no lazy loading).
|
||||
"""
|
||||
|
||||
from collections.abc import Generator
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import PropertyMock
|
||||
|
||||
import pytest
|
||||
|
||||
from onyx.access.models import ExternalAccess
|
||||
from onyx.background.celery.celery_utils import extract_ids_from_runnable_connector
|
||||
from onyx.connectors.github.connector import GithubConnector
|
||||
from onyx.connectors.interfaces import SlimConnector
|
||||
from onyx.connectors.interfaces import SlimConnectorWithPermSync
|
||||
from onyx.connectors.models import SlimDocument
|
||||
|
||||
|
||||
def _make_pr(html_url: str) -> MagicMock:
|
||||
pr = MagicMock()
|
||||
pr.html_url = html_url
|
||||
pr.pull_request = None
|
||||
# commits and changed_files should never be accessed during slim retrieval
|
||||
type(pr).commits = PropertyMock(side_effect=AssertionError("lazy load triggered"))
|
||||
type(pr).changed_files = PropertyMock(
|
||||
side_effect=AssertionError("lazy load triggered")
|
||||
)
|
||||
return pr
|
||||
|
||||
|
||||
def _make_issue(html_url: str) -> MagicMock:
|
||||
issue = MagicMock()
|
||||
issue.html_url = html_url
|
||||
issue.pull_request = None
|
||||
return issue
|
||||
|
||||
|
||||
def _make_connector(include_issues: bool = False) -> GithubConnector:
|
||||
connector = GithubConnector(
|
||||
repo_owner="test-org",
|
||||
repositories="test-repo",
|
||||
include_prs=True,
|
||||
include_issues=include_issues,
|
||||
)
|
||||
connector.github_client = MagicMock()
|
||||
return connector
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_deserialize_repository(mock_repo: MagicMock) -> Generator[None, None, None]:
|
||||
with patch(
|
||||
"onyx.connectors.github.connector.deserialize_repository",
|
||||
return_value=mock_repo,
|
||||
):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_repo() -> MagicMock:
|
||||
repo = MagicMock()
|
||||
repo.name = "test-repo"
|
||||
repo.id = 123
|
||||
repo.raw_headers = {"x-github-request-id": "test"}
|
||||
repo.raw_data = {"id": 123, "name": "test-repo", "full_name": "test-org/test-repo"}
|
||||
prs = [
|
||||
_make_pr(f"https://github.com/test-org/test-repo/pull/{i}") for i in range(1, 4)
|
||||
]
|
||||
mock_paginated = MagicMock()
|
||||
mock_paginated.get_page.side_effect = lambda page: prs if page == 0 else []
|
||||
repo.get_pulls.return_value = mock_paginated
|
||||
return repo
|
||||
|
||||
|
||||
def test_github_connector_implements_slim_connector() -> None:
|
||||
connector = _make_connector()
|
||||
assert isinstance(connector, SlimConnector)
|
||||
|
||||
|
||||
def test_github_connector_implements_slim_connector_with_perm_sync() -> None:
|
||||
connector = _make_connector()
|
||||
assert isinstance(connector, SlimConnectorWithPermSync)
|
||||
|
||||
|
||||
def test_retrieve_all_slim_docs_returns_pr_urls(mock_repo: MagicMock) -> None:
|
||||
connector = _make_connector()
|
||||
with patch.object(connector, "fetch_configured_repos", return_value=[mock_repo]):
|
||||
batches = list(connector.retrieve_all_slim_docs())
|
||||
|
||||
all_docs = [doc for batch in batches for doc in batch]
|
||||
assert len(all_docs) == 3
|
||||
assert all(isinstance(doc, SlimDocument) for doc in all_docs)
|
||||
assert {doc.id for doc in all_docs if isinstance(doc, SlimDocument)} == {
|
||||
"https://github.com/test-org/test-repo/pull/1",
|
||||
"https://github.com/test-org/test-repo/pull/2",
|
||||
"https://github.com/test-org/test-repo/pull/3",
|
||||
}
|
||||
|
||||
|
||||
def test_retrieve_all_slim_docs_has_no_external_access(mock_repo: MagicMock) -> None:
|
||||
"""Pruning does not need permissions — external_access should be None."""
|
||||
connector = _make_connector()
|
||||
with patch.object(connector, "fetch_configured_repos", return_value=[mock_repo]):
|
||||
batches = list(connector.retrieve_all_slim_docs())
|
||||
|
||||
all_docs = [doc for batch in batches for doc in batch]
|
||||
assert all(doc.external_access is None for doc in all_docs)
|
||||
|
||||
|
||||
def test_retrieve_all_slim_docs_perm_sync_populates_external_access(
|
||||
mock_repo: MagicMock,
|
||||
) -> None:
|
||||
connector = _make_connector()
|
||||
mock_access = MagicMock(spec=ExternalAccess)
|
||||
|
||||
with patch.object(connector, "fetch_configured_repos", return_value=[mock_repo]):
|
||||
with patch(
|
||||
"onyx.connectors.github.connector.get_external_access_permission",
|
||||
return_value=mock_access,
|
||||
) as mock_perm:
|
||||
batches = list(connector.retrieve_all_slim_docs_perm_sync())
|
||||
|
||||
# permission fetched at least once per repo (once per page in checkpoint-based flow)
|
||||
mock_perm.assert_called_with(mock_repo, connector.github_client)
|
||||
|
||||
all_docs = [doc for batch in batches for doc in batch]
|
||||
assert all(doc.external_access is mock_access for doc in all_docs)
|
||||
|
||||
|
||||
def test_retrieve_all_slim_docs_skips_pr_issues(mock_repo: MagicMock) -> None:
|
||||
"""Issues that are actually PRs should be skipped when include_issues=True."""
|
||||
connector = _make_connector(include_issues=True)
|
||||
|
||||
pr_issue = MagicMock()
|
||||
pr_issue.html_url = "https://github.com/test-org/test-repo/pull/99"
|
||||
pr_issue.pull_request = MagicMock() # non-None means it's a PR
|
||||
|
||||
real_issue = _make_issue("https://github.com/test-org/test-repo/issues/1")
|
||||
issues = [pr_issue, real_issue]
|
||||
mock_issues_paginated = MagicMock()
|
||||
mock_issues_paginated.get_page.side_effect = lambda page: (
|
||||
issues if page == 0 else []
|
||||
)
|
||||
mock_repo.get_issues.return_value = mock_issues_paginated
|
||||
|
||||
with patch.object(connector, "fetch_configured_repos", return_value=[mock_repo]):
|
||||
batches = list(connector.retrieve_all_slim_docs())
|
||||
|
||||
issue_ids = {
|
||||
doc.id
|
||||
for batch in batches
|
||||
for doc in batch
|
||||
if isinstance(doc, SlimDocument) and "issues" in doc.id
|
||||
}
|
||||
assert issue_ids == {"https://github.com/test-org/test-repo/issues/1"}
|
||||
|
||||
|
||||
def test_pruning_routes_to_slim_connector_path(mock_repo: MagicMock) -> None:
|
||||
"""extract_ids_from_runnable_connector must use SlimConnector, not CheckpointedConnector."""
|
||||
connector = _make_connector()
|
||||
|
||||
with patch.object(connector, "fetch_configured_repos", return_value=[mock_repo]):
|
||||
# If the CheckpointedConnector fallback were used instead, it would call
|
||||
# load_from_checkpoint which hits _convert_pr_to_document and lazy loads.
|
||||
# We verify the slim path is taken by checking load_from_checkpoint is NOT called.
|
||||
with patch.object(connector, "load_from_checkpoint") as mock_load:
|
||||
result = extract_ids_from_runnable_connector(connector)
|
||||
mock_load.assert_not_called()
|
||||
|
||||
assert len(result.raw_id_to_parent) == 3
|
||||
assert "https://github.com/test-org/test-repo/pull/1" in result.raw_id_to_parent
|
||||
@@ -33,15 +33,22 @@ def _make_chunker() -> TabularChunker:
|
||||
return TabularChunker(tokenizer=CharTokenizer())
|
||||
|
||||
|
||||
def _tabular_section(text: str, link: str = "sheet:Test") -> Section:
|
||||
return TabularSection(text=text, link=link)
|
||||
_DEFAULT_LINK = "https://example.com/doc"
|
||||
|
||||
|
||||
def _tabular_section(
|
||||
text: str,
|
||||
link: str = _DEFAULT_LINK,
|
||||
heading: str | None = "sheet:Test",
|
||||
) -> Section:
|
||||
return TabularSection(text=text, link=link, heading=heading)
|
||||
|
||||
|
||||
class TestTabularChunkerChunkSection:
|
||||
def test_simple_csv_all_rows_fit_one_chunk(self) -> None:
|
||||
# --- INPUT -----------------------------------------------------
|
||||
csv_text = "Name,Age,City\n" "Alice,30,NYC\n" "Bob,25,SF\n"
|
||||
link = "sheet:People"
|
||||
heading = "sheet:People"
|
||||
content_token_limit = 500
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
@@ -56,7 +63,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
@@ -64,7 +71,7 @@ class TestTabularChunkerChunkSection:
|
||||
# --- ASSERT ----------------------------------------------------
|
||||
assert [p.text for p in out.payloads] == expected_texts
|
||||
assert [p.is_continuation for p in out.payloads] == [False]
|
||||
assert all(p.links == {0: link} for p in out.payloads)
|
||||
assert all(p.links == {0: _DEFAULT_LINK} for p in out.payloads)
|
||||
assert out.accumulator.is_empty()
|
||||
|
||||
def test_overflow_splits_into_two_deterministic_chunks(self) -> None:
|
||||
@@ -74,7 +81,7 @@ class TestTabularChunkerChunkSection:
|
||||
# Each row "col=a, val=1" is 12 tokens; two rows + \n = 25 (fits),
|
||||
# three rows + 2×\n = 38 (overflows) → split after 2 rows.
|
||||
csv_text = "col,val\n" "a,1\n" "b,2\n" "c,3\n" "d,4\n"
|
||||
link = "sheet:S"
|
||||
heading = "sheet:S"
|
||||
content_token_limit = 57
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
@@ -85,7 +92,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
@@ -95,7 +102,7 @@ class TestTabularChunkerChunkSection:
|
||||
# First chunk is fresh; subsequent chunks mark as continuations.
|
||||
assert [p.is_continuation for p in out.payloads] == [False, True]
|
||||
# Link carries through every chunk.
|
||||
assert all(p.links == {0: link} for p in out.payloads)
|
||||
assert all(p.links == {0: _DEFAULT_LINK} for p in out.payloads)
|
||||
|
||||
# Add back in shortly
|
||||
# def test_header_only_csv_produces_single_prelude_chunk(self) -> None:
|
||||
@@ -123,7 +130,7 @@ class TestTabularChunkerChunkSection:
|
||||
# Alice's Age is empty; Bob's City is empty. Empty cells should
|
||||
# not appear as `field=` pairs in the output.
|
||||
csv_text = "Name,Age,City\n" "Alice,,NYC\n" "Bob,25,\n"
|
||||
link = "sheet:P"
|
||||
heading = "sheet:P"
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
@@ -137,7 +144,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=500,
|
||||
)
|
||||
@@ -151,7 +158,7 @@ class TestTabularChunkerChunkSection:
|
||||
# a single field. The surrounding quotes are stripped during
|
||||
# decoding, so the chunk text carries the bare value.
|
||||
csv_text = "Name,Notes\n" 'Alice,"Hello, world"\n'
|
||||
link = "sheet:P"
|
||||
heading = "sheet:P"
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
@@ -160,7 +167,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=500,
|
||||
)
|
||||
@@ -173,7 +180,7 @@ class TestTabularChunkerChunkSection:
|
||||
# Stray blank rows in the CSV (e.g. export artifacts) shouldn't
|
||||
# produce ghost rows in the output.
|
||||
csv_text = "A,B\n" "\n" "1,2\n" "\n" "\n" "3,4\n"
|
||||
link = "sheet:S"
|
||||
heading = "sheet:S"
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
@@ -182,7 +189,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=500,
|
||||
)
|
||||
@@ -199,7 +206,7 @@ class TestTabularChunkerChunkSection:
|
||||
pending_link = "prev-link"
|
||||
|
||||
csv_text = "a,b\n" "1,2\n"
|
||||
link = "sheet:S"
|
||||
heading = "sheet:S"
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
@@ -209,7 +216,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(
|
||||
text=pending_text,
|
||||
link_offsets={0: pending_link},
|
||||
@@ -222,7 +229,7 @@ class TestTabularChunkerChunkSection:
|
||||
# Flushed chunk keeps the prior text's link; tabular chunk uses
|
||||
# the tabular section's link.
|
||||
assert out.payloads[0].links == {0: pending_link}
|
||||
assert out.payloads[1].links == {0: link}
|
||||
assert out.payloads[1].links == {0: _DEFAULT_LINK}
|
||||
# Accumulator resets — tabular section is a structural boundary.
|
||||
assert out.accumulator.is_empty()
|
||||
|
||||
@@ -234,7 +241,7 @@ class TestTabularChunkerChunkSection:
|
||||
csv_text = (
|
||||
"x\n" "aaaaaaaaaaaaaaaaaa\n" "bbbbbbbbbbbbbbbbbb\n" "cccccccccccccccccc\n"
|
||||
)
|
||||
link = "S"
|
||||
heading = "S"
|
||||
content_token_limit = 100
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
@@ -252,7 +259,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
@@ -274,7 +281,7 @@ class TestTabularChunkerChunkSection:
|
||||
# Every emitted chunk therefore carries its full prelude rather
|
||||
# than dropping Columns at emit time.
|
||||
csv_text = "x\n" "aa\n" "bb\n" "cc\n" "dd\n" "ee\n"
|
||||
link = "S"
|
||||
heading = "S"
|
||||
content_token_limit = 30
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
@@ -290,7 +297,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
@@ -311,7 +318,7 @@ class TestTabularChunkerChunkSection:
|
||||
# pieces (they already consume the full budget). A 53-token row
|
||||
# packs into 3 field-boundary pieces under a 20-token budget.
|
||||
csv_text = "field 1,field 2,field 3,field 4,field 5\n" "1,2,3,4,5\n"
|
||||
link = "S"
|
||||
heading = "S"
|
||||
content_token_limit = 20
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
@@ -331,7 +338,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
@@ -359,7 +366,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section("", link="sheet:Empty"),
|
||||
_tabular_section("", heading="sheet:Empty"),
|
||||
AccumulatorState(
|
||||
text=pending_text,
|
||||
link_offsets=pending_link_offsets,
|
||||
@@ -381,7 +388,7 @@ class TestTabularChunkerChunkSection:
|
||||
# CSV has one column "x" with a 50-char value. Formatted pair =
|
||||
# "x=" + 50 a's = 52 tokens. Budget = 10.
|
||||
csv_text = "x\n" + ("a" * 50) + "\n"
|
||||
link = "S"
|
||||
heading = "S"
|
||||
content_token_limit = 10
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
@@ -404,7 +411,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
@@ -421,7 +428,7 @@ class TestTabularChunkerChunkSection:
|
||||
# alias appended in parens on the `Columns:` line. Plain headers
|
||||
# pass through untouched.
|
||||
csv_text = "MTTR_hours,id,owner_name\n" "3,42,Alice\n"
|
||||
link = "sheet:M"
|
||||
heading = "sheet:M"
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
@@ -434,7 +441,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=500,
|
||||
)
|
||||
@@ -455,7 +462,7 @@ class TestTabularChunkerChunkSection:
|
||||
# populated (tiny). Row 2 is a "fat" row with all four columns
|
||||
# populated.
|
||||
csv_text = "a,b,c,d\n" "1,,,\n" "xxx,yyy,zzz,www\n" "2,,,\n"
|
||||
link = "S"
|
||||
heading = "S"
|
||||
content_token_limit = 20
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
@@ -481,7 +488,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
@@ -503,7 +510,7 @@ class TestTabularChunkerChunkSection:
|
||||
# cols + row: 10+1+3 = 14 ≤ 15 ✓
|
||||
# sheet + cols + row: 13+1+10+1+3 = 28 > 15 ✗
|
||||
csv_text = "x\n" "y\n"
|
||||
link = "LongSheetName"
|
||||
heading = "LongSheetName"
|
||||
content_token_limit = 15
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
@@ -511,7 +518,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
@@ -534,7 +541,7 @@ class TestTabularChunkerChunkSection:
|
||||
# cols + row: 17+1+12 = 30 > 20 ✗
|
||||
# sheet + row: 1+1+12 = 14 ≤ 20 ✓
|
||||
csv_text = "ABC,DEF\n" "1,2\n"
|
||||
link = "S"
|
||||
heading = "S"
|
||||
content_token_limit = 20
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
@@ -542,7 +549,7 @@ class TestTabularChunkerChunkSection:
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
_tabular_section(csv_text, heading=heading),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
|
||||
@@ -172,7 +172,7 @@ LOG_ONYX_MODEL_INTERACTIONS=False
|
||||
|
||||
## Gen AI Settings
|
||||
# GEN_AI_MAX_TOKENS=
|
||||
LLM_SOCKET_READ_TIMEOUT=120
|
||||
# LLM_SOCKET_READ_TIMEOUT=
|
||||
# MAX_CHUNKS_FED_TO_CHAT=
|
||||
# DISABLE_LITELLM_STREAMING=
|
||||
# LITELLM_EXTRA_HEADERS=
|
||||
|
||||
@@ -1259,7 +1259,7 @@ configMap:
|
||||
S3_FILE_STORE_BUCKET_NAME: ""
|
||||
# Gen AI Settings
|
||||
GEN_AI_MAX_TOKENS: ""
|
||||
LLM_SOCKET_READ_TIMEOUT: "120"
|
||||
LLM_SOCKET_READ_TIMEOUT: "60"
|
||||
MAX_CHUNKS_FED_TO_CHAT: ""
|
||||
# Query Options
|
||||
DOC_TIME_DECAY: ""
|
||||
|
||||
@@ -76,6 +76,10 @@ export interface Settings {
|
||||
// Factory defaults for the restore button.
|
||||
default_user_file_max_upload_size_mb?: number;
|
||||
default_file_token_count_threshold_k?: number;
|
||||
|
||||
// True when the backend runs inside a container (Docker/Podman).
|
||||
// Used to default local-service URLs to host.docker.internal.
|
||||
is_containerized?: boolean;
|
||||
}
|
||||
|
||||
export enum NotificationType {
|
||||
|
||||
@@ -27,8 +27,7 @@ import {
|
||||
import { fetchModels } from "@/lib/llmConfig/svc";
|
||||
import { toast } from "@/hooks/useToast";
|
||||
import { refreshLlmProviderCaches } from "@/lib/llmConfig/cache";
|
||||
|
||||
const DEFAULT_API_BASE = "http://localhost:1234";
|
||||
import { useSettingsContext } from "@/providers/SettingsProvider";
|
||||
|
||||
interface LMStudioModalValues extends BaseLLMModalValues {
|
||||
api_base: string;
|
||||
@@ -116,6 +115,10 @@ export default function LMStudioModal({
|
||||
}: LLMProviderFormProps) {
|
||||
const isOnboarding = variant === "onboarding";
|
||||
const { mutate } = useSWRConfig();
|
||||
const { settings } = useSettingsContext();
|
||||
const defaultApiBase = settings.is_containerized
|
||||
? "http://host.docker.internal:1234"
|
||||
: "http://localhost:1234";
|
||||
|
||||
const onClose = () => onOpenChange?.(false);
|
||||
|
||||
@@ -125,7 +128,7 @@ export default function LMStudioModal({
|
||||
LLMProviderName.LM_STUDIO,
|
||||
existingLlmProvider
|
||||
),
|
||||
api_base: existingLlmProvider?.api_base ?? DEFAULT_API_BASE,
|
||||
api_base: existingLlmProvider?.api_base ?? defaultApiBase,
|
||||
custom_config: {
|
||||
LM_STUDIO_API_KEY: existingLlmProvider?.custom_config?.LM_STUDIO_API_KEY,
|
||||
},
|
||||
|
||||
@@ -31,8 +31,7 @@ import { Card } from "@opal/components";
|
||||
import { toast } from "@/hooks/useToast";
|
||||
import { refreshLlmProviderCaches } from "@/lib/llmConfig/cache";
|
||||
import InputTypeInField from "@/refresh-components/form/InputTypeInField";
|
||||
|
||||
const DEFAULT_API_BASE = "http://127.0.0.1:11434";
|
||||
import { useSettingsContext } from "@/providers/SettingsProvider";
|
||||
const CLOUD_API_BASE = "https://ollama.com";
|
||||
|
||||
enum Tab {
|
||||
@@ -163,6 +162,10 @@ export default function OllamaModal({
|
||||
}: LLMProviderFormProps) {
|
||||
const isOnboarding = variant === "onboarding";
|
||||
const { mutate } = useSWRConfig();
|
||||
const { settings } = useSettingsContext();
|
||||
const defaultApiBase = settings.is_containerized
|
||||
? "http://host.docker.internal:11434"
|
||||
: "http://127.0.0.1:11434";
|
||||
const apiKey = existingLlmProvider?.custom_config?.OLLAMA_API_KEY;
|
||||
const defaultTab =
|
||||
existingLlmProvider && !!apiKey ? Tab.TAB_CLOUD : Tab.TAB_SELF_HOSTED;
|
||||
@@ -176,7 +179,7 @@ export default function OllamaModal({
|
||||
LLMProviderName.OLLAMA_CHAT,
|
||||
existingLlmProvider
|
||||
),
|
||||
api_base: existingLlmProvider?.api_base ?? DEFAULT_API_BASE,
|
||||
api_base: existingLlmProvider?.api_base ?? defaultApiBase,
|
||||
custom_config: {
|
||||
OLLAMA_API_KEY: apiKey,
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user