Compare commits

...

2 Commits

Author SHA1 Message Date
Richard Kuo (Onyx)
3904cdcfcd only callback outside the exception 2025-04-17 13:15:48 -07:00
Richard Kuo (Onyx)
84bac09a5a add callback interface that can be used inside the connector 2025-04-17 12:54:25 -07:00
3 changed files with 27 additions and 8 deletions

View File

@@ -405,6 +405,9 @@ def _run_indexing(
end_time=window_end,
)
if callback:
connector_runner.connector.set_heartbeat_callback(callback)
# don't use a checkpoint if we're explicitly indexing from
# the beginning in order to avoid weird interactions between
# checkpointing / failure handling.

View File

@@ -63,6 +63,10 @@ class BaseConnector(abc.ABC, Generic[CT]):
"""Implement if the underlying connector wants to skip/allow image downloading
based on the application level image analysis setting."""
def set_heartbeat_callback(self, callback: IndexingHeartbeatInterface) -> None:
"""Implement if the underlying connector needs to report activity out to prevent
watchdog timeouts."""
def build_dummy_checkpoint(self) -> CT:
# TODO: find a way to make this work without type: ignore
return ConnectorCheckpoint(has_more=True) # type: ignore

View File

@@ -108,14 +108,13 @@ def get_channels(
channel_types=channel_types,
)
except SlackApiError as e:
logger.info(
f"Unable to fetch private channels due to: {e}. Trying again without private channels."
)
if get_public:
channel_types = ["public_channel"]
else:
logger.warning(f"Unable to fetch private channels due to: {e}.")
if not get_public:
logger.warning("No channels to fetch.")
return []
logger.warning("Trying again with public channels only.")
channel_types = ["public_channel"]
channels = _collect_paginated_channels(
client=client,
exclude_archived=exclude_archived,
@@ -529,10 +528,16 @@ class SlackConnector(
self.credential_prefix: str | None = None
self.delay_lock: str | None = None # the redis key for the shared lock
self.delay_key: str | None = None # the redis key for the shared delay
self._callback: IndexingHeartbeatInterface | None = None
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
raise NotImplementedError("Use set_credentials_provider with this connector.")
def set_heartbeat_callback(self, callback: IndexingHeartbeatInterface) -> None:
"""Implement if the underlying connector needs to report activity out to prevent
watchdog timeouts."""
self._callback = callback
def set_credentials_provider(
self, credentials_provider: CredentialsProviderInterface
) -> None:
@@ -615,6 +620,10 @@ class SlackConnector(
filtered_channels = filter_channels(
raw_channels, self.channels, self.channel_regex_enabled
)
logger.info(
f"Channels: total={len(raw_channels)} filtered={len(filtered_channels)}."
)
checkpoint.channel_ids = [c["id"] for c in filtered_channels]
if len(filtered_channels) == 0:
checkpoint.has_more = False
@@ -705,7 +714,9 @@ class SlackConnector(
checkpoint.current_channel = None
checkpoint.has_more = checkpoint.current_channel is not None
return checkpoint
if self._callback:
self._callback.progress("load_from_checkpoint", 0)
except Exception as e:
logger.exception(f"Error processing channel {channel['name']}")
@@ -720,7 +731,8 @@ class SlackConnector(
failure_message=str(e),
exception=e,
)
return checkpoint
return checkpoint
def validate_connector_settings(self) -> None:
"""