Compare commits

...

5 Commits

Author SHA1 Message Date
pablodanswer
2e8493b837 nit 2025-02-10 14:43:16 -08:00
pablodanswer
b467e29179 quick cleanup 2025-02-10 14:36:56 -08:00
pablodanswer
8a905571b0 nit 2025-02-10 14:26:42 -08:00
pablodanswer
1fa34c93ea update 2025-02-10 13:54:59 -08:00
pablodanswer
4b737e9101 improve logging + query history 2025-02-10 13:24:37 -08:00
9 changed files with 106 additions and 52 deletions

View File

@@ -13,6 +13,7 @@ def make_persona_private(
persona_id: int,
user_ids: list[UUID] | None,
group_ids: list[int] | None,
creator_id: UUID | None,
db_session: Session,
) -> None:
db_session.query(Persona__User).filter(
@@ -26,14 +27,15 @@ def make_persona_private(
for user_uuid in user_ids:
db_session.add(Persona__User(persona_id=persona_id, user_id=user_uuid))
create_notification(
user_id=user_uuid,
notif_type=NotificationType.PERSONA_SHARED,
db_session=db_session,
additional_data=PersonaSharedNotificationData(
persona_id=persona_id,
).model_dump(),
)
if creator_id and creator_id != user_uuid:
create_notification(
user_id=user_uuid,
notif_type=NotificationType.PERSONA_SHARED,
db_session=db_session,
additional_data=PersonaSharedNotificationData(
persona_id=persona_id,
).model_dump(),
)
if group_ids:
for group_id in group_ids:
db_session.add(

View File

@@ -132,8 +132,13 @@ def fetch_chat_sessions_eagerly_by_time(
end: datetime,
db_session: Session,
limit: int | None = 500,
offset: int | None = 0,
initial_time: datetime | None = None,
) -> list[ChatSession]:
"""
Fetch chunks of ChatSession objects eagerly by time, supporting pagination
with limit/offset.
"""
time_order: UnaryExpression = desc(ChatSession.time_created)
message_order: UnaryExpression = asc(ChatMessage.id)
@@ -150,6 +155,7 @@ def fetch_chat_sessions_eagerly_by_time(
.order_by(ChatSession.id, time_order)
.distinct(ChatSession.id)
.limit(limit)
.offset(offset)
.subquery()
)
@@ -167,6 +173,4 @@ def fetch_chat_sessions_eagerly_by_time(
.order_by(time_order, message_order)
)
chat_sessions = query.all()
return chat_sessions
return query.all()

View File

@@ -1,5 +1,6 @@
import csv
import io
from collections.abc import Generator
from datetime import datetime
from datetime import timezone
from uuid import UUID
@@ -35,6 +36,8 @@ from onyx.server.query_and_chat.models import ChatSessionsResponse
router = APIRouter()
CHUNK_SIZE = 500 # or whatever size is appropriate
def fetch_and_process_chat_session_history(
db_session: Session,
@@ -203,34 +206,61 @@ def get_query_history_as_csv(
end: datetime | None = None,
db_session: Session = Depends(get_session),
) -> StreamingResponse:
complete_chat_session_history = fetch_and_process_chat_session_history(
db_session=db_session,
start=start or datetime.fromtimestamp(0, tz=timezone.utc),
end=end or datetime.now(tz=timezone.utc),
feedback_type=None,
limit=None,
)
"""Stream the CSV in chunks to avoid timeout and high memory usage."""
# Set up the time range defaults
start_time = start or datetime.fromtimestamp(0, tz=timezone.utc)
end_time = end or datetime.now(tz=timezone.utc)
question_answer_pairs: list[QuestionAnswerPairSnapshot] = []
for chat_session_snapshot in complete_chat_session_history:
question_answer_pairs.extend(
QuestionAnswerPairSnapshot.from_chat_session_snapshot(chat_session_snapshot)
)
# Build the streaming generator
def generate_csv() -> Generator[str, None, None]:
# Prepare CSV writer
fieldnames = list(QuestionAnswerPairSnapshot.model_fields.keys())
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=fieldnames)
# Create an in-memory text stream
stream = io.StringIO()
writer = csv.DictWriter(
stream, fieldnames=list(QuestionAnswerPairSnapshot.model_fields.keys())
)
writer.writeheader()
for row in question_answer_pairs:
writer.writerow(row.to_json())
writer.writeheader()
yield buffer.getvalue()
buffer.seek(0)
buffer.truncate(0)
# Reset the stream's position to the start
stream.seek(0)
# Paginate or chunk the fetching of sessions
offset = 0
while True:
# Example chunk-based fetch from DB
chunk_of_chat_sessions = fetch_chat_sessions_eagerly_by_time(
start=start_time,
end=end_time,
db_session=db_session,
limit=CHUNK_SIZE,
offset=offset,
)
if not chunk_of_chat_sessions:
break # no more data
# Convert each chat session to question-answer pairs
for chat_session in chunk_of_chat_sessions:
chat_snapshot = snapshot_from_chat_session(
chat_session=chat_session,
db_session=db_session,
)
if chat_snapshot is None:
continue
# Build Q/A pairs
qa_pairs = QuestionAnswerPairSnapshot.from_chat_session_snapshot(
chat_snapshot
)
for row in qa_pairs:
writer.writerow(row.to_json())
# Yield the CSV row text
yield buffer.getvalue()
buffer.seek(0)
buffer.truncate(0)
offset += CHUNK_SIZE
return StreamingResponse(
iter([stream.getvalue()]),
generate_csv(),
media_type="text/csv",
headers={"Content-Disposition": "attachment;filename=onyx_query_history.csv"},
)

View File

@@ -156,6 +156,8 @@ class DynamicTenantScheduler(PersistentScheduler):
logger.info(
"_try_updating_schedule: Current schedule is up to date, no changes needed"
)
logger.info("Current schedule is the following:")
logger.info(current_schedule)
return
logger.info(

View File

@@ -68,6 +68,8 @@ logger = setup_logger()
def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
"""a lightweight task used to kick off indexing tasks.
Occasionally does some validation of existing state to clear up error conditions"""
task = f"check_for_indexing start: tenant_id={tenant_id}"
time_start = time.monotonic()
tasks_created = 0
@@ -86,8 +88,11 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
task_logger.info(f"{task} - Lock not acquired")
return None
task_logger.info(f"{task} - Lock acquired")
try:
locked = True

View File

@@ -234,6 +234,8 @@ def cloud_beat_task_generator(
expires: int = BEAT_EXPIRES_DEFAULT,
) -> bool | None:
"""a lightweight task used to kick off individual beat tasks per tenant."""
task = f"cloud_beat_task_generator start: task_name={task_name}, queue={queue}, priority={priority}"
time_start = time.monotonic()
redis_client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID)
@@ -245,8 +247,10 @@ def cloud_beat_task_generator(
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
task_logger.info(f"{task} - Lock not acquired")
return None
task_logger.info(f"{task} - Lock acquired")
last_lock_time = time.monotonic()
tenant_ids: list[str] | list[None] = []
@@ -262,6 +266,9 @@ def cloud_beat_task_generator(
if IGNORED_SYNCING_TENANT_LIST and tenant_id in IGNORED_SYNCING_TENANT_LIST:
continue
task_logger.info(
f"{task} - sending task: task_name={task_name}, tenant_id={tenant_id}"
)
self.app.send_task(
task_name,
kwargs=dict(

View File

@@ -163,6 +163,7 @@ def make_persona_private(
persona_id: int,
user_ids: list[UUID] | None,
group_ids: list[int] | None,
creator_id: UUID | None,
db_session: Session,
) -> None:
if user_ids is not None:
@@ -173,14 +174,15 @@ def make_persona_private(
for user_uuid in user_ids:
db_session.add(Persona__User(persona_id=persona_id, user_id=user_uuid))
create_notification(
user_id=user_uuid,
notif_type=NotificationType.PERSONA_SHARED,
db_session=db_session,
additional_data=PersonaSharedNotificationData(
persona_id=persona_id,
).model_dump(),
)
if creator_id and creator_id != user_uuid:
create_notification(
user_id=user_uuid,
notif_type=NotificationType.PERSONA_SHARED,
db_session=db_session,
additional_data=PersonaSharedNotificationData(
persona_id=persona_id,
).model_dump(),
)
db_session.commit()
@@ -240,6 +242,7 @@ def create_update_persona(
persona_id=persona.id,
user_ids=create_persona_request.users,
group_ids=create_persona_request.groups,
creator_id=user.id if user else None,
db_session=db_session,
)
@@ -275,6 +278,7 @@ def update_persona_shared_users(
persona_id=persona_id,
user_ids=user_ids,
group_ids=None,
creator_id=user.id if user else None,
db_session=db_session,
)

View File

@@ -195,7 +195,7 @@ export function UserDropdown({
) : hideUserDropdown ? (
<DropdownOption
onClick={() => router.push("/auth/login")}
icon={<UserIcon className="h-5w-5 my-auto " />}
icon={<UserIcon className="h-5 w-5 my-auto " />}
label="Log In"
/>
) : (
@@ -208,14 +208,14 @@ export function UserDropdown({
item.svg_logo ? (
<div
className="
h-4
w-4
my-auto
overflow-hidden
flex
items-center
justify-center
"
h-4
w-4
my-auto
overflow-hidden
flex
items-center
justify-center
"
aria-label={item.title}
>
<svg

View File

@@ -62,7 +62,7 @@ export function Popover({
<RadixPopover.Portal>
<RadixPopover.Content
className={`
PopoverContent z-[100]
PopoverContent z-50
${contentClassName}
${matchWidth ? " PopoverContentMatchWidth" : ""}
`}