diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 5cb1f70..275027d 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -301,8 +301,8 @@ impl KmerPartition { 0 }; - let new_mphf = if any_new { - Some(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?) + let new_mphf: Option> = if any_new { + Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?)) } else { None }; @@ -310,7 +310,7 @@ impl KmerPartition { // ── Prepare matrix directories for the new layer ────────────────────── // Absent columns (dst genomes) are written via append_column (all-zero/false). // Source-genome columns are created as mutable builders for pass 2. - let mut new_src_builders: Vec = if any_new { + let new_src_builders: Vec = if any_new { let data_dir = match mode { MergeMode::Presence => new_layer_dir.join("presence"), MergeMode::Count => new_layer_dir.join("counts"), @@ -362,7 +362,7 @@ impl KmerPartition { // Builders for existing layers: n_src_total per layer. // Columns n_dst_genomes .. n_dst_genomes + n_src_total - 1. - let mut exist_builders: Vec> = (0..n_dst_layers) + let exist_builders: Vec> = (0..n_dst_layers) .map(|l| { let layer_dir = dst_index_dir.join(format!("layer_{l}")); let n = dst_map.layer(l).n(); @@ -391,37 +391,124 @@ impl KmerPartition { }) .collect::>()?; - // ── Pass 2: fill builders ───────────────────────────────────────────── - let mut col_offset = 0usize; - for (src, src_n) in sources.iter() { - let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); - if !src_index_dir.exists() { + // ── Pass 2: fill builders (pipeline) ───────────────────────────────── + // Collect source items before the pipeline so load_meta errors propagate + // via ? before any worker thread is spawned. + let mut pass2_items: Vec<(usize, usize, PathBuf)> = Vec::new(); + { + let mut col_offset = 0usize; + for (src, src_n) in sources.iter() { + let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); + if !src_index_dir.exists() { + col_offset += src_n; + continue; + } + let src_meta = load_meta(&src_index_dir)?; + for l in 0..src_meta.n_layers { + let src_layer_dir = src_index_dir.join(format!("layer_{l}")); + if src_layer_dir.join("unitigs.bin").exists() { + pass2_items.push((col_offset, *src_n, src_layer_dir)); + } + } col_offset += src_n; - continue; } - let src_meta = load_meta(&src_index_dir)?; + } - for l in 0..src_meta.n_layers { - let src_layer_dir = src_index_dir.join(format!("layer_{l}")); - let reader = UnitigFileReader::open_sequential(&src_layer_dir.join("unitigs.bin"))?; - let src_data = SrcLayerData::open(&src_layer_dir, mode)?; + enum Pass2Data { + SrcLayer((usize, usize, PathBuf)), + RawBatch((usize, usize, Vec<(CanonicalKmer, Vec)>)), + WriteBatch(Vec<(Option, usize, usize, u32)>), + } - for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - let values = src_data.lookup(kmer, *src_n); - for (g, &value) in values.iter().enumerate() { - let builder_idx = col_offset + g; - if let Some((dst_layer, hit)) = dst_map.query(kmer) { - exist_builders[dst_layer][builder_idx].set_val(hit.slot, value); - } else if let Some(ref mphf) = new_mphf { - let slot = mphf.index(kmer); - new_src_builders[builder_idx].set_val(slot, value); + let builders = Arc::new(Mutex::new((exist_builders, new_src_builders))); + let builders_sink = Arc::clone(&builders); + let dst_map_t2 = Arc::clone(&dst_map); + let new_mphf_t2 = new_mphf.clone(); + let pass2_err: Arc>> = Arc::new(Mutex::new(None)); + let err_cap2 = Arc::clone(&pass2_err); + + let pipeline2 = Pipeline::new( + make_source!(Pass2Data, pass2_items, SrcLayer), + vec![ + make_flat_transform!(Pass2Data, { + move |(col_offset, src_n, src_layer_dir): (usize, usize, PathBuf)| + -> Vec<(usize, usize, Vec<(CanonicalKmer, Vec)>)> + { + let reader = match UnitigFileReader::open_sequential( + &src_layer_dir.join("unitigs.bin"), + ) { + Ok(r) => r, + Err(e) => { + *err_cap2.lock().unwrap() = Some(e.to_string()); + return vec![]; + } + }; + let src_data = match SrcLayerData::open(&src_layer_dir, mode) { + Ok(d) => d, + Err(e) => { + *err_cap2.lock().unwrap() = Some(e.to_string()); + return vec![]; + } + }; + let all_items: Vec<(CanonicalKmer, Vec)> = reader + .iter_indexed_canonical_kmers() + .map(|(kmer, _, _)| (kmer, src_data.lookup(kmer, src_n))) + .collect(); + all_items + .chunks(BATCH) + .map(|c| (col_offset, src_n, c.to_vec())) + .collect() + } + }, SrcLayer, RawBatch), + make_transform!(Pass2Data, { + move |(col_offset, _src_n, items): (usize, usize, Vec<(CanonicalKmer, Vec)>)| + -> Vec<(Option, usize, usize, u32)> + { + let mut ops: Vec<(Option, usize, usize, u32)> = Vec::new(); + for (kmer, values) in items { + if let Some((dst_layer, hit)) = dst_map_t2.query(kmer) { + for (g, val) in values.into_iter().enumerate() { + ops.push((Some(dst_layer), col_offset + g, hit.slot, val)); + } + } else if let Some(ref mphf) = new_mphf_t2 { + let slot = mphf.index(kmer); + for (g, val) in values.into_iter().enumerate() { + ops.push((None, col_offset + g, slot, val)); + } + } + } + ops + } + }, RawBatch, WriteBatch), + ], + make_sink!(Pass2Data, { + move |ops: Vec<(Option, usize, usize, u32)>| { + let mut guard = builders_sink.lock().unwrap(); + for (layer_opt, col, slot, val) in ops { + match layer_opt { + Some(l) => guard.0[l][col].set_val(slot, val), + None => guard.1[col].set_val(slot, val), } } } - } - col_offset += src_n; + }, WriteBatch), + ); + + WorkerPool::new(pipeline2, n_workers, capacity).run(); + + if let Some(msg) = Arc::try_unwrap(pass2_err) + .unwrap_or_else(|_| panic!("pass2: pass2_err not uniquely owned")) + .into_inner() + .unwrap_or_else(|e| e.into_inner()) + { + return Err(SKError::InvalidData { context: "merge pass2", detail: msg }); } + let (exist_builders, new_src_builders) = Arc::try_unwrap(builders) + .unwrap_or_else(|_| panic!("pass2: builders not uniquely owned after pipeline")) + .into_inner() + .unwrap_or_else(|e| e.into_inner()); + // ── Close builders and update metadata ──────────────────────────────── for (l, builders) in exist_builders.into_iter().enumerate() { let layer_dir = dst_index_dir.join(format!("layer_{l}"));