feat: enhance memory budgeting and add rebuild diagnostics

This commit improves memory management by respecting Linux cgroup v1/v2 limits and introduces a configurable memory budget for the new `rebuild` subcommand to prevent OOM during index reconstruction. The rebuild process now supports filtering, compaction, and parallelization. Diagnostic capabilities are expanded with debug-level tracing for partition merges, k-mer expansion tracking, and utility flags for label renaming, matrix size breakdowns, per-genome counts, and partition distribution reporting. Accessor methods for the number of active workers and the remaining budget have also been added to `MemoryBudget`.
This commit is contained in:
Eric Coissac
2026-06-12 15:18:37 +02:00
parent 97e3fb9761
commit 52fd2cf801
3 changed files with 104 additions and 9 deletions
+15 -2
View File
@@ -51,7 +51,13 @@ Non-ACGT characters act as hard breaks between k-mer segments in all formats.
Runs scatter → dereplicate → count → layered MPHF. Runs scatter → dereplicate → count → layered MPHF.
Resumes automatically if interrupted. Resumes automatically if interrupted.
merge Merge multiple independently built indexes into one. merge Merge multiple independently built indexes into one.
rebuild Filter and compact an existing index: apply count thresholds, Schedules partitions largest-first under a memory budget semaphore
to avoid OOM on machines with many cores. The worst partition runs
alone first to calibrate the expansion estimator; subsequent
partitions run in parallel within the budget.
--budget-fraction F fraction of available RAM to use as budget
(default 0.5; reduce if OOM persists).
filter Filter and compact an existing index: apply count thresholds,
drop layers, rewrite as a single-layer index. drop layers, rewrite as a single-layer index.
reindex Convert evidence in-place across all layers: reindex Convert evidence in-place across all layers:
exact (evidence.bin) ↔ approximate (fingerprint.bin). exact (evidence.bin) ↔ approximate (fingerprint.bin).
@@ -74,7 +80,14 @@ Non-ACGT characters act as hard breaks between k-mer segments in all formats.
Diagnostic / pipeline use. Diagnostic / pipeline use.
unitig Dump the unitig sequences stored in a built index. Debug use. unitig Dump the unitig sequences stored in a built index. Debug use.
utils Miscellaneous utilities. utils Miscellaneous utilities.
--new-label NEW=OLD renames a genome label in-place. --new-label NEW=OLD rename a genome label in-place.
--bits-per-kmer print MPHF / evidence / matrix size breakdown.
--stats per-genome k-mer counts as CSV.
--partition-stats partition size distribution across one or more
indexes (markdown report to stdout). Useful to
diagnose minimizer imbalance before a large merge.
--csv FILE write per-(partition, source) raw data to FILE
(used with --partition-stats).
## Quick start ## Quick start
+23 -5
View File
@@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex};
use obisys::{MemoryBudget, Reporter, Stage, available_memory_bytes, progress_bar, spinner}; use obisys::{MemoryBudget, Reporter, Stage, available_memory_bytes, progress_bar, spinner};
use rayon::prelude::*; use rayon::prelude::*;
use tracing::info; use tracing::{debug, info};
use obilayeredmap::IndexMode; use obilayeredmap::IndexMode;
@@ -250,6 +250,15 @@ impl KmerIndex {
let cost = ubytes * exp / 1000; let cost = ubytes * exp / 1000;
budget.acquire(cost); budget.acquire(cost);
debug!(
"partition {i}: start — est. {} ({:.2}×), \
{} workers active, {} budget remaining",
fmt_bytes(cost),
exp as f64 / 1000.0,
budget.active(),
fmt_bytes(budget.remaining()),
);
let result = dst_partition let result = dst_partition
.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence); .merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence);
budget.release(cost); budget.release(cost);
@@ -257,10 +266,19 @@ impl KmerIndex {
match result { match result {
Ok(g_len) => { Ok(g_len) => {
if ubytes > 0 { let actual_exp = if ubytes > 0 {
let actual = g_len as u64 * 16 * 1000 / ubytes; g_len as u64 * 16 * 1000 / ubytes
max_expansion.fetch_max(actual, Ordering::Relaxed); } else {
} 0
};
max_expansion.fetch_max(actual_exp, Ordering::Relaxed);
debug!(
"partition {i}: done — {} new kmers, actual {:.2}× \
(estimated {:.2}×)",
g_len,
actual_exp as f64 / 1000.0,
exp as f64 / 1000.0,
);
part_stats.lock().unwrap().push(PartStat { part_stats.lock().unwrap().push(PartStat {
id: i, id: i,
unitig_bytes: ubytes, unitig_bytes: ubytes,
+66 -2
View File
@@ -111,16 +111,78 @@ use sysinfo::System;
// ── Memory query ────────────────────────────────────────────────────────────── // ── Memory query ──────────────────────────────────────────────────────────────
/// Returns the number of bytes available for allocation on this machine. /// Returns the number of bytes available for allocation in the current process context.
///
/// On Linux, cgroup memory limits (SLURM, containers) are checked first: the
/// process may be constrained to far less than the host's available RAM.
/// Returns `min(cgroup_available, host_available)` when a finite limit is found.
/// ///
/// On macOS, `available_memory()` can return 0 when the memory compressor /// On macOS, `available_memory()` can return 0 when the memory compressor
/// inflates the page count; in that case we fall back to half of total memory. /// inflates the page count; in that case we fall back to half of total memory.
pub fn available_memory_bytes() -> u64 { pub fn available_memory_bytes() -> u64 {
let sys = System::new_all(); let sys = System::new_all();
match sys.available_memory() { let host_avail = match sys.available_memory() {
0 => sys.total_memory() / 2, 0 => sys.total_memory() / 2,
n => n, n => n,
};
#[cfg(target_os = "linux")]
if let Some(cg) = cgroup_v2_available().or_else(cgroup_v1_available) {
return cg.min(host_avail);
} }
host_avail
}
/// cgroup v2 (unified hierarchy): memory still available to the current
/// process according to `memory.max` and `memory.current`.
///
/// The process's cgroup is taken from the `0::` entry of `/proc/self/cgroup`.
/// A `memory.max` set on an ancestor cgroup constrains every descendant, so
/// the leaf cgroup and each of its ancestors up to `/sys/fs/cgroup` are
/// inspected and the tightest headroom (`limit - current`, saturating at 0)
/// is returned.
///
/// Returns `None` when `/proc/self/cgroup` cannot be read, has no `0::`
/// entry, or when no inspected level carries a finite, readable limit.
#[cfg(target_os = "linux")]
fn cgroup_v2_available() -> Option<u64> {
    // Reads a file holding a single decimal integer. The literal "max"
    // (meaning "no limit") fails to parse and therefore yields `None`.
    fn read_u64(path: std::path::PathBuf) -> Option<u64> {
        std::fs::read_to_string(path).ok()?.trim().parse().ok()
    }

    let cgroup = std::fs::read_to_string("/proc/self/cgroup").ok()?;
    let rel = cgroup
        .lines()
        .find_map(|line| line.strip_prefix("0::"))?
        .trim();

    let root = std::path::Path::new("/sys/fs/cgroup");
    // `rel` starts with '/'; strip it so `join` appends to `root` instead of
    // replacing it with an absolute path.
    let leaf = root.join(rel.trim_start_matches('/'));

    leaf.ancestors()
        // Never climb above the cgroup mount point.
        .take_while(|dir| dir.starts_with(root))
        // Levels without a finite limit (or without readable files) are skipped.
        .filter_map(|dir| {
            let limit = read_u64(dir.join("memory.max"))?;
            let used = read_u64(dir.join("memory.current"))?;
            Some(limit.saturating_sub(used))
        })
        .min()
}
/// cgroup v1 (memory subsystem): memory still available to the current
/// process according to `memory.limit_in_bytes` and `memory.usage_in_bytes`.
///
/// The process's memory cgroup is taken from the `/proc/self/cgroup` entry
/// whose controller list contains `memory`. The leaf cgroup and each of its
/// ancestors up to `/sys/fs/cgroup/memory` are inspected and the tightest
/// headroom (`limit - usage`, saturating at 0) is returned. Walking up also
/// lets the mount root answer when the reported path does not exist under
/// the mount point.
///
/// NOTE(review): on kernels running with `memory.use_hierarchy=0` an
/// ancestor's limit is not enforced on its children; taking the minimum is
/// then a conservative estimate — confirm this is acceptable for callers.
///
/// Returns `None` when `/proc/self/cgroup` cannot be read, has no memory
/// entry, or when no inspected level carries a finite, readable limit.
#[cfg(target_os = "linux")]
fn cgroup_v1_available() -> Option<u64> {
    // Kernel uses 2^63 (rounded to page) as "no limit" sentinel; anything
    // above 2^62 is treated as unlimited.
    const UNLIMITED_THRESHOLD: u64 = 1u64 << 62;

    // Reads a file holding a single decimal integer.
    fn read_u64(path: std::path::PathBuf) -> Option<u64> {
        std::fs::read_to_string(path).ok()?.trim().parse().ok()
    }

    let cgroup = std::fs::read_to_string("/proc/self/cgroup").ok()?;
    // Each line is "hierarchy-id:controller-list:path". `splitn(3, ..)` keeps
    // any ':' inside the path, and the controller list is matched field by
    // field rather than by substring.
    let rel = cgroup.lines().find_map(|line| {
        let mut fields = line.splitn(3, ':');
        let _hierarchy_id = fields.next()?;
        let controllers = fields.next()?;
        let path = fields.next()?;
        if controllers.split(',').any(|c| c == "memory") {
            Some(path)
        } else {
            None
        }
    })?;

    let root = std::path::Path::new("/sys/fs/cgroup/memory");
    // `rel` starts with '/'; strip it so `join` appends to `root` instead of
    // replacing it with an absolute path.
    let leaf = root.join(rel.trim_start_matches('/'));

    leaf.ancestors()
        // Never climb above the memory controller's mount point.
        .take_while(|dir| dir.starts_with(root))
        .filter_map(|dir| {
            let limit = read_u64(dir.join("memory.limit_in_bytes"))?;
            if limit > UNLIMITED_THRESHOLD {
                return None;
            }
            let used = read_u64(dir.join("memory.usage_in_bytes"))?;
            Some(limit.saturating_sub(used))
        })
        .min()
}
// ── raw helpers ─────────────────────────────────────────────────────────────── // ── raw helpers ───────────────────────────────────────────────────────────────
@@ -359,6 +421,8 @@ impl MemoryBudget {
} }
pub fn total(&self) -> u64 { self.total } pub fn total(&self) -> u64 { self.total }
/// Returns the `active` counter of the budget's shared state.
///
/// The value is read while holding the internal mutex. Panics if that
/// mutex has been poisoned.
pub fn active(&self) -> usize {
    let state = self.inner.lock().unwrap();
    state.active
}
/// Returns the `remaining` amount of the budget's shared state.
///
/// The value is read while holding the internal mutex. Panics if that
/// mutex has been poisoned.
pub fn remaining(&self) -> u64 {
    let state = self.inner.lock().unwrap();
    state.remaining
}
pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active } pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active }
} }