feat: Add parallel execution and thread-safe graph operations
Integrate rayon to enable parallel processing of k-mer partitions and degree computation. Replace Cell with AtomicU8 to ensure thread-safe node state management, and add a merge method for combining disjoint graphs. Additionally, introduce progress tracking utilities and a test-utils feature flag for development dependencies.
This commit is contained in:
Generated
+2
@@ -884,6 +884,7 @@ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"ahash",
|
"ahash",
|
||||||
"allocator-api2",
|
"allocator-api2",
|
||||||
|
"rayon",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1488,6 +1489,7 @@ dependencies = [
|
|||||||
"hashbrown 0.14.5",
|
"hashbrown 0.14.5",
|
||||||
"obifastwrite",
|
"obifastwrite",
|
||||||
"obikseq",
|
"obikseq",
|
||||||
|
"rayon",
|
||||||
"xxhash-rust",
|
"xxhash-rust",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -7,8 +7,12 @@ edition = "2021"
|
|||||||
obikseq = { path = "../obikseq" }
|
obikseq = { path = "../obikseq" }
|
||||||
obifastwrite = { path = "../obifastwrite" }
|
obifastwrite = { path = "../obifastwrite" }
|
||||||
ahash = "0.8"
|
ahash = "0.8"
|
||||||
hashbrown = "0.14"
|
hashbrown = { version = "0.14", features = ["rayon"] }
|
||||||
|
rayon = "1"
|
||||||
xxhash-rust = { version = "0.8.15", features = ["xxh3", "const_xxh3"] }
|
xxhash-rust = { version = "0.8.15", features = ["xxh3", "const_xxh3"] }
|
||||||
|
|
||||||
|
[features]
|
||||||
|
test-utils = []
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
obikseq = { path = "../obikseq", features = ["test-utils"] }
|
obikseq = { path = "../obikseq", features = ["test-utils"] }
|
||||||
|
|||||||
@@ -3,8 +3,9 @@ use hashbrown::HashMap;
|
|||||||
use obikseq::k;
|
use obikseq::k;
|
||||||
use obikseq::unitig::Unitig;
|
use obikseq::unitig::Unitig;
|
||||||
use obikseq::{CanonicalKmer, Kmer, Sequence};
|
use obikseq::{CanonicalKmer, Kmer, Sequence};
|
||||||
use std::cell::Cell;
|
use rayon::prelude::*;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use std::sync::atomic::{AtomicU8, Ordering};
|
||||||
use xxhash_rust::xxh3::Xxh3Builder;
|
use xxhash_rust::xxh3::Xxh3Builder;
|
||||||
|
|
||||||
// ── Types ─────────────────────────────────────────────────────────────────────
|
// ── Types ─────────────────────────────────────────────────────────────────────
|
||||||
@@ -140,7 +141,7 @@ impl fmt::Display for Node {
|
|||||||
// ── GraphDeBruijn ─────────────────────────────────────────────────────────────
|
// ── GraphDeBruijn ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
pub struct GraphDeBruijn {
|
pub struct GraphDeBruijn {
|
||||||
nodes: FastHashMap<CanonicalKmer, Cell<Node>>,
|
nodes: FastHashMap<CanonicalKmer, AtomicU8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GraphDeBruijn {
|
impl GraphDeBruijn {
|
||||||
@@ -158,24 +159,34 @@ impl GraphDeBruijn {
|
|||||||
|
|
||||||
/// Insert a canonical kmer into the graph. No-op if already present.
|
/// Insert a canonical kmer into the graph. No-op if already present.
|
||||||
pub fn push(&mut self, kmer: CanonicalKmer) {
|
pub fn push(&mut self, kmer: CanonicalKmer) {
|
||||||
self.nodes
|
self.nodes.entry(kmer).or_insert_with(|| AtomicU8::new(0));
|
||||||
.entry(kmer)
|
|
||||||
.or_insert_with(|| Cell::new(Node::default()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// For every node, find its unique right/left canonical neighbour (if any)
|
/// For every node, find its unique right/left canonical neighbour (if any)
|
||||||
/// and store the nucleotide index in the Node flags.
|
/// and store the nucleotide index in the Node flags.
|
||||||
///
|
///
|
||||||
/// Single pass thanks to Cell interior mutability.
|
/// In production builds, runs in parallel across all nodes (each entry is
|
||||||
|
/// written by exactly one thread). In test builds, runs sequentially to
|
||||||
|
/// avoid propagating thread-local k/m values to rayon worker threads.
|
||||||
pub fn compute_degrees(&self) {
|
pub fn compute_degrees(&self) {
|
||||||
for (&kmer, cell) in &self.nodes {
|
#[cfg(not(any(test, feature = "test-utils")))]
|
||||||
|
self.nodes.par_iter().for_each(|(&kmer, atomic)| {
|
||||||
let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes);
|
let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes);
|
||||||
let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes);
|
let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes);
|
||||||
|
let mut node = Node(atomic.load(Ordering::Relaxed));
|
||||||
let mut node = cell.get();
|
|
||||||
node.set_right(rc, rn);
|
node.set_right(rc, rn);
|
||||||
node.set_left(lc, ln);
|
node.set_left(lc, ln);
|
||||||
cell.set(node);
|
atomic.store(node.0, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
|
||||||
|
#[cfg(any(test, feature = "test-utils"))]
|
||||||
|
for (&kmer, atomic) in &self.nodes {
|
||||||
|
let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes);
|
||||||
|
let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes);
|
||||||
|
let mut node = Node(atomic.load(Ordering::Relaxed));
|
||||||
|
node.set_right(rc, rn);
|
||||||
|
node.set_left(lc, ln);
|
||||||
|
atomic.store(node.0, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -206,20 +217,20 @@ impl GraphDeBruijn {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> {
|
pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> {
|
||||||
self.nodes.get(kmer).map(|cell| cell.get().is_visited())
|
self.nodes.get(kmer).map(|a| Node(a.load(Ordering::Relaxed)).is_visited())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_visited(&self, kmer: CanonicalKmer) {
|
pub fn set_visited(&self, kmer: CanonicalKmer) {
|
||||||
if let Some(cell) = self.nodes.get(&kmer) {
|
if let Some(a) = self.nodes.get(&kmer) {
|
||||||
let mut node = cell.get();
|
let mut node = Node(a.load(Ordering::Relaxed));
|
||||||
node.set_visited();
|
node.set_visited();
|
||||||
cell.set(node);
|
a.store(node.0, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the single right neighbor of `kmer`, if it exists.
|
/// Returns the single right neighbor of `kmer`, if it exists.
|
||||||
pub fn the_single_right_neighbor(&self, kmer: CanonicalKmer) -> Option<CanonicalKmer> {
|
pub fn the_single_right_neighbor(&self, kmer: CanonicalKmer) -> Option<CanonicalKmer> {
|
||||||
let node = self.nodes.get(&kmer)?.get();
|
let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed));
|
||||||
if !node.can_extend_right() {
|
if !node.can_extend_right() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
@@ -229,7 +240,7 @@ impl GraphDeBruijn {
|
|||||||
|
|
||||||
/// Returns the single left neighbor of `kmer`, if it exists.
|
/// Returns the single left neighbor of `kmer`, if it exists.
|
||||||
pub fn the_single_left_neighbor(&self, kmer: CanonicalKmer) -> Option<CanonicalKmer> {
|
pub fn the_single_left_neighbor(&self, kmer: CanonicalKmer) -> Option<CanonicalKmer> {
|
||||||
let node = self.nodes.get(&kmer)?.get();
|
let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed));
|
||||||
if !node.can_extend_left() {
|
if !node.can_extend_left() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
@@ -253,7 +264,7 @@ impl GraphDeBruijn {
|
|||||||
|
|
||||||
fn next_unitig_kmer(&self, kmer: Kmer) -> Option<Kmer> {
|
fn next_unitig_kmer(&self, kmer: Kmer) -> Option<Kmer> {
|
||||||
let canonical = kmer.canonical();
|
let canonical = kmer.canonical();
|
||||||
let node = self.nodes.get(&canonical)?.get();
|
let node = Node(self.nodes.get(&canonical)?.load(Ordering::Relaxed));
|
||||||
|
|
||||||
let direct = kmer.raw() == canonical.raw();
|
let direct = kmer.raw() == canonical.raw();
|
||||||
|
|
||||||
@@ -270,8 +281,8 @@ impl GraphDeBruijn {
|
|||||||
canonical.into_kmer().push_left(node.left_nuc()).canonical()
|
canonical.into_kmer().push_left(node.left_nuc()).canonical()
|
||||||
};
|
};
|
||||||
|
|
||||||
let cell = self.nodes.get(&next_c)?;
|
let atomic = self.nodes.get(&next_c)?;
|
||||||
let next_node = cell.get();
|
let next_node = Node(atomic.load(Ordering::Relaxed));
|
||||||
if next_node.is_visited() {
|
if next_node.is_visited() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
@@ -287,7 +298,7 @@ impl GraphDeBruijn {
|
|||||||
|
|
||||||
let mut updated = next_node;
|
let mut updated = next_node;
|
||||||
updated.set_visited();
|
updated.set_visited();
|
||||||
cell.set(updated);
|
atomic.store(updated.0, Ordering::Relaxed);
|
||||||
Some(oriented)
|
Some(oriented)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -311,6 +322,17 @@ impl GraphDeBruijn {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Merge `other` into `self`.
|
||||||
|
///
|
||||||
|
/// The caller guarantees that the two graphs have **disjoint** kmer sets —
|
||||||
|
/// this is structurally true when each graph was built from a distinct
|
||||||
|
/// minimizer partition. No conflict resolution is needed: entries are moved
|
||||||
|
/// directly without checking for duplicates.
|
||||||
|
pub fn merge(&mut self, other: Self) {
|
||||||
|
self.nodes.reserve(other.nodes.len());
|
||||||
|
self.nodes.extend(other.nodes);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
self.nodes.len()
|
self.nodes.len()
|
||||||
}
|
}
|
||||||
@@ -323,7 +345,7 @@ impl GraphDeBruijn {
|
|||||||
// --- StartIter -----------------------------------------------------------------
|
// --- StartIter -----------------------------------------------------------------
|
||||||
struct StartIter<'a> {
|
struct StartIter<'a> {
|
||||||
graph: &'a GraphDeBruijn,
|
graph: &'a GraphDeBruijn,
|
||||||
nodes: hashbrown::hash_map::Iter<'a, CanonicalKmer, Cell<Node>>,
|
nodes: hashbrown::hash_map::Iter<'a, CanonicalKmer, AtomicU8>,
|
||||||
suspended: Vec<CanonicalKmer>,
|
suspended: Vec<CanonicalKmer>,
|
||||||
in_cycle_pass: bool,
|
in_cycle_pass: bool,
|
||||||
}
|
}
|
||||||
@@ -364,7 +386,7 @@ impl<'a> Iterator for StartIter<'a> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let node = match self.graph.nodes.get(¤t) {
|
let node = match self.graph.nodes.get(¤t) {
|
||||||
Some(c) => c.get(),
|
Some(a) => Node(a.load(Ordering::Relaxed)),
|
||||||
None => continue,
|
None => continue,
|
||||||
};
|
};
|
||||||
if node.is_visited() {
|
if node.is_visited() {
|
||||||
@@ -439,7 +461,7 @@ fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer {
|
|||||||
/// zero or ≥2 existing neighbours.
|
/// zero or ≥2 existing neighbours.
|
||||||
fn count_neighbors(
|
fn count_neighbors(
|
||||||
neighbors: [CanonicalKmer; 4],
|
neighbors: [CanonicalKmer; 4],
|
||||||
nodes: &FastHashMap<CanonicalKmer, Cell<Node>>,
|
nodes: &FastHashMap<CanonicalKmer, AtomicU8>,
|
||||||
) -> (u8, Option<u8>) {
|
) -> (u8, Option<u8>) {
|
||||||
let mut count = 0u8;
|
let mut count = 0u8;
|
||||||
let mut first = None;
|
let mut first = None;
|
||||||
|
|||||||
@@ -115,7 +115,7 @@ fn unitig_roundtrip_linear() {
|
|||||||
g.compute_degrees();
|
g.compute_degrees();
|
||||||
println!("Les kmers:");
|
println!("Les kmers:");
|
||||||
for (kmer, v) in g.nodes.iter() {
|
for (kmer, v) in g.nodes.iter() {
|
||||||
println!("{}: {}", String::from_utf8_lossy(&kmer.to_ascii()), v.get());
|
println!("{}: {}", String::from_utf8_lossy(&kmer.to_ascii()), v.load(std::sync::atomic::Ordering::Relaxed));
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("Les unitig:");
|
println!("Les unitig:");
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ use clap::Args;
|
|||||||
use obidebruinj::GraphDeBruijn;
|
use obidebruinj::GraphDeBruijn;
|
||||||
use obifastwrite::write_unitig;
|
use obifastwrite::write_unitig;
|
||||||
use obikindex::KmerIndex;
|
use obikindex::KmerIndex;
|
||||||
|
use obisys::{progress_bar, spinner};
|
||||||
|
use rayon::prelude::*;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use super::predicate::FilterArgs;
|
use super::predicate::FilterArgs;
|
||||||
@@ -32,25 +34,37 @@ pub fn run(args: UnitigArgs) {
|
|||||||
info!("unitig: building de Bruijn graph from {n} partition(s) (k={k})");
|
info!("unitig: building de Bruijn graph from {n} partition(s) (k={k})");
|
||||||
|
|
||||||
let filters = args.filter.build_filters(&idx.meta().genomes);
|
let filters = args.filter.build_filters(&idx.meta().genomes);
|
||||||
|
let partition = idx.partition();
|
||||||
|
|
||||||
// ── Collect all filtered kmers into a single de Bruijn graph ─────────────
|
// ── Phase 1 : collect filtered kmers in parallel ──────────────────────────
|
||||||
let mut g = GraphDeBruijn::new();
|
let pb = progress_bar("unitig", n as u64, "partitions");
|
||||||
|
let g = (0..n)
|
||||||
for i in 0..n {
|
.into_par_iter()
|
||||||
idx.partition()
|
.fold(
|
||||||
|
GraphDeBruijn::new,
|
||||||
|
|mut local_g, i| {
|
||||||
|
partition
|
||||||
.iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| {
|
.iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| {
|
||||||
g.push(kmer);
|
local_g.push(kmer);
|
||||||
})
|
})
|
||||||
.unwrap_or_else(|e| {
|
.unwrap_or_else(|e| {
|
||||||
eprintln!("error reading partition {i}: {e}");
|
eprintln!("error reading partition {i}: {e}");
|
||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
}
|
pb.inc(1);
|
||||||
|
local_g
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.reduce(GraphDeBruijn::new, |mut a, b| { a.merge(b); a });
|
||||||
|
pb.finish_and_clear();
|
||||||
|
|
||||||
info!("unitig: {} distinct k-mers", g.len());
|
info!("unitig: {} distinct k-mers", g.len());
|
||||||
|
|
||||||
|
// ── Phase 2 : compute degrees (in-memory, no progress needed) ────────────
|
||||||
g.compute_degrees();
|
g.compute_degrees();
|
||||||
|
|
||||||
// ── Enumerate unitigs and write as FASTA ──────────────────────────────────
|
// ── Phase 3 : enumerate unitigs and write as FASTA ────────────────────────
|
||||||
|
let pb = spinner("unitig");
|
||||||
let stdout = io::stdout();
|
let stdout = io::stdout();
|
||||||
let mut out = BufWriter::new(stdout.lock());
|
let mut out = BufWriter::new(stdout.lock());
|
||||||
|
|
||||||
@@ -59,7 +73,11 @@ pub fn run(args: UnitigArgs) {
|
|||||||
eprintln!("write error: {e}");
|
eprintln!("write error: {e}");
|
||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
if j % 10_000 == 0 {
|
||||||
|
pb.set_message(format!("{j} unitigs written"));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
pb.finish_and_clear();
|
||||||
|
|
||||||
out.flush().expect("flush error");
|
out.flush().expect("flush error");
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user