Compare commits

...

1 Commit

Author        SHA1        Message                          Date
pablodanswer  4b737e9101  improve logging + query history  2025-02-10 13:24:37 -08:00
3 changed files with 65 additions and 23 deletions

View File

@@ -1,5 +1,6 @@
 import csv
 import io
+from collections.abc import Generator
 from datetime import datetime
 from datetime import timezone
 from uuid import UUID
@@ -35,6 +36,8 @@ from onyx.server.query_and_chat.models import ChatSessionsResponse
 router = APIRouter()

+CHUNK_SIZE = 500  # or whatever size is appropriate
+

 def fetch_and_process_chat_session_history(
     db_session: Session,
@@ -203,34 +206,61 @@ def get_query_history_as_csv(
     end: datetime | None = None,
     db_session: Session = Depends(get_session),
 ) -> StreamingResponse:
-    complete_chat_session_history = fetch_and_process_chat_session_history(
-        db_session=db_session,
-        start=start or datetime.fromtimestamp(0, tz=timezone.utc),
-        end=end or datetime.now(tz=timezone.utc),
-        feedback_type=None,
-        limit=None,
-    )
+    """Stream the CSV in chunks to avoid timeout and high memory usage."""
+    # Set up the time range defaults
+    start_time = start or datetime.fromtimestamp(0, tz=timezone.utc)
+    end_time = end or datetime.now(tz=timezone.utc)

-    question_answer_pairs: list[QuestionAnswerPairSnapshot] = []
-    for chat_session_snapshot in complete_chat_session_history:
-        question_answer_pairs.extend(
-            QuestionAnswerPairSnapshot.from_chat_session_snapshot(chat_session_snapshot)
-        )
+    # Build the streaming generator
+    def generate_csv() -> Generator[str, None, None]:
+        # Prepare CSV writer
+        fieldnames = list(QuestionAnswerPairSnapshot.model_fields.keys())
+        buffer = io.StringIO()
+        writer = csv.DictWriter(buffer, fieldnames=fieldnames)

-    # Create an in-memory text stream
-    stream = io.StringIO()
-    writer = csv.DictWriter(
-        stream, fieldnames=list(QuestionAnswerPairSnapshot.model_fields.keys())
-    )
-    writer.writeheader()
-    for row in question_answer_pairs:
-        writer.writerow(row.to_json())
+        writer.writeheader()
+        yield buffer.getvalue()
+        buffer.seek(0)
+        buffer.truncate(0)

-    # Reset the stream's position to the start
-    stream.seek(0)
+        # Paginate or chunk the fetching of sessions
+        offset = 0
+        while True:
+            # Example chunk-based fetch from DB
+            chunk_of_chat_sessions = fetch_chat_sessions_eagerly_by_time(
+                start=start_time,
+                end=end_time,
+                db_session=db_session,
+                limit=CHUNK_SIZE,
+                offset=offset,
+            )
+            if not chunk_of_chat_sessions:
+                break  # no more data
+
+            # Convert each chat session to question-answer pairs
+            for chat_session in chunk_of_chat_sessions:
+                chat_snapshot = snapshot_from_chat_session(
+                    chat_session=chat_session,
+                    db_session=db_session,
+                )
+                if chat_snapshot is None:
+                    continue
+
+                # Build Q/A pairs
+                qa_pairs = QuestionAnswerPairSnapshot.from_chat_session_snapshot(
+                    chat_snapshot
+                )
+                for row in qa_pairs:
+                    writer.writerow(row.to_json())
+
+            # Yield the CSV row text
+            yield buffer.getvalue()
+            buffer.seek(0)
+            buffer.truncate(0)
+
+            offset += CHUNK_SIZE

     return StreamingResponse(
-        iter([stream.getvalue()]),
+        generate_csv(),
         media_type="text/csv",
         headers={"Content-Disposition": "attachment;filename=onyx_query_history.csv"},
    )
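
The core trick here generalizes to any large CSV export: write rows into an io.StringIO, yield its contents, then seek(0)/truncate(0) so the buffer never grows past one chunk. Below is a minimal self-contained sketch of the same pattern against a generic FastAPI app; fetch_rows and its field names are hypothetical stand-ins for the paginated database query, not Onyx code.

# A minimal sketch, not Onyx code: fetch_rows and the field names are
# hypothetical stand-ins for the paginated chat-session query.
import csv
import io
from collections.abc import Generator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
CHUNK_SIZE = 500  # rows fetched and flushed per chunk


def fetch_rows(limit: int, offset: int) -> list[dict[str, str]]:
    # Hypothetical paginated query; real code would hit the database.
    data = [{"question": f"q{i}", "answer": f"a{i}"} for i in range(1200)]
    return data[offset : offset + limit]


@app.get("/export")
def export_csv() -> StreamingResponse:
    def generate_csv() -> Generator[str, None, None]:
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=["question", "answer"])

        # Emit the header as its own chunk, then reset the buffer.
        writer.writeheader()
        yield buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)

        offset = 0
        while True:
            rows = fetch_rows(limit=CHUNK_SIZE, offset=offset)
            if not rows:
                break  # no more data
            for row in rows:
                writer.writerow(row)
            # Flush everything written for this chunk and reuse the buffer.
            yield buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)
            offset += CHUNK_SIZE

    return StreamingResponse(generate_csv(), media_type="text/csv")

Because the response body is a generator, FastAPI starts sending bytes after the first yield, so the client sees data immediately instead of waiting for the entire history to be serialized in memory.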

View File

@@ -68,6 +68,8 @@ logger = setup_logger()
 def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
     """a lightweight task used to kick off indexing tasks.
     Occasionally does some validation of existing state to clear up error conditions"""
+    task = f"check_for_indexing start: tenant_id={tenant_id}"
+
     time_start = time.monotonic()
     tasks_created = 0
@@ -86,8 +88,11 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
     # these tasks should never overlap
     if not lock_beat.acquire(blocking=False):
+        task_logger.debug(f"{task} - Lock not acquired")
         return None

+    task_logger.debug(f"{task} - Lock acquired")
+
     try:
         locked = True
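
The added logging brackets an existing guard: a non-blocking Redis lock whose failed acquire means another worker is already running the beat. A rough sketch of that guard pattern using plain redis-py (the lock name, timeout, and work body are illustrative; the Onyx helpers such as get_redis_client are not used here):

# Sketch of the non-blocking lock guard, using plain redis-py;
# the lock name, timeout, and work body are illustrative only.
import logging

import redis

logger = logging.getLogger(__name__)


def check_for_work() -> int | None:
    task = "check_for_work start"
    r = redis.Redis()
    lock_beat = r.lock("locks:check_for_work", timeout=120)

    # these tasks should never overlap
    if not lock_beat.acquire(blocking=False):
        logger.debug(f"{task} - Lock not acquired")
        return None

    logger.debug(f"{task} - Lock acquired")
    try:
        tasks_created = 0
        # ... enqueue indexing work here ...
        return tasks_created
    finally:
        if lock_beat.owned():
            lock_beat.release()

Logging both branches makes it obvious in the worker logs whether a beat actually ran or was skipped because the previous run still held the lock.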

View File

@@ -234,6 +234,8 @@ def cloud_beat_task_generator(
     expires: int = BEAT_EXPIRES_DEFAULT,
 ) -> bool | None:
     """a lightweight task used to kick off individual beat tasks per tenant."""
+    task = f"cloud_beat_task_generator start: task_name={task_name}, queue={queue}, priority={priority}"
+
     time_start = time.monotonic()
     redis_client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
@@ -245,8 +247,10 @@ def cloud_beat_task_generator(
     # these tasks should never overlap
     if not lock_beat.acquire(blocking=False):
+        task_logger.debug(f"{task} - Lock not acquired")
         return None

+    task_logger.debug(f"{task} - Lock acquired")
     last_lock_time = time.monotonic()

     tenant_ids: list[str] | list[None] = []
@@ -262,6 +266,9 @@ def cloud_beat_task_generator(
         if IGNORED_SYNCING_TENANT_LIST and tenant_id in IGNORED_SYNCING_TENANT_LIST:
             continue

+        task_logger.debug(
+            f"{task} - sending task: task_name={task_name}, tenant_id={tenant_id}"
+        )
         self.app.send_task(
             task_name,
             kwargs=dict(
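
The final hunk logs each per-tenant dispatch before handing it to Celery. A condensed sketch of that fan-out loop against a generic Celery app (the broker URL, task name, queue, and kwargs are placeholders rather than Onyx's actual values):

# Sketch of the per-tenant fan-out; the broker URL, task name,
# queue, and kwargs are placeholders, not Onyx's actual values.
import logging

from celery import Celery

logger = logging.getLogger(__name__)
app = Celery("beat", broker="redis://localhost:6379/0")


def fan_out(task_name: str, queue: str, priority: int, tenant_ids: list[str]) -> None:
    task = f"fan_out start: task_name={task_name}, queue={queue}, priority={priority}"
    for tenant_id in tenant_ids:
        logger.debug(
            f"{task} - sending task: task_name={task_name}, tenant_id={tenant_id}"
        )
        # send_task dispatches by name, so the sender never needs to
        # import the task function itself.
        app.send_task(
            task_name,
            kwargs=dict(tenant_id=tenant_id),
            queue=queue,
            priority=priority,
        )

Dispatching by name keeps the beat generator decoupled from the per-tenant task implementations, which is what lets one generator schedule many different task types.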