Files
obikmer/src/obikindex/src/index.rs
T
Eric Coissac b6fcbc545f refactor: replace rayon with NUMA-aware PartitionRunner
Replaces `rayon` parallel iteration across index, rebuild, reindex, and select modules with a custom `PartitionRunner`. This introduces NUMA-aware task distribution with CPU pinning and round-robin scheduling, eliminating `Arc`, `Mutex`, and atomic synchronization primitives in favor of a flat, pre-spawned worker architecture. Error handling is simplified via `.map_err()` and the `?` operator, while progress bar updates are decoupled into dedicated callbacks.
2026-06-15 18:53:31 +02:00

287 lines
10 KiB
Rust

use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use obikpartitionner::{KmerPartition, KmerSpectrum};
use obilayeredmap;
use obisys::{Reporter, Stage, progress_bar};
use rayon::prelude::*;
use tracing::info;
use obikseq::{set_k, set_m};
use crate::error::{OKIError, OKIResult};
use crate::meta::{GenomeInfo, IndexConfig, IndexMeta};
use crate::state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED};
/// Kmer index stored on disk under `root_path`.
///
/// Bundles the index metadata ([`IndexMeta`], persisted in the index root) with
/// the [`KmerPartition`] that holds the per-partition data.
pub struct KmerIndex {
    /// Root directory of the index on disk.
    pub(crate) root_path: PathBuf,
    /// In-memory copy of the index metadata (configuration and genome list).
    pub(crate) meta: IndexMeta,
    /// Partitioned kmer storage created/opened at `root_path`.
    pub(crate) partition: KmerPartition,
}
impl KmerIndex {
    /// Create a new index at `path`.
    ///
    /// Creates the partition layout on disk (`force` is forwarded to
    /// `KmerPartition::create`), calls `set_k`/`set_m` with the configured
    /// sizes, then writes the index metadata.
    ///
    /// If `genome_info` is `Some`, it is stored immediately.
    /// If `None`, the genome entry will be added when `mark_scattered` is called.
    ///
    /// # Errors
    /// Fails if the partition cannot be created or the metadata cannot be written.
    pub fn create<P: AsRef<Path>>(
        path: P,
        config: IndexConfig,
        genome_info: Option<GenomeInfo>,
        force: bool,
    ) -> OKIResult<Self> {
        let root_path = path.as_ref().to_owned();
        let partition = KmerPartition::create(
            &root_path,
            config.n_bits,
            config.kmer_size,
            config.minimizer_size,
            force,
        )?;
        set_k(config.kmer_size);
        set_m(config.minimizer_size);
        let mut meta = IndexMeta::new(config);
        if let Some(info) = genome_info {
            meta.genomes.push(info);
        }
        meta.write(&root_path)?;
        Ok(Self { root_path, meta, partition })
    }

    /// Open an existing index rooted at `path`.
    ///
    /// Reads the stored metadata, calls `set_k`/`set_m` with the stored sizes,
    /// then opens the partition with the stored configuration.
    ///
    /// # Errors
    /// Fails if the metadata cannot be read (reported as `OKIError::Io`) or the
    /// partition cannot be opened.
    pub fn open<P: AsRef<Path>>(path: P) -> OKIResult<Self> {
        let root_path = path.as_ref().to_owned();
        let meta = IndexMeta::read(&root_path).map_err(OKIError::Io)?;
        set_k(meta.config.kmer_size);
        set_m(meta.config.minimizer_size);
        let partition = KmerPartition::open_with_config(
            &root_path,
            meta.config.kmer_size,
            meta.config.minimizer_size,
            meta.config.n_bits,
        )?;
        Ok(Self { root_path, meta, partition })
    }

    /// Return `true` if `path` contains an `index.meta` file.
    pub fn exists<P: AsRef<Path>>(path: P) -> bool {
        IndexMeta::exists(path.as_ref())
    }

    /// Current construction state, as reported by sentinel files on disk.
    ///
    /// Detection failures are treated as `IndexState::Empty`.
    pub fn state(&self) -> IndexState {
        IndexState::detect(&self.root_path).unwrap_or(IndexState::Empty)
    }

    /// Borrow the in-memory index metadata.
    pub fn meta(&self) -> &IndexMeta { &self.meta }

    /// Mutably borrow the in-memory index metadata (not written back automatically).
    pub fn meta_mut(&mut self) -> &mut IndexMeta { &mut self.meta }

    /// Kmer length from the index configuration.
    pub fn kmer_size(&self) -> usize { self.meta.config.kmer_size }

    /// Minimizer length from the index configuration.
    pub fn minimizer_size(&self) -> usize { self.meta.config.minimizer_size }

    /// Number of partitions in the underlying `KmerPartition`.
    pub fn n_partitions(&self) -> usize { self.partition.n_partitions() }

