refactor: extract obikindex crate and remove deprecated CLI commands
Extracted core indexing logic, state tracking, and metadata management into a new `obikindex` crate. Refactored the `index` and `unitig` commands to leverage the `KmerIndex` abstraction and state-driven pipeline transitions. Removed obsolete CLI subcommands (`count`, `fasta`, `longtig`, `partition`) and their associated pipeline steps. Updated FASTA writing utilities for single-line output and deterministic identifiers, and refreshed workspace dependencies.
This commit is contained in:
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "obikindex"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
obikpartitionner = { path = "../obikpartitionner" }
|
||||
obikseq = { path = "../obikseq" }
|
||||
obisys = { path = "../obisys" }
|
||||
obiskio = { path = "../obiskio" }
|
||||
obidebruinj = { path = "../obidebruinj" }
|
||||
obilayeredmap = { path = "../obilayeredmap" }
|
||||
obicompactvec = { path = "../obicompactvec" }
|
||||
cacheline-ef = "1.1"
|
||||
epserde = "0.8"
|
||||
ptr_hash = "1.1"
|
||||
rayon = "1"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
indicatif = "0.17"
|
||||
tracing = "0.1.44"
|
||||
@@ -0,0 +1,53 @@
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
|
||||
use obiskio::SKError;
|
||||
use obilayeredmap::OLMError;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum OKIError {
|
||||
Io(io::Error),
|
||||
Json(serde_json::Error),
|
||||
Partition(SKError),
|
||||
Layer(OLMError),
|
||||
}
|
||||
|
||||
pub type OKIResult<T> = Result<T, OKIError>;
|
||||
|
||||
impl fmt::Display for OKIError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
OKIError::Io(e) => write!(f, "I/O error: {e}"),
|
||||
OKIError::Json(e) => write!(f, "JSON error: {e}"),
|
||||
OKIError::Partition(e) => write!(f, "partition error: {e}"),
|
||||
OKIError::Layer(e) => write!(f, "layer error: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for OKIError {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
match self {
|
||||
OKIError::Io(e) => Some(e),
|
||||
OKIError::Json(e) => Some(e),
|
||||
OKIError::Partition(e) => Some(e),
|
||||
OKIError::Layer(e) => Some(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<io::Error> for OKIError {
|
||||
fn from(e: io::Error) -> Self { OKIError::Io(e) }
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for OKIError {
|
||||
fn from(e: serde_json::Error) -> Self { OKIError::Json(e) }
|
||||
}
|
||||
|
||||
impl From<SKError> for OKIError {
|
||||
fn from(e: SKError) -> Self { OKIError::Partition(e) }
|
||||
}
|
||||
|
||||
impl From<OLMError> for OKIError {
|
||||
fn from(e: OLMError) -> Self { OKIError::Layer(e) }
|
||||
}
|
||||
@@ -0,0 +1,301 @@
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
||||
use epserde::prelude::*;
|
||||
use indicatif::{ProgressBar, ProgressStyle};
|
||||
use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec};
|
||||
use obidebruinj::GraphDeBruijn;
|
||||
use obikpartitionner::KmerPartition;
|
||||
use obilayeredmap::layer::Layer;
|
||||
use obiskio::{SKFileMeta, SKFileReader};
|
||||
use obisys::{Reporter, Stage};
|
||||
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
|
||||
use rayon::prelude::*;
|
||||
use tracing::info;
|
||||
|
||||
use crate::error::{OKIError, OKIResult};
|
||||
use crate::meta::{IndexConfig, IndexMeta};
|
||||
use crate::state::{IndexState, SENTINEL_INDEXED, SENTINEL_SCATTERED};
|
||||
|
||||
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
|
||||
|
||||
pub struct KmerIndex {
|
||||
root_path: PathBuf,
|
||||
meta: IndexMeta,
|
||||
partition: KmerPartition,
|
||||
}
|
||||
|
||||
impl KmerIndex {
|
||||
/// Create a new index at `path`.
|
||||
///
|
||||
/// If `genome_label` is `Some`, it is stored immediately.
|
||||
/// If `None`, the label will be derived from the first scatter input path
|
||||
/// when `mark_scattered` is called.
|
||||
pub fn create<P: AsRef<Path>>(
|
||||
path: P,
|
||||
config: IndexConfig,
|
||||
genome_label: Option<String>,
|
||||
force: bool,
|
||||
) -> OKIResult<Self> {
|
||||
let root_path = path.as_ref().to_owned();
|
||||
let partition = KmerPartition::create(
|
||||
&root_path,
|
||||
config.n_bits,
|
||||
config.kmer_size,
|
||||
config.minimizer_size,
|
||||
force,
|
||||
)?;
|
||||
let mut meta = IndexMeta::new(config);
|
||||
if let Some(label) = genome_label {
|
||||
meta.genomes.push(label);
|
||||
}
|
||||
meta.write(&root_path)?;
|
||||
Ok(Self { root_path, meta, partition })
|
||||
}
|
||||
|
||||
pub fn open<P: AsRef<Path>>(path: P) -> OKIResult<Self> {
|
||||
let root_path = path.as_ref().to_owned();
|
||||
let meta = IndexMeta::read(&root_path).map_err(OKIError::Io)?;
|
||||
let partition = KmerPartition::open(&root_path)?;
|
||||
Ok(Self { root_path, meta, partition })
|
||||
}
|
||||
|
||||
/// Return `true` if `path` contains an `index.meta` file.
|
||||
pub fn exists<P: AsRef<Path>>(path: P) -> bool {
|
||||
IndexMeta::exists(path.as_ref())
|
||||
}
|
||||
|
||||
/// Current construction state, as reported by sentinel files on disk.
|
||||
pub fn state(&self) -> IndexState {
|
||||
IndexState::detect(&self.root_path).unwrap_or(IndexState::Empty)
|
||||
}
|
||||
|
||||
pub fn meta(&self) -> &IndexMeta { &self.meta }
|
||||
pub fn kmer_size(&self) -> usize { self.meta.config.kmer_size }
|
||||
pub fn minimizer_size(&self) -> usize { self.meta.config.minimizer_size }
|
||||
pub fn n_partitions(&self) -> usize { self.partition.n_partitions() }
|
||||
|
||||
/// Expose the inner partition so the caller can run scatter into it.
|
||||
/// Call `mark_scattered` once scatter is complete.
|
||||
pub fn partition_mut(&mut self) -> &mut KmerPartition {
|
||||
&mut self.partition
|
||||
}
|
||||
|
||||
/// Mark scatter as complete and write `scatter.done`.
|
||||
///
|
||||
/// If no genome label was set at creation time, one is derived from
|
||||
/// `first_scatter_path` (filename stripped of all extensions).
|
||||
/// If `first_scatter_path` is also `None`, the label defaults to `"unknown"`.
|
||||
pub fn mark_scattered(&mut self, first_scatter_path: Option<&Path>) -> OKIResult<()> {
|
||||
if self.meta.genomes.is_empty() {
|
||||
let label = first_scatter_path
|
||||
.map(label_from_path)
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
self.meta.genomes.push(label);
|
||||
self.meta.write(&self.root_path)?;
|
||||
}
|
||||
touch(&self.root_path.join(SENTINEL_SCATTERED))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Dereplicate all partitions then compute kmer counts.
|
||||
///
|
||||
/// Writes `kmer_spectrum_raw.json` at the index root upon completion
|
||||
/// (this file doubles as the `Counted` sentinel).
|
||||
pub fn dereplicate_and_count(&self, rep: &mut Reporter) -> OKIResult<()> {
|
||||
let t = Stage::start("dereplicate");
|
||||
self.partition.dereplicate()?;
|
||||
rep.push(t.stop());
|
||||
|
||||
let t = Stage::start("count_kmer");
|
||||
self.partition.count_kmer()?;
|
||||
rep.push(t.stop());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build the layered MPHF index for all partitions.
|
||||
///
|
||||
/// Default mode (`config.with_counts = false`): set membership only.
|
||||
/// With counts: count matrix per kmer.
|
||||
///
|
||||
/// Writes `index.done` upon completion.
|
||||
/// Path to the unitigs file for partition `part`, layer `layer`.
|
||||
pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf {
|
||||
self.partition.part_dir(part)
|
||||
.join("index")
|
||||
.join(format!("layer_{layer}"))
|
||||
.join("unitigs.bin")
|
||||
}
|
||||
|
||||
pub fn build_layers(
|
||||
&self,
|
||||
min_ab: u32,
|
||||
max_ab: Option<u32>,
|
||||
keep_intermediate: bool,
|
||||
rep: &mut Reporter,
|
||||
) -> OKIResult<()> {
|
||||
let n = self.partition.n_partitions();
|
||||
let t = Stage::start("index");
|
||||
let with_counts = self.meta.config.with_counts;
|
||||
let filter_active = min_ab > 1 || max_ab.is_some();
|
||||
let need_counts = filter_active || with_counts;
|
||||
let total_kmers = AtomicUsize::new(0);
|
||||
|
||||
let partition = &self.partition;
|
||||
|
||||
let pb = Arc::new(Mutex::new(
|
||||
ProgressBar::new(n as u64).with_style(
|
||||
ProgressStyle::with_template("index — [{bar:20}] {pos}/{len} | {msg}").unwrap(),
|
||||
),
|
||||
));
|
||||
|
||||
(0..n).into_par_iter().for_each(|i| {
|
||||
let part_dir = partition.part_dir(i);
|
||||
let dedup_path = part_dir.join("dereplicated.skmer.zst");
|
||||
if !dedup_path.exists() {
|
||||
return;
|
||||
}
|
||||
|
||||
let layer_dir = part_dir.join("index").join("layer_0");
|
||||
if layer_dir.join("mphf.bin").exists() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mphf1_opt: Option<Mphf> = if need_counts {
|
||||
let p = part_dir.join("mphf1.bin");
|
||||
p.exists().then(|| Mphf::load_full(&p).ok()).flatten()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let counts1_opt: Option<PersistentCompactIntVec> = if need_counts {
|
||||
let p = part_dir.join("counts1.bin");
|
||||
p.exists()
|
||||
.then(|| PersistentCompactIntVec::open(&p).ok())
|
||||
.flatten()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut g = GraphDeBruijn::new();
|
||||
let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| {
|
||||
eprintln!("error opening {}: {e}", dedup_path.display());
|
||||
std::process::exit(1);
|
||||
});
|
||||
for sk in reader.iter() {
|
||||
for kmer in sk.iter_canonical_kmers() {
|
||||
let accept = if filter_active {
|
||||
match (&mphf1_opt, &counts1_opt) {
|
||||
(Some(mphf), Some(counts)) => {
|
||||
let ab = counts.get(mphf.index(&kmer.raw()));
|
||||
ab >= min_ab && max_ab.map_or(true, |max| ab <= max)
|
||||
}
|
||||
_ => true,
|
||||
}
|
||||
} else {
|
||||
true
|
||||
};
|
||||
if accept {
|
||||
g.push(kmer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let n_kmers = g.len();
|
||||
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
|
||||
g.compute_degrees();
|
||||
|
||||
fs::create_dir_all(&layer_dir).unwrap_or_else(|e| {
|
||||
eprintln!("error creating {}: {e}", layer_dir.display());
|
||||
std::process::exit(1);
|
||||
});
|
||||
let mut uw = Layer::<()>::unitig_writer(&layer_dir).unwrap_or_else(|e| {
|
||||
eprintln!("error creating unitig writer (partition {i}): {e}");
|
||||
std::process::exit(1);
|
||||
});
|
||||
for unitig in g.iter_unitig() {
|
||||
uw.write(&unitig).unwrap_or_else(|e| {
|
||||
eprintln!("error writing unitig (partition {i}): {e}");
|
||||
std::process::exit(1);
|
||||
});
|
||||
}
|
||||
uw.close().unwrap_or_else(|e| {
|
||||
eprintln!("error closing unitig writer (partition {i}): {e}");
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
if with_counts {
|
||||
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, |kmer| {
|
||||
match (&mphf1_opt, &counts1_opt) {
|
||||
(Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())),
|
||||
_ => 1,
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|e| {
|
||||
eprintln!("error building count layer (partition {i}): {e}");
|
||||
std::process::exit(1);
|
||||
});
|
||||
} else {
|
||||
Layer::<()>::build(&layer_dir).unwrap_or_else(|e| {
|
||||
eprintln!("error building set layer (partition {i}): {e}");
|
||||
std::process::exit(1);
|
||||
});
|
||||
}
|
||||
|
||||
let pb = pb.lock().unwrap();
|
||||
pb.inc(1);
|
||||
pb.set_message(format!("{i}: {n_kmers} kmers"));
|
||||
});
|
||||
|
||||
pb.lock().unwrap().finish_and_clear();
|
||||
info!(
|
||||
"done — {} total kmers indexed",
|
||||
total_kmers.load(Ordering::Relaxed)
|
||||
);
|
||||
|
||||
if !keep_intermediate {
|
||||
for i in 0..n {
|
||||
let part_dir = partition.part_dir(i);
|
||||
remove_if_exists(&part_dir.join("dereplicated.skmer.zst"));
|
||||
remove_if_exists(&SKFileMeta::sidecar_path(
|
||||
&part_dir.join("dereplicated.skmer.zst"),
|
||||
));
|
||||
remove_if_exists(&part_dir.join("mphf1.bin"));
|
||||
remove_if_exists(&part_dir.join("counts1.bin"));
|
||||
}
|
||||
}
|
||||
|
||||
touch(&self.root_path.join(SENTINEL_INDEXED))?;
|
||||
rep.push(t.stop());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a genome label from a file path: the file name with every extension removed.
///
/// When the path has no file-name component, the whole path is used as the
/// name. If nothing is left after stripping (for instance a name that starts
/// with a dot), the label is `"unknown"`.
fn label_from_path(path: &Path) -> String {
    let file_name = match path.file_name() {
        Some(name) => name.to_string_lossy(),
        None => path.as_os_str().to_string_lossy(),
    };

    // Removing extensions one by one leaves exactly the text before the first dot.
    match file_name.split('.').next() {
        Some(stem) if !stem.is_empty() => stem.to_owned(),
        _ => "unknown".to_string(),
    }
}
|
||||
|
||||
/// Create the file at `path`, or truncate it if it already exists.
///
/// The file handle is dropped immediately; only the file's presence matters.
fn touch(path: &Path) -> Result<(), std::io::Error> {
    let _handle = fs::File::create(path)?;
    Ok(())
}
|
||||
|
||||
/// Best-effort removal of the file at `path`.
///
/// A missing file is not an error. Any other failure is reported as a
/// warning on stderr and otherwise ignored.
fn remove_if_exists(path: &Path) {
    match fs::remove_file(path) {
        Err(err) if err.kind() != std::io::ErrorKind::NotFound => {
            eprintln!("warning: could not remove {}: {err}", path.display());
        }
        _ => {}
    }
}
|
||||
@@ -0,0 +1,9 @@
|
||||
pub mod error;
|
||||
pub mod meta;
|
||||
pub mod state;
|
||||
mod index;
|
||||
|
||||
pub use error::{OKIError, OKIResult};
|
||||
pub use index::KmerIndex;
|
||||
pub use meta::{IndexConfig, IndexMeta, META_FILENAME};
|
||||
pub use state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED};
|
||||
@@ -0,0 +1,45 @@
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const META_FILENAME: &str = "index.meta";
|
||||
const META_VERSION: u32 = 1;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct IndexConfig {
|
||||
pub kmer_size: usize,
|
||||
pub minimizer_size: usize,
|
||||
pub n_bits: usize,
|
||||
pub with_counts: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct IndexMeta {
|
||||
pub version: u32,
|
||||
pub config: IndexConfig,
|
||||
/// Ordered list of genome labels indexed here.
|
||||
/// Element 0 is the initial genome; subsequent entries come from merges.
|
||||
pub genomes: Vec<String>,
|
||||
}
|
||||
|
||||
impl IndexMeta {
|
||||
pub fn new(config: IndexConfig) -> Self {
|
||||
Self { version: META_VERSION, config, genomes: Vec::new() }
|
||||
}
|
||||
|
||||
pub fn write(&self, root: &Path) -> io::Result<()> {
|
||||
let file = fs::File::create(root.join(META_FILENAME))?;
|
||||
serde_json::to_writer_pretty(file, self).map_err(io::Error::other)
|
||||
}
|
||||
|
||||
pub fn read(root: &Path) -> io::Result<Self> {
|
||||
let file = fs::File::open(root.join(META_FILENAME))?;
|
||||
serde_json::from_reader(file).map_err(io::Error::other)
|
||||
}
|
||||
|
||||
pub fn exists(root: &Path) -> bool {
|
||||
root.join(META_FILENAME).exists()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
use std::path::Path;
|
||||
|
||||
use crate::meta::META_FILENAME;
|
||||
|
||||
pub const SENTINEL_SCATTERED: &str = "scatter.done";
|
||||
pub const SENTINEL_COUNTED: &str = "kmer_spectrum_raw.json";
|
||||
pub const SENTINEL_INDEXED: &str = "index.done";
|
||||
|
||||
/// How far the construction of a `KmerIndex` has progressed.
///
/// The derived ordering follows declaration order:
/// `Empty < Scattered < Counted < Indexed`.
/// Each state beyond `Empty` is recognised by the presence of its sentinel
/// file, so a step interrupted before its sentinel exists does not count.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum IndexState {
    /// Only `index.meta` exists; scatter has not completed.
    Empty,
    /// `scatter.done` exists: scatter has completed.
    Scattered,
    /// `kmer_spectrum_raw.json` exists: dereplication and counting have completed.
    Counted,
    /// `index.done` exists: the layered MPHF index is fully built.
    Indexed,
}
|
||||
|
||||
impl IndexState {
|
||||
/// Detect the state of the index at `root`.
|
||||
///
|
||||
/// Returns `None` if `index.meta` is absent (not an obikindex directory).
|
||||
pub fn detect(root: &Path) -> Option<Self> {
|
||||
if !root.join(META_FILENAME).exists() {
|
||||
return None;
|
||||
}
|
||||
if root.join(SENTINEL_INDEXED).exists() {
|
||||
return Some(Self::Indexed);
|
||||
}
|
||||
if root.join(SENTINEL_COUNTED).exists() {
|
||||
return Some(Self::Counted);
|
||||
}
|
||||
if root.join(SENTINEL_SCATTERED).exists() {
|
||||
return Some(Self::Scattered);
|
||||
}
|
||||
Some(Self::Empty)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user