Files
obikmer/src/obiskio/src/pool.rs
T
Eric Coissac c09d17401d + obiskio: add binary I/O with LRU pool and compression
- Add new obiskio crate for high-performance SuperKmer serialization/deserialization
- Implement binary codec with 2-bit packed sequence encoding and raw header format (32 bits)
- Add transparent compression support via niffler: Zstd, Gzip/Bgzf/Lz4
- Implement SKFilePool with LRU-based fd management, max-concurrent-fd limiting (75% of ulimit)
- Add SKFileWriter with batched writes, configurable flush threshold (8 KiB default), and two-phase locking
- Add SKFileReader with sequential access, LRU recovery via reopen_and_seek()
+ New obikpartitionner crate: basic header/seq handling for binary super-kmer format
- Bump niffler from 2.7 to v3, add dependencies: allocator-api2, bitflags(>=1), errno/fastrand/rustix/tempfile/lru/hashbrown/bzip2/thiserror
- Update workspace members to include obikpartitionner and obiskio
2026-04-25 14:15:01 +02:00

616 lines
22 KiB
Rust

use crate::codec::write_superkmer;
use crate::error::SKResult;
use crate::limits::max_concurrent_files;
use lru::LruCache;
use niffler::send::compression::Format;
use niffler::Level;
use obikseq::superkmer::SuperKmer;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, OnceLock};
/// Upper limit on the number of simultaneously open fds in a pool, applied even
/// when the OS would allow more.
pub const MAX_POOL_SIZE: usize = 512;
/// Number of bytes a token's pending buffer must reach before it is drained to
/// the physical fd (8 KiB).
pub const DEFAULT_FLUSH_THRESHOLD: usize = 8 << 10;
// Slot holding one entry's physical writer; `None` while the fd is closed.
type PhysWriter = Option<Box<dyn Write + Send>>;
// ── WriteEntry ─────────────────────────────────────────────────────────────────
/// Bookkeeping for one file registered in an `SKFilePool`.
struct WriteEntry {
    /// Physical writer slot, guarded by its own mutex (separate from the pool
    /// mutex) so that writes to different entries can proceed in parallel.
    fd: Arc<Mutex<PhysWriter>>,
    /// Set once the entry has been sealed; a sealed entry is never reopened.
    logically_closed: bool,
    /// File that is (re)opened in append mode whenever the entry needs an fd.
    path: PathBuf,
    /// Compression format used for every stream appended to `path`.
    format: Format,
    /// Compression level paired with `format`.
    level: Level,
}
// ── SKFilePool ─────────────────────────────────────────────────────────────────
/// LRU pool of open write file descriptors shared by many `SKFileWriter` tokens.
///
/// # Locking model
///
/// Two independent locks:
///
/// | Lock | Scope | Held during |
/// |---|---|---|
/// | `Arc<Mutex<SKFilePool>>` (pool lock) | all entries | LRU bookkeeping, plus file creation, (re)opening and eviction |
/// | `Arc<Mutex<PhysWriter>>` (entry lock) | one entry | Writing a pending buffer to the fd |
///
/// **The pool lock is never held while pending data is written to an fd.** This
/// allows parallel writes to different partitions. It *is* held across the
/// file-system calls made when an entry is registered (file creation), reopened
/// (`ensure_open`) or evicted (flush and drop of the victim's writer), so those
/// operations serialize across threads.
///
/// Lock ordering: always pool lock → entry lock, never the reverse.
///
/// # Eviction
///
/// `evict_lru()` uses `try_lock()` on entry fd locks.
/// An entry whose fd is currently being written to is skipped; the next LRU
/// candidate is tried instead. If all open fds are in use, an error is returned.
pub struct SKFilePool {
    /// Maximum number of simultaneously open fds.
    max_open: usize,
    /// All registered entries. Index = stable token id. Never shrinks.
    entries: Vec<WriteEntry>,
    /// IDs of entries currently holding an open fd, in LRU order.
    ///
    /// Invariant: `id ∈ open ↔ entries[id].fd.lock().is_some()`
    open: LruCache<usize, ()>,
}
/// Lockable, reference-counted handle to an `SKFilePool`; tokens
/// (`SKFileWriter`s) are created from it.
pub type SharedPool = Arc<Mutex<SKFilePool>>;
/// Process-wide pool backing the standalone `SKFileWriter::create*` constructors.
static GLOBAL_POOL: OnceLock<SharedPool> = OnceLock::new();
/// Return the process-wide pool, sizing it from the OS fd limit on first use.
fn global_pool() -> &'static SharedPool {
    GLOBAL_POOL.get_or_init(|| {
        let pool = SKFilePool::from_system_limits();
        Arc::new(Mutex::new(pool))
    })
}
impl SKFilePool {
/// Create a pool allowing at most `max_open` simultaneously open fds.
pub fn new(max_open: usize) -> Self {
let cap = NonZeroUsize::new(max_open.max(1)).unwrap();
Self { max_open, entries: Vec::new(), open: LruCache::new(cap) }
}
/// Derive pool size from the OS fd limit (75 %, clamped to `[16, MAX_POOL_SIZE]`).
pub fn from_system_limits() -> Self {
let fd_limit = max_concurrent_files().unwrap_or(256) as usize;
let max_open = (fd_limit * 3 / 4).clamp(16, MAX_POOL_SIZE);
Self::new(max_open)
}
pub fn max_open(&self) -> usize {
self.max_open
}
/// Total number of registered entries (open + evicted).
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
/// Number of currently open fds — O(1).
pub fn open_count(&self) -> usize {
self.open.len()
}
/// Flush and close all registered entries. O(n), one-time.
///
/// Call only after all concurrent drains have completed (e.g., after joining
/// all write threads). Tokens with unflushed pending must be flushed or closed
/// before this call, or their data will be lost.
pub fn close_all(&mut self) -> SKResult<()> {
for entry in self.entries.iter_mut() {
entry.logically_closed = true;
let mut fd_guard = entry.fd.lock().unwrap();
if let Some(mut w) = fd_guard.take() {
w.flush()?;
// drop(w) → Zstd frame footer written
}
}
self.open.clear();
Ok(())
}
// ── private ───────────────────────────────────────────────────────────────
/// Create file on disk (empty Zstd frame, fd immediately closed), register entry.
fn register(&mut self, path: PathBuf, format: Format, level: Level) -> SKResult<usize> {
{
let file = File::create(&path)?;
let _w = niffler::send::get_writer(Box::new(BufWriter::new(file)), format, level)?;
// _w drops here → empty frame finalized, fd released
}
let id = self.entries.len();
self.entries.push(WriteEntry {
path,
format,
level,
fd: Arc::new(Mutex::new(None)),
logically_closed: false,
});
Ok(id)
}
/// Ensure entry `id` has an open fd. Evicts LRU if at capacity.
/// Must be called under pool lock.
fn ensure_open(&mut self, id: usize) -> SKResult<()> {
if self.open.contains(&id) {
return Ok(());
}
if self.entries[id].logically_closed {
return Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"write to logically closed entry",
)
.into());
}
if self.open.len() >= self.max_open {
self.evict_lru()?;
}
let entry = &self.entries[id];
let file = OpenOptions::new().append(true).open(&entry.path)?;
let writer =
niffler::send::get_writer(Box::new(BufWriter::new(file)), entry.format, entry.level)?;
// Brief fd lock acquisition under pool lock. fd is None here so uncontested.
*self.entries[id].fd.lock().unwrap() = Some(writer);
self.open.put(id, ());
Ok(())
}
/// Evict the least recently used *non-busy* entry. O(1) typical; O(open) worst-case.
///
/// Uses `try_lock()` so that entries currently being written to are skipped.
/// Returns an error if all open fds are in use.
fn evict_lru(&mut self) -> SKResult<()> {
// iter() yields MRU→LRU; rev() gives LRU→MRU (try least recently used first).
let candidates: Vec<usize> = self.open.iter().rev().map(|(&id, _)| id).collect();
for lru_id in candidates {
let fd_arc = Arc::clone(&self.entries[lru_id].fd);
if let Ok(mut fd_guard) = fd_arc.try_lock() {
if let Some(mut w) = fd_guard.take() {
let _ = w.flush(); // best-effort; drop finalizes Zstd frame
}
self.open.pop(&lru_id);
return Ok(());
}
// fd locked by a writer thread — skip this candidate
}
Err(std::io::Error::new(
std::io::ErrorKind::ResourceBusy,
"pool saturated: all open fds are currently in use",
)
.into())
}
}
// ── SKFileWriter ────────────────────────────────────────────────────────────────
/// Write token for one file registered in a shared `SKFilePool`.
///
/// # Write path (hot path)
///
/// Records are serialised into a private `pending` buffer without taking any
/// lock. Once that buffer holds at least `flush_threshold` bytes, `drain()` runs:
///
/// 1. The pool lock is taken just long enough to run `ensure_open(id)`. The
///    entry's fd lock is acquired **before** the pool lock is released.
/// 2. The pool lock is released while the fd lock is kept.
/// 3. The whole `pending` buffer is written to the fd with a single
///    `write_all`, then the fd lock is released.
///
/// Taking the fd lock while the pool lock is still held prevents another
/// thread from evicting the freshly opened fd in between. Eviction uses
/// `try_lock()`, so an entry whose fd lock is held is never chosen as victim.
pub struct SKFileWriter {
    /// Pool that owns the physical fd for this token.
    pool: Arc<Mutex<SKFilePool>>,
    /// Stable index of this token's entry inside the pool.
    id: usize,
    /// Path of the file being written.
    path: PathBuf,
    /// Serialised records not yet handed to the fd.
    pending: Vec<u8>,
    /// Size of `pending` (in bytes) that triggers a drain.
    flush_threshold: usize,
    /// `true` once this token has been sealed.
    logically_closed: bool,
}
/// Register `path` in `pool` and return a write token for it, compressing with
/// Zstd at level 3.
pub fn create_token(pool: &SharedPool, path: PathBuf) -> SKResult<SKFileWriter> {
    let (format, level) = (Format::Zstd, Level::Three);
    create_token_with(pool, path, format, level)
}
/// Register `path` in `pool` and return a write token for it, using the given
/// compression `format` and `level`.
///
/// The file is created on disk immediately, but no fd is held until the
/// token's first drain.
pub fn create_token_with(
    pool: &SharedPool,
    path: PathBuf,
    format: Format,
    level: Level,
) -> SKResult<SKFileWriter> {
    let id = {
        let mut guard = pool.lock().unwrap();
        guard.register(path.clone(), format, level)?
    };
    let writer = SKFileWriter {
        pool: Arc::clone(pool),
        id,
        path,
        pending: Vec::with_capacity(DEFAULT_FLUSH_THRESHOLD + 128),
        flush_threshold: DEFAULT_FLUSH_THRESHOLD,
        logically_closed: false,
    };
    Ok(writer)
}
impl SKFileWriter {
    /// Create a standalone file writer (Zstd, level 3).
    ///
    /// The token is registered in a process-wide pool shared by every
    /// standalone writer; that pool is not accessible to the caller.
    pub fn create<P: AsRef<Path>>(path: P) -> SKResult<Self> {
        Self::create_with(path, Format::Zstd, Level::Three)
    }

