🔖 Add obipipeline parallel pipeline library

- Introduce `obipipline` crate with multi-threaded data pipeline architecture
- Implement core types: SourceFn, SharedFun (Arc), SinkFN with biased scheduler and crossbeam channels
- Add macros: `make_source!`, `transform!/fallible`/sink!, and high-level DSL macro
- Replace old wrapper/error modules with unified scheduler module (renamed types, improved error variants)

- Update workspace: add `obipipeline` member to Cargo.toml and lockfile  
- Document pipeline in docmd/implementation with full architecture, error handling & example
- Refactor sandbox_pipeline.rs to use new DSL instead of manual channel wiring
This commit is contained in:
Eric Coissac
2026-04-24 08:52:46 +02:00
parent 3f8880a7e5
commit 22951fb0e8
11 changed files with 578 additions and 271 deletions
+147
View File
@@ -0,0 +1,147 @@
# obipipeline — parallel pipeline library
`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **transforms**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.
## Core types
| Type alias | Rust type | Role |
|---|---|---|
| `SourceFn<D>` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send+Sync>` | Called repeatedly; `FnMut` because it holds iterator state |
| `SharedFn<D>` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send+Sync>` | Shared across workers via `Arc::clone` (no copy of the closure) |
| `SinkFn<D>` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send+Sync>` | Final consumer; returns `Result` so errors propagate back |
`Pipeline<D>` holds one `SourceFn`, a `Vec<SharedFn>`, and one `SinkFn`.
`WorkerPool<D>` wraps a `Pipeline` with `n_workers` and channel `capacity`.
## WorkerPool
```rust
WorkerPool::new(pipeline: Pipeline<D>, n_workers: usize, capacity: usize) -> Self
WorkerPool::run(self)
```
| Parameter | Role |
|---|---|
| `n_workers` | Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. |
| `capacity` | Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory and back-pressure: a full channel blocks the sender until a slot frees. |
`run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.
## Data enum
All pipeline stages communicate through a single user-defined enum:
```rust
enum MyData {
Unsigned(u64),
Number(f64),
Text(String),
}
```
Each variant carries the concrete type for one stage's output. The macros pattern-match on this enum to route values between stages.
## Macros
Six low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them.
### Low-level
```rust
make_source!(Enum, iterator, OutputVariant) // iterator yields T
make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T, E>
make_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> U
make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E>
make_sink!(Enum, func, InputVariant) // func: T -> ()
make_sink_fallible!(Enum, func, InputVariant) // func: T -> Result<(), E>
```
Each macro wraps the closure in the correct smart pointer (`Box` for source/sink, `Arc` for transforms).
### make_pipeline! DSL
```
make_pipeline! {
DataEnum,
source iterator => OutputVariant, // or source? for fallible
| func: In => Out, // non-fallible transform
|? func: In => Out, // fallible transform
sink func @ InputVariant, // or sink? for fallible
}
```
`?` marks fallibility on source, individual transforms, or sink independently.
Implemented as a **TT muncher**: the internal rule `@build` recurses over transform tokens one at a time, accumulating them into a `vec![]`, then terminates on `sink`/`sink?`.
## Scheduler architecture
```
Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
▲ │
[stage_rxs] ────────┘◄──────────────────────────────┘
[sink_err_rx] ← errors from sink (highest priority)
Sink thread
```
The scheduler is a single thread running a biased `Select` over all input channels. Priority order (highest first):
```
index 0 sink_err_rx abort on sink error
index 1 stage_rxs[N-1] drain last stage first
...
index N stage_rxs[0]
index N+1 source_rx pull new data last
```
This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.
**Workers** are generic: each receives `(data, SharedFn, result_tx)` and calls `f(data)`, sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result.
**Termination** uses an `in_flight` counter:
- incremented when an item is dispatched from source to workers
- decremented when the item exits the last stage
- the loop exits only when `source_done && in_flight == 0`
This guarantees all in-flight items complete before `join()`.
## Error handling
`PipelineError` has four variants:
| Variant | Meaning |
|---|---|
| `EndOfStream` | Source exhausted (normal termination, not sent downstream) |
| `TypeMismatch` | Wrong enum variant arrived at a stage |
| `StepKindMismatch` | Internal routing error |
| `StepError(Box<dyn Error>)` | Error from user code (wrapped by `make_*_fallible!`) |
Sink errors flow back to the scheduler via a dedicated `Receiver<PipelineError>` registered at index 0 of the Select — the pipeline stops immediately on the first sink error.
## Example
```rust
enum PipelineData { Unsigned(u64), Number(f64), Text(String) }
fn to_f64(x: u64) -> f64 { x as f64 }
fn format_num(n: f64) -> String { format!("{}", n) }
fn reverse(s: String) -> String { s.chars().rev().collect() }
fn hash(s: String) -> u64 { /* djb2 */ }
fn print_hash(h: u64) -> Result<(), std::io::Error> { println!("{}", h); Ok(()) }
let pipeline = make_pipeline! {
PipelineData,
source 1u64..=10 => Unsigned,
| to_f64: Unsigned => Number,
| format_num: Number => Text,
| reverse: Text => Text,
| hash: Text => Unsigned,
sink? print_hash @ Unsigned,
};
WorkerPool::new(pipeline, 4, 64).run();
```
+1
View File
@@ -34,6 +34,7 @@ nav:
- Kmer: implementation/kmer.md - Kmer: implementation/kmer.md
- Chunk reader: implementation/chunkreader.md - Chunk reader: implementation/chunkreader.md
- Construction pipeline: implementation/pipeline.md - Construction pipeline: implementation/pipeline.md
- obipipeline library: implementation/obipipeline.md
- On-disk storage: implementation/storage.md - On-disk storage: implementation/storage.md
- MPHF selection: implementation/mphf.md - MPHF selection: implementation/mphf.md
- Architecture: - Architecture:
+7
View File
@@ -692,6 +692,13 @@ dependencies = [
"criterion2", "criterion2",
] ]
[[package]]
name = "obipipeline"
version = "0.1.0"
dependencies = [
"crossbeam-channel",
]
[[package]] [[package]]
name = "obiread" name = "obiread"
version = "0.1.0" version = "0.1.0"
+1 -1
View File
@@ -1,3 +1,3 @@
[workspace] [workspace]
resolver = "3" resolver = "3"
members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope"] members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline"]
+3
View File
@@ -2,3 +2,6 @@
name = "obipipeline" name = "obipipeline"
version = "0.1.0" version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies]
crossbeam-channel = "0.5.15"
+40 -41
View File
@@ -1,19 +1,16 @@
use crossbeam_channel::bounded; use obipipeline::{WorkerPool, make_pipeline};
use obipipeline::{Connection, PipelineData, PipelineError}; use std::io::Error;
// ── Itérateur source ──────────────────────────────────────────────────────────
struct Compteur { struct Compteur {
count: u64, count: u64,
start: u64,
stop: u64, stop: u64,
} }
impl Compteur { impl Compteur {
fn new(start: u64, stop: u64) -> Self { fn new(start: u64, stop: u64) -> Self {
Self { Self { count: start, stop }
count: start,
start,
stop,
}
} }
} }
@@ -30,51 +27,53 @@ impl Iterator for Compteur {
} }
} }
fn step_1(number: f64) -> Result<String, PipelineError> { // ── Fonctions de transformation ───────────────────────────────────────────────
Ok(format!("{}", number))
fn to_f64(x: u64) -> f64 {
x as f64
} }
fn step_2(text: String) -> Result<String, PipelineError> { fn step_1(number: f64) -> String {
Ok(text.chars().rev().collect()) format!("{}", number)
} }
fn step_3(text: String) -> Result<u64, PipelineError> { fn step_2(text: String) -> String {
text.chars().rev().collect()
}
fn step_3(text: String) -> u64 {
let mut hash: u64 = 5381; let mut hash: u64 = 5381;
for byte in text.bytes() { for byte in text.bytes() {
hash = hash.wrapping_mul(33).wrapping_add(byte as u64); hash = hash.wrapping_mul(33).wrapping_add(byte as u64);
} }
Ok(hash) hash
} }
fn step_4(hash: u64) -> Result<(), PipelineError> { fn step_4(hash: u64) -> Result<(), Error> {
println!("{:?}", hash); println!("{}", hash);
Ok(()) Ok(())
} }
fn main() { // ── Type de données du pipeline ───────────────────────────────────────────────
// Create bounded channels for connection between stages
let (tx0, rx0) = bounded(100);
let (tx1, rx1) = bounded(100);
let (tx2, rx2) = bounded(100);
let (tx3, rx3) = bounded(100);
// Create connections using crossbeam channels enum PipelineData {
let conn0 = Connection { Unsigned(u64),
source: tx0, Number(f64),
target: rx0, Text(String),
}; }
let conn1 = Connection {
source: tx1, // ── Main ──────────────────────────────────────────────────────────────────────
target: rx1,
}; fn main() {
let conn2 = Connection { let pipeline = make_pipeline! {
source: tx2, PipelineData,
target: rx2, source Compteur::new(1000, 1010) => Unsigned,
}; | to_f64: Unsigned => Number,
let conn3 = Connection { | step_1: Number => Text,
source: tx3, | step_2: Text => Text,
target: rx3, | step_3: Text => Unsigned,
}; sink? step_4 @ Unsigned,
};
println!("Pipeline with crossbeam channel connections created");
WorkerPool::new(pipeline, 4, 64).run();
} }
-4
View File
@@ -1,4 +0,0 @@
pub enum PipelineError {
TypeMismatch,
StepError(Box<dyn std::error::Error + Send + Sync>),
}
+10 -2
View File
@@ -1,2 +1,10 @@
pub mod error; mod scheduler;
pub mod wrapper;
pub use scheduler::Pipeline;
pub use scheduler::PipelineError;
pub use scheduler::SharedFn;
pub use scheduler::SinkFn;
pub use scheduler::SourceFn;
pub use scheduler::WorkerPool;
// make_sink, make_sink_fallible, make_source, make_source_fallible,
// make_transform, make_transform_fallible are exported at crate root via #[macro_export]
+368 -132
View File
@@ -1,13 +1,16 @@
use crossbeam_channel::{bounded, Receiver, Sender}; use crossbeam_channel::{Receiver, Select, Sender, bounded};
use std::thread;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::sync::Arc;
use std::thread;
/// Error type for pipeline operations. /// Error type for pipeline operations.
#[derive(Debug)] #[derive(Debug)]
pub enum PipelineError { pub enum PipelineError {
/// A stage received a `PipelineData` variant it did not expect. /// A stage received a `PipelineData` variant it did not expect.
TypeMismatch, TypeMismatch,
/// The step kind is not compatible with the data type.
StepKindMismatch(&'static str),
/// The source has no more data to produce. /// The source has no more data to produce.
EndOfStream, EndOfStream,
/// An error occurred inside a stage (e.g., I/O, parsing, custom logic). /// An error occurred inside a stage (e.g., I/O, parsing, custom logic).
@@ -18,6 +21,7 @@ impl fmt::Display for PipelineError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
PipelineError::TypeMismatch => write!(f, "data type mismatch in pipeline stage"), PipelineError::TypeMismatch => write!(f, "data type mismatch in pipeline stage"),
PipelineError::StepKindMismatch(s) => write!(f, "step kind mismatch: {}", s),
PipelineError::EndOfStream => write!(f, "end of input stream"), PipelineError::EndOfStream => write!(f, "end of input stream"),
PipelineError::StepError(e) => write!(f, "stage error: {}", e), PipelineError::StepError(e) => write!(f, "stage error: {}", e),
} }
@@ -64,19 +68,12 @@ impl Error for PipelineError {
/// - A `Source` does not take an input; it yields an iterator of `Result<DATA, ERROR>`. /// - A `Source` does not take an input; it yields an iterator of `Result<DATA, ERROR>`.
/// - A `Transform` takes a `DATA` and returns a `Result<DATA, ERROR>`. /// - A `Transform` takes a `DATA` and returns a `Result<DATA, ERROR>`.
/// - A `Sink` takes a `DATA` and performs a side effect (no return value). /// - A `Sink` takes a `DATA` and performs a side effect (no return value).
pub enum StepKind<DATA, ERROR> { /// Fonction source : appelée répétitivement, retourne le prochain item ou EndOfStream.
/// Source: called repeatedly to produce the next data item. /// `FnMut` car elle maintient un état interne (position dans l'itérateur).
/// Returns `Ok(data)` when a value is available, pub type SourceFn<D> = Box<dyn FnMut() -> Result<D, PipelineError> + Send + Sync>;
/// `Err(e)` on error, and `Err(PipelineError::EndOfStream)` when exhausted.
Source(Box<dyn FnMut() -> Result<DATA, ERROR> + Send + Sync>),
/// Transform: pure computation from input to output (may fail).
Transform(Box<dyn Fn(DATA) -> Result<DATA, ERROR> + Send + Sync>),
/// Sink: final consumer, no result.
Sink(Box<dyn Fn(DATA) + Send + Sync>),
}
/// Fonction sink : consomme un item final, peut échouer (erreur d'I/O, etc.).
pub type SinkFn<D> = Box<dyn Fn(D) -> Result<(), PipelineError> + Send + Sync>;
/// Creates a `StepKind::Source` from an iterator of plain values. /// Creates a `StepKind::Source` from an iterator of plain values.
/// Each value is wrapped with the specified `$enum` variant and returned as `Ok`. /// Each value is wrapped with the specified `$enum` variant and returned as `Ok`.
@@ -87,18 +84,20 @@ pub enum StepKind<DATA, ERROR> {
/// let iter = vec![1.0, 2.0].into_iter(); /// let iter = vec![1.0, 2.0].into_iter();
/// let source = make_source!(PipelineData, iter, Numeric); /// let source = make_source!(PipelineData, iter, Numeric);
/// ``` /// ```
#[macro_export]
macro_rules! make_source { macro_rules! make_source {
($enum:ident, $iterator:expr, $output:ident) => { ($enum:ident, $iterator:expr, $output:ident) => {{
StepKind::Source(Box::new({ let mut iter = $iterator.into_iter();
let mut iter = $iterator.into_iter(); Box::new(
move || -> Result<$enum, PipelineError> { move || -> ::std::result::Result<$enum, $crate::PipelineError> {
match iter.next() { match iter.next() {
Some(x) => Ok($enum::$output(x)), Some(x) => Ok($enum::$output(x)),
None => Err(PipelineError::EndOfStream), None => Err($crate::PipelineError::EndOfStream),
} }
} },
})) )
}; as Box<dyn FnMut() -> ::std::result::Result<$enum, $crate::PipelineError> + Send + Sync>
}};
} }
/// Creates a `StepKind::Source` from an iterator of `Result<T, E>`. /// Creates a `StepKind::Source` from an iterator of `Result<T, E>`.
@@ -111,19 +110,21 @@ macro_rules! make_source {
/// let iter = vec![Ok(1.0), Err("oops")].into_iter(); /// let iter = vec![Ok(1.0), Err("oops")].into_iter();
/// let source = make_source_fallible!(PipelineData, iter, Numeric); /// let source = make_source_fallible!(PipelineData, iter, Numeric);
/// ``` /// ```
#[macro_export]
macro_rules! make_source_fallible { macro_rules! make_source_fallible {
($enum:ident, $iterator:expr, $output:ident) => { ($enum:ident, $iterator:expr, $output:ident) => {{
StepKind::Source(Box::new({ let mut iter = $iterator.into_iter();
let mut iter = $iterator.into_iter(); Box::new(
move || -> Result<$enum, PipelineError> { move || -> ::std::result::Result<$enum, $crate::PipelineError> {
match iter.next() { match iter.next() {
Some(Ok(x)) => Ok($enum::$output(x)), Some(Ok(x)) => Ok($enum::$output(x)),
Some(Err(e)) => Err(PipelineError::StepError(Box::new(e))), Some(Err(e)) => Err($crate::PipelineError::StepError(Box::new(e))),
None => Err(PipelineError::EndOfStream), None => Err($crate::PipelineError::EndOfStream),
} }
} },
})) )
}; as Box<dyn FnMut() -> ::std::result::Result<$enum, $crate::PipelineError> + Send + Sync>
}};
} }
/// Creates a pipeline stage from a pure (non-fallible) function. /// Creates a pipeline stage from a pure (non-fallible) function.
@@ -157,14 +158,20 @@ macro_rules! make_source_fallible {
/// let result = to_string_stage(PipelineData::Int(42)).unwrap(); /// let result = to_string_stage(PipelineData::Int(42)).unwrap();
/// assert!(matches!(result, PipelineData::String(s) if s == "42")); /// assert!(matches!(result, PipelineData::String(s) if s == "42"));
/// ``` /// ```
#[macro_export]
macro_rules! make_transform { macro_rules! make_transform {
($enum:ident, $func:ident, $input:ident, $output:ident) => { ($enum:ident, $func:ident, $input:ident, $output:ident) => {
StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> { ::std::sync::Arc::from(Box::new(
match data { |data: $enum| -> ::std::result::Result<$enum, $crate::PipelineError> {
$enum::$input(x) => Ok($enum::$output($func(x))), match data {
_ => Err(PipelineError::TypeMismatch), $enum::$input(x) => Ok($enum::$output($func(x))),
} _ => Err($crate::PipelineError::TypeMismatch),
})) }
},
)
as Box<
dyn Fn($enum) -> ::std::result::Result<$enum, $crate::PipelineError> + Send + Sync,
>)
}; };
} }
@@ -203,17 +210,24 @@ macro_rules! make_transform {
/// let result = parse_stage(PipelineData::String("42".into())).unwrap(); /// let result = parse_stage(PipelineData::String("42".into())).unwrap();
/// assert!(matches!(result, PipelineData::Int(n) if n == 42)); /// assert!(matches!(result, PipelineData::Int(n) if n == 42));
/// ``` /// ```
#[macro_export]
macro_rules! make_transform_fallible { macro_rules! make_transform_fallible {
($enum:ident, $func:ident, $input:ident, $output:ident) => { ($enum:ident, $func:ident, $input:ident, $output:ident) => {
StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> { ::std::sync::Arc::from(Box::new(
match data { |data: $enum| -> ::std::result::Result<$enum, $crate::PipelineError> {
PipelineData::$input(inner) => { match data {
let result = $func(inner).map_err(|e| PipelineError::StepError(Box::new(e)))?; $enum::$input(inner) => {
Ok($enum::$output(result)) let result = $func(inner)
.map_err(|e| $crate::PipelineError::StepError(Box::new(e)))?;
Ok($enum::$output(result))
}
_ => Err($crate::PipelineError::TypeMismatch),
} }
_ => Err(PipelineError::TypeMismatch), },
} )
} as Box<
dyn Fn($enum) -> ::std::result::Result<$enum, $crate::PipelineError> + Send + Sync,
>)
}; };
} }
@@ -226,17 +240,21 @@ macro_rules! make_transform_fallible {
/// fn print_number(n: f64) { println!("{}", n); } /// fn print_number(n: f64) { println!("{}", n); }
/// let sink = make_sink!(PipelineData, print_number, Numeric); /// let sink = make_sink!(PipelineData, print_number, Numeric);
/// ``` /// ```
#[macro_export]
macro_rules! make_sink { macro_rules! make_sink {
($enum:ident, $func:ident, $input:ident) => { ($enum:ident, $func:ident, $input:ident) => {
StepKind::Sink(Box::new(|data: $enum| -> Result<(), PipelineError> { Box::new(
match data { |data: $enum| -> ::std::result::Result<(), $crate::PipelineError> {
$enum::$input(x) => { match data {
$func(x); $enum::$input(x) => {
Ok(()) $func(x);
Ok(())
}
_ => Err($crate::PipelineError::TypeMismatch),
} }
_ => Err(PipelineError::TypeMismatch), },
} )
})) as Box<dyn Fn($enum) -> ::std::result::Result<(), $crate::PipelineError> + Send + Sync>
}; };
} }
@@ -248,132 +266,350 @@ macro_rules! make_sink {
/// fn save_to_file(hash: u64) -> std::io::Result<()> { Ok(()) } /// fn save_to_file(hash: u64) -> std::io::Result<()> { Ok(()) }
/// let sink = make_sink_fallible!(PipelineData, save_to_file, Hash); /// let sink = make_sink_fallible!(PipelineData, save_to_file, Hash);
/// ``` /// ```
#[macro_export]
macro_rules! make_sink_fallible { macro_rules! make_sink_fallible {
($enum:ident, $func:ident, $input:ident) => { ($enum:ident, $func:ident, $input:ident) => {
StepKind::Sink(Box::new(|data: $enum| -> Result<(), PipelineError> { Box::new(
match data { |data: $enum| -> ::std::result::Result<(), $crate::PipelineError> {
$enum::$input(x) => { match data {
$func(x).map_err(|e| PipelineError::StepError(Box::new(e))) $enum::$input(inner) => {
$func(inner).map_err(|e| $crate::PipelineError::StepError(Box::new(e)))
}
_ => Err($crate::PipelineError::TypeMismatch),
} }
_ => Err(PipelineError::TypeMismatch), },
} )
})) as Box<dyn Fn($enum) -> ::std::result::Result<(), $crate::PipelineError> + Send + Sync>
}; };
} }
struct Step<DATA, ERROR> { /// Construit un `Pipeline` à partir d'une source, d'une liste de transforms et d'un sink.
input: Option<DATA>, ///
output: Option<DATA>, /// Syntaxe :
func: StepKind<DATA, ERROR>, /// ```ignore
/// make_pipeline! {
/// MyData,
/// source my_iter => Variant, // source non-fallible
/// source? my_iter => Variant, // source fallible (Result<T, E>)
/// | func: In => Out, // transform non-fallible (répété 0..N fois)
/// |? func: In => Out, // transform fallible (répété 0..N fois)
/// sink my_func @ Variant, // sink non-fallible
/// sink? my_func @ Variant, // sink fallible
/// }
/// ```
///
/// Implémenté comme un TT muncher : la règle interne `@build` traite les transforms
/// un par un en les accumulant, puis termine sur `sink`/`sink?`.
#[macro_export]
macro_rules! make_pipeline {
// ── Points d'entrée ──────────────────────────────────────────────────
($enum:ident, source $src:expr => $src_out:ident, $($rest:tt)*) => {
$crate::make_pipeline!(@build $enum,
{ $crate::make_source!($enum, $src, $src_out) },
[],
$($rest)*)
};
($enum:ident, source? $src:expr => $src_out:ident, $($rest:tt)*) => {
$crate::make_pipeline!(@build $enum,
{ $crate::make_source_fallible!($enum, $src, $src_out) },
[],
$($rest)*)
};
// ── Accumulation des transforms ──────────────────────────────────────
// transform non-fallible : |
(@build $enum:ident, $source:tt, [$($acc:tt)*],
| $tf:ident : $t_in:ident => $t_out:ident, $($rest:tt)*) => {
$crate::make_pipeline!(@build $enum, $source,
[$($acc)* $crate::make_transform!($enum, $tf, $t_in, $t_out),],
$($rest)*)
};
// transform fallible : |?
(@build $enum:ident, $source:tt, [$($acc:tt)*],
|? $tf:ident : $t_in:ident => $t_out:ident, $($rest:tt)*) => {
$crate::make_pipeline!(@build $enum, $source,
[$($acc)* $crate::make_transform_fallible!($enum, $tf, $t_in, $t_out),],
$($rest)*)
};
// ── Terminaison : sink ───────────────────────────────────────────────
(@build $enum:ident, $source:tt, [$($acc:tt)*],
sink $sink_fn:ident @ $sink_in:ident $(,)?) => {
$crate::Pipeline::new(
$source,
vec![$($acc)*],
$crate::make_sink!($enum, $sink_fn, $sink_in),
)
};
(@build $enum:ident, $source:tt, [$($acc:tt)*],
sink? $sink_fn:ident @ $sink_in:ident $(,)?) => {
$crate::Pipeline::new(
$source,
vec![$($acc)*],
$crate::make_sink_fallible!($enum, $sink_fn, $sink_in),
)
};
} }
fn source_runner<DATA, ERROR>( /// Fonction de transformation partagée entre workers via Arc.
mut source: StepKind<DATA, ERROR>, /// Arc<dyn Fn> permet à plusieurs workers de partager la même closure
/// sans la copier (Arc::clone = simple incrément de compteur).
pub type SharedFn<D> = Arc<dyn Fn(D) -> Result<D, PipelineError> + Send + Sync>;
/// Tâche envoyée à un worker : donnée + fonction à appliquer + canal de résultat.
/// Le worker n'a pas besoin de connaître sa position dans le pipeline ;
/// le scheduler lui dit exactement quoi faire et où envoyer le résultat.
pub type WorkerTask<D> = (D, SharedFn<D>, Sender<Result<D, PipelineError>>);
fn source_runner<DATA>(
mut source: SourceFn<DATA>,
capacity: usize, capacity: usize,
) -> (Receiver<DATA>, std::thread::JoinHandle<()>) ) -> (
Receiver<Result<DATA, PipelineError>>,
thread::JoinHandle<()>,
)
where where
ERROR: std::fmt::Debug, DATA: Send + Sync + 'static,
{ {
let (tx, rx) = bounded(capacity); let (tx, rx) = bounded(capacity);
let handle = std::thread::spawn(move || { let handle = thread::spawn(move || {
if let StepKind::Source(src) = &mut source { loop {
loop { match source() {
match src() { Ok(data) => {
Ok(data) => { if tx.send(Ok(data)).is_err() {
if tx.send(data).is_err() { break; // récepteur disparu
break; // récepteur disparu
}
}
Err(PipelineError::EndOfStream) => break,
Err(e) => {
eprintln!("Source error: {:?}", e);
break;
} }
} }
Err(PipelineError::EndOfStream) => break,
Err(e) => {
eprintln!("Source error: {:?}", e);
let _ = tx.send(Err(e));
break;
}
} }
} else {
panic!("source_runner called with non-Source step");
} }
}); });
(rx, handle) (rx, handle)
} }
fn runner<DATA, ERROR>( /// Lance un thread worker du pool.
task_rx: Receiver<(DATA, Step<DATA, ERROR>, usize)>, ///
result_tx: Vec<Sender<Result<DATA, ERROR>>>, /// Le worker attend des tâches sur `task_rx`. Chaque tâche est un triplet
) -> std::thread::JoinHandle<()> { /// `(data, f, result_tx)` : il applique `f(data)` et envoie le résultat
std::thread::spawn(move || { /// dans `result_tx`. C'est le scheduler qui décide quelle fonction envoyer
while let Ok((data, step, dest)) = task_rx.recv() { /// et quel canal de résultat utiliser — le worker lui-même est générique.
let res = (step.func)(data); fn transform_runner<DATA>(task_rx: Receiver<WorkerTask<DATA>>) -> thread::JoinHandle<()>
if result_tx[dest].send(res).is_err() { where
break; DATA: Send + Sync + 'static,
} {
thread::spawn(move || {
while let Ok((data, f, result_tx)) = task_rx.recv() {
let _ = result_tx.send(f(data));
} }
}) })
} }
fn sink_runner<DATA, ERROR>( /// Lance le thread sink.
sink: StepKind<DATA, ERROR>, ///
rx: Receiver<DATA>, /// Retourne :
) -> std::thread::JoinHandle<()> { /// - `Sender<DATA>` : le scheduler y envoie les données finales
std::thread::spawn(move || { /// - `Receiver<PipelineError>` : le sink y pousse toute erreur rencontrée ;
if let StepKind::Sink(sink_fn) = sink { /// le scheduler surveille ce canal en priorité absolue
for data in rx { /// pour interrompre le pipeline dès qu'une erreur survient.
sink_fn(data); fn sink_runner<DATA>(
sink: SinkFn<DATA>,
capacity: usize,
) -> (
Sender<DATA>,
Receiver<PipelineError>,
thread::JoinHandle<()>,
)
where
DATA: Send + Sync + 'static,
{
let (data_tx, data_rx) = bounded(capacity);
let (err_tx, err_rx) = bounded(capacity);
let handle = thread::spawn(move || {
for data in data_rx {
if let Err(e) = sink(data) {
let _ = err_tx.send(e);
break; // on arrête dès la première erreur
} }
} else {
panic!("sink_runner called with non-Sink step");
} }
}) });
(data_tx, err_rx, handle)
} }
pub struct Pipeline<DATA, ERROR> { pub struct Pipeline<DATA> {
source: StepKind<DATA, ERROR>, source: SourceFn<DATA>,
transforms: Vec<StepKind<DATA, ERROR>>, // toutes Transform transforms: Vec<SharedFn<DATA>>,
sink: StepKind<DATA, ERROR>, sink: SinkFn<DATA>,
} }
impl<DATA> Pipeline<DATA> {
pub fn new(
source: SourceFn<DATA>,
transforms: Vec<SharedFn<DATA>>,
sink: SinkFn<DATA>,
) -> Self {
Self { source, transforms, sink }
}
}
pub struct WorkerPool<DATA> {
struct WorkerPool<DATA, ERROR> { pipeline: Pipeline<DATA>,
pipeline: Pipeline<DATA, ERROR>,
task_tx: Vec<Sender<(DATA, Step<DATA, ERROR>, dest usize)>>,
task_rx: Vec<Receiver<DATA>>,
handles: Vec<std::thread::JoinHandle<()>>, handles: Vec<std::thread::JoinHandle<()>>,
n_workers: usize, n_workers: usize,
capacity: usize, capacity: usize,
} }
impl<DATA, ERROR> WorkerPool<DATA, ERROR> { impl<DATA> WorkerPool<DATA>
pub fn new(pipeline: Pipeline<DATA, ERROR>, n_workers: usize, capacity: usize) -> Self { where
DATA: Send + Sync + 'static,
{
pub fn new(pipeline: Pipeline<DATA>, n_workers: usize, capacity: usize) -> Self {
Self { Self {
pipeline, pipeline,
task_tx: Vec::new(),
task_rx: Vec::new(),
handles: Vec::new(), handles: Vec::new(),
n_workers, n_workers,
capacity, capacity,
} }
let (source_tx, source_rx) = crossbeam::channel::bounded<Result<DATA,PipelineError>>(capacity);
for i in 0..pipeline.transforms.len() {
let (task_tx, task_rx) = crossbeam::channel::bounded<Result<DATA,PipelineError>>(capacity);
self.task_tx.push(task_tx);
self.task_rx.push(task_rx);
}
} }
pub fn run(&mut self) { pub fn run(mut self) {
let (source_rx, source_handle) = source_runner(self.pipeline.source, self.capacity); let n = self.pipeline.transforms.len();
self.handles.push(source_handle);
let (transform_tx, transform_rx) = crossbeam::channel::bounded<(DATA, Step<DATA, ERROR>, dest usize)>(self.capacity); // ── Canaux inter-stages ────────────────────────────────────────────
// stage_txs[i] : le worker qui exécute transform[i] y envoie son résultat
for i in 0..self.n_workers { // stage_rxs[i] : le scheduler lit ici pour dispatcher au transform[i+1] (ou sink)
self.handles.push(runner(transform_tx.clone(), transform_rx.clone())); let mut stage_txs: Vec<Sender<Result<DATA, PipelineError>>> = Vec::new();
let mut stage_rxs: Vec<Receiver<Result<DATA, PipelineError>>> = Vec::new();
for _ in 0..n {
let (tx, rx) = bounded(self.capacity);
stage_txs.push(tx);
stage_rxs.push(rx);
} }
// ── Source thread ──────────────────────────────────────────────────
let (source_rx, src_handle) = source_runner(self.pipeline.source, self.capacity);
self.handles.push(src_handle);
// Les transforms sont déjà des SharedFn<DATA> — pas de conversion nécessaire.
let transforms = self.pipeline.transforms;
// ── Worker pool ────────────────────────────────────────────────────
// Canal partagé par tous les workers : le scheduler y pousse des WorkerTask,
// chaque worker en dépile une à la fois.
let (worker_tx, worker_rx): (Sender<WorkerTask<DATA>>, Receiver<WorkerTask<DATA>>) =
bounded(self.capacity);
for _ in 0..self.n_workers {
self.handles.push(transform_runner(worker_rx.clone()));
}
// ── Sink thread ────────────────────────────────────────────────────
let (sink_tx, sink_err_rx, sink_handle) = sink_runner(self.pipeline.sink, self.capacity);
self.handles.push(sink_handle);
// ── Boucle principale ─────────────────────────────────────────────
//
// Le Select est reconstruit à chaque itération, ce qui permet de
// retirer source_rx une fois la source épuisée.
//
// Priorités (biased = index le plus bas gagne) :
// index 0 → sink_err_rx (arrêt immédiat sur erreur sink)
// index 1..=N → stage_rxs[N-1..0] (vider le pipeline en priorité)
// index N+1 → source_rx (dernier recours : nouvelles données)
//
// Quand k == 0 : erreur du sink
// Quand 1 <= k <= N : stage concerné = N-k
// Quand k == N+1 : item venant de la source
//
// Terminaison : on quitte uniquement quand source_done ET in_flight == 0,
// ce qui garantit que tous les items ont traversé le pipeline jusqu'au sink.
{
let mut source_done = false;
let mut in_flight: usize = 0;
loop {
// Condition de sortie : plus rien en vol et source tarie
if source_done && in_flight == 0 {
break;
}
// Reconstruction du Select (sans source_rx si source épuisée)
let mut sel = Select::new_biased();
sel.recv(&sink_err_rx); // index 0
for rx in stage_rxs.iter().rev() {
sel.recv(rx); // indices 1 .. N
}
let src_idx = if !source_done {
Some(sel.recv(&source_rx)) // index N+1 (seulement si encore active)
} else {
None
};
let oper = sel.select();
let k = oper.index();
if k == 0 {
// ── Erreur du sink : on arrête tout ──────────────────
match oper.recv(&sink_err_rx) {
Ok(e) => { eprintln!("Sink error: {:?}", e); break; }
Err(_) => break,
}
} else if src_idx == Some(k) {
// ── Nouvel item depuis la source ──────────────────────
match oper.recv(&source_rx) {
Ok(Ok(data)) => {
if n == 0 {
let _ = sink_tx.send(data); // source → sink direct
} else {
in_flight += 1;
let _ = worker_tx.send((
data,
transforms[0].clone(),
stage_txs[0].clone(),
));
}
}
Ok(Err(e)) => eprintln!("Source error: {:?}", e),
Err(_) => source_done = true, // source fermée, on continue à drainer
}
} else {
// ── Résultat d'un stage intermédiaire ─────────────────
// k ∈ [1, N] → stage = N-k
let stage = n - k;
match oper.recv(&stage_rxs[stage]) {
Ok(Ok(data)) => {
if stage == n - 1 {
in_flight -= 1;
let _ = sink_tx.send(data); // dernière étape → sink
} else {
let _ = worker_tx.send((
data,
transforms[stage + 1].clone(),
stage_txs[stage + 1].clone(),
));
}
}
Ok(Err(e)) => eprintln!("Stage {} error: {:?}", stage, e),
Err(_) => break, // fermeture inattendue d'un canal
}
}
}
}
// Signaler la fin aux workers et au sink
drop(worker_tx);
drop(sink_tx);
for h in self.handles {
let _ = h.join();
}
} }
} }
-90
View File
@@ -1,90 +0,0 @@
/// Creates a pipeline stage from a pure (non-fallible) function.
///
/// This macro generates a closure that implements the `PipelineStage` trait by pattern
/// matching on the input `PipelineData` variant, applying the provided function, and
/// wrapping the result in the output variant.
///
/// # Arguments
/// * `$func` - The function to apply: `Fn(T) -> U`
/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`)
/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`)
///
/// # Example
///
/// ```ignore
/// // Define PipelineData enum
/// enum PipelineData {
/// Int(i64),
/// String(String),
/// }
///
/// // Create pure stage
/// let to_string_stage = make_stage!(
/// to_string,
/// Int(i64),
/// String(String)
/// );
///
/// // Use in pipeline
/// let result = to_string_stage(PipelineData::Int(42)).unwrap();
/// assert!(matches!(result, PipelineData::String(s) if s == "42"));
/// ```
macro_rules! make_transform {
($enum:ident, $func:ident, $input:ident, $output:ident) => {
StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> {
match data {
$enum::$input(x) => Ok($enum::$output($func(x))),
_ => Err(PipelineError::TypeMismatch),
}
}))
};
}
/// Creates a pipeline stage from a fallible function.
///
/// This macro generates a closure that pattern matches on the input `PipelineData`
/// variant, applies the provided function, and wraps the result in the output variant.
/// If the function returns an error, it is boxed and wrapped in `PipelineError::StepError`.
///
/// # Arguments
/// * `$func` - The fallible function to apply: `Fn(T) -> Result<U, E>`
/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`)
/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`)
///
/// # Example
///
/// ```ignore
/// fn parse_int(s: &str) -> Result<i64, std::num::ParseIntError> {
/// s.parse()
/// }
///
/// // Define PipelineData enum
/// enum PipelineData {
/// String(String),
/// Int(i64),
/// }
///
/// // Create fallible stage
/// let parse_stage = make_stage_fallible!(
/// parse_int,
/// String(String),
/// Int(i64)
/// );
///
/// // Use in pipeline
/// let result = parse_stage(PipelineData::String("42".into())).unwrap();
/// assert!(matches!(result, PipelineData::Int(n) if n == 42));
/// ```
macro_rules! make_transform_fallible {
($enum:ident, $func:ident, $input:ident, $output:ident) => {
StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> {
match data {
PipelineData::$input(inner) => {
let result = $func(inner).map_err(|e| PipelineError::StepError(Box::new(e)))?;
Ok($enum::$output(result))
}
_ => Err(PipelineError::TypeMismatch),
}
}
};
}
+1 -1
View File
@@ -32,7 +32,7 @@ impl<R: std::io::Read> MimeTypeGuesser<R> {
pub fn mime_type(&mut self) -> Option<&'static str> { pub fn mime_type(&mut self) -> Option<&'static str> {
let buf = self.0.header(BUF_SIZE)?; let buf = self.0.header(BUF_SIZE)?;
INFER.get_mime_type_for_bytes(buf).map(|kind| kind.mime_type) INFER.get_mime_type_for_bytes(buf).map(|kind| kind.mime_type())
} }
} }