diff --git a/src/obikindex/src/dump.rs b/src/obikindex/src/dump.rs index 037bb58..cb28652 100644 --- a/src/obikindex/src/dump.rs +++ b/src/obikindex/src/dump.rs @@ -36,7 +36,7 @@ impl KmerIndex { for i in 0..n { if debug { self.partition - .iter_partition_kmers_located(i, use_counts, n_genomes, |part, layer, kmer, row| { + .iter_partition_kmers_located(i, use_counts, n_genomes, &[], |part, layer, kmer, row| { let seq = String::from_utf8(kmer.to_ascii()) .unwrap_or_else(|_| "?".repeat(kmer_size)); let _ = write!(out, "{part},{layer},{seq}"); @@ -48,7 +48,7 @@ impl KmerIndex { .map_err(OKIError::Partition)?; } else { self.partition - .iter_partition_kmers(i, use_counts, n_genomes, |kmer, row| { + .iter_partition_kmers(i, use_counts, n_genomes, &[], |kmer, row| { let seq = String::from_utf8(kmer.to_ascii()) .unwrap_or_else(|_| "?".repeat(kmer_size)); let _ = write!(out, "{seq}"); diff --git a/src/obikpartitionner/src/dump_layer.rs b/src/obikpartitionner/src/dump_layer.rs index 77c7cd3..adc26ac 100644 --- a/src/obikpartitionner/src/dump_layer.rs +++ b/src/obikpartitionner/src/dump_layer.rs @@ -4,6 +4,7 @@ use obiskio::{SKError, SKResult, UnitigFileReader}; use obilayeredmap::{IndexMode, MphfLayer, OLMError}; use obilayeredmap::meta::PartitionMeta; +use crate::filter::{KmerFilter, passes_all}; use crate::partition::KmerPartition; const INDEX_SUBDIR: &str = "index"; @@ -16,18 +17,21 @@ fn olm_to_sk(e: OLMError) -> SKError { } impl KmerPartition { - /// Iterate all indexed kmers in partition `part`, calling `cb(kmer, row)` for each. + /// Iterate all indexed kmers in partition `part`, calling `cb(kmer, row)` for each + /// kmer that passes every filter in `filters`. /// /// `use_counts = true` → reads count columns (u32 values per genome). /// `use_counts = false` → reads presence columns, converted to 0/1 u32. /// /// If no data matrix exists for a layer (pure set-membership, single genome), - /// a row of `n_genomes` ones is emitted for every kmer in that layer. + /// a row of `n_genomes` ones is emitted for every kmer in that layer — unless + /// the filter rejects it, in which case the whole layer is skipped. pub fn iter_partition_kmers( &self, part: usize, use_counts: bool, n_genomes: usize, + filters: &[Box], mut cb: impl FnMut(CanonicalKmer, Box<[u32]>), ) -> SKResult<()> { let index_dir = self.part_dir(part).join(INDEX_SUBDIR); @@ -44,17 +48,20 @@ impl KmerPartition { let layer_dir = index_dir.join(format!("layer_{l}")); if !layer_dir.exists() { break; } l += 1; - let mphf = MphfLayer::open(&layer_dir, &index_mode).map_err(olm_to_sk)?; + let mphf = MphfLayer::open(&layer_dir, &index_mode).map_err(olm_to_sk)?; let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?; - let counts_dir = layer_dir.join("counts"); + let counts_dir = layer_dir.join("counts"); let presence_dir = layer_dir.join("presence"); if use_counts && counts_dir.exists() { let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if let Some(slot) = mphf.find(kmer) { - cb(kmer, mat.row(slot)); + let row = mat.row(slot); + if passes_all(filters, &row, n_genomes) { + cb(kmer, row); + } } } } else if !use_counts && presence_dir.exists() { @@ -62,15 +69,20 @@ impl KmerPartition { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if let Some(slot) = mphf.find(kmer) { let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); - cb(kmer, row); + if passes_all(filters, &row, n_genomes) { + cb(kmer, row); + } } } } else { - // No data matrix: pure set-membership layer, all kmers belong to every genome. + // No data matrix: implicit presence — all values are 1. + // The filter result is identical for every kmer, so evaluate once. let all_present: Box<[u32]> = vec![1u32; n_genomes].into(); - for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - if mphf.find(kmer).is_some() { - cb(kmer, all_present.clone()); + if passes_all(filters, &all_present, n_genomes) { + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + if mphf.find(kmer).is_some() { + cb(kmer, all_present.clone()); + } } } } @@ -79,13 +91,14 @@ impl KmerPartition { Ok(()) } - /// Like [`iter_partition_kmers`] but the callback also receives the layer index, - /// enabling debug output that identifies where each kmer was stored. + /// Like [`iter_partition_kmers`] but the callback also receives `(partition, layer)` + /// indices, enabling debug output that identifies where each kmer was stored. pub fn iter_partition_kmers_located( &self, part: usize, use_counts: bool, n_genomes: usize, + filters: &[Box], mut cb: impl FnMut(usize, usize, CanonicalKmer, Box<[u32]>), ) -> SKResult<()> { let index_dir = self.part_dir(part).join(INDEX_SUBDIR); @@ -101,7 +114,7 @@ impl KmerPartition { loop { let layer_dir = index_dir.join(format!("layer_{layer}")); if !layer_dir.exists() { break; } - let mphf = MphfLayer::open(&layer_dir, &index_mode).map_err(olm_to_sk)?; + let mphf = MphfLayer::open(&layer_dir, &index_mode).map_err(olm_to_sk)?; let reader = UnitigFileReader::open_sequential(&layer_dir.join("unitigs.bin"))?; let counts_dir = layer_dir.join("counts"); @@ -111,7 +124,10 @@ impl KmerPartition { let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if let Some(slot) = mphf.find(kmer) { - cb(part, layer, kmer, mat.row(slot)); + let row = mat.row(slot); + if passes_all(filters, &row, n_genomes) { + cb(part, layer, kmer, row); + } } } } else if !use_counts && presence_dir.exists() { @@ -119,14 +135,18 @@ impl KmerPartition { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if let Some(slot) = mphf.find(kmer) { let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); - cb(part, layer, kmer, row); + if passes_all(filters, &row, n_genomes) { + cb(part, layer, kmer, row); + } } } } else { let all_present: Box<[u32]> = vec![1u32; n_genomes].into(); - for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - if mphf.find(kmer).is_some() { - cb(part, layer, kmer, all_present.clone()); + if passes_all(filters, &all_present, n_genomes) { + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + if mphf.find(kmer).is_some() { + cb(part, layer, kmer, all_present.clone()); + } } } } diff --git a/src/obikpartitionner/src/filter.rs b/src/obikpartitionner/src/filter.rs index 6f3e0f0..d5c6346 100644 --- a/src/obikpartitionner/src/filter.rs +++ b/src/obikpartitionner/src/filter.rs @@ -1,4 +1,4 @@ -/// Trait for kmer row filters used in the rebuild command. +/// Trait for kmer row filters. /// /// `row` contains raw per-genome counts (or 0/1 for presence/absence data). /// `n_genomes` equals `row.len()`. @@ -6,6 +6,12 @@ pub trait KmerFilter: Send + Sync { fn passes(&self, row: &[u32], n_genomes: usize) -> bool; } +/// True when `row` passes every filter in `filters`. +/// Returns `true` if `filters` is empty. +pub fn passes_all(filters: &[Box], row: &[u32], n_genomes: usize) -> bool { + filters.iter().all(|f| f.passes(row, n_genomes)) +} + // ── Quorum filters ───────────────────────────────────────────────────────────── fn present_count(row: &[u32], threshold: u32) -> usize { diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index a6a010b..b152383 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -8,6 +8,6 @@ mod partition; mod query_layer; mod rebuild_layer; -pub use filter::{GroupQuorumFilter, KmerFilter}; +pub use filter::{GroupQuorumFilter, KmerFilter, passes_all}; pub use merge_layer::MergeMode; pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs index ed6732a..714bde8 100644 --- a/src/obikpartitionner/src/rebuild_layer.rs +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -7,11 +7,12 @@ use obicompactvec::{PersistentBitMatrixBuilder, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder}; use obidebruinj::GraphDeBruijn; +use obikseq::CanonicalKmer; use obiskio::{SKError, SKResult, UnitigFileReader}; use obilayeredmap::{IndexMode, Layer, MphfLayer, OLMError}; use obilayeredmap::meta::PartitionMeta; -use crate::filter::KmerFilter; +use crate::filter::{KmerFilter, passes_all}; use crate::merge_layer::{MergeMode, SrcLayerData}; use crate::partition::KmerPartition; @@ -75,8 +76,34 @@ fn load_meta(dir: &Path) -> SKResult { } } -fn passes_all(filters: &[Box], row: &[u32], n_genomes: usize) -> bool { - filters.iter().all(|f| f.passes(row, n_genomes)) +/// Iterate all kmers in `src_index_dir` that pass `filters`, yielding `(kmer, row)`. +/// +/// Uses [`SrcLayerData`] semantics: counts take priority over presence when +/// `mode = Count`; presence (or implicit all-ones) is used for `Presence`. +fn iter_src_layers( + src_index_dir: &Path, + mode: MergeMode, + n_genomes: usize, + filters: &[Box], + mut cb: impl FnMut(CanonicalKmer, Box<[u32]>), +) -> SKResult<()> { + let src_meta = load_meta(src_index_dir)?; + for l in 0..src_meta.n_layers { + let src_layer_dir = src_index_dir.join(format!("layer_{l}")); + let unitigs_path = src_layer_dir.join("unitigs.bin"); + if !unitigs_path.exists() { continue; } + + let reader = UnitigFileReader::open_sequential(&unitigs_path)?; + let src_data = SrcLayerData::open(&src_layer_dir, mode)?; + + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + let row = src_data.lookup(kmer, n_genomes); + if passes_all(filters, &row, n_genomes) { + cb(kmer, row.into_boxed_slice()); + } + } + } + Ok(()) } // ── KmerPartition::rebuild_partition ───────────────────────────────────────── @@ -110,22 +137,9 @@ impl KmerPartition { // ── Pass 1: collect filtered kmers into de Bruijn graph ─────────────── let mut g = GraphDeBruijn::new(); - - for l in 0..src_meta.n_layers { - let src_layer_dir = src_index_dir.join(format!("layer_{l}")); - let unitigs_path = src_layer_dir.join("unitigs.bin"); - if !unitigs_path.exists() { continue; } - - let reader = UnitigFileReader::open_sequential(&unitigs_path)?; - let src_data = SrcLayerData::open(&src_layer_dir, mode)?; - - for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - let row = src_data.lookup(kmer, n_genomes); - if passes_all(filters, &row, n_genomes) { - g.push(kmer); - } - } - } + iter_src_layers(&src_index_dir, mode, n_genomes, filters, |kmer, _row| { + g.push(kmer); + })?; if g.len() == 0 { return Ok(()); @@ -176,24 +190,13 @@ impl KmerPartition { }; // ── Pass 2: fill builders ───────────────────────────────────────────── - for l in 0..src_meta.n_layers { - let src_layer_dir = src_index_dir.join(format!("layer_{l}")); - let unitigs_path = src_layer_dir.join("unitigs.bin"); - if !unitigs_path.exists() { continue; } - - let reader = UnitigFileReader::open_sequential(&unitigs_path)?; - let src_data = SrcLayerData::open(&src_layer_dir, mode)?; - - for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - let row = src_data.lookup(kmer, n_genomes); - if !passes_all(filters, &row, n_genomes) { continue; } - if let Some(slot) = dst_mphf.find(kmer) { - for (col, &value) in row.iter().enumerate() { - builders[col].set_val(slot, value); - } + iter_src_layers(&src_index_dir, mode, n_genomes, filters, |kmer, row| { + if let Some(slot) = dst_mphf.find(kmer) { + for (col, &value) in row.iter().enumerate() { + builders[col].set_val(slot, value); } } - } + })?; // ── Close builders, write metadata ──────────────────────────────────── for b in builders { b.close()?; }