From 67b4e4da5377d6191ac35f25ad10ce3ffb6900c4 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 3 Jul 2026 12:47:56 +0200 Subject: [PATCH] refactor(numa): replace flat runner with per-node activation channels Shifts the NUMA-aware runner from a flat, round-robin model to a per-node architecture using dedicated `NodeActivation` channels. Replaces absolute deltas with relative scaling based on the previous growth step's worker count, decoupling growth from node count to fix slow ramp-up and enforce per-node fairness. Updates architecture documentation to reflect these changes and focus tuning questions on `INITIAL`/`GROWTH_DIVISOR` parameters for I/O-bound validation. --- docmd/architecture/numa_partition_runner.md | 85 ++++-- src/obikindex/src/numa.rs | 308 ++++++++++++++------ 2 files changed, 285 insertions(+), 108 deletions(-) diff --git a/docmd/architecture/numa_partition_runner.md b/docmd/architecture/numa_partition_runner.md index db24975..06b7765 100644 --- a/docmd/architecture/numa_partition_runner.md +++ b/docmd/architecture/numa_partition_runner.md @@ -205,14 +205,28 @@ unconditionally — no `||` short-circuit — so neither window starves behind whichever signal fires first: ```rust -let cpu_wants_more = cpu_sample.do_i_activate(CPU_SPAWN_THRESHOLD); +let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64; +let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold); let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD); if cpu_wants_more || io_wants_more { - activate_tx.send(()).ok(); - ... + activation.grow(GROWTH_DIVISOR, n_total); } ``` +The CPU threshold is *not* the flat absolute delta it started as: it scales +with `activation.last_step()` — the number of workers activated in the last +growth step, tracked by `NodeActivation` (`numa.rs`) and updated every time +`grow()` actually grows something. Growing by 8 workers should add ~8 cores of +efficiency if the workload is truly CPU-bound; requiring only +`CPU_SPAWN_THRESHOLD` (20 %) of that expected gain confirms the growth was +useful without demanding perfect linear scaling. Scaling by the *last step's +size* rather than the cumulative total keeps the bar equally meaningful +whether it's the 2nd growth step or the 20th — a flat absolute threshold +(0.2 core) is a strong signal at 8 active workers but pure noise at 150; a +threshold scaled by the *cumulative* total instead (considered and rejected) +would have made the bar essentially impossible to clear late in the ramp, +strangling exactly the CPU-bound saturation the mechanism exists to allow. + Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit), raw I/O throughput has no natural scale across devices, so `IoSample` uses a **relative** growth threshold instead of an absolute one: @@ -242,27 +256,64 @@ the ~94-core artefact above) — one fix for both problems, and it removes the need for any arbitrary I/O-rate floor: a short/noisy window is rejected outright rather than papered over with a hardware-dependent constant. -Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, both `0.2`) -are defined as `const` in `PartitionRunner::run` (`numa.rs`). The I/O value is -a starting point, not a derived one — needs empirical validation against a -real `pack` run. +Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, module-level +`const` in `numa.rs`, both `0.2`) are a starting point, not a derived value: +`0.2` (20 % relative growth) for `IoSample` was chosen to match the CPU +threshold's *implicit* relative sensitivity (in the observed log, an 8→9 +worker step raised efficiency by ~12 %) — but I/O throughput is lumpier than +CPU time (buffered writes flush in bursts), so it needs empirical validation +against a real `pack` run before being considered final. -Starting threshold: `0.2` (20 % relative growth) for `IoSample`, same order of -magnitude as the CPU threshold's *implicit* relative sensitivity (in the -observed log, an 8→9 worker step raised efficiency by ~12 %). This is a -starting point, not a derived value — I/O throughput is lumpier than CPU time -(buffered writes flush in bursts), so it needs empirical validation against a -real `pack` run before being considered final. +## Known issue: ramp-up too slow, and confused with node count + +The original design started `n_nodes` workers (one per node) and grew one +worker at a time. On a real `filter` run this took ~10 minutes to climb from +9 to ~40 active workers even on the CPU-bound `rebuild` stage — most of a +35-minute stage spent under-provisioned while waiting for evidence to +accumulate one worker at a time. There is no scale-down mechanism (`n_active` +only grows), so the original caution was deliberate — but a quarter of +available cores is still far from saturation, and the real risk zone (over-provisioning +a memory-bandwidth-bound stage) only shows up much later in the ramp, near +full occupancy — not at 25 %. + +The fix decouples ramp speed from node *count*: both the initial size and the +growth step are a fraction of `workers_per_node` (node *size*), applied +identically on every node. A single-NUMA-node (UMA) machine ramps exactly as +fast as an 8-node one — growing by `n_nodes` per step, as first considered, +would have degenerated to "grow by 1" on UMA, reproducing the original +problem for exactly the machines that need the fix most. + +```rust +// NodeActivation::grow — called both at startup (activate_initial) and on +// every CPU/IO-triggered growth step, with a different divisor each time. +let wanted = (self.caps[idx] / divisor).max(1); // INITIAL_DIVISOR=4 at startup, GROWTH_DIVISOR=8 per step +let room = self.caps[idx].saturating_sub(self.active[idx]); +let grow = wanted.min(room).min(n_total.saturating_sub(self.total)); +``` + +This also fixed a latent correctness gap: the original single shared +`activate_tx`/`activate_rx` pair had *no* per-node addressing — sending one +activation signal woke up whichever dormant worker (from any node) happened +to win the race on that channel. `crossbeam_channel` gives no fairness +guarantee across competing receivers, so "round-robin across nodes" was an +assumption the code never actually enforced. `PartitionRunner::run` now opens +one activation channel per node (`activate_txs`/`activate_rxs`, one pair per +`NodeConfig`); `NodeActivation` (`numa.rs`) tracks how many of each node's +dormant workers have been woken and grows every node by the same amount per +step, capped by that node's remaining dormant workers and by the run's total +budget (`n_total`) — balance across nodes is now guaranteed by construction, +not incidental to channel implementation details. ## Open questions - **Error handling**: `run` currently returns the first error; remaining errors are dropped. A `Vec` return would give complete diagnostics. -- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated - for merge on BeeGFS. Superseded by the I/O signal above for the "more - workers would help despite flat CPU" case — a per-call override may still be - worth keeping as a manual escape hatch. +- **`INITIAL_DIVISOR` / `GROWTH_DIVISOR` tuning**: currently `4` and `8` + (start at 1/4 of a node's cores, grow by 1/8 per step), chosen to fix an + observed too-slow ramp — not yet validated against a real `pack` (I/O-bound) + run, where over-provisioning risk is different from the CPU-bound `rebuild` + case this was tuned against. - **`on_done` ordering**: the runner serialises calls to `on_done` via an internal `Arc>`. `Send` is required (the Arc clone crosses thread diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index 39f63c0..0a9d243 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -70,7 +70,10 @@ pub fn build() -> NumaSetup { nodes.len(), nodes.first().map_or(0, |v| v.len()), ); - return NumaSetup { pools, cpus_per_node: nodes }; + return NumaSetup { + pools, + cpus_per_node: nodes, + }; } } } @@ -81,7 +84,7 @@ pub fn build() -> NumaSetup { .unwrap_or(1); debug!("UMA: single synthetic node, {} core(s)", n_cores); NumaSetup { - pools: vec![None], + pools: vec![None], cpus_per_node: vec![(0..n_cores).collect()], } } @@ -93,7 +96,7 @@ pub fn build() -> NumaSetup { .unwrap_or(1); debug!("UMA: single synthetic node, {} core(s)", n_cores); NumaSetup { - pools: vec![None], + pools: vec![None], cpus_per_node: vec![(0..n_cores).collect()], } } @@ -102,7 +105,9 @@ pub fn build() -> NumaSetup { /// Silently returns on any error so the thread still runs, just unbound. #[cfg(feature = "numa")] pub fn pin_current_thread(cpu_indices: &[usize]) { - let Ok(topology) = Topology::new() else { return }; + let Ok(topology) = Topology::new() else { + return; + }; let mut cpuset = CpuSet::new(); for &idx in cpu_indices { cpuset.set(idx); @@ -132,29 +137,48 @@ fn build_pool(cpus: &[usize]) -> Option { .ok() } -// ── PartitionRunner ─────────────────────────────────────────────────────────── +// ── PartitionRunner ───────────────────────────────────────────────────────── + +/// Growth step (fraction of a node's worker capacity added per activation +/// event, see [`NodeActivation::grow`]). +const GROWTH_DIVISOR: usize = 8; +/// Minimum CPU efficiency growth to activate more workers, as a fraction of +/// the size of the *last growth step* (e.g. `0.2` after adding 8 workers +/// requires the next check to show at least +1.6 cores of growth — 20 % of +/// the ~8 cores those 8 workers should contribute if the workload is truly +/// CPU-bound). Scaling by the last step's size — not the cumulative total — +/// keeps the bar meaningful regardless of how many workers are already +/// active, instead of demanding an ever-larger absolute jump as the pool +/// grows. +const CPU_SPAWN_THRESHOLD: f64 = 0.2; +/// Minimum I/O throughput growth (relative) to activate more workers. +const IO_SPAWN_THRESHOLD: f64 = 0.2; struct NodeConfig { - pool: Option>, - cpu_ids: Vec, + pool: Option>, + cpu_ids: Vec, max_workers: usize, } /// Generic NUMA-aware runner for partition-level parallel work. /// -/// Workers are distributed round-robin across NUMA nodes and pinned to their +/// Workers are distributed evenly across NUMA nodes and pinned to their /// node's CPUs. UMA is the degenerate case: one node, no pinning. /// -/// Workers are pre-spawned dormant and activated one by one as CPU efficiency -/// falls below `SPAWN_THRESHOLD`. This avoids over-provisioning on I/O-bound -/// or memory-bandwidth-bound workloads while saturating CPU-bound ones. +/// Workers are pre-spawned dormant, one activation channel per node so +/// growth always targets a specific node rather than whichever dormant +/// worker happens to wake up first on a shared channel. Growth (both the +/// initial count and each subsequent step) is expressed as a fraction of +/// `workers_per_node`, applied identically to every node, so the pace of +/// ramp-up depends on node size rather than node count — a single-NUMA-node +/// (UMA) machine ramps just as fast as an 8-node one. /// /// # Termination /// /// ```text -/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx -/// drop(result_tx) → result_rx closes → controller loop exits -/// drop(activate_tx) → dormant workers exit cleanly +/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx +/// drop(result_tx) → result_rx closes → controller loop exits +/// drop(activate_txs) → dormant workers exit cleanly /// ``` pub struct PartitionRunner { nodes: Vec, @@ -175,7 +199,8 @@ impl PartitionRunner { ns.pools.len(), wpn, ); - let nodes = ns.pools + let nodes = ns + .pools .into_iter() .zip(ns.cpus_per_node) .map(|(pool, cpu_ids)| NodeConfig { @@ -189,26 +214,24 @@ impl PartitionRunner { /// Run `f(i)` for every index in `order`. /// - /// Workers are pre-spawned dormant and activated adaptively. A timer thread - /// fires an efficiency check every `TIMER_SECS` seconds; each completed - /// partition resets that timer (forcing an immediate check) and also - /// triggers its own inline check. A new worker is activated whenever CPU - /// efficiency grows by at least `CPU_SPAWN_THRESHOLD` (absolute, in cores) - /// or I/O throughput grows by at least `IO_SPAWN_THRESHOLD` (relative) since - /// the last check — whichever resource is the actual bottleneck still shows - /// headroom. + /// Workers are pre-spawned dormant and activated adaptively, per node: + /// `(workers_per_node / INITIAL_DIVISOR).max(1)` are woken immediately on + /// every node, then `(workers_per_node / GROWTH_DIVISOR).max(1)` more per + /// node each time the check below fires. A timer thread fires that check + /// every `TIMER_SECS` seconds; each completed partition resets that timer + /// (forcing an immediate check) and also triggers its own inline check. A + /// growth step happens whenever CPU efficiency grows by at least + /// `CPU_SPAWN_THRESHOLD` of what the last growth step should have + /// contributed, or I/O throughput grows by at least `IO_SPAWN_THRESHOLD` + /// (relative) since the last check — whichever resource is the actual + /// bottleneck still shows headroom. /// /// `on_done(i, result, elapsed)` is called from the controller thread as /// each partition completes — suitable for progress bars and result /// aggregation. /// /// Returns the first error produced by `f`, if any. - pub fn run( - &self, - order: &[usize], - f: F, - mut on_done: C, - ) -> Result<(), E> + pub fn run(&self, order: &[usize], f: F, mut on_done: C) -> Result<(), E> where F: Fn(usize) -> Result + Send + Sync, R: Send, @@ -220,24 +243,29 @@ impl PartitionRunner { return Ok(()); } - const CPU_SPAWN_THRESHOLD: f64 = 0.2; - const IO_SPAWN_THRESHOLD: f64 = 0.2; - const TIMER_SECS: u64 = 30; + const TIMER_SECS: u64 = 30; + const INITIAL_DIVISOR: usize = 4; // ── Channels ────────────────────────────────────────────────────────── - let (part_tx, part_rx) = unbounded::(); - let (activate_tx, activate_rx) = unbounded::<()>(); + let (part_tx, part_rx) = unbounded::(); // reset_tx: controller → timer ("reset the 30 s window") - let (reset_tx, reset_rx) = unbounded::<()>(); + let (reset_tx, reset_rx) = unbounded::<()>(); // event_tx: workers + timer → controller (unified event stream) - let (event_tx, event_rx) = unbounded::>(); + let (event_tx, event_rx) = unbounded::>(); + // One activation channel per node: growth always targets a specific + // node, rather than whichever dormant worker happens to win the race + // on a channel shared across all nodes. + let (activate_txs, activate_rxs): (Vec<_>, Vec<_>) = + (0..self.nodes.len()).map(|_| unbounded::<()>()).unzip(); - for &i in order { part_tx.send(i).ok(); } + for &i in order { + part_tx.send(i).ok(); + } drop(part_tx); let max_workers = self.max_workers(); - let n_nodes = self.nodes.len(); - let f = &f; + let node_caps: Vec = self.nodes.iter().map(|n| n.max_workers).collect(); + let f = &f; let mut first_err: Option = None; @@ -260,79 +288,92 @@ impl PartitionRunner { } }); - // ── Pre-spawn workers dormant, round-robin across NUMA nodes ────── - for w in 0..max_workers { - let node = &self.nodes[w % n_nodes]; - let prx = part_rx.clone(); - let etx = event_tx.clone(); - let arx = activate_rx.clone(); - let pool = node.pool.clone(); + // ── Pre-spawn workers dormant, grouped by node ──────────────────── + // Each worker listens on its own node's activation channel only. + for (node, arx) in self.nodes.iter().zip(activate_rxs.iter()) { let cpu_ids = &node.cpu_ids; + for _ in 0..node.max_workers { + let prx = part_rx.clone(); + let etx = event_tx.clone(); + let arx = arx.clone(); + let pool = node.pool.clone(); - s.spawn(move || { - if arx.recv().is_err() { return; } - if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); } - for i in &prx { - let t = Instant::now(); - let r = match &pool { - Some(p) => p.install(|| f(i)), - None => f(i), - }; - etx.send(WorkerEvent::Completed(i, r, t.elapsed())).ok(); - } - }); + s.spawn(move || { + if arx.recv().is_err() { + return; + } + if !cpu_ids.is_empty() { + pin_current_thread(cpu_ids); + } + for i in &prx { + let t = Instant::now(); + let r = match &pool { + Some(p) => p.install(|| f(i)), + None => f(i), + }; + etx.send(WorkerEvent::Completed(i, r, t.elapsed())).ok(); + } + }); + } } // Drop controller's event_tx: event_rx closes when all workers + // timer have exited. drop(event_tx); // ── Controller ──────────────────────────────────────────────────── - let initial_workers = n_nodes.min(max_workers).min(n_total); - for _ in 0..initial_workers { activate_tx.send(()).ok(); } - let mut n_active = initial_workers; + let mut activation = NodeActivation::new(&activate_txs, &node_caps, max_workers); + activation.activate_initial(INITIAL_DIVISOR, n_total); + let mut cpu_sample = CpuSample::now(); - let mut io_sample = IoSample::now(); - let mut completed = 0usize; + let mut io_sample = IoSample::now(); + let mut completed = 0usize; while completed < n_total { let Ok(event) = event_rx.recv() else { break }; match event { WorkerEvent::Completed(i, r, dur) => { match r { - Ok(v) => on_done(i, v, dur), - Err(e) => { if first_err.is_none() { first_err = Some(e); } } + Ok(v) => on_done(i, v, dur), + Err(e) => { + if first_err.is_none() { + first_err = Some(e); + } + } } completed += 1; // Reset the 30 s timer. reset_tx.send(()).ok(); // Inline check: same logic as a timer tick. maybe_activate( - &activate_tx, &mut n_active, max_workers, - &mut cpu_sample, CPU_SPAWN_THRESHOLD, - &mut io_sample, IO_SPAWN_THRESHOLD, - completed, n_total, + &mut activation, + &mut cpu_sample, + &mut io_sample, + completed, + n_total, ); } WorkerEvent::TimerTick => { maybe_activate( - &activate_tx, &mut n_active, max_workers, - &mut cpu_sample, CPU_SPAWN_THRESHOLD, - &mut io_sample, IO_SPAWN_THRESHOLD, - completed, n_total, + &mut activation, + &mut cpu_sample, + &mut io_sample, + completed, + n_total, ); } } } - // Dormant workers exit when activate_tx closes. - drop(activate_tx); + // Dormant workers exit once every sender for their node's channel + // is dropped — `activate_txs` holds the only ones. + drop(activate_txs); // Timer thread exits when reset_tx closes. drop(reset_tx); }); match first_err { Some(e) => Err(e), - None => Ok(()), + None => Ok(()), } } } @@ -344,28 +385,113 @@ enum WorkerEvent { TimerTick, } +/// Tracks how many of each node's dormant workers have been woken, and +/// grows every node by the same amount at each step (capped by that node's +/// remaining dormant workers and by the run's total budget) so load stays +/// balanced across nodes at every point in time — never just "one more +/// worker somewhere". Also remembers the size of the last real growth step +/// (`last_step`), used to scale the CPU activation threshold to what that +/// step could plausibly have contributed (see `maybe_activate`). +struct NodeActivation<'a> { + txs: &'a [crossbeam_channel::Sender<()>], + caps: &'a [usize], + active: Vec, + total: usize, + max: usize, + last_step: usize, +} + +impl<'a> NodeActivation<'a> { + fn new(txs: &'a [crossbeam_channel::Sender<()>], caps: &'a [usize], max: usize) -> Self { + Self { + txs, + caps, + active: vec![0; txs.len()], + total: 0, + max, + last_step: 0, + } + } + + fn total(&self) -> usize { + self.total + } + fn last_step(&self) -> usize { + self.last_step + } + fn max(&self) -> usize { + self.max + } + fn is_full(&self) -> bool { + self.total >= self.max + } + + /// Wake up to `(node_cap / divisor).max(1)` dormant workers on every + /// node, capped by `n_total`. Called once at startup, unconditionally. + fn activate_initial(&mut self, divisor: usize, n_total: usize) { + self.grow(divisor, n_total); + } + + /// Same per-node sizing as [`activate_initial`](Self::activate_initial), + /// applied as a growth step. Returns the number of workers actually + /// activated (may be less than requested once a node or the total + /// budget is exhausted). Updates `last_step` when it actually grew. + fn grow(&mut self, divisor: usize, n_total: usize) -> usize { + let before = self.total; + for idx in 0..self.txs.len() { + let wanted = (self.caps[idx] / divisor).max(1); + let room = self.caps[idx].saturating_sub(self.active[idx]); + let grow = wanted.min(room).min(n_total.saturating_sub(self.total)); + for _ in 0..grow { + self.txs[idx].send(()).ok(); + } + self.active[idx] += grow; + self.total += grow; + } + let grew = self.total - before; + if grew > 0 { + self.last_step = grew; + } + grew + } +} + fn maybe_activate( - activate_tx: &crossbeam_channel::Sender<()>, - n_active: &mut usize, - max_workers: usize, - cpu_sample: &mut CpuSample, - cpu_threshold: f64, - io_sample: &mut IoSample, - io_threshold: f64, - completed: usize, - n_total: usize, + activation: &mut NodeActivation, + cpu_sample: &mut CpuSample, + io_sample: &mut IoSample, + completed: usize, + n_total: usize, ) { - if *n_active >= max_workers || completed >= n_total { return; } + if activation.is_full() || completed >= n_total { + return; + } + + // Expect roughly 1 core of extra efficiency per worker activated in the + // last growth step (CPU-bound case); require at least CPU_SPAWN_THRESHOLD + // (20 %) of that expected gain before growing again. Scaling by the last + // step's size — not the cumulative total — keeps the bar meaningful + // regardless of how many workers are already active: growing by 8 should + // always take ~+1.6 cores to confirm, whether that's the 2nd growth step + // or the 20th. + let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64; // Call both unconditionally (no `||` short-circuit): each sampler must // advance its own window every tick, regardless of what the other one // reports, or it would starve behind whichever signal fires first. let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold); - let io_wants_more = io_sample.do_i_activate(io_threshold); + let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD * activation.last_step() as f64); + if !(cpu_wants_more || io_wants_more) { + return; + } - if cpu_wants_more || io_wants_more { - activate_tx.send(()).ok(); - *n_active += 1; - debug!("activated worker {}/{}", n_active, max_workers); + let grew = activation.grow(GROWTH_DIVISOR, n_total); + if grew > 0 { + debug!( + "activated {} worker(s) — {}/{} active", + grew, + activation.total(), + activation.max() + ); } }