From 2f29ee22409042d2f8a7eef16feeae41296afdc3 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Thu, 4 Jun 2026 22:40:21 +0200 Subject: [PATCH] feat: Add parallel execution and thread-safe graph operations Integrate rayon to enable parallel processing of k-mer partitions and degree computation. Replace Cell with AtomicU8 to ensure thread-safe node state management, and add a merge method for combining disjoint graphs. Additionally, introduce progress tracking utilities and a test-utils feature flag for development dependencies. --- src/Cargo.lock | 2 + src/obidebruinj/Cargo.toml | 6 ++- src/obidebruinj/src/debruijn.rs | 68 ++++++++++++++++++--------- src/obidebruinj/src/tests/debruijn.rs | 2 +- src/obikmer/src/cmd/unitig.rs | 46 ++++++++++++------ 5 files changed, 85 insertions(+), 39 deletions(-) diff --git a/src/Cargo.lock b/src/Cargo.lock index e946c77..20e91ae 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -884,6 +884,7 @@ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash", "allocator-api2", + "rayon", ] [[package]] @@ -1488,6 +1489,7 @@ dependencies = [ "hashbrown 0.14.5", "obifastwrite", "obikseq", + "rayon", "xxhash-rust", ] diff --git a/src/obidebruinj/Cargo.toml b/src/obidebruinj/Cargo.toml index 5fb8273..82e1cad 100644 --- a/src/obidebruinj/Cargo.toml +++ b/src/obidebruinj/Cargo.toml @@ -7,8 +7,12 @@ edition = "2021" obikseq = { path = "../obikseq" } obifastwrite = { path = "../obifastwrite" } ahash = "0.8" -hashbrown = "0.14" +hashbrown = { version = "0.14", features = ["rayon"] } +rayon = "1" xxhash-rust = { version = "0.8.15", features = ["xxh3", "const_xxh3"] } +[features] +test-utils = [] + [dev-dependencies] obikseq = { path = "../obikseq", features = ["test-utils"] } diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index a27de2a..1bef858 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -3,8 +3,9 @@ use hashbrown::HashMap; use obikseq::k; use obikseq::unitig::Unitig; use obikseq::{CanonicalKmer, Kmer, Sequence}; -use std::cell::Cell; +use rayon::prelude::*; use std::fmt; +use std::sync::atomic::{AtomicU8, Ordering}; use xxhash_rust::xxh3::Xxh3Builder; // ── Types ───────────────────────────────────────────────────────────────────── @@ -140,7 +141,7 @@ impl fmt::Display for Node { // ── GraphDeBruijn ───────────────────────────────────────────────────────────── pub struct GraphDeBruijn { - nodes: FastHashMap>, + nodes: FastHashMap, } impl GraphDeBruijn { @@ -158,24 +159,34 @@ impl GraphDeBruijn { /// Insert a canonical kmer into the graph. No-op if already present. pub fn push(&mut self, kmer: CanonicalKmer) { - self.nodes - .entry(kmer) - .or_insert_with(|| Cell::new(Node::default())); + self.nodes.entry(kmer).or_insert_with(|| AtomicU8::new(0)); } /// For every node, find its unique right/left canonical neighbour (if any) /// and store the nucleotide index in the Node flags. /// - /// Single pass thanks to Cell interior mutability. + /// In production builds, runs in parallel across all nodes (each entry is + /// written by exactly one thread). In test builds, runs sequentially to + /// avoid propagating thread-local k/m values to rayon worker threads. pub fn compute_degrees(&self) { - for (&kmer, cell) in &self.nodes { + #[cfg(not(any(test, feature = "test-utils")))] + self.nodes.par_iter().for_each(|(&kmer, atomic)| { let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes); - - let mut node = cell.get(); + let mut node = Node(atomic.load(Ordering::Relaxed)); node.set_right(rc, rn); node.set_left(lc, ln); - cell.set(node); + atomic.store(node.0, Ordering::Relaxed); + }); + + #[cfg(any(test, feature = "test-utils"))] + for (&kmer, atomic) in &self.nodes { + let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); + let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes); + let mut node = Node(atomic.load(Ordering::Relaxed)); + node.set_right(rc, rn); + node.set_left(lc, ln); + atomic.store(node.0, Ordering::Relaxed); } } @@ -206,20 +217,20 @@ impl GraphDeBruijn { } pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option { - self.nodes.get(kmer).map(|cell| cell.get().is_visited()) + self.nodes.get(kmer).map(|a| Node(a.load(Ordering::Relaxed)).is_visited()) } pub fn set_visited(&self, kmer: CanonicalKmer) { - if let Some(cell) = self.nodes.get(&kmer) { - let mut node = cell.get(); + if let Some(a) = self.nodes.get(&kmer) { + let mut node = Node(a.load(Ordering::Relaxed)); node.set_visited(); - cell.set(node); + a.store(node.0, Ordering::Relaxed); } } /// Returns the single right neighbor of `kmer`, if it exists. pub fn the_single_right_neighbor(&self, kmer: CanonicalKmer) -> Option { - let node = self.nodes.get(&kmer)?.get(); + let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed)); if !node.can_extend_right() { return None; } @@ -229,7 +240,7 @@ impl GraphDeBruijn { /// Returns the single left neighbor of `kmer`, if it exists. pub fn the_single_left_neighbor(&self, kmer: CanonicalKmer) -> Option { - let node = self.nodes.get(&kmer)?.get(); + let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed)); if !node.can_extend_left() { return None; } @@ -253,7 +264,7 @@ impl GraphDeBruijn { fn next_unitig_kmer(&self, kmer: Kmer) -> Option { let canonical = kmer.canonical(); - let node = self.nodes.get(&canonical)?.get(); + let node = Node(self.nodes.get(&canonical)?.load(Ordering::Relaxed)); let direct = kmer.raw() == canonical.raw(); @@ -270,8 +281,8 @@ impl GraphDeBruijn { canonical.into_kmer().push_left(node.left_nuc()).canonical() }; - let cell = self.nodes.get(&next_c)?; - let next_node = cell.get(); + let atomic = self.nodes.get(&next_c)?; + let next_node = Node(atomic.load(Ordering::Relaxed)); if next_node.is_visited() { return None; } @@ -287,7 +298,7 @@ impl GraphDeBruijn { let mut updated = next_node; updated.set_visited(); - cell.set(updated); + atomic.store(updated.0, Ordering::Relaxed); Some(oriented) } @@ -311,6 +322,17 @@ impl GraphDeBruijn { }) } + /// Merge `other` into `self`. + /// + /// The caller guarantees that the two graphs have **disjoint** kmer sets — + /// this is structurally true when each graph was built from a distinct + /// minimizer partition. No conflict resolution is needed: entries are moved + /// directly without checking for duplicates. + pub fn merge(&mut self, other: Self) { + self.nodes.reserve(other.nodes.len()); + self.nodes.extend(other.nodes); + } + pub fn len(&self) -> usize { self.nodes.len() } @@ -323,7 +345,7 @@ impl GraphDeBruijn { // --- StartIter ----------------------------------------------------------------- struct StartIter<'a> { graph: &'a GraphDeBruijn, - nodes: hashbrown::hash_map::Iter<'a, CanonicalKmer, Cell>, + nodes: hashbrown::hash_map::Iter<'a, CanonicalKmer, AtomicU8>, suspended: Vec, in_cycle_pass: bool, } @@ -364,7 +386,7 @@ impl<'a> Iterator for StartIter<'a> { }; let node = match self.graph.nodes.get(¤t) { - Some(c) => c.get(), + Some(a) => Node(a.load(Ordering::Relaxed)), None => continue, }; if node.is_visited() { @@ -439,7 +461,7 @@ fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer { /// zero or ≥2 existing neighbours. fn count_neighbors( neighbors: [CanonicalKmer; 4], - nodes: &FastHashMap>, + nodes: &FastHashMap, ) -> (u8, Option) { let mut count = 0u8; let mut first = None; diff --git a/src/obidebruinj/src/tests/debruijn.rs b/src/obidebruinj/src/tests/debruijn.rs index 1ba2459..4169421 100644 --- a/src/obidebruinj/src/tests/debruijn.rs +++ b/src/obidebruinj/src/tests/debruijn.rs @@ -115,7 +115,7 @@ fn unitig_roundtrip_linear() { g.compute_degrees(); println!("Les kmers:"); for (kmer, v) in g.nodes.iter() { - println!("{}: {}", String::from_utf8_lossy(&kmer.to_ascii()), v.get()); + println!("{}: {}", String::from_utf8_lossy(&kmer.to_ascii()), v.load(std::sync::atomic::Ordering::Relaxed)); } println!("Les unitig:"); diff --git a/src/obikmer/src/cmd/unitig.rs b/src/obikmer/src/cmd/unitig.rs index 6c36bb6..f00eb8b 100644 --- a/src/obikmer/src/cmd/unitig.rs +++ b/src/obikmer/src/cmd/unitig.rs @@ -5,6 +5,8 @@ use clap::Args; use obidebruinj::GraphDeBruijn; use obifastwrite::write_unitig; use obikindex::KmerIndex; +use obisys::{progress_bar, spinner}; +use rayon::prelude::*; use tracing::info; use super::predicate::FilterArgs; @@ -32,25 +34,37 @@ pub fn run(args: UnitigArgs) { info!("unitig: building de Bruijn graph from {n} partition(s) (k={k})"); let filters = args.filter.build_filters(&idx.meta().genomes); + let partition = idx.partition(); - // ── Collect all filtered kmers into a single de Bruijn graph ───────────── - let mut g = GraphDeBruijn::new(); - - for i in 0..n { - idx.partition() - .iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| { - g.push(kmer); - }) - .unwrap_or_else(|e| { - eprintln!("error reading partition {i}: {e}"); - std::process::exit(1); - }); - } + // ── Phase 1 : collect filtered kmers in parallel ────────────────────────── + let pb = progress_bar("unitig", n as u64, "partitions"); + let g = (0..n) + .into_par_iter() + .fold( + GraphDeBruijn::new, + |mut local_g, i| { + partition + .iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| { + local_g.push(kmer); + }) + .unwrap_or_else(|e| { + eprintln!("error reading partition {i}: {e}"); + std::process::exit(1); + }); + pb.inc(1); + local_g + }, + ) + .reduce(GraphDeBruijn::new, |mut a, b| { a.merge(b); a }); + pb.finish_and_clear(); info!("unitig: {} distinct k-mers", g.len()); + + // ── Phase 2 : compute degrees (in-memory, no progress needed) ──────────── g.compute_degrees(); - // ── Enumerate unitigs and write as FASTA ────────────────────────────────── + // ── Phase 3 : enumerate unitigs and write as FASTA ──────────────────────── + let pb = spinner("unitig"); let stdout = io::stdout(); let mut out = BufWriter::new(stdout.lock()); @@ -59,7 +73,11 @@ pub fn run(args: UnitigArgs) { eprintln!("write error: {e}"); std::process::exit(1); }); + if j % 10_000 == 0 { + pb.set_message(format!("{j} unitigs written")); + } } + pb.finish_and_clear(); out.flush().expect("flush error"); }