Compare commits

..

10 Commits

Author SHA1 Message Date
Dane Urban
e76ffbd4c3 Connectors output TabularSection 2026-04-12 15:52:26 -07:00
Dane Urban
696e88710d Tabular log 2026-04-12 15:23:12 -07:00
Dane Urban
e131ce9547 . 2026-04-12 13:34:08 -07:00
Dane Urban
828c2ded5c Rework 2026-04-12 13:30:35 -07:00
Dane Urban
ee3f399cc2 Change dispatcher 2026-04-12 13:25:09 -07:00
Dane Urban
0a86507cda . 2026-04-12 13:21:54 -07:00
Dane Urban
9ab125441f . 2026-04-12 13:20:32 -07:00
Dane Urban
0de5399303 Refactor stuff 2026-04-12 13:20:31 -07:00
Dane Urban
537bf1ce1d Add kind to the Section model 2026-04-12 13:16:08 -07:00
Dane Urban
5b7779bc78 Add tests for current document chunking 2026-04-11 13:49:23 -07:00
18 changed files with 1608 additions and 281 deletions

View File

@@ -26,6 +26,10 @@ from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
process_onyx_metadata,
)
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
is_tabular_file,
tabular_file_to_sections,
)
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
@@ -451,6 +455,40 @@ class BlobStorageConnector(LoadConnector, PollConnector):
logger.exception(f"Error processing image {key}")
continue
# Handle tabular files (xlsx, csv, tsv) — produce one
# TabularSection per sheet (or per file for csv/tsv)
# instead of a flat TextSection.
if is_tabular_file(file_name):
try:
downloaded_file = self._download_object(key)
if downloaded_file is None:
continue
tabular_sections = tabular_file_to_sections(
BytesIO(downloaded_file),
file_name=file_name,
link=link,
)
batch.append(
Document(
id=f"{self.bucket_type}:{self.bucket_name}:{key}",
sections=(
tabular_sections
if tabular_sections
else [TextSection(link=link, text="")]
),
source=DocumentSource(self.bucket_type.value),
semantic_identifier=file_name,
doc_updated_at=last_modified,
metadata={},
)
)
if len(batch) == self.batch_size:
yield batch
batch = []
except Exception:
logger.exception(f"Error processing tabular file {key}")
continue
# Handle text and document files
try:
downloaded_file = self._download_object(key)

View File

@@ -0,0 +1,73 @@
"""Helpers for converting tabular files (xlsx, csv, tsv) into
TabularSection objects.
This lives in `connectors/cross_connector_utils` because:
- It imports `TabularSection` from `connectors.models` (connector-layer type).
- It calls `file_processing` primitives (`xlsx_sheet_extraction`, `file_io_to_text`)
but does the connector-layer wrapping here so every connector that ingests
tabular data can share the same section shape.
"""
from typing import IO
from onyx.connectors.models import TabularSection
from onyx.file_processing.extract_file_text import file_io_to_text
from onyx.file_processing.extract_file_text import xlsx_sheet_extraction
from onyx.utils.logger import setup_logger
logger = setup_logger()
# Extensions routed through this helper instead of the generic
# `extract_text_and_images` path. Keep in sync with
# `OnyxFileExtensions.TABULAR_EXTENSIONS`.
TABULAR_FILE_EXTENSIONS = {".xlsx", ".csv", ".tsv"}
def is_tabular_file(file_name: str) -> bool:
    """Report whether `file_name` ends with a tabular extension.

    The comparison is case-insensitive and is made against every entry
    of `TABULAR_FILE_EXTENSIONS`.
    """
    # `str.endswith` accepts a tuple of suffixes, so one call covers all
    # registered extensions.
    suffixes = tuple(TABULAR_FILE_EXTENSIONS)
    return file_name.lower().endswith(suffixes)
def tabular_file_to_sections(
    file: IO[bytes],
    file_name: str,
    link: str = "",
) -> list[TabularSection]:
    """Build TabularSections from an xlsx, csv or tsv file.

    Behavior by extension (matched case-insensitively on `file_name`):

    - `.xlsx`: one TabularSection for each `(csv_text, sheet_title)` pair
      returned by `xlsx_sheet_extraction`; the section link is
      `sheet:{sheet_title}` and the `link` argument is not used.
    - `.csv` / `.tsv`: a single TabularSection holding the stripped output
      of `file_io_to_text`. Its link is `link`, or `file_name` when `link`
      is empty (`TabularSection.link` is a required field).

    Returns:
        The sections, or an empty list when nothing could be extracted
        (no sheets returned, blank csv/tsv text, or a failure while
        reading the csv/tsv text, which is logged as a warning).

    Raises:
        ValueError: `file_name` has none of the extensions above. Callers
            should gate on `is_tabular_file` first.
    """
    name_lc = file_name.lower()

    if name_lc.endswith(".xlsx"):
        workbook_sections: list[TabularSection] = []
        for csv_text, sheet_title in xlsx_sheet_extraction(file, file_name=file_name):
            workbook_sections.append(
                TabularSection(link=f"sheet:{sheet_title}", text=csv_text)
            )
        return workbook_sections

    if not name_lc.endswith((".csv", ".tsv")):
        raise ValueError(f"{file_name!r} is not a tabular file")

    try:
        decoded = file_io_to_text(file).strip()
    except Exception as e:
        # Best effort: an unreadable file yields no sections rather than
        # failing the caller.
        logger.warning(f"Failed to decode {file_name}: {e}")
        return []

    if decoded:
        return [TabularSection(link=link or file_name, text=decoded)]
    return []

View File

