feat: enhance merge label resolution, debug dump, and layer metadata

This commit enhances the CLI and index pipelines by introducing `--force-presence` to normalize output to binary values, `--debug` to expose partition and layer metadata, and `--rename-duplicates` to automatically disambiguate overlapping genome labels. It updates the partitioner and index layers to auto-discover layers, persist `meta.json` for single-genome builds, and fix per-source column offsets during merging. A `DuplicateGenomeLabel` error variant is also added, and stale directories are properly managed in presence/absence mode.
This commit is contained in:
Eric Coissac
2026-05-21 08:12:02 +02:00
parent 1a1f95e59d
commit 11182005a2
8 changed files with 347 additions and 94 deletions
+60 -4
View File
@@ -2,7 +2,6 @@ use obicompactvec::{PersistentBitMatrix, PersistentCompactIntMatrix};
use obikseq::CanonicalKmer;
use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::OLMError;
use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::MphfLayer;
use crate::partition::KmerPartition;
@@ -36,10 +35,14 @@ impl KmerPartition {
return Ok(());
}
let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?;
for l in 0..meta.n_layers {
// Discover layers by probing layer_0, layer_1, … until one is absent.
// PartitionMeta (meta.json) is only created by the merge path, not by
// the initial single-genome build, so we cannot rely on it here.
let mut l = 0;
loop {
let layer_dir = index_dir.join(format!("layer_{l}"));
if !layer_dir.exists() { break; }
l += 1;
let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?;
let reader = UnitigFileReader::open(&layer_dir.join("unitigs.bin"))?;
@@ -74,4 +77,57 @@ impl KmerPartition {
Ok(())
}
/// Like [`iter_partition_kmers`] but the callback also receives the layer index,
/// enabling debug output that identifies where each kmer was stored.
pub fn iter_partition_kmers_located(
&self,
part: usize,
use_counts: bool,
n_genomes: usize,
mut cb: impl FnMut(usize, usize, CanonicalKmer, Box<[u32]>),
) -> SKResult<()> {
let index_dir = self.part_dir(part).join(INDEX_SUBDIR);
if !index_dir.exists() {
return Ok(());
}
let mut layer = 0;
loop {
let layer_dir = index_dir.join(format!("layer_{layer}"));
if !layer_dir.exists() { break; }
let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?;
let reader = UnitigFileReader::open(&layer_dir.join("unitigs.bin"))?;
let counts_dir = layer_dir.join("counts");
let presence_dir = layer_dir.join("presence");
if use_counts && counts_dir.exists() {
let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) {
cb(part, layer, kmer, mat.row(slot));
}
}
} else if !use_counts && presence_dir.exists() {
let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if let Some(slot) = mphf.find(kmer) {
let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect();
cb(part, layer, kmer, row);
}
}
} else {
let all_present: Box<[u32]> = vec![1u32; n_genomes].into();
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
if mphf.find(kmer).is_some() {
cb(part, layer, kmer, all_present.clone());
}
}
}
layer += 1;
}
Ok(())
}
}
+6
View File
@@ -6,6 +6,7 @@ use epserde::prelude::*;
use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec};
use obidebruinj::GraphDeBruijn;
use obilayeredmap::{OLMError, layer::Layer};
use obilayeredmap::meta::PartitionMeta;
use obiskio::{SKError, SKFileMeta, SKFileReader};
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
@@ -118,6 +119,11 @@ impl KmerPartition {
Layer::<()>::build(&layer_dir).map_err(olm_to_sk)?;
}
// Write meta.json in the index/ directory so LayeredMap::open works
// (e.g. for subsequent merge operations).
let index_dir = layer_dir.parent().expect("layer_dir has a parent");
PartitionMeta { n_layers: 1 }.save(index_dir).map_err(olm_to_sk)?;
Ok(n_kmers)
}
+114 -56
View File
@@ -7,6 +7,7 @@ use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder,
PersistentBitVecBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder,
PersistentCompactIntVecBuilder};
use obikseq::CanonicalKmer;
use obiskio::{SKError, SKResult, UnitigFileReader};
use obilayeredmap::{Layer, LayeredMap, MphfLayer, OLMError};
use obilayeredmap::meta::PartitionMeta;
@@ -41,10 +42,88 @@ impl ColBuilder {
}
}
// ── SrcLayerData — opened source matrix for pass-2 lookup ─────────────────────
/// Data attached to one source layer, opened once and then queried per k-mer.
enum SrcLayerData {
    /// Pure set-membership layer (no data matrix): every kmer is present in all genomes.
    SetMembership,
    /// Layer MPHF together with the bit matrix opened from its `presence` directory.
    Presence(MphfLayer, PersistentBitMatrix),
    /// Layer MPHF together with the integer matrix opened from its `counts` directory.
    Count(MphfLayer, PersistentCompactIntMatrix),
}

