diff --git a/src/obikmer/src/cmd/query.rs b/src/obikmer/src/cmd/query.rs index f86ec02..beb303e 100644 --- a/src/obikmer/src/cmd/query.rs +++ b/src/obikmer/src/cmd/query.rs @@ -1,16 +1,30 @@ use std::collections::HashMap; use std::io::{self, BufWriter, Write}; use std::path::PathBuf; +use std::sync::Arc; use clap::Args; use obikindex::KmerIndex; use obilayeredmap::IndexMode; -use obiread::record::{SeqRecord, parse_chunk}; use obiread::chunk::read_sequence_chunks; +use obiread::record::{SeqRecord, parse_chunk}; +use obikrope::Rope; use obikseq::{RoutableSuperKmer, set_k, set_m}; use obiskbuilder::SuperKmerIter; use tracing::info; +// ── Pipeline data ───────────────────────────────────────────────────────────── + +enum QueryData { + Chunk(Rope), + Output(Vec), +} + +// SAFETY: Rope contains Cell which is !Sync, but pipeline items are owned +// exclusively through channels — no item is ever shared across threads. +unsafe impl Send for QueryData {} +unsafe impl Sync for QueryData {} + // ── CLI ─────────────────────────────────────────────────────────────────────── #[derive(Args)] @@ -146,14 +160,6 @@ impl SeqAcc { // ── Findere z-window filter ─────────────────────────────────────────────────── /// Apply the Findere z-window filter to per-kmer query results for one superkmer. -/// -/// A k-mer at position i for genome g is confirmed only if it belongs to at least -/// one run of z consecutive positions where all k-mers are present for g. -/// Unconfirmed positions are zeroed; positions whose entire row becomes zero are -/// returned as `None`. -/// -/// When z <= 1 or the superkmer is shorter than z k-mers, results are returned -/// unchanged (short superkmers cannot satisfy the z-window constraint). fn apply_findere( results: &[Option>], z: usize, @@ -205,13 +211,106 @@ fn apply_findere( }).collect() } +// ── process_chunk ───────────────────────────────────────────────────────────── + +fn process_chunk( + idx: &KmerIndex, + rope: Rope, + k: usize, + n_genomes: usize, + n_partitions: usize, + with_counts: bool, + effective_z: usize, + detail: bool, + count_missing: bool, + force_presence: bool, + presence_threshold: u32, +) -> Vec { + let records = parse_chunk(&rope, k); + if records.is_empty() { + return Vec::new(); + } + + let batch = QueryBatch::from_records(records, k, 6, 0.7); + let n_seqs = batch.ids.len(); + + let mut accs: Vec = + (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect(); + + let mut cov: Vec>> = if detail { + batch.n_kmers.iter() + .map(|&n| vec![vec![0u32; n as usize]; n_genomes]) + .collect() + } else { + Vec::new() + }; + + let by_part = batch.split_by_partition(n_partitions); + + for (part_idx, part_sks) in by_part.iter().enumerate() { + if part_sks.is_empty() { + continue; + } + + let kmer_results = idx + .partition() + .query_partition(part_idx, part_sks, k, n_genomes, with_counts) + .unwrap_or_else(|e| { + eprintln!("query error on partition {part_idx}: {e}"); + std::process::exit(1); + }); + + let presence = force_presence || !with_counts; + let threshold = presence_threshold; + + for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) { + let filtered = apply_findere(sk_kmer_results, effective_z, n_genomes); + let descs = batch.map.get(*rsk).expect("rsk must be in map"); + + for desc in descs { + let acc = &mut accs[desc.seq_idx as usize]; + + for (local_pos, hit) in filtered.iter().enumerate() { + match hit { + None => { + if sk_kmer_results[local_pos].is_none() { + acc.kmer_missing += 1; + } + } + Some(row) => { + acc.kmer_count += 1; + for (g, &v) in row.iter().enumerate() { + if v == 0 { continue; } + let contribution = if presence { + u32::from(v >= threshold) + } else { + v + }; + acc.genome_totals[g] += contribution; + if detail { + let abs_pos = desc.kmer_offset as usize + local_pos; + cov[desc.seq_idx as usize][g][abs_pos] += contribution; + } + } + } + } + } + } + } + } + + let mut buf = Vec::new(); + emit_batch(&batch, &accs, idx.meta(), count_missing, detail, &cov, &mut buf); + buf +} + // ── Entry point ─────────────────────────────────────────────────────────────── pub fn run(args: QueryArgs) { - let idx = KmerIndex::open(&args.index).unwrap_or_else(|e| { + let idx = Arc::new(KmerIndex::open(&args.index).unwrap_or_else(|e| { eprintln!("error opening index: {e}"); std::process::exit(1); - }); + })); set_k(idx.kmer_size()); set_m(idx.minimizer_size()); @@ -220,6 +319,7 @@ pub fn run(args: QueryArgs) { let n_genomes = idx.meta().genomes.len(); let n_partitions = idx.n_partitions(); let with_counts = idx.meta().config.with_counts; + let n_workers = args.threads.max(1); let effective_z: usize = args.findere_z.unwrap_or_else(|| { match idx.meta().config.evidence { @@ -238,106 +338,48 @@ pub fn run(args: QueryArgs) { eprintln!("warning: --mismatch not yet implemented, ignored"); } + let detail = args.detail; + let count_missing = args.count_missing; + let force_presence = args.force_presence; + let presence_threshold = args.presence_threshold; + + // Flat iterator over all Rope chunks from all input files. + // I/O runs in the source thread; chunk processing is parallelised by the pipe. let paths: Vec = args.inputs.iter().map(PathBuf::from).collect(); + let all_chunks = paths.into_iter().flat_map(|path| { + let path_str = path.to_str().unwrap_or("").to_owned(); + match read_sequence_chunks(&path_str) { + Ok(iter) => Box::new(iter.filter_map(|r| match r { + Ok(rope) => Some(rope), + Err(e) => { eprintln!("read error: {e}"); None } + })) as Box + Send>, + Err(e) => { + eprintln!("error opening {path_str}: {e}"); + std::process::exit(1); + } + } + }); + + let pipe = obipipeline::make_pipe! { + QueryData : Rope => Vec, + | { + let idx = Arc::clone(&idx); + move |rope: Rope| { + process_chunk( + &idx, rope, k, n_genomes, n_partitions, with_counts, effective_z, + detail, count_missing, force_presence, presence_threshold, + ) + } + } : Chunk => Output, + }; + let mut out = BufWriter::new(io::stdout()); - - for path in &paths { - let chunks = read_sequence_chunks(path.to_str().unwrap_or("")) - .unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", path.display()); - std::process::exit(1); - }); - - for chunk_result in chunks { - let chunk = chunk_result.unwrap_or_else(|e| { - eprintln!("read error: {e}"); - std::process::exit(1); - }); - - let records = parse_chunk(&chunk, k); - if records.is_empty() { - continue; - } - - let batch = QueryBatch::from_records(records, k, 6, 0.7); - let n_seqs = batch.ids.len(); - - let mut accs: Vec = - (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect(); - - // [seq_idx][genome_idx][kmer_position] — allocated only with --detail - let mut cov: Vec>> = if args.detail { - batch.n_kmers.iter() - .map(|&n| vec![vec![0u32; n as usize]; n_genomes]) - .collect() - } else { - Vec::new() - }; - - let by_part = batch.split_by_partition(n_partitions); - - for (part_idx, part_sks) in by_part.iter().enumerate() { - if part_sks.is_empty() { - continue; - } - - let kmer_results = idx - .partition() - .query_partition(part_idx, part_sks, k, n_genomes, with_counts) - .unwrap_or_else(|e| { - eprintln!("query error on partition {part_idx}: {e}"); - std::process::exit(1); - }); - - let presence = args.force_presence || !with_counts; - let threshold = args.presence_threshold; - - for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) { - let filtered = apply_findere(sk_kmer_results, effective_z, n_genomes); - let descs = batch.map.get(*rsk).expect("rsk must be in map"); - - for desc in descs { - let acc = &mut accs[desc.seq_idx as usize]; - - for (local_pos, hit) in filtered.iter().enumerate() { - match hit { - None => { - // Only truly missing if the index also had no entry. - if sk_kmer_results[local_pos].is_none() { - acc.kmer_missing += 1; - } - } - Some(row) => { - acc.kmer_count += 1; - for (g, &v) in row.iter().enumerate() { - if v == 0 { - continue; - } - let contribution = if presence { - u32::from(v >= threshold) - } else { - v - }; - acc.genome_totals[g] += contribution; - if args.detail { - let abs_pos = desc.kmer_offset as usize + local_pos; - cov[desc.seq_idx as usize][g][abs_pos] += contribution; - } - } - } - } - } - } - } - } - - emit_batch( - &batch, &accs, idx.meta(), - args.count_missing, args.detail, &cov, - &mut out, - ); + for block in pipe.apply(all_chunks, n_workers, 2) { + if !block.is_empty() { + out.write_all(&block).expect("write error"); } } + out.flush().expect("flush error"); } // ── Output ────────────────────────────────────────────────────────────────────