Compare commits

...

2 Commits

Author SHA1 Message Date
joachim-danswer
7bce2d287d Dual search pipeline for non-tool-calling LLMs (#4872)
Added the dual search pipeline for non-tool-calling LLMs as well.
A helper function was created to share the logic.
2025-06-12 19:51:19 -07:00
Evan Lohn
71712df320 jira daylight savings handling (#4797) 2025-06-03 09:17:11 -07:00
4 changed files with 80 additions and 17 deletions

View File

@@ -83,6 +83,22 @@ def _expand_query(
return rephrased_query
def _expand_query_non_tool_calling_llm(
    expanded_keyword_thread: TimeoutThread[str],
    expanded_semantic_thread: TimeoutThread[str],
) -> QueryExpansions | None:
    """Join the keyword/semantic query-expansion background threads.

    Waits on both threads (keyword first, then semantic, matching the order
    they were started). If either expansion timed out or produced nothing,
    the whole result is discarded so callers fall back to the original query.

    Returns:
        A QueryExpansions wrapping both single-element expansion lists, or
        None when either expansion is unavailable.
    """
    results = [
        wait_on_background(expanded_keyword_thread),
        wait_on_background(expanded_semantic_thread),
    ]
    # Both expansions are required; a partial result is treated as no result.
    if any(expansion is None for expansion in results):
        return None
    keyword_result, semantic_result = results
    return QueryExpansions(
        keywords_expansions=[keyword_result],
        semantic_expansions=[semantic_result],
    )
# TODO: break this out into an implementation function
# and a function that handles extracting the necessary fields
# from the state and config
@@ -186,6 +202,28 @@ def choose_tool(
is_keyword, keywords = wait_on_background(keyword_thread)
override_kwargs.precomputed_is_keyword = is_keyword
override_kwargs.precomputed_keywords = keywords
# dual keyword expansion needs to be added here for non-tool calling LLM case
if (
USE_SEMANTIC_KEYWORD_EXPANSIONS_BASIC_SEARCH
and expanded_keyword_thread
and expanded_semantic_thread
and tool.name == SearchTool._NAME
):
override_kwargs.expanded_queries = _expand_query_non_tool_calling_llm(
expanded_keyword_thread=expanded_keyword_thread,
expanded_semantic_thread=expanded_semantic_thread,
)
if (
USE_SEMANTIC_KEYWORD_EXPANSIONS_BASIC_SEARCH
and tool.name == SearchTool._NAME
and override_kwargs.expanded_queries
):
if (
override_kwargs.expanded_queries.keywords_expansions is None
or override_kwargs.expanded_queries.semantic_expansions is None
):
raise ValueError("No expanded keyword or semantic threads found.")
return ToolChoiceUpdate(
tool_choice=ToolChoice(
tool=tool,
@@ -283,18 +321,23 @@ def choose_tool(
and expanded_keyword_thread
and expanded_semantic_thread
):
keyword_expansion = wait_on_background(expanded_keyword_thread)
semantic_expansion = wait_on_background(expanded_semantic_thread)
override_kwargs.expanded_queries = QueryExpansions(
keywords_expansions=[keyword_expansion],
semantic_expansions=[semantic_expansion],
)
logger.info(
f"Original query: {agent_config.inputs.prompt_builder.raw_user_query}"
override_kwargs.expanded_queries = _expand_query_non_tool_calling_llm(
expanded_keyword_thread=expanded_keyword_thread,
expanded_semantic_thread=expanded_semantic_thread,
)
logger.info(f"Expanded keyword queries: {keyword_expansion}")
logger.info(f"Expanded semantic queries: {semantic_expansion}")
if (
USE_SEMANTIC_KEYWORD_EXPANSIONS_BASIC_SEARCH
and selected_tool.name == SearchTool._NAME
and override_kwargs.expanded_queries
):
# TODO: this is a hack to handle the case where the expanded queries are not found.
# We should refactor this to be more robust.
if (
override_kwargs.expanded_queries.keywords_expansions is None
or override_kwargs.expanded_queries.semantic_expansions is None
):
raise ValueError("No expanded keyword or semantic threads found.")
return ToolChoiceUpdate(
tool_choice=ToolChoice(

View File

@@ -21,6 +21,9 @@ from onyx.connectors.confluence.utils import datetime_from_string
from onyx.connectors.confluence.utils import process_attachment
from onyx.connectors.confluence.utils import update_param_in_path
from onyx.connectors.confluence.utils import validate_attachment_filetype
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
is_atlassian_date_error,
)
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
@@ -76,10 +79,6 @@ ONE_DAY = ONE_HOUR * 24
MAX_CACHED_IDS = 100
def _should_propagate_error(e: Exception) -> bool:
return "field 'updated' is invalid" in str(e)
class ConfluenceCheckpoint(ConnectorCheckpoint):
next_page_url: str | None
@@ -367,7 +366,7 @@ class ConfluenceConnector(
)
except Exception as e:
logger.error(f"Error converting page {page.get('id', 'unknown')}: {e}")
if _should_propagate_error(e):
if is_atlassian_date_error(e): # propagate error to be caught and retried
raise
return ConnectorFailure(
failed_document=DocumentFailure(
@@ -446,7 +445,9 @@ class ConfluenceConnector(
f"Failed to extract/summarize attachment {attachment['title']}",
exc_info=e,
)
if _should_propagate_error(e):
if is_atlassian_date_error(
e
): # propagate error to be caught and retried
raise
return ConnectorFailure(
failed_document=DocumentFailure(
@@ -536,7 +537,7 @@ class ConfluenceConnector(
try:
return self._fetch_document_batches(checkpoint, start, end)
except Exception as e:
if _should_propagate_error(e) and start is not None:
if is_atlassian_date_error(e) and start is not None:
logger.warning(
"Confluence says we provided an invalid 'updated' field. This may indicate"
"a real issue, but can also appear during edge cases like daylight"

View File

@@ -86,3 +86,7 @@ def get_oauth_callback_uri(base_domain: str, connector_id: str) -> str:
# Used for development
base_domain = CONNECTOR_LOCALHOST_OVERRIDE
return f"{base_domain.strip('/')}/connector/oauth/callback/{connector_id}"
def is_atlassian_date_error(e: Exception) -> bool:
    """Return True when *e* is Atlassian's invalid-'updated'-field error.

    Atlassian APIs (Jira/Confluence) reject date filters with this message
    during edge cases such as daylight-saving transitions; connectors use
    this predicate to decide whether to retry with a shifted time window.
    """
    return str(e).find("field 'updated' is invalid") >= 0

View File

@@ -12,6 +12,9 @@ from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.app_configs import JIRA_CONNECTOR_LABELS_TO_SKIP
from onyx.configs.app_configs import JIRA_CONNECTOR_MAX_TICKET_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
is_atlassian_date_error,
)
from onyx.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
@@ -40,6 +43,8 @@ from onyx.utils.logger import setup_logger
logger = setup_logger()
ONE_HOUR = 3600
JIRA_API_VERSION = os.environ.get("JIRA_API_VERSION") or "2"
_JIRA_SLIM_PAGE_SIZE = 500
_JIRA_FULL_PAGE_SIZE = 50
@@ -240,7 +245,17 @@ class JiraConnector(CheckpointedConnector[JiraConnectorCheckpoint], SlimConnecto
checkpoint: JiraConnectorCheckpoint,
) -> CheckpointOutput[JiraConnectorCheckpoint]:
jql = self._get_jql_query(start, end)
try:
return self._load_from_checkpoint(jql, checkpoint)
except Exception as e:
if is_atlassian_date_error(e):
jql = self._get_jql_query(start - ONE_HOUR, end)
return self._load_from_checkpoint(jql, checkpoint)
raise e
def _load_from_checkpoint(
self, jql: str, checkpoint: JiraConnectorCheckpoint
) -> CheckpointOutput[JiraConnectorCheckpoint]:
# Get the current offset from checkpoint or start at 0
starting_offset = checkpoint.offset or 0
current_offset = starting_offset