Merge pull request 'feat: Implement RAII-based file handle throttling' (#7) from push-qtnvlqlooklx into main
Reviewed-on: #7
This commit was merged in pull request #7.
This commit is contained in:
+64
-2
@@ -1,5 +1,6 @@
|
|||||||
use std::io;
|
use std::io;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::sync::{Arc, Condvar, Mutex};
|
||||||
|
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use obikrope::Rope;
|
use obikrope::Rope;
|
||||||
@@ -43,7 +44,7 @@ pub struct CommonArgs {
|
|||||||
)]
|
)]
|
||||||
pub threads: usize,
|
pub threads: usize,
|
||||||
|
|
||||||
/// Maximum number of input files open simultaneously during scatter
|
/// Maximum number of input files open simultaneously
|
||||||
#[arg(long, default_value_t = 20)]
|
#[arg(long, default_value_t = 20)]
|
||||||
pub max_open_files: usize,
|
pub max_open_files: usize,
|
||||||
}
|
}
|
||||||
@@ -61,10 +62,54 @@ impl CommonArgs {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Open-file throttling ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
struct FileSlots {
|
||||||
|
count: Mutex<usize>,
|
||||||
|
condvar: Condvar,
|
||||||
|
max: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FileSlots {
|
||||||
|
fn new(max: usize) -> Self {
|
||||||
|
Self { count: Mutex::new(0), condvar: Condvar::new(), max }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn acquire(&self) {
|
||||||
|
let mut count = self.count.lock().unwrap();
|
||||||
|
while *count >= self.max {
|
||||||
|
count = self.condvar.wait(count).unwrap();
|
||||||
|
}
|
||||||
|
*count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn release(&self) {
|
||||||
|
let mut count = self.count.lock().unwrap();
|
||||||
|
*count -= 1;
|
||||||
|
self.condvar.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SlotsGuard(Arc<FileSlots>);
|
||||||
|
|
||||||
|
impl Drop for SlotsGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.0.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── Pipeline data carrier ─────────────────────────────────────────────────────
|
// ── Pipeline data carrier ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// A path bundled with an opaque guard token.
|
||||||
|
/// The guard is acquired in the source thread and dropped by the flat worker
|
||||||
|
/// once the file is fully read, releasing the open-file slot.
|
||||||
|
pub struct PathWithSlot {
|
||||||
|
pub path: PathBuf,
|
||||||
|
pub _guard: Box<dyn Send + 'static>,
|
||||||
|
}
|
||||||
|
|
||||||
pub enum PipelineData {
|
pub enum PipelineData {
|
||||||
Path(PathBuf),
|
Path(PathWithSlot),
|
||||||
RawChunk(Rope),
|
RawChunk(Rope),
|
||||||
NormChunk(Rope),
|
NormChunk(Rope),
|
||||||
Batch(Vec<RoutableSuperKmer>),
|
Batch(Vec<RoutableSuperKmer>),
|
||||||
@@ -75,6 +120,23 @@ pub enum PipelineData {
|
|||||||
unsafe impl Send for PipelineData {}
|
unsafe impl Send for PipelineData {}
|
||||||
unsafe impl Sync for PipelineData {}
|
unsafe impl Sync for PipelineData {}
|
||||||
|
|
||||||
|
/// Wrap a path iterator so that at most `max_open` files are open simultaneously.
|
||||||
|
/// Acquisition happens in the caller's thread (the pipeline source thread),
|
||||||
|
/// never inside a worker, preventing deadlocks.
|
||||||
|
pub fn throttle_paths(
|
||||||
|
source: impl Iterator<Item = PathBuf> + Send + 'static,
|
||||||
|
max_open: usize,
|
||||||
|
) -> impl Iterator<Item = PathWithSlot> + Send + 'static {
|
||||||
|
let slots = Arc::new(FileSlots::new(max_open));
|
||||||
|
source.map(move |path| {
|
||||||
|
slots.acquire();
|
||||||
|
PathWithSlot {
|
||||||
|
path,
|
||||||
|
_guard: Box::new(SlotsGuard(Arc::clone(&slots))),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// ── I/O plumbing ──────────────────────────────────────────────────────────────
|
// ── I/O plumbing ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
pub fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
|
pub fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
|
||||||
|
|||||||
@@ -1,11 +1,10 @@
|
|||||||
use std::io::{self, BufWriter, Write};
|
use std::io::{self, BufWriter, Write};
|
||||||
use std::path::PathBuf;
|
|
||||||
|
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use obifastwrite::write_scatter;
|
use obifastwrite::write_scatter;
|
||||||
use obikseq::{RoutableSuperKmer, set_k, set_m};
|
use obikseq::{RoutableSuperKmer, set_k, set_m};
|
||||||
|
|
||||||
use crate::cli::{CommonArgs, PipelineData, open_chunks, partitions_to_bits};
|
use crate::cli::{CommonArgs, PipelineData, PathWithSlot, open_chunks, partitions_to_bits, throttle_paths};
|
||||||
|
|
||||||
#[derive(Args)]
|
#[derive(Args)]
|
||||||
pub struct SuperkmerArgs {
|
pub struct SuperkmerArgs {
|
||||||
@@ -40,17 +39,18 @@ pub fn run(args: SuperkmerArgs) {
|
|||||||
let level_max = args.common.level_max;
|
let level_max = args.common.level_max;
|
||||||
let partition_bits = partitions_to_bits(args.common.partitions);
|
let partition_bits = partitions_to_bits(args.common.partitions);
|
||||||
let n_workers = args.common.threads.max(1);
|
let n_workers = args.common.threads.max(1);
|
||||||
|
let max_open = args.common.max_open_files.max(1);
|
||||||
|
|
||||||
set_k(k);
|
set_k(k);
|
||||||
set_m(m);
|
set_m(m);
|
||||||
|
|
||||||
let path_source = args.common.seqfile_paths();
|
let path_source = throttle_paths(args.common.seqfile_paths(), max_open);
|
||||||
|
|
||||||
let pipe = obipipeline::make_pipe! {
|
let pipe = obipipeline::make_pipe! {
|
||||||
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
PipelineData : PathWithSlot => Vec<RoutableSuperKmer>,
|
||||||
||? { |path| open_chunks(path) } : Path => RawChunk,
|
||? { |pw: PathWithSlot| open_chunks(pw.path) } : Path => RawChunk,
|
||||||
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
||||||
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
|
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) } : NormChunk => Batch,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut out = BufWriter::new(io::stdout());
|
let mut out = BufWriter::new(io::stdout());
|
||||||
|
|||||||
@@ -1,52 +1,30 @@
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::{Arc, Condvar, Mutex};
|
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use indicatif::{ProgressBar, ProgressStyle};
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
use obikpartitionner::KmerPartition;
|
use obikpartitionner::KmerPartition;
|
||||||
|
use obikrope::Rope;
|
||||||
use obisys::{Reporter, Stage};
|
use obisys::{Reporter, Stage};
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use crate::cli::{PipelineData, open_chunks};
|
use crate::cli::{PipelineData, PathWithSlot, open_chunks, throttle_paths};
|
||||||
|
|
||||||
// ── Counting semaphore ────────────────────────────────────────────────────────
|
// ── Iterator that keeps the slot guard alive until the file is exhausted ──────
|
||||||
|
|
||||||
struct Semaphore {
|
|
||||||
state: Mutex<usize>,
|
|
||||||
cvar: Condvar,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Semaphore {
|
|
||||||
fn new(n: usize) -> Arc<Self> {
|
|
||||||
Arc::new(Self { state: Mutex::new(n), cvar: Condvar::new() })
|
|
||||||
}
|
|
||||||
fn acquire(self: &Arc<Self>) {
|
|
||||||
let mut n = self.state.lock().unwrap();
|
|
||||||
while *n == 0 { n = self.cvar.wait(n).unwrap(); }
|
|
||||||
*n -= 1;
|
|
||||||
}
|
|
||||||
fn release(self: &Arc<Self>) {
|
|
||||||
*self.state.lock().unwrap() += 1;
|
|
||||||
self.cvar.notify_one();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SemGuard(Arc<Semaphore>);
|
|
||||||
impl Drop for SemGuard {
|
|
||||||
fn drop(&mut self) { self.0.release(); }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Iterator wrapper that holds a semaphore permit for its lifetime.
|
|
||||||
/// The permit is released when the iterator is exhausted or dropped.
|
|
||||||
struct GuardedIter<I> {
|
struct GuardedIter<I> {
|
||||||
inner: I,
|
inner: I,
|
||||||
_guard: SemGuard,
|
_guard: Box<dyn Send + 'static>,
|
||||||
}
|
}
|
||||||
impl<I: Iterator> Iterator for GuardedIter<I> {
|
|
||||||
type Item = I::Item;
|
impl<I: Iterator<Item = Rope>> Iterator for GuardedIter<I> {
|
||||||
fn next(&mut self) -> Option<Self::Item> { self.inner.next() }
|
type Item = Rope;
|
||||||
|
fn next(&mut self) -> Option<Rope> {
|
||||||
|
self.inner.next()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── scatter ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
/// Run scatter: normalise → build superkmers → route to partition → close.
|
/// Run scatter: normalise → build superkmers → route to partition → close.
|
||||||
/// Reports the "scatter" stage to `rep`.
|
/// Reports the "scatter" stage to `rep`.
|
||||||
pub fn scatter(
|
pub fn scatter(
|
||||||
@@ -61,18 +39,20 @@ pub fn scatter(
|
|||||||
) {
|
) {
|
||||||
use obikseq::RoutableSuperKmer;
|
use obikseq::RoutableSuperKmer;
|
||||||
|
|
||||||
let sem = Semaphore::new(max_open);
|
// Throttle in the source thread — never in a worker — to prevent deadlock.
|
||||||
|
let throttled = throttle_paths(path_source, max_open);
|
||||||
|
|
||||||
let t = Stage::start("scatter");
|
let t = Stage::start("scatter");
|
||||||
let pipe = obipipeline::make_pipe! {
|
let pipe = obipipeline::make_pipe! {
|
||||||
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
PipelineData : PathWithSlot => Vec<RoutableSuperKmer>,
|
||||||
||? { move |path: PathBuf| {
|
||? { move |pw: PathWithSlot| {
|
||||||
sem.acquire();
|
let PathWithSlot { path, _guard } = pw;
|
||||||
info!("indexing: {}", path.display());
|
info!("indexing: {}", path.display());
|
||||||
open_chunks(path).map(|iter| GuardedIter { inner: iter, _guard: SemGuard(Arc::clone(&sem)) })
|
// _guard travels into GuardedIter; released when all chunks are read
|
||||||
|
open_chunks(path).map(|iter| GuardedIter { inner: iter, _guard })
|
||||||
}} : Path => RawChunk,
|
}} : Path => RawChunk,
|
||||||
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
||||||
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
|
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) } : NormChunk => Batch,
|
||||||
};
|
};
|
||||||
|
|
||||||
let pb = ProgressBar::new_spinner();
|
let pb = ProgressBar::new_spinner();
|
||||||
@@ -89,7 +69,7 @@ pub fn scatter(
|
|||||||
let mut last_bases: u64 = 0;
|
let mut last_bases: u64 = 0;
|
||||||
const ALPHA: f64 = 0.15;
|
const ALPHA: f64 = 0.15;
|
||||||
|
|
||||||
for batch in pipe.apply(path_source, n_workers, 1) {
|
for batch in pipe.apply(throttled, n_workers, 1) {
|
||||||
total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::<u64>();
|
total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::<u64>();
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let dt = now.duration_since(last_t).as_secs_f64();
|
let dt = now.duration_since(last_t).as_secs_f64();
|
||||||
@@ -115,4 +95,3 @@ pub fn scatter(
|
|||||||
kp.close().expect("close error");
|
kp.close().expect("close error");
|
||||||
rep.push(t.stop());
|
rep.push(t.stop());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user