diff --git a/src/obikmer/src/cli.rs b/src/obikmer/src/cli.rs index f68a92c..b20ae6b 100644 --- a/src/obikmer/src/cli.rs +++ b/src/obikmer/src/cli.rs @@ -1,5 +1,6 @@ use std::io; use std::path::PathBuf; +use std::sync::{Arc, Condvar, Mutex}; use clap::Args; use obikrope::Rope; @@ -43,7 +44,7 @@ pub struct CommonArgs { )] pub threads: usize, - /// Maximum number of input files open simultaneously during scatter + /// Maximum number of input files open simultaneously #[arg(long, default_value_t = 20)] pub max_open_files: usize, } @@ -61,10 +62,54 @@ impl CommonArgs { } } +// ── Open-file throttling ────────────────────────────────────────────────────── + +struct FileSlots { + count: Mutex, + condvar: Condvar, + max: usize, +} + +impl FileSlots { + fn new(max: usize) -> Self { + Self { count: Mutex::new(0), condvar: Condvar::new(), max } + } + + fn acquire(&self) { + let mut count = self.count.lock().unwrap(); + while *count >= self.max { + count = self.condvar.wait(count).unwrap(); + } + *count += 1; + } + + fn release(&self) { + let mut count = self.count.lock().unwrap(); + *count -= 1; + self.condvar.notify_one(); + } +} + +struct SlotsGuard(Arc); + +impl Drop for SlotsGuard { + fn drop(&mut self) { + self.0.release(); + } +} + // ── Pipeline data carrier ───────────────────────────────────────────────────── +/// A path bundled with an opaque guard token. +/// The guard is acquired in the source thread and dropped by the flat worker +/// once the file is fully read, releasing the open-file slot. +pub struct PathWithSlot { + pub path: PathBuf, + pub _guard: Box, +} + pub enum PipelineData { - Path(PathBuf), + Path(PathWithSlot), RawChunk(Rope), NormChunk(Rope), Batch(Vec), @@ -75,6 +120,23 @@ pub enum PipelineData { unsafe impl Send for PipelineData {} unsafe impl Sync for PipelineData {} +/// Wrap a path iterator so that at most `max_open` files are open simultaneously. +/// Acquisition happens in the caller's thread (the pipeline source thread), +/// never inside a worker, preventing deadlocks. +pub fn throttle_paths( + source: impl Iterator + Send + 'static, + max_open: usize, +) -> impl Iterator + Send + 'static { + let slots = Arc::new(FileSlots::new(max_open)); + source.map(move |path| { + slots.acquire(); + PathWithSlot { + path, + _guard: Box::new(SlotsGuard(Arc::clone(&slots))), + } + }) +} + // ── I/O plumbing ────────────────────────────────────────────────────────────── pub fn open_chunks(path: PathBuf) -> io::Result> { diff --git a/src/obikmer/src/cmd/superkmer.rs b/src/obikmer/src/cmd/superkmer.rs index 881d4a6..7d4f91a 100644 --- a/src/obikmer/src/cmd/superkmer.rs +++ b/src/obikmer/src/cmd/superkmer.rs @@ -1,11 +1,10 @@ use std::io::{self, BufWriter, Write}; -use std::path::PathBuf; use clap::Args; use obifastwrite::write_scatter; use obikseq::{RoutableSuperKmer, set_k, set_m}; -use crate::cli::{CommonArgs, PipelineData, open_chunks, partitions_to_bits}; +use crate::cli::{CommonArgs, PipelineData, PathWithSlot, open_chunks, partitions_to_bits, throttle_paths}; #[derive(Args)] pub struct SuperkmerArgs { @@ -40,17 +39,18 @@ pub fn run(args: SuperkmerArgs) { let level_max = args.common.level_max; let partition_bits = partitions_to_bits(args.common.partitions); let n_workers = args.common.threads.max(1); + let max_open = args.common.max_open_files.max(1); set_k(k); set_m(m); - let path_source = args.common.seqfile_paths(); + let path_source = throttle_paths(args.common.seqfile_paths(), max_open); let pipe = obipipeline::make_pipe! { - PipelineData : PathBuf => Vec, - ||? { |path| open_chunks(path) } : Path => RawChunk, - |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, - | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, + PipelineData : PathWithSlot => Vec, + ||? { |pw: PathWithSlot| open_chunks(pw.path) } : Path => RawChunk, + |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, + | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) } : NormChunk => Batch, }; let mut out = BufWriter::new(io::stdout()); diff --git a/src/obikmer/src/steps/scatter.rs b/src/obikmer/src/steps/scatter.rs index 75655bf..f451ea0 100644 --- a/src/obikmer/src/steps/scatter.rs +++ b/src/obikmer/src/steps/scatter.rs @@ -1,52 +1,30 @@ use std::path::PathBuf; -use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; use indicatif::{ProgressBar, ProgressStyle}; use obikpartitionner::KmerPartition; +use obikrope::Rope; use obisys::{Reporter, Stage}; use tracing::info; -use crate::cli::{PipelineData, open_chunks}; +use crate::cli::{PipelineData, PathWithSlot, open_chunks, throttle_paths}; -// ── Counting semaphore ──────────────────────────────────────────────────────── +// ── Iterator that keeps the slot guard alive until the file is exhausted ────── -struct Semaphore { - state: Mutex, - cvar: Condvar, -} - -impl Semaphore { - fn new(n: usize) -> Arc { - Arc::new(Self { state: Mutex::new(n), cvar: Condvar::new() }) - } - fn acquire(self: &Arc) { - let mut n = self.state.lock().unwrap(); - while *n == 0 { n = self.cvar.wait(n).unwrap(); } - *n -= 1; - } - fn release(self: &Arc) { - *self.state.lock().unwrap() += 1; - self.cvar.notify_one(); - } -} - -struct SemGuard(Arc); -impl Drop for SemGuard { - fn drop(&mut self) { self.0.release(); } -} - -/// Iterator wrapper that holds a semaphore permit for its lifetime. -/// The permit is released when the iterator is exhausted or dropped. struct GuardedIter { inner: I, - _guard: SemGuard, + _guard: Box, } -impl Iterator for GuardedIter { - type Item = I::Item; - fn next(&mut self) -> Option { self.inner.next() } + +impl> Iterator for GuardedIter { + type Item = Rope; + fn next(&mut self) -> Option { + self.inner.next() + } } +// ── scatter ─────────────────────────────────────────────────────────────────── + /// Run scatter: normalise → build superkmers → route to partition → close. /// Reports the "scatter" stage to `rep`. pub fn scatter( @@ -61,18 +39,20 @@ pub fn scatter( ) { use obikseq::RoutableSuperKmer; - let sem = Semaphore::new(max_open); + // Throttle in the source thread — never in a worker — to prevent deadlock. + let throttled = throttle_paths(path_source, max_open); let t = Stage::start("scatter"); let pipe = obipipeline::make_pipe! { - PipelineData : PathBuf => Vec, - ||? { move |path: PathBuf| { - sem.acquire(); + PipelineData : PathWithSlot => Vec, + ||? { move |pw: PathWithSlot| { + let PathWithSlot { path, _guard } = pw; info!("indexing: {}", path.display()); - open_chunks(path).map(|iter| GuardedIter { inner: iter, _guard: SemGuard(Arc::clone(&sem)) }) + // _guard travels into GuardedIter; released when all chunks are read + open_chunks(path).map(|iter| GuardedIter { inner: iter, _guard }) }} : Path => RawChunk, - |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, - | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, + |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, + | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) } : NormChunk => Batch, }; let pb = ProgressBar::new_spinner(); @@ -89,7 +69,7 @@ pub fn scatter( let mut last_bases: u64 = 0; const ALPHA: f64 = 0.15; - for batch in pipe.apply(path_source, n_workers, 1) { + for batch in pipe.apply(throttled, n_workers, 1) { total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::(); let now = Instant::now(); let dt = now.duration_since(last_t).as_secs_f64(); @@ -115,4 +95,3 @@ pub fn scatter( kp.close().expect("close error"); rep.push(t.stop()); } -