feat: add pipeline concurrency throttling and HPC build docs
Introduces a counting semaphore-based throttling mechanism to limit concurrent file I/O and pipeline processing. Replaces custom path wrappers with standardized `Throttled` types across `obikmer` and `obikpartitionner`, ensuring RAII-based resource cleanup and explicit backpressure. Additionally, documents how to redirect Cargo build artifacts to local scratch storage on HPC filesystems to prevent compilation slowdowns.
This commit is contained in:
@@ -53,6 +53,22 @@ cargo build --release
|
||||
|
||||
The compiled binary is at `target/release/obikmer`.
|
||||
|
||||
### Building on HPC clusters (network filesystems)
|
||||
|
||||
HPC home directories are typically on a network filesystem (Lustre, NFS) optimised for large sequential reads — not for the thousands of small file operations that Cargo generates during compilation. Building directly on such a filesystem can be extremely slow (0.1% CPU utilisation, tens of minutes for what should take seconds).
|
||||
|
||||
**Always redirect the build directory to a local scratch disk:**
|
||||
|
||||
```bash
|
||||
CARGO_TARGET_DIR=/scratch/local/$USER/cargo-target cargo build --release
|
||||
```
|
||||
|
||||
Adapt the path to the local scratch available on your cluster (`/var/tmp`, `/tmp`, `/scratch/local`, etc.). Once built, copy the binary to a permanent location:
|
||||
|
||||
```bash
|
||||
cp /scratch/local/$USER/cargo-target/release/obikmer ~/bin/
|
||||
```
|
||||
|
||||
## NUMA support
|
||||
|
||||
NUMA-aware thread placement is active automatically on multi-socket Linux machines (detected at runtime via hwloc). No special build flag is required — the detection is built in and falls back gracefully to the single-pool adaptive strategy on:
|
||||
|
||||
+2
-63
@@ -1,9 +1,9 @@
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
|
||||
use clap::Args;
|
||||
use obiread::NucPage;
|
||||
use obikseq::RoutableSuperKmer;
|
||||
use obipipeline::Throttled;
|
||||
|
||||
// ── Shared arguments ──────────────────────────────────────────────────────────
|
||||
|
||||
@@ -103,54 +103,10 @@ impl CommonArgs {
|
||||
}
|
||||
}
|
||||
|
||||
// ── Open-file throttling ──────────────────────────────────────────────────────
|
||||
|
||||
struct FileSlots {
|
||||
count: Mutex<usize>,
|
||||
condvar: Condvar,
|
||||
max: usize,
|
||||
}
|
||||
|
||||
impl FileSlots {
|
||||
fn new(max: usize) -> Self {
|
||||
Self { count: Mutex::new(0), condvar: Condvar::new(), max }
|
||||
}
|
||||
|
||||
fn acquire(&self) {
|
||||
let mut count = self.count.lock().unwrap();
|
||||
while *count >= self.max {
|
||||
count = self.condvar.wait(count).unwrap();
|
||||
}
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn release(&self) {
|
||||
let mut count = self.count.lock().unwrap();
|
||||
*count -= 1;
|
||||
self.condvar.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
struct SlotsGuard(Arc<FileSlots>);
|
||||
|
||||
impl Drop for SlotsGuard {
|
||||
fn drop(&mut self) {
|
||||
self.0.release();
|
||||
}
|
||||
}
|
||||
|
||||
// ── Pipeline data carrier ─────────────────────────────────────────────────────
|
||||
|
||||
/// A path bundled with an opaque guard token.
|
||||
/// The guard is acquired in the source thread and dropped by the flat worker
|
||||
/// once the file is fully read, releasing the open-file slot.
|
||||
pub struct PathWithSlot {
|
||||
pub path: PathBuf,
|
||||
pub _guard: Box<dyn Send + 'static>,
|
||||
}
|
||||
|
||||
pub enum PipelineData {
|
||||
Path(PathWithSlot),
|
||||
Path(Throttled<PathBuf>),
|
||||
NucPage(NucPage),
|
||||
Batch(Vec<RoutableSuperKmer>),
|
||||
}
|
||||
@@ -158,20 +114,3 @@ pub enum PipelineData {
|
||||
unsafe impl Send for PipelineData {}
|
||||
unsafe impl Sync for PipelineData {}
|
||||
|
||||
/// Wrap a path iterator so that at most `max_open` files are open simultaneously.
|
||||
/// Acquisition happens in the caller's thread (the pipeline source thread),
|
||||
/// never inside a worker, preventing deadlocks.
|
||||
pub fn throttle_paths(
|
||||
source: impl Iterator<Item = PathBuf> + Send + 'static,
|
||||
max_open: usize,
|
||||
) -> impl Iterator<Item = PathWithSlot> + Send + 'static {
|
||||
let slots = Arc::new(FileSlots::new(max_open));
|
||||
source.map(move |path| {
|
||||
slots.acquire();
|
||||
PathWithSlot {
|
||||
path,
|
||||
_guard: Box::new(SlotsGuard(Arc::clone(&slots))),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
use std::io::{self, BufWriter, Write};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use clap::Args;
|
||||
use obifastwrite::write_scatter;
|
||||
use obikseq::{RoutableSuperKmer, set_k, set_m};
|
||||
|
||||
use crate::cli::{CommonArgs, PipelineData, PathWithSlot, partitions_to_bits, throttle_paths};
|
||||
use obipipeline::{Throttled, throttle};
|
||||
|
||||
use crate::cli::{CommonArgs, PipelineData, partitions_to_bits};
|
||||
|
||||
#[derive(Args)]
|
||||
pub struct SuperkmerArgs {
|
||||
@@ -46,14 +49,15 @@ pub fn run(args: SuperkmerArgs) {
|
||||
set_k(k);
|
||||
set_m(m);
|
||||
|
||||
let path_source = throttle_paths(args.common.seqfile_paths(), max_open);
|
||||
let path_source = throttle(args.common.seqfile_paths(), max_open);
|
||||
|
||||
let pipe = obipipeline::make_pipe! {
|
||||
PipelineData : PathWithSlot => Vec<RoutableSuperKmer>,
|
||||
PipelineData : Throttled<PathBuf> => Vec<RoutableSuperKmer>,
|
||||
||? {
|
||||
let k = k;
|
||||
move |pw: PathWithSlot| {
|
||||
let path_str = pw.path.to_str().unwrap_or("").to_owned();
|
||||
move |pw: Throttled<PathBuf>| {
|
||||
let path_str = pw.item.to_str().unwrap_or("").to_owned();
|
||||
let _guard = pw.guard;
|
||||
obiread::open_nuc_stream(&path_str, k)
|
||||
}
|
||||
} : Path => NucPage,
|
||||
|
||||
@@ -6,16 +6,17 @@ use std::time::Instant;
|
||||
use obisys::spinner;
|
||||
use obiread::NucPage;
|
||||
use obikpartitionner::KmerPartition;
|
||||
use obipipeline::{ThrottleGuard, Throttled, throttle};
|
||||
use obisys::{Reporter, Stage};
|
||||
use tracing::info;
|
||||
|
||||
use crate::cli::{PipelineData, PathWithSlot, throttle_paths};
|
||||
use crate::cli::PipelineData;
|
||||
|
||||
// ── Iterator that keeps the slot guard alive until the file is exhausted ──────
|
||||
|
||||
struct GuardedIter {
|
||||
inner: Box<dyn Iterator<Item = NucPage> + Send>,
|
||||
_guard: Box<dyn Send + 'static>,
|
||||
_guard: ThrottleGuard,
|
||||
flat_active: Arc<AtomicU32>,
|
||||
}
|
||||
|
||||
@@ -49,7 +50,7 @@ pub fn scatter(
|
||||
use obikseq::RoutableSuperKmer;
|
||||
|
||||
// Throttle in the source thread — never in a worker — to prevent deadlock.
|
||||
let throttled = throttle_paths(path_source, max_open);
|
||||
let throttled = throttle(path_source, max_open);
|
||||
|
||||
let file_count = Arc::new(AtomicU64::new(0));
|
||||
let flat_active = Arc::new(AtomicU32::new(0));
|
||||
@@ -57,19 +58,20 @@ pub fn scatter(
|
||||
|
||||
let t = Stage::start("scatter");
|
||||
let pipe = obipipeline::make_pipe! {
|
||||
PipelineData : PathWithSlot => Vec<RoutableSuperKmer>,
|
||||
PipelineData : Throttled<PathBuf> => Vec<RoutableSuperKmer>,
|
||||
||? {
|
||||
let file_count = Arc::clone(&file_count);
|
||||
let flat_active = Arc::clone(&flat_active);
|
||||
let k = k;
|
||||
move |pw: PathWithSlot| {
|
||||
let PathWithSlot { path, _guard } = pw;
|
||||
move |pw: Throttled<PathBuf>| {
|
||||
let path = pw.item;
|
||||
let guard = pw.guard;
|
||||
let n = file_count.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
info!("indexing [{}]: {}", n, path.display());
|
||||
let path_str = path.to_str().unwrap_or("").to_owned();
|
||||
flat_active.fetch_add(1, Ordering::Relaxed);
|
||||
obiread::open_nuc_stream(&path_str, k)
|
||||
.map(|iter| GuardedIter { inner: iter, _guard, flat_active: Arc::clone(&flat_active) })
|
||||
.map(|iter| GuardedIter { inner: iter, _guard: guard, flat_active: Arc::clone(&flat_active) })
|
||||
}
|
||||
} : Path => NucPage,
|
||||
| {
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex};
|
||||
use tracing::debug;
|
||||
use obipipeline::{
|
||||
Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool,
|
||||
ThrottleGuard, throttle,
|
||||
make_sink, make_source, make_transform,
|
||||
};
|
||||
|
||||
@@ -221,16 +222,18 @@ impl KmerPartition {
|
||||
debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)");
|
||||
|
||||
enum Pass1Data {
|
||||
File(PathBuf),
|
||||
File((PathBuf, ThrottleGuard)),
|
||||
Batch(Vec<CanonicalKmer>),
|
||||
NewKmers(Vec<CanonicalKmer>),
|
||||
}
|
||||
|
||||
const BATCH: usize = 4096;
|
||||
// Inside pool.install() this returns the per-NUMA pool size; outside
|
||||
// it returns the global pool size. Both are the right value here.
|
||||
let n_workers = rayon::current_num_threads().max(1);
|
||||
let capacity = n_workers * 8;
|
||||
let n_workers = rayon::current_num_threads().min(16).max(4);
|
||||
let capacity = 2;
|
||||
// At most 2 files open simultaneously: keeps n_workers-2 workers free
|
||||
// for the Transform stage. Each open file monopolises one worker for the
|
||||
// full duration of its read, so this must stay well below n_workers.
|
||||
let max_open = 2;
|
||||
|
||||
let dst_filter = Arc::clone(&dst_map);
|
||||
let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
|
||||
@@ -238,15 +241,18 @@ impl KmerPartition {
|
||||
let pass1_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
|
||||
let err_cap = Arc::clone(&pass1_err);
|
||||
|
||||
let throttled_paths = throttle(unitig_paths.into_iter(), max_open);
|
||||
|
||||
let pipeline = Pipeline::new(
|
||||
make_source!(Pass1Data, unitig_paths, File),
|
||||
make_source!(Pass1Data, throttled_paths.map(|t| (t.item, t.guard)), File),
|
||||
vec![
|
||||
Stage::Flat(Arc::new(
|
||||
move |data: Pass1Data,
|
||||
push: &PipelineSender<Result<Pass1Data, PipelineError>>,
|
||||
delta: &PipelineSender<isize>|
|
||||
{
|
||||
if let Pass1Data::File(path) = data {
|
||||
if let Pass1Data::File((path, _guard)) = data {
|
||||
// _guard is dropped at end of this block, releasing the slot.
|
||||
let reader = match UnitigFileReader::open_sequential(&path) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
@@ -455,7 +461,7 @@ impl KmerPartition {
|
||||
}
|
||||
|
||||
enum Pass2Data {
|
||||
SrcLayer((usize, usize, PathBuf)),
|
||||
SrcLayer((usize, usize, PathBuf, ThrottleGuard)),
|
||||
RawBatch((usize, usize, Arc<SrcLayerData>, Vec<CanonicalKmer>)),
|
||||
WriteBatch(Vec<(Option<usize>, usize, usize, u32)>),
|
||||
}
|
||||
@@ -477,15 +483,21 @@ impl KmerPartition {
|
||||
let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
|
||||
let err_cap2 = Arc::clone(&pass2_err);
|
||||
|
||||
let throttled_pass2 = throttle(pass2_items.into_iter(), max_open);
|
||||
|
||||
let pipeline2 = Pipeline::new(
|
||||
make_source!(Pass2Data, pass2_items, SrcLayer),
|
||||
make_source!(Pass2Data, throttled_pass2.map(|t| {
|
||||
let (col_offset, src_n, src_layer_dir) = t.item;
|
||||
(col_offset, src_n, src_layer_dir, t.guard)
|
||||
}), SrcLayer),
|
||||
vec![
|
||||
Stage::Flat(Arc::new(
|
||||
move |data: Pass2Data,
|
||||
push: &PipelineSender<Result<Pass2Data, PipelineError>>,
|
||||
delta: &PipelineSender<isize>|
|
||||
{
|
||||
if let Pass2Data::SrcLayer((col_offset, src_n, src_layer_dir)) = data {
|
||||
if let Pass2Data::SrcLayer((col_offset, src_n, src_layer_dir, _guard)) = data {
|
||||
// _guard dropped at end of block, releasing the slot.
|
||||
let reader = match UnitigFileReader::open_sequential(
|
||||
&src_layer_dir.join("unitigs.bin"),
|
||||
) {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
mod scheduler;
|
||||
pub mod throttle;
|
||||
|
||||
pub use scheduler::Pipe;
|
||||
pub use scheduler::PipeIter;
|
||||
@@ -10,6 +11,10 @@ pub use scheduler::SinkFn;
|
||||
pub use scheduler::SourceFn;
|
||||
pub use scheduler::Stage;
|
||||
pub use scheduler::WorkerPool;
|
||||
pub use throttle::Throttle;
|
||||
pub use throttle::ThrottleGuard;
|
||||
pub use throttle::Throttled;
|
||||
pub use throttle::throttle;
|
||||
|
||||
/// Re-export de `crossbeam_channel::Sender` utilisé dans les macros flat transform.
|
||||
/// Permet aux macros `make_flat_transform!` / `make_flat_transform_fallible!` d'utiliser
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
|
||||
// ── Throttle ──────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Counting semaphore: limits how many items from a source are in-flight
|
||||
/// simultaneously through the Flat stage of a pipeline.
|
||||
///
|
||||
/// Slots are acquired in the source thread before an item is emitted, and
|
||||
/// released when the corresponding `ThrottleGuard` is dropped (i.e. when the
|
||||
/// Flat worker finishes processing the item). Acquisition must never happen
|
||||
/// inside a worker — only in the source thread — to prevent deadlocks.
|
||||
pub struct Throttle {
|
||||
count: Mutex<usize>,
|
||||
condvar: Condvar,
|
||||
max: usize,
|
||||
}
|
||||
|
||||
impl Throttle {
|
||||
pub fn new(max: usize) -> Self {
|
||||
Self { count: Mutex::new(0), condvar: Condvar::new(), max }
|
||||
}
|
||||
|
||||
pub fn acquire(&self) {
|
||||
let mut count = self.count.lock().unwrap();
|
||||
while *count >= self.max {
|
||||
count = self.condvar.wait(count).unwrap();
|
||||
}
|
||||
*count += 1;
|
||||
}
|
||||
|
||||
fn release(&self) {
|
||||
let mut count = self.count.lock().unwrap();
|
||||
*count -= 1;
|
||||
self.condvar.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
// ── ThrottleGuard ─────────────────────────────────────────────────────────────
|
||||
|
||||
/// RAII guard: releases one slot in the `Throttle` when dropped.
|
||||
pub struct ThrottleGuard(Arc<Throttle>);
|
||||
|
||||
impl Drop for ThrottleGuard {
|
||||
fn drop(&mut self) {
|
||||
self.0.release();
|
||||
}
|
||||
}
|
||||
|
||||
// ── Throttled<T> ──────────────────────────────────────────────────────────────
|
||||
|
||||
/// An item paired with its throttle guard.
|
||||
///
|
||||
/// The guard keeps a slot acquired until this value is dropped. In a Flat
|
||||
/// pipeline stage, carry the guard inside the worker closure until the item
|
||||
/// is fully processed, then let it drop.
|
||||
pub struct Throttled<T> {
|
||||
pub item: T,
|
||||
pub guard: ThrottleGuard,
|
||||
}
|
||||
|
||||
// ── throttle() ────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Wrap `source` so that at most `max_concurrent` items are emitted before
|
||||
/// earlier ones have been fully processed (i.e. their `ThrottleGuard` dropped).
|
||||
///
|
||||
/// Acquisition blocks the source thread until a slot is available. This must
|
||||
/// be called in the source thread, never inside a pipeline worker.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```ignore
|
||||
/// let throttled = obipipeline::throttle(file_paths, n_workers - 1);
|
||||
/// // Use `throttled` as the pipeline source; carry `item.guard` through the
|
||||
/// // Flat stage and let it drop when the file is fully read.
|
||||
/// ```
|
||||
pub fn throttle<I>(source: I, max_concurrent: usize) -> impl Iterator<Item = Throttled<I::Item>>
|
||||
where
|
||||
I: Iterator,
|
||||
I::Item: Send + 'static,
|
||||
{
|
||||
let t = Arc::new(Throttle::new(max_concurrent));
|
||||
source.map(move |item| {
|
||||
t.acquire();
|
||||
Throttled { item, guard: ThrottleGuard(Arc::clone(&t)) }
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user