# obipipeline — parallel pipeline library

`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **transforms**, and a **sink** through crossbeam channels. The transforms run on a shared worker pool, coordinated by a biased scheduler.

## Core types

| Type alias | Rust type | Role |
|---|---|---|
| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send + Sync>` | Called repeatedly; `FnMut` because it holds iterator state |
| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>` | Shared across workers via `Arc::clone` (no copy of the closure) |
| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send + Sync>` | Final consumer; returns `Result` so errors propagate back |

`Pipeline<D>` holds one `SourceFn<D>`, a `Vec<SharedFn<D>>`, and one `SinkFn<D>`.
`WorkerPool<D>` wraps a `Pipeline<D>` with `n_workers` and channel `capacity`.
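
To make the three aliases concrete, here is a minimal std-only sketch. The aliases are copied from the table; everything else is an assumption made for illustration: the `Pipeline` field names, the two-variant `PipelineError`, and the single-threaded `run_sequential` driver (the real crate runs stages on worker threads).

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the crate's error type (two variants are enough here).
#[derive(Debug)]
pub enum PipelineError {
    EndOfStream,
    TypeMismatch,
}

// Aliases as given in the table above.
pub type SourceFn<D> = Box<dyn FnMut() -> Result<D, PipelineError> + Send + Sync>;
pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;
pub type SinkFn<D> = Box<dyn Fn(D) -> Result<(), PipelineError> + Send + Sync>;

// Assumed layout: these field names are illustrative, not the crate's.
pub struct Pipeline<D> {
    pub source: SourceFn<D>,
    pub transforms: Vec<SharedFn<D>>,
    pub sink: SinkFn<D>,
}

// Single-threaded reference driver: pull an item, apply every transform in
// order, hand the result to the sink, and stop cleanly on EndOfStream.
pub fn run_sequential<D>(mut p: Pipeline<D>) -> Result<(), PipelineError> {
    loop {
        let mut item = match (p.source)() {
            Ok(item) => item,
            Err(PipelineError::EndOfStream) => return Ok(()),
            Err(e) => return Err(e),
        };
        for f in &p.transforms {
            item = f(item)?;
        }
        (p.sink)(item)?;
    }
}

// Feeds 1..=3 through "double" then "add one" and collects what the sink saw.
pub fn demo_sequential() -> Result<Vec<u64>, PipelineError> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink_seen = Arc::clone(&seen);
    let mut numbers = 1u64..=3;
    let double: SharedFn<u64> = Arc::new(|x: u64| Ok(x * 2));
    let add_one: SharedFn<u64> = Arc::new(|x: u64| Ok(x + 1));
    let pipeline: Pipeline<u64> = Pipeline {
        source: Box::new(move || numbers.next().ok_or(PipelineError::EndOfStream)),
        transforms: vec![double, add_one],
        sink: Box::new(move |x: u64| {
            sink_seen.lock().unwrap().push(x);
            Ok(())
        }),
    };
    run_sequential(pipeline)?;
    let result = seen.lock().unwrap().clone();
    Ok(result)
}
```

The source is `FnMut` because it advances the captured range, while the sink is only `Fn`, so it has to use interior mutability (`Mutex`) to record what it receives.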

## WorkerPool

```rust
WorkerPool::new(pipeline: Pipeline<D>, n_workers: usize, capacity: usize) -> Self
WorkerPool::run(self)
```

| Parameter | Role |
|---|---|
| `n_workers` | Number of parallel worker threads. Each worker is generic: it executes whichever transform the scheduler assigns it. |
| `capacity` | Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory use and back-pressure: a full channel blocks the sender until a slot frees. |

`run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained, that is, until the source is exhausted and every in-flight item has been processed by the sink, then joins all threads before returning.
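
The back-pressure behaviour that `capacity` controls can be illustrated without the crate. The sketch below uses `std::sync::mpsc::sync_channel` as a stand-in for a bounded crossbeam channel (an assumption made so the example has no dependencies); the function names are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

// Counts how many items a bounded channel accepts before it reports "full".
pub fn slots_before_full(capacity: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<u64>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted as u64) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}

// A producer that outpaces the channel simply blocks in `send` until the
// consumer frees a slot; no item is dropped and order is preserved.
pub fn produce_and_drain(capacity: usize, n: u64) -> Vec<u64> {
    let (tx, rx) = sync_channel::<u64>(capacity);
    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(i).expect("receiver alive"); // blocks while the channel is full
        }
        // `tx` is dropped here, which ends the consumer's iteration below.
    });
    let received: Vec<u64> = rx.iter().collect();
    producer.join().unwrap();
    received
}
```

With a small `capacity` the producer spends more time blocked and memory stays bounded; a larger value trades memory for fewer stalls.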

## Data enum

All pipeline stages communicate through a single user-defined enum:

```rust
enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}
```

Each variant carries the concrete type for one stage's output. The macros pattern-match on this enum to route values between stages.
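
As a rough picture of that pattern-matching, the following hand-written wrapper does what a transform macro would plausibly generate for `Unsigned => Number`. It is an assumption about the shape of the expansion, not the crate's actual output; `unsigned_to_number` and the one-variant `PipelineError` are hypothetical.

```rust
use std::sync::Arc;

#[derive(Debug, PartialEq)]
pub enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}

// Reduced stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
pub enum PipelineError {
    TypeMismatch,
}

pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;

// Wraps a plain `u64 -> f64` function into an enum-to-enum stage:
// unwrap the expected input variant, call the function, re-wrap the output.
pub fn unsigned_to_number(func: fn(u64) -> f64) -> SharedFn<MyData> {
    Arc::new(move |data: MyData| match data {
        MyData::Unsigned(x) => Ok(MyData::Number(func(x))),
        // Any other variant means the stage was wired to the wrong input.
        _ => Err(PipelineError::TypeMismatch),
    })
}

pub fn to_f64(x: u64) -> f64 {
    x as f64
}
```

Calling `unsigned_to_number(to_f64)` on `MyData::Unsigned(3)` yields `Ok(MyData::Number(3.0))`, while a `MyData::Text` input is rejected with `TypeMismatch`.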

## Macros

Six low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them.

### Low-level

```rust
make_source!(Enum, iterator, OutputVariant)                        // iterator yields T
make_source_fallible!(Enum, iterator, OutputVariant)               // iterator yields Result<T, E>

make_transform!(Enum, func, InputVariant, OutputVariant)           // func: T -> U
make_transform_fallible!(Enum, func, InputVariant, OutputVariant)  // func: T -> Result<U, E>

make_sink!(Enum, func, InputVariant)                               // func: T -> ()
make_sink_fallible!(Enum, func, InputVariant)                      // func: T -> Result<(), E>
```

Each macro wraps the closure in the correct smart pointer (`Box` for source/sink, `Arc` for transforms).

### make_pipeline! DSL

```
make_pipeline! {
    DataEnum,
    source iterator => OutputVariant,   // or source? for fallible
    | func: In => Out,                  // non-fallible transform
    |? func: In => Out,                 // fallible transform
    sink func @ InputVariant,           // or sink? for fallible
}
```

`?` marks fallibility on the source, on individual transforms, or on the sink, each independently.
The macro is implemented as a **TT muncher**: the internal rule `@build` consumes the transform tokens one at a time, accumulating them into a `vec![]`, and terminates on `sink`/`sink?`.
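
For readers unfamiliar with the TT-muncher pattern, here is a small self-contained macro written in the same style. `stage_list!` is hypothetical and much simpler than `make_pipeline!`: instead of building stages it only records each transform's name and whether it was marked fallible, but the recursion, the accumulator, and the terminating `sink` rule follow the structure described above.

```rust
// Illustrative macro, not part of obipipeline: it parses a DSL-like token
// stream and records (stage name, is_fallible) pairs instead of building stages.
macro_rules! stage_list {
    // Munch one fallible transform (`|? name,`) and recurse on the rest.
    (@build [$($acc:expr),*] |? $name:ident, $($rest:tt)*) => {
        stage_list!(@build [$($acc,)* (stringify!($name), true)] $($rest)*)
    };
    // Munch one infallible transform (`| name,`) and recurse on the rest.
    (@build [$($acc:expr),*] | $name:ident, $($rest:tt)*) => {
        stage_list!(@build [$($acc,)* (stringify!($name), false)] $($rest)*)
    };
    // Terminate on `sink name`: emit the accumulated vec and the sink name.
    (@build [$($acc:expr),*] sink $name:ident $(,)?) => {
        (vec![$($acc),*], stringify!($name))
    };
    // Entry point (must come last): start with an empty accumulator.
    ($($rest:tt)*) => {
        stage_list!(@build [] $($rest)*)
    };
}

// Expands the macro on a four-stage description.
pub fn demo_stages() -> (Vec<(&'static str, bool)>, &'static str) {
    stage_list!(| to_f64, |? parse, | reverse, sink print_hash)
}
```

The internal `@build` rules are listed before the catch-all entry rule because `macro_rules!` tries rules top to bottom; with the entry rule first, every recursive call would match it again and never terminate.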

## Scheduler architecture

```
Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
                                      ▲                               │
                  [stage_rxs] ────────┘◄──────────────────────────────┘
                       │
                  [sink_err_rx]  ← errors from sink (highest priority)
                       │
                  Sink thread
```

The scheduler is a single thread running a biased `Select` over all input channels. Priority order (highest first):

```
index 0     sink_err_rx       abort on sink error
index 1     stage_rxs[N-1]    drain last stage first
...
index N     stage_rxs[0]
index N+1   source_rx         pull new data last
```

This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.
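
The effect of this priority order can be shown with a std-only approximation. The sketch below does not use crossbeam's `Select`; it polls `std::sync::mpsc` receivers with `try_recv` in the same fixed order, which is enough to demonstrate the bias. `Event`, `next_event`, and `demo_order` are hypothetical names, and sink errors are plain `String`s here. Unlike the real scheduler, the demo keeps draining after a sink error so that the full ordering is visible: `demo_order()` enqueues in source, stage 0, stage 1, error order and observes them in exactly the reverse order.

```rust
use std::sync::mpsc::{channel, Receiver};

#[derive(Debug, PartialEq)]
pub enum Event {
    SinkError(String),
    Stage(usize, u64), // (stage index, item)
    Source(u64),
}

// Polls the channels in fixed priority order and returns the first ready event:
// sink errors, then stage outputs from last to first, then the source.
pub fn next_event(
    sink_err_rx: &Receiver<String>,
    stage_rxs: &[Receiver<u64>],
    source_rx: &Receiver<u64>,
) -> Option<Event> {
    if let Ok(e) = sink_err_rx.try_recv() {
        return Some(Event::SinkError(e));
    }
    for (i, rx) in stage_rxs.iter().enumerate().rev() {
        if let Ok(item) = rx.try_recv() {
            return Some(Event::Stage(i, item));
        }
    }
    source_rx.try_recv().ok().map(Event::Source)
}

// Enqueues one message per channel in "reverse priority" order and records
// the order in which the polling loop observes them.
pub fn demo_order() -> Vec<Event> {
    let (err_tx, err_rx) = channel::<String>();
    let (s0_tx, s0_rx) = channel::<u64>();
    let (s1_tx, s1_rx) = channel::<u64>();
    let (src_tx, src_rx) = channel::<u64>();

    src_tx.send(1).unwrap();
    s0_tx.send(2).unwrap();
    s1_tx.send(3).unwrap();
    err_tx.send("disk full".to_string()).unwrap();

    let stages = [s0_rx, s1_rx];
    let mut seen = Vec::new();
    while let Some(event) = next_event(&err_rx, &stages, &src_rx) {
        seen.push(event);
    }
    seen
}
```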

**Workers** are generic: each receives `(data, SharedFn, result_tx)` and calls `f(data)`, sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result.
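
A generic worker of this kind can be sketched with the standard library alone. Two assumptions are made to keep it dependency-free: the job channel is a `std::sync::mpsc` receiver shared behind `Arc<Mutex<..>>` (crossbeam receivers can be cloned directly), and the error type is reduced to `String`. `Job`, `spawn_workers`, and `demo_workers` are hypothetical names. Because the workers run in parallel, completion order is not deterministic, so the demo sorts the results before returning them.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Error type simplified to String for this sketch.
pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, String> + Send + Sync>;

// One unit of work: the item, the transform to apply, and where to send the result.
pub type Job<D> = (D, SharedFn<D>, Sender<Result<D, String>>);

// Spawns `n` identical workers. None of them knows which stage it serves:
// each job carries its own transform and its own result channel.
pub fn spawn_workers<D: Send + 'static>(n: usize, jobs: Receiver<Job<D>>) -> Vec<JoinHandle<()>> {
    let jobs = Arc::new(Mutex::new(jobs));
    (0..n)
        .map(|_| {
            let jobs = Arc::clone(&jobs);
            thread::spawn(move || loop {
                // The lock is released at the end of this statement,
                // so it is not held while the transform runs.
                let job = jobs.lock().unwrap().recv();
                match job {
                    Ok((data, f, result_tx)) => {
                        let _ = result_tx.send(f(data));
                    }
                    Err(_) => break, // job channel closed: shut down
                }
            })
        })
        .collect()
}

// Doubles 1..=5 on three workers and returns the sorted results.
pub fn demo_workers() -> Vec<u64> {
    let (job_tx, job_rx) = channel::<Job<u64>>();
    let workers = spawn_workers(3, job_rx);
    let (result_tx, result_rx) = channel();
    let double: SharedFn<u64> = Arc::new(|x: u64| Ok(x * 2));
    for x in 1..=5u64 {
        job_tx.send((x, Arc::clone(&double), result_tx.clone())).unwrap();
    }
    drop(job_tx); // lets the workers exit once the queue is empty
    drop(result_tx); // only the per-job clones remain
    for w in workers {
        w.join().unwrap();
    }
    let mut out: Vec<u64> = result_rx.iter().map(|r| r.unwrap()).collect();
    out.sort();
    out
}
```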

**Termination** uses an `in_flight` counter:

- incremented when an item is dispatched from source to workers
- decremented when the item exits the last stage
- the loop exits only when `source_done && in_flight == 0`

This guarantees that every in-flight item completes before `run` joins the threads.
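
The three rules above amount to a very small piece of state. The sketch below models only that bookkeeping; the `Termination` struct and its method names are hypothetical, chosen to mirror the bullet points.

```rust
// Models the scheduler's exit condition; not the crate's actual type.
#[derive(Debug, Default)]
pub struct Termination {
    in_flight: usize,
    source_done: bool,
}

impl Termination {
    // An item was pulled from the source and dispatched to a worker.
    pub fn item_dispatched(&mut self) {
        self.in_flight += 1;
    }
    // An item came out of the last transform stage.
    pub fn item_left_last_stage(&mut self) {
        self.in_flight -= 1;
    }
    // The source reported end of stream.
    pub fn source_exhausted(&mut self) {
        self.source_done = true;
    }
    // The scheduler loop may exit only when both conditions hold.
    pub fn should_exit(&self) -> bool {
        self.source_done && self.in_flight == 0
    }
}

// Walks through a two-item run and records the exit condition at each step.
pub fn demo_termination() -> Vec<bool> {
    let mut t = Termination::default();
    let mut checks = Vec::new();
    t.item_dispatched();
    t.item_dispatched();
    checks.push(t.should_exit()); // source still open, 2 items in flight
    t.source_exhausted();
    checks.push(t.should_exit()); // source done, but 2 items in flight
    t.item_left_last_stage();
    checks.push(t.should_exit()); // 1 item in flight
    t.item_left_last_stage();
    checks.push(t.should_exit()); // source done and nothing in flight
    checks
}
```

Checking `source_done` alone would drop items that are still being transformed; checking `in_flight == 0` alone would exit during any momentary lull before the source is exhausted.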

## Error handling

`PipelineError` has four variants:

| Variant | Meaning |
|---|---|
| `EndOfStream` | Source exhausted (normal termination, not sent downstream) |
| `TypeMismatch` | Wrong enum variant arrived at a stage |
| `StepKindMismatch` | Internal routing error |
| `StepError(Box<dyn Error>)` | Error from user code (wrapped by `make_*_fallible!`) |

Sink errors flow back to the scheduler through a dedicated `Receiver<PipelineError>` registered at index 0 of the `Select`, so the pipeline stops immediately on the first sink error.
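
A possible shape for this error type, and for the wrapping that the fallible macros perform, is sketched below. The variant names come from the table; the `Send + Sync` bounds on the boxed error, the `Display` messages, and the `wrap_step` / `parse_u64` helpers are assumptions added for illustration.

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
pub enum PipelineError {
    EndOfStream,
    TypeMismatch,
    StepKindMismatch,
    // Assumed bounds: `Send + Sync` so the error can cross thread boundaries.
    StepError(Box<dyn Error + Send + Sync>),
}

impl fmt::Display for PipelineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PipelineError::EndOfStream => write!(f, "end of stream"),
            PipelineError::TypeMismatch => write!(f, "unexpected enum variant"),
            PipelineError::StepKindMismatch => write!(f, "internal routing error"),
            PipelineError::StepError(e) => write!(f, "step failed: {}", e),
        }
    }
}

impl Error for PipelineError {}

// Converts any user error into `StepError`, the way a fallible stage
// wrapper would before handing the result back to the scheduler.
pub fn wrap_step<T, E>(result: Result<T, E>) -> Result<T, PipelineError>
where
    E: Error + Send + Sync + 'static,
{
    result.map_err(|e| PipelineError::StepError(Box::new(e)))
}

// Example user step: parsing can fail with `ParseIntError`.
pub fn parse_u64(s: &str) -> Result<u64, PipelineError> {
    wrap_step(s.trim().parse::<u64>())
}
```

Boxing the user error keeps `PipelineError` non-generic, so a single error channel can carry failures from any stage.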

## Example

```rust
// `make_pipeline!` and `WorkerPool` come from `obipipeline`; imports are omitted here.
enum PipelineData { Unsigned(u64), Number(f64), Text(String) }

fn to_f64(x: u64) -> f64 { x as f64 }
fn format_num(n: f64) -> String { format!("{}", n) }
fn reverse(s: String) -> String { s.chars().rev().collect() }
// djb2 string hash: h = h * 33 + byte, starting from 5381, wrapping on overflow
fn hash(s: String) -> u64 {
    s.bytes().fold(5381u64, |h, b| h.wrapping_mul(33).wrapping_add(u64::from(b)))
}
fn print_hash(h: u64) -> Result<(), std::io::Error> { println!("{}", h); Ok(()) }

fn main() {
    let pipeline = make_pipeline! {
        PipelineData,
        source 1u64..=10 => Unsigned,
        | to_f64: Unsigned => Number,
        | format_num: Number => Text,
        | reverse: Text => Text,
        | hash: Text => Unsigned,
        sink? print_hash @ Unsigned,
    };

    // 4 worker threads, channel capacity 64
    WorkerPool::new(pipeline, 4, 64).run();
}
```