Compare commits


5 Commits

Author SHA1 Message Date
rkuo-danswer
06c35d9e17 Merge pull request #3016 from danswer-ai/hotfix/v0.11-indexing-logs
Merge hotfix/v0.11-indexing-logs into release/v0.11
2024-10-31 15:22:05 -07:00
rkuo-danswer
1df917013e Feature/indexing logs (#3002)
* improve logging around indexing tasks

* task_logger doesn't work inside the spawned task
2024-10-31 20:24:28 +00:00
rkuo-danswer
8f063e1be0 Merge pull request #2990 from danswer-ai/hotfix/v0.11-worker-process-init
Merge hotfix/v0.11-worker-process-init into release/v0.11
2024-10-29 12:12:44 -07:00
rkuo-danswer
67be4bbe5e init sqlalchemy in child process (#2987)
2024-10-29 18:52:38 +00:00
hagen-danswer
6ff559a0f9 fixed label filter (#2978)
* added old error handling to comment fetching

* Not

* properly escaped cql labels

* reverted changes
2024-10-29 16:23:38 +00:00
6 changed files with 90 additions and 36 deletions

View File

@@ -21,6 +21,8 @@ celery_app.config_from_object("danswer.background.celery.configs.beat")
@beat_init.connect
def on_beat_init(sender: Any, **kwargs: Any) -> None:
    logger.info("beat_init signal received.")
    # celery beat shouldn't touch the db at all. But just setting a low minimum here.
    SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
    SqlEngine.init_engine(pool_size=2, max_overflow=0)
    app_base.wait_for_redis(sender, **kwargs)

View File

@@ -58,7 +58,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
    logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")
    SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
-    SqlEngine.init_engine(pool_size=8, max_overflow=0)
+    SqlEngine.init_engine(pool_size=4, max_overflow=12)
    app_base.wait_for_redis(sender, **kwargs)
    app_base.on_secondary_worker_init(sender, **kwargs)
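The two engine-pool hunks above tune connection usage per process: with SQLAlchemy's default QueuePool, pool_size is the number of connections held open persistently and max_overflow is how many extra connections may be opened temporarily under load. The heavy worker therefore moves from a fixed 8 connections to 4 persistent plus up to 12 transient, while celery beat keeps only a minimal 2-connection pool. Assuming SqlEngine.init_engine simply forwards these keyword arguments to sqlalchemy.create_engine (the wrapper itself is danswer's and not shown in this diff), the equivalent plain-SQLAlchemy call would look roughly like:

    # sketch only: the URL and application_name are placeholders, not values from this diff
    from sqlalchemy import create_engine

    engine = create_engine(
        "postgresql+psycopg2://user:pass@localhost:5432/danswer",
        pool_size=4,       # connections kept open for the life of the process
        max_overflow=12,   # extra connections allowed during bursts, then discarded
        connect_args={"application_name": "celery_worker_heavy"},
    )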

View File

@@ -166,19 +166,6 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
        r.delete(key)

-# @worker_process_init.connect
-# def on_worker_process_init(sender: Any, **kwargs: Any) -> None:
-#     """This only runs inside child processes when the worker is in pool=prefork mode.
-#     This may be technically unnecessary since we're finding prefork pools to be
-#     unstable and currently aren't planning on using them."""
-#     logger.info("worker_process_init signal received.")
-#     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
-#     SqlEngine.init_engine(pool_size=5, max_overflow=0)
-#     # https://stackoverflow.com/questions/43944787/sqlalchemy-celery-with-scoped-session-error
-#     SqlEngine.get_engine().dispose(close=False)

@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
    app_base.on_worker_ready(sender, **kwargs)
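The block deleted above was the prefork-pool variant of the same idea: when Celery forks worker children, each child inherits the parent's SQLAlchemy engine, and the linked Stack Overflow answer recommends disposing of that inherited pool in the child without closing the parent's connections. A minimal sketch of that pattern, using the signal and engine accessor named in the removed comments (SqlEngine is danswer's wrapper, so treat the accessor as an assumption):

    from celery.signals import worker_process_init

    @worker_process_init.connect
    def reset_engine_in_child(**kwargs):
        # drop connections inherited across the fork; close=False leaves the
        # parent's own checked-out connections alone
        SqlEngine.get_engine().dispose(close=False)

The removed docstring notes that prefork pools proved unstable, and the job-client change further down initializes a fresh engine in the spawned child instead, so this handler is no longer needed here.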

View File

@@ -388,7 +388,12 @@ def connector_indexing_proxy_task(
    tenant_id: str | None,
) -> None:
    """celery tasks are forked, but forking is unstable. This proxies work to a spawned task."""
+    task_logger.info(
+        f"Indexing proxy - starting: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )
    client = SimpleJobClient()
    job = client.submit(
@@ -402,29 +407,56 @@ def connector_indexing_proxy_task(
    )
    if not job:
+        task_logger.info(
+            f"Indexing proxy - spawn failed: attempt={index_attempt_id} "
+            f"tenant={tenant_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id}"
+        )
        return

+    task_logger.info(
+        f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )

    while True:
        sleep(10)
-        with get_session_with_tenant(tenant_id) as db_session:
-            index_attempt = get_index_attempt(
-                db_session=db_session, index_attempt_id=index_attempt_id
-            )
-            # do nothing for ongoing jobs that haven't been stopped
-            if not job.done():
+        # do nothing for ongoing jobs that haven't been stopped
+        if not job.done():
+            with get_session_with_tenant(tenant_id) as db_session:
+                index_attempt = get_index_attempt(
+                    db_session=db_session, index_attempt_id=index_attempt_id
+                )
+                if not index_attempt:
+                    continue
+                if not index_attempt.is_finished():
+                    continue

-            if job.status == "error":
-                logger.error(job.exception())
+        if job.status == "error":
+            task_logger.error(
+                f"Indexing proxy - spawned task exceptioned: "
+                f"attempt={index_attempt_id} "
+                f"tenant={tenant_id} "
+                f"cc_pair={cc_pair_id} "
+                f"search_settings={search_settings_id} "
+                f"error={job.exception()}"
+            )

-            job.release()
-            break
+        job.release()
+        break

+    task_logger.info(
+        f"Indexing proxy - finished: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )
    return
@@ -446,7 +478,17 @@ def connector_indexing_task(
    Returns None if the task did not run (possibly due to a conflict).
    Otherwise, returns an int >= 0 representing the number of indexed docs.

    NOTE: if an exception is raised out of this task, the primary worker will detect
    that the task transitioned to a "READY" state but the generator_complete_key doesn't exist.
    This will cause the primary worker to abort the indexing attempt and clean up.
    """

+    logger.info(
+        f"Indexing spawned task starting: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )

    attempt = None
    n_final_progress = 0
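The NOTE in the docstring above describes the completion handshake between the spawned task and the primary worker: the spawned task writes generator_complete_key to Redis only on success (seen later in this diff as r.set(rci.generator_complete_key, HTTPStatus.OK.value)), so a Celery task that reaches a READY state without that key is treated as a crashed attempt and cleaned up. A simplified sketch of that check from the primary worker's side; the helper and its arguments are illustrative, only AsyncResult.ready() and the Redis get are real APIs:

    from celery.result import AsyncResult
    from redis import Redis

    def spawned_task_died(celery_task_id: str, generator_complete_key: str, r: Redis) -> bool:
        result = AsyncResult(celery_task_id)
        if not result.ready():
            return False  # the wrapping celery task is still running, keep waiting
        # READY but the success marker was never written -> treat as a crash
        return r.get(generator_complete_key) is None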
@@ -485,19 +527,19 @@ def connector_indexing_task(
                cast(str, fence_json)
            )
        except ValueError:
-            task_logger.exception(
+            logger.exception(
                f"connector_indexing_task: fence_data not decodeable: fence={rci.fence_key}"
            )
            raise

        if fence_data.index_attempt_id is None or fence_data.celery_task_id is None:
-            task_logger.info(
+            logger.info(
                f"connector_indexing_task - Waiting for fence: fence={rci.fence_key}"
            )
            sleep(1)
            continue

-        task_logger.info(
+        logger.info(
            f"connector_indexing_task - Fence found, continuing...: fence={rci.fence_key}"
        )
        break
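The task_logger to logger swaps in this hunk and the ones below follow the note in commit 1df917013e ("task_logger doesn't work inside the spawned task"): Celery's task logger only has handlers wired up inside a Celery-managed worker process, so records emitted from the separately spawned indexing process are lost, while the module-level logger from setup_logger is configured unconditionally. Roughly the distinction being relied on (get_task_logger is Celery's real helper; the spawned-process behaviour is as stated in the commit message):

    from celery.utils.log import get_task_logger
    from danswer.utils.logger import setup_logger

    task_logger = get_task_logger(__name__)  # useful only inside the celery worker
    logger = setup_logger()                  # works in any process

    def inside_spawned_process() -> None:
        logger.info("visible: handlers set up by setup_logger")
        task_logger.info("likely dropped: no celery logging configuration here")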
@@ -509,7 +551,7 @@ def connector_indexing_task(
    acquired = lock.acquire(blocking=False)
    if not acquired:
-        task_logger.warning(
+        logger.warning(
            f"Indexing task already running, exiting...: "
            f"cc_pair={cc_pair_id} search_settings={search_settings_id}"
        )
@@ -552,6 +594,13 @@ def connector_indexing_task(
        rcs.fence_key, rci.generator_progress_key, lock, r
    )

+    logger.info(
+        f"Indexing spawned task running entrypoint: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )

    run_indexing_entrypoint(
        index_attempt_id,
        tenant_id,
@@ -570,7 +619,12 @@ def connector_indexing_task(
        r.set(rci.generator_complete_key, HTTPStatus.OK.value)
    except Exception as e:
-        task_logger.exception(f"Indexing failed: cc_pair={cc_pair_id}")
+        logger.exception(
+            f"Indexing spawned task failed: attempt={index_attempt_id} "
+            f"tenant={tenant_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id}"
+        )

        if attempt:
            with get_session_with_tenant(tenant_id) as db_session:
                mark_attempt_failed(attempt, db_session, failure_reason=str(e))
@@ -584,4 +638,10 @@ def connector_indexing_task(
        if lock.owned():
            lock.release()

+    logger.info(
+        f"Indexing spawned task finished: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id}"
+    )

    return n_final_progress

View File

@@ -11,7 +11,8 @@ from typing import Any
from typing import Literal
from typing import Optional
-from danswer.db.engine import get_sqlalchemy_engine
+from danswer.configs.constants import POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME
+from danswer.db.engine import SqlEngine
from danswer.utils.logger import setup_logger
logger = setup_logger()
@@ -37,7 +38,9 @@ def _initializer(
    if kwargs is None:
        kwargs = {}

-    get_sqlalchemy_engine().dispose(close=False)
+    logger.info("Initializing spawned worker child process.")
+    SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
+    SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)

    return func(*args, **kwargs)
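With this change each spawned child builds its own small engine (4 persistent connections, 12 overflow, recycled after 60 seconds) instead of disposing of an engine inherited from the parent, which only makes sense for fork-based children. For orientation, a spawn-based client would typically route submitted work through an initializer like the one above; the wiring below is illustrative, since SimpleJobClient's internals are not part of this diff:

    import multiprocessing as mp

    def run_in_spawned_process(func, *args):
        # "spawn" starts a fresh interpreter, so nothing (including db engines)
        # is inherited from the parent process
        ctx = mp.get_context("spawn")
        process = ctx.Process(target=_initializer, args=(func, args))
        process.start()
        return process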

View File

@@ -91,12 +91,13 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
            cql_page_query += f" and id='{page_id}'"
        self.cql_page_query = cql_page_query

-        self.cql_label_filter = ""
        self.cql_time_filter = ""

+        self.cql_label_filter = ""
        if labels_to_skip:
            labels_to_skip = list(set(labels_to_skip))
-            comma_separated_labels = ",".join(labels_to_skip)
-            self.cql_label_filter = f"&label not in ({comma_separated_labels})"
+            comma_separated_labels = ",".join(f"'{label}'" for label in labels_to_skip)
+            self.cql_label_filter = f" and label not in ({comma_separated_labels})"

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        # see https://github.com/atlassian-api/atlassian-python-api/blob/master/atlassian/rest_client.py
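A quick worked example of the corrected filter: for labels_to_skip = ["archived", "team blog"], the old code produced "&label not in (archived,team blog)", which is not a valid CQL conjunction and leaves the labels unquoted, while the new code appends a properly quoted clause:

    labels_to_skip = ["archived", "team blog"]
    comma_separated_labels = ",".join(f"'{label}'" for label in labels_to_skip)
    cql_label_filter = f" and label not in ({comma_separated_labels})"
    # -> " and label not in ('archived','team blog')"

This quoting is the "properly escaped cql labels" fix referenced in commit 6ff559a0f9.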
@@ -125,7 +126,8 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
        for comment in comments:
            comment_string += "\nComment:\n"
            comment_string += extract_text_from_confluence_html(
-                confluence_client=self.confluence_client, confluence_object=comment
+                confluence_client=self.confluence_client,
+                confluence_object=comment,
            )

        return comment_string