From 5c2f48535f7defbc89bab50438971a6e79404b5d Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 5 Jun 2026 19:32:30 +0200 Subject: [PATCH] refactor: rename compute_degrees and mark start nodes Renames `compute_degrees` to `compute_degrees_and_mark_starts` across the De Bruijn graph and partitioner layers to consolidate degree calculation and start-node flagging. Introduces safe neighbor iteration methods and a debug validation block to verify graph consistency. Refactors unitig extraction to use sequential execution with a `Mutex` for safe error propagation. Fixes malformed and duplicated method calls, adds auto-generation of missing `meta.json` files, and ensures persistent matrix builders are explicitly closed to finalize metadata. --- src/obidebruinj/src/debruijn.rs | 297 +++------------------- src/obidebruinj/src/tests/debruijn.rs | 22 +- src/obikmer/src/cmd/unitig.rs | 44 ++-- src/obikpartitionner/src/index_layer.rs | 27 +- src/obikpartitionner/src/merge_layer.rs | 154 +++++++---- src/obikpartitionner/src/rebuild_layer.rs | 92 ++++--- 6 files changed, 252 insertions(+), 384 deletions(-) diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index ee0491b..b634b68 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -215,7 +215,7 @@ impl GraphDeBruijn { /// In production builds, runs in parallel across all nodes (each entry is /// written by exactly one thread). In test builds, runs sequentially to /// avoid propagating thread-local k/m values to rayon worker threads. - pub fn compute_degrees(&self) { + pub fn compute_degrees_and_mark_starts(&self) { // Pass 1: count right/left neighbors for each node self.for_each_node(|kmer, atomic| { @@ -225,8 +225,8 @@ impl GraphDeBruijn { atomic.store(old.0, Ordering::Relaxed); return; } - let (rc, rn) = count_neighbors(kmer.right_canonical_neighbors(), &self.nodes); - let (lc, ln) = count_neighbors(kmer.left_canonical_neighbors(), &self.nodes); + let (rc, rn) = count_neighbors(&kmer.right_canonical_neighbors(), &self.nodes); + let (lc, ln) = count_neighbors(&kmer.left_canonical_neighbors(), &self.nodes); let mut node = Node(0); // reset all bits (visited=0, start=0) node.set_right(rc, rn); node.set_left(lc, ln); @@ -248,32 +248,6 @@ impl GraphDeBruijn { }); } - /// Iterates over the right neighbors of `kmer`. - pub fn iter_right_neighbors( - &self, - kmer: CanonicalKmer, - ) -> impl Iterator + '_ { - kmer.right_canonical_neighbors() - .into_iter() - .filter_map(|kmer| { - self.nodes.get(&kmer)?; - Some(kmer) - }) - } - - /// Iterates over the left neighbors of `kmer`. - pub fn iter_left_neighbors( - &self, - kmer: CanonicalKmer, - ) -> impl Iterator + '_ { - kmer.left_canonical_neighbors() - .into_iter() - .filter_map(|kmer| { - self.nodes.get(&kmer)?; - Some(kmer) - }) - } - pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option { self.nodes .get(kmer) @@ -394,217 +368,7 @@ impl GraphDeBruijn { if n == 0 { break; } - self.compute_degrees(); - } - - // #[cfg(debug_assertions)] - { - let mut residual = GraphDeBruijn { - nodes: FastHashMap::with_hasher(Xxh3Builder::new()), - }; - for (&kmer, atomic) in &self.nodes { - if !Node(atomic.load(Ordering::Relaxed)).is_visited() { - residual.nodes.insert(kmer, AtomicU8::new(0)); - } - } - - let mut sample = 0; - for (&kmer, _) in residual.nodes.iter().take(1000) { - let left = kmer.left_canonical_neighbors(); - let real_left = left - .iter() - .filter(|&nb| residual.nodes.contains_key(nb)) - .count(); - let right = kmer.right_canonical_neighbors(); - let real_right = right - .iter() - .filter(|&nb| residual.nodes.contains_key(nb)) - .count(); - if real_left != 1 || real_right == 1 { - // normal - } else { - sample += 1; - if sample <= 10 { - eprintln!("Kmer {:?}: left={} right={}", kmer, real_left, real_right); - } - } - } - residual.compute_degrees(); - - let mut n_starts = 0usize; - let mut n_no_right = 0usize; - let mut n_no_left = 0usize; - - for (_, a) in &residual.nodes { - let node = Node(a.load(Ordering::Relaxed)); - if node.is_start() { - n_starts += 1; - } - if !node.can_extend_right() { - n_no_right += 1; - } - if !node.can_extend_left() { - n_no_left += 1; - } - } - - eprintln!( - "[for_each_unitig] residual after cascade: {} nodes | starts={} no_right={} no_left={}", - residual.nodes.len(), - n_starts, - n_no_right, - n_no_left, - ); - - let mut left_mismatch = 0; - let mut right_mismatch = 0; - let mut total_left = 0; - let mut total_right = 0; - let n_residual = residual.nodes.len(); - - for (&kmer, atomic) in &residual.nodes { - let node = Node(atomic.load(Ordering::Relaxed)); - - let left_neighbors = kmer.left_canonical_neighbors(); - let actual_left = left_neighbors - .iter() - .filter(|&nb| residual.nodes.contains_key(nb)) - .count(); - total_left += actual_left; - if (actual_left == 1) != node.can_extend_left() { - left_mismatch += 1; - } - - let right_neighbors = kmer.right_canonical_neighbors(); - let actual_right = right_neighbors - .iter() - .filter(|&nb| residual.nodes.contains_key(nb)) - .count(); - total_right += actual_right; - if (actual_right == 1) != node.can_extend_right() { - right_mismatch += 1; - } - } - - eprintln!( - "[consistency] N={} total_left={} total_right={} left_mismatch={} right_mismatch={}", - n_residual, total_left, total_right, left_mismatch, right_mismatch - ); - - let mut real_starts = 0; - let mut sample = 0; - for (&kmer, atomic) in &residual.nodes { - let node = Node(atomic.load(Ordering::Relaxed)); - // Calcul réel des voisins gauches (les 4 possibilités) - let left_neighbors = kmer.left_canonical_neighbors(); - let real_left_count = left_neighbors - .iter() - .filter(|&nb| residual.nodes.contains_key(nb)) - .count(); - let right_neighbors = kmer.right_canonical_neighbors(); - let real_right_count = right_neighbors - .iter() - .filter(|&nb| residual.nodes.contains_key(nb)) - .count(); - - // Vérification de cohérence avec les flags - if (real_left_count == 1) != node.can_extend_left() { - eprintln!( - "Incoherence left for {:?}: real={}, flag={}", - kmer, - real_left_count, - node.can_extend_left() - ); - } - if (real_right_count == 1) != node.can_extend_right() { - eprintln!( - "Incoherence right for {:?}: real={}, flag={}", - kmer, - real_right_count, - node.can_extend_right() - ); - } - - // Détermination si c'est un start selon la définition avec les vrais voisins - let is_start = if real_left_count != 1 { - true - } else { - // Trouver l'unique prédécesseur réel - let pred = left_neighbors - .iter() - .find(|&nb| residual.nodes.contains_key(nb)) - .unwrap(); - let pred_node = Node(residual.nodes.get(pred).unwrap().load(Ordering::Relaxed)); - let pred_right_neighbors = pred.right_canonical_neighbors(); - let pred_real_right_count = pred_right_neighbors - .iter() - .filter(|&nb| residual.nodes.contains_key(nb)) - .count(); - pred_real_right_count != 1 - }; - if is_start { - real_starts += 1; - if sample < 10 { - sample += 1; - eprintln!( - "Real start: {:?}, left_count={}, right_count={}", - kmer, real_left_count, real_right_count - ); - } - } - } - eprintln!("[real starts] count = {}", real_starts); - - let mut ok = 0; - let mut missing_pred = 0; - let mut pred_visited = 0; - let mut pred_no_right = 0; - let mut mismatch = 0; - - for (&kmer, atomic) in &residual.nodes { - let node = Node(atomic.load(Ordering::Relaxed)); - if !node.can_extend_right() { - // Le prédécesseur unique (car can_extend_left est vrai pour tous) - let pred = kmer.left_canonical_neighbors()[node.left_nuc() as usize]; - if let Some(pred_atomic) = residual.nodes.get(&pred) { - let pred_node = Node(pred_atomic.load(Ordering::Relaxed)); - if pred_node.can_extend_right() { - let succ = - pred.right_canonical_neighbors()[pred_node.right_nuc() as usize]; - if succ == kmer { - ok += 1; - } else { - mismatch += 1; - eprintln!( - "Mismatch: pred {:?} right_nuc={} -> {:?} != {:?}", - pred, - pred_node.right_nuc(), - succ, - kmer - ); - } - } else { - pred_no_right += 1; - eprintln!("Pred {:?} has !can_extend_right", pred); - } - } else { - // Prédécesseur absent du résidu : vérifier s'il est visité - if let Some(orig) = self.nodes.get(&pred) { - if Node(orig.load(Ordering::Relaxed)).is_visited() { - pred_visited += 1; - } else { - missing_pred += 1; - } - } else { - missing_pred += 1; - } - } - } - } - eprintln!( - "[diagnostic] nodes without right: ok={} missing_pred={} pred_visited={} pred_no_right={} mismatch={}", - ok, missing_pred, pred_visited, pred_no_right, mismatch - ); + self.compute_degrees_and_mark_starts(); } // Pass 2: cycles and tails — always sequential @@ -624,6 +388,16 @@ impl GraphDeBruijn { n2.fetch_add(1, Ordering::Relaxed); f(self.unitig_nucleotides(start, Node(old), k)); } + // Fallback: if kmer was not reached by start's chain, claim it directly. + // Safe because unitig_nucleotides(start, ...) may have visited kmer in the + // meantime — in that case fetch_or returns IS_VISITED_MASK set and we skip. + if start != kmer { + let kmer_old = atomic.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); + if kmer_old & IS_VISITED_MASK == 0 { + n2.fetch_add(1, Ordering::Relaxed); + f(self.unitig_nucleotides(kmer, Node(kmer_old), k)); + } + } } eprintln!( @@ -668,7 +442,10 @@ impl GraphDeBruijn { return current; } // Stop if asymmetry: pred's right canonical neighbor is not current - let pred_right = pred.into_kmer().push_right(pred_node.right_nuc()).canonical(); + let pred_right = pred + .into_kmer() + .push_right(pred_node.right_nuc()) + .canonical(); if pred_right != current { return current; } @@ -683,28 +460,29 @@ impl GraphDeBruijn { let pred = query.into_kmer().push_left(node.left_nuc()).canonical(); self.nodes .get(&pred) - .map(|a| !Node(a.load(Ordering::Acquire)).can_extend_right()) + .map(|a| { + let pred_node = Node(a.load(Ordering::Acquire)); + !pred_node.can_extend_right() + }) .unwrap_or(false) } - pub fn try_for_each_unitig(&self, mut f: F) -> Result<(), E> + pub fn try_for_each_unitig(&self, f: F) -> Result<(), E> where - F: FnMut(UnitigNucIter<'_>) -> Result<(), E>, + E: Send, + F: FnMut(UnitigNucIter<'_>) -> Result<(), E> + Send, { - let k = k(); - for (&kmer, atomic) in &self.nodes { - let node = Node(atomic.load(Ordering::Acquire)); - if node.is_start() { - f(self.unitig_nucleotides(kmer, node, k))?; + let error = std::sync::Mutex::new(None::); + let f = std::sync::Mutex::new(f); + self.for_each_unitig(|iter| { + if error.lock().unwrap().is_some() { + return; } - } - for (&kmer, atomic) in &self.nodes { - let old = atomic.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); - if old & IS_VISITED_MASK == 0 { - f(self.unitig_nucleotides(kmer, Node(old), k))?; + if let Err(e) = f.lock().unwrap()(iter) { + *error.lock().unwrap() = Some(e); } - } - Ok(()) + }); + error.into_inner().unwrap().map_or(Ok(()), Err) } pub fn len(&self) -> usize { @@ -772,8 +550,9 @@ fn try_claim(atomic: &AtomicU8) -> bool { } fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer { - if from.is_overlapping(to.into_kmer()) { - to.into_kmer() + let direct = to.into_kmer(); + if from.is_overlapping(direct) { + direct } else { to.revcomp() } @@ -784,7 +563,7 @@ fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer { /// the graph, where `i` is its index (0=A, 1=C, 2=G, 3=T). /// Returns `None` for count = 0 or ≥2 existing neighbours. fn count_neighbors( - neighbors: [CanonicalKmer; 4], + neighbors: &[CanonicalKmer; 4], nodes: &FastHashMap, ) -> (u8, Option) { let mut count = 0u8; diff --git a/src/obidebruinj/src/tests/debruijn.rs b/src/obidebruinj/src/tests/debruijn.rs index 4169421..24312b0 100644 --- a/src/obidebruinj/src/tests/debruijn.rs +++ b/src/obidebruinj/src/tests/debruijn.rs @@ -75,7 +75,7 @@ fn degrees_linear_chain_extensions() { set_k(k); let seq = b"AAAAGGGG"; let g = graph_from_ascii(seq); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); let unitigs: Vec = g.iter_unitig().collect(); assert_eq!(unitigs.len(), 1, "linear chain → exactly one unitig"); // seql = k + (n_kmers - 1) = 5 + 3 = 8 = seq.len() @@ -112,10 +112,14 @@ fn unitig_roundtrip_linear() { set_k(k); let seq = b"ACCTGGCTA"; let g = graph_from_ascii(seq); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); println!("Les kmers:"); for (kmer, v) in g.nodes.iter() { - println!("{}: {}", String::from_utf8_lossy(&kmer.to_ascii()), v.load(std::sync::atomic::Ordering::Relaxed)); + println!( + "{}: {}", + String::from_utf8_lossy(&kmer.to_ascii()), + v.load(std::sync::atomic::Ordering::Relaxed) + ); } println!("Les unitig:"); @@ -144,7 +148,7 @@ fn unitig_roundtrip_longer_sequence() { set_k(k); let seq = b"ACGTGGCTATCGAC"; let g = graph_from_ascii(seq); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); let unitigs: Vec = g.iter_unitig().collect(); let mut got = kmers_from_unitigs(&unitigs); let mut expected = canonical_kmers(seq); @@ -161,7 +165,7 @@ fn unitig_isolated_node() { let kmer = Kmer::from_ascii(b"ACGTA").unwrap(); let mut g = GraphDeBruijn::new(); g.push(kmer.canonical()); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); let unitigs: Vec = g.iter_unitig().collect(); assert_eq!(unitigs.len(), 1); assert_eq!(unitigs[0].seql(), k); @@ -186,7 +190,7 @@ fn unitig_two_truly_distinct_isolated_nodes() { let mut g = GraphDeBruijn::new(); g.push(Kmer::from_ascii(b"AAAAC").unwrap().canonical()); g.push(Kmer::from_ascii(b"GGGGT").unwrap().canonical()); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); let unitigs: Vec = g.iter_unitig().collect(); // Each isolated node → one unitig of length k assert_eq!(unitigs.len(), 2); @@ -201,7 +205,7 @@ fn no_kmer_lost_or_duplicated() { set_k(k); let seq = b"ACGTACGTACGTTTTTACGTACGT"; let g = graph_from_ascii(seq); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); let unitigs: Vec = g.iter_unitig().collect(); let got = kmers_from_unitigs(&unitigs); let expected = canonical_kmers(seq); @@ -226,7 +230,7 @@ fn cycle_kmers_not_lost() { set_k(k); let seq = b"ACGTACGT"; let g = graph_from_ascii(seq); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); let unitigs: Vec = g.iter_unitig().collect(); let got = kmers_from_unitigs(&unitigs); let expected = canonical_kmers(seq); @@ -269,7 +273,7 @@ fn branching_graph_no_kmer_lost_or_duplicated() { insert(b"GTGGCTACCGT"); // H-M-N continuation insert(b"TTCGTGGCTA"); // J-I-F (different J prefix) - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); let unitigs: Vec = g.iter_unitig().collect(); // Collect all k-mers from unitigs. diff --git a/src/obikmer/src/cmd/unitig.rs b/src/obikmer/src/cmd/unitig.rs index cd5f92d..0b4a4f1 100644 --- a/src/obikmer/src/cmd/unitig.rs +++ b/src/obikmer/src/cmd/unitig.rs @@ -28,9 +28,9 @@ pub fn run(args: UnitigArgs) { std::process::exit(1); }); - let k = idx.kmer_size(); - let n = idx.n_partitions(); - let n_genomes = idx.meta().genomes.len().max(1); + let k = idx.kmer_size(); + let n = idx.n_partitions(); + let n_genomes = idx.meta().genomes.len().max(1); let use_counts = idx.meta().config.with_counts; info!("unitig: building de Bruijn graph from {n} partition(s) (k={k})"); @@ -44,22 +44,22 @@ pub fn run(args: UnitigArgs) { let stage = Stage::start("build graph"); let g = (0..n) .into_par_iter() - .fold( - GraphDeBruijn::new, - |mut local_g, i| { - partition - .iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| { - local_g.push(kmer); - }) - .unwrap_or_else(|e| { - eprintln!("error reading partition {i}: {e}"); - std::process::exit(1); - }); - pb.inc(1); - local_g - }, - ) - .reduce(GraphDeBruijn::new, |mut a, b| { a.merge(b); a }); + .fold(GraphDeBruijn::new, |mut local_g, i| { + partition + .iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| { + local_g.push(kmer); + }) + .unwrap_or_else(|e| { + eprintln!("error reading partition {i}: {e}"); + std::process::exit(1); + }); + pb.inc(1); + local_g + }) + .reduce(GraphDeBruijn::new, |mut a, b| { + a.merge(b); + a + }); pb.finish_and_clear(); rep.push(stage.stop()); @@ -67,13 +67,13 @@ pub fn run(args: UnitigArgs) { // ── Phase 2 : compute degrees (in-memory, no progress needed) ──────────── let stage = Stage::start("compute degrees"); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); rep.push(stage.stop()); // ── Phase 3 : enumerate unitigs and write as FASTA ─────────────────────── - let pb = spinner("unitig"); + let pb = spinner("unitig"); let out = Mutex::new(BufWriter::new(io::stdout())); - let j = AtomicUsize::new(0); + let j = AtomicUsize::new(0); let stage = Stage::start("enumerate unitigs"); g.for_each_unitig(|nuc_iter| { diff --git a/src/obikpartitionner/src/index_layer.rs b/src/obikpartitionner/src/index_layer.rs index 6b87c99..1b097a0 100644 --- a/src/obikpartitionner/src/index_layer.rs +++ b/src/obikpartitionner/src/index_layer.rs @@ -5,8 +5,8 @@ use cacheline_ef::{CachelineEf, CachelineEfVec}; use epserde::prelude::*; use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; use obidebruinj::GraphDeBruijn; -use obilayeredmap::{IndexMode, OLMError, layer::Layer}; use obilayeredmap::meta::PartitionMeta; +use obilayeredmap::{IndexMode, OLMError, layer::Layer}; use obiskio::{SKError, SKFileMeta, SKFileReader}; use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; @@ -17,7 +17,10 @@ type Mphf = PtrHash>, Xx64, Vec SKError { match e { OLMError::Io(io_err) => SKError::Io(io_err), - other => SKError::InvalidData { context: "layer build", detail: other.to_string() }, + other => SKError::InvalidData { + context: "layer build", + detail: other.to_string(), + }, } } @@ -99,7 +102,7 @@ impl KmerPartition { } let n_kmers = g.len(); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); fs::create_dir_all(&layer_dir)?; @@ -111,19 +114,27 @@ impl KmerPartition { uw.close()?; if with_counts { - Layer::::build(&layer_dir, block_bits, mode, |kmer| { - match (&mphf1_opt, &counts1_opt) { + Layer::::build( + &layer_dir, + block_bits, + mode, + |kmer| match (&mphf1_opt, &counts1_opt) { (Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())), _ => 1, - } - }) + }, + ) .map_err(olm_to_sk)?; } else { Layer::<()>::build(&layer_dir, block_bits, mode).map_err(olm_to_sk)?; } let index_dir = layer_dir.parent().expect("layer_dir has a parent"); - PartitionMeta { n_layers: 1, mode: mode.clone() }.save(index_dir).map_err(olm_to_sk)?; + PartitionMeta { + n_layers: 1, + mode: mode.clone(), + } + .save(index_dir) + .map_err(olm_to_sk)?; Ok(n_kmers) } diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 68373b0..4d30bfa 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -2,22 +2,25 @@ use std::fs; use std::io; use std::path::{Path, PathBuf}; +use obicompactvec::{ + PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, + PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder, +}; use obidebruinj::GraphDeBruijn; -use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder, - PersistentBitVecBuilder, - PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, - PersistentCompactIntVecBuilder}; use obikseq::CanonicalKmer; -use obiskio::{SKError, SKResult, UnitigFileReader}; -use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError}; use obilayeredmap::meta::PartitionMeta; +use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError}; +use obiskio::{SKError, SKResult, UnitigFileReader}; use crate::partition::KmerPartition; // ── MergeMode ───────────────────────────────────────────────────────────────── #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum MergeMode { Presence, Count } +pub enum MergeMode { + Presence, + Count, +} // ── ColBuilder — enum dispatch to avoid trait-object boxing issues ───────────── @@ -36,8 +39,8 @@ impl ColBuilder { fn close(self) -> SKResult<()> { match self { - ColBuilder::Bit(b) => b.close().map_err(SKError::Io), - ColBuilder::Int(b) => b.close().map_err(SKError::Io), + ColBuilder::Bit(b) => b.close().map_err(SKError::Io), + ColBuilder::Int(b) => b.close().map_err(SKError::Io), } } } @@ -56,12 +59,12 @@ impl SrcLayerData { MergeMode::Presence => { if counts_dir.exists() && !layer_dir.join("presence").exists() { let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; - let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; + let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Count(mphf, mat)) } else { // presence dir exists, or neither exists → Implicit handled by open() let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; - let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; + let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Presence(mphf, mat)) } } @@ -86,7 +89,7 @@ impl SrcLayerData { let mut buf = vec![0u32; n_genomes]; match self { SrcLayerData::Presence(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf), - SrcLayerData::Count(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf), + SrcLayerData::Count(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf), } buf } @@ -101,10 +104,16 @@ const INDEX_SUBDIR: &str = "index"; fn load_meta(dir: &Path) -> SKResult { match PartitionMeta::load(dir) { Ok(m) => Ok(m), - Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => { + Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => + { let mut n = 0usize; - while dir.join(format!("layer_{n}")).exists() { n += 1; } - let m = PartitionMeta { n_layers: n, mode: IndexMode::default() }; + while dir.join(format!("layer_{n}")).exists() { + n += 1; + } + let m = PartitionMeta { + n_layers: n, + mode: IndexMode::default(), + }; m.save(dir).map_err(olm_to_sk)?; Ok(m) } @@ -115,7 +124,10 @@ fn load_meta(dir: &Path) -> SKResult { fn olm_to_sk(e: OLMError) -> SKError { match e { OLMError::Io(e) => SKError::Io(e), - other => SKError::InvalidData { context: "merge", detail: other.to_string() }, + other => SKError::InvalidData { + context: "merge", + detail: other.to_string(), + }, } } @@ -128,7 +140,10 @@ fn col_path_int(dir: &Path, col: usize) -> PathBuf { } fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { - fs::write(dir.join("meta.json"), format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n")) + fs::write( + dir.join("meta.json"), + format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), + ) } // ── KmerPartition::merge_partition ──────────────────────────────────────────── @@ -157,7 +172,7 @@ impl KmerPartition { return Ok(()); } - load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open + load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?; let n_dst_layers = dst_map.n_layers(); let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum(); @@ -178,12 +193,13 @@ impl KmerPartition { for (src, _) in sources.iter() { let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); - if !src_index_dir.exists() { continue; } + if !src_index_dir.exists() { + continue; + } let src_meta = load_meta(&src_index_dir)?; for l in 0..src_meta.n_layers { - let unitigs_path = src_index_dir - .join(format!("layer_{l}")).join("unitigs.bin"); + let unitigs_path = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin"); let reader = UnitigFileReader::open_sequential(&unitigs_path)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if dst_map.query(kmer).is_none() { @@ -199,7 +215,7 @@ impl KmerPartition { let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); let n_new = if any_new { - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); fs::create_dir_all(&new_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; g.try_for_each_unitig(|nuc_iter| { @@ -226,35 +242,47 @@ impl KmerPartition { let mut new_src_builders: Vec = if any_new { let data_dir = match mode { MergeMode::Presence => new_layer_dir.join("presence"), - MergeMode::Count => new_layer_dir.join("counts"), + MergeMode::Count => new_layer_dir.join("counts"), }; fs::create_dir_all(&data_dir)?; match mode { MergeMode::Presence => { PersistentBitMatrixBuilder::new(n_new, &data_dir) - .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + .map_err(SKError::Io)? + .close() + .map_err(SKError::Io)?; for _ in 0..n_dst_genomes { PersistentBitMatrix::append_column(&data_dir, |_| false) .map_err(SKError::Io)?; } - (0..n_src_total).map(|g| -> SKResult { - let b = PersistentBitVecBuilder::new( - n_new, &col_path_bit(&data_dir, n_dst_genomes + g))?; - Ok(ColBuilder::Bit(b)) - }).collect::>()? + (0..n_src_total) + .map(|g| -> SKResult { + let b = PersistentBitVecBuilder::new( + n_new, + &col_path_bit(&data_dir, n_dst_genomes + g), + )?; + Ok(ColBuilder::Bit(b)) + }) + .collect::>()? } MergeMode::Count => { PersistentCompactIntMatrixBuilder::new(n_new, &data_dir) - .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + .map_err(SKError::Io)? + .close() + .map_err(SKError::Io)?; for _ in 0..n_dst_genomes { PersistentCompactIntMatrix::append_column(&data_dir, |_| 0) .map_err(SKError::Io)?; } - (0..n_src_total).map(|g| -> SKResult { - let b = PersistentCompactIntVecBuilder::new( - n_new, &col_path_int(&data_dir, n_dst_genomes + g))?; - Ok(ColBuilder::Int(b)) - }).collect::>()? + (0..n_src_total) + .map(|g| -> SKResult { + let b = PersistentCompactIntVecBuilder::new( + n_new, + &col_path_int(&data_dir, n_dst_genomes + g), + )?; + Ok(ColBuilder::Int(b)) + }) + .collect::>()? } } } else { @@ -267,22 +295,28 @@ impl KmerPartition { .map(|l| { let layer_dir = dst_index_dir.join(format!("layer_{l}")); let n = dst_map.layer(l).n(); - (0..n_src_total).map(|src_g| -> SKResult { - match mode { - MergeMode::Presence => { - let data_dir = layer_dir.join("presence"); - let b = PersistentBitVecBuilder::new( - n, &col_path_bit(&data_dir, n_dst_genomes + src_g))?; - Ok(ColBuilder::Bit(b)) + (0..n_src_total) + .map(|src_g| -> SKResult { + match mode { + MergeMode::Presence => { + let data_dir = layer_dir.join("presence"); + let b = PersistentBitVecBuilder::new( + n, + &col_path_bit(&data_dir, n_dst_genomes + src_g), + )?; + Ok(ColBuilder::Bit(b)) + } + MergeMode::Count => { + let data_dir = layer_dir.join("counts"); + let b = PersistentCompactIntVecBuilder::new( + n, + &col_path_int(&data_dir, n_dst_genomes + src_g), + )?; + Ok(ColBuilder::Int(b)) + } } - MergeMode::Count => { - let data_dir = layer_dir.join("counts"); - let b = PersistentCompactIntVecBuilder::new( - n, &col_path_int(&data_dir, n_dst_genomes + src_g))?; - Ok(ColBuilder::Int(b)) - } - } - }).collect::>() + }) + .collect::>() }) .collect::>()?; @@ -290,7 +324,10 @@ impl KmerPartition { let mut col_offset = 0usize; for (src, src_n) in sources.iter() { let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); - if !src_index_dir.exists() { col_offset += src_n; continue; } + if !src_index_dir.exists() { + col_offset += src_n; + continue; + } let src_meta = load_meta(&src_index_dir)?; for l in 0..src_meta.n_layers { @@ -317,22 +354,27 @@ impl KmerPartition { // ── Close builders and update metadata ──────────────────────────────── for (l, builders) in exist_builders.into_iter().enumerate() { let layer_dir = dst_index_dir.join(format!("layer_{l}")); - for b in builders { b.close()?; } + for b in builders { + b.close()?; + } let n = dst_map.layer(l).n(); let data_dir = match mode { MergeMode::Presence => layer_dir.join("presence"), - MergeMode::Count => layer_dir.join("counts"), + MergeMode::Count => layer_dir.join("counts"), }; write_matrix_meta(&data_dir, n, n_dst_genomes + n_src_total).map_err(SKError::Io)?; } - for b in new_src_builders { b.close()?; } + for b in new_src_builders { + b.close()?; + } if any_new { let data_dir = match mode { MergeMode::Presence => new_layer_dir.join("presence"), - MergeMode::Count => new_layer_dir.join("counts"), + MergeMode::Count => new_layer_dir.join("counts"), }; - write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total).map_err(SKError::Io)?; + write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total) + .map_err(SKError::Io)?; let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?; part_meta.n_layers = new_layer_idx + 1; diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index 39a0994..7c39246 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -2,15 +2,15 @@ use std::fs; use std::io; use std::path::{Path, PathBuf}; -use obicompactvec::{PersistentBitMatrixBuilder, - PersistentBitVecBuilder, - PersistentCompactIntMatrixBuilder, - PersistentCompactIntVecBuilder}; +use obicompactvec::{ + PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder, + PersistentCompactIntVecBuilder, +}; use obidebruinj::GraphDeBruijn; use obikseq::CanonicalKmer; -use obiskio::{SKError, SKResult, UnitigFileReader}; -use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError}; use obilayeredmap::meta::PartitionMeta; +use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError}; +use obiskio::{SKError, SKResult, UnitigFileReader}; use crate::filter::{KmerFilter, passes_all}; use crate::merge_layer::{MergeMode, SrcLayerData}; @@ -21,7 +21,10 @@ const INDEX_SUBDIR: &str = "index"; fn olm_to_sk(e: OLMError) -> SKError { match e { OLMError::Io(e) => SKError::Io(e), - other => SKError::InvalidData { context: "rebuild", detail: other.to_string() }, + other => SKError::InvalidData { + context: "rebuild", + detail: other.to_string(), + }, } } @@ -34,7 +37,10 @@ fn col_path_int(dir: &Path, col: usize) -> PathBuf { } fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { - fs::write(dir.join("meta.json"), format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n")) + fs::write( + dir.join("meta.json"), + format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), + ) } // ── ColBuilder ──────────────────────────────────────────────────────────────── @@ -54,8 +60,8 @@ impl ColBuilder { fn close(self) -> SKResult<()> { match self { - ColBuilder::Bit(b) => b.close().map_err(SKError::Io), - ColBuilder::Int(b) => b.close().map_err(SKError::Io), + ColBuilder::Bit(b) => b.close().map_err(SKError::Io), + ColBuilder::Int(b) => b.close().map_err(SKError::Io), } } } @@ -65,10 +71,16 @@ impl ColBuilder { fn load_meta(dir: &Path) -> SKResult { match PartitionMeta::load(dir) { Ok(m) => Ok(m), - Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => { + Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => + { let mut n = 0usize; - while dir.join(format!("layer_{n}")).exists() { n += 1; } - let m = PartitionMeta { n_layers: n, mode: IndexMode::default() }; + while dir.join(format!("layer_{n}")).exists() { + n += 1; + } + let m = PartitionMeta { + n_layers: n, + mode: IndexMode::default(), + }; m.save(dir).map_err(olm_to_sk)?; Ok(m) } @@ -90,10 +102,12 @@ fn iter_src_layers( let src_meta = load_meta(src_index_dir)?; for l in 0..src_meta.n_layers { let src_layer_dir = src_index_dir.join(format!("layer_{l}")); - let unitigs_path = src_layer_dir.join("unitigs.bin"); - if !unitigs_path.exists() { continue; } + let unitigs_path = src_layer_dir.join("unitigs.bin"); + if !unitigs_path.exists() { + continue; + } - let reader = UnitigFileReader::open_sequential(&unitigs_path)?; + let reader = UnitigFileReader::open_sequential(&unitigs_path)?; let src_data = SrcLayerData::open(&src_layer_dir, mode)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { @@ -146,7 +160,7 @@ impl KmerPartition { } let n_new = g.len(); - g.compute_degrees(); + g.compute_degrees_and_mark_starts(); // ── Build MPHF in dst layer_0 ───────────────────────────────────────── let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); @@ -167,26 +181,37 @@ impl KmerPartition { // ── Prepare matrix builders (one column per genome) ─────────────────── let data_dir = match mode { MergeMode::Presence => dst_layer_dir.join("presence"), - MergeMode::Count => dst_layer_dir.join("counts"), + MergeMode::Count => dst_layer_dir.join("counts"), }; fs::create_dir_all(&data_dir)?; let mut builders: Vec = match mode { MergeMode::Presence => { PersistentBitMatrixBuilder::new(n_new, &data_dir) - .map_err(SKError::Io)?.close().map_err(SKError::Io)?; - (0..n_genomes).map(|g| -> SKResult { - let b = PersistentBitVecBuilder::new(n_new, &col_path_bit(&data_dir, g))?; - Ok(ColBuilder::Bit(b)) - }).collect::>()? + .map_err(SKError::Io)? + .close() + .map_err(SKError::Io)?; + (0..n_genomes) + .map(|g| -> SKResult { + let b = PersistentBitVecBuilder::new(n_new, &col_path_bit(&data_dir, g))?; + Ok(ColBuilder::Bit(b)) + }) + .collect::>()? } MergeMode::Count => { PersistentCompactIntMatrixBuilder::new(n_new, &data_dir) - .map_err(SKError::Io)?.close().map_err(SKError::Io)?; - (0..n_genomes).map(|g| -> SKResult { - let b = PersistentCompactIntVecBuilder::new(n_new, &col_path_int(&data_dir, g))?; - Ok(ColBuilder::Int(b)) - }).collect::>()? + .map_err(SKError::Io)? + .close() + .map_err(SKError::Io)?; + (0..n_genomes) + .map(|g| -> SKResult { + let b = PersistentCompactIntVecBuilder::new( + n_new, + &col_path_int(&data_dir, g), + )?; + Ok(ColBuilder::Int(b)) + }) + .collect::>()? } }; @@ -200,10 +225,17 @@ impl KmerPartition { })?; // ── Close builders, write metadata ──────────────────────────────────── - for b in builders { b.close()?; } + for b in builders { + b.close()?; + } write_matrix_meta(&data_dir, n_new, n_genomes).map_err(SKError::Io)?; - PartitionMeta { n_layers: 1, mode: IndexMode::Exact }.save(&dst_index_dir).map_err(olm_to_sk)?; + PartitionMeta { + n_layers: 1, + mode: IndexMode::Exact, + } + .save(&dst_index_dir) + .map_err(olm_to_sk)?; Ok(()) }