Compare commits

...

1 Commit

Author SHA1 Message Date
Daniel Kravets
967e58725f feat: pylon connector 2025-10-19 20:57:15 +03:00
16 changed files with 1563 additions and 1 deletions

View File

@@ -212,6 +212,7 @@ class DocumentSource(str, Enum):
IMAP = "imap"
BITBUCKET = "bitbucket"
PYLON = "pylon"
# Special case just for integration tests
MOCK_CONNECTOR = "mock_connector"
@@ -619,4 +620,5 @@ project management, and collaboration tools into a single, customizable platform
DocumentSource.AIRTABLE: "airtable - database",
DocumentSource.HIGHSPOT: "highspot - CRM data",
DocumentSource.IMAP: "imap - email data",
DocumentSource.PYLON: "pylon - customer support data",
}

View File

@@ -0,0 +1 @@
# Pylon connector package

View File

@@ -0,0 +1,410 @@
from __future__ import annotations
import copy
import time
from collections.abc import Iterator
from typing import Any
from typing import TYPE_CHECKING
from typing_extensions import override
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedValidationError
from onyx.connectors.interfaces import CheckpointedConnector
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import SlimDocument
from onyx.connectors.pylon.models import GetIssueMessagesResponseBody
from onyx.connectors.pylon.models import GetIssuesResponseBody
from onyx.connectors.pylon.models import Issue
from onyx.connectors.pylon.models import Message
from onyx.connectors.pylon.utils import _create_id
from onyx.connectors.pylon.utils import AttachmentData
from onyx.connectors.pylon.utils import build_auth_client
from onyx.connectors.pylon.utils import build_generic_client
from onyx.connectors.pylon.utils import download_attachment
from onyx.connectors.pylon.utils import get_time_window_days
from onyx.connectors.pylon.utils import is_valid_issue
from onyx.connectors.pylon.utils import is_valid_message
from onyx.connectors.pylon.utils import map_to_document
from onyx.connectors.pylon.utils import normalize_attachment_url
from onyx.connectors.pylon.utils import parse_pylon_datetime
from onyx.connectors.pylon.utils import parse_ymd_date
from onyx.connectors.pylon.utils import pylon_get
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
if TYPE_CHECKING:
import httpx
logger = setup_logger()
class PylonConnectorCheckpoint(ConnectorCheckpoint):
    """Checkpoint state for resumable Pylon indexing.

    Fields:
        current_day_start: RFC3339 day start for the current sub-window;
            load_from_checkpoint resumes from this day boundary when set.

    Note: GET /issues API does not support cursor pagination per OpenAPI spec.
    Each day window returns all issues in a single response.
    """
    # RFC3339/ISO timestamp of the most recently processed day window
    current_day_start: str | None = None
