1 Commits

Author SHA1 Message Date
Eric Coissac 1f0233d033 refactor: make hwlocality optional and streamline release workflow
Release / create-release (push) Successful in 2m13s
Release / build-linux-x86_64 (push) Successful in 8m14s
Release / build-macos-arm64 (push) Failing after 4m33s
Extracts release creation into a dedicated `create-release` job that outputs a shared `release_id`, allowing downstream build jobs to upload binaries directly and ensuring atomic initialization. Introduces a `numa` feature flag for `obikindex` to make `hwlocality` optional, providing graceful fallbacks like a synthetic UMA node when disabled. Also bumps `obikmer` to 1.1.16.
2026-06-23 08:53:37 +02:00
8 changed files with 166 additions and 254 deletions
+2 -1
View File
@@ -1,8 +1,9 @@
name: CI
on:
pull_request:
push:
branches: ['main']
pull_request:
jobs:
build:
+14 -15
View File
@@ -11,10 +11,6 @@ jobs:
outputs:
release_id: ${{ steps.create.outputs.release_id }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Create Gitea release
id: create
env:
@@ -22,12 +18,11 @@ jobs:
TAG: ${{ github.ref_name }}
run: |
sudo apt-get update -qq && sudo apt-get install -y -qq jq
body=$(git for-each-ref --format='%(contents)' "refs/tags/$TAG")
release_id=$(curl -s -X POST \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases" \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"tag_name\":\"$TAG\",\"name\":\"$TAG\",\"body\":$(echo "$body" | jq -Rs .)}" | jq -r '.id')
-d "{\"tag_name\":\"$TAG\",\"name\":\"$TAG\"}" | jq -r '.id')
echo "release_id=$release_id" >> $GITHUB_OUTPUT
build-linux-x86_64:
@@ -86,11 +81,20 @@ jobs:
build-macos-arm64:
needs: create-release
runs-on: ubuntu-latest
defaults:
run:
working-directory: src
steps:
- uses: actions/checkout@v4
- name: Login to registry
run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ github.actor }} --password-stdin
- name: Install Rust + zigbuild
run: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain stable
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
sudo apt-get update -qq && sudo apt-get install -y -qq jq
pip install ziglang --quiet --break-system-packages
$HOME/.cargo/bin/cargo install cargo-zigbuild
$HOME/.cargo/bin/rustup target add aarch64-apple-darwin
- name: Cache cargo registry
uses: actions/cache@v4
@@ -103,12 +107,7 @@ jobs:
restore-keys: macos-arm64-cargo-
- name: Build macOS binary
run: |
docker run --rm \
-v "${{ github.workspace }}:/src" \
-w /src/src \
registry.metabarcoding.org/cibuilder/rustcrossosx:latest \
cargo build --release --target aarch64-apple-darwin --no-default-features
run: cargo zigbuild --release --target aarch64-apple-darwin --no-default-features
- name: Prepare and upload artifact
env:
@@ -116,7 +115,7 @@ jobs:
RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
run: |
mkdir -p /tmp/dist
cp src/target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
cp target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
curl -s -X POST \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
-H "Authorization: token $GITEA_TOKEN" \
-1
View File
@@ -8,7 +8,6 @@ data-stress
*.pb
./**/*.json
*.bin
*.log
Betula_exilis--IGA-24-33
benchmark/genomes
benchmark/simulated_data
+1 -5
View File
@@ -90,9 +90,5 @@ release: bump-version
@jj git push --change @
@new_version=$$(grep '^version = ' $(CARGO_TOML) | head -n 1 | sed 's/version = "\(.*\)"/\1/'); \
git_hash=$$(jj log -r @ --no-graph -T 'commit_id'); \
commits=$$(jj log -r 'latest(tags())..@' --no-graph -T 'description ++ "\n"' 2>/dev/null || \
jj log --no-graph -T 'description ++ "\n"' --limit 30); \
notes=$$(printf 'Write concise markdown release notes for obikmer (a Rust kmer genomics tool). Be technical and direct. Base them strictly on these commit messages:\n\n%s' "$$commits" | aichat 2>/dev/null); \
tag_msg="$${notes:-Release v$$new_version}"; \
git tag -a "v$$new_version" -m "$$tag_msg" "$$git_hash" && \
git tag "v$$new_version" "$$git_hash" && \
git push origin "v$$new_version"
+1 -1
View File
@@ -1704,7 +1704,7 @@ dependencies = [
[[package]]
name = "obikmer"
version = "1.1.29"
version = "1.1.16"
dependencies = [
"clap",
"csv",
+37 -8
View File
@@ -217,9 +217,13 @@ impl PartitionRunner {
return Ok(());
}
const SPAWN_THRESHOLD: f64 = 0.2;
const SPAWN_THRESHOLD: f64 = 0.95;
const TIMER_SECS: u64 = 30;
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
// ── Channels ──────────────────────────────────────────────────────────
let (part_tx, part_rx) = unbounded::<usize>();
let (activate_tx, activate_rx) = unbounded::<()>();
@@ -283,10 +287,10 @@ impl PartitionRunner {
drop(event_tx);
// ── Controller ────────────────────────────────────────────────────
let initial_workers = n_nodes.min(max_workers).min(n_total);
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
let mut n_active = initial_workers;
activate_tx.send(()).ok();
let mut n_active = 1usize;
let mut cpu_sample = CpuSample::now();
let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate
let mut completed = 0usize;
while completed < n_total {
@@ -303,13 +307,15 @@ impl PartitionRunner {
// Inline check: same logic as a timer tick.
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
&mut cpu_sample, &mut eff_at_last_spawn,
n_cores, SPAWN_THRESHOLD, completed, n_total,
);
}
WorkerEvent::TimerTick => {
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
&mut cpu_sample, &mut eff_at_last_spawn,
n_cores, SPAWN_THRESHOLD, completed, n_total,
);
}
}
@@ -340,15 +346,38 @@ fn maybe_activate(
n_active: &mut usize,
max_workers: usize,
cpu_sample: &mut CpuSample,
eff_at_last_spawn: &mut f64,
n_cores: usize,
threshold: f64,
completed: usize,
n_total: usize,
) {
if *n_active >= max_workers || completed >= n_total { return; }
if cpu_sample.do_i_activate(threshold) {
let eff = cpu_sample.cpu_efficiency(n_cores);
if eff >= threshold { return; } // CPU already saturated
// Check that the previous activation was beneficial enough.
// Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1).
// For the very first extra worker (n_active == 1, no previous spawn), skip this
// check: eff_at_last_spawn == 0 acts as the sentinel.
let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 {
true // first additional worker: no prior data to evaluate
} else {
let k_before = (*n_active - 1) as f64;
let min_speedup = (k_before + 0.2) / k_before;
let actual_speedup = eff / *eff_at_last_spawn;
actual_speedup >= min_speedup
};
if last_spawn_was_beneficial {
activate_tx.send(()).ok();
*eff_at_last_spawn = eff;
*n_active += 1;
debug!("activated worker {}/{}", n_active, max_workers);
*cpu_sample = CpuSample::now();
debug!(
"activated worker {}/{} — efficiency {:.0}%",
n_active, max_workers, eff * 100.0,
);
}
}
+2 -4
View File
@@ -1,6 +1,6 @@
[package]
name = "obikmer"
version = "1.1.29"
version = "1.1.16"
edition = "2024"
[[bin]]
@@ -18,7 +18,7 @@ obikrope = { path = "../obikrope" }
obikpartitionner = { path = "../obikpartitionner" }
obisys = { path = "../obisys" }
obiskio = { path = "../obiskio" }
obikindex = { path = "../obikindex", default-features = false }
obikindex = { path = "../obikindex" }
obitaxonomy = { path = "../obitaxonomy" }
obilayeredmap = { path = "../obilayeredmap" }
clap = { version = "4", features = ["derive"] }
@@ -33,6 +33,4 @@ tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
pprof = { version = "0.13", features = ["prost-codec"], optional = true }
[features]
default = ["numa"]
numa = ["obikindex/numa"]
profiling = ["dep:pprof"]
+47 -157
View File
@@ -4,7 +4,7 @@ use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};
use indicatif::{ProgressBar, ProgressStyle};
use tracing::{debug, info, warn};
use tracing::{info, warn};
const BRAILLE: &[&str] = &["", "", "", "", "", "", "", "", "", ""];
@@ -31,8 +31,7 @@ impl TracedBar {
let pct10 = (pos * 10) / self.total; // 0..=10
let last = self.last_pct.load(Ordering::Relaxed);
if pct10 > last
&& self
.last_pct
&& self.last_pct
.compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
@@ -50,14 +49,14 @@ impl TracedBar {
let msg = msg.into();
if self.pb.is_hidden() {
if self.total > 0 {
debug!(stage = %self.label, "{msg}");
// bounded bar: always log (already rate-limited by 10% threshold in inc)
info!(stage = %self.label, "{msg}");
} else {
// spinner: throttle to ~10 s
let now_ms = self.start.elapsed().as_millis() as u64;
let last = self.last_log_ms.load(Ordering::Relaxed);
if now_ms >= last + 10_000
&& self
.last_log_ms
&& self.last_log_ms
.compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
@@ -84,13 +83,8 @@ pub fn spinner(label: &str) -> TracedBar {
);
pb.enable_steady_tick(Duration::from_millis(100));
TracedBar {
pb,
label: label.to_string(),
unit: String::new(),
total: 0,
start: Instant::now(),
last_pct: AtomicU64::new(0),
last_log_ms: AtomicU64::new(0),
pb, label: label.to_string(), unit: String::new(), total: 0,
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
}
}
@@ -107,13 +101,8 @@ pub fn progress_bar(label: &str, n: u64, unit: &str) -> TracedBar {
);
pb.enable_steady_tick(Duration::from_millis(100));
TracedBar {
pb,
label: label.to_string(),
unit: unit.to_string(),
total: n,
start: Instant::now(),
last_pct: AtomicU64::new(0),
last_log_ms: AtomicU64::new(0),
pb, label: label.to_string(), unit: unit.to_string(), total: n,
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
}
}
@@ -215,19 +204,13 @@ fn tv_to_secs(tv: timeval) -> f64 {
}
#[cfg(target_os = "macos")]
fn rss_to_bytes(ru: &rusage) -> u64 {
ru.ru_maxrss as u64
}
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 }
#[cfg(not(target_os = "macos"))]
fn rss_to_bytes(ru: &rusage) -> u64 {
ru.ru_maxrss as u64 * 1024
}
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
// Monotonically increasing counters — negative delta would be a kernel bug.
fn delta(end: i64, start: i64) -> u64 {
(end - start).max(0) as u64
}
fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
// ── CpuSample ─────────────────────────────────────────────────────────────────
@@ -238,7 +221,6 @@ pub struct CpuSample {
wall: Instant,
user_secs: f64,
sys_secs: f64,
previous: f64,
}
impl CpuSample {
@@ -248,7 +230,6 @@ impl CpuSample {
wall: Instant::now(),
user_secs: tv_to_secs(ru.ru_utime),
sys_secs: tv_to_secs(ru.ru_stime),
previous: 0.0,
}
}
@@ -257,38 +238,11 @@ impl CpuSample {
pub fn cpu_efficiency(&self, n_cores: usize) -> f64 {
let ru = get_rusage();
let wall = self.wall.elapsed().as_secs_f64();
if wall < 0.1 {
return 0.0;
}
let cpu =
(tv_to_secs(ru.ru_utime) - self.user_secs) + (tv_to_secs(ru.ru_stime) - self.sys_secs);
if wall < 0.1 { return 0.0; }
let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs)
+ (tv_to_secs(ru.ru_stime) - self.sys_secs);
cpu / (wall * n_cores as f64)
}
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let n = CpuSample::now();
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
let delta_wall = self.wall.elapsed().as_secs_f64();
let efficiency = delta_ru / delta_wall;
let activate = 0f64.max(efficiency - self.previous) >= threshold;
if activate {
debug!(
"Do I activate : {} -> {} = {} Activate: {}",
self.previous,
efficiency,
0f64.max(efficiency - self.previous),
activate
);
self.previous = efficiency;
self.user_secs = n.user_secs;
self.sys_secs = n.sys_secs;
self.wall = n.wall;
}
activate
}
}
// ── public API ────────────────────────────────────────────────────────────────
@@ -305,11 +259,7 @@ impl Stage {
pub fn start(label: impl Into<String>) -> Self {
let label = label.into();
info!(stage = %label, "started");
Self {
label,
wall: Instant::now(),
ru: get_rusage(),
}
Self { label, wall: Instant::now(), ru: get_rusage() }
}
pub fn stop(self) -> StageStats {
@@ -368,11 +318,8 @@ pub struct StageStats {
impl StageStats {
/// (user + sys) / wall — effective thread count utilisation.
pub fn parallelism(&self) -> f64 {
if self.wall_secs > 1e-9 {
(self.user_secs + self.sys_secs) / self.wall_secs
} else {
0.0
}
if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs }
else { 0.0 }
}
/// parallelism / n_cores — fraction of available CPU power used (0..1+).
@@ -388,19 +335,11 @@ pub struct Reporter {
}
impl Reporter {
pub fn new() -> Self {
Self::default()
}
pub fn push(&mut self, stats: StageStats) {
self.stages.push(stats);
}
pub fn stages(&self) -> &[StageStats] {
&self.stages
}
pub fn new() -> Self { Self::default() }
pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); }
pub fn stages(&self) -> &[StageStats] { &self.stages }
/// Print the summary to stderr.
pub fn print(&self) {
eprint!("{self}");
}
pub fn print(&self) { eprint!("{self}"); }
}
// ── diagnosis ─────────────────────────────────────────────────────────────────
@@ -448,43 +387,26 @@ fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
)),
};
}
Diagnosis {
tag: "",
detail: None,
}
Diagnosis { tag: "", detail: None }
}
// ── display helpers ───────────────────────────────────────────────────────────
fn fmt_secs(s: f64) -> String {
if s >= 100.0 {
format!("{:.0}s", s)
} else if s >= 10.0 {
format!("{:.1}s", s)
} else if s >= 1.0 {
format!("{:.2}s", s)
} else {
format!("{:.0}ms", s * 1000.0)
}
if s >= 100.0 { format!("{:.0}s", s) }
else if s >= 10.0 { format!("{:.1}s", s) }
else if s >= 1.0 { format!("{:.2}s", s) }
else { format!("{:.0}ms", s * 1000.0) }
}
fn fmt_bytes(b: u64) -> String {
if b >= 1 << 30 {
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
} else if b >= 1 << 20 {
format!("{:.0} MB", b as f64 / (1u64 << 20) as f64)
} else {
format!("{:.0} KB", b as f64 / 1024.0)
}
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) }
else { format!("{:.0} KB", b as f64 / 1024.0) }
}
fn fmt_efficiency(par: f64, n_cores: usize) -> String {
format!(
"{:.1}×/{} ({:.0}%)",
par,
n_cores,
par / n_cores as f64 * 100.0
)
format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0)
}
// ── Display ───────────────────────────────────────────────────────────────────
@@ -512,11 +434,7 @@ impl MemoryBudget {
pub fn new(total: u64) -> Self {
Self {
total,
inner: Mutex::new(BudgetInner {
remaining: total,
active: 0,
peak_active: 0,
}),
inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }),
condvar: Condvar::new(),
}
}
@@ -541,40 +459,24 @@ impl MemoryBudget {
self.condvar.notify_all();
}
pub fn total(&self) -> u64 {
self.total
}
pub fn active(&self) -> usize {
self.inner.lock().unwrap().active
}
pub fn remaining(&self) -> u64 {
self.inner.lock().unwrap().remaining
}
pub fn peak_active(&self) -> usize {
self.inner.lock().unwrap().peak_active
}
pub fn total(&self) -> u64 { self.total }
pub fn active(&self) -> usize { self.inner.lock().unwrap().active }
pub fn remaining(&self) -> u64 { self.inner.lock().unwrap().remaining }
pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active }
}
// ── Display ───────────────────────────────────────────────────────────────────
impl fmt::Display for Reporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.stages.is_empty() {
return Ok(());
}
if self.stages.is_empty() { return Ok(()); }
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
// column widths
let nw = self
.stages
.iter()
.map(|s| s.label.len())
.max()
.unwrap_or(5)
.max(5);
let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5);
// efficiency col: worst-case width for this run's n_cores value
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
@@ -582,21 +484,18 @@ impl fmt::Display for Reporter {
let sep = "".repeat(sep_w);
// header
writeln!(
f,
"{:<nw$} {:>7} {:>ew$} {:>8} status",
"stage", "wall", "efficiency", "peak RSS"
)?;
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} status",
"stage", "wall", "efficiency", "peak RSS")?;
writeln!(f, "{sep}")?;
// compute all diagnoses up front (needed for both table and footnotes)
let diagnoses: Vec<Diagnosis> = self.stages.iter().map(|s| diagnose(s, n_cores)).collect();
let diagnoses: Vec<Diagnosis> = self.stages.iter()
.map(|s| diagnose(s, n_cores))
.collect();
// per-stage rows
for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
writeln!(
f,
"{:<nw$} {:>7} {:>ew$} {:>8} {}",
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} {}",
s.label,
fmt_secs(s.wall_secs),
fmt_efficiency(s.parallelism(), n_cores),
@@ -609,18 +508,11 @@ impl fmt::Display for Reporter {
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
let trss = self
.stages
.iter()
.map(|s| s.max_rss_bytes)
.max()
.unwrap_or(0);
let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0);
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
writeln!(f, "{sep}")?;
writeln!(
f,
"{:<nw$} {:>7} {:>ew$} {:>8}",
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8}",
"TOTAL",
fmt_secs(tw),
fmt_efficiency(tpar, n_cores),
@@ -628,9 +520,7 @@ impl fmt::Display for Reporter {
)?;
// bottleneck footnotes (only if at least one anomaly detected)
let bottlenecks: Vec<(&str, &str)> = self
.stages
.iter()
let bottlenecks: Vec<(&str, &str)> = self.stages.iter()
.zip(diagnoses.iter())
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
.collect();