feat: add merge operation specs and partition progress bar
Added implementation specifications for the `merge` operation, detailing parallel partition processing, I/O paths, and kmer matrix aggregation across multiple indexes. Integrated an `indicatif` progress bar into the `rayon` parallel loop to monitor processing position, throughput, ETA, and recent partition duration.
This commit is contained in:
@@ -11,6 +11,17 @@
|
|||||||
|
|
||||||
- merge : pour construire un index à partir d'index existants
|
- merge : pour construire un index à partir d'index existants
|
||||||
- deux modes : count et presence/absence. count exige que tous les index mergés soient déjà en mode count. mode presence/absence par defaut. Si passage de mode count à mode presence/absence, par defaut presence = count >= 1. Possibilité de spécifier un seuil personnalisé.
|
- deux modes : count et presence/absence. count exige que tous les index mergés soient déjà en mode count. mode presence/absence par defaut. Si passage de mode count à mode presence/absence, par defaut presence = count >= 1. Possibilité de spécifier un seuil personnalisé.
|
||||||
|
- le merge doit se faire en parallèle sur chaque partition
|
||||||
|
- en entrée : une liste de chemins vers les index à fusionner
|
||||||
|
- en sortie : un nouvel index fusionné (option -o <output_index>)
|
||||||
|
- j'imagine comme algo:
|
||||||
|
- on copie le premier index dans le nouvel index
|
||||||
|
- on ajoute a chaque partition une matrice de count ou de presence s'il n'y en avait pas déjà.
|
||||||
|
- si besoin, on cree la colone 0 de la matrice de count ou de presence pour le genome courant
|
||||||
|
- on parcourt les partitions et les index à fusionner en parallèle
|
||||||
|
- pour chaque partition, on ajoute les kmer présents dans les index à fusionner au nouvel index
|
||||||
|
- si le kmer est déjà présent dans le nouvel index on ajoute le compte ou la presence du kmer dans la matrice de count ou de presence
|
||||||
|
- sinon, on ajoute le kmer dans une nouvelle layer
|
||||||
|
|
||||||
- filter : produit un nouvel index filtré à partir d'un index existant en verifiant que les kmer présents dans le nouvel index respectent les critères de filtrage spécifiés
|
- filter : produit un nouvel index filtré à partir d'un index existant en verifiant que les kmer présents dans le nouvel index respectent les critères de filtrage spécifiés
|
||||||
- quorum de presence en fraction-(min/max) du nombre de génomes, en nombre-(min/max) de génomes, si mode count la présence peut être défini par un seuil personnalisé minimum et maximum
|
- quorum de presence en fraction-(min/max) du nombre de génomes, en nombre-(min/max) de génomes, si mode count la présence peut être défini par un seuil personnalisé minimum et maximum
|
||||||
|
|||||||
@@ -219,19 +219,34 @@ impl KmerPartition {
|
|||||||
let n_threads = rayon::current_num_threads().max(1) as u64;
|
let n_threads = rayon::current_num_threads().max(1) as u64;
|
||||||
let available_per_thread = available / n_threads;
|
let available_per_thread = available / n_threads;
|
||||||
|
|
||||||
|
let pb = ProgressBar::new(self.n_partitions as u64);
|
||||||
|
pb.set_style(
|
||||||
|
ProgressStyle::with_template(
|
||||||
|
"dereplicating [{bar:40}] {pos}/{len} ({percent}%) {per_sec} eta {eta} {msg}",
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
.progress_chars("█▌░"),
|
||||||
|
);
|
||||||
|
|
||||||
let results: Vec<SKResult<()>> = (0..self.n_partitions)
|
let results: Vec<SKResult<()>> = (0..self.n_partitions)
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|i| {
|
.map(|i| {
|
||||||
let dir = self.part_dir(i);
|
let dir = self.part_dir(i);
|
||||||
if !dir.exists() {
|
if !dir.exists() {
|
||||||
|
pb.inc(1);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
let raw_path = dir.join(format!("raw.{SK_EXT}"));
|
let raw_path = dir.join(format!("raw.{SK_EXT}"));
|
||||||
|
let t = Instant::now();
|
||||||
let n_buckets = optimal_buckets(&raw_path, available_per_thread);
|
let n_buckets = optimal_buckets(&raw_path, available_per_thread);
|
||||||
dereplicate_partition(&dir, level, n_buckets)
|
let result = dereplicate_partition(&dir, level, n_buckets);
|
||||||
|
pb.set_message(format!("last {:.0}ms", t.elapsed().as_millis()));
|
||||||
|
pb.inc(1);
|
||||||
|
result
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
pb.finish_and_clear();
|
||||||
for r in results {
|
for r in results {
|
||||||
r?;
|
r?;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user