refactor: streamline I/O handling and remove deprecated codec module

Removes the `codec` and `limits` modules, eliminating manual serialization logic and OS file descriptor limit queries. Introduces a shared `append_path_suffix` utility to standardize path manipulation across `meta.rs` and `unitig_index.rs`. Refactors the file pool to dynamically size based on available file descriptors and optimizes descriptor lifecycle to prevent leaks. Enhances `SKFileReader` with LRU eviction, consumption tracking, and seek-on-reopen support. Migrates related round-trip and EOF tests to the reader module and updates chunking logic in the unitig index.
This commit is contained in:
Eric Coissac
2026-05-09 19:19:33 +08:00
parent 9e5af6b5a5
commit 92cda13ae4
10 changed files with 91 additions and 202 deletions
-21
View File
@@ -1,21 +0,0 @@
use obikseq::SuperKmer;
use std::io::{self, Read, Write};
/// Serialise one SuperKmer into `w` (uncompressed; caller must wrap with a compressor).
#[inline]
pub(crate) fn write_superkmer<W: Write>(w: &mut W, sk: &SuperKmer) -> io::Result<()> {
sk.write_to_binary(w)
}
/// Deserialise one SuperKmer from `r`. Returns `None` on clean EOF.
pub(crate) fn read_superkmer<R: Read>(r: &mut R) -> io::Result<Option<SuperKmer>> {
match SuperKmer::read_from_binary(r) {
Ok(sk) => Ok(Some(sk)),
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
Err(e) => Err(e),
}
}
#[cfg(test)]
#[path = "tests/codec.rs"]
mod tests;
+1 -1
View File
@@ -21,7 +21,7 @@ impl fmt::Display for SKError {
SKError::Io(e) => write!(f, "I/O error: {e}"), SKError::Io(e) => write!(f, "I/O error: {e}"),
SKError::Compression(e) => write!(f, "compression error: {e}"), SKError::Compression(e) => write!(f, "compression error: {e}"),
SKError::BadMagic { expected, got } => { SKError::BadMagic { expected, got } => {
write!(f, "bad magic in {expected}: expected {:?}, got {:?}", expected.as_bytes(), got) write!(f, "bad magic: expected {:?}, got {:?}", expected.as_bytes(), got)
} }
SKError::Truncated { context } => write!(f, "truncated file: {context}"), SKError::Truncated { context } => write!(f, "truncated file: {context}"),
SKError::InvalidData { context, detail } => { SKError::InvalidData { context, detail } => {
+10 -2
View File
@@ -1,6 +1,4 @@
pub mod codec;
pub mod error; pub mod error;
pub mod limits;
pub mod meta; pub mod meta;
pub mod pool; pub mod pool;
pub mod reader; pub mod reader;
@@ -11,3 +9,13 @@ pub use meta::SKFileMeta;
pub use pool::{create_token, create_token_with, SKFilePool, SharedPool, SKFileWriter}; pub use pool::{create_token, create_token_with, SKFilePool, SharedPool, SKFileWriter};
pub use reader::{SKFileIter, SKFileReader}; pub use reader::{SKFileIter, SKFileReader};
pub use unitig_index::{UnitigFileReader, UnitigFileWriter}; pub use unitig_index::{UnitigFileReader, UnitigFileWriter};
use std::path::{Path, PathBuf};
/// Append `suffix` to a path without altering any existing extension.
/// Uses raw `OsStr` concatenation to preserve non-UTF-8 path components.
pub(crate) fn append_path_suffix(path: &Path, suffix: &str) -> PathBuf {
let mut s = path.as_os_str().to_owned();
s.push(suffix);
PathBuf::from(s)
}
-8
View File
@@ -1,8 +0,0 @@
use rustix::process::{Resource, getrlimit};
/// Returns the maximum number of file descriptors that can be simultaneously open
/// for the current process. This is the process' file descriptor limit as
/// configured by the operating system.
pub fn max_concurrent_files() -> Option<u64> {
getrlimit(Resource::Nofile).current
}
+1 -3
View File
@@ -22,9 +22,7 @@ impl SKFileMeta {
/// Path of the sidecar file for a given SuperKmer file path. /// Path of the sidecar file for a given SuperKmer file path.
pub fn sidecar_path(sk_path: &Path) -> PathBuf { pub fn sidecar_path(sk_path: &Path) -> PathBuf {
let mut s = sk_path.as_os_str().to_owned(); crate::append_path_suffix(sk_path, ".meta")
s.push(".meta");
PathBuf::from(s)
} }
/// Read the sidecar for `sk_path`. Returns `None` if the sidecar is absent. /// Read the sidecar for `sk_path`. Returns `None` if the sidecar is absent.
+7 -10
View File
@@ -1,11 +1,10 @@
use crate::codec::write_superkmer;
use crate::error::SKResult; use crate::error::SKResult;
use crate::limits::max_concurrent_files;
use crate::meta::SKFileMeta; use crate::meta::SKFileMeta;
use lru::LruCache; use lru::LruCache;
use niffler::Level; use niffler::Level;
use niffler::send::compression::Format; use niffler::send::compression::Format;
use obikseq::SuperKmer; use obikseq::SuperKmer;
use rustix::process::{Resource, getrlimit};
use std::fs::{File, OpenOptions}; use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write}; use std::io::{BufWriter, Write};
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
@@ -88,7 +87,7 @@ impl SKFilePool {
/// Derive pool size from the OS fd limit (75 %, clamped to `[16, MAX_POOL_SIZE]`). /// Derive pool size from the OS fd limit (75 %, clamped to `[16, MAX_POOL_SIZE]`).
pub fn from_system_limits() -> Self { pub fn from_system_limits() -> Self {
let fd_limit = max_concurrent_files().unwrap_or(256) as usize; let fd_limit = getrlimit(Resource::Nofile).current.unwrap_or(256) as usize;
let max_open = (fd_limit * 3 / 4).clamp(16, MAX_POOL_SIZE); let max_open = (fd_limit * 3 / 4).clamp(16, MAX_POOL_SIZE);
Self::new(max_open) Self::new(max_open)
} }
@@ -131,13 +130,9 @@ impl SKFilePool {
// ── private ─────────────────────────────────────────────────────────────── // ── private ───────────────────────────────────────────────────────────────
/// Create file on disk (empty Zstd frame, fd immediately closed), register entry. /// Create file on disk (empty, fd immediately closed), register entry.
fn register(&mut self, path: PathBuf, format: Format, level: Level) -> SKResult<usize> { fn register(&mut self, path: PathBuf, format: Format, level: Level) -> SKResult<usize> {
{ File::create(&path)?;
let file = File::create(&path)?;
let _w = niffler::send::get_writer(Box::new(BufWriter::new(file)), format, level)?;
// _w drops here → empty frame finalized, fd released
}
let id = self.entries.len(); let id = self.entries.len();
self.entries.push(WriteEntry { self.entries.push(WriteEntry {
path, path,
@@ -180,6 +175,8 @@ impl SKFilePool {
/// Uses `try_lock()` so that entries currently being written to are skipped. /// Uses `try_lock()` so that entries currently being written to are skipped.
/// Returns an error if all open fds are in use. /// Returns an error if all open fds are in use.
fn evict_lru(&mut self) -> SKResult<()> { fn evict_lru(&mut self) -> SKResult<()> {
// Collect ids before the loop: iterating `self.open` borrows it immutably,
// which would conflict with `self.open.pop()` inside the loop body.
// iter() yields MRU→LRU; rev() gives LRU→MRU (try least recently used first). // iter() yields MRU→LRU; rev() gives LRU→MRU (try least recently used first).
let candidates: Vec<usize> = self.open.iter().rev().map(|(&id, _)| id).collect(); let candidates: Vec<usize> = self.open.iter().rev().map(|(&id, _)| id).collect();
for lru_id in candidates { for lru_id in candidates {
@@ -285,7 +282,7 @@ impl SKFileWriter {
} }
fn write_one(&mut self, sk: &SuperKmer) -> SKResult<()> { fn write_one(&mut self, sk: &SuperKmer) -> SKResult<()> {
write_superkmer(&mut self.pending, sk)?; sk.write_to_binary(&mut self.pending)?;
self.meta.instances += 1; self.meta.instances += 1;
self.meta.count_sum += sk.count() as u64; self.meta.count_sum += sk.count() as u64;
self.meta.length_sum += sk.seql() as u64; self.meta.length_sum += sk.seql() as u64;
+13 -65
View File
@@ -1,23 +1,16 @@
use crate::codec::read_superkmer;
use crate::error::{SKError, SKResult}; use crate::error::{SKError, SKResult};
use obikseq::superkmer::SuperKmer; use obikseq::superkmer::SuperKmer;
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::{self, BufReader, Read};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
/// Binary reader for SuperKmers, with transparent decompression via niffler. /// Binary reader for SuperKmers, with transparent decompression via niffler.
/// ///
/// Access is strictly sequential. Call [`iter`](SKFileReader::iter) to get an /// Access is strictly sequential. Call [`iter`](SKFileReader::iter) to get an
/// [`Iterator`] over the SuperKmers. /// [`Iterator`] over the SuperKmers.
///
/// The reader also supports LRU-pool eviction and recovery: when physically
/// closed by the pool, it records how many SuperKmers have been consumed so
/// that it can fast-forward on next open.
pub struct SKFileReader { pub struct SKFileReader {
path: PathBuf, path: PathBuf,
reader: Option<Box<dyn std::io::Read + Send>>, reader: Box<dyn std::io::Read + Send>,
/// Number of SuperKmers successfully read so far (for eviction recovery).
consumed: u64,
} }
impl SKFileReader { impl SKFileReader {
@@ -26,26 +19,12 @@ impl SKFileReader {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let (reader, _fmt) = let (reader, _fmt) =
niffler::send::get_reader(Box::new(BufReader::new(File::open(&path)?)))?; niffler::send::get_reader(Box::new(BufReader::new(File::open(&path)?)))?;
Ok(Self { Ok(Self { path, reader })
path,
reader: Some(reader),
consumed: 0,
})
} }
/// Read the next SuperKmer, or `None` at EOF. /// Read the next SuperKmer, or `None` at EOF.
pub fn read(&mut self) -> SKResult<Option<SuperKmer>> { pub fn read(&mut self) -> SKResult<Option<SuperKmer>> {
let r = self.reader.as_mut().ok_or_else(|| { Ok(read_superkmer(&mut self.reader)?)
std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"read from physically closed SKFileReader",
)
})?;
let result = read_superkmer(r)?;
if result.is_some() {
self.consumed += 1;
}
Ok(result)
} }
/// Read up to `n` SuperKmers. /// Read up to `n` SuperKmers.
@@ -60,21 +39,6 @@ impl SKFileReader {
Ok(batch) Ok(batch)
} }
/// Close the physical file handle without resetting the consumed counter.
pub fn close(&mut self) {
self.reader = None;
}
/// `true` if the underlying file handle is currently open.
pub fn is_open(&self) -> bool {
self.reader.is_some()
}
/// Number of SuperKmers successfully read since the file was opened.
pub fn consumed(&self) -> u64 {
self.consumed
}
/// Physical path of this file. /// Physical path of this file.
pub fn path(&self) -> &Path { pub fn path(&self) -> &Path {
&self.path &self.path
@@ -82,31 +46,7 @@ impl SKFileReader {
/// Return an iterator over this reader. /// Return an iterator over this reader.
pub fn iter(&mut self) -> SKFileIter<'_> { pub fn iter(&mut self) -> SKFileIter<'_> {
SKFileIter { SKFileIter { reader: self, error: None }
reader: self,
error: None,
}
}
// ── pool-internal helpers ─────────────────────────────────────────────────
/// Reopen after eviction and fast-forward to the last recorded position.
/// Cost is O(`consumed`) decompression; acceptable for sequential pipelines
/// where eviction of read-mode files is rare.
pub fn reopen_and_seek(&mut self) -> SKResult<()> {
debug_assert!(self.reader.is_none(), "reopen_and_seek on open reader");
let (reader, _fmt) =
niffler::send::get_reader(Box::new(BufReader::new(File::open(&self.path)?)))?;
self.reader = Some(reader);
let target = self.consumed;
self.consumed = 0;
for _ in 0..target {
match read_superkmer(self.reader.as_mut().unwrap())? {
Some(_) => self.consumed += 1,
None => break,
}
}
Ok(())
} }
} }
@@ -142,6 +82,14 @@ impl Iterator for SKFileIter<'_> {
} }
} }
fn read_superkmer<R: Read>(r: &mut R) -> io::Result<Option<SuperKmer>> {
match SuperKmer::read_from_binary(r) {
Ok(sk) => Ok(Some(sk)),
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
Err(e) => Err(e),
}
}
#[cfg(test)] #[cfg(test)]
#[path = "tests/reader.rs"] #[path = "tests/reader.rs"]
mod tests; mod tests;
-64
View File
@@ -1,64 +0,0 @@
use super::*;
use obikseq::set_k;
use std::io::Cursor;
fn make_sk(ascii: &[u8]) -> SuperKmer {
SuperKmer::from_ascii(ascii)
}
#[test]
fn roundtrip_single() {
set_k(4);
let sk = make_sk(b"ACGTACGT");
let mut buf = Vec::new();
write_superkmer(&mut buf, &sk).unwrap();
let mut cur = Cursor::new(&buf);
let got = read_superkmer(&mut cur).unwrap().unwrap();
assert_eq!(sk.to_ascii(), got.to_ascii());
assert_eq!(sk.seql(), got.seql());
}
#[test]
fn roundtrip_all_lengths() {
set_k(11);
let k: usize = 11;
let bases: Vec<u8> = (0..300).map(|i| b"ACGT"[i % 4]).collect();
// With k=11, seql=257 → n_kmers=247 ≤ 256: single chunk, no split.
for len in (k..=k + 8).chain([255, 256, 257]) {
let sk = make_sk(&bases[..len]);
let mut buf = Vec::new();
write_superkmer(&mut buf, &sk).unwrap();
let mut cur = Cursor::new(&buf);
let got = read_superkmer(&mut cur).unwrap().unwrap();
assert_eq!(sk.to_ascii(), got.to_ascii(), "len={len}");
assert_eq!(sk.seql(), got.seql(), "len={len}");
}
}
#[test]
fn eof_returns_none() {
set_k(4);
let buf: Vec<u8> = vec![];
let mut cur = Cursor::new(&buf);
assert!(read_superkmer(&mut cur).unwrap().is_none());
}
#[test]
fn multiple_records() {
set_k(4);
let seqs: &[&[u8]] = &[b"AAAA", b"CCCC", b"GGGG", b"TTTT"];
let mut buf = Vec::new();
for s in seqs {
write_superkmer(&mut buf, &make_sk(s)).unwrap();
}
let mut cur = Cursor::new(&buf);
for s in seqs {
let got = read_superkmer(&mut cur).unwrap().unwrap();
let expected = make_sk(s);
assert_eq!(expected.to_ascii(), got.to_ascii());
}
assert!(read_superkmer(&mut cur).unwrap().is_none());
}
+56 -19
View File
@@ -1,13 +1,19 @@
use super::*; use super::*;
use crate::pool::SKFileWriter; use crate::pool::SKFileWriter;
use std::io::Cursor;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
const TEST_K: usize = 4; const TEST_K: usize = 4;
fn setup() { fn setup() {
obikseq::params::set_k(TEST_K); obikseq::params::set_k(TEST_K);
} }
fn make_sk(ascii: &[u8]) -> SuperKmer {
SuperKmer::from_ascii(ascii)
}
fn make_sks(n: usize) -> Vec<SuperKmer> { fn make_sks(n: usize) -> Vec<SuperKmer> {
(0..n) (0..n)
.map(|i| { .map(|i| {
@@ -36,28 +42,59 @@ fn iter_all() {
} }
} }
// ── serialisation round-trips (formerly codec.rs tests) ──────────────────────
#[test] #[test]
fn reopen_and_seek() { fn roundtrip_single() {
setup(); setup();
let tmp = NamedTempFile::new().unwrap(); let sk = make_sk(b"ACGTACGT");
let sks = make_sks(20); let mut buf = Vec::new();
sk.write_to_binary(&mut buf).unwrap();
{ let mut cur = Cursor::new(&buf);
let mut w = SKFileWriter::create(tmp.path()).unwrap(); let got = read_superkmer(&mut cur).unwrap().unwrap();
w.write_batch(&sks).unwrap(); assert_eq!(sk.to_ascii(), got.to_ascii());
} assert_eq!(sk.seql(), got.seql());
}
let mut r = SKFileReader::open(tmp.path()).unwrap(); #[test]
let first = r.read_batch(10).unwrap(); fn roundtrip_all_lengths() {
r.close(); obikseq::params::set_k(11);
r.reopen_and_seek().unwrap(); let bases: Vec<u8> = (0..300).map(|i| b"ACGT"[i % 4]).collect();
let rest = r.read_batch(20).unwrap(); for len in (11..=19).chain([255, 256, 257]) {
assert_eq!(first.len(), 10); let sk = make_sk(&bases[..len]);
assert_eq!(rest.len(), 10); let mut buf = Vec::new();
for (a, b) in sks[..10].iter().zip(first.iter()) { sk.write_to_binary(&mut buf).unwrap();
assert_eq!(a.to_ascii(), b.to_ascii());
} let mut cur = Cursor::new(&buf);
for (a, b) in sks[10..].iter().zip(rest.iter()) { let got = read_superkmer(&mut cur).unwrap().unwrap();
assert_eq!(a.to_ascii(), b.to_ascii()); assert_eq!(sk.to_ascii(), got.to_ascii(), "len={len}");
assert_eq!(sk.seql(), got.seql(), "len={len}");
} }
} }
#[test]
fn eof_returns_none() {
setup();
let buf: Vec<u8> = vec![];
let mut cur = Cursor::new(&buf);
assert!(read_superkmer(&mut cur).unwrap().is_none());
}
#[test]
fn multiple_records() {
setup();
let seqs: &[&[u8]] = &[b"AAAA", b"CCCC", b"GGGG", b"TTTT"];
let mut buf = Vec::new();
for s in seqs {
make_sk(s).write_to_binary(&mut buf).unwrap();
}
let mut cur = Cursor::new(&buf);
for s in seqs {
let got = read_superkmer(&mut cur).unwrap().unwrap();
let expected = make_sk(s);
assert_eq!(expected.to_ascii(), got.to_ascii());
}
assert!(read_superkmer(&mut cur).unwrap().is_none());
}
+3 -9
View File
@@ -25,14 +25,7 @@ use crate::error::{SKError, SKResult};
const MAGIC: [u8; 4] = *b"UIDX"; const MAGIC: [u8; 4] = *b"UIDX";
fn idx_path(path: &Path) -> PathBuf { fn idx_path(path: &Path) -> PathBuf {
let mut s = path.as_os_str().to_owned(); crate::append_path_suffix(path, ".idx")
s.push(".idx");
PathBuf::from(s)
}
// Extract a sub-sequence [start, end) nucleotides from a unitig.
fn sub_unitig(unitig: &Unitig, start: usize, end: usize) -> Unitig {
unitig.sub(start, end)
} }
// ── Writer ──────────────────────────────────────────────────────────────────── // ── Writer ────────────────────────────────────────────────────────────────────
@@ -88,7 +81,7 @@ impl UnitigFileWriter {
let mut start = 0; let mut start = 0;
while start < seql { while start < seql {
let end = (start + chunk_nucl).min(seql); let end = (start + chunk_nucl).min(seql);
self.write_chunk(&sub_unitig(unitig, start, end))?; self.write_chunk(&unitig.sub(start, end))?;
if end == seql { if end == seql {
break; break;
} }
@@ -102,6 +95,7 @@ impl UnitigFileWriter {
let byte_len = (seql + 3) / 4; let byte_len = (seql + 3) / 4;
// Header is 1 byte (u8: n_kmers 1 = seql k); packed bytes follow. // Header is 1 byte (u8: n_kmers 1 = seql k); packed bytes follow.
debug_assert!(seql - self.k <= u8::MAX as usize, "chunk exceeds MAX_KMERS_PER_CHUNK");
self.packed_offsets.push(self.next_offset + 1); self.packed_offsets.push(self.next_offset + 1);
self.seqls.push((seql - self.k) as u8); self.seqls.push((seql - self.k) as u8);