feat(obikmer): add index subcommand for kmer counting pipeline

Introduce the `index` CLI subcommand, implementing a resumable, multi-stage pipeline to partition, dereplicate, and count kmers from input sequences. The command builds a layered de Bruijn graph index per partition, applies optional abundance filtering, and persists unitigs alongside an MPHF-based count matrix. Update `Cargo.toml` and `Cargo.lock` to include new dependencies (`epserde`, `ptr_hash`, `cacheline-ef`, `obicompactvec`, `obilayeredmap`) required for the index builder, and refresh the profiling data files.
This commit is contained in:
Eric Coissac
2026-05-20 14:37:30 +02:00
parent c20a1ed465
commit e66c4d81ef
9 changed files with 231 additions and 1 deletions
Vendored
BIN
View File
Binary file not shown.
Binary file not shown.
Binary file not shown.
+5
View File
@@ -1760,15 +1760,19 @@ dependencies = [
name = "obikmer" name = "obikmer"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"cacheline-ef",
"clap", "clap",
"epserde 0.8.0",
"indicatif", "indicatif",
"memmap2", "memmap2",
"niffler 3.0.0", "niffler 3.0.0",
"obicompactvec",
"obidebruinj", "obidebruinj",
"obifastwrite", "obifastwrite",
"obikpartitionner", "obikpartitionner",
"obikrope", "obikrope",
"obikseq", "obikseq",
"obilayeredmap",
"obipipeline", "obipipeline",
"obiread", "obiread",
"obiskbuilder", "obiskbuilder",
@@ -1776,6 +1780,7 @@ dependencies = [
"obisys", "obisys",
"ph", "ph",
"pprof", "pprof",
"ptr_hash",
"rayon", "rayon",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
+5
View File
@@ -19,10 +19,15 @@ obikrope = { path = "../obikrope" }
obikpartitionner = { path = "../obikpartitionner" } obikpartitionner = { path = "../obikpartitionner" }
obisys = { path = "../obisys" } obisys = { path = "../obisys" }
obiskio = { path = "../obiskio" } obiskio = { path = "../obiskio" }
obicompactvec = { path = "../obicompactvec" }
obilayeredmap = { path = "../obilayeredmap" }
niffler = "3" niffler = "3"
rayon = "1" rayon = "1"
ph = "0.11" ph = "0.11"
memmap2 = "0.9" memmap2 = "0.9"
epserde = "0.8"
ptr_hash = "1.1"
cacheline-ef = "1.1"
indicatif = "0.17" indicatif = "0.17"
tracing = "0.1.44" tracing = "0.1.44"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
+216
View File
@@ -0,0 +1,216 @@
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use cacheline_ef::{CachelineEf, CachelineEfVec};
use clap::Args;
use epserde::prelude::*;
use obicompactvec::PersistentCompactIntMatrix;
use obicompactvec::PersistentCompactIntVec;
use obidebruinj::GraphDeBruijn;
use obikpartitionner::KmerPartition;
use obikseq::{RoutableSuperKmer, set_k, set_m};
use obiskio::SKFileReader;
use obilayeredmap::layer::Layer;
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
use rayon::prelude::*;
use tracing::info;
use crate::cli::{CommonArgs, PipelineData, open_chunks};
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
#[derive(Args)]
pub struct IndexArgs {
/// Output partition directory
#[arg(short, long)]
pub output: PathBuf,
/// Overwrite output directory if it already exists
#[arg(long, default_value_t = false)]
pub force: bool,
/// Minimum kmer abundance (inclusive)
#[arg(long, default_value_t = 1)]
pub min_abundance: u32,
/// Maximum kmer abundance (inclusive)
#[arg(long)]
pub max_abundance: Option<u32>,
#[command(flatten)]
pub common: CommonArgs,
}
pub fn run(args: IndexArgs) {
let output = args.output.clone();
let min_ab = args.min_abundance;
let max_ab = args.max_abundance;
// ── Stage 1: scatter (skipped if partition already exists) ───────────────
let partition_meta = output.join("partition.meta");
let kp = if partition_meta.exists() {
info!("resuming from existing partition at {}", output.display());
let kp = KmerPartition::open(&output).unwrap_or_else(|e| {
eprintln!("error opening partition: {e}");
std::process::exit(1);
});
set_k(kp.kmer_size());
set_m(kp.minimizer_size());
kp
} else {
let k = args.common.kmer_size;
set_k(k);
let m = args.common.minimizer_size;
set_m(m);
let theta = args.common.theta;
let level_max = args.common.level_max;
let n_workers = args.common.threads.max(1);
let mut kp =
KmerPartition::create(&output, args.common.partition_bits, k, m, args.force)
.unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1);
});
let path_source = args.common.seqfile_paths();
info!("scattering...");
let pipe = obipipeline::make_pipe! {
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
||? { |path| open_chunks(path) } : Path => RawChunk,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
};
for batch in pipe.apply(path_source, n_workers, 1) {
kp.write_batch(batch).unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1);
});
}
kp.close().expect("close error");
kp
};
let k = kp.kmer_size();
let n = kp.n_partitions();
// ── Stage 2: dereplicate + count (skipped if already done) ───────────────
let counting_done = output.join("kmer_spectrum_raw.json").exists();
if !counting_done {
info!("dereplicating...");
kp.dereplicate().expect("dereplicate error");
info!("counting kmers...");
kp.count_kmer().expect("count kmer error");
} else {
info!("kmer counts already present, skipping dereplicate + count");
}
// ── Stage 3: build layered index per partition ────────────────────────────
info!("building index ({n} partitions, k={k})...");
let total_kmers = AtomicUsize::new(0);
let filter_active = min_ab > 1 || max_ab.is_some();
(0..n).into_par_iter().for_each(|i| {
let part_dir = output.join(format!("part_{i:05}"));
let dedup_path = part_dir.join("dereplicated.skmer.zst");
if !dedup_path.exists() {
return;
}
let layer_dir = part_dir.join("index").join("layer_0");
if layer_dir.join("mphf.bin").exists() {
return;
}
// Load partition MPHF + counts for abundance filtering
let mphf1_opt: Option<Mphf> = if filter_active {
let p = part_dir.join("mphf1.bin");
p.exists()
.then(|| Mphf::load_full(&p).ok())
.flatten()
} else {
None
};
let counts1_opt: Option<PersistentCompactIntVec> = if filter_active {
let p = part_dir.join("counts1.bin");
p.exists()
.then(|| PersistentCompactIntVec::open(&p).ok())
.flatten()
} else {
None
};
// Build de Bruijn graph with optional abundance filter
let mut g = GraphDeBruijn::new();
let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| {
eprintln!("error opening {}: {e}", dedup_path.display());
std::process::exit(1);
});
for sk in reader.iter() {
for kmer in sk.iter_canonical_kmers() {
let accept = match (&mphf1_opt, &counts1_opt) {
(Some(mphf), Some(counts)) => {
let slot = mphf.index(&kmer.raw());
let ab = counts.get(slot);
ab >= min_ab && max_ab.map_or(true, |max| ab <= max)
}
_ => true,
};
if accept {
g.push(kmer);
}
}
}
let n_kmers = g.len();
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
g.compute_degrees();
// Write unitigs to layer_0/unitigs.bin
fs::create_dir_all(&layer_dir).unwrap_or_else(|e| {
eprintln!("error creating {}: {e}", layer_dir.display());
std::process::exit(1);
});
let mut uw = Layer::<PersistentCompactIntMatrix>::unitig_writer(&layer_dir)
.unwrap_or_else(|e| {
eprintln!("error creating unitig writer for partition {i}: {e}");
std::process::exit(1);
});
for unitig in g.iter_unitig() {
uw.write(&unitig).unwrap_or_else(|e| {
eprintln!("error writing unitig (partition {i}): {e}");
std::process::exit(1);
});
}
uw.close().unwrap_or_else(|e| {
eprintln!("error closing unitig writer (partition {i}): {e}");
std::process::exit(1);
});
// Build MPHF + count matrix for this layer
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, |kmer| {
match (&mphf1_opt, &counts1_opt) {
(Some(mphf), Some(counts)) => {
let slot = mphf.index(&kmer.raw());
counts.get(slot)
}
_ => 1,
}
})
.unwrap_or_else(|e| {
eprintln!("error building layer index (partition {i}): {e}");
std::process::exit(1);
});
info!("partition {i}: {n_kmers} kmers indexed");
});
info!(
"done — {} total kmers indexed across all partitions",
total_kmers.load(Ordering::Relaxed)
);
}
+1
View File
@@ -1,5 +1,6 @@
pub mod count; pub mod count;
pub mod fasta; pub mod fasta;
pub mod index;
pub mod longtig; pub mod longtig;
pub mod partition; pub mod partition;
pub mod superkmer; pub mod superkmer;
+3
View File
@@ -25,6 +25,8 @@ enum Commands {
Unitig(cmd::unitig::UnitigArgs), Unitig(cmd::unitig::UnitigArgs),
/// Build de Bruijn longtigs for all partitions and write to longtig.fasta.gz /// Build de Bruijn longtigs for all partitions and write to longtig.fasta.gz
Longtig(cmd::longtig::LongtigArgs), Longtig(cmd::longtig::LongtigArgs),
/// Build the complete genome index (scatter → dereplicate → count → layered MPHF)
Index(cmd::index::IndexArgs),
} }
fn main() { fn main() {
@@ -52,6 +54,7 @@ fn main() {
Commands::Fasta(args) => cmd::fasta::run(args), Commands::Fasta(args) => cmd::fasta::run(args),
Commands::Unitig(args) => cmd::unitig::run(args), Commands::Unitig(args) => cmd::unitig::run(args),
Commands::Longtig(args) => cmd::longtig::run(args), Commands::Longtig(args) => cmd::longtig::run(args),
Commands::Index(args) => cmd::index::run(args),
} }
#[cfg(feature = "profiling")] #[cfg(feature = "profiling")]
+1 -1
View File
@@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex, OnceLock};
pub const MAX_POOL_SIZE: usize = 512; pub const MAX_POOL_SIZE: usize = 512;
/// Default pending buffer threshold (bytes) before draining to the physical fd. /// Default pending buffer threshold (bytes) before draining to the physical fd.
pub const DEFAULT_FLUSH_THRESHOLD: usize = 8 * 1024; pub const DEFAULT_FLUSH_THRESHOLD: usize = 64 * 1024;
// Convenient alias for the per-entry physical writer slot. // Convenient alias for the per-entry physical writer slot.
type PhysWriter = Option<Box<dyn Write + Send>>; type PhysWriter = Option<Box<dyn Write + Send>>;