refactor: replace rayon with NUMA-aware PartitionRunner

Replaces `rayon` parallel iteration across index, rebuild, reindex, and select modules with a custom `PartitionRunner`. This introduces NUMA-aware task distribution with CPU pinning and round-robin scheduling, eliminating `Arc`, `Mutex`, and atomic synchronization primitives in favor of a flat, pre-spawned worker architecture. Error handling is simplified via `.map_err()` and the `?` operator, while progress bar updates are decoupled into dedicated callbacks.
This commit is contained in:
Eric Coissac
2026-06-15 15:45:04 +02:00
parent bc92dc4592
commit 1cd7916e06
7 changed files with 194 additions and 318 deletions
+33 -16
View File
@@ -1,5 +1,5 @@
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{self, Write as _}; use std::io::{self, BufWriter, Write as _};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use memmap2::Mmap; use memmap2::Mmap;
@@ -230,30 +230,47 @@ impl PackedBitMatrix {
/// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files. /// Build `presence/matrix.pbmx` from existing `col_*.pbiv` files.
pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> { pub fn pack_bit_matrix(dir: &Path) -> io::Result<()> {
let packed_path = dir.join("matrix.pbmx");
if packed_path.exists() {
// Matrix complete; remove any leftover column files from a killed cleanup.
if let Ok(meta) = MatrixMeta::load(dir) {
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
let _ = fs::remove_file(dir.join("meta.json"));
}
return Ok(());
}
let meta = MatrixMeta::load(dir)?; let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols; let n_cols = meta.n_cols;
let col_files: Vec<Vec<u8>> = (0..n_cols) // Compute offsets from file sizes — no column data loaded into RAM.
.map(|c| fs::read(col_path(dir, c))) let col_sizes: Vec<u64> = (0..n_cols)
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
.collect::<io::Result<_>>()?; .collect::<io::Result<_>>()?;
let header_size = PBMX_HEADER + n_cols * 8; let header_size = (PBMX_HEADER + n_cols * 8) as u64;
let mut col_offset = header_size; let mut col_offset = header_size;
let mut offsets = Vec::with_capacity(n_cols); let mut offsets = Vec::with_capacity(n_cols);
for data in &col_files { for &size in &col_sizes {
offsets.push(col_offset as u64); offsets.push(col_offset);
col_offset += data.len(); col_offset += size;
} }
let packed_path = dir.join("matrix.pbmx"); // Write to a temp file; rename atomically so a killed process never leaves
let mut file = File::create(&packed_path)?; // a truncated matrix.pbmx that would be mistaken for a complete file.
file.write_all(&PBMX_MAGIC)?; let tmp_path = dir.join("matrix.pbmx.tmp");
file.write_all(&[0u8; 4])?; let mut out = BufWriter::new(File::create(&tmp_path)?);
file.write_all(&(meta.n as u64).to_le_bytes())?; out.write_all(&PBMX_MAGIC)?;
file.write_all(&(n_cols as u64).to_le_bytes())?; out.write_all(&[0u8; 4])?;
for &off in &offsets { file.write_all(&off.to_le_bytes())?; } out.write_all(&(meta.n as u64).to_le_bytes())?;
for data in &col_files { file.write_all(data)?; } out.write_all(&(n_cols as u64).to_le_bytes())?;
drop(file); for &off in &offsets { out.write_all(&off.to_le_bytes())?; }
for c in 0..n_cols {
io::copy(&mut File::open(col_path(dir, c))?, &mut out)?;
}
out.flush()?;
drop(out);
fs::rename(&tmp_path, &packed_path)?;
for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; } for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; }
fs::remove_file(dir.join("meta.json"))?; fs::remove_file(dir.join("meta.json"))?;
+33 -16
View File
@@ -1,6 +1,6 @@
use std::cmp::Ordering; use std::cmp::Ordering;
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{self, Write as _}; use std::io::{self, BufWriter, Write as _};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use memmap2::Mmap; use memmap2::Mmap;
@@ -354,30 +354,47 @@ impl PackedCompactIntMatrix {
/// Build `counts/matrix.pcmx` from existing `col_*.pciv` files. /// Build `counts/matrix.pcmx` from existing `col_*.pciv` files.
pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> { pub fn pack_compact_int_matrix(dir: &Path) -> io::Result<()> {
let packed_path = dir.join("matrix.pcmx");
if packed_path.exists() {
// Matrix complete; remove any leftover column files from a killed cleanup.
if let Ok(meta) = MatrixMeta::load(dir) {
for c in 0..meta.n_cols { let _ = fs::remove_file(col_path(dir, c)); }
let _ = fs::remove_file(dir.join("meta.json"));
}
return Ok(());
}
let meta = MatrixMeta::load(dir)?; let meta = MatrixMeta::load(dir)?;
let n_cols = meta.n_cols; let n_cols = meta.n_cols;
let col_files: Vec<Vec<u8>> = (0..n_cols) // Compute offsets from file sizes — no column data loaded into RAM.
.map(|c| fs::read(col_path(dir, c))) let col_sizes: Vec<u64> = (0..n_cols)
.map(|c| fs::metadata(col_path(dir, c)).map(|m| m.len()))
.collect::<io::Result<_>>()?; .collect::<io::Result<_>>()?;
let header_size = PCMX_HEADER + n_cols * 8; let header_size = (PCMX_HEADER + n_cols * 8) as u64;
let mut col_offset = header_size; let mut col_offset = header_size;
let mut offsets = Vec::with_capacity(n_cols); let mut offsets = Vec::with_capacity(n_cols);
for data in &col_files { for &size in &col_sizes {
offsets.push(col_offset as u64); offsets.push(col_offset);
col_offset += data.len(); col_offset += size;
} }
let packed_path = dir.join("matrix.pcmx"); // Write to a temp file; rename atomically so a killed process never leaves
let mut file = File::create(&packed_path)?; // a truncated matrix.pcmx that would be mistaken for a complete file.
file.write_all(&PCMX_MAGIC)?; let tmp_path = dir.join("matrix.pcmx.tmp");
file.write_all(&[0u8; 4])?; let mut out = BufWriter::new(File::create(&tmp_path)?);
file.write_all(&(meta.n as u64).to_le_bytes())?; out.write_all(&PCMX_MAGIC)?;
file.write_all(&(n_cols as u64).to_le_bytes())?; out.write_all(&[0u8; 4])?;
for &off in &offsets { file.write_all(&off.to_le_bytes())?; } out.write_all(&(meta.n as u64).to_le_bytes())?;
for data in &col_files { file.write_all(data)?; } out.write_all(&(n_cols as u64).to_le_bytes())?;
drop(file); for &off in &offsets { out.write_all(&off.to_le_bytes())?; }
for c in 0..n_cols {
io::copy(&mut File::open(col_path(dir, c))?, &mut out)?;
}
out.flush()?;
drop(out);
fs::rename(&tmp_path, &packed_path)?;
for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; } for c in 0..n_cols { fs::remove_file(col_path(dir, c))?; }
fs::remove_file(dir.join("meta.json"))?; fs::remove_file(dir.join("meta.json"))?;
+26 -45
View File
@@ -1,8 +1,6 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fs; use std::fs;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use obikpartitionner::{KmerPartition, KmerSpectrum}; use obikpartitionner::{KmerPartition, KmerSpectrum};
use obilayeredmap; use obilayeredmap;
@@ -152,31 +150,25 @@ impl KmerIndex {
let with_counts = self.meta.config.with_counts; let with_counts = self.meta.config.with_counts;
let evidence = self.meta.config.evidence.clone(); let evidence = self.meta.config.evidence.clone();
let block_bits = self.meta.config.block_bits; let block_bits = self.meta.config.block_bits;
let total_kmers = AtomicUsize::new(0); let mut total_kmers: usize = 0;
let pb = progress_bar("index", n as u64, "partitions");
let pb = Arc::new(Mutex::new(progress_bar("index", n as u64, "partitions"))); let order: Vec<usize> = (0..n).collect();
let runner = crate::numa::PartitionRunner::new();
(0..n).into_par_iter().for_each(|i| { runner.run(
match self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits) { &order,
Ok(0) => {} |i| self.partition.build_index_layer(i, min_ab, max_ab, with_counts, &evidence, block_bits),
Ok(n_kmers) => { |i, n_kmers, _| {
total_kmers.fetch_add(n_kmers, Ordering::Relaxed); if n_kmers > 0 {
let pb = pb.lock().unwrap(); total_kmers += n_kmers;
pb.inc(1); pb.inc(1);
pb.set_message(format!("{i}: {n_kmers} kmers")); pb.set_message(format!("{i}: {n_kmers} kmers"));
} }
Err(e) => { },
eprintln!("error building layer for partition {i}: {e}"); ).map_err(OKIError::Partition)?;
std::process::exit(1);
}
}
});
pb.lock().unwrap().finish_and_clear(); pb.finish_and_clear();
info!( info!("done — {} total kmers indexed", total_kmers);
"done — {} total kmers indexed",
total_kmers.load(Ordering::Relaxed)
);
if !keep_intermediate { if !keep_intermediate {
for i in 0..n { for i in 0..n {
@@ -211,36 +203,25 @@ impl KmerIndex {
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
let n = self.n_partitions(); let n = self.n_partitions();
let errors: Vec<_> = (0..n) let order: Vec<usize> = (0..n).collect();
.into_par_iter() crate::numa::PartitionRunner::new().run(
.filter_map(|i| { &order,
|i| -> OKIResult<()> {
let index_dir = self.partition.part_dir(i).join("index"); let index_dir = self.partition.part_dir(i).join("index");
if !index_dir.exists() { return None; } if !index_dir.exists() { return Ok(()); }
let meta = match PartitionMeta::load(&index_dir) { let meta = PartitionMeta::load(&index_dir)
Ok(m) => m, .map_err(|e| OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())))?;
Err(e) => return Some(OKIError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
};
for l in 0..meta.n_layers { for l in 0..meta.n_layers {
let layer_dir = index_dir.join(format!("layer_{l}")); let layer_dir = index_dir.join(format!("layer_{l}"));
let presence_dir = layer_dir.join("presence"); let presence_dir = layer_dir.join("presence");
let counts_dir = layer_dir.join("counts"); let counts_dir = layer_dir.join("counts");
if presence_dir.exists() { if presence_dir.exists() { pack_bit_matrix(&presence_dir).map_err(OKIError::Io)?; }
if let Err(e) = pack_bit_matrix(&presence_dir) { if counts_dir.exists() { pack_compact_int_matrix(&counts_dir).map_err(OKIError::Io)?; }
return Some(OKIError::Io(e));
} }
}
if counts_dir.exists() {
if let Err(e) = pack_compact_int_matrix(&counts_dir) {
return Some(OKIError::Io(e));
}
}
}
None
})
.collect();
if let Some(e) = errors.into_iter().next() { return Err(e); }
Ok(()) Ok(())
},
|_, _, _| {},
)
} }
/// Write a `layer_meta.json` in any layer directory that is missing one. /// Write a `layer_meta.json` in any layer directory that is missing one.
+52 -156
View File
@@ -1,4 +1,4 @@
// NUMA-aware Rayon thread pools via hwlocality. // NUMA-aware partition runner via hwlocality.
// //
// Detects NUMA topology using hwloc (cross-platform: Linux, macOS, etc.) and // Detects NUMA topology using hwloc (cross-platform: Linux, macOS, etc.) and
// builds one Rayon ThreadPool per NUMA node with threads pinned to that node's // builds one Rayon ThreadPool per NUMA node with threads pinned to that node's
@@ -10,15 +10,14 @@
// - the system has only one NUMA node (UMA, Apple Silicon, single-socket) // - the system has only one NUMA node (UMA, Apple Silicon, single-socket)
// - any per-node pool fails to build // - any per-node pool fails to build
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use crossbeam_channel::{RecvTimeoutError, unbounded}; use crossbeam_channel::unbounded;
use hwlocality::Topology; use hwlocality::Topology;
use hwlocality::cpu::binding::CpuBindingFlags; use hwlocality::cpu::binding::CpuBindingFlags;
use hwlocality::cpu::cpuset::CpuSet; use hwlocality::cpu::cpuset::CpuSet;
use hwlocality::object::types::ObjectType; use hwlocality::object::types::ObjectType;
use obisys::CpuSample;
use tracing::debug; use tracing::debug;
// ── Public interface ────────────────────────────────────────────────────────── // ── Public interface ──────────────────────────────────────────────────────────
@@ -104,27 +103,6 @@ fn build_pool(cpus: &[usize]) -> Option<rayon::ThreadPool> {
.ok() .ok()
} }
// ── Adaptive spawn heuristic ──────────────────────────────────────────────────
//
// First worker: spawn if CPU efficiency is below SPAWN_THRESHOLD (machine is
// under-utilised). Subsequent workers: spawn only if the last worker raised
// efficiency by at least the expected marginal gain (1/n_workers), with a
// minimum floor to avoid spurious spawns from efficiency fluctuations.
const SPAWN_THRESHOLD: f64 = 0.95;
const MIN_MARGINAL_GAIN: f64 = 0.03;
const SPAWN_POLL: Duration = Duration::from_secs(20);
fn should_spawn_worker(n_workers: usize, eff: f64, eff_at_last_spawn: f64) -> bool {
if n_workers == 1 {
eff < SPAWN_THRESHOLD
} else {
let gain = eff - eff_at_last_spawn;
let expected = 1.0 / n_workers as f64;
gain >= (expected * 0.25).max(MIN_MARGINAL_GAIN)
}
}
// ── PartitionRunner ─────────────────────────────────────────────────────────── // ── PartitionRunner ───────────────────────────────────────────────────────────
struct NodeConfig { struct NodeConfig {
@@ -135,42 +113,38 @@ struct NodeConfig {
/// Generic NUMA-aware runner for partition-level parallel work. /// Generic NUMA-aware runner for partition-level parallel work.
/// ///
/// Encapsulates worker spawning, NUMA pinning, adaptive activation, and result /// Workers are distributed round-robin across NUMA nodes and pinned to their
/// collection. UMA systems are handled as the degenerate case of a single node /// node's CPUs. UMA systems are the degenerate case: one node, no pinning.
/// with no pinning.
/// ///
/// # Model /// # Termination
/// ///
/// One controller thread per NUMA node (one total on UMA). Each controller /// Termination is driven entirely by channel closure:
/// manages up to `max_workers` dormant workers that drain a shared work queue. ///
/// Workers are activated one at a time; a new worker is added when global CPU /// ```text
/// efficiency justifies it. On NUMA all workers are activated immediately /// drop(part_tx) → part_rx drains → workers exit → drop their result_tx
/// (memory bandwidth, not CPU count, is the bottleneck). /// drop(result_tx) → result_rx closes → controller loop exits
/// ```
///
/// No explicit counter or sentinel needed.
pub struct PartitionRunner { pub struct PartitionRunner {
nodes: Vec<NodeConfig>, nodes: Vec<NodeConfig>,
n_cores: usize,
} }
impl PartitionRunner { impl PartitionRunner {
/// Detect topology and build. Falls back to a single-node UMA runner on /// Total worker slots across all nodes.
/// macOS, single-socket machines, or hwloc failure.
/// Total number of pre-spawned worker slots across all nodes.
pub fn max_workers(&self) -> usize { pub fn max_workers(&self) -> usize {
self.nodes.iter().map(|n| n.max_workers).sum() self.nodes.iter().map(|n| n.max_workers).sum()
} }
/// Detect topology and build. Falls back to a single-node UMA runner on
/// macOS, single-socket machines, or hwloc failure.
pub fn new() -> Self { pub fn new() -> Self {
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
match build() { match build() {
Some(ns) => { Some(ns) => {
let wpn = ns.workers_per_node(); let wpn = ns.workers_per_node();
debug!( debug!(
"PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node", "PartitionRunner: NUMA mode — {} node(s) × {} worker(s)/node",
ns.pools.len(), ns.pools.len(), wpn,
wpn,
); );
let nodes = ns.pools let nodes = ns.pools
.into_iter() .into_iter()
@@ -181,21 +155,20 @@ impl PartitionRunner {
max_workers: wpn, max_workers: wpn,
}) })
.collect(); .collect();
Self { nodes, n_cores } Self { nodes }
} }
None => { None => {
let n_cores = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
let max_workers = (n_cores / 2).max(1); let max_workers = (n_cores / 2).max(1);
debug!( debug!("PartitionRunner: UMA mode — {} worker(s)", max_workers);
"PartitionRunner: UMA mode — adaptive up to {} worker(s)",
max_workers,
);
Self { Self {
nodes: vec![NodeConfig { nodes: vec![NodeConfig {
pool: None, pool: None,
cpu_ids: vec![], cpu_ids: vec![],
max_workers, max_workers,
}], }],
n_cores,
} }
} }
} }
@@ -203,19 +176,17 @@ impl PartitionRunner {
/// Run `f(i)` for every index in `order`. /// Run `f(i)` for every index in `order`.
/// ///
/// `on_done(i, result, elapsed)` is called under an internal mutex as each /// Workers are spawned upfront and distributed round-robin across NUMA
/// partition completes — suitable for progress bars, logging, and result /// nodes. `on_done(i, result, elapsed)` is called from the controller
/// aggregation. No `Send` or `Sync` bound is required on the callback. /// thread as each partition completes — suitable for progress bars and
/// /// result aggregation.
/// The work queue is shared across all NUMA nodes: any idle worker takes
/// the next available partition regardless of node, ensuring load balance.
/// ///
/// Returns the first error produced by `f`, if any. /// Returns the first error produced by `f`, if any.
pub fn run<F, R, E, C>( pub fn run<F, R, E, C>(
&self, &self,
order: &[usize], order: &[usize],
f: F, f: F,
on_done: C, mut on_done: C,
) -> Result<(), E> ) -> Result<(), E>
where where
F: Fn(usize) -> Result<R, E> + Send + Sync, F: Fn(usize) -> Result<R, E> + Send + Sync,
@@ -223,49 +194,29 @@ impl PartitionRunner {
E: Send, E: Send,
C: FnMut(usize, R, Duration) + Send, C: FnMut(usize, R, Duration) + Send,
{ {
let f = Arc::new(f); // Pre-load the work queue, then drop the sender so workers' part_rx
let on_done = Arc::new(Mutex::new(on_done)); // iterators terminate when the queue is drained.
let first_err: Arc<Mutex<Option<E>>> = Arc::new(Mutex::new(None));
// Shared work queue — pre-loaded in caller-supplied order.
let (part_tx, part_rx) = unbounded::<usize>(); let (part_tx, part_rx) = unbounded::<usize>();
for &i in order { for &i in order { part_tx.send(i).ok(); }
part_tx.send(i).ok();
}
drop(part_tx); drop(part_tx);
let n_cores = self.n_cores; let (result_tx, result_rx) = unbounded::<(usize, Result<R, E>, Duration)>();
let n_nodes = self.nodes.len();
let f = &f; // shared borrow; F: Sync so concurrent calls are safe
let mut first_err: Option<E> = None;
std::thread::scope(|s| { std::thread::scope(|s| {
for node in &self.nodes { // Spawn all workers upfront, round-robin across NUMA nodes.
let f = Arc::clone(&f); for w in 0..self.max_workers() {
let on_done = Arc::clone(&on_done); let node = &self.nodes[w % n_nodes];
let first_err = Arc::clone(&first_err);
let part_rx = part_rx.clone();
s.spawn(move || {
// Per-node result and activation channels.
let (result_tx, result_rx) =
unbounded::<(usize, Result<R, E>, Duration)>();
let (activate_tx, activate_rx) = unbounded::<()>();
std::thread::scope(|ws| {
// Pre-spawn workers (all dormant until activated).
for _ in 0..node.max_workers {
let prx = part_rx.clone(); let prx = part_rx.clone();
let rtx = result_tx.clone(); let rtx = result_tx.clone();
let arx = activate_rx.clone();
let f = Arc::clone(&f);
let pool = node.pool.clone(); let pool = node.pool.clone();
let cpu_ids = node.cpu_ids.clone(); let cpu_ids = &node.cpu_ids;
ws.spawn(move || { s.spawn(move || {
if !cpu_ids.is_empty() { if !cpu_ids.is_empty() { pin_current_thread(cpu_ids); }
pin_current_thread(&cpu_ids);
}
if arx.recv().is_err() {
return; // never activated — exit cleanly
}
for i in &prx { for i in &prx {
let t = Instant::now(); let t = Instant::now();
let r = match &pool { let r = match &pool {
@@ -276,78 +227,23 @@ impl PartitionRunner {
} }
}); });
} }
// Drop the controller's copy: result_rx disconnects
// once all worker copies are also dropped (workers done). // Drop the controller's sender: result_rx closes once all worker
// rtx clones are dropped (i.e. all workers have exited).
drop(result_tx); drop(result_tx);
// In NUMA mode activate all workers immediately; // Drain results concurrently with workers. The for loop exits
// in UMA mode activate one and grow adaptively. // when result_rx is disconnected — at that point all workers are
let numa_mode = node.pool.is_some(); // done and the scope join below is instantaneous.
let initial = if numa_mode { node.max_workers } else { 1 }; for (i, r, dur) in &result_rx {
for _ in 0..initial {
activate_tx.send(()).ok();
}
let mut active_workers = initial;
let mut cpu_sample = CpuSample::now();
let mut eff_at_last_spawn = 0.0f64;
// Controller loop.
loop {
match result_rx.recv_timeout(SPAWN_POLL) {
Ok((i, r, dur)) => {
match r { match r {
Ok(v) => { Ok(v) => on_done(i, v, dur),
on_done.lock().unwrap()(i, v, dur); Err(e) => { if first_err.is_none() { first_err = Some(e); } }
}
Err(e) => {
let mut g = first_err.lock().unwrap();
if g.is_none() { *g = Some(e); }
} }
} }
if !numa_mode && active_workers < node.max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
debug!(
"activated worker {} — efficiency {:.0}%",
active_workers + 1,
eff * 100.0,
);
activate_tx.send(()).ok();
active_workers += 1;
eff_at_last_spawn = eff;
cpu_sample = CpuSample::now();
}
}
}
Err(RecvTimeoutError::Timeout) => {
if !numa_mode && active_workers < node.max_workers {
let eff = cpu_sample.cpu_efficiency(n_cores);
if should_spawn_worker(active_workers, eff, eff_at_last_spawn) {
debug!(
"activated worker {} (poll) — efficiency {:.0}%",
active_workers + 1,
eff * 100.0,
);
activate_tx.send(()).ok();
active_workers += 1;
eff_at_last_spawn = eff;
cpu_sample = CpuSample::now();
}
}
}
Err(RecvTimeoutError::Disconnected) => break,
}
}
// Signal any dormant workers that were never activated
// to exit (UMA mode where max_workers was never reached).
drop(activate_tx);
}); // ws: waits for all workers of this node
}); });
}
}); // s: waits for all node controllers
let mut g = first_err.lock().unwrap(); match first_err {
match g.take() {
Some(e) => Err(e), Some(e) => Err(e),
None => Ok(()), None => Ok(()),
} }
+7 -15
View File
@@ -4,7 +4,6 @@ use std::path::Path;
use obikpartitionner::{KmerFilter, KmerPartition, MergeMode}; use obikpartitionner::{KmerFilter, KmerPartition, MergeMode};
use obisys::{Reporter, Stage, progress_bar}; use obisys::{Reporter, Stage, progress_bar};
use rayon::prelude::*;
use tracing::info; use tracing::info;
use crate::error::{OKIError, OKIResult}; use crate::error::{OKIError, OKIResult};
@@ -83,23 +82,16 @@ impl KmerIndex {
let src_partition = &src.partition; let src_partition = &src.partition;
let block_bits = meta.config.block_bits; let block_bits = meta.config.block_bits;
let errors: Vec<obiskio::SKError> = (0..n_partitions) let order: Vec<usize> = (0..n_partitions).collect();
.into_par_iter() let runner = crate::numa::PartitionRunner::new();
.filter_map(|i| { runner.run(
let result = dst_partition &order,
.rebuild_partition(src_partition, i, filters, mode, n_genomes, block_bits) |i| dst_partition.rebuild_partition(src_partition, i, filters, mode, n_genomes, block_bits),
.err(); |_, _, _| { pb.inc(1); },
pb.inc(1); ).map_err(OKIError::Partition)?;
result
})
.collect();
pb.finish_and_clear(); pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::Partition(e));
}
rep.push(t.stop()); rep.push(t.stop());
// Write SENTINEL_INDEXED — output is ready to use. // Write SENTINEL_INDEXED — output is ready to use.
+8 -17
View File
@@ -3,7 +3,6 @@ use std::path::Path;
use obilayeredmap::{IndexMode, layer::Layer}; use obilayeredmap::{IndexMode, layer::Layer};
use obilayeredmap::meta::PartitionMeta; use obilayeredmap::meta::PartitionMeta;
use obisys::{Reporter, Stage, progress_bar}; use obisys::{Reporter, Stage, progress_bar};
use rayon::prelude::*;
use tracing::info; use tracing::info;
use crate::error::{OKIError, OKIResult}; use crate::error::{OKIError, OKIResult};
@@ -45,25 +44,17 @@ impl KmerIndex {
let t = Stage::start("reindex"); let t = Stage::start("reindex");
let pb = progress_bar("reindex", n as u64, "partitions"); let pb = progress_bar("reindex", n as u64, "partitions");
let errors: Vec<String> = (0..n) let order: Vec<usize> = (0..n).collect();
.into_par_iter() let runner = crate::numa::PartitionRunner::new();
.filter_map(|i| { runner.run(
let res = reindex_partition( &order,
&self.partition.part_dir(i).join("index"), |i| reindex_partition(&self.partition.part_dir(i).join("index"), &target, block_bits)
&target, .map_err(|e| OKIError::InvalidInput(format!("partition {i}: {e}"))),
block_bits, |_, _, _| { pb.inc(1); },
); )?;
pb.inc(1);
res.err().map(|e| format!("partition {i}: {e}"))
})
.collect();
pb.finish_and_clear(); pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::InvalidInput(e));
}
self.meta.config.evidence = target; self.meta.config.evidence = target;
if matches!(self.meta.config.evidence, IndexMode::Exact) { if matches!(self.meta.config.evidence, IndexMode::Exact) {
self.meta.config.block_bits = block_bits; self.meta.config.block_bits = block_bits;
+15 -33
View File
@@ -4,7 +4,6 @@ use std::path::Path;
use obikpartitionner::{KmerPartition, OutputCol, PARTITIONS_SUBDIR}; use obikpartitionner::{KmerPartition, OutputCol, PARTITIONS_SUBDIR};
use obisys::{Stage, progress_bar}; use obisys::{Stage, progress_bar};
use rayon::prelude::*;
use tracing::info; use tracing::info;
use crate::error::{OKIError, OKIResult}; use crate::error::{OKIError, OKIResult};
@@ -72,25 +71,16 @@ impl KmerIndex {
let pb = progress_bar("select", n_partitions as u64, "partitions"); let pb = progress_bar("select", n_partitions as u64, "partitions");
let src_partition = &src.partition; let src_partition = &src.partition;
let errors: Vec<obiskio::SKError> = (0..n_partitions) let order: Vec<usize> = (0..n_partitions).collect();
.into_par_iter() let runner = crate::numa::PartitionRunner::new();
.filter_map(|i| { runner.run(
let result = dst_partition.select_partition( &order,
src_partition, i, specs, |i| dst_partition.select_partition(src_partition, i, specs, n_src_genomes, threshold, output_presence, false),
n_src_genomes, threshold, output_presence, |_, _, _| { pb.inc(1); },
false, ).map_err(OKIError::Partition)?;
);
pb.inc(1);
result.err()
})
.collect();
pb.finish_and_clear(); pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::Partition(e));
}
let _ = t.stop(); let _ = t.stop();
fs::File::create(output.join(SENTINEL_INDEXED))?; fs::File::create(output.join(SENTINEL_INDEXED))?;
@@ -132,25 +122,17 @@ impl KmerIndex {
let t = Stage::start("select"); let t = Stage::start("select");
let pb = progress_bar("select", n_partitions as u64, "partitions"); let pb = progress_bar("select", n_partitions as u64, "partitions");
let errors: Vec<obiskio::SKError> = (0..n_partitions) let partition = &self.partition;
.into_par_iter() let order: Vec<usize> = (0..n_partitions).collect();
.filter_map(|i| { let runner = crate::numa::PartitionRunner::new();
let result = self.partition.select_partition( runner.run(
&src_partition, i, specs, &order,
n_src_genomes, threshold, output_presence, |i| partition.select_partition(&src_partition, i, specs, n_src_genomes, threshold, output_presence, true),
true, |_, _, _| { pb.inc(1); },
); ).map_err(OKIError::Partition)?;
pb.inc(1);
result.err()
})
.collect();
pb.finish_and_clear(); pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(OKIError::Partition(e));
}
let _ = t.stop(); let _ = t.stop();
// Update index.meta with new genome list and with_counts flag. // Update index.meta with new genome list and with_counts flag.