feat: introduce layered MPHF indexing and partition metadata

Refactors obikindex and obikpartitionner to delegate index construction to a new layered MPHF implementation. Adds resume-safe building with abundance filtering and count persistence, while introducing a PartitionMeta struct for JSON configuration persistence. Updates OKIError to wrap layer-specific errors, replaces single-path extraction with full path collection and logging, and registers new internal dependencies across the workspace.
This commit is contained in:
Eric Coissac
2026-05-20 21:01:16 +02:00
parent 17c9e076bd
commit c5bcb7b8fa
10 changed files with 193 additions and 229 deletions
+1 -8
View File
@@ -5,15 +5,8 @@ edition = "2024"
[dependencies]
obikpartitionner = { path = "../obikpartitionner" }
obikseq = { path = "../obikseq" }
obisys = { path = "../obisys" }
obiskio = { path = "../obiskio" }
obidebruinj = { path = "../obidebruinj" }
obilayeredmap = { path = "../obilayeredmap" }
obicompactvec = { path = "../obicompactvec" }
cacheline-ef = "1.1"
epserde = "0.8"
ptr_hash = "1.1"
obisys = { path = "../obisys" }
rayon = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
-8
View File
@@ -2,14 +2,12 @@ use std::fmt;
use std::io;
use obiskio::SKError;
use obilayeredmap::OLMError;
#[derive(Debug)]
pub enum OKIError {
Io(io::Error),
Json(serde_json::Error),
Partition(SKError),
Layer(OLMError),
}
pub type OKIResult<T> = Result<T, OKIError>;
@@ -20,7 +18,6 @@ impl fmt::Display for OKIError {
OKIError::Io(e) => write!(f, "I/O error: {e}"),
OKIError::Json(e) => write!(f, "JSON error: {e}"),
OKIError::Partition(e) => write!(f, "partition error: {e}"),
OKIError::Layer(e) => write!(f, "layer error: {e}"),
}
}
}
@@ -31,7 +28,6 @@ impl std::error::Error for OKIError {
OKIError::Io(e) => Some(e),
OKIError::Json(e) => Some(e),
OKIError::Partition(e) => Some(e),
OKIError::Layer(e) => Some(e),
}
}
}
@@ -47,7 +43,3 @@ impl From<serde_json::Error> for OKIError {
impl From<SKError> for OKIError {
fn from(e: SKError) -> Self { OKIError::Partition(e) }
}
impl From<OLMError> for OKIError {
fn from(e: OLMError) -> Self { OKIError::Layer(e) }
}
+30 -141
View File
@@ -3,16 +3,9 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use cacheline_ef::{CachelineEf, CachelineEfVec};
use epserde::prelude::*;
use indicatif::{ProgressBar, ProgressStyle};
use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec};
use obidebruinj::GraphDeBruijn;
use obikpartitionner::KmerPartition;
use obilayeredmap::layer::Layer;
use obiskio::{SKFileMeta, SKFileReader};
use obisys::{Reporter, Stage};
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
use rayon::prelude::*;
use tracing::info;
@@ -20,8 +13,6 @@ use crate::error::{OKIError, OKIResult};
use crate::meta::{IndexConfig, IndexMeta};
use crate::state::{IndexState, SENTINEL_INDEXED, SENTINEL_SCATTERED};
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
pub struct KmerIndex {
root_path: PathBuf,
meta: IndexMeta,
@@ -59,7 +50,12 @@ impl KmerIndex {
pub fn open<P: AsRef<Path>>(path: P) -> OKIResult<Self> {
let root_path = path.as_ref().to_owned();
let meta = IndexMeta::read(&root_path).map_err(OKIError::Io)?;
let partition = KmerPartition::open(&root_path)?;
let partition = KmerPartition::open_with_config(
&root_path,
meta.config.kmer_size,
meta.config.minimizer_size,
meta.config.n_bits,
)?;
Ok(Self { root_path, meta, partition })
}
@@ -87,13 +83,10 @@ impl KmerIndex {
/// Mark scatter as complete and write `scatter.done`.
///
/// If no genome label was set at creation time, one is derived from
/// `first_scatter_path` (filename stripped of all extensions).
/// If `first_scatter_path` is also `None`, the label defaults to `"unknown"`.
pub fn mark_scattered(&mut self, first_scatter_path: Option<&Path>) -> OKIResult<()> {
/// the index root directory name (stripped of all extensions).
pub fn mark_scattered(&mut self) -> OKIResult<()> {
if self.meta.genomes.is_empty() {
let label = first_scatter_path
.map(label_from_path)
.unwrap_or_else(|| "unknown".to_string());
let label = label_from_path(&self.root_path);
self.meta.genomes.push(label);
self.meta.write(&self.root_path)?;
}
@@ -116,20 +109,9 @@ impl KmerIndex {
Ok(())
}
/// Build the layered MPHF index for all partitions.
///
/// Default mode (`config.with_counts = false`): set membership only.
/// With counts: count matrix per kmer.
/// Build the layered MPHF index for all partitions in parallel.
///
/// Writes `index.done` upon completion.
/// Path to the unitigs file for partition `part`, layer `layer`.
pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf {
self.partition.part_dir(part)
.join("index")
.join(format!("layer_{layer}"))
.join("unitigs.bin")
}
pub fn build_layers(
&self,
min_ab: u32,
@@ -140,12 +122,8 @@ impl KmerIndex {
let n = self.partition.n_partitions();
let t = Stage::start("index");
let with_counts = self.meta.config.with_counts;
let filter_active = min_ab > 1 || max_ab.is_some();
let need_counts = filter_active || with_counts;
let total_kmers = AtomicUsize::new(0);
let partition = &self.partition;
let pb = Arc::new(Mutex::new(
ProgressBar::new(n as u64).with_style(
ProgressStyle::with_template("index — [{bar:20}] {pos}/{len} | {msg}").unwrap(),
@@ -153,101 +131,19 @@ impl KmerIndex {
));
(0..n).into_par_iter().for_each(|i| {
let part_dir = partition.part_dir(i);
let dedup_path = part_dir.join("dereplicated.skmer.zst");
if !dedup_path.exists() {
return;
}
let layer_dir = part_dir.join("index").join("layer_0");
if layer_dir.join("mphf.bin").exists() {
return;
}
let mphf1_opt: Option<Mphf> = if need_counts {
let p = part_dir.join("mphf1.bin");
p.exists().then(|| Mphf::load_full(&p).ok()).flatten()
} else {
None
};
let counts1_opt: Option<PersistentCompactIntVec> = if need_counts {
let p = part_dir.join("counts1.bin");
p.exists()
.then(|| PersistentCompactIntVec::open(&p).ok())
.flatten()
} else {
None
};
let mut g = GraphDeBruijn::new();
let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", dedup_path.display());
std::process::exit(1);
});
for sk in reader.iter() {
for kmer in sk.iter_canonical_kmers() {
let accept = if filter_active {
match (&mphf1_opt, &counts1_opt) {
(Some(mphf), Some(counts)) => {
let ab = counts.get(mphf.index(&kmer.raw()));
ab >= min_ab && max_ab.map_or(true, |max| ab <= max)
}
_ => true,
}
} else {
true
};
if accept {
g.push(kmer);
}
match self.partition.build_index_layer(i, min_ab, max_ab, with_counts) {
Ok(0) => {}
Ok(n_kmers) => {
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
let pb = pb.lock().unwrap();
pb.inc(1);
pb.set_message(format!("{i}: {n_kmers} kmers"));
}
Err(e) => {
eprintln!("error building layer for partition {i}: {e}");
std::process::exit(1);
}
}
let n_kmers = g.len();
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
g.compute_degrees();
fs::create_dir_all(&layer_dir).unwrap_or_else(|e| {
eprintln!("error creating {}: {e}", layer_dir.display());
std::process::exit(1);
});
let mut uw = Layer::<()>::unitig_writer(&layer_dir).unwrap_or_else(|e| {
eprintln!("error creating unitig writer (partition {i}): {e}");
std::process::exit(1);
});
for unitig in g.iter_unitig() {
uw.write(&unitig).unwrap_or_else(|e| {
eprintln!("error writing unitig (partition {i}): {e}");
std::process::exit(1);
});
}
uw.close().unwrap_or_else(|e| {
eprintln!("error closing unitig writer (partition {i}): {e}");
std::process::exit(1);
});
if with_counts {
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, |kmer| {
match (&mphf1_opt, &counts1_opt) {
(Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())),
_ => 1,
}
})
.unwrap_or_else(|e| {
eprintln!("error building count layer (partition {i}): {e}");
std::process::exit(1);
});
} else {
Layer::<()>::build(&layer_dir).unwrap_or_else(|e| {
eprintln!("error building set layer (partition {i}): {e}");
std::process::exit(1);
});
}
let pb = pb.lock().unwrap();
pb.inc(1);
pb.set_message(format!("{i}: {n_kmers} kmers"));
});
pb.lock().unwrap().finish_and_clear();
@@ -258,13 +154,7 @@ impl KmerIndex {
if !keep_intermediate {
for i in 0..n {
let part_dir = partition.part_dir(i);
remove_if_exists(&part_dir.join("dereplicated.skmer.zst"));
remove_if_exists(&SKFileMeta::sidecar_path(
&part_dir.join("dereplicated.skmer.zst"),
));
remove_if_exists(&part_dir.join("mphf1.bin"));
remove_if_exists(&part_dir.join("counts1.bin"));
self.partition.remove_build_artifacts(i);
}
}
@@ -272,9 +162,16 @@ impl KmerIndex {
rep.push(t.stop());
Ok(())
}
/// Path to the unitigs file for partition `part`, layer `layer`.
pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf {
self.partition.part_dir(part)
.join("index")
.join(format!("layer_{layer}"))
.join("unitigs.bin")
}
}
/// Derive a genome label from a file path: filename stripped of all extensions.
fn label_from_path(path: &Path) -> String {
let name = path
.file_name()
@@ -291,11 +188,3 @@ fn label_from_path(path: &Path) -> String {
fn touch(path: &Path) -> Result<(), std::io::Error> {
fs::File::create(path).map(|_| ())
}
fn remove_if_exists(path: &Path) {
if let Err(e) = fs::remove_file(path) {
if e.kind() != std::io::ErrorKind::NotFound {
eprintln!("warning: could not remove {}: {e}", path.display());
}
}
}