Push rwqsmuvystym #24

Merged
coissac merged 3 commits from push-rwqsmuvystym into main 2026-06-12 19:33:21 +00:00
Showing only changes of commit b2c8373586 - Show all commits
+107 -20
View File
@@ -301,8 +301,8 @@ impl KmerPartition {
0
};
let new_mphf = if any_new {
Some(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?)
let new_mphf: Option<Arc<MphfOnly>> = if any_new {
Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?))
} else {
None
};
@@ -310,7 +310,7 @@ impl KmerPartition {
// ── Prepare matrix directories for the new layer ──────────────────────
// Absent columns (dst genomes) are written via append_column (all-zero/false).
// Source-genome columns are created as mutable builders for pass 2.
let mut new_src_builders: Vec<ColBuilder> = if any_new {
let new_src_builders: Vec<ColBuilder> = if any_new {
let data_dir = match mode {
MergeMode::Presence => new_layer_dir.join("presence"),
MergeMode::Count => new_layer_dir.join("counts"),
@@ -362,7 +362,7 @@ impl KmerPartition {
// Builders for existing layers: n_src_total per layer.
// Columns n_dst_genomes .. n_dst_genomes + n_src_total - 1.
let mut exist_builders: Vec<Vec<ColBuilder>> = (0..n_dst_layers)
let exist_builders: Vec<Vec<ColBuilder>> = (0..n_dst_layers)
.map(|l| {
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
let n = dst_map.layer(l).n();
@@ -391,7 +391,11 @@ impl KmerPartition {
})
.collect::<SKResult<_>>()?;
// ── Pass 2: fill builders ─────────────────────────────────────────────
// ── Pass 2: fill builders (pipeline) ─────────────────────────────────
// Collect source items before the pipeline so load_meta errors propagate
// via ? before any worker thread is spawned.
let mut pass2_items: Vec<(usize, usize, PathBuf)> = Vec::new();
{
let mut col_offset = 0usize;
for (src, src_n) in sources.iter() {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
@@ -400,27 +404,110 @@ impl KmerPartition {
continue;
}
let src_meta = load_meta(&src_index_dir)?;
for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
let reader = UnitigFileReader::open_sequential(&src_layer_dir.join("unitigs.bin"))?;
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
let values = src_data.lookup(kmer, *src_n);
for (g, &value) in values.iter().enumerate() {
let builder_idx = col_offset + g;
if let Some((dst_layer, hit)) = dst_map.query(kmer) {
exist_builders[dst_layer][builder_idx].set_val(hit.slot, value);
} else if let Some(ref mphf) = new_mphf {
let slot = mphf.index(kmer);
new_src_builders[builder_idx].set_val(slot, value);
}
}
if src_layer_dir.join("unitigs.bin").exists() {
pass2_items.push((col_offset, *src_n, src_layer_dir));
}
}
col_offset += src_n;
}
}
enum Pass2Data {
SrcLayer((usize, usize, PathBuf)),
RawBatch((usize, usize, Vec<(CanonicalKmer, Vec<u32>)>)),
WriteBatch(Vec<(Option<usize>, usize, usize, u32)>),
}
let builders = Arc::new(Mutex::new((exist_builders, new_src_builders)));
let builders_sink = Arc::clone(&builders);
let dst_map_t2 = Arc::clone(&dst_map);
let new_mphf_t2 = new_mphf.clone();
let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let err_cap2 = Arc::clone(&pass2_err);
let pipeline2 = Pipeline::new(
make_source!(Pass2Data, pass2_items, SrcLayer),
vec![
make_flat_transform!(Pass2Data, {
move |(col_offset, src_n, src_layer_dir): (usize, usize, PathBuf)|
-> Vec<(usize, usize, Vec<(CanonicalKmer, Vec<u32>)>)>
{
let reader = match UnitigFileReader::open_sequential(
&src_layer_dir.join("unitigs.bin"),
) {
Ok(r) => r,
Err(e) => {
*err_cap2.lock().unwrap() = Some(e.to_string());
return vec![];
}
};
let src_data = match SrcLayerData::open(&src_layer_dir, mode) {
Ok(d) => d,
Err(e) => {
*err_cap2.lock().unwrap() = Some(e.to_string());
return vec![];
}
};
let all_items: Vec<(CanonicalKmer, Vec<u32>)> = reader
.iter_indexed_canonical_kmers()
.map(|(kmer, _, _)| (kmer, src_data.lookup(kmer, src_n)))
.collect();
all_items
.chunks(BATCH)
.map(|c| (col_offset, src_n, c.to_vec()))
.collect()
}
}, SrcLayer, RawBatch),
make_transform!(Pass2Data, {
move |(col_offset, _src_n, items): (usize, usize, Vec<(CanonicalKmer, Vec<u32>)>)|
-> Vec<(Option<usize>, usize, usize, u32)>
{
let mut ops: Vec<(Option<usize>, usize, usize, u32)> = Vec::new();
for (kmer, values) in items {
if let Some((dst_layer, hit)) = dst_map_t2.query(kmer) {
for (g, val) in values.into_iter().enumerate() {
ops.push((Some(dst_layer), col_offset + g, hit.slot, val));
}
} else if let Some(ref mphf) = new_mphf_t2 {
let slot = mphf.index(kmer);
for (g, val) in values.into_iter().enumerate() {
ops.push((None, col_offset + g, slot, val));
}
}
}
ops
}
}, RawBatch, WriteBatch),
],
make_sink!(Pass2Data, {
move |ops: Vec<(Option<usize>, usize, usize, u32)>| {
let mut guard = builders_sink.lock().unwrap();
for (layer_opt, col, slot, val) in ops {
match layer_opt {
Some(l) => guard.0[l][col].set_val(slot, val),
None => guard.1[col].set_val(slot, val),
}
}
}
}, WriteBatch),
);
WorkerPool::new(pipeline2, n_workers, capacity).run();
if let Some(msg) = Arc::try_unwrap(pass2_err)
.unwrap_or_else(|_| panic!("pass2: pass2_err not uniquely owned"))
.into_inner()
.unwrap_or_else(|e| e.into_inner())
{
return Err(SKError::InvalidData { context: "merge pass2", detail: msg });
}
let (exist_builders, new_src_builders) = Arc::try_unwrap(builders)
.unwrap_or_else(|_| panic!("pass2: builders not uniquely owned after pipeline"))
.into_inner()
.unwrap_or_else(|e| e.into_inner());
// ── Close builders and update metadata ────────────────────────────────
for (l, builders) in exist_builders.into_iter().enumerate() {