feat: add kmer filtering and refactor layer iteration

Introduce a `passes_all` utility to validate kmer rows against multiple filters using short-circuit logic. Integrate a `filters` parameter into the iteration functions to conditionally emit kmers based on filter results. Extract repetitive layer traversal and filtering into an `iter_src_layers` helper, refactoring Pass 1 and Pass 2 to eliminate duplication. Additionally, add a debug conditional to the dump output to include partition and layer metadata alongside kmer sequences.
This commit is contained in:
Eric Coissac
2026-06-04 20:43:22 +02:00
parent 476c7a6394
commit a1499e6153
5 changed files with 86 additions and 57 deletions
+2 -2
View File
@@ -36,7 +36,7 @@ impl KmerIndex {
for i in 0..n { for i in 0..n {
if debug { if debug {
self.partition self.partition
.iter_partition_kmers_located(i, use_counts, n_genomes, |part, layer, kmer, row| { .iter_partition_kmers_located(i, use_counts, n_genomes, &[], |part, layer, kmer, row| {
let seq = String::from_utf8(kmer.to_ascii()) let seq = String::from_utf8(kmer.to_ascii())
.unwrap_or_else(|_| "?".repeat(kmer_size)); .unwrap_or_else(|_| "?".repeat(kmer_size));
let _ = write!(out, "{part},{layer},{seq}"); let _ = write!(out, "{part},{layer},{seq}");
@@ -48,7 +48,7 @@ impl KmerIndex {
.map_err(OKIError::Partition)?; .map_err(OKIError::Partition)?;
} else { } else {
self.partition self.partition
.iter_partition_kmers(i, use_counts, n_genomes, |kmer, row| { .iter_partition_kmers(i, use_counts, n_genomes, &[], |kmer, row| {
let seq = String::from_utf8(kmer.to_ascii()) let seq = String::from_utf8(kmer.to_ascii())
.unwrap_or_else(|_| "?".repeat(kmer_size)); .unwrap_or_else(|_| "?".repeat(kmer_size));
let _ = write!(out, "{seq}"); let _ = write!(out, "{seq}");
+38 -18
View File
@@ -4,6 +4,7 @@ use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::{IndexMode, MphfLayer, OLMError}; use obilayeredmap::{IndexMode, MphfLayer, OLMError};
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use crate::filter::{KmerFilter, passes_all};
use crate::partition::KmerPartition; use crate::partition::KmerPartition;
const INDEX_SUBDIR: &str = "index"; const INDEX_SUBDIR: &str = "index";
@@ -16,18 +17,21 @@ fn olm_to_sk(e: OLMError) -> SKError {
} }
impl KmerPartition { impl KmerPartition {
/// Iterate all indexed kmers in partition `part`, calling `cb(kmer, row)` for each. /// Iterate all indexed kmers in partition `part`, calling `cb(kmer, row)` for each
/// kmer that passes every filter in `filters`.
/// ///
/// `use_counts = true` → reads count columns (u32 values per genome). /// `use_counts = true` → reads count columns (u32 values per genome).
/// `use_counts = false` → reads presence columns, converted to 0/1 u32. /// `use_counts = false` → reads presence columns, converted to 0/1 u32.
/// ///
/// If no data matrix exists for a layer (pure set-membership, single genome), /// If no data matrix exists for a layer (pure set-membership, single genome),
/// a row of `n_genomes` ones is emitted for every kmer in that layer. /// a row of `n_genomes` ones is emitted for every kmer in that layer — unless
/// the filter rejects it, in which case the whole layer is skipped.
pub fn iter_partition_kmers( pub fn iter_partition_kmers(
&self, &self,
part: usize, part: usize,
use_counts: bool, use_counts: bool,
n_genomes: usize, n_genomes: usize,
filters: &[Box<dyn KmerFilter>],
mut cb: impl FnMut(CanonicalKmer, Box<[u32]>), mut cb: impl FnMut(CanonicalKmer, Box<[u32]>),
) -> SKResult<()> { ) -> SKResult<()> {
let index_dir = self.part_dir(part).join(INDEX_SUBDIR); let index_dir = self.part_dir(part).join(INDEX_SUBDIR);
@@ -44,17 +48,20 @@ impl KmerPartition {
let layer_dir = index_dir.join(format!("layer_{l}")); let layer_dir = index_dir.join(format!("layer_{l}"));
if !layer_dir.exists() { break; } if !layer_dir.exists() { break; }
l += 1; l += 1;
let mphf = MphfLayer::open(&layer_dir, &index_mode).map_err(olm_to_sk)?; let mphf = MphfLayer::open(&layer_dir, &index_mode).map_err(olm_to_sk)?;
let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?; let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?;
let counts_dir = layer_dir.join("counts"); let counts_dir = layer_dir.join("counts");
let presence_dir = layer_dir.join("presence"); let presence_dir = layer_dir.join("presence");
if use_counts && counts_dir.exists() { if use_counts && counts_dir.exists() {
let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
cb(kmer, mat.row(slot)); let row = mat.row(slot);
if passes_all(filters, &row, n_genomes) {
cb(kmer, row);
}
} }
} }
} else if !use_counts && presence_dir.exists() { } else if !use_counts && presence_dir.exists() {
@@ -62,15 +69,20 @@ impl KmerPartition {
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect();
cb(kmer, row); if passes_all(filters, &row, n_genomes) {
cb(kmer, row);
}
} }
} }
} else { } else {
// No data matrix: pure set-membership layer, all kmers belong to every genome. // No data matrix: implicit presence — all values are 1.
// The filter result is identical for every kmer, so evaluate once.
let all_present: Box<[u32]> = vec![1u32; n_genomes].into(); let all_present: Box<[u32]> = vec![1u32; n_genomes].into();
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if passes_all(filters, &all_present, n_genomes) {
if mphf.find(kmer).is_some() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
cb(kmer, all_present.clone()); if mphf.find(kmer).is_some() {
cb(kmer, all_present.clone());
}
} }
} }
} }
@@ -79,13 +91,14 @@ impl KmerPartition {
Ok(()) Ok(())
} }
/// Like [`iter_partition_kmers`] but the callback also receives the layer index, /// Like [`iter_partition_kmers`] but the callback also receives `(partition, layer)`
/// enabling debug output that identifies where each kmer was stored. /// indices, enabling debug output that identifies where each kmer was stored.
pub fn iter_partition_kmers_located( pub fn iter_partition_kmers_located(
&self, &self,
part: usize, part: usize,
use_counts: bool, use_counts: bool,
n_genomes: usize, n_genomes: usize,
filters: &[Box<dyn KmerFilter>],
mut cb: impl FnMut(usize, usize, CanonicalKmer, Box<[u32]>), mut cb: impl FnMut(usize, usize, CanonicalKmer, Box<[u32]>),
) -> SKResult<()> { ) -> SKResult<()> {
let index_dir = self.part_dir(part).join(INDEX_SUBDIR); let index_dir = self.part_dir(part).join(INDEX_SUBDIR);
@@ -101,7 +114,7 @@ impl KmerPartition {
loop { loop {
let layer_dir = index_dir.join(format!("layer_{layer}")); let layer_dir = index_dir.join(format!("layer_{layer}"));
if !layer_dir.exists() { break; } if !layer_dir.exists() { break; }
let mphf = MphfLayer::open(&layer_dir, &index_mode).map_err(olm_to_sk)?; let mphf = MphfLayer::open(&layer_dir, &index_mode).map_err(olm_to_sk)?;
let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?; let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?;
let counts_dir = layer_dir.join("counts"); let counts_dir = layer_dir.join("counts");
@@ -111,7 +124,10 @@ impl KmerPartition {
let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
cb(part, layer, kmer, mat.row(slot)); let row = mat.row(slot);
if passes_all(filters, &row, n_genomes) {
cb(part, layer, kmer, row);
}
} }
} }
} else if !use_counts && presence_dir.exists() { } else if !use_counts && presence_dir.exists() {
@@ -119,14 +135,18 @@ impl KmerPartition {
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect();
cb(part, layer, kmer, row); if passes_all(filters, &row, n_genomes) {
cb(part, layer, kmer, row);
}
} }
} }
} else { } else {
let all_present: Box<[u32]> = vec![1u32; n_genomes].into(); let all_present: Box<[u32]> = vec![1u32; n_genomes].into();
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if passes_all(filters, &all_present, n_genomes) {
if mphf.find(kmer).is_some() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
cb(part, layer, kmer, all_present.clone()); if mphf.find(kmer).is_some() {
cb(part, layer, kmer, all_present.clone());
}
} }
} }
} }
+7 -1
View File
@@ -1,4 +1,4 @@
/// Trait for kmer row filters used in the rebuild command. /// Trait for kmer row filters.
/// ///
/// `row` contains raw per-genome counts (or 0/1 for presence/absence data). /// `row` contains raw per-genome counts (or 0/1 for presence/absence data).
/// `n_genomes` equals `row.len()`. /// `n_genomes` equals `row.len()`.
@@ -6,6 +6,12 @@ pub trait KmerFilter: Send + Sync {
fn passes(&self, row: &[u32], n_genomes: usize) -> bool; fn passes(&self, row: &[u32], n_genomes: usize) -> bool;
} }
/// True when `row` passes every filter in `filters`.
/// Returns `true` if `filters` is empty.
pub fn passes_all(filters: &[Box<dyn KmerFilter>], row: &[u32], n_genomes: usize) -> bool {
filters.iter().all(|f| f.passes(row, n_genomes))
}
// ── Quorum filters ───────────────────────────────────────────────────────────── // ── Quorum filters ─────────────────────────────────────────────────────────────
fn present_count(row: &[u32], threshold: u32) -> usize { fn present_count(row: &[u32], threshold: u32) -> usize {
+1 -1
View File
@@ -8,6 +8,6 @@ mod partition;
mod query_layer; mod query_layer;
mod rebuild_layer; mod rebuild_layer;
pub use filter::{GroupQuorumFilter, KmerFilter}; pub use filter::{GroupQuorumFilter, KmerFilter, passes_all};
pub use merge_layer::MergeMode; pub use merge_layer::MergeMode;
pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR};
+38 -35
View File
@@ -7,11 +7,12 @@ use obicompactvec::{PersistentBitMatrixBuilder,
PersistentCompactIntMatrixBuilder, PersistentCompactIntMatrixBuilder,
PersistentCompactIntVecBuilder}; PersistentCompactIntVecBuilder};
use obidebruinj::GraphDeBruijn; use obidebruinj::GraphDeBruijn;
use obikseq::CanonicalKmer;
use obiskio::{SKError, SKResult, UnitigFileReader}; use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError}; use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError};
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use crate::filter::KmerFilter; use crate::filter::{KmerFilter, passes_all};
use crate::merge_layer::{MergeMode, SrcLayerData}; use crate::merge_layer::{MergeMode, SrcLayerData};
use crate::partition::KmerPartition; use crate::partition::KmerPartition;
@@ -75,8 +76,34 @@ fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
} }
} }
fn passes_all(filters: &[Box<dyn KmerFilter>], row: &[u32], n_genomes: usize) -> bool { /// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`.
filters.iter().all(|f| f.passes(row, n_genomes)) ///
/// Uses [`SrcLayerData`] semantics: counts take priority over presence when
/// `mode = Count`; presence (or implicit all-ones) is used for `Presence`.
fn iter_src_layers(
src_index_dir: &Path,
mode: MergeMode,
n_genomes: usize,
filters: &[Box<dyn KmerFilter>],
mut cb: impl FnMut(CanonicalKmer, Box<[u32]>),
) -> SKResult<()> {
let src_meta = load_meta(src_index_dir)?;
for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
let unitigs_path = src_layer_dir.join("unitigs.bin");
if !unitigs_path.exists() { continue; }
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
let row = src_data.lookup(kmer, n_genomes);
if passes_all(filters, &row, n_genomes) {
cb(kmer, row.into_boxed_slice());
}
}
}
Ok(())
} }
// ── KmerPartition::rebuild_partition ───────────────────────────────────────── // ── KmerPartition::rebuild_partition ─────────────────────────────────────────
@@ -110,22 +137,9 @@ impl KmerPartition {
// ── Pass 1: collect filtered kmers into de Bruijn graph ─────────────── // ── Pass 1: collect filtered kmers into de Bruijn graph ───────────────
let mut g = GraphDeBruijn::new(); let mut g = GraphDeBruijn::new();
iter_src_layers(&src_index_dir, mode, n_genomes, filters, |kmer, _row| {
for l in 0..src_meta.n_layers { g.push(kmer);
let src_layer_dir = src_index_dir.join(format!("layer_{l}")); })?;
let unitigs_path = src_layer_dir.join("unitigs.bin");
if !unitigs_path.exists() { continue; }
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
let row = src_data.lookup(kmer, n_genomes);
if passes_all(filters, &row, n_genomes) {
g.push(kmer);
}
}
}
if g.len() == 0 { if g.len() == 0 {
return Ok(()); return Ok(());
@@ -176,24 +190,13 @@ impl KmerPartition {
}; };
// ── Pass 2: fill builders ───────────────────────────────────────────── // ── Pass 2: fill builders ─────────────────────────────────────────────
for l in 0..src_meta.n_layers { iter_src_layers(&src_index_dir, mode, n_genomes, filters, |kmer, row| {
let src_layer_dir = src_index_dir.join(format!("layer_{l}")); if let Some(slot) = dst_mphf.find(kmer) {
let unitigs_path = src_layer_dir.join("unitigs.bin"); for (col, &value) in row.iter().enumerate() {
if !unitigs_path.exists() { continue; } builders[col].set_val(slot, value);
let reader = UnitigFileReader::open_sequential(&unitigs_path)?;
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
let row = src_data.lookup(kmer, n_genomes);
if !passes_all(filters, &row, n_genomes) { continue; }
if let Some(slot) = dst_mphf.find(kmer) {
for (col, &value) in row.iter().enumerate() {
builders[col].set_val(slot, value);
}
} }
} }
} })?;
// ── Close builders, write metadata ──────────────────────────────────── // ── Close builders, write metadata ────────────────────────────────────
for b in builders { b.close()?; } for b in builders { b.close()?; }