mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-04-08 08:22:42 +00:00
Compare commits
1 Commits
cli/v0.2.1
...
temp/pr-52
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3f977d3752 |
@@ -199,6 +199,7 @@ class DocumentSource(str, Enum):
|
||||
HIGHSPOT = "highspot"
|
||||
|
||||
IMAP = "imap"
|
||||
BITBUCKET = "bitbucket"
|
||||
|
||||
# Special case just for integration tests
|
||||
MOCK_CONNECTOR = "mock_connector"
|
||||
@@ -541,6 +542,7 @@ DocumentSourceDescription: dict[DocumentSource, str] = {
|
||||
DocumentSource.GITHUB: "github data (issues, PRs)",
|
||||
DocumentSource.GITBOOK: "gitbook data",
|
||||
DocumentSource.GITLAB: "gitlab data",
|
||||
DocumentSource.BITBUCKET: "bitbucket data",
|
||||
DocumentSource.GURU: "guru data",
|
||||
DocumentSource.BOOKSTACK: "bookstack data",
|
||||
DocumentSource.OUTLINE: "outline data",
|
||||
|
||||
0
backend/onyx/connectors/bitbucket/__init__.py
Normal file
0
backend/onyx/connectors/bitbucket/__init__.py
Normal file
345
backend/onyx/connectors/bitbucket/connector.py
Normal file
345
backend/onyx/connectors/bitbucket/connector.py
Normal file
@@ -0,0 +1,345 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Iterator
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from typing import Any
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from typing_extensions import override
|
||||
|
||||
from onyx.configs.app_configs import INDEX_BATCH_SIZE
|
||||
from onyx.configs.app_configs import REQUEST_TIMEOUT_SECONDS
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.connectors.bitbucket.utils import build_auth_client
|
||||
from onyx.connectors.bitbucket.utils import list_repositories
|
||||
from onyx.connectors.bitbucket.utils import map_pr_to_document
|
||||
from onyx.connectors.bitbucket.utils import paginate
|
||||
from onyx.connectors.bitbucket.utils import PR_LIST_RESPONSE_FIELDS
|
||||
from onyx.connectors.bitbucket.utils import SLIM_PR_LIST_RESPONSE_FIELDS
|
||||
from onyx.connectors.exceptions import CredentialExpiredError
|
||||
from onyx.connectors.exceptions import InsufficientPermissionsError
|
||||
from onyx.connectors.exceptions import UnexpectedValidationError
|
||||
from onyx.connectors.interfaces import CheckpointedConnector
|
||||
from onyx.connectors.interfaces import CheckpointOutput
|
||||
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
|
||||
from onyx.connectors.interfaces import SlimConnector
|
||||
from onyx.connectors.models import ConnectorCheckpoint
|
||||
from onyx.connectors.models import ConnectorFailure
|
||||
from onyx.connectors.models import ConnectorMissingCredentialError
|
||||
from onyx.connectors.models import DocumentFailure
|
||||
from onyx.connectors.models import SlimDocument
|
||||
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
|
||||
from onyx.utils.logger import setup_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import httpx
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
class BitbucketConnectorCheckpoint(ConnectorCheckpoint):
    """Serializable progress marker for resumable Bitbucket PR indexing.

    Attributes:
        repos_queue: Snapshot of repository slugs still to be worked through.
        current_repo_index: Position in ``repos_queue`` currently being indexed.
        next_url: Bitbucket "next" page URL used to resume pagination within
            the repository at ``current_repo_index``.
    """

    repos_queue: list[str] = []
    current_repo_index: int = 0
    next_url: str | None = None
||||
class BitbucketConnector(
    CheckpointedConnector[BitbucketConnectorCheckpoint],
    SlimConnector,
):
    """Connector for indexing Bitbucket Cloud pull requests.

    Args:
        workspace: Bitbucket workspace ID.
        repositories: Comma-separated list of repository slugs to index.
        projects: Comma-separated list of project keys to index all repositories within.
        batch_size: Max number of documents to yield per batch.
    """

    def __init__(
        self,
        workspace: str,
        repositories: str | None = None,
        projects: str | None = None,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        self.workspace = workspace
        # Comma-separated config strings -> cleaned lists (None when unset).
        self._repositories: list[str] | None = (
            [s.strip() for s in repositories.split(",") if s.strip()]
            if repositories
            else None
        )
        self._projects: list[str] | None = (
            [s.strip() for s in projects.split(",") if s.strip()] if projects else None
        )
        self.batch_size = batch_size
        # Credentials are injected later via load_credentials().
        self.email: str | None = None
        self.api_token: str | None = None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Load API token-based credentials.

        Expects a dict with keys: `bitbucket_email`, `bitbucket_api_token`.

        Raises:
            ConnectorMissingCredentialError: if either credential is missing/empty.
        """
        self.email = credentials.get("bitbucket_email")
        self.api_token = credentials.get("bitbucket_api_token")
        if not self.email or not self.api_token:
            raise ConnectorMissingCredentialError("Bitbucket")
        return None

    def _client(self) -> httpx.Client:
        """Build an authenticated HTTP client or raise if credentials missing."""
        if not self.email or not self.api_token:
            raise ConnectorMissingCredentialError("Bitbucket")
        return build_auth_client(self.email, self.api_token)

    def _iter_pull_requests_for_repo(
        self,
        client: httpx.Client,
        repo_slug: str,
        params: dict[str, Any] | None = None,
        start_url: str | None = None,
        on_page: Callable[[str | None], None] | None = None,
    ) -> Iterator[dict[str, Any]]:
        """Yield raw PR dicts for one repository, following pagination."""
        base = f"https://api.bitbucket.org/2.0/repositories/{self.workspace}/{repo_slug}/pullrequests"
        yield from paginate(
            client,
            base,
            params,
            start_url=start_url,
            on_page=on_page,
        )

    def _build_params(
        self,
        fields: str = PR_LIST_RESPONSE_FIELDS,
        start: SecondsSinceUnixEpoch | None = None,
        end: SecondsSinceUnixEpoch | None = None,
    ) -> dict[str, Any]:
        """Build Bitbucket fetch params.

        Always include OPEN, MERGED, and DECLINED PRs. If both ``start`` and
        ``end`` are provided, apply a single updated_on time window.
        """

        def _iso(ts: SecondsSinceUnixEpoch) -> str:
            return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()

        def _tc_epoch(
            lower_epoch: SecondsSinceUnixEpoch | None,
            upper_epoch: SecondsSinceUnixEpoch | None,
        ) -> str | None:
            # Only a fully-bounded window produces a filter clause.
            if lower_epoch is not None and upper_epoch is not None:
                lower_iso = _iso(lower_epoch)
                upper_iso = _iso(upper_epoch)
                return f'(updated_on >= "{lower_iso}" AND updated_on <= "{upper_iso}")'
            return None

        params: dict[str, Any] = {"fields": fields, "pagelen": 50}
        time_clause = _tc_epoch(start, end)
        q = '(state = "OPEN" OR state = "MERGED" OR state = "DECLINED")'
        if time_clause:
            q = f"{q} AND {time_clause}"
        params["q"] = q
        return params

    def _iter_target_repositories(self, client: httpx.Client) -> Iterator[str]:
        """Yield repository slugs based on configuration.

        Priority:
        - repositories list
        - projects list (list repos by project key)
        - workspace (all repos)
        """
        if self._repositories:
            for slug in self._repositories:
                yield slug
            return
        if self._projects:
            for project_key in self._projects:
                for repo in list_repositories(client, self.workspace, project_key):
                    slug_val = repo.get("slug")
                    if isinstance(slug_val, str) and slug_val:
                        yield slug_val
            return
        for repo in list_repositories(client, self.workspace, None):
            slug_val = repo.get("slug")
            if isinstance(slug_val, str) and slug_val:
                yield slug_val

    @override
    def load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: BitbucketConnectorCheckpoint,
    ) -> CheckpointOutput[BitbucketConnectorCheckpoint]:
        """Resumable PR ingestion across repos and pages within a time window.

        Processes one repository per invocation. Yields Documents (or
        ConnectorFailure for per-PR mapping failures) and returns an updated
        checkpoint that records repo position and next page URL.
        """
        new_checkpoint = copy.deepcopy(checkpoint)

        with self._client() as client:
            # Materialize target repositories once, on the first invocation.
            if not new_checkpoint.repos_queue:
                repos_list = list(self._iter_target_repositories(client))
                # De-duplicate and sort for deterministic, checkpoint-stable order.
                new_checkpoint.repos_queue = sorted(set(repos_list))
                new_checkpoint.current_repo_index = 0
                new_checkpoint.next_url = None

            repos = new_checkpoint.repos_queue
            if not repos or new_checkpoint.current_repo_index >= len(repos):
                new_checkpoint.has_more = False
                return new_checkpoint

            repo_slug = repos[new_checkpoint.current_repo_index]

            first_page_params = self._build_params(
                fields=PR_LIST_RESPONSE_FIELDS,
                start=start,
                end=end,
            )

            def _on_page(next_url: str | None) -> None:
                # Persist pagination position so a restart resumes mid-repo.
                new_checkpoint.next_url = next_url

            for pr in self._iter_pull_requests_for_repo(
                client,
                repo_slug,
                params=first_page_params,
                start_url=new_checkpoint.next_url,
                on_page=_on_page,
            ):
                try:
                    document = map_pr_to_document(pr, self.workspace, repo_slug)
                    yield document
                except Exception as e:
                    pr_id = pr.get("id")
                    pr_link = (
                        f"https://bitbucket.org/{self.workspace}/{repo_slug}/pull-requests/{pr_id}"
                        if pr_id is not None
                        else None
                    )
                    yield ConnectorFailure(
                        failed_document=DocumentFailure(
                            document_id=(
                                f"{DocumentSource.BITBUCKET.value}:{self.workspace}:{repo_slug}:pr:{pr_id}"
                                if pr_id is not None
                                else f"{DocumentSource.BITBUCKET.value}:{self.workspace}:{repo_slug}:pr:unknown"
                            ),
                            document_link=pr_link,
                        ),
                        failure_message=f"Failed to process Bitbucket PR: {e}",
                        exception=e,
                    )

            # Advance to next repository (if any) and set has_more accordingly
            new_checkpoint.current_repo_index += 1
            new_checkpoint.next_url = None
            new_checkpoint.has_more = new_checkpoint.current_repo_index < len(repos)

        return new_checkpoint

    @override
    def build_dummy_checkpoint(self) -> BitbucketConnectorCheckpoint:
        """Create an initial checkpoint with work remaining."""
        return BitbucketConnectorCheckpoint(has_more=True)

    @override
    def validate_checkpoint_json(
        self, checkpoint_json: str
    ) -> BitbucketConnectorCheckpoint:
        """Validate and deserialize a checkpoint instance from JSON."""
        return BitbucketConnectorCheckpoint.model_validate_json(checkpoint_json)

    def retrieve_all_slim_documents(
        self,
        start: SecondsSinceUnixEpoch | None = None,
        end: SecondsSinceUnixEpoch | None = None,
        callback: IndexingHeartbeatInterface | None = None,
    ) -> Iterator[list[SlimDocument]]:
        """Return only document IDs for all existing pull requests."""
        batch: list[SlimDocument] = []
        params = self._build_params(
            fields=SLIM_PR_LIST_RESPONSE_FIELDS,
            start=start,
            end=end,
        )
        with self._client() as client:
            for slug in self._iter_target_repositories(client):
                for pr in self._iter_pull_requests_for_repo(
                    client, slug, params=params
                ):
                    pr_id = pr["id"]
                    doc_id = f"{DocumentSource.BITBUCKET.value}:{self.workspace}:{slug}:pr:{pr_id}"
                    batch.append(SlimDocument(id=doc_id))
                    if len(batch) >= self.batch_size:
                        yield batch
                        batch = []
                    if callback:
                        if callback.should_stop():
                            # Note: this is not actually used for permission sync yet, just pruning
                            raise RuntimeError(
                                "bitbucket_pr_sync: Stop signal detected"
                            )
                        # BUGFIX: report exactly one newly processed document.
                        # Previously this passed len(batch), which re-counted the
                        # current partial batch every iteration (and reported 0
                        # immediately after a flush), corrupting progress totals.
                        callback.progress("bitbucket_pr_sync", 1)
        if batch:
            yield batch

    def validate_connector_settings(self) -> None:
        """Validate Bitbucket credentials and workspace access by probing a lightweight endpoint.

        Raises:
            CredentialExpiredError: on HTTP 401
            InsufficientPermissionsError: on HTTP 403
            UnexpectedValidationError: on any other failure
        """
        try:
            with self._client() as client:
                url = f"https://api.bitbucket.org/2.0/repositories/{self.workspace}"
                resp = client.get(
                    url,
                    params={"pagelen": 1, "fields": "pagelen"},
                    timeout=REQUEST_TIMEOUT_SECONDS,
                )
                if resp.status_code == 401:
                    raise CredentialExpiredError(
                        "Invalid or expired Bitbucket credentials (HTTP 401)."
                    )
                if resp.status_code == 403:
                    raise InsufficientPermissionsError(
                        "Insufficient permissions to access Bitbucket workspace (HTTP 403)."
                    )
                if resp.status_code < 200 or resp.status_code >= 300:
                    raise UnexpectedValidationError(
                        f"Unexpected Bitbucket error (status={resp.status_code})."
                    )
        # Targeted except clauses replace the original isinstance() re-raise dance.
        except (
            CredentialExpiredError,
            InsufficientPermissionsError,
            UnexpectedValidationError,
            ConnectorMissingCredentialError,
        ):
            raise
        except Exception as e:
            # Network or other unexpected errors; chain the cause for debugging.
            raise UnexpectedValidationError(
                f"Unexpected error while validating Bitbucket settings: {e}"
            ) from e
|
||||
294
backend/onyx/connectors/bitbucket/utils.py
Normal file
294
backend/onyx/connectors/bitbucket/utils.py
Normal file
@@ -0,0 +1,294 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Iterator
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from onyx.configs.app_configs import REQUEST_TIMEOUT_SECONDS
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import (
|
||||
rate_limit_builder,
|
||||
)
|
||||
from onyx.connectors.models import BasicExpertInfo
|
||||
from onyx.connectors.models import Document
|
||||
from onyx.connectors.models import ImageSection
|
||||
from onyx.connectors.models import TextSection
|
||||
from onyx.utils.logger import setup_logger
|
||||
from onyx.utils.retry_wrapper import retry_builder
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
# Fields requested from Bitbucket PR list endpoint to ensure rich PR data
PR_LIST_RESPONSE_FIELDS: str = (
    "next,page,pagelen,"
    "values.author,values.close_source_branch,values.closed_by,"
    "values.comment_count,values.created_on,values.description,"
    "values.destination,values.draft,values.id,values.links,"
    "values.merge_commit,values.participants,values.reason,"
    "values.rendered,values.reviewers,values.source,values.state,"
    "values.summary,values.task_count,values.title,values.type,"
    "values.updated_on"
)

# Minimal fields for slim retrieval (IDs only)
SLIM_PR_LIST_RESPONSE_FIELDS: str = "next,page,pagelen,values.id"


# Minimal fields for repository list calls
REPO_LIST_RESPONSE_FIELDS: str = (
    "next,page,pagelen,values.slug,values.full_name,values.project.key"
)
|
||||
|
||||
|
||||
class BitbucketRetriableError(Exception):
    """Transient Bitbucket failure (HTTP 429 or 5xx) — safe to retry."""


class BitbucketNonRetriableError(Exception):
    """Permanent Bitbucket client failure (4xx other than 429) — do not retry."""
|
||||
|
||||
|
||||
@retry_builder(
    tries=6,
    delay=1,
    backoff=2,
    max_delay=30,
    exceptions=(BitbucketRetriableError, httpx.RequestError),
)
@rate_limit_builder(max_calls=60, period=60)
def bitbucket_get(
    client: httpx.Client, url: str, params: dict[str, Any] | None = None
) -> httpx.Response:
    """Perform a GET against Bitbucket with retry and rate limiting.

    Retries on 429 and 5xx responses, and on transport errors. Honors
    `Retry-After` header for 429 when present by sleeping before retrying.

    Raises:
        BitbucketRetriableError: on 429 / 5xx (handled by the retry decorator).
        BitbucketNonRetriableError: on any other 4xx client error.
    """
    # httpx.RequestError (transport failure) propagates unchanged so the
    # retry decorator above can handle it.
    response = client.get(url, params=params, timeout=REQUEST_TIMEOUT_SECONDS)

    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        status = exc.response.status_code if exc.response is not None else None
        if status == 429:
            retry_after = (
                exc.response.headers.get("Retry-After") if exc.response else None
            )
            if retry_after is not None:
                try:
                    # Courtesy wait requested by the server before the retry fires.
                    time.sleep(int(retry_after))
                except (TypeError, ValueError):
                    pass
            raise BitbucketRetriableError("Bitbucket rate limit exceeded (429)") from exc
        if status is not None and 500 <= status < 600:
            raise BitbucketRetriableError(f"Bitbucket server error: {status}") from exc
        if status is not None and 400 <= status < 500:
            raise BitbucketNonRetriableError(f"Bitbucket client error: {status}") from exc
        # Unknown status, propagate
        raise

    return response
|
||||
|
||||
|
||||
def build_auth_client(email: str, api_token: str) -> httpx.Client:
    """Create an httpx client using Bitbucket Cloud basic auth (email + API token)."""
    basic_auth = (email, api_token)
    return httpx.Client(auth=basic_auth, http2=True)
|
||||
|
||||
|
||||
def paginate(
    client: httpx.Client,
    url: str,
    params: dict[str, Any] | None = None,
    start_url: str | None = None,
    on_page: Callable[[str | None], None] | None = None,
) -> Iterator[dict[str, Any]]:
    """Iterate over paginated Bitbucket API responses yielding individual values.

    Args:
        client: Authenticated HTTP client.
        url: Base collection URL (first page when start_url is None).
        params: Query params for the first page.
        start_url: If provided, start from this absolute URL (ignores params).
        on_page: Optional callback invoked after each page with the next page URL.
    """
    page_url: str | None = start_url or url
    # Query params apply only to the very first request: resume ("next") URLs
    # already embed every necessary parameter, so pass None from then on.
    pending_params = None if start_url else (dict(params) if params else None)
    while page_url:
        payload = bitbucket_get(client, page_url, params=pending_params).json()
        yield from payload.get("values", [])
        page_url = payload.get("next")
        if on_page is not None:
            on_page(page_url)
        pending_params = None
|
||||
|
||||
|
||||
def list_repositories(
    client: httpx.Client, workspace: str, project_key: str | None = None
) -> Iterator[dict[str, Any]]:
    """List repositories in a workspace, optionally filtered by project key."""
    query: dict[str, Any] = {
        "fields": REPO_LIST_RESPONSE_FIELDS,
        "pagelen": 100,
        "sort": "full_name",  # deterministic ordering across runs
    }
    if project_key:
        query["q"] = f'project.key="{project_key}"'
    workspace_url = f"https://api.bitbucket.org/2.0/repositories/{workspace}"
    yield from paginate(client, workspace_url, query)
|
||||
|
||||
|
||||
def map_pr_to_document(pr: dict[str, Any], workspace: str, repo_slug: str) -> Document:
    """Convert a Bitbucket pull request JSON payload into an Onyx Document.

    Builds a plain-text summary section plus a metadata dict keyed by PR
    attributes; the document ID encodes source, workspace, repo, and PR id.
    """
    pr_id = pr["id"]
    title = pr.get("title") or f"PR {pr_id}"
    description = pr.get("description") or ""
    state = pr.get("state")
    is_draft = pr.get("draft", False)
    author = pr.get("author", {})
    reviewers = pr.get("reviewers", [])
    participants = pr.get("participants", [])

    # Prefer the API-provided HTML link; fall back to the canonical URL shape.
    link = pr.get("links", {}).get("html", {}).get("href") or (
        f"https://bitbucket.org/{workspace}/{repo_slug}/pull-requests/{pr_id}"
    )

    created_on = pr.get("created_on")
    updated_on = pr.get("updated_on")
    if isinstance(updated_on, str):
        # Normalize a trailing "Z" so fromisoformat accepts it, then pin to UTC.
        updated_dt = datetime.fromisoformat(
            updated_on.replace("Z", "+00:00")
        ).astimezone(timezone.utc)
    else:
        updated_dt = None

    source_branch = pr.get("source", {}).get("branch", {}).get("name", "")
    destination_branch = pr.get("destination", {}).get("branch", {}).get("name", "")

    approved_by = [
        _get_user_name(participant.get("user", {}))
        for participant in participants
        if participant.get("approved")
    ]

    primary_owner = (
        BasicExpertInfo(display_name=_get_user_name(author)) if author else None
    )
    secondary_owners = [
        BasicExpertInfo(display_name=_get_user_name(reviewer)) for reviewer in reviewers
    ] or None
    reviewer_names = [_get_user_name(reviewer) for reviewer in reviewers]

    created_date = created_on.split("T")[0] if created_on else "N/A"
    updated_date = updated_on.split("T")[0] if updated_on else "N/A"

    # Concise, human-readable summary of the key PR facts.
    summary_lines = [
        "Pull Request Information:",
        f"- Pull Request ID: {pr_id}",
        f"- Title: {title}",
        f"- State: {state or 'N/A'} {'(Draft)' if is_draft else ''}",
    ]
    if state == "DECLINED":
        summary_lines.append(f"- Reason: {pr.get('reason', 'N/A')}")
    summary_lines.extend(
        [
            f"- Author: {_get_user_name(author) if author else 'N/A'}",
            f"- Reviewers: {', '.join(reviewer_names) if reviewer_names else 'N/A'}",
            f"- Branch: {source_branch} -> {destination_branch}",
            f"- Created: {created_date}",
            f"- Updated: {updated_date}",
        ]
    )
    content_text = "\n".join(summary_lines)
    if description:
        content_text += f"\n\nDescription:\n{description}"

    sections: list[TextSection | ImageSection] = [
        TextSection(link=link, text=content_text)
    ]

    metadata: dict[str, str | list[str]] = {
        "object_type": "PullRequest",
        "workspace": workspace,
        "repository": repo_slug,
        "pr_key": f"{workspace}/{repo_slug}#{pr_id}",
        "id": str(pr_id),
        "title": title,
        "state": state or "",
        "draft": str(bool(is_draft)),
        "link": link,
        "author": _get_user_name(author) if author else "",
        "reviewers": reviewer_names,
        "approved_by": approved_by,
        "comment_count": str(pr.get("comment_count", "")),
        "task_count": str(pr.get("task_count", "")),
        "created_on": created_on or "",
        "updated_on": updated_on or "",
        "source_branch": source_branch,
        "destination_branch": destination_branch,
        "closed_by": (
            _get_user_name(pr.get("closed_by", {})) if pr.get("closed_by") else ""
        ),
        "close_source_branch": str(bool(pr.get("close_source_branch", False))),
    }

    return Document(
        id=f"{DocumentSource.BITBUCKET.value}:{workspace}:{repo_slug}:pr:{pr_id}",
        sections=sections,
        source=DocumentSource.BITBUCKET,
        semantic_identifier=f"#{pr_id}: {title}",
        title=title,
        doc_updated_at=updated_dt,
        primary_owners=[primary_owner] if primary_owner else None,
        secondary_owners=secondary_owners,
        metadata=metadata,
    )
|
||||
|
||||
|
||||
def _get_user_name(user: dict[str, Any]) -> str:
|
||||
return user.get("display_name") or user.get("nickname") or "unknown"
|
||||
@@ -9,6 +9,7 @@ from onyx.configs.llm_configs import get_image_extraction_and_analysis_enabled
|
||||
from onyx.connectors.airtable.airtable_connector import AirtableConnector
|
||||
from onyx.connectors.asana.connector import AsanaConnector
|
||||
from onyx.connectors.axero.connector import AxeroConnector
|
||||
from onyx.connectors.bitbucket.connector import BitbucketConnector
|
||||
from onyx.connectors.blob.connector import BlobStorageConnector
|
||||
from onyx.connectors.bookstack.connector import BookstackConnector
|
||||
from onyx.connectors.clickup.connector import ClickupConnector
|
||||
@@ -125,6 +126,7 @@ def identify_connector_class(
|
||||
DocumentSource.AIRTABLE: AirtableConnector,
|
||||
DocumentSource.HIGHSPOT: HighspotConnector,
|
||||
DocumentSource.IMAP: ImapConnector,
|
||||
DocumentSource.BITBUCKET: BitbucketConnector,
|
||||
# just for integration tests
|
||||
DocumentSource.MOCK_CONNECTOR: MockConnector,
|
||||
}
|
||||
|
||||
5
backend/tests/daily/connectors/bitbucket/conftest.py
Normal file
5
backend/tests/daily/connectors/bitbucket/conftest.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from tests.load_env_vars import load_env_vars
|
||||
|
||||
|
||||
# Load environment variables at the module level
|
||||
load_env_vars()
|
||||
@@ -0,0 +1,70 @@
|
||||
import os
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.connectors.bitbucket.connector import BitbucketConnector
|
||||
from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector
|
||||
|
||||
|
||||
@pytest.fixture
def bitbucket_connector_for_checkpoint() -> BitbucketConnector:
    """Daily fixture for Bitbucket checkpointed indexing.

    Env vars:
    - BITBUCKET_EMAIL: Bitbucket account email
    - BITBUCKET_API_TOKEN: Bitbucket app password/token
    - BITBUCKET_WORKSPACE: workspace id
    - BITBUCKET_REPOSITORIES: comma-separated slugs
    - BITBUCKET_PROJECTS: optional comma-separated project keys
    """
    # Workspace is mandatory; repo/project scoping is optional.
    connector = BitbucketConnector(
        workspace=os.environ["BITBUCKET_WORKSPACE"],
        repositories=os.environ.get("BITBUCKET_REPOSITORIES"),
        projects=os.environ.get("BITBUCKET_PROJECTS"),
        batch_size=10,
    )

    email = os.environ.get("BITBUCKET_EMAIL")
    token = os.environ.get("BITBUCKET_API_TOKEN")
    if not email or not token:
        pytest.skip("BITBUCKET_EMAIL or BITBUCKET_API_TOKEN not set in environment")

    connector.load_credentials(
        {"bitbucket_email": email, "bitbucket_api_token": token}
    )
    return connector
|
||||
|
||||
|
||||
def test_bitbucket_checkpointed_load(
    bitbucket_connector_for_checkpoint: BitbucketConnector,
) -> None:
    """Smoke-test checkpointed loading and sanity-check document shape."""
    # Broad window; results may legitimately be empty depending on repo state.
    window_start = 1755004439  # Tue Aug 12 2025 13:13:59 UTC
    window_end = time.time()

    docs = load_all_docs_from_checkpoint_connector(
        connector=bitbucket_connector_for_checkpoint,
        start=window_start,
        end=window_end,
    )

    assert isinstance(docs, list)

    for doc in docs:
        assert doc.source == DocumentSource.BITBUCKET

        metadata = doc.metadata
        assert metadata is not None
        assert metadata.get("object_type") == "PullRequest"
        for required_key in ("id", "state", "title", "updated_on"):
            assert required_key in metadata

        # Basic section checks
        assert len(doc.sections) >= 1
        first_section = doc.sections[0]
        assert isinstance(first_section.link, str)
        assert isinstance(first_section.text, str)
||||
@@ -0,0 +1,57 @@
|
||||
import os
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from onyx.configs.constants import DocumentSource
|
||||
from onyx.connectors.bitbucket.connector import BitbucketConnector
|
||||
from tests.daily.connectors.utils import load_all_docs_from_checkpoint_connector
|
||||
|
||||
|
||||
@pytest.fixture
def bitbucket_connector_for_slim() -> BitbucketConnector:
    """Fixture providing a credentialed connector for slim-retrieval tests."""
    # Workspace is mandatory; repo/project scoping is optional.
    connector = BitbucketConnector(
        workspace=os.environ["BITBUCKET_WORKSPACE"],
        repositories=os.environ.get("BITBUCKET_REPOSITORIES"),
        projects=os.environ.get("BITBUCKET_PROJECTS"),
        batch_size=10,
    )

    email = os.environ.get("BITBUCKET_EMAIL")
    token = os.environ.get("BITBUCKET_API_TOKEN")
    if not email or not token:
        pytest.skip("BITBUCKET_EMAIL or BITBUCKET_API_TOKEN not set in environment")

    connector.load_credentials(
        {"bitbucket_email": email, "bitbucket_api_token": token}
    )
    return connector
||||
|
||||
|
||||
def test_bitbucket_full_ids_subset_of_slim_ids(
    bitbucket_connector_for_slim: BitbucketConnector,
) -> None:
    """Every full document ID must also be returned by the slim connector."""
    # Full document IDs via checkpointed load over all time.
    full_docs = load_all_docs_from_checkpoint_connector(
        connector=bitbucket_connector_for_slim,
        start=0,
        end=time.time(),
    )
    all_full_doc_ids: set[str] = {doc.id for doc in full_docs}

    # Slim document IDs across all yielded batches.
    all_slim_doc_ids: set[str] = set()
    for slim_doc_batch in bitbucket_connector_for_slim.retrieve_all_slim_documents():
        all_slim_doc_ids.update(doc.id for doc in slim_doc_batch)

    # Full IDs should always be a subset of slim IDs, and some docs must exist.
    assert all_full_doc_ids.issubset(all_slim_doc_ids)
    assert len(all_slim_doc_ids) > 0

    # Basic sanity checks if any docs exist
    if all_slim_doc_ids:
        example_id = next(iter(all_slim_doc_ids))
        assert example_id.startswith(f"{DocumentSource.BITBUCKET.value}:")
|
||||
13
web/public/Bitbucket.svg
Normal file
13
web/public/Bitbucket.svg
Normal file
@@ -0,0 +1,13 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 128 128">
|
||||
<defs>
|
||||
<linearGradient id="bitbucket-original-a" gradientUnits="userSpaceOnUse" x1="28.593" y1="14.226" x2="16.672"
|
||||
y2="23.532" gradientTransform="scale(4)">
|
||||
<stop offset=".176" stop-color="#0052cc"/>
|
||||
<stop offset="1" stop-color="#2684ff"/>
|
||||
</linearGradient>
|
||||
</defs>
|
||||
<path d="M19.082 20c-1.918 0-3.355 1.758-3.039 3.516l12.95 79.289c.32 2.078 2.077 3.515 4.155 3.515h62.66c1.442 0 2.72-1.12 3.04-2.558l13.109-80.086c.316-1.918-1.121-3.516-3.039-3.516zM74.07 77.227H54.09l-5.278-28.293h30.215zm0 0"
|
||||
fill="#2684ff"/>
|
||||
<path d="M107.64 48.934H78.868L74.07 77.227H54.09l-23.5 27.972s1.12.961 2.719.961h62.66c1.441 0 2.719-1.12 3.039-2.558zm0 0"
|
||||
fill="url(#bitbucket-original-a)"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 846 B |
@@ -85,6 +85,7 @@ import cohereIcon from "../../../public/Cohere.svg";
|
||||
import googleIcon from "../../../public/Google.png";
|
||||
import xenforoIcon from "../../../public/Xenforo.svg";
|
||||
import highspotIcon from "../../../public/Highspot.png";
|
||||
import bitbucketIcon from "../../../public/Bitbucket.svg";
|
||||
import { FaGithub, FaRobot } from "react-icons/fa";
|
||||
import Image from "next/image";
|
||||
import { cn } from "@/lib/utils";
|
||||
@@ -1214,6 +1215,13 @@ export const GithubIcon = ({
|
||||
<FaGithub size={size} className={cn(className, "text-black")} />
|
||||
);
|
||||
|
||||
export const BitbucketIcon = ({
|
||||
size = 16,
|
||||
className = defaultTailwindCSS,
|
||||
}: IconProps) => (
|
||||
<LogoIcon size={size} className={className} src={bitbucketIcon} />
|
||||
);
|
||||
|
||||
export const GlobeIcon = ({
|
||||
size = 16,
|
||||
className = defaultTailwindCSSBlue,
|
||||
|
||||
@@ -282,6 +282,70 @@ export const connectorConfigs: Record<
|
||||
},
|
||||
],
|
||||
},
|
||||
bitbucket: {
|
||||
description: "Configure Bitbucket connector",
|
||||
subtext:
|
||||
"Configure Bitbucket connector (Cloud only). You can index a workspace, specific projects or repositories.",
|
||||
values: [
|
||||
{
|
||||
type: "text",
|
||||
label: "Workspace",
|
||||
name: "workspace",
|
||||
optional: false,
|
||||
description: `The Bitbucket workspace to index (e.g., "atlassian" from https://bitbucket.org/atlassian/workspace ).`,
|
||||
},
|
||||
{
|
||||
type: "tab",
|
||||
name: "bitbucket_mode",
|
||||
label: "What should be indexed from Bitbucket?",
|
||||
optional: true,
|
||||
tabs: [
|
||||
{
|
||||
value: "repo",
|
||||
label: "Specific Repositories",
|
||||
fields: [
|
||||
{
|
||||
type: "text",
|
||||
label: "Repository Slugs",
|
||||
name: "repositories",
|
||||
optional: false,
|
||||
description:
|
||||
"For multiple repositories, enter comma-separated slugs (e.g., repo1,repo2,repo3)",
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
value: "project",
|
||||
label: "Project(s)",
|
||||
fields: [
|
||||
{
|
||||
type: "text",
|
||||
label: "Project Key(s)",
|
||||
name: "projects",
|
||||
optional: false,
|
||||
description:
|
||||
"One or more Bitbucket Project Keys (comma-separated) to index all repositories in those projects (e.g., PROJ1,PROJ2)",
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
value: "workspace",
|
||||
label: "Workspace",
|
||||
fields: [
|
||||
{
|
||||
type: "string_tab",
|
||||
label: "Workspace",
|
||||
name: "workspace_tab",
|
||||
description:
|
||||
"This connector will index all repositories in the workspace.",
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
advanced_values: [],
|
||||
},
|
||||
gitbook: {
|
||||
description: "Configure GitBook connector",
|
||||
values: [
|
||||
@@ -1582,6 +1646,12 @@ export interface GitlabConfig {
|
||||
include_issues: boolean;
|
||||
}
|
||||
|
||||
export interface BitbucketConfig {
|
||||
workspace: string;
|
||||
repositories?: string;
|
||||
projects?: string;
|
||||
}
|
||||
|
||||
export interface GoogleDriveConfig {
|
||||
include_shared_drives?: boolean;
|
||||
shared_drive_urls?: string;
|
||||
|
||||
@@ -57,6 +57,11 @@ export interface GitlabCredentialJson {
|
||||
gitlab_access_token: string;
|
||||
}
|
||||
|
||||
export interface BitbucketCredentialJson {
|
||||
bitbucket_email: string;
|
||||
bitbucket_api_token: string;
|
||||
}
|
||||
|
||||
export interface BookstackCredentialJson {
|
||||
bookstack_base_url: string;
|
||||
bookstack_api_token_id: string;
|
||||
@@ -268,6 +273,10 @@ export const credentialTemplates: Record<ValidSources, any> = {
|
||||
gitlab_url: "",
|
||||
gitlab_access_token: "",
|
||||
} as GitlabCredentialJson,
|
||||
bitbucket: {
|
||||
bitbucket_email: "",
|
||||
bitbucket_api_token: "",
|
||||
} as BitbucketCredentialJson,
|
||||
slack: { slack_bot_token: "" } as SlackCredentialJson,
|
||||
bookstack: {
|
||||
bookstack_base_url: "",
|
||||
@@ -606,6 +615,10 @@ export const credentialDisplayNames: Record<string, string> = {
|
||||
highspot_url: "Highspot URL",
|
||||
highspot_key: "Highspot Key",
|
||||
highspot_secret: "Highspot Secret",
|
||||
|
||||
// Bitbucket
|
||||
bitbucket_email: "Bitbucket Account Email",
|
||||
bitbucket_api_token: "Bitbucket API Token",
|
||||
};
|
||||
|
||||
export function getDisplayNameForCredentialKey(key: string): string {
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
DropboxIcon,
|
||||
GithubIcon,
|
||||
GitlabIcon,
|
||||
BitbucketIcon,
|
||||
GlobeIcon,
|
||||
GmailIcon,
|
||||
GongIcon,
|
||||
@@ -363,6 +364,12 @@ export const SOURCE_METADATA_MAP: SourceMap = {
|
||||
category: SourceCategory.CodeRepository,
|
||||
docs: "https://docs.onyx.app/admin/connectors/official/gitlab",
|
||||
},
|
||||
bitbucket: {
|
||||
icon: BitbucketIcon,
|
||||
displayName: "Bitbucket",
|
||||
category: SourceCategory.CodeRepository,
|
||||
docs: "https://docs.onyx.app/connectors/bitbucket",
|
||||
},
|
||||
|
||||
// Others
|
||||
web: {
|
||||
|
||||
@@ -482,6 +482,7 @@ export enum ValidSources {
|
||||
Gitbook = "gitbook",
|
||||
Highspot = "highspot",
|
||||
Imap = "imap",
|
||||
Bitbucket = "bitbucket",
|
||||
|
||||
// Federated Connectors
|
||||
FederatedSlack = "federated_slack",
|
||||
|
||||
Reference in New Issue
Block a user