Skip to content

Ingester

The ingester is a long-running service that watches sources for changes and feeds documents into haiku.rag's LanceDB. It runs as a separate process (haiku-ingester serve), owns its own job queue (SQLite by default, or a database server), and exposes a small HTTP control plane for operations.

Use the ingester when:

  • you have a corpus you want to keep in sync continuously
  • documents arrive over time from filesystem, S3, or HTTP sources
  • you want retry + dead-letter behavior, not "fire and forget"

For one-off ingestion, the haiku-rag add-src CLI is enough — see CLI → Add Documents.

On this page:

Single-writer constraint: only one ingester per LanceDB. See Storage → Deployment Pattern.

Install

The ingester ships behind an optional extra:

pip install 'haiku.rag-slim[ingester]'
# or, for the full package:
pip install 'haiku.rag[ingester]'

That pulls fastapi, uvicorn, sqlalchemy, aiosqlite, asyncpg, and the [s3] extra. The production binary is haiku-ingester.

Configure sources

Add an ingester: block to your haiku.rag.yaml. The minimum is a single source:

ingester:
  sources:
    - type: fs
      id: local-docs
      root: /Users/you/docs
      delete_orphans: true

Filesystem

ingester:
  sources:
    - type: fs
      id: local-docs                          # optional; auto-derives from root
      root: /Users/you/docs
      poll_interval_s: 300
      delete_orphans: true
      ignore_patterns: ["**/.git/**", "**/node_modules/**"]
      include_patterns: ["*.md", "*.pdf"]    # optional whitelist

Uses watchfiles for push events plus a periodic sweep that catches anything the OS dropped between starts. Patterns follow gitignore syntax.

S3 / object storage

ingester:
  sources:
    - type: s3
      id: corp-docs
      uri: s3://my-bucket/incoming/
      poll_interval_s: 300
      delete_orphans: true
      ignore_patterns: ["draft*"]
      include_patterns: ["*.pdf", "*.md"]
      storage_options:
        endpoint: http://seaweed:8333         # omit for AWS default chain
        aws_access_key_id: ${AWS_KEY}
        aws_secret_access_key: ${AWS_SECRET}
        region: us-east-1
        allow_http: "true"

ETags are the cheap-skip key. Each sweep lists the prefix, compares the listed ETag against the document's stored metadata["source_revision"], and only fetches keys whose ETag has changed. If the bytes turn out to match the stored MD5 (multipart re-upload landing a new ETag on the same content), only the revision is refreshed — no re-chunk.

storage_options follows the same convention as lancedb.storage_options — the dict is passed straight to obstore (the Rust object_store library LanceDB uses internally), so credentials configured for the LanceDB backend can be copy-pasted here.

HTTP

ingester:
  sources:
    - type: http
      id: arxiv
      urls:
        - https://arxiv.org/pdf/2301.12345.pdf
      headers:
        Authorization: Bearer ${SOME_TOKEN}
      poll_interval_s: 86400

HTTP is pull-based with HEAD-driven change detection. A 410 Gone response from a configured URL triggers a delete event; other failure statuses fall through to UPSERT-with-no-revision so the worker can GET and decide.

WebDAV

ingester:
  sources:
    - type: webdav
      id: nextcloud
      base_url: https://nextcloud.example.com/remote.php/dav/files/alice/Documents/
      username: alice
      password: ${NEXTCLOUD_APP_PASSWORD}
      ignore_patterns: ["**/Trash/**"]
      poll_interval_s: 600

Each sweep issues one PROPFIND with Depth: infinity against base_url and parses the multistatus response. Files (non-collection resources) are emitted as UPSERT / UNCHANGED based on the getetag property (falling back to getlastmodified if the server omits it); URIs that were in the previous snapshot but no longer appear under the collection are emitted as DELETE.

Fetches are plain HTTP GETs — any WebDAV server already supports them.

Bearer-token auth can replace HTTP Basic via the standard headers map:

    - type: webdav
      id: kdrive
      base_url: https://kdrive.infomaniak.com/app/drive/123/
      headers:
        Authorization: Bearer ${KDRIVE_TOKEN}

File size limits

Any source can set max_file_size (bytes) to reject oversized files before they are read into memory. Files exceeding the limit go straight to the DLQ without retrying.

    - type: fs
      root: /data/docs
      max_file_size: 104857600        # 100 MB

