refactor: implement adaptive worker scaling and infallible NUMA build
Replaces the fallible NUMA topology builder with an infallible fallback that synthesizes a single-node UMA configuration on failure or absence. Refactors PartitionRunner to pre-spawn dormant workers and dynamically activate them via CPU efficiency thresholds, replacing static upfront spawning with adaptive scaling. Bumps obikmer crate version to 1.1.15.
This commit is contained in:
Generated
+1
-1
@@ -1704,7 +1704,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "obikmer"
|
name = "obikmer"
|
||||||
version = "1.1.14"
|
version = "1.1.15"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"clap",
|
"clap",
|
||||||
"csv",
|
"csv",
|
||||||
|
|||||||
+180
-70
@@ -5,10 +5,8 @@
|
|||||||
// CPUs. Linux first-touch policy then places graph allocations in local DRAM
|
// CPUs. Linux first-touch policy then places graph allocations in local DRAM
|
||||||
// automatically — no explicit memory binding needed.
|
// automatically — no explicit memory binding needed.
|
||||||
//
|
//
|
||||||
// Returns None when:
|
// UMA systems (single socket, Apple Silicon, etc.) are the degenerate case:
|
||||||
// - hwloc topology initialisation fails
|
// one synthetic node containing all cores, no pool, no pinning.
|
||||||
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
|
|
||||||
// - any per-node pool fails to build
|
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
@@ -18,32 +16,32 @@ use hwlocality::Topology;
|
|||||||
use hwlocality::cpu::binding::CpuBindingFlags;
|
use hwlocality::cpu::binding::CpuBindingFlags;
|
||||||
use hwlocality::cpu::cpuset::CpuSet;
|
use hwlocality::cpu::cpuset::CpuSet;
|
||||||
use hwlocality::object::types::ObjectType;
|
use hwlocality::object::types::ObjectType;
|
||||||
|
use obisys::CpuSample;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
// ── Public interface ──────────────────────────────────────────────────────────
|
// ── Public interface ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
pub struct NumaSetup {
|
pub struct NumaSetup {
|
||||||
pub pools: Vec<Arc<rayon::ThreadPool>>,
|
/// One entry per NUMA node. `None` on UMA systems (no pool, no pinning).
|
||||||
|
pub pools: Vec<Option<Arc<rayon::ThreadPool>>>,
|
||||||
/// CPU indices for each NUMA node, in node order.
|
/// CPU indices for each NUMA node, in node order.
|
||||||
pub cpus_per_node: Vec<Vec<usize>>,
|
pub cpus_per_node: Vec<Vec<usize>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NumaSetup {
|
impl NumaSetup {
|
||||||
/// Workers to activate per NUMA node.
|
/// Maximum worker slots per node (one per physical core in the node).
|
||||||
/// Empirically ~3 workers saturate one node's memory bandwidth.
|
|
||||||
pub fn workers_per_node(&self) -> usize {
|
pub fn workers_per_node(&self) -> usize {
|
||||||
self.cpus_per_node
|
self.cpus_per_node
|
||||||
.first()
|
.first()
|
||||||
.map(|c| (c.len() / 8).max(3).min(8))
|
.map(|c| c.len().max(1))
|
||||||
.unwrap_or(3)
|
.unwrap_or(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Detect NUMA topology and build per-node Rayon pools.
|
/// Detect NUMA topology and build per-node Rayon pools.
|
||||||
/// Returns None on UMA systems, single-node machines, or on failure.
|
/// Always succeeds: falls back to a single synthetic UMA node on failure.
|
||||||
pub fn build() -> Option<NumaSetup> {
|
pub fn build() -> NumaSetup {
|
||||||
let topology = Topology::new().ok()?;
|
if let Ok(topology) = Topology::new() {
|
||||||
|
|
||||||
let nodes: Vec<Vec<usize>> = topology
|
let nodes: Vec<Vec<usize>> = topology
|
||||||
.objects_with_type(ObjectType::NUMANode)
|
.objects_with_type(ObjectType::NUMANode)
|
||||||
.filter_map(|obj| obj.cpuset())
|
.filter_map(|obj| obj.cpuset())
|
||||||
@@ -56,22 +54,31 @@ pub fn build() -> Option<NumaSetup> {
|
|||||||
.filter(|v| !v.is_empty())
|
.filter(|v| !v.is_empty())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
if nodes.len() <= 1 {
|
if nodes.len() > 1 {
|
||||||
return None;
|
if let Some(pools) = nodes
|
||||||
}
|
.iter()
|
||||||
|
.map(|cpus| build_pool(cpus).map(|p| Some(Arc::new(p))))
|
||||||
|
.collect::<Option<Vec<_>>>()
|
||||||
|
{
|
||||||
debug!(
|
debug!(
|
||||||
"NUMA topology: {} node(s), {} core(s)/node",
|
"NUMA topology: {} node(s), {} core(s)/node",
|
||||||
nodes.len(),
|
nodes.len(),
|
||||||
nodes.first().map_or(0, |v| v.len()),
|
nodes.first().map_or(0, |v| v.len()),
|
||||||
);
|
);
|
||||||
|
return NumaSetup { pools, cpus_per_node: nodes };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let pools = nodes
|
// UMA fallback: single synthetic node, all cores, no pool, no pinning.
|
||||||
.iter()
|
let n_cores = std::thread::available_parallelism()
|
||||||
.map(|cpus| build_pool(cpus).map(Arc::new))
|
.map(|n| n.get())
|
||||||
.collect::<Option<Vec<_>>>()?;
|
.unwrap_or(1);
|
||||||
|
debug!("UMA: single synthetic node, {} core(s)", n_cores);
|
||||||
Some(NumaSetup { pools, cpus_per_node: nodes })
|
NumaSetup {
|
||||||
|
pools: vec![None],
|
||||||
|
cpus_per_node: vec![(0..n_cores).collect()],
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bind the calling thread to `cpu_indices` using hwloc.
|
/// Bind the calling thread to `cpu_indices` using hwloc.
|
||||||
@@ -114,18 +121,19 @@ struct NodeConfig {
|
|||||||
/// Generic NUMA-aware runner for partition-level parallel work.
|
/// Generic NUMA-aware runner for partition-level parallel work.
|
||||||
///
|
///
|
||||||
/// Workers are distributed round-robin across NUMA nodes and pinned to their
|
/// Workers are distributed round-robin across NUMA nodes and pinned to their
|
||||||
/// node's CPUs. UMA systems are the degenerate case: one node, no pinning.
|
/// node's CPUs. UMA is the degenerate case: one node, no pinning.
|
||||||
|
///
|
||||||
|
/// Workers are pre-spawned dormant and activated one by one as CPU efficiency
|
||||||
|
/// falls below `SPAWN_THRESHOLD`. This avoids over-provisioning on I/O-bound
|
||||||
|
/// or memory-bandwidth-bound workloads while saturating CPU-bound ones.
|
||||||
///
|
///
|
||||||
/// # Termination
|
/// # Termination
|
||||||
///
|
///
|
||||||
/// Termination is driven entirely by channel closure:
|
|
||||||
///
|
|
||||||
/// ```text
|
/// ```text
|
||||||
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
|
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
|
||||||
/// drop(result_tx) → result_rx closes → controller loop exits
|
/// drop(result_tx) → result_rx closes → controller loop exits
|
||||||
|
/// drop(activate_tx) → dormant workers exit cleanly
|
||||||
/// ```
|
/// ```
|
||||||
///
|
|
||||||
/// No explicit counter or sentinel needed.
|
|
||||||
pub struct PartitionRunner {
|
pub struct PartitionRunner {
|
||||||
nodes: Vec<NodeConfig>,
|
nodes: Vec<NodeConfig>,
|
||||||
}
|
}
|
||||||
@@ -136,50 +144,38 @@ impl PartitionRunner {
|
|||||||
self.nodes.iter().map(|n| n.max_workers).sum()
|
self.nodes.iter().map(|n| n.max_workers).sum()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Detect topology and build. Falls back to a single-node UMA runner on
|
/// Detect topology and build. Always succeeds.
|
||||||
/// macOS, single-socket machines, or hwloc failure.
|
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
match build() {
|
let ns = build();
|
||||||
Some(ns) => {
|
|
||||||
let wpn = ns.workers_per_node();
|
let wpn = ns.workers_per_node();
|
||||||
debug!(
|
debug!(
|
||||||
"PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node",
|
"PartitionRunner: {} node(s) × {} worker(s)/node max",
|
||||||
ns.pools.len(), wpn,
|
ns.pools.len(),
|
||||||
|
wpn,
|
||||||
);
|
);
|
||||||
let nodes = ns.pools
|
let nodes = ns.pools
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.zip(ns.cpus_per_node)
|
.zip(ns.cpus_per_node)
|
||||||
.map(|(pool, cpu_ids)| NodeConfig {
|
.map(|(pool, cpu_ids)| NodeConfig {
|
||||||
pool: Some(pool),
|
pool,
|
||||||
cpu_ids,
|
cpu_ids,
|
||||||
max_workers: wpn,
|
max_workers: wpn,
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
Self { nodes }
|
Self { nodes }
|
||||||
}
|
}
|
||||||
None => {
|
|
||||||
let n_cores = std::thread::available_parallelism()
|
|
||||||
.map(|n| n.get())
|
|
||||||
.unwrap_or(1);
|
|
||||||
let max_workers = (n_cores / 2).max(1);
|
|
||||||
debug!("PartitionRunner: UMA mode — {} worker(s)", max_workers);
|
|
||||||
Self {
|
|
||||||
nodes: vec![NodeConfig {
|
|
||||||
pool: None,
|
|
||||||
cpu_ids: vec![],
|
|
||||||
max_workers,
|
|
||||||
}],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Run `f(i)` for every index in `order`.
|
/// Run `f(i)` for every index in `order`.
|
||||||
///
|
///
|
||||||
/// Workers are spawned upfront and distributed round-robin across NUMA
|
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
|
||||||
/// nodes. `on_done(i, result, elapsed)` is called from the controller
|
/// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed
|
||||||
/// thread as each partition completes — suitable for progress bars and
|
/// partition resets that timer (forcing an immediate check) and also
|
||||||
/// result aggregation.
|
/// triggers its own inline check. A new worker is activated whenever
|
||||||
|
/// efficiency falls below `SPAWN_THRESHOLD`.
|
||||||
|
///
|
||||||
|
/// `on_done(i, result, elapsed)` is called from the controller thread as
|
||||||
|
/// each partition completes — suitable for progress bars and result
|
||||||
|
/// aggregation.
|
||||||
///
|
///
|
||||||
/// Returns the first error produced by `f`, if any.
|
/// Returns the first error produced by `f`, if any.
|
||||||
pub fn run<F, R, E, C>(
|
pub fn run<F, R, E, C>(
|
||||||
@@ -194,28 +190,65 @@ impl PartitionRunner {
|
|||||||
E: Send,
|
E: Send,
|
||||||
C: FnMut(usize, R, Duration) + Send,
|
C: FnMut(usize, R, Duration) + Send,
|
||||||
{
|
{
|
||||||
// Pre-load the work queue, then drop the sender so workers' part_rx
|
let n_total = order.len();
|
||||||
// iterators terminate when the queue is drained.
|
if n_total == 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
const SPAWN_THRESHOLD: f64 = 0.95;
|
||||||
|
const TIMER_SECS: u64 = 30;
|
||||||
|
|
||||||
|
let n_cores = std::thread::available_parallelism()
|
||||||
|
.map(|n| n.get())
|
||||||
|
.unwrap_or(1);
|
||||||
|
|
||||||
|
// ── Channels ──────────────────────────────────────────────────────────
|
||||||
let (part_tx, part_rx) = unbounded::<usize>();
|
let (part_tx, part_rx) = unbounded::<usize>();
|
||||||
|
let (activate_tx, activate_rx) = unbounded::<()>();
|
||||||
|
// reset_tx: controller → timer ("reset the 30 s window")
|
||||||
|
let (reset_tx, reset_rx) = unbounded::<()>();
|
||||||
|
// event_tx: workers + timer → controller (unified event stream)
|
||||||
|
let (event_tx, event_rx) = unbounded::<WorkerEvent<R, E>>();
|
||||||
|
|
||||||
for &i in order { part_tx.send(i).ok(); }
|
for &i in order { part_tx.send(i).ok(); }
|
||||||
drop(part_tx);
|
drop(part_tx);
|
||||||
|
|
||||||
let (result_tx, result_rx) = unbounded::<(usize, Result<R, E>, Duration)>();
|
let max_workers = self.max_workers();
|
||||||
let n_nodes = self.nodes.len();
|
let n_nodes = self.nodes.len();
|
||||||
let f = &f; // shared borrow; F: Sync so concurrent calls are safe
|
let f = &f;
|
||||||
|
|
||||||
let mut first_err: Option<E> = None;
|
let mut first_err: Option<E> = None;
|
||||||
|
|
||||||
std::thread::scope(|s| {
|
std::thread::scope(|s| {
|
||||||
// Spawn all workers upfront, round-robin across NUMA nodes.
|
// ── Timer thread ──────────────────────────────────────────────────
|
||||||
for w in 0..self.max_workers() {
|
// Sends TimerTick every TIMER_SECS seconds. Resets its window each
|
||||||
|
// time reset_rx receives a message (i.e. on partition completion).
|
||||||
|
let timer_tx = event_tx.clone();
|
||||||
|
s.spawn(move || {
|
||||||
|
let period = Duration::from_secs(TIMER_SECS);
|
||||||
|
loop {
|
||||||
|
crossbeam_channel::select! {
|
||||||
|
recv(reset_rx) -> r => {
|
||||||
|
if r.is_err() { break; } // reset_tx dropped → exit
|
||||||
|
}
|
||||||
|
default(period) => {
|
||||||
|
if timer_tx.send(WorkerEvent::TimerTick).is_err() { break; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Pre-spawn workers dormant, round-robin across NUMA nodes ──────
|
||||||
|
for w in 0..max_workers {
|
||||||
let node = &self.nodes[w % n_nodes];
|
let node = &self.nodes[w % n_nodes];
|
||||||
let prx = part_rx.clone();
|
let prx = part_rx.clone();
|
||||||
let rtx = result_tx.clone();
|
let etx = event_tx.clone();
|
||||||
|
let arx = activate_rx.clone();
|
||||||
let pool = node.pool.clone();
|
let pool = node.pool.clone();
|
||||||
let cpu_ids = &node.cpu_ids;
|
let cpu_ids = &node.cpu_ids;
|
||||||
|
|
||||||
s.spawn(move || {
|
s.spawn(move || {
|
||||||
|
if arx.recv().is_err() { return; }
|
||||||
if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
|
if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
|
||||||
for i in &prx {
|
for i in &prx {
|
||||||
let t = Instant::now();
|
let t = Instant::now();
|
||||||
@@ -223,24 +256,53 @@ impl PartitionRunner {
|
|||||||
Some(p) => p.install(|| f(i)),
|
Some(p) => p.install(|| f(i)),
|
||||||
None => f(i),
|
None => f(i),
|
||||||
};
|
};
|
||||||
rtx.send((i, r, t.elapsed())).ok();
|
etx.send(WorkerEvent::Completed(i, r, t.elapsed())).ok();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
// Drop controller's event_tx: event_rx closes when all workers +
|
||||||
|
// timer have exited.
|
||||||
|
drop(event_tx);
|
||||||
|
|
||||||
// Drop the controller's sender: result_rx closes once all worker
|
// ── Controller ────────────────────────────────────────────────────
|
||||||
// rtx clones are dropped (i.e. all workers have exited).
|
activate_tx.send(()).ok();
|
||||||
drop(result_tx);
|
let mut n_active = 1usize;
|
||||||
|
let mut cpu_sample = CpuSample::now();
|
||||||
|
let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate
|
||||||
|
let mut completed = 0usize;
|
||||||
|
|
||||||
// Drain results concurrently with workers. The for loop exits
|
while completed < n_total {
|
||||||
// when result_rx is disconnected — at that point all workers are
|
let Ok(event) = event_rx.recv() else { break };
|
||||||
// done and the scope join below is instantaneous.
|
match event {
|
||||||
for (i, r, dur) in &result_rx {
|
WorkerEvent::Completed(i, r, dur) => {
|
||||||
match r {
|
match r {
|
||||||
Ok(v) => on_done(i, v, dur),
|
Ok(v) => on_done(i, v, dur),
|
||||||
Err(e) => { if first_err.is_none() { first_err = Some(e); } }
|
Err(e) => { if first_err.is_none() { first_err = Some(e); } }
|
||||||
}
|
}
|
||||||
|
completed += 1;
|
||||||
|
// Reset the 30 s timer.
|
||||||
|
reset_tx.send(()).ok();
|
||||||
|
// Inline check: same logic as a timer tick.
|
||||||
|
maybe_activate(
|
||||||
|
&activate_tx, &mut n_active, max_workers,
|
||||||
|
&mut cpu_sample, &mut eff_at_last_spawn,
|
||||||
|
n_cores, SPAWN_THRESHOLD, completed, n_total,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
WorkerEvent::TimerTick => {
|
||||||
|
maybe_activate(
|
||||||
|
&activate_tx, &mut n_active, max_workers,
|
||||||
|
&mut cpu_sample, &mut eff_at_last_spawn,
|
||||||
|
n_cores, SPAWN_THRESHOLD, completed, n_total,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dormant workers exit when activate_tx closes.
|
||||||
|
drop(activate_tx);
|
||||||
|
// Timer thread exits when reset_tx closes.
|
||||||
|
drop(reset_tx);
|
||||||
});
|
});
|
||||||
|
|
||||||
match first_err {
|
match first_err {
|
||||||
@@ -249,3 +311,51 @@ impl PartitionRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Internal event type ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
enum WorkerEvent<R, E> {
|
||||||
|
Completed(usize, Result<R, E>, Duration),
|
||||||
|
TimerTick,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn maybe_activate(
|
||||||
|
activate_tx: &crossbeam_channel::Sender<()>,
|
||||||
|
n_active: &mut usize,
|
||||||
|
max_workers: usize,
|
||||||
|
cpu_sample: &mut CpuSample,
|
||||||
|
eff_at_last_spawn: &mut f64,
|
||||||
|
n_cores: usize,
|
||||||
|
threshold: f64,
|
||||||
|
completed: usize,
|
||||||
|
n_total: usize,
|
||||||
|
) {
|
||||||
|
if *n_active >= max_workers || completed >= n_total { return; }
|
||||||
|
|
||||||
|
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||||||
|
if eff >= threshold { return; } // CPU already saturated
|
||||||
|
|
||||||
|
// Check that the previous activation was beneficial enough.
|
||||||
|
// Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1).
|
||||||
|
// For the very first extra worker (n_active == 1, no previous spawn), skip this
|
||||||
|
// check: eff_at_last_spawn == 0 acts as the sentinel.
|
||||||
|
let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 {
|
||||||
|
true // first additional worker: no prior data to evaluate
|
||||||
|
} else {
|
||||||
|
let k_before = (*n_active - 1) as f64;
|
||||||
|
let min_speedup = (k_before + 0.2) / k_before;
|
||||||
|
let actual_speedup = eff / *eff_at_last_spawn;
|
||||||
|
actual_speedup >= min_speedup
|
||||||
|
};
|
||||||
|
|
||||||
|
if last_spawn_was_beneficial {
|
||||||
|
activate_tx.send(()).ok();
|
||||||
|
*eff_at_last_spawn = eff;
|
||||||
|
*n_active += 1;
|
||||||
|
*cpu_sample = CpuSample::now();
|
||||||
|
debug!(
|
||||||
|
"activated worker {}/{} — efficiency {:.0}%",
|
||||||
|
n_active, max_workers, eff * 100.0,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "obikmer"
|
name = "obikmer"
|
||||||
version = "1.1.14"
|
version = "1.1.15"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[[bin]]
|
[[bin]]
|
||||||
|
|||||||
Reference in New Issue
Block a user