refactor(numa): replace flat runner with per-node activation channels
Shifts the NUMA-aware runner from a flat, round-robin model to a per-node architecture using dedicated `NodeActivation` channels. Replaces absolute deltas with relative scaling based on the previous growth step's worker count, decoupling growth from node count to fix slow ramp-up and enforce per-node fairness. Updates architecture documentation to reflect these changes and focus tuning questions on `INITIAL`/`GROWTH_DIVISOR` parameters for I/O-bound validation.
This commit is contained in:
@@ -205,14 +205,28 @@ unconditionally — no `||` short-circuit — so neither window starves behind
|
||||
whichever signal fires first:
|
||||
|
||||
```rust
|
||||
let cpu_wants_more = cpu_sample.do_i_activate(CPU_SPAWN_THRESHOLD);
|
||||
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
|
||||
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
|
||||
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
|
||||
if cpu_wants_more || io_wants_more {
|
||||
activate_tx.send(()).ok();
|
||||
...
|
||||
activation.grow(GROWTH_DIVISOR, n_total);
|
||||
}
|
||||
```
|
||||
|
||||
The CPU threshold is *not* the flat absolute delta it started as: it scales
|
||||
with `activation.last_step()` — the number of workers activated in the last
|
||||
growth step, tracked by `NodeActivation` (`numa.rs`) and updated every time
|
||||
`grow()` actually grows something. Growing by 8 workers should add ~8 cores of
|
||||
efficiency if the workload is truly CPU-bound; requiring only
|
||||
`CPU_SPAWN_THRESHOLD` (20 %) of that expected gain confirms the growth was
|
||||
useful without demanding perfect linear scaling. Scaling by the *last step's
|
||||
size* rather than the cumulative total keeps the bar equally meaningful
|
||||
whether it's the 2nd growth step or the 20th — a flat absolute threshold
|
||||
(0.2 core) is a strong signal at 8 active workers but pure noise at 150; a
|
||||
threshold scaled by the *cumulative* total instead (considered and rejected)
|
||||
would have made the bar essentially impossible to clear late in the ramp,
|
||||
strangling exactly the CPU-bound saturation the mechanism exists to allow.
|
||||
|
||||
Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit),
|
||||
raw I/O throughput has no natural scale across devices, so `IoSample` uses a
|
||||
**relative** growth threshold instead of an absolute one:
|
||||
@@ -242,27 +256,64 @@ the ~94-core artefact above) — one fix for both problems, and it removes the
|
||||
need for any arbitrary I/O-rate floor: a short/noisy window is rejected
|
||||
outright rather than papered over with a hardware-dependent constant.
|
||||
|
||||
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, both `0.2`)
|
||||
are defined as `const` in `PartitionRunner::run` (`numa.rs`). The I/O value is
|
||||
a starting point, not a derived one — needs empirical validation against a
|
||||
real `pack` run.
|
||||
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, module-level
|
||||
`const` in `numa.rs`, both `0.2`) are a starting point, not a derived value:
|
||||
`0.2` (20 % relative growth) for `IoSample` was chosen to match the CPU
|
||||
threshold's *implicit* relative sensitivity (in the observed log, an 8→9
|
||||
worker step raised efficiency by ~12 %) — but I/O throughput is lumpier than
|
||||
CPU time (buffered writes flush in bursts), so it needs empirical validation
|
||||
against a real `pack` run before being considered final.
|
||||
|
||||
Starting threshold: `0.2` (20 % relative growth) for `IoSample`, same order of
|
||||
magnitude as the CPU threshold's *implicit* relative sensitivity (in the
|
||||
observed log, an 8→9 worker step raised efficiency by ~12 %). This is a
|
||||
starting point, not a derived value — I/O throughput is lumpier than CPU time
|
||||
(buffered writes flush in bursts), so it needs empirical validation against a
|
||||
real `pack` run before being considered final.
|
||||
## Known issue: ramp-up too slow, and confused with node count
|
||||
|
||||
The original design started `n_nodes` workers (one per node) and grew one
|
||||
worker at a time. On a real `filter` run this took ~10 minutes to climb from
|
||||
9 to ~40 active workers even on the CPU-bound `rebuild` stage — most of a
|
||||
35-minute stage spent under-provisioned while waiting for evidence to
|
||||
accumulate one worker at a time. There is no scale-down mechanism (`n_active`
|
||||
only grows), so the original caution was deliberate — but a quarter of
|
||||
available cores is still far from saturation, and the real risk zone (over-provisioning
|
||||
a memory-bandwidth-bound stage) only shows up much later in the ramp, near
|
||||
full occupancy — not at 25 %.
|
||||
|
||||
The fix decouples ramp speed from node *count*: both the initial size and the
|
||||
growth step are a fraction of `workers_per_node` (node *size*), applied
|
||||
identically on every node. A single-NUMA-node (UMA) machine ramps exactly as
|
||||
fast as an 8-node one — growing by `n_nodes` per step, as first considered,
|
||||
would have degenerated to "grow by 1" on UMA, reproducing the original
|
||||
problem for exactly the machines that need the fix most.
|
||||
|
||||
```rust
|
||||
// NodeActivation::grow — called both at startup (activate_initial) and on
|
||||
// every CPU/IO-triggered growth step, with a different divisor each time.
|
||||
let wanted = (self.caps[idx] / divisor).max(1); // INITIAL_DIVISOR=4 at startup, GROWTH_DIVISOR=8 per step
|
||||
let room = self.caps[idx].saturating_sub(self.active[idx]);
|
||||
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
|
||||
```
|
||||
|
||||
This also fixed a latent correctness gap: the original single shared
|
||||
`activate_tx`/`activate_rx` pair had *no* per-node addressing — sending one
|
||||
activation signal woke up whichever dormant worker (from any node) happened
|
||||
to win the race on that channel. `crossbeam_channel` gives no fairness
|
||||
guarantee across competing receivers, so "round-robin across nodes" was an
|
||||
assumption the code never actually enforced. `PartitionRunner::run` now opens
|
||||
one activation channel per node (`activate_txs`/`activate_rxs`, one pair per
|
||||
`NodeConfig`); `NodeActivation` (`numa.rs`) tracks how many of each node's
|
||||
dormant workers have been woken and grows every node by the same amount per
|
||||
step, capped by that node's remaining dormant workers and by the run's total
|
||||
budget (`n_total`) — balance across nodes is now guaranteed by construction,
|
||||
not incidental to channel implementation details.
|
||||
|
||||
## Open questions
|
||||
|
||||
- **Error handling**: `run` currently returns the first error; remaining errors
|
||||
are dropped. A `Vec<E>` return would give complete diagnostics.
|
||||
|
||||
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
|
||||
for merge on BeeGFS. Superseded by the I/O signal above for the "more
|
||||
workers would help despite flat CPU" case — a per-call override may still be
|
||||
worth keeping as a manual escape hatch.
|
||||
- **`INITIAL_DIVISOR` / `GROWTH_DIVISOR` tuning**: currently `4` and `8`
|
||||
(start at 1/4 of a node's cores, grow by 1/8 per step), chosen to fix an
|
||||
observed too-slow ramp — not yet validated against a real `pack` (I/O-bound)
|
||||
run, where over-provisioning risk is different from the CPU-bound `rebuild`
|
||||
case this was tuned against.
|
||||
|
||||
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
|
||||
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
|
||||
|
||||
+217
-91
@@ -70,7 +70,10 @@ pub fn build() -> NumaSetup {
|
||||
nodes.len(),
|
||||
nodes.first().map_or(0, |v| v.len()),
|
||||
);
|
||||
return NumaSetup { pools, cpus_per_node: nodes };
|
||||
return NumaSetup {
|
||||
pools,
|
||||
cpus_per_node: nodes,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -81,7 +84,7 @@ pub fn build() -> NumaSetup {
|
||||
.unwrap_or(1);
|
||||
debug!("UMA: single synthetic node, {} core(s)", n_cores);
|
||||
NumaSetup {
|
||||
pools: vec![None],
|
||||
pools: vec![None],
|
||||
cpus_per_node: vec![(0..n_cores).collect()],
|
||||
}
|
||||
}
|
||||
@@ -93,7 +96,7 @@ pub fn build() -> NumaSetup {
|
||||
.unwrap_or(1);
|
||||
debug!("UMA: single synthetic node, {} core(s)", n_cores);
|
||||
NumaSetup {
|
||||
pools: vec![None],
|
||||
pools: vec![None],
|
||||
cpus_per_node: vec![(0..n_cores).collect()],
|
||||
}
|
||||
}
|
||||
@@ -102,7 +105,9 @@ pub fn build() -> NumaSetup {
|
||||
/// Silently returns on any error so the thread still runs, just unbound.
|
||||
#[cfg(feature = "numa")]
|
||||
pub fn pin_current_thread(cpu_indices: &[usize]) {
|
||||
let Ok(topology) = Topology::new() else { return };
|
||||
let Ok(topology) = Topology::new() else {
|
||||
return;
|
||||
};
|
||||
let mut cpuset = CpuSet::new();
|
||||
for &idx in cpu_indices {
|
||||
cpuset.set(idx);
|
||||
@@ -132,29 +137,48 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
|
||||
.ok()
|
||||
}
|
||||
|
||||
// ── PartitionRunner ───────────────────────────────────────────────────────────
|
||||
// ── PartitionRunner ─────────────────────────────────────────────────────────
|
||||
|
||||
/// Growth step (fraction of a node's worker capacity added per activation
|
||||
/// event, see [`NodeActivation::grow`]).
|
||||
const GROWTH_DIVISOR: usize = 8;
|
||||
/// Minimum CPU efficiency growth to activate more workers, as a fraction of
|
||||
/// the size of the *last growth step* (e.g. `0.2` after adding 8 workers
|
||||
/// requires the next check to show at least +1.6 cores of growth — 20 % of
|
||||
/// the ~8 cores those 8 workers should contribute if the workload is truly
|
||||
/// CPU-bound). Scaling by the last step's size — not the cumulative total —
|
||||
/// keeps the bar meaningful regardless of how many workers are already
|
||||
/// active, instead of demanding an ever-larger absolute jump as the pool
|
||||
/// grows.
|
||||
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
|
||||
/// Minimum I/O throughput growth (relative) to activate more workers.
|
||||
const IO_SPAWN_THRESHOLD: f64 = 0.2;
|
||||
|
||||
struct NodeConfig {
|
||||
pool: Option<Arc<rayon::ThreadPool>>,
|
||||
cpu_ids: Vec<usize>,
|
||||
pool: Option<Arc<rayon::ThreadPool>>,
|
||||
cpu_ids: Vec<usize>,
|
||||
max_workers: usize,
|
||||
}
|
||||
|
||||
/// Generic NUMA-aware runner for partition-level parallel work.
|
||||
///
|
||||
/// Workers are distributed round-robin across NUMA nodes and pinned to their
|
||||
/// Workers are distributed evenly across NUMA nodes and pinned to their
|
||||
/// node's CPUs. UMA is the degenerate case: one node, no pinning.
|
||||
///
|
||||
/// Workers are pre-spawned dormant and activated one by one as CPU efficiency
|
||||
/// falls below `SPAWN_THRESHOLD`. This avoids over-provisioning on I/O-bound
|
||||
/// or memory-bandwidth-bound workloads while saturating CPU-bound ones.
|
||||
/// Workers are pre-spawned dormant, one activation channel per node so
|
||||
/// growth always targets a specific node rather than whichever dormant
|
||||
/// worker happens to wake up first on a shared channel. Growth (both the
|
||||
/// initial count and each subsequent step) is expressed as a fraction of
|
||||
/// `workers_per_node`, applied identically to every node, so the pace of
|
||||
/// ramp-up depends on node size rather than node count — a single-NUMA-node
|
||||
/// (UMA) machine ramps just as fast as an 8-node one.
|
||||
///
|
||||
/// # Termination
|
||||
///
|
||||
/// ```text
|
||||
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
|
||||
/// drop(result_tx) → result_rx closes → controller loop exits
|
||||
/// drop(activate_tx) → dormant workers exit cleanly
|
||||
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
|
||||
/// drop(result_tx) → result_rx closes → controller loop exits
|
||||
/// drop(activate_txs) → dormant workers exit cleanly
|
||||
/// ```
|
||||
pub struct PartitionRunner {
|
||||
nodes: Vec<NodeConfig>,
|
||||
@@ -175,7 +199,8 @@ impl PartitionRunner {
|
||||
ns.pools.len(),
|
||||
wpn,
|
||||
);
|
||||
let nodes = ns.pools
|
||||
let nodes = ns
|
||||
.pools
|
||||
.into_iter()
|
||||
.zip(ns.cpus_per_node)
|
||||
.map(|(pool, cpu_ids)| NodeConfig {
|
||||
@@ -189,26 +214,24 @@ impl PartitionRunner {
|
||||
|
||||
/// Run `f(i)` for every index in `order`.
|
||||
///
|
||||
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
|
||||
/// fires an efficiency check every `TIMER_SECS` seconds; each completed
|
||||
/// partition resets that timer (forcing an immediate check) and also
|
||||
/// triggers its own inline check. A new worker is activated whenever CPU
|
||||
/// efficiency grows by at least `CPU_SPAWN_THRESHOLD` (absolute, in cores)
|
||||
/// or I/O throughput grows by at least `IO_SPAWN_THRESHOLD` (relative) since
|
||||
/// the last check — whichever resource is the actual bottleneck still shows
|
||||
/// headroom.
|
||||
/// Workers are pre-spawned dormant and activated adaptively, per node:
|
||||
/// `(workers_per_node / INITIAL_DIVISOR).max(1)` are woken immediately on
|
||||
/// every node, then `(workers_per_node / GROWTH_DIVISOR).max(1)` more per
|
||||
/// node each time the check below fires. A timer thread fires that check
|
||||
/// every `TIMER_SECS` seconds; each completed partition resets that timer
|
||||
/// (forcing an immediate check) and also triggers its own inline check. A
|
||||
/// growth step happens whenever CPU efficiency grows by at least
|
||||
/// `CPU_SPAWN_THRESHOLD` of what the last growth step should have
|
||||
/// contributed, or I/O throughput grows by at least `IO_SPAWN_THRESHOLD`
|
||||
/// (relative) since the last check — whichever resource is the actual
|
||||
/// bottleneck still shows headroom.
|
||||
///
|
||||
/// `on_done(i, result, elapsed)` is called from the controller thread as
|
||||
/// each partition completes — suitable for progress bars and result
|
||||
/// aggregation.
|
||||
///
|
||||
/// Returns the first error produced by `f`, if any.
|
||||
pub fn run<F, R, E, C>(
|
||||
&self,
|
||||
order: &[usize],
|
||||
f: F,
|
||||
mut on_done: C,
|
||||
) -> Result<(), E>
|
||||
pub fn run<F, R, E, C>(&self, order: &[usize], f: F, mut on_done: C) -> Result<(), E>
|
||||
where
|
||||
F: Fn(usize) -> Result<R, E> + Send + Sync,
|
||||
R: Send,
|
||||
@@ -220,24 +243,29 @@ impl PartitionRunner {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
|
||||
const IO_SPAWN_THRESHOLD: f64 = 0.2;
|
||||
const TIMER_SECS: u64 = 30;
|
||||
const TIMER_SECS: u64 = 30;
|
||||
const INITIAL_DIVISOR: usize = 4;
|
||||
|
||||
// ── Channels ──────────────────────────────────────────────────────────
|
||||
let (part_tx, part_rx) = unbounded::<usize>();
|
||||
let (activate_tx, activate_rx) = unbounded::<()>();
|
||||
let (part_tx, part_rx) = unbounded::<usize>();
|
||||
// reset_tx: controller → timer ("reset the 30 s window")
|
||||
let (reset_tx, reset_rx) = unbounded::<()>();
|
||||
let (reset_tx, reset_rx) = unbounded::<()>();
|
||||
// event_tx: workers + timer → controller (unified event stream)
|
||||
let (event_tx, event_rx) = unbounded::<WorkerEvent<R, E>>();
|
||||
let (event_tx, event_rx) = unbounded::<WorkerEvent<R, E>>();
|
||||
// One activation channel per node: growth always targets a specific
|
||||
// node, rather than whichever dormant worker happens to win the race
|
||||
// on a channel shared across all nodes.
|
||||
let (activate_txs, activate_rxs): (Vec<_>, Vec<_>) =
|
||||
(0..self.nodes.len()).map(|_| unbounded::<()>()).unzip();
|
||||
|
||||
for &i in order { part_tx.send(i).ok(); }
|
||||
for &i in order {
|
||||
part_tx.send(i).ok();
|
||||
}
|
||||
drop(part_tx);
|
||||
|
||||
let max_workers = self.max_workers();
|
||||
let n_nodes = self.nodes.len();
|
||||
let f = &f;
|
||||
let node_caps: Vec<usize> = self.nodes.iter().map(|n| n.max_workers).collect();
|
||||
let f = &f;
|
||||
|
||||
let mut first_err: Option<E> = None;
|
||||
|
||||
@@ -260,79 +288,92 @@ impl PartitionRunner {
|
||||
}
|
||||
});
|
||||
|
||||
// ── Pre-spawn workers dormant, round-robin across NUMA nodes ──────
|
||||
for w in 0..max_workers {
|
||||
let node = &self.nodes[w % n_nodes];
|
||||
let prx = part_rx.clone();
|
||||
let etx = event_tx.clone();
|
||||
let arx = activate_rx.clone();
|
||||
let pool = node.pool.clone();
|
||||
// ── Pre-spawn workers dormant, grouped by node ────────────────────
|
||||
// Each worker listens on its own node's activation channel only.
|
||||
for (node, arx) in self.nodes.iter().zip(activate_rxs.iter()) {
|
||||
let cpu_ids = &node.cpu_ids;
|
||||
for _ in 0..node.max_workers {
|
||||
let prx = part_rx.clone();
|
||||
let etx = event_tx.clone();
|
||||
let arx = arx.clone();
|
||||
let pool = node.pool.clone();
|
||||
|
||||
s.spawn(move || {
|
||||
if arx.recv().is_err() { return; }
|
||||
if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
|
||||
for i in &prx {
|
||||
let t = Instant::now();
|
||||
let r = match &pool {
|
||||
Some(p) => p.install(|| f(i)),
|
||||
None => f(i),
|
||||
};
|
||||
etx.send(WorkerEvent::Completed(i, r, t.elapsed())).ok();
|
||||
}
|
||||
});
|
||||
s.spawn(move || {
|
||||
if arx.recv().is_err() {
|
||||
return;
|
||||
}
|
||||
if !cpu_ids.is_empty() {
|
||||
pin_current_thread(cpu_ids);
|
||||
}
|
||||
for i in &prx {
|
||||
let t = Instant::now();
|
||||
let r = match &pool {
|
||||
Some(p) => p.install(|| f(i)),
|
||||
None => f(i),
|
||||
};
|
||||
etx.send(WorkerEvent::Completed(i, r, t.elapsed())).ok();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
// Drop controller's event_tx: event_rx closes when all workers +
|
||||
// timer have exited.
|
||||
drop(event_tx);
|
||||
|
||||
// ── Controller ────────────────────────────────────────────────────
|
||||
let initial_workers = n_nodes.min(max_workers).min(n_total);
|
||||
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
|
||||
let mut n_active = initial_workers;
|
||||
let mut activation = NodeActivation::new(&activate_txs, &node_caps, max_workers);
|
||||
activation.activate_initial(INITIAL_DIVISOR, n_total);
|
||||
|
||||
let mut cpu_sample = CpuSample::now();
|
||||
let mut io_sample = IoSample::now();
|
||||
let mut completed = 0usize;
|
||||
let mut io_sample = IoSample::now();
|
||||
let mut completed = 0usize;
|
||||
|
||||
while completed < n_total {
|
||||
let Ok(event) = event_rx.recv() else { break };
|
||||
match event {
|
||||
WorkerEvent::Completed(i, r, dur) => {
|
||||
match r {
|
||||
Ok(v) => on_done(i, v, dur),
|
||||
Err(e) => { if first_err.is_none() { first_err = Some(e); } }
|
||||
Ok(v) => on_done(i, v, dur),
|
||||
Err(e) => {
|
||||
if first_err.is_none() {
|
||||
first_err = Some(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
completed += 1;
|
||||
// Reset the 30 s timer.
|
||||
reset_tx.send(()).ok();
|
||||
// Inline check: same logic as a timer tick.
|
||||
maybe_activate(
|
||||
&activate_tx, &mut n_active, max_workers,
|
||||
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
|
||||
&mut io_sample, IO_SPAWN_THRESHOLD,
|
||||
completed, n_total,
|
||||
&mut activation,
|
||||
&mut cpu_sample,
|
||||
&mut io_sample,
|
||||
completed,
|
||||
n_total,
|
||||
);
|
||||
}
|
||||
WorkerEvent::TimerTick => {
|
||||
maybe_activate(
|
||||
&activate_tx, &mut n_active, max_workers,
|
||||
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
|
||||
&mut io_sample, IO_SPAWN_THRESHOLD,
|
||||
completed, n_total,
|
||||
&mut activation,
|
||||
&mut cpu_sample,
|
||||
&mut io_sample,
|
||||
completed,
|
||||
n_total,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Dormant workers exit when activate_tx closes.
|
||||
drop(activate_tx);
|
||||
// Dormant workers exit once every sender for their node's channel
|
||||
// is dropped — `activate_txs` holds the only ones.
|
||||
drop(activate_txs);
|
||||
// Timer thread exits when reset_tx closes.
|
||||
drop(reset_tx);
|
||||
});
|
||||
|
||||
match first_err {
|
||||
Some(e) => Err(e),
|
||||
None => Ok(()),
|
||||
None => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -344,28 +385,113 @@ enum WorkerEvent<R, E> {
|
||||
TimerTick,
|
||||
}
|
||||
|
||||
/// Tracks how many of each node's dormant workers have been woken, and
|
||||
/// grows every node by the same amount at each step (capped by that node's
|
||||
/// remaining dormant workers and by the run's total budget) so load stays
|
||||
/// balanced across nodes at every point in time — never just "one more
|
||||
/// worker somewhere". Also remembers the size of the last real growth step
|
||||
/// (`last_step`), used to scale the CPU activation threshold to what that
|
||||
/// step could plausibly have contributed (see `maybe_activate`).
|
||||
struct NodeActivation<'a> {
|
||||
txs: &'a [crossbeam_channel::Sender<()>],
|
||||
caps: &'a [usize],
|
||||
active: Vec<usize>,
|
||||
total: usize,
|
||||
max: usize,
|
||||
last_step: usize,
|
||||
}
|
||||
|
||||
impl<'a> NodeActivation<'a> {
|
||||
fn new(txs: &'a [crossbeam_channel::Sender<()>], caps: &'a [usize], max: usize) -> Self {
|
||||
Self {
|
||||
txs,
|
||||
caps,
|
||||
active: vec![0; txs.len()],
|
||||
total: 0,
|
||||
max,
|
||||
last_step: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn total(&self) -> usize {
|
||||
self.total
|
||||
}
|
||||
fn last_step(&self) -> usize {
|
||||
self.last_step
|
||||
}
|
||||
fn max(&self) -> usize {
|
||||
self.max
|
||||
}
|
||||
fn is_full(&self) -> bool {
|
||||
self.total >= self.max
|
||||
}
|
||||
|
||||
/// Wake up to `(node_cap / divisor).max(1)` dormant workers on every
|
||||
/// node, capped by `n_total`. Called once at startup, unconditionally.
|
||||
fn activate_initial(&mut self, divisor: usize, n_total: usize) {
|
||||
self.grow(divisor, n_total);
|
||||
}
|
||||
|
||||
/// Same per-node sizing as [`activate_initial`](Self::activate_initial),
|
||||
/// applied as a growth step. Returns the number of workers actually
|
||||
/// activated (may be less than requested once a node or the total
|
||||
/// budget is exhausted). Updates `last_step` when it actually grew.
|
||||
fn grow(&mut self, divisor: usize, n_total: usize) -> usize {
|
||||
let before = self.total;
|
||||
for idx in 0..self.txs.len() {
|
||||
let wanted = (self.caps[idx] / divisor).max(1);
|
||||
let room = self.caps[idx].saturating_sub(self.active[idx]);
|
||||
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
|
||||
for _ in 0..grow {
|
||||
self.txs[idx].send(()).ok();
|
||||
}
|
||||
self.active[idx] += grow;
|
||||
self.total += grow;
|
||||
}
|
||||
let grew = self.total - before;
|
||||
if grew > 0 {
|
||||
self.last_step = grew;
|
||||
}
|
||||
grew
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_activate(
|
||||
activate_tx: &crossbeam_channel::Sender<()>,
|
||||
n_active: &mut usize,
|
||||
max_workers: usize,
|
||||
cpu_sample: &mut CpuSample,
|
||||
cpu_threshold: f64,
|
||||
io_sample: &mut IoSample,
|
||||
io_threshold: f64,
|
||||
completed: usize,
|
||||
n_total: usize,
|
||||
activation: &mut NodeActivation,
|
||||
cpu_sample: &mut CpuSample,
|
||||
io_sample: &mut IoSample,
|
||||
completed: usize,
|
||||
n_total: usize,
|
||||
) {
|
||||
if *n_active >= max_workers || completed >= n_total { return; }
|
||||
if activation.is_full() || completed >= n_total {
|
||||
return;
|
||||
}
|
||||
|
||||
// Expect roughly 1 core of extra efficiency per worker activated in the
|
||||
// last growth step (CPU-bound case); require at least CPU_SPAWN_THRESHOLD
|
||||
// (20 %) of that expected gain before growing again. Scaling by the last
|
||||
// step's size — not the cumulative total — keeps the bar meaningful
|
||||
// regardless of how many workers are already active: growing by 8 should
|
||||
// always take ~+1.6 cores to confirm, whether that's the 2nd growth step
|
||||
// or the 20th.
|
||||
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
|
||||
|
||||
// Call both unconditionally (no `||` short-circuit): each sampler must
|
||||
// advance its own window every tick, regardless of what the other one
|
||||
// reports, or it would starve behind whichever signal fires first.
|
||||
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
|
||||
let io_wants_more = io_sample.do_i_activate(io_threshold);
|
||||
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD * activation.last_step() as f64);
|
||||
if !(cpu_wants_more || io_wants_more) {
|
||||
return;
|
||||
}
|
||||
|
||||
if cpu_wants_more || io_wants_more {
|
||||
activate_tx.send(()).ok();
|
||||
*n_active += 1;
|
||||
debug!("activated worker {}/{}", n_active, max_workers);
|
||||
let grew = activation.grow(GROWTH_DIVISOR, n_total);
|
||||
if grew > 0 {
|
||||
debug!(
|
||||
"activated {} worker(s) — {}/{} active",
|
||||
grew,
|
||||
activation.total(),
|
||||
activation.max()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user