Files
obikmer/src/obikindex/src/select.rs
T
Eric Coissac c1d6f277ce feat(select): add metrics reporting to selection methods
Integrates an obisys::Reporter across indexing and command modules to capture execution metrics. Replaces discarded timer stops with explicit rep.push() calls, adds timing instrumentation for the pack stage, and prints collected reports after each selection branch.
2026-06-22 10:25:24 +02:00

151 lines
5.0 KiB
Rust

use std::fs;
use std::io;
use std::path::Path;
use obikpartitionner::{KmerPartition, OutputCol, PARTITIONS_SUBDIR};
use obisys::{Reporter, Stage, progress_bar};
use tracing::info;
use crate::error::{OKIError, OKIResult};
use crate::index::KmerIndex;
use crate::meta::{GenomeInfo, IndexMeta};
use crate::state::{IndexState, SENTINEL_INDEXED};
impl KmerIndex {
    /// Create a new index at `output` by projecting/aggregating the genome columns
    /// of `src` according to `specs`.
    ///
    /// One output genome column is created per entry of `specs`, labelled with the
    /// spec's `label`. The k-mer configuration (k-mer size, minimizer size, number
    /// of partition bits) is copied from `src`. `threshold` is forwarded unchanged
    /// to every per-partition selection. Timings of the `select` and `pack` stages
    /// are pushed into `rep`.
    ///
    /// `output_presence` — if true, output uses bit matrices (0/1), regardless of
    /// whether the source stores counts. The caller is responsible for ensuring all
    /// specs use logical operators when `output_presence=true` on a count source.
    ///
    /// `force` — if `output` already exists, remove it first instead of failing.
    /// Removal is refused when `output` resolves to the source index directory or
    /// to one of its ancestors, because deleting it would destroy the data that is
    /// about to be read.
    ///
    /// # Errors
    ///
    /// - [`OKIError::NotIndexed`] if `src` is not in the indexed state.
    /// - [`OKIError::Io`] (`AlreadyExists`) if `output` exists and `force` is false.
    /// - [`OKIError::Io`] (`InvalidInput`) if `force` would delete the source index.
    /// - [`OKIError::Partition`] if selecting any partition fails.
    /// - Any error propagated while writing metadata, partitions, or packing.
    pub fn select<P: AsRef<Path>>(
        output: P,
        src: &KmerIndex,
        specs: &[OutputCol],
        threshold: u32,
        output_presence: bool,
        force: bool,
        rep: &mut Reporter,
    ) -> OKIResult<Self> {
        let output = output.as_ref();
        if src.state() != IndexState::Indexed {
            return Err(OKIError::NotIndexed(src.root_path.clone()));
        }

        if output.exists() {
            if !force {
                return Err(OKIError::Io(io::Error::new(
                    io::ErrorKind::AlreadyExists,
                    format!("{}: output directory already exists", output.display()),
                )));
            }
            // Guard against `--force` wiping the source: if the source root lives
            // at or below `output`, removing `output` would delete it before it is
            // read. Canonicalize so `.`/`..`/symlinked spellings are caught too.
            let out_canon = fs::canonicalize(output)?;
            let src_canon = fs::canonicalize(&src.root_path)?;
            if src_canon.starts_with(&out_canon) {
                return Err(OKIError::Io(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!(
                        "{}: output directory would overwrite the source index",
                        output.display()
                    ),
                )));
            }
            // `remove_dir_all` fails on a plain file; pick the right removal based
            // on the entry itself (not following a symlink at `output`).
            if fs::symlink_metadata(output)?.is_dir() {
                fs::remove_dir_all(output)?;
            } else {
                fs::remove_file(output)?;
            }
        }
        fs::create_dir_all(output)?;

        let mut meta = IndexMeta::new(src.meta.config.clone());
        meta.config.with_counts = !output_presence;
        meta.genomes = specs
            .iter()
            .map(|s| GenomeInfo::new(s.label.clone()))
            .collect();
        meta.write(output)?;

