refactor: delegate MPHF construction and I/O to MphfLayer
Extracts MPHF construction, evidence encoding, and unitig I/O into a new `MphfLayer` module. This removes direct dependencies on `Evidence`, `PersistentBitMatrix`, and `PersistentCompactIntMatrix` from `Layer`. The `query` method is simplified to perform direct MPHF lookups, while build logic and serialization are consolidated within the new module.
This commit is contained in:
+20
-123
@@ -1,27 +1,19 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
|
||||
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
||||
use epserde::prelude::*;
|
||||
use obicompactvec::{
|
||||
PersistentBitMatrix, PersistentBitMatrixBuilder,
|
||||
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
|
||||
};
|
||||
use obikseq::CanonicalKmer;
|
||||
use obiskio::{UnitigFileReader, UnitigFileWriter};
|
||||
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
|
||||
|
||||
use crate::error::{OLMError, OLMResult};
|
||||
use crate::evidence::{Evidence, EvidenceWriter};
|
||||
use crate::mphf_layer::MphfLayer;
|
||||
pub(crate) use crate::mphf_layer::UNITIGS_FILE;
|
||||
|
||||
pub(crate) const MPHF_FILE: &str = "mphf.bin";
|
||||
pub(crate) const UNITIGS_FILE: &str = "unitigs.bin";
|
||||
const EVIDENCE_FILE: &str = "evidence.bin";
|
||||
const COUNTS_DIR: &str = "counts";
|
||||
const PRESENCE_DIR: &str = "presence";
|
||||
|
||||
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
|
||||
const COUNTS_DIR: &str = "counts";
|
||||
const PRESENCE_DIR: &str = "presence";
|
||||
|
||||
// ── Trait ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -56,10 +48,8 @@ impl LayerData for PersistentBitMatrix {
|
||||
// ── Structures ────────────────────────────────────────────────────────────────
|
||||
|
||||
pub struct Layer<D: LayerData = ()> {
|
||||
mphf: Mphf,
|
||||
evidence: Evidence,
|
||||
unitigs: UnitigFileReader,
|
||||
data: D,
|
||||
mphf: MphfLayer,
|
||||
data: D,
|
||||
}
|
||||
|
||||
pub struct Hit<T = ()> {
|
||||
@@ -71,95 +61,27 @@ pub struct Hit<T = ()> {
|
||||
|
||||
impl<D: LayerData> Layer<D> {
|
||||
pub fn open(path: &Path) -> OLMResult<Self> {
|
||||
let mphf: Mphf = Mphf::load_full(&path.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
let unitigs = UnitigFileReader::open(&path.join(UNITIGS_FILE))?;
|
||||
let evidence = Evidence::open(&path.join(EVIDENCE_FILE))?;
|
||||
let data = D::open(path)?;
|
||||
Ok(Self { mphf, evidence, unitigs, data })
|
||||
let mphf = MphfLayer::open(path)?;
|
||||
let data = D::open(path)?;
|
||||
Ok(Self { mphf, data })
|
||||
}
|
||||
|
||||
pub fn query(&self, kmer: CanonicalKmer) -> Option<Hit<D::Item>> {
|
||||
let slot = self.mphf.index(&kmer.raw());
|
||||
let (chunk_id, rank) = self.evidence.decode(slot);
|
||||
if self.unitigs.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) {
|
||||
Some(Hit { slot, data: self.data.read(slot) })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
self.mphf.find(kmer).map(|slot| Hit { slot, data: self.data.read(slot) })
|
||||
}
|
||||
|
||||
pub fn n(&self) -> usize { self.mphf.n() }
|
||||
|
||||
pub fn unitig_writer(out_dir: &Path) -> OLMResult<UnitigFileWriter> {
|
||||
fs::create_dir_all(out_dir)?;
|
||||
Ok(UnitigFileWriter::create(&out_dir.join(UNITIGS_FILE))?)
|
||||
MphfLayer::unitig_writer(out_dir)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Build helpers (private) ───────────────────────────────────────────────────
|
||||
|
||||
fn build_mphf(out_dir: &Path, n: usize) -> OLMResult<Mphf> {
|
||||
use rayon::prelude::*;
|
||||
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
|
||||
let keys = (0..unitigs.len())
|
||||
.into_par_iter()
|
||||
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
|
||||
let mphf: Mphf = Mphf::new_from_par_iter(n, keys, PtrHashParams::<CubicEps>::default());
|
||||
mphf.store(&out_dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
Ok(mphf)
|
||||
}
|
||||
|
||||
fn build_second_pass(
|
||||
out_dir: &Path,
|
||||
n: usize,
|
||||
mphf: &Mphf,
|
||||
fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>,
|
||||
) -> OLMResult<()> {
|
||||
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
|
||||
let mut ev = EvidenceWriter::new(n);
|
||||
let mut seen = vec![0u8; (n + 7) / 8];
|
||||
|
||||
for (kmer, chunk_id, rank) in unitigs.iter_indexed_canonical_kmers() {
|
||||
let slot = mphf.index(&kmer.raw());
|
||||
if slot >= n {
|
||||
return Err(OLMError::Mphf("slot out of bounds".into()));
|
||||
}
|
||||
let byte = slot / 8;
|
||||
let bit = 1u8 << (slot % 8);
|
||||
if seen[byte] & bit != 0 {
|
||||
return Err(OLMError::Mphf("duplicate slot".into()));
|
||||
}
|
||||
seen[byte] |= bit;
|
||||
ev.set(slot, chunk_id as u32, rank as u8);
|
||||
fill_slot(slot, kmer)?;
|
||||
}
|
||||
|
||||
ev.write(&out_dir.join(EVIDENCE_FILE))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn empty_layer(out_dir: &Path) -> OLMResult<()> {
|
||||
fs::File::create(out_dir.join(EVIDENCE_FILE))?;
|
||||
let mphf: Mphf = Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
|
||||
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
|
||||
mphf.store(&out_dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Mode 1 — set membership ───────────────────────────────────────────────────
|
||||
|
||||
impl Layer<()> {
|
||||
pub fn build(out_dir: &Path) -> OLMResult<usize> {
|
||||
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
|
||||
let n = unitigs.n_kmers();
|
||||
if n == 0 {
|
||||
empty_layer(out_dir)?;
|
||||
return Ok(0);
|
||||
}
|
||||
let mphf = build_mphf(out_dir, n)?;
|
||||
build_second_pass(out_dir, n, &mphf, &mut |_, _| Ok(()))?;
|
||||
Ok(n)
|
||||
MphfLayer::build(out_dir, &mut |_, _| Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -167,28 +89,18 @@ impl Layer<()> {
|
||||
|
||||
impl Layer<PersistentCompactIntMatrix> {
|
||||
pub fn build(out_dir: &Path, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
|
||||
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
|
||||
let n = unitigs.n_kmers();
|
||||
let n = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?.n_kmers();
|
||||
let counts_dir = out_dir.join(COUNTS_DIR);
|
||||
if n == 0 {
|
||||
empty_layer(out_dir)?;
|
||||
let mut mb = PersistentCompactIntMatrixBuilder::new(0, &counts_dir)
|
||||
.map_err(OLMError::Io)?;
|
||||
mb.add_col().map_err(OLMError::Io)?.close().map_err(OLMError::Io)?;
|
||||
mb.close().map_err(OLMError::Io)?;
|
||||
return Ok(0);
|
||||
}
|
||||
let mphf = build_mphf(out_dir, n)?;
|
||||
let mut mb = PersistentCompactIntMatrixBuilder::new(n, &counts_dir)
|
||||
.map_err(OLMError::Io)?;
|
||||
let mut col = mb.add_col().map_err(OLMError::Io)?;
|
||||
build_second_pass(out_dir, n, &mphf, &mut |slot, kmer| {
|
||||
let n_built = MphfLayer::build(out_dir, &mut |slot, kmer| {
|
||||
col.set(slot, count_of(kmer));
|
||||
Ok(())
|
||||
})?;
|
||||
col.close().map_err(OLMError::Io)?;
|
||||
mb.close().map_err(OLMError::Io)?;
|
||||
Ok(n)
|
||||
Ok(n_built)
|
||||
}
|
||||
|
||||
pub fn build_from_map(
|
||||
@@ -207,38 +119,23 @@ impl Layer<PersistentBitMatrix> {
|
||||
n_genomes: usize,
|
||||
present_in: impl Fn(CanonicalKmer, usize) -> bool,
|
||||
) -> OLMResult<usize> {
|
||||
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
|
||||
let n = unitigs.n_kmers();
|
||||
let n = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?.n_kmers();
|
||||
let presence_dir = out_dir.join(PRESENCE_DIR);
|
||||
if n == 0 {
|
||||
empty_layer(out_dir)?;
|
||||
let mut mb = PersistentBitMatrixBuilder::new(0, &presence_dir)
|
||||
.map_err(OLMError::Io)?;
|
||||
for _ in 0..n_genomes {
|
||||
mb.add_col().map_err(OLMError::Io)?.close().map_err(OLMError::Io)?;
|
||||
}
|
||||
mb.close().map_err(OLMError::Io)?;
|
||||
return Ok(0);
|
||||
}
|
||||
let mphf = build_mphf(out_dir, n)?;
|
||||
|
||||
let mut mb = PersistentBitMatrixBuilder::new(n, &presence_dir).map_err(OLMError::Io)?;
|
||||
let mut cols: Vec<_> = (0..n_genomes)
|
||||
.map(|_| mb.add_col().map_err(OLMError::Io))
|
||||
.collect::<OLMResult<_>>()?;
|
||||
|
||||
build_second_pass(out_dir, n, &mphf, &mut |slot, kmer| {
|
||||
let n_built = MphfLayer::build(out_dir, &mut |slot, kmer| {
|
||||
for (g, col) in cols.iter_mut().enumerate() {
|
||||
col.set(slot, present_in(kmer, g));
|
||||
}
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
for col in cols {
|
||||
col.close().map_err(OLMError::Io)?;
|
||||
}
|
||||
mb.close().map_err(OLMError::Io)?;
|
||||
Ok(n)
|
||||
Ok(n_built)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,8 +4,10 @@ pub mod layer;
|
||||
pub mod layered_store;
|
||||
pub mod map;
|
||||
pub mod meta;
|
||||
pub(crate) mod mphf_layer;
|
||||
|
||||
pub use error::{OLMError, OLMResult};
|
||||
pub use layer::{Hit, Layer, LayerData};
|
||||
pub use layered_store::LayeredStore;
|
||||
pub use map::LayeredMap;
|
||||
pub use mphf_layer::MphfLayer;
|
||||
|
||||
@@ -0,0 +1,113 @@
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
|
||||
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
||||
use epserde::prelude::*;
|
||||
use obikseq::CanonicalKmer;
|
||||
use obiskio::{UnitigFileReader, UnitigFileWriter};
|
||||
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
|
||||
|
||||
use crate::error::{OLMError, OLMResult};
|
||||
use crate::evidence::{Evidence, EvidenceWriter};
|
||||
|
||||
pub(crate) const MPHF_FILE: &str = "mphf.bin";
|
||||
pub(crate) const UNITIGS_FILE: &str = "unitigs.bin";
|
||||
pub(crate) const EVIDENCE_FILE: &str = "evidence.bin";
|
||||
|
||||
pub(crate) type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
|
||||
|
||||
/// Autonomous kmer → slot mapping for one layer.
|
||||
///
|
||||
/// Answers presence/absence queries without any attached DataStore.
|
||||
/// Build once, never rebuilt — data stores are attached and derived externally.
|
||||
pub struct MphfLayer {
|
||||
mphf: Mphf,
|
||||
evidence: Evidence,
|
||||
unitigs: UnitigFileReader,
|
||||
n: usize,
|
||||
}
|
||||
|
||||
impl MphfLayer {
|
||||
pub fn open(dir: &Path) -> OLMResult<Self> {
|
||||
let mphf: Mphf = Mphf::load_full(&dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
let unitigs = UnitigFileReader::open(&dir.join(UNITIGS_FILE))?;
|
||||
let evidence = Evidence::open(&dir.join(EVIDENCE_FILE))?;
|
||||
let n = evidence.len();
|
||||
Ok(Self { mphf, evidence, unitigs, n })
|
||||
}
|
||||
|
||||
/// Returns `Some(slot)` if `kmer` belongs to this layer, `None` otherwise.
|
||||
#[inline]
|
||||
pub fn find(&self, kmer: CanonicalKmer) -> Option<usize> {
|
||||
let slot = self.mphf.index(&kmer.raw());
|
||||
let (chunk_id, rank) = self.evidence.decode(slot);
|
||||
if self.unitigs.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) {
|
||||
Some(slot)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn n(&self) -> usize { self.n }
|
||||
|
||||
pub fn unitig_writer(dir: &Path) -> OLMResult<UnitigFileWriter> {
|
||||
fs::create_dir_all(dir)?;
|
||||
Ok(UnitigFileWriter::create(&dir.join(UNITIGS_FILE))?)
|
||||
}
|
||||
|
||||
/// Builds the MPHF and evidence from the unitigs file already present in `dir`.
|
||||
/// Calls `fill_slot(slot, kmer)` once per kmer (second pass) for DataStore population.
|
||||
/// Returns the number of kmers indexed.
|
||||
pub(crate) fn build(
|
||||
dir: &Path,
|
||||
fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>,
|
||||
) -> OLMResult<usize> {
|
||||
use rayon::prelude::*;
|
||||
|
||||
let unitigs = UnitigFileReader::open(&dir.join(UNITIGS_FILE))?;
|
||||
let n = unitigs.n_kmers();
|
||||
|
||||
if n == 0 {
|
||||
fs::File::create(dir.join(EVIDENCE_FILE))?;
|
||||
let mphf: Mphf =
|
||||
Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
|
||||
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
|
||||
mphf.store(&dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Pass 1 — build MPHF
|
||||
let keys = (0..unitigs.len())
|
||||
.into_par_iter()
|
||||
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
|
||||
let mphf: Mphf =
|
||||
Mphf::new_from_par_iter(n, keys, PtrHashParams::<CubicEps>::default());
|
||||
mphf.store(&dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
|
||||
// Pass 2 — fill evidence + mode-specific data via callback
|
||||
let unitigs2 = UnitigFileReader::open(&dir.join(UNITIGS_FILE))?;
|
||||
let mut ev = EvidenceWriter::new(n);
|
||||
let mut seen = vec![0u8; (n + 7) / 8];
|
||||
|
||||
for (kmer, chunk_id, rank) in unitigs2.iter_indexed_canonical_kmers() {
|
||||
let slot = mphf.index(&kmer.raw());
|
||||
if slot >= n {
|
||||
return Err(OLMError::Mphf("slot out of bounds".into()));
|
||||
}
|
||||
let byte = slot / 8;
|
||||
let bit = 1u8 << (slot % 8);
|
||||
if seen[byte] & bit != 0 {
|
||||
return Err(OLMError::Mphf("duplicate slot".into()));
|
||||
}
|
||||
seen[byte] |= bit;
|
||||
ev.set(slot, chunk_id as u32, rank as u8);
|
||||
fill_slot(slot, kmer)?;
|
||||
}
|
||||
|
||||
ev.write(&dir.join(EVIDENCE_FILE))?;
|
||||
Ok(n)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user