feat: support exact and approximate evidence in layer construction

Refactored `MphfLayer::build` to accept an `EvidenceKind` parameter, routing to exact (index-based, parallel MPHF, writes `evidence.bin`) or approximate (sequential mmap iterator, writes `fingerprint.bin`) pipelines. Introduced `CanonicalKmerIter` for memory-mapped, chunked k-mer iteration with O(1) resets via `Arc<Mmap>`. Updated layer and map APIs to forward evidence kind, added `push_layer` for count matrices, and adjusted tests and public exports accordingly.
This commit is contained in:
Eric Coissac
2026-05-26 09:41:13 +02:00
parent 036d044291
commit 9d46400898
9 changed files with 215 additions and 68 deletions
+2 -8
View File
@@ -110,7 +110,7 @@ impl KmerPartition {
uw.close()?;
if with_counts {
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, block_bits, |kmer| {
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, block_bits, evidence, |kmer| {
match (&mphf1_opt, &counts1_opt) {
(Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())),
_ => 1,
@@ -118,13 +118,7 @@ impl KmerPartition {
})
.map_err(olm_to_sk)?;
} else {
Layer::<()>::build(&layer_dir, block_bits).map_err(olm_to_sk)?;
}
// For approximate evidence: replace the exact evidence bundle with a
// fingerprint. For exact evidence, build() already wrote it.
if let EvidenceKind::Approx { b, z } = evidence {
Layer::<()>::build_approx_evidence(&layer_dir, *b, *z).map_err(olm_to_sk)?;
Layer::<()>::build(&layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
}
// Write meta.json in the index/ directory so LayeredMap::open works
+2 -2
View File
@@ -9,7 +9,7 @@ use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder,
PersistentCompactIntVecBuilder};
use obikseq::CanonicalKmer;
use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::{Layer, LayeredMap, MphfLayer, OLMError};
use obilayeredmap::{EvidenceKind, Layer, LayeredMap, MphfLayer, OLMError};
use obilayeredmap::meta::PartitionMeta;
use crate::partition::KmerPartition;
@@ -217,7 +217,7 @@ impl KmerPartition {
uw.write(&unitig)?;
}
uw.close()?;
Layer::<()>::build(&new_layer_dir, block_bits).map_err(olm_to_sk)?;
Layer::<()>::build(&new_layer_dir, block_bits, &EvidenceKind::Exact).map_err(olm_to_sk)?;
}
drop(g);
+2 -2
View File
@@ -8,7 +8,7 @@ use obicompactvec::{PersistentBitMatrixBuilder,
PersistentCompactIntVecBuilder};
use obidebruinj::GraphDeBruijn;
use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::{Layer, MphfLayer, OLMError};
use obilayeredmap::{EvidenceKind, Layer, MphfLayer, OLMError};
use obilayeredmap::meta::PartitionMeta;
use crate::filter::KmerFilter;
@@ -146,7 +146,7 @@ impl KmerPartition {
uw.close()?;
drop(g);
Layer::<()>::build(&dst_layer_dir, block_bits).map_err(olm_to_sk)?;
Layer::<()>::build(&dst_layer_dir, block_bits, &EvidenceKind::Exact).map_err(olm_to_sk)?;
let dst_mphf = MphfLayer::open(&dst_layer_dir).map_err(olm_to_sk)?;
// ── Prepare matrix builders (one column per genome) ───────────────────
+8 -5
View File
@@ -102,8 +102,8 @@ impl<D: LayerData> Layer<D> {
// ── Mode 1 — set membership ───────────────────────────────────────────────────
impl Layer<()> {
pub fn build(out_dir: &Path, block_bits: u8) -> OLMResult<usize> {
MphfLayer::build(out_dir, block_bits, &mut |_, _| Ok(()))
pub fn build(out_dir: &Path, block_bits: u8, evidence_kind: &EvidenceKind) -> OLMResult<usize> {
MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |_, _| Ok(()))
}
/// Create a presence matrix for a set-membership layer (first merge).
@@ -126,6 +126,7 @@ impl Layer<PersistentCompactIntMatrix> {
pub fn build(
out_dir: &Path,
block_bits: u8,
evidence_kind: &EvidenceKind,
count_of: impl Fn(CanonicalKmer) -> u32,
) -> OLMResult<usize> {
let n = UnitigFileReader::open_sequential(&out_dir.join(UNITIGS_FILE))?.n_kmers();
@@ -133,7 +134,7 @@ impl Layer<PersistentCompactIntMatrix> {
let mut mb = PersistentCompactIntMatrixBuilder::new(n, &counts_dir)
.map_err(OLMError::Io)?;
let mut col = mb.add_col().map_err(OLMError::Io)?;
let n_built = MphfLayer::build(out_dir, block_bits, &mut |slot, kmer| {
let n_built = MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |slot, kmer| {
col.set(slot, count_of(kmer));
Ok(())
})?;
@@ -145,9 +146,10 @@ impl Layer<PersistentCompactIntMatrix> {
pub fn build_from_map(
out_dir: &Path,
block_bits: u8,
evidence_kind: &EvidenceKind,
counts: &HashMap<CanonicalKmer, u32>,
) -> OLMResult<usize> {
Self::build(out_dir, block_bits, |kmer| counts.get(&kmer).copied().unwrap_or(0))
Self::build(out_dir, block_bits, evidence_kind, |kmer| counts.get(&kmer).copied().unwrap_or(0))
}
}
@@ -177,6 +179,7 @@ impl Layer<PersistentBitMatrix> {
pub fn build_presence(
out_dir: &Path,
block_bits: u8,
evidence_kind: &EvidenceKind,
n_genomes: usize,
present_in: impl Fn(CanonicalKmer, usize) -> bool,
) -> OLMResult<usize> {
@@ -186,7 +189,7 @@ impl Layer<PersistentBitMatrix> {
let mut cols: Vec<_> = (0..n_genomes)
.map(|_| mb.add_col().map_err(OLMError::Io))
.collect::<OLMResult<_>>()?;
let n_built = MphfLayer::build(out_dir, block_bits, &mut |slot, kmer| {
let n_built = MphfLayer::build(out_dir, block_bits, evidence_kind, &mut |slot, kmer| {
for (g, col) in cols.iter_mut().enumerate() {
col.set(slot, present_in(kmer, g));
}
+3 -2
View File
@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
use obicompactvec::PersistentCompactIntMatrix;
use obikseq::CanonicalKmer;
use obiskio::{UnitigFileWriter, DEFAULT_BLOCK_BITS};
use crate::meta::EvidenceKind;
use crate::error::OLMResult;
use crate::layer::{Hit, Layer, LayerData};
@@ -90,7 +91,7 @@ impl LayeredMap<()> {
pub fn push_layer(&mut self) -> OLMResult<usize> {
let i = self.layers.len();
let dir = layer_dir(&self.root, i);
Layer::<()>::build(&dir, DEFAULT_BLOCK_BITS)?;
Layer::<()>::build(&dir, DEFAULT_BLOCK_BITS, &EvidenceKind::Exact)?;
self.append_layer()?;
Ok(i)
}
@@ -102,7 +103,7 @@ impl LayeredMap<PersistentCompactIntMatrix> {
pub fn push_layer(&mut self, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
let i = self.layers.len();
let dir = layer_dir(&self.root, i);
Layer::<PersistentCompactIntMatrix>::build(&dir, DEFAULT_BLOCK_BITS, count_of)?;
Layer::<PersistentCompactIntMatrix>::build(&dir, DEFAULT_BLOCK_BITS, &EvidenceKind::Exact, count_of)?;
self.append_layer()?;
Ok(i)
}
+111 -45
View File
@@ -4,7 +4,7 @@ use std::path::Path;
use cacheline_ef::{CachelineEf, CachelineEfVec};
use epserde::prelude::*;
use obikseq::CanonicalKmer;
use obiskio::{UnitigFileReader, UnitigFileWriter, build_unitig_idx};
use obiskio::{CanonicalKmerIter, UnitigFileReader, UnitigFileWriter, build_unitig_idx};
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
use crate::error::{OLMError, OLMResult};
@@ -198,65 +198,131 @@ impl MphfLayer {
Ok(n)
}
/// Build MPHF and exact evidence from the unitigs file already present in
/// `dir`. Calls `fill_slot(slot, kmer)` once per kmer for DataStore
/// population. Returns the number of kmers indexed.
/// Build MPHF then evidence from the unitigs file already present in `dir`.
///
/// - Exact: `.idx` is built for pass-1 parallel construction and kept for
/// query-time kmer verification. `evidence.bin` is written.
/// - Approx: pass-1 uses `open_sequential` + `par_bridge` — no `.idx` is
/// ever created. `fingerprint.bin` is written.
///
/// `fill_slot(slot, kmer)` is called once per kmer in both modes.
pub(crate) fn build(
dir: &Path,
block_bits: u8,
evidence_kind: &EvidenceKind,
fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>,
) -> OLMResult<usize> {
use rayon::prelude::*;
let unitig_path = dir.join(UNITIGS_FILE);
build_unitig_idx(&unitig_path, block_bits)?;
match evidence_kind {
// ── Exact path ────────────────────────────────────────────────────
EvidenceKind::Exact => {
build_unitig_idx(&unitig_path, block_bits)?;
let unitigs = UnitigFileReader::open(&unitig_path)?;
let n = unitigs.n_kmers();
let unitigs = UnitigFileReader::open(&unitig_path)?;
let n = unitigs.n_kmers();
if n == 0 {
fs::File::create(dir.join(EVIDENCE_FILE))?;
let mphf: Mphf =
Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
mphf.store(&dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
LayerMeta::exact().save(dir)?;
return Ok(0);
}
if n == 0 {
fs::File::create(dir.join(EVIDENCE_FILE))?;
let mphf: Mphf =
Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
mphf.store(&dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
LayerMeta::exact().save(dir)?;
return Ok(0);
}
// Pass 1 — build MPHF (parallel, random access via .idx)
let keys = (0..unitigs.len())
.into_par_iter()
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
let mphf: Mphf =
Mphf::new_from_par_iter(n, keys, PtrHashParams::<CubicEps>::default());
mphf.store(&dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
// Pass 1 — parallel MPHF via random access (.idx required)
let keys = (0..unitigs.len())
.into_par_iter()
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
let mphf: Mphf =
Mphf::new_from_par_iter(n, keys, PtrHashParams::<CubicEps>::default());
mphf.store(&dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
// Pass 2 — fill evidence + mode-specific data via callback
let unitigs2 = UnitigFileReader::open(&unitig_path)?;
let mut ev = EvidenceWriter::new(n);
let mut seen = vec![0u8; (n + 7) / 8];
// Pass 2 — sequential: fill evidence.bin + callback
let unitigs2 = UnitigFileReader::open_sequential(&unitig_path)?;
let mut ev = EvidenceWriter::new(n);
let mut seen = vec![0u8; (n + 7) / 8];
for (kmer, chunk_id, rank) in unitigs2.iter_indexed_canonical_kmers() {
let slot = mphf.index(&kmer.raw());
if slot >= n {
return Err(OLMError::Mphf("slot out of bounds".into()));
for (kmer, chunk_id, rank) in unitigs2.iter_indexed_canonical_kmers() {
let slot = mphf.index(&kmer.raw());
if slot >= n {
return Err(OLMError::Mphf("slot out of bounds".into()));
}
let byte = slot / 8;
let bit = 1u8 << (slot % 8);
if seen[byte] & bit != 0 {
return Err(OLMError::Mphf("duplicate slot".into()));
}
seen[byte] |= bit;
ev.set(slot, chunk_id as u32, rank as u8);
fill_slot(slot, kmer)?;
}
ev.write(&dir.join(EVIDENCE_FILE))?;
LayerMeta::exact().save(dir)?;
Ok(n)
}
let byte = slot / 8;
let bit = 1u8 << (slot % 8);
if seen[byte] & bit != 0 {
return Err(OLMError::Mphf("duplicate slot".into()));
}
seen[byte] |= bit;
ev.set(slot, chunk_id as u32, rank as u8);
fill_slot(slot, kmer)?;
}
ev.write(&dir.join(EVIDENCE_FILE))?;
LayerMeta::exact().save(dir)?;
Ok(n)
// ── Approx path ───────────────────────────────────────────────────
// No .idx is created at any point.
EvidenceKind::Approx { b, z } => {
let unitigs = UnitigFileReader::open_sequential(&unitig_path)?;
let n = unitigs.n_kmers();
if n == 0 {
FingerprintVecWriter::new(0, *b).write(&dir.join(FINGERPRINT_FILE))?;
let mphf: Mphf =
Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
mphf.store(&dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
LayerMeta::approx(*b, *z).save(dir)?;
return Ok(0);
}
// Pass 1 — MPHF construction via mmap-backed clonable iterator.
// No .idx is created. par_bridge() parallelises the sequential scan;
// Clone on CanonicalKmerRawIter shares the Arc<Mmap> and resets to pos 0.
let keys = CanonicalKmerIter::new(&unitig_path)
.map_err(|e| match e {
obiskio::SKError::Io(io) => OLMError::Io(io),
e => OLMError::InvalidLayer(e.to_string()),
})?;
let mphf: Mphf =
Mphf::new_from_par_iter(n, keys.map(|k| k.raw()).par_bridge(), PtrHashParams::<CubicEps>::default());
mphf.store(&dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
// Pass 2 — sequential: fill fingerprint.bin + callback
let unitigs2 = UnitigFileReader::open_sequential(&unitig_path)?;
let mut fw = FingerprintVecWriter::new(n, *b);
let mut seen = vec![0u8; (n + 7) / 8];
for kmer in unitigs2.iter_canonical_kmers() {
let slot = mphf.index(&kmer.raw());
if slot >= n {
return Err(OLMError::Mphf("slot out of bounds".into()));
}
let byte = slot / 8;
let bit = 1u8 << (slot % 8);
if seen[byte] & bit != 0 {
return Err(OLMError::Mphf("duplicate slot".into()));
}
seen[byte] |= bit;
fw.set(slot, kmer.seq_hash());
fill_slot(slot, kmer)?;
}
fw.write(&dir.join(FINGERPRINT_FILE))?;
LayerMeta::approx(*b, *z).save(dir)?;
Ok(n)
}
}
}
}
+5 -3
View File
@@ -2,6 +2,7 @@ use super::*;
use obicompactvec::PersistentCompactIntMatrix;
use obikseq::{set_k, Kmer, Sequence as _, Unitig};
use obiskio::DEFAULT_BLOCK_BITS;
use crate::meta::EvidenceKind;
use tempfile::tempdir;
fn write_unitigs(dir: &Path, seqs: &[&[u8]]) {
@@ -24,7 +25,7 @@ fn build_and_query_all_kmers_found() {
let dir = tempdir().unwrap();
write_unitigs(dir.path(), &[b"AAAACGT"]);
let kmers = all_canonical_kmers(dir.path());
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS).unwrap();
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact).unwrap();
let layer = Layer::<()>::open(dir.path()).unwrap();
for kmer in kmers {
assert!(layer.query(kmer).is_some(), "kmer should be present");
@@ -42,6 +43,7 @@ fn counts_are_stored_and_retrieved() {
Layer::<PersistentCompactIntMatrix>::build(
dir.path(),
DEFAULT_BLOCK_BITS,
&EvidenceKind::Exact,
|kmer| count_map.get(&kmer).copied().unwrap_or(0),
).unwrap();
let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap();
@@ -56,7 +58,7 @@ fn query_absent_returns_none() {
set_k(4);
let dir = tempdir().unwrap();
write_unitigs(dir.path(), &[b"AAAACGT"]);
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS).unwrap();
Layer::<()>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact).unwrap();
let layer = Layer::<()>::open(dir.path()).unwrap();
let absent = Kmer::from_ascii(b"CCCC").unwrap().canonical();
assert!(layer.query(absent).is_none());
@@ -67,7 +69,7 @@ fn open_after_build_is_consistent() {
set_k(4);
let dir = tempdir().unwrap();
write_unitigs(dir.path(), &[b"AAAACGT"]);
let n = Layer::<PersistentCompactIntMatrix>::build(dir.path(), DEFAULT_BLOCK_BITS, |_| 7).unwrap();
let n = Layer::<PersistentCompactIntMatrix>::build(dir.path(), DEFAULT_BLOCK_BITS, &EvidenceKind::Exact, |_| 7).unwrap();
assert_eq!(n, 4);
let layer = Layer::<PersistentCompactIntMatrix>::open(dir.path()).unwrap();
let kmer = Kmer::from_ascii(b"AAAA").unwrap().canonical();
+2 -1
View File
@@ -8,7 +8,8 @@ pub use error::{SKError, SKResult};
pub use meta::SKFileMeta;
pub use pool::{SKFilePool, SKFileWriter, SharedPool, create_token, create_token_with};
pub use reader::{SKFileIter, SKFileReader};
pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS};
pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS,
CanonicalKmerIter};
use std::path::{Path, PathBuf};
+80
View File
@@ -1,6 +1,7 @@
use std::fs::File;
use std::io::{BufWriter, Write as _};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use memmap2::Mmap;
use obikseq::{CanonicalKmer, Kmer, Unitig};
@@ -439,6 +440,85 @@ fn extract_kmer_raw(bytes: &[u8], j: usize, k: usize) -> u64 {
raw << (64 - 2 * k)
}
// ── CanonicalKmerRawIter ──────────────────────────────────────────────────────
// ── CanonicalKmerIter ─────────────────────────────────────────────────────────
/// Sequential iterator over [`CanonicalKmer`] from a `unitigs.bin` file.
///
/// Holds an `Arc<Mmap>` so that `Clone` is O(1): both copies share the same
/// memory-mapped pages. Cloning resets the cursor to position 0 — this lets
/// ptr_hash's `new_from_par_iter` (which requires a `Clone`-able parallel
/// iterator via `par_bridge()`) make multiple passes without ever creating
/// a `.idx` file.
pub struct CanonicalKmerIter {
mmap: Arc<Mmap>,
k: usize,
chunk_pos: usize, // byte offset of the current chunk header
data_pos: usize, // byte offset of the current chunk's sequence bytes
n_kmers: usize, // kmers in current chunk
kmer_idx: usize, // next kmer index to yield within the current chunk
}
impl CanonicalKmerIter {
pub fn new(path: &Path) -> SKResult<Self> {
let file = File::open(path).map_err(SKError::Io)?;
let mmap = Arc::new(unsafe { Mmap::map(&file).map_err(SKError::Io)? });
let k = obikseq::params::k();
let mut s = Self { mmap, k, chunk_pos: 0, data_pos: 0, n_kmers: 0, kmer_idx: 0 };
s.load_chunk();
Ok(s)
}
#[inline]
fn load_chunk(&mut self) {
if self.chunk_pos < self.mmap.len() {
let seql_minus_k = self.mmap[self.chunk_pos] as usize;
self.n_kmers = seql_minus_k + 1;
self.data_pos = self.chunk_pos + 1;
self.kmer_idx = 0;
}
}
}
impl Clone for CanonicalKmerIter {
fn clone(&self) -> Self {
let mut c = Self {
mmap: Arc::clone(&self.mmap),
k: self.k,
chunk_pos: 0,
data_pos: 0,
n_kmers: 0,
kmer_idx: 0,
};
c.load_chunk();
c
}
}
impl Iterator for CanonicalKmerIter {
type Item = CanonicalKmer;
#[inline]
fn next(&mut self) -> Option<CanonicalKmer> {
loop {
if self.chunk_pos >= self.mmap.len() {
return None;
}
if self.kmer_idx < self.n_kmers {
let raw = extract_kmer_raw(&self.mmap[self.data_pos..], self.kmer_idx, self.k);
let canon = canonical_raw(raw, self.k);
self.kmer_idx += 1;
return Some(CanonicalKmer::from_raw_unchecked(canon));
}
let seql_minus_k = self.mmap[self.chunk_pos] as usize;
let byte_len = (seql_minus_k + self.k + 3) / 4;
self.chunk_pos += 1 + byte_len;
self.load_chunk();
}
}
}
#[cfg(test)]
#[path = "tests/unitig_index.rs"]
mod tests;