Files
obikmer/src/obikindex/src/index.rs
T
Eric Coissac 0f8f61d3dd feat: introduce genome metadata tracking and CSV export
This commit replaces raw string genome labels with a structured `GenomeInfo` type for better metadata tracking. It adds a `--meta` flag to the index command, and implements a new `annotate` CLI subcommand to import metadata from CSV files or export it via `--dump`. Distance and shared-count matrices are now serialized to CSV, with UPGMA clustering trees exported as Newick files. Query outputs now include per-genome k-mer match counts in JSON, while fixing syntax and variable naming issues in index merging and dump generation.
2026-05-22 09:35:20 +02:00

218 lines
7.3 KiB
Rust

use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use indicatif::{ProgressBar, ProgressStyle};
use obikpartitionner::{KmerPartition, KmerSpectrum};
use obisys::{Reporter, Stage};
use rayon::prelude::*;
use tracing::info;
use crate::error::{OKIError, OKIResult};
use crate::meta::{GenomeInfo, IndexConfig, IndexMeta};
use crate::state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED};
pub struct KmerIndex {
pub(crate) root_path: PathBuf,
pub(crate) meta: IndexMeta,
pub(crate) partition: KmerPartition,
}
impl KmerIndex {
/// Create a new index at `path`.
///
/// If `genome_info` is `Some`, it is stored immediately.
/// If `None`, the genome entry will be added when `mark_scattered` is called.
pub fn create<P: AsRef<Path>>(
path: P,
config: IndexConfig,
genome_info: Option<GenomeInfo>,
force: bool,
) -> OKIResult<Self> {
let root_path = path.as_ref().to_owned();
let partition = KmerPartition::create(
&root_path,
config.n_bits,
config.kmer_size,
config.minimizer_size,
force,
)?;
let mut meta = IndexMeta::new(config);
if let Some(info) = genome_info {
meta.genomes.push(info);
}
meta.write(&root_path)?;
Ok(Self { root_path, meta, partition })
}
pub fn open<P: AsRef<Path>>(path: P) -> OKIResult<Self> {
let root_path = path.as_ref().to_owned();
let meta = IndexMeta::read(&root_path).map_err(OKIError::Io)?;
let partition = KmerPartition::open_with_config(
&root_path,
meta.config.kmer_size,
meta.config.minimizer_size,
meta.config.n_bits,
)?;
Ok(Self { root_path, meta, partition })
}
/// Return `true` if `path` contains an `index.meta` file.
pub fn exists<P: AsRef<Path>>(path: P) -> bool {
IndexMeta::exists(path.as_ref())
}
/// Current construction state, as reported by sentinel files on disk.
pub fn state(&self) -> IndexState {
IndexState::detect(&self.root_path).unwrap_or(IndexState::Empty)
}
pub fn meta(&self) -> &IndexMeta { &self.meta }
pub fn meta_mut(&mut self) -> &mut IndexMeta { &mut self.meta }
pub fn kmer_size(&self) -> usize { self.meta.config.kmer_size }
pub fn minimizer_size(&self) -> usize { self.meta.config.minimizer_size }
pub fn n_partitions(&self) -> usize { self.partition.n_partitions() }
/// Expose the inner partition so the caller can run scatter into it.
/// Call `mark_scattered` once scatter is complete.
pub fn partition_mut(&mut self) -> &mut KmerPartition {
&mut self.partition
}
/// Mark scatter as complete and write `scatter.done`.
///
/// If no genome label was set at creation time, one is derived from
/// the index root directory name (stripped of all extensions).
pub fn mark_scattered(&mut self) -> OKIResult<()> {
if self.meta.genomes.is_empty() {
let label = label_from_path(&self.root_path);
self.meta.genomes.push(GenomeInfo::new(label));
self.meta.write(&self.root_path)?;
}
touch(&self.root_path.join(SENTINEL_SCATTERED))?;
Ok(())
}
/// Dereplicate all partitions then compute kmer counts.
///
/// Writes `spectrums/{label}.json` and touches `count.done` upon completion.
/// Per-partition spectrum files are removed unless `keep_intermediate` is true.
pub fn dereplicate_and_count(&self, keep_intermediate: bool, rep: &mut Reporter) -> OKIResult<()> {
let t = Stage::start("dereplicate");
self.partition.dereplicate()?;
rep.push(t.stop());
let t = Stage::start("count_kmer");
let spectrum = self.partition.count_kmer(keep_intermediate)?;
rep.push(t.stop());
self.write_spectrum(&spectrum)?;
touch(&self.root_path.join(SENTINEL_COUNTED))?;
Ok(())
}
fn write_spectrum(&self, sp: &KmerSpectrum) -> OKIResult<()> {
let label = self.meta.genomes.first().map(|g| g.label.as_str()).unwrap_or("unknown");
let spectrums_dir = self.root_path.join("spectrums");
fs::create_dir_all(&spectrums_dir)?;
let path = spectrums_dir.join(format!("{label}.json"));
let spectrum_map: BTreeMap<String, u64> = sp.counts
.iter()
.map(|(&c, &f)| (format!("{c:010}"), f))
.collect();
let f = fs::File::create(&path)?;
serde_json::to_writer_pretty(
f,
&serde_json::json!({ "f0": sp.f0, "f1": sp.f1, "spectrum": spectrum_map }),
)
.map_err(OKIError::Json)?;
Ok(())
}
/// Build the layered MPHF index for all partitions in parallel.
///
/// Writes `index.done` upon completion.
pub fn build_layers(
&self,
min_ab: u32,
max_ab: Option<u32>,
keep_intermediate: bool,
rep: &mut Reporter,
) -> OKIResult<()> {
let n = self.partition.n_partitions();
let t = Stage::start("index");
let with_counts = self.meta.config.with_counts;
let total_kmers = AtomicUsize::new(0);
let pb = Arc::new(Mutex::new(
ProgressBar::new(n as u64).with_style(
ProgressStyle::with_template("index — [{bar:20}] {pos}/{len} | {msg}").unwrap(),
),
));
(0..n).into_par_iter().for_each(|i| {
match self.partition.build_index_layer(i, min_ab, max_ab, with_counts) {
Ok(0) => {}
Ok(n_kmers) => {
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
let pb = pb.lock().unwrap();
pb.inc(1);
pb.set_message(format!("{i}: {n_kmers} kmers"));
}
Err(e) => {
eprintln!("error building layer for partition {i}: {e}");
std::process::exit(1);
}
}
});
pb.lock().unwrap().finish_and_clear();
info!(
"done — {} total kmers indexed",
total_kmers.load(Ordering::Relaxed)
);
if !keep_intermediate {
for i in 0..n {
self.partition.remove_build_artifacts(i);
}
}
touch(&self.root_path.join(SENTINEL_INDEXED))?;
rep.push(t.stop());
Ok(())
}
/// Borrow the inner partition for direct superkmer-level queries.
pub fn partition(&self) -> &KmerPartition {
&self.partition
}
/// Path to the unitigs file for partition `part`, layer `layer`.
pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf {
self.partition.part_dir(part)
.join("index")
.join(format!("layer_{layer}"))
.join("unitigs.bin")
}
}
fn label_from_path(path: &Path) -> String {
let name = path
.file_name()
.unwrap_or(path.as_os_str())
.to_string_lossy()
.into_owned();
let mut s = name;
while let Some(pos) = s.rfind('.') {
s.truncate(pos);
}
if s.is_empty() { "unknown".to_string() } else { s }
}
fn touch(path: &Path) -> Result<(), std::io::Error> {
fs::File::create(path).map(|_| ())
}