Compare commits

..

3 Commits

Author SHA1 Message Date
Dane Urban
f22bd593f6 Tabular Chunker 2026-04-12 19:28:41 -07:00
Dane Urban
e76ffbd4c3 Connectors output TabularSection 2026-04-12 15:52:26 -07:00
Dane Urban
696e88710d Tabular log 2026-04-12 15:23:12 -07:00
12 changed files with 1038 additions and 43 deletions

View File

@@ -26,6 +26,10 @@ from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
process_onyx_metadata,
)
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
is_tabular_file,
tabular_file_to_sections,
)
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
@@ -451,6 +455,40 @@ class BlobStorageConnector(LoadConnector, PollConnector):
logger.exception(f"Error processing image {key}")
continue
# Handle tabular files (xlsx, csv, tsv) — produce one
# TabularSection per sheet (or per file for csv/tsv)
# instead of a flat TextSection.
if is_tabular_file(file_name):
try:
downloaded_file = self._download_object(key)
if downloaded_file is None:
continue
tabular_sections = tabular_file_to_sections(
BytesIO(downloaded_file),
file_name=file_name,
link=link,
)
batch.append(
Document(
id=f"{self.bucket_type}:{self.bucket_name}:{key}",
sections=(
tabular_sections
if tabular_sections
else [TextSection(link=link, text="")]
),
source=DocumentSource(self.bucket_type.value),
semantic_identifier=file_name,
doc_updated_at=last_modified,
metadata={},
)
)
if len(batch) == self.batch_size:
yield batch
batch = []
except Exception:
logger.exception(f"Error processing tabular file {key}")
continue
# Handle text and document files
try:
downloaded_file = self._download_object(key)

View File

@@ -0,0 +1,73 @@
"""Helpers for converting tabular files (xlsx, csv, tsv) into
TabularSection objects.
This lives in `connectors/cross_connector_utils` because:
- It imports `TabularSection` from `connectors.models` (connector-layer type).
- It calls `file_processing` primitives (`xlsx_sheet_extraction`, `file_io_to_text`)
but does the connector-layer wrapping here so every connector that ingests
tabular data can share the same section shape.
"""
from typing import IO
from onyx.connectors.models import TabularSection
from onyx.file_processing.extract_file_text import file_io_to_text
from onyx.file_processing.extract_file_text import xlsx_sheet_extraction
from onyx.utils.logger import setup_logger
logger = setup_logger()
# Extensions routed through this helper instead of the generic
# `extract_text_and_images` path. Keep in sync with
# `OnyxFileExtensions.TABULAR_EXTENSIONS`.
TABULAR_FILE_EXTENSIONS = {".xlsx", ".csv", ".tsv"}
def is_tabular_file(file_name: str) -> bool:
    """Return True if the file extension indicates a tabular file
    (xlsx, csv, tsv). Matching is case-insensitive."""
    return file_name.lower().endswith(tuple(TABULAR_FILE_EXTENSIONS))
def tabular_file_to_sections(
    file: IO[bytes],
    file_name: str,
    link: str = "",
) -> list[TabularSection]:
    """Convert a tabular file into one or more TabularSections.

    - `.xlsx` → one TabularSection per non-empty sheet, with
      `link=f"sheet:{title}"`.
    - `.csv` / `.tsv` → a single TabularSection containing the full
      decoded file, with `link=link` (falling back to `file_name` when
      the caller doesn't provide one — `TabularSection.link` is required).

    Returns an empty list when the file yields no extractable content
    (empty workbook, empty csv, decode failure).

    Raises `ValueError` if `file_name` isn't a recognized tabular
    extension — callers should gate on `is_tabular_file` first.
    """
    name_lower = file_name.lower()

    if name_lower.endswith(".xlsx"):
        sections: list[TabularSection] = []
        for csv_text, sheet_title in xlsx_sheet_extraction(file, file_name=file_name):
            sections.append(TabularSection(link=f"sheet:{sheet_title}", text=csv_text))
        return sections

    if not name_lower.endswith((".csv", ".tsv")):
        raise ValueError(f"{file_name!r} is not a tabular file")

    try:
        decoded = file_io_to_text(file).strip()
    except Exception as e:
        # Best-effort: a csv/tsv we can't decode simply contributes no sections.
        logger.warning(f"Failed to decode {file_name}: {e}")
        return []
    if not decoded:
        return []
    return [TabularSection(link=link or file_name, text=decoded)]

View File

