From 0733287de5a4fb449db00fd3d2ce732e213c7ec8 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 13 May 2026 10:47:39 +0800 Subject: [PATCH] feat(obicompactvec): migrate to memory-mapped file storage Refactor `PersistentCompactIntVecBuilder` to replace in-memory `Vec` with `MmapMut` for primary storage. Decouples initialization from finalization by updating `finalize_pciv` to truncate, append overflow data, and overwrite the header placeholder. Adds file path tracking via a new `path()` accessor, implements `build_from` for efficient copying, and introduces arithmetic/set operations (`min`, `max`, `sum`, `diff`). Expands test coverage for persistence roundtrips, mutations, iteration, and the new vector operations. --- src/obicompactvec/src/builder.rs | 114 ++++++++++++++++++++++------- src/obicompactvec/src/format.rs | 29 +++----- src/obicompactvec/src/reader.rs | 19 ++++- src/obicompactvec/src/tests/mod.rs | 111 +++++++++++++++++++++++----- 4 files changed, 208 insertions(+), 65 deletions(-) diff --git a/src/obicompactvec/src/builder.rs b/src/obicompactvec/src/builder.rs index 0e957b5..831777f 100644 --- a/src/obicompactvec/src/builder.rs +++ b/src/obicompactvec/src/builder.rs @@ -1,56 +1,118 @@ use std::collections::HashMap; +use std::fs::{self, OpenOptions}; use std::io; -use std::path::Path; +use std::path::{Path, PathBuf}; -use crate::format::write_pciv; +use memmap2::MmapMut; + +use crate::format::{finalize_pciv, HEADER_SIZE}; +use crate::reader::PersistentCompactIntVec; pub struct PersistentCompactIntVecBuilder { - primary: Vec, + path: PathBuf, + mmap: MmapMut, + n: usize, overflow: HashMap, } impl PersistentCompactIntVecBuilder { - pub fn new(n: usize) -> Self { - Self { - primary: vec![0u8; n], - overflow: HashMap::new(), - } + /// Create a new, zero-filled PCIV at `path`. Primary is mmapped immediately. + pub fn new(n: usize, path: &Path) -> io::Result { + let file = OpenOptions::new().read(true).write(true).create(true).truncate(true).open(path)?; + file.set_len((HEADER_SIZE + n) as u64)?; + let mmap = unsafe { MmapMut::map_mut(&file)? }; + Ok(Self { path: path.to_path_buf(), mmap, n, overflow: HashMap::new() }) } - pub fn set(&mut self, slot: u64, value: u32) { - if value < 255 { - self.primary[slot as usize] = value as u8; - self.overflow.remove(&slot); - } else { - self.primary[slot as usize] = 255; - self.overflow.insert(slot, value); + /// Copy `source`'s file to `path`, mmap the primary section, load overflow into RAM. + /// Avoids iterating all n slots: the file copy is OS-level, overflow loading is O(n_overflow). + pub fn build_from(source: &PersistentCompactIntVec, path: &Path) -> io::Result { + fs::copy(source.path(), path)?; + + let file = OpenOptions::new().read(true).write(true).open(path)?; + let mmap = unsafe { MmapMut::map_mut(&file)? }; + + let n = source.len(); + let n_overflow = u32::from_le_bytes(mmap[12..16].try_into().unwrap()) as usize; + let data_offset = HEADER_SIZE + n; + + let mut overflow = HashMap::with_capacity(n_overflow); + for i in 0..n_overflow { + let off = data_offset + i * 8; + let slot = u32::from_le_bytes(mmap[off..off + 4].try_into().unwrap()) as u64; + let value = u32::from_le_bytes(mmap[off + 4..off + 8].try_into().unwrap()); + overflow.insert(slot, value); } + + Ok(Self { path: path.to_path_buf(), mmap, n, overflow }) } pub fn get(&self, slot: u64) -> u32 { - match self.primary[slot as usize] { + match self.mmap[HEADER_SIZE + slot as usize] { 255 => *self.overflow.get(&slot).expect("sentinel without overflow entry"), v => v as u32, } } - pub fn len(&self) -> usize { - self.primary.len() + pub fn set(&mut self, slot: u64, value: u32) { + if value < 255 { + self.mmap[HEADER_SIZE + slot as usize] = value as u8; + self.overflow.remove(&slot); + } else { + self.mmap[HEADER_SIZE + slot as usize] = 255; + self.overflow.insert(slot, value); + } } - pub fn is_empty(&self) -> bool { - self.primary.is_empty() + pub fn len(&self) -> usize { self.n } + + pub fn is_empty(&self) -> bool { self.n == 0 } + + pub fn min(&mut self, other: &PersistentCompactIntVec) { + assert_eq!(self.n, other.len(), "length mismatch"); + for (slot, other_val) in other.iter().enumerate() { + if other_val < self.get(slot as u64) { + self.set(slot as u64, other_val); + } + } } - /// Finalise and write a single PCIV file, then drop all in-memory state. - pub fn close(self, path: &Path) -> io::Result<()> { - let mut entries: Vec<(u32, u32)> = self - .overflow + pub fn max(&mut self, other: &PersistentCompactIntVec) { + assert_eq!(self.n, other.len(), "length mismatch"); + for (slot, other_val) in other.iter().enumerate() { + if other_val > self.get(slot as u64) { + self.set(slot as u64, other_val); + } + } + } + + pub fn sum(&mut self, other: &PersistentCompactIntVec) { + assert_eq!(self.n, other.len(), "length mismatch"); + for (slot, other_val) in other.iter().enumerate() { + let cur = self.get(slot as u64); + self.set(slot as u64, cur.checked_add(other_val).expect("u32 overflow in sum")); + } + } + + pub fn diff(&mut self, other: &PersistentCompactIntVec) { + assert_eq!(self.n, other.len(), "length mismatch"); + for (slot, other_val) in other.iter().enumerate() { + self.set(slot as u64, self.get(slot as u64).saturating_sub(other_val)); + } + } + + /// Flush the primary mmap, then write sorted overflow data + index and fix the header. + pub fn close(self) -> io::Result<()> { + self.mmap.flush()?; + let Self { path, mmap, n, overflow } = self; + drop(mmap); + + let mut entries: Vec<(u32, u32)> = overflow .into_iter() - .map(|(slot, value)| (slot as u32, value)) + .map(|(slot, val)| (slot as u32, val)) .collect(); entries.sort_unstable_by_key(|&(slot, _)| slot); - write_pciv(path, &self.primary, &entries) + finalize_pciv(&path, n, &entries) } } diff --git a/src/obicompactvec/src/format.rs b/src/obicompactvec/src/format.rs index 8cdcc36..4931c7c 100644 --- a/src/obicompactvec/src/format.rs +++ b/src/obicompactvec/src/format.rs @@ -1,4 +1,4 @@ -use std::fs::File; +use std::fs::OpenOptions; use std::io::{self, BufWriter, Seek, SeekFrom, Write as _}; use std::path::Path; @@ -8,32 +8,26 @@ pub const HEADER_SIZE: usize = 24; // magic(4) + n(8) + n_overflow(4) + step(4) // Sparse index target: fits in 32 KB L1 cache / 8 bytes per entry. pub const L1_INDEX_ENTRIES: usize = 4096; -/// Write a single PCIV file. +/// Finalise a PCIV file whose placeholder header and primary section are already on disk. /// -/// Layout (write order = computation order): -/// header 24 B placeholder, overwritten at the end -/// primary n B u8 per slot (255 = overflow sentinel) -/// data n_overflow × 8 B (slot: u32, value: u32) sorted by slot -/// index n_index × 8 B (slot: u32, pos: u32) sparse index -pub fn write_pciv(path: &Path, primary: &[u8], entries: &[(u32, u32)]) -> io::Result<()> { - let n = primary.len(); +/// Truncates the file to `HEADER_SIZE + n`, then appends: +/// data n_overflow × 8 B (slot: u32, value: u32) sorted by slot +/// index n_index × 8 B (slot: u32, pos: u32) sparse index +/// and overwrites the header placeholder at offset 0. +pub fn finalize_pciv(path: &Path, n: usize, entries: &[(u32, u32)]) -> io::Result<()> { let n_overflow = entries.len(); - let mut w = BufWriter::new(File::create(path)?); + let file = OpenOptions::new().read(true).write(true).open(path)?; + file.set_len((HEADER_SIZE + n) as u64)?; - // Placeholder header — will be overwritten at the end. - w.write_all(&[0u8; HEADER_SIZE])?; + let mut w = BufWriter::new(file); + w.seek(SeekFrom::End(0))?; - // Primary array. - w.write_all(primary)?; - - // Overflow data (sorted by slot). for &(slot, value) in entries { w.write_all(&slot.to_le_bytes())?; w.write_all(&value.to_le_bytes())?; } - // Sparse index (computed now that n_overflow is known). let step: u32 = if n_overflow <= L1_INDEX_ENTRIES { 0 } else { @@ -53,7 +47,6 @@ pub fn write_pciv(path: &Path, primary: &[u8], entries: &[(u32, u32)]) -> io::Re 0 }; - // Flush, then seek back to write the real header. w.flush()?; let mut file = w.into_inner().map_err(|e| e.into_error())?; file.seek(SeekFrom::Start(0))?; diff --git a/src/obicompactvec/src/reader.rs b/src/obicompactvec/src/reader.rs index f4773cf..707707e 100644 --- a/src/obicompactvec/src/reader.rs +++ b/src/obicompactvec/src/reader.rs @@ -1,6 +1,6 @@ use std::fs::File; use std::io; -use std::path::Path; +use std::path::{Path, PathBuf}; use memmap2::Mmap; @@ -14,6 +14,7 @@ pub struct PersistentCompactIntVec { index: Vec<(u32, u32)>, // (slot, pos) — L1-resident sparse index primary_offset: usize, // = HEADER_SIZE data_offset: usize, // = HEADER_SIZE + n + path: PathBuf, } impl PersistentCompactIntVec { @@ -36,7 +37,6 @@ impl PersistentCompactIntVec { let data_offset = primary_offset + n; let index_offset = data_offset + n_overflow * 8; - // Load the sparse index into RAM (L1-resident at query time). let mut index = Vec::with_capacity(n_index); for i in 0..n_index { let off = index_offset + i * 8; @@ -45,7 +45,20 @@ impl PersistentCompactIntVec { index.push((slot, pos)); } - Ok(Self { mmap, n, n_overflow, step, index, primary_offset, data_offset }) + Ok(Self { + mmap, + n, + n_overflow, + step, + index, + primary_offset, + data_offset, + path: path.to_path_buf(), + }) + } + + pub fn path(&self) -> &Path { + &self.path } pub fn len(&self) -> usize { diff --git a/src/obicompactvec/src/tests/mod.rs b/src/obicompactvec/src/tests/mod.rs index 9f2ba94..3879dda 100644 --- a/src/obicompactvec/src/tests/mod.rs +++ b/src/obicompactvec/src/tests/mod.rs @@ -5,15 +5,27 @@ use crate::{PersistentCompactIntVec, PersistentCompactIntVecBuilder}; fn roundtrip(values: &[(u64, u32)], n: usize) -> Vec { let dir = tempdir().unwrap(); let path = dir.path().join("test.pciv"); - let mut b = PersistentCompactIntVecBuilder::new(n); + let mut b = PersistentCompactIntVecBuilder::new(n, &path).unwrap(); for &(slot, v) in values { b.set(slot, v); } - b.close(&path).unwrap(); + b.close().unwrap(); let r = PersistentCompactIntVec::open(&path).unwrap(); (0..n as u64).map(|s| r.get(s)).collect() } +fn make_pciv(values: &[u32]) -> (tempfile::TempDir, PersistentCompactIntVec) { + let dir = tempdir().unwrap(); + let path = dir.path().join("v.pciv"); + let mut b = PersistentCompactIntVecBuilder::new(values.len(), &path).unwrap(); + for (i, &v) in values.iter().enumerate() { + b.set(i as u64, v); + } + b.close().unwrap(); + let r = PersistentCompactIntVec::open(&path).unwrap(); + (dir, r) +} + #[test] fn all_zero_by_default() { let got = roundtrip(&[], 8); @@ -24,12 +36,12 @@ fn all_zero_by_default() { fn small_values_no_overflow() { let dir = tempdir().unwrap(); let path = dir.path().join("test.pciv"); - let mut b = PersistentCompactIntVecBuilder::new(4); + let mut b = PersistentCompactIntVecBuilder::new(4, &path).unwrap(); b.set(0, 1); b.set(1, 254); b.set(2, 0); b.set(3, 100); - b.close(&path).unwrap(); + b.close().unwrap(); let r = PersistentCompactIntVec::open(&path).unwrap(); assert_eq!(r.get(0), 1); assert_eq!(r.get(1), 254); @@ -51,10 +63,10 @@ fn overflow_values_roundtrip() { fn mutation_downward_removes_from_overflow() { let dir = tempdir().unwrap(); let path = dir.path().join("test.pciv"); - let mut b = PersistentCompactIntVecBuilder::new(2); + let mut b = PersistentCompactIntVecBuilder::new(2, &path).unwrap(); b.set(0, 1000); b.set(0, 42); - b.close(&path).unwrap(); + b.close().unwrap(); let r = PersistentCompactIntVec::open(&path).unwrap(); assert_eq!(r.get(0), 42); assert_eq!(r.step, 0, "no overflow entries expected"); @@ -64,10 +76,10 @@ fn mutation_downward_removes_from_overflow() { fn mutation_upward_updates_overflow() { let dir = tempdir().unwrap(); let path = dir.path().join("test.pciv"); - let mut b = PersistentCompactIntVecBuilder::new(1); + let mut b = PersistentCompactIntVecBuilder::new(1, &path).unwrap(); b.set(0, 300); b.set(0, 500); - b.close(&path).unwrap(); + b.close().unwrap(); let r = PersistentCompactIntVec::open(&path).unwrap(); assert_eq!(r.get(0), 500); } @@ -77,11 +89,11 @@ fn sparse_index_built_for_many_overflows() { let n = 5000usize; let dir = tempdir().unwrap(); let path = dir.path().join("test.pciv"); - let mut b = PersistentCompactIntVecBuilder::new(n); + let mut b = PersistentCompactIntVecBuilder::new(n, &path).unwrap(); for i in 0..n { b.set(i as u64, 1000 + i as u32); } - b.close(&path).unwrap(); + b.close().unwrap(); let r = PersistentCompactIntVec::open(&path).unwrap(); assert!(r.step > 0, "sparse index should have been built"); for i in 0..n { @@ -95,11 +107,11 @@ fn iter_matches_get() { let n = 6; let dir = tempdir().unwrap(); let path = dir.path().join("test.pciv"); - let mut b = PersistentCompactIntVecBuilder::new(n); + let mut b = PersistentCompactIntVecBuilder::new(n, &path).unwrap(); for &(slot, v) in &values { b.set(slot, v); } - b.close(&path).unwrap(); + b.close().unwrap(); let r = PersistentCompactIntVec::open(&path).unwrap(); let via_iter: Vec = r.iter().collect(); @@ -111,9 +123,9 @@ fn iter_matches_get() { fn iter_size_hint_exact() { let dir = tempdir().unwrap(); let path = dir.path().join("test.pciv"); - let mut b = PersistentCompactIntVecBuilder::new(5); + let mut b = PersistentCompactIntVecBuilder::new(5, &path).unwrap(); b.set(2, 1000); - b.close(&path).unwrap(); + b.close().unwrap(); let r = PersistentCompactIntVec::open(&path).unwrap(); let mut it = r.iter(); assert_eq!(it.len(), 5); @@ -125,27 +137,90 @@ fn iter_size_hint_exact() { fn into_iter_for_ref() { let dir = tempdir().unwrap(); let path = dir.path().join("test.pciv"); - let mut b = PersistentCompactIntVecBuilder::new(3); + let mut b = PersistentCompactIntVecBuilder::new(3, &path).unwrap(); b.set(0, 10); b.set(1, 500); b.set(2, 30); - b.close(&path).unwrap(); + b.close().unwrap(); let r = PersistentCompactIntVec::open(&path).unwrap(); let collected: Vec = (&r).into_iter().collect(); assert_eq!(collected, vec![10, 500, 30]); } +#[test] +fn build_from_roundtrip() { + let a = vec![0u32, 50, 255, 1000, 7, 1_313_691]; + let (_dir_a, ra) = make_pciv(&a); + let dir_b = tempdir().unwrap(); + let path_b = dir_b.path().join("b.pciv"); + PersistentCompactIntVecBuilder::build_from(&ra, &path_b).unwrap().close().unwrap(); + let rb = PersistentCompactIntVec::open(&path_b).unwrap(); + assert_eq!(ra.iter().collect::>(), rb.iter().collect::>()); +} + +#[test] +fn combine_min() { + let (_da, ra) = make_pciv(&[10, 300, 0, 1000]); + let (_db, rb) = make_pciv(&[20, 100, 500, 800]); + let dir = tempdir().unwrap(); + let path = dir.path().join("out.pciv"); + let mut b = PersistentCompactIntVecBuilder::build_from(&ra, &path).unwrap(); + b.min(&rb); + b.close().unwrap(); + let r = PersistentCompactIntVec::open(&path).unwrap(); + assert_eq!(r.iter().collect::>(), vec![10, 100, 0, 800]); +} + +#[test] +fn combine_max() { + let (_da, ra) = make_pciv(&[10, 300, 0, 1000]); + let (_db, rb) = make_pciv(&[20, 100, 500, 800]); + let dir = tempdir().unwrap(); + let path = dir.path().join("out.pciv"); + let mut b = PersistentCompactIntVecBuilder::build_from(&ra, &path).unwrap(); + b.max(&rb); + b.close().unwrap(); + let r = PersistentCompactIntVec::open(&path).unwrap(); + assert_eq!(r.iter().collect::>(), vec![20, 300, 500, 1000]); +} + +#[test] +fn combine_sum() { + let (_da, ra) = make_pciv(&[10, 200, 0, 100]); + let (_db, rb) = make_pciv(&[20, 100, 5, 1]); + let dir = tempdir().unwrap(); + let path = dir.path().join("out.pciv"); + let mut b = PersistentCompactIntVecBuilder::build_from(&ra, &path).unwrap(); + b.sum(&rb); + b.close().unwrap(); + let r = PersistentCompactIntVec::open(&path).unwrap(); + assert_eq!(r.iter().collect::>(), vec![30, 300, 5, 101]); +} + +#[test] +fn combine_diff() { + let (_da, ra) = make_pciv(&[20, 1000, 5, 0]); + let (_db, rb) = make_pciv(&[10, 300, 10, 1]); + let dir = tempdir().unwrap(); + let path = dir.path().join("out.pciv"); + let mut b = PersistentCompactIntVecBuilder::build_from(&ra, &path).unwrap(); + b.diff(&rb); + b.close().unwrap(); + let r = PersistentCompactIntVec::open(&path).unwrap(); + assert_eq!(r.iter().collect::>(), vec![10, 700, 0, 0]); +} + #[test] fn mixed_large_dataset() { let n = 1000usize; let dir = tempdir().unwrap(); let path = dir.path().join("test.pciv"); - let mut b = PersistentCompactIntVecBuilder::new(n); + let mut b = PersistentCompactIntVecBuilder::new(n, &path).unwrap(); for i in 0..n { let v = if i % 100 == 0 { 100_000 + i as u32 } else { i as u32 % 200 }; b.set(i as u64, v); } - b.close(&path).unwrap(); + b.close().unwrap(); let r = PersistentCompactIntVec::open(&path).unwrap(); for i in 0..n { let expected = if i % 100 == 0 { 100_000 + i as u32 } else { i as u32 % 200 };