From 9d46400898737284aec82d12753bfedd55f92757 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 26 May 2026 09:41:13 +0200 Subject: [PATCH] feat: support exact and approximate evidence in layer construction Refactored `MphfLayer::build` to accept an `EvidenceKind` parameter, routing to exact (index-based, parallel MPHF, writes `evidence.bin`) or approximate (sequential mmap iterator, writes `fingerprint.bin`) pipelines. Introduced `CanonicalKmerIter` for memory-mapped, chunked k-mer iteration with O(1) resets via `Arc`. Updated layer and map APIs to forward evidence kind, added `push_layer` for count matrices, and adjusted tests and public exports accordingly. --- src/obikpartitionner/src/index_layer.rs | 10 +- src/obikpartitionner/src/merge_layer.rs | 4 +- src/obikpartitionner/src/rebuild_layer.rs | 4 +- src/obilayeredmap/src/layer.rs | 13 +- src/obilayeredmap/src/map.rs | 5 +- src/obilayeredmap/src/mphf_layer.rs | 156 +++++++++++++++------- src/obilayeredmap/src/tests/layer.rs | 8 +- src/obiskio/src/lib.rs | 3 +- src/obiskio/src/unitig_index.rs | 80 +++++++++++ 9 files changed, 215 insertions(+), 68 deletions(-) diff --git a/src/obikpartitionner/src/index_layer.rs b/src/obikpartitionner/src/index_layer.rs index 4665197..39069aa 100644 --- a/src/obikpartitionner/src/index_layer.rs +++ b/src/obikpartitionner/src/index_layer.rs @@ -110,7 +110,7 @@ impl KmerPartition { uw.close()?; if with_counts { - Layer::::build(&layer_dir, block_bits, |kmer| { + Layer::::build(&layer_dir, block_bits, evidence, |kmer| { match (&mphf1_opt, &counts1_opt) { (Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())), _ => 1, @@ -118,13 +118,7 @@ impl KmerPartition { }) .map_err(olm_to_sk)?; } else { - Layer::<()>::build(&layer_dir, block_bits).map_err(olm_to_sk)?; - } - - // For approximate evidence: replace the exact evidence bundle with a - // fingerprint. For exact evidence, build() already wrote it. - if let EvidenceKind::Approx { b, z } = evidence { - Layer::<()>::build_approx_evidence(&layer_dir, *b, *z).map_err(olm_to_sk)?; + Layer::<()>::build(&layer_dir, block_bits, evidence).map_err(olm_to_sk)?; } // Write meta.json in the index/ directory so LayeredMap::open works diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index c07de97..d9d5a38 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -9,7 +9,7 @@ use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentCompactIntVecBuilder}; use obikseq::CanonicalKmer; use obiskio::{SKError, SKResult, UnitigFileReader}; -use obilayeredmap::{Layer, LayeredMap, MphfLayer, OLMError}; +use obilayeredmap::{EvidenceKind, Layer, LayeredMap, MphfLayer, OLMError}; use obilayeredmap::meta::PartitionMeta; use crate::partition::KmerPartition; @@ -217,7 +217,7 @@ impl KmerPartition { uw.write(&unitig)?; } uw.close()?; - Layer::<()>::build(&new_layer_dir, block_bits).map_err(olm_to_sk)?; + Layer::<()>::build(&new_layer_dir, block_bits, &EvidenceKind::Exact).map_err(olm_to_sk)?; } drop(g); diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index 6ba32e4..eb30cb2 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -8,7 +8,7 @@ use obicompactvec::{PersistentBitMatrixBuilder, PersistentCompactIntVecBuilder}; use obidebruinj::GraphDeBruijn; use obiskio::{SKError, SKResult, UnitigFileReader}; -use obilayeredmap::{Layer, MphfLayer, OLMError}; +use obilayeredmap::{EvidenceKind, Layer, MphfLayer, OLMError}; use obilayeredmap::meta::PartitionMeta; use crate::filter::KmerFilter; @@ -146,7 +146,7 @@ impl KmerPartition { uw.close()?; drop(g); - Layer::<()>::build(&dst_layer_dir, block_bits).map_err(olm_to_sk)?; + Layer::<()>::build(&dst_layer_dir, block_bits, &EvidenceKind::Exact).map_err(olm_to_sk)?; let dst_mphf = MphfLayer::open(&dst_layer_dir).map_err(olm_to_sk)?; // ── Prepare matrix builders (one column per genome) ─────────────────── diff --git a/src/obilayeredmap/src/layer.rs b/src/obilayeredmap/src/layer.rs index 6a4f06f..cca2a4f 100644 --- a/src/obilayeredmap/src/layer.rs +++ b/src/obilayeredmap/src/layer.rs @@ -102,8 +102,8 @@ impl Layer { // ── Mode 1 — set membership ─────────────────────────────────────────────────── impl Layer<()> { - pub fn build(out_dir: &Path, block_bits: u8) -> OLMResult { - MphfLayer::build(out_dir, block_bits, &mut |_, _| Ok(())) + pub fn build(out_dir: &Path, block_bits: u8, evidence_kind: &EvidenceKind) -> OLMResult { + MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |_, _| Ok(())) } /// Create a presence matrix for a set-membership layer (first merge). @@ -126,6 +126,7 @@ impl Layer { pub fn build( out_dir: &Path, block_bits: u8, + evidence_kind: &EvidenceKind, count_of: impl Fn(CanonicalKmer) -> u32, ) -> OLMResult { let n = UnitigFileReader::open_sequential(&out_dir.join(UNITIGS_FILE))?.n_kmers(); @@ -133,7 +134,7 @@ impl Layer { let mut mb = PersistentCompactIntMatrixBuilder::new(n, &counts_dir) .map_err(OLMError::Io)?; let mut col = mb.add_col().map_err(OLMError::Io)?; - let n_built = MphfLayer::build(out_dir, block_bits, &mut |slot, kmer| { + let n_built = MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |slot, kmer| { col.set(slot, count_of(kmer)); Ok(()) })?; @@ -145,9 +146,10 @@ impl Layer { pub fn build_from_map( out_dir: &Path, block_bits: u8, + evidence_kind: &EvidenceKind, counts: &HashMap, ) -> OLMResult { - Self::build(out_dir, block_bits, |kmer| counts.get(&kmer).copied().unwrap_or(0)) + Self::build(out_dir, block_bits, evidence_kind, |kmer| counts.get(&kmer).copied().unwrap_or(0)) } } @@ -177,6 +179,7 @@ impl Layer { pub fn build_presence( out_dir: &Path, block_bits: u8, + evidence_kind: &EvidenceKind, n_genomes: usize, present_in: impl Fn(CanonicalKmer, usize) -> bool, ) -> OLMResult { @@ -186,7 +189,7 @@ impl Layer { let mut cols: Vec<_> = (0..n_genomes) .map(|_| mb.add_col().map_err(OLMError::Io)) .collect::>()?; - let n_built = MphfLayer::build(out_dir, block_bits, &mut |slot, kmer| { + let n_built = MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |slot, kmer| { for (g, col) in cols.iter_mut().enumerate() { col.set(slot, present_in(kmer, g)); } diff --git a/src/obilayeredmap/src/map.rs b/src/obilayeredmap/src/map.rs index da01aee..391ca99 100644 --- a/src/obilayeredmap/src/map.rs +++ b/src/obilayeredmap/src/map.rs @@ -5,6 +5,7 @@ use std::path::{Path, PathBuf}; use obicompactvec::PersistentCompactIntMatrix; use obikseq::CanonicalKmer; use obiskio::{UnitigFileWriter, DEFAULT_BLOCK_BITS}; +use crate::meta::EvidenceKind; use crate::error::OLMResult; use crate::layer::{Hit, Layer, LayerData}; @@ -90,7 +91,7 @@ impl LayeredMap<()> { pub fn push_layer(&mut self) -> OLMResult { let i = self.layers.len(); let dir = layer_dir(&self.root, i); - Layer::<()>::build(&dir, DEFAULT_BLOCK_BITS)?; + Layer::<()>::build(&dir, DEFAULT_BLOCK_BITS, &EvidenceKind::Exact)?; self.append_layer()?; Ok(i) } @@ -102,7 +103,7 @@ impl LayeredMap { pub fn push_layer(&mut self, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult { let i = self.layers.len(); let dir = layer_dir(&self.root, i); - Layer::::build(&dir, DEFAULT_BLOCK_BITS, count_of)?; + Layer::::build(&dir, DEFAULT_BLOCK_BITS, &EvidenceKind::Exact, count_of)?; self.append_layer()?; Ok(i) } diff --git a/src/obilayeredmap/src/mphf_layer.rs b/src/obilayeredmap/src/mphf_layer.rs index 3bac003..0e1e0f8 100644 --- a/src/obilayeredmap/src/mphf_layer.rs +++ b/src/obilayeredmap/src/mphf_layer.rs @@ -4,7 +4,7 @@ use std::path::Path; use cacheline_ef::{CachelineEf, CachelineEfVec}; use epserde::prelude::*; use obikseq::CanonicalKmer; -use obiskio::{UnitigFileReader, UnitigFileWriter, build_unitig_idx}; +use obiskio::{CanonicalKmerIter, UnitigFileReader, UnitigFileWriter, build_unitig_idx}; use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64}; use crate::error::{OLMError, OLMResult}; @@ -198,65 +198,131 @@ impl MphfLayer { Ok(n) } - /// Build MPHF and exact evidence from the unitigs file already present in - /// `dir`. Calls `fill_slot(slot, kmer)` once per kmer for DataStore - /// population. Returns the number of kmers indexed. + /// Build MPHF then evidence from the unitigs file already present in `dir`. + /// + /// - Exact: `.idx` is built for pass-1 parallel construction and kept for + /// query-time kmer verification. `evidence.bin` is written. + /// - Approx: pass-1 uses `open_sequential` + `par_bridge` — no `.idx` is + /// ever created. `fingerprint.bin` is written. + /// + /// `fill_slot(slot, kmer)` is called once per kmer in both modes. pub(crate) fn build( dir: &Path, block_bits: u8, + evidence_kind: &EvidenceKind, fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>, ) -> OLMResult { use rayon::prelude::*; let unitig_path = dir.join(UNITIGS_FILE); - build_unitig_idx(&unitig_path, block_bits)?; + match evidence_kind { + // ── Exact path ──────────────────────────────────────────────────── + EvidenceKind::Exact => { + build_unitig_idx(&unitig_path, block_bits)?; - let unitigs = UnitigFileReader::open(&unitig_path)?; - let n = unitigs.n_kmers(); + let unitigs = UnitigFileReader::open(&unitig_path)?; + let n = unitigs.n_kmers(); - if n == 0 { - fs::File::create(dir.join(EVIDENCE_FILE))?; - let mphf: Mphf = - Mphf::try_new(&[] as &[u64], PtrHashParams::::default()) - .ok_or_else(|| OLMError::Mphf("construction failed".into()))?; - mphf.store(&dir.join(MPHF_FILE)) - .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; - LayerMeta::exact().save(dir)?; - return Ok(0); - } + if n == 0 { + fs::File::create(dir.join(EVIDENCE_FILE))?; + let mphf: Mphf = + Mphf::try_new(&[] as &[u64], PtrHashParams::::default()) + .ok_or_else(|| OLMError::Mphf("construction failed".into()))?; + mphf.store(&dir.join(MPHF_FILE)) + .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; + LayerMeta::exact().save(dir)?; + return Ok(0); + } - // Pass 1 — build MPHF (parallel, random access via .idx) - let keys = (0..unitigs.len()) - .into_par_iter() - .flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw())); - let mphf: Mphf = - Mphf::new_from_par_iter(n, keys, PtrHashParams::::default()); - mphf.store(&dir.join(MPHF_FILE)) - .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; + // Pass 1 — parallel MPHF via random access (.idx required) + let keys = (0..unitigs.len()) + .into_par_iter() + .flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw())); + let mphf: Mphf = + Mphf::new_from_par_iter(n, keys, PtrHashParams::::default()); + mphf.store(&dir.join(MPHF_FILE)) + .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; - // Pass 2 — fill evidence + mode-specific data via callback - let unitigs2 = UnitigFileReader::open(&unitig_path)?; - let mut ev = EvidenceWriter::new(n); - let mut seen = vec![0u8; (n + 7) / 8]; + // Pass 2 — sequential: fill evidence.bin + callback + let unitigs2 = UnitigFileReader::open_sequential(&unitig_path)?; + let mut ev = EvidenceWriter::new(n); + let mut seen = vec![0u8; (n + 7) / 8]; - for (kmer, chunk_id, rank) in unitigs2.iter_indexed_canonical_kmers() { - let slot = mphf.index(&kmer.raw()); - if slot >= n { - return Err(OLMError::Mphf("slot out of bounds".into())); + for (kmer, chunk_id, rank) in unitigs2.iter_indexed_canonical_kmers() { + let slot = mphf.index(&kmer.raw()); + if slot >= n { + return Err(OLMError::Mphf("slot out of bounds".into())); + } + let byte = slot / 8; + let bit = 1u8 << (slot % 8); + if seen[byte] & bit != 0 { + return Err(OLMError::Mphf("duplicate slot".into())); + } + seen[byte] |= bit; + ev.set(slot, chunk_id as u32, rank as u8); + fill_slot(slot, kmer)?; + } + + ev.write(&dir.join(EVIDENCE_FILE))?; + LayerMeta::exact().save(dir)?; + Ok(n) } - let byte = slot / 8; - let bit = 1u8 << (slot % 8); - if seen[byte] & bit != 0 { - return Err(OLMError::Mphf("duplicate slot".into())); - } - seen[byte] |= bit; - ev.set(slot, chunk_id as u32, rank as u8); - fill_slot(slot, kmer)?; - } - ev.write(&dir.join(EVIDENCE_FILE))?; - LayerMeta::exact().save(dir)?; - Ok(n) + // ── Approx path ─────────────────────────────────────────────────── + // No .idx is created at any point. + EvidenceKind::Approx { b, z } => { + let unitigs = UnitigFileReader::open_sequential(&unitig_path)?; + let n = unitigs.n_kmers(); + + if n == 0 { + FingerprintVecWriter::new(0, *b).write(&dir.join(FINGERPRINT_FILE))?; + let mphf: Mphf = + Mphf::try_new(&[] as &[u64], PtrHashParams::::default()) + .ok_or_else(|| OLMError::Mphf("construction failed".into()))?; + mphf.store(&dir.join(MPHF_FILE)) + .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; + LayerMeta::approx(*b, *z).save(dir)?; + return Ok(0); + } + + // Pass 1 — MPHF construction via mmap-backed clonable iterator. + // No .idx is created. par_bridge() parallelises the sequential scan; + // Clone on CanonicalKmerRawIter shares the Arc and resets to pos 0. + let keys = CanonicalKmerIter::new(&unitig_path) + .map_err(|e| match e { + obiskio::SKError::Io(io) => OLMError::Io(io), + e => OLMError::InvalidLayer(e.to_string()), + })?; + let mphf: Mphf = + Mphf::new_from_par_iter(n, keys.map(|k| k.raw()).par_bridge(), PtrHashParams::::default()); + mphf.store(&dir.join(MPHF_FILE)) + .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; + + // Pass 2 — sequential: fill fingerprint.bin + callback + let unitigs2 = UnitigFileReader::open_sequential(&unitig_path)?; + let mut fw = FingerprintVecWriter::new(n, *b); + let mut seen = vec![0u8; (n + 7) / 8]; + + for kmer in unitigs2.iter_canonical_kmers() { + let slot = mphf.index(&kmer.raw()); + if slot >= n { + return Err(OLMError::Mphf("slot out of bounds".into())); + } + let byte = slot / 8; + let bit = 1u8 << (slot % 8); + if seen[byte] & bit != 0 { + return Err(OLMError::Mphf("duplicate slot".into())); + } + seen[byte] |= bit; + fw.set(slot, kmer.seq_hash()); + fill_slot(slot, kmer)?; + } + + fw.write(&dir.join(FINGERPRINT_FILE))?; + LayerMeta::approx(*b, *z).save(dir)?; + Ok(n) + } + } } } diff --git a/src/obilayeredmap/src/tests/layer.rs b/src/obilayeredmap/src/tests/layer.rs index 2a9c3da..308fb0d 100644 --- a/src/obilayeredmap/src/tests/layer.rs +++ b/src/obilayeredmap/src/tests/layer.rs @@ -2,6 +2,7 @@ use super::*; use obicompactvec::PersistentCompactIntMatrix; use obikseq::{set_k, Kmer, Sequence as _, Unitig}; use obiskio::DEFAULT_BLOCK_BITS; +use crate::meta::EvidenceKind; use tempfile::tempdir; fn write_unitigs(dir: &Path, seqs: &[&[u8]]) { @@ -24,7 +25,7 @@ fn build_and_query_all_kmers_found() { let dir = tempdir().unwrap(); write_unitigs(dir.path(), &[b"AAAACGT"]); let kmers = all_canonical_kmers(dir.path()); - Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS).unwrap(); + Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact).unwrap(); let layer = Layer::<()>::open(dir.path()).unwrap(); for kmer in kmers { assert!(layer.query(kmer).is_some(), "kmer should be present"); @@ -42,6 +43,7 @@ fn counts_are_stored_and_retrieved() { Layer::::build( dir.path(), DEFAULT_BLOCK_BITS, + &EvidenceKind::Exact, |kmer| count_map.get(&kmer).copied().unwrap_or(0), ).unwrap(); let layer = Layer::::open(dir.path()).unwrap(); @@ -56,7 +58,7 @@ fn query_absent_returns_none() { set_k(4); let dir = tempdir().unwrap(); write_unitigs(dir.path(), &[b"AAAACGT"]); - Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS).unwrap(); + Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact).unwrap(); let layer = Layer::<()>::open(dir.path()).unwrap(); let absent = Kmer::from_ascii(b"CCCC").unwrap().canonical(); assert!(layer.query(absent).is_none()); @@ -67,7 +69,7 @@ fn open_after_build_is_consistent() { set_k(4); let dir = tempdir().unwrap(); write_unitigs(dir.path(), &[b"AAAACGT"]); - let n = Layer::::build(dir.path(), DEFAULT_BLOCK_BITS, |_| 7).unwrap(); + let n = Layer::::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact, |_| 7).unwrap(); assert_eq!(n, 4); let layer = Layer::::open(dir.path()).unwrap(); let kmer = Kmer::from_ascii(b"AAAA").unwrap().canonical(); diff --git a/src/obiskio/src/lib.rs b/src/obiskio/src/lib.rs index 1d960cc..61d9650 100644 --- a/src/obiskio/src/lib.rs +++ b/src/obiskio/src/lib.rs @@ -8,7 +8,8 @@ pub use error::{SKError, SKResult}; pub use meta::SKFileMeta; pub use pool::{SKFilePool, SKFileWriter, SharedPool, create_token, create_token_with}; pub use reader::{SKFileIter, SKFileReader}; -pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS}; +pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS, + CanonicalKmerIter}; use std::path::{Path, PathBuf}; diff --git a/src/obiskio/src/unitig_index.rs b/src/obiskio/src/unitig_index.rs index 53bdaa5..f5bf310 100644 --- a/src/obiskio/src/unitig_index.rs +++ b/src/obiskio/src/unitig_index.rs @@ -1,6 +1,7 @@ use std::fs::File; use std::io::{BufWriter, Write as _}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use memmap2::Mmap; use obikseq::{CanonicalKmer, Kmer, Unitig}; @@ -439,6 +440,85 @@ fn extract_kmer_raw(bytes: &[u8], j: usize, k: usize) -> u64 { raw << (64 - 2 * k) } +// ── CanonicalKmerRawIter ────────────────────────────────────────────────────── + +// ── CanonicalKmerIter ───────────────────────────────────────────────────────── + +/// Sequential iterator over [`CanonicalKmer`] from a `unitigs.bin` file. +/// +/// Holds an `Arc` so that `Clone` is O(1): both copies share the same +/// memory-mapped pages. Cloning resets the cursor to position 0 — this lets +/// ptr_hash's `new_from_par_iter` (which requires a `Clone`-able parallel +/// iterator via `par_bridge()`) make multiple passes without ever creating +/// a `.idx` file. +pub struct CanonicalKmerIter { + mmap: Arc, + k: usize, + chunk_pos: usize, // byte offset of the current chunk header + data_pos: usize, // byte offset of the current chunk's sequence bytes + n_kmers: usize, // kmers in current chunk + kmer_idx: usize, // next kmer index to yield within the current chunk +} + +impl CanonicalKmerIter { + pub fn new(path: &Path) -> SKResult { + let file = File::open(path).map_err(SKError::Io)?; + let mmap = Arc::new(unsafe { Mmap::map(&file).map_err(SKError::Io)? }); + let k = obikseq::params::k(); + let mut s = Self { mmap, k, chunk_pos: 0, data_pos: 0, n_kmers: 0, kmer_idx: 0 }; + s.load_chunk(); + Ok(s) + } + + #[inline] + fn load_chunk(&mut self) { + if self.chunk_pos < self.mmap.len() { + let seql_minus_k = self.mmap[self.chunk_pos] as usize; + self.n_kmers = seql_minus_k + 1; + self.data_pos = self.chunk_pos + 1; + self.kmer_idx = 0; + } + } +} + +impl Clone for CanonicalKmerIter { + fn clone(&self) -> Self { + let mut c = Self { + mmap: Arc::clone(&self.mmap), + k: self.k, + chunk_pos: 0, + data_pos: 0, + n_kmers: 0, + kmer_idx: 0, + }; + c.load_chunk(); + c + } +} + +impl Iterator for CanonicalKmerIter { + type Item = CanonicalKmer; + + #[inline] + fn next(&mut self) -> Option { + loop { + if self.chunk_pos >= self.mmap.len() { + return None; + } + if self.kmer_idx < self.n_kmers { + let raw = extract_kmer_raw(&self.mmap[self.data_pos..], self.kmer_idx, self.k); + let canon = canonical_raw(raw, self.k); + self.kmer_idx += 1; + return Some(CanonicalKmer::from_raw_unchecked(canon)); + } + let seql_minus_k = self.mmap[self.chunk_pos] as usize; + let byte_len = (seql_minus_k + self.k + 3) / 4; + self.chunk_pos += 1 + byte_len; + self.load_chunk(); + } + } +} + #[cfg(test)] #[path = "tests/unitig_index.rs"] mod tests;