# Merge parallelism and memory pressure

## Problem observed

Running `obikmer merge` over 109 indexes (108 sources + 1 bootstrap) on a 192-core machine
produces a fatal OOM during the `merge_partitions` stage:

```
memory allocation of 9126805520 bytes failed
```

A single allocation of ~8.5 GB fails. This is not an aggregate; it is one `malloc` call
from hashbrown during a HashMap resize.

---

## Root cause

### The merge pipeline per partition

```
source unitigs.bin
  → iter_indexed_canonical_kmers()
  → GraphDeBruijn::push()              ← HashSet<u64> + 1 byte flags, all in RAM
  → compute_degrees_and_mark_starts()
  → try_for_each_unitig()
  → unitigs.bin (new layer)
  → Layer::build() → MPHF + evidence
```

`GraphDeBruijn` is a `FastHashMap<CanonicalKmer, AtomicU8>`: effectively a `HashSet<u64>` with
one flag byte per node. Neighbor lookup is implicit: 4 probes into the same map.
No edges are stored. The full kmer set of one partition must reside in RAM
simultaneously to compute degrees and mark unitig starts.
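
The implicit-neighbor scheme can be illustrated with a minimal sketch. It is not obikmer's code: it assumes a 2-bit base encoding (A=0, C=1, G=2, T=3, first base in the high bits), canonical form = min(kmer, reverse complement), and a small hypothetical `K`; `Graph`, `encode`, `revcomp` and `canonical` are illustrative names only.

```rust
use std::collections::HashMap;
use std::sync::atomic::AtomicU8;

/// Hypothetical k-mer length (must be < 32 for this simple mask).
const K: u32 = 5;
const MASK: u64 = (1u64 << (2 * K)) - 1;

/// 2-bit encoding, first base in the high bits.
fn encode(s: &str) -> u64 {
    s.bytes().fold(0u64, |acc, c| {
        let b = match c {
            b'A' => 0,
            b'C' => 1,
            b'G' => 2,
            b'T' => 3,
            _ => panic!("non-ACGT base"),
        };
        (acc << 2) | b
    })
}

/// Reverse complement: read bases from the low end, complement each (3 - b).
fn revcomp(kmer: u64) -> u64 {
    let mut x = kmer;
    let mut rc = 0u64;
    for _ in 0..K {
        rc = (rc << 2) | (3 - (x & 3));
        x >>= 2;
    }
    rc
}

fn canonical(kmer: u64) -> u64 {
    kmer.min(revcomp(kmer))
}

/// Stand-in for GraphDeBruijn: canonical k-mer -> one flag byte. No edges stored.
struct Graph {
    nodes: HashMap<u64, AtomicU8>,
}

impl Graph {
    fn new() -> Self {
        Graph { nodes: HashMap::new() }
    }

    fn push(&mut self, kmer: u64) {
        self.nodes.entry(canonical(kmer)).or_insert_with(|| AtomicU8::new(0));
    }

    /// Successors of `kmer` in the given orientation: exactly 4 probes into the map.
    fn out_degree(&self, kmer: u64) -> usize {
        (0..4u64)
            .filter(|&b| self.nodes.contains_key(&canonical(((kmer << 2) | b) & MASK)))
            .count()
    }

    /// Predecessors of `kmer` are the successors of its reverse complement.
    fn in_degree(&self, kmer: u64) -> usize {
        self.out_degree(revcomp(kmer))
    }
}
```

The only per-node storage is the map entry itself; degrees are recomputed from probes, which is why the whole partition has to be resident at once.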

The matrix builders that follow (pass 2) work on mmapped files: they do **not** consume
significant RAM. The pressure is entirely in pass 1.

### Unbounded Rayon parallelism

With 192 cores, Rayon ran up to 192 partitions concurrently. Each partition built its
own `GraphDeBruijn` accumulating all kmers absent from the destination, so peak memory
scaled as 192 × the peak per-partition hash set size.

### The 8.5 GB single allocation

hashbrown allocates the entire backing array in one call when the table grows. The number
of buckets is a power of two, at most 7/8 of them may be occupied, and each bucket costs
`sizeof(K,V)` plus 1 control byte (the control array also carries one extra 16-byte group
on x86_64). For `(u64, AtomicU8)` the entry is padded to 16 bytes, so a bucket costs 17 bytes:

```
9 126 805 520 B = 2^29 buckets × 17 B + 16 B
→ growth from 2^28 buckets (capacity 234.9 M kmers) to 2^29 buckets (capacity 469.8 M)
→ one partition's graph needed room for more than 234.9 M new kmers
```

Plausible for the largest partition of 108 Salix/Betula sources (~450 Mbp each).
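
The allocation size in the error message can be checked against hashbrown's table layout with a few lines of arithmetic. This sketch assumes a 64-bit x86_64 target with SSE2 (16-byte control groups) and hashbrown's 7/8 maximum load factor; other targets use a different group width, and the helper names are hypothetical.

```rust
use std::mem::size_of;
use std::sync::atomic::AtomicU8;

/// Control-group width with SSE2 on x86_64 (assumption for other targets).
const GROUP_WIDTH: usize = 16;
/// One (u64, AtomicU8) entry: 8 + 1 bytes, padded to 16 by the u64 alignment.
const ENTRY: usize = size_of::<(u64, AtomicU8)>();

/// Bytes requested for a table of `buckets` slots (a power of two):
/// entries, one control byte per bucket, plus one trailing control group.
fn table_alloc_bytes(buckets: usize) -> usize {
    buckets * ENTRY + buckets + GROUP_WIDTH
}

/// Items a table of `buckets` slots can hold before it must grow (7/8 load factor).
fn usable_capacity(buckets: usize) -> usize {
    if buckets < 8 { buckets - 1 } else { buckets / 8 * 7 }
}

/// Recover the bucket count from an allocation size, if it matches a power of two.
fn buckets_for_alloc(bytes: usize) -> Option<usize> {
    (3..48).map(|shift| 1usize << shift).find(|&b| table_alloc_bytes(b) == bytes)
}
```

Under these assumptions, 9 126 805 520 bytes matches 2^29 buckets exactly, i.e. a table growing past the 234 881 024-item capacity of 2^28 buckets.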

---

## Partition size distribution

`obikmer utils --partition-stats` measures, for each partition, the sum of `unitigs.bin`
file sizes across all source indexes (pure `stat()` syscalls, negligible cost).
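
A tally of this kind needs only file metadata. The sketch below is an assumption about how it could look, not obikmer's implementation: the on-disk location of each partition's `unitigs.bin` is passed in as a closure because the real index layout is not described here, and missing files are counted as zero.

```rust
use std::fs;
use std::path::{Path, PathBuf};

/// For each partition, sum the size of `unitigs.bin` across all source indexes.
/// `path_of(index_dir, partition)` is a hypothetical layout hook supplied by the caller.
fn partition_sizes<F>(indexes: &[PathBuf], n_partitions: usize, path_of: F) -> Vec<u64>
where
    F: Fn(&Path, usize) -> PathBuf,
{
    let mut sizes = vec![0u64; n_partitions];
    for index in indexes {
        for (p, total) in sizes.iter_mut().enumerate() {
            // One stat() per file; the contents are never read.
            if let Ok(meta) = fs::metadata(path_of(index.as_path(), p)) {
                *total += meta.len();
            }
        }
    }
    sizes
}
```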

Observed on a 9-genome pilot (256 partitions):

| Stat | Value |
|---|---|
| min | 30.5 MB |
| max | 232.1 MB |
| mean | 40.1 MB |
| median | 37.2 MB |
| p95 | 47.1 MB |
| max/median ratio | 6.23× |

The distribution is **bimodal with a heavy tail**:

- 238/256 partitions in a narrow 30–50 MB band
- 4 structurally extreme partitions (3–6× the median): 221, 233, 135, 191
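
The summary statistics and the outlier list can be derived from the per-partition sizes. This sketch uses nearest-rank percentiles and a caller-chosen "× median" threshold for flagging; both are assumptions (the tool may compute them differently), the helper names are hypothetical, and input is assumed non-empty. The scheduler itself does not use such a threshold.

```rust
/// Summary of per-partition byte totals.
#[derive(Debug, PartialEq)]
struct Stats {
    min: u64,
    max: u64,
    mean: f64,
    median: u64,
    p95: u64,
}

/// Nearest-rank percentile on sorted data: 1-based rank = ceil(q * n).
fn percentile(sorted: &[u64], q: f64) -> u64 {
    let rank = (q * sorted.len() as f64).ceil().max(1.0) as usize;
    sorted[rank - 1]
}

fn summarize(sizes: &[u64]) -> Stats {
    let mut sorted = sizes.to_vec();
    sorted.sort_unstable();
    Stats {
        min: sorted[0],
        max: sorted[sorted.len() - 1],
        mean: sorted.iter().sum::<u64>() as f64 / sorted.len() as f64,
        median: percentile(&sorted, 0.5),
        p95: percentile(&sorted, 0.95),
    }
}

/// Partition ids at least `factor` times the median size, largest first.
fn outliers(sizes: &[u64], factor: f64) -> Vec<usize> {
    let median = summarize(sizes).median as f64;
    let mut ids: Vec<usize> = (0..sizes.len())
        .filter(|&i| sizes[i] as f64 >= factor * median)
        .collect();
    ids.sort_by(|&a, &b| sizes[b].cmp(&sizes[a]));
    ids
}
```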

These correspond to minimizers over-represented in repetitive regions shared across
all sources, and they are the extreme partitions in every run on this dataset.

With 109 sources, outlier partitions do not grow linearly with the number of sources: only
kmers **absent from the destination** enter the `GraphDeBruijn`, and inter-source overlap is
high for closely related species. Partition 221 is the likely trigger for the 8.5 GB crash.

---

## Solution: LFD scheduling + memory budget semaphore

### Principle

Pre-sort partitions by **decreasing estimated size** (First Fit Decreasing, FFD),
then schedule them through a **continuous memory budget semaphore**. Each worker
acquires an estimated cost before starting and releases it on completion.

Large partitions run first, while the full budget is available; small partitions fill
the gaps. No hard outlier threshold is needed.
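
Building the processing order is a single sort over the per-partition `unitigs.bin` sizes. A minimal sketch; `lfd_order` is a hypothetical name, and the tie-break on partition id is an assumption added to keep the order deterministic.

```rust
use std::cmp::Reverse;

/// Partition ids sorted by decreasing estimated size, ties broken by id.
/// The first entry is the largest partition, the one run alone before the parallel phase.
fn lfd_order(partition_sizes: &[u64]) -> Vec<usize> {
    let mut order: Vec<usize> = (0..partition_sizes.len()).collect();
    order.sort_by_key(|&i| (Reverse(partition_sizes[i]), i));
    order
}
```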

### `MemoryBudget` (`obisys`)

```rust
pub struct MemoryBudget { … }

impl MemoryBudget {
    pub fn new(total: u64) -> Self;
    pub fn acquire(&self, cost: u64); // blocks until budget available
    pub fn release(&self, cost: u64);
    pub fn peak_active(&self) -> usize;
}
```

Non-deadlock guarantee: when `active == 0`, `acquire` always succeeds regardless of cost.
Without this rule, a partition whose estimated cost exceeds the total budget would block forever.
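
One way to obtain these semantics with only the standard library is a `Mutex` plus `Condvar`. This is a sketch of the documented interface, not the `obisys` source; the internal `State` struct and the use of `notify_all` are assumptions.

```rust
use std::sync::{Condvar, Mutex};

struct State {
    reserved: u64, // sum of costs currently held
    active: usize, // workers currently holding a reservation
    peak: usize,   // highest `active` observed
}

pub struct MemoryBudget {
    total: u64,
    state: Mutex<State>,
    freed: Condvar,
}

impl MemoryBudget {
    pub fn new(total: u64) -> Self {
        MemoryBudget {
            total,
            state: Mutex::new(State { reserved: 0, active: 0, peak: 0 }),
            freed: Condvar::new(),
        }
    }

    /// Blocks until `cost` fits. With nothing active the request is admitted
    /// unconditionally, so a cost larger than `total` cannot deadlock.
    pub fn acquire(&self, cost: u64) {
        let mut s = self.state.lock().unwrap();
        while s.active > 0 && s.reserved.saturating_add(cost) > self.total {
            s = self.freed.wait(s).unwrap();
        }
        s.reserved += cost;
        s.active += 1;
        s.peak = s.peak.max(s.active);
    }

    /// Must be given the same cost as the matching `acquire`.
    pub fn release(&self, cost: u64) {
        let mut s = self.state.lock().unwrap();
        s.reserved -= cost;
        s.active -= 1;
        drop(s);
        // Wake all waiters: each re-checks its own cost against the new total.
        self.freed.notify_all();
    }

    pub fn peak_active(&self) -> usize {
        self.state.lock().unwrap().peak
    }
}
```

Admission in this sketch is not FIFO: a waiting request can be overtaken by a smaller one that fits. With partitions taken largest first, the later arrivals are the smaller ones.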

### Adaptive expansion factor

The expansion factor converts raw `unitigs.bin` bytes into an estimated `GraphDeBruijn`
RAM footprint. hashbrown stores each kmer as a `(u64, AtomicU8)` entry, counted here as
16 bytes/kmer (control bytes and the free slots kept below the 7/8 load factor come on top);
unitig files encode ≈ 2 bits/base. The ratio depends on average unitig length
(short unitigs: ~2×; long unitigs: up to ~50×).
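
The dependence on unitig length follows from counting kmers per unitig. Under simplifying assumptions (exactly 2 bits per base on disk, no per-unitig header, 16 bytes per kmer in RAM), a unitig of L bases yields L − k + 1 kmers from L/4 bytes. The hypothetical helper below computes that ratio; k = 31 in the example is an assumption, not a value taken from obikmer.

```rust
/// Estimated RAM bytes per on-disk byte for one unitig of `len` bases (len >= k),
/// assuming 16 bytes per kmer in RAM and 2 bits per base on disk.
fn expansion(len: u64, k: u64) -> f64 {
    let kmers = (len - k + 1) as f64;
    let disk_bytes = len as f64 / 4.0;
    16.0 * kmers / disk_bytes
}
```

A unitig of exactly k bases gives 64/k (about 2× for k = 31); as L grows the ratio approaches 16 × 4 = 64, an upper bound for this simplified model that any per-unitig overhead in the file would lower.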

**Phase 1: sequential pilot (worst partition)**

The largest partition runs alone first. Its actual `g.len()` seeds the expansion factor
before any parallel job starts. `FALLBACK_EXPANSION = 4×` is used only for empty partitions.

```rust
let worst_id = order[0]; // `order`: partition ids sorted by decreasing size
let worst_g_len = dst_partition.merge_partition(worst_id, …)?;
// ↑ now returns SKResult<usize> (was SKResult<()>)
// Factors are stored ×1000 (fixed point) so they fit in an AtomicU64.
let worst_bytes = partition_sizes[worst_id];
let seed_expansion = match worst_bytes {
    0 => FALLBACK_EXPANSION * 1000, // empty partition: nothing to measure, use 4×
    b => worst_g_len as u64 * 16 * 1000 / b,
};
let max_expansion = AtomicU64::new(seed_expansion);
```

**Phase 2: parallel with adaptive updates**

```rust
order[1..].par_iter().try_for_each(|&i| -> SKResult<()> {
    let bytes = partition_sizes[i];
    let cost = bytes * max_expansion.load(Relaxed) / 1000;
    budget.acquire(cost);
    let result = dst_partition.merge_partition(i, …);
    budget.release(cost); // releases estimated cost, not actual; also on error
    let g_len = result?;

    if bytes > 0 {
        let actual = g_len as u64 * 16 * 1000 / bytes;
        max_expansion.fetch_max(actual, Relaxed); // always pessimistic (max)
    }
    Ok(())
})?;
```

`budget.release(cost)` uses the estimated cost, not the actual one. The budget tracks
reservations, not physical RAM: each partition gives back exactly what it reserved at acquisition.

**On the safety margin**

There is no separate cost multiplier `k`. It would be redundant with `budget_fraction`: both
reduce effective concurrency by the same amount, and a single parameter is easier to
calibrate. `budget_fraction = 0.5` (default) reserves half of the available RAM for the
OS, the MPHF build, pass 2, and estimation error.
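
The redundancy argument can be checked numerically. In this sketch (hypothetical helpers; every partition is assumed to reserve the same cost, so concurrency is simply budget / cost), inflating each cost by k = 2 admits the same number of workers as halving `budget_fraction`.

```rust
/// Budget in bytes for a given fraction of available RAM.
fn budget_bytes(available: u64, budget_fraction: f64) -> u64 {
    (available as f64 * budget_fraction) as u64
}

/// Workers that fit at once when every partition reserves `cost` bytes.
fn max_concurrent(budget: u64, cost: u64) -> u64 {
    budget / cost
}
```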

`--budget-fraction` is exposed as a CLI flag. It is the only escape hatch for pathological
cases (extreme repetitive content, unusually long unitigs) that still cause OOM.

### RAM source

`obisys::available_memory_bytes()` wraps `sysinfo::System::available_memory()` and
falls back to `total / 2` on macOS when that call returns 0 because of the memory compressor.
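
The fallback rule reduces to a small pure function. A sketch, assuming the two inputs come from `sysinfo`'s `System::available_memory()` and `System::total_memory()` (both in bytes in recent `sysinfo` releases); the function name and the `is_macos` parameter are illustrative, not the `obisys` API.

```rust
/// Available RAM to budget against: trust the OS figure unless macOS reports 0,
/// in which case fall back to half of total RAM.
fn effective_available_memory(available: u64, total: u64, is_macos: bool) -> u64 {
    if is_macos && available == 0 {
        total / 2
    } else {
        available
    }
}
```

At a call site, `is_macos` would be `cfg!(target_os = "macos")`.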

---

## Diagnostic report

After the parallel phase, `merge_partitions` emits a structured report via `tracing::info!`:

```
─── merge_partitions memory report ───
available RAM : 512.0 GB budget 50% = 256.0 GB
expansion factor — seed: 4.2× final max: 6.1× (mean: 1.8× median: 1.6×)
peak concurrent workers: 42
expansion factor distribution (256 partitions with data):
0.50× – 1.25× │██████████████████████████████ 148
1.25× – 2.00× │████████████████████████ 82
…
5.50× – 6.25× │█ 2
top partitions by actual expansion factor:
partition 221 : 6.10× (232.1 MB unitigs → 48M kmers, reserved at 4.20×)
partition 135 : 5.82× (127.3 MB unitigs → 24M kmers, reserved at 4.20×)
…
──────────────────────────────────────
```

Fields useful for diagnosis:

| Field | Interpretation |
|---|---|
| `seed` vs `final max` expansion | a gap indicates partitions with higher expansion than the worst-by-size |
| `reserved at X×` | the factor used at acquisition; if much lower than actual, the budget was under-reserved for that partition |
| `peak concurrent workers` | effective parallelism achieved under the budget constraint |
| `mean` / `median` expansion | typical dataset characteristic; stable across runs on the same data |
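
The distribution block of the report is a text histogram. The binning rule the tool uses is not described here, so this sketch assumes fixed-width bins from a given start and bars scaled linearly so the fullest bin gets `bar_max` characters; `render_histogram` is a hypothetical name, and its output need not reproduce the sample above line for line.

```rust
/// Render expansion factors as text histogram lines, skipping empty bins.
fn render_histogram(factors: &[f64], start: f64, width: f64, bar_max: usize) -> Vec<String> {
    if factors.is_empty() {
        return Vec::new();
    }
    let top = factors.iter().cloned().fold(f64::MIN, f64::max);
    let n_bins = (((top - start) / width).floor() as usize) + 1;
    let mut counts = vec![0usize; n_bins];
    for &f in factors {
        // Values below `start` are clamped into the first bin.
        let bin = ((f - start) / width).floor().max(0.0) as usize;
        counts[bin.min(n_bins - 1)] += 1;
    }
    let fullest = *counts.iter().max().unwrap();
    counts
        .iter()
        .enumerate()
        .filter(|&(_, &c)| c > 0)
        .map(|(i, &c)| {
            let lo = start + i as f64 * width;
            // Round bar length up so every non-empty bin shows at least one block.
            let bar = "█".repeat((c * bar_max + fullest - 1) / fullest);
            format!("{:.2}× – {:.2}× │{} {}", lo, lo + width, bar, c)
        })
        .collect()
}
```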

---

## Parameters

| Parameter | Default | CLI flag | Notes |
|---|---|---|---|
| `fallback_expansion` | 4× | none | seed for empty partitions only |
| `budget_fraction` | 0.5 | `--budget-fraction` | reduce if OOM persists |
| RAM source | `obisys::available_memory_bytes()` | none | falls back to `total/2` on macOS |