refactor(merge): extract adaptive worker spawn logic

Centralize inline spawn checks into a `should_spawn_worker` function with adaptive thresholds. The first worker spawns at <95% CPU efficiency, while subsequent workers only trigger if marginal efficiency gain exceeds 25% of the expected `1/n_workers` (minimum 3%). Also increases the spawn poll interval from 10s to 20s.
This commit is contained in:
Eric Coissac
2026-06-13 14:54:23 +02:00
parent 817b02cbc1
commit c4071eb450
+21 -16
View File
@@ -26,6 +26,24 @@ struct PartStat {
g_len: usize, g_len: usize,
} }
// ── adaptive spawn criterion ──────────────────────────────────────────────────
// First worker: spawn if efficiency < SPAWN_THRESHOLD (CPU is underutilised).
// Subsequent workers: spawn only if the last spawn raised efficiency by at
// least the expected marginal gain (1/n_workers), with a minimum floor of 3%
// to avoid spurious spawns when efficiency fluctuates around the threshold.
const SPAWN_THRESHOLD: f64 = 0.95;
const MIN_MARGINAL_GAIN: f64 = 0.03;
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
if n_workers == 1 {
eff < SPAWN_THRESHOLD
} else {
let gain = eff - eff_at_last_spawn;
let expected = 1.0 / n_workers as f64;
gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN)
}
}
// ── main merge entry point ──────────────────────────────────────────────────── // ── main merge entry point ────────────────────────────────────────────────────
impl KmerIndex { impl KmerIndex {
@@ -229,7 +247,6 @@ impl KmerIndex {
// below the spawn threshold and more partitions remain, spawn one // below the spawn threshold and more partitions remain, spawn one
// additional worker. Workers share a crossbeam channel of partition // additional worker. Workers share a crossbeam channel of partition
// IDs; each reports (id, g_len, duration) on a result channel. // IDs; each reports (id, g_len, duration) on a result channel.
const SPAWN_THRESHOLD: f64 = 0.95; // spawn when >5% capacity idle
let n_cores = std::thread::available_parallelism() let n_cores = std::thread::available_parallelism()
.map(|n| n.get()) .map(|n| n.get())
.unwrap_or(1); .unwrap_or(1);
@@ -289,7 +306,7 @@ impl KmerIndex {
activate_tx.send(()).ok(); activate_tx.send(()).ok();
n_workers = 1; n_workers = 1;
const SPAWN_POLL: Duration = Duration::from_secs(10); const SPAWN_POLL: Duration = Duration::from_secs(20);
let mut completed = 0usize; let mut completed = 0usize;
while completed < n_partitions { while completed < n_partitions {
@@ -301,7 +318,7 @@ impl KmerIndex {
Err(crossbeam_channel::RecvTimeoutError::Timeout) => { Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
if n_workers < max_workers { if n_workers < max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores); let eff = cpu_sample.cpu_efficiency(n_cores);
if eff < SPAWN_THRESHOLD { if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
debug!( debug!(
"activated worker {} (poll) — efficiency {:.0}%", "activated worker {} (poll) — efficiency {:.0}%",
n_workers + 1, n_workers + 1,
@@ -338,19 +355,7 @@ impl KmerIndex {
if n_workers < max_workers && completed < n_partitions { if n_workers < max_workers && completed < n_partitions {
let eff = cpu_sample.cpu_efficiency(n_cores); let eff = cpu_sample.cpu_efficiency(n_cores);
// For the first spawn use SPAWN_THRESHOLD. if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
// For subsequent spawns: the previous worker should
// have raised efficiency by at least a quarter of the expected
// marginal gain (1/n_workers). If not, adding another
// worker won't help.
let should_spawn = if n_workers == 1 {
eff < SPAWN_THRESHOLD
} else {
let gain = eff - efficiency_at_last_spawn;
let expected = 1.0 / n_workers as f64;
gain >= expected * 0.25
};
if should_spawn {
debug!( debug!(
"activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%", "activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%",
n_workers + 1, n_workers + 1,