From 85e190189873481eba366676c81d9aedf346f045 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 22 May 2026 11:25:46 +0200 Subject: [PATCH] feat: limit concurrent open files during scatter Introduces a `max_open_files` CLI argument (default: 20) to cap concurrently open input files during scatter operations. The scatter phase now parallelizes sequence file partitioning across worker threads while enforcing a configurable concurrency limit using a custom semaphore and `GuardedIter` wrapper. This ensures bounded resource usage and prevents file handle exhaustion during index construction. --- src/obikmer/src/cli.rs | 4 +++ src/obikmer/src/cmd/index.rs | 3 +- src/obikmer/src/steps/scatter.rs | 50 ++++++++++++++++++++++++++++++-- 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/src/obikmer/src/cli.rs b/src/obikmer/src/cli.rs index 1b79fa0..f68a92c 100644 --- a/src/obikmer/src/cli.rs +++ b/src/obikmer/src/cli.rs @@ -42,6 +42,10 @@ pub struct CommonArgs { .unwrap_or(1) )] pub threads: usize, + + /// Maximum number of input files open simultaneously during scatter + #[arg(long, default_value_t = 20)] + pub max_open_files: usize, } /// Smallest `b` such that `2^b >= n` (i.e. `n.next_power_of_two().ilog2()`). 
diff --git a/src/obikmer/src/cmd/index.rs b/src/obikmer/src/cmd/index.rs index 7b45c4d..5559f95 100644 --- a/src/obikmer/src/cmd/index.rs +++ b/src/obikmer/src/cmd/index.rs @@ -105,7 +105,8 @@ pub fn run(args: IndexArgs) { let theta = args.common.theta; let n_workers = args.common.threads.max(1); - scatter(idx.partition_mut(), args.common.seqfile_paths(), k, level_max, theta, n_workers, &mut rep); + let max_open = args.common.max_open_files.max(1); + scatter(idx.partition_mut(), args.common.seqfile_paths(), k, level_max, theta, n_workers, max_open, &mut rep); idx.mark_scattered().unwrap_or_else(|e| { eprintln!("error marking scatter done: {e}"); diff --git a/src/obikmer/src/steps/scatter.rs b/src/obikmer/src/steps/scatter.rs index aa63bbe..75655bf 100644 --- a/src/obikmer/src/steps/scatter.rs +++ b/src/obikmer/src/steps/scatter.rs @@ -1,4 +1,5 @@ use std::path::PathBuf; +use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; use indicatif::{ProgressBar, ProgressStyle}; @@ -8,23 +9,68 @@ use tracing::info; use crate::cli::{PipelineData, open_chunks}; +// ── Counting semaphore ──────────────────────────────────────────────────────── + +struct Semaphore { + state: Mutex, + cvar: Condvar, +} + +impl Semaphore { + fn new(n: usize) -> Arc { + Arc::new(Self { state: Mutex::new(n), cvar: Condvar::new() }) + } + fn acquire(self: &Arc) { + let mut n = self.state.lock().unwrap(); + while *n == 0 { n = self.cvar.wait(n).unwrap(); } + *n -= 1; + } + fn release(self: &Arc) { + *self.state.lock().unwrap() += 1; + self.cvar.notify_one(); + } +} + +struct SemGuard(Arc); +impl Drop for SemGuard { + fn drop(&mut self) { self.0.release(); } +} + +/// Iterator wrapper that holds a semaphore permit for its lifetime. +/// The permit is released when the wrapper is dropped; exhausting the inner iterator alone does not release it. 
+struct GuardedIter { + inner: I, + _guard: SemGuard, +} +impl Iterator for GuardedIter { + type Item = I::Item; + fn next(&mut self) -> Option { self.inner.next() } +} + /// Run scatter: normalise → build superkmers → route to partition → close. /// Reports the "scatter" stage to `rep`. pub fn scatter( kp: &mut KmerPartition, - path_source: impl Iterator + Send + 'static, + path_source: impl Iterator + Send + 'static, k: usize, level_max: usize, theta: f64, n_workers: usize, + max_open: usize, rep: &mut Reporter, ) { use obikseq::RoutableSuperKmer; + let sem = Semaphore::new(max_open); + let t = Stage::start("scatter"); let pipe = obipipeline::make_pipe! { PipelineData : PathBuf => Vec, ||? { |path: PathBuf| { info!("indexing: {}", path.display()); open_chunks(path) } } : Path => RawChunk, ||? { move |path: PathBuf| { sem.acquire(); /* NOTE(review): the permit taken here is only handed to a `SemGuard` inside the `.map(...)` below; if `open_chunks` does not yield an iterator (presumably an error; its return type is not visible here) no guard is created and this permit is never released. Consider building the `SemGuard` right after `acquire()` and moving it into the wrapper. */ info!("indexing: {}", path.display()); open_chunks(path).map(|iter| GuardedIter { inner: iter, _guard: SemGuard(Arc::clone(&sem)) }) }} : Path => RawChunk, |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, };