refactor: extract pipeline stages and centralize partition directory paths
Extracts the scatter, dereplicate/count, and index pipeline stages into a new `steps` module to improve modularity. Centralizes partition directory path construction by introducing a `part_dir()` helper, replacing manual string formatting across multiple command files. Adds `--with-counts` and `--keep-intermediate` CLI flags to the index command and fixes a typo in the `partition_dir` parameter name.
This commit is contained in:
@@ -37,7 +37,7 @@ pub fn run(args: FastaArgs) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dump_super_kmers(kp: &KmerPartition, partition_dir: &PathBuf) {
|
fn dump_super_kmers(kp: &KmerPartition, _partition_dir: &PathBuf) {
|
||||||
let k = kp.kmer_size();
|
let k = kp.kmer_size();
|
||||||
let m = kp.minimizer_size();
|
let m = kp.minimizer_size();
|
||||||
let n = kp.n_partitions();
|
let n = kp.n_partitions();
|
||||||
@@ -47,7 +47,7 @@ fn dump_super_kmers(kp: &KmerPartition, partition_dir: &PathBuf) {
|
|||||||
let total = AtomicUsize::new(0);
|
let total = AtomicUsize::new(0);
|
||||||
|
|
||||||
(0..n).into_par_iter().for_each(|i| {
|
(0..n).into_par_iter().for_each(|i| {
|
||||||
let part_dir = partition_dir.join(format!("part_{i:05}"));
|
let part_dir = kp.part_dir(i);
|
||||||
let in_path = part_dir.join("dereplicated.skmer.zst");
|
let in_path = part_dir.join("dereplicated.skmer.zst");
|
||||||
if !in_path.exists() {
|
if !in_path.exists() {
|
||||||
return;
|
return;
|
||||||
|
|||||||
+26
-197
@@ -1,28 +1,13 @@
|
|||||||
use std::fs;
|
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
|
|
||||||
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use epserde::prelude::*;
|
|
||||||
use indicatif::{ProgressBar, ProgressStyle};
|
|
||||||
use obicompactvec::PersistentCompactIntMatrix;
|
|
||||||
use obicompactvec::PersistentCompactIntVec;
|
|
||||||
use obidebruinj::GraphDeBruijn;
|
|
||||||
use obikpartitionner::KmerPartition;
|
use obikpartitionner::KmerPartition;
|
||||||
use obikseq::{RoutableSuperKmer, set_k, set_m};
|
use obikseq::{set_k, set_m};
|
||||||
use obiskio::SKFileReader;
|
use obisys::Reporter;
|
||||||
use obilayeredmap::layer::Layer;
|
|
||||||
use obisys::{Reporter, Stage};
|
|
||||||
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
|
|
||||||
use rayon::prelude::*;
|
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use crate::cli::{CommonArgs, PipelineData, open_chunks};
|
use crate::cli::CommonArgs;
|
||||||
|
use crate::steps::{build_index, dereplicate_and_count, scatter};
|
||||||
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
|
|
||||||
|
|
||||||
#[derive(Args)]
|
#[derive(Args)]
|
||||||
pub struct IndexArgs {
|
pub struct IndexArgs {
|
||||||
@@ -42,20 +27,24 @@ pub struct IndexArgs {
|
|||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
pub max_abundance: Option<u32>,
|
pub max_abundance: Option<u32>,
|
||||||
|
|
||||||
|
/// Store kmer counts in the index (default: set membership only)
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
pub with_counts: bool,
|
||||||
|
|
||||||
|
/// Keep intermediate build files (dereplicated superkmers, mphf1, counts1)
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
pub keep_intermediate: bool,
|
||||||
|
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
pub common: CommonArgs,
|
pub common: CommonArgs,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(args: IndexArgs) {
|
pub fn run(args: IndexArgs) {
|
||||||
let output = args.output.clone();
|
let output = args.output.clone();
|
||||||
let min_ab = args.min_abundance;
|
|
||||||
let max_ab = args.max_abundance;
|
|
||||||
|
|
||||||
// ── Stage 1: scatter (skipped if partition already exists) ───────────────
|
|
||||||
let partition_meta = output.join("partition.meta");
|
|
||||||
let mut rep = Reporter::new();
|
let mut rep = Reporter::new();
|
||||||
|
|
||||||
let kp = if partition_meta.exists() {
|
// ── Stage 1: scatter (skipped if partition already exists) ───────────────
|
||||||
|
let kp = if output.join("partition.meta").exists() {
|
||||||
info!("resuming from existing partition at {}", output.display());
|
info!("resuming from existing partition at {}", output.display());
|
||||||
let kp = KmerPartition::open(&output).unwrap_or_else(|e| {
|
let kp = KmerPartition::open(&output).unwrap_or_else(|e| {
|
||||||
eprintln!("error opening partition: {e}");
|
eprintln!("error opening partition: {e}");
|
||||||
@@ -80,187 +69,27 @@ pub fn run(args: IndexArgs) {
|
|||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
let path_source = args.common.seqfile_paths();
|
scatter(&mut kp, args.common.seqfile_paths(), k, level_max, theta, n_workers, &mut rep);
|
||||||
|
|
||||||
let t = Stage::start("scatter");
|
|
||||||
let pipe = obipipeline::make_pipe! {
|
|
||||||
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
|
||||||
||? { |path| open_chunks(path) } : Path => RawChunk,
|
|
||||||
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
|
||||||
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
|
|
||||||
};
|
|
||||||
|
|
||||||
let pb = ProgressBar::new_spinner();
|
|
||||||
pb.set_style(
|
|
||||||
ProgressStyle::with_template("{spinner} scatter — {msg} {elapsed}")
|
|
||||||
.unwrap()
|
|
||||||
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
|
|
||||||
);
|
|
||||||
pb.enable_steady_tick(Duration::from_millis(100));
|
|
||||||
|
|
||||||
let mut total_bases: u64 = 0;
|
|
||||||
let mut ema_rate: f64 = 0.0;
|
|
||||||
let mut last_t = Instant::now();
|
|
||||||
let mut last_bases: u64 = 0;
|
|
||||||
const ALPHA: f64 = 0.15;
|
|
||||||
|
|
||||||
for batch in pipe.apply(path_source, n_workers, 1) {
|
|
||||||
total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::<u64>();
|
|
||||||
let now = Instant::now();
|
|
||||||
let dt = now.duration_since(last_t).as_secs_f64();
|
|
||||||
if dt > 0.1 {
|
|
||||||
let instant = (total_bases - last_bases) as f64 / dt;
|
|
||||||
ema_rate = ALPHA * instant + (1.0 - ALPHA) * ema_rate;
|
|
||||||
last_t = now;
|
|
||||||
last_bases = total_bases;
|
|
||||||
let bp = total_bases as f64;
|
|
||||||
let (count_str, rate_str) = if bp >= 1e9 {
|
|
||||||
(format!("{:.2} Gbp", bp / 1e9), format!("{:.0} Mbp/s", ema_rate / 1e6))
|
|
||||||
} else {
|
|
||||||
(format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6))
|
|
||||||
};
|
|
||||||
pb.set_message(format!("{count_str} {rate_str}"));
|
|
||||||
}
|
|
||||||
kp.write_batch(batch).unwrap_or_else(|e| {
|
|
||||||
eprintln!("error: {e}");
|
|
||||||
std::process::exit(1);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
pb.finish_and_clear();
|
|
||||||
kp.close().expect("close error");
|
|
||||||
rep.push(t.stop());
|
|
||||||
kp
|
kp
|
||||||
};
|
};
|
||||||
|
|
||||||
let n = kp.n_partitions();
|
|
||||||
|
|
||||||
// ── Stage 2: dereplicate + count (skipped if already done) ───────────────
|
// ── Stage 2: dereplicate + count (skipped if already done) ───────────────
|
||||||
let counting_done = output.join("kmer_spectrum_raw.json").exists();
|
if !output.join("kmer_spectrum_raw.json").exists() {
|
||||||
if !counting_done {
|
dereplicate_and_count(&kp, &mut rep);
|
||||||
info!("dereplicating...");
|
|
||||||
let t = Stage::start("dereplicate");
|
|
||||||
kp.dereplicate().expect("dereplicate error");
|
|
||||||
rep.push(t.stop());
|
|
||||||
|
|
||||||
let t = Stage::start("count_kmer");
|
|
||||||
info!("counting kmers...");
|
|
||||||
kp.count_kmer().expect("count kmer error");
|
|
||||||
rep.push(t.stop());
|
|
||||||
} else {
|
} else {
|
||||||
info!("kmer counts already present, skipping dereplicate + count");
|
info!("kmer counts already present, skipping dereplicate + count");
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Stage 3: build layered index per partition ────────────────────────────
|
// ── Stage 3: build layered index ─────────────────────────────────────────
|
||||||
let t = Stage::start("index");
|
build_index(
|
||||||
let total_kmers = AtomicUsize::new(0);
|
&kp,
|
||||||
let filter_active = min_ab > 1 || max_ab.is_some();
|
args.min_abundance,
|
||||||
|
args.max_abundance,
|
||||||
|
args.with_counts,
|
||||||
|
args.keep_intermediate,
|
||||||
|
&mut rep,
|
||||||
|
);
|
||||||
|
|
||||||
let pb = ProgressBar::new(n as u64);
|
|
||||||
pb.set_style(ProgressStyle::with_template(
|
|
||||||
"index — [{bar:20}] {pos}/{len} | {msg}",
|
|
||||||
).unwrap());
|
|
||||||
|
|
||||||
let pb = Arc::new(Mutex::new(pb));
|
|
||||||
|
|
||||||
(0..n).into_par_iter().for_each(|i| {
|
|
||||||
let part_dir = output.join(format!("part_{i:05}"));
|
|
||||||
let dedup_path = part_dir.join("dereplicated.skmer.zst");
|
|
||||||
if !dedup_path.exists() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let layer_dir = part_dir.join("index").join("layer_0");
|
|
||||||
if layer_dir.join("mphf.bin").exists() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load partition MPHF + counts for abundance filtering
|
|
||||||
let mphf1_opt: Option<Mphf> = if filter_active {
|
|
||||||
let p = part_dir.join("mphf1.bin");
|
|
||||||
p.exists()
|
|
||||||
.then(|| Mphf::load_full(&p).ok())
|
|
||||||
.flatten()
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let counts1_opt: Option<PersistentCompactIntVec> = if filter_active {
|
|
||||||
let p = part_dir.join("counts1.bin");
|
|
||||||
p.exists()
|
|
||||||
.then(|| PersistentCompactIntVec::open(&p).ok())
|
|
||||||
.flatten()
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
// Build de Bruijn graph with optional abundance filter
|
|
||||||
let mut g = GraphDeBruijn::new();
|
|
||||||
let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| {
|
|
||||||
eprintln!("error opening {}: {e}", dedup_path.display());
|
|
||||||
std::process::exit(1);
|
|
||||||
});
|
|
||||||
for sk in reader.iter() {
|
|
||||||
for kmer in sk.iter_canonical_kmers() {
|
|
||||||
let accept = match (&mphf1_opt, &counts1_opt) {
|
|
||||||
(Some(mphf), Some(counts)) => {
|
|
||||||
let slot = mphf.index(&kmer.raw());
|
|
||||||
let ab = counts.get(slot);
|
|
||||||
ab >= min_ab && max_ab.map_or(true, |max| ab <= max)
|
|
||||||
}
|
|
||||||
_ => true,
|
|
||||||
};
|
|
||||||
if accept {
|
|
||||||
g.push(kmer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let n_kmers = g.len();
|
|
||||||
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
|
|
||||||
g.compute_degrees();
|
|
||||||
|
|
||||||
// Write unitigs to layer_0/unitigs.bin
|
|
||||||
fs::create_dir_all(&layer_dir).unwrap_or_else(|e| {
|
|
||||||
eprintln!("error creating {}: {e}", layer_dir.display());
|
|
||||||
std::process::exit(1);
|
|
||||||
});
|
|
||||||
let mut uw = Layer::<PersistentCompactIntMatrix>::unitig_writer(&layer_dir)
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
eprintln!("error creating unitig writer for partition {i}: {e}");
|
|
||||||
std::process::exit(1);
|
|
||||||
});
|
|
||||||
for unitig in g.iter_unitig() {
|
|
||||||
uw.write(&unitig).unwrap_or_else(|e| {
|
|
||||||
eprintln!("error writing unitig (partition {i}): {e}");
|
|
||||||
std::process::exit(1);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
uw.close().unwrap_or_else(|e| {
|
|
||||||
eprintln!("error closing unitig writer (partition {i}): {e}");
|
|
||||||
std::process::exit(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
// Build MPHF + count matrix for this layer
|
|
||||||
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, |kmer| {
|
|
||||||
match (&mphf1_opt, &counts1_opt) {
|
|
||||||
(Some(mphf), Some(counts)) => {
|
|
||||||
let slot = mphf.index(&kmer.raw());
|
|
||||||
counts.get(slot)
|
|
||||||
}
|
|
||||||
_ => 1,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
eprintln!("error building layer index (partition {i}): {e}");
|
|
||||||
std::process::exit(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
let pb = pb.lock().unwrap();
|
|
||||||
pb.inc(1);
|
|
||||||
pb.set_message(format!("{i}: {n_kmers} kmers"));
|
|
||||||
});
|
|
||||||
|
|
||||||
pb.lock().unwrap().finish_and_clear();
|
|
||||||
rep.push(t.stop());
|
|
||||||
rep.print();
|
rep.print();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ pub fn run(args: LongtigArgs) {
|
|||||||
let total_kmers = AtomicUsize::new(0);
|
let total_kmers = AtomicUsize::new(0);
|
||||||
|
|
||||||
(0..n).into_par_iter().for_each(|i| {
|
(0..n).into_par_iter().for_each(|i| {
|
||||||
let part_dir = args.partition.join(format!("part_{i:05}"));
|
let part_dir = kp.part_dir(i);
|
||||||
let in_path = part_dir.join("dereplicated.skmer.zst");
|
let in_path = part_dir.join("dereplicated.skmer.zst");
|
||||||
if !in_path.exists() {
|
if !in_path.exists() {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -1,14 +1,12 @@
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
|
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use indicatif::{ProgressBar, ProgressStyle};
|
|
||||||
use obikpartitionner::KmerPartition;
|
use obikpartitionner::KmerPartition;
|
||||||
use obisys::{Reporter, Stage};
|
use obisys::Reporter;
|
||||||
use obikseq::{RoutableSuperKmer, set_k, set_m};
|
use obikseq::{set_k, set_m};
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
use crate::cli::{CommonArgs, PipelineData, open_chunks};
|
use crate::cli::CommonArgs;
|
||||||
|
use crate::steps::{dereplicate_and_count, scatter};
|
||||||
|
|
||||||
#[derive(Args)]
|
#[derive(Args)]
|
||||||
pub struct PartitionArgs {
|
pub struct PartitionArgs {
|
||||||
@@ -44,63 +42,8 @@ pub fn run(args: PartitionArgs) {
|
|||||||
let path_source = args.common.seqfile_paths();
|
let path_source = args.common.seqfile_paths();
|
||||||
let mut rep = Reporter::new();
|
let mut rep = Reporter::new();
|
||||||
|
|
||||||
let t = Stage::start("scatter");
|
scatter(&mut kp, path_source, k, level_max, theta, n_workers, &mut rep);
|
||||||
let pipe = obipipeline::make_pipe! {
|
dereplicate_and_count(&kp, &mut rep);
|
||||||
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
|
||||||
||? { |path| open_chunks(path) } : Path => RawChunk,
|
|
||||||
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
|
||||||
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
|
|
||||||
};
|
|
||||||
|
|
||||||
let pb = ProgressBar::new_spinner();
|
|
||||||
pb.set_style(
|
|
||||||
ProgressStyle::with_template("{spinner} scatter — {msg} {elapsed}")
|
|
||||||
.unwrap()
|
|
||||||
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
|
|
||||||
);
|
|
||||||
pb.enable_steady_tick(Duration::from_millis(100));
|
|
||||||
|
|
||||||
let mut total_bases: u64 = 0;
|
|
||||||
let mut ema_rate: f64 = 0.0;
|
|
||||||
let mut last_t = Instant::now();
|
|
||||||
let mut last_bases: u64 = 0;
|
|
||||||
const ALPHA: f64 = 0.15;
|
|
||||||
|
|
||||||
for batch in pipe.apply(path_source, n_workers, 1) {
|
|
||||||
total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::<u64>();
|
|
||||||
let now = Instant::now();
|
|
||||||
let dt = now.duration_since(last_t).as_secs_f64();
|
|
||||||
if dt > 0.1 {
|
|
||||||
let instant = (total_bases - last_bases) as f64 / dt;
|
|
||||||
ema_rate = ALPHA * instant + (1.0 - ALPHA) * ema_rate;
|
|
||||||
last_t = now;
|
|
||||||
last_bases = total_bases;
|
|
||||||
let bp = total_bases as f64;
|
|
||||||
let (count_str, rate_str) = if bp >= 1e9 {
|
|
||||||
(format!("{:.2} Gbp", bp / 1e9), format!("{:.0} Mbp/s", ema_rate / 1e6))
|
|
||||||
} else {
|
|
||||||
(format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6))
|
|
||||||
};
|
|
||||||
pb.set_message(format!("{count_str} {rate_str}"));
|
|
||||||
}
|
|
||||||
kp.write_batch(batch).unwrap_or_else(|e| {
|
|
||||||
eprintln!("error: {e}");
|
|
||||||
std::process::exit(1)
|
|
||||||
});
|
|
||||||
}
|
|
||||||
pb.finish_and_clear();
|
|
||||||
|
|
||||||
kp.close().expect("close error");
|
|
||||||
rep.push(t.stop());
|
|
||||||
|
|
||||||
info!("dereplicating...");
|
|
||||||
let t = Stage::start("dereplicate");
|
|
||||||
kp.dereplicate().expect("dereplicate error");
|
|
||||||
rep.push(t.stop());
|
|
||||||
|
|
||||||
let t = Stage::start("count_kmer");
|
|
||||||
kp.count_kmer().expect("count kmer error");
|
|
||||||
rep.push(t.stop());
|
|
||||||
|
|
||||||
rep.print();
|
rep.print();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ pub fn run(args: UnitigArgs) {
|
|||||||
let total_kmers = AtomicUsize::new(0);
|
let total_kmers = AtomicUsize::new(0);
|
||||||
|
|
||||||
(0..n).into_par_iter().for_each(|i| {
|
(0..n).into_par_iter().for_each(|i| {
|
||||||
let part_dir = args.partition.join(format!("part_{i:05}"));
|
let part_dir = kp.part_dir(i);
|
||||||
let in_path = part_dir.join("dereplicated.skmer.zst");
|
let in_path = part_dir.join("dereplicated.skmer.zst");
|
||||||
if !in_path.exists() {
|
if !in_path.exists() {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
mod cli;
|
mod cli;
|
||||||
mod cmd;
|
mod cmd;
|
||||||
|
mod steps;
|
||||||
|
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
use tracing_subscriber::{EnvFilter, fmt};
|
use tracing_subscriber::{EnvFilter, fmt};
|
||||||
|
|||||||
@@ -0,0 +1,177 @@
|
|||||||
|
use std::fs;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
||||||
|
use epserde::prelude::*;
|
||||||
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
|
use obicompactvec::PersistentCompactIntMatrix;
|
||||||
|
use obicompactvec::PersistentCompactIntVec;
|
||||||
|
use obidebruinj::GraphDeBruijn;
|
||||||
|
use obikpartitionner::KmerPartition;
|
||||||
|
use obilayeredmap::layer::Layer;
|
||||||
|
use obiskio::{SKFileMeta, SKFileReader};
|
||||||
|
use obisys::{Reporter, Stage};
|
||||||
|
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
|
||||||
|
use rayon::prelude::*;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u8>>;
|
||||||
|
|
||||||
|
/// Build the layered MPHF index for all partitions in parallel.
|
||||||
|
///
|
||||||
|
/// Default mode (with_counts = false): set membership only (`Layer<()>`).
|
||||||
|
/// With counts (with_counts = true): count matrix per kmer (`Layer<PersistentCompactIntMatrix>`).
|
||||||
|
///
|
||||||
|
/// Skips any partition whose `index/layer_0/mphf.bin` already exists (resume).
|
||||||
|
/// Reports the "index" stage to `rep`.
|
||||||
|
pub fn build_index(
|
||||||
|
kp: &KmerPartition,
|
||||||
|
min_ab: u32,
|
||||||
|
max_ab: Option<u32>,
|
||||||
|
with_counts: bool,
|
||||||
|
keep_intermediate: bool,
|
||||||
|
rep: &mut Reporter,
|
||||||
|
) {
|
||||||
|
let n = kp.n_partitions();
|
||||||
|
let t = Stage::start("index");
|
||||||
|
let total_kmers = AtomicUsize::new(0);
|
||||||
|
let filter_active = min_ab > 1 || max_ab.is_some();
|
||||||
|
let need_counts = filter_active || with_counts;
|
||||||
|
|
||||||
|
let pb = Arc::new(Mutex::new(
|
||||||
|
ProgressBar::new(n as u64).with_style(
|
||||||
|
ProgressStyle::with_template("index — [{bar:20}] {pos}/{len} | {msg}").unwrap(),
|
||||||
|
),
|
||||||
|
));
|
||||||
|
|
||||||
|
(0..n).into_par_iter().for_each(|i| {
|
||||||
|
let part_dir = kp.part_dir(i);
|
||||||
|
let dedup_path = part_dir.join("dereplicated.skmer.zst");
|
||||||
|
if !dedup_path.exists() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let layer_dir = part_dir.join("index").join("layer_0");
|
||||||
|
if layer_dir.join("mphf.bin").exists() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load partition MPHF + counts when needed for filtering or count payload
|
||||||
|
let mphf1_opt: Option<Mphf> = if need_counts {
|
||||||
|
let p = part_dir.join("mphf1.bin");
|
||||||
|
p.exists().then(|| Mphf::load_full(&p).ok()).flatten()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let counts1_opt: Option<PersistentCompactIntVec> = if need_counts {
|
||||||
|
let p = part_dir.join("counts1.bin");
|
||||||
|
p.exists()
|
||||||
|
.then(|| PersistentCompactIntVec::open(&p).ok())
|
||||||
|
.flatten()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
// Build de Bruijn graph with optional abundance filter
|
||||||
|
let mut g = GraphDeBruijn::new();
|
||||||
|
let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| {
|
||||||
|
eprintln!("error opening {}: {e}", dedup_path.display());
|
||||||
|
std::process::exit(1);
|
||||||
|
});
|
||||||
|
for sk in reader.iter() {
|
||||||
|
for kmer in sk.iter_canonical_kmers() {
|
||||||
|
let accept = if filter_active {
|
||||||
|
match (&mphf1_opt, &counts1_opt) {
|
||||||
|
(Some(mphf), Some(counts)) => {
|
||||||
|
let ab = counts.get(mphf.index(&kmer.raw()));
|
||||||
|
ab >= min_ab && max_ab.map_or(true, |max| ab <= max)
|
||||||
|
}
|
||||||
|
_ => true,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
};
|
||||||
|
if accept {
|
||||||
|
g.push(kmer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let n_kmers = g.len();
|
||||||
|
total_kmers.fetch_add(n_kmers, Ordering::Relaxed);
|
||||||
|
g.compute_degrees();
|
||||||
|
|
||||||
|
// Write unitigs to layer_0/unitigs.bin
|
||||||
|
fs::create_dir_all(&layer_dir).unwrap_or_else(|e| {
|
||||||
|
eprintln!("error creating {}: {e}", layer_dir.display());
|
||||||
|
std::process::exit(1);
|
||||||
|
});
|
||||||
|
let mut uw = Layer::<()>::unitig_writer(&layer_dir).unwrap_or_else(|e| {
|
||||||
|
eprintln!("error creating unitig writer (partition {i}): {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
});
|
||||||
|
for unitig in g.iter_unitig() {
|
||||||
|
uw.write(&unitig).unwrap_or_else(|e| {
|
||||||
|
eprintln!("error writing unitig (partition {i}): {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
uw.close().unwrap_or_else(|e| {
|
||||||
|
eprintln!("error closing unitig writer (partition {i}): {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Build MPHF layer — mode depends on --with-counts
|
||||||
|
if with_counts {
|
||||||
|
Layer::<PersistentCompactIntMatrix>::build(&layer_dir, |kmer| {
|
||||||
|
match (&mphf1_opt, &counts1_opt) {
|
||||||
|
(Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())),
|
||||||
|
_ => 1,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|e| {
|
||||||
|
eprintln!("error building count layer (partition {i}): {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
Layer::<()>::build(&layer_dir).unwrap_or_else(|e| {
|
||||||
|
eprintln!("error building set layer (partition {i}): {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let pb = pb.lock().unwrap();
|
||||||
|
pb.inc(1);
|
||||||
|
pb.set_message(format!("{i}: {n_kmers} kmers"));
|
||||||
|
});
|
||||||
|
|
||||||
|
pb.lock().unwrap().finish_and_clear();
|
||||||
|
info!(
|
||||||
|
"done — {} total kmers indexed",
|
||||||
|
total_kmers.load(Ordering::Relaxed)
|
||||||
|
);
|
||||||
|
|
||||||
|
// ── Cleanup intermediate build files ──────────────────────────────────────
|
||||||
|
if !keep_intermediate {
|
||||||
|
for i in 0..n {
|
||||||
|
let part_dir = kp.part_dir(i);
|
||||||
|
remove_if_exists(&part_dir.join("dereplicated.skmer.zst"));
|
||||||
|
remove_if_exists(&SKFileMeta::sidecar_path(&part_dir.join("dereplicated.skmer.zst")));
|
||||||
|
remove_if_exists(&part_dir.join("mphf1.bin"));
|
||||||
|
remove_if_exists(&part_dir.join("counts1.bin"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rep.push(t.stop());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_if_exists(path: &Path) {
|
||||||
|
if let Err(e) = fs::remove_file(path) {
|
||||||
|
if e.kind() != std::io::ErrorKind::NotFound {
|
||||||
|
eprintln!("warning: could not remove {}: {e}", path.display());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
use obikpartitionner::KmerPartition;
|
||||||
|
use obisys::{Reporter, Stage};
|
||||||
|
|
||||||
|
/// Dereplicate then count kmers. Reports each stage to `rep`.
|
||||||
|
pub fn dereplicate_and_count(kp: &KmerPartition, rep: &mut Reporter) {
|
||||||
|
let t = Stage::start("dereplicate");
|
||||||
|
kp.dereplicate().expect("dereplicate error");
|
||||||
|
rep.push(t.stop());
|
||||||
|
|
||||||
|
let t = Stage::start("count_kmer");
|
||||||
|
kp.count_kmer().expect("count kmer error");
|
||||||
|
rep.push(t.stop());
|
||||||
|
}
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
mod build_index;
|
||||||
|
mod dereplicate_and_count;
|
||||||
|
mod scatter;
|
||||||
|
|
||||||
|
pub use build_index::build_index;
|
||||||
|
pub use dereplicate_and_count::dereplicate_and_count;
|
||||||
|
pub use scatter::scatter;
|
||||||
@@ -0,0 +1,71 @@
|
|||||||
|
use std::path::PathBuf;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
|
use obikpartitionner::KmerPartition;
|
||||||
|
use obisys::{Reporter, Stage};
|
||||||
|
|
||||||
|
use crate::cli::{PipelineData, open_chunks};
|
||||||
|
|
||||||
|
/// Run scatter: normalise → build superkmers → route to partition → close.
|
||||||
|
/// Reports the "scatter" stage to `rep`.
|
||||||
|
pub fn scatter(
|
||||||
|
kp: &mut KmerPartition,
|
||||||
|
path_source: obiread::PathIter,
|
||||||
|
k: usize,
|
||||||
|
level_max: usize,
|
||||||
|
theta: f64,
|
||||||
|
n_workers: usize,
|
||||||
|
rep: &mut Reporter,
|
||||||
|
) {
|
||||||
|
use obikseq::RoutableSuperKmer;
|
||||||
|
|
||||||
|
let t = Stage::start("scatter");
|
||||||
|
let pipe = obipipeline::make_pipe! {
|
||||||
|
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
||||||
|
||? { |path| open_chunks(path) } : Path => RawChunk,
|
||||||
|
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
|
||||||
|
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
|
||||||
|
};
|
||||||
|
|
||||||
|
let pb = ProgressBar::new_spinner();
|
||||||
|
pb.set_style(
|
||||||
|
ProgressStyle::with_template("{spinner} scatter — {msg} {elapsed}")
|
||||||
|
.unwrap()
|
||||||
|
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
|
||||||
|
);
|
||||||
|
pb.enable_steady_tick(Duration::from_millis(100));
|
||||||
|
|
||||||
|
let mut total_bases: u64 = 0;
|
||||||
|
let mut ema_rate: f64 = 0.0;
|
||||||
|
let mut last_t = Instant::now();
|
||||||
|
let mut last_bases: u64 = 0;
|
||||||
|
const ALPHA: f64 = 0.15;
|
||||||
|
|
||||||
|
for batch in pipe.apply(path_source, n_workers, 1) {
|
||||||
|
total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::<u64>();
|
||||||
|
let now = Instant::now();
|
||||||
|
let dt = now.duration_since(last_t).as_secs_f64();
|
||||||
|
if dt > 0.1 {
|
||||||
|
let instant = (total_bases - last_bases) as f64 / dt;
|
||||||
|
ema_rate = ALPHA * instant + (1.0 - ALPHA) * ema_rate;
|
||||||
|
last_t = now;
|
||||||
|
last_bases = total_bases;
|
||||||
|
let bp = total_bases as f64;
|
||||||
|
let (count_str, rate_str) = if bp >= 1e9 {
|
||||||
|
(format!("{:.2} Gbp", bp / 1e9), format!("{:.0} Mbp/s", ema_rate / 1e6))
|
||||||
|
} else {
|
||||||
|
(format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6))
|
||||||
|
};
|
||||||
|
pb.set_message(format!("{count_str} {rate_str}"));
|
||||||
|
}
|
||||||
|
kp.write_batch(batch).unwrap_or_else(|e| {
|
||||||
|
eprintln!("error: {e}");
|
||||||
|
std::process::exit(1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
pb.finish_and_clear();
|
||||||
|
kp.close().expect("close error");
|
||||||
|
rep.push(t.stop());
|
||||||
|
}
|
||||||
|
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
mod kmer_sort;
|
mod kmer_sort;
|
||||||
mod partition;
|
mod partition;
|
||||||
|
|
||||||
pub use partition::KmerPartition;
|
pub use partition::{KmerPartition, PARTITIONS_SUBDIR};
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ type Mphf = PtrHash<u64, CubicEps, CachelineEfVec<Vec<CachelineEf>>, Xx64, Vec<u
|
|||||||
|
|
||||||
const META_FILENAME: &str = "partition.meta";
|
const META_FILENAME: &str = "partition.meta";
|
||||||
const SK_EXT: &str = "skmer.zst";
|
const SK_EXT: &str = "skmer.zst";
|
||||||
|
pub const PARTITIONS_SUBDIR: &str = "partitions";
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct PartitionMeta {
|
struct PartitionMeta {
|
||||||
@@ -84,7 +85,7 @@ impl KmerPartition {
|
|||||||
.into());
|
.into());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fs::create_dir_all(&root_path)?;
|
fs::create_dir_all(root_path.join(PARTITIONS_SUBDIR))?;
|
||||||
let n_partitions = 1usize << n_bits;
|
let n_partitions = 1usize << n_bits;
|
||||||
let writers = (0..n_partitions).map(|_| None).collect();
|
let writers = (0..n_partitions).map(|_| None).collect();
|
||||||
let partition = Self {
|
let partition = Self {
|
||||||
@@ -175,6 +176,11 @@ impl KmerPartition {
|
|||||||
&self.root_path
|
&self.root_path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Path of partition `i` directory.
|
||||||
|
pub fn part_dir(&self, i: usize) -> PathBuf {
|
||||||
|
self.root_path.join(PARTITIONS_SUBDIR).join(format!("part_{i:05}"))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn kmer_size(&self) -> usize {
|
pub fn kmer_size(&self) -> usize {
|
||||||
self.kmer_size
|
self.kmer_size
|
||||||
}
|
}
|
||||||
@@ -208,7 +214,6 @@ impl KmerPartition {
|
|||||||
/// more temporary file descriptors — all managed by the global fd pool.
|
/// more temporary file descriptors — all managed by the global fd pool.
|
||||||
pub fn dereplicate(&self) -> SKResult<()> {
|
pub fn dereplicate(&self) -> SKResult<()> {
|
||||||
let level = self.level;
|
let level = self.level;
|
||||||
let root = &self.root_path;
|
|
||||||
let sys = System::new_all();
|
let sys = System::new_all();
|
||||||
// available_memory() can return 0 on macOS when the compressor page count exceeds
|
// available_memory() can return 0 on macOS when the compressor page count exceeds
|
||||||
// free+inactive+purgeable pages (sysinfo saturating_sub). Fall back to half of total.
|
// free+inactive+purgeable pages (sysinfo saturating_sub). Fall back to half of total.
|
||||||
@@ -222,7 +227,7 @@ impl KmerPartition {
|
|||||||
let results: Vec<SKResult<()>> = (0..self.n_partitions)
|
let results: Vec<SKResult<()>> = (0..self.n_partitions)
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|i| {
|
.map(|i| {
|
||||||
let dir = root.join(format!("part_{:05}", i));
|
let dir = self.part_dir(i);
|
||||||
if !dir.exists() {
|
if !dir.exists() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@@ -249,8 +254,6 @@ impl KmerPartition {
|
|||||||
/// Partitions are processed in parallel via Rayon (one task per thread).
|
/// Partitions are processed in parallel via Rayon (one task per thread).
|
||||||
/// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously.
|
/// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously.
|
||||||
pub fn count_kmer(&self) -> SKResult<()> {
|
pub fn count_kmer(&self) -> SKResult<()> {
|
||||||
let root = &self.root_path;
|
|
||||||
|
|
||||||
let sys = System::new_all();
|
let sys = System::new_all();
|
||||||
let available = match sys.available_memory() {
|
let available = match sys.available_memory() {
|
||||||
0 => sys.total_memory() / 2,
|
0 => sys.total_memory() / 2,
|
||||||
@@ -271,7 +274,7 @@ impl KmerPartition {
|
|||||||
let results: Vec<SKResult<()>> = (0..self.n_partitions)
|
let results: Vec<SKResult<()>> = (0..self.n_partitions)
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|i| {
|
.map(|i| {
|
||||||
let dir = root.join(format!("part_{:05}", i));
|
let dir = self.part_dir(i);
|
||||||
let dedup_path = dir.join(format!("dereplicated.{SK_EXT}"));
|
let dedup_path = dir.join(format!("dereplicated.{SK_EXT}"));
|
||||||
if !dedup_path.exists() {
|
if !dedup_path.exists() {
|
||||||
pb.inc(1);
|
pb.inc(1);
|
||||||
@@ -296,9 +299,7 @@ impl KmerPartition {
|
|||||||
let mut global_f1: u64 = 0;
|
let mut global_f1: u64 = 0;
|
||||||
|
|
||||||
for i in 0..self.n_partitions {
|
for i in 0..self.n_partitions {
|
||||||
let path = root
|
let path = self.part_dir(i).join("kmer_spectrum_raw.json");
|
||||||
.join(format!("part_{:05}", i))
|
|
||||||
.join("kmer_spectrum_raw.json");
|
|
||||||
if !path.exists() {
|
if !path.exists() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -320,7 +321,7 @@ impl KmerPartition {
|
|||||||
.map(|(&c, &f)| (format!("{c:010}"), f))
|
.map(|(&c, &f)| (format!("{c:010}"), f))
|
||||||
.collect();
|
.collect();
|
||||||
serde_json::to_writer_pretty(
|
serde_json::to_writer_pretty(
|
||||||
fs::File::create(root.join("kmer_spectrum_raw.json"))?,
|
fs::File::create(self.root_path.join("kmer_spectrum_raw.json"))?,
|
||||||
&serde_json::json!({ "f0": global_f0, "f1": global_f1, "spectrum": &global_spectrum_map }),
|
&serde_json::json!({ "f0": global_f0, "f1": global_f1, "spectrum": &global_spectrum_map }),
|
||||||
)
|
)
|
||||||
.map_err(io::Error::other)?;
|
.map_err(io::Error::other)?;
|
||||||
@@ -352,7 +353,7 @@ impl KmerPartition {
|
|||||||
|
|
||||||
fn ensure_writer(&mut self, partition: usize) -> SKResult<&mut SKFileWriter> {
|
fn ensure_writer(&mut self, partition: usize) -> SKResult<&mut SKFileWriter> {
|
||||||
if self.writers[partition].is_none() {
|
if self.writers[partition].is_none() {
|
||||||
let dir = self.root_path.join(format!("part_{:05}", partition));
|
let dir = self.root_path.join(PARTITIONS_SUBDIR).join(format!("part_{:05}", partition));
|
||||||
fs::create_dir_all(&dir)?;
|
fs::create_dir_all(&dir)?;
|
||||||
let file_path = dir.join(format!("raw.{SK_EXT}"));
|
let file_path = dir.join(format!("raw.{SK_EXT}"));
|
||||||
let writer = SKFileWriter::create_with(file_path, Format::Zstd, self.level)?;
|
let writer = SKFileWriter::create_with(file_path, Format::Zstd, self.level)?;
|
||||||
@@ -645,7 +646,7 @@ mod tests {
|
|||||||
kp.close().unwrap();
|
kp.close().unwrap();
|
||||||
kp.dereplicate().unwrap();
|
kp.dereplicate().unwrap();
|
||||||
|
|
||||||
let part_dir = dir.path().join("part_00000");
|
let part_dir = dir.path().join(PARTITIONS_SUBDIR).join("part_00000");
|
||||||
let dedup_path = part_dir.join("dereplicated.skmer.zst");
|
let dedup_path = part_dir.join("dereplicated.skmer.zst");
|
||||||
if !dedup_path.exists() {
|
if !dedup_path.exists() {
|
||||||
return (0, 0);
|
return (0, 0);
|
||||||
|
|||||||
Reference in New Issue
Block a user