From 3f8880a7e59a5975e477529b3d7649c73e52fe90 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Thu, 23 Apr 2026 21:03:48 +0200 Subject: [PATCH] :package: Add infer and new pipeline infrastructure - Update Cargo.lock with dependency additions (bumpalo, byteorder, cfb, fnv, infer, js-sys, uuid wasm-bindgen) - Refactor obikseq::superkmer: reorder imports and improve formatting - Add `obipipeline` crate with scheduler, error handling & macros (WIP) - Replace obiread::expand_paths logic with PathIter and path_iterator module - Add mimetype detection using `infer` crate via PeekReader wrapper --- src/Cargo.lock | 110 ++++++ src/obikseq/src/superkmer.rs | 72 ++-- src/obipipeline/Cargo.toml | 4 + src/obipipeline/examples/sandbox_pipeline.rs | 80 ++++ src/obipipeline/src/error.rs | 4 + src/obipipeline/src/lib.rs | 2 + src/obipipeline/src/scheduler.rs | 379 +++++++++++++++++++ src/obipipeline/src/wrapper.rs | 90 +++++ src/obiread/Cargo.toml | 1 + src/obiread/examples/expand_path.rs | 22 +- src/obiread/src/lib.rs | 6 +- src/obiread/src/list_of_files.rs | 47 --- src/obiread/src/mimetype.rs | 43 +++ src/obiread/src/path_iterator.rs | 72 ++++ src/obiread/src/peakreader.rs | 47 +++ 15 files changed, 893 insertions(+), 86 deletions(-) create mode 100644 src/obipipeline/Cargo.toml create mode 100644 src/obipipeline/examples/sandbox_pipeline.rs create mode 100644 src/obipipeline/src/error.rs create mode 100644 src/obipipeline/src/lib.rs create mode 100644 src/obipipeline/src/scheduler.rs create mode 100644 src/obipipeline/src/wrapper.rs delete mode 100644 src/obiread/src/list_of_files.rs create mode 100644 src/obiread/src/mimetype.rs create mode 100644 src/obiread/src/path_iterator.rs create mode 100644 src/obiread/src/peakreader.rs diff --git a/src/Cargo.lock b/src/Cargo.lock index 78bb33b..d3012bb 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -103,6 +103,18 @@ version = "0.9.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b8d4b90317451025bfa1f7d359a5fa698a5f745927ff27696429b7fbd7c0c48" +[[package]] +name = "bumpalo" +version = "3.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.1" @@ -147,6 +159,17 @@ dependencies = [ "shlex", ] +[[package]] +name = "cfb" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38f2da7a0a2c4ccf0065be06397cc26a81f4e528be095826eee9d4adbb8c60f" +dependencies = [ + "byteorder", + "fnv", + "uuid", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -326,6 +349,12 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -484,6 +513,15 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "infer" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a588916bfdfd92e71cacef98a63d9b1f0d74d6599980d11894290e7ddefffcf7" +dependencies = [ + "cfb", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -506,6 +544,16 @@ dependencies = [ "libc", ] +[[package]] +name = "js-sys" +version = "0.3.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -648,6 +696,7 @@ dependencies = [ name = "obiread" version = "0.1.0" dependencies = [ + "infer", "niffler", "obikrope", "tracing", @@ -825,6 +874,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + [[package]] name = "same-file" version = "1.0.6" @@ -1102,6 +1157,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.1" @@ -1133,6 +1198,51 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.118" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.118" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.118" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.118" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" +dependencies = [ + "unicode-ident", +] + [[package]] name = "webpki-roots" version = "0.26.11" diff --git a/src/obikseq/src/superkmer.rs b/src/obikseq/src/superkmer.rs index c44154c..98f6b26 100644 --- a/src/obikseq/src/superkmer.rs +++ b/src/obikseq/src/superkmer.rs @@ -1,9 +1,9 @@ //! Compact 2-bit DNA super-kmer with in-place reverse complement and canonical form. -use bitvec::prelude::*; -use crate::encoding::{encode_base, DEC4}; +use crate::encoding::{DEC4, encode_base}; use crate::kmer::{Kmer, KmerError}; use crate::revcomp_lookup::REVCOMP4; +use bitvec::prelude::*; // ── SuperKmerHeader ─────────────────────────────────────────────────────────── @@ -193,7 +193,8 @@ impl SuperKmer { let bytes = &mut self.seq[..n]; let (mut lo, mut hi) = (0, n - 1); while lo < hi { - (bytes[lo], bytes[hi]) = (REVCOMP4[bytes[hi] as usize], REVCOMP4[bytes[lo] as usize]); + (bytes[lo], bytes[hi]) = + (REVCOMP4[bytes[hi] as usize], REVCOMP4[bytes[lo] as usize]); lo += 1; hi -= 1; } @@ -216,16 +217,19 @@ impl SuperKmer { /// The result is not yet in canonical form; call `.canonical()` if needed. pub fn from_ascii(ascii: &[u8]) -> Self { let seql = ascii.len(); - debug_assert!(seql >= 1 && seql <= 256, "super-kmer length must be 1..=256"); + debug_assert!( + seql >= 1 && seql <= 256, + "super-kmer length must be 1..=256" + ); let n = byte_len(seql); let mut seq = vec![0u8; n]; let full = seql / 4; for i in 0..full { - seq[i] = encode_base(ascii[i * 4]) << 6 - | encode_base(ascii[i * 4 + 1]) << 4 - | encode_base(ascii[i * 4 + 2]) << 2 - | encode_base(ascii[i * 4 + 3]); + seq[i] = encode_base(ascii[i * 4]) << 6 + | encode_base(ascii[i * 4 + 1]) << 4 + | encode_base(ascii[i * 4 + 2]) << 2 + | encode_base(ascii[i * 4 + 3]); } let rem = seql % 4; if rem > 0 { @@ -236,7 +240,7 @@ impl SuperKmer { seq[full] = last; } - Self::new(seql as u8, seq.into_boxed_slice()) // 256usize as u8 == 0, intentional + Self::new(seql as u8, seq.into_boxed_slice()) // 256usize as u8 == 0, intentional } /// Decode this super-kmer sequence into ASCII nucleotides, appending into `buf`. @@ -270,7 +274,11 @@ impl SuperKmer { } let seql = self.seql(); if i + k > seql { - return Err(KmerError::OutOfBounds { position: i, k, seql }); + return Err(KmerError::OutOfBounds { + position: i, + k, + seql, + }); } let bits = self.seq.view_bits::(); let raw: u64 = bits[i * 2..(i + k) * 2].load_be(); @@ -334,11 +342,16 @@ mod tests { /// Reference revcomp on ASCII bytes. fn ascii_revcomp(seq: &[u8]) -> Vec { - seq.iter().rev().map(|&b| match b { - b'A' => b'T', b'T' => b'A', - b'C' => b'G', b'G' => b'C', - _ => b'A', - }).collect() + seq.iter() + .rev() + .map(|&b| match b { + b'A' => b'T', + b'T' => b'A', + b'C' => b'G', + b'G' => b'C', + _ => b'A', + }) + .collect() } fn all_lengths() -> impl Iterator { @@ -545,23 +558,24 @@ mod tests { fn revcomp_known_values() { let cases = [ // shift=6 - ("A", "T"), - ("ACGTA", "TACGT"), + ("A", "T"), + ("ACGTA", "TACGT"), // shift=4 - ("AC", "GT"), - ("ACGTAC", "GTACGT"), + ("AC", "GT"), + ("ACGTAC", "GTACGT"), // shift=2 - ("ACG", "CGT"), - ("ACGTACG", "CGTACGT"), + ("ACG", "CGT"), + ("ACGTACG", "CGTACGT"), // shift=0 - ("ACGT", "ACGT"), + ("ACGT", "ACGT"), ("ACGTACGT", "ACGTACGT"), ]; for (seq, expected) in cases { let mut sk = SuperKmer::from_ascii(seq.as_bytes()); sk.revcomp(); assert_eq!( - sk.to_ascii(), expected.as_bytes(), + sk.to_ascii(), + expected.as_bytes(), "revcomp wrong for \"{seq}\"" ); } @@ -594,21 +608,24 @@ mod tests { #[test] fn canonical_palindrome_unchanged() { // ACGT is its own revcomp - let sk = SuperKmer::from_ascii(b"ACGT").canonical(); + let mut sk = SuperKmer::from_ascii(b"ACGT"); + sk.canonical(); assert_eq!(sk.to_ascii(), b"ACGT"); } #[test] fn canonical_chooses_forward() { // "AAAA" < "TTTT" → stays as-is - let sk = SuperKmer::from_ascii(b"AAAA").canonical(); + let mut sk = SuperKmer::from_ascii(b"AAAA"); + sk.canonical(); assert_eq!(sk.to_ascii(), b"AAAA"); } #[test] fn canonical_chooses_revcomp() { // "TTTT" > "AAAA" → flipped - let sk = SuperKmer::from_ascii(b"TTTT").canonical(); + let mut sk = SuperKmer::from_ascii(b"TTTT"); + sk.canonical(); assert_eq!(sk.to_ascii(), b"AAAA"); } @@ -616,7 +633,8 @@ mod tests { fn canonical_is_minimal_all_lengths() { for len in all_lengths() { let ascii = make_seq(len); - let sk = SuperKmer::from_ascii(&ascii).canonical(); + let mut sk = SuperKmer::from_ascii(&ascii); + sk.canonical(); let fwd = sk.to_ascii(); let rev = ascii_revcomp(&fwd); assert!(fwd <= rev, "canonical not minimal for len={len}"); diff --git a/src/obipipeline/Cargo.toml b/src/obipipeline/Cargo.toml new file mode 100644 index 0000000..09d1d73 --- /dev/null +++ b/src/obipipeline/Cargo.toml @@ -0,0 +1,4 @@ +[package] +name = "obipipeline" +version = "0.1.0" +edition = "2024" diff --git a/src/obipipeline/examples/sandbox_pipeline.rs b/src/obipipeline/examples/sandbox_pipeline.rs new file mode 100644 index 0000000..19f6f76 --- /dev/null +++ b/src/obipipeline/examples/sandbox_pipeline.rs @@ -0,0 +1,80 @@ +use crossbeam_channel::bounded; +use obipipeline::{Connection, PipelineData, PipelineError}; + +struct Compteur { + count: u64, + start: u64, + stop: u64, +} + +impl Compteur { + fn new(start: u64, stop: u64) -> Self { + Self { + count: start, + start, + stop, + } + } +} + +impl Iterator for Compteur { + type Item = u64; + + fn next(&mut self) -> Option { + if self.count > self.stop { + return None; + } + let result = self.count; + self.count += 1; + Some(result) + } +} + +fn step_1(number: f64) -> Result { + Ok(format!("{}", number)) +} + +fn step_2(text: String) -> Result { + Ok(text.chars().rev().collect()) +} + +fn step_3(text: String) -> Result { + let mut hash: u64 = 5381; + for byte in text.bytes() { + hash = hash.wrapping_mul(33).wrapping_add(byte as u64); + } + Ok(hash) +} + +fn step_4(hash: u64) -> Result<(), PipelineError> { + println!("{:?}", hash); + Ok(()) +} + +fn main() { + // Create bounded channels for connection between stages + let (tx0, rx0) = bounded(100); + let (tx1, rx1) = bounded(100); + let (tx2, rx2) = bounded(100); + let (tx3, rx3) = bounded(100); + + // Create connections using crossbeam channels + let conn0 = Connection { + source: tx0, + target: rx0, + }; + let conn1 = Connection { + source: tx1, + target: rx1, + }; + let conn2 = Connection { + source: tx2, + target: rx2, + }; + let conn3 = Connection { + source: tx3, + target: rx3, + }; + + println!("Pipeline with crossbeam channel connections created"); +} diff --git a/src/obipipeline/src/error.rs b/src/obipipeline/src/error.rs new file mode 100644 index 0000000..b1ab4a3 --- /dev/null +++ b/src/obipipeline/src/error.rs @@ -0,0 +1,4 @@ +pub enum PipelineError { + TypeMismatch, + StepError(Box), +} diff --git a/src/obipipeline/src/lib.rs b/src/obipipeline/src/lib.rs new file mode 100644 index 0000000..1b5c60a --- /dev/null +++ b/src/obipipeline/src/lib.rs @@ -0,0 +1,2 @@ +pub mod error; +pub mod wrapper; diff --git a/src/obipipeline/src/scheduler.rs b/src/obipipeline/src/scheduler.rs new file mode 100644 index 0000000..dd3dbf7 --- /dev/null +++ b/src/obipipeline/src/scheduler.rs @@ -0,0 +1,379 @@ +use crossbeam_channel::{bounded, Receiver, Sender}; +use std::thread; +use std::error::Error; +use std::fmt; + +/// Error type for pipeline operations. +#[derive(Debug)] +pub enum PipelineError { + /// A stage received a `PipelineData` variant it did not expect. + TypeMismatch, + /// The source has no more data to produce. + EndOfStream, + /// An error occurred inside a stage (e.g., I/O, parsing, custom logic). + StepError(Box), +} + +impl fmt::Display for PipelineError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PipelineError::TypeMismatch => write!(f, "data type mismatch in pipeline stage"), + PipelineError::EndOfStream => write!(f, "end of input stream"), + PipelineError::StepError(e) => write!(f, "stage error: {}", e), + } + } +} + +impl Error for PipelineError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + PipelineError::StepError(e) => Some(e.as_ref()), + _ => None, + } + } +} + +/// Represents a single processing stage in a data pipeline. +/// +/// `StepKind` abstracts over three fundamental types of operations: +/// - **Source**: Produces a sequence of data items (e.g., from a file, iterator, generator). +/// - **Transform**: Converts one data item into another (pure transformation). +/// - **Sink**: Consumes a final data item (e.g., print, store, aggregate). +/// +/// The type `DATA` is typically an enum that unifies all data variants that can flow through +/// the pipeline. The type `ERROR` is the error type used by fallible operations. +/// +/// # Generics +/// - `DATA`: The type of values passed between stages (must be `Send + Sync`). +/// - `ERROR`: The error type for fallible computations (usually `PipelineError`). +/// +/// # Example +/// ``` +/// # use std::io; +/// # enum MyData { Number(i32), Text(String) } +/// # type MyError = io::Error; +/// # let source_iter = vec![Ok(MyData::Number(42))].into_iter(); +/// # let transform_fn = |d: MyData| -> Result { Ok(d) }; +/// # let sink_fn = |d: MyData| {}; +/// let source = StepKind::Source(Box::new(source_iter)); +/// let transform = StepKind::Transform(Box::new(transform_fn)); +/// let sink = StepKind::Sink(Box::new(sink_fn)); +/// ``` +/// +/// # Note +/// - A `Source` does not take an input; it yields an iterator of `Result`. +/// - A `Transform` takes a `DATA` and returns a `Result`. +/// - A `Sink` takes a `DATA` and performs a side effect (no return value). +pub enum StepKind { + /// Source: called repeatedly to produce the next data item. + /// Returns `Ok(data)` when a value is available, + /// `Err(e)` on error, and `Err(PipelineError::EndOfStream)` when exhausted. + Source(Box Result + Send + Sync>), + + /// Transform: pure computation from input to output (may fail). + Transform(Box Result + Send + Sync>), + + /// Sink: final consumer, no result. + Sink(Box), +} + + +/// Creates a `StepKind::Source` from an iterator of plain values. +/// Each value is wrapped with the specified `$enum` variant and returned as `Ok`. +/// The source returns `Err(PipelineError::EndOfStream)` when exhausted. +/// +/// # Example +/// ``` +/// let iter = vec![1.0, 2.0].into_iter(); +/// let source = make_source!(PipelineData, iter, Numeric); +/// ``` +macro_rules! make_source { + ($enum:ident, $iterator:expr, $output:ident) => { + StepKind::Source(Box::new({ + let mut iter = $iterator.into_iter(); + move || -> Result<$enum, PipelineError> { + match iter.next() { + Some(x) => Ok($enum::$output(x)), + None => Err(PipelineError::EndOfStream), + } + } + })) + }; +} + +/// Creates a `StepKind::Source` from an iterator of `Result`. +/// On `Ok(x)` it wraps the value into the specified `$enum` variant. +/// On `Err(e)` it returns `Err(PipelineError::StepError(Box::new(e)))`. +/// Returns `Err(PipelineError::EndOfStream)` when the iterator ends. +/// +/// # Example +/// ``` +/// let iter = vec![Ok(1.0), Err("oops")].into_iter(); +/// let source = make_source_fallible!(PipelineData, iter, Numeric); +/// ``` +macro_rules! make_source_fallible { + ($enum:ident, $iterator:expr, $output:ident) => { + StepKind::Source(Box::new({ + let mut iter = $iterator.into_iter(); + move || -> Result<$enum, PipelineError> { + match iter.next() { + Some(Ok(x)) => Ok($enum::$output(x)), + Some(Err(e)) => Err(PipelineError::StepError(Box::new(e))), + None => Err(PipelineError::EndOfStream), + } + } + })) + }; +} + +/// Creates a pipeline stage from a pure (non-fallible) function. +/// +/// This macro generates a closure that implements the `PipelineStage` trait by pattern +/// matching on the input `PipelineData` variant, applying the provided function, and +/// wrapping the result in the output variant. +/// +/// # Arguments +/// * `$func` - The function to apply: `Fn(T) -> U` +/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`) +/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`) +/// +/// # Example +/// +/// ```ignore +/// // Define PipelineData enum +/// enum PipelineData { +/// Int(i64), +/// String(String), +/// } +/// +/// // Create pure stage +/// let to_string_stage = make_stage!( +/// to_string, +/// Int(i64), +/// String(String) +/// ); +/// +/// // Use in pipeline +/// let result = to_string_stage(PipelineData::Int(42)).unwrap(); +/// assert!(matches!(result, PipelineData::String(s) if s == "42")); +/// ``` +macro_rules! make_transform { + ($enum:ident, $func:ident, $input:ident, $output:ident) => { + StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> { + match data { + $enum::$input(x) => Ok($enum::$output($func(x))), + _ => Err(PipelineError::TypeMismatch), + } + })) + }; +} + +/// Creates a pipeline stage from a fallible function. +/// +/// This macro generates a closure that pattern matches on the input `PipelineData` +/// variant, applies the provided function, and wraps the result in the output variant. +/// If the function returns an error, it is boxed and wrapped in `PipelineError::StepError`. +/// +/// # Arguments +/// * `$func` - The fallible function to apply: `Fn(T) -> Result` +/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`) +/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`) +/// +/// # Example +/// +/// ```ignore +/// fn parse_int(s: &str) -> Result { +/// s.parse() +/// } +/// +/// // Define PipelineData enum +/// enum PipelineData { +/// String(String), +/// Int(i64), +/// } +/// +/// // Create fallible stage +/// let parse_stage = make_stage_fallible!( +/// parse_int, +/// String(String), +/// Int(i64) +/// ); +/// +/// // Use in pipeline +/// let result = parse_stage(PipelineData::String("42".into())).unwrap(); +/// assert!(matches!(result, PipelineData::Int(n) if n == 42)); +/// ``` +macro_rules! make_transform_fallible { + ($enum:ident, $func:ident, $input:ident, $output:ident) => { + StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> { + match data { + PipelineData::$input(inner) => { + let result = $func(inner).map_err(|e| PipelineError::StepError(Box::new(e)))?; + Ok($enum::$output(result)) + } + _ => Err(PipelineError::TypeMismatch), + } + } + }; +} + +/// Creates a `StepKind::Sink` from a function that consumes a concrete value and returns `()`. +/// The returned sink always returns `Ok(())`. +/// The function is wrapped to accept `$enum::$input` and ignores the result. +/// +/// # Example +/// ``` +/// fn print_number(n: f64) { println!("{}", n); } +/// let sink = make_sink!(PipelineData, print_number, Numeric); +/// ``` +macro_rules! make_sink { + ($enum:ident, $func:ident, $input:ident) => { + StepKind::Sink(Box::new(|data: $enum| -> Result<(), PipelineError> { + match data { + $enum::$input(x) => { + $func(x); + Ok(()) + } + _ => Err(PipelineError::TypeMismatch), + } + })) + }; +} + +/// Creates a `StepKind::Sink` from a fallible function that returns `Result<(), E>`. +/// Errors from `$func` are wrapped in `PipelineError::StepError`. +/// +/// # Example +/// ``` +/// fn save_to_file(hash: u64) -> std::io::Result<()> { Ok(()) } +/// let sink = make_sink_fallible!(PipelineData, save_to_file, Hash); +/// ``` +macro_rules! make_sink_fallible { + ($enum:ident, $func:ident, $input:ident) => { + StepKind::Sink(Box::new(|data: $enum| -> Result<(), PipelineError> { + match data { + $enum::$input(x) => { + $func(x).map_err(|e| PipelineError::StepError(Box::new(e))) + } + _ => Err(PipelineError::TypeMismatch), + } + })) + }; +} + +struct Step { + input: Option, + output: Option, + func: StepKind, +} + +fn source_runner( + mut source: StepKind, + capacity: usize, +) -> (Receiver, std::thread::JoinHandle<()>) +where + ERROR: std::fmt::Debug, +{ + let (tx, rx) = bounded(capacity); + let handle = std::thread::spawn(move || { + if let StepKind::Source(src) = &mut source { + loop { + match src() { + Ok(data) => { + if tx.send(data).is_err() { + break; // récepteur disparu + } + } + Err(PipelineError::EndOfStream) => break, + Err(e) => { + eprintln!("Source error: {:?}", e); + break; + } + } + } + } else { + panic!("source_runner called with non-Source step"); + } + }); + (rx, handle) +} + +fn runner( + task_rx: Receiver<(DATA, Step, usize)>, + result_tx: Vec>>, +) -> std::thread::JoinHandle<()> { + std::thread::spawn(move || { + while let Ok((data, step, dest)) = task_rx.recv() { + let res = (step.func)(data); + if result_tx[dest].send(res).is_err() { + break; + } + } + }) +} + +fn sink_runner( + sink: StepKind, + rx: Receiver, +) -> std::thread::JoinHandle<()> { + std::thread::spawn(move || { + if let StepKind::Sink(sink_fn) = sink { + for data in rx { + sink_fn(data); + } + } else { + panic!("sink_runner called with non-Sink step"); + } + }) +} + +pub struct Pipeline { + source: StepKind, + transforms: Vec>, // toutes Transform + sink: StepKind, +} + + + +struct WorkerPool { + pipeline: Pipeline, + task_tx: Vec, dest usize)>>, + task_rx: Vec>, + handles: Vec>, + n_workers: usize, + capacity: usize, +} + +impl WorkerPool { + pub fn new(pipeline: Pipeline, n_workers: usize, capacity: usize) -> Self { + Self { + pipeline, + task_tx: Vec::new(), + task_rx: Vec::new(), + handles: Vec::new(), + n_workers, + capacity, + } + + let (source_tx, source_rx) = crossbeam::channel::bounded>(capacity); + + for i in 0..pipeline.transforms.len() { + let (task_tx, task_rx) = crossbeam::channel::bounded>(capacity); + self.task_tx.push(task_tx); + self.task_rx.push(task_rx); + } + } + + pub fn run(&mut self) { + let (source_rx, source_handle) = source_runner(self.pipeline.source, self.capacity); + self.handles.push(source_handle); + + let (transform_tx, transform_rx) = crossbeam::channel::bounded<(DATA, Step, dest usize)>(self.capacity); + + for i in 0..self.n_workers { + self.handles.push(runner(transform_tx.clone(), transform_rx.clone())); + } + + + } +} diff --git a/src/obipipeline/src/wrapper.rs b/src/obipipeline/src/wrapper.rs new file mode 100644 index 0000000..281eca3 --- /dev/null +++ b/src/obipipeline/src/wrapper.rs @@ -0,0 +1,90 @@ +/// Creates a pipeline stage from a pure (non-fallible) function. +/// +/// This macro generates a closure that implements the `PipelineStage` trait by pattern +/// matching on the input `PipelineData` variant, applying the provided function, and +/// wrapping the result in the output variant. +/// +/// # Arguments +/// * `$func` - The function to apply: `Fn(T) -> U` +/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`) +/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`) +/// +/// # Example +/// +/// ```ignore +/// // Define PipelineData enum +/// enum PipelineData { +/// Int(i64), +/// String(String), +/// } +/// +/// // Create pure stage +/// let to_string_stage = make_stage!( +/// to_string, +/// Int(i64), +/// String(String) +/// ); +/// +/// // Use in pipeline +/// let result = to_string_stage(PipelineData::Int(42)).unwrap(); +/// assert!(matches!(result, PipelineData::String(s) if s == "42")); +/// ``` +macro_rules! make_transform { + ($enum:ident, $func:ident, $input:ident, $output:ident) => { + StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> { + match data { + $enum::$input(x) => Ok($enum::$output($func(x))), + _ => Err(PipelineError::TypeMismatch), + } + })) + }; +} + +/// Creates a pipeline stage from a fallible function. +/// +/// This macro generates a closure that pattern matches on the input `PipelineData` +/// variant, applies the provided function, and wraps the result in the output variant. +/// If the function returns an error, it is boxed and wrapped in `PipelineError::StepError`. +/// +/// # Arguments +/// * `$func` - The fallible function to apply: `Fn(T) -> Result` +/// * `$input` - Input `PipelineData` variant pattern (e.g., `Int(i64)`) +/// * `$output` - Output `PipelineData` variant pattern (e.g., `String(String)`) +/// +/// # Example +/// +/// ```ignore +/// fn parse_int(s: &str) -> Result { +/// s.parse() +/// } +/// +/// // Define PipelineData enum +/// enum PipelineData { +/// String(String), +/// Int(i64), +/// } +/// +/// // Create fallible stage +/// let parse_stage = make_stage_fallible!( +/// parse_int, +/// String(String), +/// Int(i64) +/// ); +/// +/// // Use in pipeline +/// let result = parse_stage(PipelineData::String("42".into())).unwrap(); +/// assert!(matches!(result, PipelineData::Int(n) if n == 42)); +/// ``` +macro_rules! make_transform_fallible { + ($enum:ident, $func:ident, $input:ident, $output:ident) => { + StepKind::Transform(Box::new(|data: $enum| -> Result<$enum, PipelineError> { + match data { + PipelineData::$input(inner) => { + let result = $func(inner).map_err(|e| PipelineError::StepError(Box::new(e)))?; + Ok($enum::$output(result)) + } + _ => Err(PipelineError::TypeMismatch), + } + } + }; +} diff --git a/src/obiread/Cargo.toml b/src/obiread/Cargo.toml index 8310c68..b982392 100644 --- a/src/obiread/Cargo.toml +++ b/src/obiread/Cargo.toml @@ -9,3 +9,4 @@ niffler = { version = "2", default-features = false, features = ["gz", "bz2", "l ureq = "2" tracing = "0.1.44" tracing-subscriber = { version = "0.3.23", features = ["fmt", "env-filter"] } +infer = "0.19.0" diff --git a/src/obiread/examples/expand_path.rs b/src/obiread/examples/expand_path.rs index e47d89d..4626548 100644 --- a/src/obiread/examples/expand_path.rs +++ b/src/obiread/examples/expand_path.rs @@ -1,19 +1,21 @@ -use obiread::expand_paths; -use tracing::{info, subscriber}; +use obiread::path_iter; +use std::env; +use tracing::info; use tracing_subscriber::{EnvFilter, fmt}; fn main() { // Build a subscriber with environment-based filtering - tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .init(); + fmt().with_env_filter(EnvFilter::from_default_env()).init(); info!("Expanding paths..."); - let paths = vec![ - "/home/user/data".to_string(), - "/home/user/sample.fastq.gz".to_string(), - ]; - let files = expand_paths(&paths); + let args: Vec = env::args().skip(1).collect(); + + if args.is_empty() { + eprintln!("Usage: cargo run -p obiread --example expand_path -- [path...]"); + std::process::exit(1); + } + + let files = path_iter(&args); for f in files { println!("{}", f.display()); } diff --git a/src/obiread/src/lib.rs b/src/obiread/src/lib.rs index 5476b99..c758579 100644 --- a/src/obiread/src/lib.rs +++ b/src/obiread/src/lib.rs @@ -8,11 +8,13 @@ pub mod chunk; mod fasta; mod fastq; -mod list_of_files; pub mod normalize; +mod path_iterator; +pub mod peakreader; pub mod xopen; -pub use list_of_files::expand_paths; +pub use path_iterator::{PathIter, path_iter}; +pub use peakreader::PeekReader; use std::io::Read; diff --git a/src/obiread/src/list_of_files.rs b/src/obiread/src/list_of_files.rs deleted file mode 100644 index f87ef81..0000000 --- a/src/obiread/src/list_of_files.rs +++ /dev/null @@ -1,47 +0,0 @@ -use std::fs; -use std::path::Path; -use std::path::PathBuf; -use tracing::info; - -/// Returns true if the path ends with a fasta or fastq file extension. -fn is_fasta_or_fastq(path: &Path) -> bool { - let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); - name.ends_with(".fasta") - || name.ends_with(".fa") - || name.ends_with(".fastq") - || name.ends_with(".fq") - || name.ends_with(".fasta.gz") - || name.ends_with(".fa.gz") - || name.ends_with(".fastq.gz") - || name.ends_with(".fq.gz") -} - -/// Walks a directory, collecting fasta or fastq files into the output vector. -fn walk_dir(dir: &Path, out: &mut Vec) { - if let Ok(entries) = fs::read_dir(dir) { - for entry in entries.flatten() { - let path = entry.path(); - if path.is_dir() { - walk_dir(&path, out); - } else if path.is_file() && is_fasta_or_fastq(&path) { - out.push(path); - } - } - } -} - -/// Expands a list of paths, returning a vector of `PathBuf` for fasta or fastq files. -pub fn expand_paths(paths: &[String]) -> Vec { - let mut result = Vec::new(); - for path_str in paths { - info!("Current step: {}", path_str); - let path = Path::new(path_str); - if path.is_dir() { - walk_dir(path, &mut result); - } else if path.is_file() && is_fasta_or_fastq(path) { - info!("Found fasta or fastq file: {}", path_str); - result.push(path.to_path_buf()); - } - } - result -} diff --git a/src/obiread/src/mimetype.rs b/src/obiread/src/mimetype.rs new file mode 100644 index 0000000..8b82b2c --- /dev/null +++ b/src/obiread/src/mimetype.rs @@ -0,0 +1,43 @@ +use infer::Infer; +use once_cell::sync::Lazy; +use regex::Regex; + +const BUF_SIZE: usize = 4096; + +static RE_FASTA: Lazy = Lazy::new(|| Regex::new(r"^>[^ ]").unwrap()); +fn is_fasta(buf: &[u8]) -> bool { + RE_FASTA.is_match(buf) +} + +static RE_FASTQ: Lazy = Lazy::new(|| Regex::new(r"^@[^ ].*\n[A-Za-z.-]+").unwrap()); +fn is_fastq(buf: &[u8]) -> bool { + RE_FASTQ.is_match(buf) +} + +static INFER: Lazy = Lazy::new(|| { + let mut infer = Infer::new(); + infer.add("text/fasta", "fasta", |buf| buf.starts_with(b">")); + infer.add("text/fastq", "fastq", |buf| { + buf.starts_with(b"@") && !buf.starts_with(b"@param,") + }); + infer +}); + +pub struct MimeTypeGuesser(PeekReader); + +impl MimeTypeGuesser { + pub fn new(reader: R) -> Self { + Self { PeekReader::new(reader, BUF_SIZE) } + } + + pub fn mime_type(&mut self) -> Option<&'static str> { + let buf = self.0.header(BUF_SIZE)?; + INFER.get_mime_type_for_bytes(buf).map(|kind| kind.mime_type) + } +} + +impl for MimeTypeGuesser { + fn read(&mut self, out: &mut [u8]) -> io::Result { + self.0.read(out) + } +} diff --git a/src/obiread/src/path_iterator.rs b/src/obiread/src/path_iterator.rs new file mode 100644 index 0000000..7a5a645 --- /dev/null +++ b/src/obiread/src/path_iterator.rs @@ -0,0 +1,72 @@ +use std::fs; +use std::path::{Path, PathBuf}; + +/// An iterator that yields paths to fasta or fastq files. +pub struct PathIter { + dir_stack: Vec, // dossiers qu'il reste à explorer + file_buffer: Vec, // fichiers déjà listés (à distribuer) +} + +impl PathIter { + /// Creates a new `PathIter` that will yield paths to fasta or fastq files. + pub fn new(paths: Vec) -> Self { + let mut iter = PathIter { + dir_stack: Vec::new(), + file_buffer: Vec::new(), + }; + for path in paths { + if path.is_dir() { + iter.dir_stack.push(path); + } else if path.is_file() && is_fasta_or_fastq(&path) { + iter.file_buffer.push(path); + } + } + iter + } +} + +impl Iterator for PathIter { + type Item = PathBuf; + + fn next(&mut self) -> Option { + // Tant qu'on n'a pas de fichier à donner + while self.file_buffer.is_empty() { + // Prendre un dossier dans la pile (s'il y en a) + let dir = self.dir_stack.pop()?; + // Lire le dossier + let entries = match fs::read_dir(&dir) { + Ok(entries) => entries, + Err(_) => continue, // si erreur (perm, etc.), on ignore ce dossier + }; + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + self.dir_stack.push(path); + } else if path.is_file() && is_fasta_or_fastq(&path) { + self.file_buffer.push(path); + } + } + } + // Maintenant file_buffer n'est pas vide + self.file_buffer.pop() + } +} + +/// Returns an iterator that yields paths to fasta or fastq files. +pub fn path_iter(paths: &[String]) -> PathIter { + let path_bufs: Vec = paths.iter().map(PathBuf::from).collect(); + PathIter::new(path_bufs) +} + +/// Returns true if the path ends with a fasta or fastq file extension. +fn is_fasta_or_fastq(path: &Path) -> bool { + let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); + name.ends_with(".fasta") + || name.ends_with(".fa") + || name.ends_with(".fastq") + || name.ends_with(".fq") + || name.ends_with(".fasta.gz") + || name.ends_with(".fa.gz") + || name.ends_with(".fastq.gz") + || name.ends_with(".fq.gz") +} diff --git a/src/obiread/src/peakreader.rs b/src/obiread/src/peakreader.rs new file mode 100644 index 0000000..dd91a79 --- /dev/null +++ b/src/obiread/src/peakreader.rs @@ -0,0 +1,47 @@ +//! PeekReader implementation. + +use std::io::Read; + +/// A reader that caches the first `buf_size` bytes for peeking. +pub struct PeekReader { + reader: R, + buffer: Vec, + pos: usize, +} + +impl PeekReader { + /// Creates a new PeekReader that caches the first `buf_size` bytes. + pub fn new(mut reader: R, buf_size: usize) -> Self { + let mut buffer = Vec::with_capacity(buf_size); + reader + .by_ref() + .take(buf_size as u64) + .read_to_end(&mut buffer) + .unwrap(); + + Self { + reader, + buffer, + pos: 0, + } + } + + /// Returns the buffered header bytes. + pub fn header(&mut self) -> &[u8] { + &self.buffer + } +} + +impl std::io::Read for PeekReader { + fn read(&mut self, out: &mut [u8]) -> std::io::Result { + if self.pos < self.buffer.len() { + let avail = &self.buffer[self.pos..]; + let to_copy = out.len().min(avail.len()); + out[..to_copy].copy_from_slice(&avail[..to_copy]); + self.pos += to_copy; + Ok(to_copy) + } else { + self.reader.read(out) + } + } +}