@@ -15,6 +15,10 @@ from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
)
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rate_limit_builder
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rl_requests
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
is_tabular_file,
tabular_file_to_sections,
)
from onyx.connectors.drupal_wiki.models import DrupalWikiCheckpoint
from onyx.connectors.drupal_wiki.models import DrupalWikiPage
from onyx.connectors.drupal_wiki.models import DrupalWikiPageResponse
@@ -33,6 +37,7 @@ from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.extract_file_text import get_file_ext
@@ -226,7 +231,7 @@ class DrupalWikiConnector(
Tuple of (sections, error_message). If error_message is not None, the
sections list should be treated as invalid.
"""
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
try:
if not self._validate_attachment_filetype(attachment):
@@ -273,6 +278,25 @@ class DrupalWikiConnector(
return sections, None
# Tabular attachments (xlsx, csv, tsv) — produce
# TabularSections instead of a flat TextSection.
if is_tabular_file(file_name):
try:
sections.extend(
tabular_file_to_sections(
BytesIO(raw_bytes),
file_name=file_name,
link=download_url,
)
)
except Exception as e:
logger.warning(
f"Failed to extract tabular sections from {file_name}: {e}"
)
if not sections:
return [], f"No content extracted from tabular file {file_name}"
return sections, None
image_counter = 0
def _store_embedded_image(image_data: bytes, image_name: str) -> None:

View File

@@ -12,6 +12,10 @@ from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
process_onyx_metadata,
)
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
is_tabular_file,
tabular_file_to_sections,
)
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.models import Document
@@ -145,6 +149,39 @@ def _process_file(
logger.error(f"Failed to process image file {file_name}: {e}")
return []
# 1b) If the file is tabular (xlsx/csv/tsv), produce one
# TabularSection per sheet (or per file for csv/tsv) instead of
# flattening through the generic text extractor.
if is_tabular_file(file_name):
file.seek(0)
try:
tabular_sections = tabular_file_to_sections(
file=file,
file_name=file_name,
link=link or "",
)
except Exception as e:
logger.error(f"Failed to process tabular file {file_name}: {e}")
return []
if not tabular_sections:
logger.warning(f"No content extracted from tabular file {file_name}")
return []
return [
Document(
id=doc_id,
sections=list(tabular_sections),
source=source_type,
semantic_identifier=file_display_name,
title=title,
doc_updated_at=time_updated,
primary_owners=primary_owners,
secondary_owners=secondary_owners,
metadata=custom_tags,
)
]
# 2) Otherwise: text-based approach. Possibly with embedded images.
file.seek(0)

View File

@@ -1,3 +1,7 @@
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
is_tabular_file,
tabular_file_to_sections,
)
import io
from collections.abc import Callable
from datetime import datetime
@@ -28,15 +32,16 @@ from onyx.connectors.models import Document
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_processing.extract_file_text import get_file_ext
from onyx.file_processing.extract_file_text import pptx_to_text
from onyx.file_processing.extract_file_text import read_docx_file
from onyx.file_processing.extract_file_text import read_pdf_file
from onyx.file_processing.extract_file_text import xlsx_to_text
from onyx.file_processing.file_types import OnyxFileExtensions
from onyx.file_processing.file_types import OnyxMimeTypes
from onyx.file_processing.file_types import SPREADSHEET_MIME_TYPE
from onyx.file_processing.image_utils import store_image_and_create_section
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import (
@@ -289,7 +294,7 @@ def _download_and_extract_sections_basic(
service: GoogleDriveService,
allow_images: bool,
size_threshold: int,
) -> list[TextSection | ImageSection]:
) -> list[TextSection | ImageSection | TabularSection]:
"""Extract text and images from a Google Drive file."""
file_id = file["id"]
file_name = file["name"]
@@ -308,7 +313,7 @@ def _download_and_extract_sections_basic(
return []
# Store images for later processing
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
try:
section, embedded_id = store_image_and_create_section(
image_data=response_call(),
@@ -323,10 +328,9 @@ def _download_and_extract_sections_basic(
logger.error(f"Failed to process image {file_name}: {e}")
return sections
# For Google Docs, Sheets, and Slides, export as plain text
# For Google Docs, Sheets, and Slides, export via the Drive API
if mime_type in GOOGLE_MIME_TYPES_TO_EXPORT:
export_mime_type = GOOGLE_MIME_TYPES_TO_EXPORT[mime_type]
# Use the correct API call for exporting files
request = service.files().export_media(
fileId=file_id, mimeType=export_mime_type
)
@@ -335,6 +339,17 @@ def _download_and_extract_sections_basic(
logger.warning(f"Failed to export {file_name} as {export_mime_type}")
return []
if export_mime_type in OnyxMimeTypes.TABULAR_MIME_TYPES:
# Synthesize an extension on the filename
ext = ".xlsx" if export_mime_type == SPREADSHEET_MIME_TYPE else ".csv"
return list(
tabular_file_to_sections(
io.BytesIO(response),
file_name=f"{file_name}{ext}",
link=link,
)
)
text = response.decode("utf-8")
return [TextSection(link=link, text=text)]
@@ -356,9 +371,15 @@ def _download_and_extract_sections_basic(
elif (
mime_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
or is_tabular_file(file_name)
):
text = xlsx_to_text(io.BytesIO(response_call()), file_name=file_name)
return [TextSection(link=link, text=text)] if text else []
return list(
tabular_file_to_sections(
io.BytesIO(response_call()),
file_name=file_name,
link=link,
)
)
elif (
mime_type
@@ -410,8 +431,9 @@ def _find_nth(haystack: str, needle: str, n: int, start: int = 0) -> int:
def align_basic_advanced(
basic_sections: list[TextSection | ImageSection], adv_sections: list[TextSection]
) -> list[TextSection | ImageSection]:
basic_sections: list[TextSection | ImageSection | TabularSection],
adv_sections: list[TextSection],
) -> list[TextSection | ImageSection | TabularSection]:
"""Align the basic sections with the advanced sections.
In particular, the basic sections contain all content of the file,
including smart chips like dates and doc links. The advanced sections
@@ -428,7 +450,7 @@ def align_basic_advanced(
basic_full_text = "".join(
[section.text for section in basic_sections if isinstance(section, TextSection)]
)
new_sections: list[TextSection | ImageSection] = []
new_sections: list[TextSection | ImageSection | TabularSection] = []
heading_start = 0
for adv_ind in range(1, len(adv_sections)):
heading = adv_sections[adv_ind].text.split(HEADING_DELIMITER)[0]
@@ -599,7 +621,7 @@ def _convert_drive_item_to_document(
"""
Main entry point for converting a Google Drive file => Document object.
"""
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
# Only construct these services when needed
def _get_drive_service() -> GoogleDriveService:
@@ -639,7 +661,9 @@ def _convert_drive_item_to_document(
doc_id=file.get("id", ""),
)
if doc_sections:
sections = cast(list[TextSection | ImageSection], doc_sections)
sections = cast(
list[TextSection | ImageSection | TabularSection], doc_sections
)
if any(SMART_CHIP_CHAR in section.text for section in doc_sections):
logger.debug(
f"found smart chips in {file.get('name')}, aligning with basic sections"

View File

@@ -33,9 +33,19 @@ class ConnectorMissingCredentialError(PermissionError):
)
class SectionKind(str, Enum):
    """Tag identifying which Section subclass a section is.

    Members are also `str` instances, so they compare equal to their
    plain string values.
    """

    # Plain text content
    TEXT = "text"
    # Reference to an image
    IMAGE = "image"
    # Tabular (CSV-style) content
    TABULAR = "tabular"
class Section(BaseModel):
"""Base section class with common attributes"""
kind: SectionKind
link: str | None = None
text: str | None = None
image_file_id: str | None = None
@@ -44,6 +54,7 @@ class Section(BaseModel):
class TextSection(Section):
"""Section containing text content"""
kind: SectionKind = SectionKind.TEXT
text: str
def __sizeof__(self) -> int:
@@ -53,12 +64,22 @@ class TextSection(Section):
class ImageSection(Section):
    """A section whose content is a reference to an image."""

    kind: SectionKind = SectionKind.IMAGE
    # Required here; the base Section declares it as optional.
    image_file_id: str

    def __sizeof__(self) -> int:
        """Return the combined size of the image file id and the link."""
        id_size = sys.getsizeof(self.image_file_id)
        link_size = sys.getsizeof(self.link)
        return id_size + link_size
class TabularSection(Section):
    """Section containing tabular data (csv/tsv content, or one sheet of
    an xlsx workbook rendered as CSV)."""

    kind: SectionKind = SectionKind.TABULAR
    text: str  # CSV representation in a string
    link: str  # Required here; the base Section declares it as optional.

    def __sizeof__(self) -> int:
        """Return the combined size of the CSV text and the link.

        Mirrors the `__sizeof__` overrides on the sibling section classes
        so that `sys.getsizeof` reflects the payload of this section.
        """
        return sys.getsizeof(self.text) + sys.getsizeof(self.link)
class BasicExpertInfo(BaseModel):
"""Basic Information for the owner of a document, any of the fields can be left as None
Display fallback goes as follows:
@@ -161,7 +182,7 @@ class DocumentBase(BaseModel):
"""Used for Onyx ingestion api, the ID is inferred before use if not provided"""
id: str | None = None
sections: list[TextSection | ImageSection]
sections: list[TextSection | ImageSection | TabularSection]
source: DocumentSource | None = None
semantic_identifier: str # displayed in the UI as the main identifier for the doc
# TODO(andrei): Ideally we could improve this to where each value is just a

View File

@@ -60,7 +60,12 @@ from onyx.connectors.models import ExternalAccess
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
is_tabular_file,
tabular_file_to_sections,
)
from onyx.connectors.sharepoint.connector_utils import get_sharepoint_external_access
from onyx.db.enums import HierarchyNodeType
from onyx.file_processing.extract_file_text import extract_text_and_images
@@ -586,7 +591,7 @@ def _convert_driveitem_to_document_with_permissions(
driveitem, f"Failed to download via graph api: {e}", e
)
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
file_ext = get_file_ext(driveitem.name)
if not content_bytes:
@@ -602,6 +607,19 @@ def _convert_driveitem_to_document_with_permissions(
)
image_section.link = driveitem.web_url
sections.append(image_section)
elif is_tabular_file(driveitem.name):
try:
sections.extend(
tabular_file_to_sections(
file=io.BytesIO(content_bytes),
file_name=driveitem.name,
link=driveitem.web_url or "",
)
)
except Exception as e:
logger.warning(
f"Failed to extract tabular sections for '{driveitem.name}': {e}"
)
else:
def _store_embedded_image(img_data: bytes, img_name: str) -> None:

View File

@@ -462,30 +462,13 @@ def _remove_empty_runs(
return result
def xlsx_sheet_extraction(file: IO[Any], file_name: str = "") -> list[tuple[str, str]]:
"""
Converts each sheet in the excel file to a csv condensed string.
Returns a string and the worksheet title for each worksheet
def xlsx_to_text(file: IO[Any], file_name: str = "") -> str:
# TODO: switch back to this approach in a few months when markitdown
# fixes their handling of excel files
# md = get_markitdown_converter()
# stream_info = StreamInfo(
# mimetype=SPREADSHEET_MIME_TYPE, filename=file_name or None, extension=".xlsx"
# )
# try:
# workbook = md.convert(to_bytesio(file), stream_info=stream_info)
# except (
# BadZipFile,
# ValueError,
# FileConversionException,
# UnsupportedFormatException,
# ) as e:
# error_str = f"Failed to extract text from {file_name or 'xlsx file'}: {e}"
# if file_name.startswith("~"):
# logger.debug(error_str + " (this is expected for files with ~)")
# else:
# logger.warning(error_str)
# return ""
# return workbook.markdown
Returns a list of (csv_text, sheet)
"""
try:
workbook = openpyxl.load_workbook(file, read_only=True)
except BadZipFile as e:
@@ -494,23 +477,30 @@ def xlsx_to_text(file: IO[Any], file_name: str = "") -> str:
logger.debug(error_str + " (this is expected for files with ~)")
else:
logger.warning(error_str)
return ""
return []
except Exception as e:
if any(s in str(e) for s in KNOWN_OPENPYXL_BUGS):
logger.error(
f"Failed to extract text from {file_name or 'xlsx file'}. This happens due to a bug in openpyxl. {e}"
)
return ""
return []
raise
text_content = []
sheets: list[tuple[str, str]] = []
for sheet in workbook.worksheets:
sheet_matrix = _clean_worksheet_matrix(_worksheet_to_matrix(sheet))
buf = io.StringIO()
writer = csv.writer(buf, lineterminator="\n")
writer.writerows(sheet_matrix)
text_content.append(buf.getvalue().rstrip("\n"))
return TEXT_SECTION_SEPARATOR.join(text_content)
csv_text = buf.getvalue().rstrip("\n")
if csv_text.strip():
sheets.append((csv_text, sheet.title))
return sheets
def xlsx_to_text(file: IO[Any], file_name: str = "") -> str:
    """Render an xlsx workbook as one string.

    Takes the CSV text of every sheet returned by `xlsx_sheet_extraction`
    and joins them with `TEXT_SECTION_SEPARATOR`; sheet titles are dropped.
    """
    per_sheet_csv = [
        sheet_csv for sheet_csv, _sheet_title in xlsx_sheet_extraction(file, file_name)
    ]
    return TEXT_SECTION_SEPARATOR.join(per_sheet_csv)
def eml_to_text(file: IO[Any]) -> str:

View File

@@ -1,5 +1,3 @@
from typing import cast
from chonkie import SentenceChunker
from onyx.configs.app_configs import AVERAGE_SUMMARY_EMBEDDINGS
@@ -15,17 +13,15 @@ from onyx.configs.constants import SECTION_SEPARATOR
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
get_metadata_keys_to_ignore,
)
from onyx.indexing.chunking import DocumentChunker
from onyx.indexing.chunking import extract_blurb
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.indexing.models import DocAwareChunk
from onyx.llm.utils import MAX_CONTEXT_TOKENS
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.utils.logger import setup_logger
from onyx.utils.text_processing import clean_text
from onyx.utils.text_processing import shared_precompare_cleanup
from shared_configs.configs import DOC_EMBEDDING_CONTEXT_SIZE
from shared_configs.configs import STRICT_CHUNK_TOKEN_LIMIT
# Not supporting overlaps, we need a clean combination of chunks and it is unclear if overlaps
# actually help quality at all
@@ -154,9 +150,6 @@ class Chunker:
self.tokenizer = tokenizer
self.callback = callback
self.max_context = 0
self.prompt_tokens = 0
# Create a token counter function that returns the count instead of the tokens
def token_counter(text: str) -> int:
return len(tokenizer.encode(text))
@@ -186,234 +179,12 @@ class Chunker:
else None
)
def _split_oversized_chunk(self, text: str, content_token_limit: int) -> list[str]:
"""
Splits the text into smaller chunks based on token count to ensure
no chunk exceeds the content_token_limit.
"""
tokens = self.tokenizer.tokenize(text)
chunks = []
start = 0
total_tokens = len(tokens)
while start < total_tokens:
end = min(start + content_token_limit, total_tokens)
token_chunk = tokens[start:end]
chunk_text = " ".join(token_chunk)
chunks.append(chunk_text)
start = end
return chunks
def _extract_blurb(self, text: str) -> str:
"""
Extract a short blurb from the text (first chunk of size `blurb_size`).
"""
# chunker is in `text` mode
texts = cast(list[str], self.blurb_splitter.chunk(text))
if not texts:
return ""
return texts[0]
def _get_mini_chunk_texts(self, chunk_text: str) -> list[str] | None:
"""
For "multipass" mode: additional sub-chunks (mini-chunks) for use in certain embeddings.
"""
if self.mini_chunk_splitter and chunk_text.strip():
# chunker is in `text` mode
return cast(list[str], self.mini_chunk_splitter.chunk(chunk_text))
return None
# ADDED: extra param image_url to store in the chunk
def _create_chunk(
self,
document: IndexingDocument,
chunks_list: list[DocAwareChunk],
text: str,
links: dict[int, str],
is_continuation: bool = False,
title_prefix: str = "",
metadata_suffix_semantic: str = "",
metadata_suffix_keyword: str = "",
image_file_id: str | None = None,
) -> None:
"""
Helper to create a new DocAwareChunk, append it to chunks_list.
"""
new_chunk = DocAwareChunk(
source_document=document,
chunk_id=len(chunks_list),
blurb=self._extract_blurb(text),
content=text,
source_links=links or {0: ""},
image_file_id=image_file_id,
section_continuation=is_continuation,
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,
mini_chunk_texts=self._get_mini_chunk_texts(text),
large_chunk_id=None,
doc_summary="",
chunk_context="",
contextual_rag_reserved_tokens=0, # set per-document in _handle_single_document
self._document_chunker = DocumentChunker(
tokenizer=tokenizer,
blurb_splitter=self.blurb_splitter,
chunk_splitter=self.chunk_splitter,
mini_chunk_splitter=self.mini_chunk_splitter,
)
chunks_list.append(new_chunk)
def _chunk_document_with_sections(
self,
document: IndexingDocument,
sections: list[Section],
title_prefix: str,
metadata_suffix_semantic: str,
metadata_suffix_keyword: str,
content_token_limit: int,
) -> list[DocAwareChunk]:
"""
Loops through sections of the document, converting them into one or more chunks.
Works with processed sections that are base Section objects.
"""
chunks: list[DocAwareChunk] = []
link_offsets: dict[int, str] = {}
chunk_text = ""
for section_idx, section in enumerate(sections):
# Get section text and other attributes
section_text = clean_text(str(section.text or ""))
section_link_text = section.link or ""
image_url = section.image_file_id
# If there is no useful content, skip
if not section_text and (not document.title or section_idx > 0):
logger.warning(
f"Skipping empty or irrelevant section in doc {document.semantic_identifier}, link={section_link_text}"
)
continue
# CASE 1: If this section has an image, force a separate chunk
if image_url:
# First, if we have any partially built text chunk, finalize it
if chunk_text.strip():
self._create_chunk(
document,
chunks,
chunk_text,
link_offsets,
is_continuation=False,
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,
)
chunk_text = ""
link_offsets = {}
# Create a chunk specifically for this image section
# (Using the text summary that was generated during processing)
self._create_chunk(
document,
chunks,
section_text,
links={0: section_link_text} if section_link_text else {},
image_file_id=image_url,
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,
)
# Continue to next section
continue
# CASE 2: Normal text section
section_token_count = len(self.tokenizer.encode(section_text))
# If the section is large on its own, split it separately
if section_token_count > content_token_limit:
if chunk_text.strip():
self._create_chunk(
document,
chunks,
chunk_text,
link_offsets,
False,
title_prefix,
metadata_suffix_semantic,
metadata_suffix_keyword,
)
chunk_text = ""
link_offsets = {}
# chunker is in `text` mode
split_texts = cast(list[str], self.chunk_splitter.chunk(section_text))
for i, split_text in enumerate(split_texts):
# If even the split_text is bigger than strict limit, further split
if (
STRICT_CHUNK_TOKEN_LIMIT
and len(self.tokenizer.encode(split_text)) > content_token_limit
):
smaller_chunks = self._split_oversized_chunk(
split_text, content_token_limit
)
for j, small_chunk in enumerate(smaller_chunks):
self._create_chunk(
document,
chunks,
small_chunk,
{0: section_link_text},
is_continuation=(j != 0),
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,
)
else:
self._create_chunk(
document,
chunks,
split_text,
{0: section_link_text},
is_continuation=(i != 0),
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,
)
continue
# If we can still fit this section into the current chunk, do so
current_token_count = len(self.tokenizer.encode(chunk_text))
current_offset = len(shared_precompare_cleanup(chunk_text))
next_section_tokens = (
len(self.tokenizer.encode(SECTION_SEPARATOR)) + section_token_count
)
if next_section_tokens + current_token_count <= content_token_limit:
if chunk_text:
chunk_text += SECTION_SEPARATOR
chunk_text += section_text
link_offsets[current_offset] = section_link_text
else:
# finalize the existing chunk
self._create_chunk(
document,
chunks,
chunk_text,
link_offsets,
False,
title_prefix,
metadata_suffix_semantic,
metadata_suffix_keyword,
)
# start a new chunk
link_offsets = {0: section_link_text}
chunk_text = section_text
# finalize any leftover text chunk
if chunk_text.strip() or not chunks:
self._create_chunk(
document,
chunks,
chunk_text,
link_offsets or {0: ""}, # safe default
False,
title_prefix,
metadata_suffix_semantic,
metadata_suffix_keyword,
)
return chunks
def _handle_single_document(
self, document: IndexingDocument
@@ -423,7 +194,10 @@ class Chunker:
logger.debug(f"Chunking {document.semantic_identifier}")
# Title prep
title = self._extract_blurb(document.get_title_for_document_index() or "")
title = extract_blurb(
document.get_title_for_document_index() or "",
self.blurb_splitter,
)
title_prefix = title + RETURN_SEPARATOR if title else ""
title_tokens = len(self.tokenizer.encode(title_prefix))
@@ -491,7 +265,7 @@ class Chunker:
# Use processed_sections if available (IndexingDocument), otherwise use original sections
sections_to_chunk = document.processed_sections
normal_chunks = self._chunk_document_with_sections(
normal_chunks = self._document_chunker.chunk(
document,
sections_to_chunk,
title_prefix,

View File

@@ -0,0 +1,7 @@
from onyx.indexing.chunking.document_chunker import DocumentChunker
from onyx.indexing.chunking.section_chunker import extract_blurb
__all__ = [
"DocumentChunker",
"extract_blurb",
]

View File

@@ -0,0 +1,113 @@
from chonkie import SentenceChunker
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.connectors.models import SectionKind
from onyx.indexing.chunking.image_section_chunker import ImageChunker
from onyx.indexing.chunking.section_chunker import AccumulatorState
from onyx.indexing.chunking.section_chunker import ChunkPayload
from onyx.indexing.chunking.section_chunker import SectionChunker
from onyx.indexing.chunking.text_section_chunker import TextChunker
from onyx.indexing.models import DocAwareChunk
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.utils.logger import setup_logger
from onyx.utils.text_processing import clean_text
logger = setup_logger()
class DocumentChunker:
    """Converts a document's processed sections into DocAwareChunks.

    Drop-in replacement for `Chunker._chunk_document_with_sections`.
    Each section is routed to a SectionChunker chosen by `section.kind`;
    the resulting payloads are then numbered and upgraded to
    DocAwareChunks.
    """

    def __init__(
        self,
        tokenizer: BaseTokenizer,
        blurb_splitter: SentenceChunker,
        chunk_splitter: SentenceChunker,
        mini_chunk_splitter: SentenceChunker | None = None,
    ) -> None:
        """Store the splitters and build the kind -> chunker dispatch table.

        Args:
            tokenizer: Passed to the TextChunker.
            blurb_splitter: Used to derive each chunk's blurb.
            chunk_splitter: Passed to the TextChunker.
            mini_chunk_splitter: Optional splitter used to derive mini-chunk
                texts for each chunk.
        """
        self.blurb_splitter = blurb_splitter
        self.mini_chunk_splitter = mini_chunk_splitter

        # NOTE(review): only TEXT and IMAGE are registered here. A section
        # whose kind is SectionKind.TABULAR would hit the ValueError in
        # `_select_chunker` — confirm that is handled before sections
        # reach this class.
        self._dispatch: dict[SectionKind, SectionChunker] = {
            SectionKind.TEXT: TextChunker(
                tokenizer=tokenizer,
                chunk_splitter=chunk_splitter,
            ),
            SectionKind.IMAGE: ImageChunker(),
        }

    def chunk(
        self,
        document: IndexingDocument,
        sections: list[Section],
        title_prefix: str,
        metadata_suffix_semantic: str,
        metadata_suffix_keyword: str,
        content_token_limit: int,
    ) -> list[DocAwareChunk]:
        """Chunk `sections` of `document` into DocAwareChunks.

        Chunk ids are assigned sequentially from 0 in payload order. When
        no section produces a payload, a single empty chunk is emitted so
        the returned list is never empty.

        Raises:
            ValueError: A non-skipped section has a kind with no
                registered chunker.
        """
        payloads = self._collect_section_payloads(
            document=document,
            sections=sections,
            content_token_limit=content_token_limit,
        )

        if not payloads:
            # Always emit at least one chunk for the document.
            payloads.append(ChunkPayload(text="", links={0: ""}))

        return [
            payload.to_doc_aware_chunk(
                document=document,
                chunk_id=idx,
                blurb_splitter=self.blurb_splitter,
                mini_chunk_splitter=self.mini_chunk_splitter,
                title_prefix=title_prefix,
                metadata_suffix_semantic=metadata_suffix_semantic,
                metadata_suffix_keyword=metadata_suffix_keyword,
            )
            for idx, payload in enumerate(payloads)
        ]

    def _collect_section_payloads(
        self,
        document: IndexingDocument,
        sections: list[Section],
        content_token_limit: int,
    ) -> list[ChunkPayload]:
        """Run every section through its chunker and gather the payloads.

        The accumulator returned by one chunker is handed to the next, and
        whatever text is still buffered after the last section is flushed
        into a final payload.
        """
        accumulator = AccumulatorState()
        payloads: list[ChunkPayload] = []

        for section_idx, section in enumerate(sections):
            section_text = clean_text(str(section.text or ""))

            # An empty section is only kept when it is the first section
            # of a document that has a title.
            if not section_text and (
                not document.title or section_idx > 0
            ):
                logger.warning(
                    f"Skipping empty or irrelevant section in doc "
                    f"{document.semantic_identifier}, link={section.link}"
                )
                continue

            chunker = self._select_chunker(section)
            result = chunker.chunk_section(
                section=section,
                accumulator=accumulator,
                content_token_limit=content_token_limit,
            )
            payloads.extend(result.payloads)
            accumulator = result.accumulator

        payloads.extend(accumulator.flush_to_list())
        return payloads

    def _select_chunker(self, section: Section) -> SectionChunker:
        """Return the chunker registered for `section.kind`.

        Raises:
            ValueError: No chunker is registered for that kind.
        """
        # Look up with `.get` instead of catching KeyError so the
        # ValueError is raised without an unrelated KeyError attached as
        # its implicit context.
        chunker = self._dispatch.get(section.kind)
        if chunker is None:
            raise ValueError(
                f"No SectionChunker registered for kind={section.kind}"
            )
        return chunker

View File

@@ -0,0 +1,34 @@
from onyx.connectors.models import Section
from onyx.indexing.chunking.section_chunker import AccumulatorState
from onyx.indexing.chunking.section_chunker import ChunkPayload
from onyx.indexing.chunking.section_chunker import SectionChunker
from onyx.indexing.chunking.section_chunker import SectionChunkerOutput
from onyx.utils.text_processing import clean_text
class ImageChunker(SectionChunker):
    """SectionChunker that gives an image section a payload of its own."""

    def chunk_section(
        self,
        section: Section,
        accumulator: AccumulatorState,
        content_token_limit: int,  # noqa: ARG002
    ) -> SectionChunkerOutput:
        """Emit any buffered text, then one payload for the image.

        The buffered text in `accumulator` (if any) is flushed first so
        the image payload follows it. The returned accumulator is a new,
        empty one. `content_token_limit` is not used.
        """
        assert section.image_file_id is not None

        image_text = clean_text(str(section.text or ""))
        image_link = section.link or ""

        pending = accumulator.flush_to_list()
        image_payload = ChunkPayload(
            text=image_text,
            # No link entry at all when the section has no link.
            links={0: image_link} if image_link else {},
            image_file_id=section.image_file_id,
            is_continuation=False,
        )

        return SectionChunkerOutput(
            payloads=[*pending, image_payload],
            accumulator=AccumulatorState(),
        )

View File

@@ -0,0 +1,102 @@
from abc import ABC
from abc import abstractmethod
from typing import cast
from chonkie import SentenceChunker
from pydantic import BaseModel
from pydantic import Field
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.indexing.models import DocAwareChunk
def extract_blurb(text: str, blurb_splitter: SentenceChunker) -> str:
    """Return the first piece `blurb_splitter.chunk(text)` produces.

    Returns an empty string when the splitter yields no pieces.
    """
    pieces = cast(list[str], blurb_splitter.chunk(text))
    return pieces[0] if pieces else ""
def get_mini_chunk_texts(
    chunk_text: str,
    mini_chunk_splitter: SentenceChunker | None,
) -> list[str] | None:
    """Split `chunk_text` into mini-chunks with `mini_chunk_splitter`.

    Returns None when no splitter is supplied or when `chunk_text` is
    empty or whitespace-only.
    """
    if not mini_chunk_splitter:
        return None
    if not chunk_text.strip():
        return None
    return cast(list[str], mini_chunk_splitter.chunk(chunk_text))
class ChunkPayload(BaseModel):
    """Chunk content scoped to a section, with no document-level fields.

    A payload is turned into a DocAwareChunk by `to_doc_aware_chunk`, which
    is where the chunk_id, title prefix and metadata suffixes are attached.
    """

    # Text content of the chunk.
    text: str
    # Offset -> link mapping for the text.
    links: dict[int, str]
    # True when this payload continues a section begun by an earlier payload.
    is_continuation: bool = False
    # Set for payloads that represent an image section.
    image_file_id: str | None = None

    def to_doc_aware_chunk(
        self,
        document: IndexingDocument,
        chunk_id: int,
        blurb_splitter: SentenceChunker,
        title_prefix: str = "",
        metadata_suffix_semantic: str = "",
        metadata_suffix_keyword: str = "",
        mini_chunk_splitter: SentenceChunker | None = None,
    ) -> DocAwareChunk:
        """Build the DocAwareChunk for this payload within ``document``.

        The blurb and optional mini-chunk texts are derived from the payload
        text. The remaining DocAwareChunk fields are set to the fixed values
        shown below.
        """
        content = self.text
        blurb = extract_blurb(content, blurb_splitter)
        # An empty links mapping falls back to a single empty link at offset 0.
        source_links = self.links or {0: ""}
        mini_chunk_texts = get_mini_chunk_texts(content, mini_chunk_splitter)

        return DocAwareChunk(
            source_document=document,
            chunk_id=chunk_id,
            blurb=blurb,
            content=content,
            source_links=source_links,
            image_file_id=self.image_file_id,
            section_continuation=self.is_continuation,
            title_prefix=title_prefix,
            metadata_suffix_semantic=metadata_suffix_semantic,
            metadata_suffix_keyword=metadata_suffix_keyword,
            mini_chunk_texts=mini_chunk_texts,
            large_chunk_id=None,
            doc_summary="",
            chunk_context="",
            contextual_rag_reserved_tokens=0,
        )
class AccumulatorState(BaseModel):
    """Text buffer carried from one SectionChunker call to the next."""

    # Buffered text that has not been emitted as a payload yet.
    text: str = ""
    # Offset -> link mapping for the buffered text.
    link_offsets: dict[int, str] = Field(default_factory=dict)

    def is_empty(self) -> bool:
        """Return True when the buffer is empty or whitespace-only."""
        return not self.text.strip()

    def flush_to_list(self) -> list["ChunkPayload"]:
        """Return the buffer as a one-element payload list, or [] if empty."""
        pending: list[ChunkPayload] = []
        if not self.is_empty():
            pending.append(ChunkPayload(text=self.text, links=self.link_offsets))
        return pending
class SectionChunkerOutput(BaseModel):
    """Result of one `SectionChunker.chunk_section` call."""

    # Payloads finished by this call.
    payloads: list[ChunkPayload]
    # Accumulator state to pass to the next call.
    accumulator: AccumulatorState
class SectionChunker(ABC):
    """Abstract interface for turning one section into chunk payloads."""

    @abstractmethod
    def chunk_section(
        self,
        section: Section,
        accumulator: AccumulatorState,
        content_token_limit: int,
    ) -> SectionChunkerOutput:
        """Process ``section`` given the incoming ``accumulator``.

        Implementations return the payloads they finished plus the
        accumulator state for the next section.
        """
        ...

View File

@@ -0,0 +1,129 @@
from typing import cast
from chonkie import SentenceChunker
from onyx.configs.constants import SECTION_SEPARATOR
from onyx.connectors.models import Section
from onyx.indexing.chunking.section_chunker import AccumulatorState
from onyx.indexing.chunking.section_chunker import ChunkPayload
from onyx.indexing.chunking.section_chunker import SectionChunker
from onyx.indexing.chunking.section_chunker import SectionChunkerOutput
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.utils.text_processing import clean_text
from onyx.utils.text_processing import shared_precompare_cleanup
from shared_configs.configs import STRICT_CHUNK_TOKEN_LIMIT
class TextChunker(SectionChunker):
    """SectionChunker for text sections.

    Sections that fit are appended to the shared accumulator (joined with
    SECTION_SEPARATOR). A section that would overflow the accumulator flushes
    it and starts a new one. A section that exceeds ``content_token_limit``
    on its own is split with ``chunk_splitter``.
    """

    def __init__(
        self,
        tokenizer: BaseTokenizer,
        chunk_splitter: SentenceChunker,
    ) -> None:
        """Store the tokenizer used for counting and the oversized-section splitter."""
        self.tokenizer = tokenizer
        self.chunk_splitter = chunk_splitter

    def chunk_section(
        self,
        section: Section,
        accumulator: AccumulatorState,
        content_token_limit: int,
    ) -> SectionChunkerOutput:
        """Fold one text section into the accumulator or emit finished payloads.

        Args:
            section: Section to process; a ``None`` text is treated as "".
            accumulator: Text buffered from previous sections.
            content_token_limit: Maximum token count for a chunk's content.

        Returns:
            The payloads finished by this call and the accumulator to pass to
            the next section.
        """
        section_text = clean_text(str(section.text or ""))
        section_link = section.link or ""
        section_token_count = len(self.tokenizer.encode(section_text))

        # Oversized — flush buffer and split the section
        if section_token_count > content_token_limit:
            return self._handle_oversized_section(
                section_text=section_text,
                section_link=section_link,
                accumulator=accumulator,
                content_token_limit=content_token_limit,
            )

        current_token_count = len(self.tokenizer.encode(accumulator.text))
        next_section_tokens = (
            len(self.tokenizer.encode(SECTION_SEPARATOR)) + section_token_count
        )

        # Fits — extend the accumulator
        if next_section_tokens + current_token_count <= content_token_limit:
            # The link is keyed by the cleaned-up length of the text that
            # precedes this section in the buffer.
            offset = len(shared_precompare_cleanup(accumulator.text))
            new_text = accumulator.text
            if new_text:
                new_text += SECTION_SEPARATOR
            new_text += section_text
            return SectionChunkerOutput(
                payloads=[],
                accumulator=AccumulatorState(
                    text=new_text,
                    link_offsets={**accumulator.link_offsets, offset: section_link},
                ),
            )

        # Doesn't fit — flush buffer and restart with this section
        return SectionChunkerOutput(
            payloads=accumulator.flush_to_list(),
            accumulator=AccumulatorState(
                text=section_text,
                link_offsets={0: section_link},
            ),
        )

    def _handle_oversized_section(
        self,
        section_text: str,
        section_link: str,
        accumulator: AccumulatorState,
        content_token_limit: int,
    ) -> SectionChunkerOutput:
        """Flush the accumulator, then split an oversized section into payloads.

        The section is split with ``chunk_splitter``. When
        STRICT_CHUNK_TOKEN_LIMIT is enabled, any piece still above
        ``content_token_limit`` is split again by token windows. Only the
        very first payload of the section is marked as non-continuation.
        """
        payloads = accumulator.flush_to_list()
        split_texts = cast(list[str], self.chunk_splitter.chunk(section_text))

        for i, split_text in enumerate(split_texts):
            needs_hard_split = (
                STRICT_CHUNK_TOKEN_LIMIT
                and len(self.tokenizer.encode(split_text)) > content_token_limit
            )
            pieces = (
                self._split_oversized_chunk(split_text, content_token_limit)
                if needs_hard_split
                else [split_text]
            )
            for j, piece in enumerate(pieces):
                payloads.append(
                    ChunkPayload(
                        text=piece,
                        links={0: section_link},
                        # A piece is "fresh" only if it is the first piece of
                        # the first split; checking j alone would wrongly mark
                        # the first sub-chunk of every later split as a new
                        # section.
                        is_continuation=(i != 0 or j != 0),
                    )
                )

        return SectionChunkerOutput(
            payloads=payloads,
            accumulator=AccumulatorState(),
        )

    def _split_oversized_chunk(
        self, text: str, content_token_limit: int
    ) -> list[str]:
        """Split ``text`` into windows of at most ``content_token_limit`` tokens.

        Tokens come from ``tokenizer.tokenize`` and each window is re-joined
        with single spaces, so the returned strings are not guaranteed to be
        verbatim slices of ``text``.

        Raises:
            ValueError: if ``content_token_limit`` is not positive (a
                non-positive window would never advance).
        """
        if content_token_limit <= 0:
            raise ValueError(
                f"content_token_limit must be positive, got {content_token_limit}"
            )

        tokens = self.tokenizer.tokenize(text)
        return [
            " ".join(tokens[start : start + content_token_limit])
            for start in range(0, len(tokens), content_token_limit)
        ]

View File

@@ -542,6 +542,7 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
**document.model_dump(),
processed_sections=[
Section(
kind=section.kind,
text=section.text if isinstance(section, TextSection) else "",
link=section.link,
image_file_id=(
@@ -566,6 +567,7 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
if isinstance(section, ImageSection):
# Default section with image path preserved - ensure text is always a string
processed_section = Section(
kind=section.kind,
link=section.link,
image_file_id=section.image_file_id,
text="", # Initialize with empty string
@@ -609,6 +611,7 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
# For TextSection, create a base Section with text and link
elif isinstance(section, TextSection):
processed_section = Section(
kind=section.kind,
text=section.text or "", # Ensure text is always a string, not None
link=section.link,
image_file_id=None,

View File

@@ -1,6 +1,7 @@
from onyx.connectors.models import Document
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.connectors.models import SectionKind
FINAL_CONTEXT_DOCUMENTS_ID = "final_context_documents"
@@ -17,6 +18,7 @@ def documents_to_indexing_documents(
processed_sections = []
for section in document.sections:
processed_section = Section(
kind=SectionKind.TEXT,
text=section.text or "",
link=section.link,
image_file_id=None,

View File

@@ -4,6 +4,7 @@ from typing import cast
import openpyxl
from openpyxl.worksheet.worksheet import Worksheet
from onyx.file_processing.extract_file_text import xlsx_sheet_extraction
from onyx.file_processing.extract_file_text import xlsx_to_text
@@ -196,3 +197,136 @@ class TestXlsxToText:
assert "r1c1" in lines[0] and "r1c2" in lines[0]
assert "r2c1" in lines[1] and "r2c2" in lines[1]
assert "r3c1" in lines[2] and "r3c2" in lines[2]
class TestXlsxSheetExtraction:
    """Tests for xlsx_sheet_extraction and the (csv_text, title) tuples it returns."""

    def test_one_tuple_per_sheet(self) -> None:
        workbook = _make_xlsx(
            {
                "Revenue": [["Month", "Amount"], ["Jan", "100"]],
                "Expenses": [["Category", "Cost"], ["Rent", "500"]],
            }
        )
        extracted = xlsx_sheet_extraction(workbook)
        assert len(extracted) == 2
        # Order preserved from workbook sheet order
        assert [title for _csv, title in extracted] == ["Revenue", "Expenses"]
        # Content present in the right tuple
        revenue_csv, _ = extracted[0]
        expenses_csv, _ = extracted[1]
        for expected in ("Month", "Jan"):
            assert expected in revenue_csv
        for expected in ("Category", "Rent"):
            assert expected in expenses_csv

    def test_tuple_structure_is_csv_text_then_title(self) -> None:
        """Pin the (csv_text, sheet_title) tuple order so callers that
        unpack positionally don't silently break."""
        extracted = xlsx_sheet_extraction(_make_xlsx({"MySheet": [["a", "b"]]}))
        assert len(extracted) == 1
        csv_text, title = extracted[0]
        assert title == "MySheet"
        assert "a" in csv_text
        assert "b" in csv_text

    def test_empty_sheet_is_skipped(self) -> None:
        """A sheet whose CSV output is empty/whitespace-only must NOT show
        up in the result — the `if csv_text.strip():` guard drops it."""
        workbook = _make_xlsx(
            {
                "Data": [["a", "b"]],
                "Empty": [],
            }
        )
        extracted = xlsx_sheet_extraction(workbook)
        assert len(extracted) == 1
        assert extracted[0][1] == "Data"

    def test_empty_workbook_returns_empty_list(self) -> None:
        """All sheets empty → empty list (not a list of empty tuples)."""
        workbook = _make_xlsx({"Sheet1": [], "Sheet2": []})
        assert xlsx_sheet_extraction(workbook) == []

    def test_single_sheet(self) -> None:
        workbook = _make_xlsx({"Only": [["x", "y"], ["1", "2"]]})
        extracted = xlsx_sheet_extraction(workbook)
        assert len(extracted) == 1
        csv_text, title = extracted[0]
        assert title == "Only"
        assert "x" in csv_text
        assert "1" in csv_text

    def test_bad_zip_returns_empty_list(self) -> None:
        not_a_zip = io.BytesIO(b"not a zip file")
        assert xlsx_sheet_extraction(not_a_zip, file_name="test.xlsx") == []

    def test_bad_zip_tilde_file_returns_empty_list(self) -> None:
        """`~$`-prefixed files are Excel lock files; failure should log
        at debug (not warning) and still return []."""
        not_a_zip = io.BytesIO(b"not a zip file")
        assert xlsx_sheet_extraction(not_a_zip, file_name="~$temp.xlsx") == []

    def test_csv_content_matches_xlsx_to_text_per_sheet(self) -> None:
        """For a single-sheet workbook the xlsx_to_text output should equal
        the csv_text from xlsx_sheet_extraction — both share the same
        per-sheet CSV-ification logic."""
        rows = [["Name", "Age"], ["Alice", "30"]]
        expected_text = xlsx_to_text(_make_xlsx({"People": rows}))
        extracted = xlsx_sheet_extraction(_make_xlsx({"People": rows}))
        assert len(extracted) == 1
        csv_text, title = extracted[0]
        assert title == "People"
        assert csv_text.strip() == expected_text.strip()

    def test_commas_in_cells_are_quoted(self) -> None:
        workbook = _make_xlsx({"S1": [["hello, world", "normal"]]})
        extracted = xlsx_sheet_extraction(workbook)
        assert len(extracted) == 1
        csv_text, _ = extracted[0]
        assert '"hello, world"' in csv_text

    def test_long_empty_row_run_capped_within_sheet(self) -> None:
        """Matrix cleanup is applied per sheet: >2 empty rows collapse to
        2, so the sheet stays non-empty and still appears in the result."""
        rows = [["header"], [""], [""], [""], [""], ["data"]]
        extracted = xlsx_sheet_extraction(_make_xlsx({"S1": rows}))
        assert len(extracted) == 1
        csv_text, _ = extracted[0]
        lines = [line for line in csv_text.strip().split("\n") if line.strip()]
        # header + 2 empty (capped) + data = 4 lines
        assert len(lines) == 4
        assert "header" in lines[0]
        assert "data" in lines[-1]

    def test_sheet_title_with_special_chars_preserved(self) -> None:
        """Spaces, punctuation and unicode in sheet titles are preserved
        verbatim — the title is used as a link anchor downstream."""
        workbook = _make_xlsx(
            {
                "Q1 Revenue (USD)": [["a", "b"]],
                "Données": [["c", "d"]],
            }
        )
        titles = [title for _csv, title in xlsx_sheet_extraction(workbook)]
        assert "Q1 Revenue (USD)" in titles
        assert "Données" in titles

View File

@@ -0,0 +1,804 @@
"""Unit tests for DocumentChunker.chunk (replacement for
Chunker._chunk_document_with_sections).
These tests use a fake character-level tokenizer so every char counts as
exactly one token. This makes token-limit arithmetic deterministic and lets
us exercise every branch of the method without pulling real embedding
models into the test.
"""
import pytest
from chonkie import SentenceChunker
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import SECTION_SEPARATOR
from onyx.connectors.models import SectionKind
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.indexing.chunking import DocumentChunker
from onyx.indexing.chunking import text_section_chunker as text_chunker_module
from onyx.natural_language_processing.utils import BaseTokenizer
class CharTokenizer(BaseTokenizer):
    """Fake tokenizer in which every character is exactly one token."""

    def encode(self, string: str) -> list[int]:
        """Map each character to its code point."""
        return list(map(ord, string))

    def tokenize(self, string: str) -> list[str]:
        """Split the string into single-character tokens."""
        return [char for char in string]

    def decode(self, tokens: list[int]) -> str:
        """Rebuild the string from code points."""
        return "".join(map(chr, tokens))
# With a char-level tokenizer, each char is a token. 200 is comfortably
# above BLURB_SIZE (128), the blurb splitter's chunk_size used below, so the
# chunk content limit is always larger than a single blurb.
CHUNK_LIMIT = 200
def _make_document_chunker(
    chunk_token_limit: int = CHUNK_LIMIT,
) -> DocumentChunker:
    """Build a DocumentChunker backed by CharTokenizer and char-counting splitters.

    The blurb splitter uses a chunk size of 128 and the chunk splitter uses
    ``chunk_token_limit``; both count one token per character.
    """

    def token_counter(text: str) -> int:
        return len(text)

    def build_splitter(chunk_size: int) -> SentenceChunker:
        return SentenceChunker(
            tokenizer_or_token_counter=token_counter,
            chunk_size=chunk_size,
            chunk_overlap=0,
            return_type="texts",
        )

    return DocumentChunker(
        tokenizer=CharTokenizer(),
        blurb_splitter=build_splitter(128),
        chunk_splitter=build_splitter(chunk_token_limit),
    )
def _make_doc(
    sections: list[Section],
    title: str | None = "Test Doc",
    doc_id: str = "doc1",
) -> IndexingDocument:
    """Build a minimal WEB IndexingDocument whose processed_sections are ``sections``."""
    fields = {
        "id": doc_id,
        "source": DocumentSource.WEB,
        "semantic_identifier": doc_id,
        "title": title,
        "metadata": {},
        # real sections unused — method reads processed_sections
        "sections": [],
        "processed_sections": sections,
    }
    return IndexingDocument(**fields)
# --- Empty / degenerate input -------------------------------------------------
def test_empty_processed_sections_returns_single_empty_safety_chunk() -> None:
    """With no sections at all, one empty chunk must still be produced
    (the `or not chunks` safety branch at the end)."""
    chunker = _make_document_chunker()
    document = _make_doc(sections=[])

    produced = chunker.chunk(
        document=document,
        sections=[],
        title_prefix="TITLE\n",
        metadata_suffix_semantic="meta_sem",
        metadata_suffix_keyword="meta_kw",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 1
    only = produced[0]
    assert only.content == ""
    assert only.chunk_id == 0
    assert only.title_prefix == "TITLE\n"
    assert only.metadata_suffix_semantic == "meta_sem"
    assert only.metadata_suffix_keyword == "meta_kw"
    # safe default link offsets
    assert only.source_links == {0: ""}
def test_empty_section_on_first_position_without_title_is_skipped() -> None:
    """Untitled doc whose first section has empty text — the guard
    `(not document.title or section_idx > 0)` means it IS skipped."""
    chunker = _make_document_chunker()
    empty_section = Section(kind=SectionKind.TEXT, text="", link="l0")
    document = _make_doc(sections=[empty_section], title=None)

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    # skipped → no real content, but safety branch still yields 1 empty chunk
    assert len(produced) == 1
    assert produced[0].content == ""
def test_empty_section_on_later_position_is_skipped_even_with_title() -> None:
    """Empty sections at index > 0 are skipped regardless of title."""
    chunker = _make_document_chunker()
    section_list = [
        Section(kind=SectionKind.TEXT, text="Alpha.", link="l0"),
        Section(kind=SectionKind.TEXT, text="", link="l1"),  # should be skipped
        Section(kind=SectionKind.TEXT, text="Beta.", link="l2"),
    ]
    document = _make_doc(sections=section_list)

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 1
    merged = produced[0]
    assert "Alpha." in merged.content
    assert "Beta." in merged.content
    # link offsets should only contain l0 and l2 (no l1)
    assert "l1" not in (merged.source_links or {}).values()
# --- Single text section ------------------------------------------------------
def test_single_small_text_section_becomes_one_chunk() -> None:
    """One short text section yields one chunk carrying its text, link,
    title prefix and metadata suffixes."""
    chunker = _make_document_chunker()
    only_section = Section(
        kind=SectionKind.TEXT, text="Hello world.", link="https://a"
    )
    document = _make_doc(sections=[only_section])

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="TITLE\n",
        metadata_suffix_semantic="ms",
        metadata_suffix_keyword="mk",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 1
    first = produced[0]
    assert first.content == "Hello world."
    assert first.source_links == {0: "https://a"}
    assert first.title_prefix == "TITLE\n"
    assert first.metadata_suffix_semantic == "ms"
    assert first.metadata_suffix_keyword == "mk"
    assert first.section_continuation is False
    assert first.image_file_id is None
# --- Multiple text sections combined -----------------------------------------
def test_multiple_small_sections_combine_into_one_chunk() -> None:
    """Several small sections are joined with SECTION_SEPARATOR into one chunk."""
    chunker = _make_document_chunker()
    parts = ["Part one.", "Part two.", "Part three."]
    links = ["l1", "l2", "l3"]
    document = _make_doc(
        sections=[
            Section(kind=SectionKind.TEXT, text=part, link=link)
            for part, link in zip(parts, links)
        ]
    )

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 1
    assert produced[0].content == SECTION_SEPARATOR.join(parts)

    # link_offsets: indexed by shared_precompare_cleanup length of the
    # chunk_text *before* each section was appended.
    #   ""                        -> "",               len 0
    #   "Part one."               -> "partone",        len 7
    #   "Part one.\n\nPart two."  -> "partoneparttwo", len 14
    assert produced[0].source_links == {0: "l1", 7: "l2", 14: "l3"}
def test_sections_overflow_into_second_chunk() -> None:
    """Two sections that together exceed content_token_limit: the first is
    finalized as its own chunk and the second starts a new one."""
    chunker = _make_document_chunker()
    # char-level: 120 char section → 120 tokens. 2 of these plus separator
    # exceed a 200-token limit, forcing a flush.
    text_a = "A" * 120
    text_b = "B" * 120
    document = _make_doc(
        sections=[
            Section(kind=SectionKind.TEXT, text=text_a, link="la"),
            Section(kind=SectionKind.TEXT, text=text_b, link="lb"),
        ],
    )

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 2
    first, second = produced[0], produced[1]
    assert first.content == text_a
    assert second.content == text_b
    # first chunk is not a continuation; second starts a new section → not either
    assert first.section_continuation is False
    assert second.section_continuation is False
    # chunk_ids should be sequential starting at 0
    assert first.chunk_id == 0
    assert second.chunk_id == 1
    # links routed appropriately
    assert first.source_links == {0: "la"}
    assert second.source_links == {0: "lb"}
# --- Image section handling --------------------------------------------------
def test_image_only_section_produces_single_chunk_with_image_id() -> None:
    """A lone image section yields one chunk with its image id, text and link."""
    chunker = _make_document_chunker()
    image_section = Section(
        kind=SectionKind.IMAGE,
        text="summary of image",
        link="https://img",
        image_file_id="img-abc",
    )
    document = _make_doc(sections=[image_section])

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 1
    image_chunk = produced[0]
    assert image_chunk.image_file_id == "img-abc"
    assert image_chunk.content == "summary of image"
    assert image_chunk.source_links == {0: "https://img"}
def test_image_section_flushes_pending_text_and_creates_its_own_chunk() -> None:
    """Buffered text followed by an image section: the pending text is
    flushed first, then the image chunk is emitted."""
    chunker = _make_document_chunker()
    section_list = [
        Section(kind=SectionKind.TEXT, text="Pending text.", link="ltext"),
        Section(
            kind=SectionKind.IMAGE,
            text="image summary",
            link="limage",
            image_file_id="img-1",
        ),
        Section(kind=SectionKind.TEXT, text="Trailing text.", link="ltail"),
    ]
    document = _make_doc(sections=section_list)

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 3
    pending_chunk, image_chunk, trailing_chunk = produced

    # 0: flushed pending text
    assert pending_chunk.content == "Pending text."
    assert pending_chunk.image_file_id is None
    assert pending_chunk.source_links == {0: "ltext"}

    # 1: image chunk
    assert image_chunk.content == "image summary"
    assert image_chunk.image_file_id == "img-1"
    assert image_chunk.source_links == {0: "limage"}

    # 2: trailing text, started fresh after image
    assert trailing_chunk.content == "Trailing text."
    assert trailing_chunk.image_file_id is None
    assert trailing_chunk.source_links == {0: "ltail"}
def test_image_section_without_link_gets_empty_links_dict() -> None:
    """If an image section has no link, links param is {} (not {0: ""})."""
    chunker = _make_document_chunker()
    linkless_image = Section(
        kind=SectionKind.IMAGE,
        text="img",
        link=None,
        image_file_id="img-xyz",
    )
    document = _make_doc(sections=[linkless_image])

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 1
    assert produced[0].image_file_id == "img-xyz"
    # to_doc_aware_chunk falls back to {0: ""} when given an empty dict
    assert produced[0].source_links == {0: ""}
# --- Oversized section splitting ---------------------------------------------
def test_oversized_section_is_split_across_multiple_chunks() -> None:
    """A section whose text exceeds content_token_limit goes through
    chunk_splitter and yields >1 chunks; only the first one is not a
    continuation."""
    chunker = _make_document_chunker()

    # Build a section whose char-count is well over CHUNK_LIMIT (200), made
    # of many short sentences so chonkie's SentenceChunker can split cleanly.
    section_text = (
        "Alpha beta gamma. Delta epsilon zeta. Eta theta iota. "
        "Kappa lambda mu. Nu xi omicron. Pi rho sigma. Tau upsilon phi. "
        "Chi psi omega. One two three. Four five six. Seven eight nine. "
        "Ten eleven twelve. Thirteen fourteen fifteen. "
        "Sixteen seventeen eighteen. Nineteen twenty."
    )
    assert len(section_text) > CHUNK_LIMIT

    big_section = Section(
        kind=SectionKind.TEXT, text=section_text, link="big-link"
    )
    document = _make_doc(sections=[big_section])

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) >= 2

    # First chunk is fresh, rest are continuations
    assert produced[0].section_continuation is False
    assert all(chunk.section_continuation is True for chunk in produced[1:])

    # Every produced chunk should carry the section's link
    assert all(chunk.source_links == {0: "big-link"} for chunk in produced)

    # Concatenated content should roughly cover the original (allowing
    # for chunker boundary whitespace differences).
    joined = "".join(chunk.content for chunk in produced)
    assert all(word in joined for word in ("Alpha", "omega", "twenty"))
def test_oversized_section_flushes_pending_text_first() -> None:
    """Buffered text followed by an oversized section: the pending chunk
    is flushed first, then the split chunks are emitted."""
    chunker = _make_document_chunker()
    pending = "Pending buffered text."
    big = (
        "Alpha beta gamma. Delta epsilon zeta. Eta theta iota. "
        "Kappa lambda mu. Nu xi omicron. Pi rho sigma. Tau upsilon phi. "
        "Chi psi omega. One two three. Four five six. Seven eight nine. "
        "Ten eleven twelve. Thirteen fourteen fifteen. Sixteen seventeen."
    )
    assert len(big) > CHUNK_LIMIT

    document = _make_doc(
        sections=[
            Section(kind=SectionKind.TEXT, text=pending, link="l-pending"),
            Section(kind=SectionKind.TEXT, text=big, link="l-big"),
        ],
    )

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    # First chunk is the flushed pending text
    flushed = produced[0]
    assert flushed.content == pending
    assert flushed.source_links == {0: "l-pending"}
    assert flushed.section_continuation is False

    # Remaining chunks correspond to the oversized section
    assert len(produced) >= 2
    oversized_chunks = produced[1:]
    assert all(chunk.source_links == {0: "l-big"} for chunk in oversized_chunks)

    # Within the oversized section, the first is fresh and the rest are
    # continuations.
    assert produced[1].section_continuation is False
    assert all(chunk.section_continuation is True for chunk in produced[2:])
# --- Title prefix / metadata propagation -------------------------------------
def test_title_prefix_and_metadata_propagate_to_all_chunks() -> None:
    """Every produced chunk carries the same title prefix and metadata suffixes."""
    chunker = _make_document_chunker()
    document = _make_doc(
        sections=[
            Section(kind=SectionKind.TEXT, text="A" * 120, link="la"),
            Section(kind=SectionKind.TEXT, text="B" * 120, link="lb"),
        ],
    )

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="MY_TITLE\n",
        metadata_suffix_semantic="MS",
        metadata_suffix_keyword="MK",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 2
    assert all(chunk.title_prefix == "MY_TITLE\n" for chunk in produced)
    assert all(chunk.metadata_suffix_semantic == "MS" for chunk in produced)
    assert all(chunk.metadata_suffix_keyword == "MK" for chunk in produced)
# --- chunk_id monotonicity ---------------------------------------------------
def test_chunk_ids_are_sequential_starting_at_zero() -> None:
    """chunk_id values run 0, 1, 2, ... in the order chunks are produced."""
    chunker = _make_document_chunker()
    section_list = [
        Section(kind=SectionKind.TEXT, text=letter * 120, link=link)
        for letter, link in (("A", "la"), ("B", "lb"), ("C", "lc"))
    ]
    document = _make_doc(sections=section_list)

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    observed_ids = [chunk.chunk_id for chunk in produced]
    assert observed_ids == list(range(len(produced)))
# --- Overflow accumulation behavior ------------------------------------------
def test_overflow_flush_then_subsequent_section_joins_new_chunk() -> None:
    """Once an overflow flush has started a new chunk, the next section
    that fits should be combined into that new chunk (not spawn a third)."""
    chunker = _make_document_chunker()
    # 120 + 120 > 200 → first two sections produce two chunks.
    # Third section is small (20 chars) → should fit with second.
    text_a, text_b, text_c = "A" * 120, "B" * 120, "C" * 20
    document = _make_doc(
        sections=[
            Section(kind=SectionKind.TEXT, text=text_a, link="la"),
            Section(kind=SectionKind.TEXT, text=text_b, link="lb"),
            Section(kind=SectionKind.TEXT, text=text_c, link="lc"),
        ],
    )

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 2
    assert produced[0].content == text_a
    assert produced[1].content == text_b + SECTION_SEPARATOR + text_c
    # link_offsets on second chunk: lb at 0, lc at precompare-len("BBBB...")=120
    assert produced[1].source_links == {0: "lb", 120: "lc"}
def test_small_section_after_oversized_starts_a_fresh_chunk() -> None:
    """After an oversized section has been emitted as its own chunks, the
    internal accumulator should be empty, so a following small section
    starts a new chunk instead of being swallowed."""
    chunker = _make_document_chunker()
    big = (
        "Alpha beta gamma. Delta epsilon zeta. Eta theta iota. "
        "Kappa lambda mu. Nu xi omicron. Pi rho sigma. Tau upsilon phi. "
        "Chi psi omega. One two three. Four five six. Seven eight nine. "
        "Ten eleven twelve. Thirteen fourteen fifteen. Sixteen seventeen."
    )
    assert len(big) > CHUNK_LIMIT

    document = _make_doc(
        sections=[
            Section(kind=SectionKind.TEXT, text=big, link="l-big"),
            Section(kind=SectionKind.TEXT, text="Tail text.", link="l-tail"),
        ],
    )

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    # All-but-last chunks belong to the oversized section; the very last is
    # the tail text starting fresh (not a continuation).
    assert len(produced) >= 2
    tail = produced[-1]
    assert tail.content == "Tail text."
    assert tail.source_links == {0: "l-tail"}
    assert tail.section_continuation is False

    # And earlier oversized chunks never leaked the tail link
    assert all(chunk.source_links == {0: "l-big"} for chunk in produced[:-1])
# --- STRICT_CHUNK_TOKEN_LIMIT fallback path ----------------------------------
def test_strict_chunk_token_limit_subdivides_oversized_split(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With STRICT_CHUNK_TOKEN_LIMIT enabled, a piece from chonkie's
    chunk_splitter that is still larger than content_token_limit (e.g. a
    single no-period run) must fall back to _split_oversized_chunk."""
    monkeypatch.setattr(text_chunker_module, "STRICT_CHUNK_TOKEN_LIMIT", True)
    chunker = _make_document_chunker()

    # 500 non-whitespace chars with no sentence boundaries — chonkie will
    # return it as one oversized piece (>200) which triggers the fallback.
    run = "a" * 500
    run_section = Section(kind=SectionKind.TEXT, text=run, link="l-run")
    document = _make_doc(sections=[run_section])

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    # With CHUNK_LIMIT=200 and a 500-char run we expect ceil(500/200)=3 sub-chunks.
    assert len(produced) == 3

    # First is fresh, rest are continuations (is_continuation=(j != 0))
    first, second, third = produced
    assert first.section_continuation is False
    assert second.section_continuation is True
    assert third.section_continuation is True

    # All carry the section link
    assert all(chunk.source_links == {0: "l-run"} for chunk in produced)

    # NOTE: we do NOT assert the chunks are at or below content_token_limit.
    # _split_oversized_chunk joins tokens with " ", which means the resulting
    # chunk contents can exceed the limit when tokens are short. That's a
    # quirk of the current implementation and this test pins the window
    # slicing, not the post-join length.
def test_strict_chunk_token_limit_disabled_allows_oversized_split(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Same pathological input with STRICT disabled: the oversized split
    is emitted verbatim as a single chunk (current behavior)."""
    monkeypatch.setattr(text_chunker_module, "STRICT_CHUNK_TOKEN_LIMIT", False)
    chunker = _make_document_chunker()

    run = "a" * 500
    run_section = Section(kind=SectionKind.TEXT, text=run, link="l-run")
    document = _make_doc(sections=[run_section])

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 1
    assert produced[0].content == run
    assert produced[0].section_continuation is False
# --- First-section-with-empty-text-but-document-has-title edge case ----------
def test_first_empty_section_with_title_is_processed_not_skipped() -> None:
    """Because of the guard `(not document.title or section_idx > 0)`, an
    empty text section is NOT skipped when the doc has a title AND the
    section is the first one. This pins current behavior so a refactor
    can't silently change it."""
    chunker = _make_document_chunker()
    section_list = [
        # empty first section, kept
        Section(kind=SectionKind.TEXT, text="", link="l0"),
        Section(kind=SectionKind.TEXT, text="Real content.", link="l1"),
    ]
    document = _make_doc(sections=section_list, title="Has A Title")

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 1
    assert produced[0].content == "Real content."
    # First (empty) section did register a link_offset at 0 before being
    # overwritten; that offset is then reused when "Real content." is added,
    # because shared_precompare_cleanup("") is still "". End state: {0: "l1"}
    assert produced[0].source_links == {0: "l1"}
# --- clean_text is applied to section text -----------------------------------
def test_clean_text_strips_control_chars_from_section_content() -> None:
    """clean_text() should strip control chars before the text reaches the
    accumulator — verifies the call isn't dropped by a refactor."""
    chunker = _make_document_chunker()

    # NUL + BEL are control chars below 0x20 and not \n or \t → should be
    # stripped by clean_text.
    dirty = "Hello\x00 World\x07!"
    dirty_section = Section(kind=SectionKind.TEXT, text=dirty, link="l1")
    document = _make_doc(sections=[dirty_section])

    produced = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(produced) == 1
    assert produced[0].content == "Hello World!"
# --- None-valued fields ------------------------------------------------------
def test_section_with_none_text_behaves_like_empty_string() -> None:
    """A section whose text is None must be treated like an empty section.

    The chunker coerces the value with `str(section.text or "")`, so a
    None-text section follows the same rule as an empty one: it is skipped
    unless it is the first section of a titled document.
    """
    chunker = _make_document_chunker()
    section_list = [
        Section(kind=SectionKind.TEXT, text="Alpha.", link="la"),
        # Index 1 with no text: expected to be skipped entirely.
        Section(kind=SectionKind.TEXT, text=None, link="lnone"),
        Section(kind=SectionKind.TEXT, text="Beta.", link="lb"),
    ]
    document = _make_doc(sections=section_list)

    chunks = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    assert len(chunks) == 1
    merged_chunk = chunks[0]
    assert "Alpha." in merged_chunk.content
    assert "Beta." in merged_chunk.content
    # The skipped section must not leave its link behind; source_links may
    # be None, hence the fallback to an empty mapping.
    recorded_links = (merged_chunk.source_links or {}).values()
    assert "lnone" not in recorded_links
# --- Trailing empty chunk suppression ----------------------------------------
def test_no_trailing_empty_chunk_when_last_section_was_image() -> None:
    """An image as the final section must not produce a trailing empty chunk.

    An image section emits its own chunk and resets chunk_text. Because
    `chunks` is already non-empty at that point, the safety `or not chunks`
    branch should NOT fire. This test pins that explicitly.
    """
    chunker = _make_document_chunker()
    leading_text = Section(kind=SectionKind.TEXT, text="Leading text.", link="ltext")
    final_image = Section(
        kind=SectionKind.IMAGE,
        text="img summary",
        link="limg",
        image_file_id="img-final",
    )
    document = _make_doc(sections=[leading_text, final_image])

    chunks = chunker.chunk(
        document=document,
        sections=document.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )

    # Exactly two chunks: crucially, no third empty chunk was appended
    # after the image chunk.
    assert len(chunks) == 2

    text_chunk = chunks[0]
    assert text_chunk.content == "Leading text."
    assert text_chunk.image_file_id is None

    image_chunk = chunks[1]
    assert image_chunk.content == "img summary"
    assert image_chunk.image_file_id == "img-final"
def test_no_trailing_empty_chunk_when_last_section_was_oversized() -> None:
    """Oversized final sections must not leave a dangling empty chunk.

    Same guarantee as the image case: the splits of an oversized section
    fully clear the accumulator, so the trailing safety branch should be a
    no-op and every emitted chunk must carry real content.
    """
    dc = _make_document_chunker()
    big = (
        "Alpha beta gamma. Delta epsilon zeta. Eta theta iota. "
        "Kappa lambda mu. Nu xi omicron. Pi rho sigma. Tau upsilon phi. "
        "Chi psi omega. One two three. Four five six. Seven eight nine. "
        "Ten eleven twelve. Thirteen fourteen fifteen. Sixteen seventeen."
    )
    # Precondition: the fixture text really is larger than the limit passed
    # to the chunker, otherwise this test would not exercise the split path.
    assert len(big) > CHUNK_LIMIT
    doc = _make_doc(
        sections=[Section(kind=SectionKind.TEXT, text=big, link="l-big")]
    )
    chunks = dc.chunk(
        document=doc,
        sections=doc.processed_sections,
        title_prefix="",
        metadata_suffix_semantic="",
        metadata_suffix_keyword="",
        content_token_limit=CHUNK_LIMIT,
    )
    # Guard against a vacuous pass: all() over an empty list is True, so
    # without this check the test would succeed if no chunks were emitted.
    assert chunks
    # Every chunk should be non-empty — no dangling "" chunk at the tail.
    assert all(c.content.strip() for c in chunks)