Merge pull request 'Push ylnwstyzqwrt' (#22) from push-ylnwstyzqwrt into main

Reviewed-on: #22
This commit was merged in pull request #22.
This commit is contained in:
2026-06-12 10:10:03 +00:00
11 changed files with 865 additions and 81 deletions
+207
View File
@@ -0,0 +1,207 @@
# Merge parallelism and memory pressure
## Problem observed
Running `obikmer merge` over 109 indexes (108 sources + 1 bootstrap) on a 192-core machine
produces a fatal OOM during the `merge_partitions` stage:
```
memory allocation of 9126805520 bytes failed
```
A single allocation of ~8.5 GB fails. This is not an aggregate; it is one `malloc` call
from hashbrown during a HashMap resize.
---
## Root cause
### The merge pipeline per partition
```
source unitigs.bin
→ iter_indexed_canonical_kmers()
→ GraphDeBruijn::push() ← HashSet<u64> + 1 byte flags, all in RAM
→ compute_degrees_and_mark_starts()
→ try_for_each_unitig()
→ unitigs.bin (new layer)
→ Layer::build() → MPHF + evidence
```
`GraphDeBruijn` is a `FastHashMap<CanonicalKmer, AtomicU8>` — a `HashSet<u64>` with
one flag byte per node. Neighbor lookup is implicit: 4 probes into the same map.
No edges are stored. The full kmer set of one partition must reside in RAM
simultaneously to compute degrees and mark unitig starts.
The matrix builders that follow (pass 2) are mmapped files — they do **not** consume
significant RAM. The pressure is entirely in pass 1.
### Unbounded Rayon parallelism
With 192 cores, Rayon ran up to 192 partitions concurrently. Each partition built its
own `GraphDeBruijn` accumulating all kmers absent from the destination. Peak memory =
192 × peak_partition_hashset.
### The 8.5 GB single allocation
hashbrown allocates the entire backing array in one call when rehashing.
At load factor 7/8: `capacity × (sizeof(K,V) + 1 control byte)`.
For `(u64, AtomicU8)` with alignment: ~16 bytes per slot.
```
9 127 MB / 16 bytes ≈ 570 M slots → ~380 M new kmers in one partition
```
Plausible for the largest partition of 108 Salix/Betula sources (~450 Mbp each).
---
## Partition size distribution
`obikmer utils --partition-stats` measures the sum of `unitigs.bin` file sizes
per partition across all source indexes (pure `stat()` syscalls, negligible cost).
Observed on a 9-genome pilot (256 partitions):
| Stat | Value |
|---|---|
| min | 30.5 MB |
| max | 232.1 MB |
| mean | 40.1 MB |
| median | 37.2 MB |
| p95 | 47.1 MB |
| max/median ratio | 6.23× |
The distribution is **bimodal with a heavy tail**:
- 238/256 partitions in a narrow 3050 MB band
- 4 structurally extreme partitions (36× the median): 221, 233, 135, 191
These correspond to minimizers over-represented in repetitive regions shared across
all sources. They are extreme in every run on this dataset.
With 109 sources, outlier partitions do not scale linearly: only kmers **absent from
the destination** enter the GraphDeBruijn, and inter-source overlap is high for closely
related species. Partition 221 is the likely trigger for the 8.5 GB crash.
---
## Solution: LFD scheduling + memory budget semaphore
### Principle
Pre-sort partitions by **decreasing estimated size** (First Fit Decreasing — FFD),
then schedule them through a **continuous memory budget semaphore**. Each worker
acquires an estimated cost before starting and releases it on completion.
Large partitions run first when the full budget is available; small partitions fill
the gaps. No hard outlier threshold is needed.
### `MemoryBudget` (`obisys`)
```rust
pub struct MemoryBudget { }
impl MemoryBudget {
pub fn new(total: u64) -> Self;
pub fn acquire(&self, cost: u64); // blocks until budget available
pub fn release(&self, cost: u64);
pub fn peak_active(&self) -> usize;
}
```
Non-deadlock guarantee: when `active == 0`, acquire always succeeds regardless of cost.
Without this, a partition whose estimated cost exceeds the total budget would block forever.
### Adaptive expansion factor
The expansion factor converts raw `unitigs.bin` bytes into an estimated GraphDeBruijn
RAM footprint. hashbrown stores each kmer as `(u64, AtomicU8)` ≈ 16 bytes/kmer at 7/8
load factor; unitig files encode ≈ 2 bits/base. The ratio depends on average unitig
length (short unitigs: ~2×; long unitigs: up to ~50×).
**Phase 1 — sequential pilot (worst partition)**
The largest partition runs alone first. Its actual `g.len()` seeds the expansion factor
before any parallel job starts. `FALLBACK_EXPANSION = 4×` is used only for empty partitions.
```rust
let worst_g_len = dst_partition.merge_partition(worst_id, )?;
// ↑ now returns SKResult<usize> (was SKResult<()>)
let seed_expansion = worst_g_len as u64 * 16 * 1000 / worst_bytes;
let max_expansion = AtomicU64::new(seed_expansion);
```
**Phase 2 — parallel with adaptive updates**
```rust
order[1..].into_par_iter().for_each(|&i| {
let cost = partition_sizes[i] * max_expansion.load(Relaxed) / 1000;
budget.acquire(cost);
let g_len = dst_partition.merge_partition(i, )?;
budget.release(cost); // releases estimated cost, not actual
let actual = g_len as u64 * 16 * 1000 / partition_sizes[i];
max_expansion.fetch_max(actual, Relaxed); // always pessimistic (max)
});
```
`budget.release(cost)` uses the estimated cost, not the actual one. The budget tracks
reservations, not physical RAM; each partition pays what it promised at acquisition.
**On the safety margin**
There is no separate multiplier `k`. It is redundant with `budget_fraction`: both
reduce effective concurrency by the same amount. A single parameter is easier to
calibrate. `budget_fraction = 0.5` (default) reserves half of available RAM for the
OS, MPHF build, pass 2, and estimation error.
`--budget-fraction` is exposed as a CLI flag — the only escape hatch for pathological
cases (extreme repetitive content, unusually long unitigs) that still cause OOM.
### RAM source
`obisys::available_memory_bytes()` — wraps `sysinfo::System::available_memory()`,
falls back to `total / 2` on macOS when the memory compressor returns 0.
---
## Diagnostic report
After the parallel phase, `merge_partition` emits a structured report via `tracing::info!`:
```
─── merge_partitions memory report ───
available RAM : 512.0 GB budget 50% = 256.0 GB
expansion factor — seed: 4.2× final max: 6.1× (mean: 1.8× median: 1.6×)
peak concurrent workers: 42
expansion factor distribution (256 partitions with data):
0.50× – 1.25× │██████████████████████████████ 148
1.25× – 2.00× │████████████████████████ 82
5.50× 6.25× │█ 2
top partitions by actual expansion factor:
partition 221 : 6.10× (232.1 MB unitigs → 48M kmers, reserved at 4.20×)
partition 135 : 5.82× (127.3 MB unitigs → 24M kmers, reserved at 4.20×)
──────────────────────────────────────
```
Fields useful for diagnosis:
| Field | Interpretation |
|---|---|
| `seed` vs `final max` expansion | gap indicates partitions with higher expansion than the worst-by-size |
| `reserved at X×` | the factor used at acquisition; if much lower than actual, the budget was under-reserved for that partition |
| `peak concurrent workers` | effective parallelism achieved under the budget constraint |
| `mean` / `median` expansion | typical dataset characteristic; stable across runs on the same data |
---
## Parameters
| Parameter | Default | CLI flag | Notes |
|---|---|---|---|
| `fallback_expansion` | 4× | — | seed for empty partitions only |
| `budget_fraction` | 0.5 | `--budget-fraction` | reduce if OOM persists |
| RAM source | `obisys::available_memory_bytes()` | — | falls back to `total/2` on macOS |
+1
View File
@@ -49,6 +49,7 @@ nav:
- PersistentCompactIntVec: implementation/persistent_compact_int_vec.md
- PersistentBitVec: implementation/persistent_bit_vec.md
- Merge command: implementation/merge.md
- Merge parallelism & memory: implementation/merge_parallelism.md
- Kmer filtering: implementation/filtering.md
- Select command: implementation/select.md
- Architecture:
+21
View File
@@ -54,6 +54,14 @@ impl ColumnarCompactIntMatrix {
Array1::from_vec(sums)
}
pub(crate) fn count_nonzero(&self) -> Array1<u64> {
let counts: Vec<u64> = (0..self.n_cols())
.into_par_iter()
.map(|c| self.col(c).count_nonzero())
.collect();
Array1::from_vec(counts)
}
pub(crate) fn partial_bray_dist_matrix(&self) -> Array2<u64> {
self.pairwise_u64(|i, j| self.col(i).partial_bray_dist(self.col(j)))
}
@@ -234,6 +242,14 @@ impl PackedCompactIntMatrix {
)
}
pub(crate) fn count_nonzero(&self) -> Array1<u64> {
Array1::from_vec(
(0..self.n_cols).into_par_iter()
.map(|c| (0..self.n_rows).filter(|&s| self.get(c, s) > 0).count() as u64)
.collect()
)
}
// ── Pair primitives ───────────────────────────────────────────────────────
fn pair_partial_bray(&self, i: usize, j: usize) -> u64 {
@@ -421,6 +437,10 @@ impl PersistentCompactIntMatrix {
match self { Self::Columnar(m) => m.sum(), Self::Packed(m) => m.sum() }
}
pub fn count_nonzero(&self) -> Array1<u64> {
match self { Self::Columnar(m) => m.count_nonzero(), Self::Packed(m) => m.count_nonzero() }
}
pub fn partial_bray_dist_matrix(&self) -> Array2<u64> {
match self { Self::Columnar(m) => m.partial_bray_dist_matrix(), Self::Packed(m) => m.partial_bray_dist_matrix() }
}
@@ -451,6 +471,7 @@ use crate::traits::{ColumnWeights, CountPartials};
impl ColumnWeights for PersistentCompactIntMatrix {
fn col_weights(&self) -> Array1<u64> { self.sum() }
fn partial_kmer_counts(&self) -> Array1<u64> { self.count_nonzero() }
}
impl CountPartials for PersistentCompactIntMatrix {
+5 -1
View File
@@ -133,11 +133,15 @@ impl PersistentCompactIntVec {
}
#[inline]
/// Returns the sum of all values in the compact int vector.
pub fn sum(&self) -> u64 {
self.iter().map(|v| v as u64).sum()
}
#[inline]
pub fn count_nonzero(&self) -> u64 {
self.iter().filter(|&v| v > 0).count() as u64
}
#[inline]
/// Returns the Bray-Curtis distance between two compact int vectors.
pub fn bray_dist(&self, other: &PersistentCompactIntVec) -> f64 {
+8
View File
@@ -2,8 +2,16 @@ use ndarray::{Array1, Array2};
/// Column-level weight statistic — total count or presence count per column.
/// Additive across layers and partitions; used as denominator in normalised distances.
///
/// `partial_kmer_counts` returns the number of **distinct k-mers** present per
/// column (presence = 1 entries; count > 0 entries). For presence matrices this
/// equals `col_weights`; for count matrices it differs (count_nonzero vs sum).
pub trait ColumnWeights: Send + Sync {
fn col_weights(&self) -> Array1<u64>;
fn partial_kmer_counts(&self) -> Array1<u64> {
self.col_weights()
}
}
/// Partial distance matrices for count-based data (`PersistentCompactIntMatrix`).
+225 -62
View File
@@ -2,7 +2,10 @@ use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::Path;
use obisys::{Reporter, Stage, progress_bar, spinner};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use obisys::{MemoryBudget, Reporter, Stage, available_memory_bytes, progress_bar, spinner};
use rayon::prelude::*;
use tracing::info;
@@ -15,23 +18,26 @@ use crate::state::IndexState;
pub use obikpartitionner::MergeMode;
// ── per-partition diagnostic record ──────────────────────────────────────────
#[derive(Debug)]
struct PartStat {
id: usize,
unitig_bytes: u64, // sum of unitigs.bin across remaining sources
g_len: usize, // actual new kmers inserted into GraphDeBruijn
exp_at_acquire: f64, // expansion factor used to size the budget reservation
}
// ── main merge entry point ────────────────────────────────────────────────────
impl KmerIndex {
/// Merge `sources` into a new index at `output`.
///
/// All sources must be in `Indexed` state and share the same `kmer_size`,
/// `minimizer_size`, and `n_partitions`. Count mode additionally requires
/// every source to have `with_counts = true`.
///
/// Genome labels must be unique across all sources. If `rename_duplicates`
/// is true, repeated labels are disambiguated by appending `.1`, `.2`, …
/// to the second and subsequent occurrences. Otherwise a
/// `DuplicateGenomeLabel` error is returned on the first conflict.
pub fn merge<P: AsRef<Path>>(
output: P,
sources: &[&KmerIndex],
mode: MergeMode,
force: bool,
rename_duplicates: bool,
budget_fraction: f64,
rep: &mut Reporter,
) -> OKIResult<Self> {
let output = output.as_ref();
@@ -98,7 +104,7 @@ impl KmerIndex {
let sources: &[&KmerIndex] = &ordered;
let evidence = sources[0].meta.config.evidence.clone();
// ── Compute final genome labels (rename duplicates if requested) ───────
// ── Compute final genome labels ────────────────────────────────────────
let (source_labels, all_genomes) = compute_labels(sources, rename_duplicates)?;
// ── Prepare output directory ──────────────────────────────────────────
@@ -125,23 +131,19 @@ impl KmerIndex {
pb.set_message("copying index …");
copy_dir_all(&sources[0].root_path, output)?;
// Rewrite index.meta with final genome labels and the effective mode.
let mut meta = IndexMeta::read(output).map_err(OKIError::Io)?;
meta.genomes = all_genomes;
meta.config.with_counts = mode == MergeMode::Count;
meta.config.evidence = evidence.clone();
meta.write(output)?;
// In presence/absence mode, purge counts/ directories inherited from
// source_0 — they are stale data from the source's count index.
if mode == MergeMode::Presence {
remove_dirs_named(output, "counts")?;
}
pb.finish_and_clear();
rep.push(t.stop());
// Rebuild spectrums/ from all sources using the (possibly renamed) labels.
// Drop the spectrums/ that were copied from source_0 and rebuild from scratch.
// ── Rebuild spectrums ─────────────────────────────────────────────────
info!("rebuilding spectrums for {} source(s)", sources.len());
let t = Stage::start("spectrums");
let pb = spinner("spectrums");
@@ -157,12 +159,12 @@ impl KmerIndex {
pb.finish_and_clear();
rep.push(t.stop());
// Open the destination index.
// ── Open destination ──────────────────────────────────────────────────
let dst = KmerIndex::open(output)?;
let n_partitions = dst.n_partitions();
let n_dst_genomes = sources[0].meta.genomes.len();
// ── Merge each subsequent source partition-by-partition ───────────────
// ── Merge partitions ──────────────────────────────────────────────────
let remaining_sources: Vec<&KmerIndex> = sources[1..].to_vec();
if !remaining_sources.is_empty() {
let n_src_genomes: usize = remaining_sources.iter().map(|s| s.meta.genomes.len()).sum();
@@ -176,22 +178,118 @@ impl KmerIndex {
let dst_partition = &dst.partition;
let block_bits = dst.meta.config.block_bits;
let errors: Vec<obiskio::SKError> = (0..n_partitions)
.into_par_iter()
.filter_map(|i| {
let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> =
remaining_sources.iter().map(|s| (&s.partition, s.meta.genomes.len())).collect();
let result = dst_partition.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence).err();
// Pre-build source list once (avoid rebuilding per partition)
let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> = remaining_sources
.iter()
.map(|s| (&s.partition, s.meta.genomes.len()))
.collect();
// Per-partition unitig byte sizes across remaining sources (stat() only)
let partition_sizes: Vec<u64> = (0..n_partitions)
.map(|i| remaining_sources.iter()
.map(|s| partition_unitig_bytes(s, i))
.sum())
.collect();
// LFD sort: largest partition first
let mut order: Vec<usize> = (0..n_partitions).collect();
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
// ── Sequential pilot: worst partition → seed expansion factor ─────
const FALLBACK_EXPANSION: u64 = 4_000; // 4× in fixed-point ×1000
let worst_id = order[0];
let worst_bytes = partition_sizes[worst_id];
let worst_g_len = dst_partition
.merge_partition(worst_id, &srcs, mode, n_dst_genomes, block_bits, &evidence)
.map_err(OKIError::Partition)?;
pb.inc(1);
result
let seed_expansion = if worst_bytes > 0 {
worst_g_len as u64 * 16 * 1000 / worst_bytes
} else {
FALLBACK_EXPANSION
};
info!(
"merge_partitions: pilot partition {} — {} unitig bytes → {} new kmers, \
expansion {:.2}×",
worst_id, worst_bytes, worst_g_len,
seed_expansion as f64 / 1000.0,
);
let part_stats: Arc<Mutex<Vec<PartStat>>> = Arc::new(Mutex::new({
let mut v = Vec::with_capacity(n_partitions);
v.push(PartStat {
id: worst_id,
unitig_bytes: worst_bytes,
g_len: worst_g_len,
exp_at_acquire: seed_expansion as f64 / 1000.0,
});
v
}));
let max_expansion = AtomicU64::new(seed_expansion);
// ── Parallel remainder under memory budget ────────────────────────
let available = available_memory_bytes();
let budget_bytes = (available as f64 * budget_fraction) as u64;
let budget = Arc::new(MemoryBudget::new(budget_bytes));
info!(
"merge_partitions: available RAM {}, budget {:.0}% = {}",
fmt_bytes(available),
budget_fraction * 100.0,
fmt_bytes(budget_bytes),
);
let errors: Vec<OKIError> = order[1..].into_par_iter()
.filter_map(|&i| {
let ubytes = partition_sizes[i];
let exp = max_expansion.load(Ordering::Relaxed);
let cost = ubytes * exp / 1000;
budget.acquire(cost);
let result = dst_partition
.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence);
budget.release(cost);
pb.inc(1);
match result {
Ok(g_len) => {
if ubytes > 0 {
let actual = g_len as u64 * 16 * 1000 / ubytes;
max_expansion.fetch_max(actual, Ordering::Relaxed);
}
part_stats.lock().unwrap().push(PartStat {
id: i,
unitig_bytes: ubytes,
g_len,
exp_at_acquire: exp as f64 / 1000.0,
});
None
}
Err(e) => Some(OKIError::Partition(e)),
}
})
.collect();
pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::Partition(e));
return Err(e);
}
// ── Diagnostic report ─────────────────────────────────────────────
let stats = Arc::try_unwrap(part_stats).unwrap().into_inner().unwrap();
print_merge_partition_report(
&stats,
available,
budget_fraction,
seed_expansion as f64 / 1000.0,
max_expansion.load(Ordering::Relaxed) as f64 / 1000.0,
budget.peak_active(),
);
rep.push(t.stop());
}
@@ -206,19 +304,110 @@ impl KmerIndex {
rep.push(t.stop());
}
// Re-open to get the updated state.
KmerIndex::open(output)
}
}
// ── Helpers ───────────────────────────────────────────────────────────────────
// ── Diagnostic report ─────────────────────────────────────────────────────────
fn print_merge_partition_report(
stats: &[PartStat],
available_ram: u64,
budget_fraction: f64,
seed_expansion: f64,
final_expansion: f64,
peak_active: usize,
) {
// Compute actual expansion per partition (skip empty partitions)
let expansions: Vec<(usize, f64)> = stats
.iter()
.filter(|s| s.unitig_bytes > 0)
.map(|s| (s.id, s.g_len as f64 * 16.0 / s.unitig_bytes as f64))
.collect();
if expansions.is_empty() {
info!("merge_partitions report: no data (all partitions empty)");
return;
}
let mut sorted_exp: Vec<f64> = expansions.iter().map(|(_, e)| *e).collect();
sorted_exp.sort_by(|a, b| a.partial_cmp(b).unwrap());
let n = sorted_exp.len();
let mean_exp = sorted_exp.iter().sum::<f64>() / n as f64;
let median_exp = sorted_exp[n / 2];
let max_exp = sorted_exp[n - 1];
info!("─── merge_partitions memory report ───");
info!(
" available RAM : {} budget {:.0}% = {}",
fmt_bytes(available_ram),
budget_fraction * 100.0,
fmt_bytes((available_ram as f64 * budget_fraction) as u64),
);
info!(
" expansion factor — seed: {:.2}× final max: {:.2}× \
(mean: {:.2}× median: {:.2}× observed max: {:.2}×)",
seed_expansion, final_expansion, mean_exp, median_exp, max_exp,
);
info!(" peak concurrent workers: {}", peak_active);
// Histogram of actual expansion factors
let min_e = sorted_exp[0];
let max_e = sorted_exp[n - 1];
let n_buckets = 8usize;
let bucket_w = (max_e - min_e).max(0.01) / n_buckets as f64;
let mut counts = vec![0usize; n_buckets];
for &e in &sorted_exp {
let b = (((e - min_e) / bucket_w) as usize).min(n_buckets - 1);
counts[b] += 1;
}
let max_count = *counts.iter().max().unwrap();
info!(" expansion factor distribution ({} partitions with data):", n);
for (i, &c) in counts.iter().enumerate() {
let lo = min_e + i as f64 * bucket_w;
let hi = min_e + (i + 1) as f64 * bucket_w;
let bar = "".repeat(if max_count > 0 { c * 30 / max_count } else { 0 });
info!(" {:5.2}× {:5.2}× │{:<30} {}", lo, hi, bar, c);
}
// Top 8 by actual expansion
let mut by_exp: Vec<(usize, f64)> = expansions.clone();
by_exp.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
info!(" top partitions by actual expansion factor:");
for (id, exp) in by_exp.iter().take(8) {
let s = stats.iter().find(|s| s.id == *id).unwrap();
info!(
" partition {:4} : {:.2}× ({} unitigs → {}M kmers, \
reserved at {:.2}×)",
id, exp,
fmt_bytes(s.unitig_bytes),
s.g_len / 1_000_000,
s.exp_at_acquire,
);
}
info!("──────────────────────────────────────");
}
// ── helpers ───────────────────────────────────────────────────────────────────
fn fmt_bytes(b: u64) -> String {
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
else if b >= 1 << 20 { format!("{:.1} MB", b as f64 / (1u64 << 20) as f64) }
else if b >= 1 << 10 { format!("{:.1} KB", b as f64 / (1u64 << 10) as f64) }
else { format!("{b} B") }
}
/// Sum of all unitigs.bin sizes across all layers of partition `i` in `src`.
fn partition_unitig_bytes(src: &KmerIndex, i: usize) -> u64 {
let mut total = 0u64;
for l in 0.. {
let p = src.layer_unitigs_path(i, l);
if !p.exists() { break; }
if let Ok(m) = std::fs::metadata(&p) { total += m.len(); }
}
total
}
/// Compute the final genome label lists for all sources.
///
/// Returns `(per_source_labels, all_genomes_flat)`.
/// The first occurrence of a label keeps the original name. Subsequent
/// occurrences receive `.1`, `.2`, … suffixes when `rename_duplicates` is true,
/// or trigger a `DuplicateGenomeLabel` error otherwise.
fn compute_labels(
sources: &[&KmerIndex],
rename_duplicates: bool,
@@ -249,8 +438,6 @@ fn compute_labels(
Ok((source_labels, all_genomes))
}
/// Copy spectrum JSON files from `src_root/spectrums/` to `dst_root/spectrums/`,
/// mapping each `old_labels[i]` filename to `new_labels[i]`.
fn copy_spectrums(
src_root: &Path,
dst_root: &Path,
@@ -269,7 +456,6 @@ fn copy_spectrums(
Ok(())
}
/// Recursively remove every directory named `name` under `root`.
fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> {
for entry in fs::read_dir(root)? {
let entry = entry?;
@@ -285,7 +471,6 @@ fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> {
Ok(())
}
fn format_evidence(ev: &IndexMode) -> String {
match ev {
IndexMode::Exact => "exact".to_string(),
@@ -294,37 +479,15 @@ fn format_evidence(ev: &IndexMode) -> String {
}
}
/// A source is "trivial" if its presence/count values carry no approximation:
/// single-genome presence index (SetMembership — all values are 1 by construction).
fn is_trivial(src: &KmerIndex, mode: MergeMode) -> bool {
src.meta.genomes.len() == 1 && mode == MergeMode::Presence
}
/// Sum of all `unitigs.bin` sizes across every partition and layer.
/// Used as a proxy for the number of indexed smers.
fn index_unitig_size(src: &KmerIndex) -> u64 {
let n = src.partition.n_partitions();
let mut total = 0u64;
for i in 0..n {
let index_dir = src.partition.part_dir(i).join("index");
let mut l = 0usize;
loop {
let p = index_dir.join(format!("layer_{l}")).join("unitigs.bin");
if !p.exists() { break; }
if let Ok(m) = std::fs::metadata(&p) { total += m.len(); }
l += 1;
}
}
total
(0..n).map(|i| partition_unitig_bytes(src, i)).sum()
}
/// Choose the index to use as bootstrap base.
///
/// Rule — mieux-disant: if any non-trivial source uses approximate evidence
/// (Approx or Hybrid), the output must also be approximate; the base must
/// therefore come from an approximate source so its layers carry the right
/// evidence files. Among qualifying candidates, the largest (by unitig size)
/// is chosen to minimise the number of new smers in the merge layer.
fn choose_base(sources: &[&KmerIndex], mode: MergeMode) -> usize {
let needs_approx = sources.iter().any(|src| {
!is_trivial(src, mode)
+66 -1
View File
@@ -1,7 +1,8 @@
use std::fs;
use std::path::Path;
use obicompactvec::LayerMeta;
use obicompactvec::{LayerMeta, PersistentBitMatrix, PersistentCompactIntMatrix};
use obicompactvec::traits::ColumnWeights;
use obilayeredmap::meta::PartitionMeta;
use rayon::prelude::*;
@@ -124,4 +125,68 @@ impl KmerIndex {
total: bpk(mphf_b + evidence_b + matrix_b),
})
}
/// Return `(total_distinct_kmers, per_genome_kmer_counts)`.
///
/// For each genome, the count is the number of distinct k-mers for which
/// that genome has a non-zero value (presence = 1, count > 0).
/// Partitions are scanned in parallel; results are summed across partitions.
pub fn genome_kmer_counts(&self) -> OKIResult<(usize, Vec<u64>)> {
let n = self.n_partitions();
let n_genomes = self.meta.genomes.len();
let partials: Vec<(usize, Vec<u64>)> = (0..n)
.into_par_iter()
.map(|i| {
let mut counts = vec![0u64; n_genomes];
let mut n_kmers = 0usize;
let index_dir = self.partition.part_dir(i).join("index");
if !index_dir.exists() { return (0, counts); }
let n_layers = PartitionMeta::load(&index_dir)
.map(|m| m.n_layers)
.unwrap_or(0);
for l in 0..n_layers {
let layer_dir = index_dir.join(format!("layer_{l}"));
if !layer_dir.exists() { continue; }
n_kmers += LayerMeta::load(&layer_dir).map(|m| m.n).unwrap_or(0);
let mat: Box<dyn ColumnWeights> =
if layer_dir.join("counts").exists()
&& !layer_dir.join("presence").exists()
{
match PersistentCompactIntMatrix::open(&layer_dir) {
Ok(m) => Box::new(m),
Err(_) => continue,
}
} else {
match PersistentBitMatrix::open(&layer_dir) {
Ok(m) => Box::new(m),
Err(_) => continue,
}
};
let col_counts = mat.partial_kmer_counts();
for (c, &v) in col_counts.iter().enumerate() {
if c < n_genomes { counts[c] += v; }
}
}
(n_kmers, counts)
})
.collect();
let total_kmers: usize = partials.iter().map(|(n, _)| n).sum();
let mut total_counts = vec![0u64; n_genomes];
for (_, counts) in partials {
for (i, v) in counts.into_iter().enumerate() {
total_counts[i] += v;
}
}
Ok((total_kmers, total_counts))
}
}
+6 -1
View File
@@ -26,6 +26,11 @@ pub struct MergeArgs {
/// Disambiguate duplicate genome labels by appending .1, .2, … instead of erroring
#[arg(long, default_value_t = false)]
pub rename_duplicates: bool,
/// Fraction of available RAM reserved as memory budget for parallel partition merging.
/// Reduce if OOM occurs despite the adaptive scheduler (e.g. --budget-fraction 0.3).
#[arg(long, default_value_t = 0.5)]
pub budget_fraction: f64,
}
pub fn run(args: MergeArgs) {
@@ -60,7 +65,7 @@ pub fn run(args: MergeArgs) {
);
let mut rep = Reporter::new();
KmerIndex::merge(&args.output, &source_refs, mode, args.force, args.rename_duplicates, &mut rep).unwrap_or_else(|e| {
KmerIndex::merge(&args.output, &source_refs, mode, args.force, args.rename_duplicates, args.budget_fraction, &mut rep).unwrap_or_else(|e| {
eprintln!("error merging: {e}");
std::process::exit(1);
});
+268 -13
View File
@@ -1,3 +1,4 @@
use std::io::{self, Write};
use std::path::PathBuf;
use clap::Args;
@@ -6,20 +7,33 @@ use tracing::info;
#[derive(Args)]
pub struct UtilsArgs {
/// Index directory to operate on
pub index: PathBuf,
/// Index directories to operate on (one or more)
#[arg(required = true, num_args = 1..)]
pub indexes: Vec<PathBuf>,
/// Set a new genome label: NEW_LABEL=OLD_LABEL
/// Set a new genome label: NEW_LABEL=OLD_LABEL (single-index only)
#[arg(long, value_name = "NEW=OLD")]
pub new_label: Option<String>,
/// Add missing layer_meta.json files to each layer (required after upgrading from old indexes)
/// Add missing layer_meta.json files to each layer (single-index only)
#[arg(long)]
pub upgrade_index: bool,
/// Print bits-per-kmer statistics (MPHF, evidence, matrix, total)
/// Print bits-per-kmer statistics (single-index only)
#[arg(long)]
pub bits_per_kmer: bool,
/// Print per-genome k-mer counts as CSV (single-index only)
#[arg(long)]
pub stats: bool,
/// Print partition size distribution report (accepts multiple indexes)
#[arg(long)]
pub partition_stats: bool,
/// Write per-(partition, source) raw data as CSV to FILE (used with --partition-stats)
#[arg(long, value_name = "FILE")]
pub csv: Option<PathBuf>,
}
pub fn run(args: UtilsArgs) {
@@ -27,25 +41,266 @@ pub fn run(args: UtilsArgs) {
if let Some(spec) = &args.new_label {
any = true;
run_rename(&args.index, spec);
run_rename(single_index(&args), spec);
}
if args.upgrade_index {
any = true;
run_upgrade_index(&args.index);
run_upgrade_index(single_index(&args));
}
if args.bits_per_kmer {
any = true;
run_bits_per_kmer(&args.index);
run_bits_per_kmer(single_index(&args));
}
if args.stats {
any = true;
run_stats(single_index(&args));
}
if args.partition_stats {
any = true;
run_partition_stats(&args.indexes, args.csv.as_deref());
}
if !any {
eprintln!("utils: no operation specified. Available options: --new-label NEW=OLD, --upgrade-index, --bits-per-kmer");
eprintln!(
"utils: no operation specified. \
Available: --new-label, --upgrade-index, --bits-per-kmer, --stats, --partition-stats"
);
std::process::exit(1);
}
}
// ── helpers ───────────────────────────────────────────────────────────────────
fn single_index(args: &UtilsArgs) -> &PathBuf {
if args.indexes.len() > 1 {
eprintln!("utils: this option requires exactly one index (got {})", args.indexes.len());
std::process::exit(1);
}
&args.indexes[0]
}
// ── --partition-stats ─────────────────────────────────────────────────────────
/// Per-partition, per-source byte count of all unitigs.bin files summed across layers.
struct PartRow {
partition: usize,
source: String,
bytes: u64,
}
fn collect_rows(indexes: &[PathBuf]) -> Vec<PartRow> {
let mut rows = Vec::new();
for path in indexes {
let idx = KmerIndex::open(path).unwrap_or_else(|e| {
eprintln!("error opening index {}: {e}", path.display());
std::process::exit(1);
});
let name = path
.file_name()
.map(|n| n.to_string_lossy().into_owned())
.unwrap_or_else(|| path.display().to_string());
let n_parts = idx.n_partitions();
for i in 0..n_parts {
let mut bytes = 0u64;
for l in 0.. {
let p = idx.layer_unitigs_path(i, l);
if !p.exists() {
break;
}
if let Ok(m) = std::fs::metadata(&p) {
bytes += m.len();
}
}
rows.push(PartRow { partition: i, source: name.clone(), bytes });
}
}
rows
}
/// Sum bytes per partition across all sources.
fn partition_totals(rows: &[PartRow], n_parts: usize) -> Vec<u64> {
let mut totals = vec![0u64; n_parts];
for r in rows {
totals[r.partition] += r.bytes;
}
totals
}
fn stats_summary(totals: &[u64]) -> (u64, u64, f64, f64, u64, u64, u64) {
let mut sorted = totals.to_vec();
sorted.sort_unstable();
let n = sorted.len();
let min = sorted[0];
let max = sorted[n - 1];
let mean = sorted.iter().sum::<u64>() as f64 / n as f64;
let median = if n % 2 == 0 {
(sorted[n / 2 - 1] + sorted[n / 2]) as f64 / 2.0
} else {
sorted[n / 2] as f64
};
let p95 = sorted[(n as f64 * 0.95) as usize];
let p99 = sorted[(n as f64 * 0.99) as usize];
let variance = sorted
.iter()
.map(|&v| (v as f64 - mean).powi(2))
.sum::<f64>()
/ n as f64;
let std_dev = variance.sqrt();
(min, max, mean, median, p95, p99, std_dev as u64)
}
fn human_bytes(b: u64) -> String {
if b >= 1 << 30 {
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
} else if b >= 1 << 20 {
format!("{:.1} MB", b as f64 / (1u64 << 20) as f64)
} else if b >= 1 << 10 {
format!("{:.1} KB", b as f64 / (1u64 << 10) as f64)
} else {
format!("{b} B")
}
}
fn ascii_histogram(totals: &[u64], n_buckets: usize, bar_width: usize) -> String {
let min = *totals.iter().min().unwrap();
let max = *totals.iter().max().unwrap();
if min == max {
return format!(" (all partitions identical: {})\n", human_bytes(min));
}
let bucket_size = (max - min).max(1) as f64 / n_buckets as f64;
let mut counts = vec![0usize; n_buckets];
for &v in totals {
let b = (((v - min) as f64 / bucket_size) as usize).min(n_buckets - 1);
counts[b] += 1;
}
let max_count = *counts.iter().max().unwrap();
let mut out = String::new();
for (i, &c) in counts.iter().enumerate() {
let lo = min + (i as f64 * bucket_size) as u64;
let hi = min + ((i + 1) as f64 * bucket_size) as u64;
let bar_len = if max_count > 0 { c * bar_width / max_count } else { 0 };
let bar = "".repeat(bar_len);
out.push_str(&format!(
" {:>8} {:>8}{:<width$} {}\n",
human_bytes(lo),
human_bytes(hi),
bar,
c,
width = bar_width
));
}
out
}
fn run_partition_stats(indexes: &[PathBuf], csv_path: Option<&std::path::Path>) {
let rows = collect_rows(indexes);
if rows.is_empty() {
eprintln!("partition-stats: no data found");
std::process::exit(1);
}
let n_parts = rows.iter().map(|r| r.partition).max().unwrap() + 1;
let totals = partition_totals(&rows, n_parts);
let (min, max, mean, median, p95, p99, std_dev) = stats_summary(&totals);
// outliers: > median + 1.5 × IQR (approximate via > 1.5 × median as fallback)
let mut sorted_t = totals.clone();
sorted_t.sort_unstable();
let q1 = sorted_t[n_parts / 4] as f64;
let q3 = sorted_t[3 * n_parts / 4] as f64;
let iqr = q3 - q1;
let outlier_threshold = q3 + 1.5 * iqr;
let mut out = String::new();
out.push_str("# Partition size report\n\n");
out.push_str(&format!(
"Sources: {} \nPartitions: {} \n\n",
indexes.len(),
n_parts
));
out.push_str("## Summary statistics (total unitigs.bin bytes per partition, sum across sources)\n\n");
out.push_str("| Stat | Value |\n|---|---|\n");
out.push_str(&format!("| min | {} |\n", human_bytes(min)));
out.push_str(&format!("| max | {} |\n", human_bytes(max)));
out.push_str(&format!("| mean | {} |\n", human_bytes(mean as u64)));
out.push_str(&format!("| median | {} |\n", human_bytes(median as u64)));
out.push_str(&format!("| p95 | {} |\n", human_bytes(p95)));
out.push_str(&format!("| p99 | {} |\n", human_bytes(p99)));
out.push_str(&format!("| std | {} |\n", human_bytes(std_dev)));
out.push_str(&format!("| max/median ratio | {:.2}× |\n\n", max as f64 / median));
out.push_str("## Histogram\n\n```\n");
out.push_str(&ascii_histogram(&totals, 30, 40));
out.push_str("```\n\n");
let outliers: Vec<(usize, u64)> = totals
.iter()
.enumerate()
.filter(|(_, v)| **v as f64 > outlier_threshold)
.map(|(i, v)| (i, *v))
.collect();
if outliers.is_empty() {
out.push_str("## Outliers\n\nNone (threshold: Q3 + 1.5×IQR = ");
out.push_str(&human_bytes(outlier_threshold as u64));
out.push_str(").\n");
} else {
out.push_str(&format!(
"## Outliers (> Q3 + 1.5×IQR = {})\n\n| Partition | Total size | Ratio to median |\n|---|---|---|\n",
human_bytes(outlier_threshold as u64)
));
for (i, v) in &outliers {
out.push_str(&format!(
"| {} | {} | {:.2}× |\n",
i,
human_bytes(*v),
*v as f64 / median
));
}
out.push('\n');
}
print!("{out}");
if let Some(csv_out) = csv_path {
let file = std::fs::File::create(csv_out).unwrap_or_else(|e| {
eprintln!("error creating CSV file {}: {e}", csv_out.display());
std::process::exit(1);
});
let mut w = io::BufWriter::new(file);
writeln!(w, "partition,source,bytes").unwrap();
for r in &rows {
writeln!(w, "{},{},{}", r.partition, r.source, r.bytes).unwrap();
}
eprintln!("CSV written to {}", csv_out.display());
}
}
// ── existing single-index operations ─────────────────────────────────────────
fn run_stats(index_path: &PathBuf) {
let idx = KmerIndex::open(index_path).unwrap_or_else(|e| {
eprintln!("error opening index: {e}");
std::process::exit(1);
});
let (total, per_genome) = idx.genome_kmer_counts().unwrap_or_else(|e| {
eprintln!("error computing stats: {e}");
std::process::exit(1);
});
println!("genome,n_kmers");
for (g, &n) in idx.meta().genomes.iter().zip(per_genome.iter()) {
println!("{},{}", g.label, n);
}
println!("total,{total}");
}
fn run_bits_per_kmer(index_path: &PathBuf) {
let idx = KmerIndex::open(index_path).unwrap_or_else(|e| {
eprintln!("error opening index: {e}");
@@ -59,8 +314,10 @@ fn run_bits_per_kmer(index_path: &PathBuf) {
println!("genomes : {}", stats.n_genomes);
println!("mphf : {:6.2} bits/kmer", stats.mphf);
println!("evidence : {:6.2} bits/kmer", stats.evidence);
println!("matrix : {:6.2} bits/kmer ({:.2} bits/kmer/genome)",
stats.matrix, stats.matrix_per_genome);
println!(
"matrix : {:6.2} bits/kmer ({:.2} bits/kmer/genome)",
stats.matrix, stats.matrix_per_genome
);
println!("total : {:6.2} bits/kmer", stats.total);
}
@@ -99,7 +356,6 @@ fn run_rename(index_path: &PathBuf, spec: &str) {
std::process::exit(1);
});
// Check the new label is not already taken.
if idx.meta().genomes.iter().any(|g| g.label == new_label) {
eprintln!("error: label '{new_label}' already exists in index");
std::process::exit(1);
@@ -111,7 +367,6 @@ fn run_rename(index_path: &PathBuf, spec: &str) {
std::process::exit(1);
});
// Rename the spectrum file if it exists.
let spectrums_dir = index_path.join("spectrums");
let old_spectrum = spectrums_dir.join(format!("{old_label}.json"));
let new_spectrum = spectrums_dir.join(format!("{new_label}.json"));
+3 -3
View File
@@ -166,10 +166,10 @@ impl KmerPartition {
n_dst_genomes: usize,
block_bits: u8,
evidence: &IndexMode,
) -> SKResult<()> {
) -> SKResult<usize> {
let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR);
if !dst_index_dir.exists() {
return Ok(());
return Ok(0);
}
load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open
@@ -381,6 +381,6 @@ impl KmerPartition {
part_meta.save(&dst_index_dir).map_err(olm_to_sk)?;
}
Ok(())
Ok(n_new)
}
}
+55
View File
@@ -1,5 +1,6 @@
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};
use indicatif::{ProgressBar, ProgressStyle};
@@ -309,6 +310,60 @@ fn fmt_efficiency(par: f64, n_cores: usize) -> String {
// ── Display ───────────────────────────────────────────────────────────────────
// ── MemoryBudget ──────────────────────────────────────────────────────────────
struct BudgetInner {
remaining: u64,
active: usize,
peak_active: usize,
}
/// Counting semaphore that limits total concurrent estimated memory usage.
///
/// Each worker acquires a cost (bytes) before starting and releases it on
/// completion. Non-deadlock guarantee: when no worker is active the next
/// acquire always succeeds regardless of cost vs. remaining budget.
pub struct MemoryBudget {
total: u64,
inner: Mutex<BudgetInner>,
condvar: Condvar,
}
impl MemoryBudget {
pub fn new(total: u64) -> Self {
Self {
total,
inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }),
condvar: Condvar::new(),
}
}
pub fn acquire(&self, cost: u64) {
let mut g = self.inner.lock().unwrap();
loop {
if g.active == 0 || g.remaining >= cost {
g.remaining = g.remaining.saturating_sub(cost);
g.active += 1;
g.peak_active = g.peak_active.max(g.active);
return;
}
g = self.condvar.wait(g).unwrap();
}
}
pub fn release(&self, cost: u64) {
let mut g = self.inner.lock().unwrap();
g.remaining = (g.remaining + cost).min(self.total);
g.active -= 1;
self.condvar.notify_all();
}
pub fn total(&self) -> u64 { self.total }
pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active }
}
// ── Display ───────────────────────────────────────────────────────────────────
impl fmt::Display for Reporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.stages.is_empty() { return Ok(()); }