feat: introduce NUMA-aware PartitionRunner for adaptive parallelism

Replace NUMA-naive Rayon loops and ad-hoc adaptive pools with a unified `PartitionRunner` that manages a NUMA-aware worker pool. The implementation uses pinned Rayon thread pools per node and activates dormant threads based on real-time CPU efficiency metrics. This standardizes partition-level parallelism, optimizes memory locality, and eliminates cross-socket traffic. Includes architecture documentation and updates mkdocs navigation.
This commit is contained in:
Eric Coissac
2026-06-15 11:24:37 +02:00
parent 313d73838a
commit 7a87e911b6
3 changed files with 429 additions and 1 deletions
+249 -1
View File
@@ -10,12 +10,15 @@
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
// - any per-node pool fails to build
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crossbeam_channel::{RecvTimeoutError, unbounded};
use hwlocality::Topology;
use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet;
use hwlocality::object::types::ObjectType;
use obisys::CpuSample;
use tracing::debug;
// ── Public interface ──────────────────────────────────────────────────────────
@@ -100,3 +103,248 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
.build()
.ok()
}
// ── Adaptive spawn heuristic ──────────────────────────────────────────────────
//
// With a single active worker: spawn another if CPU efficiency is below
// SPAWN_THRESHOLD (machine is under-utilised). With more workers: spawn only
// if efficiency has risen since the last spawn by at least a quarter of the
// expected marginal gain (1/n_workers), and never by less than
// MIN_MARGINAL_GAIN, to avoid spurious spawns from efficiency fluctuations.
/// Efficiency ceiling for the single-worker case: a second worker is started
/// only while measured CPU efficiency stays strictly below this value.
const SPAWN_THRESHOLD: f64 = 0.95;

/// Smallest efficiency improvement that can justify one more worker,
/// whatever the current worker count.
const MIN_MARGINAL_GAIN: f64 = 0.03;

/// How long a controller waits for a result before re-evaluating the
/// heuristic on its own.
const SPAWN_POLL: Duration = Duration::from_secs(20);

