7 Commits

Author SHA1 Message Date
coissac 66ab4c6db1 Merge pull request 'feat(numa): introduce I/O sampling to prevent activation stalls' (#55) from push-ooruxnkktvvz into main
Reviewed-on: #55
2026-07-02 09:36:19 +00:00
Eric Coissac f84dd539bf feat(numa): introduce I/O sampling to prevent activation stalls
Release / create-release (push) Successful in 2m25s
Release / build-linux-x86_64 (push) Successful in 8m47s
Release / build-macos-arm64 (push) Failing after 31s
CI / build (pull_request) Successful in 3m30s
Replaces the monolithic CPU scaling threshold with separate CPU and I/O spawn thresholds. Introduces an `IoSample` struct with platform-specific byte reading and a relative throughput growth heuristic. Adds a 0.1s wall-clock guard to `CpuSample` to suppress artificial efficiency spikes, and updates `maybe_activate` to trigger worker scaling when either resource indicates headroom. Bumps `obikmer` to v1.1.33 and updates architecture documentation.
2026-07-02 10:07:22 +02:00
coissac 6378734e1c Merge pull request 'fix(obisys): remove activation guard to always update metrics' (#54) from push-vkloynurrxzu into main
Reviewed-on: #54
2026-07-01 18:34:10 +00:00
Eric Coissac b3a617cce1 fix(obisys): remove activation guard to always update metrics
Release / create-release (push) Successful in 2m26s
CI / build (pull_request) Successful in 3m35s
Release / build-linux-x86_64 (push) Successful in 8m9s
Release / build-macos-arm64 (push) Failing after 30s
Removes the `if activate` conditional in `src/obisys/src/lib.rs`, making debug logging and state updates for performance counters execute unconditionally. This ensures tracking metrics are continuously refreshed regardless of the activation threshold. Also bumps the `obikmer` dependency version.
2026-07-01 20:32:56 +02:00
coissac 2080e5e8a9 Merge pull request 'ci: fix registry auth and bump obikmer to 1.1.30' (#53) from push-zxlknspoxknt into main
Reviewed-on: #53
2026-07-01 14:20:09 +00:00
Eric Coissac 45ed2bc9b8 ci: fix registry auth and bump obikmer to 1.1.30
Release / create-release (push) Successful in 2m26s
Release / build-linux-x86_64 (push) Successful in 8m12s
Release / build-macos-arm64 (push) Failing after 1m55s
CI / build (pull_request) Successful in 3m32s
Update the release workflow to explicitly resolve the Docker registry username from repository secrets instead of inferring it from the runner's actor. Bump the obikmer package version to 1.1.30.
2026-07-01 14:31:30 +02:00
coissac aa126fd89d Merge pull request 'feat: simplify worker spawning logic and update macOS build workflow' (#52) from push-uvmlknmzqqnx into main
Reviewed-on: #52
2026-07-01 09:50:51 +00:00
7 changed files with 236 additions and 35 deletions
Vendored
BIN
View File
Binary file not shown.
+1 -1
View File
@@ -90,7 +90,7 @@ jobs:
- uses: actions/checkout@v4
- name: Login to registry
run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ github.actor }} --password-stdin
run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ secrets.REGISTRYUSER }} --password-stdin
- name: Cache cargo registry
uses: actions/cache@v4
+95 -2
View File
@@ -162,14 +162,107 @@ A single `PartitionRunner` instance can be built once per command invocation
and reused across multiple `run()` calls (e.g. `merge` runs
`merge_partitions` then `pack_matrices`).
## Known issue: CPU-only activation signal stalls on I/O-bound stages
Observed on a real `filter` run (109 genomes, 256 partitions, 8×24-core NUMA):
`rebuild` (CPU-bound — k-mer construction) scales cleanly from 9 to 43 active
workers as `CpuSample::do_i_activate` (`obisys::lib.rs`) sees efficiency climb.
`pack_matrices` (I/O-bound — reopens and recomposes per-genome column files
into `.pbmx`/`.pcmx`) activates one extra worker then flatlines at 10/192 for
the rest of the stage, even though 256 partitions keep completing over several
minutes. This matches the documented intent (§ Adaptive mechanism — "avoids
over-provisioning ... I/O-bound ... workloads") but conflates two different
things: *"CPU is not the bottleneck"* and *"more workers would not help"*. On
storage with real queue depth (NVMe, RAID, parallel FS) the second stage could
still benefit from more concurrent workers even with flat CPU usage — a signal
the current mechanism cannot see.
A one-off artefact was also found in the same log: right after a stage
transition, `do_i_activate` produced a physically impossible spike (efficiency
~94 cores on a 192-core box) because it has no minimum-window guard — unlike
its sibling `cpu_efficiency`, which returns `0.0` if `wall < 0.1s`
(`obisys::lib.rs:260`). `do_i_activate` unconditionally overwrites
`self.wall`/`self.user_secs`/`self.sys_secs` even when the elapsed window is
too short to be meaningful, so a burst of rapid completions right after
activating a worker can divide a real CPU delta by a near-zero wall delta.
### Implemented: I/O signal + shared debounce guard
`IoSample` (`obisys::lib.rs`, alongside `CpuSample`) is fed by
`read_bytes`/`write_bytes` from `/proc/self/io` on Linux (actual bytes
submitted to the block layer — not `rchar`/`wchar`, which also count
page-cache hits, and not `ru_inblock`/`ru_oublock`, unreliable on macOS), with
a `proc_pid_rusage(RUSAGE_INFO_V4)` fallback on macOS
(`ri_diskio_bytesread`/`ri_diskio_byteswritten`, FFI only via `libc`, no new
dependency — same pattern as the existing `getrusage` bindings). Any other
target degrades gracefully to a signal that never triggers (falls back to
CPU-only activation), same pattern as `cgroup_v2_available`.
`maybe_activate` (`numa.rs`) activates a worker if *either* signal still shows
headroom, making `PartitionRunner` adapt to whichever resource is actually the
bottleneck without per-call configuration. Both samplers are called
unconditionally — no `||` short-circuit — so neither window starves behind
whichever signal fires first:
```rust
let cpu_wants_more = cpu_sample.do_i_activate(CPU_SPAWN_THRESHOLD);
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
if cpu_wants_more || io_wants_more {
activate_tx.send(()).ok();
...
}
```
Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit),
raw I/O throughput has no natural scale across devices, so `IoSample` uses a
**relative** growth threshold instead of an absolute one:
```rust
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let elapsed = self.wall.elapsed().as_secs_f64();
if elapsed < 0.1 { return false; } // state untouched — window keeps accumulating
let n = Self::read_bytes();
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
let activate = if self.previous_rate == 0.0 {
rate > 0.0 // bootstrap: any measured throughput is signal
} else {
(rate - self.previous_rate) / self.previous_rate >= threshold
};
self.bytes = n;
self.wall = Instant::now(); // reset only on a real sample
activate
}
```
The `elapsed < 0.1s → return false without mutating state` guard was also
back-ported into `CpuSample::do_i_activate` (previously missing — source of
the ~94-core artefact above) — one fix for both problems, and it removes the
need for any arbitrary I/O-rate floor: a short/noisy window is rejected
outright rather than papered over with a hardware-dependent constant.
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, both `0.2`)
are defined as `const` in `PartitionRunner::run` (`numa.rs`). The I/O value is
a starting point, not a derived one — needs empirical validation against a
real `pack` run.
Starting threshold: `0.2` (20 % relative growth) for `IoSample`, same order of
magnitude as the CPU threshold's *implicit* relative sensitivity (in the
observed log, an 8→9 worker step raised efficiency by ~12 %). This is a
starting point, not a derived value — I/O throughput is lumpier than CPU time
(buffered writes flush in bursts), so it needs empirical validation against a
real `pack` run before being considered final.
## Open questions
- **Error handling**: `run` currently returns the first error; remaining errors
are dropped. A `Vec<E>` return would give complete diagnostics.
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from
a higher value. A per-call override could be added to the API.
for merge on BeeGFS. Superseded by the I/O signal above for the "more
workers would help despite flat CPU" case — a per-call override may still be
worth keeping as a manual escape hatch.
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
+1 -1
View File
@@ -1704,7 +1704,7 @@ dependencies = [
[[package]]
name = "obikmer"
version = "1.1.29"
version = "1.1.33"
dependencies = [
"clap",
"csv",
+26 -9
View File
@@ -20,7 +20,7 @@ use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet;
#[cfg(feature = "numa")]
use hwlocality::object::types::ObjectType;
use obisys::CpuSample;
use obisys::{CpuSample, IoSample};
use tracing::debug;
// ── Public interface ──────────────────────────────────────────────────────────
@@ -190,10 +190,13 @@ impl PartitionRunner {
/// Run `f(i)` for every index in `order`.
///
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
/// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed
/// fires an efficiency check every `TIMER_SECS` seconds; each completed
/// partition resets that timer (forcing an immediate check) and also
/// triggers its own inline check. A new worker is activated whenever
/// efficiency falls below `SPAWN_THRESHOLD`.
/// triggers its own inline check. A new worker is activated whenever CPU
/// efficiency grows by at least `CPU_SPAWN_THRESHOLD` (absolute, in cores)
/// or I/O throughput grows by at least `IO_SPAWN_THRESHOLD` (relative) since
/// the last check — whichever resource is the actual bottleneck still shows
/// headroom.
///
/// `on_done(i, result, elapsed)` is called from the controller thread as
/// each partition completes — suitable for progress bars and result
@@ -217,7 +220,8 @@ impl PartitionRunner {
return Ok(());
}
const SPAWN_THRESHOLD: f64 = 0.2;
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
const IO_SPAWN_THRESHOLD: f64 = 0.2;
const TIMER_SECS: u64 = 30;
// ── Channels ──────────────────────────────────────────────────────────
@@ -287,6 +291,7 @@ impl PartitionRunner {
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
let mut n_active = initial_workers;
let mut cpu_sample = CpuSample::now();
let mut io_sample = IoSample::now();
let mut completed = 0usize;
while completed < n_total {
@@ -303,13 +308,17 @@ impl PartitionRunner {
// Inline check: same logic as a timer tick.
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
&mut io_sample, IO_SPAWN_THRESHOLD,
completed, n_total,
);
}
WorkerEvent::TimerTick => {
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
&mut io_sample, IO_SPAWN_THRESHOLD,
completed, n_total,
);
}
}
@@ -340,13 +349,21 @@ fn maybe_activate(
n_active: &mut usize,
max_workers: usize,
cpu_sample: &mut CpuSample,
threshold: f64,
cpu_threshold: f64,
io_sample: &mut IoSample,
io_threshold: f64,
completed: usize,
n_total: usize,
) {
if *n_active >= max_workers || completed >= n_total { return; }
if cpu_sample.do_i_activate(threshold) {
// Call both unconditionally (no `||` short-circuit): each sampler must
// advance its own window every tick, regardless of what the other one
// reports, or it would starve behind whichever signal fires first.
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
let io_wants_more = io_sample.do_i_activate(io_threshold);
if cpu_wants_more || io_wants_more {
activate_tx.send(()).ok();
*n_active += 1;
debug!("activated worker {}/{}", n_active, max_workers);
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "obikmer"
version = "1.1.29"
version = "1.1.33"
edition = "2024"
[[bin]]
+93 -2
View File
@@ -266,14 +266,19 @@ impl CpuSample {
}
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let delta_wall = self.wall.elapsed().as_secs_f64();
if delta_wall < 0.1 {
// Window too short to be meaningful — leave state untouched so it
// keeps accumulating until a real sample can be taken.
return false;
}
let n = CpuSample::now();
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
let delta_wall = self.wall.elapsed().as_secs_f64();
let efficiency = delta_ru / delta_wall;
let activate = 0f64.max(efficiency - self.previous) >= threshold;
if activate {
debug!(
"Do I activate : {} -> {} = {} Activate: {}",
self.previous,
@@ -285,7 +290,93 @@ impl CpuSample {
self.user_secs = n.user_secs;
self.sys_secs = n.sys_secs;
self.wall = n.wall;
activate
}
}
// ── IoSample ──────────────────────────────────────────────────────────────────
/// Snapshot of process-wide block I/O (bytes read + written) + wall clock.
///
/// Same activation protocol as [`CpuSample`], but the growth check in
/// [`do_i_activate`](Self::do_i_activate) is *relative* rather than absolute:
/// raw I/O throughput has no portable scale across storage devices, unlike a
/// core count.
pub struct IoSample {
wall: Instant,
bytes: u64,
previous_rate: f64,
}
impl IoSample {
pub fn now() -> Self {
Self {
wall: Instant::now(),
bytes: Self::read_bytes(),
previous_rate: 0.0,
}
}
/// Bytes actually submitted to the block layer (read + write), summed
/// process-wide. Returns 0 if unavailable — degrades gracefully to a
/// signal that never triggers activation (CPU-only heuristic).
#[cfg(target_os = "linux")]
fn read_bytes() -> u64 {
let Ok(io) = std::fs::read_to_string("/proc/self/io") else {
return 0;
};
io.lines()
.filter_map(|l| {
l.strip_prefix("read_bytes: ")
.or_else(|| l.strip_prefix("write_bytes: "))
})
.filter_map(|v| v.trim().parse::<u64>().ok())
.sum()
}
#[cfg(target_os = "macos")]
fn read_bytes() -> u64 {
use libc::{RUSAGE_INFO_V4, getpid, proc_pid_rusage, rusage_info_v4};
let mut info: rusage_info_v4 = unsafe { std::mem::zeroed() };
let ret =
unsafe { proc_pid_rusage(getpid(), RUSAGE_INFO_V4, &mut info as *mut _ as *mut _) };
if ret != 0 {
return 0;
}
info.ri_diskio_bytesread + info.ri_diskio_byteswritten
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn read_bytes() -> u64 {
0
}
/// Same protocol as [`CpuSample::do_i_activate`] (0.1 s minimum window,
/// state untouched on early return), but growth is measured relative to
/// the previous rate. `threshold` is a fraction, e.g. `0.2` for a 20 %
/// increase in throughput since the last real sample.
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let elapsed = self.wall.elapsed().as_secs_f64();
if elapsed < 0.1 {
return false;
}
let n = Self::read_bytes();
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
let activate = if self.previous_rate == 0.0 {
rate > 0.0 // bootstrap: any measured throughput is signal enough
} else {
(rate - self.previous_rate) / self.previous_rate >= threshold
};
debug!(
"Do I activate (I/O) : {} -> {} Activate: {}",
self.previous_rate, rate, activate
);
self.previous_rate = rate;
self.bytes = n;
self.wall = Instant::now();
activate
}