20 Commits

Author SHA1 Message Date
Eric Coissac 45ed2bc9b8 ci: fix registry auth and bump obikmer to 1.1.30
Release / create-release (push) Successful in 2m26s
Release / build-linux-x86_64 (push) Successful in 8m12s
Release / build-macos-arm64 (push) Failing after 1m55s
CI / build (pull_request) Successful in 3m32s
Update the release workflow to explicitly resolve the Docker registry username from repository secrets instead of inferring it from the runner's actor. Bump the obikmer package version to 1.1.30.
2026-07-01 14:31:30 +02:00
coissac aa126fd89d Merge pull request 'feat: simplify worker spawning logic and update macOS build workflow' (#52) from push-uvmlknmzqqnx into main
Reviewed-on: #52
2026-07-01 09:50:51 +00:00
Eric Coissac c612132763 feat: simplify worker spawning logic and update macOS build workflow
Release / create-release (push) Successful in 2m59s
Release / build-linux-x86_64 (push) Successful in 8m13s
Release / build-macos-arm64 (push) Failing after 8s
CI / build (pull_request) Successful in 3m24s
Updates the release workflow to run macOS builds inside a Docker container with explicit registry authentication and adjusted artifact paths. Bumps the obikmer crate version to 1.1.29 and adds *.log to .gitignore. Simplifies NUMA worker spawning by lowering the activation threshold from 0.95 to 0.2, replacing complex stateful tracking with a direct efficiency check, and downgrading progress logging to debug level. Includes general code formatting improvements for readability.
2026-07-01 11:40:57 +02:00
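The direct efficiency check that commit c612132763 describes can be sketched as a small state machine. This is a minimal sketch, not the committed code: `Gate` and `should_activate` are hypothetical names, efficiency is assumed to be CPU seconds per wall second (as `CpuSample::do_i_activate` computes it in the diff), and the zero-interval guard is an added assumption that the diff does not show.

```rust
/// Hypothetical stand-in for the decision taken in `CpuSample::do_i_activate`:
/// activate another worker only when efficiency has grown by at least
/// `threshold` since the last activation.
#[derive(Debug, Default)]
struct Gate {
    /// Efficiency recorded at the last activation (0.0 before any).
    previous: f64,
}

impl Gate {
    fn should_activate(&mut self, cpu_secs: f64, wall_secs: f64, threshold: f64) -> bool {
        // Assumption: skip empty intervals to avoid dividing by zero.
        if wall_secs <= 0.0 {
            return false;
        }
        let efficiency = cpu_secs / wall_secs;
        // Only a positive gain counts; a drop in efficiency never activates.
        let gain = (efficiency - self.previous).max(0.0);
        let activate = gain >= threshold;
        if activate {
            // Remember the new baseline for the next comparison.
            self.previous = efficiency;
        }
        activate
    }
}
```

With a threshold of 0.2, a jump from 0.0 to 0.9 activates a worker, while a later move from 0.9 to 1.0 does not, because the gain is below the threshold.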
coissac 19660f8cd0 Merge pull request 'ci: update registry auth and improve adaptive worker scaling' (#51) from push-qlpywtroutvx into main
Reviewed-on: #51
2026-06-26 13:16:23 +00:00
Eric Coissac 7b07540a69 ci: update registry auth and improve adaptive worker scaling
Release / create-release (push) Successful in 2m27s
CI / build (pull_request) Successful in 3m17s
Release / build-linux-x86_64 (push) Successful in 8m3s
Release / build-macos-arm64 (push) Failing after 1s
Refactor the release workflow to use a structured container object with authenticated pulls for macOS ARM64 builds. Replace single-worker activation with dynamic upfront provisioning based on node and worker counts. Implement an absolute efficiency gain threshold for scaling checks and add early termination to improve adaptive scaling stability. Bump obikmer crate version to 1.1.27.
2026-06-26 15:13:13 +02:00
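The upfront provisioning described in commit 7b07540a69 reduces to a three-way minimum, as the diff's `n_nodes.min(max_workers).min(n_total)` shows. The sketch below wraps that expression in a standalone function; the function name is hypothetical, and the parameter names follow the diff.

```rust
/// Number of workers to activate before the controller loop starts.
/// One worker per NUMA node, capped by the worker limit and by the
/// number of partitions that actually need processing.
fn initial_workers(n_nodes: usize, max_workers: usize, n_total: usize) -> usize {
    n_nodes.min(max_workers).min(n_total)
}
```

For example, 4 nodes with a limit of 8 workers and 100 partitions start with 4 workers, while the same machine with only 3 partitions starts with 3.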
coissac 89c43e28f5 Merge pull request 'ci: update release workflow and bump obikmer to 1.1.26' (#50) from push-npttlqpomtvz into main
Reviewed-on: #50
2026-06-24 13:55:40 +00:00
Eric Coissac b9b2e42ad2 ci: update release workflow and bump obikmer to 1.1.26
Release / create-release (push) Successful in 2m32s
CI / build (pull_request) Successful in 3m47s
Release / build-linux-x86_64 (push) Successful in 8m18s
Release / build-macos-arm64 (push) Failing after 0s
Replaces the macOS ARM64 cross-compilation container with a custom internal registry image. Adds explicit steps to install the `aarch64-apple-darwin` Rust target and `jq`, and updates the build command to use `--no-default-features`. Bumps the `obikmer` package version from 1.1.25 to 1.1.26.
2026-06-24 15:55:02 +02:00
coissac ca42fdff2f Merge pull request 'ci: update macOS ARM64 build workflow and bump obikmer version' (#49) from push-lllnsqlrqrut into main
Reviewed-on: #49
2026-06-23 13:15:20 +00:00
Eric Coissac 136cd89efb ci: update macOS ARM64 build workflow and bump obikmer version
Release / create-release (push) Successful in 2m27s
Release / build-linux-x86_64 (push) Successful in 7m52s
Release / build-macos-arm64 (push) Failing after 8m53s
CI / build (pull_request) Successful in 5m31s
Replace manual Zig/cargo-zigbuild setup with a pre-configured Docker container (`joseluisq/rust-linux-darwin-builder`). Use explicit Clang cross-compilers for native macOS ARM64 compilation. Bump the `obikmer` package version to 1.1.25.
2026-06-23 15:01:17 +02:00
coissac a4bbf607b7 Merge pull request 'Push kxsopnzprltv' (#48) from push-kxsopnzprltv into main
Reviewed-on: #48
2026-06-23 09:51:33 +00:00
Eric Coissac 9927100a1c chore: update obikmer to 1.1.24
Release / create-release (push) Successful in 2m24s
Release / build-linux-x86_64 (push) Successful in 7m49s
Release / build-macos-arm64 (push) Failing after 3m31s
CI / build (pull_request) Successful in 3m22s
Bumps the obikmer version in Cargo.toml from 1.1.21 to 1.1.24 and updates Cargo.lock to align with the upstream patch release (1.1.23). This ensures consistent dependency resolution across builds.
2026-06-23 11:47:54 +02:00
Eric Coissac 527258f822 ci: enforce macOS 11.0 deployment target for ARM builds
Adds MACOSX_DEPLOYMENT_TARGET=11.0 environment variable and updates the cargo zigbuild target to aarch64-apple-darwin11.0 to explicitly require macOS 11.0 for ARM binary compilation.
2026-06-23 11:46:09 +02:00
coissac ef62f1947e Merge pull request 'chore: bump version to 1.1.21 and update obikindex features' (#47) from push-xwutoxpnxorz into main
Reviewed-on: #47
2026-06-23 08:31:31 +00:00
Eric Coissac d02316dcf6 chore: bump version to 1.1.21 and update obikindex features
Release / create-release (push) Successful in 2m29s
CI / build (pull_request) Successful in 4m36s
Release / build-linux-x86_64 (push) Successful in 10m25s
Release / build-macos-arm64 (push) Failing after 4m50s
Disables default features for the `obikindex` dependency and introduces a `[features]` block. The new `numa` feature is set as the default, conditionally enabling NUMA support in `obikindex`.
2026-06-23 10:30:39 +02:00
coissac c323b3eaef Merge pull request 'Bump obikmer to 1.1.20 and update release workflow' (#46) from push-wpnywwlwxrps into main
Reviewed-on: #46
2026-06-23 08:03:58 +00:00
Eric Coissac b77d8e9ca0 Bump obikmer to 1.1.20 and update release workflow
Release / create-release (push) Successful in 2m26s
CI / build (pull_request) Successful in 3m14s
Release / build-linux-x86_64 (push) Successful in 7m44s
Release / build-macos-arm64 (push) Failing after 7m13s
Update the Gitea release workflow to fetch a full git clone with complete history, ensuring all commits and tags are available for accurate version resolution. This prepares the repository for the standard patch-level release of obikmer v1.1.20.
2026-06-23 10:03:15 +02:00
coissac 7c5bab3694 Merge pull request 'fix(ci): restrict workflow to PRs and improve release tagging' (#45) from push-louqrszyuqpz into main
Reviewed-on: #45
2026-06-23 07:52:35 +00:00
Eric Coissac fab4e0d6de fix(ci): restrict workflow to PRs and improve release tagging
Release / create-release (push) Failing after 26s
Release / build-linux-x86_64 (push) Has been skipped
Release / build-macos-arm64 (push) Has been skipped
CI / build (pull_request) Successful in 3m17s
Restrict the CI pipeline to pull request events only by removing the unconfigured push trigger and eliminating a duplicate pull_request block in the workflow file. Update the Makefile to suppress stderr from the aichat command and introduce a fallback release tag message for robust version tagging. Additionally, bump the obikmer crate version to 1.1.19.
2026-06-23 09:42:23 +02:00
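The fallback tag message from commit fab4e0d6de relies on the shell expansion `${notes:-Release v$new_version}`, which substitutes the default when `notes` is unset or empty. A rough Rust equivalent, with the hypothetical function name `tag_message`, might look like this:

```rust
/// Use the generated release notes when present, otherwise fall back to a
/// fixed "Release v<version>" message, mirroring the shell `:-` expansion.
fn tag_message(notes: &str, version: &str) -> String {
    if notes.is_empty() {
        format!("Release v{version}")
    } else {
        notes.to_string()
    }
}
```

This keeps tagging working when `aichat` fails or prints nothing, since its stderr is discarded and its stdout is then empty.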
coissac 973a3f3d6e Merge pull request 'feat: add numa feature flag and automate release workflow' (#44) from push-uymxyvsyooro into main
CI / build (push) Successful in 3m16s
Reviewed-on: #44
2026-06-23 07:22:33 +00:00
Eric Coissac 1a839a295a feat: add numa feature flag and automate release workflow
Release / create-release (push) Failing after 37s
Release / build-linux-x86_64 (push) Has been skipped
Release / build-macos-arm64 (push) Has been skipped
CI / build (pull_request) Successful in 3m23s
Refactor the Gitea release pipeline to generate releases via API and upload binaries using a shared ID. Automate changelog generation by fetching recent commits with `jj log` and producing markdown notes via `aichat`. Convert `hwlocality` to an optional dependency gated by a default `numa` feature, providing fallback implementations for graceful degradation when NUMA support is disabled. Bump obikmer to 1.1.18.
2026-06-23 09:07:04 +02:00
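The feature gating described in commit 1a839a295a pairs each NUMA-aware function with a fallback of the same signature. The sketch below illustrates the pattern only: `cpus_per_node` is a hypothetical function, and the `numa` branch is a placeholder rather than a real `hwlocality` query.

```rust
/// With the `numa` feature enabled, a real implementation would query the
/// hardware topology (the project uses `hwlocality` for this).
#[cfg(feature = "numa")]
fn cpus_per_node() -> Vec<Vec<usize>> {
    // Placeholder only: topology detection is out of scope for this sketch.
    vec![vec![0]]
}

/// Without the feature, fall back to one synthetic node holding every core.
#[cfg(not(feature = "numa"))]
fn cpus_per_node() -> Vec<Vec<usize>> {
    let n_cores = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    vec![(0..n_cores).collect()]
}
```

Callers use `cpus_per_node()` the same way in both configurations, so building with `--no-default-features` needs no changes at the call sites.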
9 changed files with 336 additions and 167 deletions
+1 -2
@@ -1,9 +1,8 @@
name: CI
on:
push:
branches: ['main']
pull_request:
branches: ['main']
jobs:
build:
+70 -14
@@ -6,7 +6,32 @@ on:
- "v*"
jobs:
build-linux-static:
create-release:
runs-on: ubuntu-latest
outputs:
release_id: ${{ steps.create.outputs.release_id }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Create Gitea release
id: create
env:
GITEA_TOKEN: ${{ secrets.GITEATOKEN }}
TAG: ${{ github.ref_name }}
run: |
sudo apt-get update -qq && sudo apt-get install -y -qq jq
body=$(git for-each-ref --format='%(contents)' "refs/tags/$TAG")
release_id=$(curl -s -X POST \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases" \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"tag_name\":\"$TAG\",\"name\":\"$TAG\",\"body\":$(echo "$body" | jq -Rs .)}" | jq -r '.id')
echo "release_id=$release_id" >> $GITHUB_OUTPUT
build-linux-x86_64:
needs: create-release
runs-on: ubuntu-latest
defaults:
run:
@@ -45,23 +70,54 @@ jobs:
PKG_CONFIG_ALLOW_CROSS: "1"
run: cargo zigbuild --release --target x86_64-unknown-linux-musl
- name: Prepare artifact
- name: Prepare and upload artifact
env:
GITEA_TOKEN: ${{ secrets.GITEATOKEN }}
RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
run: |
mkdir -p /tmp/dist
cp target/x86_64-unknown-linux-musl/release/obikmer /tmp/dist/obikmer-linux-x86_64
strip /tmp/dist/obikmer-linux-x86_64
- name: Create Gitea release and upload binary
env:
GITEA_TOKEN: ${{ secrets.GITEATOKEN }}
TAG: ${{ github.ref_name }}
run: |
release_id=$(curl -s -X POST \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases" \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"tag_name\":\"$TAG\",\"name\":\"$TAG\"}" | jq -r '.id')
curl -s -X POST \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$release_id/assets" \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
-H "Authorization: token $GITEA_TOKEN" \
-F "attachment=@/tmp/dist/obikmer-linux-x86_64"
build-macos-arm64:
needs: create-release
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Login to registry
run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ secrets.REGISTRYUSER }} --password-stdin
- name: Cache cargo registry
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
src/target
key: macos-arm64-cargo-${{ hashFiles('src/Cargo.lock') }}
restore-keys: macos-arm64-cargo-
- name: Build macOS binary
run: |
docker run --rm \
-v "${{ github.workspace }}:/src" \
-w /src/src \
registry.metabarcoding.org/cibuilder/rustcrossosx:latest \
cargo build --release --target aarch64-apple-darwin --no-default-features
- name: Prepare and upload artifact
env:
GITEA_TOKEN: ${{ secrets.GITEATOKEN }}
RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
run: |
mkdir -p /tmp/dist
cp src/target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
curl -s -X POST \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
-H "Authorization: token $GITEA_TOKEN" \
-F "attachment=@/tmp/dist/obikmer-macos-arm64"
+1
@@ -8,6 +8,7 @@ data-stress
*.pb
./**/*.json
*.bin
*.log
Betula_exilis--IGA-24-33
benchmark/genomes
benchmark/simulated_data
+5 -1
@@ -90,5 +90,9 @@ release: bump-version
@jj git push --change @
@new_version=$$(grep '^version = ' $(CARGO_TOML) | head -n 1 | sed 's/version = "\(.*\)"/\1/'); \
git_hash=$$(jj log -r @ --no-graph -T 'commit_id'); \
git tag "v$$new_version" "$$git_hash" && \
commits=$$(jj log -r 'latest(tags())..@' --no-graph -T 'description ++ "\n"' 2>/dev/null || \
jj log --no-graph -T 'description ++ "\n"' --limit 30); \
notes=$$(printf 'Write concise markdown release notes for obikmer (a Rust kmer genomics tool). Be technical and direct. Base them strictly on these commit messages:\n\n%s' "$$commits" | aichat 2>/dev/null); \
tag_msg="$${notes:-Release v$$new_version}"; \
git tag -a "v$$new_version" -m "$$tag_msg" "$$git_hash" && \
git push origin "v$$new_version"
+1 -1
@@ -1704,7 +1704,7 @@ dependencies = [
[[package]]
name = "obikmer"
version = "1.1.15"
version = "1.1.30"
dependencies = [
"clap",
"csv",
+5 -1
@@ -17,4 +17,8 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
indicatif = "0.17"
tracing = "0.1.44"
hwlocality = { version = "1.0.0-alpha.11", features = ["vendored"] }
hwlocality = { version = "1.0.0-alpha.11", features = ["vendored"], optional = true }
[features]
default = ["numa"]
numa = ["hwlocality"]
+39 -46
@@ -12,9 +12,13 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use crossbeam_channel::unbounded;
#[cfg(feature = "numa")]
use hwlocality::Topology;
#[cfg(feature = "numa")]
use hwlocality::cpu::binding::CpuBindingFlags;
#[cfg(feature = "numa")]
use hwlocality::cpu::cpuset::CpuSet;
#[cfg(feature = "numa")]
use hwlocality::object::types::ObjectType;
use obisys::CpuSample;
use tracing::debug;
@@ -40,6 +44,7 @@ impl NumaSetup {
/// Detect NUMA topology and build per-node Rayon pools.
/// Always succeeds: falls back to a single synthetic UMA node on failure.
#[cfg(feature = "numa")]
pub fn build() -> NumaSetup {
if let Ok(topology) = Topology::new() {
let nodes: Vec<Vec<usize>> = topology
@@ -81,8 +86,21 @@ pub fn build() -> NumaSetup {
}
}
#[cfg(not(feature = "numa"))]
pub fn build() -> NumaSetup {
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
debug!("UMA: single synthetic node, {} core(s)", n_cores);
NumaSetup {
pools: vec![None],
cpus_per_node: vec![(0..n_cores).collect()],
}
}
/// Bind the calling thread to `cpu_indices` using hwloc.
/// Silently returns on any error so the thread still runs, just unbound.
#[cfg(feature = "numa")]
pub fn pin_current_thread(cpu_indices: &[usize]) {
let Ok(topology) = Topology::new() else { return };
let mut cpuset = CpuSet::new();
@@ -92,8 +110,12 @@ pub fn pin_current_thread(cpu_indices: &[usize]) {
let _ = topology.bind_cpu(&cpuset, CpuBindingFlags::THREAD);
}
#[cfg(not(feature = "numa"))]
pub fn pin_current_thread(_cpu_indices: &[usize]) {}
// ── Internal helpers ──────────────────────────────────────────────────────────
#[cfg(feature = "numa")]
fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
let cpus = cpus.to_vec();
rayon::ThreadPoolBuilder::new()
@@ -195,13 +217,9 @@ impl PartitionRunner {
return Ok(());
}
const SPAWN_THRESHOLD: f64 = 0.95;
const SPAWN_THRESHOLD: f64 = 0.2;
const TIMER_SECS: u64 = 30;
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
// ── Channels ──────────────────────────────────────────────────────────
let (part_tx, part_rx) = unbounded::<usize>();
let (activate_tx, activate_rx) = unbounded::<()>();
@@ -265,11 +283,11 @@ impl PartitionRunner {
drop(event_tx);
// ── Controller ────────────────────────────────────────────────────
activate_tx.send(()).ok();
let mut n_active = 1usize;
let mut cpu_sample = CpuSample::now();
let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate
let mut completed = 0usize;
let initial_workers = n_nodes.min(max_workers).min(n_total);
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
let mut n_active = initial_workers;
let mut cpu_sample = CpuSample::now();
let mut completed = 0usize;
while completed < n_total {
let Ok(event) = event_rx.recv() else { break };
@@ -285,15 +303,13 @@ impl PartitionRunner {
// Inline check: same logic as a timer tick.
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, &mut eff_at_last_spawn,
n_cores, SPAWN_THRESHOLD, completed, n_total,
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
);
}
WorkerEvent::TimerTick => {
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, &mut eff_at_last_spawn,
n_cores, SPAWN_THRESHOLD, completed, n_total,
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
);
}
}
@@ -320,42 +336,19 @@ enum WorkerEvent<R, E> {
}
fn maybe_activate(
activate_tx: &crossbeam_channel::Sender<()>,
n_active: &mut usize,
max_workers: usize,
cpu_sample: &mut CpuSample,
eff_at_last_spawn: &mut f64,
n_cores: usize,
threshold: f64,
completed: usize,
n_total: usize,
activate_tx: &crossbeam_channel::Sender<()>,
n_active: &mut usize,
max_workers: usize,
cpu_sample: &mut CpuSample,
threshold: f64,
completed: usize,
n_total: usize,
) {
if *n_active >= max_workers || completed >= n_total { return; }
let eff = cpu_sample.cpu_efficiency(n_cores);
if eff >= threshold { return; } // CPU already saturated
// Check that the previous activation was beneficial enough.
// Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1).
// For the very first extra worker (n_active == 1, no previous spawn), skip this
// check: eff_at_last_spawn == 0 acts as the sentinel.
let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 {
true // first additional worker: no prior data to evaluate
} else {
let k_before = (*n_active - 1) as f64;
let min_speedup = (k_before + 0.2) / k_before;
let actual_speedup = eff / *eff_at_last_spawn;
actual_speedup >= min_speedup
};
if last_spawn_was_beneficial {
if cpu_sample.do_i_activate(threshold) {
activate_tx.send(()).ok();
*eff_at_last_spawn = eff;
*n_active += 1;
*cpu_sample = CpuSample::now();
debug!(
"activated worker {}/{} — efficiency {:.0}%",
n_active, max_workers, eff * 100.0,
);
debug!("activated worker {}/{}", n_active, max_workers);
}
}
+4 -2
@@ -1,6 +1,6 @@
[package]
name = "obikmer"
version = "1.1.15"
version = "1.1.30"
edition = "2024"
[[bin]]
@@ -18,7 +18,7 @@ obikrope = { path = "../obikrope" }
obikpartitionner = { path = "../obikpartitionner" }
obisys = { path = "../obisys" }
obiskio = { path = "../obiskio" }
obikindex = { path = "../obikindex" }
obikindex = { path = "../obikindex", default-features = false }
obitaxonomy = { path = "../obitaxonomy" }
obilayeredmap = { path = "../obilayeredmap" }
clap = { version = "4", features = ["derive"] }
@@ -33,4 +33,6 @@ tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
pprof = { version = "0.13", features = ["prost-codec"], optional = true }
[features]
default = ["numa"]
numa = ["obikindex/numa"]
profiling = ["dep:pprof"]
+210 -100
@@ -4,7 +4,7 @@ use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};
use indicatif::{ProgressBar, ProgressStyle};
use tracing::{info, warn};
use tracing::{debug, info, warn};
const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
@@ -14,24 +14,25 @@ const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧
/// a TTY (e.g. HPC job logs): every 10% for bounded bars, every ~10 s for
/// spinners (throttled on `set_message`).
pub struct TracedBar {
pb: ProgressBar,
label: String,
unit: String,
total: u64, // 0 for spinners
start: Instant, // creation time, for spinner throttling
last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet
last_log_ms: AtomicU64, // ms since `start` at last spinner log
pb: ProgressBar,
label: String,
unit: String,
total: u64, // 0 for spinners
start: Instant, // creation time, for spinner throttling
last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet
last_log_ms: AtomicU64, // ms since `start` at last spinner log
}
impl TracedBar {
pub fn inc(&self, delta: u64) {
self.pb.inc(delta);
if self.pb.is_hidden() && self.total > 0 {
let pos = self.pb.position();
let pos = self.pb.position();
let pct10 = (pos * 10) / self.total; // 0..=10
let last = self.last_pct.load(Ordering::Relaxed);
let last = self.last_pct.load(Ordering::Relaxed);
if pct10 > last
&& self.last_pct
&& self
.last_pct
.compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
@@ -49,14 +50,14 @@ impl TracedBar {
let msg = msg.into();
if self.pb.is_hidden() {
if self.total > 0 {
// bounded bar: always log (already rate-limited by 10% threshold in inc)
info!(stage = %self.label, "{msg}");
debug!(stage = %self.label, "{msg}");
} else {
// spinner: throttle to ~10 s
let now_ms = self.start.elapsed().as_millis() as u64;
let last = self.last_log_ms.load(Ordering::Relaxed);
let last = self.last_log_ms.load(Ordering::Relaxed);
if now_ms >= last + 10_000
&& self.last_log_ms
&& self
.last_log_ms
.compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
@@ -83,8 +84,13 @@ pub fn spinner(label: &str) -> TracedBar {
);
pb.enable_steady_tick(Duration::from_millis(100));
TracedBar {
pb, label: label.to_string(), unit: String::new(), total: 0,
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
pb,
label: label.to_string(),
unit: String::new(),
total: 0,
start: Instant::now(),
last_pct: AtomicU64::new(0),
last_log_ms: AtomicU64::new(0),
}
}
@@ -101,8 +107,13 @@ pub fn progress_bar(label: &str, n: u64, unit: &str) -> TracedBar {
);
pb.enable_steady_tick(Duration::from_millis(100));
TracedBar {
pb, label: label.to_string(), unit: unit.to_string(), total: n,
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
pb,
label: label.to_string(),
unit: unit.to_string(),
total: n,
start: Instant::now(),
last_pct: AtomicU64::new(0),
last_log_ms: AtomicU64::new(0),
}
}
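The 10% logging throttle used by `TracedBar::inc` can be isolated into a small helper to make the atomic hand-off easier to follow. This is a sketch under stated assumptions: `claim_bucket` is a hypothetical name, and the `total == 0` guard stands in for the `self.total > 0` check in the diff.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Returns `Some(bucket)` when `pos` has crossed into a new 10% bucket that
/// no other thread has claimed yet, and `None` otherwise.
fn claim_bucket(last_pct: &AtomicU64, pos: u64, total: u64) -> Option<u64> {
    if total == 0 {
        return None; // spinners have no bounded total
    }
    let pct10 = (pos * 10) / total; // 0..=10
    let last = last_pct.load(Ordering::Relaxed);
    // The compare-exchange lets exactly one thread win each bucket, so a
    // given 10% step is logged at most once.
    if pct10 > last
        && last_pct
            .compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    {
        Some(pct10)
    } else {
        None
    }
}
```

A caller would emit one log line for each `Some(bucket)` and stay silent otherwise, which bounds the output to ten lines per bar.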
@@ -204,13 +215,19 @@ fn tv_to_secs(tv: timeval) -> f64 {
}
#[cfg(target_os = "macos")]
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 }
fn rss_to_bytes(ru: &rusage) -> u64 {
ru.ru_maxrss as u64
}
#[cfg(not(target_os = "macos"))]
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
fn rss_to_bytes(ru: &rusage) -> u64 {
ru.ru_maxrss as u64 * 1024
}
// Monotonically increasing counters — negative delta would be a kernel bug.
fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
fn delta(end: i64, start: i64) -> u64 {
(end - start).max(0) as u64
}
// ── CpuSample ─────────────────────────────────────────────────────────────────
@@ -218,31 +235,60 @@ fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
/// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of
/// available cores used since the snapshot was taken.
pub struct CpuSample {
wall: Instant,
wall: Instant,
user_secs: f64,
sys_secs: f64,
sys_secs: f64,
previous: f64,
}
impl CpuSample {
pub fn now() -> Self {
let ru = get_rusage();
Self {
wall: Instant::now(),
wall: Instant::now(),
user_secs: tv_to_secs(ru.ru_utime),
sys_secs: tv_to_secs(ru.ru_stime),
sys_secs: tv_to_secs(ru.ru_stime),
previous: 0.0,
}
}
/// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot.
/// Returns 0.0 if less than 100 ms have elapsed (too noisy).
pub fn cpu_efficiency(&self, n_cores: usize) -> f64 {
let ru = get_rusage();
let ru = get_rusage();
let wall = self.wall.elapsed().as_secs_f64();
if wall < 0.1 { return 0.0; }
let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs)
+ (tv_to_secs(ru.ru_stime) - self.sys_secs);
if wall < 0.1 {
return 0.0;
}
let cpu =
(tv_to_secs(ru.ru_utime) - self.user_secs) + (tv_to_secs(ru.ru_stime) - self.sys_secs);
cpu / (wall * n_cores as f64)
}
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let n = CpuSample::now();
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
let delta_wall = self.wall.elapsed().as_secs_f64();
let efficiency = delta_ru / delta_wall;
let activate = 0f64.max(efficiency - self.previous) >= threshold;
if activate {
debug!(
"Do I activate : {} -> {} = {} Activate: {}",
self.previous,
efficiency,
0f64.max(efficiency - self.previous),
activate
);
self.previous = efficiency;
self.user_secs = n.user_secs;
self.sys_secs = n.sys_secs;
self.wall = n.wall;
}
activate
}
}
// ── public API ────────────────────────────────────────────────────────────────
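The two ratios used in this file differ only by the division by core count. The sketch below restates them as free functions; the names mirror `StageStats::parallelism` and `StageStats::efficiency`, and the `n_cores == 0` guard is an added assumption. As shown in the diff, `do_i_activate` uses the first ratio (CPU seconds per wall second) without dividing by `n_cores`, so its threshold is expressed in cores rather than as a fraction of the machine.

```rust
/// (user + sys) / wall: average number of cores kept busy.
fn parallelism(user_secs: f64, sys_secs: f64, wall_secs: f64) -> f64 {
    if wall_secs > 1e-9 {
        (user_secs + sys_secs) / wall_secs
    } else {
        0.0
    }
}

/// parallelism / n_cores: fraction of the available CPU power used.
fn efficiency(user_secs: f64, sys_secs: f64, wall_secs: f64, n_cores: usize) -> f64 {
    // Assumption: treat a zero core count as "no capacity" instead of dividing by zero.
    if n_cores == 0 {
        return 0.0;
    }
    parallelism(user_secs, sys_secs, wall_secs) / n_cores as f64
}
```

For instance, 6 s of user time plus 2 s of system time over 2 s of wall time gives a parallelism of 4.0, which is an efficiency of 0.5 on an 8-core machine.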
@@ -251,33 +297,37 @@ impl CpuSample {
#[must_use = "call .stop() to record the stage"]
pub struct Stage {
label: String,
wall: Instant,
ru: rusage,
wall: Instant,
ru: rusage,
}
impl Stage {
pub fn start(label: impl Into<String>) -> Self {
let label = label.into();
info!(stage = %label, "started");
Self { label, wall: Instant::now(), ru: get_rusage() }
Self {
label,
wall: Instant::now(),
ru: get_rusage(),
}
}
pub fn stop(self) -> StageStats {
let wall_secs = self.wall.elapsed().as_secs_f64();
let end = get_rusage();
let stats = StageStats {
label: self.label,
label: self.label,
wall_secs,
user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime),
sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime),
user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime),
sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime),
max_rss_bytes: rss_to_bytes(&end),
minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64),
major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64),
vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64),
invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64),
in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64),
out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64),
swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64),
minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64),
major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64),
vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64),
invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64),
in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64),
out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64),
swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64),
};
info!(
stage = %stats.label,
@@ -299,27 +349,30 @@ impl Stage {
/// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas.
pub struct StageStats {
pub label: String,
pub wall_secs: f64,
pub user_secs: f64,
pub sys_secs: f64,
pub label: String,
pub wall_secs: f64,
pub user_secs: f64,
pub sys_secs: f64,
/// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum,
/// so this reflects the high-water mark up to and including this stage.
pub max_rss_bytes: u64,
pub minor_faults: u64,
pub major_faults: u64,
pub vol_ctx: u64, // voluntary context switches
pub invol_ctx: u64, // involuntary context switches
pub in_blocks: u64, // filesystem block reads (after page cache)
pub out_blocks: u64, // filesystem block writes
pub swaps: u64,
pub minor_faults: u64,
pub major_faults: u64,
pub vol_ctx: u64, // voluntary context switches
pub invol_ctx: u64, // involuntary context switches
pub in_blocks: u64, // filesystem block reads (after page cache)
pub out_blocks: u64, // filesystem block writes
pub swaps: u64,
}
impl StageStats {
/// (user + sys) / wall — effective thread count utilisation.
pub fn parallelism(&self) -> f64 {
if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs }
else { 0.0 }
if self.wall_secs > 1e-9 {
(self.user_secs + self.sys_secs) / self.wall_secs
} else {
0.0
}
}
/// parallelism / n_cores — fraction of available CPU power used (0..1+).
@@ -335,25 +388,33 @@ pub struct Reporter {
}
impl Reporter {
pub fn new() -> Self { Self::default() }
pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); }
pub fn stages(&self) -> &[StageStats] { &self.stages }
pub fn new() -> Self {
Self::default()
}
pub fn push(&mut self, stats: StageStats) {
self.stages.push(stats);
}
pub fn stages(&self) -> &[StageStats] {
&self.stages
}
/// Print the summary to stderr.
pub fn print(&self) { eprint!("{self}"); }
pub fn print(&self) {
eprint!("{self}");
}
}
// ── diagnosis ─────────────────────────────────────────────────────────────────
struct Diagnosis {
tag: &'static str,
tag: &'static str,
detail: Option<String>,
}
// Thresholds are intentionally conservative to avoid false positives.
fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
let eff = s.efficiency(n_cores);
let eff = s.efficiency(n_cores);
let cpu_pct = eff * 100.0;
let io_ops = s.in_blocks + s.out_blocks;
let io_ops = s.in_blocks + s.out_blocks;
// swaps > 0 is the only reliable cross-platform indicator of true RAM exhaustion.
// ru_majflt is intentionally excluded: on macOS it counts all file-backed mmap
@@ -387,26 +448,43 @@ fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
)),
};
}
Diagnosis { tag: "", detail: None }
Diagnosis {
tag: "",
detail: None,
}
}
// ── display helpers ───────────────────────────────────────────────────────────
fn fmt_secs(s: f64) -> String {
if s >= 100.0 { format!("{:.0}s", s) }
else if s >= 10.0 { format!("{:.1}s", s) }
else if s >= 1.0 { format!("{:.2}s", s) }
else { format!("{:.0}ms", s * 1000.0) }
if s >= 100.0 {
format!("{:.0}s", s)
} else if s >= 10.0 {
format!("{:.1}s", s)
} else if s >= 1.0 {
format!("{:.2}s", s)
} else {
format!("{:.0}ms", s * 1000.0)
}
}
fn fmt_bytes(b: u64) -> String {
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) }
else { format!("{:.0} KB", b as f64 / 1024.0) }
if b >= 1 << 30 {
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
} else if b >= 1 << 20 {
format!("{:.0} MB", b as f64 / (1u64 << 20) as f64)
} else {
format!("{:.0} KB", b as f64 / 1024.0)
}
}
fn fmt_efficiency(par: f64, n_cores: usize) -> String {
format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0)
format!(
"{:.1}×/{} ({:.0}%)",
par,
n_cores,
par / n_cores as f64 * 100.0
)
}
// ── Display ───────────────────────────────────────────────────────────────────
@@ -414,8 +492,8 @@ fn fmt_efficiency(par: f64, n_cores: usize) -> String {
// ── MemoryBudget ──────────────────────────────────────────────────────────────
struct BudgetInner {
remaining: u64,
active: usize,
remaining: u64,
active: usize,
peak_active: usize,
}
@@ -425,8 +503,8 @@ struct BudgetInner {
/// completion. Non-deadlock guarantee: when no worker is active the next
/// acquire always succeeds regardless of cost vs. remaining budget.
pub struct MemoryBudget {
total: u64,
inner: Mutex<BudgetInner>,
total: u64,
inner: Mutex<BudgetInner>,
condvar: Condvar,
}
@@ -434,7 +512,11 @@ impl MemoryBudget {
pub fn new(total: u64) -> Self {
Self {
total,
inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }),
inner: Mutex::new(BudgetInner {
remaining: total,
active: 0,
peak_active: 0,
}),
condvar: Condvar::new(),
}
}
@@ -443,9 +525,9 @@ impl MemoryBudget {
let mut g = self.inner.lock().unwrap();
loop {
if g.active == 0 || g.remaining >= cost {
g.remaining = g.remaining.saturating_sub(cost);
g.active += 1;
g.peak_active = g.peak_active.max(g.active);
g.remaining = g.remaining.saturating_sub(cost);
g.active += 1;
g.peak_active = g.peak_active.max(g.active);
return;
}
g = self.condvar.wait(g).unwrap();
@@ -455,47 +537,66 @@ impl MemoryBudget {
pub fn release(&self, cost: u64) {
let mut g = self.inner.lock().unwrap();
g.remaining = (g.remaining + cost).min(self.total);
g.active -= 1;
g.active -= 1;
self.condvar.notify_all();
}
pub fn total(&self) -> u64 { self.total }
pub fn active(&self) -> usize { self.inner.lock().unwrap().active }
pub fn remaining(&self) -> u64 { self.inner.lock().unwrap().remaining }
pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active }
pub fn total(&self) -> u64 {
self.total
}
pub fn active(&self) -> usize {
self.inner.lock().unwrap().active
}
pub fn remaining(&self) -> u64 {
self.inner.lock().unwrap().remaining
}
pub fn peak_active(&self) -> usize {
self.inner.lock().unwrap().peak_active
}
}
// ── Display ───────────────────────────────────────────────────────────────────
impl fmt::Display for Reporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.stages.is_empty() { return Ok(()); }
if self.stages.is_empty() {
return Ok(());
}
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
// column widths
let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5);
let nw = self
.stages
.iter()
.map(|s| s.label.len())
.max()
.unwrap_or(5)
.max(5);
// efficiency col: worst-case width for this run's n_cores value
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12;
let sep = "".repeat(sep_w);
let sep = "".repeat(sep_w);
// header
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} status",
"stage", "wall", "efficiency", "peak RSS")?;
writeln!(
f,
"{:<nw$} {:>7} {:>ew$} {:>8} status",
"stage", "wall", "efficiency", "peak RSS"
)?;
writeln!(f, "{sep}")?;
// compute all diagnoses up front (needed for both table and footnotes)
let diagnoses: Vec<Diagnosis> = self.stages.iter()
.map(|s| diagnose(s, n_cores))
.collect();
let diagnoses: Vec<Diagnosis> = self.stages.iter().map(|s| diagnose(s, n_cores)).collect();
// per-stage rows
for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} {}",
writeln!(
f,
"{:<nw$} {:>7} {:>ew$} {:>8} {}",
s.label,
fmt_secs(s.wall_secs),
fmt_efficiency(s.parallelism(), n_cores),
@@ -505,14 +606,21 @@ impl fmt::Display for Reporter {
}
// totals
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0);
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
let trss = self
.stages
.iter()
.map(|s| s.max_rss_bytes)
.max()
.unwrap_or(0);
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
writeln!(f, "{sep}")?;
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8}",
writeln!(
f,
"{:<nw$} {:>7} {:>ew$} {:>8}",
"TOTAL",
fmt_secs(tw),
fmt_efficiency(tpar, n_cores),
@@ -520,7 +628,9 @@ impl fmt::Display for Reporter {
)?;
// bottleneck footnotes (only if at least one anomaly detected)
let bottlenecks: Vec<(&str, &str)> = self.stages.iter()
let bottlenecks: Vec<(&str, &str)> = self
.stages
.iter()
.zip(diagnoses.iter())
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
.collect();
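The non-deadlock guarantee documented on `MemoryBudget` can be illustrated with a reduced version of the type. This is a sketch, not the committed code: `Budget` and `Inner` are hypothetical names, and `peak_active` tracking is omitted.

```rust
use std::sync::{Condvar, Mutex};

struct Inner {
    remaining: u64,
    active: usize,
}

/// Reduced memory budget: callers block in `acquire` until their cost fits.
pub struct Budget {
    total: u64,
    inner: Mutex<Inner>,
    condvar: Condvar,
}

impl Budget {
    pub fn new(total: u64) -> Self {
        Self {
            total,
            inner: Mutex::new(Inner { remaining: total, active: 0 }),
            condvar: Condvar::new(),
        }
    }

    pub fn acquire(&self, cost: u64) {
        let mut g = self.inner.lock().unwrap();
        // Admit when the cost fits, or when nobody is running: an oversized
        // job then runs alone instead of waiting forever.
        while !(g.active == 0 || g.remaining >= cost) {
            g = self.condvar.wait(g).unwrap();
        }
        g.remaining = g.remaining.saturating_sub(cost);
        g.active += 1;
    }

    pub fn release(&self, cost: u64) {
        let mut g = self.inner.lock().unwrap();
        // Clamp so that releasing an oversized job cannot exceed the total.
        g.remaining = (g.remaining + cost).min(self.total);
        g.active -= 1;
        self.condvar.notify_all();
    }

    pub fn remaining(&self) -> u64 {
        self.inner.lock().unwrap().remaining
    }
}
```

A job that costs more than the whole budget is still admitted when no other worker is active; it drives `remaining` to zero, and the clamp in `release` restores the budget to `total` afterwards.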