FS and S3 sources know the size before downloading (stat, object metadata), so the limit is always enforced. For HTTP and WebDAV the check relies on a Content-Length response header; a server that omits it (for example a chunked response) is fetched in full and the limit does not apply.

Workers and retry

ingester:
  workers:
    worker_count: 4
    poll_idle_interval_s: 1.0
    claim_timeout_s: 1800
    reaper_interval_s: 60
    shutdown_grace_s: 60            # SIGTERM drains in-flight up to this long
    retry:
      max_attempts: 5
      base_delay_s: 2.0
      max_delay_s: 300.0
      jitter: 0.25                  # ±25%

The worker pool runs worker_count async workers, each processing one job at a time. worker_count is therefore also the maximum number of concurrent in-flight jobs. Jobs that hit a TransientError are rescheduled with exponential backoff plus jitter, up to max_attempts, then land in the dead-letter queue. PermanentError (unsupported extension, 4xx HTTP except 408/429, etc.) skips retry entirely.

A reaper task resets jobs whose claimed_at is older than claim_timeout_s so a crashed worker doesn't strand its job.

Backpressure. Each poller skips its periodic sweep when its source already has queued or claimed jobs in the queue. The unique-index dedup would coalesce a re-sweep anyway; the skip saves the listing round-trip (PROPFIND / S3 LIST / FS walk). FS push events from watchfiles still flow during a skipped sweep, so new files aren't lost.

Graceful shutdown. On SIGINT / SIGTERM, pollers stop immediately and workers are given shutdown_grace_s to finish in-flight jobs. Jobs still running after the grace window are cancelled — they stay claimed in the queue and are reset by the reaper on the next start once claim_timeout_s elapses.

Tuning.

  • claim_timeout_s must exceed the longest legitimate job duration; a shorter value lets the reaper resurrect in-flight jobs.
  • worker_count should match downstream capacity. docling-serve processes one task per instance, so worker_count above the number of providers.docling_serve.base_url entries over-subscribes the fleet — extra submissions queue inside docling-serve and inflate claimed_at duration toward claim_timeout_s.
  • poll_idle_interval_s: lower = faster pickup, more SQLite churn.
  • reaper_interval_s: worst-case post-crash reclaim is claim_timeout_s + reaper_interval_s.

Per-source override. A source can opt out of the global retry policy:

ingester:
  sources:
    - type: http
      id: flaky-api
      urls: [...]
      retry:
        max_attempts: 10
        base_delay_s: 10

Circuit breaker

After N consecutive discover() failures, a source's circuit breaker opens and polling pauses for a cooldown. Other sources keep running.

ingester:
  sources:
    - type: http
      id: rate-limited
      urls: [...]
      circuit_breaker:
        failure_threshold: 5
        cooldown_s: 600

Run it

haiku-ingester serve                          # workers + pollers + API
haiku-ingester serve --no-api                 # workers + pollers only
haiku-ingester serve --db /path.lancedb       # explicit DB
haiku-ingester serve --host 0.0.0.0           # bind API on all interfaces
haiku-ingester serve --port 9000              # override API port

--host and --port are CLI overrides for ingester.api.host and ingester.api.port in haiku.rag.yaml. Both default to the YAML value (which itself defaults to 127.0.0.1:8765 — loopback only).

The service blocks until SIGINT or SIGTERM. Shutdown drains the API server, then pollers, then in-flight workers.

Single-writer constraint

LanceDB supports exactly one writer + N readers per database URI. Run exactly one haiku-ingester serve against a given LanceDB. Multiple MCP servers or read-only consumers against the same DB are fine.

HTTP control plane

By default the ingester exposes a FastAPI control plane on 127.0.0.1:8765. Set ingester.api.auth_token to require a Bearer token; without one the API stays open and the service logs a warning.

Non-loopback binds need a token

Loopback (127.0.0.1) is local-only and safe to leave open. If you bind to any other interface (0.0.0.0, a LAN IP, behind a reverse proxy) set auth_token — the control plane can cancel jobs, retry from the DLQ, and trigger source refreshes. The startup warning is your only signal that you forgot.

