♻️ refactor rope implementation to use obikrope

- rename `obirope` → `obikroper`
- replace legacy rope with new in-place, Cell-based implementation
  - add ForwardCursor/Backward Cursor & SeekMode support (no more BytesMut)
- update all dependents:
  - obiread: switch to Rope + cursors, remove tape.rs
    • chunk iterator yields `Rope` instead of Vec<Bytes>
  - obiskbuilder: use ForwardCursor over Rope
- remove bytes dependency from affected crates
This commit is contained in:
Eric Coissac
2026-04-19 21:22:10 +02:00
parent 5fab59f92c
commit 0dcb5dd6c2
19 changed files with 790 additions and 1140 deletions
+10 -3
View File
@@ -595,6 +595,14 @@ dependencies = [
"obiskbuilder", "obiskbuilder",
] ]
[[package]]
name = "obikrope"
version = "0.1.0"
dependencies = [
"bytes",
"criterion2",
]
[[package]] [[package]]
name = "obikseq" name = "obikseq"
version = "0.1.0" version = "0.1.0"
@@ -607,8 +615,8 @@ dependencies = [
name = "obiread" name = "obiread"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bytes",
"niffler", "niffler",
"obikrope",
"ureq", "ureq",
] ]
@@ -616,9 +624,8 @@ dependencies = [
name = "obiskbuilder" name = "obiskbuilder"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bytes", "obikrope",
"obikseq", "obikseq",
"obiread",
] ]
[[package]] [[package]]
+1 -1
View File
@@ -1,3 +1,3 @@
[workspace] [workspace]
resolver = "3" resolver = "3"
members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer"] members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope"]
@@ -6,6 +6,5 @@ edition = "2024"
[dev-dependencies] [dev-dependencies]
criterion2 = { version = "3", features = ["cargo_bench_support"] } criterion2 = { version = "3", features = ["cargo_bench_support"] }
[[bench]] [dependencies]
name = "superkmer" bytes = "1.11.1"
harness = false
+310
View File
@@ -0,0 +1,310 @@
use std::cell::Cell;
use crate::{Rope, RopeError};
#[derive(Clone, Copy)]
pub enum SeekMode {
Absolute,
Relative,
RelativeToEnd,
}
// ── shared state ──────────────────────────────────────────────────────────────
#[derive(Clone)]
struct CursorState<'a> {
block_idx: Cell<usize>,
block_start: Cell<usize>,
block_end: Cell<usize>,
block: Cell<&'a [Cell<u8>]>,
initialized: Cell<bool>,
current: Cell<Option<usize>>,
}
impl<'a> CursorState<'a> {
fn new() -> Self {
Self {
block_idx: Cell::new(0),
block_start: Cell::new(0),
block_end: Cell::new(0),
block: Cell::new(&[]),
initialized: Cell::new(false),
current: Cell::new(None),
}
}
fn get(&self, rope: &'a Rope, i: usize) -> Option<u8> {
if !self.initialized.get() || i < self.block_start.get() || i >= self.block_end.get() {
let (bi, bs, be) = rope.lookup(i)?;
self.block_idx.set(bi);
self.block_start.set(bs);
self.block_end.set(be);
self.block.set(rope.get_block(bi)?);
self.initialized.set(true);
}
Some(self.block.get()[i - self.block_start.get()].get())
}
fn set(&self, rope: &'a Rope, i: usize, value: u8) -> Result<(), RopeError> {
if !self.initialized.get() || i < self.block_start.get() || i >= self.block_end.get() {
let (bi, bs, be) = rope.lookup(i).ok_or(
RopeError::OutOfBounds(format!("index out of bounds: i={} > {}", i, rope.len())),
)?;
self.block_idx.set(bi);
self.block_start.set(bs);
self.block_end.set(be);
self.block.set(rope.get_block(bi).ok_or(RopeError::BlockNotFound(format!(
"Cannot find block for index {}",
i
)))?);
self.initialized.set(true);
}
self.block.get()[i - self.block_start.get()].set(value);
Ok(())
}
}
// ── trait ─────────────────────────────────────────────────────────────────────
pub trait RopeCursor {
fn get(&self, i: usize) -> Option<u8>;
fn set(&self, i: usize, value: u8) -> Result<(), RopeError>;
fn peek(&self) -> Option<u8>;
fn poke(&self, value: u8) -> Result<(), RopeError>;
fn tell(&self) -> Option<usize>;
fn seek(&self, pos: isize, mode: SeekMode) -> Result<usize, RopeError>;
fn rewind(&self, go_back_of: usize) -> Result<(), RopeError>;
fn len(&self) -> usize;
fn read_next(&self) -> Result<u8, RopeError>;
}
// ── ForwardCursor ─────────────────────────────────────────────────────────────
#[derive(Clone)]
pub struct ForwardCursor<'a> {
rope: &'a Rope,
state: CursorState<'a>,
}
impl<'a> ForwardCursor<'a> {
pub fn new(rope: &'a Rope) -> Self {
Self { rope, state: CursorState::new() }
}
pub fn read_ahead(&self, ahead: usize) -> Result<u8, RopeError> {
let pos = self.state.current.get().ok_or(RopeError::CurrentNotSet)?;
self.state
.get(self.rope, pos + ahead)
.ok_or(RopeError::OutOfBounds(format!(
"index out of bounds: i={} + {} > {}",
pos, ahead, self.rope.len()
)))
}
pub fn write(&self, value: u8) -> Result<(), RopeError> {
let pos = self.state.current.get().unwrap_or(0);
self.state.set(self.rope, pos, value)?;
self.state.current.set(Some(pos + 1));
Ok(())
}
pub fn iter(&self) -> ForwardIter<'a, '_> {
ForwardIter { cursor: self }
}
}
impl<'a> RopeCursor for ForwardCursor<'a> {
fn get(&self, i: usize) -> Option<u8> {
self.state.get(self.rope, i)
}
fn set(&self, i: usize, value: u8) -> Result<(), RopeError> {
self.state.set(self.rope, i, value)
}
fn peek(&self) -> Option<u8> {
self.state.get(self.rope, self.state.current.get()?)
}
fn poke(&self, value: u8) -> Result<(), RopeError> {
let pos = self.state.current.get().ok_or(RopeError::CurrentNotSet)?;
self.state.set(self.rope, pos, value)
}
fn tell(&self) -> Option<usize> {
self.state.current.get()
}
fn len(&self) -> usize {
self.rope.len()
}
fn read_next(&self) -> Result<u8, RopeError> {
let next_pos = match self.state.current.get() {
Some(i) => i + 1,
None => 0,
};
let value = self.state
.get(self.rope, next_pos)
.ok_or(RopeError::OutOfBounds(format!(
"index out of bounds: i={} > {}",
next_pos, self.rope.len()
)))?;
self.state.current.set(Some(next_pos));
Ok(value)
}
fn seek(&self, pos: isize, mode: SeekMode) -> Result<usize, RopeError> {
let pos = match mode {
SeekMode::Absolute => pos,
SeekMode::Relative => self.state.current.get().ok_or(RopeError::CurrentNotSet)? as isize + pos,
SeekMode::RelativeToEnd => self.rope.len() as isize - pos,
};
if pos < 0 {
return Err(RopeError::OutOfBounds(format!("index out of bounds: i={} < 0", pos)));
}
self.state.current.set(Some(pos as usize));
Ok(pos as usize)
}
fn rewind(&self, go_back_of: usize) -> Result<(), RopeError> {
self.seek(-(go_back_of as isize), SeekMode::Relative)?;
Ok(())
}
}
impl Iterator for ForwardCursor<'_> {
type Item = u8;
fn next(&mut self) -> Option<Self::Item> {
self.read_next().ok()
}
}
pub struct ForwardIter<'a, 'b> {
cursor: &'b ForwardCursor<'a>,
}
impl Iterator for ForwardIter<'_, '_> {
type Item = u8;
fn next(&mut self) -> Option<u8> {
self.cursor.read_next().ok()
}
}
// ── BackwardCursor ────────────────────────────────────────────────────────────
#[derive(Clone)]
pub struct BackwardCursor<'a> {
rope: &'a Rope,
state: CursorState<'a>,
}
impl<'a> BackwardCursor<'a> {
pub fn new(rope: &'a Rope) -> Self {
Self { rope, state: CursorState::new() }
}
pub fn read_behind(&self, behind: usize) -> Result<u8, RopeError> {
let pos = self.state.current.get().ok_or(RopeError::CurrentNotSet)?;
let target = pos
.checked_add(behind)
.filter(|&t| t < self.rope.len())
.ok_or(RopeError::OutOfBounds(format!(
"index out of bounds: i={} + {} > {}",
pos, behind, self.rope.len()
)))?;
self.state
.get(self.rope, target)
.ok_or(RopeError::OutOfBounds(format!(
"index out of bounds: i={} + {} > {}",
pos, behind, self.rope.len()
)))
}
pub fn iter(&self) -> BackwardIter<'a, '_> {
BackwardIter { cursor: self }
}
}
impl<'a> RopeCursor for BackwardCursor<'a> {
fn get(&self, i: usize) -> Option<u8> {
self.state.get(self.rope, i)
}
fn set(&self, i: usize, value: u8) -> Result<(), RopeError> {
self.state.set(self.rope, i, value)
}
fn peek(&self) -> Option<u8> {
self.state.get(self.rope, self.state.current.get()?)
}
fn poke(&self, value: u8) -> Result<(), RopeError> {
let pos = self.state.current.get().ok_or(RopeError::CurrentNotSet)?;
self.state.set(self.rope, pos, value)
}
fn tell(&self) -> Option<usize> {
self.state.current.get()
}
fn len(&self) -> usize {
self.rope.len()
}
fn read_next(&self) -> Result<u8, RopeError> {
let next_pos = match self.state.current.get() {
None => self.rope.len().checked_sub(1).ok_or(RopeError::OutOfBounds(
"BackwardCursor: rope is empty".to_string(),
))?,
Some(0) => return Err(RopeError::OutOfBounds(
"BackwardCursor: already at beginning".to_string(),
)),
Some(i) => i - 1,
};
let value = self.state
.get(self.rope, next_pos)
.ok_or(RopeError::OutOfBounds(format!(
"BackwardCursor: index out of bounds at i={}",
next_pos
)))?;
self.state.current.set(Some(next_pos));
Ok(value)
}
fn seek(&self, pos: isize, mode: SeekMode) -> Result<usize, RopeError> {
let pos = match mode {
SeekMode::Absolute => pos,
SeekMode::Relative => self.state.current.get().ok_or(RopeError::CurrentNotSet)? as isize - pos,
SeekMode::RelativeToEnd => self.rope.len() as isize - pos,
};
if pos < 0 {
return Err(RopeError::OutOfBounds(format!("index out of bounds: i={} < 0", pos)));
}
self.state.current.set(Some(pos as usize));
Ok(pos as usize)
}
fn rewind(&self, go_back_of: usize) -> Result<(), RopeError> {
self.seek(-(go_back_of as isize), SeekMode::Relative)?;
Ok(())
}
}
impl Iterator for BackwardCursor<'_> {
type Item = u8;
fn next(&mut self) -> Option<Self::Item> {
self.read_next().ok()
}
}
pub struct BackwardIter<'a, 'b> {
cursor: &'b BackwardCursor<'a>,
}
impl Iterator for BackwardIter<'_, '_> {
type Item = u8;
fn next(&mut self) -> Option<u8> {
self.cursor.read_next().ok()
}
}
+6
View File
@@ -0,0 +1,6 @@
#[derive(Debug)]
pub enum RopeError {
OutOfBounds(String),
BlockNotFound(String),
CurrentNotSet,
}
+8
View File
@@ -0,0 +1,8 @@
mod cursor;
mod error;
mod rope;
pub use {
cursor::BackwardCursor, cursor::ForwardCursor, cursor::RopeCursor, cursor::SeekMode,
error::RopeError, rope::Rope,
};
+114
View File
@@ -0,0 +1,114 @@
use crate::{BackwardCursor, ForwardCursor, RopeError};
use std::cell::Cell;
pub struct Rope {
pub(crate) blocks: Vec<Vec<Cell<u8>>>,
pub(crate) length: usize,
pub(crate) start_block_idx: Vec<usize>,
}
impl Rope {
pub fn new() -> Self {
Self {
blocks: Vec::new(),
length: 0,
start_block_idx: Vec::new(),
}
}
pub fn push(&mut self, block: Vec<u8>) {
let block_len = block.len();
self.start_block_idx.push(self.length);
// Safety: Cell<u8> has the same memory layout as u8 (guaranteed by the language)
let cell_block: Vec<Cell<u8>> = unsafe {
let mut v = std::mem::ManuallyDrop::new(block);
Vec::from_raw_parts(v.as_mut_ptr() as *mut Cell<u8>, v.len(), v.capacity())
};
self.blocks.push(cell_block);
self.length += block_len;
}
pub fn n_blocks(&self) -> usize {
self.blocks.len()
}
pub(crate) fn get_block(&self, block_idx: usize) -> Option<&[Cell<u8>]> {
self.blocks.get(block_idx).map(Vec::as_slice)
}
pub fn len(&self) -> usize {
self.length
}
pub(crate) fn lookup(&self, i: usize) -> Option<(usize, usize, usize)> {
if i >= self.length || self.blocks.is_empty() {
return None;
}
let block_idx = self.start_block_idx.partition_point(|&s| s <= i) - 1;
let from = self.start_block_idx[block_idx];
let to = if block_idx + 1 < self.blocks.len() {
self.start_block_idx[block_idx + 1]
} else {
self.length
};
Some((block_idx, from, to))
}
pub fn split_off(&mut self, pos: usize) -> Result<Rope, RopeError> {
if pos > self.length {
return Err(RopeError::OutOfBounds(format!(
"split_at: pos={} > length={}",
pos, self.length
)));
}
// pos == length: tail is empty.
if pos == self.length {
return Ok(Rope::new());
}
let (block_idx, from, _) = self.lookup(pos).ok_or_else(|| {
RopeError::OutOfBounds(format!("split_at: lookup failed at pos={}", pos))
})?;
let cut_offset = pos - from;
// Keep block_idx in self temporarily, split it, move remainder to tail.
let mut tail_blocks = self.blocks.split_off(block_idx + 1);
self.start_block_idx.truncate(block_idx + 1);
let tail_part = self.blocks[block_idx].split_off(cut_offset);
if !tail_part.is_empty() {
tail_blocks.insert(0, tail_part);
}
let mut tail_length = 0;
let tail_starts: Vec<usize> = tail_blocks
.iter()
.map(|b| {
let s = tail_length;
tail_length += b.len();
s
})
.collect();
self.length = pos;
Ok(Rope {
blocks: tail_blocks,
length: tail_length,
start_block_idx: tail_starts,
})
}
pub fn is_empty(&self) -> bool {
self.blocks.is_empty()
}
pub fn fw_cursor(&self) -> ForwardCursor<'_> {
ForwardCursor::new(self)
}
pub fn bw_cursor(&self) -> BackwardCursor<'_> {
BackwardCursor::new(self)
}
}
+1 -1
View File
@@ -4,6 +4,6 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
bytes = "1" obikrope = { path = "../obikrope" }
niffler = { version = "2", default-features = false, features = ["gz", "bz2", "lzma", "zstd"] } niffler = { version = "2", default-features = false, features = ["gz", "bz2", "lzma", "zstd"] }
ureq = "2" ureq = "2"
+27 -117
View File
@@ -1,54 +1,40 @@
//! Chunk iterator: yields rope slices that each end on a complete sequence record. //! Chunk iterator: yields Rope slices that each end on a complete sequence record.
//! //!
//! Each `Vec<Bytes>` yielded by [`SeqChunkIter`] contains one or more reference-counted //! Each `Rope` yielded by [`SeqChunkIter`] contains one or more blocks that
//! byte slices that together form a self-contained block of complete sequence records. //! together form a self-contained block of complete sequence records.
//! The slices are NOT contiguous in memory — the consumer iterates over them in order.
//!
//! The splitter operates directly on the rope via [`RopeCursor`], so no packing
//! (flattening into a contiguous buffer) is ever required — even for sequences
//! longer than the read block size.
use std::io::{self, Read}; use std::io::{self, Read};
use bytes::{Bytes, BytesMut}; use obikrope::Rope;
/// A splitter function: given the accumulated rope, returns the absolute byte /// A splitter function: given the accumulated rope, returns the absolute byte
/// offset at which to cut, or `None` if no complete-record boundary was found. /// offset at which to cut, or `None` if no complete-record boundary was found.
pub type Splitter = fn(&[Bytes]) -> Option<usize>; pub type Splitter = fn(&Rope) -> Option<usize>;
/// Iterator that reads from `R` in blocks and yields `Vec<Bytes>` chunks, /// Iterator that reads from `R` in blocks and yields `Rope` chunks,
/// each ending on a complete sequence record boundary. /// each ending on a complete sequence record boundary.
pub struct SeqChunkIter<R> { pub struct SeqChunkIter<R> {
source: R, source: R,
rope: Vec<Bytes>, rope: Rope,
block_size: usize, block_size: usize,
splitter: Splitter, splitter: Splitter,
probe: &'static [u8],
eof: bool, eof: bool,
} }
impl<R: Read> SeqChunkIter<R> { impl<R: Read> SeqChunkIter<R> {
/// Create a new iterator. /// Create a new iterator.
/// pub fn new(source: R, block_size: usize, splitter: Splitter) -> Self {
/// - `block_size`: bytes per read call (default 1 MiB).
/// - `splitter`: format-specific backward boundary detector working on the rope.
/// - `probe`: short byte string whose presence is necessary (not sufficient)
/// for a boundary to exist (e.g. `b"\n>"` for FASTA, `b"\n@"` for FASTQ).
pub fn new(source: R, block_size: usize, splitter: Splitter, probe: &'static [u8]) -> Self {
Self { Self {
source, source,
rope: Vec::with_capacity(4), rope: Rope::new(),
block_size, block_size,
splitter, splitter,
probe,
eof: false, eof: false,
} }
} }
/// Read one block from source into a fresh `Bytes`. fn read_block(&mut self) -> io::Result<Option<Vec<u8>>> {
/// Returns `None` on EOF (zero bytes read). let mut buf = vec![0u8; self.block_size];
fn read_block(&mut self) -> io::Result<Option<Bytes>> {
let mut buf = BytesMut::zeroed(self.block_size);
let mut filled = 0; let mut filled = 0;
loop { loop {
match self.source.read(&mut buf[filled..]) { match self.source.read(&mut buf[filled..]) {
@@ -67,46 +53,12 @@ impl<R: Read> SeqChunkIter<R> {
return Ok(None); return Ok(None);
} }
buf.truncate(filled); buf.truncate(filled);
Ok(Some(buf.freeze())) Ok(Some(buf))
}
/// Check whether the boundary probe might appear in the newly added block
/// or at the seam between the last two blocks.
///
/// This is a fast O(block_size) heuristic: if the probe is absent, the
/// splitter is not called.
fn probe_in_last_chunk(&self) -> bool {
let last = match self.rope.last() {
Some(b) => b,
None => return false,
};
// Within the last block.
if last.windows(self.probe.len()).any(|w| w == self.probe) {
return true;
}
// At the seam between the previous block and this one.
if self.rope.len() >= 2 {
let prev = &self.rope[self.rope.len() - 2];
let overlap = self.probe.len() - 1;
let from = prev.len().saturating_sub(overlap);
let seam: Vec<u8> = prev[from..]
.iter()
.chain(last.iter().take(overlap))
.copied()
.collect();
if seam.windows(self.probe.len()).any(|w| w == self.probe) {
return true;
}
}
false
} }
} }
impl<R: Read> Iterator for SeqChunkIter<R> { impl<R: Read> Iterator for SeqChunkIter<R> {
type Item = io::Result<Vec<Bytes>>; type Item = io::Result<Rope>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
loop { loop {
@@ -114,7 +66,7 @@ impl<R: Read> Iterator for SeqChunkIter<R> {
if self.rope.is_empty() { if self.rope.is_empty() {
return None; return None;
} }
return Some(Ok(std::mem::take(&mut self.rope))); return Some(Ok(std::mem::replace(&mut self.rope, Rope::new())));
} }
match self.read_block() { match self.read_block() {
@@ -128,53 +80,15 @@ impl<R: Read> Iterator for SeqChunkIter<R> {
} }
} }
if self.probe_in_last_chunk() {
if let Some(abs_offset) = (self.splitter)(&self.rope) { if let Some(abs_offset) = (self.splitter)(&self.rope) {
return Some(Ok(self.split_at(abs_offset))); let tail = self.rope.split_off(abs_offset)
.expect("splitter returned valid offset");
let chunk = std::mem::replace(&mut self.rope, tail);
return Some(Ok(chunk));
} }
} }
} }
} }
}
impl<R> SeqChunkIter<R> {
/// Split the rope at absolute byte offset `abs_offset`.
///
/// Returns the chunk (`rope[..abs_offset]`) as a `Vec<Bytes>` and stores
/// the remainder (`rope[abs_offset..]`) in `self.rope`.
fn split_at(&mut self, abs_offset: usize) -> Vec<Bytes> {
let mut chunk = Vec::with_capacity(self.rope.len());
let mut remaining = abs_offset;
let mut old_rope = std::mem::take(&mut self.rope);
let mut remainder_rope: Vec<Bytes> = Vec::with_capacity(2);
for piece in old_rope.drain(..) {
if remaining == 0 {
remainder_rope.push(piece);
} else if remaining >= piece.len() {
remaining -= piece.len();
chunk.push(piece);
} else {
// The cut falls inside this piece.
// Copy both halves so each gets its own Arc (unique ownership),
// which is required by RopeTape for in-place writing.
let head = BytesMut::from(&piece[..remaining]).freeze();
let tail = BytesMut::from(&piece[remaining..]).freeze();
remaining = 0;
if !head.is_empty() {
chunk.push(head);
}
if !tail.is_empty() {
remainder_rope.push(tail);
}
}
}
self.rope = remainder_rope;
chunk
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
@@ -183,17 +97,15 @@ mod tests {
use crate::fastq::end_of_last_fastq_entry; use crate::fastq::end_of_last_fastq_entry;
fn fasta_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> { fn fasta_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> {
SeqChunkIter::new(data, block_size, end_of_last_fasta_entry, b"\n>") SeqChunkIter::new(data, block_size, end_of_last_fasta_entry)
} }
fn fastq_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> { fn fastq_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> {
SeqChunkIter::new(data, block_size, end_of_last_fastq_entry, b"\n@") SeqChunkIter::new(data, block_size, end_of_last_fastq_entry)
} }
fn collect_fasta(chunks: Vec<Vec<Bytes>>) -> Vec<Vec<u8>> { fn rope_to_vec(rope: &Rope) -> Vec<u8> {
chunks.into_iter().map(|rope| { rope.fw_cursor().collect()
rope.into_iter().flat_map(|b| b.to_vec()).collect()
}).collect()
} }
// ── FASTA ───────────────────────────────────────────────────────────────── // ── FASTA ─────────────────────────────────────────────────────────────────
@@ -203,16 +115,14 @@ mod tests {
let data: &[u8] = b">s1\nACGT\n"; let data: &[u8] = b">s1\nACGT\n";
let chunks: Vec<_> = fasta_iter(data, 64).collect::<Result<_, _>>().unwrap(); let chunks: Vec<_> = fasta_iter(data, 64).collect::<Result<_, _>>().unwrap();
assert_eq!(chunks.len(), 1); assert_eq!(chunks.len(), 1);
let flat: Vec<u8> = chunks.into_iter().flatten().flat_map(|b| b.to_vec()).collect(); assert_eq!(rope_to_vec(&chunks[0]), b">s1\nACGT\n");
assert_eq!(flat, b">s1\nACGT\n");
} }
#[test] #[test]
fn fasta_two_records_split_across_chunks() { fn fasta_two_records_split_across_chunks() {
let data: &[u8] = b">s1\nACGT\n>s2\nTTTT\n"; let data: &[u8] = b">s1\nACGT\n>s2\nTTTT\n";
let chunks: Vec<_> = fasta_iter(data, 10).collect::<Result<_, _>>().unwrap(); let chunks: Vec<_> = fasta_iter(data, 10).collect::<Result<_, _>>().unwrap();
let flat = collect_fasta(chunks); let all: Vec<u8> = chunks.iter().flat_map(|r| rope_to_vec(r)).collect();
let all: Vec<u8> = flat.into_iter().flatten().collect();
assert_eq!(all, b">s1\nACGT\n>s2\nTTTT\n"); assert_eq!(all, b">s1\nACGT\n>s2\nTTTT\n");
} }
@@ -222,7 +132,7 @@ mod tests {
for block in [8, 12, 20, 100] { for block in [8, 12, 20, 100] {
let chunks: Vec<_> = fasta_iter(data, block).collect::<Result<_, _>>().unwrap(); let chunks: Vec<_> = fasta_iter(data, block).collect::<Result<_, _>>().unwrap();
for rope in &chunks { for rope in &chunks {
let flat: Vec<u8> = rope.iter().flat_map(|b| b.to_vec()).collect(); let flat = rope_to_vec(rope);
assert_eq!(flat[0], b'>', "block={block}: chunk doesn't start with '>'"); assert_eq!(flat[0], b'>', "block={block}: chunk doesn't start with '>'");
assert_eq!(*flat.last().unwrap(), b'\n', "block={block}: chunk doesn't end with newline"); assert_eq!(*flat.last().unwrap(), b'\n', "block={block}: chunk doesn't end with newline");
} }
@@ -258,7 +168,7 @@ mod tests {
(b"TTTTTTTT", b"HHHHHHHH"), (b"TTTTTTTT", b"HHHHHHHH"),
]).into_boxed_slice()); ]).into_boxed_slice());
let chunks: Vec<_> = fastq_iter(data, 16).collect::<Result<_, _>>().unwrap(); let chunks: Vec<_> = fastq_iter(data, 16).collect::<Result<_, _>>().unwrap();
let all: Vec<u8> = chunks.into_iter().flatten().flat_map(|b| b.to_vec()).collect(); let all: Vec<u8> = chunks.iter().flat_map(|r| rope_to_vec(r)).collect();
assert_eq!(all, *data); assert_eq!(all, *data);
} }
@@ -273,7 +183,7 @@ mod tests {
for block in [18, 30, 60] { for block in [18, 30, 60] {
let chunks: Vec<_> = fastq_iter(data, block).collect::<Result<_, _>>().unwrap(); let chunks: Vec<_> = fastq_iter(data, block).collect::<Result<_, _>>().unwrap();
for rope in &chunks { for rope in &chunks {
let first_byte = rope.iter().flat_map(|b| b.iter().copied()).next().unwrap(); let first_byte = rope_to_vec(rope)[0];
assert_eq!(first_byte, b'@', "block={block}: chunk doesn't start with '@'"); assert_eq!(first_byte, b'@', "block={block}: chunk doesn't start with '@'");
} }
} }
+44 -48
View File
@@ -1,8 +1,6 @@
//! Backward boundary detection for FASTA chunks. //! Backward boundary detection for FASTA chunks.
use bytes::Bytes; use obikrope::{Rope, RopeCursor};
use crate::tape::RopeCursor;
/// Scan the rope backward for the start of the last complete FASTA entry. /// Scan the rope backward for the start of the last complete FASTA entry.
/// ///
@@ -10,57 +8,51 @@ use crate::tape::RopeCursor;
/// record, so that `rope[..offset]` is a self-contained chunk and /// record, so that `rope[..offset]` is a self-contained chunk and
/// `rope[offset..]` is the remainder carried into the next chunk. /// `rope[offset..]` is the remainder carried into the next chunk.
/// Returns `None` if no valid boundary is found (need more data). /// Returns `None` if no valid boundary is found (need more data).
/// pub fn end_of_last_fasta_entry(rope: &Rope) -> Option<usize> {
/// Port of Go's `EndOfLastFastaEntry`, now working directly on a rope let cursor = rope.bw_cursor();
/// via [`RopeCursor`] — no contiguous packing required.
pub fn end_of_last_fasta_entry(rope: &[Bytes]) -> Option<usize> {
let total_len: usize = rope.iter().map(|b| b.len()).sum();
if total_len == 0 {
return None;
}
let mut cursor = RopeCursor::end(rope)?;
let mut state: u8 = 0; let mut state: u8 = 0;
let mut last: usize = 0; let mut last: usize = 0;
let mut i = total_len as isize - 1;
while i >= 0 && state < 2 { for c in cursor.iter() {
let c = cursor.peek(rope).unwrap();
match state { match state {
0 if c == b'>' => { 0 if c == b'>' => {
last = cursor.tell()?;
state = 1; state = 1;
last = i as usize;
} }
1 if c == b'\n' || c == b'\r' => { 1 if c == b'\n' || c == b'\r' => {
state = 2; if last > 0 {
return Some(last);
} }
_ => {
state = 0;
}
}
i -= 1;
if i >= 0 {
cursor.previous(rope);
}
}
if i <= 0 || state != 2 {
return None; return None;
} }
Some(last) 1 => {
state = 0;
}
_ => {}
}
}
None
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use bytes::BytesMut;
fn rope(data: &[u8]) -> Vec<Bytes> { fn rope(data: &[u8]) -> Rope {
vec![BytesMut::from(data).freeze()] let mut r = Rope::new();
r.push(data.to_vec());
r
} }
fn rope2(a: &[u8], b: &[u8]) -> Vec<Bytes> { fn rope2(a: &[u8], b: &[u8]) -> Rope {
vec![BytesMut::from(a).freeze(), BytesMut::from(b).freeze()] let mut r = Rope::new();
r.push(a.to_vec());
r.push(b.to_vec());
r
}
fn flat(r: &Rope) -> Vec<u8> {
r.fw_cursor().collect()
} }
#[test] #[test]
@@ -71,39 +63,43 @@ mod tests {
#[test] #[test]
fn two_entries_cuts_at_second_header() { fn two_entries_cuts_at_second_header() {
let data = b">seq1\nACGT\n>seq2\nTTTT\n"; let data = b">seq1\nACGT\n>seq2\nTTTT\n";
let pos = end_of_last_fasta_entry(&rope(data)).unwrap(); let r = rope(data);
assert_eq!(&data[pos..], b">seq2\nTTTT\n"); let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&data[..pos], b">seq1\nACGT\n"); assert_eq!(&flat(&r)[pos..], b">seq2\nTTTT\n");
assert_eq!(&flat(&r)[..pos], b">seq1\nACGT\n");
} }
#[test] #[test]
fn three_entries_cuts_at_last_header() { fn three_entries_cuts_at_last_header() {
let data = b">s1\nAA\n>s2\nCC\n>s3\nGG\n"; let data = b">s1\nAA\n>s2\nCC\n>s3\nGG\n";
let pos = end_of_last_fasta_entry(&rope(data)).unwrap(); let r = rope(data);
assert_eq!(&data[pos..], b">s3\nGG\n"); let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">s3\nGG\n");
} }
#[test] #[test]
fn multiline_sequence() { fn multiline_sequence() {
let data = b">s1\nACGT\nACGT\n>s2\nTTTT\n"; let data = b">s1\nACGT\nACGT\n>s2\nTTTT\n";
let pos = end_of_last_fasta_entry(&rope(data)).unwrap(); let r = rope(data);
assert_eq!(&data[pos..], b">s2\nTTTT\n"); let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">s2\nTTTT\n");
} }
#[test] #[test]
fn crlf_line_endings() { fn crlf_line_endings() {
let data = b">s1\r\nACGT\r\n>s2\r\nTTTT\r\n"; let data = b">s1\r\nACGT\r\n>s2\r\nTTTT\r\n";
let pos = end_of_last_fasta_entry(&rope(data)).unwrap(); let r = rope(data);
assert_eq!(&data[pos..], b">s2\r\nTTTT\r\n"); let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], b">s2\r\nTTTT\r\n");
} }
#[test] #[test]
fn boundary_spans_two_blocks() { fn boundary_spans_two_blocks() {
// Split so that "\n" is at end of first block and ">" at start of second.
let a = b">s1\nACGT\n"; let a = b">s1\nACGT\n";
let b = b">s2\nTTTT\n"; let b = b">s2\nTTTT\n";
let total: Vec<u8> = a.iter().chain(b.iter()).copied().collect(); let r = rope2(a, b);
let pos = end_of_last_fasta_entry(&rope2(a, b)).unwrap(); let all: Vec<u8> = flat(&r);
assert_eq!(&total[pos..], b">s2\nTTTT\n"); let pos = end_of_last_fasta_entry(&r).unwrap();
assert_eq!(&all[pos..], b">s2\nTTTT\n");
} }
} }
+42 -67
View File
@@ -9,13 +9,8 @@
//! The `@` in quality lines (Phred 31 = ASCII 64) makes forward heuristics //! The `@` in quality lines (Phred 31 = ASCII 64) makes forward heuristics
//! unreliable. The backward scanner identifies a genuine record start by //! unreliable. The backward scanner identifies a genuine record start by
//! verifying the structural context around each `@` candidate. //! verifying the structural context around each `@` candidate.
//!
//! Port of Go's `EndOfLastFastqEntry` (7-state machine), now working directly
//! on a rope via [`RopeCursor`] — no contiguous packing required.
use bytes::Bytes; use obikrope::{Rope, RopeCursor, SeekMode};
use crate::tape::RopeCursor;
#[inline] #[inline]
fn is_eol(c: u8) -> bool { fn is_eol(c: u8) -> bool {
@@ -38,42 +33,28 @@ fn is_seq_char(c: u8) -> bool {
/// record, so that `rope[..offset]` is a self-contained chunk and /// record, so that `rope[..offset]` is a self-contained chunk and
/// `rope[offset..]` is the remainder for the next chunk. /// `rope[offset..]` is the remainder for the next chunk.
/// Returns `None` if no valid boundary is found (need more data). /// Returns `None` if no valid boundary is found (need more data).
pub fn end_of_last_fastq_entry(rope: &[Bytes]) -> Option<usize> { pub fn end_of_last_fastq_entry(rope: &Rope) -> Option<usize> {
let total_len: usize = rope.iter().map(|b| b.len()).sum(); let mut cursor = rope.bw_cursor();
if total_len == 0 {
return None;
}
let mut cursor = RopeCursor::end(rope)?;
let mut state: u8 = 0; let mut state: u8 = 0;
let mut restart: isize = total_len as isize - 1; let mut restart: usize = 0;
let mut restart_cursor = cursor; let mut cut: usize = rope.len();
let mut cut: usize = total_len;
let mut i: isize = total_len as isize - 1;
while i >= 0 && state < 7 {
let c = cursor.peek(rope).unwrap();
while let Some(c) = cursor.next() {
match state { match state {
// Looking for `+` separator line content
0 => { 0 => {
if c == b'+' { if c == b'+' {
state = 1; state = 1;
restart = i; restart = cursor.tell()?;
restart_cursor = cursor;
} }
} }
// Found `+` — expect end-of-line immediately before it (going backward)
1 => { 1 => {
if is_eol(c) { if is_eol(c) {
state = 2; state = 2;
} else { } else {
state = 0; state = 0;
i = restart; cursor.seek(restart as isize, SeekMode::Absolute).ok();
cursor = restart_cursor;
} }
} }
// After `\n+`: skip separators, then expect sequence characters
2 => { 2 => {
if is_sep(c) { if is_sep(c) {
// stay // stay
@@ -81,11 +62,9 @@ pub fn end_of_last_fastq_entry(rope: &[Bytes]) -> Option<usize> {
state = 3; state = 3;
} else { } else {
state = 0; state = 0;
i = restart; cursor.seek(restart as isize, SeekMode::Absolute).ok();
cursor = restart_cursor;
} }
} }
// Scanning sequence characters backward
3 => { 3 => {
if is_eol(c) { if is_eol(c) {
state = 4; state = 4;
@@ -93,60 +72,48 @@ pub fn end_of_last_fastq_entry(rope: &[Bytes]) -> Option<usize> {
// stay // stay
} else { } else {
state = 0; state = 0;
i = restart; cursor.seek(restart as isize, SeekMode::Absolute).ok();
cursor = restart_cursor;
} }
} }
// Found end-of-line before sequence — skip any extra newlines
4 => { 4 => {
if is_eol(c) { if !is_eol(c) {
// stay
} else {
state = 5; state = 5;
} }
} }
// Scanning header content — looking for `@` at start of line
5 => { 5 => {
if is_eol(c) { if is_eol(c) {
state = 0; state = 0;
i = restart; cursor.seek(restart as isize, SeekMode::Absolute).ok();
cursor = restart_cursor;
} else if c == b'@' { } else if c == b'@' {
cut = cursor.tell()?;
state = 6; state = 6;
cut = i as usize;
} }
// else: stay // else stay
} }
// Found `@` — expect end-of-line before it
6 => { 6 => {
if is_eol(c) { if is_eol(c) {
state = 7; // success if cut > 0 {
return Some(cut);
}
return None;
} else { } else {
state = 5; state = 5;
} }
} }
_ => unreachable!(), _ => unreachable!(),
} }
i -= 1;
if i >= 0 {
cursor.previous(rope);
} }
} None
if i <= 0 || state != 7 {
return None;
}
Some(cut)
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use bytes::BytesMut;
fn rope(data: &[u8]) -> Vec<Bytes> { fn rope(data: &[u8]) -> Rope {
vec![BytesMut::from(data).freeze()] let mut r = Rope::new();
r.push(data.to_vec());
r
} }
fn make_fastq(records: &[(&[u8], &[u8])]) -> Vec<u8> { fn make_fastq(records: &[(&[u8], &[u8])]) -> Vec<u8> {
@@ -162,6 +129,10 @@ mod tests {
buf buf
} }
fn flat(r: &Rope) -> Vec<u8> {
r.fw_cursor().collect()
}
#[test] #[test]
fn single_record_no_boundary() { fn single_record_no_boundary() {
let buf = make_fastq(&[(b"ACGT", b"IIII")]); let buf = make_fastq(&[(b"ACGT", b"IIII")]);
@@ -171,9 +142,10 @@ mod tests {
#[test] #[test]
fn two_records_cuts_at_second() { fn two_records_cuts_at_second() {
let buf = make_fastq(&[(b"ACGT", b"IIII"), (b"TTTT", b"HHHH")]); let buf = make_fastq(&[(b"ACGT", b"IIII"), (b"TTTT", b"HHHH")]);
let pos = end_of_last_fastq_entry(&rope(&buf)).unwrap(); let r = rope(&buf);
assert_eq!(buf[pos], b'@'); let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(&buf[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()); assert_eq!(flat(&r)[pos], b'@');
assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice());
} }
#[test] #[test]
@@ -183,8 +155,9 @@ mod tests {
(b"CCCC", b"JJJJ"), (b"CCCC", b"JJJJ"),
(b"GGGG", b"KKKK"), (b"GGGG", b"KKKK"),
]); ]);
let pos = end_of_last_fastq_entry(&rope(&buf)).unwrap(); let r = rope(&buf);
assert_eq!(&buf[pos..], make_fastq(&[(b"GGGG", b"KKKK")]).as_slice()); let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"GGGG", b"KKKK")]).as_slice());
} }
#[test] #[test]
@@ -193,15 +166,17 @@ mod tests {
(b"ACGTACGT", b"@@@@IIII"), (b"ACGTACGT", b"@@@@IIII"),
(b"TTTT", b"HHHH"), (b"TTTT", b"HHHH"),
]); ]);
let pos = end_of_last_fastq_entry(&rope(&buf)).unwrap(); let r = rope(&buf);
assert_eq!(&buf[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice()); let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(&flat(&r)[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice());
} }
#[test] #[test]
fn crlf_line_endings() { fn crlf_line_endings() {
let buf = b"@h\r\nACGT\r\n+\r\nIIII\r\n@h\r\nTTTT\r\n+\r\nHHHH\r\n"; let data = b"@h\r\nACGT\r\n+\r\nIIII\r\n@h\r\nTTTT\r\n+\r\nHHHH\r\n";
let pos = end_of_last_fastq_entry(&rope(buf)).unwrap(); let r = rope(data);
assert_eq!(buf[pos], b'@'); let pos = end_of_last_fastq_entry(&r).unwrap();
assert_eq!(&buf[pos..], b"@h\r\nTTTT\r\n+\r\nHHHH\r\n"); assert_eq!(flat(&r)[pos], b'@');
assert_eq!(&flat(&r)[pos..], b"@h\r\nTTTT\r\n+\r\nHHHH\r\n");
} }
} }
+4 -15
View File
@@ -1,7 +1,7 @@
//! Streaming sequence file reader for obikmer. //! Streaming sequence file reader for obikmer.
//! //!
//! Yields rope chunks (`Vec<Bytes>`) from FASTA or FASTQ files, each ending //! Yields rope chunks from FASTA or FASTQ files, each ending
//! on a complete sequence record boundary. Zero allocation in the common case. //! on a complete sequence record boundary.
#![deny(missing_docs)] #![deny(missing_docs)]
@@ -9,7 +9,6 @@ mod fasta;
mod fastq; mod fastq;
pub mod chunk; pub mod chunk;
pub mod normalize; pub mod normalize;
pub mod tape;
pub mod xopen; pub mod xopen;
use std::io::Read; use std::io::Read;
@@ -22,20 +21,10 @@ pub const DEFAULT_BLOCK_SIZE: usize = 1024 * 1024;
/// Create a FASTA chunk iterator over `source`. /// Create a FASTA chunk iterator over `source`.
pub fn fasta_chunks<R: Read>(source: R) -> SeqChunkIter<R> { pub fn fasta_chunks<R: Read>(source: R) -> SeqChunkIter<R> {
SeqChunkIter::new( SeqChunkIter::new(source, DEFAULT_BLOCK_SIZE, fasta::end_of_last_fasta_entry)
source,
DEFAULT_BLOCK_SIZE,
fasta::end_of_last_fasta_entry,
b"\n>",
)
} }
/// Create a FASTQ chunk iterator over `source`. /// Create a FASTQ chunk iterator over `source`.
pub fn fastq_chunks<R: Read>(source: R) -> SeqChunkIter<R> { pub fn fastq_chunks<R: Read>(source: R) -> SeqChunkIter<R> {
SeqChunkIter::new( SeqChunkIter::new(source, DEFAULT_BLOCK_SIZE, fastq::end_of_last_fastq_entry)
source,
DEFAULT_BLOCK_SIZE,
fastq::end_of_last_fastq_entry,
b"\n@",
)
} }
+191 -270
View File
@@ -1,141 +1,114 @@
//! Sequence normalisation automata for FASTA and FASTQ rope chunks. //! Sequence normalisation automata for FASTA and FASTQ rope chunks.
//! //!
//! Both automata operate on a [`RopeTape`] in place (two-cursor, zero //! Both automata operate on a Rope in place (two-cursor, zero reallocation)
//! reallocation in the common case) and produce a compact byte stream: //! and produce a compact byte stream: ACGT segments of length ≥ k separated
//! ACGT segments of length ≥ k separated by `0x00`, uppercased. //! by `0x00`, uppercased. Ambiguous bases terminate the current segment.
//! //! Segments shorter than k are silently discarded.
//! Ambiguous bases terminate the current segment. Segments shorter than k
//! are silently discarded by retreating the write cursor.
use bytes::Bytes; use obikrope::{ForwardCursor, Rope, RopeCursor};
use crate::tape::{RopeCursor, RopeTape};
// ── public entry points ─────────────────────────────────────────────────────── // ── public entry points ───────────────────────────────────────────────────────
/// Normalise a FASTA chunk: skip headers; copy, filter and concatenate /// Normalise a FASTA chunk into a compact ACGT\x00-separated rope.
/// sequence lines across multi-line records. pub fn normalize_fasta_chunk(mut rope: Rope, k: usize) -> Rope {
/// let write_pos = {
/// Returns the written region of the tape as a rope of `Bytes`. let read = rope.fw_cursor();
pub fn normalize_fasta_chunk(chunks: Vec<Bytes>, k: usize) -> Vec<Bytes> { let write = rope.fw_cursor();
let mut tape = RopeTape::new(chunks); normalize_fasta(&read, &write, k);
normalize_fasta(&mut tape, k); write.tell().unwrap_or(0)
tape.into_output() };
let _ = rope.split_off(write_pos);
rope
} }
/// Normalise a FASTQ chunk: skip headers, `+` lines and quality lines; /// Normalise a FASTQ chunk into a compact ACGT\x00-separated rope.
/// copy and filter sequence lines. pub fn normalize_fastq_chunk(mut rope: Rope, k: usize) -> Rope {
/// let write_pos = {
/// Returns the written region of the tape as a rope of `Bytes`. let read = rope.fw_cursor();
pub fn normalize_fastq_chunk(chunks: Vec<Bytes>, k: usize) -> Vec<Bytes> { let write = rope.fw_cursor();
let mut tape = RopeTape::new(chunks); normalize_fastq(&read, &write, k);
normalize_fastq(&mut tape, k); write.tell().unwrap_or(0)
tape.into_output() };
let _ = rope.split_off(write_pos);
rope
} }
// ── FASTA automaton ─────────────────────────────────────────────────────────── // ── FASTA automaton ───────────────────────────────────────────────────────────
/// Drive the FASTA normalisation automaton on `tape`. fn normalize_fasta(read: &ForwardCursor<'_>, write: &ForwardCursor<'_>, k: usize) {
/// skip_line(read);
/// After the initial header skip, the automaton reads sequence characters
/// one by one. At each newline run it peeks at the next character to decide:
/// `>` → new record (close segment, skip header); anything else → line
/// continuation within the same sequence.
fn normalize_fasta(tape: &mut RopeTape, k: usize) {
// Skip the first header line (includes the leading `>`).
skip_line(tape);
loop { loop {
let mut seg_start = tape.write_snapshot(); let mut seg_start = write.tell().unwrap_or(0);
'seq: loop { 'seq: loop {
let Some(c) = tape.read_next() else { let Some(c) = read.read_next().ok() else {
end_segment(tape, &mut seg_start, k); end_segment(write, &mut seg_start, k);
return; return;
}; };
if is_newline(c) { if is_newline(c) {
// Peek at the very next character — no newline run to skip. match read.read_ahead(1).ok() {
// \r\n is handled naturally: \r → peek \n → continue; \n → peek content.
match tape.peek() {
None => { None => {
end_segment(tape, &mut seg_start, k); end_segment(write, &mut seg_start, k);
return; return;
} }
Some(b'>') => { Some(b'>') => {
// New record: close current segment, skip next header. end_segment(write, &mut seg_start, k);
end_segment(tape, &mut seg_start, k); skip_line(read);
skip_line(tape); break 'seq;
break 'seq; // restart outer loop
}
Some(_) => {
// Line continuation or \r before \n: next char is a nucleotide.
continue 'seq;
} }
Some(_) => continue 'seq,
} }
} }
let upper = c & !0x20u8; let upper = c & !0x20u8;
if is_acgt(upper) { if is_acgt(upper) {
tape.write_next(upper); write.write(upper).ok();
} else { } else {
// Ambiguous base: close segment, skip the non-ACGT run. end_segment(write, &mut seg_start, k);
end_segment(tape, &mut seg_start, k); skip_until_acgt_or_newline(read);
skip_until_acgt_or_newline(tape); seg_start = write.tell().unwrap_or(0);
seg_start = tape.write_snapshot();
} }
} }
// Outer loop restarts for the next record.
} }
} }
// ── FASTQ automaton ─────────────────────────────────────────────────────────── // ── FASTQ automaton ───────────────────────────────────────────────────────────
/// Drive the FASTQ normalisation automaton on `tape`. fn normalize_fastq(read: &ForwardCursor<'_>, write: &ForwardCursor<'_>, k: usize) {
///
/// The FASTQ 4-line structure (`@header`, sequence, `+`, quality) is rigid,
/// so the automaton cycles through four fixed phases without backtracking.
fn normalize_fastq(tape: &mut RopeTape, k: usize) {
loop { loop {
// ── Phase 1: skip header line (@…\n) ───────────────────────────── skip_line(read); // skip header
skip_line(tape);
// ── Phase 2: copy sequence ──────────────────────────────────────── let mut seg_start = write.tell().unwrap_or(0);
let mut seg_start = tape.write_snapshot();
'seq: loop { 'seq: loop {
let Some(c) = tape.read_next() else { let Some(c) = read.read_next().ok() else {
// EOF inside a sequence: flush whatever we have. end_segment(write, &mut seg_start, k);
end_segment(tape, &mut seg_start, k);
return; return;
}; };
if is_newline(c) { if is_newline(c) {
skip_newlines(tape); skip_newlines(read);
break 'seq; break 'seq;
} }
let upper = c & !0x20u8; // ASCII uppercase trick let upper = c & !0x20u8;
if is_acgt(upper) { if is_acgt(upper) {
tape.write_next(upper); write.write(upper).ok();
} else { } else {
// Ambiguous base: close the current segment, skip non-ACGT run. end_segment(write, &mut seg_start, k);
end_segment(tape, &mut seg_start, k); skip_until_acgt_or_newline(read);
skip_until_acgt_or_newline(tape); seg_start = write.tell().unwrap_or(0);
seg_start = tape.write_snapshot();
} }
} }
// Sequence line ended: close the last segment. end_segment(write, &mut seg_start, k);
end_segment(tape, &mut seg_start, k);
// ── Phase 3: skip + line ────────────────────────────────────────── skip_line(read); // skip + line
skip_line(tape); skip_line(read); // skip quality line
// ── Phase 4: skip quality line ──────────────────────────────────── if read.read_ahead(1).ok().is_none() {
skip_line(tape);
if tape.peek().is_none() {
return; return;
} }
} }
@@ -143,46 +116,38 @@ fn normalize_fastq(tape: &mut RopeTape, k: usize) {
// ── shared helpers ──────────────────────────────────────────────────────────── // ── shared helpers ────────────────────────────────────────────────────────────
/// Skip to the end of the current line, consuming the newline run. fn skip_line(read: &ForwardCursor<'_>) {
fn skip_line(tape: &mut RopeTape) { while let Some(c) = read.read_next().ok() {
while let Some(c) = tape.read_next() {
if is_newline(c) { if is_newline(c) {
skip_newlines(tape); skip_newlines(read);
return; return;
} }
} }
} }
/// Consume a contiguous run of `\n` / `\r` characters. fn skip_newlines(read: &ForwardCursor<'_>) {
fn skip_newlines(tape: &mut RopeTape) { while matches!(read.read_ahead(1).ok(), Some(c) if is_newline(c)) {
while matches!(tape.peek(), Some(c) if is_newline(c)) { read.read_next().ok();
tape.read_next();
} }
} }
/// Consume characters until the next ACGT base or newline (leaving it unread). fn skip_until_acgt_or_newline(read: &ForwardCursor<'_>) {
fn skip_until_acgt_or_newline(tape: &mut RopeTape) { while let Some(c) = read.read_ahead(1).ok() {
while let Some(c) = tape.peek() {
if is_newline(c) || is_acgt(c & !0x20u8) { if is_newline(c) || is_acgt(c & !0x20u8) {
return; return;
} }
tape.read_next(); read.read_next().ok();
} }
} }
/// Close the current segment. fn end_segment(write: &ForwardCursor<'_>, seg_start: &mut usize, k: usize) {
/// let seg_len = write.tell().unwrap_or(0) - *seg_start;
/// - If `seg_len >= k`: write `0x00` terminator and advance `seg_start`.
/// - If `0 < seg_len < k`: erase by retreating the write cursor.
/// - If `seg_len == 0`: nothing to do.
fn end_segment(tape: &mut RopeTape, seg_start: &mut RopeCursor, k: usize) {
let seg_len = tape.written_since(*seg_start);
if seg_len >= k { if seg_len >= k {
tape.write_next(0x00); write.write(0x00).ok();
} else if seg_len > 0 { } else if seg_len > 0 {
tape.write_retreat(seg_len); write.rewind(seg_len).ok();
} }
*seg_start = tape.write_snapshot(); *seg_start = write.tell().unwrap_or(0);
} }
#[inline] fn is_newline(c: u8) -> bool { c == b'\n' || c == b'\r' } #[inline] fn is_newline(c: u8) -> bool { c == b'\n' || c == b'\r' }
@@ -193,14 +158,23 @@ fn end_segment(tape: &mut RopeTape, seg_start: &mut RopeCursor, k: usize) {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use bytes::BytesMut;
fn run(data: &[u8], k: usize) -> Vec<u8> { fn make_rope(data: &[u8]) -> Rope {
let chunks = vec![BytesMut::from(data).freeze()]; let mut r = Rope::new();
normalize_fastq_chunk(chunks, k) r.push(data.to_vec());
.into_iter() r
.flat_map(|b| b.to_vec()) }
.collect()
fn flat(r: Rope) -> Vec<u8> {
r.fw_cursor().collect()
}
fn run_fastq(data: &[u8], k: usize) -> Vec<u8> {
flat(normalize_fastq_chunk(make_rope(data), k))
}
fn run_fasta(data: &[u8], k: usize) -> Vec<u8> {
flat(normalize_fasta_chunk(make_rope(data), k))
} }
fn make_fastq(records: &[&[u8]]) -> Vec<u8> { fn make_fastq(records: &[&[u8]]) -> Vec<u8> {
@@ -216,107 +190,6 @@ mod tests {
buf buf
} }
// ── basic output format ───────────────────────────────────────────────────
#[test]
fn single_record_produces_seq_then_null() {
let out = run(&make_fastq(&[b"ACGTACGT"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn two_records_concatenated() {
let out = run(&make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]), 4);
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
// ── uppercase normalisation ───────────────────────────────────────────────
#[test]
fn lowercase_input_uppercased() {
let out = run(&make_fastq(&[b"acgtacgt"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn mixed_case_uppercased() {
let out = run(&make_fastq(&[b"AcGtAcGt"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
// ── k filter ─────────────────────────────────────────────────────────────
#[test]
fn sequence_shorter_than_k_discarded() {
let out = run(&make_fastq(&[b"ACG"]), 4);
assert_eq!(out, b"");
}
#[test]
fn sequence_exactly_k_kept() {
let out = run(&make_fastq(&[b"ACGT"]), 4);
assert_eq!(out, b"ACGT\x00");
}
#[test]
fn short_record_among_valid_ones_discarded() {
let out = run(&make_fastq(&[b"ACGTACGT", b"AC", b"TTTTTTTT"]), 4);
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
// ── ambiguous bases ───────────────────────────────────────────────────────
#[test]
fn ambiguous_splits_into_two_segments() {
// 'N' in the middle splits "ACGT" + "ACGT"
let out = run(&make_fastq(&[b"ACGTNACGT"]), 4);
assert_eq!(out, b"ACGT\x00ACGT\x00");
}
#[test]
fn segment_after_ambiguous_too_short_discarded() {
// "ACGTACGT" + 'N' + "AC" (< k=4)
let out = run(&make_fastq(&[b"ACGTACGTNAC"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn consecutive_ambiguous_produce_no_empty_segment() {
let out = run(&make_fastq(&[b"ACGTNNNNACGT"]), 4);
assert_eq!(out, b"ACGT\x00ACGT\x00");
}
#[test]
fn ambiguous_at_start_skipped() {
let out = run(&make_fastq(&[b"NNACGTACGT"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn ambiguous_at_end_produces_no_trailing_empty() {
let out = run(&make_fastq(&[b"ACGTACGTNN"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
// ── CRLF line endings ─────────────────────────────────────────────────────
#[test]
fn crlf_handled() {
let data = b"@hdr\r\nACGTACGT\r\n+\r\nIIIIIIII\r\n";
let out = run(data, 4);
assert_eq!(out, b"ACGTACGT\x00");
}
// ── FASTA ─────────────────────────────────────────────────────────────────
fn run_fasta(data: &[u8], k: usize) -> Vec<u8> {
let chunks = vec![BytesMut::from(data).freeze()];
normalize_fasta_chunk(chunks, k)
.into_iter()
.flat_map(|b| b.to_vec())
.collect()
}
fn make_fasta(records: &[(&[u8], &[u8])]) -> Vec<u8> { fn make_fasta(records: &[(&[u8], &[u8])]) -> Vec<u8> {
let mut buf = Vec::new(); let mut buf = Vec::new();
for (id, seq) in records { for (id, seq) in records {
@@ -329,112 +202,160 @@ mod tests {
buf buf
} }
// ── FASTQ basic ──────────────────────────────────────────────────────────
#[test]
fn single_record_produces_seq_then_null() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGTACGT"]), 4), b"ACGTACGT\x00");
}
#[test]
fn two_records_concatenated() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]), 4),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
#[test]
fn lowercase_input_uppercased() {
assert_eq!(run_fastq(&make_fastq(&[b"acgtacgt"]), 4), b"ACGTACGT\x00");
}
#[test]
fn mixed_case_uppercased() {
assert_eq!(run_fastq(&make_fastq(&[b"AcGtAcGt"]), 4), b"ACGTACGT\x00");
}
#[test]
fn sequence_shorter_than_k_discarded() {
assert_eq!(run_fastq(&make_fastq(&[b"ACG"]), 4), b"");
}
#[test]
fn sequence_exactly_k_kept() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGT"]), 4), b"ACGT\x00");
}
#[test]
fn short_record_among_valid_ones_discarded() {
assert_eq!(
run_fastq(&make_fastq(&[b"ACGTACGT", b"AC", b"TTTTTTTT"]), 4),
b"ACGTACGT\x00TTTTTTTT\x00"
);
}
#[test]
fn ambiguous_splits_into_two_segments() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGTNACGT"]), 4), b"ACGT\x00ACGT\x00");
}
#[test]
fn segment_after_ambiguous_too_short_discarded() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGTACGTNAC"]), 4), b"ACGTACGT\x00");
}
#[test]
fn consecutive_ambiguous_produce_no_empty_segment() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGTNNNNACGT"]), 4), b"ACGT\x00ACGT\x00");
}
#[test]
fn ambiguous_at_start_skipped() {
assert_eq!(run_fastq(&make_fastq(&[b"NNACGTACGT"]), 4), b"ACGTACGT\x00");
}
#[test]
fn ambiguous_at_end_produces_no_trailing_empty() {
assert_eq!(run_fastq(&make_fastq(&[b"ACGTACGTNN"]), 4), b"ACGTACGT\x00");
}
#[test]
fn crlf_handled() {
let data = b"@hdr\r\nACGTACGT\r\n+\r\nIIIIIIII\r\n";
assert_eq!(run_fastq(data, 4), b"ACGTACGT\x00");
}
#[test]
fn multi_slice_rope() {
let data = make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]);
let mid = data.len() / 2;
let mut rope = Rope::new();
rope.push(data[..mid].to_vec());
rope.push(data[mid..].to_vec());
assert_eq!(flat(normalize_fastq_chunk(rope, 4)), b"ACGTACGT\x00TTTTTTTT\x00");
}
// ── FASTA ─────────────────────────────────────────────────────────────────
#[test] #[test]
fn fasta_single_record() { fn fasta_single_record() {
let out = run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT")]), 4); assert_eq!(run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT")]), 4), b"ACGTACGT\x00");
assert_eq!(out, b"ACGTACGT\x00");
} }
#[test] #[test]
fn fasta_two_records() { fn fasta_two_records() {
let out = run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]), 4); assert_eq!(
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00"); run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]), 4),
b"ACGTACGT\x00TTTTTTTT\x00"
);
} }
#[test] #[test]
fn fasta_multiline_sequence_concatenated() { fn fasta_multiline_sequence_concatenated() {
let data = b">s1\nACGT\nACGT\nACGT\n"; assert_eq!(run_fasta(b">s1\nACGT\nACGT\nACGT\n", 4), b"ACGTACGTACGT\x00");
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGTACGTACGT\x00");
} }
#[test] #[test]
fn fasta_lowercase_uppercased() { fn fasta_lowercase_uppercased() {
let out = run_fasta(&make_fasta(&[(b"s1", b"acgtacgt")]), 4); assert_eq!(run_fasta(&make_fasta(&[(b"s1", b"acgtacgt")]), 4), b"ACGTACGT\x00");
assert_eq!(out, b"ACGTACGT\x00");
} }
#[test] #[test]
fn fasta_short_record_discarded() { fn fasta_short_record_discarded() {
let out = run_fasta(&make_fasta(&[(b"s1", b"ACG")]), 4); assert_eq!(run_fasta(&make_fasta(&[(b"s1", b"ACG")]), 4), b"");
assert_eq!(out, b"");
} }
#[test] #[test]
fn fasta_short_among_valid_discarded() { fn fasta_short_among_valid_discarded() {
let out = run_fasta( assert_eq!(
&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"AC"), (b"s3", b"TTTTTTTT")]), run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"AC"), (b"s3", b"TTTTTTTT")]), 4),
4, b"ACGTACGT\x00TTTTTTTT\x00"
); );
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
} }
#[test] #[test]
fn fasta_ambiguous_splits_segments() { fn fasta_ambiguous_splits_segments() {
let data = b">s1\nACGTNACGT\n"; assert_eq!(run_fasta(b">s1\nACGTNACGT\n", 4), b"ACGT\x00ACGT\x00");
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGT\x00ACGT\x00");
} }
#[test] #[test]
fn fasta_ambiguous_across_line_boundary() { fn fasta_ambiguous_across_line_boundary() {
// 'N' at start of second line — still ambiguous assert_eq!(run_fasta(b">s1\nACGT\nNACGT\n", 4), b"ACGT\x00ACGT\x00");
let data = b">s1\nACGT\nNACGT\n";
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGT\x00ACGT\x00");
} }
#[test] #[test]
fn fasta_ambiguous_short_segment_discarded() { fn fasta_ambiguous_short_segment_discarded() {
let data = b">s1\nACGTACGTNAC\n"; assert_eq!(run_fasta(b">s1\nACGTACGTNAC\n", 4), b"ACGTACGT\x00");
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGTACGT\x00");
} }
#[test] #[test]
fn fasta_no_trailing_newline() { fn fasta_no_trailing_newline() {
let data = b">s1\nACGTACGT"; assert_eq!(run_fasta(b">s1\nACGTACGT", 4), b"ACGTACGT\x00");
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGTACGT\x00");
} }
#[test] #[test]
fn fasta_crlf_line_endings() { fn fasta_crlf_line_endings() {
let data = b">s1\r\nACGT\r\nACGT\r\n>s2\r\nTTTT\r\n"; assert_eq!(run_fasta(b">s1\r\nACGT\r\nACGT\r\n>s2\r\nTTTT\r\n", 4), b"ACGTACGT\x00TTTT\x00");
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGTACGT\x00TTTT\x00");
} }
#[test] #[test]
fn fasta_multi_slice_rope() { fn fasta_multi_slice_rope() {
let data = make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]); let data = make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]);
let mid = data.len() / 2; let mid = data.len() / 2;
let chunks = vec![ let mut rope = Rope::new();
BytesMut::from(&data[..mid]).freeze(), rope.push(data[..mid].to_vec());
BytesMut::from(&data[mid..]).freeze(), rope.push(data[mid..].to_vec());
]; assert_eq!(flat(normalize_fasta_chunk(rope, 4)), b"ACGTACGT\x00TTTTTTTT\x00");
let out: Vec<u8> = normalize_fasta_chunk(chunks, 4)
.into_iter()
.flat_map(|b| b.to_vec())
.collect();
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
// ── multi-record rope (multiple Bytes slices) ─────────────────────────────
#[test]
fn multi_slice_rope() {
let data = make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]);
// Split into two slices at an arbitrary boundary.
let mid = data.len() / 2;
let chunks = vec![
BytesMut::from(&data[..mid]).freeze(),
BytesMut::from(&data[mid..]).freeze(),
];
let out: Vec<u8> = normalize_fastq_chunk(chunks, 4)
.into_iter()
.flat_map(|b| b.to_vec())
.collect();
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
} }
} }
-427
View File
@@ -1,427 +0,0 @@
//! Two-cursor tape over a rope of `BytesMut` slices.
//!
//! A [`RopeCursor`] is a position `(slice, byte)` within a rope.
//! Read and write cursors are the same type — the only difference is which
//! primitive they call: [`RopeCursor::peek`] vs [`RopeCursor::poke`].
//!
//! The read-only methods (`peek`, `advance`, `retreat`, `distance`) are generic
//! over `T: std::ops::Deref<Target = [u8]>` and work on both `Vec<Bytes>` and
//! `Vec<BytesMut>`. The write method (`poke`) is specific to `BytesMut`.
//!
//! [`RopeTape`] owns the rope and two cursors, and exposes the four Turing-machine
//! primitives used by the sequence normalisation automata.
use std::ops::Deref;
use bytes::{Bytes, BytesMut};
// ── RopeCursor ────────────────────────────────────────────────────────────────
/// A position within a rope (`Vec<BytesMut>`), usable as either a read or write head.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RopeCursor {
slice: usize,
byte: usize,
}
impl RopeCursor {
/// Position at the very start of the rope.
#[inline]
pub fn new() -> Self {
Self { slice: 0, byte: 0 }
}
/// Read the byte at the current position without moving.
/// Returns `None` at end of rope.
#[inline]
pub fn peek<T: Deref<Target = [u8]>>(&self, rope: &[T]) -> Option<u8> {
rope.get(self.slice)?.get(self.byte).copied()
}
/// Write `b` at the current position without moving.
#[inline]
pub fn poke(&self, rope: &mut [BytesMut], b: u8) {
rope[self.slice][self.byte] = b;
}
/// Advance by one byte, crossing slice boundaries transparently.
/// Returns `false` when the end of the rope is reached.
#[inline]
pub fn advance<T: Deref<Target = [u8]>>(&mut self, rope: &[T]) -> bool {
if self.slice >= rope.len() {
return false;
}
self.byte += 1;
if self.byte >= rope[self.slice].len() {
self.slice += 1;
self.byte = 0;
}
self.slice < rope.len()
}
/// Position at the last byte of the rope.
///
/// Returns `None` if the rope is empty or all slices are empty.
pub fn end<T: Deref<Target = [u8]>>(rope: &[T]) -> Option<Self> {
for (i, slice) in rope.iter().enumerate().rev() {
if !slice.is_empty() {
return Some(Self { slice: i, byte: slice.len() - 1 });
}
}
None
}
/// Retreat by one byte and return the byte now under the cursor.
///
/// Returns `None` if already at the beginning of the rope.
/// This is the inverse of the read-advance pattern used by `RopeTape::read_next`.
pub fn previous<T: Deref<Target = [u8]>>(&mut self, rope: &[T]) -> Option<u8> {
if self.slice == 0 && self.byte == 0 {
return None;
}
self.retreat(1, rope);
self.peek(rope)
}
/// Retreat by `n` bytes, crossing slice boundaries transparently.
///
/// Panics in debug mode if retreating past the beginning of the rope.
pub fn retreat<T: Deref<Target = [u8]>>(&mut self, mut n: usize, rope: &[T]) {
while n > 0 {
if self.byte >= n {
self.byte -= n;
return;
}
// Consume all bytes in current slice and cross boundary.
n -= self.byte + 1;
debug_assert!(self.slice > 0, "retreat past beginning of rope");
self.slice -= 1;
self.byte = rope[self.slice].len() - 1;
}
}
/// Number of bytes from `from` (inclusive) to `to` (exclusive).
///
/// Requires `from` ≤ `to` in rope order.
pub fn distance<T: Deref<Target = [u8]>>(from: &Self, to: &Self, rope: &[T]) -> usize {
if from.slice == to.slice {
to.byte - from.byte
} else {
let mut d = rope[from.slice].len() - from.byte;
for s in (from.slice + 1)..to.slice {
d += rope[s].len();
}
d += to.byte;
d
}
}
}
impl Default for RopeCursor {
fn default() -> Self { Self::new() }
}
// ── RopeTape ──────────────────────────────────────────────────────────────────
/// A mutable rope with independent read and write cursors.
///
/// The write cursor always stays at or behind the read cursor — this invariant
/// is guaranteed structurally by the formats (FASTA headers and FASTQ
/// non-sequence lines are consumed before any sequence byte reaches the same
/// position).
pub struct RopeTape {
rope: Vec<BytesMut>,
read: RopeCursor,
write: RopeCursor,
}
impl RopeTape {
/// Build a tape from a chunk produced by [`crate::chunk::SeqChunkIter`].
///
/// Each `Bytes` is converted to `BytesMut` in O(1): succeeds because the
/// chunk reader yields uniquely-owned, non-shared buffers.
pub fn new(chunks: Vec<Bytes>) -> Self {
let rope = chunks
.into_iter()
.map(|b| b.try_into_mut().expect("Bytes not uniquely owned"))
.collect();
Self { rope, read: RopeCursor::new(), write: RopeCursor::new() }
}
/// Peek at the byte under the read cursor without advancing.
#[inline]
pub fn peek(&self) -> Option<u8> {
self.read.peek(&self.rope)
}
/// Read the byte under the read cursor and advance it.
#[inline]
pub fn read_next(&mut self) -> Option<u8> {
let b = self.read.peek(&self.rope)?;
self.read.advance(&self.rope);
Some(b)
}
/// Write `b` at the write cursor and advance it.
#[inline]
pub fn write_next(&mut self, b: u8) {
self.write.poke(&mut self.rope, b);
self.write.advance(&self.rope);
}
/// Retreat the read cursor by one byte and return the byte now under it.
///
/// This is the inverse of `read_next`: move back one position and read.
/// Returns `None` if already at the beginning of the tape.
#[inline]
pub fn previous(&mut self) -> Option<u8> {
self.read.previous(&self.rope)
}
/// Retreat the write cursor by `n` bytes (to erase a sequence shorter than k).
#[inline]
pub fn write_retreat(&mut self, n: usize) {
self.write.retreat(n, &self.rope);
}
/// Snapshot the current write position, typically used as `seq_start`.
#[inline]
pub fn write_snapshot(&self) -> RopeCursor {
self.write
}
/// Number of bytes written since `snapshot`.
#[inline]
pub fn written_since(&self, snapshot: RopeCursor) -> usize {
RopeCursor::distance(&snapshot, &self.write, &self.rope)
}
/// Consume the tape and return the written region as a `Vec<Bytes>`.
///
/// Slices fully overwritten are returned in full; the last partial slice is
/// truncated to the write position; all remaining (unread) slices are dropped.
pub fn into_output(mut self) -> Vec<Bytes> {
let ws = self.write.slice;
let wb = self.write.byte;
if ws < self.rope.len() {
self.rope.truncate(ws + 1);
if let Some(last) = self.rope.last_mut() {
last.truncate(wb);
}
}
self.rope
.into_iter()
.filter(|s| !s.is_empty())
.map(BytesMut::freeze)
.collect()
}
}
// ── tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
fn rope(slices: &[&[u8]]) -> Vec<BytesMut> {
slices.iter().map(|s| BytesMut::from(*s)).collect()
}
/// Mimics what SeqChunkIter produces: Bytes allocated from BytesMut (uniquely owned).
fn owned_bytes(data: &[u8]) -> Bytes {
BytesMut::from(data).freeze()
}
// ── RopeCursor::peek ──────────────────────────────────────────────────────
#[test]
fn peek_first_byte() {
let r = rope(&[b"ACGT"]);
assert_eq!(RopeCursor::new().peek(&r), Some(b'A'));
}
#[test]
fn peek_past_end_returns_none() {
let r = rope(&[b"A"]);
let mut c = RopeCursor::new();
c.advance(&r);
assert_eq!(c.peek(&r), None);
}
// ── RopeCursor::advance ───────────────────────────────────────────────────
#[test]
fn advance_crosses_slice_boundary() {
let r = rope(&[b"AC", b"GT"]);
let mut c = RopeCursor::new();
c.advance(&r); // → 'C'
c.advance(&r); // → 'G' (slice 1)
assert_eq!(c.peek(&r), Some(b'G'));
}
#[test]
fn advance_at_end_returns_false() {
let r = rope(&[b"A"]);
let mut c = RopeCursor::new();
let still_in = c.advance(&r);
assert!(!still_in);
assert_eq!(c.peek(&r), None);
}
// ── RopeCursor::poke ──────────────────────────────────────────────────────
#[test]
fn poke_writes_without_advancing() {
let mut r = rope(&[b"XXXX"]);
let c = RopeCursor::new();
c.poke(&mut r, b'A');
assert_eq!(r[0][0], b'A');
assert_eq!(r[0][1], b'X'); // unchanged
// cursor position unchanged: peek still at 0
assert_eq!(c.peek(&r), Some(b'A'));
}
// ── RopeCursor::retreat ───────────────────────────────────────────────────
#[test]
fn retreat_within_slice() {
let r = rope(&[b"ACGT"]);
let mut c = RopeCursor::new();
c.advance(&r); c.advance(&r); c.advance(&r); // → 'T'
c.retreat(2, &r);
assert_eq!(c.peek(&r), Some(b'C'));
}
#[test]
fn retreat_crosses_slice_boundary() {
let r = rope(&[b"ACGT", b"TTTT"]);
let mut c = RopeCursor::new();
for _ in 0..4 { c.advance(&r); } // → first 'T' of slice 1
c.retreat(1, &r);
assert_eq!(c.peek(&r), Some(b'T')); // last byte of "ACGT"
}
#[test]
fn retreat_exactly_to_slice_start() {
let r = rope(&[b"ACGT", b"TTTT"]);
let mut c = RopeCursor::new();
for _ in 0..5 { c.advance(&r); } // → second 'T' of slice 1
c.retreat(5, &r); // back to very first byte
assert_eq!(c.peek(&r), Some(b'A'));
}
#[test]
fn retreat_zero_is_noop() {
let r = rope(&[b"ACGT"]);
let mut c = RopeCursor::new();
c.advance(&r); c.advance(&r); // → 'G'
c.retreat(0, &r);
assert_eq!(c.peek(&r), Some(b'G'));
}
// ── RopeCursor::distance ──────────────────────────────────────────────────
#[test]
fn distance_same_slice() {
let r = rope(&[b"ACGTACGT"]);
let mut a = RopeCursor::new();
let mut b = RopeCursor::new();
b.advance(&r); b.advance(&r); b.advance(&r); // at offset 3
a.advance(&r); // at offset 1
assert_eq!(RopeCursor::distance(&a, &b, &r), 2);
}
#[test]
fn distance_across_slices() {
let r = rope(&[b"ACGT", b"TTTT"]);
let from = RopeCursor::new(); // (0,0)
let mut to = RopeCursor::new();
for _ in 0..6 { to.advance(&r); } // (1,2)
assert_eq!(RopeCursor::distance(&from, &to, &r), 6);
}
#[test]
fn distance_zero() {
let r = rope(&[b"ACGT"]);
let c = RopeCursor::new();
assert_eq!(RopeCursor::distance(&c, &c, &r), 0);
}
// ── RopeTape ──────────────────────────────────────────────────────────────
#[test]
fn tape_read_skips_write_copies() {
// Simulate: 5 bytes header, then 4 bytes sequence.
// Read 5 without writing, then copy 4.
let chunks = vec![owned_bytes(b">hdr\nACGT")];
let mut tape = RopeTape::new(chunks);
for _ in 0..5 { tape.read_next(); } // skip ">hdr\n"
for _ in 0..4 {
let b = tape.read_next().unwrap();
tape.write_next(b);
}
let out: Vec<u8> = tape.into_output().into_iter().flat_map(|b| b.to_vec()).collect();
assert_eq!(out, b"ACGT");
}
#[test]
fn tape_write_retreat_erases() {
let chunks = vec![owned_bytes(b"XXXXXXACGT")];
let mut tape = RopeTape::new(chunks);
// Write 3 bytes, then retreat — simulates a sequence < k being discarded.
let snap = tape.write_snapshot();
for _ in 0..3 {
let b = tape.read_next().unwrap();
tape.write_next(b);
}
assert_eq!(tape.written_since(snap), 3);
tape.write_retreat(3); // erase the 3 bytes
assert_eq!(tape.written_since(snap), 0);
// Now write 4 more bytes (the "ACGT").
for _ in 0..3 { tape.read_next(); } // skip "XXX"
for _ in 0..4 {
let b = tape.read_next().unwrap();
tape.write_next(b);
}
let out: Vec<u8> = tape.into_output().into_iter().flat_map(|b| b.to_vec()).collect();
assert_eq!(out, b"ACGT");
}
#[test]
fn tape_into_output_exact_boundary() {
// Write exactly until the last byte of the first slice, nothing more.
let chunks = vec![
owned_bytes(b"ACGT"),
owned_bytes(b"TTTT"),
];
let mut tape = RopeTape::new(chunks);
for _ in 0..4 {
let b = tape.read_next().unwrap();
tape.write_next(b);
}
// Skip the rest
let out: Vec<u8> = tape.into_output().into_iter().flat_map(|b| b.to_vec()).collect();
assert_eq!(out, b"ACGT");
}
#[test]
fn tape_written_since_across_slices() {
let chunks = vec![
owned_bytes(b"XXXX"),
owned_bytes(b"YYYY"),
];
let mut tape = RopeTape::new(chunks);
let snap = tape.write_snapshot();
for _ in 0..6 {
let b = tape.read_next().unwrap();
tape.write_next(b);
}
assert_eq!(tape.written_since(snap), 6);
}
}
-7
View File
@@ -1,7 +0,0 @@
/// A position within a rope (`Vec<BytesMut>`), usable as either a read or write head.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RopeCursor {
rope: &Rope,
slice: usize,
byte: usize,
}
-3
View File
@@ -1,3 +0,0 @@
mod rope;
use rope::Rope;
-100
View File
@@ -1,100 +0,0 @@
use bytes::Bytes;
pub struct Rope {
blocks: Vec<Bytes>,
length: usize,
cum_length: Vec<usize>,
last_block_start: usize,
last_block_end: usize,
last_block_idx: usize,
}
impl Rope {
pub fn new() -> Self {
Self {
blocks: Vec::new(),
length: 0,
cum_length: Vec::new(),
last_block_start: 0,
last_block_end: 0,
last_block_idx: 0,
}
}
pub fn push(&mut self, block: Bytes) {
self.blocks.push(block);
self.length += block.len();
self.cum_length.push(self.length);
if self.blocks.len() == 1 {
self.last_block_start = 0;
self.last_block_end = block.len();
self.last_block_idx = 0;
}
}
pub fn n_blocks(&self) -> usize {
self.blocks.len()
}
pub fn len(&self) -> usize {
self.length
}
fn lookup(&mut self, i: usize) -> Option<(usize, usize, usize)> {
if i >= self.length {
return None;
}
let (block_idx, from, to) = match self.blocks.len() {
0 => return None,
1 => (0, 0, self.cum_length[0]),
2 => {
if i < self.cum_length[0] {
(0, 0, self.cum_length[0])
} else {
(1, self.cum_length[0], self.blocks[1].len())
}
}
_ => {
let begin_idx = 0;
let end_idx = self.blocks.len() - 1;
while begin_idx < end_idx {
let pivot_idx = (begin_idx + end_idx) / 2;
let offset = self.cum_length[pivot_idx];
if i < offset {
end_idx = pivot_idx;
} else {
begin_idx = pivot_idx + 1;
}
}
(
begin_idx - 1,
self.cum_length[begin_idx - 1],
self.blocks[begin_idx].len(),
)
}
};
self.last_block_idx = block_idx;
self.last_block_start = from;
self.last_block_end = to;
Some((block_idx, from, to))
}
pub fn coordinates(&self, i: usize) -> Option<(usize, usize)> {
let (block_idx, pos) = if i < self.last_block_end && i >= self.last_block_start {
(self.last_block_idx, i - self.last_block_start)
} else {
let (block_idx, from, to) = self.lookup(i)?;
(block_idx, i - from)
};
Some((block_idx, pos))
}
pub fn block(&self, i: usize) -> Option<&Bytes> {
self.blocks.get(i)
}
pub fn is_empty(&self) -> bool {
self.blocks.is_empty()
}
}
+1 -2
View File
@@ -5,5 +5,4 @@ edition = "2024"
[dependencies] [dependencies]
obikseq = { path = "../obikseq" } obikseq = { path = "../obikseq" }
obiread = { path = "../obiread" } obikrope = { path = "../obikrope" }
bytes = "1"
+26 -73
View File
@@ -15,8 +15,7 @@
//! | minimizer changed | k | //! | minimizer changed | k |
//! | super-kmer length = 256| k | //! | super-kmer length = 256| k |
use bytes::Bytes; use obikrope::{ForwardCursor, Rope, RopeCursor};
use obiread::tape::RopeCursor;
use obikseq::superkmer::SuperKmer; use obikseq::superkmer::SuperKmer;
use crate::encoding::encode_base; use crate::encoding::encode_base;
@@ -26,9 +25,8 @@ use crate::scratch::SuperKmerScratch;
use crate::window::KmerWindow; use crate::window::KmerWindow;
/// Iterator over `(minimizer, SuperKmer)` pairs. /// Iterator over `(minimizer, SuperKmer)` pairs.
pub struct SuperKmerIter { pub struct SuperKmerIter<'a> {
rope: Vec<Bytes>, cursor: ForwardCursor<'a>,
cursor: RopeCursor,
k: usize, k: usize,
scratch: SuperKmerScratch, scratch: SuperKmerScratch,
window: KmerWindow, window: KmerWindow,
@@ -38,23 +36,16 @@ pub struct SuperKmerIter {
prev_minimizer_pos: u8, prev_minimizer_pos: u8,
} }
impl SuperKmerIter { impl<'a> SuperKmerIter<'a> {
/// Build an iterator from a normalised rope chunk. /// Build an iterator from a normalised rope chunk.
/// ///
/// - `k`: k-mer size (131) /// - `k`: k-mer size (131)
/// - `m`: minimizer size (1 < m < k) /// - `m`: minimizer size (1 < m < k)
/// - `level_max`: maximum sub-word size for entropy (typically 6) /// - `level_max`: maximum sub-word size for entropy (typically 6)
/// - `theta`: entropy threshold; k-mers with score ≤ theta are rejected /// - `theta`: entropy threshold; k-mers with score ≤ theta are rejected
pub fn new( pub fn new(rope: &'a Rope, k: usize, m: usize, level_max: usize, theta: f64) -> Self {
chunks: Vec<Bytes>,
k: usize,
m: usize,
level_max: usize,
theta: f64,
) -> Self {
Self { Self {
rope: chunks, cursor: rope.fw_cursor(),
cursor: RopeCursor::new(),
k, k,
scratch: SuperKmerScratch::new(), scratch: SuperKmerScratch::new(),
window: KmerWindow::new(k), window: KmerWindow::new(k),
@@ -65,13 +56,6 @@ impl SuperKmerIter {
} }
} }
#[inline]
fn read_byte(&mut self) -> Option<u8> {
let b = self.cursor.peek(&self.rope)?;
self.cursor.advance(&self.rope);
Some(b)
}
fn reset_state(&mut self) { fn reset_state(&mut self) {
self.window.reset(); self.window.reset();
self.minimizer.reset(); self.minimizer.reset();
@@ -80,8 +64,6 @@ impl SuperKmerIter {
self.prev_minimizer_pos = 0; self.prev_minimizer_pos = 0;
} }
/// Emit the current scratch as a super-kmer if it holds at least k nucleotides.
/// Returns `None` and silently discards short buffers (< k nt, no complete kmer).
fn try_emit(&mut self, min: u64) -> Option<(u64, SuperKmer)> { fn try_emit(&mut self, min: u64) -> Option<(u64, SuperKmer)> {
if self.scratch.len() < self.k { if self.scratch.len() < self.k {
self.scratch.reset(); self.scratch.reset();
@@ -93,19 +75,17 @@ impl SuperKmerIter {
} }
} }
impl Iterator for SuperKmerIter { impl<'a> Iterator for SuperKmerIter<'a> {
type Item = (u64, SuperKmer); type Item = (u64, SuperKmer);
fn next(&mut self) -> Option<(u64, SuperKmer)> { fn next(&mut self) -> Option<(u64, SuperKmer)> {
loop { loop {
let byte = match self.read_byte() { let byte = match self.cursor.read_next().ok() {
None => { None => {
// EOF: flush whatever we have.
let min = self.prev_minimizer.unwrap_or(0); let min = self.prev_minimizer.unwrap_or(0);
return self.try_emit(min); return self.try_emit(min);
} }
Some(0x00) => { Some(0x00) => {
// Segment boundary: flush current super-kmer, then reset.
let min = self.prev_minimizer.unwrap_or(0); let min = self.prev_minimizer.unwrap_or(0);
let result = self.try_emit(min); let result = self.try_emit(min);
self.reset_state(); self.reset_state();
@@ -118,25 +98,22 @@ impl Iterator for SuperKmerIter {
}; };
let base2 = encode_base(byte); let base2 = encode_base(byte);
// Update sliding windows; kmer_ready is true once we have k bases.
let kmer_ready = self.window.push(base2); let kmer_ready = self.window.push(base2);
let current_min = self.minimizer.push(base2); let current_min = self.minimizer.push(base2);
if !kmer_ready { if !kmer_ready {
// Warm-up phase: just accumulate.
self.scratch.push(byte); self.scratch.push(byte);
continue; continue;
} }
let kmer = self.window.kmer_u64(); let kmer = self.window.kmer_u64();
let min = current_min.unwrap(); // guaranteed when kmer_ready let min = current_min.unwrap();
// ── 1. Entropy check ───────────────────────────────────────────── // ── 1. Entropy check ─────────────────────────────────────────────
if !self.entropy.accept(kmer) { if !self.entropy.accept(kmer) {
let prev_min = self.prev_minimizer.unwrap_or(0); let prev_min = self.prev_minimizer.unwrap_or(0);
let result = self.try_emit(prev_min); let result = self.try_emit(prev_min);
self.cursor.retreat(self.k - 1, &self.rope); self.cursor.rewind(self.k - 1).ok();
self.reset_state(); self.reset_state();
if result.is_some() { if result.is_some() {
return result; return result;
@@ -148,7 +125,7 @@ impl Iterator for SuperKmerIter {
if let Some(prev) = self.prev_minimizer { if let Some(prev) = self.prev_minimizer {
if min != prev { if min != prev {
let result = self.try_emit(prev); let result = self.try_emit(prev);
self.cursor.retreat(self.k, &self.rope); self.cursor.rewind(self.k).ok();
self.reset_state(); self.reset_state();
if result.is_some() { if result.is_some() {
return result; return result;
@@ -158,10 +135,9 @@ impl Iterator for SuperKmerIter {
} }
// ── 3. Super-kmer length check ──────────────────────────────────── // ── 3. Super-kmer length check ────────────────────────────────────
// S[j] would be the 257th nucleotide → don't add it.
if self.scratch.len() == 256 { if self.scratch.len() == 256 {
let result = self.try_emit(min); let result = self.try_emit(min);
self.cursor.retreat(self.k, &self.rope); self.cursor.rewind(self.k).ok();
self.reset_state(); self.reset_state();
if result.is_some() { if result.is_some() {
return result; return result;
@@ -169,7 +145,6 @@ impl Iterator for SuperKmerIter {
continue; continue;
} }
// All checks passed: accept S[j].
self.prev_minimizer = Some(min); self.prev_minimizer = Some(min);
self.prev_minimizer_pos = self.minimizer.minimizer_pos(); self.prev_minimizer_pos = self.minimizer.minimizer_pos();
self.scratch.push(byte); self.scratch.push(byte);
@@ -182,29 +157,24 @@ impl Iterator for SuperKmerIter {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use bytes::BytesMut;
fn chunks(data: &[u8]) -> Vec<Bytes> { fn make_rope(data: &[u8]) -> Rope {
vec![BytesMut::from(data).freeze()] let mut r = Rope::new();
r.push(data.to_vec());
r
} }
/// theta=0: accept everything; level_max=1 (minimal entropy computation).
fn run_nofilter(data: &[u8], k: usize, m: usize) -> Vec<Vec<u8>> { fn run_nofilter(data: &[u8], k: usize, m: usize) -> Vec<Vec<u8>> {
SuperKmerIter::new(chunks(data), k, m, 1, 0.0) let rope = make_rope(data);
SuperKmerIter::new(&rope, k, m, 1, 0.0)
.map(|(_, sk)| sk.to_ascii()) .map(|(_, sk)| sk.to_ascii())
.collect() .collect()
} }
// ── basic segmentation (no entropy / minimizer cut) ───────────────────────
#[test] #[test]
fn single_segment_one_superkmer() { fn single_segment_one_superkmer() {
// k=4, m=2; one segment "ACGTACGT\x00"
// All consecutive 4-mers share the same minimizer (or not — but we
// just check that we get the full sequence back with theta=0).
let out = run_nofilter(b"ACGTACGT\x00", 4, 2); let out = run_nofilter(b"ACGTACGT\x00", 4, 2);
assert!(!out.is_empty()); assert!(!out.is_empty());
// The concatenation of all emitted superkmers covers the segment.
let total: Vec<u8> = out.into_iter().flatten().collect(); let total: Vec<u8> = out.into_iter().flatten().collect();
assert!(total.len() >= 4); assert!(total.len() >= 4);
} }
@@ -227,54 +197,37 @@ mod tests {
assert!(!out.is_empty()); assert!(!out.is_empty());
} }
// ── entropy cut ───────────────────────────────────────────────────────────
#[test] #[test]
fn low_complexity_kmer_is_rejected() { fn low_complexity_kmer_is_rejected() {
// "AAAAAAAAACGT\x00": the run of A's is low-complexity;
// with a strict threshold the A-only kmers get cut.
// With theta=0 everything passes → we get one superkmer.
let out_pass = run_nofilter(b"AAAAAAAAACGT\x00", 4, 2); let out_pass = run_nofilter(b"AAAAAAAAACGT\x00", 4, 2);
assert!(!out_pass.is_empty()); assert!(!out_pass.is_empty());
// With a high threshold polyA should be rejected. let rope = make_rope(b"AAAAAAAA\x00");
let out_reject: Vec<Vec<u8>> = let out_reject: Vec<Vec<u8>> = SuperKmerIter::new(&rope, 4, 2, 6, 0.9)
SuperKmerIter::new(chunks(b"AAAAAAAA\x00"), 4, 2, 6, 0.9)
.map(|(_, sk)| sk.to_ascii()) .map(|(_, sk)| sk.to_ascii())
.collect(); .collect();
// The sequence is pure polyA: all kmers should fail entropy → no emission.
assert!(out_reject.is_empty()); assert!(out_reject.is_empty());
} }
// ── multi-slice rope ──────────────────────────────────────────────────────
#[test] #[test]
fn multi_slice_rope() { fn multi_slice_rope() {
let data = b"ACGTACGTACGT\x00"; let data = b"ACGTACGTACGT\x00";
let mid = data.len() / 2; let mid = data.len() / 2;
let rope = vec![ let mut rope = Rope::new();
BytesMut::from(&data[..mid]).freeze(), rope.push(data[..mid].to_vec());
BytesMut::from(&data[mid..]).freeze(), rope.push(data[mid..].to_vec());
]; let out: Vec<Vec<u8>> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0)
let out: Vec<Vec<u8>> = SuperKmerIter::new(rope, 4, 2, 1, 0.0)
.map(|(_, sk)| sk.to_ascii()) .map(|(_, sk)| sk.to_ascii())
.collect(); .collect();
assert!(!out.is_empty()); assert!(!out.is_empty());
} }
// ── minimizer is returned ─────────────────────────────────────────────────
#[test] #[test]
fn yields_minimizer_value() { fn yields_minimizer_value() {
let data = b"ACGTACGT\x00"; let rope = make_rope(b"ACGTACGT\x00");
let results: Vec<(u64, Vec<u8>)> = let results: Vec<(u64, Vec<u8>)> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0)
SuperKmerIter::new(chunks(data), 4, 2, 1, 0.0)
.map(|(min, sk)| (min, sk.to_ascii())) .map(|(min, sk)| (min, sk.to_ascii()))
.collect(); .collect();
assert!(!results.is_empty()); assert!(!results.is_empty());
// The minimizer is a non-trivial u64 (just check it's been computed).
for (min, _) in &results {
let _ = min; // value is format-specific; just verify no panic
}
} }
} }