From f1c8fc85c9c2e7e0141c7f21a89fc15e2ce39dc0 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 24 Apr 2026 18:14:00 +0200 Subject: [PATCH] :arrow_up: refactor superkmer to use obipipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace manual threading with Pipeline abstraction from `obipipline` - Remove crossbeam-channel dependency and format detection logic - Introduce typed `PipelineData` enum for pipeline stages (RawChunk, Norm Chunk, Batch) - Implement shared normalization and extraction steps as `SharedFn`ƒ - Add unsafe Send/Sync impls for PipelineData (Rope ownership is moved, not shared) - Replace manual reader/worker/output threads with a single Pipeline execution - Uses `make_source_fallible!`, shared transform functions, and a sink for output - Simplify argument handling (remove `--format` flag) - Update Cargo.toml: remove crossbeam-channel, add obipipeline --- src/obikrope/src/cursor.rs | 13 +-- src/obikrope/src/rope.rs | 22 +++-- src/obiread/src/chunk.rs | 154 ++++++++++++++++++++++++++++++----- src/obiread/src/fasta.rs | 4 +- src/obiread/src/fastq.rs | 28 ++++--- src/obiread/src/lib.rs | 20 ++--- src/obiread/src/mimetype.rs | 4 + src/obiread/src/normalize.rs | 50 +++++++++++- src/obiread/src/xopen.rs | 12 +-- src/obiskbuilder/src/iter.rs | 4 +- 10 files changed, 236 insertions(+), 75 deletions(-) diff --git a/src/obikrope/src/cursor.rs b/src/obikrope/src/cursor.rs index 480fc06..3d1719c 100644 --- a/src/obikrope/src/cursor.rs +++ b/src/obikrope/src/cursor.rs @@ -194,7 +194,10 @@ pub trait RopeCursor<'a> { /// Use the returned value with [`SeekMode::Rope`] to restore a position, /// or as a truncation point after a write pass. fn rope_tell(&self) -> usize { - self.state().current.get().unwrap_or(self.state().offset.get()) + self.state() + .current + .get() + .unwrap_or(self.state().offset.get()) } /// Number of bytes visible through this cursor (`rope.len() - offset`). @@ -528,13 +531,13 @@ mod tests { use crate::Rope; fn rope(data: &[u8]) -> Rope { - let mut r = Rope::new(); + let mut r = Rope::new(None); r.push(data.to_vec()); r } fn rope2(a: &[u8], b: &[u8]) -> Rope { - let mut r = Rope::new(); + let mut r = Rope::new(None); r.push(a.to_vec()); r.push(b.to_vec()); r @@ -703,14 +706,14 @@ mod tests { #[test] fn backward_empty_rope_returns_error() { - let r = Rope::new(); + let r = Rope::new(None); let c = r.bw_cursor(); assert!(c.read_next().is_err()); } #[test] fn forward_empty_rope_returns_error() { - let r = Rope::new(); + let r = Rope::new(None); let c = r.fw_cursor(); assert!(c.read_next().is_err()); } diff --git a/src/obikrope/src/rope.rs b/src/obikrope/src/rope.rs index 0045a22..ffcbaef 100644 --- a/src/obikrope/src/rope.rs +++ b/src/obikrope/src/rope.rs @@ -28,6 +28,7 @@ use std::cell::Cell; /// /// See the [module-level documentation][crate::rope] for a full overview. pub struct Rope { + pub(crate) mime_type: Option<&'static str>, pub(crate) blocks: Vec>>, pub(crate) length: usize, pub(crate) start_block_idx: Vec, @@ -35,8 +36,9 @@ pub struct Rope { impl Rope { /// Create an empty rope (no allocations). - pub fn new() -> Self { + pub fn new(mime_type: Option<&'static str>) -> Self { Self { + mime_type, blocks: Vec::new(), length: 0, start_block_idx: Vec::new(), @@ -59,6 +61,11 @@ impl Rope { self.length += block_len; } + /// The MIME type of the rope, if known. + pub fn mime_type(&self) -> Option<&str> { + self.mime_type.as_deref() + } + /// Total number of blocks. pub fn n_blocks(&self) -> usize { self.blocks.len() @@ -115,7 +122,7 @@ impl Rope { } if pos == self.length { - return Ok(Rope::new()); + return Ok(Rope::new(self.mime_type.clone())); } let (block_idx, from, _) = self.lookup(pos).ok_or_else(|| { @@ -149,6 +156,7 @@ impl Rope { self.length = pos; Ok(Rope { + mime_type: self.mime_type.clone(), blocks: tail_blocks, length: tail_length, start_block_idx: tail_starts, @@ -185,13 +193,13 @@ mod tests { } fn make(data: &[u8]) -> Rope { - let mut r = Rope::new(); + let mut r = Rope::new(None); r.push(data.to_vec()); r } fn make2(a: &[u8], b: &[u8]) -> Rope { - let mut r = Rope::new(); + let mut r = Rope::new(None); r.push(a.to_vec()); r.push(b.to_vec()); r @@ -201,7 +209,7 @@ mod tests { #[test] fn empty_rope_is_empty() { - let r = Rope::new(); + let r = Rope::new(None); assert!(r.is_empty()); assert_eq!(r.len(), 0); assert_eq!(r.n_blocks(), 0); @@ -213,6 +221,7 @@ mod tests { assert!(!r.is_empty()); assert_eq!(r.len(), 5); assert_eq!(r.n_blocks(), 1); + assert_eq!(r.mime_type(), None); } #[test] @@ -220,6 +229,7 @@ mod tests { let r = make2(b"abc", b"de"); assert_eq!(r.len(), 5); assert_eq!(r.n_blocks(), 2); + assert_eq!(r.mime_type(), None); } #[test] @@ -261,7 +271,7 @@ mod tests { #[test] fn lookup_empty_rope_returns_none() { - assert!(Rope::new().lookup(0).is_none()); + assert!(Rope::new(None).lookup(0).is_none()); } #[test] diff --git a/src/obiread/src/chunk.rs b/src/obiread/src/chunk.rs index 47cf7a2..691e6d9 100644 --- a/src/obiread/src/chunk.rs +++ b/src/obiread/src/chunk.rs @@ -3,6 +3,12 @@ //! Each `Rope` yielded by [`SeqChunkIter`] contains one or more blocks that //! together form a self-contained block of complete sequence records. +use crate::DEFAULT_BLOCK_SIZE; +use crate::MimeTypeGuesser; +use crate::fasta; +use crate::fastq; +use crate::xopen::xopen; + use std::io::{self, Read}; use obikrope::Rope; @@ -14,19 +20,26 @@ pub type Splitter = fn(&Rope) -> Option; /// Iterator that reads from `R` in blocks and yields `Rope` chunks, /// each ending on a complete sequence record boundary. pub struct SeqChunkIter { - source: R, - rope: Rope, + mime_type: Option<&'static str>, + source: R, + rope: Rope, block_size: usize, - splitter: Splitter, - eof: bool, + splitter: Splitter, + eof: bool, } impl SeqChunkIter { /// Create a new iterator. - pub fn new(source: R, block_size: usize, splitter: Splitter) -> Self { + pub fn new( + source: R, + block_size: usize, + splitter: Splitter, + mime_type: Option<&'static str>, + ) -> Self { Self { + mime_type, source, - rope: Rope::new(), + rope: Rope::new(mime_type), block_size, splitter, eof: false, @@ -66,7 +79,10 @@ impl Iterator for SeqChunkIter { if self.rope.is_empty() { return None; } - return Some(Ok(std::mem::replace(&mut self.rope, Rope::new()))); + return Some(Ok(std::mem::replace( + &mut self.rope, + Rope::new(self.mime_type), + ))); } match self.read_block() { @@ -81,7 +97,9 @@ impl Iterator for SeqChunkIter { } if let Some(abs_offset) = (self.splitter)(&self.rope) { - let tail = self.rope.split_off(abs_offset) + let tail = self + .rope + .split_off(abs_offset) .expect("splitter returned valid offset"); let chunk = std::mem::replace(&mut self.rope, tail); return Some(Ok(chunk)); @@ -90,6 +108,88 @@ impl Iterator for SeqChunkIter { } } +/// Open `path` and iterate over its FASTA records as chunks. +pub fn read_fasta_chunks( + path: &str, +) -> io::Result>>> { + let input = match xopen(path) { + Ok(mut i) => { + if i.mime_type() == Some("text/fasta") { + i + } else { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "not a FASTA file", + )); + } + } + Err(e) => return Err(e), + }; + Ok(fasta_chunks(input)) +} + +/// Open `path` and iterate over its FASTQ records as chunks. +pub fn read_fastq_chunks( + path: &str, +) -> io::Result>>> { + let input = match xopen(path) { + Ok(mut i) => { + if i.mime_type() == Some("text/fastq") { + i + } else { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "not a FASTQ file", + )); + } + } + Err(e) => return Err(e), + }; + Ok(fastq_chunks(input)) +} + +/// Open `path` and auto-detect whether it is FASTA or FASTQ, then iterate over its records. +/// +/// Returns an error if the format cannot be identified as `text/fasta` or `text/fastq`. +pub fn read_sequence_chunks( + path: &str, +) -> io::Result>>> { + let input = match xopen(path) { + Ok(mut i) => match i.mime_type() { + Some("text/fasta") => fasta_chunks(i), + Some("text/fastq") => fastq_chunks(i), + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "not a FASTA or FASTQ file", + )); + } + }, + Err(e) => return Err(e), + }; + Ok(input) +} + +/// Create a FASTA chunk iterator over `source`. +pub fn fasta_chunks(source: R) -> SeqChunkIter { + SeqChunkIter::new( + source, + DEFAULT_BLOCK_SIZE, + fasta::end_of_last_fasta_entry, + Some("text/fasta"), + ) +} + +/// Create a FASTQ chunk iterator over `source`. +pub fn fastq_chunks(source: R) -> SeqChunkIter { + SeqChunkIter::new( + source, + DEFAULT_BLOCK_SIZE, + fastq::end_of_last_fastq_entry, + Some("text/fastq"), + ) +} + #[cfg(test)] mod tests { use super::*; @@ -97,11 +197,11 @@ mod tests { use crate::fastq::end_of_last_fastq_entry; fn fasta_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> { - SeqChunkIter::new(data, block_size, end_of_last_fasta_entry) + SeqChunkIter::new(data, block_size, end_of_last_fasta_entry, None) } fn fastq_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> { - SeqChunkIter::new(data, block_size, end_of_last_fastq_entry) + SeqChunkIter::new(data, block_size, end_of_last_fastq_entry, None) } fn rope_to_vec(rope: &Rope) -> Vec { @@ -134,7 +234,11 @@ mod tests { for rope in &chunks { let flat = rope_to_vec(rope); assert_eq!(flat[0], b'>', "block={block}: chunk doesn't start with '>'"); - assert_eq!(*flat.last().unwrap(), b'\n', "block={block}: chunk doesn't end with newline"); + assert_eq!( + *flat.last().unwrap(), + b'\n', + "block={block}: chunk doesn't end with newline" + ); } } } @@ -163,10 +267,10 @@ mod tests { #[test] fn fastq_at_in_quality_handled() { - let data = Box::leak(make_fastq(&[ - (b"ACGTACGT", b"@@@@IIII"), - (b"TTTTTTTT", b"HHHHHHHH"), - ]).into_boxed_slice()); + let data = Box::leak( + make_fastq(&[(b"ACGTACGT", b"@@@@IIII"), (b"TTTTTTTT", b"HHHHHHHH")]) + .into_boxed_slice(), + ); let chunks: Vec<_> = fastq_iter(data, 16).collect::>().unwrap(); let all: Vec = chunks.iter().flat_map(|r| rope_to_vec(r)).collect(); assert_eq!(all, *data); @@ -174,17 +278,23 @@ mod tests { #[test] fn fastq_each_chunk_starts_with_at() { - let data = Box::leak(make_fastq(&[ - (b"ACGT", b"IIII"), - (b"CCCC", b"JJJJ"), - (b"GGGG", b"KKKK"), - (b"TTTT", b"LLLL"), - ]).into_boxed_slice()); + let data = Box::leak( + make_fastq(&[ + (b"ACGT", b"IIII"), + (b"CCCC", b"JJJJ"), + (b"GGGG", b"KKKK"), + (b"TTTT", b"LLLL"), + ]) + .into_boxed_slice(), + ); for block in [18, 30, 60] { let chunks: Vec<_> = fastq_iter(data, block).collect::>().unwrap(); for rope in &chunks { let first_byte = rope_to_vec(rope)[0]; - assert_eq!(first_byte, b'@', "block={block}: chunk doesn't start with '@'"); + assert_eq!( + first_byte, b'@', + "block={block}: chunk doesn't start with '@'" + ); } } } diff --git a/src/obiread/src/fasta.rs b/src/obiread/src/fasta.rs index 4dd4602..b426ed6 100644 --- a/src/obiread/src/fasta.rs +++ b/src/obiread/src/fasta.rs @@ -39,13 +39,13 @@ mod tests { use super::*; fn rope(data: &[u8]) -> Rope { - let mut r = Rope::new(); + let mut r = Rope::new(None); r.push(data.to_vec()); r } fn rope2(a: &[u8], b: &[u8]) -> Rope { - let mut r = Rope::new(); + let mut r = Rope::new(None); r.push(a.to_vec()); r.push(b.to_vec()); r diff --git a/src/obiread/src/fastq.rs b/src/obiread/src/fastq.rs index 6f45ea8..39d3665 100644 --- a/src/obiread/src/fastq.rs +++ b/src/obiread/src/fastq.rs @@ -111,7 +111,7 @@ mod tests { use super::*; fn rope(data: &[u8]) -> Rope { - let mut r = Rope::new(); + let mut r = Rope::new(None); r.push(data.to_vec()); r } @@ -145,30 +145,32 @@ mod tests { let r = rope(&buf); let pos = end_of_last_fastq_entry(&r).unwrap(); assert_eq!(flat(&r)[pos], b'@'); - assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()); + assert_eq!( + &flat(&r)[pos..], + make_fastq(&[(b"TTTT", b"HHHH")]).as_slice() + ); } #[test] fn three_records_cuts_at_last() { - let buf = make_fastq(&[ - (b"ACGT", b"IIII"), - (b"CCCC", b"JJJJ"), - (b"GGGG", b"KKKK"), - ]); + let buf = make_fastq(&[(b"ACGT", b"IIII"), (b"CCCC", b"JJJJ"), (b"GGGG", b"KKKK")]); let r = rope(&buf); let pos = end_of_last_fastq_entry(&r).unwrap(); - assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"GGGG", b"KKKK")]).as_slice()); + assert_eq!( + &flat(&r)[pos..], + make_fastq(&[(b"GGGG", b"KKKK")]).as_slice() + ); } #[test] fn at_sign_in_quality_does_not_confuse() { - let buf = make_fastq(&[ - (b"ACGTACGT", b"@@@@IIII"), - (b"TTTT", b"HHHH"), - ]); + let buf = make_fastq(&[(b"ACGTACGT", b"@@@@IIII"), (b"TTTT", b"HHHH")]); let r = rope(&buf); let pos = end_of_last_fastq_entry(&r).unwrap(); - assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()); + assert_eq!( + &flat(&r)[pos..], + make_fastq(&[(b"TTTT", b"HHHH")]).as_slice() + ); } #[test] diff --git a/src/obiread/src/lib.rs b/src/obiread/src/lib.rs index 047d0b8..a07d250 100644 --- a/src/obiread/src/lib.rs +++ b/src/obiread/src/lib.rs @@ -8,29 +8,19 @@ pub mod chunk; mod fasta; mod fastq; -pub mod mimetype; +mod mimetype; pub mod normalize; mod path_iterator; pub mod peakreader; pub mod xopen; +pub use chunk::{SeqChunkIter, fasta_chunks, fastq_chunks, + read_fasta_chunks, read_fastq_chunks, read_sequence_chunks}; +pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk}; +pub use mimetype::MimeTypeGuesser; pub use path_iterator::{PathIter, path_iter}; pub use peakreader::PeekReader; - -use std::io::Read; - -use chunk::SeqChunkIter; pub use xopen::xopen; /// Default read block size: 1 MiB. pub const DEFAULT_BLOCK_SIZE: usize = 1024 * 1024; - -/// Create a FASTA chunk iterator over `source`. -pub fn fasta_chunks(source: R) -> SeqChunkIter { - SeqChunkIter::new(source, DEFAULT_BLOCK_SIZE, fasta::end_of_last_fasta_entry) -} - -/// Create a FASTQ chunk iterator over `source`. -pub fn fastq_chunks(source: R) -> SeqChunkIter { - SeqChunkIter::new(source, DEFAULT_BLOCK_SIZE, fastq::end_of_last_fastq_entry) -} diff --git a/src/obiread/src/mimetype.rs b/src/obiread/src/mimetype.rs index 78a66c7..906b627 100644 --- a/src/obiread/src/mimetype.rs +++ b/src/obiread/src/mimetype.rs @@ -1,3 +1,4 @@ +/// A struct that guesses the MIME type of a reader by peeking at the first few bytes. use std::io; use std::sync::LazyLock; @@ -35,13 +36,16 @@ static INFER: LazyLock = LazyLock::new(|| { infer }); +/// A struct that guesses the MIME type of a reader by peeking at the first few bytes. pub struct MimeTypeGuesser(PeekReader); impl MimeTypeGuesser { + /// Creates a new MimeTypeGuesser that wraps the given reader. pub fn new(reader: R) -> Self { Self(PeekReader::new(reader, BUF_SIZE)) } + /// Returns the detected MIME type, if any. pub fn mime_type(&mut self) -> Option<&'static str> { let buf = self.0.header(); INFER.get(buf).map(|kind| kind.mime_type()) diff --git a/src/obiread/src/normalize.rs b/src/obiread/src/normalize.rs index 6f37639..b92eb41 100644 --- a/src/obiread/src/normalize.rs +++ b/src/obiread/src/normalize.rs @@ -17,10 +17,31 @@ //! //! After the automaton returns, `wc.rope_start()` is the rope truncation point. +use std::io; + use obikrope::{ForwardCursor, Rope, RopeCursor}; // ── public entry points ─────────────────────────────────────────────────────── +/// Normalise a sequence chunk into a compact ACGT\x00-separated rope, +/// dispatching to the FASTA or FASTQ automaton based on the rope's mime type. +/// +/// Returns an error if the rope carries no mime type or an unsupported one. +pub fn normalize_sequence_chunk(rope: Rope, k: usize) -> io::Result { + match rope.mime_type() { + Some("text/fasta") => Ok(normalize_fasta_chunk(rope, k)), + Some("text/fastq") => Ok(normalize_fastq_chunk(rope, k)), + Some(other) => Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unsupported sequence format: {other}"), + )), + None => Err(io::Error::new( + io::ErrorKind::InvalidData, + "rope has no mime type", + )), + } +} + /// Normalise a FASTA chunk into a compact ACGT\x00-separated rope. pub fn normalize_fasta_chunk(mut rope: Rope, k: usize) -> Rope { let end = { @@ -47,7 +68,17 @@ pub fn normalize_fastq_chunk(mut rope: Rope, k: usize) -> Rope { // ── FASTA automaton ─────────────────────────────────────────────────────────── -fn normalize_fasta(read: &ForwardCursor<'_>, mut wc: &mut ForwardCursor<'_>, k: usize) { +/// Normalise a FASTA chunk into a compact ACGT\x00-separated rope. +/// +/// # Arguments +/// +/// * `read` - The input FASTA chunk cursor. +/// * `wc` - The output rope cursor. +/// * `k` - The k-mer size. +/// +/// The k-mer size is used to determine the minimum length of Sequence +/// to be considered valid. +fn normalize_fasta(read: &ForwardCursor<'_>, wc: &mut ForwardCursor<'_>, k: usize) { skip_line(read); loop { @@ -85,6 +116,17 @@ fn normalize_fasta(read: &ForwardCursor<'_>, mut wc: &mut ForwardCursor<'_>, k: // ── FASTQ automaton ─────────────────────────────────────────────────────────── +/// Normalizes a FASTQ read by skipping the header and sequence lines, +/// and writing the sequence to `wc`. +/// +/// # Arguments +/// +/// * `read` - The input FASTA chunk cursor. +/// * `wc` - The output rope cursor. +/// * `k` - The k-mer size. +/// +/// The k-mer size is used to determine the minimum length of Sequence +/// to be considered valid. fn normalize_fastq(read: &ForwardCursor<'_>, mut wc: &mut ForwardCursor<'_>, k: usize) { loop { skip_line(read); // skip header @@ -177,7 +219,7 @@ mod tests { use super::*; fn make_rope(data: &[u8]) -> Rope { - let mut r = Rope::new(); + let mut r = Rope::new(None); r.push(data.to_vec()); r } @@ -306,7 +348,7 @@ mod tests { fn multi_slice_rope() { let data = make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]); let mid = data.len() / 2; - let mut rope = Rope::new(); + let mut rope = Rope::new(None); rope.push(data[..mid].to_vec()); rope.push(data[mid..].to_vec()); assert_eq!( @@ -400,7 +442,7 @@ mod tests { fn fasta_multi_slice_rope() { let data = make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]); let mid = data.len() / 2; - let mut rope = Rope::new(); + let mut rope = Rope::new(None); rope.push(data[..mid].to_vec()); rope.push(data[mid..].to_vec()); assert_eq!( diff --git a/src/obiread/src/xopen.rs b/src/obiread/src/xopen.rs index 38ebbb4..f5dbbf0 100644 --- a/src/obiread/src/xopen.rs +++ b/src/obiread/src/xopen.rs @@ -12,8 +12,9 @@ //! | `http://` or `https://` | HTTP GET via `ureq` | //! | anything else | local file; `~/` is expanded to the home directory | -use std::io::{self, Read}; +use crate::mimetype::MimeTypeGuesser; use std::fs::File; +use std::io::{self, Read}; // ── public API ──────────────────────────────────────────────────────────────── @@ -25,18 +26,17 @@ use std::fs::File; /// # Errors /// Returns an `io::Error` if the file cannot be opened, the URL cannot be /// fetched, or the compression header is malformed. -pub fn xopen(source: &str) -> io::Result> { +pub fn xopen(source: &str) -> io::Result>> { let raw: Box = match source { "-" => Box::new(io::stdin()), - s if s.starts_with("http://") || s.starts_with("https://") => { - http_reader(s)? - } + s if s.starts_with("http://") || s.starts_with("https://") => http_reader(s)?, path => { let expanded = expand_tilde(path); Box::new(File::open(expanded.as_ref())?) } }; - decompress(raw) + let decompressed = decompress(raw)?; + Ok(MimeTypeGuesser::new(decompressed)) } // ── internal helpers ────────────────────────────────────────────────────────── diff --git a/src/obiskbuilder/src/iter.rs b/src/obiskbuilder/src/iter.rs index d6a06fb..0a64b12 100644 --- a/src/obiskbuilder/src/iter.rs +++ b/src/obiskbuilder/src/iter.rs @@ -156,7 +156,7 @@ mod tests { use obikrope::Rope; fn make_rope(data: &[u8]) -> Rope { - let mut r = Rope::new(); + let mut r = Rope::new(None); r.push(data.to_vec()); r } @@ -210,7 +210,7 @@ mod tests { fn multi_slice_rope() { let data = b"ACGTACGTACGT\x00"; let mid = data.len() / 2; - let mut rope = Rope::new(); + let mut rope = Rope::new(None); rope.push(data[..mid].to_vec()); rope.push(data[mid..].to_vec()); let out: Vec> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0)