Merge pull request 'Push rtnzuqxzmkon' (#31) from push-rtnzuqxzmkon into main

Reviewed-on: #31
This commit was merged in pull request #31.
This commit is contained in:
2026-06-15 09:40:35 +00:00
4 changed files with 448 additions and 193 deletions
+179
View File
@@ -0,0 +1,179 @@
# NUMA-aware partition runner
## Problem
All partition-level parallel loops in obikindex currently fall into two
categories:
**Naive Rayon** — used in `build_layers`, `pack_matrices`, `dump`, `select`,
`stats`, `rebuild`, `reindex`:
```rust
(0..n).into_par_iter().for_each(|i| work(i));
```
Threads come from the global Rayon pool with no NUMA awareness. On
multi-socket machines this produces cross-socket memory traffic and degrades
performance super-linearly (see [NUMA-aware worker pools](numa_worker_pools.md)).
**Ad-hoc adaptive pool** — used in `merge`:
A bespoke implementation with pre-spawned workers, channel-based dispatch, and
activation control. It handles NUMA correctly but is not reusable.
Both cases should be replaced by a single generic mechanism.
## Unified model
The key insight is that **UMA is just the NUMA case with a single node**. The
runner always works the same way: one controller thread per node, each
independently managing its own workers with the same adaptive logic. The only
difference between UMA and NUMA is the number of nodes and whether workers are
pinned.
```
NUMA (k nodes) UMA (1 node)
controller-0 controller-1 … controller-0
│ │ │
workers[0] workers[1] workers[0]
(pinned) (pinned) (global pool)
└───────────────┴──────────────────┘
shared work queue
```
On each node, the Rayon `ThreadPool` is pinned to that node's CPUs.
`pool.install()` ensures all internal Rayon calls (inside the work function)
use the node-local pool. Linux first-touch then places heap allocations in
local DRAM automatically.
On UMA the global Rayon pool is used directly — no pinning, no overhead.
## Adaptive mechanism
Each controller follows the same logic regardless of node count:
1. Pre-spawn `workers_per_node` dormant worker threads (blocked on `activate_rx`).
2. Activate the first worker immediately.
3. Loop on result channel with a `SPAWN_POLL` timeout:
- On result: call `on_done`; check whether to activate the next worker.
- On timeout: same check.
- Activation criterion: `should_spawn_worker(active, global_efficiency, prev_efficiency)`.
4. Drop `activate_tx` when done — dormant workers exit cleanly.
**Global CPU efficiency** (`CpuSample`, reads `/proc/stat` on Linux) is used by
all controllers — no per-node measurement needed. The signal is coarser than
per-node efficiency but correct in practice: if any node saturates memory
bandwidth, the global efficiency drops and all controllers stop activating
workers. Using a standard portable primitive avoids platform-specific CPU
accounting and keeps the implementation clean.
## Proposed API
```rust
pub struct PartitionRunner {
// One entry per NUMA node; one entry total on UMA.
nodes: Vec<NodeConfig>,
}
struct NodeConfig {
pool: Option<Arc<rayon::ThreadPool>>, // None = global Rayon pool (UMA)
cpu_ids: Vec<usize>, // empty = no pinning (UMA)
max_workers: usize,
}
impl PartitionRunner {
/// Detect topology and build the runner.
/// Returns a single-node runner on UMA / macOS / hwloc failure.
pub fn new() -> Self;
/// Run `f(i)` for every index in `order`, collecting results.
///
/// `on_done(i, result, elapsed)` is called under an internal mutex as
/// each partition completes — use it for progress bars and aggregation.
/// The runner serialises all calls to `on_done` via an internal
/// `Arc<Mutex<C>>`, so no `Sync` bound is required on the callback.
/// `Send` is required because the Arc clone crosses thread boundaries.
///
/// Serialisation is free in practice: a partition takes seconds to
/// minutes; the callback takes microseconds. Contention is negligible.
///
/// Returns the first error from `f`, if any.
pub fn run<F, R, E, C>(
&self,
order: &[usize],
f: F,
on_done: C,
) -> Result<(), E>
where
F: Fn(usize) -> Result<R, E> + Send + Sync,
R: Send,
E: Send,
C: FnMut(usize, R, Duration) + Send; // Send required, Sync is not
}
```
`order` is caller-supplied so each command chooses its scheduling strategy:
largest-first for `merge`, sequential for `build_layers`, etc.
## Migration examples
### merge.rs (before: ~180 lines of bespoke machinery)
```rust
let runner = PartitionRunner::new();
runner.run(
&order,
|i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence)
.map_err(OKIError::Partition),
|i, g_len, dur| {
pb.inc(1);
debug!("partition {i}: done in {:.1}s — {g_len} new kmers", dur.as_secs_f64());
part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len });
},
)?;
```
### index.rs build_layers (before: naive into_par_iter)
```rust
let order: Vec<usize> = (0..n).collect();
let runner = PartitionRunner::new();
runner.run(
&order,
|i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits)
.map_err(OKIError::Partition),
|_, n_kmers, _| {
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
pb.inc(1);
},
)?;
```
All other sites (`pack_matrices`, `dump`, `select`, etc.) follow the same
pattern.
## Placement
`PartitionRunner` lives in `obikindex/src/numa.rs` alongside `NumaSetup`.
It depends only on standard library primitives and Rayon — no new dependencies.
A single `PartitionRunner` instance can be built once per command invocation
and reused across multiple `run()` calls (e.g. `merge` runs
`merge_partitions` then `pack_matrices`).
## Open questions
- **Error handling**: `run` currently returns the first error; remaining errors
are dropped. A `Vec<E>` return would give complete diagnostics.
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from
a higher value. A per-call override could be added to the API.
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
boundaries); `Sync` is not (only one thread holds the lock at a time).
Contention is negligible because a partition takes seconds while the callback
takes microseconds. The callback is therefore simple to write (plain
`Vec::push`, plain `FnMut`) with no measurable performance cost.
+1
View File
@@ -57,6 +57,7 @@ nav:
- Sequences: architecture/sequences/invariant.md
- Kmer index: architecture/index_architecture.md
- NUMA-aware worker pools: architecture/numa_worker_pools.md
- NUMA-aware partition runner: architecture/numa_partition_runner.md
watch:
- docmd
+14 -192
View File
@@ -2,10 +2,8 @@ use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::Path;
use std::time::{Duration, Instant};
use crossbeam_channel::unbounded;
use obisys::{CpuSample, Reporter, Stage, progress_bar, spinner};
use obisys::{Reporter, Stage, progress_bar, spinner};
use tracing::{debug, info};
use obilayeredmap::IndexMode;
@@ -26,24 +24,6 @@ struct PartStat {
g_len: usize,
}
// ── adaptive spawn criterion ──────────────────────────────────────────────────
// First worker: spawn if efficiency < SPAWN_THRESHOLD (CPU is underutilised).
// Subsequent workers: spawn only if the last spawn raised efficiency by at
// least the expected marginal gain (1/n_workers), with a minimum floor of 3%
// to avoid spurious spawns when efficiency fluctuates around the threshold.
const SPAWN_THRESHOLD: f64 = 0.95;
const MIN_MARGINAL_GAIN: f64 = 0.03;
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
if n_workers == 1 {
eff < SPAWN_THRESHOLD
} else {
let gain = eff - eff_at_last_spawn;
let expected = 1.0 / n_workers as f64;
gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN)
}
}
// ── main merge entry point ────────────────────────────────────────────────────
impl KmerIndex {
@@ -241,191 +221,33 @@ impl KmerIndex {
let mut order: Vec<usize> = (0..n_partitions).collect();
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
// ── Adaptive worker pool ──────────────────────────────────────────
// Default (non-NUMA): start with 1 worker, grow adaptively up to
// n_cores/2 based on CPU efficiency.
//
// NUMA mode (Linux, multi-node): one pinned Rayon ThreadPool per
// NUMA node, workers_per_node workers per node, all pre-activated.
// No adaptive spawn: the optimal count is fixed by memory bandwidth.
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let max_workers = (n_cores / 2).max(1);
let _ = budget_fraction; // kept in signature for CLI compatibility
let numa = crate::numa::build();
// effective_max_workers: slots to pre-spawn.
// numa_all_active: whether to activate all slots immediately.
let (effective_max_workers, numa_all_active) = match &numa {
Some(ns) => (ns.pools.len() * ns.workers_per_node(), true),
None => (max_workers, false),
};
let (part_tx, part_rx) = unbounded::<usize>();
let (result_tx, result_rx) =
unbounded::<(usize, Result<usize, obiskio::SKError>, Duration)>();
// activate_tx: controller sends () to wake the next dormant worker.
// Dropping activate_tx closes the channel; dormant workers exit.
let (activate_tx, activate_rx) = unbounded::<()>();
for &i in &order {
part_tx.send(i).ok();
}
drop(part_tx);
let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
let mut n_workers = 0usize;
let mut cpu_sample = CpuSample::now();
// Efficiency measured just before each spawn, used to assess
// whether the previous worker delivered its expected marginal gain.
let mut efficiency_at_last_spawn = 0.0f64;
// Shadow as references so closures can capture them by copy.
let srcs = &srcs;
let evidence = &evidence;
if let Some(ns) = &numa {
debug!(
"NUMA mode: {} node(s) × {} worker(s)/node = {} total workers",
ns.pools.len(),
ns.workers_per_node(),
effective_max_workers,
);
}
let runner = crate::numa::PartitionRunner::new();
let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
std::thread::scope(|s| -> OKIResult<()> {
// Pre-spawn threads. In NUMA mode each thread is pinned to its
// node's CPUs and wraps merge_partition in pool.install() so
// that all Rayon calls use the node-local ThreadPool, and
// Linux first-touch places graph allocations in local DRAM.
for worker_idx in 0..effective_max_workers {
let prx = part_rx.clone();
let rtx = result_tx.clone();
let arx = activate_rx.clone();
// Per-worker NUMA config: (pool, cpus) for this slot.
let numa_config: Option<(std::sync::Arc<rayon::ThreadPool>, Vec<usize>)> =
numa.as_ref().map(|ns| {
let wpn = ns.workers_per_node();
let node = worker_idx / wpn;
(
std::sync::Arc::clone(&ns.pools[node]),
ns.cpus_per_node[node].clone(),
)
});
s.spawn(move || {
if let Some((_, ref cpus)) = numa_config {
crate::numa::pin_current_thread(cpus);
}
if arx.recv().is_ok() {
for i in &prx {
let t = Instant::now();
let r = if let Some((ref pool, _)) = numa_config {
pool.install(|| {
dst_partition.merge_partition(
i, srcs, mode, n_dst_genomes, block_bits, evidence,
)
})
} else {
dst_partition.merge_partition(
i, srcs, mode, n_dst_genomes, block_bits, evidence,
)
};
rtx.send((i, r, t.elapsed())).ok();
}
}
});
}
drop(result_tx);
if numa_all_active {
// NUMA: activate every worker immediately.
for _ in 0..effective_max_workers {
activate_tx.send(()).ok();
}
n_workers = effective_max_workers;
} else {
// Non-NUMA: activate first worker, grow adaptively.
activate_tx.send(()).ok();
n_workers = 1;
}
const SPAWN_POLL: Duration = Duration::from_secs(20);
let mut completed = 0usize;
while completed < n_partitions {
let result = result_rx.recv_timeout(SPAWN_POLL);
let (i, r, dur) = match result {
Ok(v) => v,
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
if !numa_all_active && n_workers < effective_max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
debug!(
"activated worker {} (poll) — efficiency {:.0}%",
n_workers + 1,
eff * 100.0,
);
efficiency_at_last_spawn = eff;
activate_tx.send(()).ok();
n_workers += 1;
cpu_sample = CpuSample::now();
}
}
continue;
}
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
return Err(OKIError::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"worker channel closed",
)));
}
};
let g_len = r.map_err(OKIError::Partition)?;
runner.run(
&order,
|i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence),
|i, g_len, dur| {
pb.inc(1);
debug!(
"partition {i}: done in {:.1}s — {} new kmers",
dur.as_secs_f64(),
g_len
);
part_stats.push(PartStat {
id: i,
unitig_bytes: partition_sizes[i],
g_len,
});
completed += 1;
if !numa_all_active && n_workers < effective_max_workers && completed < n_partitions {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
debug!(
"activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%",
n_workers + 1,
eff * 100.0,
(eff - efficiency_at_last_spawn) * 100.0,
);
efficiency_at_last_spawn = eff;
activate_tx.send(()).ok();
n_workers += 1;
cpu_sample = CpuSample::now();
}
}
}
// Dropping activate_tx signals dormant workers to exit cleanly
// (non-NUMA). In NUMA mode all workers were already activated so
// this drop is just cleanup.
drop(activate_tx);
Ok(())
})?;
);
part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len });
},
).map_err(OKIError::Partition)?;
pb.finish_and_clear();
// ── Diagnostic report ─────────────────────────────────────────────
print_merge_partition_report(&part_stats, n_workers, effective_max_workers);
print_merge_partition_report(&part_stats, runner.max_workers());
rep.push(t.stop());
}
@@ -447,7 +269,7 @@ impl KmerIndex {
// ── Diagnostic report ─────────────────────────────────────────────────────────
fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_workers: usize) {
fn print_merge_partition_report(stats: &[PartStat], max_workers: usize) {
let total_new: usize = stats.iter().map(|s| s.g_len).sum();
let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count();
@@ -461,7 +283,7 @@ fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_worker
" {} partition(s) processed, {} total new kmers",
non_empty, total_new,
);
info!(" workers spawned: {n_workers} / {max_workers} (max)",);
info!(" max workers: {max_workers}");
// Top 8 partitions by new-kmer count
let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect();
+254 -1
View File
@@ -10,12 +10,15 @@
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
// - any per-node pool fails to build
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crossbeam_channel::{RecvTimeoutError, unbounded};
use hwlocality::Topology;
use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet;
use hwlocality::object::types::ObjectType;
use obisys::CpuSample;
use tracing::debug;
// ── Public interface ──────────────────────────────────────────────────────────
@@ -100,3 +103,253 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
.build()
.ok()
}
// ── Adaptive spawn heuristic ──────────────────────────────────────────────────
//
// First worker: spawn if CPU efficiency is below SPAWN_THRESHOLD (machine is
// under-utilised). Subsequent workers: spawn only if the last worker raised
// efficiency by at least the expected marginal gain (1/n_workers), with a
// minimum floor to avoid spurious spawns from efficiency fluctuations.
const SPAWN_THRESHOLD: f64 = 0.95;
const MIN_MARGINAL_GAIN: f64 = 0.03;
const SPAWN_POLL: Duration = Duration::from_secs(20);
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
if n_workers == 1 {
eff < SPAWN_THRESHOLD
} else {
let gain = eff - eff_at_last_spawn;
let expected = 1.0 / n_workers as f64;
gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN)
}
}
// ── PartitionRunner ───────────────────────────────────────────────────────────
struct NodeConfig {
pool: Option<Arc<rayon::ThreadPool>>,
cpu_ids: Vec<usize>,
max_workers: usize,
}
/// Generic NUMA-aware runner for partition-level parallel work.
///
/// Encapsulates worker spawning, NUMA pinning, adaptive activation, and result
/// collection. UMA systems are handled as the degenerate case of a single node
/// with no pinning.
///
/// # Model
///
/// One controller thread per NUMA node (one total on UMA). Each controller
/// manages up to `max_workers` dormant workers that drain a shared work queue.
/// Workers are activated one at a time; a new worker is added when global CPU
/// efficiency justifies it. On NUMA all workers are activated immediately
/// (memory bandwidth, not CPU count, is the bottleneck).
pub struct PartitionRunner {
nodes: Vec<NodeConfig>,
n_cores: usize,
}
impl PartitionRunner {
/// Detect topology and build. Falls back to a single-node UMA runner on
/// macOS, single-socket machines, or hwloc failure.
/// Total number of pre-spawned worker slots across all nodes.
pub fn max_workers(&self) -> usize {
self.nodes.iter().map(|n| n.max_workers).sum()
}
pub fn new() -> Self {
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
match build() {
Some(ns) => {
let wpn = ns.workers_per_node();
debug!(
"PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node",
ns.pools.len(),
wpn,
);
let nodes = ns.pools
.into_iter()
.zip(ns.cpus_per_node)
.map(|(pool, cpu_ids)| NodeConfig {
pool: Some(pool),
cpu_ids,
max_workers: wpn,
})
.collect();
Self { nodes, n_cores }
}
None => {
let max_workers = (n_cores / 2).max(1);
debug!(
"PartitionRunner: UMA mode — adaptive up to {} worker(s)",
max_workers,
);
Self {
nodes: vec![NodeConfig {
pool: None,
cpu_ids: vec![],
max_workers,
}],
n_cores,
}
}
}
}
/// Run `f(i)` for every index in `order`.
///
/// `on_done(i, result, elapsed)` is called under an internal mutex as each
/// partition completes — suitable for progress bars, logging, and result
/// aggregation. No `Send` or `Sync` bound is required on the callback.
///
/// The work queue is shared across all NUMA nodes: any idle worker takes
/// the next available partition regardless of node, ensuring load balance.
///
/// Returns the first error produced by `f`, if any.
pub fn run<F, R, E, C>(
&self,
order: &[usize],
f: F,
on_done: C,
) -> Result<(), E>
where
F: Fn(usize) -> Result<R, E> + Send + Sync,
R: Send,
E: Send,
C: FnMut(usize, R, Duration) + Send,
{
let f = Arc::new(f);
let on_done = Arc::new(Mutex::new(on_done));
let first_err: Arc<Mutex<Option<E>>> = Arc::new(Mutex::new(None));
// Shared work queue — pre-loaded in caller-supplied order.
let (part_tx, part_rx) = unbounded::<usize>();
for &i in order {
part_tx.send(i).ok();
}
drop(part_tx);
let n_cores = self.n_cores;
std::thread::scope(|s| {
for node in &self.nodes {
let f = Arc::clone(&f);
let on_done = Arc::clone(&on_done);
let first_err = Arc::clone(&first_err);
let part_rx = part_rx.clone();
s.spawn(move || {
// Per-node result and activation channels.
let (result_tx, result_rx) =
unbounded::<(usize, Result<R, E>, Duration)>();
let (activate_tx, activate_rx) = unbounded::<()>();
std::thread::scope(|ws| {
// Pre-spawn workers (all dormant until activated).
for _ in 0..node.max_workers {
let prx = part_rx.clone();
let rtx = result_tx.clone();
let arx = activate_rx.clone();
let f = Arc::clone(&f);
let pool = node.pool.clone();
let cpu_ids = node.cpu_ids.clone();
ws.spawn(move || {
if !cpu_ids.is_empty() {
pin_current_thread(&cpu_ids);
}
if arx.recv().is_err() {
return; // never activated — exit cleanly
}
for i in &prx {
let t = Instant::now();
let r = match &pool {
Some(p) => p.install(|| f(i)),
None => f(i),
};
rtx.send((i, r, t.elapsed())).ok();
}
});
}
// Drop the controller's copy: result_rx disconnects
// once all worker copies are also dropped (workers done).
drop(result_tx);
// In NUMA mode activate all workers immediately;
// in UMA mode activate one and grow adaptively.
let numa_mode = node.pool.is_some();
let initial = if numa_mode { node.max_workers } else { 1 };
for _ in 0..initial {
activate_tx.send(()).ok();
}
let mut active_workers = initial;
let mut cpu_sample = CpuSample::now();
let mut eff_at_last_spawn = 0.0f64;
// Controller loop.
loop {
match result_rx.recv_timeout(SPAWN_POLL) {
Ok((i, r, dur)) => {
match r {
Ok(v) => {
on_done.lock().unwrap()(i, v, dur);
}
Err(e) => {
let mut g = first_err.lock().unwrap();
if g.is_none() { *g = Some(e); }
}
}
if !numa_mode && active_workers < node.max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
debug!(
"activated worker {} — efficiency {:.0}%",
active_workers + 1,
eff * 100.0,
);
activate_tx.send(()).ok();
active_workers += 1;
eff_at_last_spawn = eff;
cpu_sample = CpuSample::now();
}
}
}
Err(RecvTimeoutError::Timeout) => {
if !numa_mode && active_workers < node.max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
debug!(
"activated worker {} (poll) — efficiency {:.0}%",
active_workers + 1,
eff * 100.0,
);
activate_tx.send(()).ok();
active_workers += 1;
eff_at_last_spawn = eff;
cpu_sample = CpuSample::now();
}
}
}
Err(RecvTimeoutError::Disconnected) => break,
}
}
// Signal any dormant workers that were never activated
// to exit (UMA mode where max_workers was never reached).
drop(activate_tx);
}); // ws: waits for all workers of this node
});
}
}); // s: waits for all node controllers
let mut g = first_err.lock().unwrap();
match g.take() {
Some(e) => Err(e),
None => Ok(()),
}
}
}