diff --git a/src/obikindex/src/dump.rs b/src/obikindex/src/dump.rs index 31e5ed3..d1fb8db 100644 --- a/src/obikindex/src/dump.rs +++ b/src/obikindex/src/dump.rs @@ -14,13 +14,17 @@ impl KmerIndex { /// /// The caller must have set the global kmer length (`obikseq::set_k`) before /// calling this method. - pub fn dump(&self, out: &mut W, force_presence: bool) -> OKIResult<()> { + pub fn dump(&self, out: &mut W, force_presence: bool, debug: bool) -> OKIResult<()> { - let genomes = &self.meta.genomes; + let genomes = &self.meta.genomes; let use_counts = self.meta.config.with_counts && !force_presence; let n_genomes = genomes.len().max(1); + let kmer_size = self.kmer_size(); // ── Header ──────────────────────────────────────────────────────────── + if debug { + write!(out, "partition,layer,")?; + } write!(out, "kmer")?; for g in genomes { write!(out, ",{g}")?; @@ -30,18 +34,31 @@ impl KmerIndex { // ── Rows ────────────────────────────────────────────────────────────── let n = self.n_partitions(); for i in 0..n { - self.partition - .iter_partition_kmers(i, use_counts, n_genomes, |kmer, row| { - let seq = String::from_utf8(kmer.to_ascii()) - .unwrap_or_else(|_| "?".repeat(self.kmer_size())); - // write is infallible inside a closure — propagate via a flag if needed - let _ = write!(out, "{seq}"); - for &v in row.iter() { - let _ = write!(out, ",{v}"); - } - let _ = writeln!(out); - }) - .map_err(OKIError::Partition)?; + if debug { + self.partition + .iter_partition_kmers_located(i, use_counts, n_genomes, |part, layer, kmer, row| { + let seq = String::from_utf8(kmer.to_ascii()) + .unwrap_or_else(|_| "?".repeat(kmer_size)); + let _ = write!(out, "{part},{layer},{seq}"); + for &v in row.iter() { + let _ = write!(out, ",{v}"); + } + let _ = writeln!(out); + }) + .map_err(OKIError::Partition)?; + } else { + self.partition + .iter_partition_kmers(i, use_counts, n_genomes, |kmer, row| { + let seq = String::from_utf8(kmer.to_ascii()) + .unwrap_or_else(|_| "?".repeat(kmer_size)); + let _ = write!(out, "{seq}"); + for &v in row.iter() { + let _ = write!(out, ",{v}"); + } + let _ = writeln!(out); + }) + .map_err(OKIError::Partition)?; + } } out.flush()?; diff --git a/src/obikindex/src/error.rs b/src/obikindex/src/error.rs index 74082fe..66f1abc 100644 --- a/src/obikindex/src/error.rs +++ b/src/obikindex/src/error.rs @@ -14,6 +14,8 @@ pub enum OKIError { IncompatibleConfig, /// Count mode requested but a source index lacks count data. MismatchedMode, + /// Two or more sources share the same genome label. + DuplicateGenomeLabel(String), } pub type OKIResult = Result; @@ -27,6 +29,7 @@ impl fmt::Display for OKIError { OKIError::NotIndexed(p) => write!(f, "index not fully built: {}", p.display()), OKIError::IncompatibleConfig => write!(f, "incompatible index configurations"), OKIError::MismatchedMode => write!(f, "count mode requires all sources to have with_counts=true"), + OKIError::DuplicateGenomeLabel(l) => write!(f, "duplicate genome label across sources: {l}"), } } } @@ -37,7 +40,7 @@ impl std::error::Error for OKIError { OKIError::Io(e) => Some(e), OKIError::Json(e) => Some(e), OKIError::Partition(e) => Some(e), - _ => None, + _ => None, // IncompatibleConfig, MismatchedMode, DuplicateGenomeLabel } } } diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index be71a02..305d344 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::fs; use std::io; use std::path::Path; @@ -20,13 +21,16 @@ impl KmerIndex { /// `minimizer_size`, and `n_partitions`. Count mode additionally requires /// every source to have `with_counts = true`. /// - /// The first source is copied to `output`, then each subsequent source is - /// merged partition-by-partition in parallel. + /// Genome labels must be unique across all sources. If `rename_duplicates` + /// is true, repeated labels are disambiguated by appending `.1`, `.2`, … + /// to the second and subsequent occurrences. Otherwise a + /// `DuplicateGenomeLabel` error is returned on the first conflict. pub fn merge>( output: P, sources: &[&KmerIndex], mode: MergeMode, force: bool, + rename_duplicates: bool, ) -> OKIResult { let output = output.as_ref(); @@ -37,7 +41,7 @@ impl KmerIndex { ))); } - // ── Validate ────────────────────────────────────────────────────────── + // ── Validate config compatibility ───────────────────────────────────── let ref0 = sources[0]; for src in sources { if src.state() != IndexState::Indexed { @@ -54,6 +58,9 @@ impl KmerIndex { } } + // ── Compute final genome labels (rename duplicates if requested) ─────── + let (source_labels, all_genomes) = compute_labels(sources, rename_duplicates)?; + // ── Prepare output directory ────────────────────────────────────────── if output.exists() { if force { @@ -70,18 +77,32 @@ impl KmerIndex { info!("copying {} → {}", sources[0].root_path.display(), output.display()); copy_dir_all(&sources[0].root_path, output)?; - // Rewrite index.meta with all genome labels. - let all_genomes: Vec = sources - .iter() - .flat_map(|s| s.meta.genomes.iter().cloned()) - .collect(); + // Rewrite index.meta with final genome labels and the effective mode. let mut meta = IndexMeta::read(output).map_err(OKIError::Io)?; meta.genomes = all_genomes; + meta.config.with_counts = mode == MergeMode::Count; meta.write(output)?; + // In presence/absence mode, purge counts/ directories inherited from + // source_0 — they are stale data from the source's count index. + if mode == MergeMode::Presence { + remove_dirs_named(output, "counts")?; + } + + // Rebuild spectrums/ from all sources using the (possibly renamed) labels. + // Drop the spectrums/ that were copied from source_0 and rebuild from scratch. + let spectrums_dir = output.join("spectrums"); + if spectrums_dir.exists() { + fs::remove_dir_all(&spectrums_dir)?; + } + for (src, new_labels) in sources.iter().zip(&source_labels) { + copy_spectrums(&src.root_path, output, &src.meta.genomes, new_labels)?; + } + // Open the destination index. let dst = KmerIndex::open(output)?; let n_partitions = dst.n_partitions(); + let n_dst_genomes = sources[0].meta.genomes.len(); // ── Merge each subsequent source partition-by-partition ─────────────── let remaining_sources: Vec<&KmerIndex> = sources[1..].to_vec(); @@ -94,10 +115,9 @@ impl KmerIndex { let errors: Vec = (0..n_partitions) .into_par_iter() .filter_map(|i| { - let srcs: Vec<&obikpartitionner::KmerPartition> = - remaining_sources.iter().map(|s| &s.partition).collect(); - // n_dst_genomes = 1 (copied from source_0 only) - dst_partition.merge_partition(i, &srcs, mode, 1).err() + let srcs: Vec<(&obikpartitionner::KmerPartition, usize)> = + remaining_sources.iter().map(|s| (&s.partition, s.meta.genomes.len())).collect(); + dst_partition.merge_partition(i, &srcs, mode, n_dst_genomes).err() }) .collect(); @@ -113,7 +133,78 @@ impl KmerIndex { } } -// ── Directory copy ──────────────────────────────────────────────────────────── +// ── Helpers ─────────────────────────────────────────────────────────────────── + +/// Compute the final genome label lists for all sources. +/// +/// Returns `(per_source_labels, all_genomes_flat)`. +/// The first occurrence of a label keeps the original name. Subsequent +/// occurrences receive `.1`, `.2`, … suffixes when `rename_duplicates` is true, +/// or trigger a `DuplicateGenomeLabel` error otherwise. +fn compute_labels( + sources: &[&KmerIndex], + rename_duplicates: bool, +) -> OKIResult<(Vec>, Vec)> { + let mut seen: HashMap = HashMap::new(); + let mut source_labels: Vec> = Vec::with_capacity(sources.len()); + let mut all_genomes: Vec = Vec::new(); + + for src in sources { + let mut labels = Vec::with_capacity(src.meta.genomes.len()); + for label in &src.meta.genomes { + let count = seen.entry(label.clone()).or_insert(0); + let new_label = if *count == 0 { + label.clone() + } else if rename_duplicates { + format!("{label}.{count}") + } else { + return Err(OKIError::DuplicateGenomeLabel(label.clone())); + }; + *count += 1; + labels.push(new_label.clone()); + all_genomes.push(new_label); + } + source_labels.push(labels); + } + + Ok((source_labels, all_genomes)) +} + +/// Copy spectrum JSON files from `src_root/spectrums/` to `dst_root/spectrums/`, +/// mapping each `old_labels[i]` filename to `new_labels[i]`. +fn copy_spectrums( + src_root: &Path, + dst_root: &Path, + old_labels: &[String], + new_labels: &[String], +) -> io::Result<()> { + let src_dir = src_root.join("spectrums"); + let dst_dir = dst_root.join("spectrums"); + fs::create_dir_all(&dst_dir)?; + for (old, new) in old_labels.iter().zip(new_labels.iter()) { + let src_file = src_dir.join(format!("{old}.json")); + if src_file.exists() { + fs::copy(&src_file, dst_dir.join(format!("{new}.json")))?; + } + } + Ok(()) +} + +/// Recursively remove every directory named `name` under `root`. +fn remove_dirs_named(root: &Path, name: &str) -> io::Result<()> { + for entry in fs::read_dir(root)? { + let entry = entry?; + let path = entry.path(); + if path.is_dir() { + if path.file_name().and_then(|n| n.to_str()) == Some(name) { + fs::remove_dir_all(&path)?; + } else { + remove_dirs_named(&path, name)?; + } + } + } + Ok(()) +} fn copy_dir_all(src: &Path, dst: &Path) -> io::Result<()> { fs::create_dir_all(dst)?; diff --git a/src/obikmer/src/cmd/dump.rs b/src/obikmer/src/cmd/dump.rs index 8775699..f7c324c 100644 --- a/src/obikmer/src/cmd/dump.rs +++ b/src/obikmer/src/cmd/dump.rs @@ -14,6 +14,10 @@ pub struct DumpArgs { /// Output presence/absence (0/1) even if the index stores counts #[arg(long, default_value_t = false)] pub force_presence: bool, + + /// Prepend partition and layer columns to each row + #[arg(long, default_value_t = false)] + pub debug: bool, } pub fn run(args: DumpArgs) { @@ -32,7 +36,7 @@ pub fn run(args: DumpArgs) { let stdout = io::stdout(); let mut out = BufWriter::new(stdout.lock()); - idx.dump(&mut out, args.force_presence).unwrap_or_else(|e| { + idx.dump(&mut out, args.force_presence, args.debug).unwrap_or_else(|e| { eprintln!("dump error: {e}"); std::process::exit(1); }); diff --git a/src/obikmer/src/cmd/merge.rs b/src/obikmer/src/cmd/merge.rs index 0109617..0b18ac8 100644 --- a/src/obikmer/src/cmd/merge.rs +++ b/src/obikmer/src/cmd/merge.rs @@ -2,6 +2,7 @@ use std::path::PathBuf; use clap::Args; use obikindex::{KmerIndex, MergeMode}; +use obikseq::{set_k, set_m}; use obisys::Reporter; use tracing::info; @@ -19,14 +20,16 @@ pub struct MergeArgs { #[arg(long, default_value_t = false)] pub force: bool, - /// Merge in count mode (requires all sources to have with_counts=true) + /// Force presence/absence mode even if all sources have count data #[arg(long, default_value_t = false)] - pub with_counts: bool, + pub force_presence: bool, + + /// Disambiguate duplicate genome labels by appending .1, .2, … instead of erroring + #[arg(long, default_value_t = false)] + pub rename_duplicates: bool, } pub fn run(args: MergeArgs) { - let mode = if args.with_counts { MergeMode::Count } else { MergeMode::Presence }; - let sources: Vec = args.sources.iter().map(|p| { info!("opening source index: {}", p.display()); KmerIndex::open(p).unwrap_or_else(|e| { @@ -35,11 +38,26 @@ pub fn run(args: MergeArgs) { }) }).collect(); + // Auto-detect mode: count if all sources have count data, presence otherwise. + // --force-presence overrides to presence regardless. + let all_have_counts = sources.iter().all(|s| s.meta().config.with_counts); + let mode = if !args.force_presence && all_have_counts { + MergeMode::Count + } else { + MergeMode::Presence + }; + info!( + "merge mode: {}", + if mode == MergeMode::Count { "count" } else { "presence/absence" } + ); + let source_refs: Vec<&KmerIndex> = sources.iter().collect(); + set_k(sources[0].kmer_size()); + set_m(sources[0].minimizer_size()); info!("merging {} indexes → {}", sources.len(), args.output.display()); let rep = Reporter::new(); - KmerIndex::merge(&args.output, &source_refs, mode, args.force).unwrap_or_else(|e| { + KmerIndex::merge(&args.output, &source_refs, mode, args.force, args.rename_duplicates).unwrap_or_else(|e| { eprintln!("error merging: {e}"); std::process::exit(1); }); diff --git a/src/obikpartitionner/src/dump_layer.rs b/src/obikpartitionner/src/dump_layer.rs index 71bcc93..eaf8231 100644 --- a/src/obikpartitionner/src/dump_layer.rs +++ b/src/obikpartitionner/src/dump_layer.rs @@ -2,7 +2,6 @@ use obicompactvec::{PersistentBitMatrix, PersistentCompactIntMatrix}; use obikseq::CanonicalKmer; use obiskio::{SKError, SKResult, UnitigFileReader}; use obilayeredmap::OLMError; -use obilayeredmap::meta::PartitionMeta; use obilayeredmap::MphfLayer; use crate::partition::KmerPartition; @@ -36,10 +35,14 @@ impl KmerPartition { return Ok(()); } - let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?; - - for l in 0..meta.n_layers { + // Discover layers by probing layer_0, layer_1, … until one is absent. + // PartitionMeta (meta.json) is only created by the merge path, not by + // the initial single-genome build, so we cannot rely on it here. + let mut l = 0; + loop { let layer_dir = index_dir.join(format!("layer_{l}")); + if !layer_dir.exists() { break; } + l += 1; let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?; let reader = UnitigFileReader::open(&layer_dir.join("unitigs.bin"))?; @@ -74,4 +77,57 @@ impl KmerPartition { Ok(()) } + + /// Like [`iter_partition_kmers`] but the callback also receives the layer index, + /// enabling debug output that identifies where each kmer was stored. + pub fn iter_partition_kmers_located( + &self, + part: usize, + use_counts: bool, + n_genomes: usize, + mut cb: impl FnMut(usize, usize, CanonicalKmer, Box<[u32]>), + ) -> SKResult<()> { + let index_dir = self.part_dir(part).join(INDEX_SUBDIR); + if !index_dir.exists() { + return Ok(()); + } + + let mut layer = 0; + loop { + let layer_dir = index_dir.join(format!("layer_{layer}")); + if !layer_dir.exists() { break; } + let mphf = MphfLayer::open(&layer_dir).map_err(olm_to_sk)?; + let reader = UnitigFileReader::open(&layer_dir.join("unitigs.bin"))?; + + let counts_dir = layer_dir.join("counts"); + let presence_dir = layer_dir.join("presence"); + + if use_counts && counts_dir.exists() { + let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + if let Some(slot) = mphf.find(kmer) { + cb(part, layer, kmer, mat.row(slot)); + } + } + } else if !use_counts && presence_dir.exists() { + let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + if let Some(slot) = mphf.find(kmer) { + let row: Box<[u32]> = mat.row(slot).iter().map(|&b| b as u32).collect(); + cb(part, layer, kmer, row); + } + } + } else { + let all_present: Box<[u32]> = vec![1u32; n_genomes].into(); + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + if mphf.find(kmer).is_some() { + cb(part, layer, kmer, all_present.clone()); + } + } + } + layer += 1; + } + + Ok(()) + } } diff --git a/src/obikpartitionner/src/index_layer.rs b/src/obikpartitionner/src/index_layer.rs index a6d873a..a6401b5 100644 --- a/src/obikpartitionner/src/index_layer.rs +++ b/src/obikpartitionner/src/index_layer.rs @@ -6,6 +6,7 @@ use epserde::prelude::*; use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; use obidebruinj::GraphDeBruijn; use obilayeredmap::{OLMError, layer::Layer}; +use obilayeredmap::meta::PartitionMeta; use obiskio::{SKError, SKFileMeta, SKFileReader}; use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; @@ -118,6 +119,11 @@ impl KmerPartition { Layer::<()>::build(&layer_dir).map_err(olm_to_sk)?; } + // Write meta.json in the index/ directory so LayeredMap::open works + // (e.g. for subsequent merge operations). + let index_dir = layer_dir.parent().expect("layer_dir has a parent"); + PartitionMeta { n_layers: 1 }.save(index_dir).map_err(olm_to_sk)?; + Ok(n_kmers) } diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 7d77c45..c933966 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -7,6 +7,7 @@ use obicompactvec::{PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder}; +use obikseq::CanonicalKmer; use obiskio::{SKError, SKResult, UnitigFileReader}; use obilayeredmap::{Layer, LayeredMap, MphfLayer, OLMError}; use obilayeredmap::meta::PartitionMeta; @@ -41,10 +42,88 @@ impl ColBuilder { } } +// ── SrcLayerData — opened source matrix for pass-2 lookup ───────────────────── + +enum SrcLayerData { + /// Pure set-membership layer (no data matrix): every kmer is present in all genomes. + SetMembership, + Presence(MphfLayer, PersistentBitMatrix), + Count(MphfLayer, PersistentCompactIntMatrix), +} + +impl SrcLayerData { + fn open(layer_dir: &Path, mode: MergeMode) -> SKResult { + let presence_dir = layer_dir.join("presence"); + let counts_dir = layer_dir.join("counts"); + match mode { + MergeMode::Presence => { + if presence_dir.exists() { + let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?; + let mat = PersistentBitMatrix::open(&presence_dir).map_err(SKError::Io)?; + Ok(SrcLayerData::Presence(mphf, mat)) + } else if counts_dir.exists() { + // Source is a count index; treat count > 0 as present via ColBuilder::Bit. + let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?; + let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; + Ok(SrcLayerData::Count(mphf, mat)) + } else { + Ok(SrcLayerData::SetMembership) + } + } + MergeMode::Count => { + if counts_dir.exists() { + let mphf = MphfLayer::open(layer_dir).map_err(olm_to_sk)?; + let mat = PersistentCompactIntMatrix::open(&counts_dir).map_err(SKError::Io)?; + Ok(SrcLayerData::Count(mphf, mat)) + } else { + Ok(SrcLayerData::SetMembership) + } + } + } + } + + /// Return one value per source genome for `kmer`. + fn lookup(&self, kmer: CanonicalKmer, n_genomes: usize) -> Vec { + match self { + SrcLayerData::SetMembership => vec![1u32; n_genomes], + SrcLayerData::Presence(mphf, mat) => { + if let Some(slot) = mphf.find(kmer) { + mat.row(slot).iter().map(|&b| b as u32).collect() + } else { + vec![0u32; n_genomes] + } + } + SrcLayerData::Count(mphf, mat) => { + if let Some(slot) = mphf.find(kmer) { + mat.row(slot).iter().copied().collect() + } else { + vec![0u32; n_genomes] + } + } + } + } +} + // ── helpers ─────────────────────────────────────────────────────────────────── const INDEX_SUBDIR: &str = "index"; +/// Load PartitionMeta, or recover it by probing layer directories. +/// Indexes built before meta.json was introduced lack the file. +fn load_meta(dir: &Path) -> SKResult { + match PartitionMeta::load(dir) { + Ok(m) => Ok(m), + Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => { + let mut n = 0usize; + while dir.join(format!("layer_{n}")).exists() { n += 1; } + let m = PartitionMeta { n_layers: n }; + m.save(dir).map_err(olm_to_sk)?; + Ok(m) + } + Err(e) => Err(olm_to_sk(e)), + } +} + fn olm_to_sk(e: OLMError) -> SKError { match e { OLMError::Io(e) => SKError::Io(e), @@ -69,18 +148,17 @@ fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { impl KmerPartition { /// Merge `sources` into destination partition `i`. /// - /// `n_dst_genomes` is the number of genome columns already in the dst - /// matrices (1 after copying source_0, more for subsequent merges). + /// Each entry in `sources` is `(partition, n_genomes)` where `n_genomes` is + /// the number of genome columns that source contributes. A merged index + /// contributes more than one. The total new columns added to the destination + /// is `sum(n_genomes)`. /// - /// Two-pass algorithm: - /// 1. Classify each source kmer as dst-hit or new → build de Bruijn graph - /// of new kmers → write unitigs → build MPHF for the new layer. - /// 2. Iterate source kmers again → fill per-genome column builders - /// (memory-mapped) → close → update matrix metadata. + /// `n_dst_genomes` is the number of genome columns already in the destination + /// matrices (copied from source_0 before this call). pub fn merge_partition( &self, i: usize, - sources: &[&KmerPartition], + sources: &[(&KmerPartition, usize)], mode: MergeMode, n_dst_genomes: usize, ) -> SKResult<()> { @@ -89,12 +167,13 @@ impl KmerPartition { return Ok(()); } + load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?; let n_dst_layers = dst_map.n_layers(); - let n_src = sources.len(); + let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum(); // First merge in presence mode: init presence matrices on existing layers - // (all slots true — every kmer in a layer belongs to genome_0). + // (all slots true — every kmer in those layers belongs to genome_0). if n_dst_genomes == 1 && mode == MergeMode::Presence { for l in 0..n_dst_layers { let layer_dir = dst_index_dir.join(format!("layer_{l}")); @@ -107,10 +186,10 @@ impl KmerPartition { let mut g = GraphDeBruijn::new(); let mut any_new = false; - for src in sources.iter() { + for (src, _) in sources.iter() { let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); if !src_index_dir.exists() { continue; } - let src_meta = PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?; + let src_meta = load_meta(&src_index_dir)?; for l in 0..src_meta.n_layers { let unitigs_path = src_index_dir @@ -149,7 +228,7 @@ impl KmerPartition { let n_new = new_mphf.as_ref().map_or(0, |m| m.n()); // ── Prepare matrix directories for the new layer ────────────────────── - // Absent columns (dst genomes) are written now via append_column (all-zero/false). + // Absent columns (dst genomes) are written via append_column (all-zero/false). // Source-genome columns are created as mutable builders for pass 2. let mut new_src_builders: Vec = if any_new { let data_dir = match mode { @@ -157,7 +236,6 @@ impl KmerPartition { MergeMode::Count => new_layer_dir.join("counts"), }; fs::create_dir_all(&data_dir)?; - // Bootstrap meta with 0 cols. match mode { MergeMode::Presence => { PersistentBitMatrixBuilder::new(n_new, &data_dir) @@ -166,7 +244,7 @@ impl KmerPartition { PersistentBitMatrix::append_column(&data_dir, |_| false) .map_err(SKError::Io)?; } - (0..n_src).map(|g| -> SKResult { + (0..n_src_total).map(|g| -> SKResult { let b = PersistentBitVecBuilder::new( n_new, &col_path_bit(&data_dir, n_dst_genomes + g))?; Ok(ColBuilder::Bit(b)) @@ -179,7 +257,7 @@ impl KmerPartition { PersistentCompactIntMatrix::append_column(&data_dir, |_| 0) .map_err(SKError::Io)?; } - (0..n_src).map(|g| -> SKResult { + (0..n_src_total).map(|g| -> SKResult { let b = PersistentCompactIntVecBuilder::new( n_new, &col_path_int(&data_dir, n_dst_genomes + g))?; Ok(ColBuilder::Int(b)) @@ -190,14 +268,13 @@ impl KmerPartition { vec![] }; - // Builders for existing layers: one per (layer, src_genome). - // Invariant: existing layers already have exactly n_dst_genomes columns. - // New source columns go at positions n_dst_genomes .. n_dst_genomes+n_src-1. + // Builders for existing layers: n_src_total per layer. + // Columns n_dst_genomes .. n_dst_genomes + n_src_total - 1. let mut exist_builders: Vec> = (0..n_dst_layers) .map(|l| { let layer_dir = dst_index_dir.join(format!("layer_{l}")); let n = dst_map.layer(l).n(); - (0..n_src).map(|src_g| -> SKResult { + (0..n_src_total).map(|src_g| -> SKResult { match mode { MergeMode::Presence => { let data_dir = layer_dir.join("presence"); @@ -217,72 +294,53 @@ impl KmerPartition { .collect::>()?; // ── Pass 2: fill builders ───────────────────────────────────────────── - for (src_g, src) in sources.iter().enumerate() { + let mut col_offset = 0usize; + for (src, src_n) in sources.iter() { let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); - if !src_index_dir.exists() { continue; } - let src_meta = PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?; + if !src_index_dir.exists() { col_offset += src_n; continue; } + let src_meta = load_meta(&src_index_dir)?; for l in 0..src_meta.n_layers { let src_layer_dir = src_index_dir.join(format!("layer_{l}")); let reader = UnitigFileReader::open(&src_layer_dir.join("unitigs.bin"))?; - - // Open source MPHF + count matrix for count mode. - let src_count_data: Option<(MphfLayer, PersistentCompactIntMatrix)> = - if mode == MergeMode::Count { - let counts_dir = src_layer_dir.join("counts"); - if counts_dir.exists() { - let mphf = MphfLayer::open(&src_layer_dir).map_err(olm_to_sk)?; - let mat = PersistentCompactIntMatrix::open(&counts_dir) - .map_err(SKError::Io)?; - Some((mphf, mat)) - } else { - None - } - } else { - None - }; + let src_data = SrcLayerData::open(&src_layer_dir, mode)?; for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { - let value: u32 = match &src_count_data { - Some((mphf, counts)) => { - mphf.find(kmer).map(|s| counts.col(0).get(s)).unwrap_or(1) - } - None => 1, - }; - - if let Some((dst_layer, hit)) = dst_map.query(kmer) { - exist_builders[dst_layer][src_g].set_val(hit.slot, value); - } else if let Some(ref mphf) = new_mphf { - if let Some(slot) = mphf.find(kmer) { - new_src_builders[src_g].set_val(slot, value); + let values = src_data.lookup(kmer, *src_n); + for (g, &value) in values.iter().enumerate() { + let builder_idx = col_offset + g; + if let Some((dst_layer, hit)) = dst_map.query(kmer) { + exist_builders[dst_layer][builder_idx].set_val(hit.slot, value); + } else if let Some(ref mphf) = new_mphf { + if let Some(slot) = mphf.find(kmer) { + new_src_builders[builder_idx].set_val(slot, value); + } } } } } + col_offset += src_n; } // ── Close builders and update metadata ──────────────────────────────── for (l, builders) in exist_builders.into_iter().enumerate() { let layer_dir = dst_index_dir.join(format!("layer_{l}")); for b in builders { b.close()?; } - // Update the matrix meta to reflect the n_src new columns. let n = dst_map.layer(l).n(); let data_dir = match mode { MergeMode::Presence => layer_dir.join("presence"), MergeMode::Count => layer_dir.join("counts"), }; - write_matrix_meta(&data_dir, n, n_dst_genomes + n_src).map_err(SKError::Io)?; + write_matrix_meta(&data_dir, n, n_dst_genomes + n_src_total).map_err(SKError::Io)?; } for b in new_src_builders { b.close()?; } - // new layer matrix meta was already written by append_column calls above - // with n_dst_genomes cols; update to n_dst_genomes + n_src. if any_new { let data_dir = match mode { MergeMode::Presence => new_layer_dir.join("presence"), MergeMode::Count => new_layer_dir.join("counts"), }; - write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src).map_err(SKError::Io)?; + write_matrix_meta(&data_dir, n_new, n_dst_genomes + n_src_total).map_err(SKError::Io)?; let mut part_meta = PartitionMeta::load(&dst_index_dir).map_err(olm_to_sk)?; part_meta.n_layers = new_layer_idx + 1;