Compare commits

...

6 Commits

Author SHA1 Message Date
Evan Lohn
1529add282 nits 2025-10-07 11:24:15 -07:00
Evan Lohn
47c4197292 fix attachment ids 2025-10-07 11:12:10 -07:00
Evan Lohn
308a8fb619 feat: attachments are separate docs 2025-10-06 21:39:11 -07:00
Evan Lohn
ed1e8f972a nit 2025-10-03 10:10:05 -07:00
Evan Lohn
e0f754c776 nits 2025-10-03 10:07:31 -07:00
Evan Lohn
eb647680a9 better interface for slim connectors 2025-10-03 10:07:31 -07:00
35 changed files with 305 additions and 170 deletions

View File

@@ -124,9 +124,9 @@ def get_space_permission(
and not space_permissions.external_user_group_ids
):
logger.warning(
f"No permissions found for space '{space_key}'. This is very unlikely"
"to be correct and is more likely caused by an access token with"
"insufficient permissions. Make sure that the access token has Admin"
f"No permissions found for space '{space_key}'. This is very unlikely "
"to be correct and is more likely caused by an access token with "
"insufficient permissions. Make sure that the access token has Admin "
f"permissions for space '{space_key}'"
)

View File

@@ -26,7 +26,7 @@ def _get_slim_doc_generator(
else 0.0
)
return gmail_connector.retrieve_all_slim_documents(
return gmail_connector.retrieve_all_slim_docs_perm_sync(
start=start_time,
end=current_time.timestamp(),
callback=callback,

View File

@@ -34,7 +34,7 @@ def _get_slim_doc_generator(
else 0.0
)
return google_drive_connector.retrieve_all_slim_documents(
return google_drive_connector.retrieve_all_slim_docs_perm_sync(
start=start_time,
end=current_time.timestamp(),
callback=callback,

View File

@@ -105,7 +105,9 @@ def _get_slack_document_access(
channel_permissions: dict[str, ExternalAccess],
callback: IndexingHeartbeatInterface | None,
) -> Generator[DocExternalAccess, None, None]:
slim_doc_generator = slack_connector.retrieve_all_slim_documents(callback=callback)
slim_doc_generator = slack_connector.retrieve_all_slim_docs_perm_sync(
callback=callback
)
for doc_metadata_batch in slim_doc_generator:
for doc_metadata in doc_metadata_batch:

View File

@@ -4,7 +4,7 @@ from ee.onyx.external_permissions.perm_sync_types import FetchAllDocumentsIdsFun
from onyx.access.models import DocExternalAccess
from onyx.access.models import ExternalAccess
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.db.models import ConnectorCredentialPair
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
@@ -17,7 +17,7 @@ def generic_doc_sync(
fetch_all_existing_docs_ids_fn: FetchAllDocumentsIdsFunction,
callback: IndexingHeartbeatInterface | None,
doc_source: DocumentSource,
slim_connector: SlimConnector,
slim_connector: SlimConnectorWithPermSync,
label: str,
) -> Generator[DocExternalAccess, None, None]:
"""
@@ -40,7 +40,7 @@ def generic_doc_sync(
newly_fetched_doc_ids: set[str] = set()
logger.info(f"Fetching all slim documents from {doc_source}")
for doc_batch in slim_connector.retrieve_all_slim_documents(callback=callback):
for doc_batch in slim_connector.retrieve_all_slim_docs_perm_sync(callback=callback):
logger.info(f"Got {len(doc_batch)} slim documents from {doc_source}")
if callback:

View File

@@ -19,7 +19,9 @@ from onyx.connectors.interfaces import CheckpointedConnector
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import Document
from onyx.connectors.models import SlimDocument
from onyx.httpx.httpx_pool import HttpxPool
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
@@ -30,7 +32,7 @@ PRUNING_CHECKPOINTED_BATCH_SIZE = 32
def document_batch_to_ids(
doc_batch: Iterator[list[Document]],
doc_batch: Iterator[list[Document]] | Iterator[list[SlimDocument]],
) -> Generator[set[str], None, None]:
for doc_list in doc_batch:
yield {doc.id for doc in doc_list}
@@ -41,20 +43,24 @@ def extract_ids_from_runnable_connector(
callback: IndexingHeartbeatInterface | None = None,
) -> set[str]:
"""
If the SlimConnector hasnt been implemented for the given connector, just pull
If the given connector is neither a SlimConnector nor a SlimConnectorWithPermSync, just pull
all docs using the load_from_state and grab out the IDs.
Optionally, a callback can be passed to handle the length of each document batch.
"""
all_connector_doc_ids: set[str] = set()
if isinstance(runnable_connector, SlimConnector):
for metadata_batch in runnable_connector.retrieve_all_slim_documents():
all_connector_doc_ids.update({doc.id for doc in metadata_batch})
doc_batch_id_generator = None
if isinstance(runnable_connector, LoadConnector):
if isinstance(runnable_connector, SlimConnector):
doc_batch_id_generator = document_batch_to_ids(
runnable_connector.retrieve_all_slim_docs()
)
elif isinstance(runnable_connector, SlimConnectorWithPermSync):
doc_batch_id_generator = document_batch_to_ids(
runnable_connector.retrieve_all_slim_docs_perm_sync()
)
# If the connector isn't slim, fall back to running it normally to get ids
elif isinstance(runnable_connector, LoadConnector):
doc_batch_id_generator = document_batch_to_ids(
runnable_connector.load_from_state()
)
@@ -78,13 +84,14 @@ def extract_ids_from_runnable_connector(
raise RuntimeError("Pruning job could not find a valid runnable_connector.")
# this function is called per batch for rate limiting
def doc_batch_processing_func(doc_batch_ids: set[str]) -> set[str]:
return doc_batch_ids
if MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE:
doc_batch_processing_func = rate_limit_builder(
doc_batch_processing_func = (
rate_limit_builder(
max_calls=MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE, period=60
)(lambda x: x)
if MAX_PRUNING_DOCUMENT_RETRIEVAL_PER_MINUTE
else lambda x: x
)
for doc_batch_ids in doc_batch_id_generator:
if callback:
if callback.should_stop():

View File

@@ -41,7 +41,7 @@ All new connectors should have tests added to the `backend/tests/daily/connector
#### Implementing the new Connector
The connector must subclass one or more of LoadConnector, PollConnector, SlimConnector, or EventConnector.
The connector must subclass one or more of LoadConnector, PollConnector, CheckpointedConnector, or CheckpointedConnectorWithPermSync.
The `__init__` should take arguments for configuring what documents the connector will index and where it finds those
documents. For example, if you have a wiki site, it may include the configuration for the team, topic, folder, etc. of

View File

@@ -25,7 +25,7 @@ from onyx.connectors.exceptions import UnexpectedValidationError
from onyx.connectors.interfaces import CheckpointedConnector
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import ConnectorMissingCredentialError
@@ -56,7 +56,7 @@ class BitbucketConnectorCheckpoint(ConnectorCheckpoint):
class BitbucketConnector(
CheckpointedConnector[BitbucketConnectorCheckpoint],
SlimConnector,
SlimConnectorWithPermSync,
):
"""Connector for indexing Bitbucket Cloud pull requests.
@@ -266,7 +266,7 @@ class BitbucketConnector(
"""Validate and deserialize a checkpoint instance from JSON."""
return BitbucketConnectorCheckpoint.model_validate_json(checkpoint_json)
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,

View File

@@ -22,7 +22,6 @@ from onyx.connectors.confluence.onyx_confluence import OnyxConfluence
from onyx.connectors.confluence.utils import build_confluence_document_id
from onyx.connectors.confluence.utils import convert_attachment_to_content
from onyx.connectors.confluence.utils import datetime_from_string
from onyx.connectors.confluence.utils import process_attachment
from onyx.connectors.confluence.utils import update_param_in_path
from onyx.connectors.confluence.utils import validate_attachment_filetype
from onyx.connectors.credentials_provider import OnyxStaticCredentialsProvider
@@ -42,6 +41,7 @@ from onyx.connectors.interfaces import CredentialsProviderInterface
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
@@ -92,6 +92,7 @@ class ConfluenceCheckpoint(ConnectorCheckpoint):
class ConfluenceConnector(
CheckpointedConnector[ConfluenceCheckpoint],
SlimConnector,
SlimConnectorWithPermSync,
CredentialsConnector,
):
def __init__(
@@ -248,9 +249,26 @@ class ConfluenceConnector(
page_query += " order by lastmodified asc"
return page_query
def _construct_attachment_query(self, confluence_page_id: str) -> str:
def _construct_attachment_query(
self,
confluence_page_id: str,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> str:
attachment_query = f"type=attachment and container='{confluence_page_id}'"
attachment_query += self.cql_label_filter
# Add time filters to avoid reprocessing unchanged attachments during refresh
if start:
formatted_start_time = datetime.fromtimestamp(
start, tz=self.timezone
).strftime("%Y-%m-%d %H:%M")
attachment_query += f" and lastmodified >= '{formatted_start_time}'"
if end:
formatted_end_time = datetime.fromtimestamp(end, tz=self.timezone).strftime(
"%Y-%m-%d %H:%M"
)
attachment_query += f" and lastmodified <= '{formatted_end_time}'"
attachment_query += " order by lastmodified asc"
return attachment_query
def _get_comment_string_for_page_id(self, page_id: str) -> str:
@@ -304,41 +322,8 @@ class ConfluenceConnector(
sections.append(
TextSection(text=comment_text, link=f"{page_url}#comments")
)
# Process attachments
if "children" in page and "attachment" in page["children"]:
attachments = self.confluence_client.get_attachments_for_page(
page_id, expand="metadata"
)
for attachment in attachments.get("results", []):
# Process each attachment
result = process_attachment(
self.confluence_client,
attachment,
page_id,
self.allow_images,
)
if result and result.text:
# Create a section for the attachment text
attachment_section = TextSection(
text=result.text,
link=f"{page_url}#attachment-{attachment['id']}",
)
sections.append(attachment_section)
elif result and result.file_name:
# Create an ImageSection for image attachments
image_section = ImageSection(
link=f"{page_url}#attachment-{attachment['id']}",
image_file_id=result.file_name,
)
sections.append(image_section)
else:
logger.warning(
f"Error processing attachment '{attachment.get('title')}':",
f"{result.error if result else 'Unknown error'}",
)
# Note: attachments are no longer merged into the page document.
# They are indexed as separate documents downstream.
# Extract metadata
metadata = {}
@@ -387,9 +372,20 @@ class ConfluenceConnector(
)
def _fetch_page_attachments(
self, page: dict[str, Any], doc: Document
) -> Document | ConnectorFailure:
attachment_query = self._construct_attachment_query(page["id"])
self,
page: dict[str, Any],
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> tuple[list[Document], list[ConnectorFailure]]:
"""
Inline attachments are added directly to the document as text or image sections by
this function. The returned Documents/ConnectorFailures are for non-inline attachments
and those at the end of the page.
"""
attachment_query = self._construct_attachment_query(page["id"], start, end)
attachment_failures: list[ConnectorFailure] = []
attachment_docs: list[Document] = []
page_url = ""
for attachment in self.confluence_client.paginated_cql_retrieval(
cql=attachment_query,
@@ -418,11 +414,17 @@ class ConfluenceConnector(
logger.info(
f"Processing attachment: {attachment['title']} attached to page {page['title']}"
)
# Attempt to get textual content or image summarization:
object_url = build_confluence_document_id(
self.wiki_base, attachment["_links"]["webui"], self.is_cloud
)
# Attachment document id: use the download URL for stable identity
try:
object_url = build_confluence_document_id(
self.wiki_base, attachment["_links"]["download"], self.is_cloud
)
except Exception as e:
logger.warning(
f"Invalid attachment url for id {attachment['id']}, skipping"
)
logger.debug(f"Error building attachment url: {e}")
continue
try:
response = convert_attachment_to_content(
confluence_client=self.confluence_client,
@@ -435,38 +437,76 @@ class ConfluenceConnector(
content_text, file_storage_name = response
sections: list[TextSection | ImageSection] = []
if content_text:
doc.sections.append(
TextSection(
text=content_text,
link=object_url,
)
)
sections.append(TextSection(text=content_text, link=object_url))
elif file_storage_name:
doc.sections.append(
ImageSection(
link=object_url,
image_file_id=file_storage_name,
)
sections.append(
ImageSection(link=object_url, image_file_id=file_storage_name)
)
# Build attachment-specific metadata
attachment_metadata: dict[str, str | list[str]] = {}
if "space" in attachment:
attachment_metadata["space"] = attachment["space"].get("name", "")
labels: list[str] = []
if "metadata" in attachment and "labels" in attachment["metadata"]:
for label in attachment["metadata"]["labels"].get("results", []):
labels.append(label.get("name", ""))
if labels:
attachment_metadata["labels"] = labels
page_url = page_url or build_confluence_document_id(
self.wiki_base, page["_links"]["webui"], self.is_cloud
)
attachment_metadata["parent_page_id"] = page_url
attachment_id = build_confluence_document_id(
self.wiki_base, attachment["_links"]["webui"], self.is_cloud
)
primary_owners: list[BasicExpertInfo] | None = None
if "version" in attachment and "by" in attachment["version"]:
author = attachment["version"]["by"]
display_name = author.get("displayName", "Unknown")
email = author.get("email", "unknown@domain.invalid")
primary_owners = [
BasicExpertInfo(display_name=display_name, email=email)
]
attachment_doc = Document(
id=attachment_id,
sections=sections,
source=DocumentSource.CONFLUENCE,
semantic_identifier=attachment.get("title", object_url),
metadata=attachment_metadata,
doc_updated_at=(
datetime_from_string(attachment["version"]["when"])
if attachment.get("version")
and attachment["version"].get("when")
else None
),
primary_owners=primary_owners,
)
attachment_docs.append(attachment_doc)
except Exception as e:
logger.error(
f"Failed to extract/summarize attachment {attachment['title']}",
exc_info=e,
)
if is_atlassian_date_error(
e
): # propagate error to be caught and retried
if is_atlassian_date_error(e):
# propagate error to be caught and retried
raise
return ConnectorFailure(
failed_document=DocumentFailure(
document_id=doc.id,
document_link=object_url,
),
failure_message=f"Failed to extract/summarize attachment {attachment['title']} for doc {doc.id}",
exception=e,
attachment_failures.append(
ConnectorFailure(
failed_document=DocumentFailure(
document_id=object_url,
document_link=object_url,
),
failure_message=f"Failed to extract/summarize attachment {attachment['title']} for doc {object_url}",
exception=e,
)
)
return doc
return attachment_docs, attachment_failures
def _fetch_document_batches(
self,
@@ -505,12 +545,18 @@ class ConfluenceConnector(
if isinstance(doc_or_failure, ConnectorFailure):
yield doc_or_failure
continue
# Now get attachments for that page:
doc_or_failure = self._fetch_page_attachments(page, doc_or_failure)
# yield completed document (or failure)
yield doc_or_failure
# Now get attachments for that page:
attachment_docs, attachment_failures = self._fetch_page_attachments(
page, start, end
)
# yield attached docs and failures
yield from attachment_docs
yield from attachment_failures
# Create checkpoint once a full page of results is returned
if checkpoint.next_page_url and checkpoint.next_page_url != page_query_url:
return checkpoint
@@ -563,7 +609,21 @@ class ConfluenceConnector(
def validate_checkpoint_json(self, checkpoint_json: str) -> ConfluenceCheckpoint:
return ConfluenceCheckpoint.model_validate_json(checkpoint_json)
def retrieve_all_slim_documents(
@override
def retrieve_all_slim_docs(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
) -> GenerateSlimDocumentOutput:
return self._retrieve_all_slim_docs(
start=start,
end=end,
callback=callback,
include_permissions=False,
)
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
@@ -573,12 +633,28 @@ class ConfluenceConnector(
Return 'slim' docs (IDs + minimal permission data).
Does not fetch actual text. Used primarily for incremental permission sync.
"""
return self._retrieve_all_slim_docs(
start=start,
end=end,
callback=callback,
include_permissions=True,
)
def _retrieve_all_slim_docs(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: IndexingHeartbeatInterface | None = None,
include_permissions: bool = True,
) -> GenerateSlimDocumentOutput:
doc_metadata_list: list[SlimDocument] = []
restrictions_expand = ",".join(_RESTRICTIONS_EXPANSION_FIELDS)
space_level_access_info = get_all_space_permissions(
self.confluence_client, self.is_cloud
)
space_level_access_info: dict[str, ExternalAccess] = {}
if include_permissions:
space_level_access_info = get_all_space_permissions(
self.confluence_client, self.is_cloud
)
def get_external_access(
doc_id: str, restrictions: dict[str, Any], ancestors: list[dict[str, Any]]
@@ -605,8 +681,10 @@ class ConfluenceConnector(
doc_metadata_list.append(
SlimDocument(
id=page_id,
external_access=get_external_access(
page_id, page_restrictions, page_ancestors
external_access=(
get_external_access(page_id, page_restrictions, page_ancestors)
if include_permissions
else None
),
)
)
@@ -641,8 +719,12 @@ class ConfluenceConnector(
doc_metadata_list.append(
SlimDocument(
id=attachment_id,
external_access=get_external_access(
attachment_id, attachment_restrictions, []
external_access=(
get_external_access(
attachment_id, attachment_restrictions, []
)
if include_permissions
else None
),
)
)
@@ -653,10 +735,10 @@ class ConfluenceConnector(
if callback and callback.should_stop():
raise RuntimeError(
"retrieve_all_slim_documents: Stop signal detected"
"retrieve_all_slim_docs_perm_sync: Stop signal detected"
)
if callback:
callback.progress("retrieve_all_slim_documents", 1)
callback.progress("retrieve_all_slim_docs_perm_sync", 1)
yield doc_metadata_list
@@ -737,7 +819,7 @@ if __name__ == "__main__":
end = datetime.now().timestamp()
# Fetch all `SlimDocuments`.
for slim_doc in confluence_connector.retrieve_all_slim_documents():
for slim_doc in confluence_connector.retrieve_all_slim_docs_perm_sync():
print(slim_doc)
# Fetch all `Documents`.

View File

@@ -966,6 +966,13 @@ def _get_user(confluence_client: OnyxConfluence, user_id: str) -> str:
return _USER_ID_TO_DISPLAY_NAME_CACHE.get(user_id) or _USER_NOT_FOUND
def sanitize_attachment_title(title: str) -> str:
"""
Sanitize the attachment title to be a valid HTML attribute.
"""
return title.replace("<", "_").replace(">", "_").replace(" ", "_").replace(":", "_")
def extract_text_from_confluence_html(
confluence_client: OnyxConfluence,
confluence_object: dict[str, Any],
@@ -1068,6 +1075,16 @@ def extract_text_from_confluence_html(
except Exception as e:
logger.warning(f"Error processing ac:link-body: {e}")
for html_attachment in soup.findAll("ri:attachment"):
# This extracts the text from inline attachments in the page so they can be
# represented in the document text as plain text
try:
html_attachment.replaceWith(
f"<attachment>{sanitize_attachment_title(html_attachment.attrs['ri:filename'])}</attachment>"
) # to be replaced later
except Exception as e:
logger.warning(f"Error processing ac:attachment: {e}")
return format_document_soup(soup)

View File

@@ -28,7 +28,7 @@ from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import Document
from onyx.connectors.models import ImageSection
@@ -232,7 +232,7 @@ def thread_to_document(
)
class GmailConnector(LoadConnector, PollConnector, SlimConnector):
class GmailConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
self.batch_size = batch_size
@@ -397,10 +397,10 @@ class GmailConnector(LoadConnector, PollConnector, SlimConnector):
if callback:
if callback.should_stop():
raise RuntimeError(
"retrieve_all_slim_documents: Stop signal detected"
"retrieve_all_slim_docs_perm_sync: Stop signal detected"
)
callback.progress("retrieve_all_slim_documents", 1)
callback.progress("retrieve_all_slim_docs_perm_sync", 1)
except HttpError as e:
if _is_mail_service_disabled_error(e):
logger.warning(
@@ -431,7 +431,7 @@ class GmailConnector(LoadConnector, PollConnector, SlimConnector):
raise PermissionError(ONYX_SCOPE_INSTRUCTIONS) from e
raise e
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,

View File

@@ -64,7 +64,7 @@ from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
@@ -153,7 +153,7 @@ class DriveIdStatus(Enum):
class GoogleDriveConnector(
SlimConnector, CheckpointedConnectorWithPermSync[GoogleDriveCheckpoint]
SlimConnectorWithPermSync, CheckpointedConnectorWithPermSync[GoogleDriveCheckpoint]
):
def __init__(
self,
@@ -1296,7 +1296,7 @@ class GoogleDriveConnector(
callback.progress("_extract_slim_docs_from_google_drive", 1)
yield slim_batch
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,

View File

@@ -18,7 +18,7 @@ from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import SlimDocument
@@ -38,7 +38,7 @@ class HighspotSpot(BaseModel):
name: str
class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
class HighspotConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
"""
Connector for loading data from Highspot.
@@ -362,7 +362,7 @@ class HighspotConnector(LoadConnector, PollConnector, SlimConnector):
description = item_details.get("description", "")
return title, description
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,

View File

@@ -97,11 +97,20 @@ class PollConnector(BaseConnector):
raise NotImplementedError
# Slim connectors can retrieve just the ids and
# permission syncing information for connected documents
# Slim connectors retrieve just the ids of documents
class SlimConnector(BaseConnector):
@abc.abstractmethod
def retrieve_all_slim_documents(
def retrieve_all_slim_docs(
self,
) -> GenerateSlimDocumentOutput:
raise NotImplementedError
# Slim connectors retrieve both the ids AND
# permission syncing information for connected documents
class SlimConnectorWithPermSync(BaseConnector):
@abc.abstractmethod
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,

View File

@@ -29,7 +29,7 @@ from onyx.connectors.interfaces import CheckpointedConnectorWithPermSync
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.jira.access import get_project_permissions
from onyx.connectors.jira.utils import best_effort_basic_expert_info
from onyx.connectors.jira.utils import best_effort_get_field_from_issue
@@ -360,7 +360,8 @@ class JiraConnectorCheckpoint(ConnectorCheckpoint):
class JiraConnector(
CheckpointedConnectorWithPermSync[JiraConnectorCheckpoint], SlimConnector
CheckpointedConnectorWithPermSync[JiraConnectorCheckpoint],
SlimConnectorWithPermSync,
):
def __init__(
self,
@@ -570,7 +571,7 @@ class JiraConnector(
# if we didn't retrieve a full batch, we're done
checkpoint.has_more = current_offset - starting_offset == page_size
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
@@ -756,7 +757,7 @@ if __name__ == "__main__":
start = 0
end = datetime.now().timestamp()
for slim_doc in connector.retrieve_all_slim_documents(
for slim_doc in connector.retrieve_all_slim_docs_perm_sync(
start=start,
end=end,
):

View File

@@ -65,6 +65,7 @@ def extract_text_from_adf(adf: dict | None) -> str:
WARNING: This function is incomplete and will e.g. skip lists!
"""
# TODO: complete this function
texts = []
if adf is not None and "content" in adf:
for block in adf["content"]:

View File

@@ -16,7 +16,7 @@ from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorMissingCredentialError
@@ -151,7 +151,7 @@ def _validate_custom_query_config(config: dict[str, Any]) -> None:
)
class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
class SalesforceConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
"""Approach outline
Goal
@@ -1119,7 +1119,7 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
with tempfile.TemporaryDirectory() as temp_dir:
return self._delta_sync(temp_dir, start, end)
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,

View File

@@ -41,7 +41,7 @@ from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import IndexingHeartbeatInterface
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
@@ -672,7 +672,7 @@ def _convert_sitepage_to_slim_document(
class SharepointConnector(
SlimConnector,
SlimConnectorWithPermSync,
CheckpointedConnectorWithPermSync[SharepointConnectorCheckpoint],
):
def __init__(
@@ -1597,7 +1597,7 @@ class SharepointConnector(
) -> SharepointConnectorCheckpoint:
return SharepointConnectorCheckpoint.model_validate_json(checkpoint_json)
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,

View File

@@ -16,7 +16,7 @@ from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import SlimDocument
@@ -164,7 +164,7 @@ def get_slab_url_from_title_id(base_url: str, title: str, page_id: str) -> str:
return urljoin(urljoin(base_url, "posts/"), url_id)
class SlabConnector(LoadConnector, PollConnector, SlimConnector):
class SlabConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
def __init__(
self,
base_url: str,
@@ -239,7 +239,7 @@ class SlabConnector(LoadConnector, PollConnector, SlimConnector):
time_filter=lambda t: start_time <= t <= end_time
)
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,

View File

@@ -42,7 +42,7 @@ from onyx.connectors.interfaces import CredentialsConnector
from onyx.connectors.interfaces import CredentialsProviderInterface
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
@@ -581,7 +581,7 @@ def _process_message(
class SlackConnector(
SlimConnector,
SlimConnectorWithPermSync,
CredentialsConnector,
CheckpointedConnectorWithPermSync[SlackCheckpoint],
):
@@ -732,7 +732,7 @@ class SlackConnector(
self.text_cleaner = SlackTextCleaner(client=self.client)
self.credentials_provider = credentials_provider
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,

View File

@@ -22,7 +22,7 @@ from onyx.connectors.interfaces import CheckpointedConnector
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import ConnectorMissingCredentialError
@@ -51,7 +51,7 @@ class TeamsCheckpoint(ConnectorCheckpoint):
class TeamsConnector(
CheckpointedConnector[TeamsCheckpoint],
SlimConnector,
SlimConnectorWithPermSync,
):
MAX_WORKERS = 10
AUTHORITY_URL_PREFIX = "https://login.microsoftonline.com/"
@@ -228,9 +228,9 @@ class TeamsConnector(
has_more=bool(todos),
)
# impls for SlimConnector
# impls for SlimConnectorWithPermSync
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
@@ -572,7 +572,7 @@ if __name__ == "__main__":
)
teams_connector.validate_connector_settings()
for slim_doc in teams_connector.retrieve_all_slim_documents():
for slim_doc in teams_connector.retrieve_all_slim_docs_perm_sync():
...
for doc in load_everything_from_checkpoint_connector(

View File

@@ -26,7 +26,7 @@ from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import ConnectorFailure
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import SlimConnectorWithPermSync
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import Document
@@ -376,7 +376,7 @@ class ZendeskConnectorCheckpoint(ConnectorCheckpoint):
class ZendeskConnector(
SlimConnector, CheckpointedConnector[ZendeskConnectorCheckpoint]
SlimConnectorWithPermSync, CheckpointedConnector[ZendeskConnectorCheckpoint]
):
def __init__(
self,
@@ -565,7 +565,7 @@ class ZendeskConnector(
)
return checkpoint
def retrieve_all_slim_documents(
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,

View File

@@ -43,7 +43,9 @@ def test_bitbucket_full_ids_subset_of_slim_ids(
# Get all doc IDs from the slim connector
all_slim_doc_ids: set[str] = set()
for slim_doc_batch in bitbucket_connector_for_slim.retrieve_all_slim_documents():
for (
slim_doc_batch
) in bitbucket_connector_for_slim.retrieve_all_slim_docs_perm_sync():
all_slim_doc_ids.update([doc.id for doc in slim_doc_batch])
# The set of full doc IDs should always be a subset of slim doc IDs

View File

@@ -81,18 +81,21 @@ def _test_confluence_connector_basic(
confluence_connector, 0, time.time()
)
assert len(doc_batch) == 2
assert len(doc_batch) == (3 if expect_attachments else 2)
page_within_a_page_doc: Document | None = None
page_doc: Document | None = None
small_file_doc: Document | None = None
for doc in doc_batch:
if doc.semantic_identifier == "DailyConnectorTestSpace Home":
page_doc = doc
elif doc.semantic_identifier == "Page Within A Page":
page_within_a_page_doc = doc
elif doc.semantic_identifier == "small-file.txt":
small_file_doc = doc
else:
pass
print(f"Unexpected doc: {doc.semantic_identifier}")
assert page_within_a_page_doc is not None
assert page_within_a_page_doc.semantic_identifier == "Page Within A Page"
@@ -121,21 +124,27 @@ def _test_confluence_connector_basic(
assert page_doc.primary_owners
assert page_doc.primary_owners[0].email == "hagen@danswer.ai"
assert (
len(page_doc.sections) == 2 if expect_attachments else 1
) # page text + attachment text
len(page_doc.sections) == 1
) # just page text, attachment text is separate doc
page_section = page_doc.sections[0]
assert page_section.text == "test123 " + page_within_a_page_text
assert (
page_section.text
== "test123 "
+ page_within_a_page_text
+ "\n<attachment>small-file.txt</attachment>\n<attachment>big-file.txt</attachment>"
)
assert (
page_section.link
== "https://danswerai.atlassian.net/wiki/spaces/DailyConne/overview"
)
if expect_attachments:
text_attachment_section = page_doc.sections[1]
assert small_file_doc is not None
text_attachment_section = small_file_doc.sections[0]
assert text_attachment_section.text == "small"
assert text_attachment_section.link
assert text_attachment_section.link.endswith("small-file.txt")
assert text_attachment_section.link.split("?")[0].endswith("small-file.txt")
@pytest.mark.parametrize("space", ["MI"])
@@ -188,5 +197,5 @@ def test_confluence_connector_allow_images(
confluence_connector, 0, time.time()
)
assert len(doc_batch) == 8
assert len(doc_batch) == 12
assert sum(len(doc.sections) for doc in doc_batch) == 12

View File

@@ -54,7 +54,7 @@ def test_confluence_connector_permissions(
# Get all doc IDs from the slim connector
all_slim_doc_ids = set()
for slim_doc_batch in confluence_connector.retrieve_all_slim_documents():
for slim_doc_batch in confluence_connector.retrieve_all_slim_docs_perm_sync():
all_slim_doc_ids.update([doc.id for doc in slim_doc_batch])
# Find IDs that are in full but not in slim

View File

@@ -78,7 +78,7 @@ def test_slim_docs_retrieval(
print("\n\nRunning test_slim_docs_retrieval")
connector = google_gmail_service_acct_connector_factory()
retrieved_slim_docs: list[SlimDocument] = []
for doc_batch in connector.retrieve_all_slim_documents(
for doc_batch in connector.retrieve_all_slim_docs_perm_sync(
_THREAD_1_START_TIME, _THREAD_1_END_TIME
):
retrieved_slim_docs.extend(doc_batch)

View File

@@ -97,7 +97,7 @@ def test_highspot_connector_slim(
# Get all doc IDs from the slim connector
all_slim_doc_ids = set()
for slim_doc_batch in highspot_connector.retrieve_all_slim_documents():
for slim_doc_batch in highspot_connector.retrieve_all_slim_docs_perm_sync():
all_slim_doc_ids.update([doc.id for doc in slim_doc_batch])
# The set of full doc IDs should be a subset of the slim doc IDs

View File

@@ -190,7 +190,7 @@ def test_salesforce_connector_slim(salesforce_connector: SalesforceConnector) ->
# Get all doc IDs from the slim connector
all_slim_doc_ids = set()
for slim_doc_batch in salesforce_connector.retrieve_all_slim_documents():
for slim_doc_batch in salesforce_connector.retrieve_all_slim_docs_perm_sync():
all_slim_doc_ids.update([doc.id for doc in slim_doc_batch])
# The set of full doc IDs should be always be a subset of the slim doc IDs

View File

@@ -82,7 +82,7 @@ def test_slab_connector_slim(slab_connector: SlabConnector) -> None:
# Get all doc IDs from the slim connector
all_slim_doc_ids = set()
for slim_doc_batch in slab_connector.retrieve_all_slim_documents():
for slim_doc_batch in slab_connector.retrieve_all_slim_docs_perm_sync():
all_slim_doc_ids.update([doc.id for doc in slim_doc_batch])
# The set of full doc IDs should be always be a subset of the slim doc IDs

View File

@@ -109,11 +109,11 @@ def test_load_from_checkpoint_access__private_channel(
def test_slim_documents_access__public_channel(
slack_connector: SlackConnector,
) -> None:
"""Test that retrieve_all_slim_documents returns correct access information for slim documents."""
"""Test that retrieve_all_slim_docs_perm_sync returns correct access information for slim documents."""
if not slack_connector.client:
raise RuntimeError("Web client must be defined")
slim_docs_generator = slack_connector.retrieve_all_slim_documents(
slim_docs_generator = slack_connector.retrieve_all_slim_docs_perm_sync(
start=0.0,
end=time.time(),
)
@@ -143,11 +143,11 @@ def test_slim_documents_access__public_channel(
def test_slim_documents_access__private_channel(
slack_connector: SlackConnector,
) -> None:
"""Test that retrieve_all_slim_documents returns correct access information for slim documents."""
"""Test that retrieve_all_slim_docs_perm_sync returns correct access information for slim documents."""
if not slack_connector.client:
raise RuntimeError("Web client must be defined")
slim_docs_generator = slack_connector.retrieve_all_slim_documents(
slim_docs_generator = slack_connector.retrieve_all_slim_docs_perm_sync(
start=0.0,
end=time.time(),
)

View File

@@ -157,7 +157,7 @@ def test_slim_docs_retrieval_from_teams_connector(
) -> None:
slim_docs = [
slim_doc
for slim_doc_batch in teams_connector.retrieve_all_slim_documents()
for slim_doc_batch in teams_connector.retrieve_all_slim_docs_perm_sync()
for slim_doc in slim_doc_batch
]

View File

@@ -119,7 +119,7 @@ def test_zendesk_connector_slim(zendesk_article_connector: ZendeskConnector) ->
# Get slim doc IDs
all_slim_doc_ids = set()
for slim_doc_batch in zendesk_article_connector.retrieve_all_slim_documents():
for slim_doc_batch in zendesk_article_connector.retrieve_all_slim_docs_perm_sync():
all_slim_doc_ids.update([doc.id for doc in slim_doc_batch])
# Full docs should be subset of slim docs

View File

@@ -289,7 +289,7 @@ def test_load_from_checkpoint_with_page_processing_error(
)
def test_retrieve_all_slim_documents(
def test_retrieve_all_slim_docs_perm_sync(
confluence_connector: ConfluenceConnector,
create_mock_page: Callable[..., dict[str, Any]],
) -> None:
@@ -318,8 +318,8 @@ def test_retrieve_all_slim_documents(
MagicMock(json=lambda: {"results": []}),
]
# Call retrieve_all_slim_documents
batches = list(confluence_connector.retrieve_all_slim_documents(0, 100))
# Call retrieve_all_slim_docs_perm_sync
batches = list(confluence_connector.retrieve_all_slim_docs_perm_sync(0, 100))
assert get_mock.call_count == 4
# Check that a batch with 2 documents was returned

View File

@@ -315,7 +315,7 @@ def test_load_from_checkpoint_with_skipped_issue(
assert len(checkpoint_output.items) == 0
def test_retrieve_all_slim_documents(
def test_retrieve_all_slim_docs_perm_sync(
jira_connector: JiraConnector, create_mock_issue: Any
) -> None:
"""Test retrieving all slim documents"""
@@ -341,8 +341,8 @@ def test_retrieve_all_slim_documents(
"https://jira.example.com/browse/TEST-2",
]
# Call retrieve_all_slim_documents
batches = list(jira_connector.retrieve_all_slim_documents(0, 100))
# Call retrieve_all_slim_docs_perm_sync
batches = list(jira_connector.retrieve_all_slim_docs_perm_sync(0, 100))
# Check that a batch with 2 documents was returned
assert len(batches) == 1

5
web/package-lock.json generated
View File

@@ -8595,6 +8595,7 @@
"version": "3.9.0",
"resolved": "https://registry.npmjs.org/ci-info/-/ci-info-3.9.0.tgz",
"integrity": "sha512-NIxF55hv4nSqQswkAeiOi1r83xy8JldOFDTWiug55KBu9Jnblncd2U6ViHmYgHf01TPZS77NJBhBMKdWj9HQMQ==",
"dev": true,
"funding": [
{
"type": "github",
@@ -11831,6 +11832,7 @@
"version": "0.1.4",
"resolved": "https://registry.npmjs.org/imurmurhash/-/imurmurhash-0.1.4.tgz",
"integrity": "sha512-JmXMZ6wuvDmLiHEml9ykzqO6lwFbof0GG4IkcGaENdCRDDmMVnny7s5HsIgHCbaq0w2MyPhDqkhTUgS2LU2PHA==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=0.8.19"
@@ -20478,6 +20480,7 @@
"version": "3.0.7",
"resolved": "https://registry.npmjs.org/signal-exit/-/signal-exit-3.0.7.tgz",
"integrity": "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==",
"dev": true,
"license": "ISC"
},
"node_modules/simple-swizzle": {
@@ -21445,6 +21448,7 @@
"version": "0.2.0",
"resolved": "https://registry.npmjs.org/text-table/-/text-table-0.2.0.tgz",
"integrity": "sha512-N+8UisAXDGk8PFXP4HAzVR9nbfmVJ3zYLAWiTIoqC5v5isinhr+r5uaO8+7r3BMfuNIufIsA7RdpVgacC2cSpw==",
"dev": true,
"license": "MIT"
},
"node_modules/thenify": {
@@ -22800,6 +22804,7 @@
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/write-file-atomic/-/write-file-atomic-4.0.2.tgz",
"integrity": "sha512-7KxauUdBmSdWnmpaGFg+ppNjKF8uNLry8LyzjauQDOVONfFLNKrKvQOxZ/VuTIcS/gge/YNahf5RIIQWTSarlg==",
"dev": true,
"license": "ISC",
"dependencies": {
"imurmurhash": "^0.1.4",