Parallelize partition iteration using Rayon
Introduce thread-local `Vec<u8>` buffers to eliminate concurrent I/O contention. Replace the mutable row counter with an `AtomicUsize` and `fetch_update` to enable lock-free early termination when the limit is reached. Collected chunks are then written sequentially to preserve partition ordering.
This commit is contained in:
+45
-19
@@ -1,4 +1,7 @@
|
|||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
|
||||||
|
use rayon::prelude::*;
|
||||||
|
|
||||||
use crate::error::{OKIError, OKIResult};
|
use crate::error::{OKIError, OKIResult};
|
||||||
use crate::index::KmerIndex;
|
use crate::index::KmerIndex;
|
||||||
@@ -13,6 +16,9 @@ impl KmerIndex {
|
|||||||
/// `force_presence` overrides `with_counts`: even if the index stores counts,
|
/// `force_presence` overrides `with_counts`: even if the index stores counts,
|
||||||
/// the output uses 0/1 presence columns.
|
/// the output uses 0/1 presence columns.
|
||||||
///
|
///
|
||||||
|
/// Partitions are scanned in parallel; each partition buffers its output locally
|
||||||
|
/// before the main thread writes the chunks in partition order.
|
||||||
|
///
|
||||||
/// The caller must have set the global kmer length (`obikseq::set_k`) before
|
/// The caller must have set the global kmer length (`obikseq::set_k`) before
|
||||||
/// calling this method.
|
/// calling this method.
|
||||||
pub fn dump<W: Write>(
|
pub fn dump<W: Write>(
|
||||||
@@ -38,37 +44,57 @@ impl KmerIndex {
|
|||||||
}
|
}
|
||||||
writeln!(out)?;
|
writeln!(out)?;
|
||||||
|
|
||||||
// ── Rows ──────────────────────────────────────────────────────────────
|
// ── Rows — parallel over partitions ───────────────────────────────────
|
||||||
let n = self.n_partitions();
|
let n = self.n_partitions();
|
||||||
let mut remaining = head.unwrap_or(usize::MAX);
|
let remaining = AtomicUsize::new(head.unwrap_or(usize::MAX));
|
||||||
for i in 0..n {
|
|
||||||
if remaining == 0 { break; }
|
let chunks: Vec<OKIResult<Vec<u8>>> = (0..n)
|
||||||
let cont = if debug {
|
.into_par_iter()
|
||||||
|
.map(|i| {
|
||||||
|
if remaining.load(Ordering::Relaxed) == 0 {
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
let mut buf = Vec::<u8>::new();
|
||||||
|
|
||||||
|
let write_kmer = |buf: &mut Vec<u8>, row: &[u32], prefix: &str| -> bool {
|
||||||
|
match remaining.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
|
||||||
|
if cur > 0 { Some(cur - 1) } else { None }
|
||||||
|
}) {
|
||||||
|
Err(_) => false,
|
||||||
|
Ok(_) => {
|
||||||
|
let _ = buf.write_all(prefix.as_bytes());
|
||||||
|
for &v in row { let _ = write!(buf, ",{v}"); }
|
||||||
|
let _ = buf.write_all(b"\n");
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if debug {
|
||||||
self.partition
|
self.partition
|
||||||
.iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| {
|
.iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| {
|
||||||
let seq = String::from_utf8(kmer.to_ascii())
|
let seq = String::from_utf8(kmer.to_ascii())
|
||||||
.unwrap_or_else(|_| "?".repeat(kmer_size));
|
.unwrap_or_else(|_| "?".repeat(kmer_size));
|
||||||
let _ = write!(out, "{part},{layer},{seq}");
|
write_kmer(&mut buf, &row, &format!("{part},{layer},{seq}"))
|
||||||
for &v in row.iter() { let _ = write!(out, ",{v}"); }
|
|
||||||
let _ = writeln!(out);
|
|
||||||
remaining -= 1;
|
|
||||||
remaining > 0
|
|
||||||
})
|
})
|
||||||
.map_err(OKIError::Partition)?
|
.map_err(OKIError::Partition)?;
|
||||||
} else {
|
} else {
|
||||||
self.partition
|
self.partition
|
||||||
.iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| {
|
.iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| {
|
||||||
let seq = String::from_utf8(kmer.to_ascii())
|
let seq = String::from_utf8(kmer.to_ascii())
|
||||||
.unwrap_or_else(|_| "?".repeat(kmer_size));
|
.unwrap_or_else(|_| "?".repeat(kmer_size));
|
||||||
let _ = write!(out, "{seq}");
|
write_kmer(&mut buf, &row, &seq)
|
||||||
for &v in row.iter() { let _ = write!(out, ",{v}"); }
|
|
||||||
let _ = writeln!(out);
|
|
||||||
remaining -= 1;
|
|
||||||
remaining > 0
|
|
||||||
})
|
})
|
||||||
.map_err(OKIError::Partition)?
|
.map_err(OKIError::Partition)?;
|
||||||
};
|
}
|
||||||
if !cont { break; }
|
|
||||||
|
Ok(buf)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// ── Sequential write ──────────────────────────────────────────────────
|
||||||
|
for chunk in chunks {
|
||||||
|
out.write_all(&chunk?)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
out.flush()?;
|
out.flush()?;
|
||||||
|
|||||||
Reference in New Issue
Block a user