From 2ebc5f0d751cdd9f6fb089b17a361d606da1cfa5 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 1 Jun 2026 15:18:12 +0200 Subject: [PATCH 1/6] chore: add logging infrastructure to merge routine Adds comprehensive logging for source metadata, merge modes, and forced approximation detection. Introduces `format_evidence` and `is_trivial` helpers to format `IndexMode` variants and identify single-genome presence indices. The core merge algorithm remains unmodified, with all changes focused on enhanced runtime observability. --- src/obikindex/src/merge.rs | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index 68d8db7..39b4395 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -63,8 +63,36 @@ impl KmerIndex { } } - // ── Choose base: largest source in the output evidence mode ─────────── + // ── Log source characteristics and choose base ──────────────────────── + let mode_str = if mode == MergeMode::Presence { "presence" } else { "count" }; + info!( + "merge: {} source(s), smer-size={}, mode={}", + sources.len(), sources[0].kmer_size(), mode_str, + ); + for (i, src) in sources.iter().enumerate() { + let genome_str = if src.meta.genomes.len() == 1 { "mono-genome".to_string() } + else { format!("{} genomes", src.meta.genomes.len()) }; + let trivial_str = if is_trivial(src, mode) { " [trivial: no data approximation]" } else { "" }; + info!( + " [{}] {} — {}, {}, {}{}", + i, src.root_path.display(), + format_evidence(&src.meta.config.evidence), + genome_str, mode_str, trivial_str, + ); + } + let base_idx = choose_base(sources, mode); + let needs_approx = sources.iter().any(|src| { + !is_trivial(src, mode) + && matches!(src.meta.config.evidence, IndexMode::Approx { .. } | IndexMode::Hybrid { .. }) + }); + info!( + "output evidence: {} ({}base: [{}] {})", + format_evidence(&sources[base_idx].meta.config.evidence), + if needs_approx { "forced approx — " } else { "" }, + base_idx, sources[base_idx].root_path.display(), + ); + let mut ordered: Vec<&KmerIndex> = Vec::with_capacity(sources.len()); ordered.push(sources[base_idx]); for (i, &src) in sources.iter().enumerate() { @@ -272,6 +300,14 @@ fn partition_bar(n: u64) -> ProgressBar { pb } +fn format_evidence(ev: &IndexMode) -> String { + match ev { + IndexMode::Exact => "exact".to_string(), + IndexMode::Approx { b, z } => format!("approx (b={b}, z={z})"), + IndexMode::Hybrid { b, z } => format!("hybrid (b={b}, z={z})"), + } +} + /// A source is "trivial" if its presence/count values carry no approximation: /// single-genome presence index (SetMembership — all values are 1 by construction). fn is_trivial(src: &KmerIndex, mode: MergeMode) -> bool { From 1661dd6b1c1c112f6c8410f2d52fd0c1ed19bf10 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 2 Jun 2026 15:52:23 +0200 Subject: [PATCH 2/6] feat: introduce preloaded index cache and thread-safe progress tracker Introduce `PreloadedIndex` to cache partition indices and eliminate redundant I/O during repeated queries. Refactor the query pipeline to route through this pre-loaded index, and expose it publicly in `obikpartitionner`. Additionally, add a thread-safe, lazily-initialized `MultiProgress` singleton for improved progress tracking. --- src/obikindex/src/progress.rs | 21 +++++ src/obikmer/src/cmd/query.rs | 22 +++-- src/obikpartitionner/src/lib.rs | 1 + src/obikpartitionner/src/query_layer.rs | 104 ++++++++++++++++++++++-- 4 files changed, 137 insertions(+), 11 deletions(-) create mode 100644 src/obikindex/src/progress.rs diff --git a/src/obikindex/src/progress.rs b/src/obikindex/src/progress.rs new file mode 100644 index 0000000..1c3f948 --- /dev/null +++ b/src/obikindex/src/progress.rs @@ -0,0 +1,21 @@ +use std::sync::OnceLock; + +use indicatif::MultiProgress; + +static MULTI: OnceLock = OnceLock::new(); + +/// Initialise the shared progress display. Call once from the binary before +/// any index operation. Subsequent calls are silently ignored. +pub fn init(multi: MultiProgress) { + let _ = MULTI.set(multi); +} + +/// Return the shared `MultiProgress`, creating a plain default one if the +/// binary never called [`init`]. +pub fn get() -> &'static MultiProgress { + MULTI.get_or_init(MultiProgress::new) +} + +pub(crate) fn multi() -> &'static MultiProgress { + get() +} diff --git a/src/obikmer/src/cmd/query.rs b/src/obikmer/src/cmd/query.rs index 3c0321b..10ea5d6 100644 --- a/src/obikmer/src/cmd/query.rs +++ b/src/obikmer/src/cmd/query.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use clap::Args; use obikindex::KmerIndex; +use obikpartitionner::PreloadedIndex; use obilayeredmap::IndexMode; use obiread::chunk::read_sequence_chunks; use obiread::record::{SeqRecord, parse_chunk}; @@ -211,6 +212,7 @@ fn apply_findere( fn process_chunk( idx: &KmerIndex, + preloaded: &PreloadedIndex, rope: Rope, k: usize, n_genomes: usize, @@ -243,9 +245,8 @@ fn process_chunk( continue; } - let kmer_results = idx - .partition() - .query_partition(part_idx, part_sks, k, n_genomes, with_counts) + let kmer_results = preloaded + .query_partition(part_idx, part_sks, k, n_genomes) .unwrap_or_else(|e| { eprintln!("query error on partition {part_idx}: {e}"); std::process::exit(1); @@ -351,6 +352,14 @@ pub fn run(args: QueryArgs) { eprintln!("warning: --mismatch not yet implemented, ignored"); } + let preloaded = Arc::new( + PreloadedIndex::new(idx.partition(), n_partitions, with_counts) + .unwrap_or_else(|e| { + eprintln!("error loading index layers: {e}"); + std::process::exit(1); + }) + ); + let detail = args.detail; let count_missing = args.count_missing; let force_presence = args.force_presence; @@ -376,11 +385,12 @@ pub fn run(args: QueryArgs) { let pipe = obipipeline::make_pipe! { QueryData : Rope => Vec, | { - let idx = Arc::clone(&idx); + let idx = Arc::clone(&idx); + let preloaded = Arc::clone(&preloaded); move |rope: Rope| { process_chunk( - &idx, rope, k, n_genomes, n_partitions, with_counts, effective_z, - detail, count_missing, force_presence, presence_threshold, + &idx, &preloaded, rope, k, n_genomes, n_partitions, with_counts, + effective_z, detail, count_missing, force_presence, presence_threshold, ) } } : Chunk => Output, diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index 49a81fc..aa587ad 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -11,3 +11,4 @@ mod rebuild_layer; pub use filter::KmerFilter; pub use merge_layer::MergeMode; pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; +pub use query_layer::PreloadedIndex; diff --git a/src/obikpartitionner/src/query_layer.rs b/src/obikpartitionner/src/query_layer.rs index fadfc1d..0d55ce3 100644 --- a/src/obikpartitionner/src/query_layer.rs +++ b/src/obikpartitionner/src/query_layer.rs @@ -68,17 +68,111 @@ impl QueryLayer { } } -// ── KmerPartition::query_partition ─────────────────────────────────────────── +// ── PreloadedIndex ──────────────────────────────────────────────────────────── -impl KmerPartition { - /// Query a single partition for a slice of (already-routed) super-kmers. +/// All query layers for every partition, opened once at startup. +/// +/// Wrap in `Arc` and share across worker threads — all access is read-only. +pub struct PreloadedIndex { + /// `layers[part_idx]` — ordered vec of query layers for that partition. + /// Empty vec when the partition has no index directory yet. + layers: Vec>, +} + +// SAFETY: QueryLayer and its contents are opened read-only (mmap + in-memory +// data structures). No mutation occurs after construction. +unsafe impl Sync for PreloadedIndex {} +unsafe impl Send for PreloadedIndex {} + +impl PreloadedIndex { + /// Open all partition index directories and deserialise every MPHF once. + /// + /// This is the expensive call — do it once before spawning query workers. + pub fn new( + partition: &KmerPartition, + n_partitions: usize, + with_counts: bool, + ) -> SKResult { + let active: Vec = (0..n_partitions).collect(); + Self::new_subset(partition, n_partitions, &active, with_counts) + } + + /// Open only the listed partition indices. + /// + /// Keeps file-descriptor and memory usage bounded to the active set. + /// Unlisted partitions have an empty layer vec and return all-None on query. + pub fn new_subset( + partition: &KmerPartition, + n_partitions: usize, + active: &[usize], + with_counts: bool, + ) -> SKResult { + let mut layers: Vec> = (0..n_partitions).map(|_| Vec::new()).collect(); + for &i in active { + let index_dir = partition.part_dir(i).join(INDEX_SUBDIR); + if !index_dir.exists() { + continue; + } + let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?; + layers[i] = (0..meta.n_layers) + .map(|l| QueryLayer::open( + &index_dir.join(format!("layer_{l}")), + with_counts, + &meta.mode, + )) + .collect::>()?; + } + Ok(Self { layers }) + } + + /// Query one partition for a slice of already-routed super-kmers. /// /// Returns one entry per input super-kmer; each entry is a `Vec` with one /// `Option>` per k-mer inside that super-kmer: /// - `None` — k-mer absent from the index - /// - `Some(row)` — per-genome count (count index) or 0/1 (presence index) + /// - `Some(row)` — per-genome count or 0/1 presence + pub fn query_partition( + &self, + part_idx: usize, + superkmers: &[&RoutableSuperKmer], + k: usize, + n_genomes: usize, + ) -> SKResult>>>> { + if superkmers.is_empty() { + return Ok(Vec::new()); + } + + let layers = &self.layers[part_idx]; + + if layers.is_empty() { + return Ok(superkmers + .iter() + .map(|rsk| vec![None; rsk.seql() - k + 1]) + .collect()); + } + + Ok(superkmers + .iter() + .map(|rsk| { + rsk.superkmer() + .iter_canonical_kmers() + .map(|kmer| { + layers.iter().find_map(|layer| layer.find(kmer, n_genomes)) + }) + .collect() + }) + .collect()) + } +} + +// ── KmerPartition::query_partition (kept for backward compatibility) ────────── + +impl KmerPartition { + /// Query a single partition for a slice of (already-routed) super-kmers. /// - /// All `superkmers` must belong to this partition (same minimizer bucket). + /// **Prefer [`PreloadedIndex`] for repeated queries** — this method + /// re-opens and deserialises the MPHF on every call. + #[deprecated(note = "use PreloadedIndex::query_partition to avoid repeated MPHF I/O")] pub fn query_partition( &self, part_idx: usize, From de1a41810ab2bbbf8076baeb628f6b5d128915a0 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 3 Jun 2026 09:39:49 +0200 Subject: [PATCH 3/6] perf: enable zero-allocation queries and memory-mapped indexes Introduce zero-allocation row extraction and query result buffers across `obicompactvec` and `obikpartitionner` to eliminate per-kmer heap allocations. Replace in-memory MPHF deserialization with memory-mapped, zero-copy views to reduce runtime memory footprint. Add configurable I/O chunking, a RAM-aware `--chunk-size` parameter, and system memory monitoring via the new `sysinfo` dependency. Re-export `PreloadedIndex` for external consumers. --- src/Cargo.lock | 1 + src/obicompactvec/src/bitmatrix.rs | 8 + src/obicompactvec/src/intmatrix.rs | 8 + src/obikmer/src/cmd/query.rs | 444 +++++++++++++++--------- src/obikpartitionner/src/lib.rs | 1 - src/obikpartitionner/src/query_layer.rs | 166 ++++----- src/obilayeredmap/src/mphf_layer.rs | 13 +- src/obiread/src/chunk.rs | 16 +- src/obiread/src/lib.rs | 2 +- src/obisys/Cargo.toml | 3 +- src/obisys/src/lib.rs | 15 + 11 files changed, 403 insertions(+), 274 deletions(-) diff --git a/src/Cargo.lock b/src/Cargo.lock index 4cfce7b..11f4615 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1659,6 +1659,7 @@ name = "obisys" version = "0.1.0" dependencies = [ "libc", + "sysinfo", ] [[package]] diff --git a/src/obicompactvec/src/bitmatrix.rs b/src/obicompactvec/src/bitmatrix.rs index b7305ef..ffa42d5 100644 --- a/src/obicompactvec/src/bitmatrix.rs +++ b/src/obicompactvec/src/bitmatrix.rs @@ -32,6 +32,14 @@ impl PersistentBitMatrix { self.cols.iter().map(|c| c.get(slot)).collect() } + /// Fill `buf[i]` with `col_i[slot]` as 0/1 u32, without allocating. + /// `buf` must have length ≥ `self.n_cols()`. + pub fn fill_row(&self, slot: usize, buf: &mut [u32]) { + for (c, col) in self.cols.iter().enumerate() { + buf[c] = col.get(slot) as u32; + } + } + /// Returns the number of set bits in each column as `Array1`. pub fn count_ones(&self) -> Array1 { let counts: Vec = (0..self.n_cols()) diff --git a/src/obicompactvec/src/intmatrix.rs b/src/obicompactvec/src/intmatrix.rs index a9aae75..794d396 100644 --- a/src/obicompactvec/src/intmatrix.rs +++ b/src/obicompactvec/src/intmatrix.rs @@ -33,6 +33,14 @@ impl PersistentCompactIntMatrix { self.cols.iter().map(|c| c.get(slot)).collect() } + /// Fill `buf[i]` with `col_i[slot]`, without allocating. + /// `buf` must have length ≥ `self.n_cols()`. + pub fn fill_row(&self, slot: usize, buf: &mut [u32]) { + for (c, col) in self.cols.iter().enumerate() { + buf[c] = col.get(slot); + } + } + // ── Distance matrices ───────────────────────────────────────────────────── pub fn bray_dist_matrix(&self) -> Array2 { diff --git a/src/obikmer/src/cmd/query.rs b/src/obikmer/src/cmd/query.rs index 10ea5d6..6a06156 100644 --- a/src/obikmer/src/cmd/query.rs +++ b/src/obikmer/src/cmd/query.rs @@ -5,13 +5,13 @@ use std::sync::Arc; use clap::Args; use obikindex::KmerIndex; -use obikpartitionner::PreloadedIndex; -use obilayeredmap::IndexMode; -use obiread::chunk::read_sequence_chunks; -use obiread::record::{SeqRecord, parse_chunk}; use obikrope::Rope; use obikseq::{RoutableSuperKmer, set_k, set_m}; +use obilayeredmap::IndexMode; +use obiread::chunk::read_sequence_chunks_sized; +use obiread::record::{SeqRecord, parse_chunk}; use obiskbuilder::SuperKmerIter; +use obisys::available_memory_bytes; use tracing::info; // ── Pipeline data ───────────────────────────────────────────────────────────── @@ -70,6 +70,10 @@ pub struct QueryArgs { .unwrap_or(1) )] pub threads: usize, + + /// I/O chunk size in MiB (default: auto-sized from available RAM and thread count) + #[arg(long)] + pub chunk_size: Option, } // ── SKDesc — one occurrence of a superkmer in the batch ─────────────────────── @@ -98,25 +102,24 @@ pub struct QueryBatch { impl QueryBatch { /// Build a batch from a vec of parsed sequence records. - pub fn from_records( - records: Vec, - k: usize, - level_max: usize, - theta: f64, - ) -> Self { - let mut ids = Vec::with_capacity(records.len()); - let mut seqs = Vec::with_capacity(records.len()); + pub fn from_records(records: Vec, k: usize, level_max: usize, theta: f64) -> Self { + let mut ids = Vec::with_capacity(records.len()); + let mut seqs = Vec::with_capacity(records.len()); let mut n_kmers = Vec::with_capacity(records.len()); - let mut map: HashMap> = HashMap::new(); + // Upper-bound estimate: at most one superkmer per k bases. + // Avoids repeated rehash on large chunks. + let cap = records.iter().map(|r| r.normalized.len()).sum::() / k.max(1); + let mut map: HashMap> = HashMap::with_capacity(cap); for (seq_idx, record) in records.into_iter().enumerate() { let mut kmer_offset = 0u32; for rsk in SuperKmerIter::new(&record.normalized, k, level_max, theta) { let n = (rsk.seql() - k + 1) as u32; - map.entry(rsk) - .or_default() - .push(SKDesc { seq_idx: seq_idx as u32, kmer_offset }); + map.entry(rsk).or_default().push(SKDesc { + seq_idx: seq_idx as u32, + kmer_offset, + }); kmer_offset += n; } @@ -125,7 +128,12 @@ impl QueryBatch { n_kmers.push(kmer_offset); } - Self { ids, seqs, n_kmers, map } + Self { + ids, + seqs, + n_kmers, + map, + } } /// Split the superkmer map by partition index. @@ -140,88 +148,90 @@ impl QueryBatch { } } +// ── KmerResults — allocation-free ragged result matrix ─────────────────────── + +/// Flat storage for per-kmer query results across all sequences in a chunk. +/// +/// Replaces `Vec>>>` — a single allocation for the whole +/// chunk instead of one `Box<[u32]>` per found k-mer. +struct KmerResults { + data: Vec, // total_kmers × n_genomes, row-major + in_index: Vec, // total_kmers — true if the kmer was found in the index + offsets: Vec, // offsets[i]..offsets[i+1] = kmer range for sequence i + n_genomes: usize, +} + +impl KmerResults { + fn new(n_kmers_per_seq: &[u32], n_genomes: usize) -> Self { + let mut offsets = Vec::with_capacity(n_kmers_per_seq.len() + 1); + let mut total = 0usize; + offsets.push(0); + for &n in n_kmers_per_seq { + total += n as usize; + offsets.push(total); + } + Self { + data: vec![0u32; total * n_genomes], + in_index: vec![false; total], + offsets, + n_genomes, + } + } + + fn n_kmers_for(&self, seq: usize) -> usize { + self.offsets[seq + 1] - self.offsets[seq] + } + + fn set(&mut self, seq: usize, kmer: usize, row: &[u32]) { + let abs = self.offsets[seq] + kmer; + self.in_index[abs] = true; + let base = abs * self.n_genomes; + self.data[base..base + self.n_genomes].copy_from_slice(row); + } + + #[inline] + fn is_in_index(&self, seq: usize, kmer: usize) -> bool { + self.in_index[self.offsets[seq] + kmer] + } + + /// Value for genome `g` at (seq, kmer); meaningful only when `is_in_index`. + #[inline] + fn val(&self, seq: usize, kmer: usize, g: usize) -> u32 { + self.data[(self.offsets[seq] + kmer) * self.n_genomes + g] + } +} + // ── Per-sequence accumulator ────────────────────────────────────────────────── struct SeqAcc { - kmer_count: u32, - kmer_missing: u32, + kmer_count: u32, + kmer_missing: u32, genome_totals: Vec, } impl SeqAcc { fn new(n_genomes: usize) -> Self { Self { - kmer_count: 0, - kmer_missing: 0, + kmer_count: 0, + kmer_missing: 0, genome_totals: vec![0u32; n_genomes], } } } -// ── Findere z-window filter ─────────────────────────────────────────────────── - -/// Apply the Findere z-window filter to per-kmer query results for one superkmer. -/// Aggregate s-mer query results into k-mer answers using a Findere z-window. -/// -/// Input: N s-mer results (indexed kmer size s = k − z + 1). -/// Output: N − z + 1 k-mer results (user kmer size k). -/// -/// For each genome g independently: k-mer at position i is confirmed iff all z values -/// results[i..i+z][g] are nonzero (None counts as zero for all genomes). -/// Output values are taken from results[i]; genomes not confirmed are zeroed. -fn apply_findere( - results: &[Option>], - z: usize, - n_genomes: usize, -) -> Vec>> { - let n = results.len(); - if z <= 1 { - return results.iter().map(|r| r.as_ref().map(|row| row.clone())).collect(); - } - if n < z { - return Vec::new(); - } - - let out_n = n - z + 1; - let mut confirmed = vec![vec![false; n_genomes]; out_n]; - - for g in 0..n_genomes { - let hit = |i: usize| results[i].as_ref().map_or(false, |r| r[g] > 0); - - let mut count: u32 = (0..z).filter(|&j| hit(j)).count() as u32; - if count == z as u32 { confirmed[0][g] = true; } - - for i in 1..out_n { - if hit(i - 1) { count -= 1; } - if hit(i + z - 1) { count += 1; } - if count == z as u32 { confirmed[i][g] = true; } - } - } - - (0..out_n).map(|i| { - let first = results[i].as_ref()?; - let mut row: Box<[u32]> = first.clone(); - for g in 0..n_genomes { - if !confirmed[i][g] { row[g] = 0; } - } - if row.iter().any(|&v| v > 0) { Some(row) } else { None } - }).collect() -} - // ── process_chunk ───────────────────────────────────────────────────────────── fn process_chunk( - idx: &KmerIndex, - preloaded: &PreloadedIndex, - rope: Rope, - k: usize, - n_genomes: usize, - n_partitions: usize, - with_counts: bool, - effective_z: usize, - detail: bool, - count_missing: bool, - force_presence: bool, + idx: &KmerIndex, + rope: Rope, + k: usize, + n_genomes: usize, + n_partitions: usize, + with_counts: bool, + effective_z: usize, + detail: bool, + count_missing: bool, + force_presence: bool, presence_threshold: u32, ) -> Vec { let records = parse_chunk(&rope, k); @@ -229,14 +239,11 @@ fn process_chunk( return Vec::new(); } - let batch = QueryBatch::from_records(records, k, 6, 0.7); + let batch = QueryBatch::from_records(records, k, 6, 0.7); let n_seqs = batch.ids.len(); - // Per-sequence s-mer result vectors in global sequence position order. - // All partitions fill into this structure before Findere is applied. - let mut seq_results: Vec>>> = batch.n_kmers.iter() - .map(|&n| vec![None; n as usize]) - .collect(); + // Flat result matrix — one allocation for the whole chunk. + let mut results = KmerResults::new(&batch.n_kmers, n_genomes); let by_part = batch.split_by_partition(n_partitions); @@ -245,76 +252,170 @@ fn process_chunk( continue; } - let kmer_results = preloaded - .query_partition(part_idx, part_sks, k, n_genomes) + idx.partition() + .query_partition_with( + part_idx, + part_sks, + k, + n_genomes, + with_counts, + |sk_idx, kmer_idx, row| { + let rsk = part_sks[sk_idx]; + let descs = batch.map.get(rsk).expect("rsk must be in map"); + for desc in descs { + results.set( + desc.seq_idx as usize, + desc.kmer_offset as usize + kmer_idx, + row, + ); + } + }, + ) .unwrap_or_else(|e| { eprintln!("query error on partition {part_idx}: {e}"); std::process::exit(1); }); - - for (rsk, sk_kmer_results) in part_sks.iter().zip(kmer_results.iter()) { - let descs = batch.map.get(*rsk).expect("rsk must be in map"); - for desc in descs { - let offset = desc.kmer_offset as usize; - let dst = &mut seq_results[desc.seq_idx as usize]; - for (j, hit) in sk_kmer_results.iter().enumerate() { - dst[offset + j] = hit.as_ref().map(|r| r.clone()); - } - } - } } - // Apply Findere on each complete sequence vector, then accumulate. - let n_kmers_out: Vec = batch.n_kmers.iter() - .map(|&n| { let n = n as usize; if n >= effective_z { n - effective_z + 1 } else { 0 } }) + // Findere z-window filter + accumulation — no intermediate allocations. + // One `confirmed` buffer reused across all sequences. + let max_n_kmers = batch.n_kmers.iter().map(|&n| n as usize).max().unwrap_or(0); + let mut confirmed = vec![false; max_n_kmers * n_genomes]; + + let mut accs: Vec = (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect(); + + let n_kmers_out: Vec = batch + .n_kmers + .iter() + .map(|&n| { + let n = n as usize; + if n >= effective_z { + n - effective_z + 1 + } else { + 0 + } + }) .collect(); - let mut accs: Vec = - (0..n_seqs).map(|_| SeqAcc::new(n_genomes)).collect(); - let mut cov: Vec>> = if detail { - n_kmers_out.iter() + n_kmers_out + .iter() .map(|&n| vec![vec![0u32; n]; n_genomes]) .collect() } else { Vec::new() }; - let presence = force_presence || !with_counts; + let presence = force_presence || !with_counts; let threshold = presence_threshold; + let z = effective_z; for seq_idx in 0..n_seqs { - let filtered = apply_findere(&seq_results[seq_idx], effective_z, n_genomes); - let acc = &mut accs[seq_idx]; + let n = results.n_kmers_for(seq_idx); + let out_n = n_kmers_out[seq_idx]; + if out_n == 0 { + continue; + } - for (pos, hit) in filtered.iter().enumerate() { - match hit { - None => { - if seq_results[seq_idx][pos].is_none() { - acc.kmer_missing += 1; + if z <= 1 { + // No Findere — every indexed s-mer is confirmed. + let acc = &mut accs[seq_idx]; + for pos in 0..n { + if !results.is_in_index(seq_idx, pos) { + acc.kmer_missing += 1; + continue; + } + acc.kmer_count += 1; + for g in 0..n_genomes { + let v = results.val(seq_idx, pos, g); + if v == 0 { + continue; + } + let c = if presence { + u32::from(v >= threshold) + } else { + v + }; + acc.genome_totals[g] += c; + if detail { + cov[seq_idx][g][pos] += c; } } - Some(row) => { - acc.kmer_count += 1; - for (g, &v) in row.iter().enumerate() { - if v == 0 { continue; } - let contribution = if presence { - u32::from(v >= threshold) - } else { - v - }; - acc.genome_totals[g] += contribution; - if detail { - cov[seq_idx][g][pos] += contribution; - } + } + } else { + // Build confirmed[pos * n_genomes + g] via sliding window. + let conf = &mut confirmed[..out_n * n_genomes]; + conf.fill(false); + + for g in 0..n_genomes { + let hit = + |i: usize| results.is_in_index(seq_idx, i) && results.val(seq_idx, i, g) > 0; + + let mut cnt: u32 = (0..z).filter(|&j| hit(j)).count() as u32; + if cnt == z as u32 { + conf[g] = true; + } + + for i in 1..out_n { + if hit(i - 1) { + cnt -= 1; + } + if hit(i + z - 1) { + cnt += 1; + } + if cnt == z as u32 { + conf[i * n_genomes + g] = true; + } + } + } + + let acc = &mut accs[seq_idx]; + for pos in 0..out_n { + let any = (0..n_genomes).any(|g| conf[pos * n_genomes + g]); + if !any { + if !results.is_in_index(seq_idx, pos) { + acc.kmer_missing += 1; + } + continue; + } + acc.kmer_count += 1; + for g in 0..n_genomes { + if !conf[pos * n_genomes + g] { + continue; + } + let v = results.val(seq_idx, pos, g); + if v == 0 { + continue; + } + let c = if presence { + u32::from(v >= threshold) + } else { + v + }; + acc.genome_totals[g] += c; + if detail { + cov[seq_idx][g][pos] += c; } } } } } - let mut buf = Vec::new(); - emit_batch(&batch, &accs, idx.meta(), count_missing, detail, &cov, &mut buf); + // Capacity estimate: actual sequence + ID bytes, plus JSON overhead per record. + // JSON per record ≈ 50 fixed chars + ~20 per genome (label + count value) + 100 (overhead). + let seq_bytes: usize = batch.seqs.iter().map(|s| s.len()).sum(); + let id_bytes: usize = batch.ids.iter().map(|s| s.len()).sum(); + let cap = seq_bytes + id_bytes + n_seqs * (4 + 50 + n_genomes * 20) + 100; + let mut buf = Vec::with_capacity(cap); + emit_batch( + &batch, + &accs, + idx.meta(), + count_missing, + detail, + &cov, + &mut buf, + ); buf } @@ -329,18 +430,30 @@ pub fn run(args: QueryArgs) { set_k(idx.kmer_size()); set_m(idx.minimizer_size()); - let k = idx.kmer_size(); - let n_genomes = idx.meta().genomes.len(); + let k = idx.kmer_size(); + let n_genomes = idx.meta().genomes.len(); let n_partitions = idx.n_partitions(); - let with_counts = idx.meta().config.with_counts; - let n_workers = args.threads.max(1); + let with_counts = idx.meta().config.with_counts; + let n_workers = args.threads.max(1); - let effective_z: usize = args.findere_z.unwrap_or_else(|| { - match idx.meta().config.evidence { + // Chunk size: each chunk stays in memory for its entire processing lifetime. + // Overhead per raw byte is ~8× (Rope + parsed records + superkmers + results). + // We target ≤ 50 % of available RAM across all concurrent workers. + let chunk_bytes = args + .chunk_size + .map(|mb| mb * 1024 * 1024) + .unwrap_or_else(|| { + let avail = available_memory_bytes(); + let computed = avail / (n_workers as u64 * 16); + computed.clamp(4 * 1024 * 1024, 256 * 1024 * 1024) as usize + }); + + let effective_z: usize = args + .findere_z + .unwrap_or_else(|| match idx.meta().config.evidence { IndexMode::Approx { z, .. } | IndexMode::Hybrid { z, .. } => z as usize, IndexMode::Exact => 1, - } - }); + }); info!( "query: k={k}, {} genome(s), with_counts={with_counts}, z={effective_z}, \ @@ -352,28 +465,25 @@ pub fn run(args: QueryArgs) { eprintln!("warning: --mismatch not yet implemented, ignored"); } - let preloaded = Arc::new( - PreloadedIndex::new(idx.partition(), n_partitions, with_counts) - .unwrap_or_else(|e| { - eprintln!("error loading index layers: {e}"); - std::process::exit(1); - }) - ); - - let detail = args.detail; - let count_missing = args.count_missing; - let force_presence = args.force_presence; + let detail = args.detail; + let count_missing = args.count_missing; + let force_presence = args.force_presence; let presence_threshold = args.presence_threshold; // Flat iterator over all Rope chunks from all input files. // I/O runs in the source thread; chunk processing is parallelised by the pipe. + info!("query: chunk_size={}MiB", chunk_bytes / (1024 * 1024)); + let paths: Vec = args.inputs.iter().map(PathBuf::from).collect(); - let all_chunks = paths.into_iter().flat_map(|path| { + let all_chunks = paths.into_iter().flat_map(move |path| { let path_str = path.to_str().unwrap_or("").to_owned(); - match read_sequence_chunks(&path_str) { + match read_sequence_chunks_sized(&path_str, chunk_bytes) { Ok(iter) => Box::new(iter.filter_map(|r| match r { Ok(rope) => Some(rope), - Err(e) => { eprintln!("read error: {e}"); None } + Err(e) => { + eprintln!("read error: {e}"); + None + } })) as Box + Send>, Err(e) => { eprintln!("error opening {path_str}: {e}"); @@ -385,11 +495,10 @@ pub fn run(args: QueryArgs) { let pipe = obipipeline::make_pipe! { QueryData : Rope => Vec, | { - let idx = Arc::clone(&idx); - let preloaded = Arc::clone(&preloaded); + let idx = Arc::clone(&idx); move |rope: Rope| { process_chunk( - &idx, &preloaded, rope, k, n_genomes, n_partitions, with_counts, + &idx, rope, k, n_genomes, n_partitions, with_counts, effective_z, detail, count_missing, force_presence, presence_threshold, ) } @@ -408,13 +517,13 @@ pub fn run(args: QueryArgs) { // ── Output ──────────────────────────────────────────────────────────────────── fn emit_batch( - batch: &QueryBatch, - accs: &[SeqAcc], - meta: &obikindex::meta::IndexMeta, + batch: &QueryBatch, + accs: &[SeqAcc], + meta: &obikindex::meta::IndexMeta, count_missing: bool, - detail: bool, - cov: &[Vec>], - out: &mut impl Write, + detail: bool, + cov: &[Vec>], + out: &mut impl Write, ) { for (seq_idx, (id, seq)) in batch.ids.iter().zip(batch.seqs.iter()).enumerate() { let acc = &accs[seq_idx]; @@ -434,17 +543,18 @@ fn emit_batch( if detail && !cov.is_empty() { let mut cov_map = serde_json::Map::new(); for (g, genome) in meta.genomes.iter().enumerate() { - let v: Vec = - cov[seq_idx][g].iter().map(|&x| x.into()).collect(); + let v: Vec = cov[seq_idx][g].iter().map(|&x| x.into()).collect(); cov_map.insert(genome.label.clone(), v.into()); } ann.insert("coverage".into(), cov_map.into()); } - let ann_str = serde_json::to_string(&ann).unwrap_or_else(|_| "{}".to_string()); - // OBITools4 FASTA format: >id {"key":value,...} - let _ = writeln!(out, ">{id} {ann_str}"); + let _ = out.write_all(b">"); + let _ = out.write_all(id.as_bytes()); + let _ = out.write_all(b" "); + let _ = serde_json::to_writer(&mut *out, &ann); + let _ = out.write_all(b"\n"); let _ = out.write_all(seq); let _ = out.write_all(b"\n"); } diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index aa587ad..49a81fc 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -11,4 +11,3 @@ mod rebuild_layer; pub use filter::KmerFilter; pub use merge_layer::MergeMode; pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; -pub use query_layer::PreloadedIndex; diff --git a/src/obikpartitionner/src/query_layer.rs b/src/obikpartitionner/src/query_layer.rs index 0d55ce3..97e09e3 100644 --- a/src/obikpartitionner/src/query_layer.rs +++ b/src/obikpartitionner/src/query_layer.rs @@ -50,129 +50,91 @@ impl QueryLayer { } } - /// Return `Some(per-genome row)` if `kmer` is indexed in this layer, else `None`. - fn find(&self, kmer: CanonicalKmer, n_genomes: usize) -> Option> { + /// Write per-genome values into `buf` if `kmer` is indexed in this layer. + /// Returns `true` on hit; `buf` is untouched on miss. + fn find_into(&self, kmer: CanonicalKmer, n_genomes: usize, buf: &mut [u32]) -> bool { match self { QueryLayer::SetOnly(mphf) => { - mphf.find(kmer) - .map(|_| vec![1u32; n_genomes].into_boxed_slice()) + if mphf.find(kmer).is_some() { + buf[..n_genomes].fill(1); + true + } else { + false + } } QueryLayer::Presence(mphf, mat) => { - mphf.find(kmer) - .map(|slot| mat.row(slot).iter().map(|&b| b as u32).collect()) + if let Some(slot) = mphf.find(kmer) { + mat.fill_row(slot, &mut buf[..n_genomes]); + true + } else { + false + } } QueryLayer::Count(mphf, mat) => { - mphf.find(kmer).map(|slot| mat.row(slot)) + if let Some(slot) = mphf.find(kmer) { + mat.fill_row(slot, &mut buf[..n_genomes]); + true + } else { + false + } } } } } -// ── PreloadedIndex ──────────────────────────────────────────────────────────── +// ── KmerPartition::query_partition* ────────────────────────────────────────── -/// All query layers for every partition, opened once at startup. -/// -/// Wrap in `Arc` and share across worker threads — all access is read-only. -pub struct PreloadedIndex { - /// `layers[part_idx]` — ordered vec of query layers for that partition. - /// Empty vec when the partition has no index directory yet. - layers: Vec>, -} - -// SAFETY: QueryLayer and its contents are opened read-only (mmap + in-memory -// data structures). No mutation occurs after construction. -unsafe impl Sync for PreloadedIndex {} -unsafe impl Send for PreloadedIndex {} - -impl PreloadedIndex { - /// Open all partition index directories and deserialise every MPHF once. +impl KmerPartition { + /// Query a single partition, calling `on_hit(sk_idx, kmer_idx, row)` for + /// every found k-mer without allocating intermediate result vectors. /// - /// This is the expensive call — do it once before spawning query workers. - pub fn new( - partition: &KmerPartition, - n_partitions: usize, - with_counts: bool, - ) -> SKResult { - let active: Vec = (0..n_partitions).collect(); - Self::new_subset(partition, n_partitions, &active, with_counts) - } - - /// Open only the listed partition indices. - /// - /// Keeps file-descriptor and memory usage bounded to the active set. - /// Unlisted partitions have an empty layer vec and return all-None on query. - pub fn new_subset( - partition: &KmerPartition, - n_partitions: usize, - active: &[usize], - with_counts: bool, - ) -> SKResult { - let mut layers: Vec> = (0..n_partitions).map(|_| Vec::new()).collect(); - for &i in active { - let index_dir = partition.part_dir(i).join(INDEX_SUBDIR); - if !index_dir.exists() { - continue; - } - let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?; - layers[i] = (0..meta.n_layers) - .map(|l| QueryLayer::open( - &index_dir.join(format!("layer_{l}")), - with_counts, - &meta.mode, - )) - .collect::>()?; - } - Ok(Self { layers }) - } - - /// Query one partition for a slice of already-routed super-kmers. - /// - /// Returns one entry per input super-kmer; each entry is a `Vec` with one - /// `Option>` per k-mer inside that super-kmer: - /// - `None` — k-mer absent from the index - /// - `Some(row)` — per-genome count or 0/1 presence - pub fn query_partition( + /// `row` is a shared scratch buffer valid only for the duration of the call; + /// the callback must copy what it needs before returning. + pub fn query_partition_with( &self, part_idx: usize, superkmers: &[&RoutableSuperKmer], k: usize, n_genomes: usize, - ) -> SKResult>>>> { + with_counts: bool, + mut on_hit: F, + ) -> SKResult<()> + where + F: FnMut(usize, usize, &[u32]), + { if superkmers.is_empty() { - return Ok(Vec::new()); + return Ok(()); } - let layers = &self.layers[part_idx]; - - if layers.is_empty() { - return Ok(superkmers - .iter() - .map(|rsk| vec![None; rsk.seql() - k + 1]) - .collect()); + let index_dir = self.part_dir(part_idx).join(INDEX_SUBDIR); + if !index_dir.exists() { + return Ok(()); } - Ok(superkmers - .iter() - .map(|rsk| { - rsk.superkmer() - .iter_canonical_kmers() - .map(|kmer| { - layers.iter().find_map(|layer| layer.find(kmer, n_genomes)) - }) - .collect() - }) - .collect()) + let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?; + let layers: Vec = (0..meta.n_layers) + .map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts, &meta.mode)) + .collect::>()?; + + let mut buf = vec![0u32; n_genomes]; + + for (sk_idx, rsk) in superkmers.iter().enumerate() { + for (kmer_idx, kmer) in rsk.superkmer().iter_canonical_kmers().enumerate() { + for layer in &layers { + if layer.find_into(kmer, n_genomes, &mut buf) { + on_hit(sk_idx, kmer_idx, &buf); + buf.fill(0); + break; + } + } + } + } + + Ok(()) } -} -// ── KmerPartition::query_partition (kept for backward compatibility) ────────── - -impl KmerPartition { /// Query a single partition for a slice of (already-routed) super-kmers. - /// - /// **Prefer [`PreloadedIndex`] for repeated queries** — this method - /// re-opens and deserialises the MPHF on every call. - #[deprecated(note = "use PreloadedIndex::query_partition to avoid repeated MPHF I/O")] + /// Prefer [`query_partition_with`] to avoid per-kmer heap allocations. pub fn query_partition( &self, part_idx: usize, @@ -199,13 +161,21 @@ impl KmerPartition { .map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts, &meta.mode)) .collect::>()?; + let mut buf = vec![0u32; n_genomes]; Ok(superkmers .iter() .map(|rsk| { rsk.superkmer() .iter_canonical_kmers() .map(|kmer| { - layers.iter().find_map(|layer| layer.find(kmer, n_genomes)) + for layer in &layers { + if layer.find_into(kmer, n_genomes, &mut buf) { + let row: Box<[u32]> = buf[..n_genomes].into(); + buf.fill(0); + return Some(row); + } + } + None }) .collect() }) diff --git a/src/obilayeredmap/src/mphf_layer.rs b/src/obilayeredmap/src/mphf_layer.rs index ac6f33d..a5e30b7 100644 --- a/src/obilayeredmap/src/mphf_layer.rs +++ b/src/obilayeredmap/src/mphf_layer.rs @@ -17,8 +17,13 @@ pub(crate) const UNITIGS_FILE: &str = "unitigs.bin"; pub(crate) const EVIDENCE_FILE: &str = "evidence.bin"; pub(crate) const FINGERPRINT_FILE: &str = "fingerprint.bin"; +/// Owned MPHF — used only at build time (construction + store). pub(crate) type Mphf = PtrHash>, Xx64, Vec>; +/// Zero-copy MPHF for querying — ε-deserialized view into a memory-mapped file. +/// `MemCase` owns the mmap backing; `'static` is sound because MemCase pins the memory. +type MphfEps = PtrHash, Xx64, &'static [u8]>; + // ── LayerEvidence ───────────────────────────────────────────────────────────── enum LayerEvidence { @@ -36,7 +41,7 @@ enum LayerEvidence { /// - [`find_strict`](Self::find_strict) — always exact; O(1) on Exact/Hybrid layers, /// O(n) sequential scan on Approx layers. pub struct MphfLayer { - mphf: Mphf, + mphf: MemCase, ev: LayerEvidence, n: usize, } @@ -45,7 +50,7 @@ impl MphfLayer { /// Open a layer using the index-level `mode` determined at `LayeredMap` open time. /// No per-layer metadata file is read. pub fn open(dir: &Path, mode: &IndexMode) -> OLMResult { - let mphf: Mphf = Mphf::load_full(&dir.join(MPHF_FILE)) + let mphf: MemCase = Mphf::mmap(&dir.join(MPHF_FILE), Flags::empty()) .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; let (ev, n) = match mode { IndexMode::Exact => { @@ -137,11 +142,11 @@ impl MphfLayer { /// /// Use this when the caller guarantees that all queried kmers are in the MPHF /// domain (e.g. when iterating the source's own unitigs during merge). -pub struct MphfOnly(Mphf); +pub struct MphfOnly(MemCase); impl MphfOnly { pub fn open(dir: &Path) -> OLMResult { - let mphf: Mphf = Mphf::load_full(&dir.join(MPHF_FILE)) + let mphf: MemCase = Mphf::mmap(&dir.join(MPHF_FILE), Flags::empty()) .map_err(|e| OLMError::InvalidLayer(e.to_string()))?; Ok(Self(mphf)) } diff --git a/src/obiread/src/chunk.rs b/src/obiread/src/chunk.rs index 1bd7f0b..63fea7e 100644 --- a/src/obiread/src/chunk.rs +++ b/src/obiread/src/chunk.rs @@ -153,11 +153,23 @@ pub fn read_fastq_chunks( /// Returns an error if the format cannot be identified as `text/fasta` or `text/fastq`. pub fn read_sequence_chunks( path: &str, +) -> io::Result>>> { + read_sequence_chunks_sized(path, DEFAULT_BLOCK_SIZE) +} + +/// Same as [`read_sequence_chunks`] but with an explicit I/O block size. +/// +/// Larger values amortise per-partition open/close overhead across more superkmers. +pub fn read_sequence_chunks_sized( + path: &str, + block_size: usize, ) -> io::Result>>> { let input = match xopen(path) { Ok(mut i) => match i.mime_type() { - Some("text/fasta") => fasta_chunks(i), - Some("text/fastq") => fastq_chunks(i), + Some("text/fasta") => SeqChunkIter::new(i, block_size, + fasta::end_of_last_fasta_entry, Some("text/fasta")), + Some("text/fastq") => SeqChunkIter::new(i, block_size, + fastq::end_of_last_fastq_entry, Some("text/fastq")), _ => { return Err(io::Error::new( io::ErrorKind::InvalidData, diff --git a/src/obiread/src/lib.rs b/src/obiread/src/lib.rs index 2370bb4..4f4fce7 100644 --- a/src/obiread/src/lib.rs +++ b/src/obiread/src/lib.rs @@ -18,7 +18,7 @@ pub mod xopen; pub use chunk::{ SeqChunkIter, fasta_chunks, fastq_chunks, read_fasta_chunks, read_fastq_chunks, - read_sequence_chunks, + read_sequence_chunks, read_sequence_chunks_sized, }; pub use mimetype::MimeTypeGuesser; pub use normalize::{normalize_fasta_chunk, normalize_fastq_chunk, normalize_sequence_chunk}; diff --git a/src/obisys/Cargo.toml b/src/obisys/Cargo.toml index a34b9db..89d7cb5 100644 --- a/src/obisys/Cargo.toml +++ b/src/obisys/Cargo.toml @@ -4,4 +4,5 @@ version = "0.1.0" edition = "2024" [dependencies] -libc = "0.2" +libc = "0.2" +sysinfo = "0.33" diff --git a/src/obisys/src/lib.rs b/src/obisys/src/lib.rs index 5867eca..d352a39 100644 --- a/src/obisys/src/lib.rs +++ b/src/obisys/src/lib.rs @@ -2,6 +2,21 @@ use std::fmt; use std::time::Instant; use libc::{RUSAGE_SELF, getrusage, rusage, timeval}; +use sysinfo::System; + +// ── Memory query ────────────────────────────────────────────────────────────── + +/// Returns the number of bytes available for allocation on this machine. +/// +/// On macOS, `available_memory()` can return 0 when the memory compressor +/// inflates the page count; in that case we fall back to half of total memory. +pub fn available_memory_bytes() -> u64 { + let sys = System::new_all(); + match sys.available_memory() { + 0 => sys.total_memory() / 2, + n => n, + } +} // ── raw helpers ─────────────────────────────────────────────────────────────── From 173ac9fb42479d4f5f5a7cc3bce32d79d7705201 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 3 Jun 2026 11:50:39 +0200 Subject: [PATCH 4/6] feat: introduce packed matrix storage and layer metadata Unifies bit and integer matrix storage into `PersistentBitMatrix` and `PersistentCompactIntMatrix` enums, supporting both columnar and memory-mapped single-file layouts. Introduces `LayerMeta` to persist layer dimensions as `layer_meta.json`, enabling correct initialization of implicit presence matrices. Adds CLI commands (`pack` and `--upgrade-index`) to convert existing columnar indices to the compact format and backfill missing metadata. Updates partitionner and layered map logic to use the new persistent builders, optimized memory allocation, and auto-detected storage backends. --- src/obicompactvec/Cargo.toml | 2 +- src/obicompactvec/src/bitmatrix.rs | 285 ++++++++++++--- src/obicompactvec/src/intmatrix.rs | 445 ++++++++++++++++------- src/obicompactvec/src/layer_meta.rs | 33 ++ src/obicompactvec/src/lib.rs | 6 +- src/obicompactvec/src/tests/bitmatrix.rs | 21 +- src/obicompactvec/src/tests/intmatrix.rs | 10 +- src/obikindex/src/index.rs | 82 +++++ src/obikindex/src/merge.rs | 10 + src/obikmer/src/cmd/mod.rs | 1 + src/obikmer/src/cmd/pack.rs | 36 ++ src/obikmer/src/cmd/utils.rs | 25 +- src/obikmer/src/main.rs | 3 + src/obikpartitionner/src/distance.rs | 10 +- src/obikpartitionner/src/dump_layer.rs | 8 +- src/obikpartitionner/src/merge_layer.rs | 37 +- src/obikpartitionner/src/query_layer.rs | 46 +-- src/obilayeredmap/src/layer.rs | 4 +- src/obilayeredmap/src/layered_store.rs | 4 +- src/obilayeredmap/src/mphf_layer.rs | 2 + 20 files changed, 799 insertions(+), 271 deletions(-) create mode 100644 src/obicompactvec/src/layer_meta.rs create mode 100644 src/obikmer/src/cmd/pack.rs diff --git a/src/obicompactvec/Cargo.toml b/src/obicompactvec/Cargo.toml index dc4527a..ddb1e40 100644 --- a/src/obicompactvec/Cargo.toml +++ b/src/obicompactvec/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -memmap2 = "0.9" +memmap2 = "0.9" ndarray = "0.16" rayon = "1" diff --git a/src/obicompactvec/src/bitmatrix.rs b/src/obicompactvec/src/bitmatrix.rs index ffa42d5..ead53af 100644 --- a/src/obicompactvec/src/bitmatrix.rs +++ b/src/obicompactvec/src/bitmatrix.rs @@ -1,22 +1,29 @@ -use std::{fs, io, path::{Path, PathBuf}}; +use std::fs::{self, File}; +use std::io::{self, Write as _}; +use std::path::{Path, PathBuf}; +use memmap2::Mmap; use ndarray::{Array1, Array2}; use rayon::prelude::*; use crate::bitvec::{PersistentBitVec, PersistentBitVecBuilder}; +use crate::layer_meta::LayerMeta; use crate::meta::MatrixMeta; fn col_path(dir: &Path, col: usize) -> PathBuf { dir.join(format!("col_{col:06}.pbiv")) } -pub struct PersistentBitMatrix { +// ── ColumnarBitMatrix ───────────────────────────────────────────────────────── + +/// Per-column file layout (original format). +pub struct ColumnarBitMatrix { cols: Vec, n: usize, } -impl PersistentBitMatrix { - pub fn open(dir: &Path) -> io::Result { +impl ColumnarBitMatrix { + pub(crate) fn open(dir: &Path) -> io::Result { let meta = MatrixMeta::load(dir)?; let cols = (0..meta.n_cols) .map(|c| PersistentBitVec::open(&col_path(dir, c))) @@ -24,24 +31,21 @@ impl PersistentBitMatrix { Ok(Self { cols, n: meta.n }) } - pub fn n(&self) -> usize { self.n } - pub fn n_cols(&self) -> usize { self.cols.len() } - pub fn col(&self, c: usize) -> &PersistentBitVec { &self.cols[c] } + pub(crate) fn n(&self) -> usize { self.n } + pub(crate) fn n_cols(&self) -> usize { self.cols.len() } + pub(crate) fn col(&self, c: usize) -> &PersistentBitVec { &self.cols[c] } - pub fn row(&self, slot: usize) -> Box<[bool]> { + pub(crate) fn row(&self, slot: usize) -> Box<[bool]> { self.cols.iter().map(|c| c.get(slot)).collect() } - /// Fill `buf[i]` with `col_i[slot]` as 0/1 u32, without allocating. - /// `buf` must have length ≥ `self.n_cols()`. - pub fn fill_row(&self, slot: usize, buf: &mut [u32]) { + pub(crate) fn fill_row(&self, slot: usize, buf: &mut [u32]) { for (c, col) in self.cols.iter().enumerate() { buf[c] = col.get(slot) as u32; } } - /// Returns the number of set bits in each column as `Array1`. - pub fn count_ones(&self) -> Array1 { + pub(crate) fn count_ones(&self) -> Array1 { let counts: Vec = (0..self.n_cols()) .into_par_iter() .map(|c| self.col(c).count_ones()) @@ -49,21 +53,15 @@ impl PersistentBitMatrix { Array1::from_vec(counts) } - // ── Distance matrices ───────────────────────────────────────────────────── - - pub fn jaccard_dist_matrix(&self) -> Array2 { + pub(crate) fn jaccard_dist_matrix(&self) -> Array2 { self.pairwise_f64(|i, j| self.col(i).jaccard_dist(self.col(j))) } - pub fn hamming_dist_matrix(&self) -> Array2 { + pub(crate) fn hamming_dist_matrix(&self) -> Array2 { self.pairwise_u64(|i, j| self.col(i).hamming_dist(self.col(j))) } - // ── Partial matrices (additively decomposable across layers) ────────────── - - /// Returns `(inter[n×n], union[n×n])`. - /// Reduce across layers by element-wise addition before computing `1 - inter/union`. - pub fn partial_jaccard_dist_matrix(&self) -> (Array2, Array2) { + pub(crate) fn partial_jaccard_dist_matrix(&self) -> (Array2, Array2) { let n = self.n_cols(); let results: Vec<(usize, usize, u64, u64)> = upper_pairs(n) .into_par_iter() @@ -72,7 +70,6 @@ impl PersistentBitMatrix { (i, j, inter, union) }) .collect(); - let mut inter_m = Array2::zeros((n, n)); let mut union_m = Array2::zeros((n, n)); for (i, j, inter, union) in results { @@ -82,14 +79,10 @@ impl PersistentBitMatrix { (inter_m, union_m) } - /// Returns `hamming[n×n]` (number of differing bits per pair). - /// Additive across layers — reduce by element-wise addition. - pub fn partial_hamming_dist_matrix(&self) -> Array2 { + pub(crate) fn partial_hamming_dist_matrix(&self) -> Array2 { self.pairwise_u64(|i, j| self.col(i).hamming_dist(self.col(j))) } - // ── Private helpers ─────────────────────────────────────────────────────── - fn pairwise_f64(&self, f: impl Fn(usize, usize) -> f64 + Sync) -> Array2 { let n = self.n_cols(); let results: Vec<(usize, usize, f64)> = upper_pairs(n) @@ -107,17 +100,8 @@ impl PersistentBitMatrix { .collect(); fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) } -} -// ── Column append ───────────────────────────────────────────────────────────── - -impl PersistentBitMatrix { - /// Append a new column to an existing matrix on disk. - /// - /// Reads `meta.json` to obtain `n` and the current column count, writes - /// `col_{n_cols:06}.pbiv` filled by `value_of(slot)`, then increments - /// `n_cols` in `meta.json`. - pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> bool) -> io::Result<()> { + pub(crate) fn append_column(dir: &Path, value_of: impl Fn(usize) -> bool) -> io::Result<()> { let mut meta = MatrixMeta::load(dir)?; let mut b = PersistentBitVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?; for slot in 0..meta.n { @@ -129,20 +113,208 @@ impl PersistentBitMatrix { } } -fn upper_pairs(n: usize) -> Vec<(usize, usize)> { - (0..n).flat_map(|i| (i + 1..n).map(move |j| (i, j))).collect() +// ── PackedBitMatrix ─────────────────────────────────────────────────────────── + +const PBMX_MAGIC: [u8; 4] = *b"PBMX"; +const PBMX_HEADER: usize = 24; // magic(4) + pad(4) + n_rows(8) + n_cols(8) +const PBIV_HEADER: usize = 16; // magic(4) + pad(4) + n(8) + +/// Single-file packed layout: all columns concatenated behind a header. +pub struct PackedBitMatrix { + mmap: Mmap, + n_rows: usize, + n_cols: usize, + /// Absolute byte offset to the start of each column's bit data + /// (= file offset of the PBIV blob + PBIV_HEADER). + data_offsets: Vec, } -fn fill_symmetric(n: usize, vals: impl Iterator) -> Array2 -where - T: Clone + Default, -{ - let mut m = Array2::from_elem((n, n), T::default()); - for (i, j, vij, vji) in vals { - m[[i, j]] = vij; - m[[j, i]] = vji; +impl PackedBitMatrix { + pub(crate) fn open(path: &Path) -> io::Result { + let mmap = unsafe { Mmap::map(&File::open(path)?)? }; + if mmap.len() < PBMX_HEADER { + return Err(io::Error::new(io::ErrorKind::InvalidData, "PBMX file too short")); + } + if &mmap[0..4] != &PBMX_MAGIC { + return Err(io::Error::new(io::ErrorKind::InvalidData, "bad PBMX magic")); + } + let n_rows = u64::from_le_bytes(mmap[8..16].try_into().unwrap()) as usize; + let n_cols = u64::from_le_bytes(mmap[16..24].try_into().unwrap()) as usize; + + let mut data_offsets = Vec::with_capacity(n_cols); + for c in 0..n_cols { + let off_pos = PBMX_HEADER + c * 8; + let col_file_off = u64::from_le_bytes(mmap[off_pos..off_pos+8].try_into().unwrap()) as usize; + data_offsets.push(col_file_off + PBIV_HEADER); + } + + Ok(Self { mmap, n_rows, n_cols, data_offsets }) + } + + #[inline] + pub(crate) fn fill_row(&self, slot: usize, buf: &mut [u32]) { + for (c, &data_off) in self.data_offsets.iter().enumerate() { + buf[c] = ((self.mmap[data_off + (slot >> 3)] >> (slot & 7)) & 1) as u32; + } + } + + pub(crate) fn row(&self, slot: usize) -> Box<[bool]> { + (0..self.n_cols).map(|c| { + (self.mmap[self.data_offsets[c] + (slot >> 3)] >> (slot & 7)) & 1 != 0 + }).collect() + } +} + +/// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files. +pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> { + let meta = MatrixMeta::load(dir)?; + let n_cols = meta.n_cols; + + let col_files: Vec> = (0..n_cols) + .map(|c| fs::read(col_path(dir, c))) + .collect::>()?; + + let header_size = PBMX_HEADER + n_cols * 8; + let mut col_offset = header_size; + let mut offsets = Vec::with_capacity(n_cols); + for data in &col_files { + offsets.push(col_offset as u64); + col_offset += data.len(); + } + + let packed_path = dir.join("matrix.pbmx"); + let mut file = File::create(&packed_path)?; + file.write_all(&PBMX_MAGIC)?; + file.write_all(&[0u8; 4])?; + file.write_all(&(meta.n as u64).to_le_bytes())?; + file.write_all(&(n_cols as u64).to_le_bytes())?; + for &off in &offsets { file.write_all(&off.to_le_bytes())?; } + for data in &col_files { file.write_all(data)?; } + Ok(()) +} + +// ── PersistentBitMatrix — public enum ──────────────────────────────────────── + +/// Bit matrix that transparently handles columnar, packed, and implicit formats. +/// +/// - `Columnar`: per-column `.pbiv` files (original format, used during build) +/// - `Packed`: single `matrix.pbmx` file (optimised for query — one `mmap`) +/// - `Implicit`: no file — all values are 1 (mono-genome presence/absence) +pub enum PersistentBitMatrix { + Columnar(ColumnarBitMatrix), + Packed(PackedBitMatrix), + Implicit { n_rows: usize, n_cols: usize }, +} + +impl PersistentBitMatrix { + /// Open from `layer_dir`, auto-detecting the format. + /// + /// Checks (in order): + /// 1. `layer_dir/presence/matrix.pbmx` → Packed + /// 2. `layer_dir/presence/meta.json` → Columnar + /// 3. `layer_dir/layer_meta.json` → Implicit (new index) + /// 4. `layer_dir/unitigs.bin` → Implicit with warning (old index) + pub fn open(layer_dir: &Path) -> io::Result { + let presence_dir = layer_dir.join("presence"); + + if presence_dir.join("matrix.pbmx").exists() { + return Ok(Self::Packed(PackedBitMatrix::open(&presence_dir.join("matrix.pbmx"))?)); + } + + if MatrixMeta::load(&presence_dir).is_ok() { + return Ok(Self::Columnar(ColumnarBitMatrix::open(&presence_dir)?)); + } + + // No presence matrix → Implicit; requires layer_meta.json + let meta = LayerMeta::load(layer_dir).map_err(|_| io::Error::new( + io::ErrorKind::NotFound, + format!( + "no presence matrix and no layer_meta.json in {} — run 'obikmer upgrade'", + layer_dir.display() + ), + ))?; + Ok(Self::Implicit { n_rows: meta.n, n_cols: 1 }) + } + + pub fn n(&self) -> usize { + match self { + Self::Columnar(m) => m.n(), + Self::Packed(m) => m.n_rows, + Self::Implicit { n_rows, .. } => *n_rows, + } + } + + pub fn n_cols(&self) -> usize { + match self { + Self::Columnar(m) => m.n_cols(), + Self::Packed(m) => m.n_cols, + Self::Implicit { n_cols, .. } => *n_cols, + } + } + + pub fn col(&self, c: usize) -> &PersistentBitVec { + match self { + Self::Columnar(m) => m.col(c), + _ => panic!("col() only available on Columnar PersistentBitMatrix"), + } + } + + pub fn row(&self, slot: usize) -> Box<[bool]> { + match self { + Self::Columnar(m) => m.row(slot), + Self::Packed(m) => m.row(slot), + Self::Implicit { n_cols, .. } => vec![true; *n_cols].into_boxed_slice(), + } + } + + /// Fill `buf[i]` with `col_i[slot]` as 0/1 u32, without allocating. + pub fn fill_row(&self, slot: usize, buf: &mut [u32]) { + match self { + Self::Columnar(m) => m.fill_row(slot, buf), + Self::Packed(m) => m.fill_row(slot, buf), + Self::Implicit { n_cols, .. } => buf[..*n_cols].fill(1), + } + } + + pub fn count_ones(&self) -> Array1 { + match self { + Self::Columnar(m) => m.count_ones(), + _ => panic!("count_ones() only available on Columnar PersistentBitMatrix"), + } + } + + pub fn jaccard_dist_matrix(&self) -> Array2 { + match self { + Self::Columnar(m) => m.jaccard_dist_matrix(), + _ => panic!("jaccard_dist_matrix() only available on Columnar PersistentBitMatrix"), + } + } + + pub fn hamming_dist_matrix(&self) -> Array2 { + match self { + Self::Columnar(m) => m.hamming_dist_matrix(), + _ => panic!("hamming_dist_matrix() only available on Columnar PersistentBitMatrix"), + } + } + + pub fn partial_jaccard_dist_matrix(&self) -> (Array2, Array2) { + match self { + Self::Columnar(m) => m.partial_jaccard_dist_matrix(), + _ => panic!("partial_jaccard_dist_matrix() only available on Columnar PersistentBitMatrix"), + } + } + + pub fn partial_hamming_dist_matrix(&self) -> Array2 { + match self { + Self::Columnar(m) => m.partial_hamming_dist_matrix(), + _ => panic!("partial_hamming_dist_matrix() only available on Columnar PersistentBitMatrix"), + } + } + + /// Append a new column to an on-disk Columnar matrix. + pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> bool) -> io::Result<()> { + ColumnarBitMatrix::append_column(dir, value_of) } - m } // ── Trait impls ─────────────────────────────────────────────────────────────── @@ -162,7 +334,7 @@ impl BitPartials for PersistentBitMatrix { } } -// ── Builder ─────────────────────────────────────────────────────────────────── +// ── Builder (unchanged — always builds Columnar) ────────────────────────────── pub struct PersistentBitMatrixBuilder { dir: PathBuf, @@ -189,3 +361,16 @@ impl PersistentBitMatrixBuilder { MatrixMeta { n: self.n, n_cols: self.n_cols }.save(&self.dir) } } + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn upper_pairs(n: usize) -> Vec<(usize, usize)> { + (0..n).flat_map(|i| (i + 1..n).map(move |j| (i, j))).collect() +} + +fn fill_symmetric(n: usize, vals: impl Iterator) -> Array2 +where T: Clone + Default { + let mut m = Array2::from_elem((n, n), T::default()); + for (i, j, vij, vji) in vals { m[[i, j]] = vij; m[[j, i]] = vji; } + m +} diff --git a/src/obicompactvec/src/intmatrix.rs b/src/obicompactvec/src/intmatrix.rs index 794d396..5fa1cc4 100644 --- a/src/obicompactvec/src/intmatrix.rs +++ b/src/obicompactvec/src/intmatrix.rs @@ -1,9 +1,14 @@ -use std::{fs, io, path::{Path, PathBuf}}; +use std::cmp::Ordering; +use std::fs::{self, File}; +use std::io::{self, Write as _}; +use std::path::{Path, PathBuf}; +use memmap2::Mmap; use ndarray::{Array1, Array2}; use rayon::prelude::*; use crate::builder::PersistentCompactIntVecBuilder; +use crate::format::{HEADER_SIZE, INDEX_ENTRY_SIZE, OVERFLOW_ENTRY_SIZE}; use crate::meta::MatrixMeta; use crate::reader::PersistentCompactIntVec; @@ -11,13 +16,15 @@ fn col_path(dir: &Path, col: usize) -> PathBuf { dir.join(format!("col_{col:06}.pciv")) } -pub struct PersistentCompactIntMatrix { +// ── ColumnarCompactIntMatrix ────────────────────────────────────────────────── + +pub struct ColumnarCompactIntMatrix { cols: Vec, n: usize, } -impl PersistentCompactIntMatrix { - pub fn open(dir: &Path) -> io::Result { +impl ColumnarCompactIntMatrix { + pub(crate) fn open(dir: &Path) -> io::Result { let meta = MatrixMeta::load(dir)?; let cols = (0..meta.n_cols) .map(|c| PersistentCompactIntVec::open(&col_path(dir, c))) @@ -25,25 +32,29 @@ impl PersistentCompactIntMatrix { Ok(Self { cols, n: meta.n }) } - pub fn n(&self) -> usize { self.n } - pub fn n_cols(&self) -> usize { self.cols.len() } - pub fn col(&self, c: usize) -> &PersistentCompactIntVec { &self.cols[c] } + pub(crate) fn n(&self) -> usize { self.n } + pub(crate) fn n_cols(&self) -> usize { self.cols.len() } + pub(crate) fn col(&self, c: usize) -> &PersistentCompactIntVec { &self.cols[c] } - pub fn row(&self, slot: usize) -> Box<[u32]> { + pub(crate) fn row(&self, slot: usize) -> Box<[u32]> { self.cols.iter().map(|c| c.get(slot)).collect() } - /// Fill `buf[i]` with `col_i[slot]`, without allocating. - /// `buf` must have length ≥ `self.n_cols()`. - pub fn fill_row(&self, slot: usize, buf: &mut [u32]) { + pub(crate) fn fill_row(&self, slot: usize, buf: &mut [u32]) { for (c, col) in self.cols.iter().enumerate() { buf[c] = col.get(slot); } } - // ── Distance matrices ───────────────────────────────────────────────────── + pub(crate) fn sum(&self) -> Array1 { + let sums: Vec = (0..self.n_cols()) + .into_par_iter() + .map(|c| self.col(c).sum()) + .collect(); + Array1::from_vec(sums) + } - pub fn bray_dist_matrix(&self) -> Array2 { + pub(crate) fn bray_dist_matrix(&self) -> Array2 { let sum_min = self.partial_bray_dist_matrix(); let col_sums = self.sum(); let n = self.n_cols(); @@ -60,63 +71,19 @@ impl PersistentCompactIntMatrix { m } - pub fn relfreq_bray_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.col(i).relfreq_bray_dist(self.col(j))) - } - - pub fn euclidean_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.col(i).euclidean_dist(self.col(j))) - } - - pub fn relfreq_euclidean_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.col(i).relfreq_euclidean_dist(self.col(j))) - } - - pub fn hellinger_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.col(i).hellinger_dist(self.col(j))) - } - - pub fn jaccard_dist_matrix(&self) -> Array2 { - self.pairwise(|i, j| self.col(i).jaccard_dist(self.col(j))) - } - - pub fn threshold_jaccard_dist_matrix(&self, threshold: u32) -> Array2 { - self.pairwise(|i, j| self.col(i).threshold_jaccard_dist(self.col(j), threshold)) - } - - /// Returns the sum of each column as `Array1`. - pub fn sum(&self) -> Array1 { - let sums: Vec = (0..self.n_cols()) - .into_par_iter() - .map(|c| self.col(c).sum()) - .collect(); - Array1::from_vec(sums) - } - - // ── Partial matrices (additively decomposable across layers) ────────────── - - /// Returns `sum_min[n×n]` where `sum_min[i,j]` = Σ_slot min(col_i[slot], col_j[slot]). - /// The denominator `col_sums[i] + col_sums[j]` is obtained from `self.sum()`. - /// Additive across layers by element-wise addition. - pub fn partial_bray_dist_matrix(&self) -> Array2 { + pub(crate) fn partial_bray_dist_matrix(&self) -> Array2 { self.pairwise_u64(|i, j| self.col(i).partial_bray_dist(self.col(j))) } - /// Returns sum of squared differences `[n×n]`. - /// Reduce across layers by element-wise addition, then take `sqrt` for the final distance. - pub fn partial_euclidean_dist_matrix(&self) -> Array2 { + pub(crate) fn partial_euclidean_dist_matrix(&self) -> Array2 { self.pairwise(|i, j| self.col(i).partial_euclidean_dist(self.col(j))) } - /// Returns `(inter[n×n], union[n×n])` for threshold-Jaccard. - /// Reduce across layers by element-wise addition before computing `1 - inter/union`. - pub fn partial_threshold_jaccard_dist_matrix( - &self, - threshold: u32, + pub(crate) fn partial_threshold_jaccard_dist_matrix( + &self, threshold: u32, ) -> (Array2, Array2) { let n = self.n_cols(); let pairs = upper_pairs(n); - let results: Vec<(usize, usize, u64, u64)> = pairs .into_par_iter() .map(|(i, j)| { @@ -125,7 +92,6 @@ impl PersistentCompactIntMatrix { (i, j, inter, union) }) .collect(); - let mut inter_m = Array2::zeros((n, n)); let mut union_m = Array2::zeros((n, n)); for (i, j, inter, union) in results { @@ -135,99 +101,299 @@ impl PersistentCompactIntMatrix { (inter_m, union_m) } - /// Returns matrix of `Σ_slot min(col_i[slot]/sum_i, col_j[slot]/sum_j)` per pair. - /// `col_sums` must be the GLOBAL sums across all layers/partitions. - /// Reduce across layers by element-wise addition; final distance = `1 - value`. - pub fn partial_relfreq_bray_dist_matrix(&self, col_sums: &Array1) -> Array2 { + pub(crate) fn partial_relfreq_bray_dist_matrix(&self, col_sums: &Array1) -> Array2 { self.pairwise(|i, j| { - self.col(i).partial_relfreq_bray_dist( - self.col(j), - col_sums[i] as f64, - col_sums[j] as f64, - ) + self.col(i).partial_relfreq_bray_dist(self.col(j), col_sums[i] as f64, col_sums[j] as f64) }) } - /// Returns matrix of `Σ_slot (col_i[slot]/sum_i - col_j[slot]/sum_j)²` per pair. - /// `col_sums` must be the GLOBAL sums across all layers/partitions. - /// Reduce across layers by element-wise addition; final distance = `sqrt(value)`. - pub fn partial_relfreq_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { + pub(crate) fn partial_relfreq_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { self.pairwise(|i, j| { - self.col(i).partial_relfreq_euclidean_dist( - self.col(j), - col_sums[i] as f64, - col_sums[j] as f64, - ) + self.col(i).partial_relfreq_euclidean_dist(self.col(j), col_sums[i] as f64, col_sums[j] as f64) }) } - /// Returns matrix of `Σ_slot (√(col_i/sum_i) - √(col_j/sum_j))²` per pair. - /// `col_sums` must be the GLOBAL sums across all layers/partitions. - /// Reduce across layers by element-wise addition; final distance = `sqrt(value) / √2`. - pub fn partial_hellinger_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { + pub(crate) fn partial_hellinger_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { self.pairwise(|i, j| { - self.col(i).partial_hellinger_euclidean_dist( - self.col(j), - col_sums[i] as f64, - col_sums[j] as f64, - ) + self.col(i).partial_hellinger_euclidean_dist(self.col(j), col_sums[i] as f64, col_sums[j] as f64) }) } - // ── Private helpers ─────────────────────────────────────────────────────── + pub(crate) fn relfreq_bray_dist_matrix(&self) -> Array2 { + self.pairwise(|i, j| self.col(i).relfreq_bray_dist(self.col(j))) + } + + pub(crate) fn euclidean_dist_matrix(&self) -> Array2 { + self.pairwise(|i, j| self.col(i).euclidean_dist(self.col(j))) + } + + pub(crate) fn relfreq_euclidean_dist_matrix(&self) -> Array2 { + self.pairwise(|i, j| self.col(i).relfreq_euclidean_dist(self.col(j))) + } + + pub(crate) fn hellinger_dist_matrix(&self) -> Array2 { + self.pairwise(|i, j| self.col(i).hellinger_dist(self.col(j))) + } + + pub(crate) fn jaccard_dist_matrix(&self) -> Array2 { + self.pairwise(|i, j| self.col(i).jaccard_dist(self.col(j))) + } + + pub(crate) fn threshold_jaccard_dist_matrix(&self, threshold: u32) -> Array2 { + self.pairwise(|i, j| self.col(i).threshold_jaccard_dist(self.col(j), threshold)) + } + + pub(crate) fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> io::Result<()> { + let mut meta = MatrixMeta::load(dir)?; + let mut b = PersistentCompactIntVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?; + for slot in 0..meta.n { b.set(slot, value_of(slot)); } + b.close()?; + meta.n_cols += 1; + meta.save(dir) + } fn pairwise(&self, f: impl Fn(usize, usize) -> f64 + Sync) -> Array2 { let n = self.n_cols(); let results: Vec<(usize, usize, f64)> = upper_pairs(n) - .into_par_iter() - .map(|(i, j)| (i, j, f(i, j))) - .collect(); + .into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect(); fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) } fn pairwise_u64(&self, f: impl Fn(usize, usize) -> u64 + Sync) -> Array2 { let n = self.n_cols(); let results: Vec<(usize, usize, u64)> = upper_pairs(n) - .into_par_iter() - .map(|(i, j)| (i, j, f(i, j))) - .collect(); + .into_par_iter().map(|(i, j)| (i, j, f(i, j))).collect(); fill_symmetric(n, results.into_iter().map(|(i, j, v)| (i, j, v, v))) } } -fn upper_pairs(n: usize) -> Vec<(usize, usize)> { - (0..n).flat_map(|i| (i + 1..n).map(move |j| (i, j))).collect() +// ── PackedCompactIntMatrix ──────────────────────────────────────────────────── + +const PCMX_MAGIC: [u8; 4] = *b"PCMX"; +const PCMX_HEADER: usize = 24; // magic(4) + pad(4) + n_rows(8) + n_cols(8) + +/// Per-column metadata pre-parsed from the embedded PCIV header. +struct ColInfo { + primary_start: usize, // absolute mmap offset to primary array + data_offset: usize, // absolute mmap offset to overflow array + n_overflow: usize, + step: usize, + index: Vec<(usize, usize)>, } -fn fill_symmetric(n: usize, vals: impl Iterator) -> Array2 -where - T: Clone + Default, -{ - let mut m = Array2::from_elem((n, n), T::default()); - for (i, j, vij, vji) in vals { - m[[i, j]] = vij; - m[[j, i]] = vji; +pub struct PackedCompactIntMatrix { + mmap: Mmap, + n_rows: usize, + n_cols: usize, + columns: Vec, +} + +impl PackedCompactIntMatrix { + pub(crate) fn open(path: &Path) -> io::Result { + let mmap = unsafe { Mmap::map(&File::open(path)?)? }; + if mmap.len() < PCMX_HEADER { + return Err(io::Error::new(io::ErrorKind::InvalidData, "PCMX file too short")); + } + if &mmap[0..4] != &PCMX_MAGIC { + return Err(io::Error::new(io::ErrorKind::InvalidData, "bad PCMX magic")); + } + let n_rows = u64::from_le_bytes(mmap[8..16].try_into().unwrap()) as usize; + let n_cols = u64::from_le_bytes(mmap[16..24].try_into().unwrap()) as usize; + + let mut columns = Vec::with_capacity(n_cols); + for c in 0..n_cols { + let off_pos = PCMX_HEADER + c * 8; + let col_base = u64::from_le_bytes(mmap[off_pos..off_pos+8].try_into().unwrap()) as usize; + // Parse embedded PCIV header at col_base + let n_ov = u64::from_le_bytes(mmap[col_base+16..col_base+24].try_into().unwrap()) as usize; + let n_idx = u64::from_le_bytes(mmap[col_base+24..col_base+32].try_into().unwrap()) as usize; + let step = u64::from_le_bytes(mmap[col_base+32..col_base+40].try_into().unwrap()) as usize; + let n_pciv = u64::from_le_bytes(mmap[col_base+8..col_base+16].try_into().unwrap()) as usize; + + let primary_start = col_base + HEADER_SIZE; + let data_offset = primary_start + n_pciv; + let index_offset = data_offset + n_ov * OVERFLOW_ENTRY_SIZE; + + let mut index = Vec::with_capacity(n_idx); + for i in 0..n_idx { + let ioff = index_offset + i * INDEX_ENTRY_SIZE; + let slot = u64::from_le_bytes(mmap[ioff..ioff+8].try_into().unwrap()) as usize; + let pos = u64::from_le_bytes(mmap[ioff+8..ioff+16].try_into().unwrap()) as usize; + index.push((slot, pos)); + } + columns.push(ColInfo { primary_start, data_offset, n_overflow: n_ov, step, index }); + } + + Ok(Self { mmap, n_rows, n_cols, columns }) + } + + #[inline] + pub(crate) fn get(&self, col: usize, slot: usize) -> u32 { + let ci = &self.columns[col]; + let v = self.mmap[ci.primary_start + slot]; + if v < 255 { return v as u32; } + self.overflow_get(ci, slot) + } + + fn overflow_get(&self, ci: &ColInfo, slot: usize) -> u32 { + let (pos_start, pos_end) = if ci.step == 0 { + (0, ci.n_overflow) + } else { + let i = ci.index.partition_point(|&(s, _)| s <= slot).saturating_sub(1); + let start = ci.index[i].1; + let end = if i + 1 < ci.index.len() { ci.index[i+1].1 } else { ci.n_overflow }; + (start, end) + }; + let mut lo = pos_start; + let mut hi = pos_end; + while lo < hi { + let mid = lo + (hi - lo) / 2; + let off = ci.data_offset + mid * OVERFLOW_ENTRY_SIZE; + let stored = u64::from_le_bytes(self.mmap[off..off+8].try_into().unwrap()) as usize; + match stored.cmp(&slot) { + Ordering::Equal => return u32::from_le_bytes(self.mmap[off+8..off+12].try_into().unwrap()), + Ordering::Less => lo = mid + 1, + Ordering::Greater => hi = mid, + } + } + panic!("slot {slot} marked overflow but not found") + } + + pub(crate) fn fill_row(&self, slot: usize, buf: &mut [u32]) { + for c in 0..self.n_cols { buf[c] = self.get(c, slot); } + } + + pub(crate) fn row(&self, slot: usize) -> Box<[u32]> { + (0..self.n_cols).map(|c| self.get(c, slot)).collect() } - m } -// ── Column append ───────────────────────────────────────────────────────────── +/// Build `counts/matrix.pcmx` from existing `col_*.pciv` files. +pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> { + let meta = MatrixMeta::load(dir)?; + let n_cols = meta.n_cols; + + let col_files: Vec> = (0..n_cols) + .map(|c| fs::read(col_path(dir, c))) + .collect::>()?; + + let header_size = PCMX_HEADER + n_cols * 8; + let mut col_offset = header_size; + let mut offsets = Vec::with_capacity(n_cols); + for data in &col_files { + offsets.push(col_offset as u64); + col_offset += data.len(); + } + + let packed_path = dir.join("matrix.pcmx"); + let mut file = File::create(&packed_path)?; + file.write_all(&PCMX_MAGIC)?; + file.write_all(&[0u8; 4])?; + file.write_all(&(meta.n as u64).to_le_bytes())?; + file.write_all(&(n_cols as u64).to_le_bytes())?; + for &off in &offsets { file.write_all(&off.to_le_bytes())?; } + for data in &col_files { file.write_all(data)?; } + Ok(()) +} + +// ── PersistentCompactIntMatrix — public enum ────────────────────────────────── + +pub enum PersistentCompactIntMatrix { + Columnar(ColumnarCompactIntMatrix), + Packed(PackedCompactIntMatrix), +} impl PersistentCompactIntMatrix { - /// Append a new column to an existing matrix on disk. - /// - /// Reads `meta.json` to obtain `n` and the current column count, writes - /// `col_{n_cols:06}.pciv` filled by `value_of(slot)`, then increments - /// `n_cols` in `meta.json`. - pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> io::Result<()> { - let mut meta = MatrixMeta::load(dir)?; - let mut b = PersistentCompactIntVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?; - for slot in 0..meta.n { - b.set(slot, value_of(slot)); + /// Open from `layer_dir`, auto-detecting Packed or Columnar. + pub fn open(layer_dir: &Path) -> io::Result { + let counts_dir = layer_dir.join("counts"); + + if counts_dir.join("matrix.pcmx").exists() { + return Ok(Self::Packed(PackedCompactIntMatrix::open(&counts_dir.join("matrix.pcmx"))?)); } - b.close()?; - meta.n_cols += 1; - meta.save(dir) + + if MatrixMeta::load(&counts_dir).is_ok() { + return Ok(Self::Columnar(ColumnarCompactIntMatrix::open(&counts_dir)?)); + } + + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("no count matrix found in {} — run 'obikmer upgrade'", layer_dir.display()), + )) + } + + pub fn n(&self) -> usize { + match self { Self::Columnar(m) => m.n(), Self::Packed(m) => m.n_rows } + } + + pub fn n_cols(&self) -> usize { + match self { Self::Columnar(m) => m.n_cols(), Self::Packed(m) => m.n_cols } + } + + pub fn col(&self, c: usize) -> &PersistentCompactIntVec { + match self { + Self::Columnar(m) => m.col(c), + _ => panic!("col() only available on Columnar PersistentCompactIntMatrix"), + } + } + + pub fn row(&self, slot: usize) -> Box<[u32]> { + match self { Self::Columnar(m) => m.row(slot), Self::Packed(m) => m.row(slot) } + } + + pub fn fill_row(&self, slot: usize, buf: &mut [u32]) { + match self { Self::Columnar(m) => m.fill_row(slot, buf), Self::Packed(m) => m.fill_row(slot, buf) } + } + + pub fn sum(&self) -> Array1 { + match self { + Self::Columnar(m) => m.sum(), + _ => panic!("sum() only available on Columnar PersistentCompactIntMatrix"), + } + } + + pub fn bray_dist_matrix(&self) -> Array2 { + match self { Self::Columnar(m) => m.bray_dist_matrix(), _ => panic!("Columnar only") } + } + pub fn relfreq_bray_dist_matrix(&self) -> Array2 { + match self { Self::Columnar(m) => m.relfreq_bray_dist_matrix(), _ => panic!("Columnar only") } + } + pub fn euclidean_dist_matrix(&self) -> Array2 { + match self { Self::Columnar(m) => m.euclidean_dist_matrix(), _ => panic!("Columnar only") } + } + pub fn relfreq_euclidean_dist_matrix(&self) -> Array2 { + match self { Self::Columnar(m) => m.relfreq_euclidean_dist_matrix(), _ => panic!("Columnar only") } + } + pub fn hellinger_dist_matrix(&self) -> Array2 { + match self { Self::Columnar(m) => m.hellinger_dist_matrix(), _ => panic!("Columnar only") } + } + pub fn jaccard_dist_matrix(&self) -> Array2 { + match self { Self::Columnar(m) => m.jaccard_dist_matrix(), _ => panic!("Columnar only") } + } + pub fn threshold_jaccard_dist_matrix(&self, threshold: u32) -> Array2 { + match self { Self::Columnar(m) => m.threshold_jaccard_dist_matrix(threshold), _ => panic!("Columnar only") } + } + pub fn partial_bray_dist_matrix(&self) -> Array2 { + match self { Self::Columnar(m) => m.partial_bray_dist_matrix(), _ => panic!("Columnar only") } + } + pub fn partial_euclidean_dist_matrix(&self) -> Array2 { + match self { Self::Columnar(m) => m.partial_euclidean_dist_matrix(), _ => panic!("Columnar only") } + } + pub fn partial_threshold_jaccard_dist_matrix(&self, threshold: u32) -> (Array2, Array2) { + match self { Self::Columnar(m) => m.partial_threshold_jaccard_dist_matrix(threshold), _ => panic!("Columnar only") } + } + pub fn partial_relfreq_bray_dist_matrix(&self, col_sums: &Array1) -> Array2 { + match self { Self::Columnar(m) => m.partial_relfreq_bray_dist_matrix(col_sums), _ => panic!("Columnar only") } + } + pub fn partial_relfreq_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { + match self { Self::Columnar(m) => m.partial_relfreq_euclidean_dist_matrix(col_sums), _ => panic!("Columnar only") } + } + pub fn partial_hellinger_euclidean_dist_matrix(&self, col_sums: &Array1) -> Array2 { + match self { Self::Columnar(m) => m.partial_hellinger_euclidean_dist_matrix(col_sums), _ => panic!("Columnar only") } + } + + pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> io::Result<()> { + ColumnarCompactIntMatrix::append_column(dir, value_of) } } @@ -240,24 +406,12 @@ impl ColumnWeights for PersistentCompactIntMatrix { } impl CountPartials for PersistentCompactIntMatrix { - fn partial_bray(&self) -> Array2 { - self.partial_bray_dist_matrix() - } - fn partial_euclidean(&self) -> Array2 { - self.partial_euclidean_dist_matrix() - } - fn partial_threshold_jaccard(&self, threshold: u32) -> (Array2, Array2) { - self.partial_threshold_jaccard_dist_matrix(threshold) - } - fn partial_relfreq_bray(&self, global: &Array1) -> Array2 { - self.partial_relfreq_bray_dist_matrix(global) - } - fn partial_relfreq_euclidean(&self, global: &Array1) -> Array2 { - self.partial_relfreq_euclidean_dist_matrix(global) - } - fn partial_hellinger(&self, global: &Array1) -> Array2 { - self.partial_hellinger_euclidean_dist_matrix(global) - } + fn partial_bray(&self) -> Array2 { self.partial_bray_dist_matrix() } + fn partial_euclidean(&self) -> Array2 { self.partial_euclidean_dist_matrix() } + fn partial_threshold_jaccard(&self, t: u32) -> (Array2, Array2) { self.partial_threshold_jaccard_dist_matrix(t) } + fn partial_relfreq_bray(&self, g: &Array1) -> Array2 { self.partial_relfreq_bray_dist_matrix(g) } + fn partial_relfreq_euclidean(&self, g: &Array1) -> Array2 { self.partial_relfreq_euclidean_dist_matrix(g) } + fn partial_hellinger(&self, g: &Array1) -> Array2 { self.partial_hellinger_euclidean_dist_matrix(g) } } // ── Builder ─────────────────────────────────────────────────────────────────── @@ -287,3 +441,16 @@ impl PersistentCompactIntMatrixBuilder { MatrixMeta { n: self.n, n_cols: self.n_cols }.save(&self.dir) } } + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn upper_pairs(n: usize) -> Vec<(usize, usize)> { + (0..n).flat_map(|i| (i + 1..n).map(move |j| (i, j))).collect() +} + +fn fill_symmetric(n: usize, vals: impl Iterator) -> Array2 +where T: Clone + Default { + let mut m = Array2::from_elem((n, n), T::default()); + for (i, j, vij, vji) in vals { m[[i, j]] = vij; m[[j, i]] = vji; } + m +} diff --git a/src/obicompactvec/src/layer_meta.rs b/src/obicompactvec/src/layer_meta.rs new file mode 100644 index 0000000..65dc5bc --- /dev/null +++ b/src/obicompactvec/src/layer_meta.rs @@ -0,0 +1,33 @@ +use std::{fs, io, path::Path}; + +/// Lightweight metadata stored at the layer level (`layer_meta.json`). +/// +/// Written by `obilayeredmap::MphfLayer::build` alongside `mphf.bin`. +/// Read by `PersistentBitMatrix::open` to determine `n_rows` for the +/// implicit (mono-genome presence/absence) case. +pub struct LayerMeta { + pub n: usize, +} + +impl LayerMeta { + pub const FILENAME: &'static str = "layer_meta.json"; + + pub fn save(layer_dir: &Path, n: usize) -> io::Result<()> { + fs::write(layer_dir.join(Self::FILENAME), format!("{{\"n\":{n}}}\n")) + } + + pub fn load(layer_dir: &Path) -> io::Result { + let s = fs::read_to_string(layer_dir.join(Self::FILENAME))?; + Self::parse(&s) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "bad layer_meta.json")) + } + + fn parse(s: &str) -> Option { + let key = "\"n\":"; + let pos = s.find(key)? + key.len(); + let rest = s[pos..].trim_start(); + let end = rest.find(|c: char| !c.is_ascii_digit()).unwrap_or(rest.len()); + let n = rest[..end].parse().ok()?; + Some(Self { n }) + } +} diff --git a/src/obicompactvec/src/lib.rs b/src/obicompactvec/src/lib.rs index 270956f..8a1e5bb 100644 --- a/src/obicompactvec/src/lib.rs +++ b/src/obicompactvec/src/lib.rs @@ -3,14 +3,16 @@ mod bitmatrix; mod builder; mod format; mod intmatrix; +mod layer_meta; mod meta; mod reader; pub mod traits; pub use bitvec::{BitIter, PersistentBitVec, PersistentBitVecBuilder}; -pub use bitmatrix::{PersistentBitMatrix, PersistentBitMatrixBuilder}; +pub use bitmatrix::{PersistentBitMatrix, PersistentBitMatrixBuilder, pack_bit_matrix}; pub use builder::PersistentCompactIntVecBuilder; -pub use intmatrix::{PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder}; +pub use intmatrix::{PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, pack_compact_int_matrix}; +pub use layer_meta::LayerMeta; pub use reader::PersistentCompactIntVec; pub use traits::{BitPartials, ColumnWeights, CountPartials}; diff --git a/src/obicompactvec/src/tests/bitmatrix.rs b/src/obicompactvec/src/tests/bitmatrix.rs index f5845d6..d825d85 100644 --- a/src/obicompactvec/src/tests/bitmatrix.rs +++ b/src/obicompactvec/src/tests/bitmatrix.rs @@ -5,7 +5,8 @@ use crate::{PersistentBitMatrix, PersistentBitMatrixBuilder}; fn make_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) { let n = cols.first().map_or(0, |c| c.len()); let dir = tempdir().unwrap(); - let mut b = PersistentBitMatrixBuilder::new(n, dir.path()).unwrap(); + let presence = dir.path().join("presence"); + let mut b = PersistentBitMatrixBuilder::new(n, &presence).unwrap(); for &col in cols { let mut cb = b.add_col().unwrap(); for (slot, &v) in col.iter().enumerate() { @@ -21,7 +22,8 @@ fn make_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) { #[test] fn single_col_roundtrip() { let dir = tempdir().unwrap(); - let mut b = PersistentBitMatrixBuilder::new(4, dir.path()).unwrap(); + let presence = dir.path().join("presence"); + let mut b = PersistentBitMatrixBuilder::new(4, &presence).unwrap(); let mut col = b.add_col().unwrap(); col.set(0, true); col.set(1, false); @@ -42,7 +44,8 @@ fn single_col_roundtrip() { #[test] fn two_cols_roundtrip() { let dir = tempdir().unwrap(); - let mut b = PersistentBitMatrixBuilder::new(3, dir.path()).unwrap(); + let presence = dir.path().join("presence"); + let mut b = PersistentBitMatrixBuilder::new(3, &presence).unwrap(); let mut col0 = b.add_col().unwrap(); col0.set(0, true); col0.set(1, false); col0.set(2, true); col0.close().unwrap(); @@ -61,7 +64,8 @@ fn two_cols_roundtrip() { #[test] fn col_accessor() { let dir = tempdir().unwrap(); - let mut b = PersistentBitMatrixBuilder::new(3, dir.path()).unwrap(); + let presence = dir.path().join("presence"); + let mut b = PersistentBitMatrixBuilder::new(3, &presence).unwrap(); let mut col = b.add_col().unwrap(); col.set(0, true); col.set(1, false); col.set(2, true); col.close().unwrap(); @@ -76,7 +80,8 @@ fn col_accessor() { #[test] fn zero_cols_roundtrip() { let dir = tempdir().unwrap(); - let b = PersistentBitMatrixBuilder::new(8, dir.path()).unwrap(); + let presence = dir.path().join("presence"); + let b = PersistentBitMatrixBuilder::new(8, &presence).unwrap(); b.close().unwrap(); let m = PersistentBitMatrix::open(dir.path()).unwrap(); @@ -89,9 +94,9 @@ fn zero_cols_roundtrip() { #[test] fn count_ones_per_column() { let (_d, m) = make_matrix(&[ - &[true, false, true, true], // 3 ones - &[false, false, false, false], // 0 ones - &[true, true, true, false], // 3 ones + &[true, false, true, true], + &[false, false, false, false], + &[true, true, true, false], ]); let c = m.count_ones(); assert_eq!(c[0], 3); diff --git a/src/obicompactvec/src/tests/intmatrix.rs b/src/obicompactvec/src/tests/intmatrix.rs index 2b9d45e..b9e1841 100644 --- a/src/obicompactvec/src/tests/intmatrix.rs +++ b/src/obicompactvec/src/tests/intmatrix.rs @@ -5,7 +5,7 @@ use crate::{PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder}; fn make_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) { let n = cols.first().map_or(0, |c| c.len()); let dir = tempdir().unwrap(); - let mut b = PersistentCompactIntMatrixBuilder::new(n, dir.path()).unwrap(); + let mut b = PersistentCompactIntMatrixBuilder::new(n, &dir.path().join("counts")).unwrap(); for &col in cols { let mut cb = b.add_col().unwrap(); for (slot, &v) in col.iter().enumerate() { @@ -21,7 +21,7 @@ fn make_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatri #[test] fn single_col_roundtrip() { let dir = tempdir().unwrap(); - let mut b = PersistentCompactIntMatrixBuilder::new(4, dir.path()).unwrap(); + let mut b = PersistentCompactIntMatrixBuilder::new(4, &dir.path().join("counts")).unwrap(); let mut col = b.add_col().unwrap(); col.set(0, 10); col.set(1, 200); @@ -42,7 +42,7 @@ fn single_col_roundtrip() { #[test] fn two_cols_roundtrip() { let dir = tempdir().unwrap(); - let mut b = PersistentCompactIntMatrixBuilder::new(3, dir.path()).unwrap(); + let mut b = PersistentCompactIntMatrixBuilder::new(3, &dir.path().join("counts")).unwrap(); let mut col0 = b.add_col().unwrap(); col0.set(0, 1); col0.set(1, 2); col0.set(2, 3); col0.close().unwrap(); @@ -61,7 +61,7 @@ fn two_cols_roundtrip() { #[test] fn col_accessor() { let dir = tempdir().unwrap(); - let mut b = PersistentCompactIntMatrixBuilder::new(2, dir.path()).unwrap(); + let mut b = PersistentCompactIntMatrixBuilder::new(2, &dir.path().join("counts")).unwrap(); let mut col0 = b.add_col().unwrap(); col0.set(0, 5); col0.set(1, 7); col0.close().unwrap(); @@ -75,7 +75,7 @@ fn col_accessor() { #[test] fn zero_cols_roundtrip() { let dir = tempdir().unwrap(); - let b = PersistentCompactIntMatrixBuilder::new(10, dir.path()).unwrap(); + let b = PersistentCompactIntMatrixBuilder::new(10, &dir.path().join("counts")).unwrap(); b.close().unwrap(); let m = PersistentCompactIntMatrix::open(dir.path()).unwrap(); diff --git a/src/obikindex/src/index.rs b/src/obikindex/src/index.rs index 8e207a8..6583968 100644 --- a/src/obikindex/src/index.rs +++ b/src/obikindex/src/index.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex}; use indicatif::{ProgressBar, ProgressStyle}; use obikpartitionner::{KmerPartition, KmerSpectrum}; +use obilayeredmap; use obisys::{Reporter, Stage}; use rayon::prelude::*; use tracing::info; @@ -199,6 +200,87 @@ impl KmerIndex { .join(format!("layer_{layer}")) .join("unitigs.bin") } + + /// Pack all partition matrices into single-file format (presence → .pbmx, counts → .pcmx). + /// + /// Reduces per-query file-open overhead from O(n_genomes) to O(1) per partition. + /// Column files are kept in place; packed files take priority when opening. + pub fn pack_matrices(&self) -> OKIResult<()> { + use obicompactvec::{pack_bit_matrix, pack_compact_int_matrix}; + use obilayeredmap::meta::PartitionMeta; + + let n = self.n_partitions(); + let errors: Vec<_> = (0..n) + .into_par_iter() + .filter_map(|i| { + let index_dir = self.partition.part_dir(i).join("index"); + if !index_dir.exists() { return None; } + let meta = match PartitionMeta::load(&index_dir) { + Ok(m) => m, + Err(e) => return Some(OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))), + }; + for l in 0..meta.n_layers { + let layer_dir = index_dir.join(format!("layer_{l}")); + let presence_dir = layer_dir.join("presence"); + let counts_dir = layer_dir.join("counts"); + if presence_dir.exists() { + if let Err(e) = pack_bit_matrix(&presence_dir) { + return Some(OKIError::Io(e)); + } + } + if counts_dir.exists() { + if let Err(e) = pack_compact_int_matrix(&counts_dir) { + return Some(OKIError::Io(e)); + } + } + } + None + }) + .collect(); + + if let Some(e) = errors.into_iter().next() { return Err(e); } + Ok(()) + } + + /// Write a `layer_meta.json` in any layer directory that is missing one. + /// + /// Old indexes were built before this file was required. The number of + /// kmers is recovered from `unitigs.bin`, which is always present. + pub fn upgrade_layer_meta(&self) -> OKIResult<()> { + use obicompactvec::LayerMeta; + use obiskio::UnitigFileReader; + use obilayeredmap::meta::PartitionMeta; + + let n = self.n_partitions(); + let errors: Vec<_> = (0..n) + .into_par_iter() + .filter_map(|i| { + let index_dir = self.partition.part_dir(i).join("index"); + if !index_dir.exists() { return None; } + let meta = match PartitionMeta::load(&index_dir) { + Ok(m) => m, + Err(e) => return Some(OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))), + }; + for l in 0..meta.n_layers { + let layer_dir = index_dir.join(format!("layer_{l}")); + let meta_path = layer_dir.join(LayerMeta::FILENAME); + if meta_path.exists() { continue; } + let unitigs_path = layer_dir.join("unitigs.bin"); + let n_kmers = match UnitigFileReader::open_sequential(&unitigs_path) { + Ok(r) => r.n_kmers(), + Err(e) => return Some(OKIError::Partition(e)), + }; + if let Err(e) = LayerMeta::save(&layer_dir, n_kmers) { + return Some(OKIError::Io(e)); + } + } + None + }) + .collect(); + + if let Some(e) = errors.into_iter().next() { return Err(e); } + Ok(()) + } } fn label_from_path(path: &Path) -> String { diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index 39b4395..dc129d8 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -196,6 +196,16 @@ impl KmerIndex { rep.push(t.stop()); } + // ── Pack matrices after merge ───────────────────────────────────────── + { + let t = Stage::start("pack"); + let pb = spinner("pack — consolidating column files …"); + let dst2 = KmerIndex::open(output)?; + dst2.pack_matrices()?; + pb.finish_and_clear(); + rep.push(t.stop()); + } + // Re-open to get the updated state. KmerIndex::open(output) } diff --git a/src/obikmer/src/cmd/mod.rs b/src/obikmer/src/cmd/mod.rs index 2fc940a..569b543 100644 --- a/src/obikmer/src/cmd/mod.rs +++ b/src/obikmer/src/cmd/mod.rs @@ -1,4 +1,5 @@ pub mod annotate; +pub mod pack; pub mod utils; pub mod distance; pub mod dump; diff --git a/src/obikmer/src/cmd/pack.rs b/src/obikmer/src/cmd/pack.rs new file mode 100644 index 0000000..f3c0d3d --- /dev/null +++ b/src/obikmer/src/cmd/pack.rs @@ -0,0 +1,36 @@ +use std::path::PathBuf; + +use clap::Args; +use obikindex::KmerIndex; +use obisys::{Reporter, Stage}; +use tracing::info; + +#[derive(Args)] +pub struct PackArgs { + /// Index directory to pack + pub index: PathBuf, +} + +pub fn run(args: PackArgs) { + let idx = KmerIndex::open(&args.index).unwrap_or_else(|e| { + eprintln!("error opening index: {e}"); + std::process::exit(1); + }); + + info!( + "pack: {} partition(s), {} genome(s)", + idx.n_partitions(), + idx.meta().genomes.len(), + ); + + let mut rep = Reporter::new(); + let t = Stage::start("pack"); + + idx.pack_matrices().unwrap_or_else(|e| { + eprintln!("pack error: {e}"); + std::process::exit(1); + }); + + rep.push(t.stop()); + rep.print(); +} diff --git a/src/obikmer/src/cmd/utils.rs b/src/obikmer/src/cmd/utils.rs index 0e7ec20..3bbaf4f 100644 --- a/src/obikmer/src/cmd/utils.rs +++ b/src/obikmer/src/cmd/utils.rs @@ -2,6 +2,7 @@ use std::path::PathBuf; use clap::Args; use obikindex::{validate_label, KmerIndex}; +use obikseq::set_k; use tracing::info; #[derive(Args)] @@ -12,6 +13,10 @@ pub struct UtilsArgs { /// Set a new genome label: NEW_LABEL=OLD_LABEL #[arg(long, value_name = "NEW=OLD")] pub new_label: Option, + + /// Add missing layer_meta.json files to each layer (required after upgrading from old indexes) + #[arg(long)] + pub upgrade_index: bool, } pub fn run(args: UtilsArgs) { @@ -22,12 +27,30 @@ pub fn run(args: UtilsArgs) { run_rename(&args.index, spec); } + if args.upgrade_index { + any = true; + run_upgrade_index(&args.index); + } + if !any { - eprintln!("utils: no operation specified. Available options: --new-label NEW=OLD"); + eprintln!("utils: no operation specified. Available options: --new-label NEW=OLD, --upgrade-index"); std::process::exit(1); } } +fn run_upgrade_index(index_path: &PathBuf) { + let idx = KmerIndex::open(index_path).unwrap_or_else(|e| { + eprintln!("error opening index: {e}"); + std::process::exit(1); + }); + set_k(idx.kmer_size()); + idx.upgrade_layer_meta().unwrap_or_else(|e| { + eprintln!("upgrade error: {e}"); + std::process::exit(1); + }); + info!("upgrade-index: layer_meta.json written to all layers that were missing it"); +} + fn run_rename(index_path: &PathBuf, spec: &str) { let (old_label, new_label) = parse_rename_spec(spec); diff --git a/src/obikmer/src/main.rs b/src/obikmer/src/main.rs index a872e46..47ff4c7 100644 --- a/src/obikmer/src/main.rs +++ b/src/obikmer/src/main.rs @@ -38,6 +38,8 @@ enum Commands { Reindex(cmd::reindex::ReindexArgs), /// Miscellaneous index utilities (--rename, …) Utils(cmd::utils::UtilsArgs), + /// Pack matrix column files into single-file format to reduce query I/O + Pack(cmd::pack::PackArgs), } fn main() { @@ -71,6 +73,7 @@ fn main() { Commands::Estimate(args) => cmd::estimate::run(args), Commands::Reindex(args) => cmd::reindex::run(args), Commands::Utils(args) => cmd::utils::run(args), + Commands::Pack(args) => cmd::pack::run(args), } #[cfg(feature = "profiling")] diff --git a/src/obikpartitionner/src/distance.rs b/src/obikpartitionner/src/distance.rs index b5e6914..f25a45f 100644 --- a/src/obikpartitionner/src/distance.rs +++ b/src/obikpartitionner/src/distance.rs @@ -22,8 +22,9 @@ impl KmerPartition { } let matrices = (0..probe_n_layers(&index_dir)) .filter_map(|l| { - let dir = index_dir.join(format!("layer_{l}")).join("counts"); - dir.exists().then(|| PersistentCompactIntMatrix::open(&dir).map_err(SKError::Io)) + let layer_dir = index_dir.join(format!("layer_{l}")); + layer_dir.join("counts").exists() + .then(|| PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)) }) .collect::>>()?; Ok(LayeredStore::new(matrices)) @@ -38,8 +39,9 @@ impl KmerPartition { } let matrices = (0..probe_n_layers(&index_dir)) .filter_map(|l| { - let dir = index_dir.join(format!("layer_{l}")).join("presence"); - dir.exists().then(|| PersistentBitMatrix::open(&dir).map_err(SKError::Io)) + let layer_dir = index_dir.join(format!("layer_{l}")); + layer_dir.join("presence").exists() + .then(|| PersistentBitMatrix::open(&layer_dir).map_err(SKError::Io)) }) .collect::>>()?; Ok(LayeredStore::new(matrices)) diff --git a/src/obikpartitionner/src/dump_layer.rs b/src/obikpartitionner/src/dump_layer.rs index 4f060e9..77c7cd3 100644 --- a/src/obikpartitionner/src/dump_layer.rs +++ b/src/obikpartitionner/src/dump_layer.rs @@ -51,14 +51,14 @@ impl KmerPartition { let presence_dir = layer_dir.join("presence"); if use_counts && counts_dir.exists() { - let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; + let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if let Some(slot) = mphf.find(kmer) { cb(kmer, mat.row(slot)); } } } else if !use_counts && presence_dir.exists() { - let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; + let mat = PersistentBitMatrix::open(&layer_dir).map_err(SKError::Io)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if let Some(slot) = mphf.find(kmer) { let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); @@ -108,14 +108,14 @@ impl KmerPartition { let presence_dir = layer_dir.join("presence"); if use_counts && counts_dir.exists() { - let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; + let mat = PersistentCompactIntMatrix::open(&layer_dir).map_err(SKError::Io)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if let Some(slot) = mphf.find(kmer) { cb(part, layer, kmer, mat.row(slot)); } } } else if !use_counts && presence_dir.exists() { - let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; + let mat = PersistentBitMatrix::open(&layer_dir).map_err(SKError::Io)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { if let Some(slot) = mphf.find(kmer) { let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index b8673a2..3af515b 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -45,37 +45,35 @@ impl ColBuilder { // ── SrcLayerData — opened source matrix for pass-2 lookup ───────────────────── pub(crate) enum SrcLayerData { - /// Pure set-membership layer (no data matrix): every kmer is present in all genomes. - SetMembership, Presence(MphfOnly, PersistentBitMatrix), Count(MphfOnly, PersistentCompactIntMatrix), } impl SrcLayerData { pub(crate) fn open(layer_dir: &Path, merge_mode: MergeMode) -> SKResult { - let presence_dir = layer_dir.join("presence"); - let counts_dir = layer_dir.join("counts"); + let counts_dir = layer_dir.join("counts"); match merge_mode { MergeMode::Presence => { - if presence_dir.exists() { + if counts_dir.exists() && !layer_dir.join("presence").exists() { let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; - let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; - Ok(SrcLayerData::Presence(mphf, mat)) - } else if counts_dir.exists() { - let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; - let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; + let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Count(mphf, mat)) } else { - Ok(SrcLayerData::SetMembership) + // presence dir exists, or neither exists → Implicit handled by open() + let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; + let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; + Ok(SrcLayerData::Presence(mphf, mat)) } } MergeMode::Count => { + let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; if counts_dir.exists() { - let mphf = MphfOnly::open(layer_dir).map_err(olm_to_sk)?; - let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; + let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(SrcLayerData::Count(mphf, mat)) } else { - Ok(SrcLayerData::SetMembership) + // No counts → treat as implicit presence (all 1s) + let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; + Ok(SrcLayerData::Presence(mphf, mat)) } } } @@ -85,15 +83,12 @@ impl SrcLayerData { /// The caller guarantees `kmer` is in the source MPHF domain. #[inline] pub(crate) fn lookup(&self, kmer: CanonicalKmer, n_genomes: usize) -> Vec { + let mut buf = vec![0u32; n_genomes]; match self { - SrcLayerData::SetMembership => vec![1u32; n_genomes], - SrcLayerData::Presence(mphf, mat) => { - mat.row(mphf.index(kmer)).iter().map(|&b| b as u32).collect() - } - SrcLayerData::Count(mphf, mat) => { - mat.row(mphf.index(kmer)).iter().copied().collect() - } + SrcLayerData::Presence(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf), + SrcLayerData::Count(mphf, mat) => mat.fill_row(mphf.index(kmer), &mut buf), } + buf } } diff --git a/src/obikpartitionner/src/query_layer.rs b/src/obikpartitionner/src/query_layer.rs index 97e09e3..e918ec6 100644 --- a/src/obikpartitionner/src/query_layer.rs +++ b/src/obikpartitionner/src/query_layer.rs @@ -20,48 +20,33 @@ fn olm_to_sk(e: OLMError) -> SKError { // ── per-layer query handle ──────────────────────────────────────────────────── enum QueryLayer { - /// Layer<()> — MPHF-only, no data matrix; all indexed kmers map to 1 per genome. - SetOnly(MphfLayer), Presence(MphfLayer, PersistentBitMatrix), Count(MphfLayer, PersistentCompactIntMatrix), } impl QueryLayer { fn open(layer_dir: &Path, with_counts: bool, mode: &IndexMode) -> SKResult { + let mphf = MphfLayer::open(layer_dir, mode).map_err(olm_to_sk)?; + let counts_dir = layer_dir.join("counts"); let presence_dir = layer_dir.join("presence"); - let counts_dir = layer_dir.join("counts"); if with_counts && counts_dir.exists() { - let mphf = MphfLayer::open(layer_dir, mode).map_err(olm_to_sk)?; - let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; + let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(QueryLayer::Count(mphf, mat)) - } else if presence_dir.exists() { - let mphf = MphfLayer::open(layer_dir, mode).map_err(olm_to_sk)?; - let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; + } else if presence_dir.exists() || !counts_dir.exists() { + // presence mode, or no matrix at all → Implicit handled inside open() + let mat = PersistentBitMatrix::open(layer_dir).map_err(SKError::Io)?; Ok(QueryLayer::Presence(mphf, mat)) - } else if counts_dir.exists() { - // presence query on a count index — return counts as-is - let mphf = MphfLayer::open(layer_dir, mode).map_err(olm_to_sk)?; - let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; - Ok(QueryLayer::Count(mphf, mat)) } else { - let mphf = MphfLayer::open(layer_dir, mode).map_err(olm_to_sk)?; - Ok(QueryLayer::SetOnly(mphf)) + // counts exist but not presence — count layer, no presence requested + let mat = PersistentCompactIntMatrix::open(layer_dir).map_err(SKError::Io)?; + Ok(QueryLayer::Count(mphf, mat)) } } - /// Write per-genome values into `buf` if `kmer` is indexed in this layer. - /// Returns `true` on hit; `buf` is untouched on miss. + /// Write per-genome values into `buf` if `kmer` is indexed; returns true on hit. fn find_into(&self, kmer: CanonicalKmer, n_genomes: usize, buf: &mut [u32]) -> bool { match self { - QueryLayer::SetOnly(mphf) => { - if mphf.find(kmer).is_some() { - buf[..n_genomes].fill(1); - true - } else { - false - } - } QueryLayer::Presence(mphf, mat) => { if let Some(slot) = mphf.find(kmer) { mat.fill_row(slot, &mut buf[..n_genomes]); @@ -87,14 +72,11 @@ impl QueryLayer { impl KmerPartition { /// Query a single partition, calling `on_hit(sk_idx, kmer_idx, row)` for /// every found k-mer without allocating intermediate result vectors. - /// - /// `row` is a shared scratch buffer valid only for the duration of the call; - /// the callback must copy what it needs before returning. pub fn query_partition_with( &self, part_idx: usize, superkmers: &[&RoutableSuperKmer], - k: usize, + _k: usize, n_genomes: usize, with_counts: bool, mut on_hit: F, @@ -133,13 +115,13 @@ impl KmerPartition { Ok(()) } - /// Query a single partition for a slice of (already-routed) super-kmers. + /// Query a single partition for a slice of super-kmers, returning per-kmer rows. /// Prefer [`query_partition_with`] to avoid per-kmer heap allocations. pub fn query_partition( &self, part_idx: usize, superkmers: &[&RoutableSuperKmer], - k: usize, + _k: usize, n_genomes: usize, with_counts: bool, ) -> SKResult>>>> { @@ -152,7 +134,7 @@ impl KmerPartition { if !index_dir.exists() { return Ok(superkmers .iter() - .map(|rsk| vec![None; rsk.seql() - k + 1]) + .map(|rsk| vec![None; rsk.seql()]) .collect()); } diff --git a/src/obilayeredmap/src/layer.rs b/src/obilayeredmap/src/layer.rs index 63b99d2..72b38ea 100644 --- a/src/obilayeredmap/src/layer.rs +++ b/src/obilayeredmap/src/layer.rs @@ -34,7 +34,7 @@ impl LayerData for () { impl LayerData for PersistentCompactIntMatrix { type Item = Box<[u32]>; fn open(layer_dir: &Path) -> OLMResult { - PersistentCompactIntMatrix::open(&layer_dir.join(COUNTS_DIR)).map_err(OLMError::Io) + PersistentCompactIntMatrix::open(layer_dir).map_err(OLMError::Io) } fn read(&self, slot: usize) -> Box<[u32]> { self.row(slot) } } @@ -42,7 +42,7 @@ impl LayerData for PersistentCompactIntMatrix { impl LayerData for PersistentBitMatrix { type Item = Box<[bool]>; fn open(layer_dir: &Path) -> OLMResult { - PersistentBitMatrix::open(&layer_dir.join(PRESENCE_DIR)).map_err(OLMError::Io) + PersistentBitMatrix::open(layer_dir).map_err(OLMError::Io) } fn read(&self, slot: usize) -> Box<[bool]> { self.row(slot) } } diff --git a/src/obilayeredmap/src/layered_store.rs b/src/obilayeredmap/src/layered_store.rs index f25855d..433183e 100644 --- a/src/obilayeredmap/src/layered_store.rs +++ b/src/obilayeredmap/src/layered_store.rs @@ -107,7 +107,7 @@ mod tests { fn make_int_matrix(cols: &[&[u32]]) -> (tempfile::TempDir, PersistentCompactIntMatrix) { let n = cols.first().map_or(0, |c| c.len()); let dir = tempdir().unwrap(); - let mut b = PersistentCompactIntMatrixBuilder::new(n, dir.path()).unwrap(); + let mut b = PersistentCompactIntMatrixBuilder::new(n, &dir.path().join("counts")).unwrap(); for &col in cols { let mut cb = b.add_col().unwrap(); for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); } @@ -121,7 +121,7 @@ mod tests { fn make_bit_matrix(cols: &[&[bool]]) -> (tempfile::TempDir, PersistentBitMatrix) { let n = cols.first().map_or(0, |c| c.len()); let dir = tempdir().unwrap(); - let mut b = PersistentBitMatrixBuilder::new(n, dir.path()).unwrap(); + let mut b = PersistentBitMatrixBuilder::new(n, &dir.path().join("presence")).unwrap(); for &col in cols { let mut cb = b.add_col().unwrap(); for (slot, &v) in col.iter().enumerate() { cb.set(slot, v); } diff --git a/src/obilayeredmap/src/mphf_layer.rs b/src/obilayeredmap/src/mphf_layer.rs index a5e30b7..2661123 100644 --- a/src/obilayeredmap/src/mphf_layer.rs +++ b/src/obilayeredmap/src/mphf_layer.rs @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf}; use cacheline_ef::{CachelineEf, CachelineEfVec}; use epserde::prelude::*; +use obicompactvec::LayerMeta; use obikseq::CanonicalKmer; use obiskio::{CanonicalKmerIter, UnitigFileReader, UnitigFileWriter, build_unitig_idx}; use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64}; @@ -341,6 +342,7 @@ impl MphfLayer { } } + LayerMeta::save(dir, n)?; Ok(n) } } From bfe0cb4b828ae57700766b37350078547fe9576d Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 3 Jun 2026 11:55:11 +0200 Subject: [PATCH 5/6] feat: integrate obikseq to configure global k-mer and minimizer sizes This change adds the `obikseq` crate as a local dependency and inserts `set_k` and `set_m` calls across index creation and command modules. By synchronizing the runtime's global k-mer and minimizer dimensions with the loaded index parameters, downstream sequence processing and partitioning operations now consistently use the correct structural constraints. --- src/Cargo.lock | 1 + src/obikindex/Cargo.toml | 1 + src/obikindex/src/index.rs | 6 ++++++ src/obikmer/src/cmd/distance.rs | 3 --- src/obikmer/src/cmd/dump.rs | 2 -- src/obikmer/src/cmd/index.rs | 3 --- src/obikmer/src/cmd/merge.rs | 3 --- src/obikmer/src/cmd/query.rs | 5 +---- src/obikmer/src/cmd/rebuild.rs | 3 --- src/obikmer/src/cmd/utils.rs | 2 -- 10 files changed, 9 insertions(+), 20 deletions(-) diff --git a/src/Cargo.lock b/src/Cargo.lock index 11f4615..2a23bcf 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1507,6 +1507,7 @@ dependencies = [ "ndarray", "obicompactvec", "obikpartitionner", + "obikseq", "obilayeredmap", "obiskio", "obisys", diff --git a/src/obikindex/Cargo.toml b/src/obikindex/Cargo.toml index 538cffb..d670480 100644 --- a/src/obikindex/Cargo.toml +++ b/src/obikindex/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] +obikseq = { path = "../obikseq" } obikpartitionner = { path = "../obikpartitionner" } obiskio = { path = "../obiskio" } obisys = { path = "../obisys" } diff --git a/src/obikindex/src/index.rs b/src/obikindex/src/index.rs index 6583968..b785c1a 100644 --- a/src/obikindex/src/index.rs +++ b/src/obikindex/src/index.rs @@ -11,6 +11,8 @@ use obisys::{Reporter, Stage}; use rayon::prelude::*; use tracing::info; +use obikseq::{set_k, set_m}; + use crate::error::{OKIError, OKIResult}; use crate::meta::{GenomeInfo, IndexConfig, IndexMeta}; use crate::state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED}; @@ -40,6 +42,8 @@ impl KmerIndex { config.minimizer_size, force, )?; + set_k(config.kmer_size); + set_m(config.minimizer_size); let mut meta = IndexMeta::new(config); if let Some(info) = genome_info { meta.genomes.push(info); @@ -51,6 +55,8 @@ impl KmerIndex { pub fn open>(path: P) -> OKIResult { let root_path = path.as_ref().to_owned(); let meta = IndexMeta::read(&root_path).map_err(OKIError::Io)?; + set_k(meta.config.kmer_size); + set_m(meta.config.minimizer_size); let partition = KmerPartition::open_with_config( &root_path, meta.config.kmer_size, diff --git a/src/obikmer/src/cmd/distance.rs b/src/obikmer/src/cmd/distance.rs index a1813f8..d6d4599 100644 --- a/src/obikmer/src/cmd/distance.rs +++ b/src/obikmer/src/cmd/distance.rs @@ -4,7 +4,6 @@ use std::path::PathBuf; use clap::Args; use kodama::{Method, linkage}; use obikindex::{DistanceMetric, KmerIndex}; -use obikseq::{set_k, set_m}; use speedytree::{DistanceMatrix, Hybrid, NeighborJoiningSolver, to_newick}; use tracing::info; @@ -72,8 +71,6 @@ pub fn run(args: DistanceArgs) { std::process::exit(1); }); - set_k(idx.kmer_size()); - set_m(idx.minimizer_size()); let labels: Vec = idx.meta().genomes.iter().map(|g| g.label.clone()).collect(); let n = labels.len(); diff --git a/src/obikmer/src/cmd/dump.rs b/src/obikmer/src/cmd/dump.rs index f7c324c..c1d56ed 100644 --- a/src/obikmer/src/cmd/dump.rs +++ b/src/obikmer/src/cmd/dump.rs @@ -3,7 +3,6 @@ use std::path::PathBuf; use clap::Args; use obikindex::KmerIndex; -use obikseq::set_k; use tracing::info; #[derive(Args)] @@ -26,7 +25,6 @@ pub fn run(args: DumpArgs) { std::process::exit(1); }); - set_k(idx.kmer_size()); info!( "dumping {} partitions, {} genome(s)", idx.n_partitions(), diff --git a/src/obikmer/src/cmd/index.rs b/src/obikmer/src/cmd/index.rs index 0f5f770..31102fb 100644 --- a/src/obikmer/src/cmd/index.rs +++ b/src/obikmer/src/cmd/index.rs @@ -8,7 +8,6 @@ fn parse_key_value(s: &str) -> Result<(String, String), String> { let pos = s.find('=').ok_or_else(|| format!("invalid key=value: no '=' in '{s}'"))?; Ok((s[..pos].to_string(), s[pos + 1..].to_string())) } -use obikseq::{set_k, set_m}; use obisys::Reporter; use tracing::info; @@ -222,8 +221,6 @@ pub fn run(args: IndexArgs) { }) }; - set_k(idx.kmer_size()); - set_m(idx.minimizer_size()); // ── Stage 1: scatter ───────────────────────────────────────────────────── if idx.state() < IndexState::Scattered { diff --git a/src/obikmer/src/cmd/merge.rs b/src/obikmer/src/cmd/merge.rs index 3442273..9fe0ee4 100644 --- a/src/obikmer/src/cmd/merge.rs +++ b/src/obikmer/src/cmd/merge.rs @@ -2,7 +2,6 @@ use std::path::PathBuf; use clap::Args; use obikindex::{KmerIndex, MergeMode}; -use obikseq::{set_k, set_m}; use obisys::Reporter; use tracing::info; @@ -53,8 +52,6 @@ pub fn run(args: MergeArgs) { let source_refs: Vec<&KmerIndex> = sources.iter().collect(); - set_k(sources[0].kmer_size()); - set_m(sources[0].minimizer_size()); let n_genomes: usize = sources.iter().map(|s| s.meta().genomes.len()).sum(); info!( diff --git a/src/obikmer/src/cmd/query.rs b/src/obikmer/src/cmd/query.rs index 6a06156..10662da 100644 --- a/src/obikmer/src/cmd/query.rs +++ b/src/obikmer/src/cmd/query.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use clap::Args; use obikindex::KmerIndex; use obikrope::Rope; -use obikseq::{RoutableSuperKmer, set_k, set_m}; +use obikseq::RoutableSuperKmer; use obilayeredmap::IndexMode; use obiread::chunk::read_sequence_chunks_sized; use obiread::record::{SeqRecord, parse_chunk}; @@ -427,9 +427,6 @@ pub fn run(args: QueryArgs) { std::process::exit(1); })); - set_k(idx.kmer_size()); - set_m(idx.minimizer_size()); - let k = idx.kmer_size(); let n_genomes = idx.meta().genomes.len(); let n_partitions = idx.n_partitions(); diff --git a/src/obikmer/src/cmd/rebuild.rs b/src/obikmer/src/cmd/rebuild.rs index f172811..8d24d93 100644 --- a/src/obikmer/src/cmd/rebuild.rs +++ b/src/obikmer/src/cmd/rebuild.rs @@ -7,7 +7,6 @@ use obikpartitionner::filter::{ MinGenomeCount, MinGenomeFraction, MinTotalCount, }; use obisys::Reporter; -use obikseq::{set_k, set_m}; use tracing::info; #[derive(Args)] @@ -62,8 +61,6 @@ pub fn run(args: RebuildArgs) { std::process::exit(1); }); - set_k(src.kmer_size()); - set_m(src.minimizer_size()); let n_genomes = src.meta().genomes.len(); let mode = if args.presence || !src.meta().config.with_counts { diff --git a/src/obikmer/src/cmd/utils.rs b/src/obikmer/src/cmd/utils.rs index 3bbaf4f..c5fa3e7 100644 --- a/src/obikmer/src/cmd/utils.rs +++ b/src/obikmer/src/cmd/utils.rs @@ -2,7 +2,6 @@ use std::path::PathBuf; use clap::Args; use obikindex::{validate_label, KmerIndex}; -use obikseq::set_k; use tracing::info; #[derive(Args)] @@ -43,7 +42,6 @@ fn run_upgrade_index(index_path: &PathBuf) { eprintln!("error opening index: {e}"); std::process::exit(1); }); - set_k(idx.kmer_size()); idx.upgrade_layer_meta().unwrap_or_else(|e| { eprintln!("upgrade error: {e}"); std::process::exit(1); From bba5147f0faf159a015e63b464eb77c55cb9939a Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 3 Jun 2026 15:10:15 +0200 Subject: [PATCH 6/6] fix: account for k-mer overlap in total_bases calculation Introduces a `kmer_overlap` variable (`k-1`) and modifies the `total_bases` accumulation to subtract this overlap from each sequence's length. This ensures the base count accurately reflects only valid k-mer starting positions rather than raw sequence length. --- src/obikmer/src/steps/scatter.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/obikmer/src/steps/scatter.rs b/src/obikmer/src/steps/scatter.rs index 80d789e..cb5a14b 100644 --- a/src/obikmer/src/steps/scatter.rs +++ b/src/obikmer/src/steps/scatter.rs @@ -95,10 +95,13 @@ pub fn scatter( let mut ema_rate: f64 = 0.0; let mut last_t = Instant::now(); let mut last_bases: u64 = 0; + let kmer_overlap = (k - 1) as u64; const ALPHA: f64 = 0.15; for batch in pipe.apply(throttled, n_workers, 1) { - total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::(); + total_bases += batch.iter() + .map(|sk| (sk.seql() as u64).saturating_sub(kmer_overlap)) + .sum::(); let now = Instant::now(); let dt = now.duration_since(last_t).as_secs_f64(); if dt > 0.1 {