6d85387077
This change enhances observability and adaptability in the merge pipeline. Performance timing and debug logging are added to the De Bruijn graph and partition merge layers to track phase durations and pipeline metrics. The merge module replaces blocking receives with timed polls to sample CPU efficiency, dynamically spawning workers when utilization drops below a threshold. A new script is also introduced to parse merge debug logs and generate structured Markdown reports detailing throughput, phase breakdowns, and partition performance.
580 lines
22 KiB
Rust
580 lines
22 KiB
Rust
use std::collections::HashMap;
|
||
use std::fs;
|
||
use std::io;
|
||
use std::path::Path;
|
||
use std::time::{Duration, Instant};
|
||
|
||
use crossbeam_channel::unbounded;
|
||
use obisys::{CpuSample, Reporter, Stage, progress_bar, spinner};
|
||
use tracing::{debug, info};
|
||
|
||
use obilayeredmap::IndexMode;
|
||
|
||
use crate::error::{OKIError, OKIResult};
|
||
use crate::index::KmerIndex;
|
||
use crate::meta::{GenomeInfo, IndexMeta};
|
||
use crate::state::IndexState;
|
||
|
||
pub use obikpartitionner::MergeMode;
|
||
|
||
// ── per-partition diagnostic record ──────────────────────────────────────────
|
||
|
||
#[derive(Debug)]
|
||
struct PartStat {
|
||
id: usize,
|
||
unitig_bytes: u64,
|
||
g_len: usize,
|
||
}
|
||
|
||
// ── main merge entry point ────────────────────────────────────────────────────
|
||
|
||
impl KmerIndex {
|
||
pub fn merge<P: AsRef<Path>>(
|
||
output: P,
|
||
sources: &[&KmerIndex],
|
||
mode: MergeMode,
|
||
force: bool,
|
||
rename_duplicates: bool,
|
||
budget_fraction: f64,
|
||
rep: &mut Reporter,
|
||
) -> OKIResult<Self> {
|
||
let output = output.as_ref();
|
||
|
||
if sources.is_empty() {
|
||
return Err(OKIError::Io(io::Error::new(
|
||
io::ErrorKind::InvalidInput,
|
||
"merge requires at least one source index",
|
||
)));
|
||
}
|
||
|
||
// ── Validate config compatibility ─────────────────────────────────────
|
||
let ref0 = sources[0];
|
||
for src in sources {
|
||
if src.state() != IndexState::Indexed {
|
||
return Err(OKIError::NotIndexed(src.root_path.clone()));
|
||
}
|
||
if src.kmer_size() != ref0.kmer_size()
|
||
|| src.minimizer_size() != ref0.minimizer_size()
|
||
|| src.n_partitions() != ref0.n_partitions()
|
||
{
|
||
return Err(OKIError::IncompatibleConfig);
|
||
}
|
||
if mode == MergeMode::Count && !src.meta.config.with_counts {
|
||
return Err(OKIError::MismatchedMode);
|
||
}
|
||
}
|
||
|
||
// ── Log source characteristics and choose base ────────────────────────
|
||
let mode_str = if mode == MergeMode::Presence {
|
||
"presence"
|
||
} else {
|
||
"count"
|
||
};
|
||
info!(
|
||
"merge: {} source(s), smer-size={}, mode={}",
|
||
sources.len(),
|
||
sources[0].kmer_size(),
|
||
mode_str,
|
||
);
|
||
for (i, src) in sources.iter().enumerate() {
|
||
let genome_str = if src.meta.genomes.len() == 1 {
|
||
"mono-genome".to_string()
|
||
} else {
|
||
format!("{} genomes", src.meta.genomes.len())
|
||
};
|
||
let trivial_str = if is_trivial(src, mode) {
|
||
" [trivial: no data approximation]"
|
||
} else {
|
||
""
|
||
};
|
||
info!(
|
||
" [{}] {} — {}, {}, {}{}",
|
||
i,
|
||
src.root_path.display(),
|
||
format_evidence(&src.meta.config.evidence),
|
||
genome_str,
|
||
mode_str,
|
||
trivial_str,
|
||
);
|
||
}
|
||
|
||
let base_idx = choose_base(sources, mode);
|
||
let needs_approx = sources.iter().any(|src| {
|
||
!is_trivial(src, mode)
|
||
&& matches!(
|
||
src.meta.config.evidence,
|
||
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
|
||
)
|
||
});
|
||
info!(
|
||
"output evidence: {} ({}base: [{}] {})",
|
||
format_evidence(&sources[base_idx].meta.config.evidence),
|
||
if needs_approx {
|
||
"forced approx — "
|
||
} else {
|
||
""
|
||
},
|
||
base_idx,
|
||
sources[base_idx].root_path.display(),
|
||
);
|
||
|
||
let mut ordered: Vec<&KmerIndex> = Vec::with_capacity(sources.len());
|
||
ordered.push(sources[base_idx]);
|
||
for (i, &src) in sources.iter().enumerate() {
|
||
if i != base_idx {
|
||
ordered.push(src);
|
||
}
|
||
}
|
||
let sources: &[&KmerIndex] = &ordered;
|
||
let evidence = sources[0].meta.config.evidence.clone();
|
||
|
||
// ── Compute final genome labels ────────────────────────────────────────
|
||
let (source_labels, all_genomes) = compute_labels(sources, rename_duplicates)?;
|
||
|
||
// ── Prepare output directory ──────────────────────────────────────────
|
||
if output.exists() {
|
||
if force {
|
||
fs::remove_dir_all(output)?;
|
||
} else {
|
||
return Err(OKIError::Io(io::Error::new(
|
||
io::ErrorKind::AlreadyExists,
|
||
format!("{}: output directory already exists", output.display()),
|
||
)));
|
||
}
|
||
}
|
||
|
||
// ── Bootstrap: copy first source to output ────────────────────────────
|
||
info!(
|
||
"bootstrap: copying {} → {} ({} genome(s))",
|
||
sources[0].root_path.display(),
|
||
output.display(),
|
||
sources[0].meta.genomes.len(),
|
||
);
|
||
let t = Stage::start("bootstrap");
|
||
let pb = spinner("bootstrap");
|
||
pb.set_message("copying index …");
|
||
copy_dir_all(&sources[0].root_path, output)?;
|
||
|
||
let mut meta = IndexMeta::read(output).map_err(OKIError::Io)?;
|
||
meta.genomes = all_genomes;
|
||
meta.config.with_counts = mode == MergeMode::Count;
|
||
meta.config.evidence = evidence.clone();
|
||
meta.write(output)?;
|
||
|
||
if mode == MergeMode::Presence {
|
||
remove_dirs_named(output, "counts")?;
|
||
}
|
||
pb.finish_and_clear();
|
||
rep.push(t.stop());
|
||
|
||
// ── Rebuild spectrums ─────────────────────────────────────────────────
|
||
info!("rebuilding spectrums for {} source(s)", sources.len());
|
||
let t = Stage::start("spectrums");
|
||
let pb = spinner("spectrums");
|
||
pb.set_message("copying …");
|
||
let spectrums_dir = output.join("spectrums");
|
||
if spectrums_dir.exists() {
|
||
fs::remove_dir_all(&spectrums_dir)?;
|
||
}
|
||
for (src, new_labels) in sources.iter().zip(&source_labels) {
|
||
let old_labels: Vec<String> =
|
||
src.meta.genomes.iter().map(|g| g.label.clone()).collect();
|
||
copy_spectrums(&src.root_path, output, &old_labels, new_labels)?;
|
||
}
|
||
pb.finish_and_clear();
|
||
rep.push(t.stop());
|
||
|
||
// ── Open destination ──────────────────────────────────────────────────
|
||
let dst = KmerIndex::open(output)?;
|
||
let n_partitions = dst.n_partitions();
|
||
let n_dst_genomes = sources[0].meta.genomes.len();
|
||
|
||
// ── Merge partitions ──────────────────────────────────────────────────
|
||
let remaining_sources: Vec<&KmerIndex> = sources[1..].to_vec();
|
||
if !remaining_sources.is_empty() {
|
||
let n_src_genomes: usize = remaining_sources.iter().map(|s| s.meta.genomes.len()).sum();
|
||
info!(
|
||
"merging {} partition(s) × {} additional source genome(s) into {} destination genome(s)",
|
||
n_partitions, n_src_genomes, n_dst_genomes,
|
||
);
|
||
let t = Stage::start("merge_partitions");
|
||
let pb = progress_bar("merge", n_partitions as u64, "partitions");
|
||
|
||
let dst_partition = &dst.partition;
|
||
let block_bits = dst.meta.config.block_bits;
|
||
|
||
// Pre-build source list once (avoid rebuilding per partition)
|
||
let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> = remaining_sources
|
||
.iter()
|
||
.map(|s| (&s.partition, s.meta.genomes.len()))
|
||
.collect();
|
||
|
||
// Per-partition unitig byte sizes across remaining sources (stat() only)
|
||
let partition_sizes: Vec<u64> = (0..n_partitions)
|
||
.map(|i| {
|
||
remaining_sources
|
||
.iter()
|
||
.map(|s| partition_unitig_bytes(s, i))
|
||
.sum()
|
||
})
|
||
.collect();
|
||
|
||
// LFD sort: largest partition first
|
||
let mut order: Vec<usize> = (0..n_partitions).collect();
|
||
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
|
||
|
||
// ── Adaptive worker pool ──────────────────────────────────────────
|
||
// Start with 1 worker thread. After each completed partition,
|
||
// measure CPU efficiency (via getrusage delta). If efficiency is
|
||
// below the spawn threshold and more partitions remain, spawn one
|
||
// additional worker. Workers share a crossbeam channel of partition
|
||
// IDs; each reports (id, g_len, duration) on a result channel.
|
||
const SPAWN_THRESHOLD: f64 = 0.95; // spawn when >5% capacity idle
|
||
let n_cores = std::thread::available_parallelism()
|
||
.map(|n| n.get())
|
||
.unwrap_or(1);
|
||
let max_workers = (n_cores / 2).max(1);
|
||
let _ = budget_fraction; // kept in signature for CLI compatibility
|
||
|
||
let (part_tx, part_rx) = unbounded::<usize>();
|
||
let (result_tx, result_rx) =
|
||
unbounded::<(usize, Result<usize, obiskio::SKError>, Duration)>();
|
||
// activate_tx: controller sends () to wake the next dormant worker.
|
||
// Dropping activate_tx closes the channel; dormant workers exit.
|
||
let (activate_tx, activate_rx) = unbounded::<()>();
|
||
|
||
for &i in &order {
|
||
part_tx.send(i).ok();
|
||
}
|
||
drop(part_tx);
|
||
|
||
let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
|
||
let mut n_workers = 0usize;
|
||
let mut cpu_sample = CpuSample::now();
|
||
// Efficiency measured just before each spawn, used to assess
|
||
// whether the previous worker delivered its expected marginal gain.
|
||
let mut efficiency_at_last_spawn = 0.0f64;
|
||
|
||
// Shadow as references so closures can capture them by copy.
|
||
let srcs = &srcs;
|
||
let evidence = &evidence;
|
||
|
||
std::thread::scope(|s| -> OKIResult<()> {
|
||
// Pre-spawn max_workers threads; each waits for an activation
|
||
// signal before consuming from part_rx.
|
||
for _ in 0..max_workers {
|
||
let prx = part_rx.clone();
|
||
let rtx = result_tx.clone();
|
||
let arx = activate_rx.clone();
|
||
s.spawn(move || {
|
||
if arx.recv().is_ok() {
|
||
for i in &prx {
|
||
let t = Instant::now();
|
||
let r = dst_partition.merge_partition(
|
||
i,
|
||
srcs,
|
||
mode,
|
||
n_dst_genomes,
|
||
block_bits,
|
||
evidence,
|
||
);
|
||
rtx.send((i, r, t.elapsed())).ok();
|
||
}
|
||
}
|
||
});
|
||
}
|
||
drop(result_tx);
|
||
|
||
// Activate first worker immediately.
|
||
activate_tx.send(()).ok();
|
||
n_workers = 1;
|
||
|
||
const SPAWN_POLL: Duration = Duration::from_secs(10);
|
||
|
||
let mut completed = 0usize;
|
||
while completed < n_partitions {
|
||
let result = result_rx.recv_timeout(SPAWN_POLL);
|
||
|
||
// On timeout: no partition finished yet, just check efficiency.
|
||
let (i, r, dur) = match result {
|
||
Ok(v) => v,
|
||
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
|
||
if n_workers < max_workers {
|
||
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||
if eff < SPAWN_THRESHOLD {
|
||
debug!(
|
||
"activated worker {} (poll) — efficiency {:.0}%",
|
||
n_workers + 1,
|
||
eff * 100.0,
|
||
);
|
||
efficiency_at_last_spawn = eff;
|
||
activate_tx.send(()).ok();
|
||
n_workers += 1;
|
||
cpu_sample = CpuSample::now();
|
||
}
|
||
}
|
||
continue;
|
||
}
|
||
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
|
||
return Err(OKIError::Io(io::Error::new(
|
||
io::ErrorKind::UnexpectedEof,
|
||
"worker channel closed",
|
||
)));
|
||
}
|
||
};
|
||
let g_len = r.map_err(OKIError::Partition)?;
|
||
pb.inc(1);
|
||
debug!(
|
||
"partition {i}: done in {:.1}s — {} new kmers",
|
||
dur.as_secs_f64(),
|
||
g_len
|
||
);
|
||
part_stats.push(PartStat {
|
||
id: i,
|
||
unitig_bytes: partition_sizes[i],
|
||
g_len,
|
||
});
|
||
completed += 1;
|
||
|
||
if n_workers < max_workers && completed < n_partitions {
|
||
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||
// For the first spawn use SPAWN_THRESHOLD.
|
||
// For subsequent spawns: the previous worker should
|
||
// have raised efficiency by at least a quarter of the expected
|
||
// marginal gain (1/n_workers). If not, adding another
|
||
// worker won't help.
|
||
let should_spawn = if n_workers == 1 {
|
||
eff < SPAWN_THRESHOLD
|
||
} else {
|
||
let gain = eff - efficiency_at_last_spawn;
|
||
let expected = 1.0 / n_workers as f64;
|
||
gain >= expected * 0.25
|
||
};
|
||
if should_spawn {
|
||
debug!(
|
||
"activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%",
|
||
n_workers + 1,
|
||
eff * 100.0,
|
||
(eff - efficiency_at_last_spawn) * 100.0,
|
||
);
|
||
efficiency_at_last_spawn = eff;
|
||
activate_tx.send(()).ok();
|
||
n_workers += 1;
|
||
cpu_sample = CpuSample::now();
|
||
}
|
||
}
|
||
}
|
||
// Close activate_tx: dormant workers exit cleanly.
|
||
drop(activate_tx);
|
||
Ok(())
|
||
})?;
|
||
|
||
pb.finish_and_clear();
|
||
|
||
// ── Diagnostic report ─────────────────────────────────────────────
|
||
print_merge_partition_report(&part_stats, n_workers, max_workers);
|
||
|
||
rep.push(t.stop());
|
||
}
|
||
|
||
// ── Pack matrices after merge ─────────────────────────────────────────
|
||
{
|
||
let t = Stage::start("pack");
|
||
let pb = spinner("pack");
|
||
pb.set_message("consolidating column files …");
|
||
let dst2 = KmerIndex::open(output)?;
|
||
dst2.pack_matrices()?;
|
||
pb.finish_and_clear();
|
||
rep.push(t.stop());
|
||
}
|
||
|
||
KmerIndex::open(output)
|
||
}
|
||
}
|
||
|
||
// ── Diagnostic report ─────────────────────────────────────────────────────────
|
||
|
||
fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_workers: usize) {
|
||
let total_new: usize = stats.iter().map(|s| s.g_len).sum();
|
||
let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count();
|
||
|
||
if non_empty == 0 {
|
||
info!("merge_partitions report: no data (all partitions empty)");
|
||
return;
|
||
}
|
||
|
||
info!("─── merge_partitions report ───");
|
||
info!(
|
||
" {} partition(s) processed, {} total new kmers",
|
||
non_empty, total_new,
|
||
);
|
||
info!(" workers spawned: {n_workers} / {max_workers} (max)",);
|
||
|
||
// Top 8 partitions by new-kmer count
|
||
let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect();
|
||
by_new.sort_by_key(|s| std::cmp::Reverse(s.g_len));
|
||
if !by_new.is_empty() {
|
||
info!(" top partitions by new kmers:");
|
||
for s in by_new.iter().take(8) {
|
||
info!(
|
||
" partition {:4} : {}M new kmers ({} unitig bytes)",
|
||
s.id,
|
||
s.g_len / 1_000_000,
|
||
fmt_bytes(s.unitig_bytes),
|
||
);
|
||
}
|
||
}
|
||
info!("───────────────────────────────");
|
||
}
|
||
|
||
// ── helpers ───────────────────────────────────────────────────────────────────
|
||
|
||
fn fmt_bytes(b: u64) -> String {
|
||
if b >= 1 << 30 {
|
||
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
|
||
} else if b >= 1 << 20 {
|
||
format!("{:.1} MB", b as f64 / (1u64 << 20) as f64)
|
||
} else if b >= 1 << 10 {
|
||
format!("{:.1} KB", b as f64 / (1u64 << 10) as f64)
|
||
} else {
|
||
format!("{b} B")
|
||
}
|
||
}
|
||
|
||
/// Sum of all unitigs.bin sizes across all layers of partition `i` in `src`.
|
||
fn partition_unitig_bytes(src: &KmerIndex, i: usize) -> u64 {
|
||
let mut total = 0u64;
|
||
for l in 0.. {
|
||
let p = src.layer_unitigs_path(i, l);
|
||
if !p.exists() {
|
||
break;
|
||
}
|
||
if let Ok(m) = std::fs::metadata(&p) {
|
||
total += m.len();
|
||
}
|
||
}
|
||
total
|
||
}
|
||
|
||
fn compute_labels(
|
||
sources: &[&KmerIndex],
|
||
rename_duplicates: bool,
|
||
) -> OKIResult<(Vec<Vec<String>>, Vec<GenomeInfo>)> {
|
||
let mut seen: HashMap<String, usize> = HashMap::new();
|
||
let mut source_labels: Vec<Vec<String>> = Vec::with_capacity(sources.len());
|
||
let mut all_genomes: Vec<GenomeInfo> = Vec::new();
|
||
|
||
for src in sources {
|
||
let mut labels = Vec::with_capacity(src.meta.genomes.len());
|
||
for genome in &src.meta.genomes {
|
||
let label = &genome.label;
|
||
let count = seen.entry(label.clone()).or_insert(0);
|
||
let new_label = if *count == 0 {
|
||
label.clone()
|
||
} else if rename_duplicates {
|
||
format!("{label}.{count}")
|
||
} else {
|
||
return Err(OKIError::DuplicateGenomeLabel(label.clone()));
|
||
};
|
||
*count += 1;
|
||
labels.push(new_label.clone());
|
||
all_genomes.push(GenomeInfo {
|
||
label: new_label,
|
||
meta: genome.meta.clone(),
|
||
});
|
||
}
|
||
source_labels.push(labels);
|
||
}
|
||
|
||
Ok((source_labels, all_genomes))
|
||
}
|
||
|
||
fn copy_spectrums(
|
||
src_root: &Path,
|
||
dst_root: &Path,
|
||
old_labels: &[String],
|
||
new_labels: &[String],
|
||
) -> io::Result<()> {
|
||
let src_dir = src_root.join("spectrums");
|
||
let dst_dir = dst_root.join("spectrums");
|
||
fs::create_dir_all(&dst_dir)?;
|
||
for (old, new) in old_labels.iter().zip(new_labels.iter()) {
|
||
let src_file = src_dir.join(format!("{old}.json"));
|
||
if src_file.exists() {
|
||
fs::copy(&src_file, dst_dir.join(format!("{new}.json")))?;
|
||
}
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> {
|
||
for entry in fs::read_dir(root)? {
|
||
let entry = entry?;
|
||
let path = entry.path();
|
||
if path.is_dir() {
|
||
if path.file_name().and_then(|n| n.to_str()) == Some(name) {
|
||
fs::remove_dir_all(&path)?;
|
||
} else {
|
||
remove_dirs_named(&path, name)?;
|
||
}
|
||
}
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
fn format_evidence(ev: &IndexMode) -> String {
|
||
match ev {
|
||
IndexMode::Exact => "exact".to_string(),
|
||
IndexMode::Approx { b, z } => format!("approx (b={b}, z={z})"),
|
||
IndexMode::Hybrid { b, z } => format!("hybrid (b={b}, z={z})"),
|
||
}
|
||
}
|
||
|
||
fn is_trivial(src: &KmerIndex, mode: MergeMode) -> bool {
|
||
src.meta.genomes.len() == 1 && mode == MergeMode::Presence
|
||
}
|
||
|
||
fn index_unitig_size(src: &KmerIndex) -> u64 {
|
||
let n = src.partition.n_partitions();
|
||
(0..n).map(|i| partition_unitig_bytes(src, i)).sum()
|
||
}
|
||
|
||
fn choose_base(sources: &[&KmerIndex], mode: MergeMode) -> usize {
|
||
let needs_approx = sources.iter().any(|src| {
|
||
!is_trivial(src, mode)
|
||
&& matches!(
|
||
src.meta.config.evidence,
|
||
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
|
||
)
|
||
});
|
||
|
||
sources
|
||
.iter()
|
||
.enumerate()
|
||
.filter(|(_, src)| {
|
||
!needs_approx
|
||
|| matches!(
|
||
src.meta.config.evidence,
|
||
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
|
||
)
|
||
})
|
||
.max_by_key(|(_, src)| index_unitig_size(src))
|
||
.map(|(i, _)| i)
|
||
.unwrap()
|
||
}
|
||
|
||
fn copy_dir_all(src: &Path, dst: &Path) -> io::Result<()> {
|
||
fs::create_dir_all(dst)?;
|
||
for entry in fs::read_dir(src)? {
|
||
let entry = entry?;
|
||
let src_path = entry.path();
|
||
let dst_path = dst.join(entry.file_name());
|
||
if src_path.is_dir() {
|
||
copy_dir_all(&src_path, &dst_path)?;
|
||
} else {
|
||
fs::copy(&src_path, &dst_path)?;
|
||
}
|
||
}
|
||
Ok(())
|
||
}
|