@@ -15,6 +15,10 @@ from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
)
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rate_limit_builder
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import rl_requests
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
is_tabular_file,
tabular_file_to_sections,
)
from onyx.connectors.drupal_wiki.models import DrupalWikiCheckpoint
from onyx.connectors.drupal_wiki.models import DrupalWikiPage
from onyx.connectors.drupal_wiki.models import DrupalWikiPageResponse
@@ -33,6 +37,7 @@ from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.extract_file_text import get_file_ext
@@ -226,7 +231,7 @@ class DrupalWikiConnector(
Tuple of (sections, error_message). If error_message is not None, the
sections list should be treated as invalid.
"""
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
try:
if not self._validate_attachment_filetype(attachment):
@@ -273,6 +278,25 @@ class DrupalWikiConnector(
return sections, None
# Tabular attachments (xlsx, csv, tsv) — produce
# TabularSections instead of a flat TextSection.
if is_tabular_file(file_name):
try:
sections.extend(
tabular_file_to_sections(
BytesIO(raw_bytes),
file_name=file_name,
link=download_url,
)
)
except Exception as e:
logger.warning(
f"Failed to extract tabular sections from {file_name}: {e}"
)
if not sections:
return [], f"No content extracted from tabular file {file_name}"
return sections, None
image_counter = 0
def _store_embedded_image(image_data: bytes, image_name: str) -> None:

View File

@@ -12,6 +12,10 @@ from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
process_onyx_metadata,
)
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
is_tabular_file,
tabular_file_to_sections,
)
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.models import Document
@@ -145,6 +149,39 @@ def _process_file(
logger.error(f"Failed to process image file {file_name}: {e}")
return []
# 1b) If the file is tabular (xlsx/csv/tsv), produce one
# TabularSection per sheet (or per file for csv/tsv) instead of
# flattening through the generic text extractor.
if is_tabular_file(file_name):
file.seek(0)
try:
tabular_sections = tabular_file_to_sections(
file=file,
file_name=file_name,
link=link or "",
)
except Exception as e:
logger.error(f"Failed to process tabular file {file_name}: {e}")
return []
if not tabular_sections:
logger.warning(f"No content extracted from tabular file {file_name}")
return []
return [
Document(
id=doc_id,
sections=list(tabular_sections),
source=source_type,
semantic_identifier=file_display_name,
title=title,
doc_updated_at=time_updated,
primary_owners=primary_owners,
secondary_owners=secondary_owners,
metadata=custom_tags,
)
]
# 2) Otherwise: text-based approach. Possibly with embedded images.
file.seek(0)

View File

@@ -1,3 +1,7 @@
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
is_tabular_file,
tabular_file_to_sections,
)
import io
from collections.abc import Callable
from datetime import datetime
@@ -28,15 +32,16 @@ from onyx.connectors.models import Document
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_processing.extract_file_text import get_file_ext
from onyx.file_processing.extract_file_text import pptx_to_text
from onyx.file_processing.extract_file_text import read_docx_file
from onyx.file_processing.extract_file_text import read_pdf_file
from onyx.file_processing.extract_file_text import xlsx_to_text
from onyx.file_processing.file_types import OnyxFileExtensions
from onyx.file_processing.file_types import OnyxMimeTypes
from onyx.file_processing.file_types import SPREADSHEET_MIME_TYPE
from onyx.file_processing.image_utils import store_image_and_create_section
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import (
@@ -289,7 +294,7 @@ def _download_and_extract_sections_basic(
service: GoogleDriveService,
allow_images: bool,
size_threshold: int,
) -> list[TextSection | ImageSection]:
) -> list[TextSection | ImageSection | TabularSection]:
"""Extract text and images from a Google Drive file."""
file_id = file["id"]
file_name = file["name"]
@@ -308,7 +313,7 @@ def _download_and_extract_sections_basic(
return []
# Store images for later processing
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
try:
section, embedded_id = store_image_and_create_section(
image_data=response_call(),
@@ -323,10 +328,9 @@ def _download_and_extract_sections_basic(
logger.error(f"Failed to process image {file_name}: {e}")
return sections
# For Google Docs, Sheets, and Slides, export as plain text
# For Google Docs, Sheets, and Slides, export via the Drive API
if mime_type in GOOGLE_MIME_TYPES_TO_EXPORT:
export_mime_type = GOOGLE_MIME_TYPES_TO_EXPORT[mime_type]
# Use the correct API call for exporting files
request = service.files().export_media(
fileId=file_id, mimeType=export_mime_type
)
@@ -335,6 +339,17 @@ def _download_and_extract_sections_basic(
logger.warning(f"Failed to export {file_name} as {export_mime_type}")
return []
if export_mime_type in OnyxMimeTypes.TABULAR_MIME_TYPES:
# Synthesize an extension on the filename
ext = ".xlsx" if export_mime_type == SPREADSHEET_MIME_TYPE else ".csv"
return list(
tabular_file_to_sections(
io.BytesIO(response),
file_name=f"{file_name}{ext}",
link=link,
)
)
text = response.decode("utf-8")
return [TextSection(link=link, text=text)]
@@ -356,9 +371,15 @@ def _download_and_extract_sections_basic(
elif (
mime_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
or is_tabular_file(file_name)
):
text = xlsx_to_text(io.BytesIO(response_call()), file_name=file_name)
return [TextSection(link=link, text=text)] if text else []
return list(
tabular_file_to_sections(
io.BytesIO(response_call()),
file_name=file_name,
link=link,
)
)
elif (
mime_type
@@ -410,8 +431,9 @@ def _find_nth(haystack: str, needle: str, n: int, start: int = 0) -> int:
def align_basic_advanced(
basic_sections: list[TextSection | ImageSection], adv_sections: list[TextSection]
) -> list[TextSection | ImageSection]:
basic_sections: list[TextSection | ImageSection | TabularSection],
adv_sections: list[TextSection],
) -> list[TextSection | ImageSection | TabularSection]:
"""Align the basic sections with the advanced sections.
In particular, the basic sections contain all content of the file,
including smart chips like dates and doc links. The advanced sections
@@ -428,7 +450,7 @@ def align_basic_advanced(
basic_full_text = "".join(
[section.text for section in basic_sections if isinstance(section, TextSection)]
)
new_sections: list[TextSection | ImageSection] = []
new_sections: list[TextSection | ImageSection | TabularSection] = []
heading_start = 0
for adv_ind in range(1, len(adv_sections)):
heading = adv_sections[adv_ind].text.split(HEADING_DELIMITER)[0]
@@ -599,7 +621,7 @@ def _convert_drive_item_to_document(
"""
Main entry point for converting a Google Drive file => Document object.
"""
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
# Only construct these services when needed
def _get_drive_service() -> GoogleDriveService:
@@ -639,7 +661,9 @@ def _convert_drive_item_to_document(
doc_id=file.get("id", ""),
)
if doc_sections:
sections = cast(list[TextSection | ImageSection], doc_sections)
sections = cast(
list[TextSection | ImageSection | TabularSection], doc_sections
)
if any(SMART_CHIP_CHAR in section.text for section in doc_sections):
logger.debug(
f"found smart chips in {file.get('name')}, aligning with basic sections"

View File

@@ -39,6 +39,7 @@ class SectionKind(str, Enum):
TEXT = "text"
IMAGE = "image"
TABULAR = "tabular"
class Section(BaseModel):
@@ -70,6 +71,15 @@ class ImageSection(Section):
return sys.getsizeof(self.image_file_id) + sys.getsizeof(self.link)
class TabularSection(Section):
    """Section containing tabular data (csv/tsv content, or one sheet of
    an xlsx workbook rendered as CSV)."""
    # Discriminator: routes this section to the tabular chunker in
    # DocumentChunker's SectionKind dispatch table.
    kind: SectionKind = SectionKind.TABULAR
    text: str  # CSV representation in a string
    # Required (unlike the optional links on other sections); for xlsx
    # sheets this is "sheet:<title>" rather than a URL.
    link: str
class BasicExpertInfo(BaseModel):
"""Basic Information for the owner of a document, any of the fields can be left as None
Display fallback goes as follows:
@@ -172,7 +182,7 @@ class DocumentBase(BaseModel):
"""Used for Onyx ingestion api, the ID is inferred before use if not provided"""
id: str | None = None
sections: list[TextSection | ImageSection]
sections: list[TextSection | ImageSection | TabularSection]
source: DocumentSource | None = None
semantic_identifier: str # displayed in the UI as the main identifier for the doc
# TODO(andrei): Ideally we could improve this to where each value is just a

View File

@@ -60,7 +60,12 @@ from onyx.connectors.models import ExternalAccess
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TabularSection
from onyx.connectors.models import TextSection
from onyx.connectors.cross_connector_utils.tabular_section_utils import (
is_tabular_file,
tabular_file_to_sections,
)
from onyx.connectors.sharepoint.connector_utils import get_sharepoint_external_access
from onyx.db.enums import HierarchyNodeType
from onyx.file_processing.extract_file_text import extract_text_and_images
@@ -586,7 +591,7 @@ def _convert_driveitem_to_document_with_permissions(
driveitem, f"Failed to download via graph api: {e}", e
)
sections: list[TextSection | ImageSection] = []
sections: list[TextSection | ImageSection | TabularSection] = []
file_ext = get_file_ext(driveitem.name)
if not content_bytes:
@@ -602,6 +607,19 @@ def _convert_driveitem_to_document_with_permissions(
)
image_section.link = driveitem.web_url
sections.append(image_section)
elif is_tabular_file(driveitem.name):
try:
sections.extend(
tabular_file_to_sections(
file=io.BytesIO(content_bytes),
file_name=driveitem.name,
link=driveitem.web_url or "",
)
)
except Exception as e:
logger.warning(
f"Failed to extract tabular sections for '{driveitem.name}': {e}"
)
else:
def _store_embedded_image(img_data: bytes, img_name: str) -> None:

View File

@@ -462,30 +462,13 @@ def _remove_empty_runs(
return result
def xlsx_sheet_extraction(file: IO[Any], file_name: str = "") -> list[tuple[str, str]]:
"""
Converts each sheet in the excel file to a csv condensed string.
Returns a string and the worksheet title for each worksheet
def xlsx_to_text(file: IO[Any], file_name: str = "") -> str:
# TODO: switch back to this approach in a few months when markitdown
# fixes their handling of excel files
# md = get_markitdown_converter()
# stream_info = StreamInfo(
# mimetype=SPREADSHEET_MIME_TYPE, filename=file_name or None, extension=".xlsx"
# )
# try:
# workbook = md.convert(to_bytesio(file), stream_info=stream_info)
# except (
# BadZipFile,
# ValueError,
# FileConversionException,
# UnsupportedFormatException,
# ) as e:
# error_str = f"Failed to extract text from {file_name or 'xlsx file'}: {e}"
# if file_name.startswith("~"):
# logger.debug(error_str + " (this is expected for files with ~)")
# else:
# logger.warning(error_str)
# return ""
# return workbook.markdown
Returns a list of (csv_text, sheet)
"""
try:
workbook = openpyxl.load_workbook(file, read_only=True)
except BadZipFile as e:
@@ -494,23 +477,30 @@ def xlsx_to_text(file: IO[Any], file_name: str = "") -> str:
logger.debug(error_str + " (this is expected for files with ~)")
else:
logger.warning(error_str)
return ""
return []
except Exception as e:
if any(s in str(e) for s in KNOWN_OPENPYXL_BUGS):
logger.error(
f"Failed to extract text from {file_name or 'xlsx file'}. This happens due to a bug in openpyxl. {e}"
)
return ""
return []
raise
text_content = []
sheets: list[tuple[str, str]] = []
for sheet in workbook.worksheets:
sheet_matrix = _clean_worksheet_matrix(_worksheet_to_matrix(sheet))
buf = io.StringIO()
writer = csv.writer(buf, lineterminator="\n")
writer.writerows(sheet_matrix)
text_content.append(buf.getvalue().rstrip("\n"))
return TEXT_SECTION_SEPARATOR.join(text_content)
csv_text = buf.getvalue().rstrip("\n")
if csv_text.strip():
sheets.append((csv_text, sheet.title))
return sheets
def xlsx_to_text(file: IO[Any], file_name: str = "") -> str:
    """Render an entire workbook as one flat string: each non-empty
    sheet's CSV text (from `xlsx_sheet_extraction`) joined with
    TEXT_SECTION_SEPARATOR."""
    per_sheet_csvs = (
        csv_text for csv_text, _sheet_title in xlsx_sheet_extraction(file, file_name)
    )
    return TEXT_SECTION_SEPARATOR.join(per_sheet_csvs)
def eml_to_text(file: IO[Any]) -> str:

View File

@@ -7,6 +7,7 @@ from onyx.indexing.chunking.image_section_chunker import ImageChunker
from onyx.indexing.chunking.section_chunker import AccumulatorState
from onyx.indexing.chunking.section_chunker import ChunkPayload
from onyx.indexing.chunking.section_chunker import SectionChunker
from onyx.indexing.chunking.tabular_section_chunker import TabularChunker
from onyx.indexing.chunking.text_section_chunker import TextChunker
from onyx.indexing.models import DocAwareChunk
from onyx.natural_language_processing.utils import BaseTokenizer
@@ -38,6 +39,7 @@ class DocumentChunker:
chunk_splitter=chunk_splitter,
),
SectionKind.IMAGE: ImageChunker(),
SectionKind.TABULAR: TabularChunker(tokenizer=tokenizer),
}
def chunk(

View File

@@ -0,0 +1,333 @@
import csv
import io
from collections.abc import Callable
from pydantic import BaseModel
from onyx.connectors.models import Section
from onyx.indexing.chunking.section_chunker import AccumulatorState
from onyx.indexing.chunking.section_chunker import ChunkPayload
from onyx.indexing.chunking.section_chunker import SectionChunker
from onyx.indexing.chunking.section_chunker import SectionChunkerOutput
from onyx.natural_language_processing.utils import BaseTokenizer
from onyx.utils.logger import setup_logger
from shared_configs.configs import STRICT_CHUNK_TOKEN_LIMIT
logger = setup_logger()
# --- Markers / separators used in emitted chunks --------------------------
ROWS_MARKER = "Rows:"
COLUMNS_MARKER = "Columns:"
FIELD_VALUE_SEPARATOR = ", "
ROW_JOIN = "\n"
# Minimum per-chunk row budget. Guards against a prelude so large that no
# row could possibly fit — keeps at least a token or two of headroom so
# the chunk still carries something.
_MIN_ROW_BUDGET_TOKENS = 16
# --- Parsing --------------------------------------------------------------
class _ParsedSection(BaseModel):
    """Structured view of one CSV-encoded tabular section: the header
    row plus the remaining data rows."""
    # Display label used in the chunk prelude; currently mirrors `link`
    # (see `_parse_section`).
    sheet_name: str
    # Carried through into every emitted ChunkPayload's links map.
    link: str
    # First non-empty CSV row.
    headers: list[str]
    # Remaining non-empty rows; may be shorter/longer than `headers`.
    rows: list[list[str]]
def _parse_section(section: Section) -> _ParsedSection | None:
    """Parse a CSV-encoded tabular section into headers + rows.

    The first non-empty row is treated as the header. Blank rows are
    skipped so stray separator lines don't produce ghost rows. A CSV
    with only a header row is still parseable (returns empty rows).
    Returns None when the section has no parseable content at all.
    """
    raw = section.text or ""
    if not raw.strip():
        return None
    rows = [
        parsed_row
        for parsed_row in csv.reader(io.StringIO(raw))
        if any(cell.strip() for cell in parsed_row)
    ]
    if not rows:
        return None
    header_row, *data_rows = rows
    section_link = section.link or ""
    return _ParsedSection(
        sheet_name=section_link,
        link=section_link,
        headers=header_row,
        rows=data_rows,
    )
# --- Step 1: FORMATTING ---------------------------------------------------
#
# Converts header + row → a single formatted string. Swap these out to
# change the textual representation of rows in chunks (e.g. JSON-line,
# bullet-list, markdown table row, etc.) without touching packing.
def format_columns_header(headers: list[str]) -> str:
    """Format the 'Columns:' line that appears in every chunk's prelude."""
    joined_headers = FIELD_VALUE_SEPARATOR.join(headers)
    return f"{COLUMNS_MARKER} {joined_headers}"
def format_row_field_value(headers: list[str], row: list[str]) -> str:
    """Format one row as ``col=val, col=val, ...``.

    - Missing trailing cells (row shorter than headers) are treated as empty.
    - Extra trailing cells (row longer than headers) are ignored.
    - Empty values are dropped; omitting them keeps chunks dense with
      retrieval-relevant content rather than padded with ``col=``.
    """
    # Pad so zip covers every header; a negative pad count is a no-op.
    padded_row = row + [""] * (len(headers) - len(row))
    pairs = [
        f"{header}={value}"
        for header, value in zip(headers, padded_row)
        if value.strip()
    ]
    return FIELD_VALUE_SEPARATOR.join(pairs)
# --- Step 2: PACKING ------------------------------------------------------
#
# Given formatted row strings + a prelude + a token budget, emit a list of
# chunk strings that each fit within the budget. Swap this out to change
# the packing strategy (e.g. one-row-per-chunk, fixed-row-count, etc.)
# without touching formatting.
class _RowPacker:
    """Packs formatted rows into chunks under a token limit.

    Each emitted chunk looks like::

        <prelude>
        <row 1>
        <row 2>
        ...

    The prelude is repeated at the top of every chunk so each chunk is
    self-describing for downstream retrieval.
    """
    def __init__(
        self,
        prelude: str,
        token_counter: Callable[[str], int],
        max_tokens: int,
        strict: bool,
    ) -> None:
        """Args:
            prelude: text repeated at the top of every emitted chunk.
            token_counter: maps a string to its token count.
            max_tokens: per-chunk token budget, prelude included.
            strict: when True, pieces still over budget after field-level
                splitting are hard-split by characters.
        """
        self.prelude = prelude
        self.token_counter = token_counter
        self.max_tokens = max_tokens
        self.strict = strict
        prelude_tokens = token_counter(prelude)
        # Budget for the rows alone, reserving room for the prelude plus
        # the newline that joins it to the row block.
        # NOTE(review): when the prelude alone exceeds max_tokens, the
        # _MIN_ROW_BUDGET_TOKENS floor means chunks can exceed max_tokens.
        self._row_budget = max(
            _MIN_ROW_BUDGET_TOKENS, max_tokens - prelude_tokens - 1
        )
    def pack(self, rows: list[str]) -> list[str]:
        """Greedily pack `rows` (in order) into chunk strings, each
        prefixed with the prelude; empty row strings are skipped."""
        chunks: list[str] = []
        buf: list[str] = []
        buf_tokens = 0
        for row in rows:
            if not row:
                continue
            row_tokens = self.token_counter(row)
            # Row that won't fit its own chunk: flush, split, emit each
            # piece as a standalone chunk.
            if row_tokens > self._row_budget:
                if buf:
                    chunks.append(self._assemble(buf))
                    buf, buf_tokens = [], 0
                for piece in self._split_oversized_row(row):
                    chunks.append(self._assemble([piece]))
                continue
            # +1 accounts for the newline separating rows in the buffer.
            sep_tokens = 1 if buf else 0
            if buf and buf_tokens + sep_tokens + row_tokens > self._row_budget:
                chunks.append(self._assemble(buf))
                buf, buf_tokens = [], 0
                sep_tokens = 0
            buf.append(row)
            buf_tokens += sep_tokens + row_tokens
        if buf:
            chunks.append(self._assemble(buf))
        return chunks
    def _assemble(self, rows: list[str]) -> str:
        # prelude, then the newline-joined row block.
        return self.prelude + ROW_JOIN + ROW_JOIN.join(rows)
    def _split_oversized_row(self, row: str) -> list[str]:
        """Split a single over-budget row.

        First pass splits at ``field=value`` boundaries to preserve the
        column-level structure. If ``strict`` is set and any resulting
        piece is still over budget, fall back to a hard character-level
        split so no chunk ever exceeds ``max_tokens``.
        """
        pieces = _split_by_field_boundary(
            row, self._row_budget, self.token_counter
        )
        if not self.strict:
            return pieces
        out: list[str] = []
        for piece in pieces:
            if self.token_counter(piece) > self._row_budget:
                out.extend(_hard_split_by_chars(piece, self._row_budget, self.token_counter))
            else:
                out.append(piece)
        return out
def _split_by_field_boundary(
    row: str,
    max_tokens: int,
    token_counter: Callable[[str], int],
) -> list[str]:
    """Greedy split of a ``col=val, col=val, ...`` row at ``, `` boundaries.

    A single field larger than `max_tokens` still becomes its own piece —
    this pass never cuts inside a field (the strict caller hard-splits
    such pieces afterwards).
    """
    sep_cost = token_counter(FIELD_VALUE_SEPARATOR)
    pieces: list[str] = []
    current: list[str] = []
    current_tokens = 0
    for field in row.split(FIELD_VALUE_SEPARATOR):
        field_tokens = token_counter(field)
        join_cost = sep_cost if current else 0
        if current and current_tokens + join_cost + field_tokens > max_tokens:
            # Adding this field would overflow: close out the piece.
            pieces.append(FIELD_VALUE_SEPARATOR.join(current))
            current = [field]
            current_tokens = field_tokens
        else:
            current.append(field)
            current_tokens += join_cost + field_tokens
    if current:
        pieces.append(FIELD_VALUE_SEPARATOR.join(current))
    return pieces
def _hard_split_by_chars(
text: str,
max_tokens: int,
token_counter: Callable[[str], int],
) -> list[str]:
"""Last-resort character split when field-level splitting can't
reduce a piece below ``max_tokens`` (e.g. a single field contains a
giant value). Approximates via chars-per-token from the input string
itself, then slices."""
total_tokens = max(1, token_counter(text))
approx_chars_per_token = max(1, len(text) // total_tokens)
window = max(1, max_tokens * approx_chars_per_token)
return [text[i : i + window] for i in range(0, len(text), window)]
# --- Step 3: ORCHESTRATION ------------------------------------------------
class TabularChunker(SectionChunker):
    """Chunks tabular sections (csv text) into row-packed field=value chunks.

    Each emitted chunk carries a prelude (sheet name + Rows: marker +
    Columns: header line) followed by as many ``col=val, col=val``
    rows as fit under ``content_token_limit``. Rows too large for a
    single chunk are split at field boundaries (and, under
    ``STRICT_CHUNK_TOKEN_LIMIT``, hard-split by characters as a fallback).
    """
    def __init__(self, tokenizer: BaseTokenizer) -> None:
        # Tokenizer is used solely for budget accounting (_count_tokens).
        self.tokenizer = tokenizer
    def chunk_section(
        self,
        section: Section,
        accumulator: AccumulatorState,
        content_token_limit: int,
    ) -> SectionChunkerOutput:
        """Turn one tabular section into zero or more ChunkPayloads.

        Returns the flushed accumulator payloads followed by this
        section's own chunks. On any successful parse the returned
        accumulator is a fresh AccumulatorState; only the unparseable
        path hands the caller's accumulator back untouched.
        """
        assert section.text is not None
        parsed = _parse_section(section)
        if parsed is None:
            # Nothing parseable: emit no payloads, keep caller's buffer.
            logger.warning(
                f"TabularChunker: skipping unparseable section (link={section.link})"
            )
            return SectionChunkerOutput(payloads=[], accumulator=accumulator)
        # Tabular sections are structurally standalone — flush any pending
        # text buffer before emitting our own chunks, matching ImageChunker.
        payloads = accumulator.flush_to_list()
        prelude = self._build_prelude(parsed)
        # Rows whose formatted form is empty (all cells blank) are dropped.
        formatted_rows = [
            line
            for line in (
                format_row_field_value(parsed.headers, row)
                for row in parsed.rows
            )
            if line
        ]
        # Header-only table (no non-empty rows): emit a single
        # prelude-only chunk so the column schema is still indexed.
        if not formatted_rows:
            payloads.append(
                ChunkPayload(
                    text=prelude,
                    links={0: parsed.link},
                    is_continuation=False,
                )
            )
            return SectionChunkerOutput(
                payloads=payloads,
                accumulator=AccumulatorState(),
            )
        packer = _RowPacker(
            prelude=prelude,
            token_counter=self._count_tokens,
            max_tokens=content_token_limit,
            strict=STRICT_CHUNK_TOKEN_LIMIT,
        )
        chunk_texts = packer.pack(formatted_rows)
        for i, text in enumerate(chunk_texts):
            payloads.append(
                ChunkPayload(
                    text=text,
                    links={0: parsed.link},
                    # Only the first chunk of the section is "fresh"; the
                    # rest are continuations of the same table.
                    is_continuation=(i != 0),
                )
            )
        return SectionChunkerOutput(
            payloads=payloads,
            accumulator=AccumulatorState(),
        )
    def _build_prelude(self, parsed: _ParsedSection) -> str:
        """The per-chunk header: sheet name (if any) + ``Rows:`` marker
        + ``Columns:`` header line. Swap this to change the prelude shape."""
        parts: list[str] = []
        if parsed.sheet_name:
            parts.append(parsed.sheet_name)
        parts.append(ROWS_MARKER)
        parts.append(format_columns_header(parsed.headers))
        return ROW_JOIN.join(parts)
    def _count_tokens(self, text: str) -> int:
        # Single choke point for token counting so the packer and the
        # chunker always agree on budgets.
        return len(self.tokenizer.encode(text))

View File

@@ -4,6 +4,7 @@ from typing import cast
import openpyxl
from openpyxl.worksheet.worksheet import Worksheet
from onyx.file_processing.extract_file_text import xlsx_sheet_extraction
from onyx.file_processing.extract_file_text import xlsx_to_text
@@ -196,3 +197,136 @@ class TestXlsxToText:
assert "r1c1" in lines[0] and "r1c2" in lines[0]
assert "r2c1" in lines[1] and "r2c2" in lines[1]
assert "r3c1" in lines[2] and "r3c2" in lines[2]
class TestXlsxSheetExtraction:
    """Unit tests for `xlsx_sheet_extraction`: it must return one
    (csv_text, sheet_title) tuple per non-empty sheet, in workbook
    order, and degrade to [] on unreadable input."""
    def test_one_tuple_per_sheet(self) -> None:
        xlsx = _make_xlsx(
            {
                "Revenue": [["Month", "Amount"], ["Jan", "100"]],
                "Expenses": [["Category", "Cost"], ["Rent", "500"]],
            }
        )
        sheets = xlsx_sheet_extraction(xlsx)
        assert len(sheets) == 2
        # Order preserved from workbook sheet order
        titles = [title for _csv, title in sheets]
        assert titles == ["Revenue", "Expenses"]
        # Content present in the right tuple
        revenue_csv, _ = sheets[0]
        expenses_csv, _ = sheets[1]
        assert "Month" in revenue_csv
        assert "Jan" in revenue_csv
        assert "Category" in expenses_csv
        assert "Rent" in expenses_csv
    def test_tuple_structure_is_csv_text_then_title(self) -> None:
        """The tuple order is (csv_text, sheet_title) — pin it so callers
        that unpack positionally don't silently break."""
        xlsx = _make_xlsx({"MySheet": [["a", "b"]]})
        sheets = xlsx_sheet_extraction(xlsx)
        assert len(sheets) == 1
        csv_text, title = sheets[0]
        assert title == "MySheet"
        assert "a" in csv_text
        assert "b" in csv_text
    def test_empty_sheet_is_skipped(self) -> None:
        """A sheet whose CSV output is empty/whitespace-only should NOT
        appear in the result — the `if csv_text.strip():` guard filters
        it out."""
        xlsx = _make_xlsx(
            {
                "Data": [["a", "b"]],
                "Empty": [],
            }
        )
        sheets = xlsx_sheet_extraction(xlsx)
        assert len(sheets) == 1
        assert sheets[0][1] == "Data"
    def test_empty_workbook_returns_empty_list(self) -> None:
        """All sheets empty → empty list (not a list of empty tuples)."""
        xlsx = _make_xlsx({"Sheet1": [], "Sheet2": []})
        sheets = xlsx_sheet_extraction(xlsx)
        assert sheets == []
    def test_single_sheet(self) -> None:
        xlsx = _make_xlsx({"Only": [["x", "y"], ["1", "2"]]})
        sheets = xlsx_sheet_extraction(xlsx)
        assert len(sheets) == 1
        csv_text, title = sheets[0]
        assert title == "Only"
        assert "x" in csv_text
        assert "1" in csv_text
    def test_bad_zip_returns_empty_list(self) -> None:
        # Garbage bytes (not an OOXML zip) must be swallowed, not raised.
        bad_file = io.BytesIO(b"not a zip file")
        sheets = xlsx_sheet_extraction(bad_file, file_name="test.xlsx")
        assert sheets == []
    def test_bad_zip_tilde_file_returns_empty_list(self) -> None:
        """`~$`-prefixed files are Excel lock files; failure should log
        at debug (not warning) and still return []."""
        bad_file = io.BytesIO(b"not a zip file")
        sheets = xlsx_sheet_extraction(bad_file, file_name="~$temp.xlsx")
        assert sheets == []
    def test_csv_content_matches_xlsx_to_text_per_sheet(self) -> None:
        """For a single-sheet workbook, xlsx_to_text output should equal
        the csv_text from xlsx_sheet_extraction — they share the same
        per-sheet CSV-ification logic."""
        single_sheet_data = [["Name", "Age"], ["Alice", "30"]]
        expected_text = xlsx_to_text(_make_xlsx({"People": single_sheet_data}))
        sheets = xlsx_sheet_extraction(_make_xlsx({"People": single_sheet_data}))
        assert len(sheets) == 1
        csv_text, title = sheets[0]
        assert title == "People"
        assert csv_text.strip() == expected_text.strip()
    def test_commas_in_cells_are_quoted(self) -> None:
        # csv.writer must quote cells containing the delimiter.
        xlsx = _make_xlsx({"S1": [["hello, world", "normal"]]})
        sheets = xlsx_sheet_extraction(xlsx)
        assert len(sheets) == 1
        csv_text, _ = sheets[0]
        assert '"hello, world"' in csv_text
    def test_long_empty_row_run_capped_within_sheet(self) -> None:
        """The matrix cleanup applies per-sheet: >2 empty rows collapse
        to 2, which keeps the sheet non-empty and it still appears in
        the result."""
        xlsx = _make_xlsx(
            {
                "S1": [
                    ["header"],
                    [""],
                    [""],
                    [""],
                    [""],
                    ["data"],
                ]
            }
        )
        sheets = xlsx_sheet_extraction(xlsx)
        assert len(sheets) == 1
        csv_text, _ = sheets[0]
        lines = [line for line in csv_text.strip().split("\n") if line.strip()]
        # header + 2 empty (capped) + data = 4 lines
        assert len(lines) == 4
        assert "header" in lines[0]
        assert "data" in lines[-1]
    def test_sheet_title_with_special_chars_preserved(self) -> None:
        """Spaces, punctuation, unicode in sheet titles are preserved
        verbatim — the title is used as a link anchor downstream."""
        xlsx = _make_xlsx(
            {
                "Q1 Revenue (USD)": [["a", "b"]],
                "Données": [["c", "d"]],
            }
        )
        sheets = xlsx_sheet_extraction(xlsx)
        titles = [title for _csv, title in sheets]
        assert "Q1 Revenue (USD)" in titles
        assert "Données" in titles

View File

@@ -0,0 +1,312 @@
"""End-to-end tests for `TabularChunker.chunk_section`.
Each test is structured as:
INPUT — the CSV text passed to the chunker + token budget + link
EXPECTED — the exact chunk texts the chunker should emit
ACT — a single call to `chunk_section`
ASSERT — literal equality against the expected chunk texts
A character-level tokenizer (1 char == 1 token) is used so token-budget
arithmetic is deterministic and expected chunks can be spelled out
exactly.
"""
from onyx.connectors.models import Section
from onyx.connectors.models import SectionKind
from onyx.indexing.chunking.section_chunker import AccumulatorState
from onyx.indexing.chunking.tabular_section_chunker import TabularChunker
from onyx.natural_language_processing.utils import BaseTokenizer
class CharTokenizer(BaseTokenizer):
    """Character-level tokenizer: 1 char == 1 token, so the tests'
    token-budget arithmetic is fully deterministic."""

    def encode(self, string: str) -> list[int]:
        # Each character's code point stands in for a token id.
        return [ord(ch) for ch in string]

    def tokenize(self, string: str) -> list[str]:
        return [ch for ch in string]

    def decode(self, tokens: list[int]) -> str:
        return "".join(map(chr, tokens))
def _make_chunker() -> TabularChunker:
    """Build a TabularChunker wired to the deterministic char tokenizer."""
    tokenizer = CharTokenizer()
    return TabularChunker(tokenizer=tokenizer)
def _tabular_section(text: str, link: str = "sheet:Test") -> Section:
    """Wrap raw CSV text in a TABULAR-kind Section carrying the given link."""
    return Section(text=text, link=link, kind=SectionKind.TABULAR)
class TestTabularChunkerChunkSection:
def test_simple_csv_all_rows_fit_one_chunk(self) -> None:
    # INPUT: a small three-column CSV that fits in a single chunk.
    sheet_link = "sheet:People"
    source_csv = "Name,Age,City\nAlice,30,NYC\nBob,25,SF\n"
    # EXPECTED: one chunk = prelude + every row, not a continuation.
    expected_texts = [
        (
            "sheet:People\n"
            "Rows:\n"
            "Columns: Name, Age, City\n"
            "Name=Alice, Age=30, City=NYC\n"
            "Name=Bob, Age=25, City=SF"
        ),
    ]
    # ACT
    result = _make_chunker().chunk_section(
        _tabular_section(source_csv, link=sheet_link),
        AccumulatorState(),
        content_token_limit=500,
    )
    # ASSERT
    assert [payload.text for payload in result.payloads] == expected_texts
    assert [payload.is_continuation for payload in result.payloads] == [False]
    assert all(payload.links == {0: sheet_link} for payload in result.payloads)
    assert result.accumulator.is_empty()
def test_overflow_splits_into_two_deterministic_chunks(self) -> None:
    # INPUT:
    # prelude = "sheet:S\nRows:\nColumns: col, val" (31 chars = 31 tokens)
    # At content_token_limit=57, row_budget = max(16, 57-31-1) = 25.
    # Each row "col=a, val=1" is 12 tokens; two rows + \n = 25 (fits),
    # three rows + 2×\n = 38 (overflows) → split after 2 rows.
    sheet_link = "sheet:S"
    source_csv = "col,val\na,1\nb,2\nc,3\nd,4\n"
    # EXPECTED: two chunks, each re-stating prelude + two rows.
    expected_texts = [
        (
            "sheet:S\n"
            "Rows:\n"
            "Columns: col, val\n"
            "col=a, val=1\n"
            "col=b, val=2"
        ),
        (
            "sheet:S\n"
            "Rows:\n"
            "Columns: col, val\n"
            "col=c, val=3\n"
            "col=d, val=4"
        ),
    ]
    # ACT
    result = _make_chunker().chunk_section(
        _tabular_section(source_csv, link=sheet_link),
        AccumulatorState(),
        content_token_limit=57,
    )
    # ASSERT
    assert [payload.text for payload in result.payloads] == expected_texts
    # Only the first chunk is fresh; later chunks are continuations.
    assert [payload.is_continuation for payload in result.payloads] == [False, True]
    # Every chunk carries the section link.
    assert all(payload.links == {0: sheet_link} for payload in result.payloads)
def test_header_only_csv_produces_single_prelude_chunk(self) -> None:
    # A header row with no data rows yields a lone prelude chunk.
    result = _make_chunker().chunk_section(
        _tabular_section("col1,col2\n", link="sheet:Headers"),
        AccumulatorState(),
        content_token_limit=500,
    )
    assert [payload.text for payload in result.payloads] == [
        "sheet:Headers\nRows:\nColumns: col1, col2",
    ]
def test_empty_cells_dropped_from_chunk_text(self) -> None:
    # Alice's Age is empty; Bob's City is empty. Empty cells must not
    # appear as bare `field=` pairs in the output.
    source_csv = "Name,Age,City\nAlice,,NYC\nBob,25,\n"
    expected_texts = [
        (
            "sheet:P\n"
            "Rows:\n"
            "Columns: Name, Age, City\n"
            "Name=Alice, City=NYC\n"
            "Name=Bob, Age=25"
        ),
    ]
    result = _make_chunker().chunk_section(
        _tabular_section(source_csv, link="sheet:P"),
        AccumulatorState(),
        content_token_limit=500,
    )
    assert [payload.text for payload in result.payloads] == expected_texts
def test_quoted_commas_in_csv_preserved_as_one_field(self) -> None:
    # "Hello, world" is quoted in the CSV, so it stays a single field
    # value containing a comma — not two cells.
    source_csv = 'Name,Notes\nAlice,"Hello, world"\n'
    expected_texts = [
        (
            "sheet:P\n"
            "Rows:\n"
            "Columns: Name, Notes\n"
            "Name=Alice, Notes=Hello, world"
        ),
    ]
    result = _make_chunker().chunk_section(
        _tabular_section(source_csv, link="sheet:P"),
        AccumulatorState(),
        content_token_limit=500,
    )
    assert [payload.text for payload in result.payloads] == expected_texts
def test_blank_rows_in_csv_are_skipped(self) -> None:
    # Stray blank rows (e.g. export artifacts) must not become ghost
    # rows in the output.
    source_csv = "A,B\n\n1,2\n\n\n3,4\n"
    expected_texts = [
        (
            "sheet:S\n"
            "Rows:\n"
            "Columns: A, B\n"
            "A=1, B=2\n"
            "A=3, B=4"
        ),
    ]
    result = _make_chunker().chunk_section(
        _tabular_section(source_csv, link="sheet:S"),
        AccumulatorState(),
        content_token_limit=500,
    )
    assert [payload.text for payload in result.payloads] == expected_texts
def test_accumulator_flushes_before_tabular_chunks(self) -> None:
    # Pending text from a prior text section must be flushed as its
    # own chunk before the tabular content — tabular sections are
    # structural boundaries.
    carried_text = "prior paragraph from an earlier text section"
    carried_link = "prev-link"
    sheet_link = "sheet:S"
    source_csv = "a,b\n1,2\n"
    expected_texts = [
        carried_text,  # flushed accumulator comes first
        (
            "sheet:S\n"
            "Rows:\n"
            "Columns: a, b\n"
            "a=1, b=2"
        ),
    ]
    result = _make_chunker().chunk_section(
        _tabular_section(source_csv, link=sheet_link),
        AccumulatorState(
            text=carried_text,
            link_offsets={0: carried_link},
        ),
        content_token_limit=500,
    )
    assert [payload.text for payload in result.payloads] == expected_texts
    # Flushed chunk keeps the prior text's link; the tabular chunk
    # carries the tabular section's own link.
    assert result.payloads[0].links == {0: carried_link}
    assert result.payloads[1].links == {0: sheet_link}
    # The structural boundary resets the accumulator.
    assert result.accumulator.is_empty()
def test_empty_tabular_section_returns_no_payloads_and_preserves_accumulator(
    self,
) -> None:
    # A malformed/empty tabular section must not flush the text
    # accumulator — the caller (DocumentChunker) handles skip logic,
    # so the accumulator is preserved for subsequent sections.
    carried_text = "prior paragraph"
    carried_offsets = {0: "prev-link"}
    result = _make_chunker().chunk_section(
        _tabular_section("", link="sheet:Empty"),
        AccumulatorState(
            text=carried_text,
            link_offsets=carried_offsets,
        ),
        content_token_limit=500,
    )
    assert [payload.text for payload in result.payloads] == []
    assert result.accumulator.text == carried_text
    assert result.accumulator.link_offsets == carried_offsets