feat: implement partition-based merge command for k-mer indices
Implements a new `merge` command that aggregates k-mer counts and presence/absence matrices from multiple source indices using a parallelized, partition-based algorithm. Adds CLI progress bars and execution timing across the bootstrap, spectrum rebuild, and merge phases. Updates logging to report the aggregate genome count and introduces a bounds check in the perfect hash layer to safely return `None` for unknown k-mers, preventing out-of-bounds access in downstream operations.
This commit is contained in:
@@ -2,7 +2,9 @@ use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use indicatif::{ProgressBar, ProgressStyle};
|
||||
use obisys::{Reporter, Stage};
|
||||
use rayon::prelude::*;
|
||||
use tracing::info;
|
||||
@@ -31,6 +33,7 @@ impl KmerIndex {
|
||||
mode: MergeMode,
|
||||
force: bool,
|
||||
rename_duplicates: bool,
|
||||
rep: &mut Reporter,
|
||||
) -> OKIResult<Self> {
|
||||
let output = output.as_ref();
|
||||
|
||||
@@ -74,7 +77,14 @@ impl KmerIndex {
|
||||
}
|
||||
|
||||
// ── Bootstrap: copy first source to output ────────────────────────────
|
||||
info!("copying {} → {}", sources[0].root_path.display(), output.display());
|
||||
info!(
|
||||
"bootstrap: copying {} → {} ({} genome(s))",
|
||||
sources[0].root_path.display(),
|
||||
output.display(),
|
||||
sources[0].meta.genomes.len(),
|
||||
);
|
||||
let t = Stage::start("bootstrap");
|
||||
let pb = spinner("bootstrap — copying index …");
|
||||
copy_dir_all(&sources[0].root_path, output)?;
|
||||
|
||||
// Rewrite index.meta with final genome labels and the effective mode.
|
||||
@@ -88,9 +98,14 @@ impl KmerIndex {
|
||||
if mode == MergeMode::Presence {
|
||||
remove_dirs_named(output, "counts")?;
|
||||
}
|
||||
pb.finish_and_clear();
|
||||
rep.push(t.stop());
|
||||
|
||||
// Rebuild spectrums/ from all sources using the (possibly renamed) labels.
|
||||
// Drop the spectrums/ that were copied from source_0 and rebuild from scratch.
|
||||
info!("rebuilding spectrums for {} source(s)", sources.len());
|
||||
let t = Stage::start("spectrums");
|
||||
let pb = spinner("spectrums — copying …");
|
||||
let spectrums_dir = output.join("spectrums");
|
||||
if spectrums_dir.exists() {
|
||||
fs::remove_dir_all(&spectrums_dir)?;
|
||||
@@ -98,6 +113,8 @@ impl KmerIndex {
|
||||
for (src, new_labels) in sources.iter().zip(&source_labels) {
|
||||
copy_spectrums(&src.root_path, output, &src.meta.genomes, new_labels)?;
|
||||
}
|
||||
pb.finish_and_clear();
|
||||
rep.push(t.stop());
|
||||
|
||||
// Open the destination index.
|
||||
let dst = KmerIndex::open(output)?;
|
||||
@@ -107,8 +124,13 @@ impl KmerIndex {
|
||||
// ── Merge each subsequent source partition-by-partition ───────────────
|
||||
let remaining_sources: Vec<&KmerIndex> = sources[1..].to_vec();
|
||||
if !remaining_sources.is_empty() {
|
||||
let mut rep = Reporter::new();
|
||||
let n_src_genomes: usize = remaining_sources.iter().map(|s| s.meta.genomes.len()).sum();
|
||||
info!(
|
||||
"merging {} partition(s) × {} additional source genome(s) into {} destination genome(s)",
|
||||
n_partitions, n_src_genomes, n_dst_genomes,
|
||||
);
|
||||
let t = Stage::start("merge_partitions");
|
||||
let pb = partition_bar(n_partitions as u64);
|
||||
|
||||
let dst_partition = &dst.partition;
|
||||
|
||||
@@ -117,10 +139,13 @@ impl KmerIndex {
|
||||
.filter_map(|i| {
|
||||
let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> =
|
||||
remaining_sources.iter().map(|s| (&s.partition, s.meta.genomes.len())).collect();
|
||||
dst_partition.merge_partition(i, &srcs, mode, n_dst_genomes).err()
|
||||
let result = dst_partition.merge_partition(i, &srcs, mode, n_dst_genomes).err();
|
||||
pb.inc(1);
|
||||
result
|
||||
})
|
||||
.collect();
|
||||
|
||||
pb.finish_and_clear();
|
||||
if let Some(e) = errors.into_iter().next() {
|
||||
return Err(OKIError::Partition(e));
|
||||
}
|
||||
@@ -206,6 +231,31 @@ fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn spinner(msg: &'static str) -> ProgressBar {
|
||||
let pb = ProgressBar::new_spinner();
|
||||
pb.set_style(
|
||||
ProgressStyle::with_template("{spinner} {msg} {elapsed}")
|
||||
.unwrap()
|
||||
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
|
||||
);
|
||||
pb.set_message(msg);
|
||||
pb.enable_steady_tick(Duration::from_millis(100));
|
||||
pb
|
||||
}
|
||||
|
||||
fn partition_bar(n: u64) -> ProgressBar {
|
||||
let pb = ProgressBar::new(n);
|
||||
pb.set_style(
|
||||
ProgressStyle::with_template(
|
||||
"{spinner} merge — {bar:40.cyan/blue} {pos}/{len} partitions {elapsed}",
|
||||
)
|
||||
.unwrap()
|
||||
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
|
||||
);
|
||||
pb.enable_steady_tick(Duration::from_millis(100));
|
||||
pb
|
||||
}
|
||||
|
||||
fn copy_dir_all(src: &Path, dst: &Path) -> io::Result<()> {
|
||||
fs::create_dir_all(dst)?;
|
||||
for entry in fs::read_dir(src)? {
|
||||
|
||||
Reference in New Issue
Block a user