/// Decide whether one more worker should be activated.
///
/// * `n_workers` — number of workers currently active.
/// * `eff` — CPU efficiency measured now.
/// * `eff_at_last_spawn` — efficiency recorded when the previous worker was
///   activated.
///
/// With exactly one worker the answer is `eff < SPAWN_THRESHOLD`. Otherwise
/// the efficiency gain since the last spawn must reach a quarter of
/// `1 / n_workers`, and at least `MIN_MARGINAL_GAIN`.
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
    match n_workers {
        1 => eff < SPAWN_THRESHOLD,
        n => {
            let expected_gain = 1.0 / n as f64;
            let required_gain = (expected_gain * 0.25).max(MIN_MARGINAL_GAIN);
            eff - eff_at_last_spawn >= required_gain
        }
    }
}
// ── PartitionRunner ───────────────────────────────────────────────────────────
/// Per-node settings consumed by `PartitionRunner::run`.
struct NodeConfig {
/// Rayon pool that partition closures are installed into: `Some` for each
/// node produced by `build()`, `None` for the single fallback (UMA) node.
pool: Option<Arc<rayon::ThreadPool>>,
/// CPU ids handed to `pin_current_thread` by every worker of this node.
/// Left empty for the fallback node, in which case workers are not pinned.
cpu_ids: Vec<usize>,
/// Number of worker threads pre-spawned for this node, and therefore the
/// upper bound on how many can be active at once.
max_workers: usize,
}
/// Generic NUMA-aware runner for partition-level parallel work.
///
/// Encapsulates worker spawning, NUMA pinning, adaptive activation, and result
/// collection. UMA systems are handled as the degenerate case of a single node
/// with no pinning.
///
/// # Model
///
/// One controller thread per NUMA node (one total on UMA). Each controller
/// pre-spawns `max_workers` dormant workers that drain a work queue shared by
/// all nodes.
///
/// * UMA: one worker is activated up front; further workers are activated one
///   at a time, whenever `should_spawn_worker` judges that global CPU
///   efficiency justifies it.
/// * NUMA: all workers are activated immediately (memory bandwidth, not CPU
///   count, is the bottleneck).
pub struct PartitionRunner {
/// One entry per NUMA node, or a single pool-less entry in UMA mode.
nodes: Vec<NodeConfig>,
/// Available parallelism reported by the OS (1 if it cannot be queried);
/// passed to the CPU-efficiency measurement that drives worker activation.
n_cores: usize,
}
impl PartitionRunner {
/// Build a runner that matches the machine's topology.
///
/// If `build()` yields per-node pools, the runner works in NUMA mode with one
/// `NodeConfig` per pool, each allowing `workers_per_node()` workers.
/// Otherwise it falls back to UMA mode: a single node with no pool and no CPU
/// pinning, allowing up to half of the available cores as workers (never
/// fewer than one).
pub fn new() -> Self {
    // Available parallelism, or 1 when the OS cannot report it.
    let n_cores = match std::thread::available_parallelism() {
        Ok(n) => n.get(),
        Err(_) => 1,
    };

    if let Some(ns) = build() {
        let wpn = ns.workers_per_node();
        debug!(
            "PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node",
            ns.pools.len(),
            wpn,
        );

        // Pair every pool with the CPU list of its node.
        let mut nodes = Vec::with_capacity(ns.pools.len());
        for (pool, cpu_ids) in ns.pools.into_iter().zip(ns.cpus_per_node) {
            nodes.push(NodeConfig {
                pool: Some(pool),
                cpu_ids,
                max_workers: wpn,
            });
        }
        return Self { nodes, n_cores };
    }

    // UMA fallback: one unpinned node, grown adaptively by `run`.
    let max_workers = std::cmp::max(n_cores / 2, 1);
    debug!(
        "PartitionRunner: UMA mode — adaptive up to {} worker(s)",
        max_workers,
    );
    let uma_node = NodeConfig {
        pool: None,
        cpu_ids: vec![],
        max_workers,
    };
    Self {
        nodes: vec![uma_node],
        n_cores,
    }
}
/// Run `f(i)` for every index in `order`, spreading the calls over the
/// runner's worker threads.
///
/// `on_done(i, result, elapsed)` is called for each partition whose `f(i)`
/// returned `Ok`. It runs on a controller thread under an internal mutex, so
/// calls never overlap — suitable for progress bars, logging, and result
/// aggregation. The callback must be `Send` (it is shared with the controller
/// threads) but does not need to be `Sync`.
///
/// The work queue is shared across all NUMA nodes: any idle worker takes
/// the next available partition regardless of node, ensuring load balance.
///
/// # Errors
///
/// Returns the first error recorded from `f`, if any. An error does not
/// cancel the run: the remaining partitions are still processed, and
/// `on_done` is not called for the partitions that failed.
///
/// # Panics
///
/// Panics if `f` or `on_done` panics on one of the spawned threads.
pub fn run<F, R, E, C>(
    &self,
    order: &[usize],
    f: F,
    on_done: C,
) -> Result<(), E>
where
    F: Fn(usize) -> Result<R, E> + Send + Sync,
    R: Send,
    E: Send,
    C: FnMut(usize, R, Duration) + Send,
{
    let f = Arc::new(f);
    let on_done = Arc::new(Mutex::new(on_done));
    let first_err: Arc<Mutex<Option<E>>> = Arc::new(Mutex::new(None));

    // Shared work queue — pre-loaded in caller-supplied order. The sender is
    // dropped right away, so the queue can only shrink: once it has been
    // observed empty it stays empty.
    let (part_tx, part_rx) = unbounded::<usize>();
    for &i in order {
        part_tx.send(i).ok();
    }
    drop(part_tx);

    let n_cores = self.n_cores;

    std::thread::scope(|s| {
        for node in &self.nodes {
            let f = Arc::clone(&f);
            let on_done = Arc::clone(&on_done);
            let first_err = Arc::clone(&first_err);
            let part_rx = part_rx.clone();

            // One controller thread per node.
            s.spawn(move || {
                // Per-node result and activation channels.
                let (result_tx, result_rx) =
                    unbounded::<(usize, Result<R, E>, Duration)>();
                let (activate_tx, activate_rx) = unbounded::<()>();

                std::thread::scope(|ws| {
                    // Pre-spawn workers (all dormant until activated).
                    for _ in 0..node.max_workers {
                        let prx = part_rx.clone();
                        let rtx = result_tx.clone();
                        let arx = activate_rx.clone();
                        let f = Arc::clone(&f);
                        let pool = node.pool.clone();
                        let cpu_ids = node.cpu_ids.clone();
                        ws.spawn(move || {
                            if !cpu_ids.is_empty() {
                                pin_current_thread(&cpu_ids);
                            }
                            if arx.recv().is_err() {
                                return; // never activated — exit cleanly
                            }
                            // Drain the shared queue until it is empty.
                            for i in &prx {
                                let t = Instant::now();
                                let r = match &pool {
                                    Some(p) => p.install(|| f(i)),
                                    None => f(i),
                                };
                                rtx.send((i, r, t.elapsed())).ok();
                            }
                        });
                    }
                    // Drop the controller's copy: result_rx disconnects
                    // once every worker (active or dormant) has exited.
                    drop(result_tx);

                    // In NUMA mode activate all workers immediately;
                    // in UMA mode activate one and grow adaptively.
                    let numa_mode = node.pool.is_some();
                    let initial = if numa_mode { node.max_workers } else { 1 };
                    for _ in 0..initial {
                        activate_tx.send(()).ok();
                    }

                    // Kept in an `Option` so the sender can be dropped from
                    // inside the controller loop (see below).
                    let mut activate_tx = Some(activate_tx);
                    let mut active_workers = initial;
                    let mut cpu_sample = CpuSample::now();
                    let mut eff_at_last_spawn = 0.0f64;

                    // Controller loop.
                    loop {
                        // Dormant workers each hold a result sender while
                        // they wait for activation, so `result_rx` cannot
                        // disconnect until they exit. As soon as no work is
                        // left, release them by dropping the activation
                        // sender; otherwise a run that finishes before every
                        // worker was activated would never leave this loop.
                        if part_rx.is_empty() {
                            activate_tx = None;
                        }

                        // `polled` records whether this wake-up came from the
                        // poll timeout rather than from a finished partition.
                        let polled = match result_rx.recv_timeout(SPAWN_POLL) {
                            Ok((i, r, dur)) => {
                                match r {
                                    Ok(v) => {
                                        on_done.lock().unwrap()(i, v, dur);
                                    }
                                    Err(e) => {
                                        // Keep only the first error.
                                        let mut g = first_err.lock().unwrap();
                                        if g.is_none() {
                                            *g = Some(e);
                                        }
                                    }
                                }
                                false
                            }
                            Err(RecvTimeoutError::Timeout) => true,
                            Err(RecvTimeoutError::Disconnected) => break,
                        };

                        // Adaptive growth applies to UMA mode only, and only
                        // while dormant workers remain.
                        if numa_mode || active_workers >= node.max_workers {
                            continue;
                        }
                        if let Some(tx) = &activate_tx {
                            let eff = cpu_sample.cpu_efficiency(n_cores);
                            if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
                                debug!(
                                    "activated worker {}{} — efficiency {:.0}%",
                                    active_workers + 1,
                                    if polled { " (poll)" } else { "" },
                                    eff * 100.0,
                                );
                                tx.send(()).ok();
                                active_workers += 1;
                                eff_at_last_spawn = eff;
                                // Restart the measurement window so the next
                                // decision reflects the new worker count.
                                cpu_sample = CpuSample::now();
                            }
                        }
                    }

                    // Normally already dropped inside the loop; kept as an
                    // explicit final release of the activation sender.
                    drop(activate_tx);
                }); // ws: waits for all workers of this node
            });
        }
    }); // s: waits for all node controllers

    let mut g = first_err.lock().unwrap();
    match g.take() {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
}