Compare commits
28 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c612132763 | |||
| 19660f8cd0 | |||
| 7b07540a69 | |||
| 89c43e28f5 | |||
| b9b2e42ad2 | |||
| ca42fdff2f | |||
| 136cd89efb | |||
| a4bbf607b7 | |||
| 9927100a1c | |||
| 527258f822 | |||
| ef62f1947e | |||
| d02316dcf6 | |||
| c323b3eaef | |||
| b77d8e9ca0 | |||
| 7c5bab3694 | |||
| fab4e0d6de | |||
| 973a3f3d6e | |||
| 1a839a295a | |||
| 2ea58703c7 | |||
| ac3ef106e7 | |||
| 469e53b6f5 | |||
| 9f1df96ea7 | |||
| 4e4cce2879 | |||
| 68b05b93c4 | |||
| 0a668cf8a6 | |||
| e6d6942e2f | |||
| bf9c9aeacb | |||
| 22a65857a1 |
@@ -1,9 +1,8 @@
|
||||
name: CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: ['**']
|
||||
pull_request:
|
||||
branches: ['main']
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
@@ -6,7 +6,32 @@ on:
|
||||
- "v*"
|
||||
|
||||
jobs:
|
||||
build-linux-static:
|
||||
create-release:
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
release_id: ${{ steps.create.outputs.release_id }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Create Gitea release
|
||||
id: create
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEATOKEN }}
|
||||
TAG: ${{ github.ref_name }}
|
||||
run: |
|
||||
sudo apt-get update -qq && sudo apt-get install -y -qq jq
|
||||
body=$(git for-each-ref --format='%(contents)' "refs/tags/$TAG")
|
||||
release_id=$(curl -s -X POST \
|
||||
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases" \
|
||||
-H "Authorization: token $GITEA_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"tag_name\":\"$TAG\",\"name\":\"$TAG\",\"body\":$(echo "$body" | jq -Rs .)}" | jq -r '.id')
|
||||
echo "release_id=$release_id" >> $GITHUB_OUTPUT
|
||||
|
||||
build-linux-x86_64:
|
||||
needs: create-release
|
||||
runs-on: ubuntu-latest
|
||||
defaults:
|
||||
run:
|
||||
@@ -41,25 +66,58 @@ jobs:
|
||||
restore-keys: linux-musl-cargo-
|
||||
|
||||
- name: Build static binary
|
||||
env:
|
||||
PKG_CONFIG_ALLOW_CROSS: "1"
|
||||
run: cargo zigbuild --release --target x86_64-unknown-linux-musl
|
||||
|
||||
- name: Prepare artifact
|
||||
- name: Prepare and upload artifact
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEATOKEN }}
|
||||
RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
|
||||
run: |
|
||||
mkdir -p /tmp/dist
|
||||
cp target/x86_64-unknown-linux-musl/release/obikmer /tmp/dist/obikmer-linux-x86_64
|
||||
strip /tmp/dist/obikmer-linux-x86_64
|
||||
|
||||
- name: Create Gitea release and upload binary
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEATOKEN }}
|
||||
TAG: ${{ github.ref_name }}
|
||||
run: |
|
||||
release_id=$(curl -s -X POST \
|
||||
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases" \
|
||||
-H "Authorization: token $GITEA_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"tag_name\":\"$TAG\",\"name\":\"$TAG\"}" | jq -r '.id')
|
||||
curl -s -X POST \
|
||||
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$release_id/assets" \
|
||||
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
|
||||
-H "Authorization: token $GITEA_TOKEN" \
|
||||
-F "attachment=@/tmp/dist/obikmer-linux-x86_64"
|
||||
|
||||
build-macos-arm64:
|
||||
needs: create-release
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Login to registry
|
||||
run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ github.actor }} --password-stdin
|
||||
|
||||
- name: Cache cargo registry
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry
|
||||
~/.cargo/git
|
||||
src/target
|
||||
key: macos-arm64-cargo-${{ hashFiles('src/Cargo.lock') }}
|
||||
restore-keys: macos-arm64-cargo-
|
||||
|
||||
- name: Build macOS binary
|
||||
run: |
|
||||
docker run --rm \
|
||||
-v "${{ github.workspace }}:/src" \
|
||||
-w /src/src \
|
||||
registry.metabarcoding.org/cibuilder/rustcrossosx:latest \
|
||||
cargo build --release --target aarch64-apple-darwin --no-default-features
|
||||
|
||||
- name: Prepare and upload artifact
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEATOKEN }}
|
||||
RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
|
||||
run: |
|
||||
mkdir -p /tmp/dist
|
||||
cp src/target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
|
||||
curl -s -X POST \
|
||||
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
|
||||
-H "Authorization: token $GITEA_TOKEN" \
|
||||
-F "attachment=@/tmp/dist/obikmer-macos-arm64"
|
||||
|
||||
@@ -8,6 +8,7 @@ data-stress
|
||||
*.pb
|
||||
./**/*.json
|
||||
*.bin
|
||||
*.log
|
||||
Betula_exilis--IGA-24-33
|
||||
benchmark/genomes
|
||||
benchmark/simulated_data
|
||||
@@ -17,5 +18,7 @@ benchmark/global_index_presence
|
||||
benchmark/global_index_count
|
||||
benchmark/stats
|
||||
benchmark/reference_index
|
||||
benchmark/reference_dist
|
||||
benchmark/obikmer_dist
|
||||
benchmark/specific_index_count
|
||||
benchmark/specific_index_presence
|
||||
|
||||
@@ -86,9 +86,13 @@ bump-version:
|
||||
|
||||
.PHONY: release
|
||||
release: bump-version
|
||||
@new_version=$$(grep '^version = ' $(CARGO_TOML) | head -n 1 | sed 's/version = "\(.*\)"/\1/'); \
|
||||
git tag "v$$new_version"
|
||||
@jj auto-describe
|
||||
@jj git push --change @
|
||||
@new_version=$$(grep '^version = ' $(CARGO_TOML) | head -n 1 | sed 's/version = "\(.*\)"/\1/'); \
|
||||
git_hash=$$(jj log -r @ --no-graph -T 'commit_id'); \
|
||||
commits=$$(jj log -r 'latest(tags())..@' --no-graph -T 'description ++ "\n"' 2>/dev/null || \
|
||||
jj log --no-graph -T 'description ++ "\n"' --limit 30); \
|
||||
notes=$$(printf 'Write concise markdown release notes for obikmer (a Rust kmer genomics tool). Be technical and direct. Base them strictly on these commit messages:\n\n%s' "$$commits" | aichat 2>/dev/null); \
|
||||
tag_msg="$${notes:-Release v$$new_version}"; \
|
||||
git tag -a "v$$new_version" -m "$$tag_msg" "$$git_hash" && \
|
||||
git push origin "v$$new_version"
|
||||
|
||||
+88
-2
@@ -10,6 +10,23 @@ GENOMES := $(wildcard genomes/*.fna.gz)
|
||||
-include deps.mk
|
||||
|
||||
REF_NPZS := $(SPECIMENS:%=reference_index/%.npz)
|
||||
REF_DIST_CSVS := $(addprefix reference_dist/, \
|
||||
shared_kmers.csv hamming_dist.csv jaccard_dist.csv \
|
||||
bray_curtis_dist.csv relfreq_bray_curtis_dist.csv \
|
||||
euclidean_dist.csv relfreq_euclidean_dist.csv \
|
||||
hellinger_dist.csv hellinger_euclidean_dist.csv)
|
||||
OBIKMER_PRESENCE_DIST := $(addprefix obikmer_dist/presence/, \
|
||||
jaccard_dist.csv jaccard_shared.csv jaccard_nj.nwk \
|
||||
hamming_dist.csv hamming_nj.nwk)
|
||||
OBIKMER_COUNT_DIST := $(addprefix obikmer_dist/count/, \
|
||||
jaccard_dist.csv jaccard_shared.csv jaccard_nj.nwk \
|
||||
bray_curtis_dist.csv bray_curtis_nj.nwk \
|
||||
relfreq_bray_curtis_dist.csv relfreq_bray_curtis_nj.nwk \
|
||||
euclidean_dist.csv euclidean_nj.nwk \
|
||||
relfreq_euclidean_dist.csv relfreq_euclidean_nj.nwk \
|
||||
hellinger_dist.csv hellinger_nj.nwk \
|
||||
hellinger_euclidean_dist.csv hellinger_euclidean_nj.nwk)
|
||||
DIST_COMPARISON := stats/dist_comparison/summary.csv
|
||||
PRESENCE_DONE := $(SPECIMENS:%=specimen_index_presence/%/index.done)
|
||||
PRESENCE_STATS := $(SPECIMENS:%=stats/indexing_presence/%.stats)
|
||||
COUNT_DONE := $(SPECIMENS:%=specimen_index_count/%/index.done)
|
||||
@@ -24,7 +41,9 @@ SIMULATED_READS := $(foreach s,$(SPECIMENS),simulated_data/$(subst --,/,$s)/read
|
||||
|
||||
.NOTPARALLEL:
|
||||
|
||||
.PHONY: all simulate reference \
|
||||
.PHONY: all simulate reference reference_dist \
|
||||
obikmer_dist obikmer_dist_presence obikmer_dist_count \
|
||||
dist_comparison \
|
||||
index_presence index_count \
|
||||
aggregate_index_presence aggregate_index_count \
|
||||
merge_presence merge_count \
|
||||
@@ -39,7 +58,8 @@ verify_merge_count: stats/verify_merge_count/current.csv
|
||||
|
||||
all: aggregate_verify_presence aggregate_verify_count \
|
||||
verify_merge_presence verify_merge_count \
|
||||
aggregate_filter_presence aggregate_filter_count
|
||||
aggregate_filter_presence aggregate_filter_count \
|
||||
dist_comparison
|
||||
|
||||
# ── dependency file ───────────────────────────────────────────────────────────
|
||||
|
||||
@@ -62,6 +82,72 @@ reference_index/%.npz:
|
||||
|
||||
reference: $(REF_NPZS)
|
||||
|
||||
# ── reference distance matrices ───────────────────────────────────────────────
|
||||
|
||||
$(REF_DIST_CSVS) &: $(REF_NPZS) build_reference_dist.py
|
||||
$(VENV_PY) build_reference_dist.py
|
||||
|
||||
reference_dist: $(REF_DIST_CSVS)
|
||||
|
||||
# ── obikmer distance (presence index) ────────────────────────────────────────
|
||||
|
||||
$(OBIKMER_PRESENCE_DIST) &: global_index_presence/index.done $(BINARY)
|
||||
mkdir -p obikmer_dist/presence
|
||||
$(BINARY) distance \
|
||||
--output obikmer_dist/presence/jaccard \
|
||||
--metric jaccard --shared-kmers --nj \
|
||||
global_index_presence
|
||||
$(BINARY) distance \
|
||||
--output obikmer_dist/presence/hamming \
|
||||
--metric hamming --nj \
|
||||
global_index_presence
|
||||
|
||||
obikmer_dist_presence: $(OBIKMER_PRESENCE_DIST)
|
||||
|
||||
# ── obikmer distance (count index) ───────────────────────────────────────────
|
||||
|
||||
$(OBIKMER_COUNT_DIST) &: global_index_count/index.done $(BINARY)
|
||||
mkdir -p obikmer_dist/count
|
||||
$(BINARY) distance \
|
||||
--output obikmer_dist/count/jaccard \
|
||||
--metric jaccard --shared-kmers --nj \
|
||||
global_index_count
|
||||
$(BINARY) distance \
|
||||
--output obikmer_dist/count/bray_curtis \
|
||||
--metric bray-curtis --nj \
|
||||
global_index_count
|
||||
$(BINARY) distance \
|
||||
--output obikmer_dist/count/relfreq_bray_curtis \
|
||||
--metric relfreq-bray-curtis --nj \
|
||||
global_index_count
|
||||
$(BINARY) distance \
|
||||
--output obikmer_dist/count/euclidean \
|
||||
--metric euclidean --nj \
|
||||
global_index_count
|
||||
$(BINARY) distance \
|
||||
--output obikmer_dist/count/relfreq_euclidean \
|
||||
--metric relfreq-euclidean --nj \
|
||||
global_index_count
|
||||
$(BINARY) distance \
|
||||
--output obikmer_dist/count/hellinger \
|
||||
--metric hellinger --nj \
|
||||
global_index_count
|
||||
$(BINARY) distance \
|
||||
--output obikmer_dist/count/hellinger_euclidean \
|
||||
--metric hellinger-euclidean --nj \
|
||||
global_index_count
|
||||
|
||||
obikmer_dist_count: $(OBIKMER_COUNT_DIST)
|
||||
|
||||
obikmer_dist: obikmer_dist_presence obikmer_dist_count
|
||||
|
||||
# ── distance comparison ───────────────────────────────────────────────────────
|
||||
|
||||
$(DIST_COMPARISON): $(REF_DIST_CSVS) $(OBIKMER_PRESENCE_DIST) $(OBIKMER_COUNT_DIST) compare_all_dist.py
|
||||
$(VENV_PY) compare_all_dist.py --out $(DIST_COMPARISON)
|
||||
|
||||
dist_comparison: $(DIST_COMPARISON)
|
||||
|
||||
# ── per-specimen indexing ─────────────────────────────────────────────────────
|
||||
# Prerequisites (reads → index.done + .stats) are in deps.mk.
|
||||
|
||||
|
||||
Executable
+226
@@ -0,0 +1,226 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Compute reference pairwise distance matrices from per-specimen .npz kmer indexes.
|
||||
|
||||
Reads all .npz files in reference_index/ (each containing sorted uint64 `kmers`
|
||||
and uint32 `counts`), computes all distance metrics supported by `obikmer distance`,
|
||||
and writes one CSV per metric to reference_dist/.
|
||||
|
||||
Output CSV format matches `obikmer distance --output`:
|
||||
- first row: "genome", then specimen names
|
||||
- subsequent rows: specimen name, then float or int values
|
||||
|
||||
Metrics written
|
||||
jaccard_dist.csv Jaccard distance (presence/absence)
|
||||
shared_kmers.csv Shared-kmer count matrix (intersection size)
|
||||
bray_curtis_dist.csv Bray-Curtis dissimilarity (raw counts)
|
||||
relfreq_bray_curtis_dist.csv Bray-Curtis on relative frequencies
|
||||
euclidean_dist.csv Euclidean distance (raw counts)
|
||||
relfreq_euclidean_dist.csv Euclidean distance on relative frequencies
|
||||
hellinger_dist.csv Hellinger distance
|
||||
hellinger_euclidean_dist.csv Euclidean distance in Hellinger space
|
||||
"""
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
# ── pairwise helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
def shared_indices(a_kmers: np.ndarray, b_kmers: np.ndarray):
|
||||
"""Return index arrays (idx_a, idx_b) for kmers present in both sets.
|
||||
|
||||
Both arrays must be sorted uint64. Uses searchsorted: O(|B| log |A|).
|
||||
"""
|
||||
pos = np.searchsorted(a_kmers, b_kmers)
|
||||
pos = np.clip(pos, 0, len(a_kmers) - 1)
|
||||
mask = a_kmers[pos] == b_kmers
|
||||
idx_b = np.where(mask)[0]
|
||||
idx_a = pos[idx_b]
|
||||
return idx_a, idx_b
|
||||
|
||||
|
||||
def pairwise_stats(specimens: list[dict]) -> dict[str, np.ndarray]:
|
||||
"""Compute all pairwise distance matrices at once.
|
||||
|
||||
Returns a dict metric_name → ndarray (n×n float64 or int64).
|
||||
Each specimen dict has keys: name, kmers, counts.
|
||||
"""
|
||||
n = len(specimens)
|
||||
|
||||
# Pre-compute per-specimen scalars
|
||||
kmer_counts = np.array([len(s['kmers']) for s in specimens], dtype=np.uint64)
|
||||
count_sums = np.array([s['counts'].sum() for s in specimens], dtype=np.uint64)
|
||||
|
||||
# Per-specimen sum-of-squares (for Euclidean decomposition)
|
||||
sq_sums = np.array([(s['counts'].astype(np.float64) ** 2).sum() for s in specimens])
|
||||
|
||||
# Allocate output matrices
|
||||
shared_mat = np.zeros((n, n), dtype=np.uint64)
|
||||
hamming_mat = np.zeros((n, n), dtype=np.float64)
|
||||
jaccard_mat = np.zeros((n, n), dtype=np.float64)
|
||||
bray_mat = np.zeros((n, n), dtype=np.float64)
|
||||
relfreq_bray = np.zeros((n, n), dtype=np.float64)
|
||||
euclidean_mat = np.zeros((n, n), dtype=np.float64)
|
||||
relfreq_eucl = np.zeros((n, n), dtype=np.float64)
|
||||
hellinger_mat = np.zeros((n, n), dtype=np.float64)
|
||||
hell_eucl_mat = np.zeros((n, n), dtype=np.float64)
|
||||
|
||||
for i in range(n):
|
||||
a_km = specimens[i]['kmers']
|
||||
a_ct = specimens[i]['counts'].astype(np.float64)
|
||||
sa = float(count_sums[i])
|
||||
na = int(kmer_counts[i])
|
||||
|
||||
for j in range(i + 1, n):
|
||||
b_km = specimens[j]['kmers']
|
||||
b_ct = specimens[j]['counts'].astype(np.float64)
|
||||
sb = float(count_sums[j])
|
||||
nb = int(kmer_counts[j])
|
||||
|
||||
idx_a, idx_b = shared_indices(a_km, b_km)
|
||||
inter = len(idx_a)
|
||||
|
||||
ca_sh = a_ct[idx_a]
|
||||
cb_sh = b_ct[idx_b]
|
||||
|
||||
# ── Presence metrics ──────────────────────────────────────────────
|
||||
|
||||
union = na + nb - inter
|
||||
jac = (1.0 - inter / union) if union else 0.0
|
||||
hamming = float(na + nb - 2 * inter) # |A Δ B|
|
||||
|
||||
# ── Count metrics ─────────────────────────────────────────────────
|
||||
|
||||
# Bray-Curtis: 1 - 2*Σmin(a,b) / (Σa + Σb)
|
||||
sum_min = np.minimum(ca_sh, cb_sh).sum()
|
||||
denom_bc = sa + sb
|
||||
bc = (1.0 - 2.0 * sum_min / denom_bc) if denom_bc else 0.0
|
||||
|
||||
# RelfreqBray: 1 - Σmin(a/sa, b/sb) [only shared contribute]
|
||||
if sa and sb:
|
||||
rfb = 1.0 - np.minimum(ca_sh / sa, cb_sh / sb).sum()
|
||||
else:
|
||||
rfb = 0.0
|
||||
|
||||
# Euclidean: √(Σa² + Σb² - 2·Σ(a·b)_shared)
|
||||
cross = (ca_sh * cb_sh).sum()
|
||||
eucl_partial = sq_sums[i] + sq_sums[j] - 2.0 * cross
|
||||
eucl = np.sqrt(max(eucl_partial, 0.0))
|
||||
|
||||
# RelfreqEuclidean: √(Σ(a/sa - b/sb)²)
|
||||
# = √(Σa²/sa² + Σb²/sb² - 2·Σ(a·b)_shared/(sa·sb))
|
||||
if sa and sb:
|
||||
rf_cross = (ca_sh / sa * (cb_sh / sb)).sum()
|
||||
rfe_partial = (sq_sums[i] / sa**2
|
||||
+ sq_sums[j] / sb**2
|
||||
- 2.0 * rf_cross)
|
||||
rfe = np.sqrt(max(rfe_partial, 0.0))
|
||||
else:
|
||||
rfe = 0.0
|
||||
|
||||
# Hellinger partial: Σ(√(a/sa) - √(b/sb))² over global universe
|
||||
# = 2 - 2·Σ√(a·b)_shared / √(sa·sb)
|
||||
if sa and sb:
|
||||
bc_coeff = np.sqrt(ca_sh * cb_sh).sum() / np.sqrt(sa * sb)
|
||||
hell_partial = max(2.0 - 2.0 * bc_coeff, 0.0)
|
||||
else:
|
||||
hell_partial = 0.0
|
||||
|
||||
sq2 = np.sqrt(2.0)
|
||||
hell = np.sqrt(hell_partial) / sq2
|
||||
hell_euc = np.sqrt(hell_partial)
|
||||
|
||||
# ── Fill symmetric matrices ───────────────────────────────────────
|
||||
for mat, val in [
|
||||
(shared_mat, inter),
|
||||
(hamming_mat, hamming),
|
||||
(jaccard_mat, jac),
|
||||
(bray_mat, bc),
|
||||
(relfreq_bray, rfb),
|
||||
(euclidean_mat, eucl),
|
||||
(relfreq_eucl, rfe),
|
||||
(hellinger_mat, hell),
|
||||
(hell_eucl_mat, hell_euc),
|
||||
]:
|
||||
mat[i, j] = val
|
||||
mat[j, i] = val
|
||||
|
||||
return {
|
||||
'shared_kmers': shared_mat,
|
||||
'hamming_dist': hamming_mat,
|
||||
'jaccard_dist': jaccard_mat,
|
||||
'bray_curtis_dist': bray_mat,
|
||||
'relfreq_bray_curtis_dist': relfreq_bray,
|
||||
'euclidean_dist': euclidean_mat,
|
||||
'relfreq_euclidean_dist': relfreq_eucl,
|
||||
'hellinger_dist': hellinger_mat,
|
||||
'hellinger_euclidean_dist': hell_eucl_mat,
|
||||
}
|
||||
|
||||
|
||||
# ── I/O ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def write_csv(path: Path, labels: list[str], mat: np.ndarray, fmt: str) -> None:
|
||||
with path.open('w') as fh:
|
||||
fh.write('genome,' + ','.join(labels) + '\n')
|
||||
for i, label in enumerate(labels):
|
||||
row = ','.join(format(mat[i, j], fmt) for j in range(len(labels)))
|
||||
fh.write(f'{label},{row}\n')
|
||||
print(f' → {path}', file=sys.stderr)
|
||||
|
||||
|
||||
# ── main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser(description=__doc__,
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter)
|
||||
ap.add_argument('--ref-dir', default='reference_index',
|
||||
help='Directory with per-specimen .npz files (default: reference_index)')
|
||||
ap.add_argument('--out-dir', default='reference_dist',
|
||||
help='Output directory for CSV files (default: reference_dist)')
|
||||
args = ap.parse_args()
|
||||
|
||||
ref_dir = Path(args.ref_dir)
|
||||
out_dir = Path(args.out_dir)
|
||||
out_dir.mkdir(exist_ok=True)
|
||||
|
||||
npz_files = sorted(ref_dir.glob('*.npz'))
|
||||
if not npz_files:
|
||||
print(f'ERROR: no .npz files found in {ref_dir}', file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
print(f'Loading {len(npz_files)} specimen(s) from {ref_dir}/', file=sys.stderr)
|
||||
specimens = []
|
||||
for f in npz_files:
|
||||
data = np.load(f)
|
||||
specimens.append({
|
||||
'name': f.stem,
|
||||
'kmers': data['kmers'],
|
||||
'counts': data['counts'],
|
||||
})
|
||||
print(f' {f.stem}: {len(data["kmers"]):,} kmers', file=sys.stderr)
|
||||
|
||||
labels = [s['name'] for s in specimens]
|
||||
n = len(labels)
|
||||
print(f'\nComputing pairwise distances for {n} specimens…', file=sys.stderr)
|
||||
|
||||
matrices = pairwise_stats(specimens)
|
||||
|
||||
print(f'\nWriting CSVs to {out_dir}/', file=sys.stderr)
|
||||
write_csv(out_dir / 'shared_kmers.csv', labels, matrices['shared_kmers'], 'd')
|
||||
write_csv(out_dir / 'hamming_dist.csv', labels, matrices['hamming_dist'], '.6f')
|
||||
write_csv(out_dir / 'jaccard_dist.csv', labels, matrices['jaccard_dist'], '.6f')
|
||||
write_csv(out_dir / 'bray_curtis_dist.csv', labels, matrices['bray_curtis_dist'], '.6f')
|
||||
write_csv(out_dir / 'relfreq_bray_curtis_dist.csv', labels, matrices['relfreq_bray_curtis_dist'], '.6f')
|
||||
write_csv(out_dir / 'euclidean_dist.csv', labels, matrices['euclidean_dist'], '.6f')
|
||||
write_csv(out_dir / 'relfreq_euclidean_dist.csv', labels, matrices['relfreq_euclidean_dist'], '.6f')
|
||||
write_csv(out_dir / 'hellinger_dist.csv', labels, matrices['hellinger_dist'], '.6f')
|
||||
write_csv(out_dir / 'hellinger_euclidean_dist.csv', labels, matrices['hellinger_euclidean_dist'], '.6f')
|
||||
|
||||
print('\nDone.', file=sys.stderr)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Executable
+182
@@ -0,0 +1,182 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Compare all reference distance matrices against obikmer distance outputs.
|
||||
|
||||
Reads from:
|
||||
reference_dist/ — ground-truth matrices computed by build_reference_dist.py
|
||||
obikmer_dist/ — matrices produced by `obikmer distance`
|
||||
|
||||
Handles label reordering: both matrices are sorted by genome label before
|
||||
element-wise comparison, so column/row order differences are irrelevant.
|
||||
|
||||
Output: stats/dist_comparison/summary.csv
|
||||
comparison,max_abs,mean_abs,rmse,n_pairs,status
|
||||
"""
|
||||
import csv
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
# ── CSV loading ───────────────────────────────────────────────────────────────
|
||||
|
||||
def load_matrix(path: Path) -> tuple[list[str], np.ndarray]:
|
||||
"""Load a distance-matrix CSV; return (sorted_labels, matrix_float64)."""
|
||||
with path.open() as fh:
|
||||
reader = csv.reader(fh)
|
||||
header = next(reader)[1:] # skip 'genome' column
|
||||
raw: dict[str, list[float]] = {}
|
||||
for row in reader:
|
||||
raw[row[0]] = [float(x) for x in row[1:]]
|
||||
|
||||
label_to_col = {h: i for i, h in enumerate(header)}
|
||||
labels = sorted(raw.keys())
|
||||
n = len(labels)
|
||||
mat = np.zeros((n, n), dtype=np.float64)
|
||||
for i, ri in enumerate(labels):
|
||||
for j, cj in enumerate(labels):
|
||||
mat[i, j] = raw[ri][label_to_col[cj]]
|
||||
return labels, mat
|
||||
|
||||
|
||||
# ── comparison ────────────────────────────────────────────────────────────────
|
||||
|
||||
def compare(label: str,
|
||||
ref_path: Path,
|
||||
obi_path: Path,
|
||||
tol: float = 1e-4) -> dict:
|
||||
if not ref_path.exists():
|
||||
return {'comparison': label, 'status': 'REF_MISSING',
|
||||
'max_abs': '', 'mean_abs': '', 'rmse': '', 'n_pairs': ''}
|
||||
if not obi_path.exists():
|
||||
return {'comparison': label, 'status': 'OBI_MISSING',
|
||||
'max_abs': '', 'mean_abs': '', 'rmse': '', 'n_pairs': ''}
|
||||
|
||||
ref_labels, ref_mat = load_matrix(ref_path)
|
||||
obi_labels, obi_mat = load_matrix(obi_path)
|
||||
|
||||
if ref_labels != obi_labels:
|
||||
only_ref = sorted(set(ref_labels) - set(obi_labels))
|
||||
only_obi = sorted(set(obi_labels) - set(ref_labels))
|
||||
print(f' [{label}] label mismatch — '
|
||||
f'only_ref={only_ref} only_obi={only_obi}', file=sys.stderr)
|
||||
return {'comparison': label, 'status': 'LABEL_MISMATCH',
|
||||
'max_abs': '', 'mean_abs': '', 'rmse': '', 'n_pairs': ''}
|
||||
|
||||
n = len(ref_labels)
|
||||
# Off-diagonal mask
|
||||
mask = ~np.eye(n, dtype=bool)
|
||||
diff = np.abs(ref_mat[mask] - obi_mat[mask])
|
||||
n_pairs = diff.size
|
||||
|
||||
max_abs = float(diff.max())
|
||||
mean_abs = float(diff.mean())
|
||||
rmse = float(np.sqrt((diff ** 2).mean()))
|
||||
status = 'PASS' if max_abs <= tol else 'FAIL'
|
||||
|
||||
print(f' [{label}] n={n_pairs} '
|
||||
f'max={max_abs:.3e} mean={mean_abs:.3e} rmse={rmse:.3e} {status}',
|
||||
file=sys.stderr)
|
||||
return {
|
||||
'comparison': label,
|
||||
'max_abs': f'{max_abs:.6e}',
|
||||
'mean_abs': f'{mean_abs:.6e}',
|
||||
'rmse': f'{rmse:.6e}',
|
||||
'n_pairs': str(n_pairs),
|
||||
'status': status,
|
||||
}
|
||||
|
||||
|
||||
# ── comparison table ──────────────────────────────────────────────────────────
|
||||
|
||||
# (label, ref_csv, obikmer_csv)
|
||||
# The reference jaccard/shared is presence-based, which should match both
|
||||
# presence/jaccard and count/jaccard (threshold=1).
|
||||
COMPARISONS = [
|
||||
# ── presence index ────────────────────────────────────────────────────────
|
||||
('presence/jaccard_dist',
|
||||
'reference_dist/jaccard_dist.csv',
|
||||
'obikmer_dist/presence/jaccard_dist.csv'),
|
||||
|
||||
('presence/jaccard_shared',
|
||||
'reference_dist/shared_kmers.csv',
|
||||
'obikmer_dist/presence/jaccard_shared.csv'),
|
||||
|
||||
('presence/hamming_dist',
|
||||
'reference_dist/hamming_dist.csv',
|
||||
'obikmer_dist/presence/hamming_dist.csv'),
|
||||
|
||||
# ── count index (jaccard cross-check) ─────────────────────────────────────
|
||||
('count/jaccard_dist',
|
||||
'reference_dist/jaccard_dist.csv',
|
||||
'obikmer_dist/count/jaccard_dist.csv'),
|
||||
|
||||
('count/jaccard_shared',
|
||||
'reference_dist/shared_kmers.csv',
|
||||
'obikmer_dist/count/jaccard_shared.csv'),
|
||||
|
||||
# ── count index (count-based metrics) ────────────────────────────────────
|
||||
('count/bray_curtis_dist',
|
||||
'reference_dist/bray_curtis_dist.csv',
|
||||
'obikmer_dist/count/bray_curtis_dist.csv'),
|
||||
|
||||
('count/relfreq_bray_curtis_dist',
|
||||
'reference_dist/relfreq_bray_curtis_dist.csv',
|
||||
'obikmer_dist/count/relfreq_bray_curtis_dist.csv'),
|
||||
|
||||
('count/euclidean_dist',
|
||||
'reference_dist/euclidean_dist.csv',
|
||||
'obikmer_dist/count/euclidean_dist.csv'),
|
||||
|
||||
('count/relfreq_euclidean_dist',
|
||||
'reference_dist/relfreq_euclidean_dist.csv',
|
||||
'obikmer_dist/count/relfreq_euclidean_dist.csv'),
|
||||
|
||||
('count/hellinger_dist',
|
||||
'reference_dist/hellinger_dist.csv',
|
||||
'obikmer_dist/count/hellinger_dist.csv'),
|
||||
|
||||
('count/hellinger_euclidean_dist',
|
||||
'reference_dist/hellinger_euclidean_dist.csv',
|
||||
'obikmer_dist/count/hellinger_euclidean_dist.csv'),
|
||||
]
|
||||
|
||||
|
||||
# ── main ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
import argparse
|
||||
ap = argparse.ArgumentParser(description=__doc__,
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter)
|
||||
ap.add_argument('--tol', type=float, default=1e-4,
|
||||
help='Max abs diff threshold for PASS/FAIL (default 1e-4)')
|
||||
ap.add_argument('--out', default='stats/dist_comparison/summary.csv',
|
||||
help='Output summary CSV path')
|
||||
args = ap.parse_args()
|
||||
|
||||
out_path = Path(args.out)
|
||||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
print(f'Comparing {len(COMPARISONS)} matrix pairs…', file=sys.stderr)
|
||||
rows = []
|
||||
for label, ref, obi in COMPARISONS:
|
||||
rows.append(compare(label, Path(ref), Path(obi), tol=args.tol))
|
||||
|
||||
fields = ['comparison', 'max_abs', 'mean_abs', 'rmse', 'n_pairs', 'status']
|
||||
with out_path.open('w', newline='') as fh:
|
||||
w = csv.DictWriter(fh, fieldnames=fields)
|
||||
w.writeheader()
|
||||
w.writerows(rows)
|
||||
|
||||
print(f'\n→ {out_path}', file=sys.stderr)
|
||||
|
||||
n_fail = sum(1 for r in rows if r.get('status') == 'FAIL')
|
||||
n_pass = sum(1 for r in rows if r.get('status') == 'PASS')
|
||||
print(f'Summary: {n_pass} PASS {n_fail} FAIL '
|
||||
f'{len(rows) - n_pass - n_fail} SKIP', file=sys.stderr)
|
||||
if n_fail:
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -0,0 +1,21 @@
|
||||
genome,Candidozyma_auris--GCF_003013715.1_ASM301371v2,Acidobacterium_capsulatum--ATCC_51196,Bacillus_subtilis--168,Escherichia_coli--CFT073,Escherichia_coli--EDL933,Escherichia_coli--K-12_MG1655,Escherichia_coli--K-12_W3110,Klebsiella_pneumoniae--ATCC_13883,Klebsiella_pneumoniae--HS11286,Klebsiella_pneumoniae--MGH_78578,Opitutus_terrae--PB90-1,Proteus_mirabilis--HI4320,Saccharolobus_islandicus--M.16.4,Salmonella_enterica--AKU_12601,Salmonella_enterica--CT18,Salmonella_enterica--LT2,Salmonella_enterica--P125109,Shouchella_clausii--KSM-K16,Wolbachia_endosymbiont--GCF_000306885.1_ASM30688v1,Yersinia_ruckeri--YRB
|
||||
Candidozyma_auris--GCF_003013715.1_ASM301371v2,0.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000
|
||||
Acidobacterium_capsulatum--ATCC_51196,1.000000,0.000000,0.999981,0.999990,0.999989,0.999987,0.999987,0.999990,0.999988,0.999988,0.999994,0.999989,1.000000,0.999988,0.999987,0.999987,0.999988,0.999989,0.999991,0.999987
|
||||
Bacillus_subtilis--168,1.000000,0.999981,0.000000,0.999990,0.999989,0.999989,0.999989,0.999989,0.999988,0.999986,0.999995,0.999985,0.999999,0.999988,0.999987,0.999989,0.999988,0.999778,0.999993,0.999987
|
||||
Escherichia_coli--CFT073,1.000000,0.999990,0.999990,0.000000,0.825741,0.807495,0.807218,0.991156,0.996855,0.997849,0.999996,0.999633,1.000000,0.993885,0.996736,0.994148,0.993821,0.999991,0.999984,0.999291
|
||||
Escherichia_coli--EDL933,1.000000,0.999989,0.999989,0.825741,0.000000,0.735107,0.734775,0.996126,0.998058,0.997908,0.999997,0.999640,1.000000,0.993993,0.997126,0.994390,0.994059,0.999991,0.999986,0.999292
|
||||
Escherichia_coli--K-12_MG1655,1.000000,0.999987,0.999989,0.807495,0.735107,0.000000,0.382567,0.996190,0.997747,0.997455,0.999996,0.999604,1.000000,0.993444,0.996645,0.993773,0.993431,0.999989,0.999984,0.999174
|
||||
Escherichia_coli--K-12_W3110,1.000000,0.999987,0.999989,0.807218,0.734775,0.382567,0.000000,0.996220,0.997761,0.997467,0.999995,0.999604,1.000000,0.993445,0.996669,0.993769,0.993443,0.999990,0.999985,0.999165
|
||||
Klebsiella_pneumoniae--ATCC_13883,1.000000,0.999990,0.999989,0.991156,0.996126,0.996190,0.996220,0.000000,0.845220,0.840545,0.999997,0.999648,1.000000,0.996177,0.998128,0.996268,0.996052,0.999990,0.999987,0.999325
|
||||
Klebsiella_pneumoniae--HS11286,1.000000,0.999988,0.999988,0.996855,0.998058,0.997747,0.997761,0.845220,0.000000,0.906475,0.999996,0.999683,1.000000,0.997724,0.995697,0.997776,0.997769,0.999989,0.999979,0.999463
|
||||
Klebsiella_pneumoniae--MGH_78578,1.000000,0.999988,0.999986,0.997849,0.997908,0.997455,0.997467,0.840545,0.906475,0.000000,0.999996,0.999704,1.000000,0.997928,0.995054,0.997844,0.997868,0.999990,0.999980,0.999479
|
||||
Opitutus_terrae--PB90-1,1.000000,0.999994,0.999995,0.999996,0.999997,0.999996,0.999995,0.999997,0.999996,0.999996,0.000000,0.999997,0.999998,0.999996,0.999996,0.999996,0.999995,0.999997,0.999993,0.999996
|
||||
Proteus_mirabilis--HI4320,1.000000,0.999989,0.999985,0.999633,0.999640,0.999604,0.999604,0.999648,0.999683,0.999704,0.999997,0.000000,1.000000,0.999604,0.999699,0.999622,0.999613,0.999987,0.999983,0.999505
|
||||
Saccharolobus_islandicus--M.16.4,1.000000,1.000000,0.999999,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,0.999998,1.000000,0.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000
|
||||
Salmonella_enterica--AKU_12601,1.000000,0.999988,0.999988,0.993885,0.993993,0.993444,0.993445,0.996177,0.997724,0.997928,0.999996,0.999604,1.000000,0.000000,0.869238,0.682277,0.663383,0.999990,0.999985,0.999260
|
||||
Salmonella_enterica--CT18,1.000000,0.999987,0.999987,0.996736,0.997126,0.996645,0.996669,0.998128,0.995697,0.995054,0.999996,0.999699,1.000000,0.869238,0.000000,0.890872,0.886148,0.999988,0.999976,0.999524
|
||||
Salmonella_enterica--LT2,1.000000,0.999987,0.999989,0.994148,0.994390,0.993773,0.993769,0.996268,0.997776,0.997844,0.999996,0.999622,1.000000,0.682277,0.890872,0.000000,0.622606,0.999989,0.999985,0.999296
|
||||
Salmonella_enterica--P125109,1.000000,0.999988,0.999988,0.993821,0.994059,0.993431,0.993443,0.996052,0.997769,0.997868,0.999995,0.999613,1.000000,0.663383,0.886148,0.622606,0.000000,0.999988,0.999983,0.999270
|
||||
Shouchella_clausii--KSM-K16,1.000000,0.999989,0.999778,0.999991,0.999991,0.999989,0.999990,0.999990,0.999989,0.999990,0.999997,0.999987,1.000000,0.999990,0.999988,0.999989,0.999988,0.000000,0.999991,0.999988
|
||||
Wolbachia_endosymbiont--GCF_000306885.1_ASM30688v1,1.000000,0.999991,0.999993,0.999984,0.999986,0.999984,0.999985,0.999987,0.999979,0.999980,0.999993,0.999983,1.000000,0.999985,0.999976,0.999985,0.999983,0.999991,0.000000,0.999983
|
||||
Yersinia_ruckeri--YRB,1.000000,0.999987,0.999987,0.999291,0.999292,0.999174,0.999165,0.999325,0.999463,0.999479,0.999996,0.999505,1.000000,0.999260,0.999524,0.999296,0.999270,0.999988,0.999983,0.000000
|
||||
|
@@ -0,0 +1 @@
|
||||
(((((((((((Candidozyma_auris--GCF_003013715.1_ASM301371v2:0.5000001881725941,Saccharolobus_islandicus--M.16.4:0.4999993211600824):0.0000023411501775538747,Opitutus_terrae--PB90-1:0.499997075187947):0.0000029791191795691675,(Acidobacterium_capsulatum--ATCC_51196:0.49999227771334689,(Bacillus_subtilis--168:0.49988797935621456,Shouchella_clausii--KSM-K16:0.49988984146059159):0.0001037210285571577):0.0000023959836053522034):0.0000034093646568700288,Wolbachia_endosymbiont--GCF_000306885.1_ASM30688v1:0.4999920159222422):0.000199555100890203,Proteus_mirabilis--HI4320:0.49979129185300427):0.00010103619067070024,Yersinia_ruckeri--YRB:0.4996806650749249):0.0013719139155004,(Klebsiella_pneumoniae--HS11286:0.43798845051648258,(Klebsiella_pneumoniae--ATCC_13883:0.41780293826821265,Klebsiella_pneumoniae--MGH_78578:0.42274184870836559):0.017586732339732737):0.0604124197073832):0.0006482538063555254,(Salmonella_enterica--CT18:0.43952894448143017,(Salmonella_enterica--AKU_12601:0.3357977326267918,(Salmonella_enterica--LT2:0.31203395843666389,Salmonella_enterica--P125109:0.31057217324861216):0.025729515856701136):0.10292985918524672):0.05825411485542886):0.08937928015651564,Escherichia_coli--CFT073:0.40806501650701029):0.0410131211869626,Escherichia_coli--EDL933:0.3681464750911808):0.1755112579711463,Escherichia_coli--K-12_MG1655:0.19129818036662728,Escherichia_coli--K-12_W3110:0.19126872019906239);
|
||||
@@ -0,0 +1,21 @@
|
||||
genome,Candidozyma_auris--GCF_003013715.1_ASM301371v2,Acidobacterium_capsulatum--ATCC_51196,Bacillus_subtilis--168,Escherichia_coli--CFT073,Escherichia_coli--EDL933,Escherichia_coli--K-12_MG1655,Escherichia_coli--K-12_W3110,Klebsiella_pneumoniae--ATCC_13883,Klebsiella_pneumoniae--HS11286,Klebsiella_pneumoniae--MGH_78578,Opitutus_terrae--PB90-1,Proteus_mirabilis--HI4320,Saccharolobus_islandicus--M.16.4,Salmonella_enterica--AKU_12601,Salmonella_enterica--CT18,Salmonella_enterica--LT2,Salmonella_enterica--P125109,Shouchella_clausii--KSM-K16,Wolbachia_endosymbiont--GCF_000306885.1_ASM30688v1,Yersinia_ruckeri--YRB
|
||||
Candidozyma_auris--GCF_003013715.1_ASM301371v2,0,0,0,0,0,0,0,0,0,0,0,0,8,0,1,0,0,0,0,3
|
||||
Acidobacterium_capsulatum--ATCC_51196,0,0,203,119,128,141,140,116,109,111,78,112,0,136,109,147,134,117,55,129
|
||||
Bacillus_subtilis--168,0,203,0,124,132,128,123,133,109,130,66,158,6,131,112,124,135,2393,46,124
|
||||
Escherichia_coli--CFT073,0,119,124,0,1966777,1998059,1999094,117743,32029,22312,63,4225,0,74946,31918,73311,76585,113,128,7854
|
||||
Escherichia_coli--EDL933,0,128,132,1966777,0,2627885,2628700,52488,20134,22064,48,4202,0,74655,28602,71244,74665,112,108,7963
|
||||
Escherichia_coli--K-12_MG1655,0,141,128,1998059,2627885,0,4452541,48302,21382,24602,47,4277,0,75729,30449,73622,76778,119,111,8566
|
||||
Escherichia_coli--K-12_W3110,0,140,123,1999094,2628700,4452541,0,47894,21226,24470,68,4278,0,75658,30207,73614,76583,112,108,8660
|
||||
Klebsiella_pneumoniae--ATCC_13883,0,116,133,117743,52488,48302,47894,0,1416091,1477759,42,4172,0,48296,18988,48144,50416,120,106,7712
|
||||
Klebsiella_pneumoniae--HS11286,0,109,109,32029,20134,21382,21226,1416091,0,644063,42,2738,0,21498,29758,21606,21376,99,102,4417
|
||||
Klebsiella_pneumoniae--MGH_78578,0,111,130,22312,22064,24602,24470,1477759,644063,0,42,2614,0,19948,35067,21330,20813,97,102,4374
|
||||
Opitutus_terrae--PB90-1,0,78,66,63,48,47,68,42,42,42,0,43,18,57,42,53,66,39,58,43
|
||||
Proteus_mirabilis--HI4320,0,112,158,4225,4202,4277,4278,4172,2738,2614,43,0,0,4254,2481,4166,4215,131,103,4704
|
||||
Saccharolobus_islandicus--M.16.4,8,0,6,0,0,0,0,0,0,0,18,0,0,0,0,0,0,0,0,0
|
||||
Salmonella_enterica--AKU_12601,0,136,131,74946,74655,75729,75658,48296,21498,19948,57,4254,0,0,1047731,2857146,2951421,117,108,7643
|
||||
Salmonella_enterica--CT18,1,109,112,31918,28602,30449,30207,18988,29758,35067,42,2481,0,1047731,0,917948,940297,106,106,3716
|
||||
Salmonella_enterica--LT2,0,147,124,73311,71244,73622,73614,48144,21606,21330,53,4166,0,2857146,917948,0,3284800,122,108,7460
|
||||
Salmonella_enterica--P125109,0,134,135,76585,74665,76778,76583,50416,21376,20813,66,4215,0,2951421,940297,3284800,0,134,124,7645
|
||||
Shouchella_clausii--KSM-K16,0,117,2393,113,112,119,112,120,99,97,39,131,0,117,106,122,134,0,58,124
|
||||
Wolbachia_endosymbiont--GCF_000306885.1_ASM30688v1,0,55,46,128,108,111,108,106,102,102,58,103,0,108,106,108,124,58,0,96
|
||||
Yersinia_ruckeri--YRB,3,129,124,7854,7963,8566,8660,7712,4417,4374,43,4704,0,7643,3716,7460,7645,124,96,0
|
||||
|
Generated
+1
-1
@@ -1704,7 +1704,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "obikmer"
|
||||
version = "1.1.11"
|
||||
version = "1.1.29"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"csv",
|
||||
|
||||
@@ -17,4 +17,8 @@ serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
indicatif = "0.17"
|
||||
tracing = "0.1.44"
|
||||
hwlocality = { version = "1.0.0-alpha.11", features = ["vendored"] }
|
||||
hwlocality = { version = "1.0.0-alpha.11", features = ["vendored"], optional = true }
|
||||
|
||||
[features]
|
||||
default = ["numa"]
|
||||
numa = ["hwlocality"]
|
||||
|
||||
+204
-101
@@ -5,77 +5,102 @@
|
||||
// CPUs. Linux first-touch policy then places graph allocations in local DRAM
|
||||
// automatically — no explicit memory binding needed.
|
||||
//
|
||||
// Returns None when:
|
||||
// - hwloc topology initialisation fails
|
||||
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
|
||||
// - any per-node pool fails to build
|
||||
// UMA systems (single socket, Apple Silicon, etc.) are the degenerate case:
|
||||
// one synthetic node containing all cores, no pool, no pinning.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crossbeam_channel::unbounded;
|
||||
#[cfg(feature = "numa")]
|
||||
use hwlocality::Topology;
|
||||
#[cfg(feature = "numa")]
|
||||
use hwlocality::cpu::binding::CpuBindingFlags;
|
||||
#[cfg(feature = "numa")]
|
||||
use hwlocality::cpu::cpuset::CpuSet;
|
||||
#[cfg(feature = "numa")]
|
||||
use hwlocality::object::types::ObjectType;
|
||||
use obisys::CpuSample;
|
||||
use tracing::debug;
|
||||
|
||||
// ── Public interface ──────────────────────────────────────────────────────────
|
||||
|
||||
pub struct NumaSetup {
|
||||
pub pools: Vec<Arc<rayon::ThreadPool>>,
|
||||
/// One entry per NUMA node. `None` on UMA systems (no pool, no pinning).
|
||||
pub pools: Vec<Option<Arc<rayon::ThreadPool>>>,
|
||||
/// CPU indices for each NUMA node, in node order.
|
||||
pub cpus_per_node: Vec<Vec<usize>>,
|
||||
}
|
||||
|
||||
impl NumaSetup {
|
||||
/// Workers to activate per NUMA node.
|
||||
/// Empirically ~3 workers saturate one node's memory bandwidth.
|
||||
/// Maximum worker slots per node (one per physical core in the node).
|
||||
pub fn workers_per_node(&self) -> usize {
|
||||
self.cpus_per_node
|
||||
.first()
|
||||
.map(|c| (c.len() / 8).max(3).min(8))
|
||||
.unwrap_or(3)
|
||||
.map(|c| c.len().max(1))
|
||||
.unwrap_or(1)
|
||||
}
|
||||
}
|
||||
|
||||
/// Detect NUMA topology and build per-node Rayon pools.
|
||||
/// Returns None on UMA systems, single-node machines, or on failure.
|
||||
pub fn build() -> Option<NumaSetup> {
|
||||
let topology = Topology::new().ok()?;
|
||||
/// Always succeeds: falls back to a single synthetic UMA node on failure.
|
||||
#[cfg(feature = "numa")]
|
||||
pub fn build() -> NumaSetup {
|
||||
if let Ok(topology) = Topology::new() {
|
||||
let nodes: Vec<Vec<usize>> = topology
|
||||
.objects_with_type(ObjectType::NUMANode)
|
||||
.filter_map(|obj| obj.cpuset())
|
||||
.map(|cpuset| {
|
||||
cpuset
|
||||
.iter_set()
|
||||
.map(|idx| usize::from(idx))
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.filter(|v| !v.is_empty())
|
||||
.collect();
|
||||
|
||||
let nodes: Vec<Vec<usize>> = topology
|
||||
.objects_with_type(ObjectType::NUMANode)
|
||||
.filter_map(|obj| obj.cpuset())
|
||||
.map(|cpuset| {
|
||||
cpuset
|
||||
.iter_set()
|
||||
.map(|idx| usize::from(idx))
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.filter(|v| !v.is_empty())
|
||||
.collect();
|
||||
|
||||
if nodes.len() <= 1 {
|
||||
return None;
|
||||
if nodes.len() > 1 {
|
||||
if let Some(pools) = nodes
|
||||
.iter()
|
||||
.map(|cpus| build_pool(cpus).map(|p| Some(Arc::new(p))))
|
||||
.collect::<Option<Vec<_>>>()
|
||||
{
|
||||
debug!(
|
||||
"NUMA topology: {} node(s), {} core(s)/node",
|
||||
nodes.len(),
|
||||
nodes.first().map_or(0, |v| v.len()),
|
||||
);
|
||||
return NumaSetup { pools, cpus_per_node: nodes };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!(
|
||||
"NUMA topology: {} node(s), {} core(s)/node",
|
||||
nodes.len(),
|
||||
nodes.first().map_or(0, |v| v.len()),
|
||||
);
|
||||
// UMA fallback: single synthetic node, all cores, no pool, no pinning.
|
||||
let n_cores = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
debug!("UMA: single synthetic node, {} core(s)", n_cores);
|
||||
NumaSetup {
|
||||
pools: vec![None],
|
||||
cpus_per_node: vec![(0..n_cores).collect()],
|
||||
}
|
||||
}
|
||||
|
||||
let pools = nodes
|
||||
.iter()
|
||||
.map(|cpus| build_pool(cpus).map(Arc::new))
|
||||
.collect::<Option<Vec<_>>>()?;
|
||||
|
||||
Some(NumaSetup { pools, cpus_per_node: nodes })
|
||||
#[cfg(not(feature = "numa"))]
|
||||
pub fn build() -> NumaSetup {
|
||||
let n_cores = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
debug!("UMA: single synthetic node, {} core(s)", n_cores);
|
||||
NumaSetup {
|
||||
pools: vec![None],
|
||||
cpus_per_node: vec![(0..n_cores).collect()],
|
||||
}
|
||||
}
|
||||
|
||||
/// Bind the calling thread to `cpu_indices` using hwloc.
|
||||
/// Silently returns on any error so the thread still runs, just unbound.
|
||||
#[cfg(feature = "numa")]
|
||||
pub fn pin_current_thread(cpu_indices: &[usize]) {
|
||||
let Ok(topology) = Topology::new() else { return };
|
||||
let mut cpuset = CpuSet::new();
|
||||
@@ -85,8 +110,12 @@ pub fn pin_current_thread(cpu_indices: &[usize]) {
|
||||
let _ = topology.bind_cpu(&cpuset, CpuBindingFlags::THREAD);
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "numa"))]
|
||||
pub fn pin_current_thread(_cpu_indices: &[usize]) {}
|
||||
|
||||
// ── Internal helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(feature = "numa")]
|
||||
fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
|
||||
let cpus = cpus.to_vec();
|
||||
rayon::ThreadPoolBuilder::new()
|
||||
@@ -114,18 +143,19 @@ struct NodeConfig {
|
||||
/// Generic NUMA-aware runner for partition-level parallel work.
|
||||
///
|
||||
/// Workers are distributed round-robin across NUMA nodes and pinned to their
|
||||
/// node's CPUs. UMA systems are the degenerate case: one node, no pinning.
|
||||
/// node's CPUs. UMA is the degenerate case: one node, no pinning.
|
||||
///
|
||||
/// Workers are pre-spawned dormant and activated one by one as CPU efficiency
|
||||
/// falls below `SPAWN_THRESHOLD`. This avoids over-provisioning on I/O-bound
|
||||
/// or memory-bandwidth-bound workloads while saturating CPU-bound ones.
|
||||
///
|
||||
/// # Termination
|
||||
///
|
||||
/// Termination is driven entirely by channel closure:
|
||||
///
|
||||
/// ```text
|
||||
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
|
||||
/// drop(result_tx) → result_rx closes → controller loop exits
|
||||
/// drop(activate_tx) → dormant workers exit cleanly
|
||||
/// ```
|
||||
///
|
||||
/// No explicit counter or sentinel needed.
|
||||
pub struct PartitionRunner {
|
||||
nodes: Vec<NodeConfig>,
|
||||
}
|
||||
@@ -136,50 +166,38 @@ impl PartitionRunner {
|
||||
self.nodes.iter().map(|n| n.max_workers).sum()
|
||||
}
|
||||
|
||||
/// Detect topology and build. Falls back to a single-node UMA runner on
|
||||
/// macOS, single-socket machines, or hwloc failure.
|
||||
/// Detect topology and build. Always succeeds.
|
||||
pub fn new() -> Self {
|
||||
match build() {
|
||||
Some(ns) => {
|
||||
let wpn = ns.workers_per_node();
|
||||
debug!(
|
||||
"PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node",
|
||||
ns.pools.len(), wpn,
|
||||
);
|
||||
let nodes = ns.pools
|
||||
.into_iter()
|
||||
.zip(ns.cpus_per_node)
|
||||
.map(|(pool, cpu_ids)| NodeConfig {
|
||||
pool: Some(pool),
|
||||
cpu_ids,
|
||||
max_workers: wpn,
|
||||
})
|
||||
.collect();
|
||||
Self { nodes }
|
||||
}
|
||||
None => {
|
||||
let n_cores = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
let max_workers = (n_cores / 2).max(1);
|
||||
debug!("PartitionRunner: UMA mode — {} worker(s)", max_workers);
|
||||
Self {
|
||||
nodes: vec![NodeConfig {
|
||||
pool: None,
|
||||
cpu_ids: vec![],
|
||||
max_workers,
|
||||
}],
|
||||
}
|
||||
}
|
||||
}
|
||||
let ns = build();
|
||||
let wpn = ns.workers_per_node();
|
||||
debug!(
|
||||
"PartitionRunner: {} node(s) × {} worker(s)/node max",
|
||||
ns.pools.len(),
|
||||
wpn,
|
||||
);
|
||||
let nodes = ns.pools
|
||||
.into_iter()
|
||||
.zip(ns.cpus_per_node)
|
||||
.map(|(pool, cpu_ids)| NodeConfig {
|
||||
pool,
|
||||
cpu_ids,
|
||||
max_workers: wpn,
|
||||
})
|
||||
.collect();
|
||||
Self { nodes }
|
||||
}
|
||||
|
||||
/// Run `f(i)` for every index in `order`.
|
||||
///
|
||||
/// Workers are spawned upfront and distributed round-robin across NUMA
|
||||
/// nodes. `on_done(i, result, elapsed)` is called from the controller
|
||||
/// thread as each partition completes — suitable for progress bars and
|
||||
/// result aggregation.
|
||||
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
|
||||
/// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed
|
||||
/// partition resets that timer (forcing an immediate check) and also
|
||||
/// triggers its own inline check. A new worker is activated whenever
|
||||
/// efficiency falls below `SPAWN_THRESHOLD`.
|
||||
///
|
||||
/// `on_done(i, result, elapsed)` is called from the controller thread as
|
||||
/// each partition completes — suitable for progress bars and result
|
||||
/// aggregation.
|
||||
///
|
||||
/// Returns the first error produced by `f`, if any.
|
||||
pub fn run<F, R, E, C>(
|
||||
@@ -194,28 +212,61 @@ impl PartitionRunner {
|
||||
E: Send,
|
||||
C: FnMut(usize, R, Duration) + Send,
|
||||
{
|
||||
// Pre-load the work queue, then drop the sender so workers' part_rx
|
||||
// iterators terminate when the queue is drained.
|
||||
let (part_tx, part_rx) = unbounded::<usize>();
|
||||
let n_total = order.len();
|
||||
if n_total == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
const SPAWN_THRESHOLD: f64 = 0.2;
|
||||
const TIMER_SECS: u64 = 30;
|
||||
|
||||
// ── Channels ──────────────────────────────────────────────────────────
|
||||
let (part_tx, part_rx) = unbounded::<usize>();
|
||||
let (activate_tx, activate_rx) = unbounded::<()>();
|
||||
// reset_tx: controller → timer ("reset the 30 s window")
|
||||
let (reset_tx, reset_rx) = unbounded::<()>();
|
||||
// event_tx: workers + timer → controller (unified event stream)
|
||||
let (event_tx, event_rx) = unbounded::<WorkerEvent<R, E>>();
|
||||
|
||||
for &i in order { part_tx.send(i).ok(); }
|
||||
drop(part_tx);
|
||||
|
||||
let (result_tx, result_rx) = unbounded::<(usize, Result<R, E>, Duration)>();
|
||||
let n_nodes = self.nodes.len();
|
||||
let f = &f; // shared borrow; F: Sync so concurrent calls are safe
|
||||
let max_workers = self.max_workers();
|
||||
let n_nodes = self.nodes.len();
|
||||
let f = &f;
|
||||
|
||||
let mut first_err: Option<E> = None;
|
||||
|
||||
std::thread::scope(|s| {
|
||||
// Spawn all workers upfront, round-robin across NUMA nodes.
|
||||
for w in 0..self.max_workers() {
|
||||
// ── Timer thread ──────────────────────────────────────────────────
|
||||
// Sends TimerTick every TIMER_SECS seconds. Resets its window each
|
||||
// time reset_rx receives a message (i.e. on partition completion).
|
||||
let timer_tx = event_tx.clone();
|
||||
s.spawn(move || {
|
||||
let period = Duration::from_secs(TIMER_SECS);
|
||||
loop {
|
||||
crossbeam_channel::select! {
|
||||
recv(reset_rx) -> r => {
|
||||
if r.is_err() { break; } // reset_tx dropped → exit
|
||||
}
|
||||
default(period) => {
|
||||
if timer_tx.send(WorkerEvent::TimerTick).is_err() { break; }
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// ── Pre-spawn workers dormant, round-robin across NUMA nodes ──────
|
||||
for w in 0..max_workers {
|
||||
let node = &self.nodes[w % n_nodes];
|
||||
let prx = part_rx.clone();
|
||||
let rtx = result_tx.clone();
|
||||
let etx = event_tx.clone();
|
||||
let arx = activate_rx.clone();
|
||||
let pool = node.pool.clone();
|
||||
let cpu_ids = &node.cpu_ids;
|
||||
|
||||
s.spawn(move || {
|
||||
if arx.recv().is_err() { return; }
|
||||
if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
|
||||
for i in &prx {
|
||||
let t = Instant::now();
|
||||
@@ -223,24 +274,51 @@ impl PartitionRunner {
|
||||
Some(p) => p.install(|| f(i)),
|
||||
None => f(i),
|
||||
};
|
||||
rtx.send((i, r, t.elapsed())).ok();
|
||||
etx.send(WorkerEvent::Completed(i, r, t.elapsed())).ok();
|
||||
}
|
||||
});
|
||||
}
|
||||
// Drop controller's event_tx: event_rx closes when all workers +
|
||||
// timer have exited.
|
||||
drop(event_tx);
|
||||
|
||||
// Drop the controller's sender: result_rx closes once all worker
|
||||
// rtx clones are dropped (i.e. all workers have exited).
|
||||
drop(result_tx);
|
||||
// ── Controller ────────────────────────────────────────────────────
|
||||
let initial_workers = n_nodes.min(max_workers).min(n_total);
|
||||
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
|
||||
let mut n_active = initial_workers;
|
||||
let mut cpu_sample = CpuSample::now();
|
||||
let mut completed = 0usize;
|
||||
|
||||
// Drain results concurrently with workers. The for loop exits
|
||||
// when result_rx is disconnected — at that point all workers are
|
||||
// done and the scope join below is instantaneous.
|
||||
for (i, r, dur) in &result_rx {
|
||||
match r {
|
||||
Ok(v) => on_done(i, v, dur),
|
||||
Err(e) => { if first_err.is_none() { first_err = Some(e); } }
|
||||
while completed < n_total {
|
||||
let Ok(event) = event_rx.recv() else { break };
|
||||
match event {
|
||||
WorkerEvent::Completed(i, r, dur) => {
|
||||
match r {
|
||||
Ok(v) => on_done(i, v, dur),
|
||||
Err(e) => { if first_err.is_none() { first_err = Some(e); } }
|
||||
}
|
||||
completed += 1;
|
||||
// Reset the 30 s timer.
|
||||
reset_tx.send(()).ok();
|
||||
// Inline check: same logic as a timer tick.
|
||||
maybe_activate(
|
||||
&activate_tx, &mut n_active, max_workers,
|
||||
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
|
||||
);
|
||||
}
|
||||
WorkerEvent::TimerTick => {
|
||||
maybe_activate(
|
||||
&activate_tx, &mut n_active, max_workers,
|
||||
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Dormant workers exit when activate_tx closes.
|
||||
drop(activate_tx);
|
||||
// Timer thread exits when reset_tx closes.
|
||||
drop(reset_tx);
|
||||
});
|
||||
|
||||
match first_err {
|
||||
@@ -249,3 +327,28 @@ impl PartitionRunner {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Internal event type ───────────────────────────────────────────────────────
|
||||
|
||||
enum WorkerEvent<R, E> {
|
||||
Completed(usize, Result<R, E>, Duration),
|
||||
TimerTick,
|
||||
}
|
||||
|
||||
fn maybe_activate(
|
||||
activate_tx: &crossbeam_channel::Sender<()>,
|
||||
n_active: &mut usize,
|
||||
max_workers: usize,
|
||||
cpu_sample: &mut CpuSample,
|
||||
threshold: f64,
|
||||
completed: usize,
|
||||
n_total: usize,
|
||||
) {
|
||||
if *n_active >= max_workers || completed >= n_total { return; }
|
||||
|
||||
if cpu_sample.do_i_activate(threshold) {
|
||||
activate_tx.send(()).ok();
|
||||
*n_active += 1;
|
||||
debug!("activated worker {}/{}", n_active, max_workers);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "obikmer"
|
||||
version = "1.1.11"
|
||||
version = "1.1.29"
|
||||
edition = "2024"
|
||||
|
||||
[[bin]]
|
||||
@@ -18,7 +18,7 @@ obikrope = { path = "../obikrope" }
|
||||
obikpartitionner = { path = "../obikpartitionner" }
|
||||
obisys = { path = "../obisys" }
|
||||
obiskio = { path = "../obiskio" }
|
||||
obikindex = { path = "../obikindex" }
|
||||
obikindex = { path = "../obikindex", default-features = false }
|
||||
obitaxonomy = { path = "../obitaxonomy" }
|
||||
obilayeredmap = { path = "../obilayeredmap" }
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
@@ -33,4 +33,6 @@ tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
|
||||
pprof = { version = "0.13", features = ["prost-codec"], optional = true }
|
||||
|
||||
[features]
|
||||
default = ["numa"]
|
||||
numa = ["obikindex/numa"]
|
||||
profiling = ["dep:pprof"]
|
||||
|
||||
+210
-100
@@ -4,7 +4,7 @@ use std::sync::{Condvar, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use indicatif::{ProgressBar, ProgressStyle};
|
||||
use tracing::{info, warn};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
|
||||
|
||||
@@ -14,24 +14,25 @@ const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧
|
||||
/// a TTY (e.g. HPC job logs): every 10% for bounded bars, every ~10 s for
|
||||
/// spinners (throttled on `set_message`).
|
||||
pub struct TracedBar {
|
||||
pb: ProgressBar,
|
||||
label: String,
|
||||
unit: String,
|
||||
total: u64, // 0 for spinners
|
||||
start: Instant, // creation time, for spinner throttling
|
||||
last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet
|
||||
last_log_ms: AtomicU64, // ms since `start` at last spinner log
|
||||
pb: ProgressBar,
|
||||
label: String,
|
||||
unit: String,
|
||||
total: u64, // 0 for spinners
|
||||
start: Instant, // creation time, for spinner throttling
|
||||
last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet
|
||||
last_log_ms: AtomicU64, // ms since `start` at last spinner log
|
||||
}
|
||||
|
||||
impl TracedBar {
|
||||
pub fn inc(&self, delta: u64) {
|
||||
self.pb.inc(delta);
|
||||
if self.pb.is_hidden() && self.total > 0 {
|
||||
let pos = self.pb.position();
|
||||
let pos = self.pb.position();
|
||||
let pct10 = (pos * 10) / self.total; // 0..=10
|
||||
let last = self.last_pct.load(Ordering::Relaxed);
|
||||
let last = self.last_pct.load(Ordering::Relaxed);
|
||||
if pct10 > last
|
||||
&& self.last_pct
|
||||
&& self
|
||||
.last_pct
|
||||
.compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
@@ -49,14 +50,14 @@ impl TracedBar {
|
||||
let msg = msg.into();
|
||||
if self.pb.is_hidden() {
|
||||
if self.total > 0 {
|
||||
// bounded bar: always log (already rate-limited by 10% threshold in inc)
|
||||
info!(stage = %self.label, "{msg}");
|
||||
debug!(stage = %self.label, "{msg}");
|
||||
} else {
|
||||
// spinner: throttle to ~10 s
|
||||
let now_ms = self.start.elapsed().as_millis() as u64;
|
||||
let last = self.last_log_ms.load(Ordering::Relaxed);
|
||||
let last = self.last_log_ms.load(Ordering::Relaxed);
|
||||
if now_ms >= last + 10_000
|
||||
&& self.last_log_ms
|
||||
&& self
|
||||
.last_log_ms
|
||||
.compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
@@ -83,8 +84,13 @@ pub fn spinner(label: &str) -> TracedBar {
|
||||
);
|
||||
pb.enable_steady_tick(Duration::from_millis(100));
|
||||
TracedBar {
|
||||
pb, label: label.to_string(), unit: String::new(), total: 0,
|
||||
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
|
||||
pb,
|
||||
label: label.to_string(),
|
||||
unit: String::new(),
|
||||
total: 0,
|
||||
start: Instant::now(),
|
||||
last_pct: AtomicU64::new(0),
|
||||
last_log_ms: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,8 +107,13 @@ pub fn progress_bar(label: &str, n: u64, unit: &str) -> TracedBar {
|
||||
);
|
||||
pb.enable_steady_tick(Duration::from_millis(100));
|
||||
TracedBar {
|
||||
pb, label: label.to_string(), unit: unit.to_string(), total: n,
|
||||
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
|
||||
pb,
|
||||
label: label.to_string(),
|
||||
unit: unit.to_string(),
|
||||
total: n,
|
||||
start: Instant::now(),
|
||||
last_pct: AtomicU64::new(0),
|
||||
last_log_ms: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,13 +215,19 @@ fn tv_to_secs(tv: timeval) -> f64 {
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 }
|
||||
fn rss_to_bytes(ru: &rusage) -> u64 {
|
||||
ru.ru_maxrss as u64
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "macos"))]
|
||||
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
|
||||
fn rss_to_bytes(ru: &rusage) -> u64 {
|
||||
ru.ru_maxrss as u64 * 1024
|
||||
}
|
||||
|
||||
// Monotonically increasing counters — negative delta would be a kernel bug.
|
||||
fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
|
||||
fn delta(end: i64, start: i64) -> u64 {
|
||||
(end - start).max(0) as u64
|
||||
}
|
||||
|
||||
// ── CpuSample ─────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -218,31 +235,60 @@ fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
|
||||
/// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of
|
||||
/// available cores used since the snapshot was taken.
|
||||
pub struct CpuSample {
|
||||
wall: Instant,
|
||||
wall: Instant,
|
||||
user_secs: f64,
|
||||
sys_secs: f64,
|
||||
sys_secs: f64,
|
||||
previous: f64,
|
||||
}
|
||||
|
||||
impl CpuSample {
|
||||
pub fn now() -> Self {
|
||||
let ru = get_rusage();
|
||||
Self {
|
||||
wall: Instant::now(),
|
||||
wall: Instant::now(),
|
||||
user_secs: tv_to_secs(ru.ru_utime),
|
||||
sys_secs: tv_to_secs(ru.ru_stime),
|
||||
sys_secs: tv_to_secs(ru.ru_stime),
|
||||
previous: 0.0,
|
||||
}
|
||||
}
|
||||
|
||||
/// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot.
|
||||
/// Returns 0.0 if less than 100 ms have elapsed (too noisy).
|
||||
pub fn cpu_efficiency(&self, n_cores: usize) -> f64 {
|
||||
let ru = get_rusage();
|
||||
let ru = get_rusage();
|
||||
let wall = self.wall.elapsed().as_secs_f64();
|
||||
if wall < 0.1 { return 0.0; }
|
||||
let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs)
|
||||
+ (tv_to_secs(ru.ru_stime) - self.sys_secs);
|
||||
if wall < 0.1 {
|
||||
return 0.0;
|
||||
}
|
||||
let cpu =
|
||||
(tv_to_secs(ru.ru_utime) - self.user_secs) + (tv_to_secs(ru.ru_stime) - self.sys_secs);
|
||||
cpu / (wall * n_cores as f64)
|
||||
}
|
||||
|
||||
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||
let n = CpuSample::now();
|
||||
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
|
||||
let delta_wall = self.wall.elapsed().as_secs_f64();
|
||||
|
||||
let efficiency = delta_ru / delta_wall;
|
||||
let activate = 0f64.max(efficiency - self.previous) >= threshold;
|
||||
|
||||
if activate {
|
||||
debug!(
|
||||
"Do I activate : {} -> {} = {} Activate: {}",
|
||||
self.previous,
|
||||
efficiency,
|
||||
0f64.max(efficiency - self.previous),
|
||||
activate
|
||||
);
|
||||
self.previous = efficiency;
|
||||
self.user_secs = n.user_secs;
|
||||
self.sys_secs = n.sys_secs;
|
||||
self.wall = n.wall;
|
||||
}
|
||||
|
||||
activate
|
||||
}
|
||||
}
|
||||
|
||||
// ── public API ────────────────────────────────────────────────────────────────
|
||||
@@ -251,33 +297,37 @@ impl CpuSample {
|
||||
#[must_use = "call .stop() to record the stage"]
|
||||
pub struct Stage {
|
||||
label: String,
|
||||
wall: Instant,
|
||||
ru: rusage,
|
||||
wall: Instant,
|
||||
ru: rusage,
|
||||
}
|
||||
|
||||
impl Stage {
|
||||
pub fn start(label: impl Into<String>) -> Self {
|
||||
let label = label.into();
|
||||
info!(stage = %label, "started");
|
||||
Self { label, wall: Instant::now(), ru: get_rusage() }
|
||||
Self {
|
||||
label,
|
||||
wall: Instant::now(),
|
||||
ru: get_rusage(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(self) -> StageStats {
|
||||
let wall_secs = self.wall.elapsed().as_secs_f64();
|
||||
let end = get_rusage();
|
||||
let stats = StageStats {
|
||||
label: self.label,
|
||||
label: self.label,
|
||||
wall_secs,
|
||||
user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime),
|
||||
sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime),
|
||||
user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime),
|
||||
sys_secs: tv_to_secs(end.ru_stime) - tv_to_secs(self.ru.ru_stime),
|
||||
max_rss_bytes: rss_to_bytes(&end),
|
||||
minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64),
|
||||
major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64),
|
||||
vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64),
|
||||
invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64),
|
||||
in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64),
|
||||
out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64),
|
||||
swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64),
|
||||
minor_faults: delta(end.ru_minflt as i64, self.ru.ru_minflt as i64),
|
||||
major_faults: delta(end.ru_majflt as i64, self.ru.ru_majflt as i64),
|
||||
vol_ctx: delta(end.ru_nvcsw as i64, self.ru.ru_nvcsw as i64),
|
||||
invol_ctx: delta(end.ru_nivcsw as i64, self.ru.ru_nivcsw as i64),
|
||||
in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64),
|
||||
out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64),
|
||||
swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64),
|
||||
};
|
||||
info!(
|
||||
stage = %stats.label,
|
||||
@@ -299,27 +349,30 @@ impl Stage {
|
||||
|
||||
/// Per-stage efficiency metrics collected from `getrusage(RUSAGE_SELF)` deltas.
|
||||
pub struct StageStats {
|
||||
pub label: String,
|
||||
pub wall_secs: f64,
|
||||
pub user_secs: f64,
|
||||
pub sys_secs: f64,
|
||||
pub label: String,
|
||||
pub wall_secs: f64,
|
||||
pub user_secs: f64,
|
||||
pub sys_secs: f64,
|
||||
/// Peak RSS at end of stage (bytes). ru_maxrss is a process-lifetime maximum,
|
||||
/// so this reflects the high-water mark up to and including this stage.
|
||||
pub max_rss_bytes: u64,
|
||||
pub minor_faults: u64,
|
||||
pub major_faults: u64,
|
||||
pub vol_ctx: u64, // voluntary context switches
|
||||
pub invol_ctx: u64, // involuntary context switches
|
||||
pub in_blocks: u64, // filesystem block reads (after page cache)
|
||||
pub out_blocks: u64, // filesystem block writes
|
||||
pub swaps: u64,
|
||||
pub minor_faults: u64,
|
||||
pub major_faults: u64,
|
||||
pub vol_ctx: u64, // voluntary context switches
|
||||
pub invol_ctx: u64, // involuntary context switches
|
||||
pub in_blocks: u64, // filesystem block reads (after page cache)
|
||||
pub out_blocks: u64, // filesystem block writes
|
||||
pub swaps: u64,
|
||||
}
|
||||
|
||||
impl StageStats {
|
||||
/// (user + sys) / wall — effective thread count utilisation.
|
||||
pub fn parallelism(&self) -> f64 {
|
||||
if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs }
|
||||
else { 0.0 }
|
||||
if self.wall_secs > 1e-9 {
|
||||
(self.user_secs + self.sys_secs) / self.wall_secs
|
||||
} else {
|
||||
0.0
|
||||
}
|
||||
}
|
||||
|
||||
/// parallelism / n_cores — fraction of available CPU power used (0..1+).
|
||||
@@ -335,25 +388,33 @@ pub struct Reporter {
|
||||
}
|
||||
|
||||
impl Reporter {
|
||||
pub fn new() -> Self { Self::default() }
|
||||
pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); }
|
||||
pub fn stages(&self) -> &[StageStats] { &self.stages }
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
pub fn push(&mut self, stats: StageStats) {
|
||||
self.stages.push(stats);
|
||||
}
|
||||
pub fn stages(&self) -> &[StageStats] {
|
||||
&self.stages
|
||||
}
|
||||
/// Print the summary to stderr.
|
||||
pub fn print(&self) { eprint!("{self}"); }
|
||||
pub fn print(&self) {
|
||||
eprint!("{self}");
|
||||
}
|
||||
}
|
||||
|
||||
// ── diagnosis ─────────────────────────────────────────────────────────────────
|
||||
|
||||
struct Diagnosis {
|
||||
tag: &'static str,
|
||||
tag: &'static str,
|
||||
detail: Option<String>,
|
||||
}
|
||||
|
||||
// Thresholds are intentionally conservative to avoid false positives.
|
||||
fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
|
||||
let eff = s.efficiency(n_cores);
|
||||
let eff = s.efficiency(n_cores);
|
||||
let cpu_pct = eff * 100.0;
|
||||
let io_ops = s.in_blocks + s.out_blocks;
|
||||
let io_ops = s.in_blocks + s.out_blocks;
|
||||
|
||||
// swaps > 0 is the only reliable cross-platform indicator of true RAM exhaustion.
|
||||
// ru_majflt is intentionally excluded: on macOS it counts all file-backed mmap
|
||||
@@ -387,26 +448,43 @@ fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
|
||||
)),
|
||||
};
|
||||
}
|
||||
Diagnosis { tag: "—", detail: None }
|
||||
Diagnosis {
|
||||
tag: "—",
|
||||
detail: None,
|
||||
}
|
||||
}
|
||||
|
||||
// ── display helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
fn fmt_secs(s: f64) -> String {
|
||||
if s >= 100.0 { format!("{:.0}s", s) }
|
||||
else if s >= 10.0 { format!("{:.1}s", s) }
|
||||
else if s >= 1.0 { format!("{:.2}s", s) }
|
||||
else { format!("{:.0}ms", s * 1000.0) }
|
||||
if s >= 100.0 {
|
||||
format!("{:.0}s", s)
|
||||
} else if s >= 10.0 {
|
||||
format!("{:.1}s", s)
|
||||
} else if s >= 1.0 {
|
||||
format!("{:.2}s", s)
|
||||
} else {
|
||||
format!("{:.0}ms", s * 1000.0)
|
||||
}
|
||||
}
|
||||
|
||||
fn fmt_bytes(b: u64) -> String {
|
||||
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
|
||||
else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) }
|
||||
else { format!("{:.0} KB", b as f64 / 1024.0) }
|
||||
if b >= 1 << 30 {
|
||||
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
|
||||
} else if b >= 1 << 20 {
|
||||
format!("{:.0} MB", b as f64 / (1u64 << 20) as f64)
|
||||
} else {
|
||||
format!("{:.0} KB", b as f64 / 1024.0)
|
||||
}
|
||||
}
|
||||
|
||||
fn fmt_efficiency(par: f64, n_cores: usize) -> String {
|
||||
format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0)
|
||||
format!(
|
||||
"{:.1}×/{} ({:.0}%)",
|
||||
par,
|
||||
n_cores,
|
||||
par / n_cores as f64 * 100.0
|
||||
)
|
||||
}
|
||||
|
||||
// ── Display ───────────────────────────────────────────────────────────────────
|
||||
@@ -414,8 +492,8 @@ fn fmt_efficiency(par: f64, n_cores: usize) -> String {
|
||||
// ── MemoryBudget ──────────────────────────────────────────────────────────────
|
||||
|
||||
struct BudgetInner {
|
||||
remaining: u64,
|
||||
active: usize,
|
||||
remaining: u64,
|
||||
active: usize,
|
||||
peak_active: usize,
|
||||
}
|
||||
|
||||
@@ -425,8 +503,8 @@ struct BudgetInner {
|
||||
/// completion. Non-deadlock guarantee: when no worker is active the next
|
||||
/// acquire always succeeds regardless of cost vs. remaining budget.
|
||||
pub struct MemoryBudget {
|
||||
total: u64,
|
||||
inner: Mutex<BudgetInner>,
|
||||
total: u64,
|
||||
inner: Mutex<BudgetInner>,
|
||||
condvar: Condvar,
|
||||
}
|
||||
|
||||
@@ -434,7 +512,11 @@ impl MemoryBudget {
|
||||
pub fn new(total: u64) -> Self {
|
||||
Self {
|
||||
total,
|
||||
inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }),
|
||||
inner: Mutex::new(BudgetInner {
|
||||
remaining: total,
|
||||
active: 0,
|
||||
peak_active: 0,
|
||||
}),
|
||||
condvar: Condvar::new(),
|
||||
}
|
||||
}
|
||||
@@ -443,9 +525,9 @@ impl MemoryBudget {
|
||||
let mut g = self.inner.lock().unwrap();
|
||||
loop {
|
||||
if g.active == 0 || g.remaining >= cost {
|
||||
g.remaining = g.remaining.saturating_sub(cost);
|
||||
g.active += 1;
|
||||
g.peak_active = g.peak_active.max(g.active);
|
||||
g.remaining = g.remaining.saturating_sub(cost);
|
||||
g.active += 1;
|
||||
g.peak_active = g.peak_active.max(g.active);
|
||||
return;
|
||||
}
|
||||
g = self.condvar.wait(g).unwrap();
|
||||
@@ -455,47 +537,66 @@ impl MemoryBudget {
|
||||
pub fn release(&self, cost: u64) {
|
||||
let mut g = self.inner.lock().unwrap();
|
||||
g.remaining = (g.remaining + cost).min(self.total);
|
||||
g.active -= 1;
|
||||
g.active -= 1;
|
||||
self.condvar.notify_all();
|
||||
}
|
||||
|
||||
pub fn total(&self) -> u64 { self.total }
|
||||
pub fn active(&self) -> usize { self.inner.lock().unwrap().active }
|
||||
pub fn remaining(&self) -> u64 { self.inner.lock().unwrap().remaining }
|
||||
pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active }
|
||||
pub fn total(&self) -> u64 {
|
||||
self.total
|
||||
}
|
||||
pub fn active(&self) -> usize {
|
||||
self.inner.lock().unwrap().active
|
||||
}
|
||||
pub fn remaining(&self) -> u64 {
|
||||
self.inner.lock().unwrap().remaining
|
||||
}
|
||||
pub fn peak_active(&self) -> usize {
|
||||
self.inner.lock().unwrap().peak_active
|
||||
}
|
||||
}
|
||||
|
||||
// ── Display ───────────────────────────────────────────────────────────────────
|
||||
|
||||
impl fmt::Display for Reporter {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
if self.stages.is_empty() { return Ok(()); }
|
||||
if self.stages.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let n_cores = std::thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
|
||||
// column widths
|
||||
let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5);
|
||||
let nw = self
|
||||
.stages
|
||||
.iter()
|
||||
.map(|s| s.label.len())
|
||||
.max()
|
||||
.unwrap_or(5)
|
||||
.max(5);
|
||||
// efficiency col: worst-case width for this run's n_cores value
|
||||
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
|
||||
|
||||
let sep_w = nw + 2 + 7 + 2 + ew + 2 + 8 + 2 + 12;
|
||||
let sep = "─".repeat(sep_w);
|
||||
let sep = "─".repeat(sep_w);
|
||||
|
||||
// header
|
||||
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} status",
|
||||
"stage", "wall", "efficiency", "peak RSS")?;
|
||||
writeln!(
|
||||
f,
|
||||
"{:<nw$} {:>7} {:>ew$} {:>8} status",
|
||||
"stage", "wall", "efficiency", "peak RSS"
|
||||
)?;
|
||||
writeln!(f, "{sep}")?;
|
||||
|
||||
// compute all diagnoses up front (needed for both table and footnotes)
|
||||
let diagnoses: Vec<Diagnosis> = self.stages.iter()
|
||||
.map(|s| diagnose(s, n_cores))
|
||||
.collect();
|
||||
let diagnoses: Vec<Diagnosis> = self.stages.iter().map(|s| diagnose(s, n_cores)).collect();
|
||||
|
||||
// per-stage rows
|
||||
for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
|
||||
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} {}",
|
||||
writeln!(
|
||||
f,
|
||||
"{:<nw$} {:>7} {:>ew$} {:>8} {}",
|
||||
s.label,
|
||||
fmt_secs(s.wall_secs),
|
||||
fmt_efficiency(s.parallelism(), n_cores),
|
||||
@@ -505,14 +606,21 @@ impl fmt::Display for Reporter {
|
||||
}
|
||||
|
||||
// totals
|
||||
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
|
||||
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
|
||||
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
|
||||
let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0);
|
||||
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
|
||||
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
|
||||
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
|
||||
let trss = self
|
||||
.stages
|
||||
.iter()
|
||||
.map(|s| s.max_rss_bytes)
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
|
||||
|
||||
writeln!(f, "{sep}")?;
|
||||
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8}",
|
||||
writeln!(
|
||||
f,
|
||||
"{:<nw$} {:>7} {:>ew$} {:>8}",
|
||||
"TOTAL",
|
||||
fmt_secs(tw),
|
||||
fmt_efficiency(tpar, n_cores),
|
||||
@@ -520,7 +628,9 @@ impl fmt::Display for Reporter {
|
||||
)?;
|
||||
|
||||
// bottleneck footnotes (only if at least one anomaly detected)
|
||||
let bottlenecks: Vec<(&str, &str)> = self.stages.iter()
|
||||
let bottlenecks: Vec<(&str, &str)> = self
|
||||
.stages
|
||||
.iter()
|
||||
.zip(diagnoses.iter())
|
||||
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
|
||||
.collect();
|
||||
|
||||
Reference in New Issue
Block a user