feat: limit concurrent open files during scatter #6
@@ -42,6 +42,10 @@ pub struct CommonArgs {
|
|||||||
.unwrap_or(1)
|
.unwrap_or(1)
|
||||||
)]
|
)]
|
||||||
pub threads: usize,
|
pub threads: usize,
|
||||||
|
|
||||||
|
/// Maximum number of input files open simultaneously during scatter
|
||||||
|
#[arg(long, default_value_t = 20)]
|
||||||
|
pub max_open_files: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Smallest `b` such that `2^b >= n` (i.e. `n.next_power_of_two().ilog2()`).
|
/// Smallest `b` such that `2^b >= n` (i.e. `n.next_power_of_two().ilog2()`).
|
||||||
|
|||||||
@@ -105,7 +105,8 @@ pub fn run(args: IndexArgs) {
|
|||||||
let theta = args.common.theta;
|
let theta = args.common.theta;
|
||||||
let n_workers = args.common.threads.max(1);
|
let n_workers = args.common.threads.max(1);
|
||||||
|
|
||||||
scatter(idx.partition_mut(), args.common.seqfile_paths(), k, level_max, theta, n_workers, &mut rep);
|
let max_open = args.common.max_open_files.max(1);
|
||||||
|
scatter(idx.partition_mut(), args.common.seqfile_paths(), k, level_max, theta, n_workers, max_open, &mut rep);
|
||||||
|
|
||||||
idx.mark_scattered().unwrap_or_else(|e| {
|
idx.mark_scattered().unwrap_or_else(|e| {
|
||||||
eprintln!("error marking scatter done: {e}");
|
eprintln!("error marking scatter done: {e}");
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::sync::{Arc, Condvar, Mutex};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use indicatif::{ProgressBar, ProgressStyle};
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
@@ -8,23 +9,68 @@ use tracing::info;
|
|||||||
|
|
||||||
use crate::cli::{PipelineData, open_chunks};
|
use crate::cli::{PipelineData, open_chunks};
|
||||||
|
|
||||||
|
// ── Counting semaphore ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Counting semaphore built from a `Mutex`-protected permit counter and a
/// `Condvar` used to wake blocked acquirers.
struct Semaphore {
    /// Number of permits currently available.
    state: Mutex<usize>,
    /// Notified once for every permit handed back by `release`.
    cvar: Condvar,
}

impl Semaphore {
    /// Creates a semaphore that starts with `n` available permits.
    ///
    /// The semaphore is returned inside an `Arc` so it can be shared between
    /// threads. With `n == 0`, `acquire` blocks until some other thread calls
    /// `release`.
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self { state: Mutex::new(n), cvar: Condvar::new() })
    }

    /// Locks the permit counter.
    ///
    /// A poisoned mutex is recovered rather than propagated: the counter is a
    /// plain integer that is only changed by a single increment or decrement,
    /// so it is still meaningful after another thread panicked.
    fn permits(&self) -> std::sync::MutexGuard<'_, usize> {
        self.state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Takes one permit, blocking the calling thread while none is available.
    fn acquire(&self) {
        // `wait_while` re-checks the predicate after every wake-up, so spurious
        // wake-ups cannot let a caller through without a permit.
        let mut available = self
            .cvar
            .wait_while(self.permits(), |count| *count == 0)
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *available -= 1;
    }

    /// Returns one permit and wakes at most one thread blocked in `acquire`.
    ///
    /// Never panics because of mutex poisoning, which matters when it is
    /// called from a `Drop` implementation during unwinding.
    fn release(&self) {
        // The lock guard is a temporary dropped at the end of this statement,
        // so the woken thread can take the lock immediately.
        *self.permits() += 1;
        self.cvar.notify_one();
    }
}
|
||||||
|
|
||||||
|
struct SemGuard(Arc<Semaphore>);
|
||||||
|
impl Drop for SemGuard {
|
||||||
|
fn drop(&mut self) { self.0.release(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Iterator wrapper that holds a semaphore permit for its lifetime.
|
||||||
|
/// The permit is released when the iterator is exhausted or dropped.
|
||||||
|
struct GuardedIter<I> {
|
||||||
|
inner: I,
|
||||||
|
_guard: SemGuard,
|
||||||
|
}
|
||||||
|
impl<I: Iterator> Iterator for GuardedIter<I> {
|
||||||
|
type Item = I::Item;
|
||||||
|
fn next(&mut self) -> Option<Self::Item> { self.inner.next() }
|
||||||
|
}
|
||||||
|
|
||||||
/// Run scatter: normalise → build superkmers → route to partition → close.
|
/// Run scatter: normalise → build superkmers → route to partition → close.
|
||||||
/// Reports the "scatter" stage to `rep`.
|
/// Reports the "scatter" stage to `rep`.
|
||||||
pub fn scatter(
|
pub fn scatter(
|
||||||
kp: &mut KmerPartition,
|
kp: &mut KmerPartition,
|
||||||
path_source: impl Iterator<Item = std::path::PathBuf> + Send + 'static,
|
path_source: impl Iterator<Item = PathBuf> + Send + 'static,
|
||||||
k: usize,
|
k: usize,
|
||||||
level_max: usize,
|
level_max: usize,
|
||||||
theta: f64,
|
theta: f64,
|
||||||
n_workers: usize,
|
n_workers: usize,
|
||||||
|
max_open: usize,
|
||||||
rep: &mut Reporter,
|
rep: &mut Reporter,
|
||||||
) {
|
) {
|
||||||
use obikseq::RoutableSuperKmer;
|
use obikseq::RoutableSuperKmer;
|
||||||
|
|
||||||
|
let sem = Semaphore::new(max_open);
|
||||||
|
|
||||||
let t = Stage::start("scatter");
|
let t = Stage::start("scatter");
|
||||||
let pipe = obipipeline::make_pipe! {
|
let pipe = obipipeline::make_pipe! {
|
||||||
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
||||||
||? { |path: PathBuf| { info!("indexing: {}", path.display()); open_chunks(path) } } : Path => RawChunk,
|
||? { move |path: PathBuf| {
|
||||||
|
sem.acquire();
|
||||||
|
info!("indexing: {}", path.display());
|
||||||
|
open_chunks(path).map(|iter| GuardedIter { inner: iter, _guard: SemGuard(Arc::clone(&sem)) })
|
||||||
|
}} : Path => RawChunk,
|
||||||
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
||||||
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
|
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user