Compare commits

...

4 Commits

Author SHA1 Message Date
justin-tahara
c6c0175ee1 Reverting web change 2025-12-18 10:25:18 -08:00
justin-tahara
f2e3705f08 Fixing test 2025-12-17 18:41:34 -08:00
justin-tahara
d50303d988 addressing comments 2025-12-17 18:38:41 -08:00
justin-tahara
d868029e2b fix(github): Rate Limit Guardrails 2025-12-17 18:22:51 -08:00
3 changed files with 208 additions and 18 deletions

View File

@@ -28,6 +28,8 @@ from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedValidationError
from onyx.connectors.github.models import SerializedRepository
from onyx.connectors.github.rate_limit_utils import raise_if_approaching_rate_limit
from onyx.connectors.github.rate_limit_utils import RateLimitBudgetLow
from onyx.connectors.github.rate_limit_utils import sleep_after_rate_limit_exception
from onyx.connectors.github.utils import deserialize_repository
from onyx.connectors.github.utils import get_external_access_permission
@@ -172,6 +174,7 @@ def _get_batch_rate_limited(
raise RuntimeError(
"Re-tried fetching batch too many times. Something is going wrong with fetching objects from Github"
)
raise_if_approaching_rate_limit(github_client)
try:
if cursor_url:
# when this is set, we are resuming from an earlier
@@ -567,14 +570,24 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
if self.include_prs and checkpoint.stage == GithubConnectorStage.PRS:
logger.info(f"Fetching PRs for repo: {repo.name}")
pr_batch = _get_batch_rate_limited(
self._pull_requests_func(repo),
checkpoint.curr_page,
checkpoint.cursor_url,
checkpoint.num_retrieved,
cursor_url_callback,
self.github_client,
)
try:
pr_batch = list(
_get_batch_rate_limited(
self._pull_requests_func(repo),
checkpoint.curr_page,
checkpoint.cursor_url,
checkpoint.num_retrieved,
cursor_url_callback,
self.github_client,
)
)
except RateLimitBudgetLow as e:
logger.info(
"Stopping GitHub fetch early to avoid hitting rate limit "
f"(remaining={e.remaining}, threshold={e.threshold}, "
f"resets_at={e.reset_at.isoformat()}, seconds_until_reset={e.seconds_until_reset:.0f})."
)
return checkpoint
checkpoint.curr_page += 1 # NOTE: not used for cursor-based fallback
done_with_prs = False
num_prs = 0
@@ -640,16 +653,24 @@ class GithubConnector(CheckpointedConnectorWithPermSync[GithubConnectorCheckpoin
if self.include_issues and checkpoint.stage == GithubConnectorStage.ISSUES:
logger.info(f"Fetching issues for repo: {repo.name}")
issue_batch = list(
_get_batch_rate_limited(
self._issues_func(repo),
checkpoint.curr_page,
checkpoint.cursor_url,
checkpoint.num_retrieved,
cursor_url_callback,
self.github_client,
try:
issue_batch = list(
_get_batch_rate_limited(
self._issues_func(repo),
checkpoint.curr_page,
checkpoint.cursor_url,
checkpoint.num_retrieved,
cursor_url_callback,
self.github_client,
)
)
)
except RateLimitBudgetLow as e:
logger.info(
"Stopping GitHub fetch early to avoid hitting rate limit "
f"(remaining={e.remaining}, threshold={e.threshold}, "
f"resets_at={e.reset_at.isoformat()}, seconds_until_reset={e.seconds_until_reset:.0f})."
)
return checkpoint
logger.info(f"Fetched {len(issue_batch)} issues for repo: {repo.name}")
checkpoint.curr_page += 1
done_with_issues = False

View File

@@ -1,3 +1,4 @@
import os
import time
from datetime import datetime
from datetime import timedelta
@@ -10,6 +11,59 @@ from onyx.utils.logger import setup_logger
logger = setup_logger()
def _load_minimum_remaining_threshold() -> int:
"""Read the configurable remaining-budget threshold from env."""
default_threshold = 100
try:
return max(
int(
os.environ.get("GITHUB_RATE_LIMIT_MINIMUM_REMAINING", default_threshold)
),
0,
)
except ValueError:
return default_threshold
MINIMUM_RATE_LIMIT_REMAINING = _load_minimum_remaining_threshold()
class RateLimitBudgetLow(Exception):
"""Raised when we're close enough to the rate limit that we should pause early."""
def __init__(
self, remaining: int, threshold: int, reset_at: datetime, *args: object
) -> None:
super().__init__(*args)
self.remaining = remaining
self.threshold = threshold
self.reset_at = reset_at
self.seconds_until_reset = max(
0, (reset_at - datetime.now(tz=timezone.utc)).total_seconds()
)
def raise_if_approaching_rate_limit(
github_client: Github, minimum_remaining: int | None = None
) -> None:
"""Raise if the client is close to its rate limit to avoid long sleeps."""
threshold = (
minimum_remaining
if minimum_remaining is not None
else MINIMUM_RATE_LIMIT_REMAINING
)
if threshold <= 0:
return
core_rate_limit = github_client.get_rate_limit().core
remaining = core_rate_limit.remaining
# GitHub returns a naive datetime; normalize to UTC
reset_at = core_rate_limit.reset.replace(tzinfo=timezone.utc)
if remaining is not None and remaining <= threshold:
raise RateLimitBudgetLow(remaining, threshold, reset_at)
def sleep_after_rate_limit_exception(github_client: Github) -> None:
"""
Sleep until the GitHub rate limit resets.

View File

@@ -24,6 +24,8 @@ from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.github.connector import GithubConnector
from onyx.connectors.github.connector import GithubConnectorStage
from onyx.connectors.github.models import SerializedRepository
from onyx.connectors.github.rate_limit_utils import raise_if_approaching_rate_limit
from onyx.connectors.github.rate_limit_utils import RateLimitBudgetLow
from onyx.connectors.models import Document
from tests.unit.onyx.connectors.utils import load_everything_from_checkpoint_connector
from tests.unit.onyx.connectors.utils import (
@@ -48,7 +50,13 @@ def mock_github_client() -> MagicMock:
mock.get_repo = MagicMock()
mock.get_organization = MagicMock()
mock.get_user = MagicMock()
mock.get_rate_limit = MagicMock(return_value=MagicMock(spec=RateLimit))
# Mock rate limit to avoid triggering RateLimitBudgetLow
mock_rate_limit = MagicMock(spec=RateLimit)
mock_rate_limit.core.remaining = 5000
mock_rate_limit.core.reset = datetime.now(timezone.utc)
mock.get_rate_limit = MagicMock(return_value=mock_rate_limit)
mock._requester = MagicMock(spec=Requester)
return mock
@@ -266,6 +274,7 @@ def test_load_from_checkpoint_with_rate_limit(
# Mock rate limit reset time
mock_rate_limit = MagicMock(spec=RateLimit)
mock_rate_limit.core.remaining = 5000
mock_rate_limit.core.reset = datetime.now(timezone.utc)
github_connector.github_client.get_rate_limit.return_value = mock_rate_limit
@@ -291,6 +300,112 @@ def test_load_from_checkpoint_with_rate_limit(
assert outputs[-1].next_checkpoint.has_more is False
def test_raise_if_approaching_rate_limit_raises_when_under_threshold() -> None:
github_client = MagicMock(spec=Github)
rate_limit_reset = datetime(2024, 1, 1, tzinfo=timezone.utc)
rate_limit = MagicMock(spec=RateLimit)
rate_limit.core = MagicMock(remaining=5, reset=rate_limit_reset)
github_client.get_rate_limit.return_value = rate_limit
with pytest.raises(RateLimitBudgetLow) as excinfo:
raise_if_approaching_rate_limit(github_client, minimum_remaining=10)
assert excinfo.value.remaining == 5
assert excinfo.value.threshold == 10
assert excinfo.value.reset_at == rate_limit_reset
def test_raise_if_approaching_rate_limit_allows_when_above_threshold() -> None:
github_client = MagicMock(spec=Github)
rate_limit_reset = datetime(2024, 1, 1, tzinfo=timezone.utc)
rate_limit = MagicMock(spec=RateLimit)
rate_limit.core = MagicMock(remaining=50, reset=rate_limit_reset)
github_client.get_rate_limit.return_value = rate_limit
# Should not raise
raise_if_approaching_rate_limit(github_client, minimum_remaining=10)
def test_fetch_from_github_returns_checkpoint_when_rate_limit_low_prs(
build_github_connector: Callable[..., GithubConnector],
mock_github_client: MagicMock,
create_mock_repo: Callable[..., MagicMock],
) -> None:
github_connector = build_github_connector()
github_connector.github_client = mock_github_client
mock_repo = create_mock_repo()
checkpoint = github_connector.build_dummy_checkpoint()
checkpoint.cached_repo = SerializedRepository(
id=mock_repo.id, headers=mock_repo.raw_headers, raw_data=mock_repo.raw_data
)
checkpoint.cached_repo_ids = []
checkpoint.stage = GithubConnectorStage.PRS
rate_limit_exception = RateLimitBudgetLow(
remaining=1, threshold=10, reset_at=datetime.now(timezone.utc)
)
with (
patch.object(SerializedRepository, "to_Repository", return_value=mock_repo),
patch(
"onyx.connectors.github.connector._get_batch_rate_limited",
side_effect=rate_limit_exception,
) as mock_batch,
):
gen = github_connector._fetch_from_github(checkpoint)
with pytest.raises(StopIteration) as stop_exc:
next(gen)
returned_checkpoint = stop_exc.value.value
mock_batch.assert_called_once()
assert returned_checkpoint.stage == GithubConnectorStage.PRS
assert returned_checkpoint.curr_page == 0
assert returned_checkpoint.num_retrieved == 0
assert returned_checkpoint.cached_repo is not None
assert returned_checkpoint.cached_repo.id == mock_repo.id
def test_fetch_from_github_returns_checkpoint_when_rate_limit_low_issues(
build_github_connector: Callable[..., GithubConnector],
mock_github_client: MagicMock,
create_mock_repo: Callable[..., MagicMock],
) -> None:
github_connector = build_github_connector()
github_connector.github_client = mock_github_client
mock_repo = create_mock_repo()
checkpoint = github_connector.build_dummy_checkpoint()
checkpoint.cached_repo = SerializedRepository(
id=mock_repo.id, headers=mock_repo.raw_headers, raw_data=mock_repo.raw_data
)
checkpoint.cached_repo_ids = []
checkpoint.stage = GithubConnectorStage.ISSUES
rate_limit_exception = RateLimitBudgetLow(
remaining=1, threshold=10, reset_at=datetime.now(timezone.utc)
)
with (
patch.object(SerializedRepository, "to_Repository", return_value=mock_repo),
patch(
"onyx.connectors.github.connector._get_batch_rate_limited",
side_effect=rate_limit_exception,
) as mock_batch,
):
gen = github_connector._fetch_from_github(checkpoint)
with pytest.raises(StopIteration) as stop_exc:
next(gen)
returned_checkpoint = stop_exc.value.value
mock_batch.assert_called_once()
assert returned_checkpoint.stage == GithubConnectorStage.ISSUES
assert returned_checkpoint.curr_page == 0
assert returned_checkpoint.num_retrieved == 0
assert returned_checkpoint.cached_repo is not None
assert returned_checkpoint.cached_repo.id == mock_repo.id
def test_load_from_checkpoint_with_empty_repo(
build_github_connector: Callable[..., GithubConnector],
mock_github_client: MagicMock,