diff --git a/docmd/architecture/index_architecture.md b/docmd/architecture/index_architecture.md new file mode 100644 index 0000000..102b92f --- /dev/null +++ b/docmd/architecture/index_architecture.md @@ -0,0 +1,179 @@ +# Kmer index architecture + +## Fundamental invariant + +A given canonical kmer belongs to **exactly one partition** and **exactly one layer** within that partition. This is the property that makes all aggregation operations decomposable and parallelisable without coordination. + +--- + +## Three-level hierarchy + +``` +PartitionedIndex +├── LayeredPartition (one per minimiser bucket) +│ ├── MphfLayer 0 kmer → slot (immutable bijection) +│ │ ├── DataStore A slot → T (e.g. counts) +│ │ └── DataStore B slot → T (e.g. presence/absence, derived) +│ ├── MphfLayer 1 +│ │ └── DataStore A +│ └── ... +├── LayeredPartition +│ └── ... +``` + +**PartitionedIndex**: routes queries to partitions via canonical minimiser hash. Owns the partition count and routing scheme (fixed at creation). Dispatches aggregations across partitions in parallel. + +**LayeredPartition**: one directory per minimiser bucket. Holds a `Vec`. Each layer covers a disjoint kmer set — layer 0 is built from dataset A; layer 1 covers kmers in B absent from layer 0; and so on. Layers within a partition are always disjoint. + +**MphfLayer**: the MPHF + evidence + unitig spine. Maps `kmer → slot` for its disjoint kmer set. Immutable once built. Independent of any data attached to it. + +**DataStore**: a slot-indexed data array (e.g. `PersistentCompactIntMatrix`, `PersistentBitMatrix`). Attached to a `MphfLayer` externally. Multiple stores of different types can coexist on the same `MphfLayer`. + +--- + +## MphfLayer — autonomous mapping layer + +```rust +MphfLayer::find(kmer: CanonicalKmer) -> Option // slot, or None if absent +MphfLayer::n() -> usize // number of slots +MphfLayer::build(dir: &Path) -> OLMResult<(Self, usize)> // from unitigs.bin +MphfLayer::open(dir: &Path) -> OLMResult +``` + +`find` returns `Some(slot)` only if the kmer is actually in this layer (evidence check included). Returns `None` for kmers present in other layers or absent from the index. + +The MPHF (`mphf.bin`, `evidence.bin`, `unitigs.bin`) is built once and never rebuilt. All data derivation operations (count → presence, thresholding, merging) reuse the same `MphfLayer`. + +--- + +## DataStore — slot-indexed data + +```rust +trait DataStore { + type Item; + fn get(&self, slot: usize) -> Self::Item; + fn n(&self) -> usize; +} +``` + +Concrete types from `obicompactvec`: + +| Type | `Item` | Use | +|---|---|---| +| `PersistentCompactIntMatrix` | `Box<[u32]>` | count per sample per slot | +| `PersistentBitMatrix` | `Box<[bool]>` | presence per sample per slot | + +A `DataStore` knows nothing about kmers or MPHFs. It is indexed by `usize` slot only. The path to its on-disk files is managed by the `LayeredPartition`, not embedded in the store type. + +--- + +## Query model + +### Point query — `kmer → Option` + +``` +minimiser(kmer) → partition p +for each layer l in p: + slot = MphfLayer_l.find(kmer) + if slot is Some: + return DataStore_l.get(slot) +return None +``` + +O(n_layers) MPHF probes in the worst case; O(1) expected (kmer in layer 0). No cross-layer data fusion — the result comes from exactly one layer. + +### Sequence scan — `sequence → Vec<(kmer, Option)>` + +Decompose into canonical kmers, group by partition, dispatch to each partition in parallel. Within a partition, probe layers in order per kmer. Collect results. + +Parallelism: across partitions (independent). Within a partition: per-kmer probing is sequential across layers but different kmers are independent. + +### Aggregation — `→ Accumulator` + +For operations that traverse all kmers (distance, presence matrix, global counts): + +``` +result = reduce( + for p in partitions: // parallel + for l in layers(p): // parallel + partial(DataStore_p_l) +) +``` + +Each `(partition, layer)` contributes an independent `Partial`. Global result = `reduce(all partials)`. + +--- + +## Aggregator pattern + +```rust +trait Aggregator { + type Partial: Send; + type Result; + fn partial(&self, store: &D) -> Self::Partial; + fn reduce(&self, parts: impl Iterator) -> Self::Result; +} +``` + +Concrete aggregators: + +| Aggregator | `Partial` | `Result` | +|---|---|---| +| `BrayCurtis(i, j)` | `(sum_min, sum_a, sum_b): (u64, u64, u64)` | `f64` | +| `Jaccard(i, j)` | `(inter, union): (u64, u64)` | `f64` | +| `Hellinger(i, j)` | `(sum_sqrt_prod, sum_a, sum_b): (f64, f64, f64)` | `f64` | +| `DistanceMatrix(metric)` | `n×n partial matrix` | `n×n f64 matrix` | +| `PresenceQuery(kmer)` | — | routed to point query | + +The `partial` for `BrayCurtis(i, j)` on a `PersistentCompactIntMatrix` with columns i and j already exists as `PersistentCompactIntVec::partial_bray_dist` — it needs to be lifted to the column-pair level on the matrix. + +--- + +## Parallelism model + +| Level | Unit | Coordination | +|---|---|---| +| Across partitions | `LayeredPartition` | none — fully independent | +| Across layers (aggregation) | `(partition, layer)` pair | none — disjoint kmer sets | +| Within a layer (point query) | n/a — single layer per kmer | n/a | +| DataStore derivation | one `(partition, layer)` per task | none | + +The dispatch model: `PartitionedIndex::aggregate(aggregator)` fans out over partitions (rayon `par_iter`), each partition fans out over its layers, collects partials, then a top-level `reduce` combines. + +--- + +## DataStore derivation + +Because the `MphfLayer` is independent of its data stores, new stores can be derived from existing ones without rebuilding the MPHF: + +``` +// count → presence/absence, parallel across (partition, layer) +for (p, l) in all_partition_layer_pairs().par_iter(): + count_store = open PersistentCompactIntMatrix at (p, l) + presence_store = PersistentBitMatrix::from_count_matrix(count_store, threshold, dir) + attach presence_store to MphfLayer(p, l) +``` + +Other derivations: +- Threshold a count matrix → binary presence matrix +- Union two presence matrices (same MPHF, different samples) +- Merge two count matrices (saturating add, column-wise) + +All derivations are local to a `(partition, layer)` pair and fully parallelisable. + +--- + +## Relationship to current implementation + +The current `obilayeredmap` crate implements a subset of this architecture. Key divergences: + +- `Layer` fuses `MphfLayer` and one `DataStore` into a single generic type. Multiple data stores on the same MPHF are not supported. +- `LayerData::open(dir)` embeds the path convention (`counts/`, `presence/`) inside the store type, preventing the `PartitionedIndex` from managing paths externally. +- The `Aggregator` pattern is not yet implemented; partial distance methods exist on `PersistentCompactIntVec` but are not composed across layers and partitions. +- No `PartitionedIndex` type exists; `LayeredMap` is a single-partition structure. + +Planned refactoring: +1. Extract `MphfLayer` from `Layer` as an autonomous type. +2. Replace `LayerData` trait with `DataStore` trait (no path knowledge). +3. Implement `LayeredPartition` that holds `Vec` and attaches data stores externally. +4. Implement `PartitionedIndex` with parallel dispatch and the `Aggregator` pattern. diff --git a/docmd/implementation/obilayeredmap.md b/docmd/implementation/obilayeredmap.md index 5568608..fd4f191 100644 --- a/docmd/implementation/obilayeredmap.md +++ b/docmd/implementation/obilayeredmap.md @@ -246,6 +246,12 @@ Each partition's new layer is built independently; the operation is fully parall --- +## Relationship to target architecture + +The target architecture (see [Kmer index architecture](../architecture/index_architecture.md)) separates `MphfLayer` from data stores entirely and introduces a `PartitionedIndex` with parallel dispatch and an `Aggregator` pattern. The current implementation is a stepping stone: `obicompactvec` types are already fully decoupled from the MPHF; the remaining refactoring is within `obilayeredmap` itself. + +--- + ## Open questions - **Mode 4**: count matrix (n_kmers × n_genomes × bytes_per_count) is structurally identical to mode 3 but uses `PersistentCompactIntMatrix` with G columns. Build API not yet implemented. Scale concern: hundreds of GB for large collections — a sparse representation may be required at high genome counts. diff --git a/mkdocs.yml b/mkdocs.yml index 43fd737..af78836 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -49,6 +49,7 @@ nav: - PersistentBitVec: implementation/persistent_bit_vec.md - Architecture: - Sequences: architecture/sequences/invariant.md + - Kmer index: architecture/index_architecture.md watch: - docmd diff --git a/src/obicompactvec/src/bitvec.rs b/src/obicompactvec/src/bitvec.rs index 4afdc5f..cfc26aa 100644 --- a/src/obicompactvec/src/bitvec.rs +++ b/src/obicompactvec/src/bitvec.rs @@ -14,16 +14,20 @@ const MAGIC: [u8; 4] = *b"PBIV"; const HEADER_SIZE: usize = 16; #[inline] -fn n_words(n: usize) -> usize { n.div_ceil(64) } +fn n_words(n: usize) -> usize { + n.div_ceil(64) +} #[inline] -fn n_bytes_for_words(n: usize) -> usize { n_words(n) * 8 } +fn n_bytes_for_words(n: usize) -> usize { + n_words(n) * 8 +} // ── Reader ──────────────────────────────────────────────────────────────────── pub struct PersistentBitVec { mmap: Mmap, - n: usize, + n: usize, path: PathBuf, } @@ -31,18 +35,31 @@ impl PersistentBitVec { pub fn open(path: &Path) -> io::Result { let mmap = unsafe { Mmap::map(&File::open(path)?)? }; if mmap.len() < HEADER_SIZE { - return Err(io::Error::new(io::ErrorKind::InvalidData, "PBIV file too short")); + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "PBIV file too short", + )); } if &mmap[0..4] != &MAGIC { return Err(io::Error::new(io::ErrorKind::InvalidData, "bad PBIV magic")); } let n = u64::from_le_bytes(mmap[8..16].try_into().unwrap()) as usize; - Ok(Self { mmap, n, path: path.to_path_buf() }) + Ok(Self { + mmap, + n, + path: path.to_path_buf(), + }) } - pub fn path(&self) -> &Path { &self.path } - pub fn len(&self) -> usize { self.n } - pub fn is_empty(&self) -> bool { self.n == 0 } + pub fn path(&self) -> &Path { + &self.path + } + pub fn len(&self) -> usize { + self.n + } + pub fn is_empty(&self) -> bool { + self.n == 0 + } pub fn get(&self, slot: usize) -> bool { (self.mmap[HEADER_SIZE + (slot >> 3)] >> (slot & 7)) & 1 != 0 @@ -56,14 +73,17 @@ impl PersistentBitVec { // Bulk word view. SAFETY: mmap is page-aligned, HEADER_SIZE=16 is divisible by 8, // so &mmap[HEADER_SIZE] is u64-aligned. Slice length is n_words * 8 bytes. fn data_words(&self) -> &[u64] { - let nw = n_words(self.n); + let nw = n_words(self.n); let ptr = self.mmap[HEADER_SIZE..].as_ptr() as *const u64; unsafe { std::slice::from_raw_parts(ptr, nw) } } pub fn count_ones(&self) -> u64 { // Padding bits in the last word are 0, so no masking needed. - self.data_words().iter().map(|w| w.count_ones() as u64).sum() + self.data_words() + .iter() + .map(|w| w.count_ones() as u64) + .sum() } pub fn count_zeros(&self) -> u64 { @@ -71,37 +91,56 @@ impl PersistentBitVec { } pub fn jaccard_dist(&self, other: &PersistentBitVec) -> f64 { - assert_eq!(self.n, other.n, "length mismatch"); - let (inter, union) = self.data_words().iter().zip(other.data_words()).fold( - (0u64, 0u64), - |(i, u), (&a, &b)| (i + (a & b).count_ones() as u64, u + (a | b).count_ones() as u64), - ); - if union == 0 { return 0.0; } + let (inter, union) = self.partial_jaccard_dist(other); + if union == 0 { + return 0.0; + } 1.0 - inter as f64 / union as f64 } + pub fn partial_jaccard_dist(&self, other: &PersistentBitVec) -> (u64, u64) { + assert_eq!(self.n, other.n, "length mismatch"); + self.data_words() + .iter() + .zip(other.data_words()) + .fold((0u64, 0u64), |(i, u), (&a, &b)| { + ( + i + (a & b).count_ones() as u64, + u + (a | b).count_ones() as u64, + ) + }) + } + pub fn hamming_dist(&self, other: &PersistentBitVec) -> u64 { assert_eq!(self.n, other.n, "length mismatch"); - self.data_words().iter().zip(other.data_words()) + self.data_words() + .iter() + .zip(other.data_words()) .map(|(&a, &b)| (a ^ b).count_ones() as u64) .sum() } pub fn iter(&self) -> BitIter<'_> { - BitIter { bytes: self.data_bytes(), slot: 0, n: self.n } + BitIter { + bytes: self.data_bytes(), + slot: 0, + n: self.n, + } } } impl<'a> IntoIterator for &'a PersistentBitVec { type Item = bool; type IntoIter = BitIter<'a>; - fn into_iter(self) -> BitIter<'a> { self.iter() } + fn into_iter(self) -> BitIter<'a> { + self.iter() + } } pub struct BitIter<'a> { bytes: &'a [u8], - slot: usize, - n: usize, + slot: usize, + n: usize, } impl ExactSizeIterator for BitIter<'_> {} @@ -110,7 +149,9 @@ impl Iterator for BitIter<'_> { type Item = bool; fn next(&mut self) -> Option { - if self.slot >= self.n { return None; } + if self.slot >= self.n { + return None; + } let v = (self.bytes[self.slot >> 3] >> (self.slot & 7)) & 1 != 0; self.slot += 1; Some(v) @@ -126,17 +167,20 @@ impl Iterator for BitIter<'_> { pub struct PersistentBitVecBuilder { mmap: MmapMut, - n: usize, + n: usize, } impl PersistentBitVecBuilder { pub fn new(n: usize, path: &Path) -> io::Result { let file_size = HEADER_SIZE + n_bytes_for_words(n); let mut file = OpenOptions::new() - .read(true).write(true).create(true).truncate(true) + .read(true) + .write(true) + .create(true) + .truncate(true) .open(path)?; file.write_all(&MAGIC)?; - file.write_all(&[0u8; 4])?; // padding + file.write_all(&[0u8; 4])?; // padding file.write_all(&(n as u64).to_le_bytes())?; file.seek(SeekFrom::Start(0))?; file.set_len(file_size as u64)?; @@ -152,8 +196,12 @@ impl PersistentBitVecBuilder { Ok(Self { mmap, n }) } - pub fn len(&self) -> usize { self.n } - pub fn is_empty(&self) -> bool { self.n == 0 } + pub fn len(&self) -> usize { + self.n + } + pub fn is_empty(&self) -> bool { + self.n == 0 + } pub fn get(&self, slot: usize) -> bool { (self.mmap[HEADER_SIZE + (slot >> 3)] >> (slot & 7)) & 1 != 0 @@ -161,53 +209,70 @@ impl PersistentBitVecBuilder { pub fn set(&mut self, slot: usize, value: bool) { let byte = HEADER_SIZE + (slot >> 3); - let bit = 1u8 << (slot & 7); - if value { self.mmap[byte] |= bit; } else { self.mmap[byte] &= !bit; } + let bit = 1u8 << (slot & 7); + if value { + self.mmap[byte] |= bit; + } else { + self.mmap[byte] &= !bit; + } } // SAFETY: same alignment argument as PersistentBitVec::data_words. fn data_words_mut(&mut self) -> &mut [u64] { - let nw = n_words(self.n); + let nw = n_words(self.n); let ptr = self.mmap[HEADER_SIZE..].as_mut_ptr() as *mut u64; unsafe { std::slice::from_raw_parts_mut(ptr, nw) } } pub fn and(&mut self, other: &PersistentBitVec) { assert_eq!(self.n, other.n, "length mismatch"); - for (sw, &ow) in self.data_words_mut().iter_mut().zip(other.data_words()) { *sw &= ow; } + for (sw, &ow) in self.data_words_mut().iter_mut().zip(other.data_words()) { + *sw &= ow; + } } pub fn or(&mut self, other: &PersistentBitVec) { assert_eq!(self.n, other.n, "length mismatch"); - for (sw, &ow) in self.data_words_mut().iter_mut().zip(other.data_words()) { *sw |= ow; } + for (sw, &ow) in self.data_words_mut().iter_mut().zip(other.data_words()) { + *sw |= ow; + } } pub fn xor(&mut self, other: &PersistentBitVec) { assert_eq!(self.n, other.n, "length mismatch"); - for (sw, &ow) in self.data_words_mut().iter_mut().zip(other.data_words()) { *sw ^= ow; } + for (sw, &ow) in self.data_words_mut().iter_mut().zip(other.data_words()) { + *sw ^= ow; + } } pub fn not(&mut self) { let rem = self.n % 64; let words = self.data_words_mut(); - for w in words.iter_mut() { *w ^= u64::MAX; } + for w in words.iter_mut() { + *w ^= u64::MAX; + } // Zero padding bits in the last word so count_ones / jaccard remain correct. if rem != 0 { - if let Some(last) = words.last_mut() { *last &= (1u64 << rem) - 1; } + if let Some(last) = words.last_mut() { + *last &= (1u64 << rem) - 1; + } } } /// Convert a count vector to a bit vector: bit set iff count >= threshold. /// Fills u64 words directly from the count iterator — O(n), no bit-level set() overhead. pub fn build_from_counts( - source: &PersistentCompactIntVec, + source: &PersistentCompactIntVec, threshold: u32, - path: &Path, + path: &Path, ) -> io::Result { let n = source.len(); let file_size = HEADER_SIZE + n_bytes_for_words(n); let mut file = OpenOptions::new() - .read(true).write(true).create(true).truncate(true) + .read(true) + .write(true) + .create(true) + .truncate(true) .open(path)?; file.write_all(&MAGIC)?; file.write_all(&[0u8; 4])?; @@ -217,7 +282,7 @@ impl PersistentBitVecBuilder { let mut mmap = unsafe { MmapMut::map_mut(&file)? }; { - let nw = n_words(n); + let nw = n_words(n); let ptr = mmap[HEADER_SIZE..].as_mut_ptr() as *mut u64; let words = unsafe { std::slice::from_raw_parts_mut(ptr, nw) }; for (slot, count) in source.iter().enumerate() {