Compare commits

...

7 Commits

Author SHA1 Message Date
Richard Kuo (Danswer)
a978460d04 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_status 2024-12-11 13:29:30 -08:00
Richard Kuo (Danswer)
defcb9ef88 addressed comments 2024-12-11 13:18:13 -08:00
Richard Kuo (Danswer)
d111551263 Merge branch 'main' of https://github.com/danswer-ai/danswer into feature/background_status
# Conflicts:
#	backend/danswer/server/documents/models.py
2024-12-11 11:56:49 -08:00
Richard Kuo (Danswer)
739edc87cc cleaned imports and commented code 2024-12-11 10:59:39 -08:00
Richard Kuo (Danswer)
391bf5629c not using dayjs 2024-12-11 10:51:40 -08:00
Richard Kuo (Danswer)
124bd23eb9 fix imports 2024-12-11 10:39:11 -08:00
Richard Kuo (Danswer)
e177c02b36 add background indexing status page 2024-12-11 10:19:44 -08:00
15 changed files with 357 additions and 48 deletions

View File

@@ -64,6 +64,7 @@ from danswer.redis.redis_connector_doc_perm_sync import (
)
from danswer.redis.redis_connector_index import RedisConnectorIndex
from danswer.redis.redis_connector_prune import RedisConnectorPrune
from danswer.redis.redis_connector_utils import RedisConnectorUtils
from danswer.redis.redis_document_set import RedisDocumentSet
from danswer.redis.redis_pool import get_redis_client
from danswer.redis.redis_usergroup import RedisUserGroup
@@ -402,7 +403,7 @@ def monitor_connector_deletion_taskset(
tenant_id: str | None, key_bytes: bytes, r: Redis
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
cc_pair_id_str = RedisConnectorUtils.get_id_from_fence_key(fence_key)
if cc_pair_id_str is None:
task_logger.warning(f"could not parse cc_pair_id from {fence_key}")
return
@@ -528,7 +529,7 @@ def monitor_ccpair_pruning_taskset(
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
cc_pair_id_str = RedisConnectorUtils.get_id_from_fence_key(fence_key)
if cc_pair_id_str is None:
task_logger.warning(
f"monitor_ccpair_pruning_taskset: could not parse cc_pair_id from {fence_key}"
@@ -566,7 +567,7 @@ def monitor_ccpair_permissions_taskset(
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
) -> None:
fence_key = key_bytes.decode("utf-8")
cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key)
cc_pair_id_str = RedisConnectorUtils.get_id_from_fence_key(fence_key)
if cc_pair_id_str is None:
task_logger.warning(
f"monitor_ccpair_permissions_taskset: could not parse cc_pair_id from {fence_key}"
@@ -604,25 +605,15 @@ def monitor_ccpair_permissions_taskset(
def monitor_ccpair_indexing_taskset(
tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session
) -> None:
# if the fence doesn't exist, there's nothing to do
fence_key = key_bytes.decode("utf-8")
composite_id = RedisConnector.get_id_from_fence_key(fence_key)
if composite_id is None:
redis_ids = RedisConnectorIndex.parse_key(key_bytes)
if not redis_ids:
task_logger.warning(
f"monitor_ccpair_indexing_taskset: could not parse composite_id from {fence_key}"
f"monitor_ccpair_indexing_taskset: could not parse composite_id from {key_bytes!r}"
)
return
# parse out metadata and initialize the helper class with it
parts = composite_id.split("/")
if len(parts) != 2:
return
cc_pair_id = int(parts[0])
search_settings_id = int(parts[1])
redis_connector = RedisConnector(tenant_id, cc_pair_id)
redis_connector_index = redis_connector.new_index(search_settings_id)
redis_connector = RedisConnector(tenant_id, redis_ids.cc_pair_id)
redis_connector_index = redis_connector.new_index(redis_ids.search_settings_id)
if not redis_connector_index.fenced:
return
@@ -635,8 +626,8 @@ def monitor_ccpair_indexing_taskset(
progress = redis_connector_index.get_progress()
if progress is not None:
task_logger.info(
f"Connector indexing progress: cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"Connector indexing progress: cc_pair={redis_ids.cc_pair_id} "
f"search_settings={redis_ids.search_settings_id} "
f"progress={progress} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
)
@@ -671,8 +662,8 @@ def monitor_ccpair_indexing_taskset(
f"Connector indexing aborted or exceptioned: "
f"attempt={payload.index_attempt_id} "
f"celery_task={payload.celery_task_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"cc_pair={redis_ids.cc_pair_id} "
f"search_settings={redis_ids.search_settings_id} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f} "
f"result.state={task_state} "
f"result.result={task_result} "
@@ -699,8 +690,8 @@ def monitor_ccpair_indexing_taskset(
"monitor_ccpair_indexing_taskset - transient exception marking index attempt as failed: "
f"attempt={payload.index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
f"cc_pair={redis_ids.cc_pair_id} "
f"search_settings={redis_ids.search_settings_id}"
)
redis_connector_index.reset()
@@ -709,8 +700,8 @@ def monitor_ccpair_indexing_taskset(
status_enum = HTTPStatus(status_int)
task_logger.info(
f"Connector indexing finished: cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"Connector indexing finished: cc_pair={redis_ids.cc_pair_id} "
f"search_settings={redis_ids.search_settings_id} "
f"progress={progress} "
f"status={status_enum.name} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"

View File

@@ -375,6 +375,6 @@ class Chunker:
final_chunks.extend(chunks)
if self.callback:
self.callback.progress("Chunker.chunk", len(chunks))
self.callback.progress("Chunker.chunk", 0)
return final_chunks

View File

@@ -12,4 +12,5 @@ class IndexingHeartbeatInterface(ABC):
@abstractmethod
def progress(self, tag: str, amount: int) -> None:
"""Send progress updates to the caller."""
"""Send progress updates to the caller.
Send 0 for the amount if you just want to do a keep alive ping."""

View File

@@ -182,7 +182,7 @@ class EmbeddingModel:
embeddings.extend(response.embeddings)
if self.callback:
self.callback.progress("_batch_encode_texts", 1)
self.callback.progress("_batch_encode_texts", 0)
return embeddings
def encode(

View File

@@ -72,24 +72,6 @@ class RedisConnector:
return finished
@staticmethod
def get_id_from_fence_key(key: str) -> str | None:
"""
Extracts the object ID from a fence key in the format `PREFIX_fence_X`.
Args:
key (str): The fence key string.
Returns:
Optional[int]: The extracted ID if the key is in the correct format, otherwise None.
"""
parts = key.split("_")
if len(parts) != 3:
return None
object_id = parts[2]
return object_id
@staticmethod
def get_id_from_task_id(task_id: str) -> str | None:
"""

View File

@@ -5,6 +5,13 @@ from uuid import uuid4
import redis
from pydantic import BaseModel
from danswer.redis.redis_connector_utils import RedisConnectorUtils
# The pair of ids that RedisConnectorIndex.parse_key extracts from the redis
# key of an indexing job.
class RedisConnectorIndexIdentifiers(BaseModel):
# first component of the "<cc_pair_id>/<search_settings_id>" composite id
cc_pair_id: int
# second component of the composite id
search_settings_id: int
class RedisConnectorIndexPayload(BaseModel):
index_attempt_id: int | None
@@ -119,7 +126,8 @@ class RedisConnectorIndex:
self.redis.delete(self.generator_complete_key)
def get_progress(self) -> int | None:
"""Returns None if the key doesn't exist. The"""
"""Returns None if the key doesn't exist. Returns an int representing
the number of documents processed otherwise."""
# TODO: move into fence?
bytes = self.redis.get(self.generator_progress_key)
if bytes is None:
@@ -143,6 +151,26 @@ class RedisConnectorIndex:
self.redis.delete(self.generator_complete_key)
self.redis.delete(self.fence_key)
@staticmethod
def parse_key(key_bytes: bytes) -> RedisConnectorIndexIdentifiers | None:
    """Parse the redis key for an indexing job.

    The key is decoded as UTF-8 and is expected to look like
    `PREFIX_fence_<cc_pair_id>/<search_settings_id>`.

    Args:
        key_bytes: The raw redis key.

    Returns:
        The extracted cc_pair_id and search_settings_id, or None if the
        key does not match the expected format.
    """
    fence_key = key_bytes.decode("utf-8")
    composite_id = RedisConnectorUtils.get_id_from_fence_key(fence_key)
    if composite_id is None:
        return None

    # the composite id carries both ids separated by a single slash
    parts = composite_id.split("/")
    if len(parts) != 2:
        return None

    try:
        cc_pair_id = int(parts[0])
        search_settings_id = int(parts[1])
    except ValueError:
        # A non-numeric component means the key is malformed. Report it as
        # unparsable (like every other format error here) instead of
        # raising, so callers that iterate over many keys can skip it.
        return None

    return RedisConnectorIndexIdentifiers(
        cc_pair_id=cc_pair_id, search_settings_id=search_settings_id
    )
@staticmethod
def reset_all(r: redis.Redis) -> None:
"""Deletes all redis values for all connectors"""

View File

@@ -0,0 +1,18 @@
class RedisConnectorUtils:
@staticmethod
def get_id_from_fence_key(key: str) -> str | None:
"""
Extracts the object ID from a fence key in the format `PREFIX_fence_X`.
Args:
key (str): The fence key string.
Returns:
Optional[int]: The extracted ID if the key is in the correct format, otherwise None.
"""
parts = key.split("_")
if len(parts) != 3:
return None
object_id = parts[2]
return object_id

View File

@@ -65,6 +65,7 @@ from danswer.db.connector import update_connector
from danswer.db.connector_credential_pair import add_credential_to_connector
from danswer.db.connector_credential_pair import get_cc_pair_groups_for_ids
from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.connector_credential_pair import get_connector_credential_pairs
from danswer.db.credentials import cleanup_gmail_credentials
from danswer.db.credentials import cleanup_google_drive_credentials
@@ -90,8 +91,11 @@ from danswer.file_processing.extract_file_text import convert_docx_to_txt
from danswer.file_store.file_store import get_default_file_store
from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.redis.redis_connector import RedisConnector
from danswer.redis.redis_connector_index import RedisConnectorIndex
from danswer.redis.redis_pool import get_redis_client
from danswer.server.documents.models import AuthStatus
from danswer.server.documents.models import AuthUrl
from danswer.server.documents.models import ConnectorBackgroundIndexingStatus
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
from danswer.server.documents.models import ConnectorIndexingStatus
from danswer.server.documents.models import ConnectorSnapshot
@@ -642,6 +646,64 @@ def get_connector_indexing_status(
return indexing_statuses
@router.get("/admin/background/indexing")
def get_background_indexing(
    user: User = Depends(current_admin_user),
    db_session: Session = Depends(get_session),
    tenant_id: str | None = Depends(get_current_tenant_id),
) -> list[ConnectorBackgroundIndexingStatus]:
    """Report the indexing jobs that are currently tracked in redis.

    Scans redis for indexing fence keys and returns one status entry for
    every fenced job whose payload has a start time and whose cc_pair can
    still be found in the database. The result is ordered by
    (cc_pair_id, search_settings_id).
    """
    redis_client = get_redis_client(tenant_id=tenant_id)
    statuses: list[ConnectorBackgroundIndexingStatus] = []

    for fence_key in redis_client.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
        ids = RedisConnectorIndex.parse_key(fence_key)
        if not ids:
            continue

        index_helper = RedisConnector(tenant_id, ids.cc_pair_id).new_index(
            ids.search_settings_id
        )
        if not index_helper.fenced:
            continue

        # skip jobs with no payload or that have not recorded a start time
        payload = index_helper.payload
        if not payload or not payload.started:
            continue

        # a missing progress value is reported as zero
        docs_processed = index_helper.get_progress() or 0

        cc_pair = get_connector_credential_pair_from_id(ids.cc_pair_id, db_session)
        if not cc_pair:
            continue

        statuses.append(
            ConnectorBackgroundIndexingStatus(
                name=cc_pair.name,
                source=cc_pair.connector.source,
                started=payload.started,
                progress=docs_processed,
                index_attempt_id=payload.index_attempt_id,
                cc_pair_id=ids.cc_pair_id,
                search_settings_id=ids.search_settings_id,
            )
        )

    # stable ordering so the listing does not jump around between polls
    statuses.sort(key=lambda entry: (entry.cc_pair_id, entry.search_settings_id))
    return statuses
def _validate_connector_allowed(source: DocumentSource) -> None:
valid_connectors = [
x for x in ENABLED_CONNECTOR_TYPES.replace("_", "").split(",") if x

View File

@@ -364,6 +364,16 @@ class RunConnectorRequest(BaseModel):
from_beginning: bool = False
# Status record for one in-progress indexing job, as returned by the
# GET /admin/background/indexing endpoint.
class ConnectorBackgroundIndexingStatus(BaseModel):
# name of the connector credential pair
name: str
# source of the cc_pair's connector
source: DocumentSource
# ids parsed from the job's redis fence key
cc_pair_id: int
search_settings_id: int
# taken from the job's redis payload
index_attempt_id: int | None
# start time taken from the job's redis payload
started: datetime | None
# progress counter read from redis; the endpoint substitutes 0 when none is set
progress: int | None
class CCPropertyUpdateRequest(BaseModel):
name: str
value: str

View File

@@ -0,0 +1,94 @@
import { SourceIcon } from "@/components/SourceIcon";
import {
Table,
TableBody,
TableCell,
TableHead,
TableHeader,
TableRow,
} from "@/components/ui/table";
import { getTimeElapsedString } from "@/lib/dateUtils";
import { getSourceDisplayName } from "@/lib/sources";
import { ConnectorBackgroundIndexingStatus, ValidSources } from "@/lib/types";
import React from "react";
import { useRouter } from "next/navigation";
// One clickable table row describing a single background indexing job.
// Clicking the row navigates to the admin page of the job's connector.
function IndexingAttemptRow({
  status,
}: {
  status: ConnectorBackgroundIndexingStatus;
}) {
  const router = useRouter();
  const sourceType = status.source as ValidSources;

  // Jobs without a start timestamp are shown as still waiting.
  const elapsedLabel = status.started
    ? getTimeElapsedString(new Date(status.started))
    : "Waiting to start...";

  const openConnectorPage = () => {
    router.push(`/admin/connector/${status.cc_pair_id}`);
  };

  // Shared styling for the plain text cells.
  const textCellClass = "text-xl font-semibold";

  return (
    <TableRow
      className="border-border bg-white py-4 rounded-sm !border cursor-pointer"
      onClick={openConnectorPage}
    >
      <TableCell>
        <div className={textCellClass}>{status.name}</div>
      </TableCell>
      <TableCell>
        <div className="text-xl flex items-center truncate ellipsis gap-x-2 font-semibold">
          <SourceIcon iconSize={20} sourceType={sourceType} />
          {getSourceDisplayName(sourceType)}
        </div>
      </TableCell>
      <TableCell>
        <div className={textCellClass}>{status.cc_pair_id}</div>
      </TableCell>
      <TableCell>
        <div className={textCellClass}>{status.search_settings_id}</div>
      </TableCell>
      <TableCell>
        <div className={textCellClass}>{status.index_attempt_id}</div>
      </TableCell>
      <TableCell>
        <div className={textCellClass}>{status.progress}</div>
      </TableCell>
      <TableCell>
        <div className={textCellClass}>{elapsedLabel}</div>
      </TableCell>
      <TableCell />
    </TableRow>
  );
}
// Table listing the background indexing jobs passed in via `indexingStatuses`,
// one row per status entry.
export function BackgroundIndexingStatusTable({
  indexingStatuses,
}: {
  indexingStatuses: ConnectorBackgroundIndexingStatus[];
}) {
  return (
    <Table>
      <TableHeader>
        <TableRow>
          <TableHead>Name</TableHead>
          <TableHead>Connector</TableHead>
          <TableHead>CC Pair</TableHead>
          <TableHead>Search Settings</TableHead>
          <TableHead>Attempt ID</TableHead>
          <TableHead>Progress</TableHead>
          <TableHead>Elapsed</TableHead>
        </TableRow>
      </TableHeader>
      <TableBody>
        {/* Rows are keyed by the job's identity (cc pair + search settings)
            rather than by array position, so React keeps each row matched to
            the same job when entries appear or disappear between polls. */}
        {indexingStatuses.map((status) => (
          <React.Fragment
            key={`${status.cc_pair_id}-${status.search_settings_id}`}
          >
            <IndexingAttemptRow status={status} />
          </React.Fragment>
        ))}
      </TableBody>
    </Table>
  );
}

View File

@@ -0,0 +1,58 @@
"use client";
import { LoadingAnimation } from "@/components/Loading";
import { AdminPageTitle } from "@/components/admin/Title";
import { NotebookIcon } from "@/components/icons/icons";
import Text from "@/components/ui/text";
import { useBackgroundIndexingStatus } from "@/lib/hooks";
import { useEffect } from "react";
import { BackgroundIndexingStatusTable } from "./BackgroundIndexingStatusTable";
// Body of the background indexing page: loads the status list, re-fetches it
// every 5 seconds, and renders a loading / error / empty / table state.
function Main() {
  const {
    data: statuses,
    isLoading,
    error,
    refreshBackgroundIndexingStatus,
  } = useBackgroundIndexingStatus();

  useEffect(() => {
    // Trigger a refresh every 5 seconds; the timer is cleared on unmount
    // (and whenever the refresh callback changes).
    const timerId = setInterval(() => {
      refreshBackgroundIndexingStatus();
    }, 5000);

    return () => clearInterval(timerId);
  }, [refreshBackgroundIndexingStatus]);

  if (isLoading) {
    return <LoadingAnimation text="" />;
  }

  if (error || !statuses) {
    const message =
      error?.info?.detail || "Error getting background indexing status.";
    return <div className="text-error">{message}</div>;
  }

  if (statuses.length === 0) {
    return <Text>No indexing attempts in progress.</Text>;
  }

  return <BackgroundIndexingStatusTable indexingStatuses={statuses} />;
}
// Page component: title header followed by the live indexing status view.
export default function Status() {
  const titleIcon = <NotebookIcon size={32} />;

  return (
    <div className="mx-auto container">
      <AdminPageTitle icon={titleIcon} title="Background Indexing" />
      <Main />
    </div>
  );
}

View File

@@ -22,6 +22,7 @@ import {
ClosedBookIcon,
SearchIcon,
DocumentIcon2,
NotebookIcon,
} from "@/components/icons/icons";
import { UserRole } from "@/lib/types";
import { FiActivity, FiBarChart2 } from "react-icons/fi";
@@ -346,6 +347,20 @@ export function ClientLayout({
),
link: "/admin/performance/custom-analytics",
},
{
name: (
<div className="flex">
<NotebookIcon
className="text-icon-settings-sidebar"
size={18}
/>
<div className="ml-1">
Background Indexing
</div>
</div>
),
link: "/admin/background",
},
],
},
]

View File

@@ -93,3 +93,25 @@ export const getTimeAgoString = (date: Date | null) => {
if (diffDays < 30) return `${diffWeeks}w ago`;
return `${diffMonths}mo ago`;
};
export const getTimeElapsedString = (
  dateStart: Date,
  now: Date = new Date()
) => {
  // Returns a readable string for the time elapsed between dateStart and
  // now, e.g. "1d 5h 10m 6s".
  // Day, hour and minute components are omitted when they are zero; the
  // seconds component is always present.
  // `now` defaults to the current time and can be passed explicitly to get
  // a deterministic result.

  // A start time ahead of `now` (e.g. clock skew between the machine that
  // produced the timestamp and this one) is clamped to zero so the result
  // is "0s" rather than a negative duration.
  const diffMs = Math.max(0, now.getTime() - dateStart.getTime());

  const diffSeconds = Math.floor(diffMs / 1000);
  const diffMinutes = Math.floor(diffSeconds / 60);
  const diffHours = Math.floor(diffMinutes / 60);
  const diffDays = Math.floor(diffHours / 24);

  const days = diffDays > 0 ? `${diffDays}d ` : "";
  const hours = diffHours % 24 > 0 ? `${diffHours % 24}h ` : "";
  const minutes = diffMinutes % 60 > 0 ? `${diffMinutes % 60}m ` : "";
  const seconds = `${diffSeconds % 60}s`;

  return `${days}${hours}${minutes}${seconds}`.trim();
};

View File

@@ -5,6 +5,7 @@ import {
DocumentBoostStatus,
Tag,
UserGroup,
ConnectorBackgroundIndexingStatus,
} from "@/lib/types";
import useSWR, { mutate, useSWRConfig } from "swr";
import { errorHandlingFetcher } from "./fetcher";
@@ -88,6 +89,22 @@ export const useConnectorCredentialIndexingStatus = (
};
};
// SWR hook for the background indexing status list.
// Returns the SWR response plus a `refreshBackgroundIndexingStatus` callback
// that revalidates the same endpoint on demand. `refreshInterval` is passed
// through to SWR (default 15000).
export const useBackgroundIndexingStatus = (refreshInterval = 15000) => {
  const { mutate } = useSWRConfig();
  const endpoint = "/api/manage/admin/background/indexing";

  const response = useSWR<ConnectorBackgroundIndexingStatus[]>(
    endpoint,
    errorHandlingFetcher,
    { refreshInterval }
  );

  const refreshBackgroundIndexingStatus = () => mutate(endpoint);

  return { ...response, refreshBackgroundIndexingStatus };
};
export const useCategories = () => {
const { mutate } = useSWRConfig();
const swrResponse = useSWR<PersonaCategory[]>(

View File

@@ -135,6 +135,17 @@ export interface ConnectorIndexingStatus<
in_progress: boolean;
}
// slimmer set of data used for the background indexing monitoring page;
// one entry describes a single in-progress indexing job
export interface ConnectorBackgroundIndexingStatus {
// display name shown in the "Name" column
name: string;
// connector source; cast to ValidSources by the status table for icon/label lookup
source: string;
// identifiers of the job (connector credential pair + search settings)
cc_pair_id: number;
search_settings_id: number;
// id of the index attempt, if one is known
index_attempt_id: number | null;
// start timestamp as a string parseable by `new Date(...)`; null if not started
started: string | null;
// progress counter reported by the backend
progress: number | null;
}
export interface OAuthPrepareAuthorizationResponse {
url: string;
}