refactor: replace mutex with channels for parallel debruijn processing
Add the `crossbeam-channel` dependency (`rayon` was already a dependency) to support concurrent execution. Replace the synchronous, mutex-protected closure pattern with a channel-based producer-consumer approach using `std::thread::scope`: unitig producers send owned byte buffers over a bounded channel to a single writer thread that invokes the callback. This decouples unitig iteration from processing, eliminating lock contention and `Mutex` overhead while enabling parallel workloads.
This commit is contained in:
Generated
+1
@@ -1486,6 +1486,7 @@ name = "obidebruinj"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ahash",
|
"ahash",
|
||||||
|
"crossbeam-channel",
|
||||||
"hashbrown 0.14.5",
|
"hashbrown 0.14.5",
|
||||||
"obifastwrite",
|
"obifastwrite",
|
||||||
"obikseq",
|
"obikseq",
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ obifastwrite = { path = "../obifastwrite" }
|
|||||||
ahash = "0.8"
|
ahash = "0.8"
|
||||||
hashbrown = { version = "0.14", features = ["rayon"] }
|
hashbrown = { version = "0.14", features = ["rayon"] }
|
||||||
rayon = "1"
|
rayon = "1"
|
||||||
|
crossbeam-channel = "0.5"
|
||||||
xxhash-rust = { version = "0.8.15", features = ["xxh3", "const_xxh3"] }
|
xxhash-rust = { version = "0.8.15", features = ["xxh3", "const_xxh3"] }
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use obikseq::{CanonicalKmer, Sequence};
|
|||||||
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use crossbeam_channel;
|
||||||
use std::sync::atomic::{AtomicU8, Ordering};
|
use std::sync::atomic::{AtomicU8, Ordering};
|
||||||
use xxhash_rust::xxh3::Xxh3Builder;
|
use xxhash_rust::xxh3::Xxh3Builder;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
@@ -454,24 +455,30 @@ impl GraphDeBruijn {
|
|||||||
F: FnMut(&[u8]) -> Result<(), E> + Send,
|
F: FnMut(&[u8]) -> Result<(), E> + Send,
|
||||||
{
|
{
|
||||||
thread_local! {
|
thread_local! {
|
||||||
static BUF: std::cell::RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(4096));
|
static BUF: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(4096));
|
||||||
}
|
}
|
||||||
let error = std::sync::Mutex::new(None::<E>);
|
let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(rayon::current_num_threads() * 256);
|
||||||
let f = std::sync::Mutex::new(f);
|
std::thread::scope(|s| {
|
||||||
|
let writer = s.spawn(move || -> Result<(), E> {
|
||||||
|
let mut f = f;
|
||||||
|
for nucs in rx {
|
||||||
|
f(&nucs)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
self.for_each_unitig(|iter| {
|
self.for_each_unitig(|iter| {
|
||||||
if error.lock().unwrap().is_some() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
BUF.with(|buf| {
|
BUF.with(|buf| {
|
||||||
let mut buf = buf.borrow_mut();
|
let mut buf = buf.borrow_mut();
|
||||||
buf.clear();
|
buf.clear();
|
||||||
buf.extend(iter);
|
buf.extend(iter);
|
||||||
if let Err(e) = f.lock().unwrap()(&buf) {
|
let to_send = buf.clone();
|
||||||
*error.lock().unwrap() = Some(e);
|
buf.clear();
|
||||||
}
|
tx.send(to_send).ok();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
error.into_inner().unwrap().map_or(Ok(()), Err)
|
drop(tx);
|
||||||
|
writer.join().expect("writer thread panicked")
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
|
|||||||
Reference in New Issue
Block a user