From 9f1df96ea730696922eba640e97dc1f4907fee3e Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 22 Jun 2026 16:57:56 +0200 Subject: [PATCH 1/3] ci: restrict push trigger to main branch Replace the wildcard `['**']` in the push trigger with `['main']`. This prevents redundant pipeline runs on non-main branches during push events. --- .gitea/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml index 7c2b524..ae5729f 100644 --- a/.gitea/workflows/ci.yml +++ b/.gitea/workflows/ci.yml @@ -2,7 +2,7 @@ name: CI on: push: - branches: ['**'] + branches: ['main'] pull_request: jobs: From 469e53b6f571495897ee1be78d3cc979e07c6018 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 22 Jun 2026 17:28:48 +0200 Subject: [PATCH 2/3] Add genomic distance benchmarking suite and test data Introduces scripts to compute and validate pairwise genomic distance matrices across multiple metrics. Updates the Makefile with build and comparison targets, adds .gitignore rules for generated outputs, and includes test CSV matrices and a Newick phylogenetic tree for validating the distance computation pipeline. --- .gitignore | 2 + benchmark/Makefile | 90 +++++++++++- benchmark/build_reference_dist.py | 226 ++++++++++++++++++++++++++++++ benchmark/compare_all_dist.py | 182 ++++++++++++++++++++++++ benchmark/test_dist.csv | 21 +++ benchmark/test_nj.nwk | 1 + benchmark/test_shared.csv | 21 +++ 7 files changed, 541 insertions(+), 2 deletions(-) create mode 100755 benchmark/build_reference_dist.py create mode 100755 benchmark/compare_all_dist.py create mode 100644 benchmark/test_dist.csv create mode 100644 benchmark/test_nj.nwk create mode 100644 benchmark/test_shared.csv diff --git a/.gitignore b/.gitignore index ec94743..e63f216 100644 --- a/.gitignore +++ b/.gitignore @@ -17,5 +17,7 @@ benchmark/global_index_presence benchmark/global_index_count benchmark/stats benchmark/reference_index +benchmark/reference_dist +benchmark/obikmer_dist benchmark/specific_index_count benchmark/specific_index_presence diff --git a/benchmark/Makefile b/benchmark/Makefile index 5654ecc..14a3358 100644 --- a/benchmark/Makefile +++ b/benchmark/Makefile @@ -10,6 +10,23 @@ GENOMES := $(wildcard genomes/*.fna.gz) -include deps.mk REF_NPZS := $(SPECIMENS:%=reference_index/%.npz) +REF_DIST_CSVS := $(addprefix reference_dist/, \ + shared_kmers.csv hamming_dist.csv jaccard_dist.csv \ + bray_curtis_dist.csv relfreq_bray_curtis_dist.csv \ + euclidean_dist.csv relfreq_euclidean_dist.csv \ + hellinger_dist.csv hellinger_euclidean_dist.csv) +OBIKMER_PRESENCE_DIST := $(addprefix obikmer_dist/presence/, \ + jaccard_dist.csv jaccard_shared.csv jaccard_nj.nwk \ + hamming_dist.csv hamming_nj.nwk) +OBIKMER_COUNT_DIST := $(addprefix obikmer_dist/count/, \ + jaccard_dist.csv jaccard_shared.csv jaccard_nj.nwk \ + bray_curtis_dist.csv bray_curtis_nj.nwk \ + relfreq_bray_curtis_dist.csv relfreq_bray_curtis_nj.nwk \ + euclidean_dist.csv euclidean_nj.nwk \ + relfreq_euclidean_dist.csv relfreq_euclidean_nj.nwk \ + hellinger_dist.csv hellinger_nj.nwk \ + hellinger_euclidean_dist.csv hellinger_euclidean_nj.nwk) +DIST_COMPARISON := stats/dist_comparison/summary.csv PRESENCE_DONE := $(SPECIMENS:%=specimen_index_presence/%/index.done) PRESENCE_STATS := $(SPECIMENS:%=stats/indexing_presence/%.stats) COUNT_DONE := $(SPECIMENS:%=specimen_index_count/%/index.done) @@ -24,7 +41,9 @@ SIMULATED_READS := $(foreach s,$(SPECIMENS),simulated_data/$(subst --,/,$s)/read .NOTPARALLEL: -.PHONY: all simulate reference \ +.PHONY: all simulate reference reference_dist \ + obikmer_dist obikmer_dist_presence obikmer_dist_count \ + dist_comparison \ index_presence index_count \ aggregate_index_presence aggregate_index_count \ merge_presence merge_count \ @@ -39,7 +58,8 @@ verify_merge_count: stats/verify_merge_count/current.csv all: aggregate_verify_presence aggregate_verify_count \ verify_merge_presence verify_merge_count \ - aggregate_filter_presence aggregate_filter_count + aggregate_filter_presence aggregate_filter_count \ + dist_comparison # ── dependency file ─────────────────────────────────────────────────────────── @@ -62,6 +82,72 @@ reference_index/%.npz: reference: $(REF_NPZS) +# ── reference distance matrices ─────────────────────────────────────────────── + +$(REF_DIST_CSVS) &: $(REF_NPZS) build_reference_dist.py + $(VENV_PY) build_reference_dist.py + +reference_dist: $(REF_DIST_CSVS) + +# ── obikmer distance (presence index) ──────────────────────────────────────── + +$(OBIKMER_PRESENCE_DIST) &: global_index_presence/index.done $(BINARY) + mkdir -p obikmer_dist/presence + $(BINARY) distance \ + --output obikmer_dist/presence/jaccard \ + --metric jaccard --shared-kmers --nj \ + global_index_presence + $(BINARY) distance \ + --output obikmer_dist/presence/hamming \ + --metric hamming --nj \ + global_index_presence + +obikmer_dist_presence: $(OBIKMER_PRESENCE_DIST) + +# ── obikmer distance (count index) ─────────────────────────────────────────── + +$(OBIKMER_COUNT_DIST) &: global_index_count/index.done $(BINARY) + mkdir -p obikmer_dist/count + $(BINARY) distance \ + --output obikmer_dist/count/jaccard \ + --metric jaccard --shared-kmers --nj \ + global_index_count + $(BINARY) distance \ + --output obikmer_dist/count/bray_curtis \ + --metric bray-curtis --nj \ + global_index_count + $(BINARY) distance \ + --output obikmer_dist/count/relfreq_bray_curtis \ + --metric relfreq-bray-curtis --nj \ + global_index_count + $(BINARY) distance \ + --output obikmer_dist/count/euclidean \ + --metric euclidean --nj \ + global_index_count + $(BINARY) distance \ + --output obikmer_dist/count/relfreq_euclidean \ + --metric relfreq-euclidean --nj \ + global_index_count + $(BINARY) distance \ + --output obikmer_dist/count/hellinger \ + --metric hellinger --nj \ + global_index_count + $(BINARY) distance \ + --output obikmer_dist/count/hellinger_euclidean \ + --metric hellinger-euclidean --nj \ + global_index_count + +obikmer_dist_count: $(OBIKMER_COUNT_DIST) + +obikmer_dist: obikmer_dist_presence obikmer_dist_count + +# ── distance comparison ─────────────────────────────────────────────────────── + +$(DIST_COMPARISON): $(REF_DIST_CSVS) $(OBIKMER_PRESENCE_DIST) $(OBIKMER_COUNT_DIST) compare_all_dist.py + $(VENV_PY) compare_all_dist.py --out $(DIST_COMPARISON) + +dist_comparison: $(DIST_COMPARISON) + # ── per-specimen indexing ───────────────────────────────────────────────────── # Prerequisites (reads → index.done + .stats) are in deps.mk. diff --git a/benchmark/build_reference_dist.py b/benchmark/build_reference_dist.py new file mode 100755 index 0000000..edee858 --- /dev/null +++ b/benchmark/build_reference_dist.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +"""Compute reference pairwise distance matrices from per-specimen .npz kmer indexes. + +Reads all .npz files in reference_index/ (each containing sorted uint64 `kmers` +and uint32 `counts`), computes all distance metrics supported by `obikmer distance`, +and writes one CSV per metric to reference_dist/. + +Output CSV format matches `obikmer distance --output`: + - first row: "genome", then specimen names + - subsequent rows: specimen name, then float or int values + +Metrics written + jaccard_dist.csv Jaccard distance (presence/absence) + shared_kmers.csv Shared-kmer count matrix (intersection size) + bray_curtis_dist.csv Bray-Curtis dissimilarity (raw counts) + relfreq_bray_curtis_dist.csv Bray-Curtis on relative frequencies + euclidean_dist.csv Euclidean distance (raw counts) + relfreq_euclidean_dist.csv Euclidean distance on relative frequencies + hellinger_dist.csv Hellinger distance + hellinger_euclidean_dist.csv Euclidean distance in Hellinger space +""" +import argparse +import sys +from pathlib import Path + +import numpy as np + + +# ── pairwise helpers ────────────────────────────────────────────────────────── + +def shared_indices(a_kmers: np.ndarray, b_kmers: np.ndarray): + """Return index arrays (idx_a, idx_b) for kmers present in both sets. + + Both arrays must be sorted uint64. Uses searchsorted: O(|B| log |A|). + """ + pos = np.searchsorted(a_kmers, b_kmers) + pos = np.clip(pos, 0, len(a_kmers) - 1) + mask = a_kmers[pos] == b_kmers + idx_b = np.where(mask)[0] + idx_a = pos[idx_b] + return idx_a, idx_b + + +def pairwise_stats(specimens: list[dict]) -> dict[str, np.ndarray]: + """Compute all pairwise distance matrices at once. + + Returns a dict metric_name → ndarray (n×n float64 or int64). + Each specimen dict has keys: name, kmers, counts. + """ + n = len(specimens) + + # Pre-compute per-specimen scalars + kmer_counts = np.array([len(s['kmers']) for s in specimens], dtype=np.uint64) + count_sums = np.array([s['counts'].sum() for s in specimens], dtype=np.uint64) + + # Per-specimen sum-of-squares (for Euclidean decomposition) + sq_sums = np.array([(s['counts'].astype(np.float64) ** 2).sum() for s in specimens]) + + # Allocate output matrices + shared_mat = np.zeros((n, n), dtype=np.uint64) + hamming_mat = np.zeros((n, n), dtype=np.float64) + jaccard_mat = np.zeros((n, n), dtype=np.float64) + bray_mat = np.zeros((n, n), dtype=np.float64) + relfreq_bray = np.zeros((n, n), dtype=np.float64) + euclidean_mat = np.zeros((n, n), dtype=np.float64) + relfreq_eucl = np.zeros((n, n), dtype=np.float64) + hellinger_mat = np.zeros((n, n), dtype=np.float64) + hell_eucl_mat = np.zeros((n, n), dtype=np.float64) + + for i in range(n): + a_km = specimens[i]['kmers'] + a_ct = specimens[i]['counts'].astype(np.float64) + sa = float(count_sums[i]) + na = int(kmer_counts[i]) + + for j in range(i + 1, n): + b_km = specimens[j]['kmers'] + b_ct = specimens[j]['counts'].astype(np.float64) + sb = float(count_sums[j]) + nb = int(kmer_counts[j]) + + idx_a, idx_b = shared_indices(a_km, b_km) + inter = len(idx_a) + + ca_sh = a_ct[idx_a] + cb_sh = b_ct[idx_b] + + # ── Presence metrics ────────────────────────────────────────────── + + union = na + nb - inter + jac = (1.0 - inter / union) if union else 0.0 + hamming = float(na + nb - 2 * inter) # |A Δ B| + + # ── Count metrics ───────────────────────────────────────────────── + + # Bray-Curtis: 1 - 2*Σmin(a,b) / (Σa + Σb) + sum_min = np.minimum(ca_sh, cb_sh).sum() + denom_bc = sa + sb + bc = (1.0 - 2.0 * sum_min / denom_bc) if denom_bc else 0.0 + + # RelfreqBray: 1 - Σmin(a/sa, b/sb) [only shared contribute] + if sa and sb: + rfb = 1.0 - np.minimum(ca_sh / sa, cb_sh / sb).sum() + else: + rfb = 0.0 + + # Euclidean: √(Σa² + Σb² - 2·Σ(a·b)_shared) + cross = (ca_sh * cb_sh).sum() + eucl_partial = sq_sums[i] + sq_sums[j] - 2.0 * cross + eucl = np.sqrt(max(eucl_partial, 0.0)) + + # RelfreqEuclidean: √(Σ(a/sa - b/sb)²) + # = √(Σa²/sa² + Σb²/sb² - 2·Σ(a·b)_shared/(sa·sb)) + if sa and sb: + rf_cross = (ca_sh / sa * (cb_sh / sb)).sum() + rfe_partial = (sq_sums[i] / sa**2 + + sq_sums[j] / sb**2 + - 2.0 * rf_cross) + rfe = np.sqrt(max(rfe_partial, 0.0)) + else: + rfe = 0.0 + + # Hellinger partial: Σ(√(a/sa) - √(b/sb))² over global universe + # = 2 - 2·Σ√(a·b)_shared / √(sa·sb) + if sa and sb: + bc_coeff = np.sqrt(ca_sh * cb_sh).sum() / np.sqrt(sa * sb) + hell_partial = max(2.0 - 2.0 * bc_coeff, 0.0) + else: + hell_partial = 0.0 + + sq2 = np.sqrt(2.0) + hell = np.sqrt(hell_partial) / sq2 + hell_euc = np.sqrt(hell_partial) + + # ── Fill symmetric matrices ─────────────────────────────────────── + for mat, val in [ + (shared_mat, inter), + (hamming_mat, hamming), + (jaccard_mat, jac), + (bray_mat, bc), + (relfreq_bray, rfb), + (euclidean_mat, eucl), + (relfreq_eucl, rfe), + (hellinger_mat, hell), + (hell_eucl_mat, hell_euc), + ]: + mat[i, j] = val + mat[j, i] = val + + return { + 'shared_kmers': shared_mat, + 'hamming_dist': hamming_mat, + 'jaccard_dist': jaccard_mat, + 'bray_curtis_dist': bray_mat, + 'relfreq_bray_curtis_dist': relfreq_bray, + 'euclidean_dist': euclidean_mat, + 'relfreq_euclidean_dist': relfreq_eucl, + 'hellinger_dist': hellinger_mat, + 'hellinger_euclidean_dist': hell_eucl_mat, + } + + +# ── I/O ─────────────────────────────────────────────────────────────────────── + +def write_csv(path: Path, labels: list[str], mat: np.ndarray, fmt: str) -> None: + with path.open('w') as fh: + fh.write('genome,' + ','.join(labels) + '\n') + for i, label in enumerate(labels): + row = ','.join(format(mat[i, j], fmt) for j in range(len(labels))) + fh.write(f'{label},{row}\n') + print(f' → {path}', file=sys.stderr) + + +# ── main ───────────────────────────────────────────────────────────────────── + +def main() -> None: + ap = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument('--ref-dir', default='reference_index', + help='Directory with per-specimen .npz files (default: reference_index)') + ap.add_argument('--out-dir', default='reference_dist', + help='Output directory for CSV files (default: reference_dist)') + args = ap.parse_args() + + ref_dir = Path(args.ref_dir) + out_dir = Path(args.out_dir) + out_dir.mkdir(exist_ok=True) + + npz_files = sorted(ref_dir.glob('*.npz')) + if not npz_files: + print(f'ERROR: no .npz files found in {ref_dir}', file=sys.stderr) + sys.exit(1) + + print(f'Loading {len(npz_files)} specimen(s) from {ref_dir}/', file=sys.stderr) + specimens = [] + for f in npz_files: + data = np.load(f) + specimens.append({ + 'name': f.stem, + 'kmers': data['kmers'], + 'counts': data['counts'], + }) + print(f' {f.stem}: {len(data["kmers"]):,} kmers', file=sys.stderr) + + labels = [s['name'] for s in specimens] + n = len(labels) + print(f'\nComputing pairwise distances for {n} specimens…', file=sys.stderr) + + matrices = pairwise_stats(specimens) + + print(f'\nWriting CSVs to {out_dir}/', file=sys.stderr) + write_csv(out_dir / 'shared_kmers.csv', labels, matrices['shared_kmers'], 'd') + write_csv(out_dir / 'hamming_dist.csv', labels, matrices['hamming_dist'], '.6f') + write_csv(out_dir / 'jaccard_dist.csv', labels, matrices['jaccard_dist'], '.6f') + write_csv(out_dir / 'bray_curtis_dist.csv', labels, matrices['bray_curtis_dist'], '.6f') + write_csv(out_dir / 'relfreq_bray_curtis_dist.csv', labels, matrices['relfreq_bray_curtis_dist'], '.6f') + write_csv(out_dir / 'euclidean_dist.csv', labels, matrices['euclidean_dist'], '.6f') + write_csv(out_dir / 'relfreq_euclidean_dist.csv', labels, matrices['relfreq_euclidean_dist'], '.6f') + write_csv(out_dir / 'hellinger_dist.csv', labels, matrices['hellinger_dist'], '.6f') + write_csv(out_dir / 'hellinger_euclidean_dist.csv', labels, matrices['hellinger_euclidean_dist'], '.6f') + + print('\nDone.', file=sys.stderr) + + +if __name__ == '__main__': + main() diff --git a/benchmark/compare_all_dist.py b/benchmark/compare_all_dist.py new file mode 100755 index 0000000..f34ddc6 --- /dev/null +++ b/benchmark/compare_all_dist.py @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 +"""Compare all reference distance matrices against obikmer distance outputs. + +Reads from: + reference_dist/ — ground-truth matrices computed by build_reference_dist.py + obikmer_dist/ — matrices produced by `obikmer distance` + +Handles label reordering: both matrices are sorted by genome label before +element-wise comparison, so column/row order differences are irrelevant. + +Output: stats/dist_comparison/summary.csv + comparison,max_abs,mean_abs,rmse,n_pairs,status +""" +import csv +import sys +from pathlib import Path + +import numpy as np + + +# ── CSV loading ─────────────────────────────────────────────────────────────── + +def load_matrix(path: Path) -> tuple[list[str], np.ndarray]: + """Load a distance-matrix CSV; return (sorted_labels, matrix_float64).""" + with path.open() as fh: + reader = csv.reader(fh) + header = next(reader)[1:] # skip 'genome' column + raw: dict[str, list[float]] = {} + for row in reader: + raw[row[0]] = [float(x) for x in row[1:]] + + label_to_col = {h: i for i, h in enumerate(header)} + labels = sorted(raw.keys()) + n = len(labels) + mat = np.zeros((n, n), dtype=np.float64) + for i, ri in enumerate(labels): + for j, cj in enumerate(labels): + mat[i, j] = raw[ri][label_to_col[cj]] + return labels, mat + + +# ── comparison ──────────────────────────────────────────────────────────────── + +def compare(label: str, + ref_path: Path, + obi_path: Path, + tol: float = 1e-4) -> dict: + if not ref_path.exists(): + return {'comparison': label, 'status': 'REF_MISSING', + 'max_abs': '', 'mean_abs': '', 'rmse': '', 'n_pairs': ''} + if not obi_path.exists(): + return {'comparison': label, 'status': 'OBI_MISSING', + 'max_abs': '', 'mean_abs': '', 'rmse': '', 'n_pairs': ''} + + ref_labels, ref_mat = load_matrix(ref_path) + obi_labels, obi_mat = load_matrix(obi_path) + + if ref_labels != obi_labels: + only_ref = sorted(set(ref_labels) - set(obi_labels)) + only_obi = sorted(set(obi_labels) - set(ref_labels)) + print(f' [{label}] label mismatch — ' + f'only_ref={only_ref} only_obi={only_obi}', file=sys.stderr) + return {'comparison': label, 'status': 'LABEL_MISMATCH', + 'max_abs': '', 'mean_abs': '', 'rmse': '', 'n_pairs': ''} + + n = len(ref_labels) + # Off-diagonal mask + mask = ~np.eye(n, dtype=bool) + diff = np.abs(ref_mat[mask] - obi_mat[mask]) + n_pairs = diff.size + + max_abs = float(diff.max()) + mean_abs = float(diff.mean()) + rmse = float(np.sqrt((diff ** 2).mean())) + status = 'PASS' if max_abs <= tol else 'FAIL' + + print(f' [{label}] n={n_pairs} ' + f'max={max_abs:.3e} mean={mean_abs:.3e} rmse={rmse:.3e} {status}', + file=sys.stderr) + return { + 'comparison': label, + 'max_abs': f'{max_abs:.6e}', + 'mean_abs': f'{mean_abs:.6e}', + 'rmse': f'{rmse:.6e}', + 'n_pairs': str(n_pairs), + 'status': status, + } + + +# ── comparison table ────────────────────────────────────────────────────────── + +# (label, ref_csv, obikmer_csv) +# The reference jaccard/shared is presence-based, which should match both +# presence/jaccard and count/jaccard (threshold=1). +COMPARISONS = [ + # ── presence index ──────────────────────────────────────────────────────── + ('presence/jaccard_dist', + 'reference_dist/jaccard_dist.csv', + 'obikmer_dist/presence/jaccard_dist.csv'), + + ('presence/jaccard_shared', + 'reference_dist/shared_kmers.csv', + 'obikmer_dist/presence/jaccard_shared.csv'), + + ('presence/hamming_dist', + 'reference_dist/hamming_dist.csv', + 'obikmer_dist/presence/hamming_dist.csv'), + + # ── count index (jaccard cross-check) ───────────────────────────────────── + ('count/jaccard_dist', + 'reference_dist/jaccard_dist.csv', + 'obikmer_dist/count/jaccard_dist.csv'), + + ('count/jaccard_shared', + 'reference_dist/shared_kmers.csv', + 'obikmer_dist/count/jaccard_shared.csv'), + + # ── count index (count-based metrics) ──────────────────────────────────── + ('count/bray_curtis_dist', + 'reference_dist/bray_curtis_dist.csv', + 'obikmer_dist/count/bray_curtis_dist.csv'), + + ('count/relfreq_bray_curtis_dist', + 'reference_dist/relfreq_bray_curtis_dist.csv', + 'obikmer_dist/count/relfreq_bray_curtis_dist.csv'), + + ('count/euclidean_dist', + 'reference_dist/euclidean_dist.csv', + 'obikmer_dist/count/euclidean_dist.csv'), + + ('count/relfreq_euclidean_dist', + 'reference_dist/relfreq_euclidean_dist.csv', + 'obikmer_dist/count/relfreq_euclidean_dist.csv'), + + ('count/hellinger_dist', + 'reference_dist/hellinger_dist.csv', + 'obikmer_dist/count/hellinger_dist.csv'), + + ('count/hellinger_euclidean_dist', + 'reference_dist/hellinger_euclidean_dist.csv', + 'obikmer_dist/count/hellinger_euclidean_dist.csv'), +] + + +# ── main ───────────────────────────────────────────────────────────────────── + +def main() -> None: + import argparse + ap = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument('--tol', type=float, default=1e-4, + help='Max abs diff threshold for PASS/FAIL (default 1e-4)') + ap.add_argument('--out', default='stats/dist_comparison/summary.csv', + help='Output summary CSV path') + args = ap.parse_args() + + out_path = Path(args.out) + out_path.parent.mkdir(parents=True, exist_ok=True) + + print(f'Comparing {len(COMPARISONS)} matrix pairs…', file=sys.stderr) + rows = [] + for label, ref, obi in COMPARISONS: + rows.append(compare(label, Path(ref), Path(obi), tol=args.tol)) + + fields = ['comparison', 'max_abs', 'mean_abs', 'rmse', 'n_pairs', 'status'] + with out_path.open('w', newline='') as fh: + w = csv.DictWriter(fh, fieldnames=fields) + w.writeheader() + w.writerows(rows) + + print(f'\n→ {out_path}', file=sys.stderr) + + n_fail = sum(1 for r in rows if r.get('status') == 'FAIL') + n_pass = sum(1 for r in rows if r.get('status') == 'PASS') + print(f'Summary: {n_pass} PASS {n_fail} FAIL ' + f'{len(rows) - n_pass - n_fail} SKIP', file=sys.stderr) + if n_fail: + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/benchmark/test_dist.csv b/benchmark/test_dist.csv new file mode 100644 index 0000000..960fc1f --- /dev/null +++ b/benchmark/test_dist.csv @@ -0,0 +1,21 @@ +genome,Candidozyma_auris--GCF_003013715.1_ASM301371v2,Acidobacterium_capsulatum--ATCC_51196,Bacillus_subtilis--168,Escherichia_coli--CFT073,Escherichia_coli--EDL933,Escherichia_coli--K-12_MG1655,Escherichia_coli--K-12_W3110,Klebsiella_pneumoniae--ATCC_13883,Klebsiella_pneumoniae--HS11286,Klebsiella_pneumoniae--MGH_78578,Opitutus_terrae--PB90-1,Proteus_mirabilis--HI4320,Saccharolobus_islandicus--M.16.4,Salmonella_enterica--AKU_12601,Salmonella_enterica--CT18,Salmonella_enterica--LT2,Salmonella_enterica--P125109,Shouchella_clausii--KSM-K16,Wolbachia_endosymbiont--GCF_000306885.1_ASM30688v1,Yersinia_ruckeri--YRB +Candidozyma_auris--GCF_003013715.1_ASM301371v2,0.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000 +Acidobacterium_capsulatum--ATCC_51196,1.000000,0.000000,0.999981,0.999990,0.999989,0.999987,0.999987,0.999990,0.999988,0.999988,0.999994,0.999989,1.000000,0.999988,0.999987,0.999987,0.999988,0.999989,0.999991,0.999987 +Bacillus_subtilis--168,1.000000,0.999981,0.000000,0.999990,0.999989,0.999989,0.999989,0.999989,0.999988,0.999986,0.999995,0.999985,0.999999,0.999988,0.999987,0.999989,0.999988,0.999778,0.999993,0.999987 +Escherichia_coli--CFT073,1.000000,0.999990,0.999990,0.000000,0.825741,0.807495,0.807218,0.991156,0.996855,0.997849,0.999996,0.999633,1.000000,0.993885,0.996736,0.994148,0.993821,0.999991,0.999984,0.999291 +Escherichia_coli--EDL933,1.000000,0.999989,0.999989,0.825741,0.000000,0.735107,0.734775,0.996126,0.998058,0.997908,0.999997,0.999640,1.000000,0.993993,0.997126,0.994390,0.994059,0.999991,0.999986,0.999292 +Escherichia_coli--K-12_MG1655,1.000000,0.999987,0.999989,0.807495,0.735107,0.000000,0.382567,0.996190,0.997747,0.997455,0.999996,0.999604,1.000000,0.993444,0.996645,0.993773,0.993431,0.999989,0.999984,0.999174 +Escherichia_coli--K-12_W3110,1.000000,0.999987,0.999989,0.807218,0.734775,0.382567,0.000000,0.996220,0.997761,0.997467,0.999995,0.999604,1.000000,0.993445,0.996669,0.993769,0.993443,0.999990,0.999985,0.999165 +Klebsiella_pneumoniae--ATCC_13883,1.000000,0.999990,0.999989,0.991156,0.996126,0.996190,0.996220,0.000000,0.845220,0.840545,0.999997,0.999648,1.000000,0.996177,0.998128,0.996268,0.996052,0.999990,0.999987,0.999325 +Klebsiella_pneumoniae--HS11286,1.000000,0.999988,0.999988,0.996855,0.998058,0.997747,0.997761,0.845220,0.000000,0.906475,0.999996,0.999683,1.000000,0.997724,0.995697,0.997776,0.997769,0.999989,0.999979,0.999463 +Klebsiella_pneumoniae--MGH_78578,1.000000,0.999988,0.999986,0.997849,0.997908,0.997455,0.997467,0.840545,0.906475,0.000000,0.999996,0.999704,1.000000,0.997928,0.995054,0.997844,0.997868,0.999990,0.999980,0.999479 +Opitutus_terrae--PB90-1,1.000000,0.999994,0.999995,0.999996,0.999997,0.999996,0.999995,0.999997,0.999996,0.999996,0.000000,0.999997,0.999998,0.999996,0.999996,0.999996,0.999995,0.999997,0.999993,0.999996 +Proteus_mirabilis--HI4320,1.000000,0.999989,0.999985,0.999633,0.999640,0.999604,0.999604,0.999648,0.999683,0.999704,0.999997,0.000000,1.000000,0.999604,0.999699,0.999622,0.999613,0.999987,0.999983,0.999505 +Saccharolobus_islandicus--M.16.4,1.000000,1.000000,0.999999,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,0.999998,1.000000,0.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000,1.000000 +Salmonella_enterica--AKU_12601,1.000000,0.999988,0.999988,0.993885,0.993993,0.993444,0.993445,0.996177,0.997724,0.997928,0.999996,0.999604,1.000000,0.000000,0.869238,0.682277,0.663383,0.999990,0.999985,0.999260 +Salmonella_enterica--CT18,1.000000,0.999987,0.999987,0.996736,0.997126,0.996645,0.996669,0.998128,0.995697,0.995054,0.999996,0.999699,1.000000,0.869238,0.000000,0.890872,0.886148,0.999988,0.999976,0.999524 +Salmonella_enterica--LT2,1.000000,0.999987,0.999989,0.994148,0.994390,0.993773,0.993769,0.996268,0.997776,0.997844,0.999996,0.999622,1.000000,0.682277,0.890872,0.000000,0.622606,0.999989,0.999985,0.999296 +Salmonella_enterica--P125109,1.000000,0.999988,0.999988,0.993821,0.994059,0.993431,0.993443,0.996052,0.997769,0.997868,0.999995,0.999613,1.000000,0.663383,0.886148,0.622606,0.000000,0.999988,0.999983,0.999270 +Shouchella_clausii--KSM-K16,1.000000,0.999989,0.999778,0.999991,0.999991,0.999989,0.999990,0.999990,0.999989,0.999990,0.999997,0.999987,1.000000,0.999990,0.999988,0.999989,0.999988,0.000000,0.999991,0.999988 +Wolbachia_endosymbiont--GCF_000306885.1_ASM30688v1,1.000000,0.999991,0.999993,0.999984,0.999986,0.999984,0.999985,0.999987,0.999979,0.999980,0.999993,0.999983,1.000000,0.999985,0.999976,0.999985,0.999983,0.999991,0.000000,0.999983 +Yersinia_ruckeri--YRB,1.000000,0.999987,0.999987,0.999291,0.999292,0.999174,0.999165,0.999325,0.999463,0.999479,0.999996,0.999505,1.000000,0.999260,0.999524,0.999296,0.999270,0.999988,0.999983,0.000000 diff --git a/benchmark/test_nj.nwk b/benchmark/test_nj.nwk new file mode 100644 index 0000000..124abba --- /dev/null +++ b/benchmark/test_nj.nwk @@ -0,0 +1 @@ +(((((((((((Candidozyma_auris--GCF_003013715.1_ASM301371v2:0.5000001881725941,Saccharolobus_islandicus--M.16.4:0.4999993211600824):0.0000023411501775538747,Opitutus_terrae--PB90-1:0.499997075187947):0.0000029791191795691675,(Acidobacterium_capsulatum--ATCC_51196:0.49999227771334689,(Bacillus_subtilis--168:0.49988797935621456,Shouchella_clausii--KSM-K16:0.49988984146059159):0.0001037210285571577):0.0000023959836053522034):0.0000034093646568700288,Wolbachia_endosymbiont--GCF_000306885.1_ASM30688v1:0.4999920159222422):0.000199555100890203,Proteus_mirabilis--HI4320:0.49979129185300427):0.00010103619067070024,Yersinia_ruckeri--YRB:0.4996806650749249):0.0013719139155004,(Klebsiella_pneumoniae--HS11286:0.43798845051648258,(Klebsiella_pneumoniae--ATCC_13883:0.41780293826821265,Klebsiella_pneumoniae--MGH_78578:0.42274184870836559):0.017586732339732737):0.0604124197073832):0.0006482538063555254,(Salmonella_enterica--CT18:0.43952894448143017,(Salmonella_enterica--AKU_12601:0.3357977326267918,(Salmonella_enterica--LT2:0.31203395843666389,Salmonella_enterica--P125109:0.31057217324861216):0.025729515856701136):0.10292985918524672):0.05825411485542886):0.08937928015651564,Escherichia_coli--CFT073:0.40806501650701029):0.0410131211869626,Escherichia_coli--EDL933:0.3681464750911808):0.1755112579711463,Escherichia_coli--K-12_MG1655:0.19129818036662728,Escherichia_coli--K-12_W3110:0.19126872019906239); \ No newline at end of file diff --git a/benchmark/test_shared.csv b/benchmark/test_shared.csv new file mode 100644 index 0000000..1fbc20d --- /dev/null +++ b/benchmark/test_shared.csv @@ -0,0 +1,21 @@ +genome,Candidozyma_auris--GCF_003013715.1_ASM301371v2,Acidobacterium_capsulatum--ATCC_51196,Bacillus_subtilis--168,Escherichia_coli--CFT073,Escherichia_coli--EDL933,Escherichia_coli--K-12_MG1655,Escherichia_coli--K-12_W3110,Klebsiella_pneumoniae--ATCC_13883,Klebsiella_pneumoniae--HS11286,Klebsiella_pneumoniae--MGH_78578,Opitutus_terrae--PB90-1,Proteus_mirabilis--HI4320,Saccharolobus_islandicus--M.16.4,Salmonella_enterica--AKU_12601,Salmonella_enterica--CT18,Salmonella_enterica--LT2,Salmonella_enterica--P125109,Shouchella_clausii--KSM-K16,Wolbachia_endosymbiont--GCF_000306885.1_ASM30688v1,Yersinia_ruckeri--YRB +Candidozyma_auris--GCF_003013715.1_ASM301371v2,0,0,0,0,0,0,0,0,0,0,0,0,8,0,1,0,0,0,0,3 +Acidobacterium_capsulatum--ATCC_51196,0,0,203,119,128,141,140,116,109,111,78,112,0,136,109,147,134,117,55,129 +Bacillus_subtilis--168,0,203,0,124,132,128,123,133,109,130,66,158,6,131,112,124,135,2393,46,124 +Escherichia_coli--CFT073,0,119,124,0,1966777,1998059,1999094,117743,32029,22312,63,4225,0,74946,31918,73311,76585,113,128,7854 +Escherichia_coli--EDL933,0,128,132,1966777,0,2627885,2628700,52488,20134,22064,48,4202,0,74655,28602,71244,74665,112,108,7963 +Escherichia_coli--K-12_MG1655,0,141,128,1998059,2627885,0,4452541,48302,21382,24602,47,4277,0,75729,30449,73622,76778,119,111,8566 +Escherichia_coli--K-12_W3110,0,140,123,1999094,2628700,4452541,0,47894,21226,24470,68,4278,0,75658,30207,73614,76583,112,108,8660 +Klebsiella_pneumoniae--ATCC_13883,0,116,133,117743,52488,48302,47894,0,1416091,1477759,42,4172,0,48296,18988,48144,50416,120,106,7712 +Klebsiella_pneumoniae--HS11286,0,109,109,32029,20134,21382,21226,1416091,0,644063,42,2738,0,21498,29758,21606,21376,99,102,4417 +Klebsiella_pneumoniae--MGH_78578,0,111,130,22312,22064,24602,24470,1477759,644063,0,42,2614,0,19948,35067,21330,20813,97,102,4374 +Opitutus_terrae--PB90-1,0,78,66,63,48,47,68,42,42,42,0,43,18,57,42,53,66,39,58,43 +Proteus_mirabilis--HI4320,0,112,158,4225,4202,4277,4278,4172,2738,2614,43,0,0,4254,2481,4166,4215,131,103,4704 +Saccharolobus_islandicus--M.16.4,8,0,6,0,0,0,0,0,0,0,18,0,0,0,0,0,0,0,0,0 +Salmonella_enterica--AKU_12601,0,136,131,74946,74655,75729,75658,48296,21498,19948,57,4254,0,0,1047731,2857146,2951421,117,108,7643 +Salmonella_enterica--CT18,1,109,112,31918,28602,30449,30207,18988,29758,35067,42,2481,0,1047731,0,917948,940297,106,106,3716 +Salmonella_enterica--LT2,0,147,124,73311,71244,73622,73614,48144,21606,21330,53,4166,0,2857146,917948,0,3284800,122,108,7460 +Salmonella_enterica--P125109,0,134,135,76585,74665,76778,76583,50416,21376,20813,66,4215,0,2951421,940297,3284800,0,134,124,7645 +Shouchella_clausii--KSM-K16,0,117,2393,113,112,119,112,120,99,97,39,131,0,117,106,122,134,0,58,124 +Wolbachia_endosymbiont--GCF_000306885.1_ASM30688v1,0,55,46,128,108,111,108,106,102,102,58,103,0,108,106,108,124,58,0,96 +Yersinia_ruckeri--YRB,3,129,124,7854,7963,8566,8660,7712,4417,4374,43,4704,0,7643,3716,7460,7645,124,96,0 From ac3ef106e7ce65df61230d2e5506bdbb634a0c5a Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 22 Jun 2026 18:04:56 +0200 Subject: [PATCH 3/3] refactor: implement adaptive worker scaling and infallible NUMA build Replaces the fallible NUMA topology builder with an infallible fallback that synthesizes a single-node UMA configuration on failure or absence. Refactors PartitionRunner to pre-spawn dormant workers and dynamically activate them via CPU efficiency thresholds, replacing static upfront spawning with adaptive scaling. Bumps obikmer crate version to 1.1.15. --- src/Cargo.lock | 2 +- src/obikindex/src/numa.rs | 314 +++++++++++++++++++++++++------------- src/obikmer/Cargo.toml | 2 +- 3 files changed, 214 insertions(+), 104 deletions(-) diff --git a/src/Cargo.lock b/src/Cargo.lock index 3de361c..80d30d7 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1704,7 +1704,7 @@ dependencies = [ [[package]] name = "obikmer" -version = "1.1.14" +version = "1.1.15" dependencies = [ "clap", "csv", diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs index 4c12013..9032a99 100644 --- a/src/obikindex/src/numa.rs +++ b/src/obikindex/src/numa.rs @@ -5,10 +5,8 @@ // CPUs. Linux first-touch policy then places graph allocations in local DRAM // automatically — no explicit memory binding needed. // -// Returns None when: -// - hwloc topology initialisation fails -// - the system has only one NUMA node (UMA, Apple Silicon, single-socket) -// - any per-node pool fails to build +// UMA systems (single socket, Apple Silicon, etc.) are the degenerate case: +// one synthetic node containing all cores, no pool, no pinning. use std::sync::Arc; use std::time::{Duration, Instant}; @@ -18,60 +16,69 @@ use hwlocality::Topology; use hwlocality::cpu::binding::CpuBindingFlags; use hwlocality::cpu::cpuset::CpuSet; use hwlocality::object::types::ObjectType; +use obisys::CpuSample; use tracing::debug; // ── Public interface ────────────────────────────────────────────────────────── pub struct NumaSetup { - pub pools: Vec>, + /// One entry per NUMA node. `None` on UMA systems (no pool, no pinning). + pub pools: Vec>>, /// CPU indices for each NUMA node, in node order. pub cpus_per_node: Vec>, } impl NumaSetup { - /// Workers to activate per NUMA node. - /// Empirically ~3 workers saturate one node's memory bandwidth. + /// Maximum worker slots per node (one per physical core in the node). pub fn workers_per_node(&self) -> usize { self.cpus_per_node .first() - .map(|c| (c.len() / 8).max(3).min(8)) - .unwrap_or(3) + .map(|c| c.len().max(1)) + .unwrap_or(1) } } /// Detect NUMA topology and build per-node Rayon pools. -/// Returns None on UMA systems, single-node machines, or on failure. -pub fn build() -> Option { - let topology = Topology::new().ok()?; +/// Always succeeds: falls back to a single synthetic UMA node on failure. +pub fn build() -> NumaSetup { + if let Ok(topology) = Topology::new() { + let nodes: Vec> = topology + .objects_with_type(ObjectType::NUMANode) + .filter_map(|obj| obj.cpuset()) + .map(|cpuset| { + cpuset + .iter_set() + .map(|idx| usize::from(idx)) + .collect::>() + }) + .filter(|v| !v.is_empty()) + .collect(); - let nodes: Vec> = topology - .objects_with_type(ObjectType::NUMANode) - .filter_map(|obj| obj.cpuset()) - .map(|cpuset| { - cpuset - .iter_set() - .map(|idx| usize::from(idx)) - .collect::>() - }) - .filter(|v| !v.is_empty()) - .collect(); - - if nodes.len() <= 1 { - return None; + if nodes.len() > 1 { + if let Some(pools) = nodes + .iter() + .map(|cpus| build_pool(cpus).map(|p| Some(Arc::new(p)))) + .collect::>>() + { + debug!( + "NUMA topology: {} node(s), {} core(s)/node", + nodes.len(), + nodes.first().map_or(0, |v| v.len()), + ); + return NumaSetup { pools, cpus_per_node: nodes }; + } + } } - debug!( - "NUMA topology: {} node(s), {} core(s)/node", - nodes.len(), - nodes.first().map_or(0, |v| v.len()), - ); - - let pools = nodes - .iter() - .map(|cpus| build_pool(cpus).map(Arc::new)) - .collect::>>()?; - - Some(NumaSetup { pools, cpus_per_node: nodes }) + // UMA fallback: single synthetic node, all cores, no pool, no pinning. + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + debug!("UMA: single synthetic node, {} core(s)", n_cores); + NumaSetup { + pools: vec![None], + cpus_per_node: vec![(0..n_cores).collect()], + } } /// Bind the calling thread to `cpu_indices` using hwloc. @@ -114,18 +121,19 @@ struct NodeConfig { /// Generic NUMA-aware runner for partition-level parallel work. /// /// Workers are distributed round-robin across NUMA nodes and pinned to their -/// node's CPUs. UMA systems are the degenerate case: one node, no pinning. +/// node's CPUs. UMA is the degenerate case: one node, no pinning. +/// +/// Workers are pre-spawned dormant and activated one by one as CPU efficiency +/// falls below `SPAWN_THRESHOLD`. This avoids over-provisioning on I/O-bound +/// or memory-bandwidth-bound workloads while saturating CPU-bound ones. /// /// # Termination /// -/// Termination is driven entirely by channel closure: -/// /// ```text /// drop(part_tx) → part_rx drains → workers exit → drop their result_tx /// drop(result_tx) → result_rx closes → controller loop exits +/// drop(activate_tx) → dormant workers exit cleanly /// ``` -/// -/// No explicit counter or sentinel needed. pub struct PartitionRunner { nodes: Vec, } @@ -136,50 +144,38 @@ impl PartitionRunner { self.nodes.iter().map(|n| n.max_workers).sum() } - /// Detect topology and build. Falls back to a single-node UMA runner on - /// macOS, single-socket machines, or hwloc failure. + /// Detect topology and build. Always succeeds. pub fn new() -> Self { - match build() { - Some(ns) => { - let wpn = ns.workers_per_node(); - debug!( - "PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node", - ns.pools.len(), wpn, - ); - let nodes = ns.pools - .into_iter() - .zip(ns.cpus_per_node) - .map(|(pool, cpu_ids)| NodeConfig { - pool: Some(pool), - cpu_ids, - max_workers: wpn, - }) - .collect(); - Self { nodes } - } - None => { - let n_cores = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - let max_workers = (n_cores / 2).max(1); - debug!("PartitionRunner: UMA mode — {} worker(s)", max_workers); - Self { - nodes: vec![NodeConfig { - pool: None, - cpu_ids: vec![], - max_workers, - }], - } - } - } + let ns = build(); + let wpn = ns.workers_per_node(); + debug!( + "PartitionRunner: {} node(s) × {} worker(s)/node max", + ns.pools.len(), + wpn, + ); + let nodes = ns.pools + .into_iter() + .zip(ns.cpus_per_node) + .map(|(pool, cpu_ids)| NodeConfig { + pool, + cpu_ids, + max_workers: wpn, + }) + .collect(); + Self { nodes } } /// Run `f(i)` for every index in `order`. /// - /// Workers are spawned upfront and distributed round-robin across NUMA - /// nodes. `on_done(i, result, elapsed)` is called from the controller - /// thread as each partition completes — suitable for progress bars and - /// result aggregation. + /// Workers are pre-spawned dormant and activated adaptively. A timer thread + /// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed + /// partition resets that timer (forcing an immediate check) and also + /// triggers its own inline check. A new worker is activated whenever + /// efficiency falls below `SPAWN_THRESHOLD`. + /// + /// `on_done(i, result, elapsed)` is called from the controller thread as + /// each partition completes — suitable for progress bars and result + /// aggregation. /// /// Returns the first error produced by `f`, if any. pub fn run( @@ -194,28 +190,65 @@ impl PartitionRunner { E: Send, C: FnMut(usize, R, Duration) + Send, { - // Pre-load the work queue, then drop the sender so workers' part_rx - // iterators terminate when the queue is drained. - let (part_tx, part_rx) = unbounded::(); + let n_total = order.len(); + if n_total == 0 { + return Ok(()); + } + + const SPAWN_THRESHOLD: f64 = 0.95; + const TIMER_SECS: u64 = 30; + + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + + // ── Channels ────────────────────────────────────────────────────────── + let (part_tx, part_rx) = unbounded::(); + let (activate_tx, activate_rx) = unbounded::<()>(); + // reset_tx: controller → timer ("reset the 30 s window") + let (reset_tx, reset_rx) = unbounded::<()>(); + // event_tx: workers + timer → controller (unified event stream) + let (event_tx, event_rx) = unbounded::>(); + for &i in order { part_tx.send(i).ok(); } drop(part_tx); - let (result_tx, result_rx) = unbounded::<(usize, Result, Duration)>(); - let n_nodes = self.nodes.len(); - let f = &f; // shared borrow; F: Sync so concurrent calls are safe + let max_workers = self.max_workers(); + let n_nodes = self.nodes.len(); + let f = &f; let mut first_err: Option = None; std::thread::scope(|s| { - // Spawn all workers upfront, round-robin across NUMA nodes. - for w in 0..self.max_workers() { + // ── Timer thread ────────────────────────────────────────────────── + // Sends TimerTick every TIMER_SECS seconds. Resets its window each + // time reset_rx receives a message (i.e. on partition completion). + let timer_tx = event_tx.clone(); + s.spawn(move || { + let period = Duration::from_secs(TIMER_SECS); + loop { + crossbeam_channel::select! { + recv(reset_rx) -> r => { + if r.is_err() { break; } // reset_tx dropped → exit + } + default(period) => { + if timer_tx.send(WorkerEvent::TimerTick).is_err() { break; } + } + } + } + }); + + // ── Pre-spawn workers dormant, round-robin across NUMA nodes ────── + for w in 0..max_workers { let node = &self.nodes[w % n_nodes]; let prx = part_rx.clone(); - let rtx = result_tx.clone(); + let etx = event_tx.clone(); + let arx = activate_rx.clone(); let pool = node.pool.clone(); let cpu_ids = &node.cpu_ids; s.spawn(move || { + if arx.recv().is_err() { return; } if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); } for i in &prx { let t = Instant::now(); @@ -223,24 +256,53 @@ impl PartitionRunner { Some(p) => p.install(|| f(i)), None => f(i), }; - rtx.send((i, r, t.elapsed())).ok(); + etx.send(WorkerEvent::Completed(i, r, t.elapsed())).ok(); } }); } + // Drop controller's event_tx: event_rx closes when all workers + + // timer have exited. + drop(event_tx); - // Drop the controller's sender: result_rx closes once all worker - // rtx clones are dropped (i.e. all workers have exited). - drop(result_tx); + // ── Controller ──────────────────────────────────────────────────── + activate_tx.send(()).ok(); + let mut n_active = 1usize; + let mut cpu_sample = CpuSample::now(); + let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate + let mut completed = 0usize; - // Drain results concurrently with workers. The for loop exits - // when result_rx is disconnected — at that point all workers are - // done and the scope join below is instantaneous. - for (i, r, dur) in &result_rx { - match r { - Ok(v) => on_done(i, v, dur), - Err(e) => { if first_err.is_none() { first_err = Some(e); } } + while completed < n_total { + let Ok(event) = event_rx.recv() else { break }; + match event { + WorkerEvent::Completed(i, r, dur) => { + match r { + Ok(v) => on_done(i, v, dur), + Err(e) => { if first_err.is_none() { first_err = Some(e); } } + } + completed += 1; + // Reset the 30 s timer. + reset_tx.send(()).ok(); + // Inline check: same logic as a timer tick. + maybe_activate( + &activate_tx, &mut n_active, max_workers, + &mut cpu_sample, &mut eff_at_last_spawn, + n_cores, SPAWN_THRESHOLD, completed, n_total, + ); + } + WorkerEvent::TimerTick => { + maybe_activate( + &activate_tx, &mut n_active, max_workers, + &mut cpu_sample, &mut eff_at_last_spawn, + n_cores, SPAWN_THRESHOLD, completed, n_total, + ); + } } } + + // Dormant workers exit when activate_tx closes. + drop(activate_tx); + // Timer thread exits when reset_tx closes. + drop(reset_tx); }); match first_err { @@ -249,3 +311,51 @@ impl PartitionRunner { } } } + +// ── Internal event type ─────────────────────────────────────────────────────── + +enum WorkerEvent { + Completed(usize, Result, Duration), + TimerTick, +} + +fn maybe_activate( + activate_tx: &crossbeam_channel::Sender<()>, + n_active: &mut usize, + max_workers: usize, + cpu_sample: &mut CpuSample, + eff_at_last_spawn: &mut f64, + n_cores: usize, + threshold: f64, + completed: usize, + n_total: usize, +) { + if *n_active >= max_workers || completed >= n_total { return; } + + let eff = cpu_sample.cpu_efficiency(n_cores); + if eff >= threshold { return; } // CPU already saturated + + // Check that the previous activation was beneficial enough. + // Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1). + // For the very first extra worker (n_active == 1, no previous spawn), skip this + // check: eff_at_last_spawn == 0 acts as the sentinel. + let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 { + true // first additional worker: no prior data to evaluate + } else { + let k_before = (*n_active - 1) as f64; + let min_speedup = (k_before + 0.2) / k_before; + let actual_speedup = eff / *eff_at_last_spawn; + actual_speedup >= min_speedup + }; + + if last_spawn_was_beneficial { + activate_tx.send(()).ok(); + *eff_at_last_spawn = eff; + *n_active += 1; + *cpu_sample = CpuSample::now(); + debug!( + "activated worker {}/{} — efficiency {:.0}%", + n_active, max_workers, eff * 100.0, + ); + } +} diff --git a/src/obikmer/Cargo.toml b/src/obikmer/Cargo.toml index 7cd08a0..936e46a 100644 --- a/src/obikmer/Cargo.toml +++ b/src/obikmer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "obikmer" -version = "1.1.14" +version = "1.1.15" edition = "2024" [[bin]]