From c4071eb45043b1753419adfda5d8d3ca841e0330 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sat, 13 Jun 2026 14:54:23 +0200 Subject: [PATCH] refactor(merge): extract adaptive worker spawn logic Centralize inline spawn checks into a `should_spawn_worker` function with adaptive thresholds. The first additional worker is spawned when CPU efficiency is below 95%, while subsequent workers are spawned only if the efficiency gain since the last spawn is at least 25% of the expected `1/n_workers` gain (with a 3% minimum). Also increases the spawn poll interval from 10s to 20s. --- src/obikindex/src/merge.rs | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index 3dd3f05..3242916 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -26,6 +26,24 @@ struct PartStat { g_len: usize, } +// ── adaptive spawn criterion ────────────────────────────────────────────────── +// First extra worker: spawn if efficiency < SPAWN_THRESHOLD (CPU underutilised). +// Subsequent workers: spawn only if the last spawn raised efficiency by at +// least a quarter of the expected marginal gain (1/n_workers), with a floor +// of 3% to avoid spurious spawns when efficiency merely fluctuates. +const SPAWN_THRESHOLD: f64 = 0.95; +const MIN_MARGINAL_GAIN: f64 = 0.03; + +fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool { + if n_workers == 1 { + eff < SPAWN_THRESHOLD + } else { + let gain = eff - eff_at_last_spawn; + let expected = 1.0 / n_workers as f64; + gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN) + } +} + // ── main merge entry point ──────────────────────────────────────────────────── impl KmerIndex { @@ -229,7 +247,6 @@ impl KmerIndex { // below the spawn threshold and more partitions remain, spawn one // additional worker. Workers share a crossbeam channel of partition // IDs; each reports (id, g_len, duration) on a result channel.
- const SPAWN_THRESHOLD: f64 = 0.95; // spawn when >5% capacity idle let n_cores = std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1); @@ -289,7 +306,7 @@ impl KmerIndex { activate_tx.send(()).ok(); n_workers = 1; - const SPAWN_POLL: Duration = Duration::from_secs(10); + const SPAWN_POLL: Duration = Duration::from_secs(20); let mut completed = 0usize; while completed < n_partitions { @@ -301,7 +318,7 @@ impl KmerIndex { Err(crossbeam_channel::RecvTimeoutError::Timeout) => { if n_workers < max_workers { let eff = cpu_sample.cpu_efficiency(n_cores); - if eff < SPAWN_THRESHOLD { + if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) { debug!( "activated worker {} (poll) — efficiency {:.0}%", n_workers + 1, @@ -338,19 +355,7 @@ impl KmerIndex { if n_workers < max_workers && completed < n_partitions { let eff = cpu_sample.cpu_efficiency(n_cores); - // For the first spawn use SPAWN_THRESHOLD. - // For subsequent spawns: the previous worker should - // have raised efficiency by at least a quarter of the expected - // marginal gain (1/n_workers). If not, adding another - // worker won't help. - let should_spawn = if n_workers == 1 { - eff < SPAWN_THRESHOLD - } else { - let gain = eff - efficiency_at_last_spawn; - let expected = 1.0 / n_workers as f64; - gain >= expected * 0.25 - }; - if should_spawn { + if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) { debug!( "activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%", n_workers + 1, -- 2.52.0