feat: add kmer iterators and optimize layered map performance
Replace `ph` with `ptr_hash` and introduce `epserde` and `rayon` dependencies. Refactor MPHF construction to leverage parallel iteration, eliminating intermediate `Vec<u64>` allocations and reducing memory footprint. Add an `n_kmers` field to track and serialize the total kmer count, and add three zero-allocation iterators for efficient chunk traversal. Include comprehensive unit tests for the new iterators and update CLAUDE.md to enforce explicit dependency validation policies.
This commit is contained in:
@@ -6,7 +6,9 @@ edition = "2024"
|
||||
[dependencies]
|
||||
obikseq = { path = "../obikseq" }
|
||||
obiskio = { path = "../obiskio" }
|
||||
ph = "0.11"
|
||||
ptr_hash = "1.1"
|
||||
epserde = "0.8"
|
||||
rayon = "1"
|
||||
memmap2 = "0.9"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
|
||||
@@ -1,51 +1,61 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::io::BufWriter;
|
||||
use std::path::Path;
|
||||
|
||||
use obikseq::{CanonicalKmer, Kmer, Sequence};
|
||||
use epserde::prelude::*;
|
||||
use obikseq::CanonicalKmer;
|
||||
use obiskio::{UnitigFileReader, UnitigFileWriter};
|
||||
use ph::fmph;
|
||||
use ptr_hash::{DefaultPtrHash, PtrHashParams};
|
||||
|
||||
use crate::counts::{Counts, CountsWriter};
|
||||
use crate::error::{OLMError, OLMResult};
|
||||
use crate::evidence::{Evidence, EvidenceWriter};
|
||||
|
||||
const MPHF_FILE: &str = "mphf.bin";
|
||||
const UNITIGS_FILE: &str = "unitigs.bin";
|
||||
const MPHF_FILE: &str = "mphf.bin";
|
||||
const UNITIGS_FILE: &str = "unitigs.bin";
|
||||
const EVIDENCE_FILE: &str = "evidence.bin";
|
||||
const COUNTS_FILE: &str = "counts.bin";
|
||||
const COUNTS_FILE: &str = "counts.bin";
|
||||
|
||||
pub struct Layer {
|
||||
mphf: fmph::Function,
|
||||
mphf: DefaultPtrHash,
|
||||
evidence: Evidence,
|
||||
unitigs: UnitigFileReader,
|
||||
counts: Counts,
|
||||
unitigs: UnitigFileReader,
|
||||
counts: Counts,
|
||||
}
|
||||
|
||||
pub struct Hit {
|
||||
pub slot: usize,
|
||||
pub slot: usize,
|
||||
pub count: u32,
|
||||
}
|
||||
|
||||
impl Layer {
|
||||
pub fn open(path: &Path) -> OLMResult<Self> {
|
||||
let mphf = fmph::Function::read(
|
||||
&mut fs::File::open(path.join(MPHF_FILE))?
|
||||
).map_err(OLMError::Io)?;
|
||||
let mphf: DefaultPtrHash = DefaultPtrHash::load_full(&path.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
|
||||
let unitigs = UnitigFileReader::open(&path.join(UNITIGS_FILE))?;
|
||||
let unitigs = UnitigFileReader::open(&path.join(UNITIGS_FILE))?;
|
||||
let evidence = Evidence::open(&path.join(EVIDENCE_FILE))?;
|
||||
let counts = Counts::open(&path.join(COUNTS_FILE))?;
|
||||
let counts = Counts::open(&path.join(COUNTS_FILE))?;
|
||||
|
||||
Ok(Self { mphf, evidence, unitigs, counts })
|
||||
Ok(Self {
|
||||
mphf,
|
||||
evidence,
|
||||
unitigs,
|
||||
counts,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn query(&self, kmer: CanonicalKmer) -> Option<Hit> {
|
||||
let slot = self.mphf.get(&kmer.raw())? as usize;
|
||||
let slot = self.mphf.index(&kmer.raw());
|
||||
let (chunk_id, rank) = self.evidence.decode(slot);
|
||||
if self.unitigs.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) {
|
||||
Some(Hit { slot, count: self.counts.get(slot) })
|
||||
if self
|
||||
.unitigs
|
||||
.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer)
|
||||
{
|
||||
Some(Hit {
|
||||
slot,
|
||||
count: self.counts.get(slot),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -55,43 +65,38 @@ impl Layer {
|
||||
///
|
||||
/// `count_of` maps each canonical kmer to its occurrence count.
|
||||
/// Returns the number of kmers indexed.
|
||||
pub fn build(
|
||||
out_dir: &Path,
|
||||
count_of: impl Fn(CanonicalKmer) -> u32,
|
||||
) -> OLMResult<usize> {
|
||||
let k = obikseq::params::k();
|
||||
pub fn build(out_dir: &Path, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
|
||||
use rayon::prelude::*;
|
||||
|
||||
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
|
||||
let n = unitigs.n_kmers();
|
||||
|
||||
let mut entries: Vec<(u64, u32, u8)> = Vec::new();
|
||||
for chunk_id in 0..unitigs.len() {
|
||||
let n_kmers = unitigs.seql(chunk_id) - k + 1;
|
||||
for rank in 0..n_kmers {
|
||||
let raw = unitigs.raw_kmer(chunk_id, rank);
|
||||
let canonical: CanonicalKmer = Kmer::from_raw(raw).canonical();
|
||||
entries.push((canonical.raw(), chunk_id as u32, rank as u8));
|
||||
}
|
||||
}
|
||||
|
||||
let n = entries.len();
|
||||
if n == 0 {
|
||||
fs::File::create(out_dir.join(EVIDENCE_FILE))?;
|
||||
fs::File::create(out_dir.join(COUNTS_FILE))?;
|
||||
let mphf = fmph::Function::new(Vec::<u64>::new());
|
||||
mphf.write(&mut BufWriter::new(fs::File::create(out_dir.join(MPHF_FILE))?))?;
|
||||
let mphf: DefaultPtrHash = DefaultPtrHash::new(&[] as &[u64], PtrHashParams::default());
|
||||
mphf.store(&out_dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let keys: Vec<u64> = entries.iter().map(|(k, _, _)| *k).collect();
|
||||
let mphf = fmph::Function::new(keys);
|
||||
mphf.write(&mut BufWriter::new(fs::File::create(out_dir.join(MPHF_FILE))?))?;
|
||||
// Build MPHF from a cloneable parallel iterator — no Vec<u64> allocation.
|
||||
// flat_map_iter: outer chunks in parallel, inner kmer sliding-window sequential.
|
||||
let keys = (0..unitigs.len())
|
||||
.into_par_iter()
|
||||
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
|
||||
let mphf: DefaultPtrHash =
|
||||
DefaultPtrHash::new_from_par_iter(n, keys, PtrHashParams::default());
|
||||
mphf.store(&out_dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
|
||||
// Second pass: fill evidence and counts
|
||||
let mut ev = EvidenceWriter::new(n);
|
||||
let mut cnt = CountsWriter::new(n);
|
||||
|
||||
for (key, chunk_id, rank) in &entries {
|
||||
let slot = mphf.get(key).unwrap() as usize;
|
||||
ev.set(slot, *chunk_id, *rank);
|
||||
let kmer = CanonicalKmer::from_raw_unchecked(*key);
|
||||
for (kmer, chunk_id, rank) in unitigs.iter_indexed_canonical_kmers() {
|
||||
let slot = mphf.index(&kmer.raw());
|
||||
ev.set(slot, chunk_id as u32, rank as u8);
|
||||
cnt.set(slot, count_of(kmer));
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use super::*;
|
||||
use obikseq::{set_k, Unitig};
|
||||
use obikseq::{set_k, Kmer, Sequence as _, Unitig};
|
||||
use tempfile::tempdir;
|
||||
|
||||
fn write_unitigs(dir: &Path, seqs: &[&[u8]]) {
|
||||
|
||||
Reference in New Issue
Block a user