feat: add select command for genome column projection and aggregation

Introduces the `select` CLI command to project and aggregate genome-level k-mer data by column. Adds `filter` as an alias for `rebuild`. The implementation uses parallel partition processing, supports metadata-driven grouping with configurable aggregation operators, and performs atomic in-place rewrites or filtered exports. Updates documentation and navigation accordingly.
This commit is contained in:
Eric Coissac
2026-06-09 15:05:08 +02:00
parent b0dab452f6
commit e66adef23d
11 changed files with 958 additions and 1 deletions
+1
View File
@@ -7,6 +7,7 @@ mod index;
mod merge;
mod rebuild;
mod reindex;
mod select;
mod stats;
pub use error::{OKIError, OKIResult};
+166
View File
@@ -0,0 +1,166 @@
use std::fs;
use std::io;
use std::path::Path;
use obikpartitionner::{KmerPartition, OutputCol, PARTITIONS_SUBDIR};
use obisys::{Stage, progress_bar};
use rayon::prelude::*;
use tracing::info;
use crate::error::{OKIError, OKIResult};
use crate::index::KmerIndex;
use crate::meta::{GenomeInfo, IndexMeta};
use crate::state::{IndexState, SENTINEL_INDEXED};
impl KmerIndex {
/// Create a new index at `output` by projecting/aggregating the genome columns
/// of `src` according to `specs`.
///
/// `output_presence` — if true, output uses bit matrices (0/1), regardless of
/// whether the source stores counts. The caller is responsible for ensuring all
/// specs use logical operators when `output_presence=true` on a count source.
pub fn select<P: AsRef<Path>>(
output: P,
src: &KmerIndex,
specs: &[OutputCol],
threshold: u32,
output_presence: bool,
force: bool,
) -> OKIResult<Self> {
let output = output.as_ref();
if src.state() != IndexState::Indexed {
return Err(OKIError::NotIndexed(src.root_path.clone()));
}
if output.exists() {
if force {
fs::remove_dir_all(output)?;
} else {
return Err(OKIError::Io(io::Error::new(
io::ErrorKind::AlreadyExists,
format!("{}: output directory already exists", output.display()),
)));
}
}
fs::create_dir_all(output)?;
let mut meta = IndexMeta::new(src.meta.config.clone());
meta.config.with_counts = !output_presence;
meta.genomes = specs.iter()
.map(|s| GenomeInfo::new(s.label.clone()))
.collect();
meta.write(output)?;
let n_src_genomes = src.meta.genomes.len();
let n_partitions = src.partition.n_partitions();
fs::create_dir_all(output.join(PARTITIONS_SUBDIR))?;
let dst_partition = KmerPartition::open_with_config(
output,
meta.config.kmer_size,
meta.config.minimizer_size,
meta.config.n_bits,
)?;
info!(
"select: {} partition(s), {} source genome(s) → {} output column(s)",
n_partitions, n_src_genomes, specs.len(),
);
let t = Stage::start("select");
let pb = progress_bar("select", n_partitions as u64, "partitions");
let src_partition = &src.partition;
let errors: Vec<obiskio::SKError> = (0..n_partitions)
.into_par_iter()
.filter_map(|i| {
let result = dst_partition.select_partition(
src_partition, i, specs,
n_src_genomes, threshold, output_presence,
false,
);
pb.inc(1);
result.err()
})
.collect();
pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::Partition(e));
}
let _ = t.stop();
fs::File::create(output.join(SENTINEL_INDEXED))?;
let idx = KmerIndex::open(output)?;
idx.pack_matrices()?;
Ok(idx)
}
/// Rewrite the genome columns of this index in-place according to `specs`.
///
/// The MPHF and unitig files are unchanged; only data matrices are rewritten.
pub fn select_in_place(
&mut self,
specs: &[OutputCol],
threshold: u32,
output_presence: bool,
) -> OKIResult<()> {
if self.state() != IndexState::Indexed {
return Err(OKIError::NotIndexed(self.root_path.clone()));
}
let n_src_genomes = self.meta.genomes.len();
let n_partitions = self.partition.n_partitions();
// Open a second handle to the same path so we can borrow src and dst simultaneously.
let src_partition = KmerPartition::open_with_config(
&self.root_path,
self.meta.config.kmer_size,
self.meta.config.minimizer_size,
self.meta.config.n_bits,
)?;
info!(
"select (in-place): {} partition(s), {} source genome(s) → {} output column(s)",
n_partitions, n_src_genomes, specs.len(),
);
let t = Stage::start("select");
let pb = progress_bar("select", n_partitions as u64, "partitions");
let errors: Vec<obiskio::SKError> = (0..n_partitions)
.into_par_iter()
.filter_map(|i| {
let result = self.partition.select_partition(
&src_partition, i, specs,
n_src_genomes, threshold, output_presence,
true,
);
pb.inc(1);
result.err()
})
.collect();
pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::Partition(e));
}
let _ = t.stop();
// Update index.meta with new genome list and with_counts flag.
self.meta.config.with_counts = !output_presence;
self.meta.genomes = specs.iter()
.map(|s| GenomeInfo::new(s.label.clone()))
.collect();
self.meta.write(&self.root_path)?;
self.pack_matrices()?;
Ok(())
}
}
+1
View File
@@ -1,6 +1,7 @@
pub mod annotate;
pub mod pack;
pub(crate) mod predicate;
pub mod select;
pub mod utils;
pub mod distance;
pub mod dump;
+10
View File
@@ -230,6 +230,16 @@ impl FilterArgs {
}
}
/// Returns indices of genomes matching `pred_str` (single predicate).
pub fn matching_genome_indices(pred_str: &str, genomes: &[GenomeInfo]) -> Result<Vec<usize>, String> {
let pred = MetaPred::parse(pred_str)?;
Ok(genomes.iter().enumerate()
.filter_map(|(i, g)| {
if pred.eval(&g.meta) == Some(true) { Some(i) } else { std::option::Option::None }
})
.collect())
}
pub struct GroupFilterParams {
pub threshold: u32,
pub min_count: Option<usize>,
+248
View File
@@ -0,0 +1,248 @@
use std::collections::{BTreeMap, HashMap};
use std::path::PathBuf;
use clap::{Args, ValueEnum};
use obikindex::{GenomeInfo, KmerIndex};
use obikpartitionner::{AggOp, OutputCol};
use tracing::info;
use super::predicate::matching_genome_indices;
// ── CLI types ─────────────────────────────────────────────────────────────────
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum AggOpArg {
Any,
All,
None,
Sum,
Min,
Max,
}
impl From<AggOpArg> for AggOp {
fn from(a: AggOpArg) -> Self {
match a {
AggOpArg::Any => AggOp::Any,
AggOpArg::All => AggOp::All,
AggOpArg::None => AggOp::None,
AggOpArg::Sum => AggOp::Sum,
AggOpArg::Min => AggOp::Min,
AggOpArg::Max => AggOp::Max,
}
}
}
#[derive(Args)]
pub struct SelectArgs {
/// Source index directory
pub source: PathBuf,
/// Output index directory (mutually exclusive with --in-place)
#[arg(long, conflicts_with = "in_place")]
pub output: Option<PathBuf>,
/// Rewrite the source index in-place (mutually exclusive with --output)
#[arg(long)]
pub in_place: bool,
/// Define a named group: `<name>:<pred>` (repeatable; mutually exclusive with --aggregate-by)
#[arg(long, value_name = "NAME:PRED", conflicts_with = "aggregate_by")]
pub group: Vec<String>,
/// Per-group aggregation operator: `<name>:<op>` (repeatable)
#[arg(long, value_name = "NAME:OP")]
pub group_op: Vec<String>,
/// Auto-create one group per unique value of metadata key <KEY>
#[arg(long, value_name = "KEY", conflicts_with = "group")]
pub aggregate_by: Option<String>,
/// Aggregation operator for all auto-generated groups
#[arg(long, value_name = "OP")]
pub aggregate_op: Option<AggOpArg>,
/// Output columns in order: group names or genome labels, comma-separated
#[arg(long, value_name = "COL,...", value_delimiter = ',')]
pub select: Option<Vec<String>>,
/// Minimum count to consider a genome as "carrying" the k-mer (logical ops only)
#[arg(long, default_value = "0")]
pub presence_threshold: u32,
/// Overwrite existing output directory
#[arg(short, long)]
pub force: bool,
}
// ── Helpers ───────────────────────────────────────────────────────────────────
fn parse_name_value(s: &str, flag: &str) -> (String, String) {
match s.find(':') {
Some(pos) => (s[..pos].trim().to_string(), s[pos + 1..].to_string()),
std::option::Option::None => {
eprintln!("error in {flag}: expected <name>:<value>, got: {s}");
std::process::exit(1);
}
}
}
fn parse_agg_op(s: &str) -> AggOp {
match s.to_lowercase().as_str() {
"any" => AggOp::Any,
"all" => AggOp::All,
"none" => AggOp::None,
"sum" => AggOp::Sum,
"min" => AggOp::Min,
"max" => AggOp::Max,
other => {
eprintln!("unknown aggregation operator: {other}; valid: any, all, none, sum, min, max");
std::process::exit(1);
}
}
}
fn default_op(src_is_count: bool) -> AggOp {
if src_is_count { AggOp::Sum } else { AggOp::Any }
}
// ── build_specs ───────────────────────────────────────────────────────────────
/// Resolve CLI arguments into an ordered list of `OutputCol`.
///
/// Returns `(specs, output_presence)`.
fn build_specs(
args: &SelectArgs,
genomes: &[GenomeInfo],
src_is_count: bool,
) -> (Vec<OutputCol>, bool) {
// ── 1. Build group_indices: name → Vec<usize> ────────────────────────────
// Also keep insertion order for the default `--select *` case.
let mut group_order: Vec<String> = Vec::new();
let mut group_indices: HashMap<String, Vec<usize>> = HashMap::new();
if let Some(ref key) = args.aggregate_by {
// One group per unique value of `key`, in sorted order.
let mut value_to_indices: BTreeMap<String, Vec<usize>> = BTreeMap::new();
for (i, g) in genomes.iter().enumerate() {
if let Some(v) = g.meta.get(key) {
value_to_indices.entry(v.clone()).or_default().push(i);
}
}
for (v, idxs) in value_to_indices {
group_order.push(v.clone());
group_indices.insert(v, idxs);
}
} else {
for raw in &args.group {
let (name, pred) = parse_name_value(raw, "--group");
let idxs = matching_genome_indices(&pred, genomes).unwrap_or_else(|e| {
eprintln!("error in --group {name}: {e}");
std::process::exit(1);
});
if !group_indices.contains_key(&name) {
group_order.push(name.clone());
}
group_indices.insert(name, idxs);
}
}
// ── 2. Build per-group ops ────────────────────────────────────────────────
let global_op = args.aggregate_op.map(AggOp::from);
let mut group_op: HashMap<String, AggOp> = HashMap::new();
for raw in &args.group_op {
let (name, op_str) = parse_name_value(raw, "--group-op");
if !group_indices.contains_key(&name) {
eprintln!("--group-op references undefined group: {name}");
std::process::exit(1);
}
group_op.insert(name, parse_agg_op(&op_str));
}
// ── 3. Genome label → index map for pass-through columns ─────────────────
let label_to_idx: HashMap<&str, usize> = genomes.iter().enumerate()
.map(|(i, g)| (g.label.as_str(), i))
.collect();
// ── 4. Determine output column names ─────────────────────────────────────
let col_names: Vec<String> = if let Some(ref sel) = args.select {
sel.clone()
} else if !group_order.is_empty() {
group_order.clone()
} else {
// Identity: all genomes in original order
genomes.iter().map(|g| g.label.clone()).collect()
};
// ── 5. Build OutputCol list ───────────────────────────────────────────────
let mut specs: Vec<OutputCol> = Vec::with_capacity(col_names.len());
for name in &col_names {
if let Some(idxs) = group_indices.get(name) {
let op = group_op.get(name)
.copied()
.or(global_op)
.unwrap_or_else(|| default_op(src_is_count));
specs.push(OutputCol { label: name.clone(), indices: idxs.clone(), op });
} else if let Some(&idx) = label_to_idx.get(name.as_str()) {
// Pass-through: single-element group with default op.
let op = default_op(src_is_count);
specs.push(OutputCol { label: name.clone(), indices: vec![idx], op });
} else {
eprintln!("--select: unknown column '{name}' (not a group name or genome label)");
std::process::exit(1);
}
}
if specs.is_empty() {
eprintln!("select: no output columns defined");
std::process::exit(1);
}
// ── 6. Determine output type ──────────────────────────────────────────────
let output_presence = !src_is_count
|| specs.iter().all(|s| s.op.is_logical());
(specs, output_presence)
}
// ── run ───────────────────────────────────────────────────────────────────────
pub fn run(args: SelectArgs) {
if !args.in_place && args.output.is_none() {
eprintln!("error: one of --output or --in-place must be specified");
std::process::exit(1);
}
let mut src = KmerIndex::open(&args.source).unwrap_or_else(|e| {
eprintln!("error opening source index: {e}");
std::process::exit(1);
});
let src_is_count = src.meta().config.with_counts;
let (specs, output_presence) = build_specs(&args, &src.meta().genomes.clone(), src_is_count);
info!(
"select: {} genome(s) → {} output column(s), output={}",
src.meta().genomes.len(),
specs.len(),
if output_presence { "presence" } else { "count" },
);
if args.in_place {
src.select_in_place(&specs, args.presence_threshold, output_presence)
.unwrap_or_else(|e| {
eprintln!("select error: {e}");
std::process::exit(1);
});
info!("selected in-place → {}", args.source.display());
} else {
let output = args.output.unwrap();
KmerIndex::select(&output, &src, &specs, args.presence_threshold, output_presence, args.force)
.unwrap_or_else(|e| {
eprintln!("select error: {e}");
std::process::exit(1);
});
info!("selected index → {}", output.display());
}
}
+6
View File
@@ -22,6 +22,10 @@ enum Commands {
Merge(cmd::merge::MergeArgs),
/// Filter and compact an existing index into a new single-layer index
Rebuild(cmd::rebuild::RebuildArgs),
/// Alias for rebuild
Filter(cmd::rebuild::RebuildArgs),
/// Project and/or aggregate genome columns into a new or in-place index
Select(cmd::select::SelectArgs),
/// Query an index with sequences and annotate matches
Query(cmd::query::QueryArgs),
/// Dump all indexed kmers as CSV (kmer + per-genome counts or presence)
@@ -66,6 +70,8 @@ fn main() {
Commands::Merge(args) => cmd::merge::run(args),
Commands::Dump(args) => cmd::dump::run(args),
Commands::Rebuild(args) => cmd::rebuild::run(args),
Commands::Filter(args) => cmd::rebuild::run(args),
Commands::Select(args) => cmd::select::run(args),
Commands::Query(args) => cmd::query::run(args),
Commands::Annotate(args) => cmd::annotate::run(args),
Commands::Distance(args) => cmd::distance::run(args),
+2
View File
@@ -7,7 +7,9 @@ mod merge_layer;
mod partition;
mod query_layer;
mod rebuild_layer;
mod select_layer;
pub use filter::{GroupQuorumFilter, KmerFilter, passes_all};
pub use merge_layer::MergeMode;
pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR};
pub use select_layer::{AggOp, OutputCol};
+287
View File
@@ -0,0 +1,287 @@
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder,
};
use obilayeredmap::meta::PartitionMeta;
use obilayeredmap::OLMError;
use obiskio::{SKError, SKResult};
use crate::partition::KmerPartition;
const INDEX_SUBDIR: &str = "index";
// ── AggOp ─────────────────────────────────────────────────────────────────────
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggOp {
Any,
All,
None,
Sum,
Min,
Max,
}
impl AggOp {
pub fn is_logical(self) -> bool {
matches!(self, AggOp::Any | AggOp::All | AggOp::None)
}
}
// ── OutputCol ─────────────────────────────────────────────────────────────────
pub struct OutputCol {
pub label: String,
pub indices: Vec<usize>,
pub op: AggOp,
}
// ── Aggregation ───────────────────────────────────────────────────────────────
#[inline]
fn aggregate(op: AggOp, indices: &[usize], src_row: &[u32], threshold: u32) -> u32 {
match op {
AggOp::Any => {
if indices.iter().any(|&i| src_row[i] > threshold) { 1 } else { 0 }
}
AggOp::All => {
if indices.is_empty() { return 0; }
if indices.iter().all(|&i| src_row[i] > threshold) { 1 } else { 0 }
}
AggOp::None => {
if indices.iter().all(|&i| src_row[i] <= threshold) { 1 } else { 0 }
}
AggOp::Sum => {
indices.iter().map(|&i| src_row[i]).fold(0u32, |a, b| a.saturating_add(b))
}
AggOp::Min => indices.iter().map(|&i| src_row[i]).min().unwrap_or(0),
AggOp::Max => indices.iter().map(|&i| src_row[i]).max().unwrap_or(0),
}
}
// ── ColBuilder ────────────────────────────────────────────────────────────────
enum ColBuilder {
Bit(PersistentBitVecBuilder),
Int(PersistentCompactIntVecBuilder),
}
impl ColBuilder {
fn set_val(&mut self, slot: usize, value: u32) {
match self {
ColBuilder::Bit(b) => b.set(slot, value > 0),
ColBuilder::Int(b) => b.set(slot, value),
}
}
fn close(self) -> SKResult<()> {
match self {
ColBuilder::Bit(b) => b.close().map_err(SKError::Io),
ColBuilder::Int(b) => b.close().map_err(SKError::Io),
}
}
}
// ── Helpers ───────────────────────────────────────────────────────────────────
fn olm_to_sk(e: OLMError) -> SKError {
match e {
OLMError::Io(e) => SKError::Io(e),
other => SKError::InvalidData { context: "select", detail: other.to_string() },
}
}
fn col_path_bit(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pbiv"))
}
fn col_path_int(dir: &Path, col: usize) -> PathBuf {
dir.join(format!("col_{col:06}.pciv"))
}
fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> {
fs::write(
dir.join("meta.json"),
format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"),
)
}
/// Copy all plain files (not subdirectories) from `src_dir` to `dst_dir`.
fn copy_layer_files(src_dir: &Path, dst_dir: &Path) -> io::Result<()> {
for entry in fs::read_dir(src_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_file() {
fs::copy(&path, dst_dir.join(entry.file_name()))?;
}
}
Ok(())
}
// ── fill_builders ─────────────────────────────────────────────────────────────
fn fill_builders(
builders: &mut [ColBuilder],
specs: &[OutputCol],
n: usize,
n_src: usize,
src_layer_dir: &Path,
src_is_count: bool,
threshold: u32,
) -> SKResult<()> {
let mut src_buf = vec![0u32; n_src];
if src_is_count {
let mat = PersistentCompactIntMatrix::open(src_layer_dir).map_err(SKError::Io)?;
for slot in 0..n {
mat.fill_row(slot, &mut src_buf);
for (col, spec) in specs.iter().enumerate() {
builders[col].set_val(slot, aggregate(spec.op, &spec.indices, &src_buf, threshold));
}
}
} else {
let mat = PersistentBitMatrix::open(src_layer_dir).map_err(SKError::Io)?;
for slot in 0..n {
mat.fill_row(slot, &mut src_buf);
for (col, spec) in specs.iter().enumerate() {
builders[col].set_val(slot, aggregate(spec.op, &spec.indices, &src_buf, threshold));
}
}
}
Ok(())
}
// ── KmerPartition::select_partition ──────────────────────────────────────────
impl KmerPartition {
/// Rewrite the data matrices of partition `i` in `src` into `self`.
///
/// `specs` defines the output columns (projection/aggregation).
/// `output_presence` — if true, all output builders use bit (0/1) format.
/// `in_place` — `self` and `src` share the same root; write to temp dirs then swap.
pub fn select_partition(
&self,
src: &KmerPartition,
i: usize,
specs: &[OutputCol],
n_src_genomes: usize,
threshold: u32,
output_presence: bool,
in_place: bool,
) -> SKResult<()> {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
if !src_index_dir.exists() {
return Ok(());
}
let src_meta = PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?;
if src_meta.n_layers == 0 {
return Ok(());
}
let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR);
if !in_place {
fs::create_dir_all(&dst_index_dir)?;
}
let n_out = specs.len();
let data_subdir = if output_presence { "presence" } else { "counts" };
for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
if !src_layer_dir.exists() { continue; }
let dst_layer_dir = dst_index_dir.join(format!("layer_{l}"));
let counts_dir = src_layer_dir.join("counts");
let presence_dir = src_layer_dir.join("presence");
let src_is_count = counts_dir.exists() && !presence_dir.exists();
// Determine number of slots from the source matrix.
let n = if counts_dir.exists() {
PersistentCompactIntMatrix::open(&src_layer_dir).map_err(SKError::Io)?.n()
} else if presence_dir.exists() {
PersistentBitMatrix::open(&src_layer_dir).map_err(SKError::Io)?.n()
} else {
// Implicit single-genome layer: no data matrix needed in output either.
if !in_place {
fs::create_dir_all(&dst_layer_dir)?;
copy_layer_files(&src_layer_dir, &dst_layer_dir)?;
}
continue;
};
// Choose the output data directory (temp name for in-place).
let (dst_data_dir, final_data_dir) = if in_place {
let tmp = dst_layer_dir.join(format!("{data_subdir}_new"));
let perm = dst_layer_dir.join(data_subdir);
(tmp, perm)
} else {
let perm = dst_layer_dir.join(data_subdir);
(perm.clone(), perm)
};
if !in_place {
fs::create_dir_all(&dst_layer_dir)?;
copy_layer_files(&src_layer_dir, &dst_layer_dir)?;
}
fs::create_dir_all(&dst_data_dir)?;
// Initialise packed-format skeleton.
if output_presence {
PersistentBitMatrixBuilder::new(n, &dst_data_dir)
.map_err(SKError::Io)?.close().map_err(SKError::Io)?;
} else {
PersistentCompactIntMatrixBuilder::new(n, &dst_data_dir)
.map_err(SKError::Io)?.close().map_err(SKError::Io)?;
}
// Create column builders.
let mut builders: Vec<ColBuilder> = (0..n_out)
.map(|col| -> SKResult<ColBuilder> {
if output_presence {
Ok(ColBuilder::Bit(PersistentBitVecBuilder::new(
n, &col_path_bit(&dst_data_dir, col),
)?))
} else {
Ok(ColBuilder::Int(PersistentCompactIntVecBuilder::new(
n, &col_path_int(&dst_data_dir, col),
)?))
}
})
.collect::<SKResult<_>>()?;
fill_builders(
&mut builders, specs, n, n_src_genomes,
&src_layer_dir, src_is_count, threshold,
)?;
for b in builders { b.close()?; }
write_matrix_meta(&dst_data_dir, n, n_out).map_err(SKError::Io)?;
// In-place: swap old data dir for new.
if in_place {
let old_data_dir = if src_is_count {
dst_layer_dir.join("counts")
} else {
dst_layer_dir.join("presence")
};
if old_data_dir.exists() {
fs::remove_dir_all(&old_data_dir)?;
}
fs::rename(&dst_data_dir, &final_data_dir)?;
}
}
if !in_place {
PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?
.save(&dst_index_dir).map_err(olm_to_sk)?;
}
Ok(())
}
}