27 Commits

Author SHA1 Message Date
Eric Coissac 2b37e8aac4 fix(bitmatrix): explicitly compute diagonal entries for self-similarity
Release / create-release (push) Successful in 2m26s
Release / build-linux-x86_64 (push) Successful in 8m13s
Release / build-macos-arm64 (push) Failing after 30s
CI / build (pull_request) Successful in 3m21s
The pairwise matrix functions now explicitly calculate and overwrite diagonal entries using `f(i,i)`, replacing previous implicit symmetric mirroring or default values. Documentation has been updated to clarify that diagonals represent self-comparison weights, ensuring accurate self-similarity calculations. Additionally, the obikmer crate version has been bumped to 1.1.34.
2026-07-03 13:04:40 +02:00
Eric Coissac 67b4e4da53 refactor(numa): replace flat runner with per-node activation channels
Shifts the NUMA-aware runner from a flat, round-robin model to a per-node architecture using dedicated `NodeActivation` channels. Replaces absolute deltas with relative scaling based on the previous growth step's worker count, decoupling growth from node count to fix slow ramp-up and enforce per-node fairness. Updates architecture documentation to reflect these changes and focus tuning questions on `INITIAL`/`GROWTH_DIVISOR` parameters for I/O-bound validation.
2026-07-03 13:03:31 +02:00
coissac 66ab4c6db1 Merge pull request 'feat(numa): introduce I/O sampling to prevent activation stalls' (#55) from push-ooruxnkktvvz into main
Reviewed-on: #55
2026-07-02 09:36:19 +00:00
Eric Coissac f84dd539bf feat(numa): introduce I/O sampling to prevent activation stalls
Release / create-release (push) Successful in 2m25s
Release / build-linux-x86_64 (push) Successful in 8m47s
Release / build-macos-arm64 (push) Failing after 31s
CI / build (pull_request) Successful in 3m30s
Replaces the monolithic CPU scaling threshold with separate CPU and I/O spawn thresholds. Introduces an `IoSample` struct with platform-specific byte reading and a relative throughput growth heuristic. Adds a 0.1s wall-clock guard to `CpuSample` to suppress artificial efficiency spikes, and updates `maybe_activate` to trigger worker scaling when either resource indicates headroom. Bumps `obikmer` to v1.1.33 and updates architecture documentation.
2026-07-02 10:07:22 +02:00
coissac 6378734e1c Merge pull request 'fix(obisys): remove activation guard to always update metrics' (#54) from push-vkloynurrxzu into main
Reviewed-on: #54
2026-07-01 18:34:10 +00:00
Eric Coissac b3a617cce1 fix(obisys): remove activation guard to always update metrics
Release / create-release (push) Successful in 2m26s
CI / build (pull_request) Successful in 3m35s
Release / build-linux-x86_64 (push) Successful in 8m9s
Release / build-macos-arm64 (push) Failing after 30s
Removes the `if activate` conditional in `src/obisys/src/lib.rs`, making debug logging and state updates for performance counters execute unconditionally. This ensures tracking metrics are continuously refreshed regardless of the activation threshold. Also bumps the `obikmer` dependency version.
2026-07-01 20:32:56 +02:00
coissac 2080e5e8a9 Merge pull request 'ci: fix registry auth and bump obikmer to 1.1.30' (#53) from push-zxlknspoxknt into main
Reviewed-on: #53
2026-07-01 14:20:09 +00:00
Eric Coissac 45ed2bc9b8 ci: fix registry auth and bump obikmer to 1.1.30
Release / create-release (push) Successful in 2m26s
Release / build-linux-x86_64 (push) Successful in 8m12s
Release / build-macos-arm64 (push) Failing after 1m55s
CI / build (pull_request) Successful in 3m32s
Update the release workflow to explicitly resolve the Docker registry username from repository secrets instead of inferring it from the runner's actor. Bump the obikmer package version to 1.1.30.
2026-07-01 14:31:30 +02:00
coissac aa126fd89d Merge pull request 'feat: simplify worker spawning logic and update macOS build workflow' (#52) from push-uvmlknmzqqnx into main
Reviewed-on: #52
2026-07-01 09:50:51 +00:00
Eric Coissac c612132763 feat: simplify worker spawning logic and update macOS build workflow
Release / create-release (push) Successful in 2m59s
Release / build-linux-x86_64 (push) Successful in 8m13s
Release / build-macos-arm64 (push) Failing after 8s
CI / build (pull_request) Successful in 3m24s
Updates the release workflow to run macOS builds inside a Docker container with explicit registry authentication and adjusted artifact paths. Bumps the obikmer crate version to 1.1.29 and adds *.log to .gitignore. Simplifies NUMA worker spawning by lowering the activation threshold from 0.95 to 0.2, replacing complex stateful tracking with a direct efficiency check, and downgrading progress logging to debug level. Includes general code formatting improvements for readability.
2026-07-01 11:40:57 +02:00
coissac 19660f8cd0 Merge pull request 'ci: update registry auth and improve adaptive worker scaling' (#51) from push-qlpywtroutvx into main
Reviewed-on: #51
2026-06-26 13:16:23 +00:00
Eric Coissac 7b07540a69 ci: update registry auth and improve adaptive worker scaling
Release / create-release (push) Successful in 2m27s
CI / build (pull_request) Successful in 3m17s
Release / build-linux-x86_64 (push) Successful in 8m3s
Release / build-macos-arm64 (push) Failing after 1s
Refactor the release workflow to use a structured container object with authenticated pulls for macOS ARM64 builds. Replace single-worker activation with dynamic upfront provisioning based on node and worker counts. Implement an absolute efficiency gain threshold for scaling checks and add early termination to improve adaptive scaling stability. Bump obikmer crate version to 1.1.27.
2026-06-26 15:13:13 +02:00
coissac 89c43e28f5 Merge pull request 'ci: update release workflow and bump obikmer to 1.1.26' (#50) from push-npttlqpomtvz into main
Reviewed-on: #50
2026-06-24 13:55:40 +00:00
Eric Coissac b9b2e42ad2 ci: update release workflow and bump obikmer to 1.1.26
Release / create-release (push) Successful in 2m32s
CI / build (pull_request) Successful in 3m47s
Release / build-linux-x86_64 (push) Successful in 8m18s
Release / build-macos-arm64 (push) Failing after 0s
Replaces the macOS ARM64 cross-compilation container with a custom internal registry image. Adds explicit steps to install the `aarch64-apple-darwin` Rust target and `jq`, and updates the build command to use `--no-default-features`. Bumps the `obikmer` package version from 1.1.25 to 1.1.26.
2026-06-24 15:55:02 +02:00
coissac ca42fdff2f Merge pull request 'ci: update macOS ARM64 build workflow and bump obikmer version' (#49) from push-lllnsqlrqrut into main
Reviewed-on: #49
2026-06-23 13:15:20 +00:00
Eric Coissac 136cd89efb ci: update macOS ARM64 build workflow and bump obikmer version
Release / create-release (push) Successful in 2m27s
Release / build-linux-x86_64 (push) Successful in 7m52s
Release / build-macos-arm64 (push) Failing after 8m53s
CI / build (pull_request) Successful in 5m31s
Replace manual Zig/cargo-zigbuild setup with a pre-configured Docker container (`joseluisq/rust-linux-darwin-builder`). Use explicit Clang cross-compilers for native macOS ARM64 compilation. Bump the `obikmer` package version to 1.1.25.
2026-06-23 15:01:17 +02:00
coissac a4bbf607b7 Merge pull request 'Push kxsopnzprltv' (#48) from push-kxsopnzprltv into main
Reviewed-on: #48
2026-06-23 09:51:33 +00:00
Eric Coissac 9927100a1c chore: update obikmer to 1.1.24
Release / create-release (push) Successful in 2m24s
Release / build-linux-x86_64 (push) Successful in 7m49s
Release / build-macos-arm64 (push) Failing after 3m31s
CI / build (pull_request) Successful in 3m22s
Bumps the obikmer version in Cargo.toml from 1.1.21 to 1.1.24 and updates Cargo.lock to align with the upstream patch release (1.1.23). This ensures consistent dependency resolution across builds.
2026-06-23 11:47:54 +02:00
Eric Coissac 527258f822 ci: enforce macOS 11.0 deployment target for ARM builds
Adds MACOSX_DEPLOYMENT_TARGET=11.0 environment variable and updates the cargo zigbuild target to aarch64-apple-darwin11.0 to explicitly require macOS 11.0 for ARM binary compilation.
2026-06-23 11:46:09 +02:00
coissac ef62f1947e Merge pull request 'chore: bump version to 1.1.21 and update obikindex features' (#47) from push-xwutoxpnxorz into main
Reviewed-on: #47
2026-06-23 08:31:31 +00:00
Eric Coissac d02316dcf6 chore: bump version to 1.1.21 and update obikindex features
Release / create-release (push) Successful in 2m29s
CI / build (pull_request) Successful in 4m36s
Release / build-linux-x86_64 (push) Successful in 10m25s
Release / build-macos-arm64 (push) Failing after 4m50s
Disables default features for the `obikindex` dependency and introduces a `[features]` block. The new `numa` feature is set as the default, conditionally enabling NUMA support in `obikindex`.
2026-06-23 10:30:39 +02:00
coissac c323b3eaef Merge pull request 'Bump obikmer to 1.1.20 and update release workflow' (#46) from push-wpnywwlwxrps into main
Reviewed-on: #46
2026-06-23 08:03:58 +00:00
Eric Coissac b77d8e9ca0 Bump obikmer to 1.1.20 and update release workflow
Release / create-release (push) Successful in 2m26s
CI / build (pull_request) Successful in 3m14s
Release / build-linux-x86_64 (push) Successful in 7m44s
Release / build-macos-arm64 (push) Failing after 7m13s
Update the Gitea release workflow to fetch a full git clone with complete history, ensuring all commits and tags are available for accurate version resolution. This prepares the repository for the standard patch-level release of obikmer v1.1.20.
2026-06-23 10:03:15 +02:00
coissac 7c5bab3694 Merge pull request 'fix(ci): restrict workflow to PRs and improve release tagging' (#45) from push-louqrszyuqpz into main
Reviewed-on: #45
2026-06-23 07:52:35 +00:00
Eric Coissac fab4e0d6de fix(ci): restrict workflow to PRs and improve release tagging
Release / create-release (push) Failing after 26s
Release / build-linux-x86_64 (push) Has been skipped
Release / build-macos-arm64 (push) Has been skipped
CI / build (pull_request) Successful in 3m17s
Restrict the CI pipeline to pull request events only by removing the unconfigured push trigger and eliminating a duplicate pull_request block in the workflow file. Update the Makefile to suppress stderr from the aichat command and introduce a fallback release tag message for robust version tagging. Additionally, bump the obikmer crate version to 1.1.19.
2026-06-23 09:42:23 +02:00
coissac 973a3f3d6e Merge pull request 'feat: add numa feature flag and automate release workflow' (#44) from push-uymxyvsyooro into main
CI / build (push) Successful in 3m16s
Reviewed-on: #44
2026-06-23 07:22:33 +00:00
Eric Coissac 1a839a295a feat: add numa feature flag and automate release workflow
Release / create-release (push) Failing after 37s
Release / build-linux-x86_64 (push) Has been skipped
Release / build-macos-arm64 (push) Has been skipped
CI / build (pull_request) Successful in 3m23s
Refactor the Gitea release pipeline to generate releases via API and upload binaries using a shared ID. Automate changelog generation by fetching recent commits with `jj log` and producing markdown notes via `aichat`. Convert `hwlocality` to an optional dependency gated by a default `numa` feature, providing fallback implementations for graceful degradation when NUMA support is disabled. Bump obikmer to 1.1.18.
2026-06-23 09:07:04 +02:00
11 changed files with 712 additions and 232 deletions
Vendored
BIN
View File
Binary file not shown.
+1 -2
View File
@@ -1,9 +1,8 @@
name: CI
on:
push:
branches: ['main']
pull_request:
branches: ['main']
jobs:
build:
+15 -14
View File
@@ -11,6 +11,10 @@ jobs:
outputs:
release_id: ${{ steps.create.outputs.release_id }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Create Gitea release
id: create
env:
@@ -18,11 +22,12 @@ jobs:
TAG: ${{ github.ref_name }}
run: |
sudo apt-get update -qq && sudo apt-get install -y -qq jq
body=$(git for-each-ref --format='%(contents)' "refs/tags/$TAG")
release_id=$(curl -s -X POST \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases" \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"tag_name\":\"$TAG\",\"name\":\"$TAG\"}" | jq -r '.id')
-d "{\"tag_name\":\"$TAG\",\"name\":\"$TAG\",\"body\":$(echo "$body" | jq -Rs .)}" | jq -r '.id')
echo "release_id=$release_id" >> $GITHUB_OUTPUT
build-linux-x86_64:
@@ -81,20 +86,11 @@ jobs:
build-macos-arm64:
needs: create-release
runs-on: ubuntu-latest
defaults:
run:
working-directory: src
steps:
- uses: actions/checkout@v4
- name: Install Rust + zigbuild
run: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain stable
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
sudo apt-get update -qq && sudo apt-get install -y -qq jq
pip install ziglang --quiet --break-system-packages
$HOME/.cargo/bin/cargo install cargo-zigbuild
$HOME/.cargo/bin/rustup target add aarch64-apple-darwin
- name: Login to registry
run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ secrets.REGISTRYUSER }} --password-stdin
- name: Cache cargo registry
uses: actions/cache@v4
@@ -107,7 +103,12 @@ jobs:
restore-keys: macos-arm64-cargo-
- name: Build macOS binary
run: cargo zigbuild --release --target aarch64-apple-darwin --no-default-features
run: |
docker run --rm \
-v "${{ github.workspace }}:/src" \
-w /src/src \
registry.metabarcoding.org/cibuilder/rustcrossosx:latest \
cargo build --release --target aarch64-apple-darwin --no-default-features
- name: Prepare and upload artifact
env:
@@ -115,7 +116,7 @@ jobs:
RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
run: |
mkdir -p /tmp/dist
cp target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
cp src/target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
curl -s -X POST \
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
-H "Authorization: token $GITEA_TOKEN" \
+1
View File
@@ -8,6 +8,7 @@ data-stress
*.pb
./**/*.json
*.bin
*.log
Betula_exilis--IGA-24-33
benchmark/genomes
benchmark/simulated_data
+5 -1
View File
@@ -90,5 +90,9 @@ release: bump-version
@jj git push --change @
@new_version=$$(grep '^version = ' $(CARGO_TOML) | head -n 1 | sed 's/version = "\(.*\)"/\1/'); \
git_hash=$$(jj log -r @ --no-graph -T 'commit_id'); \
git tag "v$$new_version" "$$git_hash" && \
commits=$$(jj log -r 'latest(tags())..@' --no-graph -T 'description ++ "\n"' 2>/dev/null || \
jj log --no-graph -T 'description ++ "\n"' --limit 30); \
notes=$$(printf 'Write concise markdown release notes for obikmer (a Rust kmer genomics tool). Be technical and direct. Base them strictly on these commit messages:\n\n%s' "$$commits" | aichat 2>/dev/null); \
tag_msg="$${notes:-Release v$$new_version}"; \
git tag -a "v$$new_version" -m "$$tag_msg" "$$git_hash" && \
git push origin "v$$new_version"
+147 -3
View File
@@ -162,14 +162,158 @@ A single `PartitionRunner` instance can be built once per command invocation
and reused across multiple `run()` calls (e.g. `merge` runs
`merge_partitions` then `pack_matrices`).
## Known issue: CPU-only activation signal stalls on I/O-bound stages
Observed on a real `filter` run (109 genomes, 256 partitions, 8×24-core NUMA):
`rebuild` (CPU-bound — k-mer construction) scales cleanly from 9 to 43 active
workers as `CpuSample::do_i_activate` (`obisys::lib.rs`) sees efficiency climb.
`pack_matrices` (I/O-bound — reopens and recomposes per-genome column files
into `.pbmx`/`.pcmx`) activates one extra worker then flatlines at 10/192 for
the rest of the stage, even though 256 partitions keep completing over several
minutes. This matches the documented intent (§ Adaptive mechanism — "avoids
over-provisioning ... I/O-bound ... workloads") but conflates two different
things: *"CPU is not the bottleneck"* and *"more workers would not help"*. On
storage with real queue depth (NVMe, RAID, parallel FS) the second stage could
still benefit from more concurrent workers even with flat CPU usage — a signal
the current mechanism cannot see.
A one-off artefact was also found in the same log: right after a stage
transition, `do_i_activate` produced a physically impossible spike (efficiency
~94 cores on a 192-core box) because it has no minimum-window guard — unlike
its sibling `cpu_efficiency`, which returns `0.0` if `wall < 0.1s`
(`obisys::lib.rs:260`). `do_i_activate` unconditionally overwrites
`self.wall`/`self.user_secs`/`self.sys_secs` even when the elapsed window is
too short to be meaningful, so a burst of rapid completions right after
activating a worker can divide a real CPU delta by a near-zero wall delta.
### Implemented: I/O signal + shared debounce guard
`IoSample` (`obisys::lib.rs`, alongside `CpuSample`) is fed by
`read_bytes`/`write_bytes` from `/proc/self/io` on Linux (actual bytes
submitted to the block layer — not `rchar`/`wchar`, which also count
page-cache hits, and not `ru_inblock`/`ru_oublock`, unreliable on macOS), with
a `proc_pid_rusage(RUSAGE_INFO_V4)` fallback on macOS
(`ri_diskio_bytesread`/`ri_diskio_byteswritten`, FFI only via `libc`, no new
dependency — same pattern as the existing `getrusage` bindings). Any other
target degrades gracefully to a signal that never triggers (falls back to
CPU-only activation), same pattern as `cgroup_v2_available`.
`maybe_activate` (`numa.rs`) activates a worker if *either* signal still shows
headroom, making `PartitionRunner` adapt to whichever resource is actually the
bottleneck without per-call configuration. Both samplers are called
unconditionally — no `||` short-circuit — so neither window starves behind
whichever signal fires first:
```rust
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
if cpu_wants_more || io_wants_more {
activation.grow(GROWTH_DIVISOR, n_total);
}
```
The CPU threshold is *not* the flat absolute delta it started as: it scales
with `activation.last_step()` — the number of workers activated in the last
growth step, tracked by `NodeActivation` (`numa.rs`) and updated every time
`grow()` actually grows something. Growing by 8 workers should add ~8 cores of
efficiency if the workload is truly CPU-bound; requiring only
`CPU_SPAWN_THRESHOLD` (20 %) of that expected gain confirms the growth was
useful without demanding perfect linear scaling. Scaling by the *last step's
size* rather than the cumulative total keeps the bar equally meaningful
whether it's the 2nd growth step or the 20th — a flat absolute threshold
(0.2 core) is a strong signal at 8 active workers but pure noise at 150; a
threshold scaled by the *cumulative* total instead (considered and rejected)
would have made the bar essentially impossible to clear late in the ramp,
strangling exactly the CPU-bound saturation the mechanism exists to allow.
Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit),
raw I/O throughput has no natural scale across devices, so `IoSample` uses a
**relative** growth threshold instead of an absolute one:
```rust
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let elapsed = self.wall.elapsed().as_secs_f64();
if elapsed < 0.1 { return false; } // state untouched — window keeps accumulating
let n = Self::read_bytes();
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
let activate = if self.previous_rate == 0.0 {
rate > 0.0 // bootstrap: any measured throughput is signal
} else {
(rate - self.previous_rate) / self.previous_rate >= threshold
};
self.bytes = n;
self.wall = Instant::now(); // reset only on a real sample
activate
}
```
The `elapsed < 0.1s → return false without mutating state` guard was also
back-ported into `CpuSample::do_i_activate` (previously missing — source of
the ~94-core artefact above) — one fix for both problems, and it removes the
need for any arbitrary I/O-rate floor: a short/noisy window is rejected
outright rather than papered over with a hardware-dependent constant.
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, module-level
`const` in `numa.rs`, both `0.2`) are a starting point, not a derived value:
`0.2` (20 % relative growth) for `IoSample` was chosen to match the CPU
threshold's *implicit* relative sensitivity (in the observed log, an 8→9
worker step raised efficiency by ~12 %) — but I/O throughput is lumpier than
CPU time (buffered writes flush in bursts), so it needs empirical validation
against a real `pack` run before being considered final.
## Known issue: ramp-up too slow, and confused with node count
The original design started `n_nodes` workers (one per node) and grew one
worker at a time. On a real `filter` run this took ~10 minutes to climb from
9 to ~40 active workers even on the CPU-bound `rebuild` stage — most of a
35-minute stage spent under-provisioned while waiting for evidence to
accumulate one worker at a time. There is no scale-down mechanism (`n_active`
only grows), so the original caution was deliberate — but a quarter of
available cores is still far from saturation, and the real risk zone (over-provisioning
a memory-bandwidth-bound stage) only shows up much later in the ramp, near
full occupancy — not at 25 %.
The fix decouples ramp speed from node *count*: both the initial size and the
growth step are a fraction of `workers_per_node` (node *size*), applied
identically on every node. A single-NUMA-node (UMA) machine ramps exactly as
fast as an 8-node one — growing by `n_nodes` per step, as first considered,
would have degenerated to "grow by 1" on UMA, reproducing the original
problem for exactly the machines that need the fix most.
```rust
// NodeActivation::grow — called both at startup (activate_initial) and on
// every CPU/IO-triggered growth step, with a different divisor each time.
let wanted = (self.caps[idx] / divisor).max(1); // INITIAL_DIVISOR=4 at startup, GROWTH_DIVISOR=8 per step
let room = self.caps[idx].saturating_sub(self.active[idx]);
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
```
This also fixed a latent correctness gap: the original single shared
`activate_tx`/`activate_rx` pair had *no* per-node addressing — sending one
activation signal woke up whichever dormant worker (from any node) happened
to win the race on that channel. `crossbeam_channel` gives no fairness
guarantee across competing receivers, so "round-robin across nodes" was an
assumption the code never actually enforced. `PartitionRunner::run` now opens
one activation channel per node (`activate_txs`/`activate_rxs`, one pair per
`NodeConfig`); `NodeActivation` (`numa.rs`) tracks how many of each node's
dormant workers have been woken and grows every node by the same amount per
step, capped by that node's remaining dormant workers and by the run's total
budget (`n_total`) — balance across nodes is now guaranteed by construction,
not incidental to channel implementation details.
## Open questions
- **Error handling**: `run` currently returns the first error; remaining errors
are dropped. A `Vec<E>` return would give complete diagnostics.
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from
a higher value. A per-call override could be added to the API.
- **`INITIAL_DIVISOR` / `GROWTH_DIVISOR` tuning**: currently `4` and `8`
(start at 1/4 of a node's cores, grow by 1/8 per step), chosen to fix an
observed too-slow ramp — not yet validated against a real `pack` (I/O-bound)
run, where over-provisioning risk is different from the CPU-bound `rebuild`
case this was tuned against.
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
+1 -1
View File
@@ -1704,7 +1704,7 @@ dependencies = [
[[package]]
name = "obikmer"
version = "1.1.16"
version = "1.1.34"
dependencies = [
"clap",
"csv",
+17 -3
View File
@@ -500,17 +500,26 @@ where T: Clone + Default {
}
/// Compute a symmetric `n×n` matrix in parallel by evaluating `f(i,j)` for
/// all upper-triangle pairs. `T: Copy` avoids the `.clone()` needed for the
/// lower-triangle mirror.
/// all upper-triangle pairs, plus `f(i,i)` for the diagonal. `T: Copy` avoids
/// the `.clone()` needed for the lower-triangle mirror.
///
/// The diagonal is *not* generally `T::default()`: for a self-comparison,
/// `f(i,i)` is often the column's own weight (e.g. intersection-with-self —
/// see `pairwise2_matrix`), not zero. Distance finalisations that need a
/// zero diagonal (self-distance) already overwrite it explicitly.
pub(crate) fn pairwise_matrix<T>(n: usize, f: impl Fn(usize, usize) -> T + Sync) -> Array2<T>
where T: Copy + Default + Send {
let results: Vec<(usize, usize, T)> = upper_pairs(n)
.into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect();
fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v)))
let mut m = fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v)));
for i in 0..n { m[[i, i]] = f(i, i); }
m
}
/// Same as `pairwise_matrix` but `f` returns two values that fill two
/// symmetric matrices simultaneously (e.g. intersection + union for Jaccard).
/// The diagonal is `f(i,i)` (e.g. a genome's kmer count intersected with
/// itself), not `T::default()` — see `pairwise_matrix` for why that matters.
pub(crate) fn pairwise2_matrix<T>(n: usize, f: impl Fn(usize, usize) -> (T, T) + Sync) -> (Array2<T>, Array2<T>)
where T: Copy + Default + Send {
let results: Vec<(usize, usize, T, T)> = upper_pairs(n)
@@ -523,5 +532,10 @@ where T: Copy + Default + Send {
m0[[i, j]] = a; m0[[j, i]] = a;
m1[[i, j]] = b; m1[[j, i]] = b;
}
for i in 0..n {
let (a, b) = f(i, i);
m0[[i, i]] = a;
m1[[i, i]] = b;
}
(m0, m1)
}
+190 -76
View File
@@ -20,7 +20,7 @@ use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet;
#[cfg(feature = "numa")]
use hwlocality::object::types::ObjectType;
use obisys::CpuSample;
use obisys::{CpuSample, IoSample};
use tracing::debug;
// ── Public interface ──────────────────────────────────────────────────────────
@@ -70,7 +70,10 @@ pub fn build() -> NumaSetup {
nodes.len(),
nodes.first().map_or(0, |v| v.len()),
);
return NumaSetup { pools, cpus_per_node: nodes };
return NumaSetup {
pools,
cpus_per_node: nodes,
};
}
}
}
@@ -102,7 +105,9 @@ pub fn build() -> NumaSetup {
/// Silently returns on any error so the thread still runs, just unbound.
#[cfg(feature = "numa")]
pub fn pin_current_thread(cpu_indices: &[usize]) {
let Ok(topology) = Topology::new() else { return };
let Ok(topology) = Topology::new() else {
return;
};
let mut cpuset = CpuSet::new();
for &idx in cpu_indices {
cpuset.set(idx);
@@ -132,7 +137,22 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
.ok()
}
// ── PartitionRunner ───────────────────────────────────────────────────────────
// ── PartitionRunner ─────────────────────────────────────────────────────────
/// Growth step (fraction of a node's worker capacity added per activation
/// event, see [`NodeActivation::grow`]).
const GROWTH_DIVISOR: usize = 8;
/// Minimum CPU efficiency growth to activate more workers, as a fraction of
/// the size of the *last growth step* (e.g. `0.2` after adding 8 workers
/// requires the next check to show at least +1.6 cores of growth — 20 % of
/// the ~8 cores those 8 workers should contribute if the workload is truly
/// CPU-bound). Scaling by the last step's size — not the cumulative total —
/// keeps the bar meaningful regardless of how many workers are already
/// active, instead of demanding an ever-larger absolute jump as the pool
/// grows.
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
/// Minimum I/O throughput growth (relative) to activate more workers.
const IO_SPAWN_THRESHOLD: f64 = 0.2;
struct NodeConfig {
pool: Option<Arc<rayon::ThreadPool>>,
@@ -142,19 +162,23 @@ struct NodeConfig {
/// Generic NUMA-aware runner for partition-level parallel work.
///
/// Workers are distributed round-robin across NUMA nodes and pinned to their
/// Workers are distributed evenly across NUMA nodes and pinned to their
/// node's CPUs. UMA is the degenerate case: one node, no pinning.
///
/// Workers are pre-spawned dormant and activated one by one as CPU efficiency
/// falls below `SPAWN_THRESHOLD`. This avoids over-provisioning on I/O-bound
/// or memory-bandwidth-bound workloads while saturating CPU-bound ones.
/// Workers are pre-spawned dormant, one activation channel per node so
/// growth always targets a specific node rather than whichever dormant
/// worker happens to wake up first on a shared channel. Growth (both the
/// initial count and each subsequent step) is expressed as a fraction of
/// `workers_per_node`, applied identically to every node, so the pace of
/// ramp-up depends on node size rather than node count — a single-NUMA-node
/// (UMA) machine ramps just as fast as an 8-node one.
///
/// # Termination
///
/// ```text
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
/// drop(result_tx) → result_rx closes → controller loop exits
/// drop(activate_tx) → dormant workers exit cleanly
/// drop(activate_txs) → dormant workers exit cleanly
/// ```
pub struct PartitionRunner {
nodes: Vec<NodeConfig>,
@@ -175,7 +199,8 @@ impl PartitionRunner {
ns.pools.len(),
wpn,
);
let nodes = ns.pools
let nodes = ns
.pools
.into_iter()
.zip(ns.cpus_per_node)
.map(|(pool, cpu_ids)| NodeConfig {
@@ -189,23 +214,24 @@ impl PartitionRunner {
/// Run `f(i)` for every index in `order`.
///
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
/// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed
/// partition resets that timer (forcing an immediate check) and also
/// triggers its own inline check. A new worker is activated whenever
/// efficiency falls below `SPAWN_THRESHOLD`.
/// Workers are pre-spawned dormant and activated adaptively, per node:
/// `(workers_per_node / INITIAL_DIVISOR).max(1)` are woken immediately on
/// every node, then `(workers_per_node / GROWTH_DIVISOR).max(1)` more per
/// node each time the check below fires. A timer thread fires that check
/// every `TIMER_SECS` seconds; each completed partition resets that timer
/// (forcing an immediate check) and also triggers its own inline check. A
/// growth step happens whenever CPU efficiency grows by at least
/// `CPU_SPAWN_THRESHOLD` of what the last growth step should have
/// contributed, or I/O throughput grows by at least `IO_SPAWN_THRESHOLD`
/// (relative) since the last check — whichever resource is the actual
/// bottleneck still shows headroom.
///
/// `on_done(i, result, elapsed)` is called from the controller thread as
/// each partition completes — suitable for progress bars and result
/// aggregation.
///
/// Returns the first error produced by `f`, if any.
pub fn run<F, R, E, C>(
&self,
order: &[usize],
f: F,
mut on_done: C,
) -> Result<(), E>
pub fn run<F, R, E, C>(&self, order: &[usize], f: F, mut on_done: C) -> Result<(), E>
where
F: Fn(usize) -> Result<R, E> + Send + Sync,
R: Send,
@@ -217,26 +243,28 @@ impl PartitionRunner {
return Ok(());
}
const SPAWN_THRESHOLD: f64 = 0.95;
const TIMER_SECS: u64 = 30;
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
const INITIAL_DIVISOR: usize = 4;
// ── Channels ──────────────────────────────────────────────────────────
let (part_tx, part_rx) = unbounded::<usize>();
let (activate_tx, activate_rx) = unbounded::<()>();
// reset_tx: controller → timer ("reset the 30 s window")
let (reset_tx, reset_rx) = unbounded::<()>();
// event_tx: workers + timer → controller (unified event stream)
let (event_tx, event_rx) = unbounded::<WorkerEvent<R, E>>();
// One activation channel per node: growth always targets a specific
// node, rather than whichever dormant worker happens to win the race
// on a channel shared across all nodes.
let (activate_txs, activate_rxs): (Vec<_>, Vec<_>) =
(0..self.nodes.len()).map(|_| unbounded::<()>()).unzip();
for &i in order { part_tx.send(i).ok(); }
for &i in order {
part_tx.send(i).ok();
}
drop(part_tx);
let max_workers = self.max_workers();
let n_nodes = self.nodes.len();
let node_caps: Vec<usize> = self.nodes.iter().map(|n| n.max_workers).collect();
let f = &f;
let mut first_err: Option<E> = None;
@@ -260,18 +288,23 @@ impl PartitionRunner {
}
});
// ── Pre-spawn workers dormant, round-robin across NUMA nodes ──────
for w in 0..max_workers {
let node = &self.nodes[w % n_nodes];
// ── Pre-spawn workers dormant, grouped by node ────────────────────
// Each worker listens on its own node's activation channel only.
for (node, arx) in self.nodes.iter().zip(activate_rxs.iter()) {
let cpu_ids = &node.cpu_ids;
for _ in 0..node.max_workers {
let prx = part_rx.clone();
let etx = event_tx.clone();
let arx = activate_rx.clone();
let arx = arx.clone();
let pool = node.pool.clone();
let cpu_ids = &node.cpu_ids;
s.spawn(move || {
if arx.recv().is_err() { return; }
if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
if arx.recv().is_err() {
return;
}
if !cpu_ids.is_empty() {
pin_current_thread(cpu_ids);
}
for i in &prx {
let t = Instant::now();
let r = match &pool {
@@ -282,15 +315,17 @@ impl PartitionRunner {
}
});
}
}
// Drop controller's event_tx: event_rx closes when all workers +
// timer have exited.
drop(event_tx);
// ── Controller ────────────────────────────────────────────────────
activate_tx.send(()).ok();
let mut n_active = 1usize;
let mut activation = NodeActivation::new(&activate_txs, &node_caps, max_workers);
activation.activate_initial(INITIAL_DIVISOR, n_total);
let mut cpu_sample = CpuSample::now();
let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate
let mut io_sample = IoSample::now();
let mut completed = 0usize;
while completed < n_total {
@@ -299,30 +334,39 @@ impl PartitionRunner {
WorkerEvent::Completed(i, r, dur) => {
match r {
Ok(v) => on_done(i, v, dur),
Err(e) => { if first_err.is_none() { first_err = Some(e); } }
Err(e) => {
if first_err.is_none() {
first_err = Some(e);
}
}
}
completed += 1;
// Reset the 30 s timer.
reset_tx.send(()).ok();
// Inline check: same logic as a timer tick.
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, &mut eff_at_last_spawn,
n_cores, SPAWN_THRESHOLD, completed, n_total,
&mut activation,
&mut cpu_sample,
&mut io_sample,
completed,
n_total,
);
}
WorkerEvent::TimerTick => {
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, &mut eff_at_last_spawn,
n_cores, SPAWN_THRESHOLD, completed, n_total,
&mut activation,
&mut cpu_sample,
&mut io_sample,
completed,
n_total,
);
}
}
}
// Dormant workers exit when activate_tx closes.
drop(activate_tx);
// Dormant workers exit once every sender for their node's channel
// is dropped — `activate_txs` holds the only ones.
drop(activate_txs);
// Timer thread exits when reset_tx closes.
drop(reset_tx);
});
@@ -341,43 +385,113 @@ enum WorkerEvent<R, E> {
TimerTick,
}
/// Tracks how many of each node's dormant workers have been woken, and
/// grows every node by the same amount at each step (capped by that node's
/// remaining dormant workers and by the run's total budget) so load stays
/// balanced across nodes at every point in time — never just "one more
/// worker somewhere". Also remembers the size of the last real growth step
/// (`last_step`), used to scale the CPU activation threshold to what that
/// step could plausibly have contributed (see `maybe_activate`).
struct NodeActivation<'a> {
txs: &'a [crossbeam_channel::Sender<()>],
caps: &'a [usize],
active: Vec<usize>,
total: usize,
max: usize,
last_step: usize,
}
impl<'a> NodeActivation<'a> {
fn new(txs: &'a [crossbeam_channel::Sender<()>], caps: &'a [usize], max: usize) -> Self {
Self {
txs,
caps,
active: vec![0; txs.len()],
total: 0,
max,
last_step: 0,
}
}
fn total(&self) -> usize {
self.total
}
fn last_step(&self) -> usize {
self.last_step
}
fn max(&self) -> usize {
self.max
}
fn is_full(&self) -> bool {
self.total >= self.max
}
/// Wake up to `(node_cap / divisor).max(1)` dormant workers on every
/// node, capped by `n_total`. Called once at startup, unconditionally.
fn activate_initial(&mut self, divisor: usize, n_total: usize) {
self.grow(divisor, n_total);
}
/// Same per-node sizing as [`activate_initial`](Self::activate_initial),
/// applied as a growth step. Returns the number of workers actually
/// activated (may be less than requested once a node or the total
/// budget is exhausted). Updates `last_step` when it actually grew.
fn grow(&mut self, divisor: usize, n_total: usize) -> usize {
let before = self.total;
for idx in 0..self.txs.len() {
let wanted = (self.caps[idx] / divisor).max(1);
let room = self.caps[idx].saturating_sub(self.active[idx]);
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
for _ in 0..grow {
self.txs[idx].send(()).ok();
}
self.active[idx] += grow;
self.total += grow;
}
let grew = self.total - before;
if grew > 0 {
self.last_step = grew;
}
grew
}
}
fn maybe_activate(
activate_tx: &crossbeam_channel::Sender<()>,
n_active: &mut usize,
max_workers: usize,
activation: &mut NodeActivation,
cpu_sample: &mut CpuSample,
eff_at_last_spawn: &mut f64,
n_cores: usize,
threshold: f64,
io_sample: &mut IoSample,
completed: usize,
n_total: usize,
) {
if *n_active >= max_workers || completed >= n_total { return; }
if activation.is_full() || completed >= n_total {
return;
}
let eff = cpu_sample.cpu_efficiency(n_cores);
if eff >= threshold { return; } // CPU already saturated
// Expect roughly 1 core of extra efficiency per worker activated in the
// last growth step (CPU-bound case); require at least CPU_SPAWN_THRESHOLD
// (20 %) of that expected gain before growing again. Scaling by the last
// step's size — not the cumulative total — keeps the bar meaningful
// regardless of how many workers are already active: growing by 8 should
// always take ~+1.6 cores to confirm, whether that's the 2nd growth step
// or the 20th.
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
// Check that the previous activation was beneficial enough.
// Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1).
// For the very first extra worker (n_active == 1, no previous spawn), skip this
// check: eff_at_last_spawn == 0 acts as the sentinel.
let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 {
true // first additional worker: no prior data to evaluate
} else {
let k_before = (*n_active - 1) as f64;
let min_speedup = (k_before + 0.2) / k_before;
let actual_speedup = eff / *eff_at_last_spawn;
actual_speedup >= min_speedup
};
// Call both unconditionally (no `||` short-circuit): each sampler must
// advance its own window every tick, regardless of what the other one
// reports, or it would starve behind whichever signal fires first.
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD * activation.last_step() as f64);
if !(cpu_wants_more || io_wants_more) {
return;
}
if last_spawn_was_beneficial {
activate_tx.send(()).ok();
*eff_at_last_spawn = eff;
*n_active += 1;
*cpu_sample = CpuSample::now();
let grew = activation.grow(GROWTH_DIVISOR, n_total);
if grew > 0 {
debug!(
"activated worker {}/{} — efficiency {:.0}%",
n_active, max_workers, eff * 100.0,
"activated {} worker(s) — {}/{} active",
grew,
activation.total(),
activation.max()
);
}
}
+4 -2
View File
@@ -1,6 +1,6 @@
[package]
name = "obikmer"
version = "1.1.16"
version = "1.1.34"
edition = "2024"
[[bin]]
@@ -18,7 +18,7 @@ obikrope = { path = "../obikrope" }
obikpartitionner = { path = "../obikpartitionner" }
obisys = { path = "../obisys" }
obiskio = { path = "../obiskio" }
obikindex = { path = "../obikindex" }
obikindex = { path = "../obikindex", default-features = false }
obitaxonomy = { path = "../obitaxonomy" }
obilayeredmap = { path = "../obilayeredmap" }
clap = { version = "4", features = ["derive"] }
@@ -33,4 +33,6 @@ tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
pprof = { version = "0.13", features = ["prost-codec"], optional = true }
[features]
default = ["numa"]
numa = ["obikindex/numa"]
profiling = ["dep:pprof"]
+248 -47
View File
@@ -4,7 +4,7 @@ use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};
use indicatif::{ProgressBar, ProgressStyle};
use tracing::{info, warn};
use tracing::{debug, info, warn};
const BRAILLE: &[&str] = &["", "", "", "", "", "", "", "", "", ""];
@@ -31,7 +31,8 @@ impl TracedBar {
let pct10 = (pos * 10) / self.total; // 0..=10
let last = self.last_pct.load(Ordering::Relaxed);
if pct10 > last
&& self.last_pct
&& self
.last_pct
.compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
@@ -49,14 +50,14 @@ impl TracedBar {
let msg = msg.into();
if self.pb.is_hidden() {
if self.total > 0 {
// bounded bar: always log (already rate-limited by 10% threshold in inc)
info!(stage = %self.label, "{msg}");
debug!(stage = %self.label, "{msg}");
} else {
// spinner: throttle to ~10 s
let now_ms = self.start.elapsed().as_millis() as u64;
let last = self.last_log_ms.load(Ordering::Relaxed);
if now_ms >= last + 10_000
&& self.last_log_ms
&& self
.last_log_ms
.compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
@@ -83,8 +84,13 @@ pub fn spinner(label: &str) -> TracedBar {
);
pb.enable_steady_tick(Duration::from_millis(100));
TracedBar {
pb, label: label.to_string(), unit: String::new(), total: 0,
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
pb,
label: label.to_string(),
unit: String::new(),
total: 0,
start: Instant::now(),
last_pct: AtomicU64::new(0),
last_log_ms: AtomicU64::new(0),
}
}
@@ -101,8 +107,13 @@ pub fn progress_bar(label: &str, n: u64, unit: &str) -> TracedBar {
);
pb.enable_steady_tick(Duration::from_millis(100));
TracedBar {
pb, label: label.to_string(), unit: unit.to_string(), total: n,
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
pb,
label: label.to_string(),
unit: unit.to_string(),
total: n,
start: Instant::now(),
last_pct: AtomicU64::new(0),
last_log_ms: AtomicU64::new(0),
}
}
@@ -204,13 +215,19 @@ fn tv_to_secs(tv: timeval) -> f64 {
}
#[cfg(target_os = "macos")]
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 }
fn rss_to_bytes(ru: &rusage) -> u64 {
ru.ru_maxrss as u64
}
#[cfg(not(target_os = "macos"))]
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
fn rss_to_bytes(ru: &rusage) -> u64 {
ru.ru_maxrss as u64 * 1024
}
// Monotonically increasing counters — negative delta would be a kernel bug.
fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
fn delta(end: i64, start: i64) -> u64 {
(end - start).max(0) as u64
}
// ── CpuSample ─────────────────────────────────────────────────────────────────
@@ -221,6 +238,7 @@ pub struct CpuSample {
wall: Instant,
user_secs: f64,
sys_secs: f64,
previous: f64,
}
impl CpuSample {
@@ -230,6 +248,7 @@ impl CpuSample {
wall: Instant::now(),
user_secs: tv_to_secs(ru.ru_utime),
sys_secs: tv_to_secs(ru.ru_stime),
previous: 0.0,
}
}
@@ -238,11 +257,129 @@ impl CpuSample {
pub fn cpu_efficiency(&self, n_cores: usize) -> f64 {
let ru = get_rusage();
let wall = self.wall.elapsed().as_secs_f64();
if wall < 0.1 { return 0.0; }
let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs)
+ (tv_to_secs(ru.ru_stime) - self.sys_secs);
if wall < 0.1 {
return 0.0;
}
let cpu =
(tv_to_secs(ru.ru_utime) - self.user_secs) + (tv_to_secs(ru.ru_stime) - self.sys_secs);
cpu / (wall * n_cores as f64)
}
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let delta_wall = self.wall.elapsed().as_secs_f64();
if delta_wall < 0.1 {
// Window too short to be meaningful — leave state untouched so it
// keeps accumulating until a real sample can be taken.
return false;
}
let n = CpuSample::now();
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
let efficiency = delta_ru / delta_wall;
let activate = 0f64.max(efficiency - self.previous) >= threshold;
debug!(
"Do I activate : {} -> {} = {} Activate: {}",
self.previous,
efficiency,
0f64.max(efficiency - self.previous),
activate
);
self.previous = efficiency;
self.user_secs = n.user_secs;
self.sys_secs = n.sys_secs;
self.wall = n.wall;
activate
}
}
// ── IoSample ──────────────────────────────────────────────────────────────────
/// Snapshot of process-wide block I/O (bytes read + written) + wall clock.
///
/// Same activation protocol as [`CpuSample`], but the growth check in
/// [`do_i_activate`](Self::do_i_activate) is *relative* rather than absolute:
/// raw I/O throughput has no portable scale across storage devices, unlike a
/// core count.
pub struct IoSample {
wall: Instant,
bytes: u64,
previous_rate: f64,
}
impl IoSample {
pub fn now() -> Self {
Self {
wall: Instant::now(),
bytes: Self::read_bytes(),
previous_rate: 0.0,
}
}
/// Bytes actually submitted to the block layer (read + write), summed
/// process-wide. Returns 0 if unavailable — degrades gracefully to a
/// signal that never triggers activation (CPU-only heuristic).
#[cfg(target_os = "linux")]
fn read_bytes() -> u64 {
let Ok(io) = std::fs::read_to_string("/proc/self/io") else {
return 0;
};
io.lines()
.filter_map(|l| {
l.strip_prefix("read_bytes: ")
.or_else(|| l.strip_prefix("write_bytes: "))
})
.filter_map(|v| v.trim().parse::<u64>().ok())
.sum()
}
#[cfg(target_os = "macos")]
fn read_bytes() -> u64 {
use libc::{RUSAGE_INFO_V4, getpid, proc_pid_rusage, rusage_info_v4};
let mut info: rusage_info_v4 = unsafe { std::mem::zeroed() };
let ret =
unsafe { proc_pid_rusage(getpid(), RUSAGE_INFO_V4, &mut info as *mut _ as *mut _) };
if ret != 0 {
return 0;
}
info.ri_diskio_bytesread + info.ri_diskio_byteswritten
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn read_bytes() -> u64 {
0
}
/// Same protocol as [`CpuSample::do_i_activate`] (0.1 s minimum window,
/// state untouched on early return), but growth is measured relative to
/// the previous rate. `threshold` is a fraction, e.g. `0.2` for a 20 %
/// increase in throughput since the last real sample.
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
let elapsed = self.wall.elapsed().as_secs_f64();
if elapsed < 0.1 {
return false;
}
let n = Self::read_bytes();
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
let activate = if self.previous_rate == 0.0 {
rate > 0.0 // bootstrap: any measured throughput is signal enough
} else {
(rate - self.previous_rate) / self.previous_rate >= threshold
};
debug!(
"Do I activate (I/O) : {} -> {} Activate: {}",
self.previous_rate, rate, activate
);
self.previous_rate = rate;
self.bytes = n;
self.wall = Instant::now();
activate
}
}
// ── public API ────────────────────────────────────────────────────────────────
@@ -259,7 +396,11 @@ impl Stage {
pub fn start(label: impl Into<String>) -> Self {
let label = label.into();
info!(stage = %label, "started");
Self { label, wall: Instant::now(), ru: get_rusage() }
Self {
label,
wall: Instant::now(),
ru: get_rusage(),
}
}
pub fn stop(self) -> StageStats {
@@ -318,8 +459,11 @@ pub struct StageStats {
impl StageStats {
/// (user + sys) / wall — effective thread count utilisation.
pub fn parallelism(&self) -> f64 {
if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs }
else { 0.0 }
if self.wall_secs > 1e-9 {
(self.user_secs + self.sys_secs) / self.wall_secs
} else {
0.0
}
}
/// parallelism / n_cores — fraction of available CPU power used (0..1+).
@@ -335,11 +479,19 @@ pub struct Reporter {
}
impl Reporter {
pub fn new() -> Self { Self::default() }
pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); }
pub fn stages(&self) -> &[StageStats] { &self.stages }
pub fn new() -> Self {
Self::default()
}
pub fn push(&mut self, stats: StageStats) {
self.stages.push(stats);
}
pub fn stages(&self) -> &[StageStats] {
&self.stages
}
/// Print the summary to stderr.
pub fn print(&self) { eprint!("{self}"); }
pub fn print(&self) {
eprint!("{self}");
}
}
// ── diagnosis ─────────────────────────────────────────────────────────────────
@@ -387,26 +539,43 @@ fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
)),
};
}
Diagnosis { tag: "", detail: None }
Diagnosis {
tag: "",
detail: None,
}
}
// ── display helpers ───────────────────────────────────────────────────────────
fn fmt_secs(s: f64) -> String {
if s >= 100.0 { format!("{:.0}s", s) }
else if s >= 10.0 { format!("{:.1}s", s) }
else if s >= 1.0 { format!("{:.2}s", s) }
else { format!("{:.0}ms", s * 1000.0) }
if s >= 100.0 {
format!("{:.0}s", s)
} else if s >= 10.0 {
format!("{:.1}s", s)
} else if s >= 1.0 {
format!("{:.2}s", s)
} else {
format!("{:.0}ms", s * 1000.0)
}
}
fn fmt_bytes(b: u64) -> String {
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) }
else { format!("{:.0} KB", b as f64 / 1024.0) }
if b >= 1 << 30 {
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
} else if b >= 1 << 20 {
format!("{:.0} MB", b as f64 / (1u64 << 20) as f64)
} else {
format!("{:.0} KB", b as f64 / 1024.0)
}
}
fn fmt_efficiency(par: f64, n_cores: usize) -> String {
format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0)
format!(
"{:.1}×/{} ({:.0}%)",
par,
n_cores,
par / n_cores as f64 * 100.0
)
}
// ── Display ───────────────────────────────────────────────────────────────────
@@ -434,7 +603,11 @@ impl MemoryBudget {
pub fn new(total: u64) -> Self {
Self {
total,
inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }),
inner: Mutex::new(BudgetInner {
remaining: total,
active: 0,
peak_active: 0,
}),
condvar: Condvar::new(),
}
}
@@ -459,24 +632,40 @@ impl MemoryBudget {
self.condvar.notify_all();
}
pub fn total(&self) -> u64 { self.total }
pub fn active(&self) -> usize { self.inner.lock().unwrap().active }
pub fn remaining(&self) -> u64 { self.inner.lock().unwrap().remaining }
pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active }
pub fn total(&self) -> u64 {
self.total
}
pub fn active(&self) -> usize {
self.inner.lock().unwrap().active
}
pub fn remaining(&self) -> u64 {
self.inner.lock().unwrap().remaining
}
pub fn peak_active(&self) -> usize {
self.inner.lock().unwrap().peak_active
}
}
// ── Display ───────────────────────────────────────────────────────────────────
impl fmt::Display for Reporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.stages.is_empty() { return Ok(()); }
if self.stages.is_empty() {
return Ok(());
}
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
// column widths
let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5);
let nw = self
.stages
.iter()
.map(|s| s.label.len())
.max()
.unwrap_or(5)
.max(5);
// efficiency col: worst-case width for this run's n_cores value
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
@@ -484,18 +673,21 @@ impl fmt::Display for Reporter {
let sep = "".repeat(sep_w);
// header
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} status",
"stage", "wall", "efficiency", "peak RSS")?;
writeln!(
f,
"{:<nw$} {:>7} {:>ew$} {:>8} status",
"stage", "wall", "efficiency", "peak RSS"
)?;
writeln!(f, "{sep}")?;
// compute all diagnoses up front (needed for both table and footnotes)
let diagnoses: Vec<Diagnosis> = self.stages.iter()
.map(|s| diagnose(s, n_cores))
.collect();
let diagnoses: Vec<Diagnosis> = self.stages.iter().map(|s| diagnose(s, n_cores)).collect();
// per-stage rows
for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} {}",
writeln!(
f,
"{:<nw$} {:>7} {:>ew$} {:>8} {}",
s.label,
fmt_secs(s.wall_secs),
fmt_efficiency(s.parallelism(), n_cores),
@@ -508,11 +700,18 @@ impl fmt::Display for Reporter {
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0);
let trss = self
.stages
.iter()
.map(|s| s.max_rss_bytes)
.max()
.unwrap_or(0);
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
writeln!(f, "{sep}")?;
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8}",
writeln!(
f,
"{:<nw$} {:>7} {:>ew$} {:>8}",
"TOTAL",
fmt_secs(tw),
fmt_efficiency(tpar, n_cores),
@@ -520,7 +719,9 @@ impl fmt::Display for Reporter {
)?;
// bottleneck footnotes (only if at least one anomaly detected)
let bottlenecks: Vec<(&str, &str)> = self.stages.iter()
let bottlenecks: Vec<(&str, &str)> = self
.stages
.iter()
.zip(diagnoses.iter())
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
.collect();