feat: Implement RAII-based file handle throttling #7

Merged
coissac merged 1 commit from push-qtnvlqlooklx into main 2026-05-22 12:03:40 +00:00
3 changed files with 93 additions and 52 deletions
Showing only changes of commit ca71e100ef - Show all commits
+64 -2
View File
@@ -1,5 +1,6 @@
use std::io; use std::io;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Condvar, Mutex};
use clap::Args; use clap::Args;
use obikrope::Rope; use obikrope::Rope;
@@ -43,7 +44,7 @@ pub struct CommonArgs {
)] )]
pub threads: usize, pub threads: usize,
/// Maximum number of input files open simultaneously during scatter /// Maximum number of input files open simultaneously
#[arg(long, default_value_t = 20)] #[arg(long, default_value_t = 20)]
pub max_open_files: usize, pub max_open_files: usize,
} }
@@ -61,10 +62,54 @@ impl CommonArgs {
} }
} }
// ── Open-file throttling ──────────────────────────────────────────────────────
/// Counting limiter for the number of files that may be open at the same time.
///
/// `count` is the number of slots currently taken. [`FileSlots::acquire`]
/// blocks while `count` has reached `max`; [`FileSlots::release`] gives one
/// slot back and wakes a single waiter.
struct FileSlots {
    /// Number of slots currently held.
    count: Mutex<usize>,
    /// Signalled by `release` every time a slot is returned.
    condvar: Condvar,
    /// Upper bound on `count`; `new` guarantees it is at least 1.
    max: usize,
}

impl FileSlots {
    /// Creates a limiter with `max` slots, none of them taken.
    ///
    /// A `max` of 0 is raised to 1: with zero slots the very first `acquire`
    /// would wait for a `release` that can never happen, hanging the caller.
    fn new(max: usize) -> Self {
        Self {
            count: Mutex::new(0),
            condvar: Condvar::new(),
            max: max.max(1),
        }
    }

    /// Blocks the calling thread until a slot is free, then takes it.
    fn acquire(&self) {
        // The protected value is a bare counter that is only ever changed by a
        // single assignment under the lock, so it is still consistent when the
        // mutex is poisoned; recover the guard instead of panicking.
        let guard = self
            .count
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let mut count = self
            .condvar
            .wait_while(guard, |taken| *taken >= self.max)
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *count += 1;
    }

    /// Returns one slot and wakes one thread blocked in `acquire`, if any.
    ///
    /// This runs from a `Drop` implementation, so it must not panic: a panic
    /// while another panic is unwinding aborts the process. Lock poisoning is
    /// therefore tolerated and the decrement saturates at zero.
    fn release(&self) {
        let mut count = self
            .count
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *count = count.saturating_sub(1);
        // Unlock before notifying so the woken thread can take the lock at once.
        drop(count);
        self.condvar.notify_one();
    }
}
/// Token standing for one occupied slot of a shared `FileSlots`.
///
/// The slot is handed back when the token is dropped.
struct SlotsGuard(Arc<FileSlots>);

