Compare commits

..

2 Commits

Author SHA1 Message Date
Dane Urban
5ccc016d3c . 2026-04-11 21:28:49 -07:00
Dane Urban
1289e69607 Refactor stuff 2026-04-11 21:03:17 -07:00
6 changed files with 454 additions and 296 deletions

View File

@@ -1,5 +1,3 @@
from typing import cast
from chonkie import SentenceChunker
from onyx.configs.app_configs import AVERAGE_SUMMARY_EMBEDDINGS
@@ -15,17 +13,15 @@ from onyx.configs.constants import SECTION_SEPARATOR
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
get_metadata_keys_to_ignore,
)
from onyx.indexing.document_chunker.document_chunker import DocumentChunker
from onyx.indexing.document_chunker.section_chunker import extract_blurb
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.indexing.models import DocAwareChunk
from onyx.llm.utils import MAX_CONTEXT_TOKENS
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.utils.logger import setup_logger
from onyx.utils.text_processing import clean_text
from onyx.utils.text_processing import shared_precompare_cleanup
from shared_configs.configs import DOC_EMBEDDING_CONTEXT_SIZE
from shared_configs.configs import STRICT_CHUNK_TOKEN_LIMIT
# Not supporting overlaps, we need a clean combination of chunks and it is unclear if overlaps
# actually help quality at all
@@ -154,9 +150,6 @@ class Chunker:
self.tokenizer = tokenizer
self.callback = callback
self.max_context = 0
self.prompt_tokens = 0
# Create a token counter function that returns the count instead of the tokens
def token_counter(text: str) -> int:
return len(tokenizer.encode(text))
@@ -186,234 +179,12 @@ class Chunker:
else None
)
def _split_oversized_chunk(self, text: str, content_token_limit: int) -> list[str]:
"""
Splits the text into smaller chunks based on token count to ensure
no chunk exceeds the content_token_limit.
"""
tokens = self.tokenizer.tokenize(text)
chunks = []
start = 0
total_tokens = len(tokens)
while start < total_tokens:
end = min(start + content_token_limit, total_tokens)
token_chunk = tokens[start:end]
chunk_text = " ".join(token_chunk)
chunks.append(chunk_text)
start = end
return chunks
def _extract_blurb(self, text: str) -> str:
"""
Extract a short blurb from the text (first chunk of size `blurb_size`).
"""
# chunker is in `text` mode
texts = cast(list[str], self.blurb_splitter.chunk(text))
if not texts:
return ""
return texts[0]
def _get_mini_chunk_texts(self, chunk_text: str) -> list[str] | None:
"""
For "multipass" mode: additional sub-chunks (mini-chunks) for use in certain embeddings.
"""
if self.mini_chunk_splitter and chunk_text.strip():
# chunker is in `text` mode
return cast(list[str], self.mini_chunk_splitter.chunk(chunk_text))
return None
# ADDED: extra param image_url to store in the chunk
def _create_chunk(
self,
document: IndexingDocument,
chunks_list: list[DocAwareChunk],
text: str,
links: dict[int, str],
is_continuation: bool = False,
title_prefix: str = "",
metadata_suffix_semantic: str = "",
metadata_suffix_keyword: str = "",
image_file_id: str | None = None,
) -> None:
"""
Helper to create a new DocAwareChunk, append it to chunks_list.
"""
new_chunk = DocAwareChunk(
source_document=document,
chunk_id=len(chunks_list),
blurb=self._extract_blurb(text),
content=text,
source_links=links or {0: ""},
image_file_id=image_file_id,
section_continuation=is_continuation,
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,
mini_chunk_texts=self._get_mini_chunk_texts(text),
large_chunk_id=None,
doc_summary="",
chunk_context="",
contextual_rag_reserved_tokens=0, # set per-document in _handle_single_document
self._document_chunker = DocumentChunker(
tokenizer=tokenizer,
blurb_splitter=self.blurb_splitter,
chunk_splitter=self.chunk_splitter,
mini_chunk_splitter=self.mini_chunk_splitter,
)
chunks_list.append(new_chunk)
def _chunk_document_with_sections(
self,
document: IndexingDocument,
sections: list[Section],
title_prefix: str,
metadata_suffix_semantic: str,
metadata_suffix_keyword: str,
content_token_limit: int,
) -> list[DocAwareChunk]:
"""
Loops through sections of the document, converting them into one or more chunks.
Works with processed sections that are base Section objects.
"""
chunks: list[DocAwareChunk] = []
link_offsets: dict[int, str] = {}
chunk_text = ""
for section_idx, section in enumerate(sections):
# Get section text and other attributes
section_text = clean_text(str(section.text or ""))
section_link_text = section.link or ""
image_url = section.image_file_id
# If there is no useful content, skip
if not section_text and (not document.title or section_idx > 0):
logger.warning(
f"Skipping empty or irrelevant section in doc {document.semantic_identifier}, link={section_link_text}"
)
continue
# CASE 1: If this section has an image, force a separate chunk
if image_url:
# First, if we have any partially built text chunk, finalize it
if chunk_text.strip():
self._create_chunk(
document,
chunks,
chunk_text,
link_offsets,
is_continuation=False,
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,
)
chunk_text = ""
link_offsets = {}
# Create a chunk specifically for this image section
# (Using the text summary that was generated during processing)
self._create_chunk(
document,
chunks,
section_text,
links={0: section_link_text} if section_link_text else {},
image_file_id=image_url,
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,
)
# Continue to next section
continue
# CASE 2: Normal text section
section_token_count = len(self.tokenizer.encode(section_text))
# If the section is large on its own, split it separately
if section_token_count > content_token_limit:
if chunk_text.strip():
self._create_chunk(
document,
chunks,
chunk_text,
link_offsets,
False,
title_prefix,
metadata_suffix_semantic,
metadata_suffix_keyword,
)
chunk_text = ""
link_offsets = {}
# chunker is in `text` mode
split_texts = cast(list[str], self.chunk_splitter.chunk(section_text))
for i, split_text in enumerate(split_texts):
# If even the split_text is bigger than strict limit, further split
if (
STRICT_CHUNK_TOKEN_LIMIT
and len(self.tokenizer.encode(split_text)) > content_token_limit
):
smaller_chunks = self._split_oversized_chunk(
split_text, content_token_limit
)
for j, small_chunk in enumerate(smaller_chunks):
self._create_chunk(
document,
chunks,
small_chunk,
{0: section_link_text},
is_continuation=(j != 0),
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,
)
else:
self._create_chunk(
document,
chunks,
split_text,
{0: section_link_text},
is_continuation=(i != 0),
title_prefix=title_prefix,
metadata_suffix_semantic=metadata_suffix_semantic,
metadata_suffix_keyword=metadata_suffix_keyword,
)
continue
# If we can still fit this section into the current chunk, do so
current_token_count = len(self.tokenizer.encode(chunk_text))
current_offset = len(shared_precompare_cleanup(chunk_text))
next_section_tokens = (
len(self.tokenizer.encode(SECTION_SEPARATOR)) + section_token_count
)
if next_section_tokens + current_token_count <= content_token_limit:
if chunk_text:
chunk_text += SECTION_SEPARATOR
chunk_text += section_text
link_offsets[current_offset] = section_link_text
else:
# finalize the existing chunk
self._create_chunk(
document,
chunks,
chunk_text,
link_offsets,
False,
title_prefix,
metadata_suffix_semantic,
metadata_suffix_keyword,
)
# start a new chunk
link_offsets = {0: section_link_text}
chunk_text = section_text
# finalize any leftover text chunk
if chunk_text.strip() or not chunks:
self._create_chunk(
document,
chunks,
chunk_text,
link_offsets or {0: ""}, # safe default
False,
title_prefix,
metadata_suffix_semantic,
metadata_suffix_keyword,
)
return chunks
def _handle_single_document(
self, document: IndexingDocument
@@ -423,7 +194,10 @@ class Chunker:
logger.debug(f"Chunking {document.semantic_identifier}")
# Title prep
title = self._extract_blurb(document.get_title_for_document_index() or "")
title = extract_blurb(
document.get_title_for_document_index() or "",
self.blurb_splitter,
)
title_prefix = title + RETURN_SEPARATOR if title else ""
title_tokens = len(self.tokenizer.encode(title_prefix))
@@ -491,7 +265,7 @@ class Chunker:
# Use processed_sections if available (IndexingDocument), otherwise use original sections
sections_to_chunk = document.processed_sections
normal_chunks = self._chunk_document_with_sections(
normal_chunks = self._document_chunker.chunk(
document,
sections_to_chunk,
title_prefix,

View File

@@ -0,0 +1,107 @@
from chonkie import SentenceChunker
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.indexing.document_chunker.image_section_chunker import ImageChunker
from onyx.indexing.document_chunker.section_chunker import AccumulatorState
from onyx.indexing.document_chunker.section_chunker import ChunkPayload
from onyx.indexing.document_chunker.section_chunker import SectionChunker
from onyx.indexing.document_chunker.text_section_chunker import TextChunker
from onyx.indexing.models import DocAwareChunk
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.utils.logger import setup_logger
from onyx.utils.text_processing import clean_text
logger = setup_logger()
class DocumentChunker:
    """Converts a document's processed sections into DocAwareChunks.

    Drop-in replacement for `Chunker._chunk_document_with_sections`: this
    class owns the orchestration (skip-empty guard, safety chunk, sequential
    chunk-id assignment) and delegates per-section behavior to SectionChunkers.
    """

    def __init__(
        self,
        tokenizer: BaseTokenizer,
        blurb_splitter: SentenceChunker,
        chunk_splitter: SentenceChunker,
        mini_chunk_splitter: SentenceChunker | None = None,
    ) -> None:
        self.blurb_splitter = blurb_splitter
        self.mini_chunk_splitter = mini_chunk_splitter
        # Text sections accumulate across the document; image sections always
        # become standalone chunks.
        self._text_chunker = TextChunker(
            tokenizer=tokenizer,
            chunk_splitter=chunk_splitter,
        )
        self._image_chunker = ImageChunker()

    def chunk(
        self,
        document: IndexingDocument,
        sections: list[Section],
        title_prefix: str,
        metadata_suffix_semantic: str,
        metadata_suffix_keyword: str,
        content_token_limit: int,
    ) -> list[DocAwareChunk]:
        """Chunk *sections* of *document* into finalized DocAwareChunks."""
        payloads = self._collect_section_payloads(
            document=document,
            sections=sections,
            content_token_limit=content_token_limit,
        )
        # Safety: a document always yields at least one (possibly empty)
        # chunk, so downstream consumers never see a chunkless document.
        if not payloads:
            payloads.append(ChunkPayload(text="", links={0: ""}))

        doc_chunks: list[DocAwareChunk] = []
        for chunk_id, payload in enumerate(payloads):
            doc_chunks.append(
                payload.to_doc_aware_chunk(
                    document=document,
                    chunk_id=chunk_id,
                    blurb_splitter=self.blurb_splitter,
                    mini_chunk_splitter=self.mini_chunk_splitter,
                    title_prefix=title_prefix,
                    metadata_suffix_semantic=metadata_suffix_semantic,
                    metadata_suffix_keyword=metadata_suffix_keyword,
                )
            )
        return doc_chunks

    def _collect_section_payloads(
        self,
        document: IndexingDocument,
        sections: list[Section],
        content_token_limit: int,
    ) -> list[ChunkPayload]:
        """Run each section through its chunker, threading the accumulator."""
        state = AccumulatorState()
        payloads: list[ChunkPayload] = []
        for idx, section in enumerate(sections):
            cleaned = clean_text(str(section.text or ""))
            # An empty section is kept only when it is the very first section
            # of a titled document; otherwise it carries no signal.
            if not cleaned and (not document.title or idx > 0):
                logger.warning(
                    f"Skipping empty or irrelevant section in doc "
                    f"{document.semantic_identifier}, link={section.link}"
                )
                continue
            output = self._select_chunker(section).chunk_section(
                section=section,
                accumulator=state,
                content_token_limit=content_token_limit,
            )
            payloads.extend(output.payloads)
            state = output.accumulator
        # Whatever text is still buffered becomes the final payload.
        payloads.extend(state.flush_to_list())
        return payloads

    def _select_chunker(self, section: Section) -> SectionChunker:
        """Image sections get the image chunker; everything else is text."""
        return (
            self._image_chunker
            if section.image_file_id is not None
            else self._text_chunker
        )

View File

@@ -0,0 +1,34 @@
from onyx.connectors.models import Section
from onyx.indexing.document_chunker.section_chunker import AccumulatorState
from onyx.indexing.document_chunker.section_chunker import ChunkPayload
from onyx.indexing.document_chunker.section_chunker import SectionChunker
from onyx.indexing.document_chunker.section_chunker import SectionChunkerOutput
from onyx.utils.text_processing import clean_text
class ImageChunker(SectionChunker):
    """Emits an image section as its own standalone chunk.

    Any buffered text in the accumulator is flushed first so the image
    chunk never merges with surrounding prose; the returned accumulator is
    always empty.
    """

    def chunk_section(
        self,
        section: Section,
        accumulator: AccumulatorState,
        content_token_limit: int,  # noqa: ARG002
    ) -> SectionChunkerOutput:
        # Internal invariant: the orchestrator only routes sections with an
        # image_file_id here.
        assert section.image_file_id is not None

        text = clean_text(str(section.text or ""))
        link = section.link or ""
        # No link -> empty dict (not {0: ""}); ChunkPayload.to_doc_aware_chunk
        # applies the {0: ""} fallback when building the final chunk.
        links: dict[int, str] = {0: link} if link else {}

        emitted = accumulator.flush_to_list()
        emitted.append(
            ChunkPayload(
                text=text,
                links=links,
                image_file_id=section.image_file_id,
                is_continuation=False,
            )
        )
        return SectionChunkerOutput(
            payloads=emitted,
            accumulator=AccumulatorState(),
        )

View File

@@ -0,0 +1,102 @@
from abc import ABC
from abc import abstractmethod
from typing import cast
from chonkie import SentenceChunker
from pydantic import BaseModel
from pydantic import Field
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.indexing.models import DocAwareChunk
def extract_blurb(text: str, blurb_splitter: SentenceChunker) -> str:
    """Return the first splitter-produced piece of *text*, or "" if none."""
    # splitter is configured for `text` mode, so chunk() yields plain strings
    pieces = cast(list[str], blurb_splitter.chunk(text))
    return pieces[0] if pieces else ""
def get_mini_chunk_texts(
    chunk_text: str,
    mini_chunk_splitter: SentenceChunker | None,
) -> list[str] | None:
    """Split *chunk_text* into mini-chunks for multipass embedding.

    Returns None when no splitter is configured or the text is blank.
    """
    if not mini_chunk_splitter or not chunk_text.strip():
        return None
    # splitter runs in `text` mode, so chunk() yields plain strings
    return cast(list[str], mini_chunk_splitter.chunk(chunk_text))
class ChunkPayload(BaseModel):
    """Section-local chunk content without document-scoped fields.

    A lightweight intermediate: SectionChunkers emit these, and the
    orchestrator promotes them to DocAwareChunks via `to_doc_aware_chunk`
    once chunk ids, title prefixes, and metadata suffixes are known.
    """

    text: str
    links: dict[int, str]
    is_continuation: bool = False
    image_file_id: str | None = None

    def to_doc_aware_chunk(
        self,
        document: IndexingDocument,
        chunk_id: int,
        blurb_splitter: SentenceChunker,
        title_prefix: str = "",
        metadata_suffix_semantic: str = "",
        metadata_suffix_keyword: str = "",
        mini_chunk_splitter: SentenceChunker | None = None,
    ) -> DocAwareChunk:
        """Attach document-scoped fields and build the final DocAwareChunk."""
        blurb = extract_blurb(self.text, blurb_splitter)
        mini_chunks = get_mini_chunk_texts(self.text, mini_chunk_splitter)
        # Empty links fall back to a single empty-string link at offset 0.
        source_links = self.links or {0: ""}
        return DocAwareChunk(
            source_document=document,
            chunk_id=chunk_id,
            blurb=blurb,
            content=self.text,
            source_links=source_links,
            image_file_id=self.image_file_id,
            section_continuation=self.is_continuation,
            title_prefix=title_prefix,
            metadata_suffix_semantic=metadata_suffix_semantic,
            metadata_suffix_keyword=metadata_suffix_keyword,
            mini_chunk_texts=mini_chunks,
            large_chunk_id=None,
            doc_summary="",
            chunk_context="",
            # set per-document later in the pipeline
            contextual_rag_reserved_tokens=0,
        )
class AccumulatorState(BaseModel):
    """Cross-section text buffer threaded through SectionChunkers."""

    # Buffered chunk text accumulated so far.
    text: str = ""
    # Character offset (post-cleanup) -> source link for each buffered section.
    link_offsets: dict[int, str] = Field(default_factory=dict)

    def is_empty(self) -> bool:
        """True when the buffer holds no non-whitespace text."""
        return not self.text.strip()

    def flush_to_list(self) -> list["ChunkPayload"]:
        """Drain the buffer into at most one ChunkPayload (empty -> [])."""
        if not self.is_empty():
            return [ChunkPayload(text=self.text, links=self.link_offsets)]
        return []
class SectionChunkerOutput(BaseModel):
    """Result of chunking one section: the payloads finalized by that
    section plus the accumulator state to thread into the next one."""

    # Chunks emitted while processing this section (may be empty).
    payloads: list[ChunkPayload]
    # Buffer to carry forward into the next SectionChunker call.
    accumulator: AccumulatorState
class SectionChunker(ABC):
    """Strategy interface: converts one Section into zero or more
    ChunkPayloads while threading an AccumulatorState between calls."""

    @abstractmethod
    def chunk_section(
        self,
        section: Section,
        accumulator: AccumulatorState,
        content_token_limit: int,
    ) -> SectionChunkerOutput:
        """Chunk *section* given the buffered *accumulator* text and the
        per-chunk token budget *content_token_limit*."""
        ...

View File

@@ -0,0 +1,129 @@
from typing import cast
from chonkie import SentenceChunker
from onyx.configs.constants import SECTION_SEPARATOR
from onyx.connectors.models import Section
from onyx.indexing.document_chunker.section_chunker import AccumulatorState
from onyx.indexing.document_chunker.section_chunker import ChunkPayload
from onyx.indexing.document_chunker.section_chunker import SectionChunker
from onyx.indexing.document_chunker.section_chunker import SectionChunkerOutput
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.utils.text_processing import clean_text
from onyx.utils.text_processing import shared_precompare_cleanup
from shared_configs.configs import STRICT_CHUNK_TOKEN_LIMIT
class TextChunker(SectionChunker):
    """Accumulates text sections into token-budgeted chunks.

    Sections that fit the budget extend the shared accumulator (joined with
    SECTION_SEPARATOR); oversized sections flush the buffer first and are
    split into their own sequence of continuation chunks.
    """

    def __init__(
        self,
        tokenizer: BaseTokenizer,
        chunk_splitter: SentenceChunker,
    ) -> None:
        self.tokenizer = tokenizer
        self.chunk_splitter = chunk_splitter

    def chunk_section(
        self,
        section: Section,
        accumulator: AccumulatorState,
        content_token_limit: int,
    ) -> SectionChunkerOutput:
        """Route one text section through the fit / overflow / oversize paths."""
        text = clean_text(str(section.text or ""))
        link = section.link or ""
        text_tokens = len(self.tokenizer.encode(text))

        # Oversized section: flush the buffer, then split this section alone.
        if text_tokens > content_token_limit:
            return self._handle_oversized_section(
                section_text=text,
                section_link=link,
                accumulator=accumulator,
                content_token_limit=content_token_limit,
            )

        buffered_tokens = len(self.tokenizer.encode(accumulator.text))
        # Budget for this section always includes one separator's tokens,
        # even when the buffer is currently empty.
        needed = len(self.tokenizer.encode(SECTION_SEPARATOR)) + text_tokens

        if buffered_tokens + needed <= content_token_limit:
            # Fits: extend the buffer. The link offset is measured against
            # the cleaned buffer text BEFORE the separator is appended.
            offset = len(shared_precompare_cleanup(accumulator.text))
            extended = (
                accumulator.text + SECTION_SEPARATOR + text
                if accumulator.text
                else text
            )
            merged_offsets = dict(accumulator.link_offsets)
            merged_offsets[offset] = link
            return SectionChunkerOutput(
                payloads=[],
                accumulator=AccumulatorState(
                    text=extended,
                    link_offsets=merged_offsets,
                ),
            )

        # Overflow: emit the buffered chunk and restart with this section.
        return SectionChunkerOutput(
            payloads=accumulator.flush_to_list(),
            accumulator=AccumulatorState(
                text=text,
                link_offsets={0: link},
            ),
        )

    def _handle_oversized_section(
        self,
        section_text: str,
        section_link: str,
        accumulator: AccumulatorState,
        content_token_limit: int,
    ) -> SectionChunkerOutput:
        """Flush the buffer, then emit the section as split continuation chunks."""
        emitted = accumulator.flush_to_list()
        # splitter runs in `text` mode, so chunk() yields plain strings
        pieces = cast(list[str], self.chunk_splitter.chunk(section_text))
        for piece_idx, piece in enumerate(pieces):
            # STRICT mode: a piece the sentence splitter could not shrink
            # below the budget is force-split on raw token windows.
            if (
                STRICT_CHUNK_TOKEN_LIMIT
                and len(self.tokenizer.encode(piece)) > content_token_limit
            ):
                sub_pieces = self._split_oversized_chunk(
                    piece, content_token_limit
                )
                for sub_idx, sub_piece in enumerate(sub_pieces):
                    emitted.append(
                        ChunkPayload(
                            text=sub_piece,
                            links={0: section_link},
                            is_continuation=sub_idx != 0,
                        )
                    )
            else:
                emitted.append(
                    ChunkPayload(
                        text=piece,
                        links={0: section_link},
                        is_continuation=piece_idx != 0,
                    )
                )
        return SectionChunkerOutput(
            payloads=emitted,
            accumulator=AccumulatorState(),
        )

    def _split_oversized_chunk(
        self, text: str, content_token_limit: int
    ) -> list[str]:
        """Hard-split *text* into windows of at most content_token_limit tokens.

        NOTE(review): windows are re-joined with single spaces, matching the
        legacy behavior — this may not round-trip the original whitespace.
        """
        tokens = self.tokenizer.tokenize(text)
        pieces: list[str] = []
        for window_start in range(0, len(tokens), content_token_limit):
            window = tokens[window_start : window_start + content_token_limit]
            pieces.append(" ".join(window))
        return pieces

View File

@@ -1,4 +1,5 @@
"""Unit tests for Chunker._chunk_document_with_sections.
"""Unit tests for DocumentChunker.chunk (replacement for
Chunker._chunk_document_with_sections).
These tests use a fake character-level tokenizer so every char counts as
exactly one token. This makes token-limit arithmetic deterministic and lets
@@ -7,13 +8,14 @@ models into the test.
"""
import pytest
from chonkie import SentenceChunker
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import SECTION_SEPARATOR
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.indexing import chunker as chunker_module
from onyx.indexing.chunker import Chunker
from onyx.indexing.document_chunker import text_section_chunker as text_chunker_module
from onyx.indexing.document_chunker.document_chunker import DocumentChunker
from onyx.natural_language_processing.utils import BaseTokenizer
@@ -35,16 +37,26 @@ class CharTokenizer(BaseTokenizer):
CHUNK_LIMIT = 200
def _make_chunker(
def _make_document_chunker(
chunk_token_limit: int = CHUNK_LIMIT,
enable_multipass: bool = False,
) -> Chunker:
return Chunker(
) -> DocumentChunker:
def token_counter(text: str) -> int:
return len(text)
return DocumentChunker(
tokenizer=CharTokenizer(),
enable_multipass=enable_multipass,
enable_large_chunks=False,
enable_contextual_rag=False,
chunk_token_limit=chunk_token_limit,
blurb_splitter=SentenceChunker(
tokenizer_or_token_counter=token_counter,
chunk_size=128,
chunk_overlap=0,
return_type="texts",
),
chunk_splitter=SentenceChunker(
tokenizer_or_token_counter=token_counter,
chunk_size=chunk_token_limit,
chunk_overlap=0,
return_type="texts",
),
)
@@ -70,10 +82,10 @@ def _make_doc(
def test_empty_processed_sections_returns_single_empty_safety_chunk() -> None:
"""No sections at all should still yield one empty chunk (the
`or not chunks` safety branch at the end)."""
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(sections=[])
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=[],
title_prefix="TITLE\n",
@@ -95,13 +107,13 @@ def test_empty_processed_sections_returns_single_empty_safety_chunk() -> None:
def test_empty_section_on_first_position_without_title_is_skipped() -> None:
"""Doc has no title, first section has empty text — the guard
`(not document.title or section_idx > 0)` means it IS skipped."""
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(
sections=[Section(text="", link="l0")],
title=None,
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -117,7 +129,7 @@ def test_empty_section_on_first_position_without_title_is_skipped() -> None:
def test_empty_section_on_later_position_is_skipped_even_with_title() -> None:
"""Index > 0 empty sections are skipped regardless of title."""
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(
sections=[
Section(text="Alpha.", link="l0"),
@@ -126,7 +138,7 @@ def test_empty_section_on_later_position_is_skipped_even_with_title() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -146,10 +158,10 @@ def test_empty_section_on_later_position_is_skipped_even_with_title() -> None:
def test_single_small_text_section_becomes_one_chunk() -> None:
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(sections=[Section(text="Hello world.", link="https://a")])
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="TITLE\n",
@@ -173,7 +185,7 @@ def test_single_small_text_section_becomes_one_chunk() -> None:
def test_multiple_small_sections_combine_into_one_chunk() -> None:
chunker = _make_chunker()
dc = _make_document_chunker()
sections = [
Section(text="Part one.", link="l1"),
Section(text="Part two.", link="l2"),
@@ -181,7 +193,7 @@ def test_multiple_small_sections_combine_into_one_chunk() -> None:
]
doc = _make_doc(sections=sections)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -205,7 +217,7 @@ def test_multiple_small_sections_combine_into_one_chunk() -> None:
def test_sections_overflow_into_second_chunk() -> None:
"""Two sections that together exceed content_token_limit should
finalize the first as one chunk and start a new one."""
chunker = _make_chunker()
dc = _make_document_chunker()
# char-level: 120 char section → 120 tokens. 2 of these plus separator
# exceed a 200-token limit, forcing a flush.
a = "A" * 120
@@ -217,7 +229,7 @@ def test_sections_overflow_into_second_chunk() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -244,7 +256,7 @@ def test_sections_overflow_into_second_chunk() -> None:
def test_image_only_section_produces_single_chunk_with_image_id() -> None:
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(
sections=[
Section(
@@ -255,7 +267,7 @@ def test_image_only_section_produces_single_chunk_with_image_id() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -273,7 +285,7 @@ def test_image_only_section_produces_single_chunk_with_image_id() -> None:
def test_image_section_flushes_pending_text_and_creates_its_own_chunk() -> None:
"""A buffered text section followed by an image section:
the pending text should be flushed first, then the image chunk."""
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(
sections=[
Section(text="Pending text.", link="ltext"),
@@ -286,7 +298,7 @@ def test_image_section_flushes_pending_text_and_creates_its_own_chunk() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -315,14 +327,14 @@ def test_image_section_flushes_pending_text_and_creates_its_own_chunk() -> None:
def test_image_section_without_link_gets_empty_links_dict() -> None:
"""If an image section has no link, links param is {} (not {0: ""})."""
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(
sections=[
Section(text="img", link=None, image_file_id="img-xyz"),
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -333,7 +345,7 @@ def test_image_section_without_link_gets_empty_links_dict() -> None:
assert len(chunks) == 1
assert chunks[0].image_file_id == "img-xyz"
# _create_chunk falls back to {0: ""} when given an empty dict
# to_doc_aware_chunk falls back to {0: ""} when given an empty dict
assert chunks[0].source_links == {0: ""}
@@ -344,7 +356,7 @@ def test_oversized_section_is_split_across_multiple_chunks() -> None:
"""A section whose text exceeds content_token_limit should be passed
through chunk_splitter and yield >1 chunks; only the first is not a
continuation."""
chunker = _make_chunker()
dc = _make_document_chunker()
# Build a section whose char-count is well over CHUNK_LIMIT (200), made
# of many short sentences so chonkie's SentenceChunker can split cleanly.
section_text = (
@@ -360,7 +372,7 @@ def test_oversized_section_is_split_across_multiple_chunks() -> None:
sections=[Section(text=section_text, link="big-link")],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -387,7 +399,7 @@ def test_oversized_section_is_split_across_multiple_chunks() -> None:
def test_oversized_section_flushes_pending_text_first() -> None:
"""A buffered text section followed by an oversized section should
flush the pending chunk first, then emit the split chunks."""
chunker = _make_chunker()
dc = _make_document_chunker()
pending = "Pending buffered text."
big = (
"Alpha beta gamma. Delta epsilon zeta. Eta theta iota. "
@@ -404,7 +416,7 @@ def test_oversized_section_flushes_pending_text_first() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -433,7 +445,7 @@ def test_oversized_section_flushes_pending_text_first() -> None:
def test_title_prefix_and_metadata_propagate_to_all_chunks() -> None:
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(
sections=[
Section(text="A" * 120, link="la"),
@@ -441,7 +453,7 @@ def test_title_prefix_and_metadata_propagate_to_all_chunks() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="MY_TITLE\n",
@@ -461,7 +473,7 @@ def test_title_prefix_and_metadata_propagate_to_all_chunks() -> None:
def test_chunk_ids_are_sequential_starting_at_zero() -> None:
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(
sections=[
Section(text="A" * 120, link="la"),
@@ -470,7 +482,7 @@ def test_chunk_ids_are_sequential_starting_at_zero() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -488,7 +500,7 @@ def test_chunk_ids_are_sequential_starting_at_zero() -> None:
def test_overflow_flush_then_subsequent_section_joins_new_chunk() -> None:
"""After an overflow flush starts a new chunk, the next fitting section
should combine into that same new chunk (not spawn a third)."""
chunker = _make_chunker()
dc = _make_document_chunker()
# 120 + 120 > 200 → first two sections produce two chunks.
# Third section is small (20 chars) → should fit with second.
doc = _make_doc(
@@ -499,7 +511,7 @@ def test_overflow_flush_then_subsequent_section_joins_new_chunk() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -519,7 +531,7 @@ def test_small_section_after_oversized_starts_a_fresh_chunk() -> None:
"""After an oversized section is emitted as its own chunks, the internal
accumulator should be empty so a following small section starts a new
chunk instead of being swallowed."""
chunker = _make_chunker()
dc = _make_document_chunker()
big = (
"Alpha beta gamma. Delta epsilon zeta. Eta theta iota. "
"Kappa lambda mu. Nu xi omicron. Pi rho sigma. Tau upsilon phi. "
@@ -534,7 +546,7 @@ def test_small_section_after_oversized_starts_a_fresh_chunk() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -563,14 +575,14 @@ def test_strict_chunk_token_limit_subdivides_oversized_split(
"""When STRICT_CHUNK_TOKEN_LIMIT is enabled and chonkie's chunk_splitter
still produces a piece larger than content_token_limit (e.g. a single
no-period run), the code must fall back to _split_oversized_chunk."""
monkeypatch.setattr(chunker_module, "STRICT_CHUNK_TOKEN_LIMIT", True)
chunker = _make_chunker()
monkeypatch.setattr(text_chunker_module, "STRICT_CHUNK_TOKEN_LIMIT", True)
dc = _make_document_chunker()
# 500 non-whitespace chars with no sentence boundaries — chonkie will
# return it as one oversized piece (>200) which triggers the fallback.
run = "a" * 500
doc = _make_doc(sections=[Section(text=run, link="l-run")])
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -600,12 +612,12 @@ def test_strict_chunk_token_limit_disabled_allows_oversized_split(
) -> None:
"""Same pathological input, but with STRICT disabled: the oversized
split is emitted verbatim as a single chunk (current behavior)."""
monkeypatch.setattr(chunker_module, "STRICT_CHUNK_TOKEN_LIMIT", False)
chunker = _make_chunker()
monkeypatch.setattr(text_chunker_module, "STRICT_CHUNK_TOKEN_LIMIT", False)
dc = _make_document_chunker()
run = "a" * 500
doc = _make_doc(sections=[Section(text=run, link="l-run")])
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -627,7 +639,7 @@ def test_first_empty_section_with_title_is_processed_not_skipped() -> None:
the doc has a title AND it's the first section, an empty text section
is NOT skipped. This pins current behavior so a refactor can't silently
change it."""
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(
sections=[
Section(text="", link="l0"), # empty first section, kept
@@ -636,7 +648,7 @@ def test_first_empty_section_with_title_is_processed_not_skipped() -> None:
title="Has A Title",
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -659,13 +671,13 @@ def test_first_empty_section_with_title_is_processed_not_skipped() -> None:
def test_clean_text_strips_control_chars_from_section_content() -> None:
"""clean_text() should remove control chars before the text enters the
accumulator — verifies the call isn't dropped by a refactor."""
chunker = _make_chunker()
dc = _make_document_chunker()
# NUL + BEL are control chars below 0x20 and not \n or \t → should be
# stripped by clean_text.
dirty = "Hello\x00 World\x07!"
doc = _make_doc(sections=[Section(text=dirty, link="l1")])
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -685,7 +697,7 @@ def test_section_with_none_text_behaves_like_empty_string() -> None:
"""`section.text` may be None — the method coerces via
`str(section.text or "")`, so a None-text section behaves identically
to an empty one (skipped unless it's the first section of a titled doc)."""
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(
sections=[
Section(text="Alpha.", link="la"),
@@ -694,7 +706,7 @@ def test_section_with_none_text_behaves_like_empty_string() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -716,7 +728,7 @@ def test_no_trailing_empty_chunk_when_last_section_was_image() -> None:
"""If the final section was an image (which emits its own chunk and
resets chunk_text), the safety `or not chunks` branch should NOT fire
because chunks is non-empty. Pin this explicitly."""
chunker = _make_chunker()
dc = _make_document_chunker()
doc = _make_doc(
sections=[
Section(text="Leading text.", link="ltext"),
@@ -726,7 +738,7 @@ def test_no_trailing_empty_chunk_when_last_section_was_image() -> None:
],
)
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",
@@ -746,7 +758,7 @@ def test_no_trailing_empty_chunk_when_last_section_was_image() -> None:
def test_no_trailing_empty_chunk_when_last_section_was_oversized() -> None:
"""Same guarantee for oversized sections: their splits fully clear the
accumulator, and the trailing safety branch should be a no-op."""
chunker = _make_chunker()
dc = _make_document_chunker()
big = (
"Alpha beta gamma. Delta epsilon zeta. Eta theta iota. "
"Kappa lambda mu. Nu xi omicron. Pi rho sigma. Tau upsilon phi. "
@@ -756,7 +768,7 @@ def test_no_trailing_empty_chunk_when_last_section_was_oversized() -> None:
assert len(big) > CHUNK_LIMIT
doc = _make_doc(sections=[Section(text=big, link="l-big")])
chunks = chunker._chunk_document_with_sections(
chunks = dc.chunk(
document=doc,
sections=doc.processed_sections,
title_prefix="",