feat(obisys): Add stage-based performance profiler

Establishes the `obisys` crate using Rust 2024 and the `libc` dependency. Introduces a lightweight profiler that captures wall-clock time and `getrusage` system metrics per pipeline stage. Automatically computes parallelism and efficiency ratios, detects bottlenecks such as memory pressure and disk I/O, and prints a formatted diagnostic summary to stderr.
This commit is contained in:
Eric Coissac
2026-05-17 15:36:20 +08:00
parent 4736a7b6de
commit d0c277d5b6
2 changed files with 254 additions and 0 deletions
+7
View File
@@ -0,0 +1,7 @@
[package]
name = "obisys"
version = "0.1.0"
edition = "2024"
[dependencies]
libc = "0.2"
+247
View File
@@ -0,0 +1,247 @@
use std::fmt;
use std::time::Instant;
use libc::{RUSAGE_SELF, getrusage, rusage, timeval};
// ── raw helpers ───────────────────────────────────────────────────────────────
fn get_rusage() -> rusage {
let mut ru = unsafe { std::mem::zeroed::<rusage>() };
unsafe { getrusage(RUSAGE_SELF, &mut ru) };
ru
}
fn tv_to_secs(tv: timeval) -> f64 {
tv.tv_sec as f64 + tv.tv_usec as f64 * 1e-6
}
#[cfg(target_os = "macos")]
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 }
#[cfg(not(target_os = "macos"))]
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
// Monotonically increasing counters — negative delta would be a kernel bug.
fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
// ── public API ────────────────────────────────────────────────────────────────
/// Snapshot taken at the start of a pipeline stage.
#[must_use = "call .stop() to record the stage"]
pub struct Stage {
label: String,
wall: Instant,
ru: rusage,
}
impl Stage {
pub fn start(label: impl Into<String>) -> Self {
Self { label: label.into(), wall: Instant::now(), ru: get_rusage() }
}
pub fn stop(self) -> StageStats {
let wall_secs = self.wall.elapsed().as_secs_f64();
let end = get_rusage();
StageStats {
label: self.label,
wall_secs,
user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime),
sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime),
max_rss_bytes: rss_to_bytes(&end),
minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64),
major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64),
vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64),
invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64),
in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64),
out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64),
swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64),
}
}
}
/// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas.
pub struct StageStats {
pub label: String,
pub wall_secs: f64,
pub user_secs: f64,
pub sys_secs: f64,
/// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum,
/// so this reflects the high-water mark up to and including this stage.
pub max_rss_bytes: u64,
pub minor_faults: u64,
pub major_faults: u64,
pub vol_ctx: u64, // voluntary context switches
pub invol_ctx: u64, // involuntary context switches
pub in_blocks: u64, // filesystem block reads (after page cache)
pub out_blocks: u64, // filesystem block writes
pub swaps: u64,
}
impl StageStats {
/// (user + sys) / wall — effective thread count utilisation.
pub fn parallelism(&self) -> f64 {
if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs }
else { 0.0 }
}
/// parallelism / n_cores — fraction of available CPU power used (0..1+).
pub fn efficiency(&self, n_cores: usize) -> f64 {
self.parallelism() / n_cores as f64
}
}
/// Accumulates stage stats and prints a human-readable summary table.
#[derive(Default)]
pub struct Reporter {
stages: Vec<StageStats>,
}
impl Reporter {
pub fn new() -> Self { Self::default() }
pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); }
pub fn stages(&self) -> &[StageStats] { &self.stages }
/// Print the summary to stderr.
pub fn print(&self) { eprint!("{self}"); }
}
// ── diagnosis ─────────────────────────────────────────────────────────────────
struct Diagnosis {
tag: &'static str,
detail: Option<String>,
}
// Thresholds are intentionally conservative to avoid false positives.
fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
let eff = s.efficiency(n_cores);
let cpu_pct = eff * 100.0;
let io_ops = s.in_blocks + s.out_blocks;
if s.swaps > 0 {
return Diagnosis {
tag: "swapping",
detail: Some(format!(
"swapped {} time(s) — severe memory pressure, consider increasing RAM",
s.swaps,
)),
};
}
if s.major_faults > 10 {
return Diagnosis {
tag: "mem pressure",
detail: Some(format!(
"{} major page faults — working set exceeds available RAM",
s.major_faults,
)),
};
}
if eff < 0.3 && io_ops > 100 {
return Diagnosis {
tag: "disk I/O",
detail: Some(format!(
"{} block reads + {} writes — CPU at {:.0}%, stage is I/O-bound",
s.in_blocks, s.out_blocks, cpu_pct,
)),
};
}
if eff < 0.3 && s.vol_ctx > 200 {
return Diagnosis {
tag: "contention",
detail: Some(format!(
"{} voluntary context switches — CPU at {:.0}%, possible lock contention or I/O wait",
s.vol_ctx, cpu_pct,
)),
};
}
Diagnosis { tag: "", detail: None }
}
// ── display helpers ───────────────────────────────────────────────────────────
fn fmt_secs(s: f64) -> String {
if s >= 100.0 { format!("{:.0}s", s) }
else if s >= 10.0 { format!("{:.1}s", s) }
else if s >= 1.0 { format!("{:.2}s", s) }
else { format!("{:.0}ms", s * 1000.0) }
}
fn fmt_bytes(b: u64) -> String {
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) }
else { format!("{:.0} KB", b as f64 / 1024.0) }
}
fn fmt_efficiency(par: f64, n_cores: usize) -> String {
format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0)
}
// ── Display ───────────────────────────────────────────────────────────────────
impl fmt::Display for Reporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.stages.is_empty() { return Ok(()); }
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
// column widths
let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5);
// efficiency col: worst-case width for this run's n_cores value
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12;
let sep = "".repeat(sep_w);
// header
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} status",
"stage", "wall", "efficiency", "peak RSS")?;
writeln!(f, "{sep}")?;
// compute all diagnoses up front (needed for both table and footnotes)
let diagnoses: Vec<Diagnosis> = self.stages.iter()
.map(|s| diagnose(s, n_cores))
.collect();
// per-stage rows
for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} {}",
s.label,
fmt_secs(s.wall_secs),
fmt_efficiency(s.parallelism(), n_cores),
fmt_bytes(s.max_rss_bytes),
d.tag,
)?;
}
// totals
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0);
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
writeln!(f, "{sep}")?;
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8}",
"TOTAL",
fmt_secs(tw),
fmt_efficiency(tpar, n_cores),
fmt_bytes(trss),
)?;
// bottleneck footnotes (only if at least one anomaly detected)
let bottlenecks: Vec<(&str, &str)> = self.stages.iter()
.zip(diagnoses.iter())
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
.collect();
if !bottlenecks.is_empty() {
writeln!(f, "\nBottlenecks:")?;
for (label, detail) in &bottlenecks {
writeln!(f, " {label} — {detail}")?;
}
}
Ok(())
}
}