From c61213276361385515d78609bbad1fbd3b3f807b Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 1 Jul 2026 11:35:06 +0200 Subject: [PATCH] feat: simplify worker spawning logic and update macOS build workflow Updates the release workflow to run macOS builds inside a Docker container with explicit registry authentication and adjusted artifact paths. Bumps the obikmer crate version to 1.1.29 and adds *.log to .gitignore. Simplifies NUMA worker spawning by lowering the activation threshold from 0.95 to 0.2, replacing complex stateful tracking with a direct efficiency check, and downgrading progress logging to debug level. Includes general code formatting improvements for readability. --- .gitea/workflows/release.yml | 20 +-- .gitignore | 1 + src/Cargo.lock | 2 +- src/obikindex/src/numa.rs | 58 ++----- src/obikmer/Cargo.toml | 2 +- src/obisys/src/lib.rs | 310 ++++++++++++++++++++++++----------- 6 files changed, 237 insertions(+), 156 deletions(-) diff --git a/.gitea/workflows/release.yml b/.gitea/workflows/release.yml index 810a12e..672c7c7 100644 --- a/.gitea/workflows/release.yml +++ b/.gitea/workflows/release.yml @@ -86,17 +86,12 @@ jobs: build-macos-arm64: needs: create-release runs-on: ubuntu-latest - container: - image: registry.metabarcoding.org/cibuilder/rustcrossosx:latest - credentials: - username: ${{ github.actor }} - password: ${{ secrets.REGISTRYTOKEN }} - defaults: - run: - working-directory: src steps: - uses: actions/checkout@v4 + - name: Login to registry + run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ github.actor }} --password-stdin + - name: Cache cargo registry uses: actions/cache@v4 with: @@ -108,7 +103,12 @@ jobs: restore-keys: macos-arm64-cargo- - name: Build macOS binary - run: cargo build --release --target aarch64-apple-darwin --no-default-features + run: | + docker run --rm \ + -v "${{ github.workspace }}:/src" \ + -w /src/src \ + registry.metabarcoding.org/cibuilder/rustcrossosx:latest \ + cargo build --release --target aarch64-apple-darwin --no-default-features - name: Prepare and upload artifact env: @@ -116,7 +116,7 @@ jobs: RELEASE_ID: ${{ needs.create-release.outputs.release_id }} run: | mkdir -p /tmp/dist - cp target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64 + cp src/target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64 curl -s -X POST \ "${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \ -H "Authorization: token $GITEA_TOKEN" \ diff --git a/.gitignore b/.gitignore index e63f216..b44f5fc 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ data-stress *.pb ./**/*.json *.bin +*.log Betula_exilis--IGA-24-33 benchmark/genomes benchmark/simulated_data diff --git a/src/Cargo.lock b/src/Cargo.lock index 02603ed..8ae9014 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1704,7 +1704,7 @@ dependencies = [ [[package]] name = "obikmer" -version = "1.1.27" +version = "1.1.29" dependencies = [ "clap", "csv", diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index d5c368f..4fca05c 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -217,13 +217,9 @@ impl PartitionRunner { return Ok(()); } - const SPAWN_THRESHOLD: f64 = 0.95; + const SPAWN_THRESHOLD: f64 = 0.2; const TIMER_SECS: u64 = 30; - let n_cores = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - // ── Channels ────────────────────────────────────────────────────────── let (part_tx, part_rx) = unbounded::(); let (activate_tx, activate_rx) = unbounded::<()>(); @@ -290,9 +286,8 @@ impl PartitionRunner { let initial_workers = n_nodes.min(max_workers).min(n_total); for _ in 0..initial_workers { activate_tx.send(()).ok(); } let mut n_active = initial_workers; - let mut cpu_sample = CpuSample::now(); - let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate - let mut completed = 0usize; + let mut cpu_sample = CpuSample::now(); + let mut completed = 0usize; while completed < n_total { let Ok(event) = event_rx.recv() else { break }; @@ -308,15 +303,13 @@ impl PartitionRunner { // Inline check: same logic as a timer tick. maybe_activate( &activate_tx, &mut n_active, max_workers, - &mut cpu_sample, &mut eff_at_last_spawn, - n_cores, SPAWN_THRESHOLD, completed, n_total, + &mut cpu_sample, SPAWN_THRESHOLD, completed, n_total, ); } WorkerEvent::TimerTick => { maybe_activate( &activate_tx, &mut n_active, max_workers, - &mut cpu_sample, &mut eff_at_last_spawn, - n_cores, SPAWN_THRESHOLD, completed, n_total, + &mut cpu_sample, SPAWN_THRESHOLD, completed, n_total, ); } } @@ -343,42 +336,19 @@ enum WorkerEvent { } fn maybe_activate( - activate_tx: &crossbeam_channel::Sender<()>, - n_active: &mut usize, - max_workers: usize, - cpu_sample: &mut CpuSample, - eff_at_last_spawn: &mut f64, - n_cores: usize, - threshold: f64, - completed: usize, - n_total: usize, + activate_tx: &crossbeam_channel::Sender<()>, + n_active: &mut usize, + max_workers: usize, + cpu_sample: &mut CpuSample, + threshold: f64, + completed: usize, + n_total: usize, ) { if *n_active >= max_workers || completed >= n_total { return; } - let eff = cpu_sample.cpu_efficiency(n_cores); - if eff >= threshold { return; } // CPU already saturated - - // Check that the previous activation was beneficial enough. - // Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1). - // For the very first extra worker (n_active == 1, no previous spawn), skip this - // check: eff_at_last_spawn == 0 acts as the sentinel. - let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 || eff < 1e-9 { - true // first additional worker, or measurement too short: no prior data to evaluate - } else { - let k_new = *n_active as f64; // worker count after the last spawn - let min_gain = 0.2 / k_new; - let actual_gain = (eff - *eff_at_last_spawn) / eff; - actual_gain >= min_gain - }; - - if last_spawn_was_beneficial { + if cpu_sample.do_i_activate(threshold) { activate_tx.send(()).ok(); - *eff_at_last_spawn = eff; *n_active += 1; - *cpu_sample = CpuSample::now(); - debug!( - "activated worker {}/{} — efficiency {:.0}%", - n_active, max_workers, eff * 100.0, - ); + debug!("activated worker {}/{}", n_active, max_workers); } } diff --git a/src/obikmer/Cargo.toml b/src/obikmer/Cargo.toml index 6658b67..086f4ce 100644 --- a/src/obikmer/Cargo.toml +++ b/src/obikmer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "obikmer" -version = "1.1.27" +version = "1.1.29" edition = "2024" [[bin]] diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs index aeccb1b..ebf2d3f 100644 --- a/src/obisys/src/lib.rs +++ b/src/obisys/src/lib.rs @@ -4,7 +4,7 @@ use std::sync::{Condvar, Mutex}; use std::time::{Duration, Instant}; use indicatif::{ProgressBar, ProgressStyle}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; @@ -14,24 +14,25 @@ const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧ /// a TTY (e.g. HPC job logs): every 10% for bounded bars, every ~10 s for /// spinners (throttled on `set_message`). pub struct TracedBar { - pb: ProgressBar, - label: String, - unit: String, - total: u64, // 0 for spinners - start: Instant, // creation time, for spinner throttling - last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet - last_log_ms: AtomicU64, // ms since `start` at last spinner log + pb: ProgressBar, + label: String, + unit: String, + total: u64, // 0 for spinners + start: Instant, // creation time, for spinner throttling + last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet + last_log_ms: AtomicU64, // ms since `start` at last spinner log } impl TracedBar { pub fn inc(&self, delta: u64) { self.pb.inc(delta); if self.pb.is_hidden() && self.total > 0 { - let pos = self.pb.position(); + let pos = self.pb.position(); let pct10 = (pos * 10) / self.total; // 0..=10 - let last = self.last_pct.load(Ordering::Relaxed); + let last = self.last_pct.load(Ordering::Relaxed); if pct10 > last - && self.last_pct + && self + .last_pct .compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { @@ -49,14 +50,14 @@ impl TracedBar { let msg = msg.into(); if self.pb.is_hidden() { if self.total > 0 { - // bounded bar: always log (already rate-limited by 10% threshold in inc) - info!(stage = %self.label, "{msg}"); + debug!(stage = %self.label, "{msg}"); } else { // spinner: throttle to ~10 s let now_ms = self.start.elapsed().as_millis() as u64; - let last = self.last_log_ms.load(Ordering::Relaxed); + let last = self.last_log_ms.load(Ordering::Relaxed); if now_ms >= last + 10_000 - && self.last_log_ms + && self + .last_log_ms .compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { @@ -83,8 +84,13 @@ pub fn spinner(label: &str) -> TracedBar { ); pb.enable_steady_tick(Duration::from_millis(100)); TracedBar { - pb, label: label.to_string(), unit: String::new(), total: 0, - start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0), + pb, + label: label.to_string(), + unit: String::new(), + total: 0, + start: Instant::now(), + last_pct: AtomicU64::new(0), + last_log_ms: AtomicU64::new(0), } } @@ -101,8 +107,13 @@ pub fn progress_bar(label: &str, n: u64, unit: &str) -> TracedBar { ); pb.enable_steady_tick(Duration::from_millis(100)); TracedBar { - pb, label: label.to_string(), unit: unit.to_string(), total: n, - start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0), + pb, + label: label.to_string(), + unit: unit.to_string(), + total: n, + start: Instant::now(), + last_pct: AtomicU64::new(0), + last_log_ms: AtomicU64::new(0), } } @@ -204,13 +215,19 @@ fn tv_to_secs(tv: timeval) -> f64 { } #[cfg(target_os = "macos")] -fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 } +fn rss_to_bytes(ru: &rusage) -> u64 { + ru.ru_maxrss as u64 +} #[cfg(not(target_os = "macos"))] -fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 } +fn rss_to_bytes(ru: &rusage) -> u64 { + ru.ru_maxrss as u64 * 1024 +} // Monotonically increasing counters — negative delta would be a kernel bug. -fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 } +fn delta(end: i64, start: i64) -> u64 { + (end - start).max(0) as u64 +} // ── CpuSample ───────────────────────────────────────────────────────────────── @@ -218,31 +235,60 @@ fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 } /// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of /// available cores used since the snapshot was taken. pub struct CpuSample { - wall: Instant, + wall: Instant, user_secs: f64, - sys_secs: f64, + sys_secs: f64, + previous: f64, } impl CpuSample { pub fn now() -> Self { let ru = get_rusage(); Self { - wall: Instant::now(), + wall: Instant::now(), user_secs: tv_to_secs(ru.ru_utime), - sys_secs: tv_to_secs(ru.ru_stime), + sys_secs: tv_to_secs(ru.ru_stime), + previous: 0.0, } } /// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot. /// Returns 0.0 if less than 100 ms have elapsed (too noisy). pub fn cpu_efficiency(&self, n_cores: usize) -> f64 { - let ru = get_rusage(); + let ru = get_rusage(); let wall = self.wall.elapsed().as_secs_f64(); - if wall < 0.1 { return 0.0; } - let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs) - + (tv_to_secs(ru.ru_stime) - self.sys_secs); + if wall < 0.1 { + return 0.0; + } + let cpu = + (tv_to_secs(ru.ru_utime) - self.user_secs) + (tv_to_secs(ru.ru_stime) - self.sys_secs); cpu / (wall * n_cores as f64) } + + pub fn do_i_activate(&mut self, threshold: f64) -> bool { + let n = CpuSample::now(); + let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs); + let delta_wall = self.wall.elapsed().as_secs_f64(); + + let efficiency = delta_ru / delta_wall; + let activate = 0f64.max(efficiency - self.previous) >= threshold; + + if activate { + debug!( + "Do I activate : {} -> {} = {} Activate: {}", + self.previous, + efficiency, + 0f64.max(efficiency - self.previous), + activate + ); + self.previous = efficiency; + self.user_secs = n.user_secs; + self.sys_secs = n.sys_secs; + self.wall = n.wall; + } + + activate + } } // ── public API ──────────────────────────────────────────────────────────────── @@ -251,33 +297,37 @@ impl CpuSample { #[must_use = "call .stop() to record the stage"] pub struct Stage { label: String, - wall: Instant, - ru: rusage, + wall: Instant, + ru: rusage, } impl Stage { pub fn start(label: impl Into) -> Self { let label = label.into(); info!(stage = %label, "started"); - Self { label, wall: Instant::now(), ru: get_rusage() } + Self { + label, + wall: Instant::now(), + ru: get_rusage(), + } } pub fn stop(self) -> StageStats { let wall_secs = self.wall.elapsed().as_secs_f64(); let end = get_rusage(); let stats = StageStats { - label: self.label, + label: self.label, wall_secs, - user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime), - sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime), + user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime), + sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime), max_rss_bytes: rss_to_bytes(&end), - minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64), - major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64), - vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64), - invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64), - in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64), - out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64), - swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64), + minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64), + major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64), + vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64), + invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64), + in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64), + out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64), + swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64), }; info!( stage = %stats.label, @@ -299,27 +349,30 @@ impl Stage { /// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas. pub struct StageStats { - pub label: String, - pub wall_secs: f64, - pub user_secs: f64, - pub sys_secs: f64, + pub label: String, + pub wall_secs: f64, + pub user_secs: f64, + pub sys_secs: f64, /// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum, /// so this reflects the high-water mark up to and including this stage. pub max_rss_bytes: u64, - pub minor_faults: u64, - pub major_faults: u64, - pub vol_ctx: u64, // voluntary context switches - pub invol_ctx: u64, // involuntary context switches - pub in_blocks: u64, // filesystem block reads (after page cache) - pub out_blocks: u64, // filesystem block writes - pub swaps: u64, + pub minor_faults: u64, + pub major_faults: u64, + pub vol_ctx: u64, // voluntary context switches + pub invol_ctx: u64, // involuntary context switches + pub in_blocks: u64, // filesystem block reads (after page cache) + pub out_blocks: u64, // filesystem block writes + pub swaps: u64, } impl StageStats { /// (user + sys) / wall — effective thread count utilisation. pub fn parallelism(&self) -> f64 { - if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs } - else { 0.0 } + if self.wall_secs > 1e-9 { + (self.user_secs + self.sys_secs) / self.wall_secs + } else { + 0.0 + } } /// parallelism / n_cores — fraction of available CPU power used (0..1+). @@ -335,25 +388,33 @@ pub struct Reporter { } impl Reporter { - pub fn new() -> Self { Self::default() } - pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); } - pub fn stages(&self) -> &[StageStats] { &self.stages } + pub fn new() -> Self { + Self::default() + } + pub fn push(&mut self, stats: StageStats) { + self.stages.push(stats); + } + pub fn stages(&self) -> &[StageStats] { + &self.stages + } /// Print the summary to stderr. - pub fn print(&self) { eprint!("{self}"); } + pub fn print(&self) { + eprint!("{self}"); + } } // ── diagnosis ───────────────────────────────────────────────────────────────── struct Diagnosis { - tag: &'static str, + tag: &'static str, detail: Option, } // Thresholds are intentionally conservative to avoid false positives. fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis { - let eff = s.efficiency(n_cores); + let eff = s.efficiency(n_cores); let cpu_pct = eff * 100.0; - let io_ops = s.in_blocks + s.out_blocks; + let io_ops = s.in_blocks + s.out_blocks; // swaps > 0 is the only reliable cross-platform indicator of true RAM exhaustion. // ru_majflt is intentionally excluded: on macOS it counts all file-backed mmap @@ -387,26 +448,43 @@ fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis { )), }; } - Diagnosis { tag: "—", detail: None } + Diagnosis { + tag: "—", + detail: None, + } } // ── display helpers ─────────────────────────────────────────────────────────── fn fmt_secs(s: f64) -> String { - if s >= 100.0 { format!("{:.0}s", s) } - else if s >= 10.0 { format!("{:.1}s", s) } - else if s >= 1.0 { format!("{:.2}s", s) } - else { format!("{:.0}ms", s * 1000.0) } + if s >= 100.0 { + format!("{:.0}s", s) + } else if s >= 10.0 { + format!("{:.1}s", s) + } else if s >= 1.0 { + format!("{:.2}s", s) + } else { + format!("{:.0}ms", s * 1000.0) + } } fn fmt_bytes(b: u64) -> String { - if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) } - else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) } - else { format!("{:.0} KB", b as f64 / 1024.0) } + if b >= 1 << 30 { + format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) + } else if b >= 1 << 20 { + format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) + } else { + format!("{:.0} KB", b as f64 / 1024.0) + } } fn fmt_efficiency(par: f64, n_cores: usize) -> String { - format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0) + format!( + "{:.1}×/{} ({:.0}%)", + par, + n_cores, + par / n_cores as f64 * 100.0 + ) } // ── Display ─────────────────────────────────────────────────────────────────── @@ -414,8 +492,8 @@ fn fmt_efficiency(par: f64, n_cores: usize) -> String { // ── MemoryBudget ────────────────────────────────────────────────────────────── struct BudgetInner { - remaining: u64, - active: usize, + remaining: u64, + active: usize, peak_active: usize, } @@ -425,8 +503,8 @@ struct BudgetInner { /// completion. Non-deadlock guarantee: when no worker is active the next /// acquire always succeeds regardless of cost vs. remaining budget. pub struct MemoryBudget { - total: u64, - inner: Mutex, + total: u64, + inner: Mutex, condvar: Condvar, } @@ -434,7 +512,11 @@ impl MemoryBudget { pub fn new(total: u64) -> Self { Self { total, - inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }), + inner: Mutex::new(BudgetInner { + remaining: total, + active: 0, + peak_active: 0, + }), condvar: Condvar::new(), } } @@ -443,9 +525,9 @@ impl MemoryBudget { let mut g = self.inner.lock().unwrap(); loop { if g.active == 0 || g.remaining >= cost { - g.remaining = g.remaining.saturating_sub(cost); - g.active += 1; - g.peak_active = g.peak_active.max(g.active); + g.remaining = g.remaining.saturating_sub(cost); + g.active += 1; + g.peak_active = g.peak_active.max(g.active); return; } g = self.condvar.wait(g).unwrap(); @@ -455,47 +537,66 @@ impl MemoryBudget { pub fn release(&self, cost: u64) { let mut g = self.inner.lock().unwrap(); g.remaining = (g.remaining + cost).min(self.total); - g.active -= 1; + g.active -= 1; self.condvar.notify_all(); } - pub fn total(&self) -> u64 { self.total } - pub fn active(&self) -> usize { self.inner.lock().unwrap().active } - pub fn remaining(&self) -> u64 { self.inner.lock().unwrap().remaining } - pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active } + pub fn total(&self) -> u64 { + self.total + } + pub fn active(&self) -> usize { + self.inner.lock().unwrap().active + } + pub fn remaining(&self) -> u64 { + self.inner.lock().unwrap().remaining + } + pub fn peak_active(&self) -> usize { + self.inner.lock().unwrap().peak_active + } } // ── Display ─────────────────────────────────────────────────────────────────── impl fmt::Display for Reporter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if self.stages.is_empty() { return Ok(()); } + if self.stages.is_empty() { + return Ok(()); + } let n_cores = std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1); // column widths - let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5); + let nw = self + .stages + .iter() + .map(|s| s.label.len()) + .max() + .unwrap_or(5) + .max(5); // efficiency col: worst-case width for this run's n_cores value let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len(); let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12; - let sep = "─".repeat(sep_w); + let sep = "─".repeat(sep_w); // header - writeln!(f, "{:7} {:>ew$} {:>8} status", - "stage", "wall", "efficiency", "peak RSS")?; + writeln!( + f, + "{:7} {:>ew$} {:>8} status", + "stage", "wall", "efficiency", "peak RSS" + )?; writeln!(f, "{sep}")?; // compute all diagnoses up front (needed for both table and footnotes) - let diagnoses: Vec = self.stages.iter() - .map(|s| diagnose(s, n_cores)) - .collect(); + let diagnoses: Vec = self.stages.iter().map(|s| diagnose(s, n_cores)).collect(); // per-stage rows for (s, d) in self.stages.iter().zip(diagnoses.iter()) { - writeln!(f, "{:7} {:>ew$} {:>8} {}", + writeln!( + f, + "{:7} {:>ew$} {:>8} {}", s.label, fmt_secs(s.wall_secs), fmt_efficiency(s.parallelism(), n_cores), @@ -505,14 +606,21 @@ impl fmt::Display for Reporter { } // totals - let tw = self.stages.iter().map(|s| s.wall_secs).sum::(); - let tu = self.stages.iter().map(|s| s.user_secs).sum::(); - let ts = self.stages.iter().map(|s| s.sys_secs).sum::(); - let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0); + let tw = self.stages.iter().map(|s| s.wall_secs).sum::(); + let tu = self.stages.iter().map(|s| s.user_secs).sum::(); + let ts = self.stages.iter().map(|s| s.sys_secs).sum::(); + let trss = self + .stages + .iter() + .map(|s| s.max_rss_bytes) + .max() + .unwrap_or(0); let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 }; writeln!(f, "{sep}")?; - writeln!(f, "{:7} {:>ew$} {:>8}", + writeln!( + f, + "{:7} {:>ew$} {:>8}", "TOTAL", fmt_secs(tw), fmt_efficiency(tpar, n_cores), @@ -520,7 +628,9 @@ impl fmt::Display for Reporter { )?; // bottleneck footnotes (only if at least one anomaly detected) - let bottlenecks: Vec<(&str, &str)> = self.stages.iter() + let bottlenecks: Vec<(&str, &str)> = self + .stages + .iter() .zip(diagnoses.iter()) .filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det))) .collect(); -- 2.52.0