Compare commits

..

1 Commit

Author SHA1 Message Date
Jamison Lahman
b3bd38e39b chore(devtools): ods dev tunnel 2026-04-20 23:41:44 +00:00
17 changed files with 127 additions and 324 deletions

View File

@@ -16,13 +16,9 @@
"source=onyx-devcontainer-local,target=/home/dev/.local,type=volume"
],
"containerEnv": {
"MODEL_SERVER_HOST": "inference_model_server",
"OPENSEARCH_HOST": "opensearch",
"POSTGRES_HOST": "relational_db",
"REDIS_HOST": "cache",
"S3_ENDPOINT_URL": "http://minio:9000",
"SSH_AUTH_SOCK": "/tmp/ssh-agent.sock",
"VESPA_HOST": "index"
"POSTGRES_HOST": "relational_db",
"REDIS_HOST": "cache"
},
"remoteUser": "${localEnv:DEVCONTAINER_REMOTE_USER:dev}",
"updateRemoteUserUID": false,

View File

@@ -40,7 +40,6 @@ ALLOWED_DOMAINS=(
"api.anthropic.com"
"api-staging.anthropic.com"
"files.anthropic.com"
"huggingface.co"
"sentry.io"
"update.code.visualstudio.com"
"pypi.org"

View File

@@ -403,7 +403,7 @@ jobs:
echo "CERT_ID=$CERT_ID" >> $GITHUB_ENV
echo "Certificate imported."
- uses: tauri-apps/tauri-action@84b9d35b5fc46c1e45415bdb6144030364f7ebc5 # ratchet:tauri-apps/tauri-action@action-v0.6.2
- uses: tauri-apps/tauri-action@73fb865345c54760d875b94642314f8c0c894afa # ratchet:tauri-apps/tauri-action@action-v0.6.1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
APPLE_ID: ${{ env.APPLE_ID }}

View File

@@ -42,7 +42,7 @@ jobs:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # ratchet:actions/checkout@v6
with:
persist-credentials: false
- uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # zizmor: ignore[cache-poisoning]
- uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # zizmor: ignore[cache-poisoning]
with:
go-version: ${{ env.GO_VERSION }}
cache-dependency-path: "**/go.sum"

View File

@@ -1,27 +0,0 @@
"""Add file_id to documents
Revision ID: 91d150c361f6
Revises: a6fcd3d631f9
Create Date: 2026-04-16 15:43:30.314823
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "91d150c361f6"
down_revision = "a6fcd3d631f9"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Add a nullable `file_id` column to the `document` table."""
    file_id_column = sa.Column("file_id", sa.String(), nullable=True)
    op.add_column("document", file_id_column)
def downgrade() -> None:
    """Revert the migration by dropping `document.file_id`."""
    op.drop_column("document", "file_id")

View File

@@ -58,8 +58,6 @@ from onyx.db.indexing_coordination import IndexingCoordination
from onyx.db.models import IndexAttempt
from onyx.file_store.document_batch_storage import DocumentBatchStorage
from onyx.file_store.document_batch_storage import get_document_batch_storage
from onyx.file_store.staging import build_raw_file_callback
from onyx.file_store.staging import RawFileCallback
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
from onyx.redis.redis_hierarchy import cache_hierarchy_nodes_batch
@@ -92,7 +90,6 @@ def _get_connector_runner(
end_time: datetime,
include_permissions: bool,
leave_connector_active: bool = LEAVE_CONNECTOR_ACTIVE_ON_INITIALIZATION_FAILURE,
raw_file_callback: RawFileCallback | None = None,
) -> ConnectorRunner:
"""
NOTE: `start_time` and `end_time` are only used for poll connectors
@@ -111,7 +108,6 @@ def _get_connector_runner(
input_type=task,
connector_specific_config=attempt.connector_credential_pair.connector.connector_specific_config,
credential=attempt.connector_credential_pair.credential,
raw_file_callback=raw_file_callback,
)
# validate the connector settings
@@ -279,12 +275,6 @@ def run_docfetching_entrypoint(
f"credentials='{credential_id}'"
)
raw_file_callback = build_raw_file_callback(
index_attempt_id=index_attempt_id,
cc_pair_id=connector_credential_pair_id,
tenant_id=tenant_id,
)
connector_document_extraction(
app,
index_attempt_id,
@@ -292,7 +282,6 @@ def run_docfetching_entrypoint(
attempt.search_settings_id,
tenant_id,
callback,
raw_file_callback=raw_file_callback,
)
logger.info(
@@ -312,7 +301,6 @@ def connector_document_extraction(
search_settings_id: int,
tenant_id: str,
callback: IndexingHeartbeatInterface | None = None,
raw_file_callback: RawFileCallback | None = None,
) -> None:
"""Extract documents from connector and queue them for indexing pipeline processing.
@@ -463,7 +451,6 @@ def connector_document_extraction(
start_time=window_start,
end_time=window_end,
include_permissions=should_fetch_permissions_during_indexing,
raw_file_callback=raw_file_callback,
)
# don't use a checkpoint if we're explicitly indexing from

View File

@@ -868,15 +868,6 @@ MAX_EMBEDDED_IMAGES_PER_UPLOAD = max(
0, int(os.environ.get("MAX_EMBEDDED_IMAGES_PER_UPLOAD") or 1000)
)
# Maximum non-empty cells to extract from a single xlsx worksheet. Protects
# from OOM on honestly-huge spreadsheets: memory cost in the extractor is
# roughly proportional to this count. Once exceeded, the scan stops and a
# truncation marker row is appended to the sheet's CSV.
# Peak Memory ~= 100 B * MAX_CELLS
MAX_XLSX_CELLS_PER_SHEET = max(
0, int(os.environ.get("MAX_XLSX_CELLS_PER_SHEET") or 10_000_000)
)
# Use document summary for contextual rag
USE_DOCUMENT_SUMMARY = os.environ.get("USE_DOCUMENT_SUMMARY", "true").lower() == "true"
# Use chunk summary for contextual rag

View File

@@ -372,7 +372,6 @@ class FileOrigin(str, Enum):
CONNECTOR_METADATA = "connector_metadata"
GENERATED_REPORT = "generated_report"
INDEXING_CHECKPOINT = "indexing_checkpoint"
INDEXING_STAGING = "indexing_staging"
PLAINTEXT_CACHE = "plaintext_cache"
OTHER = "other"
QUERY_HISTORY_CSV = "query_history_csv"

View File

@@ -22,7 +22,6 @@ from onyx.db.credentials import backend_update_credential_json
from onyx.db.credentials import fetch_credential_by_id
from onyx.db.enums import AccessType
from onyx.db.models import Credential
from onyx.file_store.staging import RawFileCallback
from shared_configs.contextvars import get_current_tenant_id
@@ -108,7 +107,6 @@ def instantiate_connector(
input_type: InputType,
connector_specific_config: dict[str, Any],
credential: Credential,
raw_file_callback: RawFileCallback | None = None,
) -> BaseConnector:
connector_class = identify_connector_class(source, input_type)
@@ -132,9 +130,6 @@ def instantiate_connector(
connector.set_allow_images(get_image_extraction_and_analysis_enabled())
if raw_file_callback is not None:
connector.set_raw_file_callback(raw_file_callback)
return connector

View File

@@ -15,7 +15,6 @@ from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import Document
from onyx.connectors.models import HierarchyNode
from onyx.connectors.models import SlimDocument
from onyx.file_store.staging import RawFileCallback
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.variable_functionality import fetch_ee_implementation_or_noop
@@ -43,9 +42,6 @@ class NormalizationResult(BaseModel):
class BaseConnector(abc.ABC, Generic[CT]):
REDIS_KEY_PREFIX = "da_connector_data:"
# Optional raw-file persistence hook to save original file
raw_file_callback: RawFileCallback | None = None
@abc.abstractmethod
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
raise NotImplementedError
@@ -92,15 +88,6 @@ class BaseConnector(abc.ABC, Generic[CT]):
"""Implement if the underlying connector wants to skip/allow image downloading
based on the application level image analysis setting."""
def set_raw_file_callback(self, callback: RawFileCallback) -> None:
"""Inject the per-attempt raw-file persistence callback.
Wired up by the docfetching entrypoint via `instantiate_connector`.
Connectors that don't care about persisting raw bytes can ignore this
— `raw_file_callback` simply stays `None`.
"""
self.raw_file_callback = callback
@classmethod
def normalize_url(cls, url: str) -> "NormalizationResult": # noqa: ARG003
"""Normalize a URL to match the canonical Document.id format used during ingestion.

View File

@@ -952,7 +952,6 @@ class Document(Base):
semantic_id: Mapped[str] = mapped_column(NullFilteredString)
# First Section's link
link: Mapped[str | None] = mapped_column(NullFilteredString, nullable=True)
file_id: Mapped[str | None] = mapped_column(String, nullable=True)
# The updated time is also used as a measure of the last successful state of the doc
# pulled from the source (to help skip reindexing already updated docs in case of

View File

@@ -25,7 +25,6 @@ from openpyxl.worksheet._read_only import ReadOnlyWorksheet
from PIL import Image
from onyx.configs.app_configs import MAX_EMBEDDED_IMAGES_PER_FILE
from onyx.configs.app_configs import MAX_XLSX_CELLS_PER_SHEET
from onyx.configs.constants import ONYX_METADATA_FILENAME
from onyx.configs.llm_configs import get_image_extraction_and_analysis_enabled
from onyx.file_processing.file_types import OnyxFileExtensions
@@ -471,19 +470,12 @@ def _sheet_to_csv(rows: Iterator[tuple[Any, ...]]) -> str:
Empty rows are never stored. Column occupancy is tracked as a ``bytearray``
bitmap so column trimming needs no transpose or copy. Runs of empty
rows/columns longer than 2 are collapsed; shorter runs are preserved.
Scanning stops once ``MAX_XLSX_CELLS_PER_SHEET`` non-empty cells have been
seen; the output gets a truncation marker row appended so downstream
indexing sees that the sheet was cut off.
"""
MAX_EMPTY_ROWS_IN_OUTPUT = 2
MAX_EMPTY_COLS_IN_OUTPUT = 2
TRUNCATION_MARKER = "[truncated: sheet exceeded cell limit]"
non_empty_rows: list[tuple[int, list[str]]] = []
col_has_data = bytearray()
total_non_empty = 0
truncated = False
for row_idx, row_vals in enumerate(rows):
# Fast-reject empty rows before allocating a list of "".
@@ -498,11 +490,6 @@ def _sheet_to_csv(rows: Iterator[tuple[Any, ...]]) -> str:
for i, v in enumerate(cells):
if v:
col_has_data[i] = 1
total_non_empty += 1
if total_non_empty > MAX_XLSX_CELLS_PER_SHEET:
truncated = True
break
if not non_empty_rows:
return ""
@@ -523,9 +510,6 @@ def _sheet_to_csv(rows: Iterator[tuple[Any, ...]]) -> str:
writer.writerow([cells[c] if c < len(cells) else "" for c in keep_cols])
last_idx = row_idx
if truncated:
writer.writerow([TRUNCATION_MARKER])
return buf.getvalue().rstrip("\n")

View File

@@ -1,63 +0,0 @@
from collections.abc import Callable
from typing import Any
from typing import IO
from onyx.configs.constants import FileOrigin
from onyx.file_store.file_store import get_default_file_store
from onyx.utils.logger import setup_logger
logger = setup_logger()
# (content, content_type) -> file_id
RawFileCallback = Callable[[IO[bytes], str], str]
def stage_raw_file(
    content: IO,
    content_type: str,
    *,
    metadata: dict[str, Any],
) -> str:
    """Write raw bytes to the file store under FileOrigin.INDEXING_STAGING.

    The supplied `metadata` is stored on the file_record so later stages —
    promotion during docprocessing and orphan reaping by the TTL janitor —
    can find the file by its originating context.

    Returns the new file_id.
    """
    staged_file_id = get_default_file_store().save_file(
        content=content,
        display_name=None,
        file_origin=FileOrigin.INDEXING_STAGING,
        file_type=content_type,
        file_metadata=metadata,
    )
    return staged_file_id
def build_raw_file_callback(
    *,
    index_attempt_id: int,
    cc_pair_id: int,
    tenant_id: str,
) -> RawFileCallback:
    """Create the per-attempt callback connectors use for raw-file persistence.

    The attempt-level context (attempt id, cc-pair id, tenant) is bound into
    the staging metadata once, so each invocation only needs the per-call
    pieces (bytes and content type) and yields a file_id the connector can
    attach to its Document.
    """
    bound_metadata: dict[str, Any] = {
        "index_attempt_id": index_attempt_id,
        "cc_pair_id": cc_pair_id,
        "tenant_id": tenant_id,
    }

    def _stage(content: IO[bytes], content_type: str) -> str:
        return stage_raw_file(
            content=content,
            content_type=content_type,
            metadata=bound_metadata,
        )

    return _stage

View File

@@ -1,130 +0,0 @@
"""External dependency tests for onyx.file_store.staging.
Exercises the raw-file persistence hook used by the docfetching pipeline
against a real file store (Postgres + MinIO/S3), since mocking the store
would defeat the point of verifying that metadata round-trips through
FileRecord.
"""
from collections.abc import Generator
from io import BytesIO
from typing import Any
from uuid import uuid4
import pytest
from sqlalchemy.orm import Session
from onyx.configs.constants import FileOrigin
from onyx.connectors.interfaces import BaseConnector
from onyx.db.file_record import delete_filerecord_by_file_id
from onyx.db.file_record import get_filerecord_by_file_id
from onyx.file_store.file_store import get_default_file_store
from onyx.file_store.staging import build_raw_file_callback
from onyx.file_store.staging import stage_raw_file
@pytest.fixture(scope="function")
def cleanup_file_ids(
    db_session: Session,
) -> Generator[list[str], None, None]:
    """Yield a list tests append created file_ids to; delete them on teardown.

    Teardown first attempts a full file-store delete (blob + record); if that
    raises, it falls back to removing just the FileRecord row so the test DB
    stays clean even when the object store is unreachable.
    """
    created: list[str] = []
    yield created
    file_store = get_default_file_store()
    for fid in created:
        try:
            file_store.delete_file(fid)
        except Exception:
            # Best-effort fallback: drop only the DB record.
            delete_filerecord_by_file_id(file_id=fid, db_session=db_session)
    # NOTE(review): commit placement reconstructed as once-after-the-loop —
    # confirm against the original file's indentation.
    db_session.commit()
def test_stage_raw_file_persists_with_origin_and_metadata(
    db_session: Session,
    tenant_context: None,  # noqa: ARG001
    initialize_file_store: None,  # noqa: ARG001
    cleanup_file_ids: list[str],
) -> None:
    """A staged file gets a FileRecord with INDEXING_STAGING origin, the
    given content type, and the metadata dict round-tripped verbatim."""
    staging_metadata: dict[str, Any] = {
        "index_attempt_id": 42,
        "cc_pair_id": 7,
        "tenant_id": "tenant-abc",
        "extra": "payload",
    }
    raw_bytes = b"hello raw file"
    mime_type = "application/pdf"

    staged_id = stage_raw_file(
        content=BytesIO(raw_bytes),
        content_type=mime_type,
        metadata=staging_metadata,
    )
    cleanup_file_ids.append(staged_id)
    db_session.commit()

    stored = get_filerecord_by_file_id(file_id=staged_id, db_session=db_session)
    assert stored.file_origin == FileOrigin.INDEXING_STAGING
    assert stored.file_type == mime_type
    assert stored.file_metadata == staging_metadata
def test_build_raw_file_callback_binds_attempt_context_per_call(
    db_session: Session,
    tenant_context: None,  # noqa: ARG001
    initialize_file_store: None,  # noqa: ARG001
    cleanup_file_ids: list[str],
) -> None:
    """Every FileRecord produced by the built callback carries the bound
    attempt-level context, and no state leaks between invocations."""
    staging_callback = build_raw_file_callback(
        index_attempt_id=1001,
        cc_pair_id=202,
        tenant_id="tenant-xyz",
    )

    first_id = staging_callback(BytesIO(b"alpha"), "text/plain")
    second_id = staging_callback(BytesIO(b"beta"), "application/octet-stream")
    cleanup_file_ids.extend([first_id, second_id])
    db_session.commit()

    assert first_id != second_id

    bound_context = {
        "index_attempt_id": 1001,
        "cc_pair_id": 202,
        "tenant_id": "tenant-xyz",
    }
    expected_types = {
        first_id: "text/plain",
        second_id: "application/octet-stream",
    }
    for staged_id, mime_type in expected_types.items():
        stored = get_filerecord_by_file_id(file_id=staged_id, db_session=db_session)
        assert stored.file_origin == FileOrigin.INDEXING_STAGING
        assert stored.file_type == mime_type
        assert stored.file_metadata == bound_context
def test_set_raw_file_callback_on_base_connector() -> None:
    """set_raw_file_callback installs the callback as an instance attribute
    that the connector can invoke directly."""

    class _StubConnector(BaseConnector):
        def load_credentials(
            self,
            credentials: dict[str, Any],  # noqa: ARG002
        ) -> dict[str, Any] | None:
            return None

    stub = _StubConnector()
    assert stub.raw_file_callback is None

    expected_file_id = f"sentinel-{uuid4().hex[:8]}"

    def _recording_callback(_content: Any, _content_type: str) -> str:
        return expected_file_id

    stub.set_raw_file_callback(_recording_callback)
    assert stub.raw_file_callback is _recording_callback
    assert stub.raw_file_callback(BytesIO(b""), "text/plain") == expected_file_id

View File

@@ -339,42 +339,6 @@ class TestSheetToCsvStreaming:
# 5 leading empty rows collapsed to 2
assert csv_text.split("\n") == [",", ",", "A,B"]
def test_cell_cap_truncates_and_appends_marker(self) -> None:
"""When total non-empty cells exceeds the cap, scanning stops and
a truncation marker row is appended so downstream indexing sees
the sheet was cut off."""
with patch(
"onyx.file_processing.extract_file_text.MAX_XLSX_CELLS_PER_SHEET", 5
):
csv_text = _sheet_to_csv(
iter(
[
("A", "B", "C"),
("D", "E", "F"),
("G", "H", "I"),
("J", "K", "L"),
]
)
)
lines = csv_text.split("\n")
assert lines[-1] == "[truncated: sheet exceeded cell limit]"
# First two rows (6 cells) trip the cap=5 check after row 2; the
# third and fourth rows are never scanned.
assert "G" not in csv_text
assert "J" not in csv_text
def test_cell_cap_not_hit_no_marker(self) -> None:
"""Under the cap, no truncation marker is appended."""
csv_text = _sheet_to_csv(
iter(
[
("A", "B"),
("C", "D"),
]
)
)
assert "[truncated" not in csv_text
class TestXlsxSheetExtraction:
def test_one_tuple_per_sheet(self) -> None:

View File

@@ -20,7 +20,8 @@ Commands:
exec Run a command inside the devcontainer
restart Remove and recreate the devcontainer
rebuild Pull the latest image and recreate
stop Stop the running devcontainer`,
stop Stop the running devcontainer
tunnel Tunnel a devcontainer port to the host`,
}
cmd.AddCommand(newDevUpCommand())
@@ -29,6 +30,7 @@ Commands:
cmd.AddCommand(newDevRestartCommand())
cmd.AddCommand(newDevRebuildCommand())
cmd.AddCommand(newDevStopCommand())
cmd.AddCommand(newDevTunnelCommand())
return cmd
}

