diff --git a/src/Cargo.lock b/src/Cargo.lock index 583e5a0..98562bb 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -489,6 +489,19 @@ dependencies = [ "impl-tools 0.11.4", ] +[[package]] +name = "console" +version = "0.15.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8" +dependencies = [ + "encode_unicode", + "libc", + "once_cell", + "unicode-width", + "windows-sys 0.59.0", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -747,6 +760,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "enum-as-inner" version = "0.6.1" @@ -1216,6 +1235,19 @@ dependencies = [ "hashbrown 0.17.0", ] +[[package]] +name = "indicatif" +version = "0.17.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235" +dependencies = [ + "console", + "number_prefix", + "portable-atomic", + "unicode-width", + "web-time", +] + [[package]] name = "infer" version = "0.19.0" @@ -1689,6 +1721,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "obicompactvec" version = "0.1.0" @@ -1723,6 +1761,7 @@ name = "obikmer" version = "0.1.0" dependencies = [ "clap", + "indicatif", "memmap2", "niffler 3.0.0", "obidebruinj", @@ -1748,6 +1787,7 @@ version = "0.1.0" dependencies = [ "cacheline-ef", "epserde 0.8.0", + "indicatif", "memmap2", "niffler 3.0.0", "obicompactvec", @@ -2934,6 +2974,12 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "untrusted" version = "0.9.0" @@ -3111,6 +3157,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.26.11" diff --git a/src/obikmer/Cargo.toml b/src/obikmer/Cargo.toml index d8a642d..95ad03f 100644 --- a/src/obikmer/Cargo.toml +++ b/src/obikmer/Cargo.toml @@ -23,6 +23,7 @@ niffler = "3" rayon = "1" ph = "0.11" memmap2 = "0.9" +indicatif = "0.17" tracing = "0.1.44" tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } pprof = { version = "0.13", features = ["prost-codec"], optional = true } diff --git a/src/obikmer/src/cmd/partition.rs b/src/obikmer/src/cmd/partition.rs index 073c235..c466267 100644 --- a/src/obikmer/src/cmd/partition.rs +++ b/src/obikmer/src/cmd/partition.rs @@ -1,6 +1,8 @@ use std::path::PathBuf; +use std::time::{Duration, Instant}; use clap::Args; +use indicatif::{ProgressBar, ProgressStyle}; use obikpartitionner::KmerPartition; use obisys::{Reporter, Stage}; use obikseq::{RoutableSuperKmer, set_k, set_m}; @@ -49,12 +51,45 @@ pub fn run(args: PartitionArgs) { |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, }; + + let pb = ProgressBar::new_spinner(); + pb.set_style( + ProgressStyle::with_template("{spinner} scatter — {msg} {elapsed}") + .unwrap() + .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]), + ); + pb.enable_steady_tick(Duration::from_millis(100)); + + let mut total_bases: u64 = 0; + let mut ema_rate: f64 = 0.0; + let mut last_t = Instant::now(); + let mut last_bases: u64 = 0; + const ALPHA: f64 = 0.15; + for batch in pipe.apply(path_source, n_workers, 1) { + total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::(); + let now = Instant::now(); + let dt = now.duration_since(last_t).as_secs_f64(); + if dt > 0.1 { + let instant = (total_bases - last_bases) as f64 / dt; + ema_rate = ALPHA * instant + (1.0 - ALPHA) * ema_rate; + last_t = now; + last_bases = total_bases; + let bp = total_bases as f64; + let (count_str, rate_str) = if bp >= 1e9 { + (format!("{:.2} Gbp", bp / 1e9), format!("{:.0} Mbp/s", ema_rate / 1e6)) + } else { + (format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6)) + }; + pb.set_message(format!("{count_str} {rate_str}")); + } kp.write_batch(batch).unwrap_or_else(|e| { eprintln!("error: {e}"); std::process::exit(1) }); } + pb.finish_and_clear(); + kp.close().expect("close error"); rep.push(t.stop()); diff --git a/src/obikpartitionner/Cargo.toml b/src/obikpartitionner/Cargo.toml index 853e630..8e94ff2 100644 --- a/src/obikpartitionner/Cargo.toml +++ b/src/obikpartitionner/Cargo.toml @@ -25,3 +25,4 @@ epserde = "0.8" memmap2 = "0.9.10" obicompactvec = { path = "../obicompactvec" } ptr_hash = "1.1" +indicatif = "0.17" diff --git a/src/obikpartitionner/src/partition.rs b/src/obikpartitionner/src/partition.rs index 61698e2..ab07083 100644 --- a/src/obikpartitionner/src/partition.rs +++ b/src/obikpartitionner/src/partition.rs @@ -2,7 +2,10 @@ use std::collections::{BTreeMap, HashMap}; use std::fs; use std::io; use std::path::{Path, PathBuf}; -use tracing::{debug, info}; +use std::time::Instant; +use tracing::debug; + +use indicatif::{ProgressBar, ProgressStyle}; use cacheline_ef::{CachelineEf, CachelineEfVec}; use epserde::ser::Serialize as EpSerialize; @@ -55,7 +58,7 @@ impl KmerPartition { minimizer_size: usize, force: bool, ) -> SKResult { - Self::create_with(path, n_bits, kmer_size, minimizer_size, Level::Three, force) + Self::create_with(path, n_bits, kmer_size, minimizer_size, Level::One, force) } pub fn create_with>( @@ -256,18 +259,33 @@ impl KmerPartition { let n_threads = rayon::current_num_threads().max(1) as u64; let chunk_kmers = chunk_size_from_ram(available / n_threads); + let pb = ProgressBar::new(self.n_partitions as u64); + pb.set_style( + ProgressStyle::with_template( + "counting [{bar:40}] {pos}/{len} ({percent}%) {per_sec} eta {eta} {msg}", + ) + .unwrap() + .progress_chars("█▌░"), + ); + let results: Vec> = (0..self.n_partitions) .into_par_iter() .map(|i| { let dir = root.join(format!("part_{:05}", i)); let dedup_path = dir.join(format!("dereplicated.{SK_EXT}")); if !dedup_path.exists() { + pb.inc(1); return Ok(()); } - info!("counting kmers in partition {}/{}", i, self.n_partitions); - count_partition(&dir, &dedup_path, chunk_kmers) + let t = Instant::now(); + let result = count_partition(&dir, &dedup_path, chunk_kmers); + pb.set_message(format!("last {:.0}ms", t.elapsed().as_millis())); + pb.inc(1); + result }) .collect(); + + pb.finish_and_clear(); for r in results { r?; }