    /// Create a standalone file writer with explicit format and level.
    ///
    /// Uses the same process-wide pool as [`SKFileWriter::create`].
    pub fn create_with<P: AsRef<Path>>(path: P, format: Format, level: Level) -> SKResult<Self> {
        create_token_with(global_pool(), path.as_ref().to_owned(), format, level)
    }

    /// `true` if the underlying fd is currently open in the pool.
    pub fn is_physically_open(&self) -> bool {
        self.pool.lock().unwrap().open.contains(&self.id)
    }

    /// Accumulate one SuperKmer. Drains to fd when `pending ≥ flush_threshold`.
    ///
    /// # Errors
    /// Fails if the token is closed, if serialisation fails, or if a triggered
    /// drain fails.
    pub fn write(&mut self, sk: &SuperKmer) -> SKResult<()> {
        self.check_not_closed()?;
        write_superkmer(&mut self.pending, sk)?;
        if self.pending.len() >= self.flush_threshold {
            self.drain()?;
        }
        Ok(())
    }

    /// Accumulate a slice of SuperKmers, draining whenever the threshold is reached.
    pub fn write_batch(&mut self, sks: &[SuperKmer]) -> SKResult<()> {
        self.check_not_closed()?;
        for sk in sks {
            write_superkmer(&mut self.pending, sk)?;
            if self.pending.len() >= self.flush_threshold {
                self.drain()?;
            }
        }
        Ok(())
    }

