Compare commits

...

3 Commits

4 changed files with 198 additions and 85 deletions

View File

@@ -21,7 +21,6 @@ from onyx.connectors.confluence.onyx_confluence import OnyxConfluence
from onyx.connectors.confluence.utils import build_confluence_document_id
from onyx.connectors.confluence.utils import convert_attachment_to_content
from onyx.connectors.confluence.utils import datetime_from_string
from onyx.connectors.confluence.utils import process_attachment
from onyx.connectors.confluence.utils import update_param_in_path
from onyx.connectors.confluence.utils import validate_attachment_filetype
from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider
@@ -84,7 +83,6 @@ MAX_CACHED_IDS = 100
class ConfluenceCheckpoint(ConnectorCheckpoint):
next_page_url: str | None
@@ -108,6 +106,9 @@ class ConfluenceConnector(
# pages.
labels_to_skip: list[str] = CONFLUENCE_CONNECTOR_LABELS_TO_SKIP,
timezone_offset: float = CONFLUENCE_TIMEZONE_OFFSET,
# Optional per-connector overrides for attachment processing limits
attachment_size_threshold_bytes: int | None = None,
attachment_char_count_threshold: int | None = None,
scoped_token: bool = False,
) -> None:
self.wiki_base = wiki_base
@@ -124,6 +125,9 @@ class ConfluenceConnector(
self._low_timeout_confluence_client: OnyxConfluence | None = None
self._fetched_titles: set[str] = set()
self.allow_images = False
# Store optional per-connector overrides
self.attachment_size_threshold_bytes = attachment_size_threshold_bytes
self.attachment_char_count_threshold = attachment_char_count_threshold
# Remove trailing slash from wiki_base if present
self.wiki_base = wiki_base.rstrip("/")
@@ -247,9 +251,26 @@ class ConfluenceConnector(
page_query += " order by lastmodified asc"
return page_query
def _construct_attachment_query(self, confluence_page_id: str) -> str:
def _construct_attachment_query(
self,
confluence_page_id: str,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> str:
attachment_query = f"type=attachment and container='{confluence_page_id}'"
attachment_query += self.cql_label_filter
# Add time filters to avoid reprocessing unchanged attachments during refresh
if start:
formatted_start_time = datetime.fromtimestamp(
start, tz=self.timezone
).strftime("%Y-%m-%d %H:%M")
attachment_query += f" and lastmodified >= '{formatted_start_time}'"
if end:
formatted_end_time = datetime.fromtimestamp(end, tz=self.timezone).strftime(
"%Y-%m-%d %H:%M"
)
attachment_query += f" and lastmodified <= '{formatted_end_time}'"
attachment_query += " order by lastmodified asc"
return attachment_query
def _get_comment_string_for_page_id(self, page_id: str) -> str:
@@ -303,41 +324,8 @@ class ConfluenceConnector(
sections.append(
TextSection(text=comment_text, link=f"{page_url}#comments")
)
# Process attachments
if "children" in page and "attachment" in page["children"]:
attachments = self.confluence_client.get_attachments_for_page(
page_id, expand="metadata"
)
for attachment in attachments.get("results", []):
# Process each attachment
result = process_attachment(
self.confluence_client,
attachment,
page_id,
self.allow_images,
)
if result and result.text:
# Create a section for the attachment text
attachment_section = TextSection(
text=result.text,
link=f"{page_url}#attachment-{attachment['id']}",
)
sections.append(attachment_section)
elif result and result.file_name:
# Create an ImageSection for image attachments
image_section = ImageSection(
link=f"{page_url}#attachment-{attachment['id']}",
image_file_id=result.file_name,
)
sections.append(image_section)
else:
logger.warning(
f"Error processing attachment '{attachment.get('title')}':",
f"{result.error if result else 'Unknown error'}",
)
# Note: attachments are no longer merged into the page document.
# They are indexed as separate documents downstream.
# Extract metadata
metadata = {}
@@ -386,9 +374,14 @@ class ConfluenceConnector(
)
def _fetch_page_attachments(
self, page: dict[str, Any], doc: Document
) -> Document | ConnectorFailure:
attachment_query = self._construct_attachment_query(page["id"])
self,
page: dict[str, Any],
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> tuple[list[Document], list[ConnectorFailure]]:
attachment_query = self._construct_attachment_query(page["id"], start, end)
attachment_failures: list[ConnectorFailure] = []
attachment_docs: list[Document] = []
for attachment in self.confluence_client.paginated_cql_retrieval(
cql=attachment_query,
@@ -418,54 +411,117 @@ class ConfluenceConnector(
f"Processing attachment: {attachment['title']} attached to page {page['title']}"
)
# Attempt to get textual content or image summarization:
object_url = build_confluence_document_id(
self.wiki_base, attachment["_links"]["webui"], self.is_cloud
)
# Attachment document id: use the download URL for stable identity
try:
object_url = build_confluence_document_id(
self.wiki_base, attachment["_links"]["download"], self.is_cloud
)
except Exception:
logger.warning(
f"Attachment url for id {attachment['id']} somehow invalid, skipping"
)
continue
try:
response = convert_attachment_to_content(
confluence_client=self.confluence_client,
attachment=attachment,
page_id=page["id"],
allow_images=self.allow_images,
attachment_size_threshold_bytes=self.attachment_size_threshold_bytes,
attachment_char_count_threshold=self.attachment_char_count_threshold,
)
if response is None:
continue
content_text, file_storage_name = response
# Build attachment-specific metadata
attachment_metadata: dict[str, str | list[str]] = {}
if "space" in attachment:
attachment_metadata["space"] = attachment["space"].get("name", "")
labels: list[str] = []
if "metadata" in attachment and "labels" in attachment["metadata"]:
for label in attachment["metadata"]["labels"].get("results", []):
labels.append(label.get("name", ""))
if labels:
attachment_metadata["labels"] = labels
page_url = build_confluence_document_id(
self.wiki_base, page["_links"]["webui"], self.is_cloud
)
attachment_metadata["parent_page_id"] = page_url
primary_owners: list[BasicExpertInfo] | None = None
if "version" in attachment and "by" in attachment["version"]:
author = attachment["version"]["by"]
display_name = author.get("displayName", "Unknown")
email = author.get("email", "unknown@domain.invalid")
primary_owners = [
BasicExpertInfo(display_name=display_name, email=email)
]
sections: list[TextSection | ImageSection] = []
if content_text:
doc.sections.append(
TextSection(
text=content_text,
link=object_url,
)
)
sections.append(TextSection(text=content_text, link=object_url))
elif file_storage_name:
doc.sections.append(
ImageSection(
link=object_url,
image_file_id=file_storage_name,
)
sections.append(
ImageSection(link=object_url, image_file_id=file_storage_name)
)
attachment_doc = Document(
id=object_url,
sections=sections,
source=DocumentSource.CONFLUENCE,
semantic_identifier=attachment.get("title", object_url),
metadata=attachment_metadata,
doc_updated_at=(
datetime_from_string(attachment["version"]["when"])
if attachment.get("version")
and attachment["version"].get("when")
else None
),
primary_owners=primary_owners,
)
attachment_docs.append(attachment_doc)
except ValueError as e:
# Handle size limit errors specifically and continue with next attachment
logger.warning(
f"Attachment '{attachment['title']}' skipped due to size limits: {e}"
)
attachment_failures.append(
ConnectorFailure(
failed_document=DocumentFailure(
document_id=object_url,
document_link=object_url,
),
failure_message=f"Attachment '{attachment['title']}' was skipped due to size limits: {e}",
exception=None,
)
)
continue
except Exception as e:
logger.error(
f"Failed to extract/summarize attachment {attachment['title']}",
exc_info=e,
exc_info=True,
)
if is_atlassian_date_error(
e
): # propagate error to be caught and retried
if is_atlassian_date_error(e):
# propagate error to be caught and retried
raise
return ConnectorFailure(
failed_document=DocumentFailure(
document_id=doc.id,
document_link=object_url,
),
failure_message=f"Failed to extract/summarize attachment {attachment['title']} for doc {doc.id}",
exception=e,
# collect failure but continue with other attachments
attachment_failures.append(
ConnectorFailure(
failed_document=DocumentFailure(
document_id=object_url,
document_link=object_url,
),
failure_message=(
f"Failed to extract/summarize attachment {attachment['title']}"
f" for parent page {page.get('id')}"
),
exception=e,
)
)
return doc
continue
return attachment_docs, attachment_failures
def _fetch_document_batches(
self,
@@ -501,15 +557,22 @@ class ConfluenceConnector(
# Build doc from page
doc_or_failure = self._convert_page_to_document(page)
if isinstance(doc_or_failure, ConnectorFailure):
yield doc_or_failure
continue
# Now get attachments for that page:
doc_or_failure = self._fetch_page_attachments(page, doc_or_failure)
# yield completed document (or failure)
# Yield the page document first
yield doc_or_failure
# Now get attachments for that page as separate documents
attachment_docs, attachment_failures = self._fetch_page_attachments(
page, start, end
)
# Yield each attachment document
for a_doc in attachment_docs:
yield a_doc
# Yield any attachment failures separately
for failure in attachment_failures:
yield failure
# Create checkpoint once a full page of results is returned
if checkpoint.next_page_url and checkpoint.next_page_url != page_query_url:
return checkpoint

View File

@@ -443,9 +443,9 @@ class OnyxConfluence:
return attr
# wrap the method with our retry handler
rate_limited_method: Callable[..., Any] = (
self._make_rate_limited_confluence_method(name, self._credentials_provider)
)
rate_limited_method: Callable[
..., Any
] = self._make_rate_limited_confluence_method(name, self._credentials_provider)
return rate_limited_method

View File

@@ -110,6 +110,9 @@ def process_attachment(
attachment: dict[str, Any],
parent_content_id: str | None,
allow_images: bool,
*,
attachment_size_threshold_bytes: int | None = None,
attachment_char_count_threshold: int | None = None,
) -> AttachmentProcessingResult:
"""
Processes a Confluence attachment. If it's a document, extracts text,
@@ -144,16 +147,21 @@ def process_attachment(
error="Image downloading is not enabled",
)
else:
if attachment_size > CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD:
size_threshold = (
attachment_size_threshold_bytes
if attachment_size_threshold_bytes is not None
else CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD
)
if attachment_size > size_threshold:
logger.warning(
f"Skipping {attachment_link} due to size. "
f"size={attachment_size} "
f"threshold={CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD}"
f"threshold={size_threshold}"
)
return AttachmentProcessingResult(
text=None,
file_name=None,
error=f"Attachment text too long: {attachment_size} chars",
error=f"Attachment file too large: {attachment_size} bytes (max: {size_threshold} bytes)",
)
logger.info(
@@ -195,11 +203,16 @@ def process_attachment(
)
# Skip if the text is too long
if len(text) > CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD:
char_threshold = (
attachment_char_count_threshold
if attachment_char_count_threshold is not None
else CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD
)
if len(text) > char_threshold:
return AttachmentProcessingResult(
text=None,
file_name=None,
error=f"Attachment text too long: {len(text)} chars",
error=f"Extracted text too long: {len(text)} characters (max: {char_threshold} characters)",
)
return AttachmentProcessingResult(text=text, file_name=None, error=None)
@@ -245,6 +258,9 @@ def convert_attachment_to_content(
attachment: dict[str, Any],
page_id: str,
allow_images: bool,
*,
attachment_size_threshold_bytes: int | None = None,
attachment_char_count_threshold: int | None = None,
) -> tuple[str | None, str | None] | None:
"""
Facade function which:
@@ -260,8 +276,25 @@ def convert_attachment_to_content(
)
return None
result = process_attachment(confluence_client, attachment, page_id, allow_images)
result = process_attachment(
confluence_client,
attachment,
page_id,
allow_images,
attachment_size_threshold_bytes=attachment_size_threshold_bytes,
attachment_char_count_threshold=attachment_char_count_threshold,
)
if result.error is not None:
# Check if this is a size limit error that should be reported to users
# Catches both "file too large" (bytes) and "text too long" (characters) errors
if (
"too large" in result.error.lower()
or "too long" in result.error.lower()
or "size" in result.error.lower()
):
# Raise an exception so it can be caught and reported as a failure
raise ValueError(f"Size limit exceeded - {result.error}")
logger.warning(
f"Attachment {attachment['title']} encountered error: {result.error}"
)

View File

@@ -609,7 +609,24 @@ export const connectorConfigs: Record<
defaultTab: "space",
},
],
advanced_values: [],
advanced_values: [
{
type: "number",
label: "Attachment Size Threshold (bytes)",
name: "attachment_size_threshold_bytes",
optional: true,
description:
"Attachments larger than this will be skipped. Leave blank to use default (10 MB).",
},
{
type: "number",
label: "Attachment Text Character Threshold",
name: "attachment_char_count_threshold",
optional: true,
description:
"If extracted text exceeds this character limit, the attachment is skipped. Leave blank to use default (200,000).",
},
],
},
jira: {
description: "Configure Jira connector",