mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-13 10:52:42 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f22bd593f6 | ||
|
|
e76ffbd4c3 | ||
|
|
696e88710d |
@@ -26,6 +26,10 @@ from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
|
||||
process_onyx_metadata,
|
||||
)
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
is_tabular_file,
|
||||
tabular_file_to_sections,
|
||||
)
|
||||
from onyx.connectors.exceptions import ConnectorValidationError
|
||||
from onyx.connectors.exceptions import CredentialExpiredError
|
||||
from onyx.connectors.exceptions import InsufficientPermissionsError
|
||||
@@ -451,6 +455,40 @@ class BlobStorageConnector(LoadConnector, PollConnector):
|
||||
logger.exception(f"Error processing image {key}")
|
||||
continue
|
||||
|
||||
# Handle tabular files (xlsx, csv, tsv) — produce one
|
||||
# TabularSection per sheet (or per file for csv/tsv)
|
||||
# instead of a flat TextSection.
|
||||
if is_tabular_file(file_name):
|
||||
try:
|
||||
downloaded_file = self._download_object(key)
|
||||
if downloaded_file is None:
|
||||
continue
|
||||
tabular_sections = tabular_file_to_sections(
|
||||
BytesIO(downloaded_file),
|
||||
file_name=file_name,
|
||||
link=link,
|
||||
)
|
||||
batch.append(
|
||||
Document(
|
||||
id=f"{self.bucket_type}:{self.bucket_name}:{key}",
|
||||
sections=(
|
||||
tabular_sections
|
||||
if tabular_sections
|
||||
else [TextSection(link=link, text="")]
|
||||
),
|
||||
source=DocumentSource(self.bucket_type.value),
|
||||
semantic_identifier=file_name,
|
||||
doc_updated_at=last_modified,
|
||||
metadata={},
|
||||
)
|
||||
)
|
||||
if len(batch) == self.batch_size:
|
||||
yield batch
|
||||
batch = []
|
||||
except Exception:
|
||||
logger.exception(f"Error processing tabular file {key}")
|
||||
continue
|
||||
|
||||
# Handle text and document files
|
||||
try:
|
||||
downloaded_file = self._download_object(key)
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
"""Helpers for converting tabular files (xlsx, csv, tsv) into
|
||||
TabularSection objects.
|
||||
|
||||
This lives in `connectors/cross_connector_utils` because:
|
||||
- It imports `TabularSection` from `connectors.models` (connector-layer type).
|
||||
- It calls `file_processing` primitives (`xlsx_sheet_extraction`, `file_io_to_text`)
|
||||
but does the connector-layer wrapping here so every connector that ingests
|
||||
tabular data can share the same section shape.
|
||||
"""
|
||||
|
||||
from typing import IO
|
||||
|
||||
from onyx.connectors.models import TabularSection
|
||||
from onyx.file_processing.extract_file_text import file_io_to_text
|
||||
from onyx.file_processing.extract_file_text import xlsx_sheet_extraction
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
# Extensions routed through this helper instead of the generic
|
||||
# `extract_text_and_images` path. Keep in sync with
|
||||
# `OnyxFileExtensions.TABULAR_EXTENSIONS`.
|
||||
TABULAR_FILE_EXTENSIONS = {".xlsx", ".csv", ".tsv"}
|
||||
|
||||
|
||||
def is_tabular_file(file_name: str) -> bool:
|
||||
"""Return True if the file extension indicates a tabular file
|
||||
(xlsx, csv, tsv)."""
|
||||
lowered = file_name.lower()
|
||||
return any(lowered.endswith(ext) for ext in TABULAR_FILE_EXTENSIONS)
|
||||
|
||||
|
||||
def tabular_file_to_sections(
|
||||
file: IO[bytes],
|
||||
file_name: str,
|
||||
link: str = "",
|
||||
) -> list[TabularSection]:
|
||||
"""Convert a tabular file into one or more TabularSections.
|
||||
|
||||
- `.xlsx` → one TabularSection per non-empty sheet, with
|
||||
`link=f"sheet:{title}"`.
|
||||
- `.csv` / `.tsv` → a single TabularSection containing the full
|
||||
decoded file, with `link=link` (falling back to `file_name` when
|
||||
the caller doesn't provide one — `TabularSection.link` is required).
|
||||
|
||||
Returns an empty list when the file yields no extractable content
|
||||
(empty workbook, empty csv, decode failure).
|
||||
|
||||
Raises `ValueError` if `file_name` isn't a recognized tabular
|
||||
extension — callers should gate on `is_tabular_file` first.
|
||||
"""
|
||||
lowered = file_name.lower()
|
||||
|
||||
if lowered.endswith(".xlsx"):
|
||||
return [
|
||||
TabularSection(link=f"sheet:{sheet_title}", text=csv_text)
|
||||
for csv_text, sheet_title in xlsx_sheet_extraction(
|
||||
file, file_name=file_name
|
||||
)
|
||||
]
|
||||
|
||||
if lowered.endswith((".csv", ".tsv")):
|
||||
try:
|
||||
text = file_io_to_text(file).strip()
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to decode {file_name}: {e}")
|
||||
return []
|
||||
if not text:
|
||||
return []
|
||||
return [TabularSection(link=link or file_name, text=text)]
|
||||
|
||||
raise ValueError(f"{file_name!r} is not a tabular file")
|
||||
@@ -15,6 +15,10 @@ from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
|
||||
)
|
||||
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rate_limit_builder
|
||||
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rl_requests
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
is_tabular_file,
|
||||
tabular_file_to_sections,
|
||||
)
|
||||
from onyx.connectors.drupal_wiki.models import DrupalWikiCheckpoint
|
||||
from onyx.connectors.drupal_wiki.models import DrupalWikiPage
|
||||
from onyx.connectors.drupal_wiki.models import DrupalWikiPageResponse
|
||||
@@ -33,6 +37,7 @@ from onyx.connectors.models import DocumentFailure
|
||||
from onyx.connectors.models import HierarchyNode
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.models import TabularSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.file_processing.extract_file_text import extract_text_and_images
|
||||
from onyx.file_processing.extract_file_text import get_file_ext
|
||||
@@ -226,7 +231,7 @@ class DrupalWikiConnector(
|
||||
Tuple of (sections, error_message). If error_message is not None, the
|
||||
sections list should be treated as invalid.
|
||||
"""
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
|
||||
try:
|
||||
if not self._validate_attachment_filetype(attachment):
|
||||
@@ -273,6 +278,25 @@ class DrupalWikiConnector(
|
||||
|
||||
return sections, None
|
||||
|
||||
# Tabular attachments (xlsx, csv, tsv) — produce
|
||||
# TabularSections instead of a flat TextSection.
|
||||
if is_tabular_file(file_name):
|
||||
try:
|
||||
sections.extend(
|
||||
tabular_file_to_sections(
|
||||
BytesIO(raw_bytes),
|
||||
file_name=file_name,
|
||||
link=download_url,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to extract tabular sections from {file_name}: {e}"
|
||||
)
|
||||
if not sections:
|
||||
return [], f"No content extracted from tabular file {file_name}"
|
||||
return sections, None
|
||||
|
||||
image_counter = 0
|
||||
|
||||
def _store_embedded_image(image_data: bytes, image_name: str) -> None:
|
||||
|
||||
@@ -12,6 +12,10 @@ from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
|
||||
process_onyx_metadata,
|
||||
)
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
is_tabular_file,
|
||||
tabular_file_to_sections,
|
||||
)
|
||||
from onyx.connectors.interfaces import GenerateDocumentsOutput
|
||||
from onyx.connectors.interfaces import LoadConnector
|
||||
from onyx.connectors.models import Document
|
||||
@@ -145,6 +149,39 @@ def _process_file(
|
||||
logger.error(f"Failed to process image file {file_name}: {e}")
|
||||
return []
|
||||
|
||||
# 1b) If the file is tabular (xlsx/csv/tsv), produce one
|
||||
# TabularSection per sheet (or per file for csv/tsv) instead of
|
||||
# flattening through the generic text extractor.
|
||||
if is_tabular_file(file_name):
|
||||
file.seek(0)
|
||||
try:
|
||||
tabular_sections = tabular_file_to_sections(
|
||||
file=file,
|
||||
file_name=file_name,
|
||||
link=link or "",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to process tabular file {file_name}: {e}")
|
||||
return []
|
||||
|
||||
if not tabular_sections:
|
||||
logger.warning(f"No content extracted from tabular file {file_name}")
|
||||
return []
|
||||
|
||||
return [
|
||||
Document(
|
||||
id=doc_id,
|
||||
sections=list(tabular_sections),
|
||||
source=source_type,
|
||||
semantic_identifier=file_display_name,
|
||||
title=title,
|
||||
doc_updated_at=time_updated,
|
||||
primary_owners=primary_owners,
|
||||
secondary_owners=secondary_owners,
|
||||
metadata=custom_tags,
|
||||
)
|
||||
]
|
||||
|
||||
# 2) Otherwise: text-based approach. Possibly with embedded images.
|
||||
file.seek(0)
|
||||
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
is_tabular_file,
|
||||
tabular_file_to_sections,
|
||||
)
|
||||
import io
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
@@ -28,15 +32,16 @@ from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import DocumentFailure
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.models import TabularSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.file_processing.extract_file_text import extract_file_text
|
||||
from onyx.file_processing.extract_file_text import get_file_ext
|
||||
from onyx.file_processing.extract_file_text import pptx_to_text
|
||||
from onyx.file_processing.extract_file_text import read_docx_file
|
||||
from onyx.file_processing.extract_file_text import read_pdf_file
|
||||
from onyx.file_processing.extract_file_text import xlsx_to_text
|
||||
from onyx.file_processing.file_types import OnyxFileExtensions
|
||||
from onyx.file_processing.file_types import OnyxMimeTypes
|
||||
from onyx.file_processing.file_types import SPREADSHEET_MIME_TYPE
|
||||
from onyx.file_processing.image_utils import store_image_and_create_section
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.variable_functionality import (
|
||||
@@ -289,7 +294,7 @@ def _download_and_extract_sections_basic(
|
||||
service: GoogleDriveService,
|
||||
allow_images: bool,
|
||||
size_threshold: int,
|
||||
) -> list[TextSection | ImageSection]:
|
||||
) -> list[TextSection | ImageSection | TabularSection]:
|
||||
"""Extract text and images from a Google Drive file."""
|
||||
file_id = file["id"]
|
||||
file_name = file["name"]
|
||||
@@ -308,7 +313,7 @@ def _download_and_extract_sections_basic(
|
||||
return []
|
||||
|
||||
# Store images for later processing
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
try:
|
||||
section, embedded_id = store_image_and_create_section(
|
||||
image_data=response_call(),
|
||||
@@ -323,10 +328,9 @@ def _download_and_extract_sections_basic(
|
||||
logger.error(f"Failed to process image {file_name}: {e}")
|
||||
return sections
|
||||
|
||||
# For Google Docs, Sheets, and Slides, export as plain text
|
||||
# For Google Docs, Sheets, and Slides, export via the Drive API
|
||||
if mime_type in GOOGLE_MIME_TYPES_TO_EXPORT:
|
||||
export_mime_type = GOOGLE_MIME_TYPES_TO_EXPORT[mime_type]
|
||||
# Use the correct API call for exporting files
|
||||
request = service.files().export_media(
|
||||
fileId=file_id, mimeType=export_mime_type
|
||||
)
|
||||
@@ -335,6 +339,17 @@ def _download_and_extract_sections_basic(
|
||||
logger.warning(f"Failed to export {file_name} as {export_mime_type}")
|
||||
return []
|
||||
|
||||
if export_mime_type in OnyxMimeTypes.TABULAR_MIME_TYPES:
|
||||
# Synthesize an extension on the filename
|
||||
ext = ".xlsx" if export_mime_type == SPREADSHEET_MIME_TYPE else ".csv"
|
||||
return list(
|
||||
tabular_file_to_sections(
|
||||
io.BytesIO(response),
|
||||
file_name=f"{file_name}{ext}",
|
||||
link=link,
|
||||
)
|
||||
)
|
||||
|
||||
text = response.decode("utf-8")
|
||||
return [TextSection(link=link, text=text)]
|
||||
|
||||
@@ -356,9 +371,15 @@ def _download_and_extract_sections_basic(
|
||||
|
||||
elif (
|
||||
mime_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
|
||||
or is_tabular_file(file_name)
|
||||
):
|
||||
text = xlsx_to_text(io.BytesIO(response_call()), file_name=file_name)
|
||||
return [TextSection(link=link, text=text)] if text else []
|
||||
return list(
|
||||
tabular_file_to_sections(
|
||||
io.BytesIO(response_call()),
|
||||
file_name=file_name,
|
||||
link=link,
|
||||
)
|
||||
)
|
||||
|
||||
elif (
|
||||
mime_type
|
||||
@@ -410,8 +431,9 @@ def _find_nth(haystack: str, needle: str, n: int, start: int = 0) -> int:
|
||||
|
||||
|
||||
def align_basic_advanced(
|
||||
basic_sections: list[TextSection | ImageSection], adv_sections: list[TextSection]
|
||||
) -> list[TextSection | ImageSection]:
|
||||
basic_sections: list[TextSection | ImageSection | TabularSection],
|
||||
adv_sections: list[TextSection],
|
||||
) -> list[TextSection | ImageSection | TabularSection]:
|
||||
"""Align the basic sections with the advanced sections.
|
||||
In particular, the basic sections contain all content of the file,
|
||||
including smart chips like dates and doc links. The advanced sections
|
||||
@@ -428,7 +450,7 @@ def align_basic_advanced(
|
||||
basic_full_text = "".join(
|
||||
[section.text for section in basic_sections if isinstance(section, TextSection)]
|
||||
)
|
||||
new_sections: list[TextSection | ImageSection] = []
|
||||
new_sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
heading_start = 0
|
||||
for adv_ind in range(1, len(adv_sections)):
|
||||
heading = adv_sections[adv_ind].text.split(HEADING_DELIMITER)[0]
|
||||
@@ -599,7 +621,7 @@ def _convert_drive_item_to_document(
|
||||
"""
|
||||
Main entry point for converting a Google Drive file => Document object.
|
||||
"""
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
|
||||
# Only construct these services when needed
|
||||
def _get_drive_service() -> GoogleDriveService:
|
||||
@@ -639,7 +661,9 @@ def _convert_drive_item_to_document(
|
||||
doc_id=file.get("id", ""),
|
||||
)
|
||||
if doc_sections:
|
||||
sections = cast(list[TextSection | ImageSection], doc_sections)
|
||||
sections = cast(
|
||||
list[TextSection | ImageSection | TabularSection], doc_sections
|
||||
)
|
||||
if any(SMART_CHIP_CHAR in section.text for section in doc_sections):
|
||||
logger.debug(
|
||||
f"found smart chips in {file.get('name')}, aligning with basic sections"
|
||||
|
||||
@@ -39,6 +39,7 @@ class SectionKind(str, Enum):
|
||||
|
||||
TEXT = "text"
|
||||
IMAGE = "image"
|
||||
TABULAR = "tabular"
|
||||
|
||||
|
||||
class Section(BaseModel):
|
||||
@@ -70,6 +71,15 @@ class ImageSection(Section):
|
||||
return sys.getsizeof(self.image_file_id) + sys.getsizeof(self.link)
|
||||
|
||||
|
||||
class TabularSection(Section):
|
||||
"""Section containing tabular data (csv/tsv content, or one sheet of
|
||||
an xlsx workbook rendered as CSV)."""
|
||||
|
||||
kind: SectionKind = SectionKind.TABULAR
|
||||
text: str # CSV representation in a string
|
||||
link: str
|
||||
|
||||
|
||||
class BasicExpertInfo(BaseModel):
|
||||
"""Basic Information for the owner of a document, any of the fields can be left as None
|
||||
Display fallback goes as follows:
|
||||
@@ -172,7 +182,7 @@ class DocumentBase(BaseModel):
|
||||
"""Used for Onyx ingestion api, the ID is inferred before use if not provided"""
|
||||
|
||||
id: str | None = None
|
||||
sections: list[TextSection | ImageSection]
|
||||
sections: list[TextSection | ImageSection | TabularSection]
|
||||
source: DocumentSource | None = None
|
||||
semantic_identifier: str # displayed in the UI as the main identifier for the doc
|
||||
# TODO(andrei): Ideally we could improve this to where each value is just a
|
||||
|
||||
@@ -60,7 +60,12 @@ from onyx.connectors.models import ExternalAccess
|
||||
from onyx.connectors.models import HierarchyNode
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.models import TabularSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
|
||||
is_tabular_file,
|
||||
tabular_file_to_sections,
|
||||
)
|
||||
from onyx.connectors.sharepoint.connector_utils import get_sharepoint_external_access
|
||||
from onyx.db.enums import HierarchyNodeType
|
||||
from onyx.file_processing.extract_file_text import extract_text_and_images
|
||||
@@ -586,7 +591,7 @@ def _convert_driveitem_to_document_with_permissions(
|
||||
driveitem, f"Failed to download via graph api: {e}", e
|
||||
)
|
||||
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
sections: list[TextSection | ImageSection | TabularSection] = []
|
||||
file_ext = get_file_ext(driveitem.name)
|
||||
|
||||
if not content_bytes:
|
||||
@@ -602,6 +607,19 @@ def _convert_driveitem_to_document_with_permissions(
|
||||
)
|
||||
image_section.link = driveitem.web_url
|
||||
sections.append(image_section)
|
||||
elif is_tabular_file(driveitem.name):
|
||||
try:
|
||||
sections.extend(
|
||||
tabular_file_to_sections(
|
||||
file=io.BytesIO(content_bytes),
|
||||
file_name=driveitem.name,
|
||||
link=driveitem.web_url or "",
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to extract tabular sections for '{driveitem.name}': {e}"
|
||||
)
|
||||
else:
|
||||
|
||||
def _store_embedded_image(img_data: bytes, img_name: str) -> None:
|
||||
|
||||
@@ -462,30 +462,13 @@ def _remove_empty_runs(
|
||||
|
||||
return result
|
||||
|
||||
def xlsx_sheet_extraction(file: IO[Any], file_name: str = "") -> list[tuple[str, str]]:
|
||||
"""
|
||||
Converts each sheet in the excel file to a csv condensed string.
|
||||
Returns a string and the worksheet title for each worksheet
|
||||
|
||||
def xlsx_to_text(file: IO[Any], file_name: str = "") -> str:
|
||||
# TODO: switch back to this approach in a few months when markitdown
|
||||
# fixes their handling of excel files
|
||||
|
||||
# md = get_markitdown_converter()
|
||||
# stream_info = StreamInfo(
|
||||
# mimetype=SPREADSHEET_MIME_TYPE, filename=file_name or None, extension=".xlsx"
|
||||
# )
|
||||
# try:
|
||||
# workbook = md.convert(to_bytesio(file), stream_info=stream_info)
|
||||
# except (
|
||||
# BadZipFile,
|
||||
# ValueError,
|
||||
# FileConversionException,
|
||||
# UnsupportedFormatException,
|
||||
# ) as e:
|
||||
# error_str = f"Failed to extract text from {file_name or 'xlsx file'}: {e}"
|
||||
# if file_name.startswith("~"):
|
||||
# logger.debug(error_str + " (this is expected for files with ~)")
|
||||
# else:
|
||||
# logger.warning(error_str)
|
||||
# return ""
|
||||
# return workbook.markdown
|
||||
Returns a list of (csv_text, sheet)
|
||||
"""
|
||||
try:
|
||||
workbook = openpyxl.load_workbook(file, read_only=True)
|
||||
except BadZipFile as e:
|
||||
@@ -494,23 +477,30 @@ def xlsx_to_text(file: IO[Any], file_name: str = "") -> str:
|
||||
logger.debug(error_str + " (this is expected for files with ~)")
|
||||
else:
|
||||
logger.warning(error_str)
|
||||
return ""
|
||||
return []
|
||||
except Exception as e:
|
||||
if any(s in str(e) for s in KNOWN_OPENPYXL_BUGS):
|
||||
logger.error(
|
||||
f"Failed to extract text from {file_name or 'xlsx file'}. This happens due to a bug in openpyxl. {e}"
|
||||
)
|
||||
return ""
|
||||
return []
|
||||
raise
|
||||
|
||||
text_content = []
|
||||
sheets: list[tuple[str, str]] = []
|
||||
for sheet in workbook.worksheets:
|
||||
sheet_matrix = _clean_worksheet_matrix(_worksheet_to_matrix(sheet))
|
||||
buf = io.StringIO()
|
||||
writer = csv.writer(buf, lineterminator="\n")
|
||||
writer.writerows(sheet_matrix)
|
||||
text_content.append(buf.getvalue().rstrip("\n"))
|
||||
return TEXT_SECTION_SEPARATOR.join(text_content)
|
||||
csv_text = buf.getvalue().rstrip("\n")
|
||||
if csv_text.strip():
|
||||
sheets.append((csv_text, sheet.title))
|
||||
return sheets
|
||||
|
||||
|
||||
def xlsx_to_text(file: IO[Any], file_name: str = "") -> str:
|
||||
sheets = xlsx_sheet_extraction(file, file_name)
|
||||
return TEXT_SECTION_SEPARATOR.join(csv_text for csv_text, _title in sheets)
|
||||
|
||||
|
||||
def eml_to_text(file: IO[Any]) -> str:
|
||||
|
||||
@@ -7,6 +7,7 @@ from onyx.indexing.chunking.image_section_chunker import ImageChunker
|
||||
from onyx.indexing.chunking.section_chunker import AccumulatorState
|
||||
from onyx.indexing.chunking.section_chunker import ChunkPayload
|
||||
from onyx.indexing.chunking.section_chunker import SectionChunker
|
||||
from onyx.indexing.chunking.tabular_section_chunker import TabularChunker
|
||||
from onyx.indexing.chunking.text_section_chunker import TextChunker
|
||||
from onyx.indexing.models import DocAwareChunk
|
||||
from onyx.natural_language_processing.utils import BaseTokenizer
|
||||
@@ -38,6 +39,7 @@ class DocumentChunker:
|
||||
chunk_splitter=chunk_splitter,
|
||||
),
|
||||
SectionKind.IMAGE: ImageChunker(),
|
||||
SectionKind.TABULAR: TabularChunker(tokenizer=tokenizer),
|
||||
}
|
||||
|
||||
def chunk(
|
||||
|
||||
333
backend/onyx/indexing/chunking/tabular_section_chunker.py
Normal file
333
backend/onyx/indexing/chunking/tabular_section_chunker.py
Normal file
@@ -0,0 +1,333 @@
|
||||
import csv
|
||||
import io
|
||||
from collections.abc import Callable
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from onyx.connectors.models import Section
|
||||
from onyx.indexing.chunking.section_chunker import AccumulatorState
|
||||
from onyx.indexing.chunking.section_chunker import ChunkPayload
|
||||
from onyx.indexing.chunking.section_chunker import SectionChunker
|
||||
from onyx.indexing.chunking.section_chunker import SectionChunkerOutput
|
||||
from onyx.natural_language_processing.utils import BaseTokenizer
|
||||
from onyx.utils.logger import setup_logger
|
||||
from shared_configs.configs import STRICT_CHUNK_TOKEN_LIMIT
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
# --- Markers / separators used in emitted chunks --------------------------
|
||||
|
||||
ROWS_MARKER = "Rows:"
|
||||
COLUMNS_MARKER = "Columns:"
|
||||
FIELD_VALUE_SEPARATOR = ", "
|
||||
ROW_JOIN = "\n"
|
||||
|
||||
# Minimum per-chunk row budget. Guards against a prelude so large that no
|
||||
# row could possibly fit — keeps at least a token or two of headroom so
|
||||
# the chunk still carries something.
|
||||
_MIN_ROW_BUDGET_TOKENS = 16
|
||||
|
||||
|
||||
# --- Parsing --------------------------------------------------------------
|
||||
|
||||
|
||||
class _ParsedSection(BaseModel):
|
||||
sheet_name: str
|
||||
link: str
|
||||
headers: list[str]
|
||||
rows: list[list[str]]
|
||||
|
||||
|
||||
def _parse_section(section: Section) -> _ParsedSection | None:
|
||||
"""Parse a CSV-encoded tabular section into headers + rows.
|
||||
|
||||
The first non-empty row is treated as the header. Blank rows are
|
||||
skipped so stray separator lines don't produce ghost rows. A CSV
|
||||
with only a header row is still parseable (returns empty rows).
|
||||
"""
|
||||
section_text = section.text or ""
|
||||
if not section_text.strip():
|
||||
return None
|
||||
|
||||
reader = csv.reader(io.StringIO(section_text))
|
||||
non_empty_rows = [
|
||||
row for row in reader if any(cell.strip() for cell in row)
|
||||
]
|
||||
if not non_empty_rows:
|
||||
return None
|
||||
|
||||
return _ParsedSection(
|
||||
sheet_name=section.link or "",
|
||||
link=section.link or "",
|
||||
headers=non_empty_rows[0],
|
||||
rows=non_empty_rows[1:],
|
||||
)
|
||||
|
||||
|
||||
# --- Step 1: FORMATTING ---------------------------------------------------
|
||||
#
|
||||
# Converts header + row → a single formatted string. Swap these out to
|
||||
# change the textual representation of rows in chunks (e.g. JSON-line,
|
||||
# bullet-list, markdown table row, etc.) without touching packing.
|
||||
|
||||
|
||||
def format_columns_header(headers: list[str]) -> str:
|
||||
"""Format the 'Columns:' line that appears in every chunk's prelude."""
|
||||
return f"{COLUMNS_MARKER} " + FIELD_VALUE_SEPARATOR.join(headers)
|
||||
|
||||
|
||||
def format_row_field_value(headers: list[str], row: list[str]) -> str:
|
||||
"""Format one row as ``col=val, col=val, ...``.
|
||||
|
||||
- Missing trailing cells (row shorter than headers) are treated as empty.
|
||||
- Empty values are dropped; omitting them keeps chunks dense with
|
||||
retrieval-relevant content rather than padded with ``col=``.
|
||||
"""
|
||||
parts: list[str] = []
|
||||
for i, header in enumerate(headers):
|
||||
value = row[i] if i < len(row) else ""
|
||||
if not value.strip():
|
||||
continue
|
||||
parts.append(f"{header}={value}")
|
||||
return FIELD_VALUE_SEPARATOR.join(parts)
|
||||
|
||||
|
||||
# --- Step 2: PACKING ------------------------------------------------------
|
||||
#
|
||||
# Given formatted row strings + a prelude + a token budget, emit a list of
|
||||
# chunk strings that each fit within the budget. Swap this out to change
|
||||
# the packing strategy (e.g. one-row-per-chunk, fixed-row-count, etc.)
|
||||
# without touching formatting.
|
||||
|
||||
|
||||
class _RowPacker:
|
||||
"""Packs formatted rows into chunks under a token limit.
|
||||
|
||||
Each emitted chunk looks like::
|
||||
|
||||
<prelude>
|
||||
<row 1>
|
||||
<row 2>
|
||||
...
|
||||
|
||||
The prelude is repeated at the top of every chunk so each chunk is
|
||||
self-describing for downstream retrieval.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
prelude: str,
|
||||
token_counter: Callable[[str], int],
|
||||
max_tokens: int,
|
||||
strict: bool,
|
||||
) -> None:
|
||||
self.prelude = prelude
|
||||
self.token_counter = token_counter
|
||||
self.max_tokens = max_tokens
|
||||
self.strict = strict
|
||||
|
||||
prelude_tokens = token_counter(prelude)
|
||||
# Budget for the rows alone, reserving room for the prelude plus
|
||||
# the newline that joins it to the row block.
|
||||
self._row_budget = max(
|
||||
_MIN_ROW_BUDGET_TOKENS, max_tokens - prelude_tokens - 1
|
||||
)
|
||||
|
||||
def pack(self, rows: list[str]) -> list[str]:
|
||||
chunks: list[str] = []
|
||||
buf: list[str] = []
|
||||
buf_tokens = 0
|
||||
|
||||
for row in rows:
|
||||
if not row:
|
||||
continue
|
||||
row_tokens = self.token_counter(row)
|
||||
|
||||
# Row that won't fit its own chunk: flush, split, emit each
|
||||
# piece as a standalone chunk.
|
||||
if row_tokens > self._row_budget:
|
||||
if buf:
|
||||
chunks.append(self._assemble(buf))
|
||||
buf, buf_tokens = [], 0
|
||||
for piece in self._split_oversized_row(row):
|
||||
chunks.append(self._assemble([piece]))
|
||||
continue
|
||||
|
||||
# +1 accounts for the newline separating rows in the buffer.
|
||||
sep_tokens = 1 if buf else 0
|
||||
if buf and buf_tokens + sep_tokens + row_tokens > self._row_budget:
|
||||
chunks.append(self._assemble(buf))
|
||||
buf, buf_tokens = [], 0
|
||||
sep_tokens = 0
|
||||
|
||||
buf.append(row)
|
||||
buf_tokens += sep_tokens + row_tokens
|
||||
|
||||
if buf:
|
||||
chunks.append(self._assemble(buf))
|
||||
return chunks
|
||||
|
||||
def _assemble(self, rows: list[str]) -> str:
|
||||
return self.prelude + ROW_JOIN + ROW_JOIN.join(rows)
|
||||
|
||||
def _split_oversized_row(self, row: str) -> list[str]:
|
||||
"""Split a single over-budget row.
|
||||
|
||||
First pass splits at ``field=value`` boundaries to preserve the
|
||||
column-level structure. If ``strict`` is set and any resulting
|
||||
piece is still over budget, fall back to a hard character-level
|
||||
split so no chunk ever exceeds ``max_tokens``.
|
||||
"""
|
||||
pieces = _split_by_field_boundary(
|
||||
row, self._row_budget, self.token_counter
|
||||
)
|
||||
|
||||
if not self.strict:
|
||||
return pieces
|
||||
|
||||
out: list[str] = []
|
||||
for piece in pieces:
|
||||
if self.token_counter(piece) > self._row_budget:
|
||||
out.extend(_hard_split_by_chars(piece, self._row_budget, self.token_counter))
|
||||
else:
|
||||
out.append(piece)
|
||||
return out
|
||||
|
||||
|
||||
def _split_by_field_boundary(
|
||||
row: str,
|
||||
max_tokens: int,
|
||||
token_counter: Callable[[str], int],
|
||||
) -> list[str]:
|
||||
"""Greedy split of a ``col=val, col=val, ...`` row at ``, `` boundaries."""
|
||||
parts = row.split(FIELD_VALUE_SEPARATOR)
|
||||
pieces: list[str] = []
|
||||
buf: list[str] = []
|
||||
buf_tokens = 0
|
||||
sep_tokens = token_counter(FIELD_VALUE_SEPARATOR)
|
||||
|
||||
for part in parts:
|
||||
part_tokens = token_counter(part)
|
||||
add_sep = sep_tokens if buf else 0
|
||||
if buf and buf_tokens + add_sep + part_tokens > max_tokens:
|
||||
pieces.append(FIELD_VALUE_SEPARATOR.join(buf))
|
||||
buf, buf_tokens = [part], part_tokens
|
||||
else:
|
||||
buf.append(part)
|
||||
buf_tokens += add_sep + part_tokens
|
||||
|
||||
if buf:
|
||||
pieces.append(FIELD_VALUE_SEPARATOR.join(buf))
|
||||
return pieces
|
||||
|
||||
|
||||
def _hard_split_by_chars(
|
||||
text: str,
|
||||
max_tokens: int,
|
||||
token_counter: Callable[[str], int],
|
||||
) -> list[str]:
|
||||
"""Last-resort character split when field-level splitting can't
|
||||
reduce a piece below ``max_tokens`` (e.g. a single field contains a
|
||||
giant value). Approximates via chars-per-token from the input string
|
||||
itself, then slices."""
|
||||
total_tokens = max(1, token_counter(text))
|
||||
approx_chars_per_token = max(1, len(text) // total_tokens)
|
||||
window = max(1, max_tokens * approx_chars_per_token)
|
||||
return [text[i : i + window] for i in range(0, len(text), window)]
|
||||
|
||||
|
||||
# --- Step 3: ORCHESTRATION ------------------------------------------------
|
||||
|
||||
|
||||
class TabularChunker(SectionChunker):
|
||||
"""Chunks tabular sections (csv text) into row-packed field=value chunks.
|
||||
|
||||
Each emitted chunk carries a prelude (sheet name + Rows: marker +
|
||||
Columns: header line) followed by as many ``col=val, col=val``
|
||||
rows as fit under ``content_token_limit``. Rows too large for a
|
||||
single chunk are split at field boundaries (and, under
|
||||
``STRICT_CHUNK_TOKEN_LIMIT``, hard-split by characters as a fallback).
|
||||
"""
|
||||
|
||||
def __init__(self, tokenizer: BaseTokenizer) -> None:
|
||||
self.tokenizer = tokenizer
|
||||
|
||||
def chunk_section(
|
||||
self,
|
||||
section: Section,
|
||||
accumulator: AccumulatorState,
|
||||
content_token_limit: int,
|
||||
) -> SectionChunkerOutput:
|
||||
assert section.text is not None
|
||||
|
||||
parsed = _parse_section(section)
|
||||
if parsed is None:
|
||||
logger.warning(
|
||||
f"TabularChunker: skipping unparseable section (link={section.link})"
|
||||
)
|
||||
return SectionChunkerOutput(payloads=[], accumulator=accumulator)
|
||||
|
||||
# Tabular sections are structurally standalone — flush any pending
|
||||
# text buffer before emitting our own chunks, matching ImageChunker.
|
||||
payloads = accumulator.flush_to_list()
|
||||
|
||||
prelude = self._build_prelude(parsed)
|
||||
formatted_rows = [
|
||||
line
|
||||
for line in (
|
||||
format_row_field_value(parsed.headers, row)
|
||||
for row in parsed.rows
|
||||
)
|
||||
if line
|
||||
]
|
||||
|
||||
# Header-only table (no non-empty rows): emit a single
|
||||
# prelude-only chunk so the column schema is still indexed.
|
||||
if not formatted_rows:
|
||||
payloads.append(
|
||||
ChunkPayload(
|
||||
text=prelude,
|
||||
links={0: parsed.link},
|
||||
is_continuation=False,
|
||||
)
|
||||
)
|
||||
return SectionChunkerOutput(
|
||||
payloads=payloads,
|
||||
accumulator=AccumulatorState(),
|
||||
)
|
||||
|
||||
packer = _RowPacker(
|
||||
prelude=prelude,
|
||||
token_counter=self._count_tokens,
|
||||
max_tokens=content_token_limit,
|
||||
strict=STRICT_CHUNK_TOKEN_LIMIT,
|
||||
)
|
||||
chunk_texts = packer.pack(formatted_rows)
|
||||
|
||||
for i, text in enumerate(chunk_texts):
|
||||
payloads.append(
|
||||
ChunkPayload(
|
||||
text=text,
|
||||
links={0: parsed.link},
|
||||
is_continuation=(i != 0),
|
||||
)
|
||||
)
|
||||
|
||||
return SectionChunkerOutput(
|
||||
payloads=payloads,
|
||||
accumulator=AccumulatorState(),
|
||||
)
|
||||
|
||||
def _build_prelude(self, parsed: _ParsedSection) -> str:
|
||||
"""The per-chunk header: sheet name (if any) + ``Rows:`` marker
|
||||
+ ``Columns:`` header line. Swap this to change the prelude shape."""
|
||||
parts: list[str] = []
|
||||
if parsed.sheet_name:
|
||||
parts.append(parsed.sheet_name)
|
||||
parts.append(ROWS_MARKER)
|
||||
parts.append(format_columns_header(parsed.headers))
|
||||
return ROW_JOIN.join(parts)
|
||||
|
||||
def _count_tokens(self, text: str) -> int:
|
||||
return len(self.tokenizer.encode(text))
|
||||
@@ -4,6 +4,7 @@ from typing import cast
|
||||
import openpyxl
|
||||
from openpyxl.worksheet.worksheet import Worksheet
|
||||
|
||||
from onyx.file_processing.extract_file_text import xlsx_sheet_extraction
|
||||
from onyx.file_processing.extract_file_text import xlsx_to_text
|
||||
|
||||
|
||||
@@ -196,3 +197,136 @@ class TestXlsxToText:
|
||||
assert "r1c1" in lines[0] and "r1c2" in lines[0]
|
||||
assert "r2c1" in lines[1] and "r2c2" in lines[1]
|
||||
assert "r3c1" in lines[2] and "r3c2" in lines[2]
|
||||
|
||||
|
||||
class TestXlsxSheetExtraction:
|
||||
def test_one_tuple_per_sheet(self) -> None:
|
||||
xlsx = _make_xlsx(
|
||||
{
|
||||
"Revenue": [["Month", "Amount"], ["Jan", "100"]],
|
||||
"Expenses": [["Category", "Cost"], ["Rent", "500"]],
|
||||
}
|
||||
)
|
||||
sheets = xlsx_sheet_extraction(xlsx)
|
||||
assert len(sheets) == 2
|
||||
# Order preserved from workbook sheet order
|
||||
titles = [title for _csv, title in sheets]
|
||||
assert titles == ["Revenue", "Expenses"]
|
||||
# Content present in the right tuple
|
||||
revenue_csv, _ = sheets[0]
|
||||
expenses_csv, _ = sheets[1]
|
||||
assert "Month" in revenue_csv
|
||||
assert "Jan" in revenue_csv
|
||||
assert "Category" in expenses_csv
|
||||
assert "Rent" in expenses_csv
|
||||
|
||||
def test_tuple_structure_is_csv_text_then_title(self) -> None:
|
||||
"""The tuple order is (csv_text, sheet_title) — pin it so callers
|
||||
that unpack positionally don't silently break."""
|
||||
xlsx = _make_xlsx({"MySheet": [["a", "b"]]})
|
||||
sheets = xlsx_sheet_extraction(xlsx)
|
||||
assert len(sheets) == 1
|
||||
csv_text, title = sheets[0]
|
||||
assert title == "MySheet"
|
||||
assert "a" in csv_text
|
||||
assert "b" in csv_text
|
||||
|
||||
def test_empty_sheet_is_skipped(self) -> None:
|
||||
"""A sheet whose CSV output is empty/whitespace-only should NOT
|
||||
appear in the result — the `if csv_text.strip():` guard filters
|
||||
it out."""
|
||||
xlsx = _make_xlsx(
|
||||
{
|
||||
"Data": [["a", "b"]],
|
||||
"Empty": [],
|
||||
}
|
||||
)
|
||||
sheets = xlsx_sheet_extraction(xlsx)
|
||||
assert len(sheets) == 1
|
||||
assert sheets[0][1] == "Data"
|
||||
|
||||
def test_empty_workbook_returns_empty_list(self) -> None:
|
||||
"""All sheets empty → empty list (not a list of empty tuples)."""
|
||||
xlsx = _make_xlsx({"Sheet1": [], "Sheet2": []})
|
||||
sheets = xlsx_sheet_extraction(xlsx)
|
||||
assert sheets == []
|
||||
|
||||
def test_single_sheet(self) -> None:
|
||||
xlsx = _make_xlsx({"Only": [["x", "y"], ["1", "2"]]})
|
||||
sheets = xlsx_sheet_extraction(xlsx)
|
||||
assert len(sheets) == 1
|
||||
csv_text, title = sheets[0]
|
||||
assert title == "Only"
|
||||
assert "x" in csv_text
|
||||
assert "1" in csv_text
|
||||
|
||||
def test_bad_zip_returns_empty_list(self) -> None:
|
||||
bad_file = io.BytesIO(b"not a zip file")
|
||||
sheets = xlsx_sheet_extraction(bad_file, file_name="test.xlsx")
|
||||
assert sheets == []
|
||||
|
||||
def test_bad_zip_tilde_file_returns_empty_list(self) -> None:
|
||||
"""`~$`-prefixed files are Excel lock files; failure should log
|
||||
at debug (not warning) and still return []."""
|
||||
bad_file = io.BytesIO(b"not a zip file")
|
||||
sheets = xlsx_sheet_extraction(bad_file, file_name="~$temp.xlsx")
|
||||
assert sheets == []
|
||||
|
||||
def test_csv_content_matches_xlsx_to_text_per_sheet(self) -> None:
|
||||
"""For a single-sheet workbook, xlsx_to_text output should equal
|
||||
the csv_text from xlsx_sheet_extraction — they share the same
|
||||
per-sheet CSV-ification logic."""
|
||||
single_sheet_data = [["Name", "Age"], ["Alice", "30"]]
|
||||
expected_text = xlsx_to_text(_make_xlsx({"People": single_sheet_data}))
|
||||
|
||||
sheets = xlsx_sheet_extraction(_make_xlsx({"People": single_sheet_data}))
|
||||
assert len(sheets) == 1
|
||||
csv_text, title = sheets[0]
|
||||
assert title == "People"
|
||||
assert csv_text.strip() == expected_text.strip()
|
||||
|
||||
def test_commas_in_cells_are_quoted(self) -> None:
|
||||
xlsx = _make_xlsx({"S1": [["hello, world", "normal"]]})
|
||||
sheets = xlsx_sheet_extraction(xlsx)
|
||||
assert len(sheets) == 1
|
||||
csv_text, _ = sheets[0]
|
||||
assert '"hello, world"' in csv_text
|
||||
|
||||
def test_long_empty_row_run_capped_within_sheet(self) -> None:
|
||||
"""The matrix cleanup applies per-sheet: >2 empty rows collapse
|
||||
to 2, which keeps the sheet non-empty and it still appears in
|
||||
the result."""
|
||||
xlsx = _make_xlsx(
|
||||
{
|
||||
"S1": [
|
||||
["header"],
|
||||
[""],
|
||||
[""],
|
||||
[""],
|
||||
[""],
|
||||
["data"],
|
||||
]
|
||||
}
|
||||
)
|
||||
sheets = xlsx_sheet_extraction(xlsx)
|
||||
assert len(sheets) == 1
|
||||
csv_text, _ = sheets[0]
|
||||
lines = [line for line in csv_text.strip().split("\n") if line.strip()]
|
||||
# header + 2 empty (capped) + data = 4 lines
|
||||
assert len(lines) == 4
|
||||
assert "header" in lines[0]
|
||||
assert "data" in lines[-1]
|
||||
|
||||
def test_sheet_title_with_special_chars_preserved(self) -> None:
|
||||
"""Spaces, punctuation, unicode in sheet titles are preserved
|
||||
verbatim — the title is used as a link anchor downstream."""
|
||||
xlsx = _make_xlsx(
|
||||
{
|
||||
"Q1 Revenue (USD)": [["a", "b"]],
|
||||
"Données": [["c", "d"]],
|
||||
}
|
||||
)
|
||||
sheets = xlsx_sheet_extraction(xlsx)
|
||||
titles = [title for _csv, title in sheets]
|
||||
assert "Q1 Revenue (USD)" in titles
|
||||
assert "Données" in titles
|
||||
|
||||
312
backend/tests/unit/onyx/indexing/test_tabular_section_chunker.py
Normal file
312
backend/tests/unit/onyx/indexing/test_tabular_section_chunker.py
Normal file
@@ -0,0 +1,312 @@
|
||||
"""End-to-end tests for `TabularChunker.chunk_section`.
|
||||
|
||||
Each test is structured as:
|
||||
INPUT — the CSV text passed to the chunker + token budget + link
|
||||
EXPECTED — the exact chunk texts the chunker should emit
|
||||
ACT — a single call to `chunk_section`
|
||||
ASSERT — literal equality against the expected chunk texts
|
||||
|
||||
A character-level tokenizer (1 char == 1 token) is used so token-budget
|
||||
arithmetic is deterministic and expected chunks can be spelled out
|
||||
exactly.
|
||||
"""
|
||||
|
||||
from onyx.connectors.models import Section
|
||||
from onyx.connectors.models import SectionKind
|
||||
from onyx.indexing.chunking.section_chunker import AccumulatorState
|
||||
from onyx.indexing.chunking.tabular_section_chunker import TabularChunker
|
||||
from onyx.natural_language_processing.utils import BaseTokenizer
|
||||
|
||||
|
||||
class CharTokenizer(BaseTokenizer):
|
||||
def encode(self, string: str) -> list[int]:
|
||||
return [ord(c) for c in string]
|
||||
|
||||
def tokenize(self, string: str) -> list[str]:
|
||||
return list(string)
|
||||
|
||||
def decode(self, tokens: list[int]) -> str:
|
||||
return "".join(chr(t) for t in tokens)
|
||||
|
||||
|
||||
def _make_chunker() -> TabularChunker:
|
||||
return TabularChunker(tokenizer=CharTokenizer())
|
||||
|
||||
|
||||
def _tabular_section(text: str, link: str = "sheet:Test") -> Section:
|
||||
return Section(kind=SectionKind.TABULAR, text=text, link=link)
|
||||
|
||||
|
||||
class TestTabularChunkerChunkSection:
|
||||
def test_simple_csv_all_rows_fit_one_chunk(self) -> None:
|
||||
# --- INPUT -----------------------------------------------------
|
||||
csv_text = (
|
||||
"Name,Age,City\n"
|
||||
"Alice,30,NYC\n"
|
||||
"Bob,25,SF\n"
|
||||
)
|
||||
link = "sheet:People"
|
||||
content_token_limit = 500
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
(
|
||||
"sheet:People\n"
|
||||
"Rows:\n"
|
||||
"Columns: Name, Age, City\n"
|
||||
"Name=Alice, Age=30, City=NYC\n"
|
||||
"Name=Bob, Age=25, City=SF"
|
||||
),
|
||||
]
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
|
||||
# --- ASSERT ----------------------------------------------------
|
||||
assert [p.text for p in out.payloads] == expected_texts
|
||||
assert [p.is_continuation for p in out.payloads] == [False]
|
||||
assert all(p.links == {0: link} for p in out.payloads)
|
||||
assert out.accumulator.is_empty()
|
||||
|
||||
def test_overflow_splits_into_two_deterministic_chunks(self) -> None:
|
||||
# --- INPUT -----------------------------------------------------
|
||||
# prelude = "sheet:S\nRows:\nColumns: col, val" (31 chars = 31 tokens)
|
||||
# At content_token_limit=57, row_budget = max(16, 57-31-1) = 25.
|
||||
# Each row "col=a, val=1" is 12 tokens; two rows + \n = 25 (fits),
|
||||
# three rows + 2×\n = 38 (overflows) → split after 2 rows.
|
||||
csv_text = (
|
||||
"col,val\n"
|
||||
"a,1\n"
|
||||
"b,2\n"
|
||||
"c,3\n"
|
||||
"d,4\n"
|
||||
)
|
||||
link = "sheet:S"
|
||||
content_token_limit = 57
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
(
|
||||
"sheet:S\n"
|
||||
"Rows:\n"
|
||||
"Columns: col, val\n"
|
||||
"col=a, val=1\n"
|
||||
"col=b, val=2"
|
||||
),
|
||||
(
|
||||
"sheet:S\n"
|
||||
"Rows:\n"
|
||||
"Columns: col, val\n"
|
||||
"col=c, val=3\n"
|
||||
"col=d, val=4"
|
||||
),
|
||||
]
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
AccumulatorState(),
|
||||
content_token_limit=content_token_limit,
|
||||
)
|
||||
|
||||
# --- ASSERT ----------------------------------------------------
|
||||
assert [p.text for p in out.payloads] == expected_texts
|
||||
# First chunk is fresh; subsequent chunks mark as continuations.
|
||||
assert [p.is_continuation for p in out.payloads] == [False, True]
|
||||
# Link carries through every chunk.
|
||||
assert all(p.links == {0: link} for p in out.payloads)
|
||||
|
||||
def test_header_only_csv_produces_single_prelude_chunk(self) -> None:
|
||||
# --- INPUT -----------------------------------------------------
|
||||
csv_text = "col1,col2\n"
|
||||
link = "sheet:Headers"
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
"sheet:Headers\nRows:\nColumns: col1, col2",
|
||||
]
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
AccumulatorState(),
|
||||
content_token_limit=500,
|
||||
)
|
||||
|
||||
# --- ASSERT ----------------------------------------------------
|
||||
assert [p.text for p in out.payloads] == expected_texts
|
||||
|
||||
def test_empty_cells_dropped_from_chunk_text(self) -> None:
|
||||
# --- INPUT -----------------------------------------------------
|
||||
# Alice's Age is empty; Bob's City is empty. Empty cells should
|
||||
# not appear as `field=` pairs in the output.
|
||||
csv_text = (
|
||||
"Name,Age,City\n"
|
||||
"Alice,,NYC\n"
|
||||
"Bob,25,\n"
|
||||
)
|
||||
link = "sheet:P"
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
(
|
||||
"sheet:P\n"
|
||||
"Rows:\n"
|
||||
"Columns: Name, Age, City\n"
|
||||
"Name=Alice, City=NYC\n"
|
||||
"Name=Bob, Age=25"
|
||||
),
|
||||
]
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
AccumulatorState(),
|
||||
content_token_limit=500,
|
||||
)
|
||||
|
||||
# --- ASSERT ----------------------------------------------------
|
||||
assert [p.text for p in out.payloads] == expected_texts
|
||||
|
||||
def test_quoted_commas_in_csv_preserved_as_one_field(self) -> None:
|
||||
# --- INPUT -----------------------------------------------------
|
||||
# "Hello, world" is quoted in the CSV, so it's a single field
|
||||
# value containing a comma — not two cells.
|
||||
csv_text = (
|
||||
'Name,Notes\n'
|
||||
'Alice,"Hello, world"\n'
|
||||
)
|
||||
link = "sheet:P"
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
(
|
||||
"sheet:P\n"
|
||||
"Rows:\n"
|
||||
"Columns: Name, Notes\n"
|
||||
"Name=Alice, Notes=Hello, world"
|
||||
),
|
||||
]
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
AccumulatorState(),
|
||||
content_token_limit=500,
|
||||
)
|
||||
|
||||
# --- ASSERT ----------------------------------------------------
|
||||
assert [p.text for p in out.payloads] == expected_texts
|
||||
|
||||
def test_blank_rows_in_csv_are_skipped(self) -> None:
|
||||
# --- INPUT -----------------------------------------------------
|
||||
# Stray blank rows in the CSV (e.g. export artifacts) shouldn't
|
||||
# produce ghost rows in the output.
|
||||
csv_text = (
|
||||
"A,B\n"
|
||||
"\n"
|
||||
"1,2\n"
|
||||
"\n"
|
||||
"\n"
|
||||
"3,4\n"
|
||||
)
|
||||
link = "sheet:S"
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
(
|
||||
"sheet:S\n"
|
||||
"Rows:\n"
|
||||
"Columns: A, B\n"
|
||||
"A=1, B=2\n"
|
||||
"A=3, B=4"
|
||||
),
|
||||
]
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
AccumulatorState(),
|
||||
content_token_limit=500,
|
||||
)
|
||||
|
||||
# --- ASSERT ----------------------------------------------------
|
||||
assert [p.text for p in out.payloads] == expected_texts
|
||||
|
||||
def test_accumulator_flushes_before_tabular_chunks(self) -> None:
|
||||
# --- INPUT -----------------------------------------------------
|
||||
# A text accumulator was populated by the prior text section.
|
||||
# Tabular sections are structural boundaries, so the pending
|
||||
# text is flushed as its own chunk before the tabular content.
|
||||
pending_text = "prior paragraph from an earlier text section"
|
||||
pending_link = "prev-link"
|
||||
|
||||
csv_text = (
|
||||
"a,b\n"
|
||||
"1,2\n"
|
||||
)
|
||||
link = "sheet:S"
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts = [
|
||||
pending_text, # flushed accumulator
|
||||
(
|
||||
"sheet:S\n"
|
||||
"Rows:\n"
|
||||
"Columns: a, b\n"
|
||||
"a=1, b=2"
|
||||
),
|
||||
]
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section(csv_text, link=link),
|
||||
AccumulatorState(
|
||||
text=pending_text,
|
||||
link_offsets={0: pending_link},
|
||||
),
|
||||
content_token_limit=500,
|
||||
)
|
||||
|
||||
# --- ASSERT ----------------------------------------------------
|
||||
assert [p.text for p in out.payloads] == expected_texts
|
||||
# Flushed chunk keeps the prior text's link; tabular chunk uses
|
||||
# the tabular section's link.
|
||||
assert out.payloads[0].links == {0: pending_link}
|
||||
assert out.payloads[1].links == {0: link}
|
||||
# Accumulator resets — tabular section is a structural boundary.
|
||||
assert out.accumulator.is_empty()
|
||||
|
||||
def test_empty_tabular_section_returns_no_payloads_and_preserves_accumulator(
|
||||
self,
|
||||
) -> None:
|
||||
# --- INPUT -----------------------------------------------------
|
||||
# Malformed/empty tabular section should not flush the text
|
||||
# accumulator — the caller (DocumentChunker) handles skip logic;
|
||||
# we preserve the accumulator so subsequent sections can use it.
|
||||
pending_text = "prior paragraph"
|
||||
pending_link_offsets = {0: "prev-link"}
|
||||
|
||||
# --- EXPECTED --------------------------------------------------
|
||||
expected_texts: list[str] = []
|
||||
expected_accumulator_text = pending_text
|
||||
expected_accumulator_offsets = pending_link_offsets
|
||||
|
||||
# --- ACT -------------------------------------------------------
|
||||
out = _make_chunker().chunk_section(
|
||||
_tabular_section("", link="sheet:Empty"),
|
||||
AccumulatorState(
|
||||
text=pending_text,
|
||||
link_offsets=pending_link_offsets,
|
||||
),
|
||||
content_token_limit=500,
|
||||
)
|
||||
|
||||
# --- ASSERT ----------------------------------------------------
|
||||
assert [p.text for p in out.payloads] == expected_texts
|
||||
assert out.accumulator.text == expected_accumulator_text
|
||||
assert out.accumulator.link_offsets == expected_accumulator_offsets
|
||||
Reference in New Issue
Block a user