diff --git a/src/obikindex/src/lib.rs b/src/obikindex/src/lib.rs index d6a0908..65df6c0 100644 --- a/src/obikindex/src/lib.rs +++ b/src/obikindex/src/lib.rs @@ -5,6 +5,7 @@ mod distance; mod dump; mod index; mod merge; +mod rebuild; pub use error::{OKIError, OKIResult}; pub use distance::{DistanceMetric, DistanceOutput}; diff --git a/src/obikindex/src/rebuild.rs b/src/obikindex/src/rebuild.rs new file mode 100644 index 0000000..b360030 --- /dev/null +++ b/src/obikindex/src/rebuild.rs @@ -0,0 +1,116 @@ +use std::fs; +use std::io; +use std::path::Path; +use std::time::Duration; + +use indicatif::{ProgressBar, ProgressStyle}; +use obikpartitionner::{KmerFilter, KmerPartition, MergeMode}; +use obisys::{Reporter, Stage}; +use rayon::prelude::*; +use tracing::info; + +use crate::error::{OKIError, OKIResult}; +use crate::index::KmerIndex; +use crate::meta::IndexMeta; +use crate::state::{IndexState, SENTINEL_INDEXED}; + +impl KmerIndex { + /// Rebuild `src` into a new compact single-layer index at `output`. + /// + /// Only k-mers whose per-genome row passes every filter in `filters` are + /// written. If `filters` is empty every k-mer is kept (pure compaction). + /// + /// `mode` controls whether the output stores counts or presence/absence. + /// A count source may be rebuilt in presence mode; a presence source + /// cannot be rebuilt in count mode. + pub fn rebuild>( + output: P, + src: &KmerIndex, + filters: &[Box], + mode: MergeMode, + force: bool, + rep: &mut Reporter, + ) -> OKIResult { + let output = output.as_ref(); + + if src.state() != IndexState::Indexed { + return Err(OKIError::NotIndexed(src.root_path.clone())); + } + + if mode == MergeMode::Count && !src.meta.config.with_counts { + return Err(OKIError::InvalidInput( + "cannot rebuild in count mode from a presence-only source index".into(), + )); + } + + if output.exists() { + if force { + fs::remove_dir_all(output)?; + } else { + return Err(OKIError::Io(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("{}: output directory already exists", output.display()), + ))); + } + } + + // ── Create output directory + metadata ──────────────────────────────── + fs::create_dir_all(output)?; + let mut meta = IndexMeta::new(src.meta.config.clone()); + meta.config.with_counts = mode == MergeMode::Count; + meta.genomes = src.meta.genomes.clone(); + meta.write(output)?; + + let n_genomes = src.meta.genomes.len(); + let n_partitions = src.partition.n_partitions(); + + // ── Create an empty destination KmerPartition ───────────────────────── + // Create the partitions/ subdirectory so KmerPartition::open_with_config works. + fs::create_dir_all(output.join(obikpartitionner::PARTITIONS_SUBDIR))?; + let dst_partition = KmerPartition::open_with_config( + output, + meta.config.kmer_size, + meta.config.minimizer_size, + meta.config.n_bits, + )?; + + info!( + "rebuild: {} partition(s), {} genome(s), mode={:?}", + n_partitions, n_genomes, mode, + ); + + let t = Stage::start("rebuild"); + let pb = ProgressBar::new(n_partitions as u64).with_style( + ProgressStyle::with_template("rebuild — [{bar:20}] {pos}/{len} | {msg}") + .unwrap() + .progress_chars("=> "), + ); + pb.enable_steady_tick(Duration::from_millis(100)); + + let src_partition = &src.partition; + + let errors: Vec = (0..n_partitions) + .into_par_iter() + .filter_map(|i| { + let result = dst_partition + .rebuild_partition(src_partition, i, filters, mode, n_genomes) + .err(); + pb.inc(1); + result + }) + .collect(); + + pb.finish_and_clear(); + + if let Some(e) = errors.into_iter().next() { + return Err(OKIError::Partition(e)); + } + + rep.push(t.stop()); + + // Write SENTINEL_INDEXED — output is ready to use. + fs::File::create(output.join(SENTINEL_INDEXED))?; + + KmerIndex::open(output) + } +} diff --git a/src/obikmer/src/cmd/mod.rs b/src/obikmer/src/cmd/mod.rs index ed66c65..ac4057e 100644 --- a/src/obikmer/src/cmd/mod.rs +++ b/src/obikmer/src/cmd/mod.rs @@ -2,5 +2,6 @@ pub mod distance; pub mod dump; pub mod index; pub mod merge; +pub mod rebuild; pub mod superkmer; pub mod unitig; diff --git a/src/obikmer/src/cmd/rebuild.rs b/src/obikmer/src/cmd/rebuild.rs new file mode 100644 index 0000000..f172811 --- /dev/null +++ b/src/obikmer/src/cmd/rebuild.rs @@ -0,0 +1,111 @@ +use std::path::PathBuf; + +use clap::Args; +use obikindex::{KmerIndex, MergeMode}; +use obikpartitionner::filter::{ + KmerFilter, MaxGenomeCount, MaxGenomeFraction, MaxTotalCount, + MinGenomeCount, MinGenomeFraction, MinTotalCount, +}; +use obisys::Reporter; +use obikseq::{set_k, set_m}; +use tracing::info; + +#[derive(Args)] +pub struct RebuildArgs { + /// Source index directory + pub source: PathBuf, + + /// Output index directory + #[arg(short, long)] + pub output: PathBuf, + + /// Minimum fraction of genomes containing the k-mer [0.0–1.0] + #[arg(long)] + pub min_quorum_frac: Option, + + /// Maximum fraction of genomes containing the k-mer [0.0–1.0] + #[arg(long)] + pub max_quorum_frac: Option, + + /// Minimum number of genomes containing the k-mer + #[arg(long)] + pub min_quorum_count: Option, + + /// Maximum number of genomes containing the k-mer + #[arg(long)] + pub max_quorum_count: Option, + + /// Minimum total count across all genomes (count index only) + #[arg(long)] + pub min_total_count: Option, + + /// Maximum total count across all genomes (count index only) + #[arg(long)] + pub max_total_count: Option, + + /// Per-genome count threshold for presence in quorum filters (default 0) + #[arg(long, default_value = "0")] + pub presence_threshold: u32, + + /// Output as presence/absence instead of counts + #[arg(long)] + pub presence: bool, + + /// Overwrite existing output directory + #[arg(short, long)] + pub force: bool, +} + +pub fn run(args: RebuildArgs) { + let src = KmerIndex::open(&args.source).unwrap_or_else(|e| { + eprintln!("error opening source index: {e}"); + std::process::exit(1); + }); + + set_k(src.kmer_size()); + set_m(src.minimizer_size()); + + let n_genomes = src.meta().genomes.len(); + let mode = if args.presence || !src.meta().config.with_counts { + MergeMode::Presence + } else { + MergeMode::Count + }; + + info!( + "rebuild: {} genome(s), mode={:?}, source={}", + n_genomes, mode, args.source.display() + ); + + let pt = args.presence_threshold; + let mut filters: Vec> = Vec::new(); + + if let Some(v) = args.min_quorum_frac { + filters.push(Box::new(MinGenomeFraction { frac: v, threshold: pt })); + } + if let Some(v) = args.max_quorum_frac { + filters.push(Box::new(MaxGenomeFraction { frac: v, threshold: pt })); + } + if let Some(v) = args.min_quorum_count { + filters.push(Box::new(MinGenomeCount { count: v, threshold: pt })); + } + if let Some(v) = args.max_quorum_count { + filters.push(Box::new(MaxGenomeCount { count: v, threshold: pt })); + } + if let Some(v) = args.min_total_count { + filters.push(Box::new(MinTotalCount { total: v })); + } + if let Some(v) = args.max_total_count { + filters.push(Box::new(MaxTotalCount { total: v })); + } + + let mut rep = Reporter::new(); + KmerIndex::rebuild(&args.output, &src, &filters, mode, args.force, &mut rep) + .unwrap_or_else(|e| { + eprintln!("error rebuilding index: {e}"); + std::process::exit(1); + }); + + rep.print(); + info!("rebuilt index → {}", args.output.display()); +} diff --git a/src/obikmer/src/main.rs b/src/obikmer/src/main.rs index c88428b..bae4cdf 100644 --- a/src/obikmer/src/main.rs +++ b/src/obikmer/src/main.rs @@ -20,6 +20,8 @@ enum Commands { Index(cmd::index::IndexArgs), /// Merge multiple built indexes into one Merge(cmd::merge::MergeArgs), + /// Filter and compact an existing index into a new single-layer index + Rebuild(cmd::rebuild::RebuildArgs), /// Dump all indexed kmers as CSV (kmer + per-genome counts or presence) Dump(cmd::dump::DumpArgs), /// Compute pairwise distance matrix between genomes; optionally build NJ/UPGMA trees @@ -51,6 +53,7 @@ fn main() { Commands::Index(args) => cmd::index::run(args), Commands::Merge(args) => cmd::merge::run(args), Commands::Dump(args) => cmd::dump::run(args), + Commands::Rebuild(args) => cmd::rebuild::run(args), Commands::Distance(args) => cmd::distance::run(args), Commands::Unitig(args) => cmd::unitig::run(args), } diff --git a/src/obikpartitionner/src/filter.rs b/src/obikpartitionner/src/filter.rs new file mode 100644 index 0000000..f35a3e8 --- /dev/null +++ b/src/obikpartitionner/src/filter.rs @@ -0,0 +1,87 @@ +/// Trait for kmer row filters used in the rebuild command. +/// +/// `row` contains raw per-genome counts (or 0/1 for presence/absence data). +/// `n_genomes` equals `row.len()`. +pub trait KmerFilter: Send + Sync { + fn passes(&self, row: &[u32], n_genomes: usize) -> bool; +} + +// ── Quorum filters ───────────────────────────────────────────────────────────── + +fn present_count(row: &[u32], threshold: u32) -> usize { + row.iter().filter(|&&v| v > threshold).count() +} + +/// At least `frac` fraction of genomes contain this kmer (count > `threshold`). +pub struct MinGenomeFraction { + pub frac: f64, + pub threshold: u32, +} + +impl KmerFilter for MinGenomeFraction { + fn passes(&self, row: &[u32], n_genomes: usize) -> bool { + let p = present_count(row, self.threshold); + p as f64 / n_genomes as f64 >= self.frac + } +} + +/// At most `frac` fraction of genomes contain this kmer (count > `threshold`). +pub struct MaxGenomeFraction { + pub frac: f64, + pub threshold: u32, +} + +impl KmerFilter for MaxGenomeFraction { + fn passes(&self, row: &[u32], n_genomes: usize) -> bool { + let p = present_count(row, self.threshold); + p as f64 / n_genomes as f64 <= self.frac + } +} + +/// At least `count` genomes contain this kmer (count > `threshold`). +pub struct MinGenomeCount { + pub count: usize, + pub threshold: u32, +} + +impl KmerFilter for MinGenomeCount { + fn passes(&self, row: &[u32], _n_genomes: usize) -> bool { + present_count(row, self.threshold) >= self.count + } +} + +/// At most `count` genomes contain this kmer (count > `threshold`). +pub struct MaxGenomeCount { + pub count: usize, + pub threshold: u32, +} + +impl KmerFilter for MaxGenomeCount { + fn passes(&self, row: &[u32], _n_genomes: usize) -> bool { + present_count(row, self.threshold) <= self.count + } +} + +// ── Total-count filters (count indexes only) ─────────────────────────────────── + +/// Sum of counts across all genomes >= `total`. +pub struct MinTotalCount { + pub total: u32, +} + +impl KmerFilter for MinTotalCount { + fn passes(&self, row: &[u32], _n_genomes: usize) -> bool { + row.iter().sum::() >= self.total + } +} + +/// Sum of counts across all genomes <= `total`. +pub struct MaxTotalCount { + pub total: u32, +} + +impl KmerFilter for MaxTotalCount { + fn passes(&self, row: &[u32], _n_genomes: usize) -> bool { + row.iter().sum::() <= self.total + } +} diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index 4b03fce..48d72c1 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -1,9 +1,12 @@ +pub mod filter; mod distance; mod dump_layer; mod index_layer; mod kmer_sort; mod merge_layer; mod partition; +mod rebuild_layer; +pub use filter::KmerFilter; pub use merge_layer::MergeMode; pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index c933966..21785d0 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -44,7 +44,7 @@ impl ColBuilder { // ── SrcLayerData — opened source matrix for pass-2 lookup ───────────────────── -enum SrcLayerData { +pub(crate) enum SrcLayerData { /// Pure set-membership layer (no data matrix): every kmer is present in all genomes. SetMembership, Presence(MphfLayer, PersistentBitMatrix), @@ -52,7 +52,7 @@ enum SrcLayerData { } impl SrcLayerData { - fn open(layer_dir: &Path, mode: MergeMode) -> SKResult { + pub(crate) fn open(layer_dir: &Path, mode: MergeMode) -> SKResult { let presence_dir = layer_dir.join("presence"); let counts_dir = layer_dir.join("counts"); match mode { @@ -83,7 +83,7 @@ impl SrcLayerData { } /// Return one value per source genome for `kmer`. - fn lookup(&self, kmer: CanonicalKmer, n_genomes: usize) -> Vec { + pub(crate) fn lookup(&self, kmer: CanonicalKmer, n_genomes: usize) -> Vec { match self { SrcLayerData::SetMembership => vec![1u32; n_genomes], SrcLayerData::Presence(mphf, mat) => { diff --git a/src/obikpartitionner/src/rebuild_layer.rs b/src/obikpartitionner/src/rebuild_layer.rs new file mode 100644 index 0000000..fa581f9 --- /dev/null +++ b/src/obikpartitionner/src/rebuild_layer.rs @@ -0,0 +1,205 @@ +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; + +use obicompactvec::{PersistentBitMatrixBuilder, + PersistentBitVecBuilder, + PersistentCompactIntMatrixBuilder, + PersistentCompactIntVecBuilder}; +use obidebruinj::GraphDeBruijn; +use obiskio::{SKError, SKResult, UnitigFileReader}; +use obilayeredmap::{Layer, MphfLayer, OLMError}; +use obilayeredmap::meta::PartitionMeta; + +use crate::filter::KmerFilter; +use crate::merge_layer::{MergeMode, SrcLayerData}; +use crate::partition::KmerPartition; + +const INDEX_SUBDIR: &str = "index"; + +fn olm_to_sk(e: OLMError) -> SKError { + match e { + OLMError::Io(e) => SKError::Io(e), + other => SKError::InvalidData { context: "rebuild", detail: other.to_string() }, + } +} + +fn col_path_bit(dir: &Path, col: usize) -> PathBuf { + dir.join(format!("col_{col:06}.pbiv")) +} + +fn col_path_int(dir: &Path, col: usize) -> PathBuf { + dir.join(format!("col_{col:06}.pciv")) +} + +fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { + fs::write(dir.join("meta.json"), format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n")) +} + +// ── ColBuilder ──────────────────────────────────────────────────────────────── + +enum ColBuilder { + Bit(PersistentBitVecBuilder), + Int(PersistentCompactIntVecBuilder), +} + +impl ColBuilder { + fn set_val(&mut self, slot: usize, value: u32) { + match self { + ColBuilder::Bit(b) => b.set(slot, value > 0), + ColBuilder::Int(b) => b.set(slot, value), + } + } + + fn close(self) -> SKResult<()> { + match self { + ColBuilder::Bit(b) => b.close().map_err(SKError::Io), + ColBuilder::Int(b) => b.close().map_err(SKError::Io), + } + } +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn load_meta(dir: &Path) -> SKResult { + match PartitionMeta::load(dir) { + Ok(m) => Ok(m), + Err(e) if matches!(e, OLMError::Io(ref io_e) if io_e.kind() == std::io::ErrorKind::NotFound) => { + let mut n = 0usize; + while dir.join(format!("layer_{n}")).exists() { n += 1; } + let m = PartitionMeta { n_layers: n }; + m.save(dir).map_err(olm_to_sk)?; + Ok(m) + } + Err(e) => Err(olm_to_sk(e)), + } +} + +fn passes_all(filters: &[Box], row: &[u32], n_genomes: usize) -> bool { + filters.iter().all(|f| f.passes(row, n_genomes)) +} + +// ── KmerPartition::rebuild_partition ───────────────────────────────────────── + +impl KmerPartition { + /// Rebuild partition `i` from `src` into `self` (an empty destination partition). + /// + /// Only k-mers whose per-genome row passes all `filters` are written. + /// The output is a single-layer index — regardless of how many layers the + /// source has. + /// + /// `n_genomes` is the number of genome columns in the source (and destination). + pub fn rebuild_partition( + &self, + src: &KmerPartition, + i: usize, + filters: &[Box], + mode: MergeMode, + n_genomes: usize, + ) -> SKResult<()> { + let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); + if !src_index_dir.exists() { + return Ok(()); + } + + let src_meta = load_meta(&src_index_dir)?; + if src_meta.n_layers == 0 { + return Ok(()); + } + + // ── Pass 1: collect filtered kmers into de Bruijn graph ─────────────── + let mut g = GraphDeBruijn::new(); + + for l in 0..src_meta.n_layers { + let src_layer_dir = src_index_dir.join(format!("layer_{l}")); + let unitigs_path = src_layer_dir.join("unitigs.bin"); + if !unitigs_path.exists() { continue; } + + let reader = UnitigFileReader::open(&unitigs_path)?; + let src_data = SrcLayerData::open(&src_layer_dir, mode)?; + + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + let row = src_data.lookup(kmer, n_genomes); + if passes_all(filters, &row, n_genomes) { + g.push(kmer); + } + } + } + + if g.len() == 0 { + return Ok(()); + } + + let n_new = g.len(); + g.compute_degrees(); + + // ── Build MPHF in dst layer_0 ───────────────────────────────────────── + let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); + let dst_layer_dir = dst_index_dir.join("layer_0"); + fs::create_dir_all(&dst_layer_dir)?; + + let mut uw = Layer::<()>::unitig_writer(&dst_layer_dir).map_err(olm_to_sk)?; + for unitig in g.iter_unitig() { + uw.write(&unitig)?; + } + uw.close()?; + drop(g); + + Layer::<()>::build(&dst_layer_dir).map_err(olm_to_sk)?; + let dst_mphf = MphfLayer::open(&dst_layer_dir).map_err(olm_to_sk)?; + + // ── Prepare matrix builders (one column per genome) ─────────────────── + let data_dir = match mode { + MergeMode::Presence => dst_layer_dir.join("presence"), + MergeMode::Count => dst_layer_dir.join("counts"), + }; + fs::create_dir_all(&data_dir)?; + + let mut builders: Vec = match mode { + MergeMode::Presence => { + PersistentBitMatrixBuilder::new(n_new, &data_dir) + .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + (0..n_genomes).map(|g| -> SKResult { + let b = PersistentBitVecBuilder::new(n_new, &col_path_bit(&data_dir, g))?; + Ok(ColBuilder::Bit(b)) + }).collect::>()? + } + MergeMode::Count => { + PersistentCompactIntMatrixBuilder::new(n_new, &data_dir) + .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + (0..n_genomes).map(|g| -> SKResult { + let b = PersistentCompactIntVecBuilder::new(n_new, &col_path_int(&data_dir, g))?; + Ok(ColBuilder::Int(b)) + }).collect::>()? + } + }; + + // ── Pass 2: fill builders ───────────────────────────────────────────── + for l in 0..src_meta.n_layers { + let src_layer_dir = src_index_dir.join(format!("layer_{l}")); + let unitigs_path = src_layer_dir.join("unitigs.bin"); + if !unitigs_path.exists() { continue; } + + let reader = UnitigFileReader::open(&unitigs_path)?; + let src_data = SrcLayerData::open(&src_layer_dir, mode)?; + + for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { + let row = src_data.lookup(kmer, n_genomes); + if !passes_all(filters, &row, n_genomes) { continue; } + if let Some(slot) = dst_mphf.find(kmer) { + for (col, &value) in row.iter().enumerate() { + builders[col].set_val(slot, value); + } + } + } + } + + // ── Close builders, write metadata ──────────────────────────────────── + for b in builders { b.close()?; } + write_matrix_meta(&data_dir, n_new, n_genomes).map_err(SKError::Io)?; + + PartitionMeta { n_layers: 1 }.save(&dst_index_dir).map_err(olm_to_sk)?; + + Ok(()) + } +}