mirror of https://github.com/onyx-dot-app/onyx.git
synced 2026-02-16 23:35:46 +00:00

Compare commits: dump-scrip ... debug-shar
1 commit: d6786f1578
@@ -100,6 +100,7 @@ from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.redis.redis_utils import is_fence
from onyx.server.runtime.onyx_runtime import OnyxRuntime
from onyx.utils.logger import setup_logger
from onyx.utils.memory_logger import log_memory_usage
from onyx.utils.middleware import make_randomized_onyx_request_id
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType

@@ -1279,6 +1280,10 @@ def _docprocessing_task(
f"batch_num={batch_num} "
)

log_memory_usage(
f"docprocessing_task:start:batch_{batch_num}:attempt_{index_attempt_id}"
)

# Get the document batch storage
storage = get_document_batch_storage(cc_pair_id, index_attempt_id)

@@ -1297,7 +1302,16 @@ def _docprocessing_task(
per_batch_lock: RedisLock | None = None
try:
# Retrieve documents from storage
log_memory_usage(
f"docprocessing_task:before_load_batch_{batch_num}:attempt_{index_attempt_id}"
)
documents = storage.get_batch(batch_num)
log_memory_usage(
f"docprocessing_task:after_load_batch_{batch_num}:attempt_{index_attempt_id}",
documents,
f"batch_{batch_num}_documents",
)

if not documents:
task_logger.error(f"No documents found for batch {batch_num}")
return

@@ -1369,6 +1383,12 @@ def _docprocessing_task(
f"Processing {len(documents)} documents through indexing pipeline"
)

log_memory_usage(
f"docprocessing_task:before_indexing_pipeline:batch_{batch_num}:attempt_{index_attempt_id}",
documents,
f"batch_{batch_num}_documents_before_pipeline",
)

# real work happens here!
index_pipeline_result = run_indexing_pipeline(
embedder=embedding_model,

@@ -1381,6 +1401,12 @@ def _docprocessing_task(
index_attempt_metadata=index_attempt_metadata,
)

log_memory_usage(
f"docprocessing_task:after_indexing_pipeline:batch_{batch_num}:attempt_{index_attempt_id}",
index_pipeline_result,
f"batch_{batch_num}_pipeline_result",
)

# Update batch completion and document counts atomically using database coordination

with get_session_with_current_tenant() as db_session, cross_batch_db_lock:

@@ -1458,7 +1484,14 @@ def _docprocessing_task(
f"elapsed={elapsed_time:.2f}s"
)

log_memory_usage(
f"docprocessing_task:completed:batch_{batch_num}:attempt_{index_attempt_id}"
)

except Exception:
log_memory_usage(
f"docprocessing_task:exception:batch_{batch_num}:attempt_{index_attempt_id}"
)
task_logger.exception(
f"Document batch processing failed: "
f"batch_num={batch_num} "
@@ -60,6 +60,7 @@ from onyx.file_processing.extract_file_text import get_file_ext
from onyx.file_processing.file_validation import EXCLUDED_IMAGE_TYPES
from onyx.file_processing.image_utils import store_image_and_create_section
from onyx.utils.logger import setup_logger
from onyx.utils.memory_logger import log_memory_usage

logger = setup_logger()
SLIM_BATCH_SIZE = 1000

@@ -247,6 +248,7 @@ def _download_with_cap(url: str, timeout: int, cap: int) -> bytes:
- Raises `SizeCapExceeded` when the cap would be exceeded.
- Returns the full bytes if the content fits within `cap`.
"""
log_memory_usage("_download_with_cap:start", url, "download_url")
with requests.get(url, stream=True, timeout=timeout) as resp:
resp.raise_for_status()

@@ -273,7 +275,9 @@ def _download_with_cap(url: str, timeout: int, cap: int) -> bytes:
)
raise SizeCapExceeded("during_download")

return buf.getvalue()
content_bytes = buf.getvalue()
log_memory_usage("_download_with_cap:end", content_bytes, "downloaded_content")
return content_bytes

def _download_via_sdk_with_cap(

@@ -283,6 +287,7 @@ def _download_via_sdk_with_cap(

Raises SizeCapExceeded("during_sdk_download") if the cap would be exceeded.
"""
log_memory_usage("_download_via_sdk_with_cap:start", driveitem.id, "driveitem_id")
buf = io.BytesIO()

def on_chunk(bytes_read: int) -> None:

@@ -294,7 +299,11 @@ def _download_via_sdk_with_cap(
driveitem.download_session(buf, chunk_downloaded=on_chunk, chunk_size=chunk_size)
# Execute the configured request with retries using existing helper
sleep_and_retry(driveitem.context, "download_session")
return buf.getvalue()
content_bytes = buf.getvalue()
log_memory_usage(
"_download_via_sdk_with_cap:end", content_bytes, "sdk_downloaded_content"
)
return content_bytes

def _convert_driveitem_to_document_with_permissions(
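A self-contained sketch of the size-capped streaming download that the hunks above instrument. The `SizeCapExceeded` class, chunk size, cap, and example URL are illustrative stand-ins for the connector's own helper, which additionally logs memory at the start and end.

```python
import io

import requests


class SizeCapExceeded(Exception):
    """Raised when a download would exceed the configured byte cap."""


def download_with_cap(url: str, timeout: int = 30, cap: int = 10 * 1024 * 1024) -> bytes:
    """Stream a URL into memory, aborting once more than `cap` bytes have been read."""
    buf = io.BytesIO()
    with requests.get(url, stream=True, timeout=timeout) as resp:
        resp.raise_for_status()
        for chunk in resp.iter_content(chunk_size=64 * 1024):
            if buf.tell() + len(chunk) > cap:
                raise SizeCapExceeded("during_download")
            buf.write(chunk)
    return buf.getvalue()


# Illustrative usage; the URL and cap are placeholders.
# content = download_with_cap("https://example.com/file.docx", cap=1024 * 1024)
```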
@@ -304,6 +313,11 @@ def _convert_driveitem_to_document_with_permissions(
graph_client: GraphClient,
include_permissions: bool = False,
) -> Document | None:
log_memory_usage(
"_convert_driveitem_to_document_with_permissions:start",
driveitem.id,
"driveitem_id",
)

if not driveitem.name or not driveitem.id:
raise ValueError("DriveItem name/id is required")

@@ -345,6 +359,11 @@ def _convert_driveitem_to_document_with_permissions(

# Prefer downloadUrl streaming with size cap
content_bytes: bytes | None = None
log_memory_usage(
"_convert_driveitem_to_document_with_permissions:before_download",
driveitem.id,
"driveitem_id",
)
if download_url:
try:
# Use this to test the sdk size cap

@@ -364,6 +383,11 @@ def _convert_driveitem_to_document_with_permissions(
)

# Fallback to SDK content if needed
log_memory_usage(
"_convert_driveitem_to_document_with_permissions:after_primary_download",
content_bytes,
"content_bytes",
)
if content_bytes is None:
try:
content_bytes = _download_via_sdk_with_cap(

@@ -404,11 +428,21 @@ def _convert_driveitem_to_document_with_permissions(
image_section.link = driveitem.web_url
sections.append(image_section)

log_memory_usage(
"_convert_driveitem_to_document_with_permissions:before_text_extraction",
content_bytes,
"content_bytes",
)
extraction_result = extract_text_and_images(
file=io.BytesIO(content_bytes),
file_name=driveitem.name,
image_callback=_store_embedded_image,
)
log_memory_usage(
"_convert_driveitem_to_document_with_permissions:after_text_extraction",
extraction_result,
"extraction_result",
)
if extraction_result.text_content:
sections.append(
TextSection(link=driveitem.web_url, text=extraction_result.text_content)

@@ -447,6 +481,9 @@ def _convert_driveitem_to_document_with_permissions(
],
metadata={"drive": drive_name},
)
log_memory_usage(
"_convert_driveitem_to_document_with_permissions:end", doc, "final_document"
)
return doc

@@ -458,6 +495,9 @@ def _convert_sitepage_to_document(
include_permissions: bool = False,
) -> Document:
"""Convert a SharePoint site page to a Document object."""
log_memory_usage(
"_convert_sitepage_to_document:start", site_page.get("id"), "site_page_id"
)
# Extract text content from the site page
page_text = ""
# Get title and description

@@ -474,11 +514,32 @@ def _convert_sitepage_to_document(
canvas_layout = site_page.get("canvasLayout", {})
if canvas_layout:
horizontal_sections = canvas_layout.get("horizontalSections", [])
for section in horizontal_sections:
log_memory_usage(
"_convert_sitepage_to_document:processing_sections",
horizontal_sections,
"horizontal_sections",
)
for section_idx, section in enumerate(horizontal_sections):
columns = section.get("columns", [])
for column in columns:
log_memory_usage(
f"_convert_sitepage_to_document:processing_columns_{section_idx}",
columns,
"columns",
)
for column_idx, column in enumerate(columns):
webparts = column.get("webparts", [])
for webpart in webparts:
log_memory_usage(
f"_convert_sitepage_to_document:processing_webparts_{section_idx}_{column_idx}",
webparts,
"webparts",
)
for webpart_idx, webpart in enumerate(webparts):
if webpart_idx % 5 == 0: # Log every 5 webparts
log_memory_usage(
f"_convert_sitepage_to_document:processing_webpart_{section_idx}_{column_idx}_{webpart_idx}",
page_text,
"accumulated_page_text",
)
# Extract text from different types of webparts
webpart_type = webpart.get("@odata.type", "")

@@ -517,7 +578,12 @@ def _convert_sitepage_to_document(
"searchablePlainTexts", []
)

for text_item in searchable_texts:
log_memory_usage(
"_convert_sitepage_to_document:processing_searchable_texts",
searchable_texts,
"searchable_texts",
)
for text_idx, text_item in enumerate(searchable_texts):
if isinstance(text_item, dict):
key = text_item.get("key", "")
value = text_item.get("value", "")

@@ -527,6 +593,12 @@ def _convert_sitepage_to_document(
page_text += f"## {value}\n\n"
else:
page_text += f"{value}\n\n"
if text_idx % 10 == 0: # Log every 10 text items
log_memory_usage(
f"_convert_sitepage_to_document:processed_text_{text_idx}",
page_text,
"accumulated_text",
)

# Extract description if available
description = data.get("description", "")

@@ -605,6 +677,9 @@ def _convert_sitepage_to_document(
else {}
),
)
log_memory_usage(
"_convert_sitepage_to_document:end", doc, "final_site_page_document"
)
return doc
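The site-page hunks above avoid logging on every iteration by sampling (every 5 webparts, every 10 text items). A standalone sketch of that sampling pattern, with toy data and an inline RSS read standing in for log_memory_usage:

```python
import os

import psutil

_proc = psutil.Process(os.getpid())

# Toy stand-in for a site page's canvas layout.
sections = [{"columns": [{"webparts": [f"wp-{i}" for i in range(23)]}]}]
page_text = ""

for section_idx, section in enumerate(sections):
    for column_idx, column in enumerate(section["columns"]):
        for webpart_idx, webpart in enumerate(column["webparts"]):
            if webpart_idx % 5 == 0:  # sample: log every 5 webparts, as in the hunk above
                rss_mb = _proc.memory_info().rss / (1024 * 1024)
                print(
                    f"MEMORY[processing_webpart_{section_idx}_{column_idx}_{webpart_idx}] "
                    f"| RSS: {rss_mb:.2f}MB"
                )
            page_text += f"{webpart}\n\n"
```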
@@ -735,6 +810,11 @@ class SharepointConnector(
start: datetime | None = None,
end: datetime | None = None,
) -> list[DriveItem]:
log_memory_usage(
f"_get_drive_items_for_drive_name:start:{drive_name}",
site_descriptor.url,
"site_url",
)
try:
site = self.graph_client.sites.get_by_url(site_descriptor.url)
drives = site.drives.get().execute_query()

@@ -757,11 +837,19 @@ class SharepointConnector(
root_folder = root_folder.get_by_path(folder_part)

# TODO: consider ways to avoid materializing the entire list of files in memory
log_memory_usage(
f"_get_drive_items_for_drive_name:before_query:{drive_name}"
)
query = root_folder.get_files(
recursive=True,
page_size=1000,
)
driveitems = query.execute_query()
log_memory_usage(
f"_get_drive_items_for_drive_name:after_query:{drive_name}",
driveitems,
"driveitems_list",
)
logger.debug(f"Found {len(driveitems)} items in drive '{drive_name}'")

# Filter items based on folder path if specified

@@ -805,7 +893,13 @@ class SharepointConnector(
f"Found {len(driveitems)} items within time window in drive '{drive.name}'"
)

return list(driveitems)
final_driveitems = list(driveitems)
log_memory_usage(
f"_get_drive_items_for_drive_name:end:{drive_name}",
final_driveitems,
"final_driveitems",
)
return final_driveitems

except Exception as e:
# Some drives might not be accessible

@@ -832,6 +926,7 @@ class SharepointConnector(
start: datetime | None = None,
end: datetime | None = None,
) -> list[tuple[DriveItem, str]]:
log_memory_usage("_fetch_driveitems:start", site_descriptor.url, "site_url")
final_driveitems: list[tuple[DriveItem, str]] = []
try:
site = self.graph_client.sites.get_by_url(site_descriptor.url)

@@ -856,21 +951,44 @@ class SharepointConnector(
return []

# Process each matching drive
for drive in drives:
log_memory_usage(
"_fetch_driveitems:before_drives_loop", drives, "drives_to_process"
)
for drive_idx, drive in enumerate(drives):
log_memory_usage(
f"_fetch_driveitems:processing_drive_{drive_idx}:{drive.name}",
final_driveitems,
"final_driveitems_before_drive",
)
try:
root_folder = drive.root
if site_descriptor.folder_path:
# If a specific folder is requested, navigate to it
for folder_part in site_descriptor.folder_path.split("/"):
folder_parts = site_descriptor.folder_path.split("/")
log_memory_usage(
f"_fetch_driveitems:navigating_folders:{drive.name}",
folder_parts,
"folder_parts",
)
for part_idx, folder_part in enumerate(folder_parts):
log_memory_usage(
f"_fetch_driveitems:navigating_part_{part_idx}:{folder_part}"
)
root_folder = root_folder.get_by_path(folder_part)

# Get all items recursively
# TODO: consider ways to avoid materializing the entire list of files in memory
log_memory_usage(f"_fetch_driveitems:before_query:{drive.name}")
query = root_folder.get_files(
recursive=True,
page_size=1000,
)
driveitems = query.execute_query()
log_memory_usage(
f"_fetch_driveitems:after_query:{drive.name}",
driveitems,
"drive_items",
)
logger.debug(
f"Found {len(driveitems)} items in drive '{drive.name}'"
)

@@ -922,8 +1040,24 @@ class SharepointConnector(
f"Found {len(driveitems)} items within time window in drive '{drive.name}'"
)

for item in driveitems:
log_memory_usage(
f"_fetch_driveitems:before_append_loop:{drive.name}",
driveitems,
"drive_items_to_append",
)
for item_idx, item in enumerate(driveitems):
if item_idx % 100 == 0: # Log every 100 items
log_memory_usage(
f"_fetch_driveitems:appending_item_{item_idx}:{drive.name}",
final_driveitems,
"final_driveitems_accumulator",
)
final_driveitems.append((item, drive_name or ""))
log_memory_usage(
f"_fetch_driveitems:after_append_loop:{drive.name}",
final_driveitems,
"final_driveitems_after_drive",
)

except Exception as e:
# Some drives might not be accessible

@@ -942,34 +1076,65 @@ class SharepointConnector(
# but this is fine, as there are no actual documents in those
logger.warning(f"Failed to process site: {err_str}")

log_memory_usage("_fetch_driveitems:end", final_driveitems, "final_driveitems")
return final_driveitems

def _handle_paginated_sites(
self, sites: SitesWithRoot
) -> Generator[Site, None, None]:
page_count = 0
while sites:
log_memory_usage(
f"_handle_paginated_sites:processing_page_{page_count}",
sites.current_page,
"current_page",
)
if sites.current_page:
yield from sites.current_page
for site_idx, site in enumerate(sites.current_page):
if site_idx % 50 == 0: # Log every 50 sites
log_memory_usage(
f"_handle_paginated_sites:yielding_site_{site_idx}_page_{page_count}",
site.web_url,
"site_url",
)
yield site
if not sites.has_next:
break
page_count += 1
log_memory_usage(f"_handle_paginated_sites:fetching_next_page_{page_count}")
sites = sites._get_next().execute_query()

def fetch_sites(self) -> list[SiteDescriptor]:
log_memory_usage("fetch_sites:start")
sites = self.graph_client.sites.get_all_sites().execute_query()

if not sites:
raise RuntimeError("No sites found in the tenant")

# OneDrive personal sites should not be indexed with SharepointConnector
site_descriptors = [
SiteDescriptor(
url=site.web_url or "",
drive_name=None,
folder_path=None,
)
for site in self._handle_paginated_sites(sites)
if "-my.sharepoint" not in site.web_url
]
log_memory_usage("fetch_sites:before_site_descriptors_creation")
site_descriptors: list[SiteDescriptor] = []
for site_idx, site in enumerate(self._handle_paginated_sites(sites)):
if site_idx % 100 == 0: # Log every 100 sites during processing
log_memory_usage(
f"fetch_sites:processing_site_{site_idx}",
site_descriptors,
"site_descriptors_so_far",
)
if "-my.sharepoint" not in site.web_url:
site_descriptors.append(
SiteDescriptor(
url=site.web_url or "",
drive_name=None,
folder_path=None,
)
)
log_memory_usage(
"fetch_sites:after_site_descriptors_creation",
site_descriptors,
"all_site_descriptors",
)
log_memory_usage("fetch_sites:end", site_descriptors, "site_descriptors")
return site_descriptors

def _fetch_site_pages(
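_handle_paginated_sites now yields site by site while logging each page and every 50th site. A standalone sketch of that generator shape, with a toy page object standing in for the office365 SDK's SitesWithRoot (current_page, has_next, and the next-page call are mimicked here, not the real API, and the example URLs are placeholders):

```python
from collections.abc import Iterator
from dataclasses import dataclass


@dataclass
class FakeSitePage:
    """Toy stand-in for the SDK's paged site collection."""

    current_page: list[str]
    next_page: "FakeSitePage | None" = None

    @property
    def has_next(self) -> bool:
        return self.next_page is not None


def iter_sites(pages: FakeSitePage | None) -> Iterator[str]:
    page_count = 0
    while pages:
        for site_idx, site in enumerate(pages.current_page):
            if site_idx % 50 == 0:  # sample: log every 50 sites, as in the hunk above
                print(f"page {page_count}, site {site_idx}: {site}")
            yield site
        if not pages.has_next:
            break
        page_count += 1
        pages = pages.next_page


second = FakeSitePage(current_page=["https://contoso.sharepoint.com/sites/b"])
first = FakeSitePage(current_page=["https://contoso.sharepoint.com/sites/a"], next_page=second)
print(list(iter_sites(first)))
```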
@@ -979,6 +1144,7 @@ class SharepointConnector(
end: datetime | None = None,
) -> list[dict[str, Any]]:
"""Fetch SharePoint site pages (.aspx files) using the SharePoint Pages API."""
log_memory_usage("_fetch_site_pages:start", site_descriptor.url, "site_url")

# Get the site to extract the site ID
site = self.graph_client.sites.get_by_url(site_descriptor.url)

@@ -1012,11 +1178,15 @@ class SharepointConnector(
response.raise_for_status()
pages_data = response.json()
all_pages = pages_data.get("value", [])
log_memory_usage("_fetch_site_pages:initial_pages", all_pages, "initial_pages")

# Handle pagination if there are more pages
# TODO: This accumulates all pages in memory and can be heavy on large tenants.
# We should process each page incrementally to avoid unbounded growth.
while "@odata.nextLink" in pages_data:
log_memory_usage(
"_fetch_site_pages:before_pagination", all_pages, "accumulated_pages"
)
next_url = pages_data["@odata.nextLink"]
response = requests.get(
next_url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS

@@ -1024,13 +1194,25 @@ class SharepointConnector(
response.raise_for_status()
pages_data = response.json()
all_pages.extend(pages_data.get("value", []))
log_memory_usage(
"_fetch_site_pages:after_pagination", all_pages, "accumulated_pages"
)

logger.debug(f"Found {len(all_pages)} site pages in {site_descriptor.url}")

# Filter pages based on time window if specified
if start is not None or end is not None:
filtered_pages = []
for page in all_pages:
filtered_pages: list[dict[str, Any]] = []
log_memory_usage(
"_fetch_site_pages:before_filtering_loop", all_pages, "pages_to_filter"
)
for i, page in enumerate(all_pages):
if i % 100 == 0: # Log every 100 pages
log_memory_usage(
f"_fetch_site_pages:filtering_page_{i}",
filtered_pages,
"filtered_pages_so_far",
)
page_modified = page.get("lastModifiedDateTime")
if page_modified:
if isinstance(page_modified, str):

@@ -1044,8 +1226,14 @@ class SharepointConnector(
continue

filtered_pages.append(page)
log_memory_usage(
"_fetch_site_pages:after_filtering_loop",
filtered_pages,
"filtered_pages",
)
all_pages = filtered_pages

log_memory_usage("_fetch_site_pages:end", all_pages, "final_pages")
return all_pages

def _acquire_token(self) -> dict[str, Any]:
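_fetch_site_pages follows Microsoft Graph-style @odata.nextLink pagination and, as the TODO above notes, accumulates every page in memory before filtering. A minimal standalone sketch of that accumulation loop; the URL, headers, and timeout value are placeholders:

```python
from typing import Any

import requests

REQUEST_TIMEOUT_SECONDS = 30  # illustrative value


def fetch_all_pages(first_url: str, headers: dict[str, str]) -> list[dict[str, Any]]:
    """Follow @odata.nextLink until exhausted, accumulating every page in memory."""
    response = requests.get(first_url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
    response.raise_for_status()
    pages_data = response.json()
    all_pages: list[dict[str, Any]] = pages_data.get("value", [])

    while "@odata.nextLink" in pages_data:
        next_url = pages_data["@odata.nextLink"]
        response = requests.get(next_url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        pages_data = response.json()
        all_pages.extend(pages_data.get("value", []))

    return all_pages
```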
@@ -1061,11 +1249,22 @@ class SharepointConnector(
return token

def _fetch_slim_documents_from_sharepoint(self) -> GenerateSlimDocumentOutput:
log_memory_usage("_fetch_slim_documents_from_sharepoint:start")
site_descriptors = self.site_descriptors or self.fetch_sites()

# goes over all urls, converts them into SlimDocument objects and then yields them in batches
doc_batch: list[SlimDocument] = []
for site_descriptor in site_descriptors:
log_memory_usage(
"_fetch_slim_documents_from_sharepoint:before_site_loop",
site_descriptors,
"site_descriptors",
)
for site_idx, site_descriptor in enumerate(site_descriptors):
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:processing_site_{site_idx}:{site_descriptor.url}",
doc_batch,
"current_doc_batch",
)
ctx: ClientContext | None = None

if self.msal_app and self.sp_tenant_domain:

@@ -1083,8 +1282,22 @@ class SharepointConnector(

# Process site documents if flag is True
if self.include_site_documents:
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:before_fetch_driveitems:{site_descriptor.url}"
)
driveitems = self._fetch_driveitems(site_descriptor=site_descriptor)
for driveitem, drive_name in driveitems:
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:after_fetch_driveitems:{site_descriptor.url}",
driveitems,
"driveitems",
)
for drive_idx, (driveitem, drive_name) in enumerate(driveitems):
if drive_idx % 50 == 0: # Log every 50 drive items
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:processing_driveitem_{drive_idx}:{driveitem.id}",
doc_batch,
"doc_batch_size",
)
try:
logger.debug(f"Processing: {driveitem.web_url}")
doc_batch.append(

@@ -1096,13 +1309,33 @@ class SharepointConnector(
logger.warning(f"Failed to process driveitem: {str(e)}")

if len(doc_batch) >= SLIM_BATCH_SIZE:
log_memory_usage(
"_fetch_slim_documents_from_sharepoint:yielding_batch",
doc_batch,
"doc_batch",
)
yield doc_batch
doc_batch = []

# Process site pages if flag is True
if self.include_site_pages:
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:before_fetch_site_pages:{site_descriptor.url}"
)
site_pages = self._fetch_site_pages(site_descriptor)
for site_page in site_pages:
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:after_fetch_site_pages:{site_descriptor.url}",
site_pages,
"site_pages",
)
for page_idx, site_page in enumerate(site_pages):
if page_idx % 20 == 0: # Log every 20 site pages
log_memory_usage(
f"_fetch_slim_documents_from_sharepoint:processing_sitepage_"
f"{page_idx}:{site_page.get('id', 'unknown')}",
doc_batch,
"doc_batch_size",
)
logger.debug(
f"Processing site page: {site_page.get('webUrl', site_page.get('name', 'Unknown'))}"
)

@@ -1112,8 +1345,18 @@ class SharepointConnector(
)
)
if len(doc_batch) >= SLIM_BATCH_SIZE:
log_memory_usage(
"_fetch_slim_documents_from_sharepoint:yielding_page_batch",
doc_batch,
"doc_batch",
)
yield doc_batch
doc_batch = []
log_memory_usage(
"_fetch_slim_documents_from_sharepoint:final_yield",
doc_batch,
"final_doc_batch",
)
yield doc_batch

def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:

@@ -1248,6 +1491,7 @@ class SharepointConnector(
checkpoint: SharepointConnectorCheckpoint,
include_permissions: bool = False,
) -> CheckpointOutput[SharepointConnectorCheckpoint]:
log_memory_usage("_load_from_checkpoint:start", checkpoint, "checkpoint")

if self._graph_client is None:
raise ConnectorMissingCredentialError("Sharepoint")

@@ -1261,8 +1505,14 @@ class SharepointConnector(
and not checkpoint.process_site_pages
):
logger.info("Initializing SharePoint sites for processing")
log_memory_usage("_load_from_checkpoint:before_fetch_sites")
site_descs = self.site_descriptors or self.fetch_sites()
checkpoint.cached_site_descriptors = deque(site_descs)
log_memory_usage(
"_load_from_checkpoint:after_site_init",
checkpoint.cached_site_descriptors,
"cached_site_descriptors",
)

if not checkpoint.cached_site_descriptors:
logger.warning(

@@ -1388,9 +1638,17 @@ class SharepointConnector(
return checkpoint

try:
log_memory_usage(
f"_load_from_checkpoint:before_get_drive_items:{current_drive_name}"
)
driveitems = self._get_drive_items_for_drive_name(
site_descriptor, current_drive_name, start_dt, end_dt
)
log_memory_usage(
f"_load_from_checkpoint:after_get_drive_items:{current_drive_name}",
driveitems,
"driveitems",
)

if not driveitems:
logger.warning(

@@ -1419,7 +1677,16 @@ class SharepointConnector(
if current_drive_name == "Documents"
else current_drive_name
)
for driveitem in driveitems:
log_memory_usage(
f"_load_from_checkpoint:before_process_driveitems:{current_drive_name}",
driveitems,
"driveitems_to_process",
)
for item_idx, driveitem in enumerate(driveitems):
if item_idx % 25 == 0: # Log every 25 drive items
log_memory_usage(
f"_load_from_checkpoint:processing_driveitem_{item_idx}:{driveitem.id}"
)
driveitem_extension = get_file_ext(driveitem.name)
# Only yield empty documents if they are PDFs or images
should_yield_if_empty = (

@@ -1428,6 +1695,9 @@ class SharepointConnector(
)

try:
log_memory_usage(
f"_load_from_checkpoint:before_convert_driveitem:{driveitem.id}"
)
doc = _convert_driveitem_to_document_with_permissions(
driveitem,
current_drive_name,

@@ -1435,23 +1705,44 @@ class SharepointConnector(
self.graph_client,
include_permissions=include_permissions,
)
log_memory_usage(
f"_load_from_checkpoint:after_convert_driveitem:{driveitem.id}",
doc,
"converted_doc",
)

if doc:
if doc.sections:
log_memory_usage(
f"_load_from_checkpoint:yielding_doc:{driveitem.id}",
doc,
"yielded_doc",
)
yield doc
elif should_yield_if_empty:
doc.sections = [
TextSection(link=driveitem.web_url, text="")
]
log_memory_usage(
f"_load_from_checkpoint:yielding_empty_doc:{driveitem.id}",
doc,
"yielded_empty_doc",
)
yield doc
except Exception as e:
logger.warning(
f"Failed to process driveitem {driveitem.web_url}: {e}"
)
# Yield a ConnectorFailure for individual document processing failures
yield self._create_document_failure(
failure = self._create_document_failure(
driveitem, f"Failed to process: {str(e)}", e
)
log_memory_usage(
f"_load_from_checkpoint:yielding_failure:{driveitem.id}",
failure,
"yielded_failure",
)
yield failure

# Clear current drive after processing
checkpoint.current_drive_name = None

@@ -1484,9 +1775,17 @@ class SharepointConnector(
site_descriptor = checkpoint.current_site_descriptor
start_dt = datetime.fromtimestamp(start, tz=timezone.utc)
end_dt = datetime.fromtimestamp(end, tz=timezone.utc)
log_memory_usage(
f"_load_from_checkpoint:before_fetch_site_pages:{site_descriptor.url}"
)
site_pages = self._fetch_site_pages(
site_descriptor, start=start_dt, end=end_dt
)
log_memory_usage(
f"_load_from_checkpoint:after_fetch_site_pages:{site_descriptor.url}",
site_pages,
"site_pages",
)
client_ctx: ClientContext | None = None
if include_permissions:
if self.msal_app and self.sp_tenant_domain:

@@ -1497,19 +1796,27 @@ class SharepointConnector(
)
else:
raise RuntimeError("MSAL app or tenant domain is not set")
for site_page in site_pages:
for page_idx, site_page in enumerate(site_pages):
if page_idx % 10 == 0: # Log every 10 site pages
log_memory_usage(
f"_load_from_checkpoint:processing_sitepage_{page_idx}:{site_page.get('id', 'unknown')}"
)
logger.debug(
f"Processing site page: {site_page.get('webUrl', site_page.get('name', 'Unknown'))}"
)
yield (
_convert_sitepage_to_document(
site_page,
site_descriptor.drive_name,
client_ctx,
self.graph_client,
include_permissions=include_permissions,
)
converted_page = _convert_sitepage_to_document(
site_page,
site_descriptor.drive_name,
client_ctx,
self.graph_client,
include_permissions=include_permissions,
)
log_memory_usage(
f"_load_from_checkpoint:yielding_sitepage:{site_page.get('id', 'unknown')}",
converted_page,
"yielded_sitepage",
)
yield converted_page
logger.info(
f"Finished processing site pages for site: {site_descriptor.url}"
)

@@ -1547,6 +1854,9 @@ class SharepointConnector(
f"SharePoint processing complete. Finished last site: {current_site}"
)
checkpoint.has_more = False
log_memory_usage(
"_load_from_checkpoint:end_complete", checkpoint, "final_checkpoint"
)
return checkpoint

def load_from_checkpoint(
backend/onyx/utils/memory_logger.py (new file, 49 lines)
@@ -0,0 +1,49 @@
import os
from typing import Any

import psutil
from pympler import asizeof

from onyx.utils.logger import setup_logger

logger = setup_logger()


def log_memory_usage(
    label: str,
    specific_object: Any = None,
    object_label: str = "",
) -> None:
    """Log current process memory usage and optionally the size of a specific object.

    Args:
        label: A descriptive label for the current location/operation in code
        specific_object: Optional object to measure the size of
        object_label: Optional label describing the specific object
    """
    try:
        # Get current process memory info
        process = psutil.Process(os.getpid())
        memory_info = process.memory_info()

        # Convert to MB for readability
        rss_mb = memory_info.rss / (1024 * 1024)
        vms_mb = memory_info.vms / (1024 * 1024)

        log_parts = [f"MEMORY[{label}]", f"RSS: {rss_mb:.2f}MB", f"VMS: {vms_mb:.2f}MB"]

        # Add object size if provided
        if specific_object is not None:
            try:
                # recursively calculate the size of the object
                obj_size = asizeof.asizeof(specific_object)
                obj_size_mb = obj_size / (1024 * 1024)
                obj_desc = f"[{object_label}]" if object_label else "[object]"
                log_parts.append(f"OBJ{obj_desc}: {obj_size_mb:.2f}MB")
            except Exception as e:
                log_parts.append(f"OBJ_SIZE_ERROR: {str(e)}")

        logger.info(" | ".join(log_parts))

    except Exception as e:
        logger.warning(f"Failed to log memory usage for {label}: {str(e)}")
@@ -60,6 +60,7 @@ pyairtable==3.0.1
pycryptodome==3.19.1
pydantic==2.11.7
PyGithub==2.5.0
pympler==1.1
python-dateutil==2.8.2
python-gitlab==5.6.0
python-pptx==0.6.23