Push rtnzuqxzmkon #31

Merged
coissac merged 2 commits from push-rtnzuqxzmkon into main 2026-06-15 09:40:36 +00:00
2 changed files with 19 additions and 192 deletions
Showing only changes of commit 4a64718fd1 - Show all commits
+13 -191
View File
@@ -2,10 +2,8 @@ use std::collections::HashMap;
use std::fs; use std::fs;
use std::io; use std::io;
use std::path::Path; use std::path::Path;
use std::time::{Duration, Instant};
use crossbeam_channel::unbounded; use obisys::{Reporter, Stage, progress_bar, spinner};
use obisys::{CpuSample, Reporter, Stage, progress_bar, spinner};
use tracing::{debug, info}; use tracing::{debug, info};
use obilayeredmap::IndexMode; use obilayeredmap::IndexMode;
@@ -26,24 +24,6 @@ struct PartStat {
g_len: usize, g_len: usize,
} }
// ── adaptive spawn criterion ──────────────────────────────────────────────────
// First worker: spawn if efficiency < SPAWN_THRESHOLD (CPU is underutilised).
// Subsequent workers: spawn only if the last spawn raised efficiency by at
// least the expected marginal gain (1/n_workers), with a minimum floor of 3%
// to avoid spurious spawns when efficiency fluctuates around the threshold.
const SPAWN_THRESHOLD: f64 = 0.95;
const MIN_MARGINAL_GAIN: f64 = 0.03;
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
if n_workers == 1 {
eff < SPAWN_THRESHOLD
} else {
let gain = eff - eff_at_last_spawn;
let expected = 1.0 / n_workers as f64;
gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN)
}
}
// ── main merge entry point ──────────────────────────────────────────────────── // ── main merge entry point ────────────────────────────────────────────────────
impl KmerIndex { impl KmerIndex {
@@ -241,191 +221,33 @@ impl KmerIndex {
let mut order: Vec<usize> = (0..n_partitions).collect(); let mut order: Vec<usize> = (0..n_partitions).collect();
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i])); order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
// ── Adaptive worker pool ──────────────────────────────────────────
// Default (non-NUMA): start with 1 worker, grow adaptively up to
// n_cores/2 based on CPU efficiency.
//
// NUMA mode (Linux, multi-node): one pinned Rayon ThreadPool per
// NUMA node, workers_per_node workers per node, all pre-activated.
// No adaptive spawn: the optimal count is fixed by memory bandwidth.
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let max_workers = (n_cores / 2).max(1);
let _ = budget_fraction; // kept in signature for CLI compatibility let _ = budget_fraction; // kept in signature for CLI compatibility
let numa = crate::numa::build();
// effective_max_workers: slots to pre-spawn.
// numa_all_active: whether to activate all slots immediately.
let (effective_max_workers, numa_all_active) = match &numa {
Some(ns) => (ns.pools.len() * ns.workers_per_node(), true),
None => (max_workers, false),
};
let (part_tx, part_rx) = unbounded::<usize>();
let (result_tx, result_rx) =
unbounded::<(usize, Result<usize, obiskio::SKError>, Duration)>();
// activate_tx: controller sends () to wake the next dormant worker.
// Dropping activate_tx closes the channel; dormant workers exit.
let (activate_tx, activate_rx) = unbounded::<()>();
for &i in &order {
part_tx.send(i).ok();
}
drop(part_tx);
let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
let mut n_workers = 0usize;
let mut cpu_sample = CpuSample::now();
// Efficiency measured just before each spawn, used to assess
// whether the previous worker delivered its expected marginal gain.
let mut efficiency_at_last_spawn = 0.0f64;
// Shadow as references so closures can capture them by copy. // Shadow as references so closures can capture them by copy.
let srcs = &srcs; let srcs = &srcs;
let evidence = &evidence; let evidence = &evidence;
if let Some(ns) = &numa { let runner = crate::numa::PartitionRunner::new();
debug!( let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
"NUMA mode: {} node(s) × {} worker(s)/node = {} total workers",
ns.pools.len(),
ns.workers_per_node(),
effective_max_workers,
);
}
std::thread::scope(|s| -> OKIResult<()> { runner.run(
// Pre-spawn threads. In NUMA mode each thread is pinned to its &order,
// node's CPUs and wraps merge_partition in pool.install() so |i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence),
// that all Rayon calls use the node-local ThreadPool, and |i, g_len, dur| {
// Linux first-touch places graph allocations in local DRAM.
for worker_idx in 0..effective_max_workers {
let prx = part_rx.clone();
let rtx = result_tx.clone();
let arx = activate_rx.clone();
// Per-worker NUMA config: (pool, cpus) for this slot.
let numa_config: Option<(std::sync::Arc<rayon::ThreadPool>, Vec<usize>)> =
numa.as_ref().map(|ns| {
let wpn = ns.workers_per_node();
let node = worker_idx / wpn;
(
std::sync::Arc::clone(&ns.pools[node]),
ns.cpus_per_node[node].clone(),
)
});
s.spawn(move || {
if let Some((_, ref cpus)) = numa_config {
crate::numa::pin_current_thread(cpus);
}
if arx.recv().is_ok() {
for i in &prx {
let t = Instant::now();
let r = if let Some((ref pool, _)) = numa_config {
pool.install(|| {
dst_partition.merge_partition(
i, srcs, mode, n_dst_genomes, block_bits, evidence,
)
})
} else {
dst_partition.merge_partition(
i, srcs, mode, n_dst_genomes, block_bits, evidence,
)
};
rtx.send((i, r, t.elapsed())).ok();
}
}
});
}
drop(result_tx);
if numa_all_active {
// NUMA: activate every worker immediately.
for _ in 0..effective_max_workers {
activate_tx.send(()).ok();
}
n_workers = effective_max_workers;
} else {
// Non-NUMA: activate first worker, grow adaptively.
activate_tx.send(()).ok();
n_workers = 1;
}
const SPAWN_POLL: Duration = Duration::from_secs(20);
let mut completed = 0usize;
while completed < n_partitions {
let result = result_rx.recv_timeout(SPAWN_POLL);
let (i, r, dur) = match result {
Ok(v) => v,
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
if !numa_all_active && n_workers < effective_max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
debug!(
"activated worker {} (poll) — efficiency {:.0}%",
n_workers + 1,
eff * 100.0,
);
efficiency_at_last_spawn = eff;
activate_tx.send(()).ok();
n_workers += 1;
cpu_sample = CpuSample::now();
}
}
continue;
}
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
return Err(OKIError::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"worker channel closed",
)));
}
};
let g_len = r.map_err(OKIError::Partition)?;
pb.inc(1); pb.inc(1);
debug!( debug!(
"partition {i}: done in {:.1}s — {} new kmers", "partition {i}: done in {:.1}s — {} new kmers",
dur.as_secs_f64(), dur.as_secs_f64(),
g_len
);
part_stats.push(PartStat {
id: i,
unitig_bytes: partition_sizes[i],
g_len, g_len,
});
completed += 1;
if !numa_all_active && n_workers < effective_max_workers && completed < n_partitions {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
debug!(
"activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%",
n_workers + 1,
eff * 100.0,
(eff - efficiency_at_last_spawn) * 100.0,
); );
efficiency_at_last_spawn = eff; part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len });
activate_tx.send(()).ok(); },
n_workers += 1; ).map_err(OKIError::Partition)?;
cpu_sample = CpuSample::now();
}
}
}
// Dropping activate_tx signals dormant workers to exit cleanly
// (non-NUMA). In NUMA mode all workers were already activated so
// this drop is just cleanup.
drop(activate_tx);
Ok(())
})?;
pb.finish_and_clear(); pb.finish_and_clear();
// ── Diagnostic report ───────────────────────────────────────────── // ── Diagnostic report ─────────────────────────────────────────────
print_merge_partition_report(&part_stats, n_workers, effective_max_workers); print_merge_partition_report(&part_stats, runner.max_workers());
rep.push(t.stop()); rep.push(t.stop());
} }
@@ -447,7 +269,7 @@ impl KmerIndex {
// ── Diagnostic report ───────────────────────────────────────────────────────── // ── Diagnostic report ─────────────────────────────────────────────────────────
fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_workers: usize) { fn print_merge_partition_report(stats: &[PartStat], max_workers: usize) {
let total_new: usize = stats.iter().map(|s| s.g_len).sum(); let total_new: usize = stats.iter().map(|s| s.g_len).sum();
let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count(); let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count();
@@ -461,7 +283,7 @@ fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_worker
" {} partition(s) processed, {} total new kmers", " {} partition(s) processed, {} total new kmers",
non_empty, total_new, non_empty, total_new,
); );
info!(" workers spawned: {n_workers} / {max_workers} (max)",); info!(" max workers: {max_workers}");
// Top 8 partitions by new-kmer count // Top 8 partitions by new-kmer count
let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect(); let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect();
+5
View File
@@ -154,6 +154,11 @@ pub struct PartitionRunner {
impl PartitionRunner { impl PartitionRunner {
/// Detect topology and build. Falls back to a single-node UMA runner on /// Detect topology and build. Falls back to a single-node UMA runner on
/// macOS, single-socket machines, or hwloc failure. /// macOS, single-socket machines, or hwloc failure.
/// Total number of pre-spawned worker slots across all nodes.
pub fn max_workers(&self) -> usize {
self.nodes.iter().map(|n| n.max_workers).sum()
}
pub fn new() -> Self { pub fn new() -> Self {
let n_cores = std::thread::available_parallelism() let n_cores = std::thread::available_parallelism()
.map(|n| n.get()) .map(|n| n.get())