6 Commits

Author SHA1 Message Date
coissac 5bdc0f826a Merge pull request 'fix: validate packed matrix columns before repacking' (#57) from push-vkqvorvsqnqx into main
Reviewed-on: #57
2026-07-03 15:26:55 +00:00
Eric Coissac cd2f2f9417 fix: validate packed matrix columns before repacking
Release / create-release (push) Successful in 2m27s
Release / build-linux-x86_64 (push) Successful in 8m15s
Release / build-macos-arm64 (push) Failing after 30s
CI / build (pull_request) Successful in 3m17s
Add header parsing helpers to extract column counts without memory mapping. Update packing functions to verify existing files match current metadata, preventing stale or widened-column artifacts. Extract inline tests in obilayeredmap to an external module and add comprehensive aggregation tests. Bump obikmer to 1.1.35 and clean up repository configuration.
2026-07-03 17:20:22 +02:00
coissac 7844239a8e Merge pull request 'Push msotyzponsls' (#56) from push-msotyzponsls into main
Reviewed-on: #56
2026-07-03 11:28:49 +00:00
Eric Coissac 2b37e8aac4 fix(bitmatrix): explicitly compute diagonal entries for self-similarity
Release / create-release (push) Successful in 2m26s
Release / build-linux-x86_64 (push) Successful in 8m13s
Release / build-macos-arm64 (push) Failing after 30s
CI / build (pull_request) Successful in 3m21s
The pairwise matrix functions now explicitly calculate and overwrite diagonal entries using `f(i,i)`, replacing previous implicit symmetric mirroring or default values. Documentation has been updated to clarify that diagonals represent self-comparison weights, ensuring accurate self-similarity calculations. Additionally, the obikmer crate version has been bumped to 1.1.34.
2026-07-03 13:04:40 +02:00
Eric Coissac 67b4e4da53 refactor(numa): replace flat runner with per-node activation channels
Shifts the NUMA-aware runner from a flat, round-robin model to a per-node architecture using dedicated `NodeActivation` channels. Replaces absolute deltas with relative scaling based on the previous growth step's worker count, decoupling growth from node count to fix slow ramp-up and enforce per-node fairness. Updates architecture documentation to reflect these changes and focus tuning questions on `INITIAL`/`GROWTH_DIVISOR` parameters for I/O-bound validation.
2026-07-03 13:03:31 +02:00
coissac 66ab4c6db1 Merge pull request 'feat(numa): introduce I/O sampling to prevent activation stalls' (#55) from push-ooruxnkktvvz into main
Reviewed-on: #55
2026-07-02 09:36:19 +00:00
10 changed files with 752 additions and 285 deletions
+1
View File
@@ -15,6 +15,7 @@ benchmark/simulated_data
benchmark/specimen_index_presence
benchmark/specimen_index_count
benchmark/global_index_presence
benchmark/all_specific
benchmark/global_index_count
benchmark/stats
benchmark/reference_index
+68 -17
View File
@@ -205,14 +205,28 @@ unconditionally — no `||` short-circuit — so neither window starves behind
whichever signal fires first:
```rust
let cpu_wants_more = cpu_sample.do_i_activate(CPU_SPAWN_THRESHOLD);
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
if cpu_wants_more || io_wants_more {
activate_tx.send(()).ok();
...
activation.grow(GROWTH_DIVISOR, n_total);
}
```
The CPU threshold is *not* the flat absolute delta it started as: it scales
with `activation.last_step()` — the number of workers activated in the last
growth step, tracked by `NodeActivation` (`numa.rs`) and updated every time
`grow()` actually grows something. Growing by 8 workers should add ~8 cores of
efficiency if the workload is truly CPU-bound; requiring only
`CPU_SPAWN_THRESHOLD` (20 %) of that expected gain confirms the growth was
useful without demanding perfect linear scaling. Scaling by the *last step's
size* rather than the cumulative total keeps the bar equally meaningful
whether it's the 2nd growth step or the 20th — a flat absolute threshold
(0.2 core) is a strong signal at 8 active workers but pure noise at 150; a
threshold scaled by the *cumulative* total instead (considered and rejected)
would have made the bar essentially impossible to clear late in the ramp,
strangling exactly the CPU-bound saturation the mechanism exists to allow.
Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit),
raw I/O throughput has no natural scale across devices, so `IoSample` uses a
**relative** growth threshold instead of an absolute one:
@@ -242,27 +256,64 @@ the ~94-core artefact above) — one fix for both problems, and it removes the
need for any arbitrary I/O-rate floor: a short/noisy window is rejected
outright rather than papered over with a hardware-dependent constant.
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, both `0.2`)
are defined as `const` in `PartitionRunner::run` (`numa.rs`). The I/O value is
a starting point, not a derived one — needs empirical validation against a
real `pack` run.
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, module-level
`const` in `numa.rs`, both `0.2`) are a starting point, not a derived value:
`0.2` (20 % relative growth) for `IoSample` was chosen to match the CPU
threshold's *implicit* relative sensitivity (in the observed log, an 8→9
worker step raised efficiency by ~12 %) — but I/O throughput is lumpier than
CPU time (buffered writes flush in bursts), so it needs empirical validation
against a real `pack` run before being considered final.
Starting threshold: `0.2` (20 % relative growth) for `IoSample`, same order of
magnitude as the CPU threshold's *implicit* relative sensitivity (in the
observed log, an 8→9 worker step raised efficiency by ~12 %). This is a
starting point, not a derived value — I/O throughput is lumpier than CPU time
(buffered writes flush in bursts), so it needs empirical validation against a
real `pack` run before being considered final.
## Known issue: ramp-up too slow, and confused with node count
The original design started `n_nodes` workers (one per node) and grew one
worker at a time. On a real `filter` run this took ~10 minutes to climb from
9 to ~40 active workers even on the CPU-bound `rebuild` stage — most of a
35-minute stage spent under-provisioned while waiting for evidence to
accumulate one worker at a time. There is no scale-down mechanism (`n_active`
only grows), so the original caution was deliberate — but a quarter of
available cores is still far from saturation, and the real risk zone (over-provisioning
a memory-bandwidth-bound stage) only shows up much later in the ramp, near
full occupancy — not at 25 %.
The fix decouples ramp speed from node *count*: both the initial size and the
growth step are a fraction of `workers_per_node` (node *size*), applied
identically on every node. A single-NUMA-node (UMA) machine ramps exactly as
fast as an 8-node one — growing by `n_nodes` per step, as first considered,
would have degenerated to "grow by 1" on UMA, reproducing the original
problem for exactly the machines that need the fix most.
```rust
// NodeActivation::grow — called both at startup (activate_initial) and on
// every CPU/IO-triggered growth step, with a different divisor each time.
let wanted = (self.caps[idx] / divisor).max(1); // INITIAL_DIVISOR=4 at startup, GROWTH_DIVISOR=8 per step
let room = self.caps[idx].saturating_sub(self.active[idx]);
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
```
This also fixed a latent correctness gap: the original single shared
`activate_tx`/`activate_rx` pair had *no* per-node addressing — sending one
activation signal woke up whichever dormant worker (from any node) happened
to win the race on that channel. `crossbeam_channel` gives no fairness
guarantee across competing receivers, so "round-robin across nodes" was an
assumption the code never actually enforced. `PartitionRunner::run` now opens
one activation channel per node (`activate_txs`/`activate_rxs`, one pair per
`NodeConfig`); `NodeActivation` (`numa.rs`) tracks how many of each node's
dormant workers have been woken and grows every node by the same amount per
step, capped by that node's remaining dormant workers and by the run's total
budget (`n_total`) — balance across nodes is now guaranteed by construction,
not incidental to channel implementation details.
## Open questions
- **Error handling**: `run` currently returns the first error; remaining errors
are dropped. A `Vec<E>` return would give complete diagnostics.
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
for merge on BeeGFS. Superseded by the I/O signal above for the "more
workers would help despite flat CPU" case — a per-call override may still be
worth keeping as a manual escape hatch.
- **`INITIAL_DIVISOR` / `GROWTH_DIVISOR` tuning**: currently `4` and `8`
(start at 1/4 of a node's cores, grow by 1/8 per step), chosen to fix an
observed too-slow ramp — not yet validated against a real `pack` (I/O-bound)
run, where over-provisioning risk is different from the CPU-bound `rebuild`
case this was tuned against.
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
+1 -1
View File
@@ -1704,7 +1704,7 @@ dependencies = [
[[package]]
name = "obikmer"
version = "1.1.33"
version = "1.1.35"
dependencies = [
"clap",
"csv",
BIN
View File
Binary file not shown.
+47 -9
View File
@@ -1,5 +1,5 @@
use std::fs::{self, File};
use std::io::{self, BufWriter, Write as _};
use std::io::{self, BufWriter, Read as _, Write as _};
use std::path::{Path, PathBuf};
use memmap2::Mmap;
@@ -171,19 +171,43 @@ impl PackedBitMatrix {
}
}
/// Reads just the `n_cols` field from an existing packed matrix's header,
/// without mapping the file. Used by `pack_bit_matrix` to tell a genuinely
/// complete pack from a stale one that predates a later column-widening.
fn packed_bit_matrix_n_cols(path: &Path) -> io::Result<usize> {
let mut f = File::open(path)?;
let mut header = [0u8; PBMX_HEADER];
f.read_exact(&mut header)?;
Ok(u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize)
}
/// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files.
pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> {
let packed_path = dir.join("matrix.pbmx");
if packed_path.exists() {
// Matrix complete; remove any leftover column files from a killed cleanup.
if let Ok(meta) = MatrixMeta::load(dir) {
let meta = match MatrixMeta::load(dir) {
Ok(meta) => meta,
Err(e) => {
// No columnar data pending: either this layer was already
// packed and cleaned up (matrix.pbmx complete, nothing left to
// do), or genuinely nothing was ever written here.
return if packed_path.exists() { Ok(()) } else { Err(e) };
}
};
// A `matrix.pbmx` can already exist here even though columnar data is
// still pending — e.g. copied verbatim from a merge's base source
// before this layer was widened with more genome columns (see
// `obikpartitionner::merge_partition`). Only skip (re-)packing if the
// existing file already reflects the current column count; otherwise
// the columnar files are newer and must be (re-)packed, overwriting the
// stale one — never silently discarded as "leftover cleanup".
if packed_bit_matrix_n_cols(&packed_path).ok() == Some(meta.n_cols) {
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
let _ = fs::remove_file(dir.join("meta.json"));
}
return Ok(());
}
let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols;
// Compute offsets from file sizes — no column data loaded into RAM.
@@ -500,17 +524,26 @@ where T: Clone + Default {
}
/// Compute a symmetric `n×n` matrix in parallel by evaluating `f(i,j)` for
/// all upper-triangle pairs. `T: Copy` avoids the `.clone()` needed for the
/// lower-triangle mirror.
/// all upper-triangle pairs, plus `f(i,i)` for the diagonal. `T: Copy` avoids
/// the `.clone()` needed for the lower-triangle mirror.
///
/// The diagonal is *not* generally `T::default()`: for a self-comparison,
/// `f(i,i)` is often the column's own weight (e.g. intersection-with-self —
/// see `pairwise2_matrix`), not zero. Distance finalisations that need a
/// zero diagonal (self-distance) already overwrite it explicitly.
pub(crate) fn pairwise_matrix<T>(n: usize, f: impl Fn(usize, usize) -> T + Sync) -> Array2<T>
where T: Copy + Default + Send {
let results: Vec<(usize, usize, T)> = upper_pairs(n)
.into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect();
fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v)))
let mut m = fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v)));
for i in 0..n { m[[i, i]] = f(i, i); }
m
}
/// Same as `pairwise_matrix` but `f` returns two values that fill two
/// symmetric matrices simultaneously (e.g. intersection + union for Jaccard).
/// The diagonal is `f(i,i)` (e.g. a genome's kmer count intersected with
/// itself), not `T::default()` — see `pairwise_matrix` for why that matters.
pub(crate) fn pairwise2_matrix<T>(n: usize, f: impl Fn(usize, usize) -> (T, T) + Sync) -> (Array2<T>, Array2<T>)
where T: Copy + Default + Send {
let results: Vec<(usize, usize, T, T)> = upper_pairs(n)
@@ -523,5 +556,10 @@ where T: Copy + Default + Send {
m0[[i, j]] = a; m0[[j, i]] = a;
m1[[i, j]] = b; m1[[j, i]] = b;
}
for i in 0..n {
let (a, b) = f(i, i);
m0[[i, i]] = a;
m1[[i, i]] = b;
}
(m0, m1)
}
+32 -5
View File
@@ -1,5 +1,5 @@
use std::fs::{self, File};
use std::io::{self, BufWriter, Write as _};
use std::io::{self, BufWriter, Read as _, Write as _};
use std::path::{Path, PathBuf};
use memmap2::Mmap;
@@ -228,17 +228,44 @@ impl PackedCompactIntMatrix {
}
}
/// Reads just the `n_cols` field from an existing packed matrix's header,
/// without mapping the file. Used by `pack_compact_int_matrix` to tell a
/// genuinely complete pack from a stale one that predates a later
/// column-widening.
fn packed_int_matrix_n_cols(path: &Path) -> io::Result<usize> {
let mut f = File::open(path)?;
let mut header = [0u8; PCMX_HEADER];
f.read_exact(&mut header)?;
Ok(u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize)
}
/// Build `counts/matrix.pcmx` from existing `col_*.pciv` files.
pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> {
let packed_path = dir.join("matrix.pcmx");
if packed_path.exists() {
if let Ok(meta) = MatrixMeta::load(dir) {
let meta = match MatrixMeta::load(dir) {
Ok(meta) => meta,
Err(e) => {
// No columnar data pending: either this layer was already
// packed and cleaned up (matrix.pcmx complete, nothing left to
// do), or genuinely nothing was ever written here.
return if packed_path.exists() { Ok(()) } else { Err(e) };
}
};
// A `matrix.pcmx` can already exist here even though columnar data is
// still pending — e.g. copied verbatim from a merge's base source
// before this layer was widened with more genome columns (see
// `obikpartitionner::merge_partition`). Only skip (re-)packing if the
// existing file already reflects the current column count; otherwise
// the columnar files are newer and must be (re-)packed, overwriting the
// stale one — never silently discarded as "leftover cleanup".
if packed_int_matrix_n_cols(&packed_path).ok() == Some(meta.n_cols) {
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
let _ = fs::remove_file(dir.join("meta.json"));
}
return Ok(());
}
let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols;
let col_sizes: Vec<u64> = (0..n_cols)
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
+186 -60
View File
@@ -70,7 +70,10 @@ pub fn build() -> NumaSetup {
nodes.len(),
nodes.first().map_or(0, |v| v.len()),
);
return NumaSetup { pools, cpus_per_node: nodes };
return NumaSetup {
pools,
cpus_per_node: nodes,
};
}
}
}
@@ -102,7 +105,9 @@ pub fn build() -> NumaSetup {
/// Silently returns on any error so the thread still runs, just unbound.
#[cfg(feature = "numa")]
pub fn pin_current_thread(cpu_indices: &[usize]) {
let Ok(topology) = Topology::new() else { return };
let Ok(topology) = Topology::new() else {
return;
};
let mut cpuset = CpuSet::new();
for &idx in cpu_indices {
cpuset.set(idx);
@@ -132,7 +137,22 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
.ok()
}
// ── PartitionRunner ───────────────────────────────────────────────────────────
// ── PartitionRunner ─────────────────────────────────────────────────────────
/// Growth step (fraction of a node's worker capacity added per activation
/// event, see [`NodeActivation::grow`]).
const GROWTH_DIVISOR: usize = 8;
/// Minimum CPU efficiency growth to activate more workers, as a fraction of
/// the size of the *last growth step* (e.g. `0.2` after adding 8 workers
/// requires the next check to show at least +1.6 cores of growth — 20 % of
/// the ~8 cores those 8 workers should contribute if the workload is truly
/// CPU-bound). Scaling by the last step's size — not the cumulative total —
/// keeps the bar meaningful regardless of how many workers are already
/// active, instead of demanding an ever-larger absolute jump as the pool
/// grows.
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
/// Minimum I/O throughput growth (relative) to activate more workers.
const IO_SPAWN_THRESHOLD: f64 = 0.2;
struct NodeConfig {
pool: Option<Arc<rayon::ThreadPool>>,
@@ -142,19 +162,23 @@ struct NodeConfig {
/// Generic NUMA-aware runner for partition-level parallel work.
///
/// Workers are distributed round-robin across NUMA nodes and pinned to their
/// Workers are distributed evenly across NUMA nodes and pinned to their
/// node's CPUs. UMA is the degenerate case: one node, no pinning.
///
/// Workers are pre-spawned dormant and activated one by one as CPU efficiency
/// falls below `SPAWN_THRESHOLD`. This avoids over-provisioning on I/O-bound
/// or memory-bandwidth-bound workloads while saturating CPU-bound ones.
/// Workers are pre-spawned dormant, one activation channel per node so
/// growth always targets a specific node rather than whichever dormant
/// worker happens to wake up first on a shared channel. Growth (both the
/// initial count and each subsequent step) is expressed as a fraction of
/// `workers_per_node`, applied identically to every node, so the pace of
/// ramp-up depends on node size rather than node count — a single-NUMA-node
/// (UMA) machine ramps just as fast as an 8-node one.
///
/// # Termination
///
/// ```text
/// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
/// drop(result_tx) → result_rx closes → controller loop exits
/// drop(activate_tx) → dormant workers exit cleanly
/// drop(activate_txs) → dormant workers exit cleanly
/// ```
pub struct PartitionRunner {
nodes: Vec<NodeConfig>,
@@ -175,7 +199,8 @@ impl PartitionRunner {
ns.pools.len(),
wpn,
);
let nodes = ns.pools
let nodes = ns
.pools
.into_iter()
.zip(ns.cpus_per_node)
.map(|(pool, cpu_ids)| NodeConfig {
@@ -189,26 +214,24 @@ impl PartitionRunner {
/// Run `f(i)` for every index in `order`.
///
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
/// fires an efficiency check every `TIMER_SECS` seconds; each completed
/// partition resets that timer (forcing an immediate check) and also
/// triggers its own inline check. A new worker is activated whenever CPU
/// efficiency grows by at least `CPU_SPAWN_THRESHOLD` (absolute, in cores)
/// or I/O throughput grows by at least `IO_SPAWN_THRESHOLD` (relative) since
/// the last check — whichever resource is the actual bottleneck still shows
/// headroom.
/// Workers are pre-spawned dormant and activated adaptively, per node:
/// `(workers_per_node / INITIAL_DIVISOR).max(1)` are woken immediately on
/// every node, then `(workers_per_node / GROWTH_DIVISOR).max(1)` more per
/// node each time the check below fires. A timer thread fires that check
/// every `TIMER_SECS` seconds; each completed partition resets that timer
/// (forcing an immediate check) and also triggers its own inline check. A
/// growth step happens whenever CPU efficiency grows by at least
/// `CPU_SPAWN_THRESHOLD` of what the last growth step should have
/// contributed, or I/O throughput grows by at least `IO_SPAWN_THRESHOLD`
/// (relative) since the last check — whichever resource is the actual
/// bottleneck still shows headroom.
///
/// `on_done(i, result, elapsed)` is called from the controller thread as
/// each partition completes — suitable for progress bars and result
/// aggregation.
///
/// Returns the first error produced by `f`, if any.
pub fn run<F, R, E, C>(
&self,
order: &[usize],
f: F,
mut on_done: C,
) -> Result<(), E>
pub fn run<F, R, E, C>(&self, order: &[usize], f: F, mut on_done: C) -> Result<(), E>
where
F: Fn(usize) -> Result<R, E> + Send + Sync,
R: Send,
@@ -220,23 +243,28 @@ impl PartitionRunner {
return Ok(());
}
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
const IO_SPAWN_THRESHOLD: f64 = 0.2;
const TIMER_SECS: u64 = 30;
const INITIAL_DIVISOR: usize = 4;
// ── Channels ──────────────────────────────────────────────────────────
let (part_tx, part_rx) = unbounded::<usize>();
let (activate_tx, activate_rx) = unbounded::<()>();
// reset_tx: controller → timer ("reset the 30 s window")
let (reset_tx, reset_rx) = unbounded::<()>();
// event_tx: workers + timer → controller (unified event stream)
let (event_tx, event_rx) = unbounded::<WorkerEvent<R, E>>();
// One activation channel per node: growth always targets a specific
// node, rather than whichever dormant worker happens to win the race
// on a channel shared across all nodes.
let (activate_txs, activate_rxs): (Vec<_>, Vec<_>) =
(0..self.nodes.len()).map(|_| unbounded::<()>()).unzip();
for &i in order { part_tx.send(i).ok(); }
for &i in order {
part_tx.send(i).ok();
}
drop(part_tx);
let max_workers = self.max_workers();
let n_nodes = self.nodes.len();
let node_caps: Vec<usize> = self.nodes.iter().map(|n| n.max_workers).collect();
let f = &f;
let mut first_err: Option<E> = None;
@@ -260,18 +288,23 @@ impl PartitionRunner {
}
});
// ── Pre-spawn workers dormant, round-robin across NUMA nodes ──────
for w in 0..max_workers {
let node = &self.nodes[w % n_nodes];
// ── Pre-spawn workers dormant, grouped by node ────────────────────
// Each worker listens on its own node's activation channel only.
for (node, arx) in self.nodes.iter().zip(activate_rxs.iter()) {
let cpu_ids = &node.cpu_ids;
for _ in 0..node.max_workers {
let prx = part_rx.clone();
let etx = event_tx.clone();
let arx = activate_rx.clone();
let arx = arx.clone();
let pool = node.pool.clone();
let cpu_ids = &node.cpu_ids;
s.spawn(move || {
if arx.recv().is_err() { return; }
if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
if arx.recv().is_err() {
return;
}
if !cpu_ids.is_empty() {
pin_current_thread(cpu_ids);
}
for i in &prx {
let t = Instant::now();
let r = match &pool {
@@ -282,14 +315,15 @@ impl PartitionRunner {
}
});
}
}
// Drop controller's event_tx: event_rx closes when all workers +
// timer have exited.
drop(event_tx);
// ── Controller ────────────────────────────────────────────────────
let initial_workers = n_nodes.min(max_workers).min(n_total);
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
let mut n_active = initial_workers;
let mut activation = NodeActivation::new(&activate_txs, &node_caps, max_workers);
activation.activate_initial(INITIAL_DIVISOR, n_total);
let mut cpu_sample = CpuSample::now();
let mut io_sample = IoSample::now();
let mut completed = 0usize;
@@ -300,32 +334,39 @@ impl PartitionRunner {
WorkerEvent::Completed(i, r, dur) => {
match r {
Ok(v) => on_done(i, v, dur),
Err(e) => { if first_err.is_none() { first_err = Some(e); } }
Err(e) => {
if first_err.is_none() {
first_err = Some(e);
}
}
}
completed += 1;
// Reset the 30 s timer.
reset_tx.send(()).ok();
// Inline check: same logic as a timer tick.
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
&mut io_sample, IO_SPAWN_THRESHOLD,
completed, n_total,
&mut activation,
&mut cpu_sample,
&mut io_sample,
completed,
n_total,
);
}
WorkerEvent::TimerTick => {
maybe_activate(
&activate_tx, &mut n_active, max_workers,
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
&mut io_sample, IO_SPAWN_THRESHOLD,
completed, n_total,
&mut activation,
&mut cpu_sample,
&mut io_sample,
completed,
n_total,
);
}
}
}
// Dormant workers exit when activate_tx closes.
drop(activate_tx);
// Dormant workers exit once every sender for their node's channel
// is dropped — `activate_txs` holds the only ones.
drop(activate_txs);
// Timer thread exits when reset_tx closes.
drop(reset_tx);
});
@@ -344,28 +385,113 @@ enum WorkerEvent<R, E> {
TimerTick,
}
/// Tracks how many of each node's dormant workers have been woken, and
/// grows every node by the same amount at each step (capped by that node's
/// remaining dormant workers and by the run's total budget) so load stays
/// balanced across nodes at every point in time — never just "one more
/// worker somewhere". Also remembers the size of the last real growth step
/// (`last_step`), used to scale the CPU activation threshold to what that
/// step could plausibly have contributed (see `maybe_activate`).
struct NodeActivation<'a> {
txs: &'a [crossbeam_channel::Sender<()>],
caps: &'a [usize],
active: Vec<usize>,
total: usize,
max: usize,
last_step: usize,
}
impl<'a> NodeActivation<'a> {
fn new(txs: &'a [crossbeam_channel::Sender<()>], caps: &'a [usize], max: usize) -> Self {
Self {
txs,
caps,
active: vec![0; txs.len()],
total: 0,
max,
last_step: 0,
}
}
fn total(&self) -> usize {
self.total
}
fn last_step(&self) -> usize {
self.last_step
}
fn max(&self) -> usize {
self.max
}
fn is_full(&self) -> bool {
self.total >= self.max
}
/// Wake up to `(node_cap / divisor).max(1)` dormant workers on every
/// node, capped by `n_total`. Called once at startup, unconditionally.
fn activate_initial(&mut self, divisor: usize, n_total: usize) {
self.grow(divisor, n_total);
}
/// Same per-node sizing as [`activate_initial`](Self::activate_initial),
/// applied as a growth step. Returns the number of workers actually
/// activated (may be less than requested once a node or the total
/// budget is exhausted). Updates `last_step` when it actually grew.
fn grow(&mut self, divisor: usize, n_total: usize) -> usize {
let before = self.total;
for idx in 0..self.txs.len() {
let wanted = (self.caps[idx] / divisor).max(1);
let room = self.caps[idx].saturating_sub(self.active[idx]);
let grow = wanted.min(room).min(n_total.saturating_sub(self.total));
for _ in 0..grow {
self.txs[idx].send(()).ok();
}
self.active[idx] += grow;
self.total += grow;
}
let grew = self.total - before;
if grew > 0 {
self.last_step = grew;
}
grew
}
}
fn maybe_activate(
activate_tx: &crossbeam_channel::Sender<()>,
n_active: &mut usize,
max_workers: usize,
activation: &mut NodeActivation,
cpu_sample: &mut CpuSample,
cpu_threshold: f64,
io_sample: &mut IoSample,
io_threshold: f64,
completed: usize,
n_total: usize,
) {
if *n_active >= max_workers || completed >= n_total { return; }
if activation.is_full() || completed >= n_total {
return;
}
// Expect roughly 1 core of extra efficiency per worker activated in the
// last growth step (CPU-bound case); require at least CPU_SPAWN_THRESHOLD
// (20 %) of that expected gain before growing again. Scaling by the last
// step's size — not the cumulative total — keeps the bar meaningful
// regardless of how many workers are already active: growing by 8 should
// always take ~+1.6 cores to confirm, whether that's the 2nd growth step
// or the 20th.
let cpu_threshold = CPU_SPAWN_THRESHOLD * activation.last_step() as f64;
// Call both unconditionally (no `||` short-circuit): each sampler must
// advance its own window every tick, regardless of what the other one
// reports, or it would starve behind whichever signal fires first.
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
let io_wants_more = io_sample.do_i_activate(io_threshold);
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD * activation.last_step() as f64);
if !(cpu_wants_more || io_wants_more) {
return;
}
if cpu_wants_more || io_wants_more {
activate_tx.send(()).ok();
*n_active += 1;
debug!("activated worker {}/{}", n_active, max_workers);
let grew = activation.grow(GROWTH_DIVISOR, n_total);
if grew > 0 {
debug!(
"activated {} worker(s) — {}/{} active",
grew,
activation.total(),
activation.max()
);
}
}
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "obikmer"
version = "1.1.33"
version = "1.1.35"
edition = "2024"
[[bin]]
+2 -159
View File
@@ -96,162 +96,5 @@ impl<S: BitPartials> BitPartials for LayeredStore<S> {
// ── Tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
};
use tempfile::tempdir;
fn make_int_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) {
let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap();
let mut b = PersistentCompactIntMatrixBuilder::new(n, &dir.path().join("counts")).unwrap();
for &col in cols {
let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
cb.close().unwrap();
}
b.close().unwrap();
let m = PersistentCompactIntMatrix::open(dir.path()).unwrap();
(dir, m)
}
fn make_bit_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) {
let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap();
let mut b = PersistentBitMatrixBuilder::new(n, &dir.path().join("presence")).unwrap();
for &col in cols {
let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
cb.close().unwrap();
}
b.close().unwrap();
let m = PersistentBitMatrix::open(dir.path()).unwrap();
(dir, m)
}
// ── ColumnWeights ─────────────────────────────────────────────────────────
#[test]
fn col_weights_sums_across_layers() {
// layer 0: col0=[1,2], col1=[3,4] → weights [3, 7]
// layer 1: col0=[10,0], col1=[0,10] → weights [10, 10]
// combined: [13, 17]
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[3, 4]]);
let (_d1, m1) = make_int_matrix(&[&[10, 0], &[0, 10]]);
let store = LayeredStore::new(vec![m0, m1]);
let w = store.col_weights();
assert_eq!(w[0], 13);
assert_eq!(w[1], 17);
}
#[test]
fn col_weights_bit_sums_across_layers() {
// layer 0: col0=[T,F,T], col1=[F,T,T] → counts [2, 2]
// layer 1: col0=[F,F,T], col1=[T,T,F] → counts [1, 2]
// combined: [3, 4]
let (_d0, m0) = make_bit_matrix(&[&[true, false, true], &[false, true, true]]);
let (_d1, m1) = make_bit_matrix(&[&[false, false, true], &[true, true, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let w = store.col_weights();
assert_eq!(w[0], 3);
assert_eq!(w[1], 4);
}
// ── CountPartials — layered (one partition) ───────────────────────────────
#[test]
fn layered_bray_matches_combined() {
// Split [1,2,3,4,5] across two layers; bray dist should equal direct computation
// on [1,2,3,4,5] for each column pair.
// col0=[1,2,3,4,5], col1=[5,4,3,2,1]
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]); // slots 0-1
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]); // slots 2-4
let store = LayeredStore::new(vec![m0, m1]);
// direct on full data
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let expected = CountPartials::bray_dist_matrix(&mf);
let got = CountPartials::bray_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "bray [0,1]");
assert!((got[[1, 0]] - expected[[1, 0]]).abs() < 1e-12, "bray [1,0]");
}
#[test]
fn layered_relfreq_bray_matches_combined() {
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]);
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let expected = CountPartials::relfreq_bray_dist_matrix(&mf);
let got = CountPartials::relfreq_bray_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "relfreq_bray [0,1]");
}
#[test]
fn layered_euclidean_matches_combined() {
let (_d0, m0) = make_int_matrix(&[&[3, 0], &[0, 4]]);
let (_d1, m1) = make_int_matrix(&[&[1, 1], &[2, 2]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 4, 2, 2]]);
let expected = CountPartials::euclidean_dist_matrix(&mf);
let got = CountPartials::euclidean_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "euclidean [0,1]");
}
// ── CountPartials — partitioned (LayeredStore<LayeredStore<_>>) ───────────
#[test]
fn partitioned_bray_matches_combined() {
// partition 0: slots [1,2,3,4,5] col0 vs col1
// partition 1: slots [10,20] col0 vs col1
let (_d0, p0) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let (_d1, p1) = make_int_matrix(&[&[10, 20], &[20, 10]]);
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
]);
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5, 10, 20], &[5, 4, 3, 2, 1, 20, 10]]);
let expected = CountPartials::bray_dist_matrix(&mf);
let got = CountPartials::bray_dist_matrix(&partitioned);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "partitioned bray [0,1]");
}
// ── BitPartials ───────────────────────────────────────────────────────────
#[test]
fn layered_jaccard_matches_combined() {
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[true, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true],
&[false, true, true, false],
]);
let expected = BitPartials::jaccard_dist_matrix(&mf);
let got = BitPartials::jaccard_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "jaccard [0,1]");
}
#[test]
fn layered_hamming_matches_combined() {
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[false, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true],
&[false, true, false, false],
]);
let expected = BitPartials::hamming_dist_matrix(&mf);
let got = BitPartials::hamming_dist_matrix(&store);
assert_eq!(got[[0, 1]], expected[[0, 1]], "hamming [0,1]");
}
}
#[path = "tests/layered_store.rs"]
mod tests;
@@ -0,0 +1,381 @@
use super::*;
use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
};
use tempfile::tempdir;
fn make_int_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) {
let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap();
let mut b = PersistentCompactIntMatrixBuilder::new(n, &dir.path().join("counts")).unwrap();
for &col in cols {
let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
cb.close().unwrap();
}
b.close().unwrap();
let m = PersistentCompactIntMatrix::open(dir.path()).unwrap();
(dir, m)
}
fn make_bit_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) {
let n = cols.first().map_or(0, |c| c.len());
let dir = tempdir().unwrap();
let mut b = PersistentBitMatrixBuilder::new(n, &dir.path().join("presence")).unwrap();
for &col in cols {
let mut cb = b.add_col().unwrap();
for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); }
cb.close().unwrap();
}
b.close().unwrap();
let m = PersistentBitMatrix::open(dir.path()).unwrap();
(dir, m)
}
// ── ColumnWeights ─────────────────────────────────────────────────────────
#[test]
fn col_weights_sums_across_layers() {
// layer 0: col0=[1,2], col1=[3,4] → weights [3, 7]
// layer 1: col0=[10,0], col1=[0,10] → weights [10, 10]
// combined: [13, 17]
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[3, 4]]);
let (_d1, m1) = make_int_matrix(&[&[10, 0], &[0, 10]]);
let store = LayeredStore::new(vec![m0, m1]);
let w = store.col_weights();
assert_eq!(w[0], 13);
assert_eq!(w[1], 17);
}
#[test]
fn col_weights_bit_sums_across_layers() {
// layer 0: col0=[T,F,T], col1=[F,T,T] → counts [2, 2]
// layer 1: col0=[F,F,T], col1=[T,T,F] → counts [1, 2]
// combined: [3, 4]
let (_d0, m0) = make_bit_matrix(&[&[true, false, true], &[false, true, true]]);
let (_d1, m1) = make_bit_matrix(&[&[false, false, true], &[true, true, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let w = store.col_weights();
assert_eq!(w[0], 3);
assert_eq!(w[1], 4);
}
// ── CountPartials — layered (one partition) ───────────────────────────────
#[test]
fn layered_bray_matches_combined() {
// Split [1,2,3,4,5] across two layers; bray dist should equal direct computation
// on [1,2,3,4,5] for each column pair.
// col0=[1,2,3,4,5], col1=[5,4,3,2,1]
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]); // slots 0-1
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]); // slots 2-4
let store = LayeredStore::new(vec![m0, m1]);
// direct on full data
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let expected = CountPartials::bray_dist_matrix(&mf);
let got = CountPartials::bray_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "bray [0,1]");
assert!((got[[1, 0]] - expected[[1, 0]]).abs() < 1e-12, "bray [1,0]");
}
#[test]
fn layered_relfreq_bray_matches_combined() {
let (_d0, m0) = make_int_matrix(&[&[1, 2], &[5, 4]]);
let (_d1, m1) = make_int_matrix(&[&[3, 4, 5], &[3, 2, 1]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let expected = CountPartials::relfreq_bray_dist_matrix(&mf);
let got = CountPartials::relfreq_bray_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "relfreq_bray [0,1]");
}
#[test]
fn layered_euclidean_matches_combined() {
let (_d0, m0) = make_int_matrix(&[&[3, 0], &[0, 4]]);
let (_d1, m1) = make_int_matrix(&[&[1, 1], &[2, 2]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 4, 2, 2]]);
let expected = CountPartials::euclidean_dist_matrix(&mf);
let got = CountPartials::euclidean_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "euclidean [0,1]");
}
// ── CountPartials — partitioned (LayeredStore<LayeredStore<_>>) ───────────
#[test]
fn partitioned_bray_matches_combined() {
// partition 0: slots [1,2,3,4,5] col0 vs col1
// partition 1: slots [10,20] col0 vs col1
let (_d0, p0) = make_int_matrix(&[&[1, 2, 3, 4, 5], &[5, 4, 3, 2, 1]]);
let (_d1, p1) = make_int_matrix(&[&[10, 20], &[20, 10]]);
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
]);
let (_df, mf) = make_int_matrix(&[&[1, 2, 3, 4, 5, 10, 20], &[5, 4, 3, 2, 1, 20, 10]]);
let expected = CountPartials::bray_dist_matrix(&mf);
let got = CountPartials::bray_dist_matrix(&partitioned);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "partitioned bray [0,1]");
}
#[test]
fn partitioned_threshold_jaccard_off_diagonal_is_pairwise() {
// 3 genomes, 2 partitions, 1 layer each — mirrors distance.rs's
// LayeredStore<LayeredStore<PersistentCompactIntMatrix>> shape.
// partition 0: col0=[3,0], col1=[0,3], col2=[3,3]
// partition 1: col0=[1,1], col1=[1,0], col2=[0,1]
let (_d0, p0) = make_int_matrix(&[&[3, 0], &[0, 3], &[3, 3]]);
let (_d1, p1) = make_int_matrix(&[&[1, 1], &[1, 0], &[0, 1]]);
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
]);
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 3, 1, 0], &[3, 3, 0, 1]]);
let threshold = 1u32;
let (inter_p, union_p) = CountPartials::partial_threshold_jaccard(&partitioned, threshold);
let (inter_f, union_f) = CountPartials::partial_threshold_jaccard(&mf, threshold);
let n = 3;
for i in 0..n {
for j in 0..n {
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
}
}
}
#[test]
fn partitioned_threshold_jaccard_packed_off_diagonal_is_pairwise() {
// Same as `partitioned_threshold_jaccard_off_diagonal_is_pairwise` but
// each partition matrix is packed into a single .pcmx file first —
// the on-disk format actually used in production after `pack_matrices`.
use obicompactvec::pack_compact_int_matrix;
let (d0, _p0) = make_int_matrix(&[&[3, 0], &[0, 3], &[3, 3]]);
pack_compact_int_matrix(&d0.path().join("counts")).unwrap();
let p0 = PersistentCompactIntMatrix::open(d0.path()).unwrap();
let (d1, _p1) = make_int_matrix(&[&[1, 1], &[1, 0], &[0, 1]]);
pack_compact_int_matrix(&d1.path().join("counts")).unwrap();
let p1 = PersistentCompactIntMatrix::open(d1.path()).unwrap();
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
]);
let (_df, mf) = make_int_matrix(&[&[3, 0, 1, 1], &[0, 3, 1, 0], &[3, 3, 0, 1]]);
let threshold = 1u32;
let (inter_p, union_p) = CountPartials::partial_threshold_jaccard(&partitioned, threshold);
let (inter_f, union_f) = CountPartials::partial_threshold_jaccard(&mf, threshold);
let n = 3;
for i in 0..n {
for j in 0..n {
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
}
}
}
#[test]
fn partitioned_multilayer_threshold_jaccard_off_diagonal_is_pairwise() {
// 2 partitions, 2 layers each — the shape production indexes actually
// have (MPHF collision layers within a partition).
// partition 0, layer 0: col0=[3,0], col1=[0,3], col2=[3,3]
// partition 0, layer 1: col0=[2,0], col1=[0,0], col2=[2,0]
// partition 1, layer 0: col0=[1,1], col1=[1,0], col2=[0,1]
// partition 1, layer 1: col0=[0,5], col1=[5,5], col2=[0,0]
let (_d0a, p0a) = make_int_matrix(&[&[3, 0], &[0, 3], &[3, 3]]);
let (_d0b, p0b) = make_int_matrix(&[&[2, 0], &[0, 0], &[2, 0]]);
let (_d1a, p1a) = make_int_matrix(&[&[1, 1], &[1, 0], &[0, 1]]);
let (_d1b, p1b) = make_int_matrix(&[&[0, 5], &[5, 5], &[0, 0]]);
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0a, p0b]),
LayeredStore::new(vec![p1a, p1b]),
]);
// Flattened equivalent: concatenate every layer's slots into one matrix.
let (_df, mf) = make_int_matrix(&[
&[3, 0, 2, 0, 1, 1, 0, 5],
&[0, 3, 0, 0, 1, 0, 5, 5],
&[3, 3, 2, 0, 0, 1, 0, 0],
]);
let threshold = 1u32;
let (inter_p, union_p) = CountPartials::partial_threshold_jaccard(&partitioned, threshold);
let (inter_f, union_f) = CountPartials::partial_threshold_jaccard(&mf, threshold);
let n = 3;
for i in 0..n {
for j in 0..n {
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
}
}
}
// ── BitPartials ───────────────────────────────────────────────────────────
#[test]
fn layered_jaccard_matches_combined() {
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[true, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true],
&[false, true, true, false],
]);
let expected = BitPartials::jaccard_dist_matrix(&mf);
let got = BitPartials::jaccard_dist_matrix(&store);
assert!((got[[0, 1]] - expected[[0, 1]]).abs() < 1e-12, "jaccard [0,1]");
}
#[test]
fn layered_hamming_matches_combined() {
let (_d0, m0) = make_bit_matrix(&[&[true, false], &[false, true]]);
let (_d1, m1) = make_bit_matrix(&[&[true, true], &[false, false]]);
let store = LayeredStore::new(vec![m0, m1]);
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true],
&[false, true, false, false],
]);
let expected = BitPartials::hamming_dist_matrix(&mf);
let got = BitPartials::hamming_dist_matrix(&store);
assert_eq!(got[[0, 1]], expected[[0, 1]], "hamming [0,1]");
}
#[test]
fn partitioned_bit_jaccard_off_diagonal_is_pairwise() {
// Same shape as the count-based `partitioned_multilayer_threshold_jaccard_*`
// tests, but for the presence/bit path (`with_counts = false` — what
// `all_specifics` actually uses in production).
// 4 genomes, 3 partitions, 2 layers in the last one.
let (_d0, p0) = make_bit_matrix(&[
&[true, false, true],
&[false, true, true],
&[true, true, false],
&[false, false, true],
]);
let (_d1, p1) = make_bit_matrix(&[
&[true, true],
&[false, true],
&[true, false],
&[true, true],
]);
let (_d2a, p2a) = make_bit_matrix(&[
&[false, true],
&[true, true],
&[false, false],
&[true, false],
]);
let (_d2b, p2b) = make_bit_matrix(&[
&[true],
&[false],
&[true],
&[true],
]);
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
LayeredStore::new(vec![p2a, p2b]),
]);
// Flattened equivalent: concatenate every partition/layer's slots.
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true, true, false, true, true],
&[false, true, true, false, true, true, true, false],
&[true, true, false, true, false, false, false, true],
&[false, false, true, true, true, true, false, true],
]);
let (inter_p, union_p) = BitPartials::partial_jaccard(&partitioned);
let (inter_f, union_f) = BitPartials::partial_jaccard(&mf);
let n = 4;
for i in 0..n {
for j in 0..n {
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
}
}
}
#[test]
fn partitioned_bit_jaccard_packed_off_diagonal_is_pairwise() {
// Same as `partitioned_bit_jaccard_off_diagonal_is_pairwise` but every
// partition's presence matrix is packed into a single .pbmx file —
// the on-disk format actually used in production after `pack_matrices`.
use obicompactvec::pack_bit_matrix;
let (d0, _p0) = make_bit_matrix(&[
&[true, false, true],
&[false, true, true],
&[true, true, false],
&[false, false, true],
]);
pack_bit_matrix(&d0.path().join("presence")).unwrap();
let p0 = PersistentBitMatrix::open(d0.path()).unwrap();
let (d1, _p1) = make_bit_matrix(&[
&[true, true],
&[false, true],
&[true, false],
&[true, true],
]);
pack_bit_matrix(&d1.path().join("presence")).unwrap();
let p1 = PersistentBitMatrix::open(d1.path()).unwrap();
let (d2a, _p2a) = make_bit_matrix(&[
&[false, true],
&[true, true],
&[false, false],
&[true, false],
]);
pack_bit_matrix(&d2a.path().join("presence")).unwrap();
let p2a = PersistentBitMatrix::open(d2a.path()).unwrap();
let (d2b, _p2b) = make_bit_matrix(&[
&[true],
&[false],
&[true],
&[true],
]);
pack_bit_matrix(&d2b.path().join("presence")).unwrap();
let p2b = PersistentBitMatrix::open(d2b.path()).unwrap();
let partitioned = LayeredStore::new(vec![
LayeredStore::new(vec![p0]),
LayeredStore::new(vec![p1]),
LayeredStore::new(vec![p2a, p2b]),
]);
let (_df, mf) = make_bit_matrix(&[
&[true, false, true, true, true, false, true, true],
&[false, true, true, false, true, true, true, false],
&[true, true, false, true, false, false, false, true],
&[false, false, true, true, true, true, false, true],
]);
let (inter_p, union_p) = BitPartials::partial_jaccard(&partitioned);
let (inter_f, union_f) = BitPartials::partial_jaccard(&mf);
let n = 4;
for i in 0..n {
for j in 0..n {
assert_eq!(inter_p[[i, j]], inter_f[[i, j]], "inter[{i},{j}]");
assert_eq!(union_p[[i, j]], union_f[[i, j]], "union[{i},{j}]");
}
}
}