mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-07 07:52:44 +00:00
Compare commits
3 Commits
cli/v0.2.1
...
temp/pr-55
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
53515cd345 | ||
|
|
d6c7a528e2 | ||
|
|
b5e0d0c220 |
@@ -21,7 +21,6 @@ from onyx.connectors.confluence.onyx_confluence import OnyxConfluence
|
||||
from onyx.connectors.confluence.utils import build_confluence_document_id
|
||||
from onyx.connectors.confluence.utils import convert_attachment_to_content
|
||||
from onyx.connectors.confluence.utils import datetime_from_string
|
||||
from onyx.connectors.confluence.utils import process_attachment
|
||||
from onyx.connectors.confluence.utils import update_param_in_path
|
||||
from onyx.connectors.confluence.utils import validate_attachment_filetype
|
||||
from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider
|
||||
@@ -84,7 +83,6 @@ MAX_CACHED_IDS = 100
|
||||
|
||||
|
||||
class ConfluenceCheckpoint(ConnectorCheckpoint):
|
||||
|
||||
next_page_url: str | None
|
||||
|
||||
|
||||
@@ -108,6 +106,9 @@ class ConfluenceConnector(
|
||||
# pages.
|
||||
labels_to_skip: list[str] = CONFLUENCE_CONNECTOR_LABELS_TO_SKIP,
|
||||
timezone_offset: float = CONFLUENCE_TIMEZONE_OFFSET,
|
||||
# Optional per-connector overrides for attachment processing limits
|
||||
attachment_size_threshold_bytes: int | None = None,
|
||||
attachment_char_count_threshold: int | None = None,
|
||||
scoped_token: bool = False,
|
||||
) -> None:
|
||||
self.wiki_base = wiki_base
|
||||
@@ -124,6 +125,9 @@ class ConfluenceConnector(
|
||||
self._low_timeout_confluence_client: OnyxConfluence | None = None
|
||||
self._fetched_titles: set[str] = set()
|
||||
self.allow_images = False
|
||||
# Store optional per-connector overrides
|
||||
self.attachment_size_threshold_bytes = attachment_size_threshold_bytes
|
||||
self.attachment_char_count_threshold = attachment_char_count_threshold
|
||||
|
||||
# Remove trailing slash from wiki_base if present
|
||||
self.wiki_base = wiki_base.rstrip("/")
|
||||
@@ -247,9 +251,26 @@ class ConfluenceConnector(
|
||||
page_query += " order by lastmodified asc"
|
||||
return page_query
|
||||
|
||||
def _construct_attachment_query(self, confluence_page_id: str) -> str:
|
||||
def _construct_attachment_query(
|
||||
self,
|
||||
confluence_page_id: str,
|
||||
start: SecondsSinceUnixEpoch | None = None,
|
||||
end: SecondsSinceUnixEpoch | None = None,
|
||||
) -> str:
|
||||
attachment_query = f"type=attachment and container='{confluence_page_id}'"
|
||||
attachment_query += self.cql_label_filter
|
||||
# Add time filters to avoid reprocessing unchanged attachments during refresh
|
||||
if start:
|
||||
formatted_start_time = datetime.fromtimestamp(
|
||||
start, tz=self.timezone
|
||||
).strftime("%Y-%m-%d %H:%M")
|
||||
attachment_query += f" and lastmodified >= '{formatted_start_time}'"
|
||||
if end:
|
||||
formatted_end_time = datetime.fromtimestamp(end, tz=self.timezone).strftime(
|
||||
"%Y-%m-%d %H:%M"
|
||||
)
|
||||
attachment_query += f" and lastmodified <= '{formatted_end_time}'"
|
||||
attachment_query += " order by lastmodified asc"
|
||||
return attachment_query
|
||||
|
||||
def _get_comment_string_for_page_id(self, page_id: str) -> str:
|
||||
@@ -303,41 +324,8 @@ class ConfluenceConnector(
|
||||
sections.append(
|
||||
TextSection(text=comment_text, link=f"{page_url}#comments")
|
||||
)
|
||||
|
||||
# Process attachments
|
||||
if "children" in page and "attachment" in page["children"]:
|
||||
attachments = self.confluence_client.get_attachments_for_page(
|
||||
page_id, expand="metadata"
|
||||
)
|
||||
|
||||
for attachment in attachments.get("results", []):
|
||||
# Process each attachment
|
||||
result = process_attachment(
|
||||
self.confluence_client,
|
||||
attachment,
|
||||
page_id,
|
||||
self.allow_images,
|
||||
)
|
||||
|
||||
if result and result.text:
|
||||
# Create a section for the attachment text
|
||||
attachment_section = TextSection(
|
||||
text=result.text,
|
||||
link=f"{page_url}#attachment-{attachment['id']}",
|
||||
)
|
||||
sections.append(attachment_section)
|
||||
elif result and result.file_name:
|
||||
# Create an ImageSection for image attachments
|
||||
image_section = ImageSection(
|
||||
link=f"{page_url}#attachment-{attachment['id']}",
|
||||
image_file_id=result.file_name,
|
||||
)
|
||||
sections.append(image_section)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Error processing attachment '{attachment.get('title')}':",
|
||||
f"{result.error if result else 'Unknown error'}",
|
||||
)
|
||||
# Note: attachments are no longer merged into the page document.
|
||||
# They are indexed as separate documents downstream.
|
||||
|
||||
# Extract metadata
|
||||
metadata = {}
|
||||
@@ -386,9 +374,14 @@ class ConfluenceConnector(
|
||||
)
|
||||
|
||||
def _fetch_page_attachments(
|
||||
self, page: dict[str, Any], doc: Document
|
||||
) -> Document | ConnectorFailure:
|
||||
attachment_query = self._construct_attachment_query(page["id"])
|
||||
self,
|
||||
page: dict[str, Any],
|
||||
start: SecondsSinceUnixEpoch | None = None,
|
||||
end: SecondsSinceUnixEpoch | None = None,
|
||||
) -> tuple[list[Document], list[ConnectorFailure]]:
|
||||
attachment_query = self._construct_attachment_query(page["id"], start, end)
|
||||
attachment_failures: list[ConnectorFailure] = []
|
||||
attachment_docs: list[Document] = []
|
||||
|
||||
for attachment in self.confluence_client.paginated_cql_retrieval(
|
||||
cql=attachment_query,
|
||||
@@ -418,54 +411,117 @@ class ConfluenceConnector(
|
||||
f"Processing attachment: {attachment['title']} attached to page {page['title']}"
|
||||
)
|
||||
|
||||
# Attempt to get textual content or image summarization:
|
||||
object_url = build_confluence_document_id(
|
||||
self.wiki_base, attachment["_links"]["webui"], self.is_cloud
|
||||
)
|
||||
# Attachment document id: use the download URL for stable identity
|
||||
try:
|
||||
object_url = build_confluence_document_id(
|
||||
self.wiki_base, attachment["_links"]["download"], self.is_cloud
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
f"Attachment url for id {attachment['id']} somehow invalid, skipping"
|
||||
)
|
||||
continue
|
||||
try:
|
||||
response = convert_attachment_to_content(
|
||||
confluence_client=self.confluence_client,
|
||||
attachment=attachment,
|
||||
page_id=page["id"],
|
||||
allow_images=self.allow_images,
|
||||
attachment_size_threshold_bytes=self.attachment_size_threshold_bytes,
|
||||
attachment_char_count_threshold=self.attachment_char_count_threshold,
|
||||
)
|
||||
if response is None:
|
||||
continue
|
||||
|
||||
content_text, file_storage_name = response
|
||||
|
||||
# Build attachment-specific metadata
|
||||
attachment_metadata: dict[str, str | list[str]] = {}
|
||||
if "space" in attachment:
|
||||
attachment_metadata["space"] = attachment["space"].get("name", "")
|
||||
labels: list[str] = []
|
||||
if "metadata" in attachment and "labels" in attachment["metadata"]:
|
||||
for label in attachment["metadata"]["labels"].get("results", []):
|
||||
labels.append(label.get("name", ""))
|
||||
if labels:
|
||||
attachment_metadata["labels"] = labels
|
||||
page_url = build_confluence_document_id(
|
||||
self.wiki_base, page["_links"]["webui"], self.is_cloud
|
||||
)
|
||||
attachment_metadata["parent_page_id"] = page_url
|
||||
|
||||
primary_owners: list[BasicExpertInfo] | None = None
|
||||
if "version" in attachment and "by" in attachment["version"]:
|
||||
author = attachment["version"]["by"]
|
||||
display_name = author.get("displayName", "Unknown")
|
||||
email = author.get("email", "unknown@domain.invalid")
|
||||
primary_owners = [
|
||||
BasicExpertInfo(display_name=display_name, email=email)
|
||||
]
|
||||
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
if content_text:
|
||||
doc.sections.append(
|
||||
TextSection(
|
||||
text=content_text,
|
||||
link=object_url,
|
||||
)
|
||||
)
|
||||
sections.append(TextSection(text=content_text, link=object_url))
|
||||
elif file_storage_name:
|
||||
doc.sections.append(
|
||||
ImageSection(
|
||||
link=object_url,
|
||||
image_file_id=file_storage_name,
|
||||
)
|
||||
sections.append(
|
||||
ImageSection(link=object_url, image_file_id=file_storage_name)
|
||||
)
|
||||
|
||||
attachment_doc = Document(
|
||||
id=object_url,
|
||||
sections=sections,
|
||||
source=DocumentSource.CONFLUENCE,
|
||||
semantic_identifier=attachment.get("title", object_url),
|
||||
metadata=attachment_metadata,
|
||||
doc_updated_at=(
|
||||
datetime_from_string(attachment["version"]["when"])
|
||||
if attachment.get("version")
|
||||
and attachment["version"].get("when")
|
||||
else None
|
||||
),
|
||||
primary_owners=primary_owners,
|
||||
)
|
||||
attachment_docs.append(attachment_doc)
|
||||
except ValueError as e:
|
||||
# Handle size limit errors specifically and continue with next attachment
|
||||
logger.warning(
|
||||
f"Attachment '{attachment['title']}' skipped due to size limits: {e}"
|
||||
)
|
||||
attachment_failures.append(
|
||||
ConnectorFailure(
|
||||
failed_document=DocumentFailure(
|
||||
document_id=object_url,
|
||||
document_link=object_url,
|
||||
),
|
||||
failure_message=f"Attachment '{attachment['title']}' was skipped due to size limits: {e}",
|
||||
exception=None,
|
||||
)
|
||||
)
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to extract/summarize attachment {attachment['title']}",
|
||||
exc_info=e,
|
||||
exc_info=True,
|
||||
)
|
||||
if is_atlassian_date_error(
|
||||
e
|
||||
): # propagate error to be caught and retried
|
||||
if is_atlassian_date_error(e):
|
||||
# propagate error to be caught and retried
|
||||
raise
|
||||
return ConnectorFailure(
|
||||
failed_document=DocumentFailure(
|
||||
document_id=doc.id,
|
||||
document_link=object_url,
|
||||
),
|
||||
failure_message=f"Failed to extract/summarize attachment {attachment['title']} for doc {doc.id}",
|
||||
exception=e,
|
||||
# collect failure but continue with other attachments
|
||||
attachment_failures.append(
|
||||
ConnectorFailure(
|
||||
failed_document=DocumentFailure(
|
||||
document_id=object_url,
|
||||
document_link=object_url,
|
||||
),
|
||||
failure_message=(
|
||||
f"Failed to extract/summarize attachment {attachment['title']}"
|
||||
f" for parent page {page.get('id')}"
|
||||
),
|
||||
exception=e,
|
||||
)
|
||||
)
|
||||
return doc
|
||||
continue
|
||||
return attachment_docs, attachment_failures
|
||||
|
||||
def _fetch_document_batches(
|
||||
self,
|
||||
@@ -501,15 +557,22 @@ class ConfluenceConnector(
|
||||
# Build doc from page
|
||||
doc_or_failure = self._convert_page_to_document(page)
|
||||
|
||||
if isinstance(doc_or_failure, ConnectorFailure):
|
||||
yield doc_or_failure
|
||||
continue
|
||||
# Now get attachments for that page:
|
||||
doc_or_failure = self._fetch_page_attachments(page, doc_or_failure)
|
||||
|
||||
# yield completed document (or failure)
|
||||
# Yield the page document first
|
||||
yield doc_or_failure
|
||||
|
||||
# Now get attachments for that page as separate documents
|
||||
attachment_docs, attachment_failures = self._fetch_page_attachments(
|
||||
page, start, end
|
||||
)
|
||||
|
||||
# Yield each attachment document
|
||||
for a_doc in attachment_docs:
|
||||
yield a_doc
|
||||
|
||||
# Yield any attachment failures separately
|
||||
for failure in attachment_failures:
|
||||
yield failure
|
||||
|
||||
# Create checkpoint once a full page of results is returned
|
||||
if checkpoint.next_page_url and checkpoint.next_page_url != page_query_url:
|
||||
return checkpoint
|
||||
|
||||
@@ -443,9 +443,9 @@ class OnyxConfluence:
|
||||
return attr
|
||||
|
||||
# wrap the method with our retry handler
|
||||
rate_limited_method: Callable[..., Any] = (
|
||||
self._make_rate_limited_confluence_method(name, self._credentials_provider)
|
||||
)
|
||||
rate_limited_method: Callable[
|
||||
..., Any
|
||||
] = self._make_rate_limited_confluence_method(name, self._credentials_provider)
|
||||
|
||||
return rate_limited_method
|
||||
|
||||
|
||||
@@ -110,6 +110,9 @@ def process_attachment(
|
||||
attachment: dict[str, Any],
|
||||
parent_content_id: str | None,
|
||||
allow_images: bool,
|
||||
*,
|
||||
attachment_size_threshold_bytes: int | None = None,
|
||||
attachment_char_count_threshold: int | None = None,
|
||||
) -> AttachmentProcessingResult:
|
||||
"""
|
||||
Processes a Confluence attachment. If it's a document, extracts text,
|
||||
@@ -144,16 +147,21 @@ def process_attachment(
|
||||
error="Image downloading is not enabled",
|
||||
)
|
||||
else:
|
||||
if attachment_size > CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD:
|
||||
size_threshold = (
|
||||
attachment_size_threshold_bytes
|
||||
if attachment_size_threshold_bytes is not None
|
||||
else CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD
|
||||
)
|
||||
if attachment_size > size_threshold:
|
||||
logger.warning(
|
||||
f"Skipping {attachment_link} due to size. "
|
||||
f"size={attachment_size} "
|
||||
f"threshold={CONFLUENCE_CONNECTOR_ATTACHMENT_SIZE_THRESHOLD}"
|
||||
f"threshold={size_threshold}"
|
||||
)
|
||||
return AttachmentProcessingResult(
|
||||
text=None,
|
||||
file_name=None,
|
||||
error=f"Attachment text too long: {attachment_size} chars",
|
||||
error=f"Attachment file too large: {attachment_size} bytes (max: {size_threshold} bytes)",
|
||||
)
|
||||
|
||||
logger.info(
|
||||
@@ -195,11 +203,16 @@ def process_attachment(
|
||||
)
|
||||
|
||||
# Skip if the text is too long
|
||||
if len(text) > CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD:
|
||||
char_threshold = (
|
||||
attachment_char_count_threshold
|
||||
if attachment_char_count_threshold is not None
|
||||
else CONFLUENCE_CONNECTOR_ATTACHMENT_CHAR_COUNT_THRESHOLD
|
||||
)
|
||||
if len(text) > char_threshold:
|
||||
return AttachmentProcessingResult(
|
||||
text=None,
|
||||
file_name=None,
|
||||
error=f"Attachment text too long: {len(text)} chars",
|
||||
error=f"Extracted text too long: {len(text)} characters (max: {char_threshold} characters)",
|
||||
)
|
||||
|
||||
return AttachmentProcessingResult(text=text, file_name=None, error=None)
|
||||
@@ -245,6 +258,9 @@ def convert_attachment_to_content(
|
||||
attachment: dict[str, Any],
|
||||
page_id: str,
|
||||
allow_images: bool,
|
||||
*,
|
||||
attachment_size_threshold_bytes: int | None = None,
|
||||
attachment_char_count_threshold: int | None = None,
|
||||
) -> tuple[str | None, str | None] | None:
|
||||
"""
|
||||
Facade function which:
|
||||
@@ -260,8 +276,25 @@ def convert_attachment_to_content(
|
||||
)
|
||||
return None
|
||||
|
||||
result = process_attachment(confluence_client, attachment, page_id, allow_images)
|
||||
result = process_attachment(
|
||||
confluence_client,
|
||||
attachment,
|
||||
page_id,
|
||||
allow_images,
|
||||
attachment_size_threshold_bytes=attachment_size_threshold_bytes,
|
||||
attachment_char_count_threshold=attachment_char_count_threshold,
|
||||
)
|
||||
if result.error is not None:
|
||||
# Check if this is a size limit error that should be reported to users
|
||||
# Catches both "file too large" (bytes) and "text too long" (characters) errors
|
||||
if (
|
||||
"too large" in result.error.lower()
|
||||
or "too long" in result.error.lower()
|
||||
or "size" in result.error.lower()
|
||||
):
|
||||
# Raise an exception so it can be caught and reported as a failure
|
||||
raise ValueError(f"Size limit exceeded - {result.error}")
|
||||
|
||||
logger.warning(
|
||||
f"Attachment {attachment['title']} encountered error: {result.error}"
|
||||
)
|
||||
|
||||
@@ -609,7 +609,24 @@ export const connectorConfigs: Record<
|
||||
defaultTab: "space",
|
||||
},
|
||||
],
|
||||
advanced_values: [],
|
||||
advanced_values: [
|
||||
{
|
||||
type: "number",
|
||||
label: "Attachment Size Threshold (bytes)",
|
||||
name: "attachment_size_threshold_bytes",
|
||||
optional: true,
|
||||
description:
|
||||
"Attachments larger than this will be skipped. Leave blank to use default (10 MB).",
|
||||
},
|
||||
{
|
||||
type: "number",
|
||||
label: "Attachment Text Character Threshold",
|
||||
name: "attachment_char_count_threshold",
|
||||
optional: true,
|
||||
description:
|
||||
"If extracted text exceeds this character limit, the attachment is skipped. Leave blank to use default (200,000).",
|
||||
},
|
||||
],
|
||||
},
|
||||
jira: {
|
||||
description: "Configure Jira connector",
|
||||
|
||||
Reference in New Issue
Block a user