Merge pull request 'refactor(merge): extract adaptive worker spawn logic' (#28) from push-yzruqtyqvopm into main
Reviewed-on: #28
This commit was merged in pull request #28.
This commit is contained in:
+21
-16
@@ -26,6 +26,24 @@ struct PartStat {
|
||||
g_len: usize,
|
||||
}
|
||||
|
||||
// ── adaptive spawn criterion ──────────────────────────────────────────────────
|
||||
// First worker: spawn if efficiency < SPAWN_THRESHOLD (CPU is underutilised).
// Subsequent workers: spawn only if the last spawn raised efficiency by at
// least a quarter of the expected marginal gain (1/n_workers), with a minimum
// floor of 3% to avoid spurious spawns when efficiency fluctuates around the
// threshold.
const SPAWN_THRESHOLD: f64 = 0.95;
const MIN_MARGINAL_GAIN: f64 = 0.03;
// Share of the ideal 1/n_workers improvement that the previous spawn must
// have delivered before another worker is considered worthwhile.
const EXPECTED_GAIN_FRACTION: f64 = 0.25;

/// Decides whether one more worker should be activated.
///
/// # Arguments
///
/// * `n_workers` - number of workers currently active.
/// * `eff` - the current efficiency sample (presumably a fraction in
///   `0.0..=1.0`, given `SPAWN_THRESHOLD` is `0.95` — confirm against the
///   sampler).
/// * `eff_at_last_spawn` - the efficiency value to compare against when
///   measuring the gain; ignored when `n_workers == 1`.
///
/// # Returns
///
/// * With exactly one worker: `true` iff `eff` is strictly below
///   `SPAWN_THRESHOLD`.
/// * Otherwise: `true` iff `eff - eff_at_last_spawn` is at least
///   `max(EXPECTED_GAIN_FRACTION / n_workers, MIN_MARGINAL_GAIN)`.
///
/// With `n_workers == 0` the required gain is infinite, so the result is
/// always `false`.
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
    if n_workers == 1 {
        return eff < SPAWN_THRESHOLD;
    }

    // NOTE(review): on this path the absolute efficiency is not compared with
    // SPAWN_THRESHOLD; only the improvement since the last spawn matters.
    // Confirm that spawning while already above the threshold is intended.
    let gain = eff - eff_at_last_spawn;
    let expected = 1.0 / n_workers as f64;
    let required = (expected * EXPECTED_GAIN_FRACTION).max(MIN_MARGINAL_GAIN);
    gain >= required
}
|
||||
|
||||
// ── main merge entry point ────────────────────────────────────────────────────
|
||||
|
||||
impl KmerIndex {
|
||||
@@ -229,7 +247,6 @@ impl KmerIndex {
|
||||
// below the spawn threshold and more partitions remain, spawn one
|
||||
// additional worker. Workers share a crossbeam channel of partition
|
||||
// IDs; each reports (id, g_len, duration) on a result channel.
|
||||
const SPAWN_THRESHOLD: f64 = 0.95; // spawn when >5% capacity idle
|
||||
let n_cores = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
@@ -289,7 +306,7 @@ impl KmerIndex {
|
||||
activate_tx.send(()).ok();
|
||||
n_workers = 1;
|
||||
|
||||
const SPAWN_POLL: Duration = Duration::from_secs(10);
|
||||
const SPAWN_POLL: Duration = Duration::from_secs(20);
|
||||
|
||||
let mut completed = 0usize;
|
||||
while completed < n_partitions {
|
||||
@@ -301,7 +318,7 @@ impl KmerIndex {
|
||||
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
|
||||
if n_workers < max_workers {
|
||||
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||||
if eff < SPAWN_THRESHOLD {
|
||||
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
|
||||
debug!(
|
||||
"activated worker {} (poll) — efficiency {:.0}%",
|
||||
n_workers + 1,
|
||||
@@ -338,19 +355,7 @@ impl KmerIndex {
|
||||
|
||||
if n_workers < max_workers && completed < n_partitions {
|
||||
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||||
// For the first spawn use SPAWN_THRESHOLD.
|
||||
// For subsequent spawns: the previous worker should
|
||||
// have raised efficiency by at least a quarter of the expected
|
||||
// marginal gain (1/n_workers). If not, adding another
|
||||
// worker won't help.
|
||||
let should_spawn = if n_workers == 1 {
|
||||
eff < SPAWN_THRESHOLD
|
||||
} else {
|
||||
let gain = eff - efficiency_at_last_spawn;
|
||||
let expected = 1.0 / n_workers as f64;
|
||||
gain >= expected * 0.25
|
||||
};
|
||||
if should_spawn {
|
||||
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
|
||||
debug!(
|
||||
"activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%",
|
||||
n_workers + 1,
|
||||
|
||||
Reference in New Issue
Block a user