From 7a87e911b63382cd96e4ede2eea2b8d2f795a6d5 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 15 Jun 2026 11:24:37 +0200 Subject: [PATCH 1/2] feat: introduce NUMA-aware PartitionRunner for adaptive parallelism Replace NUMA-naive Rayon loops and ad-hoc adaptive pools with a unified `PartitionRunner` that manages a NUMA-aware worker pool. The implementation uses pinned Rayon thread pools per node and activates dormant threads based on real-time CPU efficiency metrics. This standardizes partition-level parallelism, optimizes memory locality, and eliminates cross-socket traffic. Includes architecture documentation and updates mkdocs navigation. --- docmd/architecture/numa_partition_runner.md | 179 ++++++++++++++ mkdocs.yml | 1 + src/obikindex/src/numa.rs | 250 +++++++++++++++++++- 3 files changed, 429 insertions(+), 1 deletion(-) create mode 100644 docmd/architecture/numa_partition_runner.md diff --git a/docmd/architecture/numa_partition_runner.md b/docmd/architecture/numa_partition_runner.md new file mode 100644 index 0000000..d955e55 --- /dev/null +++ b/docmd/architecture/numa_partition_runner.md @@ -0,0 +1,179 @@ +# NUMA-aware partition runner + +## Problem + +All partition-level parallel loops in obikindex currently fall into two +categories: + +**Naive Rayon** — used in `build_layers`, `pack_matrices`, `dump`, `select`, +`stats`, `rebuild`, `reindex`: + +```rust +(0..n).into_par_iter().for_each(|i| work(i)); +``` + +Threads come from the global Rayon pool with no NUMA awareness. On +multi-socket machines this produces cross-socket memory traffic and degrades +performance super-linearly (see [NUMA-aware worker pools](numa_worker_pools.md)). + +**Ad-hoc adaptive pool** — used in `merge`: + +A bespoke implementation with pre-spawned workers, channel-based dispatch, and +activation control. It handles NUMA correctly but is not reusable. + +Both cases should be replaced by a single generic mechanism. + +## Unified model + +The key insight is that **UMA is just the NUMA case with a single node**. The +runner always works the same way: one controller thread per node, each +independently managing its own workers with the same adaptive logic. The only +difference between UMA and NUMA is the number of nodes and whether workers are +pinned. + +``` +NUMA (k nodes) UMA (1 node) + +controller-0 controller-1 … controller-0 + │ │ │ +workers[0] workers[1] workers[0] +(pinned) (pinned) (global pool) + └───────────────┴──────────────────┘ + shared work queue +``` + +On each node, the Rayon `ThreadPool` is pinned to that node's CPUs. +`pool.install()` ensures all internal Rayon calls (inside the work function) +use the node-local pool. Linux first-touch then places heap allocations in +local DRAM automatically. + +On UMA the global Rayon pool is used directly — no pinning, no overhead. + +## Adaptive mechanism + +Each controller follows the same logic regardless of node count: + +1. Pre-spawn `workers_per_node` dormant worker threads (blocked on `activate_rx`). +2. Activate the first worker immediately. +3. Loop on result channel with a `SPAWN_POLL` timeout: + - On result: call `on_done`; check whether to activate the next worker. + - On timeout: same check. + - Activation criterion: `should_spawn_worker(active, global_efficiency, prev_efficiency)`. +4. Drop `activate_tx` when done — dormant workers exit cleanly. + +**Global CPU efficiency** (`CpuSample`, reads `/proc/stat` on Linux) is used by +all controllers — no per-node measurement needed. The signal is coarser than +per-node efficiency but correct in practice: if any node saturates memory +bandwidth, the global efficiency drops and all controllers stop activating +workers. Using a standard portable primitive avoids platform-specific CPU +accounting and keeps the implementation clean. + +## Proposed API + +```rust +pub struct PartitionRunner { + // One entry per NUMA node; one entry total on UMA. + nodes: Vec, +} + +struct NodeConfig { + pool: Option>, // None = global Rayon pool (UMA) + cpu_ids: Vec, // empty = no pinning (UMA) + max_workers: usize, +} + +impl PartitionRunner { + /// Detect topology and build the runner. + /// Returns a single-node runner on UMA / macOS / hwloc failure. + pub fn new() -> Self; + + /// Run `f(i)` for every index in `order`, collecting results. + /// + /// `on_done(i, result, elapsed)` is called under an internal mutex as + /// each partition completes — use it for progress bars and aggregation. + /// The runner serialises all calls to `on_done` via an internal + /// `Arc>`, so no `Sync` bound is required on the callback. + /// `Send` is required because the Arc clone crosses thread boundaries. + /// + /// Serialisation is free in practice: a partition takes seconds to + /// minutes; the callback takes microseconds. Contention is negligible. + /// + /// Returns the first error from `f`, if any. + pub fn run( + &self, + order: &[usize], + f: F, + on_done: C, + ) -> Result<(), E> + where + F: Fn(usize) -> Result + Send + Sync, + R: Send, + E: Send, + C: FnMut(usize, R, Duration) + Send; // Send required, Sync is not +} +``` + +`order` is caller-supplied so each command chooses its scheduling strategy: +largest-first for `merge`, sequential for `build_layers`, etc. + +## Migration examples + +### merge.rs (before: ~180 lines of bespoke machinery) + +```rust +let runner = PartitionRunner::new(); +runner.run( + &order, + |i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence) + .map_err(OKIError::Partition), + |i, g_len, dur| { + pb.inc(1); + debug!("partition {i}: done in {:.1}s — {g_len} new kmers", dur.as_secs_f64()); + part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len }); + }, +)?; +``` + +### index.rs build_layers (before: naive into_par_iter) + +```rust +let order: Vec = (0..n).collect(); +let runner = PartitionRunner::new(); +runner.run( + &order, + |i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits) + .map_err(OKIError::Partition), + |_, n_kmers, _| { + total_kmers.fetch_add(n_kmers, Ordering::Relaxed); + pb.inc(1); + }, +)?; +``` + +All other sites (`pack_matrices`, `dump`, `select`, etc.) follow the same +pattern. + +## Placement + +`PartitionRunner` lives in `obikindex/src/numa.rs` alongside `NumaSetup`. +It depends only on standard library primitives and Rayon — no new dependencies. + +A single `PartitionRunner` instance can be built once per command invocation +and reused across multiple `run()` calls (e.g. `merge` runs +`merge_partitions` then `pack_matrices`). + +## Open questions + +- **Error handling**: `run` currently returns the first error; remaining errors + are dropped. A `Vec` return would give complete diagnostics. + +- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated + for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from + a higher value. A per-call override could be added to the API. + +- **`on_done` ordering**: the runner serialises calls to `on_done` via an + internal `Arc>`. `Send` is required (the Arc clone crosses thread + boundaries); `Sync` is not (only one thread holds the lock at a time). + Contention is negligible because a partition takes seconds while the callback + takes microseconds. The callback is therefore simple to write (plain + `Vec::push`, plain `FnMut`) with no measurable performance cost. diff --git a/mkdocs.yml b/mkdocs.yml index 8a4498d..c27d1a9 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -57,6 +57,7 @@ nav: - Sequences: architecture/sequences/invariant.md - Kmer index: architecture/index_architecture.md - NUMA-aware worker pools: architecture/numa_worker_pools.md + - NUMA-aware partition runner: architecture/numa_partition_runner.md watch: - docmd diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index da79b3f..2b0332c 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -10,12 +10,15 @@ // - the system has only one NUMA node (UMA, Apple Silicon, single-socket) // - any per-node pool fails to build -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +use crossbeam_channel::{RecvTimeoutError, unbounded}; use hwlocality::Topology; use hwlocality::cpu::binding::CpuBindingFlags; use hwlocality::cpu::cpuset::CpuSet; use hwlocality::object::types::ObjectType; +use obisys::CpuSample; use tracing::debug; // ── Public interface ────────────────────────────────────────────────────────── @@ -100,3 +103,248 @@ fn build_pool(cpus: &[usize]) -> Option { .build() .ok() } + +// ── Adaptive spawn heuristic ────────────────────────────────────────────────── +// +// First worker: spawn if CPU efficiency is below SPAWN_THRESHOLD (machine is +// under-utilised). Subsequent workers: spawn only if the last worker raised +// efficiency by at least the expected marginal gain (1/n_workers), with a +// minimum floor to avoid spurious spawns from efficiency fluctuations. + +const SPAWN_THRESHOLD: f64 = 0.95; +const MIN_MARGINAL_GAIN: f64 = 0.03; +const SPAWN_POLL: Duration = Duration::from_secs(20); + +fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool { + if n_workers == 1 { + eff < SPAWN_THRESHOLD + } else { + let gain = eff - eff_at_last_spawn; + let expected = 1.0 / n_workers as f64; + gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN) + } +} + +// ── PartitionRunner ─────────────────────────────────────────────────────────── + +struct NodeConfig { + pool: Option>, + cpu_ids: Vec, + max_workers: usize, +} + +/// Generic NUMA-aware runner for partition-level parallel work. +/// +/// Encapsulates worker spawning, NUMA pinning, adaptive activation, and result +/// collection. UMA systems are handled as the degenerate case of a single node +/// with no pinning. +/// +/// # Model +/// +/// One controller thread per NUMA node (one total on UMA). Each controller +/// manages up to `max_workers` dormant workers that drain a shared work queue. +/// Workers are activated one at a time; a new worker is added when global CPU +/// efficiency justifies it. On NUMA all workers are activated immediately +/// (memory bandwidth, not CPU count, is the bottleneck). +pub struct PartitionRunner { + nodes: Vec, + n_cores: usize, +} + +impl PartitionRunner { + /// Detect topology and build. Falls back to a single-node UMA runner on + /// macOS, single-socket machines, or hwloc failure. + pub fn new() -> Self { + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + + match build() { + Some(ns) => { + let wpn = ns.workers_per_node(); + debug!( + "PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node", + ns.pools.len(), + wpn, + ); + let nodes = ns.pools + .into_iter() + .zip(ns.cpus_per_node) + .map(|(pool, cpu_ids)| NodeConfig { + pool: Some(pool), + cpu_ids, + max_workers: wpn, + }) + .collect(); + Self { nodes, n_cores } + } + None => { + let max_workers = (n_cores / 2).max(1); + debug!( + "PartitionRunner: UMA mode — adaptive up to {} worker(s)", + max_workers, + ); + Self { + nodes: vec![NodeConfig { + pool: None, + cpu_ids: vec![], + max_workers, + }], + n_cores, + } + } + } + } + + /// Run `f(i)` for every index in `order`. + /// + /// `on_done(i, result, elapsed)` is called under an internal mutex as each + /// partition completes — suitable for progress bars, logging, and result + /// aggregation. No `Send` or `Sync` bound is required on the callback. + /// + /// The work queue is shared across all NUMA nodes: any idle worker takes + /// the next available partition regardless of node, ensuring load balance. + /// + /// Returns the first error produced by `f`, if any. + pub fn run( + &self, + order: &[usize], + f: F, + on_done: C, + ) -> Result<(), E> + where + F: Fn(usize) -> Result + Send + Sync, + R: Send, + E: Send, + C: FnMut(usize, R, Duration) + Send, + { + let f = Arc::new(f); + let on_done = Arc::new(Mutex::new(on_done)); + let first_err: Arc>> = Arc::new(Mutex::new(None)); + + // Shared work queue — pre-loaded in caller-supplied order. + let (part_tx, part_rx) = unbounded::(); + for &i in order { + part_tx.send(i).ok(); + } + drop(part_tx); + + let n_cores = self.n_cores; + + std::thread::scope(|s| { + for node in &self.nodes { + let f = Arc::clone(&f); + let on_done = Arc::clone(&on_done); + let first_err = Arc::clone(&first_err); + let part_rx = part_rx.clone(); + + s.spawn(move || { + // Per-node result and activation channels. + let (result_tx, result_rx) = + unbounded::<(usize, Result, Duration)>(); + let (activate_tx, activate_rx) = unbounded::<()>(); + + std::thread::scope(|ws| { + // Pre-spawn workers (all dormant until activated). + for _ in 0..node.max_workers { + let prx = part_rx.clone(); + let rtx = result_tx.clone(); + let arx = activate_rx.clone(); + let f = Arc::clone(&f); + let pool = node.pool.clone(); + let cpu_ids = node.cpu_ids.clone(); + + ws.spawn(move || { + if !cpu_ids.is_empty() { + pin_current_thread(&cpu_ids); + } + if arx.recv().is_err() { + return; // never activated — exit cleanly + } + for i in &prx { + let t = Instant::now(); + let r = match &pool { + Some(p) => p.install(|| f(i)), + None => f(i), + }; + rtx.send((i, r, t.elapsed())).ok(); + } + }); + } + // Drop the controller's copy: result_rx disconnects + // once all worker copies are also dropped (workers done). + drop(result_tx); + + // In NUMA mode activate all workers immediately; + // in UMA mode activate one and grow adaptively. + let numa_mode = node.pool.is_some(); + let initial = if numa_mode { node.max_workers } else { 1 }; + for _ in 0..initial { + activate_tx.send(()).ok(); + } + let mut active_workers = initial; + let mut cpu_sample = CpuSample::now(); + let mut eff_at_last_spawn = 0.0f64; + + // Controller loop. + loop { + match result_rx.recv_timeout(SPAWN_POLL) { + Ok((i, r, dur)) => { + match r { + Ok(v) => { + on_done.lock().unwrap()(i, v, dur); + } + Err(e) => { + let mut g = first_err.lock().unwrap(); + if g.is_none() { *g = Some(e); } + } + } + if !numa_mode && active_workers < node.max_workers { + let eff = cpu_sample.cpu_efficiency(n_cores); + if should_spawn_worker(active_workers, eff, eff_at_last_spawn) { + debug!( + "activated worker {} — efficiency {:.0}%", + active_workers + 1, + eff * 100.0, + ); + activate_tx.send(()).ok(); + active_workers += 1; + eff_at_last_spawn = eff; + cpu_sample = CpuSample::now(); + } + } + } + Err(RecvTimeoutError::Timeout) => { + if !numa_mode && active_workers < node.max_workers { + let eff = cpu_sample.cpu_efficiency(n_cores); + if should_spawn_worker(active_workers, eff, eff_at_last_spawn) { + debug!( + "activated worker {} (poll) — efficiency {:.0}%", + active_workers + 1, + eff * 100.0, + ); + activate_tx.send(()).ok(); + active_workers += 1; + eff_at_last_spawn = eff; + cpu_sample = CpuSample::now(); + } + } + } + Err(RecvTimeoutError::Disconnected) => break, + } + } + // Signal any dormant workers that were never activated + // to exit (UMA mode where max_workers was never reached). + drop(activate_tx); + }); // ws: waits for all workers of this node + }); + } + }); // s: waits for all node controllers + + let mut g = first_err.lock().unwrap(); + match g.take() { + Some(e) => Err(e), + None => Ok(()), + } + } +} -- 2.52.0 From 4a64718fd114145841a53a71155cc3a2f45f7b31 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 15 Jun 2026 11:30:46 +0200 Subject: [PATCH 2/2] perf: replace partition processing with adaptive NUMA worker pool Replaces the previous partition processing logic with an adaptive, NUMA-aware multi-threaded worker pool that dynamically scales active threads based on real-time CPU efficiency. Introduces pre-spawned, CPU-pinned threads managed via crossbeam channels and Rayon to optimize memory bandwidth and core utilization. Adds a `max_workers()` accessor to aggregate maximum worker capacity across NUMA nodes and updates diagnostics to report active versus maximum worker counts. --- src/obikindex/src/merge.rs | 206 +++---------------------------------- src/obikindex/src/numa.rs | 5 + 2 files changed, 19 insertions(+), 192 deletions(-) diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index be17ce6..c637c9b 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -2,10 +2,8 @@ use std::collections::HashMap; use std::fs; use std::io; use std::path::Path; -use std::time::{Duration, Instant}; -use crossbeam_channel::unbounded; -use obisys::{CpuSample, Reporter, Stage, progress_bar, spinner}; +use obisys::{Reporter, Stage, progress_bar, spinner}; use tracing::{debug, info}; use obilayeredmap::IndexMode; @@ -26,24 +24,6 @@ struct PartStat { g_len: usize, } -// ── adaptive spawn criterion ────────────────────────────────────────────────── -// First worker: spawn if efficiency < SPAWN_THRESHOLD (CPU is underutilised). -// Subsequent workers: spawn only if the last spawn raised efficiency by at -// least the expected marginal gain (1/n_workers), with a minimum floor of 3% -// to avoid spurious spawns when efficiency fluctuates around the threshold. -const SPAWN_THRESHOLD: f64 = 0.95; -const MIN_MARGINAL_GAIN: f64 = 0.03; - -fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool { - if n_workers == 1 { - eff < SPAWN_THRESHOLD - } else { - let gain = eff - eff_at_last_spawn; - let expected = 1.0 / n_workers as f64; - gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN) - } -} - // ── main merge entry point ──────────────────────────────────────────────────── impl KmerIndex { @@ -241,191 +221,33 @@ impl KmerIndex { let mut order: Vec = (0..n_partitions).collect(); order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i])); - // ── Adaptive worker pool ────────────────────────────────────────── - // Default (non-NUMA): start with 1 worker, grow adaptively up to - // n_cores/2 based on CPU efficiency. - // - // NUMA mode (Linux, multi-node): one pinned Rayon ThreadPool per - // NUMA node, workers_per_node workers per node, all pre-activated. - // No adaptive spawn: the optimal count is fixed by memory bandwidth. - let n_cores = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - let max_workers = (n_cores / 2).max(1); let _ = budget_fraction; // kept in signature for CLI compatibility - let numa = crate::numa::build(); - - // effective_max_workers: slots to pre-spawn. - // numa_all_active: whether to activate all slots immediately. - let (effective_max_workers, numa_all_active) = match &numa { - Some(ns) => (ns.pools.len() * ns.workers_per_node(), true), - None => (max_workers, false), - }; - - let (part_tx, part_rx) = unbounded::(); - let (result_tx, result_rx) = - unbounded::<(usize, Result, Duration)>(); - // activate_tx: controller sends () to wake the next dormant worker. - // Dropping activate_tx closes the channel; dormant workers exit. - let (activate_tx, activate_rx) = unbounded::<()>(); - - for &i in &order { - part_tx.send(i).ok(); - } - drop(part_tx); - - let mut part_stats: Vec = Vec::with_capacity(n_partitions); - let mut n_workers = 0usize; - let mut cpu_sample = CpuSample::now(); - // Efficiency measured just before each spawn, used to assess - // whether the previous worker delivered its expected marginal gain. - let mut efficiency_at_last_spawn = 0.0f64; - // Shadow as references so closures can capture them by copy. let srcs = &srcs; let evidence = &evidence; - if let Some(ns) = &numa { - debug!( - "NUMA mode: {} node(s) × {} worker(s)/node = {} total workers", - ns.pools.len(), - ns.workers_per_node(), - effective_max_workers, - ); - } + let runner = crate::numa::PartitionRunner::new(); + let mut part_stats: Vec = Vec::with_capacity(n_partitions); - std::thread::scope(|s| -> OKIResult<()> { - // Pre-spawn threads. In NUMA mode each thread is pinned to its - // node's CPUs and wraps merge_partition in pool.install() so - // that all Rayon calls use the node-local ThreadPool, and - // Linux first-touch places graph allocations in local DRAM. - for worker_idx in 0..effective_max_workers { - let prx = part_rx.clone(); - let rtx = result_tx.clone(); - let arx = activate_rx.clone(); - - // Per-worker NUMA config: (pool, cpus) for this slot. - let numa_config: Option<(std::sync::Arc, Vec)> = - numa.as_ref().map(|ns| { - let wpn = ns.workers_per_node(); - let node = worker_idx / wpn; - ( - std::sync::Arc::clone(&ns.pools[node]), - ns.cpus_per_node[node].clone(), - ) - }); - - s.spawn(move || { - if let Some((_, ref cpus)) = numa_config { - crate::numa::pin_current_thread(cpus); - } - if arx.recv().is_ok() { - for i in &prx { - let t = Instant::now(); - let r = if let Some((ref pool, _)) = numa_config { - pool.install(|| { - dst_partition.merge_partition( - i, srcs, mode, n_dst_genomes, block_bits, evidence, - ) - }) - } else { - dst_partition.merge_partition( - i, srcs, mode, n_dst_genomes, block_bits, evidence, - ) - }; - rtx.send((i, r, t.elapsed())).ok(); - } - } - }); - } - drop(result_tx); - - if numa_all_active { - // NUMA: activate every worker immediately. - for _ in 0..effective_max_workers { - activate_tx.send(()).ok(); - } - n_workers = effective_max_workers; - } else { - // Non-NUMA: activate first worker, grow adaptively. - activate_tx.send(()).ok(); - n_workers = 1; - } - - const SPAWN_POLL: Duration = Duration::from_secs(20); - - let mut completed = 0usize; - while completed < n_partitions { - let result = result_rx.recv_timeout(SPAWN_POLL); - - let (i, r, dur) = match result { - Ok(v) => v, - Err(crossbeam_channel::RecvTimeoutError::Timeout) => { - if !numa_all_active && n_workers < effective_max_workers { - let eff = cpu_sample.cpu_efficiency(n_cores); - if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) { - debug!( - "activated worker {} (poll) — efficiency {:.0}%", - n_workers + 1, - eff * 100.0, - ); - efficiency_at_last_spawn = eff; - activate_tx.send(()).ok(); - n_workers += 1; - cpu_sample = CpuSample::now(); - } - } - continue; - } - Err(crossbeam_channel::RecvTimeoutError::Disconnected) => { - return Err(OKIError::Io(io::Error::new( - io::ErrorKind::UnexpectedEof, - "worker channel closed", - ))); - } - }; - let g_len = r.map_err(OKIError::Partition)?; + runner.run( + &order, + |i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence), + |i, g_len, dur| { pb.inc(1); debug!( "partition {i}: done in {:.1}s — {} new kmers", dur.as_secs_f64(), - g_len - ); - part_stats.push(PartStat { - id: i, - unitig_bytes: partition_sizes[i], g_len, - }); - completed += 1; - - if !numa_all_active && n_workers < effective_max_workers && completed < n_partitions { - let eff = cpu_sample.cpu_efficiency(n_cores); - if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) { - debug!( - "activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%", - n_workers + 1, - eff * 100.0, - (eff - efficiency_at_last_spawn) * 100.0, - ); - efficiency_at_last_spawn = eff; - activate_tx.send(()).ok(); - n_workers += 1; - cpu_sample = CpuSample::now(); - } - } - } - // Dropping activate_tx signals dormant workers to exit cleanly - // (non-NUMA). In NUMA mode all workers were already activated so - // this drop is just cleanup. - drop(activate_tx); - Ok(()) - })?; + ); + part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len }); + }, + ).map_err(OKIError::Partition)?; pb.finish_and_clear(); // ── Diagnostic report ───────────────────────────────────────────── - print_merge_partition_report(&part_stats, n_workers, effective_max_workers); + print_merge_partition_report(&part_stats, runner.max_workers()); rep.push(t.stop()); } @@ -447,7 +269,7 @@ impl KmerIndex { // ── Diagnostic report ───────────────────────────────────────────────────────── -fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_workers: usize) { +fn print_merge_partition_report(stats: &[PartStat], max_workers: usize) { let total_new: usize = stats.iter().map(|s| s.g_len).sum(); let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count(); @@ -461,7 +283,7 @@ fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_worker " {} partition(s) processed, {} total new kmers", non_empty, total_new, ); - info!(" workers spawned: {n_workers} / {max_workers} (max)",); + info!(" max workers: {max_workers}"); // Top 8 partitions by new-kmer count let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect(); diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index 2b0332c..dde62b7 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -154,6 +154,11 @@ pub struct PartitionRunner { impl PartitionRunner { /// Detect topology and build. Falls back to a single-node UMA runner on /// macOS, single-socket machines, or hwloc failure. + /// Total number of pre-spawned worker slots across all nodes. + pub fn max_workers(&self) -> usize { + self.nodes.iter().map(|n| n.max_workers).sum() + } + pub fn new() -> Self { let n_cores = std::thread::available_parallelism() .map(|n| n.get()) -- 2.52.0