Merge pull request 'Push ooxwzorvsqvy' (#26) from push-ooxwzorvsqvy into main

Reviewed-on: #26
This commit was merged in pull request #26.
This commit is contained in:
2026-06-13 09:59:07 +00:00
10 changed files with 438 additions and 174 deletions
+2
View File
@@ -1486,6 +1486,7 @@ name = "obidebruinj"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"ahash", "ahash",
"crossbeam-channel",
"hashbrown 0.14.5", "hashbrown 0.14.5",
"obifastwrite", "obifastwrite",
"obikseq", "obikseq",
@@ -1506,6 +1507,7 @@ dependencies = [
name = "obikindex" name = "obikindex"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"crossbeam-channel",
"indicatif", "indicatif",
"ndarray", "ndarray",
"obicompactvec", "obicompactvec",
+2 -1
View File
@@ -8,7 +8,8 @@ obikseq = { path = "../obikseq" }
obifastwrite = { path = "../obifastwrite" } obifastwrite = { path = "../obifastwrite" }
ahash = "0.8" ahash = "0.8"
hashbrown = { version = "0.14", features = ["rayon"] } hashbrown = { version = "0.14", features = ["rayon"] }
rayon = "1" rayon = "1"
crossbeam-channel = "0.5"
xxhash-rust = { version = "0.8.15", features = ["xxh3", "const_xxh3"] } xxhash-rust = { version = "0.8.15", features = ["xxh3", "const_xxh3"] }
tracing = "0.1" tracing = "0.1"
+113 -55
View File
@@ -1,12 +1,15 @@
//use ahash::RandomState; //use ahash::RandomState;
use crossbeam_channel;
use hashbrown::HashMap; use hashbrown::HashMap;
use obikseq::k; use obikseq::k;
use obikseq::{CanonicalKmer, Sequence}; use obikseq::{CanonicalKmer, Sequence, Unitig};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::cell::RefCell;
use std::fmt; use std::fmt;
use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::atomic::{AtomicU8, Ordering};
use xxhash_rust::xxh3::Xxh3Builder; use std::time::Instant;
use tracing::{debug, info}; use tracing::{debug, info};
use xxhash_rust::xxh3::Xxh3Builder;
// ── Types ───────────────────────────────────────────────────────────────────── // ── Types ─────────────────────────────────────────────────────────────────────
@@ -96,26 +99,6 @@ impl Node {
(self.0 >> 5) & 0b11 (self.0 >> 5) & 0b11
} }
/// Number of left neighbours.
pub fn n_left_neighbours(self) -> u8 {
if self.can_extend_left() {
1
} else {
let v = (self.0 >> 5) & 0b11;
v + (v != 0) as u8
}
}
/// Number of right neighbours.
pub fn n_right_neighbours(self) -> u8 {
if self.can_extend_right() {
1
} else {
let v = (self.0 >> 3) & 0b11;
v + (v != 0) as u8
}
}
/// Marks the node as visited. /// Marks the node as visited.
#[inline] #[inline]
pub fn set_visited(&mut self) { pub fn set_visited(&mut self) {
@@ -162,13 +145,17 @@ impl fmt::Display for Node {
const NUC: [char; 4] = ['A', 'C', 'G', 'T']; const NUC: [char; 4] = ['A', 'C', 'G', 'T'];
let r = if self.can_extend_right() { let r = if self.can_extend_right() {
format!("{}", NUC[self.right_nuc() as usize]) format!("{}", NUC[self.right_nuc() as usize])
} else if (self.0 >> 3) & 0b11 == 0 {
"→0".to_string()
} else { } else {
format!("{}", self.n_right_neighbours()) "→≥2".to_string()
}; };
let l = if self.can_extend_left() { let l = if self.can_extend_left() {
format!("{}", NUC[self.left_nuc() as usize]) format!("{}", NUC[self.left_nuc() as usize])
} else if (self.0 >> 5) & 0b11 == 0 {
"←0".to_string()
} else { } else {
format!("{}", self.n_left_neighbours()) "←≥2".to_string()
}; };
let v = if self.is_visited() { "V" } else { "." }; let v = if self.is_visited() { "V" } else { "." };
write!(f, "Node({r} {l} {v})") write!(f, "Node({r} {l} {v})")
@@ -192,8 +179,12 @@ impl WalkState {
} }
pub fn reachable(&self, graph: &GraphDeBruijn) -> bool { pub fn reachable(&self, graph: &GraphDeBruijn) -> bool {
WalkState { kmer: self.kmer, node: self.node, direct: !self.direct } WalkState {
.leavable(graph) kmer: self.kmer,
node: self.node,
direct: !self.direct,
}
.leavable(graph)
} }
pub fn walk(&self, graph: &GraphDeBruijn) -> Option<(WalkState, u8)> { pub fn walk(&self, graph: &GraphDeBruijn) -> Option<(WalkState, u8)> {
@@ -209,8 +200,19 @@ impl WalkState {
if next_node.is_visited() { if next_node.is_visited() {
return None; return None;
} }
let reachable = if dnext { next_node.can_extend_left() } else { next_node.can_extend_right() }; let reachable = if dnext {
reachable.then_some((WalkState { kmer: cnext, node: next_node, direct: dnext }, nuc)) next_node.can_extend_left()
} else {
next_node.can_extend_right()
};
reachable.then_some((
WalkState {
kmer: cnext,
node: next_node,
direct: dnext,
},
nuc,
))
} else { } else {
if !self.node.can_extend_left() { if !self.node.can_extend_left() {
return None; return None;
@@ -223,8 +225,19 @@ impl WalkState {
if next_node.is_visited() { if next_node.is_visited() {
return None; return None;
} }
let reachable = if dnext { next_node.can_extend_right() } else { next_node.can_extend_left() }; let reachable = if dnext {
reachable.then_some((WalkState { kmer: cnext, node: next_node, direct: dnext }, 3 - nuc)) next_node.can_extend_right()
} else {
next_node.can_extend_left()
};
reachable.then_some((
WalkState {
kmer: cnext,
node: next_node,
direct: dnext,
},
3 - nuc,
))
} }
} }
} }
@@ -272,6 +285,7 @@ impl GraphDeBruijn {
pub fn compute_degrees_and_mark_starts(&self) { pub fn compute_degrees_and_mark_starts(&self) {
// Pass 1: count right/left neighbors for each node // Pass 1: count right/left neighbors for each node
let t1 = Instant::now();
self.for_each_node(|kmer, atomic| { self.for_each_node(|kmer, atomic| {
let mut old = Node(atomic.load(Ordering::Relaxed)); let mut old = Node(atomic.load(Ordering::Relaxed));
if old.is_visited() { if old.is_visited() {
@@ -286,9 +300,15 @@ impl GraphDeBruijn {
node.set_left(lc, ln); node.set_left(lc, ln);
atomic.store(node.0, Ordering::Relaxed); atomic.store(node.0, Ordering::Relaxed);
}); });
debug!(
"[compute_degrees] pass 1 (degrees): {:?} — {} nodes",
t1.elapsed(),
self.nodes.len()
);
// Pass 2: mark start nodes // Pass 2: mark start nodes
let t2 = Instant::now();
self.for_each_node(|kmer, atomic| { self.for_each_node(|kmer, atomic| {
let mut node = Node(atomic.load(Ordering::Relaxed)); let mut node = Node(atomic.load(Ordering::Relaxed));
if node.is_visited() { if node.is_visited() {
@@ -299,6 +319,11 @@ impl GraphDeBruijn {
atomic.store(node.0, Ordering::Relaxed); atomic.store(node.0, Ordering::Relaxed);
} }
}); });
debug!(
"[compute_degrees] pass 2 (starts): {:?} — {} nodes",
t2.elapsed(),
self.nodes.len()
);
} }
pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> { pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> {
@@ -336,14 +361,28 @@ impl GraphDeBruijn {
} }
fn unitig_nucleotides(&self, kmer: CanonicalKmer, k: usize) -> Option<UnitigNucIter<'_>> { fn unitig_nucleotides(&self, kmer: CanonicalKmer, k: usize) -> Option<UnitigNucIter<'_>> {
let old = self.nodes.get(&kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); let old = self
if old & IS_VISITED_MASK != 0 { return None; } .nodes
.get(&kmer)?
.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
if old & IS_VISITED_MASK != 0 {
return None;
}
let start = WalkState::new(kmer, Node(old), true); let start = WalkState::new(kmer, Node(old), true);
let next_step = start.walk(self).and_then(|(next_state, nuc)| { let next_step = start.walk(self).and_then(|(next_state, nuc)| {
let ext_old = self.nodes.get(&next_state.kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); let ext_old = self
.nodes
.get(&next_state.kmer)?
.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
(ext_old & IS_VISITED_MASK == 0).then_some((next_state, nuc)) (ext_old & IS_VISITED_MASK == 0).then_some((next_state, nuc))
}); });
Some(UnitigNucIter { graph: self, start: kmer, pos: 0, k, next_step }) Some(UnitigNucIter {
graph: self,
start: kmer,
pos: 0,
k,
next_step,
})
} }
pub fn for_each_unitig(&self, f: impl Fn(UnitigNucIter<'_>) + Sync) { pub fn for_each_unitig(&self, f: impl Fn(UnitigNucIter<'_>) + Sync) {
@@ -360,7 +399,9 @@ impl GraphDeBruijn {
self.nodes self.nodes
.par_iter() .par_iter()
.filter_map(|(&kmer, atomic)| { .filter_map(|(&kmer, atomic)| {
Node(atomic.load(Ordering::Acquire)).is_start().then_some(kmer) Node(atomic.load(Ordering::Acquire))
.is_start()
.then_some(kmer)
}) })
.for_each(|kmer| { .for_each(|kmer| {
if let Some(iter) = self.unitig_nucleotides(kmer, k) { if let Some(iter) = self.unitig_nucleotides(kmer, k) {
@@ -411,10 +452,10 @@ impl GraphDeBruijn {
} }
} }
info!( debug!(
chains = n_chains.load(Ordering::Relaxed), chains = n_chains.load(Ordering::Relaxed),
phase2 = n2.load(Ordering::Relaxed), phase2 = n2.load(Ordering::Relaxed),
total = n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed), total = n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed),
"unitig traversal complete" "unitig traversal complete"
); );
} }
@@ -460,19 +501,31 @@ impl GraphDeBruijn {
pub fn try_for_each_unitig<E, F>(&self, f: F) -> Result<(), E> pub fn try_for_each_unitig<E, F>(&self, f: F) -> Result<(), E>
where where
E: Send, E: Send,
F: FnMut(UnitigNucIter<'_>) -> Result<(), E> + Send, F: FnMut(&Unitig) -> Result<(), E> + Send,
{ {
let error = std::sync::Mutex::new(None::<E>); thread_local! {
let f = std::sync::Mutex::new(f); static BUF: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(4096));
self.for_each_unitig(|iter| { }
if error.lock().unwrap().is_some() { let (tx, rx) = crossbeam_channel::bounded::<Unitig>(rayon::current_num_threads() * 256);
return; std::thread::scope(|s| {
} let writer = s.spawn(move || -> Result<(), E> {
if let Err(e) = f.lock().unwrap()(iter) { let mut f = f;
*error.lock().unwrap() = Some(e); for unitig in rx {
} f(&unitig)?;
}); }
error.into_inner().unwrap().map_or(Ok(()), Err) Ok(())
});
self.for_each_unitig(|iter| {
BUF.with(|buf| {
let mut buf = buf.borrow_mut();
buf.clear();
buf.extend(iter);
tx.send(Unitig::from_nucleotides(&buf)).ok();
});
});
drop(tx);
writer.join().expect("writer thread panicked")
})
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
@@ -504,7 +557,11 @@ impl Iterator for UnitigNucIter<'_> {
Some(nuc) Some(nuc)
} else if let Some((state, nuc)) = self.next_step.take() { } else if let Some((state, nuc)) = self.next_step.take() {
self.next_step = state.walk(self.graph).and_then(|(next_state, next_nuc)| { self.next_step = state.walk(self.graph).and_then(|(next_state, next_nuc)| {
let old = self.graph.nodes.get(&next_state.kmer)?.fetch_or(IS_VISITED_MASK, Ordering::AcqRel); let old = self
.graph
.nodes
.get(&next_state.kmer)?
.fetch_or(IS_VISITED_MASK, Ordering::AcqRel);
(old & IS_VISITED_MASK == 0).then_some((next_state, next_nuc)) (old & IS_VISITED_MASK == 0).then_some((next_state, next_nuc))
}); });
Some(nuc) Some(nuc)
@@ -527,22 +584,23 @@ fn count_neighbors(
nodes: &FastHashMap<CanonicalKmer, AtomicU8>, nodes: &FastHashMap<CanonicalKmer, AtomicU8>,
) -> (u8, Option<u8>) { ) -> (u8, Option<u8>) {
let mut count = 0u8; let mut count = 0u8;
let mut first = None; let mut nuc = 0u8;
for (i, neighbour) in neighbors.iter().enumerate() { for (i, neighbour) in neighbors.iter().enumerate() {
if let Some(a) = nodes.get(neighbour) { if let Some(a) = nodes.get(neighbour) {
if Node(a.load(Ordering::Relaxed)).is_visited() { if Node(a.load(Ordering::Relaxed)).is_visited() {
continue; continue;
} }
nuc = i as u8;
count += 1; count += 1;
if first.is_none() { if count >= 2 {
first = Some(i as u8); return (2, None);
} }
} }
} }
if count == 1 { if count == 1 {
(1, first) (1, Some(nuc))
} else { } else {
(count, None) (0, None)
} }
} }
+2 -2
View File
@@ -24,8 +24,8 @@ fn canonical_kmers(seq: &[u8]) -> Vec<CanonicalKmer> {
fn collect_unitigs(g: &GraphDeBruijn) -> Vec<Unitig> { fn collect_unitigs(g: &GraphDeBruijn) -> Vec<Unitig> {
let mut unitigs = Vec::new(); let mut unitigs = Vec::new();
g.try_for_each_unitig(|nuc_iter| -> Result<(), std::convert::Infallible> { g.try_for_each_unitig(|unitig| -> Result<(), std::convert::Infallible> {
unitigs.push(nuc_iter.collect()); unitigs.push(unitig.clone());
Ok(()) Ok(())
}) })
.unwrap(); .unwrap();
+2 -1
View File
@@ -11,7 +11,8 @@ obisys = { path = "../obisys" }
obicompactvec = { path = "../obicompactvec" } obicompactvec = { path = "../obicompactvec" }
obilayeredmap = { path = "../obilayeredmap" } obilayeredmap = { path = "../obilayeredmap" }
ndarray = "0.16" ndarray = "0.16"
rayon = "1" rayon = "1"
crossbeam-channel = "0.5"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
indicatif = "0.17" indicatif = "0.17"
+202 -66
View File
@@ -2,8 +2,10 @@ use std::collections::HashMap;
use std::fs; use std::fs;
use std::io; use std::io;
use std::path::Path; use std::path::Path;
use std::time::{Duration, Instant};
use obisys::{Reporter, Stage, progress_bar, spinner}; use crossbeam_channel::unbounded;
use obisys::{CpuSample, Reporter, Stage, progress_bar, spinner};
use tracing::{debug, info}; use tracing::{debug, info};
use obilayeredmap::IndexMode; use obilayeredmap::IndexMode;
@@ -19,9 +21,9 @@ pub use obikpartitionner::MergeMode;
#[derive(Debug)] #[derive(Debug)]
struct PartStat { struct PartStat {
id: usize, id: usize,
unitig_bytes: u64, unitig_bytes: u64,
g_len: usize, g_len: usize,
} }
// ── main merge entry point ──────────────────────────────────────────────────── // ── main merge entry point ────────────────────────────────────────────────────
@@ -51,9 +53,9 @@ impl KmerIndex {
if src.state() != IndexState::Indexed { if src.state() != IndexState::Indexed {
return Err(OKIError::NotIndexed(src.root_path.clone())); return Err(OKIError::NotIndexed(src.root_path.clone()));
} }
if src.kmer_size() != ref0.kmer_size() if src.kmer_size() != ref0.kmer_size()
|| src.minimizer_size() != ref0.minimizer_size() || src.minimizer_size() != ref0.minimizer_size()
|| src.n_partitions() != ref0.n_partitions() || src.n_partitions() != ref0.n_partitions()
{ {
return Err(OKIError::IncompatibleConfig); return Err(OKIError::IncompatibleConfig);
} }
@@ -63,39 +65,65 @@ impl KmerIndex {
} }
// ── Log source characteristics and choose base ──────────────────────── // ── Log source characteristics and choose base ────────────────────────
let mode_str = if mode == MergeMode::Presence { "presence" } else { "count" }; let mode_str = if mode == MergeMode::Presence {
"presence"
} else {
"count"
};
info!( info!(
"merge: {} source(s), smer-size={}, mode={}", "merge: {} source(s), smer-size={}, mode={}",
sources.len(), sources[0].kmer_size(), mode_str, sources.len(),
sources[0].kmer_size(),
mode_str,
); );
for (i, src) in sources.iter().enumerate() { for (i, src) in sources.iter().enumerate() {
let genome_str = if src.meta.genomes.len() == 1 { "mono-genome".to_string() } let genome_str = if src.meta.genomes.len() == 1 {
else { format!("{} genomes", src.meta.genomes.len()) }; "mono-genome".to_string()
let trivial_str = if is_trivial(src, mode) { " [trivial: no data approximation]" } else { "" }; } else {
format!("{} genomes", src.meta.genomes.len())
};
let trivial_str = if is_trivial(src, mode) {
" [trivial: no data approximation]"
} else {
""
};
info!( info!(
" [{}] {} — {}, {}, {}{}", " [{}] {} — {}, {}, {}{}",
i, src.root_path.display(), i,
src.root_path.display(),
format_evidence(&src.meta.config.evidence), format_evidence(&src.meta.config.evidence),
genome_str, mode_str, trivial_str, genome_str,
mode_str,
trivial_str,
); );
} }
let base_idx = choose_base(sources, mode); let base_idx = choose_base(sources, mode);
let needs_approx = sources.iter().any(|src| { let needs_approx = sources.iter().any(|src| {
!is_trivial(src, mode) !is_trivial(src, mode)
&& matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. }) && matches!(
src.meta.config.evidence,
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
)
}); });
info!( info!(
"output evidence: {} ({}base: [{}] {})", "output evidence: {} ({}base: [{}] {})",
format_evidence(&sources[base_idx].meta.config.evidence), format_evidence(&sources[base_idx].meta.config.evidence),
if needs_approx { "forced approx — " } else { "" }, if needs_approx {
base_idx, sources[base_idx].root_path.display(), "forced approx — "
} else {
""
},
base_idx,
sources[base_idx].root_path.display(),
); );
let mut ordered: Vec<&KmerIndex> = Vec::with_capacity(sources.len()); let mut ordered: Vec<&KmerIndex> = Vec::with_capacity(sources.len());
ordered.push(sources[base_idx]); ordered.push(sources[base_idx]);
for (i, &src) in sources.iter().enumerate() { for (i, &src) in sources.iter().enumerate() {
if i != base_idx { ordered.push(src); } if i != base_idx {
ordered.push(src);
}
} }
let sources: &[&KmerIndex] = &ordered; let sources: &[&KmerIndex] = &ordered;
let evidence = sources[0].meta.config.evidence.clone(); let evidence = sources[0].meta.config.evidence.clone();
@@ -149,7 +177,8 @@ impl KmerIndex {
fs::remove_dir_all(&spectrums_dir)?; fs::remove_dir_all(&spectrums_dir)?;
} }
for (src, new_labels) in sources.iter().zip(&source_labels) { for (src, new_labels) in sources.iter().zip(&source_labels) {
let old_labels: Vec<String> = src.meta.genomes.iter().map(|g| g.label.clone()).collect(); let old_labels: Vec<String> =
src.meta.genomes.iter().map(|g| g.label.clone()).collect();
copy_spectrums(&src.root_path, output, &old_labels, new_labels)?; copy_spectrums(&src.root_path, output, &old_labels, new_labels)?;
} }
pb.finish_and_clear(); pb.finish_and_clear();
@@ -182,57 +211,143 @@ impl KmerIndex {
// Per-partition unitig byte sizes across remaining sources (stat() only) // Per-partition unitig byte sizes across remaining sources (stat() only)
let partition_sizes: Vec<u64> = (0..n_partitions) let partition_sizes: Vec<u64> = (0..n_partitions)
.map(|i| remaining_sources.iter() .map(|i| {
.map(|s| partition_unitig_bytes(s, i)) remaining_sources
.sum()) .iter()
.map(|s| partition_unitig_bytes(s, i))
.sum()
})
.collect(); .collect();
// LFD sort: largest partition first // LFD sort: largest partition first
let mut order: Vec<usize> = (0..n_partitions).collect(); let mut order: Vec<usize> = (0..n_partitions).collect();
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i])); order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
// ── First partition (largest) ───────────────────────────────────── // ── Adaptive worker pool ──────────────────────────────────────────
let worst_id = order[0]; // Start with 1 worker thread. After each completed partition,
let worst_bytes = partition_sizes[worst_id]; // measure CPU efficiency (via getrusage delta). If efficiency is
// below the spawn threshold and more partitions remain, spawn one
// additional worker. Workers share a crossbeam channel of partition
// IDs; each reports (id, g_len, duration) on a result channel.
const SPAWN_THRESHOLD: f64 = 0.95; // spawn when >5% capacity idle
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let max_workers = (n_cores / 2).max(1);
let _ = budget_fraction; // kept in signature for CLI compatibility
let worst_g_len = dst_partition let (part_tx, part_rx) = unbounded::<usize>();
.merge_partition(worst_id, &srcs, mode, n_dst_genomes, block_bits, &evidence) let (result_tx, result_rx) =
.map_err(OKIError::Partition)?; unbounded::<(usize, Result<usize, obiskio::SKError>, Duration)>();
pb.inc(1); // activate_tx: controller sends () to wake the next dormant worker.
// Dropping activate_tx closes the channel; dormant workers exit.
let (activate_tx, activate_rx) = unbounded::<()>();
info!( for &i in &order {
"merge_partitions: first partition {} — {} unitig bytes → {} new kmers", part_tx.send(i).ok();
worst_id, fmt_bytes(worst_bytes), worst_g_len, }
); drop(part_tx);
let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions); let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
part_stats.push(PartStat { let mut n_workers = 0usize;
id: worst_id, let mut cpu_sample = CpuSample::now();
unitig_bytes: worst_bytes, // Efficiency measured just before each spawn, used to assess
g_len: worst_g_len, // whether the previous worker delivered its expected marginal gain.
}); let mut efficiency_at_last_spawn = 0.0f64;
// ── Sequential remainder ────────────────────────────────────────── // Shadow as references so closures can capture them by copy.
// One partition at a time; each partition uses an internal pipeline let srcs = &srcs;
// (obipipeline) to parallelise file I/O and dst_map filtering. let evidence = &evidence;
let _ = budget_fraction; // kept in signature for CLI compatibility
for &i in &order[1..] {
let ubytes = partition_sizes[i];
debug!("partition {i}: start — {} unitig bytes", fmt_bytes(ubytes));
let g_len = dst_partition std::thread::scope(|s| -> OKIResult<()> {
.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence) // Pre-spawn max_workers threads; each waits for an activation
.map_err(OKIError::Partition)?; // signal before consuming from part_rx.
pb.inc(1); for _ in 0..max_workers {
let prx = part_rx.clone();
let rtx = result_tx.clone();
let arx = activate_rx.clone();
s.spawn(move || {
if arx.recv().is_ok() {
for i in &prx {
let t = Instant::now();
let r = dst_partition.merge_partition(
i,
srcs,
mode,
n_dst_genomes,
block_bits,
evidence,
);
rtx.send((i, r, t.elapsed())).ok();
}
}
});
}
drop(result_tx);
debug!("partition {i}: done — {} new kmers", g_len); // Activate first worker immediately.
part_stats.push(PartStat { id: i, unitig_bytes: ubytes, g_len }); activate_tx.send(()).ok();
} n_workers = 1;
let mut completed = 0usize;
while completed < n_partitions {
let (i, r, dur) = result_rx.recv().map_err(|_| {
OKIError::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"worker channel closed",
))
})?;
let g_len = r.map_err(OKIError::Partition)?;
pb.inc(1);
debug!(
"partition {i}: done in {:.1}s — {} new kmers",
dur.as_secs_f64(),
g_len
);
part_stats.push(PartStat {
id: i,
unitig_bytes: partition_sizes[i],
g_len,
});
completed += 1;
if n_workers < max_workers && completed < n_partitions {
let eff = cpu_sample.cpu_efficiency(n_cores);
// For the first spawn use SPAWN_THRESHOLD.
// For subsequent spawns: the previous worker should
// have raised efficiency by at least a quarter of the expected
// marginal gain (1/n_workers). If not, adding another
// worker won't help.
let should_spawn = if n_workers == 1 {
eff < SPAWN_THRESHOLD
} else {
let gain = eff - efficiency_at_last_spawn;
let expected = 1.0 / n_workers as f64;
gain >= expected * 0.25
};
if should_spawn {
debug!(
"activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%",
n_workers + 1,
eff * 100.0,
(eff - efficiency_at_last_spawn) * 100.0,
);
efficiency_at_last_spawn = eff;
activate_tx.send(()).ok();
n_workers += 1;
cpu_sample = CpuSample::now();
}
}
}
// Close activate_tx: dormant workers exit cleanly.
drop(activate_tx);
Ok(())
})?;
pb.finish_and_clear(); pb.finish_and_clear();
// ── Diagnostic report ───────────────────────────────────────────── // ── Diagnostic report ─────────────────────────────────────────────
print_merge_partition_report(&part_stats); print_merge_partition_report(&part_stats, n_workers, max_workers);
rep.push(t.stop()); rep.push(t.stop());
} }
@@ -254,7 +369,7 @@ impl KmerIndex {
// ── Diagnostic report ───────────────────────────────────────────────────────── // ── Diagnostic report ─────────────────────────────────────────────────────────
fn print_merge_partition_report(stats: &[PartStat]) { fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_workers: usize) {
let total_new: usize = stats.iter().map(|s| s.g_len).sum(); let total_new: usize = stats.iter().map(|s| s.g_len).sum();
let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count(); let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count();
@@ -268,6 +383,7 @@ fn print_merge_partition_report(stats: &[PartStat]) {
" {} partition(s) processed, {} total new kmers", " {} partition(s) processed, {} total new kmers",
non_empty, total_new, non_empty, total_new,
); );
info!(" workers spawned: {n_workers} / {max_workers} (max)",);
// Top 8 partitions by new-kmer count // Top 8 partitions by new-kmer count
let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect(); let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect();
@@ -289,10 +405,15 @@ fn print_merge_partition_report(stats: &[PartStat]) {
// ── helpers ─────────────────────────────────────────────────────────────────── // ── helpers ───────────────────────────────────────────────────────────────────
fn fmt_bytes(b: u64) -> String { fn fmt_bytes(b: u64) -> String {
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) } if b >= 1 << 30 {
else if b >= 1 << 20 { format!("{:.1} MB", b as f64 / (1u64 << 20) as f64) } format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
else if b >= 1 << 10 { format!("{:.1} KB", b as f64 / (1u64 << 10) as f64) } } else if b >= 1 << 20 {
else { format!("{b} B") } format!("{:.1} MB", b as f64 / (1u64 << 20) as f64)
} else if b >= 1 << 10 {
format!("{:.1} KB", b as f64 / (1u64 << 10) as f64)
} else {
format!("{b} B")
}
} }
/// Sum of all unitigs.bin sizes across all layers of partition `i` in `src`. /// Sum of all unitigs.bin sizes across all layers of partition `i` in `src`.
@@ -300,8 +421,12 @@ fn partition_unitig_bytes(src: &KmerIndex, i: usize) -> u64 {
let mut total = 0u64; let mut total = 0u64;
for l in 0.. { for l in 0.. {
let p = src.layer_unitigs_path(i, l); let p = src.layer_unitigs_path(i, l);
if !p.exists() { break; } if !p.exists() {
if let Ok(m) = std::fs::metadata(&p) { total += m.len(); } break;
}
if let Ok(m) = std::fs::metadata(&p) {
total += m.len();
}
} }
total total
} }
@@ -328,7 +453,10 @@ fn compute_labels(
}; };
*count += 1; *count += 1;
labels.push(new_label.clone()); labels.push(new_label.clone());
all_genomes.push(GenomeInfo { label: new_label, meta: genome.meta.clone() }); all_genomes.push(GenomeInfo {
label: new_label,
meta: genome.meta.clone(),
});
} }
source_labels.push(labels); source_labels.push(labels);
} }
@@ -371,9 +499,9 @@ fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> {
fn format_evidence(ev: &IndexMode) -> String { fn format_evidence(ev: &IndexMode) -> String {
match ev { match ev {
IndexMode::Exact => "exact".to_string(), IndexMode::Exact => "exact".to_string(),
IndexMode::Approx { b, z } => format!("approx (b={b}, z={z})"), IndexMode::Approx { b, z } => format!("approx (b={b}, z={z})"),
IndexMode::Hybrid { b, z } => format!("hybrid (b={b}, z={z})"), IndexMode::Hybrid { b, z } => format!("hybrid (b={b}, z={z})"),
} }
} }
@@ -389,13 +517,21 @@ fn index_unitig_size(src: &KmerIndex) -> u64 {
fn choose_base(sources: &[&KmerIndex], mode: MergeMode) -> usize { fn choose_base(sources: &[&KmerIndex], mode: MergeMode) -> usize {
let needs_approx = sources.iter().any(|src| { let needs_approx = sources.iter().any(|src| {
!is_trivial(src, mode) !is_trivial(src, mode)
&& matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. }) && matches!(
src.meta.config.evidence,
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
)
}); });
sources.iter().enumerate() sources
.iter()
.enumerate()
.filter(|(_, src)| { .filter(|(_, src)| {
!needs_approx !needs_approx
|| matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. }) || matches!(
src.meta.config.evidence,
IndexMode::Approx { .. } | IndexMode::Hybrid { .. }
)
}) })
.max_by_key(|(_, src)| index_unitig_size(src)) .max_by_key(|(_, src)| index_unitig_size(src))
.map(|(i, _)| i) .map(|(i, _)| i)
+2 -3
View File
@@ -107,9 +107,8 @@ impl KmerPartition {
fs::create_dir_all(&layer_dir)?; fs::create_dir_all(&layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?; let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?;
g.try_for_each_unitig(|nuc_iter| { g.try_for_each_unitig(|unitig| {
let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); uw.write(unitig)
uw.write(&unitig)
})?; })?;
uw.close()?; uw.close()?;
+78 -43
View File
@@ -3,7 +3,11 @@ use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use obipipeline::{Pipeline, WorkerPool, make_flat_transform, make_sink, make_source, make_transform}; use tracing::debug;
use obipipeline::{
Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool,
make_sink, make_source, make_transform,
};
use obicompactvec::{ use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
@@ -232,23 +236,38 @@ impl KmerPartition {
let pipeline = Pipeline::new( let pipeline = Pipeline::new(
make_source!(Pass1Data, unitig_paths, File), make_source!(Pass1Data, unitig_paths, File),
vec![ vec![
make_flat_transform!(Pass1Data, { Stage::Flat(Arc::new(
move |path: PathBuf| -> Vec<Vec<CanonicalKmer>> { move |data: Pass1Data,
match UnitigFileReader::open_sequential(&path) { push: &PipelineSender<Result<Pass1Data, PipelineError>>,
Err(e) => { delta: &PipelineSender<isize>|
*err_cap.lock().unwrap() = Some(e.to_string()); {
vec![] if let Pass1Data::File(path) = data {
let reader = match UnitigFileReader::open_sequential(&path) {
Ok(r) => r,
Err(e) => {
*err_cap.lock().unwrap() = Some(e.to_string());
delta.send(-1).ok();
return;
}
};
let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
let mut count: isize = 0;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
batch.push(kmer);
if batch.len() == BATCH {
let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH));
push.send(Ok(Pass1Data::Batch(b))).ok();
count += 1;
}
} }
Ok(reader) => { if !batch.is_empty() {
let kmers: Vec<CanonicalKmer> = reader push.send(Ok(Pass1Data::Batch(batch))).ok();
.iter_indexed_canonical_kmers() count += 1;
.map(|(k, _, _)| k)
.collect();
kmers.chunks(BATCH).map(|c| c.to_vec()).collect()
} }
delta.send(count - 1).ok();
} }
} }
}, File, Batch), ) as SharedFlatFn<Pass1Data>),
make_transform!(Pass1Data, { make_transform!(Pass1Data, {
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> { move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
batch.into_iter() batch.into_iter()
@@ -278,6 +297,7 @@ impl KmerPartition {
.into_inner() .into_inner()
.unwrap_or_else(|e| e.into_inner()); .unwrap_or_else(|e| e.into_inner());
let any_new = g.len() > 0; let any_new = g.len() > 0;
debug!("partition {i}: de Bruijn graph done — {} new kmers", g.len());
// Build new layer from de Bruijn graph if there are new kmers. // Build new layer from de Bruijn graph if there are new kmers.
let new_layer_idx = n_dst_layers; let new_layer_idx = n_dst_layers;
@@ -287,9 +307,8 @@ impl KmerPartition {
g.compute_degrees_and_mark_starts(); g.compute_degrees_and_mark_starts();
fs::create_dir_all(&new_layer_dir)?; fs::create_dir_all(&new_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?;
g.try_for_each_unitig(|nuc_iter| { g.try_for_each_unitig(|unitig| {
let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); uw.write(unitig)
uw.write(&unitig)
})?; })?;
uw.close()?; uw.close()?;
let n = g.len(); let n = g.len();
@@ -430,36 +449,52 @@ impl KmerPartition {
let pipeline2 = Pipeline::new( let pipeline2 = Pipeline::new(
make_source!(Pass2Data, pass2_items, SrcLayer), make_source!(Pass2Data, pass2_items, SrcLayer),
vec![ vec![
make_flat_transform!(Pass2Data, { Stage::Flat(Arc::new(
move |(col_offset, src_n, src_layer_dir): (usize, usize, PathBuf)| move |data: Pass2Data,
-> Vec<(usize, usize, Arc<SrcLayerData>, Vec<CanonicalKmer>)> push: &PipelineSender<Result<Pass2Data, PipelineError>>,
delta: &PipelineSender<isize>|
{ {
let reader = match UnitigFileReader::open_sequential( if let Pass2Data::SrcLayer((col_offset, src_n, src_layer_dir)) = data {
&src_layer_dir.join("unitigs.bin"), let reader = match UnitigFileReader::open_sequential(
) { &src_layer_dir.join("unitigs.bin"),
Ok(r) => r, ) {
Err(e) => { Ok(r) => r,
*err_cap2.lock().unwrap() = Some(e.to_string()); Err(e) => {
return vec![]; *err_cap2.lock().unwrap() = Some(e.to_string());
delta.send(-1).ok();
return;
}
};
let src_data = match SrcLayerData::open(&src_layer_dir, mode) {
Ok(d) => Arc::new(d),
Err(e) => {
*err_cap2.lock().unwrap() = Some(e.to_string());
delta.send(-1).ok();
return;
}
};
let mut batch: Vec<CanonicalKmer> = Vec::with_capacity(BATCH);
let mut count: isize = 0;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
batch.push(kmer);
if batch.len() == BATCH {
let b = std::mem::replace(&mut batch, Vec::with_capacity(BATCH));
push.send(Ok(Pass2Data::RawBatch((
col_offset, src_n, Arc::clone(&src_data), b,
)))).ok();
count += 1;
}
} }
}; if !batch.is_empty() {
let src_data = match SrcLayerData::open(&src_layer_dir, mode) { push.send(Ok(Pass2Data::RawBatch((
Ok(d) => Arc::new(d), col_offset, src_n, src_data, batch,
Err(e) => { )))).ok();
*err_cap2.lock().unwrap() = Some(e.to_string()); count += 1;
return vec![];
} }
}; delta.send(count - 1).ok();
let all_kmers: Vec<CanonicalKmer> = reader }
.iter_indexed_canonical_kmers()
.map(|(kmer, _, _)| kmer)
.collect();
all_kmers
.chunks(BATCH)
.map(|c| (col_offset, src_n, Arc::clone(&src_data), c.to_vec()))
.collect()
} }
}, SrcLayer, RawBatch), ) as SharedFlatFn<Pass2Data>),
make_transform!(Pass2Data, { make_transform!(Pass2Data, {
move |(col_offset, src_n, src_data, kmers): (usize, usize, Arc<SrcLayerData>, Vec<CanonicalKmer>)| move |(col_offset, src_n, src_data, kmers): (usize, usize, Arc<SrcLayerData>, Vec<CanonicalKmer>)|
-> Vec<(Option<usize>, usize, usize, u32)> -> Vec<(Option<usize>, usize, usize, u32)>
+2 -3
View File
@@ -168,9 +168,8 @@ impl KmerPartition {
fs::create_dir_all(&dst_layer_dir)?; fs::create_dir_all(&dst_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?;
g.try_for_each_unitig(|nuc_iter| { g.try_for_each_unitig(|unitig| {
let unitig: obikseq::unitig::Unitig = nuc_iter.collect(); uw.write(unitig)
uw.write(&unitig)
})?; })?;
uw.close()?; uw.close()?;
drop(g); drop(g);
+33
View File
@@ -212,6 +212,39 @@ fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
// Monotonically increasing counters — negative delta would be a kernel bug. // Monotonically increasing counters — negative delta would be a kernel bug.
fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 } fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
// ── CpuSample ─────────────────────────────────────────────────────────────────
/// Snapshot of process-wide CPU time + wall clock at a point in time.
/// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of
/// available cores used since the snapshot was taken.
pub struct CpuSample {
wall: Instant,
user_secs: f64,
sys_secs: f64,
}
impl CpuSample {
pub fn now() -> Self {
let ru = get_rusage();
Self {
wall: Instant::now(),
user_secs: tv_to_secs(ru.ru_utime),
sys_secs: tv_to_secs(ru.ru_stime),
}
}
/// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot.
/// Returns 0.0 if less than 100 ms have elapsed (too noisy).
pub fn cpu_efficiency(&self, n_cores: usize) -> f64 {
let ru = get_rusage();
let wall = self.wall.elapsed().as_secs_f64();
if wall < 0.1 { return 0.0; }
let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs)
+ (tv_to_secs(ru.ru_stime) - self.sys_secs);
cpu / (wall * n_cores as f64)
}
}
// ── public API ──────────────────────────────────────────────────────────────── // ── public API ────────────────────────────────────────────────────────────────
/// Snapshot taken at the start of a pipeline stage. /// Snapshot taken at the start of a pipeline stage.