From db730e9cf60603cad46c78e6db076e9d978ac8e0 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 9 Jun 2026 11:04:49 +0200 Subject: [PATCH] refactor: optimize dump partition iteration and add progress tracking Refactor partition iteration to support a generic `on_partition` callback executed after each parallel partition completes. Split the logic into bounded and unbounded paths; the bounded path uses an `AtomicUsize` to enforce row limits, while the unbounded path eliminates atomic contention to improve throughput. Additionally, integrate a progress bar into the dump command by passing an increment callback to `idx.dump()`, ensuring proper initialization and cleanup. --- src/obikindex/src/dump.rs | 74 +++++++++++++++++++++++-------------- src/obikmer/src/cmd/dump.rs | 5 ++- 2 files changed, 51 insertions(+), 28 deletions(-) diff --git a/src/obikindex/src/dump.rs b/src/obikindex/src/dump.rs index 604f77d..756243f 100644 --- a/src/obikindex/src/dump.rs +++ b/src/obikindex/src/dump.rs @@ -21,13 +21,14 @@ impl KmerIndex { /// /// The caller must have set the global kmer length (`obikseq::set_k`) before /// calling this method. - pub fn dump( + pub fn dump( &self, out: &mut W, force_presence: bool, debug: bool, head: Option, filters: &[Box], + on_partition: F, ) -> OKIResult<()> { let genomes = &self.meta.genomes; let use_counts = self.meta.config.with_counts && !force_presence; @@ -45,52 +46,71 @@ impl KmerIndex { writeln!(out)?; // ── Rows — parallel over partitions ─────────────────────────────────── - let n = self.n_partitions(); - let remaining = AtomicUsize::new(head.unwrap_or(usize::MAX)); + let n = self.n_partitions(); - let chunks: Vec>> = (0..n) - .into_par_iter() - .map(|i| { - if remaining.load(Ordering::Relaxed) == 0 { - return Ok(vec![]); - } + let write_row = |buf: &mut Vec, row: &[u32], prefix: &str| { + let _ = buf.write_all(prefix.as_bytes()); + for &v in row { let _ = write!(buf, ",{v}"); } + let _ = buf.write_all(b"\n"); + }; + + let chunks: Vec>> = if let Some(limit) = head { + // ── Bounded: atomic counter, early exit when limit reached ──────── + let remaining = AtomicUsize::new(limit); + (0..n).into_par_iter().map(|i| { + if remaining.load(Ordering::Relaxed) == 0 { return Ok(vec![]); } let mut buf = Vec::::new(); - - let write_kmer = |buf: &mut Vec, row: &[u32], prefix: &str| -> bool { + let try_write = |buf: &mut Vec, row: &[u32], prefix: &str| -> bool { match remaining.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| { if cur > 0 { Some(cur - 1) } else { None } }) { Err(_) => false, - Ok(_) => { - let _ = buf.write_all(prefix.as_bytes()); - for &v in row { let _ = write!(buf, ",{v}"); } - let _ = buf.write_all(b"\n"); - true - } + Ok(_) => { write_row(buf, row, prefix); true } } }; - if debug { self.partition .iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| { - let seq = String::from_utf8(kmer.to_ascii()) - .unwrap_or_else(|_| "?".repeat(kmer_size)); - write_kmer(&mut buf, &row, &format!("{part},{layer},{seq}")) + let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size)); + try_write(&mut buf, &row, &format!("{part},{layer},{seq}")) }) .map_err(OKIError::Partition)?; } else { self.partition .iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| { - let seq = String::from_utf8(kmer.to_ascii()) - .unwrap_or_else(|_| "?".repeat(kmer_size)); - write_kmer(&mut buf, &row, &seq) + let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size)); + try_write(&mut buf, &row, &seq) }) .map_err(OKIError::Partition)?; } - + on_partition(); Ok(buf) - }) - .collect(); + }).collect() + } else { + // ── Unbounded: no atomic, no contention ─────────────────────────── + (0..n).into_par_iter().map(|i| { + let mut buf = Vec::::new(); + if debug { + self.partition + .iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| { + let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size)); + write_row(&mut buf, &row, &format!("{part},{layer},{seq}")); + true + }) + .map_err(OKIError::Partition)?; + } else { + self.partition + .iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| { + let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size)); + write_row(&mut buf, &row, &seq); + true + }) + .map_err(OKIError::Partition)?; + } + on_partition(); + Ok(buf) + }).collect() + }; // ── Sequential write ────────────────────────────────────────────────── for chunk in chunks { diff --git a/src/obikmer/src/cmd/dump.rs b/src/obikmer/src/cmd/dump.rs index 437d7c3..685a151 100644 --- a/src/obikmer/src/cmd/dump.rs +++ b/src/obikmer/src/cmd/dump.rs @@ -3,6 +3,7 @@ use std::path::PathBuf; use clap::Args; use obikindex::KmerIndex; +use obisys::progress_bar; use tracing::info; use super::predicate::FilterArgs; @@ -41,12 +42,14 @@ pub fn run(args: DumpArgs) { ); let filters = args.filter.build_filters(&idx.meta().genomes); + let pb = progress_bar("dump", idx.n_partitions() as u64, "partitions"); let stdout = io::stdout(); let mut out = BufWriter::new(stdout.lock()); - idx.dump(&mut out, args.force_presence, args.debug, args.head, &filters).unwrap_or_else(|e| { + idx.dump(&mut out, args.force_presence, args.debug, args.head, &filters, || pb.inc(1)).unwrap_or_else(|e| { eprintln!("dump error: {e}"); std::process::exit(1); }); + pb.finish_and_clear(); } -- 2.52.0