diff --git a/src/Cargo.lock b/src/Cargo.lock index 3de361c..80d30d7 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1704,7 +1704,7 @@ dependencies = [ [[package]] name = "obikmer" -version = "1.1.14" +version = "1.1.15" dependencies = [ "clap", "csv", diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index 4c12013..9032a99 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -5,10 +5,8 @@ // CPUs. Linux first-touch policy then places graph allocations in local DRAM // automatically — no explicit memory binding needed. // -// Returns None when: -// - hwloc topology initialisation fails -// - the system has only one NUMA node (UMA, Apple Silicon, single-socket) -// - any per-node pool fails to build +// UMA systems (single socket, Apple Silicon, etc.) are the degenerate case: +// one synthetic node containing all cores, no pool, no pinning. use std::sync::Arc; use std::time::{Duration, Instant}; @@ -18,60 +16,69 @@ use hwlocality::Topology; use hwlocality::cpu::binding::CpuBindingFlags; use hwlocality::cpu::cpuset::CpuSet; use hwlocality::object::types::ObjectType; +use obisys::CpuSample; use tracing::debug; // ── Public interface ────────────────────────────────────────────────────────── pub struct NumaSetup { - pub pools: Vec>, + /// One entry per NUMA node. `None` on UMA systems (no pool, no pinning). + pub pools: Vec>>, /// CPU indices for each NUMA node, in node order. pub cpus_per_node: Vec>, } impl NumaSetup { - /// Workers to activate per NUMA node. - /// Empirically ~3 workers saturate one node's memory bandwidth. + /// Maximum worker slots per node (one per physical core in the node). pub fn workers_per_node(&self) -> usize { self.cpus_per_node .first() - .map(|c| (c.len() / 8).max(3).min(8)) - .unwrap_or(3) + .map(|c| c.len().max(1)) + .unwrap_or(1) } } /// Detect NUMA topology and build per-node Rayon pools. -/// Returns None on UMA systems, single-node machines, or on failure. 
-pub fn build() -> Option { - let topology = Topology::new().ok()?; +/// Always succeeds: falls back to a single synthetic UMA node on failure. +pub fn build() -> NumaSetup { + if let Ok(topology) = Topology::new() { + let nodes: Vec> = topology + .objects_with_type(ObjectType::NUMANode) + .filter_map(|obj| obj.cpuset()) + .map(|cpuset| { + cpuset + .iter_set() + .map(|idx| usize::from(idx)) + .collect::>() + }) + .filter(|v| !v.is_empty()) + .collect(); - let nodes: Vec> = topology - .objects_with_type(ObjectType::NUMANode) - .filter_map(|obj| obj.cpuset()) - .map(|cpuset| { - cpuset - .iter_set() - .map(|idx| usize::from(idx)) - .collect::>() - }) - .filter(|v| !v.is_empty()) - .collect(); - - if nodes.len() <= 1 { - return None; + if nodes.len() > 1 { + if let Some(pools) = nodes + .iter() + .map(|cpus| build_pool(cpus).map(|p| Some(Arc::new(p)))) + .collect::>>() + { + debug!( + "NUMA topology: {} node(s), {} core(s)/node", + nodes.len(), + nodes.first().map_or(0, |v| v.len()), + ); + return NumaSetup { pools, cpus_per_node: nodes }; + } + } } - debug!( - "NUMA topology: {} node(s), {} core(s)/node", - nodes.len(), - nodes.first().map_or(0, |v| v.len()), - ); - - let pools = nodes - .iter() - .map(|cpus| build_pool(cpus).map(Arc::new)) - .collect::>>()?; - - Some(NumaSetup { pools, cpus_per_node: nodes }) + // UMA fallback: single synthetic node, all cores, no pool, no pinning. + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + debug!("UMA: single synthetic node, {} core(s)", n_cores); + NumaSetup { + pools: vec![None], + cpus_per_node: vec![(0..n_cores).collect()], + } } /// Bind the calling thread to `cpu_indices` using hwloc. @@ -114,18 +121,19 @@ struct NodeConfig { /// Generic NUMA-aware runner for partition-level parallel work. /// /// Workers are distributed round-robin across NUMA nodes and pinned to their -/// node's CPUs. UMA systems are the degenerate case: one node, no pinning. +/// node's CPUs. 
UMA is the degenerate case: one node, no pinning. +/// +/// Workers are pre-spawned dormant and activated one by one as CPU efficiency +/// falls below `SPAWN_THRESHOLD`. This avoids over-provisioning on I/O-bound +/// or memory-bandwidth-bound workloads while saturating CPU-bound ones. /// /// # Termination /// -/// Termination is driven entirely by channel closure: -/// /// ```text /// drop(part_tx) → part_rx drains → workers exit → drop their result_tx /// drop(result_tx) → result_rx closes → controller loop exits +/// drop(activate_tx) → dormant workers exit cleanly /// ``` -/// -/// No explicit counter or sentinel needed. pub struct PartitionRunner { nodes: Vec, } @@ -136,50 +144,38 @@ impl PartitionRunner { self.nodes.iter().map(|n| n.max_workers).sum() } - /// Detect topology and build. Falls back to a single-node UMA runner on - /// macOS, single-socket machines, or hwloc failure. + /// Detect topology and build. Always succeeds. pub fn new() -> Self { - match build() { - Some(ns) => { - let wpn = ns.workers_per_node(); - debug!( - "PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node", - ns.pools.len(), wpn, - ); - let nodes = ns.pools - .into_iter() - .zip(ns.cpus_per_node) - .map(|(pool, cpu_ids)| NodeConfig { - pool: Some(pool), - cpu_ids, - max_workers: wpn, - }) - .collect(); - Self { nodes } - } - None => { - let n_cores = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - let max_workers = (n_cores / 2).max(1); - debug!("PartitionRunner: UMA mode — {} worker(s)", max_workers); - Self { - nodes: vec![NodeConfig { - pool: None, - cpu_ids: vec![], - max_workers, - }], - } - } - } + let ns = build(); + let wpn = ns.workers_per_node(); + debug!( + "PartitionRunner: {} node(s) × {} worker(s)/node max", + ns.pools.len(), + wpn, + ); + let nodes = ns.pools + .into_iter() + .zip(ns.cpus_per_node) + .map(|(pool, cpu_ids)| NodeConfig { + pool, + cpu_ids, + max_workers: wpn, + }) + .collect(); + Self { nodes } } /// Run `f(i)` 
for every index in `order`. /// - /// Workers are spawned upfront and distributed round-robin across NUMA - /// nodes. `on_done(i, result, elapsed)` is called from the controller - /// thread as each partition completes — suitable for progress bars and - /// result aggregation. + /// Workers are pre-spawned dormant and activated adaptively. A timer thread + /// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed + /// partition restarts that timer's window (postponing the next tick) and + /// triggers its own inline check. A new worker is activated when efficiency + /// is below `SPAWN_THRESHOLD` and the previous activation paid off. + /// + /// `on_done(i, result, elapsed)` is called from the controller thread as + /// each partition completes — suitable for progress bars and result + /// aggregation. /// /// Returns the first error produced by `f`, if any. pub fn run( @@ -194,28 +190,65 @@ impl PartitionRunner { E: Send, C: FnMut(usize, R, Duration) + Send, { - // Pre-load the work queue, then drop the sender so workers' part_rx - // iterators terminate when the queue is drained. 
- let (part_tx, part_rx) = unbounded::(); + let n_total = order.len(); + if n_total == 0 { + return Ok(()); + } + + const SPAWN_THRESHOLD: f64 = 0.95; + const TIMER_SECS: u64 = 30; + + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + + // ── Channels ────────────────────────────────────────────────────────── + let (part_tx, part_rx) = unbounded::(); + let (activate_tx, activate_rx) = unbounded::<()>(); + // reset_tx: controller → timer ("reset the 30 s window") + let (reset_tx, reset_rx) = unbounded::<()>(); + // event_tx: workers + timer → controller (unified event stream) + let (event_tx, event_rx) = unbounded::>(); + for &i in order { part_tx.send(i).ok(); } drop(part_tx); - let (result_tx, result_rx) = unbounded::<(usize, Result, Duration)>(); - let n_nodes = self.nodes.len(); - let f = &f; // shared borrow; F: Sync so concurrent calls are safe + let max_workers = self.max_workers(); + let n_nodes = self.nodes.len(); + let f = &f; let mut first_err: Option = None; std::thread::scope(|s| { - // Spawn all workers upfront, round-robin across NUMA nodes. - for w in 0..self.max_workers() { + // ── Timer thread ────────────────────────────────────────────────── + // Sends TimerTick every TIMER_SECS seconds. Resets its window each + // time reset_rx receives a message (i.e. on partition completion). + let timer_tx = event_tx.clone(); + s.spawn(move || { + let period = Duration::from_secs(TIMER_SECS); + loop { + crossbeam_channel::select! 
{ + recv(reset_rx) -> r => { + if r.is_err() { break; } // reset_tx dropped → exit + } + default(period) => { + if timer_tx.send(WorkerEvent::TimerTick).is_err() { break; } + } + } + } + }); + + // ── Pre-spawn workers dormant, round-robin across NUMA nodes ────── + for w in 0..max_workers { let node = &self.nodes[w % n_nodes]; let prx = part_rx.clone(); - let rtx = result_tx.clone(); + let etx = event_tx.clone(); + let arx = activate_rx.clone(); let pool = node.pool.clone(); let cpu_ids = &node.cpu_ids; s.spawn(move || { + if arx.recv().is_err() { return; } if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); } for i in &prx { let t = Instant::now(); @@ -223,24 +256,53 @@ impl PartitionRunner { Some(p) => p.install(|| f(i)), None => f(i), }; - rtx.send((i, r, t.elapsed())).ok(); + etx.send(WorkerEvent::Completed(i, r, t.elapsed())).ok(); } }); } + // Drop controller's event_tx: event_rx closes when all workers + + // timer have exited. + drop(event_tx); - // Drop the controller's sender: result_rx closes once all worker - // rtx clones are dropped (i.e. all workers have exited). - drop(result_tx); + // ── Controller ──────────────────────────────────────────────────── + activate_tx.send(()).ok(); + let mut n_active = 1usize; + let mut cpu_sample = CpuSample::now(); + let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate + let mut completed = 0usize; - // Drain results concurrently with workers. The for loop exits - // when result_rx is disconnected — at that point all workers are - // done and the scope join below is instantaneous. 
- for (i, r, dur) in &result_rx { - match r { - Ok(v) => on_done(i, v, dur), - Err(e) => { if first_err.is_none() { first_err = Some(e); } } + while completed < n_total { + let Ok(event) = event_rx.recv() else { break }; + match event { + WorkerEvent::Completed(i, r, dur) => { + match r { + Ok(v) => on_done(i, v, dur), + Err(e) => { if first_err.is_none() { first_err = Some(e); } } + } + completed += 1; + // Reset the 30 s timer. + reset_tx.send(()).ok(); + // Inline check: same logic as a timer tick. + maybe_activate( + &activate_tx, &mut n_active, max_workers, + &mut cpu_sample, &mut eff_at_last_spawn, + n_cores, SPAWN_THRESHOLD, completed, n_total, + ); + } + WorkerEvent::TimerTick => { + maybe_activate( + &activate_tx, &mut n_active, max_workers, + &mut cpu_sample, &mut eff_at_last_spawn, + n_cores, SPAWN_THRESHOLD, completed, n_total, + ); + } } } + + // Dormant workers exit when activate_tx closes. + drop(activate_tx); + // Timer thread exits when reset_tx closes. + drop(reset_tx); }); match first_err { @@ -249,3 +311,51 @@ impl PartitionRunner { } } } + +// ── Internal event type ─────────────────────────────────────────────────────── + +enum WorkerEvent { + Completed(usize, Result, Duration), + TimerTick, +} + +fn maybe_activate( + activate_tx: &crossbeam_channel::Sender<()>, + n_active: &mut usize, + max_workers: usize, + cpu_sample: &mut CpuSample, + eff_at_last_spawn: &mut f64, + n_cores: usize, + threshold: f64, + completed: usize, + n_total: usize, +) { + if *n_active >= max_workers || completed >= n_total { return; } + + let eff = cpu_sample.cpu_efficiency(n_cores); + if eff >= threshold { return; } // CPU already saturated + + // Check that the previous activation was beneficial enough. + // Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1). + // For the very first extra worker (n_active == 1, no previous spawn), skip this + // check: eff_at_last_spawn == 0 acts as the sentinel. 
+ let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 { + true // first additional worker: no prior data to evaluate + } else { + let k_before = (*n_active - 1) as f64; + let min_speedup = (k_before + 0.2) / k_before; + let actual_speedup = eff / *eff_at_last_spawn; + actual_speedup >= min_speedup + }; + + if last_spawn_was_beneficial { + activate_tx.send(()).ok(); + *eff_at_last_spawn = eff; + *n_active += 1; + *cpu_sample = CpuSample::now(); + debug!( + "activated worker {}/{} — efficiency {:.0}%", + n_active, max_workers, eff * 100.0, + ); + } +} diff --git a/src/obikmer/Cargo.toml b/src/obikmer/Cargo.toml index 7cd08a0..936e46a 100644 --- a/src/obikmer/Cargo.toml +++ b/src/obikmer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "obikmer" -version = "1.1.14" +version = "1.1.15" edition = "2024" [[bin]]