⬆️ refactor superkmer to use obipipeline

- Replace manual threading with Pipeline abstraction from `obipipline`
- Remove crossbeam-channel dependency and format detection logic
- Introduce typed `PipelineData` enum for pipeline stages (RawChunk, Norm Chunk, Batch)
- Implement shared normalization and extraction steps as `SharedFn`ƒ
  - Add unsafe Send/Sync impls for PipelineData (Rope ownership is moved, not shared)
- Replace manual reader/worker/output threads with a single Pipeline execution
  - Uses `make_source_fallible!`, shared transform functions, and a sink for output
- Simplify argument handling (remove `--format` flag)
  - Update Cargo.toml: remove crossbeam-channel, add obipipeline
This commit is contained in:
Eric Coissac
2026-04-24 18:14:00 +02:00
parent 75bf980046
commit f1c8fc85c9
10 changed files with 236 additions and 75 deletions
+8 -5
View File
@@ -194,7 +194,10 @@ pub trait RopeCursor<'a> {
/// Use the returned value with [`SeekMode::Rope`] to restore a position,
/// or as a truncation point after a write pass.
fn rope_tell(&self) -> usize {
self.state().current.get().unwrap_or(self.state().offset.get())
self.state()
.current
.get()
.unwrap_or(self.state().offset.get())
}
/// Number of bytes visible through this cursor (`rope.len() - offset`).
@@ -528,13 +531,13 @@ mod tests {
use crate::Rope;
fn rope(data: &[u8]) -> Rope {
let mut r = Rope::new();
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
fn rope2(a: &[u8], b: &[u8]) -> Rope {
let mut r = Rope::new();
let mut r = Rope::new(None);
r.push(a.to_vec());
r.push(b.to_vec());
r
@@ -703,14 +706,14 @@ mod tests {
#[test]
fn backward_empty_rope_returns_error() {
let r = Rope::new();
let r = Rope::new(None);
let c = r.bw_cursor();
assert!(c.read_next().is_err());
}
#[test]
fn forward_empty_rope_returns_error() {
let r = Rope::new();
let r = Rope::new(None);
let c = r.fw_cursor();
assert!(c.read_next().is_err());
}
+16 -6
View File
@@ -28,6 +28,7 @@ use std::cell::Cell;
///
/// See the [module-level documentation][crate::rope] for a full overview.
pub struct Rope {
pub(crate) mime_type: Option<&'static str>,
pub(crate) blocks: Vec<Vec<Cell<u8>>>,
pub(crate) length: usize,
pub(crate) start_block_idx: Vec<usize>,
@@ -35,8 +36,9 @@ pub struct Rope {
impl Rope {
/// Create an empty rope (no allocations).
pub fn new() -> Self {
pub fn new(mime_type: Option<&'static str>) -> Self {
Self {
mime_type,
blocks: Vec::new(),
length: 0,
start_block_idx: Vec::new(),
@@ -59,6 +61,11 @@ impl Rope {
self.length += block_len;
}
/// The MIME type of the rope, if known.
pub fn mime_type(&self) -> Option<&str> {
self.mime_type.as_deref()
}
/// Total number of blocks.
pub fn n_blocks(&self) -> usize {
self.blocks.len()
@@ -115,7 +122,7 @@ impl Rope {
}
if pos == self.length {
return Ok(Rope::new());
return Ok(Rope::new(self.mime_type.clone()));
}
let (block_idx, from, _) = self.lookup(pos).ok_or_else(|| {
@@ -149,6 +156,7 @@ impl Rope {
self.length = pos;
Ok(Rope {
mime_type: self.mime_type.clone(),
blocks: tail_blocks,
length: tail_length,
start_block_idx: tail_starts,
@@ -185,13 +193,13 @@ mod tests {
}
fn make(data: &[u8]) -> Rope {
let mut r = Rope::new();
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
fn make2(a: &[u8], b: &[u8]) -> Rope {
let mut r = Rope::new();
let mut r = Rope::new(None);
r.push(a.to_vec());
r.push(b.to_vec());
r
@@ -201,7 +209,7 @@ mod tests {
#[test]
fn empty_rope_is_empty() {
let r = Rope::new();
let r = Rope::new(None);
assert!(r.is_empty());
assert_eq!(r.len(), 0);
assert_eq!(r.n_blocks(), 0);
@@ -213,6 +221,7 @@ mod tests {
assert!(!r.is_empty());
assert_eq!(r.len(), 5);
assert_eq!(r.n_blocks(), 1);
assert_eq!(r.mime_type(), None);
}
#[test]
@@ -220,6 +229,7 @@ mod tests {
let r = make2(b"abc", b"de");
assert_eq!(r.len(), 5);
assert_eq!(r.n_blocks(), 2);
assert_eq!(r.mime_type(), None);
}
#[test]
@@ -261,7 +271,7 @@ mod tests {
#[test]
fn lookup_empty_rope_returns_none() {
assert!(Rope::new().lookup(0).is_none());
assert!(Rope::new(None).lookup(0).is_none());
}
#[test]
+132 -22
View File
@@ -3,6 +3,12 @@
//! Each `Rope` yielded by [`SeqChunkIter`] contains one or more blocks that
//! together form a self-contained block of complete sequence records.
use crate::DEFAULT_BLOCK_SIZE;
use crate::MimeTypeGuesser;
use crate::fasta;
use crate::fastq;
use crate::xopen::xopen;
use std::io::{self, Read};
use obikrope::Rope;
@@ -14,19 +20,26 @@ pub type Splitter = fn(&Rope) -> Option<usize>;
/// Iterator that reads from `R` in blocks and yields `Rope` chunks,
/// each ending on a complete sequence record boundary.
pub struct SeqChunkIter<R> {
source: R,
rope: Rope,
mime_type: Option<&'static str>,
source: R,
rope: Rope,
block_size: usize,
splitter: Splitter,
eof: bool,
splitter: Splitter,
eof: bool,
}
impl<R: Read> SeqChunkIter<R> {
/// Create a new iterator.
pub fn new(source: R, block_size: usize, splitter: Splitter) -> Self {
pub fn new(
source: R,
block_size: usize,
splitter: Splitter,
mime_type: Option<&'static str>,
) -> Self {
Self {
mime_type,
source,
rope: Rope::new(),
rope: Rope::new(mime_type),
block_size,
splitter,
eof: false,
@@ -66,7 +79,10 @@ impl<R: Read> Iterator for SeqChunkIter<R> {
if self.rope.is_empty() {
return None;
}
return Some(Ok(std::mem::replace(&mut self.rope, Rope::new())));
return Some(Ok(std::mem::replace(
&mut self.rope,
Rope::new(self.mime_type),
)));
}
match self.read_block() {
@@ -81,7 +97,9 @@ impl<R: Read> Iterator for SeqChunkIter<R> {
}
if let Some(abs_offset) = (self.splitter)(&self.rope) {
let tail = self.rope.split_off(abs_offset)
let tail = self
.rope
.split_off(abs_offset)
.expect("splitter returned valid offset");
let chunk = std::mem::replace(&mut self.rope, tail);
return Some(Ok(chunk));
@@ -90,6 +108,88 @@ impl<R: Read> Iterator for SeqChunkIter<R> {
}
}
/// Open `path` and iterate over its FASTA records as chunks.
pub fn read_fasta_chunks(
path: &str,
) -> io::Result<SeqChunkIter<MimeTypeGuesser<Box<dyn Read + Send>>>> {
let input = match xopen(path) {
Ok(mut i) => {
if i.mime_type() == Some("text/fasta") {
i
} else {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"not a FASTA file",
));
}
}
Err(e) => return Err(e),
};
Ok(fasta_chunks(input))
}
/// Open `path` and iterate over its FASTQ records as chunks.
pub fn read_fastq_chunks(
path: &str,
) -> io::Result<SeqChunkIter<MimeTypeGuesser<Box<dyn Read + Send>>>> {
let input = match xopen(path) {
Ok(mut i) => {
if i.mime_type() == Some("text/fastq") {
i
} else {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"not a FASTQ file",
));
}
}
Err(e) => return Err(e),
};
Ok(fastq_chunks(input))
}
/// Open `path` and auto-detect whether it is FASTA or FASTQ, then iterate over its records.
///
/// Returns an error if the format cannot be identified as `text/fasta` or `text/fastq`.
pub fn read_sequence_chunks(
path: &str,
) -> io::Result<SeqChunkIter<MimeTypeGuesser<Box<dyn Read + Send>>>> {
let input = match xopen(path) {
Ok(mut i) => match i.mime_type() {
Some("text/fasta") => fasta_chunks(i),
Some("text/fastq") => fastq_chunks(i),
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"not a FASTA or FASTQ file",
));
}
},
Err(e) => return Err(e),
};
Ok(input)
}
/// Create a FASTA chunk iterator over `source`.
pub fn fasta_chunks<R: Read>(source: R) -> SeqChunkIter<R> {
SeqChunkIter::new(
source,
DEFAULT_BLOCK_SIZE,
fasta::end_of_last_fasta_entry,
Some("text/fasta"),
)
}
/// Create a FASTQ chunk iterator over `source`.
pub fn fastq_chunks<R: Read>(source: R) -> SeqChunkIter<R> {
SeqChunkIter::new(
source,
DEFAULT_BLOCK_SIZE,
fastq::end_of_last_fastq_entry,
Some("text/fastq"),
)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -97,11 +197,11 @@ mod tests {
use crate::fastq::end_of_last_fastq_entry;
fn fasta_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> {
SeqChunkIter::new(data, block_size, end_of_last_fasta_entry)
SeqChunkIter::new(data, block_size, end_of_last_fasta_entry, None)
}
fn fastq_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> {
SeqChunkIter::new(data, block_size, end_of_last_fastq_entry)
SeqChunkIter::new(data, block_size, end_of_last_fastq_entry, None)
}
fn rope_to_vec(rope: &Rope) -> Vec<u8> {
@@ -134,7 +234,11 @@ mod tests {
for rope in &chunks {
let flat = rope_to_vec(rope);
assert_eq!(flat[0], b'>', "block={block}: chunk doesn't start with '>'");
assert_eq!(*flat.last().unwrap(), b'\n', "block={block}: chunk doesn't end with newline");
assert_eq!(
*flat.last().unwrap(),
b'\n',
"block={block}: chunk doesn't end with newline"
);
}
}
}
@@ -163,10 +267,10 @@ mod tests {
#[test]
fn fastq_at_in_quality_handled() {
let data = Box::leak(make_fastq(&[
(b"ACGTACGT", b"@@@@IIII"),
(b"TTTTTTTT", b"HHHHHHHH"),
]).into_boxed_slice());
let data = Box::leak(
make_fastq(&[(b"ACGTACGT", b"@@@@IIII"), (b"TTTTTTTT", b"HHHHHHHH")])
.into_boxed_slice(),
);
let chunks: Vec<_> = fastq_iter(data, 16).collect::<Result<_, _>>().unwrap();
let all: Vec<u8> = chunks.iter().flat_map(|r| rope_to_vec(r)).collect();
assert_eq!(all, *data);
@@ -174,17 +278,23 @@ mod tests {
#[test]
fn fastq_each_chunk_starts_with_at() {
let data = Box::leak(make_fastq(&[
(b"ACGT", b"IIII"),
(b"CCCC", b"JJJJ"),
(b"GGGG", b"KKKK"),
(b"TTTT", b"LLLL"),
]).into_boxed_slice());
let data = Box::leak(
make_fastq(&[
(b"ACGT", b"IIII"),
(b"CCCC", b"JJJJ"),
(b"GGGG", b"KKKK"),
(b"TTTT", b"LLLL"),
])
.into_boxed_slice(),
);
for block in [18, 30, 60] {
let chunks: Vec<_> = fastq_iter(data, block).collect::<Result<_, _>>().unwrap();
for rope in &chunks {
let first_byte = rope_to_vec(rope)[0];
assert_eq!(first_byte, b'@', "block={block}: chunk doesn't start with '@'");
assert_eq!(
first_byte, b'@',
"block={block}: chunk doesn't start with '@'"
);
}
}
}
+2 -2
View File
@@ -39,13 +39,13 @@ mod tests {
use super::*;
fn rope(data: &[u8]) -> Rope {
let mut r = Rope::new();
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
fn rope2(a: &[u8], b: &[u8]) -> Rope {
let mut r = Rope::new();
let mut r = Rope::new(None);
r.push(a.to_vec());
r.push(b.to_vec());
r
+15 -13
View File
@@ -111,7 +111,7 @@ mod tests {
use super::*;
fn rope(data: &[u8]) -> Rope {
let mut r = Rope::new();
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
@@ -145,30 +145,32 @@ mod tests {
let r = rope(&buf);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(flat(&r)[pos], b'@');
assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice());
assert_eq!(
&flat(&r)[pos..],
make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()
);
}
#[test]
fn three_records_cuts_at_last() {
let buf = make_fastq(&[
(b"ACGT", b"IIII"),
(b"CCCC", b"JJJJ"),
(b"GGGG", b"KKKK"),
]);
let buf = make_fastq(&[(b"ACGT", b"IIII"), (b"CCCC", b"JJJJ"), (b"GGGG", b"KKKK")]);
let r = rope(&buf);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"GGGG", b"KKKK")]).as_slice());
assert_eq!(
&flat(&r)[pos..],
make_fastq(&[(b"GGGG", b"KKKK")]).as_slice()
);
}
#[test]
fn at_sign_in_quality_does_not_confuse() {
let buf = make_fastq(&[
(b"ACGTACGT", b"@@@@IIII"),
(b"TTTT", b"HHHH"),
]);
let buf = make_fastq(&[(b"ACGTACGT", b"@@@@IIII"), (b"TTTT", b"HHHH")]);
let r = rope(&buf);
let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice());
assert_eq!(
&flat(&r)[pos..],
make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()
);
}
#[test]
+5 -15
View File
@@ -8,29 +8,19 @@
pub mod chunk;
mod fasta;
mod fastq;
pub mod mimetype;
mod mimetype;
pub mod normalize;
mod path_iterator;
pub mod peakreader;
pub mod xopen;
pub use chunk::{SeqChunkIter, fasta_chunks, fastq_chunks,
read_fasta_chunks, read_fastq_chunks, read_sequence_chunks};
pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk};
pub use mimetype::MimeTypeGuesser;
pub use path_iterator::{PathIter, path_iter};
pub use peakreader::PeekReader;
use std::io::Read;
use chunk::SeqChunkIter;
pub use xopen::xopen;
/// Default read block size: 1 MiB.
pub const DEFAULT_BLOCK_SIZE: usize = 1024 * 1024;
/// Create a FASTA chunk iterator over `source`.
pub fn fasta_chunks<R: Read>(source: R) -> SeqChunkIter<R> {
SeqChunkIter::new(source, DEFAULT_BLOCK_SIZE, fasta::end_of_last_fasta_entry)
}
/// Create a FASTQ chunk iterator over `source`.
pub fn fastq_chunks<R: Read>(source: R) -> SeqChunkIter<R> {
SeqChunkIter::new(source, DEFAULT_BLOCK_SIZE, fastq::end_of_last_fastq_entry)
}
+4
View File
@@ -1,3 +1,4 @@
/// A struct that guesses the MIME type of a reader by peeking at the first few bytes.
use std::io;
use std::sync::LazyLock;
@@ -35,13 +36,16 @@ static INFER: LazyLock<Infer> = LazyLock::new(|| {
infer
});
/// A struct that guesses the MIME type of a reader by peeking at the first few bytes.
pub struct MimeTypeGuesser<R: std::io::Read>(PeekReader<R>);
impl<R: std::io::Read> MimeTypeGuesser<R> {
/// Creates a new MimeTypeGuesser that wraps the given reader.
pub fn new(reader: R) -> Self {
Self(PeekReader::new(reader, BUF_SIZE))
}
/// Returns the detected MIME type, if any.
pub fn mime_type(&mut self) -> Option<&'static str> {
let buf = self.0.header();
INFER.get(buf).map(|kind| kind.mime_type())
+46 -4
View File
@@ -17,10 +17,31 @@
//!
//! After the automaton returns, `wc.rope_start()` is the rope truncation point.
use std::io;
use obikrope::{ForwardCursor, Rope, RopeCursor};
// ── public entry points ───────────────────────────────────────────────────────
/// Normalise a sequence chunk into a compact ACGT\x00-separated rope,
/// dispatching to the FASTA or FASTQ automaton based on the rope's mime type.
///
/// Returns an error if the rope carries no mime type or an unsupported one.
pub fn normalize_sequence_chunk(rope: Rope, k: usize) -> io::Result<Rope> {
match rope.mime_type() {
Some("text/fasta") => Ok(normalize_fasta_chunk(rope, k)),
Some("text/fastq") => Ok(normalize_fastq_chunk(rope, k)),
Some(other) => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unsupported sequence format: {other}"),
)),
None => Err(io::Error::new(
io::ErrorKind::InvalidData,
"rope has no mime type",
)),
}
}
/// Normalise a FASTA chunk into a compact ACGT\x00-separated rope.
pub fn normalize_fasta_chunk(mut rope: Rope, k: usize) -> Rope {
let end = {
@@ -47,7 +68,17 @@ pub fn normalize_fastq_chunk(mut rope: Rope, k: usize) -> Rope {
// ── FASTA automaton ───────────────────────────────────────────────────────────
fn normalize_fasta(read: &ForwardCursor<'_>, mut wc: &mut ForwardCursor<'_>, k: usize) {
/// Normalise a FASTA chunk into a compact ACGT\x00-separated rope.
///
/// # Arguments
///
/// * `read` - The input FASTA chunk cursor.
/// * `wc` - The output rope cursor.
/// * `k` - The k-mer size.
///
/// The k-mer size is used to determine the minimum length of Sequence
/// to be considered valid.
fn normalize_fasta(read: &ForwardCursor<'_>, wc: &mut ForwardCursor<'_>, k: usize) {
skip_line(read);
loop {
@@ -85,6 +116,17 @@ fn normalize_fasta(read: &ForwardCursor<'_>, mut wc: &mut ForwardCursor<'_>, k:
// ── FASTQ automaton ───────────────────────────────────────────────────────────
/// Normalizes a FASTQ read by skipping the header and sequence lines,
/// and writing the sequence to `wc`.
///
/// # Arguments
///
/// * `read` - The input FASTA chunk cursor.
/// * `wc` - The output rope cursor.
/// * `k` - The k-mer size.
///
/// The k-mer size is used to determine the minimum length of Sequence
/// to be considered valid.
fn normalize_fastq(read: &ForwardCursor<'_>, mut wc: &mut ForwardCursor<'_>, k: usize) {
loop {
skip_line(read); // skip header
@@ -177,7 +219,7 @@ mod tests {
use super::*;
fn make_rope(data: &[u8]) -> Rope {
let mut r = Rope::new();
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
@@ -306,7 +348,7 @@ mod tests {
fn multi_slice_rope() {
let data = make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]);
let mid = data.len() / 2;
let mut rope = Rope::new();
let mut rope = Rope::new(None);
rope.push(data[..mid].to_vec());
rope.push(data[mid..].to_vec());
assert_eq!(
@@ -400,7 +442,7 @@ mod tests {
fn fasta_multi_slice_rope() {
let data = make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]);
let mid = data.len() / 2;
let mut rope = Rope::new();
let mut rope = Rope::new(None);
rope.push(data[..mid].to_vec());
rope.push(data[mid..].to_vec());
assert_eq!(
+6 -6
View File
@@ -12,8 +12,9 @@
//! | `http://` or `https://` | HTTP GET via `ureq` |
//! | anything else | local file; `~/` is expanded to the home directory |
use std::io::{self, Read};
use crate::mimetype::MimeTypeGuesser;
use std::fs::File;
use std::io::{self, Read};
// ── public API ────────────────────────────────────────────────────────────────
@@ -25,18 +26,17 @@ use std::fs::File;
/// # Errors
/// Returns an `io::Error` if the file cannot be opened, the URL cannot be
/// fetched, or the compression header is malformed.
pub fn xopen(source: &str) -> io::Result<Box<dyn Read + Send>> {
pub fn xopen(source: &str) -> io::Result<MimeTypeGuesser<Box<dyn Read + Send>>> {
let raw: Box<dyn Read + Send> = match source {
"-" => Box::new(io::stdin()),
s if s.starts_with("http://") || s.starts_with("https://") => {
http_reader(s)?
}
s if s.starts_with("http://") || s.starts_with("https://") => http_reader(s)?,
path => {
let expanded = expand_tilde(path);
Box::new(File::open(expanded.as_ref())?)
}
};
decompress(raw)
let decompressed = decompress(raw)?;
Ok(MimeTypeGuesser::new(decompressed))
}
// ── internal helpers ──────────────────────────────────────────────────────────
+2 -2
View File
@@ -156,7 +156,7 @@ mod tests {
use obikrope::Rope;
fn make_rope(data: &[u8]) -> Rope {
let mut r = Rope::new();
let mut r = Rope::new(None);
r.push(data.to_vec());
r
}
@@ -210,7 +210,7 @@ mod tests {
fn multi_slice_rope() {
let data = b"ACGTACGTACGT\x00";
let mid = data.len() / 2;
let mut rope = Rope::new();
let mut rope = Rope::new(None);
rope.push(data[..mid].to_vec());
rope.push(data[mid..].to_vec());
let out: Vec<Vec<u8>> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0)