From 22951fb0e868aedbd12962d34348e5c885b9606b Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 24 Apr 2026 08:52:46 +0200 Subject: [PATCH] :bookmark: Add obipipeline parallel pipeline library - Introduce `obipipeline` crate with multi-threaded data pipeline architecture - Implement core types: SourceFn, SharedFn (Arc), SinkFn with biased scheduler and crossbeam channels - Add macros: `make_source!`, `transform!/fallible`/sink!, and high-level DSL macro - Replace old wrapper/error modules with unified scheduler module (renamed types, improved error variants) - Update workspace: add `obipipeline` member to Cargo.toml and lockfile - Document pipeline in docmd/implementation with full architecture, error handling & example - Refactor sandbox_pipeline.rs to use new DSL instead of manual channel wiring --- docmd/implementation/obipipeline.md | 147 ++++++ mkdocs.yml | 1 + src/Cargo.lock | 7 + src/Cargo.toml | 2 +- src/obipipeline/Cargo.toml | 3 + src/obipipeline/examples/sandbox_pipeline.rs | 81 ++- src/obipipeline/src/error.rs | 4 - src/obipipeline/src/lib.rs | 12 +- src/obipipeline/src/scheduler.rs | 500 ++++++++++++++----- src/obipipeline/src/wrapper.rs | 90 ---- src/obiread/src/mimetype.rs | 2 +- 11 files changed, 578 insertions(+), 271 deletions(-) create mode 100644 docmd/implementation/obipipeline.md delete mode 100644 src/obipipeline/src/error.rs delete mode 100644 src/obipipeline/src/wrapper.rs diff --git a/docmd/implementation/obipipeline.md b/docmd/implementation/obipipeline.md new file mode 100644 index 0000000..3079bf8 --- /dev/null +++ b/docmd/implementation/obipipeline.md @@ -0,0 +1,147 @@ +# obipipeline — parallel pipeline library + +`obipipeline` is a generic, multi-threaded data pipeline crate. It connects a **source**, a chain of **transforms**, and a **sink** via crossbeam channels, running each stage with a shared worker pool and a biased scheduler. 
+ +## Core types + +| Type alias | Rust type | Role | +|---|---|---| +| `SourceFn` | `Box<dyn FnMut() -> Result<D, PipelineError> + Send+Sync>` | Called repeatedly; `FnMut` because it holds iterator state | +| `SharedFn` | `Arc<dyn Fn(D) -> Result<D, PipelineError> + Send+Sync>` | Shared across workers via `Arc::clone` (no copy of the closure) | +| `SinkFn` | `Box<dyn Fn(D) -> Result<(), PipelineError> + Send+Sync>` | Final consumer; returns `Result` so errors propagate back | + +`Pipeline` holds one `SourceFn`, a `Vec<SharedFn>`, and one `SinkFn`. +`WorkerPool` wraps a `Pipeline` with `n_workers` and channel `capacity`. + +## WorkerPool + +```rust +WorkerPool::new(pipeline: Pipeline, n_workers: usize, capacity: usize) -> Self +WorkerPool::run(self) +``` + +| Parameter | Role | +|---|---| +| `n_workers` | Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it. | +| `capacity` | Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory and back-pressure: a full channel blocks the sender until a slot frees. | + +`run` consumes `self` (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning. + +## Data enum + +All pipeline stages communicate through a single user-defined enum: + +```rust +enum MyData { + Unsigned(u64), + Number(f64), + Text(String), +} +``` + +Each variant carries the concrete type for one stage's output. The macros pattern-match on this enum to route values between stages. + +## Macros + +Six low-level macros build individual stages; one high-level macro (`make_pipeline!`) composes them. 
+ +### Low-level + +```rust +make_source!(Enum, iterator, OutputVariant) // iterator yields T +make_source_fallible!(Enum, iterator, OutputVariant) // iterator yields Result<T, E> + +make_transform!(Enum, func, InputVariant, OutputVariant) // func: T -> U +make_transform_fallible!(Enum, func, InputVariant, OutputVariant) // func: T -> Result<U, E> + +make_sink!(Enum, func, InputVariant) // func: T -> () +make_sink_fallible!(Enum, func, InputVariant) // func: T -> Result<(), E> +``` + +Each macro wraps the closure in the correct smart pointer (`Box` for source/sink, `Arc` for transforms). + +### make_pipeline! DSL + +``` +make_pipeline! { + DataEnum, + source iterator => OutputVariant, // or source? for fallible + | func: In => Out, // non-fallible transform + |? func: In => Out, // fallible transform + sink func @ InputVariant, // or sink? for fallible +} +``` + +`?` marks fallibility on source, individual transforms, or sink independently. +Implemented as a **TT muncher**: the internal rule `@build` recurses over transform tokens one at a time, accumulating them into a `vec![]`, then terminates on `sink`/`sink?`. + +## Scheduler architecture + +``` +Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N) + ▲ │ + [stage_rxs] ────────┘◄──────────────────────────────┘ + │ + [sink_err_rx] ← errors from sink (highest priority) + │ + Sink thread +``` + +The scheduler is a single thread running a biased `Select` over all input channels. Priority order (highest first): + +``` +index 0 sink_err_rx abort on sink error +index 1 stage_rxs[N-1] drain last stage first +... +index N stage_rxs[0] +index N+1 source_rx pull new data last +``` + +This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline. + +**Workers** are generic: each receives `(data, SharedFn, result_tx)` and calls `f(data)`, sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result. 
+ +**Termination** uses an `in_flight` counter: + +- incremented when an item is dispatched from source to workers +- decremented when the item exits the last stage +- barring a sink error, the loop exits only when `source_done && in_flight == 0` + +This guarantees all in-flight items complete before `join()`. + +## Error handling + +`PipelineError` has four variants: + +| Variant | Meaning | +|---|---| +| `EndOfStream` | Source exhausted (normal termination, not sent downstream) | +| `TypeMismatch` | Wrong enum variant arrived at a stage | +| `StepKindMismatch(&'static str)` | Internal routing error | +| `StepError(Box)` | Error from user code (wrapped by `make_*_fallible!`) | + +Sink errors flow back to the scheduler via a dedicated `Receiver<PipelineError>` registered at index 0 of the Select — the pipeline stops immediately on the first sink error. + +## Example + +```rust +enum PipelineData { Unsigned(u64), Number(f64), Text(String) } + +fn to_f64(x: u64) -> f64 { x as f64 } +fn format_num(n: f64) -> String { format!("{}", n) } +fn reverse(s: String) -> String { s.chars().rev().collect() } +fn hash(s: String) -> u64 { /* djb2 */ } +fn print_hash(h: u64) -> Result<(), std::io::Error> { println!("{}", h); Ok(()) } + +let pipeline = make_pipeline! { + PipelineData, + source 1u64..=10 => Unsigned, + | to_f64: Unsigned => Number, + | format_num: Number => Text, + | reverse: Text => Text, + | hash: Text => Unsigned, + sink? 
print_hash @ Unsigned, +}; + +WorkerPool::new(pipeline, 4, 64).run(); +``` diff --git a/mkdocs.yml b/mkdocs.yml index e3bff3a..0dfca5e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -34,6 +34,7 @@ nav: - Kmer: implementation/kmer.md - Chunk reader: implementation/chunkreader.md - Construction pipeline: implementation/pipeline.md + - obipipeline library: implementation/obipipeline.md - On-disk storage: implementation/storage.md - MPHF selection: implementation/mphf.md - Architecture: diff --git a/src/Cargo.lock b/src/Cargo.lock index d3012bb..2d0becb 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -692,6 +692,13 @@ dependencies = [ "criterion2", ] +[[package]] +name = "obipipeline" +version = "0.1.0" +dependencies = [ + "crossbeam-channel", +] + [[package]] name = "obiread" version = "0.1.0" diff --git a/src/Cargo.toml b/src/Cargo.toml index d61a33a..a68f64c 100644 --- a/src/Cargo.toml +++ b/src/Cargo.toml @@ -1,3 +1,3 @@ [workspace] resolver = "3" -members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope"] +members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline"] diff --git a/src/obipipeline/Cargo.toml b/src/obipipeline/Cargo.toml index 09d1d73..40aa6bf 100644 --- a/src/obipipeline/Cargo.toml +++ b/src/obipipeline/Cargo.toml @@ -2,3 +2,6 @@ name = "obipipeline" version = "0.1.0" edition = "2024" + +[dependencies] +crossbeam-channel = "0.5.15" diff --git a/src/obipipeline/examples/sandbox_pipeline.rs b/src/obipipeline/examples/sandbox_pipeline.rs index 19f6f76..cb3743c 100644 --- a/src/obipipeline/examples/sandbox_pipeline.rs +++ b/src/obipipeline/examples/sandbox_pipeline.rs @@ -1,19 +1,16 @@ -use crossbeam_channel::bounded; -use obipipeline::{Connection, PipelineData, PipelineError}; +use obipipeline::{WorkerPool, make_pipeline}; +use std::io::Error; + +// ── Itérateur source ────────────────────────────────────────────────────────── struct Compteur { count: u64, - start: u64, stop: u64, } 
impl Compteur { fn new(start: u64, stop: u64) -> Self { - Self { - count: start, - start, - stop, - } + Self { count: start, stop } } } @@ -30,51 +27,53 @@ impl Iterator for Compteur { } } -fn step_1(number: f64) -> Result { - Ok(format!("{}", number)) +// ── Fonctions de transformation ─────────────────────────────────────────────── + +fn to_f64(x: u64) -> f64 { + x as f64 } -fn step_2(text: String) -> Result { - Ok(text.chars().rev().collect()) +fn step_1(number: f64) -> String { + format!("{}", number) } -fn step_3(text: String) -> Result { +fn step_2(text: String) -> String { + text.chars().rev().collect() +} + +fn step_3(text: String) -> u64 { let mut hash: u64 = 5381; for byte in text.bytes() { hash = hash.wrapping_mul(33).wrapping_add(byte as u64); } - Ok(hash) + hash } -fn step_4(hash: u64) -> Result<(), PipelineError> { - println!("{:?}", hash); +fn step_4(hash: u64) -> Result<(), Error> { + println!("{}", hash); Ok(()) } -fn main() { - // Create bounded channels for connection between stages - let (tx0, rx0) = bounded(100); - let (tx1, rx1) = bounded(100); - let (tx2, rx2) = bounded(100); - let (tx3, rx3) = bounded(100); +// ── Type de données du pipeline ─────────────────────────────────────────────── - // Create connections using crossbeam channels - let conn0 = Connection { - source: tx0, - target: rx0, - }; - let conn1 = Connection { - source: tx1, - target: rx1, - }; - let conn2 = Connection { - source: tx2, - target: rx2, - }; - let conn3 = Connection { - source: tx3, - target: rx3, - }; - - println!("Pipeline with crossbeam channel connections created"); +enum PipelineData { + Unsigned(u64), + Number(f64), + Text(String), +} + +// ── Main ────────────────────────────────────────────────────────────────────── + +fn main() { + let pipeline = make_pipeline! { + PipelineData, + source Compteur::new(1000, 1010) => Unsigned, + | to_f64: Unsigned => Number, + | step_1: Number => Text, + | step_2: Text => Text, + | step_3: Text => Unsigned, + sink? 
step_4 @ Unsigned, + }; + + WorkerPool::new(pipeline, 4, 64).run(); } diff --git a/src/obipipeline/src/error.rs b/src/obipipeline/src/error.rs deleted file mode 100644 index b1ab4a3..0000000 --- a/src/obipipeline/src/error.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub enum PipelineError { - TypeMismatch, - StepError(Box), -} diff --git a/src/obipipeline/src/lib.rs b/src/obipipeline/src/lib.rs index 1b5c60a..48f370c 100644 --- a/src/obipipeline/src/lib.rs +++ b/src/obipipeline/src/lib.rs @@ -1,2 +1,10 @@ -pub mod error; -pub mod wrapper; +mod scheduler; + +pub use scheduler::Pipeline; +pub use scheduler::PipelineError; +pub use scheduler::SharedFn; +pub use scheduler::SinkFn; +pub use scheduler::SourceFn; +pub use scheduler::WorkerPool; +// make_sink, make_sink_fallible, make_source, make_source_fallible, +// make_transform, make_transform_fallible are exported at crate root via #[macro_export] diff --git a/src/obipipeline/src/scheduler.rs b/src/obipipeline/src/scheduler.rs index dd3dbf7..2e8e6ce 100644 --- a/src/obipipeline/src/scheduler.rs +++ b/src/obipipeline/src/scheduler.rs @@ -1,13 +1,16 @@ -use crossbeam_channel::{bounded, Receiver, Sender}; -use std::thread; +use crossbeam_channel::{Receiver, Select, Sender, bounded}; use std::error::Error; use std::fmt; +use std::sync::Arc; +use std::thread; /// Error type for pipeline operations. #[derive(Debug)] pub enum PipelineError { /// A stage received a `PipelineData` variant it did not expect. TypeMismatch, + /// The step kind is not compatible with the data type. + StepKindMismatch(&'static str), /// The source has no more data to produce. EndOfStream, /// An error occurred inside a stage (e.g., I/O, parsing, custom logic). 
@@ -18,6 +21,7 @@ impl fmt::Display for PipelineError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { PipelineError::TypeMismatch => write!(f, "data type mismatch in pipeline stage"), + PipelineError::StepKindMismatch(s) => write!(f, "step kind mismatch: {}", s), PipelineError::EndOfStream => write!(f, "end of input stream"), PipelineError::StepError(e) => write!(f, "stage error: {}", e), } @@ -64,19 +68,12 @@ impl Error for PipelineError { /// - A `Source` does not take an input; it yields an iterator of `Result`. /// - A `Transform` takes a `DATA` and returns a `Result`. /// - A `Sink` takes a `DATA` and performs a side effect (no return value). -pub enum StepKind { - /// Source: called repeatedly to produce the next data item. - /// Returns `Ok(data)` when a value is available, - /// `Err(e)` on error, and `Err(PipelineError::EndOfStream)` when exhausted. - Source(Box Result + Send + Sync>), - - /// Transform: pure computation from input to output (may fail). - Transform(Box Result + Send + Sync>), - - /// Sink: final consumer, no result. - Sink(Box), -} +/// Fonction source : appelée répétitivement, retourne le prochain item ou EndOfStream. +/// `FnMut` car elle maintient un état interne (position dans l'itérateur). +pub type SourceFn = Box Result + Send + Sync>; +/// Fonction sink : consomme un item final, peut échouer (erreur d'I/O, etc.). +pub type SinkFn = Box Result<(), PipelineError> + Send + Sync>; /// Creates a `StepKind::Source` from an iterator of plain values. /// Each value is wrapped with the specified `$enum` variant and returned as `Ok`. @@ -87,18 +84,20 @@ pub enum StepKind { /// let iter = vec![1.0, 2.0].into_iter(); /// let source = make_source!(PipelineData, iter, Numeric); /// ``` +#[macro_export] macro_rules! 
make_source { - ($enum:ident, $iterator:expr, $output:ident) => { - StepKind::Source(Box::new({ - let mut iter = $iterator.into_iter(); - move || -> Result<$enum, PipelineError> { + ($enum:ident, $iterator:expr, $output:ident) => {{ + let mut iter = $iterator.into_iter(); + Box::new( + move || -> ::std::result::Result<$enum, $crate::PipelineError> { match iter.next() { Some(x) => Ok($enum::$output(x)), - None => Err(PipelineError::EndOfStream), + None => Err($crate::PipelineError::EndOfStream), } - } - })) - }; + }, + ) + as Box ::std::result::Result<$enum, $crate::PipelineError> + Send + Sync> + }}; } /// Creates a `StepKind::Source` from an iterator of `Result`. @@ -111,19 +110,21 @@ macro_rules! make_source { /// let iter = vec![Ok(1.0), Err("oops")].into_iter(); /// let source = make_source_fallible!(PipelineData, iter, Numeric); /// ``` +#[macro_export] macro_rules! make_source_fallible { - ($enum:ident, $iterator:expr, $output:ident) => { - StepKind::Source(Box::new({ - let mut iter = $iterator.into_iter(); - move || -> Result<$enum, PipelineError> { + ($enum:ident, $iterator:expr, $output:ident) => {{ + let mut iter = $iterator.into_iter(); + Box::new( + move || -> ::std::result::Result<$enum, $crate::PipelineError> { match iter.next() { Some(Ok(x)) => Ok($enum::$output(x)), - Some(Err(e)) => Err(PipelineError::StepError(Box::new(e))), - None => Err(PipelineError::EndOfStream), + Some(Err(e)) => Err($crate::PipelineError::StepError(Box::new(e))), + None => Err($crate::PipelineError::EndOfStream), } - } - })) - }; + }, + ) + as Box ::std::result::Result<$enum, $crate::PipelineError> + Send + Sync> + }}; } /// Creates a pipeline stage from a pure (non-fallible) function. @@ -157,14 +158,20 @@ macro_rules! make_source_fallible { /// let result = to_string_stage(PipelineData::Int(42)).unwrap(); /// assert!(matches!(result, PipelineData::String(s) if s == "42")); /// ``` +#[macro_export] macro_rules! 
make_transform { ($enum:ident, $func:ident, $input:ident, $output:ident) => { - StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> { - match data { - $enum::$input(x) => Ok($enum::$output($func(x))), - _ => Err(PipelineError::TypeMismatch), - } - })) + ::std::sync::Arc::from(Box::new( + |data: $enum| -> ::std::result::Result<$enum, $crate::PipelineError> { + match data { + $enum::$input(x) => Ok($enum::$output($func(x))), + _ => Err($crate::PipelineError::TypeMismatch), + } + }, + ) + as Box< + dyn Fn($enum) -> ::std::result::Result<$enum, $crate::PipelineError> + Send + Sync, + >) }; } @@ -203,17 +210,24 @@ macro_rules! make_transform { /// let result = parse_stage(PipelineData::String("42".into())).unwrap(); /// assert!(matches!(result, PipelineData::Int(n) if n == 42)); /// ``` +#[macro_export] macro_rules! make_transform_fallible { ($enum:ident, $func:ident, $input:ident, $output:ident) => { - StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> { - match data { - PipelineData::$input(inner) => { - let result = $func(inner).map_err(|e| PipelineError::StepError(Box::new(e)))?; - Ok($enum::$output(result)) + ::std::sync::Arc::from(Box::new( + |data: $enum| -> ::std::result::Result<$enum, $crate::PipelineError> { + match data { + $enum::$input(inner) => { + let result = $func(inner) + .map_err(|e| $crate::PipelineError::StepError(Box::new(e)))?; + Ok($enum::$output(result)) + } + _ => Err($crate::PipelineError::TypeMismatch), } - _ => Err(PipelineError::TypeMismatch), - } - } + }, + ) + as Box< + dyn Fn($enum) -> ::std::result::Result<$enum, $crate::PipelineError> + Send + Sync, + >) }; } @@ -226,17 +240,21 @@ macro_rules! make_transform_fallible { /// fn print_number(n: f64) { println!("{}", n); } /// let sink = make_sink!(PipelineData, print_number, Numeric); /// ``` +#[macro_export] macro_rules! 
make_sink { ($enum:ident, $func:ident, $input:ident) => { - StepKind::Sink(Box::new(|data: $enum| -> Result<(), PipelineError> { - match data { - $enum::$input(x) => { - $func(x); - Ok(()) + Box::new( + |data: $enum| -> ::std::result::Result<(), $crate::PipelineError> { + match data { + $enum::$input(x) => { + $func(x); + Ok(()) + } + _ => Err($crate::PipelineError::TypeMismatch), } - _ => Err(PipelineError::TypeMismatch), - } - })) + }, + ) + as Box ::std::result::Result<(), $crate::PipelineError> + Send + Sync> }; } @@ -248,132 +266,350 @@ macro_rules! make_sink { /// fn save_to_file(hash: u64) -> std::io::Result<()> { Ok(()) } /// let sink = make_sink_fallible!(PipelineData, save_to_file, Hash); /// ``` +#[macro_export] macro_rules! make_sink_fallible { ($enum:ident, $func:ident, $input:ident) => { - StepKind::Sink(Box::new(|data: $enum| -> Result<(), PipelineError> { - match data { - $enum::$input(x) => { - $func(x).map_err(|e| PipelineError::StepError(Box::new(e))) + Box::new( + |data: $enum| -> ::std::result::Result<(), $crate::PipelineError> { + match data { + $enum::$input(inner) => { + $func(inner).map_err(|e| $crate::PipelineError::StepError(Box::new(e))) + } + _ => Err($crate::PipelineError::TypeMismatch), } - _ => Err(PipelineError::TypeMismatch), - } - })) + }, + ) + as Box ::std::result::Result<(), $crate::PipelineError> + Send + Sync> }; } -struct Step { - input: Option, - output: Option, - func: StepKind, +/// Construit un `Pipeline` à partir d'une source, d'une liste de transforms et d'un sink. +/// +/// Syntaxe : +/// ```ignore +/// make_pipeline! { +/// MyData, +/// source my_iter => Variant, // source non-fallible +/// source? my_iter => Variant, // source fallible (Result) +/// | func: In => Out, // transform non-fallible (répété 0..N fois) +/// |? func: In => Out, // transform fallible (répété 0..N fois) +/// sink my_func @ Variant, // sink non-fallible +/// sink? 
my_func @ Variant, // sink fallible +/// } +/// ``` +/// +/// Implémenté comme un TT muncher : la règle interne `@build` traite les transforms +/// un par un en les accumulant, puis termine sur `sink`/`sink?`. +#[macro_export] +macro_rules! make_pipeline { + // ── Points d'entrée ────────────────────────────────────────────────── + + ($enum:ident, source $src:expr => $src_out:ident, $($rest:tt)*) => { + $crate::make_pipeline!(@build $enum, + { $crate::make_source!($enum, $src, $src_out) }, + [], + $($rest)*) + }; + ($enum:ident, source? $src:expr => $src_out:ident, $($rest:tt)*) => { + $crate::make_pipeline!(@build $enum, + { $crate::make_source_fallible!($enum, $src, $src_out) }, + [], + $($rest)*) + }; + + // ── Accumulation des transforms ────────────────────────────────────── + + // transform non-fallible : | + (@build $enum:ident, $source:tt, [$($acc:tt)*], + | $tf:ident : $t_in:ident => $t_out:ident, $($rest:tt)*) => { + $crate::make_pipeline!(@build $enum, $source, + [$($acc)* $crate::make_transform!($enum, $tf, $t_in, $t_out),], + $($rest)*) + }; + + // transform fallible : |? + (@build $enum:ident, $source:tt, [$($acc:tt)*], + |? $tf:ident : $t_in:ident => $t_out:ident, $($rest:tt)*) => { + $crate::make_pipeline!(@build $enum, $source, + [$($acc)* $crate::make_transform_fallible!($enum, $tf, $t_in, $t_out),], + $($rest)*) + }; + + // ── Terminaison : sink ─────────────────────────────────────────────── + + (@build $enum:ident, $source:tt, [$($acc:tt)*], + sink $sink_fn:ident @ $sink_in:ident $(,)?) => { + $crate::Pipeline::new( + $source, + vec![$($acc)*], + $crate::make_sink!($enum, $sink_fn, $sink_in), + ) + }; + (@build $enum:ident, $source:tt, [$($acc:tt)*], + sink? $sink_fn:ident @ $sink_in:ident $(,)?) => { + $crate::Pipeline::new( + $source, + vec![$($acc)*], + $crate::make_sink_fallible!($enum, $sink_fn, $sink_in), + ) + }; } -fn source_runner( - mut source: StepKind, +/// Fonction de transformation partagée entre workers via Arc. 
+/// Arc permet à plusieurs workers de partager la même closure +/// sans la copier (Arc::clone = simple incrément de compteur). +pub type SharedFn = Arc Result + Send + Sync>; + +/// Tâche envoyée à un worker : donnée + fonction à appliquer + canal de résultat. +/// Le worker n'a pas besoin de connaître sa position dans le pipeline ; +/// le scheduler lui dit exactement quoi faire et où envoyer le résultat. +pub type WorkerTask = (D, SharedFn, Sender>); + +fn source_runner( + mut source: SourceFn, capacity: usize, -) -> (Receiver, std::thread::JoinHandle<()>) +) -> ( + Receiver>, + thread::JoinHandle<()>, +) where - ERROR: std::fmt::Debug, + DATA: Send + Sync + 'static, { let (tx, rx) = bounded(capacity); - let handle = std::thread::spawn(move || { - if let StepKind::Source(src) = &mut source { - loop { - match src() { - Ok(data) => { - if tx.send(data).is_err() { - break; // récepteur disparu - } - } - Err(PipelineError::EndOfStream) => break, - Err(e) => { - eprintln!("Source error: {:?}", e); - break; + let handle = thread::spawn(move || { + loop { + match source() { + Ok(data) => { + if tx.send(Ok(data)).is_err() { + break; // récepteur disparu } } + Err(PipelineError::EndOfStream) => break, + Err(e) => { + eprintln!("Source error: {:?}", e); + let _ = tx.send(Err(e)); + break; + } } - } else { - panic!("source_runner called with non-Source step"); } }); (rx, handle) } -fn runner( - task_rx: Receiver<(DATA, Step, usize)>, - result_tx: Vec>>, -) -> std::thread::JoinHandle<()> { - std::thread::spawn(move || { - while let Ok((data, step, dest)) = task_rx.recv() { - let res = (step.func)(data); - if result_tx[dest].send(res).is_err() { - break; - } +/// Lance un thread worker du pool. +/// +/// Le worker attend des tâches sur `task_rx`. Chaque tâche est un triplet +/// `(data, f, result_tx)` : il applique `f(data)` et envoie le résultat +/// dans `result_tx`. 
C'est le scheduler qui décide quelle fonction envoyer +/// et quel canal de résultat utiliser — le worker lui-même est générique. +fn transform_runner(task_rx: Receiver>) -> thread::JoinHandle<()> +where + DATA: Send + Sync + 'static, +{ + thread::spawn(move || { + while let Ok((data, f, result_tx)) = task_rx.recv() { + let _ = result_tx.send(f(data)); } }) } -fn sink_runner( - sink: StepKind, - rx: Receiver, -) -> std::thread::JoinHandle<()> { - std::thread::spawn(move || { - if let StepKind::Sink(sink_fn) = sink { - for data in rx { - sink_fn(data); +/// Lance le thread sink. +/// +/// Retourne : +/// - `Sender` : le scheduler y envoie les données finales +/// - `Receiver` : le sink y pousse toute erreur rencontrée ; +/// le scheduler surveille ce canal en priorité absolue +/// pour interrompre le pipeline dès qu'une erreur survient. +fn sink_runner( + sink: SinkFn, + capacity: usize, +) -> ( + Sender, + Receiver, + thread::JoinHandle<()>, +) +where + DATA: Send + Sync + 'static, +{ + let (data_tx, data_rx) = bounded(capacity); + let (err_tx, err_rx) = bounded(capacity); + let handle = thread::spawn(move || { + for data in data_rx { + if let Err(e) = sink(data) { + let _ = err_tx.send(e); + break; // on arrête dès la première erreur } - } else { - panic!("sink_runner called with non-Sink step"); } - }) + }); + (data_tx, err_rx, handle) } -pub struct Pipeline { - source: StepKind, - transforms: Vec>, // toutes Transform - sink: StepKind, +pub struct Pipeline { + source: SourceFn, + transforms: Vec>, + sink: SinkFn, } +impl Pipeline { + pub fn new( + source: SourceFn, + transforms: Vec>, + sink: SinkFn, + ) -> Self { + Self { source, transforms, sink } + } +} - -struct WorkerPool { - pipeline: Pipeline, - task_tx: Vec, dest usize)>>, - task_rx: Vec>, +pub struct WorkerPool { + pipeline: Pipeline, handles: Vec>, n_workers: usize, capacity: usize, } -impl WorkerPool { - pub fn new(pipeline: Pipeline, n_workers: usize, capacity: usize) -> Self { +impl WorkerPool 
+where + DATA: Send + Sync + 'static, +{ + pub fn new(pipeline: Pipeline, n_workers: usize, capacity: usize) -> Self { Self { pipeline, - task_tx: Vec::new(), - task_rx: Vec::new(), handles: Vec::new(), n_workers, capacity, } - - let (source_tx, source_rx) = crossbeam::channel::bounded>(capacity); - - for i in 0..pipeline.transforms.len() { - let (task_tx, task_rx) = crossbeam::channel::bounded>(capacity); - self.task_tx.push(task_tx); - self.task_rx.push(task_rx); - } } - pub fn run(&mut self) { - let (source_rx, source_handle) = source_runner(self.pipeline.source, self.capacity); - self.handles.push(source_handle); + pub fn run(mut self) { + let n = self.pipeline.transforms.len(); - let (transform_tx, transform_rx) = crossbeam::channel::bounded<(DATA, Step, dest usize)>(self.capacity); - - for i in 0..self.n_workers { - self.handles.push(runner(transform_tx.clone(), transform_rx.clone())); + // ── Canaux inter-stages ──────────────────────────────────────────── + // stage_txs[i] : le worker qui exécute transform[i] y envoie son résultat + // stage_rxs[i] : le scheduler lit ici pour dispatcher au transform[i+1] (ou sink) + let mut stage_txs: Vec>> = Vec::new(); + let mut stage_rxs: Vec>> = Vec::new(); + for _ in 0..n { + let (tx, rx) = bounded(self.capacity); + stage_txs.push(tx); + stage_rxs.push(rx); } + // ── Source thread ────────────────────────────────────────────────── + let (source_rx, src_handle) = source_runner(self.pipeline.source, self.capacity); + self.handles.push(src_handle); + // Les transforms sont déjà des SharedFn — pas de conversion nécessaire. + let transforms = self.pipeline.transforms; + + // ── Worker pool ──────────────────────────────────────────────────── + // Canal partagé par tous les workers : le scheduler y pousse des WorkerTask, + // chaque worker en dépile une à la fois. 
+ let (worker_tx, worker_rx): (Sender>, Receiver>) = + bounded(self.capacity); + + for _ in 0..self.n_workers { + self.handles.push(transform_runner(worker_rx.clone())); + } + + // ── Sink thread ──────────────────────────────────────────────────── + let (sink_tx, sink_err_rx, sink_handle) = sink_runner(self.pipeline.sink, self.capacity); + self.handles.push(sink_handle); + + // ── Boucle principale ───────────────────────────────────────────── + // + // Le Select est reconstruit à chaque itération, ce qui permet de + // retirer source_rx une fois la source épuisée. + // + // Priorités (biased = index le plus bas gagne) : + // index 0 → sink_err_rx (arrêt immédiat sur erreur sink) + // index 1..=N → stage_rxs[N-1..0] (vider le pipeline en priorité) + // index N+1 → source_rx (dernier recours : nouvelles données) + // + // Quand k == 0 : erreur du sink + // Quand 1 <= k <= N : stage concerné = N-k + // Quand k == N+1 : item venant de la source + // + // Terminaison : on quitte uniquement quand source_done ET in_flight == 0, + // ce qui garantit que tous les items ont traversé le pipeline jusqu'au sink. + { + let mut source_done = false; + let mut in_flight: usize = 0; + + loop { + // Condition de sortie : plus rien en vol et source tarie + if source_done && in_flight == 0 { + break; + } + + // Reconstruction du Select (sans source_rx si source épuisée) + let mut sel = Select::new_biased(); + sel.recv(&sink_err_rx); // index 0 + for rx in stage_rxs.iter().rev() { + sel.recv(rx); // indices 1 .. 
N + } + let src_idx = if !source_done { + Some(sel.recv(&source_rx)) // index N+1 (seulement si encore active) + } else { + None + }; + + let oper = sel.select(); + let k = oper.index(); + + if k == 0 { + // ── Erreur du sink : on arrête tout ────────────────── + match oper.recv(&sink_err_rx) { + Ok(e) => { eprintln!("Sink error: {:?}", e); break; } + Err(_) => break, + } + } else if src_idx == Some(k) { + // ── Nouvel item depuis la source ────────────────────── + match oper.recv(&source_rx) { + Ok(Ok(data)) => { + if n == 0 { + let _ = sink_tx.send(data); // source → sink direct + } else { + in_flight += 1; + let _ = worker_tx.send(( + data, + transforms[0].clone(), + stage_txs[0].clone(), + )); + } + } + Ok(Err(e)) => eprintln!("Source error: {:?}", e), + Err(_) => source_done = true, // source fermée, on continue à drainer + } + } else { + // ── Résultat d'un stage intermédiaire ───────────────── + // k ∈ [1, N] → stage = N-k + let stage = n - k; + match oper.recv(&stage_rxs[stage]) { + Ok(Ok(data)) => { + if stage == n - 1 { + in_flight -= 1; + let _ = sink_tx.send(data); // dernière étape → sink + } else { + let _ = worker_tx.send(( + data, + transforms[stage + 1].clone(), + stage_txs[stage + 1].clone(), + )); + } + } + Ok(Err(e)) => eprintln!("Stage {} error: {:?}", stage, e), + Err(_) => break, // fermeture inattendue d'un canal + } + } + } + } + + // Signaler la fin aux workers et au sink + drop(worker_tx); + drop(sink_tx); + + for h in self.handles { + let _ = h.join(); + } } } diff --git a/src/obipipeline/src/wrapper.rs b/src/obipipeline/src/wrapper.rs deleted file mode 100644 index 281eca3..0000000 --- a/src/obipipeline/src/wrapper.rs +++ /dev/null @@ -1,90 +0,0 @@ -/// Creates a pipeline stage from a pure (non-fallible) function. 
-/// -/// This macro generates a closure that implements the `PipelineStage` trait by pattern -/// matching on the input `PipelineData` variant, applying the provided function, and -/// wrapping the result in the output variant. -/// -/// # Arguments -/// * `$func` - The function to apply: `Fn(T) -> U` -/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`) -/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`) -/// -/// # Example -/// -/// ```ignore -/// // Define PipelineData enum -/// enum PipelineData { -/// Int(i64), -/// String(String), -/// } -/// -/// // Create pure stage -/// let to_string_stage = make_stage!( -/// to_string, -/// Int(i64), -/// String(String) -/// ); -/// -/// // Use in pipeline -/// let result = to_string_stage(PipelineData::Int(42)).unwrap(); -/// assert!(matches!(result, PipelineData::String(s) if s == "42")); -/// ``` -macro_rules! make_transform { - ($enum:ident, $func:ident, $input:ident, $output:ident) => { - StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> { - match data { - $enum::$input(x) => Ok($enum::$output($func(x))), - _ => Err(PipelineError::TypeMismatch), - } - })) - }; -} - -/// Creates a pipeline stage from a fallible function. -/// -/// This macro generates a closure that pattern matches on the input `PipelineData` -/// variant, applies the provided function, and wraps the result in the output variant. -/// If the function returns an error, it is boxed and wrapped in `PipelineError::StepError`. 
-/// -/// # Arguments -/// * `$func` - The fallible function to apply: `Fn(T) -> Result` -/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`) -/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`) -/// -/// # Example -/// -/// ```ignore -/// fn parse_int(s: &str) -> Result { -/// s.parse() -/// } -/// -/// // Define PipelineData enum -/// enum PipelineData { -/// String(String), -/// Int(i64), -/// } -/// -/// // Create fallible stage -/// let parse_stage = make_stage_fallible!( -/// parse_int, -/// String(String), -/// Int(i64) -/// ); -/// -/// // Use in pipeline -/// let result = parse_stage(PipelineData::String("42".into())).unwrap(); -/// assert!(matches!(result, PipelineData::Int(n) if n == 42)); -/// ``` -macro_rules! make_transform_fallible { - ($enum:ident, $func:ident, $input:ident, $output:ident) => { - StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> { - match data { - PipelineData::$input(inner) => { - let result = $func(inner).map_err(|e| PipelineError::StepError(Box::new(e)))?; - Ok($enum::$output(result)) - } - _ => Err(PipelineError::TypeMismatch), - } - } - }; -} diff --git a/src/obiread/src/mimetype.rs b/src/obiread/src/mimetype.rs index 8b82b2c..fda9f28 100644 --- a/src/obiread/src/mimetype.rs +++ b/src/obiread/src/mimetype.rs @@ -32,7 +32,7 @@ impl MimeTypeGuesser { pub fn mime_type(&mut self) -> Option<&'static str> { let buf = self.0.header(BUF_SIZE)?; - INFER.get_mime_type_for_bytes(buf).map(|kind| kind.mime_type) + INFER.get_mime_type_for_bytes(buf).map(|kind| kind.mime_type()) } }