diff --git a/docmd/implementation/chunkreader.md b/docmd/implementation/chunkreader.md index 4021c57..06184d1 100644 --- a/docmd/implementation/chunkreader.md +++ b/docmd/implementation/chunkreader.md @@ -1,6 +1,24 @@ # Chunk reader — implementation -The `obiread` crate provides a streaming iterator that reads FASTA or FASTQ files in fixed-size blocks and yields self-contained chunks, each ending on a complete sequence record boundary. Chunks are consumed in parallel by downstream workers. +`obiread` exposes two distinct sequence reading paths, each optimised for a different use case. + +## Two reading paths + +| Path | API | Output unit | Per-record identity | Use case | +|------|-----|-------------|---------------------|----------| +| **Record path** | `read_sequence_chunks` → `parse_chunk` | `SeqRecord` (id + raw sequence + normalised rope) | yes | `query` — must read complete records | +| **Stream path** | `open_nuc_stream` | `NucPage` (flat normalised byte buffer) | no | `index`, `superkmer` — bulk throughput | + +The record path uses `Rope`-backed chunks and is described in detail below. +The stream path (`NucStream` / `NucPage`) is described in the scatter section of [pipeline](pipeline.md). + +--- + +## Record path: chunk reader + +The chunk reader reads FASTA or FASTQ files in fixed-size blocks and yields self-contained chunks, each ending on a complete sequence record boundary. `parse_chunk` then converts each chunk into a `Vec`, where each record carries its identifier, raw sequence bytes, and a normalised rope ready for superkmer building. + +This path is mandatory for `query`, where superkmers must be tracked back to their originating sequence (id, kmer offset) for output annotation. ## Output type: Rope diff --git a/docmd/implementation/pipeline.md b/docmd/implementation/pipeline.md index 2cfac4b..b95a075 100644 --- a/docmd/implementation/pipeline.md +++ b/docmd/implementation/pipeline.md @@ -19,7 +19,11 @@ The histogram gives: ## Phase 1 — Scatter -Single streaming pass over raw input files (FASTA/FASTQ, gzip). FASTQ quality scores are ignored. For each read: +Single streaming pass over raw input files (FASTA/FASTQ, gzip). FASTQ quality scores are ignored. + +Input files are read via `open_nuc_stream`, which opens and decompresses the file, auto-detects the format (FASTA / FASTQ / GenBank), and yields a sequence of `NucPage` buffers. Each `NucPage` is a flat 64 KB buffer of normalised bytes (`ACGT` + `\x00` separators), carrying a k−1 byte overlap from the preceding page so that no k-mer is lost at page boundaries. Per-record identity (sequence id, raw bytes) is not preserved; this is intentional — the scatter phase only needs normalised bases to produce superkmers. + +For each read fragment within a page: 1. **Ambiguous base filter**: cut at any non-ACGT base; discard fragments shorter than k. 2. **Entropy filter**: scan each fragment with a sliding window of size k. When the kmer $K_i = S[i \mathinner{..} i+k-1]$ ended by nucleotide $S[j]$ (with $j = i+k-1$) has entropy below threshold $\theta$, emit the current segment and start a new one (see algorithm below). $K_i$ belongs to neither segment, and no valid kmer is lost. diff --git a/src/obikmer/src/cli.rs b/src/obikmer/src/cli.rs index 82c3a38..8a84c03 100644 --- a/src/obikmer/src/cli.rs +++ b/src/obikmer/src/cli.rs @@ -1,9 +1,8 @@ -use std::io; use std::path::PathBuf; use std::sync::{Arc, Condvar, Mutex}; use clap::Args; -use obikrope::Rope; +use obiread::NucPage; use obikseq::RoutableSuperKmer; // ── Shared arguments ────────────────────────────────────────────────────────── @@ -45,9 +44,11 @@ pub struct CommonArgs { )] pub threads: usize, - /// Maximum number of input files open simultaneously - #[arg(long, default_value_t = 20)] - pub max_open_files: usize, + /// Maximum number of input files open simultaneously. + /// Defaults to threads/4 (minimum 1). Keep below the number of workers + /// to ensure CPU workers are always available for the transform stage. + #[arg(long)] + pub max_open_files: Option, } /// Smallest `b` such that `2^b >= n` (i.e. `n.next_power_of_two().ilog2()`). @@ -86,6 +87,12 @@ impl CommonArgs { } } + pub fn effective_max_open(&self) -> usize { + self.max_open_files + .unwrap_or_else(|| (self.threads / 4).max(1)) + .max(1) + } + pub fn seqfile_paths(&self) -> obiread::PathIter { let paths: Vec = if self.inputs.is_empty() { vec![PathBuf::from("-")] @@ -144,13 +151,10 @@ pub struct PathWithSlot { pub enum PipelineData { Path(PathWithSlot), - RawChunk(Rope), - NormChunk(Rope), + NucPage(NucPage), Batch(Vec), } -// SAFETY: Rope contains Cell which is !Sync, but pipeline ownership transfers -// exclusively through channels — no item is ever shared across threads. unsafe impl Send for PipelineData {} unsafe impl Sync for PipelineData {} @@ -171,18 +175,3 @@ pub fn throttle_paths( }) } -// ── I/O plumbing ────────────────────────────────────────────────────────────── - -pub fn open_chunks(path: PathBuf) -> io::Result> { - let path_str = path - .to_str() - .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "non-UTF-8 path"))?; - let iter = obiread::read_sequence_chunks(path_str)?; - Ok(iter.filter_map(|r| match r { - Ok(rope) => Some(rope), - Err(e) => { - eprintln!("chunk read error: {e}"); - None - } - })) -} diff --git a/src/obikmer/src/cmd/index.rs b/src/obikmer/src/cmd/index.rs index 13268d8..1a796f9 100644 --- a/src/obikmer/src/cmd/index.rs +++ b/src/obikmer/src/cmd/index.rs @@ -231,7 +231,7 @@ pub fn run(args: IndexArgs) { let theta = args.common.theta; let n_workers = args.common.threads.max(1); - let max_open = args.common.max_open_files.max(1); + let max_open = args.common.effective_max_open(); scatter(idx.partition_mut(), args.common.seqfile_paths(), k, level_max, theta, n_workers, max_open, &mut rep); idx.mark_scattered().unwrap_or_else(|e| { diff --git a/src/obikmer/src/cmd/superkmer.rs b/src/obikmer/src/cmd/superkmer.rs index a1c6398..1b0d7cd 100644 --- a/src/obikmer/src/cmd/superkmer.rs +++ b/src/obikmer/src/cmd/superkmer.rs @@ -4,7 +4,7 @@ use clap::Args; use obifastwrite::write_scatter; use obikseq::{RoutableSuperKmer, set_k, set_m}; -use crate::cli::{CommonArgs, PipelineData, PathWithSlot, open_chunks, partitions_to_bits, throttle_paths}; +use crate::cli::{CommonArgs, PipelineData, PathWithSlot, partitions_to_bits, throttle_paths}; #[derive(Args)] pub struct SuperkmerArgs { @@ -41,7 +41,7 @@ pub fn run(args: SuperkmerArgs) { let level_max = args.common.level_max; let partition_bits = partitions_to_bits(args.common.partitions); let n_workers = args.common.threads.max(1); - let max_open = args.common.max_open_files.max(1); + let max_open = args.common.effective_max_open(); set_k(k); set_m(m); @@ -50,9 +50,14 @@ pub fn run(args: SuperkmerArgs) { let pipe = obipipeline::make_pipe! { PipelineData : PathWithSlot => Vec, - ||? { |pw: PathWithSlot| open_chunks(pw.path) } : Path => RawChunk, - |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, - | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) } : NormChunk => Batch, + ||? { + let k = k; + move |pw: PathWithSlot| { + let path_str = pw.path.to_str().unwrap_or("").to_owned(); + obiread::open_nuc_stream(&path_str, k) + } + } : Path => NucPage, + | { move |page| obiskbuilder::build_superkmers_page(page, k, level_max, theta) } : NucPage => Batch, }; let mut out = BufWriter::new(io::stdout()); diff --git a/src/obikmer/src/steps/scatter.rs b/src/obikmer/src/steps/scatter.rs index 777e92b..80d789e 100644 --- a/src/obikmer/src/steps/scatter.rs +++ b/src/obikmer/src/steps/scatter.rs @@ -1,30 +1,37 @@ use std::path::PathBuf; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use indicatif::{ProgressBar, ProgressStyle}; +use obiread::NucPage; use obikpartitionner::KmerPartition; -use obikrope::Rope; use obisys::{Reporter, Stage}; use tracing::info; -use crate::cli::{PipelineData, PathWithSlot, open_chunks, throttle_paths}; +use crate::cli::{PipelineData, PathWithSlot, throttle_paths}; // ── Iterator that keeps the slot guard alive until the file is exhausted ────── -struct GuardedIter { - inner: I, - _guard: Box, +struct GuardedIter { + inner: Box + Send>, + _guard: Box, + flat_active: Arc, } -impl> Iterator for GuardedIter { - type Item = Rope; - fn next(&mut self) -> Option { +impl Iterator for GuardedIter { + type Item = NucPage; + fn next(&mut self) -> Option { self.inner.next() } } +impl Drop for GuardedIter { + fn drop(&mut self) { + self.flat_active.fetch_sub(1, Ordering::Relaxed); + } +} + // ── scatter ─────────────────────────────────────────────────────────────────── /// Run scatter: normalise → build superkmers → route to partition → close. @@ -44,23 +51,36 @@ pub fn scatter( // Throttle in the source thread — never in a worker — to prevent deadlock. let throttled = throttle_paths(path_source, max_open); - let file_count = Arc::new(AtomicU64::new(0)); + let file_count = Arc::new(AtomicU64::new(0)); + let flat_active = Arc::new(AtomicU32::new(0)); + let transform_active = Arc::new(AtomicU32::new(0)); let t = Stage::start("scatter"); let pipe = obipipeline::make_pipe! { PipelineData : PathWithSlot => Vec, ||? { - let file_count = Arc::clone(&file_count); + let file_count = Arc::clone(&file_count); + let flat_active = Arc::clone(&flat_active); + let k = k; move |pw: PathWithSlot| { let PathWithSlot { path, _guard } = pw; let n = file_count.fetch_add(1, Ordering::Relaxed) + 1; info!("indexing [{}]: {}", n, path.display()); - // _guard travels into GuardedIter; released when all chunks are read - open_chunks(path).map(|iter| GuardedIter { inner: iter, _guard }) + let path_str = path.to_str().unwrap_or("").to_owned(); + flat_active.fetch_add(1, Ordering::Relaxed); + obiread::open_nuc_stream(&path_str, k) + .map(|iter| GuardedIter { inner: iter, _guard, flat_active: Arc::clone(&flat_active) }) } - } : Path => RawChunk, - |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, - | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) } : NormChunk => Batch, + } : Path => NucPage, + | { + let transform_active = Arc::clone(&transform_active); + move |page| { + transform_active.fetch_add(1, Ordering::Relaxed); + let result = obiskbuilder::build_superkmers_page(page, k, level_max, theta); + transform_active.fetch_sub(1, Ordering::Relaxed); + result + } + } : NucPage => Batch, }; let pb = ProgressBar::new_spinner(); @@ -93,7 +113,9 @@ pub fn scatter( (format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6)) }; let n_files = file_count.load(Ordering::Relaxed); - pb.set_message(format!("{count_str} {rate_str} {n_files} files")); + let r = flat_active.load(Ordering::Relaxed); + let c = transform_active.load(Ordering::Relaxed); + pb.set_message(format!("{count_str} {rate_str} {n_files} files [R:{r} C:{c}]")); } kp.write_batch(batch).unwrap_or_else(|e| { eprintln!("error: {e}"); diff --git a/src/obiread/src/lib.rs b/src/obiread/src/lib.rs index 94f0b8e..2370bb4 100644 --- a/src/obiread/src/lib.rs +++ b/src/obiread/src/lib.rs @@ -10,19 +10,21 @@ mod fasta; mod fastq; mod mimetype; pub mod normalize; +mod nucstream; mod path_iterator; pub mod peakreader; pub mod record; -pub mod stream; pub mod xopen; -pub use chunk::{SeqChunkIter, fasta_chunks, fastq_chunks, - read_fasta_chunks, read_fastq_chunks, read_sequence_chunks}; -pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk}; +pub use chunk::{ + SeqChunkIter, fasta_chunks, fastq_chunks, read_fasta_chunks, read_fastq_chunks, + read_sequence_chunks, +}; pub use mimetype::MimeTypeGuesser; +pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk}; +pub use nucstream::{NucPage, NucPageCursor, open_nuc_stream}; pub use path_iterator::{PathIter, path_iter}; pub use peakreader::PeekReader; -pub use stream::NormalizedByteStream; pub use xopen::xopen; /// Default read block size: 1 MiB. diff --git a/src/obiread/src/nucstream.rs b/src/obiread/src/nucstream.rs new file mode 100644 index 0000000..eb252d6 --- /dev/null +++ b/src/obiread/src/nucstream.rs @@ -0,0 +1,732 @@ +use std::io::{self, Read}; +use std::mem::ManuallyDrop; +use std::sync::{Arc, Mutex}; + +use crate::mimetype::MimeTypeGuesser; +use crate::xopen::open_raw; + +pub const MAX_K: usize = 31; +const PAGE_SIZE: usize = 65536; +// overlap (MAX_K - 1) + page data (PAGE_SIZE) + 1 byte for the end-of-page terminating 0 +const BUF_SIZE: usize = MAX_K + PAGE_SIZE; + +// ─── OverlapState ───────────────────────────────────────────────────────────── + +pub(crate) struct OverlapState { + data: [u8; MAX_K], + len: usize, + k: usize, +} + +impl OverlapState { + pub(crate) fn new(k: usize) -> Self { + assert!(k > 0 && k <= MAX_K); + Self { + data: [0u8; MAX_K], + len: 0, + k, + } + } +} + +// ─── NucParser trait ────────────────────────────────────────────────────────── + +// Transforms a raw page into a compacted nucleotide stream in-place. +// +// Buffer layout on each call: +// buf[0..overlap_len()] — overlap bytes copied by write_overlap() +// buf[overlap_len()..overlap_len()+n] — raw bytes just read from the source +// +// Returns the number of output bytes in buf[0..returned]. +pub(crate) trait NucParser { + // required: format-specific + fn new(k: usize) -> Self + where + Self: Sized; + fn overlap_state(&self) -> &OverlapState; + fn overlap_state_mut(&mut self) -> &mut OverlapState; + fn is_in_seq(&self) -> bool; + fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize; + + // provided: format-independent overlap management + fn overlap_len(&self) -> usize { + self.overlap_state().len + } + + fn write_overlap(&self, buf: &mut [u8]) { + let ol = &self.overlap_state(); + buf[..ol.len].copy_from_slice(&ol.data[..ol.len]); + } + + // Called at end of parse_inplace: saves overlap state and returns adjusted j. + // seq_start is the j-position where the last sequence started in this call's output. + fn save_overlap(&mut self, buf: &mut [u8], j: usize, seq_start: usize) -> usize { + if !self.is_in_seq() { + self.overlap_state_mut().len = 0; + return j; + } + let seq_len = j - seq_start; + let k = self.overlap_state().k; + if seq_len >= k { + // Sequence long enough: save last k-1 nucleotides, terminate with 0. + let ol = k - 1; + self.overlap_state_mut().data[..ol].copy_from_slice(&buf[j - ol..j]); + self.overlap_state_mut().len = ol; + // SAFETY: j <= total - 1 < BUF_SIZE = buf.len() + // (total = overlap_len + n <= (MAX_K-1) + PAGE_SIZE = BUF_SIZE - 1) + unsafe { + *buf.get_unchecked_mut(j) = 0; + } + j + 1 + } else if seq_len > 0 { + // Short sequence (< k): save whole fragment, strip from output. + self.overlap_state_mut().data[..seq_len].copy_from_slice(&buf[seq_start..j]); + self.overlap_state_mut().len = seq_len; + seq_start + } else { + self.overlap_state_mut().len = 0; + j + } + } +} + +// ─── FASTA parser ───────────────────────────────────────────────────────────── + +#[derive(Clone, Copy)] +enum FastaState { + OutSeq, + InTitle, + InSeq, + InAmbiguous, +} + +pub(crate) struct FastaParser { + state: FastaState, + overlap: OverlapState, +} + +impl NucParser for FastaParser { + fn new(k: usize) -> Self { + Self { + state: FastaState::OutSeq, + overlap: OverlapState::new(k), + } + } + + #[inline] + fn overlap_state(&self) -> &OverlapState { + &self.overlap + } + + #[inline] + fn overlap_state_mut(&mut self) -> &mut OverlapState { + &mut self.overlap + } + + #[inline] + fn is_in_seq(&self) -> bool { + matches!(self.state, FastaState::InSeq) + } + + fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize { + let total = self.overlap.len + n; + let mut i = 0; // read index + let mut j = 0; // write index (invariant: j <= i always) + // j-position where the current sequence started in this call's output; + // meaningful only when state is InSeq. + let mut seq_start: usize = 0; + + while i < total { + // SAFETY: i < total <= BUF_SIZE = buf.len() + let byte = unsafe { *buf.get_unchecked(i) }; + + match self.state { + FastaState::OutSeq => { + if byte == b'>' { + self.state = FastaState::InTitle; + } + i += 1; + } + FastaState::InTitle => { + if byte == b'\n' || byte == b'\r' { + self.state = FastaState::InSeq; + seq_start = j; + } + i += 1; + } + FastaState::InSeq => { + if byte == b'\n' || byte == b'\r' { + i += 1; + continue; + } + let nuc = byte & 0xDF; // to uppercase + if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' { + // SAFETY: j <= i < total <= BUF_SIZE = buf.len() + unsafe { + *buf.get_unchecked_mut(j) = nuc; + } + j += 1; + i += 1; + } else if byte == b'>' { + if j > seq_start { + unsafe { + *buf.get_unchecked_mut(j) = 0; + } + j += 1; + } + self.state = FastaState::InTitle; + i += 1; + } else { + // first ambiguous base: end current sequence if non-empty + if j > seq_start { + unsafe { + *buf.get_unchecked_mut(j) = 0; + } + j += 1; + } + self.state = FastaState::InAmbiguous; + i += 1; + } + } + FastaState::InAmbiguous => { + if byte == b'\n' || byte == b'\r' { + i += 1; + continue; + } + if byte == b'>' { + self.state = FastaState::InTitle; + i += 1; + continue; + } + let nuc = byte & 0xDF; + if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' { + seq_start = j; + // SAFETY: j <= i < total <= BUF_SIZE = buf.len() + unsafe { + *buf.get_unchecked_mut(j) = nuc; + } + j += 1; + self.state = FastaState::InSeq; + } + i += 1; + } + } + } + + self.save_overlap(buf, j, seq_start) + } +} + +// ─── FASTQ parser ───────────────────────────────────────────────────────────── + +#[derive(Clone, Copy)] +enum FastqState { + OutSeq, + InTitle, + InSeq, + InAmbiguous, + InQualTitle, + InQual, +} + +pub(crate) struct FastqParser { + state: FastqState, + overlap: OverlapState, +} + +impl NucParser for FastqParser { + fn new(k: usize) -> Self { + Self { + state: FastqState::OutSeq, + overlap: OverlapState::new(k), + } + } + + #[inline] + fn overlap_state(&self) -> &OverlapState { + &self.overlap + } + + #[inline] + fn overlap_state_mut(&mut self) -> &mut OverlapState { + &mut self.overlap + } + + #[inline] + fn is_in_seq(&self) -> bool { + matches!(self.state, FastqState::InSeq) + } + + fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize { + let total = self.overlap.len + n; + let mut i = 0; + let mut j = 0; + let mut seq_start: usize = 0; + + while i < total { + // SAFETY: i < total <= BUF_SIZE = buf.len() + let byte = unsafe { *buf.get_unchecked(i) }; + + match self.state { + FastqState::OutSeq => { + if byte == b'@' { + self.state = FastqState::InTitle; + } + i += 1; + } + FastqState::InTitle => { + if byte == b'\n' || byte == b'\r' { + self.state = FastqState::InSeq; + seq_start = j; + } + i += 1; + } + FastqState::InSeq => { + if byte == b'\n' || byte == b'\r' { + if j > seq_start { + unsafe { + *buf.get_unchecked_mut(j) = 0; + } + j += 1; + } + self.state = FastqState::InQualTitle; + i += 1; + continue; + } + let nuc = byte & 0xDF; + if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' { + // SAFETY: j <= i < total <= BUF_SIZE = buf.len() + unsafe { + *buf.get_unchecked_mut(j) = nuc; + } + j += 1; + } else { + if j > seq_start { + unsafe { + *buf.get_unchecked_mut(j) = 0; + } + j += 1; + } + self.state = FastqState::InAmbiguous; + } + i += 1; + } + FastqState::InAmbiguous => { + if byte == b'\n' || byte == b'\r' { + self.state = FastqState::InQualTitle; + i += 1; + continue; + } + let nuc = byte & 0xDF; + if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' { + seq_start = j; + // SAFETY: j <= i < total <= BUF_SIZE = buf.len() + unsafe { + *buf.get_unchecked_mut(j) = nuc; + } + j += 1; + self.state = FastqState::InSeq; + } + i += 1; + } + FastqState::InQualTitle => { + if byte == b'\n' || byte == b'\r' { + self.state = FastqState::InQual; + } + i += 1; + } + FastqState::InQual => { + if byte == b'\n' || byte == b'\r' { + self.state = FastqState::OutSeq; + } + i += 1; + } + } + } + + self.save_overlap(buf, j, seq_start) + } +} + +// ─── GenBank parser ─────────────────────────────────────────────────────────── + +const ORIGIN_TAIL: &[u8] = b"RIGIN"; + +#[derive(Clone, Copy)] +enum GenbankState { + OutSeq, + MatchOrigin, + SkipOriginLine, + InSeq, + InSlash, + InAmbiguous, +} + +pub(crate) struct GenbankParser { + state: GenbankState, + overlap: OverlapState, + keyword_pos: usize, + at_line_start: bool, +} + +impl NucParser for GenbankParser { + fn new(k: usize) -> Self { + Self { + state: GenbankState::OutSeq, + overlap: OverlapState::new(k), + keyword_pos: 0, + at_line_start: true, + } + } + + #[inline] + fn overlap_state(&self) -> &OverlapState { + &self.overlap + } + + #[inline] + fn overlap_state_mut(&mut self) -> &mut OverlapState { + &mut self.overlap + } + + #[inline] + fn is_in_seq(&self) -> bool { + matches!(self.state, GenbankState::InSeq) + } + + fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize { + let total = self.overlap.len + n; + let mut i = 0; + let mut j = 0; + let mut seq_start: usize = 0; + + while i < total { + // SAFETY: i < total <= BUF_SIZE = buf.len() + let byte = unsafe { *buf.get_unchecked(i) }; + + match self.state { + GenbankState::OutSeq => { + if byte == b'\n' || byte == b'\r' { + self.at_line_start = true; + } else if self.at_line_start && byte == b'O' { + self.state = GenbankState::MatchOrigin; + self.keyword_pos = 1; + self.at_line_start = false; + } else { + self.at_line_start = false; + } + i += 1; + } + GenbankState::MatchOrigin => { + if byte == b'\n' || byte == b'\r' { + self.state = GenbankState::OutSeq; + self.at_line_start = true; + } else if byte == ORIGIN_TAIL[self.keyword_pos - 1] { + self.keyword_pos += 1; + if self.keyword_pos == 6 { + self.state = GenbankState::SkipOriginLine; + } + } else { + self.state = GenbankState::OutSeq; + self.at_line_start = false; + } + i += 1; + } + GenbankState::SkipOriginLine => { + if byte == b'\n' || byte == b'\r' { + self.state = GenbankState::InSeq; + seq_start = j; + } + i += 1; + } + GenbankState::InSeq => { + if byte == b'\n' || byte == b'\r' { + self.at_line_start = true; + i += 1; + continue; + } + if self.at_line_start && byte == b'/' { + self.state = GenbankState::InSlash; + self.at_line_start = false; + i += 1; + continue; + } + self.at_line_start = false; + let nuc = byte & 0xDF; + if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' { + // SAFETY: j <= i < total <= BUF_SIZE = buf.len() + unsafe { + *buf.get_unchecked_mut(j) = nuc; + } + j += 1; + } else if byte.is_ascii_digit() || byte == b' ' { + // position numbers and spacing between groups: skip + } else { + // ambiguous base: end current sequence if non-empty + if j > seq_start { + unsafe { + *buf.get_unchecked_mut(j) = 0; + } + j += 1; + } + self.state = GenbankState::InAmbiguous; + } + i += 1; + } + GenbankState::InSlash => { + if byte == b'/' { + // confirmed "//": end of sequence record + if j > seq_start { + unsafe { + *buf.get_unchecked_mut(j) = 0; + } + j += 1; + } + self.state = GenbankState::OutSeq; + self.at_line_start = false; + } else if byte == b'\n' || byte == b'\r' { + // single '/' line: back to sequence + self.state = GenbankState::InSeq; + self.at_line_start = true; + } else { + // false positive: single '/' mid-line, resume sequence + self.state = GenbankState::InSeq; + self.at_line_start = false; + } + i += 1; + } + GenbankState::InAmbiguous => { + if byte == b'\n' || byte == b'\r' { + self.at_line_start = true; + i += 1; + continue; + } + if self.at_line_start && byte == b'/' { + self.state = GenbankState::InSlash; + self.at_line_start = false; + i += 1; + continue; + } + self.at_line_start = false; + let nuc = byte & 0xDF; + if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' { + seq_start = j; + // SAFETY: j <= i < total <= BUF_SIZE = buf.len() + unsafe { + *buf.get_unchecked_mut(j) = nuc; + } + j += 1; + self.state = GenbankState::InSeq; + } + // digits, spaces, other ambiguous codes: skip + i += 1; + } + } + } + + self.save_overlap(buf, j, seq_start) + } +} + +// ─── NucPage ────────────────────────────────────────────────────────────────── + +/// Owned page of compacted nucleotides: uppercase A/C/G/T bytes separated by `0` +/// at sequence boundaries. Automatically returns its buffer to the pool on drop. +pub struct NucPage { + data: ManuallyDrop>, + len: usize, + pool: Arc>>>, +} + +impl std::ops::Deref for NucPage { + type Target = [u8]; + fn deref(&self) -> &[u8] { + &self.data[..self.len] + } +} + +impl Drop for NucPage { + fn drop(&mut self) { + // SAFETY: data is never accessed after this point + let buf = unsafe { ManuallyDrop::take(&mut self.data) }; + self.pool.lock().unwrap().push(buf); + } +} + +// ─── NucPageCursor ──────────────────────────────────────────────────────────── + +/// A forward cursor over the normalised bytes of a [`NucPage`]. +/// +/// Provides the `next_byte` / `rewind` interface consumed by +/// [`obiskbuilder::SuperKmerStreamIter`]. +pub struct NucPageCursor<'a> { + data: &'a [u8], + pos: usize, +} + +impl NucPageCursor<'_> { + /// Returns the next byte in the page, or `None` at end. + #[inline] + pub fn next_byte(&mut self) -> Option { + if self.pos < self.data.len() { + let b = self.data[self.pos]; + self.pos += 1; + Some(b) + } else { + None + } + } + + /// Steps the cursor back by `n` bytes. + /// + /// The caller guarantees that the last `n` bytes were all `ACGT` + /// (no `0x00` separators), so they are still in the page buffer. + #[inline] + pub fn rewind(&mut self, n: usize) { + self.pos -= n; + } + + /// Total number of bytes in the underlying page. + #[inline] + pub fn len(&self) -> usize { + self.data.len() + } + + /// Returns `true` if the page contains no bytes. + #[inline] + pub fn is_empty(&self) -> bool { + self.data.is_empty() + } +} + +impl NucPage { + /// Creates a forward cursor positioned at the start of this page. + pub fn cursor(&self) -> NucPageCursor<'_> { + NucPageCursor { data: self, pos: 0 } + } +} + +// ─── NucStream ──────────────────────────────────────────────────────────────── + +pub(crate) struct NucStream { + reader: R, + parser: P, + pool: Arc>>>, + eof: bool, +} + +impl NucStream { + pub(crate) fn new(reader: R, k: usize) -> Self { + Self { + reader, + parser: P::new(k), + pool: Arc::new(Mutex::new(Vec::new())), + eof: false, + } + } + + pub(crate) fn read_page(&mut self) -> Option { + loop { + if self.eof { + return None; + } + // take a buffer from the pool, or allocate fresh if all are in-flight + let mut buf = self + .pool + .lock() + .unwrap() + .pop() + .unwrap_or_else(|| vec![0u8; BUF_SIZE]); + + let ol = self.parser.overlap_len(); + self.parser.write_overlap(&mut buf[..ol]); + let n = self.reader.read(&mut buf[ol..ol + PAGE_SIZE]).unwrap_or(0); + if n == 0 { + self.eof = true; + if ol == 0 { + self.pool.lock().unwrap().push(buf); + return None; + } + } + let out_len = self.parser.parse_inplace(&mut buf, n); + if out_len > 0 { + return Some(NucPage { + data: ManuallyDrop::new(buf), + len: out_len, + pool: Arc::clone(&self.pool), + }); + } + // empty page (all headers/ambiguous): return buf to pool and loop + self.pool.lock().unwrap().push(buf); + } + } +} + +impl Iterator for NucStream { + type Item = NucPage; + fn next(&mut self) -> Option { + self.read_page() + } +} + +// ─── FastaNucStream ─────────────────────────────────────────────────────────── + +pub(crate) type FastaNucStream = NucStream; +pub(crate) type FastqNucStream = NucStream; +pub(crate) type GenbankNucStream = NucStream; + +// ─── AnyNucStream ───────────────────────────────────────────────────────────── + +pub(crate) enum AnyNucStream { + Fasta(FastaNucStream), + Fastq(FastqNucStream), + Genbank(GenbankNucStream), +} + +impl Iterator for AnyNucStream { + type Item = NucPage; + fn next(&mut self) -> Option { + match self { + AnyNucStream::Fasta(s) => s.next(), + AnyNucStream::Fastq(s) => s.next(), + AnyNucStream::Genbank(s) => s.next(), + } + } +} + +fn dispatch( + mut guesser: MimeTypeGuesser, + k: usize, +) -> Option>> { + match guesser.mime_type() { + Some("text/fasta") => Some(AnyNucStream::Fasta(NucStream::new(guesser, k))), + Some("text/fastq") => Some(AnyNucStream::Fastq(NucStream::new(guesser, k))), + Some("text/gbff") => Some(AnyNucStream::Genbank(NucStream::new(guesser, k))), + _ => None, + } +} + +/// Wraps an already-open reader in a nucleotide stream, detecting its format. +/// Returns `None` if the format is not recognised. +pub(crate) fn nuc_stream( + reader: R, + k: usize, +) -> Option>> { + dispatch(MimeTypeGuesser::new(reader), k) +} + +/// Opens a nucleotide stream from any source (file path, URL, or `-` for stdin), +/// with transparent decompression and automatic format detection. +/// +/// # Errors +/// Returns an `io::Error` if the source cannot be opened, decompression fails, +/// or the format is not recognised. +pub fn open_nuc_stream( + source: &str, + k: usize, +) -> io::Result + Send>> { + let reader = open_raw(source)?; + dispatch(MimeTypeGuesser::new(reader), k) + .map(|s| Box::new(s) as Box + Send>) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "unknown sequence format")) +} diff --git a/src/obiread/src/stream.rs b/src/obiread/src/stream.rs deleted file mode 100644 index e797ee2..0000000 --- a/src/obiread/src/stream.rs +++ /dev/null @@ -1,507 +0,0 @@ -//! Streaming byte normaliser for FASTA, FASTQ, and GBFF sequence files. -//! -//! [`NormalizedByteStream`] wraps any `Read` source in a 64 KiB circular ring -//! buffer and emits a normalised byte stream: -//! -//! - Uppercase `ACGT` bytes for in-sequence positions. -//! - `0x00` as a separator between independent sequence segments (record -//! boundary, non-ACGT base, or end of the GBFF `//` record). -//! - `None` at true EOF. -//! -//! Unlike the Rope-based pipeline, the stream never accumulates a full -//! record: it is safe for chromosomes of arbitrary size (e.g. GBFF files -//! with 250 MiB ORIGIN sections). -//! -//! # Rewind invariant -//! -//! `rewind(n)` subtracts `n` from the read head. It is only called by -//! [`obiskbuilder::SuperKmerStreamIter`] when the last `n` bytes were all -//! ACGT (no format overhead in between), so the ring buffer always contains -//! those bytes at positions `[head-n .. head]`. The maximum rewind is -//! `k ≤ 31`, well within the 32 KiB kept below the refill watermark. - -use std::io::{self, Read}; - -// ── ring-buffer constants ───────────────────────────────────────────────────── - -const CAP: usize = 65536; // must be a power of two -const MASK: usize = CAP - 1; -const REFILL_THRESHOLD: usize = 32768; // refill when available drops below this -const REFILL_CHUNK: usize = 32768; // target bytes per refill call - -// ── format detection ────────────────────────────────────────────────────────── - -#[derive(Clone, Copy, PartialEq, Eq)] -enum Format { Fasta, Fastq, Gbff } - -fn detect_format(buf: &[u8]) -> Format { - if buf.starts_with(b"LOCUS ") { - Format::Gbff - } else if buf.first() == Some(&b'>') { - Format::Fasta - } else { - Format::Fastq - } -} - -// ── format state machines ───────────────────────────────────────────────────── - -#[derive(Clone, Copy)] -enum FaState { - OutSeq, // reading header / between records - InSeq, // reading sequence bases -} - -#[derive(Clone, Copy)] -enum FqState { - Header, // reading @id line - Seq, // reading sequence bases - Plus, // reading +[id] separator line - Qual(usize), // skipping quality bytes; payload = remaining count -} - -const ORIGIN: &[u8] = b"ORIGIN"; - -#[derive(Clone, Copy)] -enum GbState { - Pre(u8), // pre-ORIGIN: n chars of "ORIGIN" matched at current line start - PreSkip, // pre-ORIGIN: skip rest of non-ORIGIN line then → Pre(0) - LineStart, // ORIGIN data: at column 0 of a numbered data line - Num, // ORIGIN data: skipping spaces + line-number digits - Seq, // ORIGIN data: reading lowercase sequence bases - Slash, // ORIGIN data: saw the first '/' of a potential '//' end marker -} - -#[derive(Clone, Copy)] -enum State { - Fa(FaState), - Fq(FqState), - Gb(GbState), -} - -// ── NormalizedByteStream ────────────────────────────────────────────────────── - -/// A streaming normaliser over any `Read` source. -/// -/// Call [`next_byte`](NormalizedByteStream::next_byte) to consume one normalised -/// byte at a time. Call [`rewind`](NormalizedByteStream::rewind) to step back -/// up to 31 bytes (used by the superkmer builder when a minimizer or length -/// boundary is crossed). -pub struct NormalizedByteStream { - reader: R, - buf: Box<[u8; CAP]>, - head: usize, // absolute read position; index = head & MASK - write: usize, // absolute write position; index = write & MASK - eof: bool, - state: State, - seq_len: usize, // FASTQ only: non-newline chars seen in current sequence line -} - -impl NormalizedByteStream { - /// Wrap `reader` and detect its format from the first bytes. - pub fn new(mut reader: R) -> io::Result { - let mut buf = Box::new([0u8; CAP]); - - // Initial read — enough to detect the format. - let n = reader.read(&mut buf[..REFILL_CHUNK])?; - let fmt = detect_format(&buf[..n]); - let state = match fmt { - Format::Fasta => State::Fa(FaState::OutSeq), - Format::Fastq => State::Fq(FqState::Header), - Format::Gbff => State::Gb(GbState::Pre(0)), - }; - - Ok(Self { - reader, - buf, - head: 0, - write: n, - eof: n == 0, - state, - seq_len: 0, - }) - } - - /// Step the read head back by `n` bytes. - /// - /// # Safety - /// The caller guarantees that the last `n` bytes returned by - /// [`next_byte`] were all uppercase ACGT (no `0x00` separators), so those - /// bytes are still present in the ring buffer. - #[inline] - pub fn rewind(&mut self, n: usize) { - self.head -= n; - } - - #[inline] - fn available(&self) -> usize { - self.write - self.head - } - - fn try_refill(&mut self) -> io::Result<()> { - if self.eof || self.available() >= REFILL_THRESHOLD { - return Ok(()); - } - - // First segment: write_idx → end of buffer (contiguous, no wrap). - let write_idx = self.write & MASK; - let free = CAP - self.available(); - let to_end = CAP - write_idx; - let chunk1 = free.min(to_end).min(REFILL_CHUNK); - - if chunk1 > 0 { - let n = self.reader.read(&mut self.buf[write_idx..write_idx + chunk1])?; - self.write += n; - if n == 0 { - self.eof = true; - return Ok(()); - } - } - - // Second segment: if still below threshold, read into the wrapped region. - if self.available() < REFILL_THRESHOLD && !self.eof { - let write_idx2 = self.write & MASK; - let free2 = CAP - self.available(); - let to_end2 = CAP - write_idx2; - let chunk2 = free2.min(to_end2).min(REFILL_CHUNK); - if chunk2 > 0 { - let n = self.reader.read(&mut self.buf[write_idx2..write_idx2 + chunk2])?; - self.write += n; - if n == 0 { - self.eof = true; - } - } - } - - Ok(()) - } - - /// Return the next normalised byte, or `None` at EOF. - /// - /// Emits uppercase `A`, `C`, `G`, `T`, or `0x00` (segment separator). - pub fn next_byte(&mut self) -> Option { - loop { - // Proactive refill: keep ≥ REFILL_THRESHOLD bytes in the buffer - // so rewinds are always safe. - if !self.eof && self.available() < REFILL_THRESHOLD { - let _ = self.try_refill(); - } - - if self.available() == 0 { - return None; - } - - let b = self.buf[self.head & MASK]; - self.head += 1; - - // Copy state to avoid borrow conflicts while mutating self.state. - let state = self.state; - if let Some(out) = self.process(state, b) { - return Some(out); - } - } - } - - // ── per-format byte processors ──────────────────────────────────────────── - - #[inline] - fn process(&mut self, state: State, b: u8) -> Option { - match state { - State::Fa(fa) => self.fasta(fa, b), - State::Fq(fq) => self.fastq(fq, b), - State::Gb(gb) => self.gbff(gb, b), - } - } - - fn fasta(&mut self, fa: FaState, b: u8) -> Option { - match fa { - FaState::OutSeq => { - if b == b'\n' { - self.state = State::Fa(FaState::InSeq); - } - None // skip header bytes - } - FaState::InSeq => { - if b == b'>' { - self.state = State::Fa(FaState::OutSeq); - Some(0x00) // record boundary - } else if b == b'\n' || b == b'\r' { - None - } else { - Some(normalize_nuc(b)) - } - } - } - } - - fn fastq(&mut self, fq: FqState, b: u8) -> Option { - match fq { - FqState::Header => { - if b == b'\n' { - self.state = State::Fq(FqState::Seq); - self.seq_len = 0; - } - None - } - FqState::Seq => { - if b == b'\n' { - self.state = State::Fq(FqState::Plus); - None - } else if b == b'\r' { - None - } else { - self.seq_len += 1; // count all chars, including non-ACGT - Some(normalize_nuc(b)) - } - } - FqState::Plus => { - if b == b'\n' { - self.state = State::Fq(FqState::Qual(self.seq_len)); - } - None - } - FqState::Qual(rem) => { - if b == b'\n' { - if rem == 0 { - // Quality line ended after all bases consumed → new record. - self.state = State::Fq(FqState::Header); - Some(0x00) // record boundary - } else { - // Newline inside multi-line quality — keep counting. - None - } - } else if b == b'\r' { - None - } else if rem > 0 { - self.state = State::Fq(FqState::Qual(rem - 1)); - None - } else { - // rem == 0 but non-newline: shouldn't happen in valid FASTQ. - None - } - } - } - } - - fn gbff(&mut self, gb: GbState, b: u8) -> Option { - match gb { - GbState::Pre(n) => { - let n = n as usize; - if n < 6 { - if b == ORIGIN[n] { - self.state = State::Gb(GbState::Pre((n + 1) as u8)); - } else if b == b'\n' { - self.state = State::Gb(GbState::Pre(0)); - } else { - self.state = State::Gb(GbState::PreSkip); - } - } else { - // All 6 chars of "ORIGIN" matched; skip the rest of the header line. - if b == b'\n' { - self.state = State::Gb(GbState::LineStart); - } - // Non-'\n': stay in Pre(6) implicitly (state unchanged). - } - None - } - GbState::PreSkip => { - if b == b'\n' { - self.state = State::Gb(GbState::Pre(0)); - } - None - } - GbState::LineStart => { - if b == b'/' { - self.state = State::Gb(GbState::Slash); - } else if b != b'\n' { - // Space or digit: start of a numbered sequence line. - self.state = State::Gb(GbState::Num); - } - None - } - GbState::Num => { - if b == b'\n' { - self.state = State::Gb(GbState::LineStart); - None - } else if b.is_ascii_digit() || b == b' ' { - None // still in the number prefix - } else { - // First letter: transition to Seq and process this byte. - self.state = State::Gb(GbState::Seq); - Some(normalize_nuc(b)) - } - } - GbState::Seq => { - if b == b'\n' { - self.state = State::Gb(GbState::LineStart); - None - } else if b == b' ' { - None // inter-group space - } else { - Some(normalize_nuc(b)) - } - } - GbState::Slash => { - if b == b'/' { - // End of GBFF record ("//"). - self.state = State::Gb(GbState::Pre(0)); // ready for next record - Some(0x00) - } else { - // Stray '/' — shouldn't happen in valid GBFF; recover. - self.state = State::Gb(GbState::LineStart); - None - } - } - } - } -} - -// ── nucleotide normalisation ────────────────────────────────────────────────── - -#[inline] -fn normalize_nuc(b: u8) -> u8 { - match b | 0x20 { - b'a' => b'A', - b'c' => b'C', - b'g' => b'G', - b't' => b'T', - _ => 0x00, - } -} - -// ── tests ───────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - - fn stream(data: &[u8]) -> Vec { - let mut s = NormalizedByteStream::new(data).unwrap(); - let mut out = Vec::new(); - while let Some(b) = s.next_byte() { - out.push(b); - } - out - } - - // ── FASTA ───────────────────────────────────────────────────────────────── - - #[test] - fn fasta_single_record() { - assert_eq!(stream(b">s1\nACGTACGT\n"), b"ACGTACGT"); - } - - #[test] - fn fasta_two_records_separated() { - let out = stream(b">s1\nACGT\n>s2\nTTTT\n"); - assert_eq!(out, b"ACGT\x00TTTT"); - } - - #[test] - fn fasta_multiline_concatenated() { - assert_eq!(stream(b">s1\nACGT\nACGT\n"), b"ACGTACGT"); - } - - #[test] - fn fasta_lowercase_uppercased() { - assert_eq!(stream(b">s1\nacgt\n"), b"ACGT"); - } - - #[test] - fn fasta_non_acgt_becomes_null() { - let out = stream(b">s1\nACGTNACGT\n"); - assert_eq!(out, b"ACGT\x00ACGT"); - } - - // ── FASTQ ───────────────────────────────────────────────────────────────── - - fn make_fastq(records: &[&[u8]]) -> Vec { - let mut buf = Vec::new(); - for seq in records { - buf.extend_from_slice(b"@hdr\n"); - buf.extend_from_slice(seq); - buf.push(b'\n'); - buf.extend_from_slice(b"+\n"); - buf.extend_from_slice(&vec![b'I'; seq.len()]); - buf.push(b'\n'); - } - buf - } - - #[test] - fn fastq_single_record() { - // Trailing \x00 is emitted on the newline that ends the quality line. - assert_eq!(stream(&make_fastq(&[b"ACGTACGT"])), b"ACGTACGT\x00"); - } - - #[test] - fn fastq_two_records_separated() { - let out = stream(&make_fastq(&[b"ACGT", b"TTTT"])); - assert_eq!(out, b"ACGT\x00TTTT\x00"); - } - - #[test] - fn fastq_lowercase_uppercased() { - assert_eq!(stream(&make_fastq(&[b"acgt"])), b"ACGT\x00"); - } - - #[test] - fn fastq_non_acgt_becomes_null() { - let out = stream(&make_fastq(&[b"ACGTNACGT"])); - assert_eq!(out, b"ACGT\x00ACGT\x00"); - } - - // ── GBFF ────────────────────────────────────────────────────────────────── - - fn make_gbff(seqs: &[&[u8]]) -> Vec { - let mut buf = Vec::new(); - for seq in seqs { - buf.extend_from_slice(b"LOCUS NC_000001\nFEATURES\nORIGIN\n"); - // write numbered lines of 60 bases each - let mut pos = 1usize; - for chunk in seq.chunks(60) { - let groups: Vec<&[u8]> = chunk.chunks(10).collect(); - let line = groups.join(&b' '); - buf.extend_from_slice(format!("{:9} ", pos).as_bytes()); - buf.extend_from_slice(&line); - buf.push(b'\n'); - pos += chunk.len(); - } - buf.extend_from_slice(b"//\n"); - } - buf - } - - #[test] - fn gbff_single_record() { - let seq = b"acgtacgtacgt"; - let out = stream(&make_gbff(&[seq])); - assert_eq!(out, b"ACGTACGTACGT\x00"); - } - - #[test] - fn gbff_two_records_separated() { - let out = stream(&make_gbff(&[b"acgtacgt", b"tttttttt"])); - assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00"); - } - - #[test] - fn gbff_non_acgt_becomes_null() { - let out = stream(&make_gbff(&[b"acgtnacgt"])); - assert_eq!(out, b"ACGT\x00ACGT\x00"); - } - - // ── rewind ──────────────────────────────────────────────────────────────── - - #[test] - fn rewind_replays_bytes() { - let mut s = NormalizedByteStream::new(b">s\nACGT\n" as &[u8]).unwrap(); - assert_eq!(s.next_byte(), Some(b'A')); - assert_eq!(s.next_byte(), Some(b'C')); - s.rewind(1); - assert_eq!(s.next_byte(), Some(b'C')); - assert_eq!(s.next_byte(), Some(b'G')); - assert_eq!(s.next_byte(), Some(b'T')); - assert_eq!(s.next_byte(), None); - } -} diff --git a/src/obiread/src/xopen.rs b/src/obiread/src/xopen.rs index f5dbbf0..f86585c 100644 --- a/src/obiread/src/xopen.rs +++ b/src/obiread/src/xopen.rs @@ -20,13 +20,13 @@ use std::io::{self, Read}; /// Open any source for reading, with transparent decompression. /// -/// Returns a `Box` that yields uncompressed bytes regardless of -/// whether the underlying source is plain text, gzip, bzip2, xz or zstd. +/// Returns a `Box` that yields uncompressed bytes regardless +/// of whether the underlying source is plain text, gzip, bzip2, xz or zstd. /// /// # Errors /// Returns an `io::Error` if the file cannot be opened, the URL cannot be /// fetched, or the compression header is malformed. -pub fn xopen(source: &str) -> io::Result>> { +pub(crate) fn open_raw(source: &str) -> io::Result> { let raw: Box = match source { "-" => Box::new(io::stdin()), s if s.starts_with("http://") || s.starts_with("https://") => http_reader(s)?, @@ -35,8 +35,15 @@ pub fn xopen(source: &str) -> io::Result>> Box::new(File::open(expanded.as_ref())?) } }; - let decompressed = decompress(raw)?; - Ok(MimeTypeGuesser::new(decompressed)) + decompress(raw) +} + +/// Open any source for reading, with transparent decompression and MIME detection. +/// +/// Wraps [`open_raw`] in a [`MimeTypeGuesser`] so callers can inspect the +/// format before consuming the stream. +pub fn xopen(source: &str) -> io::Result>> { + Ok(MimeTypeGuesser::new(open_raw(source)?)) } // ── internal helpers ────────────────────────────────────────────────────────── diff --git a/src/obiskbuilder/src/lib.rs b/src/obiskbuilder/src/lib.rs index a5409e1..6dc2de1 100644 --- a/src/obiskbuilder/src/lib.rs +++ b/src/obiskbuilder/src/lib.rs @@ -17,9 +17,21 @@ pub use iter::SuperKmerIter; pub use scratch::SuperKmerScratch; pub use stream_iter::SuperKmerStreamIter; +use obiread::NucPage; use obikrope::Rope; use obikseq::RoutableSuperKmer; +/// Collect all super-kmers from a normalised [`NucPage`]. +pub fn build_superkmers_page( + page: NucPage, + k: usize, + level_max: usize, + theta: f64, +) -> Vec { + let cursor = page.cursor(); + SuperKmerStreamIter::new(cursor, k, level_max, theta).collect() +} + /// Collect all super-kmers from a normalised rope chunk. pub fn build_superkmers( rope: Rope, diff --git a/src/obiskbuilder/src/stream_iter.rs b/src/obiskbuilder/src/stream_iter.rs index 25f41c5..dabb72b 100644 --- a/src/obiskbuilder/src/stream_iter.rs +++ b/src/obiskbuilder/src/stream_iter.rs @@ -1,35 +1,23 @@ -//! Streaming superkmer iterator that does not require a fully-buffered record. -//! -//! [`SuperKmerStreamIter`] wraps a [`NormalizedByteStream`] and yields -//! [`RoutableSuperKmer`] values one by one, exactly as [`SuperKmerIter`] does -//! over a [`Rope`], but without accumulating the whole input in memory first. -//! -//! This makes it suitable for large GBFF chromosomes (250 MiB ORIGIN sections) -//! or any other source where buffering the full record would exhaust memory. -//! -//! The cut conditions and superkmer semantics are identical to [`SuperKmerIter`]: -//! -//! | Condition | stream rewind | -//! |------------------------|---------------| -//! | entropy(kmer) ≤ θ | k−1 | -//! | minimizer changed | k | -//! | super-kmer length = 256| k | -//! -//! [`SuperKmerIter`]: crate::iter::SuperKmerIter -//! [`Rope`]: obikrope::Rope +//! Streaming superkmer iterator over a [`NucPageCursor`]. -use std::io::Read; - -use obiread::stream::NormalizedByteStream; +use obiread::NucPageCursor; use obikseq::RoutableSuperKmer; use obikseq::kmer::Minimizer; use crate::rolling_stat::RollingStat; use crate::scratch::SuperKmerScratch; -/// Streaming iterator over [`RoutableSuperKmer`] values. -pub struct SuperKmerStreamIter { - stream: NormalizedByteStream, +/// Streaming iterator over [`RoutableSuperKmer`] values from a [`NucPageCursor`]. +/// +/// Cut conditions (checked in order per nucleotide, once the k-mer window is full): +/// +/// | Condition | cursor rewind | +/// |------------------------|---------------| +/// | entropy(kmer) ≤ θ | k−1 | +/// | minimizer changed | k | +/// | super-kmer length = 256| k | +pub struct SuperKmerStreamIter<'a> { + cursor: NucPageCursor<'a>, k: usize, theta: f64, scratch: SuperKmerScratch, @@ -38,23 +26,18 @@ pub struct SuperKmerStreamIter { prev_min_pos: usize, } -impl SuperKmerStreamIter { - /// Build a streaming superkmer iterator from any `Read` source. - /// - /// - `reader`: raw bytes (FASTA, FASTQ, or GBFF; format auto-detected) - /// - `k`: k-mer size (must be odd, 11 ≤ k ≤ 31) - /// - `level_max`: maximum sub-word size for entropy (1–6) - /// - `theta`: entropy threshold; k-mers with score ≤ theta are rejected - pub fn new(reader: R, k: usize, level_max: usize, theta: f64) -> std::io::Result { - Ok(Self { - stream: NormalizedByteStream::new(reader)?, +impl<'a> SuperKmerStreamIter<'a> { + /// Build an iterator from a [`NucPageCursor`] over normalised sequence data. + pub fn new(cursor: NucPageCursor<'a>, k: usize, level_max: usize, theta: f64) -> Self { + Self { + cursor, k, theta, scratch: SuperKmerScratch::new(), stat: RollingStat::new(level_max), prev_min: None, prev_min_pos: 0, - }) + } } fn reset(&mut self) { @@ -73,12 +56,12 @@ impl SuperKmerStreamIter { } } -impl Iterator for SuperKmerStreamIter { +impl Iterator for SuperKmerStreamIter<'_> { type Item = RoutableSuperKmer; fn next(&mut self) -> Option { loop { - let byte = match self.stream.next_byte() { + let byte = match self.cursor.next_byte() { None => { return self.try_emit(); } @@ -103,7 +86,7 @@ impl Iterator for SuperKmerStreamIter { // ── 1. Entropy check ───────────────────────────────────────────── if self.stat.normalized_entropy().unwrap_or(1.0) < self.theta { let result = self.try_emit(); - self.stream.rewind(self.k - 1); + self.cursor.rewind(self.k - 1); self.reset(); if result.is_some() { return result; @@ -114,11 +97,11 @@ impl Iterator for SuperKmerStreamIter { let min = self.stat.canonical_minimizer().unwrap(); let min_pos = self.stat.minimizer_position().unwrap_or(0); - // ── 2. Minimizer change check ───────────────────────────────────── + // ── 2. Minimizer change ─────────────────────────────────────────── if let Some(prev) = self.prev_min { if min != prev { let result = self.try_emit(); - self.stream.rewind(self.k); + self.cursor.rewind(self.k); self.reset(); if result.is_some() { return result; @@ -127,10 +110,10 @@ impl Iterator for SuperKmerStreamIter { } } - // ── 3. Super-kmer length check ──────────────────────────────────── + // ── 3. Super-kmer length cap ────────────────────────────────────── if self.scratch.len() == 256 { let result = self.try_emit(); - self.stream.rewind(self.k); + self.cursor.rewind(self.k); self.reset(); if result.is_some() { return result;