diff --git a/src/Cargo.lock b/src/Cargo.lock index c588f60..cec3b4f 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -595,6 +595,14 @@ dependencies = [ "obiskbuilder", ] +[[package]] +name = "obikrope" +version = "0.1.0" +dependencies = [ + "bytes", + "criterion2", +] + [[package]] name = "obikseq" version = "0.1.0" @@ -607,8 +615,8 @@ dependencies = [ name = "obiread" version = "0.1.0" dependencies = [ - "bytes", "niffler", + "obikrope", "ureq", ] @@ -616,9 +624,8 @@ dependencies = [ name = "obiskbuilder" version = "0.1.0" dependencies = [ - "bytes", + "obikrope", "obikseq", - "obiread", ] [[package]] diff --git a/src/Cargo.toml b/src/Cargo.toml index 025769c..d61a33a 100644 --- a/src/Cargo.toml +++ b/src/Cargo.toml @@ -1,3 +1,3 @@ [workspace] resolver = "3" -members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer"] \ No newline at end of file +members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope"] diff --git a/src/obirope/Cargo.toml b/src/obikrope/Cargo.toml similarity index 77% rename from src/obirope/Cargo.toml rename to src/obikrope/Cargo.toml index 73c7c4e..1add0c6 100644 --- a/src/obirope/Cargo.toml +++ b/src/obikrope/Cargo.toml @@ -6,6 +6,5 @@ edition = "2024" [dev-dependencies] criterion2 = { version = "3", features = ["cargo_bench_support"] } -[[bench]] -name = "superkmer" -harness = false +[dependencies] +bytes = "1.11.1" diff --git a/src/obikrope/src/cursor.rs b/src/obikrope/src/cursor.rs new file mode 100644 index 0000000..333053c --- /dev/null +++ b/src/obikrope/src/cursor.rs @@ -0,0 +1,310 @@ +use std::cell::Cell; + +use crate::{Rope, RopeError}; + +#[derive(Clone, Copy)] +pub enum SeekMode { + Absolute, + Relative, + RelativeToEnd, +} + +// ── shared state ────────────────────────────────────────────────────────────── + +#[derive(Clone)] +struct CursorState<'a> { + block_idx: Cell, + block_start: Cell, + block_end: Cell, + block: Cell<&'a [Cell]>, + initialized: Cell, + current: Cell>, +} + +impl<'a> CursorState<'a> { + fn new() -> Self { + Self { + block_idx: Cell::new(0), + block_start: Cell::new(0), + block_end: Cell::new(0), + block: Cell::new(&[]), + initialized: Cell::new(false), + current: Cell::new(None), + } + } + + fn get(&self, rope: &'a Rope, i: usize) -> Option { + if !self.initialized.get() || i < self.block_start.get() || i >= self.block_end.get() { + let (bi, bs, be) = rope.lookup(i)?; + self.block_idx.set(bi); + self.block_start.set(bs); + self.block_end.set(be); + self.block.set(rope.get_block(bi)?); + self.initialized.set(true); + } + Some(self.block.get()[i - self.block_start.get()].get()) + } + + fn set(&self, rope: &'a Rope, i: usize, value: u8) -> Result<(), RopeError> { + if !self.initialized.get() || i < self.block_start.get() || i >= self.block_end.get() { + let (bi, bs, be) = rope.lookup(i).ok_or( + RopeError::OutOfBounds(format!("index out of bounds: i={} > {}", i, rope.len())), + )?; + self.block_idx.set(bi); + self.block_start.set(bs); + self.block_end.set(be); + self.block.set(rope.get_block(bi).ok_or(RopeError::BlockNotFound(format!( + "Cannot find block for index {}", + i + )))?); + self.initialized.set(true); + } + self.block.get()[i - self.block_start.get()].set(value); + Ok(()) + } +} + +// ── trait ───────────────────────────────────────────────────────────────────── + +pub trait RopeCursor { + fn get(&self, i: usize) -> Option; + fn set(&self, i: usize, value: u8) -> Result<(), RopeError>; + fn peek(&self) -> Option; + fn poke(&self, value: u8) -> Result<(), RopeError>; + fn tell(&self) -> Option; + fn seek(&self, pos: isize, mode: SeekMode) -> Result; + fn rewind(&self, go_back_of: usize) -> Result<(), RopeError>; + fn len(&self) -> usize; + fn read_next(&self) -> Result; +} + +// ── ForwardCursor ───────────────────────────────────────────────────────────── + +#[derive(Clone)] +pub struct ForwardCursor<'a> { + rope: &'a Rope, + state: CursorState<'a>, +} + +impl<'a> ForwardCursor<'a> { + pub fn new(rope: &'a Rope) -> Self { + Self { rope, state: CursorState::new() } + } + + pub fn read_ahead(&self, ahead: usize) -> Result { + let pos = self.state.current.get().ok_or(RopeError::CurrentNotSet)?; + self.state + .get(self.rope, pos + ahead) + .ok_or(RopeError::OutOfBounds(format!( + "index out of bounds: i={} + {} > {}", + pos, ahead, self.rope.len() + ))) + } + + pub fn write(&self, value: u8) -> Result<(), RopeError> { + let pos = self.state.current.get().unwrap_or(0); + self.state.set(self.rope, pos, value)?; + self.state.current.set(Some(pos + 1)); + Ok(()) + } + + pub fn iter(&self) -> ForwardIter<'a, '_> { + ForwardIter { cursor: self } + } +} + +impl<'a> RopeCursor for ForwardCursor<'a> { + fn get(&self, i: usize) -> Option { + self.state.get(self.rope, i) + } + + fn set(&self, i: usize, value: u8) -> Result<(), RopeError> { + self.state.set(self.rope, i, value) + } + + fn peek(&self) -> Option { + self.state.get(self.rope, self.state.current.get()?) + } + + fn poke(&self, value: u8) -> Result<(), RopeError> { + let pos = self.state.current.get().ok_or(RopeError::CurrentNotSet)?; + self.state.set(self.rope, pos, value) + } + + fn tell(&self) -> Option { + self.state.current.get() + } + + fn len(&self) -> usize { + self.rope.len() + } + + fn read_next(&self) -> Result { + let next_pos = match self.state.current.get() { + Some(i) => i + 1, + None => 0, + }; + let value = self.state + .get(self.rope, next_pos) + .ok_or(RopeError::OutOfBounds(format!( + "index out of bounds: i={} > {}", + next_pos, self.rope.len() + )))?; + self.state.current.set(Some(next_pos)); + Ok(value) + } + + fn seek(&self, pos: isize, mode: SeekMode) -> Result { + let pos = match mode { + SeekMode::Absolute => pos, + SeekMode::Relative => self.state.current.get().ok_or(RopeError::CurrentNotSet)? as isize + pos, + SeekMode::RelativeToEnd => self.rope.len() as isize - pos, + }; + if pos < 0 { + return Err(RopeError::OutOfBounds(format!("index out of bounds: i={} < 0", pos))); + } + self.state.current.set(Some(pos as usize)); + Ok(pos as usize) + } + + fn rewind(&self, go_back_of: usize) -> Result<(), RopeError> { + self.seek(-(go_back_of as isize), SeekMode::Relative)?; + Ok(()) + } +} + +impl Iterator for ForwardCursor<'_> { + type Item = u8; + fn next(&mut self) -> Option { + self.read_next().ok() + } +} + +pub struct ForwardIter<'a, 'b> { + cursor: &'b ForwardCursor<'a>, +} + +impl Iterator for ForwardIter<'_, '_> { + type Item = u8; + fn next(&mut self) -> Option { + self.cursor.read_next().ok() + } +} + +// ── BackwardCursor ──────────────────────────────────────────────────────────── + +#[derive(Clone)] +pub struct BackwardCursor<'a> { + rope: &'a Rope, + state: CursorState<'a>, +} + +impl<'a> BackwardCursor<'a> { + pub fn new(rope: &'a Rope) -> Self { + Self { rope, state: CursorState::new() } + } + + pub fn read_behind(&self, behind: usize) -> Result { + let pos = self.state.current.get().ok_or(RopeError::CurrentNotSet)?; + let target = pos + .checked_add(behind) + .filter(|&t| t < self.rope.len()) + .ok_or(RopeError::OutOfBounds(format!( + "index out of bounds: i={} + {} > {}", + pos, behind, self.rope.len() + )))?; + self.state + .get(self.rope, target) + .ok_or(RopeError::OutOfBounds(format!( + "index out of bounds: i={} + {} > {}", + pos, behind, self.rope.len() + ))) + } + + pub fn iter(&self) -> BackwardIter<'a, '_> { + BackwardIter { cursor: self } + } +} + +impl<'a> RopeCursor for BackwardCursor<'a> { + fn get(&self, i: usize) -> Option { + self.state.get(self.rope, i) + } + + fn set(&self, i: usize, value: u8) -> Result<(), RopeError> { + self.state.set(self.rope, i, value) + } + + fn peek(&self) -> Option { + self.state.get(self.rope, self.state.current.get()?) + } + + fn poke(&self, value: u8) -> Result<(), RopeError> { + let pos = self.state.current.get().ok_or(RopeError::CurrentNotSet)?; + self.state.set(self.rope, pos, value) + } + + fn tell(&self) -> Option { + self.state.current.get() + } + + fn len(&self) -> usize { + self.rope.len() + } + + fn read_next(&self) -> Result { + let next_pos = match self.state.current.get() { + None => self.rope.len().checked_sub(1).ok_or(RopeError::OutOfBounds( + "BackwardCursor: rope is empty".to_string(), + ))?, + Some(0) => return Err(RopeError::OutOfBounds( + "BackwardCursor: already at beginning".to_string(), + )), + Some(i) => i - 1, + }; + let value = self.state + .get(self.rope, next_pos) + .ok_or(RopeError::OutOfBounds(format!( + "BackwardCursor: index out of bounds at i={}", + next_pos + )))?; + self.state.current.set(Some(next_pos)); + Ok(value) + } + + fn seek(&self, pos: isize, mode: SeekMode) -> Result { + let pos = match mode { + SeekMode::Absolute => pos, + SeekMode::Relative => self.state.current.get().ok_or(RopeError::CurrentNotSet)? as isize - pos, + SeekMode::RelativeToEnd => self.rope.len() as isize - pos, + }; + if pos < 0 { + return Err(RopeError::OutOfBounds(format!("index out of bounds: i={} < 0", pos))); + } + self.state.current.set(Some(pos as usize)); + Ok(pos as usize) + } + + fn rewind(&self, go_back_of: usize) -> Result<(), RopeError> { + self.seek(-(go_back_of as isize), SeekMode::Relative)?; + Ok(()) + } +} + +impl Iterator for BackwardCursor<'_> { + type Item = u8; + fn next(&mut self) -> Option { + self.read_next().ok() + } +} + +pub struct BackwardIter<'a, 'b> { + cursor: &'b BackwardCursor<'a>, +} + +impl Iterator for BackwardIter<'_, '_> { + type Item = u8; + fn next(&mut self) -> Option { + self.cursor.read_next().ok() + } +} diff --git a/src/obikrope/src/error.rs b/src/obikrope/src/error.rs new file mode 100644 index 0000000..748841b --- /dev/null +++ b/src/obikrope/src/error.rs @@ -0,0 +1,6 @@ +#[derive(Debug)] +pub enum RopeError { + OutOfBounds(String), + BlockNotFound(String), + CurrentNotSet, +} diff --git a/src/obikrope/src/lib.rs b/src/obikrope/src/lib.rs new file mode 100644 index 0000000..3d9681b --- /dev/null +++ b/src/obikrope/src/lib.rs @@ -0,0 +1,8 @@ +mod cursor; +mod error; +mod rope; + +pub use { + cursor::BackwardCursor, cursor::ForwardCursor, cursor::RopeCursor, cursor::SeekMode, + error::RopeError, rope::Rope, +}; diff --git a/src/obikrope/src/rope.rs b/src/obikrope/src/rope.rs new file mode 100644 index 0000000..8b57e52 --- /dev/null +++ b/src/obikrope/src/rope.rs @@ -0,0 +1,114 @@ +use crate::{BackwardCursor, ForwardCursor, RopeError}; +use std::cell::Cell; + +pub struct Rope { + pub(crate) blocks: Vec>>, + pub(crate) length: usize, + pub(crate) start_block_idx: Vec, +} + +impl Rope { + pub fn new() -> Self { + Self { + blocks: Vec::new(), + length: 0, + start_block_idx: Vec::new(), + } + } + + pub fn push(&mut self, block: Vec) { + let block_len = block.len(); + self.start_block_idx.push(self.length); + // Safety: Cell has the same memory layout as u8 (guaranteed by the language) + let cell_block: Vec> = unsafe { + let mut v = std::mem::ManuallyDrop::new(block); + Vec::from_raw_parts(v.as_mut_ptr() as *mut Cell, v.len(), v.capacity()) + }; + self.blocks.push(cell_block); + self.length += block_len; + } + + pub fn n_blocks(&self) -> usize { + self.blocks.len() + } + + pub(crate) fn get_block(&self, block_idx: usize) -> Option<&[Cell]> { + self.blocks.get(block_idx).map(Vec::as_slice) + } + + pub fn len(&self) -> usize { + self.length + } + + pub(crate) fn lookup(&self, i: usize) -> Option<(usize, usize, usize)> { + if i >= self.length || self.blocks.is_empty() { + return None; + } + let block_idx = self.start_block_idx.partition_point(|&s| s <= i) - 1; + let from = self.start_block_idx[block_idx]; + let to = if block_idx + 1 < self.blocks.len() { + self.start_block_idx[block_idx + 1] + } else { + self.length + }; + Some((block_idx, from, to)) + } + + pub fn split_off(&mut self, pos: usize) -> Result { + if pos > self.length { + return Err(RopeError::OutOfBounds(format!( + "split_at: pos={} > length={}", + pos, self.length + ))); + } + + // pos == length: tail is empty. + if pos == self.length { + return Ok(Rope::new()); + } + + let (block_idx, from, _) = self.lookup(pos).ok_or_else(|| { + RopeError::OutOfBounds(format!("split_at: lookup failed at pos={}", pos)) + })?; + let cut_offset = pos - from; + + // Keep block_idx in self temporarily, split it, move remainder to tail. + let mut tail_blocks = self.blocks.split_off(block_idx + 1); + self.start_block_idx.truncate(block_idx + 1); + + let tail_part = self.blocks[block_idx].split_off(cut_offset); + if !tail_part.is_empty() { + tail_blocks.insert(0, tail_part); + } + + let mut tail_length = 0; + let tail_starts: Vec = tail_blocks + .iter() + .map(|b| { + let s = tail_length; + tail_length += b.len(); + s + }) + .collect(); + + self.length = pos; + + Ok(Rope { + blocks: tail_blocks, + length: tail_length, + start_block_idx: tail_starts, + }) + } + + pub fn is_empty(&self) -> bool { + self.blocks.is_empty() + } + + pub fn fw_cursor(&self) -> ForwardCursor<'_> { + ForwardCursor::new(self) + } + + pub fn bw_cursor(&self) -> BackwardCursor<'_> { + BackwardCursor::new(self) + } +} diff --git a/src/obiread/Cargo.toml b/src/obiread/Cargo.toml index 7c2cdc9..d182b93 100644 --- a/src/obiread/Cargo.toml +++ b/src/obiread/Cargo.toml @@ -4,6 +4,6 @@ version = "0.1.0" edition = "2024" [dependencies] -bytes = "1" +obikrope = { path = "../obikrope" } niffler = { version = "2", default-features = false, features = ["gz", "bz2", "lzma", "zstd"] } ureq = "2" diff --git a/src/obiread/src/chunk.rs b/src/obiread/src/chunk.rs index 28b27e1..47cf7a2 100644 --- a/src/obiread/src/chunk.rs +++ b/src/obiread/src/chunk.rs @@ -1,54 +1,40 @@ -//! Chunk iterator: yields rope slices that each end on a complete sequence record. +//! Chunk iterator: yields Rope slices that each end on a complete sequence record. //! -//! Each `Vec` yielded by [`SeqChunkIter`] contains one or more reference-counted -//! byte slices that together form a self-contained block of complete sequence records. -//! The slices are NOT contiguous in memory — the consumer iterates over them in order. -//! -//! The splitter operates directly on the rope via [`RopeCursor`], so no packing -//! (flattening into a contiguous buffer) is ever required — even for sequences -//! longer than the read block size. +//! Each `Rope` yielded by [`SeqChunkIter`] contains one or more blocks that +//! together form a self-contained block of complete sequence records. use std::io::{self, Read}; -use bytes::{Bytes, BytesMut}; +use obikrope::Rope; /// A splitter function: given the accumulated rope, returns the absolute byte /// offset at which to cut, or `None` if no complete-record boundary was found. -pub type Splitter = fn(&[Bytes]) -> Option; +pub type Splitter = fn(&Rope) -> Option; -/// Iterator that reads from `R` in blocks and yields `Vec` chunks, +/// Iterator that reads from `R` in blocks and yields `Rope` chunks, /// each ending on a complete sequence record boundary. pub struct SeqChunkIter { source: R, - rope: Vec, + rope: Rope, block_size: usize, splitter: Splitter, - probe: &'static [u8], eof: bool, } impl SeqChunkIter { /// Create a new iterator. - /// - /// - `block_size`: bytes per read call (default 1 MiB). - /// - `splitter`: format-specific backward boundary detector working on the rope. - /// - `probe`: short byte string whose presence is necessary (not sufficient) - /// for a boundary to exist (e.g. `b"\n>"` for FASTA, `b"\n@"` for FASTQ). - pub fn new(source: R, block_size: usize, splitter: Splitter, probe: &'static [u8]) -> Self { + pub fn new(source: R, block_size: usize, splitter: Splitter) -> Self { Self { source, - rope: Vec::with_capacity(4), + rope: Rope::new(), block_size, splitter, - probe, eof: false, } } - /// Read one block from source into a fresh `Bytes`. - /// Returns `None` on EOF (zero bytes read). - fn read_block(&mut self) -> io::Result> { - let mut buf = BytesMut::zeroed(self.block_size); + fn read_block(&mut self) -> io::Result>> { + let mut buf = vec![0u8; self.block_size]; let mut filled = 0; loop { match self.source.read(&mut buf[filled..]) { @@ -67,46 +53,12 @@ impl SeqChunkIter { return Ok(None); } buf.truncate(filled); - Ok(Some(buf.freeze())) - } - - /// Check whether the boundary probe might appear in the newly added block - /// or at the seam between the last two blocks. - /// - /// This is a fast O(block_size) heuristic: if the probe is absent, the - /// splitter is not called. - fn probe_in_last_chunk(&self) -> bool { - let last = match self.rope.last() { - Some(b) => b, - None => return false, - }; - - // Within the last block. - if last.windows(self.probe.len()).any(|w| w == self.probe) { - return true; - } - - // At the seam between the previous block and this one. - if self.rope.len() >= 2 { - let prev = &self.rope[self.rope.len() - 2]; - let overlap = self.probe.len() - 1; - let from = prev.len().saturating_sub(overlap); - let seam: Vec = prev[from..] - .iter() - .chain(last.iter().take(overlap)) - .copied() - .collect(); - if seam.windows(self.probe.len()).any(|w| w == self.probe) { - return true; - } - } - - false + Ok(Some(buf)) } } impl Iterator for SeqChunkIter { - type Item = io::Result>; + type Item = io::Result; fn next(&mut self) -> Option { loop { @@ -114,7 +66,7 @@ impl Iterator for SeqChunkIter { if self.rope.is_empty() { return None; } - return Some(Ok(std::mem::take(&mut self.rope))); + return Some(Ok(std::mem::replace(&mut self.rope, Rope::new()))); } match self.read_block() { @@ -128,54 +80,16 @@ impl Iterator for SeqChunkIter { } } - if self.probe_in_last_chunk() { - if let Some(abs_offset) = (self.splitter)(&self.rope) { - return Some(Ok(self.split_at(abs_offset))); - } + if let Some(abs_offset) = (self.splitter)(&self.rope) { + let tail = self.rope.split_off(abs_offset) + .expect("splitter returned valid offset"); + let chunk = std::mem::replace(&mut self.rope, tail); + return Some(Ok(chunk)); } } } } -impl SeqChunkIter { - /// Split the rope at absolute byte offset `abs_offset`. - /// - /// Returns the chunk (`rope[..abs_offset]`) as a `Vec` and stores - /// the remainder (`rope[abs_offset..]`) in `self.rope`. - fn split_at(&mut self, abs_offset: usize) -> Vec { - let mut chunk = Vec::with_capacity(self.rope.len()); - let mut remaining = abs_offset; - - let mut old_rope = std::mem::take(&mut self.rope); - let mut remainder_rope: Vec = Vec::with_capacity(2); - - for piece in old_rope.drain(..) { - if remaining == 0 { - remainder_rope.push(piece); - } else if remaining >= piece.len() { - remaining -= piece.len(); - chunk.push(piece); - } else { - // The cut falls inside this piece. - // Copy both halves so each gets its own Arc (unique ownership), - // which is required by RopeTape for in-place writing. - let head = BytesMut::from(&piece[..remaining]).freeze(); - let tail = BytesMut::from(&piece[remaining..]).freeze(); - remaining = 0; - if !head.is_empty() { - chunk.push(head); - } - if !tail.is_empty() { - remainder_rope.push(tail); - } - } - } - - self.rope = remainder_rope; - chunk - } -} - #[cfg(test)] mod tests { use super::*; @@ -183,17 +97,15 @@ mod tests { use crate::fastq::end_of_last_fastq_entry; fn fasta_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> { - SeqChunkIter::new(data, block_size, end_of_last_fasta_entry, b"\n>") + SeqChunkIter::new(data, block_size, end_of_last_fasta_entry) } fn fastq_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> { - SeqChunkIter::new(data, block_size, end_of_last_fastq_entry, b"\n@") + SeqChunkIter::new(data, block_size, end_of_last_fastq_entry) } - fn collect_fasta(chunks: Vec>) -> Vec> { - chunks.into_iter().map(|rope| { - rope.into_iter().flat_map(|b| b.to_vec()).collect() - }).collect() + fn rope_to_vec(rope: &Rope) -> Vec { + rope.fw_cursor().collect() } // ── FASTA ───────────────────────────────────────────────────────────────── @@ -203,16 +115,14 @@ mod tests { let data: &[u8] = b">s1\nACGT\n"; let chunks: Vec<_> = fasta_iter(data, 64).collect::>().unwrap(); assert_eq!(chunks.len(), 1); - let flat: Vec = chunks.into_iter().flatten().flat_map(|b| b.to_vec()).collect(); - assert_eq!(flat, b">s1\nACGT\n"); + assert_eq!(rope_to_vec(&chunks[0]), b">s1\nACGT\n"); } #[test] fn fasta_two_records_split_across_chunks() { let data: &[u8] = b">s1\nACGT\n>s2\nTTTT\n"; let chunks: Vec<_> = fasta_iter(data, 10).collect::>().unwrap(); - let flat = collect_fasta(chunks); - let all: Vec = flat.into_iter().flatten().collect(); + let all: Vec = chunks.iter().flat_map(|r| rope_to_vec(r)).collect(); assert_eq!(all, b">s1\nACGT\n>s2\nTTTT\n"); } @@ -222,7 +132,7 @@ mod tests { for block in [8, 12, 20, 100] { let chunks: Vec<_> = fasta_iter(data, block).collect::>().unwrap(); for rope in &chunks { - let flat: Vec = rope.iter().flat_map(|b| b.to_vec()).collect(); + let flat = rope_to_vec(rope); assert_eq!(flat[0], b'>', "block={block}: chunk doesn't start with '>'"); assert_eq!(*flat.last().unwrap(), b'\n', "block={block}: chunk doesn't end with newline"); } @@ -258,7 +168,7 @@ mod tests { (b"TTTTTTTT", b"HHHHHHHH"), ]).into_boxed_slice()); let chunks: Vec<_> = fastq_iter(data, 16).collect::>().unwrap(); - let all: Vec = chunks.into_iter().flatten().flat_map(|b| b.to_vec()).collect(); + let all: Vec = chunks.iter().flat_map(|r| rope_to_vec(r)).collect(); assert_eq!(all, *data); } @@ -273,7 +183,7 @@ mod tests { for block in [18, 30, 60] { let chunks: Vec<_> = fastq_iter(data, block).collect::>().unwrap(); for rope in &chunks { - let first_byte = rope.iter().flat_map(|b| b.iter().copied()).next().unwrap(); + let first_byte = rope_to_vec(rope)[0]; assert_eq!(first_byte, b'@', "block={block}: chunk doesn't start with '@'"); } } diff --git a/src/obiread/src/fasta.rs b/src/obiread/src/fasta.rs index 9bd1658..4dd4602 100644 --- a/src/obiread/src/fasta.rs +++ b/src/obiread/src/fasta.rs @@ -1,8 +1,6 @@ //! Backward boundary detection for FASTA chunks. -use bytes::Bytes; - -use crate::tape::RopeCursor; +use obikrope::{Rope, RopeCursor}; /// Scan the rope backward for the start of the last complete FASTA entry. /// @@ -10,57 +8,51 @@ use crate::tape::RopeCursor; /// record, so that `rope[..offset]` is a self-contained chunk and /// `rope[offset..]` is the remainder carried into the next chunk. /// Returns `None` if no valid boundary is found (need more data). -/// -/// Port of Go's `EndOfLastFastaEntry`, now working directly on a rope -/// via [`RopeCursor`] — no contiguous packing required. -pub fn end_of_last_fasta_entry(rope: &[Bytes]) -> Option { - let total_len: usize = rope.iter().map(|b| b.len()).sum(); - if total_len == 0 { - return None; - } - - let mut cursor = RopeCursor::end(rope)?; +pub fn end_of_last_fasta_entry(rope: &Rope) -> Option { + let cursor = rope.bw_cursor(); let mut state: u8 = 0; let mut last: usize = 0; - let mut i = total_len as isize - 1; - while i >= 0 && state < 2 { - let c = cursor.peek(rope).unwrap(); + for c in cursor.iter() { match state { 0 if c == b'>' => { + last = cursor.tell()?; state = 1; - last = i as usize; } 1 if c == b'\n' || c == b'\r' => { - state = 2; + if last > 0 { + return Some(last); + } + return None; } - _ => { + 1 => { state = 0; } - } - i -= 1; - if i >= 0 { - cursor.previous(rope); + _ => {} } } - - if i <= 0 || state != 2 { - return None; - } - Some(last) + None } #[cfg(test)] mod tests { use super::*; - use bytes::BytesMut; - fn rope(data: &[u8]) -> Vec { - vec![BytesMut::from(data).freeze()] + fn rope(data: &[u8]) -> Rope { + let mut r = Rope::new(); + r.push(data.to_vec()); + r } - fn rope2(a: &[u8], b: &[u8]) -> Vec { - vec![BytesMut::from(a).freeze(), BytesMut::from(b).freeze()] + fn rope2(a: &[u8], b: &[u8]) -> Rope { + let mut r = Rope::new(); + r.push(a.to_vec()); + r.push(b.to_vec()); + r + } + + fn flat(r: &Rope) -> Vec { + r.fw_cursor().collect() } #[test] @@ -71,39 +63,43 @@ mod tests { #[test] fn two_entries_cuts_at_second_header() { let data = b">seq1\nACGT\n>seq2\nTTTT\n"; - let pos = end_of_last_fasta_entry(&rope(data)).unwrap(); - assert_eq!(&data[pos..], b">seq2\nTTTT\n"); - assert_eq!(&data[..pos], b">seq1\nACGT\n"); + let r = rope(data); + let pos = end_of_last_fasta_entry(&r).unwrap(); + assert_eq!(&flat(&r)[pos..], b">seq2\nTTTT\n"); + assert_eq!(&flat(&r)[..pos], b">seq1\nACGT\n"); } #[test] fn three_entries_cuts_at_last_header() { let data = b">s1\nAA\n>s2\nCC\n>s3\nGG\n"; - let pos = end_of_last_fasta_entry(&rope(data)).unwrap(); - assert_eq!(&data[pos..], b">s3\nGG\n"); + let r = rope(data); + let pos = end_of_last_fasta_entry(&r).unwrap(); + assert_eq!(&flat(&r)[pos..], b">s3\nGG\n"); } #[test] fn multiline_sequence() { let data = b">s1\nACGT\nACGT\n>s2\nTTTT\n"; - let pos = end_of_last_fasta_entry(&rope(data)).unwrap(); - assert_eq!(&data[pos..], b">s2\nTTTT\n"); + let r = rope(data); + let pos = end_of_last_fasta_entry(&r).unwrap(); + assert_eq!(&flat(&r)[pos..], b">s2\nTTTT\n"); } #[test] fn crlf_line_endings() { let data = b">s1\r\nACGT\r\n>s2\r\nTTTT\r\n"; - let pos = end_of_last_fasta_entry(&rope(data)).unwrap(); - assert_eq!(&data[pos..], b">s2\r\nTTTT\r\n"); + let r = rope(data); + let pos = end_of_last_fasta_entry(&r).unwrap(); + assert_eq!(&flat(&r)[pos..], b">s2\r\nTTTT\r\n"); } #[test] fn boundary_spans_two_blocks() { - // Split so that "\n" is at end of first block and ">" at start of second. let a = b">s1\nACGT\n"; let b = b">s2\nTTTT\n"; - let total: Vec = a.iter().chain(b.iter()).copied().collect(); - let pos = end_of_last_fasta_entry(&rope2(a, b)).unwrap(); - assert_eq!(&total[pos..], b">s2\nTTTT\n"); + let r = rope2(a, b); + let all: Vec = flat(&r); + let pos = end_of_last_fasta_entry(&r).unwrap(); + assert_eq!(&all[pos..], b">s2\nTTTT\n"); } } diff --git a/src/obiread/src/fastq.rs b/src/obiread/src/fastq.rs index 8cff762..e7f9154 100644 --- a/src/obiread/src/fastq.rs +++ b/src/obiread/src/fastq.rs @@ -9,13 +9,8 @@ //! The `@` in quality lines (Phred 31 = ASCII 64) makes forward heuristics //! unreliable. The backward scanner identifies a genuine record start by //! verifying the structural context around each `@` candidate. -//! -//! Port of Go's `EndOfLastFastqEntry` (7-state machine), now working directly -//! on a rope via [`RopeCursor`] — no contiguous packing required. -use bytes::Bytes; - -use crate::tape::RopeCursor; +use obikrope::{Rope, RopeCursor, SeekMode}; #[inline] fn is_eol(c: u8) -> bool { @@ -38,42 +33,28 @@ fn is_seq_char(c: u8) -> bool { /// record, so that `rope[..offset]` is a self-contained chunk and /// `rope[offset..]` is the remainder for the next chunk. /// Returns `None` if no valid boundary is found (need more data). -pub fn end_of_last_fastq_entry(rope: &[Bytes]) -> Option { - let total_len: usize = rope.iter().map(|b| b.len()).sum(); - if total_len == 0 { - return None; - } - - let mut cursor = RopeCursor::end(rope)?; +pub fn end_of_last_fastq_entry(rope: &Rope) -> Option { + let mut cursor = rope.bw_cursor(); let mut state: u8 = 0; - let mut restart: isize = total_len as isize - 1; - let mut restart_cursor = cursor; - let mut cut: usize = total_len; - let mut i: isize = total_len as isize - 1; - - while i >= 0 && state < 7 { - let c = cursor.peek(rope).unwrap(); + let mut restart: usize = 0; + let mut cut: usize = rope.len(); + while let Some(c) = cursor.next() { match state { - // Looking for `+` separator line content 0 => { if c == b'+' { state = 1; - restart = i; - restart_cursor = cursor; + restart = cursor.tell()?; } } - // Found `+` — expect end-of-line immediately before it (going backward) 1 => { if is_eol(c) { state = 2; } else { state = 0; - i = restart; - cursor = restart_cursor; + cursor.seek(restart as isize, SeekMode::Absolute).ok(); } } - // After `\n+`: skip separators, then expect sequence characters 2 => { if is_sep(c) { // stay @@ -81,11 +62,9 @@ pub fn end_of_last_fastq_entry(rope: &[Bytes]) -> Option { state = 3; } else { state = 0; - i = restart; - cursor = restart_cursor; + cursor.seek(restart as isize, SeekMode::Absolute).ok(); } } - // Scanning sequence characters backward 3 => { if is_eol(c) { state = 4; @@ -93,60 +72,48 @@ pub fn end_of_last_fastq_entry(rope: &[Bytes]) -> Option { // stay } else { state = 0; - i = restart; - cursor = restart_cursor; + cursor.seek(restart as isize, SeekMode::Absolute).ok(); } } - // Found end-of-line before sequence — skip any extra newlines 4 => { - if is_eol(c) { - // stay - } else { + if !is_eol(c) { state = 5; } } - // Scanning header content — looking for `@` at start of line 5 => { if is_eol(c) { state = 0; - i = restart; - cursor = restart_cursor; + cursor.seek(restart as isize, SeekMode::Absolute).ok(); } else if c == b'@' { + cut = cursor.tell()?; state = 6; - cut = i as usize; } - // else: stay + // else stay } - // Found `@` — expect end-of-line before it 6 => { if is_eol(c) { - state = 7; // success + if cut > 0 { + return Some(cut); + } + return None; } else { state = 5; } } _ => unreachable!(), } - - i -= 1; - if i >= 0 { - cursor.previous(rope); - } } - - if i <= 0 || state != 7 { - return None; - } - Some(cut) + None } #[cfg(test)] mod tests { use super::*; - use bytes::BytesMut; - fn rope(data: &[u8]) -> Vec { - vec![BytesMut::from(data).freeze()] + fn rope(data: &[u8]) -> Rope { + let mut r = Rope::new(); + r.push(data.to_vec()); + r } fn make_fastq(records: &[(&[u8], &[u8])]) -> Vec { @@ -162,6 +129,10 @@ mod tests { buf } + fn flat(r: &Rope) -> Vec { + r.fw_cursor().collect() + } + #[test] fn single_record_no_boundary() { let buf = make_fastq(&[(b"ACGT", b"IIII")]); @@ -171,9 +142,10 @@ mod tests { #[test] fn two_records_cuts_at_second() { let buf = make_fastq(&[(b"ACGT", b"IIII"), (b"TTTT", b"HHHH")]); - let pos = end_of_last_fastq_entry(&rope(&buf)).unwrap(); - assert_eq!(buf[pos], b'@'); - assert_eq!(&buf[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()); + let r = rope(&buf); + let pos = end_of_last_fastq_entry(&r).unwrap(); + assert_eq!(flat(&r)[pos], b'@'); + assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()); } #[test] @@ -183,8 +155,9 @@ mod tests { (b"CCCC", b"JJJJ"), (b"GGGG", b"KKKK"), ]); - let pos = end_of_last_fastq_entry(&rope(&buf)).unwrap(); - assert_eq!(&buf[pos..], make_fastq(&[(b"GGGG", b"KKKK")]).as_slice()); + let r = rope(&buf); + let pos = end_of_last_fastq_entry(&r).unwrap(); + assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"GGGG", b"KKKK")]).as_slice()); } #[test] @@ -193,15 +166,17 @@ mod tests { (b"ACGTACGT", b"@@@@IIII"), (b"TTTT", b"HHHH"), ]); - let pos = end_of_last_fastq_entry(&rope(&buf)).unwrap(); - assert_eq!(&buf[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()); + let r = rope(&buf); + let pos = end_of_last_fastq_entry(&r).unwrap(); + assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()); } #[test] fn crlf_line_endings() { - let buf = b"@h\r\nACGT\r\n+\r\nIIII\r\n@h\r\nTTTT\r\n+\r\nHHHH\r\n"; - let pos = end_of_last_fastq_entry(&rope(buf)).unwrap(); - assert_eq!(buf[pos], b'@'); - assert_eq!(&buf[pos..], b"@h\r\nTTTT\r\n+\r\nHHHH\r\n"); + let data = b"@h\r\nACGT\r\n+\r\nIIII\r\n@h\r\nTTTT\r\n+\r\nHHHH\r\n"; + let r = rope(data); + let pos = end_of_last_fastq_entry(&r).unwrap(); + assert_eq!(flat(&r)[pos], b'@'); + assert_eq!(&flat(&r)[pos..], b"@h\r\nTTTT\r\n+\r\nHHHH\r\n"); } } diff --git a/src/obiread/src/lib.rs b/src/obiread/src/lib.rs index 39fea7e..47f438a 100644 --- a/src/obiread/src/lib.rs +++ b/src/obiread/src/lib.rs @@ -1,7 +1,7 @@ //! Streaming sequence file reader for obikmer. //! -//! Yields rope chunks (`Vec`) from FASTA or FASTQ files, each ending -//! on a complete sequence record boundary. Zero allocation in the common case. +//! Yields rope chunks from FASTA or FASTQ files, each ending +//! on a complete sequence record boundary. #![deny(missing_docs)] @@ -9,7 +9,6 @@ mod fasta; mod fastq; pub mod chunk; pub mod normalize; -pub mod tape; pub mod xopen; use std::io::Read; @@ -22,20 +21,10 @@ pub const DEFAULT_BLOCK_SIZE: usize = 1024 * 1024; /// Create a FASTA chunk iterator over `source`. pub fn fasta_chunks(source: R) -> SeqChunkIter { - SeqChunkIter::new( - source, - DEFAULT_BLOCK_SIZE, - fasta::end_of_last_fasta_entry, - b"\n>", - ) + SeqChunkIter::new(source, DEFAULT_BLOCK_SIZE, fasta::end_of_last_fasta_entry) } /// Create a FASTQ chunk iterator over `source`. pub fn fastq_chunks(source: R) -> SeqChunkIter { - SeqChunkIter::new( - source, - DEFAULT_BLOCK_SIZE, - fastq::end_of_last_fastq_entry, - b"\n@", - ) + SeqChunkIter::new(source, DEFAULT_BLOCK_SIZE, fastq::end_of_last_fastq_entry) } diff --git a/src/obiread/src/normalize.rs b/src/obiread/src/normalize.rs index 1cd6a5f..89c329f 100644 --- a/src/obiread/src/normalize.rs +++ b/src/obiread/src/normalize.rs @@ -1,141 +1,114 @@ //! Sequence normalisation automata for FASTA and FASTQ rope chunks. //! -//! Both automata operate on a [`RopeTape`] in place (two-cursor, zero -//! reallocation in the common case) and produce a compact byte stream: -//! ACGT segments of length ≥ k separated by `0x00`, uppercased. -//! -//! Ambiguous bases terminate the current segment. Segments shorter than k -//! are silently discarded by retreating the write cursor. +//! Both automata operate on a Rope in place (two-cursor, zero reallocation) +//! and produce a compact byte stream: ACGT segments of length ≥ k separated +//! by `0x00`, uppercased. Ambiguous bases terminate the current segment. +//! Segments shorter than k are silently discarded. -use bytes::Bytes; - -use crate::tape::{RopeCursor, RopeTape}; +use obikrope::{ForwardCursor, Rope, RopeCursor}; // ── public entry points ─────────────────────────────────────────────────────── -/// Normalise a FASTA chunk: skip headers; copy, filter and concatenate -/// sequence lines across multi-line records. -/// -/// Returns the written region of the tape as a rope of `Bytes`. -pub fn normalize_fasta_chunk(chunks: Vec, k: usize) -> Vec { - let mut tape = RopeTape::new(chunks); - normalize_fasta(&mut tape, k); - tape.into_output() +/// Normalise a FASTA chunk into a compact ACGT\x00-separated rope. +pub fn normalize_fasta_chunk(mut rope: Rope, k: usize) -> Rope { + let write_pos = { + let read = rope.fw_cursor(); + let write = rope.fw_cursor(); + normalize_fasta(&read, &write, k); + write.tell().unwrap_or(0) + }; + let _ = rope.split_off(write_pos); + rope } -/// Normalise a FASTQ chunk: skip headers, `+` lines and quality lines; -/// copy and filter sequence lines. -/// -/// Returns the written region of the tape as a rope of `Bytes`. -pub fn normalize_fastq_chunk(chunks: Vec, k: usize) -> Vec { - let mut tape = RopeTape::new(chunks); - normalize_fastq(&mut tape, k); - tape.into_output() +/// Normalise a FASTQ chunk into a compact ACGT\x00-separated rope. +pub fn normalize_fastq_chunk(mut rope: Rope, k: usize) -> Rope { + let write_pos = { + let read = rope.fw_cursor(); + let write = rope.fw_cursor(); + normalize_fastq(&read, &write, k); + write.tell().unwrap_or(0) + }; + let _ = rope.split_off(write_pos); + rope } // ── FASTA automaton ─────────────────────────────────────────────────────────── -/// Drive the FASTA normalisation automaton on `tape`. -/// -/// After the initial header skip, the automaton reads sequence characters -/// one by one. At each newline run it peeks at the next character to decide: -/// `>` → new record (close segment, skip header); anything else → line -/// continuation within the same sequence. -fn normalize_fasta(tape: &mut RopeTape, k: usize) { - // Skip the first header line (includes the leading `>`). - skip_line(tape); +fn normalize_fasta(read: &ForwardCursor<'_>, write: &ForwardCursor<'_>, k: usize) { + skip_line(read); loop { - let mut seg_start = tape.write_snapshot(); + let mut seg_start = write.tell().unwrap_or(0); 'seq: loop { - let Some(c) = tape.read_next() else { - end_segment(tape, &mut seg_start, k); + let Some(c) = read.read_next().ok() else { + end_segment(write, &mut seg_start, k); return; }; if is_newline(c) { - // Peek at the very next character — no newline run to skip. - // \r\n is handled naturally: \r → peek \n → continue; \n → peek content. - match tape.peek() { + match read.read_ahead(1).ok() { None => { - end_segment(tape, &mut seg_start, k); + end_segment(write, &mut seg_start, k); return; } Some(b'>') => { - // New record: close current segment, skip next header. - end_segment(tape, &mut seg_start, k); - skip_line(tape); - break 'seq; // restart outer loop - } - Some(_) => { - // Line continuation or \r before \n: next char is a nucleotide. - continue 'seq; + end_segment(write, &mut seg_start, k); + skip_line(read); + break 'seq; } + Some(_) => continue 'seq, } } let upper = c & !0x20u8; if is_acgt(upper) { - tape.write_next(upper); + write.write(upper).ok(); } else { - // Ambiguous base: close segment, skip the non-ACGT run. - end_segment(tape, &mut seg_start, k); - skip_until_acgt_or_newline(tape); - seg_start = tape.write_snapshot(); + end_segment(write, &mut seg_start, k); + skip_until_acgt_or_newline(read); + seg_start = write.tell().unwrap_or(0); } } - // Outer loop restarts for the next record. } } // ── FASTQ automaton ─────────────────────────────────────────────────────────── -/// Drive the FASTQ normalisation automaton on `tape`. -/// -/// The FASTQ 4-line structure (`@header`, sequence, `+`, quality) is rigid, -/// so the automaton cycles through four fixed phases without backtracking. -fn normalize_fastq(tape: &mut RopeTape, k: usize) { +fn normalize_fastq(read: &ForwardCursor<'_>, write: &ForwardCursor<'_>, k: usize) { loop { - // ── Phase 1: skip header line (@…\n) ───────────────────────────── - skip_line(tape); + skip_line(read); // skip header - // ── Phase 2: copy sequence ──────────────────────────────────────── - let mut seg_start = tape.write_snapshot(); + let mut seg_start = write.tell().unwrap_or(0); 'seq: loop { - let Some(c) = tape.read_next() else { - // EOF inside a sequence: flush whatever we have. - end_segment(tape, &mut seg_start, k); + let Some(c) = read.read_next().ok() else { + end_segment(write, &mut seg_start, k); return; }; if is_newline(c) { - skip_newlines(tape); + skip_newlines(read); break 'seq; } - let upper = c & !0x20u8; // ASCII uppercase trick + let upper = c & !0x20u8; if is_acgt(upper) { - tape.write_next(upper); + write.write(upper).ok(); } else { - // Ambiguous base: close the current segment, skip non-ACGT run. - end_segment(tape, &mut seg_start, k); - skip_until_acgt_or_newline(tape); - seg_start = tape.write_snapshot(); + end_segment(write, &mut seg_start, k); + skip_until_acgt_or_newline(read); + seg_start = write.tell().unwrap_or(0); } } - // Sequence line ended: close the last segment. - end_segment(tape, &mut seg_start, k); + end_segment(write, &mut seg_start, k); - // ── Phase 3: skip + line ────────────────────────────────────────── - skip_line(tape); + skip_line(read); // skip + line + skip_line(read); // skip quality line - // ── Phase 4: skip quality line ──────────────────────────────────── - skip_line(tape); - - if tape.peek().is_none() { + if read.read_ahead(1).ok().is_none() { return; } } @@ -143,46 +116,38 @@ fn normalize_fastq(tape: &mut RopeTape, k: usize) { // ── shared helpers ──────────────────────────────────────────────────────────── -/// Skip to the end of the current line, consuming the newline run. -fn skip_line(tape: &mut RopeTape) { - while let Some(c) = tape.read_next() { +fn skip_line(read: &ForwardCursor<'_>) { + while let Some(c) = read.read_next().ok() { if is_newline(c) { - skip_newlines(tape); + skip_newlines(read); return; } } } -/// Consume a contiguous run of `\n` / `\r` characters. -fn skip_newlines(tape: &mut RopeTape) { - while matches!(tape.peek(), Some(c) if is_newline(c)) { - tape.read_next(); +fn skip_newlines(read: &ForwardCursor<'_>) { + while matches!(read.read_ahead(1).ok(), Some(c) if is_newline(c)) { + read.read_next().ok(); } } -/// Consume characters until the next ACGT base or newline (leaving it unread). -fn skip_until_acgt_or_newline(tape: &mut RopeTape) { - while let Some(c) = tape.peek() { +fn skip_until_acgt_or_newline(read: &ForwardCursor<'_>) { + while let Some(c) = read.read_ahead(1).ok() { if is_newline(c) || is_acgt(c & !0x20u8) { return; } - tape.read_next(); + read.read_next().ok(); } } -/// Close the current segment. -/// -/// - If `seg_len >= k`: write `0x00` terminator and advance `seg_start`. -/// - If `0 < seg_len < k`: erase by retreating the write cursor. -/// - If `seg_len == 0`: nothing to do. -fn end_segment(tape: &mut RopeTape, seg_start: &mut RopeCursor, k: usize) { - let seg_len = tape.written_since(*seg_start); +fn end_segment(write: &ForwardCursor<'_>, seg_start: &mut usize, k: usize) { + let seg_len = write.tell().unwrap_or(0) - *seg_start; if seg_len >= k { - tape.write_next(0x00); + write.write(0x00).ok(); } else if seg_len > 0 { - tape.write_retreat(seg_len); + write.rewind(seg_len).ok(); } - *seg_start = tape.write_snapshot(); + *seg_start = write.tell().unwrap_or(0); } #[inline] fn is_newline(c: u8) -> bool { c == b'\n' || c == b'\r' } @@ -193,14 +158,23 @@ fn end_segment(tape: &mut RopeTape, seg_start: &mut RopeCursor, k: usize) { #[cfg(test)] mod tests { use super::*; - use bytes::BytesMut; - fn run(data: &[u8], k: usize) -> Vec { - let chunks = vec![BytesMut::from(data).freeze()]; - normalize_fastq_chunk(chunks, k) - .into_iter() - .flat_map(|b| b.to_vec()) - .collect() + fn make_rope(data: &[u8]) -> Rope { + let mut r = Rope::new(); + r.push(data.to_vec()); + r + } + + fn flat(r: Rope) -> Vec { + r.fw_cursor().collect() + } + + fn run_fastq(data: &[u8], k: usize) -> Vec { + flat(normalize_fastq_chunk(make_rope(data), k)) + } + + fn run_fasta(data: &[u8], k: usize) -> Vec { + flat(normalize_fasta_chunk(make_rope(data), k)) } fn make_fastq(records: &[&[u8]]) -> Vec { @@ -216,107 +190,6 @@ mod tests { buf } - // ── basic output format ─────────────────────────────────────────────────── - - #[test] - fn single_record_produces_seq_then_null() { - let out = run(&make_fastq(&[b"ACGTACGT"]), 4); - assert_eq!(out, b"ACGTACGT\x00"); - } - - #[test] - fn two_records_concatenated() { - let out = run(&make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]), 4); - assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00"); - } - - // ── uppercase normalisation ─────────────────────────────────────────────── - - #[test] - fn lowercase_input_uppercased() { - let out = run(&make_fastq(&[b"acgtacgt"]), 4); - assert_eq!(out, b"ACGTACGT\x00"); - } - - #[test] - fn mixed_case_uppercased() { - let out = run(&make_fastq(&[b"AcGtAcGt"]), 4); - assert_eq!(out, b"ACGTACGT\x00"); - } - - // ── k filter ───────────────────────────────────────────────────────────── - - #[test] - fn sequence_shorter_than_k_discarded() { - let out = run(&make_fastq(&[b"ACG"]), 4); - assert_eq!(out, b""); - } - - #[test] - fn sequence_exactly_k_kept() { - let out = run(&make_fastq(&[b"ACGT"]), 4); - assert_eq!(out, b"ACGT\x00"); - } - - #[test] - fn short_record_among_valid_ones_discarded() { - let out = run(&make_fastq(&[b"ACGTACGT", b"AC", b"TTTTTTTT"]), 4); - assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00"); - } - - // ── ambiguous bases ─────────────────────────────────────────────────────── - - #[test] - fn ambiguous_splits_into_two_segments() { - // 'N' in the middle splits "ACGT" + "ACGT" - let out = run(&make_fastq(&[b"ACGTNACGT"]), 4); - assert_eq!(out, b"ACGT\x00ACGT\x00"); - } - - #[test] - fn segment_after_ambiguous_too_short_discarded() { - // "ACGTACGT" + 'N' + "AC" (< k=4) - let out = run(&make_fastq(&[b"ACGTACGTNAC"]), 4); - assert_eq!(out, b"ACGTACGT\x00"); - } - - #[test] - fn consecutive_ambiguous_produce_no_empty_segment() { - let out = run(&make_fastq(&[b"ACGTNNNNACGT"]), 4); - assert_eq!(out, b"ACGT\x00ACGT\x00"); - } - - #[test] - fn ambiguous_at_start_skipped() { - let out = run(&make_fastq(&[b"NNACGTACGT"]), 4); - assert_eq!(out, b"ACGTACGT\x00"); - } - - #[test] - fn ambiguous_at_end_produces_no_trailing_empty() { - let out = run(&make_fastq(&[b"ACGTACGTNN"]), 4); - assert_eq!(out, b"ACGTACGT\x00"); - } - - // ── CRLF line endings ───────────────────────────────────────────────────── - - #[test] - fn crlf_handled() { - let data = b"@hdr\r\nACGTACGT\r\n+\r\nIIIIIIII\r\n"; - let out = run(data, 4); - assert_eq!(out, b"ACGTACGT\x00"); - } - - // ── FASTA ───────────────────────────────────────────────────────────────── - - fn run_fasta(data: &[u8], k: usize) -> Vec { - let chunks = vec![BytesMut::from(data).freeze()]; - normalize_fasta_chunk(chunks, k) - .into_iter() - .flat_map(|b| b.to_vec()) - .collect() - } - fn make_fasta(records: &[(&[u8], &[u8])]) -> Vec { let mut buf = Vec::new(); for (id, seq) in records { @@ -329,112 +202,160 @@ mod tests { buf } + // ── FASTQ basic ────────────────────────────────────────────────────────── + + #[test] + fn single_record_produces_seq_then_null() { + assert_eq!(run_fastq(&make_fastq(&[b"ACGTACGT"]), 4), b"ACGTACGT\x00"); + } + + #[test] + fn two_records_concatenated() { + assert_eq!( + run_fastq(&make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]), 4), + b"ACGTACGT\x00TTTTTTTT\x00" + ); + } + + #[test] + fn lowercase_input_uppercased() { + assert_eq!(run_fastq(&make_fastq(&[b"acgtacgt"]), 4), b"ACGTACGT\x00"); + } + + #[test] + fn mixed_case_uppercased() { + assert_eq!(run_fastq(&make_fastq(&[b"AcGtAcGt"]), 4), b"ACGTACGT\x00"); + } + + #[test] + fn sequence_shorter_than_k_discarded() { + assert_eq!(run_fastq(&make_fastq(&[b"ACG"]), 4), b""); + } + + #[test] + fn sequence_exactly_k_kept() { + assert_eq!(run_fastq(&make_fastq(&[b"ACGT"]), 4), b"ACGT\x00"); + } + + #[test] + fn short_record_among_valid_ones_discarded() { + assert_eq!( + run_fastq(&make_fastq(&[b"ACGTACGT", b"AC", b"TTTTTTTT"]), 4), + b"ACGTACGT\x00TTTTTTTT\x00" + ); + } + + #[test] + fn ambiguous_splits_into_two_segments() { + assert_eq!(run_fastq(&make_fastq(&[b"ACGTNACGT"]), 4), b"ACGT\x00ACGT\x00"); + } + + #[test] + fn segment_after_ambiguous_too_short_discarded() { + assert_eq!(run_fastq(&make_fastq(&[b"ACGTACGTNAC"]), 4), b"ACGTACGT\x00"); + } + + #[test] + fn consecutive_ambiguous_produce_no_empty_segment() { + assert_eq!(run_fastq(&make_fastq(&[b"ACGTNNNNACGT"]), 4), b"ACGT\x00ACGT\x00"); + } + + #[test] + fn ambiguous_at_start_skipped() { + assert_eq!(run_fastq(&make_fastq(&[b"NNACGTACGT"]), 4), b"ACGTACGT\x00"); + } + + #[test] + fn ambiguous_at_end_produces_no_trailing_empty() { + assert_eq!(run_fastq(&make_fastq(&[b"ACGTACGTNN"]), 4), b"ACGTACGT\x00"); + } + + #[test] + fn crlf_handled() { + let data = b"@hdr\r\nACGTACGT\r\n+\r\nIIIIIIII\r\n"; + assert_eq!(run_fastq(data, 4), b"ACGTACGT\x00"); + } + + #[test] + fn multi_slice_rope() { + let data = make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]); + let mid = data.len() / 2; + let mut rope = Rope::new(); + rope.push(data[..mid].to_vec()); + rope.push(data[mid..].to_vec()); + assert_eq!(flat(normalize_fastq_chunk(rope, 4)), b"ACGTACGT\x00TTTTTTTT\x00"); + } + + // ── FASTA ───────────────────────────────────────────────────────────────── + #[test] fn fasta_single_record() { - let out = run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT")]), 4); - assert_eq!(out, b"ACGTACGT\x00"); + assert_eq!(run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT")]), 4), b"ACGTACGT\x00"); } #[test] fn fasta_two_records() { - let out = run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]), 4); - assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00"); + assert_eq!( + run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]), 4), + b"ACGTACGT\x00TTTTTTTT\x00" + ); } #[test] fn fasta_multiline_sequence_concatenated() { - let data = b">s1\nACGT\nACGT\nACGT\n"; - let out = run_fasta(data, 4); - assert_eq!(out, b"ACGTACGTACGT\x00"); + assert_eq!(run_fasta(b">s1\nACGT\nACGT\nACGT\n", 4), b"ACGTACGTACGT\x00"); } #[test] fn fasta_lowercase_uppercased() { - let out = run_fasta(&make_fasta(&[(b"s1", b"acgtacgt")]), 4); - assert_eq!(out, b"ACGTACGT\x00"); + assert_eq!(run_fasta(&make_fasta(&[(b"s1", b"acgtacgt")]), 4), b"ACGTACGT\x00"); } #[test] fn fasta_short_record_discarded() { - let out = run_fasta(&make_fasta(&[(b"s1", b"ACG")]), 4); - assert_eq!(out, b""); + assert_eq!(run_fasta(&make_fasta(&[(b"s1", b"ACG")]), 4), b""); } #[test] fn fasta_short_among_valid_discarded() { - let out = run_fasta( - &make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"AC"), (b"s3", b"TTTTTTTT")]), - 4, + assert_eq!( + run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"AC"), (b"s3", b"TTTTTTTT")]), 4), + b"ACGTACGT\x00TTTTTTTT\x00" ); - assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00"); } #[test] fn fasta_ambiguous_splits_segments() { - let data = b">s1\nACGTNACGT\n"; - let out = run_fasta(data, 4); - assert_eq!(out, b"ACGT\x00ACGT\x00"); + assert_eq!(run_fasta(b">s1\nACGTNACGT\n", 4), b"ACGT\x00ACGT\x00"); } #[test] fn fasta_ambiguous_across_line_boundary() { - // 'N' at start of second line — still ambiguous - let data = b">s1\nACGT\nNACGT\n"; - let out = run_fasta(data, 4); - assert_eq!(out, b"ACGT\x00ACGT\x00"); + assert_eq!(run_fasta(b">s1\nACGT\nNACGT\n", 4), b"ACGT\x00ACGT\x00"); } #[test] fn fasta_ambiguous_short_segment_discarded() { - let data = b">s1\nACGTACGTNAC\n"; - let out = run_fasta(data, 4); - assert_eq!(out, b"ACGTACGT\x00"); + assert_eq!(run_fasta(b">s1\nACGTACGTNAC\n", 4), b"ACGTACGT\x00"); } #[test] fn fasta_no_trailing_newline() { - let data = b">s1\nACGTACGT"; - let out = run_fasta(data, 4); - assert_eq!(out, b"ACGTACGT\x00"); + assert_eq!(run_fasta(b">s1\nACGTACGT", 4), b"ACGTACGT\x00"); } #[test] fn fasta_crlf_line_endings() { - let data = b">s1\r\nACGT\r\nACGT\r\n>s2\r\nTTTT\r\n"; - let out = run_fasta(data, 4); - assert_eq!(out, b"ACGTACGT\x00TTTT\x00"); + assert_eq!(run_fasta(b">s1\r\nACGT\r\nACGT\r\n>s2\r\nTTTT\r\n", 4), b"ACGTACGT\x00TTTT\x00"); } #[test] fn fasta_multi_slice_rope() { let data = make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]); let mid = data.len() / 2; - let chunks = vec![ - BytesMut::from(&data[..mid]).freeze(), - BytesMut::from(&data[mid..]).freeze(), - ]; - let out: Vec = normalize_fasta_chunk(chunks, 4) - .into_iter() - .flat_map(|b| b.to_vec()) - .collect(); - assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00"); - } - - // ── multi-record rope (multiple Bytes slices) ───────────────────────────── - - #[test] - fn multi_slice_rope() { - let data = make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]); - // Split into two slices at an arbitrary boundary. - let mid = data.len() / 2; - let chunks = vec![ - BytesMut::from(&data[..mid]).freeze(), - BytesMut::from(&data[mid..]).freeze(), - ]; - let out: Vec = normalize_fastq_chunk(chunks, 4) - .into_iter() - .flat_map(|b| b.to_vec()) - .collect(); - assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00"); + let mut rope = Rope::new(); + rope.push(data[..mid].to_vec()); + rope.push(data[mid..].to_vec()); + assert_eq!(flat(normalize_fasta_chunk(rope, 4)), b"ACGTACGT\x00TTTTTTTT\x00"); } } diff --git a/src/obiread/src/tape.rs b/src/obiread/src/tape.rs deleted file mode 100644 index 0e6f0bd..0000000 --- a/src/obiread/src/tape.rs +++ /dev/null @@ -1,427 +0,0 @@ -//! Two-cursor tape over a rope of `BytesMut` slices. -//! -//! A [`RopeCursor`] is a position `(slice, byte)` within a rope. -//! Read and write cursors are the same type — the only difference is which -//! primitive they call: [`RopeCursor::peek`] vs [`RopeCursor::poke`]. -//! -//! The read-only methods (`peek`, `advance`, `retreat`, `distance`) are generic -//! over `T: std::ops::Deref` and work on both `Vec` and -//! `Vec`. The write method (`poke`) is specific to `BytesMut`. -//! -//! [`RopeTape`] owns the rope and two cursors, and exposes the four Turing-machine -//! primitives used by the sequence normalisation automata. - -use std::ops::Deref; -use bytes::{Bytes, BytesMut}; - -// ── RopeCursor ──────────────────────────────────────────────────────────────── - -/// A position within a rope (`Vec`), usable as either a read or write head. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct RopeCursor { - slice: usize, - byte: usize, -} - -impl RopeCursor { - /// Position at the very start of the rope. - #[inline] - pub fn new() -> Self { - Self { slice: 0, byte: 0 } - } - - /// Read the byte at the current position without moving. - /// Returns `None` at end of rope. - #[inline] - pub fn peek>(&self, rope: &[T]) -> Option { - rope.get(self.slice)?.get(self.byte).copied() - } - - /// Write `b` at the current position without moving. - #[inline] - pub fn poke(&self, rope: &mut [BytesMut], b: u8) { - rope[self.slice][self.byte] = b; - } - - /// Advance by one byte, crossing slice boundaries transparently. - /// Returns `false` when the end of the rope is reached. - #[inline] - pub fn advance>(&mut self, rope: &[T]) -> bool { - if self.slice >= rope.len() { - return false; - } - self.byte += 1; - if self.byte >= rope[self.slice].len() { - self.slice += 1; - self.byte = 0; - } - self.slice < rope.len() - } - - /// Position at the last byte of the rope. - /// - /// Returns `None` if the rope is empty or all slices are empty. - pub fn end>(rope: &[T]) -> Option { - for (i, slice) in rope.iter().enumerate().rev() { - if !slice.is_empty() { - return Some(Self { slice: i, byte: slice.len() - 1 }); - } - } - None - } - - /// Retreat by one byte and return the byte now under the cursor. - /// - /// Returns `None` if already at the beginning of the rope. - /// This is the inverse of the read-advance pattern used by `RopeTape::read_next`. - pub fn previous>(&mut self, rope: &[T]) -> Option { - if self.slice == 0 && self.byte == 0 { - return None; - } - self.retreat(1, rope); - self.peek(rope) - } - - /// Retreat by `n` bytes, crossing slice boundaries transparently. - /// - /// Panics in debug mode if retreating past the beginning of the rope. - pub fn retreat>(&mut self, mut n: usize, rope: &[T]) { - while n > 0 { - if self.byte >= n { - self.byte -= n; - return; - } - // Consume all bytes in current slice and cross boundary. - n -= self.byte + 1; - debug_assert!(self.slice > 0, "retreat past beginning of rope"); - self.slice -= 1; - self.byte = rope[self.slice].len() - 1; - } - } - - /// Number of bytes from `from` (inclusive) to `to` (exclusive). - /// - /// Requires `from` ≤ `to` in rope order. - pub fn distance>(from: &Self, to: &Self, rope: &[T]) -> usize { - if from.slice == to.slice { - to.byte - from.byte - } else { - let mut d = rope[from.slice].len() - from.byte; - for s in (from.slice + 1)..to.slice { - d += rope[s].len(); - } - d += to.byte; - d - } - } -} - -impl Default for RopeCursor { - fn default() -> Self { Self::new() } -} - -// ── RopeTape ────────────────────────────────────────────────────────────────── - -/// A mutable rope with independent read and write cursors. -/// -/// The write cursor always stays at or behind the read cursor — this invariant -/// is guaranteed structurally by the formats (FASTA headers and FASTQ -/// non-sequence lines are consumed before any sequence byte reaches the same -/// position). -pub struct RopeTape { - rope: Vec, - read: RopeCursor, - write: RopeCursor, -} - -impl RopeTape { - /// Build a tape from a chunk produced by [`crate::chunk::SeqChunkIter`]. - /// - /// Each `Bytes` is converted to `BytesMut` in O(1): succeeds because the - /// chunk reader yields uniquely-owned, non-shared buffers. - pub fn new(chunks: Vec) -> Self { - let rope = chunks - .into_iter() - .map(|b| b.try_into_mut().expect("Bytes not uniquely owned")) - .collect(); - Self { rope, read: RopeCursor::new(), write: RopeCursor::new() } - } - - /// Peek at the byte under the read cursor without advancing. - #[inline] - pub fn peek(&self) -> Option { - self.read.peek(&self.rope) - } - - /// Read the byte under the read cursor and advance it. - #[inline] - pub fn read_next(&mut self) -> Option { - let b = self.read.peek(&self.rope)?; - self.read.advance(&self.rope); - Some(b) - } - - /// Write `b` at the write cursor and advance it. - #[inline] - pub fn write_next(&mut self, b: u8) { - self.write.poke(&mut self.rope, b); - self.write.advance(&self.rope); - } - - /// Retreat the read cursor by one byte and return the byte now under it. - /// - /// This is the inverse of `read_next`: move back one position and read. - /// Returns `None` if already at the beginning of the tape. - #[inline] - pub fn previous(&mut self) -> Option { - self.read.previous(&self.rope) - } - - /// Retreat the write cursor by `n` bytes (to erase a sequence shorter than k). - #[inline] - pub fn write_retreat(&mut self, n: usize) { - self.write.retreat(n, &self.rope); - } - - /// Snapshot the current write position, typically used as `seq_start`. - #[inline] - pub fn write_snapshot(&self) -> RopeCursor { - self.write - } - - /// Number of bytes written since `snapshot`. - #[inline] - pub fn written_since(&self, snapshot: RopeCursor) -> usize { - RopeCursor::distance(&snapshot, &self.write, &self.rope) - } - - /// Consume the tape and return the written region as a `Vec`. - /// - /// Slices fully overwritten are returned in full; the last partial slice is - /// truncated to the write position; all remaining (unread) slices are dropped. - pub fn into_output(mut self) -> Vec { - let ws = self.write.slice; - let wb = self.write.byte; - - if ws < self.rope.len() { - self.rope.truncate(ws + 1); - if let Some(last) = self.rope.last_mut() { - last.truncate(wb); - } - } - - self.rope - .into_iter() - .filter(|s| !s.is_empty()) - .map(BytesMut::freeze) - .collect() - } -} - -// ── tests ───────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - - fn rope(slices: &[&[u8]]) -> Vec { - slices.iter().map(|s| BytesMut::from(*s)).collect() - } - - /// Mimics what SeqChunkIter produces: Bytes allocated from BytesMut (uniquely owned). - fn owned_bytes(data: &[u8]) -> Bytes { - BytesMut::from(data).freeze() - } - - // ── RopeCursor::peek ────────────────────────────────────────────────────── - - #[test] - fn peek_first_byte() { - let r = rope(&[b"ACGT"]); - assert_eq!(RopeCursor::new().peek(&r), Some(b'A')); - } - - #[test] - fn peek_past_end_returns_none() { - let r = rope(&[b"A"]); - let mut c = RopeCursor::new(); - c.advance(&r); - assert_eq!(c.peek(&r), None); - } - - // ── RopeCursor::advance ─────────────────────────────────────────────────── - - #[test] - fn advance_crosses_slice_boundary() { - let r = rope(&[b"AC", b"GT"]); - let mut c = RopeCursor::new(); - c.advance(&r); // → 'C' - c.advance(&r); // → 'G' (slice 1) - assert_eq!(c.peek(&r), Some(b'G')); - } - - #[test] - fn advance_at_end_returns_false() { - let r = rope(&[b"A"]); - let mut c = RopeCursor::new(); - let still_in = c.advance(&r); - assert!(!still_in); - assert_eq!(c.peek(&r), None); - } - - // ── RopeCursor::poke ────────────────────────────────────────────────────── - - #[test] - fn poke_writes_without_advancing() { - let mut r = rope(&[b"XXXX"]); - let c = RopeCursor::new(); - c.poke(&mut r, b'A'); - assert_eq!(r[0][0], b'A'); - assert_eq!(r[0][1], b'X'); // unchanged - // cursor position unchanged: peek still at 0 - assert_eq!(c.peek(&r), Some(b'A')); - } - - // ── RopeCursor::retreat ─────────────────────────────────────────────────── - - #[test] - fn retreat_within_slice() { - let r = rope(&[b"ACGT"]); - let mut c = RopeCursor::new(); - c.advance(&r); c.advance(&r); c.advance(&r); // → 'T' - c.retreat(2, &r); - assert_eq!(c.peek(&r), Some(b'C')); - } - - #[test] - fn retreat_crosses_slice_boundary() { - let r = rope(&[b"ACGT", b"TTTT"]); - let mut c = RopeCursor::new(); - for _ in 0..4 { c.advance(&r); } // → first 'T' of slice 1 - c.retreat(1, &r); - assert_eq!(c.peek(&r), Some(b'T')); // last byte of "ACGT" - } - - #[test] - fn retreat_exactly_to_slice_start() { - let r = rope(&[b"ACGT", b"TTTT"]); - let mut c = RopeCursor::new(); - for _ in 0..5 { c.advance(&r); } // → second 'T' of slice 1 - c.retreat(5, &r); // back to very first byte - assert_eq!(c.peek(&r), Some(b'A')); - } - - #[test] - fn retreat_zero_is_noop() { - let r = rope(&[b"ACGT"]); - let mut c = RopeCursor::new(); - c.advance(&r); c.advance(&r); // → 'G' - c.retreat(0, &r); - assert_eq!(c.peek(&r), Some(b'G')); - } - - // ── RopeCursor::distance ────────────────────────────────────────────────── - - #[test] - fn distance_same_slice() { - let r = rope(&[b"ACGTACGT"]); - let mut a = RopeCursor::new(); - let mut b = RopeCursor::new(); - b.advance(&r); b.advance(&r); b.advance(&r); // at offset 3 - a.advance(&r); // at offset 1 - assert_eq!(RopeCursor::distance(&a, &b, &r), 2); - } - - #[test] - fn distance_across_slices() { - let r = rope(&[b"ACGT", b"TTTT"]); - let from = RopeCursor::new(); // (0,0) - let mut to = RopeCursor::new(); - for _ in 0..6 { to.advance(&r); } // (1,2) - assert_eq!(RopeCursor::distance(&from, &to, &r), 6); - } - - #[test] - fn distance_zero() { - let r = rope(&[b"ACGT"]); - let c = RopeCursor::new(); - assert_eq!(RopeCursor::distance(&c, &c, &r), 0); - } - - // ── RopeTape ────────────────────────────────────────────────────────────── - - #[test] - fn tape_read_skips_write_copies() { - // Simulate: 5 bytes header, then 4 bytes sequence. - // Read 5 without writing, then copy 4. - let chunks = vec![owned_bytes(b">hdr\nACGT")]; - let mut tape = RopeTape::new(chunks); - - for _ in 0..5 { tape.read_next(); } // skip ">hdr\n" - for _ in 0..4 { - let b = tape.read_next().unwrap(); - tape.write_next(b); - } - - let out: Vec = tape.into_output().into_iter().flat_map(|b| b.to_vec()).collect(); - assert_eq!(out, b"ACGT"); - } - - #[test] - fn tape_write_retreat_erases() { - let chunks = vec![owned_bytes(b"XXXXXXACGT")]; - let mut tape = RopeTape::new(chunks); - - // Write 3 bytes, then retreat — simulates a sequence < k being discarded. - let snap = tape.write_snapshot(); - for _ in 0..3 { - let b = tape.read_next().unwrap(); - tape.write_next(b); - } - assert_eq!(tape.written_since(snap), 3); - tape.write_retreat(3); // erase the 3 bytes - assert_eq!(tape.written_since(snap), 0); - - // Now write 4 more bytes (the "ACGT"). - for _ in 0..3 { tape.read_next(); } // skip "XXX" - for _ in 0..4 { - let b = tape.read_next().unwrap(); - tape.write_next(b); - } - - let out: Vec = tape.into_output().into_iter().flat_map(|b| b.to_vec()).collect(); - assert_eq!(out, b"ACGT"); - } - - #[test] - fn tape_into_output_exact_boundary() { - // Write exactly until the last byte of the first slice, nothing more. - let chunks = vec![ - owned_bytes(b"ACGT"), - owned_bytes(b"TTTT"), - ]; - let mut tape = RopeTape::new(chunks); - for _ in 0..4 { - let b = tape.read_next().unwrap(); - tape.write_next(b); - } - // Skip the rest - let out: Vec = tape.into_output().into_iter().flat_map(|b| b.to_vec()).collect(); - assert_eq!(out, b"ACGT"); - } - - #[test] - fn tape_written_since_across_slices() { - let chunks = vec![ - owned_bytes(b"XXXX"), - owned_bytes(b"YYYY"), - ]; - let mut tape = RopeTape::new(chunks); - let snap = tape.write_snapshot(); - for _ in 0..6 { - let b = tape.read_next().unwrap(); - tape.write_next(b); - } - assert_eq!(tape.written_since(snap), 6); - } -} diff --git a/src/obirope/src/cursor.rs b/src/obirope/src/cursor.rs deleted file mode 100644 index 64722f3..0000000 --- a/src/obirope/src/cursor.rs +++ /dev/null @@ -1,7 +0,0 @@ -/// A position within a rope (`Vec`), usable as either a read or write head. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct RopeCursor { - rope: &Rope, - slice: usize, - byte: usize, -} diff --git a/src/obirope/src/lib.rs b/src/obirope/src/lib.rs deleted file mode 100644 index 02ee338..0000000 --- a/src/obirope/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod rope; - -use rope::Rope; diff --git a/src/obirope/src/rope.rs b/src/obirope/src/rope.rs deleted file mode 100644 index 5352843..0000000 --- a/src/obirope/src/rope.rs +++ /dev/null @@ -1,100 +0,0 @@ -use bytes::Bytes; - -pub struct Rope { - blocks: Vec, - length: usize, - cum_length: Vec, - last_block_start: usize, - last_block_end: usize, - last_block_idx: usize, -} - -impl Rope { - pub fn new() -> Self { - Self { - blocks: Vec::new(), - length: 0, - cum_length: Vec::new(), - last_block_start: 0, - last_block_end: 0, - last_block_idx: 0, - } - } - - pub fn push(&mut self, block: Bytes) { - self.blocks.push(block); - self.length += block.len(); - self.cum_length.push(self.length); - if self.blocks.len() == 1 { - self.last_block_start = 0; - self.last_block_end = block.len(); - self.last_block_idx = 0; - } - } - - pub fn n_blocks(&self) -> usize { - self.blocks.len() - } - - pub fn len(&self) -> usize { - self.length - } - - fn lookup(&mut self, i: usize) -> Option<(usize, usize, usize)> { - if i >= self.length { - return None; - } - - let (block_idx, from, to) = match self.blocks.len() { - 0 => return None, - 1 => (0, 0, self.cum_length[0]), - 2 => { - if i < self.cum_length[0] { - (0, 0, self.cum_length[0]) - } else { - (1, self.cum_length[0], self.blocks[1].len()) - } - } - _ => { - let begin_idx = 0; - let end_idx = self.blocks.len() - 1; - while begin_idx < end_idx { - let pivot_idx = (begin_idx + end_idx) / 2; - let offset = self.cum_length[pivot_idx]; - if i < offset { - end_idx = pivot_idx; - } else { - begin_idx = pivot_idx + 1; - } - } - ( - begin_idx - 1, - self.cum_length[begin_idx - 1], - self.blocks[begin_idx].len(), - ) - } - }; - self.last_block_idx = block_idx; - self.last_block_start = from; - self.last_block_end = to; - Some((block_idx, from, to)) - } - - pub fn coordinates(&self, i: usize) -> Option<(usize, usize)> { - let (block_idx, pos) = if i < self.last_block_end && i >= self.last_block_start { - (self.last_block_idx, i - self.last_block_start) - } else { - let (block_idx, from, to) = self.lookup(i)?; - (block_idx, i - from) - }; - Some((block_idx, pos)) - } - - pub fn block(&self, i: usize) -> Option<&Bytes> { - self.blocks.get(i) - } - - pub fn is_empty(&self) -> bool { - self.blocks.is_empty() - } -} diff --git a/src/obiskbuilder/Cargo.toml b/src/obiskbuilder/Cargo.toml index 19d0505..4a4832c 100644 --- a/src/obiskbuilder/Cargo.toml +++ b/src/obiskbuilder/Cargo.toml @@ -5,5 +5,4 @@ edition = "2024" [dependencies] obikseq = { path = "../obikseq" } -obiread = { path = "../obiread" } -bytes = "1" +obikrope = { path = "../obikrope" } diff --git a/src/obiskbuilder/src/iter.rs b/src/obiskbuilder/src/iter.rs index d2b6f52..91760aa 100644 --- a/src/obiskbuilder/src/iter.rs +++ b/src/obiskbuilder/src/iter.rs @@ -15,8 +15,7 @@ //! | minimizer changed | k | //! | super-kmer length = 256| k | -use bytes::Bytes; -use obiread::tape::RopeCursor; +use obikrope::{ForwardCursor, Rope, RopeCursor}; use obikseq::superkmer::SuperKmer; use crate::encoding::encode_base; @@ -26,9 +25,8 @@ use crate::scratch::SuperKmerScratch; use crate::window::KmerWindow; /// Iterator over `(minimizer, SuperKmer)` pairs. -pub struct SuperKmerIter { - rope: Vec, - cursor: RopeCursor, +pub struct SuperKmerIter<'a> { + cursor: ForwardCursor<'a>, k: usize, scratch: SuperKmerScratch, window: KmerWindow, @@ -38,23 +36,16 @@ pub struct SuperKmerIter { prev_minimizer_pos: u8, } -impl SuperKmerIter { +impl<'a> SuperKmerIter<'a> { /// Build an iterator from a normalised rope chunk. /// /// - `k`: k-mer size (1–31) /// - `m`: minimizer size (1 < m < k) /// - `level_max`: maximum sub-word size for entropy (typically 6) /// - `theta`: entropy threshold; k-mers with score ≤ theta are rejected - pub fn new( - chunks: Vec, - k: usize, - m: usize, - level_max: usize, - theta: f64, - ) -> Self { + pub fn new(rope: &'a Rope, k: usize, m: usize, level_max: usize, theta: f64) -> Self { Self { - rope: chunks, - cursor: RopeCursor::new(), + cursor: rope.fw_cursor(), k, scratch: SuperKmerScratch::new(), window: KmerWindow::new(k), @@ -65,13 +56,6 @@ impl SuperKmerIter { } } - #[inline] - fn read_byte(&mut self) -> Option { - let b = self.cursor.peek(&self.rope)?; - self.cursor.advance(&self.rope); - Some(b) - } - fn reset_state(&mut self) { self.window.reset(); self.minimizer.reset(); @@ -80,8 +64,6 @@ impl SuperKmerIter { self.prev_minimizer_pos = 0; } - /// Emit the current scratch as a super-kmer if it holds at least k nucleotides. - /// Returns `None` and silently discards short buffers (< k nt, no complete kmer). fn try_emit(&mut self, min: u64) -> Option<(u64, SuperKmer)> { if self.scratch.len() < self.k { self.scratch.reset(); @@ -93,19 +75,17 @@ impl SuperKmerIter { } } -impl Iterator for SuperKmerIter { +impl<'a> Iterator for SuperKmerIter<'a> { type Item = (u64, SuperKmer); fn next(&mut self) -> Option<(u64, SuperKmer)> { loop { - let byte = match self.read_byte() { + let byte = match self.cursor.read_next().ok() { None => { - // EOF: flush whatever we have. let min = self.prev_minimizer.unwrap_or(0); return self.try_emit(min); } Some(0x00) => { - // Segment boundary: flush current super-kmer, then reset. let min = self.prev_minimizer.unwrap_or(0); let result = self.try_emit(min); self.reset_state(); @@ -118,25 +98,22 @@ impl Iterator for SuperKmerIter { }; let base2 = encode_base(byte); - - // Update sliding windows; kmer_ready is true once we have k bases. let kmer_ready = self.window.push(base2); let current_min = self.minimizer.push(base2); if !kmer_ready { - // Warm-up phase: just accumulate. self.scratch.push(byte); continue; } let kmer = self.window.kmer_u64(); - let min = current_min.unwrap(); // guaranteed when kmer_ready + let min = current_min.unwrap(); // ── 1. Entropy check ───────────────────────────────────────────── if !self.entropy.accept(kmer) { let prev_min = self.prev_minimizer.unwrap_or(0); let result = self.try_emit(prev_min); - self.cursor.retreat(self.k - 1, &self.rope); + self.cursor.rewind(self.k - 1).ok(); self.reset_state(); if result.is_some() { return result; @@ -148,7 +125,7 @@ impl Iterator for SuperKmerIter { if let Some(prev) = self.prev_minimizer { if min != prev { let result = self.try_emit(prev); - self.cursor.retreat(self.k, &self.rope); + self.cursor.rewind(self.k).ok(); self.reset_state(); if result.is_some() { return result; @@ -158,10 +135,9 @@ impl Iterator for SuperKmerIter { } // ── 3. Super-kmer length check ──────────────────────────────────── - // S[j] would be the 257th nucleotide → don't add it. if self.scratch.len() == 256 { let result = self.try_emit(min); - self.cursor.retreat(self.k, &self.rope); + self.cursor.rewind(self.k).ok(); self.reset_state(); if result.is_some() { return result; @@ -169,7 +145,6 @@ impl Iterator for SuperKmerIter { continue; } - // All checks passed: accept S[j]. self.prev_minimizer = Some(min); self.prev_minimizer_pos = self.minimizer.minimizer_pos(); self.scratch.push(byte); @@ -182,29 +157,24 @@ impl Iterator for SuperKmerIter { #[cfg(test)] mod tests { use super::*; - use bytes::BytesMut; - fn chunks(data: &[u8]) -> Vec { - vec![BytesMut::from(data).freeze()] + fn make_rope(data: &[u8]) -> Rope { + let mut r = Rope::new(); + r.push(data.to_vec()); + r } - /// theta=0: accept everything; level_max=1 (minimal entropy computation). fn run_nofilter(data: &[u8], k: usize, m: usize) -> Vec> { - SuperKmerIter::new(chunks(data), k, m, 1, 0.0) + let rope = make_rope(data); + SuperKmerIter::new(&rope, k, m, 1, 0.0) .map(|(_, sk)| sk.to_ascii()) .collect() } - // ── basic segmentation (no entropy / minimizer cut) ─────────────────────── - #[test] fn single_segment_one_superkmer() { - // k=4, m=2; one segment "ACGTACGT\x00" - // All consecutive 4-mers share the same minimizer (or not — but we - // just check that we get the full sequence back with theta=0). let out = run_nofilter(b"ACGTACGT\x00", 4, 2); assert!(!out.is_empty()); - // The concatenation of all emitted superkmers covers the segment. let total: Vec = out.into_iter().flatten().collect(); assert!(total.len() >= 4); } @@ -227,54 +197,37 @@ mod tests { assert!(!out.is_empty()); } - // ── entropy cut ─────────────────────────────────────────────────────────── - #[test] fn low_complexity_kmer_is_rejected() { - // "AAAAAAAAACGT\x00": the run of A's is low-complexity; - // with a strict threshold the A-only kmers get cut. - // With theta=0 everything passes → we get one superkmer. let out_pass = run_nofilter(b"AAAAAAAAACGT\x00", 4, 2); assert!(!out_pass.is_empty()); - // With a high threshold polyA should be rejected. - let out_reject: Vec> = - SuperKmerIter::new(chunks(b"AAAAAAAA\x00"), 4, 2, 6, 0.9) - .map(|(_, sk)| sk.to_ascii()) - .collect(); - // The sequence is pure polyA: all kmers should fail entropy → no emission. + let rope = make_rope(b"AAAAAAAA\x00"); + let out_reject: Vec> = SuperKmerIter::new(&rope, 4, 2, 6, 0.9) + .map(|(_, sk)| sk.to_ascii()) + .collect(); assert!(out_reject.is_empty()); } - // ── multi-slice rope ────────────────────────────────────────────────────── - #[test] fn multi_slice_rope() { let data = b"ACGTACGTACGT\x00"; let mid = data.len() / 2; - let rope = vec![ - BytesMut::from(&data[..mid]).freeze(), - BytesMut::from(&data[mid..]).freeze(), - ]; - let out: Vec> = SuperKmerIter::new(rope, 4, 2, 1, 0.0) + let mut rope = Rope::new(); + rope.push(data[..mid].to_vec()); + rope.push(data[mid..].to_vec()); + let out: Vec> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0) .map(|(_, sk)| sk.to_ascii()) .collect(); assert!(!out.is_empty()); } - // ── minimizer is returned ───────────────────────────────────────────────── - #[test] fn yields_minimizer_value() { - let data = b"ACGTACGT\x00"; - let results: Vec<(u64, Vec)> = - SuperKmerIter::new(chunks(data), 4, 2, 1, 0.0) - .map(|(min, sk)| (min, sk.to_ascii())) - .collect(); + let rope = make_rope(b"ACGTACGT\x00"); + let results: Vec<(u64, Vec)> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0) + .map(|(min, sk)| (min, sk.to_ascii())) + .collect(); assert!(!results.is_empty()); - // The minimizer is a non-trivial u64 (just check it's been computed). - for (min, _) in &results { - let _ = min; // value is format-specific; just verify no panic - } } }