style: apply consistent formatting and whitespace normalization

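Applies consistent formatting, whitespace normalization, and indentation standardization to `debruijn.rs` and `merge.rs`. Reorganizes imports and downgrades a unitig traversal log from `info!` to `debug!`. Note that the change is not purely cosmetic: in `merge.rs` the worker-activation heuristic is modified. A new `efficiency_at_last_spawn` value records the CPU efficiency measured at the previous activation; when `n_workers == 1` a worker is still activated if efficiency is below `SPAWN_THRESHOLD`, otherwise a worker is activated only if efficiency has risen by at least a quarter of the expected marginal gain (`1 / n_workers`) since the previous activation. The associated `debug!` message is now emitted before the activation and also reports the gain versus the previous spawn. No other functional logic is altered.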
This commit is contained in:
Eric Coissac
2026-06-13 11:41:14 +02:00
parent bc14346f5f
commit fddf630772
2 changed files with 209 additions and 68 deletions
+129 -47
View File
@@ -21,9 +21,9 @@ pub use obikpartitionner::MergeMode;
#[derive(Debug)]
struct PartStat {
id: usize,
id: usize,
unitig_bytes: u64,
g_len: usize,
g_len: usize,
}
// ── main merge entry point ────────────────────────────────────────────────────
@@ -53,9 +53,9 @@ impl KmerIndex {
if src.state() != IndexState::Indexed {
return Err(OKIError::NotIndexed(src.root_path.clone()));
}
if src.kmer_size() != ref0.kmer_size()
|| src.minimizer_size() != ref0.minimizer_size()
|| src.n_partitions() != ref0.n_partitions()
if src.kmer_size() != ref0.kmer_size()
|| src.minimizer_size() != ref0.minimizer_size()
|| src.n_partitions() != ref0.n_partitions()
{
return Err(OKIError::IncompatibleConfig);
}
@@ -65,39 +65,65 @@ impl KmerIndex {
}
// ── Log source characteristics and choose base ────────────────────────
let mode_str = if mode == MergeMode::Presence { "presence" } else { "count" };
let mode_str = if mode == MergeMode::Presence {
"presence"
} else {
"count"
};
info!(
"merge: {} source(s), smer-size={}, mode={}",
sources.len(), sources[0].kmer_size(), mode_str,
sources.len(),
sources[0].kmer_size(),
mode_str,
);
for (i, src) in sources.iter().enumerate() {
let genome_str = if src.meta.genomes.len() == 1 { "mono-genome".to_string() }
else { format!("{} genomes", src.meta.genomes.len()) };
let trivial_str = if is_trivial(src, mode) { " [trivial: no data approximation]" } else { "" };
let genome_str = if src.meta.genomes.len() == 1 {
"mono-genome".to_string()
} else {
format!("{} genomes", src.meta.genomes.len())
};
let trivial_str = if is_trivial(src, mode) {
" [trivial: no data approximation]"
} else {
""
};
info!(
" [{}] {} — {}, {}, {}{}",
i, src.root_path.display(),
i,
src.root_path.display(),
format_evidence(&src.meta.config.evidence),
genome_str, mode_str, trivial_str,
genome_str,
mode_str,
trivial_str,
);
}
let base_idx = choose_base(sources, mode);
let needs_approx = sources.iter().any(|src| {
!is_trivial(src, mode)
&& matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. })
&& matches!(
src.meta.config.evidence,
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
)
});
info!(
"output evidence: {} ({}base: [{}] {})",
format_evidence(&sources[base_idx].meta.config.evidence),
if needs_approx { "forced approx — " } else { "" },
base_idx, sources[base_idx].root_path.display(),
if needs_approx {
"forced approx — "
} else {
""
},
base_idx,
sources[base_idx].root_path.display(),
);
let mut ordered: Vec<&KmerIndex> = Vec::with_capacity(sources.len());
ordered.push(sources[base_idx]);
for (i, &src) in sources.iter().enumerate() {
if i != base_idx { ordered.push(src); }
if i != base_idx {
ordered.push(src);
}
}
let sources: &[&KmerIndex] = &ordered;
let evidence = sources[0].meta.config.evidence.clone();
@@ -151,7 +177,8 @@ impl KmerIndex {
fs::remove_dir_all(&spectrums_dir)?;
}
for (src, new_labels) in sources.iter().zip(&source_labels) {
let old_labels: Vec<String> = src.meta.genomes.iter().map(|g| g.label.clone()).collect();
let old_labels: Vec<String> =
src.meta.genomes.iter().map(|g| g.label.clone()).collect();
copy_spectrums(&src.root_path, output, &old_labels, new_labels)?;
}
pb.finish_and_clear();
@@ -184,9 +211,12 @@ impl KmerIndex {
// Per-partition unitig byte sizes across remaining sources (stat() only)
let partition_sizes: Vec<u64> = (0..n_partitions)
.map(|i| remaining_sources.iter()
.map(|s| partition_unitig_bytes(s, i))
.sum())
.map(|i| {
remaining_sources
.iter()
.map(|s| partition_unitig_bytes(s, i))
.sum()
})
.collect();
// LFD sort: largest partition first
@@ -201,7 +231,8 @@ impl KmerIndex {
// IDs; each reports (id, g_len, duration) on a result channel.
const SPAWN_THRESHOLD: f64 = 0.95; // spawn when >5% capacity idle
let n_cores = std::thread::available_parallelism()
.map(|n| n.get()).unwrap_or(1);
.map(|n| n.get())
.unwrap_or(1);
let max_workers = (n_cores / 2).max(1);
let _ = budget_fraction; // kept in signature for CLI compatibility
@@ -220,6 +251,9 @@ impl KmerIndex {
let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
let mut n_workers = 0usize;
let mut cpu_sample = CpuSample::now();
// Efficiency measured just before each spawn, used to assess
// whether the previous worker delivered its expected marginal gain.
let mut efficiency_at_last_spawn = 0.0f64;
// Shadow as references so closures can capture them by copy.
let srcs = &srcs;
@@ -237,7 +271,12 @@ impl KmerIndex {
for i in &prx {
let t = Instant::now();
let r = dst_partition.merge_partition(
i, srcs, mode, n_dst_genomes, block_bits, evidence,
i,
srcs,
mode,
n_dst_genomes,
block_bits,
evidence,
);
rtx.send((i, r, t.elapsed())).ok();
}
@@ -252,26 +291,51 @@ impl KmerIndex {
let mut completed = 0usize;
while completed < n_partitions {
let (i, r, dur) = result_rx.recv()
.map_err(|_| OKIError::Io(io::Error::new(
io::ErrorKind::UnexpectedEof, "worker channel closed")))?;
let (i, r, dur) = result_rx.recv().map_err(|_| {
OKIError::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"worker channel closed",
))
})?;
let g_len = r.map_err(OKIError::Partition)?;
pb.inc(1);
debug!("partition {i}: done in {:.1}s — {} new kmers",
dur.as_secs_f64(), g_len);
debug!(
"partition {i}: done in {:.1}s — {} new kmers",
dur.as_secs_f64(),
g_len
);
part_stats.push(PartStat {
id: i, unitig_bytes: partition_sizes[i], g_len,
id: i,
unitig_bytes: partition_sizes[i],
g_len,
});
completed += 1;
if n_workers < max_workers && completed < n_partitions {
let eff = cpu_sample.cpu_efficiency(n_cores);
if eff < SPAWN_THRESHOLD {
// For the first spawn use SPAWN_THRESHOLD.
// For subsequent spawns: the previous worker should
// have raised efficiency by at least a quarter of the expected
// marginal gain (1/n_workers). If not, adding another
// worker won't help.
let should_spawn = if n_workers == 1 {
eff < SPAWN_THRESHOLD
} else {
let gain = eff - efficiency_at_last_spawn;
let expected = 1.0 / n_workers as f64;
gain >= expected * 0.25
};
if should_spawn {
debug!(
"activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%",
n_workers + 1,
eff * 100.0,
(eff - efficiency_at_last_spawn) * 100.0,
);
efficiency_at_last_spawn = eff;
activate_tx.send(()).ok();
n_workers += 1;
cpu_sample = CpuSample::now();
debug!("activated worker {n_workers} — efficiency {:.0}%",
eff * 100.0);
}
}
}
@@ -319,9 +383,7 @@ fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_worker
" {} partition(s) processed, {} total new kmers",
non_empty, total_new,
);
info!(
" workers spawned: {n_workers} / {max_workers} (max)",
);
info!(" workers spawned: {n_workers} / {max_workers} (max)",);
// Top 8 partitions by new-kmer count
let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect();
@@ -343,10 +405,15 @@ fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_worker
// ── helpers ───────────────────────────────────────────────────────────────────
fn fmt_bytes(b: u64) -> String {
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
else if b >= 1 << 20 { format!("{:.1} MB", b as f64 / (1u64 << 20) as f64) }
else if b >= 1 << 10 { format!("{:.1} KB", b as f64 / (1u64 << 10) as f64) }
else { format!("{b} B") }
if b >= 1 << 30 {
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
} else if b >= 1 << 20 {
format!("{:.1} MB", b as f64 / (1u64 << 20) as f64)
} else if b >= 1 << 10 {
format!("{:.1} KB", b as f64 / (1u64 << 10) as f64)
} else {
format!("{b} B")
}
}
/// Sum of all unitigs.bin sizes across all layers of partition `i` in `src`.
@@ -354,8 +421,12 @@ fn partition_unitig_bytes(src: &KmerIndex, i: usize) -> u64 {
let mut total = 0u64;
for l in 0.. {
let p = src.layer_unitigs_path(i, l);
if !p.exists() { break; }
if let Ok(m) = std::fs::metadata(&p) { total += m.len(); }
if !p.exists() {
break;
}
if let Ok(m) = std::fs::metadata(&p) {
total += m.len();
}
}
total
}
@@ -382,7 +453,10 @@ fn compute_labels(
};
*count += 1;
labels.push(new_label.clone());
all_genomes.push(GenomeInfo { label: new_label, meta: genome.meta.clone() });
all_genomes.push(GenomeInfo {
label: new_label,
meta: genome.meta.clone(),
});
}
source_labels.push(labels);
}
@@ -425,9 +499,9 @@ fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> {
fn format_evidence(ev: &IndexMode) -> String {
match ev {
IndexMode::Exact => "exact".to_string(),
IndexMode::Approx { b, z } => format!("approx (b={b}, z={z})"),
IndexMode::Hybrid { b, z } => format!("hybrid (b={b}, z={z})"),
IndexMode::Exact => "exact".to_string(),
IndexMode::Approx { b, z } => format!("approx (b={b}, z={z})"),
IndexMode::Hybrid { b, z } => format!("hybrid (b={b}, z={z})"),
}
}
@@ -443,13 +517,21 @@ fn index_unitig_size(src: &KmerIndex) -> u64 {
fn choose_base(sources: &[&KmerIndex], mode: MergeMode) -> usize {
let needs_approx = sources.iter().any(|src| {
!is_trivial(src, mode)
&& matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. })
&& matches!(
src.meta.config.evidence,
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
)
});
sources.iter().enumerate()
sources
.iter()
.enumerate()
.filter(|(_, src)| {
!needs_approx
|| matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. })
|| matches!(
src.meta.config.evidence,
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
)
})
.max_by_key(|(_, src)| index_unitig_size(src))
.map(|(i, _)| i)