refactor(query): parallelize query execution with obipipeline

Extracts chunk processing into a dedicated function and introduces a QueryData enum with unsafe Send/Sync implementations to safely distribute Rope chunks across worker threads. Replaces nested iteration with a flat iterator and parallel block processing. Adds CLI argument parsing for presence, threshold, and detail flags to configure the pipeline.
This commit is contained in:
Eric Coissac
2026-05-29 07:24:48 +02:00
parent eaa52eaab5
commit be0e8f1041
+114 -72
View File
@@ -1,16 +1,30 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{self, BufWriter, Write}; use std::io::{self, BufWriter, Write};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use clap::Args; use clap::Args;
use obikindex::KmerIndex; use obikindex::KmerIndex;
use obilayeredmap::IndexMode; use obilayeredmap::IndexMode;
use obiread::record::{SeqRecord, parse_chunk};
use obiread::chunk::read_sequence_chunks; use obiread::chunk::read_sequence_chunks;
use obiread::record::{SeqRecord, parse_chunk};
use obikrope::Rope;
use obikseq::{RoutableSuperKmer, set_k, set_m}; use obikseq::{RoutableSuperKmer, set_k, set_m};
use obiskbuilder::SuperKmerIter; use obiskbuilder::SuperKmerIter;
use tracing::info; use tracing::info;
// ── Pipeline data ─────────────────────────────────────────────────────────────
enum QueryData {
Chunk(Rope),
Output(Vec<u8>),
}
// SAFETY: Rope contains Cell<u8> which is !Sync, but pipeline items are owned
// exclusively through channels — no item is ever shared across threads.
unsafe impl Send for QueryData {}
unsafe impl Sync for QueryData {}
// ── CLI ─────────────────────────────────────────────────────────────────────── // ── CLI ───────────────────────────────────────────────────────────────────────
#[derive(Args)] #[derive(Args)]
@@ -146,14 +160,6 @@ impl SeqAcc {
// ── Findere z-window filter ─────────────────────────────────────────────────── // ── Findere z-window filter ───────────────────────────────────────────────────
/// Apply the Findere z-window filter to per-kmer query results for one superkmer. /// Apply the Findere z-window filter to per-kmer query results for one superkmer.
///
/// A k-mer at position i for genome g is confirmed only if it belongs to at least
/// one run of z consecutive positions where all k-mers are present for g.
/// Unconfirmed positions are zeroed; positions whose entire row becomes zero are
/// returned as `None`.
///
/// When z <= 1 or the superkmer is shorter than z k-mers, results are returned
/// unchanged (short superkmers cannot satisfy the z-window constraint).
fn apply_findere( fn apply_findere(
results: &[Option<Box<[u32]>>], results: &[Option<Box<[u32]>>],
z: usize, z: usize,
@@ -205,58 +211,24 @@ fn apply_findere(
}).collect() }).collect()
} }
// ── Entry point ─────────────────────────────────────────────────────────────── // ── process_chunk ─────────────────────────────────────────────────────────────
pub fn run(args: QueryArgs) { fn process_chunk(
let idx = KmerIndex::open(&args.index).unwrap_or_else(|e| { idx: &KmerIndex,
eprintln!("error opening index: {e}"); rope: Rope,
std::process::exit(1); k: usize,
}); n_genomes: usize,
n_partitions: usize,
set_k(idx.kmer_size()); with_counts: bool,
set_m(idx.minimizer_size()); effective_z: usize,
detail: bool,
let k = idx.kmer_size(); count_missing: bool,
let n_genomes = idx.meta().genomes.len(); force_presence: bool,
let n_partitions = idx.n_partitions(); presence_threshold: u32,
let with_counts = idx.meta().config.with_counts; ) -> Vec<u8> {
let records = parse_chunk(&rope, k);
let effective_z: usize = args.findere_z.unwrap_or_else(|| {
match idx.meta().config.evidence {
IndexMode::Approx { z, .. } | IndexMode::Hybrid { z, .. } => z as usize,
IndexMode::Exact => 1,
}
});
info!(
"query: k={k}, {} genome(s), with_counts={with_counts}, z={effective_z}, \
mismatch={}, detail={}",
n_genomes, args.mismatch, args.detail
);
if args.mismatch {
eprintln!("warning: --mismatch not yet implemented, ignored");
}
let paths: Vec<PathBuf> = args.inputs.iter().map(PathBuf::from).collect();
let mut out = BufWriter::new(io::stdout());
for path in &paths {
let chunks = read_sequence_chunks(path.to_str().unwrap_or(""))
.unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", path.display());
std::process::exit(1);
});
for chunk_result in chunks {
let chunk = chunk_result.unwrap_or_else(|e| {
eprintln!("read error: {e}");
std::process::exit(1);
});
let records = parse_chunk(&chunk, k);
if records.is_empty() { if records.is_empty() {
continue; return Vec::new();
} }
let batch = QueryBatch::from_records(records, k, 6, 0.7); let batch = QueryBatch::from_records(records, k, 6, 0.7);
@@ -265,8 +237,7 @@ pub fn run(args: QueryArgs) {
let mut accs: Vec<SeqAcc> = let mut accs: Vec<SeqAcc> =
(0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect(); (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect();
// [seq_idx][genome_idx][kmer_position] — allocated only with --detail let mut cov: Vec<Vec<Vec<u32>>> = if detail {
let mut cov: Vec<Vec<Vec<u32>>> = if args.detail {
batch.n_kmers.iter() batch.n_kmers.iter()
.map(|&n| vec![vec![0u32; n as usize]; n_genomes]) .map(|&n| vec![vec![0u32; n as usize]; n_genomes])
.collect() .collect()
@@ -289,8 +260,8 @@ pub fn run(args: QueryArgs) {
std::process::exit(1); std::process::exit(1);
}); });
let presence = args.force_presence || !with_counts; let presence = force_presence || !with_counts;
let threshold = args.presence_threshold; let threshold = presence_threshold;
for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) { for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) {
let filtered = apply_findere(sk_kmer_results, effective_z, n_genomes); let filtered = apply_findere(sk_kmer_results, effective_z, n_genomes);
@@ -302,7 +273,6 @@ pub fn run(args: QueryArgs) {
for (local_pos, hit) in filtered.iter().enumerate() { for (local_pos, hit) in filtered.iter().enumerate() {
match hit { match hit {
None => { None => {
// Only truly missing if the index also had no entry.
if sk_kmer_results[local_pos].is_none() { if sk_kmer_results[local_pos].is_none() {
acc.kmer_missing += 1; acc.kmer_missing += 1;
} }
@@ -310,16 +280,14 @@ pub fn run(args: QueryArgs) {
Some(row) => { Some(row) => {
acc.kmer_count += 1; acc.kmer_count += 1;
for (g, &v) in row.iter().enumerate() { for (g, &v) in row.iter().enumerate() {
if v == 0 { if v == 0 { continue; }
continue;
}
let contribution = if presence { let contribution = if presence {
u32::from(v >= threshold) u32::from(v >= threshold)
} else { } else {
v v
}; };
acc.genome_totals[g] += contribution; acc.genome_totals[g] += contribution;
if args.detail { if detail {
let abs_pos = desc.kmer_offset as usize + local_pos; let abs_pos = desc.kmer_offset as usize + local_pos;
cov[desc.seq_idx as usize][g][abs_pos] += contribution; cov[desc.seq_idx as usize][g][abs_pos] += contribution;
} }
@@ -331,13 +299,87 @@ pub fn run(args: QueryArgs) {
} }
} }
emit_batch( let mut buf = Vec::new();
&batch, &accs, idx.meta(), emit_batch(&batch, &accs, idx.meta(), count_missing, detail, &cov, &mut buf);
args.count_missing, args.detail, &cov, buf
&mut out, }
// ── Entry point ───────────────────────────────────────────────────────────────
pub fn run(args: QueryArgs) {
let idx = Arc::new(KmerIndex::open(&args.index).unwrap_or_else(|e| {
eprintln!("error opening index: {e}");
std::process::exit(1);
}));
set_k(idx.kmer_size());
set_m(idx.minimizer_size());
let k = idx.kmer_size();
let n_genomes = idx.meta().genomes.len();
let n_partitions = idx.n_partitions();
let with_counts = idx.meta().config.with_counts;
let n_workers = args.threads.max(1);
let effective_z: usize = args.findere_z.unwrap_or_else(|| {
match idx.meta().config.evidence {
IndexMode::Approx { z, .. } | IndexMode::Hybrid { z, .. } => z as usize,
IndexMode::Exact => 1,
}
});
info!(
"query: k={k}, {} genome(s), with_counts={with_counts}, z={effective_z}, \
mismatch={}, detail={}",
n_genomes, args.mismatch, args.detail
); );
if args.mismatch {
eprintln!("warning: --mismatch not yet implemented, ignored");
}
let detail = args.detail;
let count_missing = args.count_missing;
let force_presence = args.force_presence;
let presence_threshold = args.presence_threshold;
// Flat iterator over all Rope chunks from all input files.
// I/O runs in the source thread; chunk processing is parallelised by the pipe.
let paths: Vec<PathBuf> = args.inputs.iter().map(PathBuf::from).collect();
let all_chunks = paths.into_iter().flat_map(|path| {
let path_str = path.to_str().unwrap_or("").to_owned();
match read_sequence_chunks(&path_str) {
Ok(iter) => Box::new(iter.filter_map(|r| match r {
Ok(rope) => Some(rope),
Err(e) => { eprintln!("read error: {e}"); None }
})) as Box<dyn Iterator<Item = Rope> + Send>,
Err(e) => {
eprintln!("error opening {path_str}: {e}");
std::process::exit(1);
} }
} }
});
let pipe = obipipeline::make_pipe! {
QueryData : Rope => Vec<u8>,
| {
let idx = Arc::clone(&idx);
move |rope: Rope| {
process_chunk(
&idx, rope, k, n_genomes, n_partitions, with_counts, effective_z,
detail, count_missing, force_presence, presence_threshold,
)
}
} : Chunk => Output,
};
let mut out = BufWriter::new(io::stdout());
for block in pipe.apply(all_chunks, n_workers, 2) {
if !block.is_empty() {
out.write_all(&block).expect("write error");
}
}
out.flush().expect("flush error");
} }
// ── Output ──────────────────────────────────────────────────────────────────── // ── Output ────────────────────────────────────────────────────────────────────