feat: Add progress tracking and timing instrumentation to index
Introduces comprehensive progress tracking and timing instrumentation using indicatif and obisys::Reporter/Stage. Adds an EMA-based throughput calculator for the scatter phase and wraps parallel progress bars in Arc<Mutex> for thread-safe concurrent updates across all pipeline stages.
This commit is contained in:
@@ -1,10 +1,13 @@
|
|||||||
use std::fs;
|
use std::fs;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
use cacheline_ef::{CachelineEf, CachelineEfVec};
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use epserde::prelude::*;
|
use epserde::prelude::*;
|
||||||
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
use obicompactvec::PersistentCompactIntMatrix;
|
use obicompactvec::PersistentCompactIntMatrix;
|
||||||
use obicompactvec::PersistentCompactIntVec;
|
use obicompactvec::PersistentCompactIntVec;
|
||||||
use obidebruinj::GraphDeBruijn;
|
use obidebruinj::GraphDeBruijn;
|
||||||
@@ -12,6 +15,7 @@ use obikpartitionner::KmerPartition;
|
|||||||
use obikseq::{RoutableSuperKmer, set_k, set_m};
|
use obikseq::{RoutableSuperKmer, set_k, set_m};
|
||||||
use obiskio::SKFileReader;
|
use obiskio::SKFileReader;
|
||||||
use obilayeredmap::layer::Layer;
|
use obilayeredmap::layer::Layer;
|
||||||
|
use obisys::{Reporter, Stage};
|
||||||
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
|
use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
@@ -49,6 +53,8 @@ pub fn run(args: IndexArgs) {
|
|||||||
|
|
||||||
// ── Stage 1: scatter (skipped if partition already exists) ───────────────
|
// ── Stage 1: scatter (skipped if partition already exists) ───────────────
|
||||||
let partition_meta = output.join("partition.meta");
|
let partition_meta = output.join("partition.meta");
|
||||||
|
let mut rep = Reporter::new();
|
||||||
|
|
||||||
let kp = if partition_meta.exists() {
|
let kp = if partition_meta.exists() {
|
||||||
info!("resuming from existing partition at {}", output.display());
|
info!("resuming from existing partition at {}", output.display());
|
||||||
let kp = KmerPartition::open(&output).unwrap_or_else(|e| {
|
let kp = KmerPartition::open(&output).unwrap_or_else(|e| {
|
||||||
@@ -76,7 +82,7 @@ pub fn run(args: IndexArgs) {
|
|||||||
|
|
||||||
let path_source = args.common.seqfile_paths();
|
let path_source = args.common.seqfile_paths();
|
||||||
|
|
||||||
info!("scattering...");
|
let t = Stage::start("scatter");
|
||||||
let pipe = obipipeline::make_pipe! {
|
let pipe = obipipeline::make_pipe! {
|
||||||
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
||||||
||? { |path| open_chunks(path) } : Path => RawChunk,
|
||? { |path| open_chunks(path) } : Path => RawChunk,
|
||||||
@@ -84,35 +90,78 @@ pub fn run(args: IndexArgs) {
|
|||||||
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
|
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let pb = ProgressBar::new_spinner();
|
||||||
|
pb.set_style(
|
||||||
|
ProgressStyle::with_template("{spinner} scatter — {msg} {elapsed}")
|
||||||
|
.unwrap()
|
||||||
|
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
|
||||||
|
);
|
||||||
|
pb.enable_steady_tick(Duration::from_millis(100));
|
||||||
|
|
||||||
|
let mut total_bases: u64 = 0;
|
||||||
|
let mut ema_rate: f64 = 0.0;
|
||||||
|
let mut last_t = Instant::now();
|
||||||
|
let mut last_bases: u64 = 0;
|
||||||
|
const ALPHA: f64 = 0.15;
|
||||||
|
|
||||||
for batch in pipe.apply(path_source, n_workers, 1) {
|
for batch in pipe.apply(path_source, n_workers, 1) {
|
||||||
|
total_bases += batch.iter().map(|sk| sk.seql() as u64).sum::<u64>();
|
||||||
|
let now = Instant::now();
|
||||||
|
let dt = now.duration_since(last_t).as_secs_f64();
|
||||||
|
if dt > 0.1 {
|
||||||
|
let instant = (total_bases - last_bases) as f64 / dt;
|
||||||
|
ema_rate = ALPHA * instant + (1.0 - ALPHA) * ema_rate;
|
||||||
|
last_t = now;
|
||||||
|
last_bases = total_bases;
|
||||||
|
let bp = total_bases as f64;
|
||||||
|
let (count_str, rate_str) = if bp >= 1e9 {
|
||||||
|
(format!("{:.2} Gbp", bp / 1e9), format!("{:.0} Mbp/s", ema_rate / 1e6))
|
||||||
|
} else {
|
||||||
|
(format!("{:.0} Mbp", bp / 1e6), format!("{:.0} Mbp/s", ema_rate / 1e6))
|
||||||
|
};
|
||||||
|
pb.set_message(format!("{count_str} {rate_str}"));
|
||||||
|
}
|
||||||
kp.write_batch(batch).unwrap_or_else(|e| {
|
kp.write_batch(batch).unwrap_or_else(|e| {
|
||||||
eprintln!("error: {e}");
|
eprintln!("error: {e}");
|
||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
pb.finish_and_clear();
|
||||||
kp.close().expect("close error");
|
kp.close().expect("close error");
|
||||||
|
rep.push(t.stop());
|
||||||
kp
|
kp
|
||||||
};
|
};
|
||||||
|
|
||||||
let k = kp.kmer_size();
|
|
||||||
let n = kp.n_partitions();
|
let n = kp.n_partitions();
|
||||||
|
|
||||||
// ── Stage 2: dereplicate + count (skipped if already done) ───────────────
|
// ── Stage 2: dereplicate + count (skipped if already done) ───────────────
|
||||||
let counting_done = output.join("kmer_spectrum_raw.json").exists();
|
let counting_done = output.join("kmer_spectrum_raw.json").exists();
|
||||||
if !counting_done {
|
if !counting_done {
|
||||||
info!("dereplicating...");
|
info!("dereplicating...");
|
||||||
|
let t = Stage::start("dereplicate");
|
||||||
kp.dereplicate().expect("dereplicate error");
|
kp.dereplicate().expect("dereplicate error");
|
||||||
|
rep.push(t.stop());
|
||||||
|
|
||||||
|
let t = Stage::start("count_kmer");
|
||||||
info!("counting kmers...");
|
info!("counting kmers...");
|
||||||
kp.count_kmer().expect("count kmer error");
|
kp.count_kmer().expect("count kmer error");
|
||||||
|
rep.push(t.stop());
|
||||||
} else {
|
} else {
|
||||||
info!("kmer counts already present, skipping dereplicate + count");
|
info!("kmer counts already present, skipping dereplicate + count");
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Stage 3: build layered index per partition ────────────────────────────
|
// ── Stage 3: build layered index per partition ────────────────────────────
|
||||||
info!("building index ({n} partitions, k={k})...");
|
let t = Stage::start("index");
|
||||||
let total_kmers = AtomicUsize::new(0);
|
let total_kmers = AtomicUsize::new(0);
|
||||||
let filter_active = min_ab > 1 || max_ab.is_some();
|
let filter_active = min_ab > 1 || max_ab.is_some();
|
||||||
|
|
||||||
|
let pb = ProgressBar::new(n as u64);
|
||||||
|
pb.set_style(ProgressStyle::with_template(
|
||||||
|
"index — [{bar:20}] {pos}/{len} | {msg}",
|
||||||
|
).unwrap());
|
||||||
|
|
||||||
|
let pb = Arc::new(Mutex::new(pb));
|
||||||
|
|
||||||
(0..n).into_par_iter().for_each(|i| {
|
(0..n).into_par_iter().for_each(|i| {
|
||||||
let part_dir = output.join(format!("part_{i:05}"));
|
let part_dir = output.join(format!("part_{i:05}"));
|
||||||
let dedup_path = part_dir.join("dereplicated.skmer.zst");
|
let dedup_path = part_dir.join("dereplicated.skmer.zst");
|
||||||
@@ -206,11 +255,12 @@ pub fn run(args: IndexArgs) {
|
|||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
info!("partition {i}: {n_kmers} kmers indexed");
|
let pb = pb.lock().unwrap();
|
||||||
|
pb.inc(1);
|
||||||
|
pb.set_message(format!("{i}: {n_kmers} kmers"));
|
||||||
});
|
});
|
||||||
|
|
||||||
info!(
|
pb.lock().unwrap().finish_and_clear();
|
||||||
"done — {} total kmers indexed across all partitions",
|
rep.push(t.stop());
|
||||||
total_kmers.load(Ordering::Relaxed)
|
rep.print();
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user