diff --git a/src/obikmer/src/cmd/fasta.rs b/src/obikmer/src/cmd/fasta.rs index 4fd1e43..9eb384a 100644 --- a/src/obikmer/src/cmd/fasta.rs +++ b/src/obikmer/src/cmd/fasta.rs @@ -37,7 +37,7 @@ pub fn run(args: FastaArgs) { } } -fn dump_super_kmers(kp: &KmerPartition, partition_dir: &PathBuf) { +fn dump_super_kmers(kp: &KmerPartition, _partition_dir: &PathBuf) { let k = kp.kmer_size(); let m = kp.minimizer_size(); let n = kp.n_partitions(); @@ -47,7 +47,7 @@ fn dump_super_kmers(kp: &KmerPartition, partition_dir: &PathBuf) { let total = AtomicUsize::new(0); (0..n).into_par_iter().for_each(|i| { - let part_dir = partition_dir.join(format!("part_{i:05}")); + let part_dir = kp.part_dir(i); let in_path = part_dir.join("dereplicated.skmer.zst"); if !in_path.exists() { return; diff --git a/src/obikmer/src/cmd/index.rs b/src/obikmer/src/cmd/index.rs index 86cda2d..609b45d 100644 --- a/src/obikmer/src/cmd/index.rs +++ b/src/obikmer/src/cmd/index.rs @@ -1,28 +1,13 @@ -use std::fs; use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; -use cacheline_ef::{CachelineEf, CachelineEfVec}; use clap::Args; -use epserde::prelude::*; -use indicatif::{ProgressBar, ProgressStyle}; -use obicompactvec::PersistentCompactIntMatrix; -use obicompactvec::PersistentCompactIntVec; -use obidebruinj::GraphDeBruijn; use obikpartitionner::KmerPartition; -use obikseq::{RoutableSuperKmer, set_k, set_m}; -use obiskio::SKFileReader; -use obilayeredmap::layer::Layer; -use obisys::{Reporter, Stage}; -use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; -use rayon::prelude::*; +use obikseq::{set_k, set_m}; +use obisys::Reporter; use tracing::info; -use crate::cli::{CommonArgs, PipelineData, open_chunks}; - -type Mphf = PtrHash>, Xx64, Vec>; +use crate::cli::CommonArgs; +use crate::steps::{build_index, dereplicate_and_count, scatter}; #[derive(Args)] pub struct IndexArgs { @@ -42,20 +27,24 @@ pub struct IndexArgs { #[arg(long)] pub max_abundance: Option, + /// Store kmer counts in the index (default: set membership only) + #[arg(long, default_value_t = false)] + pub with_counts: bool, + + /// Keep intermediate build files (dereplicated superkmers, mphf1, counts1) + #[arg(long, default_value_t = false)] + pub keep_intermediate: bool, + #[command(flatten)] pub common: CommonArgs, } pub fn run(args: IndexArgs) { let output = args.output.clone(); - let min_ab = args.min_abundance; - let max_ab = args.max_abundance; - - // ── Stage 1: scatter (skipped if partition already exists) ─────────────── - let partition_meta = output.join("partition.meta"); let mut rep = Reporter::new(); - let kp = if partition_meta.exists() { + // ── Stage 1: scatter (skipped if partition already exists) ─────────────── + let kp = if output.join("partition.meta").exists() { info!("resuming from existing partition at {}", output.display()); let kp = KmerPartition::open(&output).unwrap_or_else(|e| { eprintln!("error opening partition: {e}"); @@ -80,187 +69,27 @@ pub fn run(args: IndexArgs) { std::process::exit(1); }); - let path_source = args.common.seqfile_paths(); - - let t = Stage::start("scatter"); - let pipe = obipipeline::make_pipe! { - PipelineData : PathBuf => Vec, - ||? { |path| open_chunks(path) } : Path => RawChunk, - |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, - | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, - }; - - let pb = ProgressBar::new_spinner(); - pb.set_style( - ProgressStyle::with_template("{spinner} scatter — {msg} {elapsed}") - .unwrap() - .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]), - ); - pb.enable_steady_tick(Duration::from_millis(100)); - - let mut total_bases: u64 = 0; - let mut ema_rate: f64 = 0.0; - let mut last_t = Instant::now(); - let mut last_bases: u64 = 0; - const ALPHA: f64 = 0.15; - - for batch in pipe.apply(path_source, n_workers, 1) { - total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::(); - let now = Instant::now(); - let dt = now.duration_since(last_t).as_secs_f64(); - if dt > 0.1 { - let instant = (total_bases - last_bases) as f64 / dt; - ema_rate = ALPHA * instant + (1.0 - ALPHA) * ema_rate; - last_t = now; - last_bases = total_bases; - let bp = total_bases as f64; - let (count_str, rate_str) = if bp >= 1e9 { - (format!("{:.2} Gbp", bp / 1e9), format!("{:.0} Mbp/s", ema_rate / 1e6)) - } else { - (format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6)) - }; - pb.set_message(format!("{count_str} {rate_str}")); - } - kp.write_batch(batch).unwrap_or_else(|e| { - eprintln!("error: {e}"); - std::process::exit(1); - }); - } - pb.finish_and_clear(); - kp.close().expect("close error"); - rep.push(t.stop()); + scatter(&mut kp, args.common.seqfile_paths(), k, level_max, theta, n_workers, &mut rep); kp }; - let n = kp.n_partitions(); // ── Stage 2: dereplicate + count (skipped if already done) ─────────────── - let counting_done = output.join("kmer_spectrum_raw.json").exists(); - if !counting_done { - info!("dereplicating..."); - let t = Stage::start("dereplicate"); - kp.dereplicate().expect("dereplicate error"); - rep.push(t.stop()); - - let t = Stage::start("count_kmer"); - info!("counting kmers..."); - kp.count_kmer().expect("count kmer error"); - rep.push(t.stop()); + if !output.join("kmer_spectrum_raw.json").exists() { + dereplicate_and_count(&kp, &mut rep); } else { info!("kmer counts already present, skipping dereplicate + count"); } - // ── Stage 3: build layered index per partition ──────────────────────────── - let t = Stage::start("index"); - let total_kmers = AtomicUsize::new(0); - let filter_active = min_ab > 1 || max_ab.is_some(); + // ── Stage 3: build layered index ───────────────────────────────────────── + build_index( + &kp, + args.min_abundance, + args.max_abundance, + args.with_counts, + args.keep_intermediate, + &mut rep, + ); - let pb = ProgressBar::new(n as u64); - pb.set_style(ProgressStyle::with_template( - "index — [{bar:20}] {pos}/{len} | {msg}", - ).unwrap()); - - let pb = Arc::new(Mutex::new(pb)); - - (0..n).into_par_iter().for_each(|i| { - let part_dir = output.join(format!("part_{i:05}")); - let dedup_path = part_dir.join("dereplicated.skmer.zst"); - if !dedup_path.exists() { - return; - } - - let layer_dir = part_dir.join("index").join("layer_0"); - if layer_dir.join("mphf.bin").exists() { - return; - } - - // Load partition MPHF + counts for abundance filtering - let mphf1_opt: Option = if filter_active { - let p = part_dir.join("mphf1.bin"); - p.exists() - .then(|| Mphf::load_full(&p).ok()) - .flatten() - } else { - None - }; - - let counts1_opt: Option = if filter_active { - let p = part_dir.join("counts1.bin"); - p.exists() - .then(|| PersistentCompactIntVec::open(&p).ok()) - .flatten() - } else { - None - }; - - // Build de Bruijn graph with optional abundance filter - let mut g = GraphDeBruijn::new(); - let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", dedup_path.display()); - std::process::exit(1); - }); - for sk in reader.iter() { - for kmer in sk.iter_canonical_kmers() { - let accept = match (&mphf1_opt, &counts1_opt) { - (Some(mphf), Some(counts)) => { - let slot = mphf.index(&kmer.raw()); - let ab = counts.get(slot); - ab >= min_ab && max_ab.map_or(true, |max| ab <= max) - } - _ => true, - }; - if accept { - g.push(kmer); - } - } - } - - let n_kmers = g.len(); - total_kmers.fetch_add(n_kmers, Ordering::Relaxed); - g.compute_degrees(); - - // Write unitigs to layer_0/unitigs.bin - fs::create_dir_all(&layer_dir).unwrap_or_else(|e| { - eprintln!("error creating {}: {e}", layer_dir.display()); - std::process::exit(1); - }); - let mut uw = Layer::::unitig_writer(&layer_dir) - .unwrap_or_else(|e| { - eprintln!("error creating unitig writer for partition {i}: {e}"); - std::process::exit(1); - }); - for unitig in g.iter_unitig() { - uw.write(&unitig).unwrap_or_else(|e| { - eprintln!("error writing unitig (partition {i}): {e}"); - std::process::exit(1); - }); - } - uw.close().unwrap_or_else(|e| { - eprintln!("error closing unitig writer (partition {i}): {e}"); - std::process::exit(1); - }); - - // Build MPHF + count matrix for this layer - Layer::::build(&layer_dir, |kmer| { - match (&mphf1_opt, &counts1_opt) { - (Some(mphf), Some(counts)) => { - let slot = mphf.index(&kmer.raw()); - counts.get(slot) - } - _ => 1, - } - }) - .unwrap_or_else(|e| { - eprintln!("error building layer index (partition {i}): {e}"); - std::process::exit(1); - }); - - let pb = pb.lock().unwrap(); - pb.inc(1); - pb.set_message(format!("{i}: {n_kmers} kmers")); - }); - - pb.lock().unwrap().finish_and_clear(); - rep.push(t.stop()); rep.print(); } diff --git a/src/obikmer/src/cmd/longtig.rs b/src/obikmer/src/cmd/longtig.rs index fa8d1bf..1d8b922 100644 --- a/src/obikmer/src/cmd/longtig.rs +++ b/src/obikmer/src/cmd/longtig.rs @@ -41,7 +41,7 @@ pub fn run(args: LongtigArgs) { let total_kmers = AtomicUsize::new(0); (0..n).into_par_iter().for_each(|i| { - let part_dir = args.partition.join(format!("part_{i:05}")); + let part_dir = kp.part_dir(i); let in_path = part_dir.join("dereplicated.skmer.zst"); if !in_path.exists() { return; diff --git a/src/obikmer/src/cmd/partition.rs b/src/obikmer/src/cmd/partition.rs index c466267..6245b83 100644 --- a/src/obikmer/src/cmd/partition.rs +++ b/src/obikmer/src/cmd/partition.rs @@ -1,14 +1,12 @@ use std::path::PathBuf; -use std::time::{Duration, Instant}; use clap::Args; -use indicatif::{ProgressBar, ProgressStyle}; use obikpartitionner::KmerPartition; -use obisys::{Reporter, Stage}; -use obikseq::{RoutableSuperKmer, set_k, set_m}; -use tracing::info; +use obisys::Reporter; +use obikseq::{set_k, set_m}; -use crate::cli::{CommonArgs, PipelineData, open_chunks}; +use crate::cli::CommonArgs; +use crate::steps::{dereplicate_and_count, scatter}; #[derive(Args)] pub struct PartitionArgs { @@ -44,63 +42,8 @@ pub fn run(args: PartitionArgs) { let path_source = args.common.seqfile_paths(); let mut rep = Reporter::new(); - let t = Stage::start("scatter"); - let pipe = obipipeline::make_pipe! { - PipelineData : PathBuf => Vec, - ||? { |path| open_chunks(path) } : Path => RawChunk, - |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, - | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, - }; - - let pb = ProgressBar::new_spinner(); - pb.set_style( - ProgressStyle::with_template("{spinner} scatter — {msg} {elapsed}") - .unwrap() - .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]), - ); - pb.enable_steady_tick(Duration::from_millis(100)); - - let mut total_bases: u64 = 0; - let mut ema_rate: f64 = 0.0; - let mut last_t = Instant::now(); - let mut last_bases: u64 = 0; - const ALPHA: f64 = 0.15; - - for batch in pipe.apply(path_source, n_workers, 1) { - total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::(); - let now = Instant::now(); - let dt = now.duration_since(last_t).as_secs_f64(); - if dt > 0.1 { - let instant = (total_bases - last_bases) as f64 / dt; - ema_rate = ALPHA * instant + (1.0 - ALPHA) * ema_rate; - last_t = now; - last_bases = total_bases; - let bp = total_bases as f64; - let (count_str, rate_str) = if bp >= 1e9 { - (format!("{:.2} Gbp", bp / 1e9), format!("{:.0} Mbp/s", ema_rate / 1e6)) - } else { - (format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6)) - }; - pb.set_message(format!("{count_str} {rate_str}")); - } - kp.write_batch(batch).unwrap_or_else(|e| { - eprintln!("error: {e}"); - std::process::exit(1) - }); - } - pb.finish_and_clear(); - - kp.close().expect("close error"); - rep.push(t.stop()); - - info!("dereplicating..."); - let t = Stage::start("dereplicate"); - kp.dereplicate().expect("dereplicate error"); - rep.push(t.stop()); - - let t = Stage::start("count_kmer"); - kp.count_kmer().expect("count kmer error"); - rep.push(t.stop()); + scatter(&mut kp, path_source, k, level_max, theta, n_workers, &mut rep); + dereplicate_and_count(&kp, &mut rep); rep.print(); } diff --git a/src/obikmer/src/cmd/unitig.rs b/src/obikmer/src/cmd/unitig.rs index 66fd583..ddf0fc0 100644 --- a/src/obikmer/src/cmd/unitig.rs +++ b/src/obikmer/src/cmd/unitig.rs @@ -41,7 +41,7 @@ pub fn run(args: UnitigArgs) { let total_kmers = AtomicUsize::new(0); (0..n).into_par_iter().for_each(|i| { - let part_dir = args.partition.join(format!("part_{i:05}")); + let part_dir = kp.part_dir(i); let in_path = part_dir.join("dereplicated.skmer.zst"); if !in_path.exists() { return; diff --git a/src/obikmer/src/main.rs b/src/obikmer/src/main.rs index 6e19c05..22a3ef8 100644 --- a/src/obikmer/src/main.rs +++ b/src/obikmer/src/main.rs @@ -1,5 +1,6 @@ mod cli; mod cmd; +mod steps; use clap::{Parser, Subcommand}; use tracing_subscriber::{EnvFilter, fmt}; diff --git a/src/obikmer/src/steps/build_index.rs b/src/obikmer/src/steps/build_index.rs new file mode 100644 index 0000000..31e7629 --- /dev/null +++ b/src/obikmer/src/steps/build_index.rs @@ -0,0 +1,177 @@ +use std::fs; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; + +use cacheline_ef::{CachelineEf, CachelineEfVec}; +use epserde::prelude::*; +use indicatif::{ProgressBar, ProgressStyle}; +use obicompactvec::PersistentCompactIntMatrix; +use obicompactvec::PersistentCompactIntVec; +use obidebruinj::GraphDeBruijn; +use obikpartitionner::KmerPartition; +use obilayeredmap::layer::Layer; +use obiskio::{SKFileMeta, SKFileReader}; +use obisys::{Reporter, Stage}; +use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; +use rayon::prelude::*; +use tracing::info; + +type Mphf = PtrHash>, Xx64, Vec>; + +/// Build the layered MPHF index for all partitions in parallel. +/// +/// Default mode (with_counts = false): set membership only (`Layer<()>`). +/// With counts (with_counts = true): count matrix per kmer (`Layer`). +/// +/// Skips any partition whose `index/layer_0/mphf.bin` already exists (resume). +/// Reports the "index" stage to `rep`. +pub fn build_index( + kp: &KmerPartition, + min_ab: u32, + max_ab: Option, + with_counts: bool, + keep_intermediate: bool, + rep: &mut Reporter, +) { + let n = kp.n_partitions(); + let t = Stage::start("index"); + let total_kmers = AtomicUsize::new(0); + let filter_active = min_ab > 1 || max_ab.is_some(); + let need_counts = filter_active || with_counts; + + let pb = Arc::new(Mutex::new( + ProgressBar::new(n as u64).with_style( + ProgressStyle::with_template("index — [{bar:20}] {pos}/{len} | {msg}").unwrap(), + ), + )); + + (0..n).into_par_iter().for_each(|i| { + let part_dir = kp.part_dir(i); + let dedup_path = part_dir.join("dereplicated.skmer.zst"); + if !dedup_path.exists() { + return; + } + + let layer_dir = part_dir.join("index").join("layer_0"); + if layer_dir.join("mphf.bin").exists() { + return; + } + + // Load partition MPHF + counts when needed for filtering or count payload + let mphf1_opt: Option = if need_counts { + let p = part_dir.join("mphf1.bin"); + p.exists().then(|| Mphf::load_full(&p).ok()).flatten() + } else { + None + }; + + let counts1_opt: Option = if need_counts { + let p = part_dir.join("counts1.bin"); + p.exists() + .then(|| PersistentCompactIntVec::open(&p).ok()) + .flatten() + } else { + None + }; + + // Build de Bruijn graph with optional abundance filter + let mut g = GraphDeBruijn::new(); + let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| { + eprintln!("error opening {}: {e}", dedup_path.display()); + std::process::exit(1); + }); + for sk in reader.iter() { + for kmer in sk.iter_canonical_kmers() { + let accept = if filter_active { + match (&mphf1_opt, &counts1_opt) { + (Some(mphf), Some(counts)) => { + let ab = counts.get(mphf.index(&kmer.raw())); + ab >= min_ab && max_ab.map_or(true, |max| ab <= max) + } + _ => true, + } + } else { + true + }; + if accept { + g.push(kmer); + } + } + } + + let n_kmers = g.len(); + total_kmers.fetch_add(n_kmers, Ordering::Relaxed); + g.compute_degrees(); + + // Write unitigs to layer_0/unitigs.bin + fs::create_dir_all(&layer_dir).unwrap_or_else(|e| { + eprintln!("error creating {}: {e}", layer_dir.display()); + std::process::exit(1); + }); + let mut uw = Layer::<()>::unitig_writer(&layer_dir).unwrap_or_else(|e| { + eprintln!("error creating unitig writer (partition {i}): {e}"); + std::process::exit(1); + }); + for unitig in g.iter_unitig() { + uw.write(&unitig).unwrap_or_else(|e| { + eprintln!("error writing unitig (partition {i}): {e}"); + std::process::exit(1); + }); + } + uw.close().unwrap_or_else(|e| { + eprintln!("error closing unitig writer (partition {i}): {e}"); + std::process::exit(1); + }); + + // Build MPHF layer — mode depends on --with-counts + if with_counts { + Layer::::build(&layer_dir, |kmer| { + match (&mphf1_opt, &counts1_opt) { + (Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())), + _ => 1, + } + }) + .unwrap_or_else(|e| { + eprintln!("error building count layer (partition {i}): {e}"); + std::process::exit(1); + }); + } else { + Layer::<()>::build(&layer_dir).unwrap_or_else(|e| { + eprintln!("error building set layer (partition {i}): {e}"); + std::process::exit(1); + }); + } + + let pb = pb.lock().unwrap(); + pb.inc(1); + pb.set_message(format!("{i}: {n_kmers} kmers")); + }); + + pb.lock().unwrap().finish_and_clear(); + info!( + "done — {} total kmers indexed", + total_kmers.load(Ordering::Relaxed) + ); + + // ── Cleanup intermediate build files ────────────────────────────────────── + if !keep_intermediate { + for i in 0..n { + let part_dir = kp.part_dir(i); + remove_if_exists(&part_dir.join("dereplicated.skmer.zst")); + remove_if_exists(&SKFileMeta::sidecar_path(&part_dir.join("dereplicated.skmer.zst"))); + remove_if_exists(&part_dir.join("mphf1.bin")); + remove_if_exists(&part_dir.join("counts1.bin")); + } + } + + rep.push(t.stop()); +} + +fn remove_if_exists(path: &Path) { + if let Err(e) = fs::remove_file(path) { + if e.kind() != std::io::ErrorKind::NotFound { + eprintln!("warning: could not remove {}: {e}", path.display()); + } + } +} diff --git a/src/obikmer/src/steps/dereplicate_and_count.rs b/src/obikmer/src/steps/dereplicate_and_count.rs new file mode 100644 index 0000000..68fd99c --- /dev/null +++ b/src/obikmer/src/steps/dereplicate_and_count.rs @@ -0,0 +1,13 @@ +use obikpartitionner::KmerPartition; +use obisys::{Reporter, Stage}; + +/// Dereplicate then count kmers. Reports each stage to `rep`. +pub fn dereplicate_and_count(kp: &KmerPartition, rep: &mut Reporter) { + let t = Stage::start("dereplicate"); + kp.dereplicate().expect("dereplicate error"); + rep.push(t.stop()); + + let t = Stage::start("count_kmer"); + kp.count_kmer().expect("count kmer error"); + rep.push(t.stop()); +} diff --git a/src/obikmer/src/steps/mod.rs b/src/obikmer/src/steps/mod.rs new file mode 100644 index 0000000..4666631 --- /dev/null +++ b/src/obikmer/src/steps/mod.rs @@ -0,0 +1,7 @@ +mod build_index; +mod dereplicate_and_count; +mod scatter; + +pub use build_index::build_index; +pub use dereplicate_and_count::dereplicate_and_count; +pub use scatter::scatter; diff --git a/src/obikmer/src/steps/scatter.rs b/src/obikmer/src/steps/scatter.rs new file mode 100644 index 0000000..72776ab --- /dev/null +++ b/src/obikmer/src/steps/scatter.rs @@ -0,0 +1,71 @@ +use std::path::PathBuf; +use std::time::{Duration, Instant}; + +use indicatif::{ProgressBar, ProgressStyle}; +use obikpartitionner::KmerPartition; +use obisys::{Reporter, Stage}; + +use crate::cli::{PipelineData, open_chunks}; + +/// Run scatter: normalise → build superkmers → route to partition → close. +/// Reports the "scatter" stage to `rep`. +pub fn scatter( + kp: &mut KmerPartition, + path_source: obiread::PathIter, + k: usize, + level_max: usize, + theta: f64, + n_workers: usize, + rep: &mut Reporter, +) { + use obikseq::RoutableSuperKmer; + + let t = Stage::start("scatter"); + let pipe = obipipeline::make_pipe! { + PipelineData : PathBuf => Vec, + ||? { |path| open_chunks(path) } : Path => RawChunk, + |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, + | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, + }; + + let pb = ProgressBar::new_spinner(); + pb.set_style( + ProgressStyle::with_template("{spinner} scatter — {msg} {elapsed}") + .unwrap() + .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]), + ); + pb.enable_steady_tick(Duration::from_millis(100)); + + let mut total_bases: u64 = 0; + let mut ema_rate: f64 = 0.0; + let mut last_t = Instant::now(); + let mut last_bases: u64 = 0; + const ALPHA: f64 = 0.15; + + for batch in pipe.apply(path_source, n_workers, 1) { + total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::(); + let now = Instant::now(); + let dt = now.duration_since(last_t).as_secs_f64(); + if dt > 0.1 { + let instant = (total_bases - last_bases) as f64 / dt; + ema_rate = ALPHA * instant + (1.0 - ALPHA) * ema_rate; + last_t = now; + last_bases = total_bases; + let bp = total_bases as f64; + let (count_str, rate_str) = if bp >= 1e9 { + (format!("{:.2} Gbp", bp / 1e9), format!("{:.0} Mbp/s", ema_rate / 1e6)) + } else { + (format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6)) + }; + pb.set_message(format!("{count_str} {rate_str}")); + } + kp.write_batch(batch).unwrap_or_else(|e| { + eprintln!("error: {e}"); + std::process::exit(1); + }); + } + pb.finish_and_clear(); + kp.close().expect("close error"); + rep.push(t.stop()); +} + diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index a1e0b9f..09fc147 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -1,4 +1,4 @@ mod kmer_sort; mod partition; -pub use partition::KmerPartition; +pub use partition::{KmerPartition, PARTITIONS_SUBDIR}; diff --git a/src/obikpartitionner/src/partition.rs b/src/obikpartitionner/src/partition.rs index ab07083..a638f93 100644 --- a/src/obikpartitionner/src/partition.rs +++ b/src/obikpartitionner/src/partition.rs @@ -30,6 +30,7 @@ type Mphf = PtrHash>, Xx64, Vec PathBuf { + self.root_path.join(PARTITIONS_SUBDIR).join(format!("part_{i:05}")) + } + pub fn kmer_size(&self) -> usize { self.kmer_size } @@ -208,7 +214,6 @@ impl KmerPartition { /// more temporary file descriptors — all managed by the global fd pool. pub fn dereplicate(&self) -> SKResult<()> { let level = self.level; - let root = &self.root_path; let sys = System::new_all(); // available_memory() can return 0 on macOS when the compressor page count exceeds // free+inactive+purgeable pages (sysinfo saturating_sub). Fall back to half of total. @@ -222,7 +227,7 @@ impl KmerPartition { let results: Vec> = (0..self.n_partitions) .into_par_iter() .map(|i| { - let dir = root.join(format!("part_{:05}", i)); + let dir = self.part_dir(i); if !dir.exists() { return Ok(()); } @@ -249,8 +254,6 @@ impl KmerPartition { /// Partitions are processed in parallel via Rayon (one task per thread). /// Peak memory per partition is ~80 MB, so n_threads partitions run simultaneously. pub fn count_kmer(&self) -> SKResult<()> { - let root = &self.root_path; - let sys = System::new_all(); let available = match sys.available_memory() { 0 => sys.total_memory() / 2, @@ -271,7 +274,7 @@ impl KmerPartition { let results: Vec> = (0..self.n_partitions) .into_par_iter() .map(|i| { - let dir = root.join(format!("part_{:05}", i)); + let dir = self.part_dir(i); let dedup_path = dir.join(format!("dereplicated.{SK_EXT}")); if !dedup_path.exists() { pb.inc(1); @@ -296,9 +299,7 @@ impl KmerPartition { let mut global_f1: u64 = 0; for i in 0..self.n_partitions { - let path = root - .join(format!("part_{:05}", i)) - .join("kmer_spectrum_raw.json"); + let path = self.part_dir(i).join("kmer_spectrum_raw.json"); if !path.exists() { continue; } @@ -320,7 +321,7 @@ impl KmerPartition { .map(|(&c, &f)| (format!("{c:010}"), f)) .collect(); serde_json::to_writer_pretty( - fs::File::create(root.join("kmer_spectrum_raw.json"))?, + fs::File::create(self.root_path.join("kmer_spectrum_raw.json"))?, &serde_json::json!({ "f0": global_f0, "f1": global_f1, "spectrum": &global_spectrum_map }), ) .map_err(io::Error::other)?; @@ -352,7 +353,7 @@ impl KmerPartition { fn ensure_writer(&mut self, partition: usize) -> SKResult<&mut SKFileWriter> { if self.writers[partition].is_none() { - let dir = self.root_path.join(format!("part_{:05}", partition)); + let dir = self.root_path.join(PARTITIONS_SUBDIR).join(format!("part_{:05}", partition)); fs::create_dir_all(&dir)?; let file_path = dir.join(format!("raw.{SK_EXT}")); let writer = SKFileWriter::create_with(file_path, Format::Zstd, self.level)?; @@ -645,7 +646,7 @@ mod tests { kp.close().unwrap(); kp.dereplicate().unwrap(); - let part_dir = dir.path().join("part_00000"); + let part_dir = dir.path().join(PARTITIONS_SUBDIR).join("part_00000"); let dedup_path = part_dir.join("dereplicated.skmer.zst"); if !dedup_path.exists() { return (0, 0);