obikmer/docmd/architecture/numa_partition_runner.md
Eric Coissac 67b4e4da53 refactor(numa): replace flat runner with per-node activation channels
Shifts the NUMA-aware runner from a flat, round-robin model to a per-node architecture using dedicated `NodeActivation` channels. Replaces absolute deltas with relative scaling based on the previous growth step's worker count, decoupling growth from node count to fix slow ramp-up and enforce per-node fairness. Updates architecture documentation to reflect these changes and focus tuning questions on `INITIAL`/`GROWTH_DIVISOR` parameters for I/O-bound validation.
2026-07-03 13:03:31 +02:00


NUMA-aware partition runner

Problem

All partition-level parallel loops in obikindex currently fall into two categories:

Naive Rayon — used in build_layers, pack_matrices, dump, select, stats, rebuild, reindex:

(0..n).into_par_iter().for_each(|i| work(i));

Threads come from the global Rayon pool with no NUMA awareness. On multi-socket machines this produces cross-socket memory traffic and degrades performance super-linearly (see NUMA-aware worker pools).

Ad-hoc adaptive pool — used in merge:

A bespoke implementation with pre-spawned workers, channel-based dispatch, and activation control. It handles NUMA correctly but is not reusable.

Both cases should be replaced by a single generic mechanism.

Unified model

The key insight is that UMA is just the NUMA case with a single node. The runner always works the same way: one controller thread per node, each independently managing its own workers with the same adaptive logic. The only difference between UMA and NUMA is the number of nodes and whether workers are pinned.

NUMA (k nodes)                    UMA (1 node)

controller-0  controller-1  …     controller-0
    │               │                  │
workers[0]     workers[1]         workers[0]
(pinned)       (pinned)           (global pool)
    └───────────────┴──────────────────┘
              shared work queue

On each node, the Rayon ThreadPool is pinned to that node's CPUs. pool.install() ensures all internal Rayon calls (inside the work function) use the node-local pool. Linux first-touch then places heap allocations in local DRAM automatically.

On UMA the global Rayon pool is used directly — no pinning, no overhead.

Adaptive mechanism

Each controller follows the same logic regardless of node count:

  1. Pre-spawn workers_per_node dormant worker threads (blocked on activate_rx).
  2. Activate the first worker immediately.
  3. Loop on result channel with a SPAWN_POLL timeout:
    • On result: call on_done; check whether to activate the next worker.
    • On timeout: same check.
    • Activation criterion: should_spawn_worker(active, global_efficiency, prev_efficiency).
  4. Drop activate_tx when done — dormant workers exit cleanly.
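The four steps above can be sketched with standard-library channels only. This is a simplified illustration, not the runner's code: `run_controller`, the `should_spawn` closure (standing in for the real activation criterion), the `u64` result type and the 20 ms `SPAWN_POLL` value are all assumptions made for the example, and the single `mpsc` receiver is shared behind a `Mutex` because `std` receivers are single-consumer.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Placeholder poll interval; the real `SPAWN_POLL` value is not stated here.
const SPAWN_POLL: Duration = Duration::from_millis(20);

/// Runs `work(i)` for `i in 0..n_items` on up to `max_workers` pre-spawned
/// threads. `should_spawn(active)` stands in for the real activation criterion.
/// Returns the results in completion order and the number of workers activated.
pub fn run_controller<W, P>(
    n_items: usize,
    max_workers: usize,
    work: W,
    mut should_spawn: P,
) -> (Vec<(usize, u64)>, usize)
where
    W: Fn(usize) -> u64 + Send + Sync + 'static,
    P: FnMut(usize) -> bool,
{
    let next = Arc::new(AtomicUsize::new(0)); // shared work queue: next index to claim
    let work = Arc::new(work);
    let (activate_tx, activate_rx) = mpsc::channel::<()>();
    let activate_rx = Arc::new(Mutex::new(activate_rx)); // std receivers are single-consumer
    let (result_tx, result_rx) = mpsc::channel::<(usize, u64)>();

    // 1. Pre-spawn dormant workers, each blocked until an activation arrives.
    let handles: Vec<_> = (0..max_workers)
        .map(|_| {
            let (next, work) = (Arc::clone(&next), Arc::clone(&work));
            let (rx, tx) = (Arc::clone(&activate_rx), result_tx.clone());
            thread::spawn(move || {
                // The lock guard is released at the end of this statement.
                let activated = rx.lock().unwrap().recv().is_ok();
                if !activated {
                    return; // activate_tx was dropped: exit without working
                }
                loop {
                    let i = next.fetch_add(1, Ordering::Relaxed);
                    if i >= n_items || tx.send((i, (*work)(i))).is_err() {
                        break;
                    }
                }
            })
        })
        .collect();
    drop(result_tx); // only workers hold result senders from here on

    // 2. Activate the first worker immediately.
    let mut active = 0;
    if max_workers > 0 {
        activate_tx.send(()).unwrap();
        active = 1;
    }

    // 3. Collect results; re-check the activation criterion on result or timeout.
    let mut results = Vec::with_capacity(n_items);
    while results.len() < n_items {
        match result_rx.recv_timeout(SPAWN_POLL) {
            Ok(r) => results.push(r),
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => break, // no worker left
        }
        if active < max_workers && should_spawn(active) {
            activate_tx.send(()).unwrap();
            active += 1;
        }
    }

    // 4. Dropping the sender wakes the remaining dormant workers, which exit.
    drop(activate_tx);
    for h in handles {
        h.join().unwrap();
    }
    (results, active)
}
```

With a criterion that never fires, exactly one worker runs and results arrive in index order; with one that always fires, workers are woken one per loop iteration until `max_workers` is reached.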

Global CPU efficiency (CpuSample, reads /proc/stat on Linux) is used by all controllers — no per-node measurement needed. The signal is coarser than per-node efficiency but correct in practice: if any node saturates memory bandwidth, the global efficiency drops and all controllers stop activating workers. Using a standard portable primitive avoids platform-specific CPU accounting and keeps the implementation clean.

Proposed API

pub struct PartitionRunner {
    // One entry per NUMA node; one entry total on UMA.
    nodes: Vec<NodeConfig>,
}

struct NodeConfig {
    pool:       Option<Arc<rayon::ThreadPool>>,  // None = global Rayon pool (UMA)
    cpu_ids:    Vec<usize>,                      // empty = no pinning (UMA)
    max_workers: usize,
}

impl PartitionRunner {
    /// Detect topology and build the runner.
    /// Returns a single-node runner on UMA / macOS / hwloc failure.
    pub fn new() -> Self;

    /// Run `f(i)` for every index in `order`, collecting results.
    ///
    /// `on_done(i, result, elapsed)` is called under an internal mutex as
    /// each partition completes — use it for progress bars and aggregation.
    /// The runner serialises all calls to `on_done` via an internal
    /// `Arc<Mutex<C>>`, so no `Sync` bound is required on the callback.
    /// `Send` is required because the Arc clone crosses thread boundaries.
    ///
    /// Serialisation is free in practice: a partition takes seconds to
    /// minutes; the callback takes microseconds.  Contention is negligible.
    ///
    /// Returns the first error from `f`, if any.
    pub fn run<F, R, E, C>(
        &self,
        order:   &[usize],
        f:       F,
        on_done: C,
    ) -> Result<(), E>
    where
        F: Fn(usize) -> Result<R, E> + Send + Sync,
        R: Send,
        E: Send,
        C: FnMut(usize, R, Duration) + Send;   // Send required, Sync is not
}
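Why `Send` suffices for the callback can be shown in isolation. The sketch below is an assumption-laden simplification: `run_serialised` is a hypothetical name, it uses scoped threads with a borrowed `Mutex<C>` instead of the `Arc<Mutex<C>>` described above, it stripes `order` statically across threads rather than activating workers adaptively, and it omits the error type.

```rust
use std::sync::Mutex;
use std::thread;
use std::time::{Duration, Instant};

/// Calls `f(i)` for every index in `order` on `n_threads` scoped threads and
/// funnels each result through `on_done`, one call at a time.
pub fn run_serialised<F, R, C>(order: &[usize], n_threads: usize, f: F, on_done: C)
where
    F: Fn(usize) -> R + Sync,
    R: Send,
    C: FnMut(usize, R, Duration) + Send, // no Sync bound on the callback
{
    // Mutex<C> is Sync as soon as C is Send, so threads can share a reference.
    let on_done = Mutex::new(on_done);
    let n_threads = n_threads.max(1);
    thread::scope(|s| {
        for t in 0..n_threads {
            let (f, on_done) = (&f, &on_done);
            s.spawn(move || {
                // Static striping: thread t takes entries t, t + n, t + 2n, ...
                for &i in order.iter().skip(t).step_by(n_threads) {
                    let start = Instant::now();
                    let r = f(i);
                    let elapsed = start.elapsed();
                    // Only the thread holding the lock runs the callback.
                    let mut cb = on_done.lock().unwrap();
                    (&mut *cb)(i, r, elapsed);
                }
            });
        }
    });
}
```

The callback can then mutate plain local state, for example `|i, r, _| seen.push((i, r))` on an ordinary `Vec`, without any atomics or locking on the caller's side.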

order is caller-supplied so each command chooses its scheduling strategy: largest-first for merge, sequential for build_layers, etc.
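As an illustration of a caller-side scheduling strategy, a largest-first order can be derived from per-partition sizes. `largest_first` is a hypothetical helper, and the tie-break on index is an assumption added to make the order deterministic.

```rust
use std::cmp::Reverse;

/// Returns partition indices sorted by descending size; ties keep index order.
pub fn largest_first(sizes: &[u64]) -> Vec<usize> {
    let mut order: Vec<usize> = (0..sizes.len()).collect();
    order.sort_by_key(|&i| (Reverse(sizes[i]), i));
    order
}
```

Scheduling the largest partitions first keeps a long-running partition from starting last and leaving the other workers idle at the end of the stage.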

Migration examples

merge.rs (before: ~180 lines of bespoke machinery)

let runner = PartitionRunner::new();
runner.run(
    &order,
    |i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence)
            .map_err(OKIError::Partition),
    |i, g_len, dur| {
        pb.inc(1);
        debug!("partition {i}: done in {:.1}s — {g_len} new kmers", dur.as_secs_f64());
        part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len });
    },
)?;

index.rs build_layers (before: naive into_par_iter)

let order: Vec<usize> = (0..n).collect();
let runner = PartitionRunner::new();
runner.run(
    &order,
    |i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits)
            .map_err(OKIError::Partition),
    |_, n_kmers, _| {
        total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
        pb.inc(1);
    },
)?;

All other sites (pack_matrices, dump, select, etc.) follow the same pattern.

Placement

PartitionRunner lives in obikindex/src/numa.rs alongside NumaSetup. It depends only on standard library primitives and Rayon — no new dependencies.

A single PartitionRunner instance can be built once per command invocation and reused across multiple run() calls (e.g. merge runs merge_partitions then pack_matrices).

Known issue: CPU-only activation signal stalls on I/O-bound stages

Observed on a real filter run (109 genomes, 256 partitions, 8×24-core NUMA): rebuild (CPU-bound: k-mer construction) scales cleanly from 9 to 43 active workers as CpuSample::do_i_activate (obisys::lib.rs) sees efficiency climb. pack_matrices (I/O-bound: reopens and recomposes per-genome column files into .pbmx/.pcmx) activates one extra worker, then flatlines at 10/192 for the rest of the stage, even though 256 partitions keep completing over several minutes. This matches the documented intent (§ Adaptive mechanism: "avoids over-provisioning ... I/O-bound ... workloads") but conflates two different statements: "CPU is not the bottleneck" and "more workers would not help". On storage with real queue depth (NVMe, RAID, parallel FS), pack_matrices could still benefit from more concurrent workers even with flat CPU usage, and a CPU-only signal cannot see that.

A one-off artefact was also found in the same log: right after a stage transition, do_i_activate produced a physically impossible spike (efficiency ~94 cores on a 192-core box) because it has no minimum-window guard — unlike its sibling cpu_efficiency, which returns 0.0 if wall < 0.1s (obisys::lib.rs:260). do_i_activate unconditionally overwrites self.wall/self.user_secs/self.sys_secs even when the elapsed window is too short to be meaningful, so a burst of rapid completions right after activating a worker can divide a real CPU delta by a near-zero wall delta.

Implemented: I/O signal + shared debounce guard

IoSample (obisys::lib.rs, alongside CpuSample) reads read_bytes/write_bytes from /proc/self/io on Linux. These count the bytes actually submitted to the block layer, unlike rchar/wchar (which also count page-cache hits) and ru_inblock/ru_oublock (unreliable on macOS). On macOS it falls back to proc_pid_rusage(RUSAGE_INFO_V4) and reads ri_diskio_bytesread/ri_diskio_byteswritten, through FFI via libc only, with no new dependency (the same pattern as the existing getrusage bindings). On any other target the signal degrades gracefully to one that never triggers, so activation falls back to CPU-only, following the same pattern as cgroup_v2_available.

maybe_activate (numa.rs) activates a worker if either signal still shows headroom, making PartitionRunner adapt to whichever resource is actually the bottleneck without per-call configuration. Both samplers are called unconditionally, and || is applied only to the two already-computed booleans, so neither sampling window starves behind whichever signal fires first:

let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
let io_wants_more  = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
if cpu_wants_more || io_wants_more {
    activation.grow(GROWTH_DIVISOR, n_total);
}
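The difference between the two evaluation orders can be made visible with a counting stub. `CountingSampler` and both helper functions are hypothetical names introduced for this sketch; they only model the fact that each `do_i_activate` call has a side effect on the sampler's window.

```rust
/// Stub sampler that counts how often it is polled.
pub struct CountingSampler {
    pub calls: usize,
    answer: bool,
}

impl CountingSampler {
    pub fn new(answer: bool) -> Self {
        Self { calls: 0, answer }
    }

    /// Stands in for `do_i_activate`: every call consumes a sampling window.
    pub fn sample(&mut self) -> bool {
        self.calls += 1;
        self.answer
    }
}

/// Short-circuit form: `io` is never polled while `cpu` keeps answering true.
pub fn wants_more_short_circuit(cpu: &mut CountingSampler, io: &mut CountingSampler) -> bool {
    cpu.sample() || io.sample()
}

/// Form used above: both samplers are polled, then the results are combined.
pub fn wants_more_both(cpu: &mut CountingSampler, io: &mut CountingSampler) -> bool {
    let cpu_wants_more = cpu.sample();
    let io_wants_more = io.sample();
    cpu_wants_more || io_wants_more
}
```

In the short-circuit form the I/O sampler's baseline would go stale for as long as the CPU signal fires, so its first real sample would cover an arbitrarily long window.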

The CPU threshold is no longer the flat absolute delta it started as. It scales with activation.last_step(), the number of workers activated in the last growth step, which NodeActivation (numa.rs) tracks and updates every time grow() actually grows something. Growing by 8 workers should add about 8 cores of efficiency if the workload is truly CPU-bound; requiring only CPU_SPAWN_THRESHOLD (20 %) of that expected gain confirms the growth was useful without demanding perfect linear scaling. Scaling by the last step's size rather than by the cumulative total keeps the bar equally meaningful at the 2nd growth step and at the 20th. A flat absolute threshold (0.2 core) is a strong signal at 8 active workers but pure noise at 150. A threshold scaled by the cumulative total (considered and rejected) would have made the bar essentially impossible to clear late in the ramp, strangling exactly the CPU-bound saturation the mechanism exists to allow.
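The arithmetic behind the two scaled variants can be written out as a small sketch. The function names are hypothetical, and `clearable` encodes the simplifying assumption that a step of n workers can add at most n cores of efficiency.

```rust
/// Same value as the constant in numa.rs (0.2).
pub const CPU_SPAWN_THRESHOLD: f64 = 0.2;

/// Adopted rule: required gain is a fraction of the last growth step.
pub fn step_scaled_threshold(last_step: usize) -> f64 {
    CPU_SPAWN_THRESHOLD * last_step as f64
}

/// Rejected rule: required gain is a fraction of all active workers.
pub fn cumulative_scaled_threshold(active_total: usize) -> f64 {
    CPU_SPAWN_THRESHOLD * active_total as f64
}

/// Assumption: `last_step` new workers add at most `last_step` cores.
pub fn clearable(threshold: f64, last_step: usize) -> bool {
    threshold <= last_step as f64
}
```

For a step of 8 workers the adopted rule asks for about 1.6 cores of gain. With 150 active workers the rejected rule would ask for about 30 cores from those same 8 workers, which they cannot deliver.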

Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit), raw I/O throughput has no natural scale across devices, so IoSample uses a relative growth threshold instead of an absolute one:

pub fn do_i_activate(&mut self, threshold: f64) -> bool {
    let elapsed = self.wall.elapsed().as_secs_f64();
    if elapsed < 0.1 { return false; }        // state untouched — window keeps accumulating

    let n = Self::read_bytes();
    let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
    let activate = if self.previous_rate == 0.0 {
        rate > 0.0                            // bootstrap: any measured throughput is signal
    } else {
        (rate - self.previous_rate) / self.previous_rate >= threshold
    };

    self.bytes = n;
    self.wall  = Instant::now();              // reset only on a real sample
    self.previous_rate = rate;                // baseline for the next relative comparison
    activate
}

The guard (elapsed < 0.1 s returns false without mutating state) was also back-ported into CpuSample::do_i_activate, where it was previously missing and caused the ~94-core artefact above. One fix covers both problems, and it removes the need for any arbitrary I/O-rate floor: a short or noisy window is rejected outright rather than papered over with a hardware-dependent constant.
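The guard and the relative threshold can be exercised deterministically with a clock-free variant. `RateGate` is a hypothetical stand-in, not the real sampler: the caller passes the elapsed time and the cumulative counter explicitly, and the sketch assumes the previous rate is refreshed on every accepted sample.

```rust
/// Minimum window, matching the 0.1 s guard described above.
const MIN_WINDOW_SECS: f64 = 0.1;

/// Clock-free stand-in for the sampler logic, for illustration and tests.
pub struct RateGate {
    last_total: u64,
    window_secs: f64,
    previous_rate: f64,
}

impl RateGate {
    pub fn new(initial_total: u64) -> Self {
        Self { last_total: initial_total, window_secs: 0.0, previous_rate: 0.0 }
    }

    /// `dt_secs` is the time since the previous call, `total` the cumulative
    /// counter (bytes here). Returns true when throughput grew by `threshold`.
    pub fn sample(&mut self, dt_secs: f64, total: u64, threshold: f64) -> bool {
        self.window_secs += dt_secs;
        if self.window_secs < MIN_WINDOW_SECS {
            return false; // too short: baseline untouched, window keeps growing
        }
        let rate = total.saturating_sub(self.last_total) as f64 / self.window_secs;
        let activate = if self.previous_rate == 0.0 {
            rate > 0.0 // bootstrap: any measured throughput is signal
        } else {
            (rate - self.previous_rate) / self.previous_rate >= threshold
        };
        // Reset only on an accepted sample.
        self.last_total = total;
        self.window_secs = 0.0;
        self.previous_rate = rate;
        activate
    }
}
```

A call that arrives 0.0625 s after the last one is rejected and its time is folded into the next window, so a burst of rapid completions can no longer divide a real delta by a near-zero duration.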

Both spawn thresholds (CPU_SPAWN_THRESHOLD and IO_SPAWN_THRESHOLD, module-level consts in numa.rs, both 0.2) are starting points, not derived values. The 0.2 (20 % relative growth) for IoSample was chosen to match the CPU threshold's implicit relative sensitivity (in the observed log, an 8→9 worker step raised efficiency by ~12 %). I/O throughput is lumpier than CPU time, however, because buffered writes flush in bursts, so the value needs empirical validation against a real pack run before being considered final.

Known issue: ramp-up too slow, and confused with node count

The original design started n_nodes workers (one per node) and grew one worker at a time. On a real filter run this took ~10 minutes to climb from 9 to ~40 active workers even on the CPU-bound rebuild stage — most of a 35-minute stage spent under-provisioned while waiting for evidence to accumulate one worker at a time. There is no scale-down mechanism (n_active only grows), so the original caution was deliberate — but a quarter of available cores is still far from saturation, and the real risk zone (over-provisioning a memory-bandwidth-bound stage) only shows up much later in the ramp, near full occupancy — not at 25 %.

The fix decouples ramp speed from node count: both the initial size and the growth step are a fraction of workers_per_node (node size), applied identically on every node. A single-NUMA-node (UMA) machine ramps exactly as fast as an 8-node one — growing by n_nodes per step, as first considered, would have degenerated to "grow by 1" on UMA, reproducing the original problem for exactly the machines that need the fix most.
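The effect on ramp length can be estimated by counting growth steps per node. `steps_to_fill` is a hypothetical helper that assumes every growth check succeeds; it counts steps only and says nothing about how long each step takes.

```rust
/// Number of growth steps needed to go from `initial` active workers to `cap`,
/// adding `step` workers per step (both clamped to at least 1).
pub fn steps_to_fill(cap: usize, initial: usize, step: usize) -> usize {
    let step = step.max(1);
    let mut active = initial.max(1).min(cap);
    let mut steps = 0;
    while active < cap {
        active = (active + step).min(cap);
        steps += 1;
    }
    steps
}
```

For a 24-core node, starting at 1 and growing by 1 needs 23 steps, while starting at 24/4 = 6 and growing by 24/8 = 3 needs 6. A single-node machine with 32 cores gets the same 6 steps (start at 8, grow by 4) instead of 31.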

// NodeActivation::grow — called both at startup (activate_initial) and on
// every CPU/IO-triggered growth step, with a different divisor each time.
let wanted = (self.caps[idx] / divisor).max(1);   // INITIAL_DIVISOR=4 at startup, GROWTH_DIVISOR=8 per step
let room   = self.caps[idx].saturating_sub(self.active[idx]);
let grow   = wanted.min(room).min(n_total.saturating_sub(self.total));

This also fixed a latent correctness gap. The original single shared activate_tx/activate_rx pair had no per-node addressing: sending one activation signal woke whichever dormant worker (from any node) happened to win the race on that channel. crossbeam_channel gives no fairness guarantee across competing receivers, so "round-robin across nodes" was an assumption the code never actually enforced. PartitionRunner::run now opens one activation channel per node (activate_txs/activate_rxs, one pair per NodeConfig). NodeActivation (numa.rs) tracks how many of each node's dormant workers have been woken and grows every node by the same amount per step, capped by that node's remaining dormant workers and by the run's total budget (n_total). Balance across nodes is now guaranteed by construction rather than left to channel implementation details.
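The per-node bookkeeping can be sketched around the three lines of `grow` quoted earlier. Everything beyond those three lines is an assumption: the field layout, the returned per-node counts (which a caller would turn into that many sends on each node's activation channel), and the choice to define `last_step` as the sum over all nodes.

```rust
/// Hypothetical reconstruction of the activation bookkeeping.
pub struct NodeActivation {
    caps: Vec<usize>,   // dormant workers pre-spawned on each node
    active: Vec<usize>, // workers already woken on each node
    total: usize,       // workers woken across all nodes
    last_step: usize,   // size of the last non-empty growth step (assumed: summed)
}

impl NodeActivation {
    pub fn new(caps: Vec<usize>) -> Self {
        let active = vec![0; caps.len()];
        Self { caps, active, total: 0, last_step: 0 }
    }

    pub fn last_step(&self) -> usize {
        self.last_step
    }

    /// Grows every node by `cap / divisor` (at least 1), limited by the node's
    /// remaining dormant workers and by the run-wide budget `n_total`.
    /// Returns how many workers to wake on each node.
    pub fn grow(&mut self, divisor: usize, n_total: usize) -> Vec<usize> {
        let mut woken = vec![0; self.caps.len()];
        for idx in 0..self.caps.len() {
            let wanted = (self.caps[idx] / divisor.max(1)).max(1);
            let room = self.caps[idx].saturating_sub(self.active[idx]);
            let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
            self.active[idx] += grow;
            self.total += grow;
            woken[idx] = grow;
        }
        let step: usize = woken.iter().sum();
        if step > 0 {
            self.last_step = step; // unchanged when nothing could grow
        }
        woken
    }
}
```

On an 8×24 machine with 256 partitions, the first call with divisor 4 wakes 6 workers per node (48 in total) and each later call with divisor 8 wakes 3 per node (24 in total). When the budget runs out mid-step, as in the two-node case with `n_total = 7`, earlier nodes are served first in this sketch.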

Open questions

  • Error handling: run currently returns the first error; remaining errors are dropped. A Vec<E> return would give complete diagnostics.

  • INITIAL_DIVISOR / GROWTH_DIVISOR tuning: currently 4 and 8 (start at 1/4 of a node's cores, grow by 1/8 per step), chosen to fix an observed too-slow ramp — not yet validated against a real pack (I/O-bound) run, where over-provisioning risk is different from the CPU-bound rebuild case this was tuned against.

  • on_done ordering: the runner serialises calls to on_done via an internal Arc<Mutex<C>>. Send is required (the Arc clone crosses thread boundaries); Sync is not (only one thread holds the lock at a time). Contention is negligible because a partition takes seconds while the callback takes microseconds. The callback is therefore simple to write (plain Vec::push, plain FnMut) with no measurable performance cost.
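For the error-handling question, one possible shape of a collect-all-errors variant is sketched below. It is sequential and purely illustrative: `run_all` is a hypothetical name, the returned errors are paired with their partition index (an addition not discussed above), and no decision on the real signature is implied.

```rust
/// Runs `f(i)` for every index in `order`, forwarding successes to `on_done`
/// and keeping every failure instead of stopping at the first one.
pub fn run_all<F, R, E, C>(
    order: &[usize],
    f: F,
    mut on_done: C,
) -> Result<(), Vec<(usize, E)>>
where
    F: Fn(usize) -> Result<R, E>,
    C: FnMut(usize, R),
{
    let mut errors = Vec::new();
    for &i in order {
        match f(i) {
            Ok(r) => on_done(i, r),
            Err(e) => errors.push((i, e)), // remember which partition failed
        }
    }
    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
```

The trade-off is that a failing run keeps processing the remaining partitions, which may be wasteful when the first error already invalidates the whole stage.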