refactor: update core types and add approximate evidence support
Refactor `Kmer`, `SuperKmer`, and chunk reader into optimized, generic representations with compile-time length parameters and bitwise operations. Update the pipeline and scheduler to support batch processing, 1→N flat transformations, and multi-source merging. Introduce an approximate evidence mode using b-bit fingerprints and `.idx` files, alongside existing exact mode. Update CLI documentation, minimizer selection, and query output schema accordingly.
@@ -1,16 +1,26 @@
 # obipipeline — parallel pipeline library

-`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **transforms**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.
+`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **stages**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.

 ## Core types

 | Type alias | Rust type | Role |
 |---|---|---|
-| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send+Sync>` | Called repeatedly; `FnMut` because it holds iterator state |
-| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send+Sync>` | Shared across workers via `Arc::clone` (no copy of the closure) |
-| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send+Sync>` | Final consumer; returns `Result` so errors propagate back |
+| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send>` | Called repeatedly; `FnMut` because it holds iterator state |
+| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>` | 1→1 transform shared across workers via `Arc::clone` |
+| `SharedFlatFn<D>` | `Arc<dyn Fn(D, &Sender<Result<D, _>>, &Sender<isize>) + Send + Sync>` | 1→N transform; pushes items into channel, sends delta |
+| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send>` | Final consumer; returns `Result` so errors propagate back |

-`Pipeline<D>` holds one `SourceFn`, a `Vec<SharedFn>`, and one `SinkFn`.
+Stages come in two variants:
+
+```rust
+pub enum Stage<D> {
+    Transform(SharedFn<D>), // 1→1
+    Flat(SharedFlatFn<D>),  // 1→N
+}
+```
+
+`Pipeline<D>` holds one `SourceFn`, a `Vec<Stage>`, and one `SinkFn`.
 `WorkerPool<D>` wraps a `Pipeline` with `n_workers` and channel `capacity`.
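To make the two stage shapes concrete, here is a minimal single-threaded sketch. It is not the crate's code: it swaps `std::sync::mpsc` in for crossbeam, reduces `PipelineError` to a single variant, and the helpers `run_stage`, `double`, and `expand` are hypothetical names introduced only for illustration. It shows that a 1→1 stage leaves the item count unchanged, while a 1→N stage pushes N items and reports a delta of N-1.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;

// Simplified stand-in for the crate's error type (assumption).
#[derive(Debug, PartialEq)]
pub enum PipelineError {
    StepError(String),
}

// Same shapes as the table above, with std's Sender standing in for crossbeam's.
pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;
pub type SharedFlatFn<D> =
    Arc<dyn Fn(D, &Sender<Result<D, PipelineError>>, &Sender<isize>) + Send + Sync>;

pub enum Stage<D> {
    Transform(SharedFn<D>), // 1→1
    Flat(SharedFlatFn<D>),  // 1→N
}

/// Hypothetical helper: runs one stage on one item and returns
/// the produced items together with the change in item count.
pub fn run_stage<D>(stage: &Stage<D>, item: D) -> Result<(Vec<D>, isize), PipelineError> {
    match stage {
        // One item in, one item out: the count does not change.
        Stage::Transform(f) => Ok((vec![f(item)?], 0)),
        Stage::Flat(f) => {
            let (push_tx, push_rx) = channel();
            let (delta_tx, delta_rx) = channel();
            f(item, &push_tx, &delta_tx);
            drop(push_tx); // close the channel so the drain below terminates
            let outputs = push_rx.iter().collect::<Result<Vec<D>, PipelineError>>()?;
            let delta = delta_rx
                .try_recv()
                .expect("a flat stage must report its delta");
            Ok((outputs, delta))
        }
    }
}

/// 1→1 example stage: doubles its input.
pub fn double() -> Stage<u32> {
    Stage::Transform(Arc::new(|x: u32| -> Result<u32, PipelineError> { Ok(x * 2) }))
}

/// 1→N example stage: expands n into 0..n, then reports N-1.
pub fn expand() -> Stage<u32> {
    Stage::Flat(Arc::new(
        |n: u32, push: &Sender<Result<u32, PipelineError>>, delta: &Sender<isize>| {
            for i in 0..n {
                push.send(Ok(i)).unwrap();
            }
            delta.send(n as isize - 1).unwrap();
        },
    ))
}

fn main() {
    println!("{:?}", run_stage(&double(), 21));
    println!("{:?}", run_stage(&expand(), 3));
}
```

A flat stage that emits nothing reports `-1`, which is why the delta is a signed `isize`.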

 ## WorkerPool
@@ -23,7 +33,7 @@ WorkerPool::run(self)
 | Parameter | Role |
 |---|---|
 | `n_workers` | Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. |
-| `capacity` | Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |
+| `capacity` | Bound on every crossbeam channel in the pipeline. Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |

 `run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.
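The effect of `capacity` can be illustrated with a bounded channel from the standard library. This is only a sketch: `std::sync::mpsc::sync_channel` stands in for the crate's crossbeam channels, and `slots_before_full` is a hypothetical helper. With no consumer draining the channel, exactly `capacity` sends succeed before the channel reports that it is full. A blocking `send` would wait at that point, which is the back-pressure the table describes.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: counts how many items a bounded channel accepts
/// before it is full, while the receiver exists but never reads.
pub fn slots_before_full(capacity: usize) -> usize {
    // `_rx` stays alive until the end of the function, so the channel is
    // never disconnected; it is simply never drained.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(0) {
            Ok(()) => accepted += 1,
            // A blocking `send` would park the producer here until a slot frees.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}

fn main() {
    for capacity in [1, 4, 16] {
        println!("capacity {capacity}: {} items buffered", slots_before_full(capacity));
    }
}
```

A larger bound allows more items to sit in memory between stages, while a smaller bound throttles the producer sooner.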
@@ -43,7 +53,7 @@ Each variant carries the concrete type for one stage's output. The macros patter

 ## Macros

-Six low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them.
+Eight low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them.

 ### Low-level

@@ -54,6 +64,9 @@ make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T
 make_transform!(Enum, func, InputVariant, OutputVariant)            // func: T -> U
 make_transform_fallible!(Enum, func, InputVariant, OutputVariant)   // func: T -> Result<U, E>

+make_flat_transform!(Enum, func, InputVariant, OutputVariant)           // func: T -> impl IntoIterator<Item=U>
+make_flat_transform_fallible!(Enum, func, InputVariant, OutputVariant)  // func: T -> Result<impl IntoIterator<Item=U>, E>
+
 make_sink!(Enum, func, InputVariant)            // func: T -> ()
 make_sink_fallible!(Enum, func, InputVariant)   // func: T -> Result<(), E>
 ```
@@ -66,8 +79,10 @@ Each macro wraps the closure in the correct smart pointer (`Box` for source/sink
 make_pipeline! {
     DataEnum,
     source iterator => OutputVariant,   // or source? for fallible
-    | func: In => Out,                  // non-fallible transform
-    |? func: In => Out,                 // fallible transform
+    | func: In => Out,                  // 1→1 non-fallible transform
+    |? func: In => Out,                 // 1→1 fallible transform
+    || func: In => Out,                 // 1→N non-fallible flat transform
+    ||? func: In => Out,                // 1→N fallible flat transform
     sink func @ InputVariant,           // or sink? for fallible
 }
 ```
@@ -75,12 +90,29 @@ make_pipeline! {
 `?` marks fallibility on source, individual transforms, or sink independently.
 Implemented as a **TT muncher**: the internal rule `@build` recurses over transform tokens one at a time, accumulating them into a `vec![]`, then terminates on `sink`/`sink?`.
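The TT-muncher technique mentioned above can be sketched with a toy macro. This is not the crate's `make_pipeline!`: the macro name `stage_kinds!`, the `stages:` entry keyword, and the string labels are all hypothetical. The sketch keeps only the stage markers (`|`, `|?`, `||`, `||?`) and records which kind each one denotes, but the recursion has the shape described: an internal `@build` rule consumes one stage at a time, appends to an accumulator, and stops at `sink`.

```rust
// Toy TT muncher (illustrative only; names and output format are assumptions).
macro_rules! stage_kinds {
    // Terminal rule: `sink` ends the recursion and emits the accumulated vec.
    (@build [$($acc:expr),*] sink) => {
        vec![$($acc),*]
    };
    // `||` lexes as one token, so the flat rules never collide with the `|` rules.
    (@build [$($acc:expr),*] || ? $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* ("1->N fallible", stringify!($f))] $($rest)*)
    };
    (@build [$($acc:expr),*] || $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* ("1->N", stringify!($f))] $($rest)*)
    };
    (@build [$($acc:expr),*] | ? $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* ("1->1 fallible", stringify!($f))] $($rest)*)
    };
    (@build [$($acc:expr),*] | $f:ident, $($rest:tt)*) => {
        stage_kinds!(@build [$($acc,)* ("1->1", stringify!($f))] $($rest)*)
    };
    // Public entry point: start with an empty accumulator.
    (stages: $($rest:tt)*) => {
        stage_kinds!(@build [] $($rest)*)
    };
}

/// The function names below are only stringified, so they need not exist.
pub fn describe() -> Vec<(&'static str, &'static str)> {
    let kinds = stage_kinds! {
        stages:
        | parse,
        |? validate,
        || split,
        ||? expand,
        sink
    };
    kinds
}

fn main() {
    for (kind, name) in describe() {
        println!("{name}: {kind}");
    }
}
```

Each recursive call sees a shorter token list, so expansion terminates after one step per stage.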

+### make_pipe! DSL
+
+`make_pipe!` builds a sourceless/sinkless `Pipe<D, In, Out>` — a reusable, composable stage sequence:
+
+```
+make_pipe! {
+    DataEnum : InType => OutType,
+    | func: InVariant => OutVariant,
+    |? func: InVariant => OutVariant,
+    || func: InVariant => OutVariant,
+    ||? func: InVariant => OutVariant,
+}
+```
+
+Two pipes compose with `.then(other)`. Apply to an iterator with `.apply(iter, n_workers, capacity)` to get a `PipeIter<Out>` — an iterator over the pipeline output, backed by a background `WorkerPool`. The scatter step in `obikmer` uses `make_pipe!` and `.apply()` rather than the full `make_pipeline!` / `WorkerPool` pattern.
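The composition semantics of `.then()` and `.apply()` can be illustrated with a deliberately simplified stand-in. `MiniPipe` below is hypothetical and is not the crate's `Pipe<D, In, Out>`: it has no data enum and no worker pool, it runs sequentially, and its `apply` takes only the iterator. It shows how chaining two stage sequences gives a pipe from the first input type to the last output type, and how applying it to an iterator gives an iterator of outputs.

```rust
/// Hypothetical sequential stand-in for a sourceless/sinkless pipe.
/// Every stage is modelled as 1→N; a 1→1 stage returns a one-element Vec.
pub struct MiniPipe<In, Out> {
    run: Box<dyn Fn(In) -> Vec<Out>>,
}

impl<In: 'static, Out: 'static> MiniPipe<In, Out> {
    pub fn new(f: impl Fn(In) -> Vec<Out> + 'static) -> Self {
        MiniPipe { run: Box::new(f) }
    }

    /// Feeds every output of `self` into `other`.
    pub fn then<Next: 'static>(self, other: MiniPipe<Out, Next>) -> MiniPipe<In, Next> {
        let first = self.run;
        let second = other.run;
        MiniPipe {
            run: Box::new(move |item: In| {
                first(item)
                    .into_iter()
                    .flat_map(|mid| second(mid))
                    .collect::<Vec<Next>>()
            }),
        }
    }

    /// Lazily maps the pipe over an iterator (no threads in this sketch).
    pub fn apply<'a, I>(&'a self, iter: I) -> impl Iterator<Item = Out> + 'a
    where
        I: Iterator<Item = In> + 'a,
    {
        iter.flat_map(move |item| (self.run)(item))
    }
}

pub fn demo() -> Vec<u32> {
    let expand = MiniPipe::new(|n: u32| (0..n).collect::<Vec<u32>>()); // 1→N
    let double = MiniPipe::new(|n: u32| vec![n * 2]); // 1→1
    let pipe = expand.then(double);
    let out: Vec<u32> = pipe.apply(vec![2u32, 3].into_iter()).collect();
    out
}

fn main() {
    println!("{:?}", demo());
}
```

This sketch preserves input order because it is sequential. The text above does not say whether a `WorkerPool`-backed `PipeIter` preserves order, so that should not be inferred from this example.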
+
 ## Scheduler architecture

 ```
 Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
 ▲ │
 [stage_rxs] ────────┘◄──────────────────────────────┘
+[flat_delta_rx] ──► Scheduler (in_flight adjustment)
 │
 [sink_err_rx] ← errors from sink (highest priority)
 │
@@ -91,23 +123,23 @@ The scheduler is a single thread running a biased `Select` over all input channe

 ```
 index 0        sink_err_rx         abort on sink error
-index 1        stage_rxs[N-1]      drain last stage first
-...
-index N        stage_rxs[0]
-index N+1      source_rx           pull new data last
+index 1        flat_delta_rx       adjust in_flight before dispatching
+index 2..=n+1  stage_rxs[n-1..0]   drain last stage first
+index n+2      source_rx           pull new data last
 ```

 This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.
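The priority order in the new table can be mimicked with non-blocking polling. This is a sketch under stated assumptions: the scheduler is described as using a biased crossbeam `Select`, which blocks until a channel is ready, whereas this stand-in uses `try_recv` on `std::sync::mpsc` receivers, and `Picked`, `poll_once`, and `demo` are hypothetical names. It only demonstrates the ordering: sink errors first, then flat deltas, then stages from last to first, and the source last.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Which channel the hypothetical poller picked.
#[derive(Debug, PartialEq)]
pub enum Picked {
    SinkError(String),
    Delta(isize),
    Stage(usize, u32), // (stage index, item)
    Source(u32),
}

/// Checks the channels in fixed priority order and returns the first ready message.
pub fn poll_once(
    sink_err: &Receiver<String>,
    delta: &Receiver<isize>,
    stages: &[Receiver<u32>],
    source: &Receiver<u32>,
) -> Option<Picked> {
    if let Ok(err) = sink_err.try_recv() {
        return Some(Picked::SinkError(err)); // index 0
    }
    if let Ok(d) = delta.try_recv() {
        return Some(Picked::Delta(d)); // index 1
    }
    // Last stage first, so downstream work drains before upstream work.
    for (idx, rx) in stages.iter().enumerate().rev() {
        if let Ok(item) = rx.try_recv() {
            return Some(Picked::Stage(idx, item));
        }
    }
    source.try_recv().ok().map(Picked::Source) // lowest priority
}

pub fn demo() -> Vec<Picked> {
    let (_err_tx, err_rx) = channel::<String>();
    let (delta_tx, delta_rx) = channel::<isize>();
    let (s0_tx, s0_rx) = channel::<u32>();
    let (s1_tx, s1_rx) = channel::<u32>();
    let (src_tx, src_rx) = channel::<u32>();

    // Make every channel except the error channel ready at once.
    src_tx.send(1).unwrap();
    s0_tx.send(10).unwrap();
    s1_tx.send(20).unwrap();
    delta_tx.send(2).unwrap();

    let stages = [s0_rx, s1_rx];
    let mut order = Vec::new();
    while let Some(picked) = poll_once(&err_rx, &delta_rx, &stages, &src_rx) {
        order.push(picked);
    }
    order
}

fn main() {
    println!("{:?}", demo());
}
```

Although the source message was sent first, it is picked last, which matches the "pull new data last" rule.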

-**Workers** are generic: each receives `(data, SharedFn, result_tx)` and calls `f(data)`, sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result.
+**Workers** are generic: each receives a `WorkerTask` — either `Transform(data, stage_idx)` or `Flat(data, stage_idx)`. For `Transform`, the worker calls `f(data)` and sends the result to `stage_txs[stage_idx]`. For `Flat`, the worker calls `f(data, &push_tx, &delta_tx)`: the closure pushes N items into `push_tx` then sends `N-1` to `delta_tx`. The scheduler uses the delta to adjust `in_flight` without knowing N in advance.

-**Termination** uses an `in_flight` counter:
+**Termination** uses an `in_flight: isize` counter and a `flat_workers_active: usize` counter:

-- incremented when an item is dispatched from source to workers
-- decremented when the item exits the last stage
-- the loop exits only when `source_done && in_flight == 0`
+- `in_flight` incremented when an item is dispatched from source to workers
+- `in_flight` decremented when the item exits the last stage to the sink
+- `flat_workers_active` incremented when a `Flat` task is dispatched, decremented when the delta arrives
+- the loop exits only when `source_done && in_flight == 0 && flat_workers_active == 0`

-This guarantees all in-flight items complete before `join()`.
+This guarantees all in-flight items complete (including all N outputs of a flat stage) before `join()`.
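The termination rule can be replayed as a small state machine. This is a sketch of the bookkeeping only: `Termination`, `Event`, `trace`, and `demo_trace` are hypothetical names, and the event ordering in `demo_trace` is an assumed scenario, not one taken from the scheduler's code. In that scenario a flat stage turns one item into three, and all three outputs reach the sink before the delta is processed. `in_flight` then touches zero and goes negative, and the exit condition stays false until the delta arrives.

```rust
/// Hypothetical model of the scheduler's termination bookkeeping.
#[derive(Debug, Default)]
pub struct Termination {
    source_done: bool,
    in_flight: isize, // signed: it can dip below zero before a delta arrives
    flat_workers_active: usize,
}

#[derive(Debug, Clone, Copy)]
pub enum Event {
    SourceDispatched, // item sent from source to workers
    SourceExhausted,
    FlatDispatched,   // a Flat task was handed to a worker
    FlatDelta(isize), // N-1 reported by that worker
    ReachedSink,      // item left the last stage
}

impl Termination {
    pub fn record(&mut self, event: Event) {
        match event {
            Event::SourceDispatched => self.in_flight += 1,
            Event::SourceExhausted => self.source_done = true,
            Event::FlatDispatched => self.flat_workers_active += 1,
            Event::FlatDelta(delta) => {
                self.in_flight += delta;
                self.flat_workers_active -= 1;
            }
            Event::ReachedSink => self.in_flight -= 1,
        }
    }

    pub fn can_exit(&self) -> bool {
        self.source_done && self.in_flight == 0 && self.flat_workers_active == 0
    }
}

/// Returns the value of `can_exit()` after each event.
pub fn trace(events: &[Event]) -> Vec<bool> {
    let mut state = Termination::default();
    events
        .iter()
        .map(|&event| {
            state.record(event);
            state.can_exit()
        })
        .collect()
}

/// Assumed scenario: one item, expanded 1→3, outputs sunk before the delta is seen.
pub fn demo_trace() -> Vec<bool> {
    trace(&[
        Event::SourceDispatched, // in_flight = 1
        Event::SourceExhausted,
        Event::FlatDispatched,   // flat_workers_active = 1
        Event::ReachedSink,      // in_flight = 0, flat worker still pending
        Event::ReachedSink,      // in_flight = -1
        Event::ReachedSink,      // in_flight = -2
        Event::FlatDelta(2),     // in_flight = 0, flat_workers_active = 0
    ])
}

fn main() {
    println!("{:?}", demo_trace());
}
```

After the fourth event `source_done && in_flight == 0` already holds, so in this scenario a check without `flat_workers_active` would exit with two outputs still unaccounted for.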

 ## Error handling

@@ -118,7 +150,7 @@ This guarantees all in-flight items complete before `join()`.
 | `EndOfStream` | Source exhausted (normal termination, not sent downstream) |
 | `TypeMismatch` | Wrong enum variant arrived at a stage |
 | `StepKindMismatch` | Internal routing error |
-| `StepError(Box<dyn Error>)` | Error from user code (wrapped by `make_*_fallible!`) |
+| `StepError(Box<dyn Error + Send + Sync>)` | Error from user code (wrapped by `make_*_fallible!`) |

 Sink errors flow back to the scheduler via a dedicated `Receiver<PipelineError>` registered at index 0 of the Select — the pipeline stops immediately on the first sink error.