Push qkpyqurltlpk #1

Merged
coissac merged 11 commits from push-qkpyqurltlpk into main 2026-05-21 16:57:19 +00:00
17 changed files with 799 additions and 8 deletions
Showing only changes of commit e1d59fde54 - Show all commits
+27
View File
@@ -327,3 +327,30 @@ Other derivations: threshold a count matrix → binary presence matrix; union tw
2. Replace `LayerData` trait with the `DataStore` / `ColumnWeights` / `CountPartials` / `BitPartials` system.
3. Rewire `LayeredMap` to hold `LayeredStore<PersistentCompactIntMatrix>` (or bit variant) alongside the MPHF layers.
4. Implement `PartitionedIndex` using `LayeredStore<LayeredStore<S>>` for data and parallel dispatch for queries.
---
## Multi-genome column invariant
### The invariant
After any merge operation, **every layer in every partition has exactly `n_genomes` columns**, where `n_genomes` is the current total genome count recorded in `index.meta`. This applies to both `PersistentCompactIntMatrix` (Count mode) and `PersistentBitMatrix` (Presence mode).
### How it is maintained
The invariant is established and preserved by three coordinated operations:
**1. Existing layers — column append.**
When merging source genome G into an existing index with `n_existing_genomes` genomes, one column is appended to every existing layer via `append_genome_column`. Slots that contain a kmer present in source G receive its count or `true`; all other slots receive 0 or `false`. After this step, every pre-existing layer has `n_existing_genomes + 1` columns.
**2. New layers — absent columns prepended.**
If source G introduces kmers not found in any existing layer, a new layer is created for those kmers. Before appending source G's own column, `n_existing_genomes` absent columns (all-zero or all-false) are prepended — one per genome already in the index. This ensures the new layer starts at the same column count as every other layer in the partition immediately after creation.
**3. First merge, Presence mode — `init_presence_matrix`.**
The initial single-genome index has no `presence/` directory (presence is implicit: every kmer in the index is present in genome 0). On the first merge, before appending any column for source 1, `Layer<()>::init_presence_matrix` creates `presence/col_000000.pbiv` set entirely to `true` for each existing layer. This retroactively materialises genome 0's presence column, bringing the column count from 0 to 1 so that the subsequent append for source 1 raises it to 2.
### Why the invariant is required
The `LayeredStore` aggregation traits (`col_weights`, `partial_bray`, `partial_jaccard`, etc.) sum contributions across all `(partition, layer)` pairs without any shape check. If one layer had fewer columns than others, its contribution would silently produce a malformed result — wrong column sums, wrong distance matrices, and incorrect genome-level statistics.
The invariant is the precondition that makes the progressive aggregation principle correct: each level can blindly sum matrices from the level below because all matrices have the same shape.
+133
View File
@@ -0,0 +1,133 @@
# Merge command
## Purpose
`obikmer merge` combines multiple existing kmer indexes into a single index. The result contains all kmers from all sources, with per-genome presence/absence or count data for every genome across every layer.
---
## Modes
```rust
pub enum MergeMode { Presence, Count }
```
Default mode is `Presence`. `Count` mode requires **all** source indexes to have `with_counts=true`; mixing count and non-count sources is rejected at validation.
| Mode | Column type | Constraint |
|---|---|---|
| `Presence` | `PersistentBitMatrix` (one bit per genome per slot) | none |
| `Count` | `PersistentCompactIntMatrix` (one u32 per genome per slot) | all sources `with_counts=true` |
---
## Input / output constraints
All source indexes must satisfy:
- `IndexState::Indexed` (fully built — `index.done` sentinel present)
- Same `kmer_size`, `minimizer_size`, `n_bits`
- If `Count` mode: all sources must have `with_counts=true`
`--force`: if the output directory already exists, it is deleted before the merge begins.
---
## Algorithm
### 1. Validation
Check all sources against the constraints above. Abort on any mismatch.
### 2. Bootstrap output from first source
Recursive file copy of `sources[0]``output`. This establishes the partition layout, all existing MPHFs, unitigs, and evidence files. The first source's genome is genome 0 in the output.
### 3. For each subsequent source (parallel across partitions)
For each partition, process one source at a time sequentially:
**a. Classify kmers**
Iterate all kmers in the source partition (via `UnitigFileReader` + canonical kmer iteration). For each kmer, probe the destination `LayeredMap<()>`:
- **Hit** `(dst_layer, slot)`: record `(dst_layer, slot, value)` where `value` is the source count (Count mode) or `1` (Presence mode).
- **Miss**: add kmer to a `GraphDeBruijn` accumulator; record its value in a `HashMap<CanonicalKmer, Vec<u32>>`.
**b. Extend existing layers**
For each existing layer in the destination partition, call `append_genome_column` once per source genome being merged. Slots that received a hit are populated with their recorded value; all other slots receive 0 (absent).
If this is the **first merge** and operating in Presence mode, call `Layer<()>::init_presence_matrix` before appending any source column. This creates `presence/` with `col_000000.pbiv` set all-true (genome 0 is present in every slot of this layer).
**c. Build new layer for new kmers**
If the `GraphDeBruijn` accumulator is non-empty (misses exist), write compact unitigs from the de Bruijn graph, then call the appropriate `Layer::build` variant. Before appending the source's own column, prepend `n_existing_genomes` absent columns (value 0) — one per genome already in the index — so the column count invariant holds immediately after layer creation.
**d. Update partition metadata**
Write updated `meta.json` with the incremented `n_layers` if a new layer was added.
### 4. Update index metadata
Append the merged source's genome label(s) to `index.meta.genomes`. Write updated `index.meta`.
---
## Column count invariant
After any merge, **every layer in every partition has exactly `n_genomes` columns**, where `n_genomes` is the total genome count in the index at that point.
This is maintained by three mechanisms:
1. **Existing layers**: one column appended per source genome (`append_genome_column`).
2. **New layers created during merge**: `n_existing_genomes` absent columns prepended before the source's own column.
3. **First merge, Presence mode**: `init_presence_matrix` retroactively creates `presence/col_0` all-true for genome 0 before any source column is appended.
The invariant is a precondition of the `LayeredStore` aggregation traits: `col_weights()` and all partial distance methods assume every inner store has the same column count.
---
## New layer construction
All kmers absent from the destination index — across **all** sources being merged — accumulate into a **single** `GraphDeBruijn` per partition. One new layer is built from this graph, not one layer per source. This keeps the layer count bounded and preserves compact unitig representation.
De Bruijn reconstruction ensures that overlapping k-1 suffixes/prefixes from different source kmers are merged into minimal unitigs before MPHF construction.
---
## On-disk impact
After merging `G` genomes (1 existing + G-1 new sources):
```
partitions/
part_00000/
index/
meta.json ← n_layers updated if new layer added
layer_0/
mphf.bin ← unchanged (MPHF immutable)
unitigs.bin ← unchanged
evidence.bin ← unchanged
presence/ ← created on first merge (Presence mode)
meta.json {"n": N, "n_cols": G}
col_000000.pbiv ← all-true (genome 0)
col_000001.pbiv ← source 1 presence
...
counts/ ← extended (Count mode)
meta.json {"n": N, "n_cols": G}
col_000000.pciv ← genome 0 counts (from original build)
col_000001.pciv ← source 1 counts
...
layer_1/ ← new layer (if new kmers found)
mphf.bin
unitigs.bin
evidence.bin
presence/ or counts/
meta.json {"n": N1, "n_cols": G}
col_000000.pbiv ← all-false (genome 0 absent from this layer)
...
col_000001.pbiv ← source 1 presence in this new layer
```
The `index.meta` genome list grows from 1 entry to G entries after all sources are merged.
+54
View File
@@ -255,3 +255,57 @@ Each partition's new layer is built independently; the operation is fully parall
| `obicompactvec` | payload types + aggregation traits |
| `rayon 1` | parallel MPHF construction pass |
| `ndarray 0.16` | aggregation output arrays |
---
## Column append and merge support
These methods extend existing layers with new genome columns without touching the MPHF. They are the building blocks of the `merge` command.
### Matrix column append
```rust
impl PersistentCompactIntMatrix {
pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> OLMResult<()>
}
impl PersistentBitMatrix {
pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> bool) -> OLMResult<()>
}
```
Both methods write a new column file (`col_NNNNNN.pciv` / `col_NNNNNN.pbiv`) and update `meta.json` to increment `n_cols`. The `value_of` closure is called once per slot (indexed 0..n) to populate the column. The matrix `n` (row count) is read from the existing `meta.json` and must not change.
### Presence matrix initialisation
```rust
impl Layer<()> {
pub fn init_presence_matrix(layer_dir: &Path, n_kmers: usize) -> OLMResult<()>
}
```
Called on the first merge of a Presence-mode index. Creates the `presence/` subdirectory with `meta.json {"n": n_kmers, "n_cols": 1}` and `col_000000.pbiv` set entirely to `true`. This retroactively records that genome 0 (the original source) is present in every slot of this layer, satisfying the column count invariant before any new-source column is appended.
### Layer-level genome column append
```rust
impl Layer<PersistentBitMatrix> {
pub fn append_genome_column(
layer_dir: &Path,
value_of: impl Fn(usize) -> bool,
) -> OLMResult<()>
}
impl Layer<PersistentCompactIntMatrix> {
pub fn append_genome_column(
layer_dir: &Path,
value_of: impl Fn(usize) -> u32,
) -> OLMResult<()>
}
```
These delegate directly to the corresponding `PersistentBitMatrix::append_column` / `PersistentCompactIntMatrix::append_column`. They are typed at the `Layer` level to make call sites mode-aware without exposing the inner matrix path construction.
### Why the MPHF is never rebuilt
The MPHF (`mphf.bin`, `evidence.bin`, `unitigs.bin`) is built once from the kmer set of a layer and is immutable for the lifetime of that layer. Adding a genome column does not change the kmer set — it only adds a new data column indexed by the same slot numbers. Rebuilding the MPHF would require re-running the full construction pipeline (two sequential passes over unitigs, parallel ptr_hash construction) and would invalidate any open memory maps. Column append avoids all of this: the only disk writes are one new `.pciv`/`.pbiv` file and a single `meta.json` update. Kmers absent from a given layer are represented as zero (count) or false (presence) values in the new column — no structural change to the layer is required.
+1
View File
@@ -47,6 +47,7 @@ nav:
- obilayeredmap crate: implementation/obilayeredmap.md
- PersistentCompactIntVec: implementation/persistent_compact_int_vec.md
- PersistentBitVec: implementation/persistent_bit_vec.md
- Merge command: implementation/merge.md
- Architecture:
- Sequences: architecture/sequences/invariant.md
- Kmer index: architecture/index_architecture.md
+20
View File
@@ -101,6 +101,26 @@ impl PersistentBitMatrix {
}
}
// ── Column append ─────────────────────────────────────────────────────────────
impl PersistentBitMatrix {
/// Append a new column to an existing matrix on disk.
///
/// Reads `meta.json` to obtain `n` and the current column count, writes
/// `col_{n_cols:06}.pbiv` filled by `value_of(slot)`, then increments
/// `n_cols` in `meta.json`.
pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> bool) -> io::Result<()> {
let mut meta = MatrixMeta::load(dir)?;
let mut b = PersistentBitVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?;
for slot in 0..meta.n {
b.set(slot, value_of(slot));
}
b.close()?;
meta.n_cols += 1;
meta.save(dir)
}
}
fn upper_pairs(n: usize) -> Vec<(usize, usize)> {
(0..n).flat_map(|i| (i + 1..n).map(move |j| (i, j))).collect()
}
+20
View File
@@ -203,6 +203,26 @@ where
m
}
// ── Column append ─────────────────────────────────────────────────────────────
impl PersistentCompactIntMatrix {
/// Append a new column to an existing matrix on disk.
///
/// Reads `meta.json` to obtain `n` and the current column count, writes
/// `col_{n_cols:06}.pciv` filled by `value_of(slot)`, then increments
/// `n_cols` in `meta.json`.
pub fn append_column(dir: &Path, value_of: impl Fn(usize) -> u32) -> io::Result<()> {
let mut meta = MatrixMeta::load(dir)?;
let mut b = PersistentCompactIntVecBuilder::new(meta.n, &col_path(dir, meta.n_cols))?;
for slot in 0..meta.n {
b.set(slot, value_of(slot));
}
b.close()?;
meta.n_cols += 1;
meta.save(dir)
}
}
// ── Trait impls ───────────────────────────────────────────────────────────────
use crate::traits::{ColumnWeights, CountPartials};
+13 -3
View File
@@ -8,6 +8,12 @@ pub enum OKIError {
Io(io::Error),
Json(serde_json::Error),
Partition(SKError),
/// Source index is not in `Indexed` state.
NotIndexed(std::path::PathBuf),
/// Source indexes have incompatible configurations (k, m, n_bits).
IncompatibleConfig,
/// Count mode requested but a source index lacks count data.
MismatchedMode,
}
pub type OKIResult<T> = Result<T, OKIError>;
@@ -15,9 +21,12 @@ pub type OKIResult<T> = Result<T, OKIError>;
impl fmt::Display for OKIError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
OKIError::Io(e) => write!(f, "I/O error: {e}"),
OKIError::Json(e) => write!(f, "JSON error: {e}"),
OKIError::Partition(e) => write!(f, "partition error: {e}"),
OKIError::Io(e) => write!(f, "I/O error: {e}"),
OKIError::Json(e) => write!(f, "JSON error: {e}"),
OKIError::Partition(e) => write!(f, "partition error: {e}"),
OKIError::NotIndexed(p) => write!(f, "index not fully built: {}", p.display()),
OKIError::IncompatibleConfig => write!(f, "incompatible index configurations"),
OKIError::MismatchedMode => write!(f, "count mode requires all sources to have with_counts=true"),
}
}
}
@@ -28,6 +37,7 @@ impl std::error::Error for OKIError {
OKIError::Io(e) => Some(e),
OKIError::Json(e) => Some(e),
OKIError::Partition(e) => Some(e),
_ => None,
}
}
}
+3 -3
View File
@@ -15,9 +15,9 @@ use crate::meta::{IndexConfig, IndexMeta};
use crate::state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED};
pub struct KmerIndex {
root_path: PathBuf,
meta: IndexMeta,
partition: KmerPartition,
pub(crate) root_path: PathBuf,
pub(crate) meta: IndexMeta,
pub(crate) partition: KmerPartition,
}
impl KmerIndex {
+2
View File
@@ -2,8 +2,10 @@ pub mod error;
pub mod meta;
pub mod state;
mod index;
mod merge;
pub use error::{OKIError, OKIResult};
pub use index::KmerIndex;
pub use merge::MergeMode;
pub use meta::{IndexConfig, IndexMeta, META_FILENAME};
pub use state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED};
+131
View File
@@ -0,0 +1,131 @@
use std::fs;
use std::io;
use std::path::Path;
use obisys::{Reporter, Stage};
use rayon::prelude::*;
use tracing::info;
use crate::error::{OKIError, OKIResult};
use crate::index::KmerIndex;
use crate::meta::IndexMeta;
use crate::state::IndexState;
pub use obikpartitionner::MergeMode;
impl KmerIndex {
/// Merge `sources` into a new index at `output`.
///
/// All sources must be in `Indexed` state and share the same `kmer_size`,
/// `minimizer_size`, and `n_partitions`. Count mode additionally requires
/// every source to have `with_counts = true`.
///
/// The first source is copied to `output`, then each subsequent source is
/// merged partition-by-partition in parallel.
pub fn merge<P: AsRef<Path>>(
output: P,
sources: &[&KmerIndex],
mode: MergeMode,
force: bool,
) -> OKIResult<Self> {
let output = output.as_ref();
if sources.is_empty() {
return Err(OKIError::Io(io::Error::new(
io::ErrorKind::InvalidInput,
"merge requires at least one source index",
)));
}
// ── Validate ──────────────────────────────────────────────────────────
let ref0 = sources[0];
for src in sources {
if src.state() != IndexState::Indexed {
return Err(OKIError::NotIndexed(src.root_path.clone()));
}
if src.kmer_size() != ref0.kmer_size()
|| src.minimizer_size() != ref0.minimizer_size()
|| src.n_partitions() != ref0.n_partitions()
{
return Err(OKIError::IncompatibleConfig);
}
if mode == MergeMode::Count && !src.meta.config.with_counts {
return Err(OKIError::MismatchedMode);
}
}
// ── Prepare output directory ──────────────────────────────────────────
if output.exists() {
if force {
fs::remove_dir_all(output)?;
} else {
return Err(OKIError::Io(io::Error::new(
io::ErrorKind::AlreadyExists,
format!("{}: output directory already exists", output.display()),
)));
}
}
// ── Bootstrap: copy first source to output ────────────────────────────
info!("copying {} → {}", sources[0].root_path.display(), output.display());
copy_dir_all(&sources[0].root_path, output)?;
// Rewrite index.meta with all genome labels.
let all_genomes: Vec<String> = sources
.iter()
.flat_map(|s| s.meta.genomes.iter().cloned())
.collect();
let mut meta = IndexMeta::read(output).map_err(OKIError::Io)?;
meta.genomes = all_genomes;
meta.write(output)?;
// Open the destination index.
let dst = KmerIndex::open(output)?;
let n_partitions = dst.n_partitions();
// ── Merge each subsequent source partition-by-partition ───────────────
let remaining_sources: Vec<&KmerIndex> = sources[1..].to_vec();
if !remaining_sources.is_empty() {
let mut rep = Reporter::new();
let t = Stage::start("merge_partitions");
let dst_partition = &dst.partition;
let errors: Vec<obiskio::SKError> = (0..n_partitions)
.into_par_iter()
.filter_map(|i| {
let srcs: Vec<&obikpartitionner::KmerPartition> =
remaining_sources.iter().map(|s| &s.partition).collect();
// n_dst_genomes = 1 (copied from source_0 only)
dst_partition.merge_partition(i, &srcs, mode, 1).err()
})
.collect();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::Partition(e));
}
rep.push(t.stop());
}
// Re-open to get the updated state.
KmerIndex::open(output)
}
}
// ── Directory copy ────────────────────────────────────────────────────────────
fn copy_dir_all(src: &Path, dst: &Path) -> io::Result<()> {
fs::create_dir_all(dst)?;
for entry in fs::read_dir(src)? {
let entry = entry?;
let src_path = entry.path();
let dst_path = dst.join(entry.file_name());
if src_path.is_dir() {
copy_dir_all(&src_path, &dst_path)?;
} else {
fs::copy(&src_path, &dst_path)?;
}
}
Ok(())
}
+9 -2
View File
@@ -48,20 +48,27 @@ pub fn run(args: IndexArgs) {
let mut rep = Reporter::new();
// ── Open or create the index ─────────────────────────────────────────────
let mut idx = if KmerIndex::exists(&output) {
let mut idx = if KmerIndex::exists(&output) && !args.force {
info!("resuming from existing index at {}", output.display());
KmerIndex::open(&output).unwrap_or_else(|e| {
eprintln!("error opening index: {e}");
std::process::exit(1);
})
} else {
if args.force && KmerIndex::exists(&output) {
info!("--force: removing existing index at {}", output.display());
std::fs::remove_dir_all(&output).unwrap_or_else(|e| {
eprintln!("error removing existing index: {e}");
std::process::exit(1);
});
}
let config = IndexConfig {
kmer_size: args.common.kmer_size,
minimizer_size: args.common.minimizer_size,
n_bits: args.common.partition_bits,
with_counts: args.with_counts,
};
KmerIndex::create(&output, config, args.label.clone(), args.force).unwrap_or_else(|e| {
KmerIndex::create(&output, config, args.label.clone(), false).unwrap_or_else(|e| {
eprintln!("error creating index: {e}");
std::process::exit(1);
})
+47
View File
@@ -0,0 +1,47 @@
use std::path::PathBuf;
use clap::Args;
use obikindex::{KmerIndex, MergeMode};
use obisys::Reporter;
use tracing::info;
#[derive(Args)]
pub struct MergeArgs {
/// Source index directories to merge
#[arg(required = true)]
pub sources: Vec<PathBuf>,
/// Output index directory
#[arg(short, long)]
pub output: PathBuf,
/// Overwrite output directory if it already exists
#[arg(long, default_value_t = false)]
pub force: bool,
/// Merge in count mode (requires all sources to have with_counts=true)
#[arg(long, default_value_t = false)]
pub with_counts: bool,
}
pub fn run(args: MergeArgs) {
let mode = if args.with_counts { MergeMode::Count } else { MergeMode::Presence };
let sources: Vec<KmerIndex> = args.sources.iter().map(|p| {
info!("opening source index: {}", p.display());
KmerIndex::open(p).unwrap_or_else(|e| {
eprintln!("error opening source index {}: {e}", p.display());
std::process::exit(1);
})
}).collect();
let source_refs: Vec<&KmerIndex> = sources.iter().collect();
info!("merging {} indexes → {}", sources.len(), args.output.display());
let rep = Reporter::new();
KmerIndex::merge(&args.output, &source_refs, mode, args.force).unwrap_or_else(|e| {
eprintln!("error merging: {e}");
std::process::exit(1);
});
rep.print();
}
+1
View File
@@ -1,3 +1,4 @@
pub mod index;
pub mod merge;
pub mod superkmer;
pub mod unitig;
+3
View File
@@ -18,6 +18,8 @@ enum Commands {
Superkmer(cmd::superkmer::SuperkmerArgs),
/// Build the complete genome index (scatter → dereplicate → count → layered MPHF)
Index(cmd::index::IndexArgs),
/// Merge multiple built indexes into one
Merge(cmd::merge::MergeArgs),
/// Dump unitigs from a built index to stdout (debug)
Unitig(cmd::unitig::UnitigArgs),
}
@@ -43,6 +45,7 @@ fn main() {
match cli.command {
Commands::Superkmer(args) => cmd::superkmer::run(args),
Commands::Index(args) => cmd::index::run(args),
Commands::Merge(args) => cmd::merge::run(args),
Commands::Unitig(args) => cmd::unitig::run(args),
}
+2
View File
@@ -1,5 +1,7 @@
mod index_layer;
mod kmer_sort;
mod merge_layer;
mod partition;
pub use merge_layer::MergeMode;
pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR};
+294
View File
@@ -0,0 +1,294 @@
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use obidebruinj::GraphDeBruijn;
use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder,
PersistentBitVecBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
PersistentCompactIntVecBuilder};
use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::{Layer, LayeredMap, MphfLayer, OLMError};
use obilayeredmap::meta::PartitionMeta;
use crate::partition::KmerPartition;
// ── MergeMode ─────────────────────────────────────────────────────────────────
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeMode { Presence, Count }
// ── ColBuilder — enum dispatch to avoid trait-object boxing issues ─────────────
enum ColBuilder {
Bit(PersistentBitVecBuilder),
Int(PersistentCompactIntVecBuilder),
}
impl ColBuilder {
fn set_val(&mut self, slot: usize, value: u32) {
match self {
ColBuilder::Bit(b) => b.set(slot, value > 0),
ColBuilder::Int(b) => b.set(slot, value),
}
}
fn close(self) -> SKResult<()> {
match self {
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
}
}
}
// ── helpers ───────────────────────────────────────────────────────────────────
const INDEX_SUBDIR: &str = "index";
fn olm_to_sk(e: OLMError) -> SKError {
match e {
OLMError::Io(e) => SKError::Io(e),
other => SKError::InvalidData { context: "merge", detail: other.to_string() },
}
}
fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pbiv"))
}
fn col_path_int(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pciv"))
}
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
fs::write(dir.join("meta.json"), format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"))
}
// ── KmerPartition::merge_partition ────────────────────────────────────────────
impl KmerPartition {
/// Merge `sources` into destination partition `i`.
///
/// `n_dst_genomes` is the number of genome columns already in the dst
/// matrices (1 after copying source_0, more for subsequent merges).
///
/// Two-pass algorithm:
/// 1. Classify each source kmer as dst-hit or new → build de Bruijn graph
/// of new kmers → write unitigs → build MPHF for the new layer.
/// 2. Iterate source kmers again → fill per-genome column builders
/// (memory-mapped) → close → update matrix metadata.
pub fn merge_partition(
&self,
i: usize,
sources: &[&KmerPartition],
mode: MergeMode,
n_dst_genomes: usize,
) -> SKResult<()> {
let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR);
if !dst_index_dir.exists() {
return Ok(());
}
let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?;
let n_dst_layers = dst_map.n_layers();
let n_src = sources.len();
// First merge in presence mode: init presence matrices on existing layers
// (all slots true — every kmer in a layer belongs to genome_0).
if n_dst_genomes == 1 && mode == MergeMode::Presence {
for l in 0..n_dst_layers {
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
Layer::<()>::init_presence_matrix(&layer_dir, dst_map.layer(l).n())
.map_err(olm_to_sk)?;
}
}
// ── Pass 1: classify kmers, build new-kmer de Bruijn graph ───────────
let mut g = GraphDeBruijn::new();
let mut any_new = false;
for src in sources.iter() {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
if !src_index_dir.exists() { continue; }
let src_meta = PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?;
for l in 0..src_meta.n_layers {
let unitigs_path = src_index_dir
.join(format!("layer_{l}")).join("unitigs.bin");
let reader = UnitigFileReader::open(&unitigs_path)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if dst_map.query(kmer).is_none() {
g.push(kmer);
any_new = true;
}
}
}
}
// Build new layer from de Bruijn graph if there are new kmers.
let new_layer_idx = n_dst_layers;
let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}"));
if any_new {
g.compute_degrees();
fs::create_dir_all(&new_layer_dir)?;
let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?;
for unitig in g.iter_unitig() {
uw.write(&unitig)?;
}
uw.close()?;
Layer::<()>::build(&new_layer_dir).map_err(olm_to_sk)?;
}
drop(g);
let new_mphf = if any_new {
Some(MphfLayer::open(&new_layer_dir).map_err(olm_to_sk)?)
} else {
None
};
let n_new = new_mphf.as_ref().map_or(0, |m| m.n());
// ── Prepare matrix directories for the new layer ──────────────────────
// Absent columns (dst genomes) are written now via append_column (all-zero/false).
// Source-genome columns are created as mutable builders for pass 2.
let mut new_src_builders: Vec<ColBuilder> = if any_new {
let data_dir = match mode {
MergeMode::Presence => new_layer_dir.join("presence"),
MergeMode::Count => new_layer_dir.join("counts"),
};
fs::create_dir_all(&data_dir)?;
// Bootstrap meta with 0 cols.
match mode {
MergeMode::Presence => {
PersistentBitMatrixBuilder::new(n_new, &data_dir)
.map_err(SKError::Io)?.close().map_err(SKError::Io)?;
for _ in 0..n_dst_genomes {
PersistentBitMatrix::append_column(&data_dir, |_| false)
.map_err(SKError::Io)?;
}
(0..n_src).map(|g| -> SKResult<ColBuilder> {
let b = PersistentBitVecBuilder::new(
n_new, &col_path_bit(&data_dir, n_dst_genomes + g))?;
Ok(ColBuilder::Bit(b))
}).collect::<SKResult<_>>()?
}
MergeMode::Count => {
PersistentCompactIntMatrixBuilder::new(n_new, &data_dir)
.map_err(SKError::Io)?.close().map_err(SKError::Io)?;
for _ in 0..n_dst_genomes {
PersistentCompactIntMatrix::append_column(&data_dir, |_| 0)
.map_err(SKError::Io)?;
}
(0..n_src).map(|g| -> SKResult<ColBuilder> {
let b = PersistentCompactIntVecBuilder::new(
n_new, &col_path_int(&data_dir, n_dst_genomes + g))?;
Ok(ColBuilder::Int(b))
}).collect::<SKResult<_>>()?
}
}
} else {
vec![]
};
// Builders for existing layers: one per (layer, src_genome).
// Invariant: existing layers already have exactly n_dst_genomes columns.
// New source columns go at positions n_dst_genomes .. n_dst_genomes+n_src-1.
let mut exist_builders: Vec<Vec<ColBuilder>> = (0..n_dst_layers)
.map(|l| {
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
let n = dst_map.layer(l).n();
(0..n_src).map(|src_g| -> SKResult<ColBuilder> {
match mode {
MergeMode::Presence => {
let data_dir = layer_dir.join("presence");
let b = PersistentBitVecBuilder::new(
n, &col_path_bit(&data_dir, n_dst_genomes + src_g))?;
Ok(ColBuilder::Bit(b))
}
MergeMode::Count => {
let data_dir = layer_dir.join("counts");
let b = PersistentCompactIntVecBuilder::new(
n, &col_path_int(&data_dir, n_dst_genomes + src_g))?;
Ok(ColBuilder::Int(b))
}
}
}).collect::<SKResult<_>>()
})
.collect::<SKResult<_>>()?;
// ── Pass 2: fill builders ─────────────────────────────────────────────
for (src_g, src) in sources.iter().enumerate() {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
if !src_index_dir.exists() { continue; }
let src_meta = PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?;
for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
let reader = UnitigFileReader::open(&src_layer_dir.join("unitigs.bin"))?;
// Open source MPHF + count matrix for count mode.
let src_count_data: Option<(MphfLayer, PersistentCompactIntMatrix)> =
if mode == MergeMode::Count {
let counts_dir = src_layer_dir.join("counts");
if counts_dir.exists() {
let mphf = MphfLayer::open(&src_layer_dir).map_err(olm_to_sk)?;
let mat = PersistentCompactIntMatrix::open(&counts_dir)
.map_err(SKError::Io)?;
Some((mphf, mat))
} else {
None
}
} else {
None
};
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
let value: u32 = match &src_count_data {
Some((mphf, counts)) => {
mphf.find(kmer).map(|s| counts.col(0).get(s)).unwrap_or(1)
}
None => 1,
};
if let Some((dst_layer, hit)) = dst_map.query(kmer) {
exist_builders[dst_layer][src_g].set_val(hit.slot, value);
} else if let Some(ref mphf) = new_mphf {
if let Some(slot) = mphf.find(kmer) {
new_src_builders[src_g].set_val(slot, value);
}
}
}
}
}
// ── Close builders and update metadata ────────────────────────────────
for (l, builders) in exist_builders.into_iter().enumerate() {
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
for b in builders { b.close()?; }
// Update the matrix meta to reflect the n_src new columns.
let n = dst_map.layer(l).n();
let data_dir = match mode {
MergeMode::Presence => layer_dir.join("presence"),
MergeMode::Count => layer_dir.join("counts"),
};
write_matrix_meta(&data_dir, n, n_dst_genomes + n_src).map_err(SKError::Io)?;
}
for b in new_src_builders { b.close()?; }
// new layer matrix meta was already written by append_column calls above
// with n_dst_genomes cols; update to n_dst_genomes + n_src.
if any_new {
let data_dir = match mode {
MergeMode::Presence => new_layer_dir.join("presence"),
MergeMode::Count => new_layer_dir.join("counts"),
};
write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src).map_err(SKError::Io)?;
let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?;
part_meta.n_layers = new_layer_idx + 1;
part_meta.save(&dst_index_dir).map_err(olm_to_sk)?;
}
Ok(())
}
}
+39
View File
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use obicompactvec::{
@@ -83,6 +84,22 @@ impl Layer<()> {
pub fn build(out_dir: &Path) -> OLMResult<usize> {
MphfLayer::build(out_dir, &mut |_, _| Ok(()))
}
/// Create a presence matrix for a set-membership layer (first merge).
///
/// All `n_kmers` slots are set to `true`: every kmer in this layer belongs
/// to genome_0, so genome_0 is present at every slot.
pub fn init_presence_matrix(layer_dir: &Path, n_kmers: usize) -> OLMResult<()> {
let presence_dir = layer_dir.join(PRESENCE_DIR);
fs::create_dir_all(&presence_dir).map_err(OLMError::Io)?;
let mut mb = PersistentBitMatrixBuilder::new(n_kmers, &presence_dir).map_err(OLMError::Io)?;
let mut col = mb.add_col().map_err(OLMError::Io)?;
for slot in 0..n_kmers {
col.set(slot, true);
}
col.close().map_err(OLMError::Io)?;
mb.close().map_err(OLMError::Io)
}
}
// ── Mode 2 — count matrix (1 column per layer) ────────────────────────────────
@@ -111,9 +128,31 @@ impl Layer<PersistentCompactIntMatrix> {
}
}
// ── Mode 2 — count matrix column append ──────────────────────────────────────
impl Layer<PersistentCompactIntMatrix> {
/// Append a genome column to an existing count matrix.
pub fn append_genome_column(
layer_dir: &Path,
value_of: impl Fn(usize) -> u32,
) -> OLMResult<()> {
PersistentCompactIntMatrix::append_column(&layer_dir.join(COUNTS_DIR), value_of)
.map_err(OLMError::Io)
}
}
// ── Mode 3 — presence/absence matrix (1 column per genome) ───────────────────
impl Layer<PersistentBitMatrix> {
/// Append a genome column to an existing presence matrix.
pub fn append_genome_column(
layer_dir: &Path,
value_of: impl Fn(usize) -> bool,
) -> OLMResult<()> {
PersistentBitMatrix::append_column(&layer_dir.join(PRESENCE_DIR), value_of)
.map_err(OLMError::Io)
}
pub fn build_presence(
out_dir: &Path,
n_genomes: usize,