feat: add streaming sequence reader and superkmer iterator

Introduce the `obiread` crate with a streaming byte normalizer that processes FASTA, FASTQ, and GenBank files using a 64 KiB ring buffer for O(1) memory usage. Integrate this crate into `obiskbuilder` to provide `SuperKmerStreamIter`, enabling memory-efficient superkmer traversal with rolling entropy and minimizer-based cut conditions.
This commit is contained in:
Eric Coissac
2026-05-27 00:09:12 +02:00
parent 0d9be53d1f
commit a4b57a96de
8 changed files with 671 additions and 1 deletions
+1
View File
@@ -1637,6 +1637,7 @@ dependencies = [
"lazy_static", "lazy_static",
"obikrope", "obikrope",
"obikseq", "obikseq",
"obiread",
] ]
[[package]] [[package]]
+2
View File
@@ -13,6 +13,7 @@ pub mod normalize;
mod path_iterator; mod path_iterator;
pub mod peakreader; pub mod peakreader;
pub mod record; pub mod record;
pub mod stream;
pub mod xopen; pub mod xopen;
pub use chunk::{SeqChunkIter, fasta_chunks, fastq_chunks, pub use chunk::{SeqChunkIter, fasta_chunks, fastq_chunks,
@@ -21,6 +22,7 @@ pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequ
pub use mimetype::MimeTypeGuesser; pub use mimetype::MimeTypeGuesser;
pub use path_iterator::{PathIter, path_iter}; pub use path_iterator::{PathIter, path_iter};
pub use peakreader::PeekReader; pub use peakreader::PeekReader;
pub use stream::NormalizedByteStream;
pub use xopen::xopen; pub use xopen::xopen;
/// Default read block size: 1 MiB. /// Default read block size: 1 MiB.
+5
View File
@@ -9,6 +9,10 @@ use crate::peakreader::PeekReader;
const BUF_SIZE: usize = 4096; const BUF_SIZE: usize = 4096;
fn is_gbff(buf: &[u8]) -> bool {
buf.starts_with(b"LOCUS ")
}
static RE_FASTA: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^>[^ ]").unwrap()); static RE_FASTA: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^>[^ ]").unwrap());
fn is_fasta(buf: &[u8]) -> bool { fn is_fasta(buf: &[u8]) -> bool {
std::str::from_utf8(buf).map_or(false, |s| RE_FASTA.is_match(s)) std::str::from_utf8(buf).map_or(false, |s| RE_FASTA.is_match(s))
@@ -30,6 +34,7 @@ fn is_text(buf: &[u8]) -> bool {
// Most specific formats (fastq, fasta) come before the generic text/plain fallback. // Most specific formats (fastq, fasta) come before the generic text/plain fallback.
static INFER: LazyLock<Infer> = LazyLock::new(|| { static INFER: LazyLock<Infer> = LazyLock::new(|| {
let mut infer = Infer::new(); let mut infer = Infer::new();
infer.add("text/gbff", "gbff", is_gbff);
infer.add("text/fastq", "fastq", is_fastq); infer.add("text/fastq", "fastq", is_fastq);
infer.add("text/fasta", "fasta", is_fasta); infer.add("text/fasta", "fasta", is_fasta);
infer.add("text/plain", "txt", is_text); infer.add("text/plain", "txt", is_text);
+7 -1
View File
@@ -71,7 +71,7 @@ pub fn path_iter(paths: &[String]) -> PathIter {
PathIter::new(path_bufs) PathIter::new(path_bufs)
} }
/// Returns true if the path ends with a fasta or fastq file extension. /// Returns true if the path ends with a recognised sequence file extension.
fn is_fasta_or_fastq(path: &Path) -> bool { fn is_fasta_or_fastq(path: &Path) -> bool {
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
name.ends_with(".fasta") name.ends_with(".fasta")
@@ -82,4 +82,10 @@ fn is_fasta_or_fastq(path: &Path) -> bool {
|| name.ends_with(".fa.gz") || name.ends_with(".fa.gz")
|| name.ends_with(".fastq.gz") || name.ends_with(".fastq.gz")
|| name.ends_with(".fq.gz") || name.ends_with(".fq.gz")
|| name.ends_with(".gbff")
|| name.ends_with(".gbk")
|| name.ends_with(".gb")
|| name.ends_with(".gbff.gz")
|| name.ends_with(".gbk.gz")
|| name.ends_with(".gb.gz")
} }
+507
View File
@@ -0,0 +1,507 @@
//! Streaming byte normaliser for FASTA, FASTQ, and GBFF sequence files.
//!
//! [`NormalizedByteStream`] wraps any `Read` source in a 64 KiB circular ring
//! buffer and emits a normalised byte stream:
//!
//! - Uppercase `ACGT` bytes for in-sequence positions.
//! - `0x00` as a separator between independent sequence segments (record
//! boundary, non-ACGT base, or end of the GBFF `//` record).
//! - `None` at true EOF.
//!
//! Unlike the Rope-based pipeline, the stream never accumulates a full
//! record: it is safe for chromosomes of arbitrary size (e.g. GBFF files
//! with 250 MiB ORIGIN sections).
//!
//! # Rewind invariant
//!
//! `rewind(n)` subtracts `n` from the read head. It is only called by
//! [`obiskbuilder::SuperKmerStreamIter`] when the last `n` bytes were all
//! ACGT (no format overhead in between), so the ring buffer always contains
//! those bytes at positions `[head-n .. head]`. The maximum rewind is
//! `k ≤ 31`, well within the 32 KiB kept below the refill watermark.
use std::io::{self, Read};
// ── ring-buffer constants ─────────────────────────────────────────────────────
const CAP: usize = 65536; // must be a power of two
const MASK: usize = CAP - 1;
const REFILL_THRESHOLD: usize = 32768; // refill when available drops below this
const REFILL_CHUNK: usize = 32768; // target bytes per refill call
// ── format detection ──────────────────────────────────────────────────────────
#[derive(Clone, Copy, PartialEq, Eq)]
enum Format { Fasta, Fastq, Gbff }
fn detect_format(buf: &[u8]) -> Format {
if buf.starts_with(b"LOCUS ") {
Format::Gbff
} else if buf.first() == Some(&b'>') {
Format::Fasta
} else {
Format::Fastq
}
}
// ── format state machines ─────────────────────────────────────────────────────
#[derive(Clone, Copy)]
enum FaState {
OutSeq, // reading header / between records
InSeq, // reading sequence bases
}
#[derive(Clone, Copy)]
enum FqState {
Header, // reading @id line
Seq, // reading sequence bases
Plus, // reading +[id] separator line
Qual(usize), // skipping quality bytes; payload = remaining count
}
const ORIGIN: &[u8] = b"ORIGIN";
#[derive(Clone, Copy)]
enum GbState {
Pre(u8), // pre-ORIGIN: n chars of "ORIGIN" matched at current line start
PreSkip, // pre-ORIGIN: skip rest of non-ORIGIN line then → Pre(0)
LineStart, // ORIGIN data: at column 0 of a numbered data line
Num, // ORIGIN data: skipping spaces + line-number digits
Seq, // ORIGIN data: reading lowercase sequence bases
Slash, // ORIGIN data: saw the first '/' of a potential '//' end marker
}
#[derive(Clone, Copy)]
enum State {
Fa(FaState),
Fq(FqState),
Gb(GbState),
}
// ── NormalizedByteStream ──────────────────────────────────────────────────────
/// A streaming normaliser over any `Read` source.
///
/// Call [`next_byte`](NormalizedByteStream::next_byte) to consume one normalised
/// byte at a time. Call [`rewind`](NormalizedByteStream::rewind) to step back
/// up to 31 bytes (used by the superkmer builder when a minimizer or length
/// boundary is crossed).
pub struct NormalizedByteStream<R: Read> {
reader: R,
buf: Box<[u8; CAP]>,
head: usize, // absolute read position; index = head & MASK
write: usize, // absolute write position; index = write & MASK
eof: bool,
state: State,
seq_len: usize, // FASTQ only: non-newline chars seen in current sequence line
}
impl<R: Read> NormalizedByteStream<R> {
/// Wrap `reader` and detect its format from the first bytes.
pub fn new(mut reader: R) -> io::Result<Self> {
let mut buf = Box::new([0u8; CAP]);
// Initial read — enough to detect the format.
let n = reader.read(&mut buf[..REFILL_CHUNK])?;
let fmt = detect_format(&buf[..n]);
let state = match fmt {
Format::Fasta => State::Fa(FaState::OutSeq),
Format::Fastq => State::Fq(FqState::Header),
Format::Gbff => State::Gb(GbState::Pre(0)),
};
Ok(Self {
reader,
buf,
head: 0,
write: n,
eof: n == 0,
state,
seq_len: 0,
})
}
/// Step the read head back by `n` bytes.
///
/// # Safety
/// The caller guarantees that the last `n` bytes returned by
/// [`next_byte`] were all uppercase ACGT (no `0x00` separators), so those
/// bytes are still present in the ring buffer.
#[inline]
pub fn rewind(&mut self, n: usize) {
self.head -= n;
}
#[inline]
fn available(&self) -> usize {
self.write - self.head
}
fn try_refill(&mut self) -> io::Result<()> {
if self.eof || self.available() >= REFILL_THRESHOLD {
return Ok(());
}
// First segment: write_idx → end of buffer (contiguous, no wrap).
let write_idx = self.write & MASK;
let free = CAP - self.available();
let to_end = CAP - write_idx;
let chunk1 = free.min(to_end).min(REFILL_CHUNK);
if chunk1 > 0 {
let n = self.reader.read(&mut self.buf[write_idx..write_idx + chunk1])?;
self.write += n;
if n == 0 {
self.eof = true;
return Ok(());
}
}
// Second segment: if still below threshold, read into the wrapped region.
if self.available() < REFILL_THRESHOLD && !self.eof {
let write_idx2 = self.write & MASK;
let free2 = CAP - self.available();
let to_end2 = CAP - write_idx2;
let chunk2 = free2.min(to_end2).min(REFILL_CHUNK);
if chunk2 > 0 {
let n = self.reader.read(&mut self.buf[write_idx2..write_idx2 + chunk2])?;
self.write += n;
if n == 0 {
self.eof = true;
}
}
}
Ok(())
}
/// Return the next normalised byte, or `None` at EOF.
///
/// Emits uppercase `A`, `C`, `G`, `T`, or `0x00` (segment separator).
pub fn next_byte(&mut self) -> Option<u8> {
loop {
// Proactive refill: keep ≥ REFILL_THRESHOLD bytes in the buffer
// so rewinds are always safe.
if !self.eof && self.available() < REFILL_THRESHOLD {
let _ = self.try_refill();
}
if self.available() == 0 {
return None;
}
let b = self.buf[self.head & MASK];
self.head += 1;
// Copy state to avoid borrow conflicts while mutating self.state.
let state = self.state;
if let Some(out) = self.process(state, b) {
return Some(out);
}
}
}
// ── per-format byte processors ────────────────────────────────────────────
#[inline]
fn process(&mut self, state: State, b: u8) -> Option<u8> {
match state {
State::Fa(fa) => self.fasta(fa, b),
State::Fq(fq) => self.fastq(fq, b),
State::Gb(gb) => self.gbff(gb, b),
}
}
fn fasta(&mut self, fa: FaState, b: u8) -> Option<u8> {
match fa {
FaState::OutSeq => {
if b == b'\n' {
self.state = State::Fa(FaState::InSeq);
}
None // skip header bytes
}
FaState::InSeq => {
if b == b'>' {
self.state = State::Fa(FaState::OutSeq);
Some(0x00) // record boundary
} else if b == b'\n' || b == b'\r' {
None
} else {
Some(normalize_nuc(b))
}
}
}
}
fn fastq(&mut self, fq: FqState, b: u8) -> Option<u8> {
match fq {
FqState::Header => {
if b == b'\n' {
self.state = State::Fq(FqState::Seq);
self.seq_len = 0;
}
None
}
FqState::Seq => {
if b == b'\n' {
self.state = State::Fq(FqState::Plus);
None
} else if b == b'\r' {
None
} else {
self.seq_len += 1; // count all chars, including non-ACGT
Some(normalize_nuc(b))
}
}
FqState::Plus => {
if b == b'\n' {
self.state = State::Fq(FqState::Qual(self.seq_len));
}
None
}
FqState::Qual(rem) => {
if b == b'\n' {
if rem == 0 {
// Quality line ended after all bases consumed → new record.
self.state = State::Fq(FqState::Header);
Some(0x00) // record boundary
} else {
// Newline inside multi-line quality — keep counting.
None
}
} else if b == b'\r' {
None
} else if rem > 0 {
self.state = State::Fq(FqState::Qual(rem - 1));
None
} else {
// rem == 0 but non-newline: shouldn't happen in valid FASTQ.
None
}
}
}
}
fn gbff(&mut self, gb: GbState, b: u8) -> Option<u8> {
match gb {
GbState::Pre(n) => {
let n = n as usize;
if n < 6 {
if b == ORIGIN[n] {
self.state = State::Gb(GbState::Pre((n + 1) as u8));
} else if b == b'\n' {
self.state = State::Gb(GbState::Pre(0));
} else {
self.state = State::Gb(GbState::PreSkip);
}
} else {
// All 6 chars of "ORIGIN" matched; skip the rest of the header line.
if b == b'\n' {
self.state = State::Gb(GbState::LineStart);
}
// Non-'\n': stay in Pre(6) implicitly (state unchanged).
}
None
}
GbState::PreSkip => {
if b == b'\n' {
self.state = State::Gb(GbState::Pre(0));
}
None
}
GbState::LineStart => {
if b == b'/' {
self.state = State::Gb(GbState::Slash);
} else if b != b'\n' {
// Space or digit: start of a numbered sequence line.
self.state = State::Gb(GbState::Num);
}
None
}
GbState::Num => {
if b == b'\n' {
self.state = State::Gb(GbState::LineStart);
None
} else if b.is_ascii_digit() || b == b' ' {
None // still in the number prefix
} else {
// First letter: transition to Seq and process this byte.
self.state = State::Gb(GbState::Seq);
Some(normalize_nuc(b))
}
}
GbState::Seq => {
if b == b'\n' {
self.state = State::Gb(GbState::LineStart);
None
} else if b == b' ' {
None // inter-group space
} else {
Some(normalize_nuc(b))
}
}
GbState::Slash => {
if b == b'/' {
// End of GBFF record ("//").
self.state = State::Gb(GbState::Pre(0)); // ready for next record
Some(0x00)
} else {
// Stray '/' — shouldn't happen in valid GBFF; recover.
self.state = State::Gb(GbState::LineStart);
None
}
}
}
}
}
// ── nucleotide normalisation ──────────────────────────────────────────────────
#[inline]
fn normalize_nuc(b: u8) -> u8 {
match b | 0x20 {
b'a' => b'A',
b'c' => b'C',
b'g' => b'G',
b't' => b'T',
_ => 0x00,
}
}
// ── tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
fn stream(data: &[u8]) -> Vec<u8> {
let mut s = NormalizedByteStream::new(data).unwrap();
let mut out = Vec::new();
while let Some(b) = s.next_byte() {
out.push(b);
}
out
}
// ── FASTA ─────────────────────────────────────────────────────────────────
#[test]
fn fasta_single_record() {
assert_eq!(stream(b">s1\nACGTACGT\n"), b"ACGTACGT");
}
#[test]
fn fasta_two_records_separated() {
let out = stream(b">s1\nACGT\n>s2\nTTTT\n");
assert_eq!(out, b"ACGT\x00TTTT");
}
#[test]
fn fasta_multiline_concatenated() {
assert_eq!(stream(b">s1\nACGT\nACGT\n"), b"ACGTACGT");
}
#[test]
fn fasta_lowercase_uppercased() {
assert_eq!(stream(b">s1\nacgt\n"), b"ACGT");
}
#[test]
fn fasta_non_acgt_becomes_null() {
let out = stream(b">s1\nACGTNACGT\n");
assert_eq!(out, b"ACGT\x00ACGT");
}
// ── FASTQ ─────────────────────────────────────────────────────────────────
fn make_fastq(records: &[&[u8]]) -> Vec<u8> {
let mut buf = Vec::new();
for seq in records {
buf.extend_from_slice(b"@hdr\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(&vec![b'I'; seq.len()]);
buf.push(b'\n');
}
buf
}
#[test]
fn fastq_single_record() {
// Trailing \x00 is emitted on the newline that ends the quality line.
assert_eq!(stream(&make_fastq(&[b"ACGTACGT"])), b"ACGTACGT\x00");
}
#[test]
fn fastq_two_records_separated() {
let out = stream(&make_fastq(&[b"ACGT", b"TTTT"]));
assert_eq!(out, b"ACGT\x00TTTT\x00");
}
#[test]
fn fastq_lowercase_uppercased() {
assert_eq!(stream(&make_fastq(&[b"acgt"])), b"ACGT\x00");
}
#[test]
fn fastq_non_acgt_becomes_null() {
let out = stream(&make_fastq(&[b"ACGTNACGT"]));
assert_eq!(out, b"ACGT\x00ACGT\x00");
}
// ── GBFF ──────────────────────────────────────────────────────────────────
fn make_gbff(seqs: &[&[u8]]) -> Vec<u8> {
let mut buf = Vec::new();
for seq in seqs {
buf.extend_from_slice(b"LOCUS NC_000001\nFEATURES\nORIGIN\n");
// write numbered lines of 60 bases each
let mut pos = 1usize;
for chunk in seq.chunks(60) {
let groups: Vec<&[u8]> = chunk.chunks(10).collect();
let line = groups.join(&b' ');
buf.extend_from_slice(format!("{:9} ", pos).as_bytes());
buf.extend_from_slice(&line);
buf.push(b'\n');
pos += chunk.len();
}
buf.extend_from_slice(b"//\n");
}
buf
}
#[test]
fn gbff_single_record() {
let seq = b"acgtacgtacgt";
let out = stream(&make_gbff(&[seq]));
assert_eq!(out, b"ACGTACGTACGT\x00");
}
#[test]
fn gbff_two_records_separated() {
let out = stream(&make_gbff(&[b"acgtacgt", b"tttttttt"]));
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
#[test]
fn gbff_non_acgt_becomes_null() {
let out = stream(&make_gbff(&[b"acgtnacgt"]));
assert_eq!(out, b"ACGT\x00ACGT\x00");
}
// ── rewind ────────────────────────────────────────────────────────────────
#[test]
fn rewind_replays_bytes() {
let mut s = NormalizedByteStream::new(b">s\nACGT\n" as &[u8]).unwrap();
assert_eq!(s.next_byte(), Some(b'A'));
assert_eq!(s.next_byte(), Some(b'C'));
s.rewind(1);
assert_eq!(s.next_byte(), Some(b'C'));
assert_eq!(s.next_byte(), Some(b'G'));
assert_eq!(s.next_byte(), Some(b'T'));
assert_eq!(s.next_byte(), None);
}
}
+1
View File
@@ -6,6 +6,7 @@ edition = "2024"
[dependencies] [dependencies]
obikseq = { path = "../obikseq" } obikseq = { path = "../obikseq" }
obikrope = { path = "../obikrope" } obikrope = { path = "../obikrope" }
obiread = { path = "../obiread" }
lazy_static = "1.5.0" lazy_static = "1.5.0"
[dev-dependencies] [dev-dependencies]
+2
View File
@@ -6,6 +6,7 @@
#![deny(missing_docs)] #![deny(missing_docs)]
pub mod iter; pub mod iter;
pub mod stream_iter;
mod scratch; mod scratch;
pub(crate) mod encoding; pub(crate) mod encoding;
@@ -14,6 +15,7 @@ pub(crate) mod rolling_stat;
pub use iter::SuperKmerIter; pub use iter::SuperKmerIter;
pub use scratch::SuperKmerScratch; pub use scratch::SuperKmerScratch;
pub use stream_iter::SuperKmerStreamIter;
use obikrope::Rope; use obikrope::Rope;
use obikseq::RoutableSuperKmer; use obikseq::RoutableSuperKmer;
+146
View File
@@ -0,0 +1,146 @@
//! Streaming superkmer iterator that does not require a fully-buffered record.
//!
//! [`SuperKmerStreamIter`] wraps a [`NormalizedByteStream`] and yields
//! [`RoutableSuperKmer`] values one by one, exactly as [`SuperKmerIter`] does
//! over a [`Rope`], but without accumulating the whole input in memory first.
//!
//! This makes it suitable for large GBFF chromosomes (250 MiB ORIGIN sections)
//! or any other source where buffering the full record would exhaust memory.
//!
//! The cut conditions and superkmer semantics are identical to [`SuperKmerIter`]:
//!
//! | Condition | stream rewind |
//! |------------------------|---------------|
//! | entropy(kmer) ≤ θ | k1 |
//! | minimizer changed | k |
//! | super-kmer length = 256| k |
//!
//! [`SuperKmerIter`]: crate::iter::SuperKmerIter
//! [`Rope`]: obikrope::Rope
use std::io::Read;
use obiread::stream::NormalizedByteStream;
use obikseq::RoutableSuperKmer;
use obikseq::kmer::Minimizer;
use crate::rolling_stat::RollingStat;
use crate::scratch::SuperKmerScratch;
/// Streaming iterator over [`RoutableSuperKmer`] values.
pub struct SuperKmerStreamIter<R: Read> {
stream: NormalizedByteStream<R>,
k: usize,
theta: f64,
scratch: SuperKmerScratch,
stat: RollingStat,
prev_min: Option<Minimizer>,
prev_min_pos: usize,
}
impl<R: Read> SuperKmerStreamIter<R> {
/// Build a streaming superkmer iterator from any `Read` source.
///
/// - `reader`: raw bytes (FASTA, FASTQ, or GBFF; format auto-detected)
/// - `k`: k-mer size (must be odd, 11 ≤ k ≤ 31)
/// - `level_max`: maximum sub-word size for entropy (16)
/// - `theta`: entropy threshold; k-mers with score ≤ theta are rejected
pub fn new(reader: R, k: usize, level_max: usize, theta: f64) -> std::io::Result<Self> {
Ok(Self {
stream: NormalizedByteStream::new(reader)?,
k,
theta,
scratch: SuperKmerScratch::new(),
stat: RollingStat::new(level_max),
prev_min: None,
prev_min_pos: 0,
})
}
fn reset(&mut self) {
self.stat.reset();
self.scratch.reset();
self.prev_min = None;
self.prev_min_pos = 0;
}
fn try_emit(&mut self) -> Option<RoutableSuperKmer> {
if self.scratch.len() < self.k {
return None;
}
self.prev_min?;
Some(self.scratch.emit(self.prev_min_pos))
}
}
impl<R: Read> Iterator for SuperKmerStreamIter<R> {
type Item = RoutableSuperKmer;
fn next(&mut self) -> Option<RoutableSuperKmer> {
loop {
let byte = match self.stream.next_byte() {
None => {
return self.try_emit();
}
Some(0x00) => {
let result = self.try_emit();
self.reset();
if result.is_some() {
return result;
}
continue;
}
Some(b) => b,
};
self.stat.push(byte);
if !self.stat.ready() {
self.scratch.push(byte);
continue;
}
// ── 1. Entropy check ─────────────────────────────────────────────
if self.stat.normalized_entropy().unwrap_or(1.0) < self.theta {
let result = self.try_emit();
self.stream.rewind(self.k - 1);
self.reset();
if result.is_some() {
return result;
}
continue;
}
let min = self.stat.canonical_minimizer().unwrap();
let min_pos = self.stat.minimizer_position().unwrap_or(0);
// ── 2. Minimizer change check ─────────────────────────────────────
if let Some(prev) = self.prev_min {
if min != prev {
let result = self.try_emit();
self.stream.rewind(self.k);
self.reset();
if result.is_some() {
return result;
}
continue;
}
}
// ── 3. Super-kmer length check ────────────────────────────────────
if self.scratch.len() == 256 {
let result = self.try_emit();
self.stream.rewind(self.k);
self.reset();
if result.is_some() {
return result;
}
continue;
}
self.prev_min = Some(min);
self.prev_min_pos = min_pos;
self.scratch.push(byte);
}
}
}