feat(obicompactvec): migrate to memory-mapped file storage
Refactor `PersistentCompactIntVecBuilder` to replace in-memory `Vec<u8>` with `MmapMut` for primary storage. Decouples initialization from finalization by updating `finalize_pciv` to truncate, append overflow data, and overwrite the header placeholder. Adds file path tracking via a new `path()` accessor, implements `build_from` for efficient copying, and introduces arithmetic/set operations (`min`, `max`, `sum`, `diff`). Expands test coverage for persistence roundtrips, mutations, iteration, and the new vector operations.
This commit is contained in:
@@ -1,56 +1,118 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fs::{self, OpenOptions};
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use crate::format::write_pciv;
|
||||
use memmap2::MmapMut;
|
||||
|
||||
use crate::format::{finalize_pciv, HEADER_SIZE};
|
||||
use crate::reader::PersistentCompactIntVec;
|
||||
|
||||
pub struct PersistentCompactIntVecBuilder {
|
||||
primary: Vec<u8>,
|
||||
path: PathBuf,
|
||||
mmap: MmapMut,
|
||||
n: usize,
|
||||
overflow: HashMap<u64, u32>,
|
||||
}
|
||||
|
||||
impl PersistentCompactIntVecBuilder {
|
||||
pub fn new(n: usize) -> Self {
|
||||
Self {
|
||||
primary: vec![0u8; n],
|
||||
overflow: HashMap::new(),
|
||||
}
|
||||
/// Create a new, zero-filled PCIV at `path`. Primary is mmapped immediately.
|
||||
pub fn new(n: usize, path: &Path) -> io::Result<Self> {
|
||||
let file = OpenOptions::new().read(true).write(true).create(true).truncate(true).open(path)?;
|
||||
file.set_len((HEADER_SIZE + n) as u64)?;
|
||||
let mmap = unsafe { MmapMut::map_mut(&file)? };
|
||||
Ok(Self { path: path.to_path_buf(), mmap, n, overflow: HashMap::new() })
|
||||
}
|
||||
|
||||
pub fn set(&mut self, slot: u64, value: u32) {
|
||||
if value < 255 {
|
||||
self.primary[slot as usize] = value as u8;
|
||||
self.overflow.remove(&slot);
|
||||
} else {
|
||||
self.primary[slot as usize] = 255;
|
||||
self.overflow.insert(slot, value);
|
||||
/// Copy `source`'s file to `path`, mmap the primary section, load overflow into RAM.
|
||||
/// Avoids iterating all n slots: the file copy is OS-level, overflow loading is O(n_overflow).
|
||||
pub fn build_from(source: &PersistentCompactIntVec, path: &Path) -> io::Result<Self> {
|
||||
fs::copy(source.path(), path)?;
|
||||
|
||||
let file = OpenOptions::new().read(true).write(true).open(path)?;
|
||||
let mmap = unsafe { MmapMut::map_mut(&file)? };
|
||||
|
||||
let n = source.len();
|
||||
let n_overflow = u32::from_le_bytes(mmap[12..16].try_into().unwrap()) as usize;
|
||||
let data_offset = HEADER_SIZE + n;
|
||||
|
||||
let mut overflow = HashMap::with_capacity(n_overflow);
|
||||
for i in 0..n_overflow {
|
||||
let off = data_offset + i * 8;
|
||||
let slot = u32::from_le_bytes(mmap[off..off + 4].try_into().unwrap()) as u64;
|
||||
let value = u32::from_le_bytes(mmap[off + 4..off + 8].try_into().unwrap());
|
||||
overflow.insert(slot, value);
|
||||
}
|
||||
|
||||
Ok(Self { path: path.to_path_buf(), mmap, n, overflow })
|
||||
}
|
||||
|
||||
pub fn get(&self, slot: u64) -> u32 {
|
||||
match self.primary[slot as usize] {
|
||||
match self.mmap[HEADER_SIZE + slot as usize] {
|
||||
255 => *self.overflow.get(&slot).expect("sentinel without overflow entry"),
|
||||
v => v as u32,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.primary.len()
|
||||
pub fn set(&mut self, slot: u64, value: u32) {
|
||||
if value < 255 {
|
||||
self.mmap[HEADER_SIZE + slot as usize] = value as u8;
|
||||
self.overflow.remove(&slot);
|
||||
} else {
|
||||
self.mmap[HEADER_SIZE + slot as usize] = 255;
|
||||
self.overflow.insert(slot, value);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.primary.is_empty()
|
||||
pub fn len(&self) -> usize { self.n }
|
||||
|
||||
pub fn is_empty(&self) -> bool { self.n == 0 }
|
||||
|
||||
/// Replace every slot with the smaller of its current value and the value
/// at the same position in `other`.
///
/// # Panics
///
/// Panics if `other` does not have the same length as this builder.
pub fn min(&mut self, other: &PersistentCompactIntVec) {
    assert_eq!(self.n, other.len(), "length mismatch");
    for (slot, theirs) in (0u64..).zip(other.iter()) {
        let ours = self.get(slot);
        if theirs < ours {
            self.set(slot, theirs);
        }
    }
}
|
||||
|
||||
/// Finalise and write a single PCIV file, then drop all in-memory state.
|
||||
pub fn close(self, path: &Path) -> io::Result<()> {
|
||||
let mut entries: Vec<(u32, u32)> = self
|
||||
.overflow
|
||||
/// Replace every slot with the larger of its current value and the value
/// at the same position in `other`.
///
/// # Panics
///
/// Panics if `other` does not have the same length as this builder.
pub fn max(&mut self, other: &PersistentCompactIntVec) {
    assert_eq!(self.n, other.len(), "length mismatch");
    for (slot, theirs) in (0u64..).zip(other.iter()) {
        let ours = self.get(slot);
        if theirs > ours {
            self.set(slot, theirs);
        }
    }
}
|
||||
|
||||
/// Add the value at each position of `other` to the matching slot.
///
/// # Panics
///
/// Panics if `other` does not have the same length as this builder, or if
/// any per-slot sum exceeds `u32::MAX`.
pub fn sum(&mut self, other: &PersistentCompactIntVec) {
    assert_eq!(self.n, other.len(), "length mismatch");
    for (slot, addend) in (0u64..).zip(other.iter()) {
        let total = self
            .get(slot)
            .checked_add(addend)
            .expect("u32 overflow in sum");
        self.set(slot, total);
    }
}
|
||||
|
||||
/// Subtract the value at each position of `other` from the matching slot,
/// clamping at zero instead of wrapping.
///
/// # Panics
///
/// Panics if `other` does not have the same length as this builder.
pub fn diff(&mut self, other: &PersistentCompactIntVec) {
    assert_eq!(self.n, other.len(), "length mismatch");
    for (slot, subtrahend) in (0u64..).zip(other.iter()) {
        let remaining = self.get(slot).saturating_sub(subtrahend);
        self.set(slot, remaining);
    }
}
|
||||
|
||||
/// Flush the primary mmap, then write sorted overflow data + index and fix the header.
|
||||
pub fn close(self) -> io::Result<()> {
|
||||
self.mmap.flush()?;
|
||||
let Self { path, mmap, n, overflow } = self;
|
||||
drop(mmap);
|
||||
|
||||
let mut entries: Vec<(u32, u32)> = overflow
|
||||
.into_iter()
|
||||
.map(|(slot, value)| (slot as u32, value))
|
||||
.map(|(slot, val)| (slot as u32, val))
|
||||
.collect();
|
||||
entries.sort_unstable_by_key(|&(slot, _)| slot);
|
||||
|
||||
write_pciv(path, &self.primary, &entries)
|
||||
finalize_pciv(&path, n, &entries)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::{self, BufWriter, Seek, SeekFrom, Write as _};
|
||||
use std::path::Path;
|
||||
|
||||
@@ -8,32 +8,26 @@ pub const HEADER_SIZE: usize = 24; // magic(4) + n(8) + n_overflow(4) + step(4)
|
||||
// Sparse index target: fits in 32 KB L1 cache / 8 bytes per entry.
|
||||
pub const L1_INDEX_ENTRIES: usize = 4096;
|
||||
|
||||
/// Write a single PCIV file.
|
||||
/// Finalise a PCIV file whose placeholder header and primary section are already on disk.
|
||||
///
|
||||
/// Layout (write order = computation order):
|
||||
/// header 24 B placeholder, overwritten at the end
|
||||
/// primary n B u8 per slot (255 = overflow sentinel)
|
||||
/// data n_overflow × 8 B (slot: u32, value: u32) sorted by slot
|
||||
/// index n_index × 8 B (slot: u32, pos: u32) sparse index
|
||||
pub fn write_pciv(path: &Path, primary: &[u8], entries: &[(u32, u32)]) -> io::Result<()> {
|
||||
let n = primary.len();
|
||||
/// Truncates the file to `HEADER_SIZE + n`, then appends:
|
||||
/// data n_overflow × 8 B (slot: u32, value: u32) sorted by slot
|
||||
/// index n_index × 8 B (slot: u32, pos: u32) sparse index
|
||||
/// and overwrites the header placeholder at offset 0.
|
||||
pub fn finalize_pciv(path: &Path, n: usize, entries: &[(u32, u32)]) -> io::Result<()> {
|
||||
let n_overflow = entries.len();
|
||||
|
||||
let mut w = BufWriter::new(File::create(path)?);
|
||||
let file = OpenOptions::new().read(true).write(true).open(path)?;
|
||||
file.set_len((HEADER_SIZE + n) as u64)?;
|
||||
|
||||
// Placeholder header — will be overwritten at the end.
|
||||
w.write_all(&[0u8; HEADER_SIZE])?;
|
||||
let mut w = BufWriter::new(file);
|
||||
w.seek(SeekFrom::End(0))?;
|
||||
|
||||
// Primary array.
|
||||
w.write_all(primary)?;
|
||||
|
||||
// Overflow data (sorted by slot).
|
||||
for &(slot, value) in entries {
|
||||
w.write_all(&slot.to_le_bytes())?;
|
||||
w.write_all(&value.to_le_bytes())?;
|
||||
}
|
||||
|
||||
// Sparse index (computed now that n_overflow is known).
|
||||
let step: u32 = if n_overflow <= L1_INDEX_ENTRIES {
|
||||
0
|
||||
} else {
|
||||
@@ -53,7 +47,6 @@ pub fn write_pciv(path: &Path, primary: &[u8], entries: &[(u32, u32)]) -> io::Re
|
||||
0
|
||||
};
|
||||
|
||||
// Flush, then seek back to write the real header.
|
||||
w.flush()?;
|
||||
let mut file = w.into_inner().map_err(|e| e.into_error())?;
|
||||
file.seek(SeekFrom::Start(0))?;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use memmap2::Mmap;
|
||||
|
||||
@@ -14,6 +14,7 @@ pub struct PersistentCompactIntVec {
|
||||
index: Vec<(u32, u32)>, // (slot, pos) — L1-resident sparse index
|
||||
primary_offset: usize, // = HEADER_SIZE
|
||||
data_offset: usize, // = HEADER_SIZE + n
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl PersistentCompactIntVec {
|
||||
@@ -36,7 +37,6 @@ impl PersistentCompactIntVec {
|
||||
let data_offset = primary_offset + n;
|
||||
let index_offset = data_offset + n_overflow * 8;
|
||||
|
||||
// Load the sparse index into RAM (L1-resident at query time).
|
||||
let mut index = Vec::with_capacity(n_index);
|
||||
for i in 0..n_index {
|
||||
let off = index_offset + i * 8;
|
||||
@@ -45,7 +45,20 @@ impl PersistentCompactIntVec {
|
||||
index.push((slot, pos));
|
||||
}
|
||||
|
||||
Ok(Self { mmap, n, n_overflow, step, index, primary_offset, data_offset })
|
||||
Ok(Self {
|
||||
mmap,
|
||||
n,
|
||||
n_overflow,
|
||||
step,
|
||||
index,
|
||||
primary_offset,
|
||||
data_offset,
|
||||
path: path.to_path_buf(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn path(&self) -> &Path {
|
||||
&self.path
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
|
||||
@@ -5,15 +5,27 @@ use crate::{PersistentCompactIntVec, PersistentCompactIntVecBuilder};
|
||||
fn roundtrip(values: &[(u64, u32)], n: usize) -> Vec<u32> {
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("test.pciv");
|
||||
let mut b = PersistentCompactIntVecBuilder::new(n);
|
||||
let mut b = PersistentCompactIntVecBuilder::new(n, &path).unwrap();
|
||||
for &(slot, v) in values {
|
||||
b.set(slot, v);
|
||||
}
|
||||
b.close(&path).unwrap();
|
||||
b.close().unwrap();
|
||||
let r = PersistentCompactIntVec::open(&path).unwrap();
|
||||
(0..n as u64).map(|s| r.get(s)).collect()
|
||||
}
|
||||
|
||||
fn make_pciv(values: &[u32]) -> (tempfile::TempDir, PersistentCompactIntVec) {
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("v.pciv");
|
||||
let mut b = PersistentCompactIntVecBuilder::new(values.len(), &path).unwrap();
|
||||
for (i, &v) in values.iter().enumerate() {
|
||||
b.set(i as u64, v);
|
||||
}
|
||||
b.close().unwrap();
|
||||
let r = PersistentCompactIntVec::open(&path).unwrap();
|
||||
(dir, r)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn all_zero_by_default() {
|
||||
let got = roundtrip(&[], 8);
|
||||
@@ -24,12 +36,12 @@ fn all_zero_by_default() {
|
||||
fn small_values_no_overflow() {
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("test.pciv");
|
||||
let mut b = PersistentCompactIntVecBuilder::new(4);
|
||||
let mut b = PersistentCompactIntVecBuilder::new(4, &path).unwrap();
|
||||
b.set(0, 1);
|
||||
b.set(1, 254);
|
||||
b.set(2, 0);
|
||||
b.set(3, 100);
|
||||
b.close(&path).unwrap();
|
||||
b.close().unwrap();
|
||||
let r = PersistentCompactIntVec::open(&path).unwrap();
|
||||
assert_eq!(r.get(0), 1);
|
||||
assert_eq!(r.get(1), 254);
|
||||
@@ -51,10 +63,10 @@ fn overflow_values_roundtrip() {
|
||||
fn mutation_downward_removes_from_overflow() {
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("test.pciv");
|
||||
let mut b = PersistentCompactIntVecBuilder::new(2);
|
||||
let mut b = PersistentCompactIntVecBuilder::new(2, &path).unwrap();
|
||||
b.set(0, 1000);
|
||||
b.set(0, 42);
|
||||
b.close(&path).unwrap();
|
||||
b.close().unwrap();
|
||||
let r = PersistentCompactIntVec::open(&path).unwrap();
|
||||
assert_eq!(r.get(0), 42);
|
||||
assert_eq!(r.step, 0, "no overflow entries expected");
|
||||
@@ -64,10 +76,10 @@ fn mutation_downward_removes_from_overflow() {
|
||||
fn mutation_upward_updates_overflow() {
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("test.pciv");
|
||||
let mut b = PersistentCompactIntVecBuilder::new(1);
|
||||
let mut b = PersistentCompactIntVecBuilder::new(1, &path).unwrap();
|
||||
b.set(0, 300);
|
||||
b.set(0, 500);
|
||||
b.close(&path).unwrap();
|
||||
b.close().unwrap();
|
||||
let r = PersistentCompactIntVec::open(&path).unwrap();
|
||||
assert_eq!(r.get(0), 500);
|
||||
}
|
||||
@@ -77,11 +89,11 @@ fn sparse_index_built_for_many_overflows() {
|
||||
let n = 5000usize;
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("test.pciv");
|
||||
let mut b = PersistentCompactIntVecBuilder::new(n);
|
||||
let mut b = PersistentCompactIntVecBuilder::new(n, &path).unwrap();
|
||||
for i in 0..n {
|
||||
b.set(i as u64, 1000 + i as u32);
|
||||
}
|
||||
b.close(&path).unwrap();
|
||||
b.close().unwrap();
|
||||
let r = PersistentCompactIntVec::open(&path).unwrap();
|
||||
assert!(r.step > 0, "sparse index should have been built");
|
||||
for i in 0..n {
|
||||
@@ -95,11 +107,11 @@ fn iter_matches_get() {
|
||||
let n = 6;
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("test.pciv");
|
||||
let mut b = PersistentCompactIntVecBuilder::new(n);
|
||||
let mut b = PersistentCompactIntVecBuilder::new(n, &path).unwrap();
|
||||
for &(slot, v) in &values {
|
||||
b.set(slot, v);
|
||||
}
|
||||
b.close(&path).unwrap();
|
||||
b.close().unwrap();
|
||||
let r = PersistentCompactIntVec::open(&path).unwrap();
|
||||
|
||||
let via_iter: Vec<u32> = r.iter().collect();
|
||||
@@ -111,9 +123,9 @@ fn iter_matches_get() {
|
||||
fn iter_size_hint_exact() {
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("test.pciv");
|
||||
let mut b = PersistentCompactIntVecBuilder::new(5);
|
||||
let mut b = PersistentCompactIntVecBuilder::new(5, &path).unwrap();
|
||||
b.set(2, 1000);
|
||||
b.close(&path).unwrap();
|
||||
b.close().unwrap();
|
||||
let r = PersistentCompactIntVec::open(&path).unwrap();
|
||||
let mut it = r.iter();
|
||||
assert_eq!(it.len(), 5);
|
||||
@@ -125,27 +137,90 @@ fn iter_size_hint_exact() {
|
||||
fn into_iter_for_ref() {
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("test.pciv");
|
||||
let mut b = PersistentCompactIntVecBuilder::new(3);
|
||||
let mut b = PersistentCompactIntVecBuilder::new(3, &path).unwrap();
|
||||
b.set(0, 10);
|
||||
b.set(1, 500);
|
||||
b.set(2, 30);
|
||||
b.close(&path).unwrap();
|
||||
b.close().unwrap();
|
||||
let r = PersistentCompactIntVec::open(&path).unwrap();
|
||||
let collected: Vec<u32> = (&r).into_iter().collect();
|
||||
assert_eq!(collected, vec![10, 500, 30]);
|
||||
}
|
||||
|
||||
/// A builder created with `build_from` and closed untouched must reproduce
/// the source vector, including sentinel (255) and overflow values.
#[test]
fn build_from_roundtrip() {
    let (_src_dir, original) = make_pciv(&[0, 50, 255, 1000, 7, 1_313_691]);

    let copy_dir = tempdir().unwrap();
    let copy_path = copy_dir.path().join("b.pciv");
    let builder = PersistentCompactIntVecBuilder::build_from(&original, &copy_path).unwrap();
    builder.close().unwrap();

    let copy = PersistentCompactIntVec::open(&copy_path).unwrap();
    let expected: Vec<u32> = original.iter().collect();
    let actual: Vec<u32> = copy.iter().collect();
    assert_eq!(expected, actual);
}
|
||||
|
||||
/// `min` keeps the smaller value per slot, across both primary and overflow ranges.
#[test]
fn combine_min() {
    let (_left_dir, left) = make_pciv(&[10, 300, 0, 1000]);
    let (_right_dir, right) = make_pciv(&[20, 100, 500, 800]);

    let out_dir = tempdir().unwrap();
    let out_path = out_dir.path().join("out.pciv");
    let mut builder = PersistentCompactIntVecBuilder::build_from(&left, &out_path).unwrap();
    builder.min(&right);
    builder.close().unwrap();

    let result: Vec<u32> = PersistentCompactIntVec::open(&out_path)
        .unwrap()
        .iter()
        .collect();
    assert_eq!(result, vec![10, 100, 0, 800]);
}
|
||||
|
||||
/// `max` keeps the larger value per slot, across both primary and overflow ranges.
#[test]
fn combine_max() {
    let (_left_dir, left) = make_pciv(&[10, 300, 0, 1000]);
    let (_right_dir, right) = make_pciv(&[20, 100, 500, 800]);

    let out_dir = tempdir().unwrap();
    let out_path = out_dir.path().join("out.pciv");
    let mut builder = PersistentCompactIntVecBuilder::build_from(&left, &out_path).unwrap();
    builder.max(&right);
    builder.close().unwrap();

    let result: Vec<u32> = PersistentCompactIntVec::open(&out_path)
        .unwrap()
        .iter()
        .collect();
    assert_eq!(result, vec![20, 300, 500, 1000]);
}
|
||||
|
||||
/// `sum` adds per slot; 200 + 100 crosses from the primary byte range into overflow.
#[test]
fn combine_sum() {
    let (_left_dir, left) = make_pciv(&[10, 200, 0, 100]);
    let (_right_dir, right) = make_pciv(&[20, 100, 5, 1]);

    let out_dir = tempdir().unwrap();
    let out_path = out_dir.path().join("out.pciv");
    let mut builder = PersistentCompactIntVecBuilder::build_from(&left, &out_path).unwrap();
    builder.sum(&right);
    builder.close().unwrap();

    let result: Vec<u32> = PersistentCompactIntVec::open(&out_path)
        .unwrap()
        .iter()
        .collect();
    assert_eq!(result, vec![30, 300, 5, 101]);
}
|
||||
|
||||
/// `diff` subtracts per slot and clamps at zero when `other` is larger.
#[test]
fn combine_diff() {
    let (_left_dir, left) = make_pciv(&[20, 1000, 5, 0]);
    let (_right_dir, right) = make_pciv(&[10, 300, 10, 1]);

    let out_dir = tempdir().unwrap();
    let out_path = out_dir.path().join("out.pciv");
    let mut builder = PersistentCompactIntVecBuilder::build_from(&left, &out_path).unwrap();
    builder.diff(&right);
    builder.close().unwrap();

    let result: Vec<u32> = PersistentCompactIntVec::open(&out_path)
        .unwrap()
        .iter()
        .collect();
    assert_eq!(result, vec![10, 700, 0, 0]);
}
|
||||
|
||||
#[test]
|
||||
fn mixed_large_dataset() {
|
||||
let n = 1000usize;
|
||||
let dir = tempdir().unwrap();
|
||||
let path = dir.path().join("test.pciv");
|
||||
let mut b = PersistentCompactIntVecBuilder::new(n);
|
||||
let mut b = PersistentCompactIntVecBuilder::new(n, &path).unwrap();
|
||||
for i in 0..n {
|
||||
let v = if i % 100 == 0 { 100_000 + i as u32 } else { i as u32 % 200 };
|
||||
b.set(i as u64, v);
|
||||
}
|
||||
b.close(&path).unwrap();
|
||||
b.close().unwrap();
|
||||
let r = PersistentCompactIntVec::open(&path).unwrap();
|
||||
for i in 0..n {
|
||||
let expected = if i % 100 == 0 { 100_000 + i as u32 } else { i as u32 % 200 };
|
||||
|
||||
Reference in New Issue
Block a user