diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..d93136d --- /dev/null +++ b/TODO.md @@ -0,0 +1,4 @@ +## Dans OBILayeredMap + +- Est-ce que CachelineEfVec est vraiment justifier dans notre cas. vu les contraintes sur la distribution des valeurs imposées par CachelineEfVec en terme d'ordre, de grandeur et de dispersion ? +- Il semble que le count de kmer soit stocké, ce qui doit-être une possibilité pas une obligation. diff --git a/docmd/implementation/obilayeredmap.md b/docmd/implementation/obilayeredmap.md index 44c6473..d10bba3 100644 --- a/docmd/implementation/obilayeredmap.md +++ b/docmd/implementation/obilayeredmap.md @@ -6,6 +6,59 @@ --- +## Four usage modes + +The MPHF + evidence infrastructure is fixed for all modes. The **payload** — data associated with each slot — is orthogonal and varies by mode. + +| Mode | Description | Payload | +|---|---|---| +| 1. Set | membership test only | none | +| 2. Set with count | occurrences per kmer per sample | compact integer vector | +| 3. Presence/absence matrix | which genomes contain each kmer | bit matrix n_kmers × n_genomes | +| 4. Count matrix | occurrences per kmer per genome | integer matrix n_kmers × n_genomes | + +Mode 4 is architecturally identical to mode 3 but with counts instead of bits; the main open question is scale — a naive dense representation reaches hundreds of GB for large genome collections, which may require a sparse encoding. + +### Payload for mode 2: compact count vector + +Counts follow a roughly geometric distribution: the large majority are below 128, almost all below 16 000, with rare large values in highly covered regions. Options for a mmap-compatible random-access count vector: + +- **`Vec`** — 2 bytes/slot, covers 0–65 535, O(1) access, trivially mmap-able. Practical upper bound sufficient for most metagenomic use cases. +- **Bit-packed fixed width** — choose B = ⌈log₂(max\_count)⌉ globally (e.g. B=14 for 99.9% coverage at 1.75 bytes/slot). O(1) access via bit-shift arithmetic. +- **Block-varint (PForDelta, StreamVByte)** — good compression but random access requires a separate offset index; no mature Rust crate for mmap use. + +Decision not yet made. `Vec` is the default baseline pending profiling. + +### Payload for mode 3: presence/absence matrix + +Column-major bit matrix: column j (genome j) is a contiguous `n_slots`-bit array. This layout makes per-genome operations (union, intersection, diff) cache-friendly. For 10⁹ slots × 100 genomes ≈ 12.5 GB — large but mmap-able with no resident-memory cost at open time. + +--- + +## Payload architecture + +The payload is orthogonal to the MPHF + evidence layer. `Layer` will be parameterised by a payload type `D`: + +```rust +struct Layer { + mphf: Mphf, + evidence: Evidence, + unitigs: UnitigFileReader, + data: D, +} +``` + +Where `D` implements a `LayerData` trait covering open/close/get. Concrete instances: + +- `()` — mode 1 +- `CountVec` — mode 2 (u16 or bit-packed) +- `BitMatrix` — mode 3 +- `CountMatrix` — mode 4 + +This parameterisation is not yet implemented; the current code uses a fixed `Counts` field. + +--- + ## Three-level hierarchy ``` @@ -15,14 +68,14 @@ index_root/ ← LayeredMap (collection) layer_0/ ← Layer mphf.bin unitigs.bin + unitigs.bin.idx evidence.bin - counts.bin - presence.bin + counts.bin [mode 2 only] + presence.bin [mode 3/4 only] layer_1/ ... part_00001/ layer_0/ - layer_1/ ... ``` @@ -38,19 +91,15 @@ index_root/ ← LayeredMap (collection) ``` layer_N/ - mphf.bin — ptr_hash MPHF (exact key count, phase-2 construction) - unitigs.bin — packed 2-bit nucleotide sequences (concatenated, variable-length) - unitig_offsets.bin — u32 per unitig: nucleotide offset of unitig j in unitigs.bin + mphf.bin — ptr_hash MPHF (epserde, ptr_hash native format) + unitigs.bin — packed 2-bit nucleotide sequences (obiskio binary format) + unitigs.bin.idx — UIDX index: n_unitigs, n_kmers, seqls[], packed_offsets[] evidence.bin — u32 per MPHF slot: (unitig_id: 25 | rank: 7) - counts.bin — u32 per MPHF slot (total kmer occurrences) - presence.bin — bit matrix: n_slots × n_samples [optional] + counts.bin — [optional] u16 per MPHF slot (mode 2) + presence.bin — [optional] bit matrix n_slots × n_samples (mode 3/4) ``` -Unitigs have variable lengths. Each record in `unitigs.bin` is self-delimiting: it begins with a varint `seql` (sequence length in nucleotides) followed by `(seql+3)/4` packed bytes — the streaming format defined in `obiskio`. Sequential scan is always possible using the in-record `seql`. - -For O(1) random access, `unitig_offsets.bin` is a **precomputed derived index**: a u32 array of byte offsets into `unitigs.bin`, with n_unitigs + 1 entries (sentinel = total byte size). Built once at construction by a single sequential scan; reconstructible from `unitigs.bin` if lost. Access: `unitigs.bin[offsets[j] .. offsets[j+1]]`. - -All files except `mphf.bin` are flat arrays of fixed-size elements, serialised with **rkyv** for zero-copy mmap access. `mphf.bin` uses ptr_hash's native serialisation; rkyv integration is deferred (see open questions). +`unitigs.bin` is the packed-2-bit sequence file produced by `obiskio::UnitigFileWriter`. The companion `.idx` file stores: magic `UIDX`, `n_unitigs: u32`, `n_kmers: u64`, `seqls: [u8; n_unitigs]` (kmer count − 1 per chunk), and `packed_offsets: [u32; n_unitigs + 1]` (byte offsets into `unitigs.bin`, sentinel-terminated). This gives O(1) random access to any unitig and the total kmer count without scanning the sequence file. ### Evidence encoding @@ -68,9 +117,29 @@ kmer = unitigs[unitig_id][rank .. rank + k] // 2-bit packed slice `rank` is the kmer's 0-based index within the unitig (kmer units, not nucleotides). For k=31, m=11, the structural maximum is k − m + 1 = 21 kmers per unitig; the empirical maximum observed is ~46 kmers. A `u7` (0–127) is sufficient. -### Presence/absence matrix +--- -Column-major bit matrix: column j (sample j) is a contiguous `n_slots`-bit array. This layout makes per-sample operations (union, intersection, diff over a column) cache-friendly. For large matrices (e.g. 10 M slots × 1 000 samples ≈ 1.25 GB per partition), rkyv + mmap avoids loading the full matrix at open time. +## ptr_hash configuration + +The MPHF per layer is configured as: + +```rust +type Mphf = PtrHash< + u64, // key type: canonical kmer raw encoding + CubicEps, // bucket fn: balanced (2.4 bits/key, λ=3.5) + CachelineEfVec>, // remap: 11.6 bits/entry vs 32 for Vec + Xx64, // hasher: XXH3-64 with seed, handles structured keys + Vec, // pilots +>; +``` + +**Hasher choice — `Xx64`:** k-mer raw values are left-aligned u64 with structural zeros in low bits (42 zeros for k=11, 2 zeros for k=31). `FxHash` (single multiply) distributes these poorly. `Xx64` (XXH3 64-bit, seeded) handles structured input correctly. + +**Bucket function — `CubicEps` with `PtrHashParams::::default()`:** λ=3.5, α=0.99. Balanced tradeoff: 2× slower construction than `Linear/λ=3.0` (the `default_fast` preset), 20% less space. `default_compact` (λ=4.0) saves a further 12.5% at 2× more construction time and reduced reliability — not chosen. + +**Remap — `CachelineEfVec`:** Elias-Fano variant packing 44 sorted 40-bit values per 64-byte cacheline (11.6 bits/value vs 32 for `Vec`). Already a transitive dependency of `ptr_hash`. One cacheline per query vs one u32 read; space win dominates for billion-scale key sets. + +**Construction:** `new_from_par_iter` avoids materialising all keys as `Vec`. MPHF correctness is verified inline during the second pass (evidence/counts fill) using an n/8-byte bitset; any duplicate slot or out-of-bounds index returns `OLMError::Mphf`. --- @@ -79,14 +148,14 @@ Column-major bit matrix: column j (sample j) is a contiguous `n_slots`-bit array A kmer query routes through all three levels: 1. **Partition routing**: hash canonical minimiser of the query kmer → partition index → open `part_XXXXX/`. -2. **Layer probing**: iterate layers in order within the partition; for each layer compute `slot = mphf.query(kmer)`, then verify `evidence.decode(slot) == kmer`. First match wins. -3. **Data access**: read `counts[slot]` and/or `presence[slot]` from the matching layer. +2. **Layer probing**: iterate layers in order within the partition; for each layer compute `slot = mphf.index(kmer)`, then verify `evidence.decode(slot) == kmer`. First match wins. +3. **Data access**: read payload at `slot` from the matching layer (counts, presence row, etc.). ``` fn query(kmer) → Option: part = partition_of(kmer) for (i, layer) in part.layers.iter().enumerate(): - slot = layer.mphf.query(kmer) + slot = layer.mphf.index(&kmer.raw()) if layer.evidence.decode(slot) == kmer: return Some(Hit { layer: i, slot }) return None @@ -102,7 +171,7 @@ When adding dataset B to an existing index: 1. For each partition, iterate kmers of B routed to that partition. 2. Probe existing layers; collect kmers absent from all layers → `B \ index`. -3. Build a new layer from `B \ index` using the two-phase pipeline (FMPHGO provisional → ptr_hash definitive). +3. Build a new layer from `B \ index`. 4. Append the new layer directory under each `part_XXXXX/`. 5. Update `meta.json` (layer count, sample registry). @@ -110,59 +179,25 @@ Each partition's new layer is built independently; the operation is fully parall --- -## Core API (sketch) - -```rust -// Open an existing index -let map = LayeredMap::open(path)?; - -// Query a canonical kmer across all partitions and layers -match map.query(kmer) { - Some(hit) => { - let count = hit.count(); - let present = hit.presence_row(); // bit slice over samples - } - None => { /* absent */ } -} - -// Non-destructive extension with a new dataset -// unitigs produced by the two-phase pipeline, one per partition -let layer_idx = map.add_layer(unitigs_dir, counts_dir, presence_path)?; -``` - ---- - ## Dependencies | crate | role | |---|---| -| `ptr_hash` | phase-2 MPHF per layer | -| `ph` (FMPHGO) | phase-1 provisional MPHF during layer construction | -| `rkyv` | zero-copy serialisation of flat arrays (evidence, counts, presence) | +| `ptr_hash 1.1` | MPHF per layer (epserde serialisation) | +| `cacheline-ef 1.1` | compact remap storage inside ptr_hash | +| `epserde 0.8` | zero-copy serialisation of MPHF | | `memmap2` | mmap of layer files | -| `bitm` | bit-packed presence matrix | +| `obiskio` | unitig file writer/reader | ---- - -## Serialisation strategy - -All flat arrays use `rkyv::Archive`: - -```rust -#[derive(Archive, Serialize, Deserialize)] -struct Evidence { slots: Vec } // packed (unitig_id: 25 | rank: 7) - -#[derive(Archive, Serialize, Deserialize)] -struct Counts { data: Vec } -``` - -At open time, each file is mmapped and cast to its archived type — no allocation, no copy. The MPHF is loaded via ptr_hash's own API; a rkyv wrapper is a future refinement. +Flat arrays (evidence, counts, presence) use a simple custom binary format (raw bytes, fixed element size) opened via mmap — no additional serialisation crate required. --- ## Open questions -- **ptr_hash + rkyv**: ptr_hash's internals are flat bit arrays; a rkyv-compatible wrapper is structurally feasible. Assess upstream willingness or implement a thin newtype wrapper. -- **Presence matrix layout**: column-major favours per-sample operations; row-major favours per-kmer queries. Decide based on dominant access pattern. +- **Mode 4 scale**: count matrix (n_kmers × n_genomes × bytes_per_count) reaches hundreds of GB for large collections. A sparse representation (store only non-zero entries) may be required; the access pattern and density threshold are not yet defined. +- **Count vector encoding (mode 2)**: `Vec` vs bit-packed (B=14). Decision pending; depends on whether counts > 65 535 occur in practice for the target datasets. +- **Presence matrix layout**: column-major favours per-genome operations; row-major favours per-kmer queries. Decide based on dominant access pattern. - **Layer merge**: merging two `LayeredMap` instances into a single-layer index requires full rebuild. Define API and cost model; maintenance operation, not query-path. - **Canonical kmer orientation**: evidence stores canonical kmer; strand recovery requires one 64-bit revcomp comparison at query time. +- **`try_new_from_par_iter`**: `ptr_hash::new_from_par_iter` silently discards construction failure. Post-construction verification (current workaround) is correct but does not allow retry. A `try_new_from_par_iter` PR upstream would close this gap. diff --git a/docmd/implementation/persistent_compact_int_vec.md b/docmd/implementation/persistent_compact_int_vec.md new file mode 100644 index 0000000..ba3b4b6 --- /dev/null +++ b/docmd/implementation/persistent_compact_int_vec.md @@ -0,0 +1,186 @@ +# PersistentCompactIntVec + +## Purpose + +`PersistentCompactIntVec` stores a dense array of non-negative integers indexed by MPHF slot where the vast majority of values are small (0–254) and large values are rare. It is designed for mmap-compatible random access with minimal memory footprint and optimal cache behaviour. + +Motivation from observed count distributions in genomics data: 99.9% of k-mer counts fit in a u8; overflow (count ≥ 255) affects ~0.07% of distinct k-mers but can reach values above 10⁶ (chloroplast, ribosomal repeats). + +--- + +## Design + +Two-tier structure: + +1. **Primary array** — `[u8; n]`, mmap'd as a flat file. Values 0–254 are stored directly. Value **255 is a sentinel** meaning "look in overflow". +2. **Overflow structure** — sorted list of `(slot: u32, value: u32)` pairs for all slots where the true value ≥ 255, with a **sparse L1-fitting index** for fast lookup. + +``` +primary[slot] < 255 → return primary[slot] +primary[slot] == 255 → binary search in overflow +``` + +--- + +## Lifecycle + +The structure has two distinct runtime roles with different APIs. + +### Builder (`PersistentCompactIntVecBuilder`) + +Used during layer construction. Holds the primary array and overflow map in memory; supports arbitrary reads and writes before finalisation. + +```rust +struct PersistentCompactIntVecBuilder { + primary: Vec, // in memory; written to disk at close() + overflow: HashMap, // O(1) get/set for values ≥ 255 +} +``` + +**Phase 1 — `new(n: usize)`** +Allocates `primary` of length `n` initialised to 0. `overflow` is empty. + +**Phase 2 — fill (repeated `set` / `get`)** + +```rust +fn set(&mut self, slot: u64, value: u32) { + if value < 255 { + self.primary[slot] = value as u8; + self.overflow.remove(&slot); // in case of downward mutation + } else { + self.primary[slot] = 255; // sentinel + self.overflow.insert(slot, value); + } +} + +fn get(&self, slot: u64) -> u32 { + match self.primary[slot] { + 255 => *self.overflow.get(&slot).unwrap(), + v => v as u32, + } +} +``` + +Reads and mutations are both O(1). Overflow entries can be created, updated, or removed freely during this phase. + +**Phase 3 — `close(primary_path, overflow_path)`** + +1. Write `primary` as raw bytes to `counts_primary.bin`. +2. Collect `overflow` into `Vec<(u32, u32)>`, sort by slot. +3. Compute `step` from `n_overflow` (see below). +4. Build sparse index. +5. Write `counts_overflow.bin`. +6. Drop all in-memory state. + +The `HashMap` is the only extra allocation: bounded by `n_overflow × (8 + 4 + overhead)` bytes, typically a few MB in practice. + +--- + +### Reader (`PersistentCompactIntVec`) + +Used at query time. Both files are mmap'd; the sparse index is loaded into a `Vec` at open time (≤ 32 KB, L1-resident). + +```rust +struct PersistentCompactIntVec { + primary: Mmap, // mmap of counts_primary.bin + index: Vec<(u32, u32)>, // sparse index, loaded into RAM at open + data: Mmap, // mmap of overflow data region + n_overflow: u32, + step: u32, +} +``` + +**`open(primary_path, overflow_path)`** +Mmaps both files. Parses the overflow file header; copies the sparse index into a `Vec` (tiny, warm in cache). The data region stays mmap'd. + +**`get(slot: u64) -> u32`** — see Lookup section. + +--- + +## Overflow file format + +``` +magic: [u8; 4] = b"PCIV" +n_overflow: u32 +step: u32 (0 if n_overflow ≤ L1_entries → no sparse index) +[if step > 0] + n_index: u32 = ⌈n_overflow / step⌉ + index: [(slot: u32, pos: u32); n_index] ← loaded into RAM at open +data: [(slot: u32, value: u32); n_overflow] sorted by slot, mmap'd +``` + +`index[i]` stores the slot value and data-array position of the `i × step`-th overflow entry. + +--- + +## Step computation + +The step is chosen at `close()` time, once `n_overflow` is known: + +``` +L1_SIZE = 32 * 1024 // 32 KB conservative target +INDEX_ENTRY = 8 // bytes: (u32, u32) +L1_entries = L1_SIZE / INDEX_ENTRY = 4096 + +if n_overflow ≤ L1_entries: + step = 0 // no sparse index; data itself fits in a few cache lines +else: + step = ⌈n_overflow / L1_entries⌉ +``` + +For the Betula nana reference (359 044 overflows): step = 88, index = 4 080 entries = 31.9 KB. + +--- + +## Lookup + +``` +fn get(slot: u64) -> u32: + if primary[slot] < 255: + return primary[slot] as u32 + + if step == 0: + return binary_search(data[0..n_overflow], slot) + + // 1. binary search in index (Vec, L1-resident) + i = upper_bound(index[..].slot, slot) - 1 + pos_start = index[i].pos + pos_end = if i+1 < n_index { index[i+1].pos } else { n_overflow } + + // 2. binary search in contiguous block (mmap'd) + return binary_search(data[pos_start..pos_end], slot) +``` + +Cache behaviour: step 1 is entirely within the L1-resident `Vec<(u32,u32)>`; step 2 loads a contiguous block of ≤ `step × 8` bytes from the mmap. + +--- + +## Files + +``` +layer_N/ + counts_primary.bin — [u8; n_slots], raw bytes + counts_overflow.bin — PCIV header + sparse index + sorted data + (absent if n_overflow == 0) +``` + +If `counts_overflow.bin` is absent, no slot has value ≥ 255; all reads go directly to the primary array. + +--- + +## Complexity + +| Operation | Time | Notes | +|---|---|---| +| `set` / `get` (builder) | O(1) | HashMap for overflow | +| `get` (no overflow) | O(1) | single byte read | +| `get` (overflow, with index) | O(log step) | ~2 memory regions | +| `get` (overflow, no index) | O(log n_overflow) | data fits in a few cache lines | +| `close` | O(n_overflow log n_overflow) | sort + index build | +| `open` | O(n_index) | index copy into Vec | + +--- + +## Generalisation + +The sentinel (255) and primary type (u8) are fixed. The overflow value type is u32, sufficient for any realistic k-mer count. For a count matrix (mode 4), one `PersistentCompactIntVec` per genome column shares the primary array layout. diff --git a/mkdocs.yml b/mkdocs.yml index 6c2cfa4..c4aac85 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -45,6 +45,7 @@ nav: - MPHF selection: implementation/mphf.md - Unitig evidence encoding: implementation/unitig_evidence.md - obilayeredmap crate: implementation/obilayeredmap.md + - PersistentCompactIntVec: implementation/persistent_compact_int_vec.md - Architecture: - Sequences: architecture/sequences/invariant.md diff --git a/src/Cargo.lock b/src/Cargo.lock index 6670645..a26afbd 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1655,6 +1655,14 @@ dependencies = [ "autocfg", ] +[[package]] +name = "obicompactvec" +version = "0.1.0" +dependencies = [ + "memmap2", + "tempfile", +] + [[package]] name = "obidebruinj" version = "0.1.0" @@ -1741,6 +1749,7 @@ dependencies = [ name = "obilayeredmap" version = "0.1.0" dependencies = [ + "cacheline-ef", "epserde 0.8.0", "memmap2", "obikseq", diff --git a/src/Cargo.toml b/src/Cargo.toml index 997fbfe..9f3fc56 100644 --- a/src/Cargo.toml +++ b/src/Cargo.toml @@ -1,5 +1,5 @@ [workspace] resolver = "3" -members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio","obidebruinj","obilayeredmap"] +members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio","obidebruinj","obilayeredmap", "obicompactvec"] [profile.release] debug = 1 diff --git a/src/obicompactvec/Cargo.toml b/src/obicompactvec/Cargo.toml new file mode 100644 index 0000000..1d65aa9 --- /dev/null +++ b/src/obicompactvec/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "obicompactvec" +version = "0.1.0" +edition = "2024" + +[dependencies] +memmap2 = "0.9" + +[dev-dependencies] +tempfile = "3" diff --git a/src/obicompactvec/src/builder.rs b/src/obicompactvec/src/builder.rs new file mode 100644 index 0000000..ed3a1a8 --- /dev/null +++ b/src/obicompactvec/src/builder.rs @@ -0,0 +1,64 @@ +use std::collections::HashMap; +use std::fs::File; +use std::io::{self, BufWriter, Write as _}; +use std::path::Path; + +use crate::format::write_overflow; + +pub struct PersistentCompactIntVecBuilder { + primary: Vec, + overflow: HashMap, +} + +impl PersistentCompactIntVecBuilder { + pub fn new(n: usize) -> Self { + Self { + primary: vec![0u8; n], + overflow: HashMap::new(), + } + } + + pub fn set(&mut self, slot: u64, value: u32) { + if value < 255 { + self.primary[slot as usize] = value as u8; + self.overflow.remove(&slot); + } else { + self.primary[slot as usize] = 255; + self.overflow.insert(slot, value); + } + } + + pub fn get(&self, slot: u64) -> u32 { + match self.primary[slot as usize] { + 255 => *self.overflow.get(&slot).expect("sentinel without overflow entry"), + v => v as u32, + } + } + + pub fn len(&self) -> usize { + self.primary.len() + } + + /// Write `counts_primary.bin` and (if needed) `counts_overflow.bin`, then drop all state. + pub fn close(self, primary_path: &Path, overflow_path: &Path) -> io::Result<()> { + // Write primary array. + let mut w = BufWriter::new(File::create(primary_path)?); + w.write_all(&self.primary)?; + w.flush()?; + drop(w); + + if self.overflow.is_empty() { + return Ok(()); + } + + // Sort overflow entries by slot. + let mut entries: Vec<(u32, u32)> = self + .overflow + .into_iter() + .map(|(slot, value)| (slot as u32, value)) + .collect(); + entries.sort_unstable_by_key(|&(slot, _)| slot); + + write_overflow(overflow_path, &entries) + } +} diff --git a/src/obicompactvec/src/format.rs b/src/obicompactvec/src/format.rs new file mode 100644 index 0000000..ec64bcb --- /dev/null +++ b/src/obicompactvec/src/format.rs @@ -0,0 +1,81 @@ +use std::fs::File; +use std::io::{self, BufWriter, Write as _}; +use std::path::Path; + +pub const MAGIC: [u8; 4] = *b"PCIV"; + +// L1 cache target: 32 KB / 8 bytes per index entry +pub const L1_INDEX_ENTRIES: usize = 4096; + +/// Write the overflow file from a sorted list of (slot, value) pairs. +pub fn write_overflow( + path: &Path, + entries: &[(u32, u32)], +) -> io::Result<()> { + let n_overflow = entries.len() as u32; + + let step: u32 = if entries.len() <= L1_INDEX_ENTRIES { + 0 + } else { + entries.len().div_ceil(L1_INDEX_ENTRIES) as u32 + }; + + let mut w = BufWriter::new(File::create(path)?); + + w.write_all(&MAGIC)?; + w.write_all(&n_overflow.to_le_bytes())?; + w.write_all(&step.to_le_bytes())?; + + if step > 0 { + let n_index = entries.len().div_ceil(step as usize) as u32; + w.write_all(&n_index.to_le_bytes())?; + for (pos, chunk) in entries.chunks(step as usize).enumerate() { + let slot = chunk[0].0; + w.write_all(&slot.to_le_bytes())?; + w.write_all(&((pos * step as usize) as u32).to_le_bytes())?; + } + } + + for &(slot, value) in entries { + w.write_all(&slot.to_le_bytes())?; + w.write_all(&value.to_le_bytes())?; + } + + w.flush() +} + +/// Parse the header and sparse index from the overflow file bytes. +/// Returns (n_overflow, step, index_entries, data_byte_offset). +pub fn parse_overflow_header( + data: &[u8], +) -> io::Result<(u32, u32, Vec<(u32, u32)>, usize)> { + let mut pos = 0; + + let magic = read4(data, &mut pos)?; + if magic != MAGIC { + return Err(io::Error::new(io::ErrorKind::InvalidData, "bad PCIV magic")); + } + + let n_overflow = u32::from_le_bytes(read4(data, &mut pos)?); + let step = u32::from_le_bytes(read4(data, &mut pos)?); + + let mut index = Vec::new(); + if step > 0 { + let n_index = u32::from_le_bytes(read4(data, &mut pos)?) as usize; + index.reserve(n_index); + for _ in 0..n_index { + let slot = u32::from_le_bytes(read4(data, &mut pos)?); + let ipos = u32::from_le_bytes(read4(data, &mut pos)?); + index.push((slot, ipos)); + } + } + + Ok((n_overflow, step, index, pos)) +} + +fn read4(data: &[u8], pos: &mut usize) -> io::Result<[u8; 4]> { + data.get(*pos..*pos + 4) + .and_then(|s| s.try_into().ok()) + .map(|arr| { *pos += 4; arr }) + .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "truncated PCIV file")) +} diff --git a/src/obicompactvec/src/lib.rs b/src/obicompactvec/src/lib.rs new file mode 100644 index 0000000..e91eaa6 --- /dev/null +++ b/src/obicompactvec/src/lib.rs @@ -0,0 +1,10 @@ +mod builder; +mod format; +mod reader; + +pub use builder::PersistentCompactIntVecBuilder; +pub use reader::PersistentCompactIntVec; + +#[cfg(test)] +#[path = "tests/mod.rs"] +mod tests; diff --git a/src/obicompactvec/src/reader.rs b/src/obicompactvec/src/reader.rs new file mode 100644 index 0000000..02a6e94 --- /dev/null +++ b/src/obicompactvec/src/reader.rs @@ -0,0 +1,93 @@ +use std::fs::File; +use std::io; +use std::path::Path; + +use memmap2::Mmap; + +use crate::format::parse_overflow_header; + +pub struct PersistentCompactIntVec { + primary: Mmap, + index: Vec<(u32, u32)>, // (slot, pos) — L1-resident sparse index + data: Option, // mmap of overflow data region + n_overflow: u32, + pub step: u32, + data_offset: usize, // byte offset of data region within overflow mmap +} + +impl PersistentCompactIntVec { + /// Open a previously written `PersistentCompactIntVec`. + /// + /// `overflow_path` is optional: pass `None` if no overflow file was written + /// (i.e. all values were < 255). + pub fn open(primary_path: &Path, overflow_path: Option<&Path>) -> io::Result { + let primary = unsafe { Mmap::map(&File::open(primary_path)?)? }; + + let (data, n_overflow, step, index, data_offset) = match overflow_path { + None => (None, 0, 0, Vec::new(), 0), + Some(p) => { + let mmap = unsafe { Mmap::map(&File::open(p)?)? }; + let (n_overflow, step, index, data_offset) = + parse_overflow_header(&mmap)?; + (Some(mmap), n_overflow, step, index, data_offset) + } + }; + + Ok(Self { primary, index, data, n_overflow, step, data_offset }) + } + + pub fn len(&self) -> usize { + self.primary.len() + } + + pub fn is_empty(&self) -> bool { + self.primary.is_empty() + } + + pub fn get(&self, slot: u64) -> u32 { + match self.primary[slot as usize] { + 255 => self.overflow_get(slot as u32), + v => v as u32, + } + } + + fn overflow_get(&self, slot: u32) -> u32 { + let data = self.data.as_ref().expect("sentinel without overflow file"); + let raw = &data[self.data_offset..]; + + let pos_start; + let pos_end; + + if self.step == 0 { + pos_start = 0; + pos_end = self.n_overflow as usize; + } else { + // Binary search in the L1-resident sparse index. + let i = self.index.partition_point(|&(s, _)| s <= slot).saturating_sub(1); + pos_start = self.index[i].1 as usize; + pos_end = if i + 1 < self.index.len() { + self.index[i + 1].1 as usize + } else { + self.n_overflow as usize + }; + } + + // Binary search within the block. + let block = Self::data_slice(raw, pos_start, pos_end); + match block.binary_search_by_key(&slot, |&(s, _)| s) { + Ok(i) => block[i].1, + Err(_) => panic!("slot {slot} marked as overflow but not found in data"), + } + } + + /// Interpret a byte slice as a slice of (u32, u32) pairs without copying. + fn data_slice(raw: &[u8], from: usize, to: usize) -> &[(u32, u32)] { + let byte_start = from * 8; + let byte_end = to * 8; + let bytes = &raw[byte_start..byte_end]; + // Safety: the file was written as LE u32 pairs; alignment is guaranteed + // because we read from a byte slice and cast explicitly. + let ptr = bytes.as_ptr() as *const (u32, u32); + unsafe { std::slice::from_raw_parts(ptr, to - from) } + } +} diff --git a/src/obicompactvec/src/tests/mod.rs b/src/obicompactvec/src/tests/mod.rs new file mode 100644 index 0000000..581c809 --- /dev/null +++ b/src/obicompactvec/src/tests/mod.rs @@ -0,0 +1,119 @@ +use tempfile::tempdir; + +use crate::{PersistentCompactIntVec, PersistentCompactIntVecBuilder}; + +fn primary(dir: &std::path::Path) -> std::path::PathBuf { dir.join("primary.bin") } +fn overflow(dir: &std::path::Path) -> std::path::PathBuf { dir.join("overflow.bin") } + +fn roundtrip(values: &[(u64, u32)], n: usize) -> Vec { + let dir = tempdir().unwrap(); + let mut b = PersistentCompactIntVecBuilder::new(n); + for &(slot, v) in values { + b.set(slot, v); + } + let ov = overflow(dir.path()); + b.close(&primary(dir.path()), &ov).unwrap(); + + let ov_path = ov.exists().then_some(ov.as_path()); + let r = PersistentCompactIntVec::open(&primary(dir.path()), ov_path).unwrap(); + (0..n as u64).map(|s| r.get(s)).collect() +} + +#[test] +fn all_zero_by_default() { + let got = roundtrip(&[], 8); + assert!(got.iter().all(|&v| v == 0)); +} + +#[test] +fn small_values_no_overflow_file() { + let dir = tempdir().unwrap(); + let mut b = PersistentCompactIntVecBuilder::new(4); + b.set(0, 1); + b.set(1, 254); + b.set(2, 0); + b.set(3, 100); + let ov = overflow(dir.path()); + b.close(&primary(dir.path()), &ov).unwrap(); + assert!(!ov.exists(), "no overflow file expected"); + let r = PersistentCompactIntVec::open(&primary(dir.path()), None).unwrap(); + assert_eq!(r.get(0), 1); + assert_eq!(r.get(1), 254); + assert_eq!(r.get(2), 0); + assert_eq!(r.get(3), 100); +} + +#[test] +fn overflow_values_roundtrip() { + let values = [(0, 255), (1, 1000), (2, 50), (3, 1_313_691)]; + let got = roundtrip(&values, 4); + assert_eq!(got[0], 255); + assert_eq!(got[1], 1000); + assert_eq!(got[2], 50); + assert_eq!(got[3], 1_313_691); +} + +#[test] +fn mutation_downward_removes_from_overflow() { + let dir = tempdir().unwrap(); + let mut b = PersistentCompactIntVecBuilder::new(2); + b.set(0, 1000); // goes to overflow + b.set(0, 42); // comes back below threshold + let ov = overflow(dir.path()); + b.close(&primary(dir.path()), &ov).unwrap(); + assert!(!ov.exists(), "no overflow file expected after downward mutation"); + let r = PersistentCompactIntVec::open(&primary(dir.path()), None).unwrap(); + assert_eq!(r.get(0), 42); +} + +#[test] +fn mutation_upward_updates_overflow() { + let dir = tempdir().unwrap(); + let mut b = PersistentCompactIntVecBuilder::new(1); + b.set(0, 300); + b.set(0, 500); + let ov = overflow(dir.path()); + b.close(&primary(dir.path()), &ov).unwrap(); + let r = PersistentCompactIntVec::open(&primary(dir.path()), Some(&ov)).unwrap(); + assert_eq!(r.get(0), 500); +} + +#[test] +fn sparse_index_built_for_many_overflows() { + // Generate more than L1_INDEX_ENTRIES (4096) overflow entries to trigger the sparse index. + let n = 5000usize; + let dir = tempdir().unwrap(); + let mut b = PersistentCompactIntVecBuilder::new(n); + for i in 0..n { + b.set(i as u64, 1000 + i as u32); // all ≥ 255 + } + let ov = overflow(dir.path()); + b.close(&primary(dir.path()), &ov).unwrap(); + assert!(ov.exists()); + + let r = PersistentCompactIntVec::open(&primary(dir.path()), Some(&ov)).unwrap(); + assert!(r.step > 0, "sparse index should have been built"); + for i in 0..n { + assert_eq!(r.get(i as u64), 1000 + i as u32, "mismatch at slot {i}"); + } +} + +#[test] +fn mixed_large_dataset() { + // Mirrors realistic distribution: most values small, sparse overflows. + let n = 1000usize; + let dir = tempdir().unwrap(); + let mut b = PersistentCompactIntVecBuilder::new(n); + for i in 0..n { + let v = if i % 100 == 0 { 100_000 + i as u32 } else { i as u32 % 200 }; + b.set(i as u64, v); + } + let ov = overflow(dir.path()); + b.close(&primary(dir.path()), &ov).unwrap(); + + let r = PersistentCompactIntVec::open(&primary(dir.path()), ov.exists().then_some(ov.as_path())).unwrap(); + for i in 0..n { + let expected = if i % 100 == 0 { 100_000 + i as u32 } else { i as u32 % 200 }; + assert_eq!(r.get(i as u64), expected, "slot {i}"); + } +} diff --git a/src/obilayeredmap/Cargo.toml b/src/obilayeredmap/Cargo.toml index 137bb92..95d16a0 100644 --- a/src/obilayeredmap/Cargo.toml +++ b/src/obilayeredmap/Cargo.toml @@ -4,14 +4,15 @@ version = "0.1.0" edition = "2024" [dependencies] -obikseq = { path = "../obikseq" } -obiskio = { path = "../obiskio" } -ptr_hash = "1.1" -epserde = "0.8" -rayon = "1" -memmap2 = "0.9" -serde = { version = "1", features = ["derive"] } -serde_json = "1" +obikseq = { path = "../obikseq" } +obiskio = { path = "../obiskio" } +ptr_hash = "1.1" +cacheline-ef = "1.1" +epserde = "0.8" +rayon = "1" +memmap2 = "0.9" +serde = { version = "1", features = ["derive"] } +serde_json = "1" [dev-dependencies] tempfile = "3" diff --git a/src/obilayeredmap/src/layer.rs b/src/obilayeredmap/src/layer.rs index 507bb26..4c8c0ce 100644 --- a/src/obilayeredmap/src/layer.rs +++ b/src/obilayeredmap/src/layer.rs @@ -2,10 +2,11 @@ use std::collections::HashMap; use std::fs; use std::path::Path; +use cacheline_ef::{CachelineEf, CachelineEfVec}; use epserde::prelude::*; use obikseq::CanonicalKmer; use obiskio::{UnitigFileReader, UnitigFileWriter}; -use ptr_hash::{DefaultPtrHash, PtrHashParams}; +use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64}; use crate::counts::{Counts, CountsWriter}; use crate::error::{OLMError, OLMResult}; @@ -16,8 +17,10 @@ const UNITIGS_FILE: &str = "unitigs.bin"; const EVIDENCE_FILE: &str = "evidence.bin"; const COUNTS_FILE: &str = "counts.bin"; +type Mphf = PtrHash>, Xx64, Vec>; + pub struct Layer { - mphf: DefaultPtrHash, + mphf: Mphf, evidence: Evidence, unitigs: UnitigFileReader, counts: Counts, @@ -30,19 +33,14 @@ pub struct Hit { impl Layer { pub fn open(path: &Path) -> OLMResult { - let mphf: DefaultPtrHash = DefaultPtrHash::load_full(&path.join(MPHF_FILE)) + let mphf: Mphf = Mphf::load_full(&path.join(MPHF_FILE)) .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; let unitigs = UnitigFileReader::open(&path.join(UNITIGS_FILE))?; let evidence = Evidence::open(&path.join(EVIDENCE_FILE))?; let counts = Counts::open(&path.join(COUNTS_FILE))?; - Ok(Self { - mphf, - evidence, - unitigs, - counts, - }) + Ok(Self { mphf, evidence, unitigs, counts }) } pub fn query(&self, kmer: CanonicalKmer) -> Option { @@ -52,10 +50,7 @@ impl Layer { .unitigs .verify_canonical_kmer(chunk_id as usize, rank as usize, kmer) { - Some(Hit { - slot, - count: self.counts.get(slot), - }) + Some(Hit { slot, count: self.counts.get(slot) }) } else { None } @@ -74,28 +69,39 @@ impl Layer { if n == 0 { fs::File::create(out_dir.join(EVIDENCE_FILE))?; fs::File::create(out_dir.join(COUNTS_FILE))?; - let mphf: DefaultPtrHash = DefaultPtrHash::new(&[] as &[u64], PtrHashParams::default()); + let mphf: Mphf = Mphf::try_new(&[] as &[u64], PtrHashParams::::default()) + .ok_or_else(|| OLMError::Mphf("construction failed".into()))?; mphf.store(&out_dir.join(MPHF_FILE)) .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; return Ok(0); } - // Build MPHF from a cloneable parallel iterator — no Vec allocation. + // First pass: build the MPHF from a cloneable parallel iterator. // flat_map_iter: outer chunks in parallel, inner kmer sliding-window sequential. let keys = (0..unitigs.len()) .into_par_iter() .flat_map_iter(|ci| unitigs.unitig(ci).into_canonical_kmers().map(|km| km.raw())); - let mphf: DefaultPtrHash = - DefaultPtrHash::new_from_par_iter(n, keys, PtrHashParams::default()); + let mphf: Mphf = Mphf::new_from_par_iter(n, keys, PtrHashParams::::default()); mphf.store(&out_dir.join(MPHF_FILE)) .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; - // Second pass: fill evidence and counts - let mut ev = EvidenceWriter::new(n); - let mut cnt = CountsWriter::new(n); + // Second pass: fill evidence and counts; verify MPHF correctness inline. + // seen is a compact bitset (n/8 bytes) — no extra iteration needed. + let mut ev = EvidenceWriter::new(n); + let mut cnt = CountsWriter::new(n); + let mut seen = vec![0u8; (n + 7) / 8]; for (kmer, chunk_id, rank) in unitigs.iter_indexed_canonical_kmers() { let slot = mphf.index(&kmer.raw()); + if slot >= n { + return Err(OLMError::Mphf("MPHF construction failed: slot out of bounds".into())); + } + let byte = slot / 8; + let bit = 1u8 << (slot % 8); + if seen[byte] & bit != 0 { + return Err(OLMError::Mphf("MPHF construction failed: duplicate slot".into())); + } + seen[byte] |= bit; ev.set(slot, chunk_id as u32, rank as u8); cnt.set(slot, count_of(kmer)); }