Eric Coissac 22951fb0e8 🔖 Add obipipeline parallel pipeline library
- Introduce `obipipeline` crate with multi-threaded data pipeline architecture
- Implement core types: SourceFn, SharedFn (Arc), SinkFn with biased scheduler and crossbeam channels
- Add macros: `make_source!`, `make_transform!`, `make_sink!` (plus `_fallible` variants), and high-level DSL macro
- Replace old wrapper/error modules with unified scheduler module (renamed types, improved error variants)

- Update workspace: add `obipipeline` member to Cargo.toml and lockfile  
- Document pipeline in docmd/implementation with full architecture, error handling & example
- Refactor sandbox_pipeline.rs to use new DSL instead of manual channel wiring
2026-04-24 17:10:07 +02:00

# obipipeline — parallel pipeline library
`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **transforms**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.
## Core types
| Type alias | Rust type | Role |
|---|---|---|
| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send + Sync>` | Called repeatedly; `FnMut` because it holds iterator state |
| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>` | Shared across workers via `Arc::clone` (no copy of the closure) |
| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send + Sync>` | Final consumer; returns `Result` so errors propagate back |

`Pipeline<D>` holds one `SourceFn`, a `Vec<SharedFn>`, and one `SinkFn`.
`WorkerPool<D>` wraps a `Pipeline` with `n_workers` and channel `capacity`.
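
To make the roles of the three aliases concrete, here is a minimal sketch that declares them as listed in the table and drives them from a single thread. The `Pipeline` field names, the cut-down `PipelineError`, and the `run_sequentially` and `demo_collect` helpers are assumptions made for illustration; the crate's real scheduler is multi-threaded and is not reproduced here.

```rust
use std::sync::{Arc, Mutex};

// Cut-down stand-in for the crate's error type (illustration only).
#[derive(Debug, PartialEq)]
enum PipelineError {
    EndOfStream,
    TypeMismatch,
}

// Aliases as given in the table above.
type SourceFn<D> = Box<dyn FnMut() -> Result<D, PipelineError> + Send + Sync>;
type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;
type SinkFn<D> = Box<dyn Fn(D) -> Result<(), PipelineError> + Send + Sync>;

// Field names are assumed; the document only says what the struct holds.
struct Pipeline<D> {
    source: SourceFn<D>,
    transforms: Vec<SharedFn<D>>,
    sink: SinkFn<D>,
}

// Hypothetical single-threaded driver: pull, transform, sink, repeat.
fn run_sequentially<D>(mut p: Pipeline<D>) -> Result<usize, PipelineError> {
    let mut count = 0;
    loop {
        let mut item = match (p.source)() {
            Ok(item) => item,
            // EndOfStream is the normal way for a source to finish.
            Err(PipelineError::EndOfStream) => return Ok(count),
            Err(e) => return Err(e),
        };
        for f in &p.transforms {
            item = f(item)?;
        }
        (p.sink)(item)?;
        count += 1;
    }
}

fn demo_collect() -> Vec<u64> {
    // The sink is `Fn`, so it needs interior mutability to record output.
    let collected = Arc::new(Mutex::new(Vec::new()));
    let out = Arc::clone(&collected);
    let mut numbers = 1u64..=3;
    let double: SharedFn<u64> =
        Arc::new(|x: u64| -> Result<u64, PipelineError> { Ok(x * 2) });
    let pipeline = Pipeline {
        source: Box::new(move || numbers.next().ok_or(PipelineError::EndOfStream)),
        transforms: vec![double],
        sink: Box::new(move |x: u64| -> Result<(), PipelineError> {
            out.lock().unwrap().push(x);
            Ok(())
        }),
    };
    run_sequentially(pipeline).unwrap();
    let result = collected.lock().unwrap().clone();
    result
}
```

The source closure is `FnMut` because it advances the captured range, while the transform and sink are `Fn` and can therefore be called through shared references.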
## WorkerPool
```rust
WorkerPool::new(pipeline: Pipeline<D>, n_workers: usize, capacity: usize) -> Self
WorkerPool::run(self)
```
| Parameter | Role |
|---|---|
| `n_workers` | Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. |
| `capacity` | Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |

`run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained, meaning the source is exhausted and every in-flight item has been processed by the sink, then joins all threads before returning.
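
The effect of `capacity` can be illustrated with a bounded channel from the standard library. This sketch uses `std::sync::mpsc::sync_channel` as a stand-in for the crossbeam channels the crate uses, and the helper name `slots_before_full` is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Counts how many items a bounded channel accepts before it reports "full".
fn slots_before_full(capacity: usize) -> usize {
    // Keep the receiver alive so the channel is not disconnected.
    let (tx, _rx) = sync_channel::<u64>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted as u64) {
            Ok(()) => accepted += 1,
            // A blocking `send` would park the sender at this point
            // until the receiver frees a slot: that is the back-pressure.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}
```

With nothing consuming, a channel created with capacity 4 accepts exactly four items, so memory use per channel is bounded by `capacity` items.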
## Data enum
All pipeline stages communicate through a single user-defined enum:
```rust
enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}
```
Each variant carries the concrete type for one stage's output. The macros pattern-match on this enum to route values between stages.
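
Written by hand, the routing that the macros perform might look like the sketch below: unwrap the expected input variant, apply the function, and re-wrap in the output variant. The `StageError` type and the `unsigned_to_number` function are illustrative assumptions, not the crate's generated code.

```rust
#[derive(Debug, PartialEq)]
enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}

// Illustrative stand-in for the crate's `PipelineError::TypeMismatch`.
#[derive(Debug, PartialEq)]
enum StageError {
    TypeMismatch,
}

// One stage: accepts only `Unsigned`, produces `Number`.
fn unsigned_to_number(data: MyData) -> Result<MyData, StageError> {
    match data {
        MyData::Unsigned(x) => Ok(MyData::Number(x as f64)),
        // Any other variant means the item was routed to the wrong stage.
        _ => Err(StageError::TypeMismatch),
    }
}
```

Because every stage has the same `MyData -> Result<MyData, _>` shape, stages with different concrete input and output types can share one channel type.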
## Macros
Six low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them.
### Low-level
```rust
make_source!(Enum, iterator, OutputVariant) // iterator yields T
make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T, E>
make_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> U
make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E>
make_sink!(Enum, func, InputVariant) // func: T -> ()
make_sink_fallible!(Enum, func, InputVariant) // func: T -> Result<(), E>
```
Each macro wraps the closure in the correct smart pointer (`Box` for source/sink, `Arc` for transforms).
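
As a rough idea of what such a macro could expand to, the following sketch defines a hypothetical `sketch_transform!` that wraps a plain function in an `Arc` closure with variant matching. It is a guess at the shape of `make_transform!`, not its actual definition; `PipelineError` is reduced to the one variant needed.

```rust
use std::sync::Arc;

#[derive(Debug, PartialEq)]
enum PipelineError {
    TypeMismatch,
}

type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;

#[derive(Debug, PartialEq)]
enum MyData {
    Unsigned(u64),
    Number(f64),
}

// Hypothetical stand-in for `make_transform!`; the real expansion may differ.
macro_rules! sketch_transform {
    ($enum:ident, $func:expr, $input:ident, $output:ident) => {{
        let stage: SharedFn<$enum> =
            Arc::new(move |data: $enum| -> Result<$enum, PipelineError> {
                match data {
                    // Unwrap the input variant, apply the function, re-wrap.
                    $enum::$input(value) => Ok($enum::$output($func(value))),
                    _ => Err(PipelineError::TypeMismatch),
                }
            });
        stage
    }};
}

fn to_f64(x: u64) -> f64 {
    x as f64
}

fn demo_transform(input: MyData) -> Result<MyData, PipelineError> {
    let stage = sketch_transform!(MyData, to_f64, Unsigned, Number);
    stage(input)
}
```

A source or sink variant would presumably differ only in the closure signature and in using `Box` instead of `Arc`.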
### make_pipeline! DSL
```
make_pipeline! {
    DataEnum,
    source iterator => OutputVariant,   // or source? for fallible
    | func: In => Out,                  // non-fallible transform
    |? func: In => Out,                 // fallible transform
    sink func @ InputVariant,           // or sink? for fallible
}
```
`?` marks fallibility on source, individual transforms, or sink independently.
Implemented as a **TT muncher**: the internal rule `@build` recurses over transform tokens one at a time, accumulating them into a `vec![]`, then terminates on `sink`/`sink?`.
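
The TT-muncher pattern itself can be shown on a much smaller grammar. The `stage_names!` macro below is a hypothetical toy, not `make_pipeline!`: it accepts `| name` items followed by `sink name`, and collects names as strings instead of building stages, but it recurses through an `@build` rule with an accumulator in the same way.

```rust
// Toy TT muncher: `| a | b sink c` becomes (vec!["a", "b"], "c").
macro_rules! stage_names {
    // Terminal rule: `sink` ends the recursion and emits the accumulator.
    (@build [$($acc:expr),*] sink $sink:ident) => {
        (vec![$($acc),*], stringify!($sink))
    };
    // Recursive rule: consume one `| name`, append it, recurse on the rest.
    (@build [$($acc:expr),*] | $func:ident $($rest:tt)*) => {
        stage_names!(@build [$($acc,)* stringify!($func)] $($rest)*)
    };
    // Entry point: start with an empty accumulator.
    ($($tokens:tt)*) => {
        stage_names!(@build [] $($tokens)*)
    };
}

fn demo_stage_names() -> (Vec<&'static str>, &'static str) {
    stage_names!(| to_f64 | format_num | reverse sink print_hash)
}
```

Each recursion step removes one transform from the front of the token stream, which is why the number of transforms in the DSL is not fixed in advance.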
## Scheduler architecture
```
Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
                                      ▲                               │
                  [stage_rxs] ────────┘◄──────────────────────────────┘
                  [sink_err_rx] ← errors from sink (highest priority)
                  Sink thread
```
The scheduler is a single thread running a biased `Select` over all input channels. Priority order (highest first):
```
index 0      sink_err_rx       abort on sink error
index 1      stage_rxs[N-1]    drain last stage first
...
index N      stage_rxs[0]
index N+1    source_rx         pull new data last
```
This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.
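
The priority order can be approximated with standard-library channels by polling receivers from highest to lowest priority. This is only a sketch: it uses non-blocking `try_recv` on `std::sync::mpsc` receivers, whereas the scheduler described here blocks on a biased crossbeam `Select`. The `Picked` enum and the function names are assumptions.

```rust
use std::sync::mpsc::{channel, Receiver};

#[derive(Debug, PartialEq)]
enum Picked {
    SinkError(String),
    Stage(usize, u64),
    Source(u64),
    Idle,
}

// Checks channels in the documented priority order and returns the first hit.
fn poll_biased(
    sink_err_rx: &Receiver<String>,
    stage_rxs: &[Receiver<u64>],
    source_rx: &Receiver<u64>,
) -> Picked {
    // Highest priority: a sink error aborts the pipeline.
    if let Ok(e) = sink_err_rx.try_recv() {
        return Picked::SinkError(e);
    }
    // Then stage outputs, last stage first, so downstream work drains first.
    for (i, rx) in stage_rxs.iter().enumerate().rev() {
        if let Ok(v) = rx.try_recv() {
            return Picked::Stage(i, v);
        }
    }
    // Lowest priority: admit a new item from the source.
    if let Ok(v) = source_rx.try_recv() {
        return Picked::Source(v);
    }
    Picked::Idle
}

fn demo_order() -> Vec<Picked> {
    let (_err_tx, err_rx) = channel::<String>();
    let (src_tx, src_rx) = channel::<u64>();
    let (s0_tx, s0_rx) = channel::<u64>();
    let (s1_tx, s1_rx) = channel::<u64>();
    // One item is ready on every data channel at the same time.
    src_tx.send(1).unwrap();
    s0_tx.send(2).unwrap();
    s1_tx.send(3).unwrap();
    let stages = [s0_rx, s1_rx];
    (0..4).map(|_| poll_biased(&err_rx, &stages, &src_rx)).collect()
}
```

When every channel has an item ready, the last stage is served first and the source last, matching the index table above.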
**Workers** are generic: each receives `(data, SharedFn, result_tx)` and calls `f(data)`, sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result.
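
A generic worker of this kind can be sketched with standard-library channels. The `Job` alias, the `String` error type, and the single-worker setup are simplifications chosen for this illustration; sharing one job queue among several workers needs a multi-consumer channel such as crossbeam's, which `std::sync::mpsc` does not provide.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread;

// Simplified stand-ins: errors are plain strings in this sketch.
type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, String> + Send + Sync>;
type Job<D> = (D, SharedFn<D>, Sender<Result<D, String>>);

// The worker knows nothing about stages; it applies whatever it is handed.
fn worker_loop<D>(jobs: Receiver<Job<D>>) {
    for (data, f, result_tx) in jobs {
        // A failed send only means the receiving side has gone away.
        let _ = result_tx.send(f(data));
    }
}

fn demo_worker() -> Vec<u64> {
    let (job_tx, job_rx) = channel::<Job<u64>>();
    let (result_tx, result_rx) = channel::<Result<u64, String>>();
    let worker = thread::spawn(move || worker_loop(job_rx));

    let double: SharedFn<u64> =
        Arc::new(|x: u64| -> Result<u64, String> { Ok(x * 2) });
    for x in 1..=3 {
        // Arc::clone shares the closure; it is not copied per job.
        job_tx.send((x, Arc::clone(&double), result_tx.clone())).unwrap();
    }
    // Closing the job channel lets the worker loop end.
    drop(job_tx);
    drop(result_tx);
    worker.join().unwrap();
    result_rx.iter().map(|r| r.unwrap()).collect()
}
```

Because the transform and the result channel travel with each job, the same worker can serve any stage of the pipeline.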
**Termination** uses an `in_flight` counter:

- incremented when an item is dispatched from source to workers
- decremented when the item exits the last stage
- the loop exits only when `source_done && in_flight == 0`

This guarantees all in-flight items complete before `join()`.
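
The exit condition can be modelled as a small piece of state. The `Termination` struct and its method names below are hypothetical; only the `source_done && in_flight == 0` rule comes from the description above.

```rust
#[derive(Default)]
struct Termination {
    source_done: bool,
    in_flight: usize,
}

impl Termination {
    // An item left the source and was handed to a worker.
    fn dispatched_from_source(&mut self) {
        self.in_flight += 1;
    }
    // An item came out of the last stage.
    fn left_last_stage(&mut self) {
        self.in_flight -= 1;
    }
    // The source reported end of stream.
    fn source_exhausted(&mut self) {
        self.source_done = true;
    }
    fn finished(&self) -> bool {
        self.source_done && self.in_flight == 0
    }
}

fn demo_termination() -> Vec<bool> {
    let mut t = Termination::default();
    let mut trace = Vec::new();
    t.dispatched_from_source();
    t.dispatched_from_source();
    trace.push(t.finished()); // source still open, two items in flight
    t.source_exhausted();
    trace.push(t.finished()); // source done, but two items in flight
    t.left_last_stage();
    trace.push(t.finished()); // one item still in flight
    t.left_last_stage();
    trace.push(t.finished()); // nothing left: the loop may exit
    trace
}
```

Neither condition is sufficient alone: an empty pipeline with a live source must keep waiting, and an exhausted source with pending items must keep draining.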
## Error handling
`PipelineError` has four variants:
| Variant | Meaning |
|---|---|
| `EndOfStream` | Source exhausted (normal termination, not sent downstream) |
| `TypeMismatch` | Wrong enum variant arrived at a stage |
| `StepKindMismatch` | Internal routing error |
| `StepError(Box<dyn Error>)` | Error from user code (wrapped by `make_*_fallible!`) |

Sink errors flow back to the scheduler via a dedicated `Receiver<PipelineError>` registered at index 0 of the `Select`, so the pipeline stops immediately on the first sink error.
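
A possible shape for this enum, and for the wrapping a fallible macro has to do, is sketched below. The `Send + Sync` bounds on the boxed error, the `Display` messages, and the `wrap_step` and `parse_count` helpers are assumptions added so the sketch compiles on its own; the crate's definitions may differ.

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
enum PipelineError {
    EndOfStream,
    TypeMismatch,
    StepKindMismatch,
    // Assumption: `Send + Sync` so the error can cross thread boundaries.
    StepError(Box<dyn Error + Send + Sync>),
}

impl fmt::Display for PipelineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PipelineError::EndOfStream => write!(f, "end of stream"),
            PipelineError::TypeMismatch => write!(f, "unexpected enum variant"),
            PipelineError::StepKindMismatch => write!(f, "internal routing error"),
            PipelineError::StepError(e) => write!(f, "step failed: {}", e),
        }
    }
}

impl Error for PipelineError {}

// Converts any user error into `StepError`, keeping the original as payload.
fn wrap_step<T, E>(result: Result<T, E>) -> Result<T, PipelineError>
where
    E: Error + Send + Sync + 'static,
{
    result.map_err(|e| PipelineError::StepError(Box::new(e)))
}

// Example user step: parsing can fail with `ParseIntError`.
fn parse_count(text: &str) -> Result<u64, PipelineError> {
    wrap_step(text.parse::<u64>())
}
```

Boxing the user error keeps `PipelineError` independent of the error types of individual steps.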
## Example
```rust
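// Assumes `make_pipeline!` and `WorkerPool` are in scope; their import
// paths are not given in this document.
enum PipelineData { Unsigned(u64), Number(f64), Text(String) }

fn to_f64(x: u64) -> f64 { x as f64 }
fn format_num(n: f64) -> String { format!("{}", n) }
fn reverse(s: String) -> String { s.chars().rev().collect() }
// djb2 string hash: start at 5381, then h = h * 33 + byte (wrapping).
fn hash(s: String) -> u64 {
    s.bytes().fold(5381u64, |h, b| h.wrapping_mul(33).wrapping_add(u64::from(b)))
}
fn print_hash(h: u64) -> Result<(), std::io::Error> { println!("{}", h); Ok(()) }

fn main() {
    let pipeline = make_pipeline! {
        PipelineData,
        source 1u64..=10 => Unsigned,
        | to_f64: Unsigned => Number,
        | format_num: Number => Text,
        | reverse: Text => Text,
        | hash: Text => Unsigned,
        sink? print_hash @ Unsigned,
    };
    // 4 worker threads, every channel bounded to 64 items.
    WorkerPool::new(pipeline, 4, 64).run();
}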
```