mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-13 19:02:45 +00:00
Compare commits
1 Commits
edge
...
temp/pr-57
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
967e58725f |
@@ -212,6 +212,7 @@ class DocumentSource(str, Enum):
|
||||
|
||||
IMAP = "imap"
|
||||
BITBUCKET = "bitbucket"
|
||||
PYLON = "pylon"
|
||||
|
||||
# Special case just for integration tests
|
||||
MOCK_CONNECTOR = "mock_connector"
|
||||
@@ -619,4 +620,5 @@ project management, and collaboration tools into a single, customizable platform
|
||||
DocumentSource.AIRTABLE: "airtable - database",
|
||||
DocumentSource.HIGHSPOT: "highspot - CRM data",
|
||||
DocumentSource.IMAP: "imap - email data",
|
||||
DocumentSource.PYLON: "pylon - customer support data",
|
||||
}
|
||||
|
||||
1
backend/onyx/connectors/pylon/__init__.py
Normal file
1
backend/onyx/connectors/pylon/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Pylon connector package
|
||||
410
backend/onyx/connectors/pylon/connector.py
Normal file
410
backend/onyx/connectors/pylon/connector.py
Normal file
@@ -0,0 +1,410 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import time
|
||||
from collections.abc import Iterator
|
||||
from typing import Any
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from typing_extensions import override
|
||||
|
||||
from onyx.configs.app_configs import INDEX_BATCH_SIZE
|
||||
from onyx.connectors.exceptions import CredentialExpiredError
|
||||
from onyx.connectors.exceptions import InsufficientPermissionsError
|
||||
from onyx.connectors.exceptions import UnexpectedValidationError
|
||||
from onyx.connectors.interfaces import CheckpointedConnector
|
||||
from onyx.connectors.interfaces import CheckpointOutput
|
||||
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
|
||||
from onyx.connectors.interfaces import SlimConnectorWithPermSync
|
||||
from onyx.connectors.models import ConnectorCheckpoint
|
||||
from onyx.connectors.models import ConnectorFailure
|
||||
from onyx.connectors.models import ConnectorMissingCredentialError
|
||||
from onyx.connectors.models import DocumentFailure
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.pylon.models import GetIssueMessagesResponseBody
|
||||
from onyx.connectors.pylon.models import GetIssuesResponseBody
|
||||
from onyx.connectors.pylon.models import Issue
|
||||
from onyx.connectors.pylon.models import Message
|
||||
from onyx.connectors.pylon.utils import _create_id
|
||||
from onyx.connectors.pylon.utils import AttachmentData
|
||||
from onyx.connectors.pylon.utils import build_auth_client
|
||||
from onyx.connectors.pylon.utils import build_generic_client
|
||||
from onyx.connectors.pylon.utils import download_attachment
|
||||
from onyx.connectors.pylon.utils import get_time_window_days
|
||||
from onyx.connectors.pylon.utils import is_valid_issue
|
||||
from onyx.connectors.pylon.utils import is_valid_message
|
||||
from onyx.connectors.pylon.utils import map_to_document
|
||||
from onyx.connectors.pylon.utils import normalize_attachment_url
|
||||
from onyx.connectors.pylon.utils import parse_pylon_datetime
|
||||
from onyx.connectors.pylon.utils import parse_ymd_date
|
||||
from onyx.connectors.pylon.utils import pylon_get
|
||||
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import httpx
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
class PylonConnectorCheckpoint(ConnectorCheckpoint):
    """Checkpoint state for resumable Pylon indexing.

    Fields:
        current_day_start: RFC3339 day start for current sub-window

    Note: GET /issues API does not support cursor pagination per OpenAPI spec.
    Each day window returns all issues in a single response.
    """

    # None until the first window has been processed; afterwards holds the
    # RFC3339 start of the last day-window handed out, so a restarted run
    # can resume at the correct day boundary.
    current_day_start: str | None = None
