From 9e1d6f2f25615428e1f8af50ae848609516eda62 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Thu, 21 May 2026 11:16:00 +0200 Subject: [PATCH] feat: implement partition-based merge command for k-mer indices Implements a new `merge` command that aggregates k-mer counts and presence/absence matrices from multiple source indices using a parallelized, partition-based algorithm. Adds CLI progress bars and execution timing across the bootstrap, spectrum rebuild, and merge phases. Updates logging to report the aggregate genome count and introduces a bounds check in the perfect hash layer to safely return `None` for unknown k-mers, preventing out-of-bounds access in downstream operations. --- TODO.md | 14 -------- src/obikindex/src/merge.rs | 56 +++++++++++++++++++++++++++-- src/obikmer/src/cmd/merge.rs | 12 +++++-- src/obilayeredmap/src/mphf_layer.rs | 2 ++ 4 files changed, 64 insertions(+), 20 deletions(-) diff --git a/TODO.md b/TODO.md index 038bd71..592fc74 100644 --- a/TODO.md +++ b/TODO.md @@ -9,20 +9,6 @@ ## commandes à ajouter -- merge : pour construire un index à partir d'index existants - - deux modes : count et presence/absence. count exige que tous les index mergés soient déjà en mode count. mode presence/absence par defaut. Si passage de mode count à mode presence/absence, par defaut presence = count >= 1. Possibilité de spécifier un seuil personnalisé. - - le merge doit se faire en parallèle sur chaque partition - - en entrée : une liste de chemins vers les index à fusionner - - en sortie : un nouvel index fusionné (option -o ) - - j'imagine comme algo: - - on copie le premier index dans le nouvel index - - on ajoute a chaque partition une matrice de count ou de presence s'il n'y en avait pas déjà. - - si besoin, on cree la colone 0 de la matrice de count ou de presence pour le genome courant - - on parcourt les partitions et les index à fusionner en parallèle - - pour chaque partition, on ajoute les kmer présents dans les index à fusionner au nouvel index - - si le kmer est déjà présent dans le nouvel index on ajoute le compte ou la presence du kmer dans la matrice de count ou de presence - - sinon, on ajoute le kmer dans une nouvelle layer - - filter : produit un nouvel index filtré à partir d'un index existant en verifiant que les kmer présents dans le nouvel index respectent les critères de filtrage spécifiés - quorum de presence en fraction-(min/max) du nombre de génomes, en nombre-(min/max) de génomes, si mode count la présence peut être défini par un seuil personnalisé minimum et maximum diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index 305d344..c31e3e5 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -2,7 +2,9 @@ use std::collections::HashMap; use std::fs; use std::io; use std::path::Path; +use std::time::Duration; +use indicatif::{ProgressBar, ProgressStyle}; use obisys::{Reporter, Stage}; use rayon::prelude::*; use tracing::info; @@ -31,6 +33,7 @@ impl KmerIndex { mode: MergeMode, force: bool, rename_duplicates: bool, + rep: &mut Reporter, ) -> OKIResult { let output = output.as_ref(); @@ -74,7 +77,14 @@ impl KmerIndex { } // ── Bootstrap: copy first source to output ──────────────────────────── - info!("copying {} → {}", sources[0].root_path.display(), output.display()); + info!( + "bootstrap: copying {} → {} ({} genome(s))", + sources[0].root_path.display(), + output.display(), + sources[0].meta.genomes.len(), + ); + let t = Stage::start("bootstrap"); + let pb = spinner("bootstrap — copying index …"); copy_dir_all(&sources[0].root_path, output)?; // Rewrite index.meta with final genome labels and the effective mode. @@ -88,9 +98,14 @@ impl KmerIndex { if mode == MergeMode::Presence { remove_dirs_named(output, "counts")?; } + pb.finish_and_clear(); + rep.push(t.stop()); // Rebuild spectrums/ from all sources using the (possibly renamed) labels. // Drop the spectrums/ that were copied from source_0 and rebuild from scratch. + info!("rebuilding spectrums for {} source(s)", sources.len()); + let t = Stage::start("spectrums"); + let pb = spinner("spectrums — copying …"); let spectrums_dir = output.join("spectrums"); if spectrums_dir.exists() { fs::remove_dir_all(&spectrums_dir)?; @@ -98,6 +113,8 @@ impl KmerIndex { for (src, new_labels) in sources.iter().zip(&source_labels) { copy_spectrums(&src.root_path, output, &src.meta.genomes, new_labels)?; } + pb.finish_and_clear(); + rep.push(t.stop()); // Open the destination index. let dst = KmerIndex::open(output)?; @@ -107,8 +124,13 @@ impl KmerIndex { // ── Merge each subsequent source partition-by-partition ─────────────── let remaining_sources: Vec<&KmerIndex> = sources[1..].to_vec(); if !remaining_sources.is_empty() { - let mut rep = Reporter::new(); + let n_src_genomes: usize = remaining_sources.iter().map(|s| s.meta.genomes.len()).sum(); + info!( + "merging {} partition(s) × {} additional source genome(s) into {} destination genome(s)", + n_partitions, n_src_genomes, n_dst_genomes, + ); let t = Stage::start("merge_partitions"); + let pb = partition_bar(n_partitions as u64); let dst_partition = &dst.partition; @@ -117,10 +139,13 @@ impl KmerIndex { .filter_map(|i| { let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> = remaining_sources.iter().map(|s| (&s.partition, s.meta.genomes.len())).collect(); - dst_partition.merge_partition(i, &srcs, mode, n_dst_genomes).err() + let result = dst_partition.merge_partition(i, &srcs, mode, n_dst_genomes).err(); + pb.inc(1); + result }) .collect(); + pb.finish_and_clear(); if let Some(e) = errors.into_iter().next() { return Err(OKIError::Partition(e)); } @@ -206,6 +231,31 @@ fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> { Ok(()) } +fn spinner(msg: &'static str) -> ProgressBar { + let pb = ProgressBar::new_spinner(); + pb.set_style( + ProgressStyle::with_template("{spinner} {msg} {elapsed}") + .unwrap() + .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]), + ); + pb.set_message(msg); + pb.enable_steady_tick(Duration::from_millis(100)); + pb +} + +fn partition_bar(n: u64) -> ProgressBar { + let pb = ProgressBar::new(n); + pb.set_style( + ProgressStyle::with_template( + "{spinner} merge — {bar:40.cyan/blue} {pos}/{len} partitions {elapsed}", + ) + .unwrap() + .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]), + ); + pb.enable_steady_tick(Duration::from_millis(100)); + pb +} + fn copy_dir_all(src: &Path, dst: &Path) -> io::Result<()> { fs::create_dir_all(dst)?; for entry in fs::read_dir(src)? { diff --git a/src/obikmer/src/cmd/merge.rs b/src/obikmer/src/cmd/merge.rs index 0b18ac8..3442273 100644 --- a/src/obikmer/src/cmd/merge.rs +++ b/src/obikmer/src/cmd/merge.rs @@ -55,9 +55,15 @@ pub fn run(args: MergeArgs) { set_k(sources[0].kmer_size()); set_m(sources[0].minimizer_size()); - info!("merging {} indexes → {}", sources.len(), args.output.display()); - let rep = Reporter::new(); - KmerIndex::merge(&args.output, &source_refs, mode, args.force, args.rename_duplicates).unwrap_or_else(|e| { + + let n_genomes: usize = sources.iter().map(|s| s.meta().genomes.len()).sum(); + info!( + "merging {} index(es), {} genome(s) total → {}", + sources.len(), n_genomes, args.output.display() + ); + + let mut rep = Reporter::new(); + KmerIndex::merge(&args.output, &source_refs, mode, args.force, args.rename_duplicates, &mut rep).unwrap_or_else(|e| { eprintln!("error merging: {e}"); std::process::exit(1); }); diff --git a/src/obilayeredmap/src/mphf_layer.rs b/src/obilayeredmap/src/mphf_layer.rs index 5fcb677..0844237 100644 --- a/src/obilayeredmap/src/mphf_layer.rs +++ b/src/obilayeredmap/src/mphf_layer.rs @@ -41,6 +41,8 @@ impl MphfLayer { #[inline] pub fn find(&self, kmer: CanonicalKmer) -> Option { let slot = self.mphf.index(&kmer.raw()); + // PtrHash guarantees slot < n only for its key set; arbitrary queries may exceed bounds. + if slot >= self.n { return None; } let (chunk_id, rank) = self.evidence.decode(slot); if self.unitigs.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) { Some(slot)