    /// Drain pending bytes to the fd **and** flush the compressor's internal buffer.
    ///
    /// Bytes handed to the fd by earlier drains are flushed too, even when
    /// nothing is pending. If the fd was evicted in the meantime it is not
    /// reopened, because eviction already flushed and closed that stream.
    ///
    /// # Errors
    /// Fails if the token is closed, if the pool is saturated (no idle fd to
    /// evict), or on any I/O error while writing or flushing.
    pub fn flush(&mut self) -> SKResult<()> {
        self.check_not_closed()?;
        if self.pending.is_empty() {
            // Earlier drains may have left data buffered inside the compressor
            // / BufWriter: flush it if (and only if) the fd is currently open.
            return self.with_fd(false, |w| {
                w.flush()?;
                Ok(())
            });
        }
        self.with_fd(true, |w| {
            w.write_all(&self.pending)?;
            w.flush()?;
            Ok(())
        })?;
        self.pending.clear();
        Ok(())
    }

    /// Flush pending bytes, finalize the compressed stream, permanently seal this token.
    ///
    /// Calling it again after success is a no-op. If the fd cannot be reopened
    /// for the remaining pending bytes (e.g. pool saturated), an error is
    /// returned and the token stays open with its data intact, so the call can
    /// be retried.
    pub fn close(&mut self) -> SKResult<()> {
        if self.logically_closed {
            return Ok(());
        }
        let fd_arc;
        let mut fd_guard;
        {
            let mut pool = self.pool.lock().unwrap();
            if !self.pending.is_empty() {
                // Fails before anything is sealed, so no pending data is lost.
                pool.ensure_open(self.id)?;
            }
            pool.entries[self.id].logically_closed = true;
            pool.open.pop(&self.id);
            fd_arc = Arc::clone(&pool.entries[self.id].fd);
            fd_guard = fd_arc.lock().unwrap(); // acquire fd lock under pool lock
            // pool drops here → pool lock released
        }
        self.logically_closed = true;
        // Detach the writer before any fallible I/O: the entry has left `open`,
        // so its slot must be `None` whatever happens next. On an early return
        // `writer` is dropped, which still releases the fd.
        let mut writer = fd_guard.take();
        if !self.pending.is_empty() {
            writer
                .as_mut()
                .expect("fd open after ensure_open")
                .write_all(&self.pending)?;
            self.pending.clear();
        }
        if let Some(mut w) = writer {
            w.flush()?;
            // drop(w) → compressed stream finalized
        }
        Ok(())
    }

