From 8b563d0804b65dcbc23b231d54b644138fbbfdb4 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sat, 13 Jun 2026 09:49:33 +0200 Subject: [PATCH] refactor: migrate pipeline stages and improve graph processing Refactored neighbor resolution to explicitly track unvisited indices for degree-1 nodes, updated display formatting, and added timing and debug logging to the degree computation routine. Migrated pipeline stages from eager vector returns to explicit flat implementations, enabling backpressure-aware streaming, configurable batch processing, incremental yielding, and progress tracking via a delta channel. --- src/obidebruinj/src/debruijn.rs | 45 +++------ src/obikpartitionner/src/merge_layer.rs | 116 ++++++++++++++++-------- 2 files changed, 91 insertions(+), 70 deletions(-) diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index 4c499b4..cebe17c 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -6,6 +6,7 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::fmt; use std::sync::atomic::{AtomicU8, Ordering}; use xxhash_rust::xxh3::Xxh3Builder; +use std::time::Instant; use tracing::{debug, info}; // ── Types ───────────────────────────────────────────────────────────────────── @@ -96,25 +97,6 @@ impl Node { (self.0 >> 5) & 0b11 } - /// Number of left neighbours. - pub fn n_left_neighbours(self) -> u8 { - if self.can_extend_left() { - 1 - } else { - let v = (self.0 >> 5) & 0b11; - v + (v != 0) as u8 - } - } - - /// Number of right neighbours. - pub fn n_right_neighbours(self) -> u8 { - if self.can_extend_right() { - 1 - } else { - let v = (self.0 >> 3) & 0b11; - v + (v != 0) as u8 - } - } /// Marks the node as visited. #[inline] @@ -162,13 +144,17 @@ impl fmt::Display for Node { const NUC: [char; 4] = ['A', 'C', 'G', 'T']; let r = if self.can_extend_right() { format!("→{}", NUC[self.right_nuc() as usize]) + } else if (self.0 >> 3) & 0b11 == 0 { + "→0".to_string() } else { - format!("→{}", self.n_right_neighbours()) + "→≥2".to_string() }; let l = if self.can_extend_left() { format!("←{}", NUC[self.left_nuc() as usize]) + } else if (self.0 >> 5) & 0b11 == 0 { + "←0".to_string() } else { - format!("←{}", self.n_left_neighbours()) + "←≥2".to_string() }; let v = if self.is_visited() { "V" } else { "." }; write!(f, "Node({r} {l} {v})") @@ -272,6 +258,7 @@ impl GraphDeBruijn { pub fn compute_degrees_and_mark_starts(&self) { // Pass 1: count right/left neighbors for each node + let t1 = Instant::now(); self.for_each_node(|kmer, atomic| { let mut old = Node(atomic.load(Ordering::Relaxed)); if old.is_visited() { @@ -286,9 +273,11 @@ impl GraphDeBruijn { node.set_left(lc, ln); atomic.store(node.0, Ordering::Relaxed); }); + debug!("[compute_degrees] pass 1 (degrees): {:?} — {} nodes", t1.elapsed(), self.nodes.len()); // Pass 2: mark start nodes + let t2 = Instant::now(); self.for_each_node(|kmer, atomic| { let mut node = Node(atomic.load(Ordering::Relaxed)); if node.is_visited() { @@ -299,6 +288,7 @@ impl GraphDeBruijn { atomic.store(node.0, Ordering::Relaxed); } }); + debug!("[compute_degrees] pass 2 (starts): {:?} — {} nodes", t2.elapsed(), self.nodes.len()); } pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option { @@ -527,23 +517,18 @@ fn count_neighbors( nodes: &FastHashMap, ) -> (u8, Option) { let mut count = 0u8; - let mut first = None; + let mut nuc = 0u8; for (i, neighbour) in neighbors.iter().enumerate() { if let Some(a) = nodes.get(neighbour) { if Node(a.load(Ordering::Relaxed)).is_visited() { continue; } + nuc = i as u8; count += 1; - if first.is_none() { - first = Some(i as u8); - } + if count >= 2 { return (2, None); } } } - if count == 1 { - (1, first) - } else { - (count, None) - } + if count == 1 { (1, Some(nuc)) } else { (0, None) } } // ── tests ───────────────────────────────────────────────────────────────────── diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 4733cf4..2b67dd4 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -3,7 +3,11 @@ use std::io; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; -use obipipeline::{Pipeline, WorkerPool, make_flat_transform, make_sink, make_source, make_transform}; +use tracing::debug; +use obipipeline::{ + Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool, + make_sink, make_source, make_transform, +}; use obicompactvec::{ PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, @@ -232,23 +236,38 @@ impl KmerPartition { let pipeline = Pipeline::new( make_source!(Pass1Data, unitig_paths, File), vec![ - make_flat_transform!(Pass1Data, { - move |path: PathBuf| -> Vec> { - match UnitigFileReader::open_sequential(&path) { - Err(e) => { - *err_cap.lock().unwrap() = Some(e.to_string()); - vec![] + Stage::Flat(Arc::new( + move |data: Pass1Data, + push: &PipelineSender>, + delta: &PipelineSender| + { + if let Pass1Data::File(path) = data { + let reader = match UnitigFileReader::open_sequential(&path) { + Ok(r) => r, + Err(e) => { + *err_cap.lock().unwrap() = Some(e.to_string()); + delta.send(-1).ok(); + return; + } + }; + let mut batch: Vec = Vec::with_capacity(BATCH); + let mut count: isize = 0; + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + batch.push(kmer); + if batch.len() == BATCH { + let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH)); + push.send(Ok(Pass1Data::Batch(b))).ok(); + count += 1; + } } - Ok(reader) => { - let kmers: Vec = reader - .iter_indexed_canonical_kmers() - .map(|(k, _, _)| k) - .collect(); - kmers.chunks(BATCH).map(|c| c.to_vec()).collect() + if !batch.is_empty() { + push.send(Ok(Pass1Data::Batch(batch))).ok(); + count += 1; } + delta.send(count - 1).ok(); } } - }, File, Batch), + ) as SharedFlatFn), make_transform!(Pass1Data, { move |batch: Vec| -> Vec { batch.into_iter() @@ -278,6 +297,7 @@ impl KmerPartition { .into_inner() .unwrap_or_else(|e| e.into_inner()); let any_new = g.len() > 0; + debug!("partition {i}: de Bruijn graph done — {} new kmers", g.len()); // Build new layer from de Bruijn graph if there are new kmers. let new_layer_idx = n_dst_layers; @@ -430,36 +450,52 @@ impl KmerPartition { let pipeline2 = Pipeline::new( make_source!(Pass2Data, pass2_items, SrcLayer), vec![ - make_flat_transform!(Pass2Data, { - move |(col_offset, src_n, src_layer_dir): (usize, usize, PathBuf)| - -> Vec<(usize, usize, Arc, Vec)> + Stage::Flat(Arc::new( + move |data: Pass2Data, + push: &PipelineSender>, + delta: &PipelineSender| { - let reader = match UnitigFileReader::open_sequential( - &src_layer_dir.join("unitigs.bin"), - ) { - Ok(r) => r, - Err(e) => { - *err_cap2.lock().unwrap() = Some(e.to_string()); - return vec![]; + if let Pass2Data::SrcLayer((col_offset, src_n, src_layer_dir)) = data { + let reader = match UnitigFileReader::open_sequential( + &src_layer_dir.join("unitigs.bin"), + ) { + Ok(r) => r, + Err(e) => { + *err_cap2.lock().unwrap() = Some(e.to_string()); + delta.send(-1).ok(); + return; + } + }; + let src_data = match SrcLayerData::open(&src_layer_dir, mode) { + Ok(d) => Arc::new(d), + Err(e) => { + *err_cap2.lock().unwrap() = Some(e.to_string()); + delta.send(-1).ok(); + return; + } + }; + let mut batch: Vec = Vec::with_capacity(BATCH); + let mut count: isize = 0; + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + batch.push(kmer); + if batch.len() == BATCH { + let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH)); + push.send(Ok(Pass2Data::RawBatch(( + col_offset, src_n, Arc::clone(&src_data), b, + )))).ok(); + count += 1; + } } - }; - let src_data = match SrcLayerData::open(&src_layer_dir, mode) { - Ok(d) => Arc::new(d), - Err(e) => { - *err_cap2.lock().unwrap() = Some(e.to_string()); - return vec![]; + if !batch.is_empty() { + push.send(Ok(Pass2Data::RawBatch(( + col_offset, src_n, src_data, batch, + )))).ok(); + count += 1; } - }; - let all_kmers: Vec = reader - .iter_indexed_canonical_kmers() - .map(|(kmer, _, _)| kmer) - .collect(); - all_kmers - .chunks(BATCH) - .map(|c| (col_offset, src_n, Arc::clone(&src_data), c.to_vec())) - .collect() + delta.send(count - 1).ok(); + } } - }, SrcLayer, RawBatch), + ) as SharedFlatFn), make_transform!(Pass2Data, { move |(col_offset, src_n, src_data, kmers): (usize, usize, Arc, Vec)| -> Vec<(Option, usize, usize, u32)>