From f84dd539bf846f5172304c5e82fd90466194d0c8 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Thu, 2 Jul 2026 10:05:31 +0200 Subject: [PATCH] feat(numa): introduce I/O sampling to prevent activation stalls Replaces the monolithic CPU scaling threshold with separate CPU and I/O spawn thresholds. Introduces an `IoSample` struct with platform-specific byte reading and a relative throughput growth heuristic. Adds a 0.1s wall-clock guard to `CpuSample` to suppress artificial efficiency spikes, and updates `maybe_activate` to trigger worker scaling when either resource indicates headroom. Bumps `obikmer` to v1.1.33 and updates architecture documentation. --- docmd/architecture/numa_partition_runner.md | 97 ++++++++++++++++++++- src/Cargo.lock | 2 +- src/obikindex/src/numa.rs | 51 +++++++---- src/obikmer/Cargo.toml | 2 +- src/obisys/src/lib.rs | 95 +++++++++++++++++++- 5 files changed, 225 insertions(+), 22 deletions(-) diff --git a/docmd/architecture/numa_partition_runner.md b/docmd/architecture/numa_partition_runner.md index d955e55..db24975 100644 --- a/docmd/architecture/numa_partition_runner.md +++ b/docmd/architecture/numa_partition_runner.md @@ -162,14 +162,107 @@ A single `PartitionRunner` instance can be built once per command invocation and reused across multiple `run()` calls (e.g. `merge` runs `merge_partitions` then `pack_matrices`). +## Known issue: CPU-only activation signal stalls on I/O-bound stages + +Observed on a real `filter` run (109 genomes, 256 partitions, 8×24-core NUMA): +`rebuild` (CPU-bound — k-mer construction) scales cleanly from 9 to 43 active +workers as `CpuSample::do_i_activate` (`obisys::lib.rs`) sees efficiency climb. +`pack_matrices` (I/O-bound — reopens and recomposes per-genome column files +into `.pbmx`/`.pcmx`) activates one extra worker then flatlines at 10/192 for +the rest of the stage, even though 256 partitions keep completing over several +minutes. This matches the documented intent (§ Adaptive mechanism — "avoids +over-provisioning ... I/O-bound ... workloads") but conflates two different +things: *"CPU is not the bottleneck"* and *"more workers would not help"*. On +storage with real queue depth (NVMe, RAID, parallel FS) the second stage could +still benefit from more concurrent workers even with flat CPU usage — a signal +the current mechanism cannot see. + +A one-off artefact was also found in the same log: right after a stage +transition, `do_i_activate` produced a physically impossible spike (efficiency +~94 cores on a 192-core box) because it has no minimum-window guard — unlike +its sibling `cpu_efficiency`, which returns `0.0` if `wall < 0.1s` +(`obisys::lib.rs:260`). `do_i_activate` unconditionally overwrites +`self.wall`/`self.user_secs`/`self.sys_secs` even when the elapsed window is +too short to be meaningful, so a burst of rapid completions right after +activating a worker can divide a real CPU delta by a near-zero wall delta. + +### Implemented: I/O signal + shared debounce guard + +`IoSample` (`obisys::lib.rs`, alongside `CpuSample`) is fed by +`read_bytes`/`write_bytes` from `/proc/self/io` on Linux (actual bytes +submitted to the block layer — not `rchar`/`wchar`, which also count +page-cache hits, and not `ru_inblock`/`ru_oublock`, unreliable on macOS), with +a `proc_pid_rusage(RUSAGE_INFO_V4)` fallback on macOS +(`ri_diskio_bytesread`/`ri_diskio_byteswritten`, FFI only via `libc`, no new +dependency — same pattern as the existing `getrusage` bindings). Any other +target degrades gracefully to a signal that never triggers (falls back to +CPU-only activation), same pattern as `cgroup_v2_available`. + +`maybe_activate` (`numa.rs`) activates a worker if *either* signal still shows +headroom, making `PartitionRunner` adapt to whichever resource is actually the +bottleneck without per-call configuration. Both samplers are called +unconditionally — no `||` short-circuit — so neither window starves behind +whichever signal fires first: + +```rust +let cpu_wants_more = cpu_sample.do_i_activate(CPU_SPAWN_THRESHOLD); +let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD); +if cpu_wants_more || io_wants_more { + activate_tx.send(()).ok(); + ... +} +``` + +Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit), +raw I/O throughput has no natural scale across devices, so `IoSample` uses a +**relative** growth threshold instead of an absolute one: + +```rust +pub fn do_i_activate(&mut self, threshold: f64) -> bool { + let elapsed = self.wall.elapsed().as_secs_f64(); + if elapsed < 0.1 { return false; } // state untouched — window keeps accumulating + + let n = Self::read_bytes(); + let rate = n.saturating_sub(self.bytes) as f64 / elapsed; + let activate = if self.previous_rate == 0.0 { + rate > 0.0 // bootstrap: any measured throughput is signal + } else { + (rate - self.previous_rate) / self.previous_rate >= threshold + }; + + self.bytes = n; + self.wall = Instant::now(); // reset only on a real sample + activate +} +``` + +The `elapsed < 0.1s → return false without mutating state` guard was also +back-ported into `CpuSample::do_i_activate` (previously missing — source of +the ~94-core artefact above) — one fix for both problems, and it removes the +need for any arbitrary I/O-rate floor: a short/noisy window is rejected +outright rather than papered over with a hardware-dependent constant. + +Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, both `0.2`) +are defined as `const` in `PartitionRunner::run` (`numa.rs`). The I/O value is +a starting point, not a derived one — needs empirical validation against a +real `pack` run. + +Starting threshold: `0.2` (20 % relative growth) for `IoSample`, same order of +magnitude as the CPU threshold's *implicit* relative sensitivity (in the +observed log, an 8→9 worker step raised efficiency by ~12 %). This is a +starting point, not a derived value — I/O throughput is lumpier than CPU time +(buffered writes flush in bursts), so it needs empirical validation against a +real `pack` run before being considered final. + ## Open questions - **Error handling**: `run` currently returns the first error; remaining errors are dropped. A `Vec` return would give complete diagnostics. - **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated - for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from - a higher value. A per-call override could be added to the API. + for merge on BeeGFS. Superseded by the I/O signal above for the "more + workers would help despite flat CPU" case — a per-call override may still be + worth keeping as a manual escape hatch. - **`on_done` ordering**: the runner serialises calls to `on_done` via an internal `Arc>`. `Send` is required (the Arc clone crosses thread diff --git a/src/Cargo.lock b/src/Cargo.lock index c160e69..129707c 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1704,7 +1704,7 @@ dependencies = [ [[package]] name = "obikmer" -version = "1.1.32" +version = "1.1.33" dependencies = [ "clap", "csv", diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index 4fca05c..39f63c0 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -20,7 +20,7 @@ use hwlocality::cpu::binding::CpuBindingFlags; use hwlocality::cpu::cpuset::CpuSet; #[cfg(feature = "numa")] use hwlocality::object::types::ObjectType; -use obisys::CpuSample; +use obisys::{CpuSample, IoSample}; use tracing::debug; // ── Public interface ────────────────────────────────────────────────────────── @@ -190,10 +190,13 @@ impl PartitionRunner { /// Run `f(i)` for every index in `order`. /// /// Workers are pre-spawned dormant and activated adaptively. A timer thread - /// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed + /// fires an efficiency check every `TIMER_SECS` seconds; each completed /// partition resets that timer (forcing an immediate check) and also - /// triggers its own inline check. A new worker is activated whenever - /// efficiency falls below `SPAWN_THRESHOLD`. + /// triggers its own inline check. A new worker is activated whenever CPU + /// efficiency grows by at least `CPU_SPAWN_THRESHOLD` (absolute, in cores) + /// or I/O throughput grows by at least `IO_SPAWN_THRESHOLD` (relative) since + /// the last check — whichever resource is the actual bottleneck still shows + /// headroom. /// /// `on_done(i, result, elapsed)` is called from the controller thread as /// each partition completes — suitable for progress bars and result @@ -217,8 +220,9 @@ impl PartitionRunner { return Ok(()); } - const SPAWN_THRESHOLD: f64 = 0.2; - const TIMER_SECS: u64 = 30; + const CPU_SPAWN_THRESHOLD: f64 = 0.2; + const IO_SPAWN_THRESHOLD: f64 = 0.2; + const TIMER_SECS: u64 = 30; // ── Channels ────────────────────────────────────────────────────────── let (part_tx, part_rx) = unbounded::(); @@ -285,8 +289,9 @@ impl PartitionRunner { // ── Controller ──────────────────────────────────────────────────── let initial_workers = n_nodes.min(max_workers).min(n_total); for _ in 0..initial_workers { activate_tx.send(()).ok(); } - let mut n_active = initial_workers; + let mut n_active = initial_workers; let mut cpu_sample = CpuSample::now(); + let mut io_sample = IoSample::now(); let mut completed = 0usize; while completed < n_total { @@ -303,13 +308,17 @@ impl PartitionRunner { // Inline check: same logic as a timer tick. maybe_activate( &activate_tx, &mut n_active, max_workers, - &mut cpu_sample, SPAWN_THRESHOLD, completed, n_total, + &mut cpu_sample, CPU_SPAWN_THRESHOLD, + &mut io_sample, IO_SPAWN_THRESHOLD, + completed, n_total, ); } WorkerEvent::TimerTick => { maybe_activate( &activate_tx, &mut n_active, max_workers, - &mut cpu_sample, SPAWN_THRESHOLD, completed, n_total, + &mut cpu_sample, CPU_SPAWN_THRESHOLD, + &mut io_sample, IO_SPAWN_THRESHOLD, + completed, n_total, ); } } @@ -336,17 +345,25 @@ enum WorkerEvent { } fn maybe_activate( - activate_tx: &crossbeam_channel::Sender<()>, - n_active: &mut usize, - max_workers: usize, - cpu_sample: &mut CpuSample, - threshold: f64, - completed: usize, - n_total: usize, + activate_tx: &crossbeam_channel::Sender<()>, + n_active: &mut usize, + max_workers: usize, + cpu_sample: &mut CpuSample, + cpu_threshold: f64, + io_sample: &mut IoSample, + io_threshold: f64, + completed: usize, + n_total: usize, ) { if *n_active >= max_workers || completed >= n_total { return; } - if cpu_sample.do_i_activate(threshold) { + // Call both unconditionally (no `||` short-circuit): each sampler must + // advance its own window every tick, regardless of what the other one + // reports, or it would starve behind whichever signal fires first. + let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold); + let io_wants_more = io_sample.do_i_activate(io_threshold); + + if cpu_wants_more || io_wants_more { activate_tx.send(()).ok(); *n_active += 1; debug!("activated worker {}/{}", n_active, max_workers); diff --git a/src/obikmer/Cargo.toml b/src/obikmer/Cargo.toml index ea3b33e..5278946 100644 --- a/src/obikmer/Cargo.toml +++ b/src/obikmer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "obikmer" -version = "1.1.32" +version = "1.1.33" edition = "2024" [[bin]] diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs index a4f3034..0c539a8 100644 --- a/src/obisys/src/lib.rs +++ b/src/obisys/src/lib.rs @@ -266,9 +266,15 @@ impl CpuSample { } pub fn do_i_activate(&mut self, threshold: f64) -> bool { + let delta_wall = self.wall.elapsed().as_secs_f64(); + if delta_wall < 0.1 { + // Window too short to be meaningful — leave state untouched so it + // keeps accumulating until a real sample can be taken. + return false; + } + let n = CpuSample::now(); let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs); - let delta_wall = self.wall.elapsed().as_secs_f64(); let efficiency = delta_ru / delta_wall; let activate = 0f64.max(efficiency - self.previous) >= threshold; @@ -289,6 +295,93 @@ impl CpuSample { } } +// ── IoSample ────────────────────────────────────────────────────────────────── + +/// Snapshot of process-wide block I/O (bytes read + written) + wall clock. +/// +/// Same activation protocol as [`CpuSample`], but the growth check in +/// [`do_i_activate`](Self::do_i_activate) is *relative* rather than absolute: +/// raw I/O throughput has no portable scale across storage devices, unlike a +/// core count. +pub struct IoSample { + wall: Instant, + bytes: u64, + previous_rate: f64, +} + +impl IoSample { + pub fn now() -> Self { + Self { + wall: Instant::now(), + bytes: Self::read_bytes(), + previous_rate: 0.0, + } + } + + /// Bytes actually submitted to the block layer (read + write), summed + /// process-wide. Returns 0 if unavailable — degrades gracefully to a + /// signal that never triggers activation (CPU-only heuristic). + #[cfg(target_os = "linux")] + fn read_bytes() -> u64 { + let Ok(io) = std::fs::read_to_string("/proc/self/io") else { + return 0; + }; + io.lines() + .filter_map(|l| { + l.strip_prefix("read_bytes: ") + .or_else(|| l.strip_prefix("write_bytes: ")) + }) + .filter_map(|v| v.trim().parse::().ok()) + .sum() + } + + #[cfg(target_os = "macos")] + fn read_bytes() -> u64 { + use libc::{RUSAGE_INFO_V4, getpid, proc_pid_rusage, rusage_info_v4}; + let mut info: rusage_info_v4 = unsafe { std::mem::zeroed() }; + let ret = + unsafe { proc_pid_rusage(getpid(), RUSAGE_INFO_V4, &mut info as *mut _ as *mut _) }; + if ret != 0 { + return 0; + } + info.ri_diskio_bytesread + info.ri_diskio_byteswritten + } + + #[cfg(not(any(target_os = "linux", target_os = "macos")))] + fn read_bytes() -> u64 { + 0 + } + + /// Same protocol as [`CpuSample::do_i_activate`] (0.1 s minimum window, + /// state untouched on early return), but growth is measured relative to + /// the previous rate. `threshold` is a fraction, e.g. `0.2` for a 20 % + /// increase in throughput since the last real sample. + pub fn do_i_activate(&mut self, threshold: f64) -> bool { + let elapsed = self.wall.elapsed().as_secs_f64(); + if elapsed < 0.1 { + return false; + } + + let n = Self::read_bytes(); + let rate = n.saturating_sub(self.bytes) as f64 / elapsed; + let activate = if self.previous_rate == 0.0 { + rate > 0.0 // bootstrap: any measured throughput is signal enough + } else { + (rate - self.previous_rate) / self.previous_rate >= threshold + }; + + debug!( + "Do I activate (I/O) : {} -> {} Activate: {}", + self.previous_rate, rate, activate + ); + self.previous_rate = rate; + self.bytes = n; + self.wall = Instant::now(); + + activate + } +} + // ── public API ──────────────────────────────────────────────────────────────── /// Snapshot taken at the start of a pipeline stage.