refactor: optimize unitig iteration and graph traversal

Switches unitig processing to a lazy, fallible `try_for_each_unitig` API across partitioner layers, reducing intermediate allocations and enabling proper error propagation. Refactors de Bruijn graph traversal into a two-pass algorithm with explicit node flags, named constants, and diagnostic logging. Introduces parallel chain processing and staged performance profiling for the unitig command, and adds a memory-efficient `FromIterator` implementation for packed nucleotide sequences.
This commit is contained in:
Eric Coissac
2026-06-05 18:31:52 +02:00
parent ea2c594c86
commit 27088ab810
6 changed files with 511 additions and 274 deletions
+472 -256
View File
@@ -1,9 +1,9 @@
//use ahash::RandomState; //use ahash::RandomState;
use hashbrown::HashMap; use hashbrown::HashMap;
use obikseq::k; use obikseq::k;
use obikseq::unitig::Unitig;
use obikseq::{CanonicalKmer, Kmer, Sequence}; use obikseq::{CanonicalKmer, Kmer, Sequence};
use rayon::prelude::*; use rayon::prelude::*;
use std::arch::naked_asm;
use std::fmt; use std::fmt;
use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::atomic::{AtomicU8, Ordering};
use xxhash_rust::xxh3::Xxh3Builder; use xxhash_rust::xxh3::Xxh3Builder;
@@ -20,7 +20,7 @@ type FastHashMap<K, V> = HashMap<K, V, Xxh3Builder>;
// bit 2 : visited // bit 2 : visited
// bits 34 : right_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 0 = 1 // bits 34 : right_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 0 = 1
// bits 56 : left_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 1 = 1 // bits 56 : left_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 1 = 1
// bit 7 : reserved (0) // bit 7 : marked as start node (1)
// //
// "can_extend" = false covers both 0 neighbours and ≥2 neighbours; the only // "can_extend" = false covers both 0 neighbours and ≥2 neighbours; the only
// information needed for traversal is "exactly one". // information needed for traversal is "exactly one".
@@ -29,46 +29,73 @@ type FastHashMap<K, V> = HashMap<K, V, Xxh3Builder>;
#[derive(Debug, Clone, Copy, Default)] #[derive(Debug, Clone, Copy, Default)]
pub struct Node(u8); pub struct Node(u8);
const CAN_EXTEND_RIGHT_MASK: u8 = 0b0000_0001; // bit 0: can_extend_right — exactly one right canonical neighbour exists
const CAN_EXTEND_LEFT_MASK: u8 = 0b0000_0010; // bit 1: can_extend_left — exactly one left canonical neighbour exists
const IS_VISITED_MASK: u8 = 0b0000_0100; // bit 2: visited
const RIGHT_NUC_MASK: u8 = 0b0001_1000; // bits 34: right_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 0 = 1
const LEFT_NUC_MASK: u8 = 0b0110_0000; // bits 56: left_nuc — index 03 (A/C/G/T) of that neighbour; valid iff bit 1 = 1
const IS_START_MASK: u8 = 0b1000_0000; // bit 7: marked as start node
impl Node { impl Node {
/// Returns `true` if the node can be extended to the right. /// Returns `true` if the node can be extended to the right.
/// ///
/// A single right neighbour exists. /// A single right neighbour exists.
#[inline]
pub fn can_extend_right(self) -> bool { pub fn can_extend_right(self) -> bool {
self.0 & 0b0000_0001 != 0 self.0 & CAN_EXTEND_RIGHT_MASK != 0
} }
/// Returns `true` if the node can be extended to the left. /// Returns `true` if the node can be extended to the left.
/// ///
/// A single left neighbour exists. /// A single left neighbour exists.
#[inline]
pub fn can_extend_left(self) -> bool { pub fn can_extend_left(self) -> bool {
self.0 & 0b0000_0010 != 0 self.0 & CAN_EXTEND_LEFT_MASK != 0
} }
/// Returns `true` if the node has been visited. /// Returns `true` if the node has been visited.
#[inline]
pub fn is_visited(self) -> bool { pub fn is_visited(self) -> bool {
self.0 & 0b0000_0100 != 0 self.0 & IS_VISITED_MASK != 0
}
/// Returns `true` if the node is a start node.
#[inline]
pub fn is_start(self) -> bool {
self.0 & IS_START_MASK != 0
}
#[inline]
pub fn set_start(&mut self) {
self.0 |= IS_START_MASK;
}
pub fn unset_start(&mut self) {
self.0 &= !IS_START_MASK;
} }
/// Index of the unique right neighbour (0=A, 1=C, 2=G, 3=T). /// Index of the unique right neighbour (0=A, 1=C, 2=G, 3=T).
/// Only meaningful when `can_extend_right()` is true. /// Only meaningful when `can_extend_right()` is true.
#[inline]
pub fn right_nuc(self) -> u8 { pub fn right_nuc(self) -> u8 {
debug_assert!(
self.can_extend_right(),
"from: right_nuc -> The node cannot be extended to the right"
);
(self.0 >> 3) & 0b11 (self.0 >> 3) & 0b11
} }
/// Index of the unique left neighbour (0=A, 1=C, 2=G, 3=T). /// Index of the unique left neighbour (0=A, 1=C, 2=G, 3=T).
/// Only meaningful when `can_extend_left()` is true. /// Only meaningful when `can_extend_left()` is true.
#[inline]
pub fn left_nuc(self) -> u8 { pub fn left_nuc(self) -> u8 {
debug_assert!(
self.can_extend_left(),
"from: left_nuc -> The node cannot be extended to the left"
);
(self.0 >> 5) & 0b11 (self.0 >> 5) & 0b11
} }
/// Marks the node as visited.
pub fn set_visited(&mut self) {
if self.is_visited() {
unreachable!("from: is_visited -> The node has already been visited")
}
self.0 |= 0b0000_0100;
}
/// Number of left neighbours. /// Number of left neighbours.
pub fn n_left_neighbours(self) -> u8 { pub fn n_left_neighbours(self) -> u8 {
if self.can_extend_left() { if self.can_extend_left() {
@@ -89,12 +116,22 @@ impl Node {
} }
} }
/// Marks the node as visited.
#[inline]
pub fn set_visited(&mut self) {
debug_assert!(
!self.is_visited(),
"from: is_visited -> The node has already been visited"
);
self.0 |= IS_VISITED_MASK;
}
/// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 34 = nucleotide index). /// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 34 = nucleotide index).
/// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 34 as count.sat_sub(1). /// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 34 as count.sat_sub(1).
pub fn set_right(&mut self, count: u8, nuc: Option<u8>) { pub fn set_right(&mut self, count: u8, nuc: Option<u8>) {
self.0 &= !(0b0000_0001 | 0b001_1000); self.0 &= !(CAN_EXTEND_RIGHT_MASK | RIGHT_NUC_MASK);
if count == 1 { if count == 1 {
self.0 |= 0b0000_0001; self.0 |= CAN_EXTEND_RIGHT_MASK;
if let Some(n) = nuc { if let Some(n) = nuc {
self.0 |= (n & 0b11) << 3; self.0 |= (n & 0b11) << 3;
return; return;
@@ -107,9 +144,9 @@ impl Node {
/// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 34 = nucleotide index). /// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 34 = nucleotide index).
/// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 34 as count.sat_sub(1). /// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 34 as count.sat_sub(1).
pub fn set_left(&mut self, count: u8, nuc: Option<u8>) { pub fn set_left(&mut self, count: u8, nuc: Option<u8>) {
self.0 &= !(0b0000_0010 | 0b0110_0000); self.0 &= !(CAN_EXTEND_LEFT_MASK | LEFT_NUC_MASK);
if count == 1 { if count == 1 {
self.0 |= 0b0000_0010; self.0 |= CAN_EXTEND_LEFT_MASK;
if let Some(n) = nuc { if let Some(n) = nuc {
self.0 |= (n & 0b11) << 5; self.0 |= (n & 0b11) << 5;
return; return;
@@ -151,6 +188,16 @@ impl GraphDeBruijn {
} }
} }
fn for_each_node<F>(&self, f: F)
where
F: Fn(&CanonicalKmer, &AtomicU8) + Sync,
{
#[cfg(not(any(test, feature = "test-utils")))]
self.nodes.par_iter().for_each(|(k, a)| f(k, a));
#[cfg(any(test, feature = "test-utils"))]
self.nodes.iter().for_each(|(k, a)| f(k, a));
}
pub fn with_capacity(capacity: usize) -> Self { pub fn with_capacity(capacity: usize) -> Self {
Self { Self {
nodes: FastHashMap::with_capacity_and_hasher(capacity, Xxh3Builder::new()), nodes: FastHashMap::with_capacity_and_hasher(capacity, Xxh3Builder::new()),
@@ -169,25 +216,36 @@ impl GraphDeBruijn {
/// written by exactly one thread). In test builds, runs sequentially to /// written by exactly one thread). In test builds, runs sequentially to
/// avoid propagating thread-local k/m values to rayon worker threads. /// avoid propagating thread-local k/m values to rayon worker threads.
pub fn compute_degrees(&self) { pub fn compute_degrees(&self) {
#[cfg(not(any(test, feature = "test-utils")))] // Pass 1: count right/left neighbors for each node
self.nodes.par_iter().for_each(|(&kmer, atomic)| {
self.for_each_node(|kmer, atomic| {
let mut old = Node(atomic.load(Ordering::Relaxed));
if old.is_visited() {
old.unset_start();
atomic.store(old.0, Ordering::Relaxed);
return;
}
let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes);
let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes); let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes);
let mut node = Node(atomic.load(Ordering::Relaxed)); let mut node = Node(0); // reset all bits (visited=0, start=0)
node.set_right(rc, rn); node.set_right(rc, rn);
node.set_left(lc, ln); node.set_left(lc, ln);
atomic.store(node.0, Ordering::Relaxed); atomic.store(node.0, Ordering::Relaxed);
}); });
#[cfg(any(test, feature = "test-utils"))] // Pass 2: mark start nodes
for (&kmer, atomic) in &self.nodes {
let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); self.for_each_node(|kmer, atomic| {
let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes);
let mut node = Node(atomic.load(Ordering::Relaxed)); let mut node = Node(atomic.load(Ordering::Relaxed));
node.set_right(rc, rn); if node.is_visited() {
node.set_left(lc, ln); return;
atomic.store(node.0, Ordering::Relaxed); }
} if self.is_start(*kmer, node) {
node.set_start();
node.set_visited();
atomic.store(node.0, Ordering::Relaxed);
}
});
} }
/// Iterates over the right neighbors of `kmer`. /// Iterates over the right neighbors of `kmer`.
@@ -217,49 +275,17 @@ impl GraphDeBruijn {
} }
pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> { pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> {
self.nodes.get(kmer).map(|a| Node(a.load(Ordering::Relaxed)).is_visited()) self.nodes
.get(kmer)
.map(|a| Node(a.load(Ordering::Relaxed)).is_visited())
} }
pub fn set_visited(&self, kmer: CanonicalKmer) { pub fn set_visited(&self, kmer: CanonicalKmer) {
if let Some(a) = self.nodes.get(&kmer) { if let Some(a) = self.nodes.get(&kmer) {
a.fetch_or(0b0000_0100, Ordering::AcqRel); a.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
} }
} }
/// Returns the single right neighbor of `kmer`, if it exists.
pub fn the_single_right_neighbor(&self, kmer: CanonicalKmer) -> Option<CanonicalKmer> {
let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed));
if !node.can_extend_right() {
return None;
}
let next = kmer.into_kmer().push_right(node.right_nuc()).canonical();
self.nodes.contains_key(&next).then_some(next)
}
/// Returns the single left neighbor of `kmer`, if it exists.
pub fn the_single_left_neighbor(&self, kmer: CanonicalKmer) -> Option<CanonicalKmer> {
let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed));
if !node.can_extend_left() {
return None;
}
let next = kmer.into_kmer().push_left(node.left_nuc()).canonical();
self.nodes.contains_key(&next).then_some(next)
}
/// Internal iterator over unitig-start nodes; drives `iter_unitig`.
///
/// MUST NOT be consumed standalone: the second pass finds cycle nodes only
/// because `iter_unitig` lazily interleaves chain traversal between the two passes.
///
/// Two passes:
/// 1. Chain ends / isolated nodes (at most one extension missing):
/// - `!can_extend_left` → yield canonical form
/// - `!can_extend_right` → yield reverse complement
/// 2. Nodes still unvisited → part of a cycle; yield canonical form.
fn start_iter(&self) -> impl Iterator<Item = (CanonicalKmer, Option<Kmer>)> + '_ {
StartIter::new(self)
}
fn next_unitig_kmer(&self, kmer: Kmer) -> Option<Kmer> { fn next_unitig_kmer(&self, kmer: Kmer) -> Option<Kmer> {
let canonical = kmer.canonical(); let canonical = kmer.canonical();
let node = Node(self.nodes.get(&canonical)?.load(Ordering::Relaxed)); let node = Node(self.nodes.get(&canonical)?.load(Ordering::Relaxed));
@@ -307,17 +333,305 @@ impl GraphDeBruijn {
} }
} }
pub fn iter_unitig(&self) -> impl Iterator<Item = Unitig> + '_ { fn unitig_nucleotides(&self, start: CanonicalKmer, node: Node, k: usize) -> UnitigNucIter<'_> {
let chain = if node.can_extend_right() {
let next_c = start.into_kmer().push_right(node.right_nuc()).canonical();
self.nodes.get(&next_c).and_then(|next_a| {
let old = next_a.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
if old & IS_VISITED_MASK == 0 {
let oriented = oriented_next(start.into_kmer(), next_c);
Some(self.iter_unitig_kmers(oriented))
} else {
None
}
})
} else {
None
};
UnitigNucIter {
start,
pos: 0,
k,
chain,
}
}
pub fn for_each_unitig(&self, f: impl Fn(UnitigNucIter<'_>) + Sync) {
let k = k(); let k = k();
self.start_iter().map(move |(start, first_next)| { let n_chains = std::sync::atomic::AtomicUsize::new(0);
let mut nucs: Vec<u8> = (0..k).map(|i| start.nucleotide(i)).collect(); let n2 = std::sync::atomic::AtomicUsize::new(0);
if let Some(next_c) = first_next {
for kmer in self.iter_unitig_kmers(next_c) { // Boucle unique : traiter les starts, recalculer les arités, recommencer
nucs.push(kmer.nucleotide(k - 1)); let mut pass = 0usize;
loop {
let n_new = std::sync::atomic::AtomicUsize::new(0);
#[cfg(not(any(test, feature = "test-utils")))]
self.nodes
.par_iter()
.filter_map(|(&kmer, atomic)| {
let node = Node(atomic.load(Ordering::Acquire));
node.is_start().then_some((kmer, node))
})
.for_each(|(start, node)| {
n_new.fetch_add(1, Ordering::Relaxed);
f(self.unitig_nucleotides(start, node, k));
});
#[cfg(any(test, feature = "test-utils"))]
self.nodes.iter().for_each(|(&kmer, atomic)| {
let node = Node(atomic.load(Ordering::Acquire));
if node.is_start() {
n_new.fetch_add(1, Ordering::Relaxed);
f(self.unitig_nucleotides(kmer, node, k));
}
});
let n = n_new.load(Ordering::Relaxed);
eprintln!("[for_each_unitig] pass {}: {} starts", pass, n);
n_chains.fetch_add(n, Ordering::Relaxed);
pass += 1;
if n == 0 {
break;
}
self.compute_degrees();
}
// #[cfg(debug_assertions)]
{
let mut residual = GraphDeBruijn {
nodes: FastHashMap::with_hasher(Xxh3Builder::new()),
};
for (&kmer, atomic) in &self.nodes {
if !Node(atomic.load(Ordering::Relaxed)).is_visited() {
residual.nodes.insert(kmer, AtomicU8::new(0));
} }
} }
Unitig::from_nucleotides(&nucs)
}) let mut sample = 0;
for (&kmer, _) in residual.nodes.iter().take(1000) {
let left = kmer.left_canonical_neighbors();
let real_left = left
.iter()
.filter(|&nb| residual.nodes.contains_key(nb))
.count();
let right = kmer.right_canonical_neighbors();
let real_right = right
.iter()
.filter(|&nb| residual.nodes.contains_key(nb))
.count();
if real_left != 1 || real_right == 1 {
// normal
} else {
sample += 1;
if sample <= 10 {
eprintln!("Kmer {:?}: left={} right={}", kmer, real_left, real_right);
}
}
}
residual.compute_degrees();
let mut n_starts = 0usize;
let mut n_no_right = 0usize;
let mut n_no_left = 0usize;
for (_, a) in &residual.nodes {
let node = Node(a.load(Ordering::Relaxed));
if node.is_start() {
n_starts += 1;
}
if !node.can_extend_right() {
n_no_right += 1;
}
if !node.can_extend_left() {
n_no_left += 1;
}
}
eprintln!(
"[for_each_unitig] residual after cascade: {} nodes | starts={} no_right={} no_left={}",
residual.nodes.len(),
n_starts,
n_no_right,
n_no_left,
);
let mut left_mismatch = 0;
let mut right_mismatch = 0;
let mut total_left = 0;
let mut total_right = 0;
let n_residual = residual.nodes.len();
for (&kmer, atomic) in &residual.nodes {
let node = Node(atomic.load(Ordering::Relaxed));
let left_neighbors = kmer.left_canonical_neighbors();
let actual_left = left_neighbors
.iter()
.filter(|&nb| residual.nodes.contains_key(nb))
.count();
total_left += actual_left;
if (actual_left == 1) != node.can_extend_left() {
left_mismatch += 1;
}
let right_neighbors = kmer.right_canonical_neighbors();
let actual_right = right_neighbors
.iter()
.filter(|&nb| residual.nodes.contains_key(nb))
.count();
total_right += actual_right;
if (actual_right == 1) != node.can_extend_right() {
right_mismatch += 1;
}
}
eprintln!(
"[consistency] N={} total_left={} total_right={} left_mismatch={} right_mismatch={}",
n_residual, total_left, total_right, left_mismatch, right_mismatch
);
let mut real_starts = 0;
let mut sample = 0;
for (&kmer, atomic) in &residual.nodes {
let node = Node(atomic.load(Ordering::Relaxed));
// Calcul réel des voisins gauches (les 4 possibilités)
let left_neighbors = kmer.left_canonical_neighbors();
let real_left_count = left_neighbors
.iter()
.filter(|&nb| residual.nodes.contains_key(nb))
.count();
let right_neighbors = kmer.right_canonical_neighbors();
let real_right_count = right_neighbors
.iter()
.filter(|&nb| residual.nodes.contains_key(nb))
.count();
// Vérification de cohérence avec les flags
if (real_left_count == 1) != node.can_extend_left() {
eprintln!(
"Incoherence left for {:?}: real={}, flag={}",
kmer,
real_left_count,
node.can_extend_left()
);
}
if (real_right_count == 1) != node.can_extend_right() {
eprintln!(
"Incoherence right for {:?}: real={}, flag={}",
kmer,
real_right_count,
node.can_extend_right()
);
}
// Détermination si c'est un start selon la définition avec les vrais voisins
let is_start = if real_left_count != 1 {
true
} else {
// Trouver l'unique prédécesseur réel
let pred = left_neighbors
.iter()
.find(|&nb| residual.nodes.contains_key(nb))
.unwrap();
let pred_node = Node(residual.nodes.get(pred).unwrap().load(Ordering::Relaxed));
let pred_right_neighbors = pred.right_canonical_neighbors();
let pred_real_right_count = pred_right_neighbors
.iter()
.filter(|&nb| residual.nodes.contains_key(nb))
.count();
pred_real_right_count != 1
};
if is_start {
real_starts += 1;
if sample < 10 {
sample += 1;
eprintln!(
"Real start: {:?}, left_count={}, right_count={}",
kmer, real_left_count, real_right_count
);
}
}
}
eprintln!("[real starts] count = {}", real_starts);
let mut ok = 0;
let mut missing_pred = 0;
let mut pred_visited = 0;
let mut pred_no_right = 0;
let mut mismatch = 0;
for (&kmer, atomic) in &residual.nodes {
let node = Node(atomic.load(Ordering::Relaxed));
if !node.can_extend_right() {
// Le prédécesseur unique (car can_extend_left est vrai pour tous)
let pred = kmer.left_canonical_neighbors()[node.left_nuc() as usize];
if let Some(pred_atomic) = residual.nodes.get(&pred) {
let pred_node = Node(pred_atomic.load(Ordering::Relaxed));
if pred_node.can_extend_right() {
let succ =
pred.right_canonical_neighbors()[pred_node.right_nuc() as usize];
if succ == kmer {
ok += 1;
} else {
mismatch += 1;
eprintln!(
"Mismatch: pred {:?} right_nuc={} -> {:?} != {:?}",
pred,
pred_node.right_nuc(),
succ,
kmer
);
}
} else {
pred_no_right += 1;
eprintln!("Pred {:?} has !can_extend_right", pred);
}
} else {
// Prédécesseur absent du résidu : vérifier s'il est visité
if let Some(orig) = self.nodes.get(&pred) {
if Node(orig.load(Ordering::Relaxed)).is_visited() {
pred_visited += 1;
} else {
missing_pred += 1;
}
} else {
missing_pred += 1;
}
}
}
}
eprintln!(
"[diagnostic] nodes without right: ok={} missing_pred={} pred_visited={} pred_no_right={} mismatch={}",
ok, missing_pred, pred_visited, pred_no_right, mismatch
);
}
// Pass 2: cycles and tails — always sequential
for (&kmer, atomic) in &self.nodes {
let node = Node(atomic.load(Ordering::Acquire));
if node.is_visited() {
continue;
}
let start = if !node.can_extend_right() && node.can_extend_left() {
self.find_left_chain_start(kmer)
} else {
kmer
};
let start_atomic = &self.nodes[&start];
let old = start_atomic.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
if old & IS_VISITED_MASK == 0 {
n2.fetch_add(1, Ordering::Relaxed);
f(self.unitig_nucleotides(start, Node(old), k));
}
}
eprintln!(
"[for_each_unitig] chains={} phase2={} total={}",
n_chains.load(Ordering::Relaxed),
n2.load(Ordering::Relaxed),
n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed),
);
} }
/// Merge `other` into `self`. /// Merge `other` into `self`.
@@ -331,111 +645,66 @@ impl GraphDeBruijn {
self.nodes.extend(other.nodes); self.nodes.extend(other.nodes);
} }
/// Build one unitig starting at `start`, whose node flags are `node` (pre-claim value). /// Returns `true` if `query` is a unitig start node:
/// The caller has already atomically claimed `start`; this method claims subsequent nodes.
///
/// The first right extension is always included (mirroring the original `start_iter`
/// behaviour). Branching checks for subsequent extensions are handled inside
/// `iter_unitig_kmers` → `next_unitig_kmer`.
fn collect_from_start(&self, start: CanonicalKmer, node: Node, k: usize) -> Unitig {
let mut nucs: Vec<u8> = (0..k).map(|i| start.nucleotide(i)).collect();
if node.can_extend_right() {
let next_c = start.into_kmer().push_right(node.right_nuc()).canonical();
if let Some(next_a) = self.nodes.get(&next_c) {
let old = next_a.fetch_or(0b0000_0100, Ordering::AcqRel);
if old & 0b0000_0100 == 0 {
let oriented = oriented_next(start.into_kmer(), next_c);
nucs.push(oriented.nucleotide(k - 1));
for kmer in self.iter_unitig_kmers(oriented) {
nucs.push(kmer.nucleotide(k - 1));
}
}
}
}
Unitig::from_nucleotides(&nucs)
}
/// Returns `true` if `start` is a unitig start node:
/// - no unique left predecessor (`!can_extend_left`), or /// - no unique left predecessor (`!can_extend_left`), or
/// - unique left predecessor exists but cannot extend right /// - unique left predecessor exists but cannot extend right
/// (i.e., no chain traversal from the left can reach `start`). /// (i.e., no chain traversal from the left can reach `start`).
fn is_start(&self, start: CanonicalKmer, node: Node) -> bool { fn find_left_chain_start(&self, kmer: CanonicalKmer) -> CanonicalKmer {
let mut current = kmer;
loop {
let node = Node(self.nodes[&current].load(Ordering::Acquire));
if !node.can_extend_left() {
return current;
}
let pred = current.into_kmer().push_left(node.left_nuc()).canonical();
let Some(pred_a) = self.nodes.get(&pred) else {
return current;
};
let pred_node = Node(pred_a.load(Ordering::Acquire));
if pred_node.is_visited() {
return current;
}
if !pred_node.can_extend_right() {
return current;
}
// Stop if asymmetry: pred's right canonical neighbor is not current
let pred_right = pred.into_kmer().push_right(pred_node.right_nuc()).canonical();
if pred_right != current {
return current;
}
current = pred;
}
}
fn is_start(&self, query: CanonicalKmer, node: Node) -> bool {
if !node.can_extend_left() { if !node.can_extend_left() {
return true; return true;
} }
let pred = start.into_kmer().push_left(node.left_nuc()).canonical(); let pred = query.into_kmer().push_left(node.left_nuc()).canonical();
self.nodes.get(&pred) self.nodes
.get(&pred)
.map(|a| !Node(a.load(Ordering::Acquire)).can_extend_right()) .map(|a| !Node(a.load(Ordering::Acquire)).can_extend_right())
.unwrap_or(false) .unwrap_or(false)
} }
/// Call `f` once per unitig. pub fn try_for_each_unitig<E, F>(&self, mut f: F) -> Result<(), E>
/// where
/// Uses the extended start definition: a node is a start if it has no unique F: FnMut(UnitigNucIter<'_>) -> Result<(), E>,
/// left predecessor, or if its left predecessor cannot extend right (so no chain {
/// traversal from the left could ever claim it).
///
/// Two-step execution to avoid races between start claiming and chain extension:
/// 1. Claim all starts atomically (parallel).
/// 2. Build and emit chains from claimed starts (parallel).
///
/// Parallel in production builds; sequential in test builds.
/// Must be called before [`for_each_remaining_unitig`].
pub fn par_for_each_chain_unitig(&self, f: impl Fn(Unitig) + Sync) {
let k = k();
#[cfg(not(any(test, feature = "test-utils")))]
{
// Step 1 — claim all starts in parallel.
let starts: Vec<CanonicalKmer> = self.nodes
.par_iter()
.filter_map(|(&start, atomic)| {
let node = Node(atomic.load(Ordering::Acquire));
if node.is_visited() || !self.is_start(start, node) {
return None;
}
let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel);
(old & 0b0000_0100 == 0).then_some(start)
})
.collect();
// Step 2 — build chains in parallel.
starts.into_par_iter().for_each(|start| {
let node = Node(self.nodes[&start].load(Ordering::Acquire));
f(self.collect_from_start(start, node, k));
});
}
#[cfg(any(test, feature = "test-utils"))]
{
let starts: Vec<CanonicalKmer> = self.nodes
.iter()
.filter_map(|(&start, atomic)| {
let node = Node(atomic.load(Ordering::Acquire));
if node.is_visited() || !self.is_start(start, node) {
return None;
}
let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel);
(old & 0b0000_0100 == 0).then_some(start)
})
.collect();
for start in starts {
let node = Node(self.nodes[&start].load(Ordering::Acquire));
f(self.collect_from_start(start, node, k));
}
}
}
/// Call `f` for each node still unvisited after [`par_for_each_chain_unitig`].
/// Handles true cycles and rare deeply-nested junctions. Always sequential.
pub fn for_each_remaining_unitig(&self, f: impl Fn(Unitig)) {
let k = k(); let k = k();
for (&kmer, atomic) in &self.nodes { for (&kmer, atomic) in &self.nodes {
let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel); let node = Node(atomic.load(Ordering::Acquire));
if old & 0b0000_0100 != 0 { continue; } if node.is_start() {
f(self.collect_from_start(kmer, Node(old), k)); f(self.unitig_nucleotides(kmer, node, k))?;
}
} }
for (&kmer, atomic) in &self.nodes {
let old = atomic.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
if old & IS_VISITED_MASK == 0 {
f(self.unitig_nucleotides(kmer, Node(old), k))?;
}
}
Ok(())
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
@@ -447,93 +716,6 @@ impl GraphDeBruijn {
} }
} }
// --- StartIter -----------------------------------------------------------------
struct StartIter<'a> {
graph: &'a GraphDeBruijn,
nodes: hashbrown::hash_map::Iter<'a, CanonicalKmer, AtomicU8>,
suspended: Vec<CanonicalKmer>,
in_cycle_pass: bool,
}
impl<'a> StartIter<'a> {
fn new(graph: &'a GraphDeBruijn) -> Self {
Self {
graph,
nodes: graph.nodes.iter(),
suspended: Vec::new(),
in_cycle_pass: false,
}
}
}
impl<'a> Iterator for StartIter<'a> {
type Item = (CanonicalKmer, Option<Kmer>);
fn next(&mut self) -> Option<(CanonicalKmer, Option<Kmer>)> {
loop {
let current = if let Some(k) = self.suspended.pop() {
k
} else {
match self.nodes.next() {
Some((&k, _)) => k,
None => {
if self.in_cycle_pass {
return None;
}
self.in_cycle_pass = true;
self.nodes = self.graph.nodes.iter();
match self.nodes.next() {
Some((&k, _)) => k,
None => return None,
}
}
}
};
let node = match self.graph.nodes.get(&current) {
Some(a) => Node(a.load(Ordering::Relaxed)),
None => continue,
};
if node.is_visited() {
continue;
}
if !self.in_cycle_pass && node.can_extend_left() {
continue;
}
self.graph.set_visited(current);
if let Some(next) = self.graph.the_single_right_neighbor(current) {
if self.graph.is_visited(&next).unwrap_or(true) {
return Some((current, None));
}
self.graph.set_visited(next);
let oriented = oriented_next(current.into_kmer(), next);
return Some((current, Some(oriented)));
}
let mut first_neighbor: Option<CanonicalKmer> = None;
for neighbor in self.graph.iter_right_neighbors(current) {
if self.graph.is_visited(&neighbor).unwrap_or(true) {
continue;
}
if first_neighbor.is_none() {
self.graph.set_visited(neighbor);
first_neighbor = Some(neighbor);
} else {
self.suspended.push(neighbor);
}
}
let oriented = match first_neighbor {
Some(neighbor) => Some(oriented_next(current.into_kmer(), neighbor)),
None => None,
};
return Some((current, oriented));
}
}
}
// ── UnitigIter ──────────────────────────────────────────────────────────────── // ── UnitigIter ────────────────────────────────────────────────────────────────
struct UnitigIter<'a> { struct UnitigIter<'a> {
@@ -551,12 +733,42 @@ impl Iterator for UnitigIter<'_> {
} }
} }
// ── UnitigNucIter ─────────────────────────────────────────────────────────────
pub struct UnitigNucIter<'a> {
start: CanonicalKmer,
pos: usize,
k: usize,
chain: Option<UnitigIter<'a>>,
}
impl Iterator for UnitigNucIter<'_> {
type Item = u8;
fn next(&mut self) -> Option<u8> {
if self.pos < self.k {
let nuc = self.start.nucleotide(self.pos);
self.pos += 1;
Some(nuc)
} else {
self.chain
.as_mut()?
.next()
.map(|kmer| kmer.nucleotide(self.k - 1))
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.k - self.pos, None)
}
}
// ── helpers ─────────────────────────────────────────────────────────────────── // ── helpers ───────────────────────────────────────────────────────────────────
/// Atomically set the visited bit. Returns `true` iff this call claimed the node. /// Atomically set the visited bit. Returns `true` iff this call claimed the node.
#[inline] #[inline]
fn try_claim(atomic: &AtomicU8) -> bool { fn try_claim(atomic: &AtomicU8) -> bool {
atomic.fetch_or(0b0000_0100, Ordering::AcqRel) & 0b0000_0100 == 0 atomic.fetch_or(IS_VISITED_MASK, Ordering::AcqRel) & IS_VISITED_MASK == 0
} }
fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer { fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer {
@@ -567,9 +779,10 @@ fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer {
} }
} }
/// Returns `Some(i)` if exactly one of the four canonical neighbours exists in /// Returns the count of neighbors and the index of the first
/// the graph, where `i` is its index (0=A, 1=C, 2=G, 3=T). Returns `None` for /// neighbor if exactly one of the four canonical neighbours exists in
/// zero or ≥2 existing neighbours. /// the graph, where `i` is its index (0=A, 1=C, 2=G, 3=T).
/// Returns `None` for count = 0 or ≥2 existing neighbours.
fn count_neighbors( fn count_neighbors(
neighbors: [CanonicalKmer; 4], neighbors: [CanonicalKmer; 4],
nodes: &FastHashMap<CanonicalKmer, AtomicU8>, nodes: &FastHashMap<CanonicalKmer, AtomicU8>,
@@ -577,7 +790,10 @@ fn count_neighbors(
let mut count = 0u8; let mut count = 0u8;
let mut first = None; let mut first = None;
for (i, neighbour) in neighbors.iter().enumerate() { for (i, neighbour) in neighbors.iter().enumerate() {
if nodes.contains_key(neighbour) { if let Some(a) = nodes.get(neighbour) {
if Node(a.load(Ordering::Relaxed)).is_visited() {
continue;
}
count += 1; count += 1;
if first.is_none() { if first.is_none() {
first = Some(i as u8); first = Some(i as u8);
+4 -9
View File
@@ -75,7 +75,9 @@ pub fn run(args: UnitigArgs) {
let out = Mutex::new(BufWriter::new(io::stdout())); let out = Mutex::new(BufWriter::new(io::stdout()));
let j = AtomicUsize::new(0); let j = AtomicUsize::new(0);
let write_unitig_fn = |unitig: obikseq::unitig::Unitig| { let stage = Stage::start("enumerate unitigs");
g.for_each_unitig(|nuc_iter| {
let unitig: obikseq::unitig::Unitig = nuc_iter.collect();
let idx = j.fetch_add(1, Ordering::Relaxed); let idx = j.fetch_add(1, Ordering::Relaxed);
let mut w = out.lock().unwrap(); let mut w = out.lock().unwrap();
write_unitig(&unitig, k, 0, idx, &mut *w).unwrap_or_else(|e| { write_unitig(&unitig, k, 0, idx, &mut *w).unwrap_or_else(|e| {
@@ -85,14 +87,7 @@ pub fn run(args: UnitigArgs) {
if idx % 10_000 == 0 { if idx % 10_000 == 0 {
pb.set_message(format!("{idx} unitigs written")); pb.set_message(format!("{idx} unitigs written"));
} }
}; });
let stage = Stage::start("chain unitigs");
g.par_for_each_chain_unitig(&write_unitig_fn);
rep.push(stage.stop());
let stage = Stage::start("remaining unitigs");
g.for_each_remaining_unitig(&write_unitig_fn);
pb.finish_and_clear(); pb.finish_and_clear();
rep.push(stage.stop()); rep.push(stage.stop());
+4 -3
View File
@@ -104,9 +104,10 @@ impl KmerPartition {
fs::create_dir_all(&layer_dir)?; fs::create_dir_all(&layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?; let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?;
for unitig in g.iter_unitig() { g.try_for_each_unitig(|nuc_iter| {
uw.write(&unitig)?; let unitig: obikseq::unitig::Unitig = nuc_iter.collect();
} uw.write(&unitig)
})?;
uw.close()?; uw.close()?;
if with_counts { if with_counts {
+4 -3
View File
@@ -202,9 +202,10 @@ impl KmerPartition {
g.compute_degrees(); g.compute_degrees();
fs::create_dir_all(&new_layer_dir)?; fs::create_dir_all(&new_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?;
for unitig in g.iter_unitig() { g.try_for_each_unitig(|nuc_iter| {
uw.write(&unitig)?; let unitig: obikseq::unitig::Unitig = nuc_iter.collect();
} uw.write(&unitig)
})?;
uw.close()?; uw.close()?;
Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
g.len() g.len()
+4 -3
View File
@@ -154,9 +154,10 @@ impl KmerPartition {
fs::create_dir_all(&dst_layer_dir)?; fs::create_dir_all(&dst_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?;
for unitig in g.iter_unitig() { g.try_for_each_unitig(|nuc_iter| {
uw.write(&unitig)?; let unitig: obikseq::unitig::Unitig = nuc_iter.collect();
} uw.write(&unitig)
})?;
uw.close()?; uw.close()?;
drop(g); drop(g);
+23
View File
@@ -115,6 +115,29 @@ impl PackedSeq {
} }
Self::new(count_to_tail(seql), seq.into_boxed_slice()) Self::new(count_to_tail(seql), seq.into_boxed_slice())
} }
}
impl FromIterator<u8> for PackedSeq {
/// Build from a stream of 2-bit nucleotide values (0=A…3=T).
/// Packs on the fly — one allocation, no intermediate Vec.
fn from_iter<I: IntoIterator<Item = u8>>(iter: I) -> Self {
let iter = iter.into_iter();
let (lower, _) = iter.size_hint();
let mut seq: Vec<u8> = Vec::with_capacity(byte_len(lower.max(1)));
let mut seql = 0usize;
for nuc in iter {
if seql % 4 == 0 {
seq.push(0);
}
*seq.last_mut().unwrap() |= (nuc & 0b11) << (6 - 2 * (seql % 4));
seql += 1;
}
debug_assert!(seql >= 1, "PackedSeq requires at least one nucleotide");
Self::new(count_to_tail(seql), seq.into_boxed_slice())
}
}
impl PackedSeq {
/// Write ASCII nucleotides into `writer`. Zero allocation. /// Write ASCII nucleotides into `writer`. Zero allocation.
pub fn write_ascii<W: Write>(&self, writer: &mut W) -> io::Result<()> { pub fn write_ascii<W: Write>(&self, writer: &mut W) -> io::Result<()> {