Merge pull request 'feat: add pipeline concurrency throttling and HPC build docs' (#30) from push-owwylwtskwzw into main

Reviewed-on: #30
This commit was merged in pull request #30.
This commit is contained in:
2026-06-15 08:33:41 +00:00
7 changed files with 149 additions and 85 deletions
+16
View File
@@ -53,6 +53,22 @@ cargo build --release
The compiled binary is at `target/release/obikmer`.
### Building on HPC clusters (network filesystems)
HPC home directories are typically on a network filesystem (Lustre, NFS) optimised for large sequential reads — not for the thousands of small file operations that Cargo generates during compilation. Building directly on such a filesystem can be extremely slow (0.1% CPU utilisation, tens of minutes for what should take seconds).
**Always redirect the build directory to a local scratch disk:**
```bash
CARGO_TARGET_DIR=/scratch/local/$USER/cargo-target cargo build --release
```
Adapt the path to the local scratch available on your cluster (`/var/tmp`, `/tmp`, `/scratch/local`, etc.). Once built, copy the binary to a permanent location:
```bash
cp /scratch/local/$USER/cargo-target/release/obikmer ~/bin/
```
## NUMA support
NUMA-aware thread placement is active automatically on multi-socket Linux machines (detected at runtime via hwloc). No special build flag is required — the detection is built in and falls back gracefully to the single-pool adaptive strategy on:
+2 -63
View File
@@ -1,9 +1,9 @@
use std::path::PathBuf;
use std::sync::{Arc, Condvar, Mutex};
use clap::Args;
use obiread::NucPage;
use obikseq::RoutableSuperKmer;
use obipipeline::Throttled;
// ── Shared arguments ──────────────────────────────────────────────────────────
@@ -103,54 +103,10 @@ impl CommonArgs {
}
}
// ── Open-file throttling ──────────────────────────────────────────────────────
struct FileSlots {
count: Mutex<usize>,
condvar: Condvar,
max: usize,
}
impl FileSlots {
fn new(max: usize) -> Self {
Self { count: Mutex::new(0), condvar: Condvar::new(), max }
}
fn acquire(&self) {
let mut count = self.count.lock().unwrap();
while *count >= self.max {
count = self.condvar.wait(count).unwrap();
}
*count += 1;
}
fn release(&self) {
let mut count = self.count.lock().unwrap();
*count -= 1;
self.condvar.notify_one();
}
}
struct SlotsGuard(Arc<FileSlots>);
impl Drop for SlotsGuard {
fn drop(&mut self) {
self.0.release();
}
}
// ── Pipeline data carrier ─────────────────────────────────────────────────────
/// A path bundled with an opaque guard token.
/// The guard is acquired in the source thread and dropped by the flat worker
/// once the file is fully read, releasing the open-file slot.
pub struct PathWithSlot {
pub path: PathBuf,
pub _guard: Box<dyn Send + 'static>,
}
pub enum PipelineData {
Path(PathWithSlot),
Path(Throttled<PathBuf>),
NucPage(NucPage),
Batch(Vec<RoutableSuperKmer>),
}
@@ -158,20 +114,3 @@ pub enum PipelineData {
unsafe impl Send for PipelineData {}
unsafe impl Sync for PipelineData {}
/// Wrap a path iterator so that at most `max_open` files are open simultaneously.
/// Acquisition happens in the caller's thread (the pipeline source thread),
/// never inside a worker, preventing deadlocks.
pub fn throttle_paths(
source: impl Iterator<Item = PathBuf> + Send + 'static,
max_open: usize,
) -> impl Iterator<Item = PathWithSlot> + Send + 'static {
let slots = Arc::new(FileSlots::new(max_open));
source.map(move |path| {
slots.acquire();
PathWithSlot {
path,
_guard: Box::new(SlotsGuard(Arc::clone(&slots))),
}
})
}
+9 -5
View File
@@ -1,10 +1,13 @@
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
use clap::Args;
use obifastwrite::write_scatter;
use obikseq::{RoutableSuperKmer, set_k, set_m};
use crate::cli::{CommonArgs, PipelineData, PathWithSlot, partitions_to_bits, throttle_paths};
use obipipeline::{Throttled, throttle};
use crate::cli::{CommonArgs, PipelineData, partitions_to_bits};
#[derive(Args)]
pub struct SuperkmerArgs {
@@ -46,14 +49,15 @@ pub fn run(args: SuperkmerArgs) {
set_k(k);
set_m(m);
let path_source = throttle_paths(args.common.seqfile_paths(), max_open);
let path_source = throttle(args.common.seqfile_paths(), max_open);
let pipe = obipipeline::make_pipe! {
PipelineData : PathWithSlot => Vec<RoutableSuperKmer>,
PipelineData : Throttled<PathBuf> => Vec<RoutableSuperKmer>,
||? {
let k = k;
move |pw: PathWithSlot| {
let path_str = pw.path.to_str().unwrap_or("").to_owned();
move |pw: Throttled<PathBuf>| {
let path_str = pw.item.to_str().unwrap_or("").to_owned();
let _guard = pw.guard;
obiread::open_nuc_stream(&path_str, k)
}
} : Path => NucPage,
+9 -7
View File
@@ -6,16 +6,17 @@ use std::time::Instant;
use obisys::spinner;
use obiread::NucPage;
use obikpartitionner::KmerPartition;
use obipipeline::{ThrottleGuard, Throttled, throttle};
use obisys::{Reporter, Stage};
use tracing::info;
use crate::cli::{PipelineData, PathWithSlot, throttle_paths};
use crate::cli::PipelineData;
// ── Iterator that keeps the slot guard alive until the file is exhausted ──────
struct GuardedIter {
inner: Box<dyn Iterator<Item = NucPage> + Send>,
_guard: Box<dyn Send + 'static>,
_guard: ThrottleGuard,
flat_active: Arc<AtomicU32>,
}
@@ -49,7 +50,7 @@ pub fn scatter(
use obikseq::RoutableSuperKmer;
// Throttle in the source thread — never in a worker — to prevent deadlock.
let throttled = throttle_paths(path_source, max_open);
let throttled = throttle(path_source, max_open);
let file_count = Arc::new(AtomicU64::new(0));
let flat_active = Arc::new(AtomicU32::new(0));
@@ -57,19 +58,20 @@ pub fn scatter(
let t = Stage::start("scatter");
let pipe = obipipeline::make_pipe! {
PipelineData : PathWithSlot => Vec<RoutableSuperKmer>,
PipelineData : Throttled<PathBuf> => Vec<RoutableSuperKmer>,
||? {
let file_count = Arc::clone(&file_count);
let flat_active = Arc::clone(&flat_active);
let k = k;
move |pw: PathWithSlot| {
let PathWithSlot { path, _guard } = pw;
move |pw: Throttled<PathBuf>| {
let path = pw.item;
let guard = pw.guard;
let n = file_count.fetch_add(1, Ordering::Relaxed) + 1;
info!("indexing [{}]: {}", n, path.display());
let path_str = path.to_str().unwrap_or("").to_owned();
flat_active.fetch_add(1, Ordering::Relaxed);
obiread::open_nuc_stream(&path_str, k)
.map(|iter| GuardedIter { inner: iter, _guard, flat_active: Arc::clone(&flat_active) })
.map(|iter| GuardedIter { inner: iter, _guard: guard, flat_active: Arc::clone(&flat_active) })
}
} : Path => NucPage,
| {
+22 -10
View File
@@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex};
use tracing::debug;
use obipipeline::{
Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool,
ThrottleGuard, throttle,
make_sink, make_source, make_transform,
};
@@ -221,16 +222,18 @@ impl KmerPartition {
debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)");
enum Pass1Data {
File(PathBuf),
File((PathBuf, ThrottleGuard)),
Batch(Vec<CanonicalKmer>),
NewKmers(Vec<CanonicalKmer>),
}
const BATCH: usize = 4096;
// Inside pool.install() this returns the per-NUMA pool size; outside
// it returns the global pool size. Both are the right value here.
let n_workers = rayon::current_num_threads().max(1);
let capacity = n_workers * 8;
let n_workers = rayon::current_num_threads().min(16).max(4);
let capacity = 2;
// At most 2 files open simultaneously: keeps n_workers-2 workers free
// for the Transform stage. Each open file monopolises one worker for the
// full duration of its read, so this must stay well below n_workers.
let max_open = 2;
let dst_filter = Arc::clone(&dst_map);
let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
@@ -238,15 +241,18 @@ impl KmerPartition {
let pass1_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let err_cap = Arc::clone(&pass1_err);
let throttled_paths = throttle(unitig_paths.into_iter(), max_open);
let pipeline = Pipeline::new(
make_source!(Pass1Data, unitig_paths, File),
make_source!(Pass1Data, throttled_paths.map(|t| (t.item, t.guard)), File),
vec![
Stage::Flat(Arc::new(
move |data: Pass1Data,
push: &PipelineSender<Result<Pass1Data, PipelineError>>,
delta: &PipelineSender<isize>|
{
if let Pass1Data::File(path) = data {
if let Pass1Data::File((path, _guard)) = data {
// _guard is dropped at end of this block, releasing the slot.
let reader = match UnitigFileReader::open_sequential(&path) {
Ok(r) => r,
Err(e) => {
@@ -455,7 +461,7 @@ impl KmerPartition {
}
enum Pass2Data {
SrcLayer((usize, usize, PathBuf)),
SrcLayer((usize, usize, PathBuf, ThrottleGuard)),
RawBatch((usize, usize, Arc<SrcLayerData>, Vec<CanonicalKmer>)),
WriteBatch(Vec<(Option<usize>, usize, usize, u32)>),
}
@@ -477,15 +483,21 @@ impl KmerPartition {
let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let err_cap2 = Arc::clone(&pass2_err);
let throttled_pass2 = throttle(pass2_items.into_iter(), max_open);
let pipeline2 = Pipeline::new(
make_source!(Pass2Data, pass2_items, SrcLayer),
make_source!(Pass2Data, throttled_pass2.map(|t| {
let (col_offset, src_n, src_layer_dir) = t.item;
(col_offset, src_n, src_layer_dir, t.guard)
}), SrcLayer),
vec![
Stage::Flat(Arc::new(
move |data: Pass2Data,
push: &PipelineSender<Result<Pass2Data, PipelineError>>,
delta: &PipelineSender<isize>|
{
if let Pass2Data::SrcLayer((col_offset, src_n, src_layer_dir)) = data {
if let Pass2Data::SrcLayer((col_offset, src_n, src_layer_dir, _guard)) = data {
// _guard dropped at end of block, releasing the slot.
let reader = match UnitigFileReader::open_sequential(
&src_layer_dir.join("unitigs.bin"),
) {
+5
View File
@@ -1,4 +1,5 @@
mod scheduler;
pub mod throttle;
pub use scheduler::Pipe;
pub use scheduler::PipeIter;
@@ -10,6 +11,10 @@ pub use scheduler::SinkFn;
pub use scheduler::SourceFn;
pub use scheduler::Stage;
pub use scheduler::WorkerPool;
pub use throttle::Throttle;
pub use throttle::ThrottleGuard;
pub use throttle::Throttled;
pub use throttle::throttle;
/// Re-export de `crossbeam_channel::Sender` utilisé dans les macros flat transform.
/// Permet aux macros `make_flat_transform!` / `make_flat_transform_fallible!` d'utiliser
+86
View File
@@ -0,0 +1,86 @@
use std::sync::{Arc, Condvar, Mutex};
// ── Throttle ──────────────────────────────────────────────────────────────────
/// Counting semaphore: limits how many items from a source are in-flight
/// simultaneously through the Flat stage of a pipeline.
///
/// Slots are acquired in the source thread before an item is emitted, and
/// released when the corresponding `ThrottleGuard` is dropped (i.e. when the
/// Flat worker finishes processing the item). Acquisition must never happen
/// inside a worker — only in the source thread — to prevent deadlocks.
pub struct Throttle {
count: Mutex<usize>,
condvar: Condvar,
max: usize,
}
impl Throttle {
pub fn new(max: usize) -> Self {
Self { count: Mutex::new(0), condvar: Condvar::new(), max }
}
pub fn acquire(&self) {
let mut count = self.count.lock().unwrap();
while *count >= self.max {
count = self.condvar.wait(count).unwrap();
}
*count += 1;
}
fn release(&self) {
let mut count = self.count.lock().unwrap();
*count -= 1;
self.condvar.notify_one();
}
}
// ── ThrottleGuard ─────────────────────────────────────────────────────────────
/// RAII guard: releases one slot in the `Throttle` when dropped.
pub struct ThrottleGuard(Arc<Throttle>);
impl Drop for ThrottleGuard {
fn drop(&mut self) {
self.0.release();
}
}
// ── Throttled<T> ──────────────────────────────────────────────────────────────
/// An item paired with its throttle guard.
///
/// The guard keeps a slot acquired until this value is dropped. In a Flat
/// pipeline stage, carry the guard inside the worker closure until the item
/// is fully processed, then let it drop.
pub struct Throttled<T> {
pub item: T,
pub guard: ThrottleGuard,
}
// ── throttle() ────────────────────────────────────────────────────────────────
/// Wrap `source` so that at most `max_concurrent` items are emitted before
/// earlier ones have been fully processed (i.e. their `ThrottleGuard` dropped).
///
/// Acquisition blocks the source thread until a slot is available. This must
/// be called in the source thread, never inside a pipeline worker.
///
/// # Example
///
/// ```ignore
/// let throttled = obipipeline::throttle(file_paths, n_workers - 1);
/// // Use `throttled` as the pipeline source; carry `item.guard` through the
/// // Flat stage and let it drop when the file is fully read.
/// ```
pub fn throttle<I>(source: I, max_concurrent: usize) -> impl Iterator<Item = Throttled<I::Item>>
where
I: Iterator,
I::Item: Send + 'static,
{
let t = Arc::new(Throttle::new(max_concurrent));
source.map(move |item| {
t.acquire();
Throttled { item, guard: ThrottleGuard(Arc::clone(&t)) }
})
}