    /// Expose the inner partition so the caller can run scatter into it.
    /// Call `mark_scattered` once scatter is complete.
    pub fn partition_mut(&mut self) -> &mut KmerPartition {
        &mut self.partition
    }

    /// Mark scatter as complete and write `scatter.done`.
    ///
    /// If no genome label was set at creation time, one is derived from
    /// the index root directory name (stripped of all extensions) and the
    /// metadata is rewritten before the sentinel is touched.
    pub fn mark_scattered(&mut self) -> OKIResult<()> {
        if self.meta.genomes.is_empty() {
            let label = label_from_path(&self.root_path);
            self.meta.genomes.push(GenomeInfo::new(label));
            self.meta.write(&self.root_path)?;
        }
        touch(&self.root_path.join(SENTINEL_SCATTERED))?;
        Ok(())
    }

    /// Dereplicate all partitions then compute kmer counts.
    ///
    /// Writes `spectrums/{label}.json` and touches `count.done` upon completion.
    /// Per-partition spectrum files are removed unless `keep_intermediate` is true.
    /// Each of the two phases is timed and pushed onto `rep`.
    pub fn dereplicate_and_count(&self, keep_intermediate: bool, rep: &mut Reporter) -> OKIResult<()> {
        let t = Stage::start("dereplicate");
        self.partition.dereplicate()?;
        rep.push(t.stop());
        let t = Stage::start("count_kmer");
        let spectrum = self.partition.count_kmer(keep_intermediate)?;
        rep.push(t.stop());
        self.write_spectrum(&spectrum)?;
        touch(&self.root_path.join(SENTINEL_COUNTED))?;
        Ok(())
    }

    /// Serialize `sp` as pretty JSON to `spectrums/{label}.json`.
    ///
    /// `label` is the first genome's label, or `"unknown"` if none is set.
    /// Spectrum keys are zero-padded to 10 digits so the `BTreeMap` (and thus
    /// the JSON object) is ordered numerically.
    fn write_spectrum(&self, sp: &KmerSpectrum) -> OKIResult<()> {
        use std::io::{BufWriter, Write};

        let label = self.meta.genomes.first().map(|g| g.label.as_str()).unwrap_or("unknown");
        let spectrums_dir = self.root_path.join("spectrums");
        fs::create_dir_all(&spectrums_dir)?;
        let path = spectrums_dir.join(format!("{label}.json"));
        let spectrum_map: BTreeMap<String, u64> = sp.counts
            .iter()
            .map(|(&c, &f)| (format!("{c:010}"), f))
            .collect();
        // serde_json emits many small writes; buffer them instead of issuing
        // one syscall per token.
        let mut writer = BufWriter::new(fs::File::create(&path)?);
        serde_json::to_writer_pretty(
            &mut writer,
            &serde_json::json!({ "f0": sp.f0, "f1": sp.f1, "spectrum": spectrum_map }),
        )
        .map_err(OKIError::Json)?;
        // Flush explicitly: dropping a BufWriter silently discards write errors.
        writer.flush()?;
        Ok(())
    }

