diff --git a/docmd/implementation/merge_parallelism.md b/docmd/implementation/merge_parallelism.md new file mode 100644 index 0000000..fddd40e --- /dev/null +++ b/docmd/implementation/merge_parallelism.md @@ -0,0 +1,207 @@ +# Merge parallelism and memory pressure + +## Problem observed + +Running `obikmer merge` over 109 indexes (108 sources + 1 bootstrap) on a 192-core machine +produces a fatal OOM during the `merge_partitions` stage: + +``` +memory allocation of 9126805520 bytes failed +``` + +A single allocation of ~8.5 GB fails. This is not an aggregate; it is one `malloc` call +from hashbrown during a HashMap resize. + +--- + +## Root cause + +### The merge pipeline per partition + +``` +source unitigs.bin + → iter_indexed_canonical_kmers() + → GraphDeBruijn::push() ← HashSet + 1 byte flags, all in RAM + → compute_degrees_and_mark_starts() + → try_for_each_unitig() + → unitigs.bin (new layer) + → Layer::build() → MPHF + evidence +``` + +`GraphDeBruijn` is a `FastHashMap` — a `HashSet` with +one flag byte per node. Neighbor lookup is implicit: 4 probes into the same map. +No edges are stored. The full kmer set of one partition must reside in RAM +simultaneously to compute degrees and mark unitig starts. + +The matrix builders that follow (pass 2) are mmapped files — they do **not** consume +significant RAM. The pressure is entirely in pass 1. + +### Unbounded Rayon parallelism + +With 192 cores, Rayon ran up to 192 partitions concurrently. Each partition built its +own `GraphDeBruijn` accumulating all kmers absent from the destination. Peak memory = +192 × peak_partition_hashset. + +### The 8.5 GB single allocation + +hashbrown allocates the entire backing array in one call when rehashing. +At load factor 7/8: `capacity × (sizeof(K,V) + 1 control byte)`. +For `(u64, AtomicU8)` with alignment: ~16 bytes per slot. + +``` +9 127 MB / 16 bytes ≈ 570 M slots → ~380 M new kmers in one partition +``` + +Plausible for the largest partition of 108 Salix/Betula sources (~450 Mbp each). + +--- + +## Partition size distribution + +`obikmer utils --partition-stats` measures the sum of `unitigs.bin` file sizes +per partition across all source indexes (pure `stat()` syscalls, negligible cost). + +Observed on a 9-genome pilot (256 partitions): + +| Stat | Value | +|---|---| +| min | 30.5 MB | +| max | 232.1 MB | +| mean | 40.1 MB | +| median | 37.2 MB | +| p95 | 47.1 MB | +| max/median ratio | 6.23× | + +The distribution is **bimodal with a heavy tail**: +- 238/256 partitions in a narrow 30–50 MB band +- 4 structurally extreme partitions (3–6× the median): 221, 233, 135, 191 + +These correspond to minimizers over-represented in repetitive regions shared across +all sources. They are extreme in every run on this dataset. + +With 109 sources, outlier partitions do not scale linearly: only kmers **absent from +the destination** enter the GraphDeBruijn, and inter-source overlap is high for closely +related species. Partition 221 is the likely trigger for the 8.5 GB crash. + +--- + +## Solution: LFD scheduling + memory budget semaphore + +### Principle + +Pre-sort partitions by **decreasing estimated size** (First Fit Decreasing — FFD), +then schedule them through a **continuous memory budget semaphore**. Each worker +acquires an estimated cost before starting and releases it on completion. + +Large partitions run first when the full budget is available; small partitions fill +the gaps. No hard outlier threshold is needed. + +### `MemoryBudget` (`obisys`) + +```rust +pub struct MemoryBudget { … } + +impl MemoryBudget { + pub fn new(total: u64) -> Self; + pub fn acquire(&self, cost: u64); // blocks until budget available + pub fn release(&self, cost: u64); + pub fn peak_active(&self) -> usize; +} +``` + +Non-deadlock guarantee: when `active == 0`, acquire always succeeds regardless of cost. +Without this, a partition whose estimated cost exceeds the total budget would block forever. + +### Adaptive expansion factor + +The expansion factor converts raw `unitigs.bin` bytes into an estimated GraphDeBruijn +RAM footprint. hashbrown stores each kmer as `(u64, AtomicU8)` ≈ 16 bytes/kmer at 7/8 +load factor; unitig files encode ≈ 2 bits/base. The ratio depends on average unitig +length (short unitigs: ~2×; long unitigs: up to ~50×). + +**Phase 1 — sequential pilot (worst partition)** + +The largest partition runs alone first. Its actual `g.len()` seeds the expansion factor +before any parallel job starts. `FALLBACK_EXPANSION = 4×` is used only for empty partitions. + +```rust +let worst_g_len = dst_partition.merge_partition(worst_id, …)?; +// ↑ now returns SKResult (was SKResult<()>) + +let seed_expansion = worst_g_len as u64 * 16 * 1000 / worst_bytes; +let max_expansion = AtomicU64::new(seed_expansion); +``` + +**Phase 2 — parallel with adaptive updates** + +```rust +order[1..].into_par_iter().for_each(|&i| { + let cost = partition_sizes[i] * max_expansion.load(Relaxed) / 1000; + budget.acquire(cost); + let g_len = dst_partition.merge_partition(i, …)?; + budget.release(cost); // releases estimated cost, not actual + + let actual = g_len as u64 * 16 * 1000 / partition_sizes[i]; + max_expansion.fetch_max(actual, Relaxed); // always pessimistic (max) +}); +``` + +`budget.release(cost)` uses the estimated cost, not the actual one. The budget tracks +reservations, not physical RAM; each partition pays what it promised at acquisition. + +**On the safety margin** + +There is no separate multiplier `k`. It is redundant with `budget_fraction`: both +reduce effective concurrency by the same amount. A single parameter is easier to +calibrate. `budget_fraction = 0.5` (default) reserves half of available RAM for the +OS, MPHF build, pass 2, and estimation error. + +`--budget-fraction` is exposed as a CLI flag — the only escape hatch for pathological +cases (extreme repetitive content, unusually long unitigs) that still cause OOM. + +### RAM source + +`obisys::available_memory_bytes()` — wraps `sysinfo::System::available_memory()`, +falls back to `total / 2` on macOS when the memory compressor returns 0. + +--- + +## Diagnostic report + +After the parallel phase, `merge_partition` emits a structured report via `tracing::info!`: + +``` +─── merge_partitions memory report ─── + available RAM : 512.0 GB budget 50% = 256.0 GB + expansion factor — seed: 4.2× final max: 6.1× (mean: 1.8× median: 1.6×) + peak concurrent workers: 42 + expansion factor distribution (256 partitions with data): + 0.50× – 1.25× │██████████████████████████████ 148 + 1.25× – 2.00× │████████████████████████ 82 + … + 5.50× – 6.25× │█ 2 + top partitions by actual expansion factor: + partition 221 : 6.10× (232.1 MB unitigs → 48M kmers, reserved at 4.20×) + partition 135 : 5.82× (127.3 MB unitigs → 24M kmers, reserved at 4.20×) + … +────────────────────────────────────── +``` + +Fields useful for diagnosis: + +| Field | Interpretation | +|---|---| +| `seed` vs `final max` expansion | gap indicates partitions with higher expansion than the worst-by-size | +| `reserved at X×` | the factor used at acquisition; if much lower than actual, the budget was under-reserved for that partition | +| `peak concurrent workers` | effective parallelism achieved under the budget constraint | +| `mean` / `median` expansion | typical dataset characteristic; stable across runs on the same data | + +--- + +## Parameters + +| Parameter | Default | CLI flag | Notes | +|---|---|---|---| +| `fallback_expansion` | 4× | — | seed for empty partitions only | +| `budget_fraction` | 0.5 | `--budget-fraction` | reduce if OOM persists | +| RAM source | `obisys::available_memory_bytes()` | — | falls back to `total/2` on macOS | diff --git a/mkdocs.yml b/mkdocs.yml index f37c40b..6e60689 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -49,6 +49,7 @@ nav: - PersistentCompactIntVec: implementation/persistent_compact_int_vec.md - PersistentBitVec: implementation/persistent_bit_vec.md - Merge command: implementation/merge.md + - Merge parallelism & memory: implementation/merge_parallelism.md - Kmer filtering: implementation/filtering.md - Select command: implementation/select.md - Architecture: diff --git a/src/obicompactvec/src/intmatrix.rs b/src/obicompactvec/src/intmatrix.rs index 37547c6..8db78da 100644 --- a/src/obicompactvec/src/intmatrix.rs +++ b/src/obicompactvec/src/intmatrix.rs @@ -54,6 +54,14 @@ impl ColumnarCompactIntMatrix { Array1::from_vec(sums) } + pub(crate) fn count_nonzero(&self) -> Array1 { + let counts: Vec = (0..self.n_cols()) + .into_par_iter() + .map(|c| self.col(c).count_nonzero()) + .collect(); + Array1::from_vec(counts) + } + pub(crate) fn partial_bray_dist_matrix(&self) -> Array2 { self.pairwise_u64(|i, j| self.col(i).partial_bray_dist(self.col(j))) } @@ -234,6 +242,14 @@ impl PackedCompactIntMatrix { ) } + pub(crate) fn count_nonzero(&self) -> Array1 { + Array1::from_vec( + (0..self.n_cols).into_par_iter() + .map(|c| (0..self.n_rows).filter(|&s| self.get(c, s) > 0).count() as u64) + .collect() + ) + } + // ── Pair primitives ─────────────────────────────────────────────────────── fn pair_partial_bray(&self, i: usize, j: usize) -> u64 { @@ -421,6 +437,10 @@ impl PersistentCompactIntMatrix { match self { Self::Columnar(m) => m.sum(), Self::Packed(m) => m.sum() } } + pub fn count_nonzero(&self) -> Array1 { + match self { Self::Columnar(m) => m.count_nonzero(), Self::Packed(m) => m.count_nonzero() } + } + pub fn partial_bray_dist_matrix(&self) -> Array2 { match self { Self::Columnar(m) => m.partial_bray_dist_matrix(), Self::Packed(m) => m.partial_bray_dist_matrix() } } @@ -451,6 +471,7 @@ use crate::traits::{ColumnWeights, CountPartials}; impl ColumnWeights for PersistentCompactIntMatrix { fn col_weights(&self) -> Array1 { self.sum() } + fn partial_kmer_counts(&self) -> Array1 { self.count_nonzero() } } impl CountPartials for PersistentCompactIntMatrix { diff --git a/src/obicompactvec/src/reader.rs b/src/obicompactvec/src/reader.rs index 0f4ce25..057ce29 100644 --- a/src/obicompactvec/src/reader.rs +++ b/src/obicompactvec/src/reader.rs @@ -133,11 +133,15 @@ impl PersistentCompactIntVec { } #[inline] - /// Returns the sum of all values in the compact int vector. pub fn sum(&self) -> u64 { self.iter().map(|v| v as u64).sum() } + #[inline] + pub fn count_nonzero(&self) -> u64 { + self.iter().filter(|&v| v > 0).count() as u64 + } + #[inline] /// Returns the Bray-Curtis distance between two compact int vectors. pub fn bray_dist(&self, other: &PersistentCompactIntVec) -> f64 { diff --git a/src/obicompactvec/src/traits.rs b/src/obicompactvec/src/traits.rs index f65ad9d..b61e69b 100644 --- a/src/obicompactvec/src/traits.rs +++ b/src/obicompactvec/src/traits.rs @@ -2,8 +2,16 @@ use ndarray::{Array1, Array2}; /// Column-level weight statistic — total count or presence count per column. /// Additive across layers and partitions; used as denominator in normalised distances. +/// +/// `partial_kmer_counts` returns the number of **distinct k-mers** present per +/// column (presence = 1 entries; count > 0 entries). For presence matrices this +/// equals `col_weights`; for count matrices it differs (count_nonzero vs sum). pub trait ColumnWeights: Send + Sync { fn col_weights(&self) -> Array1; + + fn partial_kmer_counts(&self) -> Array1 { + self.col_weights() + } } /// Partial distance matrices for count-based data (`PersistentCompactIntMatrix`). diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index d2e95da..108098b 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -2,7 +2,10 @@ use std::collections::HashMap; use std::fs; use std::io; use std::path::Path; -use obisys::{Reporter, Stage, progress_bar, spinner}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; + +use obisys::{MemoryBudget, Reporter, Stage, available_memory_bytes, progress_bar, spinner}; use rayon::prelude::*; use tracing::info; @@ -15,23 +18,26 @@ use crate::state::IndexState; pub use obikpartitionner::MergeMode; +// ── per-partition diagnostic record ────────────────────────────────────────── + +#[derive(Debug)] +struct PartStat { + id: usize, + unitig_bytes: u64, // sum of unitigs.bin across remaining sources + g_len: usize, // actual new kmers inserted into GraphDeBruijn + exp_at_acquire: f64, // expansion factor used to size the budget reservation +} + +// ── main merge entry point ──────────────────────────────────────────────────── + impl KmerIndex { - /// Merge `sources` into a new index at `output`. - /// - /// All sources must be in `Indexed` state and share the same `kmer_size`, - /// `minimizer_size`, and `n_partitions`. Count mode additionally requires - /// every source to have `with_counts = true`. - /// - /// Genome labels must be unique across all sources. If `rename_duplicates` - /// is true, repeated labels are disambiguated by appending `.1`, `.2`, … - /// to the second and subsequent occurrences. Otherwise a - /// `DuplicateGenomeLabel` error is returned on the first conflict. pub fn merge>( output: P, sources: &[&KmerIndex], mode: MergeMode, force: bool, rename_duplicates: bool, + budget_fraction: f64, rep: &mut Reporter, ) -> OKIResult { let output = output.as_ref(); @@ -98,7 +104,7 @@ impl KmerIndex { let sources: &[&KmerIndex] = &ordered; let evidence = sources[0].meta.config.evidence.clone(); - // ── Compute final genome labels (rename duplicates if requested) ─────── + // ── Compute final genome labels ──────────────────────────────────────── let (source_labels, all_genomes) = compute_labels(sources, rename_duplicates)?; // ── Prepare output directory ────────────────────────────────────────── @@ -125,23 +131,19 @@ impl KmerIndex { pb.set_message("copying index …"); copy_dir_all(&sources[0].root_path, output)?; - // Rewrite index.meta with final genome labels and the effective mode. let mut meta = IndexMeta::read(output).map_err(OKIError::Io)?; meta.genomes = all_genomes; meta.config.with_counts = mode == MergeMode::Count; meta.config.evidence = evidence.clone(); meta.write(output)?; - // In presence/absence mode, purge counts/ directories inherited from - // source_0 — they are stale data from the source's count index. if mode == MergeMode::Presence { remove_dirs_named(output, "counts")?; } pb.finish_and_clear(); rep.push(t.stop()); - // Rebuild spectrums/ from all sources using the (possibly renamed) labels. - // Drop the spectrums/ that were copied from source_0 and rebuild from scratch. + // ── Rebuild spectrums ───────────────────────────────────────────────── info!("rebuilding spectrums for {} source(s)", sources.len()); let t = Stage::start("spectrums"); let pb = spinner("spectrums"); @@ -157,12 +159,12 @@ impl KmerIndex { pb.finish_and_clear(); rep.push(t.stop()); - // Open the destination index. + // ── Open destination ────────────────────────────────────────────────── let dst = KmerIndex::open(output)?; let n_partitions = dst.n_partitions(); let n_dst_genomes = sources[0].meta.genomes.len(); - // ── Merge each subsequent source partition-by-partition ─────────────── + // ── Merge partitions ────────────────────────────────────────────────── let remaining_sources: Vec<&KmerIndex> = sources[1..].to_vec(); if !remaining_sources.is_empty() { let n_src_genomes: usize = remaining_sources.iter().map(|s| s.meta.genomes.len()).sum(); @@ -176,22 +178,118 @@ impl KmerIndex { let dst_partition = &dst.partition; let block_bits = dst.meta.config.block_bits; - let errors: Vec = (0..n_partitions) - .into_par_iter() - .filter_map(|i| { - let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> = - remaining_sources.iter().map(|s| (&s.partition, s.meta.genomes.len())).collect(); - let result = dst_partition.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence).err(); + // Pre-build source list once (avoid rebuilding per partition) + let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> = remaining_sources + .iter() + .map(|s| (&s.partition, s.meta.genomes.len())) + .collect(); + + // Per-partition unitig byte sizes across remaining sources (stat() only) + let partition_sizes: Vec = (0..n_partitions) + .map(|i| remaining_sources.iter() + .map(|s| partition_unitig_bytes(s, i)) + .sum()) + .collect(); + + // LFD sort: largest partition first + let mut order: Vec = (0..n_partitions).collect(); + order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i])); + + // ── Sequential pilot: worst partition → seed expansion factor ───── + const FALLBACK_EXPANSION: u64 = 4_000; // 4× in fixed-point ×1000 + let worst_id = order[0]; + let worst_bytes = partition_sizes[worst_id]; + + let worst_g_len = dst_partition + .merge_partition(worst_id, &srcs, mode, n_dst_genomes, block_bits, &evidence) + .map_err(OKIError::Partition)?; + pb.inc(1); + + let seed_expansion = if worst_bytes > 0 { + worst_g_len as u64 * 16 * 1000 / worst_bytes + } else { + FALLBACK_EXPANSION + }; + + info!( + "merge_partitions: pilot partition {} — {} unitig bytes → {} new kmers, \ + expansion {:.2}×", + worst_id, worst_bytes, worst_g_len, + seed_expansion as f64 / 1000.0, + ); + + let part_stats: Arc>> = Arc::new(Mutex::new({ + let mut v = Vec::with_capacity(n_partitions); + v.push(PartStat { + id: worst_id, + unitig_bytes: worst_bytes, + g_len: worst_g_len, + exp_at_acquire: seed_expansion as f64 / 1000.0, + }); + v + })); + + let max_expansion = AtomicU64::new(seed_expansion); + + // ── Parallel remainder under memory budget ──────────────────────── + let available = available_memory_bytes(); + let budget_bytes = (available as f64 * budget_fraction) as u64; + let budget = Arc::new(MemoryBudget::new(budget_bytes)); + + info!( + "merge_partitions: available RAM {}, budget {:.0}% = {}", + fmt_bytes(available), + budget_fraction * 100.0, + fmt_bytes(budget_bytes), + ); + + let errors: Vec = order[1..].into_par_iter() + .filter_map(|&i| { + let ubytes = partition_sizes[i]; + let exp = max_expansion.load(Ordering::Relaxed); + let cost = ubytes * exp / 1000; + + budget.acquire(cost); + let result = dst_partition + .merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence); + budget.release(cost); pb.inc(1); - result + + match result { + Ok(g_len) => { + if ubytes > 0 { + let actual = g_len as u64 * 16 * 1000 / ubytes; + max_expansion.fetch_max(actual, Ordering::Relaxed); + } + part_stats.lock().unwrap().push(PartStat { + id: i, + unitig_bytes: ubytes, + g_len, + exp_at_acquire: exp as f64 / 1000.0, + }); + None + } + Err(e) => Some(OKIError::Partition(e)), + } }) .collect(); pb.finish_and_clear(); if let Some(e) = errors.into_iter().next() { - return Err(OKIError::Partition(e)); + return Err(e); } + // ── Diagnostic report ───────────────────────────────────────────── + let stats = Arc::try_unwrap(part_stats).unwrap().into_inner().unwrap(); + print_merge_partition_report( + &stats, + available, + budget_fraction, + seed_expansion as f64 / 1000.0, + max_expansion.load(Ordering::Relaxed) as f64 / 1000.0, + budget.peak_active(), + ); + rep.push(t.stop()); } @@ -206,19 +304,110 @@ impl KmerIndex { rep.push(t.stop()); } - // Re-open to get the updated state. KmerIndex::open(output) } } -// ── Helpers ─────────────────────────────────────────────────────────────────── +// ── Diagnostic report ───────────────────────────────────────────────────────── + +fn print_merge_partition_report( + stats: &[PartStat], + available_ram: u64, + budget_fraction: f64, + seed_expansion: f64, + final_expansion: f64, + peak_active: usize, +) { + // Compute actual expansion per partition (skip empty partitions) + let expansions: Vec<(usize, f64)> = stats + .iter() + .filter(|s| s.unitig_bytes > 0) + .map(|s| (s.id, s.g_len as f64 * 16.0 / s.unitig_bytes as f64)) + .collect(); + + if expansions.is_empty() { + info!("merge_partitions report: no data (all partitions empty)"); + return; + } + + let mut sorted_exp: Vec = expansions.iter().map(|(_, e)| *e).collect(); + sorted_exp.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let n = sorted_exp.len(); + let mean_exp = sorted_exp.iter().sum::() / n as f64; + let median_exp = sorted_exp[n / 2]; + let max_exp = sorted_exp[n - 1]; + + info!("─── merge_partitions memory report ───"); + info!( + " available RAM : {} budget {:.0}% = {}", + fmt_bytes(available_ram), + budget_fraction * 100.0, + fmt_bytes((available_ram as f64 * budget_fraction) as u64), + ); + info!( + " expansion factor — seed: {:.2}× final max: {:.2}× \ + (mean: {:.2}× median: {:.2}× observed max: {:.2}×)", + seed_expansion, final_expansion, mean_exp, median_exp, max_exp, + ); + info!(" peak concurrent workers: {}", peak_active); + + // Histogram of actual expansion factors + let min_e = sorted_exp[0]; + let max_e = sorted_exp[n - 1]; + let n_buckets = 8usize; + let bucket_w = (max_e - min_e).max(0.01) / n_buckets as f64; + let mut counts = vec![0usize; n_buckets]; + for &e in &sorted_exp { + let b = (((e - min_e) / bucket_w) as usize).min(n_buckets - 1); + counts[b] += 1; + } + let max_count = *counts.iter().max().unwrap(); + info!(" expansion factor distribution ({} partitions with data):", n); + for (i, &c) in counts.iter().enumerate() { + let lo = min_e + i as f64 * bucket_w; + let hi = min_e + (i + 1) as f64 * bucket_w; + let bar = "█".repeat(if max_count > 0 { c * 30 / max_count } else { 0 }); + info!(" {:5.2}× – {:5.2}× │{:<30} {}", lo, hi, bar, c); + } + + // Top 8 by actual expansion + let mut by_exp: Vec<(usize, f64)> = expansions.clone(); + by_exp.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap()); + info!(" top partitions by actual expansion factor:"); + for (id, exp) in by_exp.iter().take(8) { + let s = stats.iter().find(|s| s.id == *id).unwrap(); + info!( + " partition {:4} : {:.2}× ({} unitigs → {}M kmers, \ + reserved at {:.2}×)", + id, exp, + fmt_bytes(s.unitig_bytes), + s.g_len / 1_000_000, + s.exp_at_acquire, + ); + } + info!("──────────────────────────────────────"); +} + +// ── helpers ─────────────────────────────────────────────────────────────────── + +fn fmt_bytes(b: u64) -> String { + if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) } + else if b >= 1 << 20 { format!("{:.1} MB", b as f64 / (1u64 << 20) as f64) } + else if b >= 1 << 10 { format!("{:.1} KB", b as f64 / (1u64 << 10) as f64) } + else { format!("{b} B") } +} + +/// Sum of all unitigs.bin sizes across all layers of partition `i` in `src`. +fn partition_unitig_bytes(src: &KmerIndex, i: usize) -> u64 { + let mut total = 0u64; + for l in 0.. { + let p = src.layer_unitigs_path(i, l); + if !p.exists() { break; } + if let Ok(m) = std::fs::metadata(&p) { total += m.len(); } + } + total +} -/// Compute the final genome label lists for all sources. -/// -/// Returns `(per_source_labels, all_genomes_flat)`. -/// The first occurrence of a label keeps the original name. Subsequent -/// occurrences receive `.1`, `.2`, … suffixes when `rename_duplicates` is true, -/// or trigger a `DuplicateGenomeLabel` error otherwise. fn compute_labels( sources: &[&KmerIndex], rename_duplicates: bool, @@ -249,8 +438,6 @@ fn compute_labels( Ok((source_labels, all_genomes)) } -/// Copy spectrum JSON files from `src_root/spectrums/` to `dst_root/spectrums/`, -/// mapping each `old_labels[i]` filename to `new_labels[i]`. fn copy_spectrums( src_root: &Path, dst_root: &Path, @@ -269,7 +456,6 @@ fn copy_spectrums( Ok(()) } -/// Recursively remove every directory named `name` under `root`. fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> { for entry in fs::read_dir(root)? { let entry = entry?; @@ -285,7 +471,6 @@ fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> { Ok(()) } - fn format_evidence(ev: &IndexMode) -> String { match ev { IndexMode::Exact => "exact".to_string(), @@ -294,37 +479,15 @@ fn format_evidence(ev: &IndexMode) -> String { } } -/// A source is "trivial" if its presence/count values carry no approximation: -/// single-genome presence index (SetMembership — all values are 1 by construction). fn is_trivial(src: &KmerIndex, mode: MergeMode) -> bool { src.meta.genomes.len() == 1 && mode == MergeMode::Presence } -/// Sum of all `unitigs.bin` sizes across every partition and layer. -/// Used as a proxy for the number of indexed smers. fn index_unitig_size(src: &KmerIndex) -> u64 { let n = src.partition.n_partitions(); - let mut total = 0u64; - for i in 0..n { - let index_dir = src.partition.part_dir(i).join("index"); - let mut l = 0usize; - loop { - let p = index_dir.join(format!("layer_{l}")).join("unitigs.bin"); - if !p.exists() { break; } - if let Ok(m) = std::fs::metadata(&p) { total += m.len(); } - l += 1; - } - } - total + (0..n).map(|i| partition_unitig_bytes(src, i)).sum() } -/// Choose the index to use as bootstrap base. -/// -/// Rule — mieux-disant: if any non-trivial source uses approximate evidence -/// (Approx or Hybrid), the output must also be approximate; the base must -/// therefore come from an approximate source so its layers carry the right -/// evidence files. Among qualifying candidates, the largest (by unitig size) -/// is chosen to minimise the number of new smers in the merge layer. fn choose_base(sources: &[&KmerIndex], mode: MergeMode) -> usize { let needs_approx = sources.iter().any(|src| { !is_trivial(src, mode) diff --git a/src/obikindex/src/stats.rs b/src/obikindex/src/stats.rs index e3dffc9..434af17 100644 --- a/src/obikindex/src/stats.rs +++ b/src/obikindex/src/stats.rs @@ -1,7 +1,8 @@ use std::fs; use std::path::Path; -use obicompactvec::LayerMeta; +use obicompactvec::{LayerMeta, PersistentBitMatrix, PersistentCompactIntMatrix}; +use obicompactvec::traits::ColumnWeights; use obilayeredmap::meta::PartitionMeta; use rayon::prelude::*; @@ -124,4 +125,68 @@ impl KmerIndex { total: bpk(mphf_b + evidence_b + matrix_b), }) } + + /// Return `(total_distinct_kmers, per_genome_kmer_counts)`. + /// + /// For each genome, the count is the number of distinct k-mers for which + /// that genome has a non-zero value (presence = 1, count > 0). + /// Partitions are scanned in parallel; results are summed across partitions. + pub fn genome_kmer_counts(&self) -> OKIResult<(usize, Vec)> { + let n = self.n_partitions(); + let n_genomes = self.meta.genomes.len(); + + let partials: Vec<(usize, Vec)> = (0..n) + .into_par_iter() + .map(|i| { + let mut counts = vec![0u64; n_genomes]; + let mut n_kmers = 0usize; + + let index_dir = self.partition.part_dir(i).join("index"); + if !index_dir.exists() { return (0, counts); } + + let n_layers = PartitionMeta::load(&index_dir) + .map(|m| m.n_layers) + .unwrap_or(0); + + for l in 0..n_layers { + let layer_dir = index_dir.join(format!("layer_{l}")); + if !layer_dir.exists() { continue; } + + n_kmers += LayerMeta::load(&layer_dir).map(|m| m.n).unwrap_or(0); + + let mat: Box = + if layer_dir.join("counts").exists() + && !layer_dir.join("presence").exists() + { + match PersistentCompactIntMatrix::open(&layer_dir) { + Ok(m) => Box::new(m), + Err(_) => continue, + } + } else { + match PersistentBitMatrix::open(&layer_dir) { + Ok(m) => Box::new(m), + Err(_) => continue, + } + }; + let col_counts = mat.partial_kmer_counts(); + + for (c, &v) in col_counts.iter().enumerate() { + if c < n_genomes { counts[c] += v; } + } + } + + (n_kmers, counts) + }) + .collect(); + + let total_kmers: usize = partials.iter().map(|(n, _)| n).sum(); + let mut total_counts = vec![0u64; n_genomes]; + for (_, counts) in partials { + for (i, v) in counts.into_iter().enumerate() { + total_counts[i] += v; + } + } + + Ok((total_kmers, total_counts)) + } } diff --git a/src/obikmer/src/cmd/merge.rs b/src/obikmer/src/cmd/merge.rs index 9fe0ee4..f0ce2dc 100644 --- a/src/obikmer/src/cmd/merge.rs +++ b/src/obikmer/src/cmd/merge.rs @@ -26,6 +26,11 @@ pub struct MergeArgs { /// Disambiguate duplicate genome labels by appending .1, .2, … instead of erroring #[arg(long, default_value_t = false)] pub rename_duplicates: bool, + + /// Fraction of available RAM reserved as memory budget for parallel partition merging. + /// Reduce if OOM occurs despite the adaptive scheduler (e.g. --budget-fraction 0.3). + #[arg(long, default_value_t = 0.5)] + pub budget_fraction: f64, } pub fn run(args: MergeArgs) { @@ -60,7 +65,7 @@ pub fn run(args: MergeArgs) { ); let mut rep = Reporter::new(); - KmerIndex::merge(&args.output, &source_refs, mode, args.force, args.rename_duplicates, &mut rep).unwrap_or_else(|e| { + KmerIndex::merge(&args.output, &source_refs, mode, args.force, args.rename_duplicates, args.budget_fraction, &mut rep).unwrap_or_else(|e| { eprintln!("error merging: {e}"); std::process::exit(1); }); diff --git a/src/obikmer/src/cmd/utils.rs b/src/obikmer/src/cmd/utils.rs index 5aa564a..f73c308 100644 --- a/src/obikmer/src/cmd/utils.rs +++ b/src/obikmer/src/cmd/utils.rs @@ -1,3 +1,4 @@ +use std::io::{self, Write}; use std::path::PathBuf; use clap::Args; @@ -6,20 +7,33 @@ use tracing::info; #[derive(Args)] pub struct UtilsArgs { - /// Index directory to operate on - pub index: PathBuf, + /// Index directories to operate on (one or more) + #[arg(required = true, num_args = 1..)] + pub indexes: Vec, - /// Set a new genome label: NEW_LABEL=OLD_LABEL + /// Set a new genome label: NEW_LABEL=OLD_LABEL (single-index only) #[arg(long, value_name = "NEW=OLD")] pub new_label: Option, - /// Add missing layer_meta.json files to each layer (required after upgrading from old indexes) + /// Add missing layer_meta.json files to each layer (single-index only) #[arg(long)] pub upgrade_index: bool, - /// Print bits-per-kmer statistics (MPHF, evidence, matrix, total) + /// Print bits-per-kmer statistics (single-index only) #[arg(long)] pub bits_per_kmer: bool, + + /// Print per-genome k-mer counts as CSV (single-index only) + #[arg(long)] + pub stats: bool, + + /// Print partition size distribution report (accepts multiple indexes) + #[arg(long)] + pub partition_stats: bool, + + /// Write per-(partition, source) raw data as CSV to FILE (used with --partition-stats) + #[arg(long, value_name = "FILE")] + pub csv: Option, } pub fn run(args: UtilsArgs) { @@ -27,25 +41,266 @@ pub fn run(args: UtilsArgs) { if let Some(spec) = &args.new_label { any = true; - run_rename(&args.index, spec); + run_rename(single_index(&args), spec); } if args.upgrade_index { any = true; - run_upgrade_index(&args.index); + run_upgrade_index(single_index(&args)); } if args.bits_per_kmer { any = true; - run_bits_per_kmer(&args.index); + run_bits_per_kmer(single_index(&args)); + } + + if args.stats { + any = true; + run_stats(single_index(&args)); + } + + if args.partition_stats { + any = true; + run_partition_stats(&args.indexes, args.csv.as_deref()); } if !any { - eprintln!("utils: no operation specified. Available options: --new-label NEW=OLD, --upgrade-index, --bits-per-kmer"); + eprintln!( + "utils: no operation specified. \ + Available: --new-label, --upgrade-index, --bits-per-kmer, --stats, --partition-stats" + ); std::process::exit(1); } } +// ── helpers ─────────────────────────────────────────────────────────────────── + +fn single_index(args: &UtilsArgs) -> &PathBuf { + if args.indexes.len() > 1 { + eprintln!("utils: this option requires exactly one index (got {})", args.indexes.len()); + std::process::exit(1); + } + &args.indexes[0] +} + +// ── --partition-stats ───────────────────────────────────────────────────────── + +/// Per-partition, per-source byte count of all unitigs.bin files summed across layers. +struct PartRow { + partition: usize, + source: String, + bytes: u64, +} + +fn collect_rows(indexes: &[PathBuf]) -> Vec { + let mut rows = Vec::new(); + for path in indexes { + let idx = KmerIndex::open(path).unwrap_or_else(|e| { + eprintln!("error opening index {}: {e}", path.display()); + std::process::exit(1); + }); + let name = path + .file_name() + .map(|n| n.to_string_lossy().into_owned()) + .unwrap_or_else(|| path.display().to_string()); + let n_parts = idx.n_partitions(); + for i in 0..n_parts { + let mut bytes = 0u64; + for l in 0.. { + let p = idx.layer_unitigs_path(i, l); + if !p.exists() { + break; + } + if let Ok(m) = std::fs::metadata(&p) { + bytes += m.len(); + } + } + rows.push(PartRow { partition: i, source: name.clone(), bytes }); + } + } + rows +} + +/// Sum bytes per partition across all sources. +fn partition_totals(rows: &[PartRow], n_parts: usize) -> Vec { + let mut totals = vec![0u64; n_parts]; + for r in rows { + totals[r.partition] += r.bytes; + } + totals +} + +fn stats_summary(totals: &[u64]) -> (u64, u64, f64, f64, u64, u64, u64) { + let mut sorted = totals.to_vec(); + sorted.sort_unstable(); + let n = sorted.len(); + let min = sorted[0]; + let max = sorted[n - 1]; + let mean = sorted.iter().sum::() as f64 / n as f64; + let median = if n % 2 == 0 { + (sorted[n / 2 - 1] + sorted[n / 2]) as f64 / 2.0 + } else { + sorted[n / 2] as f64 + }; + let p95 = sorted[(n as f64 * 0.95) as usize]; + let p99 = sorted[(n as f64 * 0.99) as usize]; + let variance = sorted + .iter() + .map(|&v| (v as f64 - mean).powi(2)) + .sum::() + / n as f64; + let std_dev = variance.sqrt(); + (min, max, mean, median, p95, p99, std_dev as u64) +} + +fn human_bytes(b: u64) -> String { + if b >= 1 << 30 { + format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) + } else if b >= 1 << 20 { + format!("{:.1} MB", b as f64 / (1u64 << 20) as f64) + } else if b >= 1 << 10 { + format!("{:.1} KB", b as f64 / (1u64 << 10) as f64) + } else { + format!("{b} B") + } +} + +fn ascii_histogram(totals: &[u64], n_buckets: usize, bar_width: usize) -> String { + let min = *totals.iter().min().unwrap(); + let max = *totals.iter().max().unwrap(); + if min == max { + return format!(" (all partitions identical: {})\n", human_bytes(min)); + } + + let bucket_size = (max - min).max(1) as f64 / n_buckets as f64; + let mut counts = vec![0usize; n_buckets]; + for &v in totals { + let b = (((v - min) as f64 / bucket_size) as usize).min(n_buckets - 1); + counts[b] += 1; + } + + let max_count = *counts.iter().max().unwrap(); + let mut out = String::new(); + for (i, &c) in counts.iter().enumerate() { + let lo = min + (i as f64 * bucket_size) as u64; + let hi = min + ((i + 1) as f64 * bucket_size) as u64; + let bar_len = if max_count > 0 { c * bar_width / max_count } else { 0 }; + let bar = "█".repeat(bar_len); + out.push_str(&format!( + " {:>8} – {:>8} │{:) { + let rows = collect_rows(indexes); + if rows.is_empty() { + eprintln!("partition-stats: no data found"); + std::process::exit(1); + } + + let n_parts = rows.iter().map(|r| r.partition).max().unwrap() + 1; + let totals = partition_totals(&rows, n_parts); + let (min, max, mean, median, p95, p99, std_dev) = stats_summary(&totals); + + // outliers: > median + 1.5 × IQR (approximate via > 1.5 × median as fallback) + let mut sorted_t = totals.clone(); + sorted_t.sort_unstable(); + let q1 = sorted_t[n_parts / 4] as f64; + let q3 = sorted_t[3 * n_parts / 4] as f64; + let iqr = q3 - q1; + let outlier_threshold = q3 + 1.5 * iqr; + + let mut out = String::new(); + out.push_str("# Partition size report\n\n"); + out.push_str(&format!( + "Sources: {} \nPartitions: {} \n\n", + indexes.len(), + n_parts + )); + + out.push_str("## Summary statistics (total unitigs.bin bytes per partition, sum across sources)\n\n"); + out.push_str("| Stat | Value |\n|---|---|\n"); + out.push_str(&format!("| min | {} |\n", human_bytes(min))); + out.push_str(&format!("| max | {} |\n", human_bytes(max))); + out.push_str(&format!("| mean | {} |\n", human_bytes(mean as u64))); + out.push_str(&format!("| median | {} |\n", human_bytes(median as u64))); + out.push_str(&format!("| p95 | {} |\n", human_bytes(p95))); + out.push_str(&format!("| p99 | {} |\n", human_bytes(p99))); + out.push_str(&format!("| std | {} |\n", human_bytes(std_dev))); + out.push_str(&format!("| max/median ratio | {:.2}× |\n\n", max as f64 / median)); + + out.push_str("## Histogram\n\n```\n"); + out.push_str(&ascii_histogram(&totals, 30, 40)); + out.push_str("```\n\n"); + + let outliers: Vec<(usize, u64)> = totals + .iter() + .enumerate() + .filter(|(_, v)| **v as f64 > outlier_threshold) + .map(|(i, v)| (i, *v)) + .collect(); + + if outliers.is_empty() { + out.push_str("## Outliers\n\nNone (threshold: Q3 + 1.5×IQR = "); + out.push_str(&human_bytes(outlier_threshold as u64)); + out.push_str(").\n"); + } else { + out.push_str(&format!( + "## Outliers (> Q3 + 1.5×IQR = {})\n\n| Partition | Total size | Ratio to median |\n|---|---|---|\n", + human_bytes(outlier_threshold as u64) + )); + for (i, v) in &outliers { + out.push_str(&format!( + "| {} | {} | {:.2}× |\n", + i, + human_bytes(*v), + *v as f64 / median + )); + } + out.push('\n'); + } + + print!("{out}"); + + if let Some(csv_out) = csv_path { + let file = std::fs::File::create(csv_out).unwrap_or_else(|e| { + eprintln!("error creating CSV file {}: {e}", csv_out.display()); + std::process::exit(1); + }); + let mut w = io::BufWriter::new(file); + writeln!(w, "partition,source,bytes").unwrap(); + for r in &rows { + writeln!(w, "{},{},{}", r.partition, r.source, r.bytes).unwrap(); + } + eprintln!("CSV written to {}", csv_out.display()); + } +} + +// ── existing single-index operations ───────────────────────────────────────── + +fn run_stats(index_path: &PathBuf) { + let idx = KmerIndex::open(index_path).unwrap_or_else(|e| { + eprintln!("error opening index: {e}"); + std::process::exit(1); + }); + let (total, per_genome) = idx.genome_kmer_counts().unwrap_or_else(|e| { + eprintln!("error computing stats: {e}"); + std::process::exit(1); + }); + println!("genome,n_kmers"); + for (g, &n) in idx.meta().genomes.iter().zip(per_genome.iter()) { + println!("{},{}", g.label, n); + } + println!("total,{total}"); +} + fn run_bits_per_kmer(index_path: &PathBuf) { let idx = KmerIndex::open(index_path).unwrap_or_else(|e| { eprintln!("error opening index: {e}"); @@ -59,8 +314,10 @@ fn run_bits_per_kmer(index_path: &PathBuf) { println!("genomes : {}", stats.n_genomes); println!("mphf : {:6.2} bits/kmer", stats.mphf); println!("evidence : {:6.2} bits/kmer", stats.evidence); - println!("matrix : {:6.2} bits/kmer ({:.2} bits/kmer/genome)", - stats.matrix, stats.matrix_per_genome); + println!( + "matrix : {:6.2} bits/kmer ({:.2} bits/kmer/genome)", + stats.matrix, stats.matrix_per_genome + ); println!("total : {:6.2} bits/kmer", stats.total); } @@ -99,7 +356,6 @@ fn run_rename(index_path: &PathBuf, spec: &str) { std::process::exit(1); }); - // Check the new label is not already taken. if idx.meta().genomes.iter().any(|g| g.label == new_label) { eprintln!("error: label '{new_label}' already exists in index"); std::process::exit(1); @@ -111,7 +367,6 @@ fn run_rename(index_path: &PathBuf, spec: &str) { std::process::exit(1); }); - // Rename the spectrum file if it exists. let spectrums_dir = index_path.join("spectrums"); let old_spectrum = spectrums_dir.join(format!("{old_label}.json")); let new_spectrum = spectrums_dir.join(format!("{new_label}.json")); diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 4d30bfa..6b55f5d 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -166,10 +166,10 @@ impl KmerPartition { n_dst_genomes: usize, block_bits: u8, evidence: &IndexMode, - ) -> SKResult<()> { + ) -> SKResult { let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); if !dst_index_dir.exists() { - return Ok(()); + return Ok(0); } load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open @@ -381,6 +381,6 @@ impl KmerPartition { part_meta.save(&dst_index_dir).map_err(olm_to_sk)?; } - Ok(()) + Ok(n_new) } } diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs index b57578f..2d800d2 100644 --- a/src/obisys/src/lib.rs +++ b/src/obisys/src/lib.rs @@ -1,5 +1,6 @@ use std::fmt; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Condvar, Mutex}; use std::time::{Duration, Instant}; use indicatif::{ProgressBar, ProgressStyle}; @@ -309,6 +310,60 @@ fn fmt_efficiency(par: f64, n_cores: usize) -> String { // ── Display ─────────────────────────────────────────────────────────────────── +// ── MemoryBudget ────────────────────────────────────────────────────────────── + +struct BudgetInner { + remaining: u64, + active: usize, + peak_active: usize, +} + +/// Counting semaphore that limits total concurrent estimated memory usage. +/// +/// Each worker acquires a cost (bytes) before starting and releases it on +/// completion. Non-deadlock guarantee: when no worker is active the next +/// acquire always succeeds regardless of cost vs. remaining budget. +pub struct MemoryBudget { + total: u64, + inner: Mutex, + condvar: Condvar, +} + +impl MemoryBudget { + pub fn new(total: u64) -> Self { + Self { + total, + inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }), + condvar: Condvar::new(), + } + } + + pub fn acquire(&self, cost: u64) { + let mut g = self.inner.lock().unwrap(); + loop { + if g.active == 0 || g.remaining >= cost { + g.remaining = g.remaining.saturating_sub(cost); + g.active += 1; + g.peak_active = g.peak_active.max(g.active); + return; + } + g = self.condvar.wait(g).unwrap(); + } + } + + pub fn release(&self, cost: u64) { + let mut g = self.inner.lock().unwrap(); + g.remaining = (g.remaining + cost).min(self.total); + g.active -= 1; + self.condvar.notify_all(); + } + + pub fn total(&self) -> u64 { self.total } + pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active } +} + +// ── Display ─────────────────────────────────────────────────────────────────── + impl fmt::Display for Reporter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if self.stages.is_empty() { return Ok(()); }