1
0
forked from github/onyx

Compare commits

...

6 Commits

Author SHA1 Message Date
pablodanswer
189d62b72e k 2024-11-08 18:01:11 -08:00
pablodanswer
89cb3b503c minor updates 2024-11-08 17:59:15 -08:00
pablodanswer
cdda24f9ea remove log 2024-11-08 17:57:48 -08:00
pablodanswer
6dc4ca344c k 2024-11-08 14:08:16 -08:00
pablodanswer
f91bac1cd9 improved logging 2024-11-08 12:16:45 -08:00
pablodanswer
5e25488d0a add additional logs 2024-11-08 11:52:22 -08:00
7 changed files with 58 additions and 31 deletions

View File

@@ -359,6 +359,7 @@ def try_creating_indexing_task(
task_id=custom_task_id,
priority=DanswerCeleryPriority.MEDIUM,
)
if not result:
raise RuntimeError("send_task for connector_indexing_proxy_task failed.")
@@ -406,7 +407,6 @@ def connector_indexing_proxy_task(
search_settings_id,
tenant_id,
global_version.is_ee_version(),
pure=False,
)
if not job:
@@ -441,14 +441,23 @@ def connector_indexing_proxy_task(
if not index_attempt.is_finished():
continue
if job.process:
exit_code = job.process.exitcode
task_logger.info(
f"Job exit code: {exit_code} for attempt={index_attempt_id} "
f"tenant={tenant_id} cc_pair={cc_pair_id} search_settings={search_settings_id}"
)
if job.status == "error":
if job.exception_queue and not job.exception_queue.empty():
error_message = job.exception_queue.get()
else:
error_message = job.exception()
task_logger.error(
f"Indexing proxy - spawned task exceptioned: "
f"attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"error={job.exception()}"
f"attempt={index_attempt_id} tenant={tenant_id} "
f"cc_pair={cc_pair_id} search_settings={search_settings_id} "
f"error={error_message}"
)
job.release()
@@ -469,7 +478,7 @@ def connector_indexing_task(
search_settings_id: int,
tenant_id: str | None,
is_ee: bool,
) -> int | None:
) -> None:
"""Indexing task. For a cc pair, this task pulls all document IDs from the source
and compares those IDs to locally stored documents and deletes all locally stored IDs missing
from the most recently pulled document ID list
@@ -626,6 +635,7 @@ def connector_indexing_task(
# get back the total number of indexed docs and return it
n_final_progress = redis_connector_index.get_progress()
redis_connector_index.set_generator_complete(HTTPStatus.OK.value)
except Exception as e:
logger.exception(
f"Indexing spawned task failed: attempt={index_attempt_id} "
@@ -647,5 +657,5 @@ def connector_indexing_task(
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
f"n_indexed_docs={n_final_progress}"
)
return n_final_progress

View File

@@ -4,9 +4,12 @@ not follow the expected behavior, etc.
NOTE: cannot use Celery directly due to
https://github.com/celery/celery/issues/7007#issuecomment-1740139367"""
import traceback
from collections.abc import Callable
from dataclasses import dataclass
from functools import partial
from multiprocessing import Process
from multiprocessing import Queue
from typing import Any
from typing import Literal
from typing import Optional
@@ -56,6 +59,8 @@ class SimpleJob:
id: int
process: Optional["Process"] = None
exception_info: Optional[str] = None
exception_queue: Optional[Queue] = None
def cancel(self) -> bool:
return self.release()
@@ -89,14 +94,24 @@ class SimpleJob:
def exception(self) -> str:
"""Needed to match the Dask API. Returns the exception info captured
from the child process via the exception queue, when available."""
return (
f"Job with ID '{self.id}' was killed or encountered an unhandled exception."
)
if self.exception_info:
return self.exception_info
else:
return f"No exception info available for job with ID '{self.id}'."
def _wrapper(q: Queue, func: Callable, *args: Any, **kwargs: Any) -> None:
try:
func(*args, **kwargs)
except Exception:
error_trace = traceback.format_exc()
q.put(error_trace)
# Re-raise the exception to ensure the process exits with a non-zero code
raise
class SimpleJobClient:
"""Drop in replacement for `dask.distributed.Client`"""
def __init__(self, n_workers: int = 1) -> None:
self.n_workers = n_workers
self.job_id_counter = 0
@@ -110,22 +125,22 @@ class SimpleJobClient:
logger.debug(f"Cleaning up job with id: '{job.id}'")
del self.jobs[job.id]
def submit(self, func: Callable, *args: Any, pure: bool = True) -> SimpleJob | None:
"""NOTE: `pure` arg is needed so this can be a drop in replacement for Dask"""
def submit(self, func: Callable, *args: Any, **kwargs: Any) -> Optional[SimpleJob]:
self._cleanup_completed_jobs()
if len(self.jobs) >= self.n_workers:
logger.debug(
f"No available workers to run job. Currently running '{len(self.jobs)}' jobs, with a limit of '{self.n_workers}'."
)
return None
job_id = self.job_id_counter
self.job_id_counter += 1
job_id = self.job_id_counter
process = Process(target=_run_in_process, args=(func, args), daemon=True)
job = SimpleJob(id=job_id, process=process)
process.start()
q: Queue = Queue()
wrapped_func = partial(_wrapper, q, func)
p = Process(target=wrapped_func, args=args, kwargs=kwargs)
job = SimpleJob(id=job_id, process=p)
p.start()
job.process = p
job.exception_queue = q # Store the queue in the job object
self.jobs[job_id] = job
return job

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-beat
image: danswer/danswer-backend:v0.11.0-cloud.beta.8
image: danswer/danswer-backend:v0.12.0-cloud.beta.1
imagePullPolicy: Always
command:
[
@@ -31,7 +31,7 @@ spec:
name: danswer-secrets
key: redis_password
- name: DANSWER_VERSION
value: "v0.11.0-cloud.beta.8"
value: "v0.12.0-cloud.beta.1"
envFrom:
- configMapRef:
name: env-configmap

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-heavy
image: danswer/danswer-backend:v0.11.0-cloud.beta.8
image: danswer/danswer-backend:v0.12.0-cloud.beta.1
imagePullPolicy: Always
command:
[
@@ -34,7 +34,7 @@ spec:
name: danswer-secrets
key: redis_password
- name: DANSWER_VERSION
value: "v0.11.0-cloud.beta.8"
value: "v0.12.0-cloud.beta.1"
envFrom:
- configMapRef:
name: env-configmap

View File

@@ -26,6 +26,8 @@ spec:
"--hostname=indexing@%n",
"-Q",
"connector_indexing",
"--concurrency=1",
"--pool=solo",
]
env:
- name: REDIS_PASSWORD
@@ -34,7 +36,7 @@ spec:
name: danswer-secrets
key: redis_password
- name: DANSWER_VERSION
value: "v0.11.0-cloud.beta.8"
value: "v0.12.0-cloud.beta.1"
envFrom:
- configMapRef:
name: env-configmap

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-light
image: danswer/danswer-backend:v0.11.0-cloud.beta.8
image: danswer/danswer-backend:v0.12.0-cloud.beta.1
imagePullPolicy: Always
command:
[
@@ -34,7 +34,7 @@ spec:
name: danswer-secrets
key: redis_password
- name: DANSWER_VERSION
value: "v0.11.0-cloud.beta.8"
value: "v0.12.0-cloud.beta.1"
envFrom:
- configMapRef:
name: env-configmap

View File

@@ -14,7 +14,7 @@ spec:
spec:
containers:
- name: celery-worker-primary
image: danswer/danswer-backend:v0.11.0-cloud.beta.8
image: danswer/danswer-backend:v0.12.0-cloud.beta.1
imagePullPolicy: Always
command:
[
@@ -34,7 +34,7 @@ spec:
name: danswer-secrets
key: redis_password
- name: DANSWER_VERSION
value: "v0.11.0-cloud.beta.8"
value: "v0.12.0-cloud.beta.1"
envFrom:
- configMapRef:
name: env-configmap