refactor(obilayeredmap): support generic payload types

Replace the hardcoded `Counts` module with a generic `LayerData` trait, parameterizing `Layer` and `LayeredMap` over arbitrary payload types. This decouples read-path access from build-path logic, enabling both set membership and count-based indexing via `PersistentCompactIntVec`. Adds the `obicompactvec` dependency, implements streaming layer construction, and expands test coverage for persistence and multi-layer resolution.
This commit is contained in:
Eric Coissac
2026-05-14 09:24:25 +08:00
parent 0b3fcf3cf0
commit f48f7500cd
9 changed files with 309 additions and 245 deletions
+78 -46
View File
@@ -10,52 +10,63 @@
The MPHF + evidence infrastructure is fixed for all modes. The **payload** — data associated with each slot — is orthogonal and varies by mode.
| Mode | Description | Payload |
|---|---|---|
| 1. Set | membership test only | none |
| 2. Set with count | occurrences per kmer per sample | compact integer vector |
| 3. Presence/absence matrix | which genomes contain each kmer | bit matrix n_kmers × n_genomes |
| 4. Count matrix | occurrences per kmer per genome | integer matrix n_kmers × n_genomes |
| Mode | Description | Payload type | File |
|---|---|---|---|
| 1. Set | membership test only | `()` | — |
| 2. Set with count | occurrences per kmer per sample | `PersistentCompactIntVec` | `counts.pciv` |
| 3. Presence/absence matrix | which genomes contain each kmer | `PersistentBitVec` per genome | `presence_N.pbiv` |
| 4. Count matrix | occurrences per kmer per genome | `PersistentCompactIntVec` per genome | `counts_N.pciv` |
Mode 4 is architecturally identical to mode 3 but with counts instead of bits; the main open question is scale — a naive dense representation reaches hundreds of GB for large genome collections, which may require a sparse encoding.
Both `PersistentCompactIntVec` and `PersistentBitVec` come from the `obicompactvec` crate. Modes 3 and 4 are not yet implemented; the per-genome multi-file layout and query API remain to be designed.
### Payload for mode 2: compact count vector
### Payload for mode 2: PersistentCompactIntVec
Counts follow a roughly geometric distribution: the large majority are below 128, almost all below 16 000, with rare large values in highly covered regions. Options for a mmap-compatible random-access count vector:
`PersistentCompactIntVec` (PCIV) stores one `u32` count per MPHF slot in a single mmap'd `.pciv` file. Its encoding: a primary `u8` array (value 255 = overflow sentinel) backed by a sorted overflow section of `(slot: u64, value: u32)` entries and a sparse L1-fitting index for fast binary search. This handles the geometric count distribution efficiently — most values fit in 1 byte, overflow entries are rare.
- **`Vec<u16>`** — 2 bytes/slot, covers 065 535, O(1) access, trivially mmap-able. Practical upper bound sufficient for most metagenomic use cases.
- **Bit-packed fixed width** — choose B = ⌈log₂(max\_count)⌉ globally (e.g. B=14 for 99.9% coverage at 1.75 bytes/slot). O(1) access via bit-shift arithmetic.
- **Block-varint (PForDelta, StreamVByte)** — good compression but random access requires a separate offset index; no mature Rust crate for mmap use.
Capacity: 0 to u32::MAX per slot. No separate decision needed on bit-width: PCIV adapts to the data.
Decision not yet made. `Vec<u16>` is the default baseline pending profiling.
### Payload for mode 3/4: PersistentBitVec / PersistentCompactIntVec
### Payload for mode 3: presence/absence matrix
`PersistentBitVec` (PBIV) stores one bit per MPHF slot in a mmap'd `.pbiv` file with u64 word-level bulk operations (AND, OR, XOR, NOT, POPCNT, Jaccard, Hamming). One PBIV per genome gives a column-major presence/absence matrix, making per-genome set operations cache-friendly.
Column-major bit matrix: column j (genome j) is a contiguous `n_slots`-bit array. This layout makes per-genome operations (union, intersection, diff) cache-friendly. For 10⁹ slots × 100 genomes ≈ 12.5 GB — large but mmap-able with no resident-memory cost at open time.
Mode 4 replaces PBIV with PCIV per genome. Multi-file layout and query API are not yet designed.
---
## Payload architecture
The payload is orthogonal to the MPHF + evidence layer. `Layer` will be parameterised by a payload type `D`:
The payload is orthogonal to the MPHF + evidence layer. `Layer` is parameterised by `D: LayerData`:
```rust
struct Layer<D = ()> {
pub trait LayerData: Sized {
type Item;
fn open(layer_dir: &Path) -> OLMResult<Self>;
fn read(&self, slot: usize) -> Self::Item;
}
pub struct Layer<D: LayerData = ()> {
mphf: Mphf,
evidence: Evidence,
unitigs: UnitigFileReader,
data: D,
}
pub struct Hit<T = ()> {
pub slot: usize,
pub data: T,
}
```
Where `D` implements a `LayerData` trait covering open/close/get. Concrete instances:
`LayerData` covers the **read path only** (`open` + `read`). The write path (build) is intentionally not in the trait — build signatures differ between modes (mode 1 takes no extra argument, mode 2 takes a `count_of` closure) and forcing this into a trait would require an associated `Context` type with no benefit over specialized `impl` blocks.
- `()` — mode 1
- `CountVec` — mode 2 (u16 or bit-packed)
- `BitMatrix` — mode 3
- `CountMatrix` — mode 4
Implemented concrete types:
This parameterisation is not yet implemented; the current code uses a fixed `Counts` field.
| Type | `Item` | Description |
|---|---|---|
| `()` | `()` | mode 1 — membership only |
| `PersistentCompactIntVec` | `u32` | mode 2 — per-slot count |
`LayeredMap` mirrors the same parameterisation: `LayeredMap<D: LayerData = ()>`.
---
@@ -70,8 +81,8 @@ index_root/ ← LayeredMap (collection)
unitigs.bin
unitigs.bin.idx
evidence.bin
counts.bin [mode 2 only]
presence.bin [mode 3/4 only]
counts.pciv [mode 2 only]
presence_N.pbiv [mode 3/4, one per genome — not yet implemented]
layer_1/
...
part_00001/
@@ -95,8 +106,7 @@ layer_N/
unitigs.bin — packed 2-bit nucleotide sequences (obiskio binary format)
unitigs.bin.idx — UIDX index: n_unitigs, n_kmers, seqls[], packed_offsets[]
evidence.bin — u32 per MPHF slot: (unitig_id: 25 | rank: 7)
counts.bin — [optional] u16 per MPHF slot (mode 2)
presence.bin — [optional] bit matrix n_slots × n_samples (mode 3/4)
counts.pciv — [mode 2] PersistentCompactIntVec: one u32 per slot
```
`unitigs.bin` is the packed-2-bit sequence file produced by `obiskio::UnitigFileWriter`. The companion `.idx` file stores: magic `UIDX`, `n_unitigs: u32`, `n_kmers: u64`, `seqls: [u8; n_unitigs]` (kmer count 1 per chunk), and `packed_offsets: [u32; n_unitigs + 1]` (byte offsets into `unitigs.bin`, sentinel-terminated). This gives O(1) random access to any unitig and the total kmer count without scanning the sequence file.
@@ -125,11 +135,11 @@ The MPHF per layer is configured as:
```rust
type Mphf = PtrHash<
u64, // key type: canonical kmer raw encoding
CubicEps, // bucket fn: balanced (2.4 bits/key, λ=3.5)
u64, // key type: canonical kmer raw encoding
CubicEps, // bucket fn: balanced (2.4 bits/key, λ=3.5)
CachelineEfVec<Vec<CachelineEf>>, // remap: 11.6 bits/entry vs 32 for Vec<u32>
Xx64, // hasher: XXH3-64 with seed, handles structured keys
Vec<u8>, // pilots
Xx64, // hasher: XXH3-64 with seed, handles structured keys
Vec<u8>, // pilots
>;
```
@@ -139,7 +149,30 @@ type Mphf = PtrHash<
**Remap — `CachelineEfVec`:** Elias-Fano variant packing 44 sorted 40-bit values per 64-byte cacheline (11.6 bits/value vs 32 for `Vec<u32>`). Already a transitive dependency of `ptr_hash`. One cacheline per query vs one u32 read; space win dominates for billion-scale key sets.
**Construction:** `new_from_par_iter` avoids materialising all keys as `Vec<u64>`. MPHF correctness is verified inline during the second pass (evidence/counts fill) using an n/8-byte bitset; any duplicate slot or out-of-bounds index returns `OLMError::Mphf`.
---
## Build path
The build path is not part of `LayerData`. Each mode exposes its own `impl Layer<D>::build` with the exact signature it needs. Two private module-level helpers avoid code duplication:
**`build_mphf(out_dir, n) -> OLMResult<Mphf>`**: first pass — opens `unitigs.bin`, iterates all canonical kmers in parallel via `new_from_par_iter`, stores `mphf.bin`. O(n).
**`build_second_pass(out_dir, n, mphf, fill_slot) -> OLMResult<()>`**: second pass — opens `unitigs.bin` again, fills `evidence.bin` and a compact n/8-byte seen-bitset (MPHF correctness check inline), calls `fill_slot(slot, kmer)` once per kmer for the mode-specific payload. O(n).
```rust
// mode 1
impl Layer<()> {
pub fn build(out_dir: &Path) -> OLMResult<usize>
}
// mode 2
impl Layer<PersistentCompactIntVec> {
pub fn build(out_dir: &Path, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize>
pub fn build_from_map(out_dir: &Path, counts: &HashMap<CanonicalKmer, u32>) -> OLMResult<usize>
}
```
Any duplicate slot or out-of-bounds index detected during `build_second_pass` returns `OLMError::Mphf`. `new_from_par_iter` avoids materialising all keys as `Vec<u64>`.
---
@@ -148,20 +181,20 @@ type Mphf = PtrHash<
A kmer query routes through all three levels:
1. **Partition routing**: hash canonical minimiser of the query kmer → partition index → open `part_XXXXX/`.
2. **Layer probing**: iterate layers in order within the partition; for each layer compute `slot = mphf.index(kmer)`, then verify `evidence.decode(slot) == kmer`. First match wins.
3. **Data access**: read payload at `slot` from the matching layer (counts, presence row, etc.).
2. **Layer probing**: iterate layers in order; for each layer compute `slot = mphf.index(kmer)`, decode evidence, compare to query. First match wins.
3. **Data access**: `layer.data.read(slot)` returns `D::Item`.
```
fn query(kmer) → Option<Hit>:
part = partition_of(kmer)
for (i, layer) in part.layers.iter().enumerate():
```rust
// pseudo-code
fn query(kmer) -> Option<(usize, Hit<D::Item>)>:
for (i, layer) in self.layers.iter().enumerate():
slot = layer.mphf.index(&kmer.raw())
if layer.evidence.decode(slot) == kmer:
return Some(Hit { layer: i, slot })
return Some((i, Hit { slot, data: layer.data.read(slot) }))
return None
```
Expected probe depth: 1 for kmers in layer 0, increasing for later layers. In practice the dominant dataset should be layer 0.
Expected probe depth: 1 for kmers in layer 0, increasing for later layers.
---
@@ -188,16 +221,15 @@ Each partition's new layer is built independently; the operation is fully parall
| `epserde 0.8` | zero-copy serialisation of MPHF |
| `memmap2` | mmap of layer files |
| `obiskio` | unitig file writer/reader |
Flat arrays (evidence, counts, presence) use a simple custom binary format (raw bytes, fixed element size) opened via mmap — no additional serialisation crate required.
| `obicompactvec` | payload types: `PersistentCompactIntVec`, `PersistentBitVec` |
---
## Open questions
- **Mode 4 scale**: count matrix (n_kmers × n_genomes × bytes_per_count) reaches hundreds of GB for large collections. A sparse representation (store only non-zero entries) may be required; the access pattern and density threshold are not yet defined.
- **Count vector encoding (mode 2)**: `Vec<u16>` vs bit-packed (B=14). Decision pending; depends on whether counts > 65 535 occur in practice for the target datasets.
- **Presence matrix layout**: column-major favours per-genome operations; row-major favours per-kmer queries. Decide based on dominant access pattern.
- **Layer merge**: merging two `LayeredMap` instances into a single-layer index requires full rebuild. Define API and cost model; maintenance operation, not query-path.
- **Mode 3/4 multi-file layout**: one PBIV/PCIV per genome per layer means O(n_layers × n_genomes) files. Directory layout, open strategy, and query API are not yet designed.
- **Mode 4 scale**: count matrix (n_kmers × n_genomes × bytes_per_count) reaches hundreds of GB for large collections. A sparse representation may be required; access pattern and density threshold are not yet defined.
- **Presence matrix layout**: column-major (one PBIV per genome) favours per-genome operations; row-major favours per-kmer queries. Dominant access pattern not yet characterised.
- **Layer merge**: merging two `LayeredMap` instances into a single-layer index requires full rebuild. Define API and cost model.
- **Canonical kmer orientation**: evidence stores canonical kmer; strand recovery requires one 64-bit revcomp comparison at query time.
- **`try_new_from_par_iter`**: `ptr_hash::new_from_par_iter` silently discards construction failure. Post-construction verification (current workaround) is correct but does not allow retry. A `try_new_from_par_iter` PR upstream would close this gap.
+1
View File
@@ -1752,6 +1752,7 @@ dependencies = [
"cacheline-ef",
"epserde 0.8.0",
"memmap2",
"obicompactvec",
"obikseq",
"obiskio",
"ptr_hash",
+3 -2
View File
@@ -4,8 +4,9 @@ version = "0.1.0"
edition = "2024"
[dependencies]
obikseq = { path = "../obikseq" }
obiskio = { path = "../obiskio" }
obikseq = { path = "../obikseq" }
obiskio = { path = "../obiskio" }
obicompactvec = { path = "../obicompactvec" }
ptr_hash = "1.1"
cacheline-ef = "1.1"
epserde = "0.8"
-54
View File
@@ -1,54 +0,0 @@
// u32 per MPHF slot: raw occurrence count for the kmer at that slot.
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;
use memmap2::Mmap;
use crate::error::{OLMError, OLMResult};
pub struct Counts {
mmap: Mmap,
}
impl Counts {
pub fn open(path: &Path) -> OLMResult<Self> {
let f = File::open(path)?;
let mmap = unsafe { Mmap::map(&f)? };
Ok(Self { mmap })
}
#[inline]
pub fn get(&self, slot: usize) -> u32 {
let off = slot * 4;
u32::from_le_bytes(self.mmap[off..off + 4].try_into().unwrap())
}
pub fn len(&self) -> usize {
self.mmap.len() / 4
}
}
pub struct CountsWriter {
buf: Vec<u32>,
}
impl CountsWriter {
pub fn new(n_slots: usize) -> Self {
Self { buf: vec![0u32; n_slots] }
}
#[inline]
pub fn set(&mut self, slot: usize, count: u32) {
self.buf[slot] = count;
}
pub fn write(self, path: &Path) -> OLMResult<()> {
let mut f = BufWriter::new(File::create(path)?);
for v in self.buf {
f.write_all(&v.to_le_bytes()).map_err(OLMError::Io)?;
}
Ok(())
}
}
+143 -88
View File
@@ -4,130 +4,185 @@ use std::path::Path;
use cacheline_ef::{CachelineEf, CachelineEfVec};
use epserde::prelude::*;
use obicompactvec::{PersistentCompactIntVec, PersistentCompactIntVecBuilder};
use obikseq::CanonicalKmer;
use obiskio::{UnitigFileReader, UnitigFileWriter};
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
use crate::counts::{Counts, CountsWriter};
use crate::error::{OLMError, OLMResult};
use crate::evidence::{Evidence, EvidenceWriter};
const MPHF_FILE: &str = "mphf.bin";
const UNITIGS_FILE: &str = "unitigs.bin";
pub(crate) const MPHF_FILE: &str = "mphf.bin";
pub(crate) const UNITIGS_FILE: &str = "unitigs.bin";
const EVIDENCE_FILE: &str = "evidence.bin";
const COUNTS_FILE: &str = "counts.bin";
const COUNTS_FILE: &str = "counts.pciv";
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
pub struct Layer {
mphf: Mphf,
// ── Trait ─────────────────────────────────────────────────────────────────────
pub trait LayerData: Sized {
type Item;
fn open(layer_dir: &Path) -> OLMResult<Self>;
fn read(&self, slot: usize) -> Self::Item;
}
impl LayerData for () {
type Item = ();
fn open(_layer_dir: &Path) -> OLMResult<Self> { Ok(()) }
fn read(&self, _slot: usize) {}
}
impl LayerData for PersistentCompactIntVec {
type Item = u32;
fn open(layer_dir: &Path) -> OLMResult<Self> {
PersistentCompactIntVec::open(&layer_dir.join(COUNTS_FILE)).map_err(OLMError::Io)
}
fn read(&self, slot: usize) -> u32 { self.get(slot) }
}
// ── Structures ────────────────────────────────────────────────────────────────
pub struct Layer<D: LayerData = ()> {
mphf: Mphf,
evidence: Evidence,
unitigs: UnitigFileReader,
counts: Counts,
unitigs: UnitigFileReader,
data: D,
}
pub struct Hit {
pub struct Hit<T = ()> {
pub slot: usize,
pub count: u32,
pub data: T,
}
impl Layer {
// ── Common read path ──────────────────────────────────────────────────────────
impl<D: LayerData> Layer<D> {
pub fn open(path: &Path) -> OLMResult<Self> {
let mphf: Mphf = Mphf::load_full(&path.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
let unitigs = UnitigFileReader::open(&path.join(UNITIGS_FILE))?;
let unitigs = UnitigFileReader::open(&path.join(UNITIGS_FILE))?;
let evidence = Evidence::open(&path.join(EVIDENCE_FILE))?;
let counts = Counts::open(&path.join(COUNTS_FILE))?;
Ok(Self { mphf, evidence, unitigs, counts })
let data = D::open(path)?;
Ok(Self { mphf, evidence, unitigs, data })
}
pub fn query(&self, kmer: CanonicalKmer) -> Option<Hit> {
pub fn query(&self, kmer: CanonicalKmer) -> Option<Hit<D::Item>> {
let slot = self.mphf.index(&kmer.raw());
let (chunk_id, rank) = self.evidence.decode(slot);
if self
.unitigs
.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer)
{
Some(Hit { slot, count: self.counts.get(slot) })
if self.unitigs.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) {
Some(Hit { slot, data: self.data.read(slot) })
} else {
None
}
}
/// Build a layer from unitigs already written to `out_dir/unitigs.bin`.
///
/// `count_of` maps each canonical kmer to its occurrence count.
/// Returns the number of kmers indexed.
pub fn build(out_dir: &Path, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
use rayon::prelude::*;
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
let n = unitigs.n_kmers();
if n == 0 {
fs::File::create(out_dir.join(EVIDENCE_FILE))?;
fs::File::create(out_dir.join(COUNTS_FILE))?;
let mphf: Mphf = Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
mphf.store(&out_dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
return Ok(0);
}
// First pass: build the MPHF from a cloneable parallel iterator.
// flat_map_iter: outer chunks in parallel, inner kmer sliding-window sequential.
let keys = (0..unitigs.len())
.into_par_iter()
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
let mphf: Mphf = Mphf::new_from_par_iter(n, keys, PtrHashParams::<CubicEps>::default());
mphf.store(&out_dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
// Second pass: fill evidence and counts; verify MPHF correctness inline.
// seen is a compact bitset (n/8 bytes) — no extra iteration needed.
let mut ev = EvidenceWriter::new(n);
let mut cnt = CountsWriter::new(n);
let mut seen = vec![0u8; (n + 7) / 8];
for (kmer, chunk_id, rank) in unitigs.iter_indexed_canonical_kmers() {
let slot = mphf.index(&kmer.raw());
if slot >= n {
return Err(OLMError::Mphf("MPHF construction failed: slot out of bounds".into()));
}
let byte = slot / 8;
let bit = 1u8 << (slot % 8);
if seen[byte] & bit != 0 {
return Err(OLMError::Mphf("MPHF construction failed: duplicate slot".into()));
}
seen[byte] |= bit;
ev.set(slot, chunk_id as u32, rank as u8);
cnt.set(slot, count_of(kmer));
}
ev.write(&out_dir.join(EVIDENCE_FILE))?;
cnt.write(&out_dir.join(COUNTS_FILE))?;
Ok(n)
}
/// Convenience variant of `build` that accepts a `HashMap`.
pub fn build_from_map(
out_dir: &Path,
counts: &HashMap<CanonicalKmer, u32>,
) -> OLMResult<usize> {
Self::build(out_dir, |kmer| counts.get(&kmer).copied().unwrap_or(0))
}
/// Return a `UnitigFileWriter` targeting this layer's `unitigs.bin`.
/// The caller writes unitigs, then calls `Layer::build` to finish the layer.
pub fn unitig_writer(out_dir: &Path) -> OLMResult<UnitigFileWriter> {
fs::create_dir_all(out_dir)?;
Ok(UnitigFileWriter::create(&out_dir.join(UNITIGS_FILE))?)
}
}
// ── Build helpers (private) ───────────────────────────────────────────────────
fn build_mphf(out_dir: &Path, n: usize) -> OLMResult<Mphf> {
use rayon::prelude::*;
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
let keys = (0..unitigs.len())
.into_par_iter()
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
let mphf: Mphf = Mphf::new_from_par_iter(n, keys, PtrHashParams::<CubicEps>::default());
mphf.store(&out_dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
Ok(mphf)
}
fn build_second_pass(
out_dir: &Path,
n: usize,
mphf: &Mphf,
fill_slot: &mut impl FnMut(usize, CanonicalKmer) -> OLMResult<()>,
) -> OLMResult<()> {
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
let mut ev = EvidenceWriter::new(n);
let mut seen = vec![0u8; (n + 7) / 8];
for (kmer, chunk_id, rank) in unitigs.iter_indexed_canonical_kmers() {
let slot = mphf.index(&kmer.raw());
if slot >= n {
return Err(OLMError::Mphf("slot out of bounds".into()));
}
let byte = slot / 8;
let bit = 1u8 << (slot % 8);
if seen[byte] & bit != 0 {
return Err(OLMError::Mphf("duplicate slot".into()));
}
seen[byte] |= bit;
ev.set(slot, chunk_id as u32, rank as u8);
fill_slot(slot, kmer)?;
}
ev.write(&out_dir.join(EVIDENCE_FILE))?;
Ok(())
}
fn empty_layer(out_dir: &Path) -> OLMResult<()> {
fs::File::create(out_dir.join(EVIDENCE_FILE))?;
let mphf: Mphf = Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
mphf.store(&out_dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
Ok(())
}
// ── Mode 1 — set membership ───────────────────────────────────────────────────
impl Layer<()> {
pub fn build(out_dir: &Path) -> OLMResult<usize> {
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
let n = unitigs.n_kmers();
if n == 0 {
empty_layer(out_dir)?;
return Ok(0);
}
let mphf = build_mphf(out_dir, n)?;
build_second_pass(out_dir, n, &mphf, &mut |_, _| Ok(()))?;
Ok(n)
}
}
// ── Mode 2 — counts (PersistentCompactIntVec) ─────────────────────────────────
impl Layer<PersistentCompactIntVec> {
pub fn build(out_dir: &Path, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
let unitigs = UnitigFileReader::open(&out_dir.join(UNITIGS_FILE))?;
let n = unitigs.n_kmers();
if n == 0 {
empty_layer(out_dir)?;
PersistentCompactIntVecBuilder::new(0, &out_dir.join(COUNTS_FILE))
.and_then(|b| b.close())
.map_err(OLMError::Io)?;
return Ok(0);
}
let mphf = build_mphf(out_dir, n)?;
let mut cnt = PersistentCompactIntVecBuilder::new(n, &out_dir.join(COUNTS_FILE))
.map_err(OLMError::Io)?;
build_second_pass(out_dir, n, &mphf, &mut |slot, kmer| {
cnt.set(slot, count_of(kmer));
Ok(())
})?;
cnt.close().map_err(OLMError::Io)?;
Ok(n)
}
pub fn build_from_map(
out_dir: &Path,
counts: &HashMap<CanonicalKmer, u32>,
) -> OLMResult<usize> {
Self::build(out_dir, |kmer| counts.get(&kmer).copied().unwrap_or(0))
}
}
#[cfg(test)]
#[path = "tests/layer.rs"]
mod tests;
+1 -2
View File
@@ -1,4 +1,3 @@
pub mod counts;
pub mod error;
pub mod evidence;
pub mod layer;
@@ -6,5 +5,5 @@ pub mod map;
pub mod meta;
pub use error::{OLMError, OLMResult};
pub use layer::{Hit, Layer};
pub use layer::{Hit, Layer, LayerData};
pub use map::LayeredMap;
+56 -26
View File
@@ -2,11 +2,12 @@ use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use obicompactvec::PersistentCompactIntVec;
use obikseq::CanonicalKmer;
use obiskio::UnitigFileWriter;
use crate::error::OLMResult;
use crate::layer::{Hit, Layer};
use crate::layer::{Hit, Layer, LayerData};
use crate::meta::PartitionMeta;
/// Layered kmer index for a single partition.
@@ -14,20 +15,26 @@ use crate::meta::PartitionMeta;
/// Each layer covers a disjoint kmer set. Queries probe layers in order;
/// the first match wins. Adding a dataset appends a new layer without
/// rebuilding existing ones.
pub struct LayeredMap {
root: PathBuf,
meta: PartitionMeta,
layers: Vec<Layer>,
pub struct LayeredMap<D: LayerData = ()> {
root: PathBuf,
meta: PartitionMeta,
layers: Vec<Layer<D>>,
}
impl LayeredMap {
// ── Common methods ────────────────────────────────────────────────────────────
impl<D: LayerData> LayeredMap<D> {
/// Open an existing layered index at `root`.
pub fn open(root: &Path) -> OLMResult<Self> {
let meta = PartitionMeta::load(root)?;
let layers = (0..meta.n_layers)
.map(|i| Layer::open(&layer_dir(root, i)))
.map(|i| Layer::<D>::open(&layer_dir(root, i)))
.collect::<OLMResult<Vec<_>>>()?;
Ok(Self { root: root.to_owned(), meta, layers })
Ok(Self {
root: root.to_owned(),
meta,
layers,
})
}
/// Create a new, empty layered index at `root`.
@@ -35,48 +42,71 @@ impl LayeredMap {
fs::create_dir_all(root)?;
let meta = PartitionMeta::new();
meta.save(root)?;
Ok(Self { root: root.to_owned(), meta, layers: Vec::new() })
Ok(Self {
root: root.to_owned(),
meta,
layers: Vec::new(),
})
}
/// Return the number of layers in this index.
pub fn n_layers(&self) -> usize {
self.layers.len()
}
pub fn layer(&self, i: usize) -> &Layer {
/// Return a reference to the `i`-th layer.
pub fn layer(&self, i: usize) -> &Layer<D> {
&self.layers[i]
}
/// Query `kmer` across all layers. Returns `(layer_index, Hit)` on match.
pub fn query(&self, kmer: CanonicalKmer) -> Option<(usize, Hit)> {
self.layers.iter().enumerate().find_map(|(i, layer)| {
layer.query(kmer).map(|hit| (i, hit))
})
pub fn query(&self, kmer: CanonicalKmer) -> Option<(usize, Hit<D::Item>)> {
self.layers
.iter()
.enumerate()
.find_map(|(i, layer)| layer.query(kmer).map(|hit| (i, hit)))
}
/// Return a `UnitigFileWriter` for the next layer to be built.
/// The caller writes unitigs, calls `.close()` on the writer,
/// then calls `push_layer` to finish.
pub fn next_layer_writer(&self) -> OLMResult<UnitigFileWriter> {
let dir = layer_dir(&self.root, self.layers.len());
Layer::unitig_writer(&dir)
Layer::<D>::unitig_writer(&dir)
}
/// Build and append the next layer from a count closure.
/// Unitigs must already have been written via `next_layer_writer`.
pub fn push_layer(
&mut self,
count_of: impl Fn(CanonicalKmer) -> u32,
) -> OLMResult<usize> {
/// Append a new layer to the index.
fn append_layer(&mut self) -> OLMResult<()> {
let i = self.layers.len();
let dir = layer_dir(&self.root, i);
Layer::build(&dir, count_of)?;
self.layers.push(Layer::open(&dir)?);
self.layers.push(Layer::<D>::open(&dir)?);
self.meta.n_layers = self.layers.len();
self.meta.save(&self.root)?;
Ok(())
}
}
// ── Mode 1 — set membership ───────────────────────────────────────────────────
impl LayeredMap<()> {
pub fn push_layer(&mut self) -> OLMResult<usize> {
let i = self.layers.len();
let dir = layer_dir(&self.root, i);
Layer::<()>::build(&dir)?;
self.append_layer()?;
Ok(i)
}
}
// ── Mode 2 — counts ───────────────────────────────────────────────────────────
impl LayeredMap<PersistentCompactIntVec> {
pub fn push_layer(&mut self, count_of: impl Fn(CanonicalKmer) -> u32) -> OLMResult<usize> {
let i = self.layers.len();
let dir = layer_dir(&self.root, i);
Layer::<PersistentCompactIntVec>::build(&dir, count_of)?;
self.append_layer()?;
Ok(i)
}
/// Convenience variant of `push_layer` that accepts a `HashMap`.
pub fn push_layer_from_map(
&mut self,
counts: &HashMap<CanonicalKmer, u32>,
+14 -11
View File
@@ -1,4 +1,5 @@
use super::*;
use obicompactvec::PersistentCompactIntVec;
use obikseq::{set_k, Kmer, Sequence as _, Unitig};
use tempfile::tempdir;
@@ -28,8 +29,8 @@ fn build_and_query_all_kmers_found() {
let dir = tempdir().unwrap();
write_unitigs(dir.path(), &[b"AAAACGT"]);
let kmers = all_canonical_kmers(dir.path(), 4);
Layer::build(dir.path(), |_| 1).unwrap();
let layer = Layer::open(dir.path()).unwrap();
Layer::<()>::build(dir.path()).unwrap();
let layer = Layer::<()>::open(dir.path()).unwrap();
for kmer in kmers {
assert!(layer.query(kmer).is_some(), "kmer should be present");
}
@@ -43,11 +44,14 @@ fn counts_are_stored_and_retrieved() {
let kmers = all_canonical_kmers(dir.path(), 4);
let count_map: HashMap<CanonicalKmer, u32> =
kmers.iter().enumerate().map(|(i, &k)| (k, i as u32 + 1)).collect();
Layer::build(dir.path(), |kmer| count_map.get(&kmer).copied().unwrap_or(0)).unwrap();
let layer = Layer::open(dir.path()).unwrap();
Layer::<PersistentCompactIntVec>::build(
dir.path(),
|kmer| count_map.get(&kmer).copied().unwrap_or(0),
).unwrap();
let layer = Layer::<PersistentCompactIntVec>::open(dir.path()).unwrap();
for kmer in &kmers {
let hit = layer.query(*kmer).expect("kmer must be present");
assert_eq!(hit.count, count_map[kmer]);
assert_eq!(hit.data, count_map[kmer]);
}
}
@@ -56,8 +60,8 @@ fn query_absent_returns_none() {
set_k(4);
let dir = tempdir().unwrap();
write_unitigs(dir.path(), &[b"AAAACGT"]);
Layer::build(dir.path(), |_| 1).unwrap();
let layer = Layer::open(dir.path()).unwrap();
Layer::<()>::build(dir.path()).unwrap();
let layer = Layer::<()>::open(dir.path()).unwrap();
let absent = Kmer::from_ascii(b"CCCC").unwrap().canonical();
assert!(layer.query(absent).is_none());
}
@@ -66,12 +70,11 @@ fn query_absent_returns_none() {
fn open_after_build_is_consistent() {
set_k(4);
let dir = tempdir().unwrap();
// "AAAACGT": 7 nucl → 4 kmers, all with distinct canonical forms
write_unitigs(dir.path(), &[b"AAAACGT"]);
let n = Layer::build(dir.path(), |_| 7).unwrap();
let n = Layer::<PersistentCompactIntVec>::build(dir.path(), |_| 7).unwrap();
assert_eq!(n, 4);
let layer = Layer::open(dir.path()).unwrap();
let layer = Layer::<PersistentCompactIntVec>::open(dir.path()).unwrap();
let kmer = Kmer::from_ascii(b"AAAA").unwrap().canonical();
let hit = layer.query(kmer).expect("AAAA must be present");
assert_eq!(hit.count, 7);
assert_eq!(hit.data, 7);
}
+13 -16
View File
@@ -1,9 +1,10 @@
use super::*;
use obicompactvec::PersistentCompactIntVec;
use obikseq::{set_k, Sequence as _, Unitig};
use tempfile::tempdir;
fn push_unitigs_and_layer(
map: &mut LayeredMap,
map: &mut LayeredMap<PersistentCompactIntVec>,
seqs: &[&[u8]],
count: u32,
) {
@@ -23,7 +24,7 @@ fn canonical(ascii: &[u8]) -> CanonicalKmer {
fn create_empty_map() {
set_k(4);
let dir = tempdir().unwrap();
let map = LayeredMap::create(dir.path()).unwrap();
let map = LayeredMap::<()>::create(dir.path()).unwrap();
assert_eq!(map.n_layers(), 0);
}
@@ -32,10 +33,10 @@ fn open_reloads_layer_count() {
set_k(4);
let dir = tempdir().unwrap();
{
let mut map = LayeredMap::create(dir.path()).unwrap();
let mut map = LayeredMap::<PersistentCompactIntVec>::create(dir.path()).unwrap();
push_unitigs_and_layer(&mut map, &[b"AAAACGT"], 1);
}
let map = LayeredMap::open(dir.path()).unwrap();
let map = LayeredMap::<PersistentCompactIntVec>::open(dir.path()).unwrap();
assert_eq!(map.n_layers(), 1);
}
@@ -43,41 +44,37 @@ fn open_reloads_layer_count() {
fn query_finds_kmer_in_layer_zero() {
set_k(4);
let dir = tempdir().unwrap();
let mut map = LayeredMap::create(dir.path()).unwrap();
let mut map = LayeredMap::<PersistentCompactIntVec>::create(dir.path()).unwrap();
push_unitigs_and_layer(&mut map, &[b"AAAACGT"], 3);
let kmer = canonical(b"AAAC");
let (layer_idx, hit) = map.query(kmer).expect("kmer must be found");
assert_eq!(layer_idx, 0);
assert_eq!(hit.count, 3);
assert_eq!(hit.data, 3);
}
#[test]
fn query_finds_kmer_in_correct_layer() {
set_k(4);
let dir = tempdir().unwrap();
let mut map = LayeredMap::create(dir.path()).unwrap();
// Layer 0: AAAACGT
let mut map = LayeredMap::<PersistentCompactIntVec>::create(dir.path()).unwrap();
push_unitigs_and_layer(&mut map, &[b"AAAACGT"], 1);
// Layer 1: GGGACGT (no kmer overlap with layer 0 by construction)
push_unitigs_and_layer(&mut map, &[b"GGGACGT"], 2);
assert_eq!(map.n_layers(), 2);
// AAAA is in layer 0
let (li, hit) = map.query(canonical(b"AAAA")).expect("AAAA must be found");
assert_eq!(li, 0);
assert_eq!(hit.count, 1);
assert_eq!(hit.data, 1);
// GGGA is in layer 1
let (li, hit) = map.query(canonical(b"GGGA")).expect("GGGA must be found");
assert_eq!(li, 1);
assert_eq!(hit.count, 2);
assert_eq!(hit.data, 2);
}
#[test]
fn query_absent_returns_none() {
set_k(4);
let dir = tempdir().unwrap();
let mut map = LayeredMap::create(dir.path()).unwrap();
let mut map = LayeredMap::<PersistentCompactIntVec>::create(dir.path()).unwrap();
push_unitigs_and_layer(&mut map, &[b"AAAACGT"], 1);
let absent = canonical(b"CCCC");
assert!(map.query(absent).is_none());
@@ -87,7 +84,7 @@ fn query_absent_returns_none() {
fn push_layer_from_map_convenience() {
set_k(4);
let dir = tempdir().unwrap();
let mut map = LayeredMap::create(dir.path()).unwrap();
let mut map = LayeredMap::<PersistentCompactIntVec>::create(dir.path()).unwrap();
let mut w = map.next_layer_writer().unwrap();
w.write(&Unitig::from_ascii(b"AAAACGT")).unwrap();
w.close().unwrap();
@@ -96,5 +93,5 @@ fn push_layer_from_map_convenience() {
].into_iter().collect();
map.push_layer_from_map(&counts).unwrap();
let (_, hit) = map.query(canonical(b"AAAA")).unwrap();
assert_eq!(hit.count, 10);
assert_eq!(hit.data, 10);
}