feat: add memory-aware parallel merge scheduling and CLI flags
Introduces a memory-aware scheduling strategy for parallel partition merging that replaces unbounded concurrency with a First-Fit Decreasing approach gated by a thread-safe `MemoryBudget` semaphore. An adaptive expansion factor, seeded by a sequential pilot run, dynamically caps concurrent workers to prevent hashbrown OOMs. Adds a `--budget-fraction` CLI flag to configure RAM allocation, enhances the CLI to accept multiple indexes, and introduces comprehensive partition diagnostics including memory utilization tracking, concurrency metrics, and statistical summaries with ASCII histograms. Updates documentation and navigation accordingly.
This commit is contained in:
+225
-62
@@ -2,7 +2,10 @@ use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use obisys::{Reporter, Stage, progress_bar, spinner};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use obisys::{MemoryBudget, Reporter, Stage, available_memory_bytes, progress_bar, spinner};
|
||||
use rayon::prelude::*;
|
||||
use tracing::info;
|
||||
|
||||
@@ -15,23 +18,26 @@ use crate::state::IndexState;
|
||||
|
||||
pub use obikpartitionner::MergeMode;
|
||||
|
||||
// ── per-partition diagnostic record ──────────────────────────────────────────
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PartStat {
|
||||
id: usize,
|
||||
unitig_bytes: u64, // sum of unitigs.bin across remaining sources
|
||||
g_len: usize, // actual new kmers inserted into GraphDeBruijn
|
||||
exp_at_acquire: f64, // expansion factor used to size the budget reservation
|
||||
}
|
||||
|
||||
// ── main merge entry point ────────────────────────────────────────────────────
|
||||
|
||||
impl KmerIndex {
|
||||
/// Merge `sources` into a new index at `output`.
|
||||
///
|
||||
/// All sources must be in `Indexed` state and share the same `kmer_size`,
|
||||
/// `minimizer_size`, and `n_partitions`. Count mode additionally requires
|
||||
/// every source to have `with_counts = true`.
|
||||
///
|
||||
/// Genome labels must be unique across all sources. If `rename_duplicates`
|
||||
/// is true, repeated labels are disambiguated by appending `.1`, `.2`, …
|
||||
/// to the second and subsequent occurrences. Otherwise a
|
||||
/// `DuplicateGenomeLabel` error is returned on the first conflict.
|
||||
pub fn merge<P: AsRef<Path>>(
|
||||
output: P,
|
||||
sources: &[&KmerIndex],
|
||||
mode: MergeMode,
|
||||
force: bool,
|
||||
rename_duplicates: bool,
|
||||
budget_fraction: f64,
|
||||
rep: &mut Reporter,
|
||||
) -> OKIResult<Self> {
|
||||
let output = output.as_ref();
|
||||
@@ -98,7 +104,7 @@ impl KmerIndex {
|
||||
let sources: &[&KmerIndex] = &ordered;
|
||||
let evidence = sources[0].meta.config.evidence.clone();
|
||||
|
||||
// ── Compute final genome labels (rename duplicates if requested) ───────
|
||||
// ── Compute final genome labels ────────────────────────────────────────
|
||||
let (source_labels, all_genomes) = compute_labels(sources, rename_duplicates)?;
|
||||
|
||||
// ── Prepare output directory ──────────────────────────────────────────
|
||||
@@ -125,23 +131,19 @@ impl KmerIndex {
|
||||
pb.set_message("copying index …");
|
||||
copy_dir_all(&sources[0].root_path, output)?;
|
||||
|
||||
// Rewrite index.meta with final genome labels and the effective mode.
|
||||
let mut meta = IndexMeta::read(output).map_err(OKIError::Io)?;
|
||||
meta.genomes = all_genomes;
|
||||
meta.config.with_counts = mode == MergeMode::Count;
|
||||
meta.config.evidence = evidence.clone();
|
||||
meta.write(output)?;
|
||||
|
||||
// In presence/absence mode, purge counts/ directories inherited from
|
||||
// source_0 — they are stale data from the source's count index.
|
||||
if mode == MergeMode::Presence {
|
||||
remove_dirs_named(output, "counts")?;
|
||||
}
|
||||
pb.finish_and_clear();
|
||||
rep.push(t.stop());
|
||||
|
||||
// Rebuild spectrums/ from all sources using the (possibly renamed) labels.
|
||||
// Drop the spectrums/ that were copied from source_0 and rebuild from scratch.
|
||||
// ── Rebuild spectrums ─────────────────────────────────────────────────
|
||||
info!("rebuilding spectrums for {} source(s)", sources.len());
|
||||
let t = Stage::start("spectrums");
|
||||
let pb = spinner("spectrums");
|
||||
@@ -157,12 +159,12 @@ impl KmerIndex {
|
||||
pb.finish_and_clear();
|
||||
rep.push(t.stop());
|
||||
|
||||
// Open the destination index.
|
||||
// ── Open destination ──────────────────────────────────────────────────
|
||||
let dst = KmerIndex::open(output)?;
|
||||
let n_partitions = dst.n_partitions();
|
||||
let n_dst_genomes = sources[0].meta.genomes.len();
|
||||
|
||||
// ── Merge each subsequent source partition-by-partition ───────────────
|
||||
// ── Merge partitions ──────────────────────────────────────────────────
|
||||
let remaining_sources: Vec<&KmerIndex> = sources[1..].to_vec();
|
||||
if !remaining_sources.is_empty() {
|
||||
let n_src_genomes: usize = remaining_sources.iter().map(|s| s.meta.genomes.len()).sum();
|
||||
@@ -176,22 +178,118 @@ impl KmerIndex {
|
||||
let dst_partition = &dst.partition;
|
||||
let block_bits = dst.meta.config.block_bits;
|
||||
|
||||
let errors: Vec<obiskio::SKError> = (0..n_partitions)
|
||||
.into_par_iter()
|
||||
.filter_map(|i| {
|
||||
let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> =
|
||||
remaining_sources.iter().map(|s| (&s.partition, s.meta.genomes.len())).collect();
|
||||
let result = dst_partition.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence).err();
|
||||
// Pre-build source list once (avoid rebuilding per partition)
|
||||
let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> = remaining_sources
|
||||
.iter()
|
||||
.map(|s| (&s.partition, s.meta.genomes.len()))
|
||||
.collect();
|
||||
|
||||
// Per-partition unitig byte sizes across remaining sources (stat() only)
|
||||
let partition_sizes: Vec<u64> = (0..n_partitions)
|
||||
.map(|i| remaining_sources.iter()
|
||||
.map(|s| partition_unitig_bytes(s, i))
|
||||
.sum())
|
||||
.collect();
|
||||
|
||||
// LFD sort: largest partition first
|
||||
let mut order: Vec<usize> = (0..n_partitions).collect();
|
||||
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
|
||||
|
||||
// ── Sequential pilot: worst partition → seed expansion factor ─────
|
||||
const FALLBACK_EXPANSION: u64 = 4_000; // 4× in fixed-point ×1000
|
||||
let worst_id = order[0];
|
||||
let worst_bytes = partition_sizes[worst_id];
|
||||
|
||||
let worst_g_len = dst_partition
|
||||
.merge_partition(worst_id, &srcs, mode, n_dst_genomes, block_bits, &evidence)
|
||||
.map_err(OKIError::Partition)?;
|
||||
pb.inc(1);
|
||||
|
||||
let seed_expansion = if worst_bytes > 0 {
|
||||
worst_g_len as u64 * 16 * 1000 / worst_bytes
|
||||
} else {
|
||||
FALLBACK_EXPANSION
|
||||
};
|
||||
|
||||
info!(
|
||||
"merge_partitions: pilot partition {} — {} unitig bytes → {} new kmers, \
|
||||
expansion {:.2}×",
|
||||
worst_id, worst_bytes, worst_g_len,
|
||||
seed_expansion as f64 / 1000.0,
|
||||
);
|
||||
|
||||
let part_stats: Arc<Mutex<Vec<PartStat>>> = Arc::new(Mutex::new({
|
||||
let mut v = Vec::with_capacity(n_partitions);
|
||||
v.push(PartStat {
|
||||
id: worst_id,
|
||||
unitig_bytes: worst_bytes,
|
||||
g_len: worst_g_len,
|
||||
exp_at_acquire: seed_expansion as f64 / 1000.0,
|
||||
});
|
||||
v
|
||||
}));
|
||||
|
||||
let max_expansion = AtomicU64::new(seed_expansion);
|
||||
|
||||
// ── Parallel remainder under memory budget ────────────────────────
|
||||
let available = available_memory_bytes();
|
||||
let budget_bytes = (available as f64 * budget_fraction) as u64;
|
||||
let budget = Arc::new(MemoryBudget::new(budget_bytes));
|
||||
|
||||
info!(
|
||||
"merge_partitions: available RAM {}, budget {:.0}% = {}",
|
||||
fmt_bytes(available),
|
||||
budget_fraction * 100.0,
|
||||
fmt_bytes(budget_bytes),
|
||||
);
|
||||
|
||||
let errors: Vec<OKIError> = order[1..].into_par_iter()
|
||||
.filter_map(|&i| {
|
||||
let ubytes = partition_sizes[i];
|
||||
let exp = max_expansion.load(Ordering::Relaxed);
|
||||
let cost = ubytes * exp / 1000;
|
||||
|
||||
budget.acquire(cost);
|
||||
let result = dst_partition
|
||||
.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence);
|
||||
budget.release(cost);
|
||||
pb.inc(1);
|
||||
result
|
||||
|
||||
match result {
|
||||
Ok(g_len) => {
|
||||
if ubytes > 0 {
|
||||
let actual = g_len as u64 * 16 * 1000 / ubytes;
|
||||
max_expansion.fetch_max(actual, Ordering::Relaxed);
|
||||
}
|
||||
part_stats.lock().unwrap().push(PartStat {
|
||||
id: i,
|
||||
unitig_bytes: ubytes,
|
||||
g_len,
|
||||
exp_at_acquire: exp as f64 / 1000.0,
|
||||
});
|
||||
None
|
||||
}
|
||||
Err(e) => Some(OKIError::Partition(e)),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
pb.finish_and_clear();
|
||||
if let Some(e) = errors.into_iter().next() {
|
||||
return Err(OKIError::Partition(e));
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
// ── Diagnostic report ─────────────────────────────────────────────
|
||||
let stats = Arc::try_unwrap(part_stats).unwrap().into_inner().unwrap();
|
||||
print_merge_partition_report(
|
||||
&stats,
|
||||
available,
|
||||
budget_fraction,
|
||||
seed_expansion as f64 / 1000.0,
|
||||
max_expansion.load(Ordering::Relaxed) as f64 / 1000.0,
|
||||
budget.peak_active(),
|
||||
);
|
||||
|
||||
rep.push(t.stop());
|
||||
}
|
||||
|
||||
@@ -206,19 +304,110 @@ impl KmerIndex {
|
||||
rep.push(t.stop());
|
||||
}
|
||||
|
||||
// Re-open to get the updated state.
|
||||
KmerIndex::open(output)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
// ── Diagnostic report ─────────────────────────────────────────────────────────
|
||||
|
||||
fn print_merge_partition_report(
|
||||
stats: &[PartStat],
|
||||
available_ram: u64,
|
||||
budget_fraction: f64,
|
||||
seed_expansion: f64,
|
||||
final_expansion: f64,
|
||||
peak_active: usize,
|
||||
) {
|
||||
// Compute actual expansion per partition (skip empty partitions)
|
||||
let expansions: Vec<(usize, f64)> = stats
|
||||
.iter()
|
||||
.filter(|s| s.unitig_bytes > 0)
|
||||
.map(|s| (s.id, s.g_len as f64 * 16.0 / s.unitig_bytes as f64))
|
||||
.collect();
|
||||
|
||||
if expansions.is_empty() {
|
||||
info!("merge_partitions report: no data (all partitions empty)");
|
||||
return;
|
||||
}
|
||||
|
||||
let mut sorted_exp: Vec<f64> = expansions.iter().map(|(_, e)| *e).collect();
|
||||
sorted_exp.sort_by(|a, b| a.partial_cmp(b).unwrap());
|
||||
let n = sorted_exp.len();
|
||||
let mean_exp = sorted_exp.iter().sum::<f64>() / n as f64;
|
||||
let median_exp = sorted_exp[n / 2];
|
||||
let max_exp = sorted_exp[n - 1];
|
||||
|
||||
info!("─── merge_partitions memory report ───");
|
||||
info!(
|
||||
" available RAM : {} budget {:.0}% = {}",
|
||||
fmt_bytes(available_ram),
|
||||
budget_fraction * 100.0,
|
||||
fmt_bytes((available_ram as f64 * budget_fraction) as u64),
|
||||
);
|
||||
info!(
|
||||
" expansion factor — seed: {:.2}× final max: {:.2}× \
|
||||
(mean: {:.2}× median: {:.2}× observed max: {:.2}×)",
|
||||
seed_expansion, final_expansion, mean_exp, median_exp, max_exp,
|
||||
);
|
||||
info!(" peak concurrent workers: {}", peak_active);
|
||||
|
||||
// Histogram of actual expansion factors
|
||||
let min_e = sorted_exp[0];
|
||||
let max_e = sorted_exp[n - 1];
|
||||
let n_buckets = 8usize;
|
||||
let bucket_w = (max_e - min_e).max(0.01) / n_buckets as f64;
|
||||
let mut counts = vec![0usize; n_buckets];
|
||||
for &e in &sorted_exp {
|
||||
let b = (((e - min_e) / bucket_w) as usize).min(n_buckets - 1);
|
||||
counts[b] += 1;
|
||||
}
|
||||
let max_count = *counts.iter().max().unwrap();
|
||||
info!(" expansion factor distribution ({} partitions with data):", n);
|
||||
for (i, &c) in counts.iter().enumerate() {
|
||||
let lo = min_e + i as f64 * bucket_w;
|
||||
let hi = min_e + (i + 1) as f64 * bucket_w;
|
||||
let bar = "█".repeat(if max_count > 0 { c * 30 / max_count } else { 0 });
|
||||
info!(" {:5.2}× – {:5.2}× │{:<30} {}", lo, hi, bar, c);
|
||||
}
|
||||
|
||||
// Top 8 by actual expansion
|
||||
let mut by_exp: Vec<(usize, f64)> = expansions.clone();
|
||||
by_exp.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
|
||||
info!(" top partitions by actual expansion factor:");
|
||||
for (id, exp) in by_exp.iter().take(8) {
|
||||
let s = stats.iter().find(|s| s.id == *id).unwrap();
|
||||
info!(
|
||||
" partition {:4} : {:.2}× ({} unitigs → {}M kmers, \
|
||||
reserved at {:.2}×)",
|
||||
id, exp,
|
||||
fmt_bytes(s.unitig_bytes),
|
||||
s.g_len / 1_000_000,
|
||||
s.exp_at_acquire,
|
||||
);
|
||||
}
|
||||
info!("──────────────────────────────────────");
|
||||
}
|
||||
|
||||
// ── helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
fn fmt_bytes(b: u64) -> String {
|
||||
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
|
||||
else if b >= 1 << 20 { format!("{:.1} MB", b as f64 / (1u64 << 20) as f64) }
|
||||
else if b >= 1 << 10 { format!("{:.1} KB", b as f64 / (1u64 << 10) as f64) }
|
||||
else { format!("{b} B") }
|
||||
}
|
||||
|
||||
/// Sum of all unitigs.bin sizes across all layers of partition `i` in `src`.
|
||||
fn partition_unitig_bytes(src: &KmerIndex, i: usize) -> u64 {
|
||||
let mut total = 0u64;
|
||||
for l in 0.. {
|
||||
let p = src.layer_unitigs_path(i, l);
|
||||
if !p.exists() { break; }
|
||||
if let Ok(m) = std::fs::metadata(&p) { total += m.len(); }
|
||||
}
|
||||
total
|
||||
}
|
||||
|
||||
/// Compute the final genome label lists for all sources.
|
||||
///
|
||||
/// Returns `(per_source_labels, all_genomes_flat)`.
|
||||
/// The first occurrence of a label keeps the original name. Subsequent
|
||||
/// occurrences receive `.1`, `.2`, … suffixes when `rename_duplicates` is true,
|
||||
/// or trigger a `DuplicateGenomeLabel` error otherwise.
|
||||
fn compute_labels(
|
||||
sources: &[&KmerIndex],
|
||||
rename_duplicates: bool,
|
||||
@@ -249,8 +438,6 @@ fn compute_labels(
|
||||
Ok((source_labels, all_genomes))
|
||||
}
|
||||
|
||||
/// Copy spectrum JSON files from `src_root/spectrums/` to `dst_root/spectrums/`,
|
||||
/// mapping each `old_labels[i]` filename to `new_labels[i]`.
|
||||
fn copy_spectrums(
|
||||
src_root: &Path,
|
||||
dst_root: &Path,
|
||||
@@ -269,7 +456,6 @@ fn copy_spectrums(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Recursively remove every directory named `name` under `root`.
|
||||
fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> {
|
||||
for entry in fs::read_dir(root)? {
|
||||
let entry = entry?;
|
||||
@@ -285,7 +471,6 @@ fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
fn format_evidence(ev: &IndexMode) -> String {
|
||||
match ev {
|
||||
IndexMode::Exact => "exact".to_string(),
|
||||
@@ -294,37 +479,15 @@ fn format_evidence(ev: &IndexMode) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
/// A source is "trivial" if its presence/count values carry no approximation:
|
||||
/// single-genome presence index (SetMembership — all values are 1 by construction).
|
||||
fn is_trivial(src: &KmerIndex, mode: MergeMode) -> bool {
|
||||
src.meta.genomes.len() == 1 && mode == MergeMode::Presence
|
||||
}
|
||||
|
||||
/// Sum of all `unitigs.bin` sizes across every partition and layer.
|
||||
/// Used as a proxy for the number of indexed smers.
|
||||
fn index_unitig_size(src: &KmerIndex) -> u64 {
|
||||
let n = src.partition.n_partitions();
|
||||
let mut total = 0u64;
|
||||
for i in 0..n {
|
||||
let index_dir = src.partition.part_dir(i).join("index");
|
||||
let mut l = 0usize;
|
||||
loop {
|
||||
let p = index_dir.join(format!("layer_{l}")).join("unitigs.bin");
|
||||
if !p.exists() { break; }
|
||||
if let Ok(m) = std::fs::metadata(&p) { total += m.len(); }
|
||||
l += 1;
|
||||
}
|
||||
}
|
||||
total
|
||||
(0..n).map(|i| partition_unitig_bytes(src, i)).sum()
|
||||
}
|
||||
|
||||
/// Choose the index to use as bootstrap base.
|
||||
///
|
||||
/// Rule — mieux-disant: if any non-trivial source uses approximate evidence
|
||||
/// (Approx or Hybrid), the output must also be approximate; the base must
|
||||
/// therefore come from an approximate source so its layers carry the right
|
||||
/// evidence files. Among qualifying candidates, the largest (by unitig size)
|
||||
/// is chosen to minimise the number of new smers in the merge layer.
|
||||
fn choose_base(sources: &[&KmerIndex], mode: MergeMode) -> usize {
|
||||
let needs_approx = sources.iter().any(|src| {
|
||||
!is_trivial(src, mode)
|
||||
|
||||
Reference in New Issue
Block a user