feat(numa): introduce I/O sampling to prevent activation stalls
Release / create-release (push) Successful in 2m25s
Release / build-linux-x86_64 (push) Successful in 8m47s
Release / build-macos-arm64 (push) Failing after 31s
CI / build (pull_request) Successful in 3m30s

Replaces the monolithic CPU scaling threshold with separate CPU and I/O spawn thresholds. Introduces an `IoSample` struct with platform-specific byte reading and a relative throughput growth heuristic. Adds a 0.1s wall-clock guard to `CpuSample` to suppress artificial efficiency spikes, and updates `maybe_activate` to trigger worker scaling when either resource indicates headroom. Bumps `obikmer` to v1.1.33 and updates architecture documentation.
This commit is contained in:
Eric Coissac
2026-07-02 10:05:31 +02:00
parent 6378734e1c
commit f84dd539bf
5 changed files with 225 additions and 22 deletions
+1 -1
View File
@@ -1704,7 +1704,7 @@ dependencies = [
[[package]]
name = "obikmer"
version = "1.1.32"
version = "1.1.33"
dependencies = [
"clap",
"csv",
+34 -17
View File
@@ -20,7 +20,7 @@ use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet;
#[cfg(feature = "numa")]
use hwlocality::object::types::ObjectType;
use obisys::CpuSample;
use obisys::{CpuSample, IoSample};
use tracing::debug;
// ── Public interface ──────────────────────────────────────────────────────────
@@ -190,10 +190,13 @@ impl PartitionRunner {
/// Run `f(i)` for every index in `order`.
///
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
/// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed
/// fires an efficiency check every `TIMER_SECS` seconds; each completed
/// partition resets that timer (forcing an immediate check) and also
/// triggers its own inline check. A new worker is activated whenever
/// efficiency falls below `SPAWN_THRESHOLD`.
/// triggers its own inline check. A new worker is activated whenever CPU
/// efficiency grows by at least `CPU_SPAWN_THRESHOLD` (absolute, in cores)
/// or I/O throughput grows by at least `IO_SPAWN_THRESHOLD` (relative) since
/// the last check — whichever resource is the actual bottleneck still shows
/// headroom.
///
/// `on_done(i, result, elapsed)` is called from the controller thread as
/// each partition completes — suitable for progress bars and result
@@ -217,8 +220,9 @@ impl PartitionRunner {
return Ok(());
}
const SPAWN_THRESHOLD: f64 = 0.2;
const TIMER_SECS: u64 = 30;
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
const IO_SPAWN_THRESHOLD: f64 = 0.2;
const TIMER_SECS: u64 = 30;
// ── Channels ──────────────────────────────────────────────────────────
let (part_tx, part_rx) = unbounded::<usize>();
@@ -285,8 +289,9 @@ impl PartitionRunner {
// ── Controller ────────────────────────────────────────────────────
let initial_workers = n_nodes.min(max_workers).min(n_total);
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
let mut n_active = initial_workers;
let mut n_active = initial_workers;
let mut cpu_sample = CpuSample::now();
let mut io_sample = IoSample::now();
let mut completed = 0usize;
while completed < n_total {
@@ -303,13 +308,17 @@ impl PartitionRunner {
// Inline check: same logic as a timer tick.
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
&mut io_sample, IO_SPAWN_THRESHOLD,
completed, n_total,
);
}
WorkerEvent::TimerTick => {
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
&mut io_sample, IO_SPAWN_THRESHOLD,
completed, n_total,
);
}
}
@@ -336,17 +345,25 @@ enum WorkerEvent<R, E> {
}
fn maybe_activate(
activate_tx: &crossbeam_channel::Sender<()>,
n_active: &mut usize,
max_workers: usize,
cpu_sample: &mut CpuSample,
threshold: f64,
completed: usize,
n_total: usize,
activate_tx: &crossbeam_channel::Sender<()>,
n_active: &mut usize,
max_workers: usize,
cpu_sample: &mut CpuSample,
cpu_threshold: f64,
io_sample: &mut IoSample,
io_threshold: f64,
completed: usize,
n_total: usize,
) {
if *n_active >= max_workers || completed >= n_total { return; }
if cpu_sample.do_i_activate(threshold) {
// Call both unconditionally (no `||` short-circuit): each sampler must
// advance its own window every tick, regardless of what the other one
// reports, or it would starve behind whichever signal fires first.
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
let io_wants_more = io_sample.do_i_activate(io_threshold);
if cpu_wants_more || io_wants_more {
activate_tx.send(()).ok();
*n_active += 1;
debug!("activated worker {}/{}", n_active, max_workers);
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "obikmer"
version = "1.1.32"
version = "1.1.33"
edition = "2024"
[[bin]]
+94 -1
View File
@@ -266,9 +266,15 @@ impl CpuSample {
}
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let delta_wall = self.wall.elapsed().as_secs_f64();
if delta_wall < 0.1 {
// Window too short to be meaningful — leave state untouched so it
// keeps accumulating until a real sample can be taken.
return false;
}
let n = CpuSample::now();
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
let delta_wall = self.wall.elapsed().as_secs_f64();
let efficiency = delta_ru / delta_wall;
let activate = 0f64.max(efficiency - self.previous) >= threshold;
@@ -289,6 +295,93 @@ impl CpuSample {
}
}
// ── IoSample ──────────────────────────────────────────────────────────────────
/// Snapshot of process-wide block I/O (bytes read + written) + wall clock.
///
/// Same activation protocol as [`CpuSample`], but the growth check in
/// [`do_i_activate`](Self::do_i_activate) is *relative* rather than absolute:
/// raw I/O throughput has no portable scale across storage devices, unlike a
/// core count.
pub struct IoSample {
wall: Instant,
bytes: u64,
previous_rate: f64,
}
impl IoSample {
pub fn now() -> Self {
Self {
wall: Instant::now(),
bytes: Self::read_bytes(),
previous_rate: 0.0,
}
}
/// Bytes actually submitted to the block layer (read + write), summed
/// process-wide. Returns 0 if unavailable — degrades gracefully to a
/// signal that never triggers activation (CPU-only heuristic).
#[cfg(target_os = "linux")]
fn read_bytes() -> u64 {
let Ok(io) = std::fs::read_to_string("/proc/self/io") else {
return 0;
};
io.lines()
.filter_map(|l| {
l.strip_prefix("read_bytes: ")
.or_else(|| l.strip_prefix("write_bytes: "))
})
.filter_map(|v| v.trim().parse::<u64>().ok())
.sum()
}
#[cfg(target_os = "macos")]
fn read_bytes() -> u64 {
use libc::{RUSAGE_INFO_V4, getpid, proc_pid_rusage, rusage_info_v4};
let mut info: rusage_info_v4 = unsafe { std::mem::zeroed() };
let ret =
unsafe { proc_pid_rusage(getpid(), RUSAGE_INFO_V4, &mut info as *mut _ as *mut _) };
if ret != 0 {
return 0;
}
info.ri_diskio_bytesread + info.ri_diskio_byteswritten
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn read_bytes() -> u64 {
0
}
/// Same protocol as [`CpuSample::do_i_activate`] (0.1 s minimum window,
/// state untouched on early return), but growth is measured relative to
/// the previous rate. `threshold` is a fraction, e.g. `0.2` for a 20 %
/// increase in throughput since the last real sample.
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let elapsed = self.wall.elapsed().as_secs_f64();
if elapsed < 0.1 {
return false;
}
let n = Self::read_bytes();
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
let activate = if self.previous_rate == 0.0 {
rate > 0.0 // bootstrap: any measured throughput is signal enough
} else {
(rate - self.previous_rate) / self.previous_rate >= threshold
};
debug!(
"Do I activate (I/O) : {} -> {} Activate: {}",
self.previous_rate, rate, activate
);
self.previous_rate = rate;
self.bytes = n;
self.wall = Instant::now();
activate
}
}
// ── public API ────────────────────────────────────────────────────────────────
/// Snapshot taken at the start of a pipeline stage.