Compare commits

...

2 Commits

Author SHA1 Message Date
pablodanswer
f91bac1cd9 improved logging 2024-11-08 12:16:45 -08:00
pablodanswer
5e25488d0a add additional logs 2024-11-08 11:52:22 -08:00
4 changed files with 46 additions and 27 deletions

View File

@@ -132,7 +132,8 @@ async def run_async_migrations() -> None:
)
except Exception as e:
logger.error(f"Error migrating schema {schema}: {e}")
raise
logger.warning("CONTINUING")
# raise
else:
try:
logger.info(f"Migrating schema: {schema_name}")

View File

@@ -406,7 +406,6 @@ def connector_indexing_proxy_task(
search_settings_id,
tenant_id,
global_version.is_ee_version(),
pure=False,
)
if not job:
@@ -441,14 +440,23 @@ def connector_indexing_proxy_task(
if not index_attempt.is_finished():
continue
if job.process:
exit_code = job.process.exitcode
task_logger.info(
f"Job exit code: {exit_code} for attempt={index_attempt_id} "
f"tenant={tenant_id} cc_pair={cc_pair_id} search_settings={search_settings_id}"
)
if job.status == "error":
if job.exception_queue and not job.exception_queue.empty():
error_message = job.exception_queue.get()
else:
error_message = job.exception()
task_logger.error(
f"Indexing proxy - spawned task exceptioned: "
f"attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"error={job.exception()}"
f"attempt={index_attempt_id} tenant={tenant_id} "
f"cc_pair={cc_pair_id} search_settings={search_settings_id} "
f"error={error_message}"
)
job.release()

View File

@@ -4,9 +4,12 @@ not follow the expected behavior, etc.
NOTE: cannot use Celery directly due to
https://github.com/celery/celery/issues/7007#issuecomment-1740139367"""
import traceback
from collections.abc import Callable
from dataclasses import dataclass
from functools import partial
from multiprocessing import Process
from multiprocessing import Queue
from typing import Any
from typing import Literal
from typing import Optional
@@ -56,6 +59,8 @@ class SimpleJob:
id: int
process: Optional["Process"] = None
exception_info: Optional[str] = None
exception_queue: Optional[Queue] = None
def cancel(self) -> bool:
return self.release()
@@ -89,14 +94,24 @@ class SimpleJob:
def exception(self) -> str:
    """Return a description of why this job failed.

    Needed to match the Dask API. When `exception_info` has been populated it
    is returned unchanged; otherwise a generic message naming the job ID is
    returned.

    Returns:
        The stored exception text, or the generic fallback message.
    """
    # An empty string is treated the same as None: fall back to the generic text.
    if self.exception_info:
        return self.exception_info
    return f"Job with ID '{self.id}' was killed or encountered an unhandled exception."
def _wrapper(q: Queue, func: Callable, *args: Any, **kwargs: Any) -> None:
    """Run `func(*args, **kwargs)` and report a failure through `q`.

    If `func` raises an `Exception`, the formatted traceback text is put on
    `q` and the same exception is re-raised. The return value of `func` is
    discarded.
    """
    try:
        func(*args, **kwargs)
    except Exception:
        # Hand the traceback text to whoever reads the queue, then re-raise so
        # the process still exits with a non-zero code.
        q.put(traceback.format_exc())
        raise
class SimpleJobClient:
"""Drop in replacement for `dask.distributed.Client`"""
def __init__(self, n_workers: int = 1) -> None:
self.n_workers = n_workers
self.job_id_counter = 0
@@ -110,22 +125,16 @@ class SimpleJobClient:
logger.debug(f"Cleaning up job with id: '{job.id}'")
del self.jobs[job.id]
# NOTE(review): this span holds two `submit` definitions back to back, which looks
# like an older and a newer version shown together - confirm which one the class
# really keeps before relying on either.
# First variant: accepts a Dask-style `pure` flag, calls `_cleanup_completed_jobs()`,
# and returns None once the number of tracked jobs has reached `n_workers`.
def submit(self, func: Callable, *args: Any, pure: bool = True) -> SimpleJob | None:
"""NOTE: `pure` arg is needed so this can be a drop in replacement for Dask"""
self._cleanup_completed_jobs()
if len(self.jobs) >= self.n_workers:
logger.debug(
f"No available workers to run job. Currently running '{len(self.jobs)}' jobs, with a limit of '{self.n_workers}'."
)
return None
job_id = self.job_id_counter
# Second variant: forwards **kwargs to the target; no cleanup call or worker-limit
# check appears in it.
def submit(self, func: Callable, *args: Any, **kwargs: Any) -> Optional[SimpleJob]:
self.job_id_counter += 1
job_id = self.job_id_counter
# Launch through `_run_in_process` as a daemon process (that helper is not visible here).
process = Process(target=_run_in_process, args=(func, args), daemon=True)
job = SimpleJob(id=job_id, process=process)
process.start()
# NOTE(review): a second Process is created and started for the same job_id below,
# and `job` is rebound to it. Presumably only one of the two launch sequences is
# meant to exist - confirm.
# This sequence runs `func` through `_wrapper`, bound to a fresh Queue.
q: Queue = Queue()
wrapped_func = partial(_wrapper, q, func)
p = Process(target=wrapped_func, args=args, kwargs=kwargs)
job = SimpleJob(id=job_id, process=p)
p.start()
job.process = p
job.exception_queue = q # Store the queue in the job object
# Track the job by id and hand it back to the caller.
self.jobs[job_id] = job
return job

View File

@@ -26,6 +26,7 @@ spec:
"--hostname=indexing@%n",
"-Q",
"connector_indexing",
"--concurrency=1",
]
env:
- name: REDIS_PASSWORD