c694e1f2b0
Introduces a Make-based orchestration for simulating, indexing, merging, filtering, and verifying k-mer counts and presence. Exposes internal builder and iterator APIs publicly, enforces mandatory leading slashes for predicate patterns, registers the `obitaxonomy` crate, and updates tooling configurations alongside documentation.
457 lines
16 KiB
Rust
457 lines
16 KiB
Rust
use std::collections::HashMap;
|
||
use std::fs;
|
||
use std::io;
|
||
use std::path::Path;
|
||
|
||
use obisys::{Reporter, Stage, progress_bar, spinner};
|
||
use tracing::{debug, info};
|
||
|
||
use obilayeredmap::IndexMode;
|
||
|
||
use crate::error::{OKIError, OKIResult};
|
||
use crate::index::KmerIndex;
|
||
use crate::meta::{GenomeInfo, IndexMeta};
|
||
use crate::state::{IndexState, SENTINEL_INDEXED};
|
||
|
||
pub use obikpartitionner::MergeMode;
|
||
|
||
// ── per-partition diagnostic record ──────────────────────────────────────────
|
||
|
||
#[derive(Debug)]
|
||
struct PartStat {
|
||
id: usize,
|
||
unitig_bytes: u64,
|
||
g_len: usize,
|
||
}
|
||
|
||
// ── main merge entry point ────────────────────────────────────────────────────
|
||
|
||
impl KmerIndex {
|
||
pub fn merge<P: AsRef<Path>>(
|
||
output: P,
|
||
sources: &[&KmerIndex],
|
||
mode: MergeMode,
|
||
force: bool,
|
||
rename_duplicates: bool,
|
||
budget_fraction: f64,
|
||
rep: &mut Reporter,
|
||
) -> OKIResult<Self> {
|
||
let output = output.as_ref();
|
||
|
||
if sources.is_empty() {
|
||
return Err(OKIError::Io(io::Error::new(
|
||
io::ErrorKind::InvalidInput,
|
||
"merge requires at least one source index",
|
||
)));
|
||
}
|
||
|
||
// ── Validate config compatibility ─────────────────────────────────────
|
||
let ref0 = sources[0];
|
||
for src in sources {
|
||
if src.state() != IndexState::Indexed {
|
||
return Err(OKIError::NotIndexed(src.root_path.clone()));
|
||
}
|
||
if src.kmer_size() != ref0.kmer_size()
|
||
|| src.minimizer_size() != ref0.minimizer_size()
|
||
|| src.n_partitions() != ref0.n_partitions()
|
||
{
|
||
return Err(OKIError::IncompatibleConfig);
|
||
}
|
||
if mode == MergeMode::Count && !src.meta.config.with_counts {
|
||
return Err(OKIError::MismatchedMode);
|
||
}
|
||
}
|
||
|
||
// ── Log source characteristics and choose base ────────────────────────
|
||
let mode_str = if mode == MergeMode::Presence {
|
||
"presence"
|
||
} else {
|
||
"count"
|
||
};
|
||
info!(
|
||
"merge: {} source(s), smer-size={}, mode={}",
|
||
sources.len(),
|
||
sources[0].kmer_size(),
|
||
mode_str,
|
||
);
|
||
for (i, src) in sources.iter().enumerate() {
|
||
let genome_str = if src.meta.genomes.len() == 1 {
|
||
"mono-genome".to_string()
|
||
} else {
|
||
format!("{} genomes", src.meta.genomes.len())
|
||
};
|
||
let trivial_str = if is_trivial(src, mode) {
|
||
" [trivial: no data approximation]"
|
||
} else {
|
||
""
|
||
};
|
||
info!(
|
||
" [{}] {} — {}, {}, {}{}",
|
||
i,
|
||
src.root_path.display(),
|
||
format_evidence(&src.meta.config.evidence),
|
||
genome_str,
|
||
mode_str,
|
||
trivial_str,
|
||
);
|
||
}
|
||
|
||
let base_idx = choose_base(sources, mode);
|
||
let needs_approx = sources.iter().any(|src| {
|
||
!is_trivial(src, mode)
|
||
&& matches!(
|
||
src.meta.config.evidence,
|
||
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
|
||
)
|
||
});
|
||
info!(
|
||
"output evidence: {} ({}base: [{}] {})",
|
||
format_evidence(&sources[base_idx].meta.config.evidence),
|
||
if needs_approx {
|
||
"forced approx — "
|
||
} else {
|
||
""
|
||
},
|
||
base_idx,
|
||
sources[base_idx].root_path.display(),
|
||
);
|
||
|
||
let mut ordered: Vec<&KmerIndex> = Vec::with_capacity(sources.len());
|
||
ordered.push(sources[base_idx]);
|
||
for (i, &src) in sources.iter().enumerate() {
|
||
if i != base_idx {
|
||
ordered.push(src);
|
||
}
|
||
}
|
||
let sources: &[&KmerIndex] = &ordered;
|
||
let evidence = sources[0].meta.config.evidence.clone();
|
||
|
||
// ── Compute final genome labels ────────────────────────────────────────
|
||
let (source_labels, all_genomes) = compute_labels(sources, rename_duplicates)?;
|
||
|
||
// ── Prepare output directory ──────────────────────────────────────────
|
||
if output.exists() {
|
||
if force {
|
||
fs::remove_dir_all(output)?;
|
||
} else {
|
||
return Err(OKIError::Io(io::Error::new(
|
||
io::ErrorKind::AlreadyExists,
|
||
format!("{}: output directory already exists", output.display()),
|
||
)));
|
||
}
|
||
}
|
||
|
||
// ── Bootstrap: copy first source to output ────────────────────────────
|
||
info!(
|
||
"bootstrap: copying {} → {} ({} genome(s))",
|
||
sources[0].root_path.display(),
|
||
output.display(),
|
||
sources[0].meta.genomes.len(),
|
||
);
|
||
let t = Stage::start("bootstrap");
|
||
let pb = spinner("bootstrap");
|
||
pb.set_message("copying index …");
|
||
copy_dir_all(&sources[0].root_path, output)?;
|
||
|
||
let mut meta = IndexMeta::read(output).map_err(OKIError::Io)?;
|
||
meta.genomes = all_genomes;
|
||
meta.config.with_counts = mode == MergeMode::Count;
|
||
meta.config.evidence = evidence.clone();
|
||
meta.write(output)?;
|
||
|
||
if mode == MergeMode::Presence {
|
||
remove_dirs_named(output, "counts")?;
|
||
}
|
||
pb.finish_and_clear();
|
||
rep.push(t.stop());
|
||
|
||
// ── Rebuild spectrums ─────────────────────────────────────────────────
|
||
info!("rebuilding spectrums for {} source(s)", sources.len());
|
||
let t = Stage::start("spectrums");
|
||
let pb = spinner("spectrums");
|
||
pb.set_message("copying …");
|
||
let spectrums_dir = output.join("spectrums");
|
||
if spectrums_dir.exists() {
|
||
fs::remove_dir_all(&spectrums_dir)?;
|
||
}
|
||
for (src, new_labels) in sources.iter().zip(&source_labels) {
|
||
let old_labels: Vec<String> =
|
||
src.meta.genomes.iter().map(|g| g.label.clone()).collect();
|
||
copy_spectrums(&src.root_path, output, &old_labels, new_labels)?;
|
||
}
|
||
pb.finish_and_clear();
|
||
rep.push(t.stop());
|
||
|
||
// ── Open destination ──────────────────────────────────────────────────
|
||
let dst = KmerIndex::open(output)?;
|
||
let n_partitions = dst.n_partitions();
|
||
let n_dst_genomes = sources[0].meta.genomes.len();
|
||
|
||
// ── Merge partitions ──────────────────────────────────────────────────
|
||
let remaining_sources: Vec<&KmerIndex> = sources[1..].to_vec();
|
||
if !remaining_sources.is_empty() {
|
||
let n_src_genomes: usize = remaining_sources.iter().map(|s| s.meta.genomes.len()).sum();
|
||
info!(
|
||
"merging {} partition(s) × {} additional source genome(s) into {} destination genome(s)",
|
||
n_partitions, n_src_genomes, n_dst_genomes,
|
||
);
|
||
let t = Stage::start("merge_partitions");
|
||
let pb = progress_bar("merge", n_partitions as u64, "partitions");
|
||
|
||
let dst_partition = &dst.partition;
|
||
let block_bits = dst.meta.config.block_bits;
|
||
|
||
// Pre-build source list once (avoid rebuilding per partition)
|
||
let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> = remaining_sources
|
||
.iter()
|
||
.map(|s| (&s.partition, s.meta.genomes.len()))
|
||
.collect();
|
||
|
||
// Per-partition unitig byte sizes across remaining sources (stat() only)
|
||
let partition_sizes: Vec<u64> = (0..n_partitions)
|
||
.map(|i| {
|
||
remaining_sources
|
||
.iter()
|
||
.map(|s| partition_unitig_bytes(s, i))
|
||
.sum()
|
||
})
|
||
.collect();
|
||
|
||
// LFD sort: largest partition first
|
||
let mut order: Vec<usize> = (0..n_partitions).collect();
|
||
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
|
||
|
||
let _ = budget_fraction; // kept in signature for CLI compatibility
|
||
|
||
// Shadow as references so closures can capture them by copy.
|
||
let srcs = &srcs;
|
||
let evidence = &evidence;
|
||
|
||
let runner = crate::numa::PartitionRunner::new();
|
||
let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
|
||
|
||
runner.run(
|
||
&order,
|
||
|i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence),
|
||
|i, g_len, dur| {
|
||
pb.inc(1);
|
||
debug!(
|
||
"partition {i}: done in {:.1}s — {} new kmers",
|
||
dur.as_secs_f64(),
|
||
g_len,
|
||
);
|
||
part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len });
|
||
},
|
||
).map_err(OKIError::Partition)?;
|
||
|
||
pb.finish_and_clear();
|
||
|
||
// ── Diagnostic report ─────────────────────────────────────────────
|
||
print_merge_partition_report(&part_stats, runner.max_workers());
|
||
|
||
rep.push(t.stop());
|
||
}
|
||
|
||
// ── Pack matrices after merge ─────────────────────────────────────────
|
||
{
|
||
let t = Stage::start("pack");
|
||
let pb = spinner("pack");
|
||
pb.set_message("consolidating column files …");
|
||
let dst2 = KmerIndex::open(output)?;
|
||
dst2.pack_matrices()?;
|
||
pb.finish_and_clear();
|
||
rep.push(t.stop());
|
||
}
|
||
|
||
fs::File::create(output.join(SENTINEL_INDEXED)).map_err(OKIError::Io)?;
|
||
|
||
KmerIndex::open(output)
|
||
}
|
||
}
|
||
|
||
// ── Diagnostic report ─────────────────────────────────────────────────────────
|
||
|
||
fn print_merge_partition_report(stats: &[PartStat], max_workers: usize) {
|
||
let total_new: usize = stats.iter().map(|s| s.g_len).sum();
|
||
let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count();
|
||
|
||
if non_empty == 0 {
|
||
info!("merge_partitions report: no data (all partitions empty)");
|
||
return;
|
||
}
|
||
|
||
info!("─── merge_partitions report ───");
|
||
info!(
|
||
" {} partition(s) processed, {} total new kmers",
|
||
non_empty, total_new,
|
||
);
|
||
info!(" max workers: {max_workers}");
|
||
|
||
// Top 8 partitions by new-kmer count
|
||
let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect();
|
||
by_new.sort_by_key(|s| std::cmp::Reverse(s.g_len));
|
||
if !by_new.is_empty() {
|
||
info!(" top partitions by new kmers:");
|
||
for s in by_new.iter().take(8) {
|
||
info!(
|
||
" partition {:4} : {}M new kmers ({} unitig bytes)",
|
||
s.id,
|
||
s.g_len / 1_000_000,
|
||
fmt_bytes(s.unitig_bytes),
|
||
);
|
||
}
|
||
}
|
||
info!("───────────────────────────────");
|
||
}
|
||
|
||
// ── helpers ───────────────────────────────────────────────────────────────────
|
||
|
||
fn fmt_bytes(b: u64) -> String {
|
||
if b >= 1 << 30 {
|
||
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
|
||
} else if b >= 1 << 20 {
|
||
format!("{:.1} MB", b as f64 / (1u64 << 20) as f64)
|
||
} else if b >= 1 << 10 {
|
||
format!("{:.1} KB", b as f64 / (1u64 << 10) as f64)
|
||
} else {
|
||
format!("{b} B")
|
||
}
|
||
}
|
||
|
||
/// Sum of all unitigs.bin sizes across all layers of partition `i` in `src`.
|
||
fn partition_unitig_bytes(src: &KmerIndex, i: usize) -> u64 {
|
||
let mut total = 0u64;
|
||
for l in 0.. {
|
||
let p = src.layer_unitigs_path(i, l);
|
||
if !p.exists() {
|
||
break;
|
||
}
|
||
if let Ok(m) = std::fs::metadata(&p) {
|
||
total += m.len();
|
||
}
|
||
}
|
||
total
|
||
}
|
||
|
||
fn compute_labels(
|
||
sources: &[&KmerIndex],
|
||
rename_duplicates: bool,
|
||
) -> OKIResult<(Vec<Vec<String>>, Vec<GenomeInfo>)> {
|
||
let mut seen: HashMap<String, usize> = HashMap::new();
|
||
let mut source_labels: Vec<Vec<String>> = Vec::with_capacity(sources.len());
|
||
let mut all_genomes: Vec<GenomeInfo> = Vec::new();
|
||
|
||
for src in sources {
|
||
let mut labels = Vec::with_capacity(src.meta.genomes.len());
|
||
for genome in &src.meta.genomes {
|
||
let label = &genome.label;
|
||
let count = seen.entry(label.clone()).or_insert(0);
|
||
let new_label = if *count == 0 {
|
||
label.clone()
|
||
} else if rename_duplicates {
|
||
format!("{label}.{count}")
|
||
} else {
|
||
return Err(OKIError::DuplicateGenomeLabel(label.clone()));
|
||
};
|
||
*count += 1;
|
||
labels.push(new_label.clone());
|
||
all_genomes.push(GenomeInfo {
|
||
label: new_label,
|
||
meta: genome.meta.clone(),
|
||
});
|
||
}
|
||
source_labels.push(labels);
|
||
}
|
||
|
||
Ok((source_labels, all_genomes))
|
||
}
|
||
|
||
fn copy_spectrums(
|
||
src_root: &Path,
|
||
dst_root: &Path,
|
||
old_labels: &[String],
|
||
new_labels: &[String],
|
||
) -> io::Result<()> {
|
||
let src_dir = src_root.join("spectrums");
|
||
let dst_dir = dst_root.join("spectrums");
|
||
fs::create_dir_all(&dst_dir)?;
|
||
for (old, new) in old_labels.iter().zip(new_labels.iter()) {
|
||
let src_file = src_dir.join(format!("{old}.json"));
|
||
if src_file.exists() {
|
||
fs::copy(&src_file, dst_dir.join(format!("{new}.json")))?;
|
||
}
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> {
|
||
for entry in fs::read_dir(root)? {
|
||
let entry = entry?;
|
||
let path = entry.path();
|
||
if path.is_dir() {
|
||
if path.file_name().and_then(|n| n.to_str()) == Some(name) {
|
||
fs::remove_dir_all(&path)?;
|
||
} else {
|
||
remove_dirs_named(&path, name)?;
|
||
}
|
||
}
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
fn format_evidence(ev: &IndexMode) -> String {
|
||
match ev {
|
||
IndexMode::Exact => "exact".to_string(),
|
||
IndexMode::Approx { b, z } => format!("approx (b={b}, z={z})"),
|
||
IndexMode::Hybrid { b, z } => format!("hybrid (b={b}, z={z})"),
|
||
}
|
||
}
|
||
|
||
fn is_trivial(src: &KmerIndex, mode: MergeMode) -> bool {
|
||
src.meta.genomes.len() == 1 && mode == MergeMode::Presence
|
||
}
|
||
|
||
fn index_unitig_size(src: &KmerIndex) -> u64 {
|
||
let n = src.partition.n_partitions();
|
||
(0..n).map(|i| partition_unitig_bytes(src, i)).sum()
|
||
}
|
||
|
||
fn choose_base(sources: &[&KmerIndex], mode: MergeMode) -> usize {
|
||
let needs_approx = sources.iter().any(|src| {
|
||
!is_trivial(src, mode)
|
||
&& matches!(
|
||
src.meta.config.evidence,
|
||
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
|
||
)
|
||
});
|
||
|
||
sources
|
||
.iter()
|
||
.enumerate()
|
||
.filter(|(_, src)| {
|
||
!needs_approx
|
||
|| matches!(
|
||
src.meta.config.evidence,
|
||
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
|
||
)
|
||
})
|
||
.max_by_key(|(_, src)| index_unitig_size(src))
|
||
.map(|(i, _)| i)
|
||
.unwrap()
|
||
}
|
||
|
||
fn copy_dir_all(src: &Path, dst: &Path) -> io::Result<()> {
|
||
fs::create_dir_all(dst)?;
|
||
for entry in fs::read_dir(src)? {
|
||
let entry = entry?;
|
||
let src_path = entry.path();
|
||
let dst_path = dst.join(entry.file_name());
|
||
if src_path.is_dir() {
|
||
copy_dir_all(&src_path, &dst_path)?;
|
||
} else {
|
||
fs::copy(&src_path, &dst_path)?;
|
||
}
|
||
}
|
||
Ok(())
|
||
}
|