Compare commits

16 Commits

Author SHA1 Message Date
pablodanswer 2f350ac209 minutes -> seconds 2025-01-25 17:57:10 -08:00
pablodanswer 76b97b0e06 update 2025-01-25 17:56:25 -08:00
pablodanswer de98775b43 improvements to monitoring 2025-01-25 17:44:21 -08:00
pablodanswer 95b89863e4 functioning 2025-01-25 14:47:43 -08:00
pablodanswer 25c1f16e5b quick nit 2025-01-25 14:31:47 -08:00
pablodanswer 8822b37dad update values 2025-01-25 14:29:17 -08:00
pablodanswer ee5752f3d5 fix typing 2025-01-25 14:04:09 -08:00
pablodanswer ebf06ee528 quick nit 2025-01-25 14:02:27 -08:00
pablodanswer d56a089370 connector deletion 2025-01-25 13:50:49 -08:00
pablodanswer b6fab0687a quick updates to monitoring 2025-01-25 13:47:26 -08:00
pablodanswer 3ada740f3c typing 2025-01-24 13:36:46 -08:00
pablodanswer d8e9e56526 additional comment for clarity 2025-01-24 13:35:13 -08:00
pablodanswer 9439628890 minor improvments / clarity 2025-01-24 13:33:53 -08:00
pablonyx 9b19990764 Input shortcut fix in multi tenant case (#3768) 2025-01-24 20:40:08 +00:00
  * validated fix
  * nit
  * k
Chris Weaver 5d6a18f358 Add support for more /models/list formats (#3739) 2025-01-24 18:25:19 +00:00
pablonyx 3c37764974 Allow all LLMs for image generation assistants (#3730) 2025-01-24 18:23:55 +00:00
  * Allow all LLMs for image generation assistants
  * ensure pushed
  * update color + assistant -> model
  * update prompt
  * fix silly conditional
21 changed files with 637 additions and 283 deletions

View File

@@ -221,7 +221,7 @@ if not MULTI_TENANT:
{
"name": "monitor-background-processes",
"task": OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES,
"schedule": timedelta(minutes=5),
"schedule": timedelta(minutes=15),
"options": {
"priority": OnyxCeleryPriority.LOW,
"expires": BEAT_EXPIRES_DEFAULT,

View File

@@ -139,13 +139,6 @@ def try_generate_document_cc_pair_cleanup_tasks(
submitted=datetime.now(timezone.utc),
)
# create before setting fence to avoid race condition where the monitoring
# task updates the sync record before it is created
insert_sync_record(
db_session=db_session,
entity_id=cc_pair_id,
sync_type=SyncType.CONNECTOR_DELETION,
)
redis_connector.delete.set_fence(fence_payload)
try:
@@ -184,6 +177,13 @@ def try_generate_document_cc_pair_cleanup_tasks(
)
if tasks_generated is None:
raise ValueError("RedisConnectorDeletion.generate_tasks returned None")
insert_sync_record(
db_session=db_session,
entity_id=cc_pair_id,
sync_type=SyncType.CONNECTOR_DELETION,
)
except TaskDependencyError:
redis_connector.delete.set_fence(None)
raise

View File

@@ -14,8 +14,16 @@ from onyx.db.models import LLMProvider
def _process_model_list_response(model_list_json: Any) -> list[str]:
# Handle case where response is wrapped in a "data" field
if isinstance(model_list_json, dict) and "data" in model_list_json:
model_list_json = model_list_json["data"]
if isinstance(model_list_json, dict):
if "data" in model_list_json:
model_list_json = model_list_json["data"]
elif "models" in model_list_json:
model_list_json = model_list_json["models"]
else:
raise ValueError(
"Invalid response from API - expected dict with 'data' or "
f"'models' field, got {type(model_list_json)}"
)
if not isinstance(model_list_json, list):
raise ValueError(
@@ -27,11 +35,18 @@ def _process_model_list_response(model_list_json: Any) -> list[str]:
for item in model_list_json:
if isinstance(item, str):
model_names.append(item)
elif isinstance(item, dict) and "model_name" in item:
model_names.append(item["model_name"])
elif isinstance(item, dict):
if "model_name" in item:
model_names.append(item["model_name"])
elif "id" in item:
model_names.append(item["id"])
else:
raise ValueError(
f"Invalid item in model list - expected dict with model_name or id, got {type(item)}"
)
else:
raise ValueError(
f"Invalid item in model list - expected string or dict with model_name, got {type(item)}"
f"Invalid item in model list - expected string or dict, got {type(item)}"
)
return model_names
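
A minimal illustration (not part of the diff) of the response shapes the updated parser accepts; the model names are just example values:

_process_model_list_response(["gpt-4", "gpt-3.5-turbo"])             # plain list of names
_process_model_list_response({"data": [{"id": "gpt-4"}]})            # "data" wrapper with "id" items
_process_model_list_response({"models": [{"model_name": "gpt-4"}]})  # "models" wrapper with "model_name" items
# each call returns a flat list[str]; any other shape raises ValueError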

View File

@@ -4,6 +4,7 @@ from collections.abc import Callable
from datetime import timedelta
from itertools import islice
from typing import Any
from typing import Literal
from celery import shared_task
from celery import Task
@@ -26,6 +27,7 @@ from onyx.db.engine import get_all_tenant_ids
from onyx.db.engine import get_db_current_time
from onyx.db.engine import get_session_with_tenant
from onyx.db.enums import IndexingStatus
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import DocumentSet
@@ -38,6 +40,7 @@ from onyx.redis.redis_pool import redis_lock_dump
from onyx.utils.telemetry import optional_telemetry
from onyx.utils.telemetry import RecordType
_MONITORING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes
_MONITORING_TIME_LIMIT = _MONITORING_SOFT_TIME_LIMIT + 60 # 6 minutes
@@ -49,6 +52,12 @@ _CONNECTOR_INDEX_ATTEMPT_RUN_SUCCESS_KEY_FMT = (
"monitoring_connector_index_attempt_run_success:{cc_pair_id}:{index_attempt_id}"
)
_FINAL_METRIC_KEY_FMT = "sync_final_metrics:{sync_type}:{entity_id}:{sync_record_id}"
_SYNC_START_LATENCY_KEY_FMT = (
"sync_start_latency:{sync_type}:{entity_id}:{sync_record_id}"
)
def _mark_metric_as_emitted(redis_std: Redis, key: str) -> None:
"""Mark a metric as having been emitted by setting a Redis key with expiration"""
@@ -111,6 +120,7 @@ class Metric(BaseModel):
}.items()
if v is not None
}
task_logger.info(f"Emitting metric: {data}")
optional_telemetry(
record_type=RecordType.METRIC,
data=data,
@@ -189,237 +199,368 @@ def _build_connector_start_latency_metric(
f"Start latency for index attempt {recent_attempt.id}: {start_latency:.2f}s "
f"(desired: {desired_start_time}, actual: {recent_attempt.time_started})"
)
job_id = build_job_id("connector", str(cc_pair.id), str(recent_attempt.id))
return Metric(
key=metric_key,
name="connector_start_latency",
value=start_latency,
tags={},
tags={
"job_id": job_id,
"connector_id": str(cc_pair.connector.id),
"source": str(cc_pair.connector.source),
},
)
def _build_run_success_metrics(
def _build_connector_final_metrics(
cc_pair: ConnectorCredentialPair,
recent_attempts: list[IndexAttempt],
redis_std: Redis,
) -> list[Metric]:
"""
Final metrics for connector index attempts:
- Boolean success/fail metric
- If success, emit:
* duration (seconds)
* doc_count
"""
metrics = []
for attempt in recent_attempts:
metric_key = _CONNECTOR_INDEX_ATTEMPT_RUN_SUCCESS_KEY_FMT.format(
cc_pair_id=cc_pair.id,
index_attempt_id=attempt.id,
)
if _has_metric_been_emitted(redis_std, metric_key):
task_logger.info(
f"Skipping metric for connector {cc_pair.connector.id} "
f"index attempt {attempt.id} because it has already been "
"emitted"
f"Skipping final metrics for connector {cc_pair.connector.id} "
f"index attempt {attempt.id}, already emitted."
)
continue
if attempt.status in [
# We only emit final metrics if the attempt is in a terminal state
if attempt.status not in [
IndexingStatus.SUCCESS,
IndexingStatus.FAILED,
IndexingStatus.CANCELED,
]:
task_logger.info(
f"Adding run success metric for index attempt {attempt.id} with status {attempt.status}"
# Not finished; skip
continue
job_id = build_job_id("connector", str(cc_pair.id), str(attempt.id))
success = attempt.status == IndexingStatus.SUCCESS
metrics.append(
Metric(
key=metric_key, # We'll mark the same key for any final metrics
name="connector_run_succeeded",
value=success,
tags={
"job_id": job_id,
"connector_id": str(cc_pair.connector.id),
"source": str(cc_pair.connector.source),
"status": attempt.status.value,
},
)
)
if success:
# Make sure we have valid time_started
if attempt.time_started and attempt.time_updated:
duration_seconds = (
attempt.time_updated - attempt.time_started
).total_seconds()
metrics.append(
Metric(
key=None, # No need for a new key, or you can reuse the same if you prefer
name="connector_index_duration_seconds",
value=duration_seconds,
tags={
"job_id": job_id,
"connector_id": str(cc_pair.connector.id),
"source": str(cc_pair.connector.source),
},
)
)
else:
task_logger.error(
f"Index attempt {attempt.id} succeeded but has missing time "
f"(time_started={attempt.time_started}, time_updated={attempt.time_updated})."
)
# For doc counts, choose whichever field is more relevant
doc_count = attempt.total_docs_indexed or 0
metrics.append(
Metric(
key=metric_key,
name="connector_run_succeeded",
value=attempt.status == IndexingStatus.SUCCESS,
tags={"source": str(cc_pair.connector.source)},
key=None,
name="connector_index_doc_count",
value=doc_count,
tags={
"job_id": job_id,
"connector_id": str(cc_pair.connector.id),
"source": str(cc_pair.connector.source),
},
)
)
_mark_metric_as_emitted(redis_std, metric_key)
return metrics
def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Metric]:
"""Collect metrics about connector runs from the past hour"""
# NOTE: use get_db_current_time since the IndexAttempt times are set based on DB time
one_hour_ago = get_db_current_time(db_session) - timedelta(hours=1)
# Get all connector credential pairs
cc_pairs = db_session.scalars(select(ConnectorCredentialPair)).all()
# Might be more than one search setting, or just one
active_search_settings = get_active_search_settings(db_session)
metrics = []
for cc_pair, search_settings in zip(cc_pairs, active_search_settings):
recent_attempts = (
db_session.query(IndexAttempt)
.filter(
IndexAttempt.connector_credential_pair_id == cc_pair.id,
IndexAttempt.search_settings_id == search_settings.id,
# If you want to process each cc_pair against each search setting:
for cc_pair in cc_pairs:
for search_settings in active_search_settings:
recent_attempts = (
db_session.query(IndexAttempt)
.filter(
IndexAttempt.connector_credential_pair_id == cc_pair.id,
IndexAttempt.search_settings_id == search_settings.id,
)
.order_by(IndexAttempt.time_created.desc())
.limit(2)
.all()
)
.order_by(IndexAttempt.time_created.desc())
.limit(2)
.all()
)
if not recent_attempts:
continue
most_recent_attempt = recent_attempts[0]
second_most_recent_attempt = (
recent_attempts[1] if len(recent_attempts) > 1 else None
)
if not recent_attempts:
continue
if one_hour_ago > most_recent_attempt.time_created:
continue
most_recent_attempt = recent_attempts[0]
second_most_recent_attempt = (
recent_attempts[1] if len(recent_attempts) > 1 else None
)
# Connector start latency
start_latency_metric = _build_connector_start_latency_metric(
cc_pair, most_recent_attempt, second_most_recent_attempt, redis_std
)
if start_latency_metric:
metrics.append(start_latency_metric)
if one_hour_ago > most_recent_attempt.time_created:
continue
# Connector run success/failure
run_success_metrics = _build_run_success_metrics(
cc_pair, recent_attempts, redis_std
)
metrics.extend(run_success_metrics)
# Connector start latency
start_latency_metric = _build_connector_start_latency_metric(
cc_pair, most_recent_attempt, second_most_recent_attempt, redis_std
)
if start_latency_metric:
metrics.append(start_latency_metric)
# Connector run success/failure
final_metrics = _build_connector_final_metrics(
cc_pair, recent_attempts, redis_std
)
metrics.extend(final_metrics)
return metrics
def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric]:
"""Collect metrics about document set and group syncing speed"""
# NOTE: use get_db_current_time since the SyncRecord times are set based on DB time
"""
Collect metrics for document set and group syncing:
- Success/failure status
- Start latency (always)
- Duration & doc count (only if success)
- Throughput (docs/min) (only if success)
"""
one_hour_ago = get_db_current_time(db_session) - timedelta(hours=1)
# Get all sync records from the last hour
# Get all sync records that ended in the last hour
recent_sync_records = db_session.scalars(
select(SyncRecord)
.where(SyncRecord.sync_start_time >= one_hour_ago)
.order_by(SyncRecord.sync_start_time.desc())
.where(SyncRecord.sync_end_time.isnot(None))
.where(SyncRecord.sync_end_time >= one_hour_ago)
.order_by(SyncRecord.sync_end_time.desc())
).all()
task_logger.info(
f"Collecting sync metrics for {len(recent_sync_records)} sync records"
)
metrics = []
for sync_record in recent_sync_records:
# Skip if no end time (sync still in progress)
if not sync_record.sync_end_time:
continue
# Build a job_id for correlation
job_id = build_job_id("sync_record", str(sync_record.id))
# Check if we already emitted a metric for this sync record
metric_key = (
f"sync_speed:{sync_record.sync_type}:"
f"{sync_record.entity_id}:{sync_record.id}"
# Emit a SUCCESS/FAIL boolean metric
# Use a single Redis key to avoid re-emitting final metrics
final_metric_key = _FINAL_METRIC_KEY_FMT.format(
sync_type=sync_record.sync_type,
entity_id=sync_record.entity_id,
sync_record_id=sync_record.id,
)
if _has_metric_been_emitted(redis_std, metric_key):
task_logger.info(
f"Skipping metric for sync record {sync_record.id} "
"because it has already been emitted"
if not _has_metric_been_emitted(redis_std, final_metric_key):
# Evaluate success
sync_succeeded = sync_record.sync_status == SyncStatus.SUCCESS
metrics.append(
Metric(
key=final_metric_key,
name="sync_run_succeeded",
value=sync_succeeded,
tags={
"job_id": job_id,
"sync_type": str(sync_record.sync_type),
"status": str(sync_record.sync_status),
},
)
)
continue
# Calculate sync duration in minutes
sync_duration_mins = (
sync_record.sync_end_time - sync_record.sync_start_time
).total_seconds() / 60.0
# If successful, emit additional metrics
if sync_succeeded:
if sync_record.sync_end_time and sync_record.sync_start_time:
duration_seconds = (
sync_record.sync_end_time - sync_record.sync_start_time
).total_seconds()
else:
task_logger.error(
f"Invalid times for sync record {sync_record.id}: "
f"start={sync_record.sync_start_time}, end={sync_record.sync_end_time}"
)
duration_seconds = None
# Calculate sync speed (docs/min) - avoid division by zero
sync_speed = (
sync_record.num_docs_synced / sync_duration_mins
if sync_duration_mins > 0
else None
doc_count = sync_record.num_docs_synced or 0
sync_speed = None
if duration_seconds and duration_seconds > 0:
duration_mins = duration_seconds / 60.0
sync_speed = (
doc_count / duration_mins if duration_mins > 0 else None
)
# Emit duration, doc count, speed
if duration_seconds is not None:
metrics.append(
Metric(
key=None,
name="sync_duration_seconds",
value=duration_seconds,
tags={
"job_id": job_id,
"sync_type": str(sync_record.sync_type),
},
)
)
else:
task_logger.error(
f"Invalid sync record {sync_record.id} with no duration"
)
metrics.append(
Metric(
key=None,
name="sync_doc_count",
value=doc_count,
tags={
"job_id": job_id,
"sync_type": str(sync_record.sync_type),
},
)
)
if sync_speed is not None:
metrics.append(
Metric(
key=None,
name="sync_speed_docs_per_min",
value=sync_speed,
tags={
"job_id": job_id,
"sync_type": str(sync_record.sync_type),
},
)
)
else:
task_logger.error(
f"Invalid sync record {sync_record.id} with no duration"
)
# Mark final metrics as emitted so we don't re-emit
_mark_metric_as_emitted(redis_std, final_metric_key)
# Emit start latency
start_latency_key = _SYNC_START_LATENCY_KEY_FMT.format(
sync_type=sync_record.sync_type,
entity_id=sync_record.entity_id,
sync_record_id=sync_record.id,
)
if not _has_metric_been_emitted(redis_std, start_latency_key):
# Get the entity's last update time based on sync type
entity: DocumentSet | UserGroup | None = None
if sync_record.sync_type == SyncType.DOCUMENT_SET:
entity = db_session.scalar(
select(DocumentSet).where(DocumentSet.id == sync_record.entity_id)
)
elif sync_record.sync_type == SyncType.USER_GROUP:
entity = db_session.scalar(
select(UserGroup).where(UserGroup.id == sync_record.entity_id)
)
else:
task_logger.info(
f"Skipping sync record {sync_record.id} of type {sync_record.sync_type}."
)
continue
if sync_speed is None:
task_logger.error(
f"Something went wrong with sync speed calculation. "
f"Sync record: {sync_record.id}, duration: {sync_duration_mins}, "
f"docs synced: {sync_record.num_docs_synced}"
)
continue
if entity is None:
task_logger.error(
f"Could not find entity for sync record {sync_record.id} "
f"(type={sync_record.sync_type}, id={sync_record.entity_id})."
)
continue
task_logger.info(
f"Calculated sync speed for record {sync_record.id}: {sync_speed} docs/min"
)
metrics.append(
Metric(
key=metric_key,
name="sync_speed_docs_per_min",
value=sync_speed,
tags={
"sync_type": str(sync_record.sync_type),
"status": str(sync_record.sync_status),
},
)
)
# Calculate start latency in seconds:
# (actual sync start) - (last modified time)
if entity.time_last_modified_by_user and sync_record.sync_start_time:
start_latency = (
sync_record.sync_start_time - entity.time_last_modified_by_user
).total_seconds()
# Add sync start latency metric
start_latency_key = (
f"sync_start_latency:{sync_record.sync_type}"
f":{sync_record.entity_id}:{sync_record.id}"
)
if _has_metric_been_emitted(redis_std, start_latency_key):
task_logger.info(
f"Skipping start latency metric for sync record {sync_record.id} "
"because it has already been emitted"
)
continue
if start_latency < 0:
task_logger.error(
f"Negative start latency for sync record {sync_record.id} "
f"(start={sync_record.sync_start_time}, entity_modified={entity.time_last_modified_by_user})"
)
continue
# Get the entity's last update time based on sync type
entity: DocumentSet | UserGroup | None = None
if sync_record.sync_type == SyncType.DOCUMENT_SET:
entity = db_session.scalar(
select(DocumentSet).where(DocumentSet.id == sync_record.entity_id)
)
elif sync_record.sync_type == SyncType.USER_GROUP:
entity = db_session.scalar(
select(UserGroup).where(UserGroup.id == sync_record.entity_id)
)
else:
# Skip other sync types
task_logger.info(
f"Skipping sync record {sync_record.id} "
f"with type {sync_record.sync_type} "
f"and id {sync_record.entity_id} "
"because it is not a document set or user group"
)
continue
metrics.append(
Metric(
key=start_latency_key,
name="sync_start_latency_seconds",
value=start_latency,
tags={
"job_id": job_id,
"sync_type": str(sync_record.sync_type),
},
)
)
if entity is None:
task_logger.error(
f"Could not find entity for sync record {sync_record.id} "
f"with type {sync_record.sync_type} and id {sync_record.entity_id}"
)
continue
# Calculate start latency in seconds
start_latency = (
sync_record.sync_start_time - entity.time_last_modified_by_user
).total_seconds()
task_logger.info(
f"Calculated start latency for sync record {sync_record.id}: {start_latency} seconds"
)
if start_latency < 0:
task_logger.error(
f"Start latency is negative for sync record {sync_record.id} "
f"with type {sync_record.sync_type} and id {sync_record.entity_id}. "
f"Sync start time: {sync_record.sync_start_time}, "
f"Entity last modified: {entity.time_last_modified_by_user}"
)
continue
metrics.append(
Metric(
key=start_latency_key,
name="sync_start_latency_seconds",
value=start_latency,
tags={
"sync_type": str(sync_record.sync_type),
},
)
)
_mark_metric_as_emitted(redis_std, start_latency_key)
return metrics
def build_job_id(
job_type: Literal["connector", "sync_record"],
primary_id: str,
secondary_id: str | None = None,
) -> str:
if job_type == "connector":
if secondary_id is None:
raise ValueError(
"secondary_id (attempt_id) is required for connector job_type"
)
return f"connector:{primary_id}:attempt:{secondary_id}"
elif job_type == "sync_record":
return f"sync_record:{primary_id}"
@shared_task(
name=OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES,
soft_time_limit=_MONITORING_SOFT_TIME_LIMIT,
@@ -459,6 +600,7 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None:
lambda: _collect_connector_metrics(db_session, redis_std),
lambda: _collect_sync_metrics(db_session, redis_std),
]
# Collect and log each metric
with get_session_with_tenant(tenant_id) as db_session:
for metric_fn in metric_functions:
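
As a rough sketch (not part of the diff), the job_id values produced by build_job_id and attached as tags look like this:

build_job_id("connector", "42", "7")   # -> "connector:42:attempt:7"
build_job_id("sync_record", "123")     # -> "sync_record:123"
# the same job_id tag is attached to the start-latency, duration, doc-count, and
# success metrics, so a single run can be correlated across all of them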

View File

@@ -15,6 +15,7 @@ from onyx.llm.models import PreviousMessage
from onyx.llm.utils import build_content_with_imgs
from onyx.llm.utils import check_message_tokens
from onyx.llm.utils import message_to_prompt_and_imgs
from onyx.llm.utils import model_supports_image_input
from onyx.natural_language_processing.utils import get_tokenizer
from onyx.prompts.chat_prompts import CHAT_USER_CONTEXT_FREE_PROMPT
from onyx.prompts.direct_qa_prompts import HISTORY_BLOCK
@@ -90,6 +91,7 @@ class AnswerPromptBuilder:
provider_type=llm_config.model_provider,
model_name=llm_config.model_name,
)
self.llm_config = llm_config
self.llm_tokenizer_encode_func = cast(
Callable[[str], list[int]], llm_tokenizer.encode
)
@@ -98,12 +100,21 @@ class AnswerPromptBuilder:
(
self.message_history,
self.history_token_cnts,
) = translate_history_to_basemessages(message_history)
) = translate_history_to_basemessages(
message_history,
exclude_images=not model_supports_image_input(
self.llm_config.model_name,
self.llm_config.model_provider,
),
)
self.system_message_and_token_cnt: tuple[SystemMessage, int] | None = None
self.user_message_and_token_cnt = (
user_message,
check_message_tokens(user_message, self.llm_tokenizer_encode_func),
check_message_tokens(
user_message,
self.llm_tokenizer_encode_func,
),
)
self.new_messages_and_token_cnts: list[tuple[BaseMessage, int]] = []

View File

@@ -11,6 +11,7 @@ from onyx.llm.utils import build_content_with_imgs
def translate_onyx_msg_to_langchain(
msg: ChatMessage | PreviousMessage,
exclude_images: bool = False,
) -> BaseMessage:
files: list[InMemoryChatFile] = []
@@ -18,7 +19,9 @@ def translate_onyx_msg_to_langchain(
# attached. Just ignore them for now.
if not isinstance(msg, ChatMessage):
files = msg.files
content = build_content_with_imgs(msg.message, files, message_type=msg.message_type)
content = build_content_with_imgs(
msg.message, files, message_type=msg.message_type, exclude_images=exclude_images
)
if msg.message_type == MessageType.SYSTEM:
raise ValueError("System messages are not currently part of history")
@@ -32,9 +35,12 @@ def translate_onyx_msg_to_langchain(
def translate_history_to_basemessages(
history: list[ChatMessage] | list["PreviousMessage"],
exclude_images: bool = False,
) -> tuple[list[BaseMessage], list[int]]:
history_basemessages = [
translate_onyx_msg_to_langchain(msg) for msg in history if msg.token_count != 0
translate_onyx_msg_to_langchain(msg, exclude_images)
for msg in history
if msg.token_count != 0
]
history_token_counts = [msg.token_count for msg in history if msg.token_count != 0]
return history_basemessages, history_token_counts

View File

@@ -193,13 +193,13 @@ def fetch_input_prompts_by_user(
"""
Returns all prompts belonging to the user or public prompts,
excluding those the user has specifically disabled.
Also, if `user_id` is None and AUTH_TYPE is DISABLED, then all prompts are returned.
"""
# Start with a basic query for InputPrompt
query = select(InputPrompt)
# If we have a user, left join to InputPrompt__User so we can check "disabled"
if user_id is not None:
# If we have a user, left join to InputPrompt__User to check "disabled"
IPU = aliased(InputPrompt__User)
query = query.join(
IPU,
@@ -208,25 +208,30 @@ def fetch_input_prompts_by_user(
)
# Exclude disabled prompts
# i.e. keep only those where (IPU.disabled is NULL or False)
query = query.where(or_(IPU.disabled.is_(None), IPU.disabled.is_(False)))
if include_public:
# user-owned or public
# Return both user-owned and public prompts
query = query.where(
(InputPrompt.user_id == user_id) | (InputPrompt.is_public)
or_(
InputPrompt.user_id == user_id,
InputPrompt.is_public,
)
)
else:
# only user-owned prompts
# Return only user-owned prompts
query = query.where(InputPrompt.user_id == user_id)
# If no user is logged in, get all prompts (public and private)
if user_id is None and AUTH_TYPE == AuthType.DISABLED:
query = query.where(True) # type: ignore
else:
# user_id is None
if AUTH_TYPE == AuthType.DISABLED:
# If auth is disabled, return all prompts
query = query.where(True) # type: ignore
elif include_public:
# Anonymous usage
query = query.where(InputPrompt.is_public)
# If no user is logged in but we want to include public prompts
elif include_public:
query = query.where(InputPrompt.is_public)
# Default to returning all prompts
if active is not None:
query = query.where(InputPrompt.active == active)

View File

@@ -8,20 +8,64 @@ from sqlalchemy.orm import Session
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.models import SyncRecord
from onyx.setup import setup_logger
logger = setup_logger()
def insert_sync_record(
db_session: Session,
entity_id: int | None,
entity_id: int,
sync_type: SyncType,
) -> SyncRecord:
"""Insert a new sync record into the database.
"""Insert a new sync record into the database, cancelling any existing in-progress records.
Args:
db_session: The database session to use
entity_id: The ID of the entity being synced (document set ID, user group ID, etc.)
sync_type: The type of sync operation
"""
# If an existing in-progress sync record exists, mark as cancelled
existing_in_progress_sync_record = fetch_latest_sync_record(
db_session, entity_id, sync_type, sync_status=SyncStatus.IN_PROGRESS
)
if existing_in_progress_sync_record is not None:
logger.info(
f"Cancelling existing in-progress sync record {existing_in_progress_sync_record.id} "
f"for entity_id={entity_id} sync_type={sync_type}"
)
mark_sync_records_as_cancelled(db_session, entity_id, sync_type)
return _create_sync_record(db_session, entity_id, sync_type)
def mark_sync_records_as_cancelled(
db_session: Session,
entity_id: int | None,
sync_type: SyncType,
) -> None:
stmt = (
update(SyncRecord)
.where(
and_(
SyncRecord.entity_id == entity_id,
SyncRecord.sync_type == sync_type,
SyncRecord.sync_status == SyncStatus.IN_PROGRESS,
)
)
.values(sync_status=SyncStatus.CANCELED)
)
db_session.execute(stmt)
db_session.commit()
def _create_sync_record(
db_session: Session,
entity_id: int | None,
sync_type: SyncType,
) -> SyncRecord:
"""Create and insert a new sync record into the database."""
sync_record = SyncRecord(
entity_id=entity_id,
sync_type=sync_type,
@@ -39,6 +83,7 @@ def fetch_latest_sync_record(
db_session: Session,
entity_id: int,
sync_type: SyncType,
sync_status: SyncStatus | None = None,
) -> SyncRecord | None:
"""Fetch the most recent sync record for a given entity ID and status.
@@ -59,6 +104,9 @@ def fetch_latest_sync_record(
.limit(1)
)
if sync_status is not None:
stmt = stmt.where(SyncRecord.sync_status == sync_status)
result = db_session.execute(stmt)
return result.scalar_one_or_none()
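
A short usage sketch (illustrative, assuming an open SQLAlchemy session and an existing document set) of the new cancellation behavior:

# marks any prior IN_PROGRESS record for this (entity_id, sync_type) as CANCELED,
# then inserts a fresh record
record = insert_sync_record(
    db_session=db_session,
    entity_id=document_set.id,  # document_set is a hypothetical DocumentSet row
    sync_type=SyncType.DOCUMENT_SET,
)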

View File

@@ -142,6 +142,7 @@ def build_content_with_imgs(
img_urls: list[str] | None = None,
b64_imgs: list[str] | None = None,
message_type: MessageType = MessageType.USER,
exclude_images: bool = False,
) -> str | list[str | dict[str, Any]]: # matching Langchain's BaseMessage content type
files = files or []
@@ -157,7 +158,7 @@ def build_content_with_imgs(
message_main_content = _build_content(message, files)
if not img_files and not img_urls:
if exclude_images or (not img_files and not img_urls):
return message_main_content
return cast(
@@ -382,9 +383,19 @@ def _strip_colon_from_model_name(model_name: str) -> str:
return ":".join(model_name.split(":")[:-1]) if ":" in model_name else model_name
def _find_model_obj(
model_map: dict, provider: str, model_names: list[str | None]
) -> dict | None:
def _find_model_obj(model_map: dict, provider: str, model_name: str) -> dict | None:
stripped_model_name = _strip_extra_provider_from_model_name(model_name)
model_names = [
model_name,
_strip_extra_provider_from_model_name(model_name),
# Remove leading extra provider. Usually for cases where user has a
# customer model proxy which appends another prefix
# remove :XXXX from the end, if present. Needed for ollama.
_strip_colon_from_model_name(model_name),
_strip_colon_from_model_name(stripped_model_name),
]
# Filter out None values and deduplicate model names
filtered_model_names = [name for name in model_names if name]
@@ -417,21 +428,10 @@ def get_llm_max_tokens(
return GEN_AI_MAX_TOKENS
try:
extra_provider_stripped_model_name = _strip_extra_provider_from_model_name(
model_name
)
model_obj = _find_model_obj(
model_map,
model_provider,
[
model_name,
# Remove leading extra provider. Usually for cases where user has a
# customer model proxy which appends another prefix
extra_provider_stripped_model_name,
# remove :XXXX from the end, if present. Needed for ollama.
_strip_colon_from_model_name(model_name),
_strip_colon_from_model_name(extra_provider_stripped_model_name),
],
model_name,
)
if not model_obj:
raise RuntimeError(
@@ -523,3 +523,23 @@ def get_max_input_tokens(
raise RuntimeError("No tokens for input for the LLM given settings")
return input_toks
def model_supports_image_input(model_name: str, model_provider: str) -> bool:
model_map = get_model_map()
try:
model_obj = _find_model_obj(
model_map,
model_provider,
model_name,
)
if not model_obj:
raise RuntimeError(
f"No litellm entry found for {model_provider}/{model_name}"
)
return model_obj.get("supports_vision", False)
except Exception:
logger.exception(
f"Failed to get model object for {model_provider}/{model_name}"
)
return False
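
A hedged usage sketch of the new helper (model and provider strings are illustrative; message and files are placeholders):

if model_supports_image_input("gpt-4o", "openai"):
    # vision-capable: image files and URLs may be included in the prompt content
    content = build_content_with_imgs(message, files)
else:
    # not vision-capable: build text-only content
    content = build_content_with_imgs(message, files, exclude_images=True)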

View File

@@ -1,8 +1,5 @@
import mimetypes
import os
import uuid
import zipfile
from io import BytesIO
from typing import cast
from fastapi import APIRouter
@@ -389,43 +386,10 @@ def upload_files(
for file in files:
if not file.filename:
raise HTTPException(status_code=400, detail="File name cannot be empty")
# Skip directories and known macOS metadata entries
def should_process_file(file_path: str) -> bool:
normalized_path = os.path.normpath(file_path)
return not any(part.startswith(".") for part in normalized_path.split(os.sep))
try:
file_store = get_default_file_store(db_session)
deduped_file_paths = []
for file in files:
if file.content_type and file.content_type.startswith("application/zip"):
with zipfile.ZipFile(file.file, "r") as zf:
for file_info in zf.namelist():
if zf.getinfo(file_info).is_dir():
continue
if not should_process_file(file_info):
continue
sub_file_bytes = zf.read(file_info)
sub_file_name = os.path.join(str(uuid.uuid4()), file_info)
deduped_file_paths.append(sub_file_name)
mime_type, __ = mimetypes.guess_type(file_info)
if mime_type is None:
mime_type = "application/octet-stream"
file_store.save_file(
file_name=sub_file_name,
content=BytesIO(sub_file_bytes),
display_name=os.path.basename(file_info),
file_origin=FileOrigin.CONNECTOR,
file_type=mime_type,
)
continue
file_path = os.path.join(str(uuid.uuid4()), cast(str, file.filename))
deduped_file_paths.append(file_path)
file_store.save_file(

View File

@@ -16,6 +16,7 @@ from onyx.llm.interfaces import LLM
from onyx.llm.models import PreviousMessage
from onyx.llm.utils import build_content_with_imgs
from onyx.llm.utils import message_to_string
from onyx.llm.utils import model_supports_image_input
from onyx.prompts.constants import GENERAL_SEP_PAT
from onyx.tools.message import ToolCallSummary
from onyx.tools.models import ToolResponse
@@ -316,12 +317,22 @@ class ImageGenerationTool(Tool):
for img in img_generation_response
if img.image_data is not None
]
prompt_builder.update_user_prompt(
build_image_generation_user_prompt(
query=prompt_builder.get_user_message_content(),
img_urls=img_urls,
b64_imgs=b64_imgs,
)
user_prompt = build_image_generation_user_prompt(
query=prompt_builder.get_user_message_content(),
supports_image_input=model_supports_image_input(
prompt_builder.llm_config.model_name,
prompt_builder.llm_config.model_provider,
),
prompts=[
prompt
for response in img_generation_response
for prompt in response.revised_prompt
],
img_urls=img_urls,
b64_imgs=b64_imgs,
)
prompt_builder.update_user_prompt(user_prompt)
return prompt_builder

View File

@@ -9,16 +9,34 @@ You have just created the attached images in response to the following query: "{
Can you please summarize them in a sentence or two? Do NOT include image urls or bulleted lists.
"""
IMG_GENERATION_SUMMARY_PROMPT_NO_IMAGES = """
You have generated images based on the following query: "{query}".
The prompts used to create these images were: {prompts}
Describe the two images you generated, summarizing the key elements and content in a sentence or two.
Be specific about what was generated and respond as if you have seen them,
without including any disclaimers or speculations.
"""
def build_image_generation_user_prompt(
query: str,
supports_image_input: bool,
img_urls: list[str] | None = None,
b64_imgs: list[str] | None = None,
prompts: list[str] | None = None,
) -> HumanMessage:
return HumanMessage(
content=build_content_with_imgs(
message=IMG_GENERATION_SUMMARY_PROMPT.format(query=query).strip(),
b64_imgs=b64_imgs,
img_urls=img_urls,
if supports_image_input:
return HumanMessage(
content=build_content_with_imgs(
message=IMG_GENERATION_SUMMARY_PROMPT.format(query=query).strip(),
b64_imgs=b64_imgs,
img_urls=img_urls,
)
)
else:
return HumanMessage(
content=IMG_GENERATION_SUMMARY_PROMPT_NO_IMAGES.format(
query=query, prompts=prompts
).strip()
)
)
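
Roughly, the call site in ImageGenerationTool now looks like this (a sketch; revised_prompts stands in for the prompts collected from the generation responses):

user_prompt = build_image_generation_user_prompt(
    query="a fox in watercolor",  # illustrative query
    supports_image_input=model_supports_image_input(model_name, model_provider),
    img_urls=img_urls,
    b64_imgs=b64_imgs,
    prompts=revised_prompts,  # used only by the text-only branch
)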

View File

@@ -123,6 +123,7 @@ def optional_telemetry(
headers={"Content-Type": "application/json"},
json=payload,
)
except Exception:
# This way it silences all thread level logging as well
pass

View File

View File

@@ -0,0 +1,92 @@
import pytest
from onyx.background.celery.tasks.llm_model_update.tasks import (
_process_model_list_response,
)
@pytest.mark.parametrize(
"input_data,expected_result,expected_error,error_match",
[
# Success cases
(
["gpt-4", "gpt-3.5-turbo", "claude-2"],
["gpt-4", "gpt-3.5-turbo", "claude-2"],
None,
None,
),
(
[
{"model_name": "gpt-4", "other_field": "value"},
{"model_name": "gpt-3.5-turbo", "other_field": "value"},
],
["gpt-4", "gpt-3.5-turbo"],
None,
None,
),
(
[
{"id": "gpt-4", "other_field": "value"},
{"id": "gpt-3.5-turbo", "other_field": "value"},
],
["gpt-4", "gpt-3.5-turbo"],
None,
None,
),
(
{"data": ["gpt-4", "gpt-3.5-turbo"]},
["gpt-4", "gpt-3.5-turbo"],
None,
None,
),
(
{"models": ["gpt-4", "gpt-3.5-turbo"]},
["gpt-4", "gpt-3.5-turbo"],
None,
None,
),
(
{"models": [{"id": "gpt-4"}, {"id": "gpt-3.5-turbo"}]},
["gpt-4", "gpt-3.5-turbo"],
None,
None,
),
# Error cases
(
"not a list",
None,
ValueError,
"Invalid response from API - expected list",
),
(
{"wrong_field": []},
None,
ValueError,
"Invalid response from API - expected dict with 'data' or 'models' field",
),
(
[{"wrong_field": "value"}],
None,
ValueError,
"Invalid item in model list - expected dict with model_name or id",
),
(
[42],
None,
ValueError,
"Invalid item in model list - expected string or dict",
),
],
)
def test_process_model_list_response(
input_data: dict | list,
expected_result: list[str] | None,
expected_error: type[Exception] | None,
error_match: str | None,
) -> None:
if expected_error:
with pytest.raises(expected_error, match=error_match):
_process_model_list_response(input_data)
else:
result = _process_model_list_response(input_data)
assert result == expected_result

View File

@@ -444,26 +444,10 @@ export function AssistantEditor({
let enabledTools = Object.keys(values.enabled_tools_map)
.map((toolId) => Number(toolId))
.filter((toolId) => values.enabled_tools_map[toolId]);
const searchToolEnabled = searchTool
? enabledTools.includes(searchTool.id)
: false;
const imageGenerationToolEnabled = imageGenerationTool
? enabledTools.includes(imageGenerationTool.id)
: false;
if (imageGenerationToolEnabled) {
if (
// model must support image input for image generation
// to work
!checkLLMSupportsImageInput(
values.llm_model_version_override || defaultModelName || ""
)
) {
enabledTools = enabledTools.filter(
(toolId) => toolId !== imageGenerationTool!.id
);
}
}
// if disable_retrieval is set, set num_chunks to 0
// to tell the backend to not fetch any documents
@@ -914,25 +898,20 @@ export function AssistantEditor({
id={`enabled_tools_map.${imageGenerationTool.id}`}
name={`enabled_tools_map.${imageGenerationTool.id}`}
onCheckedChange={() => {
if (
currentLLMSupportsImageOutput &&
isImageGenerationAvailable
) {
if (isImageGenerationAvailable) {
toggleToolInValues(
imageGenerationTool.id
);
}
}}
className={
!currentLLMSupportsImageOutput ||
!isImageGenerationAvailable
? "opacity-50 cursor-not-allowed"
: ""
}
/>
</TooltipTrigger>
{(!currentLLMSupportsImageOutput ||
!isImageGenerationAvailable) && (
{!isImageGenerationAvailable && (
<TooltipContent side="top" align="center">
<p className="bg-background-900 max-w-[200px] mb-1 text-sm rounded-lg p-1.5 text-white">
{!currentLLMSupportsImageOutput

View File

@@ -49,6 +49,7 @@ import {
useContext,
useEffect,
useLayoutEffect,
useMemo,
useRef,
useState,
} from "react";
@@ -1623,7 +1624,7 @@ export function ChatPage({
setPopup({
type: "error",
message:
"The current Assistant does not support image input. Please select an assistant with Vision support.",
"The current model does not support image input. Please select a model with Vision support.",
});
return;
}
@@ -1841,6 +1842,14 @@ export function ChatPage({
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [messageHistory]);
const imageFileInMessageHistory = useMemo(() => {
return messageHistory
.filter((message) => message.type === "user")
.some((message) =>
message.files.some((file) => file.type === ChatFileType.IMAGE)
);
}, [messageHistory]);
const currentVisibleRange = visibleRange.get(currentSessionId()) || {
start: 0,
end: 0,
@@ -1921,6 +1930,10 @@ export function ChatPage({
handleSlackChatRedirect();
}, [searchParams, router]);
useEffect(() => {
llmOverrideManager.updateImageFilesPresent(imageFileInMessageHistory);
}, [imageFileInMessageHistory]);
useEffect(() => {
const handleKeyDown = (event: KeyboardEvent) => {
if (event.metaKey || event.ctrlKey) {

View File

@@ -5,7 +5,6 @@ import {
PopoverTrigger,
} from "@/components/ui/popover";
import { ChatInputOption } from "./ChatInputOption";
import { AnthropicSVG } from "@/components/icons/icons";
import { getDisplayNameForModel } from "@/lib/hooks";
import {
checkLLMSupportsImageInput,
@@ -19,6 +18,14 @@ import {
import { Persona } from "@/app/admin/assistants/interfaces";
import { LlmOverrideManager } from "@/lib/hooks";
import {
Tooltip,
TooltipContent,
TooltipProvider,
TooltipTrigger,
} from "@/components/ui/tooltip";
import { FiAlertTriangle } from "react-icons/fi";
interface LLMPopoverProps {
llmProviders: LLMProviderDescriptor[];
llmOverrideManager: LlmOverrideManager;
@@ -139,6 +146,22 @@ export default function LLMPopover({
);
}
})()}
{llmOverrideManager.imageFilesPresent &&
!checkLLMSupportsImageInput(name) && (
<TooltipProvider>
<Tooltip delayDuration={0}>
<TooltipTrigger className="my-auto flex items-center ml-auto">
<FiAlertTriangle className="text-alert" size={16} />
</TooltipTrigger>
<TooltipContent>
<p className="text-xs">
This LLM is not vision-capable and cannot process
image files present in your chat session.
</p>
</TooltipContent>
</Tooltip>
</TooltipProvider>
)}
</button>
);
}

View File

@@ -1,7 +1,8 @@
import { useRef, useState } from "react";
import { cva, type VariantProps } from "class-variance-authority";
import { cn } from "@/lib/utils";
import { CheckCircle, XCircle } from "lucide-react";
import { Check, CheckCircle, XCircle } from "lucide-react";
import { Warning } from "@phosphor-icons/react";
const popupVariants = cva(
"fixed bottom-4 left-4 p-4 rounded-lg shadow-xl text-white z-[10000] flex items-center space-x-3 transition-all duration-300 ease-in-out",
{
@@ -26,9 +27,9 @@ export interface PopupSpec extends VariantProps<typeof popupVariants> {
export const Popup: React.FC<PopupSpec> = ({ message, type }) => (
<div className={cn(popupVariants({ type }))}>
{type === "success" ? (
<CheckCircle className="w-6 h-6 animate-pulse" />
<Check className="w-6 h-6" />
) : type === "error" ? (
<XCircle className="w-6 h-6 animate-pulse" />
<Warning className="w-6 h-6 " />
) : type === "info" ? (
<svg
className="w-6 h-6"

View File

@@ -36,7 +36,6 @@ export default function TextView({
"text/plain",
"text/x-rst",
"text/x-org",
"txt",
];
return markdownFormats.some((format) => mimeType.startsWith(format));
};
@@ -118,10 +117,7 @@ export default function TextView({
return (
<Dialog open onOpenChange={onClose}>
<DialogContent
hideCloseIcon
className="max-w-5xl w-[90vw] flex flex-col justify-between gap-y-0 h-full max-h-[80vh] p-0"
>
<DialogContent className="max-w-5xl w-[90vw] flex flex-col justify-between gap-y-0 h-full max-h-[80vh] p-0">
<DialogHeader className="px-4 mb-0 pt-2 pb-3 flex flex-row items-center justify-between border-b">
<DialogTitle className="text-lg font-medium truncate">
{fileName}

View File

@@ -360,6 +360,8 @@ export interface LlmOverrideManager {
temperature: number | null;
updateTemperature: (temperature: number | null) => void;
updateModelOverrideForChatSession: (chatSession?: ChatSession) => void;
imageFilesPresent: boolean;
updateImageFilesPresent: (present: boolean) => void;
}
export function useLlmOverride(
llmProviders: LLMProviderDescriptor[],
@@ -383,6 +385,11 @@ export function useLlmOverride(
}
return { name: "", provider: "", modelName: "" };
};
const [imageFilesPresent, setImageFilesPresent] = useState(false);
const updateImageFilesPresent = (present: boolean) => {
setImageFilesPresent(present);
};
const [globalDefault, setGlobalDefault] = useState<LlmOverride>(
getValidLlmOverride(globalModel)
@@ -447,6 +454,8 @@ export function useLlmOverride(
setGlobalDefault,
temperature,
updateTemperature,
imageFilesPresent,
updateImageFilesPresent,
};
}