refactor: Enforce Rayon parallelism and fix merge_layer concurrency
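Updated the memory guidelines and feedback docs to explicitly classify intra-partition phases as parallel, correcting the prior assumption that they execute sequentially. Refactored merge_layer.rs so that each column builder is wrapped in its own Arc<Mutex<ColBuilder>>, replacing the single Arc<Mutex<..>> that guarded all builders together; the pass2 sink now locks only the builder it is writing to. After the pipeline finishes, each builder is reclaimed with Arc::try_unwrap and closed exactly once, eliminating race conditions and preventing double-closes during pass2.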
This commit is contained in:
@@ -453,8 +453,18 @@ impl KmerPartition {
|
||||
WriteBatch(Vec<(Option<usize>, usize, usize, u32)>),
|
||||
}
|
||||
|
||||
let builders = Arc::new(Mutex::new((exist_builders, new_src_builders)));
|
||||
let builders_sink = Arc::clone(&builders);
|
||||
let exist_locked: Vec<Vec<Arc<Mutex<ColBuilder>>>> = exist_builders
|
||||
.into_iter()
|
||||
.map(|layer| layer.into_iter().map(|b| Arc::new(Mutex::new(b))).collect())
|
||||
.collect();
|
||||
let new_locked: Vec<Arc<Mutex<ColBuilder>>> = new_src_builders
|
||||
.into_iter()
|
||||
.map(|b| Arc::new(Mutex::new(b)))
|
||||
.collect();
|
||||
let exist_sink: Vec<Vec<Arc<Mutex<ColBuilder>>>> = exist_locked.iter()
|
||||
.map(|layer| layer.iter().map(Arc::clone).collect())
|
||||
.collect();
|
||||
let new_sink: Vec<Arc<Mutex<ColBuilder>>> = new_locked.iter().map(Arc::clone).collect();
|
||||
let dst_map_t2 = Arc::clone(&dst_map);
|
||||
let new_mphf_t2 = new_mphf.clone();
|
||||
let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
|
||||
@@ -533,11 +543,10 @@ impl KmerPartition {
|
||||
],
|
||||
make_sink!(Pass2Data, {
|
||||
move |ops: Vec<(Option<usize>, usize, usize, u32)>| {
|
||||
let mut guard = builders_sink.lock().unwrap();
|
||||
for (layer_opt, col, slot, val) in ops {
|
||||
match layer_opt {
|
||||
Some(l) => guard.0[l][col].set_val(slot, val),
|
||||
None => guard.1[col].set_val(slot, val),
|
||||
Some(l) => exist_sink[l][col].lock().unwrap().set_val(slot, val),
|
||||
None => new_sink[col].lock().unwrap().set_val(slot, val),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -555,17 +564,16 @@ impl KmerPartition {
|
||||
return Err(SKError::InvalidData { context: "merge pass2", detail: msg });
|
||||
}
|
||||
|
||||
let (exist_builders, new_src_builders) = Arc::try_unwrap(builders)
|
||||
.unwrap_or_else(|_| panic!("pass2: builders not uniquely owned after pipeline"))
|
||||
.into_inner()
|
||||
.unwrap_or_else(|e| e.into_inner());
|
||||
|
||||
let t_close = std::time::Instant::now();
|
||||
// ── Close builders and update metadata ────────────────────────────────
|
||||
for (l, builders) in exist_builders.into_iter().enumerate() {
|
||||
for (l, builders) in exist_locked.into_iter().enumerate() {
|
||||
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
|
||||
for b in builders {
|
||||
b.close()?;
|
||||
Arc::try_unwrap(b)
|
||||
.unwrap_or_else(|_| panic!("pass2: exist_builder[{l}] not uniquely owned"))
|
||||
.into_inner()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.close()?;
|
||||
}
|
||||
let n = dst_map.layer(l).n();
|
||||
let data_dir = match mode {
|
||||
@@ -575,8 +583,12 @@ impl KmerPartition {
|
||||
write_matrix_meta(&data_dir, n, n_dst_genomes + n_src_total).map_err(SKError::Io)?;
|
||||
}
|
||||
|
||||
for b in new_src_builders {
|
||||
b.close()?;
|
||||
for b in new_locked {
|
||||
Arc::try_unwrap(b)
|
||||
.unwrap_or_else(|_| panic!("pass2: new_builder not uniquely owned"))
|
||||
.into_inner()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.close()?;
|
||||
}
|
||||
if any_new {
|
||||
let data_dir = match mode {
|
||||
|
||||
Reference in New Issue
Block a user