impl SrcLayerData {
    /// Open whatever matrix `layer_dir` offers for the requested merge `mode`.
    ///
    /// * Presence mode prefers the `presence` directory, then falls back to
    ///   `counts` (the raw counts are returned by [`Self::lookup`]; turning
    ///   them into bits is left to the consumer).
    /// * Count mode only considers the `counts` directory.
    /// * With no usable directory the layer is treated as pure set membership.
    ///
    /// # Errors
    ///
    /// Propagates failures to open the layer MPHF or the selected matrix.
    fn open(layer_dir: &Path, mode: MergeMode) -> SKResult<Self> {
        let presence_dir = layer_dir.join("presence");
        let counts_dir = layer_dir.join("counts");

        // Only presence-mode merges ever look at the presence matrix.
        if matches!(mode, MergeMode::Presence) && presence_dir.exists() {
            let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?;
            let bits = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?;
            return Ok(Self::Presence(mphf, bits));
        }

        // From here on both modes behave the same: counts if available,
        // otherwise plain set membership.
        if !counts_dir.exists() {
            return Ok(Self::SetMembership);
        }
        let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?;
        let counts = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?;
        Ok(Self::Count(mphf, counts))
    }

    /// Return one value per source genome for `kmer`.
    ///
    /// Set-membership layers yield `n_genomes` ones. Matrix-backed layers
    /// yield the matrix row at the slot found by the MPHF (presence bits are
    /// widened to `u32`), or `n_genomes` zeros when the MPHF does not resolve
    /// the k-mer. The row length comes from the matrix as-is — presumably it
    /// equals `n_genomes`; confirm against the caller.
    fn lookup(&self, kmer: CanonicalKmer, n_genomes: usize) -> Vec<u32> {
        let absent = || vec![0u32; n_genomes];
        match self {
            Self::SetMembership => vec![1u32; n_genomes],
            Self::Presence(mphf, bits) => mphf
                .find(kmer)
                .map(|slot| {
                    bits.row(slot)
                        .iter()
                        .map(|&b| b as u32)
                        .collect::<Vec<u32>>()
                })
                .unwrap_or_else(absent),
            Self::Count(mphf, counts) => mphf
                .find(kmer)
                .map(|slot| counts.row(slot).iter().copied().collect::<Vec<u32>>())
                .unwrap_or_else(absent),
        }
    }
}
// ── helpers ───────────────────────────────────────────────────────────────────
const INDEX_SUBDIR: &str = "index";
/// Load the `PartitionMeta` stored in `dir`, rebuilding it when the file is missing.
///
/// Indexes built before meta.json was introduced lack the file. In that case
/// (an I/O "not found" error from `PartitionMeta::load`) the layer count is
/// recovered by probing `layer_0`, `layer_1`, … until a directory is absent,
/// and the recovered metadata is saved back into `dir` before being returned.
///
/// # Errors
///
/// Any other load error, or a failure to save the recovered metadata, is
/// converted with `olm_to_sk` and returned.
fn load_meta(dir: &Path) -> SKResult<PartitionMeta> {
    let load_err = match PartitionMeta::load(dir) {
        Ok(meta) => return Ok(meta),
        Err(e) => e,
    };

    let file_missing = matches!(
        &load_err,
        OLMError::Io(io_e) if io_e.kind() == std::io::ErrorKind::NotFound
    );
    if !file_missing {
        return Err(olm_to_sk(load_err));
    }

    // Count consecutive layer directories starting from layer_0.
    let n_layers = (0usize..)
        .take_while(|idx| dir.join(format!("layer_{idx}")).exists())
        .count();

    let recovered = PartitionMeta { n_layers };
    recovered.save(dir).map_err(olm_to_sk)?;
    Ok(recovered)
}
fn olm_to_sk(e: OLMError) -> SKError {
match e {
OLMError::Io(e) => SKError::Io(e),
@@ -69,18 +148,17 @@ fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
impl KmerPartition {
/// Merge `sources` into destination partition `i`.
///
/// `n_dst_genomes` is the number of genome columns already in the dst
/// matrices (1 after copying source_0, more for subsequent merges).
/// Each entry in `sources` is `(partition, n_genomes)` where `n_genomes` is
/// the number of genome columns that source contributes. A merged index
/// contributes more than one. The total new columns added to the destination
/// is `sum(n_genomes)`.
///
/// Two-pass algorithm:
/// 1. Classify each source kmer as dst-hit or new → build de Bruijn graph
/// of new kmers → write unitigs → build MPHF for the new layer.
/// 2. Iterate source kmers again → fill per-genome column builders
/// (memory-mapped) → close → update matrix metadata.
/// `n_dst_genomes` is the number of genome columns already in the destination
/// matrices (copied from source_0 before this call).
pub fn merge_partition(
&self,
i: usize,
sources: &[&KmerPartition],
sources: &[(&KmerPartition, usize)],
mode: MergeMode,
n_dst_genomes: usize,
) -> SKResult<()> {
@@ -89,12 +167,13 @@ impl KmerPartition {
return Ok(());
}
load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open
let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?;
let n_dst_layers = dst_map.n_layers();
let n_src = sources.len();
let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum();
// First merge in presence mode: init presence matrices on existing layers
// (all slots true — every kmer in a layer belongs to genome_0).
// (all slots true — every kmer in those layers belongs to genome_0).
if n_dst_genomes == 1 && mode == MergeMode::Presence {
for l in 0..n_dst_layers {
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
@@ -107,10 +186,10 @@ impl KmerPartition {
let mut g = GraphDeBruijn::new();
let mut any_new = false;
for src in sources.iter() {
for (src, _) in sources.iter() {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
if !src_index_dir.exists() { continue; }
let src_meta = PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?;
let src_meta = load_meta(&src_index_dir)?;
for l in 0..src_meta.n_layers {
let unitigs_path = src_index_dir
@@ -149,7 +228,7 @@ impl KmerPartition {
let n_new = new_mphf.as_ref().map_or(0, |m| m.n());
// ── Prepare matrix directories for the new layer ──────────────────────
// Absent columns (dst genomes) are written now via append_column (all-zero/false).
// Absent columns (dst genomes) are written via append_column (all-zero/false).
// Source-genome columns are created as mutable builders for pass 2.
let mut new_src_builders: Vec<ColBuilder> = if any_new {
let data_dir = match mode {
@@ -157,7 +236,6 @@ impl KmerPartition {
MergeMode::Count => new_layer_dir.join("counts"),
};
fs::create_dir_all(&data_dir)?;
// Bootstrap meta with 0 cols.
match mode {
MergeMode::Presence => {
PersistentBitMatrixBuilder::new(n_new, &data_dir)
@@ -166,7 +244,7 @@ impl KmerPartition {
PersistentBitMatrix::append_column(&data_dir, |_| false)
.map_err(SKError::Io)?;
}
(0..n_src).map(|g| -> SKResult<ColBuilder> {
(0..n_src_total).map(|g| -> SKResult<ColBuilder> {
let b = PersistentBitVecBuilder::new(
n_new, &col_path_bit(&data_dir, n_dst_genomes + g))?;
Ok(ColBuilder::Bit(b))
@@ -179,7 +257,7 @@ impl KmerPartition {
PersistentCompactIntMatrix::append_column(&data_dir, |_| 0)
.map_err(SKError::Io)?;
}
(0..n_src).map(|g| -> SKResult<ColBuilder> {
(0..n_src_total).map(|g| -> SKResult<ColBuilder> {
let b = PersistentCompactIntVecBuilder::new(
n_new, &col_path_int(&data_dir, n_dst_genomes + g))?;
Ok(ColBuilder::Int(b))
@@ -190,14 +268,13 @@ impl KmerPartition {
vec![]
};
// Builders for existing layers: one per (layer, src_genome).
// Invariant: existing layers already have exactly n_dst_genomes columns.
// New source columns go at positions n_dst_genomes .. n_dst_genomes+n_src-1.
// Builders for existing layers: n_src_total per layer.
// Columns n_dst_genomes .. n_dst_genomes + n_src_total - 1.
let mut exist_builders: Vec<Vec<ColBuilder>> = (0..n_dst_layers)
.map(|l| {
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
let n = dst_map.layer(l).n();
(0..n_src).map(|src_g| -> SKResult<ColBuilder> {
(0..n_src_total).map(|src_g| -> SKResult<ColBuilder> {
match mode {
MergeMode::Presence => {
let data_dir = layer_dir.join("presence");
@@ -217,72 +294,53 @@ impl KmerPartition {
.collect::<SKResult<_>>()?;
// ── Pass 2: fill builders ─────────────────────────────────────────────
for (src_g, src) in sources.iter().enumerate() {
let mut col_offset = 0usize;
for (src, src_n) in sources.iter() {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
if !src_index_dir.exists() { continue; }
let src_meta = PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?;
if !src_index_dir.exists() { col_offset += src_n; continue; }
let src_meta = load_meta(&src_index_dir)?;
for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
let reader = UnitigFileReader::open(&src_layer_dir.join("unitigs.bin"))?;
// Open source MPHF + count matrix for count mode.
let src_count_data: Option<(MphfLayer, PersistentCompactIntMatrix)> =
if mode == MergeMode::Count {
let counts_dir = src_layer_dir.join("counts");
if counts_dir.exists() {
let mphf = MphfLayer::open(&src_layer_dir).map_err(olm_to_sk)?;
let mat = PersistentCompactIntMatrix::open(&counts_dir)
.map_err(SKError::Io)?;
Some((mphf, mat))
} else {
None
}
} else {
None
};
let src_data = SrcLayerData::open(&src_layer_dir, mode)?;
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() {
let value: u32 = match &src_count_data {
Some((mphf, counts)) => {
mphf.find(kmer).map(|s| counts.col(0).get(s)).unwrap_or(1)
}
None => 1,
};
if let Some((dst_layer, hit)) = dst_map.query(kmer) {
exist_builders[dst_layer][src_g].set_val(hit.slot, value);
} else if let Some(ref mphf) = new_mphf {
if let Some(slot) = mphf.find(kmer) {
new_src_builders[src_g].set_val(slot, value);
let values = src_data.lookup(kmer, *src_n);
for (g, &value) in values.iter().enumerate() {
let builder_idx = col_offset + g;
if let Some((dst_layer, hit)) = dst_map.query(kmer) {
exist_builders[dst_layer][builder_idx].set_val(hit.slot, value);
} else if let Some(ref mphf) = new_mphf {
if let Some(slot) = mphf.find(kmer) {
new_src_builders[builder_idx].set_val(slot, value);
}
}
}
}
}
col_offset += src_n;
}
// ── Close builders and update metadata ────────────────────────────────
for (l, builders) in exist_builders.into_iter().enumerate() {
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
for b in builders { b.close()?; }
// Update the matrix meta to reflect the n_src new columns.
let n = dst_map.layer(l).n();
let data_dir = match mode {
MergeMode::Presence => layer_dir.join("presence"),
MergeMode::Count => layer_dir.join("counts"),
};
write_matrix_meta(&data_dir, n, n_dst_genomes + n_src).map_err(SKError::Io)?;
write_matrix_meta(&data_dir, n, n_dst_genomes + n_src_total).map_err(SKError::Io)?;
}
for b in new_src_builders { b.close()?; }
// new layer matrix meta was already written by append_column calls above
// with n_dst_genomes cols; update to n_dst_genomes + n_src.
if any_new {
let data_dir = match mode {
MergeMode::Presence => new_layer_dir.join("presence"),
MergeMode::Count => new_layer_dir.join("counts"),
};
write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src).map_err(SKError::Io)?;
write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total).map_err(SKError::Io)?;
let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?;
part_meta.n_layers = new_layer_idx + 1;