diff --git a/docmd/installation.md b/docmd/installation.md index 36307bb..d9a5cda 100644 --- a/docmd/installation.md +++ b/docmd/installation.md @@ -53,6 +53,22 @@ cargo build --release The compiled binary is at `target/release/obikmer`. +### Building on HPC clusters (network filesystems) + +HPC home directories are typically on a network filesystem (Lustre, NFS) optimised for large sequential reads — not for the thousands of small file operations that Cargo generates during compilation. Building directly on such a filesystem can be extremely slow (0.1% CPU utilisation, tens of minutes for what should take seconds). + +**Always redirect the build directory to a local scratch disk:** + +```bash +CARGO_TARGET_DIR=/scratch/local/$USER/cargo-target cargo build --release +``` + +Adapt the path to the local scratch available on your cluster (`/var/tmp`, `/tmp`, `/scratch/local`, etc.). Once built, copy the binary to a permanent location: + +```bash +cp /scratch/local/$USER/cargo-target/release/obikmer ~/bin/ +``` + ## NUMA support NUMA-aware thread placement is active automatically on multi-socket Linux machines (detected at runtime via hwloc). No special build flag is required — the detection is built in and falls back gracefully to the single-pool adaptive strategy on: diff --git a/src/obikmer/src/cli.rs b/src/obikmer/src/cli.rs index 8a84c03..928264e 100644 --- a/src/obikmer/src/cli.rs +++ b/src/obikmer/src/cli.rs @@ -1,9 +1,9 @@ use std::path::PathBuf; -use std::sync::{Arc, Condvar, Mutex}; use clap::Args; use obiread::NucPage; use obikseq::RoutableSuperKmer; +use obipipeline::Throttled; // ── Shared arguments ────────────────────────────────────────────────────────── @@ -103,54 +103,10 @@ impl CommonArgs { } } -// ── Open-file throttling ────────────────────────────────────────────────────── - -struct FileSlots { - count: Mutex, - condvar: Condvar, - max: usize, -} - -impl FileSlots { - fn new(max: usize) -> Self { - Self { count: Mutex::new(0), condvar: Condvar::new(), max } - } - - fn acquire(&self) { - let mut count = self.count.lock().unwrap(); - while *count >= self.max { - count = self.condvar.wait(count).unwrap(); - } - *count += 1; - } - - fn release(&self) { - let mut count = self.count.lock().unwrap(); - *count -= 1; - self.condvar.notify_one(); - } -} - -struct SlotsGuard(Arc); - -impl Drop for SlotsGuard { - fn drop(&mut self) { - self.0.release(); - } -} - // ── Pipeline data carrier ───────────────────────────────────────────────────── -/// A path bundled with an opaque guard token. -/// The guard is acquired in the source thread and dropped by the flat worker -/// once the file is fully read, releasing the open-file slot. -pub struct PathWithSlot { - pub path: PathBuf, - pub _guard: Box, -} - pub enum PipelineData { - Path(PathWithSlot), + Path(Throttled), NucPage(NucPage), Batch(Vec), } @@ -158,20 +114,3 @@ pub enum PipelineData { unsafe impl Send for PipelineData {} unsafe impl Sync for PipelineData {} -/// Wrap a path iterator so that at most `max_open` files are open simultaneously. -/// Acquisition happens in the caller's thread (the pipeline source thread), -/// never inside a worker, preventing deadlocks. -pub fn throttle_paths( - source: impl Iterator + Send + 'static, - max_open: usize, -) -> impl Iterator + Send + 'static { - let slots = Arc::new(FileSlots::new(max_open)); - source.map(move |path| { - slots.acquire(); - PathWithSlot { - path, - _guard: Box::new(SlotsGuard(Arc::clone(&slots))), - } - }) -} - diff --git a/src/obikmer/src/cmd/superkmer.rs b/src/obikmer/src/cmd/superkmer.rs index 1b0d7cd..707651f 100644 --- a/src/obikmer/src/cmd/superkmer.rs +++ b/src/obikmer/src/cmd/superkmer.rs @@ -1,10 +1,13 @@ use std::io::{self, BufWriter, Write}; +use std::path::PathBuf; use clap::Args; use obifastwrite::write_scatter; use obikseq::{RoutableSuperKmer, set_k, set_m}; -use crate::cli::{CommonArgs, PipelineData, PathWithSlot, partitions_to_bits, throttle_paths}; +use obipipeline::{Throttled, throttle}; + +use crate::cli::{CommonArgs, PipelineData, partitions_to_bits}; #[derive(Args)] pub struct SuperkmerArgs { @@ -46,14 +49,15 @@ pub fn run(args: SuperkmerArgs) { set_k(k); set_m(m); - let path_source = throttle_paths(args.common.seqfile_paths(), max_open); + let path_source = throttle(args.common.seqfile_paths(), max_open); let pipe = obipipeline::make_pipe! { - PipelineData : PathWithSlot => Vec, + PipelineData : Throttled => Vec, ||? { let k = k; - move |pw: PathWithSlot| { - let path_str = pw.path.to_str().unwrap_or("").to_owned(); + move |pw: Throttled| { + let path_str = pw.item.to_str().unwrap_or("").to_owned(); + let _guard = pw.guard; obiread::open_nuc_stream(&path_str, k) } } : Path => NucPage, diff --git a/src/obikmer/src/steps/scatter.rs b/src/obikmer/src/steps/scatter.rs index 8888839..28bd90a 100644 --- a/src/obikmer/src/steps/scatter.rs +++ b/src/obikmer/src/steps/scatter.rs @@ -6,16 +6,17 @@ use std::time::Instant; use obisys::spinner; use obiread::NucPage; use obikpartitionner::KmerPartition; +use obipipeline::{ThrottleGuard, Throttled, throttle}; use obisys::{Reporter, Stage}; use tracing::info; -use crate::cli::{PipelineData, PathWithSlot, throttle_paths}; +use crate::cli::PipelineData; // ── Iterator that keeps the slot guard alive until the file is exhausted ────── struct GuardedIter { inner: Box + Send>, - _guard: Box, + _guard: ThrottleGuard, flat_active: Arc, } @@ -49,7 +50,7 @@ pub fn scatter( use obikseq::RoutableSuperKmer; // Throttle in the source thread — never in a worker — to prevent deadlock. - let throttled = throttle_paths(path_source, max_open); + let throttled = throttle(path_source, max_open); let file_count = Arc::new(AtomicU64::new(0)); let flat_active = Arc::new(AtomicU32::new(0)); @@ -57,19 +58,20 @@ pub fn scatter( let t = Stage::start("scatter"); let pipe = obipipeline::make_pipe! { - PipelineData : PathWithSlot => Vec, + PipelineData : Throttled => Vec, ||? { let file_count = Arc::clone(&file_count); let flat_active = Arc::clone(&flat_active); let k = k; - move |pw: PathWithSlot| { - let PathWithSlot { path, _guard } = pw; + move |pw: Throttled| { + let path = pw.item; + let guard = pw.guard; let n = file_count.fetch_add(1, Ordering::Relaxed) + 1; info!("indexing [{}]: {}", n, path.display()); let path_str = path.to_str().unwrap_or("").to_owned(); flat_active.fetch_add(1, Ordering::Relaxed); obiread::open_nuc_stream(&path_str, k) - .map(|iter| GuardedIter { inner: iter, _guard, flat_active: Arc::clone(&flat_active) }) + .map(|iter| GuardedIter { inner: iter, _guard: guard, flat_active: Arc::clone(&flat_active) }) } } : Path => NucPage, | { diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 8727bef..d56e71a 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex}; use tracing::debug; use obipipeline::{ Pipeline, PipelineError, PipelineSender, SharedFlatFn, Stage, WorkerPool, + ThrottleGuard, throttle, make_sink, make_source, make_transform, }; @@ -221,16 +222,18 @@ impl KmerPartition { debug!("partition {i}: de Bruijn graph build start — {n_src_layers} source layer(s)"); enum Pass1Data { - File(PathBuf), + File((PathBuf, ThrottleGuard)), Batch(Vec), NewKmers(Vec), } const BATCH: usize = 4096; - // Inside pool.install() this returns the per-NUMA pool size; outside - // it returns the global pool size. Both are the right value here. - let n_workers = rayon::current_num_threads().max(1); - let capacity = n_workers * 8; + let n_workers = rayon::current_num_threads().min(16).max(4); + let capacity = 2; + // At most 2 files open simultaneously: keeps n_workers-2 workers free + // for the Transform stage. Each open file monopolises one worker for the + // full duration of its read, so this must stay well below n_workers. + let max_open = 2; let dst_filter = Arc::clone(&dst_map); let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new())); @@ -238,15 +241,18 @@ impl KmerPartition { let pass1_err: Arc>> = Arc::new(Mutex::new(None)); let err_cap = Arc::clone(&pass1_err); + let throttled_paths = throttle(unitig_paths.into_iter(), max_open); + let pipeline = Pipeline::new( - make_source!(Pass1Data, unitig_paths, File), + make_source!(Pass1Data, throttled_paths.map(|t| (t.item, t.guard)), File), vec![ Stage::Flat(Arc::new( move |data: Pass1Data, push: &PipelineSender>, delta: &PipelineSender| { - if let Pass1Data::File(path) = data { + if let Pass1Data::File((path, _guard)) = data { + // _guard is dropped at end of this block, releasing the slot. let reader = match UnitigFileReader::open_sequential(&path) { Ok(r) => r, Err(e) => { @@ -455,7 +461,7 @@ impl KmerPartition { } enum Pass2Data { - SrcLayer((usize, usize, PathBuf)), + SrcLayer((usize, usize, PathBuf, ThrottleGuard)), RawBatch((usize, usize, Arc, Vec)), WriteBatch(Vec<(Option, usize, usize, u32)>), } @@ -477,15 +483,21 @@ impl KmerPartition { let pass2_err: Arc>> = Arc::new(Mutex::new(None)); let err_cap2 = Arc::clone(&pass2_err); + let throttled_pass2 = throttle(pass2_items.into_iter(), max_open); + let pipeline2 = Pipeline::new( - make_source!(Pass2Data, pass2_items, SrcLayer), + make_source!(Pass2Data, throttled_pass2.map(|t| { + let (col_offset, src_n, src_layer_dir) = t.item; + (col_offset, src_n, src_layer_dir, t.guard) + }), SrcLayer), vec![ Stage::Flat(Arc::new( move |data: Pass2Data, push: &PipelineSender>, delta: &PipelineSender| { - if let Pass2Data::SrcLayer((col_offset, src_n, src_layer_dir)) = data { + if let Pass2Data::SrcLayer((col_offset, src_n, src_layer_dir, _guard)) = data { + // _guard dropped at end of block, releasing the slot. let reader = match UnitigFileReader::open_sequential( &src_layer_dir.join("unitigs.bin"), ) { diff --git a/src/obipipeline/src/lib.rs b/src/obipipeline/src/lib.rs index 92f8a71..099ea6e 100644 --- a/src/obipipeline/src/lib.rs +++ b/src/obipipeline/src/lib.rs @@ -1,4 +1,5 @@ mod scheduler; +pub mod throttle; pub use scheduler::Pipe; pub use scheduler::PipeIter; @@ -10,6 +11,10 @@ pub use scheduler::SinkFn; pub use scheduler::SourceFn; pub use scheduler::Stage; pub use scheduler::WorkerPool; +pub use throttle::Throttle; +pub use throttle::ThrottleGuard; +pub use throttle::Throttled; +pub use throttle::throttle; /// Re-export de `crossbeam_channel::Sender` utilisé dans les macros flat transform. /// Permet aux macros `make_flat_transform!` / `make_flat_transform_fallible!` d'utiliser diff --git a/src/obipipeline/src/throttle.rs b/src/obipipeline/src/throttle.rs new file mode 100644 index 0000000..d83b4a6 --- /dev/null +++ b/src/obipipeline/src/throttle.rs @@ -0,0 +1,86 @@ +use std::sync::{Arc, Condvar, Mutex}; + +// ── Throttle ────────────────────────────────────────────────────────────────── + +/// Counting semaphore: limits how many items from a source are in-flight +/// simultaneously through the Flat stage of a pipeline. +/// +/// Slots are acquired in the source thread before an item is emitted, and +/// released when the corresponding `ThrottleGuard` is dropped (i.e. when the +/// Flat worker finishes processing the item). Acquisition must never happen +/// inside a worker — only in the source thread — to prevent deadlocks. +pub struct Throttle { + count: Mutex, + condvar: Condvar, + max: usize, +} + +impl Throttle { + pub fn new(max: usize) -> Self { + Self { count: Mutex::new(0), condvar: Condvar::new(), max } + } + + pub fn acquire(&self) { + let mut count = self.count.lock().unwrap(); + while *count >= self.max { + count = self.condvar.wait(count).unwrap(); + } + *count += 1; + } + + fn release(&self) { + let mut count = self.count.lock().unwrap(); + *count -= 1; + self.condvar.notify_one(); + } +} + +// ── ThrottleGuard ───────────────────────────────────────────────────────────── + +/// RAII guard: releases one slot in the `Throttle` when dropped. +pub struct ThrottleGuard(Arc); + +impl Drop for ThrottleGuard { + fn drop(&mut self) { + self.0.release(); + } +} + +// ── Throttled ────────────────────────────────────────────────────────────── + +/// An item paired with its throttle guard. +/// +/// The guard keeps a slot acquired until this value is dropped. In a Flat +/// pipeline stage, carry the guard inside the worker closure until the item +/// is fully processed, then let it drop. +pub struct Throttled { + pub item: T, + pub guard: ThrottleGuard, +} + +// ── throttle() ──────────────────────────────────────────────────────────────── + +/// Wrap `source` so that at most `max_concurrent` items are emitted before +/// earlier ones have been fully processed (i.e. their `ThrottleGuard` dropped). +/// +/// Acquisition blocks the source thread until a slot is available. This must +/// be called in the source thread, never inside a pipeline worker. +/// +/// # Example +/// +/// ```ignore +/// let throttled = obipipeline::throttle(file_paths, n_workers - 1); +/// // Use `throttled` as the pipeline source; carry `item.guard` through the +/// // Flat stage and let it drop when the file is fully read. +/// ``` +pub fn throttle(source: I, max_concurrent: usize) -> impl Iterator> +where + I: Iterator, + I::Item: Send + 'static, +{ + let t = Arc::new(Throttle::new(max_concurrent)); + source.map(move |item| { + t.acquire(); + Throttled { item, guard: ThrottleGuard(Arc::clone(&t)) } + }) +}