refactor: aggregate query results at sequence level

Refactor the query pipeline to buffer partition outputs into a per-sequence `seq_results` vector, deferring final accumulation until all partitions have completed. This ensures that results are in global position order before k-mer presence, counts, and coverage statistics are computed. Additionally, this commit removes a resolved TODO and documents a known BLAST false-positive issue in which chloroplast and bacterial contaminants yield unrealistically high-confidence matches.
This commit is contained in:
Eric Coissac
2026-05-30 07:16:23 +02:00
parent 3138f6382c
commit 708b0abf9b
2 changed files with 89 additions and 38 deletions
+55 -38
View File
@@ -230,16 +230,11 @@ fn process_chunk(
let batch = QueryBatch::from_records(records, k, 6, 0.7);
let n_seqs = batch.ids.len();
let mut accs: Vec<SeqAcc> =
(0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect();
let mut cov: Vec<Vec<Vec<u32>>> = if detail {
batch.n_kmers.iter()
.map(|&n| vec![vec![0u32; n as usize]; n_genomes])
.collect()
} else {
Vec::new()
};
// Per-sequence s-mer result vectors in global sequence position order.
// All partitions fill into this structure before Findere is applied.
let mut seq_results: Vec<Vec<Option<Box<[u32]>>>> = batch.n_kmers.iter()
.map(|&n| vec![None; n as usize])
.collect();
let by_part = batch.split_by_partition(n_partitions);
@@ -256,38 +251,60 @@ fn process_chunk(
std::process::exit(1);
});
let presence = force_presence || !with_counts;
let threshold = presence_threshold;
for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) {
let filtered = apply_findere(sk_kmer_results, effective_z, n_genomes);
let descs = batch.map.get(*rsk).expect("rsk must be in map");
for desc in descs {
let acc = &mut accs[desc.seq_idx as usize];
let offset = desc.kmer_offset as usize;
let dst = &mut seq_results[desc.seq_idx as usize];
for (j, hit) in sk_kmer_results.iter().enumerate() {
dst[offset + j] = hit.as_ref().map(|r| r.clone());
}
}
}
}
for (local_pos, hit) in filtered.iter().enumerate() {
match hit {
None => {
if sk_kmer_results[local_pos].is_none() {
acc.kmer_missing += 1;
}
}
Some(row) => {
acc.kmer_count += 1;
for (g, &v) in row.iter().enumerate() {
if v == 0 { continue; }
let contribution = if presence {
u32::from(v >= threshold)
} else {
v
};
acc.genome_totals[g] += contribution;
if detail {
let abs_pos = desc.kmer_offset as usize + local_pos;
cov[desc.seq_idx as usize][g][abs_pos] += contribution;
}
}
// Apply Findere on each complete sequence vector, then accumulate.
let n_kmers_out: Vec<usize> = batch.n_kmers.iter()
.map(|&n| { let n = n as usize; if n >= effective_z { n - effective_z + 1 } else { 0 } })
.collect();
let mut accs: Vec<SeqAcc> =
(0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect();
let mut cov: Vec<Vec<Vec<u32>>> = if detail {
n_kmers_out.iter()
.map(|&n| vec![vec![0u32; n]; n_genomes])
.collect()
} else {
Vec::new()
};
let presence = force_presence || !with_counts;
let threshold = presence_threshold;
for seq_idx in 0..n_seqs {
let filtered = apply_findere(&seq_results[seq_idx], effective_z, n_genomes);
let acc = &mut accs[seq_idx];
for (pos, hit) in filtered.iter().enumerate() {
match hit {
None => {
if seq_results[seq_idx][pos].is_none() {
acc.kmer_missing += 1;
}
}
Some(row) => {
acc.kmer_count += 1;
for (g, &v) in row.iter().enumerate() {
if v == 0 { continue; }
let contribution = if presence {
u32::from(v >= threshold)
} else {
v
};
acc.genome_totals[g] += contribution;
if detail {
cov[seq_idx][g][pos] += contribution;
}
}
}