Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cd2f2f9417 | |||
| 7844239a8e | |||
| 2b37e8aac4 | |||
| 67b4e4da53 | |||
| 66ab4c6db1 | |||
| f84dd539bf | |||
| 6378734e1c | |||
| b3a617cce1 | |||
| 2080e5e8a9 |
@@ -15,6 +15,7 @@ benchmark/simulated_data
|
|||||||
benchmark/specimen_index_presence
|
benchmark/specimen_index_presence
|
||||||
benchmark/specimen_index_count
|
benchmark/specimen_index_count
|
||||||
benchmark/global_index_presence
|
benchmark/global_index_presence
|
||||||
|
benchmark/all_specific
|
||||||
benchmark/global_index_count
|
benchmark/global_index_count
|
||||||
benchmark/stats
|
benchmark/stats
|
||||||
benchmark/reference_index
|
benchmark/reference_index
|
||||||
|
|||||||
@@ -162,14 +162,158 @@ A single `PartitionRunner` instance can be built once per command invocation
|
|||||||
and reused across multiple `run()` calls (e.g. `merge` runs
|
and reused across multiple `run()` calls (e.g. `merge` runs
|
||||||
`merge_partitions` then `pack_matrices`).
|
`merge_partitions` then `pack_matrices`).
|
||||||
|
|
||||||
|
## Known issue: CPU-only activation signal stalls on I/O-bound stages
|
||||||
|
|
||||||
|
Observed on a real `filter` run (109 genomes, 256 partitions, 8×24-core NUMA):
|
||||||
|
`rebuild` (CPU-bound — k-mer construction) scales cleanly from 9 to 43 active
|
||||||
|
workers as `CpuSample::do_i_activate` (`obisys::lib.rs`) sees efficiency climb.
|
||||||
|
`pack_matrices` (I/O-bound — reopens and recomposes per-genome column files
|
||||||
|
into `.pbmx`/`.pcmx`) activates one extra worker then flatlines at 10/192 for
|
||||||
|
the rest of the stage, even though 256 partitions keep completing over several
|
||||||
|
minutes. This matches the documented intent (§ Adaptive mechanism — "avoids
|
||||||
|
over-provisioning ... I/O-bound ... workloads") but conflates two different
|
||||||
|
things: *"CPU is not the bottleneck"* and *"more workers would not help"*. On
|
||||||
|
storage with real queue depth (NVMe, RAID, parallel FS) the second stage could
|
||||||
|
still benefit from more concurrent workers even with flat CPU usage — a signal
|
||||||
|
the current mechanism cannot see.
|
||||||
|
|
||||||
|
A one-off artefact was also found in the same log: right after a stage
|
||||||
|
transition, `do_i_activate` produced a physically impossible spike (efficiency
|
||||||
|
~94 cores on a 192-core box) because it has no minimum-window guard — unlike
|
||||||
|
its sibling `cpu_efficiency`, which returns `0.0` if `wall < 0.1s`
|
||||||
|
(`obisys::lib.rs:260`). `do_i_activate` unconditionally overwrites
|
||||||
|
`self.wall`/`self.user_secs`/`self.sys_secs` even when the elapsed window is
|
||||||
|
too short to be meaningful, so a burst of rapid completions right after
|
||||||
|
activating a worker can divide a real CPU delta by a near-zero wall delta.
|
||||||
|
|
||||||
|
### Implemented: I/O signal + shared debounce guard
|
||||||
|
|
||||||
|
`IoSample` (`obisys::lib.rs`, alongside `CpuSample`) is fed by
|
||||||
|
`read_bytes`/`write_bytes` from `/proc/self/io` on Linux (actual bytes
|
||||||
|
submitted to the block layer — not `rchar`/`wchar`, which also count
|
||||||
|
page-cache hits, and not `ru_inblock`/`ru_oublock`, unreliable on macOS), with
|
||||||
|
a `proc_pid_rusage(RUSAGE_INFO_V4)` fallback on macOS
|
||||||
|
(`ri_diskio_bytesread`/`ri_diskio_byteswritten`, FFI only via `libc`, no new
|
||||||
|
dependency — same pattern as the existing `getrusage` bindings). Any other
|
||||||
|
target degrades gracefully to a signal that never triggers (falls back to
|
||||||
|
CPU-only activation), same pattern as `cgroup_v2_available`.
|
||||||
|
|
||||||
|
`maybe_activate` (`numa.rs`) activates a worker if *either* signal still shows
|
||||||
|
headroom, making `PartitionRunner` adapt to whichever resource is actually the
|
||||||
|
bottleneck without per-call configuration. Both samplers are called
|
||||||
|
unconditionally — no `||` short-circuit — so neither window starves behind
|
||||||
|
whichever signal fires first:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
|
||||||
|
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
|
||||||
|
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
|
||||||
|
if cpu_wants_more || io_wants_more {
|
||||||
|
activation.grow(GROWTH_DIVISOR, n_total);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The CPU threshold is *not* the flat absolute delta it started as: it scales
|
||||||
|
with `activation.last_step()` — the number of workers activated in the last
|
||||||
|
growth step, tracked by `NodeActivation` (`numa.rs`) and updated every time
|
||||||
|
`grow()` actually grows something. Growing by 8 workers should add ~8 cores of
|
||||||
|
efficiency if the workload is truly CPU-bound; requiring only
|
||||||
|
`CPU_SPAWN_THRESHOLD` (20 %) of that expected gain confirms the growth was
|
||||||
|
useful without demanding perfect linear scaling. Scaling by the *last step's
|
||||||
|
size* rather than the cumulative total keeps the bar equally meaningful
|
||||||
|
whether it's the 2nd growth step or the 20th — a flat absolute threshold
|
||||||
|
(0.2 core) is a strong signal at 8 active workers but pure noise at 150; a
|
||||||
|
threshold scaled by the *cumulative* total instead (considered and rejected)
|
||||||
|
would have made the bar essentially impossible to clear late in the ramp,
|
||||||
|
strangling exactly the CPU-bound saturation the mechanism exists to allow.
|
||||||
|
|
||||||
|
Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit),
|
||||||
|
raw I/O throughput has no natural scale across devices, so `IoSample` uses a
|
||||||
|
**relative** growth threshold instead of an absolute one:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||||
|
let elapsed = self.wall.elapsed().as_secs_f64();
|
||||||
|
if elapsed < 0.1 { return false; } // state untouched — window keeps accumulating
|
||||||
|
|
||||||
|
let n = Self::read_bytes();
|
||||||
|
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
|
||||||
|
let activate = if self.previous_rate == 0.0 {
|
||||||
|
rate > 0.0 // bootstrap: any measured throughput is signal
|
||||||
|
} else {
|
||||||
|
(rate - self.previous_rate) / self.previous_rate >= threshold
|
||||||
|
};
|
||||||
|
|
||||||
|
self.bytes = n;
|
||||||
|
self.wall = Instant::now(); // reset only on a real sample
|
||||||
|
activate
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The `elapsed < 0.1s → return false without mutating state` guard was also
|
||||||
|
back-ported into `CpuSample::do_i_activate` (previously missing — source of
|
||||||
|
the ~94-core artefact above) — one fix for both problems, and it removes the
|
||||||
|
need for any arbitrary I/O-rate floor: a short/noisy window is rejected
|
||||||
|
outright rather than papered over with a hardware-dependent constant.
|
||||||
|
|
||||||
|
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, module-level
|
||||||
|
`const` in `numa.rs`, both `0.2`) are a starting point, not a derived value:
|
||||||
|
`0.2` (20 % relative growth) for `IoSample` was chosen to match the CPU
|
||||||
|
threshold's *implicit* relative sensitivity (in the observed log, an 8→9
|
||||||
|
worker step raised efficiency by ~12 %) — but I/O throughput is lumpier than
|
||||||
|
CPU time (buffered writes flush in bursts), so it needs empirical validation
|
||||||
|
against a real `pack` run before being considered final.
|
||||||
|
|
||||||
|
## Known issue: ramp-up too slow, and confused with node count
|
||||||
|
|
||||||
|
The original design started `n_nodes` workers (one per node) and grew one
|
||||||
|
worker at a time. On a real `filter` run this took ~10 minutes to climb from
|
||||||
|
9 to ~40 active workers even on the CPU-bound `rebuild` stage — most of a
|
||||||
|
35-minute stage spent under-provisioned while waiting for evidence to
|
||||||
|
accumulate one worker at a time. There is no scale-down mechanism (`n_active`
|
||||||
|
only grows), so the original caution was deliberate — but a quarter of
|
||||||
|
available cores is still far from saturation, and the real risk zone (over-provisioning
|
||||||
|
a memory-bandwidth-bound stage) only shows up much later in the ramp, near
|
||||||
|
full occupancy — not at 25 %.
|
||||||
|
|
||||||
|
The fix decouples ramp speed from node *count*: both the initial size and the
|
||||||
|
growth step are a fraction of `workers_per_node` (node *size*), applied
|
||||||
|
identically on every node. A single-NUMA-node (UMA) machine ramps exactly as
|
||||||
|
fast as an 8-node one — growing by `n_nodes` per step, as first considered,
|
||||||
|
would have degenerated to "grow by 1" on UMA, reproducing the original
|
||||||
|
problem for exactly the machines that need the fix most.
|
||||||
|
|
||||||
|
```rust
|
||||||
|
// NodeActivation::grow — called both at startup (activate_initial) and on
|
||||||
|
// every CPU/IO-triggered growth step, with a different divisor each time.
|
||||||
|
let wanted = (self.caps[idx] / divisor).max(1); // INITIAL_DIVISOR=4 at startup, GROWTH_DIVISOR=8 per step
|
||||||
|
let room = self.caps[idx].saturating_sub(self.active[idx]);
|
||||||
|
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
|
||||||
|
```
|
||||||
|
|
||||||
|
This also fixed a latent correctness gap: the original single shared
|
||||||
|
`activate_tx`/`activate_rx` pair had *no* per-node addressing — sending one
|
||||||
|
activation signal woke up whichever dormant worker (from any node) happened
|
||||||
|
to win the race on that channel. `crossbeam_channel` gives no fairness
|
||||||
|
guarantee across competing receivers, so "round-robin across nodes" was an
|
||||||
|
assumption the code never actually enforced. `PartitionRunner::run` now opens
|
||||||
|
one activation channel per node (`activate_txs`/`activate_rxs`, one pair per
|
||||||
|
`NodeConfig`); `NodeActivation` (`numa.rs`) tracks how many of each node's
|
||||||
|
dormant workers have been woken and grows every node by the same amount per
|
||||||
|
step, capped by that node's remaining dormant workers and by the run's total
|
||||||
|
budget (`n_total`) — balance across nodes is now guaranteed by construction,
|
||||||
|
not incidental to channel implementation details.
|
||||||
|
|
||||||
## Open questions
|
## Open questions
|
||||||
|
|
||||||
- **Error handling**: `run` currently returns the first error; remaining errors
|
- **Error handling**: `run` currently returns the first error; remaining errors
|
||||||
are dropped. A `Vec<E>` return would give complete diagnostics.
|
are dropped. A `Vec<E>` return would give complete diagnostics.
|
||||||
|
|
||||||
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
|
- **`INITIAL_DIVISOR` / `GROWTH_DIVISOR` tuning**: currently `4` and `8`
|
||||||
for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from
|
(start at 1/4 of a node's cores, grow by 1/8 per step), chosen to fix an
|
||||||
a higher value. A per-call override could be added to the API.
|
observed too-slow ramp — not yet validated against a real `pack` (I/O-bound)
|
||||||
|
run, where over-provisioning risk is different from the CPU-bound `rebuild`
|
||||||
|
case this was tuned against.
|
||||||
|
|
||||||
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
|
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
|
||||||
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
|
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
|
||||||
|
|||||||
Generated
+1
-1
@@ -1704,7 +1704,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "obikmer"
|
name = "obikmer"
|
||||||
version = "1.1.30"
|
version = "1.1.35"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"clap",
|
"clap",
|
||||||
"csv",
|
"csv",
|
||||||
|
|||||||
Binary file not shown.
@@ -1,5 +1,5 @@
|
|||||||
use std::fs::{self, File};
|
use std::fs::{self, File};
|
||||||
use std::io::{self, BufWriter, Write as _};
|
use std::io::{self, BufWriter, Read as _, Write as _};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
use memmap2::Mmap;
|
use memmap2::Mmap;
|
||||||
@@ -171,19 +171,43 @@ impl PackedBitMatrix {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reads just the `n_cols` field from an existing packed matrix's header,
|
||||||
|
/// without mapping the file. Used by `pack_bit_matrix` to tell a genuinely
|
||||||
|
/// complete pack from a stale one that predates a later column-widening.
|
||||||
|
fn packed_bit_matrix_n_cols(path: &Path) -> io::Result<usize> {
|
||||||
|
let mut f = File::open(path)?;
|
||||||
|
let mut header = [0u8; PBMX_HEADER];
|
||||||
|
f.read_exact(&mut header)?;
|
||||||
|
Ok(u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize)
|
||||||
|
}
|
||||||
|
|
||||||
/// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files.
|
/// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files.
|
||||||
pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> {
|
pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> {
|
||||||
let packed_path = dir.join("matrix.pbmx");
|
let packed_path = dir.join("matrix.pbmx");
|
||||||
if packed_path.exists() {
|
|
||||||
// Matrix complete; remove any leftover column files from a killed cleanup.
|
let meta = match MatrixMeta::load(dir) {
|
||||||
if let Ok(meta) = MatrixMeta::load(dir) {
|
Ok(meta) => meta,
|
||||||
|
Err(e) => {
|
||||||
|
// No columnar data pending: either this layer was already
|
||||||
|
// packed and cleaned up (matrix.pbmx complete, nothing left to
|
||||||
|
// do), or genuinely nothing was ever written here.
|
||||||
|
return if packed_path.exists() { Ok(()) } else { Err(e) };
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// A `matrix.pbmx` can already exist here even though columnar data is
|
||||||
|
// still pending — e.g. copied verbatim from a merge's base source
|
||||||
|
// before this layer was widened with more genome columns (see
|
||||||
|
// `obikpartitionner::merge_partition`). Only skip (re-)packing if the
|
||||||
|
// existing file already reflects the current column count; otherwise
|
||||||
|
// the columnar files are newer and must be (re-)packed, overwriting the
|
||||||
|
// stale one — never silently discarded as "leftover cleanup".
|
||||||
|
if packed_bit_matrix_n_cols(&packed_path).ok() == Some(meta.n_cols) {
|
||||||
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
|
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
|
||||||
let _ = fs::remove_file(dir.join("meta.json"));
|
let _ = fs::remove_file(dir.join("meta.json"));
|
||||||
}
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let meta = MatrixMeta::load(dir)?;
|
|
||||||
let n_cols = meta.n_cols;
|
let n_cols = meta.n_cols;
|
||||||
|
|
||||||
// Compute offsets from file sizes — no column data loaded into RAM.
|
// Compute offsets from file sizes — no column data loaded into RAM.
|
||||||
@@ -500,17 +524,26 @@ where T: Clone + Default {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Compute a symmetric `n×n` matrix in parallel by evaluating `f(i,j)` for
|
/// Compute a symmetric `n×n` matrix in parallel by evaluating `f(i,j)` for
|
||||||
/// all upper-triangle pairs. `T: Copy` avoids the `.clone()` needed for the
|
/// all upper-triangle pairs, plus `f(i,i)` for the diagonal. `T: Copy` avoids
|
||||||
/// lower-triangle mirror.
|
/// the `.clone()` needed for the lower-triangle mirror.
|
||||||
|
///
|
||||||
|
/// The diagonal is *not* generally `T::default()`: for a self-comparison,
|
||||||
|
/// `f(i,i)` is often the column's own weight (e.g. intersection-with-self —
|
||||||
|
/// see `pairwise2_matrix`), not zero. Distance finalisations that need a
|
||||||
|
/// zero diagonal (self-distance) already overwrite it explicitly.
|
||||||
pub(crate) fn pairwise_matrix<T>(n: usize, f: impl Fn(usize, usize) -> T + Sync) -> Array2<T>
|
pub(crate) fn pairwise_matrix<T>(n: usize, f: impl Fn(usize, usize) -> T + Sync) -> Array2<T>
|
||||||
where T: Copy + Default + Send {
|
where T: Copy + Default + Send {
|
||||||
let results: Vec<(usize, usize, T)> = upper_pairs(n)
|
let results: Vec<(usize, usize, T)> = upper_pairs(n)
|
||||||
.into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect();
|
.into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect();
|
||||||
fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v)))
|
let mut m = fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v)));
|
||||||
|
for i in 0..n { m[[i, i]] = f(i, i); }
|
||||||
|
m
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Same as `pairwise_matrix` but `f` returns two values that fill two
|
/// Same as `pairwise_matrix` but `f` returns two values that fill two
|
||||||
/// symmetric matrices simultaneously (e.g. intersection + union for Jaccard).
|
/// symmetric matrices simultaneously (e.g. intersection + union for Jaccard).
|
||||||
|
/// The diagonal is `f(i,i)` (e.g. a genome's kmer count intersected with
|
||||||
|
/// itself), not `T::default()` — see `pairwise_matrix` for why that matters.
|
||||||
pub(crate) fn pairwise2_matrix<T>(n: usize, f: impl Fn(usize, usize) -> (T, T) + Sync) -> (Array2<T>, Array2<T>)
|
pub(crate) fn pairwise2_matrix<T>(n: usize, f: impl Fn(usize, usize) -> (T, T) + Sync) -> (Array2<T>, Array2<T>)
|
||||||
where T: Copy + Default + Send {
|
where T: Copy + Default + Send {
|
||||||
let results: Vec<(usize, usize, T, T)> = upper_pairs(n)
|
let results: Vec<(usize, usize, T, T)> = upper_pairs(n)
|
||||||
@@ -523,5 +556,10 @@ where T: Copy + Default + Send {
|
|||||||
m0[[i, j]] = a; m0[[j, i]] = a;
|
m0[[i, j]] = a; m0[[j, i]] = a;
|
||||||
m1[[i, j]] = b; m1[[j, i]] = b;
|
m1[[i, j]] = b; m1[[j, i]] = b;
|
||||||
}
|
}
|
||||||
|
for i in 0..n {
|
||||||
|
let (a, b) = f(i, i);
|
||||||
|
m0[[i, i]] = a;
|
||||||
|
m1[[i, i]] = b;
|
||||||
|
}
|
||||||
(m0, m1)
|
(m0, m1)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use std::fs::{self, File};
|
use std::fs::{self, File};
|
||||||
use std::io::{self, BufWriter, Write as _};
|
use std::io::{self, BufWriter, Read as _, Write as _};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
use memmap2::Mmap;
|
use memmap2::Mmap;
|
||||||
@@ -228,17 +228,44 @@ impl PackedCompactIntMatrix {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reads just the `n_cols` field from an existing packed matrix's header,
|
||||||
|
/// without mapping the file. Used by `pack_compact_int_matrix` to tell a
|
||||||
|
/// genuinely complete pack from a stale one that predates a later
|
||||||
|
/// column-widening.
|
||||||
|
fn packed_int_matrix_n_cols(path: &Path) -> io::Result<usize> {
|
||||||
|
let mut f = File::open(path)?;
|
||||||
|
let mut header = [0u8; PCMX_HEADER];
|
||||||
|
f.read_exact(&mut header)?;
|
||||||
|
Ok(u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize)
|
||||||
|
}
|
||||||
|
|
||||||
/// Build `counts/matrix.pcmx` from existing `col_*.pciv` files.
|
/// Build `counts/matrix.pcmx` from existing `col_*.pciv` files.
|
||||||
pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> {
|
pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> {
|
||||||
let packed_path = dir.join("matrix.pcmx");
|
let packed_path = dir.join("matrix.pcmx");
|
||||||
if packed_path.exists() {
|
|
||||||
if let Ok(meta) = MatrixMeta::load(dir) {
|
let meta = match MatrixMeta::load(dir) {
|
||||||
|
Ok(meta) => meta,
|
||||||
|
Err(e) => {
|
||||||
|
// No columnar data pending: either this layer was already
|
||||||
|
// packed and cleaned up (matrix.pcmx complete, nothing left to
|
||||||
|
// do), or genuinely nothing was ever written here.
|
||||||
|
return if packed_path.exists() { Ok(()) } else { Err(e) };
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// A `matrix.pcmx` can already exist here even though columnar data is
|
||||||
|
// still pending — e.g. copied verbatim from a merge's base source
|
||||||
|
// before this layer was widened with more genome columns (see
|
||||||
|
// `obikpartitionner::merge_partition`). Only skip (re-)packing if the
|
||||||
|
// existing file already reflects the current column count; otherwise
|
||||||
|
// the columnar files are newer and must be (re-)packed, overwriting the
|
||||||
|
// stale one — never silently discarded as "leftover cleanup".
|
||||||
|
if packed_int_matrix_n_cols(&packed_path).ok() == Some(meta.n_cols) {
|
||||||
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
|
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
|
||||||
let _ = fs::remove_file(dir.join("meta.json"));
|
let _ = fs::remove_file(dir.join("meta.json"));
|
||||||
}
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
let meta = MatrixMeta::load(dir)?;
|
|
||||||
let n_cols = meta.n_cols;
|
let n_cols = meta.n_cols;
|
||||||
let col_sizes: Vec<u64> = (0..n_cols)
|
let col_sizes: Vec<u64> = (0..n_cols)
|
||||||
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
|
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
|
||||||
|
|||||||
+194
-51
@@ -20,7 +20,7 @@ use hwlocality::cpu::binding::CpuBindingFlags;
|
|||||||
use hwlocality::cpu::cpuset::CpuSet;
|
use hwlocality::cpu::cpuset::CpuSet;
|
||||||
#[cfg(feature = "numa")]
|
#[cfg(feature = "numa")]
|
||||||
use hwlocality::object::types::ObjectType;
|
use hwlocality::object::types::ObjectType;
|
||||||
use obisys::CpuSample;
|
use obisys::{CpuSample, IoSample};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
// ── Public interface ──────────────────────────────────────────────────────────
|
// ── Public interface ──────────────────────────────────────────────────────────
|
||||||
@@ -70,7 +70,10 @@ pub fn build() -> NumaSetup {
|
|||||||
nodes.len(),
|
nodes.len(),
|
||||||
nodes.first().map_or(0, |v| v.len()),
|
nodes.first().map_or(0, |v| v.len()),
|
||||||
);
|
);
|
||||||
return NumaSetup { pools, cpus_per_node: nodes };
|
return NumaSetup {
|
||||||
|
pools,
|
||||||
|
cpus_per_node: nodes,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -102,7 +105,9 @@ pub fn build() -> NumaSetup {
|
|||||||
/// Silently returns on any error so the thread still runs, just unbound.
|
/// Silently returns on any error so the thread still runs, just unbound.
|
||||||
#[cfg(feature = "numa")]
|
#[cfg(feature = "numa")]
|
||||||
pub fn pin_current_thread(cpu_indices: &[usize]) {
|
pub fn pin_current_thread(cpu_indices: &[usize]) {
|
||||||
let Ok(topology) = Topology::new() else { return };
|
let Ok(topology) = Topology::new() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
let mut cpuset = CpuSet::new();
|
let mut cpuset = CpuSet::new();
|
||||||
for &idx in cpu_indices {
|
for &idx in cpu_indices {
|
||||||
cpuset.set(idx);
|
cpuset.set(idx);
|
||||||
@@ -132,7 +137,22 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
|
|||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── PartitionRunner ───────────────────────────────────────────────────────────
|
// ── PartitionRunner ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Growth step (fraction of a node's worker capacity added per activation
|
||||||
|
/// event, see [`NodeActivation::grow`]).
|
||||||
|
const GROWTH_DIVISOR: usize = 8;
|
||||||
|
/// Minimum CPU efficiency growth to activate more workers, as a fraction of
|
||||||
|
/// the size of the *last growth step* (e.g. `0.2` after adding 8 workers
|
||||||
|
/// requires the next check to show at least +1.6 cores of growth — 20 % of
|
||||||
|
/// the ~8 cores those 8 workers should contribute if the workload is truly
|
||||||
|
/// CPU-bound). Scaling by the last step's size — not the cumulative total —
|
||||||
|
/// keeps the bar meaningful regardless of how many workers are already
|
||||||
|
/// active, instead of demanding an ever-larger absolute jump as the pool
|
||||||
|
/// grows.
|
||||||
|
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
|
||||||
|
/// Minimum I/O throughput growth (relative) to activate more workers.
|
||||||
|
const IO_SPAWN_THRESHOLD: f64 = 0.2;
|
||||||
|
|
||||||
struct NodeConfig {
|
struct NodeConfig {
|
||||||
pool: Option<Arc<rayon::ThreadPool>>,
|
pool: Option<Arc<rayon::ThreadPool>>,
|
||||||
@@ -142,19 +162,23 @@ struct NodeConfig {
|
|||||||
|
|
||||||
/// Generic NUMA-aware runner for partition-level parallel work.
|
/// Generic NUMA-aware runner for partition-level parallel work.
|
||||||
///
|
///
|
||||||
/// Workers are distributed round-robin across NUMA nodes and pinned to their
|
/// Workers are distributed evenly across NUMA nodes and pinned to their
|
||||||
/// node's CPUs. UMA is the degenerate case: one node, no pinning.
|
/// node's CPUs. UMA is the degenerate case: one node, no pinning.
|
||||||
///
|
///
|
||||||
/// Workers are pre-spawned dormant and activated one by one as CPU efficiency
|
/// Workers are pre-spawned dormant, one activation channel per node so
|
||||||
/// falls below `SPAWN_THRESHOLD`. This avoids over-provisioning on I/O-bound
|
/// growth always targets a specific node rather than whichever dormant
|
||||||
/// or memory-bandwidth-bound workloads while saturating CPU-bound ones.
|
/// worker happens to wake up first on a shared channel. Growth (both the
|
||||||
|
/// initial count and each subsequent step) is expressed as a fraction of
|
||||||
|
/// `workers_per_node`, applied identically to every node, so the pace of
|
||||||
|
/// ramp-up depends on node size rather than node count — a single-NUMA-node
|
||||||
|
/// (UMA) machine ramps just as fast as an 8-node one.
|
||||||
///
|
///
|
||||||
/// # Termination
|
/// # Termination
|
||||||
///
|
///
|
||||||
/// ```text
|
/// ```text
|
||||||
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
|
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
|
||||||
/// drop(result_tx) → result_rx closes → controller loop exits
|
/// drop(result_tx) → result_rx closes → controller loop exits
|
||||||
/// drop(activate_tx) → dormant workers exit cleanly
|
/// drop(activate_txs) → dormant workers exit cleanly
|
||||||
/// ```
|
/// ```
|
||||||
pub struct PartitionRunner {
|
pub struct PartitionRunner {
|
||||||
nodes: Vec<NodeConfig>,
|
nodes: Vec<NodeConfig>,
|
||||||
@@ -175,7 +199,8 @@ impl PartitionRunner {
|
|||||||
ns.pools.len(),
|
ns.pools.len(),
|
||||||
wpn,
|
wpn,
|
||||||
);
|
);
|
||||||
let nodes = ns.pools
|
let nodes = ns
|
||||||
|
.pools
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.zip(ns.cpus_per_node)
|
.zip(ns.cpus_per_node)
|
||||||
.map(|(pool, cpu_ids)| NodeConfig {
|
.map(|(pool, cpu_ids)| NodeConfig {
|
||||||
@@ -189,23 +214,24 @@ impl PartitionRunner {
|
|||||||
|
|
||||||
/// Run `f(i)` for every index in `order`.
|
/// Run `f(i)` for every index in `order`.
|
||||||
///
|
///
|
||||||
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
|
/// Workers are pre-spawned dormant and activated adaptively, per node:
|
||||||
/// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed
|
/// `(workers_per_node / INITIAL_DIVISOR).max(1)` are woken immediately on
|
||||||
/// partition resets that timer (forcing an immediate check) and also
|
/// every node, then `(workers_per_node / GROWTH_DIVISOR).max(1)` more per
|
||||||
/// triggers its own inline check. A new worker is activated whenever
|
/// node each time the check below fires. A timer thread fires that check
|
||||||
/// efficiency falls below `SPAWN_THRESHOLD`.
|
/// every `TIMER_SECS` seconds; each completed partition resets that timer
|
||||||
|
/// (forcing an immediate check) and also triggers its own inline check. A
|
||||||
|
/// growth step happens whenever CPU efficiency grows by at least
|
||||||
|
/// `CPU_SPAWN_THRESHOLD` of what the last growth step should have
|
||||||
|
/// contributed, or I/O throughput grows by at least `IO_SPAWN_THRESHOLD`
|
||||||
|
/// (relative) since the last check — whichever resource is the actual
|
||||||
|
/// bottleneck still shows headroom.
|
||||||
///
|
///
|
||||||
/// `on_done(i, result, elapsed)` is called from the controller thread as
|
/// `on_done(i, result, elapsed)` is called from the controller thread as
|
||||||
/// each partition completes — suitable for progress bars and result
|
/// each partition completes — suitable for progress bars and result
|
||||||
/// aggregation.
|
/// aggregation.
|
||||||
///
|
///
|
||||||
/// Returns the first error produced by `f`, if any.
|
/// Returns the first error produced by `f`, if any.
|
||||||
pub fn run<F, R, E, C>(
|
pub fn run<F, R, E, C>(&self, order: &[usize], f: F, mut on_done: C) -> Result<(), E>
|
||||||
&self,
|
|
||||||
order: &[usize],
|
|
||||||
f: F,
|
|
||||||
mut on_done: C,
|
|
||||||
) -> Result<(), E>
|
|
||||||
where
|
where
|
||||||
F: Fn(usize) -> Result<R, E> + Send + Sync,
|
F: Fn(usize) -> Result<R, E> + Send + Sync,
|
||||||
R: Send,
|
R: Send,
|
||||||
@@ -217,22 +243,28 @@ impl PartitionRunner {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
const SPAWN_THRESHOLD: f64 = 0.2;
|
|
||||||
const TIMER_SECS: u64 = 30;
|
const TIMER_SECS: u64 = 30;
|
||||||
|
const INITIAL_DIVISOR: usize = 4;
|
||||||
|
|
||||||
// ── Channels ──────────────────────────────────────────────────────────
|
// ── Channels ──────────────────────────────────────────────────────────
|
||||||
let (part_tx, part_rx) = unbounded::<usize>();
|
let (part_tx, part_rx) = unbounded::<usize>();
|
||||||
let (activate_tx, activate_rx) = unbounded::<()>();
|
|
||||||
// reset_tx: controller → timer ("reset the 30 s window")
|
// reset_tx: controller → timer ("reset the 30 s window")
|
||||||
let (reset_tx, reset_rx) = unbounded::<()>();
|
let (reset_tx, reset_rx) = unbounded::<()>();
|
||||||
// event_tx: workers + timer → controller (unified event stream)
|
// event_tx: workers + timer → controller (unified event stream)
|
||||||
let (event_tx, event_rx) = unbounded::<WorkerEvent<R, E>>();
|
let (event_tx, event_rx) = unbounded::<WorkerEvent<R, E>>();
|
||||||
|
// One activation channel per node: growth always targets a specific
|
||||||
|
// node, rather than whichever dormant worker happens to win the race
|
||||||
|
// on a channel shared across all nodes.
|
||||||
|
let (activate_txs, activate_rxs): (Vec<_>, Vec<_>) =
|
||||||
|
(0..self.nodes.len()).map(|_| unbounded::<()>()).unzip();
|
||||||
|
|
||||||
for &i in order { part_tx.send(i).ok(); }
|
for &i in order {
|
||||||
|
part_tx.send(i).ok();
|
||||||
|
}
|
||||||
drop(part_tx);
|
drop(part_tx);
|
||||||
|
|
||||||
let max_workers = self.max_workers();
|
let max_workers = self.max_workers();
|
||||||
let n_nodes = self.nodes.len();
|
let node_caps: Vec<usize> = self.nodes.iter().map(|n| n.max_workers).collect();
|
||||||
let f = &f;
|
let f = &f;
|
||||||
|
|
||||||
let mut first_err: Option<E> = None;
|
let mut first_err: Option<E> = None;
|
||||||
@@ -256,18 +288,23 @@ impl PartitionRunner {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// ── Pre-spawn workers dormant, round-robin across NUMA nodes ──────
|
// ── Pre-spawn workers dormant, grouped by node ────────────────────
|
||||||
for w in 0..max_workers {
|
// Each worker listens on its own node's activation channel only.
|
||||||
let node = &self.nodes[w % n_nodes];
|
for (node, arx) in self.nodes.iter().zip(activate_rxs.iter()) {
|
||||||
|
let cpu_ids = &node.cpu_ids;
|
||||||
|
for _ in 0..node.max_workers {
|
||||||
let prx = part_rx.clone();
|
let prx = part_rx.clone();
|
||||||
let etx = event_tx.clone();
|
let etx = event_tx.clone();
|
||||||
let arx = activate_rx.clone();
|
let arx = arx.clone();
|
||||||
let pool = node.pool.clone();
|
let pool = node.pool.clone();
|
||||||
let cpu_ids = &node.cpu_ids;
|
|
||||||
|
|
||||||
s.spawn(move || {
|
s.spawn(move || {
|
||||||
if arx.recv().is_err() { return; }
|
if arx.recv().is_err() {
|
||||||
if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
|
return;
|
||||||
|
}
|
||||||
|
if !cpu_ids.is_empty() {
|
||||||
|
pin_current_thread(cpu_ids);
|
||||||
|
}
|
||||||
for i in &prx {
|
for i in &prx {
|
||||||
let t = Instant::now();
|
let t = Instant::now();
|
||||||
let r = match &pool {
|
let r = match &pool {
|
||||||
@@ -278,15 +315,17 @@ impl PartitionRunner {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Drop controller's event_tx: event_rx closes when all workers +
|
// Drop controller's event_tx: event_rx closes when all workers +
|
||||||
// timer have exited.
|
// timer have exited.
|
||||||
drop(event_tx);
|
drop(event_tx);
|
||||||
|
|
||||||
// ── Controller ────────────────────────────────────────────────────
|
// ── Controller ────────────────────────────────────────────────────
|
||||||
let initial_workers = n_nodes.min(max_workers).min(n_total);
|
let mut activation = NodeActivation::new(&activate_txs, &node_caps, max_workers);
|
||||||
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
|
activation.activate_initial(INITIAL_DIVISOR, n_total);
|
||||||
let mut n_active = initial_workers;
|
|
||||||
let mut cpu_sample = CpuSample::now();
|
let mut cpu_sample = CpuSample::now();
|
||||||
|
let mut io_sample = IoSample::now();
|
||||||
let mut completed = 0usize;
|
let mut completed = 0usize;
|
||||||
|
|
||||||
while completed < n_total {
|
while completed < n_total {
|
||||||
@@ -295,28 +334,39 @@ impl PartitionRunner {
|
|||||||
WorkerEvent::Completed(i, r, dur) => {
|
WorkerEvent::Completed(i, r, dur) => {
|
||||||
match r {
|
match r {
|
||||||
Ok(v) => on_done(i, v, dur),
|
Ok(v) => on_done(i, v, dur),
|
||||||
Err(e) => { if first_err.is_none() { first_err = Some(e); } }
|
Err(e) => {
|
||||||
|
if first_err.is_none() {
|
||||||
|
first_err = Some(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
completed += 1;
|
completed += 1;
|
||||||
// Reset the 30 s timer.
|
// Reset the 30 s timer.
|
||||||
reset_tx.send(()).ok();
|
reset_tx.send(()).ok();
|
||||||
// Inline check: same logic as a timer tick.
|
// Inline check: same logic as a timer tick.
|
||||||
maybe_activate(
|
maybe_activate(
|
||||||
&activate_tx, &mut n_active, max_workers,
|
&mut activation,
|
||||||
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
|
&mut cpu_sample,
|
||||||
|
&mut io_sample,
|
||||||
|
completed,
|
||||||
|
n_total,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
WorkerEvent::TimerTick => {
|
WorkerEvent::TimerTick => {
|
||||||
maybe_activate(
|
maybe_activate(
|
||||||
&activate_tx, &mut n_active, max_workers,
|
&mut activation,
|
||||||
&mut cpu_sample, SPAWN_THRESHOLD, completed, n_total,
|
&mut cpu_sample,
|
||||||
|
&mut io_sample,
|
||||||
|
completed,
|
||||||
|
n_total,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dormant workers exit when activate_tx closes.
|
// Dormant workers exit once every sender for their node's channel
|
||||||
drop(activate_tx);
|
// is dropped — `activate_txs` holds the only ones.
|
||||||
|
drop(activate_txs);
|
||||||
// Timer thread exits when reset_tx closes.
|
// Timer thread exits when reset_tx closes.
|
||||||
drop(reset_tx);
|
drop(reset_tx);
|
||||||
});
|
});
|
||||||
@@ -335,20 +385,113 @@ enum WorkerEvent<R, E> {
|
|||||||
TimerTick,
|
TimerTick,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tracks how many of each node's dormant workers have been woken, and
|
||||||
|
/// grows every node by the same amount at each step (capped by that node's
|
||||||
|
/// remaining dormant workers and by the run's total budget) so load stays
|
||||||
|
/// balanced across nodes at every point in time — never just "one more
|
||||||
|
/// worker somewhere". Also remembers the size of the last real growth step
|
||||||
|
/// (`last_step`), used to scale the CPU activation threshold to what that
|
||||||
|
/// step could plausibly have contributed (see `maybe_activate`).
|
||||||
|
struct NodeActivation<'a> {
|
||||||
|
txs: &'a [crossbeam_channel::Sender<()>],
|
||||||
|
caps: &'a [usize],
|
||||||
|
active: Vec<usize>,
|
||||||
|
total: usize,
|
||||||
|
max: usize,
|
||||||
|
last_step: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> NodeActivation<'a> {
|
||||||
|
fn new(txs: &'a [crossbeam_channel::Sender<()>], caps: &'a [usize], max: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
txs,
|
||||||
|
caps,
|
||||||
|
active: vec![0; txs.len()],
|
||||||
|
total: 0,
|
||||||
|
max,
|
||||||
|
last_step: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn total(&self) -> usize {
|
||||||
|
self.total
|
||||||
|
}
|
||||||
|
fn last_step(&self) -> usize {
|
||||||
|
self.last_step
|
||||||
|
}
|
||||||
|
fn max(&self) -> usize {
|
||||||
|
self.max
|
||||||
|
}
|
||||||
|
fn is_full(&self) -> bool {
|
||||||
|
self.total >= self.max
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wake up to `(node_cap / divisor).max(1)` dormant workers on every
|
||||||
|
/// node, capped by `n_total`. Called once at startup, unconditionally.
|
||||||
|
fn activate_initial(&mut self, divisor: usize, n_total: usize) {
|
||||||
|
self.grow(divisor, n_total);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Same per-node sizing as [`activate_initial`](Self::activate_initial),
|
||||||
|
/// applied as a growth step. Returns the number of workers actually
|
||||||
|
/// activated (may be less than requested once a node or the total
|
||||||
|
/// budget is exhausted). Updates `last_step` when it actually grew.
|
||||||
|
fn grow(&mut self, divisor: usize, n_total: usize) -> usize {
|
||||||
|
let before = self.total;
|
||||||
|
for idx in 0..self.txs.len() {
|
||||||
|
let wanted = (self.caps[idx] / divisor).max(1);
|
||||||
|
let room = self.caps[idx].saturating_sub(self.active[idx]);
|
||||||
|
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
|
||||||
|
for _ in 0..grow {
|
||||||
|
self.txs[idx].send(()).ok();
|
||||||
|
}
|
||||||
|
self.active[idx] += grow;
|
||||||
|
self.total += grow;
|
||||||
|
}
|
||||||
|
let grew = self.total - before;
|
||||||
|
if grew > 0 {
|
||||||
|
self.last_step = grew;
|
||||||
|
}
|
||||||
|
grew
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn maybe_activate(
|
fn maybe_activate(
|
||||||
activate_tx: &crossbeam_channel::Sender<()>,
|
activation: &mut NodeActivation,
|
||||||
n_active: &mut usize,
|
|
||||||
max_workers: usize,
|
|
||||||
cpu_sample: &mut CpuSample,
|
cpu_sample: &mut CpuSample,
|
||||||
threshold: f64,
|
io_sample: &mut IoSample,
|
||||||
completed: usize,
|
completed: usize,
|
||||||
n_total: usize,
|
n_total: usize,
|
||||||
) {
|
) {
|
||||||
if *n_active >= max_workers || completed >= n_total { return; }
|
if activation.is_full() || completed >= n_total {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if cpu_sample.do_i_activate(threshold) {
|
// Expect roughly 1 core of extra efficiency per worker activated in the
|
||||||
activate_tx.send(()).ok();
|
// last growth step (CPU-bound case); require at least CPU_SPAWN_THRESHOLD
|
||||||
*n_active += 1;
|
// (20 %) of that expected gain before growing again. Scaling by the last
|
||||||
debug!("activated worker {}/{}", n_active, max_workers);
|
// step's size — not the cumulative total — keeps the bar meaningful
|
||||||
|
// regardless of how many workers are already active: growing by 8 should
|
||||||
|
// always take ~+1.6 cores to confirm, whether that's the 2nd growth step
|
||||||
|
// or the 20th.
|
||||||
|
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
|
||||||
|
|
||||||
|
// Call both unconditionally (no `||` short-circuit): each sampler must
|
||||||
|
// advance its own window every tick, regardless of what the other one
|
||||||
|
// reports, or it would starve behind whichever signal fires first.
|
||||||
|
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
|
||||||
|
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD * activation.last_step() as f64);
|
||||||
|
if !(cpu_wants_more || io_wants_more) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let grew = activation.grow(GROWTH_DIVISOR, n_total);
|
||||||
|
if grew > 0 {
|
||||||
|
debug!(
|
||||||
|
"activated {} worker(s) — {}/{} active",
|
||||||
|
grew,
|
||||||
|
activation.total(),
|
||||||
|
activation.max()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "obikmer"
|
name = "obikmer"
|
||||||
version = "1.1.30"
|
version = "1.1.35"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[[bin]]
|
[[bin]]
|
||||||
|
|||||||
@@ -96,162 +96,5 @@ impl<S: BitPartials> BitPartials for LayeredStore<S> {
|
|||||||
// ── Tests ─────────────────────────────────────────────────────────────────────
|
// ── Tests ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
#[path = "tests/layered_store.rs"]
|
||||||
use super::*;
|
mod tests;
|
||||||
use obicompactvec::{
|
|
||||||
PersistentBitMatrix, PersistentBitMatrixBuilder,
|
|
||||||
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
|
|
||||||
};
|
|
||||||
use tempfile::tempdir;
|
|
||||||
|
|
||||||
fn make_int_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) {
|
|
||||||
let n = cols.first().map_or(0, |c| c.len());
|
|
||||||
let dir = tempdir().unwrap();
|
|
||||||
let mut b = PersistentCompactIntMatrixBuilder::new(n, &dir.path().join("counts")).unwrap();
|
|
||||||
for &col in cols {
|
|
||||||
let mut cb = b.add_col().unwrap();
|
|
||||||
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
|
|
||||||
cb.close().unwrap();
|
|
||||||
}
|
|
||||||
b.close().unwrap();
|
|
||||||
let m = PersistentCompactIntMatrix::open(dir.path()).unwrap();
|
|
||||||
(dir, m)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_bit_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) {
|
|
||||||
let n = cols.first().map_or(0, |c| c.len());
|
|
||||||
let dir = tempdir().unwrap();
|
|
||||||
let mut b = PersistentBitMatrixBuilder::new(n, &dir.path().join("presence")).unwrap();
|
|
||||||
for &col in cols {
|
|
||||||
let mut cb = b.add_col().unwrap();
|
|
||||||
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
|
|
||||||
cb.close().unwrap();
|
|
||||||
}
|
|
||||||
b.close().unwrap();
|
|
||||||
let m = PersistentBitMatrix::open(dir.path()).unwrap();
|
|
||||||
(dir, m)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── ColumnWeights ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn col_weights_sums_across_layers() {
|
|
||||||
// layer 0: col0=[1,2], col1=[3,4] → weights [3, 7]
|
|
||||||
// layer 1: col0=[10,0], col1=[0,10] → weights [10, 10]
|
|
||||||
// combined: [13, 17]
|
|
||||||
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[3, 4]]);
|
|
||||||
let (_d1, m1) = make_int_matrix(&[&[10, 0], &[0, 10]]);
|
|
||||||
let store = LayeredStore::new(vec![m0, m1]);
|
|
||||||
let w = store.col_weights();
|
|
||||||
assert_eq!(w[0], 13);
|
|
||||||
assert_eq!(w[1], 17);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn col_weights_bit_sums_across_layers() {
|
|
||||||
// layer 0: col0=[T,F,T], col1=[F,T,T] → counts [2, 2]
|
|
||||||
// layer 1: col0=[F,F,T], col1=[T,T,F] → counts [1, 2]
|
|
||||||
// combined: [3, 4]
|
|
||||||
let (_d0, m0) = make_bit_matrix(&[&[true, false, true], &[false, true, true]]);
|
|
||||||
let (_d1, m1) = make_bit_matrix(&[&[false, false, true], &[true, true, false]]);
|
|
||||||
let store = LayeredStore::new(vec![m0, m1]);
|
|
||||||
let w = store.col_weights();
|
|
||||||
assert_eq!(w[0], 3);
|
|
||||||
assert_eq!(w[1], 4);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── CountPartials — layered (one partition) ───────────────────────────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn layered_bray_matches_combined() {
|
|
||||||
// Split [1,2,3,4,5] across two layers; bray dist should equal direct computation
|
|
||||||
// on [1,2,3,4,5] for each column pair.
|
|
||||||
// col0=[1,2,3,4,5], col1=[5,4,3,2,1]
|
|
||||||
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]); // slots 0-1
|
|
||||||
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]); // slots 2-4
|
|
||||||
let store = LayeredStore::new(vec![m0, m1]);
|
|
||||||
|
|
||||||
// direct on full data
|
|
||||||
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
|
|
||||||
let expected = CountPartials::bray_dist_matrix(&mf);
|
|
||||||
let got = CountPartials::bray_dist_matrix(&store);
|
|
||||||
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "bray [0,1]");
|
|
||||||
assert!((got[[1, 0]] - expected[[1, 0]]).abs() < 1e-12, "bray [1,0]");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn layered_relfreq_bray_matches_combined() {
|
|
||||||
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]);
|
|
||||||
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]);
|
|
||||||
let store = LayeredStore::new(vec![m0, m1]);
|
|
||||||
|
|
||||||
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
|
|
||||||
let expected = CountPartials::relfreq_bray_dist_matrix(&mf);
|
|
||||||
let got = CountPartials::relfreq_bray_dist_matrix(&store);
|
|
||||||
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "relfreq_bray [0,1]");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn layered_euclidean_matches_combined() {
|
|
||||||
let (_d0, m0) = make_int_matrix(&[&[3, 0], &[0, 4]]);
|
|
||||||
let (_d1, m1) = make_int_matrix(&[&[1, 1], &[2, 2]]);
|
|
||||||
let store = LayeredStore::new(vec![m0, m1]);
|
|
||||||
|
|
||||||
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 4, 2, 2]]);
|
|
||||||
let expected = CountPartials::euclidean_dist_matrix(&mf);
|
|
||||||
let got = CountPartials::euclidean_dist_matrix(&store);
|
|
||||||
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "euclidean [0,1]");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── CountPartials — partitioned (LayeredStore<LayeredStore<_>>) ───────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn partitioned_bray_matches_combined() {
|
|
||||||
// partition 0: slots [1,2,3,4,5] col0 vs col1
|
|
||||||
// partition 1: slots [10,20] col0 vs col1
|
|
||||||
let (_d0, p0) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
|
|
||||||
let (_d1, p1) = make_int_matrix(&[&[10, 20], &[20, 10]]);
|
|
||||||
|
|
||||||
let partitioned = LayeredStore::new(vec![
|
|
||||||
LayeredStore::new(vec![p0]),
|
|
||||||
LayeredStore::new(vec![p1]),
|
|
||||||
]);
|
|
||||||
|
|
||||||
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5, 10, 20], &[5, 4, 3, 2, 1, 20, 10]]);
|
|
||||||
let expected = CountPartials::bray_dist_matrix(&mf);
|
|
||||||
let got = CountPartials::bray_dist_matrix(&partitioned);
|
|
||||||
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "partitioned bray [0,1]");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── BitPartials ───────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn layered_jaccard_matches_combined() {
|
|
||||||
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
|
|
||||||
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[true, false]]);
|
|
||||||
let store = LayeredStore::new(vec![m0, m1]);
|
|
||||||
|
|
||||||
let (_df, mf) = make_bit_matrix(&[
|
|
||||||
&[true, false, true, true],
|
|
||||||
&[false, true, true, false],
|
|
||||||
]);
|
|
||||||
let expected = BitPartials::jaccard_dist_matrix(&mf);
|
|
||||||
let got = BitPartials::jaccard_dist_matrix(&store);
|
|
||||||
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "jaccard [0,1]");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn layered_hamming_matches_combined() {
|
|
||||||
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
|
|
||||||
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[false, false]]);
|
|
||||||
let store = LayeredStore::new(vec![m0, m1]);
|
|
||||||
|
|
||||||
let (_df, mf) = make_bit_matrix(&[
|
|
||||||
&[true, false, true, true],
|
|
||||||
&[false, true, false, false],
|
|
||||||
]);
|
|
||||||
let expected = BitPartials::hamming_dist_matrix(&mf);
|
|
||||||
let got = BitPartials::hamming_dist_matrix(&store);
|
|
||||||
assert_eq!(got[[0, 1]], expected[[0, 1]], "hamming [0,1]");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -0,0 +1,381 @@
|
|||||||
|
use super::*;
|
||||||
|
use obicompactvec::{
|
||||||
|
PersistentBitMatrix, PersistentBitMatrixBuilder,
|
||||||
|
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
|
||||||
|
};
|
||||||
|
use tempfile::tempdir;
|
||||||
|
|
||||||
|
fn make_int_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) {
|
||||||
|
let n = cols.first().map_or(0, |c| c.len());
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let mut b = PersistentCompactIntMatrixBuilder::new(n, &dir.path().join("counts")).unwrap();
|
||||||
|
for &col in cols {
|
||||||
|
let mut cb = b.add_col().unwrap();
|
||||||
|
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
|
||||||
|
cb.close().unwrap();
|
||||||
|
}
|
||||||
|
b.close().unwrap();
|
||||||
|
let m = PersistentCompactIntMatrix::open(dir.path()).unwrap();
|
||||||
|
(dir, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_bit_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) {
|
||||||
|
let n = cols.first().map_or(0, |c| c.len());
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let mut b = PersistentBitMatrixBuilder::new(n, &dir.path().join("presence")).unwrap();
|
||||||
|
for &col in cols {
|
||||||
|
let mut cb = b.add_col().unwrap();
|
||||||
|
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
|
||||||
|
cb.close().unwrap();
|
||||||
|
}
|
||||||
|
b.close().unwrap();
|
||||||
|
let m = PersistentBitMatrix::open(dir.path()).unwrap();
|
||||||
|
(dir, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── ColumnWeights ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn col_weights_sums_across_layers() {
|
||||||
|
// layer 0: col0=[1,2], col1=[3,4] → weights [3, 7]
|
||||||
|
// layer 1: col0=[10,0], col1=[0,10] → weights [10, 10]
|
||||||
|
// combined: [13, 17]
|
||||||
|
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[3, 4]]);
|
||||||
|
let (_d1, m1) = make_int_matrix(&[&[10, 0], &[0, 10]]);
|
||||||
|
let store = LayeredStore::new(vec![m0, m1]);
|
||||||
|
let w = store.col_weights();
|
||||||
|
assert_eq!(w[0], 13);
|
||||||
|
assert_eq!(w[1], 17);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn col_weights_bit_sums_across_layers() {
|
||||||
|
// layer 0: col0=[T,F,T], col1=[F,T,T] → counts [2, 2]
|
||||||
|
// layer 1: col0=[F,F,T], col1=[T,T,F] → counts [1, 2]
|
||||||
|
// combined: [3, 4]
|
||||||
|
let (_d0, m0) = make_bit_matrix(&[&[true, false, true], &[false, true, true]]);
|
||||||
|
let (_d1, m1) = make_bit_matrix(&[&[false, false, true], &[true, true, false]]);
|
||||||
|
let store = LayeredStore::new(vec![m0, m1]);
|
||||||
|
let w = store.col_weights();
|
||||||
|
assert_eq!(w[0], 3);
|
||||||
|
assert_eq!(w[1], 4);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── CountPartials — layered (one partition) ───────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn layered_bray_matches_combined() {
|
||||||
|
// Split [1,2,3,4,5] across two layers; bray dist should equal direct computation
|
||||||
|
// on [1,2,3,4,5] for each column pair.
|
||||||
|
// col0=[1,2,3,4,5], col1=[5,4,3,2,1]
|
||||||
|
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]); // slots 0-1
|
||||||
|
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]); // slots 2-4
|
||||||
|
let store = LayeredStore::new(vec![m0, m1]);
|
||||||
|
|
||||||
|
// direct on full data
|
||||||
|
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
|
||||||
|
let expected = CountPartials::bray_dist_matrix(&mf);
|
||||||
|
let got = CountPartials::bray_dist_matrix(&store);
|
||||||
|
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "bray [0,1]");
|
||||||
|
assert!((got[[1, 0]] - expected[[1, 0]]).abs() < 1e-12, "bray [1,0]");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn layered_relfreq_bray_matches_combined() {
|
||||||
|
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]);
|
||||||
|
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]);
|
||||||
|
let store = LayeredStore::new(vec![m0, m1]);
|
||||||
|
|
||||||
|
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
|
||||||
|
let expected = CountPartials::relfreq_bray_dist_matrix(&mf);
|
||||||
|
let got = CountPartials::relfreq_bray_dist_matrix(&store);
|
||||||
|
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "relfreq_bray [0,1]");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn layered_euclidean_matches_combined() {
|
||||||
|
let (_d0, m0) = make_int_matrix(&[&[3, 0], &[0, 4]]);
|
||||||
|
let (_d1, m1) = make_int_matrix(&[&[1, 1], &[2, 2]]);
|
||||||
|
let store = LayeredStore::new(vec![m0, m1]);
|
||||||
|
|
||||||
|
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 4, 2, 2]]);
|
||||||
|
let expected = CountPartials::euclidean_dist_matrix(&mf);
|
||||||
|
let got = CountPartials::euclidean_dist_matrix(&store);
|
||||||
|
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "euclidean [0,1]");
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── CountPartials — partitioned (LayeredStore<LayeredStore<_>>) ───────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn partitioned_bray_matches_combined() {
|
||||||
|
// partition 0: slots [1,2,3,4,5] col0 vs col1
|
||||||
|
// partition 1: slots [10,20] col0 vs col1
|
||||||
|
let (_d0, p0) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
|
||||||
|
let (_d1, p1) = make_int_matrix(&[&[10, 20], &[20, 10]]);
|
||||||
|
|
||||||
|
let partitioned = LayeredStore::new(vec![
|
||||||
|
LayeredStore::new(vec![p0]),
|
||||||
|
LayeredStore::new(vec![p1]),
|
||||||
|
]);
|
||||||
|
|
||||||
|
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5, 10, 20], &[5, 4, 3, 2, 1, 20, 10]]);
|
||||||
|
let expected = CountPartials::bray_dist_matrix(&mf);
|
||||||
|
let got = CountPartials::bray_dist_matrix(&partitioned);
|
||||||
|
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "partitioned bray [0,1]");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn partitioned_threshold_jaccard_off_diagonal_is_pairwise() {
|
||||||
|
// 3 genomes, 2 partitions, 1 layer each — mirrors distance.rs's
|
||||||
|
// LayeredStore<LayeredStore<PersistentCompactIntMatrix>> shape.
|
||||||
|
// partition 0: col0=[3,0], col1=[0,3], col2=[3,3]
|
||||||
|
// partition 1: col0=[1,1], col1=[1,0], col2=[0,1]
|
||||||
|
let (_d0, p0) = make_int_matrix(&[&[3, 0], &[0, 3], &[3, 3]]);
|
||||||
|
let (_d1, p1) = make_int_matrix(&[&[1, 1], &[1, 0], &[0, 1]]);
|
||||||
|
|
||||||
|
let partitioned = LayeredStore::new(vec![
|
||||||
|
LayeredStore::new(vec![p0]),
|
||||||
|
LayeredStore::new(vec![p1]),
|
||||||
|
]);
|
||||||
|
|
||||||
|
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 3, 1, 0], &[3, 3, 0, 1]]);
|
||||||
|
let threshold = 1u32;
|
||||||
|
let (inter_p, union_p) = CountPartials::partial_threshold_jaccard(&partitioned, threshold);
|
||||||
|
let (inter_f, union_f) = CountPartials::partial_threshold_jaccard(&mf, threshold);
|
||||||
|
|
||||||
|
let n = 3;
|
||||||
|
for i in 0..n {
|
||||||
|
for j in 0..n {
|
||||||
|
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
|
||||||
|
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn partitioned_threshold_jaccard_packed_off_diagonal_is_pairwise() {
|
||||||
|
// Same as `partitioned_threshold_jaccard_off_diagonal_is_pairwise` but
|
||||||
|
// each partition matrix is packed into a single .pcmx file first —
|
||||||
|
// the on-disk format actually used in production after `pack_matrices`.
|
||||||
|
use obicompactvec::pack_compact_int_matrix;
|
||||||
|
|
||||||
|
let (d0, _p0) = make_int_matrix(&[&[3, 0], &[0, 3], &[3, 3]]);
|
||||||
|
pack_compact_int_matrix(&d0.path().join("counts")).unwrap();
|
||||||
|
let p0 = PersistentCompactIntMatrix::open(d0.path()).unwrap();
|
||||||
|
|
||||||
|
let (d1, _p1) = make_int_matrix(&[&[1, 1], &[1, 0], &[0, 1]]);
|
||||||
|
pack_compact_int_matrix(&d1.path().join("counts")).unwrap();
|
||||||
|
let p1 = PersistentCompactIntMatrix::open(d1.path()).unwrap();
|
||||||
|
|
||||||
|
let partitioned = LayeredStore::new(vec![
|
||||||
|
LayeredStore::new(vec![p0]),
|
||||||
|
LayeredStore::new(vec![p1]),
|
||||||
|
]);
|
||||||
|
|
||||||
|
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 3, 1, 0], &[3, 3, 0, 1]]);
|
||||||
|
let threshold = 1u32;
|
||||||
|
let (inter_p, union_p) = CountPartials::partial_threshold_jaccard(&partitioned, threshold);
|
||||||
|
let (inter_f, union_f) = CountPartials::partial_threshold_jaccard(&mf, threshold);
|
||||||
|
|
||||||
|
let n = 3;
|
||||||
|
for i in 0..n {
|
||||||
|
for j in 0..n {
|
||||||
|
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
|
||||||
|
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn partitioned_multilayer_threshold_jaccard_off_diagonal_is_pairwise() {
|
||||||
|
// 2 partitions, 2 layers each — the shape production indexes actually
|
||||||
|
// have (MPHF collision layers within a partition).
|
||||||
|
// partition 0, layer 0: col0=[3,0], col1=[0,3], col2=[3,3]
|
||||||
|
// partition 0, layer 1: col0=[2,0], col1=[0,0], col2=[2,0]
|
||||||
|
// partition 1, layer 0: col0=[1,1], col1=[1,0], col2=[0,1]
|
||||||
|
// partition 1, layer 1: col0=[0,5], col1=[5,5], col2=[0,0]
|
||||||
|
let (_d0a, p0a) = make_int_matrix(&[&[3, 0], &[0, 3], &[3, 3]]);
|
||||||
|
let (_d0b, p0b) = make_int_matrix(&[&[2, 0], &[0, 0], &[2, 0]]);
|
||||||
|
let (_d1a, p1a) = make_int_matrix(&[&[1, 1], &[1, 0], &[0, 1]]);
|
||||||
|
let (_d1b, p1b) = make_int_matrix(&[&[0, 5], &[5, 5], &[0, 0]]);
|
||||||
|
|
||||||
|
let partitioned = LayeredStore::new(vec![
|
||||||
|
LayeredStore::new(vec![p0a, p0b]),
|
||||||
|
LayeredStore::new(vec![p1a, p1b]),
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Flattened equivalent: concatenate every layer's slots into one matrix.
|
||||||
|
let (_df, mf) = make_int_matrix(&[
|
||||||
|
&[3, 0, 2, 0, 1, 1, 0, 5],
|
||||||
|
&[0, 3, 0, 0, 1, 0, 5, 5],
|
||||||
|
&[3, 3, 2, 0, 0, 1, 0, 0],
|
||||||
|
]);
|
||||||
|
let threshold = 1u32;
|
||||||
|
let (inter_p, union_p) = CountPartials::partial_threshold_jaccard(&partitioned, threshold);
|
||||||
|
let (inter_f, union_f) = CountPartials::partial_threshold_jaccard(&mf, threshold);
|
||||||
|
|
||||||
|
let n = 3;
|
||||||
|
for i in 0..n {
|
||||||
|
for j in 0..n {
|
||||||
|
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
|
||||||
|
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── BitPartials ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn layered_jaccard_matches_combined() {
|
||||||
|
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
|
||||||
|
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[true, false]]);
|
||||||
|
let store = LayeredStore::new(vec![m0, m1]);
|
||||||
|
|
||||||
|
let (_df, mf) = make_bit_matrix(&[
|
||||||
|
&[true, false, true, true],
|
||||||
|
&[false, true, true, false],
|
||||||
|
]);
|
||||||
|
let expected = BitPartials::jaccard_dist_matrix(&mf);
|
||||||
|
let got = BitPartials::jaccard_dist_matrix(&store);
|
||||||
|
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "jaccard [0,1]");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn layered_hamming_matches_combined() {
|
||||||
|
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
|
||||||
|
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[false, false]]);
|
||||||
|
let store = LayeredStore::new(vec![m0, m1]);
|
||||||
|
|
||||||
|
let (_df, mf) = make_bit_matrix(&[
|
||||||
|
&[true, false, true, true],
|
||||||
|
&[false, true, false, false],
|
||||||
|
]);
|
||||||
|
let expected = BitPartials::hamming_dist_matrix(&mf);
|
||||||
|
let got = BitPartials::hamming_dist_matrix(&store);
|
||||||
|
assert_eq!(got[[0, 1]], expected[[0, 1]], "hamming [0,1]");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn partitioned_bit_jaccard_off_diagonal_is_pairwise() {
|
||||||
|
// Same shape as the count-based `partitioned_multilayer_threshold_jaccard_*`
|
||||||
|
// tests, but for the presence/bit path (`with_counts = false` — what
|
||||||
|
// `all_specifics` actually uses in production).
|
||||||
|
// 4 genomes, 3 partitions, 2 layers in the last one.
|
||||||
|
let (_d0, p0) = make_bit_matrix(&[
|
||||||
|
&[true, false, true],
|
||||||
|
&[false, true, true],
|
||||||
|
&[true, true, false],
|
||||||
|
&[false, false, true],
|
||||||
|
]);
|
||||||
|
let (_d1, p1) = make_bit_matrix(&[
|
||||||
|
&[true, true],
|
||||||
|
&[false, true],
|
||||||
|
&[true, false],
|
||||||
|
&[true, true],
|
||||||
|
]);
|
||||||
|
let (_d2a, p2a) = make_bit_matrix(&[
|
||||||
|
&[false, true],
|
||||||
|
&[true, true],
|
||||||
|
&[false, false],
|
||||||
|
&[true, false],
|
||||||
|
]);
|
||||||
|
let (_d2b, p2b) = make_bit_matrix(&[
|
||||||
|
&[true],
|
||||||
|
&[false],
|
||||||
|
&[true],
|
||||||
|
&[true],
|
||||||
|
]);
|
||||||
|
|
||||||
|
let partitioned = LayeredStore::new(vec![
|
||||||
|
LayeredStore::new(vec![p0]),
|
||||||
|
LayeredStore::new(vec![p1]),
|
||||||
|
LayeredStore::new(vec![p2a, p2b]),
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Flattened equivalent: concatenate every partition/layer's slots.
|
||||||
|
let (_df, mf) = make_bit_matrix(&[
|
||||||
|
&[true, false, true, true, true, false, true, true],
|
||||||
|
&[false, true, true, false, true, true, true, false],
|
||||||
|
&[true, true, false, true, false, false, false, true],
|
||||||
|
&[false, false, true, true, true, true, false, true],
|
||||||
|
]);
|
||||||
|
|
||||||
|
let (inter_p, union_p) = BitPartials::partial_jaccard(&partitioned);
|
||||||
|
let (inter_f, union_f) = BitPartials::partial_jaccard(&mf);
|
||||||
|
|
||||||
|
let n = 4;
|
||||||
|
for i in 0..n {
|
||||||
|
for j in 0..n {
|
||||||
|
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
|
||||||
|
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn partitioned_bit_jaccard_packed_off_diagonal_is_pairwise() {
|
||||||
|
// Same as `partitioned_bit_jaccard_off_diagonal_is_pairwise` but every
|
||||||
|
// partition's presence matrix is packed into a single .pbmx file —
|
||||||
|
// the on-disk format actually used in production after `pack_matrices`.
|
||||||
|
use obicompactvec::pack_bit_matrix;
|
||||||
|
|
||||||
|
let (d0, _p0) = make_bit_matrix(&[
|
||||||
|
&[true, false, true],
|
||||||
|
&[false, true, true],
|
||||||
|
&[true, true, false],
|
||||||
|
&[false, false, true],
|
||||||
|
]);
|
||||||
|
pack_bit_matrix(&d0.path().join("presence")).unwrap();
|
||||||
|
let p0 = PersistentBitMatrix::open(d0.path()).unwrap();
|
||||||
|
|
||||||
|
let (d1, _p1) = make_bit_matrix(&[
|
||||||
|
&[true, true],
|
||||||
|
&[false, true],
|
||||||
|
&[true, false],
|
||||||
|
&[true, true],
|
||||||
|
]);
|
||||||
|
pack_bit_matrix(&d1.path().join("presence")).unwrap();
|
||||||
|
let p1 = PersistentBitMatrix::open(d1.path()).unwrap();
|
||||||
|
|
||||||
|
let (d2a, _p2a) = make_bit_matrix(&[
|
||||||
|
&[false, true],
|
||||||
|
&[true, true],
|
||||||
|
&[false, false],
|
||||||
|
&[true, false],
|
||||||
|
]);
|
||||||
|
pack_bit_matrix(&d2a.path().join("presence")).unwrap();
|
||||||
|
let p2a = PersistentBitMatrix::open(d2a.path()).unwrap();
|
||||||
|
|
||||||
|
let (d2b, _p2b) = make_bit_matrix(&[
|
||||||
|
&[true],
|
||||||
|
&[false],
|
||||||
|
&[true],
|
||||||
|
&[true],
|
||||||
|
]);
|
||||||
|
pack_bit_matrix(&d2b.path().join("presence")).unwrap();
|
||||||
|
let p2b = PersistentBitMatrix::open(d2b.path()).unwrap();
|
||||||
|
|
||||||
|
let partitioned = LayeredStore::new(vec![
|
||||||
|
LayeredStore::new(vec![p0]),
|
||||||
|
LayeredStore::new(vec![p1]),
|
||||||
|
LayeredStore::new(vec![p2a, p2b]),
|
||||||
|
]);
|
||||||
|
|
||||||
|
let (_df, mf) = make_bit_matrix(&[
|
||||||
|
&[true, false, true, true, true, false, true, true],
|
||||||
|
&[false, true, true, false, true, true, true, false],
|
||||||
|
&[true, true, false, true, false, false, false, true],
|
||||||
|
&[false, false, true, true, true, true, false, true],
|
||||||
|
]);
|
||||||
|
|
||||||
|
let (inter_p, union_p) = BitPartials::partial_jaccard(&partitioned);
|
||||||
|
let (inter_f, union_f) = BitPartials::partial_jaccard(&mf);
|
||||||
|
|
||||||
|
let n = 4;
|
||||||
|
for i in 0..n {
|
||||||
|
for j in 0..n {
|
||||||
|
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
|
||||||
|
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
+93
-2
@@ -266,14 +266,19 @@ impl CpuSample {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||||
|
let delta_wall = self.wall.elapsed().as_secs_f64();
|
||||||
|
if delta_wall < 0.1 {
|
||||||
|
// Window too short to be meaningful — leave state untouched so it
|
||||||
|
// keeps accumulating until a real sample can be taken.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
let n = CpuSample::now();
|
let n = CpuSample::now();
|
||||||
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
|
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
|
||||||
let delta_wall = self.wall.elapsed().as_secs_f64();
|
|
||||||
|
|
||||||
let efficiency = delta_ru / delta_wall;
|
let efficiency = delta_ru / delta_wall;
|
||||||
let activate = 0f64.max(efficiency - self.previous) >= threshold;
|
let activate = 0f64.max(efficiency - self.previous) >= threshold;
|
||||||
|
|
||||||
if activate {
|
|
||||||
debug!(
|
debug!(
|
||||||
"Do I activate : {} -> {} = {} Activate: {}",
|
"Do I activate : {} -> {} = {} Activate: {}",
|
||||||
self.previous,
|
self.previous,
|
||||||
@@ -285,7 +290,93 @@ impl CpuSample {
|
|||||||
self.user_secs = n.user_secs;
|
self.user_secs = n.user_secs;
|
||||||
self.sys_secs = n.sys_secs;
|
self.sys_secs = n.sys_secs;
|
||||||
self.wall = n.wall;
|
self.wall = n.wall;
|
||||||
|
|
||||||
|
activate
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── IoSample ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Snapshot of process-wide block I/O (bytes read + written) + wall clock.
|
||||||
|
///
|
||||||
|
/// Same activation protocol as [`CpuSample`], but the growth check in
|
||||||
|
/// [`do_i_activate`](Self::do_i_activate) is *relative* rather than absolute:
|
||||||
|
/// raw I/O throughput has no portable scale across storage devices, unlike a
|
||||||
|
/// core count.
|
||||||
|
pub struct IoSample {
|
||||||
|
wall: Instant,
|
||||||
|
bytes: u64,
|
||||||
|
previous_rate: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IoSample {
|
||||||
|
pub fn now() -> Self {
|
||||||
|
Self {
|
||||||
|
wall: Instant::now(),
|
||||||
|
bytes: Self::read_bytes(),
|
||||||
|
previous_rate: 0.0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bytes actually submitted to the block layer (read + write), summed
|
||||||
|
/// process-wide. Returns 0 if unavailable — degrades gracefully to a
|
||||||
|
/// signal that never triggers activation (CPU-only heuristic).
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
fn read_bytes() -> u64 {
|
||||||
|
let Ok(io) = std::fs::read_to_string("/proc/self/io") else {
|
||||||
|
return 0;
|
||||||
|
};
|
||||||
|
io.lines()
|
||||||
|
.filter_map(|l| {
|
||||||
|
l.strip_prefix("read_bytes: ")
|
||||||
|
.or_else(|| l.strip_prefix("write_bytes: "))
|
||||||
|
})
|
||||||
|
.filter_map(|v| v.trim().parse::<u64>().ok())
|
||||||
|
.sum()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(target_os = "macos")]
|
||||||
|
fn read_bytes() -> u64 {
|
||||||
|
use libc::{RUSAGE_INFO_V4, getpid, proc_pid_rusage, rusage_info_v4};
|
||||||
|
let mut info: rusage_info_v4 = unsafe { std::mem::zeroed() };
|
||||||
|
let ret =
|
||||||
|
unsafe { proc_pid_rusage(getpid(), RUSAGE_INFO_V4, &mut info as *mut _ as *mut _) };
|
||||||
|
if ret != 0 {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
info.ri_diskio_bytesread + info.ri_diskio_byteswritten
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
|
||||||
|
fn read_bytes() -> u64 {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Same protocol as [`CpuSample::do_i_activate`] (0.1 s minimum window,
|
||||||
|
/// state untouched on early return), but growth is measured relative to
|
||||||
|
/// the previous rate. `threshold` is a fraction, e.g. `0.2` for a 20 %
|
||||||
|
/// increase in throughput since the last real sample.
|
||||||
|
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||||
|
let elapsed = self.wall.elapsed().as_secs_f64();
|
||||||
|
if elapsed < 0.1 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let n = Self::read_bytes();
|
||||||
|
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
|
||||||
|
let activate = if self.previous_rate == 0.0 {
|
||||||
|
rate > 0.0 // bootstrap: any measured throughput is signal enough
|
||||||
|
} else {
|
||||||
|
(rate - self.previous_rate) / self.previous_rate >= threshold
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"Do I activate (I/O) : {} -> {} Activate: {}",
|
||||||
|
self.previous_rate, rate, activate
|
||||||
|
);
|
||||||
|
self.previous_rate = rate;
|
||||||
|
self.bytes = n;
|
||||||
|
self.wall = Instant::now();
|
||||||
|
|
||||||
activate
|
activate
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user