Merge pull request 'refactor: optimize dump partition iteration and add progress tracking' (#20) from push-xqswlxlvmyrq into main
Reviewed-on: #20
This commit was merged in pull request #20.
This commit is contained in:
+47
-27
@@ -21,13 +21,14 @@ impl KmerIndex {
|
|||||||
///
|
///
|
||||||
/// The caller must have set the global kmer length (`obikseq::set_k`) before
|
/// The caller must have set the global kmer length (`obikseq::set_k`) before
|
||||||
/// calling this method.
|
/// calling this method.
|
||||||
pub fn dump<W: Write>(
|
pub fn dump<W: Write, F: Fn() + Send + Sync>(
|
||||||
&self,
|
&self,
|
||||||
out: &mut W,
|
out: &mut W,
|
||||||
force_presence: bool,
|
force_presence: bool,
|
||||||
debug: bool,
|
debug: bool,
|
||||||
head: Option<usize>,
|
head: Option<usize>,
|
||||||
filters: &[Box<dyn KmerFilter>],
|
filters: &[Box<dyn KmerFilter>],
|
||||||
|
on_partition: F,
|
||||||
) -> OKIResult<()> {
|
) -> OKIResult<()> {
|
||||||
let genomes = &self.meta.genomes;
|
let genomes = &self.meta.genomes;
|
||||||
let use_counts = self.meta.config.with_counts && !force_presence;
|
let use_counts = self.meta.config.with_counts && !force_presence;
|
||||||
@@ -45,52 +46,71 @@ impl KmerIndex {
|
|||||||
writeln!(out)?;
|
writeln!(out)?;
|
||||||
|
|
||||||
// ── Rows — parallel over partitions ───────────────────────────────────
|
// ── Rows — parallel over partitions ───────────────────────────────────
|
||||||
let n = self.n_partitions();
|
let n = self.n_partitions();
|
||||||
let remaining = AtomicUsize::new(head.unwrap_or(usize::MAX));
|
|
||||||
|
|
||||||
let chunks: Vec<OKIResult<Vec<u8>>> = (0..n)
|
let write_row = |buf: &mut Vec<u8>, row: &[u32], prefix: &str| {
|
||||||
.into_par_iter()
|
let _ = buf.write_all(prefix.as_bytes());
|
||||||
.map(|i| {
|
for &v in row { let _ = write!(buf, ",{v}"); }
|
||||||
if remaining.load(Ordering::Relaxed) == 0 {
|
let _ = buf.write_all(b"\n");
|
||||||
return Ok(vec![]);
|
};
|
||||||
}
|
|
||||||
|
let chunks: Vec<OKIResult<Vec<u8>>> = if let Some(limit) = head {
|
||||||
|
// ── Bounded: atomic counter, early exit when limit reached ────────
|
||||||
|
let remaining = AtomicUsize::new(limit);
|
||||||
|
(0..n).into_par_iter().map(|i| {
|
||||||
|
if remaining.load(Ordering::Relaxed) == 0 { return Ok(vec![]); }
|
||||||
let mut buf = Vec::<u8>::new();
|
let mut buf = Vec::<u8>::new();
|
||||||
|
let try_write = |buf: &mut Vec<u8>, row: &[u32], prefix: &str| -> bool {
|
||||||
let write_kmer = |buf: &mut Vec<u8>, row: &[u32], prefix: &str| -> bool {
|
|
||||||
match remaining.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
|
match remaining.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
|
||||||
if cur > 0 { Some(cur - 1) } else { None }
|
if cur > 0 { Some(cur - 1) } else { None }
|
||||||
}) {
|
}) {
|
||||||
Err(_) => false,
|
Err(_) => false,
|
||||||
Ok(_) => {
|
Ok(_) => { write_row(buf, row, prefix); true }
|
||||||
let _ = buf.write_all(prefix.as_bytes());
|
|
||||||
for &v in row { let _ = write!(buf, ",{v}"); }
|
|
||||||
let _ = buf.write_all(b"\n");
|
|
||||||
true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if debug {
|
if debug {
|
||||||
self.partition
|
self.partition
|
||||||
.iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| {
|
.iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| {
|
||||||
let seq = String::from_utf8(kmer.to_ascii())
|
let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size));
|
||||||
.unwrap_or_else(|_| "?".repeat(kmer_size));
|
try_write(&mut buf, &row, &format!("{part},{layer},{seq}"))
|
||||||
write_kmer(&mut buf, &row, &format!("{part},{layer},{seq}"))
|
|
||||||
})
|
})
|
||||||
.map_err(OKIError::Partition)?;
|
.map_err(OKIError::Partition)?;
|
||||||
} else {
|
} else {
|
||||||
self.partition
|
self.partition
|
||||||
.iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| {
|
.iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| {
|
||||||
let seq = String::from_utf8(kmer.to_ascii())
|
let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size));
|
||||||
.unwrap_or_else(|_| "?".repeat(kmer_size));
|
try_write(&mut buf, &row, &seq)
|
||||||
write_kmer(&mut buf, &row, &seq)
|
|
||||||
})
|
})
|
||||||
.map_err(OKIError::Partition)?;
|
.map_err(OKIError::Partition)?;
|
||||||
}
|
}
|
||||||
|
on_partition();
|
||||||
Ok(buf)
|
Ok(buf)
|
||||||
})
|
}).collect()
|
||||||
.collect();
|
} else {
|
||||||
|
// ── Unbounded: no atomic, no contention ───────────────────────────
|
||||||
|
(0..n).into_par_iter().map(|i| {
|
||||||
|
let mut buf = Vec::<u8>::new();
|
||||||
|
if debug {
|
||||||
|
self.partition
|
||||||
|
.iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| {
|
||||||
|
let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size));
|
||||||
|
write_row(&mut buf, &row, &format!("{part},{layer},{seq}"));
|
||||||
|
true
|
||||||
|
})
|
||||||
|
.map_err(OKIError::Partition)?;
|
||||||
|
} else {
|
||||||
|
self.partition
|
||||||
|
.iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| {
|
||||||
|
let seq = String::from_utf8(kmer.to_ascii()).unwrap_or_else(|_| "?".repeat(kmer_size));
|
||||||
|
write_row(&mut buf, &row, &seq);
|
||||||
|
true
|
||||||
|
})
|
||||||
|
.map_err(OKIError::Partition)?;
|
||||||
|
}
|
||||||
|
on_partition();
|
||||||
|
Ok(buf)
|
||||||
|
}).collect()
|
||||||
|
};
|
||||||
|
|
||||||
// ── Sequential write ──────────────────────────────────────────────────
|
// ── Sequential write ──────────────────────────────────────────────────
|
||||||
for chunk in chunks {
|
for chunk in chunks {
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ use std::path::PathBuf;
|
|||||||
|
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use obikindex::KmerIndex;
|
use obikindex::KmerIndex;
|
||||||
|
use obisys::progress_bar;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use super::predicate::FilterArgs;
|
use super::predicate::FilterArgs;
|
||||||
@@ -41,12 +42,14 @@ pub fn run(args: DumpArgs) {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let filters = args.filter.build_filters(&idx.meta().genomes);
|
let filters = args.filter.build_filters(&idx.meta().genomes);
|
||||||
|
let pb = progress_bar("dump", idx.n_partitions() as u64, "partitions");
|
||||||
|
|
||||||
let stdout = io::stdout();
|
let stdout = io::stdout();
|
||||||
let mut out = BufWriter::new(stdout.lock());
|
let mut out = BufWriter::new(stdout.lock());
|
||||||
|
|
||||||
idx.dump(&mut out, args.force_presence, args.debug, args.head, &filters).unwrap_or_else(|e| {
|
idx.dump(&mut out, args.force_presence, args.debug, args.head, &filters, || pb.inc(1)).unwrap_or_else(|e| {
|
||||||
eprintln!("dump error: {e}");
|
eprintln!("dump error: {e}");
|
||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
pb.finish_and_clear();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user