Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c612132763 | |||
| 19660f8cd0 | |||
| 7b07540a69 | |||
| 89c43e28f5 | |||
| b9b2e42ad2 | |||
| ca42fdff2f | |||
| 136cd89efb | |||
| a4bbf607b7 | |||
| 9927100a1c | |||
| 527258f822 | |||
| ef62f1947e | |||
| d02316dcf6 | |||
| c323b3eaef | |||
| b77d8e9ca0 | |||
| 7c5bab3694 | |||
| fab4e0d6de | |||
| 973a3f3d6e | |||
| 1a839a295a |
@@ -1,9 +1,8 @@
|
||||
name: CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: ['main']
|
||||
pull_request:
|
||||
branches: ['main']
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
@@ -13,7 +13,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-tags: true
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Create Gitea release
|
||||
id: create
|
||||
@@ -86,20 +86,11 @@ jobs:
|
||||
build-macos-arm64:
|
||||
needs: create-release
|
||||
runs-on: ubuntu-latest
|
||||
defaults:
|
||||
run:
|
||||
working-directory: src
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Install Rust + zigbuild
|
||||
run: |
|
||||
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain stable
|
||||
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
|
||||
sudo apt-get update -qq && sudo apt-get install -y -qq jq
|
||||
pip install ziglang --quiet --break-system-packages
|
||||
$HOME/.cargo/bin/cargo install cargo-zigbuild
|
||||
$HOME/.cargo/bin/rustup target add aarch64-apple-darwin
|
||||
- name: Login to registry
|
||||
run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ github.actor }} --password-stdin
|
||||
|
||||
- name: Cache cargo registry
|
||||
uses: actions/cache@v4
|
||||
@@ -112,7 +103,12 @@ jobs:
|
||||
restore-keys: macos-arm64-cargo-
|
||||
|
||||
- name: Build macOS binary
|
||||
run: cargo zigbuild --release --target aarch64-apple-darwin --no-default-features
|
||||
run: |
|
||||
docker run --rm \
|
||||
-v "${{ github.workspace }}:/src" \
|
||||
-w /src/src \
|
||||
registry.metabarcoding.org/cibuilder/rustcrossosx:latest \
|
||||
cargo build --release --target aarch64-apple-darwin --no-default-features
|
||||
|
||||
- name: Prepare and upload artifact
|
||||
env:
|
||||
@@ -120,7 +116,7 @@ jobs:
|
||||
RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
|
||||
run: |
|
||||
mkdir -p /tmp/dist
|
||||
cp target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
|
||||
cp src/target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
|
||||
curl -s -X POST \
|
||||
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
|
||||
-H "Authorization: token $GITEA_TOKEN" \
|
||||
|
||||
@@ -8,6 +8,7 @@ data-stress
|
||||
*.pb
|
||||
./**/*.json
|
||||
*.bin
|
||||
*.log
|
||||
Betula_exilis--IGA-24-33
|
||||
benchmark/genomes
|
||||
benchmark/simulated_data
|
||||
|
||||
@@ -92,6 +92,7 @@ release: bump-version
|
||||
git_hash=$$(jj log -r @ --no-graph -T 'commit_id'); \
|
||||
commits=$$(jj log -r 'latest(tags())..@' --no-graph -T 'description ++ "\n"' 2>/dev/null || \
|
||||
jj log --no-graph -T 'description ++ "\n"' --limit 30); \
|
||||
notes=$$(printf 'Write concise markdown release notes for obikmer (a Rust kmer genomics tool). Be technical and direct. Base them strictly on these commit messages:\n\n%s' "$$commits" | aichat); \
|
||||
git tag -a "v$$new_version" -m "$$notes" "$$git_hash" && \
|
||||
notes=$$(printf 'Write concise markdown release notes for obikmer (a Rust kmer genomics tool). Be technical and direct. Base them strictly on these commit messages:\n\n%s' "$$commits" | aichat 2>/dev/null); \
|
||||
tag_msg="$${notes:-Release v$$new_version}"; \
|
||||
git tag -a "v$$new_version" -m "$$tag_msg" "$$git_hash" && \
|
||||
git push origin "v$$new_version"
|
||||
|
||||
Generated
+1
-1
@@ -1704,7 +1704,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "obikmer"
|
||||
version = "1.1.17"
|
||||
version = "1.1.29"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"csv",
|
||||
|
||||
+17
-46
@@ -217,13 +217,9 @@ impl PartitionRunner {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
const SPAWN_THRESHOLD: f64 = 0.95;
|
||||
const SPAWN_THRESHOLD: f64 = 0.2;
|
||||
const TIMER_SECS: u64 = 30;
|
||||
|
||||
let n_cores = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
|
||||
// ── Channels ──────────────────────────────────────────────────────────
|
||||
let (part_tx, part_rx) = unbounded::<usize>();
|
||||
let (activate_tx, activate_rx) = unbounded::<()>();
|
||||
@@ -287,11 +283,11 @@ impl PartitionRunner {
|
||||
drop(event_tx);
|
||||
|
||||
// ── Controller ────────────────────────────────────────────────────
|
||||
activate_tx.send(()).ok();
|
||||
let mut n_active = 1usize;
|
||||
let mut cpu_sample = CpuSample::now();
|
||||
let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate
|
||||
let mut completed = 0usize;
|
||||
let initial_workers = n_nodes.min(max_workers).min(n_total);
|
||||
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
|
||||
let mut n_active = initial_workers;
|
||||
let mut cpu_sample = CpuSample::now();
|
||||
let mut completed = 0usize;
|
||||
|
||||
while completed < n_total {
|
||||
let Ok(event) = event_rx.recv() else { break };
|
||||
@@ -307,15 +303,13 @@ impl PartitionRunner {
|
||||
// Inline check: same logic as a timer tick.
|
||||
maybe_activate(
|
||||
&activate_tx, &mut n_active, max_workers,
|
||||
&mut cpu_sample, &mut eff_at_last_spawn,
|
||||
n_cores, SPAWN_THRESHOLD, completed, n_total,
|
||||
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
|
||||
);
|
||||
}
|
||||
WorkerEvent::TimerTick => {
|
||||
maybe_activate(
|
||||
&activate_tx, &mut n_active, max_workers,
|
||||
&mut cpu_sample, &mut eff_at_last_spawn,
|
||||
n_cores, SPAWN_THRESHOLD, completed, n_total,
|
||||
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -342,42 +336,19 @@ enum WorkerEvent<R, E> {
|
||||
}
|
||||
|
||||
fn maybe_activate(
|
||||
activate_tx: &crossbeam_channel::Sender<()>,
|
||||
n_active: &mut usize,
|
||||
max_workers: usize,
|
||||
cpu_sample: &mut CpuSample,
|
||||
eff_at_last_spawn: &mut f64,
|
||||
n_cores: usize,
|
||||
threshold: f64,
|
||||
completed: usize,
|
||||
n_total: usize,
|
||||
activate_tx: &crossbeam_channel::Sender<()>,
|
||||
n_active: &mut usize,
|
||||
max_workers: usize,
|
||||
cpu_sample: &mut CpuSample,
|
||||
threshold: f64,
|
||||
completed: usize,
|
||||
n_total: usize,
|
||||
) {
|
||||
if *n_active >= max_workers || completed >= n_total { return; }
|
||||
|
||||
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||||
if eff >= threshold { return; } // CPU already saturated
|
||||
|
||||
// Check that the previous activation was beneficial enough.
|
||||
// Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1).
|
||||
// For the very first extra worker (n_active == 1, no previous spawn), skip this
|
||||
// check: eff_at_last_spawn == 0 acts as the sentinel.
|
||||
let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 {
|
||||
true // first additional worker: no prior data to evaluate
|
||||
} else {
|
||||
let k_before = (*n_active - 1) as f64;
|
||||
let min_speedup = (k_before + 0.2) / k_before;
|
||||
let actual_speedup = eff / *eff_at_last_spawn;
|
||||
actual_speedup >= min_speedup
|
||||
};
|
||||
|
||||
if last_spawn_was_beneficial {
|
||||
if cpu_sample.do_i_activate(threshold) {
|
||||
activate_tx.send(()).ok();
|
||||
*eff_at_last_spawn = eff;
|
||||
*n_active += 1;
|
||||
*cpu_sample = CpuSample::now();
|
||||
debug!(
|
||||
"activated worker {}/{} — efficiency {:.0}%",
|
||||
n_active, max_workers, eff * 100.0,
|
||||
);
|
||||
debug!("activated worker {}/{}", n_active, max_workers);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "obikmer"
|
||||
version = "1.1.17"
|
||||
version = "1.1.29"
|
||||
edition = "2024"
|
||||
|
||||
[[bin]]
|
||||
@@ -18,7 +18,7 @@ obikrope = { path = "../obikrope" }
|
||||
obikpartitionner = { path = "../obikpartitionner" }
|
||||
obisys = { path = "../obisys" }
|
||||
obiskio = { path = "../obiskio" }
|
||||
obikindex = { path = "../obikindex" }
|
||||
obikindex = { path = "../obikindex", default-features = false }
|
||||
obitaxonomy = { path = "../obitaxonomy" }
|
||||
obilayeredmap = { path = "../obilayeredmap" }
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
@@ -33,4 +33,6 @@ tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
|
||||
pprof = { version = "0.13", features = ["prost-codec"], optional = true }
|
||||
|
||||
[features]
|
||||
default = ["numa"]
|
||||
numa = ["obikindex/numa"]
|
||||
profiling = ["dep:pprof"]
|
||||
|
||||
+210
-100
@@ -4,7 +4,7 @@ use std::sync::{Condvar, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use indicatif::{ProgressBar, ProgressStyle};
|
||||
use tracing::{info, warn};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
|
||||
|
||||
@@ -14,24 +14,25 @@ const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧
|
||||
/// a TTY (e.g. HPC job logs): every 10% for bounded bars, every ~10 s for
|
||||
/// spinners (throttled on `set_message`).
|
||||
pub struct TracedBar {
|
||||
pb: ProgressBar,
|
||||
label: String,
|
||||
unit: String,
|
||||
total: u64, // 0 for spinners
|
||||
start: Instant, // creation time, for spinner throttling
|
||||
last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet
|
||||
last_log_ms: AtomicU64, // ms since `start` at last spinner log
|
||||
pb: ProgressBar,
|
||||
label: String,
|
||||
unit: String,
|
||||
total: u64, // 0 for spinners
|
||||
start: Instant, // creation time, for spinner throttling
|
||||
last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet
|
||||
last_log_ms: AtomicU64, // ms since `start` at last spinner log
|
||||
}
|
||||
|
||||
impl TracedBar {
|
||||
pub fn inc(&self, delta: u64) {
|
||||
self.pb.inc(delta);
|
||||
if self.pb.is_hidden() && self.total > 0 {
|
||||
let pos = self.pb.position();
|
||||
let pos = self.pb.position();
|
||||
let pct10 = (pos * 10) / self.total; // 0..=10
|
||||
let last = self.last_pct.load(Ordering::Relaxed);
|
||||
let last = self.last_pct.load(Ordering::Relaxed);
|
||||
if pct10 > last
|
||||
&& self.last_pct
|
||||
&& self
|
||||
.last_pct
|
||||
.compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
@@ -49,14 +50,14 @@ impl TracedBar {
|
||||
let msg = msg.into();
|
||||
if self.pb.is_hidden() {
|
||||
if self.total > 0 {
|
||||
// bounded bar: always log (already rate-limited by 10% threshold in inc)
|
||||
info!(stage = %self.label, "{msg}");
|
||||
debug!(stage = %self.label, "{msg}");
|
||||
} else {
|
||||
// spinner: throttle to ~10 s
|
||||
let now_ms = self.start.elapsed().as_millis() as u64;
|
||||
let last = self.last_log_ms.load(Ordering::Relaxed);
|
||||
let last = self.last_log_ms.load(Ordering::Relaxed);
|
||||
if now_ms >= last + 10_000
|
||||
&& self.last_log_ms
|
||||
&& self
|
||||
.last_log_ms
|
||||
.compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
@@ -83,8 +84,13 @@ pub fn spinner(label: &str) -> TracedBar {
|
||||
);
|
||||
pb.enable_steady_tick(Duration::from_millis(100));
|
||||
TracedBar {
|
||||
pb, label: label.to_string(), unit: String::new(), total: 0,
|
||||
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
|
||||
pb,
|
||||
label: label.to_string(),
|
||||
unit: String::new(),
|
||||
total: 0,
|
||||
start: Instant::now(),
|
||||
last_pct: AtomicU64::new(0),
|
||||
last_log_ms: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,8 +107,13 @@ pub fn progress_bar(label: &str, n: u64, unit: &str) -> TracedBar {
|
||||
);
|
||||
pb.enable_steady_tick(Duration::from_millis(100));
|
||||
TracedBar {
|
||||
pb, label: label.to_string(), unit: unit.to_string(), total: n,
|
||||
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
|
||||
pb,
|
||||
label: label.to_string(),
|
||||
unit: unit.to_string(),
|
||||
total: n,
|
||||
start: Instant::now(),
|
||||
last_pct: AtomicU64::new(0),
|
||||
last_log_ms: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,13 +215,19 @@ fn tv_to_secs(tv: timeval) -> f64 {
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 }
|
||||
fn rss_to_bytes(ru: &rusage) -> u64 {
|
||||
ru.ru_maxrss as u64
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "macos"))]
|
||||
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
|
||||
fn rss_to_bytes(ru: &rusage) -> u64 {
|
||||
ru.ru_maxrss as u64 * 1024
|
||||
}
|
||||
|
||||
// Monotonically increasing counters — negative delta would be a kernel bug.
|
||||
fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
|
||||
fn delta(end: i64, start: i64) -> u64 {
|
||||
(end - start).max(0) as u64
|
||||
}
|
||||
|
||||
// ── CpuSample ─────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -218,31 +235,60 @@ fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
|
||||
/// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of
|
||||
/// available cores used since the snapshot was taken.
|
||||
pub struct CpuSample {
|
||||
wall: Instant,
|
||||
wall: Instant,
|
||||
user_secs: f64,
|
||||
sys_secs: f64,
|
||||
sys_secs: f64,
|
||||
previous: f64,
|
||||
}
|
||||
|
||||
impl CpuSample {
|
||||
pub fn now() -> Self {
|
||||
let ru = get_rusage();
|
||||
Self {
|
||||
wall: Instant::now(),
|
||||
wall: Instant::now(),
|
||||
user_secs: tv_to_secs(ru.ru_utime),
|
||||
sys_secs: tv_to_secs(ru.ru_stime),
|
||||
sys_secs: tv_to_secs(ru.ru_stime),
|
||||
previous: 0.0,
|
||||
}
|
||||
}
|
||||
|
||||
/// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot.
|
||||
/// Returns 0.0 if less than 100 ms have elapsed (too noisy).
|
||||
pub fn cpu_efficiency(&self, n_cores: usize) -> f64 {
|
||||
let ru = get_rusage();
|
||||
let ru = get_rusage();
|
||||
let wall = self.wall.elapsed().as_secs_f64();
|
||||
if wall < 0.1 { return 0.0; }
|
||||
let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs)
|
||||
+ (tv_to_secs(ru.ru_stime) - self.sys_secs);
|
||||
if wall < 0.1 {
|
||||
return 0.0;
|
||||
}
|
||||
let cpu =
|
||||
(tv_to_secs(ru.ru_utime) - self.user_secs) + (tv_to_secs(ru.ru_stime) - self.sys_secs);
|
||||
cpu / (wall * n_cores as f64)
|
||||
}
|
||||
|
||||
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||
let n = CpuSample::now();
|
||||
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
|
||||
let delta_wall = self.wall.elapsed().as_secs_f64();
|
||||
|
||||
let efficiency = delta_ru / delta_wall;
|
||||
let activate = 0f64.max(efficiency - self.previous) >= threshold;
|
||||
|
||||
if activate {
|
||||
debug!(
|
||||
"Do I activate : {} -> {} = {} Activate: {}",
|
||||
self.previous,
|
||||
efficiency,
|
||||
0f64.max(efficiency - self.previous),
|
||||
activate
|
||||
);
|
||||
self.previous = efficiency;
|
||||
self.user_secs = n.user_secs;
|
||||
self.sys_secs = n.sys_secs;
|
||||
self.wall = n.wall;
|
||||
}
|
||||
|
||||
activate
|
||||
}
|
||||
}
|
||||
|
||||
// ── public API ────────────────────────────────────────────────────────────────
|
||||
@@ -251,33 +297,37 @@ impl CpuSample {
|
||||
#[must_use = "call .stop() to record the stage"]
|
||||
pub struct Stage {
|
||||
label: String,
|
||||
wall: Instant,
|
||||
ru: rusage,
|
||||
wall: Instant,
|
||||
ru: rusage,
|
||||
}
|
||||
|
||||
impl Stage {
|
||||
pub fn start(label: impl Into<String>) -> Self {
|
||||
let label = label.into();
|
||||
info!(stage = %label, "started");
|
||||
Self { label, wall: Instant::now(), ru: get_rusage() }
|
||||
Self {
|
||||
label,
|
||||
wall: Instant::now(),
|
||||
ru: get_rusage(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(self) -> StageStats {
|
||||
let wall_secs = self.wall.elapsed().as_secs_f64();
|
||||
let end = get_rusage();
|
||||
let stats = StageStats {
|
||||
label: self.label,
|
||||
label: self.label,
|
||||
wall_secs,
|
||||
user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime),
|
||||
sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime),
|
||||
user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime),
|
||||
sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime),
|
||||
max_rss_bytes: rss_to_bytes(&end),
|
||||
minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64),
|
||||
major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64),
|
||||
vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64),
|
||||
invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64),
|
||||
in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64),
|
||||
out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64),
|
||||
swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64),
|
||||
minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64),
|
||||
major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64),
|
||||
vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64),
|
||||
invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64),
|
||||
in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64),
|
||||
out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64),
|
||||
swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64),
|
||||
};
|
||||
info!(
|
||||
stage = %stats.label,
|
||||
@@ -299,27 +349,30 @@ impl Stage {
|
||||
|
||||
/// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas.
|
||||
pub struct StageStats {
|
||||
pub label: String,
|
||||
pub wall_secs: f64,
|
||||
pub user_secs: f64,
|
||||
pub sys_secs: f64,
|
||||
pub label: String,
|
||||
pub wall_secs: f64,
|
||||
pub user_secs: f64,
|
||||
pub sys_secs: f64,
|
||||
/// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum,
|
||||
/// so this reflects the high-water mark up to and including this stage.
|
||||
pub max_rss_bytes: u64,
|
||||
pub minor_faults: u64,
|
||||
pub major_faults: u64,
|
||||
pub vol_ctx: u64, // voluntary context switches
|
||||
pub invol_ctx: u64, // involuntary context switches
|
||||
pub in_blocks: u64, // filesystem block reads (after page cache)
|
||||
pub out_blocks: u64, // filesystem block writes
|
||||
pub swaps: u64,
|
||||
pub minor_faults: u64,
|
||||
pub major_faults: u64,
|
||||
pub vol_ctx: u64, // voluntary context switches
|
||||
pub invol_ctx: u64, // involuntary context switches
|
||||
pub in_blocks: u64, // filesystem block reads (after page cache)
|
||||
pub out_blocks: u64, // filesystem block writes
|
||||
pub swaps: u64,
|
||||
}
|
||||
|
||||
impl StageStats {
|
||||
/// (user + sys) / wall — effective thread count utilisation.
|
||||
pub fn parallelism(&self) -> f64 {
|
||||
if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs }
|
||||
else { 0.0 }
|
||||
if self.wall_secs > 1e-9 {
|
||||
(self.user_secs + self.sys_secs) / self.wall_secs
|
||||
} else {
|
||||
0.0
|
||||
}
|
||||
}
|
||||
|
||||
/// parallelism / n_cores — fraction of available CPU power used (0..1+).
|
||||
@@ -335,25 +388,33 @@ pub struct Reporter {
|
||||
}
|
||||
|
||||
impl Reporter {
|
||||
pub fn new() -> Self { Self::default() }
|
||||
pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); }
|
||||
pub fn stages(&self) -> &[StageStats] { &self.stages }
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
pub fn push(&mut self, stats: StageStats) {
|
||||
self.stages.push(stats);
|
||||
}
|
||||
pub fn stages(&self) -> &[StageStats] {
|
||||
&self.stages
|
||||
}
|
||||
/// Print the summary to stderr.
|
||||
pub fn print(&self) { eprint!("{self}"); }
|
||||
pub fn print(&self) {
|
||||
eprint!("{self}");
|
||||
}
|
||||
}
|
||||
|
||||
// ── diagnosis ─────────────────────────────────────────────────────────────────
|
||||
|
||||
struct Diagnosis {
|
||||
tag: &'static str,
|
||||
tag: &'static str,
|
||||
detail: Option<String>,
|
||||
}
|
||||
|
||||
// Thresholds are intentionally conservative to avoid false positives.
|
||||
fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
|
||||
let eff = s.efficiency(n_cores);
|
||||
let eff = s.efficiency(n_cores);
|
||||
let cpu_pct = eff * 100.0;
|
||||
let io_ops = s.in_blocks + s.out_blocks;
|
||||
let io_ops = s.in_blocks + s.out_blocks;
|
||||
|
||||
// swaps > 0 is the only reliable cross-platform indicator of true RAM exhaustion.
|
||||
// ru_majflt is intentionally excluded: on macOS it counts all file-backed mmap
|
||||
@@ -387,26 +448,43 @@ fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
|
||||
)),
|
||||
};
|
||||
}
|
||||
Diagnosis { tag: "—", detail: None }
|
||||
Diagnosis {
|
||||
tag: "—",
|
||||
detail: None,
|
||||
}
|
||||
}
|
||||
|
||||
// ── display helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
fn fmt_secs(s: f64) -> String {
|
||||
if s >= 100.0 { format!("{:.0}s", s) }
|
||||
else if s >= 10.0 { format!("{:.1}s", s) }
|
||||
else if s >= 1.0 { format!("{:.2}s", s) }
|
||||
else { format!("{:.0}ms", s * 1000.0) }
|
||||
if s >= 100.0 {
|
||||
format!("{:.0}s", s)
|
||||
} else if s >= 10.0 {
|
||||
format!("{:.1}s", s)
|
||||
} else if s >= 1.0 {
|
||||
format!("{:.2}s", s)
|
||||
} else {
|
||||
format!("{:.0}ms", s * 1000.0)
|
||||
}
|
||||
}
|
||||
|
||||
fn fmt_bytes(b: u64) -> String {
|
||||
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
|
||||
else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) }
|
||||
else { format!("{:.0} KB", b as f64 / 1024.0) }
|
||||
if b >= 1 << 30 {
|
||||
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
|
||||
} else if b >= 1 << 20 {
|
||||
format!("{:.0} MB", b as f64 / (1u64 << 20) as f64)
|
||||
} else {
|
||||
format!("{:.0} KB", b as f64 / 1024.0)
|
||||
}
|
||||
}
|
||||
|
||||
fn fmt_efficiency(par: f64, n_cores: usize) -> String {
|
||||
format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0)
|
||||
format!(
|
||||
"{:.1}×/{} ({:.0}%)",
|
||||
par,
|
||||
n_cores,
|
||||
par / n_cores as f64 * 100.0
|
||||
)
|
||||
}
|
||||
|
||||
// ── Display ───────────────────────────────────────────────────────────────────
|
||||
@@ -414,8 +492,8 @@ fn fmt_efficiency(par: f64, n_cores: usize) -> String {
|
||||
// ── MemoryBudget ──────────────────────────────────────────────────────────────
|
||||
|
||||
struct BudgetInner {
|
||||
remaining: u64,
|
||||
active: usize,
|
||||
remaining: u64,
|
||||
active: usize,
|
||||
peak_active: usize,
|
||||
}
|
||||
|
||||
@@ -425,8 +503,8 @@ struct BudgetInner {
|
||||
/// completion. Non-deadlock guarantee: when no worker is active the next
|
||||
/// acquire always succeeds regardless of cost vs. remaining budget.
|
||||
pub struct MemoryBudget {
|
||||
total: u64,
|
||||
inner: Mutex<BudgetInner>,
|
||||
total: u64,
|
||||
inner: Mutex<BudgetInner>,
|
||||
condvar: Condvar,
|
||||
}
|
||||
|
||||
@@ -434,7 +512,11 @@ impl MemoryBudget {
|
||||
pub fn new(total: u64) -> Self {
|
||||
Self {
|
||||
total,
|
||||
inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }),
|
||||
inner: Mutex::new(BudgetInner {
|
||||
remaining: total,
|
||||
active: 0,
|
||||
peak_active: 0,
|
||||
}),
|
||||
condvar: Condvar::new(),
|
||||
}
|
||||
}
|
||||
@@ -443,9 +525,9 @@ impl MemoryBudget {
|
||||
let mut g = self.inner.lock().unwrap();
|
||||
loop {
|
||||
if g.active == 0 || g.remaining >= cost {
|
||||
g.remaining = g.remaining.saturating_sub(cost);
|
||||
g.active += 1;
|
||||
g.peak_active = g.peak_active.max(g.active);
|
||||
g.remaining = g.remaining.saturating_sub(cost);
|
||||
g.active += 1;
|
||||
g.peak_active = g.peak_active.max(g.active);
|
||||
return;
|
||||
}
|
||||
g = self.condvar.wait(g).unwrap();
|
||||
@@ -455,47 +537,66 @@ impl MemoryBudget {
|
||||
pub fn release(&self, cost: u64) {
|
||||
let mut g = self.inner.lock().unwrap();
|
||||
g.remaining = (g.remaining + cost).min(self.total);
|
||||
g.active -= 1;
|
||||
g.active -= 1;
|
||||
self.condvar.notify_all();
|
||||
}
|
||||
|
||||
pub fn total(&self) -> u64 { self.total }
|
||||
pub fn active(&self) -> usize { self.inner.lock().unwrap().active }
|
||||
pub fn remaining(&self) -> u64 { self.inner.lock().unwrap().remaining }
|
||||
pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active }
|
||||
pub fn total(&self) -> u64 {
|
||||
self.total
|
||||
}
|
||||
pub fn active(&self) -> usize {
|
||||
self.inner.lock().unwrap().active
|
||||
}
|
||||
pub fn remaining(&self) -> u64 {
|
||||
self.inner.lock().unwrap().remaining
|
||||
}
|
||||
pub fn peak_active(&self) -> usize {
|
||||
self.inner.lock().unwrap().peak_active
|
||||
}
|
||||
}
|
||||
|
||||
// ── Display ───────────────────────────────────────────────────────────────────
|
||||
|
||||
impl fmt::Display for Reporter {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
if self.stages.is_empty() { return Ok(()); }
|
||||
if self.stages.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let n_cores = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
|
||||
// column widths
|
||||
let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5);
|
||||
let nw = self
|
||||
.stages
|
||||
.iter()
|
||||
.map(|s| s.label.len())
|
||||
.max()
|
||||
.unwrap_or(5)
|
||||
.max(5);
|
||||
// efficiency col: worst-case width for this run's n_cores value
|
||||
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
|
||||
|
||||
let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12;
|
||||
let sep = "─".repeat(sep_w);
|
||||
let sep = "─".repeat(sep_w);
|
||||
|
||||
// header
|
||||
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} status",
|
||||
"stage", "wall", "efficiency", "peak RSS")?;
|
||||
writeln!(
|
||||
f,
|
||||
"{:<nw$} {:>7} {:>ew$} {:>8} status",
|
||||
"stage", "wall", "efficiency", "peak RSS"
|
||||
)?;
|
||||
writeln!(f, "{sep}")?;
|
||||
|
||||
// compute all diagnoses up front (needed for both table and footnotes)
|
||||
let diagnoses: Vec<Diagnosis> = self.stages.iter()
|
||||
.map(|s| diagnose(s, n_cores))
|
||||
.collect();
|
||||
let diagnoses: Vec<Diagnosis> = self.stages.iter().map(|s| diagnose(s, n_cores)).collect();
|
||||
|
||||
// per-stage rows
|
||||
for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
|
||||
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} {}",
|
||||
writeln!(
|
||||
f,
|
||||
"{:<nw$} {:>7} {:>ew$} {:>8} {}",
|
||||
s.label,
|
||||
fmt_secs(s.wall_secs),
|
||||
fmt_efficiency(s.parallelism(), n_cores),
|
||||
@@ -505,14 +606,21 @@ impl fmt::Display for Reporter {
|
||||
}
|
||||
|
||||
// totals
|
||||
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
|
||||
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
|
||||
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
|
||||
let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0);
|
||||
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
|
||||
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
|
||||
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
|
||||
let trss = self
|
||||
.stages
|
||||
.iter()
|
||||
.map(|s| s.max_rss_bytes)
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
|
||||
|
||||
writeln!(f, "{sep}")?;
|
||||
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8}",
|
||||
writeln!(
|
||||
f,
|
||||
"{:<nw$} {:>7} {:>ew$} {:>8}",
|
||||
"TOTAL",
|
||||
fmt_secs(tw),
|
||||
fmt_efficiency(tpar, n_cores),
|
||||
@@ -520,7 +628,9 @@ impl fmt::Display for Reporter {
|
||||
)?;
|
||||
|
||||
// bottleneck footnotes (only if at least one anomaly detected)
|
||||
let bottlenecks: Vec<(&str, &str)> = self.stages.iter()
|
||||
let bottlenecks: Vec<(&str, &str)> = self
|
||||
.stages
|
||||
.iter()
|
||||
.zip(diagnoses.iter())
|
||||
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
|
||||
.collect();
|
||||
|
||||
Reference in New Issue
Block a user