From de1a41810ab2bbbf8076baeb628f6b5d128915a0 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 3 Jun 2026 09:39:49 +0200 Subject: [PATCH] perf: enable zero-allocation queries and memory-mapped indexes Introduce zero-allocation row extraction and query result buffers across `obicompactvec` and `obikpartitionner` to eliminate per-kmer heap allocations. Replace in-memory MPHF deserialization with memory-mapped, zero-copy views to reduce runtime memory footprint. Add configurable I/O chunking, a RAM-aware `--chunk-size` parameter, and system memory monitoring via the new `sysinfo` dependency. Re-export `PreloadedIndex` for external consumers. --- src/Cargo.lock | 1 + src/obicompactvec/src/bitmatrix.rs | 8 + src/obicompactvec/src/intmatrix.rs | 8 + src/obikmer/src/cmd/query.rs | 444 +++++++++++++++--------- src/obikpartitionner/src/lib.rs | 1 - src/obikpartitionner/src/query_layer.rs | 166 ++++----- src/obilayeredmap/src/mphf_layer.rs | 13 +- src/obiread/src/chunk.rs | 16 +- src/obiread/src/lib.rs | 2 +- src/obisys/Cargo.toml | 3 +- src/obisys/src/lib.rs | 15 + 11 files changed, 403 insertions(+), 274 deletions(-) diff --git a/src/Cargo.lock b/src/Cargo.lock index 4cfce7b..11f4615 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1659,6 +1659,7 @@ name = "obisys" version = "0.1.0" dependencies = [ "libc", + "sysinfo", ] [[package]] diff --git a/src/obicompactvec/src/bitmatrix.rs b/src/obicompactvec/src/bitmatrix.rs index b7305ef..ffa42d5 100644 --- a/src/obicompactvec/src/bitmatrix.rs +++ b/src/obicompactvec/src/bitmatrix.rs @@ -32,6 +32,14 @@ impl PersistentBitMatrix { self.cols.iter().map(|c| c.get(slot)).collect() } + /// Fill `buf[i]` with `col_i[slot]` as 0/1 u32, without allocating. + /// `buf` must have length ≥ `self.n_cols()`. + pub fn fill_row(&self, slot: usize, buf: &mut [u32]) { + for (c, col) in self.cols.iter().enumerate() { + buf[c] = col.get(slot) as u32; + } + } + /// Returns the number of set bits in each column as `Array1`. pub fn count_ones(&self) -> Array1 { let counts: Vec = (0..self.n_cols()) diff --git a/src/obicompactvec/src/intmatrix.rs b/src/obicompactvec/src/intmatrix.rs index a9aae75..794d396 100644 --- a/src/obicompactvec/src/intmatrix.rs +++ b/src/obicompactvec/src/intmatrix.rs @@ -33,6 +33,14 @@ impl PersistentCompactIntMatrix { self.cols.iter().map(|c| c.get(slot)).collect() } + /// Fill `buf[i]` with `col_i[slot]`, without allocating. + /// `buf` must have length ≥ `self.n_cols()`. + pub fn fill_row(&self, slot: usize, buf: &mut [u32]) { + for (c, col) in self.cols.iter().enumerate() { + buf[c] = col.get(slot); + } + } + // ── Distance matrices ───────────────────────────────────────────────────── pub fn bray_dist_matrix(&self) -> Array2 { diff --git a/src/obikmer/src/cmd/query.rs b/src/obikmer/src/cmd/query.rs index 10ea5d6..6a06156 100644 --- a/src/obikmer/src/cmd/query.rs +++ b/src/obikmer/src/cmd/query.rs @@ -5,13 +5,13 @@ use std::sync::Arc; use clap::Args; use obikindex::KmerIndex; -use obikpartitionner::PreloadedIndex; -use obilayeredmap::IndexMode; -use obiread::chunk::read_sequence_chunks; -use obiread::record::{SeqRecord, parse_chunk}; use obikrope::Rope; use obikseq::{RoutableSuperKmer, set_k, set_m}; +use obilayeredmap::IndexMode; +use obiread::chunk::read_sequence_chunks_sized; +use obiread::record::{SeqRecord, parse_chunk}; use obiskbuilder::SuperKmerIter; +use obisys::available_memory_bytes; use tracing::info; // ── Pipeline data ───────────────────────────────────────────────────────────── @@ -70,6 +70,10 @@ pub struct QueryArgs { .unwrap_or(1) )] pub threads: usize, + + /// I/O chunk size in MiB (default: auto-sized from available RAM and thread count) + #[arg(long)] + pub chunk_size: Option, } // ── SKDesc — one occurrence of a superkmer in the batch ─────────────────────── @@ -98,25 +102,24 @@ pub struct QueryBatch { impl QueryBatch { /// Build a batch from a vec of parsed sequence records. - pub fn from_records( - records: Vec, - k: usize, - level_max: usize, - theta: f64, - ) -> Self { - let mut ids = Vec::with_capacity(records.len()); - let mut seqs = Vec::with_capacity(records.len()); + pub fn from_records(records: Vec, k: usize, level_max: usize, theta: f64) -> Self { + let mut ids = Vec::with_capacity(records.len()); + let mut seqs = Vec::with_capacity(records.len()); let mut n_kmers = Vec::with_capacity(records.len()); - let mut map: HashMap> = HashMap::new(); + // Upper-bound estimate: at most one superkmer per k bases. + // Avoids repeated rehash on large chunks. + let cap = records.iter().map(|r| r.normalized.len()).sum::() / k.max(1); + let mut map: HashMap> = HashMap::with_capacity(cap); for (seq_idx, record) in records.into_iter().enumerate() { let mut kmer_offset = 0u32; for rsk in SuperKmerIter::new(&record.normalized, k, level_max, theta) { let n = (rsk.seql() - k + 1) as u32; - map.entry(rsk) - .or_default() - .push(SKDesc { seq_idx: seq_idx as u32, kmer_offset }); + map.entry(rsk).or_default().push(SKDesc { + seq_idx: seq_idx as u32, + kmer_offset, + }); kmer_offset += n; } @@ -125,7 +128,12 @@ impl QueryBatch { n_kmers.push(kmer_offset); } - Self { ids, seqs, n_kmers, map } + Self { + ids, + seqs, + n_kmers, + map, + } } /// Split the superkmer map by partition index. @@ -140,88 +148,90 @@ impl QueryBatch { } } +// ── KmerResults — allocation-free ragged result matrix ─────────────────────── + +/// Flat storage for per-kmer query results across all sequences in a chunk. +/// +/// Replaces `Vec>>>` — a single allocation for the whole +/// chunk instead of one `Box<[u32]>` per found k-mer. +struct KmerResults { + data: Vec, // total_kmers × n_genomes, row-major + in_index: Vec, // total_kmers — true if the kmer was found in the index + offsets: Vec, // offsets[i]..offsets[i+1] = kmer range for sequence i + n_genomes: usize, +} + +impl KmerResults { + fn new(n_kmers_per_seq: &[u32], n_genomes: usize) -> Self { + let mut offsets = Vec::with_capacity(n_kmers_per_seq.len() + 1); + let mut total = 0usize; + offsets.push(0); + for &n in n_kmers_per_seq { + total += n as usize; + offsets.push(total); + } + Self { + data: vec![0u32; total * n_genomes], + in_index: vec![false; total], + offsets, + n_genomes, + } + } + + fn n_kmers_for(&self, seq: usize) -> usize { + self.offsets[seq + 1] - self.offsets[seq] + } + + fn set(&mut self, seq: usize, kmer: usize, row: &[u32]) { + let abs = self.offsets[seq] + kmer; + self.in_index[abs] = true; + let base = abs * self.n_genomes; + self.data[base..base + self.n_genomes].copy_from_slice(row); + } + + #[inline] + fn is_in_index(&self, seq: usize, kmer: usize) -> bool { + self.in_index[self.offsets[seq] + kmer] + } + + /// Value for genome `g` at (seq, kmer); meaningful only when `is_in_index`. + #[inline] + fn val(&self, seq: usize, kmer: usize, g: usize) -> u32 { + self.data[(self.offsets[seq] + kmer) * self.n_genomes + g] + } +} + // ── Per-sequence accumulator ────────────────────────────────────────────────── struct SeqAcc { - kmer_count: u32, - kmer_missing: u32, + kmer_count: u32, + kmer_missing: u32, genome_totals: Vec, } impl SeqAcc { fn new(n_genomes: usize) -> Self { Self { - kmer_count: 0, - kmer_missing: 0, + kmer_count: 0, + kmer_missing: 0, genome_totals: vec![0u32; n_genomes], } } } -// ── Findere z-window filter ─────────────────────────────────────────────────── - -/// Apply the Findere z-window filter to per-kmer query results for one superkmer. -/// Aggregate s-mer query results into k-mer answers using a Findere z-window. -/// -/// Input: N s-mer results (indexed kmer size s = k − z + 1). -/// Output: N − z + 1 k-mer results (user kmer size k). -/// -/// For each genome g independently: k-mer at position i is confirmed iff all z values -/// results[i..i+z][g] are nonzero (None counts as zero for all genomes). -/// Output values are taken from results[i]; genomes not confirmed are zeroed. -fn apply_findere( - results: &[Option>], - z: usize, - n_genomes: usize, -) -> Vec>> { - let n = results.len(); - if z <= 1 { - return results.iter().map(|r| r.as_ref().map(|row| row.clone())).collect(); - } - if n < z { - return Vec::new(); - } - - let out_n = n - z + 1; - let mut confirmed = vec![vec![false; n_genomes]; out_n]; - - for g in 0..n_genomes { - let hit = |i: usize| results[i].as_ref().map_or(false, |r| r[g] > 0); - - let mut count: u32 = (0..z).filter(|&j| hit(j)).count() as u32; - if count == z as u32 { confirmed[0][g] = true; } - - for i in 1..out_n { - if hit(i - 1) { count -= 1; } - if hit(i + z - 1) { count += 1; } - if count == z as u32 { confirmed[i][g] = true; } - } - } - - (0..out_n).map(|i| { - let first = results[i].as_ref()?; - let mut row: Box<[u32]> = first.clone(); - for g in 0..n_genomes { - if !confirmed[i][g] { row[g] = 0; } - } - if row.iter().any(|&v| v > 0) { Some(row) } else { None } - }).collect() -} - // ── process_chunk ───────────────────────────────────────────────────────────── fn process_chunk( - idx: &KmerIndex, - preloaded: &PreloadedIndex, - rope: Rope, - k: usize, - n_genomes: usize, - n_partitions: usize, - with_counts: bool, - effective_z: usize, - detail: bool, - count_missing: bool, - force_presence: bool, + idx: &KmerIndex, + rope: Rope, + k: usize, + n_genomes: usize, + n_partitions: usize, + with_counts: bool, + effective_z: usize, + detail: bool, + count_missing: bool, + force_presence: bool, presence_threshold: u32, ) -> Vec { let records = parse_chunk(&rope, k); @@ -229,14 +239,11 @@ fn process_chunk( return Vec::new(); } - let batch = QueryBatch::from_records(records, k, 6, 0.7); + let batch = QueryBatch::from_records(records, k, 6, 0.7); let n_seqs = batch.ids.len(); - // Per-sequence s-mer result vectors in global sequence position order. - // All partitions fill into this structure before Findere is applied. - let mut seq_results: Vec>>> = batch.n_kmers.iter() - .map(|&n| vec![None; n as usize]) - .collect(); + // Flat result matrix — one allocation for the whole chunk. + let mut results = KmerResults::new(&batch.n_kmers, n_genomes); let by_part = batch.split_by_partition(n_partitions); @@ -245,76 +252,170 @@ fn process_chunk( continue; } - let kmer_results = preloaded - .query_partition(part_idx, part_sks, k, n_genomes) + idx.partition() + .query_partition_with( + part_idx, + part_sks, + k, + n_genomes, + with_counts, + |sk_idx, kmer_idx, row| { + let rsk = part_sks[sk_idx]; + let descs = batch.map.get(rsk).expect("rsk must be in map"); + for desc in descs { + results.set( + desc.seq_idx as usize, + desc.kmer_offset as usize + kmer_idx, + row, + ); + } + }, + ) .unwrap_or_else(|e| { eprintln!("query error on partition {part_idx}: {e}"); std::process::exit(1); }); - - for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) { - let descs = batch.map.get(*rsk).expect("rsk must be in map"); - for desc in descs { - let offset = desc.kmer_offset as usize; - let dst = &mut seq_results[desc.seq_idx as usize]; - for (j, hit) in sk_kmer_results.iter().enumerate() { - dst[offset + j] = hit.as_ref().map(|r| r.clone()); - } - } - } } - // Apply Findere on each complete sequence vector, then accumulate. - let n_kmers_out: Vec = batch.n_kmers.iter() - .map(|&n| { let n = n as usize; if n >= effective_z { n - effective_z + 1 } else { 0 } }) + // Findere z-window filter + accumulation — no intermediate allocations. + // One `confirmed` buffer reused across all sequences. + let max_n_kmers = batch.n_kmers.iter().map(|&n| n as usize).max().unwrap_or(0); + let mut confirmed = vec![false; max_n_kmers * n_genomes]; + + let mut accs: Vec = (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect(); + + let n_kmers_out: Vec = batch + .n_kmers + .iter() + .map(|&n| { + let n = n as usize; + if n >= effective_z { + n - effective_z + 1 + } else { + 0 + } + }) .collect(); - let mut accs: Vec = - (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect(); - let mut cov: Vec>> = if detail { - n_kmers_out.iter() + n_kmers_out + .iter() .map(|&n| vec![vec![0u32; n]; n_genomes]) .collect() } else { Vec::new() }; - let presence = force_presence || !with_counts; + let presence = force_presence || !with_counts; let threshold = presence_threshold; + let z = effective_z; for seq_idx in 0..n_seqs { - let filtered = apply_findere(&seq_results[seq_idx], effective_z, n_genomes); - let acc = &mut accs[seq_idx]; + let n = results.n_kmers_for(seq_idx); + let out_n = n_kmers_out[seq_idx]; + if out_n == 0 { + continue; + } - for (pos, hit) in filtered.iter().enumerate() { - match hit { - None => { - if seq_results[seq_idx][pos].is_none() { - acc.kmer_missing += 1; + if z <= 1 { + // No Findere — every indexed s-mer is confirmed. + let acc = &mut accs[seq_idx]; + for pos in 0..n { + if !results.is_in_index(seq_idx, pos) { + acc.kmer_missing += 1; + continue; + } + acc.kmer_count += 1; + for g in 0..n_genomes { + let v = results.val(seq_idx, pos, g); + if v == 0 { + continue; + } + let c = if presence { + u32::from(v >= threshold) + } else { + v + }; + acc.genome_totals[g] += c; + if detail { + cov[seq_idx][g][pos] += c; } } - Some(row) => { - acc.kmer_count += 1; - for (g, &v) in row.iter().enumerate() { - if v == 0 { continue; } - let contribution = if presence { - u32::from(v >= threshold) - } else { - v - }; - acc.genome_totals[g] += contribution; - if detail { - cov[seq_idx][g][pos] += contribution; - } + } + } else { + // Build confirmed[pos * n_genomes + g] via sliding window. + let conf = &mut confirmed[..out_n * n_genomes]; + conf.fill(false); + + for g in 0..n_genomes { + let hit = + |i: usize| results.is_in_index(seq_idx, i) && results.val(seq_idx, i, g) > 0; + + let mut cnt: u32 = (0..z).filter(|&j| hit(j)).count() as u32; + if cnt == z as u32 { + conf[g] = true; + } + + for i in 1..out_n { + if hit(i - 1) { + cnt -= 1; + } + if hit(i + z - 1) { + cnt += 1; + } + if cnt == z as u32 { + conf[i * n_genomes + g] = true; + } + } + } + + let acc = &mut accs[seq_idx]; + for pos in 0..out_n { + let any = (0..n_genomes).any(|g| conf[pos * n_genomes + g]); + if !any { + if !results.is_in_index(seq_idx, pos) { + acc.kmer_missing += 1; + } + continue; + } + acc.kmer_count += 1; + for g in 0..n_genomes { + if !conf[pos * n_genomes + g] { + continue; + } + let v = results.val(seq_idx, pos, g); + if v == 0 { + continue; + } + let c = if presence { + u32::from(v >= threshold) + } else { + v + }; + acc.genome_totals[g] += c; + if detail { + cov[seq_idx][g][pos] += c; } } } } } - let mut buf = Vec::new(); - emit_batch(&batch, &accs, idx.meta(), count_missing, detail, &cov, &mut buf); + // Capacity estimate: actual sequence + ID bytes, plus JSON overhead per record. + // JSON per record ≈ 50 fixed chars + ~20 per genome (label + count value) + 100 (overhead). + let seq_bytes: usize = batch.seqs.iter().map(|s| s.len()).sum(); + let id_bytes: usize = batch.ids.iter().map(|s| s.len()).sum(); + let cap = seq_bytes + id_bytes + n_seqs * (4 + 50 + n_genomes * 20) + 100; + let mut buf = Vec::with_capacity(cap); + emit_batch( + &batch, + &accs, + idx.meta(), + count_missing, + detail, + &cov, + &mut buf, + ); buf } @@ -329,18 +430,30 @@ pub fn run(args: QueryArgs) { set_k(idx.kmer_size()); set_m(idx.minimizer_size()); - let k = idx.kmer_size(); - let n_genomes = idx.meta().genomes.len(); + let k = idx.kmer_size(); + let n_genomes = idx.meta().genomes.len(); let n_partitions = idx.n_partitions(); - let with_counts = idx.meta().config.with_counts; - let n_workers = args.threads.max(1); + let with_counts = idx.meta().config.with_counts; + let n_workers = args.threads.max(1); - let effective_z: usize = args.findere_z.unwrap_or_else(|| { - match idx.meta().config.evidence { + // Chunk size: each chunk stays in memory for its entire processing lifetime. + // Overhead per raw byte is ~8× (Rope + parsed records + superkmers + results). + // We target ≤ 50 % of available RAM across all concurrent workers. + let chunk_bytes = args + .chunk_size + .map(|mb| mb * 1024 * 1024) + .unwrap_or_else(|| { + let avail = available_memory_bytes(); + let computed = avail / (n_workers as u64 * 16); + computed.clamp(4 * 1024 * 1024, 256 * 1024 * 1024) as usize + }); + + let effective_z: usize = args + .findere_z + .unwrap_or_else(|| match idx.meta().config.evidence { IndexMode::Approx { z, .. } | IndexMode::Hybrid { z, .. } => z as usize, IndexMode::Exact => 1, - } - }); + }); info!( "query: k={k}, {} genome(s), with_counts={with_counts}, z={effective_z}, \ @@ -352,28 +465,25 @@ pub fn run(args: QueryArgs) { eprintln!("warning: --mismatch not yet implemented, ignored"); } - let preloaded = Arc::new( - PreloadedIndex::new(idx.partition(), n_partitions, with_counts) - .unwrap_or_else(|e| { - eprintln!("error loading index layers: {e}"); - std::process::exit(1); - }) - ); - - let detail = args.detail; - let count_missing = args.count_missing; - let force_presence = args.force_presence; + let detail = args.detail; + let count_missing = args.count_missing; + let force_presence = args.force_presence; let presence_threshold = args.presence_threshold; // Flat iterator over all Rope chunks from all input files. // I/O runs in the source thread; chunk processing is parallelised by the pipe. + info!("query: chunk_size={}MiB", chunk_bytes / (1024 * 1024)); + let paths: Vec = args.inputs.iter().map(PathBuf::from).collect(); - let all_chunks = paths.into_iter().flat_map(|path| { + let all_chunks = paths.into_iter().flat_map(move |path| { let path_str = path.to_str().unwrap_or("").to_owned(); - match read_sequence_chunks(&path_str) { + match read_sequence_chunks_sized(&path_str, chunk_bytes) { Ok(iter) => Box::new(iter.filter_map(|r| match r { Ok(rope) => Some(rope), - Err(e) => { eprintln!("read error: {e}"); None } + Err(e) => { + eprintln!("read error: {e}"); + None + } })) as Box + Send>, Err(e) => { eprintln!("error opening {path_str}: {e}"); @@ -385,11 +495,10 @@ pub fn run(args: QueryArgs) { let pipe = obipipeline::make_pipe! { QueryData : Rope => Vec, | { - let idx = Arc::clone(&idx); - let preloaded = Arc::clone(&preloaded); + let idx = Arc::clone(&idx); move |rope: Rope| { process_chunk( - &idx, &preloaded, rope, k, n_genomes, n_partitions, with_counts, + &idx, rope, k, n_genomes, n_partitions, with_counts, effective_z, detail, count_missing, force_presence, presence_threshold, ) } @@ -408,13 +517,13 @@ pub fn run(args: QueryArgs) { // ── Output ──────────────────────────────────────────────────────────────────── fn emit_batch( - batch: &QueryBatch, - accs: &[SeqAcc], - meta: &obikindex::meta::IndexMeta, + batch: &QueryBatch, + accs: &[SeqAcc], + meta: &obikindex::meta::IndexMeta, count_missing: bool, - detail: bool, - cov: &[Vec>], - out: &mut impl Write, + detail: bool, + cov: &[Vec>], + out: &mut impl Write, ) { for (seq_idx, (id, seq)) in batch.ids.iter().zip(batch.seqs.iter()).enumerate() { let acc = &accs[seq_idx]; @@ -434,17 +543,18 @@ fn emit_batch( if detail && !cov.is_empty() { let mut cov_map = serde_json::Map::new(); for (g, genome) in meta.genomes.iter().enumerate() { - let v: Vec = - cov[seq_idx][g].iter().map(|&x| x.into()).collect(); + let v: Vec = cov[seq_idx][g].iter().map(|&x| x.into()).collect(); cov_map.insert(genome.label.clone(), v.into()); } ann.insert("coverage".into(), cov_map.into()); } - let ann_str = serde_json::to_string(&ann).unwrap_or_else(|_| "{}".to_string()); - // OBITools4 FASTA format: >id {"key":value,...} - let _ = writeln!(out, ">{id} {ann_str}"); + let _ = out.write_all(b">"); + let _ = out.write_all(id.as_bytes()); + let _ = out.write_all(b" "); + let _ = serde_json::to_writer(&mut *out, &ann); + let _ = out.write_all(b"\n"); let _ = out.write_all(seq); let _ = out.write_all(b"\n"); } diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index aa587ad..49a81fc 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -11,4 +11,3 @@ mod rebuild_layer; pub use filter::KmerFilter; pub use merge_layer::MergeMode; pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; -pub use query_layer::PreloadedIndex; diff --git a/src/obikpartitionner/src/query_layer.rs b/src/obikpartitionner/src/query_layer.rs index 0d55ce3..97e09e3 100644 --- a/src/obikpartitionner/src/query_layer.rs +++ b/src/obikpartitionner/src/query_layer.rs @@ -50,129 +50,91 @@ impl QueryLayer { } } - /// Return `Some(per-genome row)` if `kmer` is indexed in this layer, else `None`. - fn find(&self, kmer: CanonicalKmer, n_genomes: usize) -> Option> { + /// Write per-genome values into `buf` if `kmer` is indexed in this layer. + /// Returns `true` on hit; `buf` is untouched on miss. + fn find_into(&self, kmer: CanonicalKmer, n_genomes: usize, buf: &mut [u32]) -> bool { match self { QueryLayer::SetOnly(mphf) => { - mphf.find(kmer) - .map(|_| vec![1u32; n_genomes].into_boxed_slice()) + if mphf.find(kmer).is_some() { + buf[..n_genomes].fill(1); + true + } else { + false + } } QueryLayer::Presence(mphf, mat) => { - mphf.find(kmer) - .map(|slot| mat.row(slot).iter().map(|&b| b as u32).collect()) + if let Some(slot) = mphf.find(kmer) { + mat.fill_row(slot, &mut buf[..n_genomes]); + true + } else { + false + } } QueryLayer::Count(mphf, mat) => { - mphf.find(kmer).map(|slot| mat.row(slot)) + if let Some(slot) = mphf.find(kmer) { + mat.fill_row(slot, &mut buf[..n_genomes]); + true + } else { + false + } } } } } -// ── PreloadedIndex ──────────────────────────────────────────────────────────── +// ── KmerPartition::query_partition* ────────────────────────────────────────── -/// All query layers for every partition, opened once at startup. -/// -/// Wrap in `Arc` and share across worker threads — all access is read-only. -pub struct PreloadedIndex { - /// `layers[part_idx]` — ordered vec of query layers for that partition. - /// Empty vec when the partition has no index directory yet. - layers: Vec>, -} - -// SAFETY: QueryLayer and its contents are opened read-only (mmap + in-memory -// data structures). No mutation occurs after construction. -unsafe impl Sync for PreloadedIndex {} -unsafe impl Send for PreloadedIndex {} - -impl PreloadedIndex { - /// Open all partition index directories and deserialise every MPHF once. +impl KmerPartition { + /// Query a single partition, calling `on_hit(sk_idx, kmer_idx, row)` for + /// every found k-mer without allocating intermediate result vectors. /// - /// This is the expensive call — do it once before spawning query workers. - pub fn new( - partition: &KmerPartition, - n_partitions: usize, - with_counts: bool, - ) -> SKResult { - let active: Vec = (0..n_partitions).collect(); - Self::new_subset(partition, n_partitions, &active, with_counts) - } - - /// Open only the listed partition indices. - /// - /// Keeps file-descriptor and memory usage bounded to the active set. - /// Unlisted partitions have an empty layer vec and return all-None on query. - pub fn new_subset( - partition: &KmerPartition, - n_partitions: usize, - active: &[usize], - with_counts: bool, - ) -> SKResult { - let mut layers: Vec> = (0..n_partitions).map(|_| Vec::new()).collect(); - for &i in active { - let index_dir = partition.part_dir(i).join(INDEX_SUBDIR); - if !index_dir.exists() { - continue; - } - let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?; - layers[i] = (0..meta.n_layers) - .map(|l| QueryLayer::open( - &index_dir.join(format!("layer_{l}")), - with_counts, - &meta.mode, - )) - .collect::>()?; - } - Ok(Self { layers }) - } - - /// Query one partition for a slice of already-routed super-kmers. - /// - /// Returns one entry per input super-kmer; each entry is a `Vec` with one - /// `Option>` per k-mer inside that super-kmer: - /// - `None` — k-mer absent from the index - /// - `Some(row)` — per-genome count or 0/1 presence - pub fn query_partition( + /// `row` is a shared scratch buffer valid only for the duration of the call; + /// the callback must copy what it needs before returning. + pub fn query_partition_with( &self, part_idx: usize, superkmers: &[&RoutableSuperKmer], k: usize, n_genomes: usize, - ) -> SKResult>>>> { + with_counts: bool, + mut on_hit: F, + ) -> SKResult<()> + where + F: FnMut(usize, usize, &[u32]), + { if superkmers.is_empty() { - return Ok(Vec::new()); + return Ok(()); } - let layers = &self.layers[part_idx]; - - if layers.is_empty() { - return Ok(superkmers - .iter() - .map(|rsk| vec![None; rsk.seql() - k + 1]) - .collect()); + let index_dir = self.part_dir(part_idx).join(INDEX_SUBDIR); + if !index_dir.exists() { + return Ok(()); } - Ok(superkmers - .iter() - .map(|rsk| { - rsk.superkmer() - .iter_canonical_kmers() - .map(|kmer| { - layers.iter().find_map(|layer| layer.find(kmer, n_genomes)) - }) - .collect() - }) - .collect()) + let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?; + let layers: Vec = (0..meta.n_layers) + .map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts, &meta.mode)) + .collect::>()?; + + let mut buf = vec![0u32; n_genomes]; + + for (sk_idx, rsk) in superkmers.iter().enumerate() { + for (kmer_idx, kmer) in rsk.superkmer().iter_canonical_kmers().enumerate() { + for layer in &layers { + if layer.find_into(kmer, n_genomes, &mut buf) { + on_hit(sk_idx, kmer_idx, &buf); + buf.fill(0); + break; + } + } + } + } + + Ok(()) } -} -// ── KmerPartition::query_partition (kept for backward compatibility) ────────── - -impl KmerPartition { /// Query a single partition for a slice of (already-routed) super-kmers. - /// - /// **Prefer [`PreloadedIndex`] for repeated queries** — this method - /// re-opens and deserialises the MPHF on every call. - #[deprecated(note = "use PreloadedIndex::query_partition to avoid repeated MPHF I/O")] + /// Prefer [`query_partition_with`] to avoid per-kmer heap allocations. pub fn query_partition( &self, part_idx: usize, @@ -199,13 +161,21 @@ impl KmerPartition { .map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts, &meta.mode)) .collect::>()?; + let mut buf = vec![0u32; n_genomes]; Ok(superkmers .iter() .map(|rsk| { rsk.superkmer() .iter_canonical_kmers() .map(|kmer| { - layers.iter().find_map(|layer| layer.find(kmer, n_genomes)) + for layer in &layers { + if layer.find_into(kmer, n_genomes, &mut buf) { + let row: Box<[u32]> = buf[..n_genomes].into(); + buf.fill(0); + return Some(row); + } + } + None }) .collect() }) diff --git a/src/obilayeredmap/src/mphf_layer.rs b/src/obilayeredmap/src/mphf_layer.rs index ac6f33d..a5e30b7 100644 --- a/src/obilayeredmap/src/mphf_layer.rs +++ b/src/obilayeredmap/src/mphf_layer.rs @@ -17,8 +17,13 @@ pub(crate) const UNITIGS_FILE: &str = "unitigs.bin"; pub(crate) const EVIDENCE_FILE: &str = "evidence.bin"; pub(crate) const FINGERPRINT_FILE: &str = "fingerprint.bin"; +/// Owned MPHF — used only at build time (construction + store). pub(crate) type Mphf = PtrHash>, Xx64, Vec>; +/// Zero-copy MPHF for querying — ε-deserialized view into a memory-mapped file. +/// `MemCase` owns the mmap backing; `'static` is sound because MemCase pins the memory. +type MphfEps = PtrHash, Xx64, &'static [u8]>; + // ── LayerEvidence ───────────────────────────────────────────────────────────── enum LayerEvidence { @@ -36,7 +41,7 @@ enum LayerEvidence { /// - [`find_strict`](Self::find_strict) — always exact; O(1) on Exact/Hybrid layers, /// O(n) sequential scan on Approx layers. pub struct MphfLayer { - mphf: Mphf, + mphf: MemCase, ev: LayerEvidence, n: usize, } @@ -45,7 +50,7 @@ impl MphfLayer { /// Open a layer using the index-level `mode` determined at `LayeredMap` open time. /// No per-layer metadata file is read. pub fn open(dir: &Path, mode: &IndexMode) -> OLMResult { - let mphf: Mphf = Mphf::load_full(&dir.join(MPHF_FILE)) + let mphf: MemCase = Mphf::mmap(&dir.join(MPHF_FILE), Flags::empty()) .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; let (ev, n) = match mode { IndexMode::Exact => { @@ -137,11 +142,11 @@ impl MphfLayer { /// /// Use this when the caller guarantees that all queried kmers are in the MPHF /// domain (e.g. when iterating the source's own unitigs during merge). -pub struct MphfOnly(Mphf); +pub struct MphfOnly(MemCase); impl MphfOnly { pub fn open(dir: &Path) -> OLMResult { - let mphf: Mphf = Mphf::load_full(&dir.join(MPHF_FILE)) + let mphf: MemCase = Mphf::mmap(&dir.join(MPHF_FILE), Flags::empty()) .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; Ok(Self(mphf)) } diff --git a/src/obiread/src/chunk.rs b/src/obiread/src/chunk.rs index 1bd7f0b..63fea7e 100644 --- a/src/obiread/src/chunk.rs +++ b/src/obiread/src/chunk.rs @@ -153,11 +153,23 @@ pub fn read_fastq_chunks( /// Returns an error if the format cannot be identified as `text/fasta` or `text/fastq`. pub fn read_sequence_chunks( path: &str, +) -> io::Result>>> { + read_sequence_chunks_sized(path, DEFAULT_BLOCK_SIZE) +} + +/// Same as [`read_sequence_chunks`] but with an explicit I/O block size. +/// +/// Larger values amortise per-partition open/close overhead across more superkmers. +pub fn read_sequence_chunks_sized( + path: &str, + block_size: usize, ) -> io::Result>>> { let input = match xopen(path) { Ok(mut i) => match i.mime_type() { - Some("text/fasta") => fasta_chunks(i), - Some("text/fastq") => fastq_chunks(i), + Some("text/fasta") => SeqChunkIter::new(i, block_size, + fasta::end_of_last_fasta_entry, Some("text/fasta")), + Some("text/fastq") => SeqChunkIter::new(i, block_size, + fastq::end_of_last_fastq_entry, Some("text/fastq")), _ => { return Err(io::Error::new( io::ErrorKind::InvalidData, diff --git a/src/obiread/src/lib.rs b/src/obiread/src/lib.rs index 2370bb4..4f4fce7 100644 --- a/src/obiread/src/lib.rs +++ b/src/obiread/src/lib.rs @@ -18,7 +18,7 @@ pub mod xopen; pub use chunk::{ SeqChunkIter, fasta_chunks, fastq_chunks, read_fasta_chunks, read_fastq_chunks, - read_sequence_chunks, + read_sequence_chunks, read_sequence_chunks_sized, }; pub use mimetype::MimeTypeGuesser; pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk}; diff --git a/src/obisys/Cargo.toml b/src/obisys/Cargo.toml index a34b9db..89d7cb5 100644 --- a/src/obisys/Cargo.toml +++ b/src/obisys/Cargo.toml @@ -4,4 +4,5 @@ version = "0.1.0" edition = "2024" [dependencies] -libc = "0.2" +libc = "0.2" +sysinfo = "0.33" diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs index 5867eca..d352a39 100644 --- a/src/obisys/src/lib.rs +++ b/src/obisys/src/lib.rs @@ -2,6 +2,21 @@ use std::fmt; use std::time::Instant; use libc::{RUSAGE_SELF, getrusage, rusage, timeval}; +use sysinfo::System; + +// ── Memory query ────────────────────────────────────────────────────────────── + +/// Returns the number of bytes available for allocation on this machine. +/// +/// On macOS, `available_memory()` can return 0 when the memory compressor +/// inflates the page count; in that case we fall back to half of total memory. +pub fn available_memory_bytes() -> u64 { + let sys = System::new_all(); + match sys.available_memory() { + 0 => sys.total_memory() / 2, + n => n, + } +} // ── raw helpers ───────────────────────────────────────────────────────────────