From 2bc189e96259c3c1d3c7abe0b4e1cfcc5da47e9e Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 12 Jun 2026 16:28:03 +0200 Subject: [PATCH 1/3] feat: dynamically compute seed expansion based on RSS Introduce a `peak_rss_bytes()` utility for accurate per-phase RAM measurement. Replace the genome-length heuristic with a dynamic seed expansion ratio based on actual RSS delta. Explicitly drop the `GraphDeBruijn` instance before MPHF construction to prevent resource contention and ensure proper memory management. --- src/obikindex/src/merge.rs | 12 ++++++++---- src/obikpartitionner/src/merge_layer.rs | 6 ++++-- src/obilayeredmap/src/map.rs | 1 + src/obisys/src/lib.rs | 6 ++++++ 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index 0857013..f7879f4 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -5,7 +5,7 @@ use std::path::Path; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; -use obisys::{MemoryBudget, Reporter, Stage, available_memory_bytes, progress_bar, spinner}; +use obisys::{MemoryBudget, Reporter, Stage, available_memory_bytes, peak_rss_bytes, progress_bar, spinner}; use rayon::prelude::*; use tracing::{debug, info}; @@ -200,21 +200,25 @@ impl KmerIndex { let worst_id = order[0]; let worst_bytes = partition_sizes[worst_id]; + let rss_before_pilot = peak_rss_bytes(); let worst_g_len = dst_partition .merge_partition(worst_id, &srcs, mode, n_dst_genomes, block_bits, &evidence) .map_err(OKIError::Partition)?; + let rss_after_pilot = peak_rss_bytes(); pb.inc(1); - let seed_expansion = if worst_bytes > 0 { - worst_g_len as u64 * 16 * 1000 / worst_bytes + let pilot_rss = rss_after_pilot.saturating_sub(rss_before_pilot); + let seed_expansion = if worst_bytes > 0 && pilot_rss > 0 { + pilot_rss * 1000 / worst_bytes } else { FALLBACK_EXPANSION }; info!( "merge_partitions: pilot partition {} — {} unitig bytes → {} new kmers, \ - expansion {:.2}×", + RSS delta {}, expansion {:.2}×", worst_id, worst_bytes, worst_g_len, + fmt_bytes(pilot_rss), seed_expansion as f64 / 1000.0, ); diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 6b55f5d..3cc5a47 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -223,12 +223,14 @@ impl KmerPartition { uw.write(&unitig) })?; uw.close()?; + let n = g.len(); + drop(g); // release GraphDeBruijn before MPHF build Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; - g.len() + n } else { + drop(g); 0 }; - drop(g); let new_mphf = if any_new { Some(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?) diff --git a/src/obilayeredmap/src/map.rs b/src/obilayeredmap/src/map.rs index 18d3c55..31cf20c 100644 --- a/src/obilayeredmap/src/map.rs +++ b/src/obilayeredmap/src/map.rs @@ -104,3 +104,4 @@ fn layer_dir(root: &Path, i: usize) -> PathBuf { #[cfg(test)] #[path = "tests/map.rs"] mod tests; + diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs index bf2e678..5f8a3d3 100644 --- a/src/obisys/src/lib.rs +++ b/src/obisys/src/lib.rs @@ -119,6 +119,12 @@ use sysinfo::System; /// /// On macOS, `available_memory()` can return 0 when the memory compressor /// inflates the page count; in that case we fall back to half of total memory. +/// Returns the process peak RSS (high-water mark since process start). +/// Monotonically increasing — use delta before/after a phase to measure its RAM cost. +pub fn peak_rss_bytes() -> u64 { + rss_to_bytes(&get_rusage()) +} + pub fn available_memory_bytes() -> u64 { let sys = System::new_all(); let host_avail = match sys.available_memory() { -- 2.52.0 From ba49af6f9e4b4b94b89d60e3b2d45163742441c7 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 12 Jun 2026 20:54:44 +0200 Subject: [PATCH 2/3] refactor: parallelize merge and partition logic with obipipeline Introduce the `obipipeline` dependency and refactor merge and partition logic to leverage parallel execution. Update `merge_partitions` to use rayon with dynamic memory budgeting and concurrency control via a pilot run. Refactor Pass 1 to concurrently read unitigs, filter kmers through a shared `LayeredMap`, and populate the graph safely. Simplify diagnostics to report total kmer counts and replace manual flags with graph length validation. --- src/Cargo.lock | 1 + src/obikindex/src/merge.rs | 224 ++++++------------------ src/obikpartitionner/Cargo.toml | 3 +- src/obikpartitionner/src/merge_layer.rs | 93 ++++++++-- 4 files changed, 134 insertions(+), 187 deletions(-) diff --git a/src/Cargo.lock b/src/Cargo.lock index df90bda..3108163 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1562,6 +1562,7 @@ dependencies = [ "obikrope", "obikseq", "obilayeredmap", + "obipipeline", "obiread", "obiskbuilder", "obiskio", diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index f7879f4..0380e98 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -2,11 +2,8 @@ use std::collections::HashMap; use std::fs; use std::io; use std::path::Path; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; -use obisys::{MemoryBudget, Reporter, Stage, available_memory_bytes, peak_rss_bytes, progress_bar, spinner}; -use rayon::prelude::*; +use obisys::{Reporter, Stage, progress_bar, spinner}; use tracing::{debug, info}; use obilayeredmap::IndexMode; @@ -22,10 +19,9 @@ pub use obikpartitionner::MergeMode; #[derive(Debug)] struct PartStat { - id: usize, - unitig_bytes: u64, // sum of unitigs.bin across remaining sources - g_len: usize, // actual new kmers inserted into GraphDeBruijn - exp_at_acquire: f64, // expansion factor used to size the budget reservation + id: usize, + unitig_bytes: u64, + g_len: usize, } // ── main merge entry point ──────────────────────────────────────────────────── @@ -195,122 +191,48 @@ impl KmerIndex { let mut order: Vec = (0..n_partitions).collect(); order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i])); - // ── Sequential pilot: worst partition → seed expansion factor ───── - const FALLBACK_EXPANSION: u64 = 4_000; // 4× in fixed-point ×1000 + // ── First partition (largest) ───────────────────────────────────── let worst_id = order[0]; let worst_bytes = partition_sizes[worst_id]; - let rss_before_pilot = peak_rss_bytes(); let worst_g_len = dst_partition .merge_partition(worst_id, &srcs, mode, n_dst_genomes, block_bits, &evidence) .map_err(OKIError::Partition)?; - let rss_after_pilot = peak_rss_bytes(); pb.inc(1); - let pilot_rss = rss_after_pilot.saturating_sub(rss_before_pilot); - let seed_expansion = if worst_bytes > 0 && pilot_rss > 0 { - pilot_rss * 1000 / worst_bytes - } else { - FALLBACK_EXPANSION - }; - info!( - "merge_partitions: pilot partition {} — {} unitig bytes → {} new kmers, \ - RSS delta {}, expansion {:.2}×", - worst_id, worst_bytes, worst_g_len, - fmt_bytes(pilot_rss), - seed_expansion as f64 / 1000.0, + "merge_partitions: first partition {} — {} unitig bytes → {} new kmers", + worst_id, fmt_bytes(worst_bytes), worst_g_len, ); - let part_stats: Arc>> = Arc::new(Mutex::new({ - let mut v = Vec::with_capacity(n_partitions); - v.push(PartStat { - id: worst_id, - unitig_bytes: worst_bytes, - g_len: worst_g_len, - exp_at_acquire: seed_expansion as f64 / 1000.0, - }); - v - })); + let mut part_stats: Vec = Vec::with_capacity(n_partitions); + part_stats.push(PartStat { + id: worst_id, + unitig_bytes: worst_bytes, + g_len: worst_g_len, + }); - let max_expansion = AtomicU64::new(seed_expansion); + // ── Sequential remainder ────────────────────────────────────────── + // One partition at a time; each partition uses an internal pipeline + // (obipipeline) to parallelise file I/O and dst_map filtering. + let _ = budget_fraction; // kept in signature for CLI compatibility + for &i in &order[1..] { + let ubytes = partition_sizes[i]; + debug!("partition {i}: start — {} unitig bytes", fmt_bytes(ubytes)); - // ── Parallel remainder under memory budget ──────────────────────── - let available = available_memory_bytes(); - let budget_bytes = (available as f64 * budget_fraction) as u64; - let budget = Arc::new(MemoryBudget::new(budget_bytes)); + let g_len = dst_partition + .merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence) + .map_err(OKIError::Partition)?; + pb.inc(1); - info!( - "merge_partitions: available RAM {}, budget {:.0}% = {}", - fmt_bytes(available), - budget_fraction * 100.0, - fmt_bytes(budget_bytes), - ); - - let errors: Vec = order[1..].into_par_iter() - .filter_map(|&i| { - let ubytes = partition_sizes[i]; - let exp = max_expansion.load(Ordering::Relaxed); - let cost = ubytes * exp / 1000; - - budget.acquire(cost); - debug!( - "partition {i}: start — est. {} ({:.2}×), \ - {} workers active, {} budget remaining", - fmt_bytes(cost), - exp as f64 / 1000.0, - budget.active(), - fmt_bytes(budget.remaining()), - ); - - let result = dst_partition - .merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence); - budget.release(cost); - pb.inc(1); - - match result { - Ok(g_len) => { - let actual_exp = if ubytes > 0 { - g_len as u64 * 16 * 1000 / ubytes - } else { - 0 - }; - max_expansion.fetch_max(actual_exp, Ordering::Relaxed); - debug!( - "partition {i}: done — {} new kmers, actual {:.2}× \ - (estimated {:.2}×)", - g_len, - actual_exp as f64 / 1000.0, - exp as f64 / 1000.0, - ); - part_stats.lock().unwrap().push(PartStat { - id: i, - unitig_bytes: ubytes, - g_len, - exp_at_acquire: exp as f64 / 1000.0, - }); - None - } - Err(e) => Some(OKIError::Partition(e)), - } - }) - .collect(); - - pb.finish_and_clear(); - if let Some(e) = errors.into_iter().next() { - return Err(e); + debug!("partition {i}: done — {} new kmers", g_len); + part_stats.push(PartStat { id: i, unitig_bytes: ubytes, g_len }); } + pb.finish_and_clear(); + // ── Diagnostic report ───────────────────────────────────────────── - let stats = Arc::try_unwrap(part_stats).unwrap().into_inner().unwrap(); - print_merge_partition_report( - &stats, - available, - budget_fraction, - seed_expansion as f64 / 1000.0, - max_expansion.load(Ordering::Relaxed) as f64 / 1000.0, - budget.peak_active(), - ); + print_merge_partition_report(&part_stats); rep.push(t.stop()); } @@ -332,82 +254,36 @@ impl KmerIndex { // ── Diagnostic report ───────────────────────────────────────────────────────── -fn print_merge_partition_report( - stats: &[PartStat], - available_ram: u64, - budget_fraction: f64, - seed_expansion: f64, - final_expansion: f64, - peak_active: usize, -) { - // Compute actual expansion per partition (skip empty partitions) - let expansions: Vec<(usize, f64)> = stats - .iter() - .filter(|s| s.unitig_bytes > 0) - .map(|s| (s.id, s.g_len as f64 * 16.0 / s.unitig_bytes as f64)) - .collect(); +fn print_merge_partition_report(stats: &[PartStat]) { + let total_new: usize = stats.iter().map(|s| s.g_len).sum(); + let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count(); - if expansions.is_empty() { + if non_empty == 0 { info!("merge_partitions report: no data (all partitions empty)"); return; } - let mut sorted_exp: Vec = expansions.iter().map(|(_, e)| *e).collect(); - sorted_exp.sort_by(|a, b| a.partial_cmp(b).unwrap()); - let n = sorted_exp.len(); - let mean_exp = sorted_exp.iter().sum::() / n as f64; - let median_exp = sorted_exp[n / 2]; - let max_exp = sorted_exp[n - 1]; - - info!("─── merge_partitions memory report ───"); + info!("─── merge_partitions report ───"); info!( - " available RAM : {} budget {:.0}% = {}", - fmt_bytes(available_ram), - budget_fraction * 100.0, - fmt_bytes((available_ram as f64 * budget_fraction) as u64), + " {} partition(s) processed, {} total new kmers", + non_empty, total_new, ); - info!( - " expansion factor — seed: {:.2}× final max: {:.2}× \ - (mean: {:.2}× median: {:.2}× observed max: {:.2}×)", - seed_expansion, final_expansion, mean_exp, median_exp, max_exp, - ); - info!(" peak concurrent workers: {}", peak_active); - // Histogram of actual expansion factors - let min_e = sorted_exp[0]; - let max_e = sorted_exp[n - 1]; - let n_buckets = 8usize; - let bucket_w = (max_e - min_e).max(0.01) / n_buckets as f64; - let mut counts = vec![0usize; n_buckets]; - for &e in &sorted_exp { - let b = (((e - min_e) / bucket_w) as usize).min(n_buckets - 1); - counts[b] += 1; + // Top 8 partitions by new-kmer count + let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect(); + by_new.sort_by_key(|s| std::cmp::Reverse(s.g_len)); + if !by_new.is_empty() { + info!(" top partitions by new kmers:"); + for s in by_new.iter().take(8) { + info!( + " partition {:4} : {}M new kmers ({} unitig bytes)", + s.id, + s.g_len / 1_000_000, + fmt_bytes(s.unitig_bytes), + ); + } } - let max_count = *counts.iter().max().unwrap(); - info!(" expansion factor distribution ({} partitions with data):", n); - for (i, &c) in counts.iter().enumerate() { - let lo = min_e + i as f64 * bucket_w; - let hi = min_e + (i + 1) as f64 * bucket_w; - let bar = "█".repeat(if max_count > 0 { c * 30 / max_count } else { 0 }); - info!(" {:5.2}× – {:5.2}× │{:<30} {}", lo, hi, bar, c); - } - - // Top 8 by actual expansion - let mut by_exp: Vec<(usize, f64)> = expansions.clone(); - by_exp.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap()); - info!(" top partitions by actual expansion factor:"); - for (id, exp) in by_exp.iter().take(8) { - let s = stats.iter().find(|s| s.id == *id).unwrap(); - info!( - " partition {:4} : {:.2}× ({} unitigs → {}M kmers, \ - reserved at {:.2}×)", - id, exp, - fmt_bytes(s.unitig_bytes), - s.g_len / 1_000_000, - s.exp_at_acquire, - ); - } - info!("──────────────────────────────────────"); + info!("───────────────────────────────"); } // ── helpers ─────────────────────────────────────────────────────────────────── diff --git a/src/obikpartitionner/Cargo.toml b/src/obikpartitionner/Cargo.toml index e131573..1c42cab 100644 --- a/src/obikpartitionner/Cargo.toml +++ b/src/obikpartitionner/Cargo.toml @@ -28,4 +28,5 @@ memmap2 = "0.9.10" obicompactvec = { path = "../obicompactvec" } ptr_hash = "1.1" indicatif = "0.17" -obisys = { path = "../obisys" } +obisys = { path = "../obisys" } +obipipeline = { path = "../obipipeline" } diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 3cc5a47..5cb1f70 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -1,6 +1,9 @@ use std::fs; use std::io; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use obipipeline::{Pipeline, WorkerPool, make_flat_transform, make_sink, make_source, make_transform}; use obicompactvec::{ PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, @@ -173,7 +176,7 @@ impl KmerPartition { } load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open - let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?; + let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?); let n_dst_layers = dst_map.n_layers(); let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum(); @@ -187,29 +190,95 @@ impl KmerPartition { } } - // ── Pass 1: classify kmers, build new-kmer de Bruijn graph ─────────── - let mut g = GraphDeBruijn::new(); - let mut any_new = false; + // ── Pass 1: pipeline — parallel file read + dst_map filter + graph fill ─ + // + // Source : list of unitigs.bin paths (one per source × layer) + // Flat : open file, emit Vec batches (BeeGFS parallel I/O) + // Transform: filter via dst_map.query() — thread-safe, LayeredMap<()>: Sync + // Sink : push new kmers into GraphDeBruijn (single thread, no locks needed) + // Collect file paths (propagates load_meta errors before the pipeline starts) + let mut unitig_paths: Vec = Vec::new(); for (src, _) in sources.iter() { let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); if !src_index_dir.exists() { continue; } let src_meta = load_meta(&src_index_dir)?; - for l in 0..src_meta.n_layers { - let unitigs_path = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin"); - let reader = UnitigFileReader::open_sequential(&unitigs_path)?; - for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - if dst_map.query(kmer).is_none() { - g.push(kmer); - any_new = true; - } + let p = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin"); + if p.exists() { + unitig_paths.push(p); } } } + enum Pass1Data { + File(PathBuf), + Batch(Vec), + NewKmers(Vec), + } + + const BATCH: usize = 4096; + let n_workers = std::thread::available_parallelism().map_or(4, |n| n.get()); + let capacity = n_workers * 8; + + let dst_filter = Arc::clone(&dst_map); + let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new())); + let g_sink = Arc::clone(&g_shared); + let pass1_err: Arc>> = Arc::new(Mutex::new(None)); + let err_cap = Arc::clone(&pass1_err); + + let pipeline = Pipeline::new( + make_source!(Pass1Data, unitig_paths, File), + vec![ + make_flat_transform!(Pass1Data, { + move |path: PathBuf| -> Vec> { + match UnitigFileReader::open_sequential(&path) { + Err(e) => { + *err_cap.lock().unwrap() = Some(e.to_string()); + vec![] + } + Ok(reader) => { + let kmers: Vec = reader + .iter_indexed_canonical_kmers() + .map(|(k, _, _)| k) + .collect(); + kmers.chunks(BATCH).map(|c| c.to_vec()).collect() + } + } + } + }, File, Batch), + make_transform!(Pass1Data, { + move |batch: Vec| -> Vec { + batch.into_iter() + .filter(|&k| dst_filter.query(k).is_none()) + .collect() + } + }, Batch, NewKmers), + ], + make_sink!(Pass1Data, { + move |batch: Vec| { + let mut g = g_sink.lock().unwrap(); + for kmer in batch { + g.push(kmer); + } + } + }, NewKmers), + ); + + WorkerPool::new(pipeline, n_workers, capacity).run(); + + if let Some(msg) = Arc::try_unwrap(pass1_err).unwrap().into_inner().unwrap() { + return Err(SKError::InvalidData { context: "merge pass1", detail: msg }); + } + + let g = Arc::try_unwrap(g_shared) + .unwrap_or_else(|_| panic!("pass1: g_shared not uniquely owned after pipeline")) + .into_inner() + .unwrap_or_else(|e| e.into_inner()); + let any_new = g.len() > 0; + // Build new layer from de Bruijn graph if there are new kmers. let new_layer_idx = n_dst_layers; let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); -- 2.52.0 From b2c83735860ebcfaf435a3b5f8afb1193fe6c183 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 12 Jun 2026 21:28:48 +0200 Subject: [PATCH 3/3] refactor: parallelize merge layer with WorkerPool pipeline Replaces the synchronous sequential loop with a multi-threaded pipeline using `WorkerPool` and custom stage macros. Shared mutable state is wrapped in `Arc>` for thread-safe updates, while pipeline errors are centralized via `Arc>>` to propagate failures before thread join. --- src/obikpartitionner/src/merge_layer.rs | 139 +++++++++++++++++++----- 1 file changed, 113 insertions(+), 26 deletions(-) diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 5cb1f70..275027d 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -301,8 +301,8 @@ impl KmerPartition { 0 }; - let new_mphf = if any_new { - Some(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?) + let new_mphf: Option> = if any_new { + Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?)) } else { None }; @@ -310,7 +310,7 @@ impl KmerPartition { // ── Prepare matrix directories for the new layer ────────────────────── // Absent columns (dst genomes) are written via append_column (all-zero/false). // Source-genome columns are created as mutable builders for pass 2. - let mut new_src_builders: Vec = if any_new { + let new_src_builders: Vec = if any_new { let data_dir = match mode { MergeMode::Presence => new_layer_dir.join("presence"), MergeMode::Count => new_layer_dir.join("counts"), @@ -362,7 +362,7 @@ impl KmerPartition { // Builders for existing layers: n_src_total per layer. // Columns n_dst_genomes .. n_dst_genomes + n_src_total - 1. - let mut exist_builders: Vec> = (0..n_dst_layers) + let exist_builders: Vec> = (0..n_dst_layers) .map(|l| { let layer_dir = dst_index_dir.join(format!("layer_{l}")); let n = dst_map.layer(l).n(); @@ -391,37 +391,124 @@ impl KmerPartition { }) .collect::>()?; - // ── Pass 2: fill builders ───────────────────────────────────────────── - let mut col_offset = 0usize; - for (src, src_n) in sources.iter() { - let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); - if !src_index_dir.exists() { + // ── Pass 2: fill builders (pipeline) ───────────────────────────────── + // Collect source items before the pipeline so load_meta errors propagate + // via ? before any worker thread is spawned. + let mut pass2_items: Vec<(usize, usize, PathBuf)> = Vec::new(); + { + let mut col_offset = 0usize; + for (src, src_n) in sources.iter() { + let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); + if !src_index_dir.exists() { + col_offset += src_n; + continue; + } + let src_meta = load_meta(&src_index_dir)?; + for l in 0..src_meta.n_layers { + let src_layer_dir = src_index_dir.join(format!("layer_{l}")); + if src_layer_dir.join("unitigs.bin").exists() { + pass2_items.push((col_offset, *src_n, src_layer_dir)); + } + } col_offset += src_n; - continue; } - let src_meta = load_meta(&src_index_dir)?; + } - for l in 0..src_meta.n_layers { - let src_layer_dir = src_index_dir.join(format!("layer_{l}")); - let reader = UnitigFileReader::open_sequential(&src_layer_dir.join("unitigs.bin"))?; - let src_data = SrcLayerData::open(&src_layer_dir, mode)?; + enum Pass2Data { + SrcLayer((usize, usize, PathBuf)), + RawBatch((usize, usize, Vec<(CanonicalKmer, Vec)>)), + WriteBatch(Vec<(Option, usize, usize, u32)>), + } - for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - let values = src_data.lookup(kmer, *src_n); - for (g, &value) in values.iter().enumerate() { - let builder_idx = col_offset + g; - if let Some((dst_layer, hit)) = dst_map.query(kmer) { - exist_builders[dst_layer][builder_idx].set_val(hit.slot, value); - } else if let Some(ref mphf) = new_mphf { - let slot = mphf.index(kmer); - new_src_builders[builder_idx].set_val(slot, value); + let builders = Arc::new(Mutex::new((exist_builders, new_src_builders))); + let builders_sink = Arc::clone(&builders); + let dst_map_t2 = Arc::clone(&dst_map); + let new_mphf_t2 = new_mphf.clone(); + let pass2_err: Arc>> = Arc::new(Mutex::new(None)); + let err_cap2 = Arc::clone(&pass2_err); + + let pipeline2 = Pipeline::new( + make_source!(Pass2Data, pass2_items, SrcLayer), + vec![ + make_flat_transform!(Pass2Data, { + move |(col_offset, src_n, src_layer_dir): (usize, usize, PathBuf)| + -> Vec<(usize, usize, Vec<(CanonicalKmer, Vec)>)> + { + let reader = match UnitigFileReader::open_sequential( + &src_layer_dir.join("unitigs.bin"), + ) { + Ok(r) => r, + Err(e) => { + *err_cap2.lock().unwrap() = Some(e.to_string()); + return vec![]; + } + }; + let src_data = match SrcLayerData::open(&src_layer_dir, mode) { + Ok(d) => d, + Err(e) => { + *err_cap2.lock().unwrap() = Some(e.to_string()); + return vec![]; + } + }; + let all_items: Vec<(CanonicalKmer, Vec)> = reader + .iter_indexed_canonical_kmers() + .map(|(kmer, _, _)| (kmer, src_data.lookup(kmer, src_n))) + .collect(); + all_items + .chunks(BATCH) + .map(|c| (col_offset, src_n, c.to_vec())) + .collect() + } + }, SrcLayer, RawBatch), + make_transform!(Pass2Data, { + move |(col_offset, _src_n, items): (usize, usize, Vec<(CanonicalKmer, Vec)>)| + -> Vec<(Option, usize, usize, u32)> + { + let mut ops: Vec<(Option, usize, usize, u32)> = Vec::new(); + for (kmer, values) in items { + if let Some((dst_layer, hit)) = dst_map_t2.query(kmer) { + for (g, val) in values.into_iter().enumerate() { + ops.push((Some(dst_layer), col_offset + g, hit.slot, val)); + } + } else if let Some(ref mphf) = new_mphf_t2 { + let slot = mphf.index(kmer); + for (g, val) in values.into_iter().enumerate() { + ops.push((None, col_offset + g, slot, val)); + } + } + } + ops + } + }, RawBatch, WriteBatch), + ], + make_sink!(Pass2Data, { + move |ops: Vec<(Option, usize, usize, u32)>| { + let mut guard = builders_sink.lock().unwrap(); + for (layer_opt, col, slot, val) in ops { + match layer_opt { + Some(l) => guard.0[l][col].set_val(slot, val), + None => guard.1[col].set_val(slot, val), } } } - } - col_offset += src_n; + }, WriteBatch), + ); + + WorkerPool::new(pipeline2, n_workers, capacity).run(); + + if let Some(msg) = Arc::try_unwrap(pass2_err) + .unwrap_or_else(|_| panic!("pass2: pass2_err not uniquely owned")) + .into_inner() + .unwrap_or_else(|e| e.into_inner()) + { + return Err(SKError::InvalidData { context: "merge pass2", detail: msg }); } + let (exist_builders, new_src_builders) = Arc::try_unwrap(builders) + .unwrap_or_else(|_| panic!("pass2: builders not uniquely owned after pipeline")) + .into_inner() + .unwrap_or_else(|e| e.into_inner()); + // ── Close builders and update metadata ──────────────────────────────── for (l, builders) in exist_builders.into_iter().enumerate() { let layer_dir = dst_index_dir.join(format!("layer_{l}")); -- 2.52.0