Merge pull request 'feat: implement NUMA-aware worker pools for merge command' (#29) from push-wusvurukprsr into main

Reviewed-on: #29
This commit was merged in pull request #29.
This commit is contained in:
2026-06-14 21:57:21 +00:00
9 changed files with 654 additions and 34 deletions
+97
View File
@@ -0,0 +1,97 @@
# NUMA-aware worker pools for merge
## Problem
The merge command's bottleneck is `compute_degrees` in `obidebruinj`: a random pointer-chase over 2070 M node hash maps that saturates DRAM bandwidth. When multiple partition workers run concurrently, they contend for the shared memory bus, causing super-linear slowdown (measured: 0.016 µs/node solo → 0.95 µs/node with 45 concurrent workers, ×60 degradation).
Modern HPC nodes are multi-socket NUMA machines (observed: 2 sockets × 4 NUMA nodes × 24 cores = 192 cores). Cross-NUMA memory traffic compounds the contention:
- Full 192-core run: ~15 min/partition (×10 worse than M3 Mac)
- `taskset` restricted to 4 NUMA nodes (96 cores): ~90 s/partition
- OAR job on 1 NUMA node (24 cores): ~80 s/partition, same throughput as 96 cores
**Conclusion**: the bottleneck is memory bandwidth per NUMA node, not core count. 24 cores on one NUMA node achieve the same throughput as 96 cores across four.
## Strategy
Run N worker groups in parallel, one per NUMA node, each with its own Rayon thread pool whose threads are pinned to the NUMA node's CPUs. Linux's first-touch policy then places graph allocations on local DRAM automatically — no explicit NUMA allocator needed.
Expected throughput: N × single-NUMA throughput. On the 8-NUMA-node HPC: 8 × ~80 s = 910 min total instead of >60 min with the current single-pool approach.
## Rayon thread pool isolation
Rayon provides `ThreadPool::install(|| { ... })`: any Rayon call (`par_iter`, `current_num_threads`, etc.) inside the closure uses *that* pool exclusively. Wrapping `merge_partition` in `pool.install()` redirects all downstream Rayon calls — including those in `debruijn.rs` and `partition.rs` — without touching those crates.
```rust
// worker thread, assigned to NUMA pool `pool`
pool.install(|| {
dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence)
})
```
`rayon::current_num_threads()` inside `merge_partition` will return the pool size (e.g. 24), not the global thread count — which is the right value for buffer sizing.
## Thread pinning
`ThreadPoolBuilder::spawn_handler` provides a hook executed for each thread at creation. Inside, `libc::sched_setaffinity` pins the thread to a CPU set:
```rust
let cpus: Vec<usize> = numa_node_cpus(node); // from /sys/devices/system/node/nodeN/cpulist
rayon::ThreadPoolBuilder::new()
.num_threads(cpus.len())
.spawn_handler(move |thread| {
let mut b = std::thread::Builder::new();
std::thread::Builder::new().spawn(move || {
pin_to_cpus(&cpus); // sched_setaffinity via libc
thread.run()
})?;
Ok(())
})
.build()?
```
NUMA topology is read from `/sys/devices/system/node/node*/cpulist` — no `libnuma` dependency required. If the `numa` crate is linked, `numa_available()` / `numa_run_on_node()` are an alternative.
## Memory locality
Linux allocates pages on the NUMA node of the thread that first writes them (first-touch policy). Once Rayon threads are pinned to node N, all graph data built by those threads lands on node N's DRAM. No changes to the allocator, no explicit `numa_alloc_onnode` calls.
## Adaptive spawn criterion
The current criterion uses `std::thread::available_parallelism()` (returns total cores = 192) and `max_workers = n_cores / 2`. With NUMA pools:
- `n_cores` per pool = cores per NUMA node (e.g. 24)
- `max_workers` per pool = pool size / 2 (e.g. 12)
- CPU efficiency is measured per pool, not globally
Each NUMA group runs its own independent adaptive pool. Workers are distributed across NUMA groups round-robin or by workload (partition assignment can be pre-split by NUMA group index).
## Required changes
| File | Change |
|------|--------|
| `obikindex/src/merge.rs` | Detect NUMA topology; build N `ThreadPool`s with pinned threads; assign each pre-spawned worker to a pool; wrap `merge_partition` in `pool.install()` |
| `obikindex/src/merge.rs` | Replace `available_parallelism()` with per-NUMA core count for spawn criterion |
| `obikpartitionner/src/merge_layer.rs` | No change — `merge_partition` already works inside any Rayon context |
| `obidebruinj/src/debruijn.rs` | No change — `par_iter` and `current_num_threads` are pool-context-aware |
| `obikpartitionner/src/partition.rs` | No change — same reason |
## Platform guard
NUMA pinning is Linux-only. The fallback is the current single global pool:
```rust
#[cfg(target_os = "linux")]
fn build_numa_pools() -> Option<Vec<rayon::ThreadPool>> { ... }
#[cfg(not(target_os = "linux"))]
fn build_numa_pools() -> Option<Vec<rayon::ThreadPool>> { None }
```
When `build_numa_pools()` returns `None` (macOS, UMA, or single-socket), `merge.rs` uses the existing code path unchanged.
## Open questions
- **Partition assignment**: split partitions by NUMA group up-front (static) or use a shared queue with per-group workers stealing from a common pool? Static split is simpler; stealing is better for load balance when partitions vary widely in size.
- **Intra-NUMA adaptive criterion**: with 24 cores and ~35 effective workers per NUMA node, the current marginal-gain criterion needs re-tuning or can be left as-is with per-pool `n_cores = 24`.
- **I/O**: partition data (unitig files) is on a shared filesystem. With 8 concurrent NUMA groups, I/O concurrency increases 8× — need to verify the filesystem (Lustre or local SSD) can absorb it without becoming the new bottleneck.
+68
View File
@@ -0,0 +1,68 @@
# Installation
## Prerequisites
### Rust toolchain
`obikmer` requires **Rust 1.85 or later** (edition 2024). Install or update via [rustup](https://rustup.rs):
```bash
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
rustup update stable
```
### C build environment (required for hwloc)
`obikmer` embeds [hwloc](https://www.open-mpi.org/projects/hwloc/) (Hardware Locality) for NUMA-aware thread placement on multi-socket machines. hwloc is built from source at compile time via the `vendored` feature of the `hwlocality` crate. This requires a standard C build environment.
#### Linux (Debian/Ubuntu)
```bash
apt install build-essential automake libtool autoconf pkg-config
```
#### Linux (RHEL/Rocky/AlmaLinux)
```bash
dnf install gcc make automake libtool autoconf pkgconfig
```
#### HPC clusters
Most HPC clusters provide these tools via the module system:
```bash
module load gcc automake libtool autoconf
```
If in doubt, check whether `autoreconf --version` and `libtool --version` return successfully.
#### macOS
```bash
brew install automake libtool autoconf pkg-config
```
## Building
```bash
git clone <repository-url>
cd obikmer/src
cargo build --release
```
The compiled binary is at `target/release/obikmer`.
## NUMA support
NUMA-aware thread placement is active automatically on multi-socket Linux machines (detected at runtime via hwloc). No special build flag is required — the detection is built in and falls back gracefully to the single-pool adaptive strategy on:
- macOS (Apple Silicon, unified memory)
- single-socket Linux machines
- any system where hwloc reports only one NUMA node
## Verifying the installation
```bash
obikmer --help
```
+2
View File
@@ -29,6 +29,7 @@ extra_javascript:
nav:
- Home: index.md
- Installation: installation.md
- Theory:
- Kmers and super-kmers: kmers.md
- DNA encoding: theory/encoding.md
@@ -55,6 +56,7 @@ nav:
- Architecture:
- Sequences: architecture/sequences/invariant.md
- Kmer index: architecture/index_architecture.md
- NUMA-aware worker pools: architecture/numa_worker_pools.md
watch:
- docmd
+299 -5
View File
@@ -128,6 +128,12 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ad8689a486416c401ea15715a4694de30054248ec627edbf31f49cb64ee4086"
[[package]]
name = "arrayvec"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "as-slice"
version = "0.2.1"
@@ -143,6 +149,15 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "autotools"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef941527c41b0fc0dd48511a8154cd5fc7e29200a0ff8b7203c5d777dbc795cf"
dependencies = [
"cc",
]
[[package]]
name = "backtrace"
version = "0.3.76"
@@ -224,6 +239,15 @@ dependencies = [
"generic-array",
]
[[package]]
name = "block-buffer"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2f6c7dbe95a6ed67ad9f18e57daf93a2f034c524b99fd2b76d18fdfeb6660aa"
dependencies = [
"hybrid-array",
]
[[package]]
name = "block-pseudorand"
version = "0.1.2"
@@ -415,6 +439,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9"
[[package]]
name = "cmake"
version = "0.1.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678"
dependencies = [
"cc",
]
[[package]]
name = "colorchoice"
version = "1.0.5"
@@ -464,6 +497,21 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "const-oid"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c"
[[package]]
name = "convert_case"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
@@ -488,6 +536,15 @@ dependencies = [
"libc",
]
[[package]]
name = "cpufeatures"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201"
dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.5.0"
@@ -601,6 +658,15 @@ dependencies = [
"typenum",
]
[[package]]
name = "crypto-common"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce6e4c961d6cd6c9a86db418387425e8bdeaf05b3c8bc1411e6dca4c252f1453"
dependencies = [
"hybrid-array",
]
[[package]]
name = "csv"
version = "1.4.0"
@@ -640,14 +706,48 @@ dependencies = [
"uuid",
]
[[package]]
name = "derive_more"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d751e9e49156b02b44f9c1815bcb94b984cdcc4396ecc32521c739452808b134"
dependencies = [
"derive_more-impl",
]
[[package]]
name = "derive_more-impl"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "799a97264921d8623a957f6c3b9011f3b5492f557bbb7a5a19b7fa6d06ba8dcb"
dependencies = [
"convert_case",
"proc-macro2",
"quote",
"rustc_version",
"syn",
"unicode-xid",
]
[[package]]
name = "digest"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"crypto-common",
"block-buffer 0.10.4",
"crypto-common 0.1.7",
]
[[package]]
name = "digest"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2"
dependencies = [
"block-buffer 0.12.1",
"const-oid",
"crypto-common 0.2.2",
]
[[package]]
@@ -742,6 +842,16 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
[[package]]
name = "filetime"
version = "0.2.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c287a33c7f0a620c38e641e7f60827713987b3c0f26e8ddc9462cc69cf75759"
dependencies = [
"cfg-if",
"libc",
]
[[package]]
name = "find-msvc-tools"
version = "0.1.9"
@@ -916,6 +1026,65 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
[[package]]
name = "http"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6970f50e31d6fc17d3fa27329444bfa74e196cf62e95052a3f6fee181dba6425"
dependencies = [
"bytes",
"itoa",
]
[[package]]
name = "httparse"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "hwlocality"
version = "1.0.0-alpha.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c2e65a48d3b300843ac84a2fe8e166bb5a5b00f30054593bcee8157e4b465fd"
dependencies = [
"arrayvec",
"bitflags 2.11.1",
"derive_more",
"errno",
"hwlocality-sys",
"libc",
"strum",
"thiserror 2.0.18",
"windows-sys 0.61.2",
]
[[package]]
name = "hwlocality-sys"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a83c43a772c1f774b806deb44891c2a9578eb33cec48aad513482e0da3d4d4"
dependencies = [
"autotools",
"cmake",
"flate2",
"libc",
"pkg-config",
"sha3",
"tar",
"ureq 3.3.0",
"windows-sys 0.61.2",
]
[[package]]
name = "hybrid-array"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9155a582abd142abc056962c29e3ce5ff2ad5469f4246b537ed42c5deba857da"
dependencies = [
"typenum",
]
[[package]]
name = "icu_collections"
version = "2.2.0"
@@ -1145,6 +1314,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "keccak"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e24a010dd405bd7ed803e5253182815b41bf2e6a80cc3bfc066658e03a198aa"
dependencies = [
"cfg-if",
"cpufeatures 0.3.0",
]
[[package]]
name = "kodama"
version = "0.2.3"
@@ -1508,6 +1687,7 @@ name = "obikindex"
version = "0.1.0"
dependencies = [
"crossbeam-channel",
"hwlocality",
"indicatif",
"ndarray",
"obicompactvec",
@@ -1636,7 +1816,7 @@ dependencies = [
"regex",
"tracing",
"tracing-subscriber",
"ureq",
"ureq 2.12.1",
]
[[package]]
@@ -2177,6 +2357,15 @@ version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe"
[[package]]
name = "rustc_version"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
dependencies = [
"semver",
]
[[package]]
name = "rustix"
version = "1.1.4"
@@ -2263,6 +2452,12 @@ dependencies = [
"syn",
]
[[package]]
name = "semver"
version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd"
[[package]]
name = "serde"
version = "1.0.228"
@@ -2313,8 +2508,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
"cpufeatures 0.2.17",
"digest 0.10.7",
]
[[package]]
name = "sha3"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be176f1a57ce4e3d31c1a166222d9768de5954f811601fb7ca06fc8203905ce1"
dependencies = [
"digest 0.11.3",
"keccak",
]
[[package]]
@@ -2375,6 +2580,27 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "strum"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9628de9b8791db39ceda2b119bbe13134770b56c138ec1d3af810d045c04f9bd"
dependencies = [
"strum_macros",
]
[[package]]
name = "strum_macros"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab85eea0270ee17587ed4156089e10b9e6880ee688791d45a905f5b1ca36f664"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "subtle"
version = "2.6.1"
@@ -2470,6 +2696,17 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tar"
version = "0.4.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6221d9a6003c78398e3b239969f352578258df48c8eb051caadae0015bc840"
dependencies = [
"filetime",
"libc",
"xattr",
]
[[package]]
name = "tempfile"
version = "3.27.0"
@@ -2645,12 +2882,24 @@ version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"
[[package]]
name = "unicode-segmentation"
version = "1.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6f5d3c3b1bf09027a88a6bc961fc00497d651009560b5463668dc81b0fa87a8"
[[package]]
name = "unicode-width"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254"
[[package]]
name = "unicode-xid"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
[[package]]
name = "untrusted"
version = "0.9.0"
@@ -2673,6 +2922,35 @@ dependencies = [
"webpki-roots 0.26.11",
]
[[package]]
name = "ureq"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dea7109cdcd5864d4eeb1b58a1648dc9bf520360d7af16ec26d0a9354bafcfc0"
dependencies = [
"base64",
"flate2",
"log",
"percent-encoding",
"rustls",
"rustls-pki-types",
"ureq-proto",
"utf8-zero",
"webpki-roots 1.0.7",
]
[[package]]
name = "ureq-proto"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e994ba84b0bd1b1b0cf92878b7ef898a5c1760108fe7b6010327e274917a808c"
dependencies = [
"base64",
"http",
"httparse",
"log",
]
[[package]]
name = "url"
version = "2.5.8"
@@ -2685,6 +2963,12 @@ dependencies = [
"serde",
]
[[package]]
name = "utf8-zero"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8c0a043c9540bae7c578c88f91dda8bd82e59ae27c21baca69c8b191aaf5a6e"
[[package]]
name = "utf8_iter"
version = "1.0.4"
@@ -3110,6 +3394,16 @@ dependencies = [
"tap",
]
[[package]]
name = "xattr"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156"
dependencies = [
"libc",
"rustix",
]
[[package]]
name = "xxhash-rust"
version = "0.8.15"
+1
View File
@@ -17,3 +17,4 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
indicatif = "0.17"
tracing = "0.1.44"
hwlocality = { version = "1.0.0-alpha.11", features = ["vendored"] }
+1
View File
@@ -5,6 +5,7 @@ mod distance;
mod dump;
mod index;
mod merge;
mod numa;
mod rebuild;
mod reindex;
mod select;
+72 -24
View File
@@ -242,17 +242,27 @@ impl KmerIndex {
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
// ── Adaptive worker pool ──────────────────────────────────────────
// Start with 1 worker thread. After each completed partition,
// measure CPU efficiency (via getrusage delta). If efficiency is
// below the spawn threshold and more partitions remain, spawn one
// additional worker. Workers share a crossbeam channel of partition
// IDs; each reports (id, g_len, duration) on a result channel.
// Default (non-NUMA): start with 1 worker, grow adaptively up to
// n_cores/2 based on CPU efficiency.
//
// NUMA mode (Linux, multi-node): one pinned Rayon ThreadPool per
// NUMA node, workers_per_node workers per node, all pre-activated.
// No adaptive spawn: the optimal count is fixed by memory bandwidth.
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let max_workers = (n_cores / 2).max(1);
let _ = budget_fraction; // kept in signature for CLI compatibility
let numa = crate::numa::build();
// effective_max_workers: slots to pre-spawn.
// numa_all_active: whether to activate all slots immediately.
let (effective_max_workers, numa_all_active) = match &numa {
Some(ns) => (ns.pools.len() * ns.workers_per_node(), true),
None => (max_workers, false),
};
let (part_tx, part_rx) = unbounded::<usize>();
let (result_tx, result_rx) =
unbounded::<(usize, Result<usize, obiskio::SKError>, Duration)>();
@@ -276,25 +286,54 @@ impl KmerIndex {
let srcs = &srcs;
let evidence = &evidence;
if let Some(ns) = &numa {
debug!(
"NUMA mode: {} node(s) × {} worker(s)/node = {} total workers",
ns.pools.len(),
ns.workers_per_node(),
effective_max_workers,
);
}
std::thread::scope(|s| -> OKIResult<()> {
// Pre-spawn max_workers threads; each waits for an activation
// signal before consuming from part_rx.
for _ in 0..max_workers {
// Pre-spawn threads. In NUMA mode each thread is pinned to its
// node's CPUs and wraps merge_partition in pool.install() so
// that all Rayon calls use the node-local ThreadPool, and
// Linux first-touch places graph allocations in local DRAM.
for worker_idx in 0..effective_max_workers {
let prx = part_rx.clone();
let rtx = result_tx.clone();
let arx = activate_rx.clone();
// Per-worker NUMA config: (pool, cpus) for this slot.
let numa_config: Option<(std::sync::Arc<rayon::ThreadPool>, Vec<usize>)> =
numa.as_ref().map(|ns| {
let wpn = ns.workers_per_node();
let node = worker_idx / wpn;
(
std::sync::Arc::clone(&ns.pools[node]),
ns.cpus_per_node[node].clone(),
)
});
s.spawn(move || {
if let Some((_, ref cpus)) = numa_config {
crate::numa::pin_current_thread(cpus);
}
if arx.recv().is_ok() {
for i in &prx {
let t = Instant::now();
let r = dst_partition.merge_partition(
i,
srcs,
mode,
n_dst_genomes,
block_bits,
evidence,
);
let r = if let Some((ref pool, _)) = numa_config {
pool.install(|| {
dst_partition.merge_partition(
i, srcs, mode, n_dst_genomes, block_bits, evidence,
)
})
} else {
dst_partition.merge_partition(
i, srcs, mode, n_dst_genomes, block_bits, evidence,
)
};
rtx.send((i, r, t.elapsed())).ok();
}
}
@@ -302,9 +341,17 @@ impl KmerIndex {
}
drop(result_tx);
// Activate first worker immediately.
activate_tx.send(()).ok();
n_workers = 1;
if numa_all_active {
// NUMA: activate every worker immediately.
for _ in 0..effective_max_workers {
activate_tx.send(()).ok();
}
n_workers = effective_max_workers;
} else {
// Non-NUMA: activate first worker, grow adaptively.
activate_tx.send(()).ok();
n_workers = 1;
}
const SPAWN_POLL: Duration = Duration::from_secs(20);
@@ -312,11 +359,10 @@ impl KmerIndex {
while completed < n_partitions {
let result = result_rx.recv_timeout(SPAWN_POLL);
// On timeout: no partition finished yet, just check efficiency.
let (i, r, dur) = match result {
Ok(v) => v,
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
if n_workers < max_workers {
if !numa_all_active && n_workers < effective_max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
debug!(
@@ -353,7 +399,7 @@ impl KmerIndex {
});
completed += 1;
if n_workers < max_workers && completed < n_partitions {
if !numa_all_active && n_workers < effective_max_workers && completed < n_partitions {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) {
debug!(
@@ -369,7 +415,9 @@ impl KmerIndex {
}
}
}
// Close activate_tx: dormant workers exit cleanly.
// Dropping activate_tx signals dormant workers to exit cleanly
// (non-NUMA). In NUMA mode all workers were already activated so
// this drop is just cleanup.
drop(activate_tx);
Ok(())
})?;
@@ -377,7 +425,7 @@ impl KmerIndex {
pb.finish_and_clear();
// ── Diagnostic report ─────────────────────────────────────────────
print_merge_partition_report(&part_stats, n_workers, max_workers);
print_merge_partition_report(&part_stats, n_workers, effective_max_workers);
rep.push(t.stop());
}
+102
View File
@@ -0,0 +1,102 @@
// NUMA-aware Rayon thread pools via hwlocality.
//
// Detects NUMA topology using hwloc (cross-platform: Linux, macOS, etc.) and
// builds one Rayon ThreadPool per NUMA node with threads pinned to that node's
// CPUs. Linux first-touch policy then places graph allocations in local DRAM
// automatically — no explicit memory binding needed.
//
// Returns None when:
// - hwloc topology initialisation fails
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
// - any per-node pool fails to build
use std::sync::Arc;
use hwlocality::Topology;
use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet;
use hwlocality::object::types::ObjectType;
use tracing::debug;
// ── Public interface ──────────────────────────────────────────────────────────
pub struct NumaSetup {
pub pools: Vec<Arc<rayon::ThreadPool>>,
/// CPU indices for each NUMA node, in node order.
pub cpus_per_node: Vec<Vec<usize>>,
}
impl NumaSetup {
/// Workers to activate per NUMA node.
/// Empirically ~3 workers saturate one node's memory bandwidth.
pub fn workers_per_node(&self) -> usize {
self.cpus_per_node
.first()
.map(|c| (c.len() / 8).max(3).min(8))
.unwrap_or(3)
}
}
/// Detect NUMA topology and build per-node Rayon pools.
/// Returns None on UMA systems, single-node machines, or on failure.
pub fn build() -> Option<NumaSetup> {
let topology = Topology::new().ok()?;
let nodes: Vec<Vec<usize>> = topology
.objects_with_type(ObjectType::NUMANode)
.filter_map(|obj| obj.cpuset())
.map(|cpuset| {
cpuset
.iter_set()
.map(|idx| usize::from(idx))
.collect::<Vec<_>>()
})
.filter(|v| !v.is_empty())
.collect();
if nodes.len() <= 1 {
return None;
}
debug!(
"NUMA topology: {} node(s), {} core(s)/node",
nodes.len(),
nodes.first().map_or(0, |v| v.len()),
);
let pools = nodes
.iter()
.map(|cpus| build_pool(cpus).map(Arc::new))
.collect::<Option<Vec<_>>>()?;
Some(NumaSetup { pools, cpus_per_node: nodes })
}
/// Bind the calling thread to `cpu_indices` using hwloc.
/// Silently returns on any error so the thread still runs, just unbound.
pub fn pin_current_thread(cpu_indices: &[usize]) {
let Ok(topology) = Topology::new() else { return };
let mut cpuset = CpuSet::new();
for &idx in cpu_indices {
cpuset.set(idx);
}
let _ = topology.bind_cpu(&cpuset, CpuBindingFlags::THREAD);
}
// ── Internal helpers ──────────────────────────────────────────────────────────
fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
let cpus = cpus.to_vec();
rayon::ThreadPoolBuilder::new()
.num_threads(cpus.len())
.spawn_handler(move |thread| {
let cpus = cpus.clone();
std::thread::Builder::new().spawn(move || {
pin_current_thread(&cpus);
thread.run();
})?;
Ok(())
})
.build()
.ok()
}
+12 -5
View File
@@ -217,6 +217,9 @@ impl KmerPartition {
}
}
let n_src_layers = unitig_paths.len();
debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)");
enum Pass1Data {
File(PathBuf),
Batch(Vec<CanonicalKmer>),
@@ -224,7 +227,9 @@ impl KmerPartition {
}
const BATCH: usize = 4096;
let n_workers = std::thread::available_parallelism().map_or(4, |n| n.get());
// Inside pool.install() this returns the per-NUMA pool size; outside
// it returns the global pool size. Both are the right value here.
let n_workers = rayon::current_num_threads().max(1);
let capacity = n_workers * 8;
let dst_filter = Arc::clone(&dst_map);
@@ -311,18 +316,20 @@ impl KmerPartition {
fs::create_dir_all(&new_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?;
debug!("partition {i}: unitig traversal start — {} nodes", g.len());
let mut n_unitigs = 0usize;
g.try_for_each_unitig(|unitig| {
n_unitigs += 1;
uw.write(unitig)
})?;
debug!("partition {i}: unitig writer closing");
uw.close()?;
debug!("partition {i}: unitig writer closed — dropping graph ({} nodes)", g.len());
let n = g.len();
let n_nodes = g.len();
debug!("partition {i}: unitig writer closed — dropping graph ({n_nodes} nodes)");
drop(g);
debug!("partition {i}: graph dropped — starting MPHF build ({n} unitigs)");
debug!("partition {i}: graph dropped — starting MPHF build ({n_unitigs} unitigs)");
Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
debug!("partition {i}: MPHF build done");
n
n_nodes
} else {
drop(g);
0