Push rtnzuqxzmkon #31
@@ -0,0 +1,179 @@
|
|||||||
|
# NUMA-aware partition runner
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
All partition-level parallel loops in obikindex currently fall into two
|
||||||
|
categories:
|
||||||
|
|
||||||
|
**Naive Rayon** — used in `build_layers`, `pack_matrices`, `dump`, `select`,
|
||||||
|
`stats`, `rebuild`, `reindex`:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
(0..n).into_par_iter().for_each(|i| work(i));
|
||||||
|
```
|
||||||
|
|
||||||
|
Threads come from the global Rayon pool with no NUMA awareness. On
|
||||||
|
multi-socket machines this produces cross-socket memory traffic and degrades
|
||||||
|
performance super-linearly (see [NUMA-aware worker pools](numa_worker_pools.md)).
|
||||||
|
|
||||||
|
**Ad-hoc adaptive pool** — used in `merge`:
|
||||||
|
|
||||||
|
A bespoke implementation with pre-spawned workers, channel-based dispatch, and
|
||||||
|
activation control. It handles NUMA correctly but is not reusable.
|
||||||
|
|
||||||
|
Both cases should be replaced by a single generic mechanism.
|
||||||
|
|
||||||
|
## Unified model
|
||||||
|
|
||||||
|
The key insight is that **UMA is just the NUMA case with a single node**. The
|
||||||
|
runner always works the same way: one controller thread per node, each
|
||||||
|
independently managing its own workers with the same adaptive logic. The only
|
||||||
|
difference between UMA and NUMA is the number of nodes and whether workers are
|
||||||
|
pinned.
|
||||||
|
|
||||||
|
```
|
||||||
|
NUMA (k nodes) UMA (1 node)
|
||||||
|
|
||||||
|
controller-0 controller-1 … controller-0
|
||||||
|
│ │ │
|
||||||
|
workers[0] workers[1] workers[0]
|
||||||
|
(pinned) (pinned) (global pool)
|
||||||
|
└───────────────┴──────────────────┘
|
||||||
|
shared work queue
|
||||||
|
```
|
||||||
|
|
||||||
|
On each node, the Rayon `ThreadPool` is pinned to that node's CPUs.
|
||||||
|
`pool.install()` ensures all internal Rayon calls (inside the work function)
|
||||||
|
use the node-local pool. Linux first-touch then places heap allocations in
|
||||||
|
local DRAM automatically.
|
||||||
|
|
||||||
|
On UMA the global Rayon pool is used directly — no pinning, no overhead.
|
||||||
|
|
||||||
|
## Adaptive mechanism
|
||||||
|
|
||||||
|
Each controller follows the same logic regardless of node count:
|
||||||
|
|
||||||
|
1. Pre-spawn `workers_per_node` dormant worker threads (blocked on `activate_rx`).
|
||||||
|
2. Activate the first worker immediately.
|
||||||
|
3. Loop on result channel with a `SPAWN_POLL` timeout:
|
||||||
|
- On result: call `on_done`; check whether to activate the next worker.
|
||||||
|
- On timeout: same check.
|
||||||
|
- Activation criterion: `should_spawn_worker(active, global_efficiency, prev_efficiency)`.
|
||||||
|
4. Drop `activate_tx` when done — dormant workers exit cleanly.
|
||||||
|
|
||||||
|
**Global CPU efficiency** (`CpuSample`, reads `/proc/stat` on Linux) is used by
|
||||||
|
all controllers — no per-node measurement needed. The signal is coarser than
|
||||||
|
per-node efficiency but correct in practice: if any node saturates memory
|
||||||
|
bandwidth, the global efficiency drops and all controllers stop activating
|
||||||
|
workers. Using a standard portable primitive avoids platform-specific CPU
|
||||||
|
accounting and keeps the implementation clean.
|
||||||
|
|
||||||
|
## Proposed API
|
||||||
|
|
||||||
|
```rust
|
||||||
|
pub struct PartitionRunner {
|
||||||
|
// One entry per NUMA node; one entry total on UMA.
|
||||||
|
nodes: Vec<NodeConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct NodeConfig {
|
||||||
|
pool: Option<Arc<rayon::ThreadPool>>, // None = global Rayon pool (UMA)
|
||||||
|
cpu_ids: Vec<usize>, // empty = no pinning (UMA)
|
||||||
|
max_workers: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartitionRunner {
|
||||||
|
/// Detect topology and build the runner.
|
||||||
|
/// Returns a single-node runner on UMA / macOS / hwloc failure.
|
||||||
|
pub fn new() -> Self;
|
||||||
|
|
||||||
|
/// Run `f(i)` for every index in `order`, collecting results.
|
||||||
|
///
|
||||||
|
/// `on_done(i, result, elapsed)` is called under an internal mutex as
|
||||||
|
/// each partition completes — use it for progress bars and aggregation.
|
||||||
|
/// The runner serialises all calls to `on_done` via an internal
|
||||||
|
/// `Arc<Mutex<C>>`, so no `Sync` bound is required on the callback.
|
||||||
|
/// `Send` is required because the Arc clone crosses thread boundaries.
|
||||||
|
///
|
||||||
|
/// Serialisation is free in practice: a partition takes seconds to
|
||||||
|
/// minutes; the callback takes microseconds. Contention is negligible.
|
||||||
|
///
|
||||||
|
/// Returns the first error from `f`, if any.
|
||||||
|
pub fn run<F, R, E, C>(
|
||||||
|
&self,
|
||||||
|
order: &[usize],
|
||||||
|
f: F,
|
||||||
|
on_done: C,
|
||||||
|
) -> Result<(), E>
|
||||||
|
where
|
||||||
|
F: Fn(usize) -> Result<R, E> + Send + Sync,
|
||||||
|
R: Send,
|
||||||
|
E: Send,
|
||||||
|
C: FnMut(usize, R, Duration) + Send; // Send required, Sync is not
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
`order` is caller-supplied so each command chooses its scheduling strategy:
|
||||||
|
largest-first for `merge`, sequential for `build_layers`, etc.
|
||||||
|
|
||||||
|
## Migration examples
|
||||||
|
|
||||||
|
### merge.rs (before: ~180 lines of bespoke machinery)
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let runner = PartitionRunner::new();
|
||||||
|
runner.run(
|
||||||
|
&order,
|
||||||
|
|i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence)
|
||||||
|
.map_err(OKIError::Partition),
|
||||||
|
|i, g_len, dur| {
|
||||||
|
pb.inc(1);
|
||||||
|
debug!("partition {i}: done in {:.1}s — {g_len} new kmers", dur.as_secs_f64());
|
||||||
|
part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len });
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
```
|
||||||
|
|
||||||
|
### index.rs build_layers (before: naive into_par_iter)
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let order: Vec<usize> = (0..n).collect();
|
||||||
|
let runner = PartitionRunner::new();
|
||||||
|
runner.run(
|
||||||
|
&order,
|
||||||
|
|i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits)
|
||||||
|
.map_err(OKIError::Partition),
|
||||||
|
|_, n_kmers, _| {
|
||||||
|
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
|
||||||
|
pb.inc(1);
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
```
|
||||||
|
|
||||||
|
All other sites (`pack_matrices`, `dump`, `select`, etc.) follow the same
|
||||||
|
pattern.
|
||||||
|
|
||||||
|
## Placement
|
||||||
|
|
||||||
|
`PartitionRunner` lives in `obikindex/src/numa.rs` alongside `NumaSetup`.
|
||||||
|
It depends only on standard library primitives and Rayon — no new dependencies.
|
||||||
|
|
||||||
|
A single `PartitionRunner` instance can be built once per command invocation
|
||||||
|
and reused across multiple `run()` calls (e.g. `merge` runs
|
||||||
|
`merge_partitions` then `pack_matrices`).
|
||||||
|
|
||||||
|
## Open questions
|
||||||
|
|
||||||
|
- **Error handling**: `run` currently returns the first error; remaining errors
|
||||||
|
are dropped. A `Vec<E>` return would give complete diagnostics.
|
||||||
|
|
||||||
|
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
|
||||||
|
for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from
|
||||||
|
a higher value. A per-call override could be added to the API.
|
||||||
|
|
||||||
|
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
|
||||||
|
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
|
||||||
|
boundaries); `Sync` is not (only one thread holds the lock at a time).
|
||||||
|
Contention is negligible because a partition takes seconds while the callback
|
||||||
|
takes microseconds. The callback is therefore simple to write (plain
|
||||||
|
`Vec::push`, plain `FnMut`) with no measurable performance cost.
|
||||||
@@ -57,6 +57,7 @@ nav:
|
|||||||
- Sequences: architecture/sequences/invariant.md
|
- Sequences: architecture/sequences/invariant.md
|
||||||
- Kmer index: architecture/index_architecture.md
|
- Kmer index: architecture/index_architecture.md
|
||||||
- NUMA-aware worker pools: architecture/numa_worker_pools.md
|
- NUMA-aware worker pools: architecture/numa_worker_pools.md
|
||||||
|
- NUMA-aware partition runner: architecture/numa_partition_runner.md
|
||||||
|
|
||||||
watch:
|
watch:
|
||||||
- docmd
|
- docmd
|
||||||
|
|||||||
+14
-192
@@ -2,10 +2,8 @@ use std::collections::HashMap;
|
|||||||
use std::fs;
|
use std::fs;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
|
|
||||||
use crossbeam_channel::unbounded;
|
use obisys::{Reporter, Stage, progress_bar, spinner};
|
||||||
use obisys::{CpuSample, Reporter, Stage, progress_bar, spinner};
|
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
|
|
||||||
use obilayeredmap::IndexMode;
|
use obilayeredmap::IndexMode;
|
||||||
@@ -26,24 +24,6 @@ struct PartStat {
|
|||||||
g_len: usize,
|
g_len: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── adaptive spawn criterion ──────────────────────────────────────────────────
|
|
||||||
// First worker: spawn if efficiency < SPAWN_THRESHOLD (CPU is underutilised).
|
|
||||||
// Subsequent workers: spawn only if the last spawn raised efficiency by at
|
|
||||||
// least the expected marginal gain (1/n_workers), with a minimum floor of 3%
|
|
||||||
// to avoid spurious spawns when efficiency fluctuates around the threshold.
|
|
||||||
const SPAWN_THRESHOLD: f64 = 0.95;
|
|
||||||
const MIN_MARGINAL_GAIN: f64 = 0.03;
|
|
||||||
|
|
||||||
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
|
|
||||||
if n_workers == 1 {
|
|
||||||
eff < SPAWN_THRESHOLD
|
|
||||||
} else {
|
|
||||||
let gain = eff - eff_at_last_spawn;
|
|
||||||
let expected = 1.0 / n_workers as f64;
|
|
||||||
gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── main merge entry point ────────────────────────────────────────────────────
|
// ── main merge entry point ────────────────────────────────────────────────────
|
||||||
|
|
||||||
impl KmerIndex {
|
impl KmerIndex {
|
||||||
@@ -241,191 +221,33 @@ impl KmerIndex {
|
|||||||
let mut order: Vec<usize> = (0..n_partitions).collect();
|
let mut order: Vec<usize> = (0..n_partitions).collect();
|
||||||
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
|
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
|
||||||
|
|
||||||
// ── Adaptive worker pool ──────────────────────────────────────────
|
|
||||||
// Default (non-NUMA): start with 1 worker, grow adaptively up to
|
|
||||||
// n_cores/2 based on CPU efficiency.
|
|
||||||
//
|
|
||||||
// NUMA mode (Linux, multi-node): one pinned Rayon ThreadPool per
|
|
||||||
// NUMA node, workers_per_node workers per node, all pre-activated.
|
|
||||||
// No adaptive spawn: the optimal count is fixed by memory bandwidth.
|
|
||||||
let n_cores = std::thread::available_parallelism()
|
|
||||||
.map(|n| n.get())
|
|
||||||
.unwrap_or(1);
|
|
||||||
let max_workers = (n_cores / 2).max(1);
|
|
||||||
let _ = budget_fraction; // kept in signature for CLI compatibility
|
let _ = budget_fraction; // kept in signature for CLI compatibility
|
||||||
|
|
||||||
let numa = crate::numa::build();
|
|
||||||
|
|
||||||
// effective_max_workers: slots to pre-spawn.
|
|
||||||
// numa_all_active: whether to activate all slots immediately.
|
|
||||||
let (effective_max_workers, numa_all_active) = match &numa {
|
|
||||||
Some(ns) => (ns.pools.len() * ns.workers_per_node(), true),
|
|
||||||
None => (max_workers, false),
|
|
||||||
};
|
|
||||||
|
|
||||||
let (part_tx, part_rx) = unbounded::<usize>();
|
|
||||||
let (result_tx, result_rx) =
|
|
||||||
unbounded::<(usize, Result<usize, obiskio::SKError>, Duration)>();
|
|
||||||
// activate_tx: controller sends () to wake the next dormant worker.
|
|
||||||
// Dropping activate_tx closes the channel; dormant workers exit.
|
|
||||||
let (activate_tx, activate_rx) = unbounded::<()>();
|
|
||||||
|
|
||||||
for &i in &order {
|
|
||||||
part_tx.send(i).ok();
|
|
||||||
}
|
|
||||||
drop(part_tx);
|
|
||||||
|
|
||||||
let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
|
|
||||||
let mut n_workers = 0usize;
|
|
||||||
let mut cpu_sample = CpuSample::now();
|
|
||||||
// Efficiency measured just before each spawn, used to assess
|
|
||||||
// whether the previous worker delivered its expected marginal gain.
|
|
||||||
let mut efficiency_at_last_spawn = 0.0f64;
|
|
||||||
|
|
||||||
// Shadow as references so closures can capture them by copy.
|
// Shadow as references so closures can capture them by copy.
|
||||||
let srcs = &srcs;
|
let srcs = &srcs;
|
||||||
let evidence = &evidence;
|
let evidence = &evidence;
|
||||||
|
|
||||||
if let Some(ns) = &numa {
|
let runner = crate::numa::PartitionRunner::new();
|
||||||
debug!(
|
let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
|
||||||
"NUMA mode: {} node(s) × {} worker(s)/node = {} total workers",
|
|
||||||
ns.pools.len(),
|
|
||||||
ns.workers_per_node(),
|
|
||||||
effective_max_workers,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::thread::scope(|s| -> OKIResult<()> {
|
runner.run(
|
||||||
// Pre-spawn threads. In NUMA mode each thread is pinned to its
|
&order,
|
||||||
// node's CPUs and wraps merge_partition in pool.install() so
|
|i| dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence),
|
||||||
// that all Rayon calls use the node-local ThreadPool, and
|
|i, g_len, dur| {
|
||||||
// Linux first-touch places graph allocations in local DRAM.
|
|
||||||
for worker_idx in 0..effective_max_workers {
|
|
||||||
let prx = part_rx.clone();
|
|
||||||
let rtx = result_tx.clone();
|
|
||||||
let arx = activate_rx.clone();
|
|
||||||
|
|
||||||
// Per-worker NUMA config: (pool, cpus) for this slot.
|
|
||||||
let numa_config: Option<(std::sync::Arc<rayon::ThreadPool>, Vec<usize>)> =
|
|
||||||
numa.as_ref().map(|ns| {
|
|
||||||
let wpn = ns.workers_per_node();
|
|
||||||
let node = worker_idx / wpn;
|
|
||||||
(
|
|
||||||
std::sync::Arc::clone(&ns.pools[node]),
|
|
||||||
ns.cpus_per_node[node].clone(),
|
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
s.spawn(move || {
|
|
||||||
if let Some((_, ref cpus)) = numa_config {
|
|
||||||
crate::numa::pin_current_thread(cpus);
|
|
||||||
}
|
|
||||||
if arx.recv().is_ok() {
|
|
||||||
for i in &prx {
|
|
||||||
let t = Instant::now();
|
|
||||||
let r = if let Some((ref pool, _)) = numa_config {
|
|
||||||
pool.install(|| {
|
|
||||||
dst_partition.merge_partition(
|
|
||||||
i, srcs, mode, n_dst_genomes, block_bits, evidence,
|
|
||||||
)
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
dst_partition.merge_partition(
|
|
||||||
i, srcs, mode, n_dst_genomes, block_bits, evidence,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
rtx.send((i, r, t.elapsed())).ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
drop(result_tx);
|
|
||||||
|
|
||||||
if numa_all_active {
|
|
||||||
// NUMA: activate every worker immediately.
|
|
||||||
for _ in 0..effective_max_workers {
|
|
||||||
activate_tx.send(()).ok();
|
|
||||||
}
|
|
||||||
n_workers = effective_max_workers;
|
|
||||||
} else {
|
|
||||||
// Non-NUMA: activate first worker, grow adaptively.
|
|
||||||
activate_tx.send(()).ok();
|
|
||||||
n_workers = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
const SPAWN_POLL: Duration = Duration::from_secs(20);
|
|
||||||
|
|
||||||
let mut completed = 0usize;
|
|
||||||
while completed < n_partitions {
|
|
||||||
let result = result_rx.recv_timeout(SPAWN_POLL);
|
|
||||||
|
|
||||||
let (i, r, dur) = match result {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
|
|
||||||
if !numa_all_active && n_workers < effective_max_workers {
|
|
||||||
let eff = cpu_sample.cpu_efficiency(n_cores);
|
|
||||||
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
|
|
||||||
debug!(
|
|
||||||
"activated worker {} (poll) — efficiency {:.0}%",
|
|
||||||
n_workers + 1,
|
|
||||||
eff * 100.0,
|
|
||||||
);
|
|
||||||
efficiency_at_last_spawn = eff;
|
|
||||||
activate_tx.send(()).ok();
|
|
||||||
n_workers += 1;
|
|
||||||
cpu_sample = CpuSample::now();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
|
|
||||||
return Err(OKIError::Io(io::Error::new(
|
|
||||||
io::ErrorKind::UnexpectedEof,
|
|
||||||
"worker channel closed",
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let g_len = r.map_err(OKIError::Partition)?;
|
|
||||||
pb.inc(1);
|
pb.inc(1);
|
||||||
debug!(
|
debug!(
|
||||||
"partition {i}: done in {:.1}s — {} new kmers",
|
"partition {i}: done in {:.1}s — {} new kmers",
|
||||||
dur.as_secs_f64(),
|
dur.as_secs_f64(),
|
||||||
g_len
|
|
||||||
);
|
|
||||||
part_stats.push(PartStat {
|
|
||||||
id: i,
|
|
||||||
unitig_bytes: partition_sizes[i],
|
|
||||||
g_len,
|
g_len,
|
||||||
});
|
);
|
||||||
completed += 1;
|
part_stats.push(PartStat { id: i, unitig_bytes: partition_sizes[i], g_len });
|
||||||
|
},
|
||||||
if !numa_all_active && n_workers < effective_max_workers && completed < n_partitions {
|
).map_err(OKIError::Partition)?;
|
||||||
let eff = cpu_sample.cpu_efficiency(n_cores);
|
|
||||||
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
|
|
||||||
debug!(
|
|
||||||
"activated worker {} — efficiency {:.0}%, gain vs prev {:.0}%",
|
|
||||||
n_workers + 1,
|
|
||||||
eff * 100.0,
|
|
||||||
(eff - efficiency_at_last_spawn) * 100.0,
|
|
||||||
);
|
|
||||||
efficiency_at_last_spawn = eff;
|
|
||||||
activate_tx.send(()).ok();
|
|
||||||
n_workers += 1;
|
|
||||||
cpu_sample = CpuSample::now();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Dropping activate_tx signals dormant workers to exit cleanly
|
|
||||||
// (non-NUMA). In NUMA mode all workers were already activated so
|
|
||||||
// this drop is just cleanup.
|
|
||||||
drop(activate_tx);
|
|
||||||
Ok(())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
pb.finish_and_clear();
|
pb.finish_and_clear();
|
||||||
|
|
||||||
// ── Diagnostic report ─────────────────────────────────────────────
|
// ── Diagnostic report ─────────────────────────────────────────────
|
||||||
print_merge_partition_report(&part_stats, n_workers, effective_max_workers);
|
print_merge_partition_report(&part_stats, runner.max_workers());
|
||||||
|
|
||||||
rep.push(t.stop());
|
rep.push(t.stop());
|
||||||
}
|
}
|
||||||
@@ -447,7 +269,7 @@ impl KmerIndex {
|
|||||||
|
|
||||||
// ── Diagnostic report ─────────────────────────────────────────────────────────
|
// ── Diagnostic report ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_workers: usize) {
|
fn print_merge_partition_report(stats: &[PartStat], max_workers: usize) {
|
||||||
let total_new: usize = stats.iter().map(|s| s.g_len).sum();
|
let total_new: usize = stats.iter().map(|s| s.g_len).sum();
|
||||||
let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count();
|
let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count();
|
||||||
|
|
||||||
@@ -461,7 +283,7 @@ fn print_merge_partition_report(stats: &[PartStat], n_workers: usize, max_worker
|
|||||||
" {} partition(s) processed, {} total new kmers",
|
" {} partition(s) processed, {} total new kmers",
|
||||||
non_empty, total_new,
|
non_empty, total_new,
|
||||||
);
|
);
|
||||||
info!(" workers spawned: {n_workers} / {max_workers} (max)",);
|
info!(" max workers: {max_workers}");
|
||||||
|
|
||||||
// Top 8 partitions by new-kmer count
|
// Top 8 partitions by new-kmer count
|
||||||
let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect();
|
let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect();
|
||||||
|
|||||||
+254
-1
@@ -10,12 +10,15 @@
|
|||||||
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
|
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
|
||||||
// - any per-node pool fails to build
|
// - any per-node pool fails to build
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use crossbeam_channel::{RecvTimeoutError, unbounded};
|
||||||
use hwlocality::Topology;
|
use hwlocality::Topology;
|
||||||
use hwlocality::cpu::binding::CpuBindingFlags;
|
use hwlocality::cpu::binding::CpuBindingFlags;
|
||||||
use hwlocality::cpu::cpuset::CpuSet;
|
use hwlocality::cpu::cpuset::CpuSet;
|
||||||
use hwlocality::object::types::ObjectType;
|
use hwlocality::object::types::ObjectType;
|
||||||
|
use obisys::CpuSample;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
// ── Public interface ──────────────────────────────────────────────────────────
|
// ── Public interface ──────────────────────────────────────────────────────────
|
||||||
@@ -100,3 +103,253 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
|
|||||||
.build()
|
.build()
|
||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Adaptive spawn heuristic ──────────────────────────────────────────────────
|
||||||
|
//
|
||||||
|
// First worker: spawn if CPU efficiency is below SPAWN_THRESHOLD (machine is
|
||||||
|
// under-utilised). Subsequent workers: spawn only if the last worker raised
|
||||||
|
// efficiency by at least the expected marginal gain (1/n_workers), with a
|
||||||
|
// minimum floor to avoid spurious spawns from efficiency fluctuations.
|
||||||
|
|
||||||
|
const SPAWN_THRESHOLD: f64 = 0.95;
|
||||||
|
const MIN_MARGINAL_GAIN: f64 = 0.03;
|
||||||
|
const SPAWN_POLL: Duration = Duration::from_secs(20);
|
||||||
|
|
||||||
|
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
|
||||||
|
if n_workers == 1 {
|
||||||
|
eff < SPAWN_THRESHOLD
|
||||||
|
} else {
|
||||||
|
let gain = eff - eff_at_last_spawn;
|
||||||
|
let expected = 1.0 / n_workers as f64;
|
||||||
|
gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── PartitionRunner ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
struct NodeConfig {
|
||||||
|
pool: Option<Arc<rayon::ThreadPool>>,
|
||||||
|
cpu_ids: Vec<usize>,
|
||||||
|
max_workers: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generic NUMA-aware runner for partition-level parallel work.
|
||||||
|
///
|
||||||
|
/// Encapsulates worker spawning, NUMA pinning, adaptive activation, and result
|
||||||
|
/// collection. UMA systems are handled as the degenerate case of a single node
|
||||||
|
/// with no pinning.
|
||||||
|
///
|
||||||
|
/// # Model
|
||||||
|
///
|
||||||
|
/// One controller thread per NUMA node (one total on UMA). Each controller
|
||||||
|
/// manages up to `max_workers` dormant workers that drain a shared work queue.
|
||||||
|
/// Workers are activated one at a time; a new worker is added when global CPU
|
||||||
|
/// efficiency justifies it. On NUMA all workers are activated immediately
|
||||||
|
/// (memory bandwidth, not CPU count, is the bottleneck).
|
||||||
|
pub struct PartitionRunner {
|
||||||
|
nodes: Vec<NodeConfig>,
|
||||||
|
n_cores: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartitionRunner {
|
||||||
|
/// Detect topology and build. Falls back to a single-node UMA runner on
|
||||||
|
/// macOS, single-socket machines, or hwloc failure.
|
||||||
|
/// Total number of pre-spawned worker slots across all nodes.
|
||||||
|
pub fn max_workers(&self) -> usize {
|
||||||
|
self.nodes.iter().map(|n| n.max_workers).sum()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new() -> Self {
|
||||||
|
let n_cores = std::thread::available_parallelism()
|
||||||
|
.map(|n| n.get())
|
||||||
|
.unwrap_or(1);
|
||||||
|
|
||||||
|
match build() {
|
||||||
|
Some(ns) => {
|
||||||
|
let wpn = ns.workers_per_node();
|
||||||
|
debug!(
|
||||||
|
"PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node",
|
||||||
|
ns.pools.len(),
|
||||||
|
wpn,
|
||||||
|
);
|
||||||
|
let nodes = ns.pools
|
||||||
|
.into_iter()
|
||||||
|
.zip(ns.cpus_per_node)
|
||||||
|
.map(|(pool, cpu_ids)| NodeConfig {
|
||||||
|
pool: Some(pool),
|
||||||
|
cpu_ids,
|
||||||
|
max_workers: wpn,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Self { nodes, n_cores }
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
let max_workers = (n_cores / 2).max(1);
|
||||||
|
debug!(
|
||||||
|
"PartitionRunner: UMA mode — adaptive up to {} worker(s)",
|
||||||
|
max_workers,
|
||||||
|
);
|
||||||
|
Self {
|
||||||
|
nodes: vec![NodeConfig {
|
||||||
|
pool: None,
|
||||||
|
cpu_ids: vec![],
|
||||||
|
max_workers,
|
||||||
|
}],
|
||||||
|
n_cores,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run `f(i)` for every index in `order`.
|
||||||
|
///
|
||||||
|
/// `on_done(i, result, elapsed)` is called under an internal mutex as each
|
||||||
|
/// partition completes — suitable for progress bars, logging, and result
|
||||||
|
/// aggregation. No `Send` or `Sync` bound is required on the callback.
|
||||||
|
///
|
||||||
|
/// The work queue is shared across all NUMA nodes: any idle worker takes
|
||||||
|
/// the next available partition regardless of node, ensuring load balance.
|
||||||
|
///
|
||||||
|
/// Returns the first error produced by `f`, if any.
|
||||||
|
pub fn run<F, R, E, C>(
|
||||||
|
&self,
|
||||||
|
order: &[usize],
|
||||||
|
f: F,
|
||||||
|
on_done: C,
|
||||||
|
) -> Result<(), E>
|
||||||
|
where
|
||||||
|
F: Fn(usize) -> Result<R, E> + Send + Sync,
|
||||||
|
R: Send,
|
||||||
|
E: Send,
|
||||||
|
C: FnMut(usize, R, Duration) + Send,
|
||||||
|
{
|
||||||
|
let f = Arc::new(f);
|
||||||
|
let on_done = Arc::new(Mutex::new(on_done));
|
||||||
|
let first_err: Arc<Mutex<Option<E>>> = Arc::new(Mutex::new(None));
|
||||||
|
|
||||||
|
// Shared work queue — pre-loaded in caller-supplied order.
|
||||||
|
let (part_tx, part_rx) = unbounded::<usize>();
|
||||||
|
for &i in order {
|
||||||
|
part_tx.send(i).ok();
|
||||||
|
}
|
||||||
|
drop(part_tx);
|
||||||
|
|
||||||
|
let n_cores = self.n_cores;
|
||||||
|
|
||||||
|
std::thread::scope(|s| {
|
||||||
|
for node in &self.nodes {
|
||||||
|
let f = Arc::clone(&f);
|
||||||
|
let on_done = Arc::clone(&on_done);
|
||||||
|
let first_err = Arc::clone(&first_err);
|
||||||
|
let part_rx = part_rx.clone();
|
||||||
|
|
||||||
|
s.spawn(move || {
|
||||||
|
// Per-node result and activation channels.
|
||||||
|
let (result_tx, result_rx) =
|
||||||
|
unbounded::<(usize, Result<R, E>, Duration)>();
|
||||||
|
let (activate_tx, activate_rx) = unbounded::<()>();
|
||||||
|
|
||||||
|
std::thread::scope(|ws| {
|
||||||
|
// Pre-spawn workers (all dormant until activated).
|
||||||
|
for _ in 0..node.max_workers {
|
||||||
|
let prx = part_rx.clone();
|
||||||
|
let rtx = result_tx.clone();
|
||||||
|
let arx = activate_rx.clone();
|
||||||
|
let f = Arc::clone(&f);
|
||||||
|
let pool = node.pool.clone();
|
||||||
|
let cpu_ids = node.cpu_ids.clone();
|
||||||
|
|
||||||
|
ws.spawn(move || {
|
||||||
|
if !cpu_ids.is_empty() {
|
||||||
|
pin_current_thread(&cpu_ids);
|
||||||
|
}
|
||||||
|
if arx.recv().is_err() {
|
||||||
|
return; // never activated — exit cleanly
|
||||||
|
}
|
||||||
|
for i in &prx {
|
||||||
|
let t = Instant::now();
|
||||||
|
let r = match &pool {
|
||||||
|
Some(p) => p.install(|| f(i)),
|
||||||
|
None => f(i),
|
||||||
|
};
|
||||||
|
rtx.send((i, r, t.elapsed())).ok();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// Drop the controller's copy: result_rx disconnects
|
||||||
|
// once all worker copies are also dropped (workers done).
|
||||||
|
drop(result_tx);
|
||||||
|
|
||||||
|
// In NUMA mode activate all workers immediately;
|
||||||
|
// in UMA mode activate one and grow adaptively.
|
||||||
|
let numa_mode = node.pool.is_some();
|
||||||
|
let initial = if numa_mode { node.max_workers } else { 1 };
|
||||||
|
for _ in 0..initial {
|
||||||
|
activate_tx.send(()).ok();
|
||||||
|
}
|
||||||
|
let mut active_workers = initial;
|
||||||
|
let mut cpu_sample = CpuSample::now();
|
||||||
|
let mut eff_at_last_spawn = 0.0f64;
|
||||||
|
|
||||||
|
// Controller loop.
|
||||||
|
loop {
|
||||||
|
match result_rx.recv_timeout(SPAWN_POLL) {
|
||||||
|
Ok((i, r, dur)) => {
|
||||||
|
match r {
|
||||||
|
Ok(v) => {
|
||||||
|
on_done.lock().unwrap()(i, v, dur);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let mut g = first_err.lock().unwrap();
|
||||||
|
if g.is_none() { *g = Some(e); }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !numa_mode && active_workers < node.max_workers {
|
||||||
|
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||||||
|
if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
|
||||||
|
debug!(
|
||||||
|
"activated worker {} — efficiency {:.0}%",
|
||||||
|
active_workers + 1,
|
||||||
|
eff * 100.0,
|
||||||
|
);
|
||||||
|
activate_tx.send(()).ok();
|
||||||
|
active_workers += 1;
|
||||||
|
eff_at_last_spawn = eff;
|
||||||
|
cpu_sample = CpuSample::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(RecvTimeoutError::Timeout) => {
|
||||||
|
if !numa_mode && active_workers < node.max_workers {
|
||||||
|
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||||||
|
if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
|
||||||
|
debug!(
|
||||||
|
"activated worker {} (poll) — efficiency {:.0}%",
|
||||||
|
active_workers + 1,
|
||||||
|
eff * 100.0,
|
||||||
|
);
|
||||||
|
activate_tx.send(()).ok();
|
||||||
|
active_workers += 1;
|
||||||
|
eff_at_last_spawn = eff;
|
||||||
|
cpu_sample = CpuSample::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(RecvTimeoutError::Disconnected) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Signal any dormant workers that were never activated
|
||||||
|
// to exit (UMA mode where max_workers was never reached).
|
||||||
|
drop(activate_tx);
|
||||||
|
}); // ws: waits for all workers of this node
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}); // s: waits for all node controllers
|
||||||
|
|
||||||
|
let mut g = first_err.lock().unwrap();
|
||||||
|
match g.take() {
|
||||||
|
Some(e) => Err(e),
|
||||||
|
None => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user