Compare commits

...

8 Commits

Author SHA1 Message Date
edwin
3bbcbe3633 . 2025-09-20 16:20:00 -07:00
edwin
32483b20eb . 2025-09-20 16:06:23 -07:00
edwin
7fcadef718 . 2025-09-20 15:51:59 -07:00
edwin
e5a9de3122 . 2025-09-20 15:20:31 -07:00
edwin
7d70aae3df . 2025-09-20 15:13:21 -07:00
edwin
5e4f9dac16 . 2025-09-20 13:50:29 -07:00
edwin
c795231181 . 2025-09-20 13:48:58 -07:00
edwin
4a55abe46d . 2025-09-20 13:15:46 -07:00
3 changed files with 33 additions and 6 deletions

View File

@@ -1,4 +1,5 @@
import contextvars
import time
from concurrent.futures import as_completed
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
@@ -22,6 +23,7 @@ from onyx.connectors.models import TextSection
from onyx.file_processing.extract_file_text import extract_file_text
from onyx.file_processing.extract_file_text import get_file_ext
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import retry_builder
logger = setup_logger()
@@ -93,7 +95,10 @@ class AirtableConnector(LoadConnector):
)
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
    """Build the Airtable API client from the connector credentials.

    Args:
        credentials: Credential mapping; the access token is read from its
            ``"airtable_access_token"`` key.

    Returns:
        ``None``.

    Raises:
        KeyError: If ``"airtable_access_token"`` is missing from
            ``credentials``.
    """
    access_token = credentials["airtable_access_token"]
    # Only the token is passed, so the client keeps whatever defaults
    # AirtableApi ships with; no additional retry wrapping is applied here.
    self._airtable_client = AirtableApi(access_token)
    logger.info("Airtable client initialized")
    return None
@property
@@ -187,10 +192,17 @@ class AirtableConnector(LoadConnector):
except requests.exceptions.HTTPError as e:
if e.response.status_code == 410:
logger.info(f"Refreshing attachment for {filename}")
# Re-fetch the record to get a fresh URL
refreshed_record = self.airtable_client.table(
base_id, table_id
).get(record_id)
# Re-fetch the record to get a fresh URL with retry logic
@retry_builder(
tries=2, delay=30, backoff=1, exceptions=(Exception,)
)
def refetch_record():
return self.airtable_client.table(
base_id, table_id
).get(record_id)
refreshed_record = refetch_record()
for refreshed_attachment in refreshed_record["fields"][
field_name
]:
@@ -403,7 +415,12 @@ class AirtableConnector(LoadConnector):
raise AirtableClientNotSetUpError()
table = self.airtable_client.table(self.base_id, self.table_name_or_id)
logger.info("Fetching all records from Airtable table, waiting 30 seconds")
time.sleep(30)
records = table.all()
logger.info(f"Successfully fetched {len(records)} records from Airtable")
table_schema = table.schema()
primary_field_name = None
@@ -417,7 +434,8 @@ class AirtableConnector(LoadConnector):
logger.info(f"Starting to process Airtable records for {table.name}.")
# Process records in parallel batches using ThreadPoolExecutor
PARALLEL_BATCH_SIZE = 8
# Reduce batch size and parallelism to be more respectful of rate limits
PARALLEL_BATCH_SIZE = 4
max_workers = min(PARALLEL_BATCH_SIZE, len(records))
record_documents: list[Document] = []
@@ -456,6 +474,11 @@ class AirtableConnector(LoadConnector):
yield record_documents
record_documents = []
logger.info(
f"Completed batch {i // PARALLEL_BATCH_SIZE + 1} of "
f"{(len(records) + PARALLEL_BATCH_SIZE - 1) // PARALLEL_BATCH_SIZE}"
)
# Yield any remaining records
if record_documents:
yield record_documents

View File

@@ -191,6 +191,7 @@ def compare_documents(
), f"Section {i} link mismatch for document {doc_id}"
@pytest.mark.xdist_group(name="airtable")
def test_airtable_connector_basic(
mock_get_unstructured_api_key: MagicMock, airtable_config: AirtableConfig
) -> None:
@@ -255,6 +256,7 @@ def test_airtable_connector_basic(
compare_documents(doc_batch, expected_docs)
@pytest.mark.xdist_group(name="airtable")
def test_airtable_connector_all_metadata(
mock_get_unstructured_api_key: MagicMock, airtable_config: AirtableConfig
) -> None:
@@ -307,6 +309,7 @@ def test_airtable_connector_all_metadata(
compare_documents(doc_batch, expected_docs)
@pytest.mark.xdist_group(name="airtable")
def test_airtable_connector_with_share_and_view(
mock_get_unstructured_api_key: MagicMock, airtable_config: AirtableConfig
) -> None:

View File

@@ -335,6 +335,7 @@ class CCPairManager:
terminating that indexing."""
start = time.monotonic()
while True:
elapsed = time.monotonic() - start
fetched_cc_pairs = CCPairManager.get_indexing_statuses(
user_performing_action
)
@@ -351,10 +352,10 @@ class CCPairManager:
f"cc_pair={cc_pair.id} "
f"docs_indexed={fetched_cc_pair.docs_indexed} "
f"num_docs={num_docs}"
f"time_taken={elapsed:.2f}s"
)
return
elapsed = time.monotonic() - start
if elapsed > timeout:
raise TimeoutError(
f"Indexing in progress wait timed out: cc_pair={cc_pair.id} timeout={timeout}s"