From c09d17401df8d8a17790eed4ed75b30129241cb0 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 24 Apr 2026 21:07:58 +0200 Subject: [PATCH] + obiskio: add binary I/O with LRU pool and compression - Add new obiskio crate for high-performance SuperKmer serialization/deserialization - Implement binary codec with 2-bit packed sequence encoding and raw header format (32 bits) - Add transparent compression support via niffler: Zstd, Gzip/Bgzf/Lz4 - Implement SKFilePool with LRU-based fd management, max-concurrent-fd limiting (75% of ulimit) - Add SKFileWriter with batched writes, configurable flush threshold (8 KiB default), and two-phase locking - Add SKFileReader with sequential access, LRU recovery via reopen_and_seek() + New obikpartitionner crate: basic header/seq handling for binary super-kmer format - Add niffler 3.0 for obiskio alongside 2.7 (still used by obiread), add dependencies: allocator-api2, bitflags(>=1), errno/fastrand/rustix/tempfile/lru/hashbrown/bzip2/thiserror - Update workspace members to include obikpartitionner and obiskio --- src/Cargo.lock | 171 +++++++- src/Cargo.toml | 2 +- src/obikpartitionner/Cargo.toml | 6 + src/obikpartitionner/src/lib.rs | 3 + src/obikpartitionner/src/manager.rs | 150 +++++++ src/obikseq/src/superkmer.rs | 14 + src/obiskio/Cargo.toml | 14 + src/obiskio/src/codec.rs | 94 +++++ src/obiskio/src/error.rs | 40 ++ src/obiskio/src/lib.rs | 9 + src/obiskio/src/limits.rs | 8 + src/obiskio/src/pool.rs | 615 ++++++++++++++++++++++++++++ src/obiskio/src/reader.rs | 203 +++++++++ 13 files changed, 1324 insertions(+), 5 deletions(-) create mode 100644 src/obikpartitionner/Cargo.toml create mode 100644 src/obikpartitionner/src/lib.rs create mode 100644 src/obikpartitionner/src/manager.rs create mode 100644 src/obiskio/Cargo.toml create mode 100644 src/obiskio/src/codec.rs create mode 100644 src/obiskio/src/error.rs create mode 100644 src/obiskio/src/lib.rs create mode 100644 src/obiskio/src/limits.rs create mode 100644 src/obiskio/src/pool.rs create mode 100644 
src/obiskio/src/reader.rs diff --git a/src/Cargo.lock b/src/Cargo.lock index b45cdce..8191eed 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -17,6 +17,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "anes" version = "0.2.1" @@ -85,6 +91,24 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bgzip" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b64fd8980fb64af5951bc05de7772b598150a6f7eac42ec17f73e8489915f99b" +dependencies = [ + "flate2", + "log", + "rayon", + "thiserror 1.0.69", +] + +[[package]] +name = "bitflags" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" + [[package]] name = "bitvec" version = "1.0.1" @@ -131,6 +155,15 @@ dependencies = [ "libc", ] +[[package]] +name = "bzip2" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49ecfb22d906f800d4fe833b6282cf4dc1c298f5057ca0b5445e5c209735ca47" +dependencies = [ + "bzip2-sys", +] + [[package]] name = "bzip2-sys" version = "0.1.13+1.0.8" @@ -333,6 +366,28 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -355,6 +410,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -404,6 +465,17 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "heck" version = "0.5.0" @@ -586,6 +658,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.2" @@ -598,6 +676,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown", +] + [[package]] name = "matchers" version = "0.2.0" @@ -629,11 +716,26 @@ version = "2.7.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "9e84f25af5326bacf3cbc3b9d2586d096426b10c889050cab891f89c9cb65932" dependencies = [ - "bzip2", + "bzip2 0.4.4", "cfg-if", "flate2", "liblzma", - "thiserror", + "thiserror 2.0.18", + "zstd", +] + +[[package]] +name = "niffler" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62d3115cc93dac9fabdfd25f7d4fda51ad4a959fb4667d39af20f70bddfb3574" +dependencies = [ + "bgzip", + "bzip2 0.5.2", + "cfg-if", + "flate2", + "liblzma", + "thiserror 2.0.18", "zstd", ] @@ -676,6 +778,10 @@ dependencies = [ "obiskbuilder", ] +[[package]] +name = "obikpartitionner" +version = "0.1.0" + [[package]] name = "obikrope" version = "0.1.0" @@ -704,7 +810,7 @@ name = "obiread" version = "0.1.0" dependencies = [ "infer", - "niffler", + "niffler 2.7.0", "obikrope", "regex", "tracing", @@ -721,6 +827,17 @@ dependencies = [ "obikseq", ] +[[package]] +name = "obiskio" +version = "0.1.0" +dependencies = [ + "lru", + "niffler 3.0.0", + "obikseq", + "rustix", + "tempfile", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -859,6 +976,19 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.38" @@ -1025,13 +1155,46 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom 0.3.4", + "once_cell", + "rustix", + 
"windows-sys 0.61.2", +] + +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] diff --git a/src/Cargo.toml b/src/Cargo.toml index a68f64c..a5059b1 100644 --- a/src/Cargo.toml +++ b/src/Cargo.toml @@ -1,3 +1,3 @@ [workspace] resolver = "3" -members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline"] +members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio"] diff --git a/src/obikpartitionner/Cargo.toml b/src/obikpartitionner/Cargo.toml new file mode 100644 index 0000000..b7aecdc --- /dev/null +++ b/src/obikpartitionner/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "obikpartitionner" +version = "0.1.0" +edition = "2024" + +[dependencies] diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs new file mode 100644 index 0000000..a2dd64f --- /dev/null +++ b/src/obikpartitionner/src/lib.rs @@ -0,0 +1,3 @@ +mod limits; + +pub use limits::max_concurrent_files; diff --git a/src/obikpartitionner/src/manager.rs b/src/obikpartitionner/src/manager.rs new file mode 100644 index 0000000..135aa0c --- /dev/null +++ b/src/obikpartitionner/src/manager.rs @@ -0,0 +1,150 @@ +use niffler::compression::{Format, Level, from_reader, 
from_writer}; +use std::fs::File; +use std::io::{self, BufReader, BufWriter, Read, Write}; +use std::path::Path; + +// ---------- Format binaire d'un super‑kmer (encodage 2 bits) ---------- +pub struct SuperKmerHeader(u32); + +impl SuperKmerHeader { + pub fn seq_len(&self) -> usize { + let seql = (self.0 & 0xFF) as u8; + if seql == 0 { 256 } else { seql as usize } + } + pub fn to_bits(&self) -> u32 { + self.0 + } + pub fn from_bits(bits: u32) -> Self { + Self(bits) + } +} + +pub struct SuperKmer { + pub header: SuperKmerHeader, + pub seq: Box<[u8]>, // déjà encodée en 2 bits par base +} + +impl SuperKmer { + /// Écrit ce super‑kmer dans un écrivain binaire (non compressé). + pub fn write_raw(&self, w: &mut W) -> io::Result<()> { + w.write_all(&self.header.to_bits().to_le_bytes())?; + w.write_all(&self.seq)?; + Ok(()) + } + + /// Lit un super‑kmer depuis un lecteur binaire (non compressé). + /// Retourne `None` si EOF. + pub fn read_raw(r: &mut R, buf_seq: &mut Vec) -> io::Result> { + let mut header_bytes = [0u8; 4]; + if let Err(e) = r.read_exact(&mut header_bytes) { + return if e.kind() == io::ErrorKind::UnexpectedEof { + Ok(None) + } else { + Err(e) + }; + } + let header = SuperKmerHeader::from_bits(u32::from_le_bytes(header_bytes)); + let len_bytes = (header.seq_len() + 3) / 4; // nombre d'octets encodés + buf_seq.clear(); + buf_seq.resize(len_bytes, 0); + r.read_exact(buf_seq)?; + let seq = buf_seq.clone().into_boxed_slice(); + Ok(Some(SuperKmer { header, seq })) + } +} + +// ---------- PartitionManager avec compression (via niffler) ---------- +pub enum CompressionFormat { + Gzip, // .gz + Zstd, // .zst + Lz4, // .lz4 + Bgzf, // .bgzf (indexable par blocs) + None, // pas de compression +} + +impl CompressionFormat { + /// Infère le format à partir de l'extension du fichier. + pub fn from_extension(path: &Path) -> Option { + match path.extension()?.to_str()? 
{ + "gz" => Some(CompressionFormat::Gzip), + "zst" => Some(CompressionFormat::Zstd), + "lz4" => Some(CompressionFormat::Lz4), + "bgzf" => Some(CompressionFormat::Bgzf), + "raw" => Some(CompressionFormat::None), + _ => None, + } + } + + /// Extension de fichier recommandée. + pub fn extension(&self) -> &'static str { + match self { + CompressionFormat::Gzip => "gz", + CompressionFormat::Zstd => "zst", + CompressionFormat::Lz4 => "lz4", + CompressionFormat::Bgzf => "bgzf", + CompressionFormat::None => "raw", + } + } +} + +pub struct PartitionWriter { + writer: Box, // le flux compressé + // buffer interne pour réutiliser les écritures (optionnel) +} + +impl PartitionWriter { + /// Ouvre un fichier en écriture avec la compression demandée. + pub fn create(path: &Path, format: CompressionFormat) -> io::Result { + let file = File::create(path)?; + const DEFAULT_LEVEL: Level = Level::Default; // peut être ajusté + let writer: Box = match format { + CompressionFormat::Gzip => Box::new(from_writer(file, Format::Gzip, DEFAULT_LEVEL)?), + CompressionFormat::Zstd => Box::new(from_writer(file, Format::Zstd, DEFAULT_LEVEL)?), + CompressionFormat::Lz4 => Box::new(from_writer(file, Format::Lz4, DEFAULT_LEVEL)?), + CompressionFormat::Bgzf => Box::new(from_writer(file, Format::Bgzf, DEFAULT_LEVEL)?), + CompressionFormat::None => Box::new(BufWriter::new(file)), + }; + Ok(PartitionWriter { writer }) + } + + /// Écrit un super‑kmer (non compressé individuellement) dans le flux compressé. + pub fn write_kmer(&mut self, kmer: &SuperKmer) -> io::Result<()> { + kmer.write_raw(&mut self.writer) + } + + /// Flush final. + pub fn finish(mut self) -> io::Result<()> { + self.writer.flush() + } +} + +pub struct PartitionReader { + reader: Box, + seq_buf: Vec, // réutilisation pour les séquences +} + +impl PartitionReader { + /// Ouvre un fichier en lecture. Détecte automatiquement le format de compression + /// grâce à `niffler::sniff` ou via l'extension. 
+ pub fn open(path: &Path) -> io::Result { + let file = File::open(path)?; + // `niffler::sniff` examine les premiers octets pour choisir le décompresseur + let reader = match niffler::sniff(Box::new(file)) { + Ok(r) => r, + Err(e) => { + // Si aucune signature connue, on suppose raw + eprintln!("Aucune signature de compression trouvée, lecture brute."); + Box::new(BufReader::new(file)) as Box + } + }; + Ok(PartitionReader { + reader, + seq_buf: Vec::with_capacity(256), + }) + } + + /// Lit le prochain super‑kmer. Retourne `None` à la fin du fichier. + pub fn read_next(&mut self) -> io::Result> { + SuperKmer::read_raw(&mut self.reader, &mut self.seq_buf) + } +} diff --git a/src/obikseq/src/superkmer.rs b/src/obikseq/src/superkmer.rs index 98f6b26..b87ca16 100644 --- a/src/obikseq/src/superkmer.rs +++ b/src/obikseq/src/superkmer.rs @@ -265,6 +265,20 @@ impl SuperKmer { buf } + /// Returns the raw 32-bit header word for binary serialisation. + /// Bits [7:0] = seql encoding (0→256, 1-255 direct). Bits [31:8] = payload. + #[inline] + pub fn header_bits(&self) -> u32 { + self.header.0 + } + + /// Returns a read-only view of the packed 2-bit sequence bytes. + /// Length is always `(seql() + 3) / 4` bytes. + #[inline] + pub fn seq_bytes(&self) -> &[u8] { + &self.seq + } + /// Extract the kmer of length k starting at nucleotide position i (0-based). /// /// Returns an error if k is invalid (0 or > 32) or if position i + k exceeds the sequence length. 
diff --git a/src/obiskio/Cargo.toml b/src/obiskio/Cargo.toml new file mode 100644 index 0000000..c1235a0 --- /dev/null +++ b/src/obiskio/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "obiskio" +version = "0.1.0" +edition = "2024" + +[dependencies] +niffler = "3.0.0" +rustix = { version = "1.1.4", features = ["process"] } +lru = "0.12" + +obikseq = { path = "../obikseq" } + +[dev-dependencies] +tempfile = "3" diff --git a/src/obiskio/src/codec.rs b/src/obiskio/src/codec.rs new file mode 100644 index 0000000..62f90ae --- /dev/null +++ b/src/obiskio/src/codec.rs @@ -0,0 +1,94 @@ +use obikseq::superkmer::SuperKmer; +use std::io::{self, Read, Write}; + +/// Serialise one SuperKmer into `w` (uncompressed; caller must wrap with a compressor). +#[inline] +pub(crate) fn write_superkmer(w: &mut W, sk: &SuperKmer) -> io::Result<()> { + w.write_all(&sk.header_bits().to_le_bytes())?; + w.write_all(sk.seq_bytes()) +} + +/// Deserialise one SuperKmer from `r`. Returns `None` on clean EOF. +/// `seq_buf` is a reusable scratch buffer to avoid per-record allocation. 
+pub(crate) fn read_superkmer( + r: &mut R, + seq_buf: &mut Vec, +) -> io::Result> { + let mut hdr = [0u8; 4]; + match r.read_exact(&mut hdr) { + Ok(()) => {} + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), + Err(e) => return Err(e), + } + let bits = u32::from_le_bytes(hdr); + let seql_byte = (bits & 0xFF) as u8; + let nt_len: usize = if seql_byte == 0 { 256 } else { seql_byte as usize }; + let byte_len = (nt_len + 3) / 4; + seq_buf.resize(byte_len, 0); + r.read_exact(seq_buf)?; + Ok(Some(SuperKmer::new(seql_byte, seq_buf.as_slice().into()))) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + + fn make_sk(ascii: &[u8]) -> SuperKmer { + SuperKmer::from_ascii(ascii) + } + + #[test] + fn roundtrip_single() { + let sk = make_sk(b"ACGTACGT"); + let mut buf = Vec::new(); + write_superkmer(&mut buf, &sk).unwrap(); + + let mut cur = Cursor::new(&buf); + let mut seq_buf = Vec::new(); + let got = read_superkmer(&mut cur, &mut seq_buf).unwrap().unwrap(); + assert_eq!(sk.to_ascii(), got.to_ascii()); + assert_eq!(sk.seql(), got.seql()); + } + + #[test] + fn roundtrip_all_lengths() { + let bases: Vec = (0..256).map(|i| b"ACGT"[i % 4]).collect(); + for len in (1..=9).chain([255, 256]) { + let sk = make_sk(&bases[..len]); + let mut buf = Vec::new(); + write_superkmer(&mut buf, &sk).unwrap(); + + let mut cur = Cursor::new(&buf); + let mut seq_buf = Vec::new(); + let got = read_superkmer(&mut cur, &mut seq_buf).unwrap().unwrap(); + assert_eq!(sk.to_ascii(), got.to_ascii(), "len={len}"); + } + } + + #[test] + fn eof_returns_none() { + let buf: Vec = vec![]; + let mut cur = Cursor::new(&buf); + let mut seq_buf = Vec::new(); + assert!(read_superkmer(&mut cur, &mut seq_buf).unwrap().is_none()); + } + + #[test] + fn multiple_records() { + let seqs: &[&[u8]] = &[b"AAAA", b"CCCC", b"GGGG", b"TTTT"]; + let mut buf = Vec::new(); + for s in seqs { + write_superkmer(&mut buf, &make_sk(s)).unwrap(); + } + + let mut cur = Cursor::new(&buf); + let 
mut seq_buf = Vec::new(); + for s in seqs { + let got = read_superkmer(&mut cur, &mut seq_buf).unwrap().unwrap(); + let expected = make_sk(s); + assert_eq!(expected.to_ascii(), got.to_ascii()); + } + assert!(read_superkmer(&mut cur, &mut seq_buf).unwrap().is_none()); + } +} diff --git a/src/obiskio/src/error.rs b/src/obiskio/src/error.rs new file mode 100644 index 0000000..969ec3d --- /dev/null +++ b/src/obiskio/src/error.rs @@ -0,0 +1,40 @@ +use std::fmt; +use std::io; + +#[derive(Debug)] +pub enum SKError { + Io(io::Error), + Compression(niffler::Error), +} + +pub type SKResult = Result; + +impl fmt::Display for SKError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SKError::Io(e) => write!(f, "I/O error: {e}"), + SKError::Compression(e) => write!(f, "compression error: {e}"), + } + } +} + +impl std::error::Error for SKError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + SKError::Io(e) => Some(e), + SKError::Compression(e) => Some(e), + } + } +} + +impl From for SKError { + fn from(e: io::Error) -> Self { + SKError::Io(e) + } +} + +impl From for SKError { + fn from(e: niffler::Error) -> Self { + SKError::Compression(e) + } +} diff --git a/src/obiskio/src/lib.rs b/src/obiskio/src/lib.rs new file mode 100644 index 0000000..041cddd --- /dev/null +++ b/src/obiskio/src/lib.rs @@ -0,0 +1,9 @@ +pub mod codec; +pub mod error; +pub mod limits; +pub mod pool; +pub mod reader; + +pub use error::{SKError, SKResult}; +pub use pool::{create_token, create_token_with, SKFilePool, SharedPool, SKFileWriter}; +pub use reader::{SKFileIter, SKFileReader}; diff --git a/src/obiskio/src/limits.rs b/src/obiskio/src/limits.rs new file mode 100644 index 0000000..b5bf9dd --- /dev/null +++ b/src/obiskio/src/limits.rs @@ -0,0 +1,8 @@ +use rustix::process::{Resource, getrlimit}; + +/// Returns the maximum number of file descriptors that can be simultaneously open +/// for the current process. 
This is the process' file descriptor limit as +/// configured by the operating system. +pub fn max_concurrent_files() -> Option { + getrlimit(Resource::Nofile).current +} diff --git a/src/obiskio/src/pool.rs b/src/obiskio/src/pool.rs new file mode 100644 index 0000000..3b28441 --- /dev/null +++ b/src/obiskio/src/pool.rs @@ -0,0 +1,615 @@ +use crate::codec::write_superkmer; +use crate::error::SKResult; +use crate::limits::max_concurrent_files; +use lru::LruCache; +use niffler::send::compression::Format; +use niffler::Level; +use obikseq::superkmer::SuperKmer; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Write}; +use std::num::NonZeroUsize; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, OnceLock}; + +/// Hard upper bound on pool size regardless of OS fd limit. +pub const MAX_POOL_SIZE: usize = 512; + +/// Default pending buffer threshold (bytes) before draining to the physical fd. +pub const DEFAULT_FLUSH_THRESHOLD: usize = 8 * 1024; + +// Convenient alias for the per-entry physical writer slot. +type PhysWriter = Option>; + +// ── WriteEntry ───────────────────────────────────────────────────────────────── + +struct WriteEntry { + path: PathBuf, + format: Format, + level: Level, + /// Per-entry mutex for the physical fd. + /// Independent of the pool mutex to allow parallel writes to different entries. + fd: Arc>, + logically_closed: bool, +} + +// ── SKFilePool ───────────────────────────────────────────────────────────────── + +/// LRU pool of open write file descriptors. +/// +/// # Locking model +/// +/// Two independent locks: +/// +/// | Lock | Scope | Held during | +/// |---|---|---| +/// | `Arc>` (pool lock) | all entries | LRU management only — microseconds | +/// | `Arc>` (entry lock) | one entry | Writing a pending buffer to the fd | +/// +/// **Pool lock is never held while writing data.** This allows parallel writes +/// to different partitions. +/// +/// Lock ordering: always pool lock → entry lock, never reverse. 
+/// +/// # Eviction +/// +/// `evict_lru()` uses `try_lock()` on entry fd locks. +/// An entry whose fd is currently being written to is skipped; the next LRU +/// candidate is tried instead. If all open fds are in use, an error is returned. +pub struct SKFilePool { + max_open: usize, + /// All registered entries. Index = stable token id. Never shrinks. + entries: Vec, + /// IDs of entries currently holding an open fd, in LRU order. + /// + /// Invariant: `id ∈ open ↔ entries[id].fd.lock().is_some()` + open: LruCache, +} + +/// Shared reference to a pool; the primary way to create `SKFileWriter`s. +pub type SharedPool = Arc>; + +static GLOBAL_POOL: OnceLock = OnceLock::new(); + +fn global_pool() -> &'static SharedPool { + GLOBAL_POOL.get_or_init(|| Arc::new(Mutex::new(SKFilePool::from_system_limits()))) +} + +impl SKFilePool { + /// Create a pool allowing at most `max_open` simultaneously open fds. + pub fn new(max_open: usize) -> Self { + let cap = NonZeroUsize::new(max_open.max(1)).unwrap(); + Self { max_open, entries: Vec::new(), open: LruCache::new(cap) } + } + + /// Derive pool size from the OS fd limit (75 %, clamped to `[16, MAX_POOL_SIZE]`). + pub fn from_system_limits() -> Self { + let fd_limit = max_concurrent_files().unwrap_or(256) as usize; + let max_open = (fd_limit * 3 / 4).clamp(16, MAX_POOL_SIZE); + Self::new(max_open) + } + + pub fn max_open(&self) -> usize { + self.max_open + } + + /// Total number of registered entries (open + evicted). + pub fn len(&self) -> usize { + self.entries.len() + } + + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// Number of currently open fds — O(1). + pub fn open_count(&self) -> usize { + self.open.len() + } + + /// Flush and close all registered entries. O(n), one-time. + /// + /// Call only after all concurrent drains have completed (e.g., after joining + /// all write threads). Tokens with unflushed pending must be flushed or closed + /// before this call, or their data will be lost. 
+ pub fn close_all(&mut self) -> SKResult<()> { + for entry in self.entries.iter_mut() { + entry.logically_closed = true; + let mut fd_guard = entry.fd.lock().unwrap(); + if let Some(mut w) = fd_guard.take() { + w.flush()?; + // drop(w) → Zstd frame footer written + } + } + self.open.clear(); + Ok(()) + } + + // ── private ─────────────────────────────────────────────────────────────── + + /// Create file on disk (empty Zstd frame, fd immediately closed), register entry. + fn register(&mut self, path: PathBuf, format: Format, level: Level) -> SKResult { + { + let file = File::create(&path)?; + let _w = niffler::send::get_writer(Box::new(BufWriter::new(file)), format, level)?; + // _w drops here → empty frame finalized, fd released + } + let id = self.entries.len(); + self.entries.push(WriteEntry { + path, + format, + level, + fd: Arc::new(Mutex::new(None)), + logically_closed: false, + }); + Ok(id) + } + + /// Ensure entry `id` has an open fd. Evicts LRU if at capacity. + /// Must be called under pool lock. + fn ensure_open(&mut self, id: usize) -> SKResult<()> { + if self.open.contains(&id) { + return Ok(()); + } + if self.entries[id].logically_closed { + return Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "write to logically closed entry", + ) + .into()); + } + if self.open.len() >= self.max_open { + self.evict_lru()?; + } + let entry = &self.entries[id]; + let file = OpenOptions::new().append(true).open(&entry.path)?; + let writer = + niffler::send::get_writer(Box::new(BufWriter::new(file)), entry.format, entry.level)?; + // Brief fd lock acquisition under pool lock. fd is None here so uncontested. + *self.entries[id].fd.lock().unwrap() = Some(writer); + self.open.put(id, ()); + Ok(()) + } + + /// Evict the least recently used *non-busy* entry. O(1) typical; O(open) worst-case. + /// + /// Uses `try_lock()` so that entries currently being written to are skipped. + /// Returns an error if all open fds are in use. 
+ fn evict_lru(&mut self) -> SKResult<()> { + // iter() yields MRU→LRU; rev() gives LRU→MRU (try least recently used first). + let candidates: Vec = self.open.iter().rev().map(|(&id, _)| id).collect(); + for lru_id in candidates { + let fd_arc = Arc::clone(&self.entries[lru_id].fd); + if let Ok(mut fd_guard) = fd_arc.try_lock() { + if let Some(mut w) = fd_guard.take() { + let _ = w.flush(); // best-effort; drop finalizes Zstd frame + } + self.open.pop(&lru_id); + return Ok(()); + } + // fd locked by a writer thread — skip this candidate + } + Err(std::io::Error::new( + std::io::ErrorKind::ResourceBusy, + "pool saturated: all open fds are currently in use", + ) + .into()) + } +} + +// ── SKFileWriter ──────────────────────────────────────────────────────────────── + +/// Handle to a registered pool entry; owns a local pending write buffer. +/// +/// # Write path (hot path) +/// +/// `write(sk)` serialises into `pending` with **no locking**. +/// When `pending ≥ flush_threshold`, `drain()` is called: +/// +/// 1. Pool lock acquired briefly → `ensure_open(id)` → entry fd lock acquired **under** pool lock. +/// 2. Pool lock released. Entry fd lock still held. +/// 3. Entire `pending` buffer written to fd in one `write_all`. Entry fd lock released. +/// +/// Acquiring the fd lock while the pool lock is still held prevents a race where +/// another thread could evict the just-opened fd between releasing pool lock and +/// acquiring fd lock. `evict_lru()` uses `try_lock()` and will skip an entry +/// whose fd lock is already held. +pub struct SKFileWriter { + id: usize, + pool: Arc>, + path: PathBuf, + pending: Vec, + flush_threshold: usize, + logically_closed: bool, +} + +/// Create a `SKFileWriter` for a new file (Zstd, level 3). +pub fn create_token(pool: &SharedPool, path: PathBuf) -> SKResult { + create_token_with(pool, path, Format::Zstd, Level::Three) +} + +/// Create a `SKFileWriter` for a new file with explicit format and level. 
+pub fn create_token_with( + pool: &SharedPool, + path: PathBuf, + format: Format, + level: Level, +) -> SKResult { + let id = pool.lock().unwrap().register(path.clone(), format, level)?; + Ok(SKFileWriter { + id, + pool: Arc::clone(pool), + path, + pending: Vec::with_capacity(DEFAULT_FLUSH_THRESHOLD + 128), + flush_threshold: DEFAULT_FLUSH_THRESHOLD, + logically_closed: false, + }) +} + +impl SKFileWriter { + /// Create a standalone file writer (Zstd, level 3). + /// The pool is created internally and is not accessible to the caller. + pub fn create>(path: P) -> SKResult { + Self::create_with(path, Format::Zstd, Level::Three) + } + + /// Create a standalone file writer with explicit format and level. + pub fn create_with>(path: P, format: Format, level: Level) -> SKResult { + create_token_with(global_pool(), path.as_ref().to_owned(), format, level) + } + + /// `true` if the underlying fd is currently open in the pool. + pub fn is_physically_open(&self) -> bool { + self.pool.lock().unwrap().open.contains(&self.id) + } + + /// Accumulate one SuperKmer. Drains to fd when `pending ≥ flush_threshold`. + pub fn write(&mut self, sk: &SuperKmer) -> SKResult<()> { + self.check_not_closed()?; + write_superkmer(&mut self.pending, sk)?; + if self.pending.len() >= self.flush_threshold { + self.drain()?; + } + Ok(()) + } + + /// Accumulate a slice of SuperKmers, draining whenever the threshold is exceeded. + pub fn write_batch(&mut self, sks: &[SuperKmer]) -> SKResult<()> { + self.check_not_closed()?; + for sk in sks { + write_superkmer(&mut self.pending, sk)?; + if self.pending.len() >= self.flush_threshold { + self.drain()?; + } + } + Ok(()) + } + + /// Drain pending bytes to the fd **and** flush the compressor's internal buffer. 
+ pub fn flush(&mut self) -> SKResult<()> { + self.check_not_closed()?; + if self.pending.is_empty() { + return Ok(()); + } + let fd_arc; + let mut fd_guard; + { + let mut pool = self.pool.lock().unwrap(); + pool.ensure_open(self.id)?; + let _ = pool.open.get(&self.id); + fd_arc = Arc::clone(&pool.entries[self.id].fd); + fd_guard = fd_arc.lock().unwrap(); // acquire fd lock under pool lock + // pool drops here → pool lock released, fd lock still held + } + let w = fd_guard.as_mut().expect("fd open after ensure_open"); + w.write_all(&self.pending)?; + w.flush()?; + self.pending.clear(); + Ok(()) + } + + /// Flush pending bytes, finalize Zstd frame, permanently seal this token. + pub fn close(&mut self) -> SKResult<()> { + if self.logically_closed { + return Ok(()); + } + self.logically_closed = true; + + let fd_arc; + let mut fd_guard; + { + let mut pool = self.pool.lock().unwrap(); + let has_pending = !self.pending.is_empty(); + if has_pending { + pool.ensure_open(self.id)?; + } + pool.entries[self.id].logically_closed = true; + pool.open.pop(&self.id); + fd_arc = Arc::clone(&pool.entries[self.id].fd); + fd_guard = fd_arc.lock().unwrap(); // acquire fd lock under pool lock + // pool drops here → pool lock released + } + + if !self.pending.is_empty() { + fd_guard.as_mut().expect("fd open after ensure_open").write_all(&self.pending)?; + self.pending.clear(); + } + if let Some(mut w) = fd_guard.take() { + w.flush()?; + // drop(w) → Zstd frame finalized + } + Ok(()) + } + + /// Adjust the byte threshold at which pending is drained to the fd. + /// Default: `DEFAULT_FLUSH_THRESHOLD` (8 KiB). + pub fn set_flush_threshold(&mut self, bytes: usize) { + self.flush_threshold = bytes; + } + + /// `true` if this token has not been closed. + pub fn is_open(&self) -> bool { + !self.logically_closed + } + + /// Physical path of the file. 
+ pub fn path(&self) -> &Path { + &self.path + } + + // ── private ─────────────────────────────────────────────────────────────── + + fn check_not_closed(&self) -> SKResult<()> { + if self.logically_closed { + Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "write to logically closed SKFileWriter", + ) + .into()) + } else { + Ok(()) + } + } + + /// Drain pending bytes to the fd (no compressor flush). + /// + /// Two-phase locking: + /// 1. Pool lock → ensure_open → promote MRU → acquire entry fd lock (under pool lock). + /// 2. Release pool lock. Write pending under entry fd lock only. + /// + /// Holding fd lock while releasing pool lock prevents eviction of our entry + /// during the write: `evict_lru` uses `try_lock` and will skip us. + fn drain(&mut self) -> SKResult<()> { + debug_assert!(!self.pending.is_empty()); + let fd_arc; + let mut fd_guard; + { + let mut pool = self.pool.lock().unwrap(); + pool.ensure_open(self.id)?; + let _ = pool.open.get(&self.id); // promote to MRU + fd_arc = Arc::clone(&pool.entries[self.id].fd); + fd_guard = fd_arc.lock().unwrap(); // acquire fd lock under pool lock + // pool drops here → pool lock released, fd lock still held + } + fd_guard.as_mut().expect("fd open after ensure_open").write_all(&self.pending)?; + // fd_guard drops → entry fd lock released + self.pending.clear(); + Ok(()) + } +} + +impl Drop for SKFileWriter { + fn drop(&mut self) { + if !self.logically_closed { + let _ = self.close(); + } + } +} + +// ── tests ────────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::reader::SKFileReader; + use obikseq::superkmer::SuperKmer; + use tempfile::{NamedTempFile, TempDir}; + + fn make_sk(seed: usize) -> SuperKmer { + let bases: Vec = (0..8).map(|j| b"ACGT"[(seed + j) % 4]).collect(); + SuperKmer::from_ascii(&bases) + } + + fn pool(max_open: usize) -> SharedPool { + Arc::new(Mutex::new(SKFilePool::new(max_open))) + } + + fn open_token(t: 
&mut SKFileWriter, sk: &SuperKmer) { + t.set_flush_threshold(1); + t.write(sk).unwrap(); // pending ≥ 1 → drain → fd opened + } + + #[test] + fn creation_holds_no_fd() { + let dir = TempDir::new().unwrap(); + let p = pool(3); + for i in 0..10 { + create_token(&p, dir.path().join(format!("p{i}.zst"))).unwrap(); + } + assert_eq!(p.lock().unwrap().open_count(), 0); + } + + #[test] + fn pool_limits_open_fds() { + let dir = TempDir::new().unwrap(); + let p = pool(3); + let sk = make_sk(0); + + let mut tokens: Vec = (0..6) + .map(|i| create_token(&p, dir.path().join(format!("p{i}.zst"))).unwrap()) + .collect(); + + for t in tokens.iter_mut() { + open_token(t, &sk); + } + + assert!(p.lock().unwrap().open_count() <= 3, "open={}", p.lock().unwrap().open_count()); + } + + #[test] + fn evicted_token_stays_logically_open() { + let dir = TempDir::new().unwrap(); + let p = pool(1); + let sk = make_sk(0); + + let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap(); + let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap(); + + open_token(&mut t0, &sk); // t0 fd open, pool full + open_token(&mut t1, &sk); // evicts t0, t1 fd open + + assert!(t0.is_open(), "t0 must remain logically open after eviction"); + assert_eq!(p.lock().unwrap().open_count(), 1); + } + + #[test] + fn evicted_data_readable_after_close_all() { + let dir = TempDir::new().unwrap(); + let p = pool(1); + let sk = make_sk(0); + + let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap(); + let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap(); + + t0.set_flush_threshold(1); + t0.write(&sk).unwrap(); // t0 fd open, pool full + t1.set_flush_threshold(1); + t1.write(&sk).unwrap(); // evicts t0, t1 fd open + + // t0 still has the record in pending (eviction just closed fd, pending stays in token) + // Actually: t0's pending was drained before drain() returned (drain clears pending). + // So t0 wrote its record, then was evicted (fd closed). 
+ + drop(t0); + drop(t1); + p.lock().unwrap().close_all().unwrap(); + + for name in &["a.zst", "b.zst"] { + let mut r = SKFileReader::open(dir.path().join(name)).unwrap(); + let got = r.read_batch(10).unwrap(); + assert_eq!(got.len(), 1, "{name}: expected 1 record"); + } + } + + #[test] + fn touch_moves_to_mru_so_lru_is_evicted() { + let dir = TempDir::new().unwrap(); + let p = pool(2); + let sk = make_sk(0); + + let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap(); + let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap(); + let mut t2 = create_token(&p, dir.path().join("c.zst")).unwrap(); + + open_token(&mut t0, &sk); // t0 open + open_token(&mut t1, &sk); // t1 open, t0 LRU + + // Write to t0 again → t0 becomes MRU, t1 becomes LRU + t0.set_flush_threshold(1); + t0.write(&sk).unwrap(); + + // Writing to t2 fills pool (cap=2) → evicts LRU = t1 + open_token(&mut t2, &sk); + + let open_count = p.lock().unwrap().open_count(); + assert!(open_count <= 2, "open_count={open_count}"); + } + + #[test] + fn close_all_produces_readable_files() { + let dir = TempDir::new().unwrap(); + let p = pool(8); + let paths: Vec<_> = (0..4).map(|i| dir.path().join(format!("{i}.zst"))).collect(); + + let mut tokens: Vec = + paths.iter().map(|path| create_token(&p, path.clone()).unwrap()).collect(); + + for (i, t) in tokens.iter_mut().enumerate() { + t.write(&make_sk(i)).unwrap(); + } + // close tokens first so pending bytes are flushed and Zstd frames finalized + for t in tokens.iter_mut() { + t.close().unwrap(); + } + p.lock().unwrap().close_all().unwrap(); + + for path in &paths { + let mut r = SKFileReader::open(path).unwrap(); + let got = r.read_batch(10).unwrap(); + assert_eq!(got.len(), 1); + } + } + + #[test] + fn write_batch_roundtrip() { + let dir = TempDir::new().unwrap(); + let p = pool(4); + let sks: Vec<_> = (0..50).map(make_sk).collect(); + let path = dir.path().join("batch.zst"); + + let mut t = create_token(&p, path.clone()).unwrap(); + 
t.write_batch(&sks).unwrap(); + t.close().unwrap(); + + let mut r = SKFileReader::open(&path).unwrap(); + let got = r.read_batch(100).unwrap(); + assert_eq!(got.len(), 50); + for (a, b) in sks.iter().zip(got.iter()) { + assert_eq!(a.to_ascii(), b.to_ascii()); + } + } + + #[test] + fn from_system_limits_bounded() { + let pool = SKFilePool::from_system_limits(); + assert!(pool.max_open() >= 16); + assert!(pool.max_open() <= MAX_POOL_SIZE); + } + + #[test] + fn standalone_roundtrip_zstd() { + let tmp = NamedTempFile::new().unwrap(); + let sks: Vec<_> = (0..100).map(make_sk).collect(); + { + let mut w = SKFileWriter::create(tmp.path()).unwrap(); + w.write_batch(&sks).unwrap(); + w.close().unwrap(); + } + let mut r = SKFileReader::open(tmp.path()).unwrap(); + let got = r.read_batch(200).unwrap(); + assert_eq!(got.len(), 100); + for (a, b) in sks.iter().zip(got.iter()) { + assert_eq!(a.to_ascii(), b.to_ascii()); + } + } + + #[test] + fn standalone_close_prevents_write() { + let tmp = NamedTempFile::new().unwrap(); + let mut w = SKFileWriter::create(tmp.path()).unwrap(); + w.close().unwrap(); + assert!(!w.is_open()); + assert!(w.write(&make_sk(0)).is_err()); + } + + #[test] + fn standalone_is_physically_open() { + let tmp = NamedTempFile::new().unwrap(); + let mut w = SKFileWriter::create(tmp.path()).unwrap(); + assert!(!w.is_physically_open()); // fd deferred until first drain + w.set_flush_threshold(1); + w.write(&make_sk(0)).unwrap(); // triggers drain → fd opened + assert!(w.is_physically_open()); + w.close().unwrap(); + assert!(!w.is_physically_open()); + } +} diff --git a/src/obiskio/src/reader.rs b/src/obiskio/src/reader.rs new file mode 100644 index 0000000..fe915c4 --- /dev/null +++ b/src/obiskio/src/reader.rs @@ -0,0 +1,203 @@ +use crate::codec::read_superkmer; +use crate::error::{SKError, SKResult}; +use obikseq::superkmer::SuperKmer; +use std::fs::File; +use std::io::BufReader; +use std::path::{Path, PathBuf}; + +/// Binary reader for SuperKmers, with 
transparent decompression via niffler. +/// +/// Access is strictly sequential. Call [`iter`](SKFileReader::iter) to get an +/// [`Iterator`] over the SuperKmers. +/// +/// The reader also supports LRU-pool eviction and recovery: when physically +/// closed by the pool, it records how many SuperKmers have been consumed so +/// that it can fast-forward on next open. +pub struct SKFileReader { + path: PathBuf, + reader: Option>, + /// Reusable scratch buffer for the `seq` bytes of each record. + seq_buf: Vec, + /// Number of SuperKmers successfully read so far (for eviction recovery). + consumed: u64, +} + +impl SKFileReader { + /// Open a file for reading. Format is auto-detected from magic bytes. + pub fn open>(path: P) -> SKResult { + let path = path.as_ref().to_owned(); + let (reader, _fmt) = niffler::send::get_reader(Box::new(BufReader::new(File::open(&path)?)))?; + Ok(Self { + path, + reader: Some(reader), + seq_buf: Vec::with_capacity(64), + consumed: 0, + }) + } + + /// Read the next SuperKmer, or `None` at EOF. + pub fn read(&mut self) -> SKResult> { + let r = self.reader.as_mut().ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "read from physically closed SKFileReader", + ) + })?; + let result = read_superkmer(r, &mut self.seq_buf)?; + if result.is_some() { + self.consumed += 1; + } + Ok(result) + } + + /// Read up to `n` SuperKmers. + pub fn read_batch(&mut self, n: usize) -> SKResult> { + let mut batch = Vec::with_capacity(n); + for _ in 0..n { + match self.read()? { + Some(sk) => batch.push(sk), + None => break, + } + } + Ok(batch) + } + + /// Close the physical file handle without resetting the consumed counter. + pub fn close(&mut self) { + self.reader = None; + } + + /// `true` if the underlying file handle is currently open. + pub fn is_open(&self) -> bool { + self.reader.is_some() + } + + /// Number of SuperKmers successfully read since the file was opened. 
+ pub fn consumed(&self) -> u64 { + self.consumed + } + + /// Physical path of this file. + pub fn path(&self) -> &Path { + &self.path + } + + /// Return an iterator over this reader. + pub fn iter(&mut self) -> SKFileIter<'_> { + SKFileIter { reader: self, error: None } + } + + // ── pool-internal helpers ───────────────────────────────────────────────── + + /// Reopen after eviction and fast-forward to the last recorded position. + /// Cost is O(`consumed`) decompression; acceptable for sequential pipelines + /// where eviction of read-mode files is rare. + pub fn reopen_and_seek(&mut self) -> SKResult<()> { + debug_assert!(self.reader.is_none(), "reopen_and_seek on open reader"); + let (reader, _fmt) = + niffler::send::get_reader(Box::new(BufReader::new(File::open(&self.path)?)))?; + self.reader = Some(reader); + let target = self.consumed; + self.consumed = 0; + for _ in 0..target { + match read_superkmer(self.reader.as_mut().unwrap(), &mut self.seq_buf)? { + Some(_) => self.consumed += 1, + None => break, + } + } + Ok(()) + } +} + +// ── Iterator ───────────────────────────────────────────────────────────────── + +/// Iterator adapter for [`SKFileReader`]. +/// +/// Errors during iteration are stored and accessible via [`take_error`](SKFileIter::take_error). +pub struct SKFileIter<'a> { + reader: &'a mut SKFileReader, + error: Option, +} + +impl<'a> SKFileIter<'a> { + /// Returns the first I/O error encountered during iteration, if any. 
+ pub fn take_error(&mut self) -> Option { + self.error.take() + } +} + +impl Iterator for SKFileIter<'_> { + type Item = SuperKmer; + + fn next(&mut self) -> Option { + match self.reader.read() { + Ok(Some(sk)) => Some(sk), + Ok(None) => None, + Err(e) => { + self.error = Some(e); + None + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::pool::SKFileWriter; + use tempfile::NamedTempFile; + + fn make_sks(n: usize) -> Vec { + (0..n) + .map(|i| { + let bases: Vec = (0..8).map(|j| b"ACGT"[(i + j) % 4]).collect(); + SuperKmer::from_ascii(&bases) + }) + .collect() + } + + #[test] + fn iter_all() { + let tmp = NamedTempFile::new().unwrap(); + let sks = make_sks(50); + + { + let mut w = SKFileWriter::create(tmp.path()).unwrap(); + w.write_batch(&sks).unwrap(); + } + + let mut r = SKFileReader::open(tmp.path()).unwrap(); + let got: Vec<_> = r.iter().collect(); + assert_eq!(got.len(), 50); + for (a, b) in sks.iter().zip(got.iter()) { + assert_eq!(a.to_ascii(), b.to_ascii()); + } + } + + #[test] + fn reopen_and_seek() { + let tmp = NamedTempFile::new().unwrap(); + let sks = make_sks(20); + + { + let mut w = SKFileWriter::create(tmp.path()).unwrap(); + w.write_batch(&sks).unwrap(); + } + + let mut r = SKFileReader::open(tmp.path()).unwrap(); + // Read 10, then simulate pool eviction + re-access + let first = r.read_batch(10).unwrap(); + r.close(); + r.reopen_and_seek().unwrap(); + // Continue from position 10 + let rest = r.read_batch(20).unwrap(); + assert_eq!(first.len(), 10); + assert_eq!(rest.len(), 10); + for (a, b) in sks[..10].iter().zip(first.iter()) { + assert_eq!(a.to_ascii(), b.to_ascii()); + } + for (a, b) in sks[10..].iter().zip(rest.iter()) { + assert_eq!(a.to_ascii(), b.to_ascii()); + } + } +}