obikmer/docmd/implementation/obipipeline.md
Eric Coissac 036d044291 refactor: update core types and add approximate evidence support
Refactor `Kmer`, `SuperKmer`, and chunk reader into optimized, generic representations with compile-time length parameters and bitwise operations. Update the pipeline and scheduler to support batch processing, 1→N flat transformations, and multi-source merging. Introduce an approximate evidence mode using b-bit fingerprints and `.idx` files, alongside existing exact mode. Update CLI documentation, minimizer selection, and query output schema accordingly.
2026-05-26 10:04:25 +02:00


obipipeline — parallel pipeline library

obipipeline is a generic, multi-threaded data pipeline crate. It connects a source, a chain of stages, and a sink via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.

Core types

| Type alias | Rust type | Role |
|---|---|---|
| SourceFn<D> | Box<dyn FnMut() -> Result<D, PipelineError> + Send> | Called repeatedly; FnMut because it holds iterator state |
| SharedFn<D> | Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync> | 1→1 transform shared across workers via Arc::clone |
| SharedFlatFn<D> | Arc<dyn Fn(D, &Sender<Result<D, _>>, &Sender<isize>) + Send + Sync> | 1→N transform; pushes items into channel, sends delta |
| SinkFn<D> | Box<dyn Fn(D) -> Result<(), PipelineError> + Send> | Final consumer; returns Result so errors propagate back |

Stages come in two variants:

pub enum Stage<D> {
    Transform(SharedFn<D>),   // 1→1
    Flat(SharedFlatFn<D>),    // 1→N
}

Pipeline<D> holds one SourceFn, a Vec<Stage>, and one SinkFn.
WorkerPool<D> wraps a Pipeline with n_workers and channel capacity.
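
To make the roles of these aliases concrete, here is a minimal std-only sketch. It is not the crate's code: `PipelineError` is reduced to two variants, the flat stage is omitted, and `run_sequential` is a hypothetical single-threaded driver that stands in for the worker pool and scheduler.

```rust
use std::sync::Arc;

// Simplified stand-in for the crate's error type (only two variants shown).
#[derive(Debug, PartialEq)]
pub enum PipelineError {
    EndOfStream,
    TypeMismatch,
}

// Aliases mirroring the table above.
pub type SourceFn<D> = Box<dyn FnMut() -> Result<D, PipelineError> + Send>;
pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;
pub type SinkFn<D> = Box<dyn Fn(D) -> Result<(), PipelineError> + Send>;

// Hypothetical single-threaded driver: pulls from the source until
// EndOfStream, applies each 1→1 stage in order, then calls the sink.
// Returns the number of items delivered to the sink.
pub fn run_sequential<D>(
    mut source: SourceFn<D>,
    stages: &[SharedFn<D>],
    sink: SinkFn<D>,
) -> Result<usize, PipelineError> {
    let mut count = 0;
    loop {
        let mut item = match source() {
            Ok(item) => item,
            // EndOfStream is the normal termination signal, not a failure.
            Err(PipelineError::EndOfStream) => return Ok(count),
            Err(e) => return Err(e),
        };
        for stage in stages {
            item = stage(item)?;
        }
        sink(item)?;
        count += 1;
    }
}
```

The source is `FnMut` because it advances an iterator on every call, while the transforms are plain `Fn` behind an `Arc` so that several workers could call the same closure concurrently.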

WorkerPool

WorkerPool::new(pipeline: Pipeline<D>, n_workers: usize, capacity: usize) -> Self
WorkerPool::run(self)

| Parameter | Role |
|---|---|
| n_workers | Number of parallel worker threads. Each worker is generic: it executes whichever transform the scheduler assigns it. |
| capacity | Bound on every crossbeam channel in the pipeline. Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |

run consumes self (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.
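
The back-pressure effect of `capacity` can be observed with any bounded channel. The sketch below uses `std::sync::mpsc::sync_channel` as a stand-in for the crossbeam bounded channels the crate uses; `accepted_before_full` is a hypothetical helper written for this illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Fills a bounded channel of the given capacity without a consumer and
// returns how many items were accepted before the channel reported Full.
pub fn accepted_before_full(capacity: usize) -> usize {
    // Keep the receiver alive so the channel is not disconnected.
    let (tx, _rx) = sync_channel::<u64>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted as u64) {
            Ok(()) => accepted += 1,
            // A blocking `send` would park the producer here until the
            // consumer frees a slot; that is the back-pressure effect.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
        }
    }
}
```

With a capacity of 64, as in the example at the end of this page, a stage can run at most 64 items ahead of its consumer on each channel before it has to wait.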

Data enum

All pipeline stages communicate through a single user-defined enum:

enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}

Each variant carries the concrete type for one stage's output. The macros pattern-match on this enum to route values between stages.
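
As a rough illustration of that routing, a stage wrapper written by hand might look like the following. This is a sketch of the pattern rather than the macros' actual expansion; `unsigned_to_number` is a hypothetical name and `PipelineError` is reduced to the one variant needed here.

```rust
#[derive(Debug, PartialEq)]
pub enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}

// Reduced stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
pub enum PipelineError {
    TypeMismatch,
}

fn to_f64(x: u64) -> f64 {
    x as f64
}

// Hand-written wrapper for a stage that expects `Unsigned` and emits `Number`.
// Any other variant is reported as a type mismatch instead of panicking.
pub fn unsigned_to_number(data: MyData) -> Result<MyData, PipelineError> {
    match data {
        MyData::Unsigned(x) => Ok(MyData::Number(to_f64(x))),
        _ => Err(PipelineError::TypeMismatch),
    }
}
```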

Macros

Eight low-level macros build individual stages; two high-level macros (make_pipeline! and make_pipe!) compose them.

Low-level

make_source!(Enum, iterator, OutputVariant)          // iterator yields T
make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T, E>

make_transform!(Enum, func, InputVariant, OutputVariant)          // func: T -> U
make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E>

make_flat_transform!(Enum, func, InputVariant, OutputVariant)          // func: T -> impl IntoIterator<Item=U>
make_flat_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<impl IntoIterator<Item=U>, E>

make_sink!(Enum, func, InputVariant)           // func: T -> ()
make_sink_fallible!(Enum, func, InputVariant)  // func: T -> Result<(), E>

Each macro wraps the closure in the correct smart pointer (Box for source/sink, Arc for transforms).
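
The sketch below shows one way such wrappers could be written with `macro_rules!`. The macros `sketch_source!` and `sketch_transform!` are hypothetical simplifications invented for this page; the crate's real expansions may differ, and only two error variants are modelled.

```rust
use std::sync::Arc;

#[derive(Debug, PartialEq)]
pub enum Data {
    Unsigned(u64),
    Number(f64),
}

#[derive(Debug, PartialEq)]
pub enum PipelineError {
    EndOfStream,
    TypeMismatch,
}

pub type SourceFn<D> = Box<dyn FnMut() -> Result<D, PipelineError> + Send>;
pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;

pub fn to_f64(x: u64) -> f64 {
    x as f64
}

// Source: owns the iterator (hence FnMut) and lives in a Box.
macro_rules! sketch_source {
    ($data:ident, $iter:expr, $output:ident) => {{
        let mut it = ::std::iter::IntoIterator::into_iter($iter);
        let f: SourceFn<$data> = Box::new(move || {
            it.next().map($data::$output).ok_or(PipelineError::EndOfStream)
        });
        f
    }};
}

// Transform: stateless, shared between workers through an Arc.
macro_rules! sketch_transform {
    ($data:ident, $func:expr, $input:ident, $output:ident) => {{
        let f: SharedFn<$data> = ::std::sync::Arc::new(move |data: $data| match data {
            $data::$input(x) => Ok($data::$output(($func)(x))),
            _ => Err(PipelineError::TypeMismatch),
        });
        f
    }};
}
```

A call such as `sketch_transform!(Data, to_f64, Unsigned, Number)` then yields a value that can be cloned cheaply for each worker.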

make_pipeline! DSL

make_pipeline! {
    DataEnum,
    source   iterator     => OutputVariant,   // or source?  for fallible
    |   func: In => Out,                       // 1→1 non-fallible transform
    |?  func: In => Out,                       // 1→1 fallible transform
    ||  func: In => Out,                       // 1→N non-fallible flat transform
    ||? func: In => Out,                       // 1→N fallible flat transform
    sink     func         @ InputVariant,      // or sink?    for fallible
}

? marks fallibility on source, individual transforms, or sink independently.
Implemented as a TT muncher: the internal rule @build recurses over transform tokens one at a time, accumulating them into a vec![], then terminates on sink/sink?.
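
For readers unfamiliar with the TT muncher pattern, here is a miniature version. It is a hypothetical toy, not the crate's macro: `stage_kinds!` records each stage as a string label instead of building closures, and it drops the `: In => Out` annotations, but the recursion through `@build` and the termination on `sink` follow the shape described above.

```rust
macro_rules! stage_kinds {
    // Entry point: skip the source and start with an empty accumulator.
    (source $src:ident, $($rest:tt)*) => {
        stage_kinds!(@build [] $($rest)*)
    };
    // Terminal rule: `sink name` ends the recursion and emits the vec.
    (@build [$($acc:expr),*] sink $sink:ident $(,)?) => {
        (vec![$($acc),*], stringify!($sink))
    };
    // Each rule below consumes one stage and recurses on the remaining tokens.
    // The `?` variants come first so the shorter prefixes do not shadow them.
    (@build [$($acc:expr),*] ||? $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* concat!("flat? ", stringify!($f))] $($rest)*)
    };
    (@build [$($acc:expr),*] || $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* concat!("flat ", stringify!($f))] $($rest)*)
    };
    (@build [$($acc:expr),*] |? $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* concat!("map? ", stringify!($f))] $($rest)*)
    };
    (@build [$($acc:expr),*] | $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* concat!("map ", stringify!($f))] $($rest)*)
    };
}
```

Invoking it with `source numbers, | to_f64, ||? explode, sink print,` produces a tuple of the accumulated stage labels and the sink name.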

make_pipe! DSL

make_pipe! builds a sourceless/sinkless Pipe<D, In, Out> — a reusable, composable stage sequence:

make_pipe! {
    DataEnum : InType => OutType,
    |   func: InVariant => OutVariant,
    |?  func: InVariant => OutVariant,
    ||  func: InVariant => OutVariant,
    ||? func: InVariant => OutVariant,
}

Two pipes compose with .then(other). Apply to an iterator with .apply(iter, n_workers, capacity) to get a PipeIter<Out> — an iterator over the pipeline output, backed by a background WorkerPool. The scatter step in obikmer uses make_pipe! and .apply() rather than the full make_pipeline! / WorkerPool pattern.
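
The composition idea behind `.then` and `.apply` can be sketched without threads. `MiniPipe` below is a hypothetical single-threaded stand-in, not the crate's `Pipe<D, In, Out>`: it drops the data enum parameter, and its `apply` takes only the iterator, whereas the real one also takes `n_workers` and `capacity` and runs on a background pool.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a composable stage sequence.
// Every stage is modelled as 1→N (a 1→1 stage returns a one-element Vec).
pub struct MiniPipe<In, Out> {
    run: Arc<dyn Fn(In) -> Vec<Out> + Send + Sync>,
}

impl<In: 'static, Out: 'static> MiniPipe<In, Out> {
    pub fn new(f: impl Fn(In) -> Vec<Out> + Send + Sync + 'static) -> Self {
        MiniPipe { run: Arc::new(f) }
    }

    // Compose: feed every output of `self` into `next`.
    pub fn then<Next: 'static>(self, next: MiniPipe<Out, Next>) -> MiniPipe<In, Next> {
        let (first, second) = (self.run, next.run);
        MiniPipe::<In, Next>::new(move |x| {
            first(x).into_iter().flat_map(|y| second(y)).collect()
        })
    }

    // Apply lazily to any iterator and return an iterator over the outputs.
    pub fn apply<I>(self, iter: I) -> impl Iterator<Item = Out>
    where
        I: IntoIterator<Item = In>,
    {
        let run = self.run;
        iter.into_iter().flat_map(move |x| run(x))
    }
}
```

Returning an iterator is what lets the caller keep ordinary `for` loops and adapter chains downstream of the pipe.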

Scheduler architecture

Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
                                      ▲                               │
                  [stage_rxs] ────────┘◄──────────────────────────────┘
                  [flat_delta_rx] ──► Scheduler  (in_flight adjustment)
                                      │
                              [sink_err_rx]  ← errors from sink (highest priority)
                                      │
                                   Sink thread

The scheduler is a single thread running a biased Select over all input channels. Priority order (highest first):

index 0       sink_err_rx          abort on sink error
index 1       flat_delta_rx        adjust in_flight before dispatching
index 2..=n+1 stage_rxs[n-1..0]   drain last stage first
index n+2     source_rx            pull new data last

This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.
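
The priority order can be emulated with non-blocking polls. The sketch below uses `std::sync::mpsc` and `try_recv` instead of a crossbeam `Select`, so it shows only the ordering, not the blocking behaviour; `Event` and `poll_biased` are hypothetical names, and the payload types are simplified to `u64` and `String`.

```rust
use std::sync::mpsc::Receiver;

#[derive(Debug, PartialEq)]
pub enum Event {
    SinkError(String),
    FlatDelta(isize),
    StageOutput { stage: usize, item: u64 },
    SourceItem(u64),
}

// Poll the receivers in the documented priority order and return the first
// ready message, or None when nothing is ready.
pub fn poll_biased(
    sink_err_rx: &Receiver<String>,
    flat_delta_rx: &Receiver<isize>,
    stage_rxs: &[Receiver<u64>],
    source_rx: &Receiver<u64>,
) -> Option<Event> {
    // Highest priority: a sink error aborts the pipeline.
    if let Ok(e) = sink_err_rx.try_recv() {
        return Some(Event::SinkError(e));
    }
    // Next: adjust in_flight before dispatching anything else.
    if let Ok(d) = flat_delta_rx.try_recv() {
        return Some(Event::FlatDelta(d));
    }
    // Last stage first, so downstream work drains before upstream work.
    for (stage, rx) in stage_rxs.iter().enumerate().rev() {
        if let Ok(item) = rx.try_recv() {
            return Some(Event::StageOutput { stage, item });
        }
    }
    // Lowest priority: admit new data from the source.
    source_rx.try_recv().ok().map(Event::SourceItem)
}
```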

Workers are generic: each receives a WorkerTask — either Transform(data, stage_idx) or Flat(data, stage_idx). For Transform, the worker calls f(data) and sends the result to stage_txs[stage_idx]. For Flat, the worker calls f(data, &push_tx, &delta_tx): the closure pushes N items into push_tx then sends N-1 to delta_tx. The scheduler uses the delta to adjust in_flight without knowing N in advance.
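
A flat closure following this protocol could look like the sketch below. It is an assumption-laden illustration: `split_words` is a hypothetical stage, the channels are `std::sync::mpsc` rather than crossbeam, and the error type is simplified to `String`.

```rust
use std::sync::mpsc::Sender;

// Hypothetical flat stage: splits a line into words, pushes each word
// downstream, then reports N-1 so the scheduler can adjust in_flight.
pub fn split_words(
    line: String,
    push_tx: &Sender<Result<String, String>>,
    delta_tx: &Sender<isize>,
) {
    let mut n: isize = 0;
    for word in line.split_whitespace() {
        // Send errors are ignored in this sketch; they only occur when the
        // receiving side has already been dropped.
        let _ = push_tx.send(Ok(word.to_owned()));
        n += 1;
    }
    // One input became n outputs, so the net change in in-flight items is
    // n - 1 (which is -1 when the input produced nothing).
    let _ = delta_tx.send(n - 1);
}
```

Sending the delta after the pushes means the scheduler never has to know N before the closure runs, and a signed delta also covers inputs that expand to zero outputs.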

Termination uses an in_flight: isize counter and a flat_workers_active: usize counter:

  • in_flight is incremented when an item is dispatched from the source to the workers
  • in_flight is decremented when the item exits the last stage to the sink
  • flat_workers_active is incremented when a Flat task is dispatched and decremented when its delta arrives
  • the loop exits only when source_done && in_flight == 0 && flat_workers_active == 0

This guarantees all in-flight items complete (including all N outputs of a flat stage) before join().
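
The bookkeeping can be traced with a small state struct. `Accounting` and its method names are hypothetical; the sketch only mirrors the counter rules listed above, plus the delta adjustment described for flat stages.

```rust
// Hypothetical bookkeeping struct mirroring the scheduler's counters.
#[derive(Debug, Default)]
pub struct Accounting {
    pub source_done: bool,
    pub in_flight: isize,
    pub flat_workers_active: usize,
}

impl Accounting {
    // An item leaves the source and is handed to a worker.
    pub fn dispatch_from_source(&mut self) {
        self.in_flight += 1;
    }

    // A Flat task is handed to a worker; its output count is still unknown.
    pub fn dispatch_flat(&mut self) {
        self.flat_workers_active += 1;
    }

    // The flat worker reported N-1: apply it and mark the worker as done.
    pub fn flat_delta(&mut self, delta: isize) {
        self.in_flight += delta;
        self.flat_workers_active -= 1;
    }

    // An item left the last stage and reached the sink.
    pub fn delivered_to_sink(&mut self) {
        self.in_flight -= 1;
    }

    // Exit condition of the scheduler loop.
    pub fn finished(&self) -> bool {
        self.source_done && self.in_flight == 0 && self.flat_workers_active == 0
    }
}
```

For one source item expanded into three outputs, in_flight goes 1, then 3 after the delta of 2, then back to 0 after three deliveries; only then does `finished()` return true.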

Error handling

PipelineError has four variants:

| Variant | Meaning |
|---|---|
| EndOfStream | Source exhausted (normal termination, not sent downstream) |
| TypeMismatch | Wrong enum variant arrived at a stage |
| StepKindMismatch | Internal routing error |
| StepError(Box<dyn Error + Send + Sync>) | Error from user code (wrapped by make_*_fallible!) |

Sink errors flow back to the scheduler via a dedicated Receiver<PipelineError> registered at index 0 of the Select — the pipeline stops immediately on the first sink error.
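
As a sketch of how the four variants and the user-error wrapping fit together, the enum could be declared as below. The `Display` messages and the `parse_step` helper are assumptions made for illustration; only the variant names and the boxed payload type come from the table above.

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
pub enum PipelineError {
    EndOfStream,
    TypeMismatch,
    StepKindMismatch,
    StepError(Box<dyn Error + Send + Sync>),
}

impl fmt::Display for PipelineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PipelineError::EndOfStream => write!(f, "end of stream"),
            PipelineError::TypeMismatch => write!(f, "wrong enum variant for this stage"),
            PipelineError::StepKindMismatch => write!(f, "internal routing error"),
            PipelineError::StepError(e) => write!(f, "step failed: {}", e),
        }
    }
}

impl Error for PipelineError {}

// Hypothetical fallible step: the user-level ParseIntError is boxed into
// StepError, which is roughly what a fallible wrapper has to do.
pub fn parse_step(text: &str) -> Result<u64, PipelineError> {
    text.trim()
        .parse::<u64>()
        .map_err(|e| PipelineError::StepError(Box::new(e)))
}
```

Boxing the error as `dyn Error + Send + Sync` keeps the pipeline generic over user error types while still allowing the value to cross thread boundaries.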

Example

enum PipelineData { Unsigned(u64), Number(f64), Text(String) }

fn to_f64(x: u64) -> f64 { x as f64 }
fn format_num(n: f64) -> String { format!("{}", n) }
fn reverse(s: String) -> String { s.chars().rev().collect() }
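fn hash(s: String) -> u64 {
    // djb2: start at 5381, then h = h * 33 + byte, with wrapping arithmetic
    s.bytes().fold(5381u64, |h, b| h.wrapping_mul(33).wrapping_add(u64::from(b)))
}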
fn print_hash(h: u64) -> Result<(), std::io::Error> { println!("{}", h); Ok(()) }

let pipeline = make_pipeline! {
    PipelineData,
    source  1u64..=10 => Unsigned,
    | to_f64:     Unsigned => Number,
    | format_num: Number   => Text,
    | reverse:    Text     => Text,
    | hash:       Text     => Unsigned,
    sink?   print_hash     @ Unsigned,
};

WorkerPool::new(pipeline, 4, 64).run();