# obipipeline: parallel pipeline library

`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **transforms**, and a **sink** through crossbeam channels. The source and the sink each run on their own thread, and transforms run on a shared worker pool coordinated by a biased scheduler.

## Core types

| Type alias | Rust type | Role |
|---|---|---|
| `SourceFn` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send + Sync>` | Called repeatedly; `FnMut` because it holds iterator state |
| `SharedFn` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>` | Shared across workers via `Arc::clone`, so the closure itself is never copied |
| `SinkFn` | `Box<dyn FnMut(D) -> Result<(), PipelineError> + Send + Sync>` | Final consumer; returns `Result` so errors propagate back to the scheduler |

Here `D` stands for the user-defined data enum described below.

`Pipeline` holds one `SourceFn`, a `Vec<SharedFn>` (the transforms, in order), and one `SinkFn`. `WorkerPool` wraps a `Pipeline` together with `n_workers` and the channel `capacity`.

## WorkerPool

```rust
WorkerPool::new(pipeline: Pipeline, n_workers: usize, capacity: usize) -> Self
WorkerPool::run(self)
```

| Parameter | Role |
|---|---|
| `n_workers` | Number of parallel worker threads. Each worker is generic: it executes whichever transform the scheduler assigns to it. |
| `capacity` | Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). It controls memory use and back-pressure: a full channel blocks the sender until a slot frees up. |

`run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained, meaning the source is exhausted and every in-flight item has been processed by the sink, and then joins all threads before returning.

## Data enum

All pipeline stages communicate through a single user-defined enum:

```rust
enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}
```

Each variant wraps the concrete type produced by a stage; several stages may produce the same variant. The macros pattern-match on this enum to route values between stages.

## Macros

Six low-level macros build individual stages; one high-level macro, `make_pipeline!`, composes them into a `Pipeline`.
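
To make the routing concrete, the sketch below hand-writes what a transform stage plausibly boils down to. It is hypothetical and dependency-free: `sketch_transform!`, `sketch_transform_fallible!`, the generic `SharedFn<D>` alias and the trimmed-down `PipelineError` are local stand-ins for illustration, not the crate's own macros or types, whose real expansion may differ. Each stage unwraps its input variant, calls the typed user function, and rewraps the result in the output variant; any other variant yields `TypeMismatch`, and a fallible function's error is boxed into `StepError`.

```rust
use std::error::Error;
use std::sync::Arc;

// Local stand-ins (assumed shapes; the crate's real definitions may differ).
#[derive(Debug)]
enum PipelineError {
    TypeMismatch,
    StepError(Box<dyn Error + Send + Sync>),
}

// Assumed shape of a shared transform over the data enum `D`.
type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;

#[derive(Debug)]
enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}

// Hypothetical analogue of `make_transform!`: unwrap `$in`, call `$func`, rewrap as `$out`.
macro_rules! sketch_transform {
    ($enum:ident, $func:expr, $in:ident, $out:ident) => {{
        let f = $func;
        let stage: SharedFn<$enum> = Arc::new(move |data: $enum| match data {
            $enum::$in(x) => Ok($enum::$out(f(x))),
            _ => Err(PipelineError::TypeMismatch),
        });
        stage
    }};
}

// Hypothetical analogue of `make_transform_fallible!`: the user error is boxed into `StepError`.
macro_rules! sketch_transform_fallible {
    ($enum:ident, $func:expr, $in:ident, $out:ident) => {{
        let f = $func;
        let stage: SharedFn<$enum> = Arc::new(move |data: $enum| match data {
            $enum::$in(x) => f(x)
                .map($enum::$out)
                .map_err(|e| PipelineError::StepError(Box::new(e))),
            _ => Err(PipelineError::TypeMismatch),
        });
        stage
    }};
}
```

For instance, `sketch_transform!(MyData, to_f64, Unsigned, Number)` yields a stage that turns `Unsigned(3)` into `Number(3.0)` and rejects `Text(..)` with `TypeMismatch`.
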

### Low-level

```rust
make_source!(Enum, iterator, OutputVariant)                        // iterator yields T
make_source_fallible!(Enum, iterator, OutputVariant)               // iterator yields Result<T, E>
make_transform!(Enum, func, InputVariant, OutputVariant)           // func: T -> U
make_transform_fallible!(Enum, func, InputVariant, OutputVariant)  // func: T -> Result<U, E>
make_sink!(Enum, func, InputVariant)                               // func: T -> ()
make_sink_fallible!(Enum, func, InputVariant)                      // func: T -> Result<(), E>
```

Each macro wraps the user function in a closure stored behind the appropriate smart pointer: `Box` for the source and sink, `Arc` for transforms.

### make_pipeline! DSL

```
make_pipeline! {
    DataEnum,
    source iterator => OutputVariant,   // or source? for fallible
    | func: In => Out,                  // non-fallible transform
    |? func: In => Out,                 // fallible transform
    sink func @ InputVariant,           // or sink? for fallible
}
```

A `?` marks the source, an individual transform, or the sink as fallible; each is chosen independently.

The macro is implemented as a **TT muncher**: the internal rule `@build` recurses over the transform tokens one at a time, accumulating the built stages into a `vec![]`, and terminates when it reaches `sink` or `sink?`.

## Scheduler architecture

```
Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×n_workers)
                                      ▲                            │
                  [stage_rxs] ────────┘◄───────────────────────────┘
                                      │
                                [sink_err_rx] ← errors from sink (highest priority)
                                      │
                                 Sink thread
```

The scheduler is a single thread running a biased `Select` over all of its input channels. Priority order (highest first), where `N` is the length of `stage_rxs`:

```
index 0      sink_err_rx       abort on sink error
index 1      stage_rxs[N-1]    drain last stage first
...
index N      stage_rxs[0]
index N+1    source_rx         pull new data last
```

Because the select is biased toward later stages, ready downstream results are always handled before a new item is pulled from the source, so the scheduler works with channel back-pressure instead of against it.

**Workers** are generic: each receives a `(data, f, result_tx)` triple, where `f` is a `SharedFn`, calls `f(data)`, and sends the result on `result_tx`. The scheduler alone decides which transform to apply and to which channel the result is routed.
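
A generic worker of the kind described above can be sketched with the standard library alone. This is an illustrative model, not the crate's code: it swaps crossbeam for `std::sync::mpsc`, fixes the data type to `u64`, and the names `Transform`, `Job` and `spawn_workers` are hypothetical. Because a std `Receiver` has a single consumer, the workers share it behind a `Mutex`.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Stand-in for `SharedFn`: one transform, shared by all workers via `Arc`.
type Transform = Arc<dyn Fn(u64) -> u64 + Send + Sync>;

/// One job: the data, the transform to apply, and where to send the result.
type Job = (u64, Transform, Sender<u64>);

/// Spawns `n` generic workers that all pull from the same job queue.
/// A std `Receiver` is single-consumer, so it is shared behind a `Mutex`
/// (crossbeam receivers could simply be cloned instead).
fn spawn_workers(n: usize, jobs: Receiver<Job>) -> Vec<JoinHandle<()>> {
    let jobs = Arc::new(Mutex::new(jobs));
    (0..n)
        .map(|_| {
            let jobs = Arc::clone(&jobs);
            thread::spawn(move || loop {
                // The guard is a temporary dropped at the end of this statement,
                // so `f(data)` below runs without holding the lock.
                let job = jobs.lock().unwrap().recv();
                match job {
                    Ok((data, f, result_tx)) => {
                        // The worker does not know which stage `f` is; it just applies it.
                        let _ = result_tx.send(f(data));
                    }
                    // Every job sender is gone and the queue is empty: shut down.
                    Err(_) => break,
                }
            })
        })
        .collect()
}
```

Since every job carries its own transform and result sender, the same workers can serve any stage, and all routing decisions stay in the scheduler.
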

**Termination** uses an `in_flight` counter:

- incremented when an item is dispatched from the source to the workers
- decremented when the item exits the last stage
- the scheduler loop exits only when `source_done && in_flight == 0`

This guarantees that every in-flight item completes before the threads are joined.

## Error handling

`PipelineError` has four variants:

| Variant | Meaning |
|---|---|
| `EndOfStream` | Source exhausted (normal termination; never sent downstream) |
| `TypeMismatch` | A stage received the wrong enum variant |
| `StepKindMismatch` | Internal routing error |
| `StepError(Box)` | Boxed error from user code (wrapped by the `make_*_fallible!` macros) |

Sink errors flow back to the scheduler through a dedicated `Receiver` (`sink_err_rx`) registered at index 0 of the `Select`, so the pipeline stops on the first sink error.

## Example

```rust
// Import paths assumed; adjust to the crate's actual module layout.
use obipipeline::{make_pipeline, WorkerPool};

enum PipelineData { Unsigned(u64), Number(f64), Text(String) }

fn to_f64(x: u64) -> f64 { x as f64 }
fn format_num(n: f64) -> String { format!("{}", n) }
fn reverse(s: String) -> String { s.chars().rev().collect() }
// djb2: start at 5381, then h = h * 33 + byte (wrapping on overflow).
fn hash(s: String) -> u64 {
    s.bytes().fold(5381, |h: u64, b| h.wrapping_mul(33).wrapping_add(u64::from(b)))
}
fn print_hash(h: u64) -> Result<(), std::io::Error> { println!("{}", h); Ok(()) }

fn main() {
    let pipeline = make_pipeline! {
        PipelineData,
        source 1u64..=10 => Unsigned,
        | to_f64: Unsigned => Number,
        | format_num: Number => Text,
        | reverse: Text => Text,
        | hash: Text => Unsigned,
        sink? print_hash @ Unsigned,
    };
    // 4 worker threads; every channel is bounded to 64 items.
    // Blocks until all 10 hashes have been printed.
    WorkerPool::new(pipeline, 4, 64).run();
}
```
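
The **Termination** rule above can be modelled in a few lines. The following is a hypothetical, dependency-free sketch rather than the crate's scheduler: it uses `std::sync::mpsc`, runs each stage call on a short-lived thread instead of a fixed pool, uses plain `fn(u64) -> u64` stages, and does not bound the number of in-flight items by `capacity`. Only the bookkeeping mirrors the description: `in_flight` rises when an item leaves the source, falls when it exits the last stage, and the loop ends once `source_done && in_flight == 0`. Finished results are checked before the source is polled, echoing the scheduler's priority order. The names `Done` and `run_with_in_flight` are illustrative.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// A finished stage invocation reported back to the scheduler.
struct Done {
    stage: usize,
    value: u64,
}

/// Hypothetical model of the scheduler loop's termination rule.
/// Each stage call runs on a short-lived thread (a stand-in for the worker pool).
fn run_with_in_flight(source: Vec<u64>, stages: Vec<fn(u64) -> u64>) -> Vec<u64> {
    assert!(!stages.is_empty(), "this sketch needs at least one stage");
    let last = stages.len() - 1;
    let (done_tx, done_rx) = mpsc::channel::<Done>();

    // Hand `value` to a worker for `stage`; the result comes back on `done_rx`.
    let dispatch = |stage: usize, value: u64, tx: &Sender<Done>| {
        let f = stages[stage];
        let tx = tx.clone();
        thread::spawn(move || {
            let _ = tx.send(Done { stage, value: f(value) });
        });
    };

    let mut source = source.into_iter();
    let mut source_done = false;
    let mut in_flight = 0usize;
    let mut output = Vec::new();

    loop {
        if source_done && in_flight == 0 {
            break; // every item has left the last stage
        }
        // Finished results take priority over new source items. Once the
        // source is exhausted, block until some in-flight item finishes.
        let done = if source_done {
            done_rx.recv().ok()
        } else {
            done_rx.try_recv().ok()
        };
        match done {
            Some(Done { stage, value }) if stage == last => {
                in_flight -= 1; // item exited the last stage
                output.push(value);
            }
            Some(Done { stage, value }) => dispatch(stage + 1, value, &done_tx),
            None => match source.next() {
                Some(value) => {
                    in_flight += 1; // item dispatched from the source
                    dispatch(0, value, &done_tx);
                }
                None => source_done = true,
            },
        }
    }
    output
}
```

Results are collected in completion order, so a caller that needs input order must sort or tag items itself.
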