diff --git a/src/obiskio/src/codec.rs b/src/obiskio/src/codec.rs deleted file mode 100644 index 1a0d065..0000000 --- a/src/obiskio/src/codec.rs +++ /dev/null @@ -1,21 +0,0 @@ -use obikseq::SuperKmer; -use std::io::{self, Read, Write}; - -/// Serialise one SuperKmer into `w` (uncompressed; caller must wrap with a compressor). -#[inline] -pub(crate) fn write_superkmer(w: &mut W, sk: &SuperKmer) -> io::Result<()> { - sk.write_to_binary(w) -} - -/// Deserialise one SuperKmer from `r`. Returns `None` on clean EOF. -pub(crate) fn read_superkmer(r: &mut R) -> io::Result> { - match SuperKmer::read_from_binary(r) { - Ok(sk) => Ok(Some(sk)), - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(None), - Err(e) => Err(e), - } -} - -#[cfg(test)] -#[path = "tests/codec.rs"] -mod tests; diff --git a/src/obiskio/src/error.rs b/src/obiskio/src/error.rs index 135cba2..a6fbf39 100644 --- a/src/obiskio/src/error.rs +++ b/src/obiskio/src/error.rs @@ -21,7 +21,7 @@ impl fmt::Display for SKError { SKError::Io(e) => write!(f, "I/O error: {e}"), SKError::Compression(e) => write!(f, "compression error: {e}"), SKError::BadMagic { expected, got } => { - write!(f, "bad magic in {expected}: expected {:?}, got {:?}", expected.as_bytes(), got) + write!(f, "bad magic: expected {:?}, got {:?}", expected.as_bytes(), got) } SKError::Truncated { context } => write!(f, "truncated file: {context}"), SKError::InvalidData { context, detail } => { diff --git a/src/obiskio/src/lib.rs b/src/obiskio/src/lib.rs index b6f5844..1c01539 100644 --- a/src/obiskio/src/lib.rs +++ b/src/obiskio/src/lib.rs @@ -1,6 +1,4 @@ -pub mod codec; pub mod error; -pub mod limits; pub mod meta; pub mod pool; pub mod reader; @@ -11,3 +9,13 @@ pub use meta::SKFileMeta; pub use pool::{create_token, create_token_with, SKFilePool, SharedPool, SKFileWriter}; pub use reader::{SKFileIter, SKFileReader}; pub use unitig_index::{UnitigFileReader, UnitigFileWriter}; + +use std::path::{Path, PathBuf}; + +/// Append `suffix` to a path without altering any existing extension. +/// Uses raw `OsStr` concatenation to preserve non-UTF-8 path components. +pub(crate) fn append_path_suffix(path: &Path, suffix: &str) -> PathBuf { + let mut s = path.as_os_str().to_owned(); + s.push(suffix); + PathBuf::from(s) +} diff --git a/src/obiskio/src/limits.rs b/src/obiskio/src/limits.rs deleted file mode 100644 index b5bf9dd..0000000 --- a/src/obiskio/src/limits.rs +++ /dev/null @@ -1,8 +0,0 @@ -use rustix::process::{Resource, getrlimit}; - -/// Returns the maximum number of file descriptors that can be simultaneously open -/// for the current process. This is the process' file descriptor limit as -/// configured by the operating system. -pub fn max_concurrent_files() -> Option { - getrlimit(Resource::Nofile).current -} diff --git a/src/obiskio/src/meta.rs b/src/obiskio/src/meta.rs index 95ca422..87f2aee 100644 --- a/src/obiskio/src/meta.rs +++ b/src/obiskio/src/meta.rs @@ -22,9 +22,7 @@ impl SKFileMeta { /// Path of the sidecar file for a given SuperKmer file path. pub fn sidecar_path(sk_path: &Path) -> PathBuf { - let mut s = sk_path.as_os_str().to_owned(); - s.push(".meta"); - PathBuf::from(s) + crate::append_path_suffix(sk_path, ".meta") } /// Read the sidecar for `sk_path`. Returns `None` if the sidecar is absent. diff --git a/src/obiskio/src/pool.rs b/src/obiskio/src/pool.rs index 85ce42c..8360d60 100644 --- a/src/obiskio/src/pool.rs +++ b/src/obiskio/src/pool.rs @@ -1,11 +1,10 @@ -use crate::codec::write_superkmer; use crate::error::SKResult; -use crate::limits::max_concurrent_files; use crate::meta::SKFileMeta; use lru::LruCache; use niffler::Level; use niffler::send::compression::Format; use obikseq::SuperKmer; +use rustix::process::{Resource, getrlimit}; use std::fs::{File, OpenOptions}; use std::io::{BufWriter, Write}; use std::num::NonZeroUsize; @@ -88,7 +87,7 @@ impl SKFilePool { /// Derive pool size from the OS fd limit (75 %, clamped to `[16, MAX_POOL_SIZE]`). pub fn from_system_limits() -> Self { - let fd_limit = max_concurrent_files().unwrap_or(256) as usize; + let fd_limit = getrlimit(Resource::Nofile).current.unwrap_or(256) as usize; let max_open = (fd_limit * 3 / 4).clamp(16, MAX_POOL_SIZE); Self::new(max_open) } @@ -131,13 +130,9 @@ impl SKFilePool { // ── private ─────────────────────────────────────────────────────────────── - /// Create file on disk (empty Zstd frame, fd immediately closed), register entry. + /// Create file on disk (empty, fd immediately closed), register entry. fn register(&mut self, path: PathBuf, format: Format, level: Level) -> SKResult { - { - let file = File::create(&path)?; - let _w = niffler::send::get_writer(Box::new(BufWriter::new(file)), format, level)?; - // _w drops here → empty frame finalized, fd released - } + File::create(&path)?; let id = self.entries.len(); self.entries.push(WriteEntry { path, @@ -180,6 +175,8 @@ impl SKFilePool { /// Uses `try_lock()` so that entries currently being written to are skipped. /// Returns an error if all open fds are in use. fn evict_lru(&mut self) -> SKResult<()> { + // Collect ids before the loop: iterating `self.open` borrows it immutably, + // which would conflict with `self.open.pop()` inside the loop body. // iter() yields MRU→LRU; rev() gives LRU→MRU (try least recently used first). let candidates: Vec = self.open.iter().rev().map(|(&id, _)| id).collect(); for lru_id in candidates { @@ -285,7 +282,7 @@ impl SKFileWriter { } fn write_one(&mut self, sk: &SuperKmer) -> SKResult<()> { - write_superkmer(&mut self.pending, sk)?; + sk.write_to_binary(&mut self.pending)?; self.meta.instances += 1; self.meta.count_sum += sk.count() as u64; self.meta.length_sum += sk.seql() as u64; diff --git a/src/obiskio/src/reader.rs b/src/obiskio/src/reader.rs index 139c020..3b9ed33 100644 --- a/src/obiskio/src/reader.rs +++ b/src/obiskio/src/reader.rs @@ -1,23 +1,16 @@ -use crate::codec::read_superkmer; use crate::error::{SKError, SKResult}; use obikseq::superkmer::SuperKmer; use std::fs::File; -use std::io::BufReader; +use std::io::{self, BufReader, Read}; use std::path::{Path, PathBuf}; /// Binary reader for SuperKmers, with transparent decompression via niffler. /// /// Access is strictly sequential. Call [`iter`](SKFileReader::iter) to get an /// [`Iterator`] over the SuperKmers. -/// -/// The reader also supports LRU-pool eviction and recovery: when physically -/// closed by the pool, it records how many SuperKmers have been consumed so -/// that it can fast-forward on next open. pub struct SKFileReader { path: PathBuf, - reader: Option>, - /// Number of SuperKmers successfully read so far (for eviction recovery). - consumed: u64, + reader: Box, } impl SKFileReader { @@ -26,26 +19,12 @@ impl SKFileReader { let path = path.as_ref().to_owned(); let (reader, _fmt) = niffler::send::get_reader(Box::new(BufReader::new(File::open(&path)?)))?; - Ok(Self { - path, - reader: Some(reader), - consumed: 0, - }) + Ok(Self { path, reader }) } /// Read the next SuperKmer, or `None` at EOF. pub fn read(&mut self) -> SKResult> { - let r = self.reader.as_mut().ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::BrokenPipe, - "read from physically closed SKFileReader", - ) - })?; - let result = read_superkmer(r)?; - if result.is_some() { - self.consumed += 1; - } - Ok(result) + Ok(read_superkmer(&mut self.reader)?) } /// Read up to `n` SuperKmers. @@ -60,21 +39,6 @@ impl SKFileReader { Ok(batch) } - /// Close the physical file handle without resetting the consumed counter. - pub fn close(&mut self) { - self.reader = None; - } - - /// `true` if the underlying file handle is currently open. - pub fn is_open(&self) -> bool { - self.reader.is_some() - } - - /// Number of SuperKmers successfully read since the file was opened. - pub fn consumed(&self) -> u64 { - self.consumed - } - /// Physical path of this file. pub fn path(&self) -> &Path { &self.path @@ -82,31 +46,7 @@ impl SKFileReader { /// Return an iterator over this reader. pub fn iter(&mut self) -> SKFileIter<'_> { - SKFileIter { - reader: self, - error: None, - } - } - - // ── pool-internal helpers ───────────────────────────────────────────────── - - /// Reopen after eviction and fast-forward to the last recorded position. - /// Cost is O(`consumed`) decompression; acceptable for sequential pipelines - /// where eviction of read-mode files is rare. - pub fn reopen_and_seek(&mut self) -> SKResult<()> { - debug_assert!(self.reader.is_none(), "reopen_and_seek on open reader"); - let (reader, _fmt) = - niffler::send::get_reader(Box::new(BufReader::new(File::open(&self.path)?)))?; - self.reader = Some(reader); - let target = self.consumed; - self.consumed = 0; - for _ in 0..target { - match read_superkmer(self.reader.as_mut().unwrap())? { - Some(_) => self.consumed += 1, - None => break, - } - } - Ok(()) + SKFileIter { reader: self, error: None } } } @@ -142,6 +82,14 @@ impl Iterator for SKFileIter<'_> { } } +fn read_superkmer(r: &mut R) -> io::Result> { + match SuperKmer::read_from_binary(r) { + Ok(sk) => Ok(Some(sk)), + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(None), + Err(e) => Err(e), + } +} + #[cfg(test)] #[path = "tests/reader.rs"] mod tests; diff --git a/src/obiskio/src/tests/codec.rs b/src/obiskio/src/tests/codec.rs deleted file mode 100644 index 788c4b1..0000000 --- a/src/obiskio/src/tests/codec.rs +++ /dev/null @@ -1,64 +0,0 @@ -use super::*; -use obikseq::set_k; -use std::io::Cursor; - -fn make_sk(ascii: &[u8]) -> SuperKmer { - SuperKmer::from_ascii(ascii) -} - -#[test] -fn roundtrip_single() { - set_k(4); - let sk = make_sk(b"ACGTACGT"); - let mut buf = Vec::new(); - write_superkmer(&mut buf, &sk).unwrap(); - - let mut cur = Cursor::new(&buf); - let got = read_superkmer(&mut cur).unwrap().unwrap(); - assert_eq!(sk.to_ascii(), got.to_ascii()); - assert_eq!(sk.seql(), got.seql()); -} - -#[test] -fn roundtrip_all_lengths() { - set_k(11); - let k: usize = 11; - let bases: Vec = (0..300).map(|i| b"ACGT"[i % 4]).collect(); - // With k=11, seql=257 → n_kmers=247 ≤ 256: single chunk, no split. - for len in (k..=k + 8).chain([255, 256, 257]) { - let sk = make_sk(&bases[..len]); - let mut buf = Vec::new(); - write_superkmer(&mut buf, &sk).unwrap(); - - let mut cur = Cursor::new(&buf); - let got = read_superkmer(&mut cur).unwrap().unwrap(); - assert_eq!(sk.to_ascii(), got.to_ascii(), "len={len}"); - assert_eq!(sk.seql(), got.seql(), "len={len}"); - } -} - -#[test] -fn eof_returns_none() { - set_k(4); - let buf: Vec = vec![]; - let mut cur = Cursor::new(&buf); - assert!(read_superkmer(&mut cur).unwrap().is_none()); -} - -#[test] -fn multiple_records() { - set_k(4); - let seqs: &[&[u8]] = &[b"AAAA", b"CCCC", b"GGGG", b"TTTT"]; - let mut buf = Vec::new(); - for s in seqs { - write_superkmer(&mut buf, &make_sk(s)).unwrap(); - } - - let mut cur = Cursor::new(&buf); - for s in seqs { - let got = read_superkmer(&mut cur).unwrap().unwrap(); - let expected = make_sk(s); - assert_eq!(expected.to_ascii(), got.to_ascii()); - } - assert!(read_superkmer(&mut cur).unwrap().is_none()); -} diff --git a/src/obiskio/src/tests/reader.rs b/src/obiskio/src/tests/reader.rs index c8a8cdd..57378c5 100644 --- a/src/obiskio/src/tests/reader.rs +++ b/src/obiskio/src/tests/reader.rs @@ -1,13 +1,19 @@ use super::*; use crate::pool::SKFileWriter; +use std::io::Cursor; use tempfile::NamedTempFile; + const TEST_K: usize = 4; fn setup() { obikseq::params::set_k(TEST_K); } +fn make_sk(ascii: &[u8]) -> SuperKmer { + SuperKmer::from_ascii(ascii) +} + fn make_sks(n: usize) -> Vec { (0..n) .map(|i| { @@ -36,28 +42,59 @@ fn iter_all() { } } +// ── serialisation round-trips (formerly codec.rs tests) ────────────────────── + #[test] -fn reopen_and_seek() { +fn roundtrip_single() { setup(); - let tmp = NamedTempFile::new().unwrap(); - let sks = make_sks(20); + let sk = make_sk(b"ACGTACGT"); + let mut buf = Vec::new(); + sk.write_to_binary(&mut buf).unwrap(); - { - let mut w = SKFileWriter::create(tmp.path()).unwrap(); - w.write_batch(&sks).unwrap(); - } + let mut cur = Cursor::new(&buf); + let got = read_superkmer(&mut cur).unwrap().unwrap(); + assert_eq!(sk.to_ascii(), got.to_ascii()); + assert_eq!(sk.seql(), got.seql()); +} - let mut r = SKFileReader::open(tmp.path()).unwrap(); - let first = r.read_batch(10).unwrap(); - r.close(); - r.reopen_and_seek().unwrap(); - let rest = r.read_batch(20).unwrap(); - assert_eq!(first.len(), 10); - assert_eq!(rest.len(), 10); - for (a, b) in sks[..10].iter().zip(first.iter()) { - assert_eq!(a.to_ascii(), b.to_ascii()); - } - for (a, b) in sks[10..].iter().zip(rest.iter()) { - assert_eq!(a.to_ascii(), b.to_ascii()); +#[test] +fn roundtrip_all_lengths() { + obikseq::params::set_k(11); + let bases: Vec = (0..300).map(|i| b"ACGT"[i % 4]).collect(); + for len in (11..=19).chain([255, 256, 257]) { + let sk = make_sk(&bases[..len]); + let mut buf = Vec::new(); + sk.write_to_binary(&mut buf).unwrap(); + + let mut cur = Cursor::new(&buf); + let got = read_superkmer(&mut cur).unwrap().unwrap(); + assert_eq!(sk.to_ascii(), got.to_ascii(), "len={len}"); + assert_eq!(sk.seql(), got.seql(), "len={len}"); } } + +#[test] +fn eof_returns_none() { + setup(); + let buf: Vec = vec![]; + let mut cur = Cursor::new(&buf); + assert!(read_superkmer(&mut cur).unwrap().is_none()); +} + +#[test] +fn multiple_records() { + setup(); + let seqs: &[&[u8]] = &[b"AAAA", b"CCCC", b"GGGG", b"TTTT"]; + let mut buf = Vec::new(); + for s in seqs { + make_sk(s).write_to_binary(&mut buf).unwrap(); + } + + let mut cur = Cursor::new(&buf); + for s in seqs { + let got = read_superkmer(&mut cur).unwrap().unwrap(); + let expected = make_sk(s); + assert_eq!(expected.to_ascii(), got.to_ascii()); + } + assert!(read_superkmer(&mut cur).unwrap().is_none()); +} diff --git a/src/obiskio/src/unitig_index.rs b/src/obiskio/src/unitig_index.rs index 7bb7b5c..5f47c5e 100644 --- a/src/obiskio/src/unitig_index.rs +++ b/src/obiskio/src/unitig_index.rs @@ -25,14 +25,7 @@ use crate::error::{SKError, SKResult}; const MAGIC: [u8; 4] = *b"UIDX"; fn idx_path(path: &Path) -> PathBuf { - let mut s = path.as_os_str().to_owned(); - s.push(".idx"); - PathBuf::from(s) -} - -// Extract a sub-sequence [start, end) nucleotides from a unitig. -fn sub_unitig(unitig: &Unitig, start: usize, end: usize) -> Unitig { - unitig.sub(start, end) + crate::append_path_suffix(path, ".idx") } // ── Writer ──────────────────────────────────────────────────────────────────── @@ -88,7 +81,7 @@ impl UnitigFileWriter { let mut start = 0; while start < seql { let end = (start + chunk_nucl).min(seql); - self.write_chunk(&sub_unitig(unitig, start, end))?; + self.write_chunk(&unitig.sub(start, end))?; if end == seql { break; } @@ -102,6 +95,7 @@ impl UnitigFileWriter { let byte_len = (seql + 3) / 4; // Header is 1 byte (u8: n_kmers − 1 = seql − k); packed bytes follow. + debug_assert!(seql - self.k <= u8::MAX as usize, "chunk exceeds MAX_KMERS_PER_CHUNK"); self.packed_offsets.push(self.next_offset + 1); self.seqls.push((seql - self.k) as u8);