refactor: parallelize merge and partition logic with obipipeline

Introduce the `obipipeline` dependency and refactor merge and partition logic to leverage parallel execution. Update `merge_partitions` to use rayon with dynamic memory budgeting and concurrency control via a pilot run. Refactor Pass 1 to concurrently read unitigs, filter kmers through a shared `LayeredMap`, and populate the graph safely. Simplify diagnostics to report total kmer counts and replace manual flags with graph length validation.
This commit is contained in:
Eric Coissac
2026-06-12 20:54:44 +02:00
parent 2bc189e962
commit ba49af6f9e
4 changed files with 134 additions and 187 deletions
+2 -1
View File
@@ -28,4 +28,5 @@ memmap2 = "0.9.10"
obicompactvec = { path = "../obicompactvec" }
ptr_hash = "1.1"
indicatif = "0.17"
obisys = { path = "../obisys" }
obisys = { path = "../obisys" }
obipipeline = { path = "../obipipeline" }
+81 -12
View File
@@ -1,6 +1,9 @@
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use obipipeline::{Pipeline, WorkerPool, make_flat_transform, make_sink, make_source, make_transform};
use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
@@ -173,7 +176,7 @@ impl KmerPartition {
}
load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open
let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?;
let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?);
let n_dst_layers = dst_map.n_layers();
let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum();
@@ -187,29 +190,95 @@ impl KmerPartition {
}
}
// ── Pass 1: classify kmers, build new-kmer de Bruijn graph ──────────
let mut g = GraphDeBruijn::new();
let mut any_new = false;
// ── Pass 1: pipeline — parallel file read + dst_map filter + graph fill
//
// Source : list of unitigs.bin paths (one per source × layer)
// Flat : open file, emit Vec<CanonicalKmer> batches (BeeGFS parallel I/O)
// Transform: filter via dst_map.query() — thread-safe, LayeredMap<()>: Sync
// Sink : push new kmers into the shared GraphDeBruijn (held in a Mutex, locked once per batch)
// Collect file paths (propagates load_meta errors before the pipeline starts)
let mut unitig_paths: Vec<PathBuf> = Vec::new();
for (src, _) in sources.iter() {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
if !src_index_dir.exists() {
continue;
}
let src_meta = load_meta(&src_index_dir)?;
for l in 0..src_meta.n_layers {
let unitigs_path = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin");
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if dst_map.query(kmer).is_none() {
g.push(kmer);
any_new = true;
}
let p = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin");
if p.exists() {
unitig_paths.push(p);
}
}
}
/// Message type carried between the stages of the Pass 1 pipeline.
///
/// Each variant is the payload handed from one stage to the next, as wired by
/// the `make_source!` / `make_flat_transform!` / `make_transform!` /
/// `make_sink!` invocations that follow.
enum Pass1Data {
/// Path of one `unitigs.bin` file, emitted by the source from `unitig_paths`.
File(PathBuf),
/// Chunk of at most `BATCH` kmers read from a single file, not yet filtered.
Batch(Vec<CanonicalKmer>),
/// Kmers of a batch for which `dst_filter.query` returned `None`; the sink
/// pushes these into the graph.
NewKmers(Vec<CanonicalKmer>),
}
const BATCH: usize = 4096;
let n_workers = std::thread::available_parallelism().map_or(4, |n| n.get());
let capacity = n_workers * 8;
let dst_filter = Arc::clone(&dst_map);
let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
let g_sink = Arc::clone(&g_shared);
let pass1_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let err_cap = Arc::clone(&pass1_err);
let pipeline = Pipeline::new(
make_source!(Pass1Data, unitig_paths, File),
vec![
make_flat_transform!(Pass1Data, {
move |path: PathBuf| -> Vec<Vec<CanonicalKmer>> {
match UnitigFileReader::open_sequential(&path) {
Err(e) => {
*err_cap.lock().unwrap() = Some(e.to_string());
vec![]
}
Ok(reader) => {
let kmers: Vec<CanonicalKmer> = reader
.iter_indexed_canonical_kmers()
.map(|(k, _, _)| k)
.collect();
kmers.chunks(BATCH).map(|c| c.to_vec()).collect()
}
}
}
}, File, Batch),
make_transform!(Pass1Data, {
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
batch.into_iter()
.filter(|&k| dst_filter.query(k).is_none())
.collect()
}
}, Batch, NewKmers),
],
make_sink!(Pass1Data, {
move |batch: Vec<CanonicalKmer>| {
let mut g = g_sink.lock().unwrap();
for kmer in batch {
g.push(kmer);
}
}
}, NewKmers),
);
WorkerPool::new(pipeline, n_workers, capacity).run();
if let Some(msg) = Arc::try_unwrap(pass1_err).unwrap().into_inner().unwrap() {
return Err(SKError::InvalidData { context: "merge pass1", detail: msg });
}
let g = Arc::try_unwrap(g_shared)
.unwrap_or_else(|_| panic!("pass1: g_shared not uniquely owned after pipeline"))
.into_inner()
.unwrap_or_else(|e| e.into_inner());
let any_new = g.len() > 0;
// Build new layer from de Bruijn graph if there are new kmers.
let new_layer_idx = n_dst_layers;
let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}"));