diff --git a/src/obisys/Cargo.toml b/src/obisys/Cargo.toml new file mode 100644 index 0000000..a34b9db --- /dev/null +++ b/src/obisys/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "obisys" +version = "0.1.0" +edition = "2024" + +[dependencies] +libc = "0.2" diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs new file mode 100644 index 0000000..d058467 --- /dev/null +++ b/src/obisys/src/lib.rs @@ -0,0 +1,247 @@ +use std::fmt; +use std::time::Instant; + +use libc::{RUSAGE_SELF, getrusage, rusage, timeval}; + +// ── raw helpers ─────────────────────────────────────────────────────────────── + +fn get_rusage() -> rusage { + let mut ru = unsafe { std::mem::zeroed::() }; + unsafe { getrusage(RUSAGE_SELF, &mut ru) }; + ru +} + +fn tv_to_secs(tv: timeval) -> f64 { + tv.tv_sec as f64 + tv.tv_usec as f64 * 1e-6 +} + +#[cfg(target_os = "macos")] +fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 } + +#[cfg(not(target_os = "macos"))] +fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 } + +// Monotonically increasing counters — negative delta would be a kernel bug. +fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 } + +// ── public API ──────────────────────────────────────────────────────────────── + +/// Snapshot taken at the start of a pipeline stage. +#[must_use = "call .stop() to record the stage"] +pub struct Stage { + label: String, + wall: Instant, + ru: rusage, +} + +impl Stage { + pub fn start(label: impl Into) -> Self { + Self { label: label.into(), wall: Instant::now(), ru: get_rusage() } + } + + pub fn stop(self) -> StageStats { + let wall_secs = self.wall.elapsed().as_secs_f64(); + let end = get_rusage(); + StageStats { + label: self.label, + wall_secs, + user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime), + sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime), + max_rss_bytes: rss_to_bytes(&end), + minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64), + major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64), + vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64), + invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64), + in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64), + out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64), + swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64), + } + } +} + +/// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas. +pub struct StageStats { + pub label: String, + pub wall_secs: f64, + pub user_secs: f64, + pub sys_secs: f64, + /// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum, + /// so this reflects the high-water mark up to and including this stage. + pub max_rss_bytes: u64, + pub minor_faults: u64, + pub major_faults: u64, + pub vol_ctx: u64, // voluntary context switches + pub invol_ctx: u64, // involuntary context switches + pub in_blocks: u64, // filesystem block reads (after page cache) + pub out_blocks: u64, // filesystem block writes + pub swaps: u64, +} + +impl StageStats { + /// (user + sys) / wall — effective thread count utilisation. + pub fn parallelism(&self) -> f64 { + if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs } + else { 0.0 } + } + + /// parallelism / n_cores — fraction of available CPU power used (0..1+). + pub fn efficiency(&self, n_cores: usize) -> f64 { + self.parallelism() / n_cores as f64 + } +} + +/// Accumulates stage stats and prints a human-readable summary table. +#[derive(Default)] +pub struct Reporter { + stages: Vec, +} + +impl Reporter { + pub fn new() -> Self { Self::default() } + pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); } + pub fn stages(&self) -> &[StageStats] { &self.stages } + /// Print the summary to stderr. + pub fn print(&self) { eprint!("{self}"); } +} + +// ── diagnosis ───────────────────────────────────────────────────────────────── + +struct Diagnosis { + tag: &'static str, + detail: Option, +} + +// Thresholds are intentionally conservative to avoid false positives. +fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis { + let eff = s.efficiency(n_cores); + let cpu_pct = eff * 100.0; + let io_ops = s.in_blocks + s.out_blocks; + + if s.swaps > 0 { + return Diagnosis { + tag: "swapping", + detail: Some(format!( + "swapped {} time(s) — severe memory pressure, consider increasing RAM", + s.swaps, + )), + }; + } + if s.major_faults > 10 { + return Diagnosis { + tag: "mem pressure", + detail: Some(format!( + "{} major page faults — working set exceeds available RAM", + s.major_faults, + )), + }; + } + if eff < 0.3 && io_ops > 100 { + return Diagnosis { + tag: "disk I/O", + detail: Some(format!( + "{} block reads + {} writes — CPU at {:.0}%, stage is I/O-bound", + s.in_blocks, s.out_blocks, cpu_pct, + )), + }; + } + if eff < 0.3 && s.vol_ctx > 200 { + return Diagnosis { + tag: "contention", + detail: Some(format!( + "{} voluntary context switches — CPU at {:.0}%, possible lock contention or I/O wait", + s.vol_ctx, cpu_pct, + )), + }; + } + Diagnosis { tag: "—", detail: None } +} + +// ── display helpers ─────────────────────────────────────────────────────────── + +fn fmt_secs(s: f64) -> String { + if s >= 100.0 { format!("{:.0}s", s) } + else if s >= 10.0 { format!("{:.1}s", s) } + else if s >= 1.0 { format!("{:.2}s", s) } + else { format!("{:.0}ms", s * 1000.0) } +} + +fn fmt_bytes(b: u64) -> String { + if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) } + else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) } + else { format!("{:.0} KB", b as f64 / 1024.0) } +} + +fn fmt_efficiency(par: f64, n_cores: usize) -> String { + format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0) +} + +// ── Display ─────────────────────────────────────────────────────────────────── + +impl fmt::Display for Reporter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.stages.is_empty() { return Ok(()); } + + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + + // column widths + let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5); + // efficiency col: worst-case width for this run's n_cores value + let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len(); + + let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12; + let sep = "─".repeat(sep_w); + + // header + writeln!(f, "{:7} {:>ew$} {:>8} status", + "stage", "wall", "efficiency", "peak RSS")?; + writeln!(f, "{sep}")?; + + // compute all diagnoses up front (needed for both table and footnotes) + let diagnoses: Vec = self.stages.iter() + .map(|s| diagnose(s, n_cores)) + .collect(); + + // per-stage rows + for (s, d) in self.stages.iter().zip(diagnoses.iter()) { + writeln!(f, "{:7} {:>ew$} {:>8} {}", + s.label, + fmt_secs(s.wall_secs), + fmt_efficiency(s.parallelism(), n_cores), + fmt_bytes(s.max_rss_bytes), + d.tag, + )?; + } + + // totals + let tw = self.stages.iter().map(|s| s.wall_secs).sum::(); + let tu = self.stages.iter().map(|s| s.user_secs).sum::(); + let ts = self.stages.iter().map(|s| s.sys_secs).sum::(); + let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0); + let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 }; + + writeln!(f, "{sep}")?; + writeln!(f, "{:7} {:>ew$} {:>8}", + "TOTAL", + fmt_secs(tw), + fmt_efficiency(tpar, n_cores), + fmt_bytes(trss), + )?; + + // bottleneck footnotes (only if at least one anomaly detected) + let bottlenecks: Vec<(&str, &str)> = self.stages.iter() + .zip(diagnoses.iter()) + .filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det))) + .collect(); + + if !bottlenecks.is_empty() { + writeln!(f, "\nBottlenecks:")?; + for (label, detail) in &bottlenecks { + writeln!(f, " {label} — {detail}")?; + } + } + + Ok(()) + } +}