refactor: optimize dump partition iteration and add progress tracking #20

Merged
coissac merged 1 commit from push-xqswlxlvmyrq into main 2026-06-09 09:34:13 +00:00
2 changed files with 51 additions and 28 deletions
+47 -27
View File
@@ -21,13 +21,14 @@ impl KmerIndex {
/// ///
/// The caller must have set the global kmer length (`obikseq::set_k`) before /// The caller must have set the global kmer length (`obikseq::set_k`) before
/// calling this method. /// calling this method.
pub fn dump<W: Write>( pub fn dump<W: Write, F: Fn() + Send + Sync>(
&self, &self,
out: &mut W, out: &mut W,
force_presence: bool, force_presence: bool,
debug: bool, debug: bool,
head: Option<usize>, head: Option<usize>,
filters: &[Box<dyn KmerFilter>], filters: &[Box<dyn KmerFilter>],
on_partition: F,
) -> OKIResult<()> { ) -> OKIResult<()> {
let genomes = &self.meta.genomes; let genomes = &self.meta.genomes;
let use_counts = self.meta.config.with_counts && !force_presence; let use_counts = self.meta.config.with_counts && !force_presence;
@@ -45,52 +46,71 @@ impl KmerIndex {
writeln!(out)?; writeln!(out)?;
// ── Rows — parallel over partitions ─────────────────────────────────── // ── Rows — parallel over partitions ───────────────────────────────────
let n = self.n_partitions(); let n = self.n_partitions();
let remaining = AtomicUsize::new(head.unwrap_or(usize::MAX));
let chunks: Vec<OKIResult<Vec<u8>>> = (0..n) let write_row = |buf: &mut Vec<u8>, row: &[u32], prefix: &str| {
.into_par_iter() let _ = buf.write_all(prefix.as_bytes());
.map(|i| { for &v in row { let _ = write!(buf, ",{v}"); }
if remaining.load(Ordering::Relaxed) == 0 { let _ = buf.write_all(b"\n");
return Ok(vec![]); };
}
let chunks: Vec<OKIResult<Vec<u8>>> = if let Some(limit) = head {
// ── Bounded: atomic counter, early exit when limit reached ────────
let remaining = AtomicUsize::new(limit);
(0..n).into_par_iter().map(|i| {
if remaining.load(Ordering::Relaxed) == 0 { return Ok(vec![]); }
let mut buf = Vec::<u8>::new(); let mut buf = Vec::<u8>::new();
let try_write = |buf: &mut Vec<u8>, row: &[u32], prefix: &str| -> bool {
let write_kmer = |buf: &mut Vec<u8>, row: &[u32], prefix: &str| -> bool {
match remaining.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| { match remaining.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
if cur > 0 { Some(cur - 1) } else { None } if cur > 0 { Some(cur - 1) } else { None }
}) { }) {
Err(_) => false, Err(_) => false,
Ok(_) => { Ok(_) => { write_row(buf, row, prefix); true }
let _ = buf.write_all(prefix.as_bytes());
for &v in row { let _ = write!(buf, ",{v}"); }
let _ = buf.write_all(b"\n");
true
}
} }
}; };
if debug { if debug {
self.partition self.partition
.iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| { .iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| {
let seq = String::from_utf8(kmer.to_ascii()) let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size));
.unwrap_or_else(|_| "?".repeat(kmer_size)); try_write(&mut buf, &row, &format!("{part},{layer},{seq}"))
write_kmer(&mut buf, &row, &format!("{part},{layer},{seq}"))
}) })
.map_err(OKIError::Partition)?; .map_err(OKIError::Partition)?;
} else { } else {
self.partition self.partition
.iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| { .iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| {
let seq = String::from_utf8(kmer.to_ascii()) let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size));
.unwrap_or_else(|_| "?".repeat(kmer_size)); try_write(&mut buf, &row, &seq)
write_kmer(&mut buf, &row, &seq)
}) })
.map_err(OKIError::Partition)?; .map_err(OKIError::Partition)?;
} }
on_partition();
Ok(buf) Ok(buf)
}) }).collect()
.collect(); } else {
// ── Unbounded: no atomic, no contention ───────────────────────────
(0..n).into_par_iter().map(|i| {
let mut buf = Vec::<u8>::new();
if debug {
self.partition
.iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| {
let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size));
write_row(&mut buf, &row, &format!("{part},{layer},{seq}"));
true
})
.map_err(OKIError::Partition)?;
} else {
self.partition
.iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| {
let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size));
write_row(&mut buf, &row, &seq);
true
})
.map_err(OKIError::Partition)?;
}
on_partition();
Ok(buf)
}).collect()
};
// ── Sequential write ────────────────────────────────────────────────── // ── Sequential write ──────────────────────────────────────────────────
for chunk in chunks { for chunk in chunks {
+4 -1
View File
@@ -3,6 +3,7 @@ use std::path::PathBuf;
use clap::Args; use clap::Args;
use obikindex::KmerIndex; use obikindex::KmerIndex;
use obisys::progress_bar;
use tracing::info; use tracing::info;
use super::predicate::FilterArgs; use super::predicate::FilterArgs;
@@ -41,12 +42,14 @@ pub fn run(args: DumpArgs) {
); );
let filters = args.filter.build_filters(&idx.meta().genomes); let filters = args.filter.build_filters(&idx.meta().genomes);
let pb = progress_bar("dump", idx.n_partitions() as u64, "partitions");
let stdout = io::stdout(); let stdout = io::stdout();
let mut out = BufWriter::new(stdout.lock()); let mut out = BufWriter::new(stdout.lock());
idx.dump(&mut out, args.force_presence, args.debug, args.head, &filters).unwrap_or_else(|e| { idx.dump(&mut out, args.force_presence, args.debug, args.head, &filters, || pb.inc(1)).unwrap_or_else(|e| {
eprintln!("dump error: {e}"); eprintln!("dump error: {e}");
std::process::exit(1); std::process::exit(1);
}); });
pb.finish_and_clear();
} }