From a4b57a96dec2a1ec3b8cefca100dbe9f07965974 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 27 May 2026 00:09:12 +0200 Subject: [PATCH] feat: add streaming sequence reader and superkmer iterator Introduce the `obiread` crate with a streaming byte normalizer that processes FASTA, FASTQ, and GenBank files using a 64 KiB ring buffer for O(1) memory usage. Integrate this crate into `obiskbuilder` to provide `SuperKmerStreamIter`, enabling memory-efficient superkmer traversal with rolling entropy and minimizer-based cut conditions. --- src/Cargo.lock | 1 + src/obiread/src/lib.rs | 2 + src/obiread/src/mimetype.rs | 5 + src/obiread/src/path_iterator.rs | 8 +- src/obiread/src/stream.rs | 507 ++++++++++++++++++++++++++++ src/obiskbuilder/Cargo.toml | 1 + src/obiskbuilder/src/lib.rs | 2 + src/obiskbuilder/src/stream_iter.rs | 146 ++++++++ 8 files changed, 671 insertions(+), 1 deletion(-) create mode 100644 src/obiread/src/stream.rs create mode 100644 src/obiskbuilder/src/stream_iter.rs diff --git a/src/Cargo.lock b/src/Cargo.lock index dd85960..4cfce7b 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1637,6 +1637,7 @@ dependencies = [ "lazy_static", "obikrope", "obikseq", + "obiread", ] [[package]] diff --git a/src/obiread/src/lib.rs b/src/obiread/src/lib.rs index 771d839..94f0b8e 100644 --- a/src/obiread/src/lib.rs +++ b/src/obiread/src/lib.rs @@ -13,6 +13,7 @@ pub mod normalize; mod path_iterator; pub mod peakreader; pub mod record; +pub mod stream; pub mod xopen; pub use chunk::{SeqChunkIter, fasta_chunks, fastq_chunks, @@ -21,6 +22,7 @@ pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequ pub use mimetype::MimeTypeGuesser; pub use path_iterator::{PathIter, path_iter}; pub use peakreader::PeekReader; +pub use stream::NormalizedByteStream; pub use xopen::xopen; /// Default read block size: 1 MiB. diff --git a/src/obiread/src/mimetype.rs b/src/obiread/src/mimetype.rs index 906b627..ab7ea5c 100644 --- a/src/obiread/src/mimetype.rs +++ b/src/obiread/src/mimetype.rs @@ -9,6 +9,10 @@ use crate::peakreader::PeekReader; const BUF_SIZE: usize = 4096; +fn is_gbff(buf: &[u8]) -> bool { + buf.starts_with(b"LOCUS ") +} + static RE_FASTA: LazyLock = LazyLock::new(|| Regex::new(r"^>[^ ]").unwrap()); fn is_fasta(buf: &[u8]) -> bool { std::str::from_utf8(buf).map_or(false, |s| RE_FASTA.is_match(s)) @@ -30,6 +34,7 @@ fn is_text(buf: &[u8]) -> bool { // Most specific formats (fastq, fasta) come before the generic text/plain fallback. static INFER: LazyLock = LazyLock::new(|| { let mut infer = Infer::new(); + infer.add("text/gbff", "gbff", is_gbff); infer.add("text/fastq", "fastq", is_fastq); infer.add("text/fasta", "fasta", is_fasta); infer.add("text/plain", "txt", is_text); diff --git a/src/obiread/src/path_iterator.rs b/src/obiread/src/path_iterator.rs index 9e73144..815f22c 100644 --- a/src/obiread/src/path_iterator.rs +++ b/src/obiread/src/path_iterator.rs @@ -71,7 +71,7 @@ pub fn path_iter(paths: &[String]) -> PathIter { PathIter::new(path_bufs) } -/// Returns true if the path ends with a fasta or fastq file extension. +/// Returns true if the path ends with a recognised sequence file extension. fn is_fasta_or_fastq(path: &Path) -> bool { let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); name.ends_with(".fasta") @@ -82,4 +82,10 @@ fn is_fasta_or_fastq(path: &Path) -> bool { || name.ends_with(".fa.gz") || name.ends_with(".fastq.gz") || name.ends_with(".fq.gz") + || name.ends_with(".gbff") + || name.ends_with(".gbk") + || name.ends_with(".gb") + || name.ends_with(".gbff.gz") + || name.ends_with(".gbk.gz") + || name.ends_with(".gb.gz") } diff --git a/src/obiread/src/stream.rs b/src/obiread/src/stream.rs new file mode 100644 index 0000000..e797ee2 --- /dev/null +++ b/src/obiread/src/stream.rs @@ -0,0 +1,507 @@ +//! Streaming byte normaliser for FASTA, FASTQ, and GBFF sequence files. +//! +//! [`NormalizedByteStream`] wraps any `Read` source in a 64 KiB circular ring +//! buffer and emits a normalised byte stream: +//! +//! - Uppercase `ACGT` bytes for in-sequence positions. +//! - `0x00` as a separator between independent sequence segments (record +//! boundary, non-ACGT base, or end of the GBFF `//` record). +//! - `None` at true EOF. +//! +//! Unlike the Rope-based pipeline, the stream never accumulates a full +//! record: it is safe for chromosomes of arbitrary size (e.g. GBFF files +//! with 250 MiB ORIGIN sections). +//! +//! # Rewind invariant +//! +//! `rewind(n)` subtracts `n` from the read head. It is only called by +//! [`obiskbuilder::SuperKmerStreamIter`] when the last `n` bytes were all +//! ACGT (no format overhead in between), so the ring buffer always contains +//! those bytes at positions `[head-n .. head]`. The maximum rewind is +//! `k ≤ 31`, well within the 32 KiB kept below the refill watermark. + +use std::io::{self, Read}; + +// ── ring-buffer constants ───────────────────────────────────────────────────── + +const CAP: usize = 65536; // must be a power of two +const MASK: usize = CAP - 1; +const REFILL_THRESHOLD: usize = 32768; // refill when available drops below this +const REFILL_CHUNK: usize = 32768; // target bytes per refill call + +// ── format detection ────────────────────────────────────────────────────────── + +#[derive(Clone, Copy, PartialEq, Eq)] +enum Format { Fasta, Fastq, Gbff } + +fn detect_format(buf: &[u8]) -> Format { + if buf.starts_with(b"LOCUS ") { + Format::Gbff + } else if buf.first() == Some(&b'>') { + Format::Fasta + } else { + Format::Fastq + } +} + +// ── format state machines ───────────────────────────────────────────────────── + +#[derive(Clone, Copy)] +enum FaState { + OutSeq, // reading header / between records + InSeq, // reading sequence bases +} + +#[derive(Clone, Copy)] +enum FqState { + Header, // reading @id line + Seq, // reading sequence bases + Plus, // reading +[id] separator line + Qual(usize), // skipping quality bytes; payload = remaining count +} + +const ORIGIN: &[u8] = b"ORIGIN"; + +#[derive(Clone, Copy)] +enum GbState { + Pre(u8), // pre-ORIGIN: n chars of "ORIGIN" matched at current line start + PreSkip, // pre-ORIGIN: skip rest of non-ORIGIN line then → Pre(0) + LineStart, // ORIGIN data: at column 0 of a numbered data line + Num, // ORIGIN data: skipping spaces + line-number digits + Seq, // ORIGIN data: reading lowercase sequence bases + Slash, // ORIGIN data: saw the first '/' of a potential '//' end marker +} + +#[derive(Clone, Copy)] +enum State { + Fa(FaState), + Fq(FqState), + Gb(GbState), +} + +// ── NormalizedByteStream ────────────────────────────────────────────────────── + +/// A streaming normaliser over any `Read` source. +/// +/// Call [`next_byte`](NormalizedByteStream::next_byte) to consume one normalised +/// byte at a time. Call [`rewind`](NormalizedByteStream::rewind) to step back +/// up to 31 bytes (used by the superkmer builder when a minimizer or length +/// boundary is crossed). +pub struct NormalizedByteStream { + reader: R, + buf: Box<[u8; CAP]>, + head: usize, // absolute read position; index = head & MASK + write: usize, // absolute write position; index = write & MASK + eof: bool, + state: State, + seq_len: usize, // FASTQ only: non-newline chars seen in current sequence line +} + +impl NormalizedByteStream { + /// Wrap `reader` and detect its format from the first bytes. + pub fn new(mut reader: R) -> io::Result { + let mut buf = Box::new([0u8; CAP]); + + // Initial read — enough to detect the format. + let n = reader.read(&mut buf[..REFILL_CHUNK])?; + let fmt = detect_format(&buf[..n]); + let state = match fmt { + Format::Fasta => State::Fa(FaState::OutSeq), + Format::Fastq => State::Fq(FqState::Header), + Format::Gbff => State::Gb(GbState::Pre(0)), + }; + + Ok(Self { + reader, + buf, + head: 0, + write: n, + eof: n == 0, + state, + seq_len: 0, + }) + } + + /// Step the read head back by `n` bytes. + /// + /// # Safety + /// The caller guarantees that the last `n` bytes returned by + /// [`next_byte`] were all uppercase ACGT (no `0x00` separators), so those + /// bytes are still present in the ring buffer. + #[inline] + pub fn rewind(&mut self, n: usize) { + self.head -= n; + } + + #[inline] + fn available(&self) -> usize { + self.write - self.head + } + + fn try_refill(&mut self) -> io::Result<()> { + if self.eof || self.available() >= REFILL_THRESHOLD { + return Ok(()); + } + + // First segment: write_idx → end of buffer (contiguous, no wrap). + let write_idx = self.write & MASK; + let free = CAP - self.available(); + let to_end = CAP - write_idx; + let chunk1 = free.min(to_end).min(REFILL_CHUNK); + + if chunk1 > 0 { + let n = self.reader.read(&mut self.buf[write_idx..write_idx + chunk1])?; + self.write += n; + if n == 0 { + self.eof = true; + return Ok(()); + } + } + + // Second segment: if still below threshold, read into the wrapped region. + if self.available() < REFILL_THRESHOLD && !self.eof { + let write_idx2 = self.write & MASK; + let free2 = CAP - self.available(); + let to_end2 = CAP - write_idx2; + let chunk2 = free2.min(to_end2).min(REFILL_CHUNK); + if chunk2 > 0 { + let n = self.reader.read(&mut self.buf[write_idx2..write_idx2 + chunk2])?; + self.write += n; + if n == 0 { + self.eof = true; + } + } + } + + Ok(()) + } + + /// Return the next normalised byte, or `None` at EOF. + /// + /// Emits uppercase `A`, `C`, `G`, `T`, or `0x00` (segment separator). + pub fn next_byte(&mut self) -> Option { + loop { + // Proactive refill: keep ≥ REFILL_THRESHOLD bytes in the buffer + // so rewinds are always safe. + if !self.eof && self.available() < REFILL_THRESHOLD { + let _ = self.try_refill(); + } + + if self.available() == 0 { + return None; + } + + let b = self.buf[self.head & MASK]; + self.head += 1; + + // Copy state to avoid borrow conflicts while mutating self.state. + let state = self.state; + if let Some(out) = self.process(state, b) { + return Some(out); + } + } + } + + // ── per-format byte processors ──────────────────────────────────────────── + + #[inline] + fn process(&mut self, state: State, b: u8) -> Option { + match state { + State::Fa(fa) => self.fasta(fa, b), + State::Fq(fq) => self.fastq(fq, b), + State::Gb(gb) => self.gbff(gb, b), + } + } + + fn fasta(&mut self, fa: FaState, b: u8) -> Option { + match fa { + FaState::OutSeq => { + if b == b'\n' { + self.state = State::Fa(FaState::InSeq); + } + None // skip header bytes + } + FaState::InSeq => { + if b == b'>' { + self.state = State::Fa(FaState::OutSeq); + Some(0x00) // record boundary + } else if b == b'\n' || b == b'\r' { + None + } else { + Some(normalize_nuc(b)) + } + } + } + } + + fn fastq(&mut self, fq: FqState, b: u8) -> Option { + match fq { + FqState::Header => { + if b == b'\n' { + self.state = State::Fq(FqState::Seq); + self.seq_len = 0; + } + None + } + FqState::Seq => { + if b == b'\n' { + self.state = State::Fq(FqState::Plus); + None + } else if b == b'\r' { + None + } else { + self.seq_len += 1; // count all chars, including non-ACGT + Some(normalize_nuc(b)) + } + } + FqState::Plus => { + if b == b'\n' { + self.state = State::Fq(FqState::Qual(self.seq_len)); + } + None + } + FqState::Qual(rem) => { + if b == b'\n' { + if rem == 0 { + // Quality line ended after all bases consumed → new record. + self.state = State::Fq(FqState::Header); + Some(0x00) // record boundary + } else { + // Newline inside multi-line quality — keep counting. + None + } + } else if b == b'\r' { + None + } else if rem > 0 { + self.state = State::Fq(FqState::Qual(rem - 1)); + None + } else { + // rem == 0 but non-newline: shouldn't happen in valid FASTQ. + None + } + } + } + } + + fn gbff(&mut self, gb: GbState, b: u8) -> Option { + match gb { + GbState::Pre(n) => { + let n = n as usize; + if n < 6 { + if b == ORIGIN[n] { + self.state = State::Gb(GbState::Pre((n + 1) as u8)); + } else if b == b'\n' { + self.state = State::Gb(GbState::Pre(0)); + } else { + self.state = State::Gb(GbState::PreSkip); + } + } else { + // All 6 chars of "ORIGIN" matched; skip the rest of the header line. + if b == b'\n' { + self.state = State::Gb(GbState::LineStart); + } + // Non-'\n': stay in Pre(6) implicitly (state unchanged). + } + None + } + GbState::PreSkip => { + if b == b'\n' { + self.state = State::Gb(GbState::Pre(0)); + } + None + } + GbState::LineStart => { + if b == b'/' { + self.state = State::Gb(GbState::Slash); + } else if b != b'\n' { + // Space or digit: start of a numbered sequence line. + self.state = State::Gb(GbState::Num); + } + None + } + GbState::Num => { + if b == b'\n' { + self.state = State::Gb(GbState::LineStart); + None + } else if b.is_ascii_digit() || b == b' ' { + None // still in the number prefix + } else { + // First letter: transition to Seq and process this byte. + self.state = State::Gb(GbState::Seq); + Some(normalize_nuc(b)) + } + } + GbState::Seq => { + if b == b'\n' { + self.state = State::Gb(GbState::LineStart); + None + } else if b == b' ' { + None // inter-group space + } else { + Some(normalize_nuc(b)) + } + } + GbState::Slash => { + if b == b'/' { + // End of GBFF record ("//"). + self.state = State::Gb(GbState::Pre(0)); // ready for next record + Some(0x00) + } else { + // Stray '/' — shouldn't happen in valid GBFF; recover. + self.state = State::Gb(GbState::LineStart); + None + } + } + } + } +} + +// ── nucleotide normalisation ────────────────────────────────────────────────── + +#[inline] +fn normalize_nuc(b: u8) -> u8 { + match b | 0x20 { + b'a' => b'A', + b'c' => b'C', + b'g' => b'G', + b't' => b'T', + _ => 0x00, + } +} + +// ── tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + fn stream(data: &[u8]) -> Vec { + let mut s = NormalizedByteStream::new(data).unwrap(); + let mut out = Vec::new(); + while let Some(b) = s.next_byte() { + out.push(b); + } + out + } + + // ── FASTA ───────────────────────────────────────────────────────────────── + + #[test] + fn fasta_single_record() { + assert_eq!(stream(b">s1\nACGTACGT\n"), b"ACGTACGT"); + } + + #[test] + fn fasta_two_records_separated() { + let out = stream(b">s1\nACGT\n>s2\nTTTT\n"); + assert_eq!(out, b"ACGT\x00TTTT"); + } + + #[test] + fn fasta_multiline_concatenated() { + assert_eq!(stream(b">s1\nACGT\nACGT\n"), b"ACGTACGT"); + } + + #[test] + fn fasta_lowercase_uppercased() { + assert_eq!(stream(b">s1\nacgt\n"), b"ACGT"); + } + + #[test] + fn fasta_non_acgt_becomes_null() { + let out = stream(b">s1\nACGTNACGT\n"); + assert_eq!(out, b"ACGT\x00ACGT"); + } + + // ── FASTQ ───────────────────────────────────────────────────────────────── + + fn make_fastq(records: &[&[u8]]) -> Vec { + let mut buf = Vec::new(); + for seq in records { + buf.extend_from_slice(b"@hdr\n"); + buf.extend_from_slice(seq); + buf.push(b'\n'); + buf.extend_from_slice(b"+\n"); + buf.extend_from_slice(&vec![b'I'; seq.len()]); + buf.push(b'\n'); + } + buf + } + + #[test] + fn fastq_single_record() { + // Trailing \x00 is emitted on the newline that ends the quality line. + assert_eq!(stream(&make_fastq(&[b"ACGTACGT"])), b"ACGTACGT\x00"); + } + + #[test] + fn fastq_two_records_separated() { + let out = stream(&make_fastq(&[b"ACGT", b"TTTT"])); + assert_eq!(out, b"ACGT\x00TTTT\x00"); + } + + #[test] + fn fastq_lowercase_uppercased() { + assert_eq!(stream(&make_fastq(&[b"acgt"])), b"ACGT\x00"); + } + + #[test] + fn fastq_non_acgt_becomes_null() { + let out = stream(&make_fastq(&[b"ACGTNACGT"])); + assert_eq!(out, b"ACGT\x00ACGT\x00"); + } + + // ── GBFF ────────────────────────────────────────────────────────────────── + + fn make_gbff(seqs: &[&[u8]]) -> Vec { + let mut buf = Vec::new(); + for seq in seqs { + buf.extend_from_slice(b"LOCUS NC_000001\nFEATURES\nORIGIN\n"); + // write numbered lines of 60 bases each + let mut pos = 1usize; + for chunk in seq.chunks(60) { + let groups: Vec<&[u8]> = chunk.chunks(10).collect(); + let line = groups.join(&b' '); + buf.extend_from_slice(format!("{:9} ", pos).as_bytes()); + buf.extend_from_slice(&line); + buf.push(b'\n'); + pos += chunk.len(); + } + buf.extend_from_slice(b"//\n"); + } + buf + } + + #[test] + fn gbff_single_record() { + let seq = b"acgtacgtacgt"; + let out = stream(&make_gbff(&[seq])); + assert_eq!(out, b"ACGTACGTACGT\x00"); + } + + #[test] + fn gbff_two_records_separated() { + let out = stream(&make_gbff(&[b"acgtacgt", b"tttttttt"])); + assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00"); + } + + #[test] + fn gbff_non_acgt_becomes_null() { + let out = stream(&make_gbff(&[b"acgtnacgt"])); + assert_eq!(out, b"ACGT\x00ACGT\x00"); + } + + // ── rewind ──────────────────────────────────────────────────────────────── + + #[test] + fn rewind_replays_bytes() { + let mut s = NormalizedByteStream::new(b">s\nACGT\n" as &[u8]).unwrap(); + assert_eq!(s.next_byte(), Some(b'A')); + assert_eq!(s.next_byte(), Some(b'C')); + s.rewind(1); + assert_eq!(s.next_byte(), Some(b'C')); + assert_eq!(s.next_byte(), Some(b'G')); + assert_eq!(s.next_byte(), Some(b'T')); + assert_eq!(s.next_byte(), None); + } +} diff --git a/src/obiskbuilder/Cargo.toml b/src/obiskbuilder/Cargo.toml index 0b68fb8..5e19801 100644 --- a/src/obiskbuilder/Cargo.toml +++ b/src/obiskbuilder/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] obikseq = { path = "../obikseq" } obikrope = { path = "../obikrope" } +obiread = { path = "../obiread" } lazy_static = "1.5.0" [dev-dependencies] diff --git a/src/obiskbuilder/src/lib.rs b/src/obiskbuilder/src/lib.rs index 89ea349..a5409e1 100644 --- a/src/obiskbuilder/src/lib.rs +++ b/src/obiskbuilder/src/lib.rs @@ -6,6 +6,7 @@ #![deny(missing_docs)] pub mod iter; +pub mod stream_iter; mod scratch; pub(crate) mod encoding; @@ -14,6 +15,7 @@ pub(crate) mod rolling_stat; pub use iter::SuperKmerIter; pub use scratch::SuperKmerScratch; +pub use stream_iter::SuperKmerStreamIter; use obikrope::Rope; use obikseq::RoutableSuperKmer; diff --git a/src/obiskbuilder/src/stream_iter.rs b/src/obiskbuilder/src/stream_iter.rs new file mode 100644 index 0000000..25f41c5 --- /dev/null +++ b/src/obiskbuilder/src/stream_iter.rs @@ -0,0 +1,146 @@ +//! Streaming superkmer iterator that does not require a fully-buffered record. +//! +//! [`SuperKmerStreamIter`] wraps a [`NormalizedByteStream`] and yields +//! [`RoutableSuperKmer`] values one by one, exactly as [`SuperKmerIter`] does +//! over a [`Rope`], but without accumulating the whole input in memory first. +//! +//! This makes it suitable for large GBFF chromosomes (250 MiB ORIGIN sections) +//! or any other source where buffering the full record would exhaust memory. +//! +//! The cut conditions and superkmer semantics are identical to [`SuperKmerIter`]: +//! +//! | Condition | stream rewind | +//! |------------------------|---------------| +//! | entropy(kmer) ≤ θ | k−1 | +//! | minimizer changed | k | +//! | super-kmer length = 256| k | +//! +//! [`SuperKmerIter`]: crate::iter::SuperKmerIter +//! [`Rope`]: obikrope::Rope + +use std::io::Read; + +use obiread::stream::NormalizedByteStream; +use obikseq::RoutableSuperKmer; +use obikseq::kmer::Minimizer; + +use crate::rolling_stat::RollingStat; +use crate::scratch::SuperKmerScratch; + +/// Streaming iterator over [`RoutableSuperKmer`] values. +pub struct SuperKmerStreamIter { + stream: NormalizedByteStream, + k: usize, + theta: f64, + scratch: SuperKmerScratch, + stat: RollingStat, + prev_min: Option, + prev_min_pos: usize, +} + +impl SuperKmerStreamIter { + /// Build a streaming superkmer iterator from any `Read` source. + /// + /// - `reader`: raw bytes (FASTA, FASTQ, or GBFF; format auto-detected) + /// - `k`: k-mer size (must be odd, 11 ≤ k ≤ 31) + /// - `level_max`: maximum sub-word size for entropy (1–6) + /// - `theta`: entropy threshold; k-mers with score ≤ theta are rejected + pub fn new(reader: R, k: usize, level_max: usize, theta: f64) -> std::io::Result { + Ok(Self { + stream: NormalizedByteStream::new(reader)?, + k, + theta, + scratch: SuperKmerScratch::new(), + stat: RollingStat::new(level_max), + prev_min: None, + prev_min_pos: 0, + }) + } + + fn reset(&mut self) { + self.stat.reset(); + self.scratch.reset(); + self.prev_min = None; + self.prev_min_pos = 0; + } + + fn try_emit(&mut self) -> Option { + if self.scratch.len() < self.k { + return None; + } + self.prev_min?; + Some(self.scratch.emit(self.prev_min_pos)) + } +} + +impl Iterator for SuperKmerStreamIter { + type Item = RoutableSuperKmer; + + fn next(&mut self) -> Option { + loop { + let byte = match self.stream.next_byte() { + None => { + return self.try_emit(); + } + Some(0x00) => { + let result = self.try_emit(); + self.reset(); + if result.is_some() { + return result; + } + continue; + } + Some(b) => b, + }; + + self.stat.push(byte); + + if !self.stat.ready() { + self.scratch.push(byte); + continue; + } + + // ── 1. Entropy check ───────────────────────────────────────────── + if self.stat.normalized_entropy().unwrap_or(1.0) < self.theta { + let result = self.try_emit(); + self.stream.rewind(self.k - 1); + self.reset(); + if result.is_some() { + return result; + } + continue; + } + + let min = self.stat.canonical_minimizer().unwrap(); + let min_pos = self.stat.minimizer_position().unwrap_or(0); + + // ── 2. Minimizer change check ───────────────────────────────────── + if let Some(prev) = self.prev_min { + if min != prev { + let result = self.try_emit(); + self.stream.rewind(self.k); + self.reset(); + if result.is_some() { + return result; + } + continue; + } + } + + // ── 3. Super-kmer length check ──────────────────────────────────── + if self.scratch.len() == 256 { + let result = self.try_emit(); + self.stream.rewind(self.k); + self.reset(); + if result.is_some() { + return result; + } + continue; + } + + self.prev_min = Some(min); + self.prev_min_pos = min_pos; + self.scratch.push(byte); + } + } +}