feat: add CPU-aware parallel worker pool for partition merging

Introduce CpuSample to measure process-level CPU efficiency and wall-clock time. Use crossbeam-channel to distribute partition merging tasks to a dynamic worker pool that scales based on CPU utilization, capped at half the available cores. Update diagnostics to track pool usage.
This commit is contained in:
Eric Coissac
2026-06-13 11:32:12 +02:00
parent fb8c6e427c
commit bc14346f5f
4 changed files with 123 additions and 34 deletions
+2 -1
View File
@@ -11,7 +11,8 @@ obisys = { path = "../obisys" }
obicompactvec = { path = "../obicompactvec" }
obilayeredmap = { path = "../obilayeredmap" }
ndarray = "0.16"
rayon = "1"
rayon = "1"
crossbeam-channel = "0.5"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
indicatif = "0.17"
+87 -33
View File
@@ -2,8 +2,10 @@ use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::Path;
use std::time::{Duration, Instant};
use obisys::{Reporter, Stage, progress_bar, spinner};
use crossbeam_channel::unbounded;
use obisys::{CpuSample, Reporter, Stage, progress_bar, spinner};
use tracing::{debug, info};
use obilayeredmap::IndexMode;
@@ -191,48 +193,97 @@ impl KmerIndex {
let mut order: Vec<usize> = (0..n_partitions).collect();
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
// ── First partition (largest) ─────────────────────────────────────
let worst_id = order[0];
let worst_bytes = partition_sizes[worst_id];
// ── Adaptive worker pool ──────────────────────────────────────────
// Start with 1 worker thread. After each completed partition,
// measure CPU efficiency (via getrusage delta). If efficiency is
// below the spawn threshold and more partitions remain, spawn one
// additional worker. Workers share a crossbeam channel of partition
// IDs; each reports (id, g_len, duration) on a result channel.
const SPAWN_THRESHOLD: f64 = 0.95; // spawn when >5% capacity idle
let n_cores = std::thread::available_parallelism()
.map(|n| n.get()).unwrap_or(1);
let max_workers = (n_cores / 2).max(1);
let _ = budget_fraction; // kept in signature for CLI compatibility
let worst_g_len = dst_partition
.merge_partition(worst_id, &srcs, mode, n_dst_genomes, block_bits, &evidence)
.map_err(OKIError::Partition)?;
pb.inc(1);
let (part_tx, part_rx) = unbounded::<usize>();
let (result_tx, result_rx) =
unbounded::<(usize, Result<usize, obiskio::SKError>, Duration)>();
// activate_tx: controller sends () to wake the next dormant worker.
// Dropping activate_tx closes the channel; dormant workers exit.
let (activate_tx, activate_rx) = unbounded::<()>();
info!(
"merge_partitions: first partition {} — {} unitig bytes → {} new kmers",
worst_id, fmt_bytes(worst_bytes), worst_g_len,
);
for &i in &order {
part_tx.send(i).ok();
}
drop(part_tx);
let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
part_stats.push(PartStat {
id: worst_id,
unitig_bytes: worst_bytes,
g_len: worst_g_len,
});
let mut n_workers = 0usize;
let mut cpu_sample = CpuSample::now();
// ── Sequential remainder ──────────────────────────────────────────
// One partition at a time; each partition uses an internal pipeline
// (obipipeline) to parallelise file I/O and dst_map filtering.
let _ = budget_fraction; // kept in signature for CLI compatibility
for &i in &order[1..] {
let ubytes = partition_sizes[i];
debug!("partition {i}: start — {} unitig bytes", fmt_bytes(ubytes));
// Shadow as references so closures can capture them by copy.
let srcs = &srcs;
let evidence = &evidence;
let g_len = dst_partition
.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence)
.map_err(OKIError::Partition)?;
pb.inc(1);
std::thread::scope(|s| -> OKIResult<()> {
// Pre-spawn max_workers threads; each waits for an activation
// signal before consuming from part_rx.
for _ in 0..max_workers {
let prx = part_rx.clone();
let rtx = result_tx.clone();
let arx = activate_rx.clone();
s.spawn(move || {
if arx.recv().is_ok() {
for i in &prx {
let t = Instant::now();
let r = dst_partition.merge_partition(
i, srcs, mode, n_dst_genomes, block_bits, evidence,
);
rtx.send((i, r, t.elapsed())).ok();
}
}
});
}
drop(result_tx);
debug!("partition {i}: done — {} new kmers", g_len);
part_stats.push(PartStat { id: i, unitig_bytes: ubytes, g_len });
}
// Activate first worker immediately.
activate_tx.send(()).ok();
n_workers = 1;
let mut completed = 0usize;
while completed < n_partitions {
let (i, r, dur) = result_rx.recv()
.map_err(|_| OKIError::Io(io::Error::new(
io::ErrorKind::UnexpectedEof, "worker channel closed")))?;
let g_len = r.map_err(OKIError::Partition)?;
pb.inc(1);
debug!("partition {i}: done in {:.1}s — {} new kmers",
dur.as_secs_f64(), g_len);
part_stats.push(PartStat {
id: i, unitig_bytes: partition_sizes[i], g_len,
});
completed += 1;
if n_workers < max_workers && completed < n_partitions {
let eff = cpu_sample.cpu_efficiency(n_cores);
if eff < SPAWN_THRESHOLD {
activate_tx.send(()).ok();
n_workers += 1;
cpu_sample = CpuSample::now();
debug!("activated worker {n_workers} — efficiency {:.0}%",
eff * 100.0);
}
}
}
// Close activate_tx: dormant workers exit cleanly.
drop(activate_tx);
Ok(())
})?;
pb.finish_and_clear();
// ── Diagnostic report ─────────────────────────────────────────────
print_merge_partition_report(&part_stats);
print_merge_partition_report(&part_stats, n_workers, max_workers);
rep.push(t.stop());
}
@@ -254,7 +305,7 @@ impl KmerIndex {
// ── Diagnostic report ─────────────────────────────────────────────────────────
fn print_merge_partition_report(stats: &[PartStat]) {
fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_workers: usize) {
let total_new: usize = stats.iter().map(|s| s.g_len).sum();
let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count();
@@ -268,6 +319,9 @@ fn print_merge_partition_report(stats: &[PartStat]) {
" {} partition(s) processed, {} total new kmers",
non_empty, total_new,
);
info!(
" workers spawned: {n_workers} / {max_workers} (max)",
);
// Top 8 partitions by new-kmer count
let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect();