diff --git a/src/obilayeredmap/src/layer.rs b/src/obilayeredmap/src/layer.rs index 6520fe6..166a072 100644 --- a/src/obilayeredmap/src/layer.rs +++ b/src/obilayeredmap/src/layer.rs @@ -1,27 +1,19 @@ use std::collections::HashMap; -use std::fs; use std::path::Path; -use cacheline_ef::{CachelineEf, CachelineEfVec}; -use epserde::prelude::*; use obicompactvec::{ PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, }; use obikseq::CanonicalKmer; use obiskio::{UnitigFileReader, UnitigFileWriter}; -use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64}; use crate::error::{OLMError, OLMResult}; -use crate::evidence::{Evidence, EvidenceWriter}; +use crate::mphf_layer::MphfLayer; +pub(crate) use crate::mphf_layer::UNITIGS_FILE; -pub(crate) const MPHF_FILE: &str = "mphf.bin"; -pub(crate) const UNITIGS_FILE: &str = "unitigs.bin"; -const EVIDENCE_FILE: &str = "evidence.bin"; -const COUNTS_DIR: &str = "counts"; -const PRESENCE_DIR: &str = "presence"; - -type Mphf = PtrHash>, Xx64, Vec>; +const COUNTS_DIR: &str = "counts"; +const PRESENCE_DIR: &str = "presence"; // ── Trait ───────────────────────────────────────────────────────────────────── @@ -56,10 +48,8 @@ impl LayerData for PersistentBitMatrix { // ── Structures ──────────────────────────────────────────────────────────────── pub struct Layer { - mphf: Mphf, - evidence: Evidence, - unitigs: UnitigFileReader, - data: D, + mphf: MphfLayer, + data: D, } pub struct Hit { @@ -71,95 +61,27 @@ pub struct Hit { impl Layer { pub fn open(path: &Path) -> OLMResult { - let mphf: Mphf = Mphf::load_full(&path.join(MPHF_FILE)) - .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; - let unitigs = UnitigFileReader::open(&path.join(UNITIGS_FILE))?; - let evidence = Evidence::open(&path.join(EVIDENCE_FILE))?; - let data = D::open(path)?; - Ok(Self { mphf, evidence, unitigs, data }) + let mphf = MphfLayer::open(path)?; + let data = D::open(path)?; + Ok(Self { mphf, data }) } pub fn query(&self, kmer: CanonicalKmer) -> Option> { - let slot = self.mphf.index(&kmer.raw()); - let (chunk_id, rank) = self.evidence.decode(slot); - if self.unitigs.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) { - Some(Hit { slot, data: self.data.read(slot) }) - } else { - None - } + self.mphf.find(kmer).map(|slot| Hit { slot, data: self.data.read(slot) }) } + pub fn n(&self) -> usize { self.mphf.n() } + pub fn unitig_writer(out_dir: &Path) -> OLMResult { - fs::create_dir_all(out_dir)?; - Ok(UnitigFileWriter::create(&out_dir.join(UNITIGS_FILE))?) + MphfLayer::unitig_writer(out_dir) } } -// ── Build helpers (private) ─────────────────────────────────────────────────── - -fn build_mphf(out_dir: &Path, n: usize) -> OLMResult { - use rayon::prelude::*; - let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?; - let keys = (0..unitigs.len()) - .into_par_iter() - .flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw())); - let mphf: Mphf = Mphf::new_from_par_iter(n, keys, PtrHashParams::::default()); - mphf.store(&out_dir.join(MPHF_FILE)) - .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; - Ok(mphf) -} - -fn build_second_pass( - out_dir: &Path, - n: usize, - mphf: &Mphf, - fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>, -) -> OLMResult<()> { - let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?; - let mut ev = EvidenceWriter::new(n); - let mut seen = vec![0u8; (n + 7) / 8]; - - for (kmer, chunk_id, rank) in unitigs.iter_indexed_canonical_kmers() { - let slot = mphf.index(&kmer.raw()); - if slot >= n { - return Err(OLMError::Mphf("slot out of bounds".into())); - } - let byte = slot / 8; - let bit = 1u8 << (slot % 8); - if seen[byte] & bit != 0 { - return Err(OLMError::Mphf("duplicate slot".into())); - } - seen[byte] |= bit; - ev.set(slot, chunk_id as u32, rank as u8); - fill_slot(slot, kmer)?; - } - - ev.write(&out_dir.join(EVIDENCE_FILE))?; - Ok(()) -} - -fn empty_layer(out_dir: &Path) -> OLMResult<()> { - fs::File::create(out_dir.join(EVIDENCE_FILE))?; - let mphf: Mphf = Mphf::try_new(&[] as &[u64], PtrHashParams::::default()) - .ok_or_else(|| OLMError::Mphf("construction failed".into()))?; - mphf.store(&out_dir.join(MPHF_FILE)) - .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; - Ok(()) -} - // ── Mode 1 — set membership ─────────────────────────────────────────────────── impl Layer<()> { pub fn build(out_dir: &Path) -> OLMResult { - let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?; - let n = unitigs.n_kmers(); - if n == 0 { - empty_layer(out_dir)?; - return Ok(0); - } - let mphf = build_mphf(out_dir, n)?; - build_second_pass(out_dir, n, &mphf, &mut |_, _| Ok(()))?; - Ok(n) + MphfLayer::build(out_dir, &mut |_, _| Ok(())) } } @@ -167,28 +89,18 @@ impl Layer<()> { impl Layer { pub fn build(out_dir: &Path, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult { - let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?; - let n = unitigs.n_kmers(); + let n = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?.n_kmers(); let counts_dir = out_dir.join(COUNTS_DIR); - if n == 0 { - empty_layer(out_dir)?; - let mut mb = PersistentCompactIntMatrixBuilder::new(0, &counts_dir) - .map_err(OLMError::Io)?; - mb.add_col().map_err(OLMError::Io)?.close().map_err(OLMError::Io)?; - mb.close().map_err(OLMError::Io)?; - return Ok(0); - } - let mphf = build_mphf(out_dir, n)?; let mut mb = PersistentCompactIntMatrixBuilder::new(n, &counts_dir) .map_err(OLMError::Io)?; let mut col = mb.add_col().map_err(OLMError::Io)?; - build_second_pass(out_dir, n, &mphf, &mut |slot, kmer| { + let n_built = MphfLayer::build(out_dir, &mut |slot, kmer| { col.set(slot, count_of(kmer)); Ok(()) })?; col.close().map_err(OLMError::Io)?; mb.close().map_err(OLMError::Io)?; - Ok(n) + Ok(n_built) } pub fn build_from_map( @@ -207,38 +119,23 @@ impl Layer { n_genomes: usize, present_in: impl Fn(CanonicalKmer, usize) -> bool, ) -> OLMResult { - let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?; - let n = unitigs.n_kmers(); + let n = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?.n_kmers(); let presence_dir = out_dir.join(PRESENCE_DIR); - if n == 0 { - empty_layer(out_dir)?; - let mut mb = PersistentBitMatrixBuilder::new(0, &presence_dir) - .map_err(OLMError::Io)?; - for _ in 0..n_genomes { - mb.add_col().map_err(OLMError::Io)?.close().map_err(OLMError::Io)?; - } - mb.close().map_err(OLMError::Io)?; - return Ok(0); - } - let mphf = build_mphf(out_dir, n)?; - let mut mb = PersistentBitMatrixBuilder::new(n, &presence_dir).map_err(OLMError::Io)?; let mut cols: Vec<_> = (0..n_genomes) .map(|_| mb.add_col().map_err(OLMError::Io)) .collect::>()?; - - build_second_pass(out_dir, n, &mphf, &mut |slot, kmer| { + let n_built = MphfLayer::build(out_dir, &mut |slot, kmer| { for (g, col) in cols.iter_mut().enumerate() { col.set(slot, present_in(kmer, g)); } Ok(()) })?; - for col in cols { col.close().map_err(OLMError::Io)?; } mb.close().map_err(OLMError::Io)?; - Ok(n) + Ok(n_built) } } diff --git a/src/obilayeredmap/src/lib.rs b/src/obilayeredmap/src/lib.rs index 32b08d1..a883041 100644 --- a/src/obilayeredmap/src/lib.rs +++ b/src/obilayeredmap/src/lib.rs @@ -4,8 +4,10 @@ pub mod layer; pub mod layered_store; pub mod map; pub mod meta; +pub(crate) mod mphf_layer; pub use error::{OLMError, OLMResult}; pub use layer::{Hit, Layer, LayerData}; pub use layered_store::LayeredStore; pub use map::LayeredMap; +pub use mphf_layer::MphfLayer; diff --git a/src/obilayeredmap/src/mphf_layer.rs b/src/obilayeredmap/src/mphf_layer.rs new file mode 100644 index 0000000..5fcb677 --- /dev/null +++ b/src/obilayeredmap/src/mphf_layer.rs @@ -0,0 +1,113 @@ +use std::fs; +use std::path::Path; + +use cacheline_ef::{CachelineEf, CachelineEfVec}; +use epserde::prelude::*; +use obikseq::CanonicalKmer; +use obiskio::{UnitigFileReader, UnitigFileWriter}; +use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64}; + +use crate::error::{OLMError, OLMResult}; +use crate::evidence::{Evidence, EvidenceWriter}; + +pub(crate) const MPHF_FILE: &str = "mphf.bin"; +pub(crate) const UNITIGS_FILE: &str = "unitigs.bin"; +pub(crate) const EVIDENCE_FILE: &str = "evidence.bin"; + +pub(crate) type Mphf = PtrHash>, Xx64, Vec>; + +/// Autonomous kmer → slot mapping for one layer. +/// +/// Answers presence/absence queries without any attached DataStore. +/// Build once, never rebuilt — data stores are attached and derived externally. +pub struct MphfLayer { + mphf: Mphf, + evidence: Evidence, + unitigs: UnitigFileReader, + n: usize, +} + +impl MphfLayer { + pub fn open(dir: &Path) -> OLMResult { + let mphf: Mphf = Mphf::load_full(&dir.join(MPHF_FILE)) + .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; + let unitigs = UnitigFileReader::open(&dir.join(UNITIGS_FILE))?; + let evidence = Evidence::open(&dir.join(EVIDENCE_FILE))?; + let n = evidence.len(); + Ok(Self { mphf, evidence, unitigs, n }) + } + + /// Returns `Some(slot)` if `kmer` belongs to this layer, `None` otherwise. + #[inline] + pub fn find(&self, kmer: CanonicalKmer) -> Option { + let slot = self.mphf.index(&kmer.raw()); + let (chunk_id, rank) = self.evidence.decode(slot); + if self.unitigs.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) { + Some(slot) + } else { + None + } + } + + pub fn n(&self) -> usize { self.n } + + pub fn unitig_writer(dir: &Path) -> OLMResult { + fs::create_dir_all(dir)?; + Ok(UnitigFileWriter::create(&dir.join(UNITIGS_FILE))?) + } + + /// Builds the MPHF and evidence from the unitigs file already present in `dir`. + /// Calls `fill_slot(slot, kmer)` once per kmer (second pass) for DataStore population. + /// Returns the number of kmers indexed. + pub(crate) fn build( + dir: &Path, + fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>, + ) -> OLMResult { + use rayon::prelude::*; + + let unitigs = UnitigFileReader::open(&dir.join(UNITIGS_FILE))?; + let n = unitigs.n_kmers(); + + if n == 0 { + fs::File::create(dir.join(EVIDENCE_FILE))?; + let mphf: Mphf = + Mphf::try_new(&[] as &[u64], PtrHashParams::::default()) + .ok_or_else(|| OLMError::Mphf("construction failed".into()))?; + mphf.store(&dir.join(MPHF_FILE)) + .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; + return Ok(0); + } + + // Pass 1 — build MPHF + let keys = (0..unitigs.len()) + .into_par_iter() + .flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw())); + let mphf: Mphf = + Mphf::new_from_par_iter(n, keys, PtrHashParams::::default()); + mphf.store(&dir.join(MPHF_FILE)) + .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; + + // Pass 2 — fill evidence + mode-specific data via callback + let unitigs2 = UnitigFileReader::open(&dir.join(UNITIGS_FILE))?; + let mut ev = EvidenceWriter::new(n); + let mut seen = vec![0u8; (n + 7) / 8]; + + for (kmer, chunk_id, rank) in unitigs2.iter_indexed_canonical_kmers() { + let slot = mphf.index(&kmer.raw()); + if slot >= n { + return Err(OLMError::Mphf("slot out of bounds".into())); + } + let byte = slot / 8; + let bit = 1u8 << (slot % 8); + if seen[byte] & bit != 0 { + return Err(OLMError::Mphf("duplicate slot".into())); + } + seen[byte] |= bit; + ev.set(slot, chunk_id as u32, rank as u8); + fill_slot(slot, kmer)?; + } + + ev.write(&dir.join(EVIDENCE_FILE))?; + Ok(n) + } +}