|
||||
|
||||
|
||||
class PylonConnector(
    CheckpointedConnector[PylonConnectorCheckpoint],
    SlimConnectorWithPermSync,
):
    """Indexes Pylon customer support data.

    One document is produced per issue; message text and attachment content
    are embedded into that single document as sections rather than indexed
    as separate documents.

    Args:
        pylon_entities: Optional entity types beyond issues.
            - Issues are always indexed (root entity).
            - "messages": embed issue messages as additional sections.
            - "attachments": download and embed attachment content.
        start_date: Earliest date to index, in YYYY-MM-DD format.
        lookback_days: How many days before the sync window to also fetch
            (default 7), so updates to pre-existing issues are captured.
            Such issues are only re-indexed when their latest activity
            falls inside the sync window.
        batch_size: Number of documents per indexing batch.
    """

    def __init__(
        self,
        pylon_entities: list[str],
        start_date: str,
        lookback_days: int = 7,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        # Issues are always indexed; pylon_entities only toggles the optional
        # messages/attachments enrichment.
        self.pylon_entities = pylon_entities
        self.batch_size = batch_size
        self.lookback_days = lookback_days
        self.start_epoch_sec = parse_ymd_date(start_date)
        self.base_url: str = "https://api.usepylon.com"
        # Populated later via load_credentials().
        self.api_key: str | None = None
|
||||
|
||||
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
    """Store the Pylon API key from the provided credential dict.

    Expects credentials dict with:
        - pylon_api_key (required): Pylon API key

    Raises:
        ConnectorMissingCredentialError: when the key is absent or empty.

    Returns:
        None — no derived credentials are handed back to the caller.
    """
    self.api_key = credentials.get("pylon_api_key")
    if not self.api_key:
        # Fail fast at credential-load time instead of on the first API call.
        raise ConnectorMissingCredentialError("Pylon")
    return None
|
||||
|
||||
def _client(self) -> httpx.Client:
    """Return an HTTP client authenticated with the stored API key.

    Raises:
        ConnectorMissingCredentialError: if load_credentials was never called.
    """
    if self.api_key:
        return build_auth_client(self.api_key, self.base_url)
    raise ConnectorMissingCredentialError("Pylon")
|
||||
|
||||
@override
def load_from_checkpoint(
    self,
    start: SecondsSinceUnixEpoch,
    end: SecondsSinceUnixEpoch,
    checkpoint: PylonConnectorCheckpoint,
) -> CheckpointOutput[PylonConnectorCheckpoint]:
    """Resumable indexing of Pylon issues with embedded messages and attachments.

    Each issue becomes one document with all related content embedded as
    sections. Checkpointing happens at day boundaries.

    The fetch start is moved back by lookback_days to capture updates to
    pre-existing issues; _iter_issues then filters out issues whose latest
    activity falls before the original `start`.

    Yields:
        A Document per successfully mapped issue, or a ConnectorFailure for
        issues that raise during mapping; returns the updated checkpoint.
    """
    new_checkpoint = copy.deepcopy(checkpoint)

    messages_enabled = "messages" in self.pylon_entities
    attachments_enabled = "attachments" in self.pylon_entities

    # Extend the window backwards so activity on older issues is picked up.
    adjusted_start = start - (self.lookback_days * 24 * 60 * 60)
    logger.info(
        f"Applying {self.lookback_days} day(s) lookback "
        f"(original_start={start}, adjusted_start={adjusted_start})"
    )

    # Resume from the checkpointed day boundary when one is present.
    start_boundary = self.start_epoch_sec
    if new_checkpoint.current_day_start:
        start_boundary = parse_pylon_datetime(new_checkpoint.current_day_start)
    time_windows = get_time_window_days(adjusted_start, end, start_boundary)
    if not time_windows:
        new_checkpoint.has_more = False
        return new_checkpoint

    for idx, (start_time, end_time) in enumerate(time_windows):
        for issue in self._iter_issues(
            start_time,
            end_time,
            messages_enabled=messages_enabled,
            original_start=start,
        ):
            if not is_valid_issue(issue):
                logger.warning(f"Skipping invalid issue ID: {issue.id}")
                continue
            attachments_urls = issue.attachment_urls or []
            messages = []
            if not issue.id:
                logger.warning("Skipping issue without ID")
                continue
            if messages_enabled:
                for message in self._iter_messages(issue.id):
                    if not is_valid_message(message):
                        logger.warning(
                            f"Skipping invalid message ID: {message.id} for issue ID: {issue.id}"
                        )
                        continue
                    # Message-level attachments are folded into the issue's list.
                    if message.file_urls:
                        attachments_urls.extend(message.file_urls)
                    messages.append(message)
            unique_attachments_data = []
            if attachments_urls and attachments_enabled:
                # De-duplicate by normalized URL (presigned URLs may differ
                # only in volatile query parameters) before downloading.
                unique_attachments_urls = []
                unique_normalized_urls = set()
                for url in attachments_urls:
                    normalized_url = normalize_attachment_url(url)
                    if (
                        normalized_url
                        and normalized_url not in unique_normalized_urls
                    ):
                        unique_normalized_urls.add(normalized_url)
                        unique_attachments_urls.append(url)
                for attachment in self._iter_attachments(unique_attachments_urls):
                    attachment.url = normalize_attachment_url(attachment.url)
                    unique_attachments_data.append(attachment)
            try:
                document = map_to_document(
                    issue,
                    messages,
                    unique_attachments_data,
                )
                yield document
            except Exception as e:
                # Surface a per-document failure without aborting the run.
                yield ConnectorFailure(
                    failed_document=DocumentFailure(
                        document_id=_create_id(issue),
                        document_link=issue.link,
                    ),
                    failure_message=f"Failed to process Pylon Issue: {e}",
                    exception=e,
                )
        # Checkpoint after each completed day window and hand control back;
        # remaining windows are processed on subsequent invocations.
        # NOTE(review): assumes get_time_window_days resumes *after* this
        # boundary on the next call — confirm to avoid re-indexing a day.
        new_checkpoint.current_day_start = start_time
        new_checkpoint.has_more = idx < len(time_windows) - 1
        return new_checkpoint
|
||||
|
||||
@override
def build_dummy_checkpoint(self) -> PylonConnectorCheckpoint:
    """Return a fresh checkpoint that marks indexing as not yet started."""
    initial = PylonConnectorCheckpoint(has_more=True)
    return initial
|
||||
|
||||
@override
def validate_checkpoint_json(
    self, checkpoint_json: str
) -> PylonConnectorCheckpoint:
    """Deserialize and validate a checkpoint from its JSON representation."""
    parsed = PylonConnectorCheckpoint.model_validate_json(checkpoint_json)
    return parsed
|
||||
|
||||
def retrieve_all_slim_docs_perm_sync(
    self,
    start: SecondsSinceUnixEpoch | None = None,
    end: SecondsSinceUnixEpoch | None = None,
    callback: IndexingHeartbeatInterface | None = None,
) -> Iterator[list[SlimDocument]]:
    """Return document IDs for all existing Pylon issues.

    Used by the pruning job to determine which documents no longer exist
    in the source and should be removed from the index.

    Note: This fetches ALL issues without lookback filtering, as we need
    to check the existence of the issue only.

    Args:
        start: Optional start time (typically ignored for pruning)
        end: Optional end time (typically ignored for pruning)
        callback: Optional callback for progress reporting

    Yields:
        Batches of SlimDocument objects containing only document IDs
    """
    batch: list[SlimDocument] = []

    # Clamp to the configured start date; pruning typically passes None.
    if start is not None:
        start_epoch = max(start, self.start_epoch_sec)
    else:
        start_epoch = self.start_epoch_sec
    end_epoch = end if end is not None else time.time()

    time_windows = get_time_window_days(start_epoch, end_epoch, start_epoch)
    if not time_windows:
        return

    for start_time, end_time in time_windows:
        # original_start=None disables lookback filtering: existence only.
        for issue in self._iter_issues(
            start_time, end_time, messages_enabled=False, original_start=None
        ):
            if not issue.id:
                logger.warning(
                    "Skipping issue without ID during slim doc retrieval"
                )
                continue

            doc_id = _create_id(issue)
            batch.append(SlimDocument(id=doc_id))

            # Flush whenever a full batch accumulates.
            if len(batch) >= self.batch_size:
                yield batch
                batch = []

            if callback:
                if callback.should_stop():
                    raise RuntimeError("pylon_slim_sync: Stop signal detected")
                # NOTE(review): reports the *current* batch length, which is 0
                # immediately after a flush — confirm this is the intended
                # progress metric rather than a per-document increment.
                callback.progress("pylon_slim_sync", len(batch))

    # Flush the final partial batch, if any.
    if batch:
        yield batch
|
||||
|
||||
def validate_connector_settings(self) -> None:
    """Validate connector configuration and credentials.

    Called during connector setup to ensure everything is configured
    correctly. Issues one lightweight API request and maps the outcome
    onto the standard validation exceptions.

    Raises:
        ConnectorMissingCredentialError: no API key has been loaded.
        CredentialExpiredError: Pylon returned HTTP 401.
        InsufficientPermissionsError: Pylon returned HTTP 403.
        UnexpectedValidationError: any other non-2xx status or failure.
    """
    if not self.api_key:
        raise ConnectorMissingCredentialError("Pylon")

    try:
        with self._client() as client:
            # Try a lightweight request to validate credentials
            resp = pylon_get(client, "/accounts", {"limit": 1})
            if resp.status_code == 401:
                raise CredentialExpiredError(
                    "Invalid or expired Pylon credentials (HTTP 401)."
                )
            if resp.status_code == 403:
                raise InsufficientPermissionsError(
                    "Insufficient permissions to access Pylon workspace (HTTP 403)."
                )
            if resp.status_code < 200 or resp.status_code >= 300:
                raise UnexpectedValidationError(
                    f"Unexpected Pylon error (status={resp.status_code})."
                )
    except (
        CredentialExpiredError,
        InsufficientPermissionsError,
        UnexpectedValidationError,
        ConnectorMissingCredentialError,
    ):
        # Already-classified validation errors pass through unchanged.
        raise
    except Exception as e:
        # Network or other unexpected errors — wrap with the cause chained
        # so the original traceback is preserved for debugging.
        raise UnexpectedValidationError(
            f"Unexpected error while validating Pylon settings: {e}"
        ) from e
|
||||
|
||||
def _iter_issues(
    self,
    start_time: str,
    end_time: str,
    messages_enabled: bool,
    original_start: SecondsSinceUnixEpoch | None = None,
) -> Iterator[Issue]:
    """Retrieve issues from Pylon within the specified time window.

    Args:
        start_time: API request start (may be extended backwards for lookback)
        end_time: API request end
        messages_enabled: whether message activity counts toward "last update"
        original_start: If provided, used to fetch updated issues: last_update >= original_start (Unix epoch seconds)

    Yields:
        Issues that meet the filtering criteria
    """
    with self._client() as client:
        params = {
            "start_time": start_time,
            "end_time": end_time,
        }
        # Single request per window — GET /issues has no usable pagination
        # (see GetIssuesResponseBody docstring).
        response = pylon_get(client, "/issues", params)
        response_body = GetIssuesResponseBody.model_validate(response.json())
        issues = response_body.data or []

        for issue in issues:
            # Filter by latest_message_time if lookback is used
            if original_start is not None:
                # Only issues created *before* the requested window need the
                # activity check; issues created inside the window always pass.
                if (
                    issue.created_at
                    and parse_pylon_datetime(issue.created_at) < original_start
                ):
                    # Last-activity marker: resolution time first, falling back
                    # to the latest message time when messages are indexed.
                    # NOTE(review): the debug message below says
                    # "latest_message_time" even when only resolution_time is
                    # consulted — confirm the intended precedence.
                    last_update = (
                        (issue.resolution_time or issue.latest_message_time or None)
                        if messages_enabled
                        else issue.resolution_time
                    )
                    if not last_update:
                        # No last_update means no recent activity, skip it
                        logger.debug(
                            f"Skipping issue {issue.id} - no latest_message_time during lookback"
                        )
                        continue

                    last_update_epoch = parse_pylon_datetime(last_update)

                    if last_update_epoch < original_start:
                        # last_update is before the requested start window, skip it
                        logger.debug(
                            f"Skipping issue {issue.id} - last_update {last_update} "
                            f"is before original_start"
                        )
                        continue

            yield issue
|
||||
|
||||
def _iter_messages(self, issue_id: str) -> Iterator[Message]:
    """Yield every message belonging to the given Pylon issue.

    Args:
        issue_id: The ID of the issue whose messages should be fetched.

    Yields:
        Message objects associated with the issue.
    """
    with self._client() as client:
        raw = pylon_get(client, f"/issues/{issue_id}/messages")
        body = GetIssueMessagesResponseBody.model_validate(raw.json())
        # An absent data payload is treated as "no messages".
        yield from body.data or []
|
||||
|
||||
def _iter_attachments(self, attachment_urls: list[str]) -> Iterator[AttachmentData]:
    """Download attachments by URL, yielding only successful downloads.

    Args:
        attachment_urls: The URLs of the attachments to fetch.

    Yields:
        AttachmentData for each attachment that downloads successfully;
        failed downloads (download_attachment returning None) are skipped.
    """
    # Attachment links are presigned, so an unauthenticated client suffices.
    with build_generic_client() as client:
        for url in attachment_urls:
            data = download_attachment(client, url)
            if data is None:
                continue
            yield data
|
||||
367
backend/onyx/connectors/pylon/models.py
Normal file
367
backend/onyx/connectors/pylon/models.py
Normal file
@@ -0,0 +1,367 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import ConfigDict
|
||||
from pydantic import Field
|
||||
|
||||
|
||||
# Error envelope returned by Pylon API endpoints on failed requests.
class ErrorApiResponseBody(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    errors: Optional[list[str]] = Field(None, description="The list of errors.")
    exists_id: Optional[str] = Field(
        None,
        description="The ID of the object that already exists if this is a duplicate object error.",
    )
    request_id: Optional[str] = Field(None, description="The request ID for tracking.")
|
||||
|
||||
|
||||
# Response envelope for GET /issues/{id} — a single issue payload.
class GetIssueResponseBody(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    data: Optional[Issue] = None
    request_id: Optional[str] = Field(None, description="The request ID for tracking.")
|
||||
|
||||
|
||||
class GetIssuesResponseBody(BaseModel):
    """
    The response body for GET /issues.
    Openapi spec describes pagination, but in practice it is not returned and ignored by the API.
    Therefore, the field is ignored in ingestion.
    """

    model_config = ConfigDict(
        extra="ignore",
    )
    data: Optional[list[Issue]] = Field(
        None, description="The data payload of the response."
    )
    # Kept only for schema fidelity — never consulted (see class docstring).
    pagination: Optional[Pagination] = None
    request_id: Optional[str] = Field(None, description="The request ID for tracking.")
|
||||
|
||||
|
||||
# Response envelope for GET /issues/{id}/messages.
class GetIssueMessagesResponseBody(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    data: Optional[list[Message]] = Field(
        None, description="The data payload of the response."
    )
    pagination: Optional[Pagination] = None
    request_id: Optional[str] = Field(None, description="The request ID for tracking.")
|
||||
|
||||
|
||||
# A single message within a Pylon issue thread (chat, email, or note).
class Message(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    author: Optional[MessageAuthor] = None
    email_info: Optional[EmailMessageInfo] = None
    file_urls: Optional[list[str]] = Field(
        None, description="The URLs of the files in the message, if any."
    )
    id: Optional[str] = Field(None, description="The ID of the message.")
    is_private: Optional[bool] = Field(
        None, description="Indicates if the message is private."
    )
    message_html: Optional[str] = Field(
        None, description="The HTML body of the message."
    )
    source: Optional[str] = Field(None, description="The source of the message.")
    thread_id: Optional[str] = Field(
        None,
        description="The ID of the thread the message belongs to. This is only set for internal notes.",
    )
    timestamp: Optional[str] = Field(
        None, description="The time at which the message was created."
    )
|
||||
|
||||
|
||||
# Author of a message: either a Pylon user or an external contact.
class MessageAuthor(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    avatar_url: Optional[str] = None
    contact: Optional[MiniContact] = None
    name: Optional[str] = None
    user: Optional[MiniUser] = None
|
||||
|
||||
|
||||
# Email routing metadata present on messages that originated as email.
class EmailMessageInfo(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    bcc_emails: Optional[list[str]] = Field(
        None, description="The email addresses of the BCC recipients of the message."
    )
    cc_emails: Optional[list[str]] = Field(
        None, description="The email addresses of the CC recipients of the message."
    )
    from_email: Optional[str] = Field(
        None, description="The email address of the sender of the message."
    )
    to_emails: Optional[list[str]] = Field(
        None, description="The email addresses of the recipients of the message."
    )
|
||||
|
||||
|
||||
# A Pylon support issue — the root entity indexed by the connector.
# All fields optional: the API may omit any of them, and extra="ignore"
# drops unknown keys.
class Issue(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    account: Optional[MiniAccount] = None
    assignee: Optional[MiniUser] = None
    attachment_urls: Optional[list[str]] = Field(
        None, description="The attachment URLs attached to this issue, if any."
    )
    body_html: Optional[str] = Field(
        None, description="The body of the issue in HTML format."
    )
    business_hours_first_response_seconds: Optional[int] = Field(
        None,
        description="The business hours time in seconds it took for the first response to the issue, if any.",
    )
    business_hours_resolution_seconds: Optional[int] = Field(
        None,
        description="The business hours time in seconds it took for the issue to be resolved, if any.",
    )
    chat_widget_info: Optional[IssueChatWidgetInfo] = None
    created_at: Optional[str] = Field(
        None, description="The time the issue was created."
    )
    csat_responses: Optional[list[CSATResponse]] = Field(
        None, description="The CSAT responses of the issue, if any."
    )
    custom_fields: Optional[dict[str, CustomFieldValue]] = Field(
        None, description="Custom field values associated with the issue."
    )
    customer_portal_visible: Optional[bool] = Field(
        None, description="Whether the issue is visible in the customer portal."
    )
    external_issues: Optional[list[ExternalIssue]] = Field(
        None, description="The external issues associated with the issue, if any."
    )
    first_response_seconds: Optional[int] = Field(
        None,
        description="The time in seconds it took for the first response to the issue, if any.",
    )
    first_response_time: Optional[str] = Field(
        None, description="The time of the first response to the issue, if any."
    )
    id: Optional[str] = Field(None, description="The ID of the issue.")
    latest_message_time: Optional[str] = Field(
        None, description="The time of the latest message in the issue."
    )
    link: Optional[str] = Field(None, description="The link to the issue in Pylon.")
    number: Optional[int] = Field(None, description="The number of the issue.")
    number_of_touches: Optional[int] = Field(
        None, description="The number of times the issue has been touched."
    )
    requester: Optional[MiniContact] = None
    resolution_seconds: Optional[int] = Field(
        None,
        description="The time in seconds it took for the issue to be resolved, if any.",
    )
    resolution_time: Optional[str] = Field(
        None, description="The time of the resolution of the issue, if any."
    )
    slack: Optional[SlackInfo] = None
    snoozed_until_time: Optional[str] = Field(
        None,
        description="The time the issue was snoozed until in RFC3339 format, if any.",
    )
    source: Optional[Source] = Field(
        None,
        description=(
            "The source of the issue."
            "* slack IssueSourceSlack"
            "* microsoft_teams IssueSourceMicrosoftTeams"
            "* microsoft_teams_chat IssueSourceMicrosoftTeamsChat"
            "* chat_widget IssueSourceChatWidget"
            "* email IssueSourceEmail"
            "* manual IssueSourceManual"
            "* form IssueSourceForm"
            "* discord IssueSourceDiscord"
        ),
    )
    state: Optional[str] = Field(
        None,
        description=(
            "The state of the issue. This could be one of "
            '`["new", "waiting_on_you", "waiting_on_customer", "on_hold", "closed"] or a custom status slug.'
        ),
    )
    tags: Optional[list[str]] = Field(
        None, description="Tags associated with the issue."
    )
    team: Optional[MiniTeam] = None
    title: Optional[str] = Field(None, description="The title of the issue.")
    type: Optional[Type1] = Field(
        None,
        description="The type of the issue.\n\n* Conversation IssueTypeConversation\n\n* Ticket IssueTypeTicket",
    )
|
||||
|
||||
|
||||
# Cursor-pagination envelope from the OpenAPI spec. Present for schema
# fidelity only — GET /issues does not actually return it in practice
# (see GetIssuesResponseBody).
class Pagination(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    cursor: str = Field(..., description="The cursor for the next page of results.")
    has_next_page: bool = Field(
        ..., description="Indicates if there is a next page of results."
    )
|
||||
|
||||
|
||||
# Minimal account reference embedded in issues (ID only).
class MiniAccount(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    id: Optional[str] = Field(None, description="The ID of the account.")
|
||||
|
||||
|
||||
# Minimal external-contact reference (requester / message author).
class MiniContact(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    email: Optional[str] = Field(None, description="The email of the contact.")
    id: Optional[str] = Field(None, description="The ID of the contact.")
|
||||
|
||||
|
||||
# A customer-satisfaction survey response attached to an issue.
class CSATResponse(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    comment: Optional[str] = Field(
        None, description="The comment of the CSAT response."
    )
    score: Optional[int] = Field(None, description="The score of the CSAT response.")
|
||||
|
||||
|
||||
# Context for issues started through the embedded chat widget.
class IssueChatWidgetInfo(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    page_url: Optional[str] = Field(
        None,
        description="The URL of the page that the user was on when they started the chat widget issue.",
    )
|
||||
|
||||
|
||||
# Minimal internal-user reference (assignee / message author).
class MiniUser(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    email: Optional[str] = Field(None, description="The email of the user.")
    id: Optional[str] = Field(None, description="The ID of the user.")
|
||||
|
||||
|
||||
# Value of one custom field on an issue; `value` is used for single-valued
# fields and `values` for multi-valued ones (they are mutually applicable
# per field type, per the API descriptions below).
class CustomFieldValue(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    slug: Optional[str] = Field(None, description="The slug of the custom field.")
    value: Optional[str] = Field(
        None,
        description=(
            "The value of the custom field. Only to be used for single-valued custom fields. "
            "If unset, the custom field will be unset. If the custom field is a select field, "
            "the value must be the select option slug, which you can find from the GET /custom-fields endpoint."
        ),
    )
    values: Optional[list[str]] = Field(
        None,
        description=(
            "The values of the custom field. Only to be used for multi-valued custom fields (ex. multiselect). "
            "If unset, the custom field will be unset. If the custom field is a multiselect field, "
            "the values must be the select option slugs which you can find from the GET /custom-fields endpoint."
        ),
    )
|
||||
|
||||
|
||||
# Link to a counterpart issue in an external tracker (Jira, GitHub, Linear, Asana).
class ExternalIssue(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    external_id: Optional[str] = Field(
        None,
        description=(
            "The external ID of the external issue."
            "Jira: ID of the issue (autoincrementing number from 10000)."
            "GitHub: Owner/Repo/IssueID."
            "Linear: ID of the issue (UUID)."
            "Asana: ID of the task (Long number)."
        ),
    )
    link: Optional[str] = Field(None, description="Link to the product issue.")
    source: Optional[str] = Field(None, description="The source of the external issue.")
|
||||
|
||||
|
||||
# Slack provenance for issues that originated from a Slack message.
class SlackInfo(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    channel_id: Optional[str] = Field(
        None, description="The Slack channel ID associated with the issue."
    )
    message_ts: Optional[str] = Field(
        None, description="The root message ID of slack message that started issue."
    )
    workspace_id: Optional[str] = Field(
        None, description="The Slack workspace ID associated with the issue."
    )
|
||||
|
||||
|
||||
class Source(Enum):
    """Origin channel of a Pylon issue.

    Values mirror the API's OpenAPI enum names: IssueSourceSlack,
    IssueSourceMicrosoftTeams, IssueSourceMicrosoftTeamsChat,
    IssueSourceChatWidget, IssueSourceEmail, IssueSourceManual,
    IssueSourceForm and IssueSourceDiscord.
    """

    slack = "slack"
    microsoft_teams = "microsoft_teams"
    microsoft_teams_chat = "microsoft_teams_chat"
    chat_widget = "chat_widget"
    email = "email"
    manual = "manual"
    form = "form"
    discord = "discord"
|
||||
|
||||
|
||||
# Minimal team reference embedded in issues (ID only).
class MiniTeam(BaseModel):
    model_config = ConfigDict(
        extra="ignore",
    )
    id: Optional[str] = Field(None, description="The ID of the team.")
|
||||
|
||||
|
||||
class Type1(Enum):
    """Kind of Pylon issue.

    Mirrors the API's OpenAPI enum: IssueTypeConversation and IssueTypeTicket.
    """

    Conversation = "Conversation"
    Ticket = "Ticket"
|
||||
598
backend/onyx/connectors/pylon/utils.py
Normal file
598
backend/onyx/connectors/pylon/utils.py
Normal file
@@ -0,0 +1,598 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import time
|
||||
from collections.abc import Sequence
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from datetime import timezone
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from onyx.configs.app_configs import REQUEST_TIMEOUT_SECONDS
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.configs.constants import FileOrigin
|
||||
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import (
|
||||
rate_limit_builder,
|
||||
)
|
||||
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
|
||||
from onyx.connectors.models import BasicExpertInfo
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.connectors.pylon.models import Issue
|
||||
from onyx.connectors.pylon.models import Message
|
||||
from onyx.file_processing.extract_file_text import extract_text_and_images
|
||||
from onyx.file_processing.html_utils import web_html_cleanup
|
||||
from onyx.file_processing.image_utils import store_image_and_create_section
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.retry_wrapper import retry_builder
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
class PylonRetriableError(Exception):
    """Raised for retriable Pylon conditions (429, 5xx).

    pylon_get's retry wrapper lists this exception, so raising it triggers
    a backoff-and-retry of the request.
    """
|
||||
|
||||
|
||||
class PylonNonRetriableError(Exception):
    """Raised for non-retriable Pylon client errors (4xx except 429).

    Not listed in the retry wrapper's exceptions, so it propagates immediately.
    """
|
||||
|
||||
|
||||
@dataclass
class AttachmentData:
    """Container for downloaded attachment data.

    Attributes:
        content: Raw bytes of the attachment
        filename: Extracted filename from content-disposition header
        content_type: MIME type from content-type header (e.g., "image/png")
        url: Original presigned URL used for download
    """

    content: bytes
    filename: str
    content_type: str
    # Mutable: the connector overwrites this with the normalized URL after
    # download (see load_from_checkpoint).
    url: str
|
||||
|
||||
|
||||
def build_auth_client(
|
||||
api_key: str, base_url: str = "https://api.usepylon.com"
|
||||
) -> httpx.Client:
|
||||
"""Build an authenticated HTTP client for Pylon API requests.
|
||||
|
||||
Args:
|
||||
api_key: Pylon API key for Bearer authentication
|
||||
base_url: Base URL for Pylon API (defaults to production)
|
||||
|
||||
Returns:
|
||||
Configured httpx.Client with authentication headers
|
||||
"""
|
||||
client = httpx.Client(
|
||||
base_url=base_url,
|
||||
headers={"Authorization": f"Bearer {api_key}"},
|
||||
timeout=REQUEST_TIMEOUT_SECONDS,
|
||||
follow_redirects=True,
|
||||
)
|
||||
return client
|
||||
|
||||
|
||||
def build_generic_client() -> httpx.Client:
|
||||
"""
|
||||
Build a generic HTTP client for downloading attachments from Pylon issues and messages.
|
||||
These attachments are presigned URLs and do not require authentication.
|
||||
|
||||
Returns:
|
||||
Configured httpx.Client with authentication headers
|
||||
"""
|
||||
client = httpx.Client(
|
||||
timeout=REQUEST_TIMEOUT_SECONDS,
|
||||
follow_redirects=True,
|
||||
)
|
||||
return client
|
||||
|
||||
|
||||
@retry_builder(
|
||||
tries=6,
|
||||
delay=1,
|
||||
backoff=2,
|
||||
max_delay=30,
|
||||
exceptions=(PylonRetriableError, httpx.RequestError, httpx.ConnectError),
|
||||
)
|
||||
@rate_limit_builder(max_calls=20, period=60)
|
||||
def pylon_get(
|
||||
client: httpx.Client, url: str, params: dict[str, Any] | None = None
|
||||
) -> httpx.Response:
|
||||
"""Perform a GET against Pylon API with retry and rate limiting.
|
||||
|
||||
Retries on 429 and 5xx responses, and on transport errors. Honors
|
||||
`Retry-After` header for 429 when present by sleeping before retrying.
|
||||
"""
|
||||
try:
|
||||
response = client.get(url, params=params)
|
||||
except httpx.RequestError:
|
||||
# Allow retry_builder to handle retries of transport errors
|
||||
raise
|
||||
|
||||
try:
|
||||
response.raise_for_status()
|
||||
except httpx.HTTPStatusError as e:
|
||||
status = e.response.status_code if e.response is not None else None
|
||||
if status == 429:
|
||||
retry_after = e.response.headers.get("Retry-After") if e.response else None
|
||||
if retry_after is not None:
|
||||
try:
|
||||
time.sleep(int(retry_after))
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
raise PylonRetriableError("Pylon rate limit exceeded (429)") from e
|
||||
if status is not None and 500 <= status < 600:
|
||||
raise PylonRetriableError(f"Pylon server error: {status}") from e
|
||||
if status is not None and 400 <= status < 500:
|
||||
raise PylonNonRetriableError(f"Pylon client error: {status}") from e
|
||||
# Unknown status, propagate
|
||||
raise
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def parse_ymd_date(date_str: str) -> SecondsSinceUnixEpoch:
|
||||
"""Parse YYYY-MM-DD date string to Unix timestamp.
|
||||
|
||||
Args:
|
||||
date_str: Date string in YYYY-MM-DD format
|
||||
Returns:
|
||||
Unix timestamp as SecondsSinceUnixEpoch
|
||||
"""
|
||||
try:
|
||||
# Convert start_date (expected format: YYYY-MM-DD) to SecondsSinceUnixEpoch
|
||||
dt = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
|
||||
date_epoch = dt.timestamp()
|
||||
except Exception as e:
|
||||
# Default to 2025-01-01 UTC if there's a parsing error
|
||||
logger.warning(
|
||||
"Unable to parse start date from '%s'. Reason '%s'", date_str, str(e)
|
||||
)
|
||||
date_epoch = datetime(2025, 1, 1, tzinfo=timezone.utc).timestamp()
|
||||
return date_epoch
|
||||
|
||||
|
||||
def parse_pylon_datetime(datetime_str: str) -> SecondsSinceUnixEpoch:
|
||||
"""Parse RFC3339 datetime string to Unix timestamp.
|
||||
|
||||
Args:
|
||||
datetime_str: RFC3339 formatted datetime string
|
||||
|
||||
Returns:
|
||||
Unix timestamp as SecondsSinceUnixEpoch
|
||||
"""
|
||||
from datetime import datetime, timezone
|
||||
|
||||
# Parse RFC3339 format (ISO with timezone)
|
||||
dt = datetime.fromisoformat(datetime_str.replace("Z", "+00:00"))
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt.timestamp()
|
||||
|
||||
|
||||
def get_time_window_days(
|
||||
start: SecondsSinceUnixEpoch,
|
||||
end: SecondsSinceUnixEpoch,
|
||||
start_boundary: SecondsSinceUnixEpoch,
|
||||
) -> list[tuple[str, str]]:
|
||||
"""Split time window into daily chunks for better checkpointing.
|
||||
|
||||
Args:
|
||||
start: Start timestamp
|
||||
end: End timestamp
|
||||
start_boundary: either global start_epoch_sec or date from checkpoint
|
||||
|
||||
Returns:
|
||||
List of (start_iso, end_iso) tuples for each day
|
||||
"""
|
||||
# Respect start_epoch_sec if provided
|
||||
if start_boundary and start < start_boundary:
|
||||
start = start_boundary
|
||||
|
||||
days = []
|
||||
start_dt = datetime.fromtimestamp(start, tz=timezone.utc).replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
|
||||
end_dt = datetime.fromtimestamp(end, tz=timezone.utc)
|
||||
one_day = timedelta(days=1)
|
||||
|
||||
while start_dt <= end_dt:
|
||||
day_start = start_dt
|
||||
day_end = min(start_dt + one_day, end_dt)
|
||||
|
||||
days.append((day_start.isoformat(), day_end.isoformat()))
|
||||
|
||||
start_dt += one_day
|
||||
|
||||
return days
|
||||
|
||||
|
||||
def generate_attachment_id(stable_url: str) -> str:
|
||||
"""Generate stable ID for attachment based on URL.
|
||||
|
||||
Args:
|
||||
stable_url: URL with query parameters stripped
|
||||
|
||||
Returns:
|
||||
Hash-based stable ID
|
||||
"""
|
||||
# Create a hash of the URL for stable ID generation
|
||||
url_hash = hashlib.md5(stable_url.encode()).hexdigest()[:16]
|
||||
return f"pylon_attachment_{url_hash}"
|
||||
|
||||
|
||||
def normalize_attachment_url(url: str) -> str:
|
||||
"""Normalize attachment URL by removing query parameters.
|
||||
|
||||
Args:
|
||||
url: Raw attachment URL
|
||||
|
||||
Returns:
|
||||
URL with query parameters removed
|
||||
|
||||
Example url:
|
||||
"https://assets.usepylon.com/UUID_1/UUID_2-image.png?Expires=253370764800&Signature=SIG_VALUE&Key-Pair-Id=VALUE"
|
||||
"""
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
|
||||
parsed = urlparse(url)
|
||||
# Remove query parameters and fragment
|
||||
normalized = urlunparse(
|
||||
(
|
||||
parsed.scheme,
|
||||
parsed.netloc,
|
||||
parsed.path,
|
||||
parsed.params,
|
||||
"", # Remove query
|
||||
"", # Remove fragment
|
||||
)
|
||||
)
|
||||
return normalized
|
||||
|
||||
|
||||
def _parse_filename_from_content_disposition(content_disposition: str) -> str | None:
|
||||
"""Parse filename from Content-Disposition header.
|
||||
|
||||
Args:
|
||||
content_disposition: Value of Content-Disposition header
|
||||
|
||||
Returns:
|
||||
Extracted filename or None if not found
|
||||
|
||||
Example:
|
||||
"inline; filename=Screenshot 2025-10-12 at 8.22.32.png" -> "Screenshot 2025-10-12 at 8.22.32.png"
|
||||
'attachment; filename="document.pdf"' -> "document.pdf"
|
||||
"""
|
||||
import re
|
||||
|
||||
if not content_disposition:
|
||||
return None
|
||||
|
||||
# Try to match filename="..." or filename=... patterns
|
||||
# Handle both quoted and unquoted filenames
|
||||
patterns = [
|
||||
r"filename\*=UTF-8\'\'([^;]+)", # RFC 5987 encoded filename
|
||||
r'filename="([^"]+)"', # Quoted filename
|
||||
r"filename='([^']+)'", # Single-quoted filename
|
||||
r"filename=([^;]+)", # Unquoted filename
|
||||
]
|
||||
|
||||
for pattern in patterns:
|
||||
match = re.search(pattern, content_disposition, re.IGNORECASE)
|
||||
if match:
|
||||
filename = match.group(1).strip()
|
||||
# URL decode if it's RFC 5987 encoded
|
||||
if "UTF-8''" in pattern:
|
||||
from urllib.parse import unquote
|
||||
|
||||
filename = unquote(filename)
|
||||
return filename if filename else None
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _clean_html_to_text(
|
||||
html: str,
|
||||
) -> str:
|
||||
"""Convert HTML fragments to cleaned plain text using shared HTML utils.
|
||||
|
||||
This applies consistent stripping of boilerplate (e.g., style/script/link/meta),
|
||||
normalizes whitespace, and respects global configuration for link handling
|
||||
and readability extraction.
|
||||
|
||||
Args:
|
||||
html: The HTML string to clean.
|
||||
discard: Additional element names to discard beyond defaults. If not
|
||||
provided, a safe default set for inline fragments is used.
|
||||
mintlify_cleanup: Whether to enable Mintlify-specific class pruning.
|
||||
|
||||
Returns:
|
||||
Cleaned plain text suitable for indexing.
|
||||
"""
|
||||
default_discard = [
|
||||
"style",
|
||||
"script",
|
||||
"link",
|
||||
"meta",
|
||||
"svg",
|
||||
"noscript",
|
||||
"template",
|
||||
]
|
||||
parsed = web_html_cleanup(
|
||||
html,
|
||||
mintlify_cleanup_enabled=False,
|
||||
additional_element_types_to_discard=default_discard,
|
||||
)
|
||||
return parsed.cleaned_text
|
||||
|
||||
|
||||
@retry_builder(tries=3, delay=1.0, backoff=2.0)
|
||||
def download_attachment(
|
||||
client: httpx.Client, presigned_url: str
|
||||
) -> AttachmentData | None:
|
||||
"""Download attachment from presigned URL with retry logic.
|
||||
|
||||
Performs a HEAD request first to check content-disposition header and content-length.
|
||||
Skips attachments without content-disposition header or those exceeding 10 MB.
|
||||
|
||||
Args:
|
||||
client: HTTP client to use for the download
|
||||
presigned_url: Presigned URL for the attachment (e.g., from a Pylon issue)
|
||||
|
||||
Returns:
|
||||
AttachmentData object containing content, filename, content_type, and url,
|
||||
or None if file should be skipped
|
||||
|
||||
Raises:
|
||||
Exception: If download fails after retries or if response is unsuccessful
|
||||
"""
|
||||
MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024 # 10 MB in bytes
|
||||
|
||||
# First, make a HEAD request to check content-disposition and content-length
|
||||
head_response = client.head(presigned_url)
|
||||
head_response.raise_for_status()
|
||||
|
||||
# Extract filename from content-disposition header
|
||||
content_disposition = head_response.headers.get("content-disposition")
|
||||
if not content_disposition:
|
||||
logger.info(
|
||||
f"Skipping attachment download (no content-disposition header): {presigned_url}"
|
||||
)
|
||||
return None
|
||||
|
||||
filename = _parse_filename_from_content_disposition(content_disposition)
|
||||
if not filename:
|
||||
logger.info(
|
||||
f"Skipping attachment download (no filename in content-disposition): {presigned_url}"
|
||||
)
|
||||
return None
|
||||
|
||||
# Extract content-type (MIME type)
|
||||
content_type = head_response.headers.get("content-type", "application/octet-stream")
|
||||
|
||||
# Check content-length
|
||||
content_length_header = head_response.headers.get("content-length")
|
||||
if content_length_header and content_length_header.isdigit():
|
||||
content_length = int(content_length_header)
|
||||
if content_length > MAX_ATTACHMENT_SIZE:
|
||||
logger.info(
|
||||
f"Skipping attachment download (size {content_length} bytes exceeds "
|
||||
f"{MAX_ATTACHMENT_SIZE} bytes limit): {presigned_url}"
|
||||
)
|
||||
return None
|
||||
|
||||
response = client.get(presigned_url)
|
||||
response.raise_for_status()
|
||||
|
||||
if not response.content:
|
||||
logger.warning(f"Empty response content from: {presigned_url}")
|
||||
return None
|
||||
|
||||
return AttachmentData(
|
||||
content=response.content,
|
||||
filename=filename,
|
||||
content_type=content_type,
|
||||
url=presigned_url,
|
||||
)
|
||||
|
||||
|
||||
def map_to_document(
|
||||
issue: Issue, messages: list[Message], attachments: list[AttachmentData]
|
||||
) -> Document:
|
||||
"""
|
||||
Map Pylon issue, messages, and attachments to a single Document object.
|
||||
|
||||
Behavior:
|
||||
- Converts issue and message HTML to plain text and appends as TextSection.
|
||||
- Processes each attachment via extract_text_and_images; appends extracted text as TextSection.
|
||||
- Persists only images (original image attachments and any embedded images) to FileStore and
|
||||
appends ImageSection referencing the stored file id.
|
||||
- Attachment URLs are presigned/ephemeral, so they are not added to the link field.
|
||||
"""
|
||||
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
|
||||
# Issue body
|
||||
if issue.body_html:
|
||||
cleaned_issue_text = _clean_html_to_text(issue.body_html)
|
||||
if cleaned_issue_text:
|
||||
sections.append(TextSection(link=issue.link, text=cleaned_issue_text))
|
||||
|
||||
# Messages
|
||||
for idx, message in enumerate(messages):
|
||||
if idx == 0:
|
||||
# the first message's `message_html` and the issue's body_html are identical
|
||||
continue
|
||||
message_html = message.message_html
|
||||
if not message_html:
|
||||
continue
|
||||
cleaned_message_text = _clean_html_to_text(message_html or "")
|
||||
if cleaned_message_text:
|
||||
link = f"{issue.link}&messageID={message.id}" if issue.link else None
|
||||
sections.append(TextSection(link=link, text=cleaned_message_text))
|
||||
|
||||
sections.extend(_process_attachments(attachments))
|
||||
|
||||
if issue.title:
|
||||
semantic_identifier = issue.title
|
||||
elif issue.number is not None:
|
||||
semantic_identifier = f"Issue #{issue.number}"
|
||||
else:
|
||||
semantic_identifier = f"Issue {issue.id or 'unknown'}"
|
||||
|
||||
metadata = _create_metadata(issue)
|
||||
|
||||
document = Document(
|
||||
id=f"pylon:issue:{issue.id}",
|
||||
source=DocumentSource.PYLON,
|
||||
semantic_identifier=semantic_identifier,
|
||||
title=issue.title or semantic_identifier,
|
||||
sections=sections,
|
||||
metadata=metadata,
|
||||
)
|
||||
assignee = issue.assignee.email if issue.assignee and issue.assignee.email else None
|
||||
|
||||
if assignee:
|
||||
document.secondary_owners = [BasicExpertInfo(email=assignee)]
|
||||
|
||||
requester = (
|
||||
issue.requester.email if issue.requester and issue.requester.email else None
|
||||
)
|
||||
if requester:
|
||||
document.primary_owners = [BasicExpertInfo(email=requester)]
|
||||
|
||||
return document
|
||||
|
||||
|
||||
def _process_attachments(
|
||||
attachments: list[AttachmentData],
|
||||
) -> Sequence[TextSection | ImageSection]:
|
||||
sections: list[TextSection | ImageSection] = []
|
||||
for attachment in attachments:
|
||||
# Stable base id derived from normalized (query-stripped) URL
|
||||
base_stable_url = normalize_attachment_url(attachment.url)
|
||||
base_id = generate_attachment_id(base_stable_url)
|
||||
|
||||
# Persist original if it is an image
|
||||
if attachment.content_type.startswith("image/"):
|
||||
try:
|
||||
image_section, _ = store_image_and_create_section(
|
||||
image_data=attachment.content,
|
||||
file_id=base_id,
|
||||
display_name=attachment.filename,
|
||||
media_type=attachment.content_type,
|
||||
file_origin=FileOrigin.CONNECTOR,
|
||||
)
|
||||
logger.debug(
|
||||
f"Attachment Image: {image_section.image_file_id}: {attachment.content_type}: {attachment.filename}"
|
||||
)
|
||||
sections.append(image_section)
|
||||
except Exception:
|
||||
# Best-effort: do not fail the whole document if image storage fails
|
||||
logger.exception(
|
||||
"Failed to persist original image attachment for Pylon"
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
extraction_result = extract_text_and_images(
|
||||
file=BytesIO(attachment.content),
|
||||
file_name=attachment.filename,
|
||||
pdf_pass=None,
|
||||
content_type=attachment.content_type,
|
||||
image_callback=None,
|
||||
)
|
||||
if extraction_result.text_content:
|
||||
sections.append(TextSection(text=extraction_result.text_content))
|
||||
|
||||
# Persist any embedded images discovered during extraction
|
||||
for idx, (img_bytes, img_name) in enumerate(
|
||||
extraction_result.embedded_images
|
||||
):
|
||||
try:
|
||||
mime = "application/octet-stream"
|
||||
image_section, _ = store_image_and_create_section(
|
||||
image_data=img_bytes,
|
||||
file_id=f"{base_id}:{idx}",
|
||||
display_name=img_name,
|
||||
media_type=mime,
|
||||
file_origin=FileOrigin.CONNECTOR,
|
||||
)
|
||||
logger.debug(
|
||||
f"Attachment Image Embedded: {image_section.image_file_id}: "
|
||||
f"{attachment.content_type}: {attachment.filename}"
|
||||
)
|
||||
sections.append(image_section)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to persist embedded image for Pylon attachment"
|
||||
)
|
||||
return sections
|
||||
|
||||
|
||||
def _create_metadata(issue: Issue) -> dict[str, str | list[str]]:
|
||||
# Build metadata with string or list[str] values only
|
||||
metadata: dict[str, str | list[str]] = {
|
||||
"entity_type": issue.type.value if issue.type else "Ticket"
|
||||
}
|
||||
if issue.number is not None:
|
||||
metadata["issue_number"] = str(issue.number)
|
||||
if issue.state:
|
||||
metadata["state"] = issue.state
|
||||
if issue.created_at:
|
||||
metadata["created_at"] = issue.created_at
|
||||
updated_at = issue.latest_message_time or issue.resolution_time or issue.created_at
|
||||
if updated_at:
|
||||
metadata["updated_at"] = updated_at
|
||||
if issue.tags:
|
||||
metadata["tags"] = issue.tags
|
||||
if issue.source is not None:
|
||||
metadata["source"] = str(issue.source.value)
|
||||
return metadata
|
||||
|
||||
|
||||
def is_valid_issue(issue: Issue) -> bool:
|
||||
"""
|
||||
Validate that the Issue model has all required fields.
|
||||
OpenAPI spec does not specify required fields, so we check manually.
|
||||
The intended use is to ignore issues that are missing critical fields.
|
||||
Args:
|
||||
issue: Issue model to validate
|
||||
Returns:
|
||||
True if valid, False otherwise
|
||||
"""
|
||||
required_fields = ["id", "body_html"]
|
||||
for field in required_fields:
|
||||
if not hasattr(issue, field) or getattr(issue, field) is None:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_valid_message(issue: Message) -> bool:
|
||||
"""
|
||||
Validate that the Message model has all required fields.
|
||||
OpenAPI spec does not specify required fields, so we check manually.
|
||||
The intended use is to ignore issues that are missing critical fields.
|
||||
Args:
|
||||
issue: Issue model to validate
|
||||
Returns:
|
||||
True if valid, False otherwise
|
||||
"""
|
||||
required_fields = ["id", "message_html"]
|
||||
for field in required_fields:
|
||||
if not hasattr(issue, field) or getattr(issue, field) is None:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _create_id(issue: Issue) -> str:
|
||||
return f"{DocumentSource.PYLON.value}:issue:{issue.id}"
|
||||
@@ -200,6 +200,10 @@ CONNECTOR_CLASS_MAP = {
|
||||
module_path="onyx.connectors.bitbucket.connector",
|
||||
class_name="BitbucketConnector",
|
||||
),
|
||||
DocumentSource.PYLON: ConnectorMapping(
|
||||
module_path="onyx.connectors.pylon.connector",
|
||||
class_name="PylonConnector",
|
||||
),
|
||||
# just for integration tests
|
||||
DocumentSource.MOCK_CONNECTOR: ConnectorMapping(
|
||||
module_path="onyx.connectors.mock_connector.connector",
|
||||
|
||||
@@ -53,7 +53,7 @@ def test_bitbucket_checkpointed_load(
|
||||
)
|
||||
|
||||
assert isinstance(docs, list)
|
||||
|
||||
assert len(docs) > 0
|
||||
for doc in docs:
|
||||
assert doc.source == DocumentSource.BITBUCKET
|
||||
assert doc.metadata is not None
|
||||
|
||||
5
backend/tests/daily/connectors/pylon/conftest.py
Normal file
5
backend/tests/daily/connectors/pylon/conftest.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from tests.load_env_vars import load_env_vars
|
||||
|
||||
|
||||
# Load environment variables at the module level
|
||||
load_env_vars()
|
||||
@@ -0,0 +1,64 @@
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.connectors.pylon.connector import PylonConnector
|
||||
from onyx.connectors.pylon.utils import parse_ymd_date
|
||||
from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pylon_connector_for_checkpoint() -> PylonConnector:
|
||||
"""Daily fixture for Pylon checkpointed indexing.
|
||||
|
||||
Env vars:
|
||||
- PYLON_API_KEY: Pylon API key
|
||||
"""
|
||||
api_key = os.environ.get("PYLON_API_KEY")
|
||||
|
||||
if not api_key:
|
||||
pytest.skip("PYLON_API_KEY not set in environment")
|
||||
|
||||
connector = PylonConnector(
|
||||
pylon_entities=["messages"],
|
||||
start_date="2025-10-16",
|
||||
lookback_days=0,
|
||||
batch_size=10,
|
||||
)
|
||||
|
||||
connector.load_credentials({"pylon_api_key": api_key})
|
||||
return connector
|
||||
|
||||
|
||||
def test_pylon_checkpointed_load(
|
||||
pylon_connector_for_checkpoint: PylonConnector,
|
||||
) -> None:
|
||||
start = parse_ymd_date("2025-10-16") # fixed date to ensure results
|
||||
end = start + 24 * 60 * 60 # 1 day after start
|
||||
|
||||
docs = load_all_docs_from_checkpoint_connector(
|
||||
connector=pylon_connector_for_checkpoint,
|
||||
start=start,
|
||||
end=end,
|
||||
)
|
||||
|
||||
assert isinstance(docs, list)
|
||||
assert len(docs) > 0
|
||||
for doc in docs:
|
||||
assert doc.source == DocumentSource.PYLON
|
||||
assert doc.metadata is not None
|
||||
|
||||
assert doc.id.startswith(
|
||||
f"{DocumentSource.PYLON.value}:issue:"
|
||||
), f"Unexpected document ID format: {doc.id}"
|
||||
|
||||
assert "state" in doc.metadata, "Missing 'state' in metadata"
|
||||
assert "created_at" in doc.metadata, "Missing 'created_at' in metadata"
|
||||
assert "updated_at" in doc.metadata, "Missing 'updated_at' in metadata"
|
||||
|
||||
assert doc.semantic_identifier, "Missing semantic_identifier"
|
||||
|
||||
assert (
|
||||
len(doc.sections) >= 1
|
||||
), f"Expected at least 1 section, got {len(doc.sections)}"
|
||||
@@ -0,0 +1,51 @@
|
||||
import os
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.connectors.pylon.connector import PylonConnector
|
||||
from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pylon_connector_for_slim() -> PylonConnector:
|
||||
api_key = os.environ.get("PYLON_API_KEY")
|
||||
|
||||
if not api_key:
|
||||
pytest.skip("PYLON_API_KEY not set in environment")
|
||||
|
||||
connector = PylonConnector(
|
||||
pylon_entities=["messages"], # issues always included
|
||||
start_date="2025-10-16",
|
||||
lookback_days=0,
|
||||
batch_size=10,
|
||||
)
|
||||
|
||||
connector.load_credentials({"pylon_api_key": api_key})
|
||||
return connector
|
||||
|
||||
|
||||
def test_pylon_full_ids_subset_of_slim_ids(
|
||||
pylon_connector_for_slim: PylonConnector,
|
||||
) -> None:
|
||||
docs = load_all_docs_from_checkpoint_connector(
|
||||
connector=pylon_connector_for_slim,
|
||||
start=0,
|
||||
end=time.time(),
|
||||
)
|
||||
all_full_doc_ids: set[str] = set([doc.id for doc in docs])
|
||||
|
||||
all_slim_doc_ids: set[str] = set()
|
||||
for slim_doc_batch in pylon_connector_for_slim.retrieve_all_slim_docs_perm_sync():
|
||||
all_slim_doc_ids.update([doc.id for doc in slim_doc_batch])
|
||||
|
||||
assert all_full_doc_ids.issubset(all_slim_doc_ids)
|
||||
assert len(all_slim_doc_ids) > 0
|
||||
|
||||
if all_slim_doc_ids:
|
||||
example_id = next(iter(all_slim_doc_ids))
|
||||
assert example_id.startswith(f"{DocumentSource.PYLON.value}:")
|
||||
|
||||
for doc_id in all_slim_doc_ids:
|
||||
assert ":issue:" in doc_id, f"Expected issue ID, got: {doc_id}"
|
||||
BIN
web/public/Pylon.png
Normal file
BIN
web/public/Pylon.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 3.2 KiB |
@@ -86,6 +86,7 @@ import googleIcon from "../../../public/Google.png";
|
||||
import xenforoIcon from "../../../public/Xenforo.svg";
|
||||
import highspotIcon from "../../../public/Highspot.png";
|
||||
import bitbucketIcon from "../../../public/Bitbucket.svg";
|
||||
import pylonIcon from "../../../public/Pylon.png";
|
||||
import { FaGithub, FaRobot } from "react-icons/fa";
|
||||
import Image from "next/image";
|
||||
import { cn } from "@/lib/utils";
|
||||
@@ -1246,6 +1247,11 @@ export const BitbucketIcon = ({
|
||||
<LogoIcon size={size} className={className} src={bitbucketIcon} />
|
||||
);
|
||||
|
||||
export const PylonIcon = ({
|
||||
size = 16,
|
||||
className = defaultTailwindCSS,
|
||||
}: IconProps) => <LogoIcon size={size} className={className} src={pylonIcon} />;
|
||||
|
||||
export const GlobeIcon = ({
|
||||
size = 16,
|
||||
className = defaultTailwindCSSBlue,
|
||||
|
||||
@@ -346,6 +346,42 @@ export const connectorConfigs: Record<
|
||||
],
|
||||
advanced_values: [],
|
||||
},
|
||||
pylon: {
|
||||
description: "Configure Pylon connector",
|
||||
subtext:
|
||||
"Pylon connector supports indexing pylon issues and, optionally, related messages and attachments. Issues can only be retrieved by their creation date, consider adjusting the Lookback Days for tracking activity.",
|
||||
values: [
|
||||
{
|
||||
type: "multiselect",
|
||||
label: "Additional data",
|
||||
description: "Select additional data to index.",
|
||||
name: "pylon_entities",
|
||||
optional: false,
|
||||
options: [
|
||||
{ name: "Messages", value: "messages" },
|
||||
{ name: "Attachments (up to 10 MB)", value: "attachments" },
|
||||
],
|
||||
default: ["messages", "attachments"],
|
||||
},
|
||||
{
|
||||
type: "text",
|
||||
label: "Start Date",
|
||||
name: "start_date",
|
||||
description: `Pylon API is based on dates. The connector will ingest data starting from this date. Format: YYYY-MM-DD`,
|
||||
default: "2025-01-01",
|
||||
optional: true,
|
||||
},
|
||||
{
|
||||
type: "number",
|
||||
label: "Lookback Days",
|
||||
name: "lookback_days",
|
||||
description: `Number of days to look back for updated issues. Issues are only re-indexed if they have activity within the sync window.`,
|
||||
default: 7,
|
||||
optional: true,
|
||||
},
|
||||
],
|
||||
advanced_values: [],
|
||||
},
|
||||
gitbook: {
|
||||
description: "Configure GitBook connector",
|
||||
values: [
|
||||
|
||||
@@ -267,6 +267,10 @@ export interface ImapCredentialJson {
|
||||
imap_password: string;
|
||||
}
|
||||
|
||||
export interface PylonCredentialJson {
|
||||
pylon_api_key: string;
|
||||
}
|
||||
|
||||
export const credentialTemplates: Record<ValidSources, any> = {
|
||||
github: { github_access_token: "" } as GithubCredentialJson,
|
||||
gitlab: {
|
||||
@@ -277,6 +281,9 @@ export const credentialTemplates: Record<ValidSources, any> = {
|
||||
bitbucket_email: "",
|
||||
bitbucket_api_token: "",
|
||||
} as BitbucketCredentialJson,
|
||||
pylon: {
|
||||
pylon_api_key: "",
|
||||
} as PylonCredentialJson,
|
||||
slack: { slack_bot_token: "" } as SlackCredentialJson,
|
||||
bookstack: {
|
||||
bookstack_base_url: "",
|
||||
@@ -620,6 +627,9 @@ export const credentialDisplayNames: Record<string, string> = {
|
||||
// Bitbucket
|
||||
bitbucket_email: "Bitbucket Account Email",
|
||||
bitbucket_api_token: "Bitbucket API Token",
|
||||
|
||||
// Pylon
|
||||
pylon_api_key: "Pylon API Key",
|
||||
};
|
||||
|
||||
export function getDisplayNameForCredentialKey(key: string): string {
|
||||
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
GithubIcon,
|
||||
GitlabIcon,
|
||||
BitbucketIcon,
|
||||
PylonIcon,
|
||||
GlobeIcon,
|
||||
GmailIcon,
|
||||
GongIcon,
|
||||
@@ -271,6 +272,12 @@ export const SOURCE_METADATA_MAP: SourceMap = {
|
||||
category: SourceCategory.TicketingAndTaskManagement,
|
||||
docs: "https://docs.onyx.app/admin/connectors/official/productboard",
|
||||
},
|
||||
pylon: {
|
||||
icon: PylonIcon,
|
||||
displayName: "Pylon",
|
||||
category: SourceCategory.TicketingAndTaskManagement,
|
||||
docs: "https://docs.onyx.app/admin/connectors/official/pylon",
|
||||
},
|
||||
|
||||
// Messaging
|
||||
slack: slackMetadata,
|
||||
|
||||
@@ -494,6 +494,7 @@ export enum ValidSources {
|
||||
Highspot = "highspot",
|
||||
Imap = "imap",
|
||||
Bitbucket = "bitbucket",
|
||||
Pylon = "pylon",
|
||||
|
||||
// Federated Connectors
|
||||
FederatedSlack = "federated_slack",
|
||||
|
||||
Reference in New Issue
Block a user