Mirror of https://github.com/onyx-dot-app/onyx.git, synced 2026-02-16 23:35:46 +00:00.
Compare commits
2 Commits
v2.11.0-cl
...
feat/file-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
462537e2ee | ||
|
|
e3a9b2a021 |
@@ -24,10 +24,14 @@ from sqlalchemy.orm import Session
|
||||
from onyx.background.celery.apps.app_base import task_logger
|
||||
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
|
||||
from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.configs.constants import OnyxCeleryPriority
|
||||
from onyx.configs.constants import OnyxCeleryQueues
|
||||
from onyx.configs.constants import OnyxCeleryTask
|
||||
from onyx.configs.constants import OnyxRedisLocks
|
||||
from onyx.connectors.factory import instantiate_connector
|
||||
from onyx.connectors.interfaces import HierarchyConnector
|
||||
from onyx.connectors.models import HierarchyNode as PydanticHierarchyNode
|
||||
from onyx.db.connector import mark_cc_pair_as_hierarchy_fetched
|
||||
from onyx.db.connector_credential_pair import (
|
||||
fetch_indexable_standard_connector_credential_pair_ids,
|
||||
@@ -35,7 +39,10 @@ from onyx.db.connector_credential_pair import (
|
||||
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
|
||||
from onyx.db.engine.sql_engine import get_session_with_current_tenant
|
||||
from onyx.db.enums import ConnectorCredentialPairStatus
|
||||
from onyx.db.hierarchy import upsert_hierarchy_nodes_batch
|
||||
from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.redis.redis_hierarchy import cache_hierarchy_nodes_batch
|
||||
from onyx.redis.redis_hierarchy import HierarchyNodeCacheEntry
|
||||
from onyx.redis.redis_pool import get_redis_client
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
@@ -211,6 +218,90 @@ def check_for_hierarchy_fetching(self: Task, *, tenant_id: str) -> int | None:
|
||||
return tasks_created
|
||||
|
||||
|
||||
# Batch size for hierarchy node processing
HIERARCHY_NODE_BATCH_SIZE = 100


def _run_hierarchy_extraction(
    db_session: Session,
    cc_pair: ConnectorCredentialPair,
    source: DocumentSource,
    tenant_id: str,
) -> int:
    """Run the hierarchy extraction for a connector.

    Instantiates the connector and calls load_hierarchy() if the connector
    implements HierarchyConnector.

    Returns the total number of hierarchy nodes extracted.
    """
    # Instantiate the connector using its configured input type
    runnable_connector = instantiate_connector(
        db_session=db_session,
        source=source,
        input_type=cc_pair.connector.input_type,
        connector_specific_config=cc_pair.connector.connector_specific_config,
        credential=cc_pair.credential,
    )

    # Connectors that cannot enumerate a hierarchy are simply skipped.
    if not isinstance(runnable_connector, HierarchyConnector):
        task_logger.debug(
            f"Connector {source} does not implement HierarchyConnector, skipping"
        )
        return 0

    # Time window: from the last hierarchy fetch (epoch 0 if never) up to now.
    last_fetch = cc_pair.last_time_hierarchy_fetch
    window_start = last_fetch.timestamp() if last_fetch else 0
    window_end = datetime.now(timezone.utc).timestamp()

    redis_client = get_redis_client(tenant_id=tenant_id)
    pending: list[PydanticHierarchyNode] = []
    extracted_count = 0

    def _flush_pending() -> int:
        """Upsert accumulated nodes to the DB and mirror them into Redis."""
        if not pending:
            return 0

        persisted = upsert_hierarchy_nodes_batch(
            db_session=db_session,
            nodes=pending,
            source=source,
            commit=True,
        )

        # Cache in Redis for fast ancestor resolution
        cache_hierarchy_nodes_batch(
            redis_client=redis_client,
            source=source,
            entries=[
                HierarchyNodeCacheEntry.from_db_model(node) for node in persisted
            ],
        )

        flushed = len(pending)
        pending.clear()
        return flushed

    # Stream hierarchy nodes from the connector, persisting in fixed-size batches.
    for node in runnable_connector.load_hierarchy(start=window_start, end=window_end):
        pending.append(node)
        if len(pending) >= HIERARCHY_NODE_BATCH_SIZE:
            extracted_count += _flush_pending()

    # Persist the final partial batch, if any.
    extracted_count += _flush_pending()

    return extracted_count
|
||||
|
||||
|
||||
@shared_task(
|
||||
name=OnyxCeleryTask.CONNECTOR_HIERARCHY_FETCHING_TASK,
|
||||
soft_time_limit=3600, # 1 hour soft limit
|
||||
@@ -253,15 +344,17 @@ def connector_hierarchy_fetching_task(
|
||||
)
|
||||
return
|
||||
|
||||
# TODO: Implement the actual hierarchy fetching logic
|
||||
# This will involve:
|
||||
# 1. Instantiating the connector
|
||||
# 2. Calling a hierarchy-specific method on the connector
|
||||
# 3. Upserting the hierarchy nodes to the database
|
||||
source = cc_pair.connector.source
|
||||
total_nodes = _run_hierarchy_extraction(
|
||||
db_session=db_session,
|
||||
cc_pair=cc_pair,
|
||||
source=source,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
|
||||
task_logger.info(
|
||||
f"connector_hierarchy_fetching_task: "
|
||||
f"Hierarchy fetching not yet implemented for cc_pair={cc_pair_id}"
|
||||
f"Extracted {total_nodes} hierarchy nodes for cc_pair={cc_pair_id}"
|
||||
)
|
||||
|
||||
# Update the last fetch time to prevent re-running until next interval
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import copy
|
||||
from collections.abc import Generator
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from datetime import timezone
|
||||
@@ -50,6 +51,7 @@ from onyx.connectors.models import HierarchyNode
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.db.enums import HierarchyNodeType
|
||||
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
@@ -63,6 +65,7 @@ _PAGE_EXPANSION_FIELDS = [
|
||||
"space",
|
||||
"metadata.labels",
|
||||
"history.lastUpdated",
|
||||
"ancestors", # For hierarchy node tracking
|
||||
]
|
||||
_ATTACHMENT_EXPANSION_FIELDS = [
|
||||
"version",
|
||||
@@ -134,6 +137,9 @@ class ConfluenceConnector(
|
||||
self._fetched_titles: set[str] = set()
|
||||
self.allow_images = False
|
||||
|
||||
# Track hierarchy nodes we've already yielded to avoid duplicates
|
||||
self.seen_hierarchy_node_raw_ids: set[str] = set()
|
||||
|
||||
# Remove trailing slash from wiki_base if present
|
||||
self.wiki_base = wiki_base.rstrip("/")
|
||||
"""
|
||||
@@ -184,6 +190,139 @@ class ConfluenceConnector(
|
||||
logger.info(f"Setting allow_images to {value}.")
|
||||
self.allow_images = value
|
||||
|
||||
def _yield_space_hierarchy_nodes(
|
||||
self,
|
||||
) -> Generator[HierarchyNode, None, None]:
|
||||
"""Yield hierarchy nodes for all spaces we're indexing."""
|
||||
space_keys = [self.space] if self.space else None
|
||||
|
||||
for space in self.confluence_client.retrieve_confluence_spaces(
|
||||
space_keys=space_keys,
|
||||
limit=50,
|
||||
):
|
||||
space_key = space.get("key")
|
||||
if not space_key or space_key in self.seen_hierarchy_node_raw_ids:
|
||||
continue
|
||||
|
||||
self.seen_hierarchy_node_raw_ids.add(space_key)
|
||||
|
||||
# Build space link
|
||||
space_link = f"{self.wiki_base}/spaces/{space_key}"
|
||||
|
||||
yield HierarchyNode(
|
||||
raw_node_id=space_key,
|
||||
raw_parent_id=None, # Parent is SOURCE
|
||||
display_name=space.get("name", space_key),
|
||||
link=space_link,
|
||||
node_type=HierarchyNodeType.SPACE,
|
||||
)
|
||||
|
||||
def _yield_ancestor_hierarchy_nodes(
|
||||
self,
|
||||
page: dict[str, Any],
|
||||
) -> Generator[HierarchyNode, None, None]:
|
||||
"""Yield hierarchy nodes for all unseen ancestors of this page.
|
||||
|
||||
Any page that appears as an ancestor of another page IS a hierarchy node
|
||||
(it has at least one child - the page we're currently processing).
|
||||
|
||||
This ensures parent nodes are always yielded before child documents.
|
||||
"""
|
||||
ancestors = page.get("ancestors", [])
|
||||
space_key = page.get("space", {}).get("key")
|
||||
|
||||
# Ensure space is yielded first (if not already)
|
||||
if space_key and space_key not in self.seen_hierarchy_node_raw_ids:
|
||||
self.seen_hierarchy_node_raw_ids.add(space_key)
|
||||
space = page.get("space", {})
|
||||
yield HierarchyNode(
|
||||
raw_node_id=space_key,
|
||||
raw_parent_id=None, # Parent is SOURCE
|
||||
display_name=space.get("name", space_key),
|
||||
link=f"{self.wiki_base}/spaces/{space_key}",
|
||||
node_type=HierarchyNodeType.SPACE,
|
||||
)
|
||||
|
||||
# Walk through ancestors (root to immediate parent)
|
||||
for i, ancestor in enumerate(ancestors):
|
||||
ancestor_id = str(ancestor.get("id"))
|
||||
if ancestor_id in self.seen_hierarchy_node_raw_ids:
|
||||
continue
|
||||
|
||||
self.seen_hierarchy_node_raw_ids.add(ancestor_id)
|
||||
|
||||
# Determine parent of this ancestor
|
||||
if i == 0:
|
||||
# First ancestor - parent is the space
|
||||
parent_raw_id = space_key
|
||||
else:
|
||||
# Parent is the previous ancestor
|
||||
parent_raw_id = str(ancestors[i - 1].get("id"))
|
||||
|
||||
# Build link from ancestor's _links
|
||||
ancestor_link = None
|
||||
if "_links" in ancestor and "webui" in ancestor["_links"]:
|
||||
ancestor_link = build_confluence_document_id(
|
||||
self.wiki_base, ancestor["_links"]["webui"], self.is_cloud
|
||||
)
|
||||
|
||||
yield HierarchyNode(
|
||||
raw_node_id=ancestor_id,
|
||||
raw_parent_id=parent_raw_id,
|
||||
display_name=ancestor.get("title", f"Page {ancestor_id}"),
|
||||
link=ancestor_link,
|
||||
node_type=HierarchyNodeType.PAGE,
|
||||
)
|
||||
|
||||
def _get_parent_hierarchy_raw_id(self, page: dict[str, Any]) -> str | None:
|
||||
"""Get the raw hierarchy node ID of this page's parent.
|
||||
|
||||
Returns:
|
||||
- Parent page ID if page has a parent page (last item in ancestors)
|
||||
- Space key if page is at top level of space
|
||||
- None if we can't determine
|
||||
"""
|
||||
ancestors = page.get("ancestors", [])
|
||||
if ancestors:
|
||||
# Last ancestor is the immediate parent page
|
||||
return str(ancestors[-1].get("id"))
|
||||
|
||||
# Top-level page - parent is the space
|
||||
return page.get("space", {}).get("key")
|
||||
|
||||
def _maybe_yield_page_hierarchy_node(
|
||||
self, page: dict[str, Any]
|
||||
) -> HierarchyNode | None:
|
||||
"""Yield a hierarchy node for this page if not already yielded.
|
||||
|
||||
Used when a page has attachments - attachments are children of the page
|
||||
in the hierarchy, so the page must be a hierarchy node.
|
||||
"""
|
||||
page_id = _get_page_id(page)
|
||||
if page_id in self.seen_hierarchy_node_raw_ids:
|
||||
return None
|
||||
|
||||
self.seen_hierarchy_node_raw_ids.add(page_id)
|
||||
|
||||
# Build page link
|
||||
page_link = None
|
||||
if "_links" in page and "webui" in page["_links"]:
|
||||
page_link = build_confluence_document_id(
|
||||
self.wiki_base, page["_links"]["webui"], self.is_cloud
|
||||
)
|
||||
|
||||
# Get parent hierarchy ID
|
||||
parent_raw_id = self._get_parent_hierarchy_raw_id(page)
|
||||
|
||||
return HierarchyNode(
|
||||
raw_node_id=page_id,
|
||||
raw_parent_id=parent_raw_id,
|
||||
display_name=page.get("title", f"Page {page_id}"),
|
||||
link=page_link,
|
||||
node_type=HierarchyNodeType.PAGE,
|
||||
document_id=page_link, # Page is also a document
|
||||
)
|
||||
|
||||
@property
|
||||
def confluence_client(self) -> OnyxConfluence:
|
||||
if self._confluence_client is None:
|
||||
@@ -355,6 +494,9 @@ class ConfluenceConnector(
|
||||
BasicExpertInfo(display_name=display_name, email=email)
|
||||
)
|
||||
|
||||
# Determine parent hierarchy node
|
||||
parent_hierarchy_raw_node_id = self._get_parent_hierarchy_raw_id(page)
|
||||
|
||||
# Create the document
|
||||
return Document(
|
||||
id=page_url,
|
||||
@@ -364,6 +506,7 @@ class ConfluenceConnector(
|
||||
metadata=metadata,
|
||||
doc_updated_at=datetime_from_string(page["version"]["when"]),
|
||||
primary_owners=primary_owners if primary_owners else None,
|
||||
parent_hierarchy_raw_node_id=parent_hierarchy_raw_node_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error converting page {page.get('id', 'unknown')}: {e}")
|
||||
@@ -388,6 +531,9 @@ class ConfluenceConnector(
|
||||
Inline attachments are added directly to the document as text or image sections by
|
||||
this function. The returned documents/connectorfailures are for non-inline attachments
|
||||
and those at the end of the page.
|
||||
|
||||
If there are valid attachments, the page itself is yielded as a hierarchy node
|
||||
(since attachments are children of the page in the hierarchy).
|
||||
"""
|
||||
attachment_query = self._construct_attachment_query(
|
||||
_get_page_id(page), start, end
|
||||
@@ -395,6 +541,7 @@ class ConfluenceConnector(
|
||||
attachment_failures: list[ConnectorFailure] = []
|
||||
attachment_docs: list[Document | HierarchyNode] = []
|
||||
page_url = ""
|
||||
page_hierarchy_node_yielded = False
|
||||
|
||||
try:
|
||||
for attachment in self.confluence_client.paginated_cql_retrieval(
|
||||
@@ -488,6 +635,9 @@ class ConfluenceConnector(
|
||||
BasicExpertInfo(display_name=display_name, email=email)
|
||||
]
|
||||
|
||||
# Attachments have their parent page as the hierarchy parent
|
||||
attachment_parent_hierarchy_raw_id = _get_page_id(page)
|
||||
|
||||
attachment_doc = Document(
|
||||
id=attachment_id,
|
||||
sections=sections,
|
||||
@@ -501,7 +651,19 @@ class ConfluenceConnector(
|
||||
else None
|
||||
),
|
||||
primary_owners=primary_owners,
|
||||
parent_hierarchy_raw_node_id=attachment_parent_hierarchy_raw_id,
|
||||
)
|
||||
|
||||
# If this is the first valid attachment, yield the page as a
|
||||
# hierarchy node (attachments are children of the page)
|
||||
if not page_hierarchy_node_yielded:
|
||||
page_hierarchy_node = self._maybe_yield_page_hierarchy_node(
|
||||
page
|
||||
)
|
||||
if page_hierarchy_node:
|
||||
attachment_docs.append(page_hierarchy_node)
|
||||
page_hierarchy_node_yielded = True
|
||||
|
||||
attachment_docs.append(attachment_doc)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
@@ -569,7 +731,8 @@ class ConfluenceConnector(
|
||||
end: SecondsSinceUnixEpoch | None = None,
|
||||
) -> CheckpointOutput[ConfluenceCheckpoint]:
|
||||
"""
|
||||
Yields batches of Documents. For each page:
|
||||
Yields batches of Documents and HierarchyNodes. For each page:
|
||||
- Yield hierarchy nodes for spaces and ancestor pages (parent-before-child ordering)
|
||||
- Create a Document with 1 Section for the page text/comments
|
||||
- Then fetch attachments. For each attachment:
|
||||
- Attempt to convert it with convert_attachment_to_content(...)
|
||||
@@ -577,6 +740,10 @@ class ConfluenceConnector(
|
||||
"""
|
||||
checkpoint = copy.deepcopy(checkpoint)
|
||||
|
||||
# Yield space hierarchy nodes FIRST (only once per connector run)
|
||||
if not checkpoint.next_page_url:
|
||||
yield from self._yield_space_hierarchy_nodes()
|
||||
|
||||
# use "start" when last_updated is 0 or for confluence server
|
||||
start_ts = start
|
||||
page_query_url = checkpoint.next_page_url or self._build_page_retrieval_url(
|
||||
@@ -593,6 +760,9 @@ class ConfluenceConnector(
|
||||
limit=self.batch_size,
|
||||
next_page_callback=store_next_page_url,
|
||||
):
|
||||
# Yield hierarchy nodes for all ancestors (parent-before-child ordering)
|
||||
yield from self._yield_ancestor_hierarchy_nodes(page)
|
||||
|
||||
# Build doc from page
|
||||
doc_or_failure = self._convert_page_to_document(page)
|
||||
|
||||
|
||||
@@ -245,6 +245,8 @@ CheckpointOutput: TypeAlias = Generator[
|
||||
Document | HierarchyNode | ConnectorFailure, None, CT
|
||||
]
|
||||
|
||||
HierarchyOutput: TypeAlias = Generator[HierarchyNode, None, None]
|
||||
|
||||
|
||||
class CheckpointedConnector(BaseConnector[CT]):
|
||||
@abc.abstractmethod
|
||||
@@ -294,3 +296,13 @@ class CheckpointedConnectorWithPermSync(CheckpointedConnector[CT]):
|
||||
checkpoint: CT,
|
||||
) -> CheckpointOutput[CT]:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class HierarchyConnector(BaseConnector):
    """Abstract interface for connectors that can enumerate their source's
    hierarchy as a stream of HierarchyNode objects."""

    @abc.abstractmethod
    def load_hierarchy(
        self,
        start: SecondsSinceUnixEpoch,  # may be unused if the connector must load the full hierarchy each time
        end: SecondsSinceUnixEpoch,
    ) -> HierarchyOutput:
        """Yield the connector's HierarchyNode objects.

        The [start, end] window (seconds since the Unix epoch) lets an
        implementation restrict output to recently-changed nodes; per the
        note on `start`, implementations may ignore it and emit the full
        hierarchy each time.
        """
        raise NotImplementedError
||||
|
||||
Reference in New Issue
Block a user