diff --git a/.DS_Store b/.DS_Store index 0c99d24..e0acbe7 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/obikmer 2026-05-20 12.35 profile.json.gz b/obikmer 2026-05-20 12.35 profile.json.gz deleted file mode 100644 index 57da44b..0000000 Binary files a/obikmer 2026-05-20 12.35 profile.json.gz and /dev/null differ diff --git a/obikmer 2026-05-20 12.53 profile.json.gz b/obikmer 2026-05-20 12.53 profile.json.gz new file mode 100644 index 0000000..97c320a Binary files /dev/null and b/obikmer 2026-05-20 12.53 profile.json.gz differ diff --git a/src/Cargo.lock b/src/Cargo.lock index 98562bb..cd1f37c 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1760,15 +1760,19 @@ dependencies = [ name = "obikmer" version = "0.1.0" dependencies = [ + "cacheline-ef", "clap", + "epserde 0.8.0", "indicatif", "memmap2", "niffler 3.0.0", + "obicompactvec", "obidebruinj", "obifastwrite", "obikpartitionner", "obikrope", "obikseq", + "obilayeredmap", "obipipeline", "obiread", "obiskbuilder", @@ -1776,6 +1780,7 @@ dependencies = [ "obisys", "ph", "pprof", + "ptr_hash", "rayon", "tracing", "tracing-subscriber", diff --git a/src/obikmer/Cargo.toml b/src/obikmer/Cargo.toml index 95ad03f..ea384d4 100644 --- a/src/obikmer/Cargo.toml +++ b/src/obikmer/Cargo.toml @@ -19,10 +19,15 @@ obikrope = { path = "../obikrope" } obikpartitionner = { path = "../obikpartitionner" } obisys = { path = "../obisys" } obiskio = { path = "../obiskio" } +obicompactvec = { path = "../obicompactvec" } +obilayeredmap = { path = "../obilayeredmap" } niffler = "3" rayon = "1" ph = "0.11" memmap2 = "0.9" +epserde = "0.8" +ptr_hash = "1.1" +cacheline-ef = "1.1" indicatif = "0.17" tracing = "0.1.44" tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } diff --git a/src/obikmer/src/cmd/index.rs b/src/obikmer/src/cmd/index.rs new file mode 100644 index 0000000..3e7d954 --- /dev/null +++ b/src/obikmer/src/cmd/index.rs @@ -0,0 +1,216 @@ +use std::fs; +use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use cacheline_ef::{CachelineEf, CachelineEfVec}; +use clap::Args; +use epserde::prelude::*; +use obicompactvec::PersistentCompactIntMatrix; +use obicompactvec::PersistentCompactIntVec; +use obidebruinj::GraphDeBruijn; +use obikpartitionner::KmerPartition; +use obikseq::{RoutableSuperKmer, set_k, set_m}; +use obiskio::SKFileReader; +use obilayeredmap::layer::Layer; +use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; +use rayon::prelude::*; +use tracing::info; + +use crate::cli::{CommonArgs, PipelineData, open_chunks}; + +type Mphf = PtrHash>, Xx64, Vec>; + +#[derive(Args)] +pub struct IndexArgs { + /// Output partition directory + #[arg(short, long)] + pub output: PathBuf, + + /// Overwrite output directory if it already exists + #[arg(long, default_value_t = false)] + pub force: bool, + + /// Minimum kmer abundance (inclusive) + #[arg(long, default_value_t = 1)] + pub min_abundance: u32, + + /// Maximum kmer abundance (inclusive) + #[arg(long)] + pub max_abundance: Option, + + #[command(flatten)] + pub common: CommonArgs, +} + +pub fn run(args: IndexArgs) { + let output = args.output.clone(); + let min_ab = args.min_abundance; + let max_ab = args.max_abundance; + + // ── Stage 1: scatter (skipped if partition already exists) ─────────────── + let partition_meta = output.join("partition.meta"); + let kp = if partition_meta.exists() { + info!("resuming from existing partition at {}", output.display()); + let kp = KmerPartition::open(&output).unwrap_or_else(|e| { + eprintln!("error opening partition: {e}"); + std::process::exit(1); + }); + set_k(kp.kmer_size()); + set_m(kp.minimizer_size()); + kp + } else { + let k = args.common.kmer_size; + set_k(k); + let m = args.common.minimizer_size; + set_m(m); + let theta = args.common.theta; + let level_max = args.common.level_max; + let n_workers = args.common.threads.max(1); + + let mut kp = + KmerPartition::create(&output, args.common.partition_bits, k, m, args.force) + .unwrap_or_else(|e| { + eprintln!("error: {e}"); + std::process::exit(1); + }); + + let path_source = args.common.seqfile_paths(); + + info!("scattering..."); + let pipe = obipipeline::make_pipe! { + PipelineData : PathBuf => Vec, + ||? { |path| open_chunks(path) } : Path => RawChunk, + |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, + | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, + }; + + for batch in pipe.apply(path_source, n_workers, 1) { + kp.write_batch(batch).unwrap_or_else(|e| { + eprintln!("error: {e}"); + std::process::exit(1); + }); + } + kp.close().expect("close error"); + kp + }; + + let k = kp.kmer_size(); + let n = kp.n_partitions(); + + // ── Stage 2: dereplicate + count (skipped if already done) ─────────────── + let counting_done = output.join("kmer_spectrum_raw.json").exists(); + if !counting_done { + info!("dereplicating..."); + kp.dereplicate().expect("dereplicate error"); + info!("counting kmers..."); + kp.count_kmer().expect("count kmer error"); + } else { + info!("kmer counts already present, skipping dereplicate + count"); + } + + // ── Stage 3: build layered index per partition ──────────────────────────── + info!("building index ({n} partitions, k={k})..."); + let total_kmers = AtomicUsize::new(0); + let filter_active = min_ab > 1 || max_ab.is_some(); + + (0..n).into_par_iter().for_each(|i| { + let part_dir = output.join(format!("part_{i:05}")); + let dedup_path = part_dir.join("dereplicated.skmer.zst"); + if !dedup_path.exists() { + return; + } + + let layer_dir = part_dir.join("index").join("layer_0"); + if layer_dir.join("mphf.bin").exists() { + return; + } + + // Load partition MPHF + counts for abundance filtering + let mphf1_opt: Option = if filter_active { + let p = part_dir.join("mphf1.bin"); + p.exists() + .then(|| Mphf::load_full(&p).ok()) + .flatten() + } else { + None + }; + + let counts1_opt: Option = if filter_active { + let p = part_dir.join("counts1.bin"); + p.exists() + .then(|| PersistentCompactIntVec::open(&p).ok()) + .flatten() + } else { + None + }; + + // Build de Bruijn graph with optional abundance filter + let mut g = GraphDeBruijn::new(); + let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| { + eprintln!("error opening {}: {e}", dedup_path.display()); + std::process::exit(1); + }); + for sk in reader.iter() { + for kmer in sk.iter_canonical_kmers() { + let accept = match (&mphf1_opt, &counts1_opt) { + (Some(mphf), Some(counts)) => { + let slot = mphf.index(&kmer.raw()); + let ab = counts.get(slot); + ab >= min_ab && max_ab.map_or(true, |max| ab <= max) + } + _ => true, + }; + if accept { + g.push(kmer); + } + } + } + + let n_kmers = g.len(); + total_kmers.fetch_add(n_kmers, Ordering::Relaxed); + g.compute_degrees(); + + // Write unitigs to layer_0/unitigs.bin + fs::create_dir_all(&layer_dir).unwrap_or_else(|e| { + eprintln!("error creating {}: {e}", layer_dir.display()); + std::process::exit(1); + }); + let mut uw = Layer::::unitig_writer(&layer_dir) + .unwrap_or_else(|e| { + eprintln!("error creating unitig writer for partition {i}: {e}"); + std::process::exit(1); + }); + for unitig in g.iter_unitig() { + uw.write(&unitig).unwrap_or_else(|e| { + eprintln!("error writing unitig (partition {i}): {e}"); + std::process::exit(1); + }); + } + uw.close().unwrap_or_else(|e| { + eprintln!("error closing unitig writer (partition {i}): {e}"); + std::process::exit(1); + }); + + // Build MPHF + count matrix for this layer + Layer::::build(&layer_dir, |kmer| { + match (&mphf1_opt, &counts1_opt) { + (Some(mphf), Some(counts)) => { + let slot = mphf.index(&kmer.raw()); + counts.get(slot) + } + _ => 1, + } + }) + .unwrap_or_else(|e| { + eprintln!("error building layer index (partition {i}): {e}"); + std::process::exit(1); + }); + + info!("partition {i}: {n_kmers} kmers indexed"); + }); + + info!( + "done — {} total kmers indexed across all partitions", + total_kmers.load(Ordering::Relaxed) + ); +} diff --git a/src/obikmer/src/cmd/mod.rs b/src/obikmer/src/cmd/mod.rs index 580fa1c..bf1fe18 100644 --- a/src/obikmer/src/cmd/mod.rs +++ b/src/obikmer/src/cmd/mod.rs @@ -1,5 +1,6 @@ pub mod count; pub mod fasta; +pub mod index; pub mod longtig; pub mod partition; pub mod superkmer; diff --git a/src/obikmer/src/main.rs b/src/obikmer/src/main.rs index 33a0c12..6e19c05 100644 --- a/src/obikmer/src/main.rs +++ b/src/obikmer/src/main.rs @@ -25,6 +25,8 @@ enum Commands { Unitig(cmd::unitig::UnitigArgs), /// Build de Bruijn longtigs for all partitions and write to longtig.fasta.gz Longtig(cmd::longtig::LongtigArgs), + /// Build the complete genome index (scatter → dereplicate → count → layered MPHF) + Index(cmd::index::IndexArgs), } fn main() { @@ -52,6 +54,7 @@ fn main() { Commands::Fasta(args) => cmd::fasta::run(args), Commands::Unitig(args) => cmd::unitig::run(args), Commands::Longtig(args) => cmd::longtig::run(args), + Commands::Index(args) => cmd::index::run(args), } #[cfg(feature = "profiling")] diff --git a/src/obiskio/src/pool.rs b/src/obiskio/src/pool.rs index 8360d60..1c90299 100644 --- a/src/obiskio/src/pool.rs +++ b/src/obiskio/src/pool.rs @@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex, OnceLock}; pub const MAX_POOL_SIZE: usize = 512; /// Default pending buffer threshold (bytes) before draining to the physical fd. -pub const DEFAULT_FLUSH_THRESHOLD: usize = 8 * 1024; +pub const DEFAULT_FLUSH_THRESHOLD: usize = 64 * 1024; // Convenient alias for the per-entry physical writer slot. type PhysWriter = Option>;