From f48f7500cdcd62e834da236f405b4f3e7eadcd29 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Thu, 14 May 2026 09:24:25 +0800 Subject: [PATCH] refactor(obilayeredmap): support generic payload types Replace the hardcoded `Counts` module with a generic `LayerData` trait, parameterizing `Layer` and `LayeredMap` over arbitrary payload types. This decouples read-path access from build-path logic, enabling both set membership and count-based indexing via `PersistentCompactIntVec`. Adds the `obicompactvec` dependency, implements streaming layer construction, and expands test coverage for persistence and multi-layer resolution. --- docmd/implementation/obilayeredmap.md | 124 +++++++++----- src/Cargo.lock | 1 + src/obilayeredmap/Cargo.toml | 5 +- src/obilayeredmap/src/counts.rs | 54 ------ src/obilayeredmap/src/layer.rs | 231 ++++++++++++++++---------- src/obilayeredmap/src/lib.rs | 3 +- src/obilayeredmap/src/map.rs | 82 ++++++--- src/obilayeredmap/src/tests/layer.rs | 25 +-- src/obilayeredmap/src/tests/map.rs | 29 ++-- 9 files changed, 309 insertions(+), 245 deletions(-) delete mode 100644 src/obilayeredmap/src/counts.rs diff --git a/docmd/implementation/obilayeredmap.md b/docmd/implementation/obilayeredmap.md index d10bba3..9a5ccbf 100644 --- a/docmd/implementation/obilayeredmap.md +++ b/docmd/implementation/obilayeredmap.md @@ -10,52 +10,63 @@ The MPHF + evidence infrastructure is fixed for all modes. The **payload** — data associated with each slot — is orthogonal and varies by mode. -| Mode | Description | Payload | -|---|---|---| -| 1. Set | membership test only | none | -| 2. Set with count | occurrences per kmer per sample | compact integer vector | -| 3. Presence/absence matrix | which genomes contain each kmer | bit matrix n_kmers × n_genomes | -| 4. Count matrix | occurrences per kmer per genome | integer matrix n_kmers × n_genomes | +| Mode | Description | Payload type | File | +|---|---|---|---| +| 1. Set | membership test only | `()` | — | +| 2. Set with count | occurrences per kmer per sample | `PersistentCompactIntVec` | `counts.pciv` | +| 3. Presence/absence matrix | which genomes contain each kmer | `PersistentBitVec` per genome | `presence_N.pbiv` | +| 4. Count matrix | occurrences per kmer per genome | `PersistentCompactIntVec` per genome | `counts_N.pciv` | -Mode 4 is architecturally identical to mode 3 but with counts instead of bits; the main open question is scale — a naive dense representation reaches hundreds of GB for large genome collections, which may require a sparse encoding. +Both `PersistentCompactIntVec` and `PersistentBitVec` come from the `obicompactvec` crate. Modes 3 and 4 are not yet implemented; the per-genome multi-file layout and query API remain to be designed. -### Payload for mode 2: compact count vector +### Payload for mode 2: PersistentCompactIntVec -Counts follow a roughly geometric distribution: the large majority are below 128, almost all below 16 000, with rare large values in highly covered regions. Options for a mmap-compatible random-access count vector: +`PersistentCompactIntVec` (PCIV) stores one `u32` count per MPHF slot in a single mmap'd `.pciv` file. Its encoding: a primary `u8` array (value 255 = overflow sentinel) backed by a sorted overflow section of `(slot: u64, value: u32)` entries and a sparse L1-fitting index for fast binary search. This handles the geometric count distribution efficiently — most values fit in 1 byte, overflow entries are rare. -- **`Vec`** — 2 bytes/slot, covers 0–65 535, O(1) access, trivially mmap-able. Practical upper bound sufficient for most metagenomic use cases. -- **Bit-packed fixed width** — choose B = ⌈log₂(max\_count)⌉ globally (e.g. B=14 for 99.9% coverage at 1.75 bytes/slot). O(1) access via bit-shift arithmetic. -- **Block-varint (PForDelta, StreamVByte)** — good compression but random access requires a separate offset index; no mature Rust crate for mmap use. +Capacity: 0 to u32::MAX per slot. No separate decision needed on bit-width: PCIV adapts to the data. -Decision not yet made. `Vec` is the default baseline pending profiling. +### Payload for mode 3/4: PersistentBitVec / PersistentCompactIntVec -### Payload for mode 3: presence/absence matrix +`PersistentBitVec` (PBIV) stores one bit per MPHF slot in a mmap'd `.pbiv` file with u64 word-level bulk operations (AND, OR, XOR, NOT, POPCNT, Jaccard, Hamming). One PBIV per genome gives a column-major presence/absence matrix, making per-genome set operations cache-friendly. -Column-major bit matrix: column j (genome j) is a contiguous `n_slots`-bit array. This layout makes per-genome operations (union, intersection, diff) cache-friendly. For 10⁹ slots × 100 genomes ≈ 12.5 GB — large but mmap-able with no resident-memory cost at open time. +Mode 4 replaces PBIV with PCIV per genome. Multi-file layout and query API are not yet designed. --- ## Payload architecture -The payload is orthogonal to the MPHF + evidence layer. `Layer` will be parameterised by a payload type `D`: +The payload is orthogonal to the MPHF + evidence layer. `Layer` is parameterised by `D: LayerData`: ```rust -struct Layer { +pub trait LayerData: Sized { + type Item; + fn open(layer_dir: &Path) -> OLMResult; + fn read(&self, slot: usize) -> Self::Item; +} + +pub struct Layer { mphf: Mphf, evidence: Evidence, unitigs: UnitigFileReader, data: D, } + +pub struct Hit { + pub slot: usize, + pub data: T, +} ``` -Where `D` implements a `LayerData` trait covering open/close/get. Concrete instances: +`LayerData` covers the **read path only** (`open` + `read`). The write path (build) is intentionally not in the trait — build signatures differ between modes (mode 1 takes no extra argument, mode 2 takes a `count_of` closure) and forcing this into a trait would require an associated `Context` type with no benefit over specialized `impl` blocks. -- `()` — mode 1 -- `CountVec` — mode 2 (u16 or bit-packed) -- `BitMatrix` — mode 3 -- `CountMatrix` — mode 4 +Implemented concrete types: -This parameterisation is not yet implemented; the current code uses a fixed `Counts` field. +| Type | `Item` | Description | +|---|---|---| +| `()` | `()` | mode 1 — membership only | +| `PersistentCompactIntVec` | `u32` | mode 2 — per-slot count | + +`LayeredMap` mirrors the same parameterisation: `LayeredMap`. --- @@ -70,8 +81,8 @@ index_root/ ← LayeredMap (collection) unitigs.bin unitigs.bin.idx evidence.bin - counts.bin [mode 2 only] - presence.bin [mode 3/4 only] + counts.pciv [mode 2 only] + presence_N.pbiv [mode 3/4, one per genome — not yet implemented] layer_1/ ... part_00001/ @@ -95,8 +106,7 @@ layer_N/ unitigs.bin — packed 2-bit nucleotide sequences (obiskio binary format) unitigs.bin.idx — UIDX index: n_unitigs, n_kmers, seqls[], packed_offsets[] evidence.bin — u32 per MPHF slot: (unitig_id: 25 | rank: 7) - counts.bin — [optional] u16 per MPHF slot (mode 2) - presence.bin — [optional] bit matrix n_slots × n_samples (mode 3/4) + counts.pciv — [mode 2] PersistentCompactIntVec: one u32 per slot ``` `unitigs.bin` is the packed-2-bit sequence file produced by `obiskio::UnitigFileWriter`. The companion `.idx` file stores: magic `UIDX`, `n_unitigs: u32`, `n_kmers: u64`, `seqls: [u8; n_unitigs]` (kmer count − 1 per chunk), and `packed_offsets: [u32; n_unitigs + 1]` (byte offsets into `unitigs.bin`, sentinel-terminated). This gives O(1) random access to any unitig and the total kmer count without scanning the sequence file. @@ -125,11 +135,11 @@ The MPHF per layer is configured as: ```rust type Mphf = PtrHash< - u64, // key type: canonical kmer raw encoding - CubicEps, // bucket fn: balanced (2.4 bits/key, λ=3.5) + u64, // key type: canonical kmer raw encoding + CubicEps, // bucket fn: balanced (2.4 bits/key, λ=3.5) CachelineEfVec>, // remap: 11.6 bits/entry vs 32 for Vec - Xx64, // hasher: XXH3-64 with seed, handles structured keys - Vec, // pilots + Xx64, // hasher: XXH3-64 with seed, handles structured keys + Vec, // pilots >; ``` @@ -139,7 +149,30 @@ type Mphf = PtrHash< **Remap — `CachelineEfVec`:** Elias-Fano variant packing 44 sorted 40-bit values per 64-byte cacheline (11.6 bits/value vs 32 for `Vec`). Already a transitive dependency of `ptr_hash`. One cacheline per query vs one u32 read; space win dominates for billion-scale key sets. -**Construction:** `new_from_par_iter` avoids materialising all keys as `Vec`. MPHF correctness is verified inline during the second pass (evidence/counts fill) using an n/8-byte bitset; any duplicate slot or out-of-bounds index returns `OLMError::Mphf`. +--- + +## Build path + +The build path is not part of `LayerData`. Each mode exposes its own `impl Layer::build` with the exact signature it needs. Two private module-level helpers avoid code duplication: + +**`build_mphf(out_dir, n) -> OLMResult`**: first pass — opens `unitigs.bin`, iterates all canonical kmers in parallel via `new_from_par_iter`, stores `mphf.bin`. O(n). + +**`build_second_pass(out_dir, n, mphf, fill_slot) -> OLMResult<()>`**: second pass — opens `unitigs.bin` again, fills `evidence.bin` and a compact n/8-byte seen-bitset (MPHF correctness check inline), calls `fill_slot(slot, kmer)` once per kmer for the mode-specific payload. O(n). + +```rust +// mode 1 +impl Layer<()> { + pub fn build(out_dir: &Path) -> OLMResult +} + +// mode 2 +impl Layer { + pub fn build(out_dir: &Path, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult + pub fn build_from_map(out_dir: &Path, counts: &HashMap) -> OLMResult +} +``` + +Any duplicate slot or out-of-bounds index detected during `build_second_pass` returns `OLMError::Mphf`. `new_from_par_iter` avoids materialising all keys as `Vec`. --- @@ -148,20 +181,20 @@ type Mphf = PtrHash< A kmer query routes through all three levels: 1. **Partition routing**: hash canonical minimiser of the query kmer → partition index → open `part_XXXXX/`. -2. **Layer probing**: iterate layers in order within the partition; for each layer compute `slot = mphf.index(kmer)`, then verify `evidence.decode(slot) == kmer`. First match wins. -3. **Data access**: read payload at `slot` from the matching layer (counts, presence row, etc.). +2. **Layer probing**: iterate layers in order; for each layer compute `slot = mphf.index(kmer)`, decode evidence, compare to query. First match wins. +3. **Data access**: `layer.data.read(slot)` returns `D::Item`. -``` -fn query(kmer) → Option: - part = partition_of(kmer) - for (i, layer) in part.layers.iter().enumerate(): +```rust +// pseudo-code +fn query(kmer) -> Option<(usize, Hit)>: + for (i, layer) in self.layers.iter().enumerate(): slot = layer.mphf.index(&kmer.raw()) if layer.evidence.decode(slot) == kmer: - return Some(Hit { layer: i, slot }) + return Some((i, Hit { slot, data: layer.data.read(slot) })) return None ``` -Expected probe depth: 1 for kmers in layer 0, increasing for later layers. In practice the dominant dataset should be layer 0. +Expected probe depth: 1 for kmers in layer 0, increasing for later layers. --- @@ -188,16 +221,15 @@ Each partition's new layer is built independently; the operation is fully parall | `epserde 0.8` | zero-copy serialisation of MPHF | | `memmap2` | mmap of layer files | | `obiskio` | unitig file writer/reader | - -Flat arrays (evidence, counts, presence) use a simple custom binary format (raw bytes, fixed element size) opened via mmap — no additional serialisation crate required. +| `obicompactvec` | payload types: `PersistentCompactIntVec`, `PersistentBitVec` | --- ## Open questions -- **Mode 4 scale**: count matrix (n_kmers × n_genomes × bytes_per_count) reaches hundreds of GB for large collections. A sparse representation (store only non-zero entries) may be required; the access pattern and density threshold are not yet defined. -- **Count vector encoding (mode 2)**: `Vec` vs bit-packed (B=14). Decision pending; depends on whether counts > 65 535 occur in practice for the target datasets. -- **Presence matrix layout**: column-major favours per-genome operations; row-major favours per-kmer queries. Decide based on dominant access pattern. -- **Layer merge**: merging two `LayeredMap` instances into a single-layer index requires full rebuild. Define API and cost model; maintenance operation, not query-path. +- **Mode 3/4 multi-file layout**: one PBIV/PCIV per genome per layer means O(n_layers × n_genomes) files. Directory layout, open strategy, and query API are not yet designed. +- **Mode 4 scale**: count matrix (n_kmers × n_genomes × bytes_per_count) reaches hundreds of GB for large collections. A sparse representation may be required; access pattern and density threshold are not yet defined. +- **Presence matrix layout**: column-major (one PBIV per genome) favours per-genome operations; row-major favours per-kmer queries. Dominant access pattern not yet characterised. +- **Layer merge**: merging two `LayeredMap` instances into a single-layer index requires full rebuild. Define API and cost model. - **Canonical kmer orientation**: evidence stores canonical kmer; strand recovery requires one 64-bit revcomp comparison at query time. - **`try_new_from_par_iter`**: `ptr_hash::new_from_par_iter` silently discards construction failure. Post-construction verification (current workaround) is correct but does not allow retry. A `try_new_from_par_iter` PR upstream would close this gap. diff --git a/src/Cargo.lock b/src/Cargo.lock index a26afbd..e428b22 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1752,6 +1752,7 @@ dependencies = [ "cacheline-ef", "epserde 0.8.0", "memmap2", + "obicompactvec", "obikseq", "obiskio", "ptr_hash", diff --git a/src/obilayeredmap/Cargo.toml b/src/obilayeredmap/Cargo.toml index 95d16a0..2964d07 100644 --- a/src/obilayeredmap/Cargo.toml +++ b/src/obilayeredmap/Cargo.toml @@ -4,8 +4,9 @@ version = "0.1.0" edition = "2024" [dependencies] -obikseq = { path = "../obikseq" } -obiskio = { path = "../obiskio" } +obikseq = { path = "../obikseq" } +obiskio = { path = "../obiskio" } +obicompactvec = { path = "../obicompactvec" } ptr_hash = "1.1" cacheline-ef = "1.1" epserde = "0.8" diff --git a/src/obilayeredmap/src/counts.rs b/src/obilayeredmap/src/counts.rs deleted file mode 100644 index cd258d9..0000000 --- a/src/obilayeredmap/src/counts.rs +++ /dev/null @@ -1,54 +0,0 @@ -// u32 per MPHF slot: raw occurrence count for the kmer at that slot. - -use std::fs::File; -use std::io::{BufWriter, Write}; -use std::path::Path; - -use memmap2::Mmap; - -use crate::error::{OLMError, OLMResult}; - -pub struct Counts { - mmap: Mmap, -} - -impl Counts { - pub fn open(path: &Path) -> OLMResult { - let f = File::open(path)?; - let mmap = unsafe { Mmap::map(&f)? }; - Ok(Self { mmap }) - } - - #[inline] - pub fn get(&self, slot: usize) -> u32 { - let off = slot * 4; - u32::from_le_bytes(self.mmap[off..off + 4].try_into().unwrap()) - } - - pub fn len(&self) -> usize { - self.mmap.len() / 4 - } -} - -pub struct CountsWriter { - buf: Vec, -} - -impl CountsWriter { - pub fn new(n_slots: usize) -> Self { - Self { buf: vec![0u32; n_slots] } - } - - #[inline] - pub fn set(&mut self, slot: usize, count: u32) { - self.buf[slot] = count; - } - - pub fn write(self, path: &Path) -> OLMResult<()> { - let mut f = BufWriter::new(File::create(path)?); - for v in self.buf { - f.write_all(&v.to_le_bytes()).map_err(OLMError::Io)?; - } - Ok(()) - } -} diff --git a/src/obilayeredmap/src/layer.rs b/src/obilayeredmap/src/layer.rs index 4c8c0ce..281d907 100644 --- a/src/obilayeredmap/src/layer.rs +++ b/src/obilayeredmap/src/layer.rs @@ -4,130 +4,185 @@ use std::path::Path; use cacheline_ef::{CachelineEf, CachelineEfVec}; use epserde::prelude::*; +use obicompactvec::{PersistentCompactIntVec, PersistentCompactIntVecBuilder}; use obikseq::CanonicalKmer; use obiskio::{UnitigFileReader, UnitigFileWriter}; use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64}; -use crate::counts::{Counts, CountsWriter}; use crate::error::{OLMError, OLMResult}; use crate::evidence::{Evidence, EvidenceWriter}; -const MPHF_FILE: &str = "mphf.bin"; -const UNITIGS_FILE: &str = "unitigs.bin"; +pub(crate) const MPHF_FILE: &str = "mphf.bin"; +pub(crate) const UNITIGS_FILE: &str = "unitigs.bin"; const EVIDENCE_FILE: &str = "evidence.bin"; -const COUNTS_FILE: &str = "counts.bin"; +const COUNTS_FILE: &str = "counts.pciv"; type Mphf = PtrHash>, Xx64, Vec>; -pub struct Layer { - mphf: Mphf, +// ── Trait ───────────────────────────────────────────────────────────────────── + +pub trait LayerData: Sized { + type Item; + fn open(layer_dir: &Path) -> OLMResult; + fn read(&self, slot: usize) -> Self::Item; +} + +impl LayerData for () { + type Item = (); + fn open(_layer_dir: &Path) -> OLMResult { Ok(()) } + fn read(&self, _slot: usize) {} +} + +impl LayerData for PersistentCompactIntVec { + type Item = u32; + fn open(layer_dir: &Path) -> OLMResult { + PersistentCompactIntVec::open(&layer_dir.join(COUNTS_FILE)).map_err(OLMError::Io) + } + fn read(&self, slot: usize) -> u32 { self.get(slot) } +} + +// ── Structures ──────────────────────────────────────────────────────────────── + +pub struct Layer { + mphf: Mphf, evidence: Evidence, - unitigs: UnitigFileReader, - counts: Counts, + unitigs: UnitigFileReader, + data: D, } -pub struct Hit { +pub struct Hit { pub slot: usize, - pub count: u32, + pub data: T, } -impl Layer { +// ── Common read path ────────────────────────────────────────────────────────── + +impl Layer { pub fn open(path: &Path) -> OLMResult { let mphf: Mphf = Mphf::load_full(&path.join(MPHF_FILE)) .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; - - let unitigs = UnitigFileReader::open(&path.join(UNITIGS_FILE))?; + let unitigs = UnitigFileReader::open(&path.join(UNITIGS_FILE))?; let evidence = Evidence::open(&path.join(EVIDENCE_FILE))?; - let counts = Counts::open(&path.join(COUNTS_FILE))?; - - Ok(Self { mphf, evidence, unitigs, counts }) + let data = D::open(path)?; + Ok(Self { mphf, evidence, unitigs, data }) } - pub fn query(&self, kmer: CanonicalKmer) -> Option { + pub fn query(&self, kmer: CanonicalKmer) -> Option> { let slot = self.mphf.index(&kmer.raw()); let (chunk_id, rank) = self.evidence.decode(slot); - if self - .unitigs - .verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) - { - Some(Hit { slot, count: self.counts.get(slot) }) + if self.unitigs.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) { + Some(Hit { slot, data: self.data.read(slot) }) } else { None } } - /// Build a layer from unitigs already written to `out_dir/unitigs.bin`. - /// - /// `count_of` maps each canonical kmer to its occurrence count. - /// Returns the number of kmers indexed. - pub fn build(out_dir: &Path, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult { - use rayon::prelude::*; - - let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?; - let n = unitigs.n_kmers(); - - if n == 0 { - fs::File::create(out_dir.join(EVIDENCE_FILE))?; - fs::File::create(out_dir.join(COUNTS_FILE))?; - let mphf: Mphf = Mphf::try_new(&[] as &[u64], PtrHashParams::::default()) - .ok_or_else(|| OLMError::Mphf("construction failed".into()))?; - mphf.store(&out_dir.join(MPHF_FILE)) - .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; - return Ok(0); - } - - // First pass: build the MPHF from a cloneable parallel iterator. - // flat_map_iter: outer chunks in parallel, inner kmer sliding-window sequential. - let keys = (0..unitigs.len()) - .into_par_iter() - .flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw())); - let mphf: Mphf = Mphf::new_from_par_iter(n, keys, PtrHashParams::::default()); - mphf.store(&out_dir.join(MPHF_FILE)) - .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; - - // Second pass: fill evidence and counts; verify MPHF correctness inline. - // seen is a compact bitset (n/8 bytes) — no extra iteration needed. - let mut ev = EvidenceWriter::new(n); - let mut cnt = CountsWriter::new(n); - let mut seen = vec![0u8; (n + 7) / 8]; - - for (kmer, chunk_id, rank) in unitigs.iter_indexed_canonical_kmers() { - let slot = mphf.index(&kmer.raw()); - if slot >= n { - return Err(OLMError::Mphf("MPHF construction failed: slot out of bounds".into())); - } - let byte = slot / 8; - let bit = 1u8 << (slot % 8); - if seen[byte] & bit != 0 { - return Err(OLMError::Mphf("MPHF construction failed: duplicate slot".into())); - } - seen[byte] |= bit; - ev.set(slot, chunk_id as u32, rank as u8); - cnt.set(slot, count_of(kmer)); - } - - ev.write(&out_dir.join(EVIDENCE_FILE))?; - cnt.write(&out_dir.join(COUNTS_FILE))?; - - Ok(n) - } - - /// Convenience variant of `build` that accepts a `HashMap`. - pub fn build_from_map( - out_dir: &Path, - counts: &HashMap, - ) -> OLMResult { - Self::build(out_dir, |kmer| counts.get(&kmer).copied().unwrap_or(0)) - } - - /// Return a `UnitigFileWriter` targeting this layer's `unitigs.bin`. - /// The caller writes unitigs, then calls `Layer::build` to finish the layer. pub fn unitig_writer(out_dir: &Path) -> OLMResult { fs::create_dir_all(out_dir)?; Ok(UnitigFileWriter::create(&out_dir.join(UNITIGS_FILE))?) } } +// ── Build helpers (private) ─────────────────────────────────────────────────── + +fn build_mphf(out_dir: &Path, n: usize) -> OLMResult { + use rayon::prelude::*; + let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?; + let keys = (0..unitigs.len()) + .into_par_iter() + .flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw())); + let mphf: Mphf = Mphf::new_from_par_iter(n, keys, PtrHashParams::::default()); + mphf.store(&out_dir.join(MPHF_FILE)) + .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; + Ok(mphf) +} + +fn build_second_pass( + out_dir: &Path, + n: usize, + mphf: &Mphf, + fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>, +) -> OLMResult<()> { + let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?; + let mut ev = EvidenceWriter::new(n); + let mut seen = vec![0u8; (n + 7) / 8]; + + for (kmer, chunk_id, rank) in unitigs.iter_indexed_canonical_kmers() { + let slot = mphf.index(&kmer.raw()); + if slot >= n { + return Err(OLMError::Mphf("slot out of bounds".into())); + } + let byte = slot / 8; + let bit = 1u8 << (slot % 8); + if seen[byte] & bit != 0 { + return Err(OLMError::Mphf("duplicate slot".into())); + } + seen[byte] |= bit; + ev.set(slot, chunk_id as u32, rank as u8); + fill_slot(slot, kmer)?; + } + + ev.write(&out_dir.join(EVIDENCE_FILE))?; + Ok(()) +} + +fn empty_layer(out_dir: &Path) -> OLMResult<()> { + fs::File::create(out_dir.join(EVIDENCE_FILE))?; + let mphf: Mphf = Mphf::try_new(&[] as &[u64], PtrHashParams::::default()) + .ok_or_else(|| OLMError::Mphf("construction failed".into()))?; + mphf.store(&out_dir.join(MPHF_FILE)) + .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; + Ok(()) +} + +// ── Mode 1 — set membership ─────────────────────────────────────────────────── + +impl Layer<()> { + pub fn build(out_dir: &Path) -> OLMResult { + let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?; + let n = unitigs.n_kmers(); + if n == 0 { + empty_layer(out_dir)?; + return Ok(0); + } + let mphf = build_mphf(out_dir, n)?; + build_second_pass(out_dir, n, &mphf, &mut |_, _| Ok(()))?; + Ok(n) + } +} + +// ── Mode 2 — counts (PersistentCompactIntVec) ───────────────────────────────── + +impl Layer { + pub fn build(out_dir: &Path, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult { + let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?; + let n = unitigs.n_kmers(); + if n == 0 { + empty_layer(out_dir)?; + PersistentCompactIntVecBuilder::new(0, &out_dir.join(COUNTS_FILE)) + .and_then(|b| b.close()) + .map_err(OLMError::Io)?; + return Ok(0); + } + let mphf = build_mphf(out_dir, n)?; + let mut cnt = PersistentCompactIntVecBuilder::new(n, &out_dir.join(COUNTS_FILE)) + .map_err(OLMError::Io)?; + build_second_pass(out_dir, n, &mphf, &mut |slot, kmer| { + cnt.set(slot, count_of(kmer)); + Ok(()) + })?; + cnt.close().map_err(OLMError::Io)?; + Ok(n) + } + + pub fn build_from_map( + out_dir: &Path, + counts: &HashMap, + ) -> OLMResult { + Self::build(out_dir, |kmer| counts.get(&kmer).copied().unwrap_or(0)) + } +} + #[cfg(test)] #[path = "tests/layer.rs"] mod tests; diff --git a/src/obilayeredmap/src/lib.rs b/src/obilayeredmap/src/lib.rs index babd20e..0872bdc 100644 --- a/src/obilayeredmap/src/lib.rs +++ b/src/obilayeredmap/src/lib.rs @@ -1,4 +1,3 @@ -pub mod counts; pub mod error; pub mod evidence; pub mod layer; @@ -6,5 +5,5 @@ pub mod map; pub mod meta; pub use error::{OLMError, OLMResult}; -pub use layer::{Hit, Layer}; +pub use layer::{Hit, Layer, LayerData}; pub use map::LayeredMap; diff --git a/src/obilayeredmap/src/map.rs b/src/obilayeredmap/src/map.rs index 2c8a349..a2cc43b 100644 --- a/src/obilayeredmap/src/map.rs +++ b/src/obilayeredmap/src/map.rs @@ -2,11 +2,12 @@ use std::collections::HashMap; use std::fs; use std::path::{Path, PathBuf}; +use obicompactvec::PersistentCompactIntVec; use obikseq::CanonicalKmer; use obiskio::UnitigFileWriter; use crate::error::OLMResult; -use crate::layer::{Hit, Layer}; +use crate::layer::{Hit, Layer, LayerData}; use crate::meta::PartitionMeta; /// Layered kmer index for a single partition. @@ -14,20 +15,26 @@ use crate::meta::PartitionMeta; /// Each layer covers a disjoint kmer set. Queries probe layers in order; /// the first match wins. Adding a dataset appends a new layer without /// rebuilding existing ones. -pub struct LayeredMap { - root: PathBuf, - meta: PartitionMeta, - layers: Vec, +pub struct LayeredMap { + root: PathBuf, + meta: PartitionMeta, + layers: Vec>, } -impl LayeredMap { +// ── Common methods ──────────────────────────────────────────────────────────── + +impl LayeredMap { /// Open an existing layered index at `root`. pub fn open(root: &Path) -> OLMResult { let meta = PartitionMeta::load(root)?; let layers = (0..meta.n_layers) - .map(|i| Layer::open(&layer_dir(root, i))) + .map(|i| Layer::::open(&layer_dir(root, i))) .collect::>>()?; - Ok(Self { root: root.to_owned(), meta, layers }) + Ok(Self { + root: root.to_owned(), + meta, + layers, + }) } /// Create a new, empty layered index at `root`. @@ -35,48 +42,71 @@ impl LayeredMap { fs::create_dir_all(root)?; let meta = PartitionMeta::new(); meta.save(root)?; - Ok(Self { root: root.to_owned(), meta, layers: Vec::new() }) + Ok(Self { + root: root.to_owned(), + meta, + layers: Vec::new(), + }) } + /// Return the number of layers in this index. pub fn n_layers(&self) -> usize { self.layers.len() } - pub fn layer(&self, i: usize) -> &Layer { + /// Return a reference to the `i`-th layer. + pub fn layer(&self, i: usize) -> &Layer { &self.layers[i] } /// Query `kmer` across all layers. Returns `(layer_index, Hit)` on match. - pub fn query(&self, kmer: CanonicalKmer) -> Option<(usize, Hit)> { - self.layers.iter().enumerate().find_map(|(i, layer)| { - layer.query(kmer).map(|hit| (i, hit)) - }) + pub fn query(&self, kmer: CanonicalKmer) -> Option<(usize, Hit)> { + self.layers + .iter() + .enumerate() + .find_map(|(i, layer)| layer.query(kmer).map(|hit| (i, hit))) } /// Return a `UnitigFileWriter` for the next layer to be built. - /// The caller writes unitigs, calls `.close()` on the writer, - /// then calls `push_layer` to finish. pub fn next_layer_writer(&self) -> OLMResult { let dir = layer_dir(&self.root, self.layers.len()); - Layer::unitig_writer(&dir) + Layer::::unitig_writer(&dir) } - /// Build and append the next layer from a count closure. - /// Unitigs must already have been written via `next_layer_writer`. - pub fn push_layer( - &mut self, - count_of: impl Fn(CanonicalKmer) -> u32, - ) -> OLMResult { + /// Append a new layer to the index. + fn append_layer(&mut self) -> OLMResult<()> { let i = self.layers.len(); let dir = layer_dir(&self.root, i); - Layer::build(&dir, count_of)?; - self.layers.push(Layer::open(&dir)?); + self.layers.push(Layer::::open(&dir)?); self.meta.n_layers = self.layers.len(); self.meta.save(&self.root)?; + Ok(()) + } +} + +// ── Mode 1 — set membership ─────────────────────────────────────────────────── + +impl LayeredMap<()> { + pub fn push_layer(&mut self) -> OLMResult { + let i = self.layers.len(); + let dir = layer_dir(&self.root, i); + Layer::<()>::build(&dir)?; + self.append_layer()?; + Ok(i) + } +} + +// ── Mode 2 — counts ─────────────────────────────────────────────────────────── + +impl LayeredMap { + pub fn push_layer(&mut self, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult { + let i = self.layers.len(); + let dir = layer_dir(&self.root, i); + Layer::::build(&dir, count_of)?; + self.append_layer()?; Ok(i) } - /// Convenience variant of `push_layer` that accepts a `HashMap`. pub fn push_layer_from_map( &mut self, counts: &HashMap, diff --git a/src/obilayeredmap/src/tests/layer.rs b/src/obilayeredmap/src/tests/layer.rs index 4831adf..d32a386 100644 --- a/src/obilayeredmap/src/tests/layer.rs +++ b/src/obilayeredmap/src/tests/layer.rs @@ -1,4 +1,5 @@ use super::*; +use obicompactvec::PersistentCompactIntVec; use obikseq::{set_k, Kmer, Sequence as _, Unitig}; use tempfile::tempdir; @@ -28,8 +29,8 @@ fn build_and_query_all_kmers_found() { let dir = tempdir().unwrap(); write_unitigs(dir.path(), &[b"AAAACGT"]); let kmers = all_canonical_kmers(dir.path(), 4); - Layer::build(dir.path(), |_| 1).unwrap(); - let layer = Layer::open(dir.path()).unwrap(); + Layer::<()>::build(dir.path()).unwrap(); + let layer = Layer::<()>::open(dir.path()).unwrap(); for kmer in kmers { assert!(layer.query(kmer).is_some(), "kmer should be present"); } @@ -43,11 +44,14 @@ fn counts_are_stored_and_retrieved() { let kmers = all_canonical_kmers(dir.path(), 4); let count_map: HashMap = kmers.iter().enumerate().map(|(i, &k)| (k, i as u32 + 1)).collect(); - Layer::build(dir.path(), |kmer| count_map.get(&kmer).copied().unwrap_or(0)).unwrap(); - let layer = Layer::open(dir.path()).unwrap(); + Layer::::build( + dir.path(), + |kmer| count_map.get(&kmer).copied().unwrap_or(0), + ).unwrap(); + let layer = Layer::::open(dir.path()).unwrap(); for kmer in &kmers { let hit = layer.query(*kmer).expect("kmer must be present"); - assert_eq!(hit.count, count_map[kmer]); + assert_eq!(hit.data, count_map[kmer]); } } @@ -56,8 +60,8 @@ fn query_absent_returns_none() { set_k(4); let dir = tempdir().unwrap(); write_unitigs(dir.path(), &[b"AAAACGT"]); - Layer::build(dir.path(), |_| 1).unwrap(); - let layer = Layer::open(dir.path()).unwrap(); + Layer::<()>::build(dir.path()).unwrap(); + let layer = Layer::<()>::open(dir.path()).unwrap(); let absent = Kmer::from_ascii(b"CCCC").unwrap().canonical(); assert!(layer.query(absent).is_none()); } @@ -66,12 +70,11 @@ fn query_absent_returns_none() { fn open_after_build_is_consistent() { set_k(4); let dir = tempdir().unwrap(); - // "AAAACGT": 7 nucl → 4 kmers, all with distinct canonical forms write_unitigs(dir.path(), &[b"AAAACGT"]); - let n = Layer::build(dir.path(), |_| 7).unwrap(); + let n = Layer::::build(dir.path(), |_| 7).unwrap(); assert_eq!(n, 4); - let layer = Layer::open(dir.path()).unwrap(); + let layer = Layer::::open(dir.path()).unwrap(); let kmer = Kmer::from_ascii(b"AAAA").unwrap().canonical(); let hit = layer.query(kmer).expect("AAAA must be present"); - assert_eq!(hit.count, 7); + assert_eq!(hit.data, 7); } diff --git a/src/obilayeredmap/src/tests/map.rs b/src/obilayeredmap/src/tests/map.rs index 932a048..57e2450 100644 --- a/src/obilayeredmap/src/tests/map.rs +++ b/src/obilayeredmap/src/tests/map.rs @@ -1,9 +1,10 @@ use super::*; +use obicompactvec::PersistentCompactIntVec; use obikseq::{set_k, Sequence as _, Unitig}; use tempfile::tempdir; fn push_unitigs_and_layer( - map: &mut LayeredMap, + map: &mut LayeredMap, seqs: &[&[u8]], count: u32, ) { @@ -23,7 +24,7 @@ fn canonical(ascii: &[u8]) -> CanonicalKmer { fn create_empty_map() { set_k(4); let dir = tempdir().unwrap(); - let map = LayeredMap::create(dir.path()).unwrap(); + let map = LayeredMap::<()>::create(dir.path()).unwrap(); assert_eq!(map.n_layers(), 0); } @@ -32,10 +33,10 @@ fn open_reloads_layer_count() { set_k(4); let dir = tempdir().unwrap(); { - let mut map = LayeredMap::create(dir.path()).unwrap(); + let mut map = LayeredMap::::create(dir.path()).unwrap(); push_unitigs_and_layer(&mut map, &[b"AAAACGT"], 1); } - let map = LayeredMap::open(dir.path()).unwrap(); + let map = LayeredMap::::open(dir.path()).unwrap(); assert_eq!(map.n_layers(), 1); } @@ -43,41 +44,37 @@ fn open_reloads_layer_count() { fn query_finds_kmer_in_layer_zero() { set_k(4); let dir = tempdir().unwrap(); - let mut map = LayeredMap::create(dir.path()).unwrap(); + let mut map = LayeredMap::::create(dir.path()).unwrap(); push_unitigs_and_layer(&mut map, &[b"AAAACGT"], 3); let kmer = canonical(b"AAAC"); let (layer_idx, hit) = map.query(kmer).expect("kmer must be found"); assert_eq!(layer_idx, 0); - assert_eq!(hit.count, 3); + assert_eq!(hit.data, 3); } #[test] fn query_finds_kmer_in_correct_layer() { set_k(4); let dir = tempdir().unwrap(); - let mut map = LayeredMap::create(dir.path()).unwrap(); - // Layer 0: AAAACGT + let mut map = LayeredMap::::create(dir.path()).unwrap(); push_unitigs_and_layer(&mut map, &[b"AAAACGT"], 1); - // Layer 1: GGGACGT (no kmer overlap with layer 0 by construction) push_unitigs_and_layer(&mut map, &[b"GGGACGT"], 2); assert_eq!(map.n_layers(), 2); - // AAAA is in layer 0 let (li, hit) = map.query(canonical(b"AAAA")).expect("AAAA must be found"); assert_eq!(li, 0); - assert_eq!(hit.count, 1); + assert_eq!(hit.data, 1); - // GGGA is in layer 1 let (li, hit) = map.query(canonical(b"GGGA")).expect("GGGA must be found"); assert_eq!(li, 1); - assert_eq!(hit.count, 2); + assert_eq!(hit.data, 2); } #[test] fn query_absent_returns_none() { set_k(4); let dir = tempdir().unwrap(); - let mut map = LayeredMap::create(dir.path()).unwrap(); + let mut map = LayeredMap::::create(dir.path()).unwrap(); push_unitigs_and_layer(&mut map, &[b"AAAACGT"], 1); let absent = canonical(b"CCCC"); assert!(map.query(absent).is_none()); @@ -87,7 +84,7 @@ fn query_absent_returns_none() { fn push_layer_from_map_convenience() { set_k(4); let dir = tempdir().unwrap(); - let mut map = LayeredMap::create(dir.path()).unwrap(); + let mut map = LayeredMap::::create(dir.path()).unwrap(); let mut w = map.next_layer_writer().unwrap(); w.write(&Unitig::from_ascii(b"AAAACGT")).unwrap(); w.close().unwrap(); @@ -96,5 +93,5 @@ fn push_layer_from_map_convenience() { ].into_iter().collect(); map.push_layer_from_map(&counts).unwrap(); let (_, hit) = map.query(canonical(b"AAAA")).unwrap(); - assert_eq!(hit.count, 10); + assert_eq!(hit.data, 10); }