refactor: replace rayon with NUMA-aware PartitionRunner

Replaces `rayon` parallel iteration across index, rebuild, reindex, and select modules with a custom `PartitionRunner`. This introduces NUMA-aware task distribution with CPU pinning and round-robin scheduling, eliminating `Arc`, `Mutex`, and atomic synchronization primitives in favor of a flat, pre-spawned worker architecture. Error handling is simplified via `.map_err()` and the `?` operator, while progress bar updates are decoupled into dedicated callbacks.
This commit is contained in:
Eric Coissac
2026-06-15 15:45:04 +02:00
parent bc92dc4592
commit b6fcbc545f
8 changed files with 198 additions and 319 deletions
+33 -16
View File
@@ -1,5 +1,5 @@
use std::fs::{self, File};
use std::io::{self, Write as _};
use std::io::{self, BufWriter, Write as _};
use std::path::{Path, PathBuf};
use memmap2::Mmap;
@@ -230,30 +230,47 @@ impl PackedBitMatrix {
/// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files.
pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> {
let packed_path = dir.join("matrix.pbmx");
if packed_path.exists() {
// Matrix complete; remove any leftover column files from a killed cleanup.
if let Ok(meta) = MatrixMeta::load(dir) {
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
let _ = fs::remove_file(dir.join("meta.json"));
}
return Ok(());
}
let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols;
let col_files: Vec<Vec<u8>> = (0..n_cols)
.map(|c| fs::read(col_path(dir, c)))
// Compute offsets from file sizes — no column data loaded into RAM.
let col_sizes: Vec<u64> = (0..n_cols)
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
.collect::<io::Result<_>>()?;
let header_size = PBMX_HEADER + n_cols * 8;
let header_size = (PBMX_HEADER + n_cols * 8) as u64;
let mut col_offset = header_size;
let mut offsets = Vec::with_capacity(n_cols);
for data in &col_files {
offsets.push(col_offset as u64);
col_offset += data.len();
for &size in &col_sizes {
offsets.push(col_offset);
col_offset += size;
}
let packed_path = dir.join("matrix.pbmx");
let mut file = File::create(&packed_path)?;
file.write_all(&PBMX_MAGIC)?;
file.write_all(&[0u8; 4])?;
file.write_all(&(meta.n as u64).to_le_bytes())?;
file.write_all(&(n_cols as u64).to_le_bytes())?;
for &off in &offsets { file.write_all(&off.to_le_bytes())?; }
for data in &col_files { file.write_all(data)?; }
drop(file);
// Write to a temp file; rename atomically so a killed process never leaves
// a truncated matrix.pbmx that would be mistaken for a complete file.
let tmp_path = dir.join("matrix.pbmx.tmp");
let mut out = BufWriter::new(File::create(&tmp_path)?);
out.write_all(&PBMX_MAGIC)?;
out.write_all(&[0u8; 4])?;
out.write_all(&(meta.n as u64).to_le_bytes())?;
out.write_all(&(n_cols as u64).to_le_bytes())?;
for &off in &offsets { out.write_all(&off.to_le_bytes())?; }
for c in 0..n_cols {
io::copy(&mut File::open(col_path(dir, c))?, &mut out)?;
}
out.flush()?;
drop(out);
fs::rename(&tmp_path, &packed_path)?;
for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; }
fs::remove_file(dir.join("meta.json"))?;
+33 -16
View File
@@ -1,6 +1,6 @@
use std::cmp::Ordering;
use std::fs::{self, File};
use std::io::{self, Write as _};
use std::io::{self, BufWriter, Write as _};
use std::path::{Path, PathBuf};
use memmap2::Mmap;
@@ -354,30 +354,47 @@ impl PackedCompactIntMatrix {
/// Build `counts/matrix.pcmx` from existing `col_*.pciv` files.
pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> {
let packed_path = dir.join("matrix.pcmx");
if packed_path.exists() {
// Matrix complete; remove any leftover column files from a killed cleanup.
if let Ok(meta) = MatrixMeta::load(dir) {
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
let _ = fs::remove_file(dir.join("meta.json"));
}
return Ok(());
}
let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols;
let col_files: Vec<Vec<u8>> = (0..n_cols)
.map(|c| fs::read(col_path(dir, c)))
// Compute offsets from file sizes — no column data loaded into RAM.
let col_sizes: Vec<u64> = (0..n_cols)
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
.collect::<io::Result<_>>()?;
let header_size = PCMX_HEADER + n_cols * 8;
let header_size = (PCMX_HEADER + n_cols * 8) as u64;
let mut col_offset = header_size;
let mut offsets = Vec::with_capacity(n_cols);
for data in &col_files {
offsets.push(col_offset as u64);
col_offset += data.len();
for &size in &col_sizes {
offsets.push(col_offset);
col_offset += size;
}
let packed_path = dir.join("matrix.pcmx");
let mut file = File::create(&packed_path)?;
file.write_all(&PCMX_MAGIC)?;
file.write_all(&[0u8; 4])?;
file.write_all(&(meta.n as u64).to_le_bytes())?;
file.write_all(&(n_cols as u64).to_le_bytes())?;
for &off in &offsets { file.write_all(&off.to_le_bytes())?; }
for data in &col_files { file.write_all(data)?; }
drop(file);
// Write to a temp file; rename atomically so a killed process never leaves
// a truncated matrix.pcmx that would be mistaken for a complete file.
let tmp_path = dir.join("matrix.pcmx.tmp");
let mut out = BufWriter::new(File::create(&tmp_path)?);
out.write_all(&PCMX_MAGIC)?;
out.write_all(&[0u8; 4])?;
out.write_all(&(meta.n as u64).to_le_bytes())?;
out.write_all(&(n_cols as u64).to_le_bytes())?;
for &off in &offsets { out.write_all(&off.to_le_bytes())?; }
for c in 0..n_cols {
io::copy(&mut File::open(col_path(dir, c))?, &mut out)?;
}
out.flush()?;
drop(out);
fs::rename(&tmp_path, &packed_path)?;
for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; }
fs::remove_file(dir.join("meta.json"))?;