Compare commits

...

2 Commits

Author SHA1 Message Date
Evan Lohn
462537e2ee initial confluence hierarchy impl 2026-01-27 23:08:38 -08:00
Evan Lohn
e3a9b2a021 hierarchy fetching task implementation 2026-01-27 22:11:31 -08:00
3 changed files with 282 additions and 7 deletions

View File

@@ -24,10 +24,14 @@ from sqlalchemy.orm import Session
from onyx.background.celery.apps.app_base import task_logger
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.connectors.factory import instantiate_connector
from onyx.connectors.interfaces import HierarchyConnector
from onyx.connectors.models import HierarchyNode as PydanticHierarchyNode
from onyx.db.connector import mark_cc_pair_as_hierarchy_fetched
from onyx.db.connector_credential_pair import (
fetch_indexable_standard_connector_credential_pair_ids,
@@ -35,7 +39,10 @@ from onyx.db.connector_credential_pair import (
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.engine.sql_engine import get_session_with_current_tenant
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.hierarchy import upsert_hierarchy_nodes_batch
from onyx.db.models import ConnectorCredentialPair
from onyx.redis.redis_hierarchy import cache_hierarchy_nodes_batch
from onyx.redis.redis_hierarchy import HierarchyNodeCacheEntry
from onyx.redis.redis_pool import get_redis_client
from onyx.utils.logger import setup_logger
@@ -211,6 +218,90 @@ def check_for_hierarchy_fetching(self: Task, *, tenant_id: str) -> int | None:
return tasks_created
# Number of hierarchy nodes accumulated before flushing to Postgres/Redis.
HIERARCHY_NODE_BATCH_SIZE = 100


def _run_hierarchy_extraction(
    db_session: Session,
    cc_pair: ConnectorCredentialPair,
    source: DocumentSource,
    tenant_id: str,
) -> int:
    """Extract hierarchy nodes for a connector-credential pair.

    Instantiates the connector and, when it implements HierarchyConnector,
    streams nodes from load_hierarchy(), persisting them to the database and
    mirroring them into Redis in batches of HIERARCHY_NODE_BATCH_SIZE.

    Returns the total number of hierarchy nodes processed (0 when the
    connector does not support hierarchy fetching).
    """
    conn = cc_pair.connector
    runnable = instantiate_connector(
        db_session=db_session,
        source=source,
        input_type=conn.input_type,
        connector_specific_config=conn.connector_specific_config,
        credential=cc_pair.credential,
    )

    # Connectors without hierarchy support are silently skipped.
    if not isinstance(runnable, HierarchyConnector):
        task_logger.debug(
            f"Connector {source} does not implement HierarchyConnector, skipping"
        )
        return 0

    # Incremental window: from the last successful hierarchy fetch
    # (epoch 0 on the very first run) up to "now".
    previous_fetch = cc_pair.last_time_hierarchy_fetch
    start_time = previous_fetch.timestamp() if previous_fetch else 0
    end_time = datetime.now(timezone.utc).timestamp()

    redis_client = get_redis_client(tenant_id=tenant_id)
    pending: list[PydanticHierarchyNode] = []
    total = 0

    def _flush() -> int:
        """Persist pending nodes to Postgres, then cache them in Redis."""
        if not pending:
            return 0
        persisted = upsert_hierarchy_nodes_batch(
            db_session=db_session,
            nodes=pending,
            source=source,
            commit=True,
        )
        # The Redis cache enables fast ancestor resolution later on.
        cache_hierarchy_nodes_batch(
            redis_client=redis_client,
            source=source,
            entries=[
                HierarchyNodeCacheEntry.from_db_model(node) for node in persisted
            ],
        )
        flushed = len(pending)
        pending.clear()
        return flushed

    for node in runnable.load_hierarchy(start=start_time, end=end_time):
        pending.append(node)
        if len(pending) >= HIERARCHY_NODE_BATCH_SIZE:
            total += _flush()

    # Flush whatever remains after the generator is exhausted.
    total += _flush()
    return total
@shared_task(
name=OnyxCeleryTask.CONNECTOR_HIERARCHY_FETCHING_TASK,
soft_time_limit=3600, # 1 hour soft limit
@@ -253,15 +344,17 @@ def connector_hierarchy_fetching_task(
)
return
# TODO: Implement the actual hierarchy fetching logic
# This will involve:
# 1. Instantiating the connector
# 2. Calling a hierarchy-specific method on the connector
# 3. Upserting the hierarchy nodes to the database
source = cc_pair.connector.source
total_nodes = _run_hierarchy_extraction(
db_session=db_session,
cc_pair=cc_pair,
source=source,
tenant_id=tenant_id,
)
task_logger.info(
f"connector_hierarchy_fetching_task: "
f"Hierarchy fetching not yet implemented for cc_pair={cc_pair_id}"
f"Extracted {total_nodes} hierarchy nodes for cc_pair={cc_pair_id}"
)
# Update the last fetch time to prevent re-running until next interval

View File

@@ -1,4 +1,5 @@
import copy
from collections.abc import Generator
from datetime import datetime
from datetime import timedelta
from datetime import timezone
@@ -50,6 +51,7 @@ from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import ImageSection
from onyx.connectors.models import SlimDocument
from onyx.connectors.models import TextSection
from onyx.db.enums import HierarchyNodeType
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
@@ -63,6 +65,7 @@ _PAGE_EXPANSION_FIELDS = [
"space",
"metadata.labels",
"history.lastUpdated",
"ancestors", # For hierarchy node tracking
]
_ATTACHMENT_EXPANSION_FIELDS = [
"version",
@@ -134,6 +137,9 @@ class ConfluenceConnector(
self._fetched_titles: set[str] = set()
self.allow_images = False
# Track hierarchy nodes we've already yielded to avoid duplicates
self.seen_hierarchy_node_raw_ids: set[str] = set()
# Remove trailing slash from wiki_base if present
self.wiki_base = wiki_base.rstrip("/")
"""
@@ -184,6 +190,139 @@ class ConfluenceConnector(
logger.info(f"Setting allow_images to {value}.")
self.allow_images = value
def _yield_space_hierarchy_nodes(
    self,
) -> Generator[HierarchyNode, None, None]:
    """Emit one SPACE hierarchy node per indexed space, skipping any
    space already recorded in seen_hierarchy_node_raw_ids."""
    # Restrict to the configured space when one is set; otherwise all spaces.
    requested_keys = [self.space] if self.space else None
    for space in self.confluence_client.retrieve_confluence_spaces(
        space_keys=requested_keys,
        limit=50,
    ):
        key = space.get("key")
        if not key or key in self.seen_hierarchy_node_raw_ids:
            continue
        self.seen_hierarchy_node_raw_ids.add(key)
        yield HierarchyNode(
            raw_node_id=key,
            # No raw parent: spaces hang directly off the SOURCE root.
            raw_parent_id=None,
            display_name=space.get("name", key),
            link=f"{self.wiki_base}/spaces/{key}",
            node_type=HierarchyNodeType.SPACE,
        )
def _yield_ancestor_hierarchy_nodes(
    self,
    page: dict[str, Any],
) -> Generator[HierarchyNode, None, None]:
    """Emit hierarchy nodes for every not-yet-seen ancestor of *page*.

    Any page that appears as an ancestor has at least one child (the page
    currently being processed), so it is a hierarchy node by definition.
    Emitting the space first and the ancestors root-to-leaf guarantees
    parents are always yielded before their children.
    """
    ancestors = page.get("ancestors", [])
    space = page.get("space", {})
    space_key = space.get("key")

    # Make sure the containing space node exists before any page nodes.
    if space_key and space_key not in self.seen_hierarchy_node_raw_ids:
        self.seen_hierarchy_node_raw_ids.add(space_key)
        yield HierarchyNode(
            raw_node_id=space_key,
            raw_parent_id=None,  # spaces hang directly off the SOURCE root
            display_name=space.get("name", space_key),
            link=f"{self.wiki_base}/spaces/{space_key}",
            node_type=HierarchyNodeType.SPACE,
        )

    # Ancestors arrive ordered from the root down to the immediate parent.
    for idx, ancestor in enumerate(ancestors):
        ancestor_id = str(ancestor.get("id"))
        if ancestor_id in self.seen_hierarchy_node_raw_ids:
            continue
        self.seen_hierarchy_node_raw_ids.add(ancestor_id)

        # The root ancestor's parent is the space; every other ancestor's
        # parent is the entry just before it in the list.
        parent_raw_id = (
            space_key if idx == 0 else str(ancestors[idx - 1].get("id"))
        )

        link = None
        if "_links" in ancestor and "webui" in ancestor["_links"]:
            link = build_confluence_document_id(
                self.wiki_base, ancestor["_links"]["webui"], self.is_cloud
            )

        yield HierarchyNode(
            raw_node_id=ancestor_id,
            raw_parent_id=parent_raw_id,
            display_name=ancestor.get("title", f"Page {ancestor_id}"),
            link=link,
            node_type=HierarchyNodeType.PAGE,
        )
def _get_parent_hierarchy_raw_id(self, page: dict[str, Any]) -> str | None:
"""Get the raw hierarchy node ID of this page's parent.
Returns:
- Parent page ID if page has a parent page (last item in ancestors)
- Space key if page is at top level of space
- None if we can't determine
"""
ancestors = page.get("ancestors", [])
if ancestors:
# Last ancestor is the immediate parent page
return str(ancestors[-1].get("id"))
# Top-level page - parent is the space
return page.get("space", {}).get("key")
def _maybe_yield_page_hierarchy_node(
    self, page: dict[str, Any]
) -> HierarchyNode | None:
    """Return a hierarchy node for *page*, or None if one was already emitted.

    Used when a page has attachments: attachments are the page's children
    in the hierarchy, so the page itself must exist as a hierarchy node.
    """
    page_id = _get_page_id(page)
    if page_id in self.seen_hierarchy_node_raw_ids:
        return None
    self.seen_hierarchy_node_raw_ids.add(page_id)

    # A webui link, when present, doubles as the page's document id below.
    page_link = None
    if "_links" in page and "webui" in page["_links"]:
        page_link = build_confluence_document_id(
            self.wiki_base, page["_links"]["webui"], self.is_cloud
        )

    return HierarchyNode(
        raw_node_id=page_id,
        raw_parent_id=self._get_parent_hierarchy_raw_id(page),
        display_name=page.get("title", f"Page {page_id}"),
        link=page_link,
        node_type=HierarchyNodeType.PAGE,
        # The page is also an indexed document; reuse the link as its id.
        document_id=page_link,
    )
@property
def confluence_client(self) -> OnyxConfluence:
if self._confluence_client is None:
@@ -355,6 +494,9 @@ class ConfluenceConnector(
BasicExpertInfo(display_name=display_name, email=email)
)
# Determine parent hierarchy node
parent_hierarchy_raw_node_id = self._get_parent_hierarchy_raw_id(page)
# Create the document
return Document(
id=page_url,
@@ -364,6 +506,7 @@ class ConfluenceConnector(
metadata=metadata,
doc_updated_at=datetime_from_string(page["version"]["when"]),
primary_owners=primary_owners if primary_owners else None,
parent_hierarchy_raw_node_id=parent_hierarchy_raw_node_id,
)
except Exception as e:
logger.error(f"Error converting page {page.get('id', 'unknown')}: {e}")
@@ -388,6 +531,9 @@ class ConfluenceConnector(
Inline attachments are added directly to the document as text or image sections by
this function. The returned documents/connectorfailures are for non-inline attachments
and those at the end of the page.
If there are valid attachments, the page itself is yielded as a hierarchy node
(since attachments are children of the page in the hierarchy).
"""
attachment_query = self._construct_attachment_query(
_get_page_id(page), start, end
@@ -395,6 +541,7 @@ class ConfluenceConnector(
attachment_failures: list[ConnectorFailure] = []
attachment_docs: list[Document | HierarchyNode] = []
page_url = ""
page_hierarchy_node_yielded = False
try:
for attachment in self.confluence_client.paginated_cql_retrieval(
@@ -488,6 +635,9 @@ class ConfluenceConnector(
BasicExpertInfo(display_name=display_name, email=email)
]
# Attachments have their parent page as the hierarchy parent
attachment_parent_hierarchy_raw_id = _get_page_id(page)
attachment_doc = Document(
id=attachment_id,
sections=sections,
@@ -501,7 +651,19 @@ class ConfluenceConnector(
else None
),
primary_owners=primary_owners,
parent_hierarchy_raw_node_id=attachment_parent_hierarchy_raw_id,
)
# If this is the first valid attachment, yield the page as a
# hierarchy node (attachments are children of the page)
if not page_hierarchy_node_yielded:
page_hierarchy_node = self._maybe_yield_page_hierarchy_node(
page
)
if page_hierarchy_node:
attachment_docs.append(page_hierarchy_node)
page_hierarchy_node_yielded = True
attachment_docs.append(attachment_doc)
except Exception as e:
logger.error(
@@ -569,7 +731,8 @@ class ConfluenceConnector(
end: SecondsSinceUnixEpoch | None = None,
) -> CheckpointOutput[ConfluenceCheckpoint]:
"""
Yields batches of Documents. For each page:
Yields batches of Documents and HierarchyNodes. For each page:
- Yield hierarchy nodes for spaces and ancestor pages (parent-before-child ordering)
- Create a Document with 1 Section for the page text/comments
- Then fetch attachments. For each attachment:
- Attempt to convert it with convert_attachment_to_content(...)
@@ -577,6 +740,10 @@ class ConfluenceConnector(
"""
checkpoint = copy.deepcopy(checkpoint)
# Yield space hierarchy nodes FIRST (only once per connector run)
if not checkpoint.next_page_url:
yield from self._yield_space_hierarchy_nodes()
# use "start" when last_updated is 0 or for confluence server
start_ts = start
page_query_url = checkpoint.next_page_url or self._build_page_retrieval_url(
@@ -593,6 +760,9 @@ class ConfluenceConnector(
limit=self.batch_size,
next_page_callback=store_next_page_url,
):
# Yield hierarchy nodes for all ancestors (parent-before-child ordering)
yield from self._yield_ancestor_hierarchy_nodes(page)
# Build doc from page
doc_or_failure = self._convert_page_to_document(page)

View File

@@ -245,6 +245,8 @@ CheckpointOutput: TypeAlias = Generator[
Document | HierarchyNode | ConnectorFailure, None, CT
]
HierarchyOutput: TypeAlias = Generator[HierarchyNode, None, None]
class CheckpointedConnector(BaseConnector[CT]):
@abc.abstractmethod
@@ -294,3 +296,13 @@ class CheckpointedConnectorWithPermSync(CheckpointedConnector[CT]):
checkpoint: CT,
) -> CheckpointOutput[CT]:
raise NotImplementedError
class HierarchyConnector(BaseConnector):
    """Interface for connectors that can expose their source's structural
    hierarchy (e.g. spaces/folders/pages) as a stream of HierarchyNode
    objects, independent of regular document loading."""

    @abc.abstractmethod
    def load_hierarchy(
        self,
        start: SecondsSinceUnixEpoch,  # may be unused if the connector must load the full hierarchy each time
        end: SecondsSinceUnixEpoch,
    ) -> HierarchyOutput:
        """Yield HierarchyNode objects for the [start, end] time window.

        Implementations stream nodes; NOTE(review): parent-before-child
        ordering appears to be expected by consumers — confirm with callers.
        """
        raise NotImplementedError