refactor: centralize k-mer config and introduce packed sequences

Centralize k-mer and minimizer configuration using a thread-safe global module, and replace manual bit-packing with a memory-efficient `PackedSeq` type. Refactor core sequence and k-mer types to use compile-time length enforcement and centralized hashing. Introduce a new De Bruijn graph implementation with compact node encoding and traversal iterators. Update I/O, partitioning, and builder modules to align with the new architecture, and add the `xxhash-rust` dependency.
This commit is contained in:
Eric Coissac
2026-05-05 18:08:19 +02:00
parent 602f414957
commit 8c17bf958b
37 changed files with 2641 additions and 2456 deletions
+1
View File
@@ -14,3 +14,4 @@ obikseq = { path = "../obikseq" }
[dev-dependencies]
tempfile = "3"
obikseq = { path = "../obikseq", features = ["test-utils"] }
+20 -53
View File
@@ -1,46 +1,19 @@
use obikseq::superkmer::SuperKmer;
use obikseq::SuperKmer;
use std::io::{self, Read, Write};
/// Serialise one SuperKmer into `w` (uncompressed; caller must wrap with a compressor).
///
/// Bits [7:0] of the header store `n_kmers = seql - k + 1` (kmer units, 1–255),
/// not the raw nucleotide length. This removes the 0=256 wrapping convention.
#[inline]
pub(crate) fn write_superkmer<W: Write>(w: &mut W, sk: &SuperKmer, k: usize) -> io::Result<()> {
let n_kmers = sk.len() - k + 1;
let new_bits = (sk.header_bits() & !0xFF) | (n_kmers as u32);
w.write_all(&new_bits.to_le_bytes())?;
w.write_all(sk.seq_bytes())
pub(crate) fn write_superkmer<W: Write>(w: &mut W, sk: &SuperKmer) -> io::Result<()> {
sk.write_to_binary(w)
}
/// Deserialise one SuperKmer from `r`. Returns `None` on clean EOF.
/// `seq_buf` is a reusable scratch buffer to avoid per-record allocation.
/// Bits [7:0] of the on-disk header contain `n_kmers`; nucleotide length is
/// reconstructed as `n_kmers + k - 1`.
pub(crate) fn read_superkmer<R: Read>(
r: &mut R,
seq_buf: &mut Vec<u8>,
k: usize,
) -> io::Result<Option<SuperKmer>> {
let mut hdr = [0u8; 4];
match r.read_exact(&mut hdr) {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(e),
pub(crate) fn read_superkmer<R: Read>(r: &mut R) -> io::Result<Option<SuperKmer>> {
match SuperKmer::read_from_binary(r) {
Ok(sk) => Ok(Some(sk)),
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
Err(e) => Err(e),
}
let bits = u32::from_le_bytes(hdr);
let n_kmers = (bits & 0xFF) as usize;
let nt_len = n_kmers + k - 1;
let byte_len = (nt_len + 3) / 4;
seq_buf.resize(byte_len, 0);
r.read_exact(seq_buf)?;
// Reconstruct the in-memory seql byte (0 encodes 256, 1-255 direct).
let seql_byte = if nt_len == 256 { 0u8 } else { nt_len as u8 };
let mem_bits = (bits & !0xFF) | (seql_byte as u32);
Ok(Some(SuperKmer::from_header_bits(
mem_bits,
seq_buf.as_slice().into(),
)))
}
#[cfg(test)]
@@ -54,32 +27,29 @@ mod tests {
#[test]
fn roundtrip_single() {
let k = 4;
let sk = make_sk(b"ACGTACGT");
let mut buf = Vec::new();
write_superkmer(&mut buf, &sk, k).unwrap();
write_superkmer(&mut buf, &sk).unwrap();
let mut cur = Cursor::new(&buf);
let mut seq_buf = Vec::new();
let got = read_superkmer(&mut cur, &mut seq_buf, k).unwrap().unwrap();
let got = read_superkmer(&mut cur).unwrap().unwrap();
assert_eq!(sk.to_ascii(), got.to_ascii());
assert_eq!(sk.len(), got.len());
assert_eq!(sk.seql(), got.seql());
}
#[test]
fn roundtrip_all_lengths() {
let bases: Vec<u8> = (0..256).map(|i| b"ACGT"[i % 4]).collect();
// k=11 is the project minimum; test seql from k to 256.
let bases: Vec<u8> = (0..300).map(|i| b"ACGT"[i % 4]).collect();
let k = 11;
for len in (k..=k + 8).chain([255, 256]) {
for len in (k..=k + 8).chain([255, 256, 257]) {
let sk = make_sk(&bases[..len]);
let mut buf = Vec::new();
write_superkmer(&mut buf, &sk, k).unwrap();
write_superkmer(&mut buf, &sk).unwrap();
let mut cur = Cursor::new(&buf);
let mut seq_buf = Vec::new();
let got = read_superkmer(&mut cur, &mut seq_buf, k).unwrap().unwrap();
let got = read_superkmer(&mut cur).unwrap().unwrap();
assert_eq!(sk.to_ascii(), got.to_ascii(), "len={len}");
assert_eq!(sk.seql(), got.seql(), "len={len}");
}
}
@@ -87,26 +57,23 @@ mod tests {
fn eof_returns_none() {
let buf: Vec<u8> = vec![];
let mut cur = Cursor::new(&buf);
let mut seq_buf = Vec::new();
assert!(read_superkmer(&mut cur, &mut seq_buf, 4).unwrap().is_none());
assert!(read_superkmer(&mut cur).unwrap().is_none());
}
#[test]
fn multiple_records() {
let k = 4;
let seqs: &[&[u8]] = &[b"AAAA", b"CCCC", b"GGGG", b"TTTT"];
let mut buf = Vec::new();
for s in seqs {
write_superkmer(&mut buf, &make_sk(s), k).unwrap();
write_superkmer(&mut buf, &make_sk(s)).unwrap();
}
let mut cur = Cursor::new(&buf);
let mut seq_buf = Vec::new();
for s in seqs {
let got = read_superkmer(&mut cur, &mut seq_buf, k).unwrap().unwrap();
let got = read_superkmer(&mut cur).unwrap().unwrap();
let expected = make_sk(s);
assert_eq!(expected.to_ascii(), got.to_ascii());
}
assert!(read_superkmer(&mut cur, &mut seq_buf, k).unwrap().is_none());
assert!(read_superkmer(&mut cur).unwrap().is_none());
}
}
+41 -38
View File
@@ -5,7 +5,7 @@ use crate::meta::SKFileMeta;
use lru::LruCache;
use niffler::Level;
use niffler::send::compression::Format;
use obikseq::superkmer::SuperKmer;
use obikseq::SuperKmer;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::num::NonZeroUsize;
@@ -222,7 +222,6 @@ pub struct SKFileWriter {
id: usize,
pool: Arc<Mutex<SKFilePool>>,
path: PathBuf,
k: usize,
pending: Vec<u8>,
flush_threshold: usize,
logically_closed: bool,
@@ -230,15 +229,14 @@ pub struct SKFileWriter {
}
/// Create a `SKFileWriter` for a new file (Zstd, level 3).
pub fn create_token(pool: &SharedPool, path: PathBuf, k: usize) -> SKResult<SKFileWriter> {
create_token_with(pool, path, k, Format::Zstd, Level::Three)
pub fn create_token(pool: &SharedPool, path: PathBuf) -> SKResult<SKFileWriter> {
create_token_with(pool, path, Format::Zstd, Level::Three)
}
/// Create a `SKFileWriter` for a new file with explicit format and level.
pub fn create_token_with(
pool: &SharedPool,
path: PathBuf,
k: usize,
format: Format,
level: Level,
) -> SKResult<SKFileWriter> {
@@ -247,7 +245,6 @@ pub fn create_token_with(
id,
pool: Arc::clone(pool),
path,
k,
pending: Vec::with_capacity(DEFAULT_FLUSH_THRESHOLD + 128),
flush_threshold: DEFAULT_FLUSH_THRESHOLD,
logically_closed: false,
@@ -258,18 +255,13 @@ pub fn create_token_with(
impl SKFileWriter {
/// Create a standalone file writer (Zstd, level 3).
/// The pool is created internally and is not accessible to the caller.
pub fn create<P: AsRef<Path>>(path: P, k: usize) -> SKResult<Self> {
Self::create_with(path, k, Format::Zstd, Level::Three)
pub fn create<P: AsRef<Path>>(path: P) -> SKResult<Self> {
Self::create_with(path, Format::Zstd, Level::Three)
}
/// Create a standalone file writer with explicit format and level.
pub fn create_with<P: AsRef<Path>>(
path: P,
k: usize,
format: Format,
level: Level,
) -> SKResult<Self> {
create_token_with(global_pool(), path.as_ref().to_owned(), k, format, level)
pub fn create_with<P: AsRef<Path>>(path: P, format: Format, level: Level) -> SKResult<Self> {
create_token_with(global_pool(), path.as_ref().to_owned(), format, level)
}
/// `true` if the underlying fd is currently open in the pool.
@@ -280,10 +272,10 @@ impl SKFileWriter {
/// Accumulate one SuperKmer. Drains to fd when `pending ≥ flush_threshold`.
pub fn write(&mut self, sk: &SuperKmer) -> SKResult<()> {
self.check_not_closed()?;
write_superkmer(&mut self.pending, sk, self.k)?;
write_superkmer(&mut self.pending, sk)?;
self.meta.instances += 1;
self.meta.count_sum += sk.count() as u64;
self.meta.length_sum += sk.len() as u64;
self.meta.length_sum += sk.seql() as u64;
if self.pending.len() >= self.flush_threshold {
self.drain()?;
}
@@ -294,10 +286,10 @@ impl SKFileWriter {
pub fn write_batch(&mut self, sks: &[SuperKmer]) -> SKResult<()> {
self.check_not_closed()?;
for sk in sks {
write_superkmer(&mut self.pending, sk, self.k)?;
write_superkmer(&mut self.pending, sk)?;
self.meta.instances += 1;
self.meta.count_sum += sk.count() as u64;
self.meta.length_sum += sk.len() as u64;
self.meta.length_sum += sk.seql() as u64;
if self.pending.len() >= self.flush_threshold {
self.drain()?;
}
@@ -439,7 +431,7 @@ impl Drop for SKFileWriter {
mod tests {
use super::*;
use crate::reader::SKFileReader;
use obikseq::superkmer::SuperKmer;
use obikseq::{SuperKmer, set_k};
use tempfile::{NamedTempFile, TempDir};
const TEST_K: usize = 4;
@@ -460,22 +452,24 @@ mod tests {
#[test]
fn creation_holds_no_fd() {
set_k(TEST_K);
let dir = TempDir::new().unwrap();
let p = pool(3);
for i in 0..10 {
create_token(&p, dir.path().join(format!("p{i}.zst")), TEST_K).unwrap();
create_token(&p, dir.path().join(format!("p{i}.zst"))).unwrap();
}
assert_eq!(p.lock().unwrap().open_count(), 0);
}
#[test]
fn pool_limits_open_fds() {
set_k(TEST_K);
let dir = TempDir::new().unwrap();
let p = pool(3);
let sk = make_sk(0);
let mut tokens: Vec<SKFileWriter> = (0..6)
.map(|i| create_token(&p, dir.path().join(format!("p{i}.zst")), TEST_K).unwrap())
.map(|i| create_token(&p, dir.path().join(format!("p{i}.zst"))).unwrap())
.collect();
for t in tokens.iter_mut() {
@@ -491,12 +485,13 @@ mod tests {
#[test]
fn evicted_token_stays_logically_open() {
set_k(TEST_K);
let dir = TempDir::new().unwrap();
let p = pool(1);
let sk = make_sk(0);
let mut t0 = create_token(&p, dir.path().join("a.zst"), TEST_K).unwrap();
let mut t1 = create_token(&p, dir.path().join("b.zst"), TEST_K).unwrap();
let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap();
let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap();
open_token(&mut t0, &sk); // t0 fd open, pool full
open_token(&mut t1, &sk); // evicts t0, t1 fd open
@@ -507,12 +502,13 @@ mod tests {
#[test]
fn evicted_data_readable_after_close_all() {
set_k(TEST_K);
let dir = TempDir::new().unwrap();
let p = pool(1);
let sk = make_sk(0);
let mut t0 = create_token(&p, dir.path().join("a.zst"), TEST_K).unwrap();
let mut t1 = create_token(&p, dir.path().join("b.zst"), TEST_K).unwrap();
let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap();
let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap();
t0.set_flush_threshold(1);
t0.write(&sk).unwrap(); // t0 fd open, pool full
@@ -528,7 +524,7 @@ mod tests {
p.lock().unwrap().close_all().unwrap();
for name in &["a.zst", "b.zst"] {
let mut r = SKFileReader::open(dir.path().join(name), TEST_K).unwrap();
let mut r = SKFileReader::open(dir.path().join(name)).unwrap();
let got = r.read_batch(10).unwrap();
assert_eq!(got.len(), 1, "{name}: expected 1 record");
}
@@ -536,13 +532,14 @@ mod tests {
#[test]
fn touch_moves_to_mru_so_lru_is_evicted() {
set_k(TEST_K);
let dir = TempDir::new().unwrap();
let p = pool(2);
let sk = make_sk(0);
let mut t0 = create_token(&p, dir.path().join("a.zst"), TEST_K).unwrap();
let mut t1 = create_token(&p, dir.path().join("b.zst"), TEST_K).unwrap();
let mut t2 = create_token(&p, dir.path().join("c.zst"), TEST_K).unwrap();
let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap();
let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap();
let mut t2 = create_token(&p, dir.path().join("c.zst")).unwrap();
open_token(&mut t0, &sk); // t0 open
open_token(&mut t1, &sk); // t1 open, t0 LRU
@@ -560,6 +557,7 @@ mod tests {
#[test]
fn close_all_produces_readable_files() {
set_k(TEST_K);
let dir = TempDir::new().unwrap();
let p = pool(8);
let paths: Vec<_> = (0..4)
@@ -568,7 +566,7 @@ mod tests {
let mut tokens: Vec<SKFileWriter> = paths
.iter()
.map(|path| create_token(&p, path.clone(), TEST_K).unwrap())
.map(|path| create_token(&p, path.clone()).unwrap())
.collect();
for (i, t) in tokens.iter_mut().enumerate() {
@@ -581,7 +579,7 @@ mod tests {
p.lock().unwrap().close_all().unwrap();
for path in &paths {
let mut r = SKFileReader::open(path, TEST_K).unwrap();
let mut r = SKFileReader::open(path).unwrap();
let got = r.read_batch(10).unwrap();
assert_eq!(got.len(), 1);
}
@@ -589,16 +587,17 @@ mod tests {
#[test]
fn write_batch_roundtrip() {
set_k(TEST_K);
let dir = TempDir::new().unwrap();
let p = pool(4);
let sks: Vec<_> = (0..50).map(make_sk).collect();
let path = dir.path().join("batch.zst");
let mut t = create_token(&p, path.clone(), TEST_K).unwrap();
let mut t = create_token(&p, path.clone()).unwrap();
t.write_batch(&sks).unwrap();
t.close().unwrap();
let mut r = SKFileReader::open(&path, TEST_K).unwrap();
let mut r = SKFileReader::open(&path).unwrap();
let got = r.read_batch(100).unwrap();
assert_eq!(got.len(), 50);
for (a, b) in sks.iter().zip(got.iter()) {
@@ -608,6 +607,7 @@ mod tests {
#[test]
fn from_system_limits_bounded() {
set_k(TEST_K);
let pool = SKFilePool::from_system_limits();
assert!(pool.max_open() >= 16);
assert!(pool.max_open() <= MAX_POOL_SIZE);
@@ -615,14 +615,15 @@ mod tests {
#[test]
fn standalone_roundtrip_zstd() {
set_k(TEST_K);
let tmp = NamedTempFile::new().unwrap();
let sks: Vec<_> = (0..100).map(make_sk).collect();
{
let mut w = SKFileWriter::create(tmp.path(), TEST_K).unwrap();
let mut w = SKFileWriter::create(tmp.path()).unwrap();
w.write_batch(&sks).unwrap();
w.close().unwrap();
}
let mut r = SKFileReader::open(tmp.path(), TEST_K).unwrap();
let mut r = SKFileReader::open(tmp.path()).unwrap();
let got = r.read_batch(200).unwrap();
assert_eq!(got.len(), 100);
for (a, b) in sks.iter().zip(got.iter()) {
@@ -632,8 +633,9 @@ mod tests {
#[test]
fn standalone_close_prevents_write() {
set_k(TEST_K);
let tmp = NamedTempFile::new().unwrap();
let mut w = SKFileWriter::create(tmp.path(), TEST_K).unwrap();
let mut w = SKFileWriter::create(tmp.path()).unwrap();
w.close().unwrap();
assert!(!w.is_open());
assert!(w.write(&make_sk(0)).is_err());
@@ -641,8 +643,9 @@ mod tests {
#[test]
fn standalone_is_physically_open() {
set_k(TEST_K);
let tmp = NamedTempFile::new().unwrap();
let mut w = SKFileWriter::create(tmp.path(), TEST_K).unwrap();
let mut w = SKFileWriter::create(tmp.path()).unwrap();
assert!(!w.is_physically_open()); // fd deferred until first drain
w.set_flush_threshold(1);
w.write(&make_sk(0)).unwrap(); // triggers drain → fd opened
+19 -15
View File
@@ -15,25 +15,20 @@ use std::path::{Path, PathBuf};
/// that it can fast-forward on next open.
pub struct SKFileReader {
path: PathBuf,
k: usize,
reader: Option<Box<dyn std::io::Read + Send>>,
/// Reusable scratch buffer for the `seq` bytes of each record.
seq_buf: Vec<u8>,
/// Number of SuperKmers successfully read so far (for eviction recovery).
consumed: u64,
}
impl SKFileReader {
/// Open a file for reading. Format is auto-detected from magic bytes.
/// `k` is the kmer size of the partition; required to decode the on-disk n_kmers field.
pub fn open<P: AsRef<Path>>(path: P, k: usize) -> SKResult<Self> {
pub fn open<P: AsRef<Path>>(path: P) -> SKResult<Self> {
let path = path.as_ref().to_owned();
let (reader, _fmt) = niffler::send::get_reader(Box::new(BufReader::new(File::open(&path)?)))?;
let (reader, _fmt) =
niffler::send::get_reader(Box::new(BufReader::new(File::open(&path)?)))?;
Ok(Self {
path,
k,
reader: Some(reader),
seq_buf: Vec::with_capacity(64),
consumed: 0,
})
}
@@ -46,7 +41,7 @@ impl SKFileReader {
"read from physically closed SKFileReader",
)
})?;
let result = read_superkmer(r, &mut self.seq_buf, self.k)?;
let result = read_superkmer(r)?;
if result.is_some() {
self.consumed += 1;
}
@@ -87,7 +82,10 @@ impl SKFileReader {
/// Return an iterator over this reader.
pub fn iter(&mut self) -> SKFileIter<'_> {
SKFileIter { reader: self, error: None }
SKFileIter {
reader: self,
error: None,
}
}
// ── pool-internal helpers ─────────────────────────────────────────────────
@@ -103,7 +101,7 @@ impl SKFileReader {
let target = self.consumed;
self.consumed = 0;
for _ in 0..target {
match read_superkmer(self.reader.as_mut().unwrap(), &mut self.seq_buf, self.k)? {
match read_superkmer(self.reader.as_mut().unwrap())? {
Some(_) => self.consumed += 1,
None => break,
}
@@ -152,6 +150,10 @@ mod tests {
const TEST_K: usize = 4; // test sequences are 8 bases; k=4 gives n_kmers=5
fn setup() {
obikseq::params::set_k(TEST_K);
}
fn make_sks(n: usize) -> Vec<SuperKmer> {
(0..n)
.map(|i| {
@@ -163,15 +165,16 @@ mod tests {
#[test]
fn iter_all() {
setup();
let tmp = NamedTempFile::new().unwrap();
let sks = make_sks(50);
{
let mut w = SKFileWriter::create(tmp.path(), TEST_K).unwrap();
let mut w = SKFileWriter::create(tmp.path()).unwrap();
w.write_batch(&sks).unwrap();
}
let mut r = SKFileReader::open(tmp.path(), TEST_K).unwrap();
let mut r = SKFileReader::open(tmp.path()).unwrap();
let got: Vec<_> = r.iter().collect();
assert_eq!(got.len(), 50);
for (a, b) in sks.iter().zip(got.iter()) {
@@ -181,15 +184,16 @@ mod tests {
#[test]
fn reopen_and_seek() {
setup();
let tmp = NamedTempFile::new().unwrap();
let sks = make_sks(20);
{
let mut w = SKFileWriter::create(tmp.path(), TEST_K).unwrap();
let mut w = SKFileWriter::create(tmp.path()).unwrap();
w.write_batch(&sks).unwrap();
}
let mut r = SKFileReader::open(tmp.path(), TEST_K).unwrap();
let mut r = SKFileReader::open(tmp.path()).unwrap();
// Read 10, then simulate pool eviction + re-access
let first = r.read_batch(10).unwrap();
r.close();