feat(matrix): add partial group reductions and column persistence
Expands MatrixGroupOps with partial_group_min/max helpers for bitwise reductions and introduces add_col_from methods to persist external vectors as matrix columns. Refactors column aggregation in the partitioner to leverage these group operations directly, replacing iterative row processing with simplified builder lifecycle management and explicit metadata serialization.
This commit is contained in:
@@ -3,8 +3,9 @@ use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use obicompactvec::{
|
||||
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
|
||||
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder,
|
||||
ColGroup, MatrixGroupOps,
|
||||
PersistentBitMatrix, PersistentBitMatrixBuilder,
|
||||
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
|
||||
};
|
||||
use obilayeredmap::meta::PartitionMeta;
|
||||
use obilayeredmap::OLMError;
|
||||
@@ -40,52 +41,6 @@ pub struct OutputCol {
|
||||
pub op: AggOp,
|
||||
}
|
||||
|
||||
// ── Aggregation ───────────────────────────────────────────────────────────────
|
||||
|
||||
#[inline]
|
||||
fn aggregate(op: AggOp, indices: &[usize], src_row: &[u32], threshold: u32) -> u32 {
|
||||
match op {
|
||||
AggOp::Any => {
|
||||
if indices.iter().any(|&i| src_row[i] > threshold) { 1 } else { 0 }
|
||||
}
|
||||
AggOp::All => {
|
||||
if indices.is_empty() { return 0; }
|
||||
if indices.iter().all(|&i| src_row[i] > threshold) { 1 } else { 0 }
|
||||
}
|
||||
AggOp::None => {
|
||||
if indices.iter().all(|&i| src_row[i] <= threshold) { 1 } else { 0 }
|
||||
}
|
||||
AggOp::Sum => {
|
||||
indices.iter().map(|&i| src_row[i]).fold(0u32, |a, b| a.saturating_add(b))
|
||||
}
|
||||
AggOp::Min => indices.iter().map(|&i| src_row[i]).min().unwrap_or(0),
|
||||
AggOp::Max => indices.iter().map(|&i| src_row[i]).max().unwrap_or(0),
|
||||
}
|
||||
}
|
||||
|
||||
// ── ColBuilder ────────────────────────────────────────────────────────────────
|
||||
|
||||
enum ColBuilder {
|
||||
Bit(PersistentBitVecBuilder),
|
||||
Int(PersistentCompactIntVecBuilder),
|
||||
}
|
||||
|
||||
impl ColBuilder {
|
||||
fn set_val(&mut self, slot: usize, value: u32) {
|
||||
match self {
|
||||
ColBuilder::Bit(b) => b.set(slot, value > 0),
|
||||
ColBuilder::Int(b) => b.set(slot, value),
|
||||
}
|
||||
}
|
||||
|
||||
fn close(self) -> SKResult<()> {
|
||||
match self {
|
||||
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
|
||||
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
fn olm_to_sk(e: OLMError) -> SKError {
|
||||
@@ -95,21 +50,6 @@ fn olm_to_sk(e: OLMError) -> SKError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the path of bit-column number `col` inside `dir`.
///
/// The file name is `col_NNNNNN.pbiv`, where `NNNNNN` is `col` zero-padded to
/// at least six digits.
fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
    dir.join(format!("col_{col:06}.pbiv"))
}
|
||||
|
||||
/// Returns the path of integer-column number `col` inside `dir`.
///
/// The file name is `col_NNNNNN.pciv`, where `NNNNNN` is `col` zero-padded to
/// at least six digits.
fn col_path_int(dir: &Path, col: usize) -> PathBuf {
    dir.join(format!("col_{col:06}.pciv"))
}
|
||||
|
||||
/// Writes `meta.json` into `dir`, recording the matrix dimensions.
///
/// The file contains a single JSON object followed by a newline:
/// `{"n":<n>,"n_cols":<n_cols>}`. An existing `meta.json` is overwritten.
///
/// # Errors
///
/// Returns the I/O error from `fs::write`, e.g. when `dir` does not exist.
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
    let contents = format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n");
    fs::write(dir.join("meta.json"), contents)
}
|
||||
|
||||
/// Copy all plain files (not subdirectories) from `src_dir` to `dst_dir`.
|
||||
fn copy_layer_files(src_dir: &Path, dst_dir: &Path) -> io::Result<()> {
|
||||
for entry in fs::read_dir(src_dir)? {
|
||||
@@ -125,30 +65,64 @@ fn copy_layer_files(src_dir: &Path, dst_dir: &Path) -> io::Result<()> {
|
||||
// ── fill_builders ─────────────────────────────────────────────────────────────
|
||||
|
||||
fn fill_builders(
|
||||
builders: &mut [ColBuilder],
|
||||
specs: &[OutputCol],
|
||||
n: usize,
|
||||
n_src: usize,
|
||||
src_layer_dir: &Path,
|
||||
src_is_count: bool,
|
||||
threshold: u32,
|
||||
output_presence: bool,
|
||||
mut dst_bit: Option<&mut PersistentBitMatrixBuilder>,
|
||||
mut dst_int: Option<&mut PersistentCompactIntMatrixBuilder>,
|
||||
) -> SKResult<()> {
|
||||
let mut src_buf = vec![0u32; n_src];
|
||||
|
||||
if src_is_count {
|
||||
let mat = PersistentCompactIntMatrix::open(src_layer_dir).map_err(SKError::Io)?;
|
||||
for slot in 0..n {
|
||||
mat.fill_row(slot, &mut src_buf);
|
||||
for (col, spec) in specs.iter().enumerate() {
|
||||
builders[col].set_val(slot, aggregate(spec.op, &spec.indices, &src_buf, threshold));
|
||||
for spec in specs {
|
||||
let g = ColGroup::new(&spec.label, spec.indices.clone());
|
||||
if output_presence {
|
||||
let b = dst_bit.as_deref_mut().unwrap();
|
||||
match spec.op {
|
||||
AggOp::Any => b.add_col_from (&mat.partial_group_any (&g, threshold).map_err(SKError::Io)?),
|
||||
AggOp::All => b.add_col_from (&mat.partial_group_all (&g, threshold).map_err(SKError::Io)?),
|
||||
AggOp::None => b.add_col_from (&mat.partial_group_none(&g, threshold).map_err(SKError::Io)?),
|
||||
AggOp::Sum => b.add_col_from_int(&mat.partial_group_sum (&g).map_err(SKError::Io)?),
|
||||
AggOp::Min => b.add_col_from_int(&mat.partial_group_min (&g).map_err(SKError::Io)?),
|
||||
AggOp::Max => b.add_col_from_int(&mat.partial_group_max (&g).map_err(SKError::Io)?),
|
||||
}.map_err(SKError::Io)?;
|
||||
} else {
|
||||
let b = dst_int.as_deref_mut().unwrap();
|
||||
match spec.op {
|
||||
AggOp::Sum => b.add_col_from (&mat.partial_group_sum (&g).map_err(SKError::Io)?),
|
||||
AggOp::Min => b.add_col_from (&mat.partial_group_min (&g).map_err(SKError::Io)?),
|
||||
AggOp::Max => b.add_col_from (&mat.partial_group_max (&g).map_err(SKError::Io)?),
|
||||
AggOp::Any => b.add_col_from_bit(&mat.partial_group_any (&g, threshold).map_err(SKError::Io)?),
|
||||
AggOp::All => b.add_col_from_bit(&mat.partial_group_all (&g, threshold).map_err(SKError::Io)?),
|
||||
AggOp::None => b.add_col_from_bit(&mat.partial_group_none(&g, threshold).map_err(SKError::Io)?),
|
||||
}.map_err(SKError::Io)?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let mat = PersistentBitMatrix::open(src_layer_dir).map_err(SKError::Io)?;
|
||||
for slot in 0..n {
|
||||
mat.fill_row(slot, &mut src_buf);
|
||||
for (col, spec) in specs.iter().enumerate() {
|
||||
builders[col].set_val(slot, aggregate(spec.op, &spec.indices, &src_buf, threshold));
|
||||
for spec in specs {
|
||||
let g = ColGroup::new(&spec.label, spec.indices.clone());
|
||||
if output_presence {
|
||||
let b = dst_bit.as_deref_mut().unwrap();
|
||||
match spec.op {
|
||||
AggOp::Any => b.add_col_from (&mat.partial_group_any (&g, 1).map_err(SKError::Io)?),
|
||||
AggOp::All => b.add_col_from (&mat.partial_group_all (&g, 1).map_err(SKError::Io)?),
|
||||
AggOp::None => b.add_col_from (&mat.partial_group_none(&g, 1).map_err(SKError::Io)?),
|
||||
AggOp::Sum => b.add_col_from_int(&mat.partial_group_sum (&g).map_err(SKError::Io)?),
|
||||
AggOp::Min => b.add_col_from_int(&mat.partial_group_min (&g).map_err(SKError::Io)?),
|
||||
AggOp::Max => b.add_col_from_int(&mat.partial_group_max (&g).map_err(SKError::Io)?),
|
||||
}.map_err(SKError::Io)?;
|
||||
} else {
|
||||
let b = dst_int.as_deref_mut().unwrap();
|
||||
match spec.op {
|
||||
AggOp::Sum => b.add_col_from (&mat.partial_group_sum (&g).map_err(SKError::Io)?),
|
||||
AggOp::Min => b.add_col_from (&mat.partial_group_min (&g).map_err(SKError::Io)?),
|
||||
AggOp::Max => b.add_col_from (&mat.partial_group_max (&g).map_err(SKError::Io)?),
|
||||
AggOp::Any => b.add_col_from_bit(&mat.partial_group_any (&g, 1).map_err(SKError::Io)?),
|
||||
AggOp::All => b.add_col_from_bit(&mat.partial_group_all (&g, 1).map_err(SKError::Io)?),
|
||||
AggOp::None => b.add_col_from_bit(&mat.partial_group_none(&g, 1).map_err(SKError::Io)?),
|
||||
}.map_err(SKError::Io)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -168,7 +142,7 @@ impl KmerPartition {
|
||||
src: &KmerPartition,
|
||||
i: usize,
|
||||
specs: &[OutputCol],
|
||||
n_src_genomes: usize,
|
||||
_n_src_genomes: usize,
|
||||
threshold: u32,
|
||||
output_presence: bool,
|
||||
in_place: bool,
|
||||
@@ -188,7 +162,6 @@ impl KmerPartition {
|
||||
fs::create_dir_all(&dst_index_dir)?;
|
||||
}
|
||||
|
||||
let n_out = specs.len();
|
||||
let data_subdir = if output_presence { "presence" } else { "counts" };
|
||||
|
||||
for l in 0..src_meta.n_layers {
|
||||
@@ -201,7 +174,7 @@ impl KmerPartition {
|
||||
let presence_dir = src_layer_dir.join("presence");
|
||||
let src_is_count = counts_dir.exists() && !presence_dir.exists();
|
||||
|
||||
// Determine number of slots from the source matrix.
|
||||
// Determine number of slots and detect implicit layers.
|
||||
let n = if counts_dir.exists() {
|
||||
PersistentCompactIntMatrix::open(&src_layer_dir).map_err(SKError::Io)?.n()
|
||||
} else if presence_dir.exists() {
|
||||
@@ -216,7 +189,7 @@ impl KmerPartition {
|
||||
};
|
||||
|
||||
// Choose the output data directory (temp name for in-place).
|
||||
let (dst_data_dir, final_data_dir) = if in_place {
|
||||
let (dst_data_dir, final_data_dir): (PathBuf, PathBuf) = if in_place {
|
||||
let tmp = dst_layer_dir.join(format!("{data_subdir}_new"));
|
||||
let perm = dst_layer_dir.join(data_subdir);
|
||||
(tmp, perm)
|
||||
@@ -231,37 +204,22 @@ impl KmerPartition {
|
||||
}
|
||||
fs::create_dir_all(&dst_data_dir)?;
|
||||
|
||||
// Initialise packed-format skeleton.
|
||||
if output_presence {
|
||||
PersistentBitMatrixBuilder::new(n, &dst_data_dir)
|
||||
.map_err(SKError::Io)?.close().map_err(SKError::Io)?;
|
||||
let (mut dst_bit, mut dst_int) = if output_presence {
|
||||
(Some(PersistentBitMatrixBuilder::new(n, &dst_data_dir).map_err(SKError::Io)?), None)
|
||||
} else {
|
||||
PersistentCompactIntMatrixBuilder::new(n, &dst_data_dir)
|
||||
.map_err(SKError::Io)?.close().map_err(SKError::Io)?;
|
||||
}
|
||||
|
||||
// Create column builders.
|
||||
let mut builders: Vec<ColBuilder> = (0..n_out)
|
||||
.map(|col| -> SKResult<ColBuilder> {
|
||||
if output_presence {
|
||||
Ok(ColBuilder::Bit(PersistentBitVecBuilder::new(
|
||||
n, &col_path_bit(&dst_data_dir, col),
|
||||
)?))
|
||||
} else {
|
||||
Ok(ColBuilder::Int(PersistentCompactIntVecBuilder::new(
|
||||
n, &col_path_int(&dst_data_dir, col),
|
||||
)?))
|
||||
}
|
||||
})
|
||||
.collect::<SKResult<_>>()?;
|
||||
(None, Some(PersistentCompactIntMatrixBuilder::new(n, &dst_data_dir).map_err(SKError::Io)?))
|
||||
};
|
||||
|
||||
fill_builders(
|
||||
&mut builders, specs, n, n_src_genomes,
|
||||
&src_layer_dir, src_is_count, threshold,
|
||||
specs, &src_layer_dir, src_is_count, threshold, output_presence,
|
||||
dst_bit.as_mut(), dst_int.as_mut(),
|
||||
)?;
|
||||
|
||||
for b in builders { b.close()?; }
|
||||
write_matrix_meta(&dst_data_dir, n, n_out).map_err(SKError::Io)?;
|
||||
if output_presence {
|
||||
dst_bit.unwrap().close().map_err(SKError::Io)?;
|
||||
} else {
|
||||
dst_int.unwrap().close().map_err(SKError::Io)?;
|
||||
}
|
||||
|
||||
// In-place: swap old data dir for new.
|
||||
if in_place {
|
||||
|
||||
Reference in New Issue
Block a user