120
tools/ods/cmd/dev_tunnel.go Normal file
View File

@@ -0,0 +1,120 @@
package cmd
import (
"fmt"
"os"
"os/exec"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/onyx-dot-app/onyx/tools/ods/internal/paths"
)
const (
tunnelNetwork = "onyx_default"
tunnelImage = "alpine/socat"
)
// newDevTunnelCommand builds the `ods dev tunnel` subcommand, which forwards
// a TCP port from the running devcontainer to the host. It accepts exactly
// one positional argument: either "<port>" or "<host>:<container>".
func newDevTunnelCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "tunnel <port|host:container>",
		Short: "Tunnel a devcontainer port to the host",
		// Raw string: user-facing help text shown by `ods dev tunnel --help`.
		Long: `Forward a TCP port from the running devcontainer to the host.
Launches a short-lived socat sidecar on the devcontainer's docker network that
publishes the chosen host port and proxies connections into the devcontainer.
Runs in the foreground — Ctrl-C tears the tunnel down.
Examples:
ods dev tunnel 8080 # host 8080 -> devcontainer 8080
ods dev tunnel 9000:8080 # host 9000 -> devcontainer 8080`,
		Args: cobra.ExactArgs(1),
		Run: func(cmd *cobra.Command, args []string) {
			hostPort, containerPort, err := parseTunnelPorts(args[0])
			if err != nil {
				// Fatalf exits the process — nothing to clean up yet.
				log.Fatalf("Invalid port spec %q: %v", args[0], err)
			}
			runDevTunnel(hostPort, containerPort)
		},
	}
	return cmd
}
// parseTunnelPorts interprets a port spec of the form "<port>" (same port on
// both sides) or "<host>:<container>" and returns (hostPort, containerPort).
func parseTunnelPorts(spec string) (int, int, error) {
	parts := strings.Split(spec, ":")

	// Single value: mirror the same port on host and container.
	if len(parts) == 1 {
		port, err := parsePort(parts[0])
		if err != nil {
			return 0, 0, err
		}
		return port, port, nil
	}

	if len(parts) == 2 {
		hostPort, err := parsePort(parts[0])
		if err != nil {
			return 0, 0, fmt.Errorf("host port: %w", err)
		}
		containerPort, err := parsePort(parts[1])
		if err != nil {
			return 0, 0, fmt.Errorf("container port: %w", err)
		}
		return hostPort, containerPort, nil
	}

	// More than one colon is malformed.
	return 0, 0, fmt.Errorf("expected <port> or <host>:<container>")
}
// parsePort converts s to an int and validates it as a TCP port (1-65535).
func parsePort(s string) (int, error) {
	port, err := strconv.Atoi(s)
	if err != nil {
		return 0, fmt.Errorf("not a number: %q", s)
	}
	if port >= 1 && port <= 65535 {
		return port, nil
	}
	return 0, fmt.Errorf("out of range: %d", port)
}
// runDevTunnel locates this repo's running devcontainer and runs a foreground
// alpine/socat sidecar that publishes hostPort on the host and relays TCP
// connections to containerPort on the devcontainer. Blocks until the sidecar
// exits (Ctrl-C); any failure is fatal.
func runDevTunnel(hostPort, containerPort int) {
	root, err := paths.GitRoot()
	if err != nil {
		log.Fatalf("Failed to find git root: %v", err)
	}
	// The container is found by the devcontainer.local_folder label, which is
	// matched against this repo's git root.
	out, err := exec.Command(
		"docker", "ps", "-q",
		"--filter", "label=devcontainer.local_folder="+root,
	).Output()
	if err != nil {
		log.Fatalf("Failed to find devcontainer: %v", err)
	}
	containerID := strings.TrimSpace(string(out))
	if containerID == "" {
		log.Fatal("No running devcontainer found — run `ods dev up` first")
	}
	// NOTE(review): if more than one matching container is running, `docker ps -q`
	// emits several newline-separated IDs and the TCP address below would be
	// malformed — confirm a single devcontainer per workspace is guaranteed.
	log.Infof("Tunneling host :%d -> devcontainer :%d (Ctrl-C to stop)", hostPort, containerPort)
	// Sidecar wiring: socat listens on containerPort inside the sidecar, which
	// is published to the host as hostPort via -p; each accepted connection is
	// forked and proxied to containerID:containerPort over the shared network.
	socatArgs := []string{
		"run", "--rm", "-i",
		"--network", tunnelNetwork,
		"-p", fmt.Sprintf("%d:%d", hostPort, containerPort),
		tunnelImage,
		fmt.Sprintf("TCP-LISTEN:%d,fork,reuseaddr", containerPort),
		fmt.Sprintf("TCP:%s:%d", containerID, containerPort),
	}
	log.Debugf("Running: docker %v", socatArgs)
	// Foreground run so Ctrl-C reaches docker and --rm removes the sidecar.
	c := exec.Command("docker", socatArgs...)
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	if err := c.Run(); err != nil {
		log.Fatalf("docker run failed: %v", err)
	}
}