mirror of
https://github.com/onyx-dot-app/onyx.git
synced 2026-02-26 12:15:48 +00:00
Compare commits
1 Commits
optimize_s
...
foreign_ke
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
99bfe15395 |
@@ -0,0 +1,77 @@
|
||||
"""foreign key input prompts
|
||||
|
||||
Revision ID: 33ea50e88f24
|
||||
Revises: 4d58345da04a
|
||||
Create Date: 2025-01-29 10:54:22.141765
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "33ea50e88f24"
|
||||
down_revision = "4d58345da04a"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# First drop the existing FK constraints
|
||||
op.drop_constraint(
|
||||
"inputprompt__user_input_prompt_id_fkey",
|
||||
"inputprompt__user",
|
||||
type_="foreignkey",
|
||||
)
|
||||
op.drop_constraint(
|
||||
"inputprompt__user_user_id_fkey",
|
||||
"inputprompt__user",
|
||||
type_="foreignkey",
|
||||
)
|
||||
|
||||
# Recreate with ON DELETE CASCADE
|
||||
op.create_foreign_key(
|
||||
"inputprompt__user_input_prompt_id_fkey",
|
||||
"inputprompt__user",
|
||||
"inputprompt",
|
||||
["input_prompt_id"],
|
||||
["id"],
|
||||
ondelete="CASCADE",
|
||||
)
|
||||
op.create_foreign_key(
|
||||
"inputprompt__user_user_id_fkey",
|
||||
"inputprompt__user",
|
||||
'"user"',
|
||||
["user_id"],
|
||||
["id"],
|
||||
ondelete="CASCADE",
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# Drop the new FKs with ondelete
|
||||
op.drop_constraint(
|
||||
"inputprompt__user_input_prompt_id_fkey",
|
||||
"inputprompt__user",
|
||||
type_="foreignkey",
|
||||
)
|
||||
op.drop_constraint(
|
||||
"inputprompt__user_user_id_fkey",
|
||||
"inputprompt__user",
|
||||
type_="foreignkey",
|
||||
)
|
||||
|
||||
# Recreate them without cascading
|
||||
op.create_foreign_key(
|
||||
"inputprompt__user_input_prompt_id_fkey",
|
||||
"inputprompt__user",
|
||||
"inputprompt",
|
||||
["input_prompt_id"],
|
||||
["id"],
|
||||
)
|
||||
op.create_foreign_key(
|
||||
"inputprompt__user_user_id_fkey",
|
||||
"inputprompt__user",
|
||||
'"user"',
|
||||
["user_id"],
|
||||
["id"],
|
||||
)
|
||||
@@ -111,7 +111,6 @@ async def login_as_anonymous_user(
|
||||
token = generate_anonymous_user_jwt_token(tenant_id)
|
||||
|
||||
response = Response()
|
||||
response.delete_cookie("fastapiusersauth")
|
||||
response.set_cookie(
|
||||
key=ANONYMOUS_USER_COOKIE_NAME,
|
||||
value=token,
|
||||
|
||||
@@ -11,7 +11,6 @@ from celery import Task
|
||||
from celery.exceptions import SoftTimeLimitExceeded
|
||||
from redis import Redis
|
||||
from redis.lock import Lock as RedisLock
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from ee.onyx.db.connector_credential_pair import get_all_auto_sync_cc_pairs
|
||||
from ee.onyx.db.document import upsert_document_external_perms
|
||||
@@ -32,17 +31,12 @@ from onyx.configs.constants import OnyxCeleryPriority
|
||||
from onyx.configs.constants import OnyxCeleryQueues
|
||||
from onyx.configs.constants import OnyxCeleryTask
|
||||
from onyx.configs.constants import OnyxRedisLocks
|
||||
from onyx.db.connector import mark_cc_pair_as_permissions_synced
|
||||
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
|
||||
from onyx.db.document import upsert_document_by_connector_credential_pair
|
||||
from onyx.db.engine import get_session_with_tenant
|
||||
from onyx.db.enums import AccessType
|
||||
from onyx.db.enums import ConnectorCredentialPairStatus
|
||||
from onyx.db.enums import SyncStatus
|
||||
from onyx.db.enums import SyncType
|
||||
from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.db.sync_record import insert_sync_record
|
||||
from onyx.db.sync_record import update_sync_record_status
|
||||
from onyx.db.users import batch_add_ext_perm_user_if_not_exists
|
||||
from onyx.redis.redis_connector import RedisConnector
|
||||
from onyx.redis.redis_connector_doc_perm_sync import (
|
||||
@@ -63,9 +57,6 @@ LIGHT_SOFT_TIME_LIMIT = 105
|
||||
LIGHT_TIME_LIMIT = LIGHT_SOFT_TIME_LIMIT + 15
|
||||
|
||||
|
||||
"""Jobs / utils for kicking off doc permissions sync tasks."""
|
||||
|
||||
|
||||
def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> bool:
|
||||
"""Returns boolean indicating if external doc permissions sync is due."""
|
||||
|
||||
@@ -183,15 +174,6 @@ def try_creating_permissions_sync_task(
|
||||
|
||||
custom_task_id = f"{redis_connector.permissions.generator_task_key}_{uuid4()}"
|
||||
|
||||
# create before setting fence to avoid race condition where the monitoring
|
||||
# task updates the sync record before it is created
|
||||
with get_session_with_tenant(tenant_id) as db_session:
|
||||
insert_sync_record(
|
||||
db_session=db_session,
|
||||
entity_id=cc_pair_id,
|
||||
sync_type=SyncType.EXTERNAL_PERMISSIONS,
|
||||
)
|
||||
|
||||
# set a basic fence to start
|
||||
payload = RedisConnectorPermissionSyncPayload(started=None, celery_task_id=None)
|
||||
redis_connector.permissions.set_fence(payload)
|
||||
@@ -418,53 +400,3 @@ def update_external_document_permissions_task(
|
||||
f"Error Syncing Document Permissions: connector_id={connector_id} doc_id={doc_id}"
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
"""Monitoring CCPair permissions utils, called in monitor_vespa_sync"""
|
||||
|
||||
|
||||
def monitor_ccpair_permissions_taskset(
|
||||
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
|
||||
) -> None:
|
||||
fence_key = key_bytes.decode("utf-8")
|
||||
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
|
||||
if cc_pair_id_str is None:
|
||||
task_logger.warning(
|
||||
f"monitor_ccpair_permissions_taskset: could not parse cc_pair_id from {fence_key}"
|
||||
)
|
||||
return
|
||||
|
||||
cc_pair_id = int(cc_pair_id_str)
|
||||
|
||||
redis_connector = RedisConnector(tenant_id, cc_pair_id)
|
||||
if not redis_connector.permissions.fenced:
|
||||
return
|
||||
|
||||
initial = redis_connector.permissions.generator_complete
|
||||
if initial is None:
|
||||
return
|
||||
|
||||
remaining = redis_connector.permissions.get_remaining()
|
||||
task_logger.info(
|
||||
f"Permissions sync progress: cc_pair={cc_pair_id} remaining={remaining} initial={initial}"
|
||||
)
|
||||
if remaining > 0:
|
||||
return
|
||||
|
||||
payload: RedisConnectorPermissionSyncPayload | None = (
|
||||
redis_connector.permissions.payload
|
||||
)
|
||||
start_time: datetime | None = payload.started if payload else None
|
||||
|
||||
mark_cc_pair_as_permissions_synced(db_session, int(cc_pair_id), start_time)
|
||||
task_logger.info(f"Successfully synced permissions for cc_pair={cc_pair_id}")
|
||||
|
||||
update_sync_record_status(
|
||||
db_session=db_session,
|
||||
entity_id=cc_pair_id,
|
||||
sync_type=SyncType.EXTERNAL_PERMISSIONS,
|
||||
sync_status=SyncStatus.SUCCESS,
|
||||
num_docs_synced=initial,
|
||||
)
|
||||
|
||||
redis_connector.permissions.reset()
|
||||
|
||||
@@ -33,11 +33,7 @@ from onyx.db.connector_credential_pair import get_connector_credential_pair_from
|
||||
from onyx.db.engine import get_session_with_tenant
|
||||
from onyx.db.enums import AccessType
|
||||
from onyx.db.enums import ConnectorCredentialPairStatus
|
||||
from onyx.db.enums import SyncStatus
|
||||
from onyx.db.enums import SyncType
|
||||
from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.db.sync_record import insert_sync_record
|
||||
from onyx.db.sync_record import update_sync_record_status
|
||||
from onyx.redis.redis_connector import RedisConnector
|
||||
from onyx.redis.redis_connector_ext_group_sync import (
|
||||
RedisConnectorExternalGroupSyncPayload,
|
||||
@@ -204,15 +200,6 @@ def try_creating_external_group_sync_task(
|
||||
celery_task_id=result.id,
|
||||
)
|
||||
|
||||
# create before setting fence to avoid race condition where the monitoring
|
||||
# task updates the sync record before it is created
|
||||
with get_session_with_tenant(tenant_id) as db_session:
|
||||
insert_sync_record(
|
||||
db_session=db_session,
|
||||
entity_id=cc_pair_id,
|
||||
sync_type=SyncType.EXTERNAL_GROUP,
|
||||
)
|
||||
|
||||
redis_connector.external_group_sync.set_fence(payload)
|
||||
|
||||
except Exception:
|
||||
@@ -302,26 +289,11 @@ def connector_external_group_sync_generator_task(
|
||||
)
|
||||
|
||||
mark_cc_pair_as_external_group_synced(db_session, cc_pair.id)
|
||||
|
||||
update_sync_record_status(
|
||||
db_session=db_session,
|
||||
entity_id=cc_pair_id,
|
||||
sync_type=SyncType.EXTERNAL_GROUP,
|
||||
sync_status=SyncStatus.SUCCESS,
|
||||
)
|
||||
except Exception as e:
|
||||
task_logger.exception(
|
||||
f"Failed to run external group sync: cc_pair={cc_pair_id}"
|
||||
)
|
||||
|
||||
with get_session_with_tenant(tenant_id) as db_session:
|
||||
update_sync_record_status(
|
||||
db_session=db_session,
|
||||
entity_id=cc_pair_id,
|
||||
sync_type=SyncType.EXTERNAL_GROUP,
|
||||
sync_status=SyncStatus.FAILED,
|
||||
)
|
||||
|
||||
redis_connector.external_group_sync.generator_clear()
|
||||
redis_connector.external_group_sync.taskset_clear()
|
||||
raise e
|
||||
|
||||
@@ -58,11 +58,6 @@ _SYNC_START_LATENCY_KEY_FMT = (
|
||||
"sync_start_latency:{sync_type}:{entity_id}:{sync_record_id}"
|
||||
)
|
||||
|
||||
_CONNECTOR_START_TIME_KEY_FMT = "connector_start_time:{cc_pair_id}:{index_attempt_id}"
|
||||
_CONNECTOR_END_TIME_KEY_FMT = "connector_end_time:{cc_pair_id}:{index_attempt_id}"
|
||||
_SYNC_START_TIME_KEY_FMT = "sync_start_time:{sync_type}:{entity_id}:{sync_record_id}"
|
||||
_SYNC_END_TIME_KEY_FMT = "sync_end_time:{sync_type}:{entity_id}:{sync_record_id}"
|
||||
|
||||
|
||||
def _mark_metric_as_emitted(redis_std: Redis, key: str) -> None:
|
||||
"""Mark a metric as having been emitted by setting a Redis key with expiration"""
|
||||
@@ -308,6 +303,8 @@ def _build_connector_final_metrics(
|
||||
)
|
||||
)
|
||||
|
||||
_mark_metric_as_emitted(redis_std, metric_key)
|
||||
|
||||
return metrics
|
||||
|
||||
|
||||
@@ -347,52 +344,6 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me
|
||||
if one_hour_ago > most_recent_attempt.time_created:
|
||||
continue
|
||||
|
||||
# Build a job_id for correlation
|
||||
job_id = build_job_id(
|
||||
"connector", str(cc_pair.id), str(most_recent_attempt.id)
|
||||
)
|
||||
|
||||
# Add raw start time metric if available
|
||||
if most_recent_attempt.time_started:
|
||||
start_time_key = _CONNECTOR_START_TIME_KEY_FMT.format(
|
||||
cc_pair_id=cc_pair.id,
|
||||
index_attempt_id=most_recent_attempt.id,
|
||||
)
|
||||
metrics.append(
|
||||
Metric(
|
||||
key=start_time_key,
|
||||
name="connector_start_time",
|
||||
value=most_recent_attempt.time_started.timestamp(),
|
||||
tags={
|
||||
"job_id": job_id,
|
||||
"connector_id": str(cc_pair.connector.id),
|
||||
"source": str(cc_pair.connector.source),
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# Add raw end time metric if available and in terminal state
|
||||
if (
|
||||
most_recent_attempt.status.is_terminal()
|
||||
and most_recent_attempt.time_updated
|
||||
):
|
||||
end_time_key = _CONNECTOR_END_TIME_KEY_FMT.format(
|
||||
cc_pair_id=cc_pair.id,
|
||||
index_attempt_id=most_recent_attempt.id,
|
||||
)
|
||||
metrics.append(
|
||||
Metric(
|
||||
key=end_time_key,
|
||||
name="connector_end_time",
|
||||
value=most_recent_attempt.time_updated.timestamp(),
|
||||
tags={
|
||||
"job_id": job_id,
|
||||
"connector_id": str(cc_pair.connector.id),
|
||||
"source": str(cc_pair.connector.source),
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# Connector start latency
|
||||
start_latency_metric = _build_connector_start_latency_metric(
|
||||
cc_pair, most_recent_attempt, second_most_recent_attempt, redis_std
|
||||
@@ -414,10 +365,9 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
|
||||
"""
|
||||
Collect metrics for document set and group syncing:
|
||||
- Success/failure status
|
||||
- Start latency (for doc sets / user groups)
|
||||
- Start latency (always)
|
||||
- Duration & doc count (only if success)
|
||||
- Throughput (docs/min) (only if success)
|
||||
- Raw start/end times for each sync
|
||||
"""
|
||||
one_hour_ago = get_db_current_time(db_session) - timedelta(hours=1)
|
||||
|
||||
@@ -439,43 +389,6 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
|
||||
# Build a job_id for correlation
|
||||
job_id = build_job_id("sync_record", str(sync_record.id))
|
||||
|
||||
# Add raw start time metric
|
||||
start_time_key = _SYNC_START_TIME_KEY_FMT.format(
|
||||
sync_type=sync_record.sync_type,
|
||||
entity_id=sync_record.entity_id,
|
||||
sync_record_id=sync_record.id,
|
||||
)
|
||||
metrics.append(
|
||||
Metric(
|
||||
key=start_time_key,
|
||||
name="sync_start_time",
|
||||
value=sync_record.sync_start_time.timestamp(),
|
||||
tags={
|
||||
"job_id": job_id,
|
||||
"sync_type": str(sync_record.sync_type),
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# Add raw end time metric if available
|
||||
if sync_record.sync_end_time:
|
||||
end_time_key = _SYNC_END_TIME_KEY_FMT.format(
|
||||
sync_type=sync_record.sync_type,
|
||||
entity_id=sync_record.entity_id,
|
||||
sync_record_id=sync_record.id,
|
||||
)
|
||||
metrics.append(
|
||||
Metric(
|
||||
key=end_time_key,
|
||||
name="sync_end_time",
|
||||
value=sync_record.sync_end_time.timestamp(),
|
||||
tags={
|
||||
"job_id": job_id,
|
||||
"sync_type": str(sync_record.sync_type),
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
# Emit a SUCCESS/FAIL boolean metric
|
||||
# Use a single Redis key to avoid re-emitting final metrics
|
||||
final_metric_key = _FINAL_METRIC_KEY_FMT.format(
|
||||
@@ -526,7 +439,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
|
||||
if duration_seconds is not None:
|
||||
metrics.append(
|
||||
Metric(
|
||||
key=final_metric_key,
|
||||
key=None,
|
||||
name="sync_duration_seconds",
|
||||
value=duration_seconds,
|
||||
tags={
|
||||
@@ -542,7 +455,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
|
||||
|
||||
metrics.append(
|
||||
Metric(
|
||||
key=final_metric_key,
|
||||
key=None,
|
||||
name="sync_doc_count",
|
||||
value=doc_count,
|
||||
tags={
|
||||
@@ -555,7 +468,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
|
||||
if sync_speed is not None:
|
||||
metrics.append(
|
||||
Metric(
|
||||
key=final_metric_key,
|
||||
key=None,
|
||||
name="sync_speed_docs_per_min",
|
||||
value=sync_speed,
|
||||
tags={
|
||||
@@ -569,6 +482,9 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
|
||||
f"Invalid sync record {sync_record.id} with no duration"
|
||||
)
|
||||
|
||||
# Mark final metrics as emitted so we don't re-emit
|
||||
_mark_metric_as_emitted(redis_std, final_metric_key)
|
||||
|
||||
# Emit start latency
|
||||
start_latency_key = _SYNC_START_LATENCY_KEY_FMT.format(
|
||||
sync_type=sync_record.sync_type,
|
||||
@@ -586,20 +502,22 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
|
||||
entity = db_session.scalar(
|
||||
select(UserGroup).where(UserGroup.id == sync_record.entity_id)
|
||||
)
|
||||
else:
|
||||
task_logger.info(
|
||||
f"Skipping sync record {sync_record.id} of type {sync_record.sync_type}."
|
||||
)
|
||||
continue
|
||||
|
||||
if entity is None:
|
||||
task_logger.error(
|
||||
f"Sync record of type {sync_record.sync_type} doesn't have an entity "
|
||||
f"associated with it (id={sync_record.entity_id}). Skipping start latency metric."
|
||||
f"Could not find entity for sync record {sync_record.id} "
|
||||
f"(type={sync_record.sync_type}, id={sync_record.entity_id})."
|
||||
)
|
||||
continue
|
||||
|
||||
# Calculate start latency in seconds:
|
||||
# (actual sync start) - (last modified time)
|
||||
if (
|
||||
entity is not None
|
||||
and entity.time_last_modified_by_user
|
||||
and sync_record.sync_start_time
|
||||
):
|
||||
if entity.time_last_modified_by_user and sync_record.sync_start_time:
|
||||
start_latency = (
|
||||
sync_record.sync_start_time - entity.time_last_modified_by_user
|
||||
).total_seconds()
|
||||
@@ -623,6 +541,8 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
|
||||
)
|
||||
)
|
||||
|
||||
_mark_metric_as_emitted(redis_std, start_latency_key)
|
||||
|
||||
return metrics
|
||||
|
||||
|
||||
@@ -687,12 +607,9 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None:
|
||||
for metric_fn in metric_functions:
|
||||
metrics = metric_fn()
|
||||
for metric in metrics:
|
||||
# double check to make sure we aren't double-emitting metrics
|
||||
if metric.key is not None and not _has_metric_been_emitted(
|
||||
redis_std, metric.key
|
||||
):
|
||||
metric.log()
|
||||
metric.emit(tenant_id)
|
||||
metric.log()
|
||||
metric.emit(tenant_id)
|
||||
if metric.key:
|
||||
_mark_metric_as_emitted(redis_std, metric.key)
|
||||
|
||||
task_logger.info("Successfully collected background metrics")
|
||||
|
||||
@@ -25,18 +25,13 @@ from onyx.configs.constants import OnyxCeleryTask
|
||||
from onyx.configs.constants import OnyxRedisLocks
|
||||
from onyx.connectors.factory import instantiate_connector
|
||||
from onyx.connectors.models import InputType
|
||||
from onyx.db.connector import mark_ccpair_as_pruned
|
||||
from onyx.db.connector_credential_pair import get_connector_credential_pair
|
||||
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
|
||||
from onyx.db.connector_credential_pair import get_connector_credential_pairs
|
||||
from onyx.db.document import get_documents_for_connector_credential_pair
|
||||
from onyx.db.engine import get_session_with_tenant
|
||||
from onyx.db.enums import ConnectorCredentialPairStatus
|
||||
from onyx.db.enums import SyncStatus
|
||||
from onyx.db.enums import SyncType
|
||||
from onyx.db.models import ConnectorCredentialPair
|
||||
from onyx.db.sync_record import insert_sync_record
|
||||
from onyx.db.sync_record import update_sync_record_status
|
||||
from onyx.redis.redis_connector import RedisConnector
|
||||
from onyx.redis.redis_pool import get_redis_client
|
||||
from onyx.utils.logger import pruning_ctx
|
||||
@@ -45,9 +40,6 @@ from onyx.utils.logger import setup_logger
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
"""Jobs / utils for kicking off pruning tasks."""
|
||||
|
||||
|
||||
def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool:
|
||||
"""Returns boolean indicating if pruning is due.
|
||||
|
||||
@@ -212,14 +204,6 @@ def try_creating_prune_generator_task(
|
||||
priority=OnyxCeleryPriority.LOW,
|
||||
)
|
||||
|
||||
# create before setting fence to avoid race condition where the monitoring
|
||||
# task updates the sync record before it is created
|
||||
insert_sync_record(
|
||||
db_session=db_session,
|
||||
entity_id=cc_pair.id,
|
||||
sync_type=SyncType.PRUNING,
|
||||
)
|
||||
|
||||
# set this only after all tasks have been added
|
||||
redis_connector.prune.set_fence(True)
|
||||
except Exception:
|
||||
@@ -364,52 +348,3 @@ def connector_pruning_generator_task(
|
||||
lock.release()
|
||||
|
||||
task_logger.info(f"Pruning generator finished: cc_pair={cc_pair_id}")
|
||||
|
||||
|
||||
"""Monitoring pruning utils, called in monitor_vespa_sync"""
|
||||
|
||||
|
||||
def monitor_ccpair_pruning_taskset(
|
||||
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
|
||||
) -> None:
|
||||
fence_key = key_bytes.decode("utf-8")
|
||||
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
|
||||
if cc_pair_id_str is None:
|
||||
task_logger.warning(
|
||||
f"monitor_ccpair_pruning_taskset: could not parse cc_pair_id from {fence_key}"
|
||||
)
|
||||
return
|
||||
|
||||
cc_pair_id = int(cc_pair_id_str)
|
||||
|
||||
redis_connector = RedisConnector(tenant_id, cc_pair_id)
|
||||
if not redis_connector.prune.fenced:
|
||||
return
|
||||
|
||||
initial = redis_connector.prune.generator_complete
|
||||
if initial is None:
|
||||
return
|
||||
|
||||
remaining = redis_connector.prune.get_remaining()
|
||||
task_logger.info(
|
||||
f"Connector pruning progress: cc_pair={cc_pair_id} remaining={remaining} initial={initial}"
|
||||
)
|
||||
if remaining > 0:
|
||||
return
|
||||
|
||||
mark_ccpair_as_pruned(int(cc_pair_id), db_session)
|
||||
task_logger.info(
|
||||
f"Successfully pruned connector credential pair. cc_pair={cc_pair_id}"
|
||||
)
|
||||
|
||||
update_sync_record_status(
|
||||
db_session=db_session,
|
||||
entity_id=cc_pair_id,
|
||||
sync_type=SyncType.PRUNING,
|
||||
sync_status=SyncStatus.SUCCESS,
|
||||
num_docs_synced=initial,
|
||||
)
|
||||
|
||||
redis_connector.prune.taskset_clear()
|
||||
redis_connector.prune.generator_clear()
|
||||
redis_connector.prune.set_fence(False)
|
||||
|
||||
@@ -24,10 +24,6 @@ from onyx.access.access import get_access_for_document
|
||||
from onyx.background.celery.apps.app_base import task_logger
|
||||
from onyx.background.celery.celery_redis import celery_get_queue_length
|
||||
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
|
||||
from onyx.background.celery.tasks.doc_permission_syncing.tasks import (
|
||||
monitor_ccpair_permissions_taskset,
|
||||
)
|
||||
from onyx.background.celery.tasks.pruning.tasks import monitor_ccpair_pruning_taskset
|
||||
from onyx.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex
|
||||
from onyx.background.celery.tasks.shared.tasks import LIGHT_SOFT_TIME_LIMIT
|
||||
from onyx.background.celery.tasks.shared.tasks import LIGHT_TIME_LIMIT
|
||||
@@ -38,6 +34,8 @@ from onyx.configs.constants import OnyxCeleryQueues
|
||||
from onyx.configs.constants import OnyxCeleryTask
|
||||
from onyx.configs.constants import OnyxRedisLocks
|
||||
from onyx.db.connector import fetch_connector_by_id
|
||||
from onyx.db.connector import mark_cc_pair_as_permissions_synced
|
||||
from onyx.db.connector import mark_ccpair_as_pruned
|
||||
from onyx.db.connector_credential_pair import add_deletion_failure_message
|
||||
from onyx.db.connector_credential_pair import (
|
||||
delete_connector_credential_pair__no_commit,
|
||||
@@ -74,6 +72,9 @@ from onyx.redis.redis_connector import RedisConnector
|
||||
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
|
||||
from onyx.redis.redis_connector_delete import RedisConnectorDelete
|
||||
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
|
||||
from onyx.redis.redis_connector_doc_perm_sync import (
|
||||
RedisConnectorPermissionSyncPayload,
|
||||
)
|
||||
from onyx.redis.redis_connector_index import RedisConnectorIndex
|
||||
from onyx.redis.redis_connector_prune import RedisConnectorPrune
|
||||
from onyx.redis.redis_document_set import RedisDocumentSet
|
||||
@@ -652,6 +653,83 @@ def monitor_connector_deletion_taskset(
|
||||
redis_connector.delete.reset()
|
||||
|
||||
|
||||
def monitor_ccpair_pruning_taskset(
|
||||
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
|
||||
) -> None:
|
||||
fence_key = key_bytes.decode("utf-8")
|
||||
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
|
||||
if cc_pair_id_str is None:
|
||||
task_logger.warning(
|
||||
f"monitor_ccpair_pruning_taskset: could not parse cc_pair_id from {fence_key}"
|
||||
)
|
||||
return
|
||||
|
||||
cc_pair_id = int(cc_pair_id_str)
|
||||
|
||||
redis_connector = RedisConnector(tenant_id, cc_pair_id)
|
||||
if not redis_connector.prune.fenced:
|
||||
return
|
||||
|
||||
initial = redis_connector.prune.generator_complete
|
||||
if initial is None:
|
||||
return
|
||||
|
||||
remaining = redis_connector.prune.get_remaining()
|
||||
task_logger.info(
|
||||
f"Connector pruning progress: cc_pair={cc_pair_id} remaining={remaining} initial={initial}"
|
||||
)
|
||||
if remaining > 0:
|
||||
return
|
||||
|
||||
mark_ccpair_as_pruned(int(cc_pair_id), db_session)
|
||||
task_logger.info(
|
||||
f"Successfully pruned connector credential pair. cc_pair={cc_pair_id}"
|
||||
)
|
||||
|
||||
redis_connector.prune.taskset_clear()
|
||||
redis_connector.prune.generator_clear()
|
||||
redis_connector.prune.set_fence(False)
|
||||
|
||||
|
||||
def monitor_ccpair_permissions_taskset(
|
||||
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
|
||||
) -> None:
|
||||
fence_key = key_bytes.decode("utf-8")
|
||||
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
|
||||
if cc_pair_id_str is None:
|
||||
task_logger.warning(
|
||||
f"monitor_ccpair_permissions_taskset: could not parse cc_pair_id from {fence_key}"
|
||||
)
|
||||
return
|
||||
|
||||
cc_pair_id = int(cc_pair_id_str)
|
||||
|
||||
redis_connector = RedisConnector(tenant_id, cc_pair_id)
|
||||
if not redis_connector.permissions.fenced:
|
||||
return
|
||||
|
||||
initial = redis_connector.permissions.generator_complete
|
||||
if initial is None:
|
||||
return
|
||||
|
||||
remaining = redis_connector.permissions.get_remaining()
|
||||
task_logger.info(
|
||||
f"Permissions sync progress: cc_pair={cc_pair_id} remaining={remaining} initial={initial}"
|
||||
)
|
||||
if remaining > 0:
|
||||
return
|
||||
|
||||
payload: RedisConnectorPermissionSyncPayload | None = (
|
||||
redis_connector.permissions.payload
|
||||
)
|
||||
start_time: datetime | None = payload.started if payload else None
|
||||
|
||||
mark_cc_pair_as_permissions_synced(db_session, int(cc_pair_id), start_time)
|
||||
task_logger.info(f"Successfully synced permissions for cc_pair={cc_pair_id}")
|
||||
|
||||
redis_connector.permissions.reset()
|
||||
|
||||
|
||||
def monitor_ccpair_indexing_taskset(
|
||||
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
|
||||
) -> None:
|
||||
|
||||
@@ -127,6 +127,13 @@ class SharepointConnector(LoadConnector, PollConnector):
|
||||
start: datetime | None = None,
|
||||
end: datetime | None = None,
|
||||
) -> list[tuple[DriveItem, str]]:
|
||||
filter_str = ""
|
||||
if start is not None and end is not None:
|
||||
filter_str = (
|
||||
f"last_modified_datetime ge {start.isoformat()} and "
|
||||
f"last_modified_datetime le {end.isoformat()}"
|
||||
)
|
||||
|
||||
final_driveitems: list[tuple[DriveItem, str]] = []
|
||||
try:
|
||||
site = self.graph_client.sites.get_by_url(site_descriptor.url)
|
||||
@@ -160,10 +167,9 @@ class SharepointConnector(LoadConnector, PollConnector):
|
||||
root_folder = root_folder.get_by_path(folder_part)
|
||||
|
||||
# Get all items recursively
|
||||
query = root_folder.get_files(
|
||||
recursive=True,
|
||||
page_size=1000,
|
||||
)
|
||||
query = root_folder.get_files(True, 1000)
|
||||
if filter_str:
|
||||
query = query.filter(filter_str)
|
||||
driveitems = query.execute_query()
|
||||
logger.debug(
|
||||
f"Found {len(driveitems)} items in drive '{drive.name}'"
|
||||
@@ -174,12 +180,11 @@ class SharepointConnector(LoadConnector, PollConnector):
|
||||
"Shared Documents" if drive.name == "Documents" else drive.name
|
||||
)
|
||||
|
||||
# Filter items based on folder path if specified
|
||||
if site_descriptor.folder_path:
|
||||
# Filter items to ensure they're in the specified folder or its subfolders
|
||||
# The path will be in format: /drives/{drive_id}/root:/folder/path
|
||||
driveitems = [
|
||||
item
|
||||
filtered_driveitems = [
|
||||
(item, drive_name)
|
||||
for item in driveitems
|
||||
if any(
|
||||
path_part == site_descriptor.folder_path
|
||||
@@ -191,7 +196,7 @@ class SharepointConnector(LoadConnector, PollConnector):
|
||||
)[1].split("/")
|
||||
)
|
||||
]
|
||||
if len(driveitems) == 0:
|
||||
if len(filtered_driveitems) == 0:
|
||||
all_paths = [
|
||||
item.parent_reference.path for item in driveitems
|
||||
]
|
||||
@@ -199,23 +204,11 @@ class SharepointConnector(LoadConnector, PollConnector):
|
||||
f"Nothing found for folder '{site_descriptor.folder_path}' "
|
||||
f"in; any of valid paths: {all_paths}"
|
||||
)
|
||||
|
||||
# Filter items based on time window if specified
|
||||
if start is not None and end is not None:
|
||||
driveitems = [
|
||||
item
|
||||
for item in driveitems
|
||||
if start
|
||||
<= item.last_modified_datetime.replace(tzinfo=timezone.utc)
|
||||
<= end
|
||||
]
|
||||
logger.debug(
|
||||
f"Found {len(driveitems)} items within time window in drive '{drive.name}'"
|
||||
final_driveitems.extend(filtered_driveitems)
|
||||
else:
|
||||
final_driveitems.extend(
|
||||
[(item, drive_name) for item in driveitems]
|
||||
)
|
||||
|
||||
for item in driveitems:
|
||||
final_driveitems.append((item, drive_name))
|
||||
|
||||
except Exception as e:
|
||||
# Some drives might not be accessible
|
||||
logger.warning(f"Failed to process drive: {str(e)}")
|
||||
|
||||
@@ -28,9 +28,6 @@ class SyncType(str, PyEnum):
|
||||
DOCUMENT_SET = "document_set"
|
||||
USER_GROUP = "user_group"
|
||||
CONNECTOR_DELETION = "connector_deletion"
|
||||
PRUNING = "pruning" # not really a sync, but close enough
|
||||
EXTERNAL_PERMISSIONS = "external_permissions"
|
||||
EXTERNAL_GROUP = "external_group"
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.value
|
||||
|
||||
@@ -11,7 +11,7 @@ from sqlalchemy import Select
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import update
|
||||
from sqlalchemy.orm import aliased
|
||||
from sqlalchemy.orm import selectinload
|
||||
from sqlalchemy.orm import joinedload
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.auth.schemas import UserRole
|
||||
@@ -291,9 +291,8 @@ def get_personas_for_user(
|
||||
include_deleted: bool = False,
|
||||
joinedload_all: bool = False,
|
||||
) -> Sequence[Persona]:
|
||||
stmt = select(Persona)
|
||||
stmt = _add_user_filters(stmt, user, get_editable)
|
||||
|
||||
stmt = select(Persona).distinct()
|
||||
stmt = _add_user_filters(stmt=stmt, user=user, get_editable=get_editable)
|
||||
if not include_default:
|
||||
stmt = stmt.where(Persona.builtin_persona.is_(False))
|
||||
if not include_slack_bot_personas:
|
||||
@@ -303,16 +302,14 @@ def get_personas_for_user(
|
||||
|
||||
if joinedload_all:
|
||||
stmt = stmt.options(
|
||||
selectinload(Persona.prompts),
|
||||
selectinload(Persona.tools),
|
||||
selectinload(Persona.document_sets),
|
||||
selectinload(Persona.groups),
|
||||
selectinload(Persona.users),
|
||||
selectinload(Persona.labels),
|
||||
joinedload(Persona.prompts),
|
||||
joinedload(Persona.tools),
|
||||
joinedload(Persona.document_sets),
|
||||
joinedload(Persona.groups),
|
||||
joinedload(Persona.users),
|
||||
)
|
||||
|
||||
results = db_session.execute(stmt).scalars().all()
|
||||
return results
|
||||
return db_session.execute(stmt).unique().scalars().all()
|
||||
|
||||
|
||||
def get_personas(db_session: Session) -> Sequence[Persona]:
|
||||
|
||||
@@ -92,7 +92,7 @@ class RedisConnectorPrune:
|
||||
if fence_bytes is None:
|
||||
return None
|
||||
|
||||
fence_int = int(cast(bytes, fence_bytes))
|
||||
fence_int = cast(int, fence_bytes)
|
||||
return fence_int
|
||||
|
||||
@generator_complete.setter
|
||||
|
||||
@@ -176,35 +176,3 @@ def test_sharepoint_connector_other_library(
|
||||
for expected in expected_documents:
|
||||
doc = find_document(found_documents, expected.semantic_identifier)
|
||||
verify_document_content(doc, expected)
|
||||
|
||||
|
||||
def test_sharepoint_connector_poll(
|
||||
mock_get_unstructured_api_key: MagicMock,
|
||||
sharepoint_credentials: dict[str, str],
|
||||
) -> None:
|
||||
# Initialize connector with the base site URL
|
||||
connector = SharepointConnector(
|
||||
sites=["https://danswerai.sharepoint.com/sites/sharepoint-tests"]
|
||||
)
|
||||
|
||||
# Load credentials
|
||||
connector.load_credentials(sharepoint_credentials)
|
||||
|
||||
# Set time window to only capture test1.docx (modified at 2025-01-28 20:51:42+00:00)
|
||||
start = datetime(2025, 1, 28, 20, 51, 30, tzinfo=timezone.utc) # 12 seconds before
|
||||
end = datetime(2025, 1, 28, 20, 51, 50, tzinfo=timezone.utc) # 8 seconds after
|
||||
|
||||
# Get documents within the time window
|
||||
document_batches = list(connector._fetch_from_sharepoint(start=start, end=end))
|
||||
found_documents: list[Document] = [
|
||||
doc for batch in document_batches for doc in batch
|
||||
]
|
||||
|
||||
# Should only find test1.docx
|
||||
assert len(found_documents) == 1, "Should only find one document in the time window"
|
||||
doc = found_documents[0]
|
||||
assert doc.semantic_identifier == "test1.docx"
|
||||
verify_document_metadata(doc)
|
||||
verify_document_content(
|
||||
doc, [d for d in EXPECTED_DOCUMENTS if d.semantic_identifier == "test1.docx"][0]
|
||||
)
|
||||
|
||||
@@ -232,9 +232,11 @@ export function AssistantEditor({
|
||||
existingPersona?.llm_model_provider_override ?? null,
|
||||
llm_model_version_override:
|
||||
existingPersona?.llm_model_version_override ?? null,
|
||||
starter_messages: existingPersona?.starter_messages?.length
|
||||
? existingPersona.starter_messages
|
||||
: [{ message: "" }],
|
||||
starter_messages: existingPersona?.starter_messages ?? [
|
||||
{
|
||||
message: "",
|
||||
},
|
||||
],
|
||||
enabled_tools_map: enabledToolsMap,
|
||||
icon_color: existingPersona?.icon_color ?? defautIconColor,
|
||||
icon_shape: existingPersona?.icon_shape ?? defaultIconShape,
|
||||
@@ -1097,9 +1099,7 @@ export function AssistantEditor({
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<Separator />
|
||||
|
||||
<div className="w-full flex flex-col">
|
||||
<div className="flex gap-x-2 items-center">
|
||||
<div className="block font-medium text-sm">
|
||||
@@ -1110,7 +1110,6 @@ export function AssistantEditor({
|
||||
<SubLabel>
|
||||
Sample messages that help users understand what this
|
||||
assistant can do and how to interact with it effectively.
|
||||
New input fields will appear automatically as you type.
|
||||
</SubLabel>
|
||||
|
||||
<div className="w-full">
|
||||
|
||||
@@ -64,16 +64,19 @@ export default function StarterMessagesList({
|
||||
size="icon"
|
||||
onClick={() => {
|
||||
arrayHelpers.remove(index);
|
||||
if (
|
||||
index === values.length - 2 &&
|
||||
!values[values.length - 1].message
|
||||
) {
|
||||
arrayHelpers.pop();
|
||||
}
|
||||
}}
|
||||
className={`text-gray-400 hover:text-red-500 ${
|
||||
index === values.length - 1 && !starterMessage.message
|
||||
? "opacity-50 cursor-not-allowed"
|
||||
: ""
|
||||
}`}
|
||||
disabled={
|
||||
(index === values.length - 1 && !starterMessage.message) ||
|
||||
(values.length === 1 && index === 0) // should never happen, but just in case
|
||||
}
|
||||
disabled={index === values.length - 1 && !starterMessage.message}
|
||||
>
|
||||
<FiTrash2 className="h-4 w-4" />
|
||||
</Button>
|
||||
|
||||
@@ -275,7 +275,6 @@ export const HistorySidebar = forwardRef<HTMLDivElement, HistorySidebarProps>(
|
||||
flex-col relative
|
||||
h-screen
|
||||
pt-2
|
||||
|
||||
transition-transform
|
||||
`}
|
||||
>
|
||||
|
||||
Reference in New Issue
Block a user