feat(obikmer): add obisys profiling to partition pipeline

Added obisys as a local dependency and integrated its Reporter and Stage instrumentation into the partition command. Each major phase (scatter, dereplicate, and kmer-counting) is now wrapped in timing blocks, with aggregated execution times printed to stdout upon completion.
This commit is contained in:
Eric Coissac
2026-05-17 16:01:49 +08:00
parent d0c277d5b6
commit 8c16b79983
3 changed files with 13 additions and 1 deletions
+1
View File
@@ -1734,6 +1734,7 @@ dependencies = [
"obiread", "obiread",
"obiskbuilder", "obiskbuilder",
"obiskio", "obiskio",
"obisys",
"ph", "ph",
"pprof", "pprof",
"rayon", "rayon",
+1
View File
@@ -17,6 +17,7 @@ obidebruinj = { path = "../obidebruinj" }
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
obikrope = { path = "../obikrope" } obikrope = { path = "../obikrope" }
obikpartitionner = { path = "../obikpartitionner" } obikpartitionner = { path = "../obikpartitionner" }
obisys = { path = "../obisys" }
obiskio = { path = "../obiskio" } obiskio = { path = "../obiskio" }
niffler = "3" niffler = "3"
rayon = "1" rayon = "1"
+11 -1
View File
@@ -2,6 +2,7 @@ use std::path::PathBuf;
use clap::Args; use clap::Args;
use obikpartitionner::KmerPartition; use obikpartitionner::KmerPartition;
use obisys::{Reporter, Stage};
use obikseq::{RoutableSuperKmer, set_k, set_m}; use obikseq::{RoutableSuperKmer, set_k, set_m};
use tracing::info; use tracing::info;
@@ -39,14 +40,15 @@ pub fn run(args: PartitionArgs) {
}); });
let path_source = args.common.seqfile_paths(); let path_source = args.common.seqfile_paths();
let mut rep = Reporter::new();
let t = Stage::start("scatter");
let pipe = obipipeline::make_pipe! { let pipe = obipipeline::make_pipe! {
PipelineData : PathBuf => Vec<RoutableSuperKmer>, PipelineData : PathBuf => Vec<RoutableSuperKmer>,
||? { |path| open_chunks(path) } : Path => RawChunk, ||? { |path| open_chunks(path) } : Path => RawChunk,
|? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk, |? { move |rope| obiread::normalize_sequence_chunk(rope, k) } : RawChunk => NormChunk,
| { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch, | { move |rope| obiskbuilder::build_superkmers(rope, k, level_max, theta) }: NormChunk => Batch,
}; };
for batch in pipe.apply(path_source, n_workers, 1) { for batch in pipe.apply(path_source, n_workers, 1) {
kp.write_batch(batch).unwrap_or_else(|e| { kp.write_batch(batch).unwrap_or_else(|e| {
eprintln!("error: {e}"); eprintln!("error: {e}");
@@ -54,8 +56,16 @@ pub fn run(args: PartitionArgs) {
}); });
} }
kp.close().expect("close error"); kp.close().expect("close error");
rep.push(t.stop());
info!("dereplicating..."); info!("dereplicating...");
let t = Stage::start("dereplicate");
kp.dereplicate().expect("dereplicate error"); kp.dereplicate().expect("dereplicate error");
rep.push(t.stop());
let t = Stage::start("count_kmer");
kp.count_kmer().expect("count kmer error"); kp.count_kmer().expect("count kmer error");
rep.push(t.stop());
rep.print();
} }