class PylonConnector(
    CheckpointedConnector[PylonConnectorCheckpoint],
    SlimConnectorWithPermSync,
):
    """Connector for indexing Pylon customer support data.

    Creates one document per issue. Messages and attachments are embedded
    as sections within the issue document.

    Args:
        pylon_entities: Optional entity types to include. Options: "messages", "attachments"
            - Issues are always included (required as root entity)
            - "messages": Include issue messages as additional sections
            - "attachments": Process and include attachment content
        start_date: Start date for indexing in YYYY-MM-DD format
        lookback_days: Number of days to look back for updated issues (default: 7).
            The connector will fetch issues created N days before the sync window to capture
            updates to existing issues. Issues are only re-indexed if their
            latest_message_time falls within the sync window.
        batch_size: Number of documents per batch for indexing
    """

    def __init__(
        self,
        pylon_entities: list[str],
        start_date: str,
        lookback_days: int = 7,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        # Issues are always included; entities only controls optional messages/attachments
        self.pylon_entities = pylon_entities
        self.start_epoch_sec = parse_ymd_date(start_date)
        self.lookback_days = lookback_days
        self.batch_size = batch_size
        self.base_url: str = "https://api.usepylon.com"
        self.api_key: str | None = None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Load Pylon API credentials.

        Expects credentials dict with:
            - pylon_api_key (required): Pylon API key

        Raises:
            ConnectorMissingCredentialError: If the API key is missing or empty.
        """
        self.api_key = credentials.get("pylon_api_key")
        if not self.api_key:
            raise ConnectorMissingCredentialError("Pylon")
        return None

    def _client(self) -> httpx.Client:
        """Build an authenticated HTTP client for the Pylon API."""
        if not self.api_key:
            raise ConnectorMissingCredentialError("Pylon")
        return build_auth_client(self.api_key, self.base_url)

    @override
    def load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: PylonConnectorCheckpoint,
    ) -> CheckpointOutput[PylonConnectorCheckpoint]:
        """Resumable indexing of Pylon issues with embedded messages and attachments.

        Each issue becomes one document with all related content embedded as sections.
        Checkpointing happens at day boundaries.

        If messages are tracked, the start time is adjusted backwards by lookback_days
        to capture updates to existing issues. Issues are filtered by latest_message_time
        to only index those with recent activity.
        """
        new_checkpoint = copy.deepcopy(checkpoint)
        messages_enabled = "messages" in self.pylon_entities
        attachments_enabled = "attachments" in self.pylon_entities
        adjusted_start = start - (self.lookback_days * 24 * 60 * 60)
        # Fix: the previous message read "Applying {n} - (...)" and never said
        # what the number meant.
        logger.info(
            f"Applying {self.lookback_days}-day lookback "
            f"(original_start={start}, adjusted_start={adjusted_start})"
        )
        start_boundary = self.start_epoch_sec
        if new_checkpoint.current_day_start:
            # Resume from the day boundary recorded in the checkpoint.
            start_boundary = parse_pylon_datetime(new_checkpoint.current_day_start)
        time_windows = get_time_window_days(adjusted_start, end, start_boundary)
        if not time_windows:
            new_checkpoint.has_more = False
            return new_checkpoint
        for idx, (start_time, end_time) in enumerate(time_windows):
            for issue in self._iter_issues(
                start_time,
                end_time,
                messages_enabled=messages_enabled,
                original_start=start,
            ):
                if not is_valid_issue(issue):
                    logger.warning(f"Skipping invalid issue ID: {issue.id}")
                    continue
                # NOTE(review): when issue.attachment_urls is non-empty this
                # list aliases it, so message file URLs below are merged into
                # the issue model in place — map_to_document may observe the
                # merged list. Confirm this is intended before changing.
                attachments_urls = issue.attachment_urls or []
                messages = []
                if not issue.id:
                    logger.warning("Skipping issue without ID")
                    continue
                if messages_enabled:
                    for message in self._iter_messages(issue.id):
                        if not is_valid_message(message):
                            logger.warning(
                                f"Skipping invalid message ID: {message.id} for issue ID: {issue.id}"
                            )
                            continue
                        if message.file_urls:
                            attachments_urls.extend(message.file_urls)
                        messages.append(message)
                unique_attachments_data = []
                if attachments_urls and attachments_enabled:
                    # Presigned URLs differ only by their query string, so
                    # dedupe on the normalized (query-stripped) form while
                    # downloading via the original signed URL.
                    unique_attachments_urls = []
                    unique_normalized_urls = set()
                    for url in attachments_urls:
                        normalized_url = normalize_attachment_url(url)
                        if (
                            normalized_url
                            and normalized_url not in unique_normalized_urls
                        ):
                            unique_normalized_urls.add(normalized_url)
                            unique_attachments_urls.append(url)
                    for attachment in self._iter_attachments(unique_attachments_urls):
                        attachment.url = normalize_attachment_url(attachment.url)
                        unique_attachments_data.append(attachment)
                try:
                    document = map_to_document(
                        issue,
                        messages,
                        unique_attachments_data,
                    )
                    yield document
                except Exception as e:
                    # Report a per-document failure instead of aborting the run.
                    yield ConnectorFailure(
                        failed_document=DocumentFailure(
                            document_id=(_create_id(issue)),
                            document_link=issue.link,
                        ),
                        failure_message=f"Failed to process Pylon Issue: {e}",
                        exception=e,
                    )
            # NOTE(review): the checkpoint records the START of the window just
            # completed, so a resume re-indexes that day. Conservative (no data
            # loss) but redundant; confirm whether end_time was intended.
            new_checkpoint.current_day_start = start_time
            new_checkpoint.has_more = idx < len(time_windows) - 1
        return new_checkpoint

    @override
    def build_dummy_checkpoint(self) -> PylonConnectorCheckpoint:
        """Create an initial checkpoint with work remaining."""
        return PylonConnectorCheckpoint(has_more=True)

    @override
    def validate_checkpoint_json(
        self, checkpoint_json: str
    ) -> PylonConnectorCheckpoint:
        """Validate and deserialize a checkpoint instance from JSON."""
        return PylonConnectorCheckpoint.model_validate_json(checkpoint_json)

    def retrieve_all_slim_docs_perm_sync(
        self,
        start: SecondsSinceUnixEpoch | None = None,
        end: SecondsSinceUnixEpoch | None = None,
        callback: IndexingHeartbeatInterface | None = None,
    ) -> Iterator[list[SlimDocument]]:
        """Return document IDs for all existing Pylon issues.

        Used by the pruning job to determine which documents no longer exist
        in the source and should be removed from the index.

        Note: This fetches ALL issues without lookback filtering, as we need
        to check the existence of the issue only.

        Args:
            start: Optional start time (typically ignored for pruning)
            end: Optional end time (typically ignored for pruning)
            callback: Optional callback for progress reporting

        Yields:
            Batches of SlimDocument objects containing only document IDs
        """
        batch: list[SlimDocument] = []
        if start is not None:
            start_epoch = max(start, self.start_epoch_sec)
        else:
            start_epoch = self.start_epoch_sec
        end_epoch = end if end is not None else time.time()
        time_windows = get_time_window_days(start_epoch, end_epoch, start_epoch)
        if not time_windows:
            return
        for start_time, end_time in time_windows:
            for issue in self._iter_issues(
                start_time, end_time, messages_enabled=False, original_start=None
            ):
                if not issue.id:
                    logger.warning(
                        "Skipping issue without ID during slim doc retrieval"
                    )
                    continue
                doc_id = _create_id(issue)
                batch.append(SlimDocument(id=doc_id))
                if len(batch) >= self.batch_size:
                    yield batch
                    batch = []
                if callback:
                    if callback.should_stop():
                        raise RuntimeError("pylon_slim_sync: Stop signal detected")
                    # NOTE(review): len(batch) is 0 right after a flush above;
                    # the reported amount oscillates. Heartbeat still fires.
                    callback.progress("pylon_slim_sync", len(batch))
        if batch:
            yield batch

    def validate_connector_settings(self) -> None:
        """Validate connector configuration and credentials.

        Called during connector setup to ensure everything is configured correctly.

        Raises:
            ConnectorMissingCredentialError: If no API key is loaded.
            CredentialExpiredError: On HTTP 401 from Pylon.
            InsufficientPermissionsError: On HTTP 403 from Pylon.
            UnexpectedValidationError: On any other failure.
        """
        if not self.api_key:
            raise ConnectorMissingCredentialError("Pylon")
        try:
            with self._client() as client:
                # Try a lightweight request to validate credentials.
                # Fix: use a raw GET instead of pylon_get here. pylon_get raises
                # its own exceptions for any non-2xx status, which made the
                # 401/403 status checks below unreachable dead code.
                resp = client.get("/accounts", params={"limit": 1})
                if resp.status_code == 401:
                    raise CredentialExpiredError(
                        "Invalid or expired Pylon credentials (HTTP 401)."
                    )
                if resp.status_code == 403:
                    raise InsufficientPermissionsError(
                        "Insufficient permissions to access Pylon workspace (HTTP 403)."
                    )
                if resp.status_code < 200 or resp.status_code >= 300:
                    raise UnexpectedValidationError(
                        f"Unexpected Pylon error (status={resp.status_code})."
                    )
        except Exception as e:
            # Network or other unexpected errors
            if isinstance(
                e,
                (
                    CredentialExpiredError,
                    InsufficientPermissionsError,
                    UnexpectedValidationError,
                    ConnectorMissingCredentialError,
                ),
            ):
                raise
            raise UnexpectedValidationError(
                f"Unexpected error while validating Pylon settings: {e}"
            )

    def _iter_issues(
        self,
        start_time: str,
        end_time: str,
        messages_enabled: bool,
        original_start: SecondsSinceUnixEpoch | None = None,
    ) -> Iterator[Issue]:
        """Retrieve issues from Pylon within the specified time window.

        Args:
            start_time: API request start (may be extended backwards for lookback)
            end_time: API request end
            messages_enabled: Whether message timestamps count as activity
            original_start: If provided, used to fetch updated issues:
                last_update >= original_start (Unix epoch seconds)

        Yields:
            Issues that meet the filtering criteria
        """
        with self._client() as client:
            params = {
                "start_time": start_time,
                "end_time": end_time,
            }
            response = pylon_get(client, "/issues", params)
            response_body = GetIssuesResponseBody.model_validate(response.json())
            issues = response_body.data or []
            for issue in issues:
                # Filter by latest_message_time if lookback is used
                if original_start is not None:
                    if (
                        issue.created_at
                        and parse_pylon_datetime(issue.created_at) < original_start
                    ):
                        # Issue was created before the real sync window; only
                        # re-index it if it shows recent activity.
                        last_update = (
                            (issue.resolution_time or issue.latest_message_time or None)
                            if messages_enabled
                            else issue.resolution_time
                        )
                        if not last_update:
                            # No last_update means no recent activity, skip it
                            logger.debug(
                                f"Skipping issue {issue.id} - no latest_message_time during lookback"
                            )
                            continue
                        last_update_epoch = parse_pylon_datetime(last_update)
                        if last_update_epoch < original_start:
                            # last_update is before the requested start window, skip it
                            logger.debug(
                                f"Skipping issue {issue.id} - last_update {last_update} "
                                f"is before original_start"
                            )
                            continue
                yield issue

    def _iter_messages(self, issue_id: str) -> Iterator[Message]:
        """Retrieve messages for a specific issue from Pylon.

        Args:
            issue_id: The ID of the issue to fetch messages for.

        Yields:
            Message objects associated with the issue.
        """
        with self._client() as client:
            response = pylon_get(client, f"/issues/{issue_id}/messages")
            response_body = GetIssueMessagesResponseBody.model_validate(response.json())
            messages = response_body.data or []
            for message in messages:
                yield message

    def _iter_attachments(self, attachment_urls: list[str]) -> Iterator[AttachmentData]:
        """Download attachments from their presigned URLs.

        Args:
            attachment_urls: The URLs of the attachments to fetch.

        Yields:
            AttachmentData objects for successfully downloaded attachments.
            Skips attachments that cannot be downloaded.
        """
        with build_generic_client() as client:
            for attachment_url in attachment_urls:
                attachment_data = download_attachment(client, attachment_url)
                if attachment_data is not None:
                    yield attachment_data

View File

@@ -0,0 +1,367 @@
from __future__ import annotations
from enum import Enum
from typing import Optional
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
class ErrorApiResponseBody(BaseModel):
    """Error payload returned by the Pylon API."""
    model_config = ConfigDict(
        extra="ignore",
    )
    errors: Optional[list[str]] = Field(None, description="The list of errors.")
    exists_id: Optional[str] = Field(
        None,
        description="The ID of the object that already exists if this is a duplicate object error.",
    )
    request_id: Optional[str] = Field(None, description="The request ID for tracking.")
class GetIssueResponseBody(BaseModel):
    """Response body wrapping a single Issue."""
    model_config = ConfigDict(
        extra="ignore",
    )
    data: Optional[Issue] = None
    request_id: Optional[str] = Field(None, description="The request ID for tracking.")
class GetIssuesResponseBody(BaseModel):
    """The response body for GET /issues.

    The OpenAPI spec describes cursor pagination, but in practice the API does
    not return it, so the `pagination` field is ignored during ingestion.
    """
    model_config = ConfigDict(
        extra="ignore",
    )
    data: Optional[list[Issue]] = Field(
        None, description="The data payload of the response."
    )
    pagination: Optional[Pagination] = None
    request_id: Optional[str] = Field(None, description="The request ID for tracking.")
class GetIssueMessagesResponseBody(BaseModel):
    """Response body wrapping the list of Messages for an issue."""
    model_config = ConfigDict(
        extra="ignore",
    )
    data: Optional[list[Message]] = Field(
        None, description="The data payload of the response."
    )
    pagination: Optional[Pagination] = None
    request_id: Optional[str] = Field(None, description="The request ID for tracking.")
class Message(BaseModel):
    """A single message within a Pylon issue."""
    model_config = ConfigDict(
        extra="ignore",
    )
    author: Optional[MessageAuthor] = None
    email_info: Optional[EmailMessageInfo] = None
    file_urls: Optional[list[str]] = Field(
        None, description="The URLs of the files in the message, if any."
    )
    id: Optional[str] = Field(None, description="The ID of the message.")
    is_private: Optional[bool] = Field(
        None, description="Indicates if the message is private."
    )
    message_html: Optional[str] = Field(
        None, description="The HTML body of the message."
    )
    source: Optional[str] = Field(None, description="The source of the message.")
    thread_id: Optional[str] = Field(
        None,
        description="The ID of the thread the message belongs to. This is only set for internal notes.",
    )
    timestamp: Optional[str] = Field(
        None, description="The time at which the message was created."
    )
class MessageAuthor(BaseModel):
    """Author information attached to a Message."""
    model_config = ConfigDict(
        extra="ignore",
    )
    avatar_url: Optional[str] = None  # URL of the author's avatar image
    contact: Optional[MiniContact] = None  # contact record for the author, if any
    name: Optional[str] = None  # display name of the author
    user: Optional[MiniUser] = None  # user record for the author, if any
class EmailMessageInfo(BaseModel):
    """Email-specific metadata for a Message."""
    model_config = ConfigDict(
        extra="ignore",
    )
    bcc_emails: Optional[list[str]] = Field(
        None, description="The email addresses of the BCC recipients of the message."
    )
    cc_emails: Optional[list[str]] = Field(
        None, description="The email addresses of the CC recipients of the message."
    )
    from_email: Optional[str] = Field(
        None, description="The email address of the sender of the message."
    )
    to_emails: Optional[list[str]] = Field(
        None, description="The email addresses of the recipients of the message."
    )
class Issue(BaseModel):
    """A Pylon support issue — the root entity indexed by the connector."""
    model_config = ConfigDict(
        extra="ignore",
    )
    account: Optional[MiniAccount] = None
    assignee: Optional[MiniUser] = None
    attachment_urls: Optional[list[str]] = Field(
        None, description="The attachment URLs attached to this issue, if any."
    )
    body_html: Optional[str] = Field(
        None, description="The body of the issue in HTML format."
    )
    business_hours_first_response_seconds: Optional[int] = Field(
        None,
        description="The business hours time in seconds it took for the first response to the issue, if any.",
    )
    business_hours_resolution_seconds: Optional[int] = Field(
        None,
        description="The business hours time in seconds it took for the issue to be resolved, if any.",
    )
    chat_widget_info: Optional[IssueChatWidgetInfo] = None
    created_at: Optional[str] = Field(
        None, description="The time the issue was created."
    )
    csat_responses: Optional[list[CSATResponse]] = Field(
        None, description="The CSAT responses of the issue, if any."
    )
    custom_fields: Optional[dict[str, CustomFieldValue]] = Field(
        None, description="Custom field values associated with the issue."
    )
    customer_portal_visible: Optional[bool] = Field(
        None, description="Whether the issue is visible in the customer portal."
    )
    external_issues: Optional[list[ExternalIssue]] = Field(
        None, description="The external issues associated with the issue, if any."
    )
    first_response_seconds: Optional[int] = Field(
        None,
        description="The time in seconds it took for the first response to the issue, if any.",
    )
    first_response_time: Optional[str] = Field(
        None, description="The time of the first response to the issue, if any."
    )
    id: Optional[str] = Field(None, description="The ID of the issue.")
    latest_message_time: Optional[str] = Field(
        None, description="The time of the latest message in the issue."
    )
    link: Optional[str] = Field(None, description="The link to the issue in Pylon.")
    number: Optional[int] = Field(None, description="The number of the issue.")
    number_of_touches: Optional[int] = Field(
        None, description="The number of times the issue has been touched."
    )
    requester: Optional[MiniContact] = None
    resolution_seconds: Optional[int] = Field(
        None,
        description="The time in seconds it took for the issue to be resolved, if any.",
    )
    resolution_time: Optional[str] = Field(
        None, description="The time of the resolution of the issue, if any."
    )
    slack: Optional[SlackInfo] = None
    snoozed_until_time: Optional[str] = Field(
        None,
        description="The time the issue was snoozed until in RFC3339 format, if any.",
    )
    source: Optional[Source] = Field(
        None,
        description=(
            "The source of the issue."
            "* slack IssueSourceSlack"
            "* microsoft_teams IssueSourceMicrosoftTeams"
            "* microsoft_teams_chat IssueSourceMicrosoftTeamsChat"
            "* chat_widget IssueSourceChatWidget"
            "* email IssueSourceEmail"
            "* manual IssueSourceManual"
            "* form IssueSourceForm"
            "* discord IssueSourceDiscord"
        ),
    )
    state: Optional[str] = Field(
        None,
        description=(
            "The state of the issue. This could be one of "
            '`["new", "waiting_on_you", "waiting_on_customer", "on_hold", "closed"] or a custom status slug.'
        ),
    )
    tags: Optional[list[str]] = Field(
        None, description="Tags associated with the issue."
    )
    team: Optional[MiniTeam] = None
    title: Optional[str] = Field(None, description="The title of the issue.")
    type: Optional[Type1] = Field(
        None,
        description="The type of the issue.\n\n* Conversation IssueTypeConversation\n\n* Ticket IssueTypeTicket",
    )
class Pagination(BaseModel):
    """Cursor pagination block described by the OpenAPI spec (not observed in
    practice for the endpoints used here)."""
    model_config = ConfigDict(
        extra="ignore",
    )
    cursor: str = Field(..., description="The cursor for the next page of results.")
    has_next_page: bool = Field(
        ..., description="Indicates if there is a next page of results."
    )
class MiniAccount(BaseModel):
    """Minimal account reference embedded in an Issue."""
    model_config = ConfigDict(
        extra="ignore",
    )
    id: Optional[str] = Field(None, description="The ID of the account.")
class MiniContact(BaseModel):
    """Minimal contact reference (e.g. an issue requester or message author)."""
    model_config = ConfigDict(
        extra="ignore",
    )
    email: Optional[str] = Field(None, description="The email of the contact.")
    id: Optional[str] = Field(None, description="The ID of the contact.")
class CSATResponse(BaseModel):
    """A customer satisfaction (CSAT) survey response on an issue."""
    model_config = ConfigDict(
        extra="ignore",
    )
    comment: Optional[str] = Field(
        None, description="The comment of the CSAT response."
    )
    score: Optional[int] = Field(None, description="The score of the CSAT response.")
class IssueChatWidgetInfo(BaseModel):
    """Chat-widget-specific context for an issue."""
    model_config = ConfigDict(
        extra="ignore",
    )
    page_url: Optional[str] = Field(
        None,
        description="The URL of the page that the user was on when they started the chat widget issue.",
    )
class MiniUser(BaseModel):
    """Minimal user reference (e.g. an issue assignee or message author)."""
    model_config = ConfigDict(
        extra="ignore",
    )
    email: Optional[str] = Field(None, description="The email of the user.")
    id: Optional[str] = Field(None, description="The ID of the user.")
class CustomFieldValue(BaseModel):
    """Value of a single custom field on an issue (single- or multi-valued)."""
    model_config = ConfigDict(
        extra="ignore",
    )
    slug: Optional[str] = Field(None, description="The slug of the custom field.")
    value: Optional[str] = Field(
        None,
        description=(
            "The value of the custom field. Only to be used for single-valued custom fields. "
            "If unset, the custom field will be unset. If the custom field is a select field, "
            "the value must be the select option slug, which you can find from the GET /custom-fields endpoint."
        ),
    )
    values: Optional[list[str]] = Field(
        None,
        description=(
            "The values of the custom field. Only to be used for multi-valued custom fields (ex. multiselect). "
            "If unset, the custom field will be unset. If the custom field is a multiselect field, "
            "the values must be the select option slugs which you can find from the GET /custom-fields endpoint."
        ),
    )
class ExternalIssue(BaseModel):
    """Link to an issue in an external tracker (Jira, GitHub, Linear, Asana)."""
    model_config = ConfigDict(
        extra="ignore",
    )
    external_id: Optional[str] = Field(
        None,
        description=(
            "The external ID of the external issue."
            "Jira: ID of the issue (autoincrementing number from 10000)."
            "GitHub: Owner/Repo/IssueID."
            "Linear: ID of the issue (UUID)."
            "Asana: ID of the task (Long number)."
        ),
    )
    link: Optional[str] = Field(None, description="Link to the product issue.")
    source: Optional[str] = Field(None, description="The source of the external issue.")
class SlackInfo(BaseModel):
    """Slack origin metadata for an issue created from a Slack conversation."""
    model_config = ConfigDict(
        extra="ignore",
    )
    channel_id: Optional[str] = Field(
        None, description="The Slack channel ID associated with the issue."
    )
    message_ts: Optional[str] = Field(
        None, description="The root message ID of slack message that started issue."
    )
    workspace_id: Optional[str] = Field(
        None, description="The Slack workspace ID associated with the issue."
    )
class Source(Enum):
    """The source channel an issue originated from.

    * slack IssueSourceSlack
    * microsoft_teams IssueSourceMicrosoftTeams
    * microsoft_teams_chat IssueSourceMicrosoftTeamsChat
    * chat_widget IssueSourceChatWidget
    * email IssueSourceEmail
    * manual IssueSourceManual
    * form IssueSourceForm
    * discord IssueSourceDiscord
    """
    slack = "slack"
    microsoft_teams = "microsoft_teams"
    microsoft_teams_chat = "microsoft_teams_chat"
    chat_widget = "chat_widget"
    email = "email"
    manual = "manual"
    form = "form"
    discord = "discord"
class MiniTeam(BaseModel):
    """Minimal team reference embedded in an Issue."""
    model_config = ConfigDict(
        extra="ignore",
    )
    id: Optional[str] = Field(None, description="The ID of the team.")
class Type1(Enum):
    """The type of the issue.

    * Conversation IssueTypeConversation
    * Ticket IssueTypeTicket

    NOTE(review): the name "Type1" presumably comes from OpenAPI codegen —
    consider renaming to IssueType in a follow-up.
    """
    Conversation = "Conversation"
    Ticket = "Ticket"

View File

@@ -0,0 +1,598 @@
from __future__ import annotations
import hashlib
import time
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from io import BytesIO
from typing import Any
import httpx
from onyx.configs.app_configs import REQUEST_TIMEOUT_SECONDS
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import FileOrigin
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import (
rate_limit_builder,
)
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import Document
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TextSection
from onyx.connectors.pylon.models import Issue
from onyx.connectors.pylon.models import Message
from onyx.file_processing.extract_file_text import extract_text_and_images
from onyx.file_processing.html_utils import web_html_cleanup
from onyx.file_processing.image_utils import store_image_and_create_section
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import retry_builder
logger = setup_logger()
class PylonRetriableError(Exception):
    """Raised for retriable Pylon conditions (429, 5xx).

    Caught by pylon_get's retry decorator, which retries with backoff.
    """
class PylonNonRetriableError(Exception):
    """Raised for non-retriable Pylon client errors (4xx except 429).

    Not retried by pylon_get; propagates immediately to the caller.
    """
@dataclass
class AttachmentData:
    """Container for downloaded attachment data.

    Attributes:
        content: Raw bytes of the attachment
        filename: Extracted filename from content-disposition header
        content_type: MIME type from content-type header (e.g., "image/png")
        url: Original presigned URL used for download
    """
    content: bytes
    filename: str
    content_type: str
    url: str
def build_auth_client(
    api_key: str, base_url: str = "https://api.usepylon.com"
) -> httpx.Client:
    """Build an authenticated HTTP client for Pylon API requests.

    Args:
        api_key: Pylon API key for Bearer authentication
        base_url: Base URL for Pylon API (defaults to production)

    Returns:
        Configured httpx.Client with a Bearer Authorization header set
    """
    auth_headers = {"Authorization": f"Bearer {api_key}"}
    return httpx.Client(
        base_url=base_url,
        headers=auth_headers,
        timeout=REQUEST_TIMEOUT_SECONDS,
        follow_redirects=True,
    )
def build_generic_client() -> httpx.Client:
    """Build a generic HTTP client for downloading attachments from Pylon
    issues and messages.

    The attachments are served via presigned URLs, so no authentication
    headers are attached.

    Returns:
        Configured httpx.Client without authentication headers
    """
    return httpx.Client(
        timeout=REQUEST_TIMEOUT_SECONDS,
        follow_redirects=True,
    )
@retry_builder(
    tries=6,
    delay=1,
    backoff=2,
    max_delay=30,
    # httpx.ConnectError removed: it is a subclass of httpx.RequestError and
    # was redundant in this tuple.
    exceptions=(PylonRetriableError, httpx.RequestError),
)
@rate_limit_builder(max_calls=20, period=60)
def pylon_get(
    client: httpx.Client, url: str, params: dict[str, Any] | None = None
) -> httpx.Response:
    """Perform a GET against Pylon API with retry and rate limiting.

    Retries on 429 and 5xx responses, and on transport errors. Honors
    `Retry-After` header for 429 when present by sleeping before retrying.

    Args:
        client: Authenticated httpx client (see build_auth_client).
        url: Path relative to the client's base URL (e.g. "/issues").
        params: Optional query parameters.

    Returns:
        The successful (2xx) response.

    Raises:
        PylonRetriableError: On 429/5xx, once retries are exhausted.
        PylonNonRetriableError: On other 4xx client errors.
        httpx.RequestError: On transport errors, once retries are exhausted.
    """
    # Transport errors propagate directly; retry_builder handles retrying them.
    # (The previous `try/except httpx.RequestError: raise` was a no-op.)
    response = client.get(url, params=params)
    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as e:
        status = e.response.status_code if e.response is not None else None
        if status == 429:
            retry_after = e.response.headers.get("Retry-After") if e.response else None
            if retry_after is not None:
                try:
                    # Sleep here so the subsequent retry respects the server's
                    # requested delay (on top of retry_builder's backoff).
                    time.sleep(int(retry_after))
                except (TypeError, ValueError):
                    pass
            raise PylonRetriableError("Pylon rate limit exceeded (429)") from e
        if status is not None and 500 <= status < 600:
            raise PylonRetriableError(f"Pylon server error: {status}") from e
        if status is not None and 400 <= status < 500:
            raise PylonNonRetriableError(f"Pylon client error: {status}") from e
        # Unknown status, propagate
        raise
    return response
def parse_ymd_date(date_str: str) -> SecondsSinceUnixEpoch:
    """Parse a YYYY-MM-DD date string into a Unix timestamp (UTC midnight).

    Falls back to 2025-01-01 UTC when the string cannot be parsed, logging a
    warning instead of failing the sync.

    Args:
        date_str: Date string in YYYY-MM-DD format

    Returns:
        Unix timestamp as SecondsSinceUnixEpoch
    """
    try:
        # Interpret the date as midnight UTC of that day.
        parsed = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
        return parsed.timestamp()
    except Exception as e:
        # Default to 2025-01-01 UTC if there's a parsing error
        logger.warning(
            "Unable to parse start date from '%s'. Reason '%s'", date_str, str(e)
        )
        return datetime(2025, 1, 1, tzinfo=timezone.utc).timestamp()
def parse_pylon_datetime(datetime_str: str) -> SecondsSinceUnixEpoch:
    """Parse RFC3339 datetime string to Unix timestamp.

    Args:
        datetime_str: RFC3339 formatted datetime string (e.g. "2025-01-01T00:00:00Z")

    Returns:
        Unix timestamp as SecondsSinceUnixEpoch; naive datetimes are assumed UTC
    """
    # Fix: removed the redundant function-local `from datetime import ...`
    # that shadowed the module-level imports of `datetime` and `timezone`.
    # Normalize "Z" to an explicit UTC offset for fromisoformat compatibility.
    dt = datetime.fromisoformat(datetime_str.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.timestamp()
def get_time_window_days(
    start: SecondsSinceUnixEpoch,
    end: SecondsSinceUnixEpoch,
    start_boundary: SecondsSinceUnixEpoch,
) -> list[tuple[str, str]]:
    """Split a time range into day-sized windows for checkpointing.

    Args:
        start: Start timestamp
        end: End timestamp
        start_boundary: either global start_epoch_sec or date from checkpoint;
            `start` is clamped forward to it when set (truthy)

    Returns:
        List of (start_iso, end_iso) tuples, one per day. The first window
        starts at midnight UTC of the (clamped) start day; the last window is
        truncated at `end`.
    """
    # Never index earlier than the configured boundary.
    effective_start = max(start, start_boundary) if start_boundary else start
    window_end = datetime.fromtimestamp(end, tz=timezone.utc)
    cursor = datetime.fromtimestamp(effective_start, tz=timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    step = timedelta(days=1)
    windows: list[tuple[str, str]] = []
    while cursor <= window_end:
        windows.append((cursor.isoformat(), min(cursor + step, window_end).isoformat()))
        cursor += step
    return windows
def generate_attachment_id(stable_url: str) -> str:
    """Generate a stable document ID for an attachment from its URL.

    Args:
        stable_url: URL with query parameters stripped

    Returns:
        Deterministic ID of the form "pylon_attachment_<16-hex-char hash>"
    """
    # md5 is used only for ID stability here, not for security.
    digest = hashlib.md5(stable_url.encode()).hexdigest()
    return f"pylon_attachment_{digest[:16]}"
def normalize_attachment_url(url: str) -> str:
    """Strip the query string and fragment from an attachment URL.

    Presigned URLs differ only by their query string (signature/expiry), so
    the normalized form is stable across fetches of the same attachment.

    Args:
        url: Raw attachment URL

    Returns:
        URL with query parameters and fragment removed

    Example url:
        "https://assets.usepylon.com/UUID_1/UUID_2-image.png?Expires=253370764800&Signature=SIG_VALUE&Key-Pair-Id=VALUE"
    """
    from urllib.parse import urlparse, urlunparse

    parts = urlparse(url)
    # Keep scheme/netloc/path/params; drop query and fragment.
    return urlunparse((parts.scheme, parts.netloc, parts.path, parts.params, "", ""))
def _parse_filename_from_content_disposition(content_disposition: str) -> str | None:
"""Parse filename from Content-Disposition header.
Args:
content_disposition: Value of Content-Disposition header
Returns:
Extracted filename or None if not found
Example:
"inline; filename=Screenshot 2025-10-12 at 8.22.32.png" -> "Screenshot 2025-10-12 at 8.22.32.png"
'attachment; filename="document.pdf"' -> "document.pdf"
"""
import re
if not content_disposition:
return None
# Try to match filename="..." or filename=... patterns
# Handle both quoted and unquoted filenames
patterns = [
r"filename\*=UTF-8\'\'([^;]+)", # RFC 5987 encoded filename
r'filename="([^"]+)"', # Quoted filename
r"filename='([^']+)'", # Single-quoted filename
r"filename=([^;]+)", # Unquoted filename
]
for pattern in patterns:
match = re.search(pattern, content_disposition, re.IGNORECASE)
if match:
filename = match.group(1).strip()
# URL decode if it's RFC 5987 encoded
if "UTF-8''" in pattern:
from urllib.parse import unquote
filename = unquote(filename)
return filename if filename else None
return None
def _clean_html_to_text(
    html: str,
) -> str:
    """Convert HTML fragments to cleaned plain text using shared HTML utils.

    This applies consistent stripping of boilerplate (style/script/link/meta
    plus other noise elements listed below), with Mintlify-specific class
    pruning disabled.

    Args:
        html: The HTML string to clean.

    Returns:
        Cleaned plain text suitable for indexing.
    """
    # Element types discarded in addition to the shared helper's defaults;
    # these carry no indexable text in inline HTML fragments.
    default_discard = [
        "style",
        "script",
        "link",
        "meta",
        "svg",
        "noscript",
        "template",
    ]
    parsed = web_html_cleanup(
        html,
        mintlify_cleanup_enabled=False,
        additional_element_types_to_discard=default_discard,
    )
    return parsed.cleaned_text
@retry_builder(tries=3, delay=1.0, backoff=2.0)
def download_attachment(
    client: httpx.Client, presigned_url: str
) -> AttachmentData | None:
    """Download attachment from presigned URL with retry logic.

    Performs a HEAD request first to check the content-disposition header and
    content-length. Skips attachments without a content-disposition header,
    without a parseable filename, or those exceeding 10 MB. Because some
    servers omit content-length on HEAD responses, the size limit is enforced
    again on the downloaded payload.

    Args:
        client: HTTP client to use for the download
        presigned_url: Presigned URL for the attachment (e.g., from a Pylon issue)

    Returns:
        AttachmentData object containing content, filename, content_type, and url,
        or None if the file should be skipped

    Raises:
        Exception: If download fails after retries or if response is unsuccessful
    """
    MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024  # 10 MB in bytes

    # HEAD first: lets us reject unnamed/oversized attachments without
    # paying for the full body transfer.
    head_response = client.head(presigned_url)
    head_response.raise_for_status()

    # Extract filename from content-disposition header
    content_disposition = head_response.headers.get("content-disposition")
    if not content_disposition:
        logger.info(
            f"Skipping attachment download (no content-disposition header): {presigned_url}"
        )
        return None

    filename = _parse_filename_from_content_disposition(content_disposition)
    if not filename:
        logger.info(
            f"Skipping attachment download (no filename in content-disposition): {presigned_url}"
        )
        return None

    # Extract content-type (MIME type); fall back to a generic binary type.
    content_type = head_response.headers.get("content-type", "application/octet-stream")

    # Check content-length when the server advertises it
    content_length_header = head_response.headers.get("content-length")
    if content_length_header and content_length_header.isdigit():
        content_length = int(content_length_header)
        if content_length > MAX_ATTACHMENT_SIZE:
            logger.info(
                f"Skipping attachment download (size {content_length} bytes exceeds "
                f"{MAX_ATTACHMENT_SIZE} bytes limit): {presigned_url}"
            )
            return None

    response = client.get(presigned_url)
    response.raise_for_status()

    if not response.content:
        logger.warning(f"Empty response content from: {presigned_url}")
        return None

    # content-length may be absent on HEAD; re-check the actual payload so
    # oversized files are never returned for indexing.
    if len(response.content) > MAX_ATTACHMENT_SIZE:
        logger.info(
            f"Skipping attachment download (downloaded size {len(response.content)} bytes "
            f"exceeds {MAX_ATTACHMENT_SIZE} bytes limit): {presigned_url}"
        )
        return None

    return AttachmentData(
        content=response.content,
        filename=filename,
        content_type=content_type,
        url=presigned_url,
    )
def map_to_document(
    issue: Issue, messages: list[Message], attachments: list[AttachmentData]
) -> Document:
    """
    Map a Pylon issue, its messages, and attachments to a single Document.

    Behavior:
    - Converts issue and message HTML to plain text and appends as TextSection.
    - Processes each attachment via extract_text_and_images; appends extracted text as TextSection.
    - Persists only images (original image attachments and any embedded images) to FileStore and
      appends ImageSection referencing the stored file id.
    - Attachment URLs are presigned/ephemeral, so they are not added to the link field.
    """
    sections: list[TextSection | ImageSection] = []

    # Issue body becomes the first section when it cleans to non-empty text.
    if issue.body_html:
        body_text = _clean_html_to_text(issue.body_html)
        if body_text:
            sections.append(TextSection(link=issue.link, text=body_text))

    # Messages; the first message duplicates the issue's body_html, skip it.
    for position, msg in enumerate(messages):
        if position == 0:
            continue
        if not msg.message_html:
            continue
        text = _clean_html_to_text(msg.message_html)
        if not text:
            continue
        msg_link = f"{issue.link}&messageID={msg.id}" if issue.link else None
        sections.append(TextSection(link=msg_link, text=text))

    sections.extend(_process_attachments(attachments))

    # Pick the best available human-readable identifier.
    if issue.title:
        semantic_identifier = issue.title
    elif issue.number is not None:
        semantic_identifier = f"Issue #{issue.number}"
    else:
        semantic_identifier = f"Issue {issue.id or 'unknown'}"

    document = Document(
        id=f"pylon:issue:{issue.id}",
        source=DocumentSource.PYLON,
        semantic_identifier=semantic_identifier,
        title=issue.title or semantic_identifier,
        sections=sections,
        metadata=_create_metadata(issue),
    )

    # Owners are attached only when the corresponding email is present.
    if issue.assignee and issue.assignee.email:
        document.secondary_owners = [BasicExpertInfo(email=issue.assignee.email)]
    if issue.requester and issue.requester.email:
        document.primary_owners = [BasicExpertInfo(email=issue.requester.email)]

    return document
def _process_attachments(
    attachments: list[AttachmentData],
) -> Sequence[TextSection | ImageSection]:
    """Convert downloaded attachments into document sections.

    Image attachments are persisted to the file store and returned as
    ImageSections. Other attachments are run through text/image extraction:
    extracted text becomes a TextSection and any embedded images are
    persisted as ImageSections. Image-storage failures are logged and
    skipped so a single bad attachment never fails the whole document.
    """
    import mimetypes

    sections: list[TextSection | ImageSection] = []
    for attachment in attachments:
        # Stable base id derived from normalized (query-stripped) URL
        base_stable_url = normalize_attachment_url(attachment.url)
        base_id = generate_attachment_id(base_stable_url)

        # Persist original if it is an image
        if attachment.content_type.startswith("image/"):
            try:
                image_section, _ = store_image_and_create_section(
                    image_data=attachment.content,
                    file_id=base_id,
                    display_name=attachment.filename,
                    media_type=attachment.content_type,
                    file_origin=FileOrigin.CONNECTOR,
                )
                logger.debug(
                    f"Attachment Image: {image_section.image_file_id}: {attachment.content_type}: {attachment.filename}"
                )
                sections.append(image_section)
            except Exception:
                # Best-effort: do not fail the whole document if image storage fails
                logger.exception(
                    "Failed to persist original image attachment for Pylon"
                )
        else:
            extraction_result = extract_text_and_images(
                file=BytesIO(attachment.content),
                file_name=attachment.filename,
                pdf_pass=None,
                content_type=attachment.content_type,
                image_callback=None,
            )
            if extraction_result.text_content:
                sections.append(TextSection(text=extraction_result.text_content))

            # Persist any embedded images discovered during extraction
            for idx, (img_bytes, img_name) in enumerate(
                extraction_result.embedded_images
            ):
                try:
                    # Infer the media type from the embedded image's filename
                    # instead of always using a generic binary type; fall back
                    # when the name is missing or unrecognized.
                    mime = (
                        mimetypes.guess_type(img_name)[0] if img_name else None
                    ) or "application/octet-stream"
                    image_section, _ = store_image_and_create_section(
                        image_data=img_bytes,
                        file_id=f"{base_id}:{idx}",
                        display_name=img_name,
                        media_type=mime,
                        file_origin=FileOrigin.CONNECTOR,
                    )
                    logger.debug(
                        f"Attachment Image Embedded: {image_section.image_file_id}: "
                        f"{attachment.content_type}: {attachment.filename}"
                    )
                    sections.append(image_section)
                except Exception:
                    logger.exception(
                        "Failed to persist embedded image for Pylon attachment"
                    )
    return sections
def _create_metadata(issue: Issue) -> dict[str, str | list[str]]:
# Build metadata with string or list[str] values only
metadata: dict[str, str | list[str]] = {
"entity_type": issue.type.value if issue.type else "Ticket"
}
if issue.number is not None:
metadata["issue_number"] = str(issue.number)
if issue.state:
metadata["state"] = issue.state
if issue.created_at:
metadata["created_at"] = issue.created_at
updated_at = issue.latest_message_time or issue.resolution_time or issue.created_at
if updated_at:
metadata["updated_at"] = updated_at
if issue.tags:
metadata["tags"] = issue.tags
if issue.source is not None:
metadata["source"] = str(issue.source.value)
return metadata
def is_valid_issue(issue: Issue) -> bool:
    """
    Validate that the Issue model has all required fields.

    OpenAPI spec does not specify required fields, so we check manually.
    The intended use is to ignore issues that are missing critical fields.

    Args:
        issue: Issue model to validate

    Returns:
        True if valid, False otherwise
    """
    # Missing attributes and explicit None values both count as invalid.
    return all(
        getattr(issue, field, None) is not None for field in ("id", "body_html")
    )
def is_valid_message(issue: Message) -> bool:
    """
    Validate that the Message model has all required fields.

    OpenAPI spec does not specify required fields, so we check manually.
    The intended use is to ignore messages that are missing critical fields.

    Args:
        issue: Message model to validate

    Returns:
        True if valid, False otherwise
    """
    # Missing attributes and explicit None values both count as invalid.
    return all(
        getattr(issue, field, None) is not None for field in ("id", "message_html")
    )
def _create_id(issue: Issue) -> str:
    """Build the stable document ID for a Pylon issue (``pylon:issue:<id>``)."""
    source_prefix = DocumentSource.PYLON.value
    return f"{source_prefix}:issue:{issue.id}"

View File

@@ -200,6 +200,10 @@ CONNECTOR_CLASS_MAP = {
module_path="onyx.connectors.bitbucket.connector",
class_name="BitbucketConnector",
),
DocumentSource.PYLON: ConnectorMapping(
module_path="onyx.connectors.pylon.connector",
class_name="PylonConnector",
),
# just for integration tests
DocumentSource.MOCK_CONNECTOR: ConnectorMapping(
module_path="onyx.connectors.mock_connector.connector",

View File

@@ -53,7 +53,7 @@ def test_bitbucket_checkpointed_load(
)
assert isinstance(docs, list)
assert len(docs) > 0
for doc in docs:
assert doc.source == DocumentSource.BITBUCKET
assert doc.metadata is not None

View File

@@ -0,0 +1,5 @@
from tests.load_env_vars import load_env_vars
# Load environment variables at the module level
load_env_vars()

View File

@@ -0,0 +1,64 @@
import os
import pytest
from onyx.configs.constants import DocumentSource
from onyx.connectors.pylon.connector import PylonConnector
from onyx.connectors.pylon.utils import parse_ymd_date
from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector
@pytest.fixture
def pylon_connector_for_checkpoint() -> PylonConnector:
"""Daily fixture for Pylon checkpointed indexing.
Env vars:
- PYLON_API_KEY: Pylon API key
"""
api_key = os.environ.get("PYLON_API_KEY")
if not api_key:
pytest.skip("PYLON_API_KEY not set in environment")
connector = PylonConnector(
pylon_entities=["messages"],
start_date="2025-10-16",
lookback_days=0,
batch_size=10,
)
connector.load_credentials({"pylon_api_key": api_key})
return connector
def test_pylon_checkpointed_load(
pylon_connector_for_checkpoint: PylonConnector,
) -> None:
start = parse_ymd_date("2025-10-16") # fixed date to ensure results
end = start + 24 * 60 * 60 # 1 day after start
docs = load_all_docs_from_checkpoint_connector(
connector=pylon_connector_for_checkpoint,
start=start,
end=end,
)
assert isinstance(docs, list)
assert len(docs) > 0
for doc in docs:
assert doc.source == DocumentSource.PYLON
assert doc.metadata is not None
assert doc.id.startswith(
f"{DocumentSource.PYLON.value}:issue:"
), f"Unexpected document ID format: {doc.id}"
assert "state" in doc.metadata, "Missing 'state' in metadata"
assert "created_at" in doc.metadata, "Missing 'created_at' in metadata"
assert "updated_at" in doc.metadata, "Missing 'updated_at' in metadata"
assert doc.semantic_identifier, "Missing semantic_identifier"
assert (
len(doc.sections) >= 1
), f"Expected at least 1 section, got {len(doc.sections)}"

View File

@@ -0,0 +1,51 @@
import os
import time
import pytest
from onyx.configs.constants import DocumentSource
from onyx.connectors.pylon.connector import PylonConnector
from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector
@pytest.fixture
def pylon_connector_for_slim() -> PylonConnector:
api_key = os.environ.get("PYLON_API_KEY")
if not api_key:
pytest.skip("PYLON_API_KEY not set in environment")
connector = PylonConnector(
pylon_entities=["messages"], # issues always included
start_date="2025-10-16",
lookback_days=0,
batch_size=10,
)
connector.load_credentials({"pylon_api_key": api_key})
return connector
def test_pylon_full_ids_subset_of_slim_ids(
pylon_connector_for_slim: PylonConnector,
) -> None:
docs = load_all_docs_from_checkpoint_connector(
connector=pylon_connector_for_slim,
start=0,
end=time.time(),
)
all_full_doc_ids: set[str] = set([doc.id for doc in docs])
all_slim_doc_ids: set[str] = set()
for slim_doc_batch in pylon_connector_for_slim.retrieve_all_slim_docs_perm_sync():
all_slim_doc_ids.update([doc.id for doc in slim_doc_batch])
assert all_full_doc_ids.issubset(all_slim_doc_ids)
assert len(all_slim_doc_ids) > 0
if all_slim_doc_ids:
example_id = next(iter(all_slim_doc_ids))
assert example_id.startswith(f"{DocumentSource.PYLON.value}:")
for doc_id in all_slim_doc_ids:
assert ":issue:" in doc_id, f"Expected issue ID, got: {doc_id}"

BIN
web/public/Pylon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.2 KiB

View File

@@ -86,6 +86,7 @@ import googleIcon from "../../../public/Google.png";
import xenforoIcon from "../../../public/Xenforo.svg";
import highspotIcon from "../../../public/Highspot.png";
import bitbucketIcon from "../../../public/Bitbucket.svg";
import pylonIcon from "../../../public/Pylon.png";
import { FaGithub, FaRobot } from "react-icons/fa";
import Image from "next/image";
import { cn } from "@/lib/utils";
@@ -1246,6 +1247,11 @@ export const BitbucketIcon = ({
<LogoIcon size={size} className={className} src={bitbucketIcon} />
);
export const PylonIcon = ({
size = 16,
className = defaultTailwindCSS,
}: IconProps) => <LogoIcon size={size} className={className} src={pylonIcon} />;
export const GlobeIcon = ({
size = 16,
className = defaultTailwindCSSBlue,

View File

@@ -346,6 +346,42 @@ export const connectorConfigs: Record<
],
advanced_values: [],
},
pylon: {
description: "Configure Pylon connector",
subtext:
"Pylon connector supports indexing pylon issues and, optionally, related messages and attachments. Issues can only be retrieved by their creation date, consider adjusting the Lookback Days for tracking activity.",
values: [
{
type: "multiselect",
label: "Additional data",
description: "Select additional data to index.",
name: "pylon_entities",
optional: false,
options: [
{ name: "Messages", value: "messages" },
{ name: "Attachments (up to 10 MB)", value: "attachments" },
],
default: ["messages", "attachments"],
},
{
type: "text",
label: "Start Date",
name: "start_date",
description: `Pylon API is based on dates. The connector will ingest data starting from this date. Format: YYYY-MM-DD`,
default: "2025-01-01",
optional: true,
},
{
type: "number",
label: "Lookback Days",
name: "lookback_days",
description: `Number of days to look back for updated issues. Issues are only re-indexed if they have activity within the sync window.`,
default: 7,
optional: true,
},
],
advanced_values: [],
},
gitbook: {
description: "Configure GitBook connector",
values: [

View File

@@ -267,6 +267,10 @@ export interface ImapCredentialJson {
imap_password: string;
}
export interface PylonCredentialJson {
pylon_api_key: string;
}
export const credentialTemplates: Record<ValidSources, any> = {
github: { github_access_token: "" } as GithubCredentialJson,
gitlab: {
@@ -277,6 +281,9 @@ export const credentialTemplates: Record<ValidSources, any> = {
bitbucket_email: "",
bitbucket_api_token: "",
} as BitbucketCredentialJson,
pylon: {
pylon_api_key: "",
} as PylonCredentialJson,
slack: { slack_bot_token: "" } as SlackCredentialJson,
bookstack: {
bookstack_base_url: "",
@@ -620,6 +627,9 @@ export const credentialDisplayNames: Record<string, string> = {
// Bitbucket
bitbucket_email: "Bitbucket Account Email",
bitbucket_api_token: "Bitbucket API Token",
// Pylon
pylon_api_key: "Pylon API Key",
};
export function getDisplayNameForCredentialKey(key: string): string {

View File

@@ -10,6 +10,7 @@ import {
GithubIcon,
GitlabIcon,
BitbucketIcon,
PylonIcon,
GlobeIcon,
GmailIcon,
GongIcon,
@@ -271,6 +272,12 @@ export const SOURCE_METADATA_MAP: SourceMap = {
category: SourceCategory.TicketingAndTaskManagement,
docs: "https://docs.onyx.app/admin/connectors/official/productboard",
},
pylon: {
icon: PylonIcon,
displayName: "Pylon",
category: SourceCategory.TicketingAndTaskManagement,
docs: "https://docs.onyx.app/admin/connectors/official/pylon",
},
// Messaging
slack: slackMetadata,

View File

@@ -494,6 +494,7 @@ export enum ValidSources {
Highspot = "highspot",
Imap = "imap",
Bitbucket = "bitbucket",
Pylon = "pylon",
// Federated Connectors
FederatedSlack = "federated_slack",