Add persistent compact integer vector and cache-line-optimized MPHF

Introduce the `obicompactvec` crate, featuring a two-tier, memory-mapped integer vector that uses a primary `u8` array with a sentinel for overflow dispatch and a sparse L1-resident index for fast random access. Implement builder and reader modules with zero-copy serialization and comprehensive test coverage. Update `obilayeredmap` to replace the default hash function with a cache-line-optimized `Mphf`, adding explicit bounds checking and duplicate-slot detection. Add documentation for both modules and update project configuration files accordingly.
This commit is contained in:
Eric Coissac
2026-05-13 06:24:43 +08:00
parent 84ed752b78
commit f2de79acde
14 changed files with 710 additions and 91 deletions
+4
View File
@@ -0,0 +1,4 @@
## Dans OBILayeredMap
- Est-ce que CachelineEfVec est vraiment justifier dans notre cas. vu les contraintes sur la distribution des valeurs imposées par CachelineEfVec en terme d'ordre, de grandeur et de dispersion ?
- Il semble que le count de kmer soit stocké, ce qui doit-être une possibilité pas une obligation.
+97 -62
View File
@@ -6,6 +6,59 @@
---
## Four usage modes
The MPHF + evidence infrastructure is fixed for all modes. The **payload** — data associated with each slot — is orthogonal and varies by mode.
| Mode | Description | Payload |
|---|---|---|
| 1. Set | membership test only | none |
| 2. Set with count | occurrences per kmer per sample | compact integer vector |
| 3. Presence/absence matrix | which genomes contain each kmer | bit matrix n_kmers × n_genomes |
| 4. Count matrix | occurrences per kmer per genome | integer matrix n_kmers × n_genomes |
Mode 4 is architecturally identical to mode 3 but with counts instead of bits; the main open question is scale — a naive dense representation reaches hundreds of GB for large genome collections, which may require a sparse encoding.
### Payload for mode 2: compact count vector
Counts follow a roughly geometric distribution: the large majority are below 128, almost all below 16 000, with rare large values in highly covered regions. Options for a mmap-compatible random-access count vector:
- **`Vec<u16>`** — 2 bytes/slot, covers 065 535, O(1) access, trivially mmap-able. Practical upper bound sufficient for most metagenomic use cases.
- **Bit-packed fixed width** — choose B = ⌈log₂(max\_count)⌉ globally (e.g. B=14 for 99.9% coverage at 1.75 bytes/slot). O(1) access via bit-shift arithmetic.
- **Block-varint (PForDelta, StreamVByte)** — good compression but random access requires a separate offset index; no mature Rust crate for mmap use.
Decision not yet made. `Vec<u16>` is the default baseline pending profiling.
### Payload for mode 3: presence/absence matrix
Column-major bit matrix: column j (genome j) is a contiguous `n_slots`-bit array. This layout makes per-genome operations (union, intersection, diff) cache-friendly. For 10⁹ slots × 100 genomes ≈ 12.5 GB — large but mmap-able with no resident-memory cost at open time.
---
## Payload architecture
The payload is orthogonal to the MPHF + evidence layer. `Layer` will be parameterised by a payload type `D`:
```rust
struct Layer<D = ()> {
mphf: Mphf,
evidence: Evidence,
unitigs: UnitigFileReader,
data: D,
}
```
Where `D` implements a `LayerData` trait covering open/close/get. Concrete instances:
- `()` — mode 1
- `CountVec` — mode 2 (u16 or bit-packed)
- `BitMatrix` — mode 3
- `CountMatrix` — mode 4
This parameterisation is not yet implemented; the current code uses a fixed `Counts` field.
---
## Three-level hierarchy
```
@@ -15,14 +68,14 @@ index_root/ ← LayeredMap (collection)
layer_0/ ← Layer
mphf.bin
unitigs.bin
unitigs.bin.idx
evidence.bin
counts.bin
presence.bin
counts.bin [mode 2 only]
presence.bin [mode 3/4 only]
layer_1/
...
part_00001/
layer_0/
layer_1/
...
```
@@ -38,19 +91,15 @@ index_root/ ← LayeredMap (collection)
```
layer_N/
mphf.bin — ptr_hash MPHF (exact key count, phase-2 construction)
unitigs.bin — packed 2-bit nucleotide sequences (concatenated, variable-length)
unitig_offsets.bin — u32 per unitig: nucleotide offset of unitig j in unitigs.bin
mphf.bin — ptr_hash MPHF (epserde, ptr_hash native format)
unitigs.bin — packed 2-bit nucleotide sequences (obiskio binary format)
unitigs.bin.idx — UIDX index: n_unitigs, n_kmers, seqls[], packed_offsets[]
evidence.bin — u32 per MPHF slot: (unitig_id: 25 | rank: 7)
counts.bin — u32 per MPHF slot (total kmer occurrences)
presence.bin — bit matrix: n_slots × n_samples [optional]
counts.bin — [optional] u16 per MPHF slot (mode 2)
presence.bin — [optional] bit matrix n_slots × n_samples (mode 3/4)
```
Unitigs have variable lengths. Each record in `unitigs.bin` is self-delimiting: it begins with a varint `seql` (sequence length in nucleotides) followed by `(seql+3)/4` packed bytes — the streaming format defined in `obiskio`. Sequential scan is always possible using the in-record `seql`.
For O(1) random access, `unitig_offsets.bin` is a **precomputed derived index**: a u32 array of byte offsets into `unitigs.bin`, with n_unitigs + 1 entries (sentinel = total byte size). Built once at construction by a single sequential scan; reconstructible from `unitigs.bin` if lost. Access: `unitigs.bin[offsets[j] .. offsets[j+1]]`.
All files except `mphf.bin` are flat arrays of fixed-size elements, serialised with **rkyv** for zero-copy mmap access. `mphf.bin` uses ptr_hash's native serialisation; rkyv integration is deferred (see open questions).
`unitigs.bin` is the packed-2-bit sequence file produced by `obiskio::UnitigFileWriter`. The companion `.idx` file stores: magic `UIDX`, `n_unitigs: u32`, `n_kmers: u64`, `seqls: [u8; n_unitigs]` (kmer count 1 per chunk), and `packed_offsets: [u32; n_unitigs + 1]` (byte offsets into `unitigs.bin`, sentinel-terminated). This gives O(1) random access to any unitig and the total kmer count without scanning the sequence file.
### Evidence encoding
@@ -68,9 +117,29 @@ kmer = unitigs[unitig_id][rank .. rank + k] // 2-bit packed slice
`rank` is the kmer's 0-based index within the unitig (kmer units, not nucleotides). For k=31, m=11, the structural maximum is k m + 1 = 21 kmers per unitig; the empirical maximum observed is ~46 kmers. A `u7` (0127) is sufficient.
### Presence/absence matrix
---
Column-major bit matrix: column j (sample j) is a contiguous `n_slots`-bit array. This layout makes per-sample operations (union, intersection, diff over a column) cache-friendly. For large matrices (e.g. 10 M slots × 1 000 samples ≈ 1.25 GB per partition), rkyv + mmap avoids loading the full matrix at open time.
## ptr_hash configuration
The MPHF per layer is configured as:
```rust
type Mphf = PtrHash<
u64, // key type: canonical kmer raw encoding
CubicEps, // bucket fn: balanced (2.4 bits/key, λ=3.5)
CachelineEfVec<Vec<CachelineEf>>, // remap: 11.6 bits/entry vs 32 for Vec<u32>
Xx64, // hasher: XXH3-64 with seed, handles structured keys
Vec<u8>, // pilots
>;
```
**Hasher choice — `Xx64`:** k-mer raw values are left-aligned u64 with structural zeros in low bits (42 zeros for k=11, 2 zeros for k=31). `FxHash` (single multiply) distributes these poorly. `Xx64` (XXH3 64-bit, seeded) handles structured input correctly.
**Bucket function — `CubicEps` with `PtrHashParams::<CubicEps>::default()`:** λ=3.5, α=0.99. Balanced tradeoff: 2× slower construction than `Linear/λ=3.0` (the `default_fast` preset), 20% less space. `default_compact` (λ=4.0) saves a further 12.5% at 2× more construction time and reduced reliability — not chosen.
**Remap — `CachelineEfVec`:** Elias-Fano variant packing 44 sorted 40-bit values per 64-byte cacheline (11.6 bits/value vs 32 for `Vec<u32>`). Already a transitive dependency of `ptr_hash`. One cacheline per query vs one u32 read; space win dominates for billion-scale key sets.
**Construction:** `new_from_par_iter` avoids materialising all keys as `Vec<u64>`. MPHF correctness is verified inline during the second pass (evidence/counts fill) using an n/8-byte bitset; any duplicate slot or out-of-bounds index returns `OLMError::Mphf`.
---
@@ -79,14 +148,14 @@ Column-major bit matrix: column j (sample j) is a contiguous `n_slots`-bit array
A kmer query routes through all three levels:
1. **Partition routing**: hash canonical minimiser of the query kmer → partition index → open `part_XXXXX/`.
2. **Layer probing**: iterate layers in order within the partition; for each layer compute `slot = mphf.query(kmer)`, then verify `evidence.decode(slot) == kmer`. First match wins.
3. **Data access**: read `counts[slot]` and/or `presence[slot]` from the matching layer.
2. **Layer probing**: iterate layers in order within the partition; for each layer compute `slot = mphf.index(kmer)`, then verify `evidence.decode(slot) == kmer`. First match wins.
3. **Data access**: read payload at `slot` from the matching layer (counts, presence row, etc.).
```
fn query(kmer) → Option<Hit>:
part = partition_of(kmer)
for (i, layer) in part.layers.iter().enumerate():
slot = layer.mphf.query(kmer)
slot = layer.mphf.index(&kmer.raw())
if layer.evidence.decode(slot) == kmer:
return Some(Hit { layer: i, slot })
return None
@@ -102,7 +171,7 @@ When adding dataset B to an existing index:
1. For each partition, iterate kmers of B routed to that partition.
2. Probe existing layers; collect kmers absent from all layers → `B \ index`.
3. Build a new layer from `B \ index` using the two-phase pipeline (FMPHGO provisional → ptr_hash definitive).
3. Build a new layer from `B \ index`.
4. Append the new layer directory under each `part_XXXXX/`.
5. Update `meta.json` (layer count, sample registry).
@@ -110,59 +179,25 @@ Each partition's new layer is built independently; the operation is fully parall
---
## Core API (sketch)
```rust
// Open an existing index
let map = LayeredMap::open(path)?;
// Query a canonical kmer across all partitions and layers
match map.query(kmer) {
Some(hit) => {
let count = hit.count();
let present = hit.presence_row(); // bit slice over samples
}
None => { /* absent */ }
}
// Non-destructive extension with a new dataset
// unitigs produced by the two-phase pipeline, one per partition
let layer_idx = map.add_layer(unitigs_dir, counts_dir, presence_path)?;
```
---
## Dependencies
| crate | role |
|---|---|
| `ptr_hash` | phase-2 MPHF per layer |
| `ph` (FMPHGO) | phase-1 provisional MPHF during layer construction |
| `rkyv` | zero-copy serialisation of flat arrays (evidence, counts, presence) |
| `ptr_hash 1.1` | MPHF per layer (epserde serialisation) |
| `cacheline-ef 1.1` | compact remap storage inside ptr_hash |
| `epserde 0.8` | zero-copy serialisation of MPHF |
| `memmap2` | mmap of layer files |
| `bitm` | bit-packed presence matrix |
| `obiskio` | unitig file writer/reader |
---
## Serialisation strategy
All flat arrays use `rkyv::Archive`:
```rust
#[derive(Archive, Serialize, Deserialize)]
struct Evidence { slots: Vec<u32> } // packed (unitig_id: 25 | rank: 7)
#[derive(Archive, Serialize, Deserialize)]
struct Counts { data: Vec<u32> }
```
At open time, each file is mmapped and cast to its archived type — no allocation, no copy. The MPHF is loaded via ptr_hash's own API; a rkyv wrapper is a future refinement.
Flat arrays (evidence, counts, presence) use a simple custom binary format (raw bytes, fixed element size) opened via mmap — no additional serialisation crate required.
---
## Open questions
- **ptr_hash + rkyv**: ptr_hash's internals are flat bit arrays; a rkyv-compatible wrapper is structurally feasible. Assess upstream willingness or implement a thin newtype wrapper.
- **Presence matrix layout**: column-major favours per-sample operations; row-major favours per-kmer queries. Decide based on dominant access pattern.
- **Mode 4 scale**: count matrix (n_kmers × n_genomes × bytes_per_count) reaches hundreds of GB for large collections. A sparse representation (store only non-zero entries) may be required; the access pattern and density threshold are not yet defined.
- **Count vector encoding (mode 2)**: `Vec<u16>` vs bit-packed (B=14). Decision pending; depends on whether counts > 65 535 occur in practice for the target datasets.
- **Presence matrix layout**: column-major favours per-genome operations; row-major favours per-kmer queries. Decide based on dominant access pattern.
- **Layer merge**: merging two `LayeredMap` instances into a single-layer index requires full rebuild. Define API and cost model; maintenance operation, not query-path.
- **Canonical kmer orientation**: evidence stores canonical kmer; strand recovery requires one 64-bit revcomp comparison at query time.
- **`try_new_from_par_iter`**: `ptr_hash::new_from_par_iter` silently discards construction failure. Post-construction verification (current workaround) is correct but does not allow retry. A `try_new_from_par_iter` PR upstream would close this gap.
@@ -0,0 +1,186 @@
# PersistentCompactIntVec
## Purpose
`PersistentCompactIntVec` stores a dense array of non-negative integers indexed by MPHF slot where the vast majority of values are small (0254) and large values are rare. It is designed for mmap-compatible random access with minimal memory footprint and optimal cache behaviour.
Motivation from observed count distributions in genomics data: 99.9% of k-mer counts fit in a u8; overflow (count ≥ 255) affects ~0.07% of distinct k-mers but can reach values above 10⁶ (chloroplast, ribosomal repeats).
---
## Design
Two-tier structure:
1. **Primary array**`[u8; n]`, mmap'd as a flat file. Values 0254 are stored directly. Value **255 is a sentinel** meaning "look in overflow".
2. **Overflow structure** — sorted list of `(slot: u32, value: u32)` pairs for all slots where the true value ≥ 255, with a **sparse L1-fitting index** for fast lookup.
```
primary[slot] < 255 → return primary[slot]
primary[slot] == 255 → binary search in overflow
```
---
## Lifecycle
The structure has two distinct runtime roles with different APIs.
### Builder (`PersistentCompactIntVecBuilder`)
Used during layer construction. Holds the primary array and overflow map in memory; supports arbitrary reads and writes before finalisation.
```rust
struct PersistentCompactIntVecBuilder {
primary: Vec<u8>, // in memory; written to disk at close()
overflow: HashMap<u64, u32>, // O(1) get/set for values ≥ 255
}
```
**Phase 1 — `new(n: usize)`**
Allocates `primary` of length `n` initialised to 0. `overflow` is empty.
**Phase 2 — fill (repeated `set` / `get`)**
```rust
fn set(&mut self, slot: u64, value: u32) {
if value < 255 {
self.primary[slot] = value as u8;
self.overflow.remove(&slot); // in case of downward mutation
} else {
self.primary[slot] = 255; // sentinel
self.overflow.insert(slot, value);
}
}
fn get(&self, slot: u64) -> u32 {
match self.primary[slot] {
255 => *self.overflow.get(&slot).unwrap(),
v => v as u32,
}
}
```
Reads and mutations are both O(1). Overflow entries can be created, updated, or removed freely during this phase.
**Phase 3 — `close(primary_path, overflow_path)`**
1. Write `primary` as raw bytes to `counts_primary.bin`.
2. Collect `overflow` into `Vec<(u32, u32)>`, sort by slot.
3. Compute `step` from `n_overflow` (see below).
4. Build sparse index.
5. Write `counts_overflow.bin`.
6. Drop all in-memory state.
The `HashMap` is the only extra allocation: bounded by `n_overflow × (8 + 4 + overhead)` bytes, typically a few MB in practice.
---
### Reader (`PersistentCompactIntVec`)
Used at query time. Both files are mmap'd; the sparse index is loaded into a `Vec` at open time (≤ 32 KB, L1-resident).
```rust
struct PersistentCompactIntVec {
primary: Mmap, // mmap of counts_primary.bin
index: Vec<(u32, u32)>, // sparse index, loaded into RAM at open
data: Mmap, // mmap of overflow data region
n_overflow: u32,
step: u32,
}
```
**`open(primary_path, overflow_path)`**
Mmaps both files. Parses the overflow file header; copies the sparse index into a `Vec` (tiny, warm in cache). The data region stays mmap'd.
**`get(slot: u64) -> u32`** — see Lookup section.
---
## Overflow file format
```
magic: [u8; 4] = b"PCIV"
n_overflow: u32
step: u32 (0 if n_overflow ≤ L1_entries → no sparse index)
[if step > 0]
n_index: u32 = ⌈n_overflow / step⌉
index: [(slot: u32, pos: u32); n_index] ← loaded into RAM at open
data: [(slot: u32, value: u32); n_overflow] sorted by slot, mmap'd
```
`index[i]` stores the slot value and data-array position of the `i × step`-th overflow entry.
---
## Step computation
The step is chosen at `close()` time, once `n_overflow` is known:
```
L1_SIZE = 32 * 1024 // 32 KB conservative target
INDEX_ENTRY = 8 // bytes: (u32, u32)
L1_entries = L1_SIZE / INDEX_ENTRY = 4096
if n_overflow ≤ L1_entries:
step = 0 // no sparse index; data itself fits in a few cache lines
else:
step = ⌈n_overflow / L1_entries⌉
```
For the Betula nana reference (359 044 overflows): step = 88, index = 4 080 entries = 31.9 KB.
---
## Lookup
```
fn get(slot: u64) -> u32:
if primary[slot] < 255:
return primary[slot] as u32
if step == 0:
return binary_search(data[0..n_overflow], slot)
// 1. binary search in index (Vec, L1-resident)
i = upper_bound(index[..].slot, slot) - 1
pos_start = index[i].pos
pos_end = if i+1 < n_index { index[i+1].pos } else { n_overflow }
// 2. binary search in contiguous block (mmap'd)
return binary_search(data[pos_start..pos_end], slot)
```
Cache behaviour: step 1 is entirely within the L1-resident `Vec<(u32,u32)>`; step 2 loads a contiguous block of ≤ `step × 8` bytes from the mmap.
---
## Files
```
layer_N/
counts_primary.bin — [u8; n_slots], raw bytes
counts_overflow.bin — PCIV header + sparse index + sorted data
(absent if n_overflow == 0)
```
If `counts_overflow.bin` is absent, no slot has value ≥ 255; all reads go directly to the primary array.
---
## Complexity
| Operation | Time | Notes |
|---|---|---|
| `set` / `get` (builder) | O(1) | HashMap for overflow |
| `get` (no overflow) | O(1) | single byte read |
| `get` (overflow, with index) | O(log step) | ~2 memory regions |
| `get` (overflow, no index) | O(log n_overflow) | data fits in a few cache lines |
| `close` | O(n_overflow log n_overflow) | sort + index build |
| `open` | O(n_index) | index copy into Vec |
---
## Generalisation
The sentinel (255) and primary type (u8) are fixed. The overflow value type is u32, sufficient for any realistic k-mer count. For a count matrix (mode 4), one `PersistentCompactIntVec` per genome column shares the primary array layout.
+1
View File
@@ -45,6 +45,7 @@ nav:
- MPHF selection: implementation/mphf.md
- Unitig evidence encoding: implementation/unitig_evidence.md
- obilayeredmap crate: implementation/obilayeredmap.md
- PersistentCompactIntVec: implementation/persistent_compact_int_vec.md
- Architecture:
- Sequences: architecture/sequences/invariant.md
+9
View File
@@ -1655,6 +1655,14 @@ dependencies = [
"autocfg",
]
[[package]]
name = "obicompactvec"
version = "0.1.0"
dependencies = [
"memmap2",
"tempfile",
]
[[package]]
name = "obidebruinj"
version = "0.1.0"
@@ -1741,6 +1749,7 @@ dependencies = [
name = "obilayeredmap"
version = "0.1.0"
dependencies = [
"cacheline-ef",
"epserde 0.8.0",
"memmap2",
"obikseq",
+1 -1
View File
@@ -1,5 +1,5 @@
[workspace]
resolver = "3"
members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio","obidebruinj","obilayeredmap"]
members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio","obidebruinj","obilayeredmap", "obicompactvec"]
[profile.release]
debug = 1
+10
View File
@@ -0,0 +1,10 @@
[package]
name = "obicompactvec"
version = "0.1.0"
edition = "2024"
[dependencies]
memmap2 = "0.9"
[dev-dependencies]
tempfile = "3"
+64
View File
@@ -0,0 +1,64 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufWriter, Write as _};
use std::path::Path;
use crate::format::write_overflow;
pub struct PersistentCompactIntVecBuilder {
primary: Vec<u8>,
overflow: HashMap<u64, u32>,
}
impl PersistentCompactIntVecBuilder {
pub fn new(n: usize) -> Self {
Self {
primary: vec![0u8; n],
overflow: HashMap::new(),
}
}
pub fn set(&mut self, slot: u64, value: u32) {
if value < 255 {
self.primary[slot as usize] = value as u8;
self.overflow.remove(&slot);
} else {
self.primary[slot as usize] = 255;
self.overflow.insert(slot, value);
}
}
pub fn get(&self, slot: u64) -> u32 {
match self.primary[slot as usize] {
255 => *self.overflow.get(&slot).expect("sentinel without overflow entry"),
v => v as u32,
}
}
pub fn len(&self) -> usize {
self.primary.len()
}
/// Write `counts_primary.bin` and (if needed) `counts_overflow.bin`, then drop all state.
pub fn close(self, primary_path: &Path, overflow_path: &Path) -> io::Result<()> {
// Write primary array.
let mut w = BufWriter::new(File::create(primary_path)?);
w.write_all(&self.primary)?;
w.flush()?;
drop(w);
if self.overflow.is_empty() {
return Ok(());
}
// Sort overflow entries by slot.
let mut entries: Vec<(u32, u32)> = self
.overflow
.into_iter()
.map(|(slot, value)| (slot as u32, value))
.collect();
entries.sort_unstable_by_key(|&(slot, _)| slot);
write_overflow(overflow_path, &entries)
}
}
+81
View File
@@ -0,0 +1,81 @@
use std::fs::File;
use std::io::{self, BufWriter, Write as _};
use std::path::Path;
pub const MAGIC: [u8; 4] = *b"PCIV";
// L1 cache target: 32 KB / 8 bytes per index entry
pub const L1_INDEX_ENTRIES: usize = 4096;
/// Write the overflow file from a sorted list of (slot, value) pairs.
pub fn write_overflow(
path: &Path,
entries: &[(u32, u32)],
) -> io::Result<()> {
let n_overflow = entries.len() as u32;
let step: u32 = if entries.len() <= L1_INDEX_ENTRIES {
0
} else {
entries.len().div_ceil(L1_INDEX_ENTRIES) as u32
};
let mut w = BufWriter::new(File::create(path)?);
w.write_all(&MAGIC)?;
w.write_all(&n_overflow.to_le_bytes())?;
w.write_all(&step.to_le_bytes())?;
if step > 0 {
let n_index = entries.len().div_ceil(step as usize) as u32;
w.write_all(&n_index.to_le_bytes())?;
for (pos, chunk) in entries.chunks(step as usize).enumerate() {
let slot = chunk[0].0;
w.write_all(&slot.to_le_bytes())?;
w.write_all(&((pos * step as usize) as u32).to_le_bytes())?;
}
}
for &(slot, value) in entries {
w.write_all(&slot.to_le_bytes())?;
w.write_all(&value.to_le_bytes())?;
}
w.flush()
}
/// Parse the header and sparse index from the overflow file bytes.
/// Returns (n_overflow, step, index_entries, data_byte_offset).
pub fn parse_overflow_header(
data: &[u8],
) -> io::Result<(u32, u32, Vec<(u32, u32)>, usize)> {
let mut pos = 0;
let magic = read4(data, &mut pos)?;
if magic != MAGIC {
return Err(io::Error::new(io::ErrorKind::InvalidData, "bad PCIV magic"));
}
let n_overflow = u32::from_le_bytes(read4(data, &mut pos)?);
let step = u32::from_le_bytes(read4(data, &mut pos)?);
let mut index = Vec::new();
if step > 0 {
let n_index = u32::from_le_bytes(read4(data, &mut pos)?) as usize;
index.reserve(n_index);
for _ in 0..n_index {
let slot = u32::from_le_bytes(read4(data, &mut pos)?);
let ipos = u32::from_le_bytes(read4(data, &mut pos)?);
index.push((slot, ipos));
}
}
Ok((n_overflow, step, index, pos))
}
fn read4(data: &[u8], pos: &mut usize) -> io::Result<[u8; 4]> {
data.get(*pos..*pos + 4)
.and_then(|s| s.try_into().ok())
.map(|arr| { *pos += 4; arr })
.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "truncated PCIV file"))
}
+10
View File
@@ -0,0 +1,10 @@
mod builder;
mod format;
mod reader;
pub use builder::PersistentCompactIntVecBuilder;
pub use reader::PersistentCompactIntVec;
#[cfg(test)]
#[path = "tests/mod.rs"]
mod tests;
+93
View File
@@ -0,0 +1,93 @@
use std::fs::File;
use std::io;
use std::path::Path;
use memmap2::Mmap;
use crate::format::parse_overflow_header;
pub struct PersistentCompactIntVec {
primary: Mmap,
index: Vec<(u32, u32)>, // (slot, pos) — L1-resident sparse index
data: Option<Mmap>, // mmap of overflow data region
n_overflow: u32,
pub step: u32,
data_offset: usize, // byte offset of data region within overflow mmap
}
impl PersistentCompactIntVec {
/// Open a previously written `PersistentCompactIntVec`.
///
/// `overflow_path` is optional: pass `None` if no overflow file was written
/// (i.e. all values were < 255).
pub fn open(primary_path: &Path, overflow_path: Option<&Path>) -> io::Result<Self> {
let primary = unsafe { Mmap::map(&File::open(primary_path)?)? };
let (data, n_overflow, step, index, data_offset) = match overflow_path {
None => (None, 0, 0, Vec::new(), 0),
Some(p) => {
let mmap = unsafe { Mmap::map(&File::open(p)?)? };
let (n_overflow, step, index, data_offset) =
parse_overflow_header(&mmap)?;
(Some(mmap), n_overflow, step, index, data_offset)
}
};
Ok(Self { primary, index, data, n_overflow, step, data_offset })
}
pub fn len(&self) -> usize {
self.primary.len()
}
pub fn is_empty(&self) -> bool {
self.primary.is_empty()
}
pub fn get(&self, slot: u64) -> u32 {
match self.primary[slot as usize] {
255 => self.overflow_get(slot as u32),
v => v as u32,
}
}
fn overflow_get(&self, slot: u32) -> u32 {
let data = self.data.as_ref().expect("sentinel without overflow file");
let raw = &data[self.data_offset..];
let pos_start;
let pos_end;
if self.step == 0 {
pos_start = 0;
pos_end = self.n_overflow as usize;
} else {
// Binary search in the L1-resident sparse index.
let i = self.index.partition_point(|&(s, _)| s <= slot).saturating_sub(1);
pos_start = self.index[i].1 as usize;
pos_end = if i + 1 < self.index.len() {
self.index[i + 1].1 as usize
} else {
self.n_overflow as usize
};
}
// Binary search within the block.
let block = Self::data_slice(raw, pos_start, pos_end);
match block.binary_search_by_key(&slot, |&(s, _)| s) {
Ok(i) => block[i].1,
Err(_) => panic!("slot {slot} marked as overflow but not found in data"),
}
}
/// Interpret a byte slice as a slice of (u32, u32) pairs without copying.
fn data_slice(raw: &[u8], from: usize, to: usize) -> &[(u32, u32)] {
let byte_start = from * 8;
let byte_end = to * 8;
let bytes = &raw[byte_start..byte_end];
// Safety: the file was written as LE u32 pairs; alignment is guaranteed
// because we read from a byte slice and cast explicitly.
let ptr = bytes.as_ptr() as *const (u32, u32);
unsafe { std::slice::from_raw_parts(ptr, to - from) }
}
}
+119
View File
@@ -0,0 +1,119 @@
use tempfile::tempdir;
use crate::{PersistentCompactIntVec, PersistentCompactIntVecBuilder};
fn primary(dir: &std::path::Path) -> std::path::PathBuf { dir.join("primary.bin") }
fn overflow(dir: &std::path::Path) -> std::path::PathBuf { dir.join("overflow.bin") }
fn roundtrip(values: &[(u64, u32)], n: usize) -> Vec<u32> {
let dir = tempdir().unwrap();
let mut b = PersistentCompactIntVecBuilder::new(n);
for &(slot, v) in values {
b.set(slot, v);
}
let ov = overflow(dir.path());
b.close(&primary(dir.path()), &ov).unwrap();
let ov_path = ov.exists().then_some(ov.as_path());
let r = PersistentCompactIntVec::open(&primary(dir.path()), ov_path).unwrap();
(0..n as u64).map(|s| r.get(s)).collect()
}
#[test]
fn all_zero_by_default() {
let got = roundtrip(&[], 8);
assert!(got.iter().all(|&v| v == 0));
}
#[test]
fn small_values_no_overflow_file() {
let dir = tempdir().unwrap();
let mut b = PersistentCompactIntVecBuilder::new(4);
b.set(0, 1);
b.set(1, 254);
b.set(2, 0);
b.set(3, 100);
let ov = overflow(dir.path());
b.close(&primary(dir.path()), &ov).unwrap();
assert!(!ov.exists(), "no overflow file expected");
let r = PersistentCompactIntVec::open(&primary(dir.path()), None).unwrap();
assert_eq!(r.get(0), 1);
assert_eq!(r.get(1), 254);
assert_eq!(r.get(2), 0);
assert_eq!(r.get(3), 100);
}
#[test]
fn overflow_values_roundtrip() {
let values = [(0, 255), (1, 1000), (2, 50), (3, 1_313_691)];
let got = roundtrip(&values, 4);
assert_eq!(got[0], 255);
assert_eq!(got[1], 1000);
assert_eq!(got[2], 50);
assert_eq!(got[3], 1_313_691);
}
#[test]
fn mutation_downward_removes_from_overflow() {
let dir = tempdir().unwrap();
let mut b = PersistentCompactIntVecBuilder::new(2);
b.set(0, 1000); // goes to overflow
b.set(0, 42); // comes back below threshold
let ov = overflow(dir.path());
b.close(&primary(dir.path()), &ov).unwrap();
assert!(!ov.exists(), "no overflow file expected after downward mutation");
let r = PersistentCompactIntVec::open(&primary(dir.path()), None).unwrap();
assert_eq!(r.get(0), 42);
}
#[test]
fn mutation_upward_updates_overflow() {
let dir = tempdir().unwrap();
let mut b = PersistentCompactIntVecBuilder::new(1);
b.set(0, 300);
b.set(0, 500);
let ov = overflow(dir.path());
b.close(&primary(dir.path()), &ov).unwrap();
let r = PersistentCompactIntVec::open(&primary(dir.path()), Some(&ov)).unwrap();
assert_eq!(r.get(0), 500);
}
#[test]
fn sparse_index_built_for_many_overflows() {
// Generate more than L1_INDEX_ENTRIES (4096) overflow entries to trigger the sparse index.
let n = 5000usize;
let dir = tempdir().unwrap();
let mut b = PersistentCompactIntVecBuilder::new(n);
for i in 0..n {
b.set(i as u64, 1000 + i as u32); // all ≥ 255
}
let ov = overflow(dir.path());
b.close(&primary(dir.path()), &ov).unwrap();
assert!(ov.exists());
let r = PersistentCompactIntVec::open(&primary(dir.path()), Some(&ov)).unwrap();
assert!(r.step > 0, "sparse index should have been built");
for i in 0..n {
assert_eq!(r.get(i as u64), 1000 + i as u32, "mismatch at slot {i}");
}
}
#[test]
fn mixed_large_dataset() {
// Mirrors realistic distribution: most values small, sparse overflows.
let n = 1000usize;
let dir = tempdir().unwrap();
let mut b = PersistentCompactIntVecBuilder::new(n);
for i in 0..n {
let v = if i % 100 == 0 { 100_000 + i as u32 } else { i as u32 % 200 };
b.set(i as u64, v);
}
let ov = overflow(dir.path());
b.close(&primary(dir.path()), &ov).unwrap();
let r = PersistentCompactIntVec::open(&primary(dir.path()), ov.exists().then_some(ov.as_path())).unwrap();
for i in 0..n {
let expected = if i % 100 == 0 { 100_000 + i as u32 } else { i as u32 % 200 };
assert_eq!(r.get(i as u64), expected, "slot {i}");
}
}
+9 -8
View File
@@ -4,14 +4,15 @@ version = "0.1.0"
edition = "2024"
[dependencies]
obikseq = { path = "../obikseq" }
obiskio = { path = "../obiskio" }
ptr_hash = "1.1"
epserde = "0.8"
rayon = "1"
memmap2 = "0.9"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
obikseq = { path = "../obikseq" }
obiskio = { path = "../obiskio" }
ptr_hash = "1.1"
cacheline-ef = "1.1"
epserde = "0.8"
rayon = "1"
memmap2 = "0.9"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
[dev-dependencies]
tempfile = "3"
+26 -20
View File
@@ -2,10 +2,11 @@ use std::collections::HashMap;
use std::fs;
use std::path::Path;
use cacheline_ef::{CachelineEf, CachelineEfVec};
use epserde::prelude::*;
use obikseq::CanonicalKmer;
use obiskio::{UnitigFileReader, UnitigFileWriter};
use ptr_hash::{DefaultPtrHash, PtrHashParams};
use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64};
use crate::counts::{Counts, CountsWriter};
use crate::error::{OLMError, OLMResult};
@@ -16,8 +17,10 @@ const UNITIGS_FILE: &str = "unitigs.bin";
const EVIDENCE_FILE: &str = "evidence.bin";
const COUNTS_FILE: &str = "counts.bin";
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
pub struct Layer {
mphf: DefaultPtrHash,
mphf: Mphf,
evidence: Evidence,
unitigs: UnitigFileReader,
counts: Counts,
@@ -30,19 +33,14 @@ pub struct Hit {
impl Layer {
pub fn open(path: &Path) -> OLMResult<Self> {
let mphf: DefaultPtrHash = DefaultPtrHash::load_full(&path.join(MPHF_FILE))
let mphf: Mphf = Mphf::load_full(&path.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
let unitigs = UnitigFileReader::open(&path.join(UNITIGS_FILE))?;
let evidence = Evidence::open(&path.join(EVIDENCE_FILE))?;
let counts = Counts::open(&path.join(COUNTS_FILE))?;
Ok(Self {
mphf,
evidence,
unitigs,
counts,
})
Ok(Self { mphf, evidence, unitigs, counts })
}
pub fn query(&self, kmer: CanonicalKmer) -> Option<Hit> {
@@ -52,10 +50,7 @@ impl Layer {
.unitigs
.verify_canonical_kmer(chunk_id as usize, rank as usize, kmer)
{
Some(Hit {
slot,
count: self.counts.get(slot),
})
Some(Hit { slot, count: self.counts.get(slot) })
} else {
None
}
@@ -74,28 +69,39 @@ impl Layer {
if n == 0 {
fs::File::create(out_dir.join(EVIDENCE_FILE))?;
fs::File::create(out_dir.join(COUNTS_FILE))?;
let mphf: DefaultPtrHash = DefaultPtrHash::new(&[] as &[u64], PtrHashParams::default());
let mphf: Mphf = Mphf::try_new(&[] as &[u64], PtrHashParams::<CubicEps>::default())
.ok_or_else(|| OLMError::Mphf("construction failed".into()))?;
mphf.store(&out_dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
return Ok(0);
}
// Build MPHF from a cloneable parallel iterator — no Vec<u64> allocation.
// First pass: build the MPHF from a cloneable parallel iterator.
// flat_map_iter: outer chunks in parallel, inner kmer sliding-window sequential.
let keys = (0..unitigs.len())
.into_par_iter()
.flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw()));
let mphf: DefaultPtrHash =
DefaultPtrHash::new_from_par_iter(n, keys, PtrHashParams::default());
let mphf: Mphf = Mphf::new_from_par_iter(n, keys, PtrHashParams::<CubicEps>::default());
mphf.store(&out_dir.join(MPHF_FILE))
.map_err(|e| OLMError::InvalidLayer(e.to_string()))?;
// Second pass: fill evidence and counts
let mut ev = EvidenceWriter::new(n);
let mut cnt = CountsWriter::new(n);
// Second pass: fill evidence and counts; verify MPHF correctness inline.
// seen is a compact bitset (n/8 bytes) — no extra iteration needed.
let mut ev = EvidenceWriter::new(n);
let mut cnt = CountsWriter::new(n);
let mut seen = vec![0u8; (n + 7) / 8];
for (kmer, chunk_id, rank) in unitigs.iter_indexed_canonical_kmers() {
let slot = mphf.index(&kmer.raw());
if slot >= n {
return Err(OLMError::Mphf("MPHF construction failed: slot out of bounds".into()));
}
let byte = slot / 8;
let bit = 1u8 << (slot % 8);
if seen[byte] & bit != 0 {
return Err(OLMError::Mphf("MPHF construction failed: duplicate slot".into()));
}
seen[byte] |= bit;
ev.set(slot, chunk_id as u32, rank as u8);
cnt.set(slot, count_of(kmer));
}