diff --git a/docmd/implementation/obicompactvec.md b/docmd/implementation/obicompactvec.md index e3c8568..301b021 100644 --- a/docmd/implementation/obicompactvec.md +++ b/docmd/implementation/obicompactvec.md @@ -444,71 +444,64 @@ Defined **once at the index level** from column metadata. Valid in all matrices ### MatrixGroupOps -Group operations expose only **additive intermediates** backed by temp files. Final predicates are applied at the index level after accumulation. +Five required primitives + two default methods derived from them. All return temp-file-backed types. ```rust pub trait MatrixGroupOps { + // required fn partial_group_presence_count(&self, g: &ColGroup, threshold: u32) -> io::Result; - fn partial_group_sum(&self, g: &ColGroup) -> io::Result; - fn partial_group_any(&self, g: &ColGroup, threshold: u32) -> io::Result; + fn partial_group_min(&self, g: &ColGroup) + -> io::Result; + fn partial_group_max(&self, g: &ColGroup) + -> io::Result; + + // defaults derived from partial_group_presence_count + fn partial_group_all(&self, g: &ColGroup, threshold: u32) + -> io::Result; // slot=1 iff count == g.indices.len() + fn partial_group_none(&self, g: &ColGroup, threshold: u32) + -> io::Result; // slot=1 iff count == 0 } ``` -Implemented for both `PersistentCompactIntMatrix` and `PersistentBitMatrix`. For bit matrices, `partial_group_sum` delegates to `partial_group_presence_count(g, 1)`. +Implemented for both `PersistentCompactIntMatrix` and `PersistentBitMatrix`. + +For **bit matrices**: values are 0/1, so `partial_group_sum` = `partial_group_presence_count(g, 1)`; `partial_group_min` is AND (set first column then mask-with remaining); `partial_group_max` is OR via `partial_group_any` + `inc_present`. **`partial_group_presence_count` — chunking for large groups:** -When `g.indices.len() < 255`: per-slot counts stay within `u8` range. Use `inc_present_fast` (bit matrix) or `inc_predicate_fast(col_view(c), |v| v >= threshold)` (int matrix) — raw u8 increment, no overflow map written. +When `g.indices.len() < 255`: per-slot counts stay within `u8` range. Use `inc_present_fast` (bit) or `inc_predicate_fast(col_view(c), |v| v >= threshold)` (int) — raw u8 increment, no overflow entry written. -When `g.indices.len() ≥ 255`: process in chunks of 254 columns (each chunk stays within u8 range), accumulate into a running builder via `.add(chunk_frozen.view())`. +When `g.indices.len() ≥ 255`: process in chunks of 254 columns, accumulate via `.add(chunk_frozen.view())`. -``` -fast path (< 255 cols): - builder = TempCompactIntVecBuilder::new(n) - for c in group: - builder.inc_predicate_fast(matrix.col_view(c), |v| v >= threshold) - builder.freeze() +**`partial_group_min` (int matrix)**: copy first column via `.add(col_view(first))` (start from 0 ⇒ copy), then `.min(col_view(c))` for remaining. -slow path (≥ 255 cols): - result = TempCompactIntVecBuilder::new(n) - for chunk in group.chunks(254): - chunk_b = TempCompactIntVecBuilder::new(n) - for c in chunk: - chunk_b.inc_predicate_fast(matrix.col_view(c), |v| v >= threshold) - frozen = chunk_b.freeze() - result.add(frozen.view()) - result.freeze() -``` +**`partial_group_max` (int matrix)**: `.max(col_view(c))` for all columns (start from 0 ⇒ first column acts as copy). -**`partial_group_any`** uses `or_where` on `TempBitVecBuilder`: +**`partial_group_any`** uses `or_where` on `TempBitVecBuilder` (two-pass: primary bytes then overflow entries). -``` -result = TempBitVecBuilder::new(n) -for c in group: - result.or_where(matrix.col_view(c), |v| v >= threshold) -result.freeze() -``` +**`partial_group_all` / `partial_group_none`** (default): call `partial_group_presence_count`, then iterate slots to produce the bit result. O(n) extra pass, not chunked. -**Non-additive predicates** (`group_all`, `group_at_least(k)`) are composed at the index level: +### add_col_from — matrix builder integration + +Both matrix builders accept temp-file results directly: ```rust -// "present in >= 2 ingroup columns with count >= 3, absent from all outgroup" -let presence = layers.map(|l| l.partial_group_presence_count(&ingroup, 3)?).add_all()?; -let in_mask = presence.view().geq(2); // IntSliceView method +// PersistentBitMatrixBuilder +fn add_col_from(&mut self, src: &TempBitVec) -> io::Result<()> +fn add_col_from_int(&mut self, src: &TempCompactIntVec) -> io::Result<()> // nonzero → 1 -let out_sum = layers.map(|l| l.partial_group_sum(&outgroup)?).add_all()?; -let out_mask = out_sum.view().leq(0); - -let mut mask_b = TempBitVecBuilder::new(n)?; -mask_b.copy_from(in_mask); -mask_b.and(out_mask); +// PersistentCompactIntMatrixBuilder +fn add_col_from(&mut self, src: &TempCompactIntVec) -> io::Result<()> +fn add_col_from_bit(&mut self, src: &TempBitVec) -> io::Result<()> // bit → 0/1 u32 ``` +`add_col_from` copies the temp file to the matrix directory and increments `n_cols`; `close()` writes `meta.json` with the final column count. No separate `write_meta` step needed. + ### mask_with Direct method on `PersistentCompactIntVecBuilder` (and delegation via `TempCompactIntVecBuilder`). Zeros every slot where the corresponding mask bit is 0. Iterates only zero bits — O(n_zeros), O(1) when mask is all-ones. diff --git a/src/obicompactvec/src/bitmatrix.rs b/src/obicompactvec/src/bitmatrix.rs index 2717174..55a4561 100644 --- a/src/obicompactvec/src/bitmatrix.rs +++ b/src/obicompactvec/src/bitmatrix.rs @@ -402,6 +402,26 @@ impl PersistentBitMatrixBuilder { PersistentBitVecBuilder::new(self.n, &path) } + pub fn add_col_from(&mut self, src: &TempBitVec) -> io::Result<()> { + src.make_persistent(&col_path(&self.dir, self.n_cols))?; + self.n_cols += 1; + Ok(()) + } + + pub fn add_col_from_int(&mut self, src: &TempCompactIntVec) -> io::Result<()> { + let path = col_path(&self.dir, self.n_cols); + self.n_cols += 1; + let mut b = PersistentBitVecBuilder::new(self.n, &path)?; + let view = src.view(); + for slot in 0..self.n { + if view.primary_bytes()[slot] > 0 { b.set(slot, true); } + } + for (slot, _) in view.overflow_entries() { + b.set(slot, true); + } + b.close() + } + pub fn close(self) -> io::Result<()> { MatrixMeta { n: self.n, n_cols: self.n_cols }.save(&self.dir) } @@ -446,6 +466,26 @@ impl MatrixGroupOps for PersistentBitMatrix { } result.freeze() } + + fn partial_group_min(&self, g: &ColGroup) -> io::Result { + // min of 0/1 values = AND: 1 only if ALL columns are 1 + let n = self.n(); + let mut result = TempCompactIntVecBuilder::new(n)?; + if let Some((&first, rest)) = g.indices.split_first() { + result.inc_present_fast(self.col_view(first)); + for &c in rest { result.mask_with(self.col_view(c)); } + } + result.freeze() + } + + fn partial_group_max(&self, g: &ColGroup) -> io::Result { + // max of 0/1 values = OR: 1 if any column is 1 + let any = self.partial_group_any(g, 1)?; + let n = any.len(); + let mut result = TempCompactIntVecBuilder::new(n)?; + result.inc_present(any.view()); + result.freeze() + } } // ── Shared matrix helpers (also used by intmatrix.rs) ───────────────────────── diff --git a/src/obicompactvec/src/colgroup.rs b/src/obicompactvec/src/colgroup.rs index c238a62..fa70830 100644 --- a/src/obicompactvec/src/colgroup.rs +++ b/src/obicompactvec/src/colgroup.rs @@ -1,6 +1,6 @@ use std::io; -use crate::tempbitvec::TempBitVec; +use crate::tempbitvec::{TempBitVec, TempBitVecBuilder}; use crate::tempintvec::TempCompactIntVec; // ── ColGroup ────────────────────────────────────────────────────────────────── @@ -23,12 +23,14 @@ impl ColGroup { // ── MatrixGroupOps ──────────────────────────────────────────────────────────── -/// Per-matrix group aggregations that return **additive intermediates**. +/// Per-matrix group aggregations. /// -/// Results must be composed by the caller (concat across partitions, add across -/// layers) before applying final predicates (`geq`, `leq`, …). Non-additive -/// predicates like `group_all` or `group_at_least(k)` are intentionally absent -/// — they are derived at the index level from these intermediates. +/// `partial_group_presence_count`, `partial_group_sum`, `partial_group_any`, +/// `partial_group_min`, `partial_group_max` are the primitives; each impl must +/// provide all five. +/// +/// `partial_group_all` and `partial_group_none` have default implementations +/// derived from `partial_group_presence_count` and should rarely need overriding. pub trait MatrixGroupOps { /// Per-slot count of group columns whose value ≥ `threshold`. fn partial_group_presence_count(&self, g: &ColGroup, threshold: u32) -> io::Result; @@ -36,6 +38,35 @@ pub trait MatrixGroupOps { /// Per-slot sum of values across all group columns. fn partial_group_sum(&self, g: &ColGroup) -> io::Result; - /// Per-slot OR: true if any group column has value ≥ `threshold`. + /// Per-slot OR: 1 if any group column has value ≥ `threshold`. fn partial_group_any(&self, g: &ColGroup, threshold: u32) -> io::Result; + + /// Per-slot min value across all group columns (0 if group is empty). + fn partial_group_min(&self, g: &ColGroup) -> io::Result; + + /// Per-slot max value across all group columns (0 if group is empty). + fn partial_group_max(&self, g: &ColGroup) -> io::Result; + + /// Per-slot AND: 1 if ALL group columns have value ≥ `threshold`. + fn partial_group_all(&self, g: &ColGroup, threshold: u32) -> io::Result { + let counts = self.partial_group_presence_count(g, threshold)?; + let n = counts.len(); + let n_required = g.indices.len() as u32; + let mut b = TempBitVecBuilder::new(n)?; + for slot in 0..n { + if counts.get(slot) >= n_required { b.set(slot, true); } + } + b.freeze() + } + + /// Per-slot NOR: 1 if NO group column has value ≥ `threshold`. + fn partial_group_none(&self, g: &ColGroup, threshold: u32) -> io::Result { + let counts = self.partial_group_presence_count(g, threshold)?; + let n = counts.len(); + let mut b = TempBitVecBuilder::new(n)?; + for slot in 0..n { + if counts.get(slot) == 0 { b.set(slot, true); } + } + b.freeze() + } } diff --git a/src/obicompactvec/src/intmatrix.rs b/src/obicompactvec/src/intmatrix.rs index f3486d6..b2fa97e 100644 --- a/src/obicompactvec/src/intmatrix.rs +++ b/src/obicompactvec/src/intmatrix.rs @@ -386,6 +386,21 @@ impl PersistentCompactIntMatrixBuilder { self.n_cols += 1; PersistentCompactIntVecBuilder::new(self.n, &path) } + + pub fn add_col_from(&mut self, src: &TempCompactIntVec) -> io::Result<()> { + src.make_persistent(&col_path(&self.dir, self.n_cols))?; + self.n_cols += 1; + Ok(()) + } + + pub fn add_col_from_bit(&mut self, src: &TempBitVec) -> io::Result<()> { + let path = col_path(&self.dir, self.n_cols); + self.n_cols += 1; + let mut b = PersistentCompactIntVecBuilder::new(self.n, &path)?; + b.inc_present(src.view()); + b.close() + } + pub fn close(self) -> io::Result<()> { MatrixMeta { n: self.n, n_cols: self.n_cols }.save(&self.dir) } @@ -431,4 +446,21 @@ impl MatrixGroupOps for PersistentCompactIntMatrix { } result.freeze() } + + fn partial_group_min(&self, g: &ColGroup) -> io::Result { + let n = self.n(); + let mut result = TempCompactIntVecBuilder::new(n)?; + if let Some((&first, rest)) = g.indices.split_first() { + result.add(self.col_view(first)); + for &c in rest { result.min(self.col_view(c)); } + } + result.freeze() + } + + fn partial_group_max(&self, g: &ColGroup) -> io::Result { + let n = self.n(); + let mut result = TempCompactIntVecBuilder::new(n)?; + for &c in &g.indices { result.max(self.col_view(c)); } + result.freeze() + } } diff --git a/src/obikpartitionner/src/select_layer.rs b/src/obikpartitionner/src/select_layer.rs index 36286c0..c7f45e4 100644 --- a/src/obikpartitionner/src/select_layer.rs +++ b/src/obikpartitionner/src/select_layer.rs @@ -3,8 +3,9 @@ use std::io; use std::path::{Path, PathBuf}; use obicompactvec::{ - PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, - PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder, + ColGroup, MatrixGroupOps, + PersistentBitMatrix, PersistentBitMatrixBuilder, + PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, }; use obilayeredmap::meta::PartitionMeta; use obilayeredmap::OLMError; @@ -40,52 +41,6 @@ pub struct OutputCol { pub op: AggOp, } -// ── Aggregation ─────────────────────────────────────────────────────────────── - -#[inline] -fn aggregate(op: AggOp, indices: &[usize], src_row: &[u32], threshold: u32) -> u32 { - match op { - AggOp::Any => { - if indices.iter().any(|&i| src_row[i] > threshold) { 1 } else { 0 } - } - AggOp::All => { - if indices.is_empty() { return 0; } - if indices.iter().all(|&i| src_row[i] > threshold) { 1 } else { 0 } - } - AggOp::None => { - if indices.iter().all(|&i| src_row[i] <= threshold) { 1 } else { 0 } - } - AggOp::Sum => { - indices.iter().map(|&i| src_row[i]).fold(0u32, |a, b| a.saturating_add(b)) - } - AggOp::Min => indices.iter().map(|&i| src_row[i]).min().unwrap_or(0), - AggOp::Max => indices.iter().map(|&i| src_row[i]).max().unwrap_or(0), - } -} - -// ── ColBuilder ──────────────────────────────────────────────────────────────── - -enum ColBuilder { - Bit(PersistentBitVecBuilder), - Int(PersistentCompactIntVecBuilder), -} - -impl ColBuilder { - fn set_val(&mut self, slot: usize, value: u32) { - match self { - ColBuilder::Bit(b) => b.set(slot, value > 0), - ColBuilder::Int(b) => b.set(slot, value), - } - } - - fn close(self) -> SKResult<()> { - match self { - ColBuilder::Bit(b) => b.close().map_err(SKError::Io), - ColBuilder::Int(b) => b.close().map_err(SKError::Io), - } - } -} - // ── Helpers ─────────────────────────────────────────────────────────────────── fn olm_to_sk(e: OLMError) -> SKError { @@ -95,21 +50,6 @@ fn olm_to_sk(e: OLMError) -> SKError { } } -fn col_path_bit(dir: &Path, col: usize) -> PathBuf { - dir.join(format!("col_{col:06}.pbiv")) -} - -fn col_path_int(dir: &Path, col: usize) -> PathBuf { - dir.join(format!("col_{col:06}.pciv")) -} - -fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { - fs::write( - dir.join("meta.json"), - format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), - ) -} - /// Copy all plain files (not subdirectories) from `src_dir` to `dst_dir`. fn copy_layer_files(src_dir: &Path, dst_dir: &Path) -> io::Result<()> { for entry in fs::read_dir(src_dir)? { @@ -125,30 +65,64 @@ fn copy_layer_files(src_dir: &Path, dst_dir: &Path) -> io::Result<()> { // ── fill_builders ───────────────────────────────────────────────────────────── fn fill_builders( - builders: &mut [ColBuilder], specs: &[OutputCol], - n: usize, - n_src: usize, src_layer_dir: &Path, src_is_count: bool, threshold: u32, + output_presence: bool, + mut dst_bit: Option<&mut PersistentBitMatrixBuilder>, + mut dst_int: Option<&mut PersistentCompactIntMatrixBuilder>, ) -> SKResult<()> { - let mut src_buf = vec![0u32; n_src]; - if src_is_count { let mat = PersistentCompactIntMatrix::open(src_layer_dir).map_err(SKError::Io)?; - for slot in 0..n { - mat.fill_row(slot, &mut src_buf); - for (col, spec) in specs.iter().enumerate() { - builders[col].set_val(slot, aggregate(spec.op, &spec.indices, &src_buf, threshold)); + for spec in specs { + let g = ColGroup::new(&spec.label, spec.indices.clone()); + if output_presence { + let b = dst_bit.as_deref_mut().unwrap(); + match spec.op { + AggOp::Any => b.add_col_from (&mat.partial_group_any (&g, threshold).map_err(SKError::Io)?), + AggOp::All => b.add_col_from (&mat.partial_group_all (&g, threshold).map_err(SKError::Io)?), + AggOp::None => b.add_col_from (&mat.partial_group_none(&g, threshold).map_err(SKError::Io)?), + AggOp::Sum => b.add_col_from_int(&mat.partial_group_sum (&g).map_err(SKError::Io)?), + AggOp::Min => b.add_col_from_int(&mat.partial_group_min (&g).map_err(SKError::Io)?), + AggOp::Max => b.add_col_from_int(&mat.partial_group_max (&g).map_err(SKError::Io)?), + }.map_err(SKError::Io)?; + } else { + let b = dst_int.as_deref_mut().unwrap(); + match spec.op { + AggOp::Sum => b.add_col_from (&mat.partial_group_sum (&g).map_err(SKError::Io)?), + AggOp::Min => b.add_col_from (&mat.partial_group_min (&g).map_err(SKError::Io)?), + AggOp::Max => b.add_col_from (&mat.partial_group_max (&g).map_err(SKError::Io)?), + AggOp::Any => b.add_col_from_bit(&mat.partial_group_any (&g, threshold).map_err(SKError::Io)?), + AggOp::All => b.add_col_from_bit(&mat.partial_group_all (&g, threshold).map_err(SKError::Io)?), + AggOp::None => b.add_col_from_bit(&mat.partial_group_none(&g, threshold).map_err(SKError::Io)?), + }.map_err(SKError::Io)?; } } } else { let mat = PersistentBitMatrix::open(src_layer_dir).map_err(SKError::Io)?; - for slot in 0..n { - mat.fill_row(slot, &mut src_buf); - for (col, spec) in specs.iter().enumerate() { - builders[col].set_val(slot, aggregate(spec.op, &spec.indices, &src_buf, threshold)); + for spec in specs { + let g = ColGroup::new(&spec.label, spec.indices.clone()); + if output_presence { + let b = dst_bit.as_deref_mut().unwrap(); + match spec.op { + AggOp::Any => b.add_col_from (&mat.partial_group_any (&g, 1).map_err(SKError::Io)?), + AggOp::All => b.add_col_from (&mat.partial_group_all (&g, 1).map_err(SKError::Io)?), + AggOp::None => b.add_col_from (&mat.partial_group_none(&g, 1).map_err(SKError::Io)?), + AggOp::Sum => b.add_col_from_int(&mat.partial_group_sum (&g).map_err(SKError::Io)?), + AggOp::Min => b.add_col_from_int(&mat.partial_group_min (&g).map_err(SKError::Io)?), + AggOp::Max => b.add_col_from_int(&mat.partial_group_max (&g).map_err(SKError::Io)?), + }.map_err(SKError::Io)?; + } else { + let b = dst_int.as_deref_mut().unwrap(); + match spec.op { + AggOp::Sum => b.add_col_from (&mat.partial_group_sum (&g).map_err(SKError::Io)?), + AggOp::Min => b.add_col_from (&mat.partial_group_min (&g).map_err(SKError::Io)?), + AggOp::Max => b.add_col_from (&mat.partial_group_max (&g).map_err(SKError::Io)?), + AggOp::Any => b.add_col_from_bit(&mat.partial_group_any (&g, 1).map_err(SKError::Io)?), + AggOp::All => b.add_col_from_bit(&mat.partial_group_all (&g, 1).map_err(SKError::Io)?), + AggOp::None => b.add_col_from_bit(&mat.partial_group_none(&g, 1).map_err(SKError::Io)?), + }.map_err(SKError::Io)?; } } } @@ -168,7 +142,7 @@ impl KmerPartition { src: &KmerPartition, i: usize, specs: &[OutputCol], - n_src_genomes: usize, + _n_src_genomes: usize, threshold: u32, output_presence: bool, in_place: bool, @@ -188,7 +162,6 @@ impl KmerPartition { fs::create_dir_all(&dst_index_dir)?; } - let n_out = specs.len(); let data_subdir = if output_presence { "presence" } else { "counts" }; for l in 0..src_meta.n_layers { @@ -201,7 +174,7 @@ impl KmerPartition { let presence_dir = src_layer_dir.join("presence"); let src_is_count = counts_dir.exists() && !presence_dir.exists(); - // Determine number of slots from the source matrix. + // Determine number of slots and detect implicit layers. let n = if counts_dir.exists() { PersistentCompactIntMatrix::open(&src_layer_dir).map_err(SKError::Io)?.n() } else if presence_dir.exists() { @@ -216,7 +189,7 @@ impl KmerPartition { }; // Choose the output data directory (temp name for in-place). - let (dst_data_dir, final_data_dir) = if in_place { + let (dst_data_dir, final_data_dir): (PathBuf, PathBuf) = if in_place { let tmp = dst_layer_dir.join(format!("{data_subdir}_new")); let perm = dst_layer_dir.join(data_subdir); (tmp, perm) @@ -231,37 +204,22 @@ impl KmerPartition { } fs::create_dir_all(&dst_data_dir)?; - // Initialise packed-format skeleton. - if output_presence { - PersistentBitMatrixBuilder::new(n, &dst_data_dir) - .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + let (mut dst_bit, mut dst_int) = if output_presence { + (Some(PersistentBitMatrixBuilder::new(n, &dst_data_dir).map_err(SKError::Io)?), None) } else { - PersistentCompactIntMatrixBuilder::new(n, &dst_data_dir) - .map_err(SKError::Io)?.close().map_err(SKError::Io)?; - } - - // Create column builders. - let mut builders: Vec = (0..n_out) - .map(|col| -> SKResult { - if output_presence { - Ok(ColBuilder::Bit(PersistentBitVecBuilder::new( - n, &col_path_bit(&dst_data_dir, col), - )?)) - } else { - Ok(ColBuilder::Int(PersistentCompactIntVecBuilder::new( - n, &col_path_int(&dst_data_dir, col), - )?)) - } - }) - .collect::>()?; + (None, Some(PersistentCompactIntMatrixBuilder::new(n, &dst_data_dir).map_err(SKError::Io)?)) + }; fill_builders( - &mut builders, specs, n, n_src_genomes, - &src_layer_dir, src_is_count, threshold, + specs, &src_layer_dir, src_is_count, threshold, output_presence, + dst_bit.as_mut(), dst_int.as_mut(), )?; - for b in builders { b.close()?; } - write_matrix_meta(&dst_data_dir, n, n_out).map_err(SKError::Io)?; + if output_presence { + dst_bit.unwrap().close().map_err(SKError::Io)?; + } else { + dst_int.unwrap().close().map_err(SKError::Io)?; + } // In-place: swap old data dir for new. if in_place {