Compare commits

...

6 Commits

Author SHA1 Message Date
pablodanswer
691cfc5f92 update 2025-01-22 14:17:49 -08:00
pablodanswer
0fec9efb2b va 2025-01-22 14:05:06 -08:00
pablodanswer
e9ee8933ac temporarily turn off 2025-01-22 13:38:16 -08:00
pablodanswer
b930259054 remove prints 2025-01-22 13:23:40 -08:00
pablodanswer
7eaf5d2f01 update config 2025-01-22 13:23:25 -08:00
pablodanswer
4681fbca11 add logs + update dev script 2025-01-22 13:22:16 -08:00
4 changed files with 65 additions and 15 deletions

View File

@@ -83,7 +83,7 @@ tasks_to_schedule = [
"task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES,
"schedule": timedelta(minutes=5),
"options": {
"priority": OnyxCeleryPriority.LOW,
"priority": OnyxCeleryPriority.HIGH,
"expires": BEAT_EXPIRES_DEFAULT,
"queue": OnyxCeleryQueues.MONITORING,
},

View File

@@ -162,6 +162,7 @@ def _build_connector_start_latency_metric(
# Connector start latency
# first run case - we should start as soon as it's created
if not second_most_recent_attempt:
task_logger.info("No second most recent attempt, using connector creation time")
desired_start_time = cc_pair.connector.time_created
else:
if not cc_pair.connector.refresh_freq:
@@ -171,12 +172,25 @@ def _build_connector_start_latency_metric(
)
return None
task_logger.info(
"Using second most recent attempt time: "
f"{second_most_recent_attempt.time_updated} with id: {second_most_recent_attempt.id}"
)
task_logger.info(
f"Using connector refresh freq: {cc_pair.connector.refresh_freq}"
)
desired_start_time = second_most_recent_attempt.time_updated + timedelta(
seconds=cc_pair.connector.refresh_freq
)
task_logger.info(f"Desired start time: {desired_start_time}")
start_latency = (recent_attempt.time_started - desired_start_time).total_seconds()
task_logger.info(
f"Calculated start latency for index attempt {recent_attempt.id}: {start_latency} seconds"
)
return Metric(
key=metric_key,
name="connector_start_latency",
@@ -210,6 +224,9 @@ def _build_run_success_metrics(
IndexingStatus.FAILED,
IndexingStatus.CANCELED,
]:
task_logger.info(
f"Adding run success metric for index attempt {attempt.id} with status {attempt.status}"
)
metrics.append(
Metric(
key=metric_key,
@@ -235,11 +252,9 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me
# Get all attempts in the last hour
recent_attempts = (
db_session.query(IndexAttempt)
.filter(
IndexAttempt.connector_credential_pair_id == cc_pair.id,
IndexAttempt.time_created >= one_hour_ago,
)
.filter(IndexAttempt.connector_credential_pair_id == cc_pair.id)
.order_by(IndexAttempt.time_created.desc())
.limit(2)
.all()
)
most_recent_attempt = recent_attempts[0] if recent_attempts else None
@@ -248,7 +263,10 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me
)
# if no metric to emit, skip
if most_recent_attempt is None:
if (
most_recent_attempt is None
or one_hour_ago > most_recent_attempt.time_created
):
continue
# Connector start latency
@@ -291,7 +309,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
f"{sync_record.entity_id}:{sync_record.id}"
)
if _has_metric_been_emitted(redis_std, metric_key):
task_logger.debug(
task_logger.info(
f"Skipping metric for sync record {sync_record.id} "
"because it has already been emitted"
)
@@ -311,11 +329,15 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
if sync_speed is None:
task_logger.error(
"Something went wrong with sync speed calculation. "
f"Sync record: {sync_record.id}"
f"Something went wrong with sync speed calculation. "
f"Sync record: {sync_record.id}, duration: {sync_duration_mins}, "
f"docs synced: {sync_record.num_docs_synced}"
)
continue
task_logger.info(
f"Calculated sync speed for record {sync_record.id}: {sync_speed} docs/min"
)
metrics.append(
Metric(
key=metric_key,
@@ -334,7 +356,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
f":{sync_record.entity_id}:{sync_record.id}"
)
if _has_metric_been_emitted(redis_std, start_latency_key):
task_logger.debug(
task_logger.info(
f"Skipping start latency metric for sync record {sync_record.id} "
"because it has already been emitted"
)
@@ -352,7 +374,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
)
else:
# Skip other sync types
task_logger.debug(
task_logger.info(
f"Skipping sync record {sync_record.id} "
f"with type {sync_record.sync_type} "
f"and id {sync_record.entity_id} "
@@ -371,12 +393,15 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]
start_latency = (
sync_record.sync_start_time - entity.time_last_modified_by_user
).total_seconds()
task_logger.info(
f"Calculated start latency for sync record {sync_record.id}: {start_latency} seconds"
)
if start_latency < 0:
task_logger.error(
f"Start latency is negative for sync record {sync_record.id} "
f"with type {sync_record.sync_type} and id {sync_record.entity_id}."
"This is likely because the entity was updated between the time the "
"time the sync finished and this job ran. Skipping."
f"with type {sync_record.sync_type} and id {sync_record.entity_id}. "
f"Sync start time: {sync_record.sync_start_time}, "
f"Entity last modified: {entity.time_last_modified_by_user}"
)
continue

View File

@@ -72,6 +72,19 @@ def run_jobs() -> None:
"--queues=connector_indexing",
]
cmd_worker_monitoring = [
"celery",
"-A",
"onyx.background.celery.versioned_apps.monitoring",
"worker",
"--pool=threads",
"--concurrency=1",
"--prefetch-multiplier=1",
"--loglevel=INFO",
"--hostname=monitoring@%n",
"--queues=monitoring",
]
cmd_beat = [
"celery",
"-A",
@@ -97,6 +110,13 @@ def run_jobs() -> None:
cmd_worker_indexing, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)
worker_monitoring_process = subprocess.Popen(
cmd_worker_monitoring,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
beat_process = subprocess.Popen(
cmd_beat, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)
@@ -114,18 +134,23 @@ def run_jobs() -> None:
worker_indexing_thread = threading.Thread(
target=monitor_process, args=("INDEX", worker_indexing_process)
)
worker_monitoring_thread = threading.Thread(
target=monitor_process, args=("MONITORING", worker_monitoring_process)
)
beat_thread = threading.Thread(target=monitor_process, args=("BEAT", beat_process))
worker_primary_thread.start()
worker_light_thread.start()
worker_heavy_thread.start()
worker_indexing_thread.start()
worker_monitoring_thread.start()
beat_thread.start()
worker_primary_thread.join()
worker_light_thread.join()
worker_heavy_thread.join()
worker_indexing_thread.join()
worker_monitoring_thread.join()
beat_thread.join()

View File

@@ -5,7 +5,7 @@ if (process.env.NEXT_PUBLIC_SENTRY_DSN) {
dsn: process.env.NEXT_PUBLIC_SENTRY_DSN,
// Capture unhandled exceptions and performance data
enableTracing: true,
enableTracing: false,
integrations: [],
tracesSampleRate: 0.1,
});