From 8b563d0804b65dcbc23b231d54b644138fbbfdb4 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sat, 13 Jun 2026 09:49:33 +0200 Subject: [PATCH 1/6] refactor: migrate pipeline stages and improve graph processing Refactored neighbor resolution to explicitly track unvisited indices for degree-1 nodes, updated display formatting, and added timing and debug logging to the degree computation routine. Migrated pipeline stages from eager vector returns to explicit flat implementations, enabling backpressure-aware streaming, configurable batch processing, incremental yielding, and progress tracking via a delta channel. --- src/obidebruinj/src/debruijn.rs | 45 +++------ src/obikpartitionner/src/merge_layer.rs | 116 ++++++++++++++++-------- 2 files changed, 91 insertions(+), 70 deletions(-) diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index 4c499b4..cebe17c 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -6,6 +6,7 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::fmt; use std::sync::atomic::{AtomicU8, Ordering}; use xxhash_rust::xxh3::Xxh3Builder; +use std::time::Instant; use tracing::{debug, info}; // ── Types ───────────────────────────────────────────────────────────────────── @@ -96,25 +97,6 @@ impl Node { (self.0 >> 5) & 0b11 } - /// Number of left neighbours. - pub fn n_left_neighbours(self) -> u8 { - if self.can_extend_left() { - 1 - } else { - let v = (self.0 >> 5) & 0b11; - v + (v != 0) as u8 - } - } - - /// Number of right neighbours. - pub fn n_right_neighbours(self) -> u8 { - if self.can_extend_right() { - 1 - } else { - let v = (self.0 >> 3) & 0b11; - v + (v != 0) as u8 - } - } /// Marks the node as visited. 
#[inline] @@ -162,13 +144,17 @@ impl fmt::Display for Node { const NUC: [char; 4] = ['A', 'C', 'G', 'T']; let r = if self.can_extend_right() { format!("→{}", NUC[self.right_nuc() as usize]) + } else if (self.0 >> 3) & 0b11 == 0 { + "→0".to_string() } else { - format!("→{}", self.n_right_neighbours()) + "→≥2".to_string() }; let l = if self.can_extend_left() { format!("←{}", NUC[self.left_nuc() as usize]) + } else if (self.0 >> 5) & 0b11 == 0 { + "←0".to_string() } else { - format!("←{}", self.n_left_neighbours()) + "←≥2".to_string() }; let v = if self.is_visited() { "V" } else { "." }; write!(f, "Node({r} {l} {v})") @@ -272,6 +258,7 @@ impl GraphDeBruijn { pub fn compute_degrees_and_mark_starts(&self) { // Pass 1: count right/left neighbors for each node + let t1 = Instant::now(); self.for_each_node(|kmer, atomic| { let mut old = Node(atomic.load(Ordering::Relaxed)); if old.is_visited() { @@ -286,9 +273,11 @@ impl GraphDeBruijn { node.set_left(lc, ln); atomic.store(node.0, Ordering::Relaxed); }); + debug!("[compute_degrees] pass 1 (degrees): {:?} — {} nodes", t1.elapsed(), self.nodes.len()); // Pass 2: mark start nodes + let t2 = Instant::now(); self.for_each_node(|kmer, atomic| { let mut node = Node(atomic.load(Ordering::Relaxed)); if node.is_visited() { @@ -299,6 +288,7 @@ impl GraphDeBruijn { atomic.store(node.0, Ordering::Relaxed); } }); + debug!("[compute_degrees] pass 2 (starts): {:?} — {} nodes", t2.elapsed(), self.nodes.len()); } pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option { @@ -527,23 +517,18 @@ fn count_neighbors( nodes: &FastHashMap, ) -> (u8, Option) { let mut count = 0u8; - let mut first = None; + let mut nuc = 0u8; for (i, neighbour) in neighbors.iter().enumerate() { if let Some(a) = nodes.get(neighbour) { if Node(a.load(Ordering::Relaxed)).is_visited() { continue; } + nuc = i as u8; count += 1; - if first.is_none() { - first = Some(i as u8); - } + if count >= 2 { return (2, None); } } } - if count == 1 { - (1, first) - } else { - (count, 
None) - } + if count == 1 { (1, Some(nuc)) } else { (0, None) } } // ── tests ───────────────────────────────────────────────────────────────────── diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 4733cf4..2b67dd4 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -3,7 +3,11 @@ use std::io; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; -use obipipeline::{Pipeline, WorkerPool, make_flat_transform, make_sink, make_source, make_transform}; +use tracing::debug; +use obipipeline::{ + Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool, + make_sink, make_source, make_transform, +}; use obicompactvec::{ PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, @@ -232,23 +236,38 @@ impl KmerPartition { let pipeline = Pipeline::new( make_source!(Pass1Data, unitig_paths, File), vec![ - make_flat_transform!(Pass1Data, { - move |path: PathBuf| -> Vec> { - match UnitigFileReader::open_sequential(&path) { - Err(e) => { - *err_cap.lock().unwrap() = Some(e.to_string()); - vec![] + Stage::Flat(Arc::new( + move |data: Pass1Data, + push: &PipelineSender>, + delta: &PipelineSender| + { + if let Pass1Data::File(path) = data { + let reader = match UnitigFileReader::open_sequential(&path) { + Ok(r) => r, + Err(e) => { + *err_cap.lock().unwrap() = Some(e.to_string()); + delta.send(-1).ok(); + return; + } + }; + let mut batch: Vec = Vec::with_capacity(BATCH); + let mut count: isize = 0; + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + batch.push(kmer); + if batch.len() == BATCH { + let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH)); + push.send(Ok(Pass1Data::Batch(b))).ok(); + count += 1; + } } - Ok(reader) => { - let kmers: Vec = reader - .iter_indexed_canonical_kmers() - .map(|(k, _, _)| k) - .collect(); - kmers.chunks(BATCH).map(|c| c.to_vec()).collect() + if !batch.is_empty() { + 
push.send(Ok(Pass1Data::Batch(batch))).ok(); + count += 1; } + delta.send(count - 1).ok(); } } - }, File, Batch), + ) as SharedFlatFn), make_transform!(Pass1Data, { move |batch: Vec| -> Vec { batch.into_iter() @@ -278,6 +297,7 @@ impl KmerPartition { .into_inner() .unwrap_or_else(|e| e.into_inner()); let any_new = g.len() > 0; + debug!("partition {i}: de Bruijn graph done — {} new kmers", g.len()); // Build new layer from de Bruijn graph if there are new kmers. let new_layer_idx = n_dst_layers; @@ -430,36 +450,52 @@ impl KmerPartition { let pipeline2 = Pipeline::new( make_source!(Pass2Data, pass2_items, SrcLayer), vec![ - make_flat_transform!(Pass2Data, { - move |(col_offset, src_n, src_layer_dir): (usize, usize, PathBuf)| - -> Vec<(usize, usize, Arc, Vec)> + Stage::Flat(Arc::new( + move |data: Pass2Data, + push: &PipelineSender>, + delta: &PipelineSender| { - let reader = match UnitigFileReader::open_sequential( - &src_layer_dir.join("unitigs.bin"), - ) { - Ok(r) => r, - Err(e) => { - *err_cap2.lock().unwrap() = Some(e.to_string()); - return vec![]; + if let Pass2Data::SrcLayer((col_offset, src_n, src_layer_dir)) = data { + let reader = match UnitigFileReader::open_sequential( + &src_layer_dir.join("unitigs.bin"), + ) { + Ok(r) => r, + Err(e) => { + *err_cap2.lock().unwrap() = Some(e.to_string()); + delta.send(-1).ok(); + return; + } + }; + let src_data = match SrcLayerData::open(&src_layer_dir, mode) { + Ok(d) => Arc::new(d), + Err(e) => { + *err_cap2.lock().unwrap() = Some(e.to_string()); + delta.send(-1).ok(); + return; + } + }; + let mut batch: Vec = Vec::with_capacity(BATCH); + let mut count: isize = 0; + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + batch.push(kmer); + if batch.len() == BATCH { + let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH)); + push.send(Ok(Pass2Data::RawBatch(( + col_offset, src_n, Arc::clone(&src_data), b, + )))).ok(); + count += 1; + } } - }; - let src_data = match SrcLayerData::open(&src_layer_dir, 
mode) { - Ok(d) => Arc::new(d), - Err(e) => { - *err_cap2.lock().unwrap() = Some(e.to_string()); - return vec![]; + if !batch.is_empty() { + push.send(Ok(Pass2Data::RawBatch(( + col_offset, src_n, src_data, batch, + )))).ok(); + count += 1; } - }; - let all_kmers: Vec = reader - .iter_indexed_canonical_kmers() - .map(|(kmer, _, _)| kmer) - .collect(); - all_kmers - .chunks(BATCH) - .map(|c| (col_offset, src_n, Arc::clone(&src_data), c.to_vec())) - .collect() + delta.send(count - 1).ok(); + } } - }, SrcLayer, RawBatch), + ) as SharedFlatFn), make_transform!(Pass2Data, { move |(col_offset, src_n, src_data, kmers): (usize, usize, Arc, Vec)| -> Vec<(Option, usize, usize, u32)> -- 2.52.0 From 5f98d2ef96b349986964985a26aa1bd0e646bb81 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sat, 13 Jun 2026 10:09:07 +0200 Subject: [PATCH 2/6] refactor: replace explicit collect with Unitig::from_nucleotides Introduce a thread-local buffer to materialize nucleotide iterators into contiguous slices. Update `try_for_each_unitig` across the debruijn, index, merge, and rebuild layers to directly instantiate `Unitig` via `from_nucleotides()` instead of explicitly collecting iterators. This eliminates intermediate allocations and aligns test code with the new approach. 
--- src/obidebruinj/src/debruijn.rs | 17 +++++++++++++---- src/obidebruinj/src/tests/debruijn.rs | 4 ++-- src/obikpartitionner/src/index_layer.rs | 5 ++--- src/obikpartitionner/src/merge_layer.rs | 5 ++--- src/obikpartitionner/src/rebuild_layer.rs | 5 ++--- 5 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index cebe17c..865567e 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -3,6 +3,7 @@ use hashbrown::HashMap; use obikseq::k; use obikseq::{CanonicalKmer, Sequence}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use std::cell::RefCell; use std::fmt; use std::sync::atomic::{AtomicU8, Ordering}; use xxhash_rust::xxh3::Xxh3Builder; @@ -450,17 +451,25 @@ impl GraphDeBruijn { pub fn try_for_each_unitig(&self, f: F) -> Result<(), E> where E: Send, - F: FnMut(UnitigNucIter<'_>) -> Result<(), E> + Send, + F: FnMut(&[u8]) -> Result<(), E> + Send, { + thread_local! 
{ + static BUF: std::cell::RefCell> = RefCell::new(Vec::with_capacity(4096)); + } let error = std::sync::Mutex::new(None::); let f = std::sync::Mutex::new(f); self.for_each_unitig(|iter| { if error.lock().unwrap().is_some() { return; } - if let Err(e) = f.lock().unwrap()(iter) { - *error.lock().unwrap() = Some(e); - } + BUF.with(|buf| { + let mut buf = buf.borrow_mut(); + buf.clear(); + buf.extend(iter); + if let Err(e) = f.lock().unwrap()(&buf) { + *error.lock().unwrap() = Some(e); + } + }); }); error.into_inner().unwrap().map_or(Ok(()), Err) } diff --git a/src/obidebruinj/src/tests/debruijn.rs b/src/obidebruinj/src/tests/debruijn.rs index 7b911b3..17f4b2f 100644 --- a/src/obidebruinj/src/tests/debruijn.rs +++ b/src/obidebruinj/src/tests/debruijn.rs @@ -24,8 +24,8 @@ fn canonical_kmers(seq: &[u8]) -> Vec { fn collect_unitigs(g: &GraphDeBruijn) -> Vec { let mut unitigs = Vec::new(); - g.try_for_each_unitig(|nuc_iter| -> Result<(), std::convert::Infallible> { - unitigs.push(nuc_iter.collect()); + g.try_for_each_unitig(|nucs| -> Result<(), std::convert::Infallible> { + unitigs.push(Unitig::from_nucleotides(nucs)); Ok(()) }) .unwrap(); diff --git a/src/obikpartitionner/src/index_layer.rs b/src/obikpartitionner/src/index_layer.rs index 1b097a0..acc8623 100644 --- a/src/obikpartitionner/src/index_layer.rs +++ b/src/obikpartitionner/src/index_layer.rs @@ -107,9 +107,8 @@ impl KmerPartition { fs::create_dir_all(&layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?; - g.try_for_each_unitig(|nuc_iter| { - let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); - uw.write(&unitig) + g.try_for_each_unitig(|nucs| { + uw.write(&obikseq::unitig::Unitig::from_nucleotides(nucs)) })?; uw.close()?; diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 2b67dd4..d38042a 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -307,9 +307,8 @@ impl KmerPartition 
{ g.compute_degrees_and_mark_starts(); fs::create_dir_all(&new_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; - g.try_for_each_unitig(|nuc_iter| { - let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); - uw.write(&unitig) + g.try_for_each_unitig(|nucs| { + uw.write(&obikseq::unitig::Unitig::from_nucleotides(nucs)) })?; uw.close()?; let n = g.len(); diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index 7c39246..1a4c238 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -168,9 +168,8 @@ impl KmerPartition { fs::create_dir_all(&dst_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; - g.try_for_each_unitig(|nuc_iter| { - let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); - uw.write(&unitig) + g.try_for_each_unitig(|nucs| { + uw.write(&obikseq::unitig::Unitig::from_nucleotides(nucs)) })?; uw.close()?; drop(g); -- 2.52.0 From 1f336fe496c2a4cc9e457db69512b1580c6a7bba Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sat, 13 Jun 2026 10:34:31 +0200 Subject: [PATCH 3/6] refactor: replace mutex with channels for parallel debruijn processing Add the `crossbeam-channel` dependency to support concurrent execution (`rayon` is already declared; its manifest line is only realigned). Replace the synchronous, mutex-protected closure pattern with a channel-based producer-consumer approach using `std::thread::scope`. This decouples unitig iteration from processing, eliminating lock contention and `Mutex` overhead while enabling parallel workloads. 
--- src/Cargo.lock | 1 + src/obidebruinj/Cargo.toml | 3 ++- src/obidebruinj/src/debruijn.rs | 37 ++++++++++++++++++++------------- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/Cargo.lock b/src/Cargo.lock index 3108163..39706b2 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1486,6 +1486,7 @@ name = "obidebruinj" version = "0.1.0" dependencies = [ "ahash", + "crossbeam-channel", "hashbrown 0.14.5", "obifastwrite", "obikseq", diff --git a/src/obidebruinj/Cargo.toml b/src/obidebruinj/Cargo.toml index acbc742..feb5848 100644 --- a/src/obidebruinj/Cargo.toml +++ b/src/obidebruinj/Cargo.toml @@ -8,7 +8,8 @@ obikseq = { path = "../obikseq" } obifastwrite = { path = "../obifastwrite" } ahash = "0.8" hashbrown = { version = "0.14", features = ["rayon"] } -rayon = "1" +rayon = "1" +crossbeam-channel = "0.5" xxhash-rust = { version = "0.8.15", features = ["xxh3", "const_xxh3"] } tracing = "0.1" diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index 865567e..ed52c4d 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -5,6 +5,7 @@ use obikseq::{CanonicalKmer, Sequence}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::cell::RefCell; use std::fmt; +use crossbeam_channel; use std::sync::atomic::{AtomicU8, Ordering}; use xxhash_rust::xxh3::Xxh3Builder; use std::time::Instant; @@ -454,24 +455,30 @@ impl GraphDeBruijn { F: FnMut(&[u8]) -> Result<(), E> + Send, { thread_local! 
{ - static BUF: std::cell::RefCell> = RefCell::new(Vec::with_capacity(4096)); + static BUF: RefCell> = RefCell::new(Vec::with_capacity(4096)); } - let error = std::sync::Mutex::new(None::); - let f = std::sync::Mutex::new(f); - self.for_each_unitig(|iter| { - if error.lock().unwrap().is_some() { - return; - } - BUF.with(|buf| { - let mut buf = buf.borrow_mut(); - buf.clear(); - buf.extend(iter); - if let Err(e) = f.lock().unwrap()(&buf) { - *error.lock().unwrap() = Some(e); + let (tx, rx) = crossbeam_channel::bounded::>(rayon::current_num_threads() * 256); + std::thread::scope(|s| { + let writer = s.spawn(move || -> Result<(), E> { + let mut f = f; + for nucs in rx { + f(&nucs)?; } + Ok(()) }); - }); - error.into_inner().unwrap().map_or(Ok(()), Err) + self.for_each_unitig(|iter| { + BUF.with(|buf| { + let mut buf = buf.borrow_mut(); + buf.clear(); + buf.extend(iter); + let to_send = buf.clone(); + buf.clear(); + tx.send(to_send).ok(); + }); + }); + drop(tx); + writer.join().expect("writer thread panicked") + }) } pub fn len(&self) -> usize { -- 2.52.0 From fb8c6e427cc1974927cb2f30dfc86c5846faaa1c Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sat, 13 Jun 2026 10:46:35 +0200 Subject: [PATCH 4/6] refactor: pass Unitig objects directly instead of raw byte slices Refactored `try_for_each_unitig` and related pipelines across `obidebruinj` and `obikpartitionner` to accept `Unitig` instances directly. This eliminates manual `Unitig::from_nucleotides()` conversions, simplifies the data flow, and reduces unnecessary allocation overhead. 
--- src/obidebruinj/src/debruijn.rs | 14 ++++++-------- src/obidebruinj/src/tests/debruijn.rs | 4 ++-- src/obikpartitionner/src/index_layer.rs | 4 ++-- src/obikpartitionner/src/merge_layer.rs | 4 ++-- src/obikpartitionner/src/rebuild_layer.rs | 4 ++-- 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index ed52c4d..a435d83 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -1,7 +1,7 @@ //use ahash::RandomState; use hashbrown::HashMap; use obikseq::k; -use obikseq::{CanonicalKmer, Sequence}; +use obikseq::{CanonicalKmer, Sequence, Unitig}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::cell::RefCell; use std::fmt; @@ -452,17 +452,17 @@ impl GraphDeBruijn { pub fn try_for_each_unitig(&self, f: F) -> Result<(), E> where E: Send, - F: FnMut(&[u8]) -> Result<(), E> + Send, + F: FnMut(&Unitig) -> Result<(), E> + Send, { thread_local! { static BUF: RefCell> = RefCell::new(Vec::with_capacity(4096)); } - let (tx, rx) = crossbeam_channel::bounded::>(rayon::current_num_threads() * 256); + let (tx, rx) = crossbeam_channel::bounded::(rayon::current_num_threads() * 256); std::thread::scope(|s| { let writer = s.spawn(move || -> Result<(), E> { let mut f = f; - for nucs in rx { - f(&nucs)?; + for unitig in rx { + f(&unitig)?; } Ok(()) }); @@ -471,9 +471,7 @@ impl GraphDeBruijn { let mut buf = buf.borrow_mut(); buf.clear(); buf.extend(iter); - let to_send = buf.clone(); - buf.clear(); - tx.send(to_send).ok(); + tx.send(Unitig::from_nucleotides(&buf)).ok(); }); }); drop(tx); diff --git a/src/obidebruinj/src/tests/debruijn.rs b/src/obidebruinj/src/tests/debruijn.rs index 17f4b2f..a315ce7 100644 --- a/src/obidebruinj/src/tests/debruijn.rs +++ b/src/obidebruinj/src/tests/debruijn.rs @@ -24,8 +24,8 @@ fn canonical_kmers(seq: &[u8]) -> Vec { fn collect_unitigs(g: &GraphDeBruijn) -> Vec { let mut unitigs = Vec::new(); - g.try_for_each_unitig(|nucs| -> 
Result<(), std::convert::Infallible> { - unitigs.push(Unitig::from_nucleotides(nucs)); + g.try_for_each_unitig(|unitig| -> Result<(), std::convert::Infallible> { + unitigs.push(unitig.clone()); Ok(()) }) .unwrap(); diff --git a/src/obikpartitionner/src/index_layer.rs b/src/obikpartitionner/src/index_layer.rs index acc8623..8eed6db 100644 --- a/src/obikpartitionner/src/index_layer.rs +++ b/src/obikpartitionner/src/index_layer.rs @@ -107,8 +107,8 @@ impl KmerPartition { fs::create_dir_all(&layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?; - g.try_for_each_unitig(|nucs| { - uw.write(&obikseq::unitig::Unitig::from_nucleotides(nucs)) + g.try_for_each_unitig(|unitig| { + uw.write(unitig) })?; uw.close()?; diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index d38042a..494cb71 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -307,8 +307,8 @@ impl KmerPartition { g.compute_degrees_and_mark_starts(); fs::create_dir_all(&new_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; - g.try_for_each_unitig(|nucs| { - uw.write(&obikseq::unitig::Unitig::from_nucleotides(nucs)) + g.try_for_each_unitig(|unitig| { + uw.write(unitig) })?; uw.close()?; let n = g.len(); diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index 1a4c238..ea09536 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -168,8 +168,8 @@ impl KmerPartition { fs::create_dir_all(&dst_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; - g.try_for_each_unitig(|nucs| { - uw.write(&obikseq::unitig::Unitig::from_nucleotides(nucs)) + g.try_for_each_unitig(|unitig| { + uw.write(unitig) })?; uw.close()?; drop(g); -- 2.52.0 From bc14346f5fe8a687d2181d6d2732a0493e0052f8 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: 
Sat, 13 Jun 2026 11:32:12 +0200 Subject: [PATCH 5/6] feat: add CPU-aware parallel worker pool for partition merging Introduce CpuSample to measure process-level CPU efficiency and wall-clock time. Use crossbeam-channel to distribute partition merging tasks to a dynamic worker pool that scales based on CPU utilization, capped at half the available cores. Update diagnostics to track pool usage. --- src/Cargo.lock | 1 + src/obikindex/Cargo.toml | 3 +- src/obikindex/src/merge.rs | 120 +++++++++++++++++++++++++++---------- src/obisys/src/lib.rs | 33 ++++++++++ 4 files changed, 123 insertions(+), 34 deletions(-) diff --git a/src/Cargo.lock b/src/Cargo.lock index 39706b2..cfe335d 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1507,6 +1507,7 @@ dependencies = [ name = "obikindex" version = "0.1.0" dependencies = [ + "crossbeam-channel", "indicatif", "ndarray", "obicompactvec", diff --git a/src/obikindex/Cargo.toml b/src/obikindex/Cargo.toml index d670480..3f89bb0 100644 --- a/src/obikindex/Cargo.toml +++ b/src/obikindex/Cargo.toml @@ -11,7 +11,8 @@ obisys = { path = "../obisys" } obicompactvec = { path = "../obicompactvec" } obilayeredmap = { path = "../obilayeredmap" } ndarray = "0.16" -rayon = "1" +rayon = "1" +crossbeam-channel = "0.5" serde = { version = "1", features = ["derive"] } serde_json = "1" indicatif = "0.17" diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index 0380e98..fa990d8 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -2,8 +2,10 @@ use std::collections::HashMap; use std::fs; use std::io; use std::path::Path; +use std::time::{Duration, Instant}; -use obisys::{Reporter, Stage, progress_bar, spinner}; +use crossbeam_channel::unbounded; +use obisys::{CpuSample, Reporter, Stage, progress_bar, spinner}; use tracing::{debug, info}; use obilayeredmap::IndexMode; @@ -191,48 +193,97 @@ impl KmerIndex { let mut order: Vec = (0..n_partitions).collect(); order.sort_unstable_by_key(|&i| 
std::cmp::Reverse(partition_sizes[i])); - // ── First partition (largest) ───────────────────────────────────── - let worst_id = order[0]; - let worst_bytes = partition_sizes[worst_id]; + // ── Adaptive worker pool ────────────────────────────────────────── + // Start with 1 worker thread. After each completed partition, + // measure CPU efficiency (via getrusage delta). If efficiency is + // below the spawn threshold and more partitions remain, spawn one + // additional worker. Workers share a crossbeam channel of partition + // IDs; each reports (id, g_len, duration) on a result channel. + const SPAWN_THRESHOLD: f64 = 0.95; // spawn when >5% capacity idle + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()).unwrap_or(1); + let max_workers = (n_cores / 2).max(1); + let _ = budget_fraction; // kept in signature for CLI compatibility - let worst_g_len = dst_partition - .merge_partition(worst_id, &srcs, mode, n_dst_genomes, block_bits, &evidence) - .map_err(OKIError::Partition)?; - pb.inc(1); + let (part_tx, part_rx) = unbounded::(); + let (result_tx, result_rx) = + unbounded::<(usize, Result, Duration)>(); + // activate_tx: controller sends () to wake the next dormant worker. + // Dropping activate_tx closes the channel; dormant workers exit. + let (activate_tx, activate_rx) = unbounded::<()>(); - info!( - "merge_partitions: first partition {} — {} unitig bytes → {} new kmers", - worst_id, fmt_bytes(worst_bytes), worst_g_len, - ); + for &i in &order { + part_tx.send(i).ok(); + } + drop(part_tx); let mut part_stats: Vec = Vec::with_capacity(n_partitions); - part_stats.push(PartStat { - id: worst_id, - unitig_bytes: worst_bytes, - g_len: worst_g_len, - }); + let mut n_workers = 0usize; + let mut cpu_sample = CpuSample::now(); - // ── Sequential remainder ────────────────────────────────────────── - // One partition at a time; each partition uses an internal pipeline - // (obipipeline) to parallelise file I/O and dst_map filtering. 
- let _ = budget_fraction; // kept in signature for CLI compatibility - for &i in &order[1..] { - let ubytes = partition_sizes[i]; - debug!("partition {i}: start — {} unitig bytes", fmt_bytes(ubytes)); + // Shadow as references so closures can capture them by copy. + let srcs = &srcs; + let evidence = &evidence; - let g_len = dst_partition - .merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence) - .map_err(OKIError::Partition)?; - pb.inc(1); + std::thread::scope(|s| -> OKIResult<()> { + // Pre-spawn max_workers threads; each waits for an activation + // signal before consuming from part_rx. + for _ in 0..max_workers { + let prx = part_rx.clone(); + let rtx = result_tx.clone(); + let arx = activate_rx.clone(); + s.spawn(move || { + if arx.recv().is_ok() { + for i in &prx { + let t = Instant::now(); + let r = dst_partition.merge_partition( + i, srcs, mode, n_dst_genomes, block_bits, evidence, + ); + rtx.send((i, r, t.elapsed())).ok(); + } + } + }); + } + drop(result_tx); - debug!("partition {i}: done — {} new kmers", g_len); - part_stats.push(PartStat { id: i, unitig_bytes: ubytes, g_len }); - } + // Activate first worker immediately. 
+ activate_tx.send(()).ok(); + n_workers = 1; + + let mut completed = 0usize; + while completed < n_partitions { + let (i, r, dur) = result_rx.recv() + .map_err(|_| OKIError::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, "worker channel closed")))?; + let g_len = r.map_err(OKIError::Partition)?; + pb.inc(1); + debug!("partition {i}: done in {:.1}s — {} new kmers", + dur.as_secs_f64(), g_len); + part_stats.push(PartStat { + id: i, unitig_bytes: partition_sizes[i], g_len, + }); + completed += 1; + + if n_workers < max_workers && completed < n_partitions { + let eff = cpu_sample.cpu_efficiency(n_cores); + if eff < SPAWN_THRESHOLD { + activate_tx.send(()).ok(); + n_workers += 1; + cpu_sample = CpuSample::now(); + debug!("activated worker {n_workers} — efficiency {:.0}%", + eff * 100.0); + } + } + } + // Close activate_tx: dormant workers exit cleanly. + drop(activate_tx); + Ok(()) + })?; pb.finish_and_clear(); // ── Diagnostic report ───────────────────────────────────────────── - print_merge_partition_report(&part_stats); + print_merge_partition_report(&part_stats, n_workers, max_workers); rep.push(t.stop()); } @@ -254,7 +305,7 @@ impl KmerIndex { // ── Diagnostic report ───────────────────────────────────────────────────────── -fn print_merge_partition_report(stats: &[PartStat]) { +fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_workers: usize) { let total_new: usize = stats.iter().map(|s| s.g_len).sum(); let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count(); @@ -268,6 +319,9 @@ fn print_merge_partition_report(stats: &[PartStat]) { " {} partition(s) processed, {} total new kmers", non_empty, total_new, ); + info!( + " workers spawned: {n_workers} / {max_workers} (max)", + ); // Top 8 partitions by new-kmer count let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect(); diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs index 5f8a3d3..aeccb1b 100644 --- a/src/obisys/src/lib.rs +++ 
b/src/obisys/src/lib.rs @@ -212,6 +212,39 @@ fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 } // Monotonically increasing counters — negative delta would be a kernel bug. fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 } +// ── CpuSample ───────────────────────────────────────────────────────────────── + +/// Snapshot of process-wide CPU time + wall clock at a point in time. +/// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of +/// available cores used since the snapshot was taken. +pub struct CpuSample { + wall: Instant, + user_secs: f64, + sys_secs: f64, +} + +impl CpuSample { + pub fn now() -> Self { + let ru = get_rusage(); + Self { + wall: Instant::now(), + user_secs: tv_to_secs(ru.ru_utime), + sys_secs: tv_to_secs(ru.ru_stime), + } + } + + /// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot. + /// Returns 0.0 if less than 100 ms have elapsed (too noisy). + pub fn cpu_efficiency(&self, n_cores: usize) -> f64 { + let ru = get_rusage(); + let wall = self.wall.elapsed().as_secs_f64(); + if wall < 0.1 { return 0.0; } + let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs) + + (tv_to_secs(ru.ru_stime) - self.sys_secs); + cpu / (wall * n_cores as f64) + } +} + // ── public API ──────────────────────────────────────────────────────────────── /// Snapshot taken at the start of a pipeline stage. -- 2.52.0 From fddf630772ab6b9954154c41292f6bf30ec09eb0 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sat, 13 Jun 2026 11:41:14 +0200 Subject: [PATCH 6/6] style: apply consistent formatting and whitespace normalization Applies consistent formatting, whitespace normalization, and indentation standardization to `debruijn.rs` and `merge.rs`. Reorganizes imports and downgrades a unitig traversal log from `info!` to `debug!`. Apart from that log-level change, no functional logic or runtime behavior is altered. 
--- src/obidebruinj/src/debruijn.rs | 101 ++++++++++++++---- src/obikindex/src/merge.rs | 176 +++++++++++++++++++++++--------- 2 files changed, 209 insertions(+), 68 deletions(-) diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index a435d83..23c1a1c 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -1,15 +1,15 @@ //use ahash::RandomState; +use crossbeam_channel; use hashbrown::HashMap; use obikseq::k; use obikseq::{CanonicalKmer, Sequence, Unitig}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::cell::RefCell; use std::fmt; -use crossbeam_channel; use std::sync::atomic::{AtomicU8, Ordering}; -use xxhash_rust::xxh3::Xxh3Builder; use std::time::Instant; use tracing::{debug, info}; +use xxhash_rust::xxh3::Xxh3Builder; // ── Types ───────────────────────────────────────────────────────────────────── @@ -99,7 +99,6 @@ impl Node { (self.0 >> 5) & 0b11 } - /// Marks the node as visited. #[inline] pub fn set_visited(&mut self) { @@ -180,8 +179,12 @@ impl WalkState { } pub fn reachable(&self, graph: &GraphDeBruijn) -> bool { - WalkState { kmer: self.kmer, node: self.node, direct: !self.direct } - .leavable(graph) + WalkState { + kmer: self.kmer, + node: self.node, + direct: !self.direct, + } + .leavable(graph) } pub fn walk(&self, graph: &GraphDeBruijn) -> Option<(WalkState, u8)> { @@ -197,8 +200,19 @@ impl WalkState { if next_node.is_visited() { return None; } - let reachable = if dnext { next_node.can_extend_left() } else { next_node.can_extend_right() }; - reachable.then_some((WalkState { kmer: cnext, node: next_node, direct: dnext }, nuc)) + let reachable = if dnext { + next_node.can_extend_left() + } else { + next_node.can_extend_right() + }; + reachable.then_some(( + WalkState { + kmer: cnext, + node: next_node, + direct: dnext, + }, + nuc, + )) } else { if !self.node.can_extend_left() { return None; @@ -211,8 +225,19 @@ impl WalkState { if next_node.is_visited() { return None; } 
- let reachable = if dnext { next_node.can_extend_right() } else { next_node.can_extend_left() }; - reachable.then_some((WalkState { kmer: cnext, node: next_node, direct: dnext }, 3 - nuc)) + let reachable = if dnext { + next_node.can_extend_right() + } else { + next_node.can_extend_left() + }; + reachable.then_some(( + WalkState { + kmer: cnext, + node: next_node, + direct: dnext, + }, + 3 - nuc, + )) } } } @@ -275,7 +300,11 @@ impl GraphDeBruijn { node.set_left(lc, ln); atomic.store(node.0, Ordering::Relaxed); }); - debug!("[compute_degrees] pass 1 (degrees): {:?} — {} nodes", t1.elapsed(), self.nodes.len()); + debug!( + "[compute_degrees] pass 1 (degrees): {:?} — {} nodes", + t1.elapsed(), + self.nodes.len() + ); // Pass 2: mark start nodes @@ -290,7 +319,11 @@ impl GraphDeBruijn { atomic.store(node.0, Ordering::Relaxed); } }); - debug!("[compute_degrees] pass 2 (starts): {:?} — {} nodes", t2.elapsed(), self.nodes.len()); + debug!( + "[compute_degrees] pass 2 (starts): {:?} — {} nodes", + t2.elapsed(), + self.nodes.len() + ); } pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option { @@ -328,14 +361,28 @@ impl GraphDeBruijn { } fn unitig_nucleotides(&self, kmer: CanonicalKmer, k: usize) -> Option> { - let old = self.nodes.get(&kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); - if old & IS_VISITED_MASK != 0 { return None; } + let old = self + .nodes + .get(&kmer)? + .fetch_or(IS_VISITED_MASK, Ordering::AcqRel); + if old & IS_VISITED_MASK != 0 { + return None; + } let start = WalkState::new(kmer, Node(old), true); let next_step = start.walk(self).and_then(|(next_state, nuc)| { - let ext_old = self.nodes.get(&next_state.kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); + let ext_old = self + .nodes + .get(&next_state.kmer)? 
+ .fetch_or(IS_VISITED_MASK, Ordering::AcqRel); (ext_old & IS_VISITED_MASK == 0).then_some((next_state, nuc)) }); - Some(UnitigNucIter { graph: self, start: kmer, pos: 0, k, next_step }) + Some(UnitigNucIter { + graph: self, + start: kmer, + pos: 0, + k, + next_step, + }) } pub fn for_each_unitig(&self, f: impl Fn(UnitigNucIter<'_>) + Sync) { @@ -352,7 +399,9 @@ impl GraphDeBruijn { self.nodes .par_iter() .filter_map(|(&kmer, atomic)| { - Node(atomic.load(Ordering::Acquire)).is_start().then_some(kmer) + Node(atomic.load(Ordering::Acquire)) + .is_start() + .then_some(kmer) }) .for_each(|kmer| { if let Some(iter) = self.unitig_nucleotides(kmer, k) { @@ -403,10 +452,10 @@ impl GraphDeBruijn { } } - info!( + debug!( chains = n_chains.load(Ordering::Relaxed), phase2 = n2.load(Ordering::Relaxed), - total = n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed), + total = n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed), "unitig traversal complete" ); } @@ -508,7 +557,11 @@ impl Iterator for UnitigNucIter<'_> { Some(nuc) } else if let Some((state, nuc)) = self.next_step.take() { self.next_step = state.walk(self.graph).and_then(|(next_state, next_nuc)| { - let old = self.graph.nodes.get(&next_state.kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); + let old = self + .graph + .nodes + .get(&next_state.kmer)? 
+ .fetch_or(IS_VISITED_MASK, Ordering::AcqRel); (old & IS_VISITED_MASK == 0).then_some((next_state, next_nuc)) }); Some(nuc) @@ -539,10 +592,16 @@ fn count_neighbors( } nuc = i as u8; count += 1; - if count >= 2 { return (2, None); } + if count >= 2 { + return (2, None); + } } } - if count == 1 { (1, Some(nuc)) } else { (0, None) } + if count == 1 { + (1, Some(nuc)) + } else { + (0, None) + } } // ── tests ───────────────────────────────────────────────────────────────────── diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index fa990d8..f32fc8f 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -21,9 +21,9 @@ pub use obikpartitionner::MergeMode; #[derive(Debug)] struct PartStat { - id: usize, + id: usize, unitig_bytes: u64, - g_len: usize, + g_len: usize, } // ── main merge entry point ──────────────────────────────────────────────────── @@ -53,9 +53,9 @@ impl KmerIndex { if src.state() != IndexState::Indexed { return Err(OKIError::NotIndexed(src.root_path.clone())); } - if src.kmer_size() != ref0.kmer_size() - || src.minimizer_size() != ref0.minimizer_size() - || src.n_partitions() != ref0.n_partitions() + if src.kmer_size() != ref0.kmer_size() + || src.minimizer_size() != ref0.minimizer_size() + || src.n_partitions() != ref0.n_partitions() { return Err(OKIError::IncompatibleConfig); } @@ -65,39 +65,65 @@ impl KmerIndex { } // ── Log source characteristics and choose base ──────────────────────── - let mode_str = if mode == MergeMode::Presence { "presence" } else { "count" }; + let mode_str = if mode == MergeMode::Presence { + "presence" + } else { + "count" + }; info!( "merge: {} source(s), smer-size={}, mode={}", - sources.len(), sources[0].kmer_size(), mode_str, + sources.len(), + sources[0].kmer_size(), + mode_str, ); for (i, src) in sources.iter().enumerate() { - let genome_str = if src.meta.genomes.len() == 1 { "mono-genome".to_string() } - else { format!("{} genomes", src.meta.genomes.len()) }; - let trivial_str 
= if is_trivial(src, mode) { " [trivial: no data approximation]" } else { "" }; + let genome_str = if src.meta.genomes.len() == 1 { + "mono-genome".to_string() + } else { + format!("{} genomes", src.meta.genomes.len()) + }; + let trivial_str = if is_trivial(src, mode) { + " [trivial: no data approximation]" + } else { + "" + }; info!( " [{}] {} — {}, {}, {}{}", - i, src.root_path.display(), + i, + src.root_path.display(), format_evidence(&src.meta.config.evidence), - genome_str, mode_str, trivial_str, + genome_str, + mode_str, + trivial_str, ); } let base_idx = choose_base(sources, mode); let needs_approx = sources.iter().any(|src| { !is_trivial(src, mode) - && matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. }) + && matches!( + src.meta.config.evidence, + IndexMode::Approx { .. } | IndexMode::Hybrid { .. } + ) }); info!( "output evidence: {} ({}base: [{}] {})", format_evidence(&sources[base_idx].meta.config.evidence), - if needs_approx { "forced approx — " } else { "" }, - base_idx, sources[base_idx].root_path.display(), + if needs_approx { + "forced approx — " + } else { + "" + }, + base_idx, + sources[base_idx].root_path.display(), ); let mut ordered: Vec<&KmerIndex> = Vec::with_capacity(sources.len()); ordered.push(sources[base_idx]); for (i, &src) in sources.iter().enumerate() { - if i != base_idx { ordered.push(src); } + if i != base_idx { + ordered.push(src); + } } let sources: &[&KmerIndex] = &ordered; let evidence = sources[0].meta.config.evidence.clone(); @@ -151,7 +177,8 @@ impl KmerIndex { fs::remove_dir_all(&spectrums_dir)?; } for (src, new_labels) in sources.iter().zip(&source_labels) { - let old_labels: Vec = src.meta.genomes.iter().map(|g| g.label.clone()).collect(); + let old_labels: Vec = + src.meta.genomes.iter().map(|g| g.label.clone()).collect(); copy_spectrums(&src.root_path, output, &old_labels, new_labels)?; } pb.finish_and_clear(); @@ -184,9 +211,12 @@ impl KmerIndex { // Per-partition unitig byte sizes 
across remaining sources (stat() only) let partition_sizes: Vec = (0..n_partitions) - .map(|i| remaining_sources.iter() - .map(|s| partition_unitig_bytes(s, i)) - .sum()) + .map(|i| { + remaining_sources + .iter() + .map(|s| partition_unitig_bytes(s, i)) + .sum() + }) .collect(); // LFD sort: largest partition first @@ -201,7 +231,8 @@ impl KmerIndex { // IDs; each reports (id, g_len, duration) on a result channel. const SPAWN_THRESHOLD: f64 = 0.95; // spawn when >5% capacity idle let n_cores = std::thread::available_parallelism() - .map(|n| n.get()).unwrap_or(1); + .map(|n| n.get()) + .unwrap_or(1); let max_workers = (n_cores / 2).max(1); let _ = budget_fraction; // kept in signature for CLI compatibility @@ -220,6 +251,9 @@ impl KmerIndex { let mut part_stats: Vec = Vec::with_capacity(n_partitions); let mut n_workers = 0usize; let mut cpu_sample = CpuSample::now(); + // Efficiency measured just before each spawn, used to assess + // whether the previous worker delivered its expected marginal gain. + let mut efficiency_at_last_spawn = 0.0f64; // Shadow as references so closures can capture them by copy. 
let srcs = &srcs; @@ -237,7 +271,12 @@ impl KmerIndex { for i in &prx { let t = Instant::now(); let r = dst_partition.merge_partition( - i, srcs, mode, n_dst_genomes, block_bits, evidence, + i, + srcs, + mode, + n_dst_genomes, + block_bits, + evidence, ); rtx.send((i, r, t.elapsed())).ok(); } @@ -252,26 +291,51 @@ impl KmerIndex { let mut completed = 0usize; while completed < n_partitions { - let (i, r, dur) = result_rx.recv() - .map_err(|_| OKIError::Io(io::Error::new( - io::ErrorKind::UnexpectedEof, "worker channel closed")))?; + let (i, r, dur) = result_rx.recv().map_err(|_| { + OKIError::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "worker channel closed", + )) + })?; let g_len = r.map_err(OKIError::Partition)?; pb.inc(1); - debug!("partition {i}: done in {:.1}s — {} new kmers", - dur.as_secs_f64(), g_len); + debug!( + "partition {i}: done in {:.1}s — {} new kmers", + dur.as_secs_f64(), + g_len + ); part_stats.push(PartStat { - id: i, unitig_bytes: partition_sizes[i], g_len, + id: i, + unitig_bytes: partition_sizes[i], + g_len, }); completed += 1; if n_workers < max_workers && completed < n_partitions { let eff = cpu_sample.cpu_efficiency(n_cores); - if eff < SPAWN_THRESHOLD { + // For the first spawn use SPAWN_THRESHOLD. + // For subsequent spawns: the previous worker should + // have raised efficiency by at least a quarter of the expected + // marginal gain (1/n_workers). If not, adding another + // worker won't help. 
+ let should_spawn = if n_workers == 1 { + eff < SPAWN_THRESHOLD + } else { + let gain = eff - efficiency_at_last_spawn; + let expected = 1.0 / n_workers as f64; + gain >= expected * 0.25 + }; + if should_spawn { + debug!( + "activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%", + n_workers + 1, + eff * 100.0, + (eff - efficiency_at_last_spawn) * 100.0, + ); + efficiency_at_last_spawn = eff; activate_tx.send(()).ok(); n_workers += 1; cpu_sample = CpuSample::now(); - debug!("activated worker {n_workers} — efficiency {:.0}%", - eff * 100.0); } } } @@ -319,9 +383,7 @@ fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_worker " {} partition(s) processed, {} total new kmers", non_empty, total_new, ); - info!( - " workers spawned: {n_workers} / {max_workers} (max)", - ); + info!(" workers spawned: {n_workers} / {max_workers} (max)",); // Top 8 partitions by new-kmer count let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect(); @@ -343,10 +405,15 @@ fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_worker // ── helpers ─────────────────────────────────────────────────────────────────── fn fmt_bytes(b: u64) -> String { - if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) } - else if b >= 1 << 20 { format!("{:.1} MB", b as f64 / (1u64 << 20) as f64) } - else if b >= 1 << 10 { format!("{:.1} KB", b as f64 / (1u64 << 10) as f64) } - else { format!("{b} B") } + if b >= 1 << 30 { + format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) + } else if b >= 1 << 20 { + format!("{:.1} MB", b as f64 / (1u64 << 20) as f64) + } else if b >= 1 << 10 { + format!("{:.1} KB", b as f64 / (1u64 << 10) as f64) + } else { + format!("{b} B") + } } /// Sum of all unitigs.bin sizes across all layers of partition `i` in `src`. @@ -354,8 +421,12 @@ fn partition_unitig_bytes(src: &KmerIndex, i: usize) -> u64 { let mut total = 0u64; for l in 0.. 
{ let p = src.layer_unitigs_path(i, l); - if !p.exists() { break; } - if let Ok(m) = std::fs::metadata(&p) { total += m.len(); } + if !p.exists() { + break; + } + if let Ok(m) = std::fs::metadata(&p) { + total += m.len(); + } } total } @@ -382,7 +453,10 @@ fn compute_labels( }; *count += 1; labels.push(new_label.clone()); - all_genomes.push(GenomeInfo { label: new_label, meta: genome.meta.clone() }); + all_genomes.push(GenomeInfo { + label: new_label, + meta: genome.meta.clone(), + }); } source_labels.push(labels); } @@ -425,9 +499,9 @@ fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> { fn format_evidence(ev: &IndexMode) -> String { match ev { - IndexMode::Exact => "exact".to_string(), - IndexMode::Approx { b, z } => format!("approx (b={b}, z={z})"), - IndexMode::Hybrid { b, z } => format!("hybrid (b={b}, z={z})"), + IndexMode::Exact => "exact".to_string(), + IndexMode::Approx { b, z } => format!("approx (b={b}, z={z})"), + IndexMode::Hybrid { b, z } => format!("hybrid (b={b}, z={z})"), } } @@ -443,13 +517,21 @@ fn index_unitig_size(src: &KmerIndex) -> u64 { fn choose_base(sources: &[&KmerIndex], mode: MergeMode) -> usize { let needs_approx = sources.iter().any(|src| { !is_trivial(src, mode) - && matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. }) + && matches!( + src.meta.config.evidence, + IndexMode::Approx { .. } | IndexMode::Hybrid { .. } + ) }); - sources.iter().enumerate() + sources + .iter() + .enumerate() .filter(|(_, src)| { !needs_approx - || matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. }) + || matches!( + src.meta.config.evidence, + IndexMode::Approx { .. } | IndexMode::Hybrid { .. } + ) }) .max_by_key(|(_, src)| index_unitig_size(src)) .map(|(i, _)| i) -- 2.52.0