        let n_src_genomes = src.meta.genomes.len();
        let n_partitions = src.partition.n_partitions();
        fs::create_dir_all(output.join(PARTITIONS_SUBDIR))?;
        let dst_partition = KmerPartition::open_with_config(
            output,
            meta.config.kmer_size,
            meta.config.minimizer_size,
            meta.config.n_bits,
        )?;
        info!(
            "select: {} partition(s), {} source genome(s) → {} output column(s)",
            n_partitions, n_src_genomes, specs.len(),
        );

        let t = Stage::start("select");
        let pb = progress_bar("select", n_partitions as u64, "partitions");
        let src_partition = &src.partition;
        let order: Vec<usize> = (0..n_partitions).collect();
        let runner = crate::numa::PartitionRunner::new();
        let result = runner.run(
            &order,
            |i| {
                dst_partition.select_partition(
                    src_partition,
                    i,
                    specs,
                    n_src_genomes,
                    threshold,
                    output_presence,
                    false, // writing into a fresh destination, not in place
                )
            },
            |_, _, _| {
                pb.inc(1);
            },
        );
        // Clear the bar before propagating a failure so it does not linger.
        pb.finish_and_clear();
        result.map_err(OKIError::Partition)?;
        rep.push(t.stop());

        // The sentinel is created only after every partition succeeded.
        fs::File::create(output.join(SENTINEL_INDEXED))?;
        let idx = KmerIndex::open(output)?;

        let t_pack = Stage::start("pack");
        idx.pack_matrices()?;
        rep.push(t_pack.stop());
        Ok(idx)
    }

    /// Rewrite the genome columns of this index in-place according to `specs`.
    ///
    /// The MPHF and unitig files are unchanged; only data matrices are rewritten.
    /// The index metadata (genome labels from `specs`, `with_counts`) is updated
    /// and written only after every partition has been processed, then matrices
    /// are packed. `threshold` and `output_presence` are forwarded unchanged to
    /// every per-partition selection. Timings of the `select` and `pack` stages
    /// are pushed into `rep`.
    ///
    /// # Errors
    ///
    /// - [`OKIError::NotIndexed`] if this index is not in the indexed state.
    /// - [`OKIError::Partition`] if selecting any partition fails.
    /// - Any error propagated while opening partitions, writing metadata, or packing.
    pub fn select_in_place(
        &mut self,
        specs: &[OutputCol],
        threshold: u32,
        output_presence: bool,
        rep: &mut Reporter,
    ) -> OKIResult<()> {
        if self.state() != IndexState::Indexed {
            return Err(OKIError::NotIndexed(self.root_path.clone()));
        }
        let n_src_genomes = self.meta.genomes.len();
        let n_partitions = self.partition.n_partitions();
        // A second handle on the same root serves as the read side while
        // `self.partition` is rewritten.
        let src_partition = KmerPartition::open_with_config(
            &self.root_path,
            self.meta.config.kmer_size,
            self.meta.config.minimizer_size,
            self.meta.config.n_bits,
        )?;
        info!(
            "select (in-place): {} partition(s), {} source genome(s) → {} output column(s)",
            n_partitions, n_src_genomes, specs.len(),
        );

        let t = Stage::start("select");
        let pb = progress_bar("select", n_partitions as u64, "partitions");
        let partition = &self.partition;
        let order: Vec<usize> = (0..n_partitions).collect();
        let runner = crate::numa::PartitionRunner::new();
        let result = runner.run(
            &order,
            |i| {
                partition.select_partition(
                    &src_partition,
                    i,
                    specs,
                    n_src_genomes,
                    threshold,
                    output_presence,
                    true, // rewrite this index's own matrices
                )
            },
            |_, _, _| {
                pb.inc(1);
            },
        );
        // Clear the bar before propagating a failure so it does not linger.
        pb.finish_and_clear();
        result.map_err(OKIError::Partition)?;
        rep.push(t.stop());

        self.meta.config.with_counts = !output_presence;
        self.meta.genomes = specs
            .iter()
            .map(|s| GenomeInfo::new(s.label.clone()))
            .collect();
        self.meta.write(&self.root_path)?;

        let t_pack = Stage::start("pack");
        self.pack_matrices()?;
        rep.push(t_pack.stop());
        Ok(())
    }
}