obipipeline — parallel pipeline library
+obipipeline is a generic, multi-threaded data pipeline crate. It connects a source, a chain of transforms, and a sink via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.
Core types
+| Type alias | +Rust type | +Role | +
|---|---|---|
SourceFn<D> |
+Box<dyn FnMut() -> Result<D, PipelineError> + Send+Sync> |
+Called repeatedly; FnMut because it holds iterator state |
+
SharedFn<D> |
+Arc<dyn Fn(D) -> Result<D, PipelineError> + Send+Sync> |
+Shared across workers via Arc::clone (no copy of the closure) |
+
SinkFn<D> |
+Box<dyn Fn(D) -> Result<(), PipelineError> + Send+Sync> |
+Final consumer; returns Result so errors propagate back |
+
Pipeline<D> holds one SourceFn, a Vec<SharedFn>, and one SinkFn.
+WorkerPool<D> wraps a Pipeline with n_workers and channel capacity.
WorkerPool
+WorkerPool::new(pipeline: Pipeline<D>, n_workers: usize, capacity: usize) -> Self
+WorkerPool::run(self)
+| Parameter | +Role | +
|---|---|
n_workers |
+Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. | +
capacity |
+Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory and back-pressure: a full channel blocks the sender until a slot frees. | +
run consumes self (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.
Data enum
+All pipeline stages communicate through a single user-defined enum:
+enum MyData {
+ Unsigned(u64),
+ Number(f64),
+ Text(String),
+}
+Each variant carries the concrete type for one stage's output. The macros pattern-match on this enum to route values between stages.
+Macros
+Six low-level macros build individual stages; one high-level macro (make_pipeline!) composes them.
Low-level
+make_source!(Enum, iterator, OutputVariant) // iterator yields T
+make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T, E>
+
+make_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> U
+make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E>
+
+make_sink!(Enum, func, InputVariant) // func: T -> ()
+make_sink_fallible!(Enum, func, InputVariant) // func: T -> Result<(), E>
+Each macro wraps the closure in the correct smart pointer (Box for source/sink, Arc for transforms).
make_pipeline! DSL
+make_pipeline! {
+ DataEnum,
+ source iterator => OutputVariant, // or source? for fallible
+ | func: In => Out, // non-fallible transform
+ |? func: In => Out, // fallible transform
+ sink func @ InputVariant, // or sink? for fallible
+}
+? marks fallibility on source, individual transforms, or sink independently.
+Implemented as a TT muncher: the internal rule @build recurses over transform tokens one at a time, accumulating them into a vec![], then terminates on sink/sink?.
Scheduler architecture
+Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
+ ▲ │
+ [stage_rxs] ────────┘◄──────────────────────────────┘
+ │
+ [sink_err_rx] ← errors from sink (highest priority)
+ │
+ Sink thread
+The scheduler is a single thread running a biased Select over all input channels. Priority order (highest first):
index 0 sink_err_rx abort on sink error
+index 1 stage_rxs[N-1] drain last stage first
+...
+index N stage_rxs[0]
+index N+1 source_rx pull new data last
+This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.
+Workers are generic: each receives (data, SharedFn, result_tx) and calls f(data), sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result.
Termination uses an in_flight counter:
-
+
- incremented when an item is dispatched from source to workers +
- decremented when the item exits the last stage +
- the loop exits only when
source_done && in_flight == 0
+
This guarantees all in-flight items complete before join().
Error handling
+PipelineError has four variants:
| Variant | +Meaning | +
|---|---|
EndOfStream |
+Source exhausted (normal termination, not sent downstream) | +
TypeMismatch |
+Wrong enum variant arrived at a stage | +
StepKindMismatch |
+Internal routing error | +
StepError(Box<dyn Error>) |
+Error from user code (wrapped by make_*_fallible!) |
+
Sink errors flow back to the scheduler via a dedicated Receiver<PipelineError> registered at index 0 of the Select — the pipeline stops immediately on the first sink error.
Example
+enum PipelineData { Unsigned(u64), Number(f64), Text(String) }
+
+fn to_f64(x: u64) -> f64 { x as f64 }
+fn format_num(n: f64) -> String { format!("{}", n) }
+fn reverse(s: String) -> String { s.chars().rev().collect() }
+fn hash(s: String) -> u64 { /* djb2 */ }
+fn print_hash(h: u64) -> Result<(), std::io::Error> { println!("{}", h); Ok(()) }
+
+let pipeline = make_pipeline! {
+ PipelineData,
+ source 1u64..=10 => Unsigned,
+ | to_f64: Unsigned => Number,
+ | format_num: Number => Text,
+ | reverse: Text => Text,
+ | hash: Text => Unsigned,
+ sink? print_hash @ Unsigned,
+};
+
+WorkerPool::new(pipeline, 4, 64).run();
+