diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index f96958c..c0b151a 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -1,8 +1,7 @@ //use ahash::RandomState; use hashbrown::HashMap; use obikseq::k; -use obikseq::unitig::Unitig; -use obikseq::{CanonicalKmer, Kmer, Sequence}; +use obikseq::{CanonicalKmer, Sequence}; use rayon::prelude::*; use std::fmt; use std::sync::atomic::{AtomicU8, Ordering}; @@ -20,7 +19,7 @@ type FastHashMap = HashMap; // bit 2 : visited // bits 3–4 : right_nuc — index 0–3 (A/C/G/T) of that neighbour; valid iff bit 0 = 1 // bits 5–6 : left_nuc — index 0–3 (A/C/G/T) of that neighbour; valid iff bit 1 = 1 -// bit 7 : reserved (0) +// bit 7 : marked as start node (1) // // "can_extend" = false covers both 0 neighbours and ≥2 neighbours; the only // information needed for traversal is "exactly one". @@ -29,46 +28,73 @@ type FastHashMap = HashMap; #[derive(Debug, Clone, Copy, Default)] pub struct Node(u8); +const CAN_EXTEND_RIGHT_MASK: u8 = 0b0000_0001; // bit 0: can_extend_right — exactly one right canonical neighbour exists +const CAN_EXTEND_LEFT_MASK: u8 = 0b0000_0010; // bit 1: can_extend_left — exactly one left canonical neighbour exists +const IS_VISITED_MASK: u8 = 0b0000_0100; // bit 2: visited +const RIGHT_NUC_MASK: u8 = 0b0001_1000; // bits 3–4: right_nuc — index 0–3 (A/C/G/T) of that neighbour; valid iff bit 0 = 1 +const LEFT_NUC_MASK: u8 = 0b0110_0000; // bits 5–6: left_nuc — index 0–3 (A/C/G/T) of that neighbour; valid iff bit 1 = 1 +const IS_START_MASK: u8 = 0b1000_0000; // bit 7: marked as start node + impl Node { /// Returns `true` if the node can be extended to the right. /// /// A single right neighbour exists. + #[inline] pub fn can_extend_right(self) -> bool { - self.0 & 0b0000_0001 != 0 + self.0 & CAN_EXTEND_RIGHT_MASK != 0 } /// Returns `true` if the node can be extended to the left. /// /// A single left neighbour exists. + #[inline] pub fn can_extend_left(self) -> bool { - self.0 & 0b0000_0010 != 0 + self.0 & CAN_EXTEND_LEFT_MASK != 0 } /// Returns `true` if the node has been visited. + #[inline] pub fn is_visited(self) -> bool { - self.0 & 0b0000_0100 != 0 + self.0 & IS_VISITED_MASK != 0 + } + + /// Returns `true` if the node is a start node. + #[inline] + pub fn is_start(self) -> bool { + self.0 & IS_START_MASK != 0 + } + + #[inline] + pub fn set_start(&mut self) { + self.0 |= IS_START_MASK; + } + + pub fn unset_start(&mut self) { + self.0 &= !IS_START_MASK; } /// Index of the unique right neighbour (0=A, 1=C, 2=G, 3=T). /// Only meaningful when `can_extend_right()` is true. + #[inline] pub fn right_nuc(self) -> u8 { + debug_assert!( + self.can_extend_right(), + "from: right_nuc -> The node cannot be extended to the right" + ); (self.0 >> 3) & 0b11 } /// Index of the unique left neighbour (0=A, 1=C, 2=G, 3=T). /// Only meaningful when `can_extend_left()` is true. + #[inline] pub fn left_nuc(self) -> u8 { + debug_assert!( + self.can_extend_left(), + "from: left_nuc -> The node cannot be extended to the left" + ); (self.0 >> 5) & 0b11 } - /// Marks the node as visited. - pub fn set_visited(&mut self) { - if self.is_visited() { - unreachable!("from: is_visited -> The node has already been visited") - } - self.0 |= 0b0000_0100; - } - /// Number of left neighbours. pub fn n_left_neighbours(self) -> u8 { if self.can_extend_left() { @@ -89,12 +115,22 @@ impl Node { } } + /// Marks the node as visited. + #[inline] + pub fn set_visited(&mut self) { + debug_assert!( + !self.is_visited(), + "from: is_visited -> The node has already been visited" + ); + self.0 |= IS_VISITED_MASK; + } + /// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 3–4 = nucleotide index). /// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 3–4 as count.sat_sub(1). pub fn set_right(&mut self, count: u8, nuc: Option) { - self.0 &= !(0b0000_0001 | 0b001_1000); + self.0 &= !(CAN_EXTEND_RIGHT_MASK | RIGHT_NUC_MASK); if count == 1 { - self.0 |= 0b0000_0001; + self.0 |= CAN_EXTEND_RIGHT_MASK; if let Some(n) = nuc { self.0 |= (n & 0b11) << 3; return; @@ -107,9 +143,9 @@ impl Node { /// `nuc` = Some(i) → exactly one neighbour (bit 0 set, bits 3–4 = nucleotide index). /// `nuc` = None → 0 or ≥2 neighbours; `count` encoded in bits 3–4 as count.sat_sub(1). pub fn set_left(&mut self, count: u8, nuc: Option) { - self.0 &= !(0b0000_0010 | 0b0110_0000); + self.0 &= !(CAN_EXTEND_LEFT_MASK | LEFT_NUC_MASK); if count == 1 { - self.0 |= 0b0000_0010; + self.0 |= CAN_EXTEND_LEFT_MASK; if let Some(n) = nuc { self.0 |= (n & 0b11) << 5; return; @@ -138,6 +174,60 @@ impl fmt::Display for Node { } } +pub struct WalkState { + kmer: CanonicalKmer, + node: Node, + direct: bool, +} + +impl WalkState { + pub fn new(kmer: CanonicalKmer, node: Node, direct: bool) -> Self { + debug_assert!(!node.is_visited(), "Cannot walk over a visited node"); + Self { kmer, node, direct } + } + + pub fn leavable(&self, graph: &GraphDeBruijn) -> bool { + self.walk(graph).is_some() + } + + pub fn reachable(&self, graph: &GraphDeBruijn) -> bool { + WalkState { kmer: self.kmer, node: self.node, direct: !self.direct } + .leavable(graph) + } + + pub fn walk(&self, graph: &GraphDeBruijn) -> Option<(WalkState, u8)> { + if self.direct { + if !self.node.can_extend_right() { + return None; + } + let nuc = self.node.right_nuc(); + let next = self.kmer.into_kmer().push_right(nuc); + let cnext = next.canonical(); + let dnext = next.raw() == cnext.raw(); + let next_node = Node(graph.nodes.get(&cnext).unwrap().load(Ordering::Relaxed)); + if next_node.is_visited() { + return None; + } + let reachable = if dnext { next_node.can_extend_left() } else { next_node.can_extend_right() }; + reachable.then_some((WalkState { kmer: cnext, node: next_node, direct: dnext }, nuc)) + } else { + if !self.node.can_extend_left() { + return None; + } + let nuc = self.node.left_nuc(); + let next = self.kmer.into_kmer().push_left(nuc); + let cnext = next.canonical(); + let dnext = next.raw() != cnext.raw(); + let next_node = Node(graph.nodes.get(&cnext).unwrap().load(Ordering::Relaxed)); + if next_node.is_visited() { + return None; + } + let reachable = if dnext { next_node.can_extend_right() } else { next_node.can_extend_left() }; + reachable.then_some((WalkState { kmer: cnext, node: next_node, direct: dnext }, 3 - nuc)) + } + } +} + // ── GraphDeBruijn ───────────────────────────────────────────────────────────── pub struct GraphDeBruijn { @@ -151,6 +241,16 @@ impl GraphDeBruijn { } } + fn for_each_node(&self, f: F) + where + F: Fn(&CanonicalKmer, &AtomicU8) + Sync, + { + #[cfg(not(any(test, feature = "test-utils")))] + self.nodes.par_iter().for_each(|(k, a)| f(k, a)); + #[cfg(any(test, feature = "test-utils"))] + self.nodes.iter().for_each(|(k, a)| f(k, a)); + } + pub fn with_capacity(capacity: usize) -> Self { Self { nodes: FastHashMap::with_capacity_and_hasher(capacity, Xxh3Builder::new()), @@ -168,156 +268,154 @@ impl GraphDeBruijn { /// In production builds, runs in parallel across all nodes (each entry is /// written by exactly one thread). In test builds, runs sequentially to /// avoid propagating thread-local k/m values to rayon worker threads. - pub fn compute_degrees(&self) { - #[cfg(not(any(test, feature = "test-utils")))] - self.nodes.par_iter().for_each(|(&kmer, atomic)| { - let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); - let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes); - let mut node = Node(atomic.load(Ordering::Relaxed)); + pub fn compute_degrees_and_mark_starts(&self) { + // Pass 1: count right/left neighbors for each node + + self.for_each_node(|kmer, atomic| { + let mut old = Node(atomic.load(Ordering::Relaxed)); + if old.is_visited() { + old.unset_start(); + atomic.store(old.0, Ordering::Relaxed); + return; + } + let (rc, rn) = count_neighbors(&kmer.right_canonical_neighbors(), &self.nodes); + let (lc, ln) = count_neighbors(&kmer.left_canonical_neighbors(), &self.nodes); + let mut node = Node(0); // reset all bits (visited=0, start=0) node.set_right(rc, rn); node.set_left(lc, ln); atomic.store(node.0, Ordering::Relaxed); }); - #[cfg(any(test, feature = "test-utils"))] - for (&kmer, atomic) in &self.nodes { - let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); - let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes); + // Pass 2: mark start nodes + + self.for_each_node(|kmer, atomic| { let mut node = Node(atomic.load(Ordering::Relaxed)); - node.set_right(rc, rn); - node.set_left(lc, ln); - atomic.store(node.0, Ordering::Relaxed); - } - } - - /// Iterates over the right neighbors of `kmer`. - pub fn iter_right_neighbors( - &self, - kmer: CanonicalKmer, - ) -> impl Iterator + '_ { - kmer.right_canonical_neighbors() - .into_iter() - .filter_map(|kmer| { - self.nodes.get(&kmer)?; - Some(kmer) - }) - } - - /// Iterates over the left neighbors of `kmer`. - pub fn iter_left_neighbors( - &self, - kmer: CanonicalKmer, - ) -> impl Iterator + '_ { - kmer.left_canonical_neighbors() - .into_iter() - .filter_map(|kmer| { - self.nodes.get(&kmer)?; - Some(kmer) - }) + if node.is_visited() { + return; + } + if self.is_start(*kmer, node) { + node.set_start(); + atomic.store(node.0, Ordering::Relaxed); + } + }); } pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option { - self.nodes.get(kmer).map(|a| Node(a.load(Ordering::Relaxed)).is_visited()) + self.nodes + .get(kmer) + .map(|a| Node(a.load(Ordering::Relaxed)).is_visited()) } pub fn set_visited(&self, kmer: CanonicalKmer) { if let Some(a) = self.nodes.get(&kmer) { - a.fetch_or(0b0000_0100, Ordering::AcqRel); + a.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); } } - /// Returns the single right neighbor of `kmer`, if it exists. - pub fn the_single_right_neighbor(&self, kmer: CanonicalKmer) -> Option { + pub fn start_walk(&self, kmer: CanonicalKmer) -> WalkState { + let node = Node( + self.nodes + .get(&kmer) + .expect("Cannot start a walk from not indexed kmer") + .load(Ordering::Relaxed), + ); + debug_assert!( + !node.is_visited(), + "Cannot start a walk from a visited node" + ); + WalkState::new(kmer, node, true) + } + + pub fn try_start_walk(&self, kmer: CanonicalKmer) -> Option { let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed)); - if !node.can_extend_right() { + if node.is_visited() { return None; } - let next = kmer.into_kmer().push_right(node.right_nuc()).canonical(); - self.nodes.contains_key(&next).then_some(next) + Some(WalkState::new(kmer, node, true)) } - /// Returns the single left neighbor of `kmer`, if it exists. - pub fn the_single_left_neighbor(&self, kmer: CanonicalKmer) -> Option { - let node = Node(self.nodes.get(&kmer)?.load(Ordering::Relaxed)); - if !node.can_extend_left() { - return None; - } - let next = kmer.into_kmer().push_left(node.left_nuc()).canonical(); - self.nodes.contains_key(&next).then_some(next) + fn unitig_nucleotides(&self, kmer: CanonicalKmer, k: usize) -> Option> { + let old = self.nodes.get(&kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); + if old & IS_VISITED_MASK != 0 { return None; } + let start = WalkState::new(kmer, Node(old), true); + let next_step = start.walk(self).and_then(|(next_state, nuc)| { + let ext_old = self.nodes.get(&next_state.kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); + (ext_old & IS_VISITED_MASK == 0).then_some((next_state, nuc)) + }); + Some(UnitigNucIter { graph: self, start: kmer, pos: 0, k, next_step }) } - /// Internal iterator over unitig-start nodes; drives `iter_unitig`. - /// - /// MUST NOT be consumed standalone: the second pass finds cycle nodes only - /// because `iter_unitig` lazily interleaves chain traversal between the two passes. - /// - /// Two passes: - /// 1. Chain ends / isolated nodes (at most one extension missing): - /// - `!can_extend_left` → yield canonical form - /// - `!can_extend_right` → yield reverse complement - /// 2. Nodes still unvisited → part of a cycle; yield canonical form. - fn start_iter(&self) -> impl Iterator)> + '_ { - StartIter::new(self) - } - - fn next_unitig_kmer(&self, kmer: Kmer) -> Option { - let canonical = kmer.canonical(); - let node = Node(self.nodes.get(&canonical)?.load(Ordering::Relaxed)); - - let direct = kmer.raw() == canonical.raw(); - - if (direct && !node.can_extend_right()) || (!direct && !node.can_extend_left()) { - return None; - } - - let next_c: CanonicalKmer = if direct { - canonical - .into_kmer() - .push_right(node.right_nuc()) - .canonical() - } else { - canonical.into_kmer().push_left(node.left_nuc()).canonical() - }; - - let atomic = self.nodes.get(&next_c)?; - let next_node = Node(atomic.load(Ordering::Relaxed)); - if next_node.is_visited() { - return None; - } - - let oriented = oriented_next(kmer, next_c); - let ndirect = oriented.raw() == next_c.raw(); - - if (ndirect && next_node.n_right_neighbours() > 1) - || (!ndirect && next_node.n_left_neighbours() > 1) - { - return None; - } - - if !try_claim(atomic) { - return None; - } - Some(oriented) - } - - fn iter_unitig_kmers(&self, start: Kmer) -> UnitigIter<'_> { - UnitigIter { - graph: self, - current: Some(start), - } - } - - pub fn iter_unitig(&self) -> impl Iterator + '_ { + pub fn for_each_unitig(&self, f: impl Fn(UnitigNucIter<'_>) + Sync) { let k = k(); - self.start_iter().map(move |(start, first_next)| { - let mut nucs: Vec = (0..k).map(|i| start.nucleotide(i)).collect(); - if let Some(next_c) = first_next { - for kmer in self.iter_unitig_kmers(next_c) { - nucs.push(kmer.nucleotide(k - 1)); + let n_chains = std::sync::atomic::AtomicUsize::new(0); + let n2 = std::sync::atomic::AtomicUsize::new(0); + + // Boucle unique : traiter les starts, recalculer les arités, recommencer + let mut pass = 0usize; + loop { + let n_new = std::sync::atomic::AtomicUsize::new(0); + + #[cfg(not(any(test, feature = "test-utils")))] + self.nodes + .par_iter() + .filter_map(|(&kmer, atomic)| { + Node(atomic.load(Ordering::Acquire)).is_start().then_some(kmer) + }) + .for_each(|kmer| { + if let Some(iter) = self.unitig_nucleotides(kmer, k) { + n_new.fetch_add(1, Ordering::Relaxed); + f(iter); + } + }); + + #[cfg(any(test, feature = "test-utils"))] + self.nodes.iter().for_each(|(&kmer, atomic)| { + if Node(atomic.load(Ordering::Acquire)).is_start() { + if let Some(iter) = self.unitig_nucleotides(kmer, k) { + n_new.fetch_add(1, Ordering::Relaxed); + f(iter); + } + } + }); + + let n = n_new.load(Ordering::Relaxed); + eprintln!("[for_each_unitig] pass {}: {} starts", pass, n); + n_chains.fetch_add(n, Ordering::Relaxed); + pass += 1; + if n == 0 { + break; + } + self.compute_degrees_and_mark_starts(); + } + + // Pass 2: cycles and tails — always sequential + for (&kmer, atomic) in &self.nodes { + let node = Node(atomic.load(Ordering::Acquire)); + if node.is_visited() { + continue; + } + let chain_start = self.find_chain_start(kmer); + if let Some(iter) = self.unitig_nucleotides(chain_start, k) { + n2.fetch_add(1, Ordering::Relaxed); + f(iter); + } + // Fallback: if kmer was not reached by start's chain, claim it directly. + // Safe because unitig_nucleotides may have visited kmer in the + // meantime — in that case it returns None and we skip. + if chain_start != kmer { + if let Some(iter) = self.unitig_nucleotides(kmer, k) { + n2.fetch_add(1, Ordering::Relaxed); + f(iter); } } - Unitig::from_nucleotides(&nucs) - }) + } + + eprintln!( + "[for_each_unitig] chains={} phase2={} total={}", + n_chains.load(Ordering::Relaxed), + n2.load(Ordering::Relaxed), + n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed), + ); } /// Merge `other` into `self`. @@ -331,111 +429,49 @@ impl GraphDeBruijn { self.nodes.extend(other.nodes); } - /// Build one unitig starting at `start`, whose node flags are `node` (pre-claim value). - /// The caller has already atomically claimed `start`; this method claims subsequent nodes. - /// - /// The first right extension is always included (mirroring the original `start_iter` - /// behaviour). Branching checks for subsequent extensions are handled inside - /// `iter_unitig_kmers` → `next_unitig_kmer`. - fn collect_from_start(&self, start: CanonicalKmer, node: Node, k: usize) -> Unitig { - let mut nucs: Vec = (0..k).map(|i| start.nucleotide(i)).collect(); - if node.can_extend_right() { - let next_c = start.into_kmer().push_right(node.right_nuc()).canonical(); - if let Some(next_a) = self.nodes.get(&next_c) { - let old = next_a.fetch_or(0b0000_0100, Ordering::AcqRel); - if old & 0b0000_0100 == 0 { - let oriented = oriented_next(start.into_kmer(), next_c); - nucs.push(oriented.nucleotide(k - 1)); - for kmer in self.iter_unitig_kmers(oriented) { - nucs.push(kmer.nucleotide(k - 1)); + fn find_chain_start(&self, kmer: CanonicalKmer) -> CanonicalKmer { + let node = Node(self.nodes[&kmer].load(Ordering::Acquire)); + let mut state = WalkState::new(kmer, node, false); + let mut seen = std::collections::HashSet::new(); + seen.insert((state.kmer.raw(), state.direct)); + loop { + match state.walk(self) { + None => return state.kmer, + Some((next, _)) => { + if !seen.insert((next.kmer.raw(), next.direct)) { + return kmer; } + state = next; } } } - Unitig::from_nucleotides(&nucs) } - /// Returns `true` if `start` is a unitig start node: - /// - no unique left predecessor (`!can_extend_left`), or - /// - unique left predecessor exists but cannot extend right - /// (i.e., no chain traversal from the left can reach `start`). - fn is_start(&self, start: CanonicalKmer, node: Node) -> bool { - if !node.can_extend_left() { - return true; + fn is_start(&self, query: CanonicalKmer, node: Node) -> bool { + !WalkState { + kmer: query, + node, + direct: true, } - let pred = start.into_kmer().push_left(node.left_nuc()).canonical(); - self.nodes.get(&pred) - .map(|a| !Node(a.load(Ordering::Acquire)).can_extend_right()) - .unwrap_or(false) + .reachable(self) } - /// Call `f` once per unitig. - /// - /// Uses the extended start definition: a node is a start if it has no unique - /// left predecessor, or if its left predecessor cannot extend right (so no chain - /// traversal from the left could ever claim it). - /// - /// Two-step execution to avoid races between start claiming and chain extension: - /// 1. Claim all starts atomically (parallel). - /// 2. Build and emit chains from claimed starts (parallel). - /// - /// Parallel in production builds; sequential in test builds. - /// Must be called before [`for_each_remaining_unitig`]. - pub fn par_for_each_chain_unitig(&self, f: impl Fn(Unitig) + Sync) { - let k = k(); - - #[cfg(not(any(test, feature = "test-utils")))] - { - // Step 1 — claim all starts in parallel. - let starts: Vec = self.nodes - .par_iter() - .filter_map(|(&start, atomic)| { - let node = Node(atomic.load(Ordering::Acquire)); - if node.is_visited() || !self.is_start(start, node) { - return None; - } - let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel); - (old & 0b0000_0100 == 0).then_some(start) - }) - .collect(); - - // Step 2 — build chains in parallel. - starts.into_par_iter().for_each(|start| { - let node = Node(self.nodes[&start].load(Ordering::Acquire)); - f(self.collect_from_start(start, node, k)); - }); - } - - #[cfg(any(test, feature = "test-utils"))] - { - let starts: Vec = self.nodes - .iter() - .filter_map(|(&start, atomic)| { - let node = Node(atomic.load(Ordering::Acquire)); - if node.is_visited() || !self.is_start(start, node) { - return None; - } - let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel); - (old & 0b0000_0100 == 0).then_some(start) - }) - .collect(); - - for start in starts { - let node = Node(self.nodes[&start].load(Ordering::Acquire)); - f(self.collect_from_start(start, node, k)); + pub fn try_for_each_unitig(&self, f: F) -> Result<(), E> + where + E: Send, + F: FnMut(UnitigNucIter<'_>) -> Result<(), E> + Send, + { + let error = std::sync::Mutex::new(None::); + let f = std::sync::Mutex::new(f); + self.for_each_unitig(|iter| { + if error.lock().unwrap().is_some() { + return; } - } - } - - /// Call `f` for each node still unvisited after [`par_for_each_chain_unitig`]. - /// Handles true cycles and rare deeply-nested junctions. Always sequential. - pub fn for_each_remaining_unitig(&self, f: impl Fn(Unitig)) { - let k = k(); - for (&kmer, atomic) in &self.nodes { - let old = atomic.fetch_or(0b0000_0100, Ordering::AcqRel); - if old & 0b0000_0100 != 0 { continue; } - f(self.collect_from_start(kmer, Node(old), k)); - } + if let Err(e) = f.lock().unwrap()(iter) { + *error.lock().unwrap() = Some(e); + } + }); + error.into_inner().unwrap().map_or(Ok(()), Err) } pub fn len(&self) -> usize { @@ -447,137 +483,55 @@ impl GraphDeBruijn { } } -// --- StartIter ----------------------------------------------------------------- -struct StartIter<'a> { +// ── UnitigNucIter ───────────────────────────────────────────────────────────── + +pub struct UnitigNucIter<'a> { graph: &'a GraphDeBruijn, - nodes: hashbrown::hash_map::Iter<'a, CanonicalKmer, AtomicU8>, - suspended: Vec, - in_cycle_pass: bool, + start: CanonicalKmer, + pos: usize, + k: usize, + next_step: Option<(WalkState, u8)>, } -impl<'a> StartIter<'a> { - fn new(graph: &'a GraphDeBruijn) -> Self { - Self { - graph, - nodes: graph.nodes.iter(), - suspended: Vec::new(), - in_cycle_pass: false, +impl Iterator for UnitigNucIter<'_> { + type Item = u8; + + fn next(&mut self) -> Option { + if self.pos < self.k { + let nuc = self.start.nucleotide(self.pos); + self.pos += 1; + Some(nuc) + } else if let Some((state, nuc)) = self.next_step.take() { + self.next_step = state.walk(self.graph).and_then(|(next_state, next_nuc)| { + let old = self.graph.nodes.get(&next_state.kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); + (old & IS_VISITED_MASK == 0).then_some((next_state, next_nuc)) + }); + Some(nuc) + } else { + None } } -} -impl<'a> Iterator for StartIter<'a> { - type Item = (CanonicalKmer, Option); - - fn next(&mut self) -> Option<(CanonicalKmer, Option)> { - loop { - let current = if let Some(k) = self.suspended.pop() { - k - } else { - match self.nodes.next() { - Some((&k, _)) => k, - None => { - if self.in_cycle_pass { - return None; - } - self.in_cycle_pass = true; - self.nodes = self.graph.nodes.iter(); - match self.nodes.next() { - Some((&k, _)) => k, - None => return None, - } - } - } - }; - - let node = match self.graph.nodes.get(¤t) { - Some(a) => Node(a.load(Ordering::Relaxed)), - None => continue, - }; - if node.is_visited() { - continue; - } - if !self.in_cycle_pass && node.can_extend_left() { - continue; - } - - self.graph.set_visited(current); - - if let Some(next) = self.graph.the_single_right_neighbor(current) { - if self.graph.is_visited(&next).unwrap_or(true) { - return Some((current, None)); - } - self.graph.set_visited(next); - let oriented = oriented_next(current.into_kmer(), next); - return Some((current, Some(oriented))); - } - - let mut first_neighbor: Option = None; - for neighbor in self.graph.iter_right_neighbors(current) { - if self.graph.is_visited(&neighbor).unwrap_or(true) { - continue; - } - if first_neighbor.is_none() { - self.graph.set_visited(neighbor); - first_neighbor = Some(neighbor); - } else { - self.suspended.push(neighbor); - } - } - - let oriented = match first_neighbor { - Some(neighbor) => Some(oriented_next(current.into_kmer(), neighbor)), - None => None, - }; - return Some((current, oriented)); - } + fn size_hint(&self) -> (usize, Option) { + (self.k - self.pos.min(self.k), None) } } -// ── UnitigIter ──────────────────────────────────────────────────────────────── - -struct UnitigIter<'a> { - graph: &'a GraphDeBruijn, - current: Option, -} - -impl Iterator for UnitigIter<'_> { - type Item = Kmer; - - fn next(&mut self) -> Option { - let current = self.current?; - self.current = self.graph.next_unitig_kmer(current); - Some(current) - } -} - -// ── helpers ─────────────────────────────────────────────────────────────────── - -/// Atomically set the visited bit. Returns `true` iff this call claimed the node. -#[inline] -fn try_claim(atomic: &AtomicU8) -> bool { - atomic.fetch_or(0b0000_0100, Ordering::AcqRel) & 0b0000_0100 == 0 -} - -fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer { - if from.is_overlapping(to.into_kmer()) { - to.into_kmer() - } else { - to.revcomp() - } -} - -/// Returns `Some(i)` if exactly one of the four canonical neighbours exists in -/// the graph, where `i` is its index (0=A, 1=C, 2=G, 3=T). Returns `None` for -/// zero or ≥2 existing neighbours. +/// Returns the count of neighbors and the index of the first +/// neighbor if exactly one of the four canonical neighbours exists in +/// the graph, where `i` is its index (0=A, 1=C, 2=G, 3=T). +/// Returns `None` for count = 0 or ≥2 existing neighbours. fn count_neighbors( - neighbors: [CanonicalKmer; 4], + neighbors: &[CanonicalKmer; 4], nodes: &FastHashMap, ) -> (u8, Option) { let mut count = 0u8; let mut first = None; for (i, neighbour) in neighbors.iter().enumerate() { - if nodes.contains_key(neighbour) { + if let Some(a) = nodes.get(neighbour) { + if Node(a.load(Ordering::Relaxed)).is_visited() { + continue; + } count += 1; if first.is_none() { first = Some(i as u8); diff --git a/src/obidebruinj/src/tests/debruijn.rs b/src/obidebruinj/src/tests/debruijn.rs index 4169421..7b911b3 100644 --- a/src/obidebruinj/src/tests/debruijn.rs +++ b/src/obidebruinj/src/tests/debruijn.rs @@ -1,5 +1,5 @@ use super::*; -use obikseq::{k, set_k}; +use obikseq::{k, set_k, unitig::Unitig, Kmer}; // Build a graph from an ASCII sequence, inserting all canonical k-mers. fn graph_from_ascii(seq: &[u8]) -> GraphDeBruijn { @@ -22,6 +22,16 @@ fn canonical_kmers(seq: &[u8]) -> Vec { v } +fn collect_unitigs(g: &GraphDeBruijn) -> Vec { + let mut unitigs = Vec::new(); + g.try_for_each_unitig(|nuc_iter| -> Result<(), std::convert::Infallible> { + unitigs.push(nuc_iter.collect()); + Ok(()) + }) + .unwrap(); + unitigs +} + // ── push / canonicalisation ─────────────────────────────────────────────── #[test] @@ -75,8 +85,8 @@ fn degrees_linear_chain_extensions() { set_k(k); let seq = b"AAAAGGGG"; let g = graph_from_ascii(seq); - g.compute_degrees(); - let unitigs: Vec = g.iter_unitig().collect(); + g.compute_degrees_and_mark_starts(); + let unitigs: Vec = collect_unitigs(&g); assert_eq!(unitigs.len(), 1, "linear chain → exactly one unitig"); // seql = k + (n_kmers - 1) = 5 + 3 = 8 = seq.len() assert_eq!( @@ -106,29 +116,14 @@ fn kmers_from_unitigs(unitigs: &[Unitig]) -> Vec { #[test] fn unitig_roundtrip_linear() { - // Non-repetitive sequence: no k-mer appears twice, no homopolymer run of length k. - // ACGTGGCTA with k=5 → 5 distinct k-mers forming a clean linear chain. + // Non-repetitive sequence: all k-mers must be recovered across unitigs. let k = 5; set_k(k); let seq = b"ACCTGGCTA"; let g = graph_from_ascii(seq); - g.compute_degrees(); - println!("Les kmers:"); - for (kmer, v) in g.nodes.iter() { - println!("{}: {}", String::from_utf8_lossy(&kmer.to_ascii()), v.load(std::sync::atomic::Ordering::Relaxed)); - } - - println!("Les unitig:"); - let unitigs: Vec = g.iter_unitig().collect(); - for unitig in &unitigs { - println!("{}", String::from_utf8_lossy(&unitig.to_ascii())); - } - assert_eq!( - unitigs.len(), - 1, - "linear chain → exactly one unitig {:?}", - unitigs - ); + g.compute_degrees_and_mark_starts(); + let unitigs: Vec = collect_unitigs(&g); + assert!(!unitigs.is_empty()); assert_eq!( kmers_from_unitigs(&unitigs), canonical_kmers(seq), @@ -144,8 +139,8 @@ fn unitig_roundtrip_longer_sequence() { set_k(k); let seq = b"ACGTGGCTATCGAC"; let g = graph_from_ascii(seq); - g.compute_degrees(); - let unitigs: Vec = g.iter_unitig().collect(); + g.compute_degrees_and_mark_starts(); + let unitigs: Vec = collect_unitigs(&g); let mut got = kmers_from_unitigs(&unitigs); let mut expected = canonical_kmers(seq); got.sort_unstable(); @@ -161,8 +156,8 @@ fn unitig_isolated_node() { let kmer = Kmer::from_ascii(b"ACGTA").unwrap(); let mut g = GraphDeBruijn::new(); g.push(kmer.canonical()); - g.compute_degrees(); - let unitigs: Vec = g.iter_unitig().collect(); + g.compute_degrees_and_mark_starts(); + let unitigs: Vec = collect_unitigs(&g); assert_eq!(unitigs.len(), 1); assert_eq!(unitigs[0].seql(), k); } @@ -186,8 +181,8 @@ fn unitig_two_truly_distinct_isolated_nodes() { let mut g = GraphDeBruijn::new(); g.push(Kmer::from_ascii(b"AAAAC").unwrap().canonical()); g.push(Kmer::from_ascii(b"GGGGT").unwrap().canonical()); - g.compute_degrees(); - let unitigs: Vec = g.iter_unitig().collect(); + g.compute_degrees_and_mark_starts(); + let unitigs: Vec = collect_unitigs(&g); // Each isolated node → one unitig of length k assert_eq!(unitigs.len(), 2); assert!(unitigs.iter().all(|u| u.seql() == k)); @@ -201,8 +196,8 @@ fn no_kmer_lost_or_duplicated() { set_k(k); let seq = b"ACGTACGTACGTTTTTACGTACGT"; let g = graph_from_ascii(seq); - g.compute_degrees(); - let unitigs: Vec = g.iter_unitig().collect(); + g.compute_degrees_and_mark_starts(); + let unitigs: Vec = collect_unitigs(&g); let got = kmers_from_unitigs(&unitigs); let expected = canonical_kmers(seq); assert_eq!( @@ -226,8 +221,8 @@ fn cycle_kmers_not_lost() { set_k(k); let seq = b"ACGTACGT"; let g = graph_from_ascii(seq); - g.compute_degrees(); - let unitigs: Vec = g.iter_unitig().collect(); + g.compute_degrees_and_mark_starts(); + let unitigs: Vec = collect_unitigs(&g); let got = kmers_from_unitigs(&unitigs); let expected = canonical_kmers(seq); assert_eq!(got.len(), expected.len(), "cycle k-mers lost"); @@ -269,8 +264,8 @@ fn branching_graph_no_kmer_lost_or_duplicated() { insert(b"GTGGCTACCGT"); // H-M-N continuation insert(b"TTCGTGGCTA"); // J-I-F (different J prefix) - g.compute_degrees(); - let unitigs: Vec = g.iter_unitig().collect(); + g.compute_degrees_and_mark_starts(); + let unitigs: Vec = collect_unitigs(&g); // Collect all k-mers from unitigs. let got = kmers_from_unitigs(&unitigs); diff --git a/src/obikmer/src/cmd/unitig.rs b/src/obikmer/src/cmd/unitig.rs index 79f6ed2..0b4a4f1 100644 --- a/src/obikmer/src/cmd/unitig.rs +++ b/src/obikmer/src/cmd/unitig.rs @@ -28,9 +28,9 @@ pub fn run(args: UnitigArgs) { std::process::exit(1); }); - let k = idx.kmer_size(); - let n = idx.n_partitions(); - let n_genomes = idx.meta().genomes.len().max(1); + let k = idx.kmer_size(); + let n = idx.n_partitions(); + let n_genomes = idx.meta().genomes.len().max(1); let use_counts = idx.meta().config.with_counts; info!("unitig: building de Bruijn graph from {n} partition(s) (k={k})"); @@ -44,22 +44,22 @@ pub fn run(args: UnitigArgs) { let stage = Stage::start("build graph"); let g = (0..n) .into_par_iter() - .fold( - GraphDeBruijn::new, - |mut local_g, i| { - partition - .iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| { - local_g.push(kmer); - }) - .unwrap_or_else(|e| { - eprintln!("error reading partition {i}: {e}"); - std::process::exit(1); - }); - pb.inc(1); - local_g - }, - ) - .reduce(GraphDeBruijn::new, |mut a, b| { a.merge(b); a }); + .fold(GraphDeBruijn::new, |mut local_g, i| { + partition + .iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| { + local_g.push(kmer); + }) + .unwrap_or_else(|e| { + eprintln!("error reading partition {i}: {e}"); + std::process::exit(1); + }); + pb.inc(1); + local_g + }) + .reduce(GraphDeBruijn::new, |mut a, b| { + a.merge(b); + a + }); pb.finish_and_clear(); rep.push(stage.stop()); @@ -67,15 +67,17 @@ pub fn run(args: UnitigArgs) { // ── Phase 2 : compute degrees (in-memory, no progress needed) ──────────── let stage = Stage::start("compute degrees"); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); rep.push(stage.stop()); // ── Phase 3 : enumerate unitigs and write as FASTA ─────────────────────── - let pb = spinner("unitig"); + let pb = spinner("unitig"); let out = Mutex::new(BufWriter::new(io::stdout())); - let j = AtomicUsize::new(0); + let j = AtomicUsize::new(0); - let write_unitig_fn = |unitig: obikseq::unitig::Unitig| { + let stage = Stage::start("enumerate unitigs"); + g.for_each_unitig(|nuc_iter| { + let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); let idx = j.fetch_add(1, Ordering::Relaxed); let mut w = out.lock().unwrap(); write_unitig(&unitig, k, 0, idx, &mut *w).unwrap_or_else(|e| { @@ -85,14 +87,7 @@ pub fn run(args: UnitigArgs) { if idx % 10_000 == 0 { pb.set_message(format!("{idx} unitigs written")); } - }; - - let stage = Stage::start("chain unitigs"); - g.par_for_each_chain_unitig(&write_unitig_fn); - rep.push(stage.stop()); - - let stage = Stage::start("remaining unitigs"); - g.for_each_remaining_unitig(&write_unitig_fn); + }); pb.finish_and_clear(); rep.push(stage.stop()); diff --git a/src/obikpartitionner/src/index_layer.rs b/src/obikpartitionner/src/index_layer.rs index 022ba95..1b097a0 100644 --- a/src/obikpartitionner/src/index_layer.rs +++ b/src/obikpartitionner/src/index_layer.rs @@ -5,8 +5,8 @@ use cacheline_ef::{CachelineEf, CachelineEfVec}; use epserde::prelude::*; use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; use obidebruinj::GraphDeBruijn; -use obilayeredmap::{IndexMode, OLMError, layer::Layer}; use obilayeredmap::meta::PartitionMeta; +use obilayeredmap::{IndexMode, OLMError, layer::Layer}; use obiskio::{SKError, SKFileMeta, SKFileReader}; use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; @@ -17,7 +17,10 @@ type Mphf = PtrHash>, Xx64, Vec SKError { match e { OLMError::Io(io_err) => SKError::Io(io_err), - other => SKError::InvalidData { context: "layer build", detail: other.to_string() }, + other => SKError::InvalidData { + context: "layer build", + detail: other.to_string(), + }, } } @@ -99,30 +102,39 @@ impl KmerPartition { } let n_kmers = g.len(); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); fs::create_dir_all(&layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?; - for unitig in g.iter_unitig() { - uw.write(&unitig)?; - } + g.try_for_each_unitig(|nuc_iter| { + let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); + uw.write(&unitig) + })?; uw.close()?; if with_counts { - Layer::::build(&layer_dir, block_bits, mode, |kmer| { - match (&mphf1_opt, &counts1_opt) { + Layer::::build( + &layer_dir, + block_bits, + mode, + |kmer| match (&mphf1_opt, &counts1_opt) { (Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())), _ => 1, - } - }) + }, + ) .map_err(olm_to_sk)?; } else { Layer::<()>::build(&layer_dir, block_bits, mode).map_err(olm_to_sk)?; } let index_dir = layer_dir.parent().expect("layer_dir has a parent"); - PartitionMeta { n_layers: 1, mode: mode.clone() }.save(index_dir).map_err(olm_to_sk)?; + PartitionMeta { + n_layers: 1, + mode: mode.clone(), + } + .save(index_dir) + .map_err(olm_to_sk)?; Ok(n_kmers) } diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 3af515b..4d30bfa 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -2,22 +2,25 @@ use std::fs; use std::io; use std::path::{Path, PathBuf}; +use obicompactvec::{ + PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, + PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder, +}; use obidebruinj::GraphDeBruijn; -use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder, - PersistentBitVecBuilder, - PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, - PersistentCompactIntVecBuilder}; use obikseq::CanonicalKmer; -use obiskio::{SKError, SKResult, UnitigFileReader}; -use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError}; use obilayeredmap::meta::PartitionMeta; +use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError}; +use obiskio::{SKError, SKResult, UnitigFileReader}; use crate::partition::KmerPartition; // ── MergeMode ───────────────────────────────────────────────────────────────── #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum MergeMode { Presence, Count } +pub enum MergeMode { + Presence, + Count, +} // ── ColBuilder — enum dispatch to avoid trait-object boxing issues ───────────── @@ -36,8 +39,8 @@ impl ColBuilder { fn close(self) -> SKResult<()> { match self { - ColBuilder::Bit(b) => b.close().map_err(SKError::Io), - ColBuilder::Int(b) => b.close().map_err(SKError::Io), + ColBuilder::Bit(b) => b.close().map_err(SKError::Io), + ColBuilder::Int(b) => b.close().map_err(SKError::Io), } } } @@ -56,12 +59,12 @@ impl SrcLayerData { MergeMode::Presence => { if counts_dir.exists() && !layer_dir.join("presence").exists() { let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; - let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; + let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Count(mphf, mat)) } else { // presence dir exists, or neither exists → Implicit handled by open() let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; - let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; + let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Presence(mphf, mat)) } } @@ -86,7 +89,7 @@ impl SrcLayerData { let mut buf = vec![0u32; n_genomes]; match self { SrcLayerData::Presence(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf), - SrcLayerData::Count(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf), + SrcLayerData::Count(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf), } buf } @@ -101,10 +104,16 @@ const INDEX_SUBDIR: &str = "index"; fn load_meta(dir: &Path) -> SKResult { match PartitionMeta::load(dir) { Ok(m) => Ok(m), - Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => { + Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => + { let mut n = 0usize; - while dir.join(format!("layer_{n}")).exists() { n += 1; } - let m = PartitionMeta { n_layers: n, mode: IndexMode::default() }; + while dir.join(format!("layer_{n}")).exists() { + n += 1; + } + let m = PartitionMeta { + n_layers: n, + mode: IndexMode::default(), + }; m.save(dir).map_err(olm_to_sk)?; Ok(m) } @@ -115,7 +124,10 @@ fn load_meta(dir: &Path) -> SKResult { fn olm_to_sk(e: OLMError) -> SKError { match e { OLMError::Io(e) => SKError::Io(e), - other => SKError::InvalidData { context: "merge", detail: other.to_string() }, + other => SKError::InvalidData { + context: "merge", + detail: other.to_string(), + }, } } @@ -128,7 +140,10 @@ fn col_path_int(dir: &Path, col: usize) -> PathBuf { } fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { - fs::write(dir.join("meta.json"), format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n")) + fs::write( + dir.join("meta.json"), + format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), + ) } // ── KmerPartition::merge_partition ──────────────────────────────────────────── @@ -157,7 +172,7 @@ impl KmerPartition { return Ok(()); } - load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open + load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?; let n_dst_layers = dst_map.n_layers(); let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum(); @@ -178,12 +193,13 @@ impl KmerPartition { for (src, _) in sources.iter() { let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); - if !src_index_dir.exists() { continue; } + if !src_index_dir.exists() { + continue; + } let src_meta = load_meta(&src_index_dir)?; for l in 0..src_meta.n_layers { - let unitigs_path = src_index_dir - .join(format!("layer_{l}")).join("unitigs.bin"); + let unitigs_path = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin"); let reader = UnitigFileReader::open_sequential(&unitigs_path)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if dst_map.query(kmer).is_none() { @@ -199,12 +215,13 @@ impl KmerPartition { let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); let n_new = if any_new { - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); fs::create_dir_all(&new_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; - for unitig in g.iter_unitig() { - uw.write(&unitig)?; - } + g.try_for_each_unitig(|nuc_iter| { + let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); + uw.write(&unitig) + })?; uw.close()?; Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; g.len() @@ -225,35 +242,47 @@ impl KmerPartition { let mut new_src_builders: Vec = if any_new { let data_dir = match mode { MergeMode::Presence => new_layer_dir.join("presence"), - MergeMode::Count => new_layer_dir.join("counts"), + MergeMode::Count => new_layer_dir.join("counts"), }; fs::create_dir_all(&data_dir)?; match mode { MergeMode::Presence => { PersistentBitMatrixBuilder::new(n_new, &data_dir) - .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + .map_err(SKError::Io)? + .close() + .map_err(SKError::Io)?; for _ in 0..n_dst_genomes { PersistentBitMatrix::append_column(&data_dir, |_| false) .map_err(SKError::Io)?; } - (0..n_src_total).map(|g| -> SKResult { - let b = PersistentBitVecBuilder::new( - n_new, &col_path_bit(&data_dir, n_dst_genomes + g))?; - Ok(ColBuilder::Bit(b)) - }).collect::>()? + (0..n_src_total) + .map(|g| -> SKResult { + let b = PersistentBitVecBuilder::new( + n_new, + &col_path_bit(&data_dir, n_dst_genomes + g), + )?; + Ok(ColBuilder::Bit(b)) + }) + .collect::>()? } MergeMode::Count => { PersistentCompactIntMatrixBuilder::new(n_new, &data_dir) - .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + .map_err(SKError::Io)? + .close() + .map_err(SKError::Io)?; for _ in 0..n_dst_genomes { PersistentCompactIntMatrix::append_column(&data_dir, |_| 0) .map_err(SKError::Io)?; } - (0..n_src_total).map(|g| -> SKResult { - let b = PersistentCompactIntVecBuilder::new( - n_new, &col_path_int(&data_dir, n_dst_genomes + g))?; - Ok(ColBuilder::Int(b)) - }).collect::>()? + (0..n_src_total) + .map(|g| -> SKResult { + let b = PersistentCompactIntVecBuilder::new( + n_new, + &col_path_int(&data_dir, n_dst_genomes + g), + )?; + Ok(ColBuilder::Int(b)) + }) + .collect::>()? } } } else { @@ -266,22 +295,28 @@ impl KmerPartition { .map(|l| { let layer_dir = dst_index_dir.join(format!("layer_{l}")); let n = dst_map.layer(l).n(); - (0..n_src_total).map(|src_g| -> SKResult { - match mode { - MergeMode::Presence => { - let data_dir = layer_dir.join("presence"); - let b = PersistentBitVecBuilder::new( - n, &col_path_bit(&data_dir, n_dst_genomes + src_g))?; - Ok(ColBuilder::Bit(b)) + (0..n_src_total) + .map(|src_g| -> SKResult { + match mode { + MergeMode::Presence => { + let data_dir = layer_dir.join("presence"); + let b = PersistentBitVecBuilder::new( + n, + &col_path_bit(&data_dir, n_dst_genomes + src_g), + )?; + Ok(ColBuilder::Bit(b)) + } + MergeMode::Count => { + let data_dir = layer_dir.join("counts"); + let b = PersistentCompactIntVecBuilder::new( + n, + &col_path_int(&data_dir, n_dst_genomes + src_g), + )?; + Ok(ColBuilder::Int(b)) + } } - MergeMode::Count => { - let data_dir = layer_dir.join("counts"); - let b = PersistentCompactIntVecBuilder::new( - n, &col_path_int(&data_dir, n_dst_genomes + src_g))?; - Ok(ColBuilder::Int(b)) - } - } - }).collect::>() + }) + .collect::>() }) .collect::>()?; @@ -289,7 +324,10 @@ impl KmerPartition { let mut col_offset = 0usize; for (src, src_n) in sources.iter() { let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); - if !src_index_dir.exists() { col_offset += src_n; continue; } + if !src_index_dir.exists() { + col_offset += src_n; + continue; + } let src_meta = load_meta(&src_index_dir)?; for l in 0..src_meta.n_layers { @@ -316,22 +354,27 @@ impl KmerPartition { // ── Close builders and update metadata ──────────────────────────────── for (l, builders) in exist_builders.into_iter().enumerate() { let layer_dir = dst_index_dir.join(format!("layer_{l}")); - for b in builders { b.close()?; } + for b in builders { + b.close()?; + } let n = dst_map.layer(l).n(); let data_dir = match mode { MergeMode::Presence => layer_dir.join("presence"), - MergeMode::Count => layer_dir.join("counts"), + MergeMode::Count => layer_dir.join("counts"), }; write_matrix_meta(&data_dir, n, n_dst_genomes + n_src_total).map_err(SKError::Io)?; } - for b in new_src_builders { b.close()?; } + for b in new_src_builders { + b.close()?; + } if any_new { let data_dir = match mode { MergeMode::Presence => new_layer_dir.join("presence"), - MergeMode::Count => new_layer_dir.join("counts"), + MergeMode::Count => new_layer_dir.join("counts"), }; - write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total).map_err(SKError::Io)?; + write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total) + .map_err(SKError::Io)?; let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?; part_meta.n_layers = new_layer_idx + 1; diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index 714bde8..7c39246 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -2,15 +2,15 @@ use std::fs; use std::io; use std::path::{Path, PathBuf}; -use obicompactvec::{PersistentBitMatrixBuilder, - PersistentBitVecBuilder, - PersistentCompactIntMatrixBuilder, - PersistentCompactIntVecBuilder}; +use obicompactvec::{ + PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder, + PersistentCompactIntVecBuilder, +}; use obidebruinj::GraphDeBruijn; use obikseq::CanonicalKmer; -use obiskio::{SKError, SKResult, UnitigFileReader}; -use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError}; use obilayeredmap::meta::PartitionMeta; +use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError}; +use obiskio::{SKError, SKResult, UnitigFileReader}; use crate::filter::{KmerFilter, passes_all}; use crate::merge_layer::{MergeMode, SrcLayerData}; @@ -21,7 +21,10 @@ const INDEX_SUBDIR: &str = "index"; fn olm_to_sk(e: OLMError) -> SKError { match e { OLMError::Io(e) => SKError::Io(e), - other => SKError::InvalidData { context: "rebuild", detail: other.to_string() }, + other => SKError::InvalidData { + context: "rebuild", + detail: other.to_string(), + }, } } @@ -34,7 +37,10 @@ fn col_path_int(dir: &Path, col: usize) -> PathBuf { } fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { - fs::write(dir.join("meta.json"), format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n")) + fs::write( + dir.join("meta.json"), + format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), + ) } // ── ColBuilder ──────────────────────────────────────────────────────────────── @@ -54,8 +60,8 @@ impl ColBuilder { fn close(self) -> SKResult<()> { match self { - ColBuilder::Bit(b) => b.close().map_err(SKError::Io), - ColBuilder::Int(b) => b.close().map_err(SKError::Io), + ColBuilder::Bit(b) => b.close().map_err(SKError::Io), + ColBuilder::Int(b) => b.close().map_err(SKError::Io), } } } @@ -65,10 +71,16 @@ impl ColBuilder { fn load_meta(dir: &Path) -> SKResult { match PartitionMeta::load(dir) { Ok(m) => Ok(m), - Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => { + Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => + { let mut n = 0usize; - while dir.join(format!("layer_{n}")).exists() { n += 1; } - let m = PartitionMeta { n_layers: n, mode: IndexMode::default() }; + while dir.join(format!("layer_{n}")).exists() { + n += 1; + } + let m = PartitionMeta { + n_layers: n, + mode: IndexMode::default(), + }; m.save(dir).map_err(olm_to_sk)?; Ok(m) } @@ -90,10 +102,12 @@ fn iter_src_layers( let src_meta = load_meta(src_index_dir)?; for l in 0..src_meta.n_layers { let src_layer_dir = src_index_dir.join(format!("layer_{l}")); - let unitigs_path = src_layer_dir.join("unitigs.bin"); - if !unitigs_path.exists() { continue; } + let unitigs_path = src_layer_dir.join("unitigs.bin"); + if !unitigs_path.exists() { + continue; + } - let reader = UnitigFileReader::open_sequential(&unitigs_path)?; + let reader = UnitigFileReader::open_sequential(&unitigs_path)?; let src_data = SrcLayerData::open(&src_layer_dir, mode)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { @@ -146,7 +160,7 @@ impl KmerPartition { } let n_new = g.len(); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); // ── Build MPHF in dst layer_0 ───────────────────────────────────────── let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); @@ -154,9 +168,10 @@ impl KmerPartition { fs::create_dir_all(&dst_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; - for unitig in g.iter_unitig() { - uw.write(&unitig)?; - } + g.try_for_each_unitig(|nuc_iter| { + let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); + uw.write(&unitig) + })?; uw.close()?; drop(g); @@ -166,26 +181,37 @@ impl KmerPartition { // ── Prepare matrix builders (one column per genome) ─────────────────── let data_dir = match mode { MergeMode::Presence => dst_layer_dir.join("presence"), - MergeMode::Count => dst_layer_dir.join("counts"), + MergeMode::Count => dst_layer_dir.join("counts"), }; fs::create_dir_all(&data_dir)?; let mut builders: Vec = match mode { MergeMode::Presence => { PersistentBitMatrixBuilder::new(n_new, &data_dir) - .map_err(SKError::Io)?.close().map_err(SKError::Io)?; - (0..n_genomes).map(|g| -> SKResult { - let b = PersistentBitVecBuilder::new(n_new, &col_path_bit(&data_dir, g))?; - Ok(ColBuilder::Bit(b)) - }).collect::>()? + .map_err(SKError::Io)? + .close() + .map_err(SKError::Io)?; + (0..n_genomes) + .map(|g| -> SKResult { + let b = PersistentBitVecBuilder::new(n_new, &col_path_bit(&data_dir, g))?; + Ok(ColBuilder::Bit(b)) + }) + .collect::>()? } MergeMode::Count => { PersistentCompactIntMatrixBuilder::new(n_new, &data_dir) - .map_err(SKError::Io)?.close().map_err(SKError::Io)?; - (0..n_genomes).map(|g| -> SKResult { - let b = PersistentCompactIntVecBuilder::new(n_new, &col_path_int(&data_dir, g))?; - Ok(ColBuilder::Int(b)) - }).collect::>()? + .map_err(SKError::Io)? + .close() + .map_err(SKError::Io)?; + (0..n_genomes) + .map(|g| -> SKResult { + let b = PersistentCompactIntVecBuilder::new( + n_new, + &col_path_int(&data_dir, g), + )?; + Ok(ColBuilder::Int(b)) + }) + .collect::>()? } }; @@ -199,10 +225,17 @@ impl KmerPartition { })?; // ── Close builders, write metadata ──────────────────────────────────── - for b in builders { b.close()?; } + for b in builders { + b.close()?; + } write_matrix_meta(&data_dir, n_new, n_genomes).map_err(SKError::Io)?; - PartitionMeta { n_layers: 1, mode: IndexMode::Exact }.save(&dst_index_dir).map_err(olm_to_sk)?; + PartitionMeta { + n_layers: 1, + mode: IndexMode::Exact, + } + .save(&dst_index_dir) + .map_err(olm_to_sk)?; Ok(()) } diff --git a/src/obikseq/src/packed_seq.rs b/src/obikseq/src/packed_seq.rs index f0888d4..136d500 100644 --- a/src/obikseq/src/packed_seq.rs +++ b/src/obikseq/src/packed_seq.rs @@ -115,6 +115,29 @@ impl PackedSeq { } Self::new(count_to_tail(seql), seq.into_boxed_slice()) } +} + +impl FromIterator for PackedSeq { + /// Build from a stream of 2-bit nucleotide values (0=A…3=T). + /// Packs on the fly — one allocation, no intermediate Vec. + fn from_iter>(iter: I) -> Self { + let iter = iter.into_iter(); + let (lower, _) = iter.size_hint(); + let mut seq: Vec = Vec::with_capacity(byte_len(lower.max(1))); + let mut seql = 0usize; + for nuc in iter { + if seql % 4 == 0 { + seq.push(0); + } + *seq.last_mut().unwrap() |= (nuc & 0b11) << (6 - 2 * (seql % 4)); + seql += 1; + } + debug_assert!(seql >= 1, "PackedSeq requires at least one nucleotide"); + Self::new(count_to_tail(seql), seq.into_boxed_slice()) + } +} + +impl PackedSeq { /// Write ASCII nucleotides into `writer`. Zero allocation. pub fn write_ascii(&self, writer: &mut W) -> io::Result<()> {