From 52fd2cf801c88bd40ec0ff6505c7d713ced0d2cf Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 12 Jun 2026 15:18:37 +0200 Subject: [PATCH] feat: enhance memory budgeting and add rebuild diagnostics This commit improves memory management by respecting Linux cgroup v1/v2 limits and introduces a configurable memory budget for the new `rebuild` subcommand to prevent OOM during index reconstruction. The rebuild process now supports filtering, compaction, and parallelization. Diagnostic capabilities are expanded with debug-level tracing for partition merges, k-mer expansion tracking, and utility flags for label renaming, matrix size breakdowns, per-genome counts, and partition distribution reporting. Accessor methods for active and remaining memory have also been added to the stats struct. --- README.md | 17 ++++++++-- src/obikindex/src/merge.rs | 28 +++++++++++++--- src/obisys/src/lib.rs | 68 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 104 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 34d557a..0fb073e 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,13 @@ Non-ACGT characters act as hard breaks between k-mer segments in all formats. Runs scatter → dereplicate → count → layered MPHF. Resumes automatically if interrupted. merge Merge multiple independently built indexes into one. - rebuild Filter and compact an existing index: apply count thresholds, + Schedules partitions largest-first under a memory budget semaphore + to avoid OOM on machines with many cores. The worst partition runs + alone first to calibrate the expansion estimator; subsequent + partitions run in parallel within the budget. + --budget-fraction F fraction of available RAM to use as budget + (default 0.5; reduce if OOM persists). + filter Filter and compact an existing index: apply count thresholds, drop layers, rewrite as a single-layer index. reindex Convert evidence in-place across all layers: exact (evidence.bin) ↔ approximate (fingerprint.bin). @@ -74,7 +80,14 @@ Non-ACGT characters act as hard breaks between k-mer segments in all formats. Diagnostic / pipeline use. unitig Dump the unitig sequences stored in a built index. Debug use. utils Miscellaneous utilities. - --new-label NEW=OLD renames a genome label in-place. + --new-label NEW=OLD rename a genome label in-place. + --bits-per-kmer print MPHF / evidence / matrix size breakdown. + --stats per-genome k-mer counts as CSV. + --partition-stats partition size distribution across one or more + indexes (markdown report to stdout). Useful to + diagnose minimizer imbalance before a large merge. + --csv FILE write per-(partition, source) raw data to FILE + (used with --partition-stats). ## Quick start diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index 108098b..0857013 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex}; use obisys::{MemoryBudget, Reporter, Stage, available_memory_bytes, progress_bar, spinner}; use rayon::prelude::*; -use tracing::info; +use tracing::{debug, info}; use obilayeredmap::IndexMode; @@ -250,6 +250,15 @@ impl KmerIndex { let cost = ubytes * exp / 1000; budget.acquire(cost); + debug!( + "partition {i}: start — est. {} ({:.2}×), \ + {} workers active, {} budget remaining", + fmt_bytes(cost), + exp as f64 / 1000.0, + budget.active(), + fmt_bytes(budget.remaining()), + ); + let result = dst_partition .merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence); budget.release(cost); @@ -257,10 +266,19 @@ impl KmerIndex { match result { Ok(g_len) => { - if ubytes > 0 { - let actual = g_len as u64 * 16 * 1000 / ubytes; - max_expansion.fetch_max(actual, Ordering::Relaxed); - } + let actual_exp = if ubytes > 0 { + g_len as u64 * 16 * 1000 / ubytes + } else { + 0 + }; + max_expansion.fetch_max(actual_exp, Ordering::Relaxed); + debug!( + "partition {i}: done — {} new kmers, actual {:.2}× \ + (estimated {:.2}×)", + g_len, + actual_exp as f64 / 1000.0, + exp as f64 / 1000.0, + ); part_stats.lock().unwrap().push(PartStat { id: i, unitig_bytes: ubytes, diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs index 2d800d2..bf2e678 100644 --- a/src/obisys/src/lib.rs +++ b/src/obisys/src/lib.rs @@ -111,16 +111,78 @@ use sysinfo::System; // ── Memory query ────────────────────────────────────────────────────────────── -/// Returns the number of bytes available for allocation on this machine. +/// Returns the number of bytes available for allocation in the current process context. +/// +/// On Linux, cgroup memory limits (SLURM, containers) are checked first: the +/// process may be constrained to far less than the host's available RAM. +/// Returns `min(cgroup_available, host_available)` when a finite limit is found. /// /// On macOS, `available_memory()` can return 0 when the memory compressor /// inflates the page count; in that case we fall back to half of total memory. pub fn available_memory_bytes() -> u64 { let sys = System::new_all(); - match sys.available_memory() { + let host_avail = match sys.available_memory() { 0 => sys.total_memory() / 2, n => n, + }; + #[cfg(target_os = "linux")] + if let Some(cg) = cgroup_v2_available().or_else(cgroup_v1_available) { + return cg.min(host_avail); } + host_avail +} + +/// cgroup v2 (unified hierarchy): reads memory.max and memory.current for the +/// current process's cgroup. Returns None if unlimited or on any parse error. +#[cfg(target_os = "linux")] +fn cgroup_v2_available() -> Option { + let cgroup = std::fs::read_to_string("/proc/self/cgroup").ok()?; + let rel = cgroup + .lines() + .find(|l| l.starts_with("0::"))? + .strip_prefix("0::")? + .trim(); + let base = format!("/sys/fs/cgroup{rel}"); + // "max" means no limit → parse::() fails → None + let limit: u64 = std::fs::read_to_string(format!("{base}/memory.max")) + .ok()? + .trim() + .parse() + .ok()?; + let used: u64 = std::fs::read_to_string(format!("{base}/memory.current")) + .ok()? + .trim() + .parse() + .ok()?; + Some(limit.saturating_sub(used)) +} + +/// cgroup v1 (memory subsystem): reads memory.limit_in_bytes and +/// memory.usage_in_bytes. Returns None if unlimited or on any parse error. +#[cfg(target_os = "linux")] +fn cgroup_v1_available() -> Option { + let cgroup = std::fs::read_to_string("/proc/self/cgroup").ok()?; + let path = cgroup + .lines() + .find(|l| l.contains(":memory:"))? + .split(':') + .nth(2)?; + let base = format!("/sys/fs/cgroup/memory{path}"); + let limit: u64 = std::fs::read_to_string(format!("{base}/memory.limit_in_bytes")) + .ok()? + .trim() + .parse() + .ok()?; + // Kernel uses 2^63 (rounded to page) as "no limit" sentinel + if limit > (1u64 << 62) { + return None; + } + let used: u64 = std::fs::read_to_string(format!("{base}/memory.usage_in_bytes")) + .ok()? + .trim() + .parse() + .ok()?; + Some(limit.saturating_sub(used)) } // ── raw helpers ─────────────────────────────────────────────────────────────── @@ -359,6 +421,8 @@ impl MemoryBudget { } pub fn total(&self) -> u64 { self.total } + pub fn active(&self) -> usize { self.inner.lock().unwrap().active } + pub fn remaining(&self) -> u64 { self.inner.lock().unwrap().remaining } pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active } }