feat: implement NUMA-aware worker pools for merge command

Replaces the global Rayon pool with per-NUMA-node thread pools that pin worker threads to their respective nodes, leveraging Linux first-touch allocation to reduce cross-NUMA memory contention and improve cache locality. Integrates the `hwlocality` crate with a vendored build, includes graceful fallbacks for single-socket or non-Linux systems, and updates dependency constraints. Also adds installation and architecture documentation, and corrects parallelism detection in the partitioner.
This commit is contained in:
Eric Coissac
2026-06-14 23:40:09 +02:00
parent f1d76f3203
commit ea767376bd
9 changed files with 654 additions and 34 deletions
+1
View File
@@ -17,3 +17,4 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
indicatif = "0.17"
tracing = "0.1.44"
hwlocality = { version = "1.0.0-alpha.11", features = ["vendored"] }
+1
View File
@@ -5,6 +5,7 @@ mod distance;
mod dump;
mod index;
mod merge;
mod numa;
mod rebuild;
mod reindex;
mod select;
+72 -24
View File
@@ -242,17 +242,27 @@ impl KmerIndex {
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
// ── Adaptive worker pool ──────────────────────────────────────────
// Start with 1 worker thread. After each completed partition,
// measure CPU efficiency (via getrusage delta). If efficiency is
// below the spawn threshold and more partitions remain, spawn one
// additional worker. Workers share a crossbeam channel of partition
// IDs; each reports (id, g_len, duration) on a result channel.
// Default (non-NUMA): start with 1 worker, grow adaptively up to
// n_cores/2 based on CPU efficiency.
//
// NUMA mode (Linux, multi-node): one pinned Rayon ThreadPool per
// NUMA node, workers_per_node workers per node, all pre-activated.
// No adaptive spawn: the optimal count is fixed by memory bandwidth.
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let max_workers = (n_cores / 2).max(1);
let _ = budget_fraction; // kept in signature for CLI compatibility
let numa = crate::numa::build();
// effective_max_workers: slots to pre-spawn.
// numa_all_active: whether to activate all slots immediately.
let (effective_max_workers, numa_all_active) = match &numa {
Some(ns) => (ns.pools.len() * ns.workers_per_node(), true),
None => (max_workers, false),
};
let (part_tx, part_rx) = unbounded::<usize>();
let (result_tx, result_rx) =
unbounded::<(usize, Result<usize, obiskio::SKError>, Duration)>();
@@ -276,25 +286,54 @@ impl KmerIndex {
let srcs = &srcs;
let evidence = &evidence;
if let Some(ns) = &numa {
debug!(
"NUMA mode: {} node(s) × {} worker(s)/node = {} total workers",
ns.pools.len(),
ns.workers_per_node(),
effective_max_workers,
);
}
std::thread::scope(|s| -> OKIResult<()> {
// Pre-spawn max_workers threads; each waits for an activation
// signal before consuming from part_rx.
for _ in 0..max_workers {
// Pre-spawn threads. In NUMA mode each thread is pinned to its
// node's CPUs and wraps merge_partition in pool.install() so
// that all Rayon calls use the node-local ThreadPool, and
// Linux first-touch places graph allocations in local DRAM.
for worker_idx in 0..effective_max_workers {
let prx = part_rx.clone();
let rtx = result_tx.clone();
let arx = activate_rx.clone();
// Per-worker NUMA config: (pool, cpus) for this slot.
let numa_config: Option<(std::sync::Arc<rayon::ThreadPool>, Vec<usize>)> =
numa.as_ref().map(|ns| {
let wpn = ns.workers_per_node();
let node = worker_idx / wpn;
(
std::sync::Arc::clone(&ns.pools[node]),
ns.cpus_per_node[node].clone(),
)
});
s.spawn(move || {
if let Some((_, ref cpus)) = numa_config {
crate::numa::pin_current_thread(cpus);
}
if arx.recv().is_ok() {
for i in &prx {
let t = Instant::now();
let r = dst_partition.merge_partition(
i,
srcs,
mode,
n_dst_genomes,
block_bits,
evidence,
);
let r = if let Some((ref pool, _)) = numa_config {
pool.install(|| {
dst_partition.merge_partition(
i, srcs, mode, n_dst_genomes, block_bits, evidence,
)
})
} else {
dst_partition.merge_partition(
i, srcs, mode, n_dst_genomes, block_bits, evidence,
)
};
rtx.send((i, r, t.elapsed())).ok();
}
}
@@ -302,9 +341,17 @@ impl KmerIndex {
}
drop(result_tx);
// Activate first worker immediately.
activate_tx.send(()).ok();
n_workers = 1;
if numa_all_active {
// NUMA: activate every worker immediately.
for _ in 0..effective_max_workers {
activate_tx.send(()).ok();
}
n_workers = effective_max_workers;
} else {
// Non-NUMA: activate first worker, grow adaptively.
activate_tx.send(()).ok();
n_workers = 1;
}
const SPAWN_POLL: Duration = Duration::from_secs(20);
@@ -312,11 +359,10 @@ impl KmerIndex {
while completed < n_partitions {
let result = result_rx.recv_timeout(SPAWN_POLL);
// On timeout: no partition finished yet, just check efficiency.
let (i, r, dur) = match result {
Ok(v) => v,
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
if n_workers < max_workers {
if !numa_all_active && n_workers < effective_max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
debug!(
@@ -353,7 +399,7 @@ impl KmerIndex {
});
completed += 1;
if n_workers < max_workers && completed < n_partitions {
if !numa_all_active && n_workers < effective_max_workers && completed < n_partitions {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
debug!(
@@ -369,7 +415,9 @@ impl KmerIndex {
}
}
}
// Close activate_tx: dormant workers exit cleanly.
// Dropping activate_tx signals dormant workers to exit cleanly
// (non-NUMA). In NUMA mode all workers were already activated so
// this drop is just cleanup.
drop(activate_tx);
Ok(())
})?;
@@ -377,7 +425,7 @@ impl KmerIndex {
pb.finish_and_clear();
// ── Diagnostic report ─────────────────────────────────────────────
print_merge_partition_report(&part_stats, n_workers, max_workers);
print_merge_partition_report(&part_stats, n_workers, effective_max_workers);
rep.push(t.stop());
}
+102
View File
@@ -0,0 +1,102 @@
// NUMA-aware Rayon thread pools via hwlocality.
//
// Detects NUMA topology using hwloc (cross-platform: Linux, macOS, etc.) and
// builds one Rayon ThreadPool per NUMA node with threads pinned to that node's
// CPUs. Linux first-touch policy then places graph allocations in local DRAM
// automatically — no explicit memory binding needed.
//
// Returns None when:
// - hwloc topology initialisation fails
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
// - any per-node pool fails to build
use std::sync::Arc;
use hwlocality::Topology;
use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet;
use hwlocality::object::types::ObjectType;
use tracing::debug;
// ── Public interface ──────────────────────────────────────────────────────────
/// Per-NUMA-node resources produced by [`build`].
///
/// `pools` and `cpus_per_node` are parallel vectors: entry `i` of each
/// describes the same NUMA node.
pub struct NumaSetup {
    /// One Rayon thread pool per NUMA node, in node order.
    pub pools: Vec<Arc<rayon::ThreadPool>>,
    /// CPU indices belonging to each NUMA node, in node order.
    pub cpus_per_node: Vec<Vec<usize>>,
}

impl NumaSetup {
    /// Number of workers to activate on each NUMA node.
    ///
    /// Computed from the CPU count of the first node only: one worker per
    /// 8 CPUs, never fewer than 3 and never more than 8. Falls back to 3
    /// when no node is recorded.
    pub fn workers_per_node(&self) -> usize {
        self.cpus_per_node
            .first()
            .map_or(3, |first_node| (first_node.len() / 8).clamp(3, 8))
    }
}
/// Probe the machine's NUMA layout and create one Rayon pool per node.
///
/// NUMA nodes that expose no cpuset, or an empty one, are ignored.
///
/// Returns `None` when the hwloc topology cannot be loaded, when at most
/// one usable NUMA node remains, or when any per-node pool fails to build.
pub fn build() -> Option<NumaSetup> {
    let topology = Topology::new().ok()?;

    // Collect the CPU indices of every NUMA node that actually owns CPUs.
    let mut cpus_per_node: Vec<Vec<usize>> = Vec::new();
    for node in topology.objects_with_type(ObjectType::NUMANode) {
        let Some(cpuset) = node.cpuset() else {
            continue;
        };
        let cpus: Vec<usize> = cpuset.iter_set().map(usize::from).collect();
        if !cpus.is_empty() {
            cpus_per_node.push(cpus);
        }
    }

    // A single node (or none) gains nothing from per-node pools.
    if cpus_per_node.len() < 2 {
        return None;
    }

    debug!(
        "NUMA topology: {} node(s), {} core(s)/node",
        cpus_per_node.len(),
        cpus_per_node.first().map_or(0, |v| v.len()),
    );

    // Stop at the first pool that cannot be built; pools created so far are
    // dropped when this function returns.
    let mut pools = Vec::with_capacity(cpus_per_node.len());
    for cpus in &cpus_per_node {
        pools.push(Arc::new(build_pool(cpus)?));
    }

    Some(NumaSetup {
        pools,
        cpus_per_node,
    })
}
/// Bind the calling thread to `cpu_indices` using hwloc.
/// Silently returns on any error so the thread still runs, just unbound.
pub fn pin_current_thread(cpu_indices: &[usize]) {
let Ok(topology) = Topology::new() else { return };
let mut cpuset = CpuSet::new();
for &idx in cpu_indices {
cpuset.set(idx);
}
let _ = topology.bind_cpu(&cpuset, CpuBindingFlags::THREAD);
}
// ── Internal helpers ──────────────────────────────────────────────────────────
/// Create a Rayon pool with one thread per entry of `cpus`, each thread
/// binding itself to the whole `cpus` set before entering the Rayon worker
/// loop.
///
/// Returns `None` if Rayon fails to build the pool.
fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
    let node_cpus = cpus.to_owned();

    let builder = rayon::ThreadPoolBuilder::new()
        .num_threads(node_cpus.len())
        .spawn_handler(move |worker| {
            // Each spawned thread owns its own copy of the CPU list.
            let affinity = node_cpus.clone();
            std::thread::Builder::new()
                .spawn(move || {
                    pin_current_thread(&affinity);
                    worker.run();
                })
                // The join handle is not kept; the thread is detached.
                .map(drop)
        });

    builder.build().ok()
}