diff --git a/docmd/architecture/numa_worker_pools.md b/docmd/architecture/numa_worker_pools.md new file mode 100644 index 0000000..434d513 --- /dev/null +++ b/docmd/architecture/numa_worker_pools.md @@ -0,0 +1,97 @@ +# NUMA-aware worker pools for merge + +## Problem + +The merge command's bottleneck is `compute_degrees` in `obidebruinj`: a random pointer-chase over 20–70 M node hash maps that saturates DRAM bandwidth. When multiple partition workers run concurrently, they contend for the shared memory bus, causing super-linear slowdown (measured: 0.016 µs/node solo → 0.95 µs/node with 4–5 concurrent workers, ×60 degradation). + +Modern HPC nodes are multi-socket NUMA machines (observed: 2 sockets × 4 NUMA nodes × 24 cores = 192 cores). Cross-NUMA memory traffic compounds the contention: + +- Full 192-core run: ~15 min/partition (×10 worse than M3 Mac) +- `taskset` restricted to 4 NUMA nodes (96 cores): ~90 s/partition +- OAR job on 1 NUMA node (24 cores): ~80 s/partition, same throughput as 96 cores + +**Conclusion**: the bottleneck is memory bandwidth per NUMA node, not core count. 24 cores on one NUMA node achieve the same throughput as 96 cores across four. + +## Strategy + +Run N worker groups in parallel, one per NUMA node, each with its own Rayon thread pool whose threads are pinned to the NUMA node's CPUs. Linux's first-touch policy then places graph allocations on local DRAM automatically — no explicit NUMA allocator needed. + +Expected throughput: N × single-NUMA throughput. On the 8-NUMA-node HPC: 8 × ~80 s = 9–10 min total instead of >60 min with the current single-pool approach. + +## Rayon thread pool isolation + +Rayon provides `ThreadPool::install(|| { ... })`: any Rayon call (`par_iter`, `current_num_threads`, etc.) inside the closure uses *that* pool exclusively. Wrapping `merge_partition` in `pool.install()` redirects all downstream Rayon calls — including those in `debruijn.rs` and `partition.rs` — without touching those crates. + +```rust +// worker thread, assigned to NUMA pool `pool` +pool.install(|| { + dst_partition.merge_partition(i, srcs, mode, n_dst_genomes, block_bits, evidence) +}) +``` + +`rayon::current_num_threads()` inside `merge_partition` will return the pool size (e.g. 24), not the global thread count — which is the right value for buffer sizing. + +## Thread pinning + +`ThreadPoolBuilder::spawn_handler` provides a hook executed for each thread at creation. Inside, `libc::sched_setaffinity` pins the thread to a CPU set: + +```rust +let cpus: Vec = numa_node_cpus(node); // from /sys/devices/system/node/nodeN/cpulist +rayon::ThreadPoolBuilder::new() + .num_threads(cpus.len()) + .spawn_handler(move |thread| { + let mut b = std::thread::Builder::new(); + std::thread::Builder::new().spawn(move || { + pin_to_cpus(&cpus); // sched_setaffinity via libc + thread.run() + })?; + Ok(()) + }) + .build()? +``` + +NUMA topology is read from `/sys/devices/system/node/node*/cpulist` — no `libnuma` dependency required. If the `numa` crate is linked, `numa_available()` / `numa_run_on_node()` are an alternative. + +## Memory locality + +Linux allocates pages on the NUMA node of the thread that first writes them (first-touch policy). Once Rayon threads are pinned to node N, all graph data built by those threads lands on node N's DRAM. No changes to the allocator, no explicit `numa_alloc_onnode` calls. + +## Adaptive spawn criterion + +The current criterion uses `std::thread::available_parallelism()` (returns total cores = 192) and `max_workers = n_cores / 2`. With NUMA pools: + +- `n_cores` per pool = cores per NUMA node (e.g. 24) +- `max_workers` per pool = pool size / 2 (e.g. 12) +- CPU efficiency is measured per pool, not globally + +Each NUMA group runs its own independent adaptive pool. Workers are distributed across NUMA groups round-robin or by workload (partition assignment can be pre-split by NUMA group index). + +## Required changes + +| File | Change | +|------|--------| +| `obikindex/src/merge.rs` | Detect NUMA topology; build N `ThreadPool`s with pinned threads; assign each pre-spawned worker to a pool; wrap `merge_partition` in `pool.install()` | +| `obikindex/src/merge.rs` | Replace `available_parallelism()` with per-NUMA core count for spawn criterion | +| `obikpartitionner/src/merge_layer.rs` | No change — `merge_partition` already works inside any Rayon context | +| `obidebruinj/src/debruijn.rs` | No change — `par_iter` and `current_num_threads` are pool-context-aware | +| `obikpartitionner/src/partition.rs` | No change — same reason | + +## Platform guard + +NUMA pinning is Linux-only. The fallback is the current single global pool: + +```rust +#[cfg(target_os = "linux")] +fn build_numa_pools() -> Option> { ... } + +#[cfg(not(target_os = "linux"))] +fn build_numa_pools() -> Option> { None } +``` + +When `build_numa_pools()` returns `None` (macOS, UMA, or single-socket), `merge.rs` uses the existing code path unchanged. + +## Open questions + +- **Partition assignment**: split partitions by NUMA group up-front (static) or use a shared queue with per-group workers stealing from a common pool? Static split is simpler; stealing is better for load balance when partitions vary widely in size. +- **Intra-NUMA adaptive criterion**: with 24 cores and ~3–5 effective workers per NUMA node, the current marginal-gain criterion needs re-tuning or can be left as-is with per-pool `n_cores = 24`. +- **I/O**: partition data (unitig files) is on a shared filesystem. With 8 concurrent NUMA groups, I/O concurrency increases 8× — need to verify the filesystem (Lustre or local SSD) can absorb it without becoming the new bottleneck. diff --git a/docmd/installation.md b/docmd/installation.md new file mode 100644 index 0000000..36307bb --- /dev/null +++ b/docmd/installation.md @@ -0,0 +1,68 @@ +# Installation + +## Prerequisites + +### Rust toolchain + +`obikmer` requires **Rust 1.85 or later** (edition 2024). Install or update via [rustup](https://rustup.rs): + +```bash +curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh +rustup update stable +``` + +### C build environment (required for hwloc) + +`obikmer` embeds [hwloc](https://www.open-mpi.org/projects/hwloc/) (Hardware Locality) for NUMA-aware thread placement on multi-socket machines. hwloc is built from source at compile time via the `vendored` feature of the `hwlocality` crate. This requires a standard C build environment. + +#### Linux (Debian/Ubuntu) + +```bash +apt install build-essential automake libtool autoconf pkg-config +``` + +#### Linux (RHEL/Rocky/AlmaLinux) + +```bash +dnf install gcc make automake libtool autoconf pkgconfig +``` + +#### HPC clusters + +Most HPC clusters provide these tools via the module system: + +```bash +module load gcc automake libtool autoconf +``` + +If in doubt, check whether `autoreconf --version` and `libtool --version` return successfully. + +#### macOS + +```bash +brew install automake libtool autoconf pkg-config +``` + +## Building + +```bash +git clone +cd obikmer/src +cargo build --release +``` + +The compiled binary is at `target/release/obikmer`. + +## NUMA support + +NUMA-aware thread placement is active automatically on multi-socket Linux machines (detected at runtime via hwloc). No special build flag is required — the detection is built in and falls back gracefully to the single-pool adaptive strategy on: + +- macOS (Apple Silicon, unified memory) +- single-socket Linux machines +- any system where hwloc reports only one NUMA node + +## Verifying the installation + +```bash +obikmer --help +``` diff --git a/mkdocs.yml b/mkdocs.yml index 6e60689..8a4498d 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -29,6 +29,7 @@ extra_javascript: nav: - Home: index.md + - Installation: installation.md - Theory: - Kmers and super-kmers: kmers.md - DNA encoding: theory/encoding.md @@ -55,6 +56,7 @@ nav: - Architecture: - Sequences: architecture/sequences/invariant.md - Kmer index: architecture/index_architecture.md + - NUMA-aware worker pools: architecture/numa_worker_pools.md watch: - docmd diff --git a/src/Cargo.lock b/src/Cargo.lock index cfe335d..2983231 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -128,6 +128,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ad8689a486416c401ea15715a4694de30054248ec627edbf31f49cb64ee4086" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "as-slice" version = "0.2.1" @@ -143,6 +149,15 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "autotools" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef941527c41b0fc0dd48511a8154cd5fc7e29200a0ff8b7203c5d777dbc795cf" +dependencies = [ + "cc", +] + [[package]] name = "backtrace" version = "0.3.76" @@ -224,6 +239,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-buffer" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2f6c7dbe95a6ed67ad9f18e57daf93a2f034c524b99fd2b76d18fdfeb6660aa" +dependencies = [ + "hybrid-array", +] + [[package]] name = "block-pseudorand" version = "0.1.2" @@ -415,6 +439,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" +[[package]] +name = "cmake" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.5" @@ -464,6 +497,21 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "const-oid" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" + +[[package]] +name = "convert_case" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -488,6 +536,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -601,6 +658,15 @@ dependencies = [ "typenum", ] +[[package]] +name = "crypto-common" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6e4c961d6cd6c9a86db418387425e8bdeaf05b3c8bc1411e6dca4c252f1453" +dependencies = [ + "hybrid-array", +] + [[package]] name = "csv" version = "1.4.0" @@ -640,14 +706,48 @@ dependencies = [ "uuid", ] +[[package]] +name = "derive_more" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d751e9e49156b02b44f9c1815bcb94b984cdcc4396ecc32521c739452808b134" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "799a97264921d8623a957f6c3b9011f3b5492f557bbb7a5a19b7fa6d06ba8dcb" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn", + "unicode-xid", +] + [[package]] name = "digest" version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer", - "crypto-common", + "block-buffer 0.10.4", + "crypto-common 0.1.7", +] + +[[package]] +name = "digest" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2" +dependencies = [ + "block-buffer 0.12.1", + "const-oid", + "crypto-common 0.2.2", ] [[package]] @@ -742,6 +842,16 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +[[package]] +name = "filetime" +version = "0.2.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c287a33c7f0a620c38e641e7f60827713987b3c0f26e8ddc9462cc69cf75759" +dependencies = [ + "cfg-if", + "libc", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -916,6 +1026,65 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" +[[package]] +name = "http" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6970f50e31d6fc17d3fa27329444bfa74e196cf62e95052a3f6fee181dba6425" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + +[[package]] +name = "hwlocality" +version = "1.0.0-alpha.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c2e65a48d3b300843ac84a2fe8e166bb5a5b00f30054593bcee8157e4b465fd" +dependencies = [ + "arrayvec", + "bitflags 2.11.1", + "derive_more", + "errno", + "hwlocality-sys", + "libc", + "strum", + "thiserror 2.0.18", + "windows-sys 0.61.2", +] + +[[package]] +name = "hwlocality-sys" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a83c43a772c1f774b806deb44891c2a9578eb33cec48aad513482e0da3d4d4" +dependencies = [ + "autotools", + "cmake", + "flate2", + "libc", + "pkg-config", + "sha3", + "tar", + "ureq 3.3.0", + "windows-sys 0.61.2", +] + +[[package]] +name = "hybrid-array" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9155a582abd142abc056962c29e3ce5ff2ad5469f4246b537ed42c5deba857da" +dependencies = [ + "typenum", +] + [[package]] name = "icu_collections" version = "2.2.0" @@ -1145,6 +1314,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "keccak" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e24a010dd405bd7ed803e5253182815b41bf2e6a80cc3bfc066658e03a198aa" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", +] + [[package]] name = "kodama" version = "0.2.3" @@ -1508,6 +1687,7 @@ name = "obikindex" version = "0.1.0" dependencies = [ "crossbeam-channel", + "hwlocality", "indicatif", "ndarray", "obicompactvec", @@ -1636,7 +1816,7 @@ dependencies = [ "regex", "tracing", "tracing-subscriber", - "ureq", + "ureq 2.12.1", ] [[package]] @@ -2177,6 +2357,15 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "1.1.4" @@ -2263,6 +2452,12 @@ dependencies = [ "syn", ] +[[package]] +name = "semver" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" + [[package]] name = "serde" version = "1.0.228" @@ -2313,8 +2508,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", - "digest", + "cpufeatures 0.2.17", + "digest 0.10.7", +] + +[[package]] +name = "sha3" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be176f1a57ce4e3d31c1a166222d9768de5954f811601fb7ca06fc8203905ce1" +dependencies = [ + "digest 0.11.3", + "keccak", ] [[package]] @@ -2375,6 +2580,27 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9628de9b8791db39ceda2b119bbe13134770b56c138ec1d3af810d045c04f9bd" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab85eea0270ee17587ed4156089e10b9e6880ee688791d45a905f5b1ca36f664" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "subtle" version = "2.6.1" @@ -2470,6 +2696,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tar" +version = "0.4.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6221d9a6003c78398e3b239969f352578258df48c8eb051caadae0015bc840" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "tempfile" version = "3.27.0" @@ -2645,12 +2882,24 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-segmentation" +version = "1.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6f5d3c3b1bf09027a88a6bc961fc00497d651009560b5463668dc81b0fa87a8" + [[package]] name = "unicode-width" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "untrusted" version = "0.9.0" @@ -2673,6 +2922,35 @@ dependencies = [ "webpki-roots 0.26.11", ] +[[package]] +name = "ureq" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dea7109cdcd5864d4eeb1b58a1648dc9bf520360d7af16ec26d0a9354bafcfc0" +dependencies = [ + "base64", + "flate2", + "log", + "percent-encoding", + "rustls", + "rustls-pki-types", + "ureq-proto", + "utf8-zero", + "webpki-roots 1.0.7", +] + +[[package]] +name = "ureq-proto" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e994ba84b0bd1b1b0cf92878b7ef898a5c1760108fe7b6010327e274917a808c" +dependencies = [ + "base64", + "http", + "httparse", + "log", +] + [[package]] name = "url" version = "2.5.8" @@ -2685,6 +2963,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf8-zero" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8c0a043c9540bae7c578c88f91dda8bd82e59ae27c21baca69c8b191aaf5a6e" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -3110,6 +3394,16 @@ dependencies = [ "tap", ] +[[package]] +name = "xattr" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix", +] + [[package]] name = "xxhash-rust" version = "0.8.15" diff --git a/src/obikindex/Cargo.toml b/src/obikindex/Cargo.toml index 3f89bb0..3541d90 100644 --- a/src/obikindex/Cargo.toml +++ b/src/obikindex/Cargo.toml @@ -17,3 +17,4 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" indicatif = "0.17" tracing = "0.1.44" +hwlocality = { version = "1.0.0-alpha.11", features = ["vendored"] } diff --git a/src/obikindex/src/lib.rs b/src/obikindex/src/lib.rs index 57e3e18..371a05c 100644 --- a/src/obikindex/src/lib.rs +++ b/src/obikindex/src/lib.rs @@ -5,6 +5,7 @@ mod distance; mod dump; mod index; mod merge; +mod numa; mod rebuild; mod reindex; mod select; diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index 3242916..be17ce6 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -242,17 +242,27 @@ impl KmerIndex { order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i])); // ── Adaptive worker pool ────────────────────────────────────────── - // Start with 1 worker thread. After each completed partition, - // measure CPU efficiency (via getrusage delta). If efficiency is - // below the spawn threshold and more partitions remain, spawn one - // additional worker. Workers share a crossbeam channel of partition - // IDs; each reports (id, g_len, duration) on a result channel. + // Default (non-NUMA): start with 1 worker, grow adaptively up to + // n_cores/2 based on CPU efficiency. + // + // NUMA mode (Linux, multi-node): one pinned Rayon ThreadPool per + // NUMA node, workers_per_node workers per node, all pre-activated. + // No adaptive spawn: the optimal count is fixed by memory bandwidth. let n_cores = std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1); let max_workers = (n_cores / 2).max(1); let _ = budget_fraction; // kept in signature for CLI compatibility + let numa = crate::numa::build(); + + // effective_max_workers: slots to pre-spawn. + // numa_all_active: whether to activate all slots immediately. + let (effective_max_workers, numa_all_active) = match &numa { + Some(ns) => (ns.pools.len() * ns.workers_per_node(), true), + None => (max_workers, false), + }; + let (part_tx, part_rx) = unbounded::(); let (result_tx, result_rx) = unbounded::<(usize, Result, Duration)>(); @@ -276,25 +286,54 @@ impl KmerIndex { let srcs = &srcs; let evidence = &evidence; + if let Some(ns) = &numa { + debug!( + "NUMA mode: {} node(s) × {} worker(s)/node = {} total workers", + ns.pools.len(), + ns.workers_per_node(), + effective_max_workers, + ); + } + std::thread::scope(|s| -> OKIResult<()> { - // Pre-spawn max_workers threads; each waits for an activation - // signal before consuming from part_rx. - for _ in 0..max_workers { + // Pre-spawn threads. In NUMA mode each thread is pinned to its + // node's CPUs and wraps merge_partition in pool.install() so + // that all Rayon calls use the node-local ThreadPool, and + // Linux first-touch places graph allocations in local DRAM. + for worker_idx in 0..effective_max_workers { let prx = part_rx.clone(); let rtx = result_tx.clone(); let arx = activate_rx.clone(); + + // Per-worker NUMA config: (pool, cpus) for this slot. + let numa_config: Option<(std::sync::Arc, Vec)> = + numa.as_ref().map(|ns| { + let wpn = ns.workers_per_node(); + let node = worker_idx / wpn; + ( + std::sync::Arc::clone(&ns.pools[node]), + ns.cpus_per_node[node].clone(), + ) + }); + s.spawn(move || { + if let Some((_, ref cpus)) = numa_config { + crate::numa::pin_current_thread(cpus); + } if arx.recv().is_ok() { for i in &prx { let t = Instant::now(); - let r = dst_partition.merge_partition( - i, - srcs, - mode, - n_dst_genomes, - block_bits, - evidence, - ); + let r = if let Some((ref pool, _)) = numa_config { + pool.install(|| { + dst_partition.merge_partition( + i, srcs, mode, n_dst_genomes, block_bits, evidence, + ) + }) + } else { + dst_partition.merge_partition( + i, srcs, mode, n_dst_genomes, block_bits, evidence, + ) + }; rtx.send((i, r, t.elapsed())).ok(); } } @@ -302,9 +341,17 @@ impl KmerIndex { } drop(result_tx); - // Activate first worker immediately. - activate_tx.send(()).ok(); - n_workers = 1; + if numa_all_active { + // NUMA: activate every worker immediately. + for _ in 0..effective_max_workers { + activate_tx.send(()).ok(); + } + n_workers = effective_max_workers; + } else { + // Non-NUMA: activate first worker, grow adaptively. + activate_tx.send(()).ok(); + n_workers = 1; + } const SPAWN_POLL: Duration = Duration::from_secs(20); @@ -312,11 +359,10 @@ impl KmerIndex { while completed < n_partitions { let result = result_rx.recv_timeout(SPAWN_POLL); - // On timeout: no partition finished yet, just check efficiency. let (i, r, dur) = match result { Ok(v) => v, Err(crossbeam_channel::RecvTimeoutError::Timeout) => { - if n_workers < max_workers { + if !numa_all_active && n_workers < effective_max_workers { let eff = cpu_sample.cpu_efficiency(n_cores); if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) { debug!( @@ -353,7 +399,7 @@ impl KmerIndex { }); completed += 1; - if n_workers < max_workers && completed < n_partitions { + if !numa_all_active && n_workers < effective_max_workers && completed < n_partitions { let eff = cpu_sample.cpu_efficiency(n_cores); if should_spawn_worker(n_workers, eff, efficiency_at_last_spawn) { debug!( @@ -369,7 +415,9 @@ impl KmerIndex { } } } - // Close activate_tx: dormant workers exit cleanly. + // Dropping activate_tx signals dormant workers to exit cleanly + // (non-NUMA). In NUMA mode all workers were already activated so + // this drop is just cleanup. drop(activate_tx); Ok(()) })?; @@ -377,7 +425,7 @@ impl KmerIndex { pb.finish_and_clear(); // ── Diagnostic report ───────────────────────────────────────────── - print_merge_partition_report(&part_stats, n_workers, max_workers); + print_merge_partition_report(&part_stats, n_workers, effective_max_workers); rep.push(t.stop()); } diff --git a/src/obikindex/src/numa.rs b/src/obikindex/src/numa.rs new file mode 100644 index 0000000..da79b3f --- /dev/null +++ b/src/obikindex/src/numa.rs @@ -0,0 +1,102 @@ +// NUMA-aware Rayon thread pools via hwlocality. +// +// Detects NUMA topology using hwloc (cross-platform: Linux, macOS, etc.) and +// builds one Rayon ThreadPool per NUMA node with threads pinned to that node's +// CPUs. Linux first-touch policy then places graph allocations in local DRAM +// automatically — no explicit memory binding needed. +// +// Returns None when: +// - hwloc topology initialisation fails +// - the system has only one NUMA node (UMA, Apple Silicon, single-socket) +// - any per-node pool fails to build + +use std::sync::Arc; + +use hwlocality::Topology; +use hwlocality::cpu::binding::CpuBindingFlags; +use hwlocality::cpu::cpuset::CpuSet; +use hwlocality::object::types::ObjectType; +use tracing::debug; + +// ── Public interface ────────────────────────────────────────────────────────── + +pub struct NumaSetup { + pub pools: Vec>, + /// CPU indices for each NUMA node, in node order. + pub cpus_per_node: Vec>, +} + +impl NumaSetup { + /// Workers to activate per NUMA node. + /// Empirically ~3 workers saturate one node's memory bandwidth. + pub fn workers_per_node(&self) -> usize { + self.cpus_per_node + .first() + .map(|c| (c.len() / 8).max(3).min(8)) + .unwrap_or(3) + } +} + +/// Detect NUMA topology and build per-node Rayon pools. +/// Returns None on UMA systems, single-node machines, or on failure. +pub fn build() -> Option { + let topology = Topology::new().ok()?; + + let nodes: Vec> = topology + .objects_with_type(ObjectType::NUMANode) + .filter_map(|obj| obj.cpuset()) + .map(|cpuset| { + cpuset + .iter_set() + .map(|idx| usize::from(idx)) + .collect::>() + }) + .filter(|v| !v.is_empty()) + .collect(); + + if nodes.len() <= 1 { + return None; + } + + debug!( + "NUMA topology: {} node(s), {} core(s)/node", + nodes.len(), + nodes.first().map_or(0, |v| v.len()), + ); + + let pools = nodes + .iter() + .map(|cpus| build_pool(cpus).map(Arc::new)) + .collect::>>()?; + + Some(NumaSetup { pools, cpus_per_node: nodes }) +} + +/// Bind the calling thread to `cpu_indices` using hwloc. +/// Silently returns on any error so the thread still runs, just unbound. +pub fn pin_current_thread(cpu_indices: &[usize]) { + let Ok(topology) = Topology::new() else { return }; + let mut cpuset = CpuSet::new(); + for &idx in cpu_indices { + cpuset.set(idx); + } + let _ = topology.bind_cpu(&cpuset, CpuBindingFlags::THREAD); +} + +// ── Internal helpers ────────────────────────────────────────────────────────── + +fn build_pool(cpus: &[usize]) -> Option { + let cpus = cpus.to_vec(); + rayon::ThreadPoolBuilder::new() + .num_threads(cpus.len()) + .spawn_handler(move |thread| { + let cpus = cpus.clone(); + std::thread::Builder::new().spawn(move || { + pin_current_thread(&cpus); + thread.run(); + })?; + Ok(()) + }) + .build() + .ok() +} diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index dc0aa15..8727bef 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -217,6 +217,9 @@ impl KmerPartition { } } + let n_src_layers = unitig_paths.len(); + debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)"); + enum Pass1Data { File(PathBuf), Batch(Vec), @@ -224,7 +227,9 @@ impl KmerPartition { } const BATCH: usize = 4096; - let n_workers = std::thread::available_parallelism().map_or(4, |n| n.get()); + // Inside pool.install() this returns the per-NUMA pool size; outside + // it returns the global pool size. Both are the right value here. + let n_workers = rayon::current_num_threads().max(1); let capacity = n_workers * 8; let dst_filter = Arc::clone(&dst_map); @@ -311,18 +316,20 @@ impl KmerPartition { fs::create_dir_all(&new_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; debug!("partition {i}: unitig traversal start — {} nodes", g.len()); + let mut n_unitigs = 0usize; g.try_for_each_unitig(|unitig| { + n_unitigs += 1; uw.write(unitig) })?; debug!("partition {i}: unitig writer closing"); uw.close()?; - debug!("partition {i}: unitig writer closed — dropping graph ({} nodes)", g.len()); - let n = g.len(); + let n_nodes = g.len(); + debug!("partition {i}: unitig writer closed — dropping graph ({n_nodes} nodes)"); drop(g); - debug!("partition {i}: graph dropped — starting MPHF build ({n} unitigs)"); + debug!("partition {i}: graph dropped — starting MPHF build ({n_unitigs} unitigs)"); Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; debug!("partition {i}: MPHF build done"); - n + n_nodes } else { drop(g); 0