📦 Add infer and new pipeline infrastructure

- Update Cargo.lock with dependency additions (bumpalo, byteorder, cfb, fnv, infer, js-sys, uuid wasm-bindgen)
- Refactor obikseq::superkmer: reorder imports and improve formatting
  - Add `obipipeline` crate with scheduler, error handling & macros (WIP)
- Replace obiread::expand_paths logic with PathIter and path_iterator module
  - Add mimetype detection using `infer` crate via PeekReader wrapper
This commit is contained in:
Eric Coissac
2026-04-23 21:03:48 +02:00
parent 664d0216b5
commit 3f8880a7e5
15 changed files with 893 additions and 86 deletions
+4
View File
@@ -0,0 +1,4 @@
[package]
name = "obipipeline"
version = "0.1.0"
edition = "2024"
@@ -0,0 +1,80 @@
use crossbeam_channel::bounded;
use obipipeline::{Connection, PipelineData, PipelineError};
struct Compteur {
count: u64,
start: u64,
stop: u64,
}
impl Compteur {
fn new(start: u64, stop: u64) -> Self {
Self {
count: start,
start,
stop,
}
}
}
impl Iterator for Compteur {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
if self.count > self.stop {
return None;
}
let result = self.count;
self.count += 1;
Some(result)
}
}
fn step_1(number: f64) -> Result<String, PipelineError> {
Ok(format!("{}", number))
}
fn step_2(text: String) -> Result<String, PipelineError> {
Ok(text.chars().rev().collect())
}
fn step_3(text: String) -> Result<u64, PipelineError> {
let mut hash: u64 = 5381;
for byte in text.bytes() {
hash = hash.wrapping_mul(33).wrapping_add(byte as u64);
}
Ok(hash)
}
fn step_4(hash: u64) -> Result<(), PipelineError> {
println!("{:?}", hash);
Ok(())
}
fn main() {
// Create bounded channels for connection between stages
let (tx0, rx0) = bounded(100);
let (tx1, rx1) = bounded(100);
let (tx2, rx2) = bounded(100);
let (tx3, rx3) = bounded(100);
// Create connections using crossbeam channels
let conn0 = Connection {
source: tx0,
target: rx0,
};
let conn1 = Connection {
source: tx1,
target: rx1,
};
let conn2 = Connection {
source: tx2,
target: rx2,
};
let conn3 = Connection {
source: tx3,
target: rx3,
};
println!("Pipeline with crossbeam channel connections created");
}
+4
View File
@@ -0,0 +1,4 @@
pub enum PipelineError {
TypeMismatch,
StepError(Box<dyn std::error::Error + Send + Sync>),
}
+2
View File
@@ -0,0 +1,2 @@
pub mod error;
pub mod wrapper;
+379
View File
@@ -0,0 +1,379 @@
use crossbeam_channel::{bounded, Receiver, Sender};
use std::thread;
use std::error::Error;
use std::fmt;
/// Error type for pipeline operations.
#[derive(Debug)]
pub enum PipelineError {
/// A stage received a `PipelineData` variant it did not expect.
TypeMismatch,
/// The source has no more data to produce.
EndOfStream,
/// An error occurred inside a stage (e.g., I/O, parsing, custom logic).
StepError(Box<dyn Error + Send + Sync>),
}
impl fmt::Display for PipelineError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PipelineError::TypeMismatch => write!(f, "data type mismatch in pipeline stage"),
PipelineError::EndOfStream => write!(f, "end of input stream"),
PipelineError::StepError(e) => write!(f, "stage error: {}", e),
}
}
}
impl Error for PipelineError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
PipelineError::StepError(e) => Some(e.as_ref()),
_ => None,
}
}
}
/// Represents a single processing stage in a data pipeline.
///
/// `StepKind` abstracts over three fundamental types of operations:
/// - **Source**: Produces a sequence of data items (e.g., from a file, iterator, generator).
/// - **Transform**: Converts one data item into another (pure transformation).
/// - **Sink**: Consumes a final data item (e.g., print, store, aggregate).
///
/// The type `DATA` is typically an enum that unifies all data variants that can flow through
/// the pipeline. The type `ERROR` is the error type used by fallible operations.
///
/// # Generics
/// - `DATA`: The type of values passed between stages (must be `Send + Sync`).
/// - `ERROR`: The error type for fallible computations (usually `PipelineError`).
///
/// # Example
/// ```
/// # use std::io;
/// # enum MyData { Number(i32), Text(String) }
/// # type MyError = io::Error;
/// # let source_iter = vec![Ok(MyData::Number(42))].into_iter();
/// # let transform_fn = |d: MyData| -> Result<MyData, MyError> { Ok(d) };
/// # let sink_fn = |d: MyData| {};
/// let source = StepKind::Source(Box::new(source_iter));
/// let transform = StepKind::Transform(Box::new(transform_fn));
/// let sink = StepKind::Sink(Box::new(sink_fn));
/// ```
///
/// # Note
/// - A `Source` does not take an input; it yields an iterator of `Result<DATA, ERROR>`.
/// - A `Transform` takes a `DATA` and returns a `Result<DATA, ERROR>`.
/// - A `Sink` takes a `DATA` and performs a side effect (no return value).
pub enum StepKind<DATA, ERROR> {
/// Source: called repeatedly to produce the next data item.
/// Returns `Ok(data)` when a value is available,
/// `Err(e)` on error, and `Err(PipelineError::EndOfStream)` when exhausted.
Source(Box<dyn FnMut() -> Result<DATA, ERROR> + Send + Sync>),
/// Transform: pure computation from input to output (may fail).
Transform(Box<dyn Fn(DATA) -> Result<DATA, ERROR> + Send + Sync>),
/// Sink: final consumer, no result.
Sink(Box<dyn Fn(DATA) + Send + Sync>),
}
/// Creates a `StepKind::Source` from an iterator of plain values.
/// Each value is wrapped with the specified `$enum` variant and returned as `Ok`.
/// The source returns `Err(PipelineError::EndOfStream)` when exhausted.
///
/// # Example
/// ```
/// let iter = vec![1.0, 2.0].into_iter();
/// let source = make_source!(PipelineData, iter, Numeric);
/// ```
macro_rules! make_source {
($enum:ident, $iterator:expr, $output:ident) => {
StepKind::Source(Box::new({
let mut iter = $iterator.into_iter();
move || -> Result<$enum, PipelineError> {
match iter.next() {
Some(x) => Ok($enum::$output(x)),
None => Err(PipelineError::EndOfStream),
}
}
}))
};
}
/// Creates a `StepKind::Source` from an iterator of `Result<T, E>`.
/// On `Ok(x)` it wraps the value into the specified `$enum` variant.
/// On `Err(e)` it returns `Err(PipelineError::StepError(Box::new(e)))`.
/// Returns `Err(PipelineError::EndOfStream)` when the iterator ends.
///
/// # Example
/// ```
/// let iter = vec![Ok(1.0), Err("oops")].into_iter();
/// let source = make_source_fallible!(PipelineData, iter, Numeric);
/// ```
macro_rules! make_source_fallible {
($enum:ident, $iterator:expr, $output:ident) => {
StepKind::Source(Box::new({
let mut iter = $iterator.into_iter();
move || -> Result<$enum, PipelineError> {
match iter.next() {
Some(Ok(x)) => Ok($enum::$output(x)),
Some(Err(e)) => Err(PipelineError::StepError(Box::new(e))),
None => Err(PipelineError::EndOfStream),
}
}
}))
};
}
/// Creates a pipeline stage from a pure (non-fallible) function.
///
/// This macro generates a closure that implements the `PipelineStage` trait by pattern
/// matching on the input `PipelineData` variant, applying the provided function, and
/// wrapping the result in the output variant.
///
/// # Arguments
/// * `$func` - The function to apply: `Fn(T) -> U`
/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`)
/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`)
///
/// # Example
///
/// ```ignore
/// // Define PipelineData enum
/// enum PipelineData {
/// Int(i64),
/// String(String),
/// }
///
/// // Create pure stage
/// let to_string_stage = make_stage!(
/// to_string,
/// Int(i64),
/// String(String)
/// );
///
/// // Use in pipeline
/// let result = to_string_stage(PipelineData::Int(42)).unwrap();
/// assert!(matches!(result, PipelineData::String(s) if s == "42"));
/// ```
macro_rules! make_transform {
($enum:ident, $func:ident, $input:ident, $output:ident) => {
StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> {
match data {
$enum::$input(x) => Ok($enum::$output($func(x))),
_ => Err(PipelineError::TypeMismatch),
}
}))
};
}
/// Creates a pipeline stage from a fallible function.
///
/// This macro generates a closure that pattern matches on the input `PipelineData`
/// variant, applies the provided function, and wraps the result in the output variant.
/// If the function returns an error, it is boxed and wrapped in `PipelineError::StepError`.
///
/// # Arguments
/// * `$func` - The fallible function to apply: `Fn(T) -> Result<U, E>`
/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`)
/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`)
///
/// # Example
///
/// ```ignore
/// fn parse_int(s: &str) -> Result<i64, std::num::ParseIntError> {
/// s.parse()
/// }
///
/// // Define PipelineData enum
/// enum PipelineData {
/// String(String),
/// Int(i64),
/// }
///
/// // Create fallible stage
/// let parse_stage = make_stage_fallible!(
/// parse_int,
/// String(String),
/// Int(i64)
/// );
///
/// // Use in pipeline
/// let result = parse_stage(PipelineData::String("42".into())).unwrap();
/// assert!(matches!(result, PipelineData::Int(n) if n == 42));
/// ```
macro_rules! make_transform_fallible {
($enum:ident, $func:ident, $input:ident, $output:ident) => {
StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> {
match data {
PipelineData::$input(inner) => {
let result = $func(inner).map_err(|e| PipelineError::StepError(Box::new(e)))?;
Ok($enum::$output(result))
}
_ => Err(PipelineError::TypeMismatch),
}
}
};
}
/// Creates a `StepKind::Sink` from a function that consumes a concrete value and returns `()`.
/// The returned sink always returns `Ok(())`.
/// The function is wrapped to accept `$enum::$input` and ignores the result.
///
/// # Example
/// ```
/// fn print_number(n: f64) { println!("{}", n); }
/// let sink = make_sink!(PipelineData, print_number, Numeric);
/// ```
macro_rules! make_sink {
($enum:ident, $func:ident, $input:ident) => {
StepKind::Sink(Box::new(|data: $enum| -> Result<(), PipelineError> {
match data {
$enum::$input(x) => {
$func(x);
Ok(())
}
_ => Err(PipelineError::TypeMismatch),
}
}))
};
}
/// Creates a `StepKind::Sink` from a fallible function that returns `Result<(), E>`.
/// Errors from `$func` are wrapped in `PipelineError::StepError`.
///
/// # Example
/// ```
/// fn save_to_file(hash: u64) -> std::io::Result<()> { Ok(()) }
/// let sink = make_sink_fallible!(PipelineData, save_to_file, Hash);
/// ```
macro_rules! make_sink_fallible {
($enum:ident, $func:ident, $input:ident) => {
StepKind::Sink(Box::new(|data: $enum| -> Result<(), PipelineError> {
match data {
$enum::$input(x) => {
$func(x).map_err(|e| PipelineError::StepError(Box::new(e)))
}
_ => Err(PipelineError::TypeMismatch),
}
}))
};
}
struct Step<DATA, ERROR> {
input: Option<DATA>,
output: Option<DATA>,
func: StepKind<DATA, ERROR>,
}
fn source_runner<DATA, ERROR>(
mut source: StepKind<DATA, ERROR>,
capacity: usize,
) -> (Receiver<DATA>, std::thread::JoinHandle<()>)
where
ERROR: std::fmt::Debug,
{
let (tx, rx) = bounded(capacity);
let handle = std::thread::spawn(move || {
if let StepKind::Source(src) = &mut source {
loop {
match src() {
Ok(data) => {
if tx.send(data).is_err() {
break; // récepteur disparu
}
}
Err(PipelineError::EndOfStream) => break,
Err(e) => {
eprintln!("Source error: {:?}", e);
break;
}
}
}
} else {
panic!("source_runner called with non-Source step");
}
});
(rx, handle)
}
fn runner<DATA, ERROR>(
task_rx: Receiver<(DATA, Step<DATA, ERROR>, usize)>,
result_tx: Vec<Sender<Result<DATA, ERROR>>>,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
while let Ok((data, step, dest)) = task_rx.recv() {
let res = (step.func)(data);
if result_tx[dest].send(res).is_err() {
break;
}
}
})
}
fn sink_runner<DATA, ERROR>(
sink: StepKind<DATA, ERROR>,
rx: Receiver<DATA>,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
if let StepKind::Sink(sink_fn) = sink {
for data in rx {
sink_fn(data);
}
} else {
panic!("sink_runner called with non-Sink step");
}
})
}
pub struct Pipeline<DATA, ERROR> {
source: StepKind<DATA, ERROR>,
transforms: Vec<StepKind<DATA, ERROR>>, // toutes Transform
sink: StepKind<DATA, ERROR>,
}
struct WorkerPool<DATA, ERROR> {
pipeline: Pipeline<DATA, ERROR>,
task_tx: Vec<Sender<(DATA, Step<DATA, ERROR>, dest usize)>>,
task_rx: Vec<Receiver<DATA>>,
handles: Vec<std::thread::JoinHandle<()>>,
n_workers: usize,
capacity: usize,
}
impl<DATA, ERROR> WorkerPool<DATA, ERROR> {
pub fn new(pipeline: Pipeline<DATA, ERROR>, n_workers: usize, capacity: usize) -> Self {
Self {
pipeline,
task_tx: Vec::new(),
task_rx: Vec::new(),
handles: Vec::new(),
n_workers,
capacity,
}
let (source_tx, source_rx) = crossbeam::channel::bounded<Result<DATA,PipelineError>>(capacity);
for i in 0..pipeline.transforms.len() {
let (task_tx, task_rx) = crossbeam::channel::bounded<Result<DATA,PipelineError>>(capacity);
self.task_tx.push(task_tx);
self.task_rx.push(task_rx);
}
}
pub fn run(&mut self) {
let (source_rx, source_handle) = source_runner(self.pipeline.source, self.capacity);
self.handles.push(source_handle);
let (transform_tx, transform_rx) = crossbeam::channel::bounded<(DATA, Step<DATA, ERROR>, dest usize)>(self.capacity);
for i in 0..self.n_workers {
self.handles.push(runner(transform_tx.clone(), transform_rx.clone()));
}
}
}
+90
View File
@@ -0,0 +1,90 @@
/// Creates a pipeline stage from a pure (non-fallible) function.
///
/// This macro generates a closure that implements the `PipelineStage` trait by pattern
/// matching on the input `PipelineData` variant, applying the provided function, and
/// wrapping the result in the output variant.
///
/// # Arguments
/// * `$func` - The function to apply: `Fn(T) -> U`
/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`)
/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`)
///
/// # Example
///
/// ```ignore
/// // Define PipelineData enum
/// enum PipelineData {
/// Int(i64),
/// String(String),
/// }
///
/// // Create pure stage
/// let to_string_stage = make_stage!(
/// to_string,
/// Int(i64),
/// String(String)
/// );
///
/// // Use in pipeline
/// let result = to_string_stage(PipelineData::Int(42)).unwrap();
/// assert!(matches!(result, PipelineData::String(s) if s == "42"));
/// ```
macro_rules! make_transform {
($enum:ident, $func:ident, $input:ident, $output:ident) => {
StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> {
match data {
$enum::$input(x) => Ok($enum::$output($func(x))),
_ => Err(PipelineError::TypeMismatch),
}
}))
};
}
/// Creates a pipeline stage from a fallible function.
///
/// This macro generates a closure that pattern matches on the input `PipelineData`
/// variant, applies the provided function, and wraps the result in the output variant.
/// If the function returns an error, it is boxed and wrapped in `PipelineError::StepError`.
///
/// # Arguments
/// * `$func` - The fallible function to apply: `Fn(T) -> Result<U, E>`
/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`)
/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`)
///
/// # Example
///
/// ```ignore
/// fn parse_int(s: &str) -> Result<i64, std::num::ParseIntError> {
/// s.parse()
/// }
///
/// // Define PipelineData enum
/// enum PipelineData {
/// String(String),
/// Int(i64),
/// }
///
/// // Create fallible stage
/// let parse_stage = make_stage_fallible!(
/// parse_int,
/// String(String),
/// Int(i64)
/// );
///
/// // Use in pipeline
/// let result = parse_stage(PipelineData::String("42".into())).unwrap();
/// assert!(matches!(result, PipelineData::Int(n) if n == 42));
/// ```
macro_rules! make_transform_fallible {
($enum:ident, $func:ident, $input:ident, $output:ident) => {
StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> {
match data {
PipelineData::$input(inner) => {
let result = $func(inner).map_err(|e| PipelineError::StepError(Box::new(e)))?;
Ok($enum::$output(result))
}
_ => Err(PipelineError::TypeMismatch),
}
}
};
}