diff --git a/src/Cargo.lock b/src/Cargo.lock index 39706b2..cfe335d 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1507,6 +1507,7 @@ dependencies = [ name = "obikindex" version = "0.1.0" dependencies = [ + "crossbeam-channel", "indicatif", "ndarray", "obicompactvec", diff --git a/src/obikindex/Cargo.toml b/src/obikindex/Cargo.toml index d670480..3f89bb0 100644 --- a/src/obikindex/Cargo.toml +++ b/src/obikindex/Cargo.toml @@ -11,7 +11,8 @@ obisys = { path = "../obisys" } obicompactvec = { path = "../obicompactvec" } obilayeredmap = { path = "../obilayeredmap" } ndarray = "0.16" -rayon = "1" +rayon = "1" +crossbeam-channel = "0.5" serde = { version = "1", features = ["derive"] } serde_json = "1" indicatif = "0.17" diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index 0380e98..fa990d8 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -2,8 +2,10 @@ use std::collections::HashMap; use std::fs; use std::io; use std::path::Path; +use std::time::{Duration, Instant}; -use obisys::{Reporter, Stage, progress_bar, spinner}; +use crossbeam_channel::unbounded; +use obisys::{CpuSample, Reporter, Stage, progress_bar, spinner}; use tracing::{debug, info}; use obilayeredmap::IndexMode; @@ -191,48 +193,97 @@ impl KmerIndex { let mut order: Vec = (0..n_partitions).collect(); order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i])); - // ── First partition (largest) ───────────────────────────────────── - let worst_id = order[0]; - let worst_bytes = partition_sizes[worst_id]; + // ── Adaptive worker pool ────────────────────────────────────────── + // Start with 1 worker thread. After each completed partition, + // measure CPU efficiency (via getrusage delta). If efficiency is + // below the spawn threshold and more partitions remain, spawn one + // additional worker. Workers share a crossbeam channel of partition + // IDs; each reports (id, g_len, duration) on a result channel. + const SPAWN_THRESHOLD: f64 = 0.95; // spawn when >5% capacity idle + let n_cores = std::thread::available_parallelism() + .map(|n| n.get()).unwrap_or(1); + let max_workers = (n_cores / 2).max(1); + let _ = budget_fraction; // kept in signature for CLI compatibility - let worst_g_len = dst_partition - .merge_partition(worst_id, &srcs, mode, n_dst_genomes, block_bits, &evidence) - .map_err(OKIError::Partition)?; - pb.inc(1); + let (part_tx, part_rx) = unbounded::(); + let (result_tx, result_rx) = + unbounded::<(usize, Result, Duration)>(); + // activate_tx: controller sends () to wake the next dormant worker. + // Dropping activate_tx closes the channel; dormant workers exit. + let (activate_tx, activate_rx) = unbounded::<()>(); - info!( - "merge_partitions: first partition {} — {} unitig bytes → {} new kmers", - worst_id, fmt_bytes(worst_bytes), worst_g_len, - ); + for &i in &order { + part_tx.send(i).ok(); + } + drop(part_tx); let mut part_stats: Vec = Vec::with_capacity(n_partitions); - part_stats.push(PartStat { - id: worst_id, - unitig_bytes: worst_bytes, - g_len: worst_g_len, - }); + let mut n_workers = 0usize; + let mut cpu_sample = CpuSample::now(); - // ── Sequential remainder ────────────────────────────────────────── - // One partition at a time; each partition uses an internal pipeline - // (obipipeline) to parallelise file I/O and dst_map filtering. - let _ = budget_fraction; // kept in signature for CLI compatibility - for &i in &order[1..] { - let ubytes = partition_sizes[i]; - debug!("partition {i}: start — {} unitig bytes", fmt_bytes(ubytes)); + // Shadow as references so closures can capture them by copy. + let srcs = &srcs; + let evidence = &evidence; - let g_len = dst_partition - .merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence) - .map_err(OKIError::Partition)?; - pb.inc(1); + std::thread::scope(|s| -> OKIResult<()> { + // Pre-spawn max_workers threads; each waits for an activation + // signal before consuming from part_rx. + for _ in 0..max_workers { + let prx = part_rx.clone(); + let rtx = result_tx.clone(); + let arx = activate_rx.clone(); + s.spawn(move || { + if arx.recv().is_ok() { + for i in &prx { + let t = Instant::now(); + let r = dst_partition.merge_partition( + i, srcs, mode, n_dst_genomes, block_bits, evidence, + ); + rtx.send((i, r, t.elapsed())).ok(); + } + } + }); + } + drop(result_tx); - debug!("partition {i}: done — {} new kmers", g_len); - part_stats.push(PartStat { id: i, unitig_bytes: ubytes, g_len }); - } + // Activate first worker immediately. + activate_tx.send(()).ok(); + n_workers = 1; + + let mut completed = 0usize; + while completed < n_partitions { + let (i, r, dur) = result_rx.recv() + .map_err(|_| OKIError::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, "worker channel closed")))?; + let g_len = r.map_err(OKIError::Partition)?; + pb.inc(1); + debug!("partition {i}: done in {:.1}s — {} new kmers", + dur.as_secs_f64(), g_len); + part_stats.push(PartStat { + id: i, unitig_bytes: partition_sizes[i], g_len, + }); + completed += 1; + + if n_workers < max_workers && completed < n_partitions { + let eff = cpu_sample.cpu_efficiency(n_cores); + if eff < SPAWN_THRESHOLD { + activate_tx.send(()).ok(); + n_workers += 1; + cpu_sample = CpuSample::now(); + debug!("activated worker {n_workers} — efficiency {:.0}%", + eff * 100.0); + } + } + } + // Close activate_tx: dormant workers exit cleanly. + drop(activate_tx); + Ok(()) + })?; pb.finish_and_clear(); // ── Diagnostic report ───────────────────────────────────────────── - print_merge_partition_report(&part_stats); + print_merge_partition_report(&part_stats, n_workers, max_workers); rep.push(t.stop()); } @@ -254,7 +305,7 @@ impl KmerIndex { // ── Diagnostic report ───────────────────────────────────────────────────────── -fn print_merge_partition_report(stats: &[PartStat]) { +fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_workers: usize) { let total_new: usize = stats.iter().map(|s| s.g_len).sum(); let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count(); @@ -268,6 +319,9 @@ fn print_merge_partition_report(stats: &[PartStat]) { " {} partition(s) processed, {} total new kmers", non_empty, total_new, ); + info!( + " workers spawned: {n_workers} / {max_workers} (max)", + ); // Top 8 partitions by new-kmer count let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect(); diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs index 5f8a3d3..aeccb1b 100644 --- a/src/obisys/src/lib.rs +++ b/src/obisys/src/lib.rs @@ -212,6 +212,39 @@ fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 } // Monotonically increasing counters — negative delta would be a kernel bug. fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 } +// ── CpuSample ───────────────────────────────────────────────────────────────── + +/// Snapshot of process-wide CPU time + wall clock at a point in time. +/// Use [`cpu_efficiency`](Self::cpu_efficiency) to measure the fraction of +/// available cores used since the snapshot was taken. +pub struct CpuSample { + wall: Instant, + user_secs: f64, + sys_secs: f64, +} + +impl CpuSample { + pub fn now() -> Self { + let ru = get_rusage(); + Self { + wall: Instant::now(), + user_secs: tv_to_secs(ru.ru_utime), + sys_secs: tv_to_secs(ru.ru_stime), + } + } + + /// (user_delta + sys_delta) / (wall_delta × n_cores) since this snapshot. + /// Returns 0.0 if less than 100 ms have elapsed (too noisy). + pub fn cpu_efficiency(&self, n_cores: usize) -> f64 { + let ru = get_rusage(); + let wall = self.wall.elapsed().as_secs_f64(); + if wall < 0.1 { return 0.0; } + let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs) + + (tv_to_secs(ru.ru_stime) - self.sys_secs); + cpu / (wall * n_cores as f64) + } +} + // ── public API ──────────────────────────────────────────────────────────────── /// Snapshot taken at the start of a pipeline stage.