Method Path Purpose
GET / browser dashboard (HTML; unauthenticated, the JS attaches the bearer on its own JSON fetches)
GET /health liveness + queue counts + live worker/poller counts; status is "ok" or "degraded"
GET /sources configured pollers + last-poll time + breaker state + last skip reason
POST /sources/{id}/refresh force an out-of-band sweep
GET /jobs filtered list (status, source_id, uri, limit, offset)
GET /jobs/{id} one job
POST /jobs/{id}/retry reset attempts to 0, status to queued
DELETE /jobs/{id} cancel a queued/claimed job
GET /dlq dead jobs
POST /dlq/{id}/retry resurrect from DLQ
GET /stats rolling throughput (5m / 30m / 1h succeeded), worker occupancy, oldest queued age, per-source DLQ + backlog

OpenAPI docs at http://localhost:8765/docs. The dashboard at / polls the JSON endpoints above every few seconds and surfaces the same data visually — queue depth chips, per-source health with a queue busy badge when sweeps are skipped, throughput counters, active jobs with a Cancel button, recent failures with a Retry button, and the last-completed feed.

Ingester dashboard mid-ingest: queue depth, per-source health, active and recent jobs

ingester:
  api:
    enabled: true
    host: 127.0.0.1
    port: 8765
    auth_token: secret                        # null → unauthenticated

Operating

One-shot batch build

run-batch runs a single discover sweep across every configured source, drains the queue, then exits. New and changed resources are ingested, resources that vanished from a source are deleted. The periodic poller loops never start, so the run is deterministic and finishes as soon as the queue is empty. This is the mode for building a database in CI or on a schedule rather than running the service continuously.

haiku-ingester run-batch
haiku-ingester run-batch --db rag.lancedb

Orphan deletion compares each source against sync_state in the queue DB, so persist ingester.db between runs for deletions to be detected. It exits non-zero if any job dead-letters or a source's discovery sweep does not complete.

The queue

The ingester's SQLite queue lives at ~/Library/Application Support/haiku.rag/ingester.db on macOS (platform user data dir; configurable via ingester.queue.path). It's created automatically by serve.

For ops setup you can pre-create it:

haiku-ingester queue init             # create the DB and schema
haiku-ingester queue migrate          # apply pending schema changes

Terminal job rows (succeeded and dead) are kept for history and pruned by the reaper once they age past retention_days:

ingester:
  queue:
    path: /var/lib/haiku-rag/ingester.db
    retention_days: 30                # null disables pruning

The reaper deletes terminal rows whose completed_at is older than the window on its reaper_interval_s cadence. Set retention_days: null to keep all terminal rows.

Using a database server

If you already run a database server, point the queue at it with ingester.queue.dburi, a SQLAlchemy async URL. SQLite is used when dburi is unset.

ingester:
  queue:
    dburi: postgresql+asyncpg://haiku:secret@db:5432/haiku_rag

Postgres (postgresql+asyncpg://) is supported alongside the default SQLite. The asyncpg driver ships with the [ingester] extra. dburi overrides path, and the --queue CLI flag is ignored while it is set. Create the schema the same way as for SQLite:

haiku-ingester queue init

Workers claim jobs with FOR UPDATE SKIP LOCKED, so several haiku-ingester serve processes can share one Postgres queue and scale out horizontally. One caveat: idle workers wake on new work instantly only within their own process. Workers in other processes pick up enqueued jobs on their next poll_idle_interval_s tick rather than immediately.

Logs

The service logs via Python logging to stderr through a Rich handler. A typical run looks like:

INFO     Ingester running: 4 worker(s), 1 source(s)
INFO     API listening on 127.0.0.1:8765
INFO     Swept local-docs: 142 upsert, 0 delete, 8 unchanged
INFO     Processing upsert file:///.../a.md (job 5d9a...)
INFO     Job 5d9a... succeeded in 0.34s: file:///.../a.md

When LOGFIRE_TOKEN is set, spans are also shipped to Logfire.

Operating against the API

TOKEN=$INGESTER_TOKEN   # omit -H entirely if no token configured

curl http://localhost:8765/health
curl -H "Authorization: Bearer $TOKEN" http://localhost:8765/sources
curl -H "Authorization: Bearer $TOKEN" 'http://localhost:8765/jobs?status=dead'

# Force a poll now
curl -H "Authorization: Bearer $TOKEN" -X POST \
    http://localhost:8765/sources/local-docs/refresh

# Resurrect a dead job
curl -H "Authorization: Bearer $TOKEN" -X POST \
    http://localhost:8765/jobs/<id>/retry