From e1d59fde54b735eb799d4cb141a2f489482d99a9 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Thu, 21 May 2026 05:53:55 +0200 Subject: [PATCH] feat: add merge command to consolidate k-mer indexes Introduces a new `merge` CLI subcommand and underlying implementation to consolidate multiple pre-indexed k-mer indexes into a single output. Adds `append_column` methods to persistent bit and int matrices to enable incremental genome column expansion without rebuilding the MPHF. Includes new error variants for index readiness and configuration mismatches, adds a `--force` flag to the index command, and updates documentation and navigation structure accordingly. --- docmd/architecture/index_architecture.md | 27 +++ docmd/implementation/merge.md | 133 ++++++++++ docmd/implementation/obilayeredmap.md | 54 +++++ mkdocs.yml | 1 + src/obicompactvec/src/bitmatrix.rs | 20 ++ src/obicompactvec/src/intmatrix.rs | 20 ++ src/obikindex/src/error.rs | 16 +- src/obikindex/src/index.rs | 6 +- src/obikindex/src/lib.rs | 2 + src/obikindex/src/merge.rs | 131 ++++++++++ src/obikmer/src/cmd/index.rs | 11 +- src/obikmer/src/cmd/merge.rs | 47 ++++ src/obikmer/src/cmd/mod.rs | 1 + src/obikmer/src/main.rs | 3 + src/obikpartitionner/src/lib.rs | 2 + src/obikpartitionner/src/merge_layer.rs | 294 +++++++++++++++++++++++ src/obilayeredmap/src/layer.rs | 39 +++ 17 files changed, 799 insertions(+), 8 deletions(-) create mode 100644 docmd/implementation/merge.md create mode 100644 src/obikindex/src/merge.rs create mode 100644 src/obikmer/src/cmd/merge.rs create mode 100644 src/obikpartitionner/src/merge_layer.rs diff --git a/docmd/architecture/index_architecture.md b/docmd/architecture/index_architecture.md index cbc1083..0915a9c 100644 --- a/docmd/architecture/index_architecture.md +++ b/docmd/architecture/index_architecture.md @@ -327,3 +327,30 @@ Other derivations: threshold a count matrix → binary presence matrix; union tw 2. Replace `LayerData` trait with the `DataStore` / `ColumnWeights` / `CountPartials` / `BitPartials` system. 3. Rewire `LayeredMap` to hold `LayeredStore` (or bit variant) alongside the MPHF layers. 4. Implement `PartitionedIndex` using `LayeredStore>` for data and parallel dispatch for queries. + +--- + +## Multi-genome column invariant + +### The invariant + +After any merge operation, **every layer in every partition has exactly `n_genomes` columns**, where `n_genomes` is the current total genome count recorded in `index.meta`. This applies to both `PersistentCompactIntMatrix` (Count mode) and `PersistentBitMatrix` (Presence mode). + +### How it is maintained + +The invariant is established and preserved by three coordinated operations: + +**1. Existing layers — column append.** +When merging source genome G into an existing index with `n_existing_genomes` genomes, one column is appended to every existing layer via `append_genome_column`. Slots that contain a kmer present in source G receive its count or `true`; all other slots receive 0 or `false`. After this step, every pre-existing layer has `n_existing_genomes + 1` columns. + +**2. New layers — absent columns prepended.** +If source G introduces kmers not found in any existing layer, a new layer is created for those kmers. Before appending source G's own column, `n_existing_genomes` absent columns (all-zero or all-false) are prepended — one per genome already in the index. This ensures the new layer starts at the same column count as every other layer in the partition immediately after creation. + +**3. First merge, Presence mode — `init_presence_matrix`.** +The initial single-genome index has no `presence/` directory (presence is implicit: every kmer in the index is present in genome 0). On the first merge, before appending any column for source 1, `Layer<()>::init_presence_matrix` creates `presence/col_000000.pbiv` set entirely to `true` for each existing layer. This retroactively materialises genome 0's presence column, bringing the column count from 0 to 1 so that the subsequent append for source 1 raises it to 2. + +### Why the invariant is required + +The `LayeredStore` aggregation traits (`col_weights`, `partial_bray`, `partial_jaccard`, etc.) sum contributions across all `(partition, layer)` pairs without any shape check. If one layer had fewer columns than others, its contribution would silently produce a malformed result — wrong column sums, wrong distance matrices, and incorrect genome-level statistics. + +The invariant is the precondition that makes the progressive aggregation principle correct: each level can blindly sum matrices from the level below because all matrices have the same shape. diff --git a/docmd/implementation/merge.md b/docmd/implementation/merge.md new file mode 100644 index 0000000..8a51609 --- /dev/null +++ b/docmd/implementation/merge.md @@ -0,0 +1,133 @@ +# Merge command + +## Purpose + +`obikmer merge` combines multiple existing kmer indexes into a single index. The result contains all kmers from all sources, with per-genome presence/absence or count data for every genome across every layer. + +--- + +## Modes + +```rust +pub enum MergeMode { Presence, Count } +``` + +Default mode is `Presence`. `Count` mode requires **all** source indexes to have `with_counts=true`; mixing count and non-count sources is rejected at validation. + +| Mode | Column type | Constraint | +|---|---|---| +| `Presence` | `PersistentBitMatrix` (one bit per genome per slot) | none | +| `Count` | `PersistentCompactIntMatrix` (one u32 per genome per slot) | all sources `with_counts=true` | + +--- + +## Input / output constraints + +All source indexes must satisfy: + +- `IndexState::Indexed` (fully built — `index.done` sentinel present) +- Same `kmer_size`, `minimizer_size`, `n_bits` +- If `Count` mode: all sources must have `with_counts=true` + +`--force`: if the output directory already exists, it is deleted before the merge begins. + +--- + +## Algorithm + +### 1. Validation + +Check all sources against the constraints above. Abort on any mismatch. + +### 2. Bootstrap output from first source + +Recursive file copy of `sources[0]` → `output`. This establishes the partition layout, all existing MPHFs, unitigs, and evidence files. The first source's genome is genome 0 in the output. + +### 3. For each subsequent source (parallel across partitions) + +For each partition, process one source at a time sequentially: + +**a. Classify kmers** + +Iterate all kmers in the source partition (via `UnitigFileReader` + canonical kmer iteration). For each kmer, probe the destination `LayeredMap<()>`: + +- **Hit** `(dst_layer, slot)`: record `(dst_layer, slot, value)` where `value` is the source count (Count mode) or `1` (Presence mode). +- **Miss**: add kmer to a `GraphDeBruijn` accumulator; record its value in a `HashMap>`. + +**b. Extend existing layers** + +For each existing layer in the destination partition, call `append_genome_column` once per source genome being merged. Slots that received a hit are populated with their recorded value; all other slots receive 0 (absent). + +If this is the **first merge** and operating in Presence mode, call `Layer<()>::init_presence_matrix` before appending any source column. This creates `presence/` with `col_000000.pbiv` set all-true (genome 0 is present in every slot of this layer). + +**c. Build new layer for new kmers** + +If the `GraphDeBruijn` accumulator is non-empty (misses exist), write compact unitigs from the de Bruijn graph, then call the appropriate `Layer::build` variant. Before appending the source's own column, prepend `n_existing_genomes` absent columns (value 0) — one per genome already in the index — so the column count invariant holds immediately after layer creation. + +**d. Update partition metadata** + +Write updated `meta.json` with the incremented `n_layers` if a new layer was added. + +### 4. Update index metadata + +Append the merged source's genome label(s) to `index.meta.genomes`. Write updated `index.meta`. + +--- + +## Column count invariant + +After any merge, **every layer in every partition has exactly `n_genomes` columns**, where `n_genomes` is the total genome count in the index at that point. + +This is maintained by three mechanisms: + +1. **Existing layers**: one column appended per source genome (`append_genome_column`). +2. **New layers created during merge**: `n_existing_genomes` absent columns prepended before the source's own column. +3. **First merge, Presence mode**: `init_presence_matrix` retroactively creates `presence/col_0` all-true for genome 0 before any source column is appended. + +The invariant is a precondition of the `LayeredStore` aggregation traits: `col_weights()` and all partial distance methods assume every inner store has the same column count. + +--- + +## New layer construction + +All kmers absent from the destination index — across **all** sources being merged — accumulate into a **single** `GraphDeBruijn` per partition. One new layer is built from this graph, not one layer per source. This keeps the layer count bounded and preserves compact unitig representation. + +De Bruijn reconstruction ensures that overlapping k-1 suffixes/prefixes from different source kmers are merged into minimal unitigs before MPHF construction. + +--- + +## On-disk impact + +After merging `G` genomes (1 existing + G-1 new sources): + +``` +partitions/ + part_00000/ + index/ + meta.json ← n_layers updated if new layer added + layer_0/ + mphf.bin ← unchanged (MPHF immutable) + unitigs.bin ← unchanged + evidence.bin ← unchanged + presence/ ← created on first merge (Presence mode) + meta.json {"n": N, "n_cols": G} + col_000000.pbiv ← all-true (genome 0) + col_000001.pbiv ← source 1 presence + ... + counts/ ← extended (Count mode) + meta.json {"n": N, "n_cols": G} + col_000000.pciv ← genome 0 counts (from original build) + col_000001.pciv ← source 1 counts + ... + layer_1/ ← new layer (if new kmers found) + mphf.bin + unitigs.bin + evidence.bin + presence/ or counts/ + meta.json {"n": N1, "n_cols": G} + col_000000.pbiv ← all-false (genome 0 absent from this layer) + ... + col_000001.pbiv ← source 1 presence in this new layer +``` + +The `index.meta` genome list grows from 1 entry to G entries after all sources are merged. diff --git a/docmd/implementation/obilayeredmap.md b/docmd/implementation/obilayeredmap.md index cec43a2..d5ad7d2 100644 --- a/docmd/implementation/obilayeredmap.md +++ b/docmd/implementation/obilayeredmap.md @@ -255,3 +255,57 @@ Each partition's new layer is built independently; the operation is fully parall | `obicompactvec` | payload types + aggregation traits | | `rayon 1` | parallel MPHF construction pass | | `ndarray 0.16` | aggregation output arrays | + +--- + +## Column append and merge support + +These methods extend existing layers with new genome columns without touching the MPHF. They are the building blocks of the `merge` command. + +### Matrix column append + +```rust +impl PersistentCompactIntMatrix { + pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> OLMResult<()> +} + +impl PersistentBitMatrix { + pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> bool) -> OLMResult<()> +} +``` + +Both methods write a new column file (`col_NNNNNN.pciv` / `col_NNNNNN.pbiv`) and update `meta.json` to increment `n_cols`. The `value_of` closure is called once per slot (indexed 0..n) to populate the column. The matrix `n` (row count) is read from the existing `meta.json` and must not change. + +### Presence matrix initialisation + +```rust +impl Layer<()> { + pub fn init_presence_matrix(layer_dir: &Path, n_kmers: usize) -> OLMResult<()> +} +``` + +Called on the first merge of a Presence-mode index. Creates the `presence/` subdirectory with `meta.json {"n": n_kmers, "n_cols": 1}` and `col_000000.pbiv` set entirely to `true`. This retroactively records that genome 0 (the original source) is present in every slot of this layer, satisfying the column count invariant before any new-source column is appended. + +### Layer-level genome column append + +```rust +impl Layer { + pub fn append_genome_column( + layer_dir: &Path, + value_of: impl Fn(usize) -> bool, + ) -> OLMResult<()> +} + +impl Layer { + pub fn append_genome_column( + layer_dir: &Path, + value_of: impl Fn(usize) -> u32, + ) -> OLMResult<()> +} +``` + +These delegate directly to the corresponding `PersistentBitMatrix::append_column` / `PersistentCompactIntMatrix::append_column`. They are typed at the `Layer` level to make call sites mode-aware without exposing the inner matrix path construction. + +### Why the MPHF is never rebuilt + +The MPHF (`mphf.bin`, `evidence.bin`, `unitigs.bin`) is built once from the kmer set of a layer and is immutable for the lifetime of that layer. Adding a genome column does not change the kmer set — it only adds a new data column indexed by the same slot numbers. Rebuilding the MPHF would require re-running the full construction pipeline (two sequential passes over unitigs, parallel ptr_hash construction) and would invalidate any open memory maps. Column append avoids all of this: the only disk writes are one new `.pciv`/`.pbiv` file and a single `meta.json` update. Kmers absent from a given layer are represented as zero (count) or false (presence) values in the new column — no structural change to the layer is required. diff --git a/mkdocs.yml b/mkdocs.yml index af78836..9f2ea83 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -47,6 +47,7 @@ nav: - obilayeredmap crate: implementation/obilayeredmap.md - PersistentCompactIntVec: implementation/persistent_compact_int_vec.md - PersistentBitVec: implementation/persistent_bit_vec.md + - Merge command: implementation/merge.md - Architecture: - Sequences: architecture/sequences/invariant.md - Kmer index: architecture/index_architecture.md diff --git a/src/obicompactvec/src/bitmatrix.rs b/src/obicompactvec/src/bitmatrix.rs index df823d1..b7305ef 100644 --- a/src/obicompactvec/src/bitmatrix.rs +++ b/src/obicompactvec/src/bitmatrix.rs @@ -101,6 +101,26 @@ impl PersistentBitMatrix { } } +// ── Column append ───────────────────────────────────────────────────────────── + +impl PersistentBitMatrix { + /// Append a new column to an existing matrix on disk. + /// + /// Reads `meta.json` to obtain `n` and the current column count, writes + /// `col_{n_cols:06}.pbiv` filled by `value_of(slot)`, then increments + /// `n_cols` in `meta.json`. + pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> bool) -> io::Result<()> { + let mut meta = MatrixMeta::load(dir)?; + let mut b = PersistentBitVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?; + for slot in 0..meta.n { + b.set(slot, value_of(slot)); + } + b.close()?; + meta.n_cols += 1; + meta.save(dir) + } +} + fn upper_pairs(n: usize) -> Vec<(usize, usize)> { (0..n).flat_map(|i| (i + 1..n).map(move |j| (i, j))).collect() } diff --git a/src/obicompactvec/src/intmatrix.rs b/src/obicompactvec/src/intmatrix.rs index c77a135..a9aae75 100644 --- a/src/obicompactvec/src/intmatrix.rs +++ b/src/obicompactvec/src/intmatrix.rs @@ -203,6 +203,26 @@ where m } +// ── Column append ───────────────────────────────────────────────────────────── + +impl PersistentCompactIntMatrix { + /// Append a new column to an existing matrix on disk. + /// + /// Reads `meta.json` to obtain `n` and the current column count, writes + /// `col_{n_cols:06}.pciv` filled by `value_of(slot)`, then increments + /// `n_cols` in `meta.json`. + pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> io::Result<()> { + let mut meta = MatrixMeta::load(dir)?; + let mut b = PersistentCompactIntVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?; + for slot in 0..meta.n { + b.set(slot, value_of(slot)); + } + b.close()?; + meta.n_cols += 1; + meta.save(dir) + } +} + // ── Trait impls ─────────────────────────────────────────────────────────────── use crate::traits::{ColumnWeights, CountPartials}; diff --git a/src/obikindex/src/error.rs b/src/obikindex/src/error.rs index df191ed..74082fe 100644 --- a/src/obikindex/src/error.rs +++ b/src/obikindex/src/error.rs @@ -8,6 +8,12 @@ pub enum OKIError { Io(io::Error), Json(serde_json::Error), Partition(SKError), + /// Source index is not in `Indexed` state. + NotIndexed(std::path::PathBuf), + /// Source indexes have incompatible configurations (k, m, n_bits). + IncompatibleConfig, + /// Count mode requested but a source index lacks count data. + MismatchedMode, } pub type OKIResult = Result; @@ -15,9 +21,12 @@ pub type OKIResult = Result; impl fmt::Display for OKIError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - OKIError::Io(e) => write!(f, "I/O error: {e}"), - OKIError::Json(e) => write!(f, "JSON error: {e}"), - OKIError::Partition(e) => write!(f, "partition error: {e}"), + OKIError::Io(e) => write!(f, "I/O error: {e}"), + OKIError::Json(e) => write!(f, "JSON error: {e}"), + OKIError::Partition(e) => write!(f, "partition error: {e}"), + OKIError::NotIndexed(p) => write!(f, "index not fully built: {}", p.display()), + OKIError::IncompatibleConfig => write!(f, "incompatible index configurations"), + OKIError::MismatchedMode => write!(f, "count mode requires all sources to have with_counts=true"), } } } @@ -28,6 +37,7 @@ impl std::error::Error for OKIError { OKIError::Io(e) => Some(e), OKIError::Json(e) => Some(e), OKIError::Partition(e) => Some(e), + _ => None, } } } diff --git a/src/obikindex/src/index.rs b/src/obikindex/src/index.rs index c5ba267..16c1154 100644 --- a/src/obikindex/src/index.rs +++ b/src/obikindex/src/index.rs @@ -15,9 +15,9 @@ use crate::meta::{IndexConfig, IndexMeta}; use crate::state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED}; pub struct KmerIndex { - root_path: PathBuf, - meta: IndexMeta, - partition: KmerPartition, + pub(crate) root_path: PathBuf, + pub(crate) meta: IndexMeta, + pub(crate) partition: KmerPartition, } impl KmerIndex { diff --git a/src/obikindex/src/lib.rs b/src/obikindex/src/lib.rs index 91a728a..4dcf4ff 100644 --- a/src/obikindex/src/lib.rs +++ b/src/obikindex/src/lib.rs @@ -2,8 +2,10 @@ pub mod error; pub mod meta; pub mod state; mod index; +mod merge; pub use error::{OKIError, OKIResult}; pub use index::KmerIndex; +pub use merge::MergeMode; pub use meta::{IndexConfig, IndexMeta, META_FILENAME}; pub use state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED}; diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs new file mode 100644 index 0000000..be71a02 --- /dev/null +++ b/src/obikindex/src/merge.rs @@ -0,0 +1,131 @@ +use std::fs; +use std::io; +use std::path::Path; + +use obisys::{Reporter, Stage}; +use rayon::prelude::*; +use tracing::info; + +use crate::error::{OKIError, OKIResult}; +use crate::index::KmerIndex; +use crate::meta::IndexMeta; +use crate::state::IndexState; + +pub use obikpartitionner::MergeMode; + +impl KmerIndex { + /// Merge `sources` into a new index at `output`. + /// + /// All sources must be in `Indexed` state and share the same `kmer_size`, + /// `minimizer_size`, and `n_partitions`. Count mode additionally requires + /// every source to have `with_counts = true`. + /// + /// The first source is copied to `output`, then each subsequent source is + /// merged partition-by-partition in parallel. + pub fn merge>( + output: P, + sources: &[&KmerIndex], + mode: MergeMode, + force: bool, + ) -> OKIResult { + let output = output.as_ref(); + + if sources.is_empty() { + return Err(OKIError::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "merge requires at least one source index", + ))); + } + + // ── Validate ────────────────────────────────────────────────────────── + let ref0 = sources[0]; + for src in sources { + if src.state() != IndexState::Indexed { + return Err(OKIError::NotIndexed(src.root_path.clone())); + } + if src.kmer_size() != ref0.kmer_size() + || src.minimizer_size() != ref0.minimizer_size() + || src.n_partitions() != ref0.n_partitions() + { + return Err(OKIError::IncompatibleConfig); + } + if mode == MergeMode::Count && !src.meta.config.with_counts { + return Err(OKIError::MismatchedMode); + } + } + + // ── Prepare output directory ────────────────────────────────────────── + if output.exists() { + if force { + fs::remove_dir_all(output)?; + } else { + return Err(OKIError::Io(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("{}: output directory already exists", output.display()), + ))); + } + } + + // ── Bootstrap: copy first source to output ──────────────────────────── + info!("copying {} → {}", sources[0].root_path.display(), output.display()); + copy_dir_all(&sources[0].root_path, output)?; + + // Rewrite index.meta with all genome labels. + let all_genomes: Vec = sources + .iter() + .flat_map(|s| s.meta.genomes.iter().cloned()) + .collect(); + let mut meta = IndexMeta::read(output).map_err(OKIError::Io)?; + meta.genomes = all_genomes; + meta.write(output)?; + + // Open the destination index. + let dst = KmerIndex::open(output)?; + let n_partitions = dst.n_partitions(); + + // ── Merge each subsequent source partition-by-partition ─────────────── + let remaining_sources: Vec<&KmerIndex> = sources[1..].to_vec(); + if !remaining_sources.is_empty() { + let mut rep = Reporter::new(); + let t = Stage::start("merge_partitions"); + + let dst_partition = &dst.partition; + + let errors: Vec = (0..n_partitions) + .into_par_iter() + .filter_map(|i| { + let srcs: Vec<&obikpartitionner::KmerPartition> = + remaining_sources.iter().map(|s| &s.partition).collect(); + // n_dst_genomes = 1 (copied from source_0 only) + dst_partition.merge_partition(i, &srcs, mode, 1).err() + }) + .collect(); + + if let Some(e) = errors.into_iter().next() { + return Err(OKIError::Partition(e)); + } + + rep.push(t.stop()); + } + + // Re-open to get the updated state. + KmerIndex::open(output) + } +} + +// ── Directory copy ──────────────────────────────────────────────────────────── + +fn copy_dir_all(src: &Path, dst: &Path) -> io::Result<()> { + fs::create_dir_all(dst)?; + for entry in fs::read_dir(src)? { + let entry = entry?; + let src_path = entry.path(); + let dst_path = dst.join(entry.file_name()); + if src_path.is_dir() { + copy_dir_all(&src_path, &dst_path)?; + } else { + fs::copy(&src_path, &dst_path)?; + } + } + Ok(()) +} diff --git a/src/obikmer/src/cmd/index.rs b/src/obikmer/src/cmd/index.rs index 9f9bad2..ccfa487 100644 --- a/src/obikmer/src/cmd/index.rs +++ b/src/obikmer/src/cmd/index.rs @@ -48,20 +48,27 @@ pub fn run(args: IndexArgs) { let mut rep = Reporter::new(); // ── Open or create the index ───────────────────────────────────────────── - let mut idx = if KmerIndex::exists(&output) { + let mut idx = if KmerIndex::exists(&output) && !args.force { info!("resuming from existing index at {}", output.display()); KmerIndex::open(&output).unwrap_or_else(|e| { eprintln!("error opening index: {e}"); std::process::exit(1); }) } else { + if args.force && KmerIndex::exists(&output) { + info!("--force: removing existing index at {}", output.display()); + std::fs::remove_dir_all(&output).unwrap_or_else(|e| { + eprintln!("error removing existing index: {e}"); + std::process::exit(1); + }); + } let config = IndexConfig { kmer_size: args.common.kmer_size, minimizer_size: args.common.minimizer_size, n_bits: args.common.partition_bits, with_counts: args.with_counts, }; - KmerIndex::create(&output, config, args.label.clone(), args.force).unwrap_or_else(|e| { + KmerIndex::create(&output, config, args.label.clone(), false).unwrap_or_else(|e| { eprintln!("error creating index: {e}"); std::process::exit(1); }) diff --git a/src/obikmer/src/cmd/merge.rs b/src/obikmer/src/cmd/merge.rs new file mode 100644 index 0000000..0109617 --- /dev/null +++ b/src/obikmer/src/cmd/merge.rs @@ -0,0 +1,47 @@ +use std::path::PathBuf; + +use clap::Args; +use obikindex::{KmerIndex, MergeMode}; +use obisys::Reporter; +use tracing::info; + +#[derive(Args)] +pub struct MergeArgs { + /// Source index directories to merge + #[arg(required = true)] + pub sources: Vec, + + /// Output index directory + #[arg(short, long)] + pub output: PathBuf, + + /// Overwrite output directory if it already exists + #[arg(long, default_value_t = false)] + pub force: bool, + + /// Merge in count mode (requires all sources to have with_counts=true) + #[arg(long, default_value_t = false)] + pub with_counts: bool, +} + +pub fn run(args: MergeArgs) { + let mode = if args.with_counts { MergeMode::Count } else { MergeMode::Presence }; + + let sources: Vec = args.sources.iter().map(|p| { + info!("opening source index: {}", p.display()); + KmerIndex::open(p).unwrap_or_else(|e| { + eprintln!("error opening source index {}: {e}", p.display()); + std::process::exit(1); + }) + }).collect(); + + let source_refs: Vec<&KmerIndex> = sources.iter().collect(); + + info!("merging {} indexes → {}", sources.len(), args.output.display()); + let rep = Reporter::new(); + KmerIndex::merge(&args.output, &source_refs, mode, args.force).unwrap_or_else(|e| { + eprintln!("error merging: {e}"); + std::process::exit(1); + }); + rep.print(); +} diff --git a/src/obikmer/src/cmd/mod.rs b/src/obikmer/src/cmd/mod.rs index 1d2188e..aaccdbf 100644 --- a/src/obikmer/src/cmd/mod.rs +++ b/src/obikmer/src/cmd/mod.rs @@ -1,3 +1,4 @@ pub mod index; +pub mod merge; pub mod superkmer; pub mod unitig; diff --git a/src/obikmer/src/main.rs b/src/obikmer/src/main.rs index 470da38..52c5953 100644 --- a/src/obikmer/src/main.rs +++ b/src/obikmer/src/main.rs @@ -18,6 +18,8 @@ enum Commands { Superkmer(cmd::superkmer::SuperkmerArgs), /// Build the complete genome index (scatter → dereplicate → count → layered MPHF) Index(cmd::index::IndexArgs), + /// Merge multiple built indexes into one + Merge(cmd::merge::MergeArgs), /// Dump unitigs from a built index to stdout (debug) Unitig(cmd::unitig::UnitigArgs), } @@ -43,6 +45,7 @@ fn main() { match cli.command { Commands::Superkmer(args) => cmd::superkmer::run(args), Commands::Index(args) => cmd::index::run(args), + Commands::Merge(args) => cmd::merge::run(args), Commands::Unitig(args) => cmd::unitig::run(args), } diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index 2e94e56..672c118 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -1,5 +1,7 @@ mod index_layer; mod kmer_sort; +mod merge_layer; mod partition; +pub use merge_layer::MergeMode; pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs new file mode 100644 index 0000000..7d77c45 --- /dev/null +++ b/src/obikpartitionner/src/merge_layer.rs @@ -0,0 +1,294 @@ +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; + +use obidebruinj::GraphDeBruijn; +use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder, + PersistentBitVecBuilder, + PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, + PersistentCompactIntVecBuilder}; +use obiskio::{SKError, SKResult, UnitigFileReader}; +use obilayeredmap::{Layer, LayeredMap, MphfLayer, OLMError}; +use obilayeredmap::meta::PartitionMeta; + +use crate::partition::KmerPartition; + +// ── MergeMode ───────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MergeMode { Presence, Count } + +// ── ColBuilder — enum dispatch to avoid trait-object boxing issues ───────────── + +enum ColBuilder { + Bit(PersistentBitVecBuilder), + Int(PersistentCompactIntVecBuilder), +} + +impl ColBuilder { + fn set_val(&mut self, slot: usize, value: u32) { + match self { + ColBuilder::Bit(b) => b.set(slot, value > 0), + ColBuilder::Int(b) => b.set(slot, value), + } + } + + fn close(self) -> SKResult<()> { + match self { + ColBuilder::Bit(b) => b.close().map_err(SKError::Io), + ColBuilder::Int(b) => b.close().map_err(SKError::Io), + } + } +} + +// ── helpers ─────────────────────────────────────────────────────────────────── + +const INDEX_SUBDIR: &str = "index"; + +fn olm_to_sk(e: OLMError) -> SKError { + match e { + OLMError::Io(e) => SKError::Io(e), + other => SKError::InvalidData { context: "merge", detail: other.to_string() }, + } +} + +fn col_path_bit(dir: &Path, col: usize) -> PathBuf { + dir.join(format!("col_{col:06}.pbiv")) +} + +fn col_path_int(dir: &Path, col: usize) -> PathBuf { + dir.join(format!("col_{col:06}.pciv")) +} + +fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { + fs::write(dir.join("meta.json"), format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n")) +} + +// ── KmerPartition::merge_partition ──────────────────────────────────────────── + +impl KmerPartition { + /// Merge `sources` into destination partition `i`. + /// + /// `n_dst_genomes` is the number of genome columns already in the dst + /// matrices (1 after copying source_0, more for subsequent merges). + /// + /// Two-pass algorithm: + /// 1. Classify each source kmer as dst-hit or new → build de Bruijn graph + /// of new kmers → write unitigs → build MPHF for the new layer. + /// 2. Iterate source kmers again → fill per-genome column builders + /// (memory-mapped) → close → update matrix metadata. + pub fn merge_partition( + &self, + i: usize, + sources: &[&KmerPartition], + mode: MergeMode, + n_dst_genomes: usize, + ) -> SKResult<()> { + let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); + if !dst_index_dir.exists() { + return Ok(()); + } + + let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?; + let n_dst_layers = dst_map.n_layers(); + let n_src = sources.len(); + + // First merge in presence mode: init presence matrices on existing layers + // (all slots true — every kmer in a layer belongs to genome_0). + if n_dst_genomes == 1 && mode == MergeMode::Presence { + for l in 0..n_dst_layers { + let layer_dir = dst_index_dir.join(format!("layer_{l}")); + Layer::<()>::init_presence_matrix(&layer_dir, dst_map.layer(l).n()) + .map_err(olm_to_sk)?; + } + } + + // ── Pass 1: classify kmers, build new-kmer de Bruijn graph ─────────── + let mut g = GraphDeBruijn::new(); + let mut any_new = false; + + for src in sources.iter() { + let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); + if !src_index_dir.exists() { continue; } + let src_meta = PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?; + + for l in 0..src_meta.n_layers { + let unitigs_path = src_index_dir + .join(format!("layer_{l}")).join("unitigs.bin"); + let reader = UnitigFileReader::open(&unitigs_path)?; + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + if dst_map.query(kmer).is_none() { + g.push(kmer); + any_new = true; + } + } + } + } + + // Build new layer from de Bruijn graph if there are new kmers. + let new_layer_idx = n_dst_layers; + let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); + + if any_new { + g.compute_degrees(); + fs::create_dir_all(&new_layer_dir)?; + let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; + for unitig in g.iter_unitig() { + uw.write(&unitig)?; + } + uw.close()?; + Layer::<()>::build(&new_layer_dir).map_err(olm_to_sk)?; + } + drop(g); + + let new_mphf = if any_new { + Some(MphfLayer::open(&new_layer_dir).map_err(olm_to_sk)?) + } else { + None + }; + let n_new = new_mphf.as_ref().map_or(0, |m| m.n()); + + // ── Prepare matrix directories for the new layer ────────────────────── + // Absent columns (dst genomes) are written now via append_column (all-zero/false). + // Source-genome columns are created as mutable builders for pass 2. + let mut new_src_builders: Vec = if any_new { + let data_dir = match mode { + MergeMode::Presence => new_layer_dir.join("presence"), + MergeMode::Count => new_layer_dir.join("counts"), + }; + fs::create_dir_all(&data_dir)?; + // Bootstrap meta with 0 cols. + match mode { + MergeMode::Presence => { + PersistentBitMatrixBuilder::new(n_new, &data_dir) + .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + for _ in 0..n_dst_genomes { + PersistentBitMatrix::append_column(&data_dir, |_| false) + .map_err(SKError::Io)?; + } + (0..n_src).map(|g| -> SKResult { + let b = PersistentBitVecBuilder::new( + n_new, &col_path_bit(&data_dir, n_dst_genomes + g))?; + Ok(ColBuilder::Bit(b)) + }).collect::>()? + } + MergeMode::Count => { + PersistentCompactIntMatrixBuilder::new(n_new, &data_dir) + .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + for _ in 0..n_dst_genomes { + PersistentCompactIntMatrix::append_column(&data_dir, |_| 0) + .map_err(SKError::Io)?; + } + (0..n_src).map(|g| -> SKResult { + let b = PersistentCompactIntVecBuilder::new( + n_new, &col_path_int(&data_dir, n_dst_genomes + g))?; + Ok(ColBuilder::Int(b)) + }).collect::>()? + } + } + } else { + vec![] + }; + + // Builders for existing layers: one per (layer, src_genome). + // Invariant: existing layers already have exactly n_dst_genomes columns. + // New source columns go at positions n_dst_genomes .. n_dst_genomes+n_src-1. + let mut exist_builders: Vec> = (0..n_dst_layers) + .map(|l| { + let layer_dir = dst_index_dir.join(format!("layer_{l}")); + let n = dst_map.layer(l).n(); + (0..n_src).map(|src_g| -> SKResult { + match mode { + MergeMode::Presence => { + let data_dir = layer_dir.join("presence"); + let b = PersistentBitVecBuilder::new( + n, &col_path_bit(&data_dir, n_dst_genomes + src_g))?; + Ok(ColBuilder::Bit(b)) + } + MergeMode::Count => { + let data_dir = layer_dir.join("counts"); + let b = PersistentCompactIntVecBuilder::new( + n, &col_path_int(&data_dir, n_dst_genomes + src_g))?; + Ok(ColBuilder::Int(b)) + } + } + }).collect::>() + }) + .collect::>()?; + + // ── Pass 2: fill builders ───────────────────────────────────────────── + for (src_g, src) in sources.iter().enumerate() { + let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); + if !src_index_dir.exists() { continue; } + let src_meta = PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?; + + for l in 0..src_meta.n_layers { + let src_layer_dir = src_index_dir.join(format!("layer_{l}")); + let reader = UnitigFileReader::open(&src_layer_dir.join("unitigs.bin"))?; + + // Open source MPHF + count matrix for count mode. + let src_count_data: Option<(MphfLayer, PersistentCompactIntMatrix)> = + if mode == MergeMode::Count { + let counts_dir = src_layer_dir.join("counts"); + if counts_dir.exists() { + let mphf = MphfLayer::open(&src_layer_dir).map_err(olm_to_sk)?; + let mat = PersistentCompactIntMatrix::open(&counts_dir) + .map_err(SKError::Io)?; + Some((mphf, mat)) + } else { + None + } + } else { + None + }; + + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + let value: u32 = match &src_count_data { + Some((mphf, counts)) => { + mphf.find(kmer).map(|s| counts.col(0).get(s)).unwrap_or(1) + } + None => 1, + }; + + if let Some((dst_layer, hit)) = dst_map.query(kmer) { + exist_builders[dst_layer][src_g].set_val(hit.slot, value); + } else if let Some(ref mphf) = new_mphf { + if let Some(slot) = mphf.find(kmer) { + new_src_builders[src_g].set_val(slot, value); + } + } + } + } + } + + // ── Close builders and update metadata ──────────────────────────────── + for (l, builders) in exist_builders.into_iter().enumerate() { + let layer_dir = dst_index_dir.join(format!("layer_{l}")); + for b in builders { b.close()?; } + // Update the matrix meta to reflect the n_src new columns. + let n = dst_map.layer(l).n(); + let data_dir = match mode { + MergeMode::Presence => layer_dir.join("presence"), + MergeMode::Count => layer_dir.join("counts"), + }; + write_matrix_meta(&data_dir, n, n_dst_genomes + n_src).map_err(SKError::Io)?; + } + + for b in new_src_builders { b.close()?; } + // new layer matrix meta was already written by append_column calls above + // with n_dst_genomes cols; update to n_dst_genomes + n_src. + if any_new { + let data_dir = match mode { + MergeMode::Presence => new_layer_dir.join("presence"), + MergeMode::Count => new_layer_dir.join("counts"), + }; + write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src).map_err(SKError::Io)?; + + let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?; + part_meta.n_layers = new_layer_idx + 1; + part_meta.save(&dst_index_dir).map_err(olm_to_sk)?; + } + + Ok(()) + } +} diff --git a/src/obilayeredmap/src/layer.rs b/src/obilayeredmap/src/layer.rs index 166a072..9f41675 100644 --- a/src/obilayeredmap/src/layer.rs +++ b/src/obilayeredmap/src/layer.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fs; use std::path::Path; use obicompactvec::{ @@ -83,6 +84,22 @@ impl Layer<()> { pub fn build(out_dir: &Path) -> OLMResult { MphfLayer::build(out_dir, &mut |_, _| Ok(())) } + + /// Create a presence matrix for a set-membership layer (first merge). + /// + /// All `n_kmers` slots are set to `true`: every kmer in this layer belongs + /// to genome_0, so genome_0 is present at every slot. + pub fn init_presence_matrix(layer_dir: &Path, n_kmers: usize) -> OLMResult<()> { + let presence_dir = layer_dir.join(PRESENCE_DIR); + fs::create_dir_all(&presence_dir).map_err(OLMError::Io)?; + let mut mb = PersistentBitMatrixBuilder::new(n_kmers, &presence_dir).map_err(OLMError::Io)?; + let mut col = mb.add_col().map_err(OLMError::Io)?; + for slot in 0..n_kmers { + col.set(slot, true); + } + col.close().map_err(OLMError::Io)?; + mb.close().map_err(OLMError::Io) + } } // ── Mode 2 — count matrix (1 column per layer) ──────────────────────────────── @@ -111,9 +128,31 @@ impl Layer { } } +// ── Mode 2 — count matrix column append ────────────────────────────────────── + +impl Layer { + /// Append a genome column to an existing count matrix. + pub fn append_genome_column( + layer_dir: &Path, + value_of: impl Fn(usize) -> u32, + ) -> OLMResult<()> { + PersistentCompactIntMatrix::append_column(&layer_dir.join(COUNTS_DIR), value_of) + .map_err(OLMError::Io) + } +} + // ── Mode 3 — presence/absence matrix (1 column per genome) ─────────────────── impl Layer { + /// Append a genome column to an existing presence matrix. + pub fn append_genome_column( + layer_dir: &Path, + value_of: impl Fn(usize) -> bool, + ) -> OLMResult<()> { + PersistentBitMatrix::append_column(&layer_dir.join(PRESENCE_DIR), value_of) + .map_err(OLMError::Io) + } + pub fn build_presence( out_dir: &Path, n_genomes: usize,