1 Commits

Author SHA1 Message Date
Eric Coissac 21a20ce7ca feat: automate release workflow and add optional NUMA support
Release / create-release (push) Failing after 38s
Release / build-macos-arm64 (push) Has been skipped
Release / build-linux-x86_64 (push) Has been skipped
Refactor the Gitea release workflow to introduce a dedicated `create-release` job that exposes a shared release ID, eliminating redundant inline creation. Update the Makefile to automatically generate annotated tags with AI-derived markdown release notes from recent commits. Add an optional `numa` feature to `obikindex` that gates `hwlocality` behind conditional compilation, providing a graceful UMA fallback when disabled. Bump `obikmer` to v1.1.17.
2026-06-23 08:59:52 +02:00
15 changed files with 405 additions and 1153 deletions
Vendored
BIN
View File
Binary file not shown.
+2 -1
View File
@@ -1,8 +1,9 @@
name: CI name: CI
on: on:
pull_request: push:
branches: ['main'] branches: ['main']
pull_request:
jobs: jobs:
build: build:
+14 -10
View File
@@ -13,7 +13,7 @@ jobs:
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
with: with:
fetch-depth: 0 fetch-tags: true
- name: Create Gitea release - name: Create Gitea release
id: create id: create
@@ -86,11 +86,20 @@ jobs:
build-macos-arm64: build-macos-arm64:
needs: create-release needs: create-release
runs-on: ubuntu-latest runs-on: ubuntu-latest
defaults:
run:
working-directory: src
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Login to registry - name: Install Rust + zigbuild
run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ secrets.REGISTRYUSER }} --password-stdin run: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain stable
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
sudo apt-get update -qq && sudo apt-get install -y -qq jq
pip install ziglang --quiet --break-system-packages
$HOME/.cargo/bin/cargo install cargo-zigbuild
$HOME/.cargo/bin/rustup target add aarch64-apple-darwin
- name: Cache cargo registry - name: Cache cargo registry
uses: actions/cache@v4 uses: actions/cache@v4
@@ -103,12 +112,7 @@ jobs:
restore-keys: macos-arm64-cargo- restore-keys: macos-arm64-cargo-
- name: Build macOS binary - name: Build macOS binary
run: | run: cargo zigbuild --release --target aarch64-apple-darwin --no-default-features
docker run --rm \
-v "${{ github.workspace }}:/src" \
-w /src/src \
registry.metabarcoding.org/cibuilder/rustcrossosx:latest \
cargo build --release --target aarch64-apple-darwin --no-default-features
- name: Prepare and upload artifact - name: Prepare and upload artifact
env: env:
@@ -116,7 +120,7 @@ jobs:
RELEASE_ID: ${{ needs.create-release.outputs.release_id }} RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
run: | run: |
mkdir -p /tmp/dist mkdir -p /tmp/dist
cp src/target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64 cp target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
curl -s -X POST \ curl -s -X POST \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \ "${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
-H "Authorization: token $GITEA_TOKEN" \ -H "Authorization: token $GITEA_TOKEN" \
-2
View File
@@ -8,14 +8,12 @@ data-stress
*.pb *.pb
./**/*.json ./**/*.json
*.bin *.bin
*.log
Betula_exilis--IGA-24-33 Betula_exilis--IGA-24-33
benchmark/genomes benchmark/genomes
benchmark/simulated_data benchmark/simulated_data
benchmark/specimen_index_presence benchmark/specimen_index_presence
benchmark/specimen_index_count benchmark/specimen_index_count
benchmark/global_index_presence benchmark/global_index_presence
benchmark/all_specific
benchmark/global_index_count benchmark/global_index_count
benchmark/stats benchmark/stats
benchmark/reference_index benchmark/reference_index
+2 -3
View File
@@ -92,7 +92,6 @@ release: bump-version
git_hash=$$(jj log -r @ --no-graph -T 'commit_id'); \ git_hash=$$(jj log -r @ --no-graph -T 'commit_id'); \
commits=$$(jj log -r 'latest(tags())..@' --no-graph -T 'description ++ "\n"' 2>/dev/null || \ commits=$$(jj log -r 'latest(tags())..@' --no-graph -T 'description ++ "\n"' 2>/dev/null || \
jj log --no-graph -T 'description ++ "\n"' --limit 30); \ jj log --no-graph -T 'description ++ "\n"' --limit 30); \
notes=$$(printf 'Write concise markdown release notes for obikmer (a Rust kmer genomics tool). Be technical and direct. Base them strictly on these commit messages:\n\n%s' "$$commits" | aichat 2>/dev/null); \ notes=$$(printf 'Write concise markdown release notes for obikmer (a Rust kmer genomics tool). Be technical and direct. Base them strictly on these commit messages:\n\n%s' "$$commits" | aichat); \
tag_msg="$${notes:-Release v$$new_version}"; \ git tag -a "v$$new_version" -m "$$notes" "$$git_hash" && \
git tag -a "v$$new_version" -m "$$tag_msg" "$$git_hash" && \
git push origin "v$$new_version" git push origin "v$$new_version"
+3 -147
View File
@@ -162,158 +162,14 @@ A single `PartitionRunner` instance can be built once per command invocation
and reused across multiple `run()` calls (e.g. `merge` runs and reused across multiple `run()` calls (e.g. `merge` runs
`merge_partitions` then `pack_matrices`). `merge_partitions` then `pack_matrices`).
## Known issue: CPU-only activation signal stalls on I/O-bound stages
Observed on a real `filter` run (109 genomes, 256 partitions, 8×24-core NUMA):
`rebuild` (CPU-bound — k-mer construction) scales cleanly from 9 to 43 active
workers as `CpuSample::do_i_activate` (`obisys::lib.rs`) sees efficiency climb.
`pack_matrices` (I/O-bound — reopens and recomposes per-genome column files
into `.pbmx`/`.pcmx`) activates one extra worker then flatlines at 10/192 for
the rest of the stage, even though 256 partitions keep completing over several
minutes. This matches the documented intent (§ Adaptive mechanism — "avoids
over-provisioning ... I/O-bound ... workloads") but conflates two different
things: *"CPU is not the bottleneck"* and *"more workers would not help"*. On
storage with real queue depth (NVMe, RAID, parallel FS) the second stage could
still benefit from more concurrent workers even with flat CPU usage — a signal
the current mechanism cannot see.
A one-off artefact was also found in the same log: right after a stage
transition, `do_i_activate` produced a physically impossible spike (efficiency
~94 cores on a 192-core box) because it has no minimum-window guard — unlike
its sibling `cpu_efficiency`, which returns `0.0` if `wall < 0.1s`
(`obisys::lib.rs:260`). `do_i_activate` unconditionally overwrites
`self.wall`/`self.user_secs`/`self.sys_secs` even when the elapsed window is
too short to be meaningful, so a burst of rapid completions right after
activating a worker can divide a real CPU delta by a near-zero wall delta.
### Implemented: I/O signal + shared debounce guard
`IoSample` (`obisys::lib.rs`, alongside `CpuSample`) is fed by
`read_bytes`/`write_bytes` from `/proc/self/io` on Linux (actual bytes
submitted to the block layer — not `rchar`/`wchar`, which also count
page-cache hits, and not `ru_inblock`/`ru_oublock`, unreliable on macOS), with
a `proc_pid_rusage(RUSAGE_INFO_V4)` fallback on macOS
(`ri_diskio_bytesread`/`ri_diskio_byteswritten`, FFI only via `libc`, no new
dependency — same pattern as the existing `getrusage` bindings). Any other
target degrades gracefully to a signal that never triggers (falls back to
CPU-only activation), same pattern as `cgroup_v2_available`.
`maybe_activate` (`numa.rs`) activates a worker if *either* signal still shows
headroom, making `PartitionRunner` adapt to whichever resource is actually the
bottleneck without per-call configuration. Both samplers are called
unconditionally — no `||` short-circuit — so neither window starves behind
whichever signal fires first:
```rust
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
if cpu_wants_more || io_wants_more {
activation.grow(GROWTH_DIVISOR, n_total);
}
```
The CPU threshold is *not* the flat absolute delta it started as: it scales
with `activation.last_step()` — the number of workers activated in the last
growth step, tracked by `NodeActivation` (`numa.rs`) and updated every time
`grow()` actually grows something. Growing by 8 workers should add ~8 cores of
efficiency if the workload is truly CPU-bound; requiring only
`CPU_SPAWN_THRESHOLD` (20 %) of that expected gain confirms the growth was
useful without demanding perfect linear scaling. Scaling by the *last step's
size* rather than the cumulative total keeps the bar equally meaningful
whether it's the 2nd growth step or the 20th — a flat absolute threshold
(0.2 core) is a strong signal at 8 active workers but pure noise at 150; a
threshold scaled by the *cumulative* total instead (considered and rejected)
would have made the bar essentially impossible to clear late in the ramp,
strangling exactly the CPU-bound saturation the mechanism exists to allow.
Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit),
raw I/O throughput has no natural scale across devices, so `IoSample` uses a
**relative** growth threshold instead of an absolute one:
```rust
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let elapsed = self.wall.elapsed().as_secs_f64();
if elapsed < 0.1 { return false; } // state untouched — window keeps accumulating
let n = Self::read_bytes();
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
let activate = if self.previous_rate == 0.0 {
rate > 0.0 // bootstrap: any measured throughput is signal
} else {
(rate - self.previous_rate) / self.previous_rate >= threshold
};
self.bytes = n;
self.wall = Instant::now(); // reset only on a real sample
activate
}
```
The `elapsed < 0.1s → return false without mutating state` guard was also
back-ported into `CpuSample::do_i_activate` (previously missing — source of
the ~94-core artefact above) — one fix for both problems, and it removes the
need for any arbitrary I/O-rate floor: a short/noisy window is rejected
outright rather than papered over with a hardware-dependent constant.
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, module-level
`const` in `numa.rs`, both `0.2`) are a starting point, not a derived value:
`0.2` (20 % relative growth) for `IoSample` was chosen to match the CPU
threshold's *implicit* relative sensitivity (in the observed log, an 8→9
worker step raised efficiency by ~12 %) — but I/O throughput is lumpier than
CPU time (buffered writes flush in bursts), so it needs empirical validation
against a real `pack` run before being considered final.
## Known issue: ramp-up too slow, and confused with node count
The original design started `n_nodes` workers (one per node) and grew one
worker at a time. On a real `filter` run this took ~10 minutes to climb from
9 to ~40 active workers even on the CPU-bound `rebuild` stage — most of a
35-minute stage spent under-provisioned while waiting for evidence to
accumulate one worker at a time. There is no scale-down mechanism (`n_active`
only grows), so the original caution was deliberate — but a quarter of
available cores is still far from saturation, and the real risk zone (over-provisioning
a memory-bandwidth-bound stage) only shows up much later in the ramp, near
full occupancy — not at 25 %.
The fix decouples ramp speed from node *count*: both the initial size and the
growth step are a fraction of `workers_per_node` (node *size*), applied
identically on every node. A single-NUMA-node (UMA) machine ramps exactly as
fast as an 8-node one — growing by `n_nodes` per step, as first considered,
would have degenerated to "grow by 1" on UMA, reproducing the original
problem for exactly the machines that need the fix most.
```rust
// NodeActivation::grow — called both at startup (activate_initial) and on
// every CPU/IO-triggered growth step, with a different divisor each time.
let wanted = (self.caps[idx] / divisor).max(1); // INITIAL_DIVISOR=4 at startup, GROWTH_DIVISOR=8 per step
let room = self.caps[idx].saturating_sub(self.active[idx]);
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
```
This also fixed a latent correctness gap: the original single shared
`activate_tx`/`activate_rx` pair had *no* per-node addressing — sending one
activation signal woke up whichever dormant worker (from any node) happened
to win the race on that channel. `crossbeam_channel` gives no fairness
guarantee across competing receivers, so "round-robin across nodes" was an
assumption the code never actually enforced. `PartitionRunner::run` now opens
one activation channel per node (`activate_txs`/`activate_rxs`, one pair per
`NodeConfig`); `NodeActivation` (`numa.rs`) tracks how many of each node's
dormant workers have been woken and grows every node by the same amount per
step, capped by that node's remaining dormant workers and by the run's total
budget (`n_total`) — balance across nodes is now guaranteed by construction,
not incidental to channel implementation details.
## Open questions ## Open questions
- **Error handling**: `run` currently returns the first error; remaining errors - **Error handling**: `run` currently returns the first error; remaining errors
are dropped. A `Vec<E>` return would give complete diagnostics. are dropped. A `Vec<E>` return would give complete diagnostics.
- **`INITIAL_DIVISOR` / `GROWTH_DIVISOR` tuning**: currently `4` and `8` - **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
(start at 1/4 of a node's cores, grow by 1/8 per step), chosen to fix an for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from
observed too-slow ramp — not yet validated against a real `pack` (I/O-bound) a higher value. A per-call override could be added to the API.
run, where over-provisioning risk is different from the CPU-bound `rebuild`
case this was tuned against.
- **`on_done` ordering**: the runner serialises calls to `on_done` via an - **`on_done` ordering**: the runner serialises calls to `on_done` via an
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
+1 -1
View File
@@ -1704,7 +1704,7 @@ dependencies = [
[[package]] [[package]]
name = "obikmer" name = "obikmer"
version = "1.1.35" version = "1.1.17"
dependencies = [ dependencies = [
"clap", "clap",
"csv", "csv",
BIN
View File
Binary file not shown.
+10 -48
View File
@@ -1,5 +1,5 @@
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{self, BufWriter, Read as _, Write as _}; use std::io::{self, BufWriter, Write as _};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use memmap2::Mmap; use memmap2::Mmap;
@@ -171,43 +171,19 @@ impl PackedBitMatrix {
} }
} }
/// Reads just the `n_cols` field from an existing packed matrix's header,
/// without mapping the file. Used by `pack_bit_matrix` to tell a genuinely
/// complete pack from a stale one that predates a later column-widening.
fn packed_bit_matrix_n_cols(path: &Path) -> io::Result<usize> {
let mut f = File::open(path)?;
let mut header = [0u8; PBMX_HEADER];
f.read_exact(&mut header)?;
Ok(u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize)
}
/// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files. /// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files.
pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> { pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> {
let packed_path = dir.join("matrix.pbmx"); let packed_path = dir.join("matrix.pbmx");
if packed_path.exists() {
let meta = match MatrixMeta::load(dir) { // Matrix complete; remove any leftover column files from a killed cleanup.
Ok(meta) => meta, if let Ok(meta) = MatrixMeta::load(dir) {
Err(e) => { for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
// No columnar data pending: either this layer was already let _ = fs::remove_file(dir.join("meta.json"));
// packed and cleaned up (matrix.pbmx complete, nothing left to
// do), or genuinely nothing was ever written here.
return if packed_path.exists() { Ok(()) } else { Err(e) };
} }
};
// A `matrix.pbmx` can already exist here even though columnar data is
// still pending — e.g. copied verbatim from a merge's base source
// before this layer was widened with more genome columns (see
// `obikpartitionner::merge_partition`). Only skip (re-)packing if the
// existing file already reflects the current column count; otherwise
// the columnar files are newer and must be (re-)packed, overwriting the
// stale one — never silently discarded as "leftover cleanup".
if packed_bit_matrix_n_cols(&packed_path).ok() == Some(meta.n_cols) {
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
let _ = fs::remove_file(dir.join("meta.json"));
return Ok(()); return Ok(());
} }
let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols; let n_cols = meta.n_cols;
// Compute offsets from file sizes — no column data loaded into RAM. // Compute offsets from file sizes — no column data loaded into RAM.
@@ -524,26 +500,17 @@ where T: Clone + Default {
} }
/// Compute a symmetric `n×n` matrix in parallel by evaluating `f(i,j)` for /// Compute a symmetric `n×n` matrix in parallel by evaluating `f(i,j)` for
/// all upper-triangle pairs, plus `f(i,i)` for the diagonal. `T: Copy` avoids /// all upper-triangle pairs. `T: Copy` avoids the `.clone()` needed for the
/// the `.clone()` needed for the lower-triangle mirror. /// lower-triangle mirror.
///
/// The diagonal is *not* generally `T::default()`: for a self-comparison,
/// `f(i,i)` is often the column's own weight (e.g. intersection-with-self —
/// see `pairwise2_matrix`), not zero. Distance finalisations that need a
/// zero diagonal (self-distance) already overwrite it explicitly.
pub(crate) fn pairwise_matrix<T>(n: usize, f: impl Fn(usize, usize) -> T + Sync) -> Array2<T> pub(crate) fn pairwise_matrix<T>(n: usize, f: impl Fn(usize, usize) -> T + Sync) -> Array2<T>
where T: Copy + Default + Send { where T: Copy + Default + Send {
let results: Vec<(usize, usize, T)> = upper_pairs(n) let results: Vec<(usize, usize, T)> = upper_pairs(n)
.into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect(); .into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect();
let mut m = fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))); fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v)))
for i in 0..n { m[[i, i]] = f(i, i); }
m
} }
/// Same as `pairwise_matrix` but `f` returns two values that fill two /// Same as `pairwise_matrix` but `f` returns two values that fill two
/// symmetric matrices simultaneously (e.g. intersection + union for Jaccard). /// symmetric matrices simultaneously (e.g. intersection + union for Jaccard).
/// The diagonal is `f(i,i)` (e.g. a genome's kmer count intersected with
/// itself), not `T::default()` — see `pairwise_matrix` for why that matters.
pub(crate) fn pairwise2_matrix<T>(n: usize, f: impl Fn(usize, usize) -> (T, T) + Sync) -> (Array2<T>, Array2<T>) pub(crate) fn pairwise2_matrix<T>(n: usize, f: impl Fn(usize, usize) -> (T, T) + Sync) -> (Array2<T>, Array2<T>)
where T: Copy + Default + Send { where T: Copy + Default + Send {
let results: Vec<(usize, usize, T, T)> = upper_pairs(n) let results: Vec<(usize, usize, T, T)> = upper_pairs(n)
@@ -556,10 +523,5 @@ where T: Copy + Default + Send {
m0[[i, j]] = a; m0[[j, i]] = a; m0[[i, j]] = a; m0[[j, i]] = a;
m1[[i, j]] = b; m1[[j, i]] = b; m1[[i, j]] = b; m1[[j, i]] = b;
} }
for i in 0..n {
let (a, b) = f(i, i);
m0[[i, i]] = a;
m1[[i, i]] = b;
}
(m0, m1) (m0, m1)
} }
+6 -33
View File
@@ -1,5 +1,5 @@
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{self, BufWriter, Read as _, Write as _}; use std::io::{self, BufWriter, Write as _};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use memmap2::Mmap; use memmap2::Mmap;
@@ -228,44 +228,17 @@ impl PackedCompactIntMatrix {
} }
} }
/// Reads just the `n_cols` field from an existing packed matrix's header,
/// without mapping the file. Used by `pack_compact_int_matrix` to tell a
/// genuinely complete pack from a stale one that predates a later
/// column-widening.
fn packed_int_matrix_n_cols(path: &Path) -> io::Result<usize> {
let mut f = File::open(path)?;
let mut header = [0u8; PCMX_HEADER];
f.read_exact(&mut header)?;
Ok(u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize)
}
/// Build `counts/matrix.pcmx` from existing `col_*.pciv` files. /// Build `counts/matrix.pcmx` from existing `col_*.pciv` files.
pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> { pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> {
let packed_path = dir.join("matrix.pcmx"); let packed_path = dir.join("matrix.pcmx");
if packed_path.exists() {
let meta = match MatrixMeta::load(dir) { if let Ok(meta) = MatrixMeta::load(dir) {
Ok(meta) => meta, for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
Err(e) => { let _ = fs::remove_file(dir.join("meta.json"));
// No columnar data pending: either this layer was already
// packed and cleaned up (matrix.pcmx complete, nothing left to
// do), or genuinely nothing was ever written here.
return if packed_path.exists() { Ok(()) } else { Err(e) };
} }
};
// A `matrix.pcmx` can already exist here even though columnar data is
// still pending — e.g. copied verbatim from a merge's base source
// before this layer was widened with more genome columns (see
// `obikpartitionner::merge_partition`). Only skip (re-)packing if the
// existing file already reflects the current column count; otherwise
// the columnar files are newer and must be (re-)packed, overwriting the
// stale one — never silently discarded as "leftover cleanup".
if packed_int_matrix_n_cols(&packed_path).ok() == Some(meta.n_cols) {
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
let _ = fs::remove_file(dir.join("meta.json"));
return Ok(()); return Ok(());
} }
let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols; let n_cols = meta.n_cols;
let col_sizes: Vec<u64> = (0..n_cols) let col_sizes: Vec<u64> = (0..n_cols)
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len())) .map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
+106 -220
View File
@@ -20,7 +20,7 @@ use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet; use hwlocality::cpu::cpuset::CpuSet;
#[cfg(feature = "numa")] #[cfg(feature = "numa")]
use hwlocality::object::types::ObjectType; use hwlocality::object::types::ObjectType;
use obisys::{CpuSample, IoSample}; use obisys::CpuSample;
use tracing::debug; use tracing::debug;
// ── Public interface ────────────────────────────────────────────────────────── // ── Public interface ──────────────────────────────────────────────────────────
@@ -70,10 +70,7 @@ pub fn build() -> NumaSetup {
nodes.len(), nodes.len(),
nodes.first().map_or(0, |v| v.len()), nodes.first().map_or(0, |v| v.len()),
); );
return NumaSetup { return NumaSetup { pools, cpus_per_node: nodes };
pools,
cpus_per_node: nodes,
};
} }
} }
} }
@@ -84,7 +81,7 @@ pub fn build() -> NumaSetup {
.unwrap_or(1); .unwrap_or(1);
debug!("UMA: single synthetic node, {} core(s)", n_cores); debug!("UMA: single synthetic node, {} core(s)", n_cores);
NumaSetup { NumaSetup {
pools: vec![None], pools: vec![None],
cpus_per_node: vec![(0..n_cores).collect()], cpus_per_node: vec![(0..n_cores).collect()],
} }
} }
@@ -96,7 +93,7 @@ pub fn build() -> NumaSetup {
.unwrap_or(1); .unwrap_or(1);
debug!("UMA: single synthetic node, {} core(s)", n_cores); debug!("UMA: single synthetic node, {} core(s)", n_cores);
NumaSetup { NumaSetup {
pools: vec![None], pools: vec![None],
cpus_per_node: vec![(0..n_cores).collect()], cpus_per_node: vec![(0..n_cores).collect()],
} }
} }
@@ -105,9 +102,7 @@ pub fn build() -> NumaSetup {
/// Silently returns on any error so the thread still runs, just unbound. /// Silently returns on any error so the thread still runs, just unbound.
#[cfg(feature = "numa")] #[cfg(feature = "numa")]
pub fn pin_current_thread(cpu_indices: &[usize]) { pub fn pin_current_thread(cpu_indices: &[usize]) {
let Ok(topology) = Topology::new() else { let Ok(topology) = Topology::new() else { return };
return;
};
let mut cpuset = CpuSet::new(); let mut cpuset = CpuSet::new();
for &idx in cpu_indices { for &idx in cpu_indices {
cpuset.set(idx); cpuset.set(idx);
@@ -137,48 +132,29 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
.ok() .ok()
} }
// ── PartitionRunner ───────────────────────────────────────────────────────── // ── PartitionRunner ───────────────────────────────────────────────────────────
/// Growth step (fraction of a node's worker capacity added per activation
/// event, see [`NodeActivation::grow`]).
const GROWTH_DIVISOR: usize = 8;
/// Minimum CPU efficiency growth to activate more workers, as a fraction of
/// the size of the *last growth step* (e.g. `0.2` after adding 8 workers
/// requires the next check to show at least +1.6 cores of growth — 20 % of
/// the ~8 cores those 8 workers should contribute if the workload is truly
/// CPU-bound). Scaling by the last step's size — not the cumulative total —
/// keeps the bar meaningful regardless of how many workers are already
/// active, instead of demanding an ever-larger absolute jump as the pool
/// grows.
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
/// Minimum I/O throughput growth (relative) to activate more workers.
const IO_SPAWN_THRESHOLD: f64 = 0.2;
struct NodeConfig { struct NodeConfig {
pool: Option<Arc<rayon::ThreadPool>>, pool: Option<Arc<rayon::ThreadPool>>,
cpu_ids: Vec<usize>, cpu_ids: Vec<usize>,
max_workers: usize, max_workers: usize,
} }
/// Generic NUMA-aware runner for partition-level parallel work. /// Generic NUMA-aware runner for partition-level parallel work.
/// ///
/// Workers are distributed evenly across NUMA nodes and pinned to their /// Workers are distributed round-robin across NUMA nodes and pinned to their
/// node's CPUs. UMA is the degenerate case: one node, no pinning. /// node's CPUs. UMA is the degenerate case: one node, no pinning.
/// ///
/// Workers are pre-spawned dormant, one activation channel per node so /// Workers are pre-spawned dormant and activated one by one as CPU efficiency
/// growth always targets a specific node rather than whichever dormant /// falls below `SPAWN_THRESHOLD`. This avoids over-provisioning on I/O-bound
/// worker happens to wake up first on a shared channel. Growth (both the /// or memory-bandwidth-bound workloads while saturating CPU-bound ones.
/// initial count and each subsequent step) is expressed as a fraction of
/// `workers_per_node`, applied identically to every node, so the pace of
/// ramp-up depends on node size rather than node count — a single-NUMA-node
/// (UMA) machine ramps just as fast as an 8-node one.
/// ///
/// # Termination /// # Termination
/// ///
/// ```text /// ```text
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx /// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
/// drop(result_tx) → result_rx closes → controller loop exits /// drop(result_tx) → result_rx closes → controller loop exits
/// drop(activate_txs) → dormant workers exit cleanly /// drop(activate_tx) → dormant workers exit cleanly
/// ``` /// ```
pub struct PartitionRunner { pub struct PartitionRunner {
nodes: Vec<NodeConfig>, nodes: Vec<NodeConfig>,
@@ -199,8 +175,7 @@ impl PartitionRunner {
ns.pools.len(), ns.pools.len(),
wpn, wpn,
); );
let nodes = ns let nodes = ns.pools
.pools
.into_iter() .into_iter()
.zip(ns.cpus_per_node) .zip(ns.cpus_per_node)
.map(|(pool, cpu_ids)| NodeConfig { .map(|(pool, cpu_ids)| NodeConfig {
@@ -214,24 +189,23 @@ impl PartitionRunner {
/// Run `f(i)` for every index in `order`. /// Run `f(i)` for every index in `order`.
/// ///
/// Workers are pre-spawned dormant and activated adaptively, per node: /// Workers are pre-spawned dormant and activated adaptively. A timer thread
/// `(workers_per_node / INITIAL_DIVISOR).max(1)` are woken immediately on /// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed
/// every node, then `(workers_per_node / GROWTH_DIVISOR).max(1)` more per /// partition resets that timer (forcing an immediate check) and also
/// node each time the check below fires. A timer thread fires that check /// triggers its own inline check. A new worker is activated whenever
/// every `TIMER_SECS` seconds; each completed partition resets that timer /// efficiency falls below `SPAWN_THRESHOLD`.
/// (forcing an immediate check) and also triggers its own inline check. A
/// growth step happens whenever CPU efficiency grows by at least
/// `CPU_SPAWN_THRESHOLD` of what the last growth step should have
/// contributed, or I/O throughput grows by at least `IO_SPAWN_THRESHOLD`
/// (relative) since the last check — whichever resource is the actual
/// bottleneck still shows headroom.
/// ///
/// `on_done(i, result, elapsed)` is called from the controller thread as /// `on_done(i, result, elapsed)` is called from the controller thread as
/// each partition completes — suitable for progress bars and result /// each partition completes — suitable for progress bars and result
/// aggregation. /// aggregation.
/// ///
/// Returns the first error produced by `f`, if any. /// Returns the first error produced by `f`, if any.
pub fn run<F, R, E, C>(&self, order: &[usize], f: F, mut on_done: C) -> Result<(), E> pub fn run<F, R, E, C>(
&self,
order: &[usize],
f: F,
mut on_done: C,
) -> Result<(), E>
where where
F: Fn(usize) -> Result<R, E> + Send + Sync, F: Fn(usize) -> Result<R, E> + Send + Sync,
R: Send, R: Send,
@@ -243,29 +217,27 @@ impl PartitionRunner {
return Ok(()); return Ok(());
} }
const TIMER_SECS: u64 = 30; const SPAWN_THRESHOLD: f64 = 0.95;
const INITIAL_DIVISOR: usize = 4; const TIMER_SECS: u64 = 30;
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
// ── Channels ────────────────────────────────────────────────────────── // ── Channels ──────────────────────────────────────────────────────────
let (part_tx, part_rx) = unbounded::<usize>(); let (part_tx, part_rx) = unbounded::<usize>();
let (activate_tx, activate_rx) = unbounded::<()>();
// reset_tx: controller → timer ("reset the 30 s window") // reset_tx: controller → timer ("reset the 30 s window")
let (reset_tx, reset_rx) = unbounded::<()>(); let (reset_tx, reset_rx) = unbounded::<()>();
// event_tx: workers + timer → controller (unified event stream) // event_tx: workers + timer → controller (unified event stream)
let (event_tx, event_rx) = unbounded::<WorkerEvent<R, E>>(); let (event_tx, event_rx) = unbounded::<WorkerEvent<R, E>>();
// One activation channel per node: growth always targets a specific
// node, rather than whichever dormant worker happens to win the race
// on a channel shared across all nodes.
let (activate_txs, activate_rxs): (Vec<_>, Vec<_>) =
(0..self.nodes.len()).map(|_| unbounded::<()>()).unzip();
for &i in order { for &i in order { part_tx.send(i).ok(); }
part_tx.send(i).ok();
}
drop(part_tx); drop(part_tx);
let max_workers = self.max_workers(); let max_workers = self.max_workers();
let node_caps: Vec<usize> = self.nodes.iter().map(|n| n.max_workers).collect(); let n_nodes = self.nodes.len();
let f = &f; let f = &f;
let mut first_err: Option<E> = None; let mut first_err: Option<E> = None;
@@ -288,92 +260,76 @@ impl PartitionRunner {
} }
}); });
// ── Pre-spawn workers dormant, grouped by node ──────────────────── // ── Pre-spawn workers dormant, round-robin across NUMA nodes ──────
// Each worker listens on its own node's activation channel only. for w in 0..max_workers {
for (node, arx) in self.nodes.iter().zip(activate_rxs.iter()) { let node = &self.nodes[w % n_nodes];
let prx = part_rx.clone();
let etx = event_tx.clone();
let arx = activate_rx.clone();
let pool = node.pool.clone();
let cpu_ids = &node.cpu_ids; let cpu_ids = &node.cpu_ids;
for _ in 0..node.max_workers {
let prx = part_rx.clone();
let etx = event_tx.clone();
let arx = arx.clone();
let pool = node.pool.clone();
s.spawn(move || { s.spawn(move || {
if arx.recv().is_err() { if arx.recv().is_err() { return; }
return; if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
} for i in &prx {
if !cpu_ids.is_empty() { let t = Instant::now();
pin_current_thread(cpu_ids); let r = match &pool {
} Some(p) => p.install(|| f(i)),
for i in &prx { None => f(i),
let t = Instant::now(); };
let r = match &pool { etx.send(WorkerEvent::Completed(i, r, t.elapsed())).ok();
Some(p) => p.install(|| f(i)), }
None => f(i), });
};
etx.send(WorkerEvent::Completed(i, r, t.elapsed())).ok();
}
});
}
} }
// Drop controller's event_tx: event_rx closes when all workers + // Drop controller's event_tx: event_rx closes when all workers +
// timer have exited. // timer have exited.
drop(event_tx); drop(event_tx);
// ── Controller ──────────────────────────────────────────────────── // ── Controller ────────────────────────────────────────────────────
let mut activation = NodeActivation::new(&activate_txs, &node_caps, max_workers); activate_tx.send(()).ok();
activation.activate_initial(INITIAL_DIVISOR, n_total); let mut n_active = 1usize;
let mut cpu_sample = CpuSample::now();
let mut cpu_sample = CpuSample::now(); let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate
let mut io_sample = IoSample::now(); let mut completed = 0usize;
let mut completed = 0usize;
while completed < n_total { while completed < n_total {
let Ok(event) = event_rx.recv() else { break }; let Ok(event) = event_rx.recv() else { break };
match event { match event {
WorkerEvent::Completed(i, r, dur) => { WorkerEvent::Completed(i, r, dur) => {
match r { match r {
Ok(v) => on_done(i, v, dur), Ok(v) => on_done(i, v, dur),
Err(e) => { Err(e) => { if first_err.is_none() { first_err = Some(e); } }
if first_err.is_none() {
first_err = Some(e);
}
}
} }
completed += 1; completed += 1;
// Reset the 30 s timer. // Reset the 30 s timer.
reset_tx.send(()).ok(); reset_tx.send(()).ok();
// Inline check: same logic as a timer tick. // Inline check: same logic as a timer tick.
maybe_activate( maybe_activate(
&mut activation, &activate_tx, &mut n_active, max_workers,
&mut cpu_sample, &mut cpu_sample, &mut eff_at_last_spawn,
&mut io_sample, n_cores, SPAWN_THRESHOLD, completed, n_total,
completed,
n_total,
); );
} }
WorkerEvent::TimerTick => { WorkerEvent::TimerTick => {
maybe_activate( maybe_activate(
&mut activation, &activate_tx, &mut n_active, max_workers,
&mut cpu_sample, &mut cpu_sample, &mut eff_at_last_spawn,
&mut io_sample, n_cores, SPAWN_THRESHOLD, completed, n_total,
completed,
n_total,
); );
} }
} }
} }
// Dormant workers exit once every sender for their node's channel // Dormant workers exit when activate_tx closes.
// is dropped — `activate_txs` holds the only ones. drop(activate_tx);
drop(activate_txs);
// Timer thread exits when reset_tx closes. // Timer thread exits when reset_tx closes.
drop(reset_tx); drop(reset_tx);
}); });
match first_err { match first_err {
Some(e) => Err(e), Some(e) => Err(e),
None => Ok(()), None => Ok(()),
} }
} }
} }
@@ -385,113 +341,43 @@ enum WorkerEvent<R, E> {
TimerTick, TimerTick,
} }
/// Tracks how many of each node's dormant workers have been woken, and
/// grows every node by the same amount at each step (capped by that node's
/// remaining dormant workers and by the run's total budget) so load stays
/// balanced across nodes at every point in time — never just "one more
/// worker somewhere". Also remembers the size of the last real growth step
/// (`last_step`), used to scale the CPU activation threshold to what that
/// step could plausibly have contributed (see `maybe_activate`).
struct NodeActivation<'a> {
txs: &'a [crossbeam_channel::Sender<()>],
caps: &'a [usize],
active: Vec<usize>,
total: usize,
max: usize,
last_step: usize,
}
impl<'a> NodeActivation<'a> {
fn new(txs: &'a [crossbeam_channel::Sender<()>], caps: &'a [usize], max: usize) -> Self {
Self {
txs,
caps,
active: vec![0; txs.len()],
total: 0,
max,
last_step: 0,
}
}
fn total(&self) -> usize {
self.total
}
fn last_step(&self) -> usize {
self.last_step
}
fn max(&self) -> usize {
self.max
}
fn is_full(&self) -> bool {
self.total >= self.max
}
/// Wake up to `(node_cap / divisor).max(1)` dormant workers on every
/// node, capped by `n_total`. Called once at startup, unconditionally.
fn activate_initial(&mut self, divisor: usize, n_total: usize) {
self.grow(divisor, n_total);
}
/// Same per-node sizing as [`activate_initial`](Self::activate_initial),
/// applied as a growth step. Returns the number of workers actually
/// activated (may be less than requested once a node or the total
/// budget is exhausted). Updates `last_step` when it actually grew.
fn grow(&mut self, divisor: usize, n_total: usize) -> usize {
let before = self.total;
for idx in 0..self.txs.len() {
let wanted = (self.caps[idx] / divisor).max(1);
let room = self.caps[idx].saturating_sub(self.active[idx]);
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
for _ in 0..grow {
self.txs[idx].send(()).ok();
}
self.active[idx] += grow;
self.total += grow;
}
let grew = self.total - before;
if grew > 0 {
self.last_step = grew;
}
grew
}
}
fn maybe_activate( fn maybe_activate(
activation: &mut NodeActivation, activate_tx: &crossbeam_channel::Sender<()>,
cpu_sample: &mut CpuSample, n_active: &mut usize,
io_sample: &mut IoSample, max_workers: usize,
completed: usize, cpu_sample: &mut CpuSample,
n_total: usize, eff_at_last_spawn: &mut f64,
n_cores: usize,
threshold: f64,
completed: usize,
n_total: usize,
) { ) {
if activation.is_full() || completed >= n_total { if *n_active >= max_workers || completed >= n_total { return; }
return;
}
// Expect roughly 1 core of extra efficiency per worker activated in the let eff = cpu_sample.cpu_efficiency(n_cores);
// last growth step (CPU-bound case); require at least CPU_SPAWN_THRESHOLD if eff >= threshold { return; } // CPU already saturated
// (20 %) of that expected gain before growing again. Scaling by the last
// step's size — not the cumulative total — keeps the bar meaningful
// regardless of how many workers are already active: growing by 8 should
// always take ~+1.6 cores to confirm, whether that's the 2nd growth step
// or the 20th.
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
// Call both unconditionally (no `||` short-circuit): each sampler must // Check that the previous activation was beneficial enough.
// advance its own window every tick, regardless of what the other one // Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1).
// reports, or it would starve behind whichever signal fires first. // For the very first extra worker (n_active == 1, no previous spawn), skip this
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold); // check: eff_at_last_spawn == 0 acts as the sentinel.
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD * activation.last_step() as f64); let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 {
if !(cpu_wants_more || io_wants_more) { true // first additional worker: no prior data to evaluate
return; } else {
} let k_before = (*n_active - 1) as f64;
let min_speedup = (k_before + 0.2) / k_before;
let actual_speedup = eff / *eff_at_last_spawn;
actual_speedup >= min_speedup
};
let grew = activation.grow(GROWTH_DIVISOR, n_total); if last_spawn_was_beneficial {
if grew > 0 { activate_tx.send(()).ok();
*eff_at_last_spawn = eff;
*n_active += 1;
*cpu_sample = CpuSample::now();
debug!( debug!(
"activated {} worker(s) — {}/{} active", "activated worker {}/{} — efficiency {:.0}%",
grew, n_active, max_workers, eff * 100.0,
activation.total(),
activation.max()
); );
} }
} }
+2 -4
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "obikmer" name = "obikmer"
version = "1.1.35" version = "1.1.17"
edition = "2024" edition = "2024"
[[bin]] [[bin]]
@@ -18,7 +18,7 @@ obikrope = { path = "../obikrope" }
obikpartitionner = { path = "../obikpartitionner" } obikpartitionner = { path = "../obikpartitionner" }
obisys = { path = "../obisys" } obisys = { path = "../obisys" }
obiskio = { path = "../obiskio" } obiskio = { path = "../obiskio" }
obikindex = { path = "../obikindex", default-features = false } obikindex = { path = "../obikindex" }
obitaxonomy = { path = "../obitaxonomy" } obitaxonomy = { path = "../obitaxonomy" }
obilayeredmap = { path = "../obilayeredmap" } obilayeredmap = { path = "../obilayeredmap" }
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
@@ -33,6 +33,4 @@ tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
pprof = { version = "0.13", features = ["prost-codec"], optional = true } pprof = { version = "0.13", features = ["prost-codec"], optional = true }
[features] [features]
default = ["numa"]
numa = ["obikindex/numa"]
profiling = ["dep:pprof"] profiling = ["dep:pprof"]
+159 -2
View File
@@ -96,5 +96,162 @@ impl<S: BitPartials> BitPartials for LayeredStore<S> {
// ── Tests ───────────────────────────────────────────────────────────────────── // ── Tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)] #[cfg(test)]
#[path = "tests/layered_store.rs"] mod tests {
mod tests; use super::*;
use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
};
use tempfile::tempdir;
fn make_int_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) {
let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap();
let mut b = PersistentCompactIntMatrixBuilder::new(n, &dir.path().join("counts")).unwrap();
for &col in cols {
let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
cb.close().unwrap();
}
b.close().unwrap();
let m = PersistentCompactIntMatrix::open(dir.path()).unwrap();
(dir, m)
}
fn make_bit_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) {
let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap();
let mut b = PersistentBitMatrixBuilder::new(n, &dir.path().join("presence")).unwrap();
for &col in cols {
let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
cb.close().unwrap();
}
b.close().unwrap();
let m = PersistentBitMatrix::open(dir.path()).unwrap();
(dir, m)
}
// ── ColumnWeights ─────────────────────────────────────────────────────────
#[test]
fn col_weights_sums_across_layers() {
// layer 0: col0=[1,2], col1=[3,4] → weights [3, 7]
// layer 1: col0=[10,0], col1=[0,10] → weights [10, 10]
// combined: [13, 17]
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[3, 4]]);
let (_d1, m1) = make_int_matrix(&[&[10, 0], &[0, 10]]);
let store = LayeredStore::new(vec![m0, m1]);
let w = store.col_weights();
assert_eq!(w[0], 13);
assert_eq!(w[1], 17);
}
#[test]
fn col_weights_bit_sums_across_layers() {
// layer 0: col0=[T,F,T], col1=[F,T,T] → counts [2, 2]
// layer 1: col0=[F,F,T], col1=[T,T,F] → counts [1, 2]
// combined: [3, 4]
let (_d0, m0) = make_bit_matrix(&[&[true, false, true], &[false, true, true]]);
let (_d1, m1) = make_bit_matrix(&[&[false, false, true], &[true, true, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let w = store.col_weights();
assert_eq!(w[0], 3);
assert_eq!(w[1], 4);
}
// ── CountPartials — layered (one partition) ───────────────────────────────
#[test]
fn layered_bray_matches_combined() {
// Split [1,2,3,4,5] across two layers; bray dist should equal direct computation
// on [1,2,3,4,5] for each column pair.
// col0=[1,2,3,4,5], col1=[5,4,3,2,1]
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]); // slots 0-1
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]); // slots 2-4
let store = LayeredStore::new(vec![m0, m1]);
// direct on full data
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let expected = CountPartials::bray_dist_matrix(&mf);
let got = CountPartials::bray_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "bray [0,1]");
assert!((got[[1, 0]] - expected[[1, 0]]).abs() < 1e-12, "bray [1,0]");
}
#[test]
fn layered_relfreq_bray_matches_combined() {
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]);
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let expected = CountPartials::relfreq_bray_dist_matrix(&mf);
let got = CountPartials::relfreq_bray_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "relfreq_bray [0,1]");
}
#[test]
fn layered_euclidean_matches_combined() {
let (_d0, m0) = make_int_matrix(&[&[3, 0], &[0, 4]]);
let (_d1, m1) = make_int_matrix(&[&[1, 1], &[2, 2]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 4, 2, 2]]);
let expected = CountPartials::euclidean_dist_matrix(&mf);
let got = CountPartials::euclidean_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "euclidean [0,1]");
}
// ── CountPartials — partitioned (LayeredStore<LayeredStore<_>>) ───────────
#[test]
fn partitioned_bray_matches_combined() {
// partition 0: slots [1,2,3,4,5] col0 vs col1
// partition 1: slots [10,20] col0 vs col1
let (_d0, p0) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let (_d1, p1) = make_int_matrix(&[&[10, 20], &[20, 10]]);
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
]);
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5, 10, 20], &[5, 4, 3, 2, 1, 20, 10]]);
let expected = CountPartials::bray_dist_matrix(&mf);
let got = CountPartials::bray_dist_matrix(&partitioned);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "partitioned bray [0,1]");
}
// ── BitPartials ───────────────────────────────────────────────────────────
#[test]
fn layered_jaccard_matches_combined() {
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[true, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true],
&[false, true, true, false],
]);
let expected = BitPartials::jaccard_dist_matrix(&mf);
let got = BitPartials::jaccard_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "jaccard [0,1]");
}
#[test]
fn layered_hamming_matches_combined() {
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[false, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true],
&[false, true, false, false],
]);
let expected = BitPartials::hamming_dist_matrix(&mf);
let got = BitPartials::hamming_dist_matrix(&store);
assert_eq!(got[[0, 1]], expected[[0, 1]], "hamming [0,1]");
}
}
@@ -1,381 +0,0 @@
use super::*;
use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
};
use tempfile::tempdir;
fn make_int_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) {
let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap();
let mut b = PersistentCompactIntMatrixBuilder::new(n, &dir.path().join("counts")).unwrap();
for &col in cols {
let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
cb.close().unwrap();
}
b.close().unwrap();
let m = PersistentCompactIntMatrix::open(dir.path()).unwrap();
(dir, m)
}
fn make_bit_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) {
let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap();
let mut b = PersistentBitMatrixBuilder::new(n, &dir.path().join("presence")).unwrap();
for &col in cols {
let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
cb.close().unwrap();
}
b.close().unwrap();
let m = PersistentBitMatrix::open(dir.path()).unwrap();
(dir, m)
}
// ── ColumnWeights ─────────────────────────────────────────────────────────
#[test]
fn col_weights_sums_across_layers() {
// layer 0: col0=[1,2], col1=[3,4] → weights [3, 7]
// layer 1: col0=[10,0], col1=[0,10] → weights [10, 10]
// combined: [13, 17]
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[3, 4]]);
let (_d1, m1) = make_int_matrix(&[&[10, 0], &[0, 10]]);
let store = LayeredStore::new(vec![m0, m1]);
let w = store.col_weights();
assert_eq!(w[0], 13);
assert_eq!(w[1], 17);
}
#[test]
fn col_weights_bit_sums_across_layers() {
// layer 0: col0=[T,F,T], col1=[F,T,T] → counts [2, 2]
// layer 1: col0=[F,F,T], col1=[T,T,F] → counts [1, 2]
// combined: [3, 4]
let (_d0, m0) = make_bit_matrix(&[&[true, false, true], &[false, true, true]]);
let (_d1, m1) = make_bit_matrix(&[&[false, false, true], &[true, true, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let w = store.col_weights();
assert_eq!(w[0], 3);
assert_eq!(w[1], 4);
}
// ── CountPartials — layered (one partition) ───────────────────────────────
#[test]
fn layered_bray_matches_combined() {
// Split [1,2,3,4,5] across two layers; bray dist should equal direct computation
// on [1,2,3,4,5] for each column pair.
// col0=[1,2,3,4,5], col1=[5,4,3,2,1]
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]); // slots 0-1
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]); // slots 2-4
let store = LayeredStore::new(vec![m0, m1]);
// direct on full data
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let expected = CountPartials::bray_dist_matrix(&mf);
let got = CountPartials::bray_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "bray [0,1]");
assert!((got[[1, 0]] - expected[[1, 0]]).abs() < 1e-12, "bray [1,0]");
}
#[test]
fn layered_relfreq_bray_matches_combined() {
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]);
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let expected = CountPartials::relfreq_bray_dist_matrix(&mf);
let got = CountPartials::relfreq_bray_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "relfreq_bray [0,1]");
}
#[test]
fn layered_euclidean_matches_combined() {
let (_d0, m0) = make_int_matrix(&[&[3, 0], &[0, 4]]);
let (_d1, m1) = make_int_matrix(&[&[1, 1], &[2, 2]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 4, 2, 2]]);
let expected = CountPartials::euclidean_dist_matrix(&mf);
let got = CountPartials::euclidean_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "euclidean [0,1]");
}
// ── CountPartials — partitioned (LayeredStore<LayeredStore<_>>) ───────────
#[test]
fn partitioned_bray_matches_combined() {
// partition 0: slots [1,2,3,4,5] col0 vs col1
// partition 1: slots [10,20] col0 vs col1
let (_d0, p0) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let (_d1, p1) = make_int_matrix(&[&[10, 20], &[20, 10]]);
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
]);
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5, 10, 20], &[5, 4, 3, 2, 1, 20, 10]]);
let expected = CountPartials::bray_dist_matrix(&mf);
let got = CountPartials::bray_dist_matrix(&partitioned);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "partitioned bray [0,1]");
}
#[test]
fn partitioned_threshold_jaccard_off_diagonal_is_pairwise() {
// 3 genomes, 2 partitions, 1 layer each — mirrors distance.rs's
// LayeredStore<LayeredStore<PersistentCompactIntMatrix>> shape.
// partition 0: col0=[3,0], col1=[0,3], col2=[3,3]
// partition 1: col0=[1,1], col1=[1,0], col2=[0,1]
let (_d0, p0) = make_int_matrix(&[&[3, 0], &[0, 3], &[3, 3]]);
let (_d1, p1) = make_int_matrix(&[&[1, 1], &[1, 0], &[0, 1]]);
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
]);
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 3, 1, 0], &[3, 3, 0, 1]]);
let threshold = 1u32;
let (inter_p, union_p) = CountPartials::partial_threshold_jaccard(&partitioned, threshold);
let (inter_f, union_f) = CountPartials::partial_threshold_jaccard(&mf, threshold);
let n = 3;
for i in 0..n {
for j in 0..n {
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
}
}
}
#[test]
fn partitioned_threshold_jaccard_packed_off_diagonal_is_pairwise() {
// Same as `partitioned_threshold_jaccard_off_diagonal_is_pairwise` but
// each partition matrix is packed into a single .pcmx file first —
// the on-disk format actually used in production after `pack_matrices`.
use obicompactvec::pack_compact_int_matrix;
let (d0, _p0) = make_int_matrix(&[&[3, 0], &[0, 3], &[3, 3]]);
pack_compact_int_matrix(&d0.path().join("counts")).unwrap();
let p0 = PersistentCompactIntMatrix::open(d0.path()).unwrap();
let (d1, _p1) = make_int_matrix(&[&[1, 1], &[1, 0], &[0, 1]]);
pack_compact_int_matrix(&d1.path().join("counts")).unwrap();
let p1 = PersistentCompactIntMatrix::open(d1.path()).unwrap();
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
]);
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 3, 1, 0], &[3, 3, 0, 1]]);
let threshold = 1u32;
let (inter_p, union_p) = CountPartials::partial_threshold_jaccard(&partitioned, threshold);
let (inter_f, union_f) = CountPartials::partial_threshold_jaccard(&mf, threshold);
let n = 3;
for i in 0..n {
for j in 0..n {
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
}
}
}
#[test]
fn partitioned_multilayer_threshold_jaccard_off_diagonal_is_pairwise() {
// 2 partitions, 2 layers each — the shape production indexes actually
// have (MPHF collision layers within a partition).
// partition 0, layer 0: col0=[3,0], col1=[0,3], col2=[3,3]
// partition 0, layer 1: col0=[2,0], col1=[0,0], col2=[2,0]
// partition 1, layer 0: col0=[1,1], col1=[1,0], col2=[0,1]
// partition 1, layer 1: col0=[0,5], col1=[5,5], col2=[0,0]
let (_d0a, p0a) = make_int_matrix(&[&[3, 0], &[0, 3], &[3, 3]]);
let (_d0b, p0b) = make_int_matrix(&[&[2, 0], &[0, 0], &[2, 0]]);
let (_d1a, p1a) = make_int_matrix(&[&[1, 1], &[1, 0], &[0, 1]]);
let (_d1b, p1b) = make_int_matrix(&[&[0, 5], &[5, 5], &[0, 0]]);
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0a, p0b]),
LayeredStore::new(vec![p1a, p1b]),
]);
// Flattened equivalent: concatenate every layer's slots into one matrix.
let (_df, mf) = make_int_matrix(&[
&[3, 0, 2, 0, 1, 1, 0, 5],
&[0, 3, 0, 0, 1, 0, 5, 5],
&[3, 3, 2, 0, 0, 1, 0, 0],
]);
let threshold = 1u32;
let (inter_p, union_p) = CountPartials::partial_threshold_jaccard(&partitioned, threshold);
let (inter_f, union_f) = CountPartials::partial_threshold_jaccard(&mf, threshold);
let n = 3;
for i in 0..n {
for j in 0..n {
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
}
}
}
// ── BitPartials ───────────────────────────────────────────────────────────
#[test]
fn layered_jaccard_matches_combined() {
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[true, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true],
&[false, true, true, false],
]);
let expected = BitPartials::jaccard_dist_matrix(&mf);
let got = BitPartials::jaccard_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "jaccard [0,1]");
}
#[test]
fn layered_hamming_matches_combined() {
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[false, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true],
&[false, true, false, false],
]);
let expected = BitPartials::hamming_dist_matrix(&mf);
let got = BitPartials::hamming_dist_matrix(&store);
assert_eq!(got[[0, 1]], expected[[0, 1]], "hamming [0,1]");
}
#[test]
fn partitioned_bit_jaccard_off_diagonal_is_pairwise() {
// Same shape as the count-based `partitioned_multilayer_threshold_jaccard_*`
// tests, but for the presence/bit path (`with_counts = false` — what
// `all_specifics` actually uses in production).
// 4 genomes, 3 partitions, 2 layers in the last one.
let (_d0, p0) = make_bit_matrix(&[
&[true, false, true],
&[false, true, true],
&[true, true, false],
&[false, false, true],
]);
let (_d1, p1) = make_bit_matrix(&[
&[true, true],
&[false, true],
&[true, false],
&[true, true],
]);
let (_d2a, p2a) = make_bit_matrix(&[
&[false, true],
&[true, true],
&[false, false],
&[true, false],
]);
let (_d2b, p2b) = make_bit_matrix(&[
&[true],
&[false],
&[true],
&[true],
]);
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
LayeredStore::new(vec![p2a, p2b]),
]);
// Flattened equivalent: concatenate every partition/layer's slots.
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true, true, false, true, true],
&[false, true, true, false, true, true, true, false],
&[true, true, false, true, false, false, false, true],
&[false, false, true, true, true, true, false, true],
]);
let (inter_p, union_p) = BitPartials::partial_jaccard(&partitioned);
let (inter_f, union_f) = BitPartials::partial_jaccard(&mf);
let n = 4;
for i in 0..n {
for j in 0..n {
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
}
}
}
#[test]
fn partitioned_bit_jaccard_packed_off_diagonal_is_pairwise() {
// Same as `partitioned_bit_jaccard_off_diagonal_is_pairwise` but every
// partition's presence matrix is packed into a single .pbmx file —
// the on-disk format actually used in production after `pack_matrices`.
use obicompactvec::pack_bit_matrix;
let (d0, _p0) = make_bit_matrix(&[
&[true, false, true],
&[false, true, true],
&[true, true, false],
&[false, false, true],
]);
pack_bit_matrix(&d0.path().join("presence")).unwrap();
let p0 = PersistentBitMatrix::open(d0.path()).unwrap();
let (d1, _p1) = make_bit_matrix(&[
&[true, true],
&[false, true],
&[true, false],
&[true, true],
]);
pack_bit_matrix(&d1.path().join("presence")).unwrap();
let p1 = PersistentBitMatrix::open(d1.path()).unwrap();
let (d2a, _p2a) = make_bit_matrix(&[
&[false, true],
&[true, true],
&[false, false],
&[true, false],
]);
pack_bit_matrix(&d2a.path().join("presence")).unwrap();
let p2a = PersistentBitMatrix::open(d2a.path()).unwrap();
let (d2b, _p2b) = make_bit_matrix(&[
&[true],
&[false],
&[true],
&[true],
]);
pack_bit_matrix(&d2b.path().join("presence")).unwrap();
let p2b = PersistentBitMatrix::open(d2b.path()).unwrap();
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
LayeredStore::new(vec![p2a, p2b]),
]);
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true, true, false, true, true],
&[false, true, true, false, true, true, true, false],
&[true, true, false, true, false, false, false, true],
&[false, false, true, true, true, true, false, true],
]);
let (inter_p, union_p) = BitPartials::partial_jaccard(&partitioned);
let (inter_f, union_f) = BitPartials::partial_jaccard(&mf);
let n = 4;
for i in 0..n {
for j in 0..n {
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
}
}
}
+100 -301
View File
@@ -4,7 +4,7 @@ use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
use tracing::{debug, info, warn}; use tracing::{info, warn};
const BRAILLE: &[&str] = &["", "", "", "", "", "", "", "", "", ""]; const BRAILLE: &[&str] = &["", "", "", "", "", "", "", "", "", ""];
@@ -14,25 +14,24 @@ const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧
/// a TTY (e.g. HPC job logs): every 10% for bounded bars, every ~10 s for /// a TTY (e.g. HPC job logs): every 10% for bounded bars, every ~10 s for
/// spinners (throttled on `set_message`). /// spinners (throttled on `set_message`).
pub struct TracedBar { pub struct TracedBar {
pb: ProgressBar, pb: ProgressBar,
label: String, label: String,
unit: String, unit: String,
total: u64, // 0 for spinners total: u64, // 0 for spinners
start: Instant, // creation time, for spinner throttling start: Instant, // creation time, for spinner throttling
last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet
last_log_ms: AtomicU64, // ms since `start` at last spinner log last_log_ms: AtomicU64, // ms since `start` at last spinner log
} }
impl TracedBar { impl TracedBar {
pub fn inc(&self, delta: u64) { pub fn inc(&self, delta: u64) {
self.pb.inc(delta); self.pb.inc(delta);
if self.pb.is_hidden() && self.total > 0 { if self.pb.is_hidden() && self.total > 0 {
let pos = self.pb.position(); let pos = self.pb.position();
let pct10 = (pos * 10) / self.total; // 0..=10 let pct10 = (pos * 10) / self.total; // 0..=10
let last = self.last_pct.load(Ordering::Relaxed); let last = self.last_pct.load(Ordering::Relaxed);
if pct10 > last if pct10 > last
&& self && self.last_pct
.last_pct
.compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed) .compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed)
.is_ok() .is_ok()
{ {
@@ -50,14 +49,14 @@ impl TracedBar {
let msg = msg.into(); let msg = msg.into();
if self.pb.is_hidden() { if self.pb.is_hidden() {
if self.total > 0 { if self.total > 0 {
debug!(stage = %self.label, "{msg}"); // bounded bar: always log (already rate-limited by 10% threshold in inc)
info!(stage = %self.label, "{msg}");
} else { } else {
// spinner: throttle to ~10 s // spinner: throttle to ~10 s
let now_ms = self.start.elapsed().as_millis() as u64; let now_ms = self.start.elapsed().as_millis() as u64;
let last = self.last_log_ms.load(Ordering::Relaxed); let last = self.last_log_ms.load(Ordering::Relaxed);
if now_ms >= last + 10_000 if now_ms >= last + 10_000
&& self && self.last_log_ms
.last_log_ms
.compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed) .compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
.is_ok() .is_ok()
{ {
@@ -84,13 +83,8 @@ pub fn spinner(label: &str) -> TracedBar {
); );
pb.enable_steady_tick(Duration::from_millis(100)); pb.enable_steady_tick(Duration::from_millis(100));
TracedBar { TracedBar {
pb, pb, label: label.to_string(), unit: String::new(), total: 0,
label: label.to_string(), start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
unit: String::new(),
total: 0,
start: Instant::now(),
last_pct: AtomicU64::new(0),
last_log_ms: AtomicU64::new(0),
} }
} }
@@ -107,13 +101,8 @@ pub fn progress_bar(label: &str, n: u64, unit: &str) -> TracedBar {
); );
pb.enable_steady_tick(Duration::from_millis(100)); pb.enable_steady_tick(Duration::from_millis(100));
TracedBar { TracedBar {
pb, pb, label: label.to_string(), unit: unit.to_string(), total: n,
label: label.to_string(), start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
unit: unit.to_string(),
total: n,
start: Instant::now(),
last_pct: AtomicU64::new(0),
last_log_ms: AtomicU64::new(0),
} }
} }
@@ -215,19 +204,13 @@ fn tv_to_secs(tv: timeval) -> f64 {
} }
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
fn rss_to_bytes(ru: &rusage) -> u64 { fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 }
ru.ru_maxrss as u64
}
#[cfg(not(target_os = "macos"))] #[cfg(not(target_os = "macos"))]
fn rss_to_bytes(ru: &rusage) -> u64 { fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
ru.ru_maxrss as u64 * 1024
}
// Monotonically increasing counters — negative delta would be a kernel bug. // Monotonically increasing counters — negative delta would be a kernel bug.
fn delta(end: i64, start: i64) -> u64 { fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
(end - start).max(0) as u64
}
// ── CpuSample ───────────────────────────────────────────────────────────────── // ── CpuSample ─────────────────────────────────────────────────────────────────
@@ -235,151 +218,31 @@ fn delta(end: i64, start: i64) -> u64 {
/// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of /// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of
/// available cores used since the snapshot was taken. /// available cores used since the snapshot was taken.
pub struct CpuSample { pub struct CpuSample {
wall: Instant, wall: Instant,
user_secs: f64, user_secs: f64,
sys_secs: f64, sys_secs: f64,
previous: f64,
} }
impl CpuSample { impl CpuSample {
pub fn now() -> Self { pub fn now() -> Self {
let ru = get_rusage(); let ru = get_rusage();
Self { Self {
wall: Instant::now(), wall: Instant::now(),
user_secs: tv_to_secs(ru.ru_utime), user_secs: tv_to_secs(ru.ru_utime),
sys_secs: tv_to_secs(ru.ru_stime), sys_secs: tv_to_secs(ru.ru_stime),
previous: 0.0,
} }
} }
/// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot. /// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot.
/// Returns 0.0 if less than 100 ms have elapsed (too noisy). /// Returns 0.0 if less than 100 ms have elapsed (too noisy).
pub fn cpu_efficiency(&self, n_cores: usize) -> f64 { pub fn cpu_efficiency(&self, n_cores: usize) -> f64 {
let ru = get_rusage(); let ru = get_rusage();
let wall = self.wall.elapsed().as_secs_f64(); let wall = self.wall.elapsed().as_secs_f64();
if wall < 0.1 { if wall < 0.1 { return 0.0; }
return 0.0; let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs)
} + (tv_to_secs(ru.ru_stime) - self.sys_secs);
let cpu =
(tv_to_secs(ru.ru_utime) - self.user_secs) + (tv_to_secs(ru.ru_stime) - self.sys_secs);
cpu / (wall * n_cores as f64) cpu / (wall * n_cores as f64)
} }
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let delta_wall = self.wall.elapsed().as_secs_f64();
if delta_wall < 0.1 {
// Window too short to be meaningful — leave state untouched so it
// keeps accumulating until a real sample can be taken.
return false;
}
let n = CpuSample::now();
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
let efficiency = delta_ru / delta_wall;
let activate = 0f64.max(efficiency - self.previous) >= threshold;
debug!(
"Do I activate : {} -> {} = {} Activate: {}",
self.previous,
efficiency,
0f64.max(efficiency - self.previous),
activate
);
self.previous = efficiency;
self.user_secs = n.user_secs;
self.sys_secs = n.sys_secs;
self.wall = n.wall;
activate
}
}
// ── IoSample ──────────────────────────────────────────────────────────────────
/// Snapshot of process-wide block I/O (bytes read + written) + wall clock.
///
/// Same activation protocol as [`CpuSample`], but the growth check in
/// [`do_i_activate`](Self::do_i_activate) is *relative* rather than absolute:
/// raw I/O throughput has no portable scale across storage devices, unlike a
/// core count.
pub struct IoSample {
wall: Instant,
bytes: u64,
previous_rate: f64,
}
impl IoSample {
pub fn now() -> Self {
Self {
wall: Instant::now(),
bytes: Self::read_bytes(),
previous_rate: 0.0,
}
}
/// Bytes actually submitted to the block layer (read + write), summed
/// process-wide. Returns 0 if unavailable — degrades gracefully to a
/// signal that never triggers activation (CPU-only heuristic).
#[cfg(target_os = "linux")]
fn read_bytes() -> u64 {
let Ok(io) = std::fs::read_to_string("/proc/self/io") else {
return 0;
};
io.lines()
.filter_map(|l| {
l.strip_prefix("read_bytes: ")
.or_else(|| l.strip_prefix("write_bytes: "))
})
.filter_map(|v| v.trim().parse::<u64>().ok())
.sum()
}
#[cfg(target_os = "macos")]
fn read_bytes() -> u64 {
use libc::{RUSAGE_INFO_V4, getpid, proc_pid_rusage, rusage_info_v4};
let mut info: rusage_info_v4 = unsafe { std::mem::zeroed() };
let ret =
unsafe { proc_pid_rusage(getpid(), RUSAGE_INFO_V4, &mut info as *mut _ as *mut _) };
if ret != 0 {
return 0;
}
info.ri_diskio_bytesread + info.ri_diskio_byteswritten
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn read_bytes() -> u64 {
0
}
/// Same protocol as [`CpuSample::do_i_activate`] (0.1 s minimum window,
/// state untouched on early return), but growth is measured relative to
/// the previous rate. `threshold` is a fraction, e.g. `0.2` for a 20 %
/// increase in throughput since the last real sample.
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let elapsed = self.wall.elapsed().as_secs_f64();
if elapsed < 0.1 {
return false;
}
let n = Self::read_bytes();
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
let activate = if self.previous_rate == 0.0 {
rate > 0.0 // bootstrap: any measured throughput is signal enough
} else {
(rate - self.previous_rate) / self.previous_rate >= threshold
};
debug!(
"Do I activate (I/O) : {} -> {} Activate: {}",
self.previous_rate, rate, activate
);
self.previous_rate = rate;
self.bytes = n;
self.wall = Instant::now();
activate
}
} }
// ── public API ──────────────────────────────────────────────────────────────── // ── public API ────────────────────────────────────────────────────────────────
@@ -388,37 +251,33 @@ impl IoSample {
#[must_use = "call .stop() to record the stage"] #[must_use = "call .stop() to record the stage"]
pub struct Stage { pub struct Stage {
label: String, label: String,
wall: Instant, wall: Instant,
ru: rusage, ru: rusage,
} }
impl Stage { impl Stage {
pub fn start(label: impl Into<String>) -> Self { pub fn start(label: impl Into<String>) -> Self {
let label = label.into(); let label = label.into();
info!(stage = %label, "started"); info!(stage = %label, "started");
Self { Self { label, wall: Instant::now(), ru: get_rusage() }
label,
wall: Instant::now(),
ru: get_rusage(),
}
} }
pub fn stop(self) -> StageStats { pub fn stop(self) -> StageStats {
let wall_secs = self.wall.elapsed().as_secs_f64(); let wall_secs = self.wall.elapsed().as_secs_f64();
let end = get_rusage(); let end = get_rusage();
let stats = StageStats { let stats = StageStats {
label: self.label, label: self.label,
wall_secs, wall_secs,
user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime), user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime),
sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime), sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime),
max_rss_bytes: rss_to_bytes(&end), max_rss_bytes: rss_to_bytes(&end),
minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64), minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64),
major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64), major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64),
vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64), vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64),
invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64), invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64),
in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64), in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64),
out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64), out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64),
swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64), swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64),
}; };
info!( info!(
stage = %stats.label, stage = %stats.label,
@@ -440,30 +299,27 @@ impl Stage {
/// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas. /// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas.
pub struct StageStats { pub struct StageStats {
pub label: String, pub label: String,
pub wall_secs: f64, pub wall_secs: f64,
pub user_secs: f64, pub user_secs: f64,
pub sys_secs: f64, pub sys_secs: f64,
/// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum, /// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum,
/// so this reflects the high-water mark up to and including this stage. /// so this reflects the high-water mark up to and including this stage.
pub max_rss_bytes: u64, pub max_rss_bytes: u64,
pub minor_faults: u64, pub minor_faults: u64,
pub major_faults: u64, pub major_faults: u64,
pub vol_ctx: u64, // voluntary context switches pub vol_ctx: u64, // voluntary context switches
pub invol_ctx: u64, // involuntary context switches pub invol_ctx: u64, // involuntary context switches
pub in_blocks: u64, // filesystem block reads (after page cache) pub in_blocks: u64, // filesystem block reads (after page cache)
pub out_blocks: u64, // filesystem block writes pub out_blocks: u64, // filesystem block writes
pub swaps: u64, pub swaps: u64,
} }
impl StageStats { impl StageStats {
/// (user + sys) / wall — effective thread count utilisation. /// (user + sys) / wall — effective thread count utilisation.
pub fn parallelism(&self) -> f64 { pub fn parallelism(&self) -> f64 {
if self.wall_secs > 1e-9 { if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs }
(self.user_secs + self.sys_secs) / self.wall_secs else { 0.0 }
} else {
0.0
}
} }
/// parallelism / n_cores — fraction of available CPU power used (0..1+). /// parallelism / n_cores — fraction of available CPU power used (0..1+).
@@ -479,33 +335,25 @@ pub struct Reporter {
} }
impl Reporter { impl Reporter {
pub fn new() -> Self { pub fn new() -> Self { Self::default() }
Self::default() pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); }
} pub fn stages(&self) -> &[StageStats] { &self.stages }
pub fn push(&mut self, stats: StageStats) {
self.stages.push(stats);
}
pub fn stages(&self) -> &[StageStats] {
&self.stages
}
/// Print the summary to stderr. /// Print the summary to stderr.
pub fn print(&self) { pub fn print(&self) { eprint!("{self}"); }
eprint!("{self}");
}
} }
// ── diagnosis ───────────────────────────────────────────────────────────────── // ── diagnosis ─────────────────────────────────────────────────────────────────
struct Diagnosis { struct Diagnosis {
tag: &'static str, tag: &'static str,
detail: Option<String>, detail: Option<String>,
} }
// Thresholds are intentionally conservative to avoid false positives. // Thresholds are intentionally conservative to avoid false positives.
fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis { fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
let eff = s.efficiency(n_cores); let eff = s.efficiency(n_cores);
let cpu_pct = eff * 100.0; let cpu_pct = eff * 100.0;
let io_ops = s.in_blocks + s.out_blocks; let io_ops = s.in_blocks + s.out_blocks;
// swaps > 0 is the only reliable cross-platform indicator of true RAM exhaustion. // swaps > 0 is the only reliable cross-platform indicator of true RAM exhaustion.
// ru_majflt is intentionally excluded: on macOS it counts all file-backed mmap // ru_majflt is intentionally excluded: on macOS it counts all file-backed mmap
@@ -539,43 +387,26 @@ fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
)), )),
}; };
} }
Diagnosis { Diagnosis { tag: "", detail: None }
tag: "",
detail: None,
}
} }
// ── display helpers ─────────────────────────────────────────────────────────── // ── display helpers ───────────────────────────────────────────────────────────
fn fmt_secs(s: f64) -> String { fn fmt_secs(s: f64) -> String {
if s >= 100.0 { if s >= 100.0 { format!("{:.0}s", s) }
format!("{:.0}s", s) else if s >= 10.0 { format!("{:.1}s", s) }
} else if s >= 10.0 { else if s >= 1.0 { format!("{:.2}s", s) }
format!("{:.1}s", s) else { format!("{:.0}ms", s * 1000.0) }
} else if s >= 1.0 {
format!("{:.2}s", s)
} else {
format!("{:.0}ms", s * 1000.0)
}
} }
fn fmt_bytes(b: u64) -> String { fn fmt_bytes(b: u64) -> String {
if b >= 1 << 30 { if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) }
} else if b >= 1 << 20 { else { format!("{:.0} KB", b as f64 / 1024.0) }
format!("{:.0} MB", b as f64 / (1u64 << 20) as f64)
} else {
format!("{:.0} KB", b as f64 / 1024.0)
}
} }
fn fmt_efficiency(par: f64, n_cores: usize) -> String { fn fmt_efficiency(par: f64, n_cores: usize) -> String {
format!( format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0)
"{:.1}×/{} ({:.0}%)",
par,
n_cores,
par / n_cores as f64 * 100.0
)
} }
// ── Display ─────────────────────────────────────────────────────────────────── // ── Display ───────────────────────────────────────────────────────────────────
@@ -583,8 +414,8 @@ fn fmt_efficiency(par: f64, n_cores: usize) -> String {
// ── MemoryBudget ────────────────────────────────────────────────────────────── // ── MemoryBudget ──────────────────────────────────────────────────────────────
struct BudgetInner { struct BudgetInner {
remaining: u64, remaining: u64,
active: usize, active: usize,
peak_active: usize, peak_active: usize,
} }
@@ -594,8 +425,8 @@ struct BudgetInner {
/// completion. Non-deadlock guarantee: when no worker is active the next /// completion. Non-deadlock guarantee: when no worker is active the next
/// acquire always succeeds regardless of cost vs. remaining budget. /// acquire always succeeds regardless of cost vs. remaining budget.
pub struct MemoryBudget { pub struct MemoryBudget {
total: u64, total: u64,
inner: Mutex<BudgetInner>, inner: Mutex<BudgetInner>,
condvar: Condvar, condvar: Condvar,
} }
@@ -603,11 +434,7 @@ impl MemoryBudget {
pub fn new(total: u64) -> Self { pub fn new(total: u64) -> Self {
Self { Self {
total, total,
inner: Mutex::new(BudgetInner { inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }),
remaining: total,
active: 0,
peak_active: 0,
}),
condvar: Condvar::new(), condvar: Condvar::new(),
} }
} }
@@ -616,9 +443,9 @@ impl MemoryBudget {
let mut g = self.inner.lock().unwrap(); let mut g = self.inner.lock().unwrap();
loop { loop {
if g.active == 0 || g.remaining >= cost { if g.active == 0 || g.remaining >= cost {
g.remaining = g.remaining.saturating_sub(cost); g.remaining = g.remaining.saturating_sub(cost);
g.active += 1; g.active += 1;
g.peak_active = g.peak_active.max(g.active); g.peak_active = g.peak_active.max(g.active);
return; return;
} }
g = self.condvar.wait(g).unwrap(); g = self.condvar.wait(g).unwrap();
@@ -628,66 +455,47 @@ impl MemoryBudget {
pub fn release(&self, cost: u64) { pub fn release(&self, cost: u64) {
let mut g = self.inner.lock().unwrap(); let mut g = self.inner.lock().unwrap();
g.remaining = (g.remaining + cost).min(self.total); g.remaining = (g.remaining + cost).min(self.total);
g.active -= 1; g.active -= 1;
self.condvar.notify_all(); self.condvar.notify_all();
} }
pub fn total(&self) -> u64 { pub fn total(&self) -> u64 { self.total }
self.total pub fn active(&self) -> usize { self.inner.lock().unwrap().active }
} pub fn remaining(&self) -> u64 { self.inner.lock().unwrap().remaining }
pub fn active(&self) -> usize { pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active }
self.inner.lock().unwrap().active
}
pub fn remaining(&self) -> u64 {
self.inner.lock().unwrap().remaining
}
pub fn peak_active(&self) -> usize {
self.inner.lock().unwrap().peak_active
}
} }
// ── Display ─────────────────────────────────────────────────────────────────── // ── Display ───────────────────────────────────────────────────────────────────
impl fmt::Display for Reporter { impl fmt::Display for Reporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.stages.is_empty() { if self.stages.is_empty() { return Ok(()); }
return Ok(());
}
let n_cores = std::thread::available_parallelism() let n_cores = std::thread::available_parallelism()
.map(|n| n.get()) .map(|n| n.get())
.unwrap_or(1); .unwrap_or(1);
// column widths // column widths
let nw = self let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5);
.stages
.iter()
.map(|s| s.label.len())
.max()
.unwrap_or(5)
.max(5);
// efficiency col: worst-case width for this run's n_cores value // efficiency col: worst-case width for this run's n_cores value
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len(); let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12; let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12;
let sep = "".repeat(sep_w); let sep = "".repeat(sep_w);
// header // header
writeln!( writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} status",
f, "stage", "wall", "efficiency", "peak RSS")?;
"{:<nw$} {:>7} {:>ew$} {:>8} status",
"stage", "wall", "efficiency", "peak RSS"
)?;
writeln!(f, "{sep}")?; writeln!(f, "{sep}")?;
// compute all diagnoses up front (needed for both table and footnotes) // compute all diagnoses up front (needed for both table and footnotes)
let diagnoses: Vec<Diagnosis> = self.stages.iter().map(|s| diagnose(s, n_cores)).collect(); let diagnoses: Vec<Diagnosis> = self.stages.iter()
.map(|s| diagnose(s, n_cores))
.collect();
// per-stage rows // per-stage rows
for (s, d) in self.stages.iter().zip(diagnoses.iter()) { for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
writeln!( writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} {}",
f,
"{:<nw$} {:>7} {:>ew$} {:>8} {}",
s.label, s.label,
fmt_secs(s.wall_secs), fmt_secs(s.wall_secs),
fmt_efficiency(s.parallelism(), n_cores), fmt_efficiency(s.parallelism(), n_cores),
@@ -697,21 +505,14 @@ impl fmt::Display for Reporter {
} }
// totals // totals
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>(); let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>(); let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>(); let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
let trss = self let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0);
.stages
.iter()
.map(|s| s.max_rss_bytes)
.max()
.unwrap_or(0);
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 }; let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
writeln!(f, "{sep}")?; writeln!(f, "{sep}")?;
writeln!( writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8}",
f,
"{:<nw$} {:>7} {:>ew$} {:>8}",
"TOTAL", "TOTAL",
fmt_secs(tw), fmt_secs(tw),
fmt_efficiency(tpar, n_cores), fmt_efficiency(tpar, n_cores),
@@ -719,9 +520,7 @@ impl fmt::Display for Reporter {
)?; )?;
// bottleneck footnotes (only if at least one anomaly detected) // bottleneck footnotes (only if at least one anomaly detected)
let bottlenecks: Vec<(&str, &str)> = self let bottlenecks: Vec<(&str, &str)> = self.stages.iter()
.stages
.iter()
.zip(diagnoses.iter()) .zip(diagnoses.iter())
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det))) .filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
.collect(); .collect();