    /// Adjust the byte threshold at which pending is drained to the fd.
    /// Default: `DEFAULT_FLUSH_THRESHOLD` (8 KiB).
    pub fn set_flush_threshold(&mut self, bytes: usize) {
        self.flush_threshold = bytes;
    }

    /// `true` if this token has not been closed.
    pub fn is_open(&self) -> bool {
        !self.logically_closed
    }

    /// Physical path of the file.
    pub fn path(&self) -> &Path {
        &self.path
    }

    // ── private ───────────────────────────────────────────────────────────────

    /// Return a `BrokenPipe` error if this token has been closed.
    fn check_not_closed(&self) -> SKResult<()> {
        if self.logically_closed {
            Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "write to logically closed SKFileWriter",
            )
            .into())
        } else {
            Ok(())
        }
    }

    /// Drain pending bytes to the fd (no compressor flush).
    fn drain(&mut self) -> SKResult<()> {
        debug_assert!(!self.pending.is_empty());
        self.with_fd(true, |w| {
            w.write_all(&self.pending)?;
            Ok(())
        })?;
        self.pending.clear();
        Ok(())
    }

    /// Run `op` on this token's physical writer while holding only its entry fd lock.
    ///
    /// Two-phase locking:
    /// 1. Pool lock → `ensure_open` (when `reopen`) → promote to MRU → acquire
    ///    the entry fd lock (still under the pool lock).
    /// 2. Release the pool lock and run `op` under the entry fd lock only.
    ///
    /// Holding the fd lock while releasing the pool lock prevents eviction of
    /// this entry during `op`, because `evict_lru` uses `try_lock` and skips us.
    ///
    /// When `reopen` is `false` and the fd is not currently open, `op` is not
    /// called and `Ok(())` is returned.
    fn with_fd<F>(&self, reopen: bool, op: F) -> SKResult<()>
    where
        F: FnOnce(&mut (dyn Write + Send + 'static)) -> SKResult<()>,
    {
        let fd_arc;
        let mut fd_guard;
        {
            let mut pool = self.pool.lock().unwrap();
            if reopen {
                pool.ensure_open(self.id)?;
            } else if !pool.open.contains(&self.id) {
                return Ok(());
            }
            let _ = pool.open.get(&self.id); // promote to MRU
            fd_arc = Arc::clone(&pool.entries[self.id].fd);
            fd_guard = fd_arc.lock().unwrap(); // acquire fd lock under pool lock
            // pool drops here → pool lock released, fd lock still held
        }
        let writer = fd_guard
            .as_mut()
            .expect("entry in the open set must hold a physical writer");
        op(&mut **writer)
        // fd_guard drops → entry fd lock released
    }
}
impl Drop for SKFileWriter {
    /// Seal the token if its owner did not call `close()` explicitly.
    ///
    /// `drop` cannot return errors, so any failure from `close()` is discarded.
    fn drop(&mut self) {
        if self.logically_closed {
            return;
        }
        let _ = self.close();
    }
}
// ── tests ──────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reader::SKFileReader;
    use obikseq::superkmer::SuperKmer;
    use tempfile::{NamedTempFile, TempDir};

    /// Deterministic 8-base SuperKmer whose sequence depends on `seed`.
    fn make_sk(seed: usize) -> SuperKmer {
        let bases: Vec<u8> = (0..8).map(|j| b"ACGT"[(seed + j) % 4]).collect();
        SuperKmer::from_ascii(&bases)
    }

    /// Private pool (independent of the global one) allowing `max_open` fds.
    fn pool(max_open: usize) -> SharedPool {
        Arc::new(Mutex::new(SKFilePool::new(max_open)))
    }

    /// Force `t` to open its fd: a threshold of 1 makes the write drain at once.
    fn open_token(t: &mut SKFileWriter, sk: &SuperKmer) {
        t.set_flush_threshold(1);
        t.write(sk).unwrap(); // pending ≥ 1 → drain → fd opened
    }

    #[test]
    fn creation_holds_no_fd() {
        let dir = TempDir::new().unwrap();
        let p = pool(3);
        for i in 0..10 {
            create_token(&p, dir.path().join(format!("p{i}.zst"))).unwrap();
        }
        assert_eq!(p.lock().unwrap().open_count(), 0);
    }

    #[test]
    fn pool_limits_open_fds() {
        let dir = TempDir::new().unwrap();
        let p = pool(3);
        let sk = make_sk(0);
        let mut tokens: Vec<SKFileWriter> = (0..6)
            .map(|i| create_token(&p, dir.path().join(format!("p{i}.zst"))).unwrap())
            .collect();
        for t in tokens.iter_mut() {
            open_token(t, &sk);
        }
        // Each open beyond the third evicts one idle fd, so the pool stays full.
        let open = p.lock().unwrap().open_count();
        assert_eq!(open, 3, "open={open}");
    }

    #[test]
    fn evicted_token_stays_logically_open() {
        let dir = TempDir::new().unwrap();
        let p = pool(1);
        let sk = make_sk(0);
        let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap();
        let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap();
        open_token(&mut t0, &sk); // t0 fd open, pool full
        open_token(&mut t1, &sk); // evicts t0, t1 fd open
        assert!(t0.is_open(), "t0 must remain logically open after eviction");
        assert!(!t0.is_physically_open(), "t0's fd must have been evicted");
        assert!(t1.is_physically_open());
        let open = p.lock().unwrap().open_count();
        assert_eq!(open, 1);
    }

    #[test]
    fn evicted_data_readable_after_close_all() {
        let dir = TempDir::new().unwrap();
        let p = pool(1);
        let sk = make_sk(0);
        let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap();
        let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap();
        t0.set_flush_threshold(1);
        t0.write(&sk).unwrap(); // t0 fd open, pool full
        t1.set_flush_threshold(1);
        t1.write(&sk).unwrap(); // evicts t0, t1 fd open
        // t0's drain handed its record to the fd and cleared `pending` before
        // returning; evicting t0 then flushed and finalized that stream. So t0's
        // record is already on disk and nothing remains pending in the token.
        drop(t0);
        drop(t1);
        p.lock().unwrap().close_all().unwrap();
        for name in &["a.zst", "b.zst"] {
            let mut r = SKFileReader::open(dir.path().join(name)).unwrap();
            let got = r.read_batch(10).unwrap();
            assert_eq!(got.len(), 1, "{name}: expected 1 record");
        }
    }

    #[test]
    fn touch_moves_to_mru_so_lru_is_evicted() {
        let dir = TempDir::new().unwrap();
        let p = pool(2);
        let sk = make_sk(0);
        let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap();
        let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap();
        let mut t2 = create_token(&p, dir.path().join("c.zst")).unwrap();
        open_token(&mut t0, &sk); // t0 open
        open_token(&mut t1, &sk); // t1 open, t0 LRU
        // Write to t0 again → t0 becomes MRU, t1 becomes LRU
        t0.set_flush_threshold(1);
        t0.write(&sk).unwrap();
        // Writing to t2 fills pool (cap=2) → evicts LRU = t1
        open_token(&mut t2, &sk);
        let open_count = p.lock().unwrap().open_count();
        assert_eq!(open_count, 2, "open_count={open_count}");
        assert!(t0.is_physically_open(), "t0 was touched last and must survive");
        assert!(!t1.is_physically_open(), "t1 was LRU and must be evicted");
        assert!(t2.is_physically_open());
    }

    #[test]
    fn close_all_produces_readable_files() {
        let dir = TempDir::new().unwrap();
        let p = pool(8);
        let paths: Vec<_> = (0..4).map(|i| dir.path().join(format!("{i}.zst"))).collect();
        let mut tokens: Vec<SKFileWriter> =
            paths.iter().map(|path| create_token(&p, path.clone()).unwrap()).collect();
        for (i, t) in tokens.iter_mut().enumerate() {
            t.write(&make_sk(i)).unwrap();
        }
        // close tokens first so pending bytes are flushed and Zstd frames finalized
        for t in tokens.iter_mut() {
            t.close().unwrap();
        }
        p.lock().unwrap().close_all().unwrap();
        for path in &paths {
            let mut r = SKFileReader::open(path).unwrap();
            let got = r.read_batch(10).unwrap();
            assert_eq!(got.len(), 1);
        }
    }

    #[test]
    fn write_batch_roundtrip() {
        let dir = TempDir::new().unwrap();
        let p = pool(4);
        let sks: Vec<_> = (0..50).map(make_sk).collect();
        let path = dir.path().join("batch.zst");
        let mut t = create_token(&p, path.clone()).unwrap();
        t.write_batch(&sks).unwrap();
        t.close().unwrap();
        let mut r = SKFileReader::open(&path).unwrap();
        let got = r.read_batch(100).unwrap();
        assert_eq!(got.len(), 50);
        for (a, b) in sks.iter().zip(got.iter()) {
            assert_eq!(a.to_ascii(), b.to_ascii());
        }
    }

    #[test]
    fn from_system_limits_bounded() {
        let pool = SKFilePool::from_system_limits();
        assert!(pool.max_open() >= 16);
        assert!(pool.max_open() <= MAX_POOL_SIZE);
    }

    #[test]
    fn standalone_roundtrip_zstd() {
        let tmp = NamedTempFile::new().unwrap();
        let sks: Vec<_> = (0..100).map(make_sk).collect();
        {
            let mut w = SKFileWriter::create(tmp.path()).unwrap();
            w.write_batch(&sks).unwrap();
            w.close().unwrap();
        }
        let mut r = SKFileReader::open(tmp.path()).unwrap();
        let got = r.read_batch(200).unwrap();
        assert_eq!(got.len(), 100);
        for (a, b) in sks.iter().zip(got.iter()) {
            assert_eq!(a.to_ascii(), b.to_ascii());
        }
    }

    #[test]
    fn standalone_close_prevents_write() {
        let tmp = NamedTempFile::new().unwrap();
        let mut w = SKFileWriter::create(tmp.path()).unwrap();
        w.close().unwrap();
        assert!(!w.is_open());
        assert!(w.write(&make_sk(0)).is_err());
    }

    #[test]
    fn standalone_is_physically_open() {
        let tmp = NamedTempFile::new().unwrap();
        let mut w = SKFileWriter::create(tmp.path()).unwrap();
        assert!(!w.is_physically_open()); // fd deferred until first drain
        w.set_flush_threshold(1);
        w.write(&make_sk(0)).unwrap(); // triggers drain → fd opened
        assert!(w.is_physically_open());
        w.close().unwrap();
        assert!(!w.is_physically_open());
    }
}