From 0350ca855bcd7ca3017efdb6098dd82f2f25ddf0 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 1 Jun 2026 13:56:48 +0200 Subject: [PATCH] refactor: streamline merge pipeline and MPHF indexing Replace mphf.find() with direct mphf.index() calls to eliminate absence checks and fallback vectors. Introduce a lightweight MphfOnly wrapper for faster index loading, and drop the index-mode argument from SrcLayerData::open in the merge and rebuild layers. Update IndexMeta configuration and pass the evidence mode through merge_partition so the new layer is built with it instead of a hard-coded IndexMode::Exact, and take n_new from g.len() instead of the reopened MPHF's n(), streamlining the overall merge pipeline. --- src/obikindex/src/merge.rs | 4 +- src/obikpartitionner/src/merge_layer.rs | 48 +++++++++++------------ src/obikpartitionner/src/rebuild_layer.rs | 4 +- src/obilayeredmap/src/lib.rs | 2 +- src/obilayeredmap/src/mphf_layer.rs | 24 ++++++++++++ 5 files changed, 51 insertions(+), 31 deletions(-) diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index 4f9cd04..e0744c4 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -96,7 +96,7 @@ impl KmerIndex { let mut meta = IndexMeta::read(output).map_err(OKIError::Io)?; meta.genomes = all_genomes; meta.config.with_counts = mode == MergeMode::Count; - meta.config.evidence = evidence; + meta.config.evidence = evidence.clone(); meta.write(output)?; // In presence/absence mode, purge counts/ directories inherited from @@ -147,7 +147,7 @@ impl KmerIndex { .filter_map(|i| { let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> = remaining_sources.iter().map(|s| (&s.partition, s.meta.genomes.len())).collect(); - let result = dst_partition.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits).err(); + let result = dst_partition.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence).err(); pb.inc(1); result }) diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 6107f0b..b8673a2 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs 
@@ -9,7 +9,7 @@ use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentCompactIntVecBuilder}; use obikseq::CanonicalKmer; use obiskio::{SKError, SKResult, UnitigFileReader}; -use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfLayer, OLMError}; +use obilayeredmap::{IndexMode, Layer, LayeredMap, MphfOnly, OLMError}; use obilayeredmap::meta::PartitionMeta; use crate::partition::KmerPartition; @@ -47,22 +47,22 @@ impl ColBuilder { pub(crate) enum SrcLayerData { /// Pure set-membership layer (no data matrix): every kmer is present in all genomes. SetMembership, - Presence(MphfLayer, PersistentBitMatrix), - Count(MphfLayer, PersistentCompactIntMatrix), + Presence(MphfOnly, PersistentBitMatrix), + Count(MphfOnly, PersistentCompactIntMatrix), } impl SrcLayerData { - pub(crate) fn open(layer_dir: &Path, merge_mode: MergeMode, index_mode: &IndexMode) -> SKResult { + pub(crate) fn open(layer_dir: &Path, merge_mode: MergeMode) -> SKResult { let presence_dir = layer_dir.join("presence"); let counts_dir = layer_dir.join("counts"); match merge_mode { MergeMode::Presence => { if presence_dir.exists() { - let mphf = MphfLayer::open(layer_dir, index_mode).map_err(olm_to_sk)?; + let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Presence(mphf, mat)) } else if counts_dir.exists() { - let mphf = MphfLayer::open(layer_dir, index_mode).map_err(olm_to_sk)?; + let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Count(mphf, mat)) } else { @@ -71,7 +71,7 @@ impl SrcLayerData { } MergeMode::Count => { if counts_dir.exists() { - let mphf = MphfLayer::open(layer_dir, index_mode).map_err(olm_to_sk)?; + let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Count(mphf, mat)) } 
else { @@ -82,22 +82,16 @@ impl SrcLayerData { } /// Return one value per source genome for `kmer`. + /// The caller guarantees `kmer` is in the source MPHF domain. + #[inline] pub(crate) fn lookup(&self, kmer: CanonicalKmer, n_genomes: usize) -> Vec { match self { SrcLayerData::SetMembership => vec![1u32; n_genomes], SrcLayerData::Presence(mphf, mat) => { - if let Some(slot) = mphf.find(kmer) { - mat.row(slot).iter().map(|&b| b as u32).collect() - } else { - vec![0u32; n_genomes] - } + mat.row(mphf.index(kmer)).iter().map(|&b| b as u32).collect() } SrcLayerData::Count(mphf, mat) => { - if let Some(slot) = mphf.find(kmer) { - mat.row(slot).iter().copied().collect() - } else { - vec![0u32; n_genomes] - } + mat.row(mphf.index(kmer)).iter().copied().collect() } } } @@ -161,6 +155,7 @@ impl KmerPartition { mode: MergeMode, n_dst_genomes: usize, block_bits: u8, + evidence: &IndexMode, ) -> SKResult<()> { let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); if !dst_index_dir.exists() { @@ -208,7 +203,7 @@ impl KmerPartition { let new_layer_idx = n_dst_layers; let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); - if any_new { + let n_new = if any_new { g.compute_degrees(); fs::create_dir_all(&new_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; @@ -216,16 +211,18 @@ impl KmerPartition { uw.write(&unitig)?; } uw.close()?; - Layer::<()>::build(&new_layer_dir, block_bits, &IndexMode::Exact).map_err(olm_to_sk)?; - } + Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; + g.len() + } else { + 0 + }; drop(g); let new_mphf = if any_new { - Some(MphfLayer::open(&new_layer_dir, &IndexMode::Exact).map_err(olm_to_sk)?) + Some(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?) 
} else { None }; - let n_new = new_mphf.as_ref().map_or(0, |m| m.n()); // ── Prepare matrix directories for the new layer ────────────────────── // Absent columns (dst genomes) are written via append_column (all-zero/false). @@ -303,7 +300,7 @@ impl KmerPartition { for l in 0..src_meta.n_layers { let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let reader = UnitigFileReader::open_sequential(&src_layer_dir.join("unitigs.bin"))?; - let src_data = SrcLayerData::open(&src_layer_dir, mode, &src_meta.mode)?; + let src_data = SrcLayerData::open(&src_layer_dir, mode)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { let values = src_data.lookup(kmer, *src_n); @@ -312,9 +309,8 @@ impl KmerPartition { if let Some((dst_layer, hit)) = dst_map.query(kmer) { exist_builders[dst_layer][builder_idx].set_val(hit.slot, value); } else if let Some(ref mphf) = new_mphf { - if let Some(slot) = mphf.find(kmer) { - new_src_builders[builder_idx].set_val(slot, value); - } + let slot = mphf.index(kmer); + new_src_builders[builder_idx].set_val(slot, value); } } } diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index 907a580..ed6732a 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -117,7 +117,7 @@ impl KmerPartition { if !unitigs_path.exists() { continue; } let reader = UnitigFileReader::open_sequential(&unitigs_path)?; - let src_data = SrcLayerData::open(&src_layer_dir, mode, &src_meta.mode)?; + let src_data = SrcLayerData::open(&src_layer_dir, mode)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { let row = src_data.lookup(kmer, n_genomes); @@ -182,7 +182,7 @@ impl KmerPartition { if !unitigs_path.exists() { continue; } let reader = UnitigFileReader::open_sequential(&unitigs_path)?; - let src_data = SrcLayerData::open(&src_layer_dir, mode, &src_meta.mode)?; + let src_data = SrcLayerData::open(&src_layer_dir, mode)?; for (kmer, _, _) in 
reader.iter_indexed_canonical_kmers() { let row = src_data.lookup(kmer, n_genomes); diff --git a/src/obilayeredmap/src/lib.rs b/src/obilayeredmap/src/lib.rs index 9b275a0..65ae2e2 100644 --- a/src/obilayeredmap/src/lib.rs +++ b/src/obilayeredmap/src/lib.rs @@ -12,4 +12,4 @@ pub use layer::{Hit, Layer, LayerData}; pub use layered_store::LayeredStore; pub use map::LayeredMap; pub use meta::{IndexMode, PartitionMeta}; -pub use mphf_layer::MphfLayer; +pub use mphf_layer::{MphfLayer, MphfOnly}; diff --git a/src/obilayeredmap/src/mphf_layer.rs b/src/obilayeredmap/src/mphf_layer.rs index 2c318ec..ac6f33d 100644 --- a/src/obilayeredmap/src/mphf_layer.rs +++ b/src/obilayeredmap/src/mphf_layer.rs @@ -129,7 +129,31 @@ impl MphfLayer { } pub fn n(&self) -> usize { self.n } +} +// ── MphfOnly ────────────────────────────────────────────────────────────────── + +/// Lightweight wrapper that loads only the MPHF file, without evidence or unitigs. +/// +/// Use this when the caller guarantees that all queried kmers are in the MPHF +/// domain (e.g. when iterating the source's own unitigs during merge). +pub struct MphfOnly(Mphf); + +impl MphfOnly { + pub fn open(dir: &Path) -> OLMResult { + let mphf: Mphf = Mphf::load_full(&dir.join(MPHF_FILE)) + .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; + Ok(Self(mphf)) + } + + /// Return the slot for `kmer`. Only valid when `kmer` is in the MPHF domain. + #[inline] + pub fn index(&self, kmer: CanonicalKmer) -> usize { + self.0.index(&kmer.raw()) + } +} + +impl MphfLayer { // ── Build helpers ───────────────────────────────────────────────────────── pub fn unitig_writer(dir: &Path) -> OLMResult {