From cc2ed4bd315ca04cd522c4a151f4307930f40e8f Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 20 May 2026 15:02:58 +0200 Subject: [PATCH] feat: Add progress tracking and timing instrumentation to index Introduces comprehensive progress tracking and timing instrumentation using indicatif and obisys::Reporter/Stage. Adds an EMA-based throughput calculator for the scatter phase and wraps parallel progress bars in Arc for thread-safe concurrent updates across all pipeline stages. --- src/obikmer/src/cmd/index.rs | 66 +++++++++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 8 deletions(-) diff --git a/src/obikmer/src/cmd/index.rs b/src/obikmer/src/cmd/index.rs index 3e7d954..86cda2d 100644 --- a/src/obikmer/src/cmd/index.rs +++ b/src/obikmer/src/cmd/index.rs @@ -1,10 +1,13 @@ use std::fs; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use cacheline_ef::{CachelineEf, CachelineEfVec}; use clap::Args; use epserde::prelude::*; +use indicatif::{ProgressBar, ProgressStyle}; use obicompactvec::PersistentCompactIntMatrix; use obicompactvec::PersistentCompactIntVec; use obidebruinj::GraphDeBruijn; @@ -12,6 +15,7 @@ use obikpartitionner::KmerPartition; use obikseq::{RoutableSuperKmer, set_k, set_m}; use obiskio::SKFileReader; use obilayeredmap::layer::Layer; +use obisys::{Reporter, Stage}; use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; use rayon::prelude::*; use tracing::info; @@ -49,6 +53,8 @@ pub fn run(args: IndexArgs) { // ── Stage 1: scatter (skipped if partition already exists) ─────────────── let partition_meta = output.join("partition.meta"); + let mut rep = Reporter::new(); + let kp = if partition_meta.exists() { info!("resuming from existing partition at {}", output.display()); let kp = KmerPartition::open(&output).unwrap_or_else(|e| { @@ -76,7 +82,7 @@ pub fn run(args: IndexArgs) { let path_source = args.common.seqfile_paths(); - info!("scattering..."); + let t = Stage::start("scatter"); let pipe = obipipeline::make_pipe! { PipelineData : PathBuf => Vec, ||? { |path| open_chunks(path) } : Path => RawChunk, @@ -84,35 +90,78 @@ pub fn run(args: IndexArgs) { | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, }; + let pb = ProgressBar::new_spinner(); + pb.set_style( + ProgressStyle::with_template("{spinner} scatter — {msg} {elapsed}") + .unwrap() + .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]), + ); + pb.enable_steady_tick(Duration::from_millis(100)); + + let mut total_bases: u64 = 0; + let mut ema_rate: f64 = 0.0; + let mut last_t = Instant::now(); + let mut last_bases: u64 = 0; + const ALPHA: f64 = 0.15; + for batch in pipe.apply(path_source, n_workers, 1) { + total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::(); + let now = Instant::now(); + let dt = now.duration_since(last_t).as_secs_f64(); + if dt > 0.1 { + let instant = (total_bases - last_bases) as f64 / dt; + ema_rate = ALPHA * instant + (1.0 - ALPHA) * ema_rate; + last_t = now; + last_bases = total_bases; + let bp = total_bases as f64; + let (count_str, rate_str) = if bp >= 1e9 { + (format!("{:.2} Gbp", bp / 1e9), format!("{:.0} Mbp/s", ema_rate / 1e6)) + } else { + (format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6)) + }; + pb.set_message(format!("{count_str} {rate_str}")); + } kp.write_batch(batch).unwrap_or_else(|e| { eprintln!("error: {e}"); std::process::exit(1); }); } + pb.finish_and_clear(); kp.close().expect("close error"); + rep.push(t.stop()); kp }; - let k = kp.kmer_size(); let n = kp.n_partitions(); // ── Stage 2: dereplicate + count (skipped if already done) ─────────────── let counting_done = output.join("kmer_spectrum_raw.json").exists(); if !counting_done { info!("dereplicating..."); + let t = Stage::start("dereplicate"); kp.dereplicate().expect("dereplicate error"); + rep.push(t.stop()); + + let t = Stage::start("count_kmer"); info!("counting kmers..."); kp.count_kmer().expect("count kmer error"); + rep.push(t.stop()); } else { info!("kmer counts already present, skipping dereplicate + count"); } // ── Stage 3: build layered index per partition ──────────────────────────── - info!("building index ({n} partitions, k={k})..."); + let t = Stage::start("index"); let total_kmers = AtomicUsize::new(0); let filter_active = min_ab > 1 || max_ab.is_some(); + let pb = ProgressBar::new(n as u64); + pb.set_style(ProgressStyle::with_template( + "index — [{bar:20}] {pos}/{len} | {msg}", + ).unwrap()); + + let pb = Arc::new(Mutex::new(pb)); + (0..n).into_par_iter().for_each(|i| { let part_dir = output.join(format!("part_{i:05}")); let dedup_path = part_dir.join("dereplicated.skmer.zst"); @@ -206,11 +255,12 @@ pub fn run(args: IndexArgs) { std::process::exit(1); }); - info!("partition {i}: {n_kmers} kmers indexed"); + let pb = pb.lock().unwrap(); + pb.inc(1); + pb.set_message(format!("{i}: {n_kmers} kmers")); }); - info!( - "done — {} total kmers indexed across all partitions", - total_kmers.load(Ordering::Relaxed) - ); + pb.lock().unwrap().finish_and_clear(); + rep.push(t.stop()); + rep.print(); }