refactor: migrate pipeline to NucPage-based stream processing

Replace the existing chunk and Rope-based processing pipeline with a fixed-size NucPage architecture. Introduce a new nucstream module featuring buffer-pooled, in-place parsing that auto-detects and decompresses FASTA/FASTQ/GenBank inputs into normalized ACGT streams with k-mer overlap preservation. Update obikmer scatter and superkmer stages to consume NucPage iterators and cursor-based navigation, eliminating std::io::Read dependencies and optimizing memory management. Add a configurable max_open_files CLI argument and update implementation documentation to reflect the new record vs. stream reading paths.
This commit is contained in:
Eric Coissac
2026-05-28 22:16:31 +02:00
parent a4b57a96de
commit cfadf63bbc
12 changed files with 876 additions and 609 deletions
+7 -5
View File
@@ -10,19 +10,21 @@ mod fasta;
mod fastq;
mod mimetype;
pub mod normalize;
mod nucstream;
mod path_iterator;
pub mod peakreader;
pub mod record;
pub mod stream;
pub mod xopen;
pub use chunk::{SeqChunkIter, fasta_chunks, fastq_chunks,
read_fasta_chunks, read_fastq_chunks, read_sequence_chunks};
pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk};
pub use chunk::{
SeqChunkIter, fasta_chunks, fastq_chunks, read_fasta_chunks, read_fastq_chunks,
read_sequence_chunks,
};
pub use mimetype::MimeTypeGuesser;
pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk};
pub use nucstream::{NucPage, NucPageCursor, open_nuc_stream};
pub use path_iterator::{PathIter, path_iter};
pub use peakreader::PeekReader;
pub use stream::NormalizedByteStream;
pub use xopen::xopen;
/// Default read block size: 1 MiB.
+732
View File
@@ -0,0 +1,732 @@
use std::io::{self, Read};
use std::mem::ManuallyDrop;
use std::sync::{Arc, Mutex};
use crate::mimetype::MimeTypeGuesser;
use crate::xopen::open_raw;
pub const MAX_K: usize = 31;
const PAGE_SIZE: usize = 65536;
// overlap (MAX_K - 1) + page data (PAGE_SIZE) + 1 byte for the end-of-page terminating 0
const BUF_SIZE: usize = MAX_K + PAGE_SIZE;
// ─── OverlapState ─────────────────────────────────────────────────────────────
pub(crate) struct OverlapState {
data: [u8; MAX_K],
len: usize,
k: usize,
}
impl OverlapState {
pub(crate) fn new(k: usize) -> Self {
assert!(k > 0 && k <= MAX_K);
Self {
data: [0u8; MAX_K],
len: 0,
k,
}
}
}
// ─── NucParser trait ──────────────────────────────────────────────────────────
// Transforms a raw page into a compacted nucleotide stream in-place.
//
// Buffer layout on each call:
// buf[0..overlap_len()] — overlap bytes copied by write_overlap()
// buf[overlap_len()..overlap_len()+n] — raw bytes just read from the source
//
// Returns the number of output bytes in buf[0..returned].
pub(crate) trait NucParser {
// required: format-specific
fn new(k: usize) -> Self
where
Self: Sized;
fn overlap_state(&self) -> &OverlapState;
fn overlap_state_mut(&mut self) -> &mut OverlapState;
fn is_in_seq(&self) -> bool;
fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize;
// provided: format-independent overlap management
fn overlap_len(&self) -> usize {
self.overlap_state().len
}
fn write_overlap(&self, buf: &mut [u8]) {
let ol = &self.overlap_state();
buf[..ol.len].copy_from_slice(&ol.data[..ol.len]);
}
// Called at end of parse_inplace: saves overlap state and returns adjusted j.
// seq_start is the j-position where the last sequence started in this call's output.
fn save_overlap(&mut self, buf: &mut [u8], j: usize, seq_start: usize) -> usize {
if !self.is_in_seq() {
self.overlap_state_mut().len = 0;
return j;
}
let seq_len = j - seq_start;
let k = self.overlap_state().k;
if seq_len >= k {
// Sequence long enough: save last k-1 nucleotides, terminate with 0.
let ol = k - 1;
self.overlap_state_mut().data[..ol].copy_from_slice(&buf[j - ol..j]);
self.overlap_state_mut().len = ol;
// SAFETY: j <= total - 1 < BUF_SIZE = buf.len()
// (total = overlap_len + n <= (MAX_K-1) + PAGE_SIZE = BUF_SIZE - 1)
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j + 1
} else if seq_len > 0 {
// Short sequence (< k): save whole fragment, strip from output.
self.overlap_state_mut().data[..seq_len].copy_from_slice(&buf[seq_start..j]);
self.overlap_state_mut().len = seq_len;
seq_start
} else {
self.overlap_state_mut().len = 0;
j
}
}
}
// ─── FASTA parser ─────────────────────────────────────────────────────────────
#[derive(Clone, Copy)]
enum FastaState {
OutSeq,
InTitle,
InSeq,
InAmbiguous,
}
pub(crate) struct FastaParser {
state: FastaState,
overlap: OverlapState,
}
impl NucParser for FastaParser {
fn new(k: usize) -> Self {
Self {
state: FastaState::OutSeq,
overlap: OverlapState::new(k),
}
}
#[inline]
fn overlap_state(&self) -> &OverlapState {
&self.overlap
}
#[inline]
fn overlap_state_mut(&mut self) -> &mut OverlapState {
&mut self.overlap
}
#[inline]
fn is_in_seq(&self) -> bool {
matches!(self.state, FastaState::InSeq)
}
fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize {
let total = self.overlap.len + n;
let mut i = 0; // read index
let mut j = 0; // write index (invariant: j <= i always)
// j-position where the current sequence started in this call's output;
// meaningful only when state is InSeq.
let mut seq_start: usize = 0;
while i < total {
// SAFETY: i < total <= BUF_SIZE = buf.len()
let byte = unsafe { *buf.get_unchecked(i) };
match self.state {
FastaState::OutSeq => {
if byte == b'>' {
self.state = FastaState::InTitle;
}
i += 1;
}
FastaState::InTitle => {
if byte == b'\n' || byte == b'\r' {
self.state = FastaState::InSeq;
seq_start = j;
}
i += 1;
}
FastaState::InSeq => {
if byte == b'\n' || byte == b'\r' {
i += 1;
continue;
}
let nuc = byte & 0xDF; // to uppercase
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
i += 1;
} else if byte == b'>' {
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = FastaState::InTitle;
i += 1;
} else {
// first ambiguous base: end current sequence if non-empty
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = FastaState::InAmbiguous;
i += 1;
}
}
FastaState::InAmbiguous => {
if byte == b'\n' || byte == b'\r' {
i += 1;
continue;
}
if byte == b'>' {
self.state = FastaState::InTitle;
i += 1;
continue;
}
let nuc = byte & 0xDF;
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
seq_start = j;
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
self.state = FastaState::InSeq;
}
i += 1;
}
}
}
self.save_overlap(buf, j, seq_start)
}
}
// ─── FASTQ parser ─────────────────────────────────────────────────────────────
#[derive(Clone, Copy)]
enum FastqState {
OutSeq,
InTitle,
InSeq,
InAmbiguous,
InQualTitle,
InQual,
}
pub(crate) struct FastqParser {
state: FastqState,
overlap: OverlapState,
}
impl NucParser for FastqParser {
fn new(k: usize) -> Self {
Self {
state: FastqState::OutSeq,
overlap: OverlapState::new(k),
}
}
#[inline]
fn overlap_state(&self) -> &OverlapState {
&self.overlap
}
#[inline]
fn overlap_state_mut(&mut self) -> &mut OverlapState {
&mut self.overlap
}
#[inline]
fn is_in_seq(&self) -> bool {
matches!(self.state, FastqState::InSeq)
}
fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize {
let total = self.overlap.len + n;
let mut i = 0;
let mut j = 0;
let mut seq_start: usize = 0;
while i < total {
// SAFETY: i < total <= BUF_SIZE = buf.len()
let byte = unsafe { *buf.get_unchecked(i) };
match self.state {
FastqState::OutSeq => {
if byte == b'@' {
self.state = FastqState::InTitle;
}
i += 1;
}
FastqState::InTitle => {
if byte == b'\n' || byte == b'\r' {
self.state = FastqState::InSeq;
seq_start = j;
}
i += 1;
}
FastqState::InSeq => {
if byte == b'\n' || byte == b'\r' {
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = FastqState::InQualTitle;
i += 1;
continue;
}
let nuc = byte & 0xDF;
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
} else {
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = FastqState::InAmbiguous;
}
i += 1;
}
FastqState::InAmbiguous => {
if byte == b'\n' || byte == b'\r' {
self.state = FastqState::InQualTitle;
i += 1;
continue;
}
let nuc = byte & 0xDF;
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
seq_start = j;
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
self.state = FastqState::InSeq;
}
i += 1;
}
FastqState::InQualTitle => {
if byte == b'\n' || byte == b'\r' {
self.state = FastqState::InQual;
}
i += 1;
}
FastqState::InQual => {
if byte == b'\n' || byte == b'\r' {
self.state = FastqState::OutSeq;
}
i += 1;
}
}
}
self.save_overlap(buf, j, seq_start)
}
}
// ─── GenBank parser ───────────────────────────────────────────────────────────
const ORIGIN_TAIL: &[u8] = b"RIGIN";
#[derive(Clone, Copy)]
enum GenbankState {
OutSeq,
MatchOrigin,
SkipOriginLine,
InSeq,
InSlash,
InAmbiguous,
}
pub(crate) struct GenbankParser {
state: GenbankState,
overlap: OverlapState,
keyword_pos: usize,
at_line_start: bool,
}
impl NucParser for GenbankParser {
fn new(k: usize) -> Self {
Self {
state: GenbankState::OutSeq,
overlap: OverlapState::new(k),
keyword_pos: 0,
at_line_start: true,
}
}
#[inline]
fn overlap_state(&self) -> &OverlapState {
&self.overlap
}
#[inline]
fn overlap_state_mut(&mut self) -> &mut OverlapState {
&mut self.overlap
}
#[inline]
fn is_in_seq(&self) -> bool {
matches!(self.state, GenbankState::InSeq)
}
fn parse_inplace(&mut self, buf: &mut [u8], n: usize) -> usize {
let total = self.overlap.len + n;
let mut i = 0;
let mut j = 0;
let mut seq_start: usize = 0;
while i < total {
// SAFETY: i < total <= BUF_SIZE = buf.len()
let byte = unsafe { *buf.get_unchecked(i) };
match self.state {
GenbankState::OutSeq => {
if byte == b'\n' || byte == b'\r' {
self.at_line_start = true;
} else if self.at_line_start && byte == b'O' {
self.state = GenbankState::MatchOrigin;
self.keyword_pos = 1;
self.at_line_start = false;
} else {
self.at_line_start = false;
}
i += 1;
}
GenbankState::MatchOrigin => {
if byte == b'\n' || byte == b'\r' {
self.state = GenbankState::OutSeq;
self.at_line_start = true;
} else if byte == ORIGIN_TAIL[self.keyword_pos - 1] {
self.keyword_pos += 1;
if self.keyword_pos == 6 {
self.state = GenbankState::SkipOriginLine;
}
} else {
self.state = GenbankState::OutSeq;
self.at_line_start = false;
}
i += 1;
}
GenbankState::SkipOriginLine => {
if byte == b'\n' || byte == b'\r' {
self.state = GenbankState::InSeq;
seq_start = j;
}
i += 1;
}
GenbankState::InSeq => {
if byte == b'\n' || byte == b'\r' {
self.at_line_start = true;
i += 1;
continue;
}
if self.at_line_start && byte == b'/' {
self.state = GenbankState::InSlash;
self.at_line_start = false;
i += 1;
continue;
}
self.at_line_start = false;
let nuc = byte & 0xDF;
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
} else if byte.is_ascii_digit() || byte == b' ' {
// position numbers and spacing between groups: skip
} else {
// ambiguous base: end current sequence if non-empty
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = GenbankState::InAmbiguous;
}
i += 1;
}
GenbankState::InSlash => {
if byte == b'/' {
// confirmed "//": end of sequence record
if j > seq_start {
unsafe {
*buf.get_unchecked_mut(j) = 0;
}
j += 1;
}
self.state = GenbankState::OutSeq;
self.at_line_start = false;
} else if byte == b'\n' || byte == b'\r' {
// single '/' line: back to sequence
self.state = GenbankState::InSeq;
self.at_line_start = true;
} else {
// false positive: single '/' mid-line, resume sequence
self.state = GenbankState::InSeq;
self.at_line_start = false;
}
i += 1;
}
GenbankState::InAmbiguous => {
if byte == b'\n' || byte == b'\r' {
self.at_line_start = true;
i += 1;
continue;
}
if self.at_line_start && byte == b'/' {
self.state = GenbankState::InSlash;
self.at_line_start = false;
i += 1;
continue;
}
self.at_line_start = false;
let nuc = byte & 0xDF;
if nuc == b'A' || nuc == b'C' || nuc == b'G' || nuc == b'T' {
seq_start = j;
// SAFETY: j <= i < total <= BUF_SIZE = buf.len()
unsafe {
*buf.get_unchecked_mut(j) = nuc;
}
j += 1;
self.state = GenbankState::InSeq;
}
// digits, spaces, other ambiguous codes: skip
i += 1;
}
}
}
self.save_overlap(buf, j, seq_start)
}
}
// ─── NucPage ──────────────────────────────────────────────────────────────────
/// Owned page of compacted nucleotides: uppercase A/C/G/T bytes separated by `0`
/// at sequence boundaries. Automatically returns its buffer to the pool on drop.
pub struct NucPage {
data: ManuallyDrop<Vec<u8>>,
len: usize,
pool: Arc<Mutex<Vec<Vec<u8>>>>,
}
impl std::ops::Deref for NucPage {
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.data[..self.len]
}
}
impl Drop for NucPage {
fn drop(&mut self) {
// SAFETY: data is never accessed after this point
let buf = unsafe { ManuallyDrop::take(&mut self.data) };
self.pool.lock().unwrap().push(buf);
}
}
// ─── NucPageCursor ────────────────────────────────────────────────────────────
/// A forward cursor over the normalised bytes of a [`NucPage`].
///
/// Provides the `next_byte` / `rewind` interface consumed by
/// [`obiskbuilder::SuperKmerStreamIter`].
pub struct NucPageCursor<'a> {
data: &'a [u8],
pos: usize,
}
impl NucPageCursor<'_> {
/// Returns the next byte in the page, or `None` at end.
#[inline]
pub fn next_byte(&mut self) -> Option<u8> {
if self.pos < self.data.len() {
let b = self.data[self.pos];
self.pos += 1;
Some(b)
} else {
None
}
}
/// Steps the cursor back by `n` bytes.
///
/// The caller guarantees that the last `n` bytes were all `ACGT`
/// (no `0x00` separators), so they are still in the page buffer.
#[inline]
pub fn rewind(&mut self, n: usize) {
self.pos -= n;
}
/// Total number of bytes in the underlying page.
#[inline]
pub fn len(&self) -> usize {
self.data.len()
}
/// Returns `true` if the page contains no bytes.
#[inline]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
impl NucPage {
/// Creates a forward cursor positioned at the start of this page.
pub fn cursor(&self) -> NucPageCursor<'_> {
NucPageCursor { data: self, pos: 0 }
}
}
// ─── NucStream ────────────────────────────────────────────────────────────────
pub(crate) struct NucStream<R: Read, P: NucParser> {
reader: R,
parser: P,
pool: Arc<Mutex<Vec<Vec<u8>>>>,
eof: bool,
}
impl<R: Read, P: NucParser> NucStream<R, P> {
pub(crate) fn new(reader: R, k: usize) -> Self {
Self {
reader,
parser: P::new(k),
pool: Arc::new(Mutex::new(Vec::new())),
eof: false,
}
}
pub(crate) fn read_page(&mut self) -> Option<NucPage> {
loop {
if self.eof {
return None;
}
// take a buffer from the pool, or allocate fresh if all are in-flight
let mut buf = self
.pool
.lock()
.unwrap()
.pop()
.unwrap_or_else(|| vec![0u8; BUF_SIZE]);
let ol = self.parser.overlap_len();
self.parser.write_overlap(&mut buf[..ol]);
let n = self.reader.read(&mut buf[ol..ol + PAGE_SIZE]).unwrap_or(0);
if n == 0 {
self.eof = true;
if ol == 0 {
self.pool.lock().unwrap().push(buf);
return None;
}
}
let out_len = self.parser.parse_inplace(&mut buf, n);
if out_len > 0 {
return Some(NucPage {
data: ManuallyDrop::new(buf),
len: out_len,
pool: Arc::clone(&self.pool),
});
}
// empty page (all headers/ambiguous): return buf to pool and loop
self.pool.lock().unwrap().push(buf);
}
}
}
impl<R: Read, P: NucParser> Iterator for NucStream<R, P> {
type Item = NucPage;
fn next(&mut self) -> Option<NucPage> {
self.read_page()
}
}
// ─── FastaNucStream ───────────────────────────────────────────────────────────
pub(crate) type FastaNucStream<R> = NucStream<R, FastaParser>;
pub(crate) type FastqNucStream<R> = NucStream<R, FastqParser>;
pub(crate) type GenbankNucStream<R> = NucStream<R, GenbankParser>;
// ─── AnyNucStream ─────────────────────────────────────────────────────────────
pub(crate) enum AnyNucStream<R: Read> {
Fasta(FastaNucStream<R>),
Fastq(FastqNucStream<R>),
Genbank(GenbankNucStream<R>),
}
impl<R: Read> Iterator for AnyNucStream<R> {
type Item = NucPage;
fn next(&mut self) -> Option<NucPage> {
match self {
AnyNucStream::Fasta(s) => s.next(),
AnyNucStream::Fastq(s) => s.next(),
AnyNucStream::Genbank(s) => s.next(),
}
}
}
fn dispatch<R: Read>(
mut guesser: MimeTypeGuesser<R>,
k: usize,
) -> Option<AnyNucStream<MimeTypeGuesser<R>>> {
match guesser.mime_type() {
Some("text/fasta") => Some(AnyNucStream::Fasta(NucStream::new(guesser, k))),
Some("text/fastq") => Some(AnyNucStream::Fastq(NucStream::new(guesser, k))),
Some("text/gbff") => Some(AnyNucStream::Genbank(NucStream::new(guesser, k))),
_ => None,
}
}
/// Wraps an already-open reader in a nucleotide stream, detecting its format.
/// Returns `None` if the format is not recognised.
pub(crate) fn nuc_stream<R: Read>(
reader: R,
k: usize,
) -> Option<AnyNucStream<MimeTypeGuesser<R>>> {
dispatch(MimeTypeGuesser::new(reader), k)
}
/// Opens a nucleotide stream from any source (file path, URL, or `-` for stdin),
/// with transparent decompression and automatic format detection.
///
/// # Errors
/// Returns an `io::Error` if the source cannot be opened, decompression fails,
/// or the format is not recognised.
pub fn open_nuc_stream(
source: &str,
k: usize,
) -> io::Result<Box<dyn Iterator<Item = NucPage> + Send>> {
let reader = open_raw(source)?;
dispatch(MimeTypeGuesser::new(reader), k)
.map(|s| Box::new(s) as Box<dyn Iterator<Item = NucPage> + Send>)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "unknown sequence format"))
}
-507
View File
@@ -1,507 +0,0 @@
//! Streaming byte normaliser for FASTA, FASTQ, and GBFF sequence files.
//!
//! [`NormalizedByteStream`] wraps any `Read` source in a 64 KiB circular ring
//! buffer and emits a normalised byte stream:
//!
//! - Uppercase `ACGT` bytes for in-sequence positions.
//! - `0x00` as a separator between independent sequence segments (record
//! boundary, non-ACGT base, or end of the GBFF `//` record).
//! - `None` at true EOF.
//!
//! Unlike the Rope-based pipeline, the stream never accumulates a full
//! record: it is safe for chromosomes of arbitrary size (e.g. GBFF files
//! with 250 MiB ORIGIN sections).
//!
//! # Rewind invariant
//!
//! `rewind(n)` subtracts `n` from the read head. It is only called by
//! [`obiskbuilder::SuperKmerStreamIter`] when the last `n` bytes were all
//! ACGT (no format overhead in between), so the ring buffer always contains
//! those bytes at positions `[head-n .. head]`. The maximum rewind is
//! `k ≤ 31`, well within the 32 KiB kept below the refill watermark.
use std::io::{self, Read};
// ── ring-buffer constants ─────────────────────────────────────────────────────
const CAP: usize = 65536; // must be a power of two
const MASK: usize = CAP - 1;
const REFILL_THRESHOLD: usize = 32768; // refill when available drops below this
const REFILL_CHUNK: usize = 32768; // target bytes per refill call
// ── format detection ──────────────────────────────────────────────────────────
#[derive(Clone, Copy, PartialEq, Eq)]
enum Format { Fasta, Fastq, Gbff }
fn detect_format(buf: &[u8]) -> Format {
if buf.starts_with(b"LOCUS ") {
Format::Gbff
} else if buf.first() == Some(&b'>') {
Format::Fasta
} else {
Format::Fastq
}
}
// ── format state machines ─────────────────────────────────────────────────────
#[derive(Clone, Copy)]
enum FaState {
OutSeq, // reading header / between records
InSeq, // reading sequence bases
}
#[derive(Clone, Copy)]
enum FqState {
Header, // reading @id line
Seq, // reading sequence bases
Plus, // reading +[id] separator line
Qual(usize), // skipping quality bytes; payload = remaining count
}
const ORIGIN: &[u8] = b"ORIGIN";
#[derive(Clone, Copy)]
enum GbState {
Pre(u8), // pre-ORIGIN: n chars of "ORIGIN" matched at current line start
PreSkip, // pre-ORIGIN: skip rest of non-ORIGIN line then → Pre(0)
LineStart, // ORIGIN data: at column 0 of a numbered data line
Num, // ORIGIN data: skipping spaces + line-number digits
Seq, // ORIGIN data: reading lowercase sequence bases
Slash, // ORIGIN data: saw the first '/' of a potential '//' end marker
}
#[derive(Clone, Copy)]
enum State {
Fa(FaState),
Fq(FqState),
Gb(GbState),
}
// ── NormalizedByteStream ──────────────────────────────────────────────────────
/// A streaming normaliser over any `Read` source.
///
/// Call [`next_byte`](NormalizedByteStream::next_byte) to consume one normalised
/// byte at a time. Call [`rewind`](NormalizedByteStream::rewind) to step back
/// up to 31 bytes (used by the superkmer builder when a minimizer or length
/// boundary is crossed).
pub struct NormalizedByteStream<R: Read> {
reader: R,
buf: Box<[u8; CAP]>,
head: usize, // absolute read position; index = head & MASK
write: usize, // absolute write position; index = write & MASK
eof: bool,
state: State,
seq_len: usize, // FASTQ only: non-newline chars seen in current sequence line
}
impl<R: Read> NormalizedByteStream<R> {
/// Wrap `reader` and detect its format from the first bytes.
pub fn new(mut reader: R) -> io::Result<Self> {
let mut buf = Box::new([0u8; CAP]);
// Initial read — enough to detect the format.
let n = reader.read(&mut buf[..REFILL_CHUNK])?;
let fmt = detect_format(&buf[..n]);
let state = match fmt {
Format::Fasta => State::Fa(FaState::OutSeq),
Format::Fastq => State::Fq(FqState::Header),
Format::Gbff => State::Gb(GbState::Pre(0)),
};
Ok(Self {
reader,
buf,
head: 0,
write: n,
eof: n == 0,
state,
seq_len: 0,
})
}
/// Step the read head back by `n` bytes.
///
/// # Safety
/// The caller guarantees that the last `n` bytes returned by
/// [`next_byte`] were all uppercase ACGT (no `0x00` separators), so those
/// bytes are still present in the ring buffer.
#[inline]
pub fn rewind(&mut self, n: usize) {
self.head -= n;
}
#[inline]
fn available(&self) -> usize {
self.write - self.head
}
fn try_refill(&mut self) -> io::Result<()> {
if self.eof || self.available() >= REFILL_THRESHOLD {
return Ok(());
}
// First segment: write_idx → end of buffer (contiguous, no wrap).
let write_idx = self.write & MASK;
let free = CAP - self.available();
let to_end = CAP - write_idx;
let chunk1 = free.min(to_end).min(REFILL_CHUNK);
if chunk1 > 0 {
let n = self.reader.read(&mut self.buf[write_idx..write_idx + chunk1])?;
self.write += n;
if n == 0 {
self.eof = true;
return Ok(());
}
}
// Second segment: if still below threshold, read into the wrapped region.
if self.available() < REFILL_THRESHOLD && !self.eof {
let write_idx2 = self.write & MASK;
let free2 = CAP - self.available();
let to_end2 = CAP - write_idx2;
let chunk2 = free2.min(to_end2).min(REFILL_CHUNK);
if chunk2 > 0 {
let n = self.reader.read(&mut self.buf[write_idx2..write_idx2 + chunk2])?;
self.write += n;
if n == 0 {
self.eof = true;
}
}
}
Ok(())
}
/// Return the next normalised byte, or `None` at EOF.
///
/// Emits uppercase `A`, `C`, `G`, `T`, or `0x00` (segment separator).
pub fn next_byte(&mut self) -> Option<u8> {
loop {
// Proactive refill: keep ≥ REFILL_THRESHOLD bytes in the buffer
// so rewinds are always safe.
if !self.eof && self.available() < REFILL_THRESHOLD {
let _ = self.try_refill();
}
if self.available() == 0 {
return None;
}
let b = self.buf[self.head & MASK];
self.head += 1;
// Copy state to avoid borrow conflicts while mutating self.state.
let state = self.state;
if let Some(out) = self.process(state, b) {
return Some(out);
}
}
}
// ── per-format byte processors ────────────────────────────────────────────
#[inline]
fn process(&mut self, state: State, b: u8) -> Option<u8> {
match state {
State::Fa(fa) => self.fasta(fa, b),
State::Fq(fq) => self.fastq(fq, b),
State::Gb(gb) => self.gbff(gb, b),
}
}
fn fasta(&mut self, fa: FaState, b: u8) -> Option<u8> {
match fa {
FaState::OutSeq => {
if b == b'\n' {
self.state = State::Fa(FaState::InSeq);
}
None // skip header bytes
}
FaState::InSeq => {
if b == b'>' {
self.state = State::Fa(FaState::OutSeq);
Some(0x00) // record boundary
} else if b == b'\n' || b == b'\r' {
None
} else {
Some(normalize_nuc(b))
}
}
}
}
fn fastq(&mut self, fq: FqState, b: u8) -> Option<u8> {
match fq {
FqState::Header => {
if b == b'\n' {
self.state = State::Fq(FqState::Seq);
self.seq_len = 0;
}
None
}
FqState::Seq => {
if b == b'\n' {
self.state = State::Fq(FqState::Plus);
None
} else if b == b'\r' {
None
} else {
self.seq_len += 1; // count all chars, including non-ACGT
Some(normalize_nuc(b))
}
}
FqState::Plus => {
if b == b'\n' {
self.state = State::Fq(FqState::Qual(self.seq_len));
}
None
}
FqState::Qual(rem) => {
if b == b'\n' {
if rem == 0 {
// Quality line ended after all bases consumed → new record.
self.state = State::Fq(FqState::Header);
Some(0x00) // record boundary
} else {
// Newline inside multi-line quality — keep counting.
None
}
} else if b == b'\r' {
None
} else if rem > 0 {
self.state = State::Fq(FqState::Qual(rem - 1));
None
} else {
// rem == 0 but non-newline: shouldn't happen in valid FASTQ.
None
}
}
}
}
fn gbff(&mut self, gb: GbState, b: u8) -> Option<u8> {
match gb {
GbState::Pre(n) => {
let n = n as usize;
if n < 6 {
if b == ORIGIN[n] {
self.state = State::Gb(GbState::Pre((n + 1) as u8));
} else if b == b'\n' {
self.state = State::Gb(GbState::Pre(0));
} else {
self.state = State::Gb(GbState::PreSkip);
}
} else {
// All 6 chars of "ORIGIN" matched; skip the rest of the header line.
if b == b'\n' {
self.state = State::Gb(GbState::LineStart);
}
// Non-'\n': stay in Pre(6) implicitly (state unchanged).
}
None
}
GbState::PreSkip => {
if b == b'\n' {
self.state = State::Gb(GbState::Pre(0));
}
None
}
GbState::LineStart => {
if b == b'/' {
self.state = State::Gb(GbState::Slash);
} else if b != b'\n' {
// Space or digit: start of a numbered sequence line.
self.state = State::Gb(GbState::Num);
}
None
}
GbState::Num => {
if b == b'\n' {
self.state = State::Gb(GbState::LineStart);
None
} else if b.is_ascii_digit() || b == b' ' {
None // still in the number prefix
} else {
// First letter: transition to Seq and process this byte.
self.state = State::Gb(GbState::Seq);
Some(normalize_nuc(b))
}
}
GbState::Seq => {
if b == b'\n' {
self.state = State::Gb(GbState::LineStart);
None
} else if b == b' ' {
None // inter-group space
} else {
Some(normalize_nuc(b))
}
}
GbState::Slash => {
if b == b'/' {
// End of GBFF record ("//").
self.state = State::Gb(GbState::Pre(0)); // ready for next record
Some(0x00)
} else {
// Stray '/' — shouldn't happen in valid GBFF; recover.
self.state = State::Gb(GbState::LineStart);
None
}
}
}
}
}
// ── nucleotide normalisation ──────────────────────────────────────────────────
#[inline]
fn normalize_nuc(b: u8) -> u8 {
match b | 0x20 {
b'a' => b'A',
b'c' => b'C',
b'g' => b'G',
b't' => b'T',
_ => 0x00,
}
}
// ── tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
fn stream(data: &[u8]) -> Vec<u8> {
let mut s = NormalizedByteStream::new(data).unwrap();
let mut out = Vec::new();
while let Some(b) = s.next_byte() {
out.push(b);
}
out
}
// ── FASTA ─────────────────────────────────────────────────────────────────
#[test]
fn fasta_single_record() {
assert_eq!(stream(b">s1\nACGTACGT\n"), b"ACGTACGT");
}
#[test]
fn fasta_two_records_separated() {
let out = stream(b">s1\nACGT\n>s2\nTTTT\n");
assert_eq!(out, b"ACGT\x00TTTT");
}
#[test]
fn fasta_multiline_concatenated() {
assert_eq!(stream(b">s1\nACGT\nACGT\n"), b"ACGTACGT");
}
#[test]
fn fasta_lowercase_uppercased() {
assert_eq!(stream(b">s1\nacgt\n"), b"ACGT");
}
#[test]
fn fasta_non_acgt_becomes_null() {
let out = stream(b">s1\nACGTNACGT\n");
assert_eq!(out, b"ACGT\x00ACGT");
}
// ── FASTQ ─────────────────────────────────────────────────────────────────
fn make_fastq(records: &[&[u8]]) -> Vec<u8> {
let mut buf = Vec::new();
for seq in records {
buf.extend_from_slice(b"@hdr\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(&vec![b'I'; seq.len()]);
buf.push(b'\n');
}
buf
}
#[test]
fn fastq_single_record() {
// Trailing \x00 is emitted on the newline that ends the quality line.
assert_eq!(stream(&make_fastq(&[b"ACGTACGT"])), b"ACGTACGT\x00");
}
#[test]
fn fastq_two_records_separated() {
let out = stream(&make_fastq(&[b"ACGT", b"TTTT"]));
assert_eq!(out, b"ACGT\x00TTTT\x00");
}
#[test]
fn fastq_lowercase_uppercased() {
assert_eq!(stream(&make_fastq(&[b"acgt"])), b"ACGT\x00");
}
#[test]
fn fastq_non_acgt_becomes_null() {
let out = stream(&make_fastq(&[b"ACGTNACGT"]));
assert_eq!(out, b"ACGT\x00ACGT\x00");
}
// ── GBFF ──────────────────────────────────────────────────────────────────
fn make_gbff(seqs: &[&[u8]]) -> Vec<u8> {
let mut buf = Vec::new();
for seq in seqs {
buf.extend_from_slice(b"LOCUS NC_000001\nFEATURES\nORIGIN\n");
// write numbered lines of 60 bases each
let mut pos = 1usize;
for chunk in seq.chunks(60) {
let groups: Vec<&[u8]> = chunk.chunks(10).collect();
let line = groups.join(&b' ');
buf.extend_from_slice(format!("{:9} ", pos).as_bytes());
buf.extend_from_slice(&line);
buf.push(b'\n');
pos += chunk.len();
}
buf.extend_from_slice(b"//\n");
}
buf
}
#[test]
fn gbff_single_record() {
let seq = b"acgtacgtacgt";
let out = stream(&make_gbff(&[seq]));
assert_eq!(out, b"ACGTACGTACGT\x00");
}
#[test]
fn gbff_two_records_separated() {
let out = stream(&make_gbff(&[b"acgtacgt", b"tttttttt"]));
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
#[test]
fn gbff_non_acgt_becomes_null() {
let out = stream(&make_gbff(&[b"acgtnacgt"]));
assert_eq!(out, b"ACGT\x00ACGT\x00");
}
// ── rewind ────────────────────────────────────────────────────────────────
#[test]
fn rewind_replays_bytes() {
let mut s = NormalizedByteStream::new(b">s\nACGT\n" as &[u8]).unwrap();
assert_eq!(s.next_byte(), Some(b'A'));
assert_eq!(s.next_byte(), Some(b'C'));
s.rewind(1);
assert_eq!(s.next_byte(), Some(b'C'));
assert_eq!(s.next_byte(), Some(b'G'));
assert_eq!(s.next_byte(), Some(b'T'));
assert_eq!(s.next_byte(), None);
}
}
+12 -5
View File
@@ -20,13 +20,13 @@ use std::io::{self, Read};
/// Open any source for reading, with transparent decompression.
///
/// Returns a `Box<dyn Read>` that yields uncompressed bytes regardless of
/// whether the underlying source is plain text, gzip, bzip2, xz or zstd.
/// Returns a `Box<dyn Read + Send>` that yields uncompressed bytes regardless
/// of whether the underlying source is plain text, gzip, bzip2, xz or zstd.
///
/// # Errors
/// Returns an `io::Error` if the file cannot be opened, the URL cannot be
/// fetched, or the compression header is malformed.
pub fn xopen(source: &str) -> io::Result<MimeTypeGuesser<Box<dyn Read + Send>>> {
pub(crate) fn open_raw(source: &str) -> io::Result<Box<dyn Read + Send>> {
let raw: Box<dyn Read + Send> = match source {
"-" => Box::new(io::stdin()),
s if s.starts_with("http://") || s.starts_with("https://") => http_reader(s)?,
@@ -35,8 +35,15 @@ pub fn xopen(source: &str) -> io::Result<MimeTypeGuesser<Box<dyn Read + Send>>>
Box::new(File::open(expanded.as_ref())?)
}
};
let decompressed = decompress(raw)?;
Ok(MimeTypeGuesser::new(decompressed))
decompress(raw)
}
/// Open any source for reading, with transparent decompression and MIME detection.
///
/// Wraps [`open_raw`] in a [`MimeTypeGuesser`] so callers can inspect the
/// format before consuming the stream.
pub fn xopen(source: &str) -> io::Result<MimeTypeGuesser<Box<dyn Read + Send>>> {
Ok(MimeTypeGuesser::new(open_raw(source)?))
}
// ── internal helpers ──────────────────────────────────────────────────────────