Mirror of https://github.com/onyx-dot-app/onyx.git, synced 2026-02-16 23:35:46 +00:00

Compare commits: error_supp...v0.14.4 (12 commits)

Commits (SHA1):
78dd50949a
e48dea65ed
dd88fbd548
bde0d240af
a038c05b7b
8427c380a0
ef5a176830
4e8552478a
ac236a91bd
b11a3ad89e
a21104d9c2
f752b9ed77
@@ -14,7 +14,9 @@ from celery.signals import worker_shutdown
 import danswer.background.celery.apps.app_base as app_base
 from danswer.background.celery.apps.app_base import task_logger
 from danswer.background.celery.celery_utils import celery_is_worker_primary
-from danswer.background.celery.tasks.vespa.tasks import get_unfenced_index_attempt_ids
+from danswer.background.celery.tasks.indexing.tasks import (
+    get_unfenced_index_attempt_ids,
+)
 from danswer.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT
 from danswer.configs.constants import DanswerRedisLocks
 from danswer.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME

@@ -3,6 +3,7 @@ from datetime import timezone
 from http import HTTPStatus
 from time import sleep
 
+import redis
 import sentry_sdk
 from celery import Celery
 from celery import shared_task

@@ -33,6 +34,8 @@ from danswer.db.enums import ConnectorCredentialPairStatus
 from danswer.db.enums import IndexingStatus
 from danswer.db.enums import IndexModelStatus
 from danswer.db.index_attempt import create_index_attempt
+from danswer.db.index_attempt import delete_index_attempt
+from danswer.db.index_attempt import get_all_index_attempts_by_status
 from danswer.db.index_attempt import get_index_attempt
 from danswer.db.index_attempt import get_last_attempt_for_cc_pair
 from danswer.db.index_attempt import mark_attempt_failed

@@ -45,6 +48,7 @@ from danswer.db.swap_index import check_index_swap
 from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
 from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
 from danswer.redis.redis_connector import RedisConnector
+from danswer.redis.redis_connector_index import RedisConnectorIndex
 from danswer.redis.redis_connector_index import RedisConnectorIndexPayload
 from danswer.redis.redis_pool import get_redis_client
 from danswer.utils.logger import setup_logger
@@ -97,6 +101,54 @@ class RunIndexingCallback(RunIndexingCallbackInterface):
         self.redis_client.incrby(self.generator_progress_key, amount)
 
 
+def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[int]:
+    """Gets a list of unfenced index attempts. Should not be possible, so we'd typically
+    want to clean them up.
+
+    Unfenced = attempt not in terminal state and fence does not exist.
+    """
+    unfenced_attempts: list[int] = []
+
+    # inner/outer/inner double check pattern to avoid race conditions when checking for
+    # bad state
+    # inner = index_attempt in non terminal state
+    # outer = r.fence_key down
+
+    # check the db for index attempts in a non terminal state
+    attempts: list[IndexAttempt] = []
+    attempts.extend(
+        get_all_index_attempts_by_status(IndexingStatus.NOT_STARTED, db_session)
+    )
+    attempts.extend(
+        get_all_index_attempts_by_status(IndexingStatus.IN_PROGRESS, db_session)
+    )
+
+    for attempt in attempts:
+        fence_key = RedisConnectorIndex.fence_key_with_ids(
+            attempt.connector_credential_pair_id, attempt.search_settings_id
+        )
+
+        # if the fence is down / doesn't exist, possible error but not confirmed
+        if r.exists(fence_key):
+            continue
+
+        # Between the time the attempts are first looked up and the time we see the fence down,
+        # the attempt may have completed and taken down the fence normally.
+
+        # We need to double check that the index attempt is still in a non terminal state
+        # and matches the original state, which confirms we are really in a bad state.
+        attempt_2 = get_index_attempt(db_session, attempt.id)
+        if not attempt_2:
+            continue
+
+        if attempt.status != attempt_2.status:
+            continue
+
+        unfenced_attempts.append(attempt.id)
+
+    return unfenced_attempts
+
+
 @shared_task(
     name="check_for_indexing",
     soft_time_limit=300,
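The inner/outer/inner double check above is a general defense wherever two stores (here Postgres and Redis) can disagree transiently. A minimal, self-contained sketch of the same pattern, using plain dicts in place of the real DB and fence keys (all names below are illustrative, not from the repository):

# Illustrative only: dicts and sets stand in for Postgres and Redis.
TERMINAL = {"SUCCESS", "FAILED", "CANCELED"}
DB = {1: "IN_PROGRESS", 2: "SUCCESS"}  # attempt_id -> status
FENCES: set[int] = set()               # attempt_ids with a live fence

def find_unfenced(attempt_ids: list[int]) -> list[int]:
    bad = []
    for attempt_id in attempt_ids:
        status_1 = DB.get(attempt_id)
        if status_1 is None or status_1 in TERMINAL:  # inner: non-terminal?
            continue
        if attempt_id in FENCES:                      # outer: fence up -> healthy
            continue
        # the attempt may have finished between the two reads above, so
        # re-read and require an unchanged status before flagging it
        if DB.get(attempt_id) != status_1:            # inner again
            continue
        bad.append(attempt_id)
    return bad

print(find_unfenced([1, 2]))  # [1]: running in the DB but no fence in Redis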
@@ -107,7 +159,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
 
     r = get_redis_client(tenant_id=tenant_id)
 
-    lock_beat = r.lock(
+    lock_beat: RedisLock = r.lock(
         DanswerRedisLocks.CHECK_INDEXING_BEAT_LOCK,
         timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
     )

@@ -117,6 +169,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
         if not lock_beat.acquire(blocking=False):
             return None
 
+        # check for search settings swap
         with get_session_with_tenant(tenant_id=tenant_id) as db_session:
             old_search_settings = check_index_swap(db_session=db_session)
             current_search_settings = get_current_search_settings(db_session)
@@ -135,13 +188,18 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
                     embedding_model=embedding_model,
                 )
 
+        # gather cc_pair_ids
         cc_pair_ids: list[int] = []
         with get_session_with_tenant(tenant_id) as db_session:
+            lock_beat.reacquire()
             cc_pairs = fetch_connector_credential_pairs(db_session)
             for cc_pair_entry in cc_pairs:
                 cc_pair_ids.append(cc_pair_entry.id)
 
+        # kick off index attempts
         for cc_pair_id in cc_pair_ids:
+            lock_beat.reacquire()
+
             redis_connector = RedisConnector(tenant_id, cc_pair_id)
             with get_session_with_tenant(tenant_id) as db_session:
                 # Get the primary search settings
@@ -198,6 +256,29 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
                         f"search_settings={search_settings_instance.id} "
                     )
                     tasks_created += 1
+
+        # Fail any index attempts in the DB that don't have fences
+        # This shouldn't ever happen!
+        with get_session_with_tenant(tenant_id) as db_session:
+            unfenced_attempt_ids = get_unfenced_index_attempt_ids(db_session, r)
+            for attempt_id in unfenced_attempt_ids:
+                lock_beat.reacquire()
+
+                attempt = get_index_attempt(db_session, attempt_id)
+                if not attempt:
+                    continue
+
+                failure_reason = (
+                    f"Unfenced index attempt found in DB: "
+                    f"index_attempt={attempt.id} "
+                    f"cc_pair={attempt.connector_credential_pair_id} "
+                    f"search_settings={attempt.search_settings_id}"
+                )
+                task_logger.error(failure_reason)
+                mark_attempt_failed(
+                    attempt.id, db_session, failure_reason=failure_reason
+                )
+
     except SoftTimeLimitExceeded:
         task_logger.info(
             "Soft time limit exceeded, task is being terminated gracefully."
@@ -207,6 +288,11 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
     finally:
         if lock_beat.owned():
            lock_beat.release()
+        else:
+            task_logger.error(
+                "check_for_indexing - Lock not owned on completion: "
+                f"tenant={tenant_id}"
+            )
 
     return tasks_created
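Checking lock_beat.owned() before release matters because the beat lock has a timeout: if the task overruns it, Redis may hand the lock to another worker, and an unconditional release would raise LockNotOwnedError or free someone else's lock. A small sketch with the redis-py Lock API (the key name is made up, and a local Redis server is assumed):

import redis

r = redis.Redis()
lock = r.lock("example:beat_lock", timeout=60)  # hypothetical key name

if lock.acquire(blocking=False):
    try:
        pass  # periodic work; call lock.reacquire() between long steps
    finally:
        # if the timeout expired mid-task, another worker may now hold
        # the lock, so only release when we still own it
        if lock.owned():
            lock.release()
        else:
            print("lock expired before completion")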
@@ -311,10 +397,11 @@ def try_creating_indexing_task(
     """
 
     LOCK_TIMEOUT = 30
+    index_attempt_id: int | None = None
 
     # we need to serialize any attempt to trigger indexing since it can be triggered
     # either via celery beat or manually (API call)
-    lock = r.lock(
+    lock: RedisLock = r.lock(
         DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_creating_indexing_task",
         timeout=LOCK_TIMEOUT,
     )

@@ -365,6 +452,8 @@ def try_creating_indexing_task(
 
     custom_task_id = redis_connector_index.generate_generator_task_id()
 
+    # when the task is sent, we have yet to finish setting up the fence
+    # therefore, the task must contain code that blocks until the fence is ready
     result = celery_app.send_task(
         "connector_indexing_proxy_task",
         kwargs=dict(
@@ -385,13 +474,16 @@ def try_creating_indexing_task(
         payload.celery_task_id = result.id
         redis_connector_index.set_fence(payload)
     except Exception:
-        redis_connector_index.set_fence(None)
         task_logger.exception(
-            f"Unexpected exception: "
+            f"try_creating_indexing_task - Unexpected exception: "
             f"tenant={tenant_id} "
             f"cc_pair={cc_pair.id} "
             f"search_settings={search_settings.id}"
         )
 
+        if index_attempt_id is not None:
+            delete_index_attempt(db_session, index_attempt_id)
+        redis_connector_index.set_fence(None)
+        return None
     finally:
         if lock.owned():
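The except branch above undoes work in reverse creation order: fence down first so monitors stop waiting on it, then the orphaned index attempt row. A toy version of the same rollback shape, with module-level objects standing in for the DB and Redis (illustrative only):

import itertools

_ids = itertools.count(1)
_attempts: set[int] = set()      # stands in for the index_attempt table
_fence: dict | None = None       # stands in for the Redis fence payload

def start_job(fail: bool = False) -> int | None:
    """Create the DB row, dispatch the task, then publish the fence.
    On any failure, undo in reverse order so nothing is left half-built."""
    global _fence
    attempt_id: int | None = None
    try:
        attempt_id = next(_ids)
        _attempts.add(attempt_id)
        if fail:
            raise RuntimeError("simulated broker failure on send_task")
        _fence = {"attempt_id": attempt_id}
        return attempt_id
    except Exception:
        _fence = None                      # fence down first
        if attempt_id is not None:
            _attempts.discard(attempt_id)  # then delete the orphaned row
        return None

print(start_job(fail=True), _attempts, _fence)  # None set() None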
@@ -409,7 +501,7 @@ def connector_indexing_proxy_task(
 ) -> None:
     """celery tasks are forked, but forking is unstable. This proxies work to a spawned task."""
     task_logger.info(
-        f"Indexing proxy - starting: attempt={index_attempt_id} "
+        f"Indexing watchdog - starting: attempt={index_attempt_id} "
         f"tenant={tenant_id} "
         f"cc_pair={cc_pair_id} "
         f"search_settings={search_settings_id}"

@@ -417,7 +509,7 @@ def connector_indexing_proxy_task(
     client = SimpleJobClient()
 
     job = client.submit(
-        connector_indexing_task,
+        connector_indexing_task_wrapper,
         index_attempt_id,
         cc_pair_id,
         search_settings_id,

@@ -428,7 +520,7 @@ def connector_indexing_proxy_task(
 
     if not job:
         task_logger.info(
-            f"Indexing proxy - spawn failed: attempt={index_attempt_id} "
+            f"Indexing watchdog - spawn failed: attempt={index_attempt_id} "
             f"tenant={tenant_id} "
             f"cc_pair={cc_pair_id} "
             f"search_settings={search_settings_id}"

@@ -436,7 +528,7 @@ def connector_indexing_proxy_task(
         return
 
     task_logger.info(
-        f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} "
+        f"Indexing watchdog - spawn succeeded: attempt={index_attempt_id} "
         f"tenant={tenant_id} "
        f"cc_pair={cc_pair_id} "
         f"search_settings={search_settings_id}"

@@ -460,7 +552,7 @@ def connector_indexing_proxy_task(
 
         if job.status == "error":
             task_logger.error(
-                f"Indexing proxy - spawned task exceptioned: "
+                f"Indexing watchdog - spawned task exceptioned: "
                 f"attempt={index_attempt_id} "
                 f"tenant={tenant_id} "
                 f"cc_pair={cc_pair_id} "

@@ -472,7 +564,7 @@ def connector_indexing_proxy_task(
             break
 
     task_logger.info(
-        f"Indexing proxy - finished: attempt={index_attempt_id} "
+        f"Indexing watchdog - finished: attempt={index_attempt_id} "
         f"tenant={tenant_id} "
         f"cc_pair={cc_pair_id} "
         f"search_settings={search_settings_id}"
@@ -480,6 +572,38 @@ def connector_indexing_proxy_task(
     return
 
 
+def connector_indexing_task_wrapper(
+    index_attempt_id: int,
+    cc_pair_id: int,
+    search_settings_id: int,
+    tenant_id: str | None,
+    is_ee: bool,
+) -> int | None:
+    """Just wraps connector_indexing_task so we can log any exceptions before
+    re-raising it."""
+    result: int | None = None
+
+    try:
+        result = connector_indexing_task(
+            index_attempt_id,
+            cc_pair_id,
+            search_settings_id,
+            tenant_id,
+            is_ee,
+        )
+    except:
+        logger.exception(
+            f"connector_indexing_task exceptioned: "
+            f"tenant={tenant_id} "
+            f"index_attempt={index_attempt_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id}"
+        )
+        raise
+
+    return result
+
+
 def connector_indexing_task(
     index_attempt_id: int,
     cc_pair_id: int,
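Spawned processes often swallow tracebacks, so wrapping the entry point to log and then re-raise keeps the traceback in the worker log while still letting the process exit with a failure status. The same idea as a generic decorator:

import functools
import logging
from typing import Any, Callable, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")

def log_exceptions(fn: Callable[..., T]) -> Callable[..., T]:
    """Log any exception with full traceback, then re-raise so the
    caller still observes the failure."""
    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        try:
            return fn(*args, **kwargs)
        except Exception:
            logger.exception("%s raised", fn.__name__)
            raise
    return wrapper

@log_exceptions
def risky(x: int) -> int:
    return 10 // x  # raises ZeroDivisionError for x == 0

print(risky(5))  # 2; risky(0) would log the traceback, then re-raise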
@@ -534,6 +658,7 @@ def connector_indexing_task(
     if redis_connector.delete.fenced:
         raise RuntimeError(
             f"Indexing will not start because connector deletion is in progress: "
+            f"attempt={index_attempt_id} "
             f"cc_pair={cc_pair_id} "
             f"fence={redis_connector.delete.fence_key}"
         )

@@ -541,18 +666,18 @@ def connector_indexing_task(
     if redis_connector.stop.fenced:
         raise RuntimeError(
             f"Indexing will not start because a connector stop signal was detected: "
+            f"attempt={index_attempt_id} "
             f"cc_pair={cc_pair_id} "
             f"fence={redis_connector.stop.fence_key}"
         )
 
     while True:
-        # wait for the fence to come up
-        if not redis_connector_index.fenced:
+        if not redis_connector_index.fenced:  # The fence must exist
             raise ValueError(
                 f"connector_indexing_task - fence not found: fence={redis_connector_index.fence_key}"
             )
 
-        payload = redis_connector_index.payload
+        payload = redis_connector_index.payload  # The payload must exist
         if not payload:
             raise ValueError("connector_indexing_task: payload invalid or not found")
 
@@ -575,7 +700,7 @@ def connector_indexing_task(
         )
         break
 
-    lock = r.lock(
+    lock: RedisLock = r.lock(
         redis_connector_index.generator_lock_key,
         timeout=CELERY_INDEXING_LOCK_TIMEOUT,
     )

@@ -584,7 +709,7 @@ def connector_indexing_task(
     if not acquired:
         logger.warning(
             f"Indexing task already running, exiting...: "
-            f"cc_pair={cc_pair_id} search_settings={search_settings_id}"
+            f"index_attempt={index_attempt_id} cc_pair={cc_pair_id} search_settings={search_settings_id}"
         )
         return None
 
@@ -5,7 +5,6 @@ from http import HTTPStatus
 from typing import cast
 
 import httpx
-import redis
 from celery import Celery
 from celery import shared_task
 from celery import Task

@@ -47,13 +46,10 @@ from danswer.db.document_set import fetch_document_sets_for_document
 from danswer.db.document_set import get_document_set_by_id
 from danswer.db.document_set import mark_document_set_as_synced
 from danswer.db.engine import get_session_with_tenant
-from danswer.db.enums import IndexingStatus
 from danswer.db.index_attempt import delete_index_attempts
-from danswer.db.index_attempt import get_all_index_attempts_by_status
 from danswer.db.index_attempt import get_index_attempt
 from danswer.db.index_attempt import mark_attempt_failed
 from danswer.db.models import DocumentSet
-from danswer.db.models import IndexAttempt
 from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.factory import get_default_document_index
 from danswer.document_index.interfaces import VespaDocumentFields
@@ -649,20 +645,26 @@ def monitor_ccpair_indexing_taskset(
         # the task is still setting up
         return
 
-    # Read result state BEFORE generator_complete_key to avoid a race condition
     # never use any blocking methods on the result from inside a task!
     result: AsyncResult = AsyncResult(payload.celery_task_id)
-    result_state = result.state
 
+    # inner/outer/inner double check pattern to avoid race conditions when checking for
+    # bad state
+
+    # inner = get_completion / generator_complete not signaled
+    # outer = result.state in READY state
     status_int = redis_connector_index.get_completion()
-    if status_int is None:  # completion signal not set ... check for errors
-        # If we get here, and then the task both sets the completion signal and finishes,
-        # we will incorrectly abort the task. We must check result state, then check
-        # get_completion again to avoid the race condition.
-        if result_state in READY_STATES:
-            # IF the task state is READY, THEN generator_complete should be set
-            # if it isn't, then the worker crashed
+    if status_int is None:  # inner signal not set ... possible error
+        result_state = result.state
+        if (
+            result_state in READY_STATES
+        ):  # outer signal in terminal state ... possible error
+            # Now double check!
+            if redis_connector_index.get_completion() is None:
+                # inner signal still not set (and cannot change when outer result_state is READY)
+                # Task is finished but generator complete isn't set.
+                # We have a problem! Worker may have crashed.
 
                 msg = (
                     f"Connector indexing aborted or exceptioned: "
                     f"attempt={payload.index_attempt_id} "
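The fix hinges on read order: the outer signal (the Celery result state) is read only after the inner signal (the completion key) is seen unset, and the inner signal is then read a second time. Once the outer state is READY it cannot regress, so a second unset read is conclusive. Compressed to two flags (illustrative only):

# Illustrative only: two flags written by the worker, completion first,
# then ready. The monitor reads them in the opposite order.
completion_set = False  # inner signal: set by the task when it finishes cleanly
task_ready = True       # outer signal: celery result reached a READY state

def worker_crashed() -> bool:
    if completion_set:       # inner read #1: healthy
        return False
    if not task_ready:       # outer read: still running, nothing to conclude
        return False
    # the outer state is terminal and cannot regress, so if the inner
    # signal is still unset on a second read, the worker died first
    return not completion_set  # inner read #2

print(worker_crashed())  # True: task finished without setting completion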
@@ -697,37 +699,6 @@ def monitor_ccpair_indexing_taskset(
         redis_connector_index.reset()
 
 
-def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[int]:
-    """Gets a list of unfenced index attempts. Should not be possible, so we'd typically
-    want to clean them up.
-
-    Unfenced = attempt not in terminal state and fence does not exist.
-    """
-    unfenced_attempts: list[int] = []
-
-    # do some cleanup before clearing fences
-    # check the db for any outstanding index attempts
-    attempts: list[IndexAttempt] = []
-    attempts.extend(
-        get_all_index_attempts_by_status(IndexingStatus.NOT_STARTED, db_session)
-    )
-    attempts.extend(
-        get_all_index_attempts_by_status(IndexingStatus.IN_PROGRESS, db_session)
-    )
-
-    for attempt in attempts:
-        # if attempts exist in the db but we don't detect them in redis, mark them as failed
-        fence_key = RedisConnectorIndex.fence_key_with_ids(
-            attempt.connector_credential_pair_id, attempt.search_settings_id
-        )
-        if r.exists(fence_key):
-            continue
-
-        unfenced_attempts.append(attempt.id)
-
-    return unfenced_attempts
-
-
 @shared_task(name="monitor_vespa_sync", soft_time_limit=300, bind=True)
 def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
     """This is a celery beat task that monitors and finalizes metadata sync tasksets.
@@ -779,25 +750,6 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
             f"permissions_sync={n_permissions_sync} "
         )
 
-        # Fail any index attempts in the DB that don't have fences
-        with get_session_with_tenant(tenant_id) as db_session:
-            unfenced_attempt_ids = get_unfenced_index_attempt_ids(db_session, r)
-            for attempt_id in unfenced_attempt_ids:
-                attempt = get_index_attempt(db_session, attempt_id)
-                if not attempt:
-                    continue
-
-                failure_reason = (
-                    f"Unfenced index attempt found in DB: "
-                    f"index_attempt={attempt.id} "
-                    f"cc_pair={attempt.connector_credential_pair_id} "
-                    f"search_settings={attempt.search_settings_id}"
-                )
-                task_logger.warning(failure_reason)
-                mark_attempt_failed(
-                    attempt.id, db_session, failure_reason=failure_reason
-                )
-
         lock_beat.reacquire()
         if r.exists(RedisConnectorCredentialPair.get_fence_key()):
             monitor_connector_taskset(r)
@@ -79,6 +79,9 @@ def load_personas_from_yaml(
         if prompts:
             prompt_ids = [prompt.id for prompt in prompts if prompt is not None]
 
+        if not prompt_ids:
+            raise ValueError("Invalid Persona config, no prompts exist")
+
         p_id = persona.get("id")
         tool_ids = []
         if persona.get("image_generation"):
@@ -122,12 +125,16 @@ def load_personas_from_yaml(
                 tool_ids=tool_ids,
                 builtin_persona=True,
                 is_public=True,
-                display_priority=existing_persona.display_priority
-                if existing_persona is not None
-                else persona.get("display_priority"),
-                is_visible=existing_persona.is_visible
-                if existing_persona is not None
-                else persona.get("is_visible"),
+                display_priority=(
+                    existing_persona.display_priority
+                    if existing_persona is not None
+                    else persona.get("display_priority")
+                ),
+                is_visible=(
+                    existing_persona.is_visible
+                    if existing_persona is not None
+                    else persona.get("is_visible")
+                ),
                 db_session=db_session,
             )
@@ -3,6 +3,8 @@ from datetime import timezone
 from typing import Any
+from urllib.parse import quote
 
+from atlassian import Confluence  # type: ignore
 
 from danswer.configs.app_configs import CONFLUENCE_CONNECTOR_LABELS_TO_SKIP
 from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
 from danswer.configs.app_configs import INDEX_BATCH_SIZE
@@ -70,7 +72,7 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
     ) -> None:
         self.batch_size = batch_size
         self.continue_on_failure = continue_on_failure
-        self.confluence_client: OnyxConfluence | None = None
+        self._confluence_client: OnyxConfluence | None = None
         self.is_cloud = is_cloud
 
         # Remove trailing slash from wiki_base if present
@@ -97,39 +99,59 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
         self.cql_label_filter = ""
         if labels_to_skip:
             labels_to_skip = list(set(labels_to_skip))
-            comma_separated_labels = ",".join(f"'{label}'" for label in labels_to_skip)
+            comma_separated_labels = ",".join(
+                f"'{quote(label)}'" for label in labels_to_skip
+            )
             self.cql_label_filter = f" and label not in ({comma_separated_labels})"
 
+    @property
+    def confluence_client(self) -> OnyxConfluence:
+        if self._confluence_client is None:
+            raise ConnectorMissingCredentialError("Confluence")
+        return self._confluence_client
+
     def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
         # see https://github.com/atlassian-api/atlassian-python-api/blob/master/atlassian/rest_client.py
         # for a list of other hidden constructor args
-        self.confluence_client = build_confluence_client(
+        self._confluence_client = build_confluence_client(
             credentials_json=credentials,
             is_cloud=self.is_cloud,
             wiki_base=self.wiki_base,
         )
+
+        client_without_retries = Confluence(
+            api_version="cloud" if self.is_cloud else "latest",
+            url=self.wiki_base.rstrip("/"),
+            username=credentials["confluence_username"] if self.is_cloud else None,
+            password=credentials["confluence_access_token"] if self.is_cloud else None,
+            token=credentials["confluence_access_token"] if not self.is_cloud else None,
+        )
+        spaces = client_without_retries.get_all_spaces(limit=1)
+        if not spaces:
+            raise RuntimeError(
+                f"No spaces found at {self.wiki_base}! "
+                "Check your credentials and wiki_base and make sure "
+                "is_cloud is set correctly."
+            )
         return None
 
     def _get_comment_string_for_page_id(self, page_id: str) -> str:
-        if self.confluence_client is None:
-            raise ConnectorMissingCredentialError("Confluence")
-
         comment_string = ""
 
         comment_cql = f"type=comment and container='{page_id}'"
         comment_cql += self.cql_label_filter
 
         expand = ",".join(_COMMENT_EXPANSION_FIELDS)
-        for comments in self.confluence_client.paginated_cql_page_retrieval(
+        for comment in self.confluence_client.paginated_cql_retrieval(
             cql=comment_cql,
             expand=expand,
         ):
-            for comment in comments:
-                comment_string += "\nComment:\n"
-                comment_string += extract_text_from_confluence_html(
-                    confluence_client=self.confluence_client,
-                    confluence_object=comment,
-                )
+            comment_string += "\nComment:\n"
+            comment_string += extract_text_from_confluence_html(
+                confluence_client=self.confluence_client,
+                confluence_object=comment,
+                fetched_titles=set(),
+            )
 
         return comment_string
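Moving the None check into a property removes the repeated `if self.confluence_client is None` guards from every method, as the later hunks in this file show. The idiom in isolation (Client is a stand-in type, not the repository's):

class Client:  # stand-in for OnyxConfluence
    def __init__(self, token: str) -> None:
        self.token = token

class Connector:
    def __init__(self) -> None:
        self._client: Client | None = None  # private slot, set lazily

    @property
    def client(self) -> Client:
        # one guard here instead of a None check at every call site
        if self._client is None:
            raise RuntimeError("credentials not loaded")
        return self._client

    def load_credentials(self, token: str) -> None:
        self._client = Client(token)

c = Connector()
c.load_credentials("secret")
print(c.client.token)  # "secret"; accessing c.client before loading raises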
@@ -141,9 +163,6 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
         If its a page, it extracts the text, adds the comments for the document text.
         If its an attachment, it just downloads the attachment and converts that into a document.
         """
-        if self.confluence_client is None:
-            raise ConnectorMissingCredentialError("Confluence")
-
         # The url and the id are the same
         object_url = build_confluence_document_id(
             self.wiki_base, confluence_object["_links"]["webui"], self.is_cloud
@@ -153,16 +172,19 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
         # Extract text from page
         if confluence_object["type"] == "page":
             object_text = extract_text_from_confluence_html(
-                self.confluence_client, confluence_object
+                confluence_client=self.confluence_client,
+                confluence_object=confluence_object,
+                fetched_titles={confluence_object.get("title", "")},
             )
             # Add comments to text
             object_text += self._get_comment_string_for_page_id(confluence_object["id"])
         elif confluence_object["type"] == "attachment":
             object_text = attachment_to_content(
-                self.confluence_client, confluence_object
+                confluence_client=self.confluence_client, attachment=confluence_object
             )
 
         if object_text is None:
             # This only happens for attachments that are not parseable
             return None
 
         # Get space name
@@ -193,44 +215,39 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
         )
 
     def _fetch_document_batches(self) -> GenerateDocumentsOutput:
-        if self.confluence_client is None:
-            raise ConnectorMissingCredentialError("Confluence")
-
         doc_batch: list[Document] = []
         confluence_page_ids: list[str] = []
 
         page_query = self.cql_page_query + self.cql_label_filter + self.cql_time_filter
         # Fetch pages as Documents
-        for page_batch in self.confluence_client.paginated_cql_page_retrieval(
+        for page in self.confluence_client.paginated_cql_retrieval(
             cql=page_query,
             expand=",".join(_PAGE_EXPANSION_FIELDS),
             limit=self.batch_size,
         ):
-            for page in page_batch:
-                confluence_page_ids.append(page["id"])
-                doc = self._convert_object_to_document(page)
-                if doc is not None:
-                    doc_batch.append(doc)
-                    if len(doc_batch) >= self.batch_size:
-                        yield doc_batch
-                        doc_batch = []
+            confluence_page_ids.append(page["id"])
+            doc = self._convert_object_to_document(page)
+            if doc is not None:
+                doc_batch.append(doc)
+                if len(doc_batch) >= self.batch_size:
+                    yield doc_batch
+                    doc_batch = []
 
         # Fetch attachments as Documents
         for confluence_page_id in confluence_page_ids:
             attachment_cql = f"type=attachment and container='{confluence_page_id}'"
             attachment_cql += self.cql_label_filter
             # TODO: maybe should add time filter as well?
-            for attachments in self.confluence_client.paginated_cql_page_retrieval(
+            for attachment in self.confluence_client.paginated_cql_retrieval(
                 cql=attachment_cql,
                 expand=",".join(_ATTACHMENT_EXPANSION_FIELDS),
             ):
-                for attachment in attachments:
-                    doc = self._convert_object_to_document(attachment)
-                    if doc is not None:
-                        doc_batch.append(doc)
-                        if len(doc_batch) >= self.batch_size:
-                            yield doc_batch
-                            doc_batch = []
+                doc = self._convert_object_to_document(attachment)
+                if doc is not None:
+                    doc_batch.append(doc)
+                    if len(doc_batch) >= self.batch_size:
+                        yield doc_batch
+                        doc_batch = []
 
         if doc_batch:
             yield doc_batch
@@ -255,52 +272,47 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
         start: SecondsSinceUnixEpoch | None = None,
         end: SecondsSinceUnixEpoch | None = None,
     ) -> GenerateSlimDocumentOutput:
-        if self.confluence_client is None:
-            raise ConnectorMissingCredentialError("Confluence")
-
         doc_metadata_list: list[SlimDocument] = []
 
         restrictions_expand = ",".join(_RESTRICTIONS_EXPANSION_FIELDS)
 
         page_query = self.cql_page_query + self.cql_label_filter
-        for pages in self.confluence_client.cql_paginate_all_expansions(
+        for page in self.confluence_client.cql_paginate_all_expansions(
             cql=page_query,
             expand=restrictions_expand,
         ):
-            for page in pages:
-                # If the page has restrictions, add them to the perm_sync_data
-                # These will be used by doc_sync.py to sync permissions
-                perm_sync_data = {
-                    "restrictions": page.get("restrictions", {}),
-                    "space_key": page.get("space", {}).get("key"),
-                }
-
-                doc_metadata_list.append(
-                    SlimDocument(
-                        id=build_confluence_document_id(
-                            self.wiki_base,
-                            page["_links"]["webui"],
-                            self.is_cloud,
-                        ),
-                        perm_sync_data=perm_sync_data,
-                    )
-                )
-                attachment_cql = f"type=attachment and container='{page['id']}'"
-                attachment_cql += self.cql_label_filter
-                for attachments in self.confluence_client.cql_paginate_all_expansions(
-                    cql=attachment_cql,
-                    expand=restrictions_expand,
-                ):
-                    for attachment in attachments:
-                        doc_metadata_list.append(
-                            SlimDocument(
-                                id=build_confluence_document_id(
-                                    self.wiki_base,
-                                    attachment["_links"]["webui"],
-                                    self.is_cloud,
-                                ),
-                                perm_sync_data=perm_sync_data,
-                            )
-                        )
-                        yield doc_metadata_list
-                        doc_metadata_list = []
+            # If the page has restrictions, add them to the perm_sync_data
+            # These will be used by doc_sync.py to sync permissions
+            perm_sync_data = {
+                "restrictions": page.get("restrictions", {}),
+                "space_key": page.get("space", {}).get("key"),
+            }
+
+            doc_metadata_list.append(
+                SlimDocument(
+                    id=build_confluence_document_id(
+                        self.wiki_base,
+                        page["_links"]["webui"],
+                        self.is_cloud,
+                    ),
+                    perm_sync_data=perm_sync_data,
+                )
+            )
+            attachment_cql = f"type=attachment and container='{page['id']}'"
+            attachment_cql += self.cql_label_filter
+            for attachment in self.confluence_client.cql_paginate_all_expansions(
+                cql=attachment_cql,
+                expand=restrictions_expand,
+            ):
+                doc_metadata_list.append(
+                    SlimDocument(
+                        id=build_confluence_document_id(
+                            self.wiki_base,
+                            attachment["_links"]["webui"],
+                            self.is_cloud,
+                        ),
+                        perm_sync_data=perm_sync_data,
+                    )
+                )
+            yield doc_metadata_list
+            doc_metadata_list = []
@@ -20,6 +20,10 @@ F = TypeVar("F", bound=Callable[..., Any])
 
 RATE_LIMIT_MESSAGE_LOWERCASE = "Rate limit exceeded".lower()
 
+# https://jira.atlassian.com/browse/CONFCLOUD-76433
+_PROBLEMATIC_EXPANSIONS = "body.storage.value"
+_REPLACEMENT_EXPANSIONS = "body.view.value"
+
 
 class ConfluenceRateLimitError(Exception):
     pass
@@ -80,7 +84,7 @@ def handle_confluence_rate_limit(confluence_call: F) -> F:
     def wrapped_call(*args: list[Any], **kwargs: Any) -> Any:
         MAX_RETRIES = 5
 
-        TIMEOUT = 3600
+        TIMEOUT = 600
         timeout_at = time.monotonic() + TIMEOUT
 
         for attempt in range(MAX_RETRIES):

@@ -95,6 +99,10 @@ def handle_confluence_rate_limit(confluence_call: F) -> F:
                 return confluence_call(*args, **kwargs)
             except HTTPError as e:
                 delay_until = _handle_http_error(e, attempt)
+                logger.warning(
+                    f"HTTPError in confluence call. "
+                    f"Retrying in {delay_until} seconds..."
+                )
                 while time.monotonic() < delay_until:
                     # in the future, check a signal here to exit
                     time.sleep(1)
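The retry loop budgets wall time with time.monotonic(), which is immune to clock adjustments, and sleeps in one-second slices so a shutdown check could later interrupt the wait. The same shape in isolation (the flaky call is simulated, not a real API):

import time

MAX_RETRIES = 5
TIMEOUT = 600  # overall wall-time budget in seconds

def call_with_retries(fn):
    timeout_at = time.monotonic() + TIMEOUT
    for attempt in range(MAX_RETRIES):
        if time.monotonic() > timeout_at:
            raise TimeoutError("retry budget exhausted")
        try:
            return fn()
        except ConnectionError:
            # back off, sleeping in 1s slices so an exit signal
            # could be checked between slices
            delay_until = time.monotonic() + 2**attempt
            while time.monotonic() < delay_until:
                time.sleep(1)
    raise RuntimeError("all retries failed")

calls = {"n": 0}
def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

print(call_with_retries(flaky))  # "ok" on the third attempt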
@@ -141,7 +149,7 @@ class OnyxConfluence(Confluence):
 
     def _paginate_url(
         self, url_suffix: str, limit: int | None = None
-    ) -> Iterator[list[dict[str, Any]]]:
+    ) -> Iterator[dict[str, Any]]:
         """
         This will paginate through the top level query.
         """
@@ -153,46 +161,43 @@ class OnyxConfluence(Confluence):
 
         while url_suffix:
             try:
                 logger.debug(f"Making confluence call to {url_suffix}")
                 next_response = self.get(url_suffix)
             except Exception as e:
-                logger.exception("Error in danswer_cql: \n")
-                raise e
-            yield next_response.get("results", [])
+                logger.warning(f"Error in confluence call to {url_suffix}")
+
+                # If the problematic expansion is in the url, replace it
+                # with the replacement expansion and try again
+                # If that fails, raise the error
+                if _PROBLEMATIC_EXPANSIONS not in url_suffix:
+                    logger.exception(f"Error in confluence call to {url_suffix}")
+                    raise e
+                logger.warning(
+                    f"Replacing {_PROBLEMATIC_EXPANSIONS} with {_REPLACEMENT_EXPANSIONS}"
+                    " and trying again."
+                )
+                url_suffix = url_suffix.replace(
+                    _PROBLEMATIC_EXPANSIONS,
+                    _REPLACEMENT_EXPANSIONS,
+                )
+                continue
+
+            # yield the results individually
+            yield from next_response.get("results", [])
 
             url_suffix = next_response.get("_links", {}).get("next")
 
-    def paginated_groups_retrieval(
-        self,
-        limit: int | None = None,
-    ) -> Iterator[list[dict[str, Any]]]:
-        return self._paginate_url("rest/api/group", limit)
-
-    def paginated_group_members_retrieval(
-        self,
-        group_name: str,
-        limit: int | None = None,
-    ) -> Iterator[list[dict[str, Any]]]:
-        group_name = quote(group_name)
-        return self._paginate_url(f"rest/api/group/{group_name}/member", limit)
-
-    def paginated_cql_user_retrieval(
+    def paginated_cql_retrieval(
         self,
         cql: str,
         expand: str | None = None,
         limit: int | None = None,
-    ) -> Iterator[list[dict[str, Any]]]:
-        expand_string = f"&expand={expand}" if expand else ""
-        return self._paginate_url(
-            f"rest/api/search/user?cql={cql}{expand_string}", limit
-        )
-
-    def paginated_cql_page_retrieval(
-        self,
-        cql: str,
-        expand: str | None = None,
-        limit: int | None = None,
-    ) -> Iterator[list[dict[str, Any]]]:
+    ) -> Iterator[dict[str, Any]]:
+        """
+        The content/search endpoint can be used to fetch pages, attachments, and comments.
+        """
         expand_string = f"&expand={expand}" if expand else ""
-        return self._paginate_url(
+        yield from self._paginate_url(
             f"rest/api/content/search?cql={cql}{expand_string}", limit
         )

@@ -201,7 +206,7 @@ class OnyxConfluence(Confluence):
         cql: str,
         expand: str | None = None,
         limit: int | None = None,
-    ) -> Iterator[list[dict[str, Any]]]:
+    ) -> Iterator[dict[str, Any]]:
         """
         This function will paginate through the top level query first, then
         paginate through all of the expansions.

@@ -221,6 +226,44 @@ class OnyxConfluence(Confluence):
         for item in data:
             _traverse_and_update(item)
 
-        for results in self.paginated_cql_page_retrieval(cql, expand, limit):
-            _traverse_and_update(results)
-            yield results
+        for confluence_object in self.paginated_cql_retrieval(cql, expand, limit):
+            _traverse_and_update(confluence_object)
+            yield confluence_object
+
+    def paginated_cql_user_retrieval(
+        self,
+        cql: str,
+        expand: str | None = None,
+        limit: int | None = None,
+    ) -> Iterator[dict[str, Any]]:
+        """
+        The search/user endpoint can be used to fetch users.
+        It's a seperate endpoint from the content/search endpoint used only for users.
+        Otherwise it's very similar to the content/search endpoint.
+        """
+        expand_string = f"&expand={expand}" if expand else ""
+        yield from self._paginate_url(
+            f"rest/api/search/user?cql={cql}{expand_string}", limit
+        )
+
+    def paginated_groups_retrieval(
+        self,
+        limit: int | None = None,
+    ) -> Iterator[dict[str, Any]]:
+        """
+        This is not an SQL like query.
+        It's a confluence specific endpoint that can be used to fetch groups.
+        """
+        yield from self._paginate_url("rest/api/group", limit)
+
+    def paginated_group_members_retrieval(
+        self,
+        group_name: str,
+        limit: int | None = None,
+    ) -> Iterator[dict[str, Any]]:
+        """
+        This is not an SQL like query.
+        It's a confluence specific endpoint that can be used to fetch the members of a group.
+        """
+        group_name = quote(group_name)
+        yield from self._paginate_url(f"rest/api/group/{group_name}/member", limit)
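The signature change from Iterator[list[dict]] to Iterator[dict] moves page flattening into the paginator via yield from, so every caller loses one level of nesting, which is exactly what the connector hunks above delete. A toy paginator showing the idiom (the paged responses are faked):

from collections.abc import Iterator

_PAGES = [[{"id": 1}, {"id": 2}], [{"id": 3}]]  # fake paged API responses

def fetch_page(page_num: int) -> list[dict]:
    return _PAGES[page_num] if page_num < len(_PAGES) else []

def paginate() -> Iterator[dict]:
    page_num = 0
    while True:
        results = fetch_page(page_num)
        if not results:
            return
        yield from results  # flatten: callers never see page boundaries
        page_num += 1

print([item["id"] for item in paginate()])  # [1, 2, 3]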
@@ -2,6 +2,7 @@ import io
 from datetime import datetime
 from datetime import timezone
 from typing import Any
+from urllib.parse import quote
 
 import bs4
@@ -71,7 +72,9 @@ def _get_user(confluence_client: OnyxConfluence, user_id: str) -> str:
 
 
 def extract_text_from_confluence_html(
-    confluence_client: OnyxConfluence, confluence_object: dict[str, Any]
+    confluence_client: OnyxConfluence,
+    confluence_object: dict[str, Any],
+    fetched_titles: set[str],
 ) -> str:
     """Parse a Confluence html page and replace the 'user Id' by the real
     User Display Name

@@ -79,7 +82,7 @@ def extract_text_from_confluence_html(
     Args:
         confluence_object (dict): The confluence object as a dict
         confluence_client (Confluence): Confluence client
-
+        fetched_titles (set[str]): The titles of the pages that have already been fetched
     Returns:
         str: loaded and formated Confluence page
     """
@@ -101,38 +104,72 @@ def extract_text_from_confluence_html(
         # Include @ sign for tagging, more clear for LLM
         user.replaceWith("@" + _get_user(confluence_client, user_id))
 
-    for html_page_reference in soup.findAll("ri:page"):
+    for html_page_reference in soup.findAll("ac:structured-macro"):
+        # Here, we only want to process page within page macros
+        if html_page_reference.attrs.get("ac:name") != "include":
+            continue
+
+        page_data = html_page_reference.find("ri:page")
+        if not page_data:
+            logger.warning(
+                f"Skipping retrieval of {html_page_reference} because because page data is missing"
+            )
+            continue
+
+        page_title = page_data.attrs.get("ri:content-title")
+        if not page_title:
+            # only fetch pages that have a title
+            logger.warning(
+                f"Skipping retrieval of {html_page_reference} because it has no title"
+            )
+            continue
+
+        if page_title in fetched_titles:
+            # prevent recursive fetching of pages
+            logger.debug(f"Skipping {page_title} because it has already been fetched")
+            continue
+
+        fetched_titles.add(page_title)
+
         # Wrap this in a try-except because there are some pages that might not exist
         try:
-            page_title = html_page_reference.attrs["ri:content-title"]
-            if not page_title:
-                continue
-
-            page_query = f"type=page and title='{page_title}'"
+            page_query = f"type=page and title='{quote(page_title)}'"
 
             page_contents: dict[str, Any] | None = None
             # Confluence enforces title uniqueness, so we should only get one result here
-            for page_batch in confluence_client.paginated_cql_page_retrieval(
+            for page in confluence_client.paginated_cql_retrieval(
                 cql=page_query,
                 expand="body.storage.value",
                 limit=1,
             ):
-                page_contents = page_batch[0]
+                page_contents = page
                 break
-        except Exception:
+        except Exception as e:
             logger.warning(
-                f"Error getting page contents for object {confluence_object}"
+                f"Error getting page contents for object {confluence_object}: {e}"
             )
             continue
 
         if not page_contents:
             continue
 
         text_from_page = extract_text_from_confluence_html(
-            confluence_client, page_contents
+            confluence_client=confluence_client,
+            confluence_object=page_contents,
+            fetched_titles=fetched_titles,
         )
 
         html_page_reference.replaceWith(text_from_page)
 
+    for html_link_body in soup.findAll("ac:link-body"):
+        # This extracts the text from inline links in the page so they can be
+        # represented in the document text as plain text
+        try:
+            text_from_link = html_link_body.text
+            html_link_body.replaceWith(f"(LINK TEXT: {text_from_link})")
+        except Exception as e:
+            logger.warning(f"Error processing ac:link-body: {e}")
+
     return format_document_soup(soup)
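Threading fetched_titles through the recursion is a visited-set guard: without it, two pages that include each other would recurse until the stack overflows. The same guard on a toy include graph (illustrative names only):

GRAPH = {"A": ["B"], "B": ["A", "C"], "C": []}  # A and B include each other

def expand(page: str, fetched: set[str]) -> list[str]:
    order = [page]
    for child in GRAPH[page]:
        if child in fetched:
            continue  # already expanded somewhere up the call stack
        fetched.add(child)
        order.extend(expand(child, fetched))
    return order

print(expand("A", {"A"}))  # ['A', 'B', 'C'], no infinite recursion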
@@ -246,6 +283,6 @@ def build_confluence_client(
         password=credentials_json["confluence_access_token"] if is_cloud else None,
         token=credentials_json["confluence_access_token"] if not is_cloud else None,
         backoff_and_retry=True,
-        max_backoff_retries=60,
+        max_backoff_retries=10,
         max_backoff_seconds=60,
     )
@@ -67,6 +67,13 @@ def create_index_attempt(
     return new_attempt.id
 
 
+def delete_index_attempt(db_session: Session, index_attempt_id: int) -> None:
+    index_attempt = get_index_attempt(db_session, index_attempt_id)
+    if index_attempt:
+        db_session.delete(index_attempt)
+        db_session.commit()
+
+
 def mock_successful_index_attempt(
     connector_credential_pair_id: int,
     search_settings_id: int,
@@ -160,7 +160,7 @@ def create_update_persona(
         "persona_id": persona_id,
         "user": user,
         "db_session": db_session,
-        **create_persona_request.dict(exclude={"users", "groups"}),
+        **create_persona_request.model_dump(exclude={"users", "groups"}),
     }
 
     persona = upsert_persona(**persona_data)
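Pydantic v2 renamed .dict() to .model_dump(); the exclude argument carries over unchanged. A quick demonstration (the model below is a minimal stand-in for the real request class):

from pydantic import BaseModel

class CreatePersonaRequest(BaseModel):
    name: str
    users: list[str] = []
    groups: list[str] = []

req = CreatePersonaRequest(name="demo", users=["u1"])
print(req.model_dump(exclude={"users", "groups"}))  # {'name': 'demo'}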
@@ -391,6 +391,9 @@ def upsert_prompt(
     return prompt
 
 
+# NOTE: This operation cannot update persona configuration options that
+# are core to the persona, such as its display priority and
+# whether or not the assistant is a built-in / default assistant
 def upsert_persona(
     user: User | None,
     name: str,

@@ -423,9 +426,9 @@ def upsert_persona(
     chunks_below: int = CONTEXT_CHUNKS_BELOW,
 ) -> Persona:
     if persona_id is not None:
-        persona = db_session.query(Persona).filter_by(id=persona_id).first()
+        existing_persona = db_session.query(Persona).filter_by(id=persona_id).first()
     else:
-        persona = _get_persona_by_name(
+        existing_persona = _get_persona_by_name(
             persona_name=name, user=user, db_session=db_session
         )
@@ -451,60 +454,78 @@ def upsert_persona(
     prompts = None
     if prompt_ids is not None:
         prompts = db_session.query(Prompt).filter(Prompt.id.in_(prompt_ids)).all()
-        if not prompts and prompt_ids:
-            raise ValueError("prompts not found")
+
+    if prompts is not None and len(prompts) == 0:
+        raise ValueError(
+            f"Invalid Persona config, no valid prompts "
+            f"specified. Specified IDs were: '{prompt_ids}'"
+        )
 
     # ensure all specified tools are valid
     if tools:
         validate_persona_tools(tools)
 
-    if persona:
-        if not builtin_persona and persona.builtin_persona:
+    if existing_persona:
+        # Built-in personas can only be updated through YAML configuration.
+        # This ensures that core system personas are not modified unintentionally.
+        if existing_persona.builtin_persona and not builtin_persona:
             raise ValueError("Cannot update builtin persona with non-builtin.")
 
         # this checks if the user has permission to edit the persona
-        persona = fetch_persona_by_id(
-            db_session=db_session, persona_id=persona.id, user=user, get_editable=True
+        # will raise an Exception if the user does not have permission
+        existing_persona = fetch_persona_by_id(
+            db_session=db_session,
+            persona_id=existing_persona.id,
+            user=user,
+            get_editable=True,
         )
 
-        persona.name = name
-        persona.description = description
-        persona.num_chunks = num_chunks
-        persona.chunks_above = chunks_above
-        persona.chunks_below = chunks_below
-        persona.llm_relevance_filter = llm_relevance_filter
-        persona.llm_filter_extraction = llm_filter_extraction
-        persona.recency_bias = recency_bias
-        persona.builtin_persona = builtin_persona
-        persona.llm_model_provider_override = llm_model_provider_override
-        persona.llm_model_version_override = llm_model_version_override
-        persona.starter_messages = starter_messages
-        persona.deleted = False  # Un-delete if previously deleted
-        persona.is_public = is_public
-        persona.icon_color = icon_color
-        persona.icon_shape = icon_shape
+        # The following update excludes `default`, `built-in`, and display priority.
+        # Display priority is handled separately in the `display-priority` endpoint.
+        # `default` and `built-in` properties can only be set when creating a persona.
+        existing_persona.name = name
+        existing_persona.description = description
+        existing_persona.num_chunks = num_chunks
+        existing_persona.chunks_above = chunks_above
+        existing_persona.chunks_below = chunks_below
+        existing_persona.llm_relevance_filter = llm_relevance_filter
+        existing_persona.llm_filter_extraction = llm_filter_extraction
+        existing_persona.recency_bias = recency_bias
+        existing_persona.llm_model_provider_override = llm_model_provider_override
+        existing_persona.llm_model_version_override = llm_model_version_override
+        existing_persona.starter_messages = starter_messages
+        existing_persona.deleted = False  # Un-delete if previously deleted
+        existing_persona.is_public = is_public
+        existing_persona.icon_color = icon_color
+        existing_persona.icon_shape = icon_shape
         if remove_image or uploaded_image_id:
-            persona.uploaded_image_id = uploaded_image_id
-        persona.display_priority = display_priority
-        persona.is_visible = is_visible
-        persona.search_start_date = search_start_date
-        persona.is_default_persona = is_default_persona
-        persona.category_id = category_id
+            existing_persona.uploaded_image_id = uploaded_image_id
+        existing_persona.is_visible = is_visible
+        existing_persona.search_start_date = search_start_date
+        existing_persona.category_id = category_id
         # Do not delete any associations manually added unless
         # a new updated list is provided
         if document_sets is not None:
-            persona.document_sets.clear()
-            persona.document_sets = document_sets or []
+            existing_persona.document_sets.clear()
+            existing_persona.document_sets = document_sets or []
 
         if prompts is not None:
-            persona.prompts.clear()
-            persona.prompts = prompts or []
+            existing_persona.prompts.clear()
+            existing_persona.prompts = prompts
 
         if tools is not None:
-            persona.tools = tools or []
+            existing_persona.tools = tools or []
+
+        persona = existing_persona
 
     else:
-        persona = Persona(
+        if not prompts:
+            raise ValueError(
+                "Invalid Persona config. "
+                "Must specify at least one prompt for a new persona."
+            )
+
+        new_persona = Persona(
             id=persona_id,
             user_id=user.id if user else None,
             is_public=is_public,

@@ -517,7 +538,7 @@ def upsert_persona(
             llm_filter_extraction=llm_filter_extraction,
             recency_bias=recency_bias,
             builtin_persona=builtin_persona,
-            prompts=prompts or [],
+            prompts=prompts,
             document_sets=document_sets or [],
             llm_model_provider_override=llm_model_provider_override,
             llm_model_version_override=llm_model_version_override,

@@ -532,8 +553,8 @@ def upsert_persona(
             is_default_persona=is_default_persona,
             category_id=category_id,
         )
-        db_session.add(persona)
-
+        db_session.add(new_persona)
+        persona = new_persona
     if commit:
         db_session.commit()
     else:
@@ -734,6 +755,8 @@ def get_prompt_by_name(
     if user and user.role != UserRole.ADMIN:
         stmt = stmt.where(Prompt.user_id == user.id)
 
+    # Order by ID to ensure consistent result when multiple prompts exist
+    stmt = stmt.order_by(Prompt.id).limit(1)
     result = db_session.execute(stmt).scalar_one_or_none()
     return result
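Without an explicit ordering, duplicate prompt names make the query result depend on the database, and scalar_one_or_none() raises on multiple rows. Ordering by primary key and limiting to one row makes the lookup deterministic. A self-contained SQLAlchemy sketch against an in-memory SQLite database (the table is a minimal stand-in, not the real schema):

from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Prompt(Base):
    __tablename__ = "prompt"
    id = Column(Integer, primary_key=True)
    name = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Prompt(id=2, name="default"), Prompt(id=1, name="default")])
    stmt = select(Prompt).where(Prompt.name == "default")
    # order by id so duplicate names always resolve to the same row
    stmt = stmt.order_by(Prompt.id).limit(1)
    prompt = session.execute(stmt).scalar_one_or_none()
    print(prompt.id)  # 1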
@@ -176,6 +176,9 @@ def create_persona(
     )
 
 
+# NOTE: This endpoint cannot update persona configuration options that
+# are core to the persona, such as its display priority and
+# whether or not the assistant is a built-in / default assistant
 @basic_router.patch("/{persona_id}")
 def update_persona(
     persona_id: int,
@@ -618,7 +618,6 @@ def update_user_assistant_list(
     if user is None:
         if AUTH_TYPE == AuthType.DISABLED:
             store = get_kv_store()
-
             no_auth_user = fetch_no_auth_user(store)
             no_auth_user.preferences.chosen_assistants = request.chosen_assistants
             set_no_auth_user_preferences(store, no_auth_user.preferences)
@@ -16,7 +16,7 @@ from danswer.utils.logger import setup_logger
 logger = setup_logger()
 
 _VIEWSPACE_PERMISSION_TYPE = "VIEWSPACE"
-_REQUEST_PAGINATION_LIMIT = 100
+_REQUEST_PAGINATION_LIMIT = 5000
 
 
 def _get_server_space_permissions(
@@ -1,4 +1,4 @@
-from typing import Any
+from atlassian import Confluence  # type: ignore
 
 from danswer.connectors.confluence.onyx_confluence import OnyxConfluence
 from danswer.connectors.confluence.utils import build_confluence_client

@@ -15,12 +15,8 @@ def _get_group_members_email_paginated(
     confluence_client: OnyxConfluence,
     group_name: str,
 ) -> set[str]:
-    members: list[dict[str, Any]] = []
-    for member_batch in confluence_client.paginated_group_members_retrieval(group_name):
-        members.extend(member_batch)
-
     group_member_emails: set[str] = set()
-    for member in members:
+    for member in confluence_client.paginated_group_members_retrieval(group_name):
         email = member.get("email")
         if not email:
             user_name = member.get("username")
@@ -38,19 +34,33 @@ def _get_group_members_email_paginated(
 def confluence_group_sync(
     cc_pair: ConnectorCredentialPair,
 ) -> list[ExternalUserGroup]:
+    credentials = cc_pair.credential.credential_json
     is_cloud = cc_pair.connector.connector_specific_config.get("is_cloud", False)
+    wiki_base = cc_pair.connector.connector_specific_config["wiki_base"]
+
+    # test connection with direct client, no retries
+    confluence_client = Confluence(
+        api_version="cloud" if is_cloud else "latest",
+        url=wiki_base.rstrip("/"),
+        username=credentials["confluence_username"] if is_cloud else None,
+        password=credentials["confluence_access_token"] if is_cloud else None,
+        token=credentials["confluence_access_token"] if not is_cloud else None,
+    )
+    spaces = confluence_client.get_all_spaces(limit=1)
+    if not spaces:
+        raise RuntimeError(f"No spaces found at {wiki_base}!")
+
     confluence_client = build_confluence_client(
-        credentials_json=cc_pair.credential.credential_json,
+        credentials_json=credentials,
         is_cloud=is_cloud,
-        wiki_base=cc_pair.connector.connector_specific_config["wiki_base"],
+        wiki_base=wiki_base,
     )
 
     # Get all group names
     group_names: list[str] = []
-    for group_batch in confluence_client.paginated_groups_retrieval():
-        for group in group_batch:
-            if group_name := group.get("name"):
-                group_names.append(group_name)
+    for group in confluence_client.paginated_groups_retrieval():
+        if group_name := group.get("name"):
+            group_names.append(group_name)
 
     # For each group name, get all members and create a danswer group
     danswer_groups: list[ExternalUserGroup] = []
@@ -132,13 +132,18 @@ def _seed_personas(db_session: Session, personas: list[CreatePersonaRequest]) -> None:
     if personas:
         logger.notice("Seeding Personas")
         for persona in personas:
+            if not persona.prompt_ids:
+                raise ValueError(
+                    f"Invalid Persona with name {persona.name}; no prompts exist"
+                )
+
             upsert_persona(
                 user=None,  # Seeding is done as admin
                 name=persona.name,
                 description=persona.description,
-                num_chunks=persona.num_chunks
-                if persona.num_chunks is not None
-                else 0.0,
+                num_chunks=(
+                    persona.num_chunks if persona.num_chunks is not None else 0.0
+                ),
                 llm_relevance_filter=persona.llm_relevance_filter,
                 llm_filter_extraction=persona.llm_filter_extraction,
                 recency_bias=RecencyBiasSetting.AUTO,
||||
@@ -3,7 +3,7 @@ cohere==5.6.1
|
||||
fastapi==0.109.2
|
||||
google-cloud-aiplatform==1.58.0
|
||||
numpy==1.26.4
|
||||
openai==1.52.2
|
||||
openai==1.55.3
|
||||
pydantic==2.8.2
|
||||
retry==0.9.2
|
||||
safetensors==0.4.2
|
||||
|
||||
@@ -42,7 +42,7 @@ class PersonaManager:
         "is_public": is_public,
         "llm_filter_extraction": llm_filter_extraction,
         "recency_bias": recency_bias,
-        "prompt_ids": prompt_ids or [],
+        "prompt_ids": prompt_ids or [0],
         "document_set_ids": document_set_ids or [],
         "tool_ids": tool_ids or [],
         "llm_model_provider_override": llm_model_provider_override,
@@ -379,6 +379,7 @@ export function AssistantEditor({
       if (!promptResponse.ok) {
         error = await promptResponse.text();
       }
+
       if (!personaResponse) {
         error = "Failed to create Assistant - no response received";
       } else if (!personaResponse.ok) {
@@ -259,9 +259,8 @@ export async function updatePersona(
 ): Promise<[Response, Response | null]> {
   const { id, existingPromptId } = personaUpdateRequest;
 
-  // first update prompt
   let promptResponse;
-  let promptId;
+  let promptId: number | null = null;
   if (existingPromptId !== undefined) {
     promptResponse = await updatePrompt({
       promptId: existingPromptId,

@@ -278,9 +277,10 @@ export async function updatePersona(
       taskPrompt: personaUpdateRequest.task_prompt,
       includeCitations: personaUpdateRequest.include_citations,
     });
-    promptId = promptResponse.ok ? (await promptResponse.json()).id : null;
+    promptId = promptResponse.ok
+      ? ((await promptResponse.json()).id as number)
+      : null;
   }
 
   let fileId = null;
   if (personaUpdateRequest.uploaded_image) {
     fileId = await uploadFile(personaUpdateRequest.uploaded_image);

@@ -290,7 +290,7 @@ export async function updatePersona(
   }
 
   const updatePersonaResponse =
-    promptResponse.ok && promptId
+    promptResponse.ok && promptId !== null
       ? await fetch(`/api/persona/${id}`, {
           method: "PATCH",
           headers: {
@@ -33,18 +33,19 @@ import {
 } from "@/components/ui/select";
 
 export function AssistantGalleryCard({
+  onlyAssistant,
   assistant,
   user,
   setPopup,
   selectedAssistant,
 }: {
+  onlyAssistant: boolean;
   assistant: Persona;
   user: User | null;
   setPopup: (popup: PopupSpec) => void;
   selectedAssistant: boolean;
 }) {
   const { data: categories } = useCategories();
-
   const { refreshUser } = useUser();
 
   return (

@@ -83,10 +84,7 @@ export function AssistantGalleryCard({
               "
               icon={FiMinus}
               onClick={async () => {
-                if (
-                  user.preferences?.chosen_assistants &&
-                  user.preferences?.chosen_assistants.length === 1
-                ) {
+                if (onlyAssistant) {
                   setPopup({
                     message: `Cannot remove "${assistant.name}" - you must have at least one assistant.`,
                     type: "error",

@@ -356,6 +354,7 @@ export function AssistantsGallery() {
             >
               {defaultAssistants.map((assistant) => (
                 <AssistantGalleryCard
+                  onlyAssistant={visibleAssistants.length === 1}
                   selectedAssistant={visibleAssistants.includes(assistant)}
                   key={assistant.id}
                   assistant={assistant}

@@ -389,6 +388,7 @@ export function AssistantsGallery() {
             >
               {nonDefaultAssistants.map((assistant) => (
                 <AssistantGalleryCard
+                  onlyAssistant={visibleAssistants.length === 1}
                   selectedAssistant={visibleAssistants.includes(assistant)}
                   key={assistant.id}
                   assistant={assistant}
@@ -60,7 +60,7 @@ import { CustomTooltip } from "@/components/tooltip/CustomTooltip";
 import { useAssistants } from "@/components/context/AssistantsContext";
 import { useUser } from "@/components/user/UserProvider";
 
-function DraggableAssistantListItem(props: any) {
+function DraggableAssistantListItem({ ...props }: any) {
   const {
     attributes,
     listeners,

@@ -100,6 +100,7 @@ function AssistantListItem({
   deleteAssistant,
   shareAssistant,
   isDragging,
+  onlyAssistant,
 }: {
   assistant: Persona;
   user: User | null;

@@ -109,14 +110,13 @@ function AssistantListItem({
   shareAssistant: Dispatch<SetStateAction<Persona | null>>;
   setPopup: (popupSpec: PopupSpec | null) => void;
   isDragging?: boolean;
+  onlyAssistant: boolean;
 }) {
   const { refreshUser } = useUser();
   const router = useRouter();
   const [showSharingModal, setShowSharingModal] = useState(false);
 
   const isOwnedByUser = checkUserOwnsAssistant(user, assistant);
-  const currentChosenAssistants = user?.preferences
-    ?.chosen_assistants as number[];
 
   return (
     <>

@@ -192,13 +192,14 @@ function AssistantListItem({
                     key="remove"
                     className="flex items-center gap-x-2 px-4 py-2 hover:bg-gray-100 w-full text-left"
                     onClick={async () => {
-                      if (currentChosenAssistants?.length === 1) {
+                      if (onlyAssistant) {
                         setPopup({
                           message: `Cannot remove "${assistant.name}" - you must have at least one assistant.`,
                           type: "error",
                         });
                         return;
                       }
+
                       const success = await removeAssistantFromList(
                         assistant.id
                       );

@@ -432,6 +433,7 @@ export function AssistantsList() {
         <div className="w-full items-center py-4">
           {currentlyVisibleAssistants.map((assistant, index) => (
             <DraggableAssistantListItem
+              onlyAssistant={currentlyVisibleAssistants.length === 1}
               deleteAssistant={setDeletingPersona}
               shareAssistant={setMakePublicPersona}
               key={assistant.id}

@@ -461,6 +463,7 @@ export function AssistantsList() {
         <div className="w-full p-4">
           {ownedButHiddenAssistants.map((assistant, index) => (
             <AssistantListItem
+              onlyAssistant={currentlyVisibleAssistants.length === 1}
               deleteAssistant={setDeletingPersona}
               shareAssistant={setMakePublicPersona}
               key={assistant.id}
@@ -52,6 +52,7 @@ import {
   useLayoutEffect,
   useRef,
   useState,
+  useMemo,
 } from "react";
 import { usePopup } from "@/components/admin/connectors/Popup";
 import { SEARCH_PARAM_NAMES, shouldSubmitOnLoad } from "./searchParams";

@@ -266,7 +267,6 @@ export function ChatPage({
     availableAssistants[0];
 
   const noAssistants = liveAssistant == null || liveAssistant == undefined;
-
   // always set the model override for the chat session, when an assistant, llm provider, or user preference exists
   useEffect(() => {
     const personaDefault = getLLMProviderOverrideForPersona(

@@ -282,7 +282,7 @@ export function ChatPage({
       );
     }
     // eslint-disable-next-line react-hooks/exhaustive-deps
-  }, [liveAssistant, llmProviders, user?.preferences.default_model]);
+  }, [liveAssistant, user?.preferences.default_model]);
 
   const stopGenerating = () => {
     const currentSession = currentSessionId();
@@ -8,13 +8,6 @@ import {
 } from "@/app/admin/configuration/llm/interfaces";
 import { ToolSnapshot } from "../tools/interfaces";
 import { fetchToolsSS } from "../tools/fetchTools";
-import {
-  OpenAIIcon,
-  AnthropicIcon,
-  AWSIcon,
-  AzureIcon,
-  OpenSourceIcon,
-} from "@/components/icons/icons";
 
 export async function fetchAssistantEditorInfoSS(
   personaId?: number | string

@@ -104,15 +97,22 @@ export async function fetchAssistantEditorInfoSS(
     ? ((await personaResponse.json()) as Persona)
     : null;
 
-  return [
-    {
-      ccPairs,
-      documentSets,
-      llmProviders,
-      user,
-      existingPersona,
-      tools: toolsResponse,
-    },
-    null,
-  ];
+  let error: string | null = null;
+  if (existingPersona?.builtin_persona) {
+    return [null, "cannot update builtin persona"];
+  }
+
+  return (
+    error || [
+      {
+        ccPairs,
+        documentSets,
+        llmProviders,
+        user,
+        existingPersona,
+        tools: toolsResponse,
+      },
+      null,
+    ]
+  );
 }
@@ -174,7 +174,6 @@ export function useLlmOverride(
       modelName: "",
     }
   );
-
   const [llmOverride, setLlmOverride] = useState<LlmOverride>(
     currentChatSession && currentChatSession.current_alternate_model
       ? destructureValue(currentChatSession.current_alternate_model)