From 27088ab810fadbaf29262abb3bd5c34491e62e41 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 5 Jun 2026 18:31:52 +0200 Subject: [PATCH] refactor: optimize unitig iteration and graph traversal Switches unitig processing to a lazy, fallible `try_for_each_unitig` API across partitioner layers, reducing intermediate allocations and enabling proper error propagation. Refactors de Bruijn graph traversal into a two-pass algorithm with explicit node flags, named constants, and diagnostic logging. Introduces parallel chain processing and staged performance profiling for the unitig command, and adds a memory-efficient `FromIterator` implementation for packed nucleotide sequences. --- src/obidebruinj/src/debruijn.rs | 728 ++++++++++++++-------- src/obikmer/src/cmd/unitig.rs | 13 +- src/obikpartitionner/src/index_layer.rs | 7 +- src/obikpartitionner/src/merge_layer.rs | 7 +- src/obikpartitionner/src/rebuild_layer.rs | 7 +- src/obikseq/src/packed_seq.rs | 23 + 6 files changed, 511 insertions(+), 274 deletions(-) diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index f96958c..ee0491b 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -1,9 +1,9 @@ //use ahash::RandomState; use hashbrown::HashMap; use obikseq::k; -use obikseq::unitig::Unitig; use obikseq::{CanonicalKmer, Kmer, Sequence}; use rayon::prelude::*; +use std::arch::naked_asm; use std::fmt; use std::sync::atomic::{AtomicU8, Ordering}; use xxhash_rust::xxh3::Xxh3Builder; @@ -20,7 +20,7 @@ type FastHashMap = HashMap; // bit 2 : visited // bits 3–4 : right_nuc — index 0–3 (A/C/G/T) of that neighbour; valid iff bit 0 = 1 // bits 5–6 : left_nuc — index 0–3 (A/C/G/T) of that neighbour; valid iff bit 1 = 1 -// bit 7 : reserved (0) +// bit 7 : marked as start node (1) // // "can_extend" = false covers both 0 neighbours and ≥2 neighbours; the only // information needed for traversal is "exactly one". @@ -29,46 +29,73 @@ type FastHashMap = HashMap; #[derive(Debug, Clone, Copy, Default)] pub struct Node(u8); +const CAN_EXTEND_RIGHT_MASK: u8 = 0b0000_0001; // bit 0: can_extend_right — exactly one right canonical neighbour exists +const CAN_EXTEND_LEFT_MASK: u8 = 0b0000_0010; // bit 1: can_extend_left — exactly one left canonical neighbour exists +const IS_VISITED_MASK: u8 = 0b0000_0100; // bit 2: visited +const RIGHT_NUC_MASK: u8 = 0b0001_1000; // bits 3–4: right_nuc — index 0–3 (A/C/G/T) of that neighbour; valid iff bit 0 = 1 +const LEFT_NUC_MASK: u8 = 0b0110_0000; // bits 5–6: left_nuc — index 0–3 (A/C/G/T) of that neighbour; valid iff bit 1 = 1 +const IS_START_MASK: u8 = 0b1000_0000; // bit 7: marked as start node + impl Node { /// Returns `true` if the node can be extended to the right. /// /// A single right neighbour exists. + #[inline] pub fn can_extend_right(self) -> bool { - self.0 & 0b0000_0001 != 0 + self.0 & CAN_EXTEND_RIGHT_MASK != 0 } /// Returns `true` if the node can be extended to the left. /// /// A single left neighbour exists. + #[inline] pub fn can_extend_left(self) -> bool { - self.0 & 0b0000_0010 != 0 + self.0 & CAN_EXTEND_LEFT_MASK != 0 } /// Returns `true` if the node has been visited. + #[inline] pub fn is_visited(self) -> bool { - self.0 & 0b0000_0100 != 0 + self.0 & IS_VISITED_MASK != 0 + } + + /// Returns `true` if the node is a start node. + #[inline] + pub fn is_start(self) -> bool { + self.0 & IS_START_MASK != 0 + } + + #[inline] + pub fn set_start(&mut self) { + self.0 |= IS_START_MASK; + } + + pub fn unset_start(&mut self) { + self.0 &= !IS_START_MASK; } /// Index of the unique right neighbour (0=A, 1=C, 2=G, 3=T). /// Only meaningful when `can_extend_right()` is true. + #[inline] pub fn right_nuc(self) -> u8 { + debug_assert!( + self.can_extend_right(), + "from: right_nuc -> The node cannot be extended to the right" + ); (self.0 >> 3) & 0b11 } /// Index of the unique left neighbour (0=A, 1=C, 2=G, 3=T). /// Only meaningful when `can_extend_left()` is true. + #[inline] pub fn left_nuc(self) -> u8 { + debug_assert!( + self.can_extend_left(), + "from: left_nuc -> The node cannot be extended to the left" + ); (self.0 >> 5) & 0b11 } - /// Marks the node as visited. - pub fn set_visited(&mut self) { - if self.is_visited() { - unreachable!("from: is_visited -> The node has already been visited") - } - self.0 |= 0b0000_0100; - } - /// Number of left neighbours. pub fn n_left_neighbours(self) -> u8 { if self.can_extend_left() { @@ -89,12 +116,22 @@ impl Node { } } + /// Marks the node as visited. + #[inline] + pub fn set_visited(&mut self) { + debug_assert!( + !self.is_visited(), + "from: is_visited -> The node has already been visited" + ); + self.0 |= IS_VISITED_MASK; + } + /// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 3–4 = nucleotide index). /// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 3–4 as count.sat_sub(1). pub fn set_right(&mut self, count: u8, nuc: Option) { - self.0 &= !(0b0000_0001 | 0b001_1000); + self.0 &= !(CAN_EXTEND_RIGHT_MASK | RIGHT_NUC_MASK); if count == 1 { - self.0 |= 0b0000_0001; + self.0 |= CAN_EXTEND_RIGHT_MASK; if let Some(n) = nuc { self.0 |= (n & 0b11) << 3; return; @@ -107,9 +144,9 @@ impl Node { /// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 3–4 = nucleotide index). /// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 3–4 as count.sat_sub(1). pub fn set_left(&mut self, count: u8, nuc: Option) { - self.0 &= !(0b0000_0010 | 0b0110_0000); + self.0 &= !(CAN_EXTEND_LEFT_MASK | LEFT_NUC_MASK); if count == 1 { - self.0 |= 0b0000_0010; + self.0 |= CAN_EXTEND_LEFT_MASK; if let Some(n) = nuc { self.0 |= (n & 0b11) << 5; return; @@ -151,6 +188,16 @@ impl GraphDeBruijn { } } + fn for_each_node(&self, f: F) + where + F: Fn(&CanonicalKmer, &AtomicU8) + Sync, + { + #[cfg(not(any(test, feature = "test-utils")))] + self.nodes.par_iter().for_each(|(k, a)| f(k, a)); + #[cfg(any(test, feature = "test-utils"))] + self.nodes.iter().for_each(|(k, a)| f(k, a)); + } + pub fn with_capacity(capacity: usize) -> Self { Self { nodes: FastHashMap::with_capacity_and_hasher(capacity, Xxh3Builder::new()), @@ -169,25 +216,36 @@ impl GraphDeBruijn { /// written by exactly one thread). In test builds, runs sequentially to /// avoid propagating thread-local k/m values to rayon worker threads. pub fn compute_degrees(&self) { - #[cfg(not(any(test, feature = "test-utils")))] - self.nodes.par_iter().for_each(|(&kmer, atomic)| { + // Pass 1: count right/left neighbors for each node + + self.for_each_node(|kmer, atomic| { + let mut old = Node(atomic.load(Ordering::Relaxed)); + if old.is_visited() { + old.unset_start(); + atomic.store(old.0, Ordering::Relaxed); + return; + } let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes); - let mut node = Node(atomic.load(Ordering::Relaxed)); + let mut node = Node(0); // reset all bits (visited=0, start=0) node.set_right(rc, rn); node.set_left(lc, ln); atomic.store(node.0, Ordering::Relaxed); }); - #[cfg(any(test, feature = "test-utils"))] - for (&kmer, atomic) in &self.nodes { - let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); - let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes); + // Pass 2: mark start nodes + + self.for_each_node(|kmer, atomic| { let mut node = Node(atomic.load(Ordering::Relaxed)); - node.set_right(rc, rn); - node.set_left(lc, ln); - atomic.store(node.0, Ordering::Relaxed); - } + if node.is_visited() { + return; + } + if self.is_start(*kmer, node) { + node.set_start(); + node.set_visited(); + atomic.store(node.0, Ordering::Relaxed); + } + }); } /// Iterates over the right neighbors of `kmer`. @@ -217,49 +275,17 @@ impl GraphDeBruijn { } pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option { - self.nodes.get(kmer).map(|a| Node(a.load(Ordering::Relaxed)).is_visited()) + self.nodes + .get(kmer) + .map(|a| Node(a.load(Ordering::Relaxed)).is_visited()) } pub fn set_visited(&self, kmer: CanonicalKmer) { if let Some(a) = self.nodes.get(&kmer) { - a.fetch_or(0b0000_0100, Ordering::AcqRel); + a.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); } } - /// Returns the single right neighbor of `kmer`, if it exists. - pub fn the_single_right_neighbor(&self, kmer: CanonicalKmer) -> Option { - let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed)); - if !node.can_extend_right() { - return None; - } - let next = kmer.into_kmer().push_right(node.right_nuc()).canonical(); - self.nodes.contains_key(&next).then_some(next) - } - - /// Returns the single left neighbor of `kmer`, if it exists. - pub fn the_single_left_neighbor(&self, kmer: CanonicalKmer) -> Option { - let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed)); - if !node.can_extend_left() { - return None; - } - let next = kmer.into_kmer().push_left(node.left_nuc()).canonical(); - self.nodes.contains_key(&next).then_some(next) - } - - /// Internal iterator over unitig-start nodes; drives `iter_unitig`. - /// - /// MUST NOT be consumed standalone: the second pass finds cycle nodes only - /// because `iter_unitig` lazily interleaves chain traversal between the two passes. - /// - /// Two passes: - /// 1. Chain ends / isolated nodes (at most one extension missing): - /// - `!can_extend_left` → yield canonical form - /// - `!can_extend_right` → yield reverse complement - /// 2. Nodes still unvisited → part of a cycle; yield canonical form. - fn start_iter(&self) -> impl Iterator)> + '_ { - StartIter::new(self) - } - fn next_unitig_kmer(&self, kmer: Kmer) -> Option { let canonical = kmer.canonical(); let node = Node(self.nodes.get(&canonical)?.load(Ordering::Relaxed)); @@ -307,17 +333,305 @@ impl GraphDeBruijn { } } - pub fn iter_unitig(&self) -> impl Iterator + '_ { + fn unitig_nucleotides(&self, start: CanonicalKmer, node: Node, k: usize) -> UnitigNucIter<'_> { + let chain = if node.can_extend_right() { + let next_c = start.into_kmer().push_right(node.right_nuc()).canonical(); + self.nodes.get(&next_c).and_then(|next_a| { + let old = next_a.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); + if old & IS_VISITED_MASK == 0 { + let oriented = oriented_next(start.into_kmer(), next_c); + Some(self.iter_unitig_kmers(oriented)) + } else { + None + } + }) + } else { + None + }; + UnitigNucIter { + start, + pos: 0, + k, + chain, + } + } + + pub fn for_each_unitig(&self, f: impl Fn(UnitigNucIter<'_>) + Sync) { let k = k(); - self.start_iter().map(move |(start, first_next)| { - let mut nucs: Vec = (0..k).map(|i| start.nucleotide(i)).collect(); - if let Some(next_c) = first_next { - for kmer in self.iter_unitig_kmers(next_c) { - nucs.push(kmer.nucleotide(k - 1)); + let n_chains = std::sync::atomic::AtomicUsize::new(0); + let n2 = std::sync::atomic::AtomicUsize::new(0); + + // Boucle unique : traiter les starts, recalculer les arités, recommencer + let mut pass = 0usize; + loop { + let n_new = std::sync::atomic::AtomicUsize::new(0); + + #[cfg(not(any(test, feature = "test-utils")))] + self.nodes + .par_iter() + .filter_map(|(&kmer, atomic)| { + let node = Node(atomic.load(Ordering::Acquire)); + node.is_start().then_some((kmer, node)) + }) + .for_each(|(start, node)| { + n_new.fetch_add(1, Ordering::Relaxed); + f(self.unitig_nucleotides(start, node, k)); + }); + + #[cfg(any(test, feature = "test-utils"))] + self.nodes.iter().for_each(|(&kmer, atomic)| { + let node = Node(atomic.load(Ordering::Acquire)); + if node.is_start() { + n_new.fetch_add(1, Ordering::Relaxed); + f(self.unitig_nucleotides(kmer, node, k)); + } + }); + + let n = n_new.load(Ordering::Relaxed); + eprintln!("[for_each_unitig] pass {}: {} starts", pass, n); + n_chains.fetch_add(n, Ordering::Relaxed); + pass += 1; + if n == 0 { + break; + } + self.compute_degrees(); + } + + // #[cfg(debug_assertions)] + { + let mut residual = GraphDeBruijn { + nodes: FastHashMap::with_hasher(Xxh3Builder::new()), + }; + for (&kmer, atomic) in &self.nodes { + if !Node(atomic.load(Ordering::Relaxed)).is_visited() { + residual.nodes.insert(kmer, AtomicU8::new(0)); } } - Unitig::from_nucleotides(&nucs) - }) + + let mut sample = 0; + for (&kmer, _) in residual.nodes.iter().take(1000) { + let left = kmer.left_canonical_neighbors(); + let real_left = left + .iter() + .filter(|&nb| residual.nodes.contains_key(nb)) + .count(); + let right = kmer.right_canonical_neighbors(); + let real_right = right + .iter() + .filter(|&nb| residual.nodes.contains_key(nb)) + .count(); + if real_left != 1 || real_right == 1 { + // normal + } else { + sample += 1; + if sample <= 10 { + eprintln!("Kmer {:?}: left={} right={}", kmer, real_left, real_right); + } + } + } + residual.compute_degrees(); + + let mut n_starts = 0usize; + let mut n_no_right = 0usize; + let mut n_no_left = 0usize; + + for (_, a) in &residual.nodes { + let node = Node(a.load(Ordering::Relaxed)); + if node.is_start() { + n_starts += 1; + } + if !node.can_extend_right() { + n_no_right += 1; + } + if !node.can_extend_left() { + n_no_left += 1; + } + } + + eprintln!( + "[for_each_unitig] residual after cascade: {} nodes | starts={} no_right={} no_left={}", + residual.nodes.len(), + n_starts, + n_no_right, + n_no_left, + ); + + let mut left_mismatch = 0; + let mut right_mismatch = 0; + let mut total_left = 0; + let mut total_right = 0; + let n_residual = residual.nodes.len(); + + for (&kmer, atomic) in &residual.nodes { + let node = Node(atomic.load(Ordering::Relaxed)); + + let left_neighbors = kmer.left_canonical_neighbors(); + let actual_left = left_neighbors + .iter() + .filter(|&nb| residual.nodes.contains_key(nb)) + .count(); + total_left += actual_left; + if (actual_left == 1) != node.can_extend_left() { + left_mismatch += 1; + } + + let right_neighbors = kmer.right_canonical_neighbors(); + let actual_right = right_neighbors + .iter() + .filter(|&nb| residual.nodes.contains_key(nb)) + .count(); + total_right += actual_right; + if (actual_right == 1) != node.can_extend_right() { + right_mismatch += 1; + } + } + + eprintln!( + "[consistency] N={} total_left={} total_right={} left_mismatch={} right_mismatch={}", + n_residual, total_left, total_right, left_mismatch, right_mismatch + ); + + let mut real_starts = 0; + let mut sample = 0; + for (&kmer, atomic) in &residual.nodes { + let node = Node(atomic.load(Ordering::Relaxed)); + // Calcul réel des voisins gauches (les 4 possibilités) + let left_neighbors = kmer.left_canonical_neighbors(); + let real_left_count = left_neighbors + .iter() + .filter(|&nb| residual.nodes.contains_key(nb)) + .count(); + let right_neighbors = kmer.right_canonical_neighbors(); + let real_right_count = right_neighbors + .iter() + .filter(|&nb| residual.nodes.contains_key(nb)) + .count(); + + // Vérification de cohérence avec les flags + if (real_left_count == 1) != node.can_extend_left() { + eprintln!( + "Incoherence left for {:?}: real={}, flag={}", + kmer, + real_left_count, + node.can_extend_left() + ); + } + if (real_right_count == 1) != node.can_extend_right() { + eprintln!( + "Incoherence right for {:?}: real={}, flag={}", + kmer, + real_right_count, + node.can_extend_right() + ); + } + + // Détermination si c'est un start selon la définition avec les vrais voisins + let is_start = if real_left_count != 1 { + true + } else { + // Trouver l'unique prédécesseur réel + let pred = left_neighbors + .iter() + .find(|&nb| residual.nodes.contains_key(nb)) + .unwrap(); + let pred_node = Node(residual.nodes.get(pred).unwrap().load(Ordering::Relaxed)); + let pred_right_neighbors = pred.right_canonical_neighbors(); + let pred_real_right_count = pred_right_neighbors + .iter() + .filter(|&nb| residual.nodes.contains_key(nb)) + .count(); + pred_real_right_count != 1 + }; + if is_start { + real_starts += 1; + if sample < 10 { + sample += 1; + eprintln!( + "Real start: {:?}, left_count={}, right_count={}", + kmer, real_left_count, real_right_count + ); + } + } + } + eprintln!("[real starts] count = {}", real_starts); + + let mut ok = 0; + let mut missing_pred = 0; + let mut pred_visited = 0; + let mut pred_no_right = 0; + let mut mismatch = 0; + + for (&kmer, atomic) in &residual.nodes { + let node = Node(atomic.load(Ordering::Relaxed)); + if !node.can_extend_right() { + // Le prédécesseur unique (car can_extend_left est vrai pour tous) + let pred = kmer.left_canonical_neighbors()[node.left_nuc() as usize]; + if let Some(pred_atomic) = residual.nodes.get(&pred) { + let pred_node = Node(pred_atomic.load(Ordering::Relaxed)); + if pred_node.can_extend_right() { + let succ = + pred.right_canonical_neighbors()[pred_node.right_nuc() as usize]; + if succ == kmer { + ok += 1; + } else { + mismatch += 1; + eprintln!( + "Mismatch: pred {:?} right_nuc={} -> {:?} != {:?}", + pred, + pred_node.right_nuc(), + succ, + kmer + ); + } + } else { + pred_no_right += 1; + eprintln!("Pred {:?} has !can_extend_right", pred); + } + } else { + // Prédécesseur absent du résidu : vérifier s'il est visité + if let Some(orig) = self.nodes.get(&pred) { + if Node(orig.load(Ordering::Relaxed)).is_visited() { + pred_visited += 1; + } else { + missing_pred += 1; + } + } else { + missing_pred += 1; + } + } + } + } + eprintln!( + "[diagnostic] nodes without right: ok={} missing_pred={} pred_visited={} pred_no_right={} mismatch={}", + ok, missing_pred, pred_visited, pred_no_right, mismatch + ); + } + + // Pass 2: cycles and tails — always sequential + for (&kmer, atomic) in &self.nodes { + let node = Node(atomic.load(Ordering::Acquire)); + if node.is_visited() { + continue; + } + let start = if !node.can_extend_right() && node.can_extend_left() { + self.find_left_chain_start(kmer) + } else { + kmer + }; + let start_atomic = &self.nodes[&start]; + let old = start_atomic.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); + if old & IS_VISITED_MASK == 0 { + n2.fetch_add(1, Ordering::Relaxed); + f(self.unitig_nucleotides(start, Node(old), k)); + } + } + + eprintln!( + "[for_each_unitig] chains={} phase2={} total={}", + n_chains.load(Ordering::Relaxed), + n2.load(Ordering::Relaxed), + n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed), + ); } /// Merge `other` into `self`. @@ -331,111 +645,66 @@ impl GraphDeBruijn { self.nodes.extend(other.nodes); } - /// Build one unitig starting at `start`, whose node flags are `node` (pre-claim value). - /// The caller has already atomically claimed `start`; this method claims subsequent nodes. - /// - /// The first right extension is always included (mirroring the original `start_iter` - /// behaviour). Branching checks for subsequent extensions are handled inside - /// `iter_unitig_kmers` → `next_unitig_kmer`. - fn collect_from_start(&self, start: CanonicalKmer, node: Node, k: usize) -> Unitig { - let mut nucs: Vec = (0..k).map(|i| start.nucleotide(i)).collect(); - if node.can_extend_right() { - let next_c = start.into_kmer().push_right(node.right_nuc()).canonical(); - if let Some(next_a) = self.nodes.get(&next_c) { - let old = next_a.fetch_or(0b0000_0100, Ordering::AcqRel); - if old & 0b0000_0100 == 0 { - let oriented = oriented_next(start.into_kmer(), next_c); - nucs.push(oriented.nucleotide(k - 1)); - for kmer in self.iter_unitig_kmers(oriented) { - nucs.push(kmer.nucleotide(k - 1)); - } - } - } - } - Unitig::from_nucleotides(&nucs) - } - - /// Returns `true` if `start` is a unitig start node: + /// Returns `true` if `query` is a unitig start node: /// - no unique left predecessor (`!can_extend_left`), or /// - unique left predecessor exists but cannot extend right /// (i.e., no chain traversal from the left can reach `start`). - fn is_start(&self, start: CanonicalKmer, node: Node) -> bool { + fn find_left_chain_start(&self, kmer: CanonicalKmer) -> CanonicalKmer { + let mut current = kmer; + loop { + let node = Node(self.nodes[¤t].load(Ordering::Acquire)); + if !node.can_extend_left() { + return current; + } + let pred = current.into_kmer().push_left(node.left_nuc()).canonical(); + let Some(pred_a) = self.nodes.get(&pred) else { + return current; + }; + let pred_node = Node(pred_a.load(Ordering::Acquire)); + if pred_node.is_visited() { + return current; + } + if !pred_node.can_extend_right() { + return current; + } + // Stop if asymmetry: pred's right canonical neighbor is not current + let pred_right = pred.into_kmer().push_right(pred_node.right_nuc()).canonical(); + if pred_right != current { + return current; + } + current = pred; + } + } + + fn is_start(&self, query: CanonicalKmer, node: Node) -> bool { if !node.can_extend_left() { return true; } - let pred = start.into_kmer().push_left(node.left_nuc()).canonical(); - self.nodes.get(&pred) + let pred = query.into_kmer().push_left(node.left_nuc()).canonical(); + self.nodes + .get(&pred) .map(|a| !Node(a.load(Ordering::Acquire)).can_extend_right()) .unwrap_or(false) } - /// Call `f` once per unitig. - /// - /// Uses the extended start definition: a node is a start if it has no unique - /// left predecessor, or if its left predecessor cannot extend right (so no chain - /// traversal from the left could ever claim it). - /// - /// Two-step execution to avoid races between start claiming and chain extension: - /// 1. Claim all starts atomically (parallel). - /// 2. Build and emit chains from claimed starts (parallel). - /// - /// Parallel in production builds; sequential in test builds. - /// Must be called before [`for_each_remaining_unitig`]. - pub fn par_for_each_chain_unitig(&self, f: impl Fn(Unitig) + Sync) { - let k = k(); - - #[cfg(not(any(test, feature = "test-utils")))] - { - // Step 1 — claim all starts in parallel. - let starts: Vec = self.nodes - .par_iter() - .filter_map(|(&start, atomic)| { - let node = Node(atomic.load(Ordering::Acquire)); - if node.is_visited() || !self.is_start(start, node) { - return None; - } - let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel); - (old & 0b0000_0100 == 0).then_some(start) - }) - .collect(); - - // Step 2 — build chains in parallel. - starts.into_par_iter().for_each(|start| { - let node = Node(self.nodes[&start].load(Ordering::Acquire)); - f(self.collect_from_start(start, node, k)); - }); - } - - #[cfg(any(test, feature = "test-utils"))] - { - let starts: Vec = self.nodes - .iter() - .filter_map(|(&start, atomic)| { - let node = Node(atomic.load(Ordering::Acquire)); - if node.is_visited() || !self.is_start(start, node) { - return None; - } - let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel); - (old & 0b0000_0100 == 0).then_some(start) - }) - .collect(); - - for start in starts { - let node = Node(self.nodes[&start].load(Ordering::Acquire)); - f(self.collect_from_start(start, node, k)); - } - } - } - - /// Call `f` for each node still unvisited after [`par_for_each_chain_unitig`]. - /// Handles true cycles and rare deeply-nested junctions. Always sequential. - pub fn for_each_remaining_unitig(&self, f: impl Fn(Unitig)) { + pub fn try_for_each_unitig(&self, mut f: F) -> Result<(), E> + where + F: FnMut(UnitigNucIter<'_>) -> Result<(), E>, + { let k = k(); for (&kmer, atomic) in &self.nodes { - let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel); - if old & 0b0000_0100 != 0 { continue; } - f(self.collect_from_start(kmer, Node(old), k)); + let node = Node(atomic.load(Ordering::Acquire)); + if node.is_start() { + f(self.unitig_nucleotides(kmer, node, k))?; + } } + for (&kmer, atomic) in &self.nodes { + let old = atomic.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); + if old & IS_VISITED_MASK == 0 { + f(self.unitig_nucleotides(kmer, Node(old), k))?; + } + } + Ok(()) } pub fn len(&self) -> usize { @@ -447,93 +716,6 @@ impl GraphDeBruijn { } } -// --- StartIter ----------------------------------------------------------------- -struct StartIter<'a> { - graph: &'a GraphDeBruijn, - nodes: hashbrown::hash_map::Iter<'a, CanonicalKmer, AtomicU8>, - suspended: Vec, - in_cycle_pass: bool, -} - -impl<'a> StartIter<'a> { - fn new(graph: &'a GraphDeBruijn) -> Self { - Self { - graph, - nodes: graph.nodes.iter(), - suspended: Vec::new(), - in_cycle_pass: false, - } - } -} - -impl<'a> Iterator for StartIter<'a> { - type Item = (CanonicalKmer, Option); - - fn next(&mut self) -> Option<(CanonicalKmer, Option)> { - loop { - let current = if let Some(k) = self.suspended.pop() { - k - } else { - match self.nodes.next() { - Some((&k, _)) => k, - None => { - if self.in_cycle_pass { - return None; - } - self.in_cycle_pass = true; - self.nodes = self.graph.nodes.iter(); - match self.nodes.next() { - Some((&k, _)) => k, - None => return None, - } - } - } - }; - - let node = match self.graph.nodes.get(¤t) { - Some(a) => Node(a.load(Ordering::Relaxed)), - None => continue, - }; - if node.is_visited() { - continue; - } - if !self.in_cycle_pass && node.can_extend_left() { - continue; - } - - self.graph.set_visited(current); - - if let Some(next) = self.graph.the_single_right_neighbor(current) { - if self.graph.is_visited(&next).unwrap_or(true) { - return Some((current, None)); - } - self.graph.set_visited(next); - let oriented = oriented_next(current.into_kmer(), next); - return Some((current, Some(oriented))); - } - - let mut first_neighbor: Option = None; - for neighbor in self.graph.iter_right_neighbors(current) { - if self.graph.is_visited(&neighbor).unwrap_or(true) { - continue; - } - if first_neighbor.is_none() { - self.graph.set_visited(neighbor); - first_neighbor = Some(neighbor); - } else { - self.suspended.push(neighbor); - } - } - - let oriented = match first_neighbor { - Some(neighbor) => Some(oriented_next(current.into_kmer(), neighbor)), - None => None, - }; - return Some((current, oriented)); - } - } -} - // ── UnitigIter ──────────────────────────────────────────────────────────────── struct UnitigIter<'a> { @@ -551,12 +733,42 @@ impl Iterator for UnitigIter<'_> { } } +// ── UnitigNucIter ───────────────────────────────────────────────────────────── + +pub struct UnitigNucIter<'a> { + start: CanonicalKmer, + pos: usize, + k: usize, + chain: Option>, +} + +impl Iterator for UnitigNucIter<'_> { + type Item = u8; + + fn next(&mut self) -> Option { + if self.pos < self.k { + let nuc = self.start.nucleotide(self.pos); + self.pos += 1; + Some(nuc) + } else { + self.chain + .as_mut()? + .next() + .map(|kmer| kmer.nucleotide(self.k - 1)) + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.k - self.pos, None) + } +} + // ── helpers ─────────────────────────────────────────────────────────────────── /// Atomically set the visited bit. Returns `true` iff this call claimed the node. #[inline] fn try_claim(atomic: &AtomicU8) -> bool { - atomic.fetch_or(0b0000_0100, Ordering::AcqRel) & 0b0000_0100 == 0 + atomic.fetch_or(IS_VISITED_MASK, Ordering::AcqRel) & IS_VISITED_MASK == 0 } fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer { @@ -567,9 +779,10 @@ fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer { } } -/// Returns `Some(i)` if exactly one of the four canonical neighbours exists in -/// the graph, where `i` is its index (0=A, 1=C, 2=G, 3=T). Returns `None` for -/// zero or ≥2 existing neighbours. +/// Returns the count of neighbors and the index of the first +/// neighbor if exactly one of the four canonical neighbours exists in +/// the graph, where `i` is its index (0=A, 1=C, 2=G, 3=T). +/// Returns `None` for count = 0 or ≥2 existing neighbours. fn count_neighbors( neighbors: [CanonicalKmer; 4], nodes: &FastHashMap, @@ -577,7 +790,10 @@ fn count_neighbors( let mut count = 0u8; let mut first = None; for (i, neighbour) in neighbors.iter().enumerate() { - if nodes.contains_key(neighbour) { + if let Some(a) = nodes.get(neighbour) { + if Node(a.load(Ordering::Relaxed)).is_visited() { + continue; + } count += 1; if first.is_none() { first = Some(i as u8); diff --git a/src/obikmer/src/cmd/unitig.rs b/src/obikmer/src/cmd/unitig.rs index 79f6ed2..cd5f92d 100644 --- a/src/obikmer/src/cmd/unitig.rs +++ b/src/obikmer/src/cmd/unitig.rs @@ -75,7 +75,9 @@ pub fn run(args: UnitigArgs) { let out = Mutex::new(BufWriter::new(io::stdout())); let j = AtomicUsize::new(0); - let write_unitig_fn = |unitig: obikseq::unitig::Unitig| { + let stage = Stage::start("enumerate unitigs"); + g.for_each_unitig(|nuc_iter| { + let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); let idx = j.fetch_add(1, Ordering::Relaxed); let mut w = out.lock().unwrap(); write_unitig(&unitig, k, 0, idx, &mut *w).unwrap_or_else(|e| { @@ -85,14 +87,7 @@ pub fn run(args: UnitigArgs) { if idx % 10_000 == 0 { pb.set_message(format!("{idx} unitigs written")); } - }; - - let stage = Stage::start("chain unitigs"); - g.par_for_each_chain_unitig(&write_unitig_fn); - rep.push(stage.stop()); - - let stage = Stage::start("remaining unitigs"); - g.for_each_remaining_unitig(&write_unitig_fn); + }); pb.finish_and_clear(); rep.push(stage.stop()); diff --git a/src/obikpartitionner/src/index_layer.rs b/src/obikpartitionner/src/index_layer.rs index 022ba95..6b87c99 100644 --- a/src/obikpartitionner/src/index_layer.rs +++ b/src/obikpartitionner/src/index_layer.rs @@ -104,9 +104,10 @@ impl KmerPartition { fs::create_dir_all(&layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?; - for unitig in g.iter_unitig() { - uw.write(&unitig)?; - } + g.try_for_each_unitig(|nuc_iter| { + let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); + uw.write(&unitig) + })?; uw.close()?; if with_counts { diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 3af515b..68373b0 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -202,9 +202,10 @@ impl KmerPartition { g.compute_degrees(); fs::create_dir_all(&new_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; - for unitig in g.iter_unitig() { - uw.write(&unitig)?; - } + g.try_for_each_unitig(|nuc_iter| { + let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); + uw.write(&unitig) + })?; uw.close()?; Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; g.len() diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index 714bde8..39a0994 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -154,9 +154,10 @@ impl KmerPartition { fs::create_dir_all(&dst_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; - for unitig in g.iter_unitig() { - uw.write(&unitig)?; - } + g.try_for_each_unitig(|nuc_iter| { + let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); + uw.write(&unitig) + })?; uw.close()?; drop(g); diff --git a/src/obikseq/src/packed_seq.rs b/src/obikseq/src/packed_seq.rs index f0888d4..136d500 100644 --- a/src/obikseq/src/packed_seq.rs +++ b/src/obikseq/src/packed_seq.rs @@ -115,6 +115,29 @@ impl PackedSeq { } Self::new(count_to_tail(seql), seq.into_boxed_slice()) } +} + +impl FromIterator for PackedSeq { + /// Build from a stream of 2-bit nucleotide values (0=A…3=T). + /// Packs on the fly — one allocation, no intermediate Vec. + fn from_iter>(iter: I) -> Self { + let iter = iter.into_iter(); + let (lower, _) = iter.size_hint(); + let mut seq: Vec = Vec::with_capacity(byte_len(lower.max(1))); + let mut seql = 0usize; + for nuc in iter { + if seql % 4 == 0 { + seq.push(0); + } + *seq.last_mut().unwrap() |= (nuc & 0b11) << (6 - 2 * (seql % 4)); + seql += 1; + } + debug_assert!(seql >= 1, "PackedSeq requires at least one nucleotide"); + Self::new(count_to_tail(seql), seq.into_boxed_slice()) + } +} + +impl PackedSeq { /// Write ASCII nucleotides into `writer`. Zero allocation. pub fn write_ascii(&self, writer: &mut W) -> io::Result<()> {