From 1661dd6b1c1c112f6c8410f2d52fd0c1ed19bf10 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 2 Jun 2026 15:52:23 +0200 Subject: [PATCH] feat: introduce preloaded index cache and thread-safe progress tracker Introduce `PreloadedIndex` to cache partition indices and eliminate redundant I/O during repeated queries. Refactor the query pipeline to route through this pre-loaded index, and expose it publicly in `obikpartitionner`. Additionally, add a thread-safe, lazily-initialized `MultiProgress` singleton for improved progress tracking. --- src/obikindex/src/progress.rs | 21 +++++ src/obikmer/src/cmd/query.rs | 22 +++-- src/obikpartitionner/src/lib.rs | 1 + src/obikpartitionner/src/query_layer.rs | 104 ++++++++++++++++++++++-- 4 files changed, 137 insertions(+), 11 deletions(-) create mode 100644 src/obikindex/src/progress.rs diff --git a/src/obikindex/src/progress.rs b/src/obikindex/src/progress.rs new file mode 100644 index 0000000..1c3f948 --- /dev/null +++ b/src/obikindex/src/progress.rs @@ -0,0 +1,21 @@ +use std::sync::OnceLock; + +use indicatif::MultiProgress; + +static MULTI: OnceLock = OnceLock::new(); + +/// Initialise the shared progress display. Call once from the binary before +/// any index operation. Subsequent calls are silently ignored. +pub fn init(multi: MultiProgress) { + let _ = MULTI.set(multi); +} + +/// Return the shared `MultiProgress`, creating a plain default one if the +/// binary never called [`init`]. +pub fn get() -> &'static MultiProgress { + MULTI.get_or_init(MultiProgress::new) +} + +pub(crate) fn multi() -> &'static MultiProgress { + get() +} diff --git a/src/obikmer/src/cmd/query.rs b/src/obikmer/src/cmd/query.rs index 3c0321b..10ea5d6 100644 --- a/src/obikmer/src/cmd/query.rs +++ b/src/obikmer/src/cmd/query.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use clap::Args; use obikindex::KmerIndex; +use obikpartitionner::PreloadedIndex; use obilayeredmap::IndexMode; use obiread::chunk::read_sequence_chunks; use obiread::record::{SeqRecord, parse_chunk}; @@ -211,6 +212,7 @@ fn apply_findere( fn process_chunk( idx: &KmerIndex, + preloaded: &PreloadedIndex, rope: Rope, k: usize, n_genomes: usize, @@ -243,9 +245,8 @@ fn process_chunk( continue; } - let kmer_results = idx - .partition() - .query_partition(part_idx, part_sks, k, n_genomes, with_counts) + let kmer_results = preloaded + .query_partition(part_idx, part_sks, k, n_genomes) .unwrap_or_else(|e| { eprintln!("query error on partition {part_idx}: {e}"); std::process::exit(1); @@ -351,6 +352,14 @@ pub fn run(args: QueryArgs) { eprintln!("warning: --mismatch not yet implemented, ignored"); } + let preloaded = Arc::new( + PreloadedIndex::new(idx.partition(), n_partitions, with_counts) + .unwrap_or_else(|e| { + eprintln!("error loading index layers: {e}"); + std::process::exit(1); + }) + ); + let detail = args.detail; let count_missing = args.count_missing; let force_presence = args.force_presence; @@ -376,11 +385,12 @@ pub fn run(args: QueryArgs) { let pipe = obipipeline::make_pipe! { QueryData : Rope => Vec, | { - let idx = Arc::clone(&idx); + let idx = Arc::clone(&idx); + let preloaded = Arc::clone(&preloaded); move |rope: Rope| { process_chunk( - &idx, rope, k, n_genomes, n_partitions, with_counts, effective_z, - detail, count_missing, force_presence, presence_threshold, + &idx, &preloaded, rope, k, n_genomes, n_partitions, with_counts, + effective_z, detail, count_missing, force_presence, presence_threshold, ) } } : Chunk => Output, diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index 49a81fc..aa587ad 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -11,3 +11,4 @@ mod rebuild_layer; pub use filter::KmerFilter; pub use merge_layer::MergeMode; pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; +pub use query_layer::PreloadedIndex; diff --git a/src/obikpartitionner/src/query_layer.rs b/src/obikpartitionner/src/query_layer.rs index fadfc1d..0d55ce3 100644 --- a/src/obikpartitionner/src/query_layer.rs +++ b/src/obikpartitionner/src/query_layer.rs @@ -68,17 +68,111 @@ impl QueryLayer { } } -// ── KmerPartition::query_partition ─────────────────────────────────────────── +// ── PreloadedIndex ──────────────────────────────────────────────────────────── -impl KmerPartition { - /// Query a single partition for a slice of (already-routed) super-kmers. +/// All query layers for every partition, opened once at startup. +/// +/// Wrap in `Arc` and share across worker threads — all access is read-only. +pub struct PreloadedIndex { + /// `layers[part_idx]` — ordered vec of query layers for that partition. + /// Empty vec when the partition has no index directory yet. + layers: Vec>, +} + +// SAFETY: QueryLayer and its contents are opened read-only (mmap + in-memory +// data structures). No mutation occurs after construction. +unsafe impl Sync for PreloadedIndex {} +unsafe impl Send for PreloadedIndex {} + +impl PreloadedIndex { + /// Open all partition index directories and deserialise every MPHF once. + /// + /// This is the expensive call — do it once before spawning query workers. + pub fn new( + partition: &KmerPartition, + n_partitions: usize, + with_counts: bool, + ) -> SKResult { + let active: Vec = (0..n_partitions).collect(); + Self::new_subset(partition, n_partitions, &active, with_counts) + } + + /// Open only the listed partition indices. + /// + /// Keeps file-descriptor and memory usage bounded to the active set. + /// Unlisted partitions have an empty layer vec and return all-None on query. + pub fn new_subset( + partition: &KmerPartition, + n_partitions: usize, + active: &[usize], + with_counts: bool, + ) -> SKResult { + let mut layers: Vec> = (0..n_partitions).map(|_| Vec::new()).collect(); + for &i in active { + let index_dir = partition.part_dir(i).join(INDEX_SUBDIR); + if !index_dir.exists() { + continue; + } + let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?; + layers[i] = (0..meta.n_layers) + .map(|l| QueryLayer::open( + &index_dir.join(format!("layer_{l}")), + with_counts, + &meta.mode, + )) + .collect::>()?; + } + Ok(Self { layers }) + } + + /// Query one partition for a slice of already-routed super-kmers. /// /// Returns one entry per input super-kmer; each entry is a `Vec` with one /// `Option>` per k-mer inside that super-kmer: /// - `None` — k-mer absent from the index - /// - `Some(row)` — per-genome count (count index) or 0/1 (presence index) + /// - `Some(row)` — per-genome count or 0/1 presence + pub fn query_partition( + &self, + part_idx: usize, + superkmers: &[&RoutableSuperKmer], + k: usize, + n_genomes: usize, + ) -> SKResult>>>> { + if superkmers.is_empty() { + return Ok(Vec::new()); + } + + let layers = &self.layers[part_idx]; + + if layers.is_empty() { + return Ok(superkmers + .iter() + .map(|rsk| vec![None; rsk.seql() - k + 1]) + .collect()); + } + + Ok(superkmers + .iter() + .map(|rsk| { + rsk.superkmer() + .iter_canonical_kmers() + .map(|kmer| { + layers.iter().find_map(|layer| layer.find(kmer, n_genomes)) + }) + .collect() + }) + .collect()) + } +} + +// ── KmerPartition::query_partition (kept for backward compatibility) ────────── + +impl KmerPartition { + /// Query a single partition for a slice of (already-routed) super-kmers. /// - /// All `superkmers` must belong to this partition (same minimizer bucket). + /// **Prefer [`PreloadedIndex`] for repeated queries** — this method + /// re-opens and deserialises the MPHF on every call. + #[deprecated(note = "use PreloadedIndex::query_partition to avoid repeated MPHF I/O")] pub fn query_partition( &self, part_idx: usize,