    /// Build the layered MPHF index for all partitions in parallel.
    ///
    /// Partitions are dispatched through `crate::numa::PartitionRunner`; the
    /// completion callback accumulates the kmer total and advances the progress
    /// bar for every finished partition. Build artifacts are removed afterwards
    /// unless `keep_intermediate` is true.
    ///
    /// Writes `index.done` upon completion.
    pub fn build_layers(
        &self,
        min_ab: u32,
        max_ab: Option<u32>,
        keep_intermediate: bool,
        rep: &mut Reporter,
    ) -> OKIResult<()> {
        let n = self.partition.n_partitions();
        let t = Stage::start("index");
        let with_counts = self.meta.config.with_counts;
        let evidence = self.meta.config.evidence.clone();
        let block_bits = self.meta.config.block_bits;
        let mut total_kmers: usize = 0;
        let pb = progress_bar("index", n as u64, "partitions");
        let order: Vec<usize> = (0..n).collect();
        let runner = crate::numa::PartitionRunner::new();
        runner.run(
            &order,
            |i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits),
            |i, n_kmers, _| {
                total_kmers += n_kmers;
                // Always advance: empty partitions are still finished partitions,
                // otherwise the bar never reaches `n`.
                pb.inc(1);
                if n_kmers > 0 {
                    pb.set_message(format!("{i}: {n_kmers} kmers"));
                }
            },
        ).map_err(OKIError::Partition)?;
        pb.finish_and_clear();
        info!("done — {} total kmers indexed", total_kmers);
        if !keep_intermediate {
            for i in 0..n {
                self.partition.remove_build_artifacts(i);
            }
        }
        touch(&self.root_path.join(SENTINEL_INDEXED))?;
        rep.push(t.stop());
        Ok(())
    }

    /// Borrow the inner partition for direct superkmer-level queries.
    pub fn partition(&self) -> &KmerPartition {
        &self.partition
    }

    /// Path to the unitigs file for partition `part`, layer `layer`.
    pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf {
        self.partition.part_dir(part)
            .join("index")
            .join(format!("layer_{layer}"))
            .join("unitigs.bin")
    }

    /// Pack all partition matrices into single-file format (presence → .pbmx, counts → .pcmx).
    ///
    /// Reduces per-query file-open overhead from O(n_genomes) to O(1) per partition.
    /// Column files are kept in place; packed files take priority when opening.
    /// Partitions without an `index` directory are skipped.
    pub fn pack_matrices(&self) -> OKIResult<()> {
        use obicompactvec::{pack_bit_matrix, pack_compact_int_matrix};
        use obilayeredmap::meta::PartitionMeta;
        let n = self.n_partitions();
        let order: Vec<usize> = (0..n).collect();
        let pb = progress_bar("pack", n as u64, "partitions");
        crate::numa::PartitionRunner::new().run(
            &order,
            |i| -> OKIResult<()> {
                let index_dir = self.partition.part_dir(i).join("index");
                if !index_dir.exists() { return Ok(()); }
                let meta = PartitionMeta::load(&index_dir).map_err(other_io_error)?;
                for l in 0..meta.n_layers {
                    let layer_dir = index_dir.join(format!("layer_{l}"));
                    let presence_dir = layer_dir.join("presence");
                    let counts_dir = layer_dir.join("counts");
                    if presence_dir.exists() { pack_bit_matrix(&presence_dir).map_err(OKIError::Io)?; }
                    if counts_dir.exists() { pack_compact_int_matrix(&counts_dir).map_err(OKIError::Io)?; }
                }
                Ok(())
            },
            |_, _, _| { pb.inc(1); },
        )?;
        pb.finish_and_clear();
        Ok(())
    }

    /// Write a `layer_meta.json` in any layer directory that is missing one.
    ///
    /// Old indexes were built before this file was required. The number of
    /// kmers is recovered from `unitigs.bin`, which is always present.
    /// Partitions are processed with `crate::numa::PartitionRunner`, like
    /// `pack_matrices`; the first error encountered is returned.
    pub fn upgrade_layer_meta(&self) -> OKIResult<()> {
        use obicompactvec::LayerMeta;
        use obiskio::UnitigFileReader;
        use obilayeredmap::meta::PartitionMeta;
        let order: Vec<usize> = (0..self.n_partitions()).collect();
        crate::numa::PartitionRunner::new().run(
            &order,
            |i| -> OKIResult<()> {
                let index_dir = self.partition.part_dir(i).join("index");
                if !index_dir.exists() { return Ok(()); }
                let meta = PartitionMeta::load(&index_dir).map_err(other_io_error)?;
                for l in 0..meta.n_layers {
                    let layer_dir = index_dir.join(format!("layer_{l}"));
                    if layer_dir.join(LayerMeta::FILENAME).exists() { continue; }
                    let unitigs_path = layer_dir.join("unitigs.bin");
                    let n_kmers = UnitigFileReader::open_sequential(&unitigs_path)
                        .map_err(OKIError::Partition)?
                        .n_kmers();
                    LayerMeta::save(&layer_dir, n_kmers).map_err(OKIError::Io)?;
                }
                Ok(())
            },
            |_, _, _| {},
        )?;
        Ok(())
    }
}

/// Wrap any displayable error as an `OKIError::Io` of kind `Other`.
///
/// Used for errors (e.g. from `PartitionMeta::load`) that have no dedicated
/// `OKIError` variant; only their message is preserved.
fn other_io_error<E: std::fmt::Display>(e: E) -> OKIError {
    OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
}
/// Derive a genome label from the last component of `path`.
///
/// Everything from the first `.` onward is dropped, so `genome.fasta.gz`
/// becomes `genome`. When `path` has no final component (a root, or a path
/// ending in `..`), the file name of the canonicalized path is tried instead.
/// An empty result (e.g. `.hidden`, `/`) yields `"unknown"`.
///
/// The label is later used as a file name (`spectrums/{label}.json`), so it is
/// always taken from a single path component and never contains separators.
fn label_from_path(path: &Path) -> String {
    let name = match path.file_name() {
        Some(n) => n.to_string_lossy().into_owned(),
        // Best effort: resolve `..` / `.` via the filesystem; give up quietly.
        None => path
            .canonicalize()
            .ok()
            .and_then(|p| p.file_name().map(|n| n.to_string_lossy().into_owned()))
            .unwrap_or_default(),
    };
    // Keep only the part before the first '.', i.e. strip every extension.
    let stem = name.split('.').next().unwrap_or("");
    if stem.is_empty() { "unknown".to_string() } else { stem.to_string() }
}
/// Create `path` as an empty file, truncating it if it already exists.
///
/// Used to drop sentinel files marking completed construction stages.
fn touch(path: &Path) -> Result<(), std::io::Error> {
    let _sentinel = fs::File::create(path)?;
    Ok(())
}