obikmer/docmd/implementation/obipipeline.md
Eric Coissac 036d044291 refactor: update core types and add approximate evidence support
Refactor `Kmer`, `SuperKmer`, and chunk reader into optimized, generic representations with compile-time length parameters and bitwise operations. Update the pipeline and scheduler to support batch processing, 1→N flat transformations, and multi-source merging. Introduce an approximate evidence mode using b-bit fingerprints and `.idx` files, alongside existing exact mode. Update CLI documentation, minimizer selection, and query output schema accordingly.
2026-05-26 10:04:25 +02:00

# obipipeline — parallel pipeline library
`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **stages**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.
## Core types
| Type alias | Rust type | Role |
|---|---|---|
| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send>` | Called repeatedly; `FnMut` because it holds iterator state |
| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>` | 1→1 transform shared across workers via `Arc::clone` |
| `SharedFlatFn<D>` | `Arc<dyn Fn(D, &Sender<Result<D, PipelineError>>, &Sender<isize>) + Send + Sync>` | 1→N transform; pushes its outputs into a channel, then sends the in-flight delta |
| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send>` | Final consumer; returns `Result` so errors propagate back |
Stages come in two variants:
```rust
pub enum Stage<D> {
    Transform(SharedFn<D>), // 1→1
    Flat(SharedFlatFn<D>),  // 1→N
}
```
`Pipeline<D>` holds one `SourceFn<D>`, a `Vec<Stage<D>>`, and one `SinkFn<D>`.
`WorkerPool<D>` wraps a `Pipeline` with `n_workers` and channel `capacity`.
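The aliases and the `Stage` enum can be approximated with standard-library types for experimentation. This is a sketch under stated assumptions, not the crate's source. `std::sync::mpsc::Sender` stands in for crossbeam's `Sender`, and `PipelineError` is cut down to two variants. The `Pipeline` field names and the `apply_stage` helper are hypothetical. `apply_stage` pushes one item through one stage on the calling thread. It returns the stage's outputs together with the in-flight delta that a flat stage reports.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;

// Cut-down stand-in for the crate's error type (two variants only).
#[derive(Debug)]
pub enum PipelineError {
    EndOfStream,
    TypeMismatch,
}

// Std-only approximations of the aliases; the crate uses crossbeam's Sender.
pub type SourceFn<D> = Box<dyn FnMut() -> Result<D, PipelineError> + Send>;
pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;
pub type SharedFlatFn<D> =
    Arc<dyn Fn(D, &Sender<Result<D, PipelineError>>, &Sender<isize>) + Send + Sync>;
pub type SinkFn<D> = Box<dyn Fn(D) -> Result<(), PipelineError> + Send>;

pub enum Stage<D> {
    Transform(SharedFn<D>), // 1→1
    Flat(SharedFlatFn<D>),  // 1→N
}

// Hypothetical field names.
pub struct Pipeline<D> {
    pub source: SourceFn<D>,
    pub stages: Vec<Stage<D>>,
    pub sink: SinkFn<D>,
}

/// Runs one item through one stage on the current thread.
/// Returns the outputs and the in-flight delta (always 0 for 1→1 stages).
pub fn apply_stage<D>(stage: &Stage<D>, item: D) -> (Vec<Result<D, PipelineError>>, isize) {
    match stage {
        Stage::Transform(f) => (vec![f(item)], 0),
        Stage::Flat(f) => {
            let (push_tx, push_rx) = channel();
            let (delta_tx, delta_rx) = channel();
            f(item, &push_tx, &delta_tx);
            // Close both channels so the iterators below terminate.
            drop(push_tx);
            drop(delta_tx);
            let outputs: Vec<_> = push_rx.iter().collect();
            let delta: isize = delta_rx.iter().sum();
            (outputs, delta)
        }
    }
}
```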
## WorkerPool
```rust
WorkerPool::new(pipeline: Pipeline<D>, n_workers: usize, capacity: usize) -> Self
WorkerPool::run(self)
```
| Parameter | Role |
|---|---|
| `n_workers` | Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. |
| `capacity` | Bound on every crossbeam channel in the pipeline. Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |
`run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.
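A std-only miniature can illustrate the back-pressure and shutdown behaviour described above. `run_bounded` is a hypothetical function and is not part of `obipipeline`. It omits the scheduler and uses `std::sync::mpsc::sync_channel` in place of crossbeam's bounded channels; `send` on such a channel blocks once `capacity` messages are queued. The workers share one receiver behind a `Mutex`, because a std `Receiver` cannot be cloned. Like `run`, it returns only after every thread has been joined.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical miniature: source thread -> bounded queue -> n_workers -> caller as sink.
pub fn run_bounded<F>(items: Vec<u64>, f: F, n_workers: usize, capacity: usize) -> Vec<u64>
where
    F: Fn(u64) -> u64 + Send + Sync + 'static,
{
    // Bounded channels: `send` blocks while `capacity` items are queued.
    let (in_tx, in_rx) = sync_channel::<u64>(capacity);
    let (out_tx, out_rx) = sync_channel::<u64>(capacity);
    let in_rx = Arc::new(Mutex::new(in_rx));
    let f = Arc::new(f);

    let source = thread::spawn(move || {
        for x in items {
            in_tx.send(x).unwrap(); // blocks under back-pressure
        }
        // `in_tx` is dropped here, which closes the input channel.
    });

    let workers: Vec<_> = (0..n_workers)
        .map(|_| {
            let rx = Arc::clone(&in_rx);
            let tx = out_tx.clone();
            let f = Arc::clone(&f);
            thread::spawn(move || loop {
                // The lock guard is released at the end of this statement.
                let next = rx.lock().unwrap().recv();
                match next {
                    Ok(x) => tx.send(f(x)).unwrap(),
                    Err(_) => break, // source finished and queue drained
                }
            })
        })
        .collect();
    drop(out_tx); // only the workers hold output senders now

    // The calling thread acts as the sink until every worker has exited.
    let results: Vec<u64> = out_rx.iter().collect();
    source.join().unwrap();
    for w in workers {
        w.join().unwrap();
    }
    results
}
```

Outputs arrive in completion order, not input order. A caller that needs the original order must tag or sort the items.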
## Data enum
All pipeline stages communicate through a single user-defined enum:
```rust
enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}
```
Each variant carries the concrete type for one stage's output. The macros pattern-match on this enum to route values between stages.
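The routing that the macros generate can be pictured as a hand-written adapter. This sketch assumes the general shape of the generated code; it is not the macros' actual expansion. `unsigned_to_number` lifts a plain `fn(u64) -> f64` into a `MyData -> MyData` step. It accepts only the `Unsigned` variant, re-wraps the result as `Number`, and reports any other variant as `TypeMismatch`. `PipelineError` here is a one-variant stand-in for the crate's error type.

```rust
#[derive(Debug, PartialEq)]
pub enum MyData {
    Unsigned(u64),
    Number(f64),
    Text(String),
}

// One-variant stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
pub enum PipelineError {
    TypeMismatch,
}

/// Hypothetical adapter: unwraps `Unsigned`, applies `f`, re-wraps as `Number`.
pub fn unsigned_to_number(
    f: impl Fn(u64) -> f64,
) -> impl Fn(MyData) -> Result<MyData, PipelineError> {
    move |data: MyData| match data {
        MyData::Unsigned(x) => Ok(MyData::Number(f(x))),
        _ => Err(PipelineError::TypeMismatch), // wrong variant for this stage
    }
}
```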
## Macros
Eight low-level macros build individual sources, transforms, and sinks; one high-level macro (`make_pipeline!`) composes them into a complete `Pipeline`.
### Low-level
```rust
make_source!(Enum, iterator, OutputVariant) // iterator yields T
make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T, E>
make_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> U
make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E>
make_flat_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> impl IntoIterator<Item=U>
make_flat_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<impl IntoIterator<Item=U>, E>
make_sink!(Enum, func, InputVariant) // func: T -> ()
make_sink_fallible!(Enum, func, InputVariant) // func: T -> Result<(), E>
```
Each macro wraps the closure in the correct smart pointer (`Box` for source/sink, `Arc` for transforms).
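The choice of smart pointer follows from how each piece is used. The helpers below are hypothetical, and for brevity `D` is fixed to `u64` and `PipelineError` has one variant. `source_from_iter` moves an iterator into a boxed `FnMut`, because `next()` mutates the iterator and the source runs on a single thread. `run_on_threads` hands the same `Arc`-wrapped transform to several threads with `Arc::clone`, which copies a pointer rather than the closure.

```rust
use std::sync::Arc;
use std::thread;

#[derive(Debug, PartialEq)]
pub enum PipelineError {
    EndOfStream,
}

pub type SourceFn<D> = Box<dyn FnMut() -> Result<D, PipelineError> + Send>;
pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;

/// Hypothetical source builder: each call yields the next item,
/// then `EndOfStream` once the iterator is exhausted.
pub fn source_from_iter<I>(mut iter: I) -> SourceFn<u64>
where
    I: Iterator<Item = u64> + Send + 'static,
{
    Box::new(move || iter.next().ok_or(PipelineError::EndOfStream))
}

/// Hypothetical helper: one thread per input, all sharing the same closure.
pub fn run_on_threads(f: SharedFn<u64>, inputs: Vec<u64>) -> Vec<u64> {
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|x| {
            let f = Arc::clone(&f); // pointer copy, not a closure copy
            thread::spawn(move || f(x).unwrap())
        })
        .collect();
    // Joining in spawn order keeps results aligned with inputs.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```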
### make_pipeline! DSL
```
make_pipeline! {
    DataEnum,
    source iterator => OutputVariant,  // or source? for fallible
    | func: In => Out,                 // 1→1 non-fallible transform
    |? func: In => Out,                // 1→1 fallible transform
    || func: In => Out,                // 1→N non-fallible flat transform
    ||? func: In => Out,               // 1→N fallible flat transform
    sink func @ InputVariant,          // or sink? for fallible
}
```
`?` marks fallibility on source, individual transforms, or sink independently.
Implemented as a **TT muncher**: the internal rule `@build` recurses over transform tokens one at a time, accumulating them into a `vec![]`, then terminates on `sink`/`sink?`.
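A toy grammar can demonstrate a TT muncher of this kind. `stage_kinds!` below is a hypothetical macro, not `make_pipeline!` itself. Each `@build` step peels one `|`, `|?`, `||`, or `||?` group off the front of the input. It appends a `(kind, name)` pair to the bracketed accumulator and recurses on the rest until only `sink` remains. The Rust lexer treats `||` as a single token, so `||` and `||?` need their own arms; they cannot be matched as two `|` tokens.

```rust
/// Hypothetical TT muncher over a simplified stage grammar.
macro_rules! stage_kinds {
    // Termination: `sink` reached, emit everything accumulated so far.
    (@build [$($acc:expr),*] sink) => {
        vec![$($acc),*]
    };
    // `||?` is lexed as the two tokens `||` and `?`.
    (@build [$($acc:expr),*] ||? $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* ("flat?", stringify!($f))] $($rest)*)
    };
    (@build [$($acc:expr),*] || $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* ("flat", stringify!($f))] $($rest)*)
    };
    (@build [$($acc:expr),*] |? $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* ("map?", stringify!($f))] $($rest)*)
    };
    (@build [$($acc:expr),*] | $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* ("map", stringify!($f))] $($rest)*)
    };
    // Entry point (listed last): start with an empty accumulator.
    ($($tokens:tt)*) => {
        stage_kinds!(@build [] $($tokens)*)
    };
}
```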
### make_pipe! DSL
`make_pipe!` builds a sourceless/sinkless `Pipe<D, In, Out>` — a reusable, composable stage sequence:
```
make_pipe! {
    DataEnum : InType => OutType,
    | func: InVariant => OutVariant,
    |? func: InVariant => OutVariant,
    || func: InVariant => OutVariant,
    ||? func: InVariant => OutVariant,
}
```
Two pipes compose with `.then(other)`. Apply to an iterator with `.apply(iter, n_workers, capacity)` to get a `PipeIter<Out>` — an iterator over the pipeline output, backed by a background `WorkerPool`. The scatter step in `obikmer` uses `make_pipe!` and `.apply()` rather than the full `make_pipeline!` / `WorkerPool` pattern.
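Ordinary closures can model the composition semantics of `.then` and `.apply`. The `Pipe<In, Out>` below is a hypothetical, sequential stand-in. It drops the data-enum parameter `D` and represents each stage as a boxed function returning zero or more outputs. It also evaluates lazily on the calling thread, whereas the crate's `.apply` runs a background `WorkerPool`.

```rust
/// Hypothetical sequential model of a composable stage sequence.
pub struct Pipe<In, Out> {
    f: Box<dyn Fn(In) -> Vec<Out>>,
}

impl<In: 'static, Out: 'static> Pipe<In, Out> {
    /// A 1→1 stage.
    pub fn map(f: impl Fn(In) -> Out + 'static) -> Self {
        Pipe { f: Box::new(move |x: In| -> Vec<Out> { vec![f(x)] }) }
    }

    /// A 1→N stage.
    pub fn flat<I: IntoIterator<Item = Out>>(f: impl Fn(In) -> I + 'static) -> Self {
        Pipe { f: Box::new(move |x: In| -> Vec<Out> { f(x).into_iter().collect() }) }
    }

    /// Composition: every output of `self` is fed to `next`.
    pub fn then<Next: 'static>(self, next: Pipe<Out, Next>) -> Pipe<In, Next> {
        Pipe {
            f: Box::new(move |x: In| -> Vec<Next> {
                (self.f)(x).into_iter().flat_map(|y| (next.f)(y)).collect()
            }),
        }
    }

    /// Lazily maps an input iterator to an iterator over pipeline outputs.
    pub fn apply<It: IntoIterator<Item = In>>(self, iter: It) -> impl Iterator<Item = Out> {
        iter.into_iter().flat_map(move |x| (self.f)(x))
    }
}
```

Here `Pipe::flat(|n: u64| 0..n).then(Pipe::map(|x: u64| x * x))` builds a two-stage pipe. Applying it to `[2, 3]` yields `0, 1, 0, 1, 4`.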
## Scheduler architecture
```
Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
                                      ▲                            │
                  [stage_rxs] ────────┘◄───────────────────────────┘
                  [flat_delta_rx] ──► Scheduler (in_flight adjustment)
                  [sink_err_rx]   ← errors from sink (highest priority)
                                      Sink thread
```
The scheduler is a single thread running a biased `Select` over all input channels. Priority order (highest first):
```
index 0 sink_err_rx abort on sink error
index 1 flat_delta_rx adjust in_flight before dispatching
index 2..=n+1 stage_rxs[n-1..0] drain last stage first
index n+2 source_rx pull new data last
```
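Because the `Select` is biased, it always services the lowest-index channel that is ready. Sink errors therefore preempt everything else, and flat deltas are applied before further dispatching. Items already inside the pipeline are drained (last stage first) before new items are pulled from the source. Together with the bounded channels, this ordering keeps the amount of queued work low.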
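Polling receivers in priority order reproduces the effect of the bias without crossbeam. `poll_biased` is a hypothetical, non-blocking approximation: a real scheduler blocks in `Select` rather than returning `None`. `priority_labels` simply spells out the index layout listed above for `n` stages.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical biased poll: index 0 is checked first, so when several
/// receivers are ready the lowest index always wins.
pub fn poll_biased<T>(rxs: &[Receiver<T>]) -> Option<(usize, T)> {
    for (i, rx) in rxs.iter().enumerate() {
        if let Ok(msg) = rx.try_recv() {
            return Some((i, msg));
        }
    }
    None
}

/// Index layout for `n` stages: 0 = sink errors, 1 = flat deltas,
/// 2..=n+1 = stage receivers from last to first, n+2 = source.
pub fn priority_labels(n: usize) -> Vec<String> {
    let mut labels = vec!["sink_err_rx".to_string(), "flat_delta_rx".to_string()];
    for s in (0..n).rev() {
        labels.push(format!("stage_rxs[{}]", s));
    }
    labels.push("source_rx".to_string());
    labels
}
```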
**Workers** are generic: each receives a `WorkerTask`, either `Transform(data, stage_idx)` or `Flat(data, stage_idx)`. For `Transform`, the worker calls `f(data)` and sends the result to `stage_txs[stage_idx]`. For `Flat`, the worker calls `f(data, &push_tx, &delta_tx)`: the closure pushes its N output items into `push_tx`, then sends `N-1` to `delta_tx`. The input item was already counted once in `in_flight`, so replacing it with N outputs changes the count by N-1 (that is, -1 when the input produces no output). The scheduler applies this delta to `in_flight` without needing to know N in advance.
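A concrete flat closure makes it easy to check the delta protocol. `split_words` is a hypothetical stage in the `SharedFlatFn` shape. It is simplified to push plain `String`s rather than `Result<D, PipelineError>`. `in_flight_after_flat` plays the scheduler's role of adding the reported delta to `in_flight`.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical flat stage: one line in, one item per word out.
pub fn split_words(line: String, push_tx: &Sender<String>, delta_tx: &Sender<isize>) {
    let mut n: isize = 0;
    for w in line.split_whitespace() {
        push_tx.send(w.to_string()).unwrap();
        n += 1;
    }
    // The input was already counted once; it is replaced by `n` outputs.
    delta_tx.send(n - 1).unwrap();
}

/// Scheduler-side bookkeeping for a single flat task.
pub fn in_flight_after_flat(in_flight: isize, line: &str) -> (isize, Vec<String>) {
    let (push_tx, push_rx) = channel();
    let (delta_tx, delta_rx) = channel();
    split_words(line.to_string(), &push_tx, &delta_tx);
    let delta = delta_rx.recv().unwrap();
    // All outputs were sent before the delta, so a non-blocking drain sees them all.
    let outputs: Vec<String> = push_rx.try_iter().collect();
    (in_flight + delta, outputs)
}
```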
**Termination** uses an `in_flight: isize` counter and a `flat_workers_active: usize` counter:
- `in_flight` incremented when an item is dispatched from source to workers
- `in_flight` decremented when the item exits the last stage to the sink
- `flat_workers_active` incremented when a `Flat` task is dispatched, decremented when the delta arrives
- the loop exits only when `source_done && in_flight == 0 && flat_workers_active == 0`
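This guarantees that every in-flight item completes (including all N outputs of a flat stage) before `join()`. The `flat_workers_active` term is needed because a flat worker pushes its outputs before sending its delta. The scheduler may therefore route some outputs to the sink and bring `in_flight` down to 0 while the remaining outputs and the delta are still pending.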
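A small replay of scheduler events shows why the second counter matters. The `Event` enum and `exit_point` function are hypothetical, and the replay assumes a single flat stage directly follows the source. Dispatching a source item therefore both increments `in_flight` and starts a flat task. The replay checks the exit condition after each event, with and without the `flat_workers_active` term.

```rust
/// Events a hypothetical scheduler loop might observe, in arrival order.
#[derive(Clone, Copy)]
pub enum Event {
    Dispatch { flat: bool }, // item sent from the source to a worker
    ReachedSink,             // item left the last stage
    Delta(isize),            // flat worker reported N - 1
    SourceDone,
}

/// Index of the first event after which the loop would exit.
/// With `track_flat = false`, only `in_flight` is checked.
pub fn exit_point(events: &[Event], track_flat: bool) -> Option<usize> {
    let (mut in_flight, mut flat_active, mut source_done) = (0isize, 0usize, false);
    for (i, ev) in events.iter().enumerate() {
        match *ev {
            Event::Dispatch { flat } => {
                in_flight += 1;
                if flat {
                    flat_active += 1;
                }
            }
            Event::ReachedSink => in_flight -= 1,
            Event::Delta(d) => {
                in_flight += d;
                flat_active -= 1;
            }
            Event::SourceDone => source_done = true,
        }
        let flat_ok = !track_flat || flat_active == 0;
        if source_done && in_flight == 0 && flat_ok {
            return Some(i);
        }
    }
    None
}
```

In the sequence `Dispatch{flat}`, `SourceDone`, `ReachedSink`, `Delta(1)`, `ReachedSink`, the flat worker's first output reaches the sink before its delta arrives. Checking `in_flight` alone would exit after event 2 while a second output is still pending; the full condition waits until event 4.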
## Error handling
`PipelineError` has four variants:
| Variant | Meaning |
|---|---|
| `EndOfStream` | Source exhausted (normal termination, not sent downstream) |
| `TypeMismatch` | Wrong enum variant arrived at a stage |
| `StepKindMismatch` | Internal routing error |
| `StepError(Box<dyn Error + Send + Sync>)` | Error from user code (wrapped by `make_*_fallible!`) |
Sink errors flow back to the scheduler via a dedicated `Receiver<PipelineError>` registered at index 0 of the Select — the pipeline stops immediately on the first sink error.
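A minimal version of the error type might look like this, together with the kind of wrapping that `make_*_fallible!` is described as performing. The `Display` messages and the `wrap_fallible` helper are hypothetical. The point is that any user error implementing `Error + Send + Sync + 'static` can be boxed into `StepError`, which gives every stage the same error type.

```rust
use std::error::Error;
use std::fmt;

/// Sketch of the four variants listed in the table; messages are illustrative.
#[derive(Debug)]
pub enum PipelineError {
    EndOfStream,
    TypeMismatch,
    StepKindMismatch,
    StepError(Box<dyn Error + Send + Sync>),
}

impl fmt::Display for PipelineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PipelineError::EndOfStream => write!(f, "end of stream"),
            PipelineError::TypeMismatch => write!(f, "unexpected data variant"),
            PipelineError::StepKindMismatch => write!(f, "internal routing error"),
            PipelineError::StepError(e) => write!(f, "step failed: {}", e),
        }
    }
}

impl Error for PipelineError {}

/// Hypothetical wrapper: boxes the user's error into `StepError`.
pub fn wrap_fallible<T, U, E>(
    f: impl Fn(T) -> Result<U, E>,
) -> impl Fn(T) -> Result<U, PipelineError>
where
    E: Error + Send + Sync + 'static,
{
    move |x: T| f(x).map_err(|e| PipelineError::StepError(Box::new(e)))
}
```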
## Example
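```rust
// Assumed import paths: adjust to the crate's actual exports.
use obipipeline::{make_pipeline, WorkerPool};

enum PipelineData { Unsigned(u64), Number(f64), Text(String) }

fn to_f64(x: u64) -> f64 { x as f64 }
fn format_num(n: f64) -> String { format!("{}", n) }
fn reverse(s: String) -> String { s.chars().rev().collect() }

// djb2 string hash: start at 5381, then h = h * 33 + byte for each byte.
fn hash(s: String) -> u64 {
    s.bytes()
        .fold(5381u64, |h, b| h.wrapping_mul(33).wrapping_add(u64::from(b)))
}

fn print_hash(h: u64) -> Result<(), std::io::Error> {
    println!("{}", h);
    Ok(())
}

fn main() {
    let pipeline = make_pipeline! {
        PipelineData,
        source 1u64..=10 => Unsigned,
        | to_f64: Unsigned => Number,
        | format_num: Number => Text,
        | reverse: Text => Text,
        | hash: Text => Unsigned,
        sink? print_hash @ Unsigned,
    };
    // 4 worker threads; every channel bounded to 64 items.
    WorkerPool::new(pipeline, 4, 64).run();
}
```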