feat: add Findere z-window filtering and detail mode coverage tracking

Introduces the `--findere-z` CLI flag to override the index's sliding window parameter and implements `apply_findere` to filter k-mer hits using a z-consecutive positions window. Adds structural support for `--detail` mode, including per-sequence k-mer offsets, conditional allocation of per-position coverage vectors, and JSON serialization. Updates architecture documentation, CLI references, and annotation schemas to align with the new implementation, resolving prior discrepancies with `--detail` and `--mismatch` flags.
This commit is contained in:
Eric Coissac
2026-05-26 15:25:57 +02:00
parent 26ab165807
commit 694da5208e
2 changed files with 218 additions and 73 deletions
+139 -35
View File
@@ -4,6 +4,7 @@ use std::path::PathBuf;
use clap::Args;
use obikindex::KmerIndex;
use obilayeredmap::IndexMode;
use obiread::record::{SeqRecord, parse_chunk};
use obiread::chunk::read_sequence_chunks;
use obikseq::{RoutableSuperKmer, set_k, set_m};
@@ -21,7 +22,7 @@ pub struct QueryArgs {
#[arg(num_args = 1..)]
pub inputs: Vec<String>,
/// Also report per-position coverage vectors (cov_<i> per genome)
/// Report per-position coverage vectors per genome (adds "coverage" to JSON)
#[arg(long)]
pub detail: bool,
@@ -41,6 +42,10 @@ pub struct QueryArgs {
#[arg(long, default_value_t = 1)]
pub presence_threshold: u32,
/// Override the Findere z parameter from index metadata
#[arg(short = 'z', long)]
pub findere_z: Option<usize>,
/// Number of worker threads
#[arg(
short = 'T',
@@ -59,25 +64,18 @@ pub struct SKDesc {
/// Index of the source sequence within the batch.
pub seq_idx: u32,
/// Kmer offset of the first kmer of this superkmer within its sequence.
/// Reserved for `--detail` coverage vectors (not yet consumed).
#[allow(dead_code)]
pub kmer_offset: u32,
}
// ── QueryBatch ────────────────────────────────────────────────────────────────
/// A batch of query sequences with their superkmers deduplicated.
///
/// Each unique `RoutableSuperKmer` maps to all the (seq_idx, kmer_offset)
/// positions it occupies across the batch. The superkmer is queried once
/// per partition; the result is broadcast to every SKDesc entry.
pub struct QueryBatch {
/// Sequence ids in batch order.
pub ids: Vec<String>,
/// Raw sequence bytes (for output), in batch order.
pub seqs: Vec<Vec<u8>>,
/// Per-sequence total kmer count. Reserved for `--detail` (not yet consumed).
#[allow(dead_code)]
/// Total kmer count per sequence (used for `--detail` coverage allocation).
pub n_kmers: Vec<u32>,
/// Deduplicated superkmer map.
pub map: HashMap<RoutableSuperKmer, Vec<SKDesc>>,
@@ -91,8 +89,8 @@ impl QueryBatch {
level_max: usize,
theta: f64,
) -> Self {
let mut ids = Vec::with_capacity(records.len());
let mut seqs = Vec::with_capacity(records.len());
let mut ids = Vec::with_capacity(records.len());
let mut seqs = Vec::with_capacity(records.len());
let mut n_kmers = Vec::with_capacity(records.len());
let mut map: HashMap<RoutableSuperKmer, Vec<SKDesc>> = HashMap::new();
@@ -116,9 +114,6 @@ impl QueryBatch {
}
/// Split the superkmer map by partition index.
///
/// Returns a vec of length `n_partitions`; each slot holds the RSK refs
/// whose minimizer routes to that partition.
pub fn split_by_partition(&self, n_partitions: usize) -> Vec<Vec<&RoutableSuperKmer>> {
let mask = (n_partitions as u64) - 1;
let mut by_part: Vec<Vec<&RoutableSuperKmer>> = vec![Vec::new(); n_partitions];
@@ -133,22 +128,83 @@ impl QueryBatch {
// ── Per-sequence accumulator ──────────────────────────────────────────────────
struct SeqAcc {
kmer_count: u32,
kmer_missing: u32,
/// Per-genome accumulated count (count mode) or presence sum (presence mode).
kmer_count: u32,
kmer_missing: u32,
genome_totals: Vec<u32>,
}
impl SeqAcc {
fn new(n_genomes: usize) -> Self {
Self {
kmer_count: 0,
kmer_missing: 0,
kmer_count: 0,
kmer_missing: 0,
genome_totals: vec![0u32; n_genomes],
}
}
}
// ── Findere z-window filter ───────────────────────────────────────────────────
/// Apply the Findere z-window filter to per-kmer query results for one superkmer.
///
/// A k-mer at position i for genome g is confirmed only if it belongs to at least
/// one run of z consecutive positions where all k-mers are present for g.
/// Unconfirmed positions are zeroed; positions whose entire row becomes zero are
/// returned as `None`.
///
/// When z <= 1 or the superkmer is shorter than z k-mers, results are returned
/// unchanged (short superkmers cannot satisfy the z-window constraint).
fn apply_findere(
results: &[Option<Box<[u32]>>],
z: usize,
n_genomes: usize,
) -> Vec<Option<Box<[u32]>>> {
let n = results.len();
if z <= 1 || n < z {
return results.iter().map(|r| r.as_ref().map(|row| row.clone())).collect();
}
let mut confirmed = vec![vec![false; n_genomes]; n];
for g in 0..n_genomes {
let present: Vec<bool> = results
.iter()
.map(|r| r.as_ref().map_or(false, |row| row[g] > 0))
.collect();
let mut window_count = present[..z].iter().filter(|&&p| p).count();
if window_count == z {
for c in confirmed[..z].iter_mut() {
c[g] = true;
}
}
for j in 1..=(n - z) {
if present[j - 1] { window_count -= 1; }
if present[j + z - 1] { window_count += 1; }
if window_count == z {
for c in confirmed[j..j + z].iter_mut() {
c[g] = true;
}
}
}
}
results.iter().enumerate().map(|(i, opt)| {
let row = opt.as_ref()?;
let mut new_row: Box<[u32]> = row.clone();
let mut any = false;
for g in 0..n_genomes {
if !confirmed[i][g] {
new_row[g] = 0;
} else {
any = true;
}
}
if any { Some(new_row) } else { None }
}).collect()
}
// ── Entry point ───────────────────────────────────────────────────────────────
pub fn run(args: QueryArgs) {
@@ -165,17 +221,22 @@ pub fn run(args: QueryArgs) {
let n_partitions = idx.n_partitions();
let with_counts = idx.meta().config.with_counts;
let effective_z: usize = args.findere_z.unwrap_or_else(|| {
match idx.meta().config.evidence {
IndexMode::Approx { z, .. } | IndexMode::Hybrid { z, .. } => z as usize,
IndexMode::Exact => 1,
}
});
info!(
"query: k={k}, {} genome(s), with_counts={with_counts}, mismatch={}, detail={}",
"query: k={k}, {} genome(s), with_counts={with_counts}, z={effective_z}, \
mismatch={}, detail={}",
n_genomes, args.mismatch, args.detail
);
if args.mismatch {
eprintln!("warning: --mismatch not yet implemented, ignored");
}
if args.detail {
eprintln!("warning: --detail not yet implemented, ignored");
}
let paths: Vec<PathBuf> = args.inputs.iter().map(PathBuf::from).collect();
let mut out = BufWriter::new(io::stdout());
@@ -198,10 +259,20 @@ pub fn run(args: QueryArgs) {
continue;
}
let batch = QueryBatch::from_records(records, k, 6, 0.7);
let batch = QueryBatch::from_records(records, k, 6, 0.7);
let n_seqs = batch.ids.len();
let mut accs: Vec<SeqAcc> = (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect();
let mut accs: Vec<SeqAcc> =
(0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect();
// [seq_idx][genome_idx][kmer_position] — allocated only with --detail
let mut cov: Vec<Vec<Vec<u32>>> = if args.detail {
batch.n_kmers.iter()
.map(|&n| vec![vec![0u32; n as usize]; n_genomes])
.collect()
} else {
Vec::new()
};
let by_part = batch.split_by_partition(n_partitions);
@@ -218,24 +289,40 @@ pub fn run(args: QueryArgs) {
std::process::exit(1);
});
let presence = args.force_presence || !with_counts;
let threshold = args.presence_threshold;
let presence = args.force_presence || !with_counts;
let threshold = args.presence_threshold;
for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) {
let filtered = apply_findere(sk_kmer_results, effective_z, n_genomes);
let descs = batch.map.get(*rsk).expect("rsk must be in map");
for desc in descs {
let acc = &mut accs[desc.seq_idx as usize];
for hit in sk_kmer_results.iter() {
for (local_pos, hit) in filtered.iter().enumerate() {
match hit {
None => acc.kmer_missing += 1,
None => {
// Only truly missing if the index also had no entry.
if sk_kmer_results[local_pos].is_none() {
acc.kmer_missing += 1;
}
}
Some(row) => {
acc.kmer_count += 1;
for (g, &v) in row.iter().enumerate() {
acc.genome_totals[g] += if presence {
if v == 0 {
continue;
}
let contribution = if presence {
u32::from(v >= threshold)
} else {
v
};
acc.genome_totals[g] += contribution;
if args.detail {
let abs_pos = desc.kmer_offset as usize + local_pos;
cov[desc.seq_idx as usize][g][abs_pos] += contribution;
}
}
}
}
@@ -244,7 +331,11 @@ pub fn run(args: QueryArgs) {
}
}
emit_batch(&batch, &accs, idx.meta(), args.count_missing, &mut out);
emit_batch(
&batch, &accs, idx.meta(),
args.count_missing, args.detail, &cov,
&mut out,
);
}
}
}
@@ -252,11 +343,13 @@ pub fn run(args: QueryArgs) {
// ── Output ────────────────────────────────────────────────────────────────────
fn emit_batch(
batch: &QueryBatch,
accs: &[SeqAcc],
meta: &obikindex::meta::IndexMeta,
batch: &QueryBatch,
accs: &[SeqAcc],
meta: &obikindex::meta::IndexMeta,
count_missing: bool,
out: &mut impl Write,
detail: bool,
cov: &[Vec<Vec<u32>>],
out: &mut impl Write,
) {
for (seq_idx, (id, seq)) in batch.ids.iter().zip(batch.seqs.iter()).enumerate() {
let acc = &accs[seq_idx];
@@ -266,12 +359,23 @@ fn emit_batch(
if count_missing {
ann.insert("kmer_missing".into(), acc.kmer_missing.into());
}
let mut match_map = serde_json::Map::new();
for (g, genome) in meta.genomes.iter().enumerate() {
match_map.insert(genome.label.clone(), acc.genome_totals[g].into());
}
ann.insert("kmer_strict_matches".into(), match_map.into());
if detail && !cov.is_empty() {
let mut cov_map = serde_json::Map::new();
for (g, genome) in meta.genomes.iter().enumerate() {
let v: Vec<serde_json::Value> =
cov[seq_idx][g].iter().map(|&x| x.into()).collect();
cov_map.insert(genome.label.clone(), v.into());
}
ann.insert("coverage".into(), cov_map.into());
}
let ann_str = serde_json::to_string(&ann).unwrap_or_else(|_| "{}".to_string());
// OBITools4 FASTA format: >id {"key":value,...}