1 Commits

Author SHA1 Message Date
Eric Coissac 21a20ce7ca feat: automate release workflow and add optional NUMA support
Release / create-release (push) Failing after 38s
Release / build-macos-arm64 (push) Has been skipped
Release / build-linux-x86_64 (push) Has been skipped
Refactor the Gitea release workflow to introduce a dedicated `create-release` job that exposes a shared release ID, eliminating redundant inline creation. Update the Makefile to automatically generate annotated tags with AI-derived markdown release notes from recent commits. Add an optional `numa` feature to `obikindex` that gates `hwlocality` behind conditional compilation, providing a graceful UMA fallback when disabled. Bump `obikmer` to v1.1.17.
2026-06-23 08:59:52 +02:00
10 changed files with 173 additions and 454 deletions
Vendored
BIN
View File
Binary file not shown.
+2 -1
View File
@@ -1,8 +1,9 @@
name: CI name: CI
on: on:
pull_request: push:
branches: ['main'] branches: ['main']
pull_request:
jobs: jobs:
build: build:
+14 -10
View File
@@ -13,7 +13,7 @@ jobs:
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
with: with:
fetch-depth: 0 fetch-tags: true
- name: Create Gitea release - name: Create Gitea release
id: create id: create
@@ -86,11 +86,20 @@ jobs:
build-macos-arm64: build-macos-arm64:
needs: create-release needs: create-release
runs-on: ubuntu-latest runs-on: ubuntu-latest
defaults:
run:
working-directory: src
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Login to registry - name: Install Rust + zigbuild
run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ secrets.REGISTRYUSER }} --password-stdin run: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain stable
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
sudo apt-get update -qq && sudo apt-get install -y -qq jq
pip install ziglang --quiet --break-system-packages
$HOME/.cargo/bin/cargo install cargo-zigbuild
$HOME/.cargo/bin/rustup target add aarch64-apple-darwin
- name: Cache cargo registry - name: Cache cargo registry
uses: actions/cache@v4 uses: actions/cache@v4
@@ -103,12 +112,7 @@ jobs:
restore-keys: macos-arm64-cargo- restore-keys: macos-arm64-cargo-
- name: Build macOS binary - name: Build macOS binary
run: | run: cargo zigbuild --release --target aarch64-apple-darwin --no-default-features
docker run --rm \
-v "${{ github.workspace }}:/src" \
-w /src/src \
registry.metabarcoding.org/cibuilder/rustcrossosx:latest \
cargo build --release --target aarch64-apple-darwin --no-default-features
- name: Prepare and upload artifact - name: Prepare and upload artifact
env: env:
@@ -116,7 +120,7 @@ jobs:
RELEASE_ID: ${{ needs.create-release.outputs.release_id }} RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
run: | run: |
mkdir -p /tmp/dist mkdir -p /tmp/dist
cp src/target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64 cp target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
curl -s -X POST \ curl -s -X POST \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \ "${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
-H "Authorization: token $GITEA_TOKEN" \ -H "Authorization: token $GITEA_TOKEN" \
-1
View File
@@ -8,7 +8,6 @@ data-stress
*.pb *.pb
./**/*.json ./**/*.json
*.bin *.bin
*.log
Betula_exilis--IGA-24-33 Betula_exilis--IGA-24-33
benchmark/genomes benchmark/genomes
benchmark/simulated_data benchmark/simulated_data
+2 -3
View File
@@ -92,7 +92,6 @@ release: bump-version
git_hash=$$(jj log -r @ --no-graph -T 'commit_id'); \ git_hash=$$(jj log -r @ --no-graph -T 'commit_id'); \
commits=$$(jj log -r 'latest(tags())..@' --no-graph -T 'description ++ "\n"' 2>/dev/null || \ commits=$$(jj log -r 'latest(tags())..@' --no-graph -T 'description ++ "\n"' 2>/dev/null || \
jj log --no-graph -T 'description ++ "\n"' --limit 30); \ jj log --no-graph -T 'description ++ "\n"' --limit 30); \
notes=$$(printf 'Write concise markdown release notes for obikmer (a Rust kmer genomics tool). Be technical and direct. Base them strictly on these commit messages:\n\n%s' "$$commits" | aichat 2>/dev/null); \ notes=$$(printf 'Write concise markdown release notes for obikmer (a Rust kmer genomics tool). Be technical and direct. Base them strictly on these commit messages:\n\n%s' "$$commits" | aichat); \
tag_msg="$${notes:-Release v$$new_version}"; \ git tag -a "v$$new_version" -m "$$notes" "$$git_hash" && \
git tag -a "v$$new_version" -m "$$tag_msg" "$$git_hash" && \
git push origin "v$$new_version" git push origin "v$$new_version"
+2 -95
View File
@@ -162,107 +162,14 @@ A single `PartitionRunner` instance can be built once per command invocation
and reused across multiple `run()` calls (e.g. `merge` runs and reused across multiple `run()` calls (e.g. `merge` runs
`merge_partitions` then `pack_matrices`). `merge_partitions` then `pack_matrices`).
## Known issue: CPU-only activation signal stalls on I/O-bound stages
Observed on a real `filter` run (109 genomes, 256 partitions, 8×24-core NUMA):
`rebuild` (CPU-bound — k-mer construction) scales cleanly from 9 to 43 active
workers as `CpuSample::do_i_activate` (`obisys::lib.rs`) sees efficiency climb.
`pack_matrices` (I/O-bound — reopens and recomposes per-genome column files
into `.pbmx`/`.pcmx`) activates one extra worker then flatlines at 10/192 for
the rest of the stage, even though 256 partitions keep completing over several
minutes. This matches the documented intent (§ Adaptive mechanism — "avoids
over-provisioning ... I/O-bound ... workloads") but conflates two different
things: *"CPU is not the bottleneck"* and *"more workers would not help"*. On
storage with real queue depth (NVMe, RAID, parallel FS) the second stage could
still benefit from more concurrent workers even with flat CPU usage — a signal
the current mechanism cannot see.
A one-off artefact was also found in the same log: right after a stage
transition, `do_i_activate` produced a physically impossible spike (efficiency
~94 cores on a 192-core box) because it has no minimum-window guard — unlike
its sibling `cpu_efficiency`, which returns `0.0` if `wall < 0.1s`
(`obisys::lib.rs:260`). `do_i_activate` unconditionally overwrites
`self.wall`/`self.user_secs`/`self.sys_secs` even when the elapsed window is
too short to be meaningful, so a burst of rapid completions right after
activating a worker can divide a real CPU delta by a near-zero wall delta.
### Implemented: I/O signal + shared debounce guard
`IoSample` (`obisys::lib.rs`, alongside `CpuSample`) is fed by
`read_bytes`/`write_bytes` from `/proc/self/io` on Linux (actual bytes
submitted to the block layer — not `rchar`/`wchar`, which also count
page-cache hits, and not `ru_inblock`/`ru_oublock`, unreliable on macOS), with
a `proc_pid_rusage(RUSAGE_INFO_V4)` fallback on macOS
(`ri_diskio_bytesread`/`ri_diskio_byteswritten`, FFI only via `libc`, no new
dependency — same pattern as the existing `getrusage` bindings). Any other
target degrades gracefully to a signal that never triggers (falls back to
CPU-only activation), same pattern as `cgroup_v2_available`.
`maybe_activate` (`numa.rs`) activates a worker if *either* signal still shows
headroom, making `PartitionRunner` adapt to whichever resource is actually the
bottleneck without per-call configuration. Both samplers are called
unconditionally — no `||` short-circuit — so neither window starves behind
whichever signal fires first:
```rust
let cpu_wants_more = cpu_sample.do_i_activate(CPU_SPAWN_THRESHOLD);
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
if cpu_wants_more || io_wants_more {
activate_tx.send(()).ok();
...
}
```
Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit),
raw I/O throughput has no natural scale across devices, so `IoSample` uses a
**relative** growth threshold instead of an absolute one:
```rust
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let elapsed = self.wall.elapsed().as_secs_f64();
if elapsed < 0.1 { return false; } // state untouched — window keeps accumulating
let n = Self::read_bytes();
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
let activate = if self.previous_rate == 0.0 {
rate > 0.0 // bootstrap: any measured throughput is signal
} else {
(rate - self.previous_rate) / self.previous_rate >= threshold
};
self.bytes = n;
self.wall = Instant::now(); // reset only on a real sample
activate
}
```
The `elapsed < 0.1s → return false without mutating state` guard was also
back-ported into `CpuSample::do_i_activate` (previously missing — source of
the ~94-core artefact above) — one fix for both problems, and it removes the
need for any arbitrary I/O-rate floor: a short/noisy window is rejected
outright rather than papered over with a hardware-dependent constant.
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, both `0.2`)
are defined as `const` in `PartitionRunner::run` (`numa.rs`). The I/O value is
a starting point, not a derived one — needs empirical validation against a
real `pack` run.
Starting threshold: `0.2` (20 % relative growth) for `IoSample`, same order of
magnitude as the CPU threshold's *implicit* relative sensitivity (in the
observed log, an 8→9 worker step raised efficiency by ~12 %). This is a
starting point, not a derived value — I/O throughput is lumpier than CPU time
(buffered writes flush in bursts), so it needs empirical validation against a
real `pack` run before being considered final.
## Open questions ## Open questions
- **Error handling**: `run` currently returns the first error; remaining errors - **Error handling**: `run` currently returns the first error; remaining errors
are dropped. A `Vec<E>` return would give complete diagnostics. are dropped. A `Vec<E>` return would give complete diagnostics.
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated - **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
for merge on BeeGFS. Superseded by the I/O signal above for the "more for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from
workers would help despite flat CPU" case — a per-call override may still be a higher value. A per-call override could be added to the API.
worth keeping as a manual escape hatch.
- **`on_done` ordering**: the runner serialises calls to `on_done` via an - **`on_done` ordering**: the runner serialises calls to `on_done` via an
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
+1 -1
View File
@@ -1704,7 +1704,7 @@ dependencies = [
[[package]] [[package]]
name = "obikmer" name = "obikmer"
version = "1.1.33" version = "1.1.17"
dependencies = [ dependencies = [
"clap", "clap",
"csv", "csv",
+50 -38
View File
@@ -20,7 +20,7 @@ use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet; use hwlocality::cpu::cpuset::CpuSet;
#[cfg(feature = "numa")] #[cfg(feature = "numa")]
use hwlocality::object::types::ObjectType; use hwlocality::object::types::ObjectType;
use obisys::{CpuSample, IoSample}; use obisys::CpuSample;
use tracing::debug; use tracing::debug;
// ── Public interface ────────────────────────────────────────────────────────── // ── Public interface ──────────────────────────────────────────────────────────
@@ -190,13 +190,10 @@ impl PartitionRunner {
/// Run `f(i)` for every index in `order`. /// Run `f(i)` for every index in `order`.
/// ///
/// Workers are pre-spawned dormant and activated adaptively. A timer thread /// Workers are pre-spawned dormant and activated adaptively. A timer thread
/// fires an efficiency check every `TIMER_SECS` seconds; each completed /// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed
/// partition resets that timer (forcing an immediate check) and also /// partition resets that timer (forcing an immediate check) and also
/// triggers its own inline check. A new worker is activated whenever CPU /// triggers its own inline check. A new worker is activated whenever
/// efficiency grows by at least `CPU_SPAWN_THRESHOLD` (absolute, in cores) /// efficiency falls below `SPAWN_THRESHOLD`.
/// or I/O throughput grows by at least `IO_SPAWN_THRESHOLD` (relative) since
/// the last check — whichever resource is the actual bottleneck still shows
/// headroom.
/// ///
/// `on_done(i, result, elapsed)` is called from the controller thread as /// `on_done(i, result, elapsed)` is called from the controller thread as
/// each partition completes — suitable for progress bars and result /// each partition completes — suitable for progress bars and result
@@ -220,9 +217,12 @@ impl PartitionRunner {
return Ok(()); return Ok(());
} }
const CPU_SPAWN_THRESHOLD: f64 = 0.2; const SPAWN_THRESHOLD: f64 = 0.95;
const IO_SPAWN_THRESHOLD: f64 = 0.2; const TIMER_SECS: u64 = 30;
const TIMER_SECS: u64 = 30;
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
// ── Channels ────────────────────────────────────────────────────────── // ── Channels ──────────────────────────────────────────────────────────
let (part_tx, part_rx) = unbounded::<usize>(); let (part_tx, part_rx) = unbounded::<usize>();
@@ -287,12 +287,11 @@ impl PartitionRunner {
drop(event_tx); drop(event_tx);
// ── Controller ──────────────────────────────────────────────────── // ── Controller ────────────────────────────────────────────────────
let initial_workers = n_nodes.min(max_workers).min(n_total); activate_tx.send(()).ok();
for _ in 0..initial_workers { activate_tx.send(()).ok(); } let mut n_active = 1usize;
let mut n_active = initial_workers; let mut cpu_sample = CpuSample::now();
let mut cpu_sample = CpuSample::now(); let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate
let mut io_sample = IoSample::now(); let mut completed = 0usize;
let mut completed = 0usize;
while completed < n_total { while completed < n_total {
let Ok(event) = event_rx.recv() else { break }; let Ok(event) = event_rx.recv() else { break };
@@ -308,17 +307,15 @@ impl PartitionRunner {
// Inline check: same logic as a timer tick. // Inline check: same logic as a timer tick.
maybe_activate( maybe_activate(
&activate_tx, &mut n_active, max_workers, &activate_tx, &mut n_active, max_workers,
&mut cpu_sample, CPU_SPAWN_THRESHOLD, &mut cpu_sample, &mut eff_at_last_spawn,
&mut io_sample, IO_SPAWN_THRESHOLD, n_cores, SPAWN_THRESHOLD, completed, n_total,
completed, n_total,
); );
} }
WorkerEvent::TimerTick => { WorkerEvent::TimerTick => {
maybe_activate( maybe_activate(
&activate_tx, &mut n_active, max_workers, &activate_tx, &mut n_active, max_workers,
&mut cpu_sample, CPU_SPAWN_THRESHOLD, &mut cpu_sample, &mut eff_at_last_spawn,
&mut io_sample, IO_SPAWN_THRESHOLD, n_cores, SPAWN_THRESHOLD, completed, n_total,
completed, n_total,
); );
} }
} }
@@ -345,27 +342,42 @@ enum WorkerEvent<R, E> {
} }
fn maybe_activate( fn maybe_activate(
activate_tx: &crossbeam_channel::Sender<()>, activate_tx: &crossbeam_channel::Sender<()>,
n_active: &mut usize, n_active: &mut usize,
max_workers: usize, max_workers: usize,
cpu_sample: &mut CpuSample, cpu_sample: &mut CpuSample,
cpu_threshold: f64, eff_at_last_spawn: &mut f64,
io_sample: &mut IoSample, n_cores: usize,
io_threshold: f64, threshold: f64,
completed: usize, completed: usize,
n_total: usize, n_total: usize,
) { ) {
if *n_active >= max_workers || completed >= n_total { return; } if *n_active >= max_workers || completed >= n_total { return; }
// Call both unconditionally (no `||` short-circuit): each sampler must let eff = cpu_sample.cpu_efficiency(n_cores);
// advance its own window every tick, regardless of what the other one if eff >= threshold { return; } // CPU already saturated
// reports, or it would starve behind whichever signal fires first.
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
let io_wants_more = io_sample.do_i_activate(io_threshold);
if cpu_wants_more || io_wants_more { // Check that the previous activation was beneficial enough.
// Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1).
// For the very first extra worker (n_active == 1, no previous spawn), skip this
// check: eff_at_last_spawn == 0 acts as the sentinel.
let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 {
true // first additional worker: no prior data to evaluate
} else {
let k_before = (*n_active - 1) as f64;
let min_speedup = (k_before + 0.2) / k_before;
let actual_speedup = eff / *eff_at_last_spawn;
actual_speedup >= min_speedup
};
if last_spawn_was_beneficial {
activate_tx.send(()).ok(); activate_tx.send(()).ok();
*eff_at_last_spawn = eff;
*n_active += 1; *n_active += 1;
debug!("activated worker {}/{}", n_active, max_workers); *cpu_sample = CpuSample::now();
debug!(
"activated worker {}/{} — efficiency {:.0}%",
n_active, max_workers, eff * 100.0,
);
} }
} }
+2 -4
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "obikmer" name = "obikmer"
version = "1.1.33" version = "1.1.17"
edition = "2024" edition = "2024"
[[bin]] [[bin]]
@@ -18,7 +18,7 @@ obikrope = { path = "../obikrope" }
obikpartitionner = { path = "../obikpartitionner" } obikpartitionner = { path = "../obikpartitionner" }
obisys = { path = "../obisys" } obisys = { path = "../obisys" }
obiskio = { path = "../obiskio" } obiskio = { path = "../obiskio" }
obikindex = { path = "../obikindex", default-features = false } obikindex = { path = "../obikindex" }
obitaxonomy = { path = "../obitaxonomy" } obitaxonomy = { path = "../obitaxonomy" }
obilayeredmap = { path = "../obilayeredmap" } obilayeredmap = { path = "../obilayeredmap" }
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
@@ -33,6 +33,4 @@ tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
pprof = { version = "0.13", features = ["prost-codec"], optional = true } pprof = { version = "0.13", features = ["prost-codec"], optional = true }
[features] [features]
default = ["numa"]
numa = ["obikindex/numa"]
profiling = ["dep:pprof"] profiling = ["dep:pprof"]
+100 -301
View File
@@ -4,7 +4,7 @@ use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
use tracing::{debug, info, warn}; use tracing::{info, warn};
const BRAILLE: &[&str] = &["", "", "", "", "", "", "", "", "", ""]; const BRAILLE: &[&str] = &["", "", "", "", "", "", "", "", "", ""];
@@ -14,25 +14,24 @@ const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧
/// a TTY (e.g. HPC job logs): every 10% for bounded bars, every ~10 s for /// a TTY (e.g. HPC job logs): every 10% for bounded bars, every ~10 s for
/// spinners (throttled on `set_message`). /// spinners (throttled on `set_message`).
pub struct TracedBar { pub struct TracedBar {
pb: ProgressBar, pb: ProgressBar,
label: String, label: String,
unit: String, unit: String,
total: u64, // 0 for spinners total: u64, // 0 for spinners
start: Instant, // creation time, for spinner throttling start: Instant, // creation time, for spinner throttling
last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet
last_log_ms: AtomicU64, // ms since `start` at last spinner log last_log_ms: AtomicU64, // ms since `start` at last spinner log
} }
impl TracedBar { impl TracedBar {
pub fn inc(&self, delta: u64) { pub fn inc(&self, delta: u64) {
self.pb.inc(delta); self.pb.inc(delta);
if self.pb.is_hidden() && self.total > 0 { if self.pb.is_hidden() && self.total > 0 {
let pos = self.pb.position(); let pos = self.pb.position();
let pct10 = (pos * 10) / self.total; // 0..=10 let pct10 = (pos * 10) / self.total; // 0..=10
let last = self.last_pct.load(Ordering::Relaxed); let last = self.last_pct.load(Ordering::Relaxed);
if pct10 > last if pct10 > last
&& self && self.last_pct
.last_pct
.compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed) .compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed)
.is_ok() .is_ok()
{ {
@@ -50,14 +49,14 @@ impl TracedBar {
let msg = msg.into(); let msg = msg.into();
if self.pb.is_hidden() { if self.pb.is_hidden() {
if self.total > 0 { if self.total > 0 {
debug!(stage = %self.label, "{msg}"); // bounded bar: always log (already rate-limited by 10% threshold in inc)
info!(stage = %self.label, "{msg}");
} else { } else {
// spinner: throttle to ~10 s // spinner: throttle to ~10 s
let now_ms = self.start.elapsed().as_millis() as u64; let now_ms = self.start.elapsed().as_millis() as u64;
let last = self.last_log_ms.load(Ordering::Relaxed); let last = self.last_log_ms.load(Ordering::Relaxed);
if now_ms >= last + 10_000 if now_ms >= last + 10_000
&& self && self.last_log_ms
.last_log_ms
.compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed) .compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
.is_ok() .is_ok()
{ {
@@ -84,13 +83,8 @@ pub fn spinner(label: &str) -> TracedBar {
); );
pb.enable_steady_tick(Duration::from_millis(100)); pb.enable_steady_tick(Duration::from_millis(100));
TracedBar { TracedBar {
pb, pb, label: label.to_string(), unit: String::new(), total: 0,
label: label.to_string(), start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
unit: String::new(),
total: 0,
start: Instant::now(),
last_pct: AtomicU64::new(0),
last_log_ms: AtomicU64::new(0),
} }
} }
@@ -107,13 +101,8 @@ pub fn progress_bar(label: &str, n: u64, unit: &str) -> TracedBar {
); );
pb.enable_steady_tick(Duration::from_millis(100)); pb.enable_steady_tick(Duration::from_millis(100));
TracedBar { TracedBar {
pb, pb, label: label.to_string(), unit: unit.to_string(), total: n,
label: label.to_string(), start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
unit: unit.to_string(),
total: n,
start: Instant::now(),
last_pct: AtomicU64::new(0),
last_log_ms: AtomicU64::new(0),
} }
} }
@@ -215,19 +204,13 @@ fn tv_to_secs(tv: timeval) -> f64 {
} }
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
fn rss_to_bytes(ru: &rusage) -> u64 { fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 }
ru.ru_maxrss as u64
}
#[cfg(not(target_os = "macos"))] #[cfg(not(target_os = "macos"))]
fn rss_to_bytes(ru: &rusage) -> u64 { fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
ru.ru_maxrss as u64 * 1024
}
// Monotonically increasing counters — negative delta would be a kernel bug. // Monotonically increasing counters — negative delta would be a kernel bug.
fn delta(end: i64, start: i64) -> u64 { fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
(end - start).max(0) as u64
}
// ── CpuSample ───────────────────────────────────────────────────────────────── // ── CpuSample ─────────────────────────────────────────────────────────────────
@@ -235,151 +218,31 @@ fn delta(end: i64, start: i64) -> u64 {
/// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of /// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of
/// available cores used since the snapshot was taken. /// available cores used since the snapshot was taken.
pub struct CpuSample { pub struct CpuSample {
wall: Instant, wall: Instant,
user_secs: f64, user_secs: f64,
sys_secs: f64, sys_secs: f64,
previous: f64,
} }
impl CpuSample { impl CpuSample {
pub fn now() -> Self { pub fn now() -> Self {
let ru = get_rusage(); let ru = get_rusage();
Self { Self {
wall: Instant::now(), wall: Instant::now(),
user_secs: tv_to_secs(ru.ru_utime), user_secs: tv_to_secs(ru.ru_utime),
sys_secs: tv_to_secs(ru.ru_stime), sys_secs: tv_to_secs(ru.ru_stime),
previous: 0.0,
} }
} }
/// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot. /// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot.
/// Returns 0.0 if less than 100 ms have elapsed (too noisy). /// Returns 0.0 if less than 100 ms have elapsed (too noisy).
pub fn cpu_efficiency(&self, n_cores: usize) -> f64 { pub fn cpu_efficiency(&self, n_cores: usize) -> f64 {
let ru = get_rusage(); let ru = get_rusage();
let wall = self.wall.elapsed().as_secs_f64(); let wall = self.wall.elapsed().as_secs_f64();
if wall < 0.1 { if wall < 0.1 { return 0.0; }
return 0.0; let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs)
} + (tv_to_secs(ru.ru_stime) - self.sys_secs);
let cpu =
(tv_to_secs(ru.ru_utime) - self.user_secs) + (tv_to_secs(ru.ru_stime) - self.sys_secs);
cpu / (wall * n_cores as f64) cpu / (wall * n_cores as f64)
} }
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let delta_wall = self.wall.elapsed().as_secs_f64();
if delta_wall < 0.1 {
// Window too short to be meaningful — leave state untouched so it
// keeps accumulating until a real sample can be taken.
return false;
}
let n = CpuSample::now();
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
let efficiency = delta_ru / delta_wall;
let activate = 0f64.max(efficiency - self.previous) >= threshold;
debug!(
"Do I activate : {} -> {} = {} Activate: {}",
self.previous,
efficiency,
0f64.max(efficiency - self.previous),
activate
);
self.previous = efficiency;
self.user_secs = n.user_secs;
self.sys_secs = n.sys_secs;
self.wall = n.wall;
activate
}
}
// ── IoSample ──────────────────────────────────────────────────────────────────
/// Snapshot of process-wide block I/O (bytes read + written) + wall clock.
///
/// Same activation protocol as [`CpuSample`], but the growth check in
/// [`do_i_activate`](Self::do_i_activate) is *relative* rather than absolute:
/// raw I/O throughput has no portable scale across storage devices, unlike a
/// core count.
pub struct IoSample {
wall: Instant,
bytes: u64,
previous_rate: f64,
}
impl IoSample {
pub fn now() -> Self {
Self {
wall: Instant::now(),
bytes: Self::read_bytes(),
previous_rate: 0.0,
}
}
/// Bytes actually submitted to the block layer (read + write), summed
/// process-wide. Returns 0 if unavailable — degrades gracefully to a
/// signal that never triggers activation (CPU-only heuristic).
#[cfg(target_os = "linux")]
fn read_bytes() -> u64 {
let Ok(io) = std::fs::read_to_string("/proc/self/io") else {
return 0;
};
io.lines()
.filter_map(|l| {
l.strip_prefix("read_bytes: ")
.or_else(|| l.strip_prefix("write_bytes: "))
})
.filter_map(|v| v.trim().parse::<u64>().ok())
.sum()
}
#[cfg(target_os = "macos")]
fn read_bytes() -> u64 {
use libc::{RUSAGE_INFO_V4, getpid, proc_pid_rusage, rusage_info_v4};
let mut info: rusage_info_v4 = unsafe { std::mem::zeroed() };
let ret =
unsafe { proc_pid_rusage(getpid(), RUSAGE_INFO_V4, &mut info as *mut _ as *mut _) };
if ret != 0 {
return 0;
}
info.ri_diskio_bytesread + info.ri_diskio_byteswritten
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn read_bytes() -> u64 {
0
}
/// Same protocol as [`CpuSample::do_i_activate`] (0.1 s minimum window,
/// state untouched on early return), but growth is measured relative to
/// the previous rate. `threshold` is a fraction, e.g. `0.2` for a 20 %
/// increase in throughput since the last real sample.
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let elapsed = self.wall.elapsed().as_secs_f64();
if elapsed < 0.1 {
return false;
}
let n = Self::read_bytes();
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
let activate = if self.previous_rate == 0.0 {
rate > 0.0 // bootstrap: any measured throughput is signal enough
} else {
(rate - self.previous_rate) / self.previous_rate >= threshold
};
debug!(
"Do I activate (I/O) : {} -> {} Activate: {}",
self.previous_rate, rate, activate
);
self.previous_rate = rate;
self.bytes = n;
self.wall = Instant::now();
activate
}
} }
// ── public API ──────────────────────────────────────────────────────────────── // ── public API ────────────────────────────────────────────────────────────────
@@ -388,37 +251,33 @@ impl IoSample {
#[must_use = "call .stop() to record the stage"] #[must_use = "call .stop() to record the stage"]
pub struct Stage { pub struct Stage {
label: String, label: String,
wall: Instant, wall: Instant,
ru: rusage, ru: rusage,
} }
impl Stage { impl Stage {
pub fn start(label: impl Into<String>) -> Self { pub fn start(label: impl Into<String>) -> Self {
let label = label.into(); let label = label.into();
info!(stage = %label, "started"); info!(stage = %label, "started");
Self { Self { label, wall: Instant::now(), ru: get_rusage() }
label,
wall: Instant::now(),
ru: get_rusage(),
}
} }
pub fn stop(self) -> StageStats { pub fn stop(self) -> StageStats {
let wall_secs = self.wall.elapsed().as_secs_f64(); let wall_secs = self.wall.elapsed().as_secs_f64();
let end = get_rusage(); let end = get_rusage();
let stats = StageStats { let stats = StageStats {
label: self.label, label: self.label,
wall_secs, wall_secs,
user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime), user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime),
sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime), sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime),
max_rss_bytes: rss_to_bytes(&end), max_rss_bytes: rss_to_bytes(&end),
minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64), minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64),
major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64), major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64),
vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64), vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64),
invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64), invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64),
in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64), in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64),
out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64), out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64),
swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64), swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64),
}; };
info!( info!(
stage = %stats.label, stage = %stats.label,
@@ -440,30 +299,27 @@ impl Stage {
/// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas. /// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas.
pub struct StageStats { pub struct StageStats {
pub label: String, pub label: String,
pub wall_secs: f64, pub wall_secs: f64,
pub user_secs: f64, pub user_secs: f64,
pub sys_secs: f64, pub sys_secs: f64,
/// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum, /// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum,
/// so this reflects the high-water mark up to and including this stage. /// so this reflects the high-water mark up to and including this stage.
pub max_rss_bytes: u64, pub max_rss_bytes: u64,
pub minor_faults: u64, pub minor_faults: u64,
pub major_faults: u64, pub major_faults: u64,
pub vol_ctx: u64, // voluntary context switches pub vol_ctx: u64, // voluntary context switches
pub invol_ctx: u64, // involuntary context switches pub invol_ctx: u64, // involuntary context switches
pub in_blocks: u64, // filesystem block reads (after page cache) pub in_blocks: u64, // filesystem block reads (after page cache)
pub out_blocks: u64, // filesystem block writes pub out_blocks: u64, // filesystem block writes
pub swaps: u64, pub swaps: u64,
} }
impl StageStats { impl StageStats {
/// (user + sys) / wall — effective thread count utilisation. /// (user + sys) / wall — effective thread count utilisation.
pub fn parallelism(&self) -> f64 { pub fn parallelism(&self) -> f64 {
if self.wall_secs > 1e-9 { if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs }
(self.user_secs + self.sys_secs) / self.wall_secs else { 0.0 }
} else {
0.0
}
} }
/// parallelism / n_cores — fraction of available CPU power used (0..1+). /// parallelism / n_cores — fraction of available CPU power used (0..1+).
@@ -479,33 +335,25 @@ pub struct Reporter {
} }
impl Reporter { impl Reporter {
pub fn new() -> Self { pub fn new() -> Self { Self::default() }
Self::default() pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); }
} pub fn stages(&self) -> &[StageStats] { &self.stages }
pub fn push(&mut self, stats: StageStats) {
self.stages.push(stats);
}
pub fn stages(&self) -> &[StageStats] {
&self.stages
}
/// Print the summary to stderr. /// Print the summary to stderr.
pub fn print(&self) { pub fn print(&self) { eprint!("{self}"); }
eprint!("{self}");
}
} }
// ── diagnosis ───────────────────────────────────────────────────────────────── // ── diagnosis ─────────────────────────────────────────────────────────────────
struct Diagnosis { struct Diagnosis {
tag: &'static str, tag: &'static str,
detail: Option<String>, detail: Option<String>,
} }
// Thresholds are intentionally conservative to avoid false positives. // Thresholds are intentionally conservative to avoid false positives.
fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis { fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
let eff = s.efficiency(n_cores); let eff = s.efficiency(n_cores);
let cpu_pct = eff * 100.0; let cpu_pct = eff * 100.0;
let io_ops = s.in_blocks + s.out_blocks; let io_ops = s.in_blocks + s.out_blocks;
// swaps > 0 is the only reliable cross-platform indicator of true RAM exhaustion. // swaps > 0 is the only reliable cross-platform indicator of true RAM exhaustion.
// ru_majflt is intentionally excluded: on macOS it counts all file-backed mmap // ru_majflt is intentionally excluded: on macOS it counts all file-backed mmap
@@ -539,43 +387,26 @@ fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
)), )),
}; };
} }
Diagnosis { Diagnosis { tag: "", detail: None }
tag: "",
detail: None,
}
} }
// ── display helpers ─────────────────────────────────────────────────────────── // ── display helpers ───────────────────────────────────────────────────────────
fn fmt_secs(s: f64) -> String { fn fmt_secs(s: f64) -> String {
if s >= 100.0 { if s >= 100.0 { format!("{:.0}s", s) }
format!("{:.0}s", s) else if s >= 10.0 { format!("{:.1}s", s) }
} else if s >= 10.0 { else if s >= 1.0 { format!("{:.2}s", s) }
format!("{:.1}s", s) else { format!("{:.0}ms", s * 1000.0) }
} else if s >= 1.0 {
format!("{:.2}s", s)
} else {
format!("{:.0}ms", s * 1000.0)
}
} }
fn fmt_bytes(b: u64) -> String { fn fmt_bytes(b: u64) -> String {
if b >= 1 << 30 { if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) }
} else if b >= 1 << 20 { else { format!("{:.0} KB", b as f64 / 1024.0) }
format!("{:.0} MB", b as f64 / (1u64 << 20) as f64)
} else {
format!("{:.0} KB", b as f64 / 1024.0)
}
} }
fn fmt_efficiency(par: f64, n_cores: usize) -> String { fn fmt_efficiency(par: f64, n_cores: usize) -> String {
format!( format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0)
"{:.1}×/{} ({:.0}%)",
par,
n_cores,
par / n_cores as f64 * 100.0
)
} }
// ── Display ─────────────────────────────────────────────────────────────────── // ── Display ───────────────────────────────────────────────────────────────────
@@ -583,8 +414,8 @@ fn fmt_efficiency(par: f64, n_cores: usize) -> String {
// ── MemoryBudget ────────────────────────────────────────────────────────────── // ── MemoryBudget ──────────────────────────────────────────────────────────────
struct BudgetInner { struct BudgetInner {
remaining: u64, remaining: u64,
active: usize, active: usize,
peak_active: usize, peak_active: usize,
} }
@@ -594,8 +425,8 @@ struct BudgetInner {
/// completion. Non-deadlock guarantee: when no worker is active the next /// completion. Non-deadlock guarantee: when no worker is active the next
/// acquire always succeeds regardless of cost vs. remaining budget. /// acquire always succeeds regardless of cost vs. remaining budget.
pub struct MemoryBudget { pub struct MemoryBudget {
total: u64, total: u64,
inner: Mutex<BudgetInner>, inner: Mutex<BudgetInner>,
condvar: Condvar, condvar: Condvar,
} }
@@ -603,11 +434,7 @@ impl MemoryBudget {
pub fn new(total: u64) -> Self { pub fn new(total: u64) -> Self {
Self { Self {
total, total,
inner: Mutex::new(BudgetInner { inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }),
remaining: total,
active: 0,
peak_active: 0,
}),
condvar: Condvar::new(), condvar: Condvar::new(),
} }
} }
@@ -616,9 +443,9 @@ impl MemoryBudget {
let mut g = self.inner.lock().unwrap(); let mut g = self.inner.lock().unwrap();
loop { loop {
if g.active == 0 || g.remaining >= cost { if g.active == 0 || g.remaining >= cost {
g.remaining = g.remaining.saturating_sub(cost); g.remaining = g.remaining.saturating_sub(cost);
g.active += 1; g.active += 1;
g.peak_active = g.peak_active.max(g.active); g.peak_active = g.peak_active.max(g.active);
return; return;
} }
g = self.condvar.wait(g).unwrap(); g = self.condvar.wait(g).unwrap();
@@ -628,66 +455,47 @@ impl MemoryBudget {
pub fn release(&self, cost: u64) { pub fn release(&self, cost: u64) {
let mut g = self.inner.lock().unwrap(); let mut g = self.inner.lock().unwrap();
g.remaining = (g.remaining + cost).min(self.total); g.remaining = (g.remaining + cost).min(self.total);
g.active -= 1; g.active -= 1;
self.condvar.notify_all(); self.condvar.notify_all();
} }
pub fn total(&self) -> u64 { pub fn total(&self) -> u64 { self.total }
self.total pub fn active(&self) -> usize { self.inner.lock().unwrap().active }
} pub fn remaining(&self) -> u64 { self.inner.lock().unwrap().remaining }
pub fn active(&self) -> usize { pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active }
self.inner.lock().unwrap().active
}
pub fn remaining(&self) -> u64 {
self.inner.lock().unwrap().remaining
}
pub fn peak_active(&self) -> usize {
self.inner.lock().unwrap().peak_active
}
} }
// ── Display ─────────────────────────────────────────────────────────────────── // ── Display ───────────────────────────────────────────────────────────────────
impl fmt::Display for Reporter { impl fmt::Display for Reporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.stages.is_empty() { if self.stages.is_empty() { return Ok(()); }
return Ok(());
}
let n_cores = std::thread::available_parallelism() let n_cores = std::thread::available_parallelism()
.map(|n| n.get()) .map(|n| n.get())
.unwrap_or(1); .unwrap_or(1);
// column widths // column widths
let nw = self let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5);
.stages
.iter()
.map(|s| s.label.len())
.max()
.unwrap_or(5)
.max(5);
// efficiency col: worst-case width for this run's n_cores value // efficiency col: worst-case width for this run's n_cores value
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len(); let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12; let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12;
let sep = "".repeat(sep_w); let sep = "".repeat(sep_w);
// header // header
writeln!( writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} status",
f, "stage", "wall", "efficiency", "peak RSS")?;
"{:<nw$} {:>7} {:>ew$} {:>8} status",
"stage", "wall", "efficiency", "peak RSS"
)?;
writeln!(f, "{sep}")?; writeln!(f, "{sep}")?;
// compute all diagnoses up front (needed for both table and footnotes) // compute all diagnoses up front (needed for both table and footnotes)
let diagnoses: Vec<Diagnosis> = self.stages.iter().map(|s| diagnose(s, n_cores)).collect(); let diagnoses: Vec<Diagnosis> = self.stages.iter()
.map(|s| diagnose(s, n_cores))
.collect();
// per-stage rows // per-stage rows
for (s, d) in self.stages.iter().zip(diagnoses.iter()) { for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
writeln!( writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} {}",
f,
"{:<nw$} {:>7} {:>ew$} {:>8} {}",
s.label, s.label,
fmt_secs(s.wall_secs), fmt_secs(s.wall_secs),
fmt_efficiency(s.parallelism(), n_cores), fmt_efficiency(s.parallelism(), n_cores),
@@ -697,21 +505,14 @@ impl fmt::Display for Reporter {
} }
// totals // totals
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>(); let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>(); let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>(); let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
let trss = self let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0);
.stages
.iter()
.map(|s| s.max_rss_bytes)
.max()
.unwrap_or(0);
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 }; let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
writeln!(f, "{sep}")?; writeln!(f, "{sep}")?;
writeln!( writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8}",
f,
"{:<nw$} {:>7} {:>ew$} {:>8}",
"TOTAL", "TOTAL",
fmt_secs(tw), fmt_secs(tw),
fmt_efficiency(tpar, n_cores), fmt_efficiency(tpar, n_cores),
@@ -719,9 +520,7 @@ impl fmt::Display for Reporter {
)?; )?;
// bottleneck footnotes (only if at least one anomaly detected) // bottleneck footnotes (only if at least one anomaly detected)
let bottlenecks: Vec<(&str, &str)> = self let bottlenecks: Vec<(&str, &str)> = self.stages.iter()
.stages
.iter()
.zip(diagnoses.iter()) .zip(diagnoses.iter())
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det))) .filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
.collect(); .collect();