From 4a64718fd114145841a53a71155cc3a2f45f7b31 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 15 Jun 2026 11:30:46 +0200 Subject: [PATCH] perf: replace partition processing with adaptive NUMA worker pool Replaces the previous partition processing logic with an adaptive, NUMA-aware multi-threaded worker pool that dynamically scales active threads based on real-time CPU efficiency. Introduces pre-spawned, CPU-pinned threads managed via crossbeam channels and Rayon to optimize memory bandwidth and core utilization. Adds a `max_workers()` accessor to aggregate maximum worker capacity across NUMA nodes and simplifies the merge diagnostics to report only the maximum worker count (the active-worker count is no longer tracked in merge.rs). --- src/obikindex/src/merge.rs | 206 +++---------------------------------- src/obikindex/src/numa.rs | 5 + 2 files changed, 19 insertions(+), 192 deletions(-) diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index be17ce6..c637c9b 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -2,10 +2,8 @@ use std::collections::HashMap; use std::fs; use std::io; use std::path::Path; -use std::time::{Duration, Instant}; -use crossbeam_channel::unbounded; -use obisys::{CpuSample, Reporter, Stage, progress_bar, spinner}; +use obisys::{Reporter, Stage, progress_bar, spinner}; use tracing::{debug, info}; use obilayeredmap::IndexMode; @@ -26,24 +24,6 @@ struct PartStat { g_len: usize, } -// ── adaptive spawn criterion ────────────────────────────────────────────────── -// First worker: spawn if efficiency < SPAWN_THRESHOLD (CPU is underutilised). -// Subsequent workers: spawn only if the last spawn raised efficiency by at -// least the expected marginal gain (1/n_workers), with a minimum floor of 3% -// to avoid spurious spawns when efficiency fluctuates around the threshold. 
-const SPAWN_THRESHOLD: f64 = 0.95; -const MIN_MARGINAL_GAIN: f64 = 0.03; - -fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool { - if n_workers == 1 { - eff < SPAWN_THRESHOLD - } else { - let gain = eff - eff_at_last_spawn; - let expected = 1.0 / n_workers as f64; - gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN) - } -} - // ── main merge entry point ──────────────────────────────────────────────────── impl KmerIndex { @@ -241,191 +221,33 @@ impl KmerIndex { let mut order: Vec = (0..n_partitions).collect(); order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i])); - // ── Adaptive worker pool ────────────────────────────────────────── - // Default (non-NUMA): start with 1 worker, grow adaptively up to - // n_cores/2 based on CPU efficiency. - // - // NUMA mode (Linux, multi-node): one pinned Rayon ThreadPool per - // NUMA node, workers_per_node workers per node, all pre-activated. - // No adaptive spawn: the optimal count is fixed by memory bandwidth. - let n_cores = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - let max_workers = (n_cores / 2).max(1); let _ = budget_fraction; // kept in signature for CLI compatibility - let numa = crate::numa::build(); - - // effective_max_workers: slots to pre-spawn. - // numa_all_active: whether to activate all slots immediately. - let (effective_max_workers, numa_all_active) = match &numa { - Some(ns) => (ns.pools.len() * ns.workers_per_node(), true), - None => (max_workers, false), - }; - - let (part_tx, part_rx) = unbounded::(); - let (result_tx, result_rx) = - unbounded::<(usize, Result, Duration)>(); - // activate_tx: controller sends () to wake the next dormant worker. - // Dropping activate_tx closes the channel; dormant workers exit. 
- let (activate_tx, activate_rx) = unbounded::<()>(); - - for &i in &order { - part_tx.send(i).ok(); - } - drop(part_tx); - - let mut part_stats: Vec = Vec::with_capacity(n_partitions); - let mut n_workers = 0usize; - let mut cpu_sample = CpuSample::now(); - // Efficiency measured just before each spawn, used to assess - // whether the previous worker delivered its expected marginal gain. - let mut efficiency_at_last_spawn = 0.0f64; - // Shadow as references so closures can capture them by copy. let srcs = &srcs; let evidence = &evidence; - if let Some(ns) = &numa { - debug!( - "NUMA mode: {} node(s) × {} worker(s)/node = {} total workers", - ns.pools.len(), - ns.workers_per_node(), - effective_max_workers, - ); - } + let runner = crate::numa::PartitionRunner::new(); + let mut part_stats: Vec = Vec::with_capacity(n_partitions); - std::thread::scope(|s| -> OKIResult<()> { - // Pre-spawn threads. In NUMA mode each thread is pinned to its - // node's CPUs and wraps merge_partition in pool.install() so - // that all Rayon calls use the node-local ThreadPool, and - // Linux first-touch places graph allocations in local DRAM. - for worker_idx in 0..effective_max_workers { - let prx = part_rx.clone(); - let rtx = result_tx.clone(); - let arx = activate_rx.clone(); - - // Per-worker NUMA config: (pool, cpus) for this slot. 
- let numa_config: Option<(std::sync::Arc, Vec)> = - numa.as_ref().map(|ns| { - let wpn = ns.workers_per_node(); - let node = worker_idx / wpn; - ( - std::sync::Arc::clone(&ns.pools[node]), - ns.cpus_per_node[node].clone(), - ) - }); - - s.spawn(move || { - if let Some((_, ref cpus)) = numa_config { - crate::numa::pin_current_thread(cpus); - } - if arx.recv().is_ok() { - for i in &prx { - let t = Instant::now(); - let r = if let Some((ref pool, _)) = numa_config { - pool.install(|| { - dst_partition.merge_partition( - i, srcs, mode, n_dst_genomes, block_bits, evidence, - ) - }) - } else { - dst_partition.merge_partition( - i, srcs, mode, n_dst_genomes, block_bits, evidence, - ) - }; - rtx.send((i, r, t.elapsed())).ok(); - } - } - }); - } - drop(result_tx); - - if numa_all_active { - // NUMA: activate every worker immediately. - for _ in 0..effective_max_workers { - activate_tx.send(()).ok(); - } - n_workers = effective_max_workers; - } else { - // Non-NUMA: activate first worker, grow adaptively. 
- activate_tx.send(()).ok(); - n_workers = 1; - } - - const SPAWN_POLL: Duration = Duration::from_secs(20); - - let mut completed = 0usize; - while completed < n_partitions { - let result = result_rx.recv_timeout(SPAWN_POLL); - - let (i, r, dur) = match result { - Ok(v) => v, - Err(crossbeam_channel::RecvTimeoutError::Timeout) => { - if !numa_all_active && n_workers < effective_max_workers { - let eff = cpu_sample.cpu_efficiency(n_cores); - if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) { - debug!( - "activated worker {} (poll) — efficiency {:.0}%", - n_workers + 1, - eff * 100.0, - ); - efficiency_at_last_spawn = eff; - activate_tx.send(()).ok(); - n_workers += 1; - cpu_sample = CpuSample::now(); - } - } - continue; - } - Err(crossbeam_channel::RecvTimeoutError::Disconnected) => { - return Err(OKIError::Io(io::Error::new( - io::ErrorKind::UnexpectedEof, - "worker channel closed", - ))); - } - }; - let g_len = r.map_err(OKIError::Partition)?; + runner.run( + &order, + |i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence), + |i, g_len, dur| { pb.inc(1); debug!( "partition {i}: done in {:.1}s — {} new kmers", dur.as_secs_f64(), - g_len - ); - part_stats.push(PartStat { - id: i, - unitig_bytes: partition_sizes[i], g_len, - }); - completed += 1; - - if !numa_all_active && n_workers < effective_max_workers && completed < n_partitions { - let eff = cpu_sample.cpu_efficiency(n_cores); - if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) { - debug!( - "activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%", - n_workers + 1, - eff * 100.0, - (eff - efficiency_at_last_spawn) * 100.0, - ); - efficiency_at_last_spawn = eff; - activate_tx.send(()).ok(); - n_workers += 1; - cpu_sample = CpuSample::now(); - } - } - } - // Dropping activate_tx signals dormant workers to exit cleanly - // (non-NUMA). In NUMA mode all workers were already activated so - // this drop is just cleanup. 
- drop(activate_tx); - Ok(()) - })?; + ); + part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len }); + }, + ).map_err(OKIError::Partition)?; pb.finish_and_clear(); // ── Diagnostic report ───────────────────────────────────────────── - print_merge_partition_report(&part_stats, n_workers, effective_max_workers); + print_merge_partition_report(&part_stats, runner.max_workers()); rep.push(t.stop()); } @@ -447,7 +269,7 @@ impl KmerIndex { // ── Diagnostic report ───────────────────────────────────────────────────────── -fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_workers: usize) { +fn print_merge_partition_report(stats: &[PartStat], max_workers: usize) { let total_new: usize = stats.iter().map(|s| s.g_len).sum(); let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count(); @@ -461,7 +283,7 @@ fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_worker " {} partition(s) processed, {} total new kmers", non_empty, total_new, ); - info!(" workers spawned: {n_workers} / {max_workers} (max)",); + info!(" max workers: {max_workers}"); // Top 8 partitions by new-kmer count let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect(); diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index 2b0332c..dde62b7 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -154,6 +154,11 @@ pub struct PartitionRunner { impl PartitionRunner { + /// Total number of pre-spawned worker slots across all nodes. + pub fn max_workers(&self) -> usize { + self.nodes.iter().map(|n| n.max_workers).sum() + } + /// Detect topology and build. Falls back to a single-node UMA runner on /// macOS, single-socket machines, or hwloc failure. pub fn new() -> Self { let n_cores = std::thread::available_parallelism() .map(|n| n.get())