From 2465cfbc4b5bf0bf07c087addfcd1eee9e2712cd Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 9 Jun 2026 09:54:41 +0200 Subject: [PATCH] Parallelize partition iteration using Rayon Introduce per-partition `Vec<u8>` buffers to eliminate concurrent I/O contention. Replace the mutable row counter with an `AtomicUsize` and `fetch_update` to enable lock-free early termination when the limit is reached. Collected chunks are then written sequentially to preserve partition ordering. --- src/obikindex/src/dump.rs | 88 +++++++++++++++++++++++++-------------- 1 file changed, 57 insertions(+), 31 deletions(-) diff --git a/src/obikindex/src/dump.rs b/src/obikindex/src/dump.rs index 917dff2..604f77d 100644 --- a/src/obikindex/src/dump.rs +++ b/src/obikindex/src/dump.rs @@ -1,4 +1,7 @@ use std::io::Write; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use rayon::prelude::*; use crate::error::{OKIError, OKIResult}; use crate::index::KmerIndex; @@ -13,6 +16,9 @@ impl KmerIndex { /// `force_presence` overrides `with_counts`: even if the index stores counts, /// the output uses 0/1 presence columns. /// + /// Partitions are scanned in parallel; each partition buffers its output locally + /// before the main thread writes the chunks in partition order. + /// /// The caller must have set the global kmer length (`obikseq::set_k`) before /// calling this method. 
pub fn dump( @@ -38,37 +44,57 @@ impl KmerIndex { } writeln!(out)?; - // ── Rows ────────────────────────────────────────────────────────────── - let n = self.n_partitions(); - let mut remaining = head.unwrap_or(usize::MAX); - for i in 0..n { - if remaining == 0 { break; } - let cont = if debug { - self.partition - .iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| { - let seq = String::from_utf8(kmer.to_ascii()) - .unwrap_or_else(|_| "?".repeat(kmer_size)); - let _ = write!(out, "{part},{layer},{seq}"); - for &v in row.iter() { let _ = write!(out, ",{v}"); } - let _ = writeln!(out); - remaining -= 1; - remaining > 0 - }) - .map_err(OKIError::Partition)? - } else { - self.partition - .iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| { - let seq = String::from_utf8(kmer.to_ascii()) - .unwrap_or_else(|_| "?".repeat(kmer_size)); - let _ = write!(out, "{seq}"); - for &v in row.iter() { let _ = write!(out, ",{v}"); } - let _ = writeln!(out); - remaining -= 1; - remaining > 0 - }) - .map_err(OKIError::Partition)? 
- }; - if !cont { break; } + // ── Rows — parallel over partitions ─────────────────────────────────── + let n = self.n_partitions(); + let remaining = AtomicUsize::new(head.unwrap_or(usize::MAX)); + + let chunks: Vec>> = (0..n) + .into_par_iter() + .map(|i| { + if remaining.load(Ordering::Relaxed) == 0 { + return Ok(vec![]); + } + let mut buf = Vec::::new(); + + let write_kmer = |buf: &mut Vec, row: &[u32], prefix: &str| -> bool { + match remaining.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| { + if cur > 0 { Some(cur - 1) } else { None } + }) { + Err(_) => false, + Ok(_) => { + let _ = buf.write_all(prefix.as_bytes()); + for &v in row { let _ = write!(buf, ",{v}"); } + let _ = buf.write_all(b"\n"); + true + } + } + }; + + if debug { + self.partition + .iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| { + let seq = String::from_utf8(kmer.to_ascii()) + .unwrap_or_else(|_| "?".repeat(kmer_size)); + write_kmer(&mut buf, &row, &format!("{part},{layer},{seq}")) + }) + .map_err(OKIError::Partition)?; + } else { + self.partition + .iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| { + let seq = String::from_utf8(kmer.to_ascii()) + .unwrap_or_else(|_| "?".repeat(kmer_size)); + write_kmer(&mut buf, &row, &seq) + }) + .map_err(OKIError::Partition)?; + } + + Ok(buf) + }) + .collect(); + + // ── Sequential write ────────────────────────────────────────────────── + for chunk in chunks { + out.write_all(&chunk?)?; } out.flush()?;