refactor: decouple unitig index generation and add exact evidence
Decouple index generation by introducing `build_unitig_idx()` for retroactive `.idx` creation and optional immediate writing on close. Add `open_sequential()` for index-less iteration while enforcing index requirements for random access. Refactor the MPHF layer to pre-generate the unitig index for parallel random access, integrate `rayon` for k-mer processing, and enforce mapping integrity via duplicate slot validation. Additionally, implement `build_exact_evidence()` to reconstruct evidence from existing artifacts, and update tests to leverage the new index generation and simplified k-mer iteration helpers.
This commit is contained in:
@@ -76,6 +76,14 @@ impl<D: LayerData> Layer<D> {
|
||||
pub fn unitig_writer(out_dir: &Path) -> OLMResult<UnitigFileWriter> {
|
||||
MphfLayer::unitig_writer(out_dir)
|
||||
}
|
||||
|
||||
/// Build `unitigs.bin.idx` and `evidence.bin` from `unitigs.bin` and
|
||||
/// `mphf.bin` already present in `layer_dir`.
|
||||
///
|
||||
/// See [`MphfLayer::build_exact_evidence`] for the full contract.
|
||||
pub fn build_exact_evidence(layer_dir: &Path) -> OLMResult<usize> {
|
||||
MphfLayer::build_exact_evidence(layer_dir)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Mode 1 — set membership ───────────────────────────────────────────────────
|
||||
@@ -106,7 +114,7 @@ impl Layer<()> {
|
||||
|
||||
impl Layer<PersistentCompactIntMatrix> {
|
||||
pub fn build(out_dir: &Path, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
|
||||
let n = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?.n_kmers();
|
||||
let n = UnitigFileReader::open_sequential(&out_dir.join(UNITIGS_FILE))?.n_kmers();
|
||||
let counts_dir = out_dir.join(COUNTS_DIR);
|
||||
let mut mb = PersistentCompactIntMatrixBuilder::new(n, &counts_dir)
|
||||
.map_err(OLMError::Io)?;
|
||||
@@ -158,7 +166,7 @@ impl Layer<PersistentBitMatrix> {
|
||||
n_genomes: usize,
|
||||
present_in: impl Fn(CanonicalKmer, usize) -> bool,
|
||||
) -> OLMResult<usize> {
|
||||
let n = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?.n_kmers();
|
||||
let n = UnitigFileReader::open_sequential(&out_dir.join(UNITIGS_FILE))?.n_kmers();
|
||||
let presence_dir = out_dir.join(PRESENCE_DIR);
|
||||
let mut mb = PersistentBitMatrixBuilder::new(n, &presence_dir).map_err(OLMError::Io)?;
|
||||
let mut cols: Vec<_> = (0..n_genomes)
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::path::Path;
|
||||
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
||||
use epserde::prelude::*;
|
||||
use obikseq::CanonicalKmer;
|
||||
use obiskio::{UnitigFileReader, UnitigFileWriter};
|
||||
use obiskio::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS};
|
||||
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
|
||||
|
||||
use crate::error::{OLMError, OLMResult};
|
||||
@@ -41,7 +41,6 @@ impl MphfLayer {
|
||||
#[inline]
|
||||
pub fn find(&self, kmer: CanonicalKmer) -> Option<usize> {
|
||||
let slot = self.mphf.index(&kmer.raw());
|
||||
// PtrHash guarantees slot < n only for its key set; arbitrary queries may exceed bounds.
|
||||
if slot >= self.n { return None; }
|
||||
let (chunk_id, rank) = self.evidence.decode(slot);
|
||||
if self.unitigs.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) {
|
||||
@@ -58,16 +57,73 @@ impl MphfLayer {
|
||||
Ok(UnitigFileWriter::create(&dir.join(UNITIGS_FILE))?)
|
||||
}
|
||||
|
||||
/// Builds the MPHF and evidence from the unitigs file already present in `dir`.
|
||||
/// Calls `fill_slot(slot, kmer)` once per kmer (second pass) for DataStore population.
|
||||
/// Returns the number of kmers indexed.
|
||||
/// Build `unitigs.bin.idx` and `evidence.bin` from `unitigs.bin` and
|
||||
/// `mphf.bin` already present in `dir`.
|
||||
///
|
||||
/// This is the exact-evidence construction route. It can be called:
|
||||
/// - after the initial build (via [`Self::build`] which calls it internally)
|
||||
/// - standalone to promote an existing (unitigs + mphf) into an exact index
|
||||
/// - standalone to rebuild evidence after a format change
|
||||
/// Build `unitigs.bin.idx` and `evidence.bin` from `unitigs.bin` and
|
||||
/// `mphf.bin` already present in `dir`.
|
||||
///
|
||||
/// Uses sequential iteration — no `.idx` is required on entry.
|
||||
/// Writes both `evidence.bin` and `unitigs.bin.idx` on success.
|
||||
pub fn build_exact_evidence(dir: &Path) -> OLMResult<usize> {
|
||||
let unitig_path = dir.join(UNITIGS_FILE);
|
||||
|
||||
// Sequential scan — no .idx required for iteration
|
||||
let unitigs = UnitigFileReader::open_sequential(&unitig_path)?;
|
||||
let n = unitigs.n_kmers();
|
||||
|
||||
if n == 0 {
|
||||
fs::File::create(dir.join(EVIDENCE_FILE))?;
|
||||
build_unitig_idx(&unitig_path, DEFAULT_BLOCK_BITS)?;
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let mphf: Mphf = Mphf::load_full(&dir.join(MPHF_FILE))
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
|
||||
let mut ev = EvidenceWriter::new(n);
|
||||
let mut seen = vec![0u8; (n + 7) / 8];
|
||||
|
||||
for (kmer, chunk_id, rank) in unitigs.iter_indexed_canonical_kmers() {
|
||||
let slot = mphf.index(&kmer.raw());
|
||||
if slot >= n {
|
||||
return Err(OLMError::Mphf("slot out of bounds".into()));
|
||||
}
|
||||
let byte = slot / 8;
|
||||
let bit = 1u8 << (slot % 8);
|
||||
if seen[byte] & bit != 0 {
|
||||
return Err(OLMError::Mphf("duplicate slot".into()));
|
||||
}
|
||||
seen[byte] |= bit;
|
||||
ev.set(slot, chunk_id as u32, rank as u8);
|
||||
}
|
||||
|
||||
ev.write(&dir.join(EVIDENCE_FILE))?;
|
||||
// Write .idx last: it is only needed for random access (queries).
|
||||
build_unitig_idx(&unitig_path, DEFAULT_BLOCK_BITS)?;
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
/// Build MPHF and exact evidence from the unitigs file already present in
|
||||
/// `dir`. Calls `fill_slot(slot, kmer)` once per kmer for DataStore
|
||||
/// population. Returns the number of kmers indexed.
|
||||
pub(crate) fn build(
|
||||
dir: &Path,
|
||||
fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>,
|
||||
) -> OLMResult<usize> {
|
||||
use rayon::prelude::*;
|
||||
|
||||
let unitigs = UnitigFileReader::open(&dir.join(UNITIGS_FILE))?;
|
||||
let unitig_path = dir.join(UNITIGS_FILE);
|
||||
|
||||
// Write .idx so that UnitigFileReader::open succeeds and parallel
|
||||
// random access is available for MPHF construction.
|
||||
build_unitig_idx(&unitig_path, DEFAULT_BLOCK_BITS)?;
|
||||
|
||||
let unitigs = UnitigFileReader::open(&unitig_path)?;
|
||||
let n = unitigs.n_kmers();
|
||||
|
||||
if n == 0 {
|
||||
@@ -80,7 +136,7 @@ impl MphfLayer {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Pass 1 — build MPHF
|
||||
// Pass 1 — build MPHF (parallel, uses random access via .idx)
|
||||
let keys = (0..unitigs.len())
|
||||
.into_par_iter()
|
||||
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
|
||||
@@ -90,7 +146,7 @@ impl MphfLayer {
|
||||
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
|
||||
|
||||
// Pass 2 — fill evidence + mode-specific data via callback
|
||||
let unitigs2 = UnitigFileReader::open(&dir.join(UNITIGS_FILE))?;
|
||||
let unitigs2 = UnitigFileReader::open(&unitig_path)?;
|
||||
let mut ev = EvidenceWriter::new(n);
|
||||
let mut seen = vec![0u8; (n + 7) / 8];
|
||||
|
||||
|
||||
@@ -11,16 +11,10 @@ fn write_unitigs(dir: &Path, seqs: &[&[u8]]) {
|
||||
w.close().unwrap();
|
||||
}
|
||||
|
||||
fn all_canonical_kmers(dir: &Path, k: usize) -> Vec<CanonicalKmer> {
|
||||
let r = UnitigFileReader::open(&dir.join(UNITIGS_FILE)).unwrap();
|
||||
let mut out = Vec::new();
|
||||
for ci in 0..r.len() {
|
||||
let n = r.seql(ci) - k + 1;
|
||||
for rank in 0..n {
|
||||
out.push(Kmer::from_raw(r.raw_kmer(ci, rank)).canonical());
|
||||
}
|
||||
}
|
||||
out
|
||||
fn all_canonical_kmers(dir: &Path) -> Vec<CanonicalKmer> {
|
||||
UnitigFileReader::open_sequential(&dir.join(UNITIGS_FILE)).unwrap()
|
||||
.iter_canonical_kmers()
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -28,7 +22,7 @@ fn build_and_query_all_kmers_found() {
|
||||
set_k(4);
|
||||
let dir = tempdir().unwrap();
|
||||
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
||||
let kmers = all_canonical_kmers(dir.path(), 4);
|
||||
let kmers = all_canonical_kmers(dir.path());
|
||||
Layer::<()>::build(dir.path()).unwrap();
|
||||
let layer = Layer::<()>::open(dir.path()).unwrap();
|
||||
for kmer in kmers {
|
||||
@@ -41,7 +35,7 @@ fn counts_are_stored_and_retrieved() {
|
||||
set_k(4);
|
||||
let dir = tempdir().unwrap();
|
||||
write_unitigs(dir.path(), &[b"AAAACGT"]);
|
||||
let kmers = all_canonical_kmers(dir.path(), 4);
|
||||
let kmers = all_canonical_kmers(dir.path());
|
||||
let count_map: HashMap<CanonicalKmer, u32> =
|
||||
kmers.iter().enumerate().map(|(i, &k)| (k, i as u32 + 1)).collect();
|
||||
Layer::<PersistentCompactIntMatrix>::build(
|
||||
|
||||
@@ -8,7 +8,7 @@ pub use error::{SKError, SKResult};
|
||||
pub use meta::SKFileMeta;
|
||||
pub use pool::{SKFilePool, SKFileWriter, SharedPool, create_token, create_token_with};
|
||||
pub use reader::{SKFileIter, SKFileReader};
|
||||
pub use unitig_index::{UnitigFileReader, UnitigFileWriter};
|
||||
pub use unitig_index::{UnitigFileReader, UnitigFileWriter, build_unitig_idx, DEFAULT_BLOCK_BITS};
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ fn write_read(seqs: &[&[u8]]) -> (tempfile::TempDir, UnitigFileReader) {
|
||||
w.write(&make_unitig(s)).unwrap();
|
||||
}
|
||||
w.close().unwrap();
|
||||
super::build_unitig_idx(&path, super::DEFAULT_BLOCK_BITS).unwrap();
|
||||
let r = UnitigFileReader::open(&path).unwrap();
|
||||
(dir, r)
|
||||
}
|
||||
@@ -31,6 +32,7 @@ fn roundtrip_empty_index() {
|
||||
let path = dir.path().join("unitigs.bin");
|
||||
let w = UnitigFileWriter::create(&path).unwrap();
|
||||
w.close().unwrap();
|
||||
super::build_unitig_idx(&path, super::DEFAULT_BLOCK_BITS).unwrap();
|
||||
let r = UnitigFileReader::open(&path).unwrap();
|
||||
assert_eq!(r.len(), 0);
|
||||
}
|
||||
@@ -145,6 +147,7 @@ fn iter_kmers_empty_file() {
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("unitigs.bin");
|
||||
UnitigFileWriter::create(&path).unwrap().close().unwrap();
|
||||
super::build_unitig_idx(&path, super::DEFAULT_BLOCK_BITS).unwrap();
|
||||
let r = UnitigFileReader::open(&path).unwrap();
|
||||
assert_eq!(r.iter_kmers().count(), 0);
|
||||
}
|
||||
@@ -280,6 +283,7 @@ fn write_read_bb(seqs: &[&[u8]], block_bits: u8) -> (tempfile::TempDir, UnitigFi
|
||||
w.write(&make_unitig(s)).unwrap();
|
||||
}
|
||||
w.close().unwrap();
|
||||
super::build_unitig_idx(&path, block_bits).unwrap();
|
||||
let r = UnitigFileReader::open(&path).unwrap();
|
||||
(dir, r)
|
||||
}
|
||||
@@ -293,6 +297,7 @@ fn block_bits_stored_and_read_back() {
|
||||
let w = UnitigFileWriter::create_with_block_bits(&path, bb).unwrap();
|
||||
assert_eq!(w.block_bits(), bb);
|
||||
w.close().unwrap();
|
||||
super::build_unitig_idx(&path, bb).unwrap();
|
||||
let r = UnitigFileReader::open(&path).unwrap();
|
||||
assert_eq!(r.block_bits(), bb, "block_bits={bb} not preserved");
|
||||
}
|
||||
|
||||
@@ -39,7 +39,6 @@ fn idx_path(path: &Path) -> PathBuf {
|
||||
/// Unitigs with more than [`MAX_KMERS_PER_CHUNK`] k-mers are transparently split
|
||||
/// into overlapping chunks (k−1 nucleotide overlap) so no k-mer is lost.
|
||||
pub struct UnitigFileWriter {
|
||||
path: PathBuf,
|
||||
file: BufWriter<File>,
|
||||
block_offsets: Vec<u32>,
|
||||
chunk_count: usize,
|
||||
@@ -64,7 +63,6 @@ impl UnitigFileWriter {
|
||||
assert!(block_bits <= 31, "block_bits must be ≤ 31");
|
||||
let file = File::create(path).map_err(SKError::Io)?;
|
||||
Ok(Self {
|
||||
path: path.to_owned(),
|
||||
file: BufWriter::new(file),
|
||||
block_offsets: Vec::new(),
|
||||
chunk_count: 0,
|
||||
@@ -122,19 +120,14 @@ impl UnitigFileWriter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flush and close the binary sequence file.
|
||||
///
|
||||
/// The companion `.idx` file is **not** written here; call
|
||||
/// [`build_unitig_idx`] separately when exact evidence is needed.
|
||||
pub fn close(mut self) -> SKResult<()> {
|
||||
self.file.flush().map_err(SKError::Io)?;
|
||||
drop(self.file);
|
||||
|
||||
self.block_offsets.push(self.next_offset);
|
||||
|
||||
write_idx(
|
||||
&idx_path(&self.path),
|
||||
self.chunk_count as u32,
|
||||
self.n_kmers as u64,
|
||||
self.block_bits,
|
||||
&self.block_offsets,
|
||||
)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize { self.chunk_count }
|
||||
@@ -160,6 +153,48 @@ fn write_idx(
|
||||
w.flush().map_err(SKError::Io)
|
||||
}
|
||||
|
||||
/// Scan an existing `unitigs.bin` file and write its companion `.idx`.
|
||||
///
|
||||
/// Called by the exact-evidence construction route after the sequence file is
|
||||
/// closed. `block_bits` controls index granularity (1 << block_bits chunks per
|
||||
/// offset entry); use [`DEFAULT_BLOCK_BITS`] for the default.
|
||||
pub fn build_unitig_idx(unitigs_path: &Path, block_bits: u8) -> SKResult<()> {
|
||||
assert!(block_bits <= 31, "block_bits must be ≤ 31");
|
||||
|
||||
let file = File::open(unitigs_path).map_err(SKError::Io)?;
|
||||
let mmap = unsafe { Mmap::map(&file).map_err(SKError::Io)? };
|
||||
|
||||
let k = obikseq::params::k();
|
||||
let block_size = 1usize << block_bits;
|
||||
let mask = block_size - 1;
|
||||
|
||||
let mut block_offsets: Vec<u32> = Vec::new();
|
||||
let mut offset = 0usize;
|
||||
let mut chunk_count = 0usize;
|
||||
let mut n_kmers = 0usize;
|
||||
|
||||
while offset < mmap.len() {
|
||||
if chunk_count & mask == 0 {
|
||||
block_offsets.push(offset as u32);
|
||||
}
|
||||
let seql_minus_k = mmap[offset] as usize;
|
||||
let byte_len = (seql_minus_k + k + 3) / 4;
|
||||
n_kmers += seql_minus_k + 1;
|
||||
offset += 1 + byte_len;
|
||||
chunk_count += 1;
|
||||
}
|
||||
|
||||
block_offsets.push(offset as u32); // sentinel
|
||||
|
||||
write_idx(
|
||||
&idx_path(unitigs_path),
|
||||
chunk_count as u32,
|
||||
n_kmers as u64,
|
||||
block_bits,
|
||||
&block_offsets,
|
||||
)
|
||||
}
|
||||
|
||||
// ── Reader ────────────────────────────────────────────────────────────────────
|
||||
|
||||
/// Read-only random-access view of a unitig file.
|
||||
@@ -178,6 +213,7 @@ pub struct UnitigFileReader {
|
||||
}
|
||||
|
||||
impl UnitigFileReader {
|
||||
/// Open with `.idx` — enables both sequential iteration and random access.
|
||||
pub fn open(path: &Path) -> SKResult<Self> {
|
||||
let file = File::open(path).map_err(SKError::Io)?;
|
||||
let mmap = unsafe { Mmap::map(&file).map_err(SKError::Io)? };
|
||||
@@ -194,6 +230,37 @@ impl UnitigFileReader {
|
||||
})
|
||||
}
|
||||
|
||||
/// Open without `.idx` — sequential iteration only, no random access.
|
||||
///
|
||||
/// Scans the binary file once to count chunks and k-mers. Use when only
|
||||
/// [`Self::iter_kmers`], [`Self::iter_canonical_kmers`], or
|
||||
/// [`Self::iter_indexed_canonical_kmers`] are needed.
|
||||
pub fn open_sequential(path: &Path) -> SKResult<Self> {
|
||||
let file = File::open(path).map_err(SKError::Io)?;
|
||||
let mmap = unsafe { Mmap::map(&file).map_err(SKError::Io)? };
|
||||
let k = obikseq::params::k();
|
||||
|
||||
let mut offset = 0usize;
|
||||
let mut n_unitigs = 0usize;
|
||||
let mut n_kmers = 0usize;
|
||||
while offset < mmap.len() {
|
||||
let seql_minus_k = mmap[offset] as usize;
|
||||
n_kmers += seql_minus_k + 1;
|
||||
offset += 1 + (seql_minus_k + k + 3) / 4;
|
||||
n_unitigs += 1;
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
mmap,
|
||||
block_offsets: Vec::new(), // empty → random access disabled
|
||||
n_unitigs,
|
||||
n_kmers,
|
||||
k,
|
||||
block_bits: DEFAULT_BLOCK_BITS,
|
||||
mask: (1usize << DEFAULT_BLOCK_BITS) - 1,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize { self.n_unitigs }
|
||||
pub fn is_empty(&self) -> bool { self.n_unitigs == 0 }
|
||||
pub fn n_kmers(&self) -> usize { self.n_kmers }
|
||||
@@ -202,6 +269,8 @@ impl UnitigFileReader {
|
||||
/// Byte offset of the START of record `i` (the seql byte) in the mmap.
|
||||
#[inline]
|
||||
fn chunk_start(&self, i: usize) -> usize {
|
||||
assert!(!self.block_offsets.is_empty(),
|
||||
"random access requires UnitigFileReader::open(); use open_sequential() for iteration only");
|
||||
let block = i >> self.block_bits;
|
||||
let rem = i & self.mask;
|
||||
let mut offset = self.block_offsets[block] as usize;
|
||||
|
||||
Reference in New Issue
Block a user