Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f84dd539bf | |||
| 6378734e1c | |||
| b3a617cce1 | |||
| 2080e5e8a9 |
@@ -162,14 +162,107 @@ A single `PartitionRunner` instance can be built once per command invocation
|
||||
and reused across multiple `run()` calls (e.g. `merge` runs
|
||||
`merge_partitions` then `pack_matrices`).
|
||||
|
||||
## Known issue: CPU-only activation signal stalls on I/O-bound stages
|
||||
|
||||
Observed on a real `filter` run (109 genomes, 256 partitions, 8×24-core NUMA):
|
||||
`rebuild` (CPU-bound — k-mer construction) scales cleanly from 9 to 43 active
|
||||
workers as `CpuSample::do_i_activate` (`obisys::lib.rs`) sees efficiency climb.
|
||||
`pack_matrices` (I/O-bound — reopens and recomposes per-genome column files
|
||||
into `.pbmx`/`.pcmx`) activates one extra worker then flatlines at 10/192 for
|
||||
the rest of the stage, even though 256 partitions keep completing over several
|
||||
minutes. This matches the documented intent (§ Adaptive mechanism — "avoids
|
||||
over-provisioning ... I/O-bound ... workloads") but conflates two different
|
||||
things: *"CPU is not the bottleneck"* and *"more workers would not help"*. On
|
||||
storage with real queue depth (NVMe, RAID, parallel FS) the second stage could
|
||||
still benefit from more concurrent workers even with flat CPU usage — a signal
|
||||
the current mechanism cannot see.
|
||||
|
||||
A one-off artefact was also found in the same log: right after a stage
|
||||
transition, `do_i_activate` produced a physically impossible spike (efficiency
|
||||
~94 cores on a 192-core box) because it has no minimum-window guard — unlike
|
||||
its sibling `cpu_efficiency`, which returns `0.0` if `wall < 0.1s`
|
||||
(`obisys::lib.rs:260`). `do_i_activate` unconditionally overwrites
|
||||
`self.wall`/`self.user_secs`/`self.sys_secs` even when the elapsed window is
|
||||
too short to be meaningful, so a burst of rapid completions right after
|
||||
activating a worker can divide a real CPU delta by a near-zero wall delta.
|
||||
|
||||
### Implemented: I/O signal + shared debounce guard
|
||||
|
||||
`IoSample` (`obisys::lib.rs`, alongside `CpuSample`) is fed by
|
||||
`read_bytes`/`write_bytes` from `/proc/self/io` on Linux (actual bytes
|
||||
submitted to the block layer — not `rchar`/`wchar`, which also count
|
||||
page-cache hits, and not `ru_inblock`/`ru_oublock`, unreliable on macOS), with
|
||||
a `proc_pid_rusage(RUSAGE_INFO_V4)` fallback on macOS
|
||||
(`ri_diskio_bytesread`/`ri_diskio_byteswritten`, FFI only via `libc`, no new
|
||||
dependency — same pattern as the existing `getrusage` bindings). Any other
|
||||
target degrades gracefully to a signal that never triggers (falls back to
|
||||
CPU-only activation), same pattern as `cgroup_v2_available`.
|
||||
|
||||
`maybe_activate` (`numa.rs`) activates a worker if *either* signal still shows
|
||||
headroom, making `PartitionRunner` adapt to whichever resource is actually the
|
||||
bottleneck without per-call configuration. Both samplers are called
|
||||
unconditionally — no `||` short-circuit — so neither window starves behind
|
||||
whichever signal fires first:
|
||||
|
||||
```rust
|
||||
let cpu_wants_more = cpu_sample.do_i_activate(CPU_SPAWN_THRESHOLD);
|
||||
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
|
||||
if cpu_wants_more || io_wants_more {
|
||||
activate_tx.send(()).ok();
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit),
|
||||
raw I/O throughput has no natural scale across devices, so `IoSample` uses a
|
||||
**relative** growth threshold instead of an absolute one:
|
||||
|
||||
```rust
|
||||
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||
let elapsed = self.wall.elapsed().as_secs_f64();
|
||||
if elapsed < 0.1 { return false; } // state untouched — window keeps accumulating
|
||||
|
||||
let n = Self::read_bytes();
|
||||
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
|
||||
let activate = if self.previous_rate == 0.0 {
|
||||
rate > 0.0 // bootstrap: any measured throughput is signal
|
||||
} else {
|
||||
(rate - self.previous_rate) / self.previous_rate >= threshold
|
||||
};
|
||||
|
||||
self.bytes = n;
|
||||
self.wall = Instant::now(); // reset only on a real sample
|
||||
activate
|
||||
}
|
||||
```
|
||||
|
||||
The `elapsed < 0.1s → return false without mutating state` guard was also
|
||||
back-ported into `CpuSample::do_i_activate` (previously missing — source of
|
||||
the ~94-core artefact above) — one fix for both problems, and it removes the
|
||||
need for any arbitrary I/O-rate floor: a short/noisy window is rejected
|
||||
outright rather than papered over with a hardware-dependent constant.
|
||||
|
||||
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, both `0.2`)
|
||||
are defined as `const` in `PartitionRunner::run` (`numa.rs`). The I/O value is
|
||||
a starting point, not a derived one — needs empirical validation against a
|
||||
real `pack` run.
|
||||
|
||||
Starting threshold: `0.2` (20 % relative growth) for `IoSample`, same order of
|
||||
magnitude as the CPU threshold's *implicit* relative sensitivity (in the
|
||||
observed log, an 8→9 worker step raised efficiency by ~12 %). This is a
|
||||
starting point, not a derived value — I/O throughput is lumpier than CPU time
|
||||
(buffered writes flush in bursts), so it needs empirical validation against a
|
||||
real `pack` run before being considered final.
|
||||
|
||||
## Open questions
|
||||
|
||||
- **Error handling**: `run` currently returns the first error; remaining errors
|
||||
are dropped. A `Vec<E>` return would give complete diagnostics.
|
||||
|
||||
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
|
||||
for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from
|
||||
a higher value. A per-call override could be added to the API.
|
||||
for merge on BeeGFS. Superseded by the I/O signal above for the "more
|
||||
workers would help despite flat CPU" case — a per-call override may still be
|
||||
worth keeping as a manual escape hatch.
|
||||
|
||||
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
|
||||
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
|
||||
|
||||
Generated
+1
-1
@@ -1704,7 +1704,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "obikmer"
|
||||
version = "1.1.30"
|
||||
version = "1.1.33"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"csv",
|
||||
|
||||
@@ -20,7 +20,7 @@ use hwlocality::cpu::binding::CpuBindingFlags;
|
||||
use hwlocality::cpu::cpuset::CpuSet;
|
||||
#[cfg(feature = "numa")]
|
||||
use hwlocality::object::types::ObjectType;
|
||||
use obisys::CpuSample;
|
||||
use obisys::{CpuSample, IoSample};
|
||||
use tracing::debug;
|
||||
|
||||
// ── Public interface ──────────────────────────────────────────────────────────
|
||||
@@ -190,10 +190,13 @@ impl PartitionRunner {
|
||||
/// Run `f(i)` for every index in `order`.
|
||||
///
|
||||
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
|
||||
/// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed
|
||||
/// fires an efficiency check every `TIMER_SECS` seconds; each completed
|
||||
/// partition resets that timer (forcing an immediate check) and also
|
||||
/// triggers its own inline check. A new worker is activated whenever
|
||||
/// efficiency falls below `SPAWN_THRESHOLD`.
|
||||
/// triggers its own inline check. A new worker is activated whenever CPU
|
||||
/// efficiency grows by at least `CPU_SPAWN_THRESHOLD` (absolute, in cores)
|
||||
/// or I/O throughput grows by at least `IO_SPAWN_THRESHOLD` (relative) since
|
||||
/// the last check — whichever resource is the actual bottleneck still shows
|
||||
/// headroom.
|
||||
///
|
||||
/// `on_done(i, result, elapsed)` is called from the controller thread as
|
||||
/// each partition completes — suitable for progress bars and result
|
||||
@@ -217,7 +220,8 @@ impl PartitionRunner {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
const SPAWN_THRESHOLD: f64 = 0.2;
|
||||
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
|
||||
const IO_SPAWN_THRESHOLD: f64 = 0.2;
|
||||
const TIMER_SECS: u64 = 30;
|
||||
|
||||
// ── Channels ──────────────────────────────────────────────────────────
|
||||
@@ -287,6 +291,7 @@ impl PartitionRunner {
|
||||
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
|
||||
let mut n_active = initial_workers;
|
||||
let mut cpu_sample = CpuSample::now();
|
||||
let mut io_sample = IoSample::now();
|
||||
let mut completed = 0usize;
|
||||
|
||||
while completed < n_total {
|
||||
@@ -303,13 +308,17 @@ impl PartitionRunner {
|
||||
// Inline check: same logic as a timer tick.
|
||||
maybe_activate(
|
||||
&activate_tx, &mut n_active, max_workers,
|
||||
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
|
||||
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
|
||||
&mut io_sample, IO_SPAWN_THRESHOLD,
|
||||
completed, n_total,
|
||||
);
|
||||
}
|
||||
WorkerEvent::TimerTick => {
|
||||
maybe_activate(
|
||||
&activate_tx, &mut n_active, max_workers,
|
||||
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
|
||||
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
|
||||
&mut io_sample, IO_SPAWN_THRESHOLD,
|
||||
completed, n_total,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -340,13 +349,21 @@ fn maybe_activate(
|
||||
n_active: &mut usize,
|
||||
max_workers: usize,
|
||||
cpu_sample: &mut CpuSample,
|
||||
threshold: f64,
|
||||
cpu_threshold: f64,
|
||||
io_sample: &mut IoSample,
|
||||
io_threshold: f64,
|
||||
completed: usize,
|
||||
n_total: usize,
|
||||
) {
|
||||
if *n_active >= max_workers || completed >= n_total { return; }
|
||||
|
||||
if cpu_sample.do_i_activate(threshold) {
|
||||
// Call both unconditionally (no `||` short-circuit): each sampler must
|
||||
// advance its own window every tick, regardless of what the other one
|
||||
// reports, or it would starve behind whichever signal fires first.
|
||||
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
|
||||
let io_wants_more = io_sample.do_i_activate(io_threshold);
|
||||
|
||||
if cpu_wants_more || io_wants_more {
|
||||
activate_tx.send(()).ok();
|
||||
*n_active += 1;
|
||||
debug!("activated worker {}/{}", n_active, max_workers);
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "obikmer"
|
||||
version = "1.1.30"
|
||||
version = "1.1.33"
|
||||
edition = "2024"
|
||||
|
||||
[[bin]]
|
||||
|
||||
+93
-2
@@ -266,14 +266,19 @@ impl CpuSample {
|
||||
}
|
||||
|
||||
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||
let delta_wall = self.wall.elapsed().as_secs_f64();
|
||||
if delta_wall < 0.1 {
|
||||
// Window too short to be meaningful — leave state untouched so it
|
||||
// keeps accumulating until a real sample can be taken.
|
||||
return false;
|
||||
}
|
||||
|
||||
let n = CpuSample::now();
|
||||
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
|
||||
let delta_wall = self.wall.elapsed().as_secs_f64();
|
||||
|
||||
let efficiency = delta_ru / delta_wall;
|
||||
let activate = 0f64.max(efficiency - self.previous) >= threshold;
|
||||
|
||||
if activate {
|
||||
debug!(
|
||||
"Do I activate : {} -> {} = {} Activate: {}",
|
||||
self.previous,
|
||||
@@ -285,7 +290,93 @@ impl CpuSample {
|
||||
self.user_secs = n.user_secs;
|
||||
self.sys_secs = n.sys_secs;
|
||||
self.wall = n.wall;
|
||||
|
||||
activate
|
||||
}
|
||||
}
|
||||
|
||||
// ── IoSample ──────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Snapshot of process-wide block I/O (bytes read + written) + wall clock.
|
||||
///
|
||||
/// Same activation protocol as [`CpuSample`], but the growth check in
|
||||
/// [`do_i_activate`](Self::do_i_activate) is *relative* rather than absolute:
|
||||
/// raw I/O throughput has no portable scale across storage devices, unlike a
|
||||
/// core count.
|
||||
pub struct IoSample {
|
||||
wall: Instant,
|
||||
bytes: u64,
|
||||
previous_rate: f64,
|
||||
}
|
||||
|
||||
impl IoSample {
|
||||
pub fn now() -> Self {
|
||||
Self {
|
||||
wall: Instant::now(),
|
||||
bytes: Self::read_bytes(),
|
||||
previous_rate: 0.0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Bytes actually submitted to the block layer (read + write), summed
|
||||
/// process-wide. Returns 0 if unavailable — degrades gracefully to a
|
||||
/// signal that never triggers activation (CPU-only heuristic).
|
||||
#[cfg(target_os = "linux")]
|
||||
fn read_bytes() -> u64 {
|
||||
let Ok(io) = std::fs::read_to_string("/proc/self/io") else {
|
||||
return 0;
|
||||
};
|
||||
io.lines()
|
||||
.filter_map(|l| {
|
||||
l.strip_prefix("read_bytes: ")
|
||||
.or_else(|| l.strip_prefix("write_bytes: "))
|
||||
})
|
||||
.filter_map(|v| v.trim().parse::<u64>().ok())
|
||||
.sum()
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
fn read_bytes() -> u64 {
|
||||
use libc::{RUSAGE_INFO_V4, getpid, proc_pid_rusage, rusage_info_v4};
|
||||
let mut info: rusage_info_v4 = unsafe { std::mem::zeroed() };
|
||||
let ret =
|
||||
unsafe { proc_pid_rusage(getpid(), RUSAGE_INFO_V4, &mut info as *mut _ as *mut _) };
|
||||
if ret != 0 {
|
||||
return 0;
|
||||
}
|
||||
info.ri_diskio_bytesread + info.ri_diskio_byteswritten
|
||||
}
|
||||
|
||||
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
|
||||
fn read_bytes() -> u64 {
|
||||
0
|
||||
}
|
||||
|
||||
/// Same protocol as [`CpuSample::do_i_activate`] (0.1 s minimum window,
|
||||
/// state untouched on early return), but growth is measured relative to
|
||||
/// the previous rate. `threshold` is a fraction, e.g. `0.2` for a 20 %
|
||||
/// increase in throughput since the last real sample.
|
||||
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||
let elapsed = self.wall.elapsed().as_secs_f64();
|
||||
if elapsed < 0.1 {
|
||||
return false;
|
||||
}
|
||||
|
||||
let n = Self::read_bytes();
|
||||
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
|
||||
let activate = if self.previous_rate == 0.0 {
|
||||
rate > 0.0 // bootstrap: any measured throughput is signal enough
|
||||
} else {
|
||||
(rate - self.previous_rate) / self.previous_rate >= threshold
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Do I activate (I/O) : {} -> {} Activate: {}",
|
||||
self.previous_rate, rate, activate
|
||||
);
|
||||
self.previous_rate = rate;
|
||||
self.bytes = n;
|
||||
self.wall = Instant::now();
|
||||
|
||||
activate
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user