impl Drop for SlotsGuard {
    /// Returns the slot to the `FileSlots` this guard points to.
    fn drop(&mut self) {
        let Self(slots) = self;
        slots.release();
    }
}
// ── Pipeline data carrier ───────────────────────────────────────────────────── // ── Pipeline data carrier ─────────────────────────────────────────────────────
/// An input path travelling through the pipeline together with an opaque token.
///
/// The token is type-erased: this struct only knows that it is `Send` and that
/// it is dropped together with the struct (or with whatever the field is moved
/// into). Items produced by `throttle_paths` store a guard whose drop releases
/// one open-file slot, so the consumer should keep `_guard` alive for as long
/// as the file is being read. Moving only `path` out and letting the rest of
/// the struct go out of scope drops the token at that point.
pub struct PathWithSlot {
    /// Location of the input file.
    pub path: PathBuf,
    /// Opaque token kept alive alongside `path`; its `Drop` does the clean-up.
    pub _guard: Box<dyn Send + 'static>,
}
pub enum PipelineData { pub enum PipelineData {
Path(PathBuf), Path(PathWithSlot),
RawChunk(Rope), RawChunk(Rope),
NormChunk(Rope), NormChunk(Rope),
Batch(Vec<RoutableSuperKmer>), Batch(Vec<RoutableSuperKmer>),
@@ -75,6 +120,23 @@ pub enum PipelineData {
unsafe impl Send for PipelineData {} unsafe impl Send for PipelineData {}
unsafe impl Sync for PipelineData {} unsafe impl Sync for PipelineData {}
/// Pairs every path of `source` with an open-file slot, limiting how many
/// paths can be "in flight" at once.
///
/// Each `next()` on the returned iterator pulls one path from `source`, then
/// blocks until one of the `max_open` slots is free and takes it. The slot is
/// given back when the `_guard` of the yielded [`PathWithSlot`] is dropped.
///
/// The wait happens in whichever thread advances the returned iterator. It is
/// meant to be the pipeline source thread, never a worker, so that workers
/// remain free to finish their files and release slots.
pub fn throttle_paths(
    source: impl Iterator<Item = PathBuf> + Send + 'static,
    max_open: usize,
) -> impl Iterator<Item = PathWithSlot> + Send + 'static {
    // One limiter shared by the closure and by every guard it hands out.
    let pool = Arc::new(FileSlots::new(max_open));
    source.map(move |path| {
        pool.acquire();
        let permit = SlotsGuard(Arc::clone(&pool));
        PathWithSlot {
            path,
            _guard: Box::new(permit),
        }
    })
}
// ── I/O plumbing ────────────────────────────────────────────────────────────── // ── I/O plumbing ──────────────────────────────────────────────────────────────
pub fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> { pub fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
+6 -6
View File
@@ -1,11 +1,10 @@
use std::io::{self, BufWriter, Write}; use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
use clap::Args; use clap::Args;
use obifastwrite::write_scatter; use obifastwrite::write_scatter;
use obikseq::{RoutableSuperKmer, set_k, set_m}; use obikseq::{RoutableSuperKmer, set_k, set_m};
use crate::cli::{CommonArgs, PipelineData, open_chunks, partitions_to_bits}; use crate::cli::{CommonArgs, PipelineData, PathWithSlot, open_chunks, partitions_to_bits, throttle_paths};
#[derive(Args)] #[derive(Args)]
pub struct SuperkmerArgs { pub struct SuperkmerArgs {
@@ -40,17 +39,18 @@ pub fn run(args: SuperkmerArgs) {
let level_max = args.common.level_max; let level_max = args.common.level_max;
let partition_bits = partitions_to_bits(args.common.partitions); let partition_bits = partitions_to_bits(args.common.partitions);
let n_workers = args.common.threads.max(1); let n_workers = args.common.threads.max(1);
let max_open = args.common.max_open_files.max(1);
set_k(k); set_k(k);
set_m(m); set_m(m);
let path_source = args.common.seqfile_paths(); let path_source = throttle_paths(args.common.seqfile_paths(), max_open);
let pipe = obipipeline::make_pipe! { let pipe = obipipeline::make_pipe! {
PipelineData : PathBuf => Vec<RoutableSuperKmer>, PipelineData : PathWithSlot => Vec<RoutableSuperKmer>,
||? { |path| open_chunks(path) } : Path => RawChunk, ||? { |pw: PathWithSlot| open_chunks(pw.path) } : Path => RawChunk,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) } : NormChunk => Batch,
}; };
let mut out = BufWriter::new(io::stdout()); let mut out = BufWriter::new(io::stdout());
+21 -42
View File
@@ -1,52 +1,30 @@
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
use obikpartitionner::KmerPartition; use obikpartitionner::KmerPartition;
use obikrope::Rope;
use obisys::{Reporter, Stage}; use obisys::{Reporter, Stage};
use tracing::info; use tracing::info;
use crate::cli::{PipelineData, open_chunks}; use crate::cli::{PipelineData, PathWithSlot, open_chunks, throttle_paths};
// ── Counting semaphore ──────────────────────────────────────────────────────── // ── Iterator that keeps the slot guard alive until the file is exhausted ──────
/// Counting semaphore built from a mutex-protected permit counter and a
/// condition variable.
struct Semaphore {
    /// Number of permits currently available.
    state: Mutex<usize>,
    /// Signalled each time a permit is returned.
    cvar: Condvar,
}

impl Semaphore {
    /// Creates a shared semaphore that starts with `n` available permits.
    fn new(n: usize) -> Arc<Self> {
        let sem = Self {
            state: Mutex::new(n),
            cvar: Condvar::new(),
        };
        Arc::new(sem)
    }

    /// Blocks until at least one permit is available, then takes it.
    fn acquire(self: &Arc<Self>) {
        let available = self.state.lock().unwrap();
        let mut permits = self
            .cvar
            .wait_while(available, |free| *free == 0)
            .unwrap();
        *permits -= 1;
    }

    /// Returns one permit and wakes a single waiting thread, if any.
    fn release(self: &Arc<Self>) {
        {
            // Scope the lock so it is released before the notification.
            let mut permits = self.state.lock().unwrap();
            *permits += 1;
        }
        self.cvar.notify_one();
    }
}
struct SemGuard(Arc<Semaphore>);
impl Drop for SemGuard {
fn drop(&mut self) { self.0.release(); }
}
/// Iterator wrapper that holds a semaphore permit for its lifetime.
/// The permit is released when the iterator is exhausted or dropped.
struct GuardedIter<I> { struct GuardedIter<I> {
inner: I, inner: I,
_guard: SemGuard, _guard: Box<dyn Send + 'static>,
} }
impl<I: Iterator> Iterator for GuardedIter<I> {
type Item = I::Item; impl<I: Iterator<Item = Rope>> Iterator for GuardedIter<I> {
fn next(&mut self) -> Option<Self::Item> { self.inner.next() } type Item = Rope;
fn next(&mut self) -> Option<Rope> {
self.inner.next()
}
} }
// ── scatter ───────────────────────────────────────────────────────────────────
/// Run scatter: normalise → build superkmers → route to partition → close. /// Run scatter: normalise → build superkmers → route to partition → close.
/// Reports the "scatter" stage to `rep`. /// Reports the "scatter" stage to `rep`.
pub fn scatter( pub fn scatter(
@@ -61,18 +39,20 @@ pub fn scatter(
) { ) {
use obikseq::RoutableSuperKmer; use obikseq::RoutableSuperKmer;
let sem = Semaphore::new(max_open); // Throttle in the source thread — never in a worker — to prevent deadlock.
let throttled = throttle_paths(path_source, max_open);
let t = Stage::start("scatter"); let t = Stage::start("scatter");
let pipe = obipipeline::make_pipe! { let pipe = obipipeline::make_pipe! {
PipelineData : PathBuf => Vec<RoutableSuperKmer>, PipelineData : PathWithSlot => Vec<RoutableSuperKmer>,
||? { move |path: PathBuf| { ||? { move |pw: PathWithSlot| {
sem.acquire(); let PathWithSlot { path, _guard } = pw;
info!("indexing: {}", path.display()); info!("indexing: {}", path.display());
open_chunks(path).map(|iter| GuardedIter { inner: iter, _guard: SemGuard(Arc::clone(&sem)) }) // _guard travels into GuardedIter; released when all chunks are read
open_chunks(path).map(|iter| GuardedIter { inner: iter, _guard })
}} : Path => RawChunk, }} : Path => RawChunk,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) } : NormChunk => Batch,
}; };
let pb = ProgressBar::new_spinner(); let pb = ProgressBar::new_spinner();
@@ -89,7 +69,7 @@ pub fn scatter(
let mut last_bases: u64 = 0; let mut last_bases: u64 = 0;
const ALPHA: f64 = 0.15; const ALPHA: f64 = 0.15;
for batch in pipe.apply(path_source, n_workers, 1) { for batch in pipe.apply(throttled, n_workers, 1) {
total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::<u64>(); total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::<u64>();
let now = Instant::now(); let now = Instant::now();
let dt = now.duration_since(last_t).as_secs_f64(); let dt = now.duration_since(last_t).as_secs_f64();
@@ -115,4 +95,3 @@ pub fn scatter(
kp.close().expect("close error"); kp.close().expect("close error");
rep.push(t.stop()); rep.push(t.stop());
} }