
obipipeline — parallel pipeline library

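obipipeline is a generic, multi-threaded data pipeline crate. It connects a source, a chain of transforms, and a sink via crossbeam channels. The source and the sink each run on their own thread, while the transforms run on a shared pool of generic workers coordinated by a single scheduler thread that uses a biased select.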

Core types

| Type alias  | Rust type                                                          | Role |
|-------------|--------------------------------------------------------------------|------|
| SourceFn<D> | Box<dyn FnMut() -> Result<D, PipelineError> + Send + Sync>         | Called repeatedly; FnMut because it holds iterator state |
| SharedFn<D> | Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>           | Shared across workers via Arc::clone (the closure itself is never copied) |
| SinkFn<D>   | Box<dyn Fn(D) -> Result<(), PipelineError> + Send + Sync>          | Final consumer; returns Result so errors propagate back to the scheduler |

Pipeline<D> holds one SourceFn<D>, a Vec<SharedFn<D>> (the transform chain, in order), and one SinkFn<D>.
WorkerPool<D> wraps a Pipeline<D> together with the worker count (n_workers) and the channel capacity.
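A minimal sketch of how these types fit together, assuming a trimmed PipelineError with only two variants. run_sequential is a hypothetical single-threaded driver (no workers, no scheduler) used only to show how source, transforms, and sink compose; it is not part of the crate.

```rust
use std::sync::Arc;

/// Trimmed stand-in for the crate's error type (other variants omitted).
#[derive(Debug)]
pub enum PipelineError {
    EndOfStream,
    StepError(String),
}

// The three aliases from the table above.
pub type SourceFn<D> = Box<dyn FnMut() -> Result<D, PipelineError> + Send + Sync>;
pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;
pub type SinkFn<D> = Box<dyn Fn(D) -> Result<(), PipelineError> + Send + Sync>;

/// One source, an ordered chain of transforms, and one sink.
pub struct Pipeline<D> {
    pub source: SourceFn<D>,
    pub transforms: Vec<SharedFn<D>>,
    pub sink: SinkFn<D>,
}

/// Hypothetical sequential driver: pulls items until EndOfStream, runs each
/// through every transform in order, and returns how many reached the sink.
pub fn run_sequential<D>(mut p: Pipeline<D>) -> Result<usize, PipelineError> {
    let mut delivered = 0;
    loop {
        // EndOfStream is normal termination, not an error.
        let mut item = match (p.source)() {
            Ok(d) => d,
            Err(PipelineError::EndOfStream) => return Ok(delivered),
            Err(e) => return Err(e),
        };
        for stage in &p.transforms {
            item = stage(item)?;
        }
        (p.sink)(item)?;
        delivered += 1;
    }
}
```

The FnMut source captures the iterator by value and advances it on every call, which is why it cannot be a plain Fn.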

WorkerPool

WorkerPool::new(pipeline: Pipeline<D>, n_workers: usize, capacity: usize) -> Self
WorkerPool::run(self)
| Parameter | Role |
|-----------|------|
| n_workers | Number of parallel worker threads. Workers are generic: each executes whichever transform the scheduler assigns to it. |
| capacity  | Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). It controls memory use and back-pressure: a sender blocks on a full channel until a slot frees up. |
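The back-pressure behaviour of a bounded channel can be sketched with std's sync_channel standing in for a crossbeam bounded channel (swapped in only so the example has no dependencies). probe_back_pressure is a hypothetical helper; it uses try_send, which reports Full at exactly the point where a blocking send would wait.

```rust
use std::sync::mpsc::sync_channel;

/// Counts how many items a bounded channel of `capacity` accepts before it is
/// full, then frees one slot and checks whether a further send succeeds.
pub fn probe_back_pressure(capacity: usize) -> (usize, bool) {
    // capacity 0 is a rendezvous channel; the recv() below would block forever.
    assert!(capacity > 0, "capacity must be at least 1 for this probe");
    let (tx, rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    // try_send returns Err(Full) once every slot is taken; send() would block here.
    while let Ok(()) = tx.try_send(accepted) {
        accepted += 1;
    }
    // Receiving one item frees a slot, so the next send goes through again.
    rx.recv().unwrap();
    let accepted_after_recv = tx.try_send(usize::MAX).is_ok();
    (accepted, accepted_after_recv)
}
```

For example, probe_back_pressure(4) returns (4, true): four items fit, the fifth is refused, and one receive makes room again.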

run consumes self (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained (the source is exhausted and the sink has processed every in-flight item), then joins all threads before returning.

Data enum

All pipeline stages communicate through a single user-defined enum:

enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}

Each variant carries the concrete type for one stage's output. The macros pattern-match on this enum to route values between stages.
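A hand-written sketch of that routing, assuming a trimmed PipelineError and hypothetical stage functions to_number and to_text. Because every stage has the same signature over MyData, stages whose underlying types differ (u64 to f64, f64 to String) can sit side by side in one Vec, and a wrong variant surfaces as TypeMismatch.

```rust
#[derive(Debug, PartialEq)]
pub enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}

/// Trimmed stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
pub enum PipelineError {
    TypeMismatch,
}

/// Every stage shares this one signature, whatever its "real" types are.
pub type Stage = fn(MyData) -> Result<MyData, PipelineError>;

// Unsigned -> Number: unwrap the expected variant, convert, rewrap.
fn to_number(d: MyData) -> Result<MyData, PipelineError> {
    match d {
        MyData::Unsigned(x) => Ok(MyData::Number(x as f64)),
        _ => Err(PipelineError::TypeMismatch),
    }
}

// Number -> Text.
fn to_text(d: MyData) -> Result<MyData, PipelineError> {
    match d {
        MyData::Number(n) => Ok(MyData::Text(format!("{}", n))),
        _ => Err(PipelineError::TypeMismatch),
    }
}

/// A u64 -> f64 -> String chain stored in a single Vec.
pub fn example_chain() -> Vec<Stage> {
    vec![to_number as Stage, to_text as Stage]
}

/// Feeds `input` through each stage in order, stopping at the first error.
pub fn run_chain(stages: &[Stage], input: MyData) -> Result<MyData, PipelineError> {
    stages.iter().try_fold(input, |d, stage| stage(d))
}
```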

Macros

Six low-level macros build individual stages; one high-level macro (make_pipeline!) composes them.

Low-level

make_source!(Enum, iterator, OutputVariant)          // iterator yields T
make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T, E>

make_transform!(Enum, func, InputVariant, OutputVariant)          // func: T -> U
make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E>

make_sink!(Enum, func, InputVariant)           // func: T -> ()
make_sink_fallible!(Enum, func, InputVariant)  // func: T -> Result<(), E>

Each macro wraps the closure in the correct smart pointer (Box for source/sink, Arc for transforms).
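A possible expansion of make_transform!, sketched by hand for illustration; it is not the crate's actual macro body, and the trimmed PipelineError and SharedFn definitions here are assumptions. The generated closure unwraps the input variant, calls the plain function, rewraps the result in the output variant, and is placed in an Arc so every worker can hold a cheap clone.

```rust
use std::sync::Arc;

/// Trimmed stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
pub enum PipelineError {
    TypeMismatch,
}

pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;

#[derive(Debug, PartialEq)]
pub enum MyData {
    Unsigned(u64),
    Number(f64),
}

pub fn to_f64(x: u64) -> f64 {
    x as f64
}

// Hypothetical expansion: (DataEnum, func, InputVariant, OutputVariant) -> SharedFn.
macro_rules! make_transform {
    ($data:ident, $func:expr, $input:ident, $output:ident) => {{
        let stage: SharedFn<$data> = Arc::new(|d: $data| match d {
            $data::$input(x) => Ok($data::$output($func(x))),
            // Any other variant means the value was routed to the wrong stage.
            _ => Err(PipelineError::TypeMismatch),
        });
        stage
    }};
}
```

Usage: make_transform!(MyData, to_f64, Unsigned, Number) yields a SharedFn<MyData>; cloning it with Arc::clone only bumps a reference count.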

make_pipeline! DSL

make_pipeline! {
    DataEnum,
    source   iterator     => OutputVariant,   // or source?  for fallible
    | func:  In => Out,                        // non-fallible transform
    |? func: In => Out,                        // fallible transform
    sink     func         @ InputVariant,      // or sink?    for fallible
}

A ? marks fallibility independently on the source (source?), on any individual transform (|?), or on the sink (sink?).
The macro is implemented as a TT muncher: an internal @build rule consumes the transform tokens one at a time, accumulates them into a vec![], and terminates when it reaches sink or sink?.
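The TT-muncher pattern can be sketched in miniature. This hypothetical chain! macro accepts only "| func," items and plain fn(i64) -> i64 steps (no enum routing, no |? and no sink?), but it has the same shape: an @build rule that peels off one transform per recursion into an accumulator, and a terminal rule that fires on sink.

```rust
/// Simplified stand-in for a transform: a plain function pointer.
pub type Step = fn(i64) -> i64;

// Hypothetical TT muncher (not the crate's real rules).
macro_rules! chain {
    // Terminal rule: `sink` reached, emit everything accumulated so far.
    (@build [$($acc:expr),*] sink) => {
        vec![$($acc),*]
    };
    // Recursive rule: consume one `| func,` and append it to the accumulator.
    (@build [$($acc:expr),*] | $f:ident, $($rest:tt)*) => {
        chain!(@build [$($acc,)* $f as Step] $($rest)*)
    };
    // Entry point: start with an empty accumulator.
    ($($tokens:tt)*) => {
        chain!(@build [] $($tokens)*)
    };
}

pub fn inc(x: i64) -> i64 {
    x + 1
}

pub fn double(x: i64) -> i64 {
    x * 2
}

/// Applies the steps in the order they were written.
pub fn apply(steps: &[Step], x: i64) -> i64 {
    steps.iter().fold(x, |acc, step| step(acc))
}
```

chain!(| inc, | double, sink) expands step by step into vec![inc as Step, double as Step], so applying it to 3 gives (3 + 1) * 2 = 8.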

Scheduler architecture

Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
                                      ▲                               │
                  [stage_rxs] ────────┘◄──────────────────────────────┘
                                      │
                              [sink_err_rx]  ← errors from sink (highest priority)
                                      │
                                   Sink thread

The scheduler is a single thread running a biased Select over all input channels. Priority order (highest first):

index 0       sink_err_rx          abort on sink error
index 1       stage_rxs[N-1]       drain last stage first
...
index N       stage_rxs[0]
index N+1     source_rx            pull new data last

This ordering is back-pressure-friendly: whenever a downstream stage has results ready, they are handled before any new item is pulled from the source, so the pipeline drains before it refills.
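The priority rule can be approximated with std channels. Crossbeam's Select blocks until some operation is ready; this sketch covers only the biased part, polling the receivers with try_recv in priority order and returning the first ready message. poll_biased is a hypothetical name, and unlike a real select it returns None instead of blocking when nothing is ready.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Returns the first ready message, scanning receivers from index 0
/// (highest priority) upward, together with the index it came from.
/// Returns None when no receiver has a message right now.
pub fn poll_biased<T>(rxs: &[Receiver<T>]) -> Option<(usize, T)> {
    for (index, rx) in rxs.iter().enumerate() {
        match rx.try_recv() {
            Ok(msg) => return Some((index, msg)),
            // Empty or disconnected: fall through to the next priority level.
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => {}
        }
    }
    None
}
```

With the receivers ordered as sink_err_rx, stage_rxs[N-1], ..., stage_rxs[0], source_rx, a ready result from the last stage is always taken before a new source item.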

Workers are generic: each receives a (data, SharedFn, result_tx) tuple, applies the SharedFn to data, and sends the result on result_tx. The scheduler decides which transform to apply and where to route the result.
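A sketch of such a generic worker using std channels, assuming an infallible SharedFn (the crate's alias returns a Result) and a job receiver shared between workers through Arc<Mutex<Receiver>>. Job and spawn_worker are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Simplified, infallible stand-in for SharedFn.
pub type SharedFn<D> = Arc<dyn Fn(D) -> D + Send + Sync>;

/// A job carries the data, the transform to apply, and the channel for the result.
pub type Job<D> = (D, SharedFn<D>, Sender<D>);

/// Spawns one generic worker. It knows nothing about stages: it applies
/// whatever function arrives with each job and replies on the given channel.
pub fn spawn_worker<D: Send + 'static>(jobs: Arc<Mutex<Receiver<Job<D>>>>) -> JoinHandle<()> {
    thread::spawn(move || loop {
        // The lock guard is a temporary dropped at the end of this statement,
        // so other workers can take jobs while this one runs f.
        let job = jobs.lock().unwrap().recv();
        match job {
            Ok((data, f, result_tx)) => {
                // Ignore send errors: the receiver may already be gone.
                let _ = result_tx.send(f(data));
            }
            // All job senders were dropped: shut down.
            Err(_) => break,
        }
    })
}
```

Because the function and the reply channel travel with the data, the same worker can serve any stage; routing stays entirely in the scheduler.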

Termination uses an in_flight counter:

  • incremented when an item is dispatched from source to workers
  • decremented when the item exits the last stage
  • the loop exits only when source_done && in_flight == 0

This guarantees all in-flight items complete before join().
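The termination rule can be sketched with a single transform and std channels. run_until_drained is hypothetical: it counts an item as in flight from dispatch until its result comes back, prefers finished results over new source items, leaves the loop only when source_done && in_flight == 0, and then closes the job channel and joins the workers.

```rust
use std::sync::mpsc::{channel, TryRecvError};
use std::sync::{Arc, Mutex};
use std::thread;

pub fn run_until_drained<I, F>(source: I, n_workers: usize, f: F) -> Vec<u64>
where
    I: IntoIterator<Item = u64>,
    F: Fn(u64) -> u64 + Send + Sync + 'static,
{
    let (job_tx, job_rx) = channel::<u64>();
    let (done_tx, done_rx) = channel::<u64>();
    let job_rx = Arc::new(Mutex::new(job_rx));
    let f = Arc::new(f);
    // At least one worker, otherwise in-flight items would never finish.
    let workers: Vec<_> = (0..n_workers.max(1))
        .map(|_| {
            let (rx, tx, f) = (Arc::clone(&job_rx), done_tx.clone(), Arc::clone(&f));
            thread::spawn(move || loop {
                let job = rx.lock().unwrap().recv(); // guard dropped here
                match job {
                    Ok(x) => { let _ = tx.send(f(x)); }
                    Err(_) => break, // job channel closed
                }
            })
        })
        .collect();

    let mut source = source.into_iter();
    let (mut source_done, mut in_flight, mut results) = (false, 0usize, Vec::new());
    while !(source_done && in_flight == 0) {
        // Finished items take priority over pulling new ones.
        match done_rx.try_recv() {
            Ok(v) => { in_flight -= 1; results.push(v); continue; }
            Err(TryRecvError::Empty) => {}
            Err(TryRecvError::Disconnected) => unreachable!("done_tx is still held here"),
        }
        if source_done {
            // Nothing left to dispatch: block until an in-flight item finishes.
            results.push(done_rx.recv().expect("workers alive"));
            in_flight -= 1;
        } else if let Some(x) = source.next() {
            in_flight += 1; // incremented on dispatch
            job_tx.send(x).expect("workers alive");
        } else {
            source_done = true;
        }
    }
    drop(job_tx); // lets every worker's recv() fail so the threads exit
    for w in workers {
        w.join().unwrap();
    }
    results
}
```

Results come back in completion order, not source order, since several workers run at once.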

Error handling

PipelineError has four variants:

| Variant                   | Meaning |
|---------------------------|---------|
| EndOfStream               | Source exhausted (normal termination; never sent downstream) |
| TypeMismatch              | A stage received the wrong enum variant |
| StepKindMismatch          | Internal routing error |
| StepError(Box<dyn Error>) | Error from user code (wrapped by the make_*_fallible! macros) |
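A sketch of the error type with Display and Error implementations, plus a hypothetical wrap_fallible helper showing the kind of boxing a make_*_fallible! macro would need to perform. The Send + Sync bounds on the boxed error and the message strings are assumptions, not taken from the crate.

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
pub enum PipelineError {
    EndOfStream,
    TypeMismatch,
    StepKindMismatch,
    // Send + Sync assumed so the error can cross thread boundaries.
    StepError(Box<dyn Error + Send + Sync>),
}

impl fmt::Display for PipelineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PipelineError::EndOfStream => write!(f, "end of stream"),
            PipelineError::TypeMismatch => write!(f, "unexpected enum variant at stage"),
            PipelineError::StepKindMismatch => write!(f, "internal routing error"),
            PipelineError::StepError(e) => write!(f, "step error: {}", e),
        }
    }
}

impl Error for PipelineError {}

/// Hypothetical helper: lifts a user function returning Result<U, E> into one
/// returning Result<U, PipelineError>, boxing E inside StepError.
pub fn wrap_fallible<T, U, E, F>(f: F) -> impl Fn(T) -> Result<U, PipelineError>
where
    F: Fn(T) -> Result<U, E>,
    E: Error + Send + Sync + 'static,
{
    move |x| f(x).map_err(|e| PipelineError::StepError(Box::new(e)))
}
```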

Sink errors flow back to the scheduler via a dedicated Receiver<PipelineError> registered at index 0 of the Select — the pipeline stops immediately on the first sink error.
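The abort-on-first-sink-error flow can be sketched with std channels. run_with_sink_abort is hypothetical and collapses the scheduler and source into one loop: the sink thread reports its first error on a dedicated channel and stops, and the loop checks that channel before every dispatch.

```rust
use std::sync::mpsc::{channel, sync_channel};
use std::thread;

/// Feeds `items` to a sink thread that fails on `fail_on`. Returns the sink's
/// error if one was reported, otherwise the number of items sent to the sink.
pub fn run_with_sink_abort(items: Vec<u64>, fail_on: u64) -> Result<usize, String> {
    let (sink_tx, sink_rx) = sync_channel::<u64>(1); // bounded sink input
    let (err_tx, err_rx) = channel::<String>(); // dedicated error channel
    let sink = thread::spawn(move || {
        for x in sink_rx {
            if x == fail_on {
                let _ = err_tx.send(format!("sink failed on {}", x));
                return; // dropping sink_rx makes further sends fail
            }
        }
    });

    let mut reported = None;
    let mut sent = 0;
    for x in items {
        // Highest priority: check for a sink error before dispatching anything else.
        if let Ok(e) = err_rx.try_recv() {
            reported = Some(e);
            break;
        }
        if sink_tx.send(x).is_err() {
            break; // sink has already stopped
        }
        sent += 1;
    }
    drop(sink_tx); // lets a healthy sink finish its loop
    sink.join().unwrap();
    // The error may have arrived after the last dispatch.
    match reported.or_else(|| err_rx.try_recv().ok()) {
        Some(e) => Err(e),
        None => Ok(sent),
    }
}
```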

Example

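// Crate-root export paths are assumed here.
use obipipeline::{make_pipeline, WorkerPool};

enum PipelineData { Unsigned(u64), Number(f64), Text(String) }

fn to_f64(x: u64) -> f64 { x as f64 }
fn format_num(n: f64) -> String { format!("{}", n) }
fn reverse(s: String) -> String { s.chars().rev().collect() }

// djb2: start at 5381, then h = h * 33 + byte for every byte (wrapping on overflow)
fn hash(s: String) -> u64 {
    s.bytes().fold(5381u64, |h, b| h.wrapping_mul(33).wrapping_add(u64::from(b)))
}

fn print_hash(h: u64) -> Result<(), std::io::Error> {
    println!("{}", h);
    Ok(())
}

fn main() {
    let pipeline = make_pipeline! {
        PipelineData,
        source  1u64..=10 => Unsigned,
        | to_f64:     Unsigned => Number,
        | format_num: Number   => Text,
        | reverse:    Text     => Text,
        | hash:       Text     => Unsigned,
        sink?   print_hash     @ Unsigned,
    };

    // 4 worker threads; every channel bounded to 64 items
    WorkerPool::new(pipeline, 4, 64).run();
}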