Push lrwmyplxxzkn #19

Merged
coissac merged 4 commits from push-lrwmyplxxzkn into main 2026-06-09 08:28:21 +00:00
8 changed files with 259 additions and 70 deletions
@@ -1,9 +1,10 @@
# Kmer filtering and ingroup/outgroup predicates # Kmer filtering and ingroup/outgroup predicates
The `rebuild`, `dump`, and `unitig` commands all share the same filtering The `rebuild`, `dump`, and `unitig` commands share the same filtering system,
system. Filters can select k-mers based on per-genome quorum counts, optionally implemented as a shared `FilterArgs` clap argument group embedded in each command
restricted to **ingroup** and **outgroup** genome sets derived from genome via `#[command(flatten)]`. Filters select k-mers based on per-genome quorum
metadata. counts, optionally restricted to **ingroup** and **outgroup** genome sets derived
from genome metadata. All rules described here apply identically to all three commands.
`rebuild` additionally accepts `--min-total-count` / `--max-total-count` filters `rebuild` additionally accepts `--min-total-count` / `--max-total-count` filters
that operate on the sum of counts across all genomes. that operate on the sum of counts across all genomes.
@@ -96,13 +97,72 @@ For each genome:
| `--max-total-count N` | all genomes | sum of per-genome counts ≤ N (`rebuild` only) | | `--max-total-count N` | all genomes | sum of per-genome counts ≤ N (`rebuild` only) |
| `--presence-threshold N` | all | per-genome count > N to be considered "present" (default 0) | | `--presence-threshold N` | all | per-genome count > N to be considered "present" (default 0) |
Defaults: mins = 0 (no lower bound), max counts = group size, max fracs = 1.0 **Conditional defaults** — the defaults for `--min-frac` and `--max-outgroup-count` depend on two conditions:
(no upper bound). A filter with all defaults is a no-op. whether the corresponding group was declared, **and** whether any quorum flag for that group was explicitly set.
> **Rule**: declaring a group activates the smart default **only if no quorum flag for that group is explicitly set**.
> As soon as any quorum flag for a group is present on the command line, all defaults for that group revert to no-op values.
| `--ingroup` | Any ingroup quorum flag? | `--min-frac` default |
|-------------|--------------------------|----------------------|
| not set | — | 0.0 (no-op) |
| set | no | **1.0** — all ingroup genomes must carry the k-mer |
| set | yes | 0.0 — user controls quorum explicitly |
| `--outgroup` | Any outgroup quorum flag? | `--max-outgroup-count` default |
|--------------|---------------------------|-------------------------------|
| not set | — | outgroup size (no-op) |
| set | no | **0** — no outgroup genome may carry the k-mer |
| set | yes | outgroup size — user controls quorum explicitly |
"Any ingroup quorum flag" means any of: `--min-count`, `--max-count`, `--min-frac`, `--max-frac`.
"Any outgroup quorum flag" means any of: `--min-outgroup-count`, `--max-outgroup-count`, `--min-outgroup-frac`, `--max-outgroup-frac`.
**Why this rule?** Setting any quorum flag signals explicit intent — the defaults are there to help when the user omits quorum entirely, not to interfere with deliberate constraints. Mixing implicit and explicit quorum on the same group would risk silent incoherence (e.g. `--max-count 0` with an implicit `--min-frac 1.0`).
All other bounds default to 0 / group size / 0.0 / 1.0 regardless of whether groups are declared.
### Validation
After resolving defaults, the following are checked and cause an immediate error:
| Condition | Error |
|-----------|-------|
| `--min-count > --max-count` | incoherent bounds |
| `--min-frac > --max-frac` | incoherent bounds |
| `--min-outgroup-count > --max-outgroup-count` | incoherent bounds |
| `--min-outgroup-frac > --max-outgroup-frac` | incoherent bounds |
| any fraction outside `[0.0, 1.0]` | invalid value |
The check applies to the **effective** values (after defaults are resolved), so an explicit `--max-frac 0.5` with an implicit `--min-frac 1.0` would have been caught — but the rule above prevents that situation from arising in the first place.
Fractions are computed over the size of the classified group, not over total Fractions are computed over the size of the classified group, not over total
genome count. An empty group (no genome classified as ingroup/outgroup) never genome count. An empty group (no genome classified as ingroup/outgroup) never
triggers a filter failure. triggers a filter failure.
### Conservative rounding of fraction thresholds
When a fraction threshold `F` is applied to a group of size `N`, the effective
integer threshold is determined by the direction of the bound:
| Bound | Effective count | Rounding | Rationale |
|-------|----------------|----------|-----------|
| `--min-frac F` | k-mer in ≥ ⌈F·N⌉ genomes | **ceil** | stricter — a kmer present in exactly ⌊F·N⌋ genomes does not meet the fraction |
| `--max-frac F` | k-mer in ≤ ⌊F·N⌋ genomes | **floor** | stricter — a kmer present in ⌈F·N⌉ genomes already exceeds the fraction |
The same rule applies symmetrically to `--min-outgroup-frac` (ceil) and
`--max-outgroup-frac` (floor). The outgroup direction is not inverted: the
conservative rounding depends only on whether the bound is a minimum or a
maximum, not on which group it applies to.
**Example** — `--min-frac 0.5` with an ingroup of 3 genomes:
`⌈0.5 × 3⌉ = ⌈1.5⌉ = 2` → at least 2 of 3 ingroup genomes must carry the k-mer.
**Implementation note** — the filter evaluates `n / denom < min_frac` directly
(integer `n`, float comparison) rather than pre-computing `⌈F·N⌉`. This is
mathematically equivalent for integer counts: `n / N < F``n < F·N`
`n ≤ ⌈F·N⌉ 1``n < ⌈F·N⌉`. No explicit rounding is needed.
## Examples ## Examples
Keep k-mers specific to *Betula nana* — present in at least 2 *B. nana* genomes Keep k-mers specific to *Betula nana* — present in at least 2 *B. nana* genomes
@@ -148,8 +208,7 @@ obikmer rebuild src --output dst \
--max-outgroup-count 0 --max-outgroup-count 0
``` ```
The same flags work identically for `dump` and `unitig`. To dump only k-mers To dump only k-mers specific to *Betula nana*:
specific to *Betula nana*:
```sh ```sh
obikmer dump myindex \ obikmer dump myindex \
@@ -169,6 +228,31 @@ obikmer unitig myindex \
--max-outgroup-count 0 --max-outgroup-count 0
``` ```
## Command-specific options
### `dump --head N`
Stops output after the first N k-mers that pass all active filters.
Iteration terminates immediately — subsequent partitions and layers are not scanned.
Useful for quick inspection of large indexes without loading the entire dataset.
```sh
obikmer dump myindex --head 100
obikmer dump myindex --head 20 --ingroup "species=Betula_nana" --min-count 1
```
### `distance --presence-threshold N`
When computing Jaccard distance on a **count index**, a k-mer is considered present in a genome if its count is ≥ N (default 1).
This option is independent of the `--presence-threshold` used in `rebuild`/`query` filtering.
```sh
# Jaccard treating kmers with count ≥ 2 as present
obikmer distance myindex --metric jaccard --presence-threshold 2
```
This parameter has no effect on presence/absence indexes (where values are already 0/1) or on metrics other than Jaccard.
## Implementation ## Implementation
- **`obikpartitionner::filter::GroupQuorumFilter`** — implements `KmerFilter` - **`obikpartitionner::filter::GroupQuorumFilter`** — implements `KmerFilter`
+4 -4
View File
@@ -9,12 +9,12 @@
| `superkmer` | Extract super-kmers from a sequence file and write to stdout | | `superkmer` | Extract super-kmers from a sequence file and write to stdout |
| `index` | Build a complete genome index (scatter → dereplicate → count → layered MPHF) | | `index` | Build a complete genome index (scatter → dereplicate → count → layered MPHF) |
| `merge` | Merge multiple built indexes into one | | `merge` | Merge multiple built indexes into one |
| `rebuild` | Filter and compact an existing index into a new single-layer index; supports ingroup/outgroup predicates on genome metadata | | `rebuild` | Filter and compact an existing index into a new single-layer index; supports the shared [kmer filtering](implementation/filtering.md) system |
| `query` | Query an index with sequences and annotate matches | | `query` | Query an index with sequences and annotate matches |
| `dump` | Dump all indexed k-mers as CSV (kmer + per-genome counts or presence); supports the same ingroup/outgroup filtering as `rebuild` | | `dump` | Dump all indexed k-mers as CSV (kmer + per-genome counts or presence); supports the shared [kmer filtering](implementation/filtering.md) system; `--head N` limits output to the first N k-mers |
| `annotate` | Add or update genome metadata from a CSV file; or dump metadata as CSV | | `annotate` | Add or update genome metadata from a CSV file; or dump metadata as CSV |
| `distance` | Compute pairwise distance matrix between genomes; optionally build NJ/UPGMA trees | | `distance` | Compute pairwise distance matrix between genomes; optionally build NJ/UPGMA trees; `--presence-threshold N` sets the minimum count to consider a k-mer present when computing Jaccard on count indexes (default 1) |
| `unitig` | Build a global de Bruijn graph across all partitions and enumerate its unitigs as FASTA; supports the same ingroup/outgroup filtering as `rebuild` | | `unitig` | Build a global de Bruijn graph across all partitions and enumerate its unitigs as FASTA; supports the shared [kmer filtering](implementation/filtering.md) system |
| `estimate` | Estimate approximate-index parameters (z, evidence bits, FP rates) before indexing | | `estimate` | Estimate approximate-index parameters (z, evidence bits, FP rates) before indexing |
| `reindex` | Convert an index's evidence in-place: exact ↔ approx | | `reindex` | Convert an index's evidence in-place: exact ↔ approx |
| `utils` | Miscellaneous index utilities: `--new-label NEW=OLD` renames a genome label; `--upgrade-index` adds missing `layer_meta.json` to old indexes | | `utils` | Miscellaneous index utilities: `--new-label NEW=OLD` renames a genome label; `--upgrade-index` adds missing `layer_meta.json` to old indexes |
+1 -1
View File
@@ -49,7 +49,7 @@ nav:
- PersistentCompactIntVec: implementation/persistent_compact_int_vec.md - PersistentCompactIntVec: implementation/persistent_compact_int_vec.md
- PersistentBitVec: implementation/persistent_bit_vec.md - PersistentBitVec: implementation/persistent_bit_vec.md
- Merge command: implementation/merge.md - Merge command: implementation/merge.md
- Kmer filtering (rebuild/dump/unitig): implementation/rebuild_filter.md - Kmer filtering: implementation/filtering.md
- Architecture: - Architecture:
- Sequences: architecture/sequences/invariant.md - Sequences: architecture/sequences/invariant.md
- Kmer index: architecture/index_architecture.md - Kmer index: architecture/index_architecture.md
+58 -24
View File
@@ -1,4 +1,7 @@
use std::io::Write; use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use rayon::prelude::*;
use crate::error::{OKIError, OKIResult}; use crate::error::{OKIError, OKIResult};
use crate::index::KmerIndex; use crate::index::KmerIndex;
@@ -13,6 +16,9 @@ impl KmerIndex {
/// `force_presence` overrides `with_counts`: even if the index stores counts, /// `force_presence` overrides `with_counts`: even if the index stores counts,
/// the output uses 0/1 presence columns. /// the output uses 0/1 presence columns.
/// ///
/// Partitions are scanned in parallel; each partition buffers its output locally
/// before the main thread writes the chunks in partition order.
///
/// The caller must have set the global kmer length (`obikseq::set_k`) before /// The caller must have set the global kmer length (`obikseq::set_k`) before
/// calling this method. /// calling this method.
pub fn dump<W: Write>( pub fn dump<W: Write>(
@@ -20,6 +26,7 @@ impl KmerIndex {
out: &mut W, out: &mut W,
force_presence: bool, force_presence: bool,
debug: bool, debug: bool,
head: Option<usize>,
filters: &[Box<dyn KmerFilter>], filters: &[Box<dyn KmerFilter>],
) -> OKIResult<()> { ) -> OKIResult<()> {
let genomes = &self.meta.genomes; let genomes = &self.meta.genomes;
@@ -37,30 +44,57 @@ impl KmerIndex {
} }
writeln!(out)?; writeln!(out)?;
// ── Rows ────────────────────────────────────────────────────────────── // ── Rows — parallel over partitions ───────────────────────────────────
let n = self.n_partitions(); let n = self.n_partitions();
for i in 0..n { let remaining = AtomicUsize::new(head.unwrap_or(usize::MAX));
if debug {
self.partition let chunks: Vec<OKIResult<Vec<u8>>> = (0..n)
.iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| { .into_par_iter()
let seq = String::from_utf8(kmer.to_ascii()) .map(|i| {
.unwrap_or_else(|_| "?".repeat(kmer_size)); if remaining.load(Ordering::Relaxed) == 0 {
let _ = write!(out, "{part},{layer},{seq}"); return Ok(vec![]);
for &v in row.iter() { let _ = write!(out, ",{v}"); } }
let _ = writeln!(out); let mut buf = Vec::<u8>::new();
})
.map_err(OKIError::Partition)?; let write_kmer = |buf: &mut Vec<u8>, row: &[u32], prefix: &str| -> bool {
} else { match remaining.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| {
self.partition if cur > 0 { Some(cur - 1) } else { None }
.iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| { }) {
let seq = String::from_utf8(kmer.to_ascii()) Err(_) => false,
.unwrap_or_else(|_| "?".repeat(kmer_size)); Ok(_) => {
let _ = write!(out, "{seq}"); let _ = buf.write_all(prefix.as_bytes());
for &v in row.iter() { let _ = write!(out, ",{v}"); } for &v in row { let _ = write!(buf, ",{v}"); }
let _ = writeln!(out); let _ = buf.write_all(b"\n");
}) true
.map_err(OKIError::Partition)?; }
} }
};
if debug {
self.partition
.iter_partition_kmers_located(i, use_counts, n_genomes, filters, |part, layer, kmer, row| {
let seq = String::from_utf8(kmer.to_ascii())
.unwrap_or_else(|_| "?".repeat(kmer_size));
write_kmer(&mut buf, &row, &format!("{part},{layer},{seq}"))
})
.map_err(OKIError::Partition)?;
} else {
self.partition
.iter_partition_kmers(i, use_counts, n_genomes, filters, |kmer, row| {
let seq = String::from_utf8(kmer.to_ascii())
.unwrap_or_else(|_| "?".repeat(kmer_size));
write_kmer(&mut buf, &row, &seq)
})
.map_err(OKIError::Partition)?;
}
Ok(buf)
})
.collect();
// ── Sequential write ──────────────────────────────────────────────────
for chunk in chunks {
out.write_all(&chunk?)?;
} }
out.flush()?; out.flush()?;
+5 -1
View File
@@ -20,6 +20,10 @@ pub struct DumpArgs {
#[arg(long, default_value_t = false)] #[arg(long, default_value_t = false)]
pub debug: bool, pub debug: bool,
/// Only output the first N kmers
#[arg(long)]
pub head: Option<usize>,
#[command(flatten)] #[command(flatten)]
pub filter: FilterArgs, pub filter: FilterArgs,
} }
@@ -41,7 +45,7 @@ pub fn run(args: DumpArgs) {
let stdout = io::stdout(); let stdout = io::stdout();
let mut out = BufWriter::new(stdout.lock()); let mut out = BufWriter::new(stdout.lock());
idx.dump(&mut out, args.force_presence, args.debug, &filters).unwrap_or_else(|e| { idx.dump(&mut out, args.force_presence, args.debug, args.head, &filters).unwrap_or_else(|e| {
eprintln!("dump error: {e}"); eprintln!("dump error: {e}");
std::process::exit(1); std::process::exit(1);
}); });
+55 -14
View File
@@ -162,6 +162,7 @@ pub struct FilterArgs {
pub max_count: Option<usize>, pub max_count: Option<usize>,
/// Minimum fraction of ingroup genomes containing the k-mer [0.01.0] /// Minimum fraction of ingroup genomes containing the k-mer [0.01.0]
/// (default 1.0 when --ingroup is set, 0.0 otherwise)
#[arg(long)] #[arg(long)]
pub min_frac: Option<f64>, pub min_frac: Option<f64>,
@@ -174,6 +175,7 @@ pub struct FilterArgs {
pub min_outgroup_count: Option<usize>, pub min_outgroup_count: Option<usize>,
/// Maximum number of outgroup genomes containing the k-mer /// Maximum number of outgroup genomes containing the k-mer
/// (default 0 when --outgroup is set, no constraint otherwise)
#[arg(long)] #[arg(long)]
pub max_outgroup_count: Option<usize>, pub max_outgroup_count: Option<usize>,
@@ -205,7 +207,7 @@ impl FilterArgs {
std::process::exit(1); std::process::exit(1);
})) }))
.collect(); .collect();
vec![Box::new(build_group_filter( let filter = build_group_filter(
genomes, genomes,
&ingroup_preds, &ingroup_preds,
&outgroup_preds, &outgroup_preds,
@@ -220,7 +222,11 @@ impl FilterArgs {
min_outgroup_frac: self.min_outgroup_frac, min_outgroup_frac: self.min_outgroup_frac,
max_outgroup_frac: self.max_outgroup_frac, max_outgroup_frac: self.max_outgroup_frac,
}, },
))] ).unwrap_or_else(|e| {
eprintln!("error in filter parameters: {e}");
std::process::exit(1);
});
vec![Box::new(filter)]
} }
} }
@@ -241,7 +247,7 @@ pub fn build_group_filter(
ingroup_preds: &[MetaPred], ingroup_preds: &[MetaPred],
outgroup_preds: &[MetaPred], outgroup_preds: &[MetaPred],
p: GroupFilterParams, p: GroupFilterParams,
) -> GroupQuorumFilter { ) -> Result<GroupQuorumFilter, String> {
let (ingroup_idx, outgroup_idx) = if ingroup_preds.is_empty() && outgroup_preds.is_empty() { let (ingroup_idx, outgroup_idx) = if ingroup_preds.is_empty() && outgroup_preds.is_empty() {
((0..genomes.len()).collect(), vec![]) ((0..genomes.len()).collect(), vec![])
} else { } else {
@@ -258,17 +264,52 @@ pub fn build_group_filter(
let in_size = ingroup_idx.len(); let in_size = ingroup_idx.len();
let out_size = outgroup_idx.len(); let out_size = outgroup_idx.len();
GroupQuorumFilter { let ingroup_quorum_explicit = p.min_count.is_some() || p.max_count.is_some()
|| p.min_frac.is_some() || p.max_frac.is_some();
let outgroup_quorum_explicit = p.min_outgroup_count.is_some() || p.max_outgroup_count.is_some()
|| p.min_outgroup_frac.is_some() || p.max_outgroup_frac.is_some();
let default_min_frac = if !ingroup_preds.is_empty() && !ingroup_quorum_explicit { 1.0 } else { 0.0 };
let default_max_outgroup_count = if !outgroup_preds.is_empty() && !outgroup_quorum_explicit { 0 } else { out_size };
let min_count = p.min_count.unwrap_or(0);
let max_count = p.max_count.unwrap_or(in_size);
let min_frac = p.min_frac.unwrap_or(default_min_frac);
let max_frac = p.max_frac.unwrap_or(1.0);
let min_outgroup_count = p.min_outgroup_count.unwrap_or(0);
let max_outgroup_count = p.max_outgroup_count.unwrap_or(default_max_outgroup_count);
let min_outgroup_frac = p.min_outgroup_frac.unwrap_or(0.0);
let max_outgroup_frac = p.max_outgroup_frac.unwrap_or(1.0);
for (v, lo, hi) in [
("--min-frac/--max-frac", min_frac, max_frac),
("--min-outgroup-frac/--max-outgroup-frac", min_outgroup_frac, max_outgroup_frac),
] {
if !(0.0..=1.0).contains(&lo) || !(0.0..=1.0).contains(&hi) {
return Err(format!("{v}: fraction values must be in [0.0, 1.0]"));
}
if lo > hi {
return Err(format!("{v}: min ({lo}) is greater than max ({hi})"));
}
}
if min_count > max_count {
return Err(format!("--min-count/--max-count: min ({min_count}) is greater than max ({max_count})"));
}
if min_outgroup_count > max_outgroup_count {
return Err(format!("--min-outgroup-count/--max-outgroup-count: min ({min_outgroup_count}) is greater than max ({max_outgroup_count})"));
}
Ok(GroupQuorumFilter {
ingroup_idx, ingroup_idx,
outgroup_idx, outgroup_idx,
threshold: p.threshold, threshold: p.threshold,
min_count: p.min_count.unwrap_or(0), min_count,
max_count: p.max_count.unwrap_or(in_size), max_count,
min_frac: p.min_frac.unwrap_or(0.0), min_frac,
max_frac: p.max_frac.unwrap_or(1.0), max_frac,
min_outgroup_count: p.min_outgroup_count.unwrap_or(0), min_outgroup_count,
max_outgroup_count: p.max_outgroup_count.unwrap_or(out_size), max_outgroup_count,
min_outgroup_frac: p.min_outgroup_frac.unwrap_or(0.0), min_outgroup_frac,
max_outgroup_frac: p.max_outgroup_frac.unwrap_or(1.0), max_outgroup_frac,
} })
} }
+1
View File
@@ -48,6 +48,7 @@ pub fn run(args: UnitigArgs) {
partition partition
.iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| { .iter_partition_kmers(i, use_counts, n_genomes, &filters, |kmer, _row| {
local_g.push(kmer); local_g.push(kmer);
true
}) })
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
eprintln!("error reading partition {i}: {e}"); eprintln!("error reading partition {i}: {e}");
+43 -18
View File
@@ -26,17 +26,19 @@ impl KmerPartition {
/// If no data matrix exists for a layer (pure set-membership, single genome), /// If no data matrix exists for a layer (pure set-membership, single genome),
/// a row of `n_genomes` ones is emitted for every kmer in that layer — unless /// a row of `n_genomes` ones is emitted for every kmer in that layer — unless
/// the filter rejects it, in which case the whole layer is skipped. /// the filter rejects it, in which case the whole layer is skipped.
/// Like [`iter_partition_kmers`] but the callback returns `false` to stop early.
/// Returns `Ok(true)` if all kmers were visited, `Ok(false)` if the callback halted.
pub fn iter_partition_kmers( pub fn iter_partition_kmers(
&self, &self,
part: usize, part: usize,
use_counts: bool, use_counts: bool,
n_genomes: usize, n_genomes: usize,
filters: &[Box<dyn KmerFilter>], filters: &[Box<dyn KmerFilter>],
mut cb: impl FnMut(CanonicalKmer, Box<[u32]>), mut cb: impl FnMut(CanonicalKmer, Box<[u32]>) -> bool,
) -> SKResult<()> { ) -> SKResult<bool> {
let index_dir = self.part_dir(part).join(INDEX_SUBDIR); let index_dir = self.part_dir(part).join(INDEX_SUBDIR);
if !index_dir.exists() { if !index_dir.exists() {
return Ok(()); return Ok(true);
} }
let index_mode = PartitionMeta::load(&index_dir) let index_mode = PartitionMeta::load(&index_dir)
@@ -54,56 +56,68 @@ impl KmerPartition {
let counts_dir = layer_dir.join("counts"); let counts_dir = layer_dir.join("counts");
let presence_dir = layer_dir.join("presence"); let presence_dir = layer_dir.join("presence");
if use_counts && counts_dir.exists() { let cont = if use_counts && counts_dir.exists() {
let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?;
let mut cont = true;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
let row = mat.row(slot); let row = mat.row(slot);
if passes_all(filters, &row, n_genomes) { if passes_all(filters, &row, n_genomes) {
cb(kmer, row); cont = cb(kmer, row);
if !cont { break; }
} }
} }
} }
cont
} else if !use_counts && presence_dir.exists() { } else if !use_counts && presence_dir.exists() {
let mat = PersistentBitMatrix::open(&layer_dir).map_err(SKError::Io)?; let mat = PersistentBitMatrix::open(&layer_dir).map_err(SKError::Io)?;
let mut cont = true;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect();
if passes_all(filters, &row, n_genomes) { if passes_all(filters, &row, n_genomes) {
cb(kmer, row); cont = cb(kmer, row);
if !cont { break; }
} }
} }
} }
cont
} else { } else {
// No data matrix: implicit presence — all values are 1. // No data matrix: implicit presence — all values are 1.
// The filter result is identical for every kmer, so evaluate once. // The filter result is identical for every kmer, so evaluate once.
let all_present: Box<[u32]> = vec![1u32; n_genomes].into(); let all_present: Box<[u32]> = vec![1u32; n_genomes].into();
let mut cont = true;
if passes_all(filters, &all_present, n_genomes) { if passes_all(filters, &all_present, n_genomes) {
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if mphf.find(kmer).is_some() { if mphf.find(kmer).is_some() {
cb(kmer, all_present.clone()); cont = cb(kmer, all_present.clone());
if !cont { break; }
} }
} }
} }
} cont
};
if !cont { return Ok(false); }
} }
Ok(()) Ok(true)
} }
/// Like [`iter_partition_kmers`] but the callback also receives `(partition, layer)` /// Like [`iter_partition_kmers`] but the callback also receives `(partition, layer)`
/// indices, enabling debug output that identifies where each kmer was stored. /// indices, enabling debug output that identifies where each kmer was stored.
/// Returns `Ok(true)` if all kmers were visited, `Ok(false)` if the callback halted.
pub fn iter_partition_kmers_located( pub fn iter_partition_kmers_located(
&self, &self,
part: usize, part: usize,
use_counts: bool, use_counts: bool,
n_genomes: usize, n_genomes: usize,
filters: &[Box<dyn KmerFilter>], filters: &[Box<dyn KmerFilter>],
mut cb: impl FnMut(usize, usize, CanonicalKmer, Box<[u32]>), mut cb: impl FnMut(usize, usize, CanonicalKmer, Box<[u32]>) -> bool,
) -> SKResult<()> { ) -> SKResult<bool> {
let index_dir = self.part_dir(part).join(INDEX_SUBDIR); let index_dir = self.part_dir(part).join(INDEX_SUBDIR);
if !index_dir.exists() { if !index_dir.exists() {
return Ok(()); return Ok(true);
} }
let index_mode = PartitionMeta::load(&index_dir) let index_mode = PartitionMeta::load(&index_dir)
@@ -120,39 +134,50 @@ impl KmerPartition {
let counts_dir = layer_dir.join("counts"); let counts_dir = layer_dir.join("counts");
let presence_dir = layer_dir.join("presence"); let presence_dir = layer_dir.join("presence");
if use_counts && counts_dir.exists() { let cont = if use_counts && counts_dir.exists() {
let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?; let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?;
let mut cont = true;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
let row = mat.row(slot); let row = mat.row(slot);
if passes_all(filters, &row, n_genomes) { if passes_all(filters, &row, n_genomes) {
cb(part, layer, kmer, row); cont = cb(part, layer, kmer, row);
if !cont { break; }
} }
} }
} }
cont
} else if !use_counts && presence_dir.exists() { } else if !use_counts && presence_dir.exists() {
let mat = PersistentBitMatrix::open(&layer_dir).map_err(SKError::Io)?; let mat = PersistentBitMatrix::open(&layer_dir).map_err(SKError::Io)?;
let mut cont = true;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) { if let Some(slot) = mphf.find(kmer) {
let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect();
if passes_all(filters, &row, n_genomes) { if passes_all(filters, &row, n_genomes) {
cb(part, layer, kmer, row); cont = cb(part, layer, kmer, row);
if !cont { break; }
} }
} }
} }
cont
} else { } else {
let all_present: Box<[u32]> = vec![1u32; n_genomes].into(); let all_present: Box<[u32]> = vec![1u32; n_genomes].into();
let mut cont = true;
if passes_all(filters, &all_present, n_genomes) { if passes_all(filters, &all_present, n_genomes) {
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if mphf.find(kmer).is_some() { if mphf.find(kmer).is_some() {
cb(part, layer, kmer, all_present.clone()); cont = cb(part, layer, kmer, all_present.clone());
if !cont { break; }
} }
} }
} }
} cont
};
if !cont { return Ok(false); }
layer += 1; layer += 1;
} }
Ok(()) Ok(true)
} }
} }