feat: add vectorized column filters and optimize partitioner iteration
Adds `FilterMask` and conditional bitwise methods (`*_where`) to `obicompactvec` for composable column-based slot filtering. Extends `obikpartitionner` with a `MatrixGroupOps` trait and `column_mask_expr` method to express aggregate constraints as vectorized masks. Refactors matrix builder management into a unified `Builders` enum and introduces `try_compute_combined_mask`, enabling O(1) slot checks and skipping unnecessary row reads during partitioning and rebuilding passes.
This commit is contained in:
@@ -5,7 +5,7 @@ use std::path::{Path, PathBuf};
|
||||
use memmap2::{Mmap, MmapMut};
|
||||
|
||||
use crate::reader::PersistentCompactIntVec;
|
||||
use crate::views::{BitSliceView, BitSliceIter};
|
||||
use crate::views::{BitSliceIter, BitSliceView, IntSliceView};
|
||||
|
||||
const MAGIC: [u8; 4] = *b"PBIV";
|
||||
|
||||
@@ -241,6 +241,72 @@ impl PersistentBitVecBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// OR in bits at slots where `pred(col[slot])` is true.
|
||||
pub fn or_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) {
|
||||
assert_eq!(self.n, col.len(), "IntSliceView length mismatch");
|
||||
let n = self.n;
|
||||
let primary = col.primary_bytes();
|
||||
let words = self.data_words_mut();
|
||||
let nw = n_words(n);
|
||||
for wi in 0..nw {
|
||||
let base = wi * 64;
|
||||
let limit = (base + 64).min(n);
|
||||
let mut mask = 0u64;
|
||||
for bit in 0..(limit - base) {
|
||||
let b = primary[base + bit];
|
||||
if b < 255 && pred(b as u32) { mask |= 1u64 << bit; }
|
||||
}
|
||||
words[wi] |= mask;
|
||||
}
|
||||
for (slot, val) in col.overflow_entries() {
|
||||
if pred(val) { words[slot >> 6] |= 1u64 << (slot & 63); }
|
||||
}
|
||||
}
|
||||
|
||||
/// Clear bits at slots where `pred(col[slot])` is false.
|
||||
pub fn and_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) {
|
||||
assert_eq!(self.n, col.len(), "IntSliceView length mismatch");
|
||||
let n = self.n;
|
||||
let primary = col.primary_bytes();
|
||||
let words = self.data_words_mut();
|
||||
let nw = n_words(n);
|
||||
for wi in 0..nw {
|
||||
let base = wi * 64;
|
||||
let limit = (base + 64).min(n);
|
||||
let mut mask = 0u64;
|
||||
for bit in 0..(limit - base) {
|
||||
let b = primary[base + bit];
|
||||
if b < 255 && !pred(b as u32) { mask |= 1u64 << bit; }
|
||||
}
|
||||
words[wi] &= !mask;
|
||||
}
|
||||
for (slot, val) in col.overflow_entries() {
|
||||
if !pred(val) { words[slot >> 6] &= !(1u64 << (slot & 63)); }
|
||||
}
|
||||
}
|
||||
|
||||
/// Toggle bits at slots where `pred(col[slot])` is true.
|
||||
pub fn xor_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) {
|
||||
assert_eq!(self.n, col.len(), "IntSliceView length mismatch");
|
||||
let n = self.n;
|
||||
let primary = col.primary_bytes();
|
||||
let words = self.data_words_mut();
|
||||
let nw = n_words(n);
|
||||
for wi in 0..nw {
|
||||
let base = wi * 64;
|
||||
let limit = (base + 64).min(n);
|
||||
let mut mask = 0u64;
|
||||
for bit in 0..(limit - base) {
|
||||
let b = primary[base + bit];
|
||||
if b < 255 && pred(b as u32) { mask |= 1u64 << bit; }
|
||||
}
|
||||
words[wi] ^= mask;
|
||||
}
|
||||
for (slot, val) in col.overflow_entries() {
|
||||
if pred(val) { words[slot >> 6] ^= 1u64 << (slot & 63); }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> BitSliceIter<'_> {
|
||||
self.view().iter()
|
||||
}
|
||||
|
||||
@@ -70,3 +70,73 @@ pub trait MatrixGroupOps {
|
||||
b.freeze()
|
||||
}
|
||||
}
|
||||
|
||||
// ── FilterMask — expression tree for column-based slot filters ────────────────
|
||||
|
||||
/// A composable filter expression that can be evaluated against a matrix
|
||||
/// using only column operations (no MPHF lookup per kmer).
|
||||
///
|
||||
/// `threshold` semantics follow [`MatrixGroupOps::partial_group_presence_count`]:
|
||||
/// a slot contributes to the count when its value is **≥ threshold**.
|
||||
/// To match the row-level filter (`value > t`), callers should pass `t + 1`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum FilterMask {
|
||||
/// Slot passes if count of columns in `indices` with value ≥ `threshold` is ≥ `min_count`.
|
||||
PresenceGeq { indices: Vec<usize>, threshold: u32, min_count: usize },
|
||||
/// Slot passes if count of columns in `indices` with value ≥ `threshold` is ≤ `max_count`.
|
||||
PresenceLeq { indices: Vec<usize>, threshold: u32, max_count: usize },
|
||||
/// Slot passes if sum of values across `indices` columns is ≥ `min_sum`.
|
||||
SumGeq { indices: Vec<usize>, min_sum: u32 },
|
||||
/// Slot passes if sum of values across `indices` columns is ≤ `max_sum`.
|
||||
SumLeq { indices: Vec<usize>, max_sum: u32 },
|
||||
/// Slot passes if it passes all sub-expressions. Empty `And` is always true.
|
||||
And(Vec<FilterMask>),
|
||||
}
|
||||
|
||||
/// Evaluate a [`FilterMask`] against `mat`, returning a per-slot `TempBitVec`
|
||||
/// where bit=1 means the slot passes the filter.
|
||||
pub fn eval_filter_mask(expr: &FilterMask, mat: &dyn MatrixGroupOps, n: usize) -> io::Result<TempBitVec> {
|
||||
match expr {
|
||||
FilterMask::PresenceGeq { indices, threshold, min_count } => {
|
||||
let g = ColGroup::new("", indices.clone());
|
||||
let counts = mat.partial_group_presence_count(&g, *threshold)?;
|
||||
let mut b = TempBitVecBuilder::new(n)?;
|
||||
let mc = *min_count as u32;
|
||||
b.or_where(counts.view(), |v| v >= mc);
|
||||
b.freeze()
|
||||
}
|
||||
FilterMask::PresenceLeq { indices, threshold, max_count } => {
|
||||
let g = ColGroup::new("", indices.clone());
|
||||
let counts = mat.partial_group_presence_count(&g, *threshold)?;
|
||||
let mut b = TempBitVecBuilder::new(n)?;
|
||||
let mc = *max_count as u32;
|
||||
b.or_where(counts.view(), |v| v <= mc);
|
||||
b.freeze()
|
||||
}
|
||||
FilterMask::SumGeq { indices, min_sum } => {
|
||||
let g = ColGroup::new("", indices.clone());
|
||||
let sums = mat.partial_group_sum(&g)?;
|
||||
let mut b = TempBitVecBuilder::new(n)?;
|
||||
let ms = *min_sum;
|
||||
b.or_where(sums.view(), |v| v >= ms);
|
||||
b.freeze()
|
||||
}
|
||||
FilterMask::SumLeq { indices, max_sum } => {
|
||||
let g = ColGroup::new("", indices.clone());
|
||||
let sums = mat.partial_group_sum(&g)?;
|
||||
let mut b = TempBitVecBuilder::new(n)?;
|
||||
let ms = *max_sum;
|
||||
b.or_where(sums.view(), |v| v <= ms);
|
||||
b.freeze()
|
||||
}
|
||||
FilterMask::And(parts) => {
|
||||
let mut b = TempBitVecBuilder::new(n)?;
|
||||
b.not(); // initialise à tout-1 (tout passe)
|
||||
for part in parts {
|
||||
let m = eval_filter_mask(part, mat, n)?;
|
||||
b.and(m.view());
|
||||
}
|
||||
b.freeze()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ pub mod traits;
|
||||
pub use bitvec::{BitIter, PersistentBitVec, PersistentBitVecBuilder};
|
||||
pub use bitmatrix::{PersistentBitMatrix, PersistentBitMatrixBuilder, pack_bit_matrix};
|
||||
pub use builder::PersistentCompactIntVecBuilder;
|
||||
pub use colgroup::{ColGroup, MatrixGroupOps};
|
||||
pub use colgroup::{ColGroup, FilterMask, MatrixGroupOps, eval_filter_mask};
|
||||
pub use intmatrix::{PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, pack_compact_int_matrix};
|
||||
pub use layer_meta::LayerMeta;
|
||||
pub use reader::PersistentCompactIntVec;
|
||||
|
||||
@@ -73,18 +73,31 @@ impl TempBitVecBuilder {
|
||||
self.builder.or(other);
|
||||
}
|
||||
|
||||
/// Set self[slot] where pred(col[slot]) is true. Two-pass: primary then overflow.
|
||||
pub(crate) fn and(&mut self, other: BitSliceView<'_>) {
|
||||
self.builder.and(other);
|
||||
}
|
||||
|
||||
pub(crate) fn xor(&mut self, other: BitSliceView<'_>) {
|
||||
self.builder.xor(other);
|
||||
}
|
||||
|
||||
pub(crate) fn not(&mut self) {
|
||||
self.builder.not();
|
||||
}
|
||||
|
||||
pub(crate) fn copy_from(&mut self, src: BitSliceView<'_>) {
|
||||
self.builder.copy_from(src);
|
||||
}
|
||||
|
||||
pub fn or_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) {
|
||||
for slot in 0..col.len() {
|
||||
let b = col.primary_bytes()[slot];
|
||||
if b < 255 && pred(b as u32) {
|
||||
self.builder.set(slot, true);
|
||||
}
|
||||
}
|
||||
for (slot, val) in col.overflow_entries() {
|
||||
if pred(val) {
|
||||
self.builder.set(slot, true);
|
||||
}
|
||||
}
|
||||
self.builder.or_where(col, pred);
|
||||
}
|
||||
|
||||
pub(crate) fn and_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) {
|
||||
self.builder.and_where(col, pred);
|
||||
}
|
||||
|
||||
pub(crate) fn xor_where(&mut self, col: IntSliceView<'_>, pred: impl Fn(u32) -> bool) {
|
||||
self.builder.xor_where(col, pred);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,24 @@
|
||||
use obicompactvec::FilterMask;
|
||||
|
||||
/// Trait for kmer row filters.
|
||||
///
|
||||
/// `row` contains raw per-genome counts (or 0/1 for presence/absence data).
|
||||
/// `n_genomes` equals `row.len()`.
|
||||
pub trait KmerFilter: Send + Sync {
|
||||
fn passes(&self, row: &[u32], n_genomes: usize) -> bool;
|
||||
|
||||
/// Express this filter as a [`FilterMask`] column-operation expression.
|
||||
///
|
||||
/// Returns `Some(expr)` if the filter can be evaluated solely from matrix
|
||||
/// column aggregates (no per-kmer row scan needed). Returns `None` if the
|
||||
/// filter requires row-level inspection.
|
||||
///
|
||||
/// `threshold` semantics in the returned mask use `>= threshold`, matching
|
||||
/// [`obicompactvec::MatrixGroupOps`]. Implementations must add 1 to any
|
||||
/// row-level threshold that uses strict `>` comparison.
|
||||
fn column_mask_expr(&self, _n_genomes: usize) -> Option<FilterMask> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// True when `row` passes every filter in `filters`.
|
||||
@@ -29,6 +44,16 @@ impl KmerFilter for MinGenomeFraction {
|
||||
let p = present_count(row, self.threshold);
|
||||
p as f64 / n_genomes as f64 >= self.frac
|
||||
}
|
||||
|
||||
fn column_mask_expr(&self, n_genomes: usize) -> Option<FilterMask> {
|
||||
let t = self.threshold.checked_add(1)?;
|
||||
let min_count = (self.frac * n_genomes as f64).ceil() as usize;
|
||||
Some(FilterMask::PresenceGeq {
|
||||
indices: (0..n_genomes).collect(),
|
||||
threshold: t,
|
||||
min_count,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// At most `frac` fraction of genomes contain this kmer (count > `threshold`).
|
||||
@@ -42,6 +67,16 @@ impl KmerFilter for MaxGenomeFraction {
|
||||
let p = present_count(row, self.threshold);
|
||||
p as f64 / n_genomes as f64 <= self.frac
|
||||
}
|
||||
|
||||
fn column_mask_expr(&self, n_genomes: usize) -> Option<FilterMask> {
|
||||
let t = self.threshold.checked_add(1)?;
|
||||
let max_count = (self.frac * n_genomes as f64).floor() as usize;
|
||||
Some(FilterMask::PresenceLeq {
|
||||
indices: (0..n_genomes).collect(),
|
||||
threshold: t,
|
||||
max_count,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// At least `count` genomes contain this kmer (count > `threshold`).
|
||||
@@ -54,6 +89,15 @@ impl KmerFilter for MinGenomeCount {
|
||||
fn passes(&self, row: &[u32], _n_genomes: usize) -> bool {
|
||||
present_count(row, self.threshold) >= self.count
|
||||
}
|
||||
|
||||
fn column_mask_expr(&self, n_genomes: usize) -> Option<FilterMask> {
|
||||
let t = self.threshold.checked_add(1)?;
|
||||
Some(FilterMask::PresenceGeq {
|
||||
indices: (0..n_genomes).collect(),
|
||||
threshold: t,
|
||||
min_count: self.count,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// At most `count` genomes contain this kmer (count > `threshold`).
|
||||
@@ -66,6 +110,15 @@ impl KmerFilter for MaxGenomeCount {
|
||||
fn passes(&self, row: &[u32], _n_genomes: usize) -> bool {
|
||||
present_count(row, self.threshold) <= self.count
|
||||
}
|
||||
|
||||
fn column_mask_expr(&self, n_genomes: usize) -> Option<FilterMask> {
|
||||
let t = self.threshold.checked_add(1)?;
|
||||
Some(FilterMask::PresenceLeq {
|
||||
indices: (0..n_genomes).collect(),
|
||||
threshold: t,
|
||||
max_count: self.count,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ── Total-count filters (count indexes only) ───────────────────────────────────
|
||||
@@ -79,6 +132,13 @@ impl KmerFilter for MinTotalCount {
|
||||
fn passes(&self, row: &[u32], _n_genomes: usize) -> bool {
|
||||
row.iter().sum::<u32>() >= self.total
|
||||
}
|
||||
|
||||
fn column_mask_expr(&self, n_genomes: usize) -> Option<FilterMask> {
|
||||
Some(FilterMask::SumGeq {
|
||||
indices: (0..n_genomes).collect(),
|
||||
min_sum: self.total,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Sum of counts across all genomes <= `total`.
|
||||
@@ -90,6 +150,13 @@ impl KmerFilter for MaxTotalCount {
|
||||
fn passes(&self, row: &[u32], _n_genomes: usize) -> bool {
|
||||
row.iter().sum::<u32>() <= self.total
|
||||
}
|
||||
|
||||
fn column_mask_expr(&self, n_genomes: usize) -> Option<FilterMask> {
|
||||
Some(FilterMask::SumLeq {
|
||||
indices: (0..n_genomes).collect(),
|
||||
max_sum: self.total,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ── Group-based quorum filter ─────────────────────────────────────────────────
|
||||
@@ -113,6 +180,37 @@ pub struct GroupQuorumFilter {
|
||||
pub max_outgroup_frac: f64,
|
||||
}
|
||||
|
||||
impl GroupQuorumFilter {
|
||||
// Build PresenceGeq/PresenceLeq constraints for one group (ingroup or outgroup).
|
||||
fn group_mask_parts(
|
||||
indices: &[usize],
|
||||
threshold: u32,
|
||||
min_count: usize,
|
||||
max_count: usize,
|
||||
min_frac: f64,
|
||||
max_frac: f64,
|
||||
parts: &mut Vec<FilterMask>,
|
||||
) {
|
||||
let n = indices.len();
|
||||
let geq = min_count.max((min_frac * n as f64).ceil() as usize);
|
||||
if geq > 0 {
|
||||
parts.push(FilterMask::PresenceGeq {
|
||||
indices: indices.to_vec(),
|
||||
threshold,
|
||||
min_count: geq,
|
||||
});
|
||||
}
|
||||
let leq = max_count.min((max_frac * n as f64).floor() as usize);
|
||||
if leq < n {
|
||||
parts.push(FilterMask::PresenceLeq {
|
||||
indices: indices.to_vec(),
|
||||
threshold,
|
||||
max_count: leq,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl KmerFilter for GroupQuorumFilter {
|
||||
fn passes(&self, row: &[u32], _n_genomes: usize) -> bool {
|
||||
if !self.ingroup_idx.is_empty() {
|
||||
@@ -139,4 +237,26 @@ impl KmerFilter for GroupQuorumFilter {
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
fn column_mask_expr(&self, _n_genomes: usize) -> Option<FilterMask> {
|
||||
let t = self.threshold.checked_add(1)?;
|
||||
let mut parts: Vec<FilterMask> = Vec::new();
|
||||
if !self.ingroup_idx.is_empty() {
|
||||
Self::group_mask_parts(
|
||||
&self.ingroup_idx, t,
|
||||
self.min_count, self.max_count,
|
||||
self.min_frac, self.max_frac,
|
||||
&mut parts,
|
||||
);
|
||||
}
|
||||
if !self.outgroup_idx.is_empty() {
|
||||
Self::group_mask_parts(
|
||||
&self.outgroup_idx, t,
|
||||
self.min_outgroup_count, self.max_outgroup_count,
|
||||
self.min_outgroup_frac, self.max_outgroup_frac,
|
||||
&mut parts,
|
||||
);
|
||||
}
|
||||
Some(FilterMask::And(parts))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use obipipeline::{
|
||||
};
|
||||
|
||||
use obicompactvec::{
|
||||
MatrixGroupOps,
|
||||
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
|
||||
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder,
|
||||
};
|
||||
@@ -78,6 +79,41 @@ impl SrcLayerData {
|
||||
}
|
||||
buf
|
||||
}
|
||||
|
||||
pub(crate) fn n_slots(&self) -> usize {
|
||||
match self {
|
||||
SrcLayerData::Presence(_, mat) => mat.n(),
|
||||
SrcLayerData::Count(_, mat) => mat.n(),
|
||||
}
|
||||
}
|
||||
|
||||
/// MPHF lookup: returns the slot index for `kmer` (kmer must be in the domain).
|
||||
#[inline]
|
||||
pub(crate) fn slot(&self, kmer: CanonicalKmer) -> usize {
|
||||
match self {
|
||||
SrcLayerData::Presence(mphf, _) => mphf.index(kmer),
|
||||
SrcLayerData::Count(mphf, _) => mphf.index(kmer),
|
||||
}
|
||||
}
|
||||
|
||||
/// Row lookup by slot index, bypassing the MPHF.
|
||||
#[inline]
|
||||
pub(crate) fn fill_row_by_slot(&self, slot: usize, n_genomes: usize) -> Vec<u32> {
|
||||
let mut buf = vec![0u32; n_genomes];
|
||||
match self {
|
||||
SrcLayerData::Presence(_, mat) => mat.fill_row(slot, &mut buf),
|
||||
SrcLayerData::Count(_, mat) => mat.fill_row(slot, &mut buf),
|
||||
}
|
||||
buf
|
||||
}
|
||||
|
||||
/// Call `f` with a reference to the underlying matrix as `&dyn MatrixGroupOps`.
|
||||
pub(crate) fn with_matrix<R>(&self, f: impl FnOnce(&dyn MatrixGroupOps) -> R) -> R {
|
||||
match self {
|
||||
SrcLayerData::Presence(_, mat) => f(mat),
|
||||
SrcLayerData::Count(_, mat) => f(mat),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
use std::path::Path;
|
||||
|
||||
use obicompactvec::{
|
||||
PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrixBuilder,
|
||||
PersistentCompactIntVecBuilder,
|
||||
FilterMask, eval_filter_mask,
|
||||
PersistentBitMatrixBuilder, PersistentBitVecBuilder,
|
||||
PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder,
|
||||
};
|
||||
use obidebruinj::GraphDeBruijn;
|
||||
use obikseq::CanonicalKmer;
|
||||
@@ -10,18 +11,135 @@ use obilayeredmap::meta::PartitionMeta;
|
||||
use obilayeredmap::{IndexMode, MphfLayer};
|
||||
use obiskio::{SKError, SKResult, UnitigFileReader};
|
||||
|
||||
use crate::common::{ColBuilder, col_path_bit, col_path_int, load_meta, olm_to_sk, write_matrix_meta};
|
||||
use crate::filter::{KmerFilter, passes_all};
|
||||
use crate::common::{load_meta, olm_to_sk};
|
||||
use crate::filter::KmerFilter;
|
||||
use crate::graph_pipeline::materialize_layer;
|
||||
use crate::merge_layer::{MergeMode, SrcLayerData};
|
||||
use crate::partition::KmerPartition;
|
||||
|
||||
const INDEX_SUBDIR: &str = "index";
|
||||
|
||||
/// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`.
|
||||
// ── Builders — pair matrix builder + column builders for one mode ─────────────
|
||||
|
||||
enum Builders {
|
||||
Presence(PersistentBitMatrixBuilder, Vec<PersistentBitVecBuilder>),
|
||||
Count(PersistentCompactIntMatrixBuilder, Vec<PersistentCompactIntVecBuilder>),
|
||||
}
|
||||
|
||||
impl Builders {
|
||||
fn new(mode: MergeMode, n: usize, dir: &Path, n_genomes: usize) -> SKResult<Self> {
|
||||
match mode {
|
||||
MergeMode::Presence => {
|
||||
let mut mat = PersistentBitMatrixBuilder::new(n, dir).map_err(SKError::Io)?;
|
||||
let mut cols = Vec::with_capacity(n_genomes);
|
||||
for _ in 0..n_genomes { cols.push(mat.add_col().map_err(SKError::Io)?); }
|
||||
Ok(Builders::Presence(mat, cols))
|
||||
}
|
||||
MergeMode::Count => {
|
||||
let mut mat = PersistentCompactIntMatrixBuilder::new(n, dir).map_err(SKError::Io)?;
|
||||
let mut cols = Vec::with_capacity(n_genomes);
|
||||
for _ in 0..n_genomes { cols.push(mat.add_col().map_err(SKError::Io)?); }
|
||||
Ok(Builders::Count(mat, cols))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn set_val(&mut self, col: usize, slot: usize, value: u32) {
|
||||
match self {
|
||||
Builders::Presence(_, cols) => cols[col].set(slot, value > 0),
|
||||
Builders::Count(_, cols) => cols[col].set(slot, value),
|
||||
}
|
||||
}
|
||||
|
||||
fn close(self) -> SKResult<()> {
|
||||
match self {
|
||||
Builders::Presence(mat, cols) => {
|
||||
for b in cols { b.close().map_err(SKError::Io)?; }
|
||||
mat.close().map_err(SKError::Io)
|
||||
}
|
||||
Builders::Count(mat, cols) => {
|
||||
for b in cols { b.close().map_err(SKError::Io)?; }
|
||||
mat.close().map_err(SKError::Io)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── try_compute_combined_mask ─────────────────────────────────────────────────
|
||||
|
||||
/// Build a per-slot `TempBitVec` mask from `filters` using column operations
|
||||
/// on the source matrix — no per-kmer MPHF lookup or row read needed.
|
||||
///
|
||||
/// Uses [`SrcLayerData`] semantics: counts take priority over presence when
|
||||
/// `mode = Count`; presence (or implicit all-ones) is used for `Presence`.
|
||||
/// Returns `Some(mask)` when every filter in `filters` can express itself as
|
||||
/// a [`FilterMask`] expression. Returns `None` when any filter requires
|
||||
/// row-level inspection (fall back to `passes_all`).
|
||||
fn try_compute_combined_mask(
|
||||
filters: &[Box<dyn KmerFilter>],
|
||||
src_data: &SrcLayerData,
|
||||
n_genomes: usize,
|
||||
) -> SKResult<Option<obicompactvec::TempBitVec>> {
|
||||
if filters.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
let mut exprs: Vec<FilterMask> = Vec::with_capacity(filters.len());
|
||||
for f in filters {
|
||||
match f.column_mask_expr(n_genomes) {
|
||||
Some(expr) => exprs.push(expr),
|
||||
None => return Ok(None),
|
||||
}
|
||||
}
|
||||
let combined = FilterMask::And(exprs);
|
||||
let n = src_data.n_slots();
|
||||
let mask = src_data
|
||||
.with_matrix(|mat| eval_filter_mask(&combined, mat, n))
|
||||
.map_err(SKError::Io)?;
|
||||
Ok(Some(mask))
|
||||
}
|
||||
|
||||
// ── iter_src_kmers_masked (pass 1) ────────────────────────────────────────────
|
||||
|
||||
/// Iterate all passing kmers in `src_index_dir`, yielding only the kmer value.
|
||||
///
|
||||
/// When all filters can be expressed as column operations, a per-slot mask is
|
||||
/// computed once per layer and used for O(1) slot-check per kmer instead of a
|
||||
/// full row read. Falls back to row-level `passes_all` otherwise.
|
||||
fn iter_src_kmers_masked(
|
||||
src_index_dir: &Path,
|
||||
mode: MergeMode,
|
||||
n_genomes: usize,
|
||||
filters: &[Box<dyn KmerFilter>],
|
||||
mut cb: impl FnMut(CanonicalKmer),
|
||||
) -> SKResult<()> {
|
||||
let src_meta = load_meta(src_index_dir, "rebuild")?;
|
||||
for l in 0..src_meta.n_layers {
|
||||
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
|
||||
let unitigs_path = src_layer_dir.join("unitigs.bin");
|
||||
if !unitigs_path.exists() { continue; }
|
||||
|
||||
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
||||
let mask = try_compute_combined_mask(filters, &src_data, n_genomes)?;
|
||||
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
|
||||
|
||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||
let slot = src_data.slot(kmer);
|
||||
let passes = match &mask {
|
||||
Some(m) => m.get(slot),
|
||||
None => {
|
||||
let row = src_data.fill_row_by_slot(slot, n_genomes);
|
||||
filters.iter().all(|f| f.passes(&row, n_genomes))
|
||||
}
|
||||
};
|
||||
if passes { cb(kmer); }
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── iter_src_layers (pass 2) ──────────────────────────────────────────────────
|
||||
|
||||
/// Iterate all passing kmers in `src_index_dir`, yielding `(kmer, row)`.
|
||||
///
|
||||
/// When the slot mask is available, skips the row read for filtered-out slots.
|
||||
fn iter_src_layers(
|
||||
src_index_dir: &Path,
|
||||
mode: MergeMode,
|
||||
@@ -33,17 +151,23 @@ fn iter_src_layers(
|
||||
for l in 0..src_meta.n_layers {
|
||||
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
|
||||
let unitigs_path = src_layer_dir.join("unitigs.bin");
|
||||
if !unitigs_path.exists() {
|
||||
continue;
|
||||
}
|
||||
if !unitigs_path.exists() { continue; }
|
||||
|
||||
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
|
||||
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
|
||||
let mask = try_compute_combined_mask(filters, &src_data, n_genomes)?;
|
||||
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
|
||||
|
||||
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
|
||||
let row = src_data.lookup(kmer, n_genomes);
|
||||
if passes_all(filters, &row, n_genomes) {
|
||||
let slot = src_data.slot(kmer);
|
||||
if let Some(ref m) = mask {
|
||||
if !m.get(slot) { continue; }
|
||||
let row = src_data.fill_row_by_slot(slot, n_genomes);
|
||||
cb(kmer, row.into_boxed_slice());
|
||||
} else {
|
||||
let row = src_data.fill_row_by_slot(slot, n_genomes);
|
||||
if filters.iter().all(|f| f.passes(&row, n_genomes)) {
|
||||
cb(kmer, row.into_boxed_slice());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -81,7 +205,7 @@ impl KmerPartition {
|
||||
|
||||
// ── Pass 1: collect filtered kmers into de Bruijn graph ───────────────
|
||||
let mut g = GraphDeBruijn::new();
|
||||
iter_src_layers(&src_index_dir, mode, n_genomes, filters, |kmer, _row| {
|
||||
iter_src_kmers_masked(&src_index_dir, mode, n_genomes, filters, |kmer| {
|
||||
g.push(kmer);
|
||||
})?;
|
||||
|
||||
@@ -103,51 +227,19 @@ impl KmerPartition {
|
||||
MergeMode::Count => dst_layer_dir.join("counts"),
|
||||
};
|
||||
std::fs::create_dir_all(&data_dir)?;
|
||||
|
||||
let mut builders: Vec<ColBuilder> = match mode {
|
||||
MergeMode::Presence => {
|
||||
PersistentBitMatrixBuilder::new(n_new, &data_dir)
|
||||
.map_err(SKError::Io)?
|
||||
.close()
|
||||
.map_err(SKError::Io)?;
|
||||
(0..n_genomes)
|
||||
.map(|g| -> SKResult<ColBuilder> {
|
||||
let b = PersistentBitVecBuilder::new(n_new, &col_path_bit(&data_dir, g))?;
|
||||
Ok(ColBuilder::Bit(b))
|
||||
})
|
||||
.collect::<SKResult<_>>()?
|
||||
}
|
||||
MergeMode::Count => {
|
||||
PersistentCompactIntMatrixBuilder::new(n_new, &data_dir)
|
||||
.map_err(SKError::Io)?
|
||||
.close()
|
||||
.map_err(SKError::Io)?;
|
||||
(0..n_genomes)
|
||||
.map(|g| -> SKResult<ColBuilder> {
|
||||
let b = PersistentCompactIntVecBuilder::new(
|
||||
n_new,
|
||||
&col_path_int(&data_dir, g),
|
||||
)?;
|
||||
Ok(ColBuilder::Int(b))
|
||||
})
|
||||
.collect::<SKResult<_>>()?
|
||||
}
|
||||
};
|
||||
let mut builders = Builders::new(mode, n_new, &data_dir, n_genomes)?;
|
||||
|
||||
// ── Pass 2: fill builders ─────────────────────────────────────────────
|
||||
iter_src_layers(&src_index_dir, mode, n_genomes, filters, |kmer, row| {
|
||||
if let Some(slot) = dst_mphf.find(kmer) {
|
||||
for (col, &value) in row.iter().enumerate() {
|
||||
builders[col].set_val(slot, value);
|
||||
builders.set_val(col, slot, value);
|
||||
}
|
||||
}
|
||||
})?;
|
||||
|
||||
// ── Close builders, write metadata ────────────────────────────────────
|
||||
for b in builders {
|
||||
b.close()?;
|
||||
}
|
||||
write_matrix_meta(&data_dir, n_new, n_genomes).map_err(SKError::Io)?;
|
||||
// ── Close builders and write metadata ─────────────────────────────────
|
||||
builders.close()?;
|
||||
|
||||
PartitionMeta {
|
||||
n_layers: 1,
|
||||
|
||||
Reference in New Issue
Block a user