Compare commits

...

4 Commits

Author      SHA1         Message                             Date
Evan Lohn   4a24fb539b   more meta                           2025-09-18 15:05:54 -07:00
Evan Lohn   e09fa3c2b2   file names and extensions etc       2025-09-18 15:00:34 -07:00
Evan Lohn   7a24d65e2a   even more specific logs             2025-09-18 13:01:02 -07:00
Evan Lohn   d6786f1578   sharepoint memory issue debugging   2025-09-17 13:32:15 -07:00
6 changed files with 583 additions and 47 deletions

View File

@@ -100,6 +100,7 @@ from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.redis.redis_utils import is_fence
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.utils.logger import setup_logger
from onyx.utils.memory_logger import log_memory_usage
from onyx.utils.middleware import make_randomized_onyx_request_id
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType
@@ -1279,6 +1280,10 @@ def _docprocessing_task(
f"batch_num={batch_num} "
)
log_memory_usage(
f"docprocessing_task:start:batch_{batch_num}:attempt_{index_attempt_id}"
)
# Get the document batch storage
storage = get_document_batch_storage(cc_pair_id, index_attempt_id)
@@ -1297,7 +1302,16 @@ def _docprocessing_task(
per_batch_lock: RedisLock | None = None
try:
# Retrieve documents from storage
log_memory_usage(
f"docprocessing_task:before_load_batch_{batch_num}:attempt_{index_attempt_id}"
)
documents = storage.get_batch(batch_num)
log_memory_usage(
f"docprocessing_task:after_load_batch_{batch_num}:attempt_{index_attempt_id}",
documents,
f"batch_{batch_num}_documents",
)
if not documents:
task_logger.error(f"No documents found for batch {batch_num}")
return
@@ -1369,6 +1383,12 @@ def _docprocessing_task(
f"Processing {len(documents)} documents through indexing pipeline"
)
log_memory_usage(
f"docprocessing_task:before_indexing_pipeline:batch_{batch_num}:attempt_{index_attempt_id}",
documents,
f"batch_{batch_num}_documents_before_pipeline",
)
# real work happens here!
index_pipeline_result = run_indexing_pipeline(
embedder=embedding_model,
@@ -1381,6 +1401,12 @@ def _docprocessing_task(
index_attempt_metadata=index_attempt_metadata,
)
log_memory_usage(
f"docprocessing_task:after_indexing_pipeline:batch_{batch_num}:attempt_{index_attempt_id}",
index_pipeline_result,
f"batch_{batch_num}_pipeline_result",
)
# Update batch completion and document counts atomically using database coordination
with get_session_with_current_tenant() as db_session, cross_batch_db_lock:
@@ -1458,7 +1484,14 @@ def _docprocessing_task(
f"elapsed={elapsed_time:.2f}s"
)
log_memory_usage(
f"docprocessing_task:completed:batch_{batch_num}:attempt_{index_attempt_id}"
)
except Exception:
log_memory_usage(
f"docprocessing_task:exception:batch_{batch_num}:attempt_{index_attempt_id}"
)
task_logger.exception(
f"Document batch processing failed: "
f"batch_num={batch_num} "

View File

@@ -60,6 +60,7 @@ from onyx.file_processing.extract_file_text import get_file_ext
from onyx.file_processing.file_validation import EXCLUDED_IMAGE_TYPES
from onyx.file_processing.image_utils import store_image_and_create_section
from onyx.utils.logger import setup_logger
from onyx.utils.memory_logger import log_memory_usage
logger = setup_logger()
SLIM_BATCH_SIZE = 1000
@@ -247,6 +248,7 @@ def _download_with_cap(url: str, timeout: int, cap: int) -> bytes:
- Raises `SizeCapExceeded` when the cap would be exceeded.
- Returns the full bytes if the content fits within `cap`.
"""
log_memory_usage("_download_with_cap:start", url, "download_url")
with requests.get(url, stream=True, timeout=timeout) as resp:
resp.raise_for_status()
@@ -273,7 +275,9 @@ def _download_with_cap(url: str, timeout: int, cap: int) -> bytes:
)
raise SizeCapExceeded("during_download")
return buf.getvalue()
content_bytes = buf.getvalue()
log_memory_usage("_download_with_cap:end", content_bytes, "downloaded_content")
return content_bytes
def _download_via_sdk_with_cap(
@@ -283,6 +287,7 @@ def _download_via_sdk_with_cap(
Raises SizeCapExceeded("during_sdk_download") if the cap would be exceeded.
"""
log_memory_usage("_download_via_sdk_with_cap:start", driveitem.id, "driveitem_id")
buf = io.BytesIO()
def on_chunk(bytes_read: int) -> None:
@@ -294,7 +299,11 @@ def _download_via_sdk_with_cap(
driveitem.download_session(buf, chunk_downloaded=on_chunk, chunk_size=chunk_size)
# Execute the configured request with retries using existing helper
sleep_and_retry(driveitem.context, "download_session")
return buf.getvalue()
content_bytes = buf.getvalue()
log_memory_usage(
"_download_via_sdk_with_cap:end", content_bytes, "sdk_downloaded_content"
)
return content_bytes
def _convert_driveitem_to_document_with_permissions(
@@ -304,6 +313,11 @@ def _convert_driveitem_to_document_with_permissions(
graph_client: GraphClient,
include_permissions: bool = False,
) -> Document | None:
log_memory_usage(
f"_convert_driveitem_to_document_with_permissions:start:{drive_name}:{driveitem.name}:{driveitem.resource_url}",
driveitem.id,
"driveitem_id",
)
if not driveitem.name or not driveitem.id:
raise ValueError("DriveItem name/id is required")
@@ -345,6 +359,11 @@ def _convert_driveitem_to_document_with_permissions(
# Prefer downloadUrl streaming with size cap
content_bytes: bytes | None = None
log_memory_usage(
"_convert_driveitem_to_document_with_permissions:before_download",
driveitem.id,
"driveitem_id",
)
if download_url:
try:
# Use this to test the sdk size cap
@@ -364,6 +383,11 @@ def _convert_driveitem_to_document_with_permissions(
)
# Fallback to SDK content if needed
log_memory_usage(
"_convert_driveitem_to_document_with_permissions:after_primary_download",
content_bytes,
"content_bytes",
)
if content_bytes is None:
try:
content_bytes = _download_via_sdk_with_cap(
@@ -404,11 +428,21 @@ def _convert_driveitem_to_document_with_permissions(
image_section.link = driveitem.web_url
sections.append(image_section)
log_memory_usage(
"_convert_driveitem_to_document_with_permissions:before_text_extraction",
content_bytes,
"content_bytes",
)
extraction_result = extract_text_and_images(
file=io.BytesIO(content_bytes),
file_name=driveitem.name,
image_callback=_store_embedded_image,
)
log_memory_usage(
"_convert_driveitem_to_document_with_permissions:after_text_extraction",
extraction_result,
"extraction_result",
)
if extraction_result.text_content:
sections.append(
TextSection(link=driveitem.web_url, text=extraction_result.text_content)
@@ -447,6 +481,9 @@ def _convert_driveitem_to_document_with_permissions(
],
metadata={"drive": drive_name},
)
log_memory_usage(
"_convert_driveitem_to_document_with_permissions:end", doc, "final_document"
)
return doc
@@ -458,6 +495,9 @@ def _convert_sitepage_to_document(
include_permissions: bool = False,
) -> Document:
"""Convert a SharePoint site page to a Document object."""
log_memory_usage(
"_convert_sitepage_to_document:start", site_page.get("id"), "site_page_id"
)
# Extract text content from the site page
page_text = ""
# Get title and description
@@ -474,11 +514,32 @@ def _convert_sitepage_to_document(
canvas_layout = site_page.get("canvasLayout", {})
if canvas_layout:
horizontal_sections = canvas_layout.get("horizontalSections", [])
for section in horizontal_sections:
log_memory_usage(
"_convert_sitepage_to_document:processing_sections",
horizontal_sections,
"horizontal_sections",
)
for section_idx, section in enumerate(horizontal_sections):
columns = section.get("columns", [])
for column in columns:
log_memory_usage(
f"_convert_sitepage_to_document:processing_columns_{section_idx}",
columns,
"columns",
)
for column_idx, column in enumerate(columns):
webparts = column.get("webparts", [])
for webpart in webparts:
log_memory_usage(
f"_convert_sitepage_to_document:processing_webparts_{section_idx}_{column_idx}",
webparts,
"webparts",
)
for webpart_idx, webpart in enumerate(webparts):
if webpart_idx % 5 == 0: # Log every 5 webparts
log_memory_usage(
f"_convert_sitepage_to_document:processing_webpart_{section_idx}_{column_idx}_{webpart_idx}",
page_text,
"accumulated_page_text",
)
# Extract text from different types of webparts
webpart_type = webpart.get("@odata.type", "")
@@ -517,7 +578,12 @@ def _convert_sitepage_to_document(
"searchablePlainTexts", []
)
for text_item in searchable_texts:
log_memory_usage(
"_convert_sitepage_to_document:processing_searchable_texts",
searchable_texts,
"searchable_texts",
)
for text_idx, text_item in enumerate(searchable_texts):
if isinstance(text_item, dict):
key = text_item.get("key", "")
value = text_item.get("value", "")
@@ -527,6 +593,12 @@ def _convert_sitepage_to_document(
page_text += f"## {value}\n\n"
else:
page_text += f"{value}\n\n"
if text_idx % 10 == 0: # Log every 10 text items
log_memory_usage(
f"_convert_sitepage_to_document:processed_text_{text_idx}",
page_text,
"accumulated_text",
)
# Extract description if available
description = data.get("description", "")
@@ -605,6 +677,9 @@ def _convert_sitepage_to_document(
else {}
),
)
log_memory_usage(
"_convert_sitepage_to_document:end", doc, "final_site_page_document"
)
return doc
@@ -735,6 +810,11 @@ class SharepointConnector(
start: datetime | None = None,
end: datetime | None = None,
) -> list[DriveItem]:
log_memory_usage(
f"_get_drive_items_for_drive_name:start:{drive_name}",
site_descriptor.url,
"site_url",
)
try:
site = self.graph_client.sites.get_by_url(site_descriptor.url)
drives = site.drives.get().execute_query()
@@ -757,11 +837,19 @@ class SharepointConnector(
root_folder = root_folder.get_by_path(folder_part)
# TODO: consider ways to avoid materializing the entire list of files in memory
log_memory_usage(
f"_get_drive_items_for_drive_name:before_query:{drive_name}"
)
query = root_folder.get_files(
recursive=True,
page_size=1000,
)
driveitems = query.execute_query()
log_memory_usage(
f"_get_drive_items_for_drive_name:after_query:{drive_name}",
driveitems,
"driveitems_list",
)
logger.debug(f"Found {len(driveitems)} items in drive '{drive_name}'")
# Filter items based on folder path if specified
@@ -805,7 +893,13 @@ class SharepointConnector(
f"Found {len(driveitems)} items within time window in drive '{drive.name}'"
)
return list(driveitems)
final_driveitems = list(driveitems)
log_memory_usage(
f"_get_drive_items_for_drive_name:end:{drive_name}",
final_driveitems,
"final_driveitems",
)
return final_driveitems
except Exception as e:
# Some drives might not be accessible
@@ -832,6 +926,7 @@ class SharepointConnector(
start: datetime | None = None,
end: datetime | None = None,
) -> list[tuple[DriveItem, str]]:
log_memory_usage("_fetch_driveitems:start", site_descriptor.url, "site_url")
final_driveitems: list[tuple[DriveItem, str]] = []
try:
site = self.graph_client.sites.get_by_url(site_descriptor.url)
@@ -856,21 +951,44 @@ class SharepointConnector(
return []
# Process each matching drive
for drive in drives:
log_memory_usage(
"_fetch_driveitems:before_drives_loop", drives, "drives_to_process"
)
for drive_idx, drive in enumerate(drives):
log_memory_usage(
f"_fetch_driveitems:processing_drive_{drive_idx}:{drive.name}",
final_driveitems,
"final_driveitems_before_drive",
)
try:
root_folder = drive.root
if site_descriptor.folder_path:
# If a specific folder is requested, navigate to it
for folder_part in site_descriptor.folder_path.split("/"):
folder_parts = site_descriptor.folder_path.split("/")
log_memory_usage(
f"_fetch_driveitems:navigating_folders:{drive.name}",
folder_parts,
"folder_parts",
)
for part_idx, folder_part in enumerate(folder_parts):
log_memory_usage(
f"_fetch_driveitems:navigating_part_{part_idx}:{folder_part}"
)
root_folder = root_folder.get_by_path(folder_part)
# Get all items recursively
# TODO: consider ways to avoid materializing the entire list of files in memory
log_memory_usage(f"_fetch_driveitems:before_query:{drive.name}")
query = root_folder.get_files(
recursive=True,
page_size=1000,
)
driveitems = query.execute_query()
log_memory_usage(
f"_fetch_driveitems:after_query:{drive.name}",
driveitems,
"drive_items",
)
logger.debug(
f"Found {len(driveitems)} items in drive '{drive.name}'"
)
@@ -922,8 +1040,24 @@ class SharepointConnector(
f"Found {len(driveitems)} items within time window in drive '{drive.name}'"
)
for item in driveitems:
log_memory_usage(
f"_fetch_driveitems:before_append_loop:{drive.name}",
driveitems,
"drive_items_to_append",
)
for item_idx, item in enumerate(driveitems):
if item_idx % 100 == 0: # Log every 100 items
log_memory_usage(
f"_fetch_driveitems:appending_item_{item_idx}:{drive.name}",
final_driveitems,
"final_driveitems_accumulator",
)
final_driveitems.append((item, drive_name or ""))
log_memory_usage(
f"_fetch_driveitems:after_append_loop:{drive.name}",
final_driveitems,
"final_driveitems_after_drive",
)
except Exception as e:
# Some drives might not be accessible
@@ -942,34 +1076,65 @@ class SharepointConnector(
# but this is fine, as there are no actual documents in those
logger.warning(f"Failed to process site: {err_str}")
log_memory_usage("_fetch_driveitems:end", final_driveitems, "final_driveitems")
return final_driveitems
def _handle_paginated_sites(
self, sites: SitesWithRoot
) -> Generator[Site, None, None]:
page_count = 0
while sites:
log_memory_usage(
f"_handle_paginated_sites:processing_page_{page_count}",
sites.current_page,
"current_page",
)
if sites.current_page:
yield from sites.current_page
for site_idx, site in enumerate(sites.current_page):
if site_idx % 50 == 0: # Log every 50 sites
log_memory_usage(
f"_handle_paginated_sites:yielding_site_{site_idx}_page_{page_count}",
site.web_url,
"site_url",
)
yield site
if not sites.has_next:
break
page_count += 1
log_memory_usage(f"_handle_paginated_sites:fetching_next_page_{page_count}")
sites = sites._get_next().execute_query()
def fetch_sites(self) -> list[SiteDescriptor]:
log_memory_usage("fetch_sites:start")
sites = self.graph_client.sites.get_all_sites().execute_query()
if not sites:
raise RuntimeError("No sites found in the tenant")
# OneDrive personal sites should not be indexed with SharepointConnector
site_descriptors = [
SiteDescriptor(
url=site.web_url or "",
drive_name=None,
folder_path=None,
)
for site in self._handle_paginated_sites(sites)
if "-my.sharepoint" not in site.web_url
]
log_memory_usage("fetch_sites:before_site_descriptors_creation")
site_descriptors: list[SiteDescriptor] = []
for site_idx, site in enumerate(self._handle_paginated_sites(sites)):
if site_idx % 100 == 0: # Log every 100 sites during processing
log_memory_usage(
f"fetch_sites:processing_site_{site_idx}",
site_descriptors,
"site_descriptors_so_far",
)
if "-my.sharepoint" not in site.web_url:
site_descriptors.append(
SiteDescriptor(
url=site.web_url or "",
drive_name=None,
folder_path=None,
)
)
log_memory_usage(
"fetch_sites:after_site_descriptors_creation",
site_descriptors,
"all_site_descriptors",
)
log_memory_usage("fetch_sites:end", site_descriptors, "site_descriptors")
return site_descriptors
def _fetch_site_pages(
@@ -979,6 +1144,7 @@ class SharepointConnector(
end: datetime | None = None,
) -> list[dict[str, Any]]:
"""Fetch SharePoint site pages (.aspx files) using the SharePoint Pages API."""
log_memory_usage("_fetch_site_pages:start", site_descriptor.url, "site_url")
# Get the site to extract the site ID
site = self.graph_client.sites.get_by_url(site_descriptor.url)
@@ -1012,11 +1178,15 @@ class SharepointConnector(
response.raise_for_status()
pages_data = response.json()
all_pages = pages_data.get("value", [])
log_memory_usage("_fetch_site_pages:initial_pages", all_pages, "initial_pages")
# Handle pagination if there are more pages
# TODO: This accumulates all pages in memory and can be heavy on large tenants.
# We should process each page incrementally to avoid unbounded growth.
while "@odata.nextLink" in pages_data:
log_memory_usage(
"_fetch_site_pages:before_pagination", all_pages, "accumulated_pages"
)
next_url = pages_data["@odata.nextLink"]
response = requests.get(
next_url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS
@@ -1024,13 +1194,25 @@ class SharepointConnector(
response.raise_for_status()
pages_data = response.json()
all_pages.extend(pages_data.get("value", []))
log_memory_usage(
"_fetch_site_pages:after_pagination", all_pages, "accumulated_pages"
)
logger.debug(f"Found {len(all_pages)} site pages in {site_descriptor.url}")
# Filter pages based on time window if specified
if start is not None or end is not None:
filtered_pages = []
for page in all_pages:
filtered_pages: list[dict[str, Any]] = []
log_memory_usage(
"_fetch_site_pages:before_filtering_loop", all_pages, "pages_to_filter"
)
for i, page in enumerate(all_pages):
if i % 100 == 0: # Log every 100 pages
log_memory_usage(
f"_fetch_site_pages:filtering_page_{i}",
filtered_pages,
"filtered_pages_so_far",
)
page_modified = page.get("lastModifiedDateTime")
if page_modified:
if isinstance(page_modified, str):
@@ -1044,8 +1226,14 @@ class SharepointConnector(
continue
filtered_pages.append(page)
log_memory_usage(
"_fetch_site_pages:after_filtering_loop",
filtered_pages,
"filtered_pages",
)
all_pages = filtered_pages
log_memory_usage("_fetch_site_pages:end", all_pages, "final_pages")
return all_pages
def _acquire_token(self) -> dict[str, Any]:
@@ -1061,11 +1249,22 @@ class SharepointConnector(
return token
def _fetch_slim_documents_from_sharepoint(self) -> GenerateSlimDocumentOutput:
log_memory_usage("_fetch_slim_documents_from_sharepoint:start")
site_descriptors = self.site_descriptors or self.fetch_sites()
# goes over all urls, converts them into SlimDocument objects and then yields them in batches
doc_batch: list[SlimDocument] = []
for site_descriptor in site_descriptors:
log_memory_usage(
"_fetch_slim_documents_from_sharepoint:before_site_loop",
site_descriptors,
"site_descriptors",
)
for site_idx, site_descriptor in enumerate(site_descriptors):
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:processing_site_{site_idx}:{site_descriptor.url}",
doc_batch,
"current_doc_batch",
)
ctx: ClientContext | None = None
if self.msal_app and self.sp_tenant_domain:
@@ -1083,8 +1282,22 @@ class SharepointConnector(
# Process site documents if flag is True
if self.include_site_documents:
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:before_fetch_driveitems:{site_descriptor.url}"
)
driveitems = self._fetch_driveitems(site_descriptor=site_descriptor)
for driveitem, drive_name in driveitems:
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:after_fetch_driveitems:{site_descriptor.url}",
driveitems,
"driveitems",
)
for drive_idx, (driveitem, drive_name) in enumerate(driveitems):
if drive_idx % 50 == 0: # Log every 50 drive items
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:processing_driveitem_{drive_idx}:{driveitem.id}",
doc_batch,
"doc_batch_size",
)
try:
logger.debug(f"Processing: {driveitem.web_url}")
doc_batch.append(
@@ -1096,13 +1309,33 @@ class SharepointConnector(
logger.warning(f"Failed to process driveitem: {str(e)}")
if len(doc_batch) >= SLIM_BATCH_SIZE:
log_memory_usage(
"_fetch_slim_documents_from_sharepoint:yielding_batch",
doc_batch,
"doc_batch",
)
yield doc_batch
doc_batch = []
# Process site pages if flag is True
if self.include_site_pages:
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:before_fetch_site_pages:{site_descriptor.url}"
)
site_pages = self._fetch_site_pages(site_descriptor)
for site_page in site_pages:
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:after_fetch_site_pages:{site_descriptor.url}",
site_pages,
"site_pages",
)
for page_idx, site_page in enumerate(site_pages):
if page_idx % 20 == 0: # Log every 20 site pages
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:processing_sitepage_"
f"{page_idx}:{site_page.get('id', 'unknown')}",
doc_batch,
"doc_batch_size",
)
logger.debug(
f"Processing site page: {site_page.get('webUrl', site_page.get('name', 'Unknown'))}"
)
@@ -1112,8 +1345,18 @@ class SharepointConnector(
)
)
if len(doc_batch) >= SLIM_BATCH_SIZE:
log_memory_usage(
"_fetch_slim_documents_from_sharepoint:yielding_page_batch",
doc_batch,
"doc_batch",
)
yield doc_batch
doc_batch = []
log_memory_usage(
"_fetch_slim_documents_from_sharepoint:final_yield",
doc_batch,
"final_doc_batch",
)
yield doc_batch
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
@@ -1248,6 +1491,7 @@ class SharepointConnector(
checkpoint: SharepointConnectorCheckpoint,
include_permissions: bool = False,
) -> CheckpointOutput[SharepointConnectorCheckpoint]:
log_memory_usage("_load_from_checkpoint:start", checkpoint, "checkpoint")
if self._graph_client is None:
raise ConnectorMissingCredentialError("Sharepoint")
@@ -1261,8 +1505,14 @@ class SharepointConnector(
and not checkpoint.process_site_pages
):
logger.info("Initializing SharePoint sites for processing")
log_memory_usage("_load_from_checkpoint:before_fetch_sites")
site_descs = self.site_descriptors or self.fetch_sites()
checkpoint.cached_site_descriptors = deque(site_descs)
log_memory_usage(
"_load_from_checkpoint:after_site_init",
checkpoint.cached_site_descriptors,
"cached_site_descriptors",
)
if not checkpoint.cached_site_descriptors:
logger.warning(
@@ -1388,9 +1638,17 @@ class SharepointConnector(
return checkpoint
try:
log_memory_usage(
f"_load_from_checkpoint:before_get_drive_items:{current_drive_name}"
)
driveitems = self._get_drive_items_for_drive_name(
site_descriptor, current_drive_name, start_dt, end_dt
)
log_memory_usage(
f"_load_from_checkpoint:after_get_drive_items:{current_drive_name}",
driveitems,
"driveitems",
)
if not driveitems:
logger.warning(
@@ -1419,7 +1677,16 @@ class SharepointConnector(
if current_drive_name == "Documents"
else current_drive_name
)
for driveitem in driveitems:
log_memory_usage(
f"_load_from_checkpoint:before_process_driveitems:{current_drive_name}",
driveitems,
"driveitems_to_process",
)
for item_idx, driveitem in enumerate(driveitems):
if item_idx % 25 == 0: # Log every 25 drive items
log_memory_usage(
f"_load_from_checkpoint:processing_driveitem_{item_idx}:{driveitem.id}"
)
driveitem_extension = get_file_ext(driveitem.name)
# Only yield empty documents if they are PDFs or images
should_yield_if_empty = (
@@ -1428,6 +1695,9 @@ class SharepointConnector(
)
try:
log_memory_usage(
f"_load_from_checkpoint:before_convert_driveitem:{driveitem.id}:{driveitem.name}"
)
doc = _convert_driveitem_to_document_with_permissions(
driveitem,
current_drive_name,
@@ -1435,23 +1705,44 @@ class SharepointConnector(
self.graph_client,
include_permissions=include_permissions,
)
log_memory_usage(
f"_load_from_checkpoint:after_convert_driveitem:{driveitem.id}:{driveitem.name}",
doc,
"converted_doc",
)
if doc:
if doc.sections:
log_memory_usage(
f"_load_from_checkpoint:yielding_doc:{driveitem.id}",
doc,
"yielded_doc",
)
yield doc
elif should_yield_if_empty:
doc.sections = [
TextSection(link=driveitem.web_url, text="")
]
log_memory_usage(
f"_load_from_checkpoint:yielding_empty_doc:{driveitem.id}",
doc,
"yielded_empty_doc",
)
yield doc
except Exception as e:
logger.warning(
f"Failed to process driveitem {driveitem.web_url}: {e}"
)
# Yield a ConnectorFailure for individual document processing failures
yield self._create_document_failure(
failure = self._create_document_failure(
driveitem, f"Failed to process: {str(e)}", e
)
log_memory_usage(
f"_load_from_checkpoint:yielding_failure:{driveitem.id}",
failure,
"yielded_failure",
)
yield failure
# Clear current drive after processing
checkpoint.current_drive_name = None
@@ -1484,9 +1775,17 @@ class SharepointConnector(
site_descriptor = checkpoint.current_site_descriptor
start_dt = datetime.fromtimestamp(start, tz=timezone.utc)
end_dt = datetime.fromtimestamp(end, tz=timezone.utc)
log_memory_usage(
f"_load_from_checkpoint:before_fetch_site_pages:{site_descriptor.url}"
)
site_pages = self._fetch_site_pages(
site_descriptor, start=start_dt, end=end_dt
)
log_memory_usage(
f"_load_from_checkpoint:after_fetch_site_pages:{site_descriptor.url}",
site_pages,
"site_pages",
)
client_ctx: ClientContext | None = None
if include_permissions:
if self.msal_app and self.sp_tenant_domain:
@@ -1497,19 +1796,27 @@ class SharepointConnector(
)
else:
raise RuntimeError("MSAL app or tenant domain is not set")
for site_page in site_pages:
for page_idx, site_page in enumerate(site_pages):
if page_idx % 10 == 0: # Log every 10 site pages
log_memory_usage(
f"_load_from_checkpoint:processing_sitepage_{page_idx}:{site_page.get('id', 'unknown')}"
)
logger.debug(
f"Processing site page: {site_page.get('webUrl', site_page.get('name', 'Unknown'))}"
)
yield (
_convert_sitepage_to_document(
site_page,
site_descriptor.drive_name,
client_ctx,
self.graph_client,
include_permissions=include_permissions,
)
converted_page = _convert_sitepage_to_document(
site_page,
site_descriptor.drive_name,
client_ctx,
self.graph_client,
include_permissions=include_permissions,
)
log_memory_usage(
f"_load_from_checkpoint:yielding_sitepage:{site_page.get('id', 'unknown')}",
converted_page,
"yielded_sitepage",
)
yield converted_page
logger.info(
f"Finished processing site pages for site: {site_descriptor.url}"
)
@@ -1547,6 +1854,9 @@ class SharepointConnector(
f"SharePoint processing complete. Finished last site: {current_site}"
)
checkpoint.has_more = False
log_memory_usage(
"_load_from_checkpoint:end_complete", checkpoint, "final_checkpoint"
)
return checkpoint
def load_from_checkpoint(

View File

@@ -19,6 +19,7 @@ from zipfile import BadZipFile
import chardet
from markitdown import FileConversionException
from markitdown import MarkItDown
from markitdown import StreamInfo
from markitdown import UnsupportedFormatException
from PIL import Image
from pypdf import PdfReader
@@ -30,7 +31,11 @@ from onyx.file_processing.file_validation import TEXT_MIME_TYPE
from onyx.file_processing.html_utils import parse_html_page_basic
from onyx.file_processing.unstructured import get_unstructured_api_key
from onyx.file_processing.unstructured import unstructured_to_text
from onyx.utils.file_types import PRESENTATION_MIME_TYPE
from onyx.utils.file_types import SPREADSHEET_MIME_TYPE
from onyx.utils.file_types import WORD_PROCESSING_MIME_TYPE
from onyx.utils.logger import setup_logger
from onyx.utils.memory_logger import log_memory_usage
logger = setup_logger()
@@ -80,6 +85,15 @@ IMAGE_MEDIA_TYPES = [
"image/webp",
]
_MARKITDOWN_CONVERTER: MarkItDown | None = None
def get_markitdown_converter() -> MarkItDown:
global _MARKITDOWN_CONVERTER
if _MARKITDOWN_CONVERTER is None:
_MARKITDOWN_CONVERTER = MarkItDown(enable_plugins=False)
return _MARKITDOWN_CONVERTER
class OnyxExtensionType(IntFlag):
Plain = auto()
@@ -209,6 +223,11 @@ def read_text_file(
"""
metadata = {}
file_content_raw = ""
log_memory_usage(
f"read_text_file:before_read_text_file:{encoding}:{errors}:{ignore_onyx_metadata}",
file,
"file",
)
for ind, line in enumerate(file):
# decode
try:
@@ -229,6 +248,16 @@ def read_text_file(
file_content_raw += line
log_memory_usage(
f"read_text_file:after_read_text_file:{encoding}:{errors}:{ignore_onyx_metadata}",
file_content_raw,
"file_content_raw",
)
log_memory_usage(
f"read_text_file:after_read_text_file:{encoding}:{errors}:{ignore_onyx_metadata}",
metadata,
"metadata",
)
return file_content_raw, metadata
@@ -338,9 +367,31 @@ def docx_to_text_and_images(
of avoiding materializing the list of images in memory.
The images list returned is empty in this case.
"""
md = MarkItDown(enable_plugins=False)
log_memory_usage(
"docx_to_text_and_images:before_md_create",
file,
"file",
)
md = get_markitdown_converter()
log_memory_usage(
"docx_to_text_and_images:after_md_create",
md,
"md",
)
try:
doc = md.convert(to_bytesio(file))
log_memory_usage(
"docx_to_text_and_images:before_md_convert",
file,
"file",
)
doc = md.convert(
to_bytesio(file), stream_info=StreamInfo(mimetype=WORD_PROCESSING_MIME_TYPE)
)
log_memory_usage(
"docx_to_text_and_images:after_md_convert",
doc,
"doc",
)
except (
BadZipFile,
ValueError,
@@ -372,9 +423,12 @@ def docx_to_text_and_images(
def pptx_to_text(file: IO[Any], file_name: str = "") -> str:
md = MarkItDown(enable_plugins=False)
md = get_markitdown_converter()
stream_info = StreamInfo(
mimetype=PRESENTATION_MIME_TYPE, filename=file_name or None, extension=".pptx"
)
try:
presentation = md.convert(to_bytesio(file))
presentation = md.convert(to_bytesio(file), stream_info=stream_info)
except (
BadZipFile,
ValueError,
@@ -388,9 +442,12 @@ def pptx_to_text(file: IO[Any], file_name: str = "") -> str:
def xlsx_to_text(file: IO[Any], file_name: str = "") -> str:
md = MarkItDown(enable_plugins=False)
md = get_markitdown_converter()
stream_info = StreamInfo(
mimetype=SPREADSHEET_MIME_TYPE, filename=file_name or None, extension=".xlsx"
)
try:
workbook = md.convert(to_bytesio(file))
workbook = md.convert(to_bytesio(file), stream_info=stream_info)
except (
BadZipFile,
ValueError,
@@ -531,8 +588,42 @@ def extract_text_and_images(
Primary new function for the updated connector.
Returns structured extraction result with text content, embedded images, and metadata.
"""
res = _extract_text_and_images(
file, file_name, pdf_pass, content_type, image_callback
)
# Clean up any temporary objects and force garbage collection
import gc
log_memory_usage(
"extract_text_and_images:before_gc_collect",
gc,
"gc",
)
unreachable = gc.collect()
logger.info(f"Unreachable objects: {unreachable}")
log_memory_usage(
"extract_text_and_images:after_gc_collect",
gc,
"gc",
)
return res
def _extract_text_and_images(
file: IO[Any],
file_name: str,
pdf_pass: str | None = None,
content_type: str | None = None,
image_callback: Callable[[bytes, str], None] | None = None,
) -> ExtractionResult:
file.seek(0)
log_memory_usage(
f"extract_text_and_images:before_unstructured:{file_name}:{content_type}",
file,
"file",
)
if get_unstructured_api_key():
try:
text_content = unstructured_to_text(file, file_name)
@@ -556,12 +647,31 @@ def extract_text_and_images(
# Default processing
try:
extension = get_file_ext(file_name)
log_memory_usage(
f"extract_text_and_images:before_unstructured:{file_name}:{content_type}:{extension}",
file,
"file",
)
# docx example for embedded images
if extension == ".docx":
log_memory_usage(
"extract_text_and_images:before_docx_to_text_and_images",
file,
"file",
)
text_content, images = docx_to_text_and_images(
file, file_name, image_callback=image_callback
)
log_memory_usage(
"extract_text_and_images:after_docx_to_text_and_images",
text_content,
"text_content",
)
log_memory_usage(
"extract_text_and_images:after_docx_to_text_and_images",
images,
"images",
)
return ExtractionResult(
text_content=text_content, embedded_images=images, metadata={}
)
@@ -569,12 +679,32 @@ def extract_text_and_images(
# PDF example: we do not show complicated PDF image extraction here
# so we simply extract text for now and skip images.
if extension == ".pdf":
log_memory_usage(
"extract_text_and_images:before_read_pdf_file",
file,
"file",
)
text_content, pdf_metadata, images = read_pdf_file(
file,
pdf_pass,
extract_images=get_image_extraction_and_analysis_enabled(),
image_callback=image_callback,
)
log_memory_usage(
"extract_text_and_images:after_read_pdf_file",
text_content,
"text_content",
)
log_memory_usage(
"extract_text_and_images:after_read_pdf_file",
pdf_metadata,
"pdf_metadata",
)
log_memory_usage(
"extract_text_and_images:after_read_pdf_file",
images,
"images",
)
return ExtractionResult(
text_content=text_content, embedded_images=images, metadata=pdf_metadata
)

View File

@@ -1,3 +1,16 @@
PRESENTATION_MIME_TYPE = (
"application/vnd.openxmlformats-officedocument.presentationml.presentation"
)
SPREADSHEET_MIME_TYPE = (
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
WORD_PROCESSING_MIME_TYPE = (
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)
PDF_MIME_TYPE = "application/pdf"
class UploadMimeTypes:
IMAGE_MIME_TYPES = {"image/jpeg", "image/png", "image/webp"}
CSV_MIME_TYPES = {"text/csv"}
@@ -13,10 +26,10 @@ class UploadMimeTypes:
"application/x-yaml",
}
DOCUMENT_MIME_TYPES = {
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
PDF_MIME_TYPE,
WORD_PROCESSING_MIME_TYPE,
PRESENTATION_MIME_TYPE,
SPREADSHEET_MIME_TYPE,
"message/rfc822",
"application/epub+zip",
}
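
Note (not part of the diff): the constants introduced above exist so the upload allow-list and the markitdown StreamInfo hints added in extract_file_text.py share a single definition. A minimal sketch of that pairing, assuming the constants are importable from onyx.utils.file_types as shown in the imports above; the file name "report.xlsx" and the standalone script shape are illustrative only, and the result's text_content attribute follows the markitdown usage already present in this diff.

import io
from markitdown import MarkItDown, StreamInfo
from onyx.utils.file_types import SPREADSHEET_MIME_TYPE

md = MarkItDown(enable_plugins=False)
with open("report.xlsx", "rb") as f:
    # Mirror the connector change: pass bytes plus an explicit MIME hint so
    # markitdown does not have to sniff the format from the stream alone.
    result = md.convert(
        io.BytesIO(f.read()),
        stream_info=StreamInfo(
            mimetype=SPREADSHEET_MIME_TYPE,
            filename="report.xlsx",
            extension=".xlsx",
        ),
    )
print(result.text_content)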

View File

@@ -0,0 +1,49 @@
import os
from typing import Any
import psutil
from pympler import asizeof
from onyx.utils.logger import setup_logger
logger = setup_logger()
def log_memory_usage(
label: str,
specific_object: Any = None,
object_label: str = "",
) -> None:
"""Log current process memory usage and optionally the size of a specific object.
Args:
label: A descriptive label for the current location/operation in code
specific_object: Optional object to measure the size of
object_label: Optional label describing the specific object
"""
try:
# Get current process memory info
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
# Convert to MB for readability
rss_mb = memory_info.rss / (1024 * 1024)
vms_mb = memory_info.vms / (1024 * 1024)
log_parts = [f"MEMORY[{label}]", f"RSS: {rss_mb:.2f}MB", f"VMS: {vms_mb:.2f}MB"]
# Add object size if provided
if specific_object is not None:
try:
# recursively calculate the size of the object
obj_size = asizeof.asizeof(specific_object)
obj_size_mb = obj_size / (1024 * 1024)
obj_desc = f"[{object_label}]" if object_label else "[object]"
log_parts.append(f"OBJ{obj_desc}: {obj_size_mb:.2f}MB")
except Exception as e:
log_parts.append(f"OBJ_SIZE_ERROR: {str(e)}")
logger.info(" | ".join(log_parts))
except Exception as e:
logger.warning(f"Failed to log memory usage for {label}: {str(e)}")

View File

@@ -60,6 +60,7 @@ pyairtable==3.0.1
pycryptodome==3.19.1
pydantic==2.11.7
PyGithub==2.5.0
pympler==1.1
python-dateutil==2.8.2
python-gitlab==5.6.0
python-pptx==0.6.23