diff --git a/src/obicompactvec/src/bitvec.rs b/src/obicompactvec/src/bitvec.rs index 8cde36b..966d57f 100644 --- a/src/obicompactvec/src/bitvec.rs +++ b/src/obicompactvec/src/bitvec.rs @@ -5,7 +5,7 @@ use std::path::{Path, PathBuf}; use memmap2::{Mmap, MmapMut}; use crate::reader::PersistentCompactIntVec; -use crate::views::{BitSliceView, BitSliceIter}; +use crate::views::{BitSliceIter, BitSliceView, IntSliceView}; const MAGIC: [u8; 4] = *b"PBIV"; @@ -241,6 +241,72 @@ impl PersistentBitVecBuilder { } } + /// OR in bits at slots where `pred(col[slot])` is true. + pub fn or_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) { + assert_eq!(self.n, col.len(), "IntSliceView length mismatch"); + let n = self.n; + let primary = col.primary_bytes(); + let words = self.data_words_mut(); + let nw = n_words(n); + for wi in 0..nw { + let base = wi * 64; + let limit = (base + 64).min(n); + let mut mask = 0u64; + for bit in 0..(limit - base) { + let b = primary[base + bit]; + if b < 255 && pred(b as u32) { mask |= 1u64 << bit; } + } + words[wi] |= mask; + } + for (slot, val) in col.overflow_entries() { + if pred(val) { words[slot >> 6] |= 1u64 << (slot & 63); } + } + } + + /// Clear bits at slots where `pred(col[slot])` is false. + pub fn and_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) { + assert_eq!(self.n, col.len(), "IntSliceView length mismatch"); + let n = self.n; + let primary = col.primary_bytes(); + let words = self.data_words_mut(); + let nw = n_words(n); + for wi in 0..nw { + let base = wi * 64; + let limit = (base + 64).min(n); + let mut mask = 0u64; + for bit in 0..(limit - base) { + let b = primary[base + bit]; + if b < 255 && !pred(b as u32) { mask |= 1u64 << bit; } + } + words[wi] &= !mask; + } + for (slot, val) in col.overflow_entries() { + if !pred(val) { words[slot >> 6] &= !(1u64 << (slot & 63)); } + } + } + + /// Toggle bits at slots where `pred(col[slot])` is true. + pub fn xor_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) { + assert_eq!(self.n, col.len(), "IntSliceView length mismatch"); + let n = self.n; + let primary = col.primary_bytes(); + let words = self.data_words_mut(); + let nw = n_words(n); + for wi in 0..nw { + let base = wi * 64; + let limit = (base + 64).min(n); + let mut mask = 0u64; + for bit in 0..(limit - base) { + let b = primary[base + bit]; + if b < 255 && pred(b as u32) { mask |= 1u64 << bit; } + } + words[wi] ^= mask; + } + for (slot, val) in col.overflow_entries() { + if pred(val) { words[slot >> 6] ^= 1u64 << (slot & 63); } + } + } + pub fn iter(&self) -> BitSliceIter<'_> { self.view().iter() } diff --git a/src/obicompactvec/src/colgroup.rs b/src/obicompactvec/src/colgroup.rs index fa70830..b8c5ec8 100644 --- a/src/obicompactvec/src/colgroup.rs +++ b/src/obicompactvec/src/colgroup.rs @@ -70,3 +70,73 @@ pub trait MatrixGroupOps { b.freeze() } } + +// ── FilterMask — expression tree for column-based slot filters ──────────────── + +/// A composable filter expression that can be evaluated against a matrix +/// using only column operations (no MPHF lookup per kmer). +/// +/// `threshold` semantics follow [`MatrixGroupOps::partial_group_presence_count`]: +/// a slot contributes to the count when its value is **≥ threshold**. +/// To match the row-level filter (`value > t`), callers should pass `t + 1`. +#[derive(Debug, Clone)] +pub enum FilterMask { + /// Slot passes if count of columns in `indices` with value ≥ `threshold` is ≥ `min_count`. + PresenceGeq { indices: Vec, threshold: u32, min_count: usize }, + /// Slot passes if count of columns in `indices` with value ≥ `threshold` is ≤ `max_count`. + PresenceLeq { indices: Vec, threshold: u32, max_count: usize }, + /// Slot passes if sum of values across `indices` columns is ≥ `min_sum`. + SumGeq { indices: Vec, min_sum: u32 }, + /// Slot passes if sum of values across `indices` columns is ≤ `max_sum`. + SumLeq { indices: Vec, max_sum: u32 }, + /// Slot passes if it passes all sub-expressions. Empty `And` is always true. + And(Vec), +} + +/// Evaluate a [`FilterMask`] against `mat`, returning a per-slot `TempBitVec` +/// where bit=1 means the slot passes the filter. +pub fn eval_filter_mask(expr: &FilterMask, mat: &dyn MatrixGroupOps, n: usize) -> io::Result { + match expr { + FilterMask::PresenceGeq { indices, threshold, min_count } => { + let g = ColGroup::new("", indices.clone()); + let counts = mat.partial_group_presence_count(&g, *threshold)?; + let mut b = TempBitVecBuilder::new(n)?; + let mc = *min_count as u32; + b.or_where(counts.view(), |v| v >= mc); + b.freeze() + } + FilterMask::PresenceLeq { indices, threshold, max_count } => { + let g = ColGroup::new("", indices.clone()); + let counts = mat.partial_group_presence_count(&g, *threshold)?; + let mut b = TempBitVecBuilder::new(n)?; + let mc = *max_count as u32; + b.or_where(counts.view(), |v| v <= mc); + b.freeze() + } + FilterMask::SumGeq { indices, min_sum } => { + let g = ColGroup::new("", indices.clone()); + let sums = mat.partial_group_sum(&g)?; + let mut b = TempBitVecBuilder::new(n)?; + let ms = *min_sum; + b.or_where(sums.view(), |v| v >= ms); + b.freeze() + } + FilterMask::SumLeq { indices, max_sum } => { + let g = ColGroup::new("", indices.clone()); + let sums = mat.partial_group_sum(&g)?; + let mut b = TempBitVecBuilder::new(n)?; + let ms = *max_sum; + b.or_where(sums.view(), |v| v <= ms); + b.freeze() + } + FilterMask::And(parts) => { + let mut b = TempBitVecBuilder::new(n)?; + b.not(); // initialise à tout-1 (tout passe) + for part in parts { + let m = eval_filter_mask(part, mat, n)?; + b.and(m.view()); + } + b.freeze() + } + } +} diff --git a/src/obicompactvec/src/lib.rs b/src/obicompactvec/src/lib.rs index ddd3bdc..25a8032 100644 --- a/src/obicompactvec/src/lib.rs +++ b/src/obicompactvec/src/lib.rs @@ -15,7 +15,7 @@ pub mod traits; pub use bitvec::{BitIter, PersistentBitVec, PersistentBitVecBuilder}; pub use bitmatrix::{PersistentBitMatrix, PersistentBitMatrixBuilder, pack_bit_matrix}; pub use builder::PersistentCompactIntVecBuilder; -pub use colgroup::{ColGroup, MatrixGroupOps}; +pub use colgroup::{ColGroup, FilterMask, MatrixGroupOps, eval_filter_mask}; pub use intmatrix::{PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, pack_compact_int_matrix}; pub use layer_meta::LayerMeta; pub use reader::PersistentCompactIntVec; diff --git a/src/obicompactvec/src/tempbitvec.rs b/src/obicompactvec/src/tempbitvec.rs index 3024ffb..b8991df 100644 --- a/src/obicompactvec/src/tempbitvec.rs +++ b/src/obicompactvec/src/tempbitvec.rs @@ -73,18 +73,31 @@ impl TempBitVecBuilder { self.builder.or(other); } - /// Set self[slot] where pred(col[slot]) is true. Two-pass: primary then overflow. + pub(crate) fn and(&mut self, other: BitSliceView<'_>) { + self.builder.and(other); + } + + pub(crate) fn xor(&mut self, other: BitSliceView<'_>) { + self.builder.xor(other); + } + + pub(crate) fn not(&mut self) { + self.builder.not(); + } + + pub(crate) fn copy_from(&mut self, src: BitSliceView<'_>) { + self.builder.copy_from(src); + } + pub fn or_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) { - for slot in 0..col.len() { - let b = col.primary_bytes()[slot]; - if b < 255 && pred(b as u32) { - self.builder.set(slot, true); - } - } - for (slot, val) in col.overflow_entries() { - if pred(val) { - self.builder.set(slot, true); - } - } + self.builder.or_where(col, pred); + } + + pub(crate) fn and_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) { + self.builder.and_where(col, pred); + } + + pub(crate) fn xor_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) { + self.builder.xor_where(col, pred); } } diff --git a/src/obikpartitionner/src/filter.rs b/src/obikpartitionner/src/filter.rs index d5c6346..00f3b03 100644 --- a/src/obikpartitionner/src/filter.rs +++ b/src/obikpartitionner/src/filter.rs @@ -1,9 +1,24 @@ +use obicompactvec::FilterMask; + /// Trait for kmer row filters. /// /// `row` contains raw per-genome counts (or 0/1 for presence/absence data). /// `n_genomes` equals `row.len()`. pub trait KmerFilter: Send + Sync { fn passes(&self, row: &[u32], n_genomes: usize) -> bool; + + /// Express this filter as a [`FilterMask`] column-operation expression. + /// + /// Returns `Some(expr)` if the filter can be evaluated solely from matrix + /// column aggregates (no per-kmer row scan needed). Returns `None` if the + /// filter requires row-level inspection. + /// + /// `threshold` semantics in the returned mask use `>= threshold`, matching + /// [`obicompactvec::MatrixGroupOps`]. Implementations must add 1 to any + /// row-level threshold that uses strict `>` comparison. + fn column_mask_expr(&self, _n_genomes: usize) -> Option { + None + } } /// True when `row` passes every filter in `filters`. @@ -29,6 +44,16 @@ impl KmerFilter for MinGenomeFraction { let p = present_count(row, self.threshold); p as f64 / n_genomes as f64 >= self.frac } + + fn column_mask_expr(&self, n_genomes: usize) -> Option { + let t = self.threshold.checked_add(1)?; + let min_count = (self.frac * n_genomes as f64).ceil() as usize; + Some(FilterMask::PresenceGeq { + indices: (0..n_genomes).collect(), + threshold: t, + min_count, + }) + } } /// At most `frac` fraction of genomes contain this kmer (count > `threshold`). @@ -42,6 +67,16 @@ impl KmerFilter for MaxGenomeFraction { let p = present_count(row, self.threshold); p as f64 / n_genomes as f64 <= self.frac } + + fn column_mask_expr(&self, n_genomes: usize) -> Option { + let t = self.threshold.checked_add(1)?; + let max_count = (self.frac * n_genomes as f64).floor() as usize; + Some(FilterMask::PresenceLeq { + indices: (0..n_genomes).collect(), + threshold: t, + max_count, + }) + } } /// At least `count` genomes contain this kmer (count > `threshold`). @@ -54,6 +89,15 @@ impl KmerFilter for MinGenomeCount { fn passes(&self, row: &[u32], _n_genomes: usize) -> bool { present_count(row, self.threshold) >= self.count } + + fn column_mask_expr(&self, n_genomes: usize) -> Option { + let t = self.threshold.checked_add(1)?; + Some(FilterMask::PresenceGeq { + indices: (0..n_genomes).collect(), + threshold: t, + min_count: self.count, + }) + } } /// At most `count` genomes contain this kmer (count > `threshold`). @@ -66,6 +110,15 @@ impl KmerFilter for MaxGenomeCount { fn passes(&self, row: &[u32], _n_genomes: usize) -> bool { present_count(row, self.threshold) <= self.count } + + fn column_mask_expr(&self, n_genomes: usize) -> Option { + let t = self.threshold.checked_add(1)?; + Some(FilterMask::PresenceLeq { + indices: (0..n_genomes).collect(), + threshold: t, + max_count: self.count, + }) + } } // ── Total-count filters (count indexes only) ─────────────────────────────────── @@ -79,6 +132,13 @@ impl KmerFilter for MinTotalCount { fn passes(&self, row: &[u32], _n_genomes: usize) -> bool { row.iter().sum::() >= self.total } + + fn column_mask_expr(&self, n_genomes: usize) -> Option { + Some(FilterMask::SumGeq { + indices: (0..n_genomes).collect(), + min_sum: self.total, + }) + } } /// Sum of counts across all genomes <= `total`. @@ -90,6 +150,13 @@ impl KmerFilter for MaxTotalCount { fn passes(&self, row: &[u32], _n_genomes: usize) -> bool { row.iter().sum::() <= self.total } + + fn column_mask_expr(&self, n_genomes: usize) -> Option { + Some(FilterMask::SumLeq { + indices: (0..n_genomes).collect(), + max_sum: self.total, + }) + } } // ── Group-based quorum filter ───────────────────────────────────────────────── @@ -113,6 +180,37 @@ pub struct GroupQuorumFilter { pub max_outgroup_frac: f64, } +impl GroupQuorumFilter { + // Build PresenceGeq/PresenceLeq constraints for one group (ingroup or outgroup). + fn group_mask_parts( + indices: &[usize], + threshold: u32, + min_count: usize, + max_count: usize, + min_frac: f64, + max_frac: f64, + parts: &mut Vec, + ) { + let n = indices.len(); + let geq = min_count.max((min_frac * n as f64).ceil() as usize); + if geq > 0 { + parts.push(FilterMask::PresenceGeq { + indices: indices.to_vec(), + threshold, + min_count: geq, + }); + } + let leq = max_count.min((max_frac * n as f64).floor() as usize); + if leq < n { + parts.push(FilterMask::PresenceLeq { + indices: indices.to_vec(), + threshold, + max_count: leq, + }); + } + } +} + impl KmerFilter for GroupQuorumFilter { fn passes(&self, row: &[u32], _n_genomes: usize) -> bool { if !self.ingroup_idx.is_empty() { @@ -139,4 +237,26 @@ impl KmerFilter for GroupQuorumFilter { } true } + + fn column_mask_expr(&self, _n_genomes: usize) -> Option { + let t = self.threshold.checked_add(1)?; + let mut parts: Vec = Vec::new(); + if !self.ingroup_idx.is_empty() { + Self::group_mask_parts( + &self.ingroup_idx, t, + self.min_count, self.max_count, + self.min_frac, self.max_frac, + &mut parts, + ); + } + if !self.outgroup_idx.is_empty() { + Self::group_mask_parts( + &self.outgroup_idx, t, + self.min_outgroup_count, self.max_outgroup_count, + self.min_outgroup_frac, self.max_outgroup_frac, + &mut parts, + ); + } + Some(FilterMask::And(parts)) + } } diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 0701b6d..32750af 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -10,6 +10,7 @@ use obipipeline::{ }; use obicompactvec::{ + MatrixGroupOps, PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder, }; @@ -78,6 +79,41 @@ impl SrcLayerData { } buf } + + pub(crate) fn n_slots(&self) -> usize { + match self { + SrcLayerData::Presence(_, mat) => mat.n(), + SrcLayerData::Count(_, mat) => mat.n(), + } + } + + /// MPHF lookup: returns the slot index for `kmer` (kmer must be in the domain). + #[inline] + pub(crate) fn slot(&self, kmer: CanonicalKmer) -> usize { + match self { + SrcLayerData::Presence(mphf, _) => mphf.index(kmer), + SrcLayerData::Count(mphf, _) => mphf.index(kmer), + } + } + + /// Row lookup by slot index, bypassing the MPHF. + #[inline] + pub(crate) fn fill_row_by_slot(&self, slot: usize, n_genomes: usize) -> Vec { + let mut buf = vec![0u32; n_genomes]; + match self { + SrcLayerData::Presence(_, mat) => mat.fill_row(slot, &mut buf), + SrcLayerData::Count(_, mat) => mat.fill_row(slot, &mut buf), + } + buf + } + + /// Call `f` with a reference to the underlying matrix as `&dyn MatrixGroupOps`. + pub(crate) fn with_matrix(&self, f: impl FnOnce(&dyn MatrixGroupOps) -> R) -> R { + match self { + SrcLayerData::Presence(_, mat) => f(mat), + SrcLayerData::Count(_, mat) => f(mat), + } + } } // ── helpers ─────────────────────────────────────────────────────────────────── diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index 6bd40f3..b8893ef 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -1,8 +1,9 @@ use std::path::Path; use obicompactvec::{ - PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder, - PersistentCompactIntVecBuilder, + FilterMask, eval_filter_mask, + PersistentBitMatrixBuilder, PersistentBitVecBuilder, + PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder, }; use obidebruinj::GraphDeBruijn; use obikseq::CanonicalKmer; @@ -10,18 +11,135 @@ use obilayeredmap::meta::PartitionMeta; use obilayeredmap::{IndexMode, MphfLayer}; use obiskio::{SKError, SKResult, UnitigFileReader}; -use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta}; -use crate::filter::{KmerFilter, passes_all}; +use crate::common::{load_meta, olm_to_sk}; +use crate::filter::KmerFilter; use crate::graph_pipeline::materialize_layer; use crate::merge_layer::{MergeMode, SrcLayerData}; use crate::partition::KmerPartition; const INDEX_SUBDIR: &str = "index"; -/// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`. +// ── Builders — pair matrix builder + column builders for one mode ───────────── + +enum Builders { + Presence(PersistentBitMatrixBuilder, Vec), + Count(PersistentCompactIntMatrixBuilder, Vec), +} + +impl Builders { + fn new(mode: MergeMode, n: usize, dir: &Path, n_genomes: usize) -> SKResult { + match mode { + MergeMode::Presence => { + let mut mat = PersistentBitMatrixBuilder::new(n, dir).map_err(SKError::Io)?; + let mut cols = Vec::with_capacity(n_genomes); + for _ in 0..n_genomes { cols.push(mat.add_col().map_err(SKError::Io)?); } + Ok(Builders::Presence(mat, cols)) + } + MergeMode::Count => { + let mut mat = PersistentCompactIntMatrixBuilder::new(n, dir).map_err(SKError::Io)?; + let mut cols = Vec::with_capacity(n_genomes); + for _ in 0..n_genomes { cols.push(mat.add_col().map_err(SKError::Io)?); } + Ok(Builders::Count(mat, cols)) + } + } + } + + fn set_val(&mut self, col: usize, slot: usize, value: u32) { + match self { + Builders::Presence(_, cols) => cols[col].set(slot, value > 0), + Builders::Count(_, cols) => cols[col].set(slot, value), + } + } + + fn close(self) -> SKResult<()> { + match self { + Builders::Presence(mat, cols) => { + for b in cols { b.close().map_err(SKError::Io)?; } + mat.close().map_err(SKError::Io) + } + Builders::Count(mat, cols) => { + for b in cols { b.close().map_err(SKError::Io)?; } + mat.close().map_err(SKError::Io) + } + } + } +} + +// ── try_compute_combined_mask ───────────────────────────────────────────────── + +/// Build a per-slot `TempBitVec` mask from `filters` using column operations +/// on the source matrix — no per-kmer MPHF lookup or row read needed. /// -/// Uses [`SrcLayerData`] semantics: counts take priority over presence when -/// `mode = Count`; presence (or implicit all-ones) is used for `Presence`. +/// Returns `Some(mask)` when every filter in `filters` can express itself as +/// a [`FilterMask`] expression. Returns `None` when any filter requires +/// row-level inspection (fall back to `passes_all`). +fn try_compute_combined_mask( + filters: &[Box], + src_data: &SrcLayerData, + n_genomes: usize, +) -> SKResult> { + if filters.is_empty() { + return Ok(None); + } + let mut exprs: Vec = Vec::with_capacity(filters.len()); + for f in filters { + match f.column_mask_expr(n_genomes) { + Some(expr) => exprs.push(expr), + None => return Ok(None), + } + } + let combined = FilterMask::And(exprs); + let n = src_data.n_slots(); + let mask = src_data + .with_matrix(|mat| eval_filter_mask(&combined, mat, n)) + .map_err(SKError::Io)?; + Ok(Some(mask)) +} + +// ── iter_src_kmers_masked (pass 1) ──────────────────────────────────────────── + +/// Iterate all passing kmers in `src_index_dir`, yielding only the kmer value. +/// +/// When all filters can be expressed as column operations, a per-slot mask is +/// computed once per layer and used for O(1) slot-check per kmer instead of a +/// full row read. Falls back to row-level `passes_all` otherwise. +fn iter_src_kmers_masked( + src_index_dir: &Path, + mode: MergeMode, + n_genomes: usize, + filters: &[Box], + mut cb: impl FnMut(CanonicalKmer), +) -> SKResult<()> { + let src_meta = load_meta(src_index_dir, "rebuild")?; + for l in 0..src_meta.n_layers { + let src_layer_dir = src_index_dir.join(format!("layer_{l}")); + let unitigs_path = src_layer_dir.join("unitigs.bin"); + if !unitigs_path.exists() { continue; } + + let src_data = SrcLayerData::open(&src_layer_dir, mode)?; + let mask = try_compute_combined_mask(filters, &src_data, n_genomes)?; + let reader = UnitigFileReader::open_sequential(&unitigs_path)?; + + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + let slot = src_data.slot(kmer); + let passes = match &mask { + Some(m) => m.get(slot), + None => { + let row = src_data.fill_row_by_slot(slot, n_genomes); + filters.iter().all(|f| f.passes(&row, n_genomes)) + } + }; + if passes { cb(kmer); } + } + } + Ok(()) +} + +// ── iter_src_layers (pass 2) ────────────────────────────────────────────────── + +/// Iterate all passing kmers in `src_index_dir`, yielding `(kmer, row)`. +/// +/// When the slot mask is available, skips the row read for filtered-out slots. fn iter_src_layers( src_index_dir: &Path, mode: MergeMode, @@ -33,17 +151,23 @@ fn iter_src_layers( for l in 0..src_meta.n_layers { let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let unitigs_path = src_layer_dir.join("unitigs.bin"); - if !unitigs_path.exists() { - continue; - } + if !unitigs_path.exists() { continue; } - let reader = UnitigFileReader::open_sequential(&unitigs_path)?; let src_data = SrcLayerData::open(&src_layer_dir, mode)?; + let mask = try_compute_combined_mask(filters, &src_data, n_genomes)?; + let reader = UnitigFileReader::open_sequential(&unitigs_path)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - let row = src_data.lookup(kmer, n_genomes); - if passes_all(filters, &row, n_genomes) { + let slot = src_data.slot(kmer); + if let Some(ref m) = mask { + if !m.get(slot) { continue; } + let row = src_data.fill_row_by_slot(slot, n_genomes); cb(kmer, row.into_boxed_slice()); + } else { + let row = src_data.fill_row_by_slot(slot, n_genomes); + if filters.iter().all(|f| f.passes(&row, n_genomes)) { + cb(kmer, row.into_boxed_slice()); + } } } } @@ -81,7 +205,7 @@ impl KmerPartition { // ── Pass 1: collect filtered kmers into de Bruijn graph ─────────────── let mut g = GraphDeBruijn::new(); - iter_src_layers(&src_index_dir, mode, n_genomes, filters, |kmer, _row| { + iter_src_kmers_masked(&src_index_dir, mode, n_genomes, filters, |kmer| { g.push(kmer); })?; @@ -100,54 +224,22 @@ impl KmerPartition { // ── Prepare matrix builders (one column per genome) ─────────────────── let data_dir = match mode { MergeMode::Presence => dst_layer_dir.join("presence"), - MergeMode::Count => dst_layer_dir.join("counts"), + MergeMode::Count => dst_layer_dir.join("counts"), }; std::fs::create_dir_all(&data_dir)?; - - let mut builders: Vec = match mode { - MergeMode::Presence => { - PersistentBitMatrixBuilder::new(n_new, &data_dir) - .map_err(SKError::Io)? - .close() - .map_err(SKError::Io)?; - (0..n_genomes) - .map(|g| -> SKResult { - let b = PersistentBitVecBuilder::new(n_new, &col_path_bit(&data_dir, g))?; - Ok(ColBuilder::Bit(b)) - }) - .collect::>()? - } - MergeMode::Count => { - PersistentCompactIntMatrixBuilder::new(n_new, &data_dir) - .map_err(SKError::Io)? - .close() - .map_err(SKError::Io)?; - (0..n_genomes) - .map(|g| -> SKResult { - let b = PersistentCompactIntVecBuilder::new( - n_new, - &col_path_int(&data_dir, g), - )?; - Ok(ColBuilder::Int(b)) - }) - .collect::>()? - } - }; + let mut builders = Builders::new(mode, n_new, &data_dir, n_genomes)?; // ── Pass 2: fill builders ───────────────────────────────────────────── iter_src_layers(&src_index_dir, mode, n_genomes, filters, |kmer, row| { if let Some(slot) = dst_mphf.find(kmer) { for (col, &value) in row.iter().enumerate() { - builders[col].set_val(slot, value); + builders.set_val(col, slot, value); } } })?; - // ── Close builders, write metadata ──────────────────────────────────── - for b in builders { - b.close()?; - } - write_matrix_meta(&data_dir, n_new, n_genomes).map_err(SKError::Io)?; + // ── Close builders and write metadata ───────────────────────────────── + builders.close()?; PartitionMeta { n_layers: 1,