From 09d9e217449054012d7ea68b9e4069e93f3ee7de Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 8 Jun 2026 19:48:17 +0200 Subject: [PATCH 1/3] feat: integrate tracing and enhance bit matrix operations Add the `tracing` crate to `obidebruinj`, `obisys`, and resolve it in `Cargo.lock`. Replace `eprintln!` statements with structured `debug!` and `info!` macros. Introduce a `TracedBar` wrapper for progress bars and enhance the `Stage` lifecycle to emit structured events for timing, memory metrics, and swap warnings. Add a progress spinner for unitig degree computation. Extend `PersistentBitMatrix` with columnar bit-vector operations and parallel distance methods, enabling uniform distance computations across all storage layouts while replacing previous panics with dimension-based fallbacks. --- src/Cargo.lock | 2 + src/obicompactvec/src/bitmatrix.rs | 136 +++++++++++++++++++-- src/obicompactvec/src/intmatrix.rs | 186 ++++++++++++++++++++++++++--- src/obidebruinj/Cargo.toml | 1 + src/obidebruinj/src/debruijn.rs | 15 +-- src/obikmer/src/cmd/unitig.rs | 4 +- src/obisys/Cargo.toml | 1 + src/obisys/src/lib.rs | 101 +++++++++++++++- 8 files changed, 405 insertions(+), 41 deletions(-) diff --git a/src/Cargo.lock b/src/Cargo.lock index 20e91ae..df90bda 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1490,6 +1490,7 @@ dependencies = [ "obifastwrite", "obikseq", "rayon", + "tracing", "xxhash-rust", ] @@ -1666,6 +1667,7 @@ dependencies = [ "indicatif", "libc", "sysinfo", + "tracing", ] [[package]] diff --git a/src/obicompactvec/src/bitmatrix.rs b/src/obicompactvec/src/bitmatrix.rs index 5fd426b..269f247 100644 --- a/src/obicompactvec/src/bitmatrix.rs +++ b/src/obicompactvec/src/bitmatrix.rs @@ -163,6 +163,108 @@ impl PackedBitMatrix { (self.mmap[self.data_offsets[c] + (slot >> 3)] >> (slot & 7)) & 1 != 0 }).collect() } + + #[inline] + fn col_bytes(&self, c: usize) -> &[u8] { + let start = self.data_offsets[c]; + let len = (self.n_rows + 7) / 8; + &self.mmap[start..start + len] + } + + fn count_ones_col(&self, c: usize) -> u64 { + let bytes = self.col_bytes(c); + let full = self.n_rows / 8; + let rem = self.n_rows % 8; + let mut n: u64 = bytes[..full].iter().map(|b| b.count_ones() as u64).sum(); + if rem > 0 { n += (bytes[full] & ((1u8 << rem) - 1)).count_ones() as u64; } + n + } + + fn pair_op(&self, i: usize, j: usize, and_or: bool) -> u64 { + let ai = self.col_bytes(i); + let aj = self.col_bytes(j); + let full = self.n_rows / 8; + let rem = self.n_rows % 8; + let mut n: u64 = ai[..full].iter().zip(aj[..full].iter()) + .map(|(a, b)| if and_or { a & b } else { a ^ b }.count_ones() as u64) + .sum(); + if rem > 0 { + let mask = (1u8 << rem) - 1; + let last = if and_or { ai[full] & aj[full] } else { ai[full] ^ aj[full] }; + n += (last & mask).count_ones() as u64; + } + n + } + + fn partial_jaccard_col(&self, i: usize, j: usize) -> (u64, u64) { + let ai = self.col_bytes(i); + let aj = self.col_bytes(j); + let full = self.n_rows / 8; + let rem = self.n_rows % 8; + let (mut inter, mut union) = ai[..full].iter().zip(aj[..full].iter()) + .fold((0u64, 0u64), |(inter, union), (a, b)| { + (inter + (a & b).count_ones() as u64, + union + (a | b).count_ones() as u64) + }); + if rem > 0 { + let mask = (1u8 << rem) - 1; + inter += ((ai[full] & aj[full]) & mask).count_ones() as u64; + union += ((ai[full] | aj[full]) & mask).count_ones() as u64; + } + (inter, union) + } + + pub(crate) fn count_ones(&self) -> Array1 { + Array1::from_vec( + (0..self.n_cols).into_par_iter().map(|c| self.count_ones_col(c)).collect() + ) + } + + pub(crate) fn jaccard_dist_matrix(&self) -> Array2 { + let n = self.n_cols; + let results: Vec<(usize, usize, f64)> = upper_pairs(n) + .into_par_iter() + .map(|(i, j)| { + let (inter, union) = self.partial_jaccard_col(i, j); + let d = if union == 0 { 0.0 } else { 1.0 - inter as f64 / union as f64 }; + (i, j, d) + }) + .collect(); + fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) + } + + pub(crate) fn hamming_dist_matrix(&self) -> Array2 { + let n = self.n_cols; + let results: Vec<(usize, usize, u64)> = upper_pairs(n) + .into_par_iter() + .map(|(i, j)| (i, j, self.pair_op(i, j, false))) + .collect(); + fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) + } + + pub(crate) fn partial_jaccard_dist_matrix(&self) -> (Array2, Array2) { + let n = self.n_cols; + let results: Vec<(usize, usize, u64, u64)> = upper_pairs(n) + .into_par_iter() + .map(|(i, j)| { let (inter, union) = self.partial_jaccard_col(i, j); (i, j, inter, union) }) + .collect(); + let mut inter_m = Array2::zeros((n, n)); + let mut union_m = Array2::zeros((n, n)); + for (i, j, inter, union) in results { + inter_m[[i, j]] = inter; inter_m[[j, i]] = inter; + union_m[[i, j]] = union; union_m[[j, i]] = union; + } + (inter_m, union_m) + } + + pub(crate) fn partial_hamming_dist_matrix(&self) -> Array2 { + let n = self.n_cols; + let results: Vec<(usize, usize, u64)> = upper_pairs(n) + .into_par_iter() + .map(|(i, j)| (i, j, self.pair_op(i, j, false))) + .collect(); + fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) + } } /// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files. @@ -282,36 +384,50 @@ impl PersistentBitMatrix { pub fn count_ones(&self) -> Array1 { match self { - Self::Columnar(m) => m.count_ones(), - _ => panic!("count_ones() only available on Columnar PersistentBitMatrix"), + Self::Columnar(m) => m.count_ones(), + Self::Packed(m) => m.count_ones(), + Self::Implicit { n_rows, n_cols } => Array1::from_elem(*n_cols, *n_rows as u64), } } pub fn jaccard_dist_matrix(&self) -> Array2 { match self { - Self::Columnar(m) => m.jaccard_dist_matrix(), - _ => panic!("jaccard_dist_matrix() only available on Columnar PersistentBitMatrix"), + Self::Columnar(m) => m.jaccard_dist_matrix(), + Self::Packed(m) => m.jaccard_dist_matrix(), + Self::Implicit { n_cols, .. } => Array2::zeros((*n_cols, *n_cols)), } } pub fn hamming_dist_matrix(&self) -> Array2 { match self { - Self::Columnar(m) => m.hamming_dist_matrix(), - _ => panic!("hamming_dist_matrix() only available on Columnar PersistentBitMatrix"), + Self::Columnar(m) => m.hamming_dist_matrix(), + Self::Packed(m) => m.hamming_dist_matrix(), + Self::Implicit { n_cols, .. } => Array2::zeros((*n_cols, *n_cols)), } } pub fn partial_jaccard_dist_matrix(&self) -> (Array2, Array2) { match self { - Self::Columnar(m) => m.partial_jaccard_dist_matrix(), - _ => panic!("partial_jaccard_dist_matrix() only available on Columnar PersistentBitMatrix"), + Self::Columnar(m) => m.partial_jaccard_dist_matrix(), + Self::Packed(m) => m.partial_jaccard_dist_matrix(), + Self::Implicit { n_rows, n_cols } => { + let v = *n_rows as u64; + let n = *n_cols; + let mut inter = Array2::zeros((n, n)); + let mut union = Array2::zeros((n, n)); + for i in 0..n { for j in 0..n { + inter[[i, j]] = v; union[[i, j]] = v; + }} + (inter, union) + } } } pub fn partial_hamming_dist_matrix(&self) -> Array2 { match self { - Self::Columnar(m) => m.partial_hamming_dist_matrix(), - _ => panic!("partial_hamming_dist_matrix() only available on Columnar PersistentBitMatrix"), + Self::Columnar(m) => m.partial_hamming_dist_matrix(), + Self::Packed(m) => m.partial_hamming_dist_matrix(), + Self::Implicit { n_cols, .. } => Array2::zeros((*n_cols, *n_cols)), } } diff --git a/src/obicompactvec/src/intmatrix.rs b/src/obicompactvec/src/intmatrix.rs index d1421e5..198f210 100644 --- a/src/obicompactvec/src/intmatrix.rs +++ b/src/obicompactvec/src/intmatrix.rs @@ -266,6 +266,161 @@ impl PackedCompactIntMatrix { pub(crate) fn row(&self, slot: usize) -> Box<[u32]> { (0..self.n_cols).map(|c| self.get(c, slot)).collect() } + + pub(crate) fn sum(&self) -> Array1 { + Array1::from_vec( + (0..self.n_cols).into_par_iter() + .map(|c| (0..self.n_rows).map(|s| self.get(c, s) as u64).sum()) + .collect() + ) + } + + // ── Pair primitives ─────────────────────────────────────────────────────── + + fn pair_partial_bray(&self, i: usize, j: usize) -> u64 { + (0..self.n_rows).map(|s| self.get(i, s).min(self.get(j, s)) as u64).sum() + } + + fn pair_partial_euclidean(&self, i: usize, j: usize) -> f64 { + (0..self.n_rows).map(|s| { + let d = self.get(i, s) as f64 - self.get(j, s) as f64; + d * d + }).sum() + } + + fn pair_partial_threshold_jaccard(&self, i: usize, j: usize, t: u32) -> (u64, u64) { + let (mut inter, mut union) = (0u64, 0u64); + for s in 0..self.n_rows { + let a = self.get(i, s) >= t; + let b = self.get(j, s) >= t; + if a && b { inter += 1; } + if a || b { union += 1; } + } + (inter, union) + } + + fn pair_partial_relfreq_bray(&self, i: usize, j: usize, si: f64, sj: f64) -> f64 { + if si == 0.0 || sj == 0.0 { return 0.0; } + (0..self.n_rows).map(|s| { + (self.get(i, s) as f64 / si).min(self.get(j, s) as f64 / sj) + }).sum() + } + + fn pair_partial_relfreq_euclidean(&self, i: usize, j: usize, si: f64, sj: f64) -> f64 { + if si == 0.0 || sj == 0.0 { return 0.0; } + (0..self.n_rows).map(|s| { + let d = self.get(i, s) as f64 / si - self.get(j, s) as f64 / sj; + d * d + }).sum() + } + + fn pair_partial_hellinger(&self, i: usize, j: usize, si: f64, sj: f64) -> f64 { + if si == 0.0 || sj == 0.0 { return 0.0; } + (0..self.n_rows).map(|s| { + let d = (self.get(i, s) as f64 / si).sqrt() - (self.get(j, s) as f64 / sj).sqrt(); + d * d + }).sum() + } + + // ── Matrix methods ──────────────────────────────────────────────────────── + + fn pairwise(&self, f: impl Fn(usize, usize) -> T + Sync) -> Array2 + where T: Clone + Default + Send { + let n = self.n_cols; + let results: Vec<(usize, usize, T)> = upper_pairs(n) + .into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect(); + fill_symmetric(n, results.into_iter().map(|(i, j, v)| { let w = v.clone(); (i, j, v, w) })) + } + + fn pairwise_u64(&self, f: impl Fn(usize, usize) -> u64 + Sync) -> Array2 { + let n = self.n_cols; + let results: Vec<(usize, usize, u64)> = upper_pairs(n) + .into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect(); + fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) + } + + pub(crate) fn partial_bray_dist_matrix(&self) -> Array2 { + self.pairwise_u64(|i, j| self.pair_partial_bray(i, j)) + } + + pub(crate) fn bray_dist_matrix(&self) -> Array2 { + let col_sums = self.sum(); + let sum_min = self.partial_bray_dist_matrix(); + let n = self.n_cols; + let mut m = Array2::zeros((n, n)); + for i in 0..n { for j in 0..n { + if i != j { + let denom = col_sums[i] + col_sums[j]; + m[[i, j]] = if denom == 0 { 0.0 } + else { 1.0 - 2.0 * sum_min[[i, j]] as f64 / denom as f64 }; + } + }} + m + } + + pub(crate) fn partial_euclidean_dist_matrix(&self) -> Array2 { + self.pairwise(|i, j| self.pair_partial_euclidean(i, j)) + } + + pub(crate) fn euclidean_dist_matrix(&self) -> Array2 { + self.pairwise(|i, j| self.pair_partial_euclidean(i, j).sqrt()) + } + + pub(crate) fn partial_threshold_jaccard_dist_matrix(&self, t: u32) -> (Array2, Array2) { + let n = self.n_cols; + let results: Vec<(usize, usize, u64, u64)> = upper_pairs(n) + .into_par_iter() + .map(|(i, j)| { let (inter, union) = self.pair_partial_threshold_jaccard(i, j, t); (i, j, inter, union) }) + .collect(); + let mut inter_m = Array2::zeros((n, n)); + let mut union_m = Array2::zeros((n, n)); + for (i, j, inter, union) in results { + inter_m[[i, j]] = inter; inter_m[[j, i]] = inter; + union_m[[i, j]] = union; union_m[[j, i]] = union; + } + (inter_m, union_m) + } + + pub(crate) fn jaccard_dist_matrix(&self) -> Array2 { + self.pairwise(|i, j| { + let (inter, union) = self.pair_partial_threshold_jaccard(i, j, 1); + if union == 0 { 0.0 } else { 1.0 - inter as f64 / union as f64 } + }) + } + + pub(crate) fn threshold_jaccard_dist_matrix(&self, t: u32) -> Array2 { + self.pairwise(|i, j| { + let (inter, union) = self.pair_partial_threshold_jaccard(i, j, t); + if union == 0 { 0.0 } else { 1.0 - inter as f64 / union as f64 } + }) + } + + pub(crate) fn partial_relfreq_bray_dist_matrix(&self, col_sums: &Array1) -> Array2 { + self.pairwise(|i, j| self.pair_partial_relfreq_bray(i, j, col_sums[i] as f64, col_sums[j] as f64)) + } + + pub(crate) fn relfreq_bray_dist_matrix(&self) -> Array2 { + let col_sums = self.sum(); + self.partial_relfreq_bray_dist_matrix(&col_sums) + } + + pub(crate) fn partial_relfreq_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { + self.pairwise(|i, j| self.pair_partial_relfreq_euclidean(i, j, col_sums[i] as f64, col_sums[j] as f64)) + } + + pub(crate) fn relfreq_euclidean_dist_matrix(&self) -> Array2 { + let col_sums = self.sum(); + self.partial_relfreq_euclidean_dist_matrix(&col_sums) + } + + pub(crate) fn partial_hellinger_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { + self.pairwise(|i, j| self.pair_partial_hellinger(i, j, col_sums[i] as f64, col_sums[j] as f64)) + } + + pub(crate) fn hellinger_dist_matrix(&self) -> Array2 { + let col_sums = self.sum(); + self.partial_hellinger_euclidean_dist_matrix(&col_sums) + } } /// Build `counts/matrix.pcmx` from existing `col_*.pciv` files. @@ -350,50 +505,47 @@ impl PersistentCompactIntMatrix { } pub fn sum(&self) -> Array1 { - match self { - Self::Columnar(m) => m.sum(), - _ => panic!("sum() only available on Columnar PersistentCompactIntMatrix"), - } + match self { Self::Columnar(m) => m.sum(), Self::Packed(m) => m.sum() } } pub fn bray_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.bray_dist_matrix(), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.bray_dist_matrix(), Self::Packed(m) => m.bray_dist_matrix() } } pub fn relfreq_bray_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.relfreq_bray_dist_matrix(), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.relfreq_bray_dist_matrix(), Self::Packed(m) => m.relfreq_bray_dist_matrix() } } pub fn euclidean_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.euclidean_dist_matrix(), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.euclidean_dist_matrix(), Self::Packed(m) => m.euclidean_dist_matrix() } } pub fn relfreq_euclidean_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.relfreq_euclidean_dist_matrix(), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.relfreq_euclidean_dist_matrix(), Self::Packed(m) => m.relfreq_euclidean_dist_matrix() } } pub fn hellinger_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.hellinger_dist_matrix(), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.hellinger_dist_matrix(), Self::Packed(m) => m.hellinger_dist_matrix() } } pub fn jaccard_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.jaccard_dist_matrix(), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.jaccard_dist_matrix(), Self::Packed(m) => m.jaccard_dist_matrix() } } pub fn threshold_jaccard_dist_matrix(&self, threshold: u32) -> Array2 { - match self { Self::Columnar(m) => m.threshold_jaccard_dist_matrix(threshold), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.threshold_jaccard_dist_matrix(threshold), Self::Packed(m) => m.threshold_jaccard_dist_matrix(threshold) } } pub fn partial_bray_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.partial_bray_dist_matrix(), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.partial_bray_dist_matrix(), Self::Packed(m) => m.partial_bray_dist_matrix() } } pub fn partial_euclidean_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.partial_euclidean_dist_matrix(), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.partial_euclidean_dist_matrix(), Self::Packed(m) => m.partial_euclidean_dist_matrix() } } pub fn partial_threshold_jaccard_dist_matrix(&self, threshold: u32) -> (Array2, Array2) { - match self { Self::Columnar(m) => m.partial_threshold_jaccard_dist_matrix(threshold), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.partial_threshold_jaccard_dist_matrix(threshold), Self::Packed(m) => m.partial_threshold_jaccard_dist_matrix(threshold) } } pub fn partial_relfreq_bray_dist_matrix(&self, col_sums: &Array1) -> Array2 { - match self { Self::Columnar(m) => m.partial_relfreq_bray_dist_matrix(col_sums), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.partial_relfreq_bray_dist_matrix(col_sums), Self::Packed(m) => m.partial_relfreq_bray_dist_matrix(col_sums) } } pub fn partial_relfreq_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { - match self { Self::Columnar(m) => m.partial_relfreq_euclidean_dist_matrix(col_sums), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.partial_relfreq_euclidean_dist_matrix(col_sums), Self::Packed(m) => m.partial_relfreq_euclidean_dist_matrix(col_sums) } } pub fn partial_hellinger_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { - match self { Self::Columnar(m) => m.partial_hellinger_euclidean_dist_matrix(col_sums), _ => panic!("Columnar only") } + match self { Self::Columnar(m) => m.partial_hellinger_euclidean_dist_matrix(col_sums), Self::Packed(m) => m.partial_hellinger_euclidean_dist_matrix(col_sums) } } pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> io::Result<()> { diff --git a/src/obidebruinj/Cargo.toml b/src/obidebruinj/Cargo.toml index 82e1cad..acbc742 100644 --- a/src/obidebruinj/Cargo.toml +++ b/src/obidebruinj/Cargo.toml @@ -10,6 +10,7 @@ ahash = "0.8" hashbrown = { version = "0.14", features = ["rayon"] } rayon = "1" xxhash-rust = { version = "0.8.15", features = ["xxh3", "const_xxh3"] } +tracing = "0.1" [features] test-utils = [] diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index c0b151a..4c499b4 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -2,10 +2,11 @@ use hashbrown::HashMap; use obikseq::k; use obikseq::{CanonicalKmer, Sequence}; -use rayon::prelude::*; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::fmt; use std::sync::atomic::{AtomicU8, Ordering}; use xxhash_rust::xxh3::Xxh3Builder; +use tracing::{debug, info}; // ── Types ───────────────────────────────────────────────────────────────────── @@ -379,7 +380,7 @@ impl GraphDeBruijn { }); let n = n_new.load(Ordering::Relaxed); - eprintln!("[for_each_unitig] pass {}: {} starts", pass, n); + debug!("[for_each_unitig] pass {}: {} starts", pass, n); n_chains.fetch_add(n, Ordering::Relaxed); pass += 1; if n == 0 { @@ -410,11 +411,11 @@ impl GraphDeBruijn { } } - eprintln!( - "[for_each_unitig] chains={} phase2={} total={}", - n_chains.load(Ordering::Relaxed), - n2.load(Ordering::Relaxed), - n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed), + info!( + chains = n_chains.load(Ordering::Relaxed), + phase2 = n2.load(Ordering::Relaxed), + total = n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed), + "unitig traversal complete" ); } diff --git a/src/obikmer/src/cmd/unitig.rs b/src/obikmer/src/cmd/unitig.rs index 0b4a4f1..461943d 100644 --- a/src/obikmer/src/cmd/unitig.rs +++ b/src/obikmer/src/cmd/unitig.rs @@ -65,9 +65,11 @@ pub fn run(args: UnitigArgs) { info!("unitig: {} distinct k-mers", g.len()); - // ── Phase 2 : compute degrees (in-memory, no progress needed) ──────────── + // ── Phase 2 : compute degrees ───────────────────────────────────────────── + let pb = spinner("degrees"); let stage = Stage::start("compute degrees"); g.compute_degrees_and_mark_starts(); + pb.finish_and_clear(); rep.push(stage.stop()); // ── Phase 3 : enumerate unitigs and write as FASTA ─────────────────────── diff --git a/src/obisys/Cargo.toml b/src/obisys/Cargo.toml index eabb9e6..da677db 100644 --- a/src/obisys/Cargo.toml +++ b/src/obisys/Cargo.toml @@ -7,3 +7,4 @@ edition = "2024" libc = "0.2" sysinfo = "0.33" indicatif = "0.17" +tracing = "0.1" diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs index 49b80c0..b57578f 100644 --- a/src/obisys/src/lib.rs +++ b/src/obisys/src/lib.rs @@ -1,13 +1,79 @@ use std::fmt; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use indicatif::{ProgressBar, ProgressStyle}; +use tracing::{info, warn}; const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; +// ── TracedBar ────────────────────────────────────────────────────────────────── + +/// Wrapper around `ProgressBar` that emits `tracing` events when stderr is not +/// a TTY (e.g. HPC job logs): every 10% for bounded bars, every ~10 s for +/// spinners (throttled on `set_message`). +pub struct TracedBar { + pb: ProgressBar, + label: String, + unit: String, + total: u64, // 0 for spinners + start: Instant, // creation time, for spinner throttling + last_pct: AtomicU64, // last emitted 10%-bucket (1..=10), 0 = none yet + last_log_ms: AtomicU64, // ms since `start` at last spinner log +} + +impl TracedBar { + pub fn inc(&self, delta: u64) { + self.pb.inc(delta); + if self.pb.is_hidden() && self.total > 0 { + let pos = self.pb.position(); + let pct10 = (pos * 10) / self.total; // 0..=10 + let last = self.last_pct.load(Ordering::Relaxed); + if pct10 > last + && self.last_pct + .compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + info!( + stage = %self.label, + progress = format_args!("{}%", pct10 * 10), + "{}/{} {}", + pos, self.total, self.unit + ); + } + } + } + + pub fn set_message(&self, msg: impl Into) { + let msg = msg.into(); + if self.pb.is_hidden() { + if self.total > 0 { + // bounded bar: always log (already rate-limited by 10% threshold in inc) + info!(stage = %self.label, "{msg}"); + } else { + // spinner: throttle to ~10 s + let now_ms = self.start.elapsed().as_millis() as u64; + let last = self.last_log_ms.load(Ordering::Relaxed); + if now_ms >= last + 10_000 + && self.last_log_ms + .compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + info!(stage = %self.label, "{msg}"); + } + } + } + self.pb.set_message(msg); + } + + pub fn finish_and_clear(&self) { + self.pb.finish_and_clear(); + } +} + /// Spinner with the standard project look: `⠋ label — msg 0s`. /// Caller updates the message with `pb.set_message(...)`. -pub fn spinner(label: &str) -> ProgressBar { +pub fn spinner(label: &str) -> TracedBar { let pb = ProgressBar::new_spinner(); pb.set_style( ProgressStyle::with_template(&format!("{{spinner}} {label} — {{msg}} {{elapsed}}")) @@ -15,12 +81,15 @@ pub fn spinner(label: &str) -> ProgressBar { .tick_strings(BRAILLE), ); pb.enable_steady_tick(Duration::from_millis(100)); - pb + TracedBar { + pb, label: label.to_string(), unit: String::new(), total: 0, + start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0), + } } /// Progress bar with the standard project look: /// `⠋ label — [████░░░░] pos/len unit elapsed`. -pub fn progress_bar(label: &str, n: u64, unit: &str) -> ProgressBar { +pub fn progress_bar(label: &str, n: u64, unit: &str) -> TracedBar { let pb = ProgressBar::new(n); pb.set_style( ProgressStyle::with_template(&format!( @@ -30,7 +99,10 @@ pub fn progress_bar(label: &str, n: u64, unit: &str) -> ProgressBar { .tick_strings(BRAILLE), ); pb.enable_steady_tick(Duration::from_millis(100)); - pb + TracedBar { + pb, label: label.to_string(), unit: unit.to_string(), total: n, + start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0), + } } use libc::{RUSAGE_SELF, getrusage, rusage, timeval}; @@ -83,13 +155,15 @@ pub struct Stage { impl Stage { pub fn start(label: impl Into) -> Self { - Self { label: label.into(), wall: Instant::now(), ru: get_rusage() } + let label = label.into(); + info!(stage = %label, "started"); + Self { label, wall: Instant::now(), ru: get_rusage() } } pub fn stop(self) -> StageStats { let wall_secs = self.wall.elapsed().as_secs_f64(); let end = get_rusage(); - StageStats { + let stats = StageStats { label: self.label, wall_secs, user_secs: tv_to_secs(end.ru_utime) - tv_to_secs(self.ru.ru_utime), @@ -102,7 +176,22 @@ impl Stage { in_blocks: delta(end.ru_inblock as i64, self.ru.ru_inblock as i64), out_blocks: delta(end.ru_oublock as i64, self.ru.ru_oublock as i64), swaps: delta(end.ru_nswap as i64, self.ru.ru_nswap as i64), + }; + info!( + stage = %stats.label, + wall_secs = format_args!("{:.3}", stats.wall_secs), + rss = %fmt_bytes(stats.max_rss_bytes), + swaps = stats.swaps, + "done" + ); + if stats.swaps > 0 { + warn!( + stage = %stats.label, + swaps = stats.swaps, + "working set exceeds available RAM" + ); } + stats } } From 1ec65922df342bb6356040fa5fb7e12c8c46f688 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 8 Jun 2026 20:04:51 +0200 Subject: [PATCH 2/3] feat: implement parallel pairwise distance matrices Introduces parallelized pairwise distance matrix computation for Jaccard, Hamming, Bray-Curtis, Euclidean, and Hellinger metrics across `Columnar`, `Packed`, and `Implicit` matrix variants. Adds trait methods and convenience wrappers, safely handles normalization and zero-denominator edge cases, and updates test suites to import required traits for validation. --- src/obicompactvec/src/bitmatrix.rs | 55 ------------ src/obicompactvec/src/intmatrix.rs | 108 ----------------------- src/obicompactvec/src/tests/bitmatrix.rs | 1 + src/obicompactvec/src/tests/intmatrix.rs | 1 + src/obicompactvec/src/traits.rs | 4 + 5 files changed, 6 insertions(+), 163 deletions(-) diff --git a/src/obicompactvec/src/bitmatrix.rs b/src/obicompactvec/src/bitmatrix.rs index 269f247..631db63 100644 --- a/src/obicompactvec/src/bitmatrix.rs +++ b/src/obicompactvec/src/bitmatrix.rs @@ -53,14 +53,6 @@ impl ColumnarBitMatrix { Array1::from_vec(counts) } - pub(crate) fn jaccard_dist_matrix(&self) -> Array2 { - self.pairwise_f64(|i, j| self.col(i).jaccard_dist(self.col(j))) - } - - pub(crate) fn hamming_dist_matrix(&self) -> Array2 { - self.pairwise_u64(|i, j| self.col(i).hamming_dist(self.col(j))) - } - pub(crate) fn partial_jaccard_dist_matrix(&self) -> (Array2, Array2) { let n = self.n_cols(); let results: Vec<(usize, usize, u64, u64)> = upper_pairs(n) @@ -83,15 +75,6 @@ impl ColumnarBitMatrix { self.pairwise_u64(|i, j| self.col(i).hamming_dist(self.col(j))) } - fn pairwise_f64(&self, f: impl Fn(usize, usize) -> f64 + Sync) -> Array2 { - let n = self.n_cols(); - let results: Vec<(usize, usize, f64)> = upper_pairs(n) - .into_par_iter() - .map(|(i, j)| (i, j, f(i, j))) - .collect(); - fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) - } - fn pairwise_u64(&self, f: impl Fn(usize, usize) -> u64 + Sync) -> Array2 { let n = self.n_cols(); let results: Vec<(usize, usize, u64)> = upper_pairs(n) @@ -220,28 +203,6 @@ impl PackedBitMatrix { ) } - pub(crate) fn jaccard_dist_matrix(&self) -> Array2 { - let n = self.n_cols; - let results: Vec<(usize, usize, f64)> = upper_pairs(n) - .into_par_iter() - .map(|(i, j)| { - let (inter, union) = self.partial_jaccard_col(i, j); - let d = if union == 0 { 0.0 } else { 1.0 - inter as f64 / union as f64 }; - (i, j, d) - }) - .collect(); - fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) - } - - pub(crate) fn hamming_dist_matrix(&self) -> Array2 { - let n = self.n_cols; - let results: Vec<(usize, usize, u64)> = upper_pairs(n) - .into_par_iter() - .map(|(i, j)| (i, j, self.pair_op(i, j, false))) - .collect(); - fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) - } - pub(crate) fn partial_jaccard_dist_matrix(&self) -> (Array2, Array2) { let n = self.n_cols; let results: Vec<(usize, usize, u64, u64)> = upper_pairs(n) @@ -390,22 +351,6 @@ impl PersistentBitMatrix { } } - pub fn jaccard_dist_matrix(&self) -> Array2 { - match self { - Self::Columnar(m) => m.jaccard_dist_matrix(), - Self::Packed(m) => m.jaccard_dist_matrix(), - Self::Implicit { n_cols, .. } => Array2::zeros((*n_cols, *n_cols)), - } - } - - pub fn hamming_dist_matrix(&self) -> Array2 { - match self { - Self::Columnar(m) => m.hamming_dist_matrix(), - Self::Packed(m) => m.hamming_dist_matrix(), - Self::Implicit { n_cols, .. } => Array2::zeros((*n_cols, *n_cols)), - } - } - pub fn partial_jaccard_dist_matrix(&self) -> (Array2, Array2) { match self { Self::Columnar(m) => m.partial_jaccard_dist_matrix(), diff --git a/src/obicompactvec/src/intmatrix.rs b/src/obicompactvec/src/intmatrix.rs index 198f210..37547c6 100644 --- a/src/obicompactvec/src/intmatrix.rs +++ b/src/obicompactvec/src/intmatrix.rs @@ -54,23 +54,6 @@ impl ColumnarCompactIntMatrix { Array1::from_vec(sums) } - pub(crate) fn bray_dist_matrix(&self) -> Array2 { - let sum_min = self.partial_bray_dist_matrix(); - let col_sums = self.sum(); - let n = self.n_cols(); - let mut m = Array2::zeros((n, n)); - for i in 0..n { - for j in 0..n { - if i != j { - let denom = col_sums[i] + col_sums[j]; - m[[i, j]] = if denom == 0 { 0.0 } - else { 1.0 - 2.0 * sum_min[[i, j]] as f64 / denom as f64 }; - } - } - } - m - } - pub(crate) fn partial_bray_dist_matrix(&self) -> Array2 { self.pairwise_u64(|i, j| self.col(i).partial_bray_dist(self.col(j))) } @@ -119,30 +102,6 @@ impl ColumnarCompactIntMatrix { }) } - pub(crate) fn relfreq_bray_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.col(i).relfreq_bray_dist(self.col(j))) - } - - pub(crate) fn euclidean_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.col(i).euclidean_dist(self.col(j))) - } - - pub(crate) fn relfreq_euclidean_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.col(i).relfreq_euclidean_dist(self.col(j))) - } - - pub(crate) fn hellinger_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.col(i).hellinger_dist(self.col(j))) - } - - pub(crate) fn jaccard_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.col(i).jaccard_dist(self.col(j))) - } - - pub(crate) fn threshold_jaccard_dist_matrix(&self, threshold: u32) -> Array2 { - self.pairwise(|i, j| self.col(i).threshold_jaccard_dist(self.col(j), threshold)) - } - pub(crate) fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> io::Result<()> { let mut meta = MatrixMeta::load(dir)?; let mut b = PersistentCompactIntVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?; @@ -343,29 +302,11 @@ impl PackedCompactIntMatrix { self.pairwise_u64(|i, j| self.pair_partial_bray(i, j)) } - pub(crate) fn bray_dist_matrix(&self) -> Array2 { - let col_sums = self.sum(); - let sum_min = self.partial_bray_dist_matrix(); - let n = self.n_cols; - let mut m = Array2::zeros((n, n)); - for i in 0..n { for j in 0..n { - if i != j { - let denom = col_sums[i] + col_sums[j]; - m[[i, j]] = if denom == 0 { 0.0 } - else { 1.0 - 2.0 * sum_min[[i, j]] as f64 / denom as f64 }; - } - }} - m - } pub(crate) fn partial_euclidean_dist_matrix(&self) -> Array2 { self.pairwise(|i, j| self.pair_partial_euclidean(i, j)) } - pub(crate) fn euclidean_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.pair_partial_euclidean(i, j).sqrt()) - } - pub(crate) fn partial_threshold_jaccard_dist_matrix(&self, t: u32) -> (Array2, Array2) { let n = self.n_cols; let results: Vec<(usize, usize, u64, u64)> = upper_pairs(n) @@ -381,46 +322,18 @@ impl PackedCompactIntMatrix { (inter_m, union_m) } - pub(crate) fn jaccard_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| { - let (inter, union) = self.pair_partial_threshold_jaccard(i, j, 1); - if union == 0 { 0.0 } else { 1.0 - inter as f64 / union as f64 } - }) - } - - pub(crate) fn threshold_jaccard_dist_matrix(&self, t: u32) -> Array2 { - self.pairwise(|i, j| { - let (inter, union) = self.pair_partial_threshold_jaccard(i, j, t); - if union == 0 { 0.0 } else { 1.0 - inter as f64 / union as f64 } - }) - } - pub(crate) fn partial_relfreq_bray_dist_matrix(&self, col_sums: &Array1) -> Array2 { self.pairwise(|i, j| self.pair_partial_relfreq_bray(i, j, col_sums[i] as f64, col_sums[j] as f64)) } - pub(crate) fn relfreq_bray_dist_matrix(&self) -> Array2 { - let col_sums = self.sum(); - self.partial_relfreq_bray_dist_matrix(&col_sums) - } - pub(crate) fn partial_relfreq_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { self.pairwise(|i, j| self.pair_partial_relfreq_euclidean(i, j, col_sums[i] as f64, col_sums[j] as f64)) } - pub(crate) fn relfreq_euclidean_dist_matrix(&self) -> Array2 { - let col_sums = self.sum(); - self.partial_relfreq_euclidean_dist_matrix(&col_sums) - } - pub(crate) fn partial_hellinger_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { self.pairwise(|i, j| self.pair_partial_hellinger(i, j, col_sums[i] as f64, col_sums[j] as f64)) } - pub(crate) fn hellinger_dist_matrix(&self) -> Array2 { - let col_sums = self.sum(); - self.partial_hellinger_euclidean_dist_matrix(&col_sums) - } } /// Build `counts/matrix.pcmx` from existing `col_*.pciv` files. @@ -508,27 +421,6 @@ impl PersistentCompactIntMatrix { match self { Self::Columnar(m) => m.sum(), Self::Packed(m) => m.sum() } } - pub fn bray_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.bray_dist_matrix(), Self::Packed(m) => m.bray_dist_matrix() } - } - pub fn relfreq_bray_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.relfreq_bray_dist_matrix(), Self::Packed(m) => m.relfreq_bray_dist_matrix() } - } - pub fn euclidean_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.euclidean_dist_matrix(), Self::Packed(m) => m.euclidean_dist_matrix() } - } - pub fn relfreq_euclidean_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.relfreq_euclidean_dist_matrix(), Self::Packed(m) => m.relfreq_euclidean_dist_matrix() } - } - pub fn hellinger_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.hellinger_dist_matrix(), Self::Packed(m) => m.hellinger_dist_matrix() } - } - pub fn jaccard_dist_matrix(&self) -> Array2 { - match self { Self::Columnar(m) => m.jaccard_dist_matrix(), Self::Packed(m) => m.jaccard_dist_matrix() } - } - pub fn threshold_jaccard_dist_matrix(&self, threshold: u32) -> Array2 { - match self { Self::Columnar(m) => m.threshold_jaccard_dist_matrix(threshold), Self::Packed(m) => m.threshold_jaccard_dist_matrix(threshold) } - } pub fn partial_bray_dist_matrix(&self) -> Array2 { match self { Self::Columnar(m) => m.partial_bray_dist_matrix(), Self::Packed(m) => m.partial_bray_dist_matrix() } } diff --git a/src/obicompactvec/src/tests/bitmatrix.rs b/src/obicompactvec/src/tests/bitmatrix.rs index d825d85..741a07c 100644 --- a/src/obicompactvec/src/tests/bitmatrix.rs +++ b/src/obicompactvec/src/tests/bitmatrix.rs @@ -1,6 +1,7 @@ use tempfile::tempdir; use crate::{PersistentBitMatrix, PersistentBitMatrixBuilder}; +use crate::traits::BitPartials; fn make_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) { let n = cols.first().map_or(0, |c| c.len()); diff --git a/src/obicompactvec/src/tests/intmatrix.rs b/src/obicompactvec/src/tests/intmatrix.rs index b9e1841..c4c0a98 100644 --- a/src/obicompactvec/src/tests/intmatrix.rs +++ b/src/obicompactvec/src/tests/intmatrix.rs @@ -1,6 +1,7 @@ use tempfile::tempdir; use crate::{PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder}; +use crate::traits::CountPartials; fn make_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) { let n = cols.first().map_or(0, |c| c.len()); diff --git a/src/obicompactvec/src/traits.rs b/src/obicompactvec/src/traits.rs index 7cccf48..f65ad9d 100644 --- a/src/obicompactvec/src/traits.rs +++ b/src/obicompactvec/src/traits.rs @@ -46,6 +46,10 @@ pub trait CountPartials: ColumnWeights { self.partial_euclidean().mapv(|v| v.sqrt()) } + fn jaccard_dist_matrix(&self) -> Array2 { + self.threshold_jaccard_dist_matrix(1) + } + fn threshold_jaccard_dist_matrix(&self, threshold: u32) -> Array2 { let (inter, union) = self.partial_threshold_jaccard(threshold); let n = inter.shape()[0]; From eb7805c7470e39812af2d00049d26dd850ea2c99 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 8 Jun 2026 20:13:07 +0200 Subject: [PATCH 3/3] feat: add configurable presence threshold to kmer distance Introduce a `--presence-threshold` CLI argument (default: 1) and update `KmerIndex::distance` to accept a `presence_threshold` parameter. This replaces hardcoded zero thresholds, enabling configurable filtering of low-abundance kmers during Jaccard distance calculations. --- src/obikindex/src/distance.rs | 7 +++---- src/obikmer/src/cmd/distance.rs | 6 +++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/obikindex/src/distance.rs b/src/obikindex/src/distance.rs index a1c9cb0..867cff0 100644 --- a/src/obikindex/src/distance.rs +++ b/src/obikindex/src/distance.rs @@ -52,7 +52,7 @@ impl DistanceMetric { // ── KmerIndex::distance ─────────────────────────────────────────────────────── impl KmerIndex { - pub fn distance(&self, metric: DistanceMetric, shared_kmers: bool) -> OKIResult { + pub fn distance(&self, metric: DistanceMetric, shared_kmers: bool, presence_threshold: u32) -> OKIResult { let n_genomes = self.meta.genomes.len(); if n_genomes < 2 { return Err(OKIError::InvalidInput( @@ -83,8 +83,7 @@ impl KmerIndex { DistanceMetric::RelfreqEuclidean => CountPartials::relfreq_euclidean_dist_matrix(&global), DistanceMetric::Hellinger => CountPartials::hellinger_dist_matrix(&global), DistanceMetric::HellingerEuclidean => CountPartials::hellinger_euclidean_dist_matrix(&global), - // Jaccard on count data: threshold at 0 (present if count > 0) - DistanceMetric::Jaccard => CountPartials::threshold_jaccard_dist_matrix(&global, 0), + DistanceMetric::Jaccard => CountPartials::threshold_jaccard_dist_matrix(&global, presence_threshold), DistanceMetric::Hamming => { return Err(OKIError::InvalidInput( "Hamming is only available for presence/absence indexes".into(), @@ -93,7 +92,7 @@ impl KmerIndex { }; let shared = if shared_kmers { - let (inter, _) = CountPartials::partial_threshold_jaccard(&global, 0); + let (inter, _) = CountPartials::partial_threshold_jaccard(&global, presence_threshold); Some(inter) } else { None diff --git a/src/obikmer/src/cmd/distance.rs b/src/obikmer/src/cmd/distance.rs index d6d4599..f66d78f 100644 --- a/src/obikmer/src/cmd/distance.rs +++ b/src/obikmer/src/cmd/distance.rs @@ -46,6 +46,10 @@ pub struct DistanceArgs { #[arg(long, value_enum, default_value = "jaccard")] pub metric: MetricArg, + /// Minimum count to consider a kmer present when computing Jaccard on count indexes + #[arg(long, default_value = "1")] + pub presence_threshold: u32, + /// Also output the shared-kmer count matrix (CSV) #[arg(long)] pub shared_kmers: bool, @@ -81,7 +85,7 @@ pub fn run(args: DistanceArgs) { let need_shared = args.shared_kmers || args.nj || args.upgma; let result = idx - .distance(args.metric.into(), need_shared) + .distance(args.metric.into(), need_shared, args.presence_threshold) .unwrap_or_else(|e| { eprintln!("error computing distances: {e}"); std::process::exit(1);