diff --git a/docmd/implementation/filtering.md b/docmd/implementation/filtering.md index 795d8fd..4dfab31 100644 --- a/docmd/implementation/filtering.md +++ b/docmd/implementation/filtering.md @@ -1,12 +1,12 @@ # Kmer filtering and ingroup/outgroup predicates -The `rebuild`, `dump`, and `unitig` commands share the same filtering system, +The `filter`, `dump`, and `unitig` commands share the same filtering system, implemented as a shared `FilterArgs` clap argument group embedded in each command via `#[command(flatten)]`. Filters select k-mers based on per-genome quorum counts, optionally restricted to **ingroup** and **outgroup** genome sets derived from genome metadata. All rules described here apply identically to all three commands. -`rebuild` additionally accepts `--min-total-count` / `--max-total-count` filters +`filter` additionally accepts `--min-total-count` / `--max-total-count` filters that operate on the sum of counts across all genomes. ## Predicate syntax @@ -93,8 +93,8 @@ For each genome: | `--max-outgroup-count N` | outgroup | k-mer present in at most N outgroup genomes | | `--min-outgroup-frac F` | outgroup | k-mer present in at least fraction F of outgroup genomes | | `--max-outgroup-frac F` | outgroup | k-mer present in at most fraction F of outgroup genomes | -| `--min-total-count N` | all genomes | sum of per-genome counts ≥ N (`rebuild` only) | -| `--max-total-count N` | all genomes | sum of per-genome counts ≤ N (`rebuild` only) | +| `--min-total-count N` | all genomes | sum of per-genome counts ≥ N (`filter` only) | +| `--max-total-count N` | all genomes | sum of per-genome counts ≤ N (`filter` only) | | `--presence-threshold N` | all | per-genome count > N to be considered "present" (default 0) | **Conditional defaults** — the defaults for `--min-frac` and `--max-outgroup-count` depend on two conditions: @@ -169,7 +169,7 @@ Keep k-mers specific to *Betula nana* — present in at least 2 *B. nana* genome and absent from every other genome in the index: ```sh -obikmer rebuild src --output dst \ +obikmer filter src --output dst \ --ingroup "species=Betula_nana" \ --outgroup "*" \ --min-count 2 \ @@ -180,7 +180,7 @@ Keep k-mers found in at least 2 *Betula nana* genomes and absent from all other *Betula*: ```sh -obikmer rebuild src --output dst \ +obikmer filter src --output dst \ --ingroup "species=Betula_nana" \ --outgroup "genus=Betula" \ --min-count 2 \ @@ -191,7 +191,7 @@ Use taxonomic paths — keep k-mers present in ≥ 50 % of the *Betula* clade and in fewer than 10 % of everything outside *Betulaceae*: ```sh -obikmer rebuild src --output dst \ +obikmer filter src --output dst \ --ingroup "taxon~/Betulaceae/Betula" \ --outgroup "taxon!~/Betulaceae" \ --min-frac 0.5 \ @@ -201,7 +201,7 @@ obikmer rebuild src --output dst \ Multiple outgroup predicates (OR): exclude k-mers present in *Alnus* or *Carpinus*: ```sh -obikmer rebuild src --output dst \ +obikmer filter src --output dst \ --ingroup "genus=Betula" \ --outgroup "genus=Alnus" \ --outgroup "genus=Carpinus" \ @@ -244,7 +244,7 @@ obikmer dump myindex --head 20 --ingroup "species=Betula_nana" --min-count 1 ### `distance --presence-threshold N` When computing Jaccard distance on a **count index**, a k-mer is considered present in a genome if its count is ≥ N (default 1). -This option is independent of the `--presence-threshold` used in `rebuild`/`query` filtering. +This option is independent of the `--presence-threshold` used in filtering. ```sh # Jaccard treating kmers with count ≥ 2 as present @@ -262,11 +262,11 @@ This parameter has no effect on presence/absence indexes (where values are alrea lookup and counter. - **`obikmer::cmd::predicate::FilterArgs`** — shared `clap` argument group - embedded via `#[command(flatten)]` in `RebuildArgs`, `DumpArgs`, and + embedded via `#[command(flatten)]` in `FilterArgs`, `DumpArgs`, and `UnitigArgs`. `FilterArgs::build_filters()` returns a ready-to-use filter list. - **`obikpartitionner::KmerPartition::iter_partition_kmers`** — accepts `filters: &[Box]` and applies them per-kmer before invoking - the callback. `rebuild`, `dump`, and `unitig` all go through this single + the callback. `filter`, `dump`, and `unitig` all go through this single entry point. diff --git a/docmd/implementation/select.md b/docmd/implementation/select.md new file mode 100644 index 0000000..d78a3fd --- /dev/null +++ b/docmd/implementation/select.md @@ -0,0 +1,234 @@ +# `select` — column projection and aggregation + +`select` transforms an index by operating on its **genome columns**: projecting a +subset of columns, aggregating groups of genomes into synthetic columns, or both. +It is the column-axis counterpart of `filter` (row-axis operations). + +Following relational algebra conventions: + +| Command | Relational operation | Axis | +|----------|---------------------|----------| +| `filter` | σ — selection | rows (k-mers) | +| `select` | π — projection | columns (genomes) | + +The two commands compose naturally: run `filter` first to restrict the kmer set, +then `select` to reshape the genome columns. + +`select` never changes the kmer set. The MPHF and `unitigs.bin` of each layer +are preserved unchanged; only the data matrices are rewritten. + +--- + +## Synopsis + +```sh +obikmer select + { --output | --in-place } + [--group : ...] + [--group-op : ...] + [--aggregate-by ] + [--aggregate-op ] + [--select ] + [--presence-threshold ] +``` + +--- + +## Output destination + +Exactly one of `--output` or `--in-place` must be specified. + +**`--output `** — writes a new index to ``. The source index is +unchanged. The MPHF and unitig files are copied; only the data matrices are +rewritten with the new column layout. + +**`--in-place`** — rewrites the data matrices of the source index directly. +Removed or replaced columns are lost. The operation writes to temporary files +first, then renames atomically, so an interrupted run leaves the index intact. + +--- + +## Defining output columns + +### Named groups — `--group` + +``` +--group : +``` + +Defines a named group of genomes using the same predicate syntax as `filter`. +Repeatable; a genome can belong to several groups. + +```sh +--group "pub:species=Betula_pubescens" +--group "nan:species=Betula_nana" +``` + +### Per-group operator — `--group-op` + +``` +--group-op : +``` + +Assigns an aggregation operator to a named group. Optional; if absent, the +default operator applies (see below). + +```sh +--group-op "pub:any" +--group-op "nan:all" +``` + +### Shorthand — `--aggregate-by` / `--aggregate-op` + +`--aggregate-by ` automatically creates one group per unique value of the +metadata key ``. Equivalent to one `--group :=` per distinct +value. `--aggregate-op ` sets the operator for all auto-generated groups. + +`--aggregate-by` and `--group` are mutually exclusive. + +### Column selection and ordering — `--select` + +``` +--select col1,col2,... +``` + +Lists the output columns in order. Each element is either a group name (defined +by `--group` or generated by `--aggregate-by`) or a genome label from the source +index (pass-through, no aggregation). + +**Default when `--select` is absent:** +all defined groups in declaration order (for `--group`), or all generated groups +in metadata-value order (for `--aggregate-by`). Individual genomes not in any +group are excluded unless named explicitly. + +**When neither `--group` nor `--aggregate-by` is specified:** +`--select` can still reference genome labels for pure column projection (no +aggregation). If `--select` is also absent, all genomes are output unchanged +(identity transform — useful combined with row filtering via a prior `filter` +run). + +--- + +## Aggregation operators + +| Operator | Input | Output | Semantics | +|----------|-------------|----------|-----------| +| `any` | pres / count | presence | 1 if ≥ 1 genome in group carries the k-mer | +| `all` | pres / count | presence | 1 if every genome in group carries the k-mer | +| `none` | pres / count | presence | 1 if no genome in group carries the k-mer | +| `sum` | count | count | sum of counts across the group | +| `min` | count | count | minimum count | +| `max` | count | count | maximum count | + +**Default operator:** +- Presence index: `any` +- Count index: `sum` + +Logical operators (`any`/`all`/`none`) on a count index use +`--presence-threshold N` (default 0): a genome "carries" the k-mer if its count +is > N. + +**Output index type:** +- If the source is a presence index, the output is always a presence index. +- If the source is a count index and every output column uses a logical operator + or is a pass-through from a presence source, the output is a presence index. +- Otherwise (at least one arithmetic operator on a count source), the output is + a count index. + +--- + +## Behaviour for edge cases + +| Situation | Behaviour | +|-----------|-----------| +| Genome missing the metadata key in `--aggregate-by` | genome ignored (no `NA` group) | +| Genome in multiple groups | contributes independently to each | +| `--group-op` references undefined group | error | +| `--select` element is neither group name nor genome label | error | +| `--output` and `--in-place` both specified | error | +| Neither `--output` nor `--in-place` | error | +| Group with zero matching genomes | column is all-zeros (or all-ones for `none`) | + +--- + +## Examples + +### Aggregate by metadata group, default operators + +```sh +obikmer select myindex --output out --aggregate-by group +# one column per unique value of "group"; presence→any, count→sum +``` + +### Named groups with different operators + +```sh +obikmer select myindex --output out \ + --group "pub:species=Betula_pubescens" \ + --group "nan:species=Betula_nana" \ + --group-op "pub:any" \ + --group-op "nan:all" \ + --select "pub,nan" +``` + +### Mix aggregated group and individual genome + +```sh +obikmer select myindex --output out \ + --group "A:group=A" \ + --select "A,Betula_nana--IGA-24-39" +``` + +### Pure column projection (no aggregation) + +```sh +obikmer select myindex --output out \ + --select "Betula_nana--TROM-V-149986,Betula_nana--AG-P04-25-01" +``` + +### In-place: keep only group A + +```sh +obikmer select myindex --in-place --group "A:group=A" --select "A" +``` + +### Compose with filter + +```sh +# Step 1: keep only B. nana-specific k-mers +obikmer filter myindex --output filtered \ + --ingroup "species=Betula_nana" --outgroup "*" + +# Step 2: aggregate genome columns by collection site +obikmer select filtered --output final --aggregate-by site +``` + +--- + +## Implementation notes + +`select` does not rebuild the MPHF. The 256 partitions are processed in parallel +(rayon), each writing its output independently; results require no synchronisation +because every partition owns a distinct set of files. + +For each layer in each partition: + +1. The slot count `n` is read by opening the source data matrix. +2. A new data matrix is built with M columns (M = number of output columns). +3. For each slot `s` in `0..n`: + - `old_row = matrix.fill_row(s)` — reads the original `N`-column row without allocating. + - For each output column `j`: + - `new_row[j] = aggregate(op, old_row[group_indices])`. + - Pass-through columns are represented as single-element groups with the + default operator (`any` for presence, `sum` for count) — same code path. + - The new row is written slot by slot into each column builder. +4. All plain files in the source layer directory (`mphf.bin`, `unitigs.bin`, + evidence files, `layer_meta.json`) are copied verbatim; only the `presence/` + or `counts/` subdirectory is rewritten. +5. `index.meta` is rewritten with the new genome list and updated `with_counts`. + +**`--in-place` write strategy:** new data is written to a temporary sibling +directory (`presence_new/` or `counts_new/`); on success the old directory is +removed and the temporary one is renamed into place. An interrupted run leaves +at most one stale `*_new/` directory; the original data is intact until the +rename step. diff --git a/docmd/index.md b/docmd/index.md index e38c230..9f6d650 100644 --- a/docmd/index.md +++ b/docmd/index.md @@ -9,12 +9,13 @@ | `superkmer` | Extract super-kmers from a sequence file and write to stdout | | `index` | Build a complete genome index (scatter → dereplicate → count → layered MPHF) | | `merge` | Merge multiple built indexes into one | -| `rebuild` | Filter and compact an existing index into a new single-layer index; supports the shared [kmer filtering](implementation/filtering.md) system | +| `filter` | Apply row-level selection (σ) to an index: retain only k-mers matching the ingroup/outgroup predicates. Output is a new single-layer index — compaction is a consequence, not the goal. Supports the shared [kmer filtering](implementation/filtering.md) system | | `query` | Query an index with sequences and annotate matches | | `dump` | Dump all indexed k-mers as CSV (kmer + per-genome counts or presence); supports the shared [kmer filtering](implementation/filtering.md) system; `--head N` limits output to the first N k-mers | | `annotate` | Add or update genome metadata from a CSV file; or dump metadata as CSV | | `distance` | Compute pairwise distance matrix between genomes; optionally build NJ/UPGMA trees; `--presence-threshold N` sets the minimum count to consider a k-mer present when computing Jaccard on count indexes (default 1) | | `unitig` | Build a global de Bruijn graph across all partitions and enumerate its unitigs as FASTA; supports the shared [kmer filtering](implementation/filtering.md) system | +| `select` | Project and/or aggregate genome columns into a new or in-place index; the column-axis counterpart of `filter` (see [select](implementation/select.md)) | | `estimate` | Estimate approximate-index parameters (z, evidence bits, FP rates) before indexing | | `reindex` | Convert an index's evidence in-place: exact ↔ approx | | `utils` | Miscellaneous index utilities: `--new-label NEW=OLD` renames a genome label; `--upgrade-index` adds missing `layer_meta.json` to old indexes | diff --git a/mkdocs.yml b/mkdocs.yml index 3335e7d..f37c40b 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -50,6 +50,7 @@ nav: - PersistentBitVec: implementation/persistent_bit_vec.md - Merge command: implementation/merge.md - Kmer filtering: implementation/filtering.md + - Select command: implementation/select.md - Architecture: - Sequences: architecture/sequences/invariant.md - Kmer index: architecture/index_architecture.md diff --git a/src/obikindex/src/lib.rs b/src/obikindex/src/lib.rs index f5f20ae..57e3e18 100644 --- a/src/obikindex/src/lib.rs +++ b/src/obikindex/src/lib.rs @@ -7,6 +7,7 @@ mod index; mod merge; mod rebuild; mod reindex; +mod select; mod stats; pub use error::{OKIError, OKIResult}; diff --git a/src/obikindex/src/select.rs b/src/obikindex/src/select.rs new file mode 100644 index 0000000..653c8ef --- /dev/null +++ b/src/obikindex/src/select.rs @@ -0,0 +1,166 @@ +use std::fs; +use std::io; +use std::path::Path; + +use obikpartitionner::{KmerPartition, OutputCol, PARTITIONS_SUBDIR}; +use obisys::{Stage, progress_bar}; +use rayon::prelude::*; +use tracing::info; + +use crate::error::{OKIError, OKIResult}; +use crate::index::KmerIndex; +use crate::meta::{GenomeInfo, IndexMeta}; +use crate::state::{IndexState, SENTINEL_INDEXED}; + +impl KmerIndex { + /// Create a new index at `output` by projecting/aggregating the genome columns + /// of `src` according to `specs`. + /// + /// `output_presence` — if true, output uses bit matrices (0/1), regardless of + /// whether the source stores counts. The caller is responsible for ensuring all + /// specs use logical operators when `output_presence=true` on a count source. + pub fn select>( + output: P, + src: &KmerIndex, + specs: &[OutputCol], + threshold: u32, + output_presence: bool, + force: bool, + ) -> OKIResult { + let output = output.as_ref(); + + if src.state() != IndexState::Indexed { + return Err(OKIError::NotIndexed(src.root_path.clone())); + } + + if output.exists() { + if force { + fs::remove_dir_all(output)?; + } else { + return Err(OKIError::Io(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("{}: output directory already exists", output.display()), + ))); + } + } + + fs::create_dir_all(output)?; + let mut meta = IndexMeta::new(src.meta.config.clone()); + meta.config.with_counts = !output_presence; + meta.genomes = specs.iter() + .map(|s| GenomeInfo::new(s.label.clone())) + .collect(); + meta.write(output)?; + + let n_src_genomes = src.meta.genomes.len(); + let n_partitions = src.partition.n_partitions(); + + fs::create_dir_all(output.join(PARTITIONS_SUBDIR))?; + let dst_partition = KmerPartition::open_with_config( + output, + meta.config.kmer_size, + meta.config.minimizer_size, + meta.config.n_bits, + )?; + + info!( + "select: {} partition(s), {} source genome(s) → {} output column(s)", + n_partitions, n_src_genomes, specs.len(), + ); + + let t = Stage::start("select"); + let pb = progress_bar("select", n_partitions as u64, "partitions"); + let src_partition = &src.partition; + + let errors: Vec = (0..n_partitions) + .into_par_iter() + .filter_map(|i| { + let result = dst_partition.select_partition( + src_partition, i, specs, + n_src_genomes, threshold, output_presence, + false, + ); + pb.inc(1); + result.err() + }) + .collect(); + + pb.finish_and_clear(); + + if let Some(e) = errors.into_iter().next() { + return Err(OKIError::Partition(e)); + } + + let _ = t.stop(); + + fs::File::create(output.join(SENTINEL_INDEXED))?; + + let idx = KmerIndex::open(output)?; + idx.pack_matrices()?; + Ok(idx) + } + + /// Rewrite the genome columns of this index in-place according to `specs`. + /// + /// The MPHF and unitig files are unchanged; only data matrices are rewritten. + pub fn select_in_place( + &mut self, + specs: &[OutputCol], + threshold: u32, + output_presence: bool, + ) -> OKIResult<()> { + if self.state() != IndexState::Indexed { + return Err(OKIError::NotIndexed(self.root_path.clone())); + } + + let n_src_genomes = self.meta.genomes.len(); + let n_partitions = self.partition.n_partitions(); + + // Open a second handle to the same path so we can borrow src and dst simultaneously. + let src_partition = KmerPartition::open_with_config( + &self.root_path, + self.meta.config.kmer_size, + self.meta.config.minimizer_size, + self.meta.config.n_bits, + )?; + + info!( + "select (in-place): {} partition(s), {} source genome(s) → {} output column(s)", + n_partitions, n_src_genomes, specs.len(), + ); + + let t = Stage::start("select"); + let pb = progress_bar("select", n_partitions as u64, "partitions"); + + let errors: Vec = (0..n_partitions) + .into_par_iter() + .filter_map(|i| { + let result = self.partition.select_partition( + &src_partition, i, specs, + n_src_genomes, threshold, output_presence, + true, + ); + pb.inc(1); + result.err() + }) + .collect(); + + pb.finish_and_clear(); + + if let Some(e) = errors.into_iter().next() { + return Err(OKIError::Partition(e)); + } + + let _ = t.stop(); + + // Update index.meta with new genome list and with_counts flag. + self.meta.config.with_counts = !output_presence; + self.meta.genomes = specs.iter() + .map(|s| GenomeInfo::new(s.label.clone())) + .collect(); + self.meta.write(&self.root_path)?; + + self.pack_matrices()?; + Ok(()) + } +} diff --git a/src/obikmer/src/cmd/rebuild.rs b/src/obikmer/src/cmd/filter.rs similarity index 84% rename from src/obikmer/src/cmd/rebuild.rs rename to src/obikmer/src/cmd/filter.rs index 594d5f4..993f80a 100644 --- a/src/obikmer/src/cmd/rebuild.rs +++ b/src/obikmer/src/cmd/filter.rs @@ -6,10 +6,10 @@ use obikpartitionner::filter::{MaxTotalCount, MinTotalCount}; use obisys::Reporter; use tracing::info; -use super::predicate::FilterArgs; +use super::predicate::FilterArgs as KmerFilterArgs; #[derive(Args)] -pub struct RebuildArgs { +pub struct FilterArgs { /// Source index directory pub source: PathBuf, @@ -18,7 +18,7 @@ pub struct RebuildArgs { pub output: PathBuf, #[command(flatten)] - pub filter: FilterArgs, + pub filter: KmerFilterArgs, /// Minimum total count across all genomes (count index only) #[arg(long)] @@ -37,7 +37,7 @@ pub struct RebuildArgs { pub force: bool, } -pub fn run(args: RebuildArgs) { +pub fn run(args: FilterArgs) { let src = KmerIndex::open(&args.source).unwrap_or_else(|e| { eprintln!("error opening source index: {e}"); std::process::exit(1); @@ -50,7 +50,7 @@ pub fn run(args: RebuildArgs) { }; info!( - "rebuild: {} genome(s), mode={:?}, source={}", + "filter: {} genome(s), mode={:?}, source={}", &src.meta().genomes.len(), mode, args.source.display() ); @@ -66,10 +66,10 @@ pub fn run(args: RebuildArgs) { let mut rep = Reporter::new(); KmerIndex::rebuild(&args.output, &src, &filters, mode, args.force, &mut rep) .unwrap_or_else(|e| { - eprintln!("error rebuilding index: {e}"); + eprintln!("error filtering index: {e}"); std::process::exit(1); }); rep.print(); - info!("rebuilt index → {}", args.output.display()); + info!("filtered index → {}", args.output.display()); } diff --git a/src/obikmer/src/cmd/mod.rs b/src/obikmer/src/cmd/mod.rs index 27048f8..eba674b 100644 --- a/src/obikmer/src/cmd/mod.rs +++ b/src/obikmer/src/cmd/mod.rs @@ -1,6 +1,8 @@ pub mod annotate; +pub mod filter; pub mod pack; pub(crate) mod predicate; +pub mod select; pub mod utils; pub mod distance; pub mod dump; @@ -8,7 +10,6 @@ pub mod estimate; pub mod index; pub mod merge; pub mod query; -pub mod rebuild; pub mod reindex; pub mod superkmer; pub mod unitig; diff --git a/src/obikmer/src/cmd/predicate.rs b/src/obikmer/src/cmd/predicate.rs index 3db6254..04678f0 100644 --- a/src/obikmer/src/cmd/predicate.rs +++ b/src/obikmer/src/cmd/predicate.rs @@ -230,6 +230,16 @@ impl FilterArgs { } } +/// Returns indices of genomes matching `pred_str` (single predicate). +pub fn matching_genome_indices(pred_str: &str, genomes: &[GenomeInfo]) -> Result, String> { + let pred = MetaPred::parse(pred_str)?; + Ok(genomes.iter().enumerate() + .filter_map(|(i, g)| { + if pred.eval(&g.meta) == Some(true) { Some(i) } else { std::option::Option::None } + }) + .collect()) +} + pub struct GroupFilterParams { pub threshold: u32, pub min_count: Option, diff --git a/src/obikmer/src/cmd/select.rs b/src/obikmer/src/cmd/select.rs new file mode 100644 index 0000000..e021b36 --- /dev/null +++ b/src/obikmer/src/cmd/select.rs @@ -0,0 +1,248 @@ +use std::collections::{BTreeMap, HashMap}; +use std::path::PathBuf; + +use clap::{Args, ValueEnum}; +use obikindex::{GenomeInfo, KmerIndex}; +use obikpartitionner::{AggOp, OutputCol}; +use tracing::info; + +use super::predicate::matching_genome_indices; + +// ── CLI types ───────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] +pub enum AggOpArg { + Any, + All, + None, + Sum, + Min, + Max, +} + +impl From for AggOp { + fn from(a: AggOpArg) -> Self { + match a { + AggOpArg::Any => AggOp::Any, + AggOpArg::All => AggOp::All, + AggOpArg::None => AggOp::None, + AggOpArg::Sum => AggOp::Sum, + AggOpArg::Min => AggOp::Min, + AggOpArg::Max => AggOp::Max, + } + } +} + +#[derive(Args)] +pub struct SelectArgs { + /// Source index directory + pub source: PathBuf, + + /// Output index directory (mutually exclusive with --in-place) + #[arg(long, conflicts_with = "in_place")] + pub output: Option, + + /// Rewrite the source index in-place (mutually exclusive with --output) + #[arg(long)] + pub in_place: bool, + + /// Define a named group: `:` (repeatable; mutually exclusive with --aggregate-by) + #[arg(long, value_name = "NAME:PRED", conflicts_with = "aggregate_by")] + pub group: Vec, + + /// Per-group aggregation operator: `:` (repeatable) + #[arg(long, value_name = "NAME:OP")] + pub group_op: Vec, + + /// Auto-create one group per unique value of metadata key + #[arg(long, value_name = "KEY", conflicts_with = "group")] + pub aggregate_by: Option, + + /// Aggregation operator for all auto-generated groups + #[arg(long, value_name = "OP")] + pub aggregate_op: Option, + + /// Output columns in order: group names or genome labels, comma-separated + #[arg(long, value_name = "COL,...", value_delimiter = ',')] + pub select: Option>, + + /// Minimum count to consider a genome as "carrying" the k-mer (logical ops only) + #[arg(long, default_value = "0")] + pub presence_threshold: u32, + + /// Overwrite existing output directory + #[arg(short, long)] + pub force: bool, +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn parse_name_value(s: &str, flag: &str) -> (String, String) { + match s.find(':') { + Some(pos) => (s[..pos].trim().to_string(), s[pos + 1..].to_string()), + std::option::Option::None => { + eprintln!("error in {flag}: expected :, got: {s}"); + std::process::exit(1); + } + } +} + +fn parse_agg_op(s: &str) -> AggOp { + match s.to_lowercase().as_str() { + "any" => AggOp::Any, + "all" => AggOp::All, + "none" => AggOp::None, + "sum" => AggOp::Sum, + "min" => AggOp::Min, + "max" => AggOp::Max, + other => { + eprintln!("unknown aggregation operator: {other}; valid: any, all, none, sum, min, max"); + std::process::exit(1); + } + } +} + +fn default_op(src_is_count: bool) -> AggOp { + if src_is_count { AggOp::Sum } else { AggOp::Any } +} + +// ── build_specs ─────────────────────────────────────────────────────────────── + +/// Resolve CLI arguments into an ordered list of `OutputCol`. +/// +/// Returns `(specs, output_presence)`. +fn build_specs( + args: &SelectArgs, + genomes: &[GenomeInfo], + src_is_count: bool, +) -> (Vec, bool) { + // ── 1. Build group_indices: name → Vec ──────────────────────────── + // Also keep insertion order for the default `--select *` case. + let mut group_order: Vec = Vec::new(); + let mut group_indices: HashMap> = HashMap::new(); + + if let Some(ref key) = args.aggregate_by { + // One group per unique value of `key`, in sorted order. + let mut value_to_indices: BTreeMap> = BTreeMap::new(); + for (i, g) in genomes.iter().enumerate() { + if let Some(v) = g.meta.get(key) { + value_to_indices.entry(v.clone()).or_default().push(i); + } + } + for (v, idxs) in value_to_indices { + group_order.push(v.clone()); + group_indices.insert(v, idxs); + } + } else { + for raw in &args.group { + let (name, pred) = parse_name_value(raw, "--group"); + let idxs = matching_genome_indices(&pred, genomes).unwrap_or_else(|e| { + eprintln!("error in --group {name}: {e}"); + std::process::exit(1); + }); + if !group_indices.contains_key(&name) { + group_order.push(name.clone()); + } + group_indices.insert(name, idxs); + } + } + + // ── 2. Build per-group ops ──────────────────────────────────────────────── + let global_op = args.aggregate_op.map(AggOp::from); + let mut group_op: HashMap = HashMap::new(); + for raw in &args.group_op { + let (name, op_str) = parse_name_value(raw, "--group-op"); + if !group_indices.contains_key(&name) { + eprintln!("--group-op references undefined group: {name}"); + std::process::exit(1); + } + group_op.insert(name, parse_agg_op(&op_str)); + } + + // ── 3. Genome label → index map for pass-through columns ───────────────── + let label_to_idx: HashMap<&str, usize> = genomes.iter().enumerate() + .map(|(i, g)| (g.label.as_str(), i)) + .collect(); + + // ── 4. Determine output column names ───────────────────────────────────── + let col_names: Vec = if let Some(ref sel) = args.select { + sel.clone() + } else if !group_order.is_empty() { + group_order.clone() + } else { + // Identity: all genomes in original order + genomes.iter().map(|g| g.label.clone()).collect() + }; + + // ── 5. Build OutputCol list ─────────────────────────────────────────────── + let mut specs: Vec = Vec::with_capacity(col_names.len()); + + for name in &col_names { + if let Some(idxs) = group_indices.get(name) { + let op = group_op.get(name) + .copied() + .or(global_op) + .unwrap_or_else(|| default_op(src_is_count)); + specs.push(OutputCol { label: name.clone(), indices: idxs.clone(), op }); + } else if let Some(&idx) = label_to_idx.get(name.as_str()) { + // Pass-through: single-element group with default op. + let op = default_op(src_is_count); + specs.push(OutputCol { label: name.clone(), indices: vec![idx], op }); + } else { + eprintln!("--select: unknown column '{name}' (not a group name or genome label)"); + std::process::exit(1); + } + } + + if specs.is_empty() { + eprintln!("select: no output columns defined"); + std::process::exit(1); + } + + // ── 6. Determine output type ────────────────────────────────────────────── + let output_presence = !src_is_count + || specs.iter().all(|s| s.op.is_logical()); + + (specs, output_presence) +} + +// ── run ─────────────────────────────────────────────────────────────────────── + +pub fn run(args: SelectArgs) { + if !args.in_place && args.output.is_none() { + eprintln!("error: one of --output or --in-place must be specified"); + std::process::exit(1); + } + + let mut src = KmerIndex::open(&args.source).unwrap_or_else(|e| { + eprintln!("error opening source index: {e}"); + std::process::exit(1); + }); + + let src_is_count = src.meta().config.with_counts; + let (specs, output_presence) = build_specs(&args, &src.meta().genomes.clone(), src_is_count); + + info!( + "select: {} genome(s) → {} output column(s), output={}", + src.meta().genomes.len(), + specs.len(), + if output_presence { "presence" } else { "count" }, + ); + + if args.in_place { + src.select_in_place(&specs, args.presence_threshold, output_presence) + .unwrap_or_else(|e| { + eprintln!("select error: {e}"); + std::process::exit(1); + }); + info!("selected in-place → {}", args.source.display()); + } else { + let output = args.output.unwrap(); + KmerIndex::select(&output, &src, &specs, args.presence_threshold, output_presence, args.force) + .unwrap_or_else(|e| { + eprintln!("select error: {e}"); + std::process::exit(1); + }); + info!("selected index → {}", output.display()); + } +} diff --git a/src/obikmer/src/main.rs b/src/obikmer/src/main.rs index 47ff4c7..fdcf69c 100644 --- a/src/obikmer/src/main.rs +++ b/src/obikmer/src/main.rs @@ -20,8 +20,10 @@ enum Commands { Index(cmd::index::IndexArgs), /// Merge multiple built indexes into one Merge(cmd::merge::MergeArgs), - /// Filter and compact an existing index into a new single-layer index - Rebuild(cmd::rebuild::RebuildArgs), + /// Apply row-level selection (σ) to an index: retain only k-mers matching the predicates + Filter(cmd::filter::FilterArgs), + /// Project and/or aggregate genome columns into a new or in-place index + Select(cmd::select::SelectArgs), /// Query an index with sequences and annotate matches Query(cmd::query::QueryArgs), /// Dump all indexed kmers as CSV (kmer + per-genome counts or presence) @@ -65,7 +67,8 @@ fn main() { Commands::Index(args) => cmd::index::run(args), Commands::Merge(args) => cmd::merge::run(args), Commands::Dump(args) => cmd::dump::run(args), - Commands::Rebuild(args) => cmd::rebuild::run(args), + Commands::Filter(args) => cmd::filter::run(args), + Commands::Select(args) => cmd::select::run(args), Commands::Query(args) => cmd::query::run(args), Commands::Annotate(args) => cmd::annotate::run(args), Commands::Distance(args) => cmd::distance::run(args), diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index b152383..fe09264 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -7,7 +7,9 @@ mod merge_layer; mod partition; mod query_layer; mod rebuild_layer; +mod select_layer; pub use filter::{GroupQuorumFilter, KmerFilter, passes_all}; pub use merge_layer::MergeMode; pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; +pub use select_layer::{AggOp, OutputCol}; diff --git a/src/obikpartitionner/src/select_layer.rs b/src/obikpartitionner/src/select_layer.rs new file mode 100644 index 0000000..36286c0 --- /dev/null +++ b/src/obikpartitionner/src/select_layer.rs @@ -0,0 +1,287 @@ +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; + +use obicompactvec::{ + PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, + PersistentCompactIntMatrix, PersistentCompactIntMatrixBuilder, PersistentCompactIntVecBuilder, +}; +use obilayeredmap::meta::PartitionMeta; +use obilayeredmap::OLMError; +use obiskio::{SKError, SKResult}; + +use crate::partition::KmerPartition; + +const INDEX_SUBDIR: &str = "index"; + +// ── AggOp ───────────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AggOp { + Any, + All, + None, + Sum, + Min, + Max, +} + +impl AggOp { + pub fn is_logical(self) -> bool { + matches!(self, AggOp::Any | AggOp::All | AggOp::None) + } +} + +// ── OutputCol ───────────────────────────────────────────────────────────────── + +pub struct OutputCol { + pub label: String, + pub indices: Vec, + pub op: AggOp, +} + +// ── Aggregation ─────────────────────────────────────────────────────────────── + +#[inline] +fn aggregate(op: AggOp, indices: &[usize], src_row: &[u32], threshold: u32) -> u32 { + match op { + AggOp::Any => { + if indices.iter().any(|&i| src_row[i] > threshold) { 1 } else { 0 } + } + AggOp::All => { + if indices.is_empty() { return 0; } + if indices.iter().all(|&i| src_row[i] > threshold) { 1 } else { 0 } + } + AggOp::None => { + if indices.iter().all(|&i| src_row[i] <= threshold) { 1 } else { 0 } + } + AggOp::Sum => { + indices.iter().map(|&i| src_row[i]).fold(0u32, |a, b| a.saturating_add(b)) + } + AggOp::Min => indices.iter().map(|&i| src_row[i]).min().unwrap_or(0), + AggOp::Max => indices.iter().map(|&i| src_row[i]).max().unwrap_or(0), + } +} + +// ── ColBuilder ──────────────────────────────────────────────────────────────── + +enum ColBuilder { + Bit(PersistentBitVecBuilder), + Int(PersistentCompactIntVecBuilder), +} + +impl ColBuilder { + fn set_val(&mut self, slot: usize, value: u32) { + match self { + ColBuilder::Bit(b) => b.set(slot, value > 0), + ColBuilder::Int(b) => b.set(slot, value), + } + } + + fn close(self) -> SKResult<()> { + match self { + ColBuilder::Bit(b) => b.close().map_err(SKError::Io), + ColBuilder::Int(b) => b.close().map_err(SKError::Io), + } + } +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn olm_to_sk(e: OLMError) -> SKError { + match e { + OLMError::Io(e) => SKError::Io(e), + other => SKError::InvalidData { context: "select", detail: other.to_string() }, + } +} + +fn col_path_bit(dir: &Path, col: usize) -> PathBuf { + dir.join(format!("col_{col:06}.pbiv")) +} + +fn col_path_int(dir: &Path, col: usize) -> PathBuf { + dir.join(format!("col_{col:06}.pciv")) +} + +fn write_matrix_meta(dir: &Path, n: usize, n_cols: usize) -> io::Result<()> { + fs::write( + dir.join("meta.json"), + format!("{{\"n\":{n},\"n_cols\":{n_cols}}}\n"), + ) +} + +/// Copy all plain files (not subdirectories) from `src_dir` to `dst_dir`. +fn copy_layer_files(src_dir: &Path, dst_dir: &Path) -> io::Result<()> { + for entry in fs::read_dir(src_dir)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() { + fs::copy(&path, dst_dir.join(entry.file_name()))?; + } + } + Ok(()) +} + +// ── fill_builders ───────────────────────────────────────────────────────────── + +fn fill_builders( + builders: &mut [ColBuilder], + specs: &[OutputCol], + n: usize, + n_src: usize, + src_layer_dir: &Path, + src_is_count: bool, + threshold: u32, +) -> SKResult<()> { + let mut src_buf = vec![0u32; n_src]; + + if src_is_count { + let mat = PersistentCompactIntMatrix::open(src_layer_dir).map_err(SKError::Io)?; + for slot in 0..n { + mat.fill_row(slot, &mut src_buf); + for (col, spec) in specs.iter().enumerate() { + builders[col].set_val(slot, aggregate(spec.op, &spec.indices, &src_buf, threshold)); + } + } + } else { + let mat = PersistentBitMatrix::open(src_layer_dir).map_err(SKError::Io)?; + for slot in 0..n { + mat.fill_row(slot, &mut src_buf); + for (col, spec) in specs.iter().enumerate() { + builders[col].set_val(slot, aggregate(spec.op, &spec.indices, &src_buf, threshold)); + } + } + } + Ok(()) +} + +// ── KmerPartition::select_partition ────────────────────────────────────────── + +impl KmerPartition { + /// Rewrite the data matrices of partition `i` in `src` into `self`. + /// + /// `specs` defines the output columns (projection/aggregation). + /// `output_presence` — if true, all output builders use bit (0/1) format. + /// `in_place` — `self` and `src` share the same root; write to temp dirs then swap. + pub fn select_partition( + &self, + src: &KmerPartition, + i: usize, + specs: &[OutputCol], + n_src_genomes: usize, + threshold: u32, + output_presence: bool, + in_place: bool, + ) -> SKResult<()> { + let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); + if !src_index_dir.exists() { + return Ok(()); + } + + let src_meta = PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)?; + if src_meta.n_layers == 0 { + return Ok(()); + } + + let dst_index_dir = self.part_dir(i).join(INDEX_SUBDIR); + if !in_place { + fs::create_dir_all(&dst_index_dir)?; + } + + let n_out = specs.len(); + let data_subdir = if output_presence { "presence" } else { "counts" }; + + for l in 0..src_meta.n_layers { + let src_layer_dir = src_index_dir.join(format!("layer_{l}")); + if !src_layer_dir.exists() { continue; } + + let dst_layer_dir = dst_index_dir.join(format!("layer_{l}")); + + let counts_dir = src_layer_dir.join("counts"); + let presence_dir = src_layer_dir.join("presence"); + let src_is_count = counts_dir.exists() && !presence_dir.exists(); + + // Determine number of slots from the source matrix. + let n = if counts_dir.exists() { + PersistentCompactIntMatrix::open(&src_layer_dir).map_err(SKError::Io)?.n() + } else if presence_dir.exists() { + PersistentBitMatrix::open(&src_layer_dir).map_err(SKError::Io)?.n() + } else { + // Implicit single-genome layer: no data matrix needed in output either. + if !in_place { + fs::create_dir_all(&dst_layer_dir)?; + copy_layer_files(&src_layer_dir, &dst_layer_dir)?; + } + continue; + }; + + // Choose the output data directory (temp name for in-place). + let (dst_data_dir, final_data_dir) = if in_place { + let tmp = dst_layer_dir.join(format!("{data_subdir}_new")); + let perm = dst_layer_dir.join(data_subdir); + (tmp, perm) + } else { + let perm = dst_layer_dir.join(data_subdir); + (perm.clone(), perm) + }; + + if !in_place { + fs::create_dir_all(&dst_layer_dir)?; + copy_layer_files(&src_layer_dir, &dst_layer_dir)?; + } + fs::create_dir_all(&dst_data_dir)?; + + // Initialise packed-format skeleton. + if output_presence { + PersistentBitMatrixBuilder::new(n, &dst_data_dir) + .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + } else { + PersistentCompactIntMatrixBuilder::new(n, &dst_data_dir) + .map_err(SKError::Io)?.close().map_err(SKError::Io)?; + } + + // Create column builders. + let mut builders: Vec = (0..n_out) + .map(|col| -> SKResult { + if output_presence { + Ok(ColBuilder::Bit(PersistentBitVecBuilder::new( + n, &col_path_bit(&dst_data_dir, col), + )?)) + } else { + Ok(ColBuilder::Int(PersistentCompactIntVecBuilder::new( + n, &col_path_int(&dst_data_dir, col), + )?)) + } + }) + .collect::>()?; + + fill_builders( + &mut builders, specs, n, n_src_genomes, + &src_layer_dir, src_is_count, threshold, + )?; + + for b in builders { b.close()?; } + write_matrix_meta(&dst_data_dir, n, n_out).map_err(SKError::Io)?; + + // In-place: swap old data dir for new. + if in_place { + let old_data_dir = if src_is_count { + dst_layer_dir.join("counts") + } else { + dst_layer_dir.join("presence") + }; + if old_data_dir.exists() { + fs::remove_dir_all(&old_data_dir)?; + } + fs::rename(&dst_data_dir, &final_data_dir)?; + } + } + + if !in_place { + PartitionMeta::load(&src_index_dir).map_err(olm_to_sk)? + .save(&dst_index_dir).map_err(olm_to_sk)?; + } + + Ok(()) + } +}