2026-06-15 11:24:37 +02:00

# NUMA-aware partition runner

## Problem

All partition-level parallel loops in obikindex currently fall into two
categories:

**Naive Rayon** (used in `build_layers`, `pack_matrices`, `dump`, `select`,
`stats`, `rebuild`, `reindex`):

```rust
(0..n).into_par_iter().for_each(|i| work(i));
```

Threads come from the global Rayon pool with no NUMA awareness. On
multi-socket machines this produces cross-socket memory traffic and degrades
performance super-linearly (see [NUMA-aware worker pools](numa_worker_pools.md)).

**Ad-hoc adaptive pool** (used in `merge`):

A bespoke implementation with pre-spawned workers, channel-based dispatch, and
activation control. It handles NUMA correctly but is not reusable.

Both cases should be replaced by a single generic mechanism.

## Unified model

The key insight is that **UMA is just the NUMA case with a single node**. The
runner always works the same way: one controller thread per node, each
independently managing its own workers with the same adaptive logic. The only
difference between UMA and NUMA is the number of nodes and whether workers are
pinned.

```
NUMA (k nodes)                     UMA (1 node)

controller-0    controller-1  …    controller-0
     │               │                  │
workers[0]      workers[1]         workers[0]
 (pinned)        (pinned)         (global pool)
     └───────────────┴──────────────────┘
              shared work queue
```

On each node, the Rayon `ThreadPool` is pinned to that node's CPUs.
`pool.install()` ensures all internal Rayon calls (inside the work function)
use the node-local pool. Linux first-touch then places heap allocations in
local DRAM automatically.

On UMA the global Rayon pool is used directly, with no pinning and no overhead.
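
As a rough illustration of "UMA is the single-node case", the sketch below maps a detected topology to per-node plans. `NodePlan` and `plan_nodes` are hypothetical names invented for this example (a stand-in for `NodeConfig` without the Rayon pool), and topology detection itself is assumed to happen elsewhere.

```rust
/// Hypothetical stand-in for `NodeConfig`, without the Rayon pool.
#[derive(Debug, PartialEq)]
pub struct NodePlan {
    /// CPUs to pin workers to; empty means "no pinning" (UMA).
    pub cpu_ids: Vec<usize>,
    /// Upper bound on workers for this node.
    pub max_workers: usize,
}

/// Turn a detected topology into per-node plans.
/// `detected` is `None` when detection failed or is unsupported.
pub fn plan_nodes(detected: Option<Vec<Vec<usize>>>, available_cpus: usize) -> Vec<NodePlan> {
    match detected {
        // Two or more nodes: one pinned plan per node.
        Some(nodes) if nodes.len() > 1 => nodes
            .into_iter()
            .map(|cpu_ids| NodePlan { max_workers: cpu_ids.len().max(1), cpu_ids })
            .collect(),
        // UMA, or detection failure: a single unpinned node.
        _ => vec![NodePlan { cpu_ids: Vec::new(), max_workers: available_cpus.max(1) }],
    }
}
```

Everything downstream can then iterate over the returned vector without a separate UMA code path.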

## Adaptive mechanism

Each controller follows the same logic regardless of node count:

1. Pre-spawn `workers_per_node` dormant worker threads (blocked on `activate_rx`).
2. Activate the first worker immediately.
3. Loop on the result channel with a `SPAWN_POLL` timeout:
   - On result: call `on_done`; check whether to activate the next worker.
   - On timeout: same check.
   - Activation criterion: `should_spawn_worker(active, global_efficiency, prev_efficiency)`.
4. Drop `activate_tx` when done, so dormant workers exit cleanly.

**Global CPU efficiency** (`CpuSample`, reads `/proc/stat` on Linux) is used by
all controllers; no per-node measurement is needed. The signal is coarser than
per-node efficiency but correct in practice: if any node saturates memory
bandwidth, the global efficiency drops and all controllers stop activating
workers. Using a standard portable primitive avoids platform-specific CPU
accounting and keeps the implementation clean.
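
The four controller steps can be sketched for a single node with the standard library alone. This is an illustration, not the actual implementation: `run_node` is a hypothetical function, `std::sync::mpsc` stands in for the real channels (so the activation receiver is shared behind a `Mutex`), the `SPAWN_POLL` value is arbitrary, the `should_spawn` closure stands in for `should_spawn_worker`, and panic handling is omitted.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

/// Poll interval for the controller loop (value chosen for illustration only).
const SPAWN_POLL: Duration = Duration::from_millis(20);

/// Single-node controller sketch. Returns the per-index results and the
/// number of workers that were activated.
pub fn run_node<W, S>(n_items: usize, max_workers: usize, work: W, mut should_spawn: S) -> (Vec<u64>, usize)
where
    W: Fn(usize) -> u64 + Sync,
    S: FnMut(usize) -> bool,
{
    let max_workers = max_workers.max(1);
    let (activate_tx, activate_rx) = channel::<()>();
    let activate_rx = Mutex::new(activate_rx); // std receivers are single-consumer
    let (result_tx, result_rx) = channel::<(usize, u64)>();
    let next = AtomicUsize::new(0); // shared work queue: next index to claim
    let mut results = vec![0u64; n_items];
    let mut active = 0usize;

    thread::scope(|s| {
        // 1. Pre-spawn dormant workers, each blocked on the activation channel.
        for _ in 0..max_workers {
            let result_tx = result_tx.clone();
            let (activate_rx, next, work) = (&activate_rx, &next, &work);
            s.spawn(move || {
                let activated = activate_rx.lock().unwrap().recv().is_ok();
                if !activated {
                    return; // sender dropped: never activated, exit cleanly
                }
                loop {
                    let i = next.fetch_add(1, Ordering::Relaxed);
                    if i >= n_items || result_tx.send((i, work(i))).is_err() {
                        break;
                    }
                }
            });
        }
        // 2. Activate the first worker immediately.
        if activate_tx.send(()).is_ok() {
            active = 1;
        }
        // 3. Collect results; after each result or timeout, consider growing.
        let mut done = 0;
        while done < n_items {
            match result_rx.recv_timeout(SPAWN_POLL) {
                Ok((i, r)) => {
                    results[i] = r;
                    done += 1;
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => break,
            }
            if active < max_workers && should_spawn(active) && activate_tx.send(()).is_ok() {
                active += 1;
            }
        }
        // 4. Dropping the sender wakes every still-dormant worker with an error.
        drop(activate_tx);
    });
    (results, active)
}
```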

## Proposed API

```rust
use std::sync::Arc;
use std::time::Duration;

// Signatures only; method bodies are omitted in this proposal.

pub struct PartitionRunner {
    // One entry per NUMA node; one entry total on UMA.
    nodes: Vec<NodeConfig>,
}

struct NodeConfig {
    pool: Option<Arc<rayon::ThreadPool>>, // None = global Rayon pool (UMA)
    cpu_ids: Vec<usize>,                  // empty = no pinning (UMA)
    max_workers: usize,
}

impl PartitionRunner {
    /// Detect topology and build the runner.
    /// Returns a single-node runner on UMA / macOS / hwloc failure.
    pub fn new() -> Self;

    /// Run `f(i)` for every index in `order`, passing each result to `on_done`.
    ///
    /// `on_done(i, result, elapsed)` is called under an internal mutex as
    /// each partition completes; use it for progress bars and aggregation.
    /// The runner serialises all calls to `on_done` via an internal
    /// `Arc<Mutex<C>>`, so no `Sync` bound is required on the callback.
    /// `Send` is required because the Arc clone crosses thread boundaries.
    ///
    /// Serialisation is free in practice: a partition takes seconds to
    /// minutes; the callback takes microseconds. Contention is negligible.
    ///
    /// Returns the first error from `f`, if any.
    pub fn run<F, R, E, C>(
        &self,
        order: &[usize],
        f: F,
        on_done: C,
    ) -> Result<(), E>
    where
        F: Fn(usize) -> Result<R, E> + Send + Sync,
        R: Send,
        E: Send,
        C: FnMut(usize, R, Duration) + Send; // Send required, Sync is not
}
```
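
To see why `Send` without `Sync` is enough for the callback, here is a small standard-library sketch of the serialisation described in the doc comment. `run_serialised` is a hypothetical helper that spawns one scoped thread per index for brevity; the real runner uses pre-spawned workers.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Runs `f(i)` on one scoped thread per index and reports each result
/// through `on_done`, serialised by an internal `Arc<Mutex<C>>`.
pub fn run_serialised<F, R, C>(indices: &[usize], f: F, on_done: C)
where
    F: Fn(usize) -> R + Sync,
    R: Send,
    C: FnMut(usize, R, Duration) + Send, // no `Sync` bound
{
    let on_done = Arc::new(Mutex::new(on_done));
    let f = &f;
    thread::scope(|s| {
        for &i in indices {
            let on_done = Arc::clone(&on_done);
            s.spawn(move || {
                let start = Instant::now();
                let result = f(i);
                let elapsed = start.elapsed();
                // Only the thread holding the lock gets `&mut C`.
                let mut guard = on_done.lock().unwrap();
                let callback: &mut C = &mut *guard;
                callback(i, result, elapsed);
            });
        }
    });
}
```

Because the lock hands out exclusive access, the callback can mutate captured state such as a plain `Vec` without any atomics.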

`order` is caller-supplied so each command chooses its scheduling strategy:
largest-first for `merge`, sequential for `build_layers`, etc.
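
The two orderings mentioned above could be built as follows. Both helper names are invented for this sketch, and `sizes` is assumed to hold one size per partition (for example the on-disk bytes).

```rust
use std::cmp::Reverse;

/// Partition indices sorted by size, largest first.
/// The sort is stable, so equal sizes keep ascending index order.
pub fn largest_first(sizes: &[u64]) -> Vec<usize> {
    let mut order: Vec<usize> = (0..sizes.len()).collect();
    order.sort_by_key(|&i| Reverse(sizes[i]));
    order
}

/// Plain sequential order: 0, 1, ..., n - 1.
pub fn sequential(n: usize) -> Vec<usize> {
    (0..n).collect()
}
```

Scheduling the largest partitions first tends to avoid a long tail where one big partition finishes alone at the end of the run.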

## Migration examples

### merge.rs (before: ~180 lines of bespoke machinery)

```rust
let runner = PartitionRunner::new();
runner.run(
    &order,
    |i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence)
        .map_err(OKIError::Partition),
    |i, g_len, dur| {
        pb.inc(1);
        debug!("partition {i}: done in {:.1}s — {g_len} new kmers", dur.as_secs_f64());
        part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len });
    },
)?;
```

### index.rs build_layers (before: naive into_par_iter)

```rust
let order: Vec<usize> = (0..n).collect();
let runner = PartitionRunner::new();
runner.run(
    &order,
    |i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits)
        .map_err(OKIError::Partition),
    |_, n_kmers, _| {
        total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
        pb.inc(1);
    },
)?;
```

All other sites (`pack_matrices`, `dump`, `select`, etc.) follow the same
pattern.

## Placement

`PartitionRunner` lives in `obikindex/src/numa.rs` alongside `NumaSetup`.
It depends only on standard library primitives and Rayon, so it adds no new
dependencies.

A single `PartitionRunner` instance can be built once per command invocation
and reused across multiple `run()` calls (e.g. `merge` runs
`merge_partitions` then `pack_matrices`).
2026-07-02 10:05:31 +02:00

## Known issue: CPU-only activation signal stalls on I/O-bound stages

Observed on a real `filter` run (109 genomes, 256 partitions, 8×24-core NUMA):
`rebuild` (CPU-bound: k-mer construction) scales cleanly from 9 to 43 active
workers as `CpuSample::do_i_activate` (`obisys::lib.rs`) sees efficiency climb.
`pack_matrices` (I/O-bound: reopens and recomposes per-genome column files
into `.pbmx`/`.pcmx`) activates one extra worker then flatlines at 10/192 for
the rest of the stage, even though 256 partitions keep completing over several
minutes. This matches the documented intent (§ Adaptive mechanism: "avoids
over-provisioning ... I/O-bound ... workloads") but conflates two different
things: *"CPU is not the bottleneck"* and *"more workers would not help"*. On
storage with real queue depth (NVMe, RAID, parallel FS) the second stage could
still benefit from more concurrent workers even with flat CPU usage, a signal
the current mechanism cannot see.

A one-off artefact was also found in the same log: right after a stage
transition, `do_i_activate` produced a physically impossible spike (efficiency
~94 cores on a 192-core box) because it has no minimum-window guard, unlike
its sibling `cpu_efficiency`, which returns `0.0` if `wall < 0.1s`
(`obisys::lib.rs:260`). `do_i_activate` unconditionally overwrites
`self.wall`/`self.user_secs`/`self.sys_secs` even when the elapsed window is
too short to be meaningful, so a burst of rapid completions right after
activating a worker can divide a real CPU delta by a near-zero wall delta.

### Implemented: I/O signal + shared debounce guard

`IoSample` (`obisys::lib.rs`, alongside `CpuSample`) is fed by
`read_bytes`/`write_bytes` from `/proc/self/io` on Linux. These count the
actual bytes submitted to the block layer, unlike `rchar`/`wchar`, which also
count page-cache hits, and unlike `ru_inblock`/`ru_oublock`, which are
unreliable on macOS. On macOS the sampler falls back to
`proc_pid_rusage(RUSAGE_INFO_V4)`
(`ri_diskio_bytesread`/`ri_diskio_byteswritten`, FFI only via `libc`, no new
dependency, the same pattern as the existing `getrusage` bindings). Any other
target degrades gracefully to a signal that never triggers (falling back to
CPU-only activation), the same pattern as `cgroup_v2_available`.
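
As an illustration of the Linux side, the sketch below extracts the two counters from the text of `/proc/self/io`. The function names are hypothetical, and summing reads and writes into one counter is an assumption of this example, not something the design above specifies.

```rust
/// Look up one `name: value` field in the text of `/proc/self/io`.
fn field(text: &str, name: &str) -> Option<u64> {
    for line in text.lines() {
        if let Some((key, value)) = line.split_once(':') {
            // Exact key match, so `cancelled_write_bytes` is not mistaken
            // for `write_bytes`.
            if key.trim() == name {
                return value.trim().parse().ok();
            }
        }
    }
    None
}

/// Sum of `read_bytes` and `write_bytes` (assumed combination).
/// Returns `None` if either field is missing or malformed.
pub fn parse_block_io(text: &str) -> Option<u64> {
    Some(field(text, "read_bytes")? + field(text, "write_bytes")?)
}

/// Counter for the current process; `None` where `/proc/self/io` is unavailable.
pub fn read_block_io() -> Option<u64> {
    parse_block_io(&std::fs::read_to_string("/proc/self/io").ok()?)
}
```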

`maybe_activate` (`numa.rs`) activates a worker if *either* signal still shows
headroom, making `PartitionRunner` adapt to whichever resource is actually the
bottleneck without per-call configuration. Both samplers are called
unconditionally, with no `||` short-circuit, so neither window starves behind
whichever signal fires first:

```rust
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
if cpu_wants_more || io_wants_more {
    activation.grow(GROWTH_DIVISOR, n_total);
}
```
2026-07-03 12:47:56 +02:00

The CPU threshold is *not* the flat absolute delta it started as: it scales
with `activation.last_step()`, the number of workers activated in the last
growth step, tracked by `NodeActivation` (`numa.rs`) and updated every time
`grow()` actually grows something. Growing by 8 workers should add ~8 cores of
efficiency if the workload is truly CPU-bound; requiring only
`CPU_SPAWN_THRESHOLD` (20 %) of that expected gain confirms the growth was
useful without demanding perfect linear scaling. Scaling by the *last step's
size* rather than the cumulative total keeps the bar equally meaningful
whether it's the 2nd growth step or the 20th. A flat absolute threshold
(0.2 core) is a strong signal at 8 active workers but pure noise at 150; a
threshold scaled by the *cumulative* total instead (considered and rejected)
would have made the bar essentially impossible to clear late in the ramp,
strangling exactly the CPU-bound saturation the mechanism exists to allow.
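
The three candidate rules can be compared numerically. The helper functions below are hypothetical and exist only to make the arithmetic concrete; only `CPU_SPAWN_THRESHOLD` and its value come from the text.

```rust
const CPU_SPAWN_THRESHOLD: f64 = 0.2;

/// Flat absolute bar: the same number of cores at every step.
pub fn flat_threshold() -> f64 {
    CPU_SPAWN_THRESHOLD
}

/// Bar scaled by the size of the last growth step (the rule that was kept).
pub fn last_step_threshold(last_step: usize) -> f64 {
    CPU_SPAWN_THRESHOLD * last_step as f64
}

/// Bar scaled by the cumulative number of active workers (the rejected rule).
pub fn cumulative_threshold(active: usize) -> f64 {
    CPU_SPAWN_THRESHOLD * active as f64
}
```

With 150 active workers and a last step of 8, the flat bar asks for 0.2 cores of gain, the last-step bar for 1.6 cores, and the cumulative bar for 30 cores, which a step of 8 workers cannot deliver.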

Unlike the CPU signal (an absolute delta in cores, a bounded and portable unit),
raw I/O throughput has no natural scale across devices, so `IoSample` uses a
**relative** growth threshold instead of an absolute one:

```rust
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
    let elapsed = self.wall.elapsed().as_secs_f64();
    if elapsed < 0.1 { return false; } // state untouched: window keeps accumulating

    let n = Self::read_bytes();
    let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
    let activate = if self.previous_rate == 0.0 {
        rate > 0.0 // bootstrap: any measured throughput is signal
    } else {
        (rate - self.previous_rate) / self.previous_rate >= threshold
    };

    self.bytes = n;
    self.previous_rate = rate; // baseline for the next relative comparison
    self.wall = Instant::now(); // reset only on a real sample
    activate
}
```

The `elapsed < 0.1s → return false without mutating state` guard was also
back-ported into `CpuSample::do_i_activate`, where it was previously missing
(the source of the ~94-core artefact above). One fix covers both problems, and
it removes the need for any arbitrary I/O-rate floor: a short or noisy window
is rejected outright rather than papered over with a hardware-dependent
constant.
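
A minimal sketch of the same guard on the CPU side, with the clock and CPU counters passed in as arguments so the behaviour can be checked deterministically. `EfficiencyWindow` and its fields are hypothetical, and the activation rule (gain in cores at or above the threshold) is an assumption about what `CpuSample::do_i_activate` compares.

```rust
/// Minimum window length, matching the 0.1 s guard described above.
const MIN_WINDOW_SECS: f64 = 0.1;

/// Hypothetical, clock-free stand-in for `CpuSample`.
pub struct EfficiencyWindow {
    cpu_secs: f64,  // cumulative CPU seconds at the last real sample
    wall_secs: f64, // wall-clock seconds at the last real sample
    previous: f64,  // efficiency (in cores) measured by the last real sample
}

impl EfficiencyWindow {
    pub fn new() -> Self {
        Self { cpu_secs: 0.0, wall_secs: 0.0, previous: 0.0 }
    }

    /// `cpu_now` and `wall_now` are cumulative readings supplied by the caller.
    pub fn do_i_activate(&mut self, cpu_now: f64, wall_now: f64, threshold: f64) -> bool {
        let elapsed = wall_now - self.wall_secs;
        if elapsed < MIN_WINDOW_SECS {
            return false; // too short: leave state untouched, keep accumulating
        }
        let efficiency = (cpu_now - self.cpu_secs) / elapsed;
        let activate = efficiency - self.previous >= threshold;
        // Only a real sample moves the window forward.
        self.cpu_secs = cpu_now;
        self.wall_secs = wall_now;
        self.previous = efficiency;
        activate
    }
}
```

Without the guard, a call 10 ms after the previous sample with 0.5 s of accumulated CPU time would report 50 cores of efficiency; with it, that reading is folded into the next full window.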

Both spawn thresholds (`CPU_SPAWN_THRESHOLD` and `IO_SPAWN_THRESHOLD`,
module-level `const`s in `numa.rs`, both `0.2`) are starting points, not
derived values. `0.2` (20 % relative growth) for `IoSample` was chosen to match
the CPU threshold's *implicit* relative sensitivity (in the observed log, an
8→9 worker step raised efficiency by ~12 %). I/O throughput is lumpier than
CPU time, however (buffered writes flush in bursts), so the value needs
empirical validation against a real `pack` run before being considered final.

## Known issue: ramp-up too slow, and confused with node count

The original design started `n_nodes` workers (one per node) and grew one
worker at a time. On a real `filter` run this took ~10 minutes to climb from
9 to ~40 active workers even on the CPU-bound `rebuild` stage: most of a
35-minute stage spent under-provisioned while waiting for evidence to
accumulate one worker at a time. There is no scale-down mechanism (`n_active`
only grows), so the original caution was deliberate. But a quarter of
available cores is still far from saturation, and the real risk zone
(over-provisioning a memory-bandwidth-bound stage) only shows up much later in
the ramp, near full occupancy, not at 25 %.

The fix decouples ramp speed from node *count*: both the initial size and the
growth step are a fraction of `workers_per_node` (node *size*), applied
identically on every node. A single-NUMA-node (UMA) machine ramps exactly as
fast as an 8-node one. Growing by `n_nodes` per step, as first considered,
would have degenerated to "grow by 1" on UMA, reproducing the original
problem for exactly the machines that need the fix most.

```rust
// NodeActivation::grow: called both at startup (activate_initial) and on
// every CPU/IO-triggered growth step, with a different divisor each time.
let wanted = (self.caps[idx] / divisor).max(1); // INITIAL_DIVISOR=4 at startup, GROWTH_DIVISOR=8 per step
let room = self.caps[idx].saturating_sub(self.active[idx]);
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
```
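
To make the ramp concrete, here is a self-contained sketch that wraps the three lines above in a minimal `NodeActivation`. The struct layout, the per-node loop, and the choice to count `last_step` across all nodes are assumptions of this example; `divisor` must be non-zero.

```rust
const INITIAL_DIVISOR: usize = 4;
const GROWTH_DIVISOR: usize = 8;

/// Assumed shape of the activation tracker: one cap and one counter per node.
pub struct NodeActivation {
    caps: Vec<usize>,   // dormant workers pre-spawned on each node
    active: Vec<usize>, // workers already activated on each node
    total: usize,       // workers activated across all nodes
    last_step: usize,   // workers activated by the last step that grew
}

impl NodeActivation {
    pub fn new(caps: Vec<usize>) -> Self {
        let active = vec![0; caps.len()];
        Self { caps, active, total: 0, last_step: 0 }
    }

    /// Grow every node by `cap / divisor`, bounded by the node's remaining
    /// room and by the run-wide budget `n_total`. Returns the step size.
    pub fn grow(&mut self, divisor: usize, n_total: usize) -> usize {
        let mut step = 0;
        for idx in 0..self.caps.len() {
            let wanted = (self.caps[idx] / divisor).max(1);
            let room = self.caps[idx].saturating_sub(self.active[idx]);
            let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
            self.active[idx] += grow;
            self.total += grow;
            step += grow;
        }
        if step > 0 {
            self.last_step = step; // updated only when something actually grew
        }
        step
    }

    pub fn total(&self) -> usize {
        self.total
    }

    pub fn last_step(&self) -> usize {
        self.last_step
    }
}
```

On a 24-core node this starts at 6 workers and adds 3 per step, so a UMA machine reaches full occupancy in six growth steps, the same number an 8-node machine needs.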

This also fixed a latent correctness gap: the original single shared
`activate_tx`/`activate_rx` pair had *no* per-node addressing, so sending one
activation signal woke up whichever dormant worker (from any node) happened
to win the race on that channel. `crossbeam_channel` gives no fairness
guarantee across competing receivers, so "round-robin across nodes" was an
assumption the code never actually enforced. `PartitionRunner::run` now opens
one activation channel per node (`activate_txs`/`activate_rxs`, one pair per
`NodeConfig`); `NodeActivation` (`numa.rs`) tracks how many of each node's
dormant workers have been woken and grows every node by the same amount per
step, capped by that node's remaining dormant workers and by the run's total
budget (`n_total`). Balance across nodes is now guaranteed by construction,
not incidental to channel implementation details.

## Open questions

- **Error handling**: `run` currently returns the first error; remaining errors
  are dropped. A `Vec<E>` return would give complete diagnostics.

- **`INITIAL_DIVISOR` / `GROWTH_DIVISOR` tuning**: currently `4` and `8`
  (start at 1/4 of a node's cores, grow by 1/8 per step), chosen to fix an
  observed too-slow ramp. They are not yet validated against a real `pack`
  (I/O-bound) run, where over-provisioning risk is different from the
  CPU-bound `rebuild` case this was tuned against.

- **`on_done` ordering**: the runner serialises calls to `on_done` via an
  internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
  boundaries); `Sync` is not (only one thread holds the lock at a time).
  Contention is negligible because a partition takes seconds while the callback
  takes microseconds. The callback is therefore simple to write (plain
  `Vec::push`, plain `FnMut`) with no measurable performance cost.
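
For the error-handling question, one possible shape of the `Vec<E>` variant is sketched below. `run_collecting` is a hypothetical name, the loop is sequential for brevity, and pairing each error with its partition index is an addition of this example rather than part of the proposal.

```rust
/// Run `f` over `order`, keep going after failures, and return every error
/// together with the index of the partition that produced it.
pub fn run_collecting<F, R, E>(
    order: &[usize],
    f: F,
    mut on_done: impl FnMut(usize, R),
) -> Result<(), Vec<(usize, E)>>
where
    F: Fn(usize) -> Result<R, E>,
{
    let mut errors = Vec::new();
    for &i in order {
        match f(i) {
            Ok(result) => on_done(i, result),
            Err(e) => errors.push((i, e)),
        }
    }
    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
```

The trade-off is that the run no longer stops early, so a systematic failure is reported once per partition instead of once.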