diff --git a/docmd/architecture/numa_partition_runner.md b/docmd/architecture/numa_partition_runner.md new file mode 100644 index 0000000..d955e55 --- /dev/null +++ b/docmd/architecture/numa_partition_runner.md @@ -0,0 +1,179 @@ +# NUMA-aware partition runner + +## Problem + +All partition-level parallel loops in obikindex currently fall into two +categories: + +**Naive Rayon** — used in `build_layers`, `pack_matrices`, `dump`, `select`, +`stats`, `rebuild`, `reindex`: + +```rust +(0..n).into_par_iter().for_each(|i| work(i)); +``` + +Threads come from the global Rayon pool with no NUMA awareness. On +multi-socket machines this produces cross-socket memory traffic and degrades +performance super-linearly (see [NUMA-aware worker pools](numa_worker_pools.md)). + +**Ad-hoc adaptive pool** — used in `merge`: + +A bespoke implementation with pre-spawned workers, channel-based dispatch, and +activation control. It handles NUMA correctly but is not reusable. + +Both cases should be replaced by a single generic mechanism. + +## Unified model + +The key insight is that **UMA is just the NUMA case with a single node**. The +runner always works the same way: one controller thread per node, each +independently managing its own workers with the same adaptive logic. The only +difference between UMA and NUMA is the number of nodes and whether workers are +pinned. + +``` +NUMA (k nodes) UMA (1 node) + +controller-0 controller-1 … controller-0 + │ │ │ +workers[0] workers[1] workers[0] +(pinned) (pinned) (global pool) + └───────────────┴──────────────────┘ + shared work queue +``` + +On each node, the Rayon `ThreadPool` is pinned to that node's CPUs. +`pool.install()` ensures all internal Rayon calls (inside the work function) +use the node-local pool. Linux first-touch then places heap allocations in +local DRAM automatically. + +On UMA the global Rayon pool is used directly — no pinning, no overhead. + +## Adaptive mechanism + +Each controller follows the same logic regardless of node count: + +1. Pre-spawn `workers_per_node` dormant worker threads (blocked on `activate_rx`). +2. Activate the first worker immediately. +3. Loop on result channel with a `SPAWN_POLL` timeout: + - On result: call `on_done`; check whether to activate the next worker. + - On timeout: same check. + - Activation criterion: `should_spawn_worker(active, global_efficiency, prev_efficiency)`. +4. Drop `activate_tx` when done — dormant workers exit cleanly. + +**Global CPU efficiency** (`CpuSample`, reads `/proc/stat` on Linux) is used by +all controllers — no per-node measurement needed. The signal is coarser than +per-node efficiency but correct in practice: if any node saturates memory +bandwidth, the global efficiency drops and all controllers stop activating +workers. Using a standard portable primitive avoids platform-specific CPU +accounting and keeps the implementation clean. + +## Proposed API + +```rust +pub struct PartitionRunner { + // One entry per NUMA node; one entry total on UMA. + nodes: Vec, +} + +struct NodeConfig { + pool: Option>, // None = global Rayon pool (UMA) + cpu_ids: Vec, // empty = no pinning (UMA) + max_workers: usize, +} + +impl PartitionRunner { + /// Detect topology and build the runner. + /// Returns a single-node runner on UMA / macOS / hwloc failure. + pub fn new() -> Self; + + /// Run `f(i)` for every index in `order`, collecting results. + /// + /// `on_done(i, result, elapsed)` is called under an internal mutex as + /// each partition completes — use it for progress bars and aggregation. + /// The runner serialises all calls to `on_done` via an internal + /// `Arc>`, so no `Sync` bound is required on the callback. + /// `Send` is required because the Arc clone crosses thread boundaries. + /// + /// Serialisation is free in practice: a partition takes seconds to + /// minutes; the callback takes microseconds. Contention is negligible. + /// + /// Returns the first error from `f`, if any. + pub fn run( + &self, + order: &[usize], + f: F, + on_done: C, + ) -> Result<(), E> + where + F: Fn(usize) -> Result + Send + Sync, + R: Send, + E: Send, + C: FnMut(usize, R, Duration) + Send; // Send required, Sync is not +} +``` + +`order` is caller-supplied so each command chooses its scheduling strategy: +largest-first for `merge`, sequential for `build_layers`, etc. + +## Migration examples + +### merge.rs (before: ~180 lines of bespoke machinery) + +```rust +let runner = PartitionRunner::new(); +runner.run( + &order, + |i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence) + .map_err(OKIError::Partition), + |i, g_len, dur| { + pb.inc(1); + debug!("partition {i}: done in {:.1}s — {g_len} new kmers", dur.as_secs_f64()); + part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len }); + }, +)?; +``` + +### index.rs build_layers (before: naive into_par_iter) + +```rust +let order: Vec = (0..n).collect(); +let runner = PartitionRunner::new(); +runner.run( + &order, + |i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits) + .map_err(OKIError::Partition), + |_, n_kmers, _| { + total_kmers.fetch_add(n_kmers, Ordering::Relaxed); + pb.inc(1); + }, +)?; +``` + +All other sites (`pack_matrices`, `dump`, `select`, etc.) follow the same +pattern. + +## Placement + +`PartitionRunner` lives in `obikindex/src/numa.rs` alongside `NumaSetup`. +It depends only on standard library primitives and Rayon — no new dependencies. + +A single `PartitionRunner` instance can be built once per command invocation +and reused across multiple `run()` calls (e.g. `merge` runs +`merge_partitions` then `pack_matrices`). + +## Open questions + +- **Error handling**: `run` currently returns the first error; remaining errors + are dropped. A `Vec` return would give complete diagnostics. + +- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated + for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from + a higher value. A per-call override could be added to the API. + +- **`on_done` ordering**: the runner serialises calls to `on_done` via an + internal `Arc>`. `Send` is required (the Arc clone crosses thread + boundaries); `Sync` is not (only one thread holds the lock at a time). + Contention is negligible because a partition takes seconds while the callback + takes microseconds. The callback is therefore simple to write (plain + `Vec::push`, plain `FnMut`) with no measurable performance cost. diff --git a/mkdocs.yml b/mkdocs.yml index 8a4498d..c27d1a9 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -57,6 +57,7 @@ nav: - Sequences: architecture/sequences/invariant.md - Kmer index: architecture/index_architecture.md - NUMA-aware worker pools: architecture/numa_worker_pools.md + - NUMA-aware partition runner: architecture/numa_partition_runner.md watch: - docmd diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index da79b3f..2b0332c 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -10,12 +10,15 @@ // - the system has only one NUMA node (UMA, Apple Silicon, single-socket) // - any per-node pool fails to build -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +use crossbeam_channel::{RecvTimeoutError, unbounded}; use hwlocality::Topology; use hwlocality::cpu::binding::CpuBindingFlags; use hwlocality::cpu::cpuset::CpuSet; use hwlocality::object::types::ObjectType; +use obisys::CpuSample; use tracing::debug; // ── Public interface ────────────────────────────────────────────────────────── @@ -100,3 +103,248 @@ fn build_pool(cpus: &[usize]) -> Option { .build() .ok() } + +// ── Adaptive spawn heuristic ────────────────────────────────────────────────── +// +// First worker: spawn if CPU efficiency is below SPAWN_THRESHOLD (machine is +// under-utilised). Subsequent workers: spawn only if the last worker raised +// efficiency by at least the expected marginal gain (1/n_workers), with a +// minimum floor to avoid spurious spawns from efficiency fluctuations. + +const SPAWN_THRESHOLD: f64 = 0.95; +const MIN_MARGINAL_GAIN: f64 = 0.03; +const SPAWN_POLL: Duration = Duration::from_secs(20); + +fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool { + if n_workers == 1 { + eff < SPAWN_THRESHOLD + } else { + let gain = eff - eff_at_last_spawn; + let expected = 1.0 / n_workers as f64; + gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN) + } +} + +// ── PartitionRunner ─────────────────────────────────────────────────────────── + +struct NodeConfig { + pool: Option>, + cpu_ids: Vec, + max_workers: usize, +} + +/// Generic NUMA-aware runner for partition-level parallel work. +/// +/// Encapsulates worker spawning, NUMA pinning, adaptive activation, and result +/// collection. UMA systems are handled as the degenerate case of a single node +/// with no pinning. +/// +/// # Model +/// +/// One controller thread per NUMA node (one total on UMA). Each controller +/// manages up to `max_workers` dormant workers that drain a shared work queue. +/// Workers are activated one at a time; a new worker is added when global CPU +/// efficiency justifies it. On NUMA all workers are activated immediately +/// (memory bandwidth, not CPU count, is the bottleneck). +pub struct PartitionRunner { + nodes: Vec, + n_cores: usize, +} + +impl PartitionRunner { + /// Detect topology and build. Falls back to a single-node UMA runner on + /// macOS, single-socket machines, or hwloc failure. + pub fn new() -> Self { + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + + match build() { + Some(ns) => { + let wpn = ns.workers_per_node(); + debug!( + "PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node", + ns.pools.len(), + wpn, + ); + let nodes = ns.pools + .into_iter() + .zip(ns.cpus_per_node) + .map(|(pool, cpu_ids)| NodeConfig { + pool: Some(pool), + cpu_ids, + max_workers: wpn, + }) + .collect(); + Self { nodes, n_cores } + } + None => { + let max_workers = (n_cores / 2).max(1); + debug!( + "PartitionRunner: UMA mode — adaptive up to {} worker(s)", + max_workers, + ); + Self { + nodes: vec![NodeConfig { + pool: None, + cpu_ids: vec![], + max_workers, + }], + n_cores, + } + } + } + } + + /// Run `f(i)` for every index in `order`. + /// + /// `on_done(i, result, elapsed)` is called under an internal mutex as each + /// partition completes — suitable for progress bars, logging, and result + /// aggregation. No `Send` or `Sync` bound is required on the callback. + /// + /// The work queue is shared across all NUMA nodes: any idle worker takes + /// the next available partition regardless of node, ensuring load balance. + /// + /// Returns the first error produced by `f`, if any. + pub fn run( + &self, + order: &[usize], + f: F, + on_done: C, + ) -> Result<(), E> + where + F: Fn(usize) -> Result + Send + Sync, + R: Send, + E: Send, + C: FnMut(usize, R, Duration) + Send, + { + let f = Arc::new(f); + let on_done = Arc::new(Mutex::new(on_done)); + let first_err: Arc>> = Arc::new(Mutex::new(None)); + + // Shared work queue — pre-loaded in caller-supplied order. + let (part_tx, part_rx) = unbounded::(); + for &i in order { + part_tx.send(i).ok(); + } + drop(part_tx); + + let n_cores = self.n_cores; + + std::thread::scope(|s| { + for node in &self.nodes { + let f = Arc::clone(&f); + let on_done = Arc::clone(&on_done); + let first_err = Arc::clone(&first_err); + let part_rx = part_rx.clone(); + + s.spawn(move || { + // Per-node result and activation channels. + let (result_tx, result_rx) = + unbounded::<(usize, Result, Duration)>(); + let (activate_tx, activate_rx) = unbounded::<()>(); + + std::thread::scope(|ws| { + // Pre-spawn workers (all dormant until activated). + for _ in 0..node.max_workers { + let prx = part_rx.clone(); + let rtx = result_tx.clone(); + let arx = activate_rx.clone(); + let f = Arc::clone(&f); + let pool = node.pool.clone(); + let cpu_ids = node.cpu_ids.clone(); + + ws.spawn(move || { + if !cpu_ids.is_empty() { + pin_current_thread(&cpu_ids); + } + if arx.recv().is_err() { + return; // never activated — exit cleanly + } + for i in &prx { + let t = Instant::now(); + let r = match &pool { + Some(p) => p.install(|| f(i)), + None => f(i), + }; + rtx.send((i, r, t.elapsed())).ok(); + } + }); + } + // Drop the controller's copy: result_rx disconnects + // once all worker copies are also dropped (workers done). + drop(result_tx); + + // In NUMA mode activate all workers immediately; + // in UMA mode activate one and grow adaptively. + let numa_mode = node.pool.is_some(); + let initial = if numa_mode { node.max_workers } else { 1 }; + for _ in 0..initial { + activate_tx.send(()).ok(); + } + let mut active_workers = initial; + let mut cpu_sample = CpuSample::now(); + let mut eff_at_last_spawn = 0.0f64; + + // Controller loop. + loop { + match result_rx.recv_timeout(SPAWN_POLL) { + Ok((i, r, dur)) => { + match r { + Ok(v) => { + on_done.lock().unwrap()(i, v, dur); + } + Err(e) => { + let mut g = first_err.lock().unwrap(); + if g.is_none() { *g = Some(e); } + } + } + if !numa_mode && active_workers < node.max_workers { + let eff = cpu_sample.cpu_efficiency(n_cores); + if should_spawn_worker(active_workers, eff, eff_at_last_spawn) { + debug!( + "activated worker {} — efficiency {:.0}%", + active_workers + 1, + eff * 100.0, + ); + activate_tx.send(()).ok(); + active_workers += 1; + eff_at_last_spawn = eff; + cpu_sample = CpuSample::now(); + } + } + } + Err(RecvTimeoutError::Timeout) => { + if !numa_mode && active_workers < node.max_workers { + let eff = cpu_sample.cpu_efficiency(n_cores); + if should_spawn_worker(active_workers, eff, eff_at_last_spawn) { + debug!( + "activated worker {} (poll) — efficiency {:.0}%", + active_workers + 1, + eff * 100.0, + ); + activate_tx.send(()).ok(); + active_workers += 1; + eff_at_last_spawn = eff; + cpu_sample = CpuSample::now(); + } + } + } + Err(RecvTimeoutError::Disconnected) => break, + } + } + // Signal any dormant workers that were never activated + // to exit (UMA mode where max_workers was never reached). + drop(activate_tx); + }); // ws: waits for all workers of this node + }); + } + }); // s: waits for all node controllers + + let mut g = first_err.lock().unwrap(); + match g.take() { + Some(e) => Err(e), + None => Ok(()), + } + } +}