refactor: migrate pipeline stages and improve graph processing
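Refactored `count_neighbors` to record the index of the single unvisited neighbour for degree-1 nodes and to return early as soon as a second unvisited neighbour is found, so any degree of two or more is now reported as 2. Removed `Node::n_left_neighbours` and `Node::n_right_neighbours`, and updated the `Display` implementation for `Node` to print `→0`/`←0` or `→≥2`/`←≥2` instead of a neighbour count. Added per-pass timing and debug logging to `compute_degrees_and_mark_starts`. Migrated the first stage of both pipelines from `make_flat_transform!` closures that returned eagerly collected vectors to explicit `Stage::Flat` implementations, enabling backpressure-aware streaming: each stage now pushes `BATCH`-sized batches downstream incrementally as k-mers are read, and reports the net change in item count (batches emitted minus the one input consumed) through a delta channel for progress tracking.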
This commit is contained in:
@@ -6,6 +6,7 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::atomic::{AtomicU8, Ordering};
|
use std::sync::atomic::{AtomicU8, Ordering};
|
||||||
use xxhash_rust::xxh3::Xxh3Builder;
|
use xxhash_rust::xxh3::Xxh3Builder;
|
||||||
|
use std::time::Instant;
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
|
|
||||||
// ── Types ─────────────────────────────────────────────────────────────────────
|
// ── Types ─────────────────────────────────────────────────────────────────────
|
||||||
@@ -96,25 +97,6 @@ impl Node {
|
|||||||
(self.0 >> 5) & 0b11
|
(self.0 >> 5) & 0b11
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Number of left neighbours.
|
|
||||||
pub fn n_left_neighbours(self) -> u8 {
|
|
||||||
if self.can_extend_left() {
|
|
||||||
1
|
|
||||||
} else {
|
|
||||||
let v = (self.0 >> 5) & 0b11;
|
|
||||||
v + (v != 0) as u8
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Number of right neighbours.
|
|
||||||
pub fn n_right_neighbours(self) -> u8 {
|
|
||||||
if self.can_extend_right() {
|
|
||||||
1
|
|
||||||
} else {
|
|
||||||
let v = (self.0 >> 3) & 0b11;
|
|
||||||
v + (v != 0) as u8
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Marks the node as visited.
|
/// Marks the node as visited.
|
||||||
#[inline]
|
#[inline]
|
||||||
@@ -162,13 +144,17 @@ impl fmt::Display for Node {
|
|||||||
const NUC: [char; 4] = ['A', 'C', 'G', 'T'];
|
const NUC: [char; 4] = ['A', 'C', 'G', 'T'];
|
||||||
let r = if self.can_extend_right() {
|
let r = if self.can_extend_right() {
|
||||||
format!("→{}", NUC[self.right_nuc() as usize])
|
format!("→{}", NUC[self.right_nuc() as usize])
|
||||||
|
} else if (self.0 >> 3) & 0b11 == 0 {
|
||||||
|
"→0".to_string()
|
||||||
} else {
|
} else {
|
||||||
format!("→{}", self.n_right_neighbours())
|
"→≥2".to_string()
|
||||||
};
|
};
|
||||||
let l = if self.can_extend_left() {
|
let l = if self.can_extend_left() {
|
||||||
format!("←{}", NUC[self.left_nuc() as usize])
|
format!("←{}", NUC[self.left_nuc() as usize])
|
||||||
|
} else if (self.0 >> 5) & 0b11 == 0 {
|
||||||
|
"←0".to_string()
|
||||||
} else {
|
} else {
|
||||||
format!("←{}", self.n_left_neighbours())
|
"←≥2".to_string()
|
||||||
};
|
};
|
||||||
let v = if self.is_visited() { "V" } else { "." };
|
let v = if self.is_visited() { "V" } else { "." };
|
||||||
write!(f, "Node({r} {l} {v})")
|
write!(f, "Node({r} {l} {v})")
|
||||||
@@ -272,6 +258,7 @@ impl GraphDeBruijn {
|
|||||||
pub fn compute_degrees_and_mark_starts(&self) {
|
pub fn compute_degrees_and_mark_starts(&self) {
|
||||||
// Pass 1: count right/left neighbors for each node
|
// Pass 1: count right/left neighbors for each node
|
||||||
|
|
||||||
|
let t1 = Instant::now();
|
||||||
self.for_each_node(|kmer, atomic| {
|
self.for_each_node(|kmer, atomic| {
|
||||||
let mut old = Node(atomic.load(Ordering::Relaxed));
|
let mut old = Node(atomic.load(Ordering::Relaxed));
|
||||||
if old.is_visited() {
|
if old.is_visited() {
|
||||||
@@ -286,9 +273,11 @@ impl GraphDeBruijn {
|
|||||||
node.set_left(lc, ln);
|
node.set_left(lc, ln);
|
||||||
atomic.store(node.0, Ordering::Relaxed);
|
atomic.store(node.0, Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
|
debug!("[compute_degrees] pass 1 (degrees): {:?} — {} nodes", t1.elapsed(), self.nodes.len());
|
||||||
|
|
||||||
// Pass 2: mark start nodes
|
// Pass 2: mark start nodes
|
||||||
|
|
||||||
|
let t2 = Instant::now();
|
||||||
self.for_each_node(|kmer, atomic| {
|
self.for_each_node(|kmer, atomic| {
|
||||||
let mut node = Node(atomic.load(Ordering::Relaxed));
|
let mut node = Node(atomic.load(Ordering::Relaxed));
|
||||||
if node.is_visited() {
|
if node.is_visited() {
|
||||||
@@ -299,6 +288,7 @@ impl GraphDeBruijn {
|
|||||||
atomic.store(node.0, Ordering::Relaxed);
|
atomic.store(node.0, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
debug!("[compute_degrees] pass 2 (starts): {:?} — {} nodes", t2.elapsed(), self.nodes.len());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> {
|
pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> {
|
||||||
@@ -527,23 +517,18 @@ fn count_neighbors(
|
|||||||
nodes: &FastHashMap<CanonicalKmer, AtomicU8>,
|
nodes: &FastHashMap<CanonicalKmer, AtomicU8>,
|
||||||
) -> (u8, Option<u8>) {
|
) -> (u8, Option<u8>) {
|
||||||
let mut count = 0u8;
|
let mut count = 0u8;
|
||||||
let mut first = None;
|
let mut nuc = 0u8;
|
||||||
for (i, neighbour) in neighbors.iter().enumerate() {
|
for (i, neighbour) in neighbors.iter().enumerate() {
|
||||||
if let Some(a) = nodes.get(neighbour) {
|
if let Some(a) = nodes.get(neighbour) {
|
||||||
if Node(a.load(Ordering::Relaxed)).is_visited() {
|
if Node(a.load(Ordering::Relaxed)).is_visited() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
nuc = i as u8;
|
||||||
count += 1;
|
count += 1;
|
||||||
if first.is_none() {
|
if count >= 2 { return (2, None); }
|
||||||
first = Some(i as u8);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if count == 1 {
|
if count == 1 { (1, Some(nuc)) } else { (0, None) }
|
||||||
(1, first)
|
|
||||||
} else {
|
|
||||||
(count, None)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── tests ─────────────────────────────────────────────────────────────────────
|
// ── tests ─────────────────────────────────────────────────────────────────────
|
||||||
|
|||||||
@@ -3,7 +3,11 @@ use std::io;
|
|||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use obipipeline::{Pipeline, WorkerPool, make_flat_transform, make_sink, make_source, make_transform};
|
use tracing::debug;
|
||||||
|
use obipipeline::{
|
||||||
|
Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool,
|
||||||
|
make_sink, make_source, make_transform,
|
||||||
|
};
|
||||||
|
|
||||||
use obicompactvec::{
|
use obicompactvec::{
|
||||||
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
|
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
|
||||||
@@ -232,23 +236,38 @@ impl KmerPartition {
|
|||||||
let pipeline = Pipeline::new(
|
let pipeline = Pipeline::new(
|
||||||
make_source!(Pass1Data, unitig_paths, File),
|
make_source!(Pass1Data, unitig_paths, File),
|
||||||
vec![
|
vec![
|
||||||
make_flat_transform!(Pass1Data, {
|
Stage::Flat(Arc::new(
|
||||||
move |path: PathBuf| -> Vec<Vec<CanonicalKmer>> {
|
move |data: Pass1Data,
|
||||||
match UnitigFileReader::open_sequential(&path) {
|
push: &PipelineSender<Result<Pass1Data, PipelineError>>,
|
||||||
Err(e) => {
|
delta: &PipelineSender<isize>|
|
||||||
*err_cap.lock().unwrap() = Some(e.to_string());
|
{
|
||||||
vec![]
|
if let Pass1Data::File(path) = data {
|
||||||
|
let reader = match UnitigFileReader::open_sequential(&path) {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
*err_cap.lock().unwrap() = Some(e.to_string());
|
||||||
|
delta.send(-1).ok();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
|
||||||
|
let mut count: isize = 0;
|
||||||
|
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||||
|
batch.push(kmer);
|
||||||
|
if batch.len() == BATCH {
|
||||||
|
let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH));
|
||||||
|
push.send(Ok(Pass1Data::Batch(b))).ok();
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(reader) => {
|
if !batch.is_empty() {
|
||||||
let kmers: Vec<CanonicalKmer> = reader
|
push.send(Ok(Pass1Data::Batch(batch))).ok();
|
||||||
.iter_indexed_canonical_kmers()
|
count += 1;
|
||||||
.map(|(k, _, _)| k)
|
|
||||||
.collect();
|
|
||||||
kmers.chunks(BATCH).map(|c| c.to_vec()).collect()
|
|
||||||
}
|
}
|
||||||
|
delta.send(count - 1).ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, File, Batch),
|
) as SharedFlatFn<Pass1Data>),
|
||||||
make_transform!(Pass1Data, {
|
make_transform!(Pass1Data, {
|
||||||
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
|
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
|
||||||
batch.into_iter()
|
batch.into_iter()
|
||||||
@@ -278,6 +297,7 @@ impl KmerPartition {
|
|||||||
.into_inner()
|
.into_inner()
|
||||||
.unwrap_or_else(|e| e.into_inner());
|
.unwrap_or_else(|e| e.into_inner());
|
||||||
let any_new = g.len() > 0;
|
let any_new = g.len() > 0;
|
||||||
|
debug!("partition {i}: de Bruijn graph done — {} new kmers", g.len());
|
||||||
|
|
||||||
// Build new layer from de Bruijn graph if there are new kmers.
|
// Build new layer from de Bruijn graph if there are new kmers.
|
||||||
let new_layer_idx = n_dst_layers;
|
let new_layer_idx = n_dst_layers;
|
||||||
@@ -430,36 +450,52 @@ impl KmerPartition {
|
|||||||
let pipeline2 = Pipeline::new(
|
let pipeline2 = Pipeline::new(
|
||||||
make_source!(Pass2Data, pass2_items, SrcLayer),
|
make_source!(Pass2Data, pass2_items, SrcLayer),
|
||||||
vec![
|
vec![
|
||||||
make_flat_transform!(Pass2Data, {
|
Stage::Flat(Arc::new(
|
||||||
move |(col_offset, src_n, src_layer_dir): (usize, usize, PathBuf)|
|
move |data: Pass2Data,
|
||||||
-> Vec<(usize, usize, Arc<SrcLayerData>, Vec<CanonicalKmer>)>
|
push: &PipelineSender<Result<Pass2Data, PipelineError>>,
|
||||||
|
delta: &PipelineSender<isize>|
|
||||||
{
|
{
|
||||||
let reader = match UnitigFileReader::open_sequential(
|
if let Pass2Data::SrcLayer((col_offset, src_n, src_layer_dir)) = data {
|
||||||
&src_layer_dir.join("unitigs.bin"),
|
let reader = match UnitigFileReader::open_sequential(
|
||||||
) {
|
&src_layer_dir.join("unitigs.bin"),
|
||||||
Ok(r) => r,
|
) {
|
||||||
Err(e) => {
|
Ok(r) => r,
|
||||||
*err_cap2.lock().unwrap() = Some(e.to_string());
|
Err(e) => {
|
||||||
return vec![];
|
*err_cap2.lock().unwrap() = Some(e.to_string());
|
||||||
|
delta.send(-1).ok();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let src_data = match SrcLayerData::open(&src_layer_dir, mode) {
|
||||||
|
Ok(d) => Arc::new(d),
|
||||||
|
Err(e) => {
|
||||||
|
*err_cap2.lock().unwrap() = Some(e.to_string());
|
||||||
|
delta.send(-1).ok();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
|
||||||
|
let mut count: isize = 0;
|
||||||
|
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||||
|
batch.push(kmer);
|
||||||
|
if batch.len() == BATCH {
|
||||||
|
let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH));
|
||||||
|
push.send(Ok(Pass2Data::RawBatch((
|
||||||
|
col_offset, src_n, Arc::clone(&src_data), b,
|
||||||
|
)))).ok();
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
if !batch.is_empty() {
|
||||||
let src_data = match SrcLayerData::open(&src_layer_dir, mode) {
|
push.send(Ok(Pass2Data::RawBatch((
|
||||||
Ok(d) => Arc::new(d),
|
col_offset, src_n, src_data, batch,
|
||||||
Err(e) => {
|
)))).ok();
|
||||||
*err_cap2.lock().unwrap() = Some(e.to_string());
|
count += 1;
|
||||||
return vec![];
|
|
||||||
}
|
}
|
||||||
};
|
delta.send(count - 1).ok();
|
||||||
let all_kmers: Vec<CanonicalKmer> = reader
|
}
|
||||||
.iter_indexed_canonical_kmers()
|
|
||||||
.map(|(kmer, _, _)| kmer)
|
|
||||||
.collect();
|
|
||||||
all_kmers
|
|
||||||
.chunks(BATCH)
|
|
||||||
.map(|c| (col_offset, src_n, Arc::clone(&src_data), c.to_vec()))
|
|
||||||
.collect()
|
|
||||||
}
|
}
|
||||||
}, SrcLayer, RawBatch),
|
) as SharedFlatFn<Pass2Data>),
|
||||||
make_transform!(Pass2Data, {
|
make_transform!(Pass2Data, {
|
||||||
move |(col_offset, src_n, src_data, kmers): (usize, usize, Arc<SrcLayerData>, Vec<CanonicalKmer>)|
|
move |(col_offset, src_n, src_data, kmers): (usize, usize, Arc<SrcLayerData>, Vec<CanonicalKmer>)|
|
||||||
-> Vec<(Option<usize>, usize, usize, u32)>
|
-> Vec<(Option<usize>, usize, usize, u32)>
|
||||||
|
|||